{-# LANGUAGE Trustworthy #-}

-- | @pipes@ utilities for interfacing with the @courier@ message-passing framework.
module Pipes.Courier
    ( -- * Endpoints
      -- $endpoints
      send
    , send'
    , broadcast
    , post
    , receive
    , receiveTimeout
    , receiveTimeout'
    , select
    , selectTimeout
    , selectTimeout'

      -- * Re-exports
      -- $reexports
    , module Network.Endpoints
    ) where

import Data.Function
import Network.Endpoints
import Pipes


{- $endpoints
    
    Pipes which operate on 'Endpoint's.
-}

{- | @send ep name@ takes each 'Message' arriving from upstream and passes it to 'sendMessage_',
     addressed to the 'Name' @name@ through the 'Endpoint' @ep@.

     The consumer keeps awaiting messages and never returns on its own.

     Analogous to 'sendMessage_'.
-}
send :: MonadIO m => Endpoint -> Name -> Consumer' Message m r
send ep name = loop
  where
    -- Await one message, deliver it, then wait for the next one.
    loop = await >>= deliver >> loop
    deliver msg = liftIO (sendMessage_ ep name msg)
{-# INLINABLE send #-}

-- Fuses an upstream pipe with 'send' into a single 'for' loop over that pipe.
{-# RULES
    "p >-> send" forall p ep name .
        p >-> send ep name = for p (liftIO . sendMessage_ ep name)
  #-}

{- | @send' ep name@ is like 'send' except the pipe closes if an error occurs while finding a
     suitable 'Transport'.

     Each message awaited from upstream is passed to 'sendMessage' exactly once. A 'Left' result
     terminates the consumer with @()@; any other result makes it await the next message.
-}
send' :: MonadIO m => Endpoint -> Name -> Consumer' Message m ()
send' ep name = fix $ \loop -> do
    m <- await
    result <- liftIO (sendMessage ep name m)
    case result of
        -- Sending failed: stop consuming so the whole pipeline shuts down.
        Left _ -> return ()
        -- Sent successfully: move on to the next upstream message.
        _      -> loop
{-# INLINABLE send' #-}

-- NOTE: there is deliberately no @p >-> send'@ rewrite rule. 'send'' must be able to terminate
-- the pipeline before @p@ is exhausted, which a @for p ...@ body cannot express (returning from
-- the body merely resumes @p@), so such a rule would change the semantics.

{- | @broadcast ep names@ takes each 'Message' arriving from upstream and passes it to
     'broadcastMessage_', addressed to every 'Name' in @names@ through the 'Endpoint' @ep@.

     The consumer keeps awaiting messages and never returns on its own.

     Analogous to 'broadcastMessage_'.
-}
broadcast :: MonadIO m => Endpoint -> [Name] -> Consumer' Message m r
broadcast ep names = fanOut
  where
    fanOut = do
        msg <- await
        liftIO (broadcastMessage_ ep names msg)
        fanOut
{-# INLINABLE broadcast #-}

-- Fuses an upstream pipe with 'broadcast' into a single 'for' loop over that pipe.
{-# RULES
    "p >-> broadcast" forall p ep names .
        p >-> broadcast ep names = for p (liftIO . broadcastMessage_ ep names)
  #-}

{- | @post ep@ takes each 'Message' arriving from upstream and posts it to the 'Endpoint' @ep@
     with 'postMessage', i.e. without going through a transport.

     The consumer keeps awaiting messages and never returns on its own.

     Analogous to 'postMessage'.
-}
post :: MonadIO m => Endpoint -> Consumer' Message m r
post ep = go
  where
    -- Await, post, repeat.
    go = await >>= liftIO . postMessage ep >> go
{-# INLINABLE post #-}

-- Fuses an upstream pipe with 'post' into a single 'for' loop over that pipe.
{-# RULES
    "p >-> post" forall p ep .
        p >-> post ep = for p (liftIO . postMessage ep)
  #-}

{- | @receive ep@ repeatedly runs 'receiveMessage' on the 'Endpoint' @ep@ and yields every
     'Message' it obtains downstream.

     The producer never returns on its own.

     Analogous to 'receiveMessage'.
-}
receive :: MonadIO m => Endpoint -> Producer' Message m r
receive ep = go
  where
    -- Fetch the next message, hand it downstream, repeat.
    go = liftIO (receiveMessage ep) >>= yield >> go
{-# INLINABLE receive #-}

-- Lets a downstream pipe pull straight from 'receiveMessage' instead of going through 'cat'.
{-# RULES
    "receive ep >-> p" forall ep p .
        receive ep >-> p = liftIO (receiveMessage ep) >~ p
  #-}

{- | @receiveTimeout ep tout@ is the timeout-aware counterpart of 'receive'. Each round it runs
     'receiveMessageTimeout' with the timeout @tout@ and yields the result downstream: a 'Just'
     when a message became available in time, 'Nothing' otherwise. A 'Nothing' does not close the
     pipe; the producer simply tries again.

     Analogous to 'receiveMessageTimeout'.
-}
receiveTimeout :: MonadIO m => Endpoint -> Int -> Producer' (Maybe Message) m r
receiveTimeout ep tout = poll
  where
    poll = do
        outcome <- liftIO (receiveMessageTimeout ep tout)
        yield outcome
        poll
{-# INLINABLE receiveTimeout #-}

-- Lets a downstream pipe pull straight from 'receiveMessageTimeout' instead of going through 'cat'.
{-# RULES
    "receiveTimeout ep tout >-> p" forall ep tout p .
        receiveTimeout ep tout >-> p = liftIO (receiveMessageTimeout ep tout) >~ p
  #-}

{- | @receiveTimeout' ep tout@ behaves like 'receiveTimeout', but yields bare 'Message's and
     terminates with @()@ the first time 'receiveMessageTimeout' returns 'Nothing', i.e. when no
     message is received within the timeout period.
-}
receiveTimeout' :: MonadIO m => Endpoint -> Int -> Producer' Message m ()
receiveTimeout' ep tout = loop
  where
    -- 'Nothing' ends the producer; 'Just' forwards the message and polls again.
    loop = liftIO (receiveMessageTimeout ep tout) >>= maybe (return ()) forward
    forward msg = yield msg >> loop
{-# INLINABLE receiveTimeout' #-}

{- | @select ep f@ repeatedly runs 'selectMessage' on the 'Endpoint' @ep@ with the selector @f@
     and yields every value it returns downstream. Only messages for which @f@ returns 'Just' are
     sent downstream.

     The producer never returns on its own.

     Analogous to 'selectMessage'.
-}
select :: MonadIO m => Endpoint -> (Message -> Maybe v) -> Producer' v m r
select ep f = go
  where
    go = do
        picked <- liftIO (selectMessage ep f)
        yield picked
        go
{-# INLINABLE select #-}

-- Lets a downstream pipe pull straight from 'selectMessage' instead of going through 'cat'.
{-# RULES
    "select ep f >-> p" forall ep f p .
        select ep f >-> p = liftIO (selectMessage ep f) >~ p
  #-}

{- | @selectTimeout ep tout f@ is the timeout-aware counterpart of 'select'. Each round it runs
     'selectMessageTimeout' with the timeout @tout@ and the selector @f@, and yields the result
     downstream: a 'Just' when a message was selected in time, 'Nothing' otherwise. A 'Nothing'
     does not close the pipe; the producer simply tries again.

     Analogous to 'selectMessageTimeout'.
-}
selectTimeout :: MonadIO m => Endpoint -> Int -> (Message -> Maybe v) -> Producer' (Maybe v) m r
selectTimeout ep tout f = poll
  where
    -- Run one timed selection, pass its outcome downstream, repeat.
    poll = liftIO (selectMessageTimeout ep tout f) >>= yield >> poll
{-# INLINABLE selectTimeout #-}

-- Lets a downstream pipe pull straight from 'selectMessageTimeout' instead of going through 'cat'.
{-# RULES
    "selectTimeout ep tout f >-> p" forall ep tout f p .
        selectTimeout ep tout f >-> p = liftIO (selectMessageTimeout ep tout f) >~ p
  #-}

{- | @selectTimeout' ep tout f@ behaves like 'selectTimeout', but yields the selected values
     unwrapped and terminates with @()@ the first time 'selectMessageTimeout' returns 'Nothing',
     i.e. when no message is selected within the timeout period.
-}
selectTimeout' :: MonadIO m => Endpoint -> Int -> (Message -> Maybe v) -> Producer' v m ()
selectTimeout' ep tout f = next
  where
    -- 'Nothing' ends the producer; 'Just' forwards the value and selects again.
    next = do
        outcome <- liftIO (selectMessageTimeout ep tout f)
        maybe (return ()) (\v -> yield v >> next) outcome
{-# INLINABLE selectTimeout' #-}


-- $reexports