{-# LANGUAGE CPP                       #-}

#include "inline.hs"

-- |
-- Module      : Streamly.Streams.Combinators
-- Copyright   : (c) 2017 Harendra Kumar
--
-- License     : BSD3
-- Maintainer  : harendra.kumar@gmail.com
-- Stability   : experimental
-- Portability : GHC
--
--
module Streamly.Streams.Combinators
    ( maxThreads
    , maxBuffer
    , maxYields
    , rate
    , avgRate
    , minRate
    , maxRate
    , constRate
    , inspectMode
    , printState
    )
where

import Control.Monad.IO.Class (MonadIO(liftIO))
import Data.Int (Int64)

import Streamly.SVar
import Streamly.Streams.StreamK
import Streamly.Streams.Serial (SerialT)

-------------------------------------------------------------------------------
-- Concurrency control
-------------------------------------------------------------------------------
--
-- XXX need to write these in direct style otherwise they will break fusion.
--
-- | Specify the maximum number of threads that can be spawned concurrently for
-- any concurrent combinator in a stream.
-- A value of 0 resets the thread limit to default, a negative value means
-- there is no limit. The default value is 1500.
--
-- When the actions in a stream are IO bound, having blocking IO calls, this
-- option can be used to control the maximum number of in-flight IO requests.
-- When the actions are CPU bound this option can be used to
-- control the amount of CPU used by the stream.
--
-- @since 0.4.0
{-# INLINE_NORMAL maxThreads #-}
maxThreads :: IsStream t => Int -> t m a -> t m a
maxThreads n m = mkStream $ \st stp sng yld ->
    foldStreamShared (setMaxThreads n st) stp sng yld m

{-
{-# RULES "maxThreadsSerial serial" maxThreads = maxThreadsSerial #-}
maxThreadsSerial :: Int -> SerialT m a -> SerialT m a
maxThreadsSerial _ = id
-}

-- | Specify the maximum size of the buffer for storing the results from
-- concurrent computations. If the buffer becomes full we stop spawning more
-- concurrent tasks until there is space in the buffer.
-- A value of 0 resets the buffer size to default, a negative value means
-- there is no limit. The default value is 1500.
--
-- CAUTION! using an unbounded 'maxBuffer' value (i.e. a negative value)
-- coupled with an unbounded 'maxThreads' value is a recipe for disaster in
-- presence of infinite streams, or very large streams.  Especially, it must
-- not be used when 'pure' is used in 'ZipAsyncM' streams as 'pure' in
-- applicative zip streams generates an infinite stream causing unbounded
-- concurrent generation with no limit on the buffer or threads.
--
-- @since 0.4.0
{-# INLINE_NORMAL maxBuffer #-}
maxBuffer :: IsStream t => Int -> t m a -> t m a
maxBuffer n m = mkStream $ \st stp sng yld ->
    foldStreamShared (setMaxBuffer n st) stp sng yld m

{-
{-# RULES "maxBuffer serial" maxBuffer = maxBufferSerial #-}
maxBufferSerial :: Int -> SerialT m a -> SerialT m a
maxBufferSerial _ = id
-}

-- | Specify the pull rate of a stream.
-- A 'Nothing' value resets the rate to default which is unlimited.  When the
-- rate is specified, concurrent production may be ramped up or down
-- automatically to achieve the specified yield rate. The specific behavior for
-- different styles of 'Rate' specifications is documented under 'Rate'.  The
-- effective maximum production rate achieved by a stream is governed by:
--
-- * The 'maxThreads' limit
-- * The 'maxBuffer' limit
-- * The maximum rate that the stream producer can achieve
-- * The maximum rate that the stream consumer can achieve
--
-- @since 0.5.0
{-# INLINE_NORMAL rate #-}
rate :: IsStream t => Maybe Rate -> t m a -> t m a
rate r m = mkStream $ \st stp sng yld ->
    case r of
        Just (Rate low goal _ _) | goal < low ->
            error "rate: Target rate cannot be lower than minimum rate."
        Just (Rate _ goal high _) | goal > high ->
            error "rate: Target rate cannot be greater than maximum rate."
        Just (Rate low _ high _) | low > high ->
            error "rate: Minimum rate cannot be greater than maximum rate."
        _ -> foldStreamShared (setStreamRate r st) stp sng yld m

-- XXX implement for serial streams as well, as a simple delay

{-
{-# RULES "rate serial" rate = yieldRateSerial #-}
yieldRateSerial :: Double -> SerialT m a -> SerialT m a
yieldRateSerial _ = id
-}

-- | Same as @rate (Just $ Rate (r/2) r (2*r) maxBound)@
--
-- Specifies the average production rate of a stream in number of yields
-- per second (i.e.  @Hertz@).  Concurrent production is ramped up or down
-- automatically to achieve the specified average yield rate. The rate can
-- go down to half of the specified rate on the lower side and double of
-- the specified rate on the higher side.
--
-- @since 0.5.0
avgRate :: IsStream t => Double -> t m a -> t m a
avgRate r = rate (Just $ Rate (r/2) r (2*r) maxBound)

-- | Same as @rate (Just $ Rate r r (2*r) maxBound)@
--
-- Specifies the minimum rate at which the stream should yield values. As
-- far as possible the yield rate would never be allowed to go below the
-- specified rate, even though it may possibly go above it at times, the
-- upper limit is double of the specified rate.
--
-- @since 0.5.0
minRate :: IsStream t => Double -> t m a -> t m a
minRate r = rate (Just $ Rate r r (2*r) maxBound)

-- | Same as @rate (Just $ Rate (r/2) r r maxBound)@
--
-- Specifies the maximum rate at which the stream should yield values. As
-- far as possible the yield rate would never be allowed to go above the
-- specified rate, even though it may possibly go below it at times, the
-- lower limit is half of the specified rate. This can be useful in
-- applications where certain resource usage must not be allowed to go
-- beyond certain limits.
--
-- @since 0.5.0
maxRate :: IsStream t => Double -> t m a -> t m a
maxRate r = rate (Just $ Rate (r/2) r r maxBound)

-- | Same as @rate (Just $ Rate r r r 0)@
--
-- Specifies a constant yield rate. If for some reason the actual rate
-- goes above or below the specified rate we do not try to recover it by
-- increasing or decreasing the rate in future.  This can be useful in
-- applications like graphics frame refresh where we need to maintain a
-- constant refresh rate.
--
-- @since 0.5.0
constRate :: IsStream t => Double -> t m a -> t m a
constRate r = rate (Just $ Rate r r r 0)

-- | Specify the average latency, in nanoseconds, of a single threaded action
-- in a concurrent composition. Streamly can measure the latencies, but that is
-- possible only after at least one task has completed. This combinator can be
-- used to provide a latency hint so that rate control using 'rate' can take
-- that into account right from the beginning. When not specified then a
-- default behavior is chosen which could be too slow or too fast, and would be
-- restricted by any other control parameters configured.
-- A value of 0 indicates default behavior, a negative value means there is no
-- limit i.e. zero latency.
-- This would normally be useful only in high latency and high throughput
-- cases.
--
{-# INLINE_NORMAL _serialLatency #-}
_serialLatency :: IsStream t => Int -> t m a -> t m a
_serialLatency n m = mkStream $ \st stp sng yld ->
    foldStreamShared (setStreamLatency n st) stp sng yld m

{-
{-# RULES "serialLatency serial" _serialLatency = serialLatencySerial #-}
serialLatencySerial :: Int -> SerialT m a -> SerialT m a
serialLatencySerial _ = id
-}

-- Stop concurrent dispatches after this limit. This is useful in API's like
-- "take" where we want to dispatch only upto the number of elements "take"
-- needs.  This value applies only to the immediate next level and is not
-- inherited by everything in enclosed scope.
{-# INLINE_NORMAL maxYields #-}
maxYields :: IsStream t => Maybe Int64 -> t m a -> t m a
maxYields n m = mkStream $ \st stp sng yld ->
    foldStreamShared (setYieldLimit n st) stp sng yld m

{-# RULES "maxYields serial" maxYields = maxYieldsSerial #-}
maxYieldsSerial :: Maybe Int64 -> SerialT m a -> SerialT m a
maxYieldsSerial _ = id

printState :: MonadIO m => State Stream m a -> m ()
printState st = liftIO $ do
    let msv = streamVar st
    case msv of
        Just sv -> dumpSVar sv >>= putStrLn
        Nothing -> putStrLn "No SVar"

-- | Print debug information about an SVar when the stream ends
inspectMode :: IsStream t => t m a -> t m a
inspectMode m = mkStream $ \st stp sng yld ->
     foldStreamShared (setInspectMode st) stp sng yld m