{-# LANGUAGE Trustworthy #-} -- | @pipes@ utilities for interfacing with the @courier@ message-passing framework. module Pipes.Courier ( -- * Endpoints -- $endpoints send , send' , broadcast , post , receive , receiveTimeout , receiveTimeout' , select , selectTimeout , selectTimeout' -- * Re-exports -- $reexports , module Network.Endpoints ) where import Data.Function import Network.Endpoints import Pipes {- $endpoints Pipes which operate on 'Endpoint's. -} {- | @send ep name@ sends the message supplied upstream through the given 'Endpoint' @ep@ to the 'Name' @name@. Analogous to 'sendMessage_'. -} send :: MonadIO m => Endpoint -> Name -> Consumer' Message m r send ep name = for cat $ \m -> liftIO (sendMessage_ ep name m) {-# INLINABLE send #-} {-# RULES "p >-> send" forall p ep name . p >-> send ep name = for p (\m -> liftIO (sendMessage_ ep name m)) #-} {- | @send' ep name@ is like @send@ except the pipe closes if an error occurs while finding a suitable 'Transport'. -} send' :: MonadIO m => Endpoint -> Name -> Consumer' Message m () send' ep name = for cat $ \m -> fix $ \f -> liftIO (sendMessage ep name m) >>= \x -> case x of { Left _ -> return (); _ -> f } {-# INLINABLE send' #-} {-# RULES "p >-> send'" forall p ep name . p >-> send' ep name = for p $ \m -> fix $ \f -> liftIO (sendMessage ep name m) >>= \x -> case x of { Left _ -> return (); _ -> f } #-} {- | @broadcast ep names@ sends the message supplied upstream through the given 'Endpoint' @ep@ to the 'Name's specified in the list of @names@. Analogous to 'broadcastMessage_'. -} broadcast :: MonadIO m => Endpoint -> [Name] -> Consumer' Message m r broadcast ep names = for cat $ \m -> liftIO (broadcastMessage_ ep names m) {-# INLINABLE broadcast #-} {-# RULES "p >-> broadcast" forall p ep names . p >-> broadcast ep names = for p (\m -> liftIO (broadcastMessage_ ep names m)) #-} {- | @post ep@ posts the message supplied by upstream to the given 'Endpoint' @ep@ without using a transport. Analogous to 'postMessage'. -} post :: MonadIO m => Endpoint -> Consumer' Message m r post ep = for cat $ \m -> liftIO (postMessage ep m) {-# INLINABLE post #-} {-# RULES "p >-> post" forall p ep . p >-> post ep = for p (\m -> liftIO (postMessage ep m)) #-} {- | @receive ep@ forwards the next message available from the given 'Endpoint' @ep@ downstream. Analogous to 'receiveMessage'. -} receive :: MonadIO m => Endpoint -> Producer' Message m r receive ep = liftIO (receiveMessage ep) >~ cat {-# INLINABLE receive #-} {-# RULES "receive ep >-> p" forall ep p . receive ep >-> p = liftIO (receiveMessage ep) >~ p #-} {- | @receiveTimeout ep tout@ is like 'receive' except operates using a timeout. If a message becomes available before the given timeout then downstream will receive a 'Just', otherwise it will receive 'Nothing' (but the pipe will remain open). Analogous to 'receiveMessageTimeout'. -} receiveTimeout :: MonadIO m => Endpoint -> Int -> Producer' (Maybe Message) m r receiveTimeout ep tout = liftIO (receiveMessageTimeout ep tout) >~ cat {-# INLINABLE receiveTimeout #-} {-# RULES "receiveTimeout ep tout >-> p" forall ep tout p . receiveTimeout ep tout >-> p = liftIO (receiveMessageTimeout ep tout) >~ p #-} {- | @receiveTimeout' ep tout@ is like 'receiveTimeout' except the pipe closes if no messages are received within the timeout period. -} receiveTimeout' :: MonadIO m => Endpoint -> Int -> Producer' Message m () receiveTimeout' ep tout = go where go = do msg <- liftIO $ receiveMessageTimeout ep tout case msg of Just m -> yield m >> go Nothing -> return () {-# INLINABLE receiveTimeout' #-} {- | @select ep f@ takes all of the messages available from the given 'Endpoint' @ep@ and applies them to the function @f@. Messages will only be sent downstream if @f@ returns 'Just'. Analogous to 'selectMessage'. -} select :: MonadIO m => Endpoint -> (Message -> Maybe v) -> Producer' v m r select ep f = liftIO (selectMessage ep f) >~ cat {-# INLINABLE select #-} {-# RULES "select ep f >-> p" forall ep f p . select ep f >-> p = liftIO (selectMessage ep f) >~ p #-} {- | @selectTimeout ep tout f@ is like 'select' except operates using a timeout. If a message is selected before the given timeout then downstream will receive a 'Just', otherwise it will receive 'Nothing' (but the pipe will remain open). Analogous to 'selectMessageTimeout'. -} selectTimeout :: MonadIO m => Endpoint -> Int -> (Message -> Maybe v) -> Producer' (Maybe v) m r selectTimeout ep tout f = liftIO (selectMessageTimeout ep tout f) >~ cat {-# INLINABLE selectTimeout #-} {-# RULES "selectTimeout ep tout f >-> p" forall ep tout f p . selectTimeout ep tout f >-> p = liftIO (selectMessageTimeout ep tout f) >~ p #-} {- | @selectTimeout' ep tout f@ is like 'selectTimeout' except the pipe closes if no messages are received within the timeout period. -} selectTimeout' :: MonadIO m => Endpoint -> Int -> (Message -> Maybe v) -> Producer' v m () selectTimeout' ep tout f = go where go = do msg <- liftIO $ selectMessageTimeout ep tout f case msg of Just m -> yield m >> go Nothing -> return () {-# INLINABLE selectTimeout' #-} -- $reexports