-- | Conduit helpers on top of "Network.AMQP".
--
-- The module exposes a 'Source' / 'Sink' pair ('amqpReceiveSource',
-- 'amqpSendSink'), connection and channel management
-- ('withAMQPChannels', 'createAMQPChannels', 'closeChannels',
-- 'destoryConnection') and consumer management (create / delete /
-- pause / resume).  The names imported from "Network.AMQP" are
-- re-exported.
--
-- Note: 'destoryConnection' (sic) is the spelling exported by this module.
module Network.AMQP.Conduit (
amqpReceiveSource
, amqpSendSink
, AmqpConf (..)
, AmqpConn (..)
, ExchangeKey
, AmqpURI
, withAMQPChannel
, withAMQPChannels
, createAMQPChannels
, closeChannels
, destoryConnection
, createConsumers
, createConsumers'
, createConsumer
, deleteConsumers
, deleteConsumer
, pauseConsumers
, pauseConsumer
, resumeConsumers
, resumeConsumer
, module Network.AMQP
) where
import Control.Exception.Lifted (bracket, bracketOnError)
import Control.Monad.Except
import Control.Monad.Trans.Control (MonadBaseControl)
import Data.Conduit (Sink, Source, await, yield)
import Data.Maybe (isJust, isNothing)
import Data.Text (Text)
import Network.AMQP (AMQPException (ConnectionClosedException),
                     Ack (Ack, NoAck), Channel,
                     Connection, ConsumerTag, Envelope,
                     ExchangeOpts, Message, QueueOpts,
                     addConnectionClosedHandler,
                     bindQueue, cancelConsumer,
                     closeChannel, closeConnection,
                     declareExchange, declareQueue,
                     exchangeName, flow, fromURI,
                     getMsg, openChannel,
                     openConnection'', publishMsg,
                     queueName)
import Network.AMQP.Lifted (consumeMsgs)
-- | An open AMQP connection bundled with its channels and the
-- configuration it was created from.
data AmqpConn = AmqpConn
    { amqpConn :: Connection
      -- ^ The underlying connection.
    , amqpChan :: [(Channel, Maybe ConsumerTag)]
      -- ^ Open channels; each is paired with the tag of the consumer
      -- registered on it, or 'Nothing' when it has no consumer.
    , amqpConf :: AmqpConf
      -- ^ The configuration used to open the connection.
    }
-- | Settings used to open a connection and set up each channel.
data AmqpConf = AmqpConf
    { amqpUri       :: AmqpURI
      -- ^ Broker URI, parsed with 'fromURI' when connecting.
    , amqpQueue     :: QueueOpts
      -- ^ Queue declared on every channel.
    , amqpExchange  :: ExchangeOpts
      -- ^ Exchange declared on every channel.
    , amqpExchanKey :: ExchangeKey
      -- ^ Key used to bind the queue to the exchange.
    }
-- | Key used when binding the queue to the exchange and when publishing.
type ExchangeKey = Text

-- | Broker URI in the textual form accepted by 'fromURI'.
type AmqpURI = String
-- | Open a connection with @num@ channels, run @f@ with it, and close the
-- connection afterwards.
--
-- Acquisition and release go through 'bracket', so the connection is
-- closed whether @f@ returns normally or throws.  The channel count is
-- passed through 'numCheck', which raises anything below 1 to 1.
withAMQPChannels :: (MonadIO m, MonadBaseControl IO m)
    => AmqpConf           -- ^ connection, queue and exchange settings
    -> Int                -- ^ requested number of channels
    -> (AmqpConn -> m a)  -- ^ action to run with the open connection
    -> m a
withAMQPChannels conf num f =
    bracket
        (liftIO $ connect conf (numCheck num))
        -- A single 'liftIO' is enough to lift the IO release action into @m@.
        (liftIO . disconnect)
        f
-- | Single-channel variant of 'withAMQPChannels': opens exactly one
-- channel, runs the action, then closes the connection.
withAMQPChannel :: (MonadIO m, MonadBaseControl IO m)
    => AmqpConf
    -> (AmqpConn -> m a)
    -> m a
withAMQPChannel conf action = withAMQPChannels conf 1 action
-- | Open a connection with the requested number of channels and return it.
--
-- Unlike 'withAMQPChannels' nothing is closed here; releasing the
-- connection is left to the caller.  The count goes through 'numCheck'
-- first.
createAMQPChannels :: (MonadIO m, MonadBaseControl IO m)
    => AmqpConf
    -> Int
    -> m AmqpConn
createAMQPChannels conf num = liftIO (connect conf channelCount)
  where
    channelCount = numCheck num
