{-# LANGUAGE RankNTypes #-} module Data.Conduit.Merge (mergeSources) where import Control.Monad.Trans (lift) import Data.Conduit (Producer, Source, await, newResumableSource, yield, ($$++)) import Data.List (sortOn) mergeSources :: (Ord a, Monad m) => [Source m a] -> Producer m a mergeSources = mergeResumable . map newResumableSource where mergeResumable sources = do prefetchedSources <- lift $ mapM ($$++ await) sources go [(a, s) | (s, Just a) <- prefetchedSources] go [] = pure () go sources = do let (a, src1) : sources1 = sortOn fst sources yield a (src2, mb) <- lift $ src1 $$++ await let sources2 = case mb of Nothing -> sources1 Just b -> (b, src2) : sources1 go sources2