-- |
-- Module      : Streamly.Internal.Data.Stream.Concurrent.Channel.Type
-- Copyright   : (c) 2017 Composewell Technologies
-- License     : BSD-3-Clause
-- Maintainer  : streamly@composewell.com
-- Stability   : experimental
-- Portability : GHC

module Streamly.Internal.Data.Stream.Concurrent.Channel.Type
    (
      Channel(..)
    , yield
    , stop
    , stopChannel
    , dumpSVar
    )
where

import Control.Concurrent (ThreadId)
import Control.Concurrent.MVar (MVar)
import Control.Monad (void)
import Control.Monad.IO.Class (MonadIO(..))
import Data.IORef (IORef)
import Data.List (intersperse)
import Data.Set (Set)
import Streamly.Internal.Control.Concurrent (RunInIO)
import Streamly.Internal.Data.Atomics (atomicModifyIORefCAS_)
import Streamly.Internal.Data.Stream.Channel.Dispatcher (dumpSVarStats)
import Streamly.Internal.Data.Stream.Channel.Worker
    (sendYield, sendStop, sendWithDoorBell)
import Streamly.Internal.Data.Stream.StreamK.Type (StreamK)

import Streamly.Internal.Data.Stream.Channel.Types

-- IMPORTANT NOTE: we cannot update the SVar after generating it as we have
-- references to the original SVar stored in several functions which will keep
-- pointing to the original data and the new updates won't reflect there.
-- Any updateable parts must be kept in mutable references (IORef).

-- XXX Since we have stream specific channels now, we can remove fields like
-- enqueue, readOutputQ, postProcess, workLoop etc. from this record.

-- | A mutable channel to evaluate multiple streams concurrently and provide
-- the combined results as an output stream.
--
-- Worker threads push 'ChildEvent's into 'outputQueue' and the consumer
-- thread reads them back. The record itself must not be updated after it is
-- created, because copies of it are captured by closures; everything that
-- changes at run time is therefore held in an 'IORef' or 'MVar'.
data Channel m a = Channel
    {
    -- Captured 'RunInIO' for the monad @m@ (presumably used to run @m@
    -- actions from IO on worker threads -- confirm at the construction site).
      svarMrun        :: RunInIO m

    -- FORWARD FLOW: Flow of data from the workers to the consumer

    -- Shared output queue (events, length)
    -- XXX For better efficiency we can try a preallocated array type (perhaps
    -- something like a vector) that allows an O(1) append. That way we will
    -- avoid constructing and reversing the list. Possibly we can also avoid
    -- the GC copying overhead. When the size increases we should be able to
    -- allocate the array in chunks.
    --
    -- [LOCKING] Frequent locked access. This is updated by workers on each
    -- yield and once in a while read by the consumer thread. This could have
    -- big locking overhead if the number of workers is high.
    --
    -- XXX We can use a per-CPU data structure to reduce the locking overhead.
    -- However, a per-cpu structure cannot guarantee the exact sequence in
    -- which the elements were added, though that may not be important.
    -- XXX We can send a bundle of events of one type coalesced together in an
    -- unboxed structure.
    , outputQueue    :: IORef ([ChildEvent a], Int)

    -- [LOCKING] Infrequent MVar. Used when the outputQ transitions from empty
    -- to non-empty, or a work item is queued by a worker to the work queue and
    -- doorBellOnWorkQ is set by the consumer.
    , outputDoorBell :: MVar ()  -- signal the consumer about output
    -- XXX Can we use IO instead of m here?
    , readOutputQ    :: m [ChildEvent a]
    , postProcess    :: m Bool

    -- Scheduling --

    -- Combined/aggregate parameters
    -- This is capped to maxBufferLimit if set to more than that. Otherwise
    -- potentially each worker may yield one value to the buffer in the worst
    -- case exceeding the requested buffer size.
    , maxWorkerLimit :: Limit
    , maxBufferLimit :: Limit

    -- [LOCKING] Read only access by consumer when dispatching a worker.
    -- Decremented by workers when picking work and undo decrement if the
    -- worker does not yield a value.
    , remainingWork  :: Maybe (IORef Count)
    , yieldRateInfo  :: Maybe YieldRateInfo

    , enqueue        :: Bool -> (RunInIO m, StreamK m a) -> IO ()
    , eagerDispatch  :: m ()
    , isWorkDone     :: IO Bool
    , isQueueDone    :: IO Bool
    -- Ring outputDoorBell on enqueue so that a sleeping consumer thread can
    -- wake up and send more workers.
    , doorBellOnWorkQ   :: IORef Bool
    , workLoop       :: Maybe WorkerInfo -> m ()

    -- Shared, thread tracking
    -- [LOCKING] Updated unlocked, only by consumer thread.
    , workerThreads  :: IORef (Set ThreadId)

    -- [LOCKING] Updated locked, by consumer thread when dispatching a worker
    -- and by the worker threads when the thread stops. This is read unsafely
    -- at several places where we want to rely on an approximate value.
    , workerCount    :: IORef Int
    -- XXX Can we use IO instead of m here?
    , accountThread  :: ThreadId -> m ()
    , workerStopMVar :: MVar () -- Used only in ordered streams

    -- cleanup: to track garbage collection of SVar --
    , svarRef        :: Maybe (IORef ())

    -- Stats --
    , svarStats      :: SVarStats

    -- Diagnostics --
    , svarInspectMode :: Bool
    , svarCreator    :: ThreadId
    }

