module Pipes.Interleave ( interleave , combine , combineM , merge , mergeM , groupBy ) where import Control.Monad (liftM) import Data.List (sortBy) import Data.Function (on) import Data.Either (rights) import qualified Data.Sequence as Seq import Data.Foldable (toList) import Pipes -- $setup -- >>> import Pipes.Prelude -- >>> import Data.Function -- >>> import Data.Functor.Identity -- | Interleave elements from a set of 'Producers' such that the interleaved -- stream is increasing with respect to the given ordering. -- -- >>> toList $ interleave compare [each [1,3..10], each [1,5..20]] -- [1,1,3,5,5,7,9,9,13,17] -- interleave :: (Monad m) => (a -> a -> Ordering) -- ^ ordering on elements -> [Producer a m ()] -- ^ element producers -> Producer a m () interleave compare producers = do xs <- lift $ rights `liftM` mapM Pipes.next producers go xs where --go :: (Monad m, Functor m) => [(a, Producer a m ())] -> Producer a m () go [] = return () go xs = do let (a,producer):xs' = sortBy (compare `on` fst) xs yield a x' <- lift $ next producer go $ either (const xs') (:xs') x' {-# INLINABLE interleave #-} -- | Given a stream of increasing elements, combine those equal under the -- given equality relation -- -- >>> let append (k,v) (_,v') = return (k, v+v') -- >>> toList $ combine ((==) `on` fst) append (each [(1,1), (1,4), (2,3), (3,10)]) -- [(1,5),(2,3),(3,10)] -- combine :: (Monad m) => (a -> a -> Bool) -- ^ equality test -> (a -> a -> a) -- ^ combine operation -> Producer a m r -> Producer a m r combine eq append = combineM eq (\a b->return $ append a b) {-# INLINEABLE combine #-} -- | 'combine' with monadic side-effects in combine operation. combineM :: (Monad m) => (a -> a -> Bool) -- ^ equality test -> (a -> a -> m a) -- ^ combine operation -> Producer a m r -> Producer a m r combineM eq append producer = lift (next producer) >>= either return (uncurry go) where go a producer' = do n <- lift $ next producer' case n of Left r -> yield a >> return r Right (a', producer'') | a `eq` a' -> do a'' <- lift $ append a a' go a'' producer'' | otherwise -> yield a >> go a' producer'' {-# INLINABLE combineM #-} -- | Equivalent to 'combine' composed with 'interleave' -- -- >>> let append (k,v) (_,v') = return (k, v+v') -- >>> let producers = [ each [(i,2) | i <- [1,3..10]], each [(i,10) | i <- [1,5..20]] ] :: [Producer (Int,Int) Identity ()] -- >>> toList $ merge (compare `on` fst) append producers -- [(1,12),(3,2),(5,12),(7,2),(9,12),(13,10),(17,10)] -- merge :: (Monad m) => (a -> a -> Ordering) -- ^ ordering on elements -> (a -> a -> a) -- ^ combine operation -> [Producer a m ()] -- ^ producers of elements -> Producer a m () merge compare append = mergeM compare (\a b->return $ append a b) {-# INLINABLE merge #-} -- | Merge with monadic side-effects in combine operation. mergeM :: (Monad m) => (a -> a -> Ordering) -- ^ ordering on elements -> (a -> a -> m a) -- ^ combine operation -> [Producer a m ()] -- ^ producers of elements -> Producer a m () mergeM compare append = combineM (\a b->compare a b == EQ) append . interleave compare {-# INLINABLE mergeM #-} -- | Split stream into groups of equal elements. -- Note that this is a non-local operation: if the 'Producer' generates -- a large run of equal elements, all of them will remain in memory until the -- run ends. -- -- >>> toList $ groupBy ((==) `on` fst) (each [(1,1), (1,4), (2,3), (3,10)]) -- [[(1,1),(1,4)],[(2,3)],[(3,10)]] -- groupBy :: (Monad m) => (a -> a -> Bool) -- ^ equality test -> Producer a m r -> Producer [a] m r groupBy eq producer = lift (next producer) >>= either return (\(x,producer)->go (Seq.singleton x) producer) where -- go :: Monad m => Seq.Seq a -> Producer a m r -> Producer [a] m r go xs producer' = do n <- lift $ next producer' case n of Left r -> yield (toList xs) >> return r Right (x, producer'') | x `eq` x0 -> go (xs Seq.|> x) producer'' | otherwise -> yield (toList xs) >> go (Seq.singleton x) producer'' where x0 Seq.:< _ = Seq.viewl xs {-# INLINABLE groupBy #-}