-- |
-- Module      : Streamly.Internal.Data.Fold.SVar
-- Copyright   : (c) 2017 Composewell Technologies
-- License     : BSD-3-Clause
-- Maintainer  : streamly@composewell.com
-- Stability   : experimental
-- Portability : GHC
--
--
module Streamly.Internal.Data.Fold.SVar
    (
      write
    , writeLimited
    )
where

#include "inline.hs"

import Control.Monad (void)
import Control.Monad.IO.Class (MonadIO(liftIO))
import Streamly.Internal.Data.Fold.Type (Fold(..))

import qualified Streamly.Internal.Data.Fold.Type as FL

import Streamly.Internal.Data.SVar

-- | A fold to write a stream to an SVar. Unlike 'toSVar' this does not allow
-- for concurrent evaluation of the stream, as the fold receives the input one
-- element at a time, it just forwards the elements to the SVar. However, we
-- can safely execute the fold in an independent thread, the SVar can act as a
-- buffer decoupling the sender from the receiver. Also, we can have multiple
-- folds running concurrently pusing the streams to the SVar.
--
{-# INLINE write #-}
write :: MonadIO m => SVar t m a -> Maybe WorkerInfo -> Fold m a ()
write :: forall (m :: * -> *) (t :: (* -> *) -> * -> *) a.
MonadIO m =>
SVar t m a -> Maybe WorkerInfo -> Fold m a ()
write SVar t m a
svar Maybe WorkerInfo
winfo = forall (m :: * -> *) a b s.
(s -> a -> m (Step s b))
-> m (Step s b) -> (s -> m b) -> Fold m a b
Fold forall {m :: * -> *} {b}. MonadIO m => () -> a -> m (Step () b)
step forall {b}. m (Step () b)
initial forall {m :: * -> *}. MonadIO m => () -> m ()
extract

    where

    initial :: m (Step () b)
initial = forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ forall s b. s -> Step s b
FL.Partial ()

    -- XXX we can have a separate fold for unlimited buffer case to avoid a
    -- branch in the step here.
    step :: () -> a -> m (Step () b)
step () a
x =
        forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ do
            forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> IO ()
decrementBufferLimit SVar t m a
svar
            forall (f :: * -> *) a. Functor f => f a -> f ()
void forall a b. (a -> b) -> a -> b
$ forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> ChildEvent a -> IO Int
send SVar t m a
svar (forall a. a -> ChildEvent a
ChildYield a
x)
            forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ forall s b. s -> Step s b
FL.Partial ()

    extract :: () -> m ()
extract () = forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> Maybe WorkerInfo -> IO ()
sendStop SVar t m a
svar Maybe WorkerInfo
winfo

-- | Like write, but applies a yield limit.
--
{-# INLINE writeLimited #-}
writeLimited :: MonadIO m
    => SVar t m a -> Maybe WorkerInfo -> Fold m a ()
writeLimited :: forall (m :: * -> *) (t :: (* -> *) -> * -> *) a.
MonadIO m =>
SVar t m a -> Maybe WorkerInfo -> Fold m a ()
writeLimited SVar t m a
svar Maybe WorkerInfo
winfo = forall (m :: * -> *) a b s.
(s -> a -> m (Step s b))
-> m (Step s b) -> (s -> m b) -> Fold m a b
Fold forall {m :: * -> *}. MonadIO m => Bool -> a -> m (Step Bool ())
step forall {b}. m (Step Bool b)
initial forall {m :: * -> *}. MonadIO m => Bool -> m ()
extract

    where

    initial :: m (Step Bool b)
initial = forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ forall s b. s -> Step s b
FL.Partial Bool
True

    step :: Bool -> a -> m (Step Bool ())
step Bool
True a
x =
        forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ do
            Bool
yieldLimitOk <- forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> IO Bool
decrementYieldLimit SVar t m a
svar
            if Bool
yieldLimitOk
            then do
                forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> IO ()
decrementBufferLimit SVar t m a
svar
                forall (f :: * -> *) a. Functor f => f a -> f ()
void forall a b. (a -> b) -> a -> b
$ forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> ChildEvent a -> IO Int
send SVar t m a
svar (forall a. a -> ChildEvent a
ChildYield a
x)
                forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ forall s b. s -> Step s b
FL.Partial Bool
True
            else do
                forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> IO ()
cleanupSVarFromWorker SVar t m a
svar
                forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> Maybe WorkerInfo -> IO ()
sendStop SVar t m a
svar Maybe WorkerInfo
winfo
                forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ forall s b. b -> Step s b
FL.Done ()
    step Bool
False a
_ = forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ forall s b. b -> Step s b
FL.Done ()

    extract :: Bool -> m ()
extract Bool
True = forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> Maybe WorkerInfo -> IO ()
sendStop SVar t m a
svar Maybe WorkerInfo
winfo
    extract Bool
False = forall (m :: * -> *) a. Monad m => a -> m a
return ()