module Data.PowerQueue
(
QueueWorker(..), newQueueWorker, JobResult(..)
, Queue, newQueue, mapQueue, enqueueJob, getQueueBackend, getQueueWorker
, QueueBackend(..), mapBackend, basicChanBackend
, workStep
, LocalWorkerConfig(..), localQueueWorker
)
where
import Control.Concurrent.Async
import Control.Exception
import Control.Monad
import Data.Bifunctor
import Data.Functor.Contravariant
import Data.Functor.Contravariant.Divisible
import Data.IORef
import Data.Time.TimeSpan
import qualified Control.Concurrent.Chan as C
data JobResult
= JOk
| JRetry
deriving (Show, Eq)
data QueueBackend j
= forall tx m. Monad m =>
QueueBackend
{ qb_lift :: forall a. m a -> IO a
, qb_enqueue :: j -> m Bool
, qb_dequeue :: m (tx, j)
, qb_confirm :: tx -> m ()
, qb_rollback :: tx -> m ()
, qb_reportProgress :: tx -> m ()
, qb_progressReportInterval :: !TimeSpan
}
basicChanBackend :: forall j. IO (QueueBackend j)
basicChanBackend =
do jobChannel <- C.newChan
inProgress <- newIORef (0, [])
let dequeueHandler :: j -> (Int, [(Int, j)]) -> ((Int, [(Int, j)]), (Int, j))
dequeueHandler job st@(idx, _) =
let entry = (idx, job)
in ( bimap (+1) (entry:) st
, entry
)
forget txId =
atomicModifyIORef' inProgress $ \st ->
( second (filter (\x -> fst x == txId)) st
, snd $ second (lookup txId) st
)
pure
QueueBackend
{ qb_lift = id
, qb_enqueue = \x -> C.writeChan jobChannel x >> pure True
, qb_dequeue =
do nextJob <- C.readChan jobChannel
atomicModifyIORef' inProgress (dequeueHandler nextJob)
, qb_confirm = void . forget
, qb_rollback =
\txId ->
do oldVal <- forget txId
case oldVal of
Just j -> C.writeChan jobChannel j
Nothing -> pure ()
, qb_reportProgress = const $ pure ()
, qb_progressReportInterval = hours 1
}
mapBackend :: (a -> b) -> (b -> a) -> QueueBackend a -> QueueBackend b
mapBackend f g (QueueBackend qlift qenq qdeq qconf qroll rp intv) =
QueueBackend
{ qb_lift = qlift
, qb_enqueue = qenq . g
, qb_dequeue = fmap (second f) qdeq
, qb_confirm = qconf
, qb_rollback = qroll
, qb_reportProgress = rp
, qb_progressReportInterval = intv
}
data QueueWorker j
= QueueWorker
{ qw_execute :: j -> IO JobResult
}
newQueueWorker :: (j -> IO JobResult) -> QueueWorker j
newQueueWorker exec =
QueueWorker
{ qw_execute = exec
}
instance Contravariant QueueWorker where
contramap f (QueueWorker qexec) =
QueueWorker
{ qw_execute = \val -> qexec (f val)
}
instance Divisible QueueWorker where
divide f (QueueWorker qe1) (QueueWorker qe2) =
QueueWorker
{ qw_execute = \val ->
do let (l, r) = f val
ok1 <- qe1 l
ok2 <- qe2 r
pure $
if ok1 /= JOk || ok2 /= JOk
then JRetry
else JOk
}
conquer =
newQueueWorker $ \_ -> pure JOk
data Queue j
= Queue
{ q_worker :: !(QueueWorker j)
, q_backend :: !(QueueBackend j)
}
getQueueWorker :: Queue j -> QueueWorker j
getQueueWorker = q_worker
getQueueBackend :: Queue j -> QueueBackend j
getQueueBackend = q_backend
mapQueue :: (a -> b) -> (b -> a) -> Queue a -> Queue b
mapQueue f g q =
Queue
{ q_worker = contramap g (q_worker q)
, q_backend = mapBackend f g (q_backend q)
}
newQueue :: QueueBackend j -> QueueWorker j -> Queue j
newQueue qb qw =
Queue
{ q_worker = qw
, q_backend = qb
}
enqueueJob :: j -> Queue j -> IO Bool
enqueueJob j q =
let enqueue QueueBackend{..} = qb_lift (qb_enqueue j)
in enqueue (q_backend q)
workStep :: Queue j -> IO ()
workStep q = workStepInternal (q_backend q) (q_worker q)
workStepInternal :: QueueBackend j -> QueueWorker j -> IO ()
workStepInternal QueueBackend{..} QueueWorker{..} =
let acquire =
do v <- qb_lift qb_dequeue
pReport <-
async $ forever $
do sleepTS qb_progressReportInterval
qb_lift $ qb_reportProgress (fst v)
pure (pReport, v)
onError (pg, (txId, _)) =
do cancel pg
qb_lift (qb_rollback txId)
go (pg, (txId, job)) =
do execRes <-
try $ qw_execute job
cancel pg
case execRes of
Left (_ :: SomeException) ->
qb_lift (qb_rollback txId)
Right res ->
case res of
JOk -> qb_lift (qb_confirm txId)
JRetry -> qb_lift (qb_rollback txId)
in bracketOnError acquire onError go
data LocalWorkerConfig
= LocalWorkerConfig
{ lwc_concurrentJobs :: !Int
}
localQueueWorker :: LocalWorkerConfig -> Queue j -> IO ()
localQueueWorker cfg q =
replicateConcurrently_ (lwc_concurrentJobs cfg) $
forever $ workStep q