-- | Register a consumer on a channel that does not have one yet.
--
-- When the channel is paired with 'Nothing', 'consumeMsgs' is called with
-- the given queue, acknowledgement mode and callback, and the channel is
-- returned paired with the new tag.  A channel that already carries a tag
-- is returned untouched.
createConsumer :: (MonadIO m, MonadBaseControl IO m)
    => (Channel, Maybe ConsumerTag)
    -> Text
    -> Ack
    -> ((Message, Envelope) -> m ())
    -> m (Channel, Maybe ConsumerTag)
createConsumer (chan, Nothing) queue ack callback = do
    newTag <- consumeMsgs chan queue ack callback
    return (chan, Just newTag)
createConsumer existing@(_, Just _) _ _ _ = return existing
-- | Cancel the consumer registered on a channel, if there is one.
--
-- A channel paired with 'Nothing' is returned as is.  Otherwise a line is
-- written to stdout, the consumer is cancelled with 'cancelConsumer', and
-- the channel is returned paired with 'Nothing'.
deleteConsumer :: MonadIO m
    => (Channel, Maybe ConsumerTag)
    -> m (Channel, Maybe ConsumerTag)
deleteConsumer idle@(_, Nothing) = return idle
deleteConsumer (chan, Just consumerTag) = liftIO $ do
    putStrLn "cancel cunsumer channel."
    cancelConsumer chan consumerTag
    return (chan, Nothing)
-- | Register the callback as a consumer on every channel of the
-- connection that does not have one yet, using the given acknowledgement
-- mode and the queue named in the connection's configuration.
--
-- Returns the connection with its channel list updated to carry the new
-- consumer tags.
createConsumersHelper :: (MonadIO m, MonadBaseControl IO m)
    => AmqpConn
    -> ((Message, Envelope) -> m ())
    -> Ack
    -> m AmqpConn
createConsumersHelper conn f ack = do
    -- Map over ALL channels: 'createConsumer' already leaves channels that
    -- have a consumer untouched.  Filtering them out beforehand would drop
    -- them from the returned connection.
    chans <- mapM (\chan -> createConsumer chan qName ack f) (amqpChan conn)
    return $ conn { amqpChan = chans }
  where
    qName = queueName (amqpQueue (amqpConf conn))
-- | Register the callback as a consumer on the connection's channels in
-- 'NoAck' mode.  See 'createConsumers' for the 'Ack' variant.
createConsumers' :: (MonadIO m, MonadBaseControl IO m)
    => AmqpConn
    -> ((Message, Envelope) -> m ())
    -> m AmqpConn
createConsumers' conn handler = createConsumersHelper conn handler NoAck
-- | Register the callback as a consumer on the connection's channels in
-- 'Ack' mode.  See @createConsumers'@ for the 'NoAck' variant.
createConsumers :: (MonadIO m, MonadBaseControl IO m)
    => AmqpConn
    -> ((Message, Envelope) -> m ())
    -> m AmqpConn
createConsumers conn handler = createConsumersHelper conn handler Ack
-- | Cancel the consumer on every channel of the connection that has one.
--
-- Returns the connection with all of its channels retained and their
-- consumer tags cleared.
deleteConsumers :: AmqpConn
    -> IO AmqpConn
deleteConsumers conn = do
    -- Map over ALL channels: 'deleteConsumer' is a no-op for channels
    -- without a consumer.  Filtering them out beforehand would drop them
    -- from the returned connection (leaving an empty channel list when no
    -- consumer was registered).
    chans <- mapM deleteConsumer (amqpChan conn)
    return $ conn { amqpChan = chans }
-- | Apply 'pauseConsumer' to every channel of the connection.
--
-- Returns the connection with its full channel list retained; channels
-- without a consumer are passed through unchanged.
pauseConsumers :: AmqpConn
    -> IO AmqpConn
pauseConsumers conn = do
    -- Map over ALL channels: 'pauseConsumer' leaves channels without a
    -- consumer alone.  Filtering them out beforehand would drop them from
    -- the returned connection.
    chans <- mapM pauseConsumer (amqpChan conn)
    return $ conn { amqpChan = chans }
-- | Call 'flowConsumer' on the channel with the flag set to 'False'.
pauseConsumer :: (Channel, Maybe ConsumerTag)
    -> IO (Channel, Maybe ConsumerTag)
pauseConsumer = (`flowConsumer` False)
-- | Apply 'resumeConsumer' to every channel of the connection.
--
-- Returns the connection with its full channel list retained; channels
-- without a consumer are passed through unchanged.
resumeConsumers :: AmqpConn
    -> IO AmqpConn
resumeConsumers conn = do
    -- Map over ALL channels: 'resumeConsumer' leaves channels without a
    -- consumer alone.  Filtering them out beforehand would drop them from
    -- the returned connection.
    chans <- mapM resumeConsumer (amqpChan conn)
    return $ conn { amqpChan = chans }
