module Data.Repa.Flow.Generic.Connect
        ( -- * Dup
          dup_oo
        , dup_io
        , dup_oi

          -- * Connect
        , connect_i

          -- * Funnel
        , funnel_i
        , funnel_o)
where
import Data.Repa.Flow.Generic.Base
import Control.Monad
import Prelude                                  as P
#include "repa-flow.h"


-- Dup ------------------------------------------------------------------------
-- | Duplicate a stream of pushed elements into two sinks.
--
-- The result bundle has the smaller arity of the two argument bundles.
-- Each element pushed to stream @i@ of the result is pushed to stream @i@
-- of the first argument and then to stream @i@ of the second argument.
-- Ejecting stream @i@ of the result ejects both argument streams,
-- in the same order.
dup_oo  :: (Ord i, States i m)
        => Sinks i m a -> Sinks i m a -> m (Sinks i m a)
dup_oo (Sinks arityA pushA ejectA) (Sinks arityB pushB ejectB)
 = return (Sinks arity push_both eject_both)
 where
        -- Only the streams present in both bundles are exposed.
        arity = min arityA arityB

        push_both ix val
         = do   pushA ix val
                pushB ix val
        {-# INLINE push_both #-}

        eject_both ix
         = do   ejectA ix
                ejectB ix
        {-# INLINE eject_both #-}
{-# INLINE_FLOW dup_oo #-}


-- | Tap a source, copying everything that is pulled from it into a sink.
--
-- The result bundle has the smaller arity of the two argument bundles.
-- Pulling from stream @i@ of the result pulls from stream @i@ of the
-- argument source. A pulled element is first handed to the consumer
-- and then pushed to stream @i@ of the sink. When the source stream is
-- exhausted the consumer's eject action runs first, and then stream @i@
-- of the sink is ejected.
--
dup_io  :: (Ord i, Monad m)
        => Sources i m a -> Sinks i m a -> m (Sources i m a)
dup_io (Sources aritySrc pullSrc) (Sinks arityDst pushDst ejectDst)
 = return (Sources (min aritySrc arityDst) pull_tee)
 where
        pull_tee ix eat eject
         = let  eat_tee val
                 = do   eat val
                        pushDst ix val
                {-# INLINE eat_tee #-}

                eject_tee
                 = do   eject
                        ejectDst ix
                {-# INLINE eject_tee #-}

           in   pullSrc ix eat_tee eject_tee
        {-# INLINE pull_tee #-}
{-# INLINE_FLOW dup_io #-}


-- | Tap a source, copying everything that is pulled from it into a sink.
--
-- This is `dup_io` taking the sink first and the source second.
--
dup_oi  :: (Ord i, Monad m)
        => Sinks i m a -> Sources i m a -> m (Sources i m a)
dup_oi snk src
 = dup_io src snk
{-# INLINE_FLOW dup_oi #-}


-- Connect --------------------------------------------------------------------
-- | Connect an argument source to two result sources.
--
-- Pulling from either result source pulls from the argument source.
-- Each result source only gets the elements pulled at the time,
-- so if one side pulls all the elements the other side won't get any.
--
connect_i
        :: States  i m
        => Sources i m a -> m (Sources i m a, Sources i m a)

connect_i (Sources n pullX)
 = do
        -- One slot per stream, holding the outcome of the most recent pull:
        -- 'Just' the element, or 'Nothing' if the stream was ejected.
        refs    <- newRefs n Nothing

        -- IMPORTANT: the pump function is set to NOINLINE so that pullX
        -- will not be inlined into both consumers. We do not want to
        -- duplicate that code for both result sources. Instead, calling
        -- pump writes its element into a ref, and then only the code
        -- that reads the ref is duplicated.
        let pump_connect i
             = pullX i pump_eat pump_eject
             where
                pump_eat !x
                 = writeRefs refs i (Just x)
                {-# INLINE pump_eat #-}

                pump_eject
                 = writeRefs refs i Nothing
                {-# INLINE pump_eject #-}
            {-# NOINLINE pump_connect #-}

        -- Pull one step from the shared source, then dispatch on what
        -- the pump left in the slot for this stream.
        let pull_connect i eat eject
             = do pump_connect i
                  mx <- readRefs refs i
                  case mx of
                   Just x  -> eat x
                   Nothing -> eject
            {-# INLINE pull_connect #-}

        return  ( Sources n pull_connect
                , Sources n pull_connect )
{-# INLINE_FLOW connect_i #-}


-- Funneling ------------------------------------------------------------------
-- | Given a bundle of sources containing several streams, produce a new
--   bundle containing a single stream that gets data from the former.
--
--   Streams from the source are consumed in their natural order,
--   and a complete stream is consumed before moving onto the next one.
--
-- @
-- > import Data.Repa.Flow.Generic
-- > toList1 () =<< funnel_i =<< fromList (3 :: Int) [11, 22, 33]
-- [11,22,33,11,22,33,11,22,33]
-- @
funnel_i :: (States i m, States () m)
         => Sources i m a -> m (Sources () m a)
funnel_i (Sources n pullX)
 = do
        -- Ref to hold the current stream index.
        refCur  <- newRefs () first

        let pull_funnel _ eat eject
             = do i <- readRefs refCur ()
                  pullX i (eat_funnel i) (eject_funnel i)
             where
                -- Elements are passed straight through; the index of the
                -- stream they came from is dropped.
                eat_funnel _ x
                 = eat x
                {-# INLINE eat_funnel #-}

                -- The current argument stream is finished. If there is a
                -- following stream, remember it as the current one and
                -- retry the pull there; otherwise the result stream is
                -- finished as well.
                eject_funnel i
                 = case next i n of
                        Nothing -> eject
                        Just i' -> do
                                writeRefs refCur () i'
                                pullX i' (eat_funnel i') (eject_funnel i')
                {-# INLINE eject_funnel #-}

        return $ Sources () pull_funnel
-- Use the INLINE_FLOW macro like every other export of this module, so
-- that the inlining policy configured in repa-flow.h also applies here.
{-# INLINE_FLOW funnel_i #-}


-- | Given a bundle of sinks consisting of a single stream, produce a new
--   bundle of the given arity that sends all data to the former, ignoring
--   the stream index.
--
--   The argument stream is ejected only when all of the streams in the
--   result bundle have been ejected.
--
--   * Using this function in conjunction with parallel operators like
--     `drainP` introduces non-determinism. Elements pushed to different
--     streams in the result bundle could enter the single stream in the
--     argument bundle in any order.
--
-- @
-- > import Data.Repa.Flow.Generic
-- > import Data.Repa.Array.Material
-- > import Data.Repa.Nice
--
-- > let things = [(0 :: Int, \"foo\"), (1, \"bar\"), (2, \"baz\")]
-- > result \<- capture_o B () (\\k -> funnel_o 4 k >>= pushList things)
-- > nice result
-- [((),\"foo\"),((),\"bar\"),((),\"baz\")]
-- @
--
funnel_o :: States i m
         => i -> Sinks () m a -> m (Sinks i m a)
funnel_o nSinks (Sinks _ pushX ejectX)
 = do
        -- Refs to track which streams have been ejected.
        refs    <- newRefs nSinks False

        -- Push all received data into the single stream of the
        -- argument bundle.
        let push_funnel _ x
             = pushX () x
            {-# INLINE push_funnel #-}

        -- When all the result streams have been ejected,
        -- eject the argument stream.
        let eject_funnel i
             = do
                  -- RACE: If two concurrent processes eject the final two
                  -- streams then they will both think they were the last
                  -- one, and eject the single argument stream. This is ok
                  -- as we allow the argument sink to be ejected multiple
                  -- times.
                  --
                  -- See docs of `Sinks` type in "Data.Repa.Flow.Generic.Base".
                  --
                  writeRefs refs i True
                  done <- foldRefsM (&&) True refs
                  when done $ ejectX ()
            {-# INLINE eject_funnel #-}

        return $ Sinks nSinks push_funnel eject_funnel
{-# INLINE_FLOW funnel_o #-}