{- 
    Copyright 2010 Mario Blazevic

    This file is part of the Streaming Component Combinators (SCC) project.

    The SCC project is free software: you can redistribute it and/or modify it under the terms of the GNU General Public
    License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later
    version.

    SCC is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty
    of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License for more details.

    You should have received a copy of the GNU General Public License along with SCC.  If not, see
    <http://www.gnu.org/licenses/>.
-}

-- | This module defines 'Source' and 'Sink' types and 'pipe' functions that create them. The method 'get' on 'Source'
-- abstracts away 'Control.Concurrent.SCC.Coroutine.await', and the method 'put' on 'Sink' is a higher-level
-- abstraction of 'Control.Concurrent.SCC.Coroutine.yield'. With this arrangement, a single coroutine can yield values
-- to multiple sinks and await values from multiple sources with no need to change the
-- 'Control.Concurrent.SCC.Coroutine.Coroutine' functor; the only requirement is for each functor of the sources and
-- sinks the coroutine uses to be an 'Control.Concurrent.SCC.Coroutine.AncestorFunctor' of the coroutine's
-- functor. For example, coroutine /zip/ that takes two sources and one sink would be declared like this:
-- 
-- @
-- zip :: forall m a1 a2 a3 d x y. (Monad m, AncestorFunctor a1 d, AncestorFunctor a2 d, AncestorFunctor a3 d)
--        => Source m a1 x -> Source m a2 y -> Sink m a3 (x, y) -> Coroutine d m ()
-- @
-- 
-- Sources, sinks, and coroutines communicating through them are all created using the 'pipe' function or one of its
-- variants. They effectively split the current coroutine into a producer-consumer coroutine pair. The producer gets a
-- new 'Sink' to write to and the consumer a new 'Source' to read from, in addition to all the streams that are visible
-- in the original coroutine. The following function, for example, uses the /zip/ coroutine above to add together the
-- values from two Integer sources:
--
-- @
-- add :: forall m a1 a2 a3 d. (Monad m, AncestorFunctor a1 d, AncestorFunctor a2 d, AncestorFunctor a3 d)
--        => Source m a1 Integer -> Source m a2 Integer -> Sink m a3 Integer -> Coroutine d m ()
-- add source1 source2 sink = do pipe
--                                  (\pairSink-> zip source1 source2 pairSink)            -- producer coroutine
--                                  (\pairSource-> pourMap (uncurry (+)) pairSource sink) -- consumer coroutine
--                               return ()
-- @

