module Data.Conduit.Merge (mergeSources, mergeSourcesOn) where
import Control.Monad.Trans (lift)
import Data.Conduit (Producer, Source, await, newResumableSource, yield, ($$++))
import Data.List (insertBy, sortOn)
import Data.Ord (comparing)
-- | Merge several sources into a single producer, comparing the elements
-- themselves.
--
-- This is 'mergeSourcesOn' with 'id' as the key function.
mergeSources :: (Ord a, Monad m) => [Source m a] -> Producer m a
mergeSources sources = mergeSourcesOn id sources
-- | Merge several sources into a single producer, ordering elements by the
-- result of applying @key@ to them.
--
-- Every input is converted to a resumable source and one element is
-- prefetched from each; inputs that yield nothing are dropped. The merge
-- then repeatedly yields the pending element with the smallest key and
-- pulls the next element from the source that element came from, until
-- every source is exhausted.
--
-- If each input is ascending with respect to @key@, the output is too.
-- An empty list of sources produces nothing.
mergeSourcesOn :: (Ord b, Monad m) => (a -> b) -> [Source m a] -> Producer m a
mergeSourcesOn key = mergeResumable . fmap newResumableSource
  where
    -- Prefetch one element per source, keep only the sources that
    -- produced one, and sort the pending entries once up front.
    mergeResumable sources = do
      prefetched <- lift $ traverse ($$++ await) sources
      go $ sortOn (key . fst) [(a, s) | (s, Just a) <- prefetched]

    -- Invariant: the pending list is sorted by key, so its head holds the
    -- smallest element. Matching on the list shape keeps this total.
    go [] = pure ()
    go ((a, src1) : rest) = do
      yield a
      (src2, mb) <- lift $ src1 $$++ await
      go $ case mb of
        -- Source exhausted: the remaining entries are already sorted.
        Nothing -> rest
        -- Re-insert instead of re-sorting. 'insertBy' puts the new entry
        -- before the first entry that is not smaller, which is the same
        -- position a stable sort of the entry consed onto @rest@ gives.
        Just b -> insertBy (comparing (key . fst)) (b, src2) rest