{-# LANGUAGE RankNTypes, StandaloneDeriving,DeriveDataTypeable, BangPatterns #-}
{-# LANGUAGE UndecidableInstances, CPP, FlexibleInstances, MultiParamTypeClasses  #-} 
module Streaming.Internal (
    -- * The free monad transformer
    -- $stream
    Stream (..)
    
    -- * Introducing a stream
    , unfold 
    , replicates
    , repeats
    , repeatsM
    , effect
    , wrap
    , yields
    , streamBuild 
    , cycles
    
    -- * Eliminating a stream
    , intercalates 
    , concats 
    , iterT 
    , iterTM 
    , destroy 
    , streamFold
    
    -- * Inspecting a stream wrap by wrap
    , inspect 
    
    -- * Transforming streams
    , maps 
    , mapsM 
    , mapped
    , decompose
    , mapsM_
    , run
    , distribute
    , groups
--    , groupInL
    
    -- *  Splitting streams
    , chunksOf 
    , splitsAt
    , takes
    -- , period
    -- , periods
    
    -- * Zipping and unzipping streams
    , zipsWith
    , zips
    , unzips
    , interleaves
    , separate
    , unseparate

    
    -- * Assorted Data.Functor.x help
    
    , switch
    
    -- * ResourceT help
    
    , bracketStream
    
    -- *  For use in implementation
    , unexposed
    , hoistExposed
    , mapsExposed
    , mapsMExposed
    , destroyExposed
    
   ) where

import Control.Monad
import Control.Monad.Trans
import Control.Monad.Trans.Class
import Control.Applicative
import Data.Foldable ( Foldable(..) )
import Data.Traversable
import Control.Monad.Morph
import Data.Monoid (Monoid (..), (<>))
import Data.Functor.Identity
import GHC.Exts ( build )
import Data.Data ( Data, Typeable )
import Prelude hiding (splitAt)
import Data.Functor.Compose
import Data.Functor.Sum
-- import Data.Time (getCurrentTime, diffUTCTime, picosecondsToDiffTime, addUTCTime)

import Control.Monad.Base
import Control.Monad.Trans.Resource
import Control.Monad.Catch (MonadCatch (..))

{- $stream

    The 'Stream' data type is equivalent to @FreeT@ and can represent any effectful
    succession of steps, where the form of the steps or 'commands' is 
    specified by the first (functor) parameter. 

> data Stream f m r = Step !(f (Stream f m r)) | Effect (m (Stream f m r)) | Return r

    The /producer/ concept uses the simple functor @ (a,_) @ \- or the stricter 
    @ Of a _ @. Then the news at each step or layer is just: an individual item of type @a@. 
    Since @Stream (Of a) m r@ is equivalent to @Pipe.Producer a m r@, much of
    the @pipes@ @Prelude@ can easily be mirrored in a @streaming@ @Prelude@. Similarly, 
    a simple @Consumer a m r@ or @Parser a m r@ concept arises when the base functor is
    @ (a -> _) @ . @Stream ((->) input) m result@ consumes @input@ until it returns a 
    @result@.

    To avoid breaking reasoning principles, the constructors 
    should not be used directly. A pattern-match should go by way of 'inspect' \
    \- or, in the producer case, 'Streaming.Prelude.next'
    The constructors are exported by the 'Internal' module.
-}
data Stream f m r = Step !(f (Stream f m r))
                  | Effect (m (Stream f m r))
                  | Return r
#if __GLASGOW_HASKELL__ >= 710
                  deriving (Typeable)
#endif
deriving instance (Show r, Show (m (Stream f m r))
                  , Show (f (Stream f m r))) => Show (Stream f m r)
deriving instance (Eq r, Eq (m (Stream f m r))
                  , Eq (f (Stream f m r))) => Eq (Stream f m r)
#if __GLASGOW_HASKELL__ >= 710
deriving instance (Typeable f, Typeable m, Data r, Data (m (Stream f m r))
                  , Data (f (Stream f m r))) => Data (Stream f m r)
#endif
instance (Functor f, Monad m) => Functor (Stream f m) where
  fmap f = loop where
    loop stream = case stream of
      Return r -> Return (f r)
      Effect m  -> Effect (do {stream' <- m; return (loop stream')})
      Step f   -> Step (fmap loop f)
  {-# INLINABLE fmap #-}
  a <$ stream0 = loop stream0 where
    loop stream = case stream of
      Return r -> Return a
      Effect m  -> Effect (do {stream' <- m; return (loop stream')})
      Step f    -> Step (fmap loop f)
  {-# INLINABLE (<$) #-}    
  
instance (Functor f, Monad m) => Monad (Stream f m) where
  return = Return
  {-# INLINE return #-}
  stream1 >> stream2 = loop stream1 where
    loop stream = case stream of
      Return _ -> stream2
      Effect m  -> Effect (liftM loop m)
      Step f   -> Step (fmap loop f)    
  {-# INLINABLE (>>) #-}
  (>>=) = _bind
  {-#INLINE (>>=) #-}
  
  -- stream >>= f = 
  --   loop stream where
  --   loop stream0 = case stream0 of
  --     Step fstr -> Step (fmap loop fstr)
  --     Effect m   -> Effect (liftM loop m)
  --     Return r  -> f r
  -- {-# INLINABLE (>>=) #-}                         

  fail = lift . fail
  {-#INLINE fail #-}
  

_bind
    :: (Functor f, Monad m)
    => Stream f m r
    -> (r -> Stream f m s)
    -> Stream f m s
_bind p0 f = go p0 where
    go p = case p of
      Step fstr  -> Step (fmap go fstr)
      Effect m   -> Effect (m >>= \s -> return (go s))
      Return r  -> f r
{-#INLINABLE[0] _bind #-}
      
{-# RULES
    "_bind (Step    fstr) f" forall  fstr f .
        _bind (Step fstr) f = Step (fmap (\p -> _bind p f) fstr);
    "_bind (Effect      m) f" forall m    f .
        _bind (Effect   m) f = Effect (m >>= \p -> return (_bind p f));
    "_bind (Return     r) f" forall r    f .
        _bind (Return  r) f = f r;
  #-}

  
  
instance (Functor f, Monad m) => Applicative (Stream f m) where
  pure = Return
  {-# INLINE pure #-}
  streamf <*> streamx = do {f <- streamf; x <- streamx; return (f x)}   
  {-# INLINE (<*>) #-}    
  -- stra0 *> strb = loop stra0 where
  --   loop stra = case stra of
  --     Return _ -> strb
  --     Effect m  -> Effect (do {stra' <- m ; return (stra' *> strb)})
  --     Step fstr -> Step (fmap (*> strb) fstr)
  -- {-# INLINABLE (*>) #-}
  -- stra <* strb0 = loop strb0 where
  --   loop strb = case strb of
  --     Return _ -> stra
  --     Effect m  -> Effect (do {strb' <- m ; return (stra <* strb')})
  --     Step fstr -> Step (fmap (stra <*) fstr)
  -- {-# INLINABLE (<*) #-}
  --
instance Functor f => MonadTrans (Stream f) where
  lift = Effect . liftM Return
  {-# INLINE lift #-}

instance Functor f => MFunctor (Stream f) where
  hoist trans = loop . unexposed where
    loop stream = case stream of 
      Return r  -> Return r
      Effect m   -> Effect (trans (liftM loop m))
      Step f    -> Step (fmap loop f)
  {-# INLINABLE hoist #-}    

instance Functor f => MMonad (Stream f) where
  embed phi = loop where
    loop stream = case stream of
      Return r -> Return r
      Effect  m -> phi m >>= loop
      Step   f -> Step (fmap loop f)
  {-# INLINABLE embed #-}   

instance (MonadIO m, Functor f) => MonadIO (Stream f m) where
  liftIO = Effect . liftM Return . liftIO
  {-# INLINE liftIO #-}
  
instance (MonadBase b m, Functor f) => MonadBase b (Stream f m) where
  liftBase  = effect . fmap return . liftBase
  {-#INLINE liftBase #-}
  
instance (MonadThrow m, Functor f) => MonadThrow (Stream f m) where
  throwM = lift . throwM 
  {-#INLINE throwM #-}
  


instance (MonadCatch m, Functor f) => MonadCatch (Stream f m) where
  catch str f = go str
    where
    go p = case p of
      Step f      -> Step (fmap go f)
      Return  r   -> Return r
      Effect  m   -> Effect (catch (do
          p' <- m
          return (go p'))  
       (\e -> return (f e)) )
  {-#INLINABLE catch #-}
       
instance (MonadResource m, Functor f) => MonadResource (Stream f m) where
  liftResourceT = lift . liftResourceT
  {-#INLINE liftResourceT #-}
  
bracketStream :: (Functor f, MonadResource m) =>
       IO a -> (a -> IO ()) -> (a -> Stream f m b) -> Stream f m b
bracketStream alloc free inside = do
        (key, seed) <- lift (allocate alloc free)
        clean key (inside seed)
  where
    clean key = loop where
      loop str = case str of 
        Return r -> Effect (release key >> return (Return r))
        Effect m -> Effect (liftM loop m)
        Step f   -> Step (fmap loop f)
{-#INLINABLE bracketStream #-}
        
{-| Map a stream directly to its church encoding; compare @Data.List.foldr@
-}
destroy
  :: (Functor f, Monad m) =>
     Stream f m r -> (f b -> b) -> (m b -> b) -> (r -> b) -> b
destroy stream0 construct effect done = loop (unexposed stream0) where
  loop stream = case stream of
    Return r -> done r
    Effect m  -> effect (liftM loop m)
    Step fs  -> construct (fmap loop fs)
{-# INLINABLE destroy #-}


{-| 'streamFold' reorders the arguments of 'destroy' to be more akin
    to @foldr@  It is more convenient to query in ghci to figure out
    what kind of \'algebra\' you need to write. 

>>> :t streamFold return join 
(Monad m, Functor f) => 
     (f (m a) -> m a) -> Stream f m a -> m a        -- iterT

>>> :t streamFold return (join . lift)
(Monad m, Monad (t m), Functor f, MonadTrans t) =>
     (f (t m a) -> t m a) -> Stream f m a -> t m a  -- iterTM

>>> :t streamFold return effect 
(Monad m, Functor f, Functor g) =>
     (f (Stream g m r) -> Stream g m r) -> Stream f m r -> Stream g m r

>>> :t \f -> streamFold return effect (wrap . f)
(Monad m, Functor f, Functor g) =>
     (f (Stream g m a) -> g (Stream g m a))
     -> Stream f m a -> Stream g m a                 -- maps

>>> :t \f -> streamFold return effect (effect . liftM wrap . f)
(Monad m, Functor f, Functor g) =>
     (f (Stream g m a) -> m (g (Stream g m a)))
     -> Stream f m a -> Stream g m a                 -- mapped

    So for example, when we realize that

>>> :t streamFold return Q.mwrap 
(Monad m, Functor f) =>
   (f (Q.ByteString m a) -> Q.ByteString m a)
   -> Stream f m a -> Q.ByteString m a

   it is easy to see how to write @fromChunks@:

>>> streamFold return Q.mwrap (\(a:>b) -> Q.chunk a >>  b)
Monad m => Stream (Of B.ByteString) m a -> Q.ByteString m a -- fromChunks
-}
streamFold
  :: (Functor f, Monad m) =>
     (r -> b) -> (m b -> b) ->  (f b -> b) -> Stream f m r -> b
streamFold done effect construct stream  = destroy stream construct effect done
{-#INLINE streamFold #-}

{- | Reflect a church-encoded stream; cp. @GHC.Exts.build@

> destroy a b c (streamBuild psi)  = 
-}
streamBuild
  :: (forall b . (f b -> b) -> (m b -> b) -> (r -> b) -> b) ->  Stream f m r
streamBuild = \phi -> phi Step Effect Return
{-# INLINE streamBuild #-}


{-| Inspect the first stage of a freely layered sequence. 
    Compare @Pipes.next@ and the replica @Streaming.Prelude.next@. 
    This is the 'uncons' for the general 'unfold'.

> unfold inspect = id
> Streaming.Prelude.unfoldr StreamingPrelude.next = id
-}
inspect :: (Functor f, Monad m) =>
     Stream f m r -> m (Either r (f (Stream f m r)))
inspect = loop where
  loop stream = case stream of
    Return r -> return (Left r)
    Effect m  -> m >>= loop
    Step fs  -> return (Right fs)
{-# INLINABLE inspect #-}
    
{-| Build a @Stream@ by unfolding steps starting from a seed. See also
    the specialized 'Streaming.Prelude.unfoldr' in the prelude.

> unfold inspect = id -- modulo the quotient we work with
> unfold Pipes.next :: Monad m => Producer a m r -> Stream ((,) a) m r
> unfold (curry (:>) . Pipes.next) :: Monad m => Producer a m r -> Stream (Of a) m r

-}
unfold :: (Monad m, Functor f) 
        => (s -> m (Either r (f s))) 
        -> s -> Stream f m r
unfold step = loop where
  loop s0 = Effect $ do 
    e <- step s0
    case e of
      Left r -> return (Return r)
      Right fs -> return (Step (fmap loop fs))
{-# INLINABLE unfold #-}


{- | Map layers of one functor to another with a transformation. Compare
     hoist, which has a similar effect on the 'monadic' parameter. 

> maps id = id
> maps f . maps g = maps (f . g)

-}
maps :: (Monad m, Functor f) 
     => (forall x . f x -> g x) -> Stream f m r -> Stream g m r
maps phi = loop where
  loop stream = case stream of 
    Return r  -> Return r
    Effect m   -> Effect (liftM loop m)
    Step f    -> Step (phi (fmap loop f))
{-# INLINABLE maps #-}


{- | Map layers of one functor to another with a transformation involving the base monad
     @maps@ is more fundamental than @mapsM@, which is best understood as a convenience
     for effecting this frequent composition:

> mapsM phi = decompose . maps (Compose . phi)
-}
mapsM :: (Monad m, Functor f) => (forall x . f x -> m (g x)) -> Stream f m r -> Stream g m r
mapsM phi = loop where
  loop stream = case stream of 
    Return r  -> Return r
    Effect m   -> Effect (liftM loop m)
    Step f    -> Effect (liftM Step (phi (fmap loop f)))
{-# INLINABLE mapsM #-}

{- | Map layers of one functor to another with a transformation involving the base monad
     @maps@ is more fundamental than @mapped@, which is best understood as a convenience
     for effecting this frequent composition:

> mapped = mapsM 
> mapsM phi = decompose . maps (Compose . phi)  

     @mapped@ obeys these rules:

> mapped return       = id
> mapped f . mapped g = mapped (f <=< g)
> map f . mapped g    = mapped (liftM f . g)
> mapped f . map g    = mapped (f . g)

-}

mapped :: (Monad m, Functor f) => (forall x . f x -> m (g x)) -> Stream f m r -> Stream g m r
mapped = mapsM
{-#INLINE mapped #-}

{-| Resort a succession of layers of the form @m (f x)@. Though @mapsM@ 
    is best understood as:

> mapsM phi = decompose . maps (Compose . phi)

   we could as well define @decompose@ by @mapsM@:

> decompose = mapsM getCompose

-}
decompose :: (Monad m, Functor f) => Stream (Compose m f) m r -> Stream f m r
decompose = loop where
  loop stream = case stream of 
    Return r -> Return r 
    Effect m ->  Effect (liftM loop m)
    Step (Compose mstr) -> Effect $ do
      str <- mstr
      return (Step (fmap loop str))


{-| Run the effects in a stream that merely layers effects.
-}
run :: Monad m => Stream m m r -> m r
run = loop where
  loop stream = case stream of
    Return r -> return r
    Effect  m -> m >>= loop
    Step mrest -> mrest >>= loop
{-# INLINABLE run #-}


{-| Map each layer to an effect, and run them all.
-}
mapsM_ :: (Functor f, Monad m) => (forall x . f x -> m x) -> Stream f m r -> m r
mapsM_ f = run . maps f 
{-# INLINE mapsM_ #-}


{-| Interpolate a layer at each segment. This specializes to e.g.

> intercalates :: (Monad m, Functor f) => Stream f m () -> Stream (Stream f m) m r -> Stream f m r
-}
intercalates :: (Monad m, Monad (t m), MonadTrans t) =>
     t m x -> Stream (t m) m r -> t m r
intercalates sep = go0
  where
    go0 f = case f of 
      Return r -> return r 
      Effect m -> lift m >>= go0 
      Step fstr -> do
                f' <- fstr
                go1 f'
    go1 f = case f of 
      Return r -> return r 
      Effect m     -> lift m >>= go1
      Step fstr ->  do
                sep
                f' <- fstr
                go1 f'
{-# INLINABLE intercalates #-}

{-| Specialized fold following the usage of @Control.Monad.Trans.Free@

> iterTM alg = streamFold return (join . lift)
-}
iterTM ::
  (Functor f, Monad m, MonadTrans t,
   Monad (t m)) =>
  (f (t m a) -> t m a) -> Stream f m a -> t m a
iterTM out stream = destroyExposed stream out (join . lift) return
{-# INLINE iterTM #-}

{-| Specialized fold following the usage of @Control.Monad.Trans.Free@

> iterT alg = streamFold return join alg 
-}
iterT ::
  (Functor f, Monad m) => (f (m a) -> m a) -> Stream f m a -> m a
iterT out stream = destroyExposed stream out join return
{-# INLINE iterT #-}

{-| Dissolves the segmentation into layers of @Stream f m@ layers.

-}
concats :: (Monad m, Functor f) => Stream (Stream f m) m r -> Stream f m r
concats  = loop where
  loop stream = case stream of
    Return r -> return r
    Effect m  -> join $ lift (liftM loop m)
    Step fs  -> join (fmap loop fs)
{-# INLINE concats #-}

{-| Split a succession of layers after some number, returning a streaming or
    effectful pair.

>>> rest <- S.print $ S.splitAt 1 $ each [1..3]
1
>>> S.print rest
2
3

> splitAt 0 = return
> splitAt n >=> splitAt m = splitAt (m+n)

    Thus, e.g. 

>>> rest <- S.print $ splitsAt 2 >=> splitsAt 2 $ each [1..5]
1
2
3
4
>>> S.print rest
5

-}
splitsAt :: (Monad m, Functor f) => Int -> Stream f m r -> Stream f m (Stream f m r)
splitsAt = loop where
  loop !n stream 
    | n <= 0 = Return stream
    | otherwise = case stream of
        Return r       -> Return (Return r)
        Effect m        -> Effect (liftM (loop n) m)
        Step fs        -> case n of 
          0 -> Return (Step fs)
          _ -> Step (fmap (loop (n-1)) fs)
{-# INLINABLE splitsAt #-}  

{- Functor-general take. 

   @takes 3@ can take three individual values

>>> S.print $ takes 3 $ each [1..]
1
2
3


    or three sub-streams

>>> S.print $ mapped S.toList $ takes 3 $ chunksOf 2 $ each [1..]
[1,2]
[3,4]
[5,6]

   Or, using 'Data.ByteString.Streaming.Char' (here called @Q@),
   three byte streams.

>>> Q.stdout $ Q.unlines $ takes 3 $ Q.lines $ Q.chunk "a\nb\nc\nd\ne\nf"
a
b
c

-}
takes :: (Monad m, Functor f) => Int -> Stream f m r -> Stream f m ()
takes n = void . splitsAt n
{-# INLINE takes #-}                        

{-| Break a stream into substreams each with n functorial layers. 

>>>  S.print $ mapped S.sum $ chunksOf 2 $ each [1,1,1,1,1]
2
2
1
-}
chunksOf :: (Monad m, Functor f) => Int -> Stream f m r -> Stream (Stream f m) m r
chunksOf n0 = loop where
  loop stream = case stream of
    Return r  -> Return r
    Effect m  -> Effect (liftM loop m)
    Step fs   -> Step (Step (fmap (fmap loop . splitsAt (n0-1)) fs))
{-# INLINABLE chunksOf #-}          

{- | Make it possible to \'run\' the underlying transformed monad. 
-}
distribute :: (Monad m, Functor f, MonadTrans t, MFunctor t, Monad (t (Stream f m)))
           => Stream f (t m) r -> t (Stream f m) r
distribute = loop where
  loop stream = case stream of 
    Return r    -> lift $ Return r
    Effect tmstr -> hoist lift tmstr >>= distribute
    Step fstr   -> join $ lift (Step (fmap (Return . distribute) fstr))
    
-- | Repeat a functorial layer, command or instruction forever.
repeats :: (Monad m, Functor f) => f () -> Stream f m r 
repeats f = loop where
  loop = Step $ fmap (\_ -> loop) f

-- Repeat a functorial layer, command or instruction forever.
repeatsM :: (Monad m, Functor f) => m (f ()) -> Stream f m r 
repeatsM mf = loop where
  loop = Effect $ do
     f <- mf
     return $ Step $ fmap (\_ -> loop) f

-- | Repeat a functorial layer, command or instruct several times.
replicates :: (Monad m, Functor f) => Int -> f () -> Stream f m ()
replicates n f = splitsAt n (repeats f) >> return ()

{-| Construct an infinite stream by cycling a finite one

> cycles = forever

>>> 
-}
 
cycles :: (Monad m, Functor f) =>  Stream f m () -> Stream f m r
cycles = forever



hoistExposed trans = loop where
  loop stream = case stream of 
    Return r  -> Return r
    Effect m   -> Effect (trans (liftM loop m))
    Step f    -> Step (fmap loop f)

mapsExposed :: (Monad m, Functor f) 
     => (forall x . f x -> g x) -> Stream f m r -> Stream g m r
mapsExposed phi = loop where
  loop stream = case stream of 
    Return r  -> Return r
    Effect m   -> Effect (liftM loop m)
    Step f    -> Step (phi (fmap loop f))
{-# INLINABLE mapsExposed #-}

mapsMExposed phi = loop where
  loop stream = case stream of 
    Return r  -> Return r
    Effect m   -> Effect (liftM loop m)
    Step f    -> Effect (liftM Step (phi (fmap loop f)))
{-# INLINABLE mapsMExposed #-}

--     Map a stream directly to its church encoding; compare @Data.List.foldr@
--     It permits distinctions that should be hidden, as can be seen from
--     e.g.
--
-- isPure stream = destroy (const True) (const False) (const True)
--
--     and similar nonsense.  The crucial
--     constraint is that the @m x -> x@ argument is an /Eilenberg-Moore algebra/.
--     See Atkey "Reasoning about Stream Processing with Effects"


destroyExposed stream0 construct effect done = loop stream0 where
  loop stream = case stream of
    Return r -> done r
    Effect m  -> effect (liftM loop m)
    Step fs  -> construct (fmap loop fs)
{-# INLINABLE destroyExposed #-}


{-| This is akin to the @observe@ of @Pipes.Internal@ . It reeffects the layering
    in instances of @Stream f m r@ so that it replicates that of 
    @FreeT@. 

-}
unexposed :: (Functor f, Monad m) => Stream f m r -> Stream f m r
unexposed = Effect . loop where
  loop stream = case stream of 
    Return r -> return (Return r)
    Effect  m -> m >>= loop
    Step   f -> return (Step (fmap (Effect . loop) f))
{-# INLINABLE unexposed #-}   



effect :: (Monad m, Functor f ) => m (Stream f m r) -> Stream f m r
effect = Effect
{-#INLINE effect #-}

wrap :: (Monad m, Functor f ) => f (Stream f m r) -> Stream f m r
wrap = Step
{-#INLINE wrap #-}


{-| Lift for items in the base functor. Makes a singleton or
    one-layer succession. It is named by similarity to lift: 

> lift :: (Monad m, Functor f)     => m r -> Stream f m r
> yields ::  (Monad m, Functor f) => f r -> Stream f m r
-}

yields ::  (Monad m, Functor f) => f r -> Stream f m r
yields fr = Step (fmap Return fr)
{-#INLINE yields #-}


zipsWith :: (Monad m, Functor h) 
  => (forall x y . f x -> g y -> h (x,y)) 
  -> Stream f m r -> Stream g m r -> Stream h m r
zipsWith phi = curry loop where
  loop (s1, s2) = Effect $ go s1 s2
  go (Return r)  p        = return $ Return r
  go q         (Return s) = return $ Return s
  go (Effect m) p          = m >>= \s -> go s p
  go q         (Effect m)  = m >>= go q
  go (Step f) (Step g)    = return $ Step $ fmap loop (phi f g)
{-# INLINABLE zipsWith #-}   
  
zips :: (Monad m, Functor f, Functor g) 
     => Stream f m r -> Stream g m r -> Stream (Compose f g) m r  
zips = zipsWith go where
  go fx gy = Compose (fmap (\x -> fmap (\y -> (x,y)) gy) fx)
{-# INLINE zips #-}   


{-| Interleave functor layers, with the effects of the first preceding
    the effects of the second.

> interleaves = zipsWith (liftA2 (,))

>>> let paste = \a b -> interleaves (Q.lines a) (maps (Q.cons' '\t') (Q.lines b))
>>> Q.stdout $ Q.unlines $ paste "hello\nworld\n" "goodbye\nworld\n"
hello	goodbye
world	world

-}
  
interleaves
  :: (Monad m, Applicative h) =>
     Stream h m r -> Stream h m r -> Stream h m r
interleaves = zipsWith (liftA2 (,))
{-# INLINE interleaves #-}   


{-| Swap the order of functors in a sum of functors. 


>>> S.toListM' $ S.print $ separate $ maps S.switch $ maps (S.distinguish (=='a')) $ S.each "banana"
'a'
'a'
'a'
"bnn" :> ()
>>> S.toListM' $ S.print $ separate $ maps (S.distinguish (=='a')) $ S.each "banana"
'b'
'n'
'n'
"aaa" :> ()
-}
switch :: Sum f g r -> Sum g f r
switch s = case s of InL a -> InR a; InR a -> InL a
{-#INLINE switch #-}


  
{-| Given a stream on a sum of functors, make it a stream on the left functor,
    with the streaming on the other functor as the governing monad. This is
    useful for acting on one or the other functor with a fold.
  
>>> let odd_even = S.maps (S.distinguish even) $ S.each [1..10::Int]
>>> :t separate odd_even
separate odd_even
  :: Monad m => Stream (Of Int) (Stream (Of Int) m) ()

    Now, for example, it is convenient to fold on the left and right values separately:

>>>  toList $ toList $ separate odd_even
[2,4,6,8,10] :> ([1,3,5,7,9] :> ())

   We can achieve the above effect more simply
   in the case of @Stream (Of a) m r@ by using 'Streaming.Prelude.duplicate'

>>> S.toList . S.filter even $ S.toList . S.filter odd $ S.duplicate $ each [1..10::Int]
[2,4,6,8,10] :> ([1,3,5,7,9] :> ())


    But 'separate' and 'unseparate' are functor-general. 

-}

separate :: (Monad m, Functor f, Functor g) => Stream (Sum f g) m r -> Stream f (Stream g m) r
separate str = destroyExposed 
  str 
  (\x -> case x of InL fss -> wrap fss; InR gss -> effect (yields gss))
  (effect . lift) 
  return 
{-#INLINABLE separate #-}

unseparate :: (Monad m, Functor f, Functor g) =>  Stream f (Stream g m) r -> Stream (Sum f g) m r
unseparate str = destroyExposed 
  str 
  (wrap . InL) 
  (join . maps InR) 
  return 
{-#INLINABLE unseparate #-}


unzips :: (Monad m, Functor f, Functor g) => 
   Stream (Compose f g) m r ->  Stream f (Stream g m) r 
unzips str = destroyExposed
  str 
  (\(Compose fgstr) -> Step (fmap (Effect . yields) fgstr))
  (Effect . lift) 
  return 
{-#INLINABLE unzips #-}

{-| Group layers in an alternating stream into adjoining sub-streams
    of one type or another. 
-}
groups :: (Monad m, Functor f, Functor g) 
           => Stream (Sum f g) m r 
           -> Stream (Sum (Stream f m) (Stream g m)) m r
groups = loop 
  where
  loop str = do
    e <- lift $ inspect str
    case e of
      Left r -> return r
      Right ostr -> case ostr of
        InR gstr -> wrap $ InR (fmap loop (cleanR (wrap (InR gstr))))
        InL fstr -> wrap $ InL (fmap loop (cleanL (wrap (InL fstr))))

  cleanL  :: (Monad m, Functor f, Functor g) =>
       Stream (Sum f g) m r -> Stream f m (Stream (Sum f g) m r)
  cleanL = loop where
    loop s = do
     e <- lift $ inspect s
     case e of
      Left r           -> return (return r)
      Right (InL fstr) -> wrap (fmap loop fstr)
      Right (InR gstr) -> return (wrap (InR gstr))

  cleanR  :: (Monad m, Functor f, Functor g) =>
       Stream (Sum f g) m r -> Stream g m (Stream (Sum f g) m r)
--  cleanR = fmap (maps switch) . cleanL . maps switch
  cleanR = loop where
    loop s = do
     e <- lift $ inspect s
     case e of
      Left r           -> return (return r)
      Right (InL fstr) -> return (wrap (InL fstr))
      Right (InR gstr) -> wrap (fmap loop gstr)
{-#INLINABLE groups #-}
      
-- groupInL :: (Monad m, Functor f, Functor g)
--                      => Stream (Sum f g) m r
--                      -> Stream (Sum (Stream f m) g) m r
-- groupInL = loop
--   where
--   loop str = do
--     e <- lift $ inspect str
--     case e of
--       Left r -> return r
--       Right ostr -> case ostr of
--         InR gstr -> wrap $ InR (fmap loop gstr)
--         InL fstr -> wrap $ InL (fmap loop (cleanL (wrap (InL fstr))))
--   cleanL  :: (Monad m, Functor f, Functor g) =>
--        Stream (Sum f g) m r -> Stream f m (Stream (Sum f g) m r)
--   cleanL = loop where
--     loop s = dos
--      e <- lift $ inspect s
--      case e of
--       Left r           -> return (return r)
--       Right (InL fstr) -> wrap (fmap loop fstr)
--       Right (InR gstr) -> return (wrap (InR gstr))

-- {-| Permit streamed actions to proceed unless the clock has run out.
--
-- -}
-- period :: (MonadIO m, Functor f) => Double -> Stream f m r -> Stream f m (Stream f m r)
-- period seconds str = do
--     utc <- liftIO getCurrentTime
--     let loop s = do
--           utc' <- liftIO getCurrentTime
--           if diffUTCTime utc' utc > (cutoff / 1000000000)
--             then return s
--             else case s of
--               Return r -> Return (Return r)
--               Effect m -> Effect (liftM loop m)
--               Step f   -> Step (fmap loop f)
--     loop str
--   where
--   cutoff = fromInteger (truncate (1000000000 * seconds))
-- {-#INLINABLE period #-}
--
--
-- {-| Divide a succession of phases according to a specified time interval. If time runs out
--     while an action is proceeding, it is allowed to run to completion. The clock is only then
--     restarted.
-- -}
-- periods :: (MonadIO m, Functor f) => Double -> Stream f m r -> Stream (Stream f m) m r
-- periods seconds s = do
--   utc <- liftIO getCurrentTime
--   loop (addUTCTime cutoff utc) s
--
--   where
--   cutoff = fromInteger (truncate (1000000000 * seconds)) / 1000000000
--   loop final stream = do
--     utc <- liftIO getCurrentTime
--     if utc > final
--       then loop (addUTCTime cutoff utc) stream
--       else case stream of
--         Return r  -> Return r
--         Effect m  -> Effect $ liftM (loop final) m
--         Step fstr -> Step $ fmap (periods seconds) (cutoff_ final (Step fstr))
--
--         -- do
--         --   let sloop s = do
--         --         utc' <- liftIO getCurrentTime
--         --         if final < utc'
--         --           then return s
--         --           else case s of
--         --             Return r -> Return (Return r)
--         --             Effect m -> Effect (liftM sloop m)
--         --             Step f   -> Step (fmap sloop f)
--         --   Step (Step (fmap (fmap (periods seconds) . sloop) fstr))
--           -- str <- m
--           -- utc' <- liftIO getCurrentTime
--           -- if diffUTCTime utc' utc > (cutoff / 1000000000)
--           --   then return (loop utc' str)
--           --   else return (loop utc str)
--         -- Step fs   -> do
--         --   let check str = do
--         --         utc' <- liftIO getCurrentTime
--         --         loop utc' str
--         --
-- {-# INLINABLE periods #-}
--
-- cutoff_ final str = do
--     let loop s = do
--           utc' <- liftIO getCurrentTime
--           if utc' > final
--             then Return s
--             else case s of
--               Return r -> Return (Return r)
--               Effect m -> Effect (liftM loop m)
--               Step f   -> Step (fmap loop f)
--     loop str