{-# LANGUAGE NoImplicitPrelude #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE ExplicitForAll #-}
{-# LANGUAGE DeriveDataTypeable #-}
{-# LANGUAGE ScopedTypeVariables #-}
module Mismi.S3.Internal.Parallel (
Workers
, Result
, newResult
, emptyResult
, addResult
, getResult
, emptyWorkers
, failWorkers
, getWorkers
, addWorker
, waitForWorkers
, waitForWorkers'
, RunError (..)
, renderRunError
, consume_
, consume
, waitEitherBoth
) where
import Control.Concurrent.MVar (newEmptyMVar, takeMVar, putMVar)
import Control.Concurrent (threadDelay)
import Control.Concurrent.Async (Async, async, cancel, poll)
import Control.Concurrent.Async (waitBoth, wait, waitEither)
import Control.Concurrent.MSem (new, signal)
import qualified Control.Concurrent.MSem as M
import Control.Concurrent.MVar (MVar, newMVar, readMVar, modifyMVar, modifyMVar_)
import Control.Monad.Catch (Exception, SomeException, catch, catchAll, finally, throwM)
import Control.Monad.IO.Class (MonadIO, liftIO)
import Control.Monad.Loops (untilM_)
import Control.Monad.Trans.Except (ExceptT (..), runExceptT)
import Control.Monad.Trans.Bifunctor (firstT)
import qualified Data.Text as T
import Data.Typeable (Typeable)
import P
import Mismi.S3.Internal.Async (waitEitherBoth)
import Mismi.S3.Internal.Queue
newtype Result a =
Result (MVar a)
newResult :: a -> IO (Result a)
newResult a =
Result <$> newMVar a
emptyResult :: IO (Result a)
emptyResult =
Result <$> newEmptyMVar
addResult :: Monad m => Result (m a) -> m a -> IO (m a)
addResult (Result r) w =
modifyMVar r (\x -> pure $ ((x >> w), x))
getResult :: Result a -> IO a
getResult (Result r) =
readMVar r
newtype Workers a =
Workers (MVar [Async a])
emptyWorkers :: IO (Workers a)
emptyWorkers =
Workers <$> newMVar []
failWorkers :: Workers a -> IO ()
failWorkers (Workers w) =
readMVar w >>=
mapM_ cancel
getWorkers :: Workers a -> IO [Async a]
getWorkers (Workers w) =
readMVar w
addWorker :: (Workers a) -> Async a -> IO ()
addWorker (Workers w) r =
modifyMVar_ w $ pure . (:) r
waitForWorkers :: (Workers a) -> IO ()
waitForWorkers (Workers w) =
readMVar w >>= mapM_ wait
waitForWorkers' :: (Workers a) -> IO [a]
waitForWorkers' (Workers w) =
readMVar w >>= mapM wait
consume_ :: MonadIO m => (Queue b -> IO a) -> Int -> (b -> ExceptT e IO ()) -> ExceptT (RunError e) m a
consume_ pro fork action = ExceptT . liftIO $ do
q <- newQueue fork
producer <- async $ pro q
workers <- emptyWorkers
result <- newResult $ Right ()
sem <- new fork
let spawn = do
m <- tryReadQueue q
flip (maybe $ return ()) m $ \a -> do
w <- do
M.wait sem
async $ (runExceptT (action a) >>= void . addResult result . first WorkerError) `finally` signal sem
addWorker workers w
let check = do
threadDelay 1000
p <- poll producer
e <- isQueueEmpty q
pure $ (isJust p) && e
submitter <- async $ untilM_ spawn check
void . async . forever $ do
threadDelay 1000000
getResult result >>=
either (const $ cancel producer >> cancel submitter) pure
let waiter = do
(i, _) <- waitBoth producer submitter
waitForWorkers workers
pure i
(waiter >>= \i -> getResult result >>= pure . second (const $ i))
`catchAll` (\z ->
failWorkers workers >>
getResult result >>= \w ->
pure (w >> Left (BlowUpError z)))
data RunError a =
WorkerError a
| BlowUpError SomeException
deriving Show
renderRunError :: RunError a -> (a -> Text) -> Text
renderRunError r render =
case r of
WorkerError a ->
"Worker failed: " <> render a
BlowUpError e ->
"An unknown exception was caught: " <> T.pack (show e)
data EarlyTermination =
EarlyTermination deriving (Eq, Show, Typeable)
instance Exception EarlyTermination
consume :: forall a b c e . (Queue a -> IO b) -> Int -> (a -> IO (Either e c)) -> IO (Either (RunError e) (b, [c]))
consume pro fork action = flip catchAll (pure . Left . BlowUpError) $ do
q <- newQueue fork
producer <- async $ pro q
workers <- (emptyWorkers :: IO (Workers c))
sem <- new fork
early <- newEmptyMVar
terminator <- async $ takeMVar early
let spawn :: IO ()
spawn = do
m <- tryReadQueue q
flip (maybe $ return ()) m $ \a -> do
w <- do
M.wait sem
async $ flip finally (signal sem) $ do
r <- action a
case r of
Left e ->
putMVar early e >>
throwM EarlyTermination
Right c ->
pure $! c
addWorker workers w
let check = do
threadDelay 1000
p <- poll producer
e <- isQueueEmpty q
pure $ (isJust p) && e
submitter <- async $ untilM_ spawn check
let waiter = runExceptT $ do
(i, _) <- firstT WorkerError . ExceptT $
waitEitherBoth terminator producer submitter
ws <- liftIO $ getWorkers workers
ii <- mapM (firstT WorkerError . ExceptT . waitEither terminator) ws
pure $ (i, ii)
waiter `catch` (\(_ :: EarlyTermination) ->
failWorkers workers >>
(Left . WorkerError) <$> wait terminator)