module Network.AMQP.Worker.Connection
( Connection
, connect
, disconnect
, withChannel
) where
import Control.Concurrent.MVar (MVar, modifyMVar_, newEmptyMVar, putMVar, readMVar, takeMVar)
import Control.Monad.Catch (catch, throwM)
import Control.Monad.Trans.Control (MonadBaseControl)
import Data.Pool (Pool)
import qualified Data.Pool as Pool
import Network.AMQP (AMQPException(..), Channel)
import qualified Network.AMQP as AMQP
-- | Handle used by the rest of this module: an 'MVar' holding the underlying
-- AMQP connection, paired with a pool of channels.
data Connection
  = Connection
      (MVar AMQP.Connection) -- mutable slot holding the AMQP connection
      (Pool Channel)         -- pool from which channels are borrowed
-- | Open an AMQP connection with the given options and build a pool of
-- channels on top of it.
--
-- The pool opens channels on demand via @create@. If opening a channel fails
-- with 'ConnectionClosedException', the connection is reopened using the same
-- options and the channel open is retried; any other 'AMQPException' is
-- rethrown unchanged.
connect :: AMQP.ConnectionOpts -> IO Connection
connect opts = do
  cvar <- newEmptyMVar
  openConnection cvar
  chans <- Pool.createPool (create cvar) destroy numStripes openTime numChans
  pure $ Connection cvar chans
  where
    -- Pool sizing, passed positionally to 'Pool.createPool'.
    numStripes = 1
    openTime = 10
    numChans = 4

    -- Initial open: fills the (empty) MVar with a fresh connection.
    openConnection cvar = AMQP.openConnection'' opts >>= putMVar cvar

    -- Swap the stored connection for a freshly opened one. 'modifyMVar_'
    -- puts the previous value back if opening throws, so a failed reconnect
    -- no longer leaves the MVar empty (which would block every later
    -- 'readMVar' forever); the next channel request simply retries.
    -- NOTE(review): two threads that both observe a closed connection will
    -- each reopen it, and the first replacement is dropped without being
    -- closed -- confirm whether that matters for callers.
    reopenConnection cvar =
      modifyMVar_ cvar (const (AMQP.openConnection'' opts))

    -- Open a channel on whatever connection is currently stored.
    create cvar = do
      conn <- readMVar cvar
      AMQP.openChannel conn `catch` createEx cvar

    -- Recover only from a closed connection; everything else propagates.
    createEx cvar (ConnectionClosedException _) = do
      reopenConnection cvar
      create cvar
    createEx _ ex = throwM ex

    destroy = AMQP.closeChannel
-- | Shut everything down: read the stored connection, destroy every pooled
-- channel, and then close that connection.
disconnect :: Connection -> IO ()
disconnect (Connection connVar channelPool) =
  readMVar connVar >>= \conn ->
    Pool.destroyAllResources channelPool
      >> AMQP.closeConnection conn
-- | Run @action@ with a channel taken from the connection's pool, via
-- 'Pool.withResource'.
withChannel :: MonadBaseControl IO m => Connection -> (Channel -> m b) -> m b
withChannel (Connection _ channelPool) action =
  Pool.withResource channelPool action