-- Haskell98! -- | Monadic and General Iteratees: -- incremental input parsers, processors and transformers -- -- The running example, parts 1 and 2 -- Part 1 is reading the headers, the sequence of lines terminated by an -- empty line. Each line is terminated by CR, LF, or CRLF. -- We should return the headers in order. In the case of error, -- we should return the headers read so far and the description of the error. -- Part 2 is reading the headers and reading all the lines from the -- HTTP-chunk-encoded content that follows the headers. Part 2 thus -- verifies layering of streams, and processing of one stream -- embedded (chunk encoded) into another stream. module System.IterateeM where import System.Posix import Foreign.C import Foreign.Ptr import Foreign.Marshal.Alloc import Data.List (splitAt) import Data.Char (isHexDigit, digitToInt, isSpace) import Control.Monad.Trans import Control.Monad.Identity import System.LowLevelIO -- | A stream is a (continuing) sequence of elements bundled in Chunks. -- The first two variants indicate termination of the stream. -- Chunk [a] gives the currently available part of the stream. -- The stream is not terminated yet. -- The case (Chunk []) signifies a stream with no currently available -- data but which is still continuing. A stream processor should, -- informally speaking, ``suspend itself'' and wait for more data -- to arrive. -- Later on, we can add another variant: IE_block (Ptr CChar) CSize -- so we could parse right from the buffer. data StreamG a = EOF | Err String | Chunk [a] deriving Show -- | A particular instance of StreamG: the stream of characters. -- This stream is used by many input parsers. type Stream = StreamG Char -- | Iteratee -- a generic stream processor, what is being folded over -- a stream -- When Iteratee is in the 'done' state, it contains the computed -- result and the remaining part of the stream. -- In the 'cont' state, the iteratee has not finished the computation -- and needs more input. -- We assume that all iteratees are `good' -- given bounded input, -- they do the bounded amount of computation and take the bounded amount -- of resources. The monad m describes the sort of computations done -- by the iteratee as it processes the stream. The monad m could be -- the identity monad (for pure computations) or the IO monad -- (to let the iteratee store the stream processing results as they -- are computed). -- We also assume that given a terminated stream, an iteratee -- moves to the done state, so the results computed so far could be returned. -- -- We could have used existentials instead, by doing the closure conversion -- data IterateeG el m a = IE_done a (StreamG el) | IE_cont (StreamG el -> IterateeGM el m a) newtype IterateeGM el m a = IM{unIM:: m (IterateeG el m a)} type Iteratee m a = IterateeG Char m a type IterateeM m a = IterateeGM Char m a -- | Useful combinators for implementing iteratees and enumerators -- liftI :: Monad m => IterateeG el m a -> IterateeGM el m a liftI = IM . return -- | Just like bind (at run-time, this is indeed exactly bind) infixl 1 >>== (>>==):: Monad m => IterateeGM el m a -> (IterateeG el m a -> IterateeGM el' m b) -> IterateeGM el' m b m >>== f = IM (unIM m >>= unIM . f) -- | Just like an application -- a call-by-value-like application infixr 1 ==<< (==<<) :: Monad m => (IterateeG el m a -> IterateeGM el' m b) -> IterateeGM el m a -> IterateeGM el' m b f ==<< m = m >>== f -- | The following is a `variant' of join in the IterateeGM el m monad. -- When el' is the same as el, the type of joinI is indeed that of -- true monadic join. However, joinI is subtly different: since -- generally el' is different from el, it makes no sense to -- continue using the internal, IterateeG el' m a: we no longer -- have elements of the type el' to feed to that iteratee. -- We thus send EOF to the internal Iteratee and propagate its result. -- This join function is useful when dealing with `derived iteratees' -- for embedded/nested streams. In particular, joinI is useful to -- process the result of stake, map_stream, or conv_stream below. joinI :: Monad m => IterateeGM el m (IterateeG el' m a) -> IterateeGM el m a joinI m = m >>= (\iter -> enum_eof iter >>== check) where check (IE_done x (Err str)) = liftI $ (IE_done x (Err str)) check (IE_done x _) = liftI $ (IE_done x EOF) check (IE_cont _) = error "joinI: can't happen: EOF didn't terminate" -- | It turns out, IterateeGM form a monad. We can use the familiar do -- notation for composing Iteratees -- instance Monad m => Monad (IterateeGM el m) where return x = liftI $ IE_done x (Chunk []) m >>= f = m >>== docase where docase (IE_done a (Chunk [])) = f a docase (IE_done a stream) = f a >>== (\r -> case r of IE_done x _ -> liftI $ IE_done x stream IE_cont k -> k stream) docase (IE_cont k) = liftI $ IE_cont ((>>= f) . k) instance MonadTrans (IterateeGM el) where lift m = IM (m >>= unIM . return) -- ------------------------------------------------------------------------ -- Primitive iteratees -- | Read a stream to the end and return all of its elements as a list stream2list :: Monad m => IterateeGM el m [el] stream2list = liftI $ IE_cont (step []) where step acc (Chunk []) = liftI $ IE_cont (step acc) step acc (Chunk ls) = liftI $ IE_cont (step $ acc ++ ls) step acc stream = liftI $ IE_done acc stream -- | Check to see if the stream is in error iter_report_err :: Monad m => IterateeGM el m (Maybe String) iter_report_err = liftI $ IE_cont step where step s@(Err str) = liftI $ IE_done (Just str) s step s = liftI $ IE_done Nothing s -- ------------------------------------------------------------------------ -- Parser combinators -- | The analogue of List.break -- It takes an element predicate and returns a pair: -- (str, Just c) -- the element 'c' is the first element of the stream -- satisfying the break predicate; -- The list str is the prefix of the stream up -- to but including 'c' -- (str,Nothing) -- The stream is terminated with EOF or error before -- any element satisfying the break predicate was found. -- str is the scanned part of the stream. -- None of the element in str satisfy the break predicate. -- sbreak :: Monad m => (el -> Bool) -> IterateeGM el m ([el],Maybe el) sbreak cpred = liftI $ IE_cont (liftI . step []) where step before (Chunk []) = IE_cont (liftI . step before) step before (Chunk str) = case break cpred str of (_,[]) -> IE_cont (liftI . step (before ++ str)) (str,c:tail) -> done (before ++ str) (Just c) (Chunk tail) step before stream = done before Nothing stream done line char stream = IE_done (line,char) stream -- | A particular optimized case of the above: skip all elements of the stream -- satisfying the given predicate -- until the first element -- that does not satisfy the predicate, or the end of the stream. -- This is the analogue of List.dropWhile sdropWhile :: Monad m => (el -> Bool) -> IterateeGM el m () sdropWhile cpred = liftI $ IE_cont step where step (Chunk []) = sdropWhile cpred step (Chunk str) = case dropWhile cpred str of [] -> sdropWhile cpred str -> liftI $ IE_done () (Chunk str) step stream = liftI $ IE_done () stream -- | Attempt to read the next element of the stream -- Return (Just c) if successful, return Nothing if the stream is -- terminated (by EOF or an error) snext :: Monad m => IterateeGM el m (Maybe el) snext = liftI $ IE_cont step where step (Chunk []) = snext step (Chunk (c:t)) = liftI $ IE_done (Just c) (Chunk t) step stream = liftI $ IE_done Nothing stream -- | Look ahead at the next element of the stream, without removing -- it from the stream. -- Return (Just c) if successful, return Nothing if the stream is -- terminated (by EOF or an error) speek :: Monad m => IterateeGM el m (Maybe el) speek = liftI $ IE_cont step where step (Chunk []) = speek step s@(Chunk (c:_)) = liftI $ IE_done (Just c) s step stream = liftI $ IE_done Nothing stream -- | Skip the rest of the stream skip_till_eof :: Monad m => IterateeGM el m () skip_till_eof = liftI $ IE_cont step where step (Chunk _) = skip_till_eof step _ = return () -- | Skip n elements of the stream, if there are that many -- This is the analogue of List.drop sdrop :: Monad m => Int -> IterateeGM el m () sdrop 0 = return () sdrop n = liftI $ IE_cont step where step (Chunk str) | length str <= n = sdrop (n - length str) step (Chunk str) = liftI $ IE_done () (Chunk s2) where (s1,s2) = splitAt n str step stream = liftI $ IE_done () stream -- ------------------------------------------------------------------------ -- | Iteratee converters for stream embedding -- The converters show a different way of composing two iteratees: -- `vertical' rather than `horizontal' -- -- The type of the converter from the stream with elements el_outer -- to the stream with element el_inner. The result is the iteratee -- for the outer stream that uses an `IterateeG el_inner m a' -- to process the embedded, inner stream as it reads the outer stream. type EnumeratorN el_outer el_inner m a = IterateeG el_inner m a -> IterateeGM el_outer m (IterateeG el_inner m a) -- | Read n elements from a stream and apply the given iteratee to the -- stream of the read elements. Unless the stream is terminated early, we -- read exactly n elements (even if the iteratee has accepted fewer). stake :: Monad m => Int -> EnumeratorN el el m a stake 0 iter = return iter stake n iter@IE_done{} = sdrop n >> return iter stake n (IE_cont k) = liftI $ IE_cont step where step (Chunk []) = liftI $ IE_cont step step chunk@(Chunk str) | length str <= n = stake (n - length str) ==<< k chunk step (Chunk str) = done (Chunk s1) (Chunk s2) where (s1,s2) = splitAt n str step stream = done stream stream done s1 s2 = k s1 >>== \r -> liftI $ IE_done r s2 -- | Map the stream: yet another iteratee transformer -- Given the stream of elements of the type el and the function el->el', -- build a nested stream of elements of the type el' and apply the -- given iteratee to it. -- Note the contravariance -- map_stream :: Monad m => (el -> el') -> EnumeratorN el el' m a map_stream f iter@IE_done{} = return iter map_stream f (IE_cont k) = liftI $ IE_cont step where step (Chunk []) = liftI $ IE_cont step step (Chunk str) = k (Chunk (map f str)) >>== map_stream f step EOF = k EOF >>== \r -> liftI $ IE_done r EOF step (Err err) = k (Err err) >>== \r -> liftI $ IE_done r (Err err) -- | Convert one stream into another, not necessarily in `lockstep' -- The transformer map_stream maps one element of the outer stream -- to one element of the nested stream. The transformer below is more -- general: it may take several elements of the outer stream to produce -- one element of the inner stream, or the other way around. -- The transformation from one stream to the other is specified as -- IterateeGM el m (Maybe [el']). The `Maybe' type reflects the -- possibility of the conversion error. -- conv_stream :: Monad m => IterateeGM el m (Maybe [el']) -> EnumeratorN el el' m a conv_stream fi iter@IE_done{} = return iter conv_stream fi (IE_cont k) = fi >>= (conv_stream fi ==<<) . k . maybe (Err "conv: stream error") Chunk -- ------------------------------------------------------------------------ -- | Combining the primitive iteratees to solve the running problem: -- Reading headers and the content from an HTTP-like stream -- type Line = String -- The line of text, terminators are not included -- | Read the line of text from the stream -- The line can be terminated by CR, LF or CRLF. -- Return (Right Line) if successful. Return (Left Line) if EOF or -- a stream error were encountered before the terminator is seen. -- The returned line is the string read so far. -- -- The code is the same as that of pure Iteratee, only the signature -- has changed. -- Compare the code below with GHCBufferIO.line_lazy -- line :: Monad m => IterateeM m (Either Line Line) line = sbreak (\c -> c == '\r' || c == '\n') >>= check_next where check_next (line,Just '\r') = speek >>= \c -> case c of Just '\n' -> snext >> return (Right line) Just _ -> return (Right line) Nothing -> return (Left line) check_next (line,Just _) = return (Right line) check_next (line,Nothing) = return (Left line) -- | Line iteratees: processors of a stream whose elements are made of Lines -- -- Collect all read lines and return them as a list -- see stream2list -- -- Print lines as they are received. This is the first `impure' iteratee -- with non-trivial actions during chunk processing print_lines :: IterateeGM Line IO () print_lines = liftI $ IE_cont step where step (Chunk []) = print_lines step (Chunk ls) = lift (mapM_ pr_line ls) >> print_lines step EOF = lift (putStrLn ">> natural end") >> liftI (IE_done () EOF) step stream = lift (putStrLn ">> unnatural end") >> liftI (IE_done () stream) pr_line line = putStrLn $ ">> read line: " ++ line -- | Convert the stream of characters to the stream of lines, and -- apply the given iteratee to enumerate the latter. -- The stream of lines is normally terminated by the empty line. -- When the stream of characters is terminated, the stream of lines -- is also terminated, abnormally. -- This is the first proper iteratee-enumerator: it is the iteratee of the -- character stream and the enumerator of the line stream. -- More generally, we could have used conv_stream to implement enum_lines. -- enum_lines :: Monad m => EnumeratorN Char Line m a enum_lines iter@IE_done{} = return iter enum_lines (IE_cont k) = line >>= check_line k where check_line k (Right "") = enum_lines ==<< k EOF -- empty line, normal term check_line k (Right l) = enum_lines ==<< k (Chunk [l]) check_line k _ = enum_lines ==<< k (Err "EOF") -- abnormal termin -- | Convert the stream of characters to the stream of words, and -- apply the given iteratee to enumerate the latter. -- Words are delimited by white space. -- This is the analogue of List.words -- It is instructive to compare the code below with the code of -- List.words, which is: -- -- >words :: String -> [String] -- >words s = case dropWhile isSpace s of -- > "" -> [] -- > s' -> w : words s'' -- > where (w, s'') = -- > break isSpace s' -- -- One should keep in mind that enum_words is a more general, monadic -- function. -- More generally, we could have used conv_stream to implement enum_words. -- enum_words :: Monad m => EnumeratorN Char String m a enum_words iter@IE_done{} = return iter enum_words (IE_cont k) = sdropWhile isSpace >> sbreak isSpace >>= check_word k where check_word k ("",_) = enum_words ==<< k EOF check_word k (str,_) = enum_words ==<< k (Chunk [str]) -- ------------------------------------------------------------------------ -- | Enumerators -- Each enumerator takes an iteratee and returns an iteratee -- an Enumerator is an iteratee transformer. -- The enumerator normally stops when the stream is terminated -- or when the iteratee moves to the done state, whichever comes first. -- When to stop is of course up to the enumerator... -- -- We have two choices of composition: compose iteratees or compose -- enumerators. The latter is useful when one iteratee -- reads from the concatenation of two data sources. -- type EnumeratorGM el m a = IterateeG el m a -> IterateeGM el m a type EnumeratorM m a = EnumeratorGM Char m a -- | The most primitive enumerator: applies the iteratee to the terminated -- stream. The result is the iteratee usually in the done state. enum_eof :: Monad m => EnumeratorGM el m a enum_eof iter@(IE_done _ (Err _)) = liftI iter enum_eof (IE_done x _) = liftI $ IE_done x EOF enum_eof (IE_cont k) = k EOF -- | Another primitive enumerator: report an error enum_err :: Monad m => String -> EnumeratorGM el m a enum_err str iter@(IE_done _ (Err _)) = liftI iter enum_err str (IE_done x _) = liftI $ IE_done x (Err str) enum_err str (IE_cont k) = k (Err str) -- | The composition of two enumerators: essentially the functional composition -- It is convenient to flip the order of the arguments of the composition -- though: in e1 >. e2, e1 is executed first -- (>.):: Monad m => EnumeratorGM el m a -> EnumeratorGM el m a -> EnumeratorGM el m a e1 >. e2 = (e2 ==<<) . e1 -- | The pure 1-chunk enumerator -- It passes a given list of elements to the iteratee in one chunk -- This enumerator does no IO and is useful for testing of base parsing enum_pure_1chunk :: Monad m => [el] -> EnumeratorGM el m a enum_pure_1chunk str iter@IE_done{} = liftI $ iter enum_pure_1chunk str (IE_cont k) = k (Chunk str) -- | The pure n-chunk enumerator -- It passes a given lift of elements to the iteratee in n chunks -- This enumerator does no IO and is useful for testing of base parsing -- and handling of chunk boundaries enum_pure_nchunk :: Monad m => [el] -> Int -> EnumeratorGM el m a enum_pure_nchunk str n iter@IE_done{} = liftI $ iter enum_pure_nchunk [] n iter = liftI $ iter enum_pure_nchunk str n (IE_cont k) = enum_pure_nchunk s2 n ==<< k (Chunk s1) where (s1,s2) = splitAt n str -- | The enumerator of a POSIX Fd -- Unlike fdRead (which allocates a new buffer on -- each invocation), we use the same buffer all throughout enum_fd :: Fd -> EnumeratorM IO a enum_fd fd iter = IM $ allocaBytes (fromIntegral buffer_size) (loop iter) where -- buffer_size = 4096 buffer_size = 5 -- for tests; in real life, there should be 1024 or so loop iter@IE_done{} p = return iter loop iter@(IE_cont step) p = do n <- myfdRead fd p buffer_size putStrLn $ "Read buffer, size " ++ either (const "IO err") show n case n of Left errno -> unIM $ step (Err "IO error") Right 0 -> return iter Right n -> do str <- peekCAStringLen (p,fromIntegral n) im <- unIM $ step (Chunk str) loop im p enum_file :: FilePath -> EnumeratorM IO a enum_file filepath iter = IM $ do putStrLn $ "opened file " ++ filepath fd <- openFd filepath ReadOnly Nothing defaultFileFlags r <- unIM $ enum_fd fd iter closeFd fd putStrLn $ "closed file " ++ filepath return r -- | HTTP chunk decoding -- Each chunk has the following format: -- -- > CRLF CRLF -- -- where is the hexadecimal number; is a -- sequence of bytes. -- The last chunk (so-called EOF chunk) has the format -- 0 CRLF CRLF (where 0 is an ASCII zero, a character with the decimal code 48). -- For more detail, see "Chunked Transfer Coding", Sec 3.6.1 of -- the HTTP/1.1 standard: -- -- -- The following enum_chunk_decoded has the signature of the enumerator -- of the nested (encapsulated and chunk-encoded) stream. It receives -- an iteratee for the embedded stream and returns the iteratee for -- the base, embedding stream. Thus what is an enumerator and what -- is an iteratee may be a matter of perspective. -- -- We have a decision to make: Suppose an iteratee has finished (either because -- it obtained all needed data or encountered an error that makes further -- processing meaningless). While skipping the rest of the stream/the trailer, -- we encountered a framing error (e.g., missing CRLF after chunk data). -- What do we do? We chose to disregard the latter problem. -- Rationale: when the iteratee has finished, we are in the process -- of skipping up to the EOF (draining the source). -- Disregarding the errors seems OK then. -- Also, the iteratee may have found an error and decided to abort further -- processing. Flushing the remainder of the input is reasonable then. -- One can make a different choice... -- enum_chunk_decoded :: Monad m => Iteratee m a -> IterateeM m a enum_chunk_decoded = docase where docase iter@IE_done{} = liftI iter >>= (\r -> (enum_chunk_decoded ==<< skip_till_eof) >> return r) docase iter@(IE_cont k) = line >>= check_size where check_size (Right "0") = line >> k EOF check_size (Right str) = maybe (k . Err $ "Bad chunk size: " ++ str) (read_chunk iter) $ read_hex 0 str check_size _ = k (Err "Error reading chunk size") read_chunk iter size = do r <- stake size iter c1 <- snext c2 <- snext case (c1,c2) of (Just '\r',Just '\n') -> docase r _ -> (enum_chunk_decoded ==<< skip_till_eof) >> enum_err "Bad chunk trailer" r read_hex acc "" = Just acc read_hex acc (d:rest) | isHexDigit d = read_hex (16*acc + digitToInt d) rest read_hex acc _ = Nothing -- ------------------------------------------------------------------------ -- Tests -- Pure tests, requiring no IO test_str1 = "header1: v1\rheader2: v2\r\nheader3: v3\nheader4: v4\n" ++ "header5: v5\r\nheader6: v6\r\nheader7: v7\r\n\nrest\n" testp1 = let IE_done (IE_done lines EOF) (Chunk rest) = runIdentity . unIM $ enum_pure_1chunk test_str1 ==<< (enum_lines ==<< stream2list) in lines == ["header1: v1","header2: v2","header3: v3","header4: v4", "header5: v5","header6: v6","header7: v7"] && rest == "rest\n" testp2 = let IE_done (IE_done lines EOF) (Chunk rest) = runIdentity . unIM $ enum_pure_nchunk test_str1 5 ==<< (enum_lines ==<< stream2list) in lines == ["header1: v1","header2: v2","header3: v3","header4: v4", "header5: v5","header6: v6","header7: v7"] && rest == "r" testw1 = let test_str = "header1: v1\rheader2: v2\r\nheader3:\t v3" expected = ["header1:","v1","header2:","v2","header3:","v3"] in let run_test test_str = let IE_done (IE_done words EOF) EOF = runIdentity . unIM $ (enum_pure_nchunk test_str 5 >. enum_eof) ==<< (enum_words ==<< stream2list) in words in and [run_test test_str == expected, run_test (test_str ++ " ") == expected] -- Test Fd driver test_driver line_collector filepath = do fd <- openFd filepath ReadOnly Nothing defaultFileFlags putStrLn "About to read headers" result <- unIM $ (enum_fd fd >. enum_eof) ==<< read_lines_and_one_more_line closeFd fd putStrLn "Finished reading headers" case result of IE_done (IE_done headers EOF,after) _ -> do putStrLn $ "The line after headers is: " ++ show after putStrLn "Complete headers" print headers IE_done (IE_done headers err,_) stream -> do putStrLn $ "Problem " ++ show stream putStrLn "Incomplete headers" print headers where read_lines_and_one_more_line = do lines <- enum_lines ==<< line_collector after <- line return (lines,after) test11 = test_driver stream2list "test1.txt" test12 = test_driver stream2list "test2.txt" test13 = test_driver stream2list "test3.txt" test14 = test_driver stream2list "/dev/null" test21 = test_driver print_lines "test1.txt" test22 = test_driver print_lines "test2.txt" test23 = test_driver print_lines "test3.txt" test24 = test_driver print_lines "/dev/null" -- Run the complete test, reading the headers and the body -- | This simple iteratee is used to process a variety of streams: -- embedded, interleaved, etc. line_printer = enum_lines ==<< print_lines -- |Two sample processors -- -- Read the headers, print the headers, read the lines of the chunk-encoded -- body and print each line as it has been read read_headers_print_body = do headers <- enum_lines ==<< stream2list case headers of IE_done headers EOF -> lift $ do putStrLn "Complete headers" print headers IE_done headers (Err err) -> lift $ do putStrLn $ "Incomplete headers due to " ++ err print headers lift $ putStrLn "\nLines of the body follow" enum_chunk_decoded ==<< line_printer -- | Read the headers and print the header right after it has been read -- Read the lines of the chunk-encoded body and print each line as -- it has been read print_headers_print_body = do lift $ putStrLn "\nLines of the headers follow" line_printer lift $ putStrLn "\nLines of the body follow" enum_chunk_decoded ==<< line_printer test_driver_full iter filepath = do fd <- openFd filepath ReadOnly Nothing defaultFileFlags putStrLn "About to read headers" unIM $ (enum_fd fd >. enum_eof) ==<< iter closeFd fd putStrLn "Finished reading" test31 = test_driver_full read_headers_print_body "test_full1.txt" test32 = test_driver_full read_headers_print_body "test_full2.txt" test33 = test_driver_full read_headers_print_body "test_full3.txt" test34 = test_driver_full print_headers_print_body "test_full3.txt" -- | Interleaved reading from two descriptors using select -- -- If the two arguments are the names of regular files, the driver -- does simple round-robin interleaving, reading a block from one -- file and a block from the other file. If the arguments name -- pipes or devices, the reading becomes truly supply-driven. -- We use select for multiplexing. -- The first argument is the reader-iteratee. It is exactly -- the same iteratee that is being used in the `sequential' tests above. -- By design, two Fds are being read independently and in parallel, -- closely emulating two OS processes each reading from their own file. -- The code below is a simple, round-robin OS scheduler. test_driver_mux iter fpath1 fpath2 = do fd1 <- openFd fpath1 ReadOnly Nothing defaultFileFlags fd2 <- openFd fpath2 ReadOnly Nothing defaultFileFlags let fds = [fd1,fd2] putStrLn $ "Opened file descriptors: " ++ show fds mapM (\(fd,reader) -> unIM reader >>= return . ((,) fd)) (zip fds (repeat iter)) >>= allocaBytes (fromIntegral buffer_size) . loop mapM_ closeFd fds putStrLn $ "Closed file descriptors. All done" where -- we use one single IO buffer for reading buffer_size = 5 -- for tests; in real life, there should be 1024 or so loop fjque buf = do let fds = get_fds fjque if null fds then return () else do selected <- select'read'pending fds case selected of Left errno -> putStrLn "IO Err" >> tell_iteratee_err "IO Err" fjque >> return () Right [] -> loop fjque buf Right sel -> process buf sel fjque -- get Fds from the jobqueue for the unfinished iteratees get_fds = foldr (\ (fd,iter) acc -> case iter of {IE_cont _ -> fd:acc; _ -> acc}) [] -- find the first ready jobqueue element, -- that is, the job queue element whose Fd is in selected. -- Return the element and the rest of the queue get_ready selected jq = (e, before ++ after) where (before,e:after) = break (\(fd,_) -> fd `elem` selected) jq process buf selected fjque = do let ((fd,IE_cont step),fjrest) = get_ready selected fjque n <- myfdRead fd buf buffer_size putStrLn $ unwords ["Read buffer, size", either (const "IO err") show n, "from fd", show fd] case n of Left errno -> unIM (step (Err "IO error")) >> loop fjrest buf Right 0 -> unIM (step EOF) >> loop fjrest buf Right n -> do str <- peekCAStringLen (buf,fromIntegral n) im <- unIM $ step (Chunk str) loop (fjrest ++ [(fd,im)]) buf -- round-robin tell_iteratee_err err = mapM_ (\ (_,iter) -> unIM (enum_err err iter)) -- Running these tests shows true interleaving, of reading from the -- two file descriptors and of printing the results. All IO is interleaved, -- and yet it is safe. No unsafe operations are used. testm1 = test_driver_mux line_printer "test1.txt" "test3.txt" testm2 = test_driver_mux print_headers_print_body "test_full2.txt" "test_full3.txt"