module Data.PowerQueue
(
QueueWorker, newQueueWorker, JobResult(..)
, Queue, newQueue, mapQueue, enqueueJob
, QueueBackend(..), mapBackend, basicChanBackend
, workStep
, LocalWorkerConfig(..), localQueueWorker
)
where
import Control.Concurrent.Async
import Control.Exception
import Control.Monad
import Data.Bifunctor
import Data.Functor.Contravariant
import Data.Functor.Contravariant.Divisible
import Data.IORef
import qualified Control.Concurrent.Chan as C
-- | Outcome a worker reports after attempting a job.
data JobResult
    = JOk
    -- ^ The job is done; 'workStep' confirms its transaction.
    | JRetry
    -- ^ The job is not done; 'workStep' rolls its transaction back.
    deriving (Eq, Show)
-- | Storage backend for jobs of type @j@.
--
-- The backend's own monad @m@ and its transaction handle type @tx@ are
-- hidden existentially, so consumers can only run backend actions through
-- 'qb_lift' and can only pass a handle back to the backend it came from.
data QueueBackend j
    = forall tx m. Monad m => QueueBackend
    { qb_lift :: forall a. m a -> IO a
    -- ^ Run an action of the backend monad in 'IO'.
    , qb_enqueue :: j -> m Bool
    -- ^ Submit a job. NOTE(review): the 'Bool' presumably reports whether
    -- the job was accepted -- confirm against the concrete backends.
    , qb_dequeue :: m (tx, j)
    -- ^ Fetch a job together with the transaction handle that is later
    -- handed to 'qb_confirm' or 'qb_rollback'.
    , qb_confirm :: tx -> m ()
    -- ^ Called by 'workStep' when the worker returned 'JOk'.
    , qb_rollback :: tx -> m ()
    -- ^ Called by 'workStep' when the worker returned 'JRetry' or threw.
    }
-- | Create an in-memory backend built on a 'C.Chan'.
--
-- Waiting jobs live in the channel. Jobs that were dequeued but neither
-- confirmed nor rolled back yet are tracked in an 'IORef' that holds the next
-- transaction id together with the list of in-flight
-- @(transaction id, job)@ pairs.
--
-- * enqueue writes to the channel and always reports 'True';
-- * dequeue blocks on the channel, then registers the job as in-flight;
-- * confirm drops the in-flight entry;
-- * rollback drops the in-flight entry and writes the job back to the
--   channel (nothing happens if the transaction is not tracked any more).
basicChanBackend :: forall j. IO (QueueBackend j)
basicChanBackend =
    do jobChannel <- C.newChan
       inProgress <- newIORef (0, [])
       let -- Allocate the current counter value as transaction id, remember
           -- the job under it and bump the counter.
           dequeueHandler :: j -> (Int, [(Int, j)]) -> ((Int, [(Int, j)]), (Int, j))
           dequeueHandler job st@(idx, _) =
               let entry = (idx, job)
               in ( bimap (+1) (entry:) st
                  , entry
                  )
           -- Remove the given transaction from the in-flight list and return
           -- the job it carried, if it was still tracked. Only the matching
           -- entry is removed; every other in-flight transaction must stay.
           forget txId =
               atomicModifyIORef' inProgress $ \st ->
               ( second (filter (\x -> fst x /= txId)) st
               , lookup txId (snd st)
               )
       pure
           QueueBackend
           { qb_lift = id
           , qb_enqueue = \x -> C.writeChan jobChannel x >> pure True
           , qb_dequeue =
                   do nextJob <- C.readChan jobChannel
                      atomicModifyIORef' inProgress (dequeueHandler nextJob)
           , qb_confirm = void . forget
           , qb_rollback =
                   \txId ->
                   do oldVal <- forget txId
                      case oldVal of
                        Just j -> C.writeChan jobChannel j
                        Nothing -> pure ()
           }
-- | Change the job type of a backend.
--
-- The second function converts jobs on their way into the backend
-- (enqueue), the first converts jobs coming out of it (dequeue). Lifting,
-- confirm and rollback are passed through untouched.
mapBackend :: (a -> b) -> (b -> a) -> QueueBackend a -> QueueBackend b
mapBackend f g (QueueBackend run enq deq confirm rollback) =
    QueueBackend
    { qb_lift = run
    , qb_enqueue = \job -> enq (g job)
    , qb_dequeue = second f <$> deq
    , qb_confirm = confirm
    , qb_rollback = rollback
    }
