{- 
    Copyright 2008 Mario Blazevic

    This file is part of the Streaming Component Combinators (SCC) project.

    The SCC project is free software: you can redistribute it and/or modify it under the terms of the GNU General Public
    License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later
    version.

    SCC is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty
    of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License for more details.

    You should have received a copy of the GNU General Public License along with SCC.  If not, see
    <http://www.gnu.org/licenses/>.
-}

-- | The "Combinators" module defines combinators applicable to 'Transducer' and 'Splitter' components defined in the
-- "ComponentTypes" module.

{-# LANGUAGE ScopedTypeVariables, Rank2Types #-}

module Control.Concurrent.SCC.Combinators
   (-- * Consumer and producer combinators
    (->>), (<<-),
    -- * Transducer combinators
    (>->), join,
    -- * Pseudo-logic splitter combinators
    -- | Combinators '>&' and '>|' are only /pseudo/-logic. While the laws of double negation and De Morgan's laws hold,
    -- '>&' and '>|' are in general not commutative, associative, nor idempotent. In the special case when all argument
    -- splitters are stateless, such as those produced by 'Components.liftStatelessSplitter', these combinators do satisfy
    -- all laws of Boolean algebra.
    snot, (>&), (>|),
    -- ** Zipping logic combinators
    -- | The '&&' and '||' combinators run the argument splitters in parallel and combine their logical outputs using
    -- the corresponding logical operation on each output pair, in a manner similar to 'Prelude.zipWith'.
    (&&), (||),
    -- * Flow-control combinators
    -- | The following combinators resemble the common flow-control programming language constructs. Combinators 
    -- 'wherever', 'unless', and 'select' are just the special cases of the combinator 'ifs'.
    --
    --    * /transducer/ ``wherever`` /splitter/ = 'ifs' /splitter/ /transducer/ 'Components.asis'
    --
    --    * /transducer/ ``unless`` /splitter/ = 'ifs' /splitter/ 'Components.asis' /transducer/
    --
    --    * 'select' /splitter/ = 'ifs' /splitter/ 'Components.asis' 'Components.suppress'
    --
    ifs, wherever, unless, select,
    -- ** Recursive
    while, nestedIn,
    -- * Section-based combinators
    -- | All combinators in this section use their 'Splitter' argument to determine the
    -- structure of the input. Every contiguous portion of the input that gets passed to one or the other sink of the
    -- splitter is treated as one section in the logical structure of the input stream. What is done with the section
    -- depends on the combinator, but the sections, and therefore the logical structure of the input stream, are
    -- determined by the argument splitter alone.
    foreach, having, havingOnly, followedBy, even,
    -- ** first and its variants
    first, uptoFirst, prefix,
    -- ** last and its variants
    last, lastAndAfter, suffix,
    -- ** input ranges
    between, (...))
where

import Control.Concurrent.SCC.Foundation
import Control.Concurrent.SCC.ComponentTypes

import Prelude hiding (even, last, (||), (&&))
import qualified Prelude
import Control.Exception (assert)
import Control.Monad (liftM, when)
import qualified Control.Monad as Monad
import Data.Maybe (isJust, isNothing, fromJust)
import Data.Typeable (Typeable)
import qualified Data.Foldable as Foldable
import qualified Data.Sequence as Seq
import Data.Sequence (Seq, (|>), (><), ViewL (EmptyL, (:<)))

import Debug.Trace (trace)


infixr ->>

-- | Combinator '->>' attaches a transducer in front of a consumer. The resulting consumer hands its input source to
-- the transducer, pipes the transducer's output into the argument consumer, and returns that consumer's result.
(->>) :: forall x y m r. (Monad m, Typeable x, Typeable y) => Transducer m x y -> Consumer m y r -> Consumer m x r
Transducer transducer ->> consumer = composed
   where -- Of the pair returned by 'pipeD' only the second component is kept.
         composed input = pipeD "->>" (transducer input) consumer >>= return . snd

-- | The result of combinator '<<-' is a producer that acts as a composition of the given transducer and producer
-- arguments. The argument producer's output is piped through the transducer, whose output in turn goes to the sink
-- handed to the resulting producer. The result of the composition is the result of the argument producer.
(<<-) :: forall x y m r. (Monad m, Typeable x, Typeable y) => Transducer m x y -> Producer m x r -> Producer m y r
Transducer t <<- producer = producer'
   where -- Of the pair returned by 'pipeD' only the first component is kept.
         producer' sink = liftM fst $ pipeD "<<-" producer (flip t sink)


-- | The '>->' combinator chains two transducers together. In the composition /t1 >-> t2/ the input is consumed by
-- /t1/, everything /t1/ outputs is piped into /t2/, and the output of /t2/ is the output of the whole composition.
(>->) :: forall m x y z. Monad m => Transducer m x y -> Transducer m y z -> Transducer m x z
Transducer upstream >-> Transducer downstream = Transducer composed
   where -- The composition returns the first component of the 'pipeD' result pair.
         composed source sink = pipeD ">->" (upstream source) (\intermediate-> downstream intermediate sink)
                                >>= return . fst

-- | The 'join' combinator feeds the same input to both argument transducers and concatenates their outputs. The input
-- is duplicated by 'tee': one copy is piped directly into the first transducer, while the other copy is gathered by
-- 'getList'. Only after the first pipe finishes is the gathered copy fed by 'putList' into the second transducer, so
-- both transducers write to the same sink one after the other.
join :: (Monad m, Typeable x) => Transducer m x y -> Transducer m x y -> Transducer m x y
join (Transducer t1) (Transducer t2) = Transducer joined
   where joined source sink =
            do (((), buffered), leftover) <- pipeD "join 1"
                                                (\direct-> pipeD "join 2" (\copy-> tee source direct copy) getList)
                                                (\input-> t1 input sink)
               pipeD "join 3" (putList buffered) (\input-> t2 input sink)
               -- The value returned is the one produced by the first transducer; the second pipe's result is dropped.
               return leftover
-- | The 'snot' (streaming not) combinator swaps the two outputs of its argument splitter: whatever the argument sends
-- to its /true/ sink ends up in the /false/ sink of the result, and vice versa.
snot :: (Monad m, Typeable x) => Splitter m x -> Splitter m x
snot splitter = liftSectionSplitter inverted
   where inverted source true false = splitSections splitter source false true

-- | The '>&' combinator pipes the /true/ output of its left operand into its right operand for further splitting.
-- The /false/ sinks of both operands are connected to the /false/ sink of the combined splitter, so an input value
-- can reach the /true/ sink of the combination only if both splitters deem it true.
(>&) :: (Monad m, Typeable x) => Splitter m x -> Splitter m x -> Splitter m x
s1 >& s2 = liftSimpleSplitter
              (\source true false->
                pipeD ">&" (\accepted-> split s1 source accepted false) (\candidates-> split s2 candidates true false)
                >>= return . fst)

-- | The '>|' combinator pipes the /false/ output of its left operand into its right operand for further splitting.
-- The /true/ sinks of both operands are connected to the /true/ sink of the combined splitter, so an input value can
-- reach the /false/ sink of the combination only by passing through the /false/ sinks of both argument splitters.
(>|) :: (Monad m, Typeable x) => Splitter m x -> Splitter m x -> Splitter m x
s1 >| s2 = liftSimpleSplitter
              (\source true false->
                pipeD ">|" (\rejected-> split s1 source true rejected) (\remainder-> split s2 remainder true false)
                >>= return . fst)

-- | Combinator '&&' combines two splitters with 'zipSplittersWith', using Boolean conjunction to merge their
-- outputs.
(&&) :: (Monad m, Typeable x) => Splitter m x -> Splitter m x -> Splitter m x
left && right = zipSplittersWith (Prelude.&&) left right

-- | Combinator '||' combines two splitters with 'zipSplittersWith', using Boolean disjunction to merge their
-- outputs.
(||) :: (Monad m, Typeable x) => Splitter m x -> Splitter m x -> Splitter m x
left || right = zipSplittersWith (Prelude.||) left right

-- | The combinator 'ifs' builds a transducer out of a splitter and two transducers. The first transducer argument
-- consumes the part of the input that the splitter routes to its /true/ sink, the second one consumes the part routed
-- to the /false/ sink, and both write to the same output sink.
ifs :: (Monad m, Typeable x) => Splitter m x -> Transducer m x y -> Transducer m x y -> Transducer m x y
ifs splitter (Transducer onTrue) (Transducer onFalse) = Transducer conditional
   where conditional source sink =
            splitConsumer "ifs" splitter (\part-> onTrue part sink) (\part-> onFalse part sink) source
            >>= return . fst3

-- | The combinator 'wherever' applies the argument transducer to the part of the input that the splitter routes to
-- its /true/ sink. The part routed to the /false/ sink is poured into the output unchanged.
wherever :: (Monad m, Typeable x) => Transducer m x x -> Splitter m x -> Transducer m x x
wherever (Transducer t) splitter = Transducer selective
   where selective source sink =
            splitConsumer "wherever" splitter (\part-> t part sink) (\part-> pour part sink) source
            >>= return . fst3

-- | The combinator 'unless' applies the argument transducer to the part of the input that the splitter routes to its
-- /false/ sink. The part routed to the /true/ sink is poured into the output unchanged.
unless :: (Monad m, Typeable x) => Transducer m x x -> Splitter m x -> Transducer m x x
unless (Transducer t) splitter = Transducer selective
   where selective source sink =
            splitConsumer "unless" splitter (\part-> pour part sink) (\part-> t part sink) source
            >>= return . fst3

-- | The combinator 'select' turns a splitter into a transducer: the part of the input that the splitter routes to
-- its /true/ sink is poured into the output, while the part routed to the /false/ sink is handed to
-- 'consumeAndSuppress'.
select :: (Monad m, Typeable x) => Splitter m x -> Transducer m x x
select splitter = Transducer selected
   where selected source sink =
            splitConsumer "select" splitter (\part-> pour part sink) consumeAndSuppress source
            >>= return . fst3

-- | The recursive combinator 'while' sends whatever the argument splitter routes to its /true/ sink through the
-- argument transducer and then through another instance of the same 'while' loop, before it reaches the output.
-- Data routed to the splitter's /false/ sink is poured into the output unmodified.
while :: (Monad m, Typeable x) => Transducer m x x -> Splitter m x -> Transducer m x x
while t s = Transducer loop
   where loop source sink =
            liftM fst3 (splitConsumer "while" s
                           (t ->> (while t s ->> (\looped-> pour looped sink)))
                           (\rest-> pour rest sink)
                           source)

-- | The recursive combinator 'nestedIn' combines two splitters into a mutually recursive loop acting as a single
-- splitter. The /true/ sink of the first splitter and the /false/ sink of the second splitter become the /true/ and
-- /false/ sinks of the loop. The /false/ output of the first splitter is piped into the second splitter, and the
-- /true/ output of the second splitter is piped into a fresh instance of the whole loop.
-- The use of 'nestedIn' makes sense only on hierarchically structured streams. If we gave it some input containing
-- a flat sequence of values, and assuming both component splitters are deterministic and stateless,
-- a value would either not loop at all or it would loop forever.
nestedIn :: (Monad m, Typeable x) => Splitter m x -> Splitter m x -> Splitter m x
nestedIn s1 s2 = liftSimpleSplitter
                    (\source true false->
                      pipe (\s1False-> split s1 source true s1False)
                           (\s1Rejected-> pipe (\s2True-> split s2 s1Rejected s2True false)
                                               (\s2Accepted-> split (nestedIn s1 s2) s2Accepted true false))
                      >>= return . fst)

-- | The 'foreach' combinator is similar to the combinator 'ifs' in that it combines a splitter and two transducers
-- into another transducer. The difference is that here the transducers are run anew on each consecutive portion of
-- the input, as the splitter chunks it up. Every contiguous portion of the input that the splitter sends to one of its
-- two sinks gets transduced by the corresponding argument transducer as that transducer's whole input.
foreach :: (Monad m, Typeable x, Typeable y) => Splitter m x -> Transducer m x y -> Transducer m x y -> Transducer m x y
foreach s t1 t2 = Transducer perSection
   where perSection source sink =
            pipeD "foreach"
               (transduce (splitterToMarker s) source)
               (\marked-> groupMarks marked (\isTrue section-> transduce (handler isTrue) section sink))
            >>= return . fst
         -- Picks the transducer responsible for a section, depending on the mark the splitter gave it.
         handler isTrue
            | isTrue = t1
            | otherwise = t2

-- | The 'having' combinator combines two pure splitters into a pure splitter. The first splitter chunks the input
-- into contiguous portions. Whatever it sends to its /false/ sink goes directly to the /false/ sink of the combined
-- splitter. The second splitter is run on each portion of the input that the first splitter sends to its /true/ sink.
-- If the second splitter sends any output at all to its /true/ sink, the whole portion is passed on to the /true/ sink
-- of the combined splitter, otherwise the portion goes to the /false/ sink.
having :: (Monad m, Typeable x) => Splitter m x -> Splitter m x -> Splitter m x
having s1 s2 = liftSectionSplitter sectionSplitter
   where sectionSplitter source true false =
            pipeD "having"
               (transduce (splitterToMarker s1) source)
               (\marked-> groupMarks marked (\isTrue section-> if isTrue then test section else pourMaybe section false))
            >>= return . fst
            where -- Buffers one copy of the section while running the second splitter on the other copy, and then
                  -- routes the buffered copy according to whether the second splitter produced any /true/ output.
                  test section =
                     do (((), buffered), (_, truePart))
                           <- pipe (\sink1-> pipe (\sink2-> tee section sink1 sink2) getList)
                                   (\copy-> pipe (\sink-> suppressProducer (\ignored-> split s2 copy sink ignored))
                                                 getList)
                        -- An empty section is represented by a single Nothing.
                        let items | null buffered = [Nothing]
                                  | otherwise = map Just buffered
                        case null truePart of
                           True -> putList items false
                           False -> putList items true
                        return ()

-- | The 'havingOnly' combinator is analogous to the 'having' combinator, but it passes a portion of the input to its
-- /true/ sink only if the second splitter sends no part of that portion to its /false/ sink. Every other portion goes
-- to the /false/ sink of the combined splitter.
havingOnly :: (Monad m, Typeable x) => Splitter m x -> Splitter m x -> Splitter m x
havingOnly s1 s2 = liftSectionSplitter sectionSplitter
   where sectionSplitter source true false =
            pipeD "havingOnly"
               (transduce (splitterToMarker s1) source)
               (\marked-> groupMarks marked (\isTrue section-> if isTrue then test section else pourMaybe section false))
            >>= return . fst
            where -- Buffers one copy of the section while running the second splitter on the other copy, and then
                  -- routes the buffered copy according to whether the second splitter produced any /false/ output.
                  test section =
                     do (((), buffered), (_, falsePart))
                           <- pipe (\sink1-> pipe (\sink2-> tee section sink1 sink2) getList)
                                   (\copy-> pipe (\sink-> suppressProducer (\ignored-> split s2 copy ignored sink))
                                                 getList)
                        -- An empty section is represented by a single Nothing.
                        let items | null buffered = [Nothing]
                                  | otherwise = map Just buffered
                        case null falsePart of
                           True -> putList items true
                           False -> putList items false
                        return ()

-- | The result of combinator 'first' behaves the same as the argument splitter up to and including the first portion
-- of the input that the argument sends to its /true/ sink. All input following that first true portion goes into the
-- /false/ sink.
first :: (Monad m, Typeable x) => Splitter m x -> Splitter m x
first splitter = liftSectionSplitter sectionSplitter
   where sectionSplitter source true false =
            liftM (\(unconsumed, unsent)-> unsent ++ unconsumed)
                  (pipeD "first" (transduce (splitterToMarker splitter) source) (\marked->
                     let -- Reads the next marked item and hands it to the given state, or stops at the end of input.
                         advance state = get marked >>= maybe (return []) state
                         -- Puts the item into the sink; on success moves on, otherwise gives the item back.
                         emit sink item state = put sink item
                                                >>= cond (advance state) (return (maybe [] (:[]) item))
                         -- Before the first true portion.
                         leading (item, isTrue)
                            | isTrue = emit true item matching
                            | otherwise = emit false item leading
                         -- Inside the first true portion.
                         matching (item, isTrue)
                            | isTrue = emit true item matching
                            | otherwise = emit false item trailing
                         -- After the first true portion everything is false.
                         trailing (item, _) = emit false item trailing
                     in advance leading))

-- | The result of combinator 'uptoFirst' takes all input up to and including the first portion of the input which goes
-- into the argument's /true/ sink and feeds it to the result splitter's /true/ sink. All the rest of the input goes
-- into the /false/ sink. The only difference between 'first' and 'uptoFirst' combinators is in where they direct the
-- /false/ portion of the input preceding the first /true/ part. If the argument never sends anything to its /true/
-- sink, the whole buffered input goes to the /false/ sink.
uptoFirst :: (Monad m, Typeable x) => Splitter m x -> Splitter m x
uptoFirst splitter = liftSectionSplitter sectionSplitter
   where sectionSplitter source true false =
            liftM (\(unconsumed, unsent)-> concatMap (maybe [] (:[])) unsent ++ unconsumed)
                  (pipeD "uptoFirst" (transduce (splitterToMarker splitter) source) (\marked->
                     let -- Reads the next marked item and hands it to the given state, or stops at the end of input.
                         advance state = get marked >>= maybe (return []) state
                         -- Puts the item into the sink; on success moves on, otherwise gives the item back.
                         emit sink item state = put sink item >>= cond (advance state) (return [item])
                         -- Before the first true portion: the input is queued until its destination is known.
                         buffering queue (item, False) = get marked
                                                         >>= maybe (putQueue queue' false) (buffering queue')
                            where queue' = queue |> item
                         buffering queue mark@(_, True) = putQueue queue true
                                                          >>= \rest-> if null rest then matching mark else return rest
                         -- Inside the first true portion.
                         matching (item, isTrue)
                            | isTrue = emit true item matching
                            | otherwise = emit false item trailing
                         -- After the first true portion everything is false.
                         trailing (item, _) = emit false item trailing
                     in advance (buffering Seq.empty)))

-- | The result of the combinator 'last' is a splitter which directs all input to its /false/ sink, up to the last
-- portion of the input which goes to its argument's /true/ sink. That portion of the input is the only one that goes to
-- the resulting component's /true/ sink.  The splitter returned by the combinator 'last' has to buffer the previous two
-- portions of its input, because it cannot know if a true portion of the input is the last one until it sees the end of
-- the input or another portion succeeding the previous one.
--
-- Whatever could not be put into the sinks is returned in front of the unconsumed input. This includes the leftovers
-- of both buffered portions when the input ends.
last :: (Monad m, Typeable x) => Splitter m x -> Splitter m x
last splitter = liftSectionSplitter sectionSplitter
   where sectionSplitter source true false =
            liftM (\(unconsumed, unsent)-> concatMap (maybe [] (:[])) unsent ++ unconsumed)
                  (pipeD "last" (transduce (splitterToMarker splitter) source) (\marked->
                     let -- Reads the next marked item and hands it to the given state, or stops at the end of input.
                         next state = get marked >>= maybe (return []) state
                         -- No true portion is pending: false input is passed straight through.
                         passing (item, False) = put false item >>= cond (next passing) (return [item])
                         passing mark@(_, True) = holdTrue Seq.empty mark
                         -- Queues up a true portion; if the input ends here, this was the last true portion.
                         holdTrue trues (item, True) = get marked >>= maybe (putQueue trues' true) (holdTrue trues')
                            where trues' = trues |> item
                         holdTrue trues mark@(_, False) = holdFalse trues Seq.empty mark
                         -- Queues up the false portion following a queued true portion.
                         holdFalse trues falses (item, False) = get marked
                                                                >>= maybe (flushAtEnd trues falses') (holdFalse trues falses')
                            where falses' = falses |> item
                         -- Another true portion has started, so the queued one was not the last: both queues are false.
                         holdFalse trues falses mark@(_, True) =
                            do rest1 <- putQueue trues false
                               rest2 <- putQueue falses false
                               if null rest1 Prelude.&& null rest2
                                  then holdTrue Seq.empty mark
                                  else return (rest1 ++ rest2)
                         -- End of input: the queued true portion was the last one. The leftovers of both queues are
                         -- reported, not only those of the second one.
                         flushAtEnd trues falses =
                            do rest1 <- putQueue trues true
                               rest2 <- putQueue falses false
                               return (rest1 ++ rest2)
                     in next passing))

-- | The result of the combinator 'lastAndAfter' is a splitter which directs all input to its /false/ sink, up to the
-- last portion of the input which goes to its argument's /true/ sink. That portion and the remainder of the input are
-- fed to the resulting component's /true/ sink. The difference between 'last' and 'lastAndAfter' combinators is where
-- they feed the /false/ portion of the input, if any, remaining after the last /true/ part.
lastAndAfter :: (Monad m, Typeable x) => Splitter m x -> Splitter m x
lastAndAfter splitter = liftSectionSplitter sectionSplitter
   where sectionSplitter source true false =
            liftM (\(unconsumed, unsent)-> concatMap (maybe [] (:[])) unsent ++ unconsumed)
                  (pipeD "lastAndAfter" (transduce (splitterToMarker splitter) source) (\marked->
                     let -- Reads the next marked item and hands it to the given state, or stops at the end of input.
                         next state = get marked >>= maybe (return []) state
                         -- No true portion is pending: false input is passed straight through.
                         scanning (item, False) = put false item >>= cond (next scanning) (return [item])
                         scanning mark@(_, True) = inTrue Seq.empty mark
                         -- Queues up a true portion; at the end of input the whole queue is true.
                         inTrue held (item, True) = get marked >>= maybe (putQueue held' true) (inTrue held')
                            where held' = held |> item
                         inTrue held mark@(_, False) = afterTrue held mark
                         -- Keeps queueing the false input that follows the queued true portion.
                         afterTrue held (item, False) = get marked >>= maybe (putQueue held' true) (afterTrue held')
                            where held' = held |> item
                         -- Another true portion has started, so everything queued so far is false.
                         afterTrue held mark@(_, True) = putQueue held false >>= whenNull (scanning mark)
                     in next scanning))

-- | The 'prefix' combinator feeds its /true/ sink only the prefix of the input that its argument feeds to its /true/
-- sink. As soon as the argument sends anything to its /false/ sink, all the rest of the input is dumped into the
-- /false/ sink of the result.
prefix :: (Monad m, Typeable x) => Splitter m x -> Splitter m x
prefix splitter = liftSectionSplitter sectionSplitter
   where sectionSplitter source true false =
            liftM (\(unconsumed, unsent)-> unsent ++ unconsumed)
                  (pipeD "prefix" (transduce (splitterToMarker splitter) source) (\marked->
                     let -- Reads the next marked item and hands it to the given state, or stops at the end of input.
                         advance state = get marked >>= maybe (return []) state
                         -- Puts the item into the sink; on success moves on, otherwise gives the item back.
                         emit sink item state = put sink item
                                                >>= cond (advance state) (return (maybe [] (:[]) item))
                         -- Still inside the true prefix.
                         leading (item, isTrue)
                            | isTrue = emit true item leading
                            | otherwise = emit false item trailing
                         -- Past the prefix everything is false.
                         trailing (item, _) = emit false item trailing
                     in advance leading))

-- | The 'suffix' combinator feeds its /true/ sink only the suffix of the input that its argument feeds to its /true/
-- sink. All the rest of the input is dumped into the /false/ sink of the result. Every true portion is queued until
-- either the input ends, in which case the portion is the suffix, or the argument marks some following input as
-- false, in which case the queued portion goes to the /false/ sink.
suffix :: (Monad m, Typeable x) => Splitter m x -> Splitter m x
suffix splitter = liftSectionSplitter sectionSplitter
   where sectionSplitter source true false =
            liftM (\(unconsumed, unsent)-> concatMap (maybe [] (:[])) unsent ++ unconsumed)
                  (pipeD "suffix" (transduce (splitterToMarker splitter) source) (\marked->
                     let -- Reads the next marked item and hands it to the given state, or stops at the end of input.
                         next state = get marked >>= maybe (return []) state
                         -- No true portion is pending: false input is passed straight through.
                         scanning (item, False) = put false item >>= cond (next scanning) (return [item])
                         scanning (item, True) = collect (Seq.singleton item)
                         -- At the end of input the queued true portion is the suffix.
                         collect held = get marked >>= maybe (putQueue held true) (extend held)
                         extend held (item, True) = collect (held |> item)
                         -- False input follows, so the queued portion was not a suffix.
                         extend held mark@(_, False) = putQueue held false >>= whenNull (scanning mark)
                     in next scanning))

-- | The 'even' combinator takes every input section that its argument splitter deems /true/, and feeds the even ones
-- into its /true/ sink. The odd sections and the parts of input that are /false/ according to its argument splitter
-- are fed to the /false/ sink of the 'even' splitter.
even :: (Monad m, Typeable x) => Splitter m x -> Splitter m x
even splitter = liftSectionSplitter sectionSplitter
   where sectionSplitter source true false =
            liftM (\(unconsumed, unsent)-> concatMap (maybe [] (:[])) unsent ++ unconsumed)
                  (pipeD "even" (transduce (splitterToMarker splitter) source) (\marked->
                     let -- Reads the next marked item and hands it to the given state, or stops at the end of input.
                         next state = get marked >>= maybe (return []) state
                         -- Puts the item into the sink; on success moves on, otherwise gives the item back.
                         emit sink item state = put sink item >>= cond (next state) (return [item])
                         -- False input preceding an odd true section.
                         beforeOdd mark@(item, isTrue)
                            | isTrue = inOdd mark
                            | otherwise = emit false item beforeOdd
                         -- An odd true section goes to the false sink.
                         inOdd mark@(item, isTrue)
                            | isTrue = emit false item inOdd
                            | otherwise = beforeEven mark
                         -- False input preceding an even true section.
                         beforeEven mark@(item, isTrue)
                            | isTrue = inEven mark
                            | otherwise = emit false item beforeEven
                         -- An even true section goes to the true sink.
                         inEven mark@(item, isTrue)
                            | isTrue = emit true item inEven
                            | otherwise = beforeOdd mark
                     in next beforeOdd))

-- | Combinator 'followedBy' treats its argument 'Splitter's as pattern components and returns a 'Splitter' that
-- matches their concatenation. A section of input is considered /true/ by the result iff its prefix is considered
-- /true/ by argument /s1/ and the rest of the section is considered /true/ by /s2/. The splitter /s2/ is started anew
-- after every section split to /true/ sink by /s1/.
followedBy :: forall m x. (Monad m, Typeable x) => Splitter m x -> Splitter m x -> Splitter m x
followedBy s1 s2 = liftSectionSplitter s
   where s source true false
            = liftM (\(x, y)-> concatMap (maybe [] (:[])) y ++ x) $
              pipeD "followedBy"
                 (transduce (splitterToMarker s1) source)
                 (\source-> let get0 q = case Seq.viewl q
                                         of Seq.EmptyL -> get source >>= maybe (return []) get1
                                            (x, False) :< rest -> put false x
                                                                  >>= cond (get0 rest)
                                                                           (return $ Foldable.toList $ Seq.viewl $ fmap fst q)
                                            (x, True) :< rest -> get2 Seq.empty q
                                get1 (x, False) = put false x
                                                  >>= cond (get source >>= maybe (return []) get1)
                                                           (return [x])
                                get1 p@(x, True) = get2 Seq.empty (Seq.singleton p)
                                get2 q q' = case Seq.viewl q'
                                            of Seq.EmptyL -> get source
                                                             >>= maybe (testEnd q) (get2 q . Seq.singleton)
                                               (x, True) :< rest -> get2 (q |> x) rest
                                               (x, False) :< rest -> do ((q1, q2), n) <- pipeD "followedBy tail"
                                                                                               (get3 Seq.empty q') (test q)
                                                                        case n of Nothing -> putQueue q false
                                                                                             >>= whenNull (get0 (q1 >< q2))
                                                                                  Just n -> do put false Nothing
                                                                                               get0 (dropJust n q1 >< q2)
                                get3 q1 q2 sink = canPut sink
                                                  >>= cond (case Seq.viewl q2
                                                            of Seq.EmptyL -> get source
                                                                             >>= maybe (return (q1, q2))
                                                                                       (\p-> maybe (return True) (put sink) (fst p)
                                                                                                >> get3 (q1 |> p)