{-# LANGUAGE BangPatterns #-}
{-# LANGUAGE LambdaCase   #-}
{-# LANGUAGE RankNTypes   #-}

-- |
-- Module      : Data.Conduino.Combinators
-- Copyright   : (c) Justin Le 2019
-- License     : BSD3
--
-- Maintainer  : justin@jle.im
-- Stability   : experimental
-- Portability : non-portable
--
-- A basic collection of base 'Pipe's that serve as a "prelude" for the
-- package.  This module is meant to be imported qualified.
--
-- > import qualified Data.Conduino.Combinators as C
--
module Data.Conduino.Combinators (
  -- * Sources
  -- ** Pure
  -- *** Infinite
    unfold
  , iterate
  , repeat
  -- *** Finite
  , unfoldMaybe
  , unfoldEither
  , iterateMaybe
  , iterateEither
  , sourceList
  , replicate
  -- ** Monadic
  -- *** Infinite
  , repeatM
  -- *** Finite
  , repeatMaybeM
  , repeatEitherM
  , replicateM
  , sourceHandleLines
  , sourceHandleLinesText
  , stdinLines
  , sourceHandle
  , stdin
  -- * Pipes
  , map
  , mapM
  , iterM
  , scan
  , mapAccum
  , take
  , takeWhile
  , filter
  , concatMap
  , concat
  , pairs
  , consecutive
  -- * Sinks
  -- ** Pure
  , drop
  , dropWhile
  , foldr
  , foldl
  , foldMap
  , fold
  , sinkNull
  , sinkList
  , last
  -- ** Monadic
  , sinkHandle
  , stdout
  , stderr
  ) where

import           Control.Applicative
import           Control.Exception
import           Control.Monad hiding          (mapM, replicateM)
import           Control.Monad.IO.Class
import           Control.Monad.Trans.Class
import           Control.Monad.Trans.Maybe
import           Data.Conduino
import           Data.Either
import           Data.Foldable hiding          (foldr, foldl, fold, concat, concatMap, foldMap)
import           Data.Maybe
import           Data.Semigroup
import           Prelude hiding                (map, iterate, mapM, replicate, repeat, foldr, drop, foldl, last, take, concatMap, filter, concat, takeWhile, dropWhile, foldMap)
import           System.IO.Error
import qualified Data.ByteString               as BS
import qualified Data.ByteString.Lazy.Internal as BSL
import qualified Data.Sequence                 as Seq
import qualified Data.Text                     as T
import qualified Data.Text.IO                  as T
import qualified System.IO                     as S

-- | A version of 'unfoldMaybe' that can choose the "result" value by
-- passing it in as 'Left'.
--
-- Every 'Right' yields one output and supplies the next state; the first
-- 'Left' ends the pipe and its contents become the pipe's result.
unfoldEither
    :: (s -> Either a (o, s))   -- ^ step function
    -> s                        -- ^ initial state
    -> Pipe i o u m a
unfoldEither f = go
  where
    go z = case f z of
      Left  r       -> pure r
      Right (x, z') -> yield x *> go z'
{-# INLINE unfoldEither #-}

-- | A version of 'unfold' that can terminate and end by returning
-- 'Nothing'.
--
-- Implemented via 'unfoldEither', with 'Nothing' mapped to @'Left' ()@.
unfoldMaybe
    :: (s -> Maybe (o, s))      -- ^ step function
    -> s                        -- ^ initial state
    -> Pipe i o u m ()
unfoldMaybe f = unfoldEither (maybe (Left ()) Right . f)
{-# INLINE unfoldMaybe #-}

-- | Repeatedly apply an "unfolding" function to a given initial state,
-- yielding the first item in the tuple as output and updating the state as
-- the second item in the tuple.  Goes on forever.  See 'unfoldMaybe' for
-- a version that stops.
unfold
    :: (s -> (o, s))            -- ^ step function
    -> s                        -- ^ initial state
    -> Pipe i o u m a
unfold f = go
  where
    go z = yield x *> go z'
      where
        (x, z') = f z
{-# INLINE unfold #-}

-- | A version of 'iterateMaybe' that can specify a result value by
-- providing it in the 'Left'.
iterateEither
    :: (o -> Either a o)
    -> o
    -> Pipe i o u m a
-- @join (,)@ duplicates a value into a pair, so each new value is used
-- both as the yielded output and as the next state.
iterateEither f = unfoldEither (fmap (join (,)) . f)
{-# INLINE iterateEither #-}

-- | A version of 'iterate' that can choose to terminate and stop by
-- returning 'Nothing'.
iterateMaybe
    :: (o -> Maybe o)
    -> o
    -> Pipe i o u m ()
-- @join (,)@ duplicates a value into a pair, so each new value is used
-- both as the yielded output and as the next state.
iterateMaybe f = unfoldMaybe (fmap (join (,)) . f)
{-# INLINE iterateMaybe #-}

-- | Repeatedly apply a function to a given starting value and yield each
-- result forever.
--
-- >>> runPipePure $ iterate succ 0
--       .| take 5
--       .| sinkList
--
-- [1,2,3,4,5]
--
-- This doesn't yield the original starting value.  However, you can yield
-- it yourself before iterating:
--
-- >>> runPipePure $ (yield 0 >> iterate succ 0)
--       .| take 5
--       .| sinkList
--
-- [0,1,2,3,4]
iterate
    :: (o -> o)
    -> o
    -> Pipe i o u m a
-- @join (,)@ duplicates a value into a pair, so each new value is used
-- both as the yielded output and as the next state.
iterate f = unfold (join (,) . f)
{-# INLINE iterate #-}

-- | Yield every item in a foldable container, in traversal order.
sourceList :: Foldable t => t a -> Pipe i a u m ()
sourceList = traverse_ yield
{-# INLINE sourceList #-}

-- | Repeatedly yield a given item forever.
repeat :: o -> Pipe i o u m a
repeat = forever . yield
{-# INLINE repeat #-}

-- | Yield a given item a certain number of times.
replicate :: Int -> o -> Pipe i o u m ()
replicate n = replicateM_ n . yield
{-# INLINE replicate #-}

-- | Like 'repeatMaybeM', but allow specification of a final result type.
--
-- The action is run repeatedly: every 'Right' is yielded downstream, and
-- the first 'Left' ends the pipe with its contents as the result.
repeatEitherM
    :: Monad m
    => m (Either a o)
    -> Pipe i o u m a
repeatEitherM x = go
  where
    go = lift x >>= \case
      Left r  -> pure r
      Right y -> yield y *> go
{-# INLINE repeatEitherM #-}

-- | Repeat a monadic action, yielding the item in the 'Just' every time.
-- As soon as it sees 'Nothing', stop producing forever.
--
-- Remember that each item will only be "executed" when something
-- downstream requests output.
repeatMaybeM
    :: Monad m
    => m (Maybe o)
    -> Pipe i o u m ()
repeatMaybeM = repeatEitherM . fmap (maybe (Left ()) Right)
{-# INLINE repeatMaybeM #-}

-- | Repeat a monadic action a given number of times, yielding each result,
-- and then stop producing forever.
--
-- Remember that each item will only be "executed" when something
-- downstream requests output.
replicateM
    :: Monad m
    => Int
    -> m o
    -> Pipe i o u m ()
replicateM n = replicateM_ n . (yield =<<) . lift
{-# INLINE replicateM #-}

-- | Source from each line received from 'stdin'.  This stops as soon as
-- end-of-file is reached, or an empty line is seen.
--
-- The empty line that ends the stream is not itself yielded.
stdinLines :: MonadIO m => Pipe i String u m ()
stdinLines = sourceHandleLines S.stdin
          .| takeWhile (not . null)
{-# INLINE stdinLines #-}

-- | Source from stdin, yielding bytestrings as they are drawn.  If you
-- want to retrieve each line as a string, see 'stdinLines'.
stdin :: MonadIO m => Pipe i BS.ByteString u m ()
stdin = sourceHandle S.stdin
{-# INLINE stdin #-}

-- | Source from a given I/O handle, yielding each line drawn as a string.
-- To draw raw bytes, use 'sourceHandle'.
--
-- This stops as soon as end-of-file is reached.
--
-- Since v0.2.3.0, this now continues through empty lines.
sourceHandleLines
    :: MonadIO m
    => S.Handle
    -> Pipe i String u m ()
sourceHandleLines h = repeatMaybeM $ do
    d <- liftIO $ S.hIsEOF h
    if d
      then pure Nothing
      -- An EOF error raised by the read itself (despite the check above)
      -- is also treated as end of input; any other exception propagates.
      else liftIO . catchJust
                (guard . isEOFError)
                (Just <$> S.hGetLine h)
                $ \_ -> pure Nothing
{-# INLINE sourceHandleLines #-}

-- | Source from a given I/O handle, yielding each line drawn as
-- a 'T.Text'.  To draw raw bytes, use 'sourceHandle'.
--
-- This stops as soon as end-of-file is reached.  Like
-- 'sourceHandleLines', it continues through empty lines.
sourceHandleLinesText
    :: MonadIO m
    => S.Handle
    -> Pipe i T.Text u m ()
sourceHandleLinesText h = repeatMaybeM $ do
    d <- liftIO $ S.hIsEOF h
    if d
      then pure Nothing
      -- An EOF error raised by the read itself (despite the check above)
      -- is also treated as end of input; any other exception propagates.
      else liftIO . catchJust
                (guard . isEOFError)
                (Just <$> T.hGetLine h)
                $ \_ -> pure Nothing
{-# INLINE sourceHandleLinesText #-}

-- | Source from a given I/O handle, yielding bytestrings as they are
-- pulled.  If you want to retrieve each line as a string, see
-- 'sourceHandleLines'.
--
-- Chunks are read with 'BS.hGetSome', requesting up to
-- 'BSL.defaultChunkSize' bytes at a time.  The source stops at the first
-- empty chunk, which is never yielded.
sourceHandle
    :: MonadIO m
    => S.Handle
    -> Pipe i BS.ByteString u m ()
sourceHandle h = repeatMaybeM
               -- an empty chunk becomes 'Nothing', ending the source
               . fmap (mfilter (not . BS.null) . Just)
               . liftIO
               $ BS.hGetSome h BSL.defaultChunkSize
{-# INLINE sourceHandle #-}

-- | Sink into a given I/O handle, writing each input to the handle.
--
-- Nothing is ever yielded downstream.
sinkHandle
    :: MonadIO m
    => S.Handle
    -> Pipe BS.ByteString o u m ()
sinkHandle h = mapM (liftIO . BS.hPut h)
            .| sinkNull
{-# INLINE sinkHandle #-}

-- | A sink into stdout: 'sinkHandle' applied to 'S.stdout'.
stdout :: MonadIO m => Pipe BS.ByteString o u m ()
stdout = sinkHandle S.stdout
{-# INLINE stdout #-}

-- | A sink into stderr: 'sinkHandle' applied to 'S.stderr'.
stderr :: MonadIO m => Pipe BS.ByteString o u m ()
stderr = sinkHandle S.stderr
{-# INLINE stderr #-}

-- | Repeat a monadic action forever, yielding each output.
--
-- Remember that each item will only be "executed" when something
-- downstream requests output.
repeatM
    :: Monad m
    => m o
    -> Pipe i o u m a
repeatM x = go
  where
    go = (yield =<< lift x) *> go
{-# INLINE repeatM #-}

-- | Process every incoming item with a pure function, and yield its
-- output.
map :: (i -> o) -> Pipe i o u m u
map f = awaitForever (yield . f)
{-# INLINE map #-}

-- | Map a monadic function to process every input, and yield its output.
mapM :: Monad m => (i -> m o) -> Pipe i o u m u
mapM f = awaitForever ((yield =<<) . lift . f)
{-# INLINE mapM #-}

-- | Execute a monadic function to process every input, passing through the
-- original value back downstream.
--
-- The action runs before the value is yielded.
--
-- @since 0.2.1.0
iterM :: Monad m => (i -> m ()) -> Pipe i i u m u
iterM f = mapM (\x -> x <$ f x)
{-# INLINE iterM #-}

-- | Map a pure "stateful" function over each incoming item.  Give
-- a function to update the state and return an output and an initial
-- state.
--
-- The state is kept strict between steps.
mapAccum
    :: (i -> s -> (s, o))       -- ^ update state and output
    -> s                        -- ^ initial state
    -> Pipe i o u m u
mapAccum f = go
  where
    go !x = awaitWith $ \y ->
        let (!x', !z) = f y x
        in  yield z *> go x'
{-# INLINE mapAccum #-}

-- | Like 'foldl', but yields every accumulator value downstream.
--
-- The initial accumulator itself is not yielded; the first output is the
-- accumulator after the first input has been folded in.
--
-- >>> runPipePure $ sourceList [1..10]
--       .| scan (+) 0
--       .| sinkList
-- [1,3,6,10,15,21,28,36,45,55]
scan
    :: (o -> i -> o)            -- ^ accumulating function
    -> o                        -- ^ initial accumulator
    -> Pipe i o u m u
scan f = go
  where
    go !x = awaitWith $ \y ->
      let x' = f x y
      in  yield x' *> go x'
{-# INLINE scan #-}

-- | Yield consecutive pairs of values.
--
-- Nothing is yielded until two inputs have been received.
--
-- >>> runPipePure $ sourceList [1..5]
--       .| pairs
--       .| sinkList
-- [(1,2),(2,3),(3,4),(4,5)]
pairs :: Pipe i (i, i) u m u
pairs = awaitWith go
  where
    -- @x@ is the previously received item
    go x = awaitWith $ \y -> do
      yield (x, y)
      go y
{-# INLINE pairs #-}

-- | Yield consecutive runs of at most @n@ of values, starting with an
-- empty sequence.
--
-- To get only "full" sequences, pipe with 'filter'.
--
-- >>> runPipePure $ sourceList [1..6]
--       .| consecutive 3
--       .| map toList
--       .| sinkList
-- [[],[1],[1,2],[1,2,3],[2,3,4],[3,4,5],[4,5,6]]
--
-- >>> runPipePure $ sourceList [1..6]
--       .| consecutive 3
--       .| filter ((== 3) . Seq.length)
--       .| map toList
--       .| sinkList
-- [[1,2,3],[2,3,4],[3,4,5],[4,5,6]]
consecutive :: Int -> Pipe i (Seq.Seq i) u m u
consecutive n = go Seq.empty
  where
    go xs = do
      yield xs
      -- While the window holds fewer than @n@ items the drop count is
      -- non-positive and nothing is removed; once it is full, exactly one
      -- item is dropped from the front for each new arrival.
      awaitWith $ \y -> go . Seq.drop (Seq.length xs - n + 1) $ (xs Seq.:|> y)
{-# INLINE consecutive #-}

-- | Let a given number of items pass through the stream uninhibited, and
-- then stop producing forever.
--
-- This is most useful if you sequence a second conduit after it.
--
-- >>> runPipePure $ sourceList [1..8]
--       .| (do take 3 .| map (*2)         -- double the first 3 items
--              map negate                 -- negate the rest
--          )
--       .| sinkList
-- [2,4,6,-4,-5,-6,-7,-8]
take :: Int -> Pipe i i u m ()
-- Running the loop in 'MaybeT' means the first 'Nothing' returned by
-- 'await' short-circuits all remaining repetitions.
take n = void . runMaybeT . replicateM_ n $
    lift . yield =<< MaybeT await
{-# INLINE take #-}

-- | Let elements pass until an element is received that does not satisfy
-- the predicate, then stop producing forever.
--
-- Like 'take', is most useful if you sequence a second conduit after it.
-- Note, however, that the first element failing the predicate has already
-- been awaited by then; it is discarded rather than passed on.
takeWhile :: (i -> Bool) -> Pipe i i u m ()
takeWhile p = go
  where
    go = await >>= \case
      Nothing -> pure ()
      Just x
        | p x       -> yield x *> go
        | otherwise -> pure ()
{-# INLINE takeWhile #-}

-- | Only allow values satisfying a predicate to pass.
filter
    :: (i -> Bool)
    -> Pipe i i u m u
filter p = awaitForever $ \x -> when (p x) $ yield x
{-# INLINE filter #-}

-- | Map a function returning a container onto every incoming item, and
-- yield all of the outputs from that function.
concatMap
    :: Foldable t
    => (i -> t o)
    -> Pipe i o u m u
concatMap f = awaitForever (sourceList . f)
{-# INLINE concatMap #-}

-- | Take an input of containers and output each of their elements
-- successively.
concat :: Foldable t => Pipe (t i) i u m u
concat = awaitForever sourceList
{-# INLINE concat #-}

-- | Right-fold every input into an accumulated value.
--
-- Essentially this builds up a giant continuation that will be run all at
-- once on the final result.
foldr
    :: (a -> b -> b)            -- ^ combining function
    -> b                        -- ^ value used once the input is exhausted
    -> Pipe a o u m b
foldr f z = go
  where
    go = await >>= \case
      Nothing -> pure z
      Just x  -> f x <$> go
{-# INLINE foldr #-}

-- | Left-fold every input into an accumulated value.
--
-- Essentially this maintains a state and modifies that state with every
-- input, using the given accumulating function.  Both the accumulator and
-- each incoming item are forced as the fold proceeds.
foldl
    :: (b -> a -> b)            -- ^ accumulating function
    -> b                        -- ^ initial accumulator
    -> Pipe a o u m b
foldl f = go
  where
    go !z = await >>= \case
      Nothing -> pure z
      Just !x -> go (f z x)
{-# INLINE foldl #-}

-- | Fold every incoming item monoidally, and return the result once
-- finished.
--
-- Items are combined left to right, starting from 'mempty'.
fold :: Monoid a => Pipe a o u m a
fold = foldl (<>) mempty
{-# INLINE fold #-}

-- | Fold every incoming item according to a monoidal projection, and
-- return the result once finished.
--
-- This can be used to implement many useful consumers, like ones that find
-- the sum or the maximum item:
--
-- @
-- sum :: Num i => Pipe i o u m i
-- sum = getSum <$> foldMap Sum
--
-- maximum :: Ord i => Pipe i o u m (Maybe i)
-- maximum = fmap getMax <$> foldMap (Just . Max)
-- @
foldMap :: Monoid a => (i -> a) -> Pipe i o u m a
foldMap f = foldl (\x y -> x <> f y) mempty
{-# INLINE foldMap #-}

-- | Sink every incoming item into a list.
--
-- Note that this keeps the entire list in memory until it is all
-- eventually read, and does no "lazy IO".  It exhausts the whole stream
-- before returning anything.
sinkList :: Pipe i o u m [i]
sinkList = foldr (:) []
{-# INLINE sinkList #-}

-- | Ignore a certain number of items from the input stream, and then
-- finish.  Nothing is ever yielded downstream.
--
-- This is most useful if you sequence a second consumer after it:
--
-- >>> runPipePure $ sourceList [1..8]
--       .| (drop 3 >> sinkList)
-- [4,5,6,7,8]
drop :: Int -> Pipe i o u m ()
drop n = replicateM_ n await
{-# INLINE drop #-}

-- | Ignore items from an input stream as long as they match a predicate,
-- and then finish.  Nothing is ever yielded downstream.
--
-- Like for 'drop', is most useful if you sequence a second consumer after
-- it.
--
-- __Caveat:__ the first item that does /not/ match the predicate has
-- already been awaited when this pipe finishes, and it is discarded along
-- with the matching items rather than passed on.
dropWhile
    :: (i -> Bool)
    -> Pipe i o u m ()
dropWhile p = go
  where
    go = await >>= \case
      Nothing -> pure ()
      Just x
        | p x       -> go
        | otherwise -> pure ()
{-# INLINE dropWhile #-}

-- | Consume an entire input stream and ignore all of its outputs.
sinkNull :: Pipe i o u m ()
sinkNull = await >>= \case
    Nothing -> pure ()
    Just _  -> sinkNull

-- | Get the last item emitted by a stream, or 'Nothing' if the stream
-- ends without emitting anything.
--
-- To get the first item ("head"), use 'await' or 'awaitSurely'.
last :: Pipe i o u m (Maybe i)
last = go Nothing
  where
    -- @x@ is the most recent item seen so far, if any
    go x = await >>= \case
      Nothing -> pure x
      Just r  -> go (Just r)
{-# INLINE last #-}