{-# LANGUAGE ScopedTypeVariables #-} module Pipes.Interleave ( interleave , combine , combineM , merge , mergeM , groupBy ) where import Control.Monad (liftM) import Data.Either (rights) import qualified Data.Heap as H import qualified Data.Sequence as Seq import Data.Foldable (toList) import Pipes -- $setup -- >>> import Pipes.Prelude -- >>> import Data.Function -- >>> import Data.Functor.Identity -- | Interleave elements from a set of 'Producers' such that the interleaved -- stream is increasing with respect to the given ordering. -- -- >>> toList $ interleave [each [1,3..10], each [1,5..20]] -- [1,1,3,5,5,7,9,9,13,17] -- interleave :: forall a m. (Monad m, Ord a) => [Producer a m ()] -- ^ element producers -> Producer a m () interleave producers = do xs <- lift $ rights `liftM` mapM Pipes.next producers go (H.fromList $ map (uncurry H.Entry) xs) where go :: (Monad m, Functor m) => H.Heap (H.Entry a (Producer a m ())) -> Producer a m () go xs | Just (H.Entry a producer, xs') <- H.viewMin xs = do yield a x' <- lift $ next producer go $ either (const xs') (\(x,prod) -> H.insert (H.Entry x prod) xs') x' | otherwise = return () {-# INLINABLE interleave #-} -- | Given a stream of increasing elements, combine those that are equal. -- -- >>> let append (Entry k v) (Entry _ v') = Entry k (v+v') -- >>> toList $ combine append (each $ map (uncurry Entry) [(1,1), (1,4), (2,3), (3,10)]) -- [Entry {priority = 1, payload = 5},Entry {priority = 2, payload = 3},Entry {priority = 3, payload = 10}] -- combine :: (Monad m, Eq a) => (a -> a -> a) -- ^ combine operation -> Producer a m r -> Producer a m r combine append = combineM (\a b->return $ append a b) {-# INLINEABLE combine #-} -- | 'combine' with monadic side-effects in the combine operation. combineM :: (Monad m, Eq a) => (a -> a -> m a) -- ^ combine operation -> Producer a m r -> Producer a m r combineM append producer = lift (next producer) >>= either return (uncurry go) where go a producer' = do n <- lift $ next producer' case n of Left r -> yield a >> return r Right (a', producer'') | a == a' -> do a'' <- lift $ append a a' go a'' producer'' | otherwise -> yield a >> go a' producer'' {-# INLINABLE combineM #-} -- | Equivalent to 'combine' composed with 'interleave' -- -- >>> let append (Entry k v) (Entry _ v') = Entry k (v+v') -- >>> let producers = [ each [Entry i 2 | i <- [1,3..10]], each [Entry i 10 | i <- [1,5..20]] ] :: [Producer (Entry Int Int) Identity ()] -- >>> toList $ merge append producers -- [(1,12),(3,2),(5,12),(7,2),(9,12),(13,10),(17,10)] -- merge :: (Monad m, Ord a) => (a -> a -> a) -- ^ combine operation -> [Producer a m ()] -- ^ producers of elements -> Producer a m () merge append = mergeM (\a b->return $ append a b) {-# INLINABLE merge #-} -- | Merge with monadic side-effects in the combine operation. mergeM :: (Monad m, Ord a) => (a -> a -> m a) -- ^ combine operation -> [Producer a m ()] -- ^ producers of elements -> Producer a m () mergeM append = combineM append . interleave {-# INLINABLE mergeM #-} -- | Split stream into groups of equal elements. -- Note that this is a non-local operation: if the 'Producer' generates -- a large run of equal elements, all of them will remain in memory until the -- run ends. -- -- >>> toList $ groupBy (each [Entry 1 1, Entry 1 4, Entry 2 3, Entry 3 10]) -- [[Entry {priority = 1, payload = 1},Entry {priority = 1, payload = 4}],[Entry {priority = 2, payload = 3}],[Entry {priority = 3, payload = 10}]] -- groupBy :: forall a r m. (Monad m, Ord a) => Producer a m r -> Producer [a] m r groupBy producer = lift (next producer) >>= either return (\(x,producer)->go (Seq.singleton x) producer) where go :: Seq.Seq a -> Producer a m r -> Producer [a] m r go xs producer' = do n <- lift $ next producer' case n of Left r -> yield (toList xs) >> return r Right (x, producer'') | x == x0 -> go (xs Seq.|> x) producer'' | otherwise -> yield (toList xs) >> go (Seq.singleton x) producer'' where x0 Seq.:< _ = Seq.viewl xs {-# INLINABLE groupBy #-}