-- | Generic stream manipulations

{-# LANGUAGE BangPatterns       #-}
{-# LANGUAGE DeriveDataTypeable #-}
{-# LANGUAGE RankNTypes         #-}

module System.IO.Streams.Combinators
 ( -- * Folds
   inputFoldM
 , outputFoldM
 , fold
 , foldM
 , fold_
 , foldM_
 , any
 , all
 , maximum
 , minimum

   -- * Unfolds
 , unfoldM

   -- * Maps
 , map
 , mapM
 , mapM_
 , mapMaybe
 , contramap
 , contramapM
 , contramapM_
 , contramapMaybe

   -- * Filter
 , filter
 , filterM
 , filterOutput
 , filterOutputM

   -- * Takes and drops
 , give
 , take
 , drop
 , ignore

   -- * Zip and unzip
 , zip
 , zipWith
 , zipWithM
 , unzip
 , contraunzip

   -- * Utility
 , intersperse
 , skipToEof
 , ignoreEof
 , atEndOfInput
 , atEndOfOutput
 ) where

------------------------------------------------------------------------------
import           Control.Concurrent.MVar    (newMVar, withMVar)
import           Control.Monad              (liftM, void, when)
import           Control.Monad.IO.Class     (liftIO)
import           Data.Int                   (Int64)
import           Data.IORef                 (IORef, atomicModifyIORef, modifyIORef, newIORef, readIORef, writeIORef)
import           Data.Maybe                 (isJust)
import           Prelude                    hiding (all, any, drop, filter, map, mapM, mapM_, maximum, minimum, read, take, unzip, zip, zipWith)
------------------------------------------------------------------------------
import           System.IO.Streams.Internal (InputStream (..), OutputStream (..), fromGenerator, makeInputStream, makeOutputStream, read, unRead, write, yield)


------------------------------------------------------------------------------
-- | A side-effecting fold over an 'OutputStream', as a stream transformer.
--
-- Every value written to the returned stream is first folded into the running
-- seed with the supplied function and is then forwarded unchanged to the
-- wrapped stream. End-of-stream ('Nothing') is forwarded without touching the
-- seed.
--
-- The IO action returned by 'outputFoldM' can be used to fetch and reset the
-- updated seed value. Example:
--
-- @
-- ghci> is <- Streams.'System.IO.Streams.List.fromList' [1, 2, 3::Int]
-- ghci> (os, getList) <- Streams.'System.IO.Streams.List.listOutputStream'
-- ghci> (os', getSeed) \<- Streams.'outputFoldM' (\\x y -> return (x+y)) 0 os
-- ghci> Streams.'System.IO.Streams.connect' is os'
-- ghci> getList
-- [1,2,3]
-- ghci> getSeed
-- 6
-- @
outputFoldM :: (a -> b -> IO a)           -- ^ fold function
            -> a                          -- ^ initial seed
            -> OutputStream b             -- ^ output stream
            -> IO (OutputStream b, IO a)  -- ^ returns a new stream as well as
                                          -- an IO action to fetch and reset the
                                          -- updated seed value.
outputFoldM f initial stream = do
    ref <- newIORef initial
    os  <- makeOutputStream (wr ref)
    return (os, fetch ref)

  where
    -- End-of-stream: hand it downstream and leave the seed alone.
    wr _ Nothing       = write Nothing stream
    wr ref mb@(Just x) = do
        -- The bang patterns evaluate the old and new seed to WHNF before the
        -- new one is stored, so the ref never accumulates a chain of thunks.
        !z  <- readIORef ref
        !z' <- f z x
        writeIORef ref z'
        write mb stream

    -- Return the current seed and put the initial seed back in one atomic step.
    fetch ref = atomicModifyIORef ref $ \x -> (initial, x)


------------------------------------------------------------------------------
-- | A side-effecting fold over an 'InputStream', as a stream transformer.
--
-- Every value read from the returned stream is folded into the running seed
-- with the supplied function before being handed to the reader unchanged.
-- End-of-stream ('Nothing') is passed through without touching the seed.
--
-- The IO action returned by 'inputFoldM' can be used to fetch and reset the
-- updated seed value. Example:
--
-- @
-- ghci> is <- Streams.'System.IO.Streams.List.fromList' [1, 2, 3::Int]
-- ghci> (is', getSeed) \<- Streams.'inputFoldM' (\\x y -> return (x+y)) 0 is
-- ghci> Streams.'System.IO.Streams.List.toList' is'
-- [1,2,3]
-- ghci> getSeed
-- 6
-- @
inputFoldM :: (a -> b -> IO a)          -- ^ fold function
           -> a                         -- ^ initial seed
           -> InputStream b             -- ^ input stream
           -> IO (InputStream b, IO a)  -- ^ returns a new stream as well as an
                                        -- IO action to fetch and reset the
                                        -- updated seed value.
inputFoldM f initial stream = do
    ref <- newIORef initial
    is  <- makeInputStream (rd ref)
    return (is, fetch ref)

  where
    -- Pull one item from the wrapped stream and fold it into the seed.
    rd ref = read stream >>= twiddle ref

    -- End-of-stream: nothing to fold.
    twiddle _ Nothing = return Nothing

    twiddle ref mb@(Just x) = do
        -- The bang patterns evaluate the old and new seed to WHNF before the
        -- new one is stored, so the ref never accumulates a chain of thunks.
        !z  <- readIORef ref
        !z' <- f z x
        writeIORef ref z'
        return mb

    -- Return the current seed and put the initial seed back in one atomic step.
    fetch ref = atomicModifyIORef ref $ \x -> (initial, x)


------------------------------------------------------------------------------
-- | A left fold over an input stream. The input stream is fully consumed. See
-- 'Prelude.foldl'.
--
-- The accumulator is evaluated to weak head normal form at every step.
--
-- Example:
--
-- @
-- ghci> Streams.'System.IO.Streams.fromList' [1..10] >>= Streams.'fold' (+) 0
-- 55
-- @
fold :: (s -> a -> s)       -- ^ fold function
     -> s                   -- ^ initial seed
     -> InputStream a       -- ^ input stream
     -> IO s
fold f seed stream = go seed
  where
    -- Read until end-of-stream, threading the strict accumulator through.
    go !s = read stream >>= maybe (return s) (go . f s)


------------------------------------------------------------------------------
-- | A side-effecting left fold over an input stream. The input stream is fully
-- consumed. See 'Prelude.foldl'.
--
-- The accumulator is evaluated to weak head normal form at every step.
--
-- Example:
--
-- @
-- ghci> Streams.'System.IO.Streams.fromList' [1..10] >>= Streams.'foldM' (\\x y -> 'return' (x + y)) 0
-- 55
-- @
foldM :: (s -> a -> IO s)       -- ^ fold function
      -> s                      -- ^ initial seed
      -> InputStream a          -- ^ input stream
      -> IO s
foldM f seed stream = go seed
  where
    -- Read until end-of-stream; each element runs the fold action and the
    -- result becomes the next (strict) accumulator.
    go !s = read stream >>= maybe (return s) ((go =<<) . f s)


------------------------------------------------------------------------------
-- | A variant of 'System.IO.Streams.fold' suitable for use with composable folds
-- from \'beautiful folding\' libraries like
-- <http://hackage.haskell.org/package/foldl the foldl library>.
-- The input stream is fully consumed.
--
-- Example:
--
-- @
-- ghci> let folds = liftA3 (,,) Foldl.length Foldl.mean Foldl.maximum
-- ghci> Streams.'System.IO.Streams.fromList' [1..10::Double] >>= Foldl.purely Streams.'System.IO.Streams.fold_' folds
-- (10,5.5,Just 10.0)
-- @
--
-- /Since 1.3.6.0/
--
fold_ :: (x -> a -> x)    -- ^ accumulator update function
      -> x                -- ^ initial seed
      -> (x -> s)         -- ^ recover folded value
      -> InputStream a    -- ^ input stream
      -> IO s
fold_ op seed done stream = liftM done (go seed)
  where
    -- Read until end-of-stream, threading the strict accumulator through; the
    -- extraction function is applied once, to the final accumulator.
    go !s = read stream >>= maybe (return s) (go . op s)


------------------------------------------------------------------------------
-- | A variant of 'System.IO.Streams.foldM' suitable for use with composable folds
-- from \'beautiful folding\' libraries like
-- <http://hackage.haskell.org/package/foldl the foldl library>.
-- The input stream is fully consumed.
--
-- Example:
--
-- @
-- ghci> let folds = Foldl.mapM_ print *> Foldl.generalize (liftA2 (,) Foldl.sum Foldl.mean)
-- ghci> Streams.'System.IO.Streams.fromList' [1..3::Double] >>= Foldl.impurely Streams.'System.IO.Streams.foldM_' folds
-- 1.0
-- 2.0
-- 3.0
-- (6.0,2.0)
-- @
--
-- /Since 1.3.6.0/
--
foldM_ :: (x -> a -> IO x)   -- ^ accumulator update action
       -> IO x               -- ^ initial seed
       -> (x -> IO s)        -- ^ recover folded value
       -> InputStream a      -- ^ input stream
       -> IO s
foldM_ f seed done stream = seed >>= go
  where
    -- Read until end-of-stream; on end-of-stream run the extraction action on
    -- the final accumulator, otherwise step the (strict) accumulator.
    go !x = read stream >>= maybe (done x) ((go =<<) . f x)


------------------------------------------------------------------------------
-- | @any predicate stream@ returns 'True' if any element in @stream@ matches
-- the predicate, and 'False' if end-of-stream is reached without a match.
--
-- 'any' consumes as few elements as possible, ending consumption if an element
-- satisfies the predicate.
--
-- @
-- ghci> is <- Streams.'System.IO.Streams.List.fromList' [1, 2, 3]
-- ghci> Streams.'System.IO.Streams.Combinators.any' (> 0) is    -- Consumes one element
-- True
-- ghci> Streams.'System.IO.Streams.read' is
-- Just 2
-- ghci> Streams.'System.IO.Streams.Combinators.any' even is     -- Only 3 remains
-- False
-- @
any :: (a -> Bool) -> InputStream a -> IO Bool
any predicate stream = go
  where
    go = do
        mElem <- read stream
        case mElem of
            Nothing -> return False
            Just e
              | predicate e -> return True   -- first match: stop reading
              | otherwise   -> go


------------------------------------------------------------------------------
-- | @all predicate stream@ returns 'True' if every element in @stream@ matches
-- the predicate (in particular, if the stream is empty).
--
-- 'all' consumes as few elements as possible, ending consumption if any element
-- fails the predicate.
--
-- @
-- ghci> is <- Streams.'System.IO.Streams.List.fromList' [1, 2, 3]
-- ghci> Streams.'System.IO.Streams.Combinators.all' (< 0) is    -- Consumes one element
-- False
-- ghci> Streams.'System.IO.Streams.read' is
-- Just 2
-- ghci> Streams.'System.IO.Streams.Combinators.all' odd is      -- Only 3 remains
-- True
-- @
all :: (a -> Bool) -> InputStream a -> IO Bool
all predicate stream = go
  where
    go = do
        mElem <- read stream
        case mElem of
            Nothing -> return True
            Just e
              | predicate e -> go
              | otherwise   -> return False  -- first failure: stop reading


------------------------------------------------------------------------------
-- | @maximum stream@ returns the greatest element in @stream@ or 'Nothing' if
-- the stream is empty.
--
-- 'maximum' consumes the entire stream. The running maximum is evaluated to
-- weak head normal form at every step.
--
-- @
-- ghci> is <- Streams.'System.IO.Streams.List.fromList' [1, 2, 3]
-- ghci> Streams.'System.IO.Streams.Combinators.maximum' is
-- Just 3
-- ghci> Streams.'System.IO.Streams.read' is     -- The stream is now empty
-- Nothing
-- @
maximum :: (Ord a) => InputStream a -> IO (Maybe a)
maximum stream = do
    mElem0 <- read stream
    case mElem0 of
        Nothing -> return Nothing
        Just e  -> go e
  where
    -- The bang keeps the running maximum evaluated; without it a long stream
    -- would build a nested @max (max (max ...))@ thunk.
    go !oldElem = do
        mElem <- read stream
        case mElem of
            Nothing      -> return (Just oldElem)
            Just newElem -> go (max oldElem newElem)


------------------------------------------------------------------------------
-- | @minimum stream@ returns the smallest element in @stream@ or 'Nothing' if
-- the stream is empty.
--
-- 'minimum' consumes the entire stream. The running minimum is evaluated to
-- weak head normal form at every step.
--
-- @
-- ghci> is <- Streams.'System.IO.Streams.List.fromList' [1, 2, 3]
-- ghci> Streams.'System.IO.Streams.Combinators.minimum' is
-- Just 1
-- ghci> Streams.'System.IO.Streams.read' is    -- The stream is now empty
-- Nothing
-- @
minimum :: (Ord a) => InputStream a -> IO (Maybe a)
minimum stream = do
    mElem0 <- read stream
    case mElem0 of
        Nothing -> return Nothing
        Just e  -> go e
  where
    -- The bang keeps the running minimum evaluated; without it a long stream
    -- would build a nested @min (min (min ...))@ thunk.
    go !oldElem = do
        mElem <- read stream
        case mElem of
            Nothing      -> return (Just oldElem)
            Just newElem -> go (min oldElem newElem)


------------------------------------------------------------------------------
-- | @unfoldM f seed@ builds an 'InputStream' from successively applying @f@ to
-- the @seed@ value, continuing if @f@ produces 'Just' and halting on
-- 'Nothing'.
--
-- When @f@ returns @Just (a, newSeed)@, @a@ is yielded to the stream and
-- @newSeed@ is used for the next application of @f@.
--
-- @
-- ghci> is \<- Streams.'System.IO.Streams.Combinators.unfoldM' (\\n -> return $ if n < 3 then Just (n, n + 1) else Nothing) 0
-- ghci> Streams.'System.IO.Streams.List.toList' is
-- [0,1,2]
-- @
unfoldM :: (b -> IO (Maybe (a, b))) -> b -> IO (InputStream a)
unfoldM f seed = fromGenerator (go seed)
  where
    go oldSeed = do
        m <- liftIO (f oldSeed)
        case m of
            Nothing           -> return $! ()
            Just (a, newSeed) -> do
                yield a
                go newSeed

------------------------------------------------------------------------------
-- | Maps a pure function over an 'InputStream'.
--
-- @map f s@ passes all output from @s@ through the function @f@.
--
-- Satisfies the following laws:
--
-- @
-- Streams.'map' (g . f) === Streams.'map' f >=> Streams.'map' g
-- Streams.'map' 'id' === Streams.'makeInputStream' . Streams.'read'
-- @
map :: (a -> b) -> InputStream a -> IO (InputStream b)
map f s = makeInputStream g
  where
    -- Read one item and apply @f@ under the 'Maybe'; end-of-stream stays
    -- 'Nothing'.
    g = liftM (fmap f) (read s)


------------------------------------------------------------------------------
-- | Maps an impure function over an 'InputStream'.
--
-- @mapM f s@ passes all output from @s@ through the IO action @f@.
--
-- Satisfies the following laws:
--
-- @
-- Streams.'mapM' (f >=> g) === Streams.'mapM' f >=> Streams.'mapM' g
-- Streams.'mapM' 'return' === Streams.'makeInputStream' . Streams.'read'
-- @
--
mapM :: (a -> IO b) -> InputStream a -> IO (InputStream b)
mapM f s = makeInputStream g
  where
    -- End-of-stream is passed through; any other item is run through @f@ and
    -- re-wrapped in 'Just'.
    g = read s >>= maybe (return Nothing) (liftM Just . f)


------------------------------------------------------------------------------
-- | Maps a side effect over an 'InputStream'.
--
-- @mapM_ f s@ produces a new input stream that passes all output from @s@
-- through the side-effecting IO action @f@. The result of @f@ is discarded and
-- the original element is yielded unchanged.
--
-- Example:
--
-- @
-- ghci> Streams.'System.IO.Streams.fromList' [1,2,3] >>=
--       Streams.'mapM_' ('putStrLn' . 'show' . (*2)) >>=
--       Streams.'System.IO.Streams.toList'
-- 2
-- 4
-- 6
-- [1,2,3]
-- @
--
mapM_ :: (a -> IO b) -> InputStream a -> IO (InputStream a)
mapM_ f s = makeInputStream $ do
    mb <- read s
    -- Run the effect only for real elements; nothing to do at end-of-stream.
    _  <- maybe (return $! ()) (void . f) mb
    return mb


------------------------------------------------------------------------------
-- | A version of map that discards elements
--
-- @mapMaybe f s@ passes all output from @s@ through the function @f@ and
-- discards elements for which @f@ evaluates to 'Nothing'.
--
-- Example:
--
-- @
-- ghci> Streams.'System.IO.Streams.fromList' [Just 1, Nothing, Just 3] >>=
--       Streams.'mapMaybe' 'id' >>=
--       Streams.'System.IO.Streams.toList'
-- [1,3]
-- @
--
-- /Since: 1.2.1.0/
mapMaybe :: (a -> Maybe b) -> InputStream a -> IO (InputStream b)
mapMaybe f src = makeInputStream g
  where
    g = do
        s <- read src
        case s of
            Nothing -> return Nothing
            Just x  ->
                case f x of
                    -- Rejected element: keep reading until one is accepted or
                    -- the source reaches end-of-stream.
                    Nothing -> g
                    y       -> return y
------------------------------------------------------------------------------
-- | Contravariant counterpart to 'map'.
--
-- @contramap f s@ passes all input to @s@ through the function @f@.
--
-- Satisfies the following laws:
--
-- @
-- Streams.'contramap' (g . f) === Streams.'contramap' g >=> Streams.'contramap' f
-- Streams.'contramap' 'id' === 'return'
-- @
contramap :: (a -> b) -> OutputStream b -> IO (OutputStream a)
contramap f s = makeOutputStream $ \mb ->
    -- Apply @f@ under the 'Maybe' so end-of-stream is forwarded as 'Nothing'.
    write (fmap f mb) s


------------------------------------------------------------------------------
-- | Contravariant counterpart to 'mapM'.
--
-- @contramapM f s@ passes all input to @s@ through the IO action @f@
--
-- Satisfies the following laws:
--
-- @
-- Streams.'contramapM' (f >=> g) === Streams.'contramapM' g >=> Streams.'contramapM' f
-- Streams.'contramapM' 'return' === 'return'
-- @
contramapM :: (a -> IO b) -> OutputStream b -> IO (OutputStream a)
contramapM f s = makeOutputStream g
  where
    -- End-of-stream is forwarded without running @f@.
    g Nothing = write Nothing s

    g (Just x) = do
        -- The bang evaluates the transformed value to WHNF before it is
        -- written downstream.
        !y <- f x
        write (Just y) s


------------------------------------------------------------------------------
-- | Equivalent to 'mapM_' for output.
--
-- @contramapM_ f s@ passes all input to @s@ through the side-effecting IO
-- action @f@. The result of @f@ is discarded and the original value is
-- written to @s@ unchanged.
--
contramapM_ :: (a -> IO b) -> OutputStream a -> IO (OutputStream a)
contramapM_ f s = makeOutputStream $ \mb -> do
    -- Run the effect only for real elements; nothing to do at end-of-stream.
    _ <- maybe (return $! ()) (void . f) mb
    write mb s


------------------------------------------------------------------------------
-- | Contravariant counterpart to 'mapMaybe'.
--
-- @contramapMaybe f s@ passes all input to @s@ through the function @f@.
-- Discards all the elements for which @f@ returns 'Nothing'.
--
-- /Since: 1.2.1.0/
--
contramapMaybe :: (a -> Maybe b) -> OutputStream b -> IO (OutputStream a)
contramapMaybe f s = makeOutputStream g
  where
    -- End-of-stream is always forwarded.
    g Nothing  = write Nothing s
    g (Just a) =
        case f a of
            -- Rejected element: write nothing, so it is not mistaken for
            -- end-of-stream downstream.
            Nothing -> return ()
            x       -> write x s


------------------------------------------------------------------------------
-- | Drives an 'InputStream' to end-of-stream, discarding all of the yielded
-- values.
skipToEof :: InputStream a -> IO ()
skipToEof str = go
  where
    -- Keep reading, ignoring each value, until the stream yields 'Nothing'.
    go = read str >>= maybe (return $! ()) (const go)
{-# INLINE skipToEof #-}


------------------------------------------------------------------------------
-- | Drops chunks from an input stream if they fail to match a given filter
-- predicate running in 'IO'. See 'Control.Monad.filterM'.
--
-- Items pushed back to the returned stream are propagated back upstream.
--
-- Example:
--
-- @
-- ghci> Streams.'System.IO.Streams.fromList' [\"the\", \"quick\", \"brown\", \"fox\"] >>=
--       Streams.'filterM' ('return' . (/= \"brown\")) >>= Streams.'System.IO.Streams.toList'
-- [\"the\",\"quick\",\"fox\"]
-- @
filterM :: (a -> IO Bool)
        -> InputStream a
        -> IO (InputStream a)
filterM p src = return $! InputStream pull pushBack
  where
    -- Read upstream until an element satisfies the predicate or the stream
    -- ends.
    pull = do
        next <- read src
        case next of
          Nothing -> return Nothing
          Just x  -> do
              keep <- p x
              if keep then return (Just x) else pull

    -- Push-back goes directly to the wrapped stream.
    pushBack x = unRead x src


------------------------------------------------------------------------------
-- | Drops chunks from an input stream if they fail to match a given filter
-- predicate. See 'Prelude.filter'.
--
-- Items pushed back to the returned stream are propagated back upstream.
--
-- Example:
--
-- @
-- ghci> Streams.'System.IO.Streams.fromList' [\"the\", \"quick\", \"brown\", \"fox\"] >>=
--       Streams.'filter' (/= \"brown\") >>= Streams.'System.IO.Streams.toList'
-- [\"the\",\"quick\",\"fox\"]
-- @
filter :: (a -> Bool)
       -> InputStream a
       -> IO (InputStream a)
filter p src = return $! InputStream pull pushBack
  where
    pull = read src >>= select

    -- End-of-stream is passed on; a rejected element triggers another read.
    select Nothing = return Nothing
    select found@(Just x)
        | p x       = return found
        | otherwise = pull

    -- Push-back goes directly to the wrapped stream.
    pushBack x = unRead x src


------------------------------------------------------------------------------
-- | The function @intersperse v s@ wraps the 'OutputStream' @s@, creating a
-- new output stream that writes its input to @s@ interspersed with the
-- provided value @v@. See 'Data.List.intersperse'.
--
-- The separator is written before every element except the first one; no
-- separator is written at end-of-stream.
--
-- Example:
--
-- @
-- ghci> import Control.Monad ((>=>))
-- ghci> is <- Streams.'System.IO.Streams.List.fromList' [\"nom\", \"nom\", \"nom\"::'ByteString']
-- ghci> Streams.'System.IO.Streams.List.outputToList' (Streams.'intersperse' \"burp!\" >=> Streams.'System.IO.Streams.connect' is)
-- [\"nom\",\"burp!\",\"nom\",\"burp!\",\"nom\"]
-- @
intersperse :: a -> OutputStream a -> IO (OutputStream a)
intersperse sep os = do
    -- Tracks whether at least one element has already been written.
    startedRef <- newIORef False
    makeOutputStream (step startedRef)
  where
    -- End-of-stream is forwarded without a trailing separator.
    step _ Nothing = write Nothing os
    step startedRef item = do
        started <- readIORef startedRef
        writeIORef startedRef True
        when started $ write (Just sep) os
        write item os


------------------------------------------------------------------------------
-- | Combines two input streams. Continues yielding elements from both input
-- streams until one of them finishes.
--
-- The first stream is read before the second. If the second stream has
-- ended, the element already taken from the first stream is pushed back onto
-- it before end-of-stream is reported.
zip :: InputStream a -> InputStream b -> IO (InputStream (a, b))
zip src1 src2 = makeInputStream pull
  where
    pull = do
        ma <- read src1
        case ma of
          Nothing -> return Nothing
          Just a  -> do
              mb <- read src2
              case mb of
                -- Do not lose the element consumed from the first stream.
                Nothing -> unRead a src1 >> return Nothing
                Just b  -> return (Just (a, b))


------------------------------------------------------------------------------
-- | Combines two input streams using the supplied function. Continues yielding
-- elements from both input streams until one of them finishes.
--
-- The first stream is read before the second. If the second stream has
-- ended, the element already taken from the first stream is pushed back onto
-- it before end-of-stream is reported. Each combined value is evaluated to
-- weak head normal form before it is yielded.
zipWith :: (a -> b -> c)
        -> InputStream a
        -> InputStream b
        -> IO (InputStream c)
zipWith f src1 src2 = makeInputStream pull
  where
    pull = do
        ma <- read src1
        case ma of
          Nothing -> return Nothing
          Just a  -> do
              mb <- read src2
              case mb of
                -- Do not lose the element consumed from the first stream.
                Nothing -> unRead a src1 >> return Nothing
                Just b  -> return $! Just $! f a b


------------------------------------------------------------------------------
-- | Combines two input streams using the supplied monadic function. Continues
-- yielding elements from both input streams until one of them finishes.
--
-- The first stream is read before the second. If the second stream has
-- ended, the element already taken from the first stream is pushed back onto
-- it before end-of-stream is reported. The result of the action is evaluated
-- to weak head normal form before it is yielded.
zipWithM :: (a -> b -> IO c)
         -> InputStream a
         -> InputStream b
         -> IO (InputStream c)
zipWithM f src1 src2 = makeInputStream pull
  where
    pull = do
        ma <- read src1
        case ma of
          Nothing -> return Nothing
          Just a  -> do
              mb <- read src2
              case mb of
                -- Do not lose the element consumed from the first stream.
                Nothing -> unRead a src1 >> return Nothing
                Just b  -> do
                    c <- f a b
                    return $! Just $! c


------------------------------------------------------------------------------
-- | Filters output to be sent to the given 'OutputStream' using a pure
-- function. See 'filter'.
--
-- Elements failing the predicate are silently dropped; end-of-stream is always
-- forwarded.
--
-- Example:
--
-- @
-- ghci> import qualified "Data.ByteString.Char8" as S
-- ghci> os1 \<- Streams.'System.IO.Streams.unlines' Streams.'System.IO.Streams.stdout'
-- ghci> os2 \<- Streams.'contramap' (S.pack . show) os1 >>= Streams.'filterOutput' even
-- ghci> Streams.'write' (Just 3) os2
-- ghci> Streams.'write' (Just 4) os2
-- 4
-- @
{- Note: The example is a lie, because unlines has weird behavior -}
filterOutput :: (a -> Bool) -> OutputStream a -> IO (OutputStream a)
filterOutput p output = makeOutputStream step
  where
    step Nothing = write Nothing output
    step item@(Just x)
        | p x       = write item output
        | otherwise = return ()


------------------------------------------------------------------------------
-- | Filters output to be sent to the given 'OutputStream' using a predicate
-- function in IO. See 'filterM'.
--
-- Elements for which the predicate yields 'False' are silently dropped;
-- end-of-stream is always forwarded without consulting the predicate.
--
-- Example:
--
-- @
-- ghci> let check a = putStrLn ("Allow " ++ show a ++ "?") >> readLn :: IO Bool
-- ghci> import qualified Data.ByteString.Char8 as S
-- ghci> os1 \<- Streams.'System.IO.Streams.unlines' Streams.'System.IO.Streams.stdout'
-- ghci> os2 \<- Streams.'contramap' (S.pack . show) os1 >>= Streams.'filterOutputM' check
-- ghci> Streams.'System.IO.Streams.write' (Just 3) os2
-- Allow 3?
-- False\<Enter>
-- ghci> Streams.'System.IO.Streams.write' (Just 4) os2
-- Allow 4?
-- True\<Enter>
-- 4
-- @
filterOutputM :: (a -> IO Bool) -> OutputStream a -> IO (OutputStream a)
filterOutputM p output = makeOutputStream step
  where
    step Nothing = write Nothing output
    step item@(Just x) = do
        keep <- p x
        when keep $ write item output


------------------------------------------------------------------------------
-- | Takes apart a stream of pairs, producing a pair of input streams. Reading
-- from either of the produced streams will cause a pair of values to be pulled
-- from the original stream if necessary. Note that reading @n@ values from one
-- of the returned streams will cause @n@ values to be buffered at the other
-- stream.
--
-- Access to the original stream is thread safe, i.e. guarded by a lock.
unzip :: forall a b . InputStream (a, b) -> IO (InputStream a, InputStream b)
unzip os = do
    lock <- newMVar ()
    -- Each buffer is a queue kept as (front, reversed back): elements are
    -- popped from the front list and pushed onto the head of the back list.
    buf1 <- newIORef ([], [])
    buf2 <- newIORef ([], [])

    is1 <- makeInputStream $ withMVar lock $ const $ pull os id    buf1 buf2
    is2 <- makeInputStream $ withMVar lock $ const $ pull os twist buf2 buf1

    return (is1, is2)

  where
    twist (a, b) = (b, a)

    -- @pull origs proj mine theirs@ yields the next element for one side.
    -- It serves from that side's own buffer first. When the buffer is empty
    -- it reads a pair from @origs@, keeps the first component of @proj pair@
    -- and queues the second component for the other side.
    pull :: InputStream p
         -> (p -> (x, y))
         -> IORef ([x], [x])
         -> IORef ([y], [y])
         -> IO (Maybe x)
    pull origs proj mine theirs = do
        (front, back) <- readIORef mine
        case front of
          (x:xs) -> emit x xs back
          []     -> case reverse back of
                      -- Front exhausted: the reversed back list becomes the
                      -- new front, preserving arrival order.
                      (x:xs) -> emit x xs []
                      []     -> refill
      where
        emit x front' back' = do
            writeIORef mine (front', back')
            return (Just x)

        refill = read origs >>= maybe (return Nothing) stash

        stash pair = do
            -- Lazy pattern: the pair is only projected when a component is
            -- actually demanded.
            let (keep, other) = proj pair
            (theirFront, theirBack) <- readIORef theirs
            writeIORef theirs (theirFront, other : theirBack)
            return (Just keep)


------------------------------------------------------------------------------
-- | Given two 'OutputStream's, returns a new stream that "unzips" the tuples
-- being written, writing the two elements to the corresponding given streams.
-- The first sink is written before the second; end-of-stream is forwarded to
-- both.
--
-- You can use this together with @'contramap' (\\ x -> (x, x))@ to "fork" a
-- stream into two.
--
-- /Since: 1.5.2.0/
contraunzip :: OutputStream a -> OutputStream b -> IO (OutputStream (a, b))
contraunzip sink1 sink2 = makeOutputStream split
  where
    -- 'fmap' keeps the projection lazy and maps 'Nothing' to 'Nothing'.
    split pair = do
        write (fmap fst pair) sink1
        write (fmap snd pair) sink2


------------------------------------------------------------------------------
-- | Wraps an 'InputStream', producing a new 'InputStream' that will produce at
-- most @n@ items, subsequently yielding end-of-stream forever.
--
-- Items pushed back to the returned 'InputStream' will be propagated upstream,
-- modifying the count of taken items accordingly.
--
-- Example:
--
-- @
-- ghci> is <- Streams.'fromList' [1..9::Int]
-- ghci> is' <- Streams.'take' 1 is
-- ghci> Streams.'read' is'
-- Just 1
-- ghci> Streams.'read' is'
-- Nothing
-- ghci> Streams.'System.IO.Streams.peek' is
-- Just 2
-- ghci> Streams.'unRead' 11 is'
-- ghci> Streams.'System.IO.Streams.peek' is
-- Just 11
-- ghci> Streams.'System.IO.Streams.peek' is'
-- Just 11
-- ghci> Streams.'read' is'
-- Just 11
-- ghci> Streams.'read' is'
-- Nothing
-- ghci> Streams.'read' is
-- Just 2
-- ghci> Streams.'toList' is
-- [3,4,5,6,7,8,9]
-- @
--
take :: Int64 -> InputStream a -> IO (InputStream a)
take k0 input = do
    -- Number of items that may still be yielded.
    kref <- newIORef k0
    return $! InputStream (prod kref) (pb kref)
  where
    prod kref = do
        !k <- readIORef kref
        if k <= 0
          then return Nothing
          else do
              m <- read input
              -- Only a successfully read item uses up the allowance.
              when (isJust m) $ bump kref (-1)
              return m

    -- The pushed-back item is forced before being handed upstream, and the
    -- allowance is restored so that the item can be read again.
    pb kref !s = do
        unRead s input
        bump kref 1

    -- Strict counter update, so no chain of unevaluated arithmetic is left
    -- behind in the reference.
    bump :: IORef Int64 -> Int64 -> IO ()
    bump ref delta = do
        k <- readIORef ref
        writeIORef ref $! k + delta


------------------------------------------------------------------------------
-- | Wraps an 'InputStream', producing a new 'InputStream' that will drop the
-- first @n@ items produced by the wrapped stream. See 'Prelude.drop'.
--
-- Items pushed back to the returned 'InputStream' will be propagated upstream,
-- modifying the count of dropped items accordingly.
drop :: Int64 -> InputStream a -> IO (InputStream a)
drop k0 input = do
    -- Number of items still to be discarded. Every item read decrements it
    -- (so it may become negative) and every push-back increments it.
    kref <- newIORef k0
    return $! InputStream (prod kref) (pb kref)
  where
    prod kref = do
        !k <- readIORef kref
        next <- read input
        case next of
          Nothing -> return Nothing
          Just c  -> do
              bump kref (-1)
              -- The counter value seen before this read decides whether the
              -- item is yielded or discarded.
              if k <= 0
                then return (Just c)
                else prod kref

    pb kref s = do
        unRead s input
        bump kref 1

    -- Strict counter update, so no chain of unevaluated arithmetic is left
    -- behind in the reference.
    bump :: IORef Int64 -> Int64 -> IO ()
    bump ref delta = do
        k <- readIORef ref
        writeIORef ref $! k + delta


------------------------------------------------------------------------------
-- | Wraps an 'OutputStream', producing a new 'OutputStream' that will pass at
-- most @n@ items on to the wrapped stream, subsequently ignoring the rest of
-- the input.
--
-- The number of items still allowed through is held in an 'IORef' seeded with
-- @n@. A non-positive @n@ therefore forwards nothing at all.
--
-- An end-of-stream 'Nothing' reaching the handler below is dropped; this
-- wrapper never forwards it to the wrapped stream.
--
give :: Int64 -> OutputStream a -> IO (OutputStream a)
give k output = newIORef k >>= makeOutputStream . chunk
  where
    -- @ref@ holds the remaining quota. 'Nothing' falls through to a no-op.
    chunk ref = maybe (return $! ()) $ \x -> do
        !n <- readIORef ref
        -- Forward only while quota remains; once it is exhausted every
        -- further item is silently discarded.
        when (n > 0) $ do
            writeIORef ref $! n - 1
            write (Just x) output


------------------------------------------------------------------------------
-- | Wraps an 'OutputStream', producing a new 'OutputStream' that will ignore
-- the first @n@ items received, subsequently passing the rest of the input on
-- to the wrapped stream.
--
-- The number of items still to be skipped is held in an 'IORef' seeded with
-- @n@. A non-positive @n@ skips nothing, so every item is forwarded.
--
-- An end-of-stream 'Nothing' reaching the handler below is dropped; this
-- wrapper never forwards it to the wrapped stream.
--
ignore :: Int64 -> OutputStream a -> IO (OutputStream a)
ignore k output = newIORef k >>= makeOutputStream . chunk
  where
    -- @ref@ holds how many items remain to be skipped. 'Nothing' is a no-op.
    chunk ref = maybe (return $! ()) $ \x -> do
        !n <- readIORef ref
        if n > 0
          -- Still skipping: consume the item and count it off.
          then writeIORef ref $! n - 1
          -- Skip count exhausted: pass the item through unchanged.
          else write (Just x) output


------------------------------------------------------------------------------
-- | Wraps an 'OutputStream', ignoring any end-of-stream 'Nothing' values
-- written to the returned stream.
--
-- Every 'Just' value is passed to the wrapped stream unchanged; a 'Nothing'
-- is a no-op, so the wrapped stream never sees end-of-stream through this
-- wrapper.
--
-- /Since: 1.0.1.0/
--
ignoreEof :: OutputStream a -> IO (OutputStream a)
ignoreEof s = return $ OutputStream f
  where
    -- Swallow end-of-stream.
    f Nothing  = return $! ()
    -- Anything else (necessarily a 'Just') goes straight through.
    f x        = write x s


------------------------------------------------------------------------------
-- | Wraps an 'InputStream', running the specified action when the stream
-- yields end-of-file.
--
-- The action runs every time a read of the wrapped stream returns 'Nothing',
-- not just the first time, and its result is discarded. Values pushed back
-- onto the returned stream are pushed back onto the wrapped stream.
--
-- /Since: 1.0.2.0/
--
atEndOfInput :: IO b -> InputStream a -> IO (InputStream a)
atEndOfInput m is = return $! InputStream prod pb
  where
    -- Read from the wrapped stream, diverting to 'eof' on 'Nothing'.
    prod    = read is >>= maybe eof (return . Just)
    -- Run the user action for its effects, then report end-of-file.
    eof     = void m >> return Nothing
    -- Pushback is delegated to the wrapped stream.
    pb s    = unRead s is


------------------------------------------------------------------------------
-- | Wraps an 'OutputStream', running the specified action when the stream
-- receives end-of-file.
--
-- On 'Nothing', end-of-stream is first forwarded to the wrapped stream and
-- only then is the action run; its result is discarded. All other values are
-- passed through unchanged.
--
-- /Since: 1.0.2.0/
--
atEndOfOutput :: IO b -> OutputStream a -> IO (OutputStream a)
atEndOfOutput m os = makeOutputStream f
  where
    -- Forward end-of-stream before running the user action, so the action
    -- runs only once the wrapped stream's write has returned.
    f Nothing = write Nothing os >> void m
    f x       = write x os