{-# LANGUAGE CPP #-}
{-# LANGUAGE ConstraintKinds #-}
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE FlexibleInstances #-}
{-# LANGUAGE GeneralizedNewtypeDeriving#-}
{-# LANGUAGE InstanceSigs #-}
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE MultiParamTypeClasses #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE UndecidableInstances #-}
module Streamly.Streams.Async
(
AsyncT
, Async
, asyncly
, async
, (<|)
, mkAsync
, mkAsync'
, WAsyncT
, WAsync
, wAsyncly
, wAsync
)
where
import Control.Concurrent (myThreadId)
import Control.Monad (ap)
import Control.Monad.Base (MonadBase(..), liftBaseDefault)
import Control.Monad.Catch (MonadThrow, throwM)
import Control.Concurrent.MVar (newEmptyMVar)
import Control.Monad.IO.Class (MonadIO(..))
import Control.Monad.Reader.Class (MonadReader(..))
import Control.Monad.State.Class (MonadState(..))
import Control.Monad.Trans.Class (MonadTrans(lift))
import Data.Concurrent.Queue.MichaelScott (LinkedQueue, newQ, nullQ, tryPopR)
import Data.IORef (IORef, newIORef, readIORef)
import Data.Maybe (fromJust)
import Data.Semigroup (Semigroup(..))
import Prelude hiding (map)
import qualified Data.Set as S
import Streamly.Streams.SVar (fromSVar)
import Streamly.Streams.Serial (map)
import Streamly.SVar
import Streamly.Streams.StreamK (IsStream(..), Stream(..), adapt, runStreamSVar)
import qualified Streamly.Streams.StreamK as K
#include "Instances.hs"
-- Worker loop over a work queue stored as a list inside an 'IORef'.
--
-- Each iteration atomically removes the head of the list. When the list is
-- empty the worker reports completion via 'sendStop'. Otherwise the stream is
-- run with 'runStreamSVar'; every element it produces is handed to
-- 'sendYield' wrapped in 'ChildYield'. If 'sendYield' returns False the
-- worker stops, first putting any unfinished remainder of the stream back
-- with 'enqueueLIFO'.
{-# INLINE workLoopLIFO #-}
workLoopLIFO
    :: MonadIO m
    => IORef [Stream m a]
    -> State Stream m a
    -> SVar Stream m a
    -> Maybe WorkerInfo
    -> m ()
workLoopLIFO q st sv winfo = loop
  where
    -- Take the next stream off the queue, or stop when there is none.
    loop = popWork >>= maybe stopWorker runItem

    stopWorker = liftIO $ sendStop sv winfo

    -- Run a stream; when it ends without an element, go back for more work.
    runItem s = runStreamSVar sv s st loop onSingle onYield

    emit a = liftIO $ sendYield sv winfo (ChildYield a)

    -- Last element of a stream.
    onSingle a = do
        continue <- emit a
        if continue then loop else stopWorker

    -- An element followed by more stream.
    onYield a rest = do
        continue <- emit a
        if continue
        then runItem rest
        else liftIO $ do
            -- Preserve the remainder before giving up the worker.
            enqueueLIFO sv q rest
            sendStop sv winfo

    popWork = liftIO $ atomicModifyIORefCAS q $ \case
        [] -> ([], Nothing)
        x : xs -> (xs, Just x)
-- Variant of the list-backed worker loop that also consults the yield limit
-- through 'decrementYieldLimit' / 'incrementYieldLimit'.
--
-- Before running a stream (and again before continuing one after a yield)
-- the loop calls 'decrementYieldLimit'. If that returns False the stream is
-- pushed back with 'enqueueLIFO', the decrement is undone with
-- 'incrementYieldLimit', and the worker stops. A stream that ends without
-- producing an element also undoes the decrement before looping.
{-# INLINE workLoopLIFOLimited #-}
workLoopLIFOLimited
    :: MonadIO m
    => IORef [Stream m a]
    -> State Stream m a
    -> SVar Stream m a
    -> Maybe WorkerInfo
    -> m ()
workLoopLIFOLimited q st sv winfo = loop
  where
    loop = popWork >>= maybe stopWorker startItem

    stopWorker = liftIO $ sendStop sv winfo

    -- Stop continuation for 'runStreamSVar': the stream yielded nothing, so
    -- give back the slot taken by the preceding decrement and carry on.
    releaseAndLoop = liftIO (incrementYieldLimit sv) >> loop

    emit a = liftIO $ sendYield sv winfo (ChildYield a)

    startItem s = do
        reserved <- liftIO $ decrementYieldLimit sv
        if reserved
        then runStreamSVar sv s st releaseAndLoop onSingle onYield
        else liftIO $ do
            enqueueLIFO sv q s
            incrementYieldLimit sv
            sendStop sv winfo

    onSingle a = do
        continue <- emit a
        if continue then loop else stopWorker

    onYield a rest = do
        -- Both actions always run, in this order, before the decision.
        continue <- emit a
        reserved <- liftIO $ decrementYieldLimit sv
        if continue && reserved
        then runStreamSVar sv rest st releaseAndLoop onSingle onYield
        else liftIO $ do
            incrementYieldLimit sv
            enqueueLIFO sv q rest
            sendStop sv winfo

    popWork = liftIO $ atomicModifyIORefCAS q $ \case
        [] -> ([], Nothing)
        x : xs -> (xs, Just x)
-- Worker loop over a 'LinkedQueue' of streams.
--
-- Each iteration takes work with 'tryPopR'. When nothing is available the
-- worker reports completion via 'sendStop'. Otherwise the stream is run with
-- 'runStreamSVar' and every element is passed to 'sendYield' wrapped in
-- 'ChildYield'. If 'sendYield' returns False the worker stops, first putting
-- any unfinished remainder back with 'enqueueFIFO'.
{-# INLINE workLoopFIFO #-}
workLoopFIFO
    :: MonadIO m
    => LinkedQueue (Stream m a)
    -> State Stream m a
    -> SVar Stream m a
    -> Maybe WorkerInfo
    -> m ()
workLoopFIFO q st sv winfo = loop
  where
    loop = liftIO (tryPopR q) >>= maybe stopWorker runItem

    stopWorker = liftIO $ sendStop sv winfo

    -- Run a stream; when it ends without an element, go back for more work.
    runItem s = runStreamSVar sv s st loop onSingle onYield

    emit a = liftIO $ sendYield sv winfo (ChildYield a)

    onSingle a = do
        continue <- emit a
        if continue then loop else stopWorker

    onYield a rest = do
        continue <- emit a
        if continue
        then runItem rest
        else liftIO $ do
            -- Preserve the remainder before giving up the worker.
            enqueueFIFO sv q rest
            sendStop sv winfo
-- Variant of the 'LinkedQueue' worker loop that also consults the yield
-- limit through 'decrementYieldLimit' / 'incrementYieldLimit'.
--
-- Before running a stream (and again before continuing one after a yield)
-- the loop calls 'decrementYieldLimit'. If that returns False the stream is
-- put back with 'enqueueFIFO', the decrement is undone with
-- 'incrementYieldLimit', and the worker stops. A stream that ends without
-- producing an element also undoes the decrement before looping.
{-# INLINE workLoopFIFOLimited #-}
workLoopFIFOLimited
    :: MonadIO m
    => LinkedQueue (Stream m a)
    -> State Stream m a
    -> SVar Stream m a
    -> Maybe WorkerInfo
    -> m ()
workLoopFIFOLimited q st sv winfo = loop
  where
    loop = liftIO (tryPopR q) >>= maybe stopWorker startItem

    stopWorker = liftIO $ sendStop sv winfo

    -- Stop continuation for 'runStreamSVar': the stream yielded nothing, so
    -- give back the slot taken by the preceding decrement and carry on.
    releaseAndLoop = liftIO (incrementYieldLimit sv) >> loop

    emit a = liftIO $ sendYield sv winfo (ChildYield a)

    startItem s = do
        reserved <- liftIO $ decrementYieldLimit sv
        if reserved
        then runStreamSVar sv s st releaseAndLoop onSingle onYield
        else liftIO $ do
            enqueueFIFO sv q s
            incrementYieldLimit sv
            sendStop sv winfo

    onSingle a = do
        continue <- emit a
        if continue then loop else stopWorker

    onYield a rest = do
        -- Both actions always run, in this order, before the decision.
        continue <- emit a
        reserved <- liftIO $ decrementYieldLimit sv
        if continue && reserved
        then runStreamSVar sv rest st releaseAndLoop onSingle onYield
        else liftIO $ do
            incrementYieldLimit sv
            enqueueFIFO sv q rest
            sendStop sv winfo
-- Allocate and assemble the 'SVar' used for the 'AsyncVar' style.
--
-- The work queue is a list held in an 'IORef'. The record is defined
-- recursively: several fields are closures over the very 'SVar' being built.
-- Which output reader / post-processor pair is installed depends on whether
-- the state carries a stream rate (the "Paced" pair) or not (the "Bounded"
-- pair); which work loop and completion check are installed depends on
-- whether the state carries a yield limit.
--
-- The fields 'workerStopMVar', 'aheadWorkQueue' and 'outputHeap' are left
-- as 'undefined' here.
getLifoSVar :: forall m a. MonadAsync m
    => State Stream m a -> RunInIO m -> IO (SVar Stream m a)
getLifoSVar st mrun = do
    outQ     <- newIORef ([], 0)
    outQMv   <- newEmptyMVar
    active   <- newIORef 0
    wfw      <- newIORef False
    running  <- newIORef S.empty
    q        <- newIORef []
    -- A counter is allocated only when the state specifies a yield limit.
    yl       <- maybe (return Nothing) (fmap Just . newIORef) (getYieldLimit st)
    rateInfo <- getYieldRateInfo st
    stats    <- newSVarStats
    tid      <- myThreadId

    -- Completion check without a yield limit: the queue is empty.
    let isWorkFinished _ = null <$> readIORef q

    -- Completion check with a yield limit: the queue is empty or the
    -- remaining-work counter is at or below zero.
    let limitReached var =
            maybe (return False) (fmap (<= 0) . readIORef) (remainingWork var)
        isWorkFinishedLimited var = do
            yieldsDone <- limitReached var
            qEmpty <- null <$> readIORef q
            return (qEmpty || yieldsDone)

    let mkSVar :: SVar Stream m a
               -> (SVar Stream m a -> m [ChildEvent a])
               -> (SVar Stream m a -> m Bool)
               -> (SVar Stream m a -> IO Bool)
               -> (IORef [Stream m a]
                   -> State Stream m a
                   -> SVar Stream m a
                   -> Maybe WorkerInfo
                   -> m ())
               -> SVar Stream m a
        mkSVar self readOutput postProc workDone wloop = SVar
            { outputQueue      = outQ
            , remainingWork    = yl
            , maxBufferLimit   = getMaxBuffer st
            , maxWorkerLimit   = getMaxThreads st
            , yieldRateInfo    = rateInfo
            , outputDoorBell   = outQMv
            , readOutputQ      = readOutput self
            , postProcess      = postProc self
            , workerThreads    = running
              -- Workers run with a state that points back at this SVar.
            , workLoop         = wloop q st{streamVar = Just self} self
            , enqueue          = enqueueLIFO self q
            , isWorkDone       = workDone self
            , isQueueDone      = workDone self
            , needDoorBell     = wfw
            , svarStyle        = AsyncVar
            , svarMrun         = mrun
            , workerCount      = active
            , accountThread    = delThread self
            , workerStopMVar   = undefined
            , svarRef          = Nothing
            , svarInspectMode  = getInspectMode st
            , svarCreator      = tid
            , aheadWorkQueue   = undefined
            , outputHeap       = undefined
            , svarStats        = stats
            }

    -- Tie the knot: 'sv' is passed to its own constructor.
    let sv = case (getStreamRate st, getYieldLimit st) of
            (Nothing, Nothing) ->
                mkSVar sv readOutputQBounded postProcessBounded
                       isWorkFinished workLoopLIFO
            (Nothing, Just _) ->
                mkSVar sv readOutputQBounded postProcessBounded
                       isWorkFinishedLimited workLoopLIFOLimited
            (Just _, Nothing) ->
                mkSVar sv readOutputQPaced postProcessPaced
                       isWorkFinished workLoopLIFO
            (Just _, Just _) ->
                mkSVar sv readOutputQPaced postProcessPaced
                       isWorkFinishedLimited workLoopLIFOLimited
    return sv
-- Allocate and assemble the 'SVar' used for the 'WAsyncVar' style.
--
-- The work queue is a 'LinkedQueue' created with 'newQ'. The record is
-- defined recursively: several fields are closures over the very 'SVar'
-- being built. Which output reader / post-processor pair is installed
-- depends on whether the state carries a stream rate (the "Paced" pair) or
-- not (the "Bounded" pair); which work loop and completion check are
-- installed depends on whether the state carries a yield limit.
--
-- The fields 'workerStopMVar', 'aheadWorkQueue' and 'outputHeap' are left
-- as 'undefined' here.
getFifoSVar :: forall m a. MonadAsync m
    => State Stream m a -> RunInIO m -> IO (SVar Stream m a)
getFifoSVar st mrun = do
    outQ     <- newIORef ([], 0)
    outQMv   <- newEmptyMVar
    active   <- newIORef 0
    wfw      <- newIORef False
    running  <- newIORef S.empty
    q        <- newQ
    -- A counter is allocated only when the state specifies a yield limit.
    yl       <- maybe (return Nothing) (fmap Just . newIORef) (getYieldLimit st)
    rateInfo <- getYieldRateInfo st
    stats    <- newSVarStats
    tid      <- myThreadId

    -- Completion check without a yield limit: the queue is empty.
    let isWorkFinished _ = nullQ q

    -- Completion check with a yield limit: the queue is empty or the
    -- remaining-work counter is at or below zero.
    let limitReached var =
            maybe (return False) (fmap (<= 0) . readIORef) (remainingWork var)
        isWorkFinishedLimited var = do
            yieldsDone <- limitReached var
            qEmpty <- nullQ q
            return (qEmpty || yieldsDone)

    let mkSVar :: SVar Stream m a
               -> (SVar Stream m a -> m [ChildEvent a])
               -> (SVar Stream m a -> m Bool)
               -> (SVar Stream m a -> IO Bool)
               -> (LinkedQueue (Stream m a)
                   -> State Stream m a
                   -> SVar Stream m a
                   -> Maybe WorkerInfo
                   -> m ())
               -> SVar Stream m a
        mkSVar self readOutput postProc workDone wloop = SVar
            { outputQueue      = outQ
            , remainingWork    = yl
            , maxBufferLimit   = getMaxBuffer st
            , maxWorkerLimit   = getMaxThreads st
            , yieldRateInfo    = rateInfo
            , outputDoorBell   = outQMv
            , readOutputQ      = readOutput self
            , postProcess      = postProc self
            , workerThreads    = running
              -- Workers run with a state that points back at this SVar.
            , workLoop         = wloop q st{streamVar = Just self} self
            , enqueue          = enqueueFIFO self q
            , isWorkDone       = workDone self
            , isQueueDone      = workDone self
            , needDoorBell     = wfw
            , svarStyle        = WAsyncVar
            , svarMrun         = mrun
            , workerCount      = active
            , accountThread    = delThread self
            , workerStopMVar   = undefined
            , svarRef          = Nothing
            , svarInspectMode  = getInspectMode st
            , svarCreator      = tid
            , aheadWorkQueue   = undefined
            , outputHeap       = undefined
            , svarStats        = stats
            }

    -- Tie the knot: 'sv' is passed to its own constructor.
    let sv = case (getStreamRate st, getYieldLimit st) of
            (Nothing, Nothing) ->
                mkSVar sv readOutputQBounded postProcessBounded
                       isWorkFinished workLoopFIFO
            (Nothing, Just _) ->
                mkSVar sv readOutputQBounded postProcessBounded
                       isWorkFinishedLimited workLoopFIFOLimited
            (Just _, Nothing) ->
                mkSVar sv readOutputQPaced postProcessPaced
                       isWorkFinished workLoopFIFO
            (Just _, Just _) ->
                mkSVar sv readOutputQPaced postProcessPaced
                       isWorkFinishedLimited workLoopFIFOLimited
    return sv
-- Create an 'SVar' with 'getLifoSVar', using the monad state captured by
-- 'captureMonadState', and pass it together with the given stream to
-- 'sendFirstWorker'. The result of 'sendFirstWorker' is returned.
{-# INLINABLE newAsyncVar #-}
newAsyncVar :: MonadAsync m
    => State Stream m a -> Stream m a -> m (SVar Stream m a)
newAsyncVar st m =
    captureMonadState
        >>= liftIO . getLifoSVar st
        >>= \sv -> sendFirstWorker sv m
-- | Set up an 'SVar' for the given stream via 'newAsyncVar', using
-- 'defState', and return a stream that reads from that 'SVar'.
{-# INLINABLE mkAsync #-}
mkAsync :: (IsStream t, MonadAsync m) => t m a -> m (t m a)
mkAsync m = fromSVar <$> newAsyncVar defState (toStream m)
-- | Like 'mkAsync', except that the caller supplies the 'State' passed to
-- 'newAsyncVar' instead of 'defState' being used.
{-# INLINABLE mkAsync' #-}
mkAsync' :: (IsStream t, MonadAsync m) => State Stream m a -> t m a -> m (t m a)
mkAsync' st m = fromSVar <$> newAsyncVar st (toStream m)
-- Create an 'SVar' with 'getFifoSVar', using the monad state captured by
-- 'captureMonadState', and pass it together with the given stream to
-- 'sendFirstWorker'. The result of 'sendFirstWorker' is returned.
{-# INLINABLE newWAsyncVar #-}
newWAsyncVar :: MonadAsync m
    => State Stream m a -> Stream m a -> m (SVar Stream m a)
newWAsyncVar st m =
    captureMonadState
        >>= liftIO . getFifoSVar st
        >>= \sv -> sendFirstWorker sv m
-- Start a fresh 'SVar' of the requested style for a pair of streams and
-- return a stream that reads from it.
--
-- The stream handed to the new 'SVar' is @concurrently m1 m2@: when run, it
-- first enqueues @m2@ on the 'SVar' found in its state and then continues as
-- @m1@. Only 'AsyncVar' and 'WAsyncVar' are accepted; any other style calls
-- 'error'. The resulting stream is run with @rstState st@.
forkSVarAsync :: MonadAsync m
    => SVarStyle -> Stream m a -> Stream m a -> Stream m a
forkSVarAsync style m1 m2 = Stream $ \st stp sng yld -> do
    let combined = concurrently m1 m2
    sv <- case style of
        AsyncVar -> newAsyncVar st combined
        WAsyncVar -> newWAsyncVar st combined
        _ -> error "illegal svar type"
    unStream (fromSVar sv) (rstState st) stp sng yld
  where
    -- 'fromJust' is partial: this relies on 'streamVar' being set in the
    -- state this stream is run with.
    concurrently ma mb = Stream $ \st stp sng yld ->
        liftIO (enqueue (fromJust $ streamVar st) mb)
            >> unStream ma st stp sng yld
-- Combine two streams under the given 'SVarStyle'.
--
-- If the state already carries an 'SVar' whose 'svarStyle' equals the
-- requested style, @m2@ is enqueued on that 'SVar' and evaluation continues
-- with @m1@. In every other case (no 'SVar', or one of a different style)
-- the work is delegated to 'forkSVarAsync'.
{-# INLINE joinStreamVarAsync #-}
joinStreamVarAsync :: MonadAsync m
    => SVarStyle -> Stream m a -> Stream m a -> Stream m a
joinStreamVarAsync style m1 m2 = Stream $ \st stp sng yld ->
    let runWith s = unStream s st stp sng yld
    in case streamVar st of
        Just sv | svarStyle sv == style -> do
            liftIO $ enqueue sv m2
            runWith m1
        _ -> runWith (forkSVarAsync style m1 m2)
-- 'joinStreamVarAsync' specialised to the 'AsyncVar' style, operating on the
-- concrete 'Stream' type.
{-# INLINE asyncS #-}
asyncS :: MonadAsync m => Stream m a -> Stream m a -> Stream m a
asyncS = joinStreamVarAsync AsyncVar
-- | Polymorphic version of combining two streams under the 'AsyncVar' style:
-- both arguments are converted with 'toStream', joined with
-- 'joinStreamVarAsync', and the result is converted back with 'fromStream'.
{-# INLINE async #-}
async :: (IsStream t, MonadAsync m) => t m a -> t m a -> t m a
async m1 m2 = fromStream $ Stream $ \st stp sng yld ->
    let joined = joinStreamVarAsync AsyncVar (toStream m1) (toStream m2)
    in unStream joined st stp sng yld
-- | Deprecated operator synonym for 'async'.
{-# DEPRECATED (<|) "Please use 'async' instead." #-}
{-# INLINE (<|) #-}
(<|) :: (IsStream t, MonadAsync m) => t m a -> t m a -> t m a
(<|) = async
-- Prepend a monadic action to a stream: the action is turned into a stream
-- with 'K.yieldM' and combined with the rest using 'asyncS'.
{-# INLINE consMAsync #-}
consMAsync :: MonadAsync m => m a -> Stream m a -> Stream m a
consMAsync m r = asyncS (K.yieldM m) r
-- | Newtype wrapper around 'Stream'. Its 'Semigroup' operation is 'async'
-- and its monadic bind is 'K.bindWith' parameterised by 'asyncS'.
newtype AsyncT m a = AsyncT {getAsyncT :: Stream m a}
deriving (MonadTrans)
-- | 'AsyncT' specialised to the 'IO' monad.
type Async = AsyncT IO
-- | Convert an 'AsyncT' stream to any other 'IsStream' type; this is 'adapt'
-- with its argument type fixed to 'AsyncT'.
asyncly :: IsStream t => AsyncT m a -> t m a
asyncly = adapt
-- Conversions to and from 'Stream' are the newtype unwrapper and wrapper;
-- consing a monadic action goes through 'consMAsync'.
instance IsStream AsyncT where
    toStream = getAsyncT
    fromStream = AsyncT

    {-# INLINE consM #-}
    {-# SPECIALIZE consM :: IO a -> AsyncT IO a -> AsyncT IO a #-}
    consM m r = AsyncT (consMAsync m (getAsyncT r))

    {-# INLINE (|:) #-}
    {-# SPECIALIZE (|:) :: IO a -> AsyncT IO a -> AsyncT IO a #-}
    (|:) = consM
-- Appending two 'AsyncT' streams combines them with 'async'.
instance MonadAsync m => Semigroup (AsyncT m a) where
(<>) = async
-- The identity element is the empty stream 'K.nil'; 'mappend' is the
-- 'Semigroup' operation.
instance MonadAsync m => Monoid (AsyncT m a) where
mempty = K.nil
mappend = (<>)
-- Bind runs the continuation over the stream with 'K.bindWith', using
-- 'asyncS' as the combining operation.
instance MonadAsync m => Monad (AsyncT m) where
return = pure
(AsyncT m) >>= f = AsyncT $ K.bindWith asyncS m (getAsyncT . f)
-- The two lines below are CPP macro invocations whose definitions are not
-- visible in this file (presumably they come from the included Instances.hs
-- and generate the remaining class instances -- confirm there).
MONAD_APPLICATIVE_INSTANCE(AsyncT,MONADPARALLEL)
MONAD_COMMON_INSTANCES(AsyncT, MONADPARALLEL)
-- 'joinStreamVarAsync' specialised to the 'WAsyncVar' style, operating on the
-- concrete 'Stream' type.
{-# INLINE wAsyncS #-}
wAsyncS :: MonadAsync m => Stream m a -> Stream m a -> Stream m a
wAsyncS = joinStreamVarAsync WAsyncVar
-- Prepend a monadic action to a stream: the action is turned into a stream
-- with 'K.yieldM' and combined with the rest using 'wAsyncS'.
{-# INLINE consMWAsync #-}
consMWAsync :: MonadAsync m => m a -> Stream m a -> Stream m a
consMWAsync m r = wAsyncS (K.yieldM m) r
-- | Polymorphic version of combining two streams under the 'WAsyncVar'
-- style: both arguments are converted with 'toStream', joined with
-- 'wAsyncS', and the result is converted back with 'fromStream'.
{-# INLINE wAsync #-}
wAsync :: (IsStream t, MonadAsync m) => t m a -> t m a -> t m a
wAsync m1 m2 = fromStream $ Stream $ \st stp sng yld ->
    let joined = wAsyncS (toStream m1) (toStream m2)
    in unStream joined st stp sng yld
-- | Newtype wrapper around 'Stream'. Its 'Semigroup' operation is 'wAsync'
-- and its monadic bind is 'K.bindWith' parameterised by 'wAsyncS'.
newtype WAsyncT m a = WAsyncT {getWAsyncT :: Stream m a}
deriving (MonadTrans)
-- | 'WAsyncT' specialised to the 'IO' monad.
type WAsync = WAsyncT IO
-- | Convert a 'WAsyncT' stream to any other 'IsStream' type; this is 'adapt'
-- with its argument type fixed to 'WAsyncT'.
wAsyncly :: IsStream t => WAsyncT m a -> t m a
wAsyncly = adapt
-- Conversions to and from 'Stream' are the newtype unwrapper and wrapper;
-- consing a monadic action goes through 'consMWAsync'.
instance IsStream WAsyncT where
    toStream = getWAsyncT
    fromStream = WAsyncT

    {-# INLINE consM #-}
    {-# SPECIALIZE consM :: IO a -> WAsyncT IO a -> WAsyncT IO a #-}
    consM m r = WAsyncT (consMWAsync m (getWAsyncT r))

    {-# INLINE (|:) #-}
    {-# SPECIALIZE (|:) :: IO a -> WAsyncT IO a -> WAsyncT IO a #-}
    (|:) = consM
-- Appending two 'WAsyncT' streams combines them with 'wAsync'.
instance MonadAsync m => Semigroup (WAsyncT m a) where
(<>) = wAsync
-- The identity element is the empty stream 'K.nil'; 'mappend' is the
-- 'Semigroup' operation.
instance MonadAsync m => Monoid (WAsyncT m a) where
mempty = K.nil
mappend = (<>)
-- Bind runs the continuation over the stream with 'K.bindWith', using
-- 'wAsyncS' as the combining operation.
instance MonadAsync m => Monad (WAsyncT m) where
return = pure
(WAsyncT m) >>= f =
WAsyncT $ K.bindWith wAsyncS m (getWAsyncT . f)
-- The two lines below are CPP macro invocations whose definitions are not
-- visible in this file (presumably they come from the included Instances.hs
-- and generate the remaining class instances -- confirm there).
MONAD_APPLICATIVE_INSTANCE(WAsyncT,MONADPARALLEL)
MONAD_COMMON_INSTANCES(WAsyncT, MONADPARALLEL)