{-# LANGUAGE CPP #-}
{-# LANGUAGE ConstraintKinds #-}
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE FlexibleInstances #-}
{-# LANGUAGE GeneralizedNewtypeDeriving#-}
{-# LANGUAGE InstanceSigs #-}
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE MultiParamTypeClasses #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE UndecidableInstances #-}
#include "inline.hs"
module Streamly.Internal.Data.Stream.Async
(
AsyncT
, Async
, asyncly
, async
, (<|)
, mkAsync
, mkAsyncK
, WAsyncT
, WAsync
, wAsyncly
, wAsync
)
where
import Control.Concurrent (myThreadId)
import Control.Monad.Base (MonadBase(..), liftBaseDefault)
import Control.Monad.Catch (MonadThrow, throwM)
import Control.Monad.Trans.Control (MonadBaseControl (..))
import Control.Concurrent.MVar (newEmptyMVar)
import Control.Monad.IO.Class (MonadIO(..))
import Control.Monad.Reader.Class (MonadReader(..))
import Control.Monad.State.Class (MonadState(..))
import Control.Monad.Trans.Class (MonadTrans(lift))
import Data.Concurrent.Queue.MichaelScott (LinkedQueue, newQ, nullQ, tryPopR)
import Data.IORef (IORef, newIORef, readIORef)
import Data.Maybe (fromJust)
#if __GLASGOW_HASKELL__ < 808
import Data.Semigroup (Semigroup(..))
#endif
import Prelude hiding (map)
import qualified Data.Set as S
import Streamly.Internal.Data.Atomics (atomicModifyIORefCAS)
import Streamly.Internal.Data.Stream.SVar (fromSVar)
import Streamly.Internal.Data.SVar
import Streamly.Internal.Data.Stream.StreamK
(IsStream(..), Stream, mkStream, foldStream, adapt, foldStreamShared)
import qualified Streamly.Internal.Data.Stream.StreamK as K
import qualified Streamly.Internal.Data.Stream.StreamD as D
#include "Instances.hs"
data WorkerStatus = Continue | Suspend
{-# INLINE workLoopLIFO #-}
workLoopLIFO
:: (MonadIO m, MonadBaseControl IO m)
=> IORef [Stream m a]
-> State Stream m a
-> SVar Stream m a
-> Maybe WorkerInfo
-> m ()
workLoopLIFO :: forall (m :: * -> *) a.
(MonadIO m, MonadBaseControl IO m) =>
IORef [Stream m a]
-> State Stream m a -> SVar Stream m a -> Maybe WorkerInfo -> m ()
workLoopLIFO IORef [Stream m a]
q State Stream m a
st SVar Stream m a
sv Maybe WorkerInfo
winfo = m ()
run
where
mrun :: m b -> IO (StM m b)
mrun = forall (m :: * -> *). RunInIO m -> forall b. m b -> IO (StM m b)
runInIO forall a b. (a -> b) -> a -> b
$ forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> RunInIO m
svarMrun SVar Stream m a
sv
run :: m ()
run = do
Maybe (Stream m a)
work <- m (Maybe (Stream m a))
dequeue
let stop :: m ()
stop = forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> Maybe WorkerInfo -> IO ()
sendStop SVar Stream m a
sv Maybe WorkerInfo
winfo
case Maybe (Stream m a)
work of
Maybe (Stream m a)
Nothing -> m ()
stop
Just Stream m a
m -> do
StM m WorkerStatus
r <- forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall b. m b -> IO (StM m b)
mrun forall a b. (a -> b) -> a -> b
$
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a r.
IsStream t =>
State Stream m a
-> (a -> t m a -> m r) -> (a -> m r) -> m r -> t m a -> m r
foldStreamShared State Stream m a
st a -> Stream m a -> m WorkerStatus
yieldk forall {m :: * -> *}. MonadIO m => a -> m WorkerStatus
single (forall (m :: * -> *) a. Monad m => a -> m a
return WorkerStatus
Continue) Stream m a
m
WorkerStatus
res <- forall (b :: * -> *) (m :: * -> *) a.
MonadBaseControl b m =>
StM m a -> m a
restoreM StM m WorkerStatus
r
case WorkerStatus
res of
WorkerStatus
Continue -> m ()
run
WorkerStatus
Suspend -> m ()
stop
single :: a -> m WorkerStatus
single a
a = do
Bool
res <- forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> Maybe WorkerInfo -> ChildEvent a -> IO Bool
sendYield SVar Stream m a
sv Maybe WorkerInfo
winfo (forall a. a -> ChildEvent a
ChildYield a
a)
forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ if Bool
res then WorkerStatus
Continue else WorkerStatus
Suspend
yieldk :: a -> Stream m a -> m WorkerStatus
yieldk a
a Stream m a
r = do
Bool
res <- forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> Maybe WorkerInfo -> ChildEvent a -> IO Bool
sendYield SVar Stream m a
sv Maybe WorkerInfo
winfo (forall a. a -> ChildEvent a
ChildYield a
a)
if Bool
res
then forall (t :: (* -> *) -> * -> *) (m :: * -> *) a r.
IsStream t =>
State Stream m a
-> (a -> t m a -> m r) -> (a -> m r) -> m r -> t m a -> m r
foldStreamShared State Stream m a
st a -> Stream m a -> m WorkerStatus
yieldk forall {m :: * -> *}. MonadIO m => a -> m WorkerStatus
single (forall (m :: * -> *) a. Monad m => a -> m a
return WorkerStatus
Continue) Stream m a
r
else forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ do
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> IORef [t m a] -> t m a -> IO ()
enqueueLIFO SVar Stream m a
sv IORef [Stream m a]
q Stream m a
r
forall (m :: * -> *) a. Monad m => a -> m a
return WorkerStatus
Suspend
dequeue :: m (Maybe (Stream m a))
dequeue = forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall a b. IORef a -> (a -> (a, b)) -> IO b
atomicModifyIORefCAS IORef [Stream m a]
q forall a b. (a -> b) -> a -> b
$ \case
[] -> ([], forall a. Maybe a
Nothing)
Stream m a
x : [Stream m a]
xs -> ([Stream m a]
xs, forall a. a -> Maybe a
Just Stream m a
x)
{-# INLINE workLoopLIFOLimited #-}
workLoopLIFOLimited
:: (MonadIO m, MonadBaseControl IO m)
=> IORef [Stream m a]
-> State Stream m a
-> SVar Stream m a
-> Maybe WorkerInfo
-> m ()
workLoopLIFOLimited :: forall (m :: * -> *) a.
(MonadIO m, MonadBaseControl IO m) =>
IORef [Stream m a]
-> State Stream m a -> SVar Stream m a -> Maybe WorkerInfo -> m ()
workLoopLIFOLimited IORef [Stream m a]
q State Stream m a
st SVar Stream m a
sv Maybe WorkerInfo
winfo = m ()
run
where
mrun :: m b -> IO (StM m b)
mrun = forall (m :: * -> *). RunInIO m -> forall b. m b -> IO (StM m b)
runInIO forall a b. (a -> b) -> a -> b
$ forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> RunInIO m
svarMrun SVar Stream m a
sv
incrContinue :: m WorkerStatus
incrContinue = forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> IO ()
incrementYieldLimit SVar Stream m a
sv) forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> forall (m :: * -> *) a. Monad m => a -> m a
return WorkerStatus
Continue
run :: m ()
run = do
Maybe (Stream m a)
work <- m (Maybe (Stream m a))
dequeue
let stop :: m ()
stop = forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> Maybe WorkerInfo -> IO ()
sendStop SVar Stream m a
sv Maybe WorkerInfo
winfo
case Maybe (Stream m a)
work of
Maybe (Stream m a)
Nothing -> m ()
stop
Just Stream m a
m -> do
Bool
yieldLimitOk <- forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> IO Bool
decrementYieldLimit SVar Stream m a
sv
if Bool
yieldLimitOk
then do
StM m WorkerStatus
r <- forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall b. m b -> IO (StM m b)
mrun forall a b. (a -> b) -> a -> b
$
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a r.
IsStream t =>
State Stream m a
-> (a -> t m a -> m r) -> (a -> m r) -> m r -> t m a -> m r
foldStreamShared State Stream m a
st a -> Stream m a -> m WorkerStatus
yieldk forall {m :: * -> *}. MonadIO m => a -> m WorkerStatus
single m WorkerStatus
incrContinue Stream m a
m
WorkerStatus
res <- forall (b :: * -> *) (m :: * -> *) a.
MonadBaseControl b m =>
StM m a -> m a
restoreM StM m WorkerStatus
r
case WorkerStatus
res of
WorkerStatus
Continue -> m ()
run
WorkerStatus
Suspend -> m ()
stop
else forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ do
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> IORef [t m a] -> t m a -> IO ()
enqueueLIFO SVar Stream m a
sv IORef [Stream m a]
q Stream m a
m
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> IO ()
incrementYieldLimit SVar Stream m a
sv
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> Maybe WorkerInfo -> IO ()
sendStop SVar Stream m a
sv Maybe WorkerInfo
winfo
single :: a -> m WorkerStatus
single a
a = do
Bool
res <- forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> Maybe WorkerInfo -> ChildEvent a -> IO Bool
sendYield SVar Stream m a
sv Maybe WorkerInfo
winfo (forall a. a -> ChildEvent a
ChildYield a
a)
forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ if Bool
res then WorkerStatus
Continue else WorkerStatus
Suspend
yieldk :: a -> Stream m a -> m WorkerStatus
yieldk a
a Stream m a
r = do
Bool
res <- forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> Maybe WorkerInfo -> ChildEvent a -> IO Bool
sendYield SVar Stream m a
sv Maybe WorkerInfo
winfo (forall a. a -> ChildEvent a
ChildYield a
a)
Bool
yieldLimitOk <- forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> IO Bool
decrementYieldLimit SVar Stream m a
sv
if Bool
res Bool -> Bool -> Bool
&& Bool
yieldLimitOk
then forall (t :: (* -> *) -> * -> *) (m :: * -> *) a r.
IsStream t =>
State Stream m a
-> (a -> t m a -> m r) -> (a -> m r) -> m r -> t m a -> m r
foldStreamShared State Stream m a
st a -> Stream m a -> m WorkerStatus
yieldk forall {m :: * -> *}. MonadIO m => a -> m WorkerStatus
single m WorkerStatus
incrContinue Stream m a
r
else forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ do
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> IO ()
incrementYieldLimit SVar Stream m a
sv
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> IORef [t m a] -> t m a -> IO ()
enqueueLIFO SVar Stream m a
sv IORef [Stream m a]
q Stream m a
r
forall (m :: * -> *) a. Monad m => a -> m a
return WorkerStatus
Suspend
dequeue :: m (Maybe (Stream m a))
dequeue = forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall a b. IORef a -> (a -> (a, b)) -> IO b
atomicModifyIORefCAS IORef [Stream m a]
q forall a b. (a -> b) -> a -> b
$ \case
[] -> ([], forall a. Maybe a
Nothing)
Stream m a
x : [Stream m a]
xs -> ([Stream m a]
xs, forall a. a -> Maybe a
Just Stream m a
x)
{-# INLINE workLoopFIFO #-}
workLoopFIFO
:: (MonadIO m, MonadBaseControl IO m)
=> LinkedQueue (Stream m a)
-> State Stream m a
-> SVar Stream m a
-> Maybe WorkerInfo
-> m ()
workLoopFIFO :: forall (m :: * -> *) a.
(MonadIO m, MonadBaseControl IO m) =>
LinkedQueue (Stream m a)
-> State Stream m a -> SVar Stream m a -> Maybe WorkerInfo -> m ()
workLoopFIFO LinkedQueue (Stream m a)
q State Stream m a
st SVar Stream m a
sv Maybe WorkerInfo
winfo = m ()
run
where
mrun :: m b -> IO (StM m b)
mrun = forall (m :: * -> *). RunInIO m -> forall b. m b -> IO (StM m b)
runInIO forall a b. (a -> b) -> a -> b
$ forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> RunInIO m
svarMrun SVar Stream m a
sv
run :: m ()
run = do
Maybe (Stream m a)
work <- forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall a. LinkedQueue a -> IO (Maybe a)
tryPopR LinkedQueue (Stream m a)
q
let stop :: m ()
stop = forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> Maybe WorkerInfo -> IO ()
sendStop SVar Stream m a
sv Maybe WorkerInfo
winfo
case Maybe (Stream m a)
work of
Maybe (Stream m a)
Nothing -> m ()
stop
Just Stream m a
m -> do
StM m WorkerStatus
r <- forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall b. m b -> IO (StM m b)
mrun forall a b. (a -> b) -> a -> b
$
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a r.
IsStream t =>
State Stream m a
-> (a -> t m a -> m r) -> (a -> m r) -> m r -> t m a -> m r
foldStreamShared State Stream m a
st forall {m :: * -> *}.
MonadIO m =>
a -> Stream m a -> m WorkerStatus
yieldk forall {m :: * -> *}. MonadIO m => a -> m WorkerStatus
single (forall (m :: * -> *) a. Monad m => a -> m a
return WorkerStatus
Continue) Stream m a
m
WorkerStatus
res <- forall (b :: * -> *) (m :: * -> *) a.
MonadBaseControl b m =>
StM m a -> m a
restoreM StM m WorkerStatus
r
case WorkerStatus
res of
WorkerStatus
Continue -> m ()
run
WorkerStatus
Suspend -> m ()
stop
single :: a -> m WorkerStatus
single a
a = do
Bool
res <- forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> Maybe WorkerInfo -> ChildEvent a -> IO Bool
sendYield SVar Stream m a
sv Maybe WorkerInfo
winfo (forall a. a -> ChildEvent a
ChildYield a
a)
forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ if Bool
res then WorkerStatus
Continue else WorkerStatus
Suspend
yieldk :: a -> Stream m a -> m WorkerStatus
yieldk a
a Stream m a
r = do
Bool
res <- forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> Maybe WorkerInfo -> ChildEvent a -> IO Bool
sendYield SVar Stream m a
sv Maybe WorkerInfo
winfo (forall a. a -> ChildEvent a
ChildYield a
a)
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> LinkedQueue (t m a) -> t m a -> IO ()
enqueueFIFO SVar Stream m a
sv LinkedQueue (Stream m a)
q Stream m a
r
forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ if Bool
res then WorkerStatus
Continue else WorkerStatus
Suspend
{-# INLINE workLoopFIFOLimited #-}
workLoopFIFOLimited
:: (MonadIO m, MonadBaseControl IO m)
=> LinkedQueue (Stream m a)
-> State Stream m a
-> SVar Stream m a
-> Maybe WorkerInfo
-> m ()
workLoopFIFOLimited :: forall (m :: * -> *) a.
(MonadIO m, MonadBaseControl IO m) =>
LinkedQueue (Stream m a)
-> State Stream m a -> SVar Stream m a -> Maybe WorkerInfo -> m ()
workLoopFIFOLimited LinkedQueue (Stream m a)
q State Stream m a
st SVar Stream m a
sv Maybe WorkerInfo
winfo = m ()
run
where
mrun :: m b -> IO (StM m b)
mrun = forall (m :: * -> *). RunInIO m -> forall b. m b -> IO (StM m b)
runInIO forall a b. (a -> b) -> a -> b
$ forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> RunInIO m
svarMrun SVar Stream m a
sv
incrContinue :: m WorkerStatus
incrContinue = forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> IO ()
incrementYieldLimit SVar Stream m a
sv) forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> forall (m :: * -> *) a. Monad m => a -> m a
return WorkerStatus
Continue
run :: m ()
run = do
Maybe (Stream m a)
work <- forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall a. LinkedQueue a -> IO (Maybe a)
tryPopR LinkedQueue (Stream m a)
q
let stop :: m ()
stop = forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> Maybe WorkerInfo -> IO ()
sendStop SVar Stream m a
sv Maybe WorkerInfo
winfo
case Maybe (Stream m a)
work of
Maybe (Stream m a)
Nothing -> m ()
stop
Just Stream m a
m -> do
Bool
yieldLimitOk <- forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> IO Bool
decrementYieldLimit SVar Stream m a
sv
if Bool
yieldLimitOk
then do
StM m WorkerStatus
r <- forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall b. m b -> IO (StM m b)
mrun forall a b. (a -> b) -> a -> b
$
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a r.
IsStream t =>
State Stream m a
-> (a -> t m a -> m r) -> (a -> m r) -> m r -> t m a -> m r
foldStreamShared State Stream m a
st forall {m :: * -> *}.
MonadIO m =>
a -> Stream m a -> m WorkerStatus
yieldk forall {m :: * -> *}. MonadIO m => a -> m WorkerStatus
single m WorkerStatus
incrContinue Stream m a
m
WorkerStatus
res <- forall (b :: * -> *) (m :: * -> *) a.
MonadBaseControl b m =>
StM m a -> m a
restoreM StM m WorkerStatus
r
case WorkerStatus
res of
WorkerStatus
Continue -> m ()
run
WorkerStatus
Suspend -> m ()
stop
else forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ do
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> LinkedQueue (t m a) -> t m a -> IO ()
enqueueFIFO SVar Stream m a
sv LinkedQueue (Stream m a)
q Stream m a
m
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> IO ()
incrementYieldLimit SVar Stream m a
sv
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> Maybe WorkerInfo -> IO ()
sendStop SVar Stream m a
sv Maybe WorkerInfo
winfo
single :: a -> m WorkerStatus
single a
a = do
Bool
res <- forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> Maybe WorkerInfo -> ChildEvent a -> IO Bool
sendYield SVar Stream m a
sv Maybe WorkerInfo
winfo (forall a. a -> ChildEvent a
ChildYield a
a)
forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ if Bool
res then WorkerStatus
Continue else WorkerStatus
Suspend
yieldk :: a -> Stream m a -> m WorkerStatus
yieldk a
a Stream m a
r = do
Bool
res <- forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> Maybe WorkerInfo -> ChildEvent a -> IO Bool
sendYield SVar Stream m a
sv Maybe WorkerInfo
winfo (forall a. a -> ChildEvent a
ChildYield a
a)
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> LinkedQueue (t m a) -> t m a -> IO ()
enqueueFIFO SVar Stream m a
sv LinkedQueue (Stream m a)
q Stream m a
r
Bool
yieldLimitOk <- forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> IO Bool
decrementYieldLimit SVar Stream m a
sv
if Bool
res Bool -> Bool -> Bool
&& Bool
yieldLimitOk
then forall (m :: * -> *) a. Monad m => a -> m a
return WorkerStatus
Continue
else forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ do
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> IO ()
incrementYieldLimit SVar Stream m a
sv
forall (m :: * -> *) a. Monad m => a -> m a
return WorkerStatus
Suspend
getLifoSVar :: forall m a. MonadAsync m
=> State Stream m a -> RunInIO m -> IO (SVar Stream m a)
getLifoSVar :: forall (m :: * -> *) a.
MonadAsync m =>
State Stream m a -> RunInIO m -> IO (SVar Stream m a)
getLifoSVar State Stream m a
st RunInIO m
mrun = do
IORef ([ChildEvent a], Int)
outQ <- forall a. a -> IO (IORef a)
newIORef ([], Int
0)
MVar ()
outQMv <- forall a. IO (MVar a)
newEmptyMVar
IORef Int
active <- forall a. a -> IO (IORef a)
newIORef Int
0
IORef Bool
wfw <- forall a. a -> IO (IORef a)
newIORef Bool
False
IORef (Set ThreadId)
running <- forall a. a -> IO (IORef a)
newIORef forall a. Set a
S.empty
IORef [Stream m a]
q <- forall a. a -> IO (IORef a)
newIORef []
Maybe (IORef Count)
yl <- case forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
State t m a -> Maybe Count
getYieldLimit State Stream m a
st of
Maybe Count
Nothing -> forall (m :: * -> *) a. Monad m => a -> m a
return forall a. Maybe a
Nothing
Just Count
x -> forall a. a -> Maybe a
Just forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> forall a. a -> IO (IORef a)
newIORef Count
x
Maybe YieldRateInfo
rateInfo <- forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
State t m a -> IO (Maybe YieldRateInfo)
getYieldRateInfo State Stream m a
st
SVarStats
stats <- IO SVarStats
newSVarStats
ThreadId
tid <- IO ThreadId
myThreadId
let isWorkFinished :: p -> IO Bool
isWorkFinished p
_ = forall (t :: * -> *) a. Foldable t => t a -> Bool
null forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> forall a. IORef a -> IO a
readIORef IORef [Stream m a]
q
let isWorkFinishedLimited :: SVar t m a -> IO Bool
isWorkFinishedLimited SVar t m a
sv = do
Bool
yieldsDone <-
case forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> Maybe (IORef Count)
remainingWork SVar t m a
sv of
Just IORef Count
ref -> do
Count
n <- forall a. IORef a -> IO a
readIORef IORef Count
ref
forall (m :: * -> *) a. Monad m => a -> m a
return (Count
n forall a. Ord a => a -> a -> Bool
<= Count
0)
Maybe (IORef Count)
Nothing -> forall (m :: * -> *) a. Monad m => a -> m a
return Bool
False
Bool
qEmpty <- forall (t :: * -> *) a. Foldable t => t a -> Bool
null forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> forall a. IORef a -> IO a
readIORef IORef [Stream m a]
q
forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ Bool
qEmpty Bool -> Bool -> Bool
|| Bool
yieldsDone
let getSVar :: SVar Stream m a
-> (SVar Stream m a -> m [ChildEvent a])
-> (SVar Stream m a -> m Bool)
-> (SVar Stream m a -> IO Bool)
-> (IORef [Stream m a]
-> State Stream m a
-> SVar Stream m a
-> Maybe WorkerInfo
-> m())
-> SVar Stream m a
getSVar :: SVar Stream m a
-> (SVar Stream m a -> m [ChildEvent a])
-> (SVar Stream m a -> m Bool)
-> (SVar Stream m a -> IO Bool)
-> (IORef [Stream m a]
-> State Stream m a -> SVar Stream m a -> Maybe WorkerInfo -> m ())
-> SVar Stream m a
getSVar SVar Stream m a
sv SVar Stream m a -> m [ChildEvent a]
readOutput SVar Stream m a -> m Bool
postProc SVar Stream m a -> IO Bool
workDone IORef [Stream m a]
-> State Stream m a -> SVar Stream m a -> Maybe WorkerInfo -> m ()
wloop = SVar
{ outputQueue :: IORef ([ChildEvent a], Int)
outputQueue = IORef ([ChildEvent a], Int)
outQ
, outputQueueFromConsumer :: IORef ([ChildEvent a], Int)
outputQueueFromConsumer = forall a. HasCallStack => a
undefined
, remainingWork :: Maybe (IORef Count)
remainingWork = Maybe (IORef Count)
yl
, maxBufferLimit :: Limit
maxBufferLimit = forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
State t m a -> Limit
getMaxBuffer State Stream m a
st
, pushBufferSpace :: IORef Count
pushBufferSpace = forall a. HasCallStack => a
undefined
, pushBufferPolicy :: PushBufferPolicy
pushBufferPolicy = forall a. HasCallStack => a
undefined
, pushBufferMVar :: MVar ()
pushBufferMVar = forall a. HasCallStack => a
undefined
, maxWorkerLimit :: Limit
maxWorkerLimit = forall a. Ord a => a -> a -> a
min (forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
State t m a -> Limit
getMaxThreads State Stream m a
st) (forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
State t m a -> Limit
getMaxBuffer State Stream m a
st)
, yieldRateInfo :: Maybe YieldRateInfo
yieldRateInfo = Maybe YieldRateInfo
rateInfo
, outputDoorBell :: MVar ()
outputDoorBell = MVar ()
outQMv
, outputDoorBellFromConsumer :: MVar ()
outputDoorBellFromConsumer = forall a. HasCallStack => a
undefined
, readOutputQ :: m [ChildEvent a]
readOutputQ = SVar Stream m a -> m [ChildEvent a]
readOutput SVar Stream m a
sv
, postProcess :: m Bool
postProcess = SVar Stream m a -> m Bool
postProc SVar Stream m a
sv
, workerThreads :: IORef (Set ThreadId)
workerThreads = IORef (Set ThreadId)
running
, workLoop :: Maybe WorkerInfo -> m ()
workLoop = IORef [Stream m a]
-> State Stream m a -> SVar Stream m a -> Maybe WorkerInfo -> m ()
wloop IORef [Stream m a]
q State Stream m a
st{streamVar :: Maybe (SVar Stream m a)
streamVar = forall a. a -> Maybe a
Just SVar Stream m a
sv} SVar Stream m a
sv
, enqueue :: Stream m a -> IO ()
enqueue = forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> IORef [t m a] -> t m a -> IO ()
enqueueLIFO SVar Stream m a
sv IORef [Stream m a]
q
, isWorkDone :: IO Bool
isWorkDone = SVar Stream m a -> IO Bool
workDone SVar Stream m a
sv
, isQueueDone :: IO Bool
isQueueDone = SVar Stream m a -> IO Bool
workDone SVar Stream m a
sv
, needDoorBell :: IORef Bool
needDoorBell = IORef Bool
wfw
, svarStyle :: SVarStyle
svarStyle = SVarStyle
AsyncVar
, svarStopStyle :: SVarStopStyle
svarStopStyle = SVarStopStyle
StopNone
, svarStopBy :: IORef ThreadId
svarStopBy = forall a. HasCallStack => a
undefined
, svarMrun :: RunInIO m
svarMrun = RunInIO m
mrun
, workerCount :: IORef Int
workerCount = IORef Int
active
, accountThread :: ThreadId -> m ()
accountThread = forall (m :: * -> *) (t :: (* -> *) -> * -> *) a.
MonadIO m =>
SVar t m a -> ThreadId -> m ()
delThread SVar Stream m a
sv
, workerStopMVar :: MVar ()
workerStopMVar = forall a. HasCallStack => a
undefined
, svarRef :: Maybe (IORef ())
svarRef = forall a. Maybe a
Nothing
, svarInspectMode :: Bool
svarInspectMode = forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
State t m a -> Bool
getInspectMode State Stream m a
st
, svarCreator :: ThreadId
svarCreator = ThreadId
tid
, aheadWorkQueue :: IORef ([Stream m a], Int)
aheadWorkQueue = forall a. HasCallStack => a
undefined
, outputHeap :: IORef (Heap (Entry Int (AheadHeapEntry Stream m a)), Maybe Int)
outputHeap = forall a. HasCallStack => a
undefined
, svarStats :: SVarStats
svarStats = SVarStats
stats
}
let sv :: SVar Stream m a
sv =
case forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
State t m a -> Maybe Rate
getStreamRate State Stream m a
st of
Maybe Rate
Nothing ->
case forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
State t m a -> Maybe Count
getYieldLimit State Stream m a
st of
Maybe Count
Nothing -> SVar Stream m a
-> (SVar Stream m a -> m [ChildEvent a])
-> (SVar Stream m a -> m Bool)
-> (SVar Stream m a -> IO Bool)
-> (IORef [Stream m a]
-> State Stream m a -> SVar Stream m a -> Maybe WorkerInfo -> m ())
-> SVar Stream m a
getSVar SVar Stream m a
sv forall (m :: * -> *) (t :: (* -> *) -> * -> *) a.
MonadAsync m =>
SVar t m a -> m [ChildEvent a]
readOutputQBounded
forall (m :: * -> *) (t :: (* -> *) -> * -> *) a.
MonadAsync m =>
SVar t m a -> m Bool
postProcessBounded
forall {p}. p -> IO Bool
isWorkFinished
forall (m :: * -> *) a.
(MonadIO m, MonadBaseControl IO m) =>
IORef [Stream m a]
-> State Stream m a -> SVar Stream m a -> Maybe WorkerInfo -> m ()
workLoopLIFO
Just Count
_ -> SVar Stream m a
-> (SVar Stream m a -> m [ChildEvent a])
-> (SVar Stream m a -> m Bool)
-> (SVar Stream m a -> IO Bool)
-> (IORef [Stream m a]
-> State Stream m a -> SVar Stream m a -> Maybe WorkerInfo -> m ())
-> SVar Stream m a
getSVar SVar Stream m a
sv forall (m :: * -> *) (t :: (* -> *) -> * -> *) a.
MonadAsync m =>
SVar t m a -> m [ChildEvent a]
readOutputQBounded
forall (m :: * -> *) (t :: (* -> *) -> * -> *) a.
MonadAsync m =>
SVar t m a -> m Bool
postProcessBounded
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> IO Bool
isWorkFinishedLimited
forall (m :: * -> *) a.
(MonadIO m, MonadBaseControl IO m) =>
IORef [Stream m a]
-> State Stream m a -> SVar Stream m a -> Maybe WorkerInfo -> m ()
workLoopLIFOLimited
Just Rate
_ ->
case forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
State t m a -> Maybe Count
getYieldLimit State Stream m a
st of
Maybe Count
Nothing -> SVar Stream m a
-> (SVar Stream m a -> m [ChildEvent a])
-> (SVar Stream m a -> m Bool)
-> (SVar Stream m a -> IO Bool)
-> (IORef [Stream m a]
-> State Stream m a -> SVar Stream m a -> Maybe WorkerInfo -> m ())
-> SVar Stream m a
getSVar SVar Stream m a
sv forall (m :: * -> *) (t :: (* -> *) -> * -> *) a.
MonadAsync m =>
SVar t m a -> m [ChildEvent a]
readOutputQPaced
forall (m :: * -> *) (t :: (* -> *) -> * -> *) a.
MonadAsync m =>
SVar t m a -> m Bool
postProcessPaced
forall {p}. p -> IO Bool
isWorkFinished
forall (m :: * -> *) a.
(MonadIO m, MonadBaseControl IO m) =>
IORef [Stream m a]
-> State Stream m a -> SVar Stream m a -> Maybe WorkerInfo -> m ()
workLoopLIFO
Just Count
_ -> SVar Stream m a
-> (SVar Stream m a -> m [ChildEvent a])
-> (SVar Stream m a -> m Bool)
-> (SVar Stream m a -> IO Bool)
-> (IORef [Stream m a]
-> State Stream m a -> SVar Stream m a -> Maybe WorkerInfo -> m ())
-> SVar Stream m a
getSVar SVar Stream m a
sv forall (m :: * -> *) (t :: (* -> *) -> * -> *) a.
MonadAsync m =>
SVar t m a -> m [ChildEvent a]
readOutputQPaced
forall (m :: * -> *) (t :: (* -> *) -> * -> *) a.
MonadAsync m =>
SVar t m a -> m Bool
postProcessPaced
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> IO Bool
isWorkFinishedLimited
forall (m :: * -> *) a.
(MonadIO m, MonadBaseControl IO m) =>
IORef [Stream m a]
-> State Stream m a -> SVar Stream m a -> Maybe WorkerInfo -> m ()
workLoopLIFOLimited
in forall (m :: * -> *) a. Monad m => a -> m a
return SVar Stream m a
sv
getFifoSVar :: forall m a. MonadAsync m
=> State Stream m a -> RunInIO m -> IO (SVar Stream m a)
getFifoSVar :: forall (m :: * -> *) a.
MonadAsync m =>
State Stream m a -> RunInIO m -> IO (SVar Stream m a)
getFifoSVar State Stream m a
st RunInIO m
mrun = do
IORef ([ChildEvent a], Int)
outQ <- forall a. a -> IO (IORef a)
newIORef ([], Int
0)
MVar ()
outQMv <- forall a. IO (MVar a)
newEmptyMVar
IORef Int
active <- forall a. a -> IO (IORef a)
newIORef Int
0
IORef Bool
wfw <- forall a. a -> IO (IORef a)
newIORef Bool
False
IORef (Set ThreadId)
running <- forall a. a -> IO (IORef a)
newIORef forall a. Set a
S.empty
LinkedQueue (Stream m a)
q <- forall a. IO (LinkedQueue a)
newQ
Maybe (IORef Count)
yl <- case forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
State t m a -> Maybe Count
getYieldLimit State Stream m a
st of
Maybe Count
Nothing -> forall (m :: * -> *) a. Monad m => a -> m a
return forall a. Maybe a
Nothing
Just Count
x -> forall a. a -> Maybe a
Just forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> forall a. a -> IO (IORef a)
newIORef Count
x
Maybe YieldRateInfo
rateInfo <- forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
State t m a -> IO (Maybe YieldRateInfo)
getYieldRateInfo State Stream m a
st
SVarStats
stats <- IO SVarStats
newSVarStats
ThreadId
tid <- IO ThreadId
myThreadId
let isWorkFinished :: p -> IO Bool
isWorkFinished p
_ = forall a. LinkedQueue a -> IO Bool
nullQ LinkedQueue (Stream m a)
q
let isWorkFinishedLimited :: SVar t m a -> IO Bool
isWorkFinishedLimited SVar t m a
sv = do
Bool
yieldsDone <-
case forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> Maybe (IORef Count)
remainingWork SVar t m a
sv of
Just IORef Count
ref -> do
Count
n <- forall a. IORef a -> IO a
readIORef IORef Count
ref
forall (m :: * -> *) a. Monad m => a -> m a
return (Count
n forall a. Ord a => a -> a -> Bool
<= Count
0)
Maybe (IORef Count)
Nothing -> forall (m :: * -> *) a. Monad m => a -> m a
return Bool
False
Bool
qEmpty <- forall a. LinkedQueue a -> IO Bool
nullQ LinkedQueue (Stream m a)
q
forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ Bool
qEmpty Bool -> Bool -> Bool
|| Bool
yieldsDone
let getSVar :: SVar Stream m a
-> (SVar Stream m a -> m [ChildEvent a])
-> (SVar Stream m a -> m Bool)
-> (SVar Stream m a -> IO Bool)
-> (LinkedQueue (Stream m a)
-> State Stream m a
-> SVar Stream m a
-> Maybe WorkerInfo
-> m())
-> SVar Stream m a
getSVar :: SVar Stream m a
-> (SVar Stream m a -> m [ChildEvent a])
-> (SVar Stream m a -> m Bool)
-> (SVar Stream m a -> IO Bool)
-> (LinkedQueue (Stream m a)
-> State Stream m a -> SVar Stream m a -> Maybe WorkerInfo -> m ())
-> SVar Stream m a
getSVar SVar Stream m a
sv SVar Stream m a -> m [ChildEvent a]
readOutput SVar Stream m a -> m Bool
postProc SVar Stream m a -> IO Bool
workDone LinkedQueue (Stream m a)
-> State Stream m a -> SVar Stream m a -> Maybe WorkerInfo -> m ()
wloop = SVar
{ outputQueue :: IORef ([ChildEvent a], Int)
outputQueue = IORef ([ChildEvent a], Int)
outQ
, outputQueueFromConsumer :: IORef ([ChildEvent a], Int)
outputQueueFromConsumer = forall a. HasCallStack => a
undefined
, remainingWork :: Maybe (IORef Count)
remainingWork = Maybe (IORef Count)
yl
, maxBufferLimit :: Limit
maxBufferLimit = forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
State t m a -> Limit
getMaxBuffer State Stream m a
st
, pushBufferSpace :: IORef Count
pushBufferSpace = forall a. HasCallStack => a
undefined
, pushBufferPolicy :: PushBufferPolicy
pushBufferPolicy = forall a. HasCallStack => a
undefined
, pushBufferMVar :: MVar ()
pushBufferMVar = forall a. HasCallStack => a
undefined
, maxWorkerLimit :: Limit
maxWorkerLimit = forall a. Ord a => a -> a -> a
min (forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
State t m a -> Limit
getMaxThreads State Stream m a
st) (forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
State t m a -> Limit
getMaxBuffer State Stream m a
st)
, yieldRateInfo :: Maybe YieldRateInfo
yieldRateInfo = Maybe YieldRateInfo
rateInfo
, outputDoorBell :: MVar ()
outputDoorBell = MVar ()
outQMv
, outputDoorBellFromConsumer :: MVar ()
outputDoorBellFromConsumer = forall a. HasCallStack => a
undefined
, readOutputQ :: m [ChildEvent a]
readOutputQ = SVar Stream m a -> m [ChildEvent a]
readOutput SVar Stream m a
sv
, postProcess :: m Bool
postProcess = SVar Stream m a -> m Bool
postProc SVar Stream m a
sv
, workerThreads :: IORef (Set ThreadId)
workerThreads = IORef (Set ThreadId)
running
, workLoop :: Maybe WorkerInfo -> m ()
workLoop = LinkedQueue (Stream m a)
-> State Stream m a -> SVar Stream m a -> Maybe WorkerInfo -> m ()
wloop LinkedQueue (Stream m a)
q State Stream m a
st{streamVar :: Maybe (SVar Stream m a)
streamVar = forall a. a -> Maybe a
Just SVar Stream m a
sv} SVar Stream m a
sv
, enqueue :: Stream m a -> IO ()
enqueue = forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> LinkedQueue (t m a) -> t m a -> IO ()
enqueueFIFO SVar Stream m a
sv LinkedQueue (Stream m a)
q
, isWorkDone :: IO Bool
isWorkDone = SVar Stream m a -> IO Bool
workDone SVar Stream m a
sv
, isQueueDone :: IO Bool
isQueueDone = SVar Stream m a -> IO Bool
workDone SVar Stream m a
sv
, needDoorBell :: IORef Bool
needDoorBell = IORef Bool
wfw
, svarStyle :: SVarStyle
svarStyle = SVarStyle
WAsyncVar
, svarStopStyle :: SVarStopStyle
svarStopStyle = SVarStopStyle
StopNone
, svarStopBy :: IORef ThreadId
svarStopBy = forall a. HasCallStack => a
undefined
, svarMrun :: RunInIO m
svarMrun = RunInIO m
mrun
, workerCount :: IORef Int
workerCount = IORef Int
active
, accountThread :: ThreadId -> m ()
accountThread = forall (m :: * -> *) (t :: (* -> *) -> * -> *) a.
MonadIO m =>
SVar t m a -> ThreadId -> m ()
delThread SVar Stream m a
sv
, workerStopMVar :: MVar ()
workerStopMVar = forall a. HasCallStack => a
undefined
, svarRef :: Maybe (IORef ())
svarRef = forall a. Maybe a
Nothing
, svarInspectMode :: Bool
svarInspectMode = forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
State t m a -> Bool
getInspectMode State Stream m a
st
, svarCreator :: ThreadId
svarCreator = ThreadId
tid
, aheadWorkQueue :: IORef ([Stream m a], Int)
aheadWorkQueue = forall a. HasCallStack => a
undefined
, outputHeap :: IORef (Heap (Entry Int (AheadHeapEntry Stream m a)), Maybe Int)
outputHeap = forall a. HasCallStack => a
undefined
, svarStats :: SVarStats
svarStats = SVarStats
stats
}
let sv :: SVar Stream m a
sv =
case forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
State t m a -> Maybe Rate
getStreamRate State Stream m a
st of
Maybe Rate
Nothing ->
case forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
State t m a -> Maybe Count
getYieldLimit State Stream m a
st of
Maybe Count
Nothing -> SVar Stream m a
-> (SVar Stream m a -> m [ChildEvent a])
-> (SVar Stream m a -> m Bool)
-> (SVar Stream m a -> IO Bool)
-> (LinkedQueue (Stream m a)
-> State Stream m a -> SVar Stream m a -> Maybe WorkerInfo -> m ())
-> SVar Stream m a
getSVar SVar Stream m a
sv forall (m :: * -> *) (t :: (* -> *) -> * -> *) a.
MonadAsync m =>
SVar t m a -> m [ChildEvent a]
readOutputQBounded
forall (m :: * -> *) (t :: (* -> *) -> * -> *) a.
MonadAsync m =>
SVar t m a -> m Bool
postProcessBounded
forall {p}. p -> IO Bool
isWorkFinished
forall (m :: * -> *) a.
(MonadIO m, MonadBaseControl IO m) =>
LinkedQueue (Stream m a)
-> State Stream m a -> SVar Stream m a -> Maybe WorkerInfo -> m ()
workLoopFIFO
Just Count
_ -> SVar Stream m a
-> (SVar Stream m a -> m [ChildEvent a])
-> (SVar Stream m a -> m Bool)
-> (SVar Stream m a -> IO Bool)
-> (LinkedQueue (Stream m a)
-> State Stream m a -> SVar Stream m a -> Maybe WorkerInfo -> m ())
-> SVar Stream m a
getSVar SVar Stream m a
sv forall (m :: * -> *) (t :: (* -> *) -> * -> *) a.
MonadAsync m =>
SVar t m a -> m [ChildEvent a]
readOutputQBounded
forall (m :: * -> *) (t :: (* -> *) -> * -> *) a.
MonadAsync m =>
SVar t m a -> m Bool
postProcessBounded
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> IO Bool
isWorkFinishedLimited
forall (m :: * -> *) a.
(MonadIO m, MonadBaseControl IO m) =>
LinkedQueue (Stream m a)
-> State Stream m a -> SVar Stream m a -> Maybe WorkerInfo -> m ()
workLoopFIFOLimited
Just Rate
_ ->
case forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
State t m a -> Maybe Count
getYieldLimit State Stream m a
st of
Maybe Count
Nothing -> SVar Stream m a
-> (SVar Stream m a -> m [ChildEvent a])
-> (SVar Stream m a -> m Bool)
-> (SVar Stream m a -> IO Bool)
-> (LinkedQueue (Stream m a)
-> State Stream m a -> SVar Stream m a -> Maybe WorkerInfo -> m ())
-> SVar Stream m a
getSVar SVar Stream m a
sv forall (m :: * -> *) (t :: (* -> *) -> * -> *) a.
MonadAsync m =>
SVar t m a -> m [ChildEvent a]
readOutputQPaced
forall (m :: * -> *) (t :: (* -> *) -> * -> *) a.
MonadAsync m =>
SVar t m a -> m Bool
postProcessPaced
forall {p}. p -> IO Bool
isWorkFinished
forall (m :: * -> *) a.
(MonadIO m, MonadBaseControl IO m) =>
LinkedQueue (Stream m a)
-> State Stream m a -> SVar Stream m a -> Maybe WorkerInfo -> m ()
workLoopFIFO
Just Count
_ -> SVar Stream m a
-> (SVar Stream m a -> m [ChildEvent a])
-> (SVar Stream m a -> m Bool)
-> (SVar Stream m a -> IO Bool)
-> (LinkedQueue (Stream m a)
-> State Stream m a -> SVar Stream m a -> Maybe WorkerInfo -> m ())
-> SVar Stream m a
getSVar SVar Stream m a
sv forall (m :: * -> *) (t :: (* -> *) -> * -> *) a.
MonadAsync m =>
SVar t m a -> m [ChildEvent a]
readOutputQPaced
forall (m :: * -> *) (t :: (* -> *) -> * -> *) a.
MonadAsync m =>
SVar t m a -> m Bool
postProcessPaced
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> IO Bool
isWorkFinishedLimited
forall (m :: * -> *) a.
(MonadIO m, MonadBaseControl IO m) =>
LinkedQueue (Stream m a)
-> State Stream m a -> SVar Stream m a -> Maybe WorkerInfo -> m ()
workLoopFIFOLimited
in forall (m :: * -> *) a. Monad m => a -> m a
return SVar Stream m a
sv
{-# INLINABLE newAsyncVar #-}
newAsyncVar :: MonadAsync m
=> State Stream m a -> Stream m a -> m (SVar Stream m a)
newAsyncVar :: forall (m :: * -> *) a.
MonadAsync m =>
State Stream m a -> Stream m a -> m (SVar Stream m a)
newAsyncVar State Stream m a
st Stream m a
m = do
RunInIO m
mrun <- forall (m :: * -> *). MonadBaseControl IO m => m (RunInIO m)
captureMonadState
SVar Stream m a
sv <- forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) a.
MonadAsync m =>
State Stream m a -> RunInIO m -> IO (SVar Stream m a)
getLifoSVar State Stream m a
st RunInIO m
mrun
forall (m :: * -> *) (t :: (* -> *) -> * -> *) a.
MonadAsync m =>
SVar t m a -> t m a -> m (SVar t m a)
sendFirstWorker SVar Stream m a
sv Stream m a
m
{-# INLINABLE mkAsyncK #-}
mkAsyncK :: (IsStream t, MonadAsync m) => t m a -> t m a
mkAsyncK :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, MonadAsync m) =>
t m a -> t m a
mkAsyncK t m a
m = forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
IsStream t =>
(forall r.
State Stream m a
-> (a -> t m a -> m r) -> (a -> m r) -> m r -> m r)
-> t m a
mkStream forall a b. (a -> b) -> a -> b
$ \State Stream m a
st a -> t m a -> m r
yld a -> m r
sng m r
stp -> do
SVar Stream m a
sv <- forall (m :: * -> *) a.
MonadAsync m =>
State Stream m a -> Stream m a -> m (SVar Stream m a)
newAsyncVar (forall (t :: (* -> *) -> * -> *) (m :: * -> *) a (n :: * -> *) b.
State t m a -> State t n b
adaptState State Stream m a
st) (forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
IsStream t =>
t m a -> Stream m a
toStream t m a
m)
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a r.
IsStream t =>
State Stream m a
-> (a -> t m a -> m r) -> (a -> m r) -> m r -> t m a -> m r
foldStream State Stream m a
st a -> t m a -> m r
yld a -> m r
sng m r
stp forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) (t :: (* -> *) -> * -> *) a.
(MonadAsync m, IsStream t) =>
SVar Stream m a -> t m a
fromSVar SVar Stream m a
sv
{-# INLINE_NORMAL mkAsyncD #-}
mkAsyncD :: MonadAsync m => D.Stream m a -> D.Stream m a
mkAsyncD :: forall (m :: * -> *) a. MonadAsync m => Stream m a -> Stream m a
mkAsyncD Stream m a
m = forall (m :: * -> *) a s.
(State Stream m a -> s -> m (Step s a)) -> s -> Stream m a
D.Stream State Stream m a
-> Maybe (Stream m a) -> m (Step (Maybe (Stream m a)) a)
step forall a. Maybe a
Nothing
where
step :: State Stream m a
-> Maybe (Stream m a) -> m (Step (Maybe (Stream m a)) a)
step State Stream m a
gst Maybe (Stream m a)
Nothing = do
SVar Stream m a
sv <- forall (m :: * -> *) a.
MonadAsync m =>
State Stream m a -> Stream m a -> m (SVar Stream m a)
newAsyncVar State Stream m a
gst (forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
D.fromStreamD Stream m a
m)
forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ forall s a. s -> Step s a
D.Skip forall a b. (a -> b) -> a -> b
$ forall a. a -> Maybe a
Just forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) (t :: (* -> *) -> * -> *) a.
MonadAsync m =>
SVar t m a -> Stream m a
D.fromSVar SVar Stream m a
sv
step State Stream m a
gst (Just (D.UnStream State Stream m a -> s -> m (Step s a)
step1 s
st)) = do
Step s a
r <- State Stream m a -> s -> m (Step s a)
step1 State Stream m a
gst s
st
forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ case Step s a
r of
D.Yield a
a s
s -> forall s a. a -> s -> Step s a
D.Yield a
a (forall a. a -> Maybe a
Just forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) a s.
(State Stream m a -> s -> m (Step s a)) -> s -> Stream m a
D.Stream State Stream m a -> s -> m (Step s a)
step1 s
s)
D.Skip s
s -> forall s a. s -> Step s a
D.Skip (forall a. a -> Maybe a
Just forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) a s.
(State Stream m a -> s -> m (Step s a)) -> s -> Stream m a
D.Stream State Stream m a -> s -> m (Step s a)
step1 s
s)
Step s a
D.Stop -> forall s a. Step s a
D.Stop
{-# INLINE_NORMAL mkAsync #-}
mkAsync :: (K.IsStream t, MonadAsync m) => t m a -> t m a
mkAsync :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, MonadAsync m) =>
t m a -> t m a
mkAsync = forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
D.fromStreamD forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall (m :: * -> *) a. MonadAsync m => Stream m a -> Stream m a
mkAsyncD forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
D.toStreamD
{-# INLINABLE newWAsyncVar #-}
newWAsyncVar :: MonadAsync m
=> State Stream m a -> Stream m a -> m (SVar Stream m a)
newWAsyncVar :: forall (m :: * -> *) a.
MonadAsync m =>
State Stream m a -> Stream m a -> m (SVar Stream m a)
newWAsyncVar State Stream m a
st Stream m a
m = do
RunInIO m
mrun <- forall (m :: * -> *). MonadBaseControl IO m => m (RunInIO m)
captureMonadState
SVar Stream m a
sv <- forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) a.
MonadAsync m =>
State Stream m a -> RunInIO m -> IO (SVar Stream m a)
getFifoSVar State Stream m a
st RunInIO m
mrun
forall (m :: * -> *) (t :: (* -> *) -> * -> *) a.
MonadAsync m =>
SVar t m a -> t m a -> m (SVar t m a)
sendFirstWorker SVar Stream m a
sv Stream m a
m
forkSVarAsync :: (IsStream t, MonadAsync m)
=> SVarStyle -> t m a -> t m a -> t m a
forkSVarAsync :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, MonadAsync m) =>
SVarStyle -> t m a -> t m a -> t m a
forkSVarAsync SVarStyle
style t m a
m1 t m a
m2 = forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
IsStream t =>
(forall r.
State Stream m a
-> (a -> t m a -> m r) -> (a -> m r) -> m r -> m r)
-> t m a
mkStream forall a b. (a -> b) -> a -> b
$ \State Stream m a
st a -> t m a -> m r
yld a -> m r
sng m r
stp -> do
SVar Stream m a
sv <- case SVarStyle
style of
SVarStyle
AsyncVar -> forall (m :: * -> *) a.
MonadAsync m =>
State Stream m a -> Stream m a -> m (SVar Stream m a)
newAsyncVar State Stream m a
st (forall {t :: (* -> *) -> * -> *} {m :: * -> *} {a}.
(MonadIO m, IsStream t) =>
t m a -> Stream m a -> t m a
concurrently (forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
IsStream t =>
t m a -> Stream m a
toStream t m a
m1) (forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
IsStream t =>
t m a -> Stream m a
toStream t m a
m2))
SVarStyle
WAsyncVar -> forall (m :: * -> *) a.
MonadAsync m =>
State Stream m a -> Stream m a -> m (SVar Stream m a)
newWAsyncVar State Stream m a
st (forall {t :: (* -> *) -> * -> *} {m :: * -> *} {a}.
(MonadIO m, IsStream t) =>
t m a -> Stream m a -> t m a
concurrently (forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
IsStream t =>
t m a -> Stream m a
toStream t m a
m1) (forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
IsStream t =>
t m a -> Stream m a
toStream t m a
m2))
SVarStyle
_ -> forall a. HasCallStack => [Char] -> a
error [Char]
"illegal svar type"
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a r.
IsStream t =>
State Stream m a
-> (a -> t m a -> m r) -> (a -> m r) -> m r -> t m a -> m r
foldStream State Stream m a
st a -> t m a -> m r
yld a -> m r
sng m r
stp forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) (t :: (* -> *) -> * -> *) a.
(MonadAsync m, IsStream t) =>
SVar Stream m a -> t m a
fromSVar SVar Stream m a
sv
where
concurrently :: t m a -> Stream m a -> t m a
concurrently t m a
ma Stream m a
mb = forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
IsStream t =>
(forall r.
State Stream m a
-> (a -> t m a -> m r) -> (a -> m r) -> m r -> m r)
-> t m a
mkStream forall a b. (a -> b) -> a -> b
$ \State Stream m a
st a -> t m a -> m r
yld a -> m r
sng m r
stp -> do
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> t m a -> IO ()
enqueue (forall a. HasCallStack => Maybe a -> a
fromJust forall a b. (a -> b) -> a -> b
$ forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
State t m a -> Maybe (SVar t m a)
streamVar State Stream m a
st) Stream m a
mb
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a r.
IsStream t =>
State Stream m a
-> (a -> t m a -> m r) -> (a -> m r) -> m r -> t m a -> m r
foldStreamShared State Stream m a
st a -> t m a -> m r
yld a -> m r
sng m r
stp t m a
ma
{-# INLINE joinStreamVarAsync #-}
joinStreamVarAsync :: (IsStream t, MonadAsync m)
=> SVarStyle -> t m a -> t m a -> t m a
joinStreamVarAsync :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, MonadAsync m) =>
SVarStyle -> t m a -> t m a -> t m a
joinStreamVarAsync SVarStyle
style t m a
m1 t m a
m2 = forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
IsStream t =>
(forall r.
State Stream m a
-> (a -> t m a -> m r) -> (a -> m r) -> m r -> m r)
-> t m a
mkStream forall a b. (a -> b) -> a -> b
$ \State Stream m a
st a -> t m a -> m r
yld a -> m r
sng m r
stp ->
case forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
State t m a -> Maybe (SVar t m a)
streamVar State Stream m a
st of
Just SVar Stream m a
sv | forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> SVarStyle
svarStyle SVar Stream m a
sv forall a. Eq a => a -> a -> Bool
== SVarStyle
style -> do
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> t m a -> IO ()
enqueue SVar Stream m a
sv (forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
IsStream t =>
t m a -> Stream m a
toStream t m a
m2)
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a r.
IsStream t =>
State Stream m a
-> (a -> t m a -> m r) -> (a -> m r) -> m r -> t m a -> m r
foldStreamShared State Stream m a
st a -> t m a -> m r
yld a -> m r
sng m r
stp t m a
m1
Maybe (SVar Stream m a)
_ -> forall (t :: (* -> *) -> * -> *) (m :: * -> *) a r.
IsStream t =>
State Stream m a
-> (a -> t m a -> m r) -> (a -> m r) -> m r -> t m a -> m r
foldStreamShared State Stream m a
st a -> t m a -> m r
yld a -> m r
sng m r
stp (forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, MonadAsync m) =>
SVarStyle -> t m a -> t m a -> t m a
forkSVarAsync SVarStyle
style t m a
m1 t m a
m2)
{-# INLINE async #-}
async :: (IsStream t, MonadAsync m) => t m a -> t m a -> t m a
async :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, MonadAsync m) =>
t m a -> t m a -> t m a
async = forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, MonadAsync m) =>
SVarStyle -> t m a -> t m a -> t m a
joinStreamVarAsync SVarStyle
AsyncVar
{-# DEPRECATED (<|) "Please use 'async' instead." #-}
{-# INLINE (<|) #-}
(<|) :: (IsStream t, MonadAsync m) => t m a -> t m a -> t m a
<| :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, MonadAsync m) =>
t m a -> t m a -> t m a
(<|) = forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, MonadAsync m) =>
t m a -> t m a -> t m a
async
{-# INLINE consMAsync #-}
{-# SPECIALIZE consMAsync :: IO a -> AsyncT IO a -> AsyncT IO a #-}
consMAsync :: MonadAsync m => m a -> AsyncT m a -> AsyncT m a
consMAsync :: forall (m :: * -> *) a.
MonadAsync m =>
m a -> AsyncT m a -> AsyncT m a
consMAsync m a
m AsyncT m a
r = forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
IsStream t =>
Stream m a -> t m a
fromStream forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) (t :: (* -> *) -> * -> *) a.
(Monad m, IsStream t) =>
m a -> t m a
K.yieldM m a
m forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, MonadAsync m) =>
t m a -> t m a -> t m a
`async` (forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
IsStream t =>
t m a -> Stream m a
toStream AsyncT m a
r)
newtype AsyncT m a = AsyncT {forall (m :: * -> *) a. AsyncT m a -> Stream m a
getAsyncT :: Stream m a}
deriving (forall (m :: * -> *) a. Monad m => m a -> AsyncT m a
forall (t :: (* -> *) -> * -> *).
(forall (m :: * -> *) a. Monad m => m a -> t m a) -> MonadTrans t
lift :: forall (m :: * -> *) a. Monad m => m a -> AsyncT m a
$clift :: forall (m :: * -> *) a. Monad m => m a -> AsyncT m a
MonadTrans)
type Async = AsyncT IO
asyncly :: IsStream t => AsyncT m a -> t m a
asyncly :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
IsStream t =>
AsyncT m a -> t m a
asyncly = forall (t1 :: (* -> *) -> * -> *) (t2 :: (* -> *) -> * -> *)
(m :: * -> *) a.
(IsStream t1, IsStream t2) =>
t1 m a -> t2 m a
adapt
instance IsStream AsyncT where
toStream :: forall (m :: * -> *) a. AsyncT m a -> Stream m a
toStream = forall (m :: * -> *) a. AsyncT m a -> Stream m a
getAsyncT
fromStream :: forall (m :: * -> *) a. Stream m a -> AsyncT m a
fromStream = forall (m :: * -> *) a. Stream m a -> AsyncT m a
AsyncT
consM :: forall (m :: * -> *) a.
MonadAsync m =>
m a -> AsyncT m a -> AsyncT m a
consM = forall (m :: * -> *) a.
MonadAsync m =>
m a -> AsyncT m a -> AsyncT m a
consMAsync
|: :: forall (m :: * -> *) a.
MonadAsync m =>
m a -> AsyncT m a -> AsyncT m a
(|:) = forall (m :: * -> *) a.
MonadAsync m =>
m a -> AsyncT m a -> AsyncT m a
consMAsync
{-# INLINE mappendAsync #-}
{-# SPECIALIZE mappendAsync :: AsyncT IO a -> AsyncT IO a -> AsyncT IO a #-}
mappendAsync :: MonadAsync m => AsyncT m a -> AsyncT m a -> AsyncT m a
mappendAsync :: forall (m :: * -> *) a.
MonadAsync m =>
AsyncT m a -> AsyncT m a -> AsyncT m a
mappendAsync AsyncT m a
m1 AsyncT m a
m2 = forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
IsStream t =>
Stream m a -> t m a
fromStream forall a b. (a -> b) -> a -> b
$ forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, MonadAsync m) =>
t m a -> t m a -> t m a
async (forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
IsStream t =>
t m a -> Stream m a
toStream AsyncT m a
m1) (forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
IsStream t =>
t m a -> Stream m a
toStream AsyncT m a
m2)
instance MonadAsync m => Semigroup (AsyncT m a) where
<> :: AsyncT m a -> AsyncT m a -> AsyncT m a
(<>) = forall (m :: * -> *) a.
MonadAsync m =>
AsyncT m a -> AsyncT m a -> AsyncT m a
mappendAsync
instance MonadAsync m => Monoid (AsyncT m a) where
mempty :: AsyncT m a
mempty = forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
IsStream t =>
t m a
K.nil
mappend :: AsyncT m a -> AsyncT m a -> AsyncT m a
mappend = forall a. Semigroup a => a -> a -> a
(<>)
{-# INLINE apAsync #-}
{-# SPECIALIZE apAsync :: AsyncT IO (a -> b) -> AsyncT IO a -> AsyncT IO b #-}
apAsync :: MonadAsync m => AsyncT m (a -> b) -> AsyncT m a -> AsyncT m b
apAsync :: forall (m :: * -> *) a b.
MonadAsync m =>
AsyncT m (a -> b) -> AsyncT m a -> AsyncT m b
apAsync (AsyncT Stream m (a -> b)
m1) (AsyncT Stream m a
m2) =
let f :: (a -> b) -> Stream m b
f a -> b
x1 = forall (t :: (* -> *) -> * -> *) (m :: * -> *) a b.
IsStream t =>
(forall c. t m c -> t m c -> t m c)
-> (a -> t m b) -> t m a -> t m b
K.concatMapBy forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, MonadAsync m) =>
t m a -> t m a -> t m a
async (forall (f :: * -> *) a. Applicative f => a -> f a
pure forall b c a. (b -> c) -> (a -> b) -> a -> c
. a -> b
x1) Stream m a
m2
in forall (m :: * -> *) a. Stream m a -> AsyncT m a
AsyncT forall a b. (a -> b) -> a -> b
$ forall (t :: (* -> *) -> * -> *) (m :: * -> *) a b.
IsStream t =>
(forall c. t m c -> t m c -> t m c)
-> (a -> t m b) -> t m a -> t m b
K.concatMapBy forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, MonadAsync m) =>
t m a -> t m a -> t m a
async forall {b}. (a -> b) -> Stream m b
f Stream m (a -> b)
m1
instance (Monad m, MonadAsync m) => Applicative (AsyncT m) where
{-# INLINE pure #-}
pure :: forall a. a -> AsyncT m a
pure = forall (m :: * -> *) a. Stream m a -> AsyncT m a
AsyncT forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall (t :: (* -> *) -> * -> *) a (m :: * -> *).
IsStream t =>
a -> t m a
K.yield
{-# INLINE (<*>) #-}
<*> :: forall a b. AsyncT m (a -> b) -> AsyncT m a -> AsyncT m b
(<*>) = forall (m :: * -> *) a b.
MonadAsync m =>
AsyncT m (a -> b) -> AsyncT m a -> AsyncT m b
apAsync
{-# INLINE bindAsync #-}
{-# SPECIALIZE bindAsync :: AsyncT IO a -> (a -> AsyncT IO b) -> AsyncT IO b #-}
bindAsync :: MonadAsync m => AsyncT m a -> (a -> AsyncT m b) -> AsyncT m b
bindAsync :: forall (m :: * -> *) a b.
MonadAsync m =>
AsyncT m a -> (a -> AsyncT m b) -> AsyncT m b
bindAsync AsyncT m a
m a -> AsyncT m b
f = forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
IsStream t =>
Stream m a -> t m a
fromStream forall a b. (a -> b) -> a -> b
$ forall (t :: (* -> *) -> * -> *) (m :: * -> *) a b.
IsStream t =>
(forall c. t m c -> t m c -> t m c)
-> t m a -> (a -> t m b) -> t m b
K.bindWith forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, MonadAsync m) =>
t m a -> t m a -> t m a
async (forall (t1 :: (* -> *) -> * -> *) (t2 :: (* -> *) -> * -> *)
(m :: * -> *) a.
(IsStream t1, IsStream t2) =>
t1 m a -> t2 m a
adapt AsyncT m a
m) (\a
a -> forall (t1 :: (* -> *) -> * -> *) (t2 :: (* -> *) -> * -> *)
(m :: * -> *) a.
(IsStream t1, IsStream t2) =>
t1 m a -> t2 m a
adapt forall a b. (a -> b) -> a -> b
$ a -> AsyncT m b
f a
a)
instance MonadAsync m => Monad (AsyncT m) where
return :: forall a. a -> AsyncT m a
return = forall (f :: * -> *) a. Applicative f => a -> f a
pure
>>= :: forall a b. AsyncT m a -> (a -> AsyncT m b) -> AsyncT m b
(>>=) = forall (m :: * -> *) a b.
MonadAsync m =>
AsyncT m a -> (a -> AsyncT m b) -> AsyncT m b
bindAsync
MONAD_COMMON_INSTANCES(AsyncT, MONADPARALLEL)
{-# INLINE consMWAsync #-}
{-# SPECIALIZE consMWAsync :: IO a -> WAsyncT IO a -> WAsyncT IO a #-}
consMWAsync :: MonadAsync m => m a -> WAsyncT m a -> WAsyncT m a
consMWAsync :: forall (m :: * -> *) a.
MonadAsync m =>
m a -> WAsyncT m a -> WAsyncT m a
consMWAsync m a
m WAsyncT m a
r = forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
IsStream t =>
Stream m a -> t m a
fromStream forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) (t :: (* -> *) -> * -> *) a.
(Monad m, IsStream t) =>
m a -> t m a
K.yieldM m a
m forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, MonadAsync m) =>
t m a -> t m a -> t m a
`wAsync` (forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
IsStream t =>
t m a -> Stream m a
toStream WAsyncT m a
r)
{-# INLINE wAsync #-}
wAsync :: (IsStream t, MonadAsync m) => t m a -> t m a -> t m a
wAsync :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, MonadAsync m) =>
t m a -> t m a -> t m a
wAsync = forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, MonadAsync m) =>
SVarStyle -> t m a -> t m a -> t m a
joinStreamVarAsync SVarStyle
WAsyncVar
newtype WAsyncT m a = WAsyncT {forall (m :: * -> *) a. WAsyncT m a -> Stream m a
getWAsyncT :: Stream m a}
deriving (forall (m :: * -> *) a. Monad m => m a -> WAsyncT m a
forall (t :: (* -> *) -> * -> *).
(forall (m :: * -> *) a. Monad m => m a -> t m a) -> MonadTrans t
lift :: forall (m :: * -> *) a. Monad m => m a -> WAsyncT m a
$clift :: forall (m :: * -> *) a. Monad m => m a -> WAsyncT m a
MonadTrans)
type WAsync = WAsyncT IO
wAsyncly :: IsStream t => WAsyncT m a -> t m a
wAsyncly :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
IsStream t =>
WAsyncT m a -> t m a
wAsyncly = forall (t1 :: (* -> *) -> * -> *) (t2 :: (* -> *) -> * -> *)
(m :: * -> *) a.
(IsStream t1, IsStream t2) =>
t1 m a -> t2 m a
adapt
instance IsStream WAsyncT where
toStream :: forall (m :: * -> *) a. WAsyncT m a -> Stream m a
toStream = forall (m :: * -> *) a. WAsyncT m a -> Stream m a
getWAsyncT
fromStream :: forall (m :: * -> *) a. Stream m a -> WAsyncT m a
fromStream = forall (m :: * -> *) a. Stream m a -> WAsyncT m a
WAsyncT
consM :: forall (m :: * -> *) a.
MonadAsync m =>
m a -> WAsyncT m a -> WAsyncT m a
consM = forall (m :: * -> *) a.
MonadAsync m =>
m a -> WAsyncT m a -> WAsyncT m a
consMWAsync
|: :: forall (m :: * -> *) a.
MonadAsync m =>
m a -> WAsyncT m a -> WAsyncT m a
(|:) = forall (m :: * -> *) a.
MonadAsync m =>
m a -> WAsyncT m a -> WAsyncT m a
consMWAsync
{-# INLINE mappendWAsync #-}
{-# SPECIALIZE mappendWAsync :: WAsyncT IO a -> WAsyncT IO a -> WAsyncT IO a #-}
mappendWAsync :: MonadAsync m => WAsyncT m a -> WAsyncT m a -> WAsyncT m a
mappendWAsync :: forall (m :: * -> *) a.
MonadAsync m =>
WAsyncT m a -> WAsyncT m a -> WAsyncT m a
mappendWAsync WAsyncT m a
m1 WAsyncT m a
m2 = forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
IsStream t =>
Stream m a -> t m a
fromStream forall a b. (a -> b) -> a -> b
$ forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, MonadAsync m) =>
t m a -> t m a -> t m a
wAsync (forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
IsStream t =>
t m a -> Stream m a
toStream WAsyncT m a
m1) (forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
IsStream t =>
t m a -> Stream m a
toStream WAsyncT m a
m2)
instance MonadAsync m => Semigroup (WAsyncT m a) where
<> :: WAsyncT m a -> WAsyncT m a -> WAsyncT m a
(<>) = forall (m :: * -> *) a.
MonadAsync m =>
WAsyncT m a -> WAsyncT m a -> WAsyncT m a
mappendWAsync
instance MonadAsync m => Monoid (WAsyncT m a) where
mempty :: WAsyncT m a
mempty = forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
IsStream t =>
t m a
K.nil
mappend :: WAsyncT m a -> WAsyncT m a -> WAsyncT m a
mappend = forall a. Semigroup a => a -> a -> a
(<>)
{-# INLINE apWAsync #-}
{-# SPECIALIZE apWAsync :: WAsyncT IO (a -> b) -> WAsyncT IO a -> WAsyncT IO b #-}
apWAsync :: MonadAsync m => WAsyncT m (a -> b) -> WAsyncT m a -> WAsyncT m b
apWAsync :: forall (m :: * -> *) a b.
MonadAsync m =>
WAsyncT m (a -> b) -> WAsyncT m a -> WAsyncT m b
apWAsync (WAsyncT Stream m (a -> b)
m1) (WAsyncT Stream m a
m2) =
let f :: (a -> b) -> Stream m b
f a -> b
x1 = forall (t :: (* -> *) -> * -> *) (m :: * -> *) a b.
IsStream t =>
(forall c. t m c -> t m c -> t m c)
-> (a -> t m b) -> t m a -> t m b
K.concatMapBy forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, MonadAsync m) =>
t m a -> t m a -> t m a
wAsync (forall (f :: * -> *) a. Applicative f => a -> f a
pure forall b c a. (b -> c) -> (a -> b) -> a -> c
. a -> b
x1) Stream m a
m2
in forall (m :: * -> *) a. Stream m a -> WAsyncT m a
WAsyncT forall a b. (a -> b) -> a -> b
$ forall (t :: (* -> *) -> * -> *) (m :: * -> *) a b.
IsStream t =>
(forall c. t m c -> t m c -> t m c)
-> (a -> t m b) -> t m a -> t m b
K.concatMapBy forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, MonadAsync m) =>
t m a -> t m a -> t m a
wAsync forall {b}. (a -> b) -> Stream m b
f Stream m (a -> b)
m1
instance (Monad m, MonadAsync m) => Applicative (WAsyncT m) where
pure :: forall a. a -> WAsyncT m a
pure = forall (m :: * -> *) a. Stream m a -> WAsyncT m a
WAsyncT forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall (t :: (* -> *) -> * -> *) a (m :: * -> *).
IsStream t =>
a -> t m a
K.yield
<*> :: forall a b. WAsyncT m (a -> b) -> WAsyncT m a -> WAsyncT m b
(<*>) = forall (m :: * -> *) a b.
MonadAsync m =>
WAsyncT m (a -> b) -> WAsyncT m a -> WAsyncT m b
apWAsync
{-# INLINE bindWAsync #-}
{-# SPECIALIZE bindWAsync :: WAsyncT IO a -> (a -> WAsyncT IO b) -> WAsyncT IO b #-}
bindWAsync :: MonadAsync m => WAsyncT m a -> (a -> WAsyncT m b) -> WAsyncT m b
bindWAsync :: forall (m :: * -> *) a b.
MonadAsync m =>
WAsyncT m a -> (a -> WAsyncT m b) -> WAsyncT m b
bindWAsync WAsyncT m a
m a -> WAsyncT m b
f = forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
IsStream t =>
Stream m a -> t m a
fromStream forall a b. (a -> b) -> a -> b
$ forall (t :: (* -> *) -> * -> *) (m :: * -> *) a b.
IsStream t =>
(forall c. t m c -> t m c -> t m c)
-> t m a -> (a -> t m b) -> t m b
K.bindWith forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, MonadAsync m) =>
t m a -> t m a -> t m a
wAsync (forall (t1 :: (* -> *) -> * -> *) (t2 :: (* -> *) -> * -> *)
(m :: * -> *) a.
(IsStream t1, IsStream t2) =>
t1 m a -> t2 m a
adapt WAsyncT m a
m) (\a
a -> forall (t1 :: (* -> *) -> * -> *) (t2 :: (* -> *) -> * -> *)
(m :: * -> *) a.
(IsStream t1, IsStream t2) =>
t1 m a -> t2 m a
adapt forall a b. (a -> b) -> a -> b
$ a -> WAsyncT m b
f a
a)
instance MonadAsync m => Monad (WAsyncT m) where
return :: forall a. a -> WAsyncT m a
return = forall (f :: * -> *) a. Applicative f => a -> f a
pure
>>= :: forall a b. WAsyncT m a -> (a -> WAsyncT m b) -> WAsyncT m b
(>>=) = forall (m :: * -> *) a b.
MonadAsync m =>
WAsyncT m a -> (a -> WAsyncT m b) -> WAsyncT m b
bindWAsync
MONAD_COMMON_INSTANCES(WAsyncT, MONADPARALLEL)