-- | Call 'flowConsumer' on the channel with the flag set to 'True'.
resumeConsumer :: (Channel, Maybe ConsumerTag)
    -> IO (Channel, Maybe ConsumerTag)
resumeConsumer = (`flowConsumer` True)
-- | Call 'flow' with the given flag on a channel that has a consumer.
--
-- A channel paired with 'Nothing' is returned without calling 'flow'.
-- In both cases the pair is returned unchanged.
flowConsumer :: (Channel, Maybe ConsumerTag)
    -> Bool
    -> IO (Channel, Maybe ConsumerTag)
flowConsumer idle@(_, Nothing) _ = return idle
flowConsumer (chan, tag@(Just _)) flag = flow chan flag >> return (chan, tag)
-- | Close every channel of the connection with 'closeChannel' and return
-- the connection with an empty channel list.  The connection itself is
-- not closed here.
closeChannels :: AmqpConn
    -> IO AmqpConn
closeChannels conn = do
    mapM_ (closeChannel . fst) (amqpChan conn)
    return conn { amqpChan = [] }
-- | Close the underlying connection.
--
-- Before closing, a do-nothing handler is registered through
-- 'addConnectionClosedHandler' (with its 'Bool' argument set to 'False').
destoryConnection :: AmqpConn
    -> IO ()
destoryConnection conn =
    addConnectionClosedHandler rawConn False (return ())
        >> closeConnection rawConn
  where
    rawConn = amqpConn conn
-- | Open a connection from the configured URI and set up @num@ channels
-- on it with 'initChan'.
--
-- If setting up any channel throws, the freshly opened connection is
-- closed before the exception propagates, so a failed 'connect' does not
-- leak a connection.  On success the connection is left open and returned.
connect :: AmqpConf
    -> Int
    -> IO AmqpConn
connect conf num =
    bracketOnError
        (openConnection'' uri)
        closeConnection
        (\conn -> do
            chans <- replicateM num (initChan conf conn)
            return $ AmqpConn conn chans conf)
  where
    uri = fromURI (amqpUri conf)
-- | Open one channel on the connection and prepare it for use.
--
-- The configured exchange and queue are declared on the new channel and
-- the queue is bound to the exchange with the configured key.  The
-- channel is returned paired with 'Nothing', i.e. without a consumer.
initChan :: AmqpConf
    -> Connection
    -> IO (Channel, Maybe ConsumerTag)
initChan conf conn = do
    chan <- openChannel conn
    declareExchange chan exchangeOpts
    _ <- declareQueue chan queueOpts
    bindQueue chan (queueName queueOpts) (exchangeName exchangeOpts) (amqpExchanKey conf)
    return (chan, Nothing)
  where
    queueOpts    = amqpQueue conf
    exchangeOpts = amqpExchange conf
-- | Close the underlying connection with 'closeConnection'.
disconnect :: AmqpConn
    -> IO ()
disconnect = closeConnection . amqpConn
-- | Publish a message to the configured exchange with the given key,
-- using the first channel of the connection.
--
-- Throws an 'IOError' (via 'userError') when the connection has no
-- channels, for example after 'closeChannels'.
send :: AmqpConn
    -> ExchangeKey
    -> Message
    -> IO ()
send conn key msg =
    -- Match on the list instead of using the partial 'head', so an empty
    -- channel list yields a descriptive error.
    case amqpChan conn of
        [] ->
            ioError (userError "Network.AMQP.Conduit.send: connection has no open channel")
        (channel, _) : _ ->
            publishMsg
                channel
                (exchangeName (amqpExchange (amqpConf conn))) key msg
-- | Clamp a requested channel count to at least 1.
--
-- Zero and negative values become 1; positive values are returned
-- unchanged.
numCheck :: Int -> Int
numCheck = max 1
-- | A source that yields the given delivery over and over.
--
-- The source never terminates on its own; it keeps yielding the same
-- @(message, envelope)@ pair for as long as downstream keeps asking.
amqpReceiveSource :: (Monad m, MonadIO m)
    => (Message, Envelope)
    -> Source m (Message, Envelope)
amqpReceiveSource delivery = repeatDelivery
  where
    repeatDelivery = yield delivery >> repeatDelivery
-- | A sink that publishes every incoming message through 'send' with the
-- given key, finishing when upstream is exhausted.
amqpSendSink :: (Monad m, MonadIO m)
    => AmqpConn
    -> ExchangeKey
    -> Sink Message m ()
amqpSendSink conn key = drain
  where
    drain = do
        next <- await
        case next of
            Nothing  -> return ()
            Just msg -> do
                liftIO (send conn key msg)
                drain