{-# LANGUAGE ScopedTypeVariables #-}
-- | Basic pipe combinators.
module Control.Pipe.Combinators (
  -- ** Control operators
  tryAwait,
  forP,
  -- ** Composition
  ($$),
  -- ** Producers
  fromList,
  -- ** Folds
  -- | Folds are pipes that consume all their input and return a value. Some of
  -- them, like 'fold1', do not return anything when they don't receive any
  -- input at all. That means that the upstream return value will be returned
  -- instead.
  --
  -- Folds are normally used as 'Consumer's, but they are actually polymorphic
  -- in the output type, to encourage their use in the implementation of
  -- higher-level combinators.
  fold,
  fold1,
  consume,
  consume1,
  -- ** List-like pipe combinators
  -- | The following combinators are analogous to the corresponding list
  -- functions, when the stream of input values is thought of as a (potentially
  -- infinite) list.
  take,
  drop,
  takeWhile,
  takeWhile_,
  dropWhile,
  intersperse,
  groupBy,
  filter,
  -- ** Other combinators
  pipeList,
  nullP,
  feed,
  ) where

import Control.Applicative
import Control.Monad
import Control.Pipe
import Control.Pipe.Exception
import Data.Maybe
import Prelude hiding (until, take, drop, concatMap, filter, takeWhile, dropWhile, catch)

-- | A non-terminating variant of 'await'.
--
-- Yields @Just x@ for a value @x@ received from upstream. If the 'await'
-- raises 'BrokenUpstreamPipe', the exception is caught and 'Nothing' is
-- returned instead, so the current pipe keeps running and can finalise its
-- own work.
--
-- Once upstream has terminated, every further 'tryAwait' keeps returning
-- 'Nothing'; a plain 'await' would terminate the current pipe immediately.
tryAwait :: Monad m => Pipe a b m (Maybe a)
tryAwait = catch (fmap Just await) onBroken
  where
    -- Only 'BrokenUpstreamPipe' is intercepted; the signature pins the
    -- exception type that 'catch' selects.
    onBroken :: Monad m => BrokenUpstreamPipe -> Pipe a b m (Maybe a)
    onBroken _ = return Nothing

-- | Run the given pipe once for every value of the input stream.
--
-- Input is read with 'tryAwait', so 'forP' returns normally once upstream
-- terminates and any action sequenced after it is still executed. The result
-- of each invocation of the supplied pipe is discarded.
forP :: Monad m => (a -> Pipe a b m r) -> Pipe a b m ()
forP f = loop
  where
    loop = do
      next <- tryAwait
      case next of
        Nothing -> return ()
        Just a  -> f a >> loop

infixr 5 $$

-- | Connect a producer to a consumer, ignoring the producer's return value.
--
-- The producer's result is replaced by 'Nothing' and the consumer's result is
-- wrapped in 'Just', so the return value of the composite tells which of the
-- two sides supplied it.
($$) :: Monad m => Pipe x a m r' -> Pipe a y m r -> Pipe x y m (Maybe r)
producer $$ consumer = upstream >+> downstream
  where
    upstream   = producer >> return Nothing
    downstream = Just <$> consumer

-- | Yield every element of the given list, in order, then return.
fromList :: Monad m => [a] -> Pipe x a m ()
fromList xs = forM_ xs yield

-- | The empty pipe: returns @()@ right away without awaiting or yielding.
nullP :: Monad m => Pipe a b m ()
nullP = pure ()

-- | Left fold over the input stream.
--
-- Starting from the given accumulator, each received value is combined into
-- it with the supplied function. When 'tryAwait' reports that there is no
-- more input, the accumulator is returned.
fold :: Monad m => (b -> a -> b) -> b -> Pipe a x m b
fold f = loop
  where
    loop acc = do
      next <- tryAwait
      case next of
        Nothing -> return acc
        Just a  -> loop (f acc a)

-- | Like 'fold', but the first input value serves as the initial accumulator.
--
-- If no input is received at all, this pipe has no value of its own to return
-- and hands control to 'discard' instead.
fold1 :: Monad m => (a -> a -> a) -> Pipe a x m a
fold1 f = do
  first <- tryAwait
  case first of
    Nothing -> discard
    Just x  -> fold f x

-- | Collect every input value into a list, preserving arrival order.
--
-- Each value @x@ is first turned into the function @(x :)@; these functions
-- are composed with 'fold' and the composite is finally applied to @[]@.
consume :: Monad m => Pipe a x m [a]
consume = pipe (:) >+> collect
  where
    collect = do
      build <- fold (.) id
      return (build [])

-- | Collect every input value into a non-empty list.
--
-- Works like 'consume' but is built on 'fold1', so when no input arrives it
-- follows the behaviour of 'fold1' rather than returning an empty list.
consume1 :: Monad m => Pipe a x m [a]
consume1 = pipe (:) >+> collect
  where
    collect = do
      build <- fold1 (.)
      return (build [])

-- | Forward the first @n@ input values unchanged, then return.
--
-- Values are read with 'await', not 'tryAwait'.
take :: Monad m => Int -> Pipe a a m ()
take n = replicateM_ n forwardOne
  where
    forwardOne = do
      x <- await
      yield x

-- | Skip the first @n@ input values, then forward everything else unchanged.
drop :: Monad m => Int -> Pipe a a m r
drop n = do
  replicateM_ n await
  idP

-- | Map each input value to a list of outputs and yield them one by one.
--
-- An input mapped to the empty list produces no output.
pipeList :: Monad m => (a -> [b]) -> Pipe a b m r
pipeList f = forever $ do
  x <- await
  forM_ (f x) yield

-- | Act as an identity for as long as inputs satisfy the given predicate.
--
-- The first input that fails the predicate is not yielded; it becomes the
-- return value of the pipe.
takeWhile :: Monad m => (a -> Bool) -> Pipe a a m a
takeWhile p = loop
  where
    loop = do
      x <- await
      if p x
        then yield x >> loop
        else return x

-- | Same as 'takeWhile', except that the first rejected input is thrown away
-- and @()@ is returned.
takeWhile_ :: Monad m => (a -> Bool) -> Pipe a a m ()
takeWhile_ p = do
  _ <- takeWhile p
  return ()

-- | Swallow inputs for as long as they satisfy the given predicate, then act
-- as an identity.
--
-- The leading run is produced by 'takeWhile' and piped into 'discard'. The
-- first input that fails the predicate comes back as the result of that
-- composition and is yielded before the pipe switches to 'idP'.
dropWhile :: Monad m => (a -> Bool) -> Pipe a a m r
dropWhile p = do
  firstKept <- takeWhile p >+> discard
  yield firstKept
  idP

-- | Wrap every input in 'Just', inserting a 'Nothing' marker immediately
-- before each input that satisfies the predicate.
intersperse :: Monad m => (a -> Bool) -> Pipe a (Maybe a) m r
intersperse p = forever (await >>= emit)
  where
    emit x
      | p x       = yield Nothing >> yield (Just x)
      | otherwise = yield (Just x)

-- | Group consecutive input values using the given binary predicate.
--
-- The work is split into two composed stages:
--
-- * the first stage wraps every input in 'Just' and emits a 'Nothing'
--   separator whenever the predicate fails for two adjacent inputs;
--
-- * the second stage repeatedly collects a run of 'Just' values into a list
--   and yields that list.
groupBy :: Monad m => (a -> a -> Bool) -> Pipe a [a] m r
groupBy p = markBoundaries >+> collectGroups
  where
    -- The very first element never needs a separator in front of it.
    markBoundaries = do
      x <- await
      yield (Just x)
      continue x

    -- Each new element is compared against the one received just before it.
    continue prev = do
      cur <- await
      unless (p prev cur) (yield Nothing)
      yield (Just cur)
      continue cur

    -- 'fromJust' is only applied to values that passed @takeWhile_ isJust@.
    collectGroups =
      forever (takeWhile_ isJust >+> pipe fromJust >+> emitGroup)

    emitGroup = consume1 >>= yield

-- | Forward only the inputs that satisfy the given predicate; all other
-- inputs are dropped.
filter :: Monad m => (a -> Bool) -> Pipe a a m r
filter p = forever $ do
  x <- await
  when (p x) (yield x)

-- | Feed an input element to a pipe.
--
-- The element is handed to the first 'Await' step the pipe reaches. Any
-- 'Yield' or monadic ('M') steps met before that are performed first, and the
-- element is then fed to the pipe that follows them. A pipe that is already a
-- 'Pure' result or a 'Throw' ignores the element and returns the same result
-- or rethrows the same exception.
feed :: Monad m => a -> Pipe a b m r -> Pipe a b m r

-- this could be implemented as
-- feed x p = (yield x >> idP) >+> p
-- but this version is more efficient
feed _ (Pure r) = return r
feed _ (Throw e) = throw e
-- NOTE(review): the second field of 'Free' (bound to @h@) is never used
-- below. Confirm that dropping it is intended.
feed a (Free c h) = case go a c of
  -- The element has not been consumed yet: perform the step, then keep
  -- feeding the same element to the pipe it returns.
  (False, p) -> p >>= feed a
  -- The element was consumed: @p@ returns the continuation pipe, so 'join'
  -- flattens it into the result.
  (True, p)  -> join p
  where
    -- Handles a single step. The flag says whether the element was consumed;
    -- the pipe performs the step and returns the pipe that follows it.
    -- The first parameter shadows the outer @a@ (it receives the same value),
    -- and the @c@ bound in the 'Yield' case shadows the outer @c@.
    go a (Await k) = (True, return $ k a)
    go _ (Yield y c) = (False, yield y >> return c)
    go _ (M m s) = (False, liftP s m)