{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE ScopedTypeVariables #-}
module Control.Immortal.Queue
(
ImmortalQueue(..)
, processImmortalQueue
, QueueId
, closeImmortalQueue
, killImmortalQueue
)
where
import Control.Concurrent ( MVar
, newEmptyMVar
, takeMVar
, putMVar
, threadDelay
, readMVar
, tryPutMVar
, tryTakeMVar
)
import Control.Concurrent.Async ( Async
, async
, wait
, race
, cancel
)
import Control.Exception ( Exception )
import Control.Immortal ( Thread )
import Control.Monad ( (>=>)
, forever
, void
)
import Numeric.Natural ( Natural )
import qualified Control.Immortal as Immortal
-- | Configuration for a pool of worker threads that process items of type
-- @a@. Pass a value of this type to 'processImmortalQueue' to start the pool.
data ImmortalQueue a =
    ImmortalQueue
        { qThreadCount :: Natural
        -- ^ Number of worker threads to create; one worker is started per
        -- element of @[1 .. qThreadCount]@, so @0@ starts none.
        , qPollWorkerTime :: Int
        -- ^ How long to wait before re-scanning for a free worker when every
        -- worker is busy. The value is multiplied by 1000 and handed to
        -- 'threadDelay' (which takes microseconds), i.e. it is in
        -- milliseconds.
        , qPop :: IO a
        -- ^ Fetch the next item to process. Called in a 'forever' loop by the
        -- dispatcher. NOTE(review): presumably expected to block until an
        -- item is available - confirm against callers.
        , qPush :: a -> IO ()
        -- ^ Put an item back on the queue. Used during shutdown to return an
        -- item that had been popped but was still recorded as pending.
        , qHandler :: a -> IO ()
        -- ^ The work to perform for each item; run inside a worker thread.
        , qFailure :: forall e. Exception e => a -> e -> IO ()
        -- ^ Called with the item and the exception when the finish callback
        -- for 'qHandler' receives a 'Left'.
        }
-- | Start a pool of worker threads for the given queue configuration and
-- return a handle that can later be used to shut the pool down.
--
-- All of the work happens in a supervising 'Async':
--
-- 1. 'qThreadCount' immortal worker threads are created.
-- 2. A dispatcher loop repeatedly pops an item with 'qPop' and hands it to the
--    first worker whose input slot is empty, sleeping 'qPollWorkerTime'
--    milliseconds between scans when every worker is busy.
-- 3. The supervisor blocks until a shutdown request arrives, cancels the
--    dispatcher, shuts the workers down (mortalize + close signal for a clean
--    request, 'Immortal.stop' otherwise), waits for every worker thread, and
--    finally returns any still-pending item to the queue with 'qPush'.
processImmortalQueue :: forall a . ImmortalQueue a -> IO QueueId
processImmortalQueue queue = do
    shutdownSignal <- newEmptyMVar
    supervisor     <- async (supervise shutdownSignal)
    return QueueId { qiCloseCleanly = shutdownSignal
                   , qiAsyncQueue   = supervisor
                   }
  where
    -- Body of the supervising thread: start everything, wait for a shutdown
    -- request, then tear everything down in order.
    supervise :: MVar Bool -> IO ()
    supervise shutdownSignal = do
        workers  <- sequence [ spawnWorker | _ <- [1 .. qThreadCount queue] ]
        pending  <- newEmptyMVar
        popper   <- async (popQueue pending workers)
        graceful <- takeMVar shutdownSignal
        -- Stop pulling new items before touching the workers.
        cancel popper
        let onThreads f = mapM_ (f . wdThread) workers
        if graceful
            then do
                -- Let each worker finish, then wake the idle ones so they
                -- can exit.
                onThreads Immortal.mortalize
                mapM_ (\w -> putMVar (wdCloseMVar w) ()) workers
            else onThreads Immortal.stop
        onThreads Immortal.wait
        -- An item still recorded as pending goes back on the queue.
        tryTakeMVar pending >>= maybe (return ()) (qPush queue)

    -- Allocate a worker's MVars and start its immortal thread.
    spawnWorker :: IO (WorkerData a)
    spawnWorker = do
        closeSignal <- newEmptyMVar
        inbox       <- newEmptyMVar
        let withThread t = WorkerData { wdCloseMVar = closeSignal
                                      , wdInputMVar = inbox
                                      , wdThread    = t
                                      }
        thread <- Immortal.create (workerLoop . withThread)
        return (withThread thread)

    -- One run of a worker: wait for either a close signal or an item. The
    -- item is only read (not taken) so the slot stays full, marking the
    -- worker as busy, until the finish callback empties it.
    workerLoop :: WorkerData a -> IO ()
    workerLoop worker =
        race (takeMVar closeSignal) (readMVar inbox) >>= \case
            Left ()    -> return ()
            Right item ->
                Immortal.onFinish (errorHandler item >=> \_ -> releaseInbox)
                                  (qHandler queue item)
      where
        closeSignal  = wdCloseMVar worker
        inbox        = wdInputMVar worker
        releaseInbox = void (takeMVar inbox)

    -- Report a failed run to 'qFailure'; a successful run needs no action.
    errorHandler :: Exception e => a -> Either e () -> IO ()
    errorHandler item = \case
        Left err -> qFailure queue item err
        Right _  -> return ()

    -- Dispatcher loop: the popped item is parked in the pending MVar while
    -- it is being assigned, so the supervisor can see it during shutdown.
    popQueue :: MVar a -> [WorkerData a] -> IO ()
    popQueue pending workers = forever dispatchNext
      where
        dispatchNext = do
            item <- qPop queue
            putMVar pending item
            assignWork workers item
            void (takeMVar pending)

    -- Keep scanning the workers until one accepts the item, sleeping
    -- between unsuccessful scans.
    assignWork :: [WorkerData a] -> a -> IO ()
    assignWork workers item = attempt
      where
        attempt = do
            placed <- assignToFreeWorker item workers
            case placed of
                True  -> return ()
                False -> do
                    threadDelay (1000 * qPollWorkerTime queue)
                    attempt

    -- Offer the item to each worker in turn; 'True' once a worker's empty
    -- input slot has accepted it, 'False' if every slot was full.
    assignToFreeWorker :: a -> [WorkerData a] -> IO Bool
    assignToFreeWorker _    []              = return False
    assignToFreeWorker item (worker : rest) = do
        accepted <- tryPutMVar (wdInputMVar worker) item
        if accepted then return True else assignToFreeWorker item rest
-- | Per-worker state shared between the worker thread, the dispatcher and
-- the supervisor.
data WorkerData a =
    WorkerData
        { wdCloseMVar :: MVar ()
        -- ^ Filled with @()@ by the supervisor during a clean shutdown; the
        -- worker takes it to end its current wait without processing an item.
        , wdInputMVar :: MVar a
        -- ^ Slot for the item assigned to this worker. The dispatcher fills
        -- it with 'tryPutMVar', so a full slot means the worker is busy; the
        -- worker reads the item and only empties the slot once the handler
        -- has finished.
        , wdThread :: Thread
        -- ^ Handle of the immortal thread running this worker.
        }
-- | Handle to a running worker pool, returned by 'processImmortalQueue' and
-- consumed by 'closeImmortalQueue' / 'killImmortalQueue'.
data QueueId =
    QueueId
        { qiCloseCleanly :: MVar Bool
        -- ^ Shutdown request slot: 'True' asks for a clean close, 'False'
        -- asks for the workers to be stopped. The supervising thread blocks
        -- on this MVar until a request is written.
        , qiAsyncQueue :: Async ()
        -- ^ The supervising thread; it completes once shutdown has finished.
        }
-- | Request a clean shutdown of the worker pool and block until the
-- supervising thread has finished.
--
-- A clean request makes the supervisor mortalize every worker and send each
-- one a close signal, rather than stopping the worker threads outright.
--
-- The request is written with 'tryPutMVar' so this function is safe to call
-- more than once (or after 'killImmortalQueue'): if a request is already
-- sitting in the slot, the write is skipped instead of blocking forever on a
-- full 'MVar' that nobody will empty. Whichever request was placed first is
-- the one the supervisor acts on. 'wait' re-throws any exception that
-- terminated the supervising thread.
closeImmortalQueue :: QueueId -> IO ()
closeImmortalQueue queueId = do
    void $ tryPutMVar (qiCloseCleanly queueId) True
    wait (qiAsyncQueue queueId)
-- | Request an unclean shutdown of the worker pool and block until the
-- supervising thread has finished.
--
-- An unclean request makes the supervisor call 'Immortal.stop' on every
-- worker thread instead of letting it finish.
-- NOTE(review): an item being handled when its worker is stopped is not
-- pushed back by this module itself; whether 'qFailure' sees it depends on
-- how the finish callback reports the kill - confirm.
--
-- The request is written with 'tryPutMVar' so this function is safe to call
-- more than once (or after 'closeImmortalQueue'): if a request is already
-- sitting in the slot, the write is skipped instead of blocking forever on a
-- full 'MVar' that nobody will empty. Whichever request was placed first is
-- the one the supervisor acts on. 'wait' re-throws any exception that
-- terminated the supervising thread.
killImmortalQueue :: QueueId -> IO ()
killImmortalQueue queueId = do
    void $ tryPutMVar (qiCloseCleanly queueId) False
    wait (qiAsyncQueue queueId)