module Web.Spock.Worker
(
WorkQueue
, WorkHandler
, WorkerConfig (..)
, WorkerConcurrentStrategy (..)
, newWorker
, addWork
, WorkExecution (..)
, WorkResult (..)
, ErrorHandler(..), InternalError
)
where
import Control.Concurrent
import Control.Concurrent.STM
import Control.Monad
import Control.Monad.Trans
import Control.Monad.Trans.Error
import Control.Exception.Lifted as EX
import Data.Time
import Web.Spock.Shared
import qualified Web.Spock.Worker.Internal.Queue as Q
-- | Textual description of a failure raised while processing a work item.
-- It is the error type of a 'WorkHandler' and is also what an exception
-- escaping the handler is rendered to (via 'show') before being passed to the
-- 'ErrorHandler'.
type InternalError = String
-- | Callback run by the worker when a 'WorkHandler' fails, either by
-- returning an 'InternalError' or by throwing an exception. It receives the
-- error description and the work item that failed; the 'WorkResult' it
-- returns is treated exactly like the result of a successful handler run.
data ErrorHandler conn sess st a
    = ErrorHandlerIO (InternalError -> a -> IO WorkResult)
      -- ^ Handle the failure in plain 'IO'.
    | ErrorHandlerSpock (InternalError -> a -> WebStateM conn sess st WorkResult)
      -- ^ Handle the failure inside the Spock 'WebStateM' monad.
-- | Action executed for a single work item of type @a@. It runs in
-- 'WebStateM' and may abort with an 'InternalError'; on success it yields a
-- 'WorkResult' describing what should happen to the item next.
type WorkHandler conn sess st a = a -> ErrorT InternalError (WebStateM conn sess st) WorkResult
-- | Handle to a worker's queue of pending work items of type @a@.
-- Obtained from 'newWorker' and consumed by 'addWork'.
newtype WorkQueue a = WorkQueue
    { _unWorkQueue :: Q.WorkerQueue UTCTime a
      -- ^ The underlying queue, keyed by the time each item should run.
    }
-- | When a work item handed to 'addWork' should be executed.
data WorkExecution
    = WorkNow                 -- ^ Schedule for the current time.
    | WorkIn NominalDiffTime  -- ^ Schedule this long after the current time.
    | WorkAt UTCTime          -- ^ Schedule for the given absolute time.
-- | Outcome reported by a 'WorkHandler' or an 'ErrorHandler'.
--
-- The two repeat constructors cause the worker to put the same work item
-- back on its queue; the other two cause no further scheduling.
data WorkResult
    = WorkComplete                  -- ^ The item was processed; nothing more to do.
    | WorkError                     -- ^ The item failed; it is not rescheduled.
    | WorkRepeatIn NominalDiffTime  -- ^ Run the same item again after this delay.
    | WorkRepeatAt UTCTime          -- ^ Run the same item again at this time.
    deriving (Show, Eq)
-- | How the worker loop executes the work items it takes from the queue.
data WorkerConcurrentStrategy
    = WorkerNoConcurrency
      -- ^ Run each item on the worker loop's own thread, one after another.
    | WorkerConcurrentBounded Int
      -- ^ Run each item on its own thread, waiting while the number of
      -- running items has reached the given limit.
    | WorkerConcurrentUnbounded
      -- ^ Run each item on its own thread with no limit.
-- | Settings supplied to 'newWorker'.
data WorkerConfig = WorkerConfig
    { wc_queueLimit :: Int
      -- ^ Passed to the queue constructor when the worker is created
      -- (presumably the maximum number of queued items -- confirm against
      -- "Web.Spock.Worker.Internal.Queue").
    , wc_concurrent :: WorkerConcurrentStrategy
      -- ^ How dequeued work items are executed.
    }
-- | Create a background worker and return the queue used to feed it.
--
-- Fetches the current Spock state, allocates a queue using 'wc_queueLimit',
-- and forks a thread that processes queued items with the given work handler,
-- falling back to the error handler on failure. Work is submitted with
-- 'addWork'.
newWorker
    :: (MonadTrans t, Monad (t (WebStateM conn sess st)))
    => WorkerConfig
    -> WorkHandler conn sess st a
    -> ErrorHandler conn sess st a
    -> t (WebStateM conn sess st) (WorkQueue a)
newWorker wc workHandler errorHandler = do
    heart <- getSpockHeart
    queue <- lift (liftIO (Q.newQueue (wc_queueLimit wc)))
    -- The processing loop runs on its own thread for as long as it lives;
    -- its thread id is intentionally discarded.
    let processor = workProcessor queue workHandler errorHandler heart (wc_concurrent wc)
    _ <- lift (liftIO (forkIO processor))
    return (WorkQueue queue)
-- | The worker loop: repeatedly takes a work item from the queue and runs it
-- according to the given concurrency strategy. The loop itself never
-- terminates normally.
--
-- Arguments, in order: the queue to consume, the handler run for each item,
-- the handler invoked when the work handler fails, the Spock state used to
-- run 'WebStateM' actions in 'IO', and the concurrency strategy.
--
-- A counter of running tasks is kept for every strategy; a slot is claimed
-- on the loop thread /before/ the task is started and released when the task
-- finishes (normally or by exception). Only 'WorkerConcurrentBounded' reads
-- the counter.
workProcessor :: Q.WorkerQueue UTCTime a
              -> WorkHandler conn sess st a
              -> ErrorHandler conn sess st a
              -> WebState conn sess st
              -> WorkerConcurrentStrategy
              -> IO ()
workProcessor q workHandler errorHandler spockCore concurrentStrategy =
    do runningTasksVar <- newTVarIO 0
       loop runningTasksVar
    where
      -- Render any exception escaping the work handler as an 'InternalError'
      -- so it is routed to the error handler like an ordinary failure.
      describeCrash :: SomeException -> IO (Either InternalError WorkResult)
      describeCrash e = return $ Left (show e)

      -- Run the work handler for one item; on failure delegate to the
      -- configured error handler, whose result replaces the handler's.
      runWork work =
          do workRes <-
                 EX.catch (runSpockIO spockCore $ runErrorT $ workHandler work) describeCrash
             case workRes of
               Left err ->
                   case errorHandler of
                     ErrorHandlerIO h ->
                         h err work
                     ErrorHandlerSpock h ->
                         runSpockIO spockCore $ h err work
               Right r -> return r

      -- Claim / release one slot in the running-task counter. The strict
      -- modify keeps the counter from accumulating thunks.
      claimSlot runningTasksV =
          atomically $ modifyTVar' runningTasksV (\x -> x + 1)
      releaseSlot runningTasksV =
          atomically $ modifyTVar' runningTasksV (\x -> x - 1)

      loop runningTasksV =
          do now <- getCurrentTime
             -- NOTE(review): 'Q.dequeue' presumably yields only items whose
             -- scheduled time is not after @now@ -- confirm in the queue module.
             mWork <- atomically $ Q.dequeue now q
             case mWork of
               Nothing ->
                   -- Nothing to run right now: poll again in one second.
                   do threadDelay (1000 * 1000)
                      loop runningTasksV
               Just work ->
                   do case concurrentStrategy of
                        WorkerConcurrentBounded limit ->
                            do -- Wait for a free slot and claim it in the SAME
                               -- transaction. Claiming inside the forked thread
                               -- would let the loop start further tasks before
                               -- the counter is updated, exceeding the limit.
                               atomically $
                                   do runningTasks <- readTVar runningTasksV
                                      when (runningTasks >= limit) retry
                                      writeTVar runningTasksV $! runningTasks + 1
                               _ <- forkIO $ launchWork runningTasksV work
                               return ()
                        WorkerNoConcurrency ->
                            do claimSlot runningTasksV
                               launchWork runningTasksV work
                        WorkerConcurrentUnbounded ->
                            do claimSlot runningTasksV
                               _ <- forkIO $ launchWork runningTasksV work
                               return ()
                      loop runningTasksV

      -- Run one item whose slot has already been claimed, always release the
      -- slot afterwards, and re-enqueue the item if a repeat was requested.
      launchWork runningTasksV work =
          do res <- (runWork work `EX.finally` releaseSlot runningTasksV)
             case res of
               WorkRepeatIn secs ->
                   addWork (WorkIn secs) work (WorkQueue q)
               WorkRepeatAt time ->
                   addWork (WorkAt time) work (WorkQueue q)
               _ ->
                   return ()
-- | Schedule a work item on a worker's queue.
--
-- The 'WorkExecution' argument is resolved against the current time into an
-- absolute execution time, and the item is then enqueued under that time.
addWork :: MonadIO m => WorkExecution -> a -> WorkQueue a -> m ()
addWork we work (WorkQueue q) = liftIO $ do
    now <- getCurrentTime
    atomically $ Q.enqueue (scheduledFor now) work q
  where
    -- Absolute time at which the item should run, relative to @now@.
    scheduledFor now =
        case we of
          WorkNow -> now
          WorkIn later -> addUTCTime later now
          WorkAt ts -> ts