-- | Stream utilities for working with concurrent channels. {-# LANGUAGE BangPatterns #-} {-# LANGUAGE CPP #-} module System.IO.Streams.Concurrent ( -- * Channel conversions inputToChan , chanToInput , chanToOutput , concurrentMerge , makeChanPipe ) where ------------------------------------------------------------------------------ #if !MIN_VERSION_base(4,8,0) import Control.Applicative ((<$>), (<*>)) #endif import Control.Concurrent (forkIO) import Control.Concurrent.Chan (Chan, newChan, readChan, writeChan) import Control.Concurrent.MVar (modifyMVar, newEmptyMVar, newMVar, putMVar, takeMVar) import Control.Exception (SomeException, mask, throwIO, try) import Control.Monad (forM_) import Prelude hiding (read) ------------------------------------------------------------------------------ import System.IO.Streams.Internal (InputStream, OutputStream, makeInputStream, makeOutputStream, read) ------------------------------------------------------------------------------ -- | Writes the contents of an input stream to a channel until the input stream -- yields end-of-stream. inputToChan :: InputStream a -> Chan (Maybe a) -> IO () inputToChan is ch = go where go = do mb <- read is writeChan ch mb maybe (return $! ()) (const go) mb ------------------------------------------------------------------------------ -- | Turns a 'Chan' into an input stream. -- chanToInput :: Chan (Maybe a) -> IO (InputStream a) chanToInput ch = makeInputStream $! readChan ch ------------------------------------------------------------------------------ -- | Turns a 'Chan' into an output stream. -- chanToOutput :: Chan (Maybe a) -> IO (OutputStream a) chanToOutput = makeOutputStream . writeChan ------------------------------------------------------------------------------ -- | Concurrently merges a list of 'InputStream's, combining values in the -- order they become available. -- -- Note: does /not/ forward individual end-of-stream notifications, the -- produced stream does not yield end-of-stream until all of the input streams -- have finished. -- -- This traps exceptions in each concurrent thread and re-raises them in the -- current thread. concurrentMerge :: [InputStream a] -> IO (InputStream a) concurrentMerge iss = do mv <- newEmptyMVar nleft <- newMVar $! length iss mask $ \restore -> forM_ iss $ \is -> forkIO $ do let producer = do emb <- try $ restore $ read is case emb of Left exc -> do putMVar mv (Left (exc :: SomeException)) producer Right Nothing -> putMVar mv $! Right Nothing Right x -> putMVar mv (Right x) >> producer producer makeInputStream $ chunk mv nleft where chunk mv nleft = do emb <- takeMVar mv case emb of Left exc -> throwIO exc Right Nothing -> do x <- modifyMVar nleft $ \n -> let !n' = n - 1 in return $! (n', n') if x > 0 then chunk mv nleft else return Nothing Right x -> return x -------------------------------------------------------------------------------- -- | Create a new pair of streams using an underlying 'Chan'. Everything written -- to the 'OutputStream' will appear as-is on the 'InputStream'. -- -- Since reading from the 'InputStream' and writing to the 'OutputStream' are -- blocking calls, be sure to do so in different threads. makeChanPipe :: IO (InputStream a, OutputStream a) makeChanPipe = do chan <- newChan (,) <$> chanToInput chan <*> chanToOutput chan