{-# LANGUAGE CPP #-}
{-# LANGUAGE ConstraintKinds #-}
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE FlexibleInstances #-}
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE MagicHash #-}
{-# LANGUAGE MultiParamTypeClasses #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE UnboxedTuples #-}
{-# LANGUAGE UndecidableInstances #-} -- XXX

#include "inline.h"

#ifdef DIAGNOSTICS_VERBOSE
#define DIAGNOSTICS
#endif

-- |
-- Module      : Streamly.Streams.SVar
-- Copyright   : (c) 2017 Harendra Kumar
--
-- License     : BSD3
-- Maintainer  : harendra.kumar@gmail.com
-- Stability   : experimental
-- Portability : GHC
--
--
module Streamly.Streams.SVar
    (
      fromSVar
    , toSVar
    , maxThreads
    , maxBuffer
    , maxYields
    , rate
    , avgRate
    , minRate
    , maxRate
    , constRate
    )
where

import Control.Exception (fromException)
import Control.Monad.Catch (throwM)
import Data.Int (Int64)
import Control.Monad.IO.Class (liftIO)
import Data.IORef (newIORef, mkWeakIORef)
#ifdef DIAGNOSTICS
import Data.IORef (writeIORef)
import System.IO (hPutStrLn, stderr)
import System.Clock (Clock(Monotonic), getTime)
#endif

import Streamly.SVar
import Streamly.Streams.StreamK
import Streamly.Streams.Serial (SerialT)

-- MVar diagnostics has some overhead - around 5% on asyncly null benchmark, we
-- can keep it on in production to debug problems quickly if and when they
-- happen, but it may result in unexpected output when threads are left hanging
-- until they are GCed because the consumer went away.

#ifdef DIAGNOSTICS
#ifdef DIAGNOSTICS_VERBOSE
-- | Dump the state of an SVar (as rendered by 'dumpSVar') to stderr, preceded
-- by the label @how@ describing why the dump is being printed.
printSVar :: SVar t m a -> String -> IO ()
printSVar sv how = do
    svInfo <- dumpSVar sv
    hPutStrLn stderr $ "\n" ++ how ++ "\n" ++ svInfo
#endif
#endif

-- | Pull a stream from an SVar.
--
-- Each step reads a batch of events from the SVar's output queue and replays
-- them in order. A 'ChildYield' hands its element to the yield continuation; a
-- 'ChildStop' is passed to 'accountThread' and, when it carries an exception
-- other than 'ThreadAbort', that exception is rethrown in the consumer after
-- 'cleanupSVar' has been run on the SVar.
{-# NOINLINE fromStreamVar #-}
fromStreamVar :: MonadAsync m => SVar Stream m a -> Stream m a
fromStreamVar sv = Stream $ \st stp sng yld -> do
    list <- readOutputQ sv
    -- Reversing the output is important to guarantee that we process the
    -- outputs in the same order as they were generated by the constituent
    -- streams.
    unStream (processEvents $ reverse list) (rstState st) stp sng yld

    where

    -- Invoked once 'postProcess' reports completion: records the stop time
    -- (diagnostics builds only) and then runs the stop continuation.
    allDone stp = do
#ifdef DIAGNOSTICS
        t <- liftIO $ getTime Monotonic
        liftIO $ writeIORef (svarStopTime (svarStats sv)) (Just t)
#ifdef DIAGNOSTICS_VERBOSE
        liftIO $ printSVar sv "SVar Done"
#endif
#endif
        stp

    {-# INLINE processEvents #-}
    -- The current batch is exhausted: either finish, or go back to the SVar
    -- for the next batch.
    processEvents [] = Stream $ \st stp sng yld -> do
        done <- postProcess sv
        if done
        then allDone stp
        else unStream (fromStreamVar sv) (rstState st) stp sng yld

    processEvents (ev : es) = Stream $ \st stp sng yld -> do
        let rest = processEvents es
        case ev of
            ChildYield a -> yld a rest
            ChildStop tid e -> do
                accountThread sv tid
                case e of
                    Nothing -> unStream rest (rstState st) stp sng yld
                    Just ex ->
                        case fromException ex of
                            -- A ThreadAbort is not propagated to the
                            -- consumer; carry on with the remaining events.
                            Just ThreadAbort ->
                                unStream rest (rstState st) stp sng yld
                            -- Once we rethrow, the continuation that drains
                            -- this SVar is abandoned. Run the same cleanup
                            -- that the GC hook in 'fromSVar' performs now,
                            -- instead of leaving it until the SVar happens to
                            -- be garbage collected.
                            Nothing -> liftIO (cleanupSVar sv) >> throwM ex

-- | Generate a stream from an SVar.
--
-- A fresh 'IORef' is stored in the 'svarRef' field of the copy of the SVar
-- that is handed to 'fromStreamVar', and a weak-reference finalizer that runs
-- 'cleanupSVar' on the SVar is attached to that 'IORef'.
{-# INLINE fromSVar #-}
fromSVar :: (MonadAsync m, IsStream t) => SVar Stream m a -> t m a
fromSVar sv =
    fromStream $ Stream $ \st stp sng yld -> do
        ref <- liftIO $ newIORef ()
        _ <- liftIO $ mkWeakIORef ref hook
        -- We pass a copy of sv to fromStreamVar, so that we know that it has
        -- no other references, when that copy gets garbage collected "ref"
        -- will get garbage collected and our hook will be called.
        unStream (fromStreamVar sv{svarRef = Just ref}) st stp sng yld

    where

    -- Finalizer attached to "ref" above.
    hook = do
#ifdef DIAGNOSTICS_VERBOSE
        printSVar sv "SVar Garbage Collected"
#endif
        cleanupSVar sv

-- | Write a stream to an 'SVar' in a non-blocking manner. The stream can then
-- be read back from the SVar using 'fromSVar'.
toSVar :: (IsStream t, MonadAsync m) => SVar Stream m a -> t m a -> m ()
toSVar sv m = toStreamVar sv (toStream m)

-------------------------------------------------------------------------------
-- Concurrency control
-------------------------------------------------------------------------------
--
-- XXX need to write these in direct style otherwise they will break fusion.
--
-- | Set the upper bound on the number of threads that any concurrent
-- combinator in the stream may have running at the same time.
-- Passing 0 restores the default limit and passing a negative number removes
-- the limit altogether. The default value is 1500.
--
-- For IO bound actions that make blocking IO calls, this setting caps the
-- number of IO requests that are in flight at once. For CPU bound actions it
-- caps the amount of CPU that the stream can consume.
--
-- @since 0.4.0
{-# INLINE_NORMAL maxThreads #-}
maxThreads :: IsStream t => Int -> t m a -> t m a
maxThreads n m =
    fromStream $ Stream $ \state stopK singleK yieldK ->
        -- Run the wrapped stream with the thread limit recorded in its state.
        unStream (toStream m) (setMaxThreads n state) stopK singleK yieldK

{-
{-# RULES "maxThreadsSerial serial" maxThreads = maxThreadsSerial #-}
maxThreadsSerial :: Int -> SerialT m a -> SerialT m a
maxThreadsSerial _ = id
-}

-- | Specify the maximum size of the buffer for storing the results from
-- concurrent computations. If the buffer becomes full we stop spawning more
-- concurrent tasks until there is space in the buffer.
-- A value of 0 resets the buffer size to default, a negative value means
-- there is no limit. The default value is 1500.
--
-- CAUTION! using an unbounded 'maxBuffer' value (i.e. a negative value)
-- coupled with an unbounded 'maxThreads' value is a recipe for disaster in
-- presence of infinite streams, or very large streams.  Especially, it must
-- not be used when 'pure' is used in 'ZipAsyncM' streams as 'pure' in
-- applicative zip streams generates an infinite stream causing unbounded
-- concurrent generation with no limit on the buffer or threads.
--
-- @since 0.4.0
{-# INLINE_NORMAL maxBuffer #-}
maxBuffer :: IsStream t => Int -> t m a -> t m a
maxBuffer n m =
    fromStream $ Stream $ \state stopK singleK yieldK ->
        -- Run the wrapped stream with the buffer limit recorded in its state.
        unStream (toStream m) (setMaxBuffer n state) stopK singleK yieldK

{-
{-# RULES "maxBuffer serial" maxBuffer = maxBufferSerial #-}
maxBufferSerial :: Int -> SerialT m a -> SerialT m a
maxBufferSerial _ = id
-}

-- | Set the rate at which a stream is pulled.
-- Passing 'Nothing' restores the default, which is an unlimited rate. With a
-- rate in place, concurrent production may be ramped up or down automatically
-- so that the requested yield rate is met. How each style of 'Rate'
-- specification behaves is documented under 'Rate'. The maximum production
-- rate that a stream effectively achieves is governed by:
--
-- * The 'maxThreads' limit
-- * The 'maxBuffer' limit
-- * The maximum rate that the stream producer can achieve
-- * The maximum rate that the stream consumer can achieve
--
-- When the stream is run, an inconsistent 'Rate' (goal below the minimum,
-- goal above the maximum, or minimum above the maximum) is rejected with
-- 'error'.
--
-- @since 0.5.0
{-# INLINE_NORMAL rate #-}
rate :: IsStream t => Maybe Rate -> t m a -> t m a
rate r m = fromStream $ Stream $ \state stopK singleK yieldK ->
    case r of
        -- The guards are tried top to bottom; when none fires we fall through
        -- to the catch-all alternative below.
        Just (Rate low goal high _)
            | goal < low ->
                error "rate: Target rate cannot be lower than minimum rate."
            | goal > high ->
                error "rate: Target rate cannot be greater than maximum rate."
            | low > high ->
                error "rate: Minimum rate cannot be greater than maximum rate."
        _ -> unStream (toStream m) (setStreamRate r state) stopK singleK yieldK

{-
{-# RULES "rate serial" rate = yieldRateSerial #-}
yieldRateSerial :: Double -> SerialT m a -> SerialT m a
yieldRateSerial _ = id
-}

-- | Same as @rate (Just $ Rate (r/2) r (2*r) maxBound)@
--
-- Specifies the average production rate of a stream in number of yields
-- per second (i.e.  @Hertz@).  Concurrent production is ramped up or down
-- automatically to achieve the specified average yield rate.
-- The rate can go down to half of the specified rate on the lower side and
-- double of the specified rate on the higher side.
--
-- @since 0.5.0
avgRate :: IsStream t => Double -> t m a -> t m a
avgRate r = rate (Just limits)
  where
    limits = Rate (r / 2) r (2 * r) maxBound

-- | Same as @rate (Just $ Rate r r (2*r) maxBound)@
--
-- Sets a floor on the rate at which the stream yields values. As far as
-- possible the yield rate is kept from dropping below the given rate; it may
-- at times rise above it, up to a ceiling of twice the given rate.
--
-- @since 0.5.0
minRate :: IsStream t => Double -> t m a -> t m a
minRate r = rate (Just limits)
  where
    limits = Rate r r (2 * r) maxBound

-- | Same as @rate (Just $ Rate (r/2) r r maxBound)@
--
-- Sets a ceiling on the rate at which the stream yields values. As far as
-- possible the yield rate is kept from rising above the given rate; it may at
-- times drop below it, down to a floor of half the given rate. This can be
-- useful in applications where certain resource usage must not be allowed to
-- go beyond certain limits.
--
-- @since 0.5.0
maxRate :: IsStream t => Double -> t m a -> t m a
maxRate r = rate (Just limits)
  where
    limits = Rate (r / 2) r r maxBound

-- | Same as @rate (Just $ Rate r r r 0)@
--
-- Requests a constant yield rate. If the actual rate drifts above or below the
-- given rate for some reason, no attempt is made to compensate by speeding up
-- or slowing down later. This can be useful in applications like graphics
-- frame refresh where we need to maintain a constant refresh rate.
--
-- @since 0.5.0
constRate :: IsStream t => Double -> t m a -> t m a
constRate r = rate (Just limits)
  where
    limits = Rate r r r 0

-- | Specify the average latency, in nanoseconds, of a single threaded action
-- in a concurrent composition. Streamly can measure the latencies, but that is
-- possible only after at least one task has completed.
-- This combinator can be used to provide a latency hint so that rate control
-- using 'rate' can take that into account right from the beginning. When not
-- specified then a default behavior is chosen which could be too slow or too
-- fast, and would be restricted by any other control parameters configured.
-- A value of 0 indicates default behavior, a negative value means there is no
-- limit i.e. zero latency.
-- This would normally be useful only in high latency and high throughput
-- cases.
--
{-# INLINE_NORMAL _serialLatency #-}
_serialLatency :: IsStream t => Int -> t m a -> t m a
_serialLatency n m =
    fromStream $ Stream $ \state stopK singleK yieldK ->
        -- Run the wrapped stream with the latency hint recorded in its state.
        unStream (toStream m) (setStreamLatency n state) stopK singleK yieldK

{-
{-# RULES "serialLatency serial" _serialLatency = serialLatencySerial #-}
serialLatencySerial :: Int -> SerialT m a -> SerialT m a
serialLatencySerial _ = id
-}

-- Stop concurrent dispatches after this limit. This is useful in APIs like
-- "take" where we want to dispatch only up to the number of elements "take"
-- needs.  This value applies only to the immediate next level and is not
-- inherited by everything in enclosed scope.
{-# INLINE_NORMAL maxYields #-}
maxYields :: IsStream t => Maybe Int64 -> t m a -> t m a
maxYields n m =
    fromStream $ Stream $ \state stopK singleK yieldK ->
        -- Run the wrapped stream with the yield limit recorded in its state.
        unStream (toStream m) (setYieldLimit n state) stopK singleK yieldK

-- For serial streams the rewrite rule below replaces 'maxYields' with a
-- function that ignores the limit and returns the stream unchanged.
{-# RULES "maxYields serial" maxYields = maxYieldsSerial #-}
maxYieldsSerial :: Maybe Int64 -> SerialT m a -> SerialT m a
maxYieldsSerial = const id