{-# LANGUAGE TypeSynonymInstances, FlexibleInstances, NamedFieldPuns, DeriveGeneric, FlexibleContexts, TypeFamilies, StandaloneDeriving #-} module Test where import Test.Tasty as Tasty import qualified PGQueue.Migrations as Migrations import qualified PGQueue.Job as Job import Database.PostgreSQL.Simple as PGS import Data.Functor (void) import Data.Pool as Pool import Test.Tasty.HUnit import Debug.Trace import Control.Exception.Lifted (finally, catch) import Control.Monad.Logger import Control.Monad.Reader import Data.Aeson as Aeson import Control.Concurrent.Lifted import Control.Concurrent.Async.Lifted import PGQueue.Job (Job(..), JobId) import System.Log.FastLogger (fromLogStr, withFastLogger, LogType(..), defaultBufSize, FastLogger, FileLogSpec(..)) import Data.String.Conv (toS) import Data.Time import GHC.Generics import Hedgehog import qualified Hedgehog.Gen as Gen import qualified Hedgehog.Range as Range import Test.Tasty.Hedgehog import qualified System.Random as R import Data.String (fromString) import qualified Data.IntMap.Strict as Map import Control.Monad.Trans.Control (liftWith, restoreT) import Control.Monad.Morph (hoist) import Data.List as DL import PGQueue.Web as Web import Data.Time.Convenience as Time import qualified Data.Text as T import Data.Ord (comparing, Down(..)) import Data.Maybe (fromMaybe) main :: IO () main = do let connInfo = ConnectInfo { connectHost = "localhost" , connectPort = fromIntegral 5432 , connectUser = "jobs_test" , connectPassword = "jobs_test" , connectDatabase = "jobs_test" } appPool <- createPool (PGS.connect connInfo) -- cretea a new resource (PGS.close) -- destroy resource 1 -- stripes (fromRational 10) -- number of seconds unused resources are kept around 45 -- maximum open connections jobPool <- createPool (PGS.connect connInfo) -- cretea a new resource (PGS.close) -- destroy resource 1 -- stripes (fromRational 10) -- number of seconds unused resources are kept around 45 -- maximum open connections defaultMain $ tests appPool jobPool tests appPool jobPool = testGroup "All tests" [ testGroup "simple tests" [testJobCreation appPool jobPool, testJobScheduling appPool jobPool, testJobFailure appPool jobPool, testEnsureShutdown appPool jobPool] , testGroup "property tests" [ testEverything appPool jobPool , propFilterJobs appPool jobPool ] ] myTestCase :: TestName -> (Connection -> Assertion) -> Pool Connection -> TestTree myTestCase tname actualtest pool = testCase tname $ Pool.withResource pool actualtest -- myTestCase tname actualtest pool = Tasty.withResource -- (traceShow "connection taken" $ Pool.takeResource pool) -- (\(conn, lpool) -> traceShow "connection released" $ Pool.putResource lpool conn) -- (\res -> testCase tname $ do -- (conn, _) <- res -- (actualtest conn)) data Env = Env { envDbPool :: Pool Connection , envLogger :: FastLogger } type TestM = ReaderT Env IO instance Job.HasJobMonitor TestM where getDbPool = envDbPool <$> ask onJobRetry Job{jobId} = logInfoN $ "Retry JobId=" <> toS (show jobId) onJobSuccess Job{jobId} = logInfoN $ "Success JobId=" <> toS (show jobId) instance {-# OVERLAPPING #-} MonadLogger TestM where monadLoggerLog _ _ _ msg = do flogger <- envLogger <$> ask liftIO $ flogger $ toLogStr msg <> toLogStr ("\n" :: String) testPayload :: Value testPayload = toJSON (10 :: Int) jobRunner :: Job.Job -> IO () jobRunner Job{jobPayload, jobAttempts} = case (fromJSON jobPayload) of Aeson.Error e -> error e Success (j :: JobPayload) -> let recur pload idx = case pload of PayloadAlwaysFail delay -> (threadDelay delay) >> (error $ "Forced error after " <> show delay <> " seconds") PayloadSucceed delay -> (threadDelay delay) >> pure () PayloadFail delay innerpload -> if idx> (error $ "Forced error after " <> show delay <> " seconds. step=" <> show idx) in recur j 0 data JobPayload = PayloadSucceed Int | PayloadFail Int JobPayload | PayloadAlwaysFail Int deriving (Eq, Show, Generic) instance ToJSON JobPayload where toJSON = genericToJSON Aeson.defaultOptions instance FromJSON JobPayload where parseJSON = genericParseJSON Aeson.defaultOptions oneSec :: Int oneSec = 1000000 assertJobIdStatus :: (HasCallStack) => Connection -> Job.TableName -> String -> Job.Status -> JobId -> Assertion assertJobIdStatus conn tname msg st jid = Job.findJobById conn tname jid >>= \case Nothing -> assertFailure $ "Not expecting job to be deleted. JobId=" <> show jid Just (Job{jobStatus}) -> assertEqual msg st jobStatus ensureJobId :: (HasCallStack) => Connection -> Job.TableName -> JobId -> IO Job ensureJobId conn tname jid = Job.findJobById conn tname jid >>= \case Nothing -> error $ "Not expecting job to be deleted. JobId=" <> show jid Just j -> pure j -- withRandomTable :: (MonadIO m) => Pool Connection -> (Job.TableName -> m a) -> m a withRandomTable jobPool action = do (tname :: Job.TableName) <- liftIO ((("jobs_" <>) . fromString) <$> (replicateM 10 (R.randomRIO ('a', 'z')))) finally ((Pool.withResource jobPool $ \conn -> (liftIO $ Migrations.createJobTable conn tname)) >> (action tname)) (Pool.withResource jobPool $ \conn -> liftIO $ void $ PGS.execute_ conn ("drop table if exists " <> tname <> ";")) -- withNewJobMonitor :: (Pool Connection) -> (TableName -> Assertion) -> Assertion withNewJobMonitor jobPool actualTest = withRandomTable jobPool $ \tname -> withNamedJobMonitor tname jobPool (actualTest tname) withNamedJobMonitor tname jobPool actualTest = do (defaults, cleanup) <- (Job.defaultJobMonitor tname jobPool) let jobMonitorSettings = defaults{ Job.monitorJobRunner = jobRunner , Job.monitorTableName = tname , Job.monitorDefaultMaxAttempts = 3 } finally (withAsync (Job.runJobMonitor jobMonitorSettings) (const actualTest)) (cleanup) payloadGen :: MonadGen m => m JobPayload payloadGen = Gen.recursive Gen.choice nonRecursive recursive where nonRecursive = [ PayloadAlwaysFail <$> Gen.element [1, 2, 3] , PayloadSucceed <$> Gen.element [1, 2, 3]] recursive = [ PayloadFail <$> (Gen.element [1, 2, 3]) <*> payloadGen ] testJobCreation appPool jobPool = testCase "job creation" $ withNewJobMonitor jobPool $ \tname -> Pool.withResource appPool $ \conn -> do Job{jobId} <- Job.createJob conn tname (PayloadSucceed 0) threadDelay (oneSec * 6) assertJobIdStatus conn tname "Expecting job to tbe successful by now" Job.Success jobId testEnsureShutdown appPool jobPool = testCase "ensure shutdown" $ withRandomTable jobPool $ \tname -> do jid <- scheduleJob tname threadDelay (2 * Job.defaultPollingInterval) Pool.withResource appPool $ \conn -> assertJobIdStatus conn tname "Job should still be in queued state if job-monitor is no longer runner" Job.Queued jid where scheduleJob tname = withNamedJobMonitor tname jobPool $ do t <- getCurrentTime Pool.withResource appPool $ \conn -> do Job{jobId} <- Job.scheduleJob conn tname (PayloadSucceed 0) (addUTCTime (fromIntegral (2 * (Job.defaultPollingInterval `div` oneSec))) t) assertJobIdStatus conn tname "Job is scheduled in future, should still be queueud" Job.Queued jobId pure jobId testJobScheduling appPool jobPool = testCase "job scheduling" $ withNewJobMonitor jobPool $ \tname -> Pool.withResource appPool $ \conn -> do t <- getCurrentTime job@Job{jobId} <- Job.scheduleJob conn tname (PayloadSucceed 0) (addUTCTime (fromIntegral 3600) t) threadDelay (oneSec * 2) assertJobIdStatus conn tname "Job is scheduled in the future. It should NOT have been successful by now" Job.Queued jobId j <- Job.saveJob conn tname job{jobRunAt = (addUTCTime (fromIntegral (-1)) t)} threadDelay (Job.defaultPollingInterval + (oneSec * 2)) assertJobIdStatus conn tname "Job had a runAt date in the past. It should have been successful by now" Job.Success jobId testJobFailure appPool jobPool = testCase "job retry" $ withNewJobMonitor jobPool $ \tname -> Pool.withResource appPool $ \conn -> do Job{jobId} <- Job.createJob conn tname (PayloadAlwaysFail 0) threadDelay (oneSec * 15) Job{jobAttempts, jobStatus} <- ensureJobId conn tname jobId assertEqual "Exepcting job to be in Failed status" Job.Failed jobStatus assertEqual ("Expecting job attempts to be 3. Found " <> show jobAttempts) 3 jobAttempts data JobEvent = JobStart | JobRetry | JobSuccess | JobFailed deriving (Eq, Show) testEverything appPool jobPool = testProperty "test everything" $ property $ do jobPayloads <- forAll $ Gen.list (Range.linear 1 1000) payloadGen jobsMVar <- liftIO $ newMVar (Map.empty :: Map.IntMap [(JobEvent, Job.Job)]) test $ withRandomTable jobPool $ \tname -> do (defaults, cleanup) <- liftIO $ Job.defaultJobMonitor tname jobPool let jobMonitorSettings = defaults { Job.monitorJobRunner = jobRunner , Job.monitorTableName = tname , Job.monitorOnJobStart = onJobEvent JobStart jobsMVar , Job.monitorOnJobRetry = onJobEvent JobRetry jobsMVar , Job.monitorOnJobPermanentlyFailed = onJobEvent JobFailed jobsMVar , Job.monitorOnJobSuccess = onJobEvent JobSuccess jobsMVar , Job.monitorDefaultMaxAttempts = 3 , Job.monitorPollingInterval = oneSec * jobPollingInterval } (jobs :: [Job]) <- withAsync (liftIO $ Job.runJobMonitor jobMonitorSettings) (const $ finally (liftIO $ actualTest jobPayloads tname jobsMVar) (liftIO cleanup)) jobAudit <- takeMVar jobsMVar -- All jobs should show up in the audit, which means they should have -- been attempted /at least/ once (DL.sort $ map jobId jobs) === (DL.sort $ Map.keys jobAudit) -- No job should've been simultaneously picked-up by more than one -- worker True === (Map.foldl (\m js -> m && noRaceCondition js) True jobAudit) liftIO $ print $ "Test passed with job-count = " <> show (length jobPayloads) where noRaceCondition js = DL.foldl (&&) True $ DL.zipWith (\(x,_) (y,_) -> not $ x==JobStart && y==JobStart) js (tail js) jobPollingInterval = 2 onJobEvent evt jobsMVar job@Job{jobId} = void $ modifyMVar_ jobsMVar $ \jobMap -> do pure $ Map.insertWith (++) jobId [(evt, job)] jobMap actualTest :: [JobPayload] -> Job.TableName -> MVar (Map.IntMap [(JobEvent, Job.Job)]) -> IO [Job] actualTest jobPayloads tname jobsMVar = do jobs <- forConcurrently jobPayloads $ \pload -> Pool.withResource appPool $ \conn -> liftIO $ Job.createJob conn tname pload let concurrencyFactor = 5 maxDelay = sum $ map (payloadDelay jobPollingInterval) jobPayloads timeout = (maxDelay `div` concurrencyFactor) + (2 * testPollingInterval) testPollingInterval = 5 poller nextAction = case nextAction of Left s -> pure $ Left s Right remaining -> if remaining == 0 then pure (Right 0) else do threadDelay (oneSec * testPollingInterval) print "------- Polling ------" x <- withMVar jobsMVar $ \jobMap -> if (Map.foldl (\m js -> m && (isJobTerminalState js)) True jobMap) then pure (Right 0) else if remaining < testPollingInterval then pure (Left $ "Timeout. Job count=" <> show (length jobPayloads) <> " timeout=" <> show timeout <> " payload delay=" <> show maxDelay) else pure $ Right (remaining - testPollingInterval) poller x poller (Right timeout) >>= \case Left s -> Prelude.error s Right _ -> pure jobs isJobTerminalState js = case js of [] -> False (_, j):_ -> case (Job.jobStatus j) of Job.Failed -> True Job.Success -> True _ -> False payloadDelay :: Int -> JobPayload -> Int payloadDelay jobPollingInterval pload = payloadDelay_ 0 pload where payloadDelay_ total p = case p of PayloadAlwaysFail x -> total + x + jobPollingInterval PayloadSucceed x -> total + x + jobPollingInterval PayloadFail x ip -> payloadDelay_ (total + x + jobPollingInterval) ip -- TODO: test to ensure that errors in callback do not affect the running of jobs deriving instance Enum Time.Unit deriving instance Enum Time.Direction timeGen :: MonadGen m => UTCTime -> Time.Direction -> m UTCTime timeGen t d = do u <- Gen.enum Time.Seconds Time.Fortnights -- d <- Gen.element $ enumFrom Time.Ago i <- Gen.integral (Range.constant 0 10) pure $ timeSince t (fromInteger i) u d anyTimeGen :: MonadGen m => UTCTime -> m UTCTime anyTimeGen t = (Gen.element (enumFrom Time.Ago)) >>= (timeGen t) futureTimeGen :: MonadGen m => UTCTime -> m UTCTime futureTimeGen t = timeGen t Time.FromThat pastTimeGen :: MonadGen m => UTCTime -> m UTCTime pastTimeGen t = timeGen t Time.Ago jobGen :: (MonadGen m) => UTCTime -> m (UTCTime, UTCTime, UTCTime, Job.Status, Aeson.Value, Int, Maybe UTCTime, Maybe T.Text) jobGen t = do createdAt <- anyTimeGen t updatedAt <- futureTimeGen createdAt runAt <- futureTimeGen createdAt status <- Gen.element $ enumFrom Job.Success pload <- payloadGen attempts <- Gen.int (Range.constant 0 10) (lockedAt, lockedBy) <- case status of Job.Locked -> do x <- futureTimeGen createdAt y <- Gen.text (Range.constant 1 100) Gen.ascii pure (Just x, Just y) _ -> pure (Nothing, Nothing) pure (createdAt, updatedAt, runAt, status, toJSON pload, attempts, lockedAt, lockedBy) propFilterJobs appPool jobPool = testProperty "filter jobs" $ property $ do t <- liftIO getCurrentTime jobs <- forAll $ Gen.list (Range.linear 1 1000) (jobGen t) f <- forAll $ genFilter t test $ withRandomTable jobPool $ \tname -> do (savedJobs, dbJobs) <- liftIO $ do savedJobs <- Pool.withResource appPool $ \conn -> forM jobs $ \j -> (PGS.query conn (qry tname) j) >>= (pure . Prelude.head) (jm, cleanup) <- liftIO $ Job.defaultJobMonitor tname jobPool dbJobs <- (flip finally) cleanup $ (flip runReaderT) jm $ Web.filterJobs f pure (savedJobs, dbJobs) let testJobs = Test.filterJobs f savedJobs -- (sortBy (comparing jobId) dbJobs) === (sortBy (comparing jobId) testJobs) dbJobs === testJobs where qry tname = "INSERT INTO " <> tname <> " (created_at, updated_at, run_at, status, payload, attempts, locked_at, locked_by) values(?, ?, ?, ?, ?, ?, ?, ?) RETURNING " <> Job.concatJobDbColumns genFilter :: MonadGen m => UTCTime -> m Web.Filter genFilter t = do statuses <- Gen.list (Range.constant 0 2) (Gen.element $ enumFrom Job.Success) createdAfter <- Gen.maybe (anyTimeGen t) createdBefore <- case createdAfter of Nothing -> Gen.maybe (anyTimeGen t) Just x -> Gen.maybe (futureTimeGen x) updatedAfter <- Gen.maybe (anyTimeGen t) updatedBefore <- case updatedAfter of Nothing -> Gen.maybe (anyTimeGen t) Just x -> Gen.maybe (futureTimeGen x) orderClause <- Gen.maybe ((,) <$> (Gen.element $ enumFrom Web.OrdCreatedAt) <*> (Gen.element $ enumFrom Web.Asc)) limitOffset <- Gen.maybe ((,) <$> (Gen.int (Range.constant 5 10)) <*> (Gen.int (Range.constant 0 30))) runAfter <- Gen.maybe (futureTimeGen t) pure Web.blankFilter { filterStatuses = statuses , filterCreatedAfter = createdAfter , filterCreatedBefore = createdBefore , filterUpdatedAfter = updatedAfter , filterUpdatedBefore = updatedBefore , filterOrder = orderClause , filterPage = limitOffset , filterRunAfter = runAfter } filterJobs :: Filter -> [Job] -> [Job] filterJobs Web.Filter{filterStatuses, filterCreatedAfter, filterCreatedBefore, filterUpdatedAfter, filterUpdatedBefore, filterOrder, filterPage, filterRunAfter} js = applyLimitOffset $ applyOrdering (fromMaybe (Web.OrdUpdatedAt, Web.Desc) filterOrder) $ (flip DL.filter) js $ \j -> (filterByStatus j) && (filterByCreatedAfter j) && (filterByCreatedBefore j) && (filterByUpdatedAfter j) && (filterByUpdatedBefore j) && (filterByRunAfter j) where applyLimitOffset = maybe Prelude.id (\(l, o) -> (Prelude.take l). (Prelude.drop o)) filterPage applyOrdering (fld, dir) lst = let comparer = resultOrder $ case fld of Web.OrdCreatedAt -> (comparing jobCreatedAt) Web.OrdUpdatedAt -> (comparing jobUpdatedAt) Web.OrdLockedAt -> (comparing jobLockedAt) Web.OrdStatus -> (comparing jobStatus) Web.OrdJobType -> comparing Job.jobType resultOrder fn = \x y -> case fn x y of EQ -> compare (Down $ jobId x) (Down $ jobId y) LT -> case dir of Web.Asc -> LT Web.Desc -> GT GT -> case dir of Web.Asc -> GT Web.Desc -> LT in sortBy comparer lst filterByStatus Job.Job{jobStatus} = if Prelude.null filterStatuses then True else jobStatus `elem` filterStatuses filterByCreatedAfter Job.Job{jobCreatedAt} = maybe True (<= jobCreatedAt) filterCreatedAfter filterByCreatedBefore Job.Job{jobCreatedAt} = maybe True (> jobCreatedAt) filterCreatedBefore filterByUpdatedAfter Job.Job{jobUpdatedAt} = maybe True (<= jobUpdatedAt) filterUpdatedAfter filterByUpdatedBefore Job.Job{jobUpdatedAt} = maybe True (> jobUpdatedAt) filterUpdatedBefore filterByRunAfter Job.Job{jobRunAt} = maybe True (< jobRunAt) filterRunAfter