{-# Language RankNTypes #-}
-------------------------------------------------------------------------------
-- |
-- Module     : Network/Mom/Stompl/Client/Conduit.hs
-- Copyright  : (c) Tobias Schoofs
-- License    : LGPL 
-- Stability  : experimental
-- Portability: portable
--
-- This module provides Conduit interfaces
-- for the stomp-queue library.
-- The interfaces create or receive
-- streams of messages instead of single messages.
-- This approach aims to simplify the integration
-- of messaging into applications by means of well-defined
-- streaming interfaces. 
-------------------------------------------------------------------------------
module Network.Mom.Stompl.Client.Conduit (
                                  -- * Plain message streams
                                  qSource, qSink, 
                                  -- * Multipart messages as streams
                                  qMultiSource, qMultiSink)
where

  import qualified Data.Conduit as C
  import           Codec.MIME.Type (Type)
  import           Control.Monad.Trans (liftIO)
  import           Control.Monad.Trans.Resource (MonadResource)
  import           System.Timeout

  import           Network.Mom.Stompl.Client.Queue
  import           Network.Mom.Stompl.Frame

  ------------------------------------------------------------------------
  -- | Reads from a 'Reader' queue with timeout 
  --   and returns a 'C.Producer' of type 'Message' i.
  --
  --   The function ends iff the timeout expires 
  --   and loops infinitely otherwise.
  --
  --   Parameters:
  --
  --   * 'Reader' i: The input interface, a Stomp 'Reader'. 
  --
  --   * Int: Timeout in microseconds
  ------------------------------------------------------------------------
  qSource :: MonadResource m => 
             Reader i -> Int -> C.Producer m (Message i)
  qSource r tmo = go
    where go = liftIO (timeout tmo $ readQ r) >>= mbYield
          mbYield mbX = case mbX of
                          Nothing -> return ()
                          Just x  -> C.yield x >> go

  ------------------------------------------------------------------------
  -- | Writes a stream of messages to a 'Writer' queue
  --   and returns a 'C.Consumer' of type o.
  --   The function terminates, when the stream ends.
  --
  --   Parameters:
  --
  --   * 'Writer' o: The output interface, a Stomp 'Writer'.
  --
  --   * 'Type': The mime type of the message content.
  --
  --   * ['Header']: Headers to add to each message.
  ------------------------------------------------------------------------
  qSink :: MonadResource m => 
           Writer o -> Type -> [Header] -> C.Consumer o m ()
  qSink w t hs = C.awaitForever $ \x -> liftIO (writeQ w t hs x)

  -- header to mark the last segment of a multipart message -------------
  lastMHdr :: Header
  lastMHdr = ("__last__", "true")

  ------------------------------------------------------------------------
  -- | Reads from a 'Reader' queue with timeout 
  --   and returns a 'C.Producer' of type 'Message' i.
  --   The function ends when the timeout expires 
  --   or  after having received a segment that is marked as the last one.
  --   Note that multipart messages are not foreseen by the standard.
  --   'qMultiSink' and 'qMultiSource' use a header named \"__last__\"
  --   to label the last segment of a multipart message.
  --
  --   For parameters, please refer to 'qSource'. 
  ------------------------------------------------------------------------
  qMultiSource :: MonadResource m =>
                  Reader i -> Int -> C.Producer m (Message i)
  qMultiSource r tmo = loop
    where loop = do
            mbX <- liftIO (timeout tmo $ readQ r)
            case mbX of
              Nothing -> return ()
              Just x  -> case lookup (fst lastMHdr) $ msgHdrs x of
                           Just "true" -> C.yield x
                           _           -> C.yield x >> loop
  

  ------------------------------------------------------------------------
  -- | Writes a multipart message to a 'Writer' queue
  --   and returns a 'C.Consumer' of type o.
  --   The function terminates, when the stream ends.
  --   The last segment is sent with the header (\"__last__\", \"true\").
  --   Note that multipart messages are not foreseen by the standard.
  --
  --   For parameters, please refer to 'qSink'. 
  ------------------------------------------------------------------------
  qMultiSink :: MonadResource m =>
                Writer o -> Type -> [Header] -> C.Consumer o m ()
  qMultiSink w t hs = do
    mbX <- C.await
    case mbX of
      Nothing -> return ()
      Just x  -> go x
    where hs'  = lastMHdr:hs
          go x = do
            mbY <- C.await
            case mbY of
              Nothing -> liftIO (writeQ w t hs' x) 
              Just y  -> liftIO (writeQ w t hs  x) >> go y