module Keenser.Types
( Config(..)
, Configurator
, Job(..)
, JobId
, Manager(..)
, ManagerStatus(..)
, Middleware
, Queue
, RunningJob(..)
, Signal(..)
, Worker(..)
, WorkerName
) where
import Control.Concurrent (ThreadId)
import Control.Concurrent.STM.TVar (TVar)
import Control.Monad.Trans.State (StateT)
import Data.Aeson
import qualified Data.ByteString as BS
import Data.Int (Int32)
import qualified Data.Map as M
import qualified Data.Text as T
import Data.Text.Encoding (decodeUtf8, encodeUtf8)
import Database.Redis (Connection)
import Keenser.Import
data Config m = Config
{ kWorkers :: M.Map WorkerName (Worker m Value)
, kQueues :: [Queue]
, kConcurrency :: Int
, kRedis :: Connection
, kMiddleware :: [Middleware m]
}
type Configurator m = StateT (Config m) m ()
type JobId = BS.ByteString
data Job a = Job
{ jobClass :: WorkerName
, jobArgs :: a
, jobId :: JobId
, jobRetry :: Bool
, jobEnqueuedAt :: UTCTime
, jobQueue :: Queue
}
data Manager = Manager
{ managerHostname :: BS.ByteString
, managerStartedAt :: UTCTime
, managerPid :: Int32
, managerConcurrency :: Int
, managerQueues :: [Queue]
, managerLabels :: [T.Text]
, managerIdentity :: BS.ByteString
, managerRedis :: Connection
, managerRunning :: TVar (M.Map ThreadId RunningJob)
, managerComplete :: TVar Integer
, managerFailed :: TVar Integer
, managerQuiet :: TVar Bool
}
data ManagerStatus = ManagerStatus
{ sProcs :: [BS.ByteString]
, sState :: [(BS.ByteString, BS.ByteString)]
, sDone :: Int
, sFailed :: Int
, sQueues :: [(BS.ByteString, Integer)]
} deriving Show
type Middleware m = Manager -> Worker m Value -> Object -> Queue -> m () -> m ()
type Queue = BS.ByteString
data RunningJob = RunningJob
{ rjPayload :: Object
, rjQueue :: Queue
, rjRunAt :: UTCTime
, rjThread :: ThreadId
}
data Signal = USR1 | TERM deriving (Show, Read)
type WorkerName = BS.ByteString
data Worker m a = Worker
{ workerName :: WorkerName
, workerQueue :: Queue
, workerPerform :: a -> m ()
}
instance Functor Job where
fmap f (Job klass args _id retry at q) = Job klass (f args) _id retry at q
instance ToJSON a => ToJSON (Job a) where
toJSON Job{..} = object
[ "class" .= jobClass
, "args" .= [jobArgs]
, "jid" .= jobId
, "retry" .= jobRetry
, "enqueued_at" .= timeToJson jobEnqueuedAt
, "queue" .= jobQueue
]
instance FromJSON a => FromJSON (Job a) where
parseJSON = withObject "job" $ \v -> do
jobClass <- v .: "class"
jobArgs <- head <$> v .: "args"
jobId <- v .: "jid"
jobRetry <- v .: "retry"
jobEnqueuedAt <- jsonToTime <$> v .: "enqueued_at"
jobQueue <- v .: "queue"
return $! Job{..}
instance ToJSON Manager where
toJSON Manager{..} = object
[ "hostname" .= managerHostname
, "started_at" .= timeToJson managerStartedAt
, "pid" .= managerPid
, "concurrency" .= managerConcurrency
, "queues" .= managerQueues
, "identity" .= managerIdentity
]
instance ToJSON RunningJob where
toJSON RunningJob{..} = object
[ "queue" .= rjQueue
, "payload" .= fmap (: []) rjPayload
, "run_at" .= timeToJson rjRunAt
]
instance ToJSON WorkerName where
toJSON = Data.Aeson.String . decodeUtf8
instance FromJSON WorkerName where
parseJSON = withText "worker_name" $ return . encodeUtf8