module Control.Concurrent.SCC.Streams
(
Sink, Source, SinkFunctor, SourceFunctor, AncestorFunctor,
pipe, pipeP, pipeG, nullSink, nullSource,
get, getWith, peek, put, tryPut,
liftSink, liftSource,
pour, tee, teeSink, teeSource,
getList, putList, putQueue,
getTicked, getWhile, getUntil,
pourTicked, pourWhile, pourUntil,
mapSink, mapStream,
mapMaybeStream, concatMapStream,
mapStreamChunks, foldStream, mapAccumStream, concatMapAccumStream, partitionStream,
mapMStream, mapMStream_, mapMStreamChunks_,
filterMStream, foldMStream, foldMStream_, unfoldMStream, unmapMStream_, unmapMStreamChunks_,
zipWithMStream, parZipWithMStream,
)
where
import qualified Control.Monad
import Control.Monad (liftM, when, unless, foldM)
import Data.Foldable (toList)
import Data.List (foldl', mapAccumL)
import Data.Maybe (mapMaybe)
import Data.Sequence (Seq, viewl)

import Control.Cofunctor.Ticker
import Control.Monad.Coroutine
import Control.Monad.Coroutine.Nested (AncestorFunctor(..), liftAncestor, seesawNested)
import Control.Monad.Coroutine.SuspensionFunctors (EitherFunctor(..), Request, request, liftedLazyTickerRequestResolver)
import Control.Monad.Parallel (MonadParallel(..))
-- | Suspension functor of the coroutine that reads from the 'Source' end of a pipe built by 'pipeG':
-- either a suspension of the parent functor @a@, or a 'Request' that sends a 'Ticker' and is answered
-- with a chunk of items paired with either a single item or a 'Ticker'.
type SourceFunctor a x = EitherFunctor a (Request (Ticker x) ([x], Either x (Ticker x)))
-- | Suspension functor of the coroutine that writes to the 'Sink' end of a pipe built by 'pipeG':
-- either a suspension of the parent functor @a@, or a 'Request' that sends a chunk of items and is
-- answered with another list of items.
type SinkFunctor a x = EitherFunctor a (Request [x] [x])
-- | A sink of items of type @x@. Its single operation can be run in any coroutine whose suspension
-- functor @d@ satisfies @AncestorFunctor a d@.
newtype Sink (m :: * -> *) a x =
Sink
{
-- | Writes a chunk of items to the sink and returns a list of items.
-- NOTE(review): judging by 'tryPut' and 'mapSink' in this module, the returned list is presumably
-- the part of the chunk the sink did not accept; confirm against the request resolver.
putChunk :: forall d. AncestorFunctor a d => [x] -> Coroutine d m [x]
}
-- | A source of items of type @x@. Its single operation can be run in any coroutine whose suspension
-- functor @d@ satisfies @AncestorFunctor a d@.
newtype Source (m :: * -> *) a x =
Source
{
-- | Reads a chunk of items under the control of the given 'Ticker'. The result pairs the chunk with
-- either a single item or a 'Ticker'.
-- NOTE(review): judging by 'peek', 'getUntil' and 'getTicked' in this module, @Left x@ is presumably
-- the next item that was not consumed and @Right t@ the ticker to continue with; confirm.
foldChunk :: forall d. AncestorFunctor a d => Ticker x -> Coroutine d m ([x], Either x (Ticker x))
}
-- | A 'Sink' whose 'putChunk' ignores its argument and always returns the empty list.
nullSink :: forall m a x. Monad m => Sink m a x
nullSink = Sink {putChunk = \_ -> return []}
-- | A 'Source' whose 'foldChunk' always returns an empty chunk together with the unchanged ticker.
nullSource :: forall m a x. Monad m => Source m a x
nullSource = Source {foldChunk = \ticker -> return ([], Right ticker)}
-- | Converts a 'Sink' bound to the functor @a@ into a 'Sink' bound to a descendant functor @d@.
-- The original 'putChunk' is instantiated at @d@ (hence the type annotation) and its result is
-- passed through 'liftAncestor'.
liftSink :: forall m a d x. (Monad m, AncestorFunctor a d) => Sink m a x -> Sink m d x
liftSink s = Sink {putChunk= liftAncestor . (putChunk s :: [x] -> Coroutine d m [x])}
-- | Converts a 'Source' bound to the functor @a@ into a 'Source' bound to a descendant functor @d@.
-- The original 'foldChunk' is instantiated at @d@ (hence the type annotation) and its result is
-- passed through 'liftAncestor'.
liftSource :: forall m a d x. (Monad m, AncestorFunctor a d) => Source m a x -> Source m d x
liftSource s = Source {foldChunk= liftAncestor . (foldChunk s :: Ticker x -> Coroutine d m ([x], Either x (Ticker x)))}
-- | Connects a producer coroutine to a consumer coroutine through a fresh 'Sink'/'Source' pair and
-- returns both results. This is 'pipeG' specialised to 'sequentialBinder'.
pipe :: forall m a a1 a2 x r1 r2. (Monad m, Functor a, a1 ~ SinkFunctor a x, a2 ~ SourceFunctor a x) =>
        (Sink m a1 x -> Coroutine a1 m r1) -> (Source m a2 x -> Coroutine a2 m r2) -> Coroutine a m (r1, r2)
pipe producer consumer = pipeG sequentialBinder producer consumer
-- | Connects a producer coroutine to a consumer coroutine through a fresh 'Sink'/'Source' pair and
-- returns both results. This is 'pipeG' specialised to 'bindM2' of the 'MonadParallel' class.
pipeP :: forall m a a1 a2 x r1 r2. (MonadParallel m, Functor a, a1 ~ SinkFunctor a x, a2 ~ SourceFunctor a x) =>
         (Sink m a1 x -> Coroutine a1 m r1) -> (Source m a2 x -> Coroutine a2 m r2) -> Coroutine a m (r1, r2)
pipeP producer consumer = pipeG bindM2 producer consumer
-- | General pipe constructor, parameterised by the 'PairBinder' used to combine the two coroutines.
--
-- It builds a 'Sink' and a 'Source' whose operations issue a 'request' wrapped in 'RightF', applies
-- the producer to the sink and the consumer to the source, and runs the two with 'seesawNested'.
-- The consumer is passed to 'seesawNested' first, so the resulting pair is swapped back to put the
-- producer's result @r1@ first.
pipeG :: forall m a a1 a2 x r1 r2. (Monad m, Functor a, a1 ~ SinkFunctor a x, a2 ~ SourceFunctor a x) =>
PairBinder m -> (Sink m a1 x -> Coroutine a1 m r1) -> (Source m a2 x -> Coroutine a2 m r2)
-> Coroutine a m (r1, r2)
pipeG run2 producer consumer =
-- uncurry (flip (,)) swaps the components of the pair
liftM (uncurry (flip (,))) $
seesawNested run2 (liftedLazyTickerRequestResolver RightF) (consumer source) (producer sink)
-- An empty chunk is answered immediately with [] and never turned into a request.
where sink = Sink {putChunk= \xs-> if null xs then return []
else (liftAncestor (mapSuspension RightF (request xs) :: Coroutine a1 m [x]))}
source = Source {foldChunk= \t-> liftAncestor (mapSuspension RightF (request t)
:: Coroutine a2 m ([x], Either x (Ticker x)))}
-- | Reads a single item from the source using the 'tickOne' ticker.
--
-- Returns @Just@ the first item of the chunk that 'foldChunk' delivers, or 'Nothing' when that
-- chunk is empty. The chunk is inspected by pattern matching, so no partial function such as
-- 'head' is involved.
get :: forall m a d x. (Monad m, AncestorFunctor a d) => Source m a x -> Coroutine d m (Maybe x)
get source = liftM (firstItem . fst) (foldChunk source tickOne)
  where firstItem (item : _) = Just item
        firstItem []         = Nothing
-- | Queries the source with the 'tickNone' ticker and inspects only the second component of the
-- answer: a @Left x@ is reported as @Just x@, a @Right@ ticker as 'Nothing'.
peek :: forall m a d x. (Monad m, AncestorFunctor a d) => Source m a x -> Coroutine d m (Maybe x)
peek source = liftM (upcoming . snd) (foldChunk source tickNone)
  where upcoming (Left x)  = Just x
        upcoming (Right _) = Nothing
-- | Collects items from the source into a list; this is 'getTicked' driven by the 'tickAll' ticker.
getList :: forall m a d x. (Monad m, AncestorFunctor a d) => Source m a x -> Coroutine d m [x]
getList source = getTicked tickAll source
-- | Tries to 'get' one item from the source. If an item is obtained it is handed to the consumer;
-- otherwise nothing happens.
getWith :: forall m a d x. (Monad m, AncestorFunctor a d) => (x -> Coroutine d m ()) -> Source m a x -> Coroutine d m ()
getWith consumer source = do
  item <- get source
  case item of
    Nothing -> return ()
    Just x  -> consumer x
-- | Reads chunks from the source under the control of the given ticker and returns their
-- concatenation.
--
-- Reading continues with the ticker returned by the source for as long as the chunk just read is
-- non-empty and the source answers with @Right@; an empty chunk or a @Left@ answer ends the loop.
getTicked :: forall m a d x. (Monad m, AncestorFunctor a d) => Ticker x -> Source m a x -> Coroutine d m [x]
getTicked ticker source = collect return ticker
  where -- 'finish' prepends everything read so far and delivers the final result
        collect finish t = do
          (chunk, outcome) <- foldChunk source t
          case outcome of
            Right t' | not (null chunk) -> collect (finish . (chunk ++)) t'
            _                           -> finish chunk
-- | Collects items from the source using 'getTicked' with the ticker @tickWhile predicate@.
getWhile :: forall m a d x. (Monad m, AncestorFunctor a d) => (x -> Bool) -> Source m a x -> Coroutine d m [x]
getWhile predicate source = getTicked (tickWhile predicate) source
-- | Repeatedly reads chunks with the ticker @tickUntil f@ and returns their concatenation, paired
-- with the item from a @Left@ answer if the source gave one.
--
-- A @Left x@ answer ends the loop with @Just x@; a @Right@ answer with an empty chunk ends it with
-- 'Nothing'; a @Right@ answer with a non-empty chunk keeps reading.
getUntil :: forall m a d x. (Monad m, AncestorFunctor a d) =>
            (x -> Bool) -> Source m a x -> Coroutine d m ([x], Maybe x)
getUntil f source = gather id
  where -- 'prefix' prepends everything read in earlier iterations
        gather prefix = do
          (chunk, outcome) <- foldChunk source (tickUntil f)
          case outcome of
            Left stopper -> return (prefix chunk, Just stopper)
            Right _
              | null chunk -> return (prefix chunk, Nothing)
              | otherwise  -> gather (prefix . (chunk ++))
-- | Copies chunks from the source to the sink until 'getChunk' returns an empty chunk.
-- Whatever 'putChunk' returns is ignored.
pour :: forall m a1 a2 d x . (Monad m, AncestorFunctor a1 d, AncestorFunctor a2 d)
        => Source m a1 x -> Sink m a2 x -> Coroutine d m ()
pour source sink = transfer
  where transfer = do
          chunk <- getChunk source
          unless (null chunk) (putChunk sink chunk >> transfer)
-- | Copies chunks from the source to the sink under the control of the given ticker.
--
-- Each non-empty chunk is written to the sink; copying then continues with the ticker from a
-- @Right@ answer and stops on a @Left@ answer. An empty chunk stops the loop immediately.
pourTicked :: forall m a1 a2 d x . (Monad m, AncestorFunctor a1 d, AncestorFunctor a2 d)
              => Ticker x -> Source m a1 x -> Sink m a2 x -> Coroutine d m ()
pourTicked ticker source sink = transfer ticker
  where transfer t = do
          (chunk, next) <- foldChunk source t
          if null chunk
            then return ()
            else do
              _ <- putChunk sink chunk
              case next of
                Left _   -> return ()
                Right t' -> transfer t'
-- | Copies items from the source to the sink using 'pourTicked' with the ticker @tickWhile f@.
pourWhile :: forall m a1 a2 d x . (Monad m, AncestorFunctor a1 d, AncestorFunctor a2 d)
             => (x -> Bool) -> Source m a1 x -> Sink m a2 x -> Coroutine d m ()
pourWhile f source sink = pourTicked (tickWhile f) source sink
-- | Copies chunks read with the ticker @tickUntil f@ from the source to the sink.
--
-- The result is @Just x@ when the source answered with @Left x@, and 'Nothing' when the loop ended
-- on an empty chunk accompanied by a @Right@ answer. A non-empty chunk is always written to the
-- sink before the answer is acted upon.
pourUntil :: forall m a1 a2 d x . (Monad m, AncestorFunctor a1 d, AncestorFunctor a2 d)
             => (x -> Bool) -> Source m a1 x -> Sink m a2 x -> Coroutine d m (Maybe x)
pourUntil f source sink = transfer
  where transfer = do
          (chunk, next) <- foldChunk source (tickUntil f)
          let halted = either Just (const Nothing) next
          if null chunk
            then return halted
            else putChunk sink chunk >> maybe transfer (return . Just) halted
-- | Applies @f@ to every item read from the source and writes the results to the sink, one chunk
-- at a time, until 'getChunk' returns an empty chunk.
mapStream :: forall m a1 a2 d x y . (Monad m, AncestorFunctor a1 d, AncestorFunctor a2 d)
             => (x -> y) -> Source m a1 x -> Sink m a2 y -> Coroutine d m ()
mapStream f source sink = relay
  where relay = do
          chunk <- getChunk source
          unless (null chunk) (putChunk sink (map f chunk) >> relay)
-- | Adapts a sink of @y@ items into a sink of @x@ items by applying @f@ to every item written.
--
-- The list returned by the underlying 'putChunk' is translated back to the input type by length:
-- if it has @n@ items, the last @n@ items of the original chunk are returned.
mapSink :: forall m a x y. Monad m => (x -> y) -> Sink m a y -> Sink m a x
mapSink f sink = Sink {putChunk = mappedPut}
  where mappedPut :: forall d. AncestorFunctor a d => [x] -> Coroutine d m [x]
        mappedPut xs = do
          rejected <- putChunk sink (map f xs)
          return (keepLast (length rejected) xs)
        -- keepLast n list: the final n items of list (the whole list when it is shorter than n)
        keepLast :: forall z. Int -> [z] -> [z]
        keepLast 0 _    = []
        keepLast n list = drop (length list - n) list
-- | Applies @f@ to every item read from the source and writes the @Just@ results to the sink,
-- chunk by chunk. The list returned by 'putChunk' is ignored.
mapMaybeStream :: forall m a1 a2 d x y . (Monad m, AncestorFunctor a1 d, AncestorFunctor a2 d)
                  => (x -> Maybe y) -> Source m a1 x -> Sink m a2 y -> Coroutine d m ()
mapMaybeStream f source sink = mapMStreamChunks_ forward source
  where forward chunk = putChunk sink (mapMaybe f chunk) >> return ()
-- | Applies @f@ to every item read from the source and writes the concatenated results to the
-- sink, one source chunk at a time, until 'getChunk' returns an empty chunk.
concatMapStream :: forall m a1 a2 d x y . (Monad m, AncestorFunctor a1 d, AncestorFunctor a2 d)
                   => (x -> [y]) -> Source m a1 x -> Sink m a2 y -> Coroutine d m ()
concatMapStream f source sink = relay
  where relay = do
          chunk <- getChunk source
          unless (null chunk) (putChunk sink (concatMap f chunk) >> relay)
-- | Stream analogue of 'mapAccumL': threads an accumulator through all items read from the source,
-- writes the mapped items to the sink chunk by chunk, and returns the final accumulator.
mapAccumStream :: forall m a1 a2 d x y acc . (Monad m, AncestorFunctor a1 d, AncestorFunctor a2 d)
                  => (acc -> x -> (acc, y)) -> acc -> Source m a1 x -> Sink m a2 y -> Coroutine d m acc
mapAccumStream f acc source sink = foldMStreamChunks step acc source
  where step a xs = case mapAccumL f a xs of
          (a', ys) -> putChunk sink ys >> return a'
-- | Like 'mapAccumStream', except that @f@ produces a list of output items for every input item;
-- the lists produced for one source chunk are concatenated and written to the sink together.
-- Returns the final accumulator.
concatMapAccumStream :: forall m a1 a2 d x y acc . (Monad m, AncestorFunctor a1 d, AncestorFunctor a2 d)
                        => (acc -> x -> (acc, [y])) -> acc -> Source m a1 x -> Sink m a2 y -> Coroutine d m acc
concatMapAccumStream f acc source sink = foldMStreamChunks step acc source
  where step a xs = case expand a xs of
          (a', ys) -> putChunk sink ys >> return a'
        -- threads the state left to right through one chunk, concatenating the outputs
        expand s []         = (s, [])
        expand s (x : rest) =
          let (s', here)   = f s x
              (s'', later) = expand s' rest
          in (s'', here ++ later)
-- | Applies @f@ to every whole chunk read from the source and writes the result to the sink with
-- 'putList', until 'getChunk' returns an empty chunk.
mapStreamChunks :: forall m a1 a2 d x y . (Monad m, AncestorFunctor a1 d, AncestorFunctor a2 d)
                   => ([x] -> [y]) -> Source m a1 x -> Sink m a2 y -> Coroutine d m ()
mapStreamChunks f source sink = relay
  where relay = do
          chunk <- getChunk source
          unless (null chunk) (putList (f chunk) sink >> relay)
-- | Monadic variant of 'mapStream': runs @f@ on every item of each chunk read from the source
-- (via 'mapM') and writes the results of the chunk to the sink, until 'getChunk' returns an empty
-- chunk.
mapMStream :: forall m a1 a2 d x y . (Monad m, AncestorFunctor a1 d, AncestorFunctor a2 d)
              => (x -> Coroutine d m y) -> Source m a1 x -> Sink m a2 y -> Coroutine d m ()
mapMStream f source sink = relay
  where relay = do
          chunk <- getChunk source
          unless (null chunk) $ do
            results <- mapM f chunk
            _ <- putChunk sink results
            relay
-- | Runs @f@ on every item read from the source, discarding the results. Implemented as
-- 'mapMStreamChunks_' with 'Control.Monad.mapM_' applied to each chunk.
mapMStream_ :: forall m a d x . (Monad m, AncestorFunctor a d)
               => (x -> Coroutine d m ()) -> Source m a x -> Coroutine d m ()
mapMStream_ f source = mapMStreamChunks_ (Control.Monad.mapM_ f) source
-- | Runs @f@ on every chunk read from the source until 'getChunk' returns an empty chunk.
-- @f@ is never applied to an empty chunk.
mapMStreamChunks_ :: forall m a d x . (Monad m, AncestorFunctor a d)
                     => ([x] -> Coroutine d m ()) -> Source m a x -> Coroutine d m ()
mapMStreamChunks_ f source = consume
  where consume = do
          chunk <- getChunk source
          unless (null chunk) (f chunk >> consume)
-- | Runs the monadic predicate @f@ on every item read from the source and writes to the sink, with
-- 'put', exactly those items for which it yields 'True'.
filterMStream :: forall m a1 a2 d x . (Monad m, AncestorFunctor a1 d, AncestorFunctor a2 d)
                 => (x -> Coroutine d m Bool) -> Source m a1 x -> Sink m a2 x -> Coroutine d m ()
filterMStream f source sink = mapMStream_ keepIfAccepted source
  where keepIfAccepted x = do
          accepted <- f x
          when accepted (put sink x)
-- | Left fold over all items read from the source; returns the final accumulator once 'getChunk'
-- delivers an empty chunk.
--
-- The accumulator is reduced strictly: every chunk is folded with 'foldl'' and the result is
-- forced (to weak head normal form) before the next chunk is requested. A lazy fold here would
-- build a chain of suspended computations that keeps every chunk of the stream alive until the
-- final result is demanded.
foldStream :: forall m a d x acc . (Monad m, AncestorFunctor a d)
              => (acc -> x -> acc) -> acc -> Source m a x -> Coroutine d m acc
foldStream f acc source = loop acc
  where loop s = do
          chunk <- getChunk source
          if null chunk
            then return s
            else loop $! foldl' f s chunk  -- force the new accumulator before reading on
-- | Monadic left fold over all items read from the source: each chunk is folded with 'foldM',
-- and the final accumulator is returned once 'getChunk' delivers an empty chunk.
foldMStream :: forall m a d x acc . (Monad m, AncestorFunctor a d)
               => (acc -> x -> Coroutine d m acc) -> acc -> Source m a x -> Coroutine d m acc
foldMStream f acc source = go acc
  where go a = do
          chunk <- getChunk source
          if null chunk
            then return a
            else foldM f a chunk >>= go
-- | Same as 'foldMStream', but the final accumulator is discarded.
foldMStream_ :: forall m a d x acc . (Monad m, AncestorFunctor a d)
                => (acc -> x -> Coroutine d m acc) -> acc -> Source m a x -> Coroutine d m ()
foldMStream_ f acc source = do
  _ <- foldMStream f acc source
  return ()
-- | Monadic left fold over the chunks read from the source: @f@ receives the accumulator and a
-- whole non-empty chunk. The final accumulator is returned once 'getChunk' delivers an empty chunk.
foldMStreamChunks :: forall m a d x acc . (Monad m, AncestorFunctor a d)
                     => (acc -> [x] -> Coroutine d m acc) -> acc -> Source m a x -> Coroutine d m acc
foldMStreamChunks f acc source = go acc
  where go a = do
          chunk <- getChunk source
          if null chunk
            then return a
            else f a chunk >>= go
-- | Monadic unfold into a sink: runs @f@ on the current seed; a @Just (x, seed')@ result writes
-- @x@ to the sink with 'put' and continues with @seed'@, while 'Nothing' ends the loop and returns
-- the current seed.
unfoldMStream :: forall m a d x acc . (Monad m, AncestorFunctor a d)
                 => (acc -> Coroutine d m (Maybe (x, acc))) -> acc -> Sink m a x -> Coroutine d m acc
unfoldMStream f acc sink = go acc
  where go a = do
          step <- f a
          case step of
            Nothing      -> return a
            Just (x, a') -> put sink x >> go a'
-- | Runs the action @f@ repeatedly, writing every @Just@ result to the sink with 'put', and stops
-- the first time @f@ yields 'Nothing'.
unmapMStream_ :: forall m a d x . (Monad m, AncestorFunctor a d)
                 => Coroutine d m (Maybe x) -> Sink m a x -> Coroutine d m ()
unmapMStream_ f sink = feed
  where feed = do
          item <- f
          case item of
            Nothing -> return ()
            Just x  -> put sink x >> feed
-- | Runs the action @f@ repeatedly, writing every chunk it produces to the sink.
--
-- The loop stops when @f@ produces an empty chunk, or when 'putChunk' returns a non-empty list.
unmapMStreamChunks_ :: forall m a d x . (Monad m, AncestorFunctor a d)
                       => Coroutine d m [x] -> Sink m a x -> Coroutine d m ()
unmapMStreamChunks_ f sink = feed >> return ()
  where feed = do
          chunk <- f
          if null chunk
            then return []
            else do
              rejected <- putChunk sink chunk
              if null rejected then feed else return rejected
-- | Splits the items read from the source between two sinks: items satisfying @f@ go to @true@,
-- the others to @false@.
--
-- Every source chunk is cut into maximal runs of consecutive items with the same verdict, and each
-- run is written to its sink with a single 'putChunk', in the original order.
partitionStream :: forall m a1 a2 a3 d x . (Monad m, AncestorFunctor a1 d, AncestorFunctor a2 d, AncestorFunctor a3 d)
                   => (x -> Bool) -> Source m a1 x -> Sink m a2 x -> Sink m a3 x -> Coroutine d m ()
partitionStream f source true false = mapMStreamChunks_ partitionChunk source
  where -- 'mapMStreamChunks_' never passes an empty chunk, so the first equation is unreachable
        partitionChunk []         = error "Chunks cannot be empty!"
        partitionChunk (x : rest) = route (f x) x rest
        -- route verdict x rest: x opens a run whose items all have the given verdict
        route passing x rest = do
          let (run, others) = (if passing then span f else break f) rest
          _ <- (if passing then putChunk true else putChunk false) (x : run)
          case others of
            []     -> return ()
            y : ys -> route (not passing) y ys
-- | Reads one item from each source in turn (first @source1@, then @source2@), combines the pair
-- with @f@ and writes the result to the sink. Stops as soon as either 'get' yields 'Nothing'.
zipWithMStream :: forall m a1 a2 a3 d x y z. (Monad m, AncestorFunctor a1 d, AncestorFunctor a2 d, AncestorFunctor a3 d)
                  => (x -> y -> Coroutine d m z) -> Source m a1 x -> Source m a2 y -> Sink m a3 z -> Coroutine d m ()
zipWithMStream f source1 source2 sink = advance
  where advance = get source1 >>= \mx -> get source2 >>= \my -> combine mx my
        combine (Just x) (Just y) = f x y >>= put sink >> advance
        combine _        _        = return ()
-- | Like 'zipWithMStream', except that the two 'get' operations are combined with 'bindM2' of the
-- 'MonadParallel' class instead of being sequenced explicitly.
parZipWithMStream :: forall m a1 a2 a3 d x y z.
                     (MonadParallel m, AncestorFunctor a1 d, AncestorFunctor a2 d, AncestorFunctor a3 d)
                     => (x -> y -> Coroutine d m z) -> Source m a1 x -> Source m a2 y -> Sink m a3 z -> Coroutine d m ()
parZipWithMStream f source1 source2 sink = advance
  where advance = bindM2 combine (get source1) (get source2)
        combine mx my = case (mx, my) of
          (Just x, Just y) -> do
            z <- f x y
            put sink z
            advance
          _ -> return ()
-- | Copies every chunk read from the source to both sinks, @sink1@ first and then @sink2@, until
-- 'getChunk' returns an empty chunk. The lists returned by 'putChunk' are ignored.
tee :: forall m a1 a2 a3 d x . (Monad m, AncestorFunctor a1 d, AncestorFunctor a2 d, AncestorFunctor a3 d)
       => Source m a1 x -> Sink m a2 x -> Sink m a3 x -> Coroutine d m ()
tee source sink1 sink2 = distribute
  where distribute = do
          chunk <- getChunk source
          unless (null chunk) $ do
            _ <- putChunk sink1 chunk
            _ <- putChunk sink2 chunk
            distribute
-- | Combines two sinks into one: every chunk written to the result is written to @s1@ and then to
-- @s2@. The list returned by the combined 'putChunk' is the one returned by @s2@; the list returned
-- by @s1@ is discarded.
teeSink :: forall m a1 a2 a3 x . (Monad m, AncestorFunctor a1 a3, AncestorFunctor a2 a3)
=> Sink m a1 x -> Sink m a2 x -> Sink m a3 x
teeSink s1 s2 = Sink{putChunk= teeChunk}
-- The explicit signature lets teeChunk be used for the rank-2 'putChunk' field.
where teeChunk :: forall d. AncestorFunctor a3 d => [x] -> Coroutine d m [x]
teeChunk x = putChunk s1' x >> putChunk s2' x
-- Both argument sinks are first lifted to the common functor a3.
s1' :: Sink m a3 x
s1' = liftSink s1
s2' :: Sink m a3 x
s2' = liftSink s2
-- | Wraps a source so that everything read through the wrapper is also copied to the given sink.
-- The answer of the underlying 'foldChunk' is returned unchanged; a non-empty chunk is written to
-- the sink first, and the list returned by that 'putChunk' is discarded.
teeSource :: forall m a1 a2 a3 x . (Monad m, AncestorFunctor a1 a3, AncestorFunctor a2 a3)
=> Sink m a1 x -> Source m a2 x -> Source m a3 x
teeSource sink source = Source{foldChunk= teeChunk}
-- The explicit signature lets teeChunk be used for the rank-2 'foldChunk' field.
where teeChunk :: forall d. AncestorFunctor a3 d => Ticker x -> Coroutine d m ([x], Either x (Ticker x))
teeChunk t = do p@(chunk, _) <- foldChunk source' t
-- Empty chunks are not forwarded to the sink.
_ <- if null chunk then return [] else putChunk sink' chunk
return p
-- The sink and the source are first lifted to the common functor a3.
sink' :: Sink m a3 x
sink' = liftSink sink
source' :: Source m a3 x
source' = liftSource source
-- | Writes a single item to the sink as a one-item chunk, ignoring the list 'putChunk' returns.
put :: forall m a d x. (Monad m, AncestorFunctor a d) => Sink m a x -> x -> Coroutine d m ()
put sink x = do
  _ <- putChunk sink [x]
  return ()
-- | Writes a single item to the sink as a one-item chunk and reports whether the list returned by
-- 'putChunk' was empty.
tryPut :: forall m a d x. (Monad m, AncestorFunctor a d) => Sink m a x -> x -> Coroutine d m Bool
tryPut sink x = do
  rejected <- putChunk sink [x]
  return (null rejected)
-- | Writes a list of items to the sink as one chunk and returns whatever 'putChunk' returns.
-- An empty list is not passed to the sink at all; the result is then the empty list.
putList :: forall m a d x. (Monad m, AncestorFunctor a d) => [x] -> Sink m a x -> Coroutine d m [x]
putList [] _    = return []
putList l  sink = putChunk sink l
-- | Reads one chunk from the source with the 'tickAll' ticker, dropping the second component of
-- the answer.
getChunk :: forall m a d x. (Monad m, AncestorFunctor a d) => Source m a x -> Coroutine d m [x]
getChunk source = foldChunk source tickAll >>= return . fst
-- | Writes the contents of a 'Seq' to the sink with 'putList', in front-to-back order.
putQueue :: forall m a d x. (Monad m, AncestorFunctor a d) => Seq x -> Sink m a x -> Coroutine d m [x]
putQueue q sink = putList items sink
  where items = toList (viewl q)
-- | List eliminator: yields the first argument for an empty list, and otherwise applies the
-- function to the whole non-empty list.
nullOrElse :: a -> ([x] -> a) -> [x] -> a
nullOrElse nullCase f list
  | null list = nullCase
  | otherwise = f list