{-
    Copyright 2010 Mario Blazevic

    This file is part of the Streaming Component Combinators (SCC) project.

    The SCC project is free software: you can redistribute it and/or modify it under the terms of the GNU General Public
    License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later
    version.

    SCC is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty
    of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License for more details.

    You should have received a copy of the GNU General Public License along with SCC.  If not, see
    <http://www.gnu.org/licenses/>.
-}

-- | This module defines 'Source' and 'Sink' types and 'pipe' functions that create them. The method 'get' on 'Source'
-- abstracts away 'Control.Concurrent.SCC.Coroutine.await', and the method 'put' on 'Sink' is a higher-level
-- abstraction of 'Control.Concurrent.SCC.Coroutine.yield'. With this arrangement, a single coroutine can yield values
-- to multiple sinks and await values from multiple sources with no need to change the
-- 'Control.Concurrent.SCC.Coroutine.Coroutine' functor; the only requirement is for each functor of the sources and
-- sinks the coroutine uses to be an 'Control.Concurrent.SCC.Coroutine.AncestorFunctor' of the coroutine's
-- functor. For example, coroutine /zip/ that takes two sources and one sink would be declared like this:
--
-- @
-- zip :: forall m a1 a2 a3 d x y. (Monad m, AncestorFunctor a1 d, AncestorFunctor a2 d, AncestorFunctor a3 d)
--        => Source m a1 x -> Source m a2 y -> Sink m a3 (x, y) -> Coroutine d m ()
-- @
--
-- Sources, sinks, and coroutines communicating through them are all created using the 'pipe' function or one of its
-- variants. They effectively split the current coroutine into a producer-consumer coroutine pair. The producer gets a
-- new 'Sink' to write to and the consumer a new 'Source' to read from, in addition to all the streams that are visible
-- in the original coroutine. The following function, for example, uses the /zip/ coroutine above to add together the
-- values from two Integer sources:
--
-- @
-- add :: forall m a1 a2 a3 d. (Monad m, AncestorFunctor a1 d, AncestorFunctor a2 d, AncestorFunctor a3 d)
--        => Source m a1 Integer -> Source m a2 Integer -> Sink m a3 Integer -> Coroutine d m ()
-- add source1 source2 sink = do pipe
--                                  (\\pairSink-> zip source1 source2 pairSink)              -- producer coroutine
--                                  (\\pairSource-> pourMap (uncurry (+)) pairSource sink)   -- consumer coroutine
--                               return ()
-- @