-- | Hand a value produced by a worker to 'sendYield', wrapped as a
-- 'ChildYield' event.
--
-- The channel's buffer limit, worker limit, worker count, rate info, output
-- queue and output door bell are passed through unchanged. The returned
-- 'Bool' is whatever 'sendYield' returns (presumably whether the worker may
-- keep yielding -- confirm against 'sendYield').
{-# INLINE yield #-}
yield :: Channel m a -> Maybe WorkerInfo -> a -> IO Bool
yield sv winfo x =
    sendYield
        (maxBufferLimit sv)
        (maxWorkerLimit sv)
        (workerCount sv)
        winfo
        (yieldRateInfo sv)
        (outputQueue sv)
        (outputDoorBell sv)
        (ChildYield x)

-- | Report that a worker is stopping, by delegating to 'sendStop'.
--
-- The channel's worker count, rate info, output queue and output door bell
-- are passed through unchanged. What gets enqueued, and how the worker count
-- is adjusted, is decided by 'sendStop'.
{-# INLINE stop #-}
stop :: Channel m a -> Maybe WorkerInfo -> IO ()
stop sv winfo =
    sendStop
        (workerCount sv)
        winfo
        (yieldRateInfo sv)
        (outputQueue sv)
        (outputDoorBell sv)

-- | Request that the channel be stopped.
--
-- Decrements 'workerCount' by one and posts a 'ChildStopChannel' event to
-- the output queue using 'sendWithDoorBell'. This function does not kill any
-- thread itself. Presumably the consumer tears down the running workers when
-- it processes 'ChildStopChannel' -- confirm against the consumer loop.
--
-- NOTE(review): the decrement suggests this is meant to be called by a
-- worker thread that is exiting; verify against the callers.
{-# INLINABLE stopChannel #-}
stopChannel :: MonadIO m => Channel m a -> m ()
stopChannel chan = liftIO $ do
    atomicModifyIORefCAS_ (workerCount chan) (subtract 1)
    -- The result of 'sendWithDoorBell' is not needed here.
    void
        $ sendWithDoorBell
            (outputQueue chan)
            (outputDoorBell chan)
            ChildStopChannel

-- | Render a human readable dump of the channel for debugging.
--
-- The output is, in order: the creator thread, a current-state section
-- (output queue, output door bell, doorBellOnWorkQ flag, worker threads,
-- worker count) and a stats section produced by 'dumpSVarStats'. Adjacent
-- sections are separated by a newline.
{-# NOINLINE dumpSVar #-}
dumpSVar :: Channel m a -> IO String
dumpSVar sv = concat <$> sequence (intersperse (return "\n") sections)

    where

    sections =
        [ return (dumpCreator (svarCreator sv))
        , return "---------CURRENT STATE-----------"
        , dumpOutputQ (outputQueue sv)
        -- XXX print the types of events in the outputQueue, first 5
        , dumpDoorBell (outputDoorBell sv)
        , dumpNeedDoorBell (doorBellOnWorkQ sv)
        , dumpRunningThreads (workerThreads sv)
        -- XXX print the status of first 5 threads
        , dumpWorkerCount (workerCount sv)
        , return "---------STATS-----------\n"
        , dumpSVarStats (svarInspectMode sv) (yieldRateInfo sv) (svarStats sv)
        ]