{-# LANGUAGE DeriveFunctor              #-}
{-# LANGUAGE FlexibleContexts           #-}
{-# LANGUAGE GeneralizedNewtypeDeriving #-}
{-# LANGUAGE LambdaCase                 #-}
{-# LANGUAGE PatternSynonyms            #-}
{-# LANGUAGE RankNTypes                 #-}
{-# LANGUAGE ScopedTypeVariables        #-}
{-# LANGUAGE TypeInType                 #-}
{-# LANGUAGE ViewPatterns               #-}

-- |
-- Module      : Data.Conduino
-- Copyright   : (c) Justin Le 2019
-- License     : BSD3
--
-- Maintainer  : justin@jle.im
-- Stability   : experimental
-- Portability : non-portable
--
-- Base API for 'Pipe'.  See documentation for 'Pipe', '.|', and 'runPipe'
-- for information on usage.
--
-- A "prelude" of useful pipes can be found in "Data.Conduino.Combinators".
--
-- == Why a stream processing library?
-- 
-- A stream processing library is a way to define stream processors in a
-- /composable/ way: instead of defining your entire stream processing function
-- as a single recursive loop with some global state, think about each "stage"
-- of the process, and isolate each state to its own segment.  Each component
-- can contain its own isolated state:
-- 
-- >>> runPipePure $ sourceList [1..10]
--       .| scan (+) 0
--       .| sinkList
-- [1,3,6,10,15,21,28,36,45,55]
-- 
-- All of these components have internal "state":
-- 
-- *   @sourceList@ keeps track of "which" item in the list to yield next
-- *   @scan@ keeps track of the current running sum
-- *   @sinkList@ keeps track of all items that have been seen so far, as a list
-- 
-- They all work together without knowing any other component's internal state, so
-- you can write your total streaming function without concerning yourself, at
-- each stage, with the pipeline as a whole.
-- 
-- In addition, there are useful functions to "combine" stream processors:
-- 
-- *   'zipSink' combines sinks in an "and" sort of way: combine two sinks in
--     parallel and finish when all finish.
-- *   'altSink' combines sinks in an "or" sort of way: combine two sinks in
--     parallel and finish when any of them finish
-- *   'zipSource' combines sources in parallel and collates their outputs.
-- 
-- Stream processing libraries are also useful for streaming composition of
-- monadic effects (like IO or State).
--
module Data.Conduino (
    Pipe
  , (.|)
  , runPipe, runPipePure
  , awaitEither, await, awaitWith, awaitSurely, awaitForever, yield
  , mapInput, mapOutput, mapUpRes, trimapPipe
  -- * Wrappers
  , ZipSource(..)
  , unconsZipSource
  , ZipSink(..)
  , zipSink, altSink
  -- * Generators
  , toListT, fromListT
  , pattern PipeList
  , withSource, genSource
  ) where

import           Control.Applicative
import           Control.Monad
import           Control.Monad.Trans.Class
import           Control.Monad.Trans.Free        (FreeT(..), FreeF(..))
import           Control.Monad.Trans.Free.Church
import           Data.Conduino.Internal
import           Data.Functor
import           Data.Functor.Identity
import           Data.Void
import           List.Transformer                (ListT(..), Step(..))
import qualified List.Transformer                as LT

-- | Request the next item from upstream.  Will block until upstream
-- 'yield's.
--
-- Built on 'awaitEither': an item from upstream is returned as 'Just',
-- while upstream finishing is reported as 'Nothing' (the upstream result
-- itself is discarded).
--
-- If the upstream pipe never terminates, 'awaitSurely' can be used instead
-- to get the item directly, without the 'Maybe'.
--
-- Will always return 'Just' if @u@ is 'Void'.
await :: Pipe i o u m (Maybe i)
await = awaitEither <&> \case
    Left  _ -> Nothing
    Right x -> Just x

-- | A variant of 'await' that directly takes the continuation to run when
-- the await is successful.
--
-- If upstream yields an item, the continuation is run on that item.  If
-- upstream has finished instead, its result is returned as-is.  The await
-- will always be successful if @u@ is 'Void'.
--
-- This lets you write a pipe in a way that is agnostic to how the upstream
-- pipe terminates.
awaitWith :: (i -> Pipe i o u m u) -> Pipe i o u m u
awaitWith f = awaitEither >>= either pure f

-- | Await an item from an upstream pipe that is guaranteed never to
-- terminate, so no 'Maybe' wrapper is needed.
--
-- The guarantee is expressed by the upstream result type @u@ being 'Void'.
-- If you get a type error saying that @u@ is @()@ (or some other
-- non-'Void' type), it means that your upstream pipe /can/ terminate, and
-- so an item cannot be promised.
--
-- In that case, either change the upstream pipe to one that never
-- terminates (which is most likely not possible), or use 'await' instead
-- of 'awaitSurely'.
awaitSurely :: Pipe i o Void m i
awaitSurely = awaitEither <&> \case
    Left  v -> absurd v     -- upstream "result" is a 'Void': cannot happen
    Right x -> x

-- | Repeatedly 'await', running the given pipe on every item that the
-- upstream pipe yields.  Once upstream stops producing, finishes with the
-- upstream pipe's result.
--
-- Many pipe combinators can be written with it:
--
-- @
-- 'Data.Conduino.Combinators.map' f = 'awaitForever' $ \x -> 'yield' (f x)
-- @
awaitForever :: (i -> Pipe i o u m a) -> Pipe i o u m u
awaitForever handler = awaitForeverWith pure handler

-- | Like 'awaitForever', but additionally takes a handler for the result
-- of the upstream pipe.  That handler is run once the upstream pipe stops
-- producing, and its result becomes the result of the whole pipe.
awaitForeverWith
    :: (u -> Pipe () o u m b)       -- ^ what to do when upstream ends; this continuation is a source
    -> (i -> Pipe i o u m a)        -- ^ what to do with each item from upstream
    -> Pipe i o u m b
awaitForeverWith onDone onItem = loop
  where
    -- The end-of-upstream continuation expects @()@ as input, so it is
    -- adapted with 'mapInput' to fit into a pipe with input type @i@.
    loop = awaitEither >>= either
      (mapInput (const ()) . onDone)
      (\x -> onItem x *> loop)

-- | Run a complete pipeline (a pipe that is both a source and a sink, an
-- "effect") as the action in the underlying monad that it represents.
--
-- Usually you would construct the pipeline using something like:
--
-- @
-- 'runPipe' $ someSource
--        '.|' somePipe
--        .| someOtherPipe
--        .| someSink
-- @
--
-- 'runPipe' will produce the result of that final sink.
--
-- Some common errors you might receive:
--
-- *  @i@ is not @()@: If the first parameter ("input") of your pipe is
--    not @()@, your pipe is not a producer.  Pre-compose it (using '.|')
--    with a producer of the type you need.
--
--    For example, a @myPipe :: 'Pipe' 'Int' o u m a@ is a pipe that is
--    awaiting 'Int's from upstream.  Pre-compose it with a producer of
--    'Int's, like @'Data.Conduino.Combinators.sourceList' [1,2,3] '.|'
--    myPipe@, in order to be able to run it.
--
-- *  @o@ is not 'Void': If the second parameter ("output") of your pipe
--    is not 'Void', your pipe is not a consumer.  Post-compose it (using
--    '.|') with a consumer of the type you need.
--
--    For example, a @myPipe :: 'Pipe' i 'Int' u m a@ is a pipe that is
--    yielding 'Int's downstream that are going unhandled.  Post-compose
--    it with a consumer of 'Int's, like @myPipe '.|'
--    'Data.Conduino.Combinators.foldl' (+) 0@, in order to be able to run
--    it.
--
--    If you just want to ignore all downstream yields, post-compose with
--    'Data.Conduino.Combinators.sinkNull'.
--
runPipe :: Monad m => Pipe () Void u m a -> m a
runPipe p = iterT step (pipeFree p)
  where
    -- Every await is answered with @()@; the handler for upstream
    -- termination is never used.
    step (PAwaitF _ onInput) = onInput ()
    -- Outputs are 'Void', so a yield can never actually occur.
    step (PYieldF o _)       = absurd o

-- | A version of 'runPipe' for pipes over 'Identity', which have no
-- effects: runs the pipeline and unwraps the pure result.
runPipePure :: Pipe () Void Void Identity a -> a
runPipePure p = runIdentity (runPipe p)

-- | The main operator for chaining pipes together.  @pipe1 .| pipe2@
-- feeds the output of @pipe1@ into the input of @pipe2@; the result of
-- @pipe1@ becomes the "upstream result" seen by @pipe2@, and the result of
-- the composition is the result of @pipe2@.
--
-- "Running" the composed pipe will draw from @pipe2@, and if @pipe2@ ever
-- asks for input (with 'await' or something similar), it will block until
-- @pipe1@ outputs something (or signals termination).
--
-- A full pipeline usually has the structure:
--
-- @
-- 'runPipe' $ someSource
--        '.|' somePipe
--        .| someOtherPipe
--        .| someSink
-- @
--
-- That is, a source is routed into a series of pipes that eventually ends
-- at a sink.  'runPipe' will then produce the result of that sink.
(.|)
    :: Monad m
    => Pipe a b u m v
    -> Pipe b c v m r
    -> Pipe a c u m r
up .| down = Pipe . toFT $
    compPipe_ (fromFT (pipeFree up)) (fromFT (pipeFree down))
infixr 2 .|

-- | Worker for '.|', operating on the 'RecPipe' representation.
--
-- Evaluation is driven by the downstream pipe: the upstream pipe is only
-- stepped when the downstream pipe is blocked on an await.
compPipe_
    :: forall a b c u v m r. (Monad m)
    => RecPipe a b u m v
    -> RecPipe b c v m r
    -> RecPipe a c u m r
compPipe_ up down = FreeT $ do
    downStep <- runFreeT down
    case downStep of
      -- Downstream is done: the composition finishes with its result.
      Pure res                    -> pure (Pure res)
      -- Downstream yields: pass the output on; upstream is left untouched.
      Free (PYieldF out downRest) -> pure (Free (PYieldF out (compPipe_ up downRest)))
      -- Downstream awaits: step upstream to find out what to feed it.
      Free (PAwaitF onEnd onItem) -> do
        upStep <- runFreeT up
        -- Downstream's already-evaluated step, re-wrapped so that its
        -- effects are not run a second time.
        let blocked = FreeT (pure downStep)
        case upStep of
          -- Upstream is done: hand its result to downstream, and keep
          -- upstream in its finished state for any later awaits.
          Pure end                      ->
            runFreeT $ compPipe_ (FreeT (pure upStep)) (onEnd end)
          -- Upstream awaits too: the composition awaits, and resumes
          -- upstream against the still-blocked downstream.
          Free (PAwaitF onEnd' onItem') -> pure . Free $
            PAwaitF (\e -> compPipe_ (onEnd'  e) blocked)
                    (\i -> compPipe_ (onItem' i) blocked)
          -- Upstream yields: feed the item to downstream and carry on.
          Free (PYieldF item upRest)    ->
            runFreeT $ compPipe_ upRest (onItem item)

-- | A newtype wrapper over a source (@'Pipe' () o 'Void'@) that gives it an
-- alternative 'Applicative' and 'Alternative' instance, matching "ListT
-- done right".
--
-- '<*>' will pair up each output that the sources produce: if you 'await'
-- a value from downstream, it will wait until both paired sources yield
-- before passing them on together.
--
-- '<|>' will completely exhaust the first source before moving on to the
-- next source.
--
-- 'ZipSource' is effectively equivalent to "ListT done right", the true
-- List Monad transformer.  '<|>' is concatenation.  You can use this type
-- with 'lift' to lift a yielding action and '<|>' to sequence yields to
-- implement the pattern described in
-- <http://www.haskellforall.com/2014/11/how-to-build-library-agnostic-streaming.html>,
-- where you can write streaming producers in a polymorphic way, and have
-- it run with pipes, conduit, etc.
--
-- The main difference is that its 'Applicative' instance ("zipping") is
-- different from the traditional 'Applicative' instance for 'ListT'
-- ("all combinations").  Effectively this becomes like a "zipping"
-- 'Applicative' instance for 'ListT'.
--
-- If you want a 'Monad' (or 'Control.Monad.IO.Class.MonadIO') instance,
-- use 'ListT' instead, and convert using 'toListT'/'fromListT' or the
-- 'PipeList' pattern/constructor.
newtype ZipSource m a = ZipSource { getZipSource :: Pipe () a Void m () }

-- | A source is equivalent to a 'ListT' producing a 'Maybe'; this pattern
-- synonym lets you treat it as such.  It essentially wraps over 'toListT'
-- and 'fromListT': matching on it views the pipe through 'toListT', and
-- using it as a constructor builds the pipe with 'fromListT'.
pattern PipeList :: Monad m => ListT m (Maybe a) -> Pipe () a u m ()
pattern PipeList xs <- (toListT->xs)
  where
    PipeList xs = fromListT xs
-- The view pattern above always matches, so 'PipeList' on its own is
-- declared to be a complete set of patterns.
{-# COMPLETE PipeList #-}

-- | 'fmap' transforms every output of the wrapped source, using
-- 'mapOutput'.
instance Functor (ZipSource m) where
    fmap f (ZipSource src) = ZipSource (mapOutput f src)

-- | '<*>' pairs up the outputs of the two sources and applies each yielded
-- function to the correspondingly yielded argument.  The 'Nothing' steps
-- of the underlying 'ListT's are dropped (with 'concatListT') before
-- zipping.
--
-- 'pure' is a source that yields its argument a single time.
--
-- NOTE(review): because 'pure' yields only once, @'pure' f '<*>' xs@
-- presumably produces at most one output (assuming the zip stops at the
-- shorter stream), which would differ from @f '<$>' xs@ -- confirm
-- whether this is intended.
instance Monad m => Applicative (ZipSource m) where
    pure = ZipSource . yield
    ZipSource (PipeList fs) <*> ZipSource (PipeList xs) =
        ZipSource (PipeList applied)
      where
        paired  = LT.zip (concatListT fs) (concatListT xs)
        applied = Just <$> (uncurry ($) <$> paired)

-- | Drop every 'Nothing' from a 'ListT' of 'Maybe's, keeping only the
-- contents of the 'Just's.  The effects of the skipped steps are still
-- run.
concatListT :: Monad m => ListT m (Maybe a) -> ListT m a
concatListT xs = ListT $ do
    step <- next xs
    case step of
      Nil          -> pure Nil
      Cons mx rest ->
        let rest' = concatListT rest
        in  maybe (next rest') (\x -> pure (Cons x rest')) mx

-- | 'empty' is a source that finishes immediately without yielding
-- anything.  '<|>' runs the first source to completion and then runs the
-- second one.
instance Monad m => Alternative (ZipSource m) where
    empty = ZipSource (pure ())
    a <|> b = ZipSource (getZipSource a *> getZipSource b)

-- | 'lift' runs the action in the underlying monad and yields its result
-- as a single output.
instance MonadTrans ZipSource where
    lift act = ZipSource (lift act >>= yield)

-- | A source is essentially equivalent to a 'ListT' producing a 'Maybe'
-- result.  This converts a source into the 'ListT' that it encodes: every
-- yield becomes a 'Just' item, and every await (which is answered with
-- @()@) becomes a 'Nothing' item.
--
-- See 'ZipSource' for a wrapper over 'Pipe' that gives the right 'Functor'
-- and 'Alternative' instances.
toListT
    :: Applicative m
    => Pipe () o u m ()
    -> ListT m (Maybe o)
toListT p = ListT (runFT (pipeFree p) done step)
  where
    -- the pipe has finished: end of the list
    done _ = pure Nil
    -- one layer of the pipe; @k@ interprets the remainder of the pipe
    step k (PAwaitF _ onInput) = pure (Cons Nothing  (ListT (k (onInput ()))))
    step k (PYieldF x rest)    = pure (Cons (Just x) (ListT (k rest)))

-- | A source is essentially a 'ListT' producing a 'Maybe' result.  This
-- converts a 'ListT' into the source that it encodes: a 'Just' item
-- becomes a yield, and a 'Nothing' item becomes an await whose input is
-- ignored.  If upstream has terminated at such an await, the resulting
-- pipe finishes there.
--
-- See 'ZipSource' for a wrapper over 'Pipe' that gives the right 'Functor'
-- and 'Alternative' instances.
fromListT
    :: Monad m
    => ListT m (Maybe o)
    -> Pipe i o u m ()
fromListT = fromRecPipe . unroll
  where
    unroll xs = FreeT $ do
      step <- next xs
      pure $ case step of
        Nil                -> Pure ()
        Cons Nothing  rest -> Free (PAwaitF (\_ -> pure ()) (\_ -> unroll rest))
        Cons (Just x) rest -> Free (PYieldF x (unroll rest))

---- | A source is essentially equivalent to 'ListT'.  This converts
---- a 'ListT' to the source it encodes.
----
---- See 'ZipSource' for a wrapper over 'Pipe' that gives the right 'Functor'
---- and 'Alternative' instances.
--fromListT
--    :: Monad m
--    => ListT m o
--    -> Pipe i o u m ()
--fromListT = fromRecPipe . go
--  where
--    go xs = FreeT $ next xs >>= \case
--      Nil       -> pure . Pure $ ()
--      Cons y ys -> pure . Free $ PYieldF y (go ys)

-- | Given a "generator" of @o@ in @m@, return the /source/ which that
-- generator encodes.  Is the inverse of 'withSource'.
--
-- The generator is essentially a church-encoded 'ListT': it is given
-- a handler, and calls it with 'Nothing' to signal the end of the stream,
-- or with @'Just' (x, next)@ to emit @x@ and continue with @next@.
genSource
    :: (forall r. (Maybe (o, m r) -> m r) -> m r)
    -> Pipe i o u m ()
genSource gen = Pipe (FT (\kDone kFree ->
    gen (maybe (kDone ()) (\(x, rest) -> kFree id (PYieldF x rest)))))

-- | A source can be "run" by providing a continuation that handles and
-- sequences each of its outputs.  Is the inverse of 'genSource'.
--
-- Any await made by the source is answered with @()@.
--
-- This essentially turns a pipe into a church-encoded 'ListT'.
withSource
    :: Pipe () o u m ()
    -> (Maybe (o, m r) -> m r)    -- ^ handler ('Nothing' = done, @'Just' (x, next)@ = yielded value and next action)
    -> m r
withSource src handler = runFT (pipeFree src)
    (const (handler Nothing))
    (\continue layer -> case layer of
        PAwaitF _ onInput -> continue (onInput ())
        PYieldF x rest    -> handler (Just (x, continue rest))
    )

-- | 'ZipSource' is effectively 'ListT' returning a 'Maybe'.  As such, you
-- can use 'unconsZipSource' to "peel off" the first item of that list, if
-- there is one, together with the "rest of the list".
--
-- Returns 'Nothing' if the source is finished.  Otherwise the first
-- component of the pair is the first step of the 'toListT' view of the
-- source: 'Just' a yielded value, or 'Nothing' for an await.
unconsZipSource
    :: Monad m
    => ZipSource m a
    -> m (Maybe (Maybe a, ZipSource m a))
unconsZipSource (ZipSource src) = peel <$> next (toListT src)
  where
    peel Nil           = Nothing
    peel (Cons x rest) = Just (x, ZipSource (fromListT rest))

-- | A newtype wrapper over a sink (@'Pipe' i 'Void'@) that gives it an
-- alternative 'Applicative' and 'Alternative' instance.
--
-- '<*>' will distribute input over both sinks, and output a final result
-- once both sinks finish.  It is implemented with 'zipSink'.
--
-- '<|>' will distribute input over both sinks, and output a final result
-- as soon as one or the other finishes.  It is implemented with 'altSink'.
newtype ZipSink i u m a = ZipSink { getZipSink :: Pipe i Void u m a }
  deriving Functor

-- | Worker for 'zipSink', operating on the 'RecPipe' representation.
--
-- Steps the first sink and then the second, and combines them depending
-- on whether each one has finished or is awaiting.  Yields are impossible,
-- since the output type of both sinks is 'Void'.
zipSink_
    :: Monad m
    => RecPipe i Void u m (a -> b)
    -> RecPipe i Void u m a
    -> RecPipe i Void u m b
zipSink_ p q = FreeT $ runFreeT p >>= \pp -> case pp of
    -- The first sink has already finished with a function @x@.
    Pure x             -> runFreeT q >>= \case
      -- Both have finished: apply the function to the argument.
      Pure x'              -> pure . Pure $ x x'
      -- Only the second sink still wants input.  The finished first sink is
      -- re-wrapped from its already-evaluated step @pp@, so that its effects
      -- are not run again.
      Free (PAwaitF f' g') -> pure . Free $
        PAwaitF (zipSink_ (FreeT (pure pp)) . f')
                (zipSink_ (FreeT (pure pp)) . g')
      Free (PYieldF x' _ ) -> absurd x'
    -- The first sink is awaiting.
    Free (PAwaitF f g) -> runFreeT q >>= \qq -> case qq of
      -- Only the first sink still wants input.  The finished second sink is
      -- re-wrapped from its already-evaluated step @qq@.
      Pure _               -> pure . Free $
        PAwaitF ((`zipSink_` FreeT (pure qq)) . f)
                ((`zipSink_` FreeT (pure qq)) . g)
      -- Both are awaiting: the same upstream result or input item is passed
      -- to both continuations (using the 'Applicative' instance of
      -- functions).
      Free (PAwaitF f' g') -> pure . Free $
        PAwaitF (zipSink_ <$> f <*> f') (zipSink_ <$> g <*> g')
      Free (PYieldF x' _ ) -> absurd x'
    Free (PYieldF x _) -> absurd x

-- | Worker for 'altSink', operating on the 'RecPipe' representation.
--
-- The first sink is stepped first: if it has finished, its result is used
-- and the second sink is not stepped at all.  Otherwise the second sink is
-- stepped, and either finishes the whole thing or awaits together with
-- the first.  Yields are impossible, since the output type of both sinks
-- is 'Void'.
altSink_
    :: Monad m
    => RecPipe i Void u m a
    -> RecPipe i Void u m a
    -> RecPipe i Void u m a
altSink_ p q = FreeT $ runFreeT p >>= \pStep -> case pStep of
    Free (PYieldF v _)            -> absurd v
    Pure res                      -> pure (Pure res)
    Free (PAwaitF onEnd onItem)   -> against onEnd onItem <$> runFreeT q
  where
    -- Combine the continuations of the awaiting first sink with the
    -- current step of the second sink.
    against onEnd onItem = \case
      Free (PYieldF v _)            -> absurd v
      Pure res                      -> Pure res
      -- Both are awaiting: pass the same upstream result or input item to
      -- both continuations.
      Free (PAwaitF onEnd' onItem') -> Free $
        PAwaitF (\e -> altSink_ (onEnd  e) (onEnd'  e))
                (\i -> altSink_ (onItem i) (onItem' i))

-- | Feed the same input to both sinks, and finish once both have
-- finished, applying the result of the first sink to the result of the
-- second.
--
-- Forms an identity with 'pure'.
zipSink
    :: Monad m
    => Pipe i Void u m (a -> b)
    -> Pipe i Void u m a
    -> Pipe i Void u m b
zipSink sinkF sinkX = Pipe . toFT $
    zipSink_ (fromFT (pipeFree sinkF)) (fromFT (pipeFree sinkX))

-- | Feed the same input to both sinks, and finish with the result of
-- whichever one finishes first.
altSink
    :: Monad m
    => Pipe i Void u m a
    -> Pipe i Void u m a
    -> Pipe i Void u m a
altSink sinkA sinkB = Pipe . toFT $
    altSink_ (fromFT (pipeFree sinkA)) (fromFT (pipeFree sinkB))

-- | '<*>' = distribute input to both sinks (with 'zipSink'), and return
-- the combined result when they have finished
--
-- 'pure' = finish immediately with the given result
instance Monad m => Applicative (ZipSink i u m) where
    pure x = ZipSink (pure x)
    sf <*> sx = ZipSink (zipSink (getZipSink sf) (getZipSink sx))

-- | '<|>' = distribute input to both sinks (with 'altSink'), and return
-- the result of the first one that finishes
--
-- 'empty' = never finish: keeps on awaiting forever
instance Monad m => Alternative (ZipSink i u m) where
    empty = ZipSink (forever await)
    a <|> b = ZipSink (altSink (getZipSink a) (getZipSink b))

-- | 'lift' = a sink that runs the action in the underlying monad and
-- finishes with its result
instance MonadTrans (ZipSink i u) where
    lift act = ZipSink (lift act)