{-| "Pipes.Group.Tutorial" is the correct introduction to the use of this module,
    which is mostly just an optimized @Pipes.Group@, replacing @FreeT@ with @Stream@. 
    The module also includes optimized functions for interoperation:

> fromStream :: Monad m => Stream (Of a) m r -> Producer' a m r
> toStream :: Monad m => Producer a m r -> Stream (Of a) m r
    .
    It is not a drop in replacement for @Pipes.Group@. The only systematic difference
    is that this simple module omits lenses. It is hoped that this will
    may make elementary usage easier to grasp. The lenses exported the pipes packages
    only come into their own with the simple @StateT@ parsing procedure pipes promotes. 
    We are not attempting here to replicate this advanced procedure, but only to make 
    elementary forms of breaking and splitting possible in the simplest possible way.
    .
    The @pipes-group@ tutorial 
    is framed as a hunt for a genuinely streaming
    @threeGroups@, which would collect the first three groups of matching items while
    never holding more than the present item in  memory. 
    The formulation it opts for in the end would 
    be expressed here thus:

> import Pipes
> import Streaming.Pipes 
> import qualified Pipes.Prelude as P
>
> threeGroups :: (Monad m, Eq a) => Producer a m () -> Producer a m ()
> threeGroups = concats . takes 3 . groups

   The program splits the initial producer into a connected stream of
   producers containing  "equal" values; it takes three of those; and then
   erases the effects of splitting. So for example

>>> runEffect $ threeGroups (each "aabccoooooo") >-> P.print
'a'
'a'
'b'
'c'
'c'
 
   The new user might look at the examples of splitting, breaking and joining
   in @Streaming.Prelude@ keeping in mind that @Producer a m r@ is equivalent
   to @Stream (Of a) m r@.
   .
   For the rest, only part of the tutorial that would need revision is 
   the bit at the end about writing explicit @FreeT@ programs. Here one does
   not proceed by pattern matching, but uses `inspect` in place of `runFreeT`

> inspect :: (Monad m, Functor f) => Stream f m r -> m (Either r (f (Stream f m r)))

   and for construction of a @Stream (Producer a m) m r@, the usual battery of combinators: 

> wrap   :: (Monad m, Functor f) => f (Stream f m r) -> Stream f m r
> effect :: (Monad m, Functor f) => m (Stream f m r) -> Stream f m r
> yields :: (Monad m, Functor f) => f r -> Stream f m r
> lift   :: (Monad m, Functor f) => m r -> Stream f m r 

   and so on. 

-}