{-# LANGUAGE ScopedTypeVariables, Rank2Types, TypeFamilies, KindSignatures #-}

module Control.Concurrent.SCC.Streams
   (
    -- * Sink and Source types
    Sink(put, canPut), Source(get),
    SinkFunctor, SourceFunctor,
    -- * Various pipe functions
    pipe, pipeP, pipePS,
    -- * Utility functions
    get', getSuccess,
    liftSink, liftSource,
    consumeAndSuppress, tee, pour, pourMap, pourMapMaybe, getList, putList, putQueue,
    cond, whenNull
   )
where

import Control.Concurrent.Coroutine

import Control.Monad (when)
import Data.Foldable (toList)
import Data.Sequence (Seq, viewl)

-- | Suspension functor of a coroutine that either yields a value of type /x/ ('LeftF' side) or awaits a 'Bool' reply
-- ('RightF' side).
type TryYield x = EitherFunctor (Yield x) (Await Bool)

-- | Suspends yielding the value /x/; once resumed, suspends again awaiting a 'Bool' reply, which becomes the result.
tryYield :: forall m x. Monad m => x -> Coroutine (TryYield x) m Bool
tryYield x = suspend $ LeftF $ Yield x awaitReply
   where awaitReply = suspend $ RightF $ Await return

-- | Suspends awaiting a 'Bool' reply without yielding any value first; the reply becomes the result.
canYield :: forall m x. Monad m => Coroutine (TryYield x) m Bool
canYield = suspend $ RightF $ Await return

-- | Functor of the consumer coroutine that 'pipe' runs within a coroutine on the functor /a/: either a suspension of
-- /a/ itself, or an 'Await' for a @Maybe x@.
type SourceFunctor a x = EitherFunctor a (Await (Maybe x))
-- | Functor of the producer coroutine that 'pipe' runs within a coroutine on the functor /a/: either a suspension of
-- /a/ itself, or a 'TryYield' of an /x/.
type SinkFunctor a x = EitherFunctor a (TryYield x)

-- | A 'Sink' can be used to yield values from any nested `Coroutine` computation whose functor provably descends from
-- the functor /a/. It's the write-only end of the communication channel created by 'pipe'.
data Sink (m :: * -> *) a x =
   Sink
   {
   -- | Function 'put' tries to put a value into the given `Sink`. The intervening 'Coroutine' computations suspend up
   -- to the 'pipe' invocation that has created the argument sink. The result of 'put' indicates whether the operation
   -- succeeded.
   put :: forall d. (AncestorFunctor a d) => x -> Coroutine d m Bool,
   -- | Function 'canPut' checks if the argument `Sink` accepts values, i.e., whether a 'put' operation would succeed on
   -- the sink.
   canPut :: forall d. (AncestorFunctor a d) => Coroutine d m Bool
   }

-- | A 'Source' can be used to read values into any nested `Coroutine` computation whose functor provably descends from
-- the functor /a/. It's the read-only end of the communication channel created by 'pipe'.
newtype Source (m :: * -> *) a x =
   Source
   {
   -- | Function 'get' tries to get a value from the given 'Source' argument. The intervening 'Coroutine' computations
   -- suspend all the way to the 'pipe' function invocation that created the source. The function returns 'Nothing' if
   -- the argument source is empty.
   get :: forall d. (AncestorFunctor a d) => Coroutine d m (Maybe x)
   }

-- | Re-targets a 'Sink' built for the ancestor functor /a/ so it can be used as a sink on the descendant functor /d/.
-- Both 'put' and 'canPut' of the result delegate to the original sink and pass the outcome through 'liftOut'.
liftSink :: forall m a d x. (Monad m, AncestorFunctor a d) => Sink m a x -> Sink m d x
liftSink ancestor = Sink {put= \x-> liftOut (put ancestor x :: Coroutine d m Bool),
                          canPut= liftOut (canPut ancestor :: Coroutine d m Bool)}

-- | Re-targets a 'Source' built for the ancestor functor /a/ so it can be used as a source on the descendant functor
-- /d/. The 'get' of the result delegates to the original source and passes the outcome through 'liftOut'.
liftSource :: forall m a d x. (Monad m, AncestorFunctor a d) => Source m a x -> Source m d x
liftSource ancestor = Source (liftOut (get ancestor :: Coroutine d m (Maybe x)))

-- | The 'pipe' function splits the computation into two concurrent parts, /producer/ and /consumer/. The /producer/
-- receives a 'Sink' to put values into and the /consumer/ a 'Source' to get those values from. When both of them have
-- completed, 'pipe' returns the pair of their results. Their steps are combined sequentially: the first step is run
-- to completion before the second one.
pipe :: forall m a a1 a2 x r1 r2. (Monad m, Functor a, a1 ~ SinkFunctor a x, a2 ~ SourceFunctor a x) =>
        (Sink m a1 x -> Coroutine a1 m r1) -> (Source m a2 x -> Coroutine a2 m r2) -> Coroutine a m (r1, r2)
pipe = pipeG (\ combine first second -> first >>= \x-> second >>= \y-> combine x y)

-- | The 'pipeP' function is equivalent to 'pipe', except the /producer/ and /consumer/ are run in parallel: their
-- steps are combined with 'bindM2' instead of being sequenced one after the other.
pipeP :: forall m a a1 a2 x r1 r2. (ParallelizableMonad m, Functor a, a1 ~ SinkFunctor a x, a2 ~ SourceFunctor a x) =>
         (Sink m a1 x -> Coroutine a1 m r1) -> (Source m a2 x -> Coroutine a2 m r2) -> Coroutine a m (r1, r2)
pipeP producer consumer = pipeG bindM2 producer consumer

-- | The 'pipePS' function selects between the two pipe flavours at run time: it behaves as 'pipeP' when the argument
-- /parallel/ is 'True' and as 'pipe' otherwise.
pipePS :: forall m a a1 a2 x r1 r2. (ParallelizableMonad m, Functor a, a1 ~ SinkFunctor a x, a2 ~ SourceFunctor a x) =>
          Bool -> (Sink m a1 x -> Coroutine a1 m r1) -> (Source m a2 x -> Coroutine a2 m r2) ->
          Coroutine a m (r1, r2)
pipePS parallel
   | parallel = pipeP
   | otherwise = pipe

-- | A generic version of 'pipe'. The first argument is used to combine two computation steps.
--
-- The /producer/ is handed a 'Sink' whose 'put' and 'canPut' are 'tryYield' and 'canYield', and the /consumer/ a
-- 'Source' whose 'get' is 'await'; each of these is wrapped with 'local' and 'liftOut'. The two resulting coroutines
-- are passed to 'seesawNested' together with /run2/ and a resolver that says how their suspensions are answered.
pipeG :: forall m a a1 a2 x r1 r2. (Monad m, Functor a, a1 ~ SinkFunctor a x, a2 ~ SourceFunctor a x) =>
         (forall x y r. (x -> y -> m r) -> m x -> m y -> m r)
      -> (Sink m a1 x -> Coroutine a1 m r1) -> (Source m a2 x -> Coroutine a2 m r2)
      -> Coroutine a m (r1, r2)
pipeG run2 producer consumer =
   seesawNested run2 resolver (producer sink) (consumer source)
   -- The type annotations below pin the functor on which 'local' operates before 'liftOut' generalizes the result.
   where sink = Sink {put= liftOut . (local . tryYield :: x -> Coroutine a1 m Bool),
                      canPut= liftOut (local canYield :: Coroutine a1 m Bool)} :: Sink m a1 x
         source = Source (liftOut (local await :: Coroutine a2 m (Maybe x))) :: Source m a2 x
         -- NOTE(review): when each resolver field is invoked is decided by 'seesawNested', which is defined outside
         -- this module. The field names suggest that 'resumeLeft' and 'resumeRight' apply when only the producer or
         -- only the consumer is suspended, and 'resumeAny' when both are -- confirm against 'seesawNested'.
         resolver = SeesawResolver {
                      -- Producer suspension alone: a yielded value is dropped and the producer's continuation is
                      -- returned; a request for a 'Bool' reply is answered with 'False'.
                      resumeLeft= \s-> case s of (LeftF (Yield _ c))-> c
                                                 (RightF (Await c))-> c False,
                      -- Consumer suspension alone: its await is answered with 'Nothing'.
                      resumeRight = \(Await c)-> c Nothing,
                      -- Both suspensions at hand: a yielded value is handed to the consumer as 'Just' and both sides
                      -- are resumed via the third argument; a request for a 'Bool' reply is answered with 'True' and
                      -- only the producer is resumed via the first argument. The second argument is not used.
                      resumeAny= \ resumeProducer _ resumeBoth s (Await cc) ->
                                 case s of LeftF (Yield x cp) -> resumeBoth cp (cc (Just x))
                                           RightF (Await cp) -> resumeProducer (cp True)
                    }

-- | Tries to 'get' a single value from the /source/. If a value is available it is passed to the success
-- continuation; if the source is empty, nothing else is done.
getSuccess :: forall m a d x . (Monad m, AncestorFunctor a d)
              => Source m a x -> (x -> Coroutine d m ()) {- ^ Success continuation -} -> Coroutine d m ()
getSuccess source succeed =
   do item <- get source
      case item of
         Nothing -> return ()
         Just x -> succeed x

-- | Function 'get'' reads one value from the argument source on the assumption that the source is not empty. If the
-- source turns out to be empty, the function throws an error.
get' :: forall m a d x . (Monad m, AncestorFunctor a d) => Source m a x -> Coroutine d m x
get' source = get source >>= unwrap
   where unwrap (Just x) = return x
         unwrap Nothing = error "get' failed"

-- | 'pour' copies data from the /source/ argument into the /sink/ argument one value at a time. It stops as soon as
-- the sink reports it cannot accept more or the source runs empty.
pour :: forall m a1 a2 d x . (Monad m, AncestorFunctor a1 d, AncestorFunctor a2 d)
        => Source m a1 x -> Sink m a2 x -> Coroutine d m ()
pour source sink = transfer
   where transfer = canPut sink >>= \ready-> when ready (getSuccess source forward)
         -- The result of 'put' is ignored; the next round's 'canPut' decides whether to go on.
         forward x = put sink x >> transfer

-- | 'pourMap' is like 'pour', except it applies the function /f/ to each value read from the /source/ before putting
-- the result into the /sink/.
pourMap :: forall m a1 a2 d x y . (Monad m, AncestorFunctor a1 d, AncestorFunctor a2 d)
           => (x -> y) -> Source m a1 x -> Sink m a2 y -> Coroutine d m ()
pourMap f source sink = next
   where next = canPut sink >>= \open-> when open (getSuccess source relay)
         -- The result of 'put' is ignored; the next round's 'canPut' decides whether to go on.
         relay x = put sink (f x) >> next

-- | 'pourMapMaybe' is to 'pourMap' like 'Data.Maybe.mapMaybe' is to 'Data.List.map': every value read from the
-- /source/ is passed through /f/, and only the 'Just' results are put into the /sink/.
pourMapMaybe :: forall m a1 a2 d x y . (Monad m, AncestorFunctor a1 d, AncestorFunctor a2 d)
                => (x -> Maybe y) -> Source m a1 x -> Sink m a2 y -> Coroutine d m ()
pourMapMaybe f source sink = next
   where next = canPut sink >>= \open-> when open (get source >>= maybe (return ()) relay)
         -- A value mapped to 'Nothing' is skipped; either way the loop continues with the next round.
         relay x = maybe (return False) (put sink) (f x) >> next

-- | 'tee' is similar to 'pour' except it distributes every input value from the /source/ argument into both /sink1/
-- and /sink2/. It stops as soon as either sink reports it cannot accept more or the source runs empty.
tee :: forall m a1 a2 a3 d x . (Monad m, AncestorFunctor a1 d, AncestorFunctor a2 d, AncestorFunctor a3 d)
       => Source m a1 x -> Sink m a2 x -> Sink m a3 x -> Coroutine d m ()
tee source sink1 sink2 = next
   where next = canPut sink1 >>= \open1->
                canPut sink2 >>= \open2->
                when (open1 && open2) (getSuccess source duplicate)
         -- The results of both 'put' calls are ignored; the next round's 'canPut' checks decide whether to go on.
         duplicate x = put sink1 x >> put sink2 x >> next

-- | 'putList' puts the list items into its /sink/ argument one by one, for as long as the sink accepts them. The
-- result is the remainder of the list that wasn't accepted, starting with the first rejected item; it is empty if the
-- whole list has been accepted.
putList :: forall m a d x. (Monad m, AncestorFunctor a d) => [x] -> Sink m a x -> Coroutine d m [x]
putList [] _ = return []
putList pending@(x:rest) sink = put sink x >>= \accepted-> if accepted then putList rest sink else return pending

-- | 'getList' reads the /source/ until it is empty and returns the list of all values it generated, in the order they
-- were read.
getList :: forall m a d x. (Monad m, AncestorFunctor a d) => Source m a x -> Coroutine d m [x]
getList source = collect []
   -- The accumulator holds the values read so far, most recent first; it is reversed once the source is exhausted.
   where collect seen = get source >>= maybe (return (reverse seen)) (\x-> collect (x : seen))

-- | 'consumeAndSuppress' keeps reading from the source until it is empty, discarding every value it generates.
consumeAndSuppress :: forall m a d x. (Monad m, AncestorFunctor a d) => Source m a x -> Coroutine d m ()
consumeAndSuppress source = drain
   where drain = get source >>= maybe (return ()) (\_ -> drain)

-- | A utility function wrapping if-then-else, useful for handling monadic truth values: the first argument is the
-- result if the test is 'True', the second argument if it is 'False'.
cond :: a -> a -> Bool -> a
cond x _ True = x
cond _ y False = y

-- | A utility function, useful for handling monadic list values where empty list means success: if the list is empty
-- the given action is run, otherwise the list itself is returned and the action is not run.
whenNull :: forall a m. Monad m => m [a] -> [a] -> m [a]
whenNull action [] = action
whenNull _ list = return list

-- | Like 'putList', except the items to put into the sink come from the given 'Data.Sequence.Seq', in order from its
-- left end. The result is the list of items the sink didn't accept.
putQueue :: forall m a d x. (Monad m, AncestorFunctor a d) => Seq x -> Sink m a x -> Coroutine d m [x]
putQueue q sink = putList items sink
   where items = toList (viewl q)