module Sos.JobQueue
( JobQueue
, newJobQueue
, clearJobQueue
, jobQueueLength
, jobQueueJobs
, enqueueJob
, dequeueJob
) where
import Sos.FileEvent
import Sos.Job
import Sos.Utils
import Control.Applicative
import Control.Concurrent.Async
import Control.Concurrent.STM
import Data.Foldable (find)
import Data.List.NonEmpty (NonEmpty(..))
import Data.Monoid
import Data.Sequence (Seq, ViewL(..), (|>), viewl)
import qualified Data.Sequence as Sequence
newtype JobQueue = JobQueue (TVar (Seq Job))
newJobQueue :: IO JobQueue
newJobQueue = JobQueue <$> newTVarIO mempty
clearJobQueue :: JobQueue -> IO ()
clearJobQueue (JobQueue queue) = atomically (writeTVar queue mempty)
jobQueueLength :: JobQueue -> IO Int
jobQueueLength queue = Sequence.length <$> jobQueueJobs queue
jobQueueJobs :: JobQueue -> IO (Seq Job)
jobQueueJobs (JobQueue queue_tv) = atomically (readTVar queue_tv)
enqueueJob :: FileEvent -> NonEmpty ShellCommand -> JobQueue -> IO ()
enqueueJob event cmds (JobQueue queue_tv) = atomically $ do
queue <- readTVar queue_tv
case find (\j -> jobCommands j == cmds) queue of
Nothing -> do
job <- newJob event cmds
writeTVar queue_tv (queue |> job)
Just job -> restartJob job
dequeueJob :: JobQueue -> IO JobResult
dequeueJob (JobQueue queue_tv) = do
job <- atomically $ do
queue <- readTVar queue_tv
case viewl queue of
j :< _ -> do
unrestartJob j
pure j
_ -> retry
putStrLn ("\n" <> cyan (showFileEvent (jobEvent job)))
a <- async (runJob job)
let doRunJob :: STM JobResult
doRunJob = do
r <- waitSTM a
modifyTVar' queue_tv seqTail
pure r
atomically (doRunJob <|||> shouldRestartJob job) >>= \case
Left result -> pure result
Right () -> do
cancel a
_ <- waitCatch a
dequeueJob (JobQueue queue_tv)
seqTail :: Seq a -> Seq a
seqTail s =
case viewl s of
_ :< xs -> xs
_ -> error "seqTail: empty sequence"
showFileEvent :: FileEvent -> String
showFileEvent = \case
FileAdded path -> unpackBS ("Added: " <> path)
FileModified path -> unpackBS ("Modified: " <> path)