{-#LANGUAGE RankNTypes, BangPatterns #-}

module Streaming.Pipes (
  -- * @Streaming@ \/ @Pipes@ interoperation
  fromStream,
  toStream,
  toStreamingByteString,
  fromStreamingByteString,
  
  -- * Splitting a 'Producer' into a connected stream of 'Producer's
  chunksOf,
  groups,
  groupsBy,
  groupsBy',
  split,
  breaks,
  
  -- * Rejoining a connected stream of 'Producer's
  concats, 
  intercalates,
  
  -- * Folding over the separate layers of a connected stream of 'Producer's
  folds,
  foldsM,
  
  -- * Transforming a connected stream of 'Producer's
  takes,
  takes',
  maps,
  
  -- * Streaming division of a 'Producer' into two
  span,
  break,
  splitAt,
  group,
  groupBy,
  

  ) where

import Pipes
import Streaming hiding (concats, groups, chunksOf)
import qualified Streaming.Internal as SI
import qualified Pipes.Internal as PI
import qualified Pipes.Prelude as P
import qualified Pipes as P


import qualified Streaming.Prelude as S
import Control.Monad (liftM)
import Prelude hiding (span, splitAt, break)
import qualified Data.ByteString as B
import qualified Data.ByteString.Streaming as Q
import qualified Data.ByteString.Streaming.Internal as Q


{- | Construct an ordinary pipes 'Producer' from a 'Stream' of elements

>>> runEffect $ fromStream (S.each  [1..3]) >-> P.print
1
2
3

-}
fromStream :: Monad m => Stream (Of a) m r -> Producer' a m r
fromStream :: forall (m :: * -> *) a r.
Monad m =>
Stream (Of a) m r -> Producer' a m r
fromStream = forall {m :: * -> *} {b} {r} {a'} {a} {b'}.
Monad m =>
Stream (Of b) m r -> Proxy a' a b' b m r
loop where
  loop :: Stream (Of b) m r -> Proxy a' a b' b m r
loop Stream (Of b) m r
stream = case Stream (Of b) m r
stream of -- this should be rewritten without constructors
    SI.Return r
r -> forall a' a b' b (m :: * -> *) r. r -> Proxy a' a b' b m r
PI.Pure r
r
    SI.Effect m (Stream (Of b) m r)
m  -> forall a' a b' b (m :: * -> *) r.
m (Proxy a' a b' b m r) -> Proxy a' a b' b m r
PI.M (forall (m :: * -> *) a1 r. Monad m => (a1 -> r) -> m a1 -> m r
liftM Stream (Of b) m r -> Proxy a' a b' b m r
loop m (Stream (Of b) m r)
m)
    SI.Step (b
a:>Stream (Of b) m r
rest) -> forall a' a b' b (m :: * -> *) r.
b -> (b' -> Proxy a' a b' b m r) -> Proxy a' a b' b m r
PI.Respond b
a  (\b'
_ -> Stream (Of b) m r -> Proxy a' a b' b m r
loop Stream (Of b) m r
rest)
{-# INLINABLE fromStream #-}

{- | Construct a 'Stream' of elements from a @pipes@ 'Producer'

>>> S.print $ toStream $ P.each [1..3]
1
2
3

-}
toStream :: Monad m => Producer a m r -> Stream (Of a) m r
toStream :: forall (m :: * -> *) a r.
Monad m =>
Producer a m r -> Stream (Of a) m r
toStream = forall {m :: * -> *} {a} {a} {r}.
Monad m =>
Proxy X a () a m r -> Stream (Of a) m r
loop where
  loop :: Proxy X a () a m r -> Stream (Of a) m r
loop Proxy X a () a m r
stream = case Proxy X a () a m r
stream of
    PI.Pure r
r -> forall (f :: * -> *) (m :: * -> *) r. r -> Stream f m r
SI.Return r
r 
    PI.M m (Proxy X a () a m r)
m -> forall (f :: * -> *) (m :: * -> *) r.
m (Stream f m r) -> Stream f m r
SI.Effect (forall (m :: * -> *) a1 r. Monad m => (a1 -> r) -> m a1 -> m r
liftM Proxy X a () a m r -> Stream (Of a) m r
loop m (Proxy X a () a m r)
m)
    PI.Respond a
a () -> Proxy X a () a m r
f -> forall (f :: * -> *) (m :: * -> *) r.
f (Stream f m r) -> Stream f m r
SI.Step (a
a forall a b. a -> b -> Of a b
:> Proxy X a () a m r -> Stream (Of a) m r
loop (() -> Proxy X a () a m r
f ()))
    PI.Request X
x a -> Proxy X a () a m r
g -> forall a. X -> a
PI.closed X
x
{-# INLINABLE toStream #-}

{-| Link the chunks of a producer of bytestrings into a single byte stream
-}
toStreamingByteString :: Monad m => Producer B.ByteString m r -> Q.ByteString m r
toStreamingByteString :: forall (m :: * -> *) r.
Monad m =>
Producer ByteString m r -> ByteString m r
toStreamingByteString = forall {m :: * -> *} {a} {r}.
Monad m =>
Proxy X a () ByteString m r -> ByteStream m r
loop where
  loop :: Proxy X a () ByteString m r -> ByteStream m r
loop Proxy X a () ByteString m r
stream = case Proxy X a () ByteString m r
stream of
    PI.Pure r
r -> forall (m :: * -> *) r. r -> ByteStream m r
Q.Empty r
r 
    PI.M m (Proxy X a () ByteString m r)
m    -> forall (m :: * -> *) r. m (ByteStream m r) -> ByteStream m r
Q.Go (forall (m :: * -> *) a1 r. Monad m => (a1 -> r) -> m a1 -> m r
liftM Proxy X a () ByteString m r -> ByteStream m r
loop m (Proxy X a () ByteString m r)
m)
    PI.Respond ByteString
a () -> Proxy X a () ByteString m r
f -> forall (m :: * -> *) r.
ByteString -> ByteStream m r -> ByteStream m r
Q.Chunk ByteString
a (Proxy X a () ByteString m r -> ByteStream m r
loop (() -> Proxy X a () ByteString m r
f ()))
    PI.Request X
x a -> Proxy X a () ByteString m r
g -> forall a. X -> a
PI.closed X
x
{-# INLINABLE toStreamingByteString #-}


{-| Successively yield the chunks hidden in a byte stream. 
-}
fromStreamingByteString :: Monad m => Q.ByteString m r -> Producer' B.ByteString m r
fromStreamingByteString :: forall (m :: * -> *) r.
Monad m =>
ByteString m r -> Producer' ByteString m r
fromStreamingByteString = forall {m :: * -> *} {r} {a'} {a} {b'}.
Monad m =>
ByteStream m r -> Proxy a' a b' ByteString m r
loop where
  loop :: ByteStream m r -> Proxy a' a b' ByteString m r
loop ByteStream m r
stream = case ByteStream m r
stream of -- this should be rewritten without constructors
    Q.Empty r
r      -> forall a' a b' b (m :: * -> *) r. r -> Proxy a' a b' b m r
PI.Pure r
r
    Q.Go m (ByteStream m r)
m         -> forall a' a b' b (m :: * -> *) r.
m (Proxy a' a b' b m r) -> Proxy a' a b' b m r
PI.M (forall (m :: * -> *) a1 r. Monad m => (a1 -> r) -> m a1 -> m r
liftM ByteStream m r -> Proxy a' a b' ByteString m r
loop m (ByteStream m r)
m)
    Q.Chunk ByteString
a ByteStream m r
rest -> forall a' a b' b (m :: * -> *) r.
b -> (b' -> Proxy a' a b' b m r) -> Proxy a' a b' b m r
PI.Respond ByteString
a  (\b'
_ -> ByteStream m r -> Proxy a' a b' ByteString m r
loop ByteStream m r
rest)
{-# INLINABLE fromStreamingByteString #-}



{-| 'span' splits a 'Producer' into two 'Producer's; the outer 'Producer' 
    is the longest consecutive group of elements that satisfy the predicate.
    Its inverse is 'Control.Monad.join'
-}
span :: Monad m => (a -> Bool) -> Producer a m r -> Producer a m (Producer a m r)
span :: forall (m :: * -> *) a r.
Monad m =>
(a -> Bool) -> Producer a m r -> Producer a m (Producer a m r)
span a -> Bool
predicate = forall {m :: * -> *} {a} {x'} {x}.
Monad m =>
Producer a m a -> Proxy x' x () a m (Producer a m a)
loop where
  loop :: Producer a m a -> Proxy x' x () a m (Producer a m a)
loop Producer a m a
p = do
    Either a (a, Producer a m a)
e <- forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(MonadTrans t, Monad m) =>
m a -> t m a
lift (forall (m :: * -> *) a r.
Monad m =>
Producer a m r -> m (Either r (a, Producer a m r))
next Producer a m a
p)
    case Either a (a, Producer a m a)
e of
      Left   a
r      -> forall (m :: * -> *) a. Monad m => a -> m a
return (forall (m :: * -> *) a. Monad m => a -> m a
return a
r)
      Right (a
a, Producer a m a
p') ->
          if a -> Bool
predicate a
a
          then forall (m :: * -> *) a x' x. Functor m => a -> Proxy x' x () a m ()
yield a
a forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> Producer a m a -> Proxy x' x () a m (Producer a m a)
loop Producer a m a
p'
          else forall (m :: * -> *) a. Monad m => a -> m a
return (forall (m :: * -> *) a x' x. Functor m => a -> Proxy x' x () a m ()
yield a
a forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> Producer a m a
p')
{-# INLINABLE span #-}



{-| 'break' splits a 'Producer' into two 'Producer's; the outer 'Producer' 
    is the longest consecutive group of elements that fail the predicate.
    Its inverse is 'Control.Monad.join'
-}
break :: Monad m => (a -> Bool) -> Producer a m r -> Producer a m (Producer a m r)
break :: forall (m :: * -> *) a r.
Monad m =>
(a -> Bool) -> Producer a m r -> Producer a m (Producer a m r)
break a -> Bool
predicate = forall (m :: * -> *) a r.
Monad m =>
(a -> Bool) -> Producer a m r -> Producer a m (Producer a m r)
span (Bool -> Bool
not forall b c a. (b -> c) -> (a -> b) -> a -> c
. a -> Bool
predicate)
{-#INLINE break #-}

split :: (Eq a, Monad m) =>
      a -> Producer a m r -> Stream (Producer a m) m r
split :: forall a (m :: * -> *) r.
(Eq a, Monad m) =>
a -> Producer a m r -> Stream (Producer a m) m r
split a
t  = forall {m :: * -> *} {r}.
Monad m =>
Producer a m r -> Stream (Proxy X () () a m) m r
loop  where
  loop :: Producer a m r -> Stream (Proxy X () () a m) m r
loop Producer a m r
stream = do
    Either r (a, Producer a m r)
e <- forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(MonadTrans t, Monad m) =>
m a -> t m a
lift forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) a r.
Monad m =>
Producer a m r -> m (Either r (a, Producer a m r))
next Producer a m r
stream
    case Either r (a, Producer a m r)
e of
      Left   r
r      ->  forall (f :: * -> *) (m :: * -> *) r. r -> Stream f m r
SI.Return r
r
      Right (a
a, Producer a m r
p') -> 
        if a
a forall a. Eq a => a -> a -> Bool
/= a
t
          then forall (f :: * -> *) (m :: * -> *) r.
f (Stream f m r) -> Stream f m r
SI.Step forall a b. (a -> b) -> a -> b
$ forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap Producer a m r -> Stream (Proxy X () () a m) m r
loop (forall (m :: * -> *) a x' x. Functor m => a -> Proxy x' x () a m ()
yield a
a forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> forall (m :: * -> *) a r.
Monad m =>
(a -> Bool) -> Producer a m r -> Producer a m (Producer a m r)
span (forall a. Eq a => a -> a -> Bool
/= a
t) Producer a m r
p')
          else Producer a m r -> Stream (Proxy X () () a m) m r
loop Producer a m r
p'
{-#INLINABLE split #-}

breaks :: (Eq a, Monad m) =>
      (a -> Bool) -> Producer a m r -> Stream (Producer a m) m r
breaks :: forall a (m :: * -> *) r.
(Eq a, Monad m) =>
(a -> Bool) -> Producer a m r -> Stream (Producer a m) m r
breaks a -> Bool
predicate  = forall {m :: * -> *} {r}.
Monad m =>
Producer a m r -> Stream (Proxy X () () a m) m r
loop  where
  loop :: Producer a m r -> Stream (Proxy X () () a m) m r
loop Producer a m r
stream = do
    Either r (a, Producer a m r)
e <- forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(MonadTrans t, Monad m) =>
m a -> t m a
lift forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) a r.
Monad m =>
Producer a m r -> m (Either r (a, Producer a m r))
next Producer a m r
stream
    case Either r (a, Producer a m r)
e of
      Left   r
r      ->  forall (f :: * -> *) (m :: * -> *) r. r -> Stream f m r
SI.Return r
r
      Right (a
a, Producer a m r
p') -> 
        if Bool -> Bool
not (a -> Bool
predicate a
a)
          then forall (f :: * -> *) (m :: * -> *) r.
f (Stream f m r) -> Stream f m r
SI.Step forall a b. (a -> b) -> a -> b
$ forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap Producer a m r -> Stream (Proxy X () () a m) m r
loop (forall (m :: * -> *) a x' x. Functor m => a -> Proxy x' x () a m ()
yield a
a forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> forall (m :: * -> *) a r.
Monad m =>
(a -> Bool) -> Producer a m r -> Producer a m (Producer a m r)
break (a -> Bool
predicate) Producer a m r
p')
          else Producer a m r -> Stream (Proxy X () () a m) m r
loop Producer a m r
p'
{-#INLINABLE breaks #-}

{-| 'splitAt' divides a 'Producer' into two 'Producer's 
    after a fixed number of elements. Its inverse is 'Control.Monad.join'

-}
splitAt
    :: Monad m
    => Int -> Producer a m r -> Producer a m (Producer a m r)
splitAt :: forall (m :: * -> *) a r.
Monad m =>
Int -> Producer a m r -> Producer a m (Producer a m r)
splitAt = forall {t} {m :: * -> *} {a} {r} {x'} {x}.
(Ord t, Num t, Monad m) =>
t -> Producer a m r -> Proxy x' x () a m (Producer a m r)
loop where 
  loop :: t -> Producer a m r -> Proxy x' x () a m (Producer a m r)
loop t
n Producer a m r
p | t
n forall a. Ord a => a -> a -> Bool
<= t
0 = forall (m :: * -> *) a. Monad m => a -> m a
return Producer a m r
p
  loop t
n Producer a m r
p = do
    Either r (a, Producer a m r)
e <- forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(MonadTrans t, Monad m) =>
m a -> t m a
lift (forall (m :: * -> *) a r.
Monad m =>
Producer a m r -> m (Either r (a, Producer a m r))
next Producer a m r
p)
    case Either r (a, Producer a m r)
e of
      Left   r
r      -> forall (m :: * -> *) a. Monad m => a -> m a
return (forall (m :: * -> *) a. Monad m => a -> m a
return r
r)
      Right (a
a, Producer a m r
p') -> forall (m :: * -> *) a x' x. Functor m => a -> Proxy x' x () a m ()
yield a
a forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> t -> Producer a m r -> Proxy x' x () a m (Producer a m r)
loop (t
n forall a. Num a => a -> a -> a
- t
1) Producer a m r
p'
{-# INLINABLE splitAt #-}

{-| 'groupBy' splits a 'Producer' into two 'Producer's; the second
     producer begins where we meet an element that is different
     according to the equality predicate. Its inverse is 'Control.Monad.join'
-}
groupBy
    :: Monad m
    => (a -> a -> Bool) -> Producer a m r -> Producer a m (Producer a m r)
groupBy :: forall (m :: * -> *) a r.
Monad m =>
(a -> a -> Bool) -> Producer a m r -> Producer a m (Producer a m r)
groupBy a -> a -> Bool
equals = forall {m :: * -> *} {a}.
Monad m =>
Producer a m a -> Proxy X () () a m (Producer a m a)
loop where
  loop :: Producer a m a -> Proxy X () () a m (Producer a m a)
loop Producer a m a
p = do
    Either a (a, Producer a m a)
x <- forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(MonadTrans t, Monad m) =>
m a -> t m a
lift (forall (m :: * -> *) a r.
Monad m =>
Producer a m r -> m (Either r (a, Producer a m r))
next Producer a m a
p)
    case Either a (a, Producer a m a)
x of
      Left   a
r      -> forall (m :: * -> *) a. Monad m => a -> m a
return (forall (m :: * -> *) a. Monad m => a -> m a
return a
r)
      Right (a
a, Producer a m a
p') -> forall (m :: * -> *) a r.
Monad m =>
(a -> Bool) -> Producer a m r -> Producer a m (Producer a m r)
span (a -> a -> Bool
equals a
a) (forall (m :: * -> *) a x' x. Functor m => a -> Proxy x' x () a m ()
yield a
a forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> Producer a m a
p') 
{-# INLINABLE groupBy #-}

-- | Like 'groupBy', where the equality predicate is ('==')
group
    :: (Monad m, Eq a) => Producer a m r -> Producer a m (Producer a m r)
group :: forall (m :: * -> *) a r.
(Monad m, Eq a) =>
Producer a m r -> Producer a m (Producer a m r)
group = forall (m :: * -> *) a r.
Monad m =>
(a -> a -> Bool) -> Producer a m r -> Producer a m (Producer a m r)
groupBy forall a. Eq a => a -> a -> Bool
(==)
{-# INLINABLE group #-}

{-| 'groupsBy' splits a 'Producer' into a 'Stream' of 'Producer's of equal items.
    Its inverse is 'concats'
-}
groupsBy
    :: Monad m
    => (a -> a -> Bool)
    -> Producer a m r -> Stream (Producer a m) m r 
groupsBy :: forall (m :: * -> *) a r.
Monad m =>
(a -> a -> Bool) -> Producer a m r -> Stream (Producer a m) m r
groupsBy a -> a -> Bool
equals = forall {m :: * -> *} {r}.
Monad m =>
Producer a m r -> Stream (Proxy X () () a m) m r
loop where
  loop :: Producer a m r -> Stream (Proxy X () () a m) m r
loop Producer a m r
p = forall (f :: * -> *) (m :: * -> *) r.
m (Stream f m r) -> Stream f m r
SI.Effect forall a b. (a -> b) -> a -> b
$ do
    Either r (a, Producer a m r)
e <- forall (m :: * -> *) a r.
Monad m =>
Producer a m r -> m (Either r (a, Producer a m r))
next Producer a m r
p
    forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ case Either r (a, Producer a m r)
e of
      Left   r
r      -> forall (f :: * -> *) (m :: * -> *) r. r -> Stream f m r
SI.Return r
r
      Right (a
a, Producer a m r
p') -> forall (f :: * -> *) (m :: * -> *) r.
f (Stream f m r) -> Stream f m r
SI.Step (forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap Producer a m r -> Stream (Proxy X () () a m) m r
loop (forall (m :: * -> *) a x' x. Functor m => a -> Proxy x' x () a m ()
yield a
a forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> forall (m :: * -> *) a r.
Monad m =>
(a -> Bool) -> Producer a m r -> Producer a m (Producer a m r)
span (a -> a -> Bool
equals a
a) Producer a m r
p'))
{-# INLINABLE groupsBy #-}

{-| @groupsBy'@ splits a 'Producer' into a 'Stream' of 'Producer's grouped using
    the given relation. Its inverse is 'concats'

    This differs from 'groupsBy' by comparing successive elements
    instead of comparing each element to the first member of the group

>>> import Pipes (yield, each)
>>> import Pipes.Prelude (toList)
>>> let rel c1 c2 = succ c1 == c2
>>> (toList . intercalates (yield '|') . groupsBy' rel) (each "12233345")
"12|23|3|345"
>>> (toList . intercalates (yield '|') . groupsBy  rel) (each "12233345")
"122|3|3|34|5"
-}
groupsBy'
    :: Monad m
    => (a -> a -> Bool) -> Producer a m r -> Stream (Producer a m) m r
groupsBy' :: forall (m :: * -> *) a r.
Monad m =>
(a -> a -> Bool) -> Producer a m r -> Stream (Producer a m) m r
groupsBy' a -> a -> Bool
equals = forall {m :: * -> *} {a} {x'} {x}.
Monad m =>
Proxy X () () a m a -> Stream (Proxy x' x () a m) m a
loop where
  loop :: Proxy X () () a m a -> Stream (Proxy x' x () a m) m a
loop Proxy X () () a m a
p = forall (f :: * -> *) (m :: * -> *) r.
m (Stream f m r) -> Stream f m r
SI.Effect forall a b. (a -> b) -> a -> b
$ do
    Either a (a, Proxy X () () a m a)
e <- forall (m :: * -> *) a r.
Monad m =>
Producer a m r -> m (Either r (a, Producer a m r))
next Proxy X () () a m a
p
    forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ case Either a (a, Proxy X () () a m a)
e of
      Left   a
r      -> forall (f :: * -> *) (m :: * -> *) r. r -> Stream f m r
SI.Return a
r
      Right (a
a, Proxy X () () a m a
p') -> forall (f :: * -> *) (m :: * -> *) r.
f (Stream f m r) -> Stream f m r
SI.Step (forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap Proxy X () () a m a -> Stream (Proxy x' x () a m) m a
loop (forall {m :: * -> *} {a} {x'} {x}.
Monad m =>
Producer a m a -> Proxy x' x () a m (Producer a m a)
loop0 (forall (m :: * -> *) a x' x. Functor m => a -> Proxy x' x () a m ()
yield a
a forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> Proxy X () () a m a
p')))
  loop0 :: Producer a m a -> Proxy x' x () a m (Producer a m a)
loop0 Producer a m a
p1 = do
    Either a (a, Producer a m a)
e <- forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(MonadTrans t, Monad m) =>
m a -> t m a
lift (forall (m :: * -> *) a r.
Monad m =>
Producer a m r -> m (Either r (a, Producer a m r))
next Producer a m a
p1)
    case Either a (a, Producer a m a)
e of
        Left   a
r      -> forall (m :: * -> *) a. Monad m => a -> m a
return (forall (m :: * -> *) a. Monad m => a -> m a
return a
r)
        Right (a
a2, Producer a m a
p2) -> do
            forall (m :: * -> *) a x' x. Functor m => a -> Proxy x' x () a m ()
yield a
a2
            let loop1 :: a -> Producer a m a -> Proxy x' x () a m (Producer a m a)
loop1 a
a Producer a m a
p = do
                    Either a (a, Producer a m a)
e' <- forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(MonadTrans t, Monad m) =>
m a -> t m a
lift (forall (m :: * -> *) a r.
Monad m =>
Producer a m r -> m (Either r (a, Producer a m r))
next Producer a m a
p)
                    case Either a (a, Producer a m a)
e' of
                        Left   a
r      -> forall (m :: * -> *) a. Monad m => a -> m a
return (forall (m :: * -> *) a. Monad m => a -> m a
return a
r)
                        Right (a
a', Producer a m a
p') ->
                            if a -> a -> Bool
equals a
a a
a'
                            then do
                                forall (m :: * -> *) a x' x. Functor m => a -> Proxy x' x () a m ()
yield a
a'
                                a -> Producer a m a -> Proxy x' x () a m (Producer a m a)
loop1 a
a' Producer a m a
p'
                            else forall (m :: * -> *) a. Monad m => a -> m a
return (forall (m :: * -> *) a x' x. Functor m => a -> Proxy x' x () a m ()
yield a
a' forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> Producer a m a
p')
            forall {m :: * -> *} {a} {x'} {x}.
Monad m =>
a -> Producer a m a -> Proxy x' x () a m (Producer a m a)
loop1 a
a2 Producer a m a
p2
{-# INLINABLE groupsBy' #-}

groups:: (Monad m, Eq a)
    =>  Producer a m r -> Stream (Producer a m) m r
groups :: forall (m :: * -> *) a r.
(Monad m, Eq a) =>
Producer a m r -> Stream (Producer a m) m r
groups = forall (m :: * -> *) a r.
Monad m =>
(a -> a -> Bool) -> Producer a m r -> Stream (Producer a m) m r
groupsBy forall a. Eq a => a -> a -> Bool
(==)


{-| 'chunksOf' splits a 'Producer' into a 'Stream' of 'Producer's of a given length.
    Its inverse is 'concats'.

>>> let listN n = L.purely P.folds L.list . P.chunksOf n
>>> runEffect $ listN 3 P.stdinLn >-> P.take 2 >-> P.map unwords >-> P.print
1<Enter>
2<Enter>
3<Enter>
"1 2 3"
4<Enter>
5<Enter>
6<Enter>
"4 5 6"
>>> let stylish = P.concats . P.maps (<* P.yield "-*-") . P.chunksOf 2
>>> runEffect $ stylish (P.each $ words "one two three four five six") >-> P.stdoutLn
one
two
-*-
three
four
-*-
five
six
-*-

-}

chunksOf
    :: Monad m => Int -> Producer a m r -> Stream (Producer a m) m r
chunksOf :: forall (m :: * -> *) a r.
Monad m =>
Int -> Producer a m r -> Stream (Producer a m) m r
chunksOf Int
n = forall {m :: * -> *} {a} {r}.
Monad m =>
Producer a m r -> Stream (Proxy X () () a m) m r
loop where
  loop :: Producer a m r -> Stream (Proxy X () () a m) m r
loop Producer a m r
p = forall (f :: * -> *) (m :: * -> *) r.
m (Stream f m r) -> Stream f m r
SI.Effect forall a b. (a -> b) -> a -> b
$ do
    Either r (a, Producer a m r)
e <- forall (m :: * -> *) a r.
Monad m =>
Producer a m r -> m (Either r (a, Producer a m r))
next Producer a m r
p
    forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ case Either r (a, Producer a m r)
e of
      Left   r
r      -> forall (f :: * -> *) (m :: * -> *) r. r -> Stream f m r
SI.Return r
r
      Right (a
a, Producer a m r
p') -> forall (f :: * -> *) (m :: * -> *) r.
f (Stream f m r) -> Stream f m r
SI.Step (forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap Producer a m r -> Stream (Proxy X () () a m) m r
loop (forall (m :: * -> *) a r.
Monad m =>
Int -> Producer a m r -> Producer a m (Producer a m r)
splitAt Int
n (forall (m :: * -> *) a x' x. Functor m => a -> Proxy x' x () a m ()
yield a
a forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> Producer a m r
p')))
{-# INLINABLE chunksOf #-}

-- | Join a stream of 'Producer's into a single 'Producer'
concats :: Monad m => Stream (Producer a m) m r -> Producer a m r
concats :: forall (m :: * -> *) a r.
Monad m =>
Stream (Producer a m) m r -> Producer a m r
concats = forall {m :: * -> *} {a'} {a} {b'} {b} {a}.
Monad m =>
Stream (Proxy a' a b' b m) m a -> Proxy a' a b' b m a
loop where
  loop :: Stream (Proxy a' a b' b m) m a -> Proxy a' a b' b m a
loop Stream (Proxy a' a b' b m) m a
stream = case Stream (Proxy a' a b' b m) m a
stream of
    SI.Return a
r -> forall (m :: * -> *) a. Monad m => a -> m a
return a
r
    SI.Effect m (Stream (Proxy a' a b' b m) m a)
m -> forall a' a b' b (m :: * -> *) r.
m (Proxy a' a b' b m r) -> Proxy a' a b' b m r
PI.M forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) a1 r. Monad m => (a1 -> r) -> m a1 -> m r
liftM Stream (Proxy a' a b' b m) m a -> Proxy a' a b' b m a
loop m (Stream (Proxy a' a b' b m) m a)
m
    SI.Step Proxy a' a b' b m (Stream (Proxy a' a b' b m) m a)
p -> do 
      Stream (Proxy a' a b' b m) m a
rest <- Proxy a' a b' b m (Stream (Proxy a' a b' b m) m a)
p
      Stream (Proxy a' a b' b m) m a -> Proxy a' a b' b m a
loop Stream (Proxy a' a b' b m) m a
rest 
{-# INLINABLE concats #-}

-- {-| Join a 'FreeT'-delimited stream of 'Producer's into a single 'Producer' by
--     intercalating a 'Producer' in between them
-- -}
-- intercalates
--     :: Monad m => Producer a m () -> Stream (Producer a m) m r -> Producer a m r
-- intercalates sep = loop where
--   loop stream = case stream of
--
--       x <- lift (runFreeT f)
--       case x of
--           Pure r -> return r
--           Free p -> do
--               f' <- p
--               go1 f'
--   go1 f = do
--       x <- lift (runFreeT f)
--       case x of
--           Pure r -> return r
--           Free p -> do
--               sep
--               f' <- p
--               go1 f'
-- {-# INLINABLE intercalates #-}

{- $folds
    These folds are designed to be compatible with the @foldl@ library.  See
    the 'Control.Foldl.purely' and 'Control.Foldl.impurely' functions from that
    library for more details.

    For example, to count the number of 'Producer' layers in a 'Stream', you can
    write:

> import Control.Applicative (pure)
> import qualified Control.Foldl as L
> import Pipes.Group
> import qualified Pipes.Prelude as P
>
> count :: Monad m => Stream (Producer a m) m () -> m Int
> count = P.sum . L.purely folds (pure 1)
-}

{-| Fold each 'Producer' in a producer 'Stream'

> purely folds
>     :: Monad m => Fold a b -> Stream (Producer a m) m r -> Producer b m r
-}

folds
    :: Monad m
    => (x -> a -> x)
    -- ^ Step function
    -> x
    -- ^ Initial accumulator
    -> (x -> b)
    -- ^ Extraction function
    -> Stream (Producer a m) m r
    -- ^
    -> Producer b m r
folds :: forall (m :: * -> *) x a b r.
Monad m =>
(x -> a -> x)
-> x -> (x -> b) -> Stream (Producer a m) m r -> Producer b m r
folds x -> a -> x
step x
begin x -> b
done = forall {m :: * -> *} {a} {a'} {a}.
Monad m =>
Stream (Proxy X () () a m) m a -> Proxy a' a () b m a
loop where
  loop :: Stream (Proxy X () () a m) m a -> Proxy a' a () b m a
loop Stream (Proxy X () () a m) m a
stream = case Stream (Proxy X () () a m) m a
stream of 
    SI.Return a
r -> forall (m :: * -> *) a. Monad m => a -> m a
return a
r
    SI.Effect m (Stream (Proxy X () () a m) m a)
m  -> forall a' a b' b (m :: * -> *) r.
m (Proxy a' a b' b m r) -> Proxy a' a b' b m r
PI.M forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) a1 r. Monad m => (a1 -> r) -> m a1 -> m r
liftM Stream (Proxy X () () a m) m a -> Proxy a' a () b m a
loop m (Stream (Proxy X () () a m) m a)
m
    SI.Step Proxy X () () a m (Stream (Proxy X () () a m) m a)
p   -> do
        (Stream (Proxy X () () a m) m a
stream', b
b) <- forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(MonadTrans t, Monad m) =>
m a -> t m a
lift (forall {m :: * -> *} {a}.
Monad m =>
Producer a m a -> x -> m (a, b)
fold Proxy X () () a m (Stream (Proxy X () () a m) m a)
p x
begin)
        forall (m :: * -> *) a x' x. Functor m => a -> Proxy x' x () a m ()
yield b
b
        Stream (Proxy X () () a m) m a -> Proxy a' a () b m a
loop Stream (Proxy X () () a m) m a
stream'
  fold :: Producer a m a -> x -> m (a, b)
fold Producer a m a
p x
x = do
      Either a (a, Producer a m a)
y <- forall (m :: * -> *) a r.
Monad m =>
Producer a m r -> m (Either r (a, Producer a m r))
next Producer a m a
p
      case Either a (a, Producer a m a)
y of
          Left   a
f      -> forall (m :: * -> *) a. Monad m => a -> m a
return (a
f, x -> b
done x
x)
          Right (a
a, Producer a m a
p') -> Producer a m a -> x -> m (a, b)
fold Producer a m a
p' forall a b. (a -> b) -> a -> b
$! x -> a -> x
step x
x a
a
{-# INLINABLE folds #-}



{-| Fold each 'Producer' in a 'Producer' stream, monadically

> impurely foldsM
>     :: Monad m => FoldM a b -> Stream (Producer a m) m r -> Producer b m r
-}
foldsM
    :: Monad m
    => (x -> a -> m x)
    -- ^ Step function
    -> m x
    -- ^ Initial accumulator
    -> (x -> m b)
    -- ^ Extraction function
    -> Stream (Producer a m) m r
    -- ^
    -> Producer b m r
foldsM :: forall (m :: * -> *) x a b r.
Monad m =>
(x -> a -> m x)
-> m x -> (x -> m b) -> Stream (Producer a m) m r -> Producer b m r
foldsM x -> a -> m x
step m x
begin x -> m b
done = forall {a} {a'} {a}.
Stream (Proxy X () () a m) m a -> Proxy a' a () b m a
loop where
  loop :: Stream (Proxy X () () a m) m a -> Proxy a' a () b m a
loop Stream (Proxy X () () a m) m a
stream = case Stream (Proxy X () () a m) m a
stream of 
    SI.Return a
r -> forall (m :: * -> *) a. Monad m => a -> m a
return a
r
    SI.Effect m (Stream (Proxy X () () a m) m a)
m -> forall a' a b' b (m :: * -> *) r.
m (Proxy a' a b' b m r) -> Proxy a' a b' b m r
PI.M (forall (m :: * -> *) a1 r. Monad m => (a1 -> r) -> m a1 -> m r
liftM Stream (Proxy X () () a m) m a -> Proxy a' a () b m a
loop m (Stream (Proxy X () () a m) m a)
m)
    SI.Step Proxy X () () a m (Stream (Proxy X () () a m) m a)
p -> do
      (Stream (Proxy X () () a m) m a
f', b
b) <- forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(MonadTrans t, Monad m) =>
m a -> t m a
lift forall a b. (a -> b) -> a -> b
$ m x
begin forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>=  forall {a}. Producer a m a -> x -> m (a, b)
foldM Proxy X () () a m (Stream (Proxy X () () a m) m a)
p 
      forall (m :: * -> *) a x' x. Functor m => a -> Proxy x' x () a m ()
yield b
b
      Stream (Proxy X () () a m) m a -> Proxy a' a () b m a
loop Stream (Proxy X () () a m) m a
f'

  foldM :: Producer a m a -> x -> m (a, b)
foldM Producer a m a
p x
x = do
    Either a (a, Producer a m a)
y <- forall (m :: * -> *) a r.
Monad m =>
Producer a m r -> m (Either r (a, Producer a m r))
next Producer a m a
p
    case Either a (a, Producer a m a)
y of
      Left   a
f      -> do
          b
b <- x -> m b
done x
x
          forall (m :: * -> *) a. Monad m => a -> m a
return (a
f, b
b)
      Right (a
a, Producer a m a
p') -> do
          x
x' <- x -> a -> m x
step x
x a
a
          Producer a m a -> x -> m (a, b)
foldM Producer a m a
p' forall a b. (a -> b) -> a -> b
$! x
x'
{-# INLINABLE foldsM #-}

{-| @(takes' n)@ only keeps the first @n@ 'Producer's of a linked 'Stream' of 'Producers'

    Unlike 'takes', 'takes'' is not functor-general - it is aware that a 'Producer'
    can be /drained/, as functors cannot generally be. Here, then, we drain 
    the unused 'Producer's in order to preserve the return value.  
    This makes it a suitable argument for 'maps'.
-}
takes' :: Monad m => Int -> Stream (Producer a m) m r -> Stream (Producer a m) m r
takes' :: forall (m :: * -> *) a r.
Monad m =>
Int -> Stream (Producer a m) m r -> Stream (Producer a m) m r
takes' = forall {t} {m :: * -> *} {b} {r}.
(Ord t, Num t, Monad m) =>
t
-> Stream (Proxy X () () b m) m r -> Stream (Proxy X () () b m) m r
loop where
  
  loop :: t
-> Stream (Proxy X () () b m) m r -> Stream (Proxy X () () b m) m r
loop !t
n Stream (Proxy X () () b m) m r
stream | t
n forall a. Ord a => a -> a -> Bool
<= t
0 = forall {m :: * -> *} {b} {r} {f :: * -> *}.
Monad m =>
Stream (Proxy X () () b m) m r -> Stream f m r
drain_loop Stream (Proxy X () () b m) m r
stream
  loop t
n Stream (Proxy X () () b m) m r
stream = case Stream (Proxy X () () b m) m r
stream of
    SI.Return r
r -> forall (f :: * -> *) (m :: * -> *) r. r -> Stream f m r
SI.Return r
r
    SI.Effect  m (Stream (Proxy X () () b m) m r)
m -> forall (f :: * -> *) (m :: * -> *) r.
m (Stream f m r) -> Stream f m r
SI.Effect (forall (m :: * -> *) a1 r. Monad m => (a1 -> r) -> m a1 -> m r
liftM (t
-> Stream (Proxy X () () b m) m r -> Stream (Proxy X () () b m) m r
loop t
n) m (Stream (Proxy X () () b m) m r)
m)
    SI.Step Proxy X () () b m (Stream (Proxy X () () b m) m r)
p   -> forall (f :: * -> *) (m :: * -> *) r.
f (Stream f m r) -> Stream f m r
SI.Step  (forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap (t
-> Stream (Proxy X () () b m) m r -> Stream (Proxy X () () b m) m r
loop (t
n forall a. Num a => a -> a -> a
- t
1)) Proxy X () () b m (Stream (Proxy X () () b m) m r)
p)

  drain_loop :: Stream (Proxy X () () b m) m r -> Stream f m r
drain_loop Stream (Proxy X () () b m) m r
stream = case Stream (Proxy X () () b m) m r
stream of
    SI.Return r
r -> forall (f :: * -> *) (m :: * -> *) r. r -> Stream f m r
SI.Return r
r
    SI.Effect  m (Stream (Proxy X () () b m) m r)
m -> forall (f :: * -> *) (m :: * -> *) r.
m (Stream f m r) -> Stream f m r
SI.Effect (forall (m :: * -> *) a1 r. Monad m => (a1 -> r) -> m a1 -> m r
liftM Stream (Proxy X () () b m) m r -> Stream f m r
drain_loop m (Stream (Proxy X () () b m) m r)
m)
    SI.Step Proxy X () () b m (Stream (Proxy X () () b m) m r)
p   -> forall (f :: * -> *) (m :: * -> *) r.
m (Stream f m r) -> Stream f m r
SI.Effect forall a b. (a -> b) -> a -> b
$ do 
      Stream (Proxy X () () b m) m r
stream' <- forall (m :: * -> *) r. Monad m => Effect m r -> m r
runEffect (forall (m :: * -> *) x' x b' b a' c' c.
Functor m =>
Proxy x' x b' b m a'
-> (b -> Proxy x' x c' c m b') -> Proxy x' x c' c m a'
P.for Proxy X () () b m (Stream (Proxy X () () b m) m r)
p forall (m :: * -> *) a. Monad m => a -> m ()
P.discard)
      forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ Stream (Proxy X () () b m) m r -> Stream f m r
drain_loop Stream (Proxy X () () b m) m r
stream'
{-# INLINABLE takes' #-}