{-# LANGUAGE CPP #-}
{-# LANGUAGE ConstraintKinds #-}
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE FlexibleInstances #-}
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE MagicHash #-}
{-# LANGUAGE MultiParamTypeClasses #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE UnboxedTuples #-}
{-# LANGUAGE UndecidableInstances #-}
#include "inline.h"
module Streamly.Streams.SVar
(
fromSVar
, toSVar
, maxThreads
, maxBuffer
, maxYields
)
where
import Control.Monad.Catch (throwM)
#ifdef DIAGNOSTICS
#ifdef DIAGNOSTICS_VERBOSE
import Control.Monad.IO.Class (liftIO)
import System.IO (hPutStrLn, stderr)
#endif
#endif

import Streamly.SVar
import Streamly.Streams.Serial (SerialT)
import Streamly.Streams.StreamK
-- | Build a 'Stream' that drains the events queued on an 'SVar'.
--
-- Each time the stream is run it reads the current batch of events with
-- 'readOutputQ' and processes them in the reverse of the order in which
-- they were returned:
--
-- * @ChildYield a@ yields @a@ downstream; the continuation is the stream
--   that processes the remaining events of the batch.
-- * @ChildStop tid e@ first calls 'accountThread' for @tid@. If @e@ is
--   'Nothing' processing continues with the remaining events; if it is
--   @Just ex@ the exception is rethrown in the consumer via 'throwM'.
--
-- When a batch is exhausted 'postProcess' is consulted: a 'True' result
-- terminates the stream through the stop continuation, otherwise another
-- batch is read from the same 'SVar'.
--
-- Every nested stream is run with the state passed through 'rstState'.
{-# NOINLINE fromStreamVar #-}
fromStreamVar :: MonadAsync m => SVar Stream m a -> Stream m a
fromStreamVar sv = Stream $ \st stp sng yld -> do
    list <- readOutputQ sv
    -- NOTE(review): the batch is reversed before processing; presumably
    -- readOutputQ hands back the newest event first -- confirm against
    -- its definition.
    unStream (processEvents $ reverse list) (rstState st) stp sng yld

    where

    -- Terminate the stream. In a verbose diagnostics build a dump of the
    -- SVar is written to stderr first; that branch needs liftIO, hPutStrLn
    -- and stderr to be imported under the same CPP guards.
    allDone stp = do
#ifdef DIAGNOSTICS
#ifdef DIAGNOSTICS_VERBOSE
        svInfo <- liftIO $ dumpSVar sv
        liftIO $ hPutStrLn stderr $ "fromStreamVar done\n" ++ svInfo
#endif
#endif
        stp

    {-# INLINE processEvents #-}
    -- Batch exhausted: either finish or go back to the SVar for more.
    processEvents [] = Stream $ \st stp sng yld -> do
        done <- postProcess sv
        if done
            then allDone stp
            else unStream (fromStreamVar sv) (rstState st) stp sng yld

    -- Handle one event; @rest@ lazily processes the remainder of the batch.
    processEvents (ev : es) = Stream $ \st stp sng yld -> do
        let rest = processEvents es
        case ev of
            ChildYield a -> yld a rest
            ChildStop tid e -> do
                accountThread sv tid
                case e of
                    Nothing -> unStream rest (rstState st) stp sng yld
                    -- Propagate the worker's exception to the consumer.
                    Just ex -> throwM ex
-- | Read the events of an 'SVar' as a stream of any 'IsStream' type.
--
-- This is 'fromStreamVar' with its result converted from the underlying
-- 'Stream' representation by 'fromStream'.
{-# INLINE fromSVar #-}
fromSVar :: (MonadAsync m, IsStream t) => SVar Stream m a -> t m a
fromSVar sv = fromStream drained
  where
    drained = fromStreamVar sv
-- | Hand a stream of any 'IsStream' type over to an 'SVar'.
--
-- The stream is converted to the underlying 'Stream' representation with
-- 'toStream' and then passed, together with the 'SVar', to 'toStreamVar'.
toSVar :: (IsStream t, MonadAsync m) => SVar Stream m a -> t m a -> m ()
toSVar sv m = toStreamVar sv $ toStream m
-- | Run a stream with the @threadsHigh@ field of the state set to the
-- given value.
--
-- An argument of @0@ selects 'defaultMaxThreads'; any other value is
-- stored as given.
{-# INLINE_NORMAL maxThreads #-}
maxThreads :: IsStream t => Int -> t m a -> t m a
maxThreads n m = fromStream $ Stream $ \st stp sng yld ->
    unStream (toStream m) (st {threadsHigh = limit}) stp sng yld
  where
    -- Zero means "use the default".
    limit
        | n == 0 = defaultMaxThreads
        | otherwise = n
-- Rewrite 'maxThreads' to 'maxThreadsSerial' when it is used at the
-- 'SerialT' type.
{-# RULES "maxThreadsSerial serial" maxThreads = maxThreadsSerial #-}
-- | 'SerialT' specialisation of 'maxThreads': the limit is ignored and the
-- stream is returned unchanged.
maxThreadsSerial :: Int -> SerialT m a -> SerialT m a
maxThreadsSerial _ stream = stream
-- | Run a stream with the @bufferHigh@ field of the state set to the
-- given value.
--
-- An argument of @0@ selects 'defaultMaxBuffer'; any other value is
-- stored as given.
{-# INLINE_NORMAL maxBuffer #-}
maxBuffer :: IsStream t => Int -> t m a -> t m a
maxBuffer n m = fromStream $ Stream $ \st stp sng yld ->
    unStream (toStream m) (st {bufferHigh = limit}) stp sng yld
  where
    -- Zero means "use the default".
    limit
        | n == 0 = defaultMaxBuffer
        | otherwise = n
-- Rewrite 'maxBuffer' to 'maxBufferSerial' when it is used at the
-- 'SerialT' type.
{-# RULES "maxBuffer serial" maxBuffer = maxBufferSerial #-}
-- | 'SerialT' specialisation of 'maxBuffer': the limit is ignored and the
-- stream is returned unchanged.
maxBufferSerial :: Int -> SerialT m a -> SerialT m a
maxBufferSerial _ stream = stream
-- | Run a stream with the @yieldLimit@ field of the state set to the
-- given value. The argument is stored as is, including 'Nothing'.
{-# INLINE_NORMAL maxYields #-}
maxYields :: IsStream t => Maybe Int -> t m a -> t m a
maxYields n m = fromStream $ Stream $ \st stp sng yld ->
    unStream (toStream m) (st {yieldLimit = n}) stp sng yld
-- Rewrite 'maxYields' to 'maxYieldsSerial' when it is used at the
-- 'SerialT' type.
{-# RULES "maxYields serial" maxYields = maxYieldsSerial #-}
-- | 'SerialT' specialisation of 'maxYields': the limit is ignored and the
-- stream is returned unchanged.
maxYieldsSerial :: Maybe Int -> SerialT m a -> SerialT m a
maxYieldsSerial _ stream = stream