{-# LANGUAGE RankNTypes, ScopedTypeVariables, OverloadedStrings #-} module Web.Spock.Worker ( -- * Worker WorkQueue , WorkHandler , WorkerConfig (..) , WorkerConcurrentStrategy (..) , newWorker , addWork , WorkExecution (..) , WorkResult (..) -- * Error Handeling , ErrorHandler(..), InternalError ) where import Control.Concurrent import Control.Concurrent.STM import Control.Monad import Control.Monad.Trans import Control.Monad.Trans.Error import Control.Exception.Lifted as EX import Data.Time import Web.Spock.Shared import qualified Web.Spock.Worker.Internal.Queue as Q type InternalError = String -- | Describe how you want to handle errors. Make sure you catch all exceptions -- that can happen inside this handler, otherwise the worker will crash! data ErrorHandler conn sess st a = ErrorHandlerIO (InternalError -> a -> IO WorkResult) | ErrorHandlerSpock (InternalError -> a -> (WebStateM conn sess st) WorkResult) -- | Describe how you want jobs in the queue to be performed type WorkHandler conn sess st a = a -> ErrorT InternalError (WebStateM conn sess st) WorkResult -- | The queue containing scheduled jobs newtype WorkQueue a = WorkQueue { _unWorkQueue :: Q.WorkerQueue UTCTime a } -- | Describes when a job should be executed data WorkExecution = WorkNow | WorkIn NominalDiffTime | WorkAt UTCTime -- | Describes the outcome of a job after completion. You can repeat jobs data WorkResult = WorkComplete | WorkError | WorkRepeatIn NominalDiffTime | WorkRepeatAt UTCTime deriving (Show, Eq) -- | Configure the concurrent behaviour of a worker. If you want tasks executed -- concurrently, consider using 'WorkerConcurrentBounded' data WorkerConcurrentStrategy = WorkerNoConcurrency | WorkerConcurrentBounded Int | WorkerConcurrentUnbounded -- | Configure how the worker handles it's task and define the queue size data WorkerConfig = WorkerConfig { wc_queueLimit :: Int , wc_concurrent :: WorkerConcurrentStrategy } -- | Create a new background worker and limit the size of the job queue. newWorker :: (MonadTrans t, Monad (t (WebStateM conn sess st))) => WorkerConfig -> WorkHandler conn sess st a -> ErrorHandler conn sess st a -> t (WebStateM conn sess st) (WorkQueue a) newWorker wc workHandler errorHandler = do heart <- getSpockHeart q <- lift . liftIO $ Q.newQueue (wc_queueLimit wc) _ <- lift . liftIO $ forkIO (workProcessor q workHandler errorHandler heart (wc_concurrent wc)) return (WorkQueue q) workProcessor :: Q.WorkerQueue UTCTime a -> WorkHandler conn sess st a -> ErrorHandler conn sess st a -> WebState conn sess st -> WorkerConcurrentStrategy -> IO () workProcessor q workHandler errorHandler spockCore concurrentStrategy = do runningTasksVar <- newTVarIO 0 loop runningTasksVar where runWork work = do workRes <- EX.catch (runSpockIO spockCore $ runErrorT $ workHandler work) (\(e::SomeException) -> return $ Left (show e)) case workRes of Left err -> case errorHandler of ErrorHandlerIO h -> h err work ErrorHandlerSpock h -> runSpockIO spockCore $ h err work Right r -> return r loop runningTasksV = do now <- getCurrentTime mWork <- atomically $ Q.dequeue now q case mWork of Nothing -> do threadDelay (1000 * 1000) -- 1 sec loop runningTasksV Just work -> do case concurrentStrategy of WorkerConcurrentBounded limit -> do atomically $ do runningTasks <- readTVar runningTasksV when (runningTasks >= limit) retry _ <- forkIO $ launchWork runningTasksV work return () WorkerNoConcurrency -> launchWork runningTasksV work WorkerConcurrentUnbounded -> do _ <- forkIO $ launchWork runningTasksV work return () loop runningTasksV launchWork runningTasksV work = do atomically $ modifyTVar runningTasksV (\x -> x + 1) res <- (runWork work `EX.finally` (atomically $ modifyTVar runningTasksV (\x -> x - 1))) case res of WorkRepeatIn secs -> addWork (WorkIn secs) work (WorkQueue q) WorkRepeatAt time -> addWork (WorkAt time) work (WorkQueue q) _ -> return () -- | Add a new job to the background worker. If the queue is full this will block addWork :: MonadIO m => WorkExecution -> a -> WorkQueue a -> m () addWork we work (WorkQueue q) = liftIO $ do now <- getCurrentTime let execTime = case we of WorkNow -> now WorkIn later -> addUTCTime later now WorkAt ts -> ts atomically $ Q.enqueue execTime work q