{-# LANGUAGE RankNTypes #-}
module Data.Conduit.Merge (mergeSources, mergeSourcesOn) where
import Control.Monad.Trans (lift)
import Data.Conduit (ConduitT, await, sealConduitT, yield, ($$++))
import Data.Foldable (toList)
import Data.List (sortOn)
mergeSources :: (Ord a, Foldable f, Monad m) => f (ConduitT () a m ()) -> ConduitT i a m ()
mergeSources = mergeSourcesOn id
{-# INLINE mergeSources #-}
mergeSourcesOn
:: (Ord b, Foldable f, Monad m)
=> (a -> b) -> f (ConduitT () a m ()) -> ConduitT i a m ()
mergeSourcesOn key = mergeSealed . fmap sealConduitT . toList where
mergeSealed sources = do
prefetchedSources <- lift $ traverse ($$++ await) sources
go [(a, s) | (s, Just a) <- prefetchedSources]
go sources =
case sortOn (key . fst) sources of
[] -> pure ()
(a, src1) : sources1 -> do
yield a
(src2, mb) <- lift $ src1 $$++ await
let sources2 = case mb of
Nothing -> sources1
Just b -> (b, src2) : sources1
go sources2