{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE OverloadedStrings #-}

module Network.AMQP.Worker.Connection
    ( Connection (..)
    , connect
    , disconnect
    , withChannel
    ) where

import Control.Concurrent.MVar
    ( MVar
    , newEmptyMVar
    , putMVar
    , readMVar
    , takeMVar
    )
import Control.Monad.Catch (catch, throwM)
import Control.Monad.IO.Class (MonadIO, liftIO)
import Data.Pool (Pool)
import qualified Data.Pool as Pool
import Data.Text (Text)
import Network.AMQP (AMQPException (..), Channel)
import qualified Network.AMQP as AMQP

type ExchangeName = Text

-- | Internal connection details
data Connection = Connection
    { Connection -> MVar Connection
amqpConn :: MVar AMQP.Connection
    , Connection -> Pool Channel
pool :: Pool Channel
    , Connection -> ExchangeName
exchange :: ExchangeName
    }

-- | Connect to the AMQP server.
--
-- >>> conn <- connect (fromURI "amqp://guest:guest@localhost:5672")
connect :: MonadIO m => AMQP.ConnectionOpts -> m Connection
connect :: forall (m :: * -> *). MonadIO m => ConnectionOpts -> m Connection
connect ConnectionOpts
opts = forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ do
    -- use a default exchange name
    let exchangeName :: ExchangeName
exchangeName = ExchangeName
"amq.topic"

    -- create a single connection in an mvar
    MVar Connection
cvar <- forall a. IO (MVar a)
newEmptyMVar
    MVar Connection -> IO ()
openConnection MVar Connection
cvar

    let config :: PoolConfig Channel
config = forall a. IO a -> (a -> IO ()) -> Double -> Int -> PoolConfig a
Pool.defaultPoolConfig (MVar Connection -> IO Channel
create MVar Connection
cvar) Channel -> IO ()
destroy Double
openTime Int
numChans

    -- open a shared pool for channels
    Pool Channel
chans <- forall a. PoolConfig a -> IO (Pool a)
Pool.newPool PoolConfig Channel
config

    forall (f :: * -> *) a. Applicative f => a -> f a
pure forall a b. (a -> b) -> a -> b
$ MVar Connection -> Pool Channel -> ExchangeName -> Connection
Connection MVar Connection
cvar Pool Channel
chans ExchangeName
exchangeName
  where
    openTime :: Double
openTime = Double
10
    numChans :: Int
numChans = Int
4

    openConnection :: MVar Connection -> IO ()
openConnection MVar Connection
cvar = do
        -- open a connection and store in the mvar
        Connection
conn <- ConnectionOpts -> IO Connection
AMQP.openConnection'' ConnectionOpts
opts
        forall a. MVar a -> a -> IO ()
putMVar MVar Connection
cvar Connection
conn

    reopenConnection :: MVar Connection -> IO ()
reopenConnection MVar Connection
cvar = do
        -- clear the mvar and reopen
        Connection
_ <- forall a. MVar a -> IO a
takeMVar MVar Connection
cvar
        MVar Connection -> IO ()
openConnection MVar Connection
cvar

    create :: MVar Connection -> IO Channel
create MVar Connection
cvar = do
        Connection
conn <- forall a. MVar a -> IO a
readMVar MVar Connection
cvar
        Channel
chan <- forall (m :: * -> *) e a.
(MonadCatch m, Exception e) =>
m a -> (e -> m a) -> m a
catch (Connection -> IO Channel
AMQP.openChannel Connection
conn) (MVar Connection -> AMQPException -> IO Channel
createEx MVar Connection
cvar)
        forall (m :: * -> *) a. Monad m => a -> m a
return Channel
chan

    createEx :: MVar Connection -> AMQPException -> IO Channel
createEx MVar Connection
cvar (ConnectionClosedException CloseType
_ String
_) = do
        MVar Connection -> IO ()
reopenConnection MVar Connection
cvar
        MVar Connection -> IO Channel
create MVar Connection
cvar
    createEx MVar Connection
_ AMQPException
ex = forall (m :: * -> *) e a. (MonadThrow m, Exception e) => e -> m a
throwM AMQPException
ex

    destroy :: Channel -> IO ()
destroy Channel
chan = do
        Channel -> IO ()
AMQP.closeChannel Channel
chan

disconnect :: MonadIO m => Connection -> m ()
disconnect :: forall (m :: * -> *). MonadIO m => Connection -> m ()
disconnect Connection
c = forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ do
    Connection
conn <- forall a. MVar a -> IO a
readMVar forall a b. (a -> b) -> a -> b
$ Connection -> MVar Connection
amqpConn Connection
c
    forall a. Pool a -> IO ()
Pool.destroyAllResources forall a b. (a -> b) -> a -> b
$ Connection -> Pool Channel
pool Connection
c
    Connection -> IO ()
AMQP.closeConnection Connection
conn

-- | Perform an action with a channel resource
withChannel :: Connection -> (Channel -> IO b) -> IO b
withChannel :: forall b. Connection -> (Channel -> IO b) -> IO b
withChannel (Connection MVar Connection
_ Pool Channel
p ExchangeName
_) Channel -> IO b
action = do
    forall a r. Pool a -> (a -> IO r) -> IO r
Pool.withResource Pool Channel
p Channel -> IO b
action