{-# OPTIONS_HADDOCK not-home #-}
{-# LANGUAGE DeriveDataTypeable #-}
{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE Safe #-}
-------------------------------------------------------------------------------
-- |
-- Module      :  BroadcastChan.Extra
-- Copyright   :  (C) 2014-2021 Merijn Verstraaten
-- License     :  BSD-style (see the file LICENSE)
-- Maintainer  :  Merijn Verstraaten <merijn@inconsistent.nl>
-- Stability   :  experimental
-- Portability :  haha
--
-- Functions in this module are *NOT* intended to be used by regular users of
-- the library. Rather, they are intended for implementing parallel processing
-- libraries on top of @broadcast-chan@, such as @broadcast-chan-conduit@.
--
-- This module, while not for end users, is considered part of the public API,
-- so users can rely on PVP bounds to avoid breakage due to changes to this
-- module.
-------------------------------------------------------------------------------
module BroadcastChan.Extra
    ( Action(..)
    , BracketOnError(..)
    , Handler(..)
    , ThreadBracket(..)
    , mapHandler
    , runParallel
    , runParallelWith
    , runParallel_
    , runParallelWith_
    ) where

import Control.Concurrent (ThreadId, forkFinally, mkWeakThreadId, myThreadId)
import Control.Concurrent.MVar
import Control.Concurrent.QSem
import Control.Concurrent.QSemN
import Control.Exception (Exception(..), SomeException(..), bracketOnError)
import qualified Control.Exception as Exc
import Control.Monad ((>=>), join, replicateM, void)
import Control.Monad.Trans.Cont (ContT(..))
import Control.Monad.IO.Unlift (MonadIO(..))
import Data.Typeable (Typeable)
import System.Mem.Weak (Weak, deRefWeak)

import BroadcastChan.Internal

-- Run a 'ContT' computation to completion by supplying 'return' as the final
-- continuation.
evalContT :: Monad m => ContT r m r -> m r
evalContT m = runContT m return

-- DANGER! Breaks the invariant that you can't write to closed channels!
-- Only meant to be used in 'parallelCore'!
--
-- Writes @val@ to the write end of the channel. If the current hole already
-- holds a 'Closed' marker, that marker is moved into the freshly created hole
-- so it ends up behind the newly written element.
unsafeWriteBChan :: MonadIO m => BroadcastChan In a -> a -> m ()
unsafeWriteBChan (BChan writeVar) val = liftIO $ do
    new_hole <- newEmptyMVar
    -- Mask asynchronous exceptions for the take/put sequence on @writeVar@.
    Exc.mask_ $ do
        old_hole <- takeMVar writeVar
        -- old_hole is only full if the channel was previously closed
        item <- tryTakeMVar old_hole
        case item of
            Nothing -> return ()
            -- Carry the 'Closed' marker over to the new end of the stream.
            Just Closed -> putMVar new_hole Closed
            Just _ -> error "unsafeWriteBChan hit an impossible condition!"
        putMVar old_hole (ChItem val new_hole)
        putMVar writeVar new_hole
{-# INLINE unsafeWriteBChan #-}

-- Internal exception thrown at worker threads to make them stop; see
-- @killWeakThread@ in 'parallelCore'.
data Shutdown = Shutdown deriving (Show, Typeable)
instance Exception Shutdown

-- | Action to take when an exception occurs while processing an element.
data Action
    = Drop
    -- ^ Drop the current element and continue processing.
    | Retry
    -- ^ Retry by appending the current element to the queue of remaining
    --   elements.
    | Terminate
    -- ^ Stop all processing and reraise the exception.
    deriving (Eq, Show)

-- | Exception handler for parallel processing.
--
-- The @m@ parameter is the monad the 'Handle' callback runs in; @a@ is the
-- type of the element that was being processed when the exception occurred.
data Handler m a
    = Simple Action
    -- ^ Always take the specified 'Action'.
    | Handle (a -> SomeException -> m Action)
    -- ^ Allow inspection of the element, exception, and execution of monadic
    --   actions before deciding the 'Action' to take.

-- | Allocation, cleanup, and work actions for parallel processing. These
-- should be passed to an appropriate @bracketOnError@ function.
data BracketOnError m r
    = Bracket
    { allocate :: IO [Weak ThreadId]
    -- ^ Allocation action that spawns threads and sets up handlers.
    , cleanup :: [Weak ThreadId] -> IO ()
    -- ^ Cleanup action that handles exceptional termination.
    , action :: m r
    -- ^ Action that performs actual processing and waits for processing to
    --   finish and threads to terminate.
    }

-- | Datatype for specifying additional setup/cleanup around forking threads.
-- Used by 'runParallelWith' and 'runParallelWith_' to fix resource management
-- in @broadcast-chan-conduit@.
--
-- If the allocation action can fail/abort with an exception it __MUST__ take
-- care not to leak resources in these cases. In other words, IFF 'setupFork'
-- succeeds then this library will ensure the corresponding cleanup runs.
--
-- @since 0.2.1
data ThreadBracket
    = ThreadBracket
    { setupFork :: IO ()
    -- ^ Setup action to run before spawning a new thread.
    , cleanupFork :: IO ()
    -- ^ Normal cleanup action upon thread termination.
    , cleanupForkError :: IO ()
    -- ^ Exceptional cleanup action in case thread terminates due to an
    -- exception.
    }

-- A 'ThreadBracket' whose setup and both cleanup actions do nothing. Used by
-- 'runParallel' and 'runParallel_'.
noopBracket :: ThreadBracket
noopBracket = ThreadBracket
    { setupFork = return ()
    , cleanupFork = return ()
    , cleanupForkError = return ()
    }

-- | Convenience function for changing the monad the exception handler runs in.
--
-- A 'Simple' handler carries no monadic action and is returned unchanged; for
-- a 'Handle' handler the given transformation is applied to the result of the
-- callback.
mapHandler :: (m Action -> n Action) -> Handler m a -> Handler n a
mapHandler _ (Simple act) = Simple act
mapHandler mmorph (Handle f) = Handle $ \a exc -> mmorph (f a exc)

-- Workhorse function for runParallel_ and runParallel. Spawns threads, sets up
-- error handling, thread termination, etc.
--
-- Arguments, in order: the exception handler, the number of worker threads,
-- an action to run when an element is dropped, the per-thread setup/cleanup
-- bracket, and the work to perform on every element.
--
-- The result is a 4-tuple of:
--
--   * an allocation action that forks the worker threads,
--   * a cleanup action that kills the given threads and waits for them,
--   * a function that queues an element for processing,
--   * an action that closes the input channel and waits until every worker
--     has seen the end of the input.
parallelCore
    :: forall a m
     . MonadIO m
    => Handler IO a
    -> Int
    -> IO ()
    -> ThreadBracket
    -> (a -> IO ())
    -> m (IO [Weak ThreadId], [Weak ThreadId] -> IO (), a -> IO (), m ())
parallelCore hndl threads onDrop threadBracket f = liftIO $ do
    -- The thread running this setup; worker failures are reported to it.
    originTid <- myThreadId
    inChanIn <- newBroadcastChan
    inChanOut <- newBChanListener inChanIn
    -- Signalled once by every worker thread when it exits, for any reason.
    shutdownSem <- newQSemN 0
    -- Signalled once by every worker that observes the end of the input.
    endSem <- newQSemN 0
    -- Holds the "report to origin thread" function. It is taken by the first
    -- worker that fails, so at most one exception is thrown to the origin.
    excMVar <- newMVar (Exc.throwTo originTid)

    let bufferValue :: a -> IO ()
        bufferValue = void . writeBChan inChanIn

        -- Carry out the 'Action' chosen for a failed element.
        simpleHandler :: a -> SomeException -> Action -> IO ()
        simpleHandler val exc act = case act of
            Drop -> onDrop
            Retry -> unsafeWriteBChan inChanIn val
            Terminate -> Exc.throwIO exc

        -- 'Shutdown' is never passed to the user's handler: it is rethrown so
        -- the worker thread actually terminates.
        handler :: a -> SomeException -> IO ()
        handler _ exc | Just Shutdown <- fromException exc = Exc.throwIO exc
        handler val exc = case hndl of
            Simple a -> simpleHandler val exc a
            Handle h -> h val exc >>= simpleHandler val exc

        -- Worker loop: process elements until 'readBChan' yields 'Nothing'.
        processInput :: IO ()
        processInput = do
            x <- readBChan inChanOut
            case x of
                Nothing -> signalQSemN endSem 1
                Just a -> do
                    f a `Exc.catch` handler a
                    processInput

        -- "unsafe" because nothing here kills the forked thread again; see
        -- 'allocThread' for the bracketed version.
        unsafeAllocThread :: IO (Weak ThreadId)
        unsafeAllocThread = do
            setupFork
            tid <- forkFinally processInput $ \exit -> do
                signalQSemN shutdownSem 1
                case exit of
                    Left exc
                      | Just Shutdown <- fromException exc -> cleanupForkError
                      | otherwise -> do
                          cleanupForkError
                          reportErr <- tryTakeMVar excMVar
                          case reportErr of
                              Nothing -> return ()
                              -- A 'Shutdown' arriving while reporting the
                              -- exception is swallowed.
                              Just throw -> throw exc `Exc.catch` shutdownHandler
                    Right () -> cleanupFork

            mkWeakThreadId tid
          where
            shutdownHandler :: Shutdown -> IO ()
            shutdownHandler Shutdown = return ()

        -- Fork one worker; kill it again if the continuation fails.
        allocThread :: ContT r IO (Weak ThreadId)
        allocThread = ContT $ bracketOnError unsafeAllocThread killWeakThread

        allocateThreads :: IO [Weak ThreadId]
        allocateThreads = evalContT $ replicateM threads allocThread

        -- Kill every worker, then wait until all @threads@ exit handlers have
        -- signalled 'shutdownSem'.
        cleanup :: [Weak ThreadId] -> IO ()
        cleanup threadIds = Exc.uninterruptibleMask_ $ do
            mapM_ killWeakThread threadIds
            waitQSemN shutdownSem threads

        wait :: m ()
        wait = do
            closeBChan inChanIn
            liftIO $ waitQSemN endSem threads

    return (allocateThreads, cleanup, bufferValue, wait)
  where
    ThreadBracket{setupFork,cleanupFork,cleanupForkError} = threadBracket

    -- Throw 'Shutdown' to the thread if the weak reference is still alive.
    killWeakThread :: Weak ThreadId -> IO ()
    killWeakThread wTid = do
        tid <- deRefWeak wTid
        case tid of
            Nothing -> return ()
            Just t -> Exc.throwTo t Shutdown

-- | Sets up parallel processing.
--
-- The workhorses of this function are the output yielder and \"stream\"
-- processing functions.
--
-- The output yielder is responsible for handling the produced @b@ values,
-- which can either be yielded downstream ('Left') when used with something
-- like @conduit@ or @pipes@, or folded into a single result ('Right') when
-- used to run IO in parallel.
--
-- The stream processing function gets two arguments:
--
--     [@a -> m ()@] Should be used to buffer a number of elements equal to the
--                   number of threads.
--
--     [@a -> m (Maybe b)@] Which should be used to process the remainder of
--                          the element stream via, for example,
--                          'Data.Conduit.mapM'. It first reads the next
--                          available output and then queues the given element.
--
-- See "BroadcastChan" or @broadcast-chan-conduit@ for examples.
--
-- The returned 'BracketOnError' has an 'allocate' action that takes care of
-- setting up 'Control.Concurrent.forkIO' threads and exception handlers. The
-- 'cleanup' action ensures all threads are terminated in case of an exception.
-- Finally, 'action' performs the actual parallel processing of elements.
runParallel
    :: forall a b m n r
     . (MonadIO m, MonadIO n)
    => Either (b -> n r) (r -> b -> n r)
    -- ^ Output yielder
    -> Handler IO a
    -- ^ Parallel processing exception handler
    -> Int
    -- ^ Number of threads to use
    -> (a -> IO b)
    -- ^ Function to run in parallel
    -> ((a -> m ()) -> (a -> m (Maybe b)) -> n r)
    -- ^ \"Stream\" processing function
    -> n (BracketOnError n r)
runParallel = runParallelWith noopBracket

-- | Like 'runParallel', but accepts a setup and cleanup action that will be
-- run before spawning a new thread and upon thread exit respectively.
--
-- The main use case is to properly manage the resource reference counts of
-- 'Control.Monad.Trans.Resource.ResourceT'.
--
-- If the setup throws an 'IO' exception or otherwise aborts, it __MUST__
-- ensure any allocated resources are freed. If it completes without an
-- exception, the cleanup is guaranteed to run (assuming proper use of
-- bracketing with the returned 'BracketOnError').
--
-- @since 0.2.1
runParallelWith
    :: forall a b m n r
     . (MonadIO m, MonadIO n)
    => ThreadBracket
    -- ^ Bracketing action used to manage resources across thread spawns
    -> Either (b -> n r) (r -> b -> n r)
    -- ^ Output yielder
    -> Handler IO a
    -- ^ Parallel processing exception handler
    -> Int
    -- ^ Number of threads to use
    -> (a -> IO b)
    -- ^ Function to run in parallel
    -> ((a -> m ()) -> (a -> m (Maybe b)) -> n r)
    -- ^ \"Stream\" processing function
    -> n (BracketOnError n r)
runParallelWith threadBracket yielder hndl threads work pipe = do
    -- Output channel: workers write @Just result@, dropped elements are
    -- recorded as @Nothing@.
    outChanIn <- newBroadcastChan
    outChanOut <- newBChanListener outChanIn

    let process :: MonadIO f => a -> f ()
        process = liftIO . (work >=> void . writeBChan outChanIn . Just)

        notifyDrop :: IO ()
        notifyDrop = void $ writeBChan outChanIn Nothing

    (allocate, cleanup, bufferValue, wait) <-
        parallelCore hndl threads notifyDrop threadBracket process

    let -- Read the next output first, then queue the new element. 'join'
        -- collapses both a failed read and a dropped element into 'Nothing'.
        queueAndYield :: a -> m (Maybe b)
        queueAndYield x =
            join <$> liftIO (readBChan outChanOut <* bufferValue x)

        -- Fold every remaining output into the result, skipping the
        -- 'Nothing' entries left behind by dropped elements.
        finish :: r -> n r
        finish r = do
            next <- readBChan outChanOut
            case next of
                Nothing -> return r
                Just Nothing -> finish r
                Just (Just v) -> foldFun r v >>= finish

        action :: n r
        action = do
            result <- pipe (liftIO . bufferValue) queueAndYield
            wait
            closeBChan outChanIn
            finish result

    return Bracket{allocate,cleanup,action}
  where
    -- A 'Left' yielder ignores the accumulated result.
    foldFun :: r -> b -> n r
    foldFun = case yielder of
        Left g -> const g
        Right g -> g

-- | Sets up parallel processing for functions where we ignore the result.
--
-- The stream processing argument is the workhorse of this function. It gets a
-- (rate-limited) function @a -> m ()@ that queues @a@ values for processing.
-- This function should be applied to all @a@ elements that should be
-- processed. This would be either a partially applied 'Control.Monad.forM_'
-- for parallel processing, or something like conduit's 'Data.Conduit.mapM_' to
-- construct a \"sink\" for @a@ values. See "BroadcastChan" or
-- @broadcast-chan-conduit@ for examples.
--
-- The returned 'BracketOnError' has an 'allocate' action that takes care of
-- setting up 'Control.Concurrent.forkIO' threads and exception handlers. The
-- 'cleanup' action ensures all threads are terminated in case of an exception.
-- Finally, 'action' performs the actual parallel processing of elements.
runParallel_
    :: (MonadIO m, MonadIO n)
    => Handler IO a
    -- ^ Parallel processing exception handler
    -> Int
    -- ^ Number of threads to use
    -> (a -> IO ())
    -- ^ Function to run in parallel
    -> ((a -> m ()) -> n r)
    -- ^ \"Stream\" processing function
    -> n (BracketOnError n r)
runParallel_ = runParallelWith_ noopBracket

-- | Like 'runParallel_', but accepts a setup and cleanup action that will be
-- run before spawning a new thread and upon thread exit respectively.
--
-- The main use case is to properly manage the resource reference counts of
-- 'Control.Monad.Trans.Resource.ResourceT'.
--
-- If the setup throws an 'IO' exception or otherwise aborts, it __MUST__
-- ensure any allocated resources are freed. If it completes without an
-- exception, the cleanup is guaranteed to run (assuming proper use of
-- bracketing with the returned 'BracketOnError').
--
-- @since 0.2.1
runParallelWith_
    :: (MonadIO m, MonadIO n)
    => ThreadBracket
    -- ^ Bracketing action used to manage resources across thread spawns
    -> Handler IO a
    -- ^ Parallel processing exception handler
    -> Int
    -- ^ Number of threads to use
    -> (a -> IO ())
    -- ^ Function to run in parallel
    -> ((a -> m ()) -> n r)
    -- ^ \"Stream\" processing function
    -> n (BracketOnError n r)
runParallelWith_ threadBracket hndl threads workFun processElems = do
    -- Rate limiter: starts with one unit per thread. Queueing an element
    -- takes a unit; a worker gives it back when it picks the element up.
    sem <- liftIO $ newQSem threads

    let process x = signalQSem sem >> workFun x

    (allocate, cleanup, bufferValue, wait) <-
        parallelCore hndl threads (return ()) threadBracket process

    let action = do
            result <- processElems $ \v -> liftIO $ do
                waitQSem sem
                bufferValue v
            wait
            return result

    return Bracket{allocate,cleanup,action}