{-# LANGUAGE CPP #-} ------------------------------------------------------------------------------- -- | -- Module : Network/Mom/Patterns/Enumerator.hs -- Copyright : (c) Tobias Schoofs -- License : LGPL -- Stability : experimental -- Portability: portable -- -- Enumerators for basic patterns ------------------------------------------------------------------------------- module Network.Mom.Patterns.Enumerator ( -- * Enumerators -- $enums -- ** Raw Enumerators enumWith, enumFor, once, just, -- ** Fetchers Fetch, Fetch_, FetchHelper, FetchHelper', FetchHelper_, FetchHelper_', fetcher, fetcher_, fetch1, fetch1_, fetchFor, fetchFor_, fetchJust, fetchJust_, listFetcher, listFetcher_, #ifdef _TEST err, #endif -- * Iteratees -- $its -- ** Raw Iteratees one, mbOne, toList, toString, append, store, -- ** Dumps Dump, sink, sinkI, nosink) where import Types import qualified Data.Enumerator as E import Data.Enumerator (($$)) import qualified Data.Enumerator.List as EL import qualified Data.Monoid as M import Data.List (foldl', intercalate) import Control.Applicative ((<$>)) import Control.Monad import Control.Monad.Trans import Prelude hiding (catch) import Control.Exception (AssertionFailed(..), catch, throwIO) import qualified System.ZMQ as Z ------------------------------------------------------------------------ -- $enums -- Enumerators generate streams -- and pass chunks of the stream for further processing -- to Iteratees. -- The Enumerator-Iteratee abstraction is very powerful -- and is by far not discussed exhaustively here. -- For more details, please refer to the documentation -- of the Enumerator package. -- -- The Patterns package provides a small set -- of enumerators that may be useful for many messaging patterns. -- Enumerators are split into raw enumerators, -- which can be used with patterns under direct control -- of application code such as 'Client', 'Pub' and 'Peer', -- and Fetchers, which are used with services, /i.e./ -- 'withServer' and 'withPeriodicPub'. ------------------------------------------------------------------------ -- | Calls an application-defined /getter/ function -- until this returns 'Nothing'; -- if the getter throws an exception, -- the enumerator returns 'E.Error'. ------------------------------------------------------------------------ enumWith :: (i -> IO (Maybe o)) -> i -> E.Enumerator o IO () enumWith get i step = case step of E.Continue k -> chainIOe (get i) $ \mbO -> case mbO of Nothing -> E.continue k Just o -> enumWith get i $$ k (E.Chunks [o]) _ -> E.returnI step ------------------------------------------------------------------------ -- | Calls the application-defined /getter/ function /n/ times; -- The enumerator receives a pair ('Int', 'Int'), -- where the first integer is a counter and -- the second is the upper bound. -- /n/ is defined as /snd - fst/, /i.e./ -- the counter is incremented until it reaches the value -- of the bound. The counter must be a value less than the bound -- to avoid protocol errors, /i.e./ the /getter/ must be called -- at least once. -- The current value of the counter and additional input -- are passed to the /getter/. -- if the getter throws an exception, -- the enumerator returns 'E.Error'. ------------------------------------------------------------------------ enumFor :: (Int -> i -> IO o) -> (Int, Int) -> i -> E.Enumerator o IO () enumFor get runner i = go runner where go (c,e) step = case step of E.Continue k -> if c >= e then E.continue k else chainIOe (get c i) $ \o -> go (c+1,e) $$ k (E.Chunks [o]) _ -> E.returnI step ------------------------------------------------------------------------ -- | Calls the application-defined /getter/ function once; -- the enumerator must return a value -- (the result type is not 'Maybe'), -- otherwise, the sending iteratee has nothing to send -- which would most likely result in a protocol error. -- if the getter throws an exception, -- the enumerator returns 'E.Error'. ------------------------------------------------------------------------ once :: (i -> IO o) -> i -> E.Enumerator o IO () once = go True where go first get i step = case step of E.Continue k -> if first then chainIOe (get i) $ \o -> go False get i $$ k (E.Chunks [o]) else E.continue k _ -> E.returnI step ------------------------------------------------------------------------ -- | Passes just the input value to iteratee; -- -- > just "hello world" -- -- hence, reduces to just "hello world" sent over the wire. ------------------------------------------------------------------------ just :: o -> E.Enumerator o IO () just = go True where go first o step = case step of E.Continue k -> if first then go False o $$ k (E.Chunks [o]) else E.continue k _ -> E.returnI step ------------------------------------------------------------------------ -- | Calls the application-defined 'FetchHelper' -- until it returns 'Nothing'; -- note that the 'FetchHelper' shall return at least one 'Just' -- value to avoid a protocol error. -- If the 'FetchHelper' throws an exception, -- the 'fetcher' returns 'E.Error'. ------------------------------------------------------------------------ fetcher :: FetchHelper i o -> Fetch i o fetcher fetch ctx p i step = case step of (E.Continue k) -> chainIOe (fetch ctx p i) $ \mbo -> case mbo of Nothing -> E.continue k Just o -> fetcher fetch ctx p i $$ k (E.Chunks [o]) _ -> E.returnI step ------------------------------------------------------------------------ -- | A variant of 'fetcher' without input; ------------------------------------------------------------------------ fetcher_ :: FetchHelper_ o -> Fetch_ o fetcher_ = fetcher ------------------------------------------------------------------------ -- | Calls the application-defined 'FetchHelper'' once; -- If the 'FetchHelper'' throws an exception, -- the 'fetcher' returns 'E.Error'. ------------------------------------------------------------------------ fetch1 :: FetchHelper' i o -> Fetch i o fetch1 = go True where go first fetch ctx p i step = case step of (E.Continue k) -> if first then chainIOe (fetch ctx p i) $ \o -> go False fetch ctx p i $$ k (E.Chunks [o]) else E.continue k _ -> E.returnI step ------------------------------------------------------------------------ -- | A variant of 'fetch1' without input; ------------------------------------------------------------------------ fetch1_ :: FetchHelper_' o -> Fetch_ o fetch1_ = fetch1 ------------------------------------------------------------------------ -- | Calls the iteratee for each element of the input list ------------------------------------------------------------------------ listFetcher :: Fetch [o] o listFetcher ctx p os step = case step of (E.Continue k) -> if null os then E.continue k else listFetcher ctx p (tail os) $$ k (E.Chunks [head os]) _ -> E.returnI step ------------------------------------------------------------------------ -- | A variant of 'listFetcher' for services without input; -- the list, in this case, is passed as an additional argument -- to the fetcher. ------------------------------------------------------------------------ listFetcher_ :: [o] -> Fetch_ o listFetcher_ l ctx p _ step = case step of (E.Continue k) -> if null l then E.continue k else listFetcher_ (tail l) ctx p () $$ k (E.Chunks [head l]) _ -> E.returnI step ------------------------------------------------------------------------ -- | Calls the application-defined /getter/ /n/ times; -- The /getter/ is a variant of 'FetchHelper'' -- with the current value of the counter as additional argument. -- For more details, refer to 'enumFor'. ------------------------------------------------------------------------ fetchFor :: (Z.Context -> Parameter -> Int -> i -> IO o) -> (Int, Int) -> Fetch i o fetchFor fetch (c,e) ctx p i step = case step of (E.Continue k) -> if c >= e then E.continue k else chainIOe (fetch ctx p c i) $ \x -> fetchFor fetch (c+1, e) ctx p i $$ k (E.Chunks [x]) _ -> E.returnI step ------------------------------------------------------------------------ -- | A variant of 'fetchFor' without input ------------------------------------------------------------------------ fetchFor_ :: (Z.Context -> Parameter -> Int -> () -> IO o) -> (Int, Int) -> Fetch_ o fetchFor_ = fetchFor ------------------------------------------------------------------------ -- | Passes just the input value to the iteratee; -- -- > fetchJust "hello world" -- -- hence, reduces to just \"hello world\" sent over the wire. -- Note that the input /i/ is ignored. ------------------------------------------------------------------------ fetchJust :: o -> Fetch i o fetchJust o _ _ _ = go True where go first step = case step of (E.Continue k) -> if first then go False $$ k (E.Chunks [o]) -- yield? else E.continue k _ -> E.returnI step ------------------------------------------------------------------------ -- | A variant of 'fetchJust' without input ------------------------------------------------------------------------ fetchJust_ :: o -> Fetch_ o fetchJust_ = fetchJust ------------------------------------------------------------------------ -- For testing only ------------------------------------------------------------------------ #ifdef _TEST err :: Fetch_ o err _ _ _ s = do ei <- liftIO $ catch (throwIO (AssertionFailed "Test") >>= \_ -> return $ Right ()) (\e -> return $ Left e) case ei of Left e -> E.returnI (E.Error e) Right _ -> E.returnI s #endif ------------------------------------------------------------------------ -- $its -- Iteratees process chunks of streams -- passed in by an enumerator. -- The Enumerator-Iteratee abstraction is very powerful -- and is by far not discussed exhaustively here. -- For more details, please refer to the documentation -- of the Enumerator package. -- -- The Patterns package provides a small set -- of iteratees that may be useful for many messaging patterns. -- Iteratees are split into raw iteratees, -- which can be used with patterns under direct control -- of application code such as 'Client', 'Pub', 'Peer' -- and, for obtaining the request, 'withServer', -- and Dumps, which are used with services, /i.e./ -- 'withSub' and 'withPuller'. ------------------------------------------------------------------------ -- | Calls the application-defined IO action -- for each element of the stream; -- The IO action could, for instance, -- write to an already opened file, -- store values in an 'MVar' or -- send them through a 'Chan' to another thread -- for further processing. -- An exception thrown in the IO action -- is re-thrown by 'E.throwError'. ------------------------------------------------------------------------ store :: (i -> IO ()) -> E.Iteratee i IO () store save = do mbi <- EL.head case mbi of Nothing -> return () Just i -> tryIO (save i) >> store save ------------------------------------------------------------------------ -- | Returns one value of type /i/; -- if the enumerator creates a value, this value is returned; -- otherwise, the input value is returned. ------------------------------------------------------------------------ one :: i -> E.Iteratee i IO i one x = do mbi <- EL.head case mbi of Nothing -> return x Just i -> return i ------------------------------------------------------------------------ -- | Returns one value of type 'Maybe' /i/; -- equal to 'Data.Enumerator.List.head' ------------------------------------------------------------------------ mbOne :: E.Iteratee i IO (Maybe i) mbOne = EL.head ------------------------------------------------------------------------ -- | Returns a list containing all chunks of the stream; -- equal to 'Data.Enumerator.List.consume'; -- note that this iteratee causes a space leak -- and is not suitable for huge streams or streams of unknown size. ------------------------------------------------------------------------ toList :: E.Iteratee i IO [i] -- equal to EL.consume toList = EL.consume ------------------------------------------------------------------------ -- | Returns a string containing all chunks of the stream -- intercalated with the input string, /e.g./: -- if the stream consists of the two elements \"hello\" and \"world\" -- -- > toString " " -- -- returns "hello world". -- Note that this iteratee causes a space leak -- and is not suitable for huge streams or streams of unknown size. ------------------------------------------------------------------------ toString :: String -> E.Iteratee String IO String toString s = intercalate s <$> EL.consume ------------------------------------------------------------------------ -- | Merges the elements of a stream using 'M.mappend'; -- if the stream is empty, 'append' returns 'M.mempty'. -- The type /i/ must be instance of 'M.Monoid'. -- Note that this iteratee causes a space leak -- and is not suitable for huge streams or streams of unknown size. ------------------------------------------------------------------------ append :: M.Monoid i => E.Iteratee i IO i append = foldl' M.mappend M.mempty <$> EL.consume ------------------------------------------------------------------------ -- | Opens a data sink, dumps the stream into this sink -- and closes the sink when the stream terminates -- or when an error occurs; -- the first IO action is used to open the sink (of type /s/), -- the second closes the sink and -- the third writes one element into the sink. ------------------------------------------------------------------------ sink :: (Z.Context -> String -> IO s ) -> (Z.Context -> String -> s -> IO ()) -> (Z.Context -> String -> s -> i -> IO ()) -> Dump i sink op cl sv ctx p = tryIO (op ctx p) >>= go where go s = E.catchError (body s) (onerr s) body s = do mbi <- EL.head case mbi of Nothing -> tryIO (cl ctx p s) Just i -> tryIO (sv ctx p s i) >> body s onerr s e = tryIO (cl ctx p s) >> E.throwError e ------------------------------------------------------------------------ -- | Variant of 'sink' that uses the first segment of the stream -- as input parameter to open the sink. -- The first segment, which could contain -- a file name or parameters for an /SQL/ query, -- is not written to the sink. -- As with 'sink', the sink is closed when the stream terminates or -- when an error occurs. ------------------------------------------------------------------------ sinkI :: (Z.Context -> String -> i -> IO s ) -> (Z.Context -> String -> s -> IO ()) -> (Z.Context -> String -> s -> i -> IO ()) -> Dump i sinkI op cl sv ctx p = do mbi <- EL.head case mbi of Nothing -> return () Just i -> do s <- tryIO (op ctx p i) E.catchError (body s) (onerr s) where body s = do mbi <- EL.head case mbi of Nothing -> tryIO (cl ctx p s) Just i -> tryIO (sv ctx p s i) >> body s onerr s e = tryIO (cl ctx p s) >> E.throwError e ------------------------------------------------------------------------ -- | Similar to 'sink', but uses a data sink that is opened and closed -- outside the scope of the service or does not need to be -- opened and closed at all; -- examples may be services that write to 'MVar' or 'Chan'. -- 'nosink' is implemented as a closure of 'store': -- -- > nosink save ctx p = store (save ctx p) ------------------------------------------------------------------------ nosink :: (Z.Context -> String -> i -> IO ()) -> Dump i nosink sv ctx p = store (sv ctx p)