{-# LANGUAGE ScopedTypeVariables, Rank2Types, TypeFamilies, KindSignatures #-}

module Control.Concurrent.SCC.Streams
   (
    -- * Sink and Source types
    Sink(put, canPut), Source(get), SinkFunctor, SourceFunctor,
    -- * Various pipe functions
    pipe, pipeP, pipePS,
    -- * Utility functions
    get', getSuccess,
    liftSink, liftSource,
    consumeAndSuppress, tee, pour, pourMap, pourMapMaybe,
    getList, putList, putQueue,
    cond, whenNull
   )
where

import Control.Concurrent.Coroutine

import Control.Monad (when)
import Data.Foldable (toList)
import Data.Sequence (Seq, viewl)

-- Suspension functor used by the producer side of a pipe: the producer either yields a value of type /x/, or awaits a
-- 'Bool' answer.
type TryYield x = EitherFunctor (Yield x) (Await Bool)

-- Suspends with a 'Yield' carrying /x/; the continuation stored in that 'Yield' suspends again with an 'Await' and
-- returns the 'Bool' it is eventually resumed with.
tryYield :: forall m x. Monad m => x -> Coroutine (TryYield x) m Bool
tryYield x = suspend (LeftF (Yield x (suspend (RightF (Await return)))))

-- Suspends with an 'Await' only, without yielding anything, and returns the 'Bool' it is resumed with.
canYield :: forall m x. Monad m => Coroutine (TryYield x) m Bool
canYield = suspend (RightF (Await return))

-- | The functor of a consumer coroutine created by 'pipe': the enclosing functor /a/ extended with the ability to await
-- a @Maybe x@.
type SourceFunctor a x = EitherFunctor a (Await (Maybe x))

-- | The functor of a producer coroutine created by 'pipe': the enclosing functor /a/ extended with the ability to try
-- to yield an /x/.
type SinkFunctor a x = EitherFunctor a (TryYield x)

-- | A 'Sink' can be used to yield values from any nested `Coroutine` computation whose functor provably descends from
-- the functor /a/. It's the write-only end of a 'pipe' communication channel.
data Sink (m :: * -> *) a x =
   Sink
   {
   -- | Function 'put' tries to put a value into the given `Sink`. The intervening 'Coroutine' computations suspend up
   -- to the 'pipe' invocation that has created the argument sink. The result of 'put' indicates whether the operation
   -- succeeded.
   put :: forall d. (AncestorFunctor a d) => x -> Coroutine d m Bool,
   -- | Function 'canPut' checks if the argument `Sink` accepts values, i.e., whether a 'put' operation would succeed on
   -- the sink.
   canPut :: forall d. (AncestorFunctor a d) => Coroutine d m Bool
   }

-- | A 'Source' can be used to read values into any nested `Coroutine` computation whose functor provably descends from
-- the functor /a/. It's the read-only end of a 'pipe' communication channel.
newtype Source (m :: * -> *) a x =
   Source
   {
   -- | Function 'get' tries to get a value from the given 'Source' argument. The intervening 'Coroutine' computations
   -- suspend all the way to the 'pipe' function invocation that created the source. The function returns 'Nothing' if
   -- the argument source is empty.
   get :: forall d. (AncestorFunctor a d) => Coroutine d m (Maybe x)
   }

-- | Converts a 'Sink' on the ancestor functor /a/ into a sink on the descendant functor /d/.
liftSink :: forall m a d x. (Monad m, AncestorFunctor a d) => Sink m a x -> Sink m d x
liftSink s = Sink {put= liftOut . (put s :: x -> Coroutine d m Bool),
                   canPut= liftOut (canPut s :: Coroutine d m Bool)}

-- | Converts a 'Source' on the ancestor functor /a/ into a source on the descendant functor /d/.
liftSource :: forall m a d x. (Monad m, AncestorFunctor a d) => Source m a x -> Source m d x
liftSource s = Source {get= liftOut (get s :: Coroutine d m (Maybe x))}

-- | The 'pipe' function splits the computation into two concurrent parts, /producer/ and /consumer/. The /producer/ is
-- given a 'Sink' to put values into, and /consumer/ a 'Source' to get those values from. Once producer and consumer
-- both complete, 'pipe' returns their paired results.
pipe :: forall m a a1 a2 x r1 r2. (Monad m, Functor a, a1 ~ SinkFunctor a x, a2 ~ SourceFunctor a x)
        => (Sink m a1 x -> Coroutine a1 m r1) -> (Source m a2 x -> Coroutine a2 m r2) -> Coroutine a m (r1, r2)
-- The step combinator runs the first computation to completion before the second one.
pipe = pipeG (\ f mx my -> do {x <- mx; y <- my; f x y})

-- | The 'pipeP' function is equivalent to 'pipe', except the /producer/ and /consumer/ are run in parallel.
pipeP :: forall m a a1 a2 x r1 r2. (ParallelizableMonad m, Functor a, a1 ~ SinkFunctor a x, a2 ~ SourceFunctor a x)
         => (Sink m a1 x -> Coroutine a1 m r1) -> (Source m a2 x -> Coroutine a2 m r2) -> Coroutine a m (r1, r2)
pipeP = pipeG bindM2

-- | The 'pipePS' function acts either as 'pipeP' or as 'pipe', depending on the argument /parallel/.
pipePS :: forall m a a1 a2 x r1 r2. (ParallelizableMonad m, Functor a, a1 ~ SinkFunctor a x, a2 ~ SourceFunctor a x)
          => Bool -> (Sink m a1 x -> Coroutine a1 m r1) -> (Source m a2 x -> Coroutine a2 m r2)
          -> Coroutine a m (r1, r2)
pipePS parallel = if parallel then pipeP else pipe

-- | A generic version of 'pipe'. The first argument is used to combine two computation steps.
pipeG :: forall m a a1 a2 x r1 r2. (Monad m, Functor a, a1 ~ SinkFunctor a x, a2 ~ SourceFunctor a x)
         => (forall x y r. (x -> y -> m r) -> m x -> m y -> m r)
         -> (Sink m a1 x -> Coroutine a1 m r1) -> (Source m a2 x -> Coroutine a2 m r2) -> Coroutine a m (r1, r2)
pipeG run2 producer consumer =
   seesawNested run2 resolver (producer sink) (consumer source)
   where
      -- The sink handed to the producer: both operations suspend in the innermost ('local') layer of the producer's
      -- functor and are lifted to whatever descendant functor the caller runs in.
      sink = Sink {put= liftOut . (local . tryYield :: x -> Coroutine a1 m Bool),
                   canPut= liftOut (local canYield :: Coroutine a1 m Bool)} :: Sink m a1 x
      -- The source handed to the consumer: 'get' awaits a @Maybe x@ in the innermost layer of the consumer's functor.
      source = Source (liftOut (local await :: Coroutine a2 m (Maybe x))) :: Source m a2 x
      -- NOTE(review): the exact meaning of the three resolver fields is defined by 'SeesawResolver' elsewhere; the
      -- comments below describe only what each handler does with the suspensions it receives.
      resolver = SeesawResolver
         {
         -- Only the producer is resumed: a yielded value is dropped and the producer continues; a producer awaiting
         -- a 'Bool' is told 'False'.
         resumeLeft = \s -> case s of
            LeftF (Yield _ c) -> c
            RightF (Await c) -> c False,
         -- Only the consumer is resumed: it is given 'Nothing'.
         resumeRight = \(Await c) -> c Nothing,
         -- Both sides are suspended: a yielded value is handed to the awaiting consumer and both continue; a
         -- producer awaiting a 'Bool' is told 'True' and resumed on its own.
         resumeAny = \ resumeProducer _ resumeBoth s (Await cc) -> case s of
            LeftF (Yield x cp) -> resumeBoth cp (cc (Just x))
            RightF (Await cp) -> resumeProducer (cp True)
         }

-- | Function 'getSuccess' tries to get a value from the /source/ argument. If a value is obtained it is passed to the
-- success continuation; if the source returns 'Nothing', 'getSuccess' returns @()@ without invoking the continuation.
getSuccess :: forall m a d x . (Monad m, AncestorFunctor a d)
              => Source m a x
              -> (x -> Coroutine d m ()) {- ^ Success continuation -}
              -> Coroutine d m ()
getSuccess source succeed = get source >>= maybe (return ()) succeed

-- | Function 'get'' assumes that the argument source is not empty and returns the value the source yields. If the
-- source is empty, the function throws an error.
get' :: forall m a d x . (Monad m, AncestorFunctor a d) => Source m a x -> Coroutine d m x
get' source = get source >>= maybe (error "get' failed") return

-- | 'pour' copies all data from the /source/ argument into the /sink/ argument, as long as there is anything to copy
-- and the sink accepts it.
pour :: forall m a1 a2 d x . (Monad m, AncestorFunctor a1 d, AncestorFunctor a2 d)
        => Source m a1 x -> Sink m a2 x -> Coroutine d m ()
pour source sink = fill'
   where
      -- The sink is queried before every read, so no value is taken from the source once the sink refuses input.
      fill' = canPut sink >>= flip when (getSuccess source (\x-> put sink x >> fill'))

