-- Hoogle documentation, generated by Haddock -- See Hoogle, http://www.haskell.org/hoogle/ -- | Common patterns in message-oriented applications -- -- In distributed, message-oriented applications, similar communication -- patterns are used over and over again, such as Server/Client (a.k.a. -- Request/Response), Publish/Subscribe, Pipeline (a.k.a. Push/Pull) and -- Exclusive Pair (a.k.a. Peer-to-Peer). The Patterns package implements -- those patterns based on zeromq. More information on zeromq can be -- found at http://www.zeromq.org. More examples and a test suite -- are available on http://github.com/toschoo/mom. -- -- Release History: -- -- @package patterns @version 0.1.0 -- | Fundamental streaming types module Network.Mom.Patterns.Types -- | The IO Resource transformer. See the conduit package for details type RIO = ResourceT IO -- | A stream source type Source = Source RIO ByteString -- | A stream sink without return type type Sink = Sink ByteString RIO () -- | A stream sink with return type type SinkR r = Sink ByteString RIO r -- | A conduit that links source and sink applying some transformation to -- the stream. Input is always ByteString, output and return type -- may vary. type Conduit o r = ConduitM ByteString o RIO r -- | Simplified Conduit where output is always ByteString and no -- final value is returned. type Conduit_ = Conduit ByteString () -- | Streaming the elements of a list streamList :: [ByteString] -> Producer RIO ByteString -- | Pass the stream through without applying any transformation to it passThrough :: Conduit ByteString () -- | A zeromq AccessPoint can be bound or connected to its -- address. Only one peer can bind the address, all other parties have to -- connect. data LinkType -- | Bind the address Bind :: LinkType -- | Connect to the address Connect :: LinkType -- | Error handler for all services that are implemented as background -- services, e.g. servers and brokers. 
The handler receives the -- Criticality of the error event, the exception and an additional -- descriptive string. -- -- A good policy is to terminate or restart the service when a -- Fatal or Critical error occurs and to continue, if -- possible, on a plain Error. The error handler may perform -- additional, user-defined actions, such as logging the incident or -- sending an SMS. type OnError_ = Criticality -> SomeException -> String -> IO () -- | Indicates criticality of the error event data Criticality -- | The current operation (e.g. processing a request) has not -- terminated properly, but the service is able to continue; the error -- may have been caused by a faulty request or other temporary conditions. Error :: Criticality -- | The event has impact on the process, leaving it in an unknown state. Critical :: Criticality -- | The service cannot recover and will terminate Fatal :: Criticality -- | Stream Exception data StreamException -- | low-level error SocketExc :: String -> StreamException -- | IO error IOExc :: String -> StreamException -- | Protocol error ProtocolExc :: String -> StreamException -- | Application-defined error AppExc :: String -> StreamException -- | Internal error, indicating a code error in library Ouch :: String -> StreamException -- | String identifying a stream in the streams device type Identifier = String -- | String identifying a service provided, e.g. by a server type Service = String -- | Identity of a communication peer, needed for complex patterns -- (e.g. broker) type Identity = ByteString -- | Message body, needed for complex patterns (e.g. 
broker) type Body = [ByteString] -- | Milliseconds type Msec = Int -- | Reexport from zeromq (timeout in microseconds) type Timeout = Timeout -- | Reexport from zeromq type Context = Context -- | Reexport from zeromq type Size = Size -- | Reexport from zeromq withContext :: Size -> (Context -> IO a) -> IO a instance Typeable StreamException instance Show LinkType instance Read LinkType instance Eq Criticality instance Ord Criticality instance Show Criticality instance Read Criticality instance Show StreamException instance Read StreamException instance Eq StreamException instance Exception StreamException -- | Stream processing services module Network.Mom.Patterns.Streams -- | Starts polling on a set of streams. The actual polling will be run in -- another thread. The current thread continues with the action passed -- in. When this action terminates, the streamer stops polling. -- -- Parameters: -- -- withStreams :: Context -> Service -> Timeout -> [PollEntry] -> StreamAction -> OnError_ -> StreamSink -> Control a -> IO a -- | Receiver Sink: Internally a zeromq socket is waiting for input; when -- input is available, it is sent to the sink. -- -- runReceiver :: Socket a -> Timeout -> SinkR (Maybe o) -> IO (Maybe o) -- | Sender Source: The Source generates a stream, which is relayed -- to the Socket. 
runSender :: Socket a -> Source -> IO () -- | A poll entry describes how to access and identify a socket data PollEntry Poll :: Identifier -> String -> AccessType -> LinkType -> [Service] -> [SocketOption] -> PollEntry -- | How to address this particular stream pollId :: PollEntry -> Identifier -- | The address to link to pollAdd :: PollEntry -> String -- | The zeromq socket type pollType :: PollEntry -> AccessType -- | How to link (bind or connect) pollLink :: PollEntry -> LinkType -- | List of Service (or topics) for subscribers pollSub :: PollEntry -> [Service] -- | zeromq socket options pollOs :: PollEntry -> [SocketOption] -- | Defines the type of a PollEntry; the names of the constructors -- are similar to the corresponding ZMQ socket types. data AccessType -- | Represents a server and expects connections from clients; corresponds -- to ZMQ Socket Type Rep ServerT :: AccessType -- | Represents a client and connects to a server; corresponds to ZMQ -- Socket Type Req ClientT :: AccessType -- | Represents a load balancer, expecting connections from clients; -- corresponds to ZMQ Socket Type XRep RouterT :: AccessType -- | Represents a router expecting connections from servers; corresponds to -- ZMQ Socket Type XReq DealerT :: AccessType -- | Represents a publisher; corresponds to ZMQ Socket Type Pub PubT :: AccessType -- | Represents a subscriber; corresponds to ZMQ Socket Type Sub SubT :: AccessType -- | Represents a Pipe; corresponds to ZMQ Socket Type Push PipeT :: AccessType -- | Represents a Puller; corresponds to ZMQ Socket Type Pull PullT :: AccessType -- | Represents a Peer; corresponds to ZMQ Socket Type Pair PeerT :: AccessType -- | Safely read AccessType; ignores the case of the input string -- (e.g. "servert" -> ServerT) parseAccess :: String -> Maybe AccessType -- | A zeromq AccessPoint can be bound or connected to its -- address. Only one peer can bind the address, all other parties have to -- connect. 
data LinkType -- | Bind the address Bind :: LinkType -- | Connect to the address Connect :: LinkType -- | Binds or connects a socket to an address link :: LinkType -> Socket a -> String -> [SocketOption] -> IO () -- | Safely read LinkType; ignores the case of the input string and, -- besides "bind" and "connect", also accepts "bin", "con" and "conn"; -- intended for use with command line parameters parseLink :: String -> Maybe LinkType -- | Holds information on streams and the current state of the streamer, -- i.e. the current source. Streamers are passed to processing -- conduits. data Streamer -- | Conduit with Streamer type StreamConduit = Streamer -> Conduit ByteString () -- | Sink with Streamer type StreamSink = Streamer -> Sink -- | IO Action with Streamer (e.g. Timeout action) type StreamAction = Streamer -> IO () -- | Filter subset of streams; usually you want to filter a subset of -- streams to which to relay an incoming stream. Note that the result is -- just a list of stream identifiers, which of course could be used -- directly in the first place. A meaningful use of filterStreams would -- be, for instance: -- --
--   let targets = filterStreams s (/= getSource s)
--   
-- -- Where all streams but the source are selected. filterStreams :: Streamer -> (Identifier -> Bool) -> [Identifier] -- | Get current source getSource :: Streamer -> Identifier -- | Send the ByteString segments to the outgoing streams -- identified by [Identifier]. The stream is terminated. stream :: Streamer -> [Identifier] -> [ByteString] -> Sink -- | Send the ByteString segments to the outgoing streams -- identified by [Identifier] without terminating the stream, -- i.e. more segments must be sent. part :: Streamer -> [Identifier] -> [ByteString] -> Sink -- | Pass all segments of an incoming stream to a list of outgoing streams. -- The stream is terminated. passAll :: Streamer -> [Identifier] -> Sink -- | Pass one segment and ignore the remainder of the stream. The stream is -- terminated. pass1 :: Streamer -> [Identifier] -> Sink -- | Pass n segments and ignore the remainder of the stream. The stream is -- terminated. passN :: Streamer -> [Identifier] -> Int -> Sink -- | Pass while condition is true and ignore the remainder of the stream. -- The stream is terminated. passWhile :: Streamer -> [Identifier] -> (ByteString -> Bool) -> Sink -- | Ignore an incoming stream ignoreStream :: Sink -- | Controller data Controller -- | Control Action type Control a = Controller -> IO a -- | The internal stream that represents the Controller. StreamSinks -- can write to this stream, e.g.: -- --
--   passAll s [internal]
--   
-- -- And the streamer may also receive from this stream, e.g.: -- --
--   if getSource s == internal
--   
internal :: Identifier -- | Stop streams stop :: Controller -> IO () -- | Pause streams pause :: Controller -> IO () -- | Resume streams resume :: Controller -> IO () -- | Send a stream through the controller send :: Controller -> [Identifier] -> Source -> IO () -- | Receive a stream through the controller that was sink'd to the target -- internal. receive :: Controller -> Timeout -> SinkR (Maybe a) -> IO (Maybe a) instance Eq AccessType instance Show AccessType instance Read AccessType instance Show PollEntry instance Read PollEntry instance Eq PollEntry -- | Client side of Client/Server module Network.Mom.Patterns.Basic.Client -- | Client data type data Client clService :: Client -> Service -- | Create a client with name Service, linking to address -- String, connecting or binding the address according to -- LinkType and finally entering the action, in whose scope the -- client lives. withClient :: Context -> Service -> String -> LinkType -> (Client -> IO a) -> IO a -- | Request a service: -- -- -- -- A 'hello world' Example: -- --
--   import qualified Data.Conduit          as C
--   import qualified Data.ByteString.Char8 as B
--   import           Network.Mom.Patterns.Basic.Client
--   import           Network.Mom.Patterns.Types
--   
-- --
--   main :: IO ()
--   main = withContext 1 $ \ctx -> 
--            withClient ctx "test" 
--                "tcp://localhost:5555" Connect $ \c -> do
--              mbX <- request c (-1) src snk
--              case mbX of
--                Nothing -> putStrLn "No Result"
--                Just x  -> putStrLn $ "Result: " ++ x
--     where src = C.yield (B.pack "hello world")
--           snk = do mbX <- C.await 
--                    case mbX of
--                      Nothing -> return Nothing
--                      Just x  -> return $ Just $ B.unpack x
--   
request :: Client -> Timeout -> Source -> SinkR (Maybe a) -> IO (Maybe a) -- | Server side of 'Client/Server' module Network.Mom.Patterns.Basic.Server -- | Start a server as a background process -- -- -- -- A very simple example, which just sends the incoming stream back to -- the client (bounce): -- --
--   import           Control.Monad (forever)
--   import           Control.Concurrent
--   import           Network.Mom.Patterns.Basic.Server
--   import           Network.Mom.Patterns.Types
--   
-- --
--   main :: IO ()
--   main = withContext 1 $ \ctx -> 
--              withServer ctx "Bouncer" "tcp://*:5555" Bind
--                         (\_ _ _ -> return ()) -- ignore error
--                         bounce $ \_ -> forever $ threadDelay 100000
--     where bounce = passThrough
--   
withServer :: Context -> Service -> String -> LinkType -> OnError_ -> Conduit_ -> Control a -> IO a -- | A simple load balancer device to link clients and servers. -- -- withQueue :: Context -> Service -> (String, LinkType) -> (String, LinkType) -> OnError_ -> Control a -> IO a -- | Publisher side of 'Publish/Subscribe' module Network.Mom.Patterns.Basic.Publisher -- | Publisher data type data Pub -- | Create and link a publisher: -- -- withPub :: Context -> String -> LinkType -> (Pub -> IO a) -> IO a -- | Publish data: -- -- issue :: Pub -> [Service] -> Source -> IO () -- | A simple forwarder, i.e. a device that connects to a publisher -- and provides an additional endpoint for more subscribers to connect -- to. A forwarder, hence, is a means to extend the capacity of a -- publisher. -- -- withForwarder :: Context -> Service -> [Service] -> (String, LinkType) -> (String, LinkType) -> OnError_ -> Control a -> IO a -- | Subscriber side of 'Publish/Subscribe' module Network.Mom.Patterns.Basic.Subscriber -- | Subscription data type data Sub -- | Create a subscription and start the action, in which it lives -- -- withSub :: Context -> String -> LinkType -> (Sub -> IO a) -> IO a -- | Subscribe to a list of topics; note that a subscriber has to subscribe -- to at least one topic to receive any data. -- -- subscribe :: Sub -> [Service] -> IO () -- | Check for new data: -- -- checkSub :: Sub -> Timeout -> SinkR (Maybe a) -> IO (Maybe a) -- | Pusher side of 'Pipeline' module Network.Mom.Patterns.Basic.Pusher -- | The pusher data type data Pusher -- | The function in whose scope the pusher lives: -- -- withPusher :: Context -> String -> LinkType -> (Pusher -> IO a) -> IO a -- | Push a job down the pipeline; the Source creates the outgoing -- stream. 
push :: Pusher -> Source -> IO () -- | Puller side of 'Pipeline' module Network.Mom.Patterns.Basic.Puller -- | Start a puller as a background service: -- -- withPuller :: Context -> Service -> String -> LinkType -> OnError_ -> Sink -> (Controller -> IO a) -> IO a -- | A pipeline extends the capacity of the pusher-puller chain; a pipeline -- connects to a pusher and provides an access point to a set of pullers. -- -- withPipe :: Context -> Service -> (String, LinkType) -> (String, LinkType) -> OnError_ -> (Controller -> IO a) -> IO a -- | Basic communication patterns module Network.Mom.Patterns.Basic -- | Majordomo common definitions module Network.Mom.Patterns.Broker.Common -- | Majordomo protocol client/worker version 1 mdpC01 :: ByteString -- | Majordomo protocol client/worker version 1 mdpW01 :: ByteString -- | Message types (ready, request, reply, heartbeat, disconnect) xReady :: ByteString -- | Message types (ready, request, reply, heartbeat, disconnect) xDisc :: ByteString -- | Message types (ready, request, reply, heartbeat, disconnect) xHeartBeat :: ByteString -- | Message types (ready, request, reply, heartbeat, disconnect) xReply :: ByteString -- | Message types (ready, request, reply, heartbeat, disconnect) xRequest :: ByteString -- | Service name type ServiceName = String -- | Majordomo Management Interface (MMI) - "mmi.service" mmiHdr :: ByteString -- | Majordomo Management Interface (MMI) - "mmi.service" mmiSrv :: ByteString -- | Majordomo Management Interface -- responses: Found ("200"), NotFound -- ("404"), NotImplemented ("501") mmiFound :: ByteString -- | Majordomo Management Interface -- responses: Found ("200"), NotFound -- ("404"), NotImplemented ("501") mmiNimpl :: ByteString -- | Majordomo Management Interface -- responses: Found ("200"), NotFound -- ("404"), NotImplemented ("501") mmiNotFound :: ByteString -- | Client -> Broker: send request mdpCSndReq :: ServiceName -> Conduit ByteString () -- | Client -> Broker: receive request mdpCRcvReq 
:: Conduit o (Identity, ByteString) -- | Broker -> Client: send reply mdpCSndRep :: ByteString -> [Identity] -> Conduit ByteString () -- | Broker -> Client: receive reply mdpCRcvRep :: ServiceName -> Conduit ByteString () -- | Broker -> Server: send request mdpWSndReq :: Identity -> [Identity] -> Conduit ByteString () -- | Broker -> Server: receive request mdpWRcvReq :: Conduit o WFrame -- | Server -> Broker: send reply mdpWSndRep :: [Identity] -> Conduit ByteString () -- | Server -> Broker: receive reply mdpWRcvRep :: Conduit o WFrame -- | Broker <-> Server: send heartbeat mdpWBeat :: Conduit ByteString () -- | Server -> Broker: send connect request (ready) mdpWConnect :: ServiceName -> Source -- | Server -> Broker: disconnect mdpWDisconnect :: Source -- | Broker -> Server: disconnect mdpWBrkDisc :: Identity -> Source -- | Broker / Server protocol: Heartbeat, Ready, Reply, Request, Disconnect data WFrame WBeat :: Identity -> WFrame WReady :: Identity -> ByteString -> WFrame WReply :: Identity -> [Identity] -> WFrame WRequest :: [Identity] -> WFrame WDisc :: Identity -> WFrame -- | Worker Frame Type data FrameType ReadyT :: FrameType RequestT :: FrameType ReplyT :: FrameType HeartBeatT :: FrameType DisconnectT :: FrameType -- | Get frame type frameType :: Conduit o FrameType -- | Get empty segment empty :: Conduit o () -- | Check segment contents chunk :: ByteString -> String -> Conduit o () -- | Get segment contents getChunk :: Conduit o ByteString -- | Get identity identity :: Conduit o Identity -- | Get block of identities (envelope) envelope :: Conduit o [Identity] -- | Create envelope [(identity, B.empty)] toIs :: [Identity] -> [ByteString] -- | MDP Exception data MDPException -- | Server-side exception ServerExc :: String -> MDPException -- | Client-side exception ClientExc :: String -> MDPException -- | Broker exception BrokerExc :: String -> MDPException -- | Generic Protocol MDPExc :: String -> MDPException -- | MMI Protocol MMIExc :: String -> 
MDPException -- | SingleBroker error (another broker is already running in the same -- process) SingleBrokerExc :: String -> MDPException instance Typeable MDPException instance Eq WFrame instance Show WFrame instance Eq FrameType instance Show FrameType instance Read FrameType instance Show MDPException instance Read MDPException instance Eq MDPException instance Exception MDPException -- | Majordomo Client module Network.Mom.Patterns.Broker.Client -- | Client data type data Client -- | Create a client and start the action, in whose scope the client lives. withClient :: Context -> Service -> String -> LinkType -> (Client -> IO a) -> IO a -- | Service discovery: The function checks whether the client's service is -- provided by the broker. -- -- Return values: -- -- checkService :: Client -> Timeout -> IO (Maybe Bool) -- | Request a service: -- -- request :: Client -> Timeout -> Source -> SinkR (Maybe a) -> IO (Maybe a) -- | Majordomo Server module Network.Mom.Patterns.Broker.Server -- | Start a server as a background process -- -- withServer :: Context -> Service -> Msec -> String -> OnError_ -> Conduit_ -> (Control a) -> IO a -- | Majordomo Broker module Network.Mom.Patterns.Broker.Broker -- | Start a broker as a background process -- -- withBroker :: Context -> Service -> Msec -> String -> String -> OnError_ -> (Controller -> IO r) -> IO r -- | Majordomo Service Broker module Network.Mom.Patterns.Broker -- | In distributed message-oriented applications, the same communication -- patterns show up over and over again. This package implements some of -- these patterns based on the zeromq library. Patterns -- uses the zeromq-haskell package, but goes beyond it in several -- aspects: -- -- -- -- More information on zeromq can be found at -- http://www.zeromq.org. module Network.Mom.Patterns