module Data.Conduit.Branching (mkBranchingConduit, branchConduits) where
import Data.Conduit
import qualified Data.Conduit.List as CL
import Control.Monad
import Control.Monad.IO.Class (liftIO)
import Control.Concurrent (MVar, putMVar, takeMVar, newEmptyMVar)
import qualified Data.IntMap as IntMap
import Data.Maybe (mapMaybe)
import Control.Concurrent.ParallelIO
-- | Build a fan-out stage: a single sink feeding several sources.
--
-- Allocates @nbbranches@ empty 'MVar's and returns a sink that writes into
-- them together with one source per 'MVar', in the same order. For every
-- input, the routing function lists the zero-based indices of the sources
-- that should receive it.
mkBranchingConduit :: (MonadResource m)
                   => Int             -- ^ number of branches to create
                   -> (a -> [Int])    -- ^ routing function: input to branch indices
                   -> IO (Sink a m (), [Source m a])
mkBranchingConduit nbbranches brfunction =
    fmap wire (replicateM nbbranches newEmptyMVar)
  where
    -- The same slots back both ends: the sink writes, each source reads one.
    wire slots = (mvarSink brfunction slots, map mvarSource slots)
-- | Sink that copies every input into the 'MVar's chosen by the routing
-- function.
--
-- The 'MVar's are numbered from 0 in list order. Indices returned by
-- @brfunc@ that match no 'MVar' are ignored. Each delivery is a 'putMVar'
-- of @Just input@. The cleanup action handed to 'bracketP' puts 'Nothing'
-- into every 'MVar', which is the end-of-stream marker readers look for.
mvarSink :: (MonadResource m) => (a -> [Int]) -> [MVar (Maybe a)] -> Sink a m ()
mvarSink brfunc mvs =
    bracketP (return ()) (\_ -> signalEnd) (\_ -> CL.mapM_ dispatch)
  where
    slotsByIndex = IntMap.fromList (zip [0..] mvs)

    -- Resolve the requested branch indices, in the order brfunc lists them.
    targetsFor item = mapMaybe (`IntMap.lookup` slotsByIndex) (brfunc item)

    -- Hand the item to every selected branch, one after the other.
    dispatch item = forM_ (targetsFor item) $ \slot ->
        liftIO (putMVar slot (Just item))

    -- Tell every branch (selected or not) that no more input will come.
    signalEnd = forM_ mvs (`putMVar` Nothing)
-- | Source that drains an 'MVar': every @Just x@ taken from it is yielded
-- downstream, and the first 'Nothing' taken ends the stream.
mvarSource :: (MonadResource m) => MVar (Maybe a) -> Source m a
mvarSource mv = drain
  where
    -- Take the next message; stop on Nothing, otherwise emit and loop.
    drain  = liftIO (takeMVar mv) >>= maybe (return ()) emit
    emit x = yield x >> drain
-- | Run a source and fan its output out to several sinks.
--
-- One branch is created per element of @sinks@; @brfunc@ maps each value
-- produced by @src@ to the zero-based indices of the sinks that should
-- receive it. The producer stage (@src@ feeding the branching sink) and
-- one consumer stage per sink are each wrapped in 'runResourceT' and
-- handed to 'parallel_' together.
--
-- NOTE(review): the stages communicate through 'MVar's and therefore wait
-- on one another; this presumably requires the pool behind 'parallel_' to
-- run all @length sinks + 1@ actions at the same time -- TODO confirm
-- against the ParallelIO pool sizing.
branchConduits :: Source (ResourceT IO) a      -- ^ upstream producer
               -> (a -> [Int])                 -- ^ routing function: value to sink indices
               -> [Sink a (ResourceT IO) ()]   -- ^ downstream consumers, indexed from 0
               -> IO ()
branchConduits src brfunc sinks = do
    (newsink, sources) <- mkBranchingConduit (length sinks) brfunc
    let producer  = src $$ newsink
        -- Pair branch i with sink i; zipWith replaces map/uncurry/zip.
        consumers = zipWith ($$) sources sinks
    parallel_ (map runResourceT (producer : consumers))