-- |
-- Module      : Streamly.Internal.Data.Sink
-- Copyright   : (c) 2019 Composewell Technologies
-- License     : BSD3
-- Maintainer  : streamly@composewell.com
-- Stability   : experimental
-- Portability : GHC
--
--
-- The 'Sink' type is just a special case of 'Fold' and we can do without
-- it. However, in some cases 'Sink' is a simpler type and may provide better
-- performance than 'Fold' because it does not maintain any state. Folds can
-- be used for both pure and monadic computations. Sinks are not applicable to
-- pure computations.

module Streamly.Internal.Data.Sink
    (
      Sink (..)

    -- * Upgrading
    , toFold

    -- * Composing Sinks
    -- ** Distribute
    , tee
    , distribute

    -- ** Demultiplex
    , demux

    -- ** Unzip
    , unzipM
    , unzip

    -- -- ** Group
    -- , grouped

    -- -- ** Nest
    -- , concatFold

    -- -- * Comonad
    -- , duplicate

    -- * Input Transformation
    -- | These are contravariant operations i.e. they apply on the input of the
    -- 'Sink', for this reason they are prefixed with 'l' for 'left'.
    , lmap
    , lmapM
    , lfilter
    , lfilterM

    -- * Sinks
    , drain
    , drainM
    -- , drainN
    -- , drainWhile
    )
where

import Control.Monad ((>=>), when, void)
import Data.Map.Strict (Map)
import Prelude
       hiding (filter, drop, dropWhile, take, takeWhile, zipWith, foldr,
               foldl, map, mapM, sequence, all, any, sum, product, elem,
               notElem, maximum, minimum, head, last, tail, length, null,
               reverse, iterate, init, and, or, lookup, foldr1, (!!),
               scanl, scanl1, replicate, concatMap, mconcat, foldMap, unzip)

import Streamly.Internal.Data.Fold.Type (Fold(..), Step(..))
import Streamly.Internal.Data.Sink.Type (Sink(..))

import qualified Data.Map.Strict as Map

------------------------------------------------------------------------------
-- Conversion
------------------------------------------------------------------------------

-- | Convert a 'Sink' to a 'Fold'. When you want to compose sinks and folds
-- together, upgrade a sink to a fold before composing.
--
-- The resulting fold uses @()@ as its state: each input is handed to the
-- sink's action, every step returns 'Partial', and the extracted result is
-- @()@.
toFold :: Monad m => Sink m a -> Fold m a ()
toFold (Sink f) = Fold step begin done

    where

    -- Nothing to set up; start from the unit state.
    begin = return $ Partial ()

    -- Ignore the (unit) state, run the sink action on the input and continue.
    step _ a = Partial <$> f a

    -- There is no accumulated result to extract.
    done _ = return ()

------------------------------------------------------------------------------
-- Composing with sinks
------------------------------------------------------------------------------

-- | Distribute one copy each of the input to both the sinks. For every input
-- element the action of the first sink is run before the action of the second
-- sink.
--
-- @
--                 |-------Sink m a
-- ---stream m a---|
--                 |-------Sink m a
-- @
-- @
-- > let pr x = Sink.drainM (putStrLn . ((x ++ " ") ++) . show)
-- > sink (Sink.tee (pr \"L") (pr \"R")) (S.enumerateFromTo 1 2)
-- L 1
-- R 1
-- L 2
-- R 2
-- @
--
tee :: Monad m => Sink m a -> Sink m a -> Sink m a
tee (Sink fL) (Sink fR) = Sink (\a -> fL a >> fR a)

-- | Distribute copies of the input to all the sinks in a container. For every
-- input element the sinks are run one after another in list order; an empty
-- list yields a sink that does nothing.
--
-- @
--                 |-------Sink m a
-- ---stream m a---|
--                 |-------Sink m a
--                 |
--                       ...
-- @
-- @
-- > let pr x = Sink.drainM (putStrLn . ((x ++ " ") ++) . show)
-- > sink (Sink.distribute [(pr \"L"), (pr \"R")]) (S.enumerateFromTo 1 2)
-- L 1
-- R 1
-- L 2
-- R 2
-- @
--
-- This is the consumer side dual of the producer side 'sequence_' operation.
{-# INLINE distribute #-}
distribute :: Monad m => [Sink m a] -> Sink m a
distribute ss = Sink (\a -> Prelude.mapM_ (\(Sink f) -> f a) ss)

-- | Demultiplex to multiple consumers without collecting the results. Useful
-- to run different effectful computations depending on the value of the stream
-- elements, for example handling network packets of different types using
-- different handlers.
--
-- @
--
--                             |-------Sink m a
-- -----stream m a-----Map-----|
--                             |-------Sink m a
--                             |
--                                       ...
-- @
--
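-- The input is a stream of @(value, key)@ pairs: the key selects the sink and
-- the value is passed to it. Pairs whose key is not present in the map are
-- silently dropped.
--
-- @
-- > let pr x = Sink.drainM (putStrLn . ((x ++ " ") ++) . show)
-- > let table = Data.Map.fromList [(1, pr \"One"), (2, pr \"Two")]
--   in Sink.sink (Sink.demux table) (S.fromList [(1, 1), (2, 2), (3, 3)])
-- One 1
-- Two 2
-- @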
{-
demux :: (Monad m, Ord k) => (a -> k) -> Map k (Sink m a) -> Sink m a
demux f kv = Sink step

    where

    step a =
        -- XXX should we raise an exception in Nothing case?
        -- Ideally we should enforce that it is a total map over k so that look
        -- up never fails
        case Map.lookup (f a) kv of
            Nothing -> return ()
            Just (Sink g) -> g a
-}

demux :: (Monad m, Ord k) => Map k (Sink m a) -> Sink m (a, k)
demux kv = Sink step

    where

    -- Each input is a (value, key) pair: the key picks the sink from the map
    -- and the value is fed to that sink.
    step (a, k) =
        -- XXX should we raise an exception in Nothing case?
        -- Ideally we should enforce that it is a total map over k so that look
        -- up never fails
        case Map.lookup k kv of
            -- No sink registered for this key: drop the value.
            Nothing -> return ()
            Just (Sink g) -> g a

-- | Split elements in the input stream into two parts using a monadic unzip
-- function, direct each part to a different sink. For every input element the
-- unzip function is run first, then the sink for the first component, then
-- the sink for the second component.
--
-- @
--
--                           |-------Sink m b
-- -----Stream m a----(b,c)--|
--                           |-------Sink m c
-- @
-- @
-- > let pr x = Sink.drainM (putStrLn . ((x ++ " ") ++) . show)
--   in Sink.sink (Sink.unzipM return (pr \"L") (pr \"R")) (S.fromPure (1,2))
-- L 1
-- R 2
-- @
{-# INLINE unzipM #-}
unzipM :: Monad m => (a -> m (b,c)) -> Sink m b -> Sink m c -> Sink m a
unzipM f (Sink stepB) (Sink stepC) =
    Sink (f >=> (\(b, c) -> stepB b >> stepC c))

-- | Same as 'unzipM' but with a pure unzip function. The pure function is
-- lifted into the monad with 'return' and the work is delegated to 'unzipM'.
{-# INLINE unzip #-}
unzip :: Monad m => (a -> (b,c)) -> Sink m b -> Sink m c -> Sink m a
unzip f = unzipM (return . f)

------------------------------------------------------------------------------
-- Input transformation
------------------------------------------------------------------------------

-- | Map a pure function on the input of a 'Sink'. Each incoming element is
-- transformed by the function before being passed to the sink's action.
{-# INLINABLE lmap #-}
lmap :: (a -> b) -> Sink m b -> Sink m a
lmap f (Sink step) = Sink (step . f)

-- | Map a monadic function on the input of a 'Sink'. For each incoming
-- element the function's effect is run first and its result is then passed to
-- the sink's action.
{-# INLINABLE lmapM #-}
lmapM :: Monad m => (a -> m b) -> Sink m b -> Sink m a
lmapM f (Sink step) = Sink (f >=> step)

-- | Filter the input of a 'Sink' using a pure predicate function. The sink's
-- action is run only for elements satisfying the predicate; all other
-- elements are discarded without any effect.
{-# INLINABLE lfilter #-}
lfilter :: Monad m => (a -> Bool) -> Sink m a -> Sink m a
lfilter f (Sink step) = Sink (\a -> when (f a) $ step a)

-- | Filter the input of a 'Sink' using a monadic predicate function. The
-- predicate's effect is run for every element; the sink's action is run
-- afterwards only when the predicate returned 'True'.
{-# INLINABLE lfilterM #-}
lfilterM :: Monad m => (a -> m Bool) -> Sink m a -> Sink m a
lfilterM f (Sink step) = Sink (\a -> f a >>= \use -> when use $ step a)

------------------------------------------------------------------------------
-- Sinks
------------------------------------------------------------------------------

-- | Drain all input, running the effects and discarding the results. The sink
-- itself performs no action on the elements it receives.
drain :: Monad m => Sink m a
drain = Sink (\_ -> return ())

-- |
-- > drainM f = lmapM f drain
--
-- Drain all input after passing it through a monadic function. The function's
-- effect is run for every element and its result is discarded.
{-# INLINABLE drainM #-}
drainM ::  Monad m => (a -> m b) -> Sink m a
drainM f = Sink (void . f)