-- | 'pourMap' is like 'pour' that applies the function /f/ to each argument before passing it into the /sink/.
pourMap :: forall m a1 a2 d x y . (Monad m, AncestorFunctor a1 d, AncestorFunctor a2 d)
           => (x -> y) -> Source m a1 x -> Sink m a2 y -> Coroutine d m ()
pourMap f source sink = loop
   where
      loop = canPut sink
             >>= flip when (get source
                            >>= maybe (return ()) (\x-> put sink (f x) >> loop))

-- | 'pourMapMaybe' is to 'pourMap' like 'Data.Maybe.mapMaybe' is to 'Data.List.map'.
pourMapMaybe :: forall m a1 a2 d x y . (Monad m, AncestorFunctor a1 d, AncestorFunctor a2 d)
                => (x -> Maybe y) -> Source m a1 x -> Sink m a2 y -> Coroutine d m ()
pourMapMaybe f source sink = loop
   where
      -- Values that /f/ maps to 'Nothing' are consumed from the source but nothing is put into the sink for them.
      loop = canPut sink
             >>= flip when (get source
                            >>= maybe (return ()) (\x-> maybe (return False) (put sink) (f x) >> loop))

-- | 'tee' is similar to 'pour' except it distributes every input value from the /source/ argument into both /sink1/
-- and /sink2/.
tee :: forall m a1 a2 a3 d x . (Monad m, AncestorFunctor a1 d, AncestorFunctor a2 d, AncestorFunctor a3 d)
       => Source m a1 x -> Sink m a2 x -> Sink m a3 x -> Coroutine d m ()
tee source sink1 sink2 = distribute
   where
      -- Copying stops as soon as either sink refuses input or the source runs dry.
      distribute = do c1 <- canPut sink1
                      c2 <- canPut sink2
                      when (c1 && c2)
                         (get source >>= maybe (return ()) (\x-> put sink1 x >> put sink2 x >> distribute))

-- | 'putList' puts entire list into its /sink/ argument, as long as the sink accepts it. The remainder that wasn't
-- accepted by the sink is the result value.
putList :: forall m a d x. (Monad m, AncestorFunctor a d) => [x] -> Sink m a x -> Coroutine d m [x]
putList [] _ = return []
-- On a failed 'put' the rejected element /x/ is included in the returned remainder.
putList l@(x:rest) sink = put sink x >>= cond (putList rest sink) (return l)

-- | 'getList' returns the list of all values generated by the source.
getList :: forall m a d x. (Monad m, AncestorFunctor a d) => Source m a x -> Coroutine d m [x]
getList source = getList' return
   where
      -- /f/ is a continuation that prepends all the values read so far to the final tail.
      getList' f = get source >>= maybe (f []) (\x-> getList' (f . (x:)))

-- | 'consumeAndSuppress' consumes the entire source ignoring the values it generates.
consumeAndSuppress :: forall m a d x. (Monad m, AncestorFunctor a d) => Source m a x -> Coroutine d m ()
consumeAndSuppress source = get source >>= maybe (return ()) (const (consumeAndSuppress source))

-- | A utility function wrapping if-then-else, useful for handling monadic truth values
cond :: a -> a -> Bool -> a
cond x y test = if test then x else y

-- | A utility function, useful for handling monadic list values where empty list means success
whenNull :: forall a m. Monad m => m [a] -> [a] -> m [a]
whenNull action list = if null list then action else return list

-- | Like 'putList', except it puts the contents of the given 'Data.Sequence.Seq' into the sink.
putQueue :: forall m a d x. (Monad m, AncestorFunctor a d) => Seq x -> Sink m a x -> Coroutine d m [x]
putQueue q sink = putList (toList (viewl q)) sink