{-# Language RankNTypes #-}
-------------------------------------------------------------------------------
-- |
-- Module     : Network/Mom/Stompl/Client/Conduit.hs
-- Copyright  : (c) Tobias Schoofs
-- License    : LGPL 
-- Stability  : experimental
-- Portability: portable
--
-- This module provides Conduit interfaces
-- for the stomp-queue library.
-- The interfaces create or receive
-- streams of messages instead of single messages.
-- This approach aims to simplify the integration
-- of messaging into applications by means of well-defined
-- streaming interfaces. 
-------------------------------------------------------------------------------
module Network.Mom.Stompl.Client.Conduit (
                                  -- * Plain message streams
                                  qSource, qSink, 
                                  -- * Multipart messages as streams
                                  qMultiSource, qMultiSink)
where

  import qualified Data.Conduit as C
  import           Codec.MIME.Type (Type)
  import           Control.Monad.Trans (liftIO)
  import           Control.Monad.IO.Class (MonadIO)
  import           System.Timeout

  import           Network.Mom.Stompl.Client.Queue
  import           Network.Mom.Stompl.Frame

  ------------------------------------------------------------------------
  -- | Creates a source that streams the messages 
  --   read from a 'Reader' queue.
  --   Every single read is guarded by a timeout.
  --
  --   The stream ends iff a read times out;
  --   otherwise, the source keeps on reading and yielding
  --   messages forever.
  --
  --   Parameters:
  --
  --   * 'Reader' i: The input interface, a Stomp 'Reader'. 
  --
  --   * Int: Timeout in microseconds (passed on to 'timeout'),
  --          applied to each read separately.
  ------------------------------------------------------------------------
  qSource :: MonadIO m => 
             Reader i -> Int -> C.ConduitT () (Message i) m ()
  qSource r tmo = go
    where -- one guarded read; the result decides whether we continue
          go = liftIO (timeout tmo $ readQ r) >>= mbYield
          mbYield mbX = case mbX of
                          Nothing -> return ()       -- timeout: end of stream
                          Just x  -> C.yield x >> go -- pass on and read again

  ------------------------------------------------------------------------
  -- | Creates a sink that writes every element of the incoming stream
  --   to a 'Writer' queue, one message per element.
  --   The sink terminates, when the stream ends.
  --
  --   Parameters:
  --
  --   * 'Writer' o: The output interface, a Stomp 'Writer'.
  --
  --   * 'Type': The mime type of the message content.
  --
  --   * ['Header']: Headers to add to each message.
  ------------------------------------------------------------------------
  qSink :: MonadIO m => 
           Writer o -> Type -> [Header] -> C.ConduitT o C.Void m ()
  qSink w t hs = C.awaitForever $ \x -> liftIO (writeQ w t hs x)

  -- header to mark the last segment of a multipart message -------------
  -- (added by 'qMultiSink' to the final segment
  --  and looked up by 'qMultiSource' to detect the end of the message)
  lastMHdr :: Header
  lastMHdr = ("__last__", "true")

  ------------------------------------------------------------------------
  -- | Creates a source that streams the segments of a multipart message
  --   read from a 'Reader' queue. 
  --   Every single read is guarded by a timeout.
  --
  --   The stream ends when a read times out 
  --   or  after a segment has been yielded 
  --   that is marked as the last one.
  --   The marked segment itself is still part of the stream.
  --
  --   Note that multipart messages are not foreseen by the standard.
  --   'qMultiSink' and 'qMultiSource' use a header named \"__last__\"
  --   with value \"true\"
  --   to label the last segment of a multipart message.
  --
  --   For parameters, please refer to 'qSource'. 
  ------------------------------------------------------------------------
  qMultiSource :: MonadIO m =>
                  Reader i -> Int -> C.ConduitT () (Message i) m ()
  qMultiSource r tmo = loop
    where loop = do
            mbX <- liftIO (timeout tmo $ readQ r)
            case mbX of
              Nothing -> return ()  -- timeout: end of stream
              Just x  -> case lookup (fst lastMHdr) $ msgHdrs x of
                           Just "true" -> C.yield x         -- last segment: stop
                           _           -> C.yield x >> loop -- more to come
  

  ------------------------------------------------------------------------
  -- | Creates a sink that writes the incoming stream 
  --   as a multipart message to a 'Writer' queue,
  --   one segment per stream element.
  --   The sink terminates, when the stream ends.
  --   The last segment is sent with the additional header 
  --   (\"__last__\", \"true\"); 
  --   all other segments are sent with the given headers only.
  --   Nothing is sent, if the stream is empty.
  --   Note that multipart messages are not foreseen by the standard.
  --
  --   For parameters, please refer to 'qSink'. 
  ------------------------------------------------------------------------
  qMultiSink :: MonadIO m =>
                Writer o -> Type -> [Header] -> C.ConduitT o C.Void m ()
  qMultiSink w t hs = do
    mbX <- C.await
    case mbX of
      Nothing -> return ()  -- empty stream: nothing to send
      Just x  -> go x
    where -- headers for the final segment
          hs'  = lastMHdr:hs
          -- 'go' holds back one element: only when we know
          -- whether another element follows, can we decide
          -- if the pending one is the last segment.
          go x = do
            mbY <- C.await
            case mbY of
              Nothing -> liftIO (writeQ w t hs' x) 
              Just y  -> liftIO (writeQ w t hs  x) >> go y