{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE Safe #-}
{-# LANGUAGE ScopedTypeVariables #-}
module BroadcastChan.Pipes.Internal (parMapM, parMapM_) where

import Control.Monad ((>=>), replicateM)
import Data.Foldable (traverse_)
import Pipes
import qualified Pipes.Prelude as P
import Pipes.Safe (MonadSafe)
import qualified Pipes.Safe as Safe

import BroadcastChan.Extra
    (BracketOnError(..), Handler, runParallel, runParallel_)

-- | Thin wrapper around 'Safe.bracketOnError' for allocation and cleanup
-- actions that live in plain 'IO'.
--
-- Both 'IO' actions are lifted into the base monad with 'liftIO'. The guarded
-- computation is not given the allocated resource: it is wrapped in 'const',
-- so the resource is only ever passed to the cleanup action.
bracketOnError :: MonadSafe m => IO a -> (a -> IO b) -> m c -> m c
bracketOnError alloc clean =
  Safe.bracketOnError (liftIO alloc) (liftIO . clean) . const

-- | Create a producer that processes its inputs in parallel.
--
-- This function does __NOT__ guarantee that input elements are processed or
-- output in a deterministic order!
--
-- The parallel machinery is obtained from 'runParallel'; its allocation and
-- cleanup actions are registered through 'bracketOnError' around the
-- resulting producer.
parMapM
    :: forall a b m
     . MonadSafe m
    => Handler IO a
    -- ^ Exception handler
    -> Int
    -- ^ Number of parallel threads to use
    -> (a -> IO b)
    -- ^ Function to run in parallel
    -> Producer a m ()
    -- ^ Input producer
    -> Producer b m ()
parMapM hndl i f prod = do
    Bracket{allocate,cleanup,action} <- runParallel (Left yield) hndl i f body
    bracketOnError allocate cleanup action
  where
    -- Connects the input producer to the worker pipe, using the two callbacks
    -- that 'runParallel' hands us.
    body :: (a -> m ()) -> (a -> m (Maybe b)) -> Producer b m ()
    body buffer process = prod >-> work
      where
        work :: Pipe a b m ()
        work = do
            -- Pass the first @i@ inputs to @buffer@, which yields no output.
            -- NOTE(review): presumably this primes the worker threads before
            -- results are awaited -- confirm against 'runParallel'.
            _ <- replicateM i (await >>= lift . buffer)
            -- Every later input goes through @process@; a 'Just' result is
            -- yielded downstream, a 'Nothing' yields nothing.
            for cat $ lift . process >=> traverse_ yield

-- | Create an Effect that processes its inputs in parallel.
--
-- This function does __NOT__ guarantee that input elements are processed or
-- output in a deterministic order!
--
-- The parallel machinery is obtained from 'runParallel_'; its allocation and
-- cleanup actions are registered through 'bracketOnError' around the
-- resulting effect.
parMapM_
    :: MonadSafe m
    => Handler IO a
    -- ^ Exception handler
    -> Int
    -- ^ Number of parallel threads to use
    -> (a -> IO ())
    -- ^ Function to run in parallel
    -> Producer a m r
    -- ^ Input producer
    -> Effect m r
parMapM_ hndl i f prod = do
    Bracket{allocate,cleanup,action} <- runParallel_ hndl i f workProd
    bracketOnError allocate cleanup action
  where
    -- Drains the input producer, passing every element to the callback
    -- supplied by 'runParallel_'.
    workProd buffer = prod >-> P.mapM_ buffer