{-# LANGUAGE BangPatterns #-}
{-# LANGUAGE LambdaCase   #-}
{-# LANGUAGE RankNTypes   #-}

-- |
-- Module      : Data.Conduino.Combinators
-- Copyright   : (c) Justin Le 2019
-- License     : BSD3
--
-- Maintainer  : justin@jle.im
-- Stability   : experimental
-- Portability : non-portable
--
-- A basic collection of base 'Pipe's that serve as a "prelude" for the
-- package.  This module is meant to be imported qualified.
--
-- > import qualified Data.Conduino.Combinators as C
--
module Data.Conduino.Combinators (
  -- * Sources
  -- ** Pure
  -- *** Infinite
    unfold
  , iterate
  , repeat
  -- *** Finite
  , unfoldMaybe
  , unfoldEither
  , iterateMaybe
  , iterateEither
  , sourceList
  , replicate
  -- ** Monadic
  -- *** Infinite
  , repeatM
  -- *** Finite
  , repeatMaybeM
  , repeatEitherM
  , replicateM
  , sourceHandleLines
  , stdinLines
  , sourceHandle
  , stdin
  -- * Pipes
  , map
  , mapM
  , scan
  , mapAccum
  , take
  , takeWhile
  , filter
  , concatMap
  , concat
  , pairs
  , consecutive
  -- * Sinks
  -- ** Pure
  , drop
  , dropWhile
  , foldr
  , foldl
  , foldMap
  , fold
  , sinkNull
  , sinkList
  , last
  -- ** Monadic
  , sinkHandle
  , stdout
  , stderr
  ) where

import           Control.Applicative
import           Control.Exception
import           Control.Monad hiding          (mapM, replicateM)
import           Control.Monad.IO.Class
import           Control.Monad.Trans.Class
import           Control.Monad.Trans.Maybe
import           Data.Conduino
import           Data.Either
import           Data.Foldable hiding          (foldr, foldl, fold, concat, concatMap, foldMap)
import           Data.Maybe
import           Data.Semigroup
import           Prelude hiding                (map, iterate, mapM, replicate, repeat, foldr, drop, foldl, last, take, concatMap, filter, concat, takeWhile, dropWhile, foldMap)
import           System.IO.Error
import qualified Data.ByteString               as BS
import qualified Data.ByteString.Lazy.Internal as BSL
import qualified Data.Sequence                 as Seq
import qualified System.IO                     as S

-- | A version of 'unfoldMaybe' that can choose the "result" value by
-- passing it in as 'Left'.
unfoldEither
    :: (s -> Either a (o, s))
    -> s
    -> Pipe i o u m a
unfoldEither f = go
  where
    go z = case f z of
      Left  r       -> pure r
      Right (x, z') -> yield x *> go z'

-- | A version of 'unfold' that can terminate and end by returning
-- 'Nothing'.
unfoldMaybe
    :: (s -> Maybe (o, s))
    -> s
    -> Pipe i o u m ()
unfoldMaybe f = unfoldEither (maybe (Left ()) Right . f)

-- | Repeatedly apply an "unfolding" function to a given initial state,
-- yielding the first item in the tuple as output and updating the state as
-- the second item in the tuple.  Goes on forever.  See 'unfoldMaybe' for
-- a version that stops.
unfold
    :: (s -> (o, s))
    -> s
    -> Pipe i o u m a
unfold f = go
  where
    go z = yield x *> go z'
      where
        (x, z') = f z

-- | A version of 'iterateMaybe' that can specify a result value by
-- providing it in the 'Left'.
iterateEither
    :: (o -> Either a o)
    -> o
    -> Pipe i o u m a
iterateEither f = unfoldEither (fmap (join (,)) . f)

-- | A version of 'iterate' that can choose to terminate and stop by
-- returning 'Nothing'.
iterateMaybe
    :: (o -> Maybe o)
    -> o
    -> Pipe i o u m ()
iterateMaybe f = unfoldMaybe (fmap (join (,)) . f)

-- | Repeatedly apply a function to a given starting value and yield each
-- result forever.
--
-- >>> runPipePure $ iterate succ 0
--       .| take 5
--       .| sinkList
--
-- [1,2,3,4,5]
--
-- This doesn't yield the original starting value.  However, you can yield
-- it iterate after:
--
-- >>> runPipePure $ (yield 0 >> iterate succ 0)
--       .| take 5
--       .| sinkList
--
-- [0,1,2,3,4,5]
iterate
    :: (o -> o)
    -> o
    -> Pipe i o u m a
iterate f = unfold (join (,) . f)

-- | Yield every item in a foldable container.
sourceList :: Foldable t => t a -> Pipe i a u m ()
sourceList = traverse_ yield

-- | Repeatedly yield a given item forever.
repeat :: o -> Pipe i o u m a
repeat = forever . yield

-- | Yield a given item a certain number of times.
replicate :: Int -> o -> Pipe i o u m ()
replicate n = replicateM_ n . yield

-- | Like 'repeatMaybeM', but allow specification of a final result type.
repeatEitherM
    :: Monad m
    => m (Either a o)
    -> Pipe i o u m a
repeatEitherM x = go
  where
    go = lift x >>= \case
      Left r  -> pure r
      Right y -> yield y *> go

-- | Repeat a monadic action, yielding the item in the 'Just' every time.
-- As soon as it sees 'Nothing', stop producing forever.
--
-- Remember that each item will only be "executed" when something
-- downstream requests output.
repeatMaybeM
    :: Monad m
    => m (Maybe o)
    -> Pipe i o u m ()
repeatMaybeM = repeatEitherM . fmap (maybe (Left ()) Right)

-- | Repeat a monadic action a given number of times, yielding each result,
-- and then stop producing forever.
--
-- Remember that each item will only be "executed" when something
-- downstream requests output.
replicateM
    :: Monad m
    => Int
    -> m o
    -> Pipe i o u m ()
replicateM n x = replicateM_ n $ lift x >>= yield

-- | Source from each line received from 'stdin'.  This stops as soon as
-- end-of-file is reached, or an empty line is seen.
stdinLines :: MonadIO m => Pipe i String u m ()
stdinLines = sourceHandleLines S.stdin

-- | Source from stdin, yielding bytestrings as they are drawn.  If you
-- want to retrieve each line as a string, see 'stdinLines'.
stdin :: MonadIO m => Pipe i BS.ByteString u m ()
stdin = sourceHandle S.stdin

-- | Source from a given I/O handle, yielding each line drawn as a string.
-- To draw raw bytes, use 'sourceHandle'.
--
-- This stop as soon as end-of-file is reached, or an empty line is seen.
sourceHandleLines
    :: MonadIO m
    => S.Handle
    -> Pipe i String u m ()
sourceHandleLines h = repeatMaybeM $ do
    d <- liftIO $ S.hIsEOF h
    if d
      then pure Nothing
      else liftIO . catchJust
                (guard . isEOFError)
                (mfilter (not . null) . Just <$> S.hGetLine h)
                $ \_ -> pure Nothing

-- | Source from a given I/O handle, yielding bytestrings as they are
-- pulled.  If you want to retrieve each line as a string, see
-- 'sourceHandleLines'.
sourceHandle
    :: MonadIO m
    => S.Handle
    -> Pipe i BS.ByteString u m ()
sourceHandle h = repeatMaybeM
               . fmap (mfilter (not . BS.null) . Just)
               . liftIO
               $ BS.hGetSome h BSL.defaultChunkSize

-- | Sink into a given I/O handle, writing each input to the handle.
sinkHandle
    :: MonadIO m
    => S.Handle
    -> Pipe BS.ByteString o u m ()
sinkHandle h = mapM (liftIO . BS.hPut h)
            .| sinkNull

-- | A sink into stdout.
stdout :: MonadIO m => Pipe BS.ByteString o u m ()
stdout = sinkHandle S.stdout

-- | A sink into stderr.
stderr :: MonadIO m => Pipe BS.ByteString o u m ()
stderr = sinkHandle S.stderr

-- | Repeat a monadic action forever, yielding each output.
--
-- Remember that each item will only be "executed" when something
-- downstream requests output.
repeatM
    :: Monad m
    => m o
    -> Pipe i o u m a
repeatM x = go
  where
    go = (yield =<< lift x) *> go

-- | Process every incoming item with a pure function, and yield its
-- output.
map :: (i -> o) -> Pipe i o u m u
map f = awaitForever (yield . f)

-- | Map a monadic function to process every input, and yield its output.
mapM :: Monad m => (i -> m o) -> Pipe i o u m u
mapM f = awaitForever ((yield =<<) . lift . f)

-- | Map a pure "stateful" function over each incoming item.  Give
-- a function to update the state and return an output and an initial
-- state.
mapAccum
    :: (i -> s -> (s, o))       -- ^ update state and output
    -> s                        -- ^ initial state
    -> Pipe i o u m u
mapAccum f = go
  where
    go !x = awaitWith $ \y ->
        let (!x', !z) = f y x
        in  yield z *> go x'

-- | Like 'foldl', but yields every accumulator value downstream.
--
-- >>> runPipePure $ sourceList [1..10]
--       .| scan (+) 0
--       .| sinkList
-- [1,3,6,10,15,21,28,36,45,55]
-- @
scan
    :: (o -> i -> o)
    -> o
    -> Pipe i o u m u
scan f = go
  where
    go !x = awaitWith $ \y ->
      let x' = f x y
      in  yield x' *> go x'

-- | Yield consecutive pairs of values.
--
-- >>> runPipePure $ sourceList [1..5]
--       .| pairs
--       .| sinkList
-- [(1,2),(2,3),(3,4),(4,5)]
pairs :: Pipe i (i, i) u m u
pairs = awaitWith go
  where
    go x = awaitWith $ \y -> do
      yield (x, y)
      go y

-- | Yield consecutive runs of at most @n@ of values, starting with an
-- empty sequence.
--
-- To get only "full" sequences, pipe with 'filter'.
--
-- >>> runPipePure $ sourceList [1..6]
--       .| consecutive 3
--       .| map toList
--       .| sinkList
-- [[],[1],[1,2],[1,2,3],[2,3,4],[3,4,5],[4,5,6]]
--
-- >>> runPipePure $ sourceList [1..6]
--       .| consecutive 3
--       .| filter ((== 3) . Seq.length)
--       .| map toList
--       .| sinkList
-- [[1,2,3],[2,3,4],[3,4,5],[4,5,6]]
consecutive :: Int -> Pipe i (Seq.Seq i) u m u
consecutive n = go Seq.empty
  where
    go xs = do
      yield xs
      awaitWith $ \y -> go . Seq.drop (Seq.length xs - n + 1) $ (xs Seq.:|> y)


-- | Let a given number of items pass through the stream uninhibited, and
-- then stop producing forever.
--
-- This is most useful if you sequence a second conduit after it.
--
-- >>> runPipePure $ sourceList [1..8]
--       .| (do take 3 .| map (*2)         -- double the first 3 items
--              map negate                 -- negate the rest
--          )
--       .| sinkList
-- [2,4,6,-4,-5,-6,-7,-8]
take :: Int -> Pipe i i u m ()
take n = void . runMaybeT . replicateM_ n $
    lift . yield =<< MaybeT await

-- | Let elements pass until an element is received that does not satisfy
-- the predicate, then stop producing forever.
--
-- Like 'take', is most useful if you sequence a second conduit after it.
takeWhile :: (i -> Bool) -> Pipe i i u m ()
takeWhile p = go
  where
    go = await >>= \case
      Nothing -> pure ()
      Just x
        | p x       -> yield x *> go
        | otherwise -> pure ()

-- | Only allow values satisfying a predicate to pass.
filter
    :: (i -> Bool)
    -> Pipe i i u m u
filter p = awaitForever $ \x -> when (p x) $ yield x

-- | Map a function returning a container onto every incoming item, and
-- yield all of the outputs from that function.
concatMap
    :: Foldable t
    => (i -> t o)
    -> Pipe i o u m u
concatMap f = awaitForever (sourceList . f)

-- | Take an input of containers and output each of their elements
-- successively.
concat :: Foldable t => Pipe (t i) i u m u
concat = awaitForever sourceList

-- | Right-fold every input into an accumulated value.
--
-- Essentially this builds up a giant continuation that will be run all at
-- once on the final result.
foldr :: (a -> b -> b) -> b -> Pipe a o u m b
foldr f z = go
  where
    go = await >>= \case
      Nothing -> pure z
      Just x  -> f x <$> go

-- | Left-fold every input into an accumulated value.
--
-- Essentially this maintains a state and modifies that state with every
-- input, using the given accumulating function.
foldl :: (b -> a -> b) -> b -> Pipe a o u m b
foldl f = go
  where
    go !z = await >>= \case
      Nothing -> pure z
      Just !x -> go (f z x)

-- | Fold every incoming item monoidally, and return the result once
-- finished.
fold :: Monoid a => Pipe a o u m a
fold = foldl (<>) mempty

-- | Fold every incoming item according to a monoidal projection, and
-- return the result once finished.
--
-- This can be used to implement many useful consumers, like ones that find
-- the sum or the maximum item:
--
-- @
-- sum :: Num i => Pipe i o u m i
-- sum = getSum <$> foldMap Sum
--
-- maximum :: Ord i => Pipe i o u m (Maybe i)
-- maximum = fmap getMax <$> foldMap (Just . Max)
-- @
foldMap :: Monoid a => (i -> a) -> Pipe i o u m a
foldMap f = foldl (\x y -> x <> f y) mempty

-- | Sink every incoming item into a list.
--
-- Note that this keeps the entire list in memory until it is all
-- eventually read.
sinkList :: Pipe i o u m [i]
sinkList = foldr (:) []

-- | Ignore a certain amount of items from the input stream, and then stop
-- producing forever.
--
-- This is most useful if you sequence a second consumer after it:
--
-- >>> runPipePure $ sourceList [1..8]
--       .| (drop 3 >> 'sinkList')
-- [4,5,6,7,8]
drop :: Int -> Pipe i o u m ()
drop n = replicateM_ n await

-- | Ignore items from an input stream as long as they match a predicate.
-- Afterwards, stop producing forever.
--
-- Like for 'drop', is most useful of you sequence a second consumer after
-- it.
dropWhile
    :: (i -> Bool)
    -> Pipe i o u m ()
dropWhile p = go
  where
    go = await >>= \case
      Nothing -> pure ()
      Just x
        | p x       -> go
        | otherwise -> pure ()

-- | Consume an entire input stream and ignore all of its outputs.
sinkNull :: Pipe i o u m ()
sinkNull = await >>= \case
    Nothing -> pure ()
    Just _  -> sinkNull

-- | Get the last item emitted by a stream.
--
-- To get the first item ("head"), use 'await' or 'awaitSurely'.
last :: Pipe i o u m (Maybe i)
last = fmap getLast <$> foldMap (Just . Last)