-------------------------------------------------------------------------------
-- |
-- Module     : Network/Mom/Patterns/Basic/Subscriber.hs
-- Copyright  : (c) Tobias Schoofs
-- License    : LGPL 
-- Stability  : experimental
-- Portability: non-portable
-- 
-- Subscriber side of \'Publish Subscribe\'
-------------------------------------------------------------------------------
module Network.Mom.Patterns.Basic.Subscriber (
         Sub, withSub, subscribe, checkSub)
         
where

  import qualified Data.Conduit          as C
  import qualified System.ZMQ            as Z
  import           Network.Mom.Patterns.Types
  import           Network.Mom.Patterns.Streams

  ------------------------------------------------------------------------
  -- | Handle representing a subscription.
  --   It wraps the zeromq socket of type 'Z.Sub',
  --   which is reachable through the field 'subSock'.
  ------------------------------------------------------------------------
  newtype Sub = Sub
    { subSock :: Z.Socket Z.Sub
    }

  ------------------------------------------------------------------------
  -- | Open a subscription and run an action within its scope.
  --   A socket of type 'Z.Sub' is obtained through 'Z.withSocket',
  --   linked to the address and then handed, wrapped in 'Sub',
  --   to the action; the result of the action is the result
  --   of 'withSub'.
  --
  --   * 'Context'       - The zeromq context
  --
  --   * 'String'        - The address
  --
  --   * 'LinkType'      - The link type, usually Connect
  --
  --   * ('Sub' -> IO a) - The action, in which the subscription lives
  ------------------------------------------------------------------------
  withSub :: Context       ->
             String        ->
             LinkType      ->
             (Sub -> IO a) -> IO a
  withSub ctx add lt act = Z.withSocket ctx Z.Sub run
    where -- link the fresh socket first, then pass control to the action
          run sock = link lt sock add [] >> act (Sub sock)

  ------------------------------------------------------------------------
  -- | Register the subscriber for every topic in the list,
  --   one 'Z.subscribe' call per topic.
  --   Note that a subscriber has to subscribe to at least one topic
  --   to receive any data.
  --
  --   * 'Sub'       - The subscriber
  --
  --   * ['Service'] - The topics to subscribe to
  ------------------------------------------------------------------------
  subscribe :: Sub -> [Service] -> IO ()
  subscribe s topics = mapM_ (Z.subscribe sock) topics
    where sock = subSock s

  ------------------------------------------------------------------------
  -- | Poll the subscription for new data:
  --
  --   * 'Sub'     - The subscriber
  --
  --   * 'Timeout' - When the timeout expires,
  --                 the function returns 'Nothing'.
  --                 Timeout may be
  --                 -1  - listen eternally,
  --                 0   - return immediately,
  --                 \> 0 - timeout in microseconds
  --
  --   * 'SinkR'   - Sink consuming the result stream.
  --                 The first message segment
  --                 (the subscription header, /i.e./
  --                  a comma-separated list of the topics,
  --                  to which the data belong)
  --                 is consumed and discarded
  --                 before the sink is run.
  --                 If the stream yields no segment at all,
  --                 the sink is not run and the result is 'Nothing'.
  ------------------------------------------------------------------------
  checkSub :: Sub -> Timeout -> SinkR (Maybe a) -> IO (Maybe a)
  checkSub s tmo snk = runReceiver (subSock s) tmo dropHeader
    where -- consume the header segment; only then hand over to the user sink
          dropHeader = C.await >>= maybe (return Nothing) (const snk)