-- | A worker that knows how to process jobs of type @j@.
data QueueWorker j = QueueWorker
    { qw_execute :: j -> IO JobResult
    -- ^ Process a single job and report whether it is done or has to be
    -- retried.
    }
-- | Wrap a job-processing action into a 'QueueWorker'.
newQueueWorker :: (j -> IO JobResult) -> QueueWorker j
newQueueWorker = QueueWorker
-- | Pre-process every incoming job with a function before handing it to the
-- wrapped worker.
instance Contravariant QueueWorker where
    contramap f (QueueWorker run) = QueueWorker (run . f)
-- | 'divide' splits a job in two and runs both workers one after the other
-- (the second one runs regardless of the first one's result); the combined
-- job is 'JOk' only when both halves are. 'conquer' accepts every job.
instance Divisible QueueWorker where
    divide split (QueueWorker runLeft) (QueueWorker runRight) =
        QueueWorker $ \val ->
            do let (l, r) = split val
               leftRes <- runLeft l
               rightRes <- runRight r
               pure $
                   case (leftRes, rightRes) of
                     (JOk, JOk) -> JOk
                     _ -> JRetry
    conquer = newQueueWorker (const (pure JOk))
-- | A queue: a backend that stores jobs paired with the worker that
-- processes them. Both fields are strict.
data Queue j = Queue
    { q_worker :: !(QueueWorker j)
    -- ^ Executes jobs taken from the backend.
    , q_backend :: !(QueueBackend j)
    -- ^ Stores jobs and tracks their transactions.
    }
-- | Change the job type of a queue.
--
-- The worker still operates on the old job type, so new jobs are converted
-- back with the second function before execution; the backend is adapted
-- with 'mapBackend' using both functions.
mapQueue :: (a -> b) -> (b -> a) -> Queue a -> Queue b
mapQueue f g (Queue worker backend) =
    Queue
    { q_worker = contramap g worker
    , q_backend = mapBackend f g backend
    }
-- | Build a queue from a backend and the worker that processes its jobs.
newQueue :: QueueBackend j -> QueueWorker j -> Queue j
newQueue backend worker = Queue worker backend
-- | Submit a job to the queue's backend and return the backend's 'Bool'
-- answer.
enqueueJob :: j -> Queue j -> IO Bool
enqueueJob j q =
    case q_backend q of
      QueueBackend run enq _ _ _ -> run (enq j)
-- | Take one job from the queue's backend and run the queue's worker on it.
workStep :: Queue j -> IO ()
workStep Queue{..} = workStepInternal q_backend q_worker
-- | Dequeue a single job, execute it and settle its transaction.
--
-- * Worker returns 'JOk': the transaction is confirmed.
-- * Worker returns 'JRetry': the transaction is rolled back.
-- * Worker throws a synchronous exception: the transaction is rolled back
--   and the exception is dropped, so a failing job does not kill the caller.
-- * An asynchronous exception arrives (e.g. the thread is cancelled): the
--   transaction is rolled back and the exception is rethrown, so the worker
--   thread can actually be stopped while a job is running.
workStepInternal :: QueueBackend j -> QueueWorker j -> IO ()
workStepInternal QueueBackend{..} QueueWorker{..} =
    let acquire = qb_lift qb_dequeue
        rollback txId = qb_lift (qb_rollback txId)
        onError (txId, _) = rollback txId
        go (txId, job) =
            do execRes <-
                   try $ qw_execute job
               case execRes of
                 Left (e :: SomeException)
                     -- Never swallow asynchronous exceptions. Rethrowing
                     -- makes 'bracketOnError' run 'onError', which performs
                     -- the rollback exactly once.
                     | Just (SomeAsyncException _) <- fromException e ->
                         throwIO e
                     | otherwise ->
                         rollback txId
                 Right res ->
                     case res of
                       JOk -> qb_lift (qb_confirm txId)
                       JRetry -> rollback txId
    in bracketOnError acquire onError go
-- | Settings for 'localQueueWorker'.
data LocalWorkerConfig = LocalWorkerConfig
    { lwc_concurrentJobs :: !Int
    -- ^ Number of worker loops that 'localQueueWorker' runs concurrently.
    }
-- | Process the queue in the current process: start 'lwc_concurrentJobs'
-- concurrent loops, each of which calls 'workStep' over and over.
localQueueWorker :: LocalWorkerConfig -> Queue j -> IO ()
localQueueWorker cfg q =
    replicateConcurrently_ workerCount workerLoop
  where
    workerCount = lwc_concurrentJobs cfg
    workerLoop = forever (workStep q)