module Streamly.Internal.Data.Stream.Concurrent.Channel.Interleave
(
newChannel
)
where
#include "inline.hs"
import Control.Concurrent (myThreadId)
import Control.Concurrent.MVar (newEmptyMVar)
import Control.Monad.IO.Class (MonadIO(liftIO))
import Data.Concurrent.Queue.MichaelScott (LinkedQueue, newQ, nullQ, tryPopR, pushL)
import Data.IORef (newIORef, readIORef)
import Streamly.Internal.Control.Concurrent
(MonadRunInIO, MonadAsync, RunInIO(..), askRunInIO, restoreM)
import Streamly.Internal.Data.Stream.Channel.Dispatcher (delThread)
import qualified Data.Set as Set
import qualified Streamly.Internal.Data.Stream.StreamK.Type as K
import Streamly.Internal.Data.Stream.Concurrent.Channel.Consumer
import Streamly.Internal.Data.Stream.Concurrent.Channel.Type
import Streamly.Internal.Data.Stream.Channel.Types
data WorkerStatus = Continue | Suspend
{-# INLINE enqueueFIFO #-}
enqueueFIFO ::
Channel m a
-> LinkedQueue (RunInIO m, K.StreamK m a)
-> (RunInIO m, K.StreamK m a)
-> IO ()
enqueueFIFO :: forall (m :: * -> *) a.
Channel m a
-> LinkedQueue (RunInIO m, StreamK m a)
-> (RunInIO m, StreamK m a)
-> IO ()
enqueueFIFO Channel m a
sv LinkedQueue (RunInIO m, StreamK m a)
q (RunInIO m, StreamK m a)
m = do
forall a. LinkedQueue a -> a -> IO ()
pushL LinkedQueue (RunInIO m, StreamK m a)
q (RunInIO m, StreamK m a)
m
IORef Bool -> MVar () -> IO ()
ringDoorBell (forall (m :: * -> *) a. Channel m a -> IORef Bool
doorBellOnWorkQ Channel m a
sv) (forall (m :: * -> *) a. Channel m a -> MVar ()
outputDoorBell Channel m a
sv)
{-# INLINE workLoopFIFO #-}
workLoopFIFO
:: MonadRunInIO m
=> LinkedQueue (RunInIO m, K.StreamK m a)
-> Channel m a
-> Maybe WorkerInfo
-> m ()
workLoopFIFO :: forall (m :: * -> *) a.
MonadRunInIO m =>
LinkedQueue (RunInIO m, StreamK m a)
-> Channel m a -> Maybe WorkerInfo -> m ()
workLoopFIFO LinkedQueue (RunInIO m, StreamK m a)
q Channel m a
sv Maybe WorkerInfo
winfo = m ()
run
where
run :: m ()
run = do
Maybe (RunInIO m, StreamK m a)
work <- forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall a. LinkedQueue a -> IO (Maybe a)
tryPopR LinkedQueue (RunInIO m, StreamK m a)
q
case Maybe (RunInIO m, StreamK m a)
work of
Maybe (RunInIO m, StreamK m a)
Nothing -> forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) a. Channel m a -> Maybe WorkerInfo -> IO ()
stop Channel m a
sv Maybe WorkerInfo
winfo
Just (RunInIO forall b. m b -> IO (StM m b)
runin, StreamK m a
m) -> do
StM m WorkerStatus
r <- forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO
forall a b. (a -> b) -> a -> b
$ forall b. m b -> IO (StM m b)
runin
forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) a r.
State StreamK m a
-> (a -> StreamK m a -> m r)
-> (a -> m r)
-> m r
-> StreamK m a
-> m r
K.foldStreamShared
forall a. HasCallStack => a
undefined a -> StreamK m a -> m WorkerStatus
yieldk forall {m :: * -> *}. MonadIO m => a -> m WorkerStatus
single (forall (m :: * -> *) a. Monad m => a -> m a
return WorkerStatus
Continue) StreamK m a
m
WorkerStatus
res <- forall (b :: * -> *) (m :: * -> *) a.
MonadBaseControl b m =>
StM m a -> m a
restoreM StM m WorkerStatus
r
case WorkerStatus
res of
WorkerStatus
Continue -> m ()
run
WorkerStatus
Suspend -> forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) a. Channel m a -> Maybe WorkerInfo -> IO ()
stop Channel m a
sv Maybe WorkerInfo
winfo
single :: a -> m WorkerStatus
single a
a = do
Bool
res <- forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) a.
Channel m a -> Maybe WorkerInfo -> a -> IO Bool
yield Channel m a
sv Maybe WorkerInfo
winfo a
a
forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ if Bool
res then WorkerStatus
Continue else WorkerStatus
Suspend
yieldk :: a -> StreamK m a -> m WorkerStatus
yieldk a
a StreamK m a
r = do
Bool
res <- forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) a.
Channel m a -> Maybe WorkerInfo -> a -> IO Bool
yield Channel m a
sv Maybe WorkerInfo
winfo a
a
RunInIO m
runInIO <- forall (m :: * -> *). MonadRunInIO m => m (RunInIO m)
askRunInIO
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) a.
Channel m a
-> LinkedQueue (RunInIO m, StreamK m a)
-> (RunInIO m, StreamK m a)
-> IO ()
enqueueFIFO Channel m a
sv LinkedQueue (RunInIO m, StreamK m a)
q (RunInIO m
runInIO, StreamK m a
r)
forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ if Bool
res then WorkerStatus
Continue else WorkerStatus
Suspend
{-# INLINE workLoopFIFOLimited #-}
workLoopFIFOLimited
:: forall m a. MonadRunInIO m
=> LinkedQueue (RunInIO m, K.StreamK m a)
-> Channel m a
-> Maybe WorkerInfo
-> m ()
workLoopFIFOLimited :: forall (m :: * -> *) a.
MonadRunInIO m =>
LinkedQueue (RunInIO m, StreamK m a)
-> Channel m a -> Maybe WorkerInfo -> m ()
workLoopFIFOLimited LinkedQueue (RunInIO m, StreamK m a)
q Channel m a
sv Maybe WorkerInfo
winfo = m ()
run
where
incrContinue :: m WorkerStatus
incrContinue =
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (Maybe (IORef Count) -> IO ()
incrementYieldLimit (forall (m :: * -> *) a. Channel m a -> Maybe (IORef Count)
remainingWork Channel m a
sv)) forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> forall (m :: * -> *) a. Monad m => a -> m a
return WorkerStatus
Continue
run :: m ()
run = do
Maybe (RunInIO m, StreamK m a)
work <- forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall a. LinkedQueue a -> IO (Maybe a)
tryPopR LinkedQueue (RunInIO m, StreamK m a)
q
case Maybe (RunInIO m, StreamK m a)
work of
Maybe (RunInIO m, StreamK m a)
Nothing -> forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) a. Channel m a -> Maybe WorkerInfo -> IO ()
stop Channel m a
sv Maybe WorkerInfo
winfo
Just (RunInIO forall b. m b -> IO (StM m b)
runin, StreamK m a
m) -> do
Bool
yieldLimitOk <- forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ Maybe (IORef Count) -> IO Bool
decrementYieldLimit (forall (m :: * -> *) a. Channel m a -> Maybe (IORef Count)
remainingWork Channel m a
sv)
if Bool
yieldLimitOk
then do
StM m WorkerStatus
r <- forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO
forall a b. (a -> b) -> a -> b
$ forall b. m b -> IO (StM m b)
runin
forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) a r.
State StreamK m a
-> (a -> StreamK m a -> m r)
-> (a -> m r)
-> m r
-> StreamK m a
-> m r
K.foldStreamShared
forall a. HasCallStack => a
undefined a -> StreamK m a -> m WorkerStatus
yieldk forall {m :: * -> *}. MonadIO m => a -> m WorkerStatus
single m WorkerStatus
incrContinue StreamK m a
m
WorkerStatus
res <- forall (b :: * -> *) (m :: * -> *) a.
MonadBaseControl b m =>
StM m a -> m a
restoreM StM m WorkerStatus
r
case WorkerStatus
res of
WorkerStatus
Continue -> m ()
run
WorkerStatus
Suspend -> forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) a. Channel m a -> Maybe WorkerInfo -> IO ()
stop Channel m a
sv Maybe WorkerInfo
winfo
else forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ do
forall (m :: * -> *) a.
Channel m a
-> LinkedQueue (RunInIO m, StreamK m a)
-> (RunInIO m, StreamK m a)
-> IO ()
enqueueFIFO Channel m a
sv LinkedQueue (RunInIO m, StreamK m a)
q (forall (m :: * -> *). (forall b. m b -> IO (StM m b)) -> RunInIO m
RunInIO forall b. m b -> IO (StM m b)
runin, StreamK m a
m)
Maybe (IORef Count) -> IO ()
incrementYieldLimit (forall (m :: * -> *) a. Channel m a -> Maybe (IORef Count)
remainingWork Channel m a
sv)
forall (m :: * -> *) a. Channel m a -> Maybe WorkerInfo -> IO ()
stop Channel m a
sv Maybe WorkerInfo
winfo
single :: a -> m WorkerStatus
single a
a = do
Bool
res <- forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) a.
Channel m a -> Maybe WorkerInfo -> a -> IO Bool
yield Channel m a
sv Maybe WorkerInfo
winfo a
a
forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ if Bool
res then WorkerStatus
Continue else WorkerStatus
Suspend
yieldk :: a -> StreamK m a -> m WorkerStatus
yieldk a
a StreamK m a
r = do
Bool
res <- forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) a.
Channel m a -> Maybe WorkerInfo -> a -> IO Bool
yield Channel m a
sv Maybe WorkerInfo
winfo a
a
RunInIO m
runInIO <- forall (m :: * -> *). MonadRunInIO m => m (RunInIO m)
askRunInIO
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) a.
Channel m a
-> LinkedQueue (RunInIO m, StreamK m a)
-> (RunInIO m, StreamK m a)
-> IO ()
enqueueFIFO Channel m a
sv LinkedQueue (RunInIO m, StreamK m a)
q (RunInIO m
runInIO, StreamK m a
r)
Bool
yieldLimitOk <- forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ Maybe (IORef Count) -> IO Bool
decrementYieldLimit (forall (m :: * -> *) a. Channel m a -> Maybe (IORef Count)
remainingWork Channel m a
sv)
if Bool
res Bool -> Bool -> Bool
&& Bool
yieldLimitOk
then forall (m :: * -> *) a. Monad m => a -> m a
return WorkerStatus
Continue
else forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ do
Maybe (IORef Count) -> IO ()
incrementYieldLimit (forall (m :: * -> *) a. Channel m a -> Maybe (IORef Count)
remainingWork Channel m a
sv)
forall (m :: * -> *) a. Monad m => a -> m a
return WorkerStatus
Suspend
getFifoSVar :: forall m a. MonadRunInIO m =>
RunInIO m -> Config -> IO (Channel m a)
getFifoSVar :: forall (m :: * -> *) a.
MonadRunInIO m =>
RunInIO m -> Config -> IO (Channel m a)
getFifoSVar RunInIO m
mrun Config
cfg = do
IORef ([ChildEvent a], Int)
outQ <- forall a. a -> IO (IORef a)
newIORef ([], Int
0)
MVar ()
outQMv <- forall a. IO (MVar a)
newEmptyMVar
IORef Int
active <- forall a. a -> IO (IORef a)
newIORef Int
0
IORef Bool
wfw <- forall a. a -> IO (IORef a)
newIORef Bool
False
IORef (Set ThreadId)
running <- forall a. a -> IO (IORef a)
newIORef forall a. Set a
Set.empty
LinkedQueue (RunInIO m, StreamK m a)
q <- forall a. IO (LinkedQueue a)
newQ
Maybe (IORef Count)
yl <- case Config -> Maybe Count
getYieldLimit Config
cfg of
Maybe Count
Nothing -> forall (m :: * -> *) a. Monad m => a -> m a
return forall a. Maybe a
Nothing
Just Count
x -> forall a. a -> Maybe a
Just forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> forall a. a -> IO (IORef a)
newIORef Count
x
Maybe YieldRateInfo
rateInfo <- Config -> IO (Maybe YieldRateInfo)
newRateInfo Config
cfg
SVarStats
stats <- IO SVarStats
newSVarStats
ThreadId
tid <- IO ThreadId
myThreadId
let isWorkFinished :: p -> IO Bool
isWorkFinished p
_ = forall a. LinkedQueue a -> IO Bool
nullQ LinkedQueue (RunInIO m, StreamK m a)
q
let isWorkFinishedLimited :: Channel m a -> IO Bool
isWorkFinishedLimited Channel m a
sv = do
Bool
yieldsDone <-
case forall (m :: * -> *) a. Channel m a -> Maybe (IORef Count)
remainingWork Channel m a
sv of
Just IORef Count
ref -> do
Count
n <- forall a. IORef a -> IO a
readIORef IORef Count
ref
forall (m :: * -> *) a. Monad m => a -> m a
return (Count
n forall a. Ord a => a -> a -> Bool
<= Count
0)
Maybe (IORef Count)
Nothing -> forall (m :: * -> *) a. Monad m => a -> m a
return Bool
False
Bool
qEmpty <- forall a. LinkedQueue a -> IO Bool
nullQ LinkedQueue (RunInIO m, StreamK m a)
q
forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ Bool
qEmpty Bool -> Bool -> Bool
|| Bool
yieldsDone
let getSVar :: Channel m a
-> (Channel m a -> m [ChildEvent a])
-> (Channel m a -> m Bool)
-> (Channel m a -> IO Bool)
-> (LinkedQueue (RunInIO m, K.StreamK m a)
-> Channel m a
-> Maybe WorkerInfo
-> m())
-> Channel m a
getSVar :: Channel m a
-> (Channel m a -> m [ChildEvent a])
-> (Channel m a -> m Bool)
-> (Channel m a -> IO Bool)
-> (LinkedQueue (RunInIO m, StreamK m a)
-> Channel m a -> Maybe WorkerInfo -> m ())
-> Channel m a
getSVar Channel m a
sv Channel m a -> m [ChildEvent a]
readOutput Channel m a -> m Bool
postProc Channel m a -> IO Bool
workDone LinkedQueue (RunInIO m, StreamK m a)
-> Channel m a -> Maybe WorkerInfo -> m ()
wloop = Channel
{ outputQueue :: IORef ([ChildEvent a], Int)
outputQueue = IORef ([ChildEvent a], Int)
outQ
, remainingWork :: Maybe (IORef Count)
remainingWork = Maybe (IORef Count)
yl
, maxBufferLimit :: Limit
maxBufferLimit = Config -> Limit
getMaxBuffer Config
cfg
, maxWorkerLimit :: Limit
maxWorkerLimit = forall a. Ord a => a -> a -> a
min (Config -> Limit
getMaxThreads Config
cfg) (Config -> Limit
getMaxBuffer Config
cfg)
, yieldRateInfo :: Maybe YieldRateInfo
yieldRateInfo = Maybe YieldRateInfo
rateInfo
, outputDoorBell :: MVar ()
outputDoorBell = MVar ()
outQMv
, readOutputQ :: m [ChildEvent a]
readOutputQ = Channel m a -> m [ChildEvent a]
readOutput Channel m a
sv
, postProcess :: m Bool
postProcess = Channel m a -> m Bool
postProc Channel m a
sv
, workerThreads :: IORef (Set ThreadId)
workerThreads = IORef (Set ThreadId)
running
, workLoop :: Maybe WorkerInfo -> m ()
workLoop = LinkedQueue (RunInIO m, StreamK m a)
-> Channel m a -> Maybe WorkerInfo -> m ()
wloop LinkedQueue (RunInIO m, StreamK m a)
q Channel m a
sv
, enqueue :: Bool -> (RunInIO m, StreamK m a) -> IO ()
enqueue = \Bool
_ -> forall (m :: * -> *) a.
Channel m a
-> LinkedQueue (RunInIO m, StreamK m a)
-> (RunInIO m, StreamK m a)
-> IO ()
enqueueFIFO Channel m a
sv LinkedQueue (RunInIO m, StreamK m a)
q
, eagerDispatch :: m ()
eagerDispatch = forall (m :: * -> *) a. Monad m => a -> m a
return ()
, isWorkDone :: IO Bool
isWorkDone = Channel m a -> IO Bool
workDone Channel m a
sv
, isQueueDone :: IO Bool
isQueueDone = Channel m a -> IO Bool
workDone Channel m a
sv
, doorBellOnWorkQ :: IORef Bool
doorBellOnWorkQ = IORef Bool
wfw
, svarMrun :: RunInIO m
svarMrun = RunInIO m
mrun
, workerCount :: IORef Int
workerCount = IORef Int
active
, accountThread :: ThreadId -> m ()
accountThread = forall (m :: * -> *).
MonadIO m =>
IORef (Set ThreadId) -> ThreadId -> m ()
delThread IORef (Set ThreadId)
running
, workerStopMVar :: MVar ()
workerStopMVar = forall a. HasCallStack => a
undefined
, svarRef :: Maybe (IORef ())
svarRef = forall a. Maybe a
Nothing
, svarInspectMode :: Bool
svarInspectMode = Config -> Bool
getInspectMode Config
cfg
, svarCreator :: ThreadId
svarCreator = ThreadId
tid
, svarStats :: SVarStats
svarStats = SVarStats
stats
}
let sv :: Channel m a
sv =
case Config -> Maybe Rate
getStreamRate Config
cfg of
Maybe Rate
Nothing ->
case Config -> Maybe Count
getYieldLimit Config
cfg of
Maybe Count
Nothing -> Channel m a
-> (Channel m a -> m [ChildEvent a])
-> (Channel m a -> m Bool)
-> (Channel m a -> IO Bool)
-> (LinkedQueue (RunInIO m, StreamK m a)
-> Channel m a -> Maybe WorkerInfo -> m ())
-> Channel m a
getSVar Channel m a
sv (forall (m :: * -> *) a.
MonadRunInIO m =>
Bool -> Channel m a -> m [ChildEvent a]
readOutputQBounded Bool
False)
forall (m :: * -> *) a. MonadRunInIO m => Channel m a -> m Bool
postProcessBounded
forall {p}. p -> IO Bool
isWorkFinished
forall (m :: * -> *) a.
MonadRunInIO m =>
LinkedQueue (RunInIO m, StreamK m a)
-> Channel m a -> Maybe WorkerInfo -> m ()
workLoopFIFO
Just Count
_ -> Channel m a
-> (Channel m a -> m [ChildEvent a])
-> (Channel m a -> m Bool)
-> (Channel m a -> IO Bool)
-> (LinkedQueue (RunInIO m, StreamK m a)
-> Channel m a -> Maybe WorkerInfo -> m ())
-> Channel m a
getSVar Channel m a
sv (forall (m :: * -> *) a.
MonadRunInIO m =>
Bool -> Channel m a -> m [ChildEvent a]
readOutputQBounded Bool
False)
forall (m :: * -> *) a. MonadRunInIO m => Channel m a -> m Bool
postProcessBounded
forall {m :: * -> *} {a}. Channel m a -> IO Bool
isWorkFinishedLimited
forall (m :: * -> *) a.
MonadRunInIO m =>
LinkedQueue (RunInIO m, StreamK m a)
-> Channel m a -> Maybe WorkerInfo -> m ()
workLoopFIFOLimited
Just Rate
_ ->
case Config -> Maybe Count
getYieldLimit Config
cfg of
Maybe Count
Nothing -> Channel m a
-> (Channel m a -> m [ChildEvent a])
-> (Channel m a -> m Bool)
-> (Channel m a -> IO Bool)
-> (LinkedQueue (RunInIO m, StreamK m a)
-> Channel m a -> Maybe WorkerInfo -> m ())
-> Channel m a
getSVar Channel m a
sv forall (m :: * -> *) a.
MonadRunInIO m =>
Channel m a -> m [ChildEvent a]
readOutputQPaced
forall (m :: * -> *) a. MonadRunInIO m => Channel m a -> m Bool
postProcessPaced
forall {p}. p -> IO Bool
isWorkFinished
forall (m :: * -> *) a.
MonadRunInIO m =>
LinkedQueue (RunInIO m, StreamK m a)
-> Channel m a -> Maybe WorkerInfo -> m ()
workLoopFIFO
Just Count
_ -> Channel m a
-> (Channel m a -> m [ChildEvent a])
-> (Channel m a -> m Bool)
-> (Channel m a -> IO Bool)
-> (LinkedQueue (RunInIO m, StreamK m a)
-> Channel m a -> Maybe WorkerInfo -> m ())
-> Channel m a
getSVar Channel m a
sv forall (m :: * -> *) a.
MonadRunInIO m =>
Channel m a -> m [ChildEvent a]
readOutputQPaced
forall (m :: * -> *) a. MonadRunInIO m => Channel m a -> m Bool
postProcessPaced
forall {m :: * -> *} {a}. Channel m a -> IO Bool
isWorkFinishedLimited
forall (m :: * -> *) a.
MonadRunInIO m =>
LinkedQueue (RunInIO m, StreamK m a)
-> Channel m a -> Maybe WorkerInfo -> m ()
workLoopFIFOLimited
in forall (m :: * -> *) a. Monad m => a -> m a
return Channel m a
sv
{-# INLINABLE newChannel #-}
{-# SPECIALIZE newChannel :: (Config -> Config) -> IO (Channel IO a) #-}
newChannel :: MonadAsync m =>
(Config -> Config) -> m (Channel m a)
newChannel :: forall (m :: * -> *) a.
MonadAsync m =>
(Config -> Config) -> m (Channel m a)
newChannel Config -> Config
modifier = do
RunInIO m
mrun <- forall (m :: * -> *). MonadRunInIO m => m (RunInIO m)
askRunInIO
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) a.
MonadRunInIO m =>
RunInIO m -> Config -> IO (Channel m a)
getFifoSVar RunInIO m
mrun (Config -> Config
modifier Config
defaultConfig)