{-# LANGUAGE FlexibleContexts #-}
module Network.AMQP.Streamly
(
SendInstructions(..)
, produce
, consume
)
where
import Control.Concurrent.MVar
import Control.Monad.IO.Class ( MonadIO
, liftIO
)
import Data.Text ( Text )
import Network.AMQP
import Streamly.Prelude
import qualified Streamly.Internal.Data.Stream.IsStream.Common as S
import qualified Streamly.Prelude as S
-- | Everything needed to publish a single message through 'produce'.
--
-- The fields are passed, in declaration order, as the arguments that follow
-- the channel in a call to @publishMsg'@.
data SendInstructions = SendInstructions
  { exchange   :: Text    -- ^ exchange argument given to @publishMsg'@
  , routingKey :: Text    -- ^ routing-key argument given to @publishMsg'@
  , mandatory  :: Bool    -- ^ mandatory flag given to @publishMsg'@
  , message    :: Message -- ^ the message to publish
  } deriving (Show)
-- | Queue argument of 'consume'; forwarded unchanged to @consumeMsgs@.
type Queue = Text
-- | Publish every element of a stream on the given channel.
--
-- Each 'SendInstructions' is handed to @publishMsg'@ inside 'liftIO'. The
-- @Maybe Int@ that @publishMsg'@ returns is discarded, so the resulting
-- stream yields one @()@ per input element.
produce
  :: (IsStream t, MonadAsync m) => Channel -> t m SendInstructions -> t m ()
produce channel = S.mapM send
 where
  -- Publish one message and drop the publish result.
  send i = liftIO $ do
    -- Bind the result explicitly so that discarding it is visible (and does
    -- not trip -Wunused-do-bind).
    _ <- publishMsg' channel
                     (exchange i)
                     (routingKey i)
                     (mandatory i)
                     (message i)
    return ()
-- | Stream the messages delivered to a queue.
--
-- A fresh empty 'MVar' is created and a consumer is registered with
-- @consumeMsgs@ whose callback puts every delivery into that 'MVar'; the
-- resulting stream repeatedly takes from it.
--
-- The consumer is always registered with 'Ack', whatever the @ack@ argument
-- is. The argument only selects what happens when an element is taken:
--
-- * 'NoAck': the envelope is acknowledged with 'ackEnv' right after the
--   take, before the pair is yielded.
-- * otherwise: the pair is yielded as is and nothing is acknowledged here.
--
-- The consumer tag returned by @consumeMsgs@ is discarded.
consume
  :: (IsStream t, MonadAsync m)
  => Channel
  -> Queue
  -> Ack
  -> t m (Message, Envelope)
consume channel queue ack = S.concatM $ liftIO $ do
  mvar <- newEmptyMVar
  -- Hard-coded 'Ack': acknowledgement is driven by 'taking' below.
  _ <- consumeMsgs channel queue Ack $ putMVar mvar
  return $ S.repeatM $ taking mvar
 where
  -- Take the next delivery, acknowledging it first when the caller asked
  -- for 'NoAck'.
  taking :: MonadIO m => MVar (Message, Envelope) -> m (Message, Envelope)
  taking slot
    | ack == NoAck = liftIO $ do
        retrieved <- takeMVar slot
        ackEnv $ snd retrieved
        return retrieved
    | otherwise = liftIO $ takeMVar slot