{-# LANGUAGE FlexibleContexts #-}

module Network.AMQP.Streamly
  (
    -- * How to use this library
    -- $use
    SendInstructions(..)
  , produce
  , consume
  )
where

import           Control.Concurrent.MVar
import           Control.Monad.IO.Class         ( MonadIO
                                                , liftIO
                                                )

import           Data.Text                      ( Text )
import           Network.AMQP
import           Streamly.Prelude
import qualified Streamly.Internal.Data.Stream.IsStream.Common     as S
import qualified Streamly.Prelude              as S

-- | Informations to be sent
--
-- See @Network.AMQP.publishMsg'@ for options
data SendInstructions = SendInstructions { SendInstructions -> Text
exchange :: Text, SendInstructions -> Text
routingKey :: Text, SendInstructions -> Bool
mandatory :: Bool, SendInstructions -> Message
message :: Message } deriving (Int -> SendInstructions -> ShowS
[SendInstructions] -> ShowS
SendInstructions -> String
(Int -> SendInstructions -> ShowS)
-> (SendInstructions -> String)
-> ([SendInstructions] -> ShowS)
-> Show SendInstructions
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
showList :: [SendInstructions] -> ShowS
$cshowList :: [SendInstructions] -> ShowS
show :: SendInstructions -> String
$cshow :: SendInstructions -> String
showsPrec :: Int -> SendInstructions -> ShowS
$cshowsPrec :: Int -> SendInstructions -> ShowS
Show)

-- | The Queue name
type Queue = Text

-- | Publish the produced messages
produce
  :: (IsStream t, MonadAsync m) => Channel -> t m SendInstructions -> t m ()
produce :: Channel -> t m SendInstructions -> t m ()
produce Channel
channel = (SendInstructions -> m ()) -> t m SendInstructions -> t m ()
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a b.
(IsStream t, MonadAsync m) =>
(a -> m b) -> t m a -> t m b
S.mapM SendInstructions -> m ()
forall (m :: * -> *). MonadIO m => SendInstructions -> m ()
send
 where
  send :: SendInstructions -> m ()
send SendInstructions
i = IO () -> m ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ do
    Channel -> Text -> Text -> Bool -> Message -> IO (Maybe Int)
publishMsg' Channel
channel (SendInstructions -> Text
exchange SendInstructions
i) (SendInstructions -> Text
routingKey SendInstructions
i) (SendInstructions -> Bool
mandatory SendInstructions
i) (SendInstructions -> Message
message SendInstructions
i)
    () -> IO ()
forall (m :: * -> *) a. Monad m => a -> m a
return ()

-- | Stream messages from a queue
--
-- See @Network.AMQP.consumeMsgs@ for options
consume
  :: (IsStream t, MonadAsync m)
  => Channel
  -> Queue
  -> Ack
  -> t m (Message, Envelope)
consume :: Channel -> Text -> Ack -> t m (Message, Envelope)
consume Channel
channel Text
queue Ack
ack = m (t m (Message, Envelope)) -> t m (Message, Envelope)
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
m (t m a) -> t m a
S.concatM (m (t m (Message, Envelope)) -> t m (Message, Envelope))
-> m (t m (Message, Envelope)) -> t m (Message, Envelope)
forall a b. (a -> b) -> a -> b
$ IO (t m (Message, Envelope)) -> m (t m (Message, Envelope))
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (t m (Message, Envelope)) -> m (t m (Message, Envelope)))
-> IO (t m (Message, Envelope)) -> m (t m (Message, Envelope))
forall a b. (a -> b) -> a -> b
$ do
  MVar (Message, Envelope)
mvar <- IO (MVar (Message, Envelope))
forall a. IO (MVar a)
newEmptyMVar
  Channel -> Text -> Ack -> ((Message, Envelope) -> IO ()) -> IO Text
consumeMsgs Channel
channel Text
queue Ack
Ack (((Message, Envelope) -> IO ()) -> IO Text)
-> ((Message, Envelope) -> IO ()) -> IO Text
forall a b. (a -> b) -> a -> b
$ MVar (Message, Envelope) -> (Message, Envelope) -> IO ()
forall a. MVar a -> a -> IO ()
putMVar MVar (Message, Envelope)
mvar
  t m (Message, Envelope) -> IO (t m (Message, Envelope))
forall (m :: * -> *) a. Monad m => a -> m a
return (t m (Message, Envelope) -> IO (t m (Message, Envelope)))
-> t m (Message, Envelope) -> IO (t m (Message, Envelope))
forall a b. (a -> b) -> a -> b
$ m (Message, Envelope) -> t m (Message, Envelope)
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, MonadAsync m) =>
m a -> t m a
S.repeatM (m (Message, Envelope) -> t m (Message, Envelope))
-> m (Message, Envelope) -> t m (Message, Envelope)
forall a b. (a -> b) -> a -> b
$ MVar (Message, Envelope) -> m (Message, Envelope)
forall (m :: * -> *).
MonadIO m =>
MVar (Message, Envelope) -> m (Message, Envelope)
taking MVar (Message, Envelope)
mvar
 where
  taking :: MonadIO m => MVar (Message, Envelope) -> m (Message, Envelope)
  taking :: MVar (Message, Envelope) -> m (Message, Envelope)
taking MVar (Message, Envelope)
mvar = IO (Message, Envelope) -> m (Message, Envelope)
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (Message, Envelope) -> m (Message, Envelope))
-> IO (Message, Envelope) -> m (Message, Envelope)
forall a b. (a -> b) -> a -> b
$ if Ack
ack Ack -> Ack -> Bool
forall a. Eq a => a -> a -> Bool
== Ack
NoAck
    then do
      (Message, Envelope)
retrieved <- MVar (Message, Envelope) -> IO (Message, Envelope)
forall a. MVar a -> IO a
takeMVar MVar (Message, Envelope)
mvar
      Envelope -> IO ()
ackEnv (Envelope -> IO ()) -> Envelope -> IO ()
forall a b. (a -> b) -> a -> b
$ (Message, Envelope) -> Envelope
forall a b. (a, b) -> b
snd (Message, Envelope)
retrieved
      (Message, Envelope) -> IO (Message, Envelope)
forall (m :: * -> *) a. Monad m => a -> m a
return (Message, Envelope)
retrieved
    else MVar (Message, Envelope) -> IO (Message, Envelope)
forall a. MVar a -> IO a
takeMVar MVar (Message, Envelope)
mvar


-- $use
--
-- This section contains basic step-by-step usage of the library.
--
-- You can either build a producer, which will publish all the messages of
-- a stream:
--
-- > Streamly.drain $ produce channel sendInstructionsStream
--
-- Or a consumer, which will contain the @Message@s and @Envelope@s of
-- a queue:
--
-- > Streamly.drain $ consume channel aQueue NoAck