{-# LANGUAGE CPP                       #-}
{-# LANGUAGE ConstraintKinds           #-}
{-# LANGUAGE FlexibleContexts          #-}
{-# LANGUAGE FlexibleInstances         #-}
{-# LANGUAGE MultiParamTypeClasses     #-}
{-# LANGUAGE RankNTypes                #-}
{-# LANGUAGE UndecidableInstances      #-} -- XXX

#include "inline.h"

-- |
-- Module      : Streamly.Streams.SVar
-- Copyright   : (c) 2017 Harendra Kumar
--
-- License     : BSD3
-- Maintainer  : harendra.kumar@gmail.com
-- Stability   : experimental
-- Portability : GHC
--
--
module Streamly.Streams.SVar
    (
      fromSVar
    , toSVar
    , maxThreads
    , maxBuffer
    , maxYields
    , rate
    , avgRate
    , minRate
    , maxRate
    , constRate
    , inspectMode
    , printState
    )
where

import Control.Exception (fromException)
import Control.Monad (when)
import Control.Monad.Catch (throwM)
import Control.Monad.IO.Class (MonadIO(liftIO))
import Data.Int (Int64)
import Data.IORef (newIORef, readIORef, mkWeakIORef, writeIORef)
import Data.Maybe (isNothing)
import Data.Semigroup ((<>))
import System.IO (hPutStrLn, stderr)
import System.Clock (Clock(Monotonic), getTime)
import System.Mem (performMajorGC)

import Streamly.SVar
import Streamly.Streams.StreamK
import Streamly.Streams.Serial (SerialT)

-- | Write a dump of the given 'SVar' to 'stderr'. The output consists of a
-- newline, the caller-supplied heading @how@, another newline and then the
-- text produced by 'dumpSVar'.
printSVar :: SVar t m a -> String -> IO ()
printSVar sv how =
    dumpSVar sv >>= \details ->
        hPutStrLn stderr ("\n" <> how <> "\n" <> details)

-- | Print to standard output the 'dumpSVar' text of the 'SVar' carried by the
-- given stream state, or the text @No SVar@ when the state carries none.
printState :: MonadIO m => State Stream m a -> m ()
printState st =
    liftIO $ maybe (return "No SVar") dumpSVar (streamVar st) >>= putStrLn

-- | Pull a stream from an SVar.
--
-- Each run reads a batch of child events from the output queue of the SVar
-- and replays them in generation order: a 'ChildYield' event becomes a stream
-- element, a 'ChildStop' event is accounted against the SVar, and a child
-- that stopped with an exception other than 'ThreadAbort' causes the SVar to
-- be cleaned up and the exception to be rethrown via 'throwM'.  Once a batch
-- is exhausted, 'postProcess' decides whether the stream is finished or
-- whether another batch has to be read.
{-# NOINLINE fromStreamVar #-}
fromStreamVar :: MonadAsync m => SVar Stream m a -> Stream m a
fromStreamVar sv = Stream $ \st stp sng yld -> do
    list <- readOutputQ sv
    -- Reversing the output is important to guarantee that we process the
    -- outputs in the same order as they were generated by the constituent
    -- streams.
    -- NOTE(review): rstState presumably resets the settings that must not be
    -- inherited by the next level before the state is passed on -- confirm.
    unStream (processEvents $ reverse list) (rstState st) stp sng yld

    where

    -- Terminate the stream by invoking the stop continuation. In inspect mode
    -- the current monotonic time is first stored as the stop time in the SVar
    -- stats and the SVar is dumped with the heading "SVar Done".
    allDone stp = do
        when (svarInspectMode sv) $ do
            t <- liftIO $ getTime Monotonic
            liftIO $ writeIORef (svarStopTime (svarStats sv)) (Just t)
            liftIO $ printSVar sv "SVar Done"
        stp

    -- Turn a batch of child events into a stream.
    {-# INLINE processEvents #-}
    -- Batch exhausted: finish if postProcess reports completion, otherwise go
    -- back to fromStreamVar to read the next batch from the output queue.
    processEvents [] = Stream $ \st stp sng yld -> do
        done <- postProcess sv
        if done
        then allDone stp
        else unStream (fromStreamVar sv) (rstState st) stp sng yld

    processEvents (ev : es) = Stream $ \st stp sng yld -> do
        -- The remaining events of this batch, processed lazily as a stream.
        let rest = processEvents es
        case ev of
            -- A child produced a value: yield it, then continue with the rest.
            ChildYield a -> yld a rest
            -- A child thread terminated: account for it before deciding how
            -- to proceed.
            ChildStop tid e -> do
                accountThread sv tid
                case e of
                    -- Normal termination: carry on with the remaining events.
                    Nothing -> unStream rest (rstState st) stp sng yld
                    Just ex ->
                        case fromException ex of
                            -- ThreadAbort is not treated as a failure; the
                            -- remaining events are processed as usual.
                            Just ThreadAbort ->
                                unStream rest (rstState st) stp sng yld
                            -- Any other exception: clean up the SVar and
                            -- rethrow the exception in the consumer monad.
                            Nothing -> liftIO (cleanupSVar sv) >> throwM ex

-- | Generate a stream from the outputs of an 'SVar'.
--
-- Every run of the resulting stream allocates a fresh 'IORef', attaches a
-- finalizer to it through a weak pointer, and hands a copy of the SVar that
-- holds this reference to 'fromStreamVar'. The finalizer cleans up the SVar.
{-# INLINE fromSVar #-}
fromSVar :: (MonadAsync m, IsStream t) => SVar Stream m a -> t m a
fromSVar sv =
    fromStream $ Stream $ \st onStop onSingle onYield -> do
        sentinel <- liftIO (newIORef ())
        _ <- liftIO (mkWeakIORef sentinel finalizer)
        -- Only the copy of sv given to fromStreamVar refers to the sentinel,
        -- and that copy has no other references. Once the copy is garbage
        -- collected the sentinel is collected too and the finalizer runs.
        unStream
            (fromStreamVar sv{svarRef = Just sentinel})
            st onStop onSingle onYield
    where

    inspecting = svarInspectMode sv

    -- Runs when the sentinel is garbage collected.
    finalizer :: IO ()
    finalizer = do
        -- In inspect mode, report SVars that were collected before a stop
        -- time was ever recorded for them.
        when inspecting $ do
            stopped <- readIORef (svarStopTime (svarStats sv))
            when (isNothing stopped) $
                printSVar sv "SVar Garbage Collected"
        cleanupSVar sv
        -- If there are any SVars referenced by this SVar a GC will prompt
        -- them to be cleaned up quickly.
        when inspecting performMajorGC

-- | Feed a stream into an 'SVar' in a non-blocking manner. The elements can
-- later be read back from the same SVar using 'fromSVar'.
toSVar :: (IsStream t, MonadAsync m) => SVar Stream m a -> t m a -> m ()
toSVar sv = toStreamVar sv . toStream

-------------------------------------------------------------------------------
-- Concurrency control
-------------------------------------------------------------------------------
--
-- XXX need to write these in direct style otherwise they will break fusion.
--
-- | Specify the maximum number of threads that can be spawned concurrently for
-- any concurrent combinator in a stream.
-- A value of 0 resets the thread limit to default, a negative value means
-- there is no limit. The default value is 1500.
--
-- When the actions in a stream are IO bound, having blocking IO calls, this
-- option can be used to control the maximum number of in-flight IO requests.
-- When the actions are CPU bound this option can be used to
-- control the amount of CPU used by the stream.
--
-- @since 0.4.0
{-# INLINE_NORMAL maxThreads #-}
maxThreads :: IsStream t => Int -> t m a -> t m a
maxThreads n m =
    fromStream $ Stream $ \st onStop onSingle onYield ->
        -- Run the wrapped stream with the thread limit recorded in the state.
        let limited = setMaxThreads n st
        in unStream (toStream m) limited onStop onSingle onYield

{-
{-# RULES "maxThreadsSerial serial" maxThreads = maxThreadsSerial #-}
maxThreadsSerial :: Int -> SerialT m a -> SerialT m a
maxThreadsSerial _ = id
-}

-- | Specify the maximum size of the buffer for storing the results from
-- concurrent computations. If the buffer becomes full we stop spawning more
-- concurrent tasks until there is space in the buffer.
-- A value of 0 resets the buffer size to default, a negative value means
-- there is no limit. The default value is 1500.
--
-- CAUTION! Using an unbounded 'maxBuffer' value (i.e. a negative value)
-- coupled with an unbounded 'maxThreads' value is a recipe for disaster in
-- the presence of infinite or very large streams.  In particular, it must
-- not be used when 'pure' is used in 'ZipAsyncM' streams, as 'pure' in
-- applicative zip streams generates an infinite stream, causing unbounded
-- concurrent generation with no limit on the buffer or threads.
--
-- @since 0.4.0
{-# INLINE_NORMAL maxBuffer #-}
maxBuffer :: IsStream t => Int -> t m a -> t m a
maxBuffer n m =
    fromStream $ Stream $ \st onStop onSingle onYield ->
        -- Run the wrapped stream with the buffer limit recorded in the state.
        let bounded = setMaxBuffer n st
        in unStream (toStream m) bounded onStop onSingle onYield

{-
{-# RULES "maxBuffer serial" maxBuffer = maxBufferSerial #-}
maxBufferSerial :: Int -> SerialT m a -> SerialT m a
maxBufferSerial _ = id
-}

-- | Specify the pull rate of a stream.
-- A 'Nothing' value resets the rate to default which is unlimited.  When the
-- rate is specified, concurrent production may be ramped up or down
-- automatically to achieve the specified yield rate. The specific behavior for
-- different styles of 'Rate' specifications is documented under 'Rate'.  The
-- effective maximum production rate achieved by a stream is governed by:
--
-- * The 'maxThreads' limit
-- * The 'maxBuffer' limit
-- * The maximum rate that the stream producer can achieve
-- * The maximum rate that the stream consumer can achieve
--
-- @since 0.5.0
{-# INLINE_NORMAL rate #-}
rate :: IsStream t => Maybe Rate -> t m a -> t m a
rate r m = fromStream $ Stream $ \st onStop onSingle onYield ->
    -- The bounds are inspected only when the stream is actually run; an
    -- inconsistent specification then fails with 'error'.
    case violation of
        Just msg -> error msg
        Nothing ->
            unStream (toStream m) (setStreamRate r st) onStop onSingle onYield

    where

    -- The message for the first ordering constraint broken by the requested
    -- rate, checked in the order: goal below low, goal above high, low above
    -- high. 'Nothing' when no rate is given or no check fails.
    violation :: Maybe String
    violation = case r of
        Just (Rate low goal high _)
            | goal < low ->
                Just "rate: Target rate cannot be lower than minimum rate."
            | goal > high ->
                Just "rate: Target rate cannot be greater than maximum rate."
            | low > high ->
                Just "rate: Minimum rate cannot be greater than maximum rate."
        _ -> Nothing

-- XXX implement for serial streams as well, as a simple delay

{-
{-# RULES "rate serial" rate = yieldRateSerial #-}
yieldRateSerial :: Double -> SerialT m a -> SerialT m a
yieldRateSerial _ = id
-}

-- | Same as @rate (Just $ Rate (r/2) r (2*r) maxBound)@
--
-- Specifies the average production rate of a stream in number of yields
-- per second (i.e.  @Hertz@).  Concurrent production is ramped up or down
-- automatically to achieve the specified average yield rate. The rate can
-- go down to half of the specified rate on the lower side and double of
-- the specified rate on the higher side.
--
-- @since 0.5.0
avgRate :: IsStream t => Double -> t m a -> t m a
avgRate r = rate (Just limits)

    where

    -- Low bound is half the goal, high bound is twice the goal.
    limits = Rate (r / 2) r (2 * r) maxBound

-- | Same as @rate (Just $ Rate r r (2*r) maxBound)@
--
-- Specifies the minimum rate at which the stream should yield values. As
-- far as possible the yield rate would never be allowed to go below the
-- specified rate, even though it may possibly go above it at times; the
-- upper limit is double the specified rate.
--
-- @since 0.5.0
minRate :: IsStream t => Double -> t m a -> t m a
minRate r = rate (Just limits)

    where

    -- Low bound equals the goal, high bound is twice the goal.
    limits = Rate r r (2 * r) maxBound

-- | Same as @rate (Just $ Rate (r/2) r r maxBound)@
--
-- Specifies the maximum rate at which the stream should yield values. As
-- far as possible the yield rate would never be allowed to go above the
-- specified rate, even though it may possibly go below it at times; the
-- lower limit is half the specified rate. This can be useful in
-- applications where certain resource usage must not be allowed to go
-- beyond certain limits.
--
-- @since 0.5.0
maxRate :: IsStream t => Double -> t m a -> t m a
maxRate r = rate (Just limits)

    where

    -- Low bound is half the goal, high bound equals the goal.
    limits = Rate (r / 2) r r maxBound

-- | Same as @rate (Just $ Rate r r r 0)@
--
-- Specifies a constant yield rate. If for some reason the actual rate
-- goes above or below the specified rate we do not try to recover it by
-- increasing or decreasing the rate in future.  This can be useful in
-- applications like graphics frame refresh where we need to maintain a
-- constant refresh rate.
--
-- @since 0.5.0
constRate :: IsStream t => Double -> t m a -> t m a
constRate r = rate (Just limits)

    where

    -- Low, goal and high are all the same value; the last field is 0.
    limits = Rate r r r 0

-- | Specify the average latency, in nanoseconds, of a single threaded action
-- in a concurrent composition. Streamly can measure the latencies, but that is
-- possible only after at least one task has completed. This combinator can be
-- used to provide a latency hint so that rate control using 'rate' can take
-- that into account right from the beginning. When not specified then a
-- default behavior is chosen which could be too slow or too fast, and would be
-- restricted by any other control parameters configured.
-- A value of 0 indicates default behavior, a negative value means there is no
-- limit i.e. zero latency.
-- This would normally be useful only in high latency and high throughput
-- cases.
--
{-# INLINE_NORMAL _serialLatency #-}
_serialLatency :: IsStream t => Int -> t m a -> t m a
_serialLatency n m =
    fromStream $ Stream $ \st onStop onSingle onYield ->
        -- Run the wrapped stream with the latency hint recorded in the state.
        let hinted = setStreamLatency n st
        in unStream (toStream m) hinted onStop onSingle onYield

{-
{-# RULES "serialLatency serial" _serialLatency = serialLatencySerial #-}
serialLatencySerial :: Int -> SerialT m a -> SerialT m a
serialLatencySerial _ = id
-}

-- Stop concurrent dispatches once this limit is reached. This is useful in
-- APIs like "take", where we want to dispatch only up to the number of
-- elements that "take" needs.  The value applies only to the immediate next
-- level and is not inherited by everything in the enclosed scope.
{-# INLINE_NORMAL maxYields #-}
maxYields :: IsStream t => Maybe Int64 -> t m a -> t m a
maxYields n m =
    fromStream $ Stream $ \st onStop onSingle onYield ->
        -- Run the wrapped stream with the yield limit recorded in the state.
        let capped = setYieldLimit n st
        in unStream (toStream m) capped onStop onSingle onYield

-- Rewrite rule target: at type SerialT the yield limit is ignored and the
-- stream is returned unchanged.
-- NOTE(review): presumably the limit only matters for concurrent dispatch,
-- which would make it a no-op for serial streams -- confirm.
{-# RULES "maxYields serial" maxYields = maxYieldsSerial #-}
maxYieldsSerial :: Maybe Int64 -> SerialT m a -> SerialT m a
maxYieldsSerial _ m = m

-- | Run the given stream with inspect mode switched on in its state, so that
-- debug information about an SVar is printed when the stream ends.
inspectMode :: IsStream t => t m a -> t m a
inspectMode m =
    fromStream $ Stream $ \st onStop onSingle onYield ->
        let inspected = setInspectMode st
        in unStream (toStream m) inspected onStop onSingle onYield