{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE Trustworthy #-}
{-# LANGUAGE ScopedTypeVariables #-}
module BroadcastChan.Conduit.Internal (parMapM, parMapM_) where
import Control.Monad ((>=>))
import Control.Monad.Trans.Resource (MonadResource)
import Control.Monad.Trans.Class (lift)
import Control.Monad.IO.Unlift (MonadUnliftIO(askUnliftIO), UnliftIO(..))
import Data.Acquire
(ReleaseType(ReleaseException), allocateAcquire, mkAcquireType)
import Data.Conduit
import qualified Data.Conduit.List as C
import Data.Foldable (traverse_)
import Data.Void (Void)
import BroadcastChan.Extra
(BracketOnError(..), Handler, mapHandler, runParallel, runParallel_)
-- | Run @work@ after registering the resource produced by @alloc@.
--
-- The resource is registered through 'allocateAcquire' with a release
-- callback that calls @clean@ only when the release type is
-- 'ReleaseException'; for every other release type the callback does
-- nothing. The value returned by 'allocateAcquire' is discarded, so this
-- function never triggers the release itself.
bracketOnError :: MonadResource m => IO a -> (a -> IO ()) -> m r -> m r
bracketOnError alloc clean work =
    allocateAcquire (mkAcquireType alloc releaseOnError) >> work
  where
    -- Forward to the caller's cleanup only for an exceptional release.
    releaseOnError resource releaseType = case releaseType of
        ReleaseException -> clean resource
        _ -> return ()
-- | Conduit that runs @workFun@ on its inputs through 'runParallel' and
-- yields the results downstream.
--
-- The setup, teardown and conduit action returned by 'runParallel' are
-- combined with 'bracketOnError', so the teardown only fires on an
-- exceptional release.
--
-- NOTE(review): how outputs are ordered relative to inputs is decided by
-- 'runParallel' and is not visible here -- confirm before relying on it.
parMapM
    :: (MonadResource m, MonadUnliftIO m)
    => Handler m a
    -- ^ Handler given to 'runParallel' after conversion with 'mapHandler'
    -> Int
    -- ^ Thread count passed to 'runParallel'; also the number of inputs
    -- consumed up front before results are awaited
    -> (a -> m b)
    -- ^ Function applied to each input
    -> ConduitM a b m ()
parMapM hnd threads workFun = do
    UnliftIO toIO <- lift askUnliftIO
    let handlerIO = mapHandler toIO hnd
        workIO = toIO . workFun
    Bracket{allocate = setup, cleanup = teardown, action = pipeline} <-
        runParallel (Left yield) handlerIO threads workIO feedAndDrain
    bracketOnError setup teardown pipeline
  where
    -- Conduit body handed to 'runParallel': the first @threads@ inputs go
    -- to @preload@ only; every later input goes through @exchange@ and any
    -- 'Just' result is yielded.
    feedAndDrain
        :: Monad n => (i -> n ()) -> (i -> n (Maybe o)) -> ConduitM i o n ()
    feedAndDrain preload exchange =
        (C.isolate threads .| C.mapM_ preload)
            >> awaitForever (lift . exchange >=> traverse_ yield)
-- | Sink that runs @workFun@ on its inputs through 'runParallel_',
-- producing no output values.
--
-- The setup, teardown and conduit action returned by 'runParallel_' are
-- combined with 'bracketOnError', so the teardown only fires on an
-- exceptional release.
parMapM_
    :: (MonadResource m, MonadUnliftIO m)
    => Handler m a
    -- ^ Handler given to 'runParallel_' after conversion with 'mapHandler'
    -> Int
    -- ^ Thread count passed to 'runParallel_'
    -> (a -> m ())
    -- ^ Function applied to each input
    -> ConduitM a Void m ()
parMapM_ hnd threads workFun = do
    UnliftIO toIO <- lift askUnliftIO
    let handlerIO = mapHandler toIO hnd
        workIO = toIO . workFun
    Bracket{allocate = setup, cleanup = teardown, action = sink} <-
        runParallel_ handlerIO threads workIO C.mapM_
    bracketOnError setup teardown sink