{-# LANGUAGE CPP #-} {-# LANGUAGE ConstraintKinds #-} {-# LANGUAGE FlexibleContexts #-} {-# LANGUAGE FlexibleInstances #-} {-# LANGUAGE LambdaCase #-} {-# LANGUAGE MagicHash #-} {-# LANGUAGE MultiParamTypeClasses #-} {-# LANGUAGE RankNTypes #-} {-# LANGUAGE UnboxedTuples #-} {-# LANGUAGE UndecidableInstances #-} -- XXX #include "inline.h" -- | -- Module : Streamly.Streams.SVar -- Copyright : (c) 2017 Harendra Kumar -- -- License : BSD3 -- Maintainer : harendra.kumar@gmail.com -- Stability : experimental -- Portability : GHC -- -- module Streamly.Streams.SVar ( fromSVar , toSVar , maxThreads , maxBuffer , maxYields ) where import Control.Monad.Catch (throwM) import Streamly.SVar import Streamly.Streams.StreamK import Streamly.Streams.Serial (SerialT) -- MVar diagnostics has some overhead - around 5% on asyncly null benchmark, we -- can keep it on in production to debug problems quickly if and when they -- happen, but it may result in unexpected output when threads are left hanging -- until they are GCed because the consumer went away. -- | Pull a stream from an SVar. {-# NOINLINE fromStreamVar #-} fromStreamVar :: MonadAsync m => SVar Stream m a -> Stream m a fromStreamVar sv = Stream $ \st stp sng yld -> do list <- readOutputQ sv -- Reversing the output is important to guarantee that we process the -- outputs in the same order as they were generated by the constituent -- streams. unStream (processEvents $ reverse list) (rstState st) stp sng yld where allDone stp = do #ifdef DIAGNOSTICS #ifdef DIAGNOSTICS_VERBOSE svInfo <- liftIO $ dumpSVar sv liftIO $ hPutStrLn stderr $ "fromStreamVar done\n" ++ svInfo #endif #endif stp {-# INLINE processEvents #-} processEvents [] = Stream $ \st stp sng yld -> do done <- postProcess sv if done then allDone stp else unStream (fromStreamVar sv) (rstState st) stp sng yld processEvents (ev : es) = Stream $ \st stp sng yld -> do let rest = processEvents es case ev of ChildYield a -> yld a rest ChildStop tid e -> do accountThread sv tid case e of Nothing -> unStream rest (rstState st) stp sng yld Just ex -> throwM ex {-# INLINE fromSVar #-} fromSVar :: (MonadAsync m, IsStream t) => SVar Stream m a -> t m a fromSVar sv = fromStream $ fromStreamVar sv -- | Write a stream to an 'SVar' in a non-blocking manner. The stream can then -- be read back from the SVar using 'fromSVar'. toSVar :: (IsStream t, MonadAsync m) => SVar Stream m a -> t m a -> m () toSVar sv m = toStreamVar sv (toStream m) ------------------------------------------------------------------------------- -- Concurrency control ------------------------------------------------------------------------------- -- -- XXX need to write these in direct style otherwise they will break fusion. -- -- | Specify the maximum number of threads that can be spawned concurrently -- when using concurrent streams. This is not the grand total number of threads -- but the maximum number of threads at each point of concurrency. -- A value of 0 resets the thread limit to default, a negative value means -- there is no limit. The default value is 1500. -- -- @since 0.4.0 {-# INLINE_NORMAL maxThreads #-} maxThreads :: IsStream t => Int -> t m a -> t m a maxThreads n m = fromStream $ Stream $ \st stp sng yld -> do let n' = if n == 0 then defaultMaxThreads else n unStream (toStream m) (st {threadsHigh = n'}) stp sng yld {-# RULES "maxThreadsSerial serial" maxThreads = maxThreadsSerial #-} maxThreadsSerial :: Int -> SerialT m a -> SerialT m a maxThreadsSerial _ = id -- | Specify the maximum size of the buffer for storing the results from -- concurrent computations. If the buffer becomes full we stop spawning more -- concurrent tasks until there is space in the buffer. -- A value of 0 resets the buffer size to default, a negative value means -- there is no limit. The default value is 1500. -- -- @since 0.4.0 {-# INLINE_NORMAL maxBuffer #-} maxBuffer :: IsStream t => Int -> t m a -> t m a maxBuffer n m = fromStream $ Stream $ \st stp sng yld -> do let n' = if n == 0 then defaultMaxBuffer else n unStream (toStream m) (st {bufferHigh = n'}) stp sng yld {-# RULES "maxBuffer serial" maxBuffer = maxBufferSerial #-} maxBufferSerial :: Int -> SerialT m a -> SerialT m a maxBufferSerial _ = id -- Stop concurrent dispatches after this limit. This is useful in API's like -- "take" where we want to dispatch only upto the number of elements "take" -- needs. This value applies only to the immediate next level and is not -- inherited by everything in enclosed scope. {-# INLINE_NORMAL maxYields #-} maxYields :: IsStream t => Maybe Int -> t m a -> t m a maxYields n m = fromStream $ Stream $ \st stp sng yld -> do unStream (toStream m) (st {yieldLimit = n}) stp sng yld {-# RULES "maxYields serial" maxYields = maxYieldsSerial #-} maxYieldsSerial :: Maybe Int -> SerialT m a -> SerialT m a maxYieldsSerial _ = id