module Data.Repa.Flow.Simple.Operator ( -- * Constructors repeat_i , replicate_i , prepend_i -- * Mapping , map_i, map_o -- * Connecting , dup_oo, dup_io, dup_oi , connect_i -- * Splitting , head_i , peek_i -- * Grouping , groups_i -- * Packing , pack_ii -- * Folding , folds_ii -- * Watching , watch_i , watch_o , trigger_o -- * Ignorance , ignore_o , abandon_o) where import Data.Repa.Flow.Simple.Base import Data.Repa.Flow.States (States (..)) import qualified Data.Repa.Flow.Generic as G #include "repa-flow.h" -- Constructors --------------------------------------------------------------- -- | Yield a source that always produces the same value. repeat_i :: States () m => a -> m (Source m a) repeat_i x = G.repeat_i () (const x) {-# INLINE repeat_i #-} -- | Yield a source of the given length that always produces the same value. replicate_i :: States () m => Int -> a -> m (Source m a) replicate_i n x = G.replicate_i () n (const x) {-# INLINE replicate_i #-} -- | Prepend some more elements to the front of a source. prepend_i :: States () m => [a] -> Source m a -> m (Source m a) prepend_i = G.prepend_i {-# INLINE prepend_i #-} -- Mapping -------------------------------------------------------------------- -- | Apply a function to every element pulled from some source, -- producing a new source. map_i :: States () m => (a -> b) -> Source m a -> m (Source m b) map_i f s = G.smap_i (\_ x -> f x) s {-# INLINE map_i #-} -- | Apply a function to every element pushed to some sink, -- producing a new sink. map_o :: States () m => (a -> b) -> Sink m b -> m (Sink m a) map_o f s = G.smap_o (\_ x -> f x) s {-# INLINE map_o #-} -- Connecting ----------------------------------------------------------------- -- | Send the same data to two consumers. -- -- Given two argument sinks, yield a result sink. -- Pushing to the result sink causes the same element to be pushed to both -- argument sinks. dup_oo :: States () m => Sink m a -> Sink m a -> m (Sink m a) dup_oo = G.dup_oo {-# INLINE dup_oo #-} -- | Send the same data to two consumers. -- -- Given an argument source and argument sink, yield a result source. -- Pulling an element from the result source pulls from the argument source, -- and pushes that element to the sink, as well as returning it via the -- result source. dup_io :: States () m => Source m a -> Sink m a -> m (Source m a) dup_io = G.dup_io {-# INLINE dup_io #-} -- | Send the same data to two consumers. -- -- Like `dup_io` but with the arguments flipped. -- dup_oi :: States () m => Sink m a -> Source m a -> m (Source m a) dup_oi = G.dup_oi {-# INLINE dup_oi #-} -- | Connect an argument source to two result sources. -- -- Pulling from either result source pulls from the argument source. -- Each result source only gets the elements pulled at the time, -- so if one side pulls all the elements the other side won't get any. connect_i :: States () m => Source m a -> m (Source m a, Source m a) connect_i = G.connect_i {-# INLINE connect_i #-} -- Splitting ------------------------------------------------------------------ -- | Split the given number of elements from the head of a source, -- returning those elements in a list, and yielding a new source -- for the rest. head_i :: States () m => Int -> Source m a -> m ([a], Source m a) head_i len s0 = G.head_i len s0 () {-# INLINE head_i #-} -- | Peek at the given number of elements in the stream, -- returning a result stream that still produces them all. peek_i :: States () m => Int -> Source m a -> m ([a], Source m a) peek_i n s0 = do (s1, s2) <- G.connect_i s0 xs <- G.takeList1 n () s1 s3 <- G.prepend_i xs s2 return (xs, s3) {-# INLINE peek_i #-} -- Grouping ------------------------------------------------------------------- -- | From a stream of values which has consecutive runs of idential values, -- produce a stream of the lengths of these runs. -- -- Example: groups [4, 4, 4, 3, 3, 1, 1, 1, 4] = [3, 2, 3, 1] -- groups_i :: (Monad m, Eq a) => Source m a -> m (Source m Int) groups_i = G.groups_i {-# INLINE groups_i #-} -- Packing -------------------------------------------------------------------- -- | Given a stream of flags and a stream of values, produce a new stream -- of values where the corresponding flag was True. The length of the result -- is the length of the shorter of the two inputs. pack_ii :: Monad m => Source m Bool -> Source m a -> m (Source m a) pack_ii s0 s1 = G.pack_ii s0 s1 {-# INLINE pack_ii #-} -- Folding -------------------------------------------------------------------- -- | Segmented fold. folds_ii :: Monad m => (a -> a -> a) -> a -> Source m Int -> Source m a -> m (Source m a) folds_ii f z s0 s1 = G.folds_ii f z s0 s1 {-# INLINE folds_ii #-} -- Watching ------------------------------------------------------------------- -- | Apply a monadic function to every element pulled from a source -- producing a new source. watch_i :: Monad m => (a -> m ()) -> Source m a -> m (Source m a) watch_i f s0 = G.watch_i (\_ x -> f x) s0 {-# INLINE watch_i #-} -- | Pass elements to the provided action as they are pushed to the sink. watch_o :: Monad m => (a -> m ()) -> Sink m a -> m (Sink m a) watch_o f s0 = G.watch_o (\_ x -> f x) s0 {-# INLINE watch_o #-} -- | Like `watch` but doesn't pass elements to another sink. trigger_o :: Monad m => (a -> m ()) -> m (Sink m a) trigger_o f = G.trigger_o () (\_ x -> f x) {-# INLINE trigger_o #-} -- Ignorance ------------------------------------------------------------------ -- | A sink that ignores all incoming elements. -- -- This sink is strict in the elements, so they are demanded before being -- discarded. Haskell debugging thunks attached to the elements will be demanded. ignore_o :: Monad m => m (Sink m a) ignore_o = G.ignore_o () {-# INLINE ignore_o #-} -- | A sink that drops all data on the floor. -- -- This sink is non-strict in the elements. -- Haskell tracing thinks attached to the elements will *not* be demanded. abandon_o :: Monad m => m (Sink m a) abandon_o = G.abandon_o () {-# INLINE abandon_o #-}