module Data.Conduit.Branching (mkBranchingConduit, branchConduits) where
import Data.Conduit
import qualified Data.Conduit.List as CL
import Control.Monad
import Control.Monad.IO.Class (liftIO)
import qualified Data.IntMap as IntMap
import Data.Maybe (mapMaybe)
import Control.Concurrent.ParallelIO
import Data.Conduit.TMChan
import GHC.Conc (atomically)
mkSink :: (MonadResource m)
=> (a -> [Int])
-> [TBMChan a]
-> Sink a m ()
mkSink brfunc chans =
let cmap = IntMap.fromList $ zip [0..] chans
querymap x = IntMap.lookup x cmap
cleanup = mapM_ (liftIO . atomically . closeTBMChan) chans
inject input =
let outchans = mapMaybe querymap (brfunc input)
in mapM_ (\c -> liftIO $ atomically $ writeTBMChan c input) outchans
in addCleanup (const cleanup) (CL.mapM_ inject)
mkBranchingConduit :: (MonadResource m)
=> Int
-> (a -> [Int])
-> IO (Sink a m (), [Source m a])
mkBranchingConduit nbbranches brfunction = do
chans <- replicateM nbbranches (newTBMChanIO 16)
return (mkSink brfunction chans, map sourceTBMChan chans)
branchConduits :: Source (ResourceT IO) a
-> (a -> [Int])
-> [Sink a (ResourceT IO) ()]
-> IO ()
branchConduits src brfunc sinks = do
(newsink, sources) <- mkBranchingConduit (length sinks) brfunc
let srcconduit = src $$ newsink
dstconduits = map (uncurry ($$)) (zip sources sinks)
actions = map runResourceT (srcconduit : dstconduits)
parallel_ actions