{-# LANGUAGE CPP #-}
{-# LANGUAGE ConstraintKinds #-}
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE FlexibleInstances #-}
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE MagicHash #-}
{-# LANGUAGE MultiParamTypeClasses #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE UnboxedTuples #-}
{-# LANGUAGE UndecidableInstances #-}
module Streamly.Core
(
MonadAsync
, Stream (..)
, nil
, cons
, singleton
, once
, repeat
, consM
, consMAhead
, consMAsync
, consMWAsync
, consMParallel
, serial
, wSerial
, ahead
, async
, wAsync
, parallel
, applyWith
, runWith
, zipWith
, zipAsyncWith
, SVar
, SVarStyle (..)
, newStreamVar1
, fromStreamVar
, toStreamVar
)
where
import Control.Concurrent (ThreadId, myThreadId,
threadDelay, getNumCapabilities)
import Control.Concurrent.MVar (MVar, newEmptyMVar,
tryPutMVar, takeMVar)
import Control.Exception (SomeException (..), catch, mask)
import Control.Monad (when)
import Control.Monad.Catch (MonadThrow, throwM)
import Control.Monad.IO.Class (MonadIO(..))
import Control.Monad.Trans.Class (MonadTrans (lift))
import Control.Monad.Trans.Control (MonadBaseControl, control)
import Data.Atomics (casIORef, readForCAS, peekTicket
,atomicModifyIORefCAS_
,writeBarrier,storeLoadBarrier)
import Data.Concurrent.Queue.MichaelScott (LinkedQueue, newQ, pushL,
tryPopR, nullQ)
import Data.Functor (void)
import Data.Heap (Heap, Entry(..))
import qualified Data.Heap as H
import Data.IORef (IORef, modifyIORef, newIORef,
readIORef, atomicModifyIORef
#ifdef DIAGNOSTICS
, writeIORef
#endif
)
import Data.Maybe (fromJust)
import Data.Semigroup (Semigroup(..))
import Data.Set (Set)
import qualified Data.Set as S
import Prelude hiding (repeat, zipWith)
import GHC.Exts
import GHC.Conc (ThreadId(..))
import GHC.IO (IO(..))
#ifdef DIAGNOSTICS
import Control.Concurrent.MVar (tryTakeMVar)
import Control.Exception (catches, throwIO, Handler(..),
BlockedIndefinitelyOnMVar(..),
BlockedIndefinitelyOnSTM(..))
import System.IO (hPutStrLn, stderr)
#endif
data ChildEvent a =
ChildYield a
| ChildStop ThreadId (Maybe SomeException)
data AheadHeapEntry m a =
AheadEntryPure a
| AheadEntryStream (Stream m a)
data SVarStyle =
AsyncVar
| WAsyncVar
| ParallelVar
| AheadVar
deriving (Eq, Show)
data SVar m a =
SVar {
svarStyle :: SVarStyle
, outputQueue :: IORef ([ChildEvent a], Int)
, doorBell :: MVar ()
, outputHeap :: IORef (Heap (Entry Int (AheadHeapEntry m a))
, Int
)
, workQueue :: IORef ([Stream m a], Int)
, enqueue :: Stream m a -> IO ()
, queueEmpty :: m Bool
, waitingForWork :: IORef Bool
, runqueue :: m ()
, runningThreads :: IORef (Set ThreadId)
, activeWorkers :: IORef Int
#ifdef DIAGNOSTICS
, totalDispatches :: IORef Int
, maxWorkers :: IORef Int
, maxOutQSize :: IORef Int
, maxHeapSize :: IORef Int
, maxWorkQSize :: IORef Int
#endif
}
#ifdef DIAGNOSTICS
{-# NOINLINE dumpSVar #-}
dumpSVar :: SVar m a -> IO String
dumpSVar sv = do
tid <- myThreadId
(oqList, oqLen) <- readIORef $ outputQueue sv
db <- tryTakeMVar $ doorBell sv
aheadDump <-
if svarStyle sv == AheadVar
then do
(oheap, oheapSeq) <- readIORef $ outputHeap sv
(wq, wqSeq) <- readIORef $ workQueue sv
maxHp <- readIORef $ maxHeapSize sv
return $ unlines
[ "heap length = " ++ show (H.size oheap)
, "heap seqeunce = " ++ show oheapSeq
, "work queue length = " ++ show (length wq)
, "work queue sequence = " ++ show wqSeq
, "heap max size = " ++ show maxHp
]
else return []
waiting <- readIORef $ waitingForWork sv
rthread <- readIORef $ runningThreads sv
workers <- readIORef $ activeWorkers sv
maxWrk <- readIORef $ maxWorkers sv
dispatches <- readIORef $ totalDispatches sv
maxOq <- readIORef $ maxOutQSize sv
return $ unlines
[ "tid = " ++ show tid
, "style = " ++ show (svarStyle sv)
, "outputQueue length computed = " ++ show (length oqList)
, "outputQueue length maintained = " ++ show oqLen
, "output doorBell = " ++ show db
, "total dispatches = " ++ show dispatches
, "max workers = " ++ show maxWrk
, "max outQSize = " ++ show maxOq
]
++ aheadDump ++ unlines
[ "waitingForWork = " ++ show waiting
, "running threads = " ++ show rthread
, "running thread count = " ++ show workers
]
{-# NOINLINE mvarExcHandler #-}
mvarExcHandler :: SVar m a -> String -> BlockedIndefinitelyOnMVar -> IO ()
mvarExcHandler sv label e@BlockedIndefinitelyOnMVar = do
svInfo <- dumpSVar sv
hPutStrLn stderr $ label ++ " " ++ "BlockedIndefinitelyOnMVar\n" ++ svInfo
throwIO e
{-# NOINLINE stmExcHandler #-}
stmExcHandler :: SVar m a -> String -> BlockedIndefinitelyOnSTM -> IO ()
stmExcHandler sv label e@BlockedIndefinitelyOnSTM = do
svInfo <- dumpSVar sv
hPutStrLn stderr $ label ++ " " ++ "BlockedIndefinitelyOnSTM\n" ++ svInfo
throwIO e
withDBGMVar :: SVar m a -> String -> IO () -> IO ()
withDBGMVar sv label action =
action `catches` [ Handler (mvarExcHandler sv label)
, Handler (stmExcHandler sv label)
]
#else
withDBGMVar :: SVar m a -> String -> IO () -> IO ()
withDBGMVar _ _ action = action
#endif
{-# INLINE atomicModifyIORefCAS #-}
atomicModifyIORefCAS :: IORef a -> (a -> (a,b)) -> IO b
atomicModifyIORefCAS ref fn = do
tkt <- readForCAS ref
loop tkt retries
where
retries = 25 :: Int
loop _ 0 = atomicModifyIORef ref fn
loop old tries = do
let (new, result) = fn $ peekTicket old
(success, tkt) <- casIORef ref old new
if success
then return result
else loop tkt (tries - 1)
newtype Stream m a =
Stream {
runStream :: forall r.
Maybe (SVar m a)
-> m r
-> (a -> m r)
-> (a -> Stream m a -> m r)
-> m r
}
nil :: Stream m a
nil = Stream $ \_ stp _ _ -> stp
cons :: a -> Stream m a -> Stream m a
cons a r = Stream $ \_ _ _ yld -> yld a r
singleton :: a -> Stream m a
singleton a = Stream $ \_ _ single _ -> single a
{-# INLINE once #-}
once :: Monad m => m a -> Stream m a
once m = Stream $ \_ _ single _ -> m >>= single
{-# INLINE consM #-}
consM :: Monad m => m a -> Stream m a -> Stream m a
consM m r = Stream $ \_ _ _ yld -> m >>= \a -> yld a r
repeat :: a -> Stream m a
repeat a = let x = cons a x in x
{-# INLINE serial #-}
serial :: Stream m a -> Stream m a -> Stream m a
serial m1 m2 = go m1
where
go (Stream m) = Stream $ \_ stp sng yld ->
let stop = (runStream m2) Nothing stp sng yld
single a = yld a m2
yield a r = yld a (go r)
in m Nothing stop single yield
instance Semigroup (Stream m a) where
(<>) = serial
instance Monoid (Stream m a) where
mempty = nil
mappend = (<>)
{-# INLINE wSerial #-}
wSerial :: Stream m a -> Stream m a -> Stream m a
wSerial m1 m2 = Stream $ \_ stp sng yld -> do
let stop = (runStream m2) Nothing stp sng yld
single a = yld a m2
yield a r = yld a (wSerial m2 r)
(runStream m1) Nothing stop single yield
type MonadAsync m = (MonadIO m, MonadBaseControl IO m, MonadThrow m)
{-# INLINE rawForkIO #-}
rawForkIO :: IO () -> IO ThreadId
rawForkIO action = IO $ \ s ->
case (fork# action s) of (# s1, tid #) -> (# s1, ThreadId tid #)
{-# INLINE doFork #-}
doFork :: MonadBaseControl IO m
=> m ()
-> (SomeException -> IO ())
-> m ThreadId
doFork action exHandler =
control $ \runInIO ->
mask $ \restore -> do
tid <- rawForkIO $ catch (restore $ void $ runInIO action)
exHandler
runInIO (return tid)
maxOutputQLen :: Int
maxOutputQLen = 1500
{-# NOINLINE send #-}
send :: SVar m a -> ChildEvent a -> IO Bool
send sv msg = do
len <- atomicModifyIORefCAS (outputQueue sv) $ \(es, n) ->
((msg : es, n + 1), n)
when (len <= 0) $ do
writeBarrier
void $ tryPutMVar (doorBell sv) ()
return (len < maxOutputQLen)
{-# NOINLINE sendStop #-}
sendStop :: SVar m a -> IO ()
sendStop sv = do
liftIO $ atomicModifyIORefCAS_ (activeWorkers sv) $ \n -> n - 1
myThreadId >>= \tid -> void $ send sv (ChildStop tid Nothing)
{-# INLINE enqueueLIFO #-}
enqueueLIFO :: SVar m a -> IORef [Stream m a] -> Stream m a -> IO ()
enqueueLIFO sv q m = do
atomicModifyIORefCAS_ q $ \ms -> m : ms
storeLoadBarrier
w <- readIORef $ waitingForWork sv
when w $ do
atomicModifyIORefCAS_ (waitingForWork sv) (const False)
void $ tryPutMVar (doorBell sv) ()
runqueueLIFO :: MonadIO m => SVar m a -> IORef [Stream m a] -> m ()
runqueueLIFO sv q = run
where
run = do
work <- dequeue
case work of
Nothing -> liftIO $ sendStop sv
Just m -> (runStream m) (Just sv) run single yield
single a = do
res <- liftIO $ send sv (ChildYield a)
if res then run else liftIO $ sendStop sv
yield a r = do
res <- liftIO $ send sv (ChildYield a)
if res
then (runStream r) (Just sv) run single yield
else liftIO $ enqueueLIFO sv q r >> sendStop sv
dequeue = liftIO $ atomicModifyIORefCAS q $ \case
[] -> ([], Nothing)
x : xs -> (xs, Just x)
{-# INLINE enqueueFIFO #-}
enqueueFIFO :: SVar m a -> LinkedQueue (Stream m a) -> Stream m a -> IO ()
enqueueFIFO sv q m = do
pushL q m
storeLoadBarrier
w <- readIORef $ waitingForWork sv
when w $ do
atomicModifyIORefCAS_ (waitingForWork sv) (const False)
void $ tryPutMVar (doorBell sv) ()
runqueueFIFO :: MonadIO m => SVar m a -> LinkedQueue (Stream m a) -> m ()
runqueueFIFO sv q = run
where
run = do
work <- dequeue
case work of
Nothing -> liftIO $ sendStop sv
Just m -> (runStream m) (Just sv) run single yield
dequeue = liftIO $ tryPopR q
single a = do
res <- liftIO $ send sv (ChildYield a)
if res then run else liftIO $ sendStop sv
yield a r = do
res <- liftIO $ send sv (ChildYield a)
liftIO (enqueueFIFO sv q r)
if res then run else liftIO $ sendStop sv
{-# NOINLINE runOne #-}
runOne :: MonadIO m => SVar m a -> Stream m a -> m ()
runOne sv m = (runStream m) (Just sv) stop single yield
where
stop = liftIO $ sendStop sv
sendit a = liftIO $ send sv (ChildYield a)
single a = sendit a >> stop
yield a r = void (sendit a) >> runOne sv r
{-# INLINE enqueueAhead #-}
enqueueAhead :: SVar m a -> IORef ([Stream m a], Int) -> Stream m a -> IO ()
enqueueAhead sv q m = do
atomicModifyIORefCAS_ q $ \ case
([], n) -> ([m], n + 1)
_ -> error "not empty"
storeLoadBarrier
w <- readIORef $ waitingForWork sv
when w $ do
atomicModifyIORefCAS_ (waitingForWork sv) (const False)
void $ tryPutMVar (doorBell sv) ()
runqueueAhead :: MonadIO m => SVar m a -> IORef ([Stream m a], Int) -> m ()
runqueueAhead sv q = runHeap
where
maxHeap = 1500
toHeap seqNo ent = do
hp <- liftIO $ atomicModifyIORefCAS (outputHeap sv) $ \(h, snum) ->
((H.insert (Entry seqNo ent) h, snum), h)
if H.size hp <= maxHeap
then runHeap
else liftIO $ sendStop sv
singleToHeap seqNo a = toHeap seqNo (AheadEntryPure a)
yieldToHeap seqNo a r = toHeap seqNo (AheadEntryStream (a `cons` r))
singleOutput seqNo a = do
continue <- liftIO $ send sv (ChildYield a)
if continue
then runQueueToken seqNo
else liftIO $ do
atomicModifyIORefCAS_ (outputHeap sv) $ \(h, _) -> (h, seqNo + 1)
sendStop sv
yieldOutput seqNo a r = do
continue <- liftIO $ send sv (ChildYield a)
if continue
then (runStream r) (Just sv) (runQueueToken seqNo)
(singleOutput seqNo)
(yieldOutput seqNo)
else liftIO $ do
atomicModifyIORefCAS_ (outputHeap sv) $ \(h, _) ->
(H.insert (Entry seqNo (AheadEntryStream r)) h, seqNo)
sendStop sv
{-# INLINE runQueueToken #-}
runQueueToken prevSeqNo = do
work <- dequeue
case work of
Nothing -> do
liftIO $ atomicModifyIORefCAS_ (outputHeap sv) $ \(h, _) ->
(h, prevSeqNo + 1)
runHeap
Just (m, seqNo) -> do
if seqNo == prevSeqNo + 1
then
(runStream m) (Just sv) (runQueueToken seqNo)
(singleOutput seqNo)
(yieldOutput seqNo)
else do
liftIO $ atomicModifyIORefCAS_ (outputHeap sv) $ \(h, _) ->
(h, prevSeqNo + 1)
(runStream m) (Just sv) runHeap
(singleToHeap seqNo)
(yieldToHeap seqNo)
runQueueNoToken = do
work <- dequeue
case work of
Nothing -> runHeap
Just (m, seqNo) -> do
if seqNo == 0
then
(runStream m) (Just sv) (runQueueToken seqNo)
(singleOutput seqNo)
(yieldOutput seqNo)
else
(runStream m) (Just sv) runHeap
(singleToHeap seqNo)
(yieldToHeap seqNo)
{-# NOINLINE runHeap #-}
runHeap = do
#ifdef DIAGNOSTICS
liftIO $ do
maxHp <- readIORef (maxHeapSize sv)
(hp, _) <- readIORef (outputHeap sv)
when (H.size hp > maxHp) $ writeIORef (maxHeapSize sv) (H.size hp)
#endif
ent <- liftIO $ dequeueFromHeap (outputHeap sv)
case ent of
Nothing -> do
done <- queueEmpty sv
if done
then liftIO $ sendStop sv
else runQueueNoToken
Just (Entry seqNo hent) -> do
case hent of
AheadEntryPure a -> singleOutput seqNo a
AheadEntryStream r ->
(runStream r) (Just sv) (runQueueToken seqNo)
(singleOutput seqNo)
(yieldOutput seqNo)
dequeue = liftIO $ do
atomicModifyIORefCAS q $ \case
([], n) -> (([], n), Nothing)
(x : [], n) -> (([], n), Just (x, n))
_ -> error "more than one item on queue"
dequeueFromHeap
:: IORef (Heap (Entry Int (AheadHeapEntry m a)), Int)
-> IO (Maybe (Entry Int (AheadHeapEntry m a)))
dequeueFromHeap hpRef = do
atomicModifyIORefCAS hpRef $ \hp@(h, snum) -> do
let r = H.uncons h
case r of
Nothing -> (hp, Nothing)
Just (ent@(Entry seqNo _ev), hp') ->
if (seqNo == snum)
then ((hp', seqNo), Just ent)
else (hp, Nothing)
{-# NOINLINE addThread #-}
addThread :: MonadIO m => SVar m a -> ThreadId -> m ()
addThread sv tid =
liftIO $ modifyIORef (runningThreads sv) (S.insert tid)
{-# INLINE delThread #-}
delThread :: MonadIO m => SVar m a -> ThreadId -> m ()
delThread sv tid =
liftIO $ modifyIORef (runningThreads sv) $ (\s -> S.delete tid s)
{-# INLINE modifyThread #-}
modifyThread :: MonadIO m => SVar m a -> ThreadId -> m ()
modifyThread sv tid = do
changed <- liftIO $ atomicModifyIORefCAS (runningThreads sv) $ \old ->
if (S.member tid old)
then let new = (S.delete tid old) in (new, new)
else let new = (S.insert tid old) in (new, old)
if null changed
then liftIO $ do
writeBarrier
void $ tryPutMVar (doorBell sv) ()
else return ()
{-# INLINE allThreadsDone #-}
allThreadsDone :: MonadIO m => SVar m a -> m Bool
allThreadsDone sv = liftIO $ S.null <$> readIORef (runningThreads sv)
{-# NOINLINE handleChildException #-}
handleChildException :: SVar m a -> SomeException -> IO ()
handleChildException sv e = do
tid <- myThreadId
void $ send sv (ChildStop tid (Just e))
#ifdef DIAGNOSTICS
recordMaxWorkers :: MonadIO m => SVar m a -> m ()
recordMaxWorkers sv = liftIO $ do
active <- readIORef (activeWorkers sv)
maxWrk <- readIORef (maxWorkers sv)
when (active > maxWrk) $ writeIORef (maxWorkers sv) active
modifyIORef (totalDispatches sv) (+1)
#endif
{-# NOINLINE pushWorker #-}
pushWorker :: MonadAsync m => SVar m a -> m ()
pushWorker sv = do
liftIO $ atomicModifyIORefCAS_ (activeWorkers sv) $ \n -> n + 1
#ifdef DIAGNOSTICS
recordMaxWorkers sv
#endif
doFork (runqueue sv) (handleChildException sv) >>= addThread sv
{-# NOINLINE pushWorkerPar #-}
pushWorkerPar :: MonadAsync m => SVar m a -> Stream m a -> m ()
pushWorkerPar sv m = do
#ifdef DIAGNOSTICS
liftIO $ atomicModifyIORefCAS_ (activeWorkers sv) $ \n -> n + 1
recordMaxWorkers sv
#endif
doFork (runOne sv m) (handleChildException sv) >>= modifyThread sv
{-# INLINE workDone #-}
workDone :: MonadIO m => SVar m a -> m Bool
workDone sv = do
heapDone <-
if (svarStyle sv == AheadVar)
then do
(hp, _) <- liftIO $ readIORef (outputHeap sv)
return (H.size hp <= 0)
else return True
queueDone <- queueEmpty sv
return $ queueDone && heapDone
maxWorkerLimit :: Int
maxWorkerLimit = 1500
dispatchWorker :: MonadAsync m => SVar m a -> m ()
dispatchWorker sv = do
done <- workDone sv
when (not done) $ do
cnt <- liftIO $ readIORef $ activeWorkers sv
when (cnt < maxWorkerLimit) $ pushWorker sv
{-# NOINLINE sendWorkerWait #-}
sendWorkerWait :: MonadAsync m => SVar m a -> m ()
sendWorkerWait sv = do
ncpu <- liftIO $ getNumCapabilities
if ncpu <= 1
then
if (svarStyle sv == AheadVar)
then liftIO $ threadDelay 100
else liftIO $ threadDelay 25
else
if (svarStyle sv == AheadVar)
then liftIO $ threadDelay 100
else liftIO $ threadDelay 10
(_, n) <- liftIO $ readIORef (outputQueue sv)
when (n <= 0) $ do
liftIO $ atomicModifyIORefCAS_ (waitingForWork sv) $ const True
liftIO $ storeLoadBarrier
dispatchWorker sv
done <- workDone sv
if done
then do
liftIO $ withDBGMVar sv "sendWorkerWait: nothing to do"
$ takeMVar (doorBell sv)
(_, len) <- liftIO $ readIORef (outputQueue sv)
when (len <= 0) $ sendWorkerWait sv
else sendWorkerWait sv
{-# NOINLINE fromStreamVar #-}
fromStreamVar :: MonadAsync m => SVar m a -> Stream m a
fromStreamVar sv = Stream $ \_ stp sng yld -> do
(list, _) <-
if svarStyle sv == ParallelVar
then do
liftIO $ withDBGMVar sv "fromStreamVar: doorbell"
$ takeMVar (doorBell sv)
readOutputQ sv
else do
res@(_, len) <- readOutputQ sv
if len <= 0
then blockingRead
else do
sendWorker
return res
runStream (processEvents $ reverse list) Nothing stp sng yld
where
{-# INLINE readOutputQ #-}
readOutputQ svr = liftIO $ do
(list, len) <- atomicModifyIORefCAS (outputQueue svr) $
\x -> (([],0), x)
#ifdef DIAGNOSTICS
oqLen <- readIORef (maxOutQSize svr)
when (len > oqLen) $ writeIORef (maxOutQSize svr) len
#endif
return (list, len)
sendWorker = do
cnt <- liftIO $ readIORef $ activeWorkers sv
when (cnt <= 0) $ do
done <- workDone sv
when (not done) $ pushWorker sv
{-# INLINE blockingRead #-}
blockingRead = do
sendWorkerWait sv
readOutputQ sv
allDone stp = do
#ifdef DIAGNOSTICS
#ifdef DIAGNOSTICS_VERBOSE
svInfo <- liftIO $ dumpSVar sv
liftIO $ hPutStrLn stderr $ "fromStreamVar done\n" ++ svInfo
#endif
#endif
stp
{-# INLINE processEvents #-}
processEvents [] = Stream $ \_ stp sng yld -> do
workersDone <- allThreadsDone sv
done <-
if svarStyle sv == ParallelVar
then return workersDone
else
if workersDone
then do
r <- workDone sv
when (not r) $ pushWorker sv
return r
else return False
if done
then allDone stp
else runStream (fromStreamVar sv) Nothing stp sng yld
processEvents (ev : es) = Stream $ \_ stp sng yld -> do
let rest = processEvents es
case ev of
ChildYield a -> yld a rest
ChildStop tid e -> do
if svarStyle sv == ParallelVar
then modifyThread sv tid
else delThread sv tid
case e of
Nothing -> runStream rest Nothing stp sng yld
Just ex -> throwM ex
getFifoSVar :: MonadIO m => SVarStyle -> IO (SVar m a)
getFifoSVar ctype = do
outQ <- newIORef ([], 0)
outQMv <- newEmptyMVar
active <- newIORef 0
wfw <- newIORef False
running <- newIORef S.empty
q <- newQ
#ifdef DIAGNOSTICS
disp <- newIORef 0
maxWrk <- newIORef 0
maxOq <- newIORef 0
maxHs <- newIORef 0
maxWq <- newIORef 0
#endif
let sv =
SVar { outputQueue = outQ
, doorBell = outQMv
, outputHeap = undefined
, runningThreads = running
, workQueue = undefined
, runqueue = runqueueFIFO sv q
, enqueue = enqueueFIFO sv q
, queueEmpty = liftIO $ nullQ q
, waitingForWork = wfw
, svarStyle = ctype
, activeWorkers = active
#ifdef DIAGNOSTICS
, totalDispatches = disp
, maxWorkers = maxWrk
, maxOutQSize = maxOq
, maxHeapSize = maxHs
, maxWorkQSize = maxWq
#endif
}
in return sv
getLifoSVar :: MonadIO m => SVarStyle -> IO (SVar m a)
getLifoSVar ctype = do
outQ <- newIORef ([], 0)
outQMv <- newEmptyMVar
active <- newIORef 0
wfw <- newIORef False
running <- newIORef S.empty
q <- newIORef []
#ifdef DIAGNOSTICS
disp <- newIORef 0
maxWrk <- newIORef 0
maxOq <- newIORef 0
maxHs <- newIORef 0
maxWq <- newIORef 0
#endif
let checkEmpty = null <$> liftIO (readIORef q)
let sv =
SVar { outputQueue = outQ
, doorBell = outQMv
, outputHeap = undefined
, runningThreads = running
, workQueue = undefined
, runqueue = runqueueLIFO sv q
, enqueue = enqueueLIFO sv q
, queueEmpty = checkEmpty
, waitingForWork = wfw
, svarStyle = ctype
, activeWorkers = active
#ifdef DIAGNOSTICS
, maxWorkers = maxWrk
, totalDispatches = disp
, maxOutQSize = maxOq
, maxHeapSize = maxHs
, maxWorkQSize = maxWq
#endif
}
in return sv
getParSVar :: SVarStyle -> IO (SVar m a)
getParSVar style = do
outQ <- newIORef ([], 0)
outQMv <- newEmptyMVar
active <- newIORef 0
wfw <- newIORef False
running <- newIORef S.empty
#ifdef DIAGNOSTICS
disp <- newIORef 0
maxWrk <- newIORef 0
maxOq <- newIORef 0
maxHs <- newIORef 0
maxWq <- newIORef 0
#endif
let sv =
SVar { outputQueue = outQ
, doorBell = outQMv
, outputHeap = undefined
, runningThreads = running
, workQueue = undefined
, runqueue = undefined
, enqueue = undefined
, queueEmpty = undefined
, waitingForWork = wfw
, svarStyle = style
, activeWorkers = active
#ifdef DIAGNOSTICS
, totalDispatches = disp
, maxWorkers = maxWrk
, maxOutQSize = maxOq
, maxHeapSize = maxHs
, maxWorkQSize = maxWq
#endif
}
in return sv
getAheadSVar :: MonadIO m => SVarStyle -> IO (SVar m a)
getAheadSVar style = do
outQ <- newIORef ([], 0)
outH <- newIORef (H.empty, 0)
outQMv <- newEmptyMVar
active <- newIORef 0
wfw <- newIORef False
running <- newIORef S.empty
q <- newIORef ([], -1)
#ifdef DIAGNOSTICS
disp <- newIORef 0
maxWrk <- newIORef 0
maxOq <- newIORef 0
maxHs <- newIORef 0
maxWq <- newIORef 0
#endif
let checkEmpty = liftIO $ do
(xs, _) <- readIORef q
return $ null xs
let sv =
SVar { outputQueue = outQ
, doorBell = outQMv
, outputHeap = outH
, runningThreads = running
, workQueue = q
, runqueue = runqueueAhead sv q
, enqueue = undefined
, queueEmpty = checkEmpty
, waitingForWork = wfw
, svarStyle = style
, activeWorkers = active
#ifdef DIAGNOSTICS
, totalDispatches = disp
, maxWorkers = maxWrk
, maxOutQSize = maxOq
, maxHeapSize = maxHs
, maxWorkQSize = maxWq
#endif
}
in return sv
newEmptySVar :: MonadAsync m => SVarStyle -> m (SVar m a)
newEmptySVar style = do
liftIO $
case style of
WAsyncVar -> getFifoSVar style
AsyncVar -> getLifoSVar style
ParallelVar -> getParSVar style
AheadVar -> getAheadSVar style
{-# INLINABLE newStreamVar1 #-}
newStreamVar1 :: MonadAsync m => SVarStyle -> Stream m a -> m (SVar m a)
newStreamVar1 style m = do
sv <- newEmptySVar style
if style == ParallelVar
then pushWorkerPar sv m
else do
liftIO $ (enqueue sv) m
pushWorker sv
return sv
{-# INLINABLE newStreamVarAhead #-}
newStreamVarAhead :: MonadAsync m => Stream m a -> m (SVar m a)
newStreamVarAhead m = do
sv <- newEmptySVar AheadVar
liftIO $ enqueueAhead sv (workQueue sv) m
pushWorker sv
return sv
toStreamVar :: MonadAsync m => SVar m a -> Stream m a -> m ()
toStreamVar sv m = do
liftIO $ (enqueue sv) m
done <- allThreadsDone sv
when done $ pushWorker sv
forkSVarAsync :: MonadAsync m => SVarStyle -> Stream m a -> Stream m a -> Stream m a
forkSVarAsync style m1 m2 = Stream $ \_ stp sng yld -> do
sv <- newStreamVar1 style (concurrently m1 m2)
(runStream (fromStreamVar sv)) Nothing stp sng yld
where
concurrently ma mb = Stream $ \svr stp sng yld -> do
liftIO $ enqueue (fromJust svr) mb
(runStream ma) svr stp sng yld
{-# INLINE joinStreamVarAsync #-}
joinStreamVarAsync :: MonadAsync m
=> SVarStyle -> Stream m a -> Stream m a -> Stream m a
joinStreamVarAsync style m1 m2 = Stream $ \svr stp sng yld ->
case svr of
Just sv | svarStyle sv == style ->
liftIO ((enqueue sv) m2) >> (runStream m1) svr stp sng yld
_ -> runStream (forkSVarAsync style m1 m2) Nothing stp sng yld
{-# NOINLINE forkSVarPar #-}
forkSVarPar :: MonadAsync m => Stream m a -> Stream m a -> Stream m a
forkSVarPar m r = Stream $ \_ stp sng yld -> do
sv <- newEmptySVar ParallelVar
pushWorkerPar sv m
pushWorkerPar sv r
(runStream (fromStreamVar sv)) Nothing stp sng yld
{-# INLINE joinStreamVarPar #-}
joinStreamVarPar :: MonadAsync m
=> SVarStyle -> Stream m a -> Stream m a -> Stream m a
joinStreamVarPar style m1 m2 = Stream $ \svr stp sng yld ->
case svr of
Just sv | svarStyle sv == style -> do
pushWorkerPar sv m1 >> (runStream m2) svr stp sng yld
_ -> runStream (forkSVarPar m1 m2) Nothing stp sng yld
forkSVarAhead :: MonadAsync m => Stream m a -> Stream m a -> Stream m a
forkSVarAhead m1 m2 = Stream $ \_ stp sng yld -> do
sv <- newStreamVarAhead (concurrently m1 m2)
(runStream (fromStreamVar sv)) Nothing stp sng yld
where
concurrently ma mb = Stream $ \svr stp sng yld -> do
liftIO $ enqueueAhead (fromJust svr) (workQueue (fromJust svr)) mb
(runStream ma) Nothing stp sng yld
{-# INLINE ahead #-}
ahead :: MonadAsync m => Stream m a -> Stream m a -> Stream m a
ahead m1 m2 = Stream $ \svr stp sng yld -> do
case svr of
Just sv | svarStyle sv == AheadVar -> do
liftIO $ enqueueAhead sv (workQueue sv) m2
(runStream m1) Nothing stp sng yld
_ -> runStream (forkSVarAhead m1 m2) Nothing stp sng yld
{-# INLINE consMAhead #-}
consMAhead :: MonadAsync m => m a -> Stream m a -> Stream m a
consMAhead m r = once m `ahead` r
{-# INLINE async #-}
async :: MonadAsync m => Stream m a -> Stream m a -> Stream m a
async = joinStreamVarAsync AsyncVar
{-# INLINE consMAsync #-}
consMAsync :: MonadAsync m => m a -> Stream m a -> Stream m a
consMAsync m r = once m `async` r
{-# INLINE wAsync #-}
wAsync :: MonadAsync m => Stream m a -> Stream m a -> Stream m a
wAsync = joinStreamVarAsync WAsyncVar
{-# INLINE consMWAsync #-}
consMWAsync :: MonadAsync m => m a -> Stream m a -> Stream m a
consMWAsync m r = once m `wAsync` r
{-# INLINE parallel #-}
parallel :: MonadAsync m => Stream m a -> Stream m a -> Stream m a
parallel = joinStreamVarPar ParallelVar
{-# INLINE consMParallel #-}
consMParallel :: MonadAsync m => m a -> Stream m a -> Stream m a
consMParallel m r = once m `parallel` r
instance Monad m => Functor (Stream m) where
fmap f m = Stream $ \_ stp sng yld ->
let single = sng . f
yield a r = yld (f a) (fmap f r)
in (runStream m) Nothing stp single yield
_alt :: Stream m a -> Stream m a -> Stream m a
_alt m1 m2 = Stream $ \_ stp sng yld ->
let stop = runStream m2 Nothing stp sng yld
in runStream m1 Nothing stop sng yld
applyWith :: MonadAsync m
=> SVarStyle -> (Stream m a -> Stream m b) -> Stream m a -> Stream m b
applyWith style f m = Stream $ \svr stp sng yld -> do
sv <- newStreamVar1 style m
runStream (f $ fromStreamVar sv) svr stp sng yld
runWith :: MonadAsync m
=> SVarStyle -> (Stream m a -> m b) -> Stream m a -> m b
runWith style f m = do
sv <- newStreamVar1 style m
f $ fromStreamVar sv
{-# INLINE zipWith #-}
zipWith :: (a -> b -> c) -> Stream m a -> Stream m b -> Stream m c
zipWith f m1 m2 = go m1 m2
where
go mx my = Stream $ \_ stp sng yld -> do
let merge a ra =
let single2 b = sng (f a b)
yield2 b rb = yld (f a b) (go ra rb)
in (runStream my) Nothing stp single2 yield2
let single1 a = merge a nil
yield1 a ra = merge a ra
(runStream mx) Nothing stp single1 yield1
{-# INLINE zipAsyncWith #-}
zipAsyncWith :: MonadAsync m
=> (a -> b -> c) -> Stream m a -> Stream m b -> Stream m c
zipAsyncWith f m1 m2 = Stream $ \_ stp sng yld -> do
ma <- mkAsync m1
mb <- mkAsync m2
(runStream (zipWith f ma mb)) Nothing stp sng yld
where
mkAsync :: MonadAsync m => Stream m a -> m (Stream m a)
mkAsync m = newStreamVar1 AsyncVar m
>>= return . fromStreamVar
instance MonadTrans Stream where
lift = once