{-# LANGUAGE FlexibleContexts #-} module Network.AMQP.Worker.Connection ( Connection , connect , disconnect , withChannel ) where import Control.Concurrent.MVar (MVar, putMVar, newEmptyMVar, readMVar, takeMVar) import Control.Monad.Catch (throwM, catch) import Data.Pool (Pool) import qualified Data.Pool as Pool import qualified Network.AMQP as AMQP import Network.AMQP (Channel, AMQPException(..)) import Control.Monad.Trans.Control (MonadBaseControl) data Connection = Connection (MVar AMQP.Connection) (Pool Channel) -- | Connect to the AMQP server. -- -- > conn <- connect (fromURI "amqp://guest:guest@localhost:5672") -- connect :: AMQP.ConnectionOpts -> IO Connection connect opts = do -- create a single connection in an mvar cvar <- newEmptyMVar openConnection cvar -- open a shared pool for channels chans <- Pool.createPool (create cvar) destroy numStripes openTime numChans pure $ Connection cvar chans where numStripes = 1 openTime = 10 numChans = 4 openConnection cvar = do -- open a connection and store in the mvar conn <- AMQP.openConnection'' opts putMVar cvar conn reopenConnection cvar = do -- clear the mvar and reopen _ <- takeMVar cvar openConnection cvar create cvar = do conn <- readMVar cvar chan <- catch (AMQP.openChannel conn) (createEx cvar) return chan createEx cvar (ConnectionClosedException _) = do reopenConnection cvar create cvar createEx _ ex = throwM ex destroy chan = do AMQP.closeChannel chan disconnect :: Connection -> IO () disconnect (Connection c p) = do conn <- readMVar c Pool.destroyAllResources p AMQP.closeConnection conn withChannel :: MonadBaseControl IO m => Connection -> (Channel -> m b) -> m b withChannel (Connection _ p) action = do Pool.withResource p action