-- | Pooled access to an AMQP broker.
--
-- 'connect' builds a 'Connection' handle backed by "Data.Pool" resource
-- pools, 'withChannel' lends out a channel from it for the duration of an
-- action, and 'disconnect' releases the pooled resources. The 'Connection'
-- constructor is deliberately not exported.
module Network.AMQP.Worker.Connection
( Connection
, connect
, disconnect
, withChannel
) where
import Control.Concurrent.MVar (readMVar, newEmptyMVar, putMVar)
import Data.Pool (Pool)
import qualified Data.Pool as Pool
import qualified Network.AMQP as AMQP
import Network.AMQP (Channel)
import Control.Monad.Trans.Control (MonadBaseControl)
-- | Opaque handle to a broker session.
--
-- Carries both resource pools so that 'disconnect' can release the
-- underlying AMQP connection as well as the channels opened on it.
-- The constructor is private to this module.
data Connection =
  Connection
    (Pool AMQP.Connection) -- pool owning the (single) broker connection
    (Pool Channel)         -- pool of channels handed out by 'withChannel'

-- | Build the connection and channel pools for the broker described by
-- the given options.
--
-- Resources are created by the pools on demand: a channel is opened by
-- borrowing the pooled connection and calling 'AMQP.openChannel' on it.
-- When the pooled connection is destroyed, every idle channel is closed
-- first and then the connection itself.
connect :: AMQP.ConnectionOpts -> IO Connection
connect opts = do
  -- The connection destructor has to purge the channel pool, but the
  -- channel pool can only be built once the connection pool exists.
  -- The MVar breaks that cycle: it is filled right after both pools exist.
  chanPoolVar <- newEmptyMVar
  conns <- Pool.createPool createConn (destroyConn chanPoolVar) 1 connOpenTime 1
  chans <- Pool.createPool (createChan conns) AMQP.closeChannel numStripes openTime numResources
  putMVar chanPoolVar chans
  pure (Connection conns chans)
  where
    -- Channel pool sizing: stripes, idle time before a channel is
    -- reaped, and maximum channels per stripe.
    numStripes = 1
    openTime = 10
    numResources = 4

    -- Idle time before the pooled connection is reaped.
    connOpenTime = 60

    createConn = AMQP.openConnection'' opts

    createChan connPool = Pool.withResource connPool AMQP.openChannel

    -- Blocks on the MVar until 'connect' has published the channel pool.
    destroyConn poolVar conn = do
      chanPool <- readMVar poolVar
      Pool.destroyAllResources chanPool
      AMQP.closeConnection conn

-- | Release every idle pooled resource: channels first, then the broker
-- connection they were opened on.
--
-- Previously only the channel pool was purged, which left the AMQP
-- connection open until the pool's idle reaper or a finalizer got to it.
disconnect :: Connection -> IO ()
disconnect (Connection conns chans) = do
  Pool.destroyAllResources chans
  -- Destroying the pooled connection runs its destructor, which closes
  -- the connection (after purging the, by now empty, channel pool).
  Pool.destroyAllResources conns

-- | Borrow a channel from the pool for the duration of the action and
-- hand it back afterwards (see 'Pool.withResource').
withChannel :: MonadBaseControl IO m => Connection -> (Channel -> m b) -> m b
withChannel (Connection _ chans) action =
  Pool.withResource chans action