{-| @WARNING: executables using this function must be compiled with -threaded@ These functions let you connect several sinks to a single source, according to a branching strategy. For example : @ module Main where import Data.Conduit.Branching import Data.Conduit import qualified Data.Conduit.List as CL import Control.Monad.IO.Class src :: Monad m => Producer m (Either Int String) src = CL.sourceList [Left 5, Left 4, Right \"five\", Right \"four\"] sinkString :: (Monad m, MonadIO m) => Sink (Either Int String) m () sinkString = CL.mapM_ $ \(Right x) -> liftIO (putStrLn (\"This is a string: \" ++ x)) sinkInt :: (Monad m, MonadIO m) => Sink (Either Int String) m () sinkInt = CL.mapM_ $ \(Left x) -> liftIO (putStrLn (\"This is an integer: \" ++ show x)) sinkLog :: (Monad m, MonadIO m) => Sink (Either Int String) m () sinkLog = CL.mapM_ (liftIO . putStrLn . (\"Raw logging: \" ++) . show) main :: IO () main = branchConduits src branching [sinkInt, sinkString, sinkLog] where branching (Left _) = [0,2] branching (Right _) = [1,2] @ -} module Data.Conduit.Branching (mkBranchingConduit, branchConduits) where import Data.Conduit import qualified Data.Conduit.List as CL import Control.Monad import Control.Monad.IO.Class (liftIO) import qualified Data.IntMap as IntMap import Data.Maybe (mapMaybe) import Control.Concurrent.ParallelIO import Data.Conduit.TMChan import GHC.Conc (atomically) mkSink :: (MonadResource m) => (a -> [Int]) -> [TBMChan a] -> Sink a m () mkSink brfunc chans = let cmap = IntMap.fromList $ zip [0..] chans -- creates an intmap with all output channels querymap x = IntMap.lookup x cmap -- query the intmap to get a Maybe (TBMChan a) cleanup = mapM_ (liftIO . atomically . closeTBMChan) chans -- this is the cleanup function that closes all chans inject input = let outchans = mapMaybe querymap (brfunc input) -- compiles the list of output channels in mapM_ (\c -> liftIO $ atomically $ writeTBMChan c input) outchans -- and write into them in addCleanup (const cleanup) (CL.mapM_ inject) mkBranchingConduit :: (MonadResource m) => Int -- ^ Number of branches -> (a -> [Int]) -- ^ Branching function, where 0 is the first branch -> IO (Sink a m (), [Source m a]) -- ^ Returns a sink and N sources mkBranchingConduit nbbranches brfunction = do chans <- replicateM nbbranches (newTBMChanIO 16) return (mkSink brfunction chans, map sourceTBMChan chans) branchConduits :: Source (ResourceT IO) a -- ^ The source to branch from -> (a -> [Int]) -- ^ The branching function (0 is the first sink) -> [Sink a (ResourceT IO) ()] -- ^ The destination sinks -> IO () -- ^ Results of the sinks branchConduits src brfunc sinks = do (newsink, sources) <- mkBranchingConduit (length sinks) brfunc let srcconduit = src $$ newsink dstconduits = map (uncurry ($$)) (zip sources sinks) actions = map runResourceT (srcconduit : dstconduits) parallel_ actions