{- Implements a queue with the following properties:

- waits for queue to be fully pushed when exiting using ctrl-c (SIGINT)
- allows stopping the producer
- avoids pushing the same store path more than once

To safely exit on demand, signal SIGINT.
-}
module Cachix.Client.PushQueue
  ( startWorkers,
    Queue,
  )
where

import qualified Cachix.Client.Push as Push
import Cachix.Client.Retry (retryAll)
import Control.Concurrent.Async
import Control.Concurrent.STM (TVar, modifyTVar', newTVarIO, readTVar)
import qualified Control.Concurrent.STM.Lock as Lock
import qualified Control.Concurrent.STM.TBQueue as TBQueue
import qualified Data.Set as S
import Hercules.CNix.Store (StorePath)
import Protolude
import qualified System.Posix.Signals as Signals

-- | A bounded STM queue of store paths. The same alias is used for both the
-- query queue and the push queue in this module.
type Queue = TBQueue.TBQueue StorePath

-- | State shared by every push worker started from 'startWorkers'.
data PushWorkerState = PushWorkerState
  { -- | Store paths waiting to be uploaded; push workers read from it.
    pushQueue :: Queue,
    -- | Number of uploads currently running across all push workers.
    inProgress :: TVar Int
  }

-- | State threaded through the iterations of 'queryLoop'.
data QueryWorkerState = QueryWorkerState
  { -- | Store paths handed over by the producer that have not been queried yet.
    queryQueue :: Queue,
    -- | Store paths reported missing by earlier iterations; used to avoid
    -- putting the same path on the push queue twice.
    alreadyQueued :: S.Set StorePath,
    -- | Held by 'queryLoop' while it processes a batch, so that the shutdown
    -- check can tell that work is still in flight.
    lock :: Lock.Lock
  }

-- | Push worker: forever takes the next store path off the push queue and
-- uploads it, retrying the upload through 'retryAll'.
--
-- Taking the path off the queue and bumping 'inProgress' happen in a single
-- STM transaction. A path is therefore always visible either as a queue
-- element or in the counter, and the shutdown check in 'exitOnceQueueIsEmpty'
-- can never observe "queue empty, nothing in progress" while a worker is
-- holding a path it has not started uploading yet.
--
-- The function never returns normally; it ends only when an exception
-- (including cancellation) is raised in its thread.
worker :: Push.PushParams IO () -> PushWorkerState -> IO ()
worker pushParams workerState =
  forever $
    bracket claimNext (const markFinished) $ \storePath ->
      retryAll $ Push.uploadStorePath pushParams storePath
  where
    -- Runs as the acquire step of 'bracket', i.e. with asynchronous exceptions
    -- masked. A blocked STM transaction is still interruptible, so an idle
    -- worker can be cancelled while it waits; once the transaction has
    -- committed, 'markFinished' is guaranteed to run.
    claimNext :: IO StorePath
    claimNext = atomically $ do
      storePath <- TBQueue.readTBQueue (pushQueue workerState)
      modifyTVar' (inProgress workerState) (+ 1)
      return storePath
    -- Undo the increment done by 'claimNext', whether the upload succeeded,
    -- failed, or was cancelled.
    markFinished :: IO ()
    markFinished =
      atomically $ modifyTVar' (inProgress workerState) (\n -> n - 1)

-- | Start the query worker, the producer and @numWorkers@ push workers, install
-- a one-shot SIGINT handler that drains the queues ('exitOnceQueueIsEmpty'),
-- and block until either worker group terminates.
--
-- A worker group that ends with 'StopWorker' (the orderly shutdown triggered by
-- the SIGINT handler) makes this function return normally; any other exception
-- from a worker is rethrown.
--
-- Both worker groups run under 'withAsync', so they are cancelled if setting
-- up the producer or the signal handler throws, instead of being leaked.
--
-- NOTE: producer is responsible for signaling SIGINT upon termination
-- NOTE: producer should return an `IO ()` that should be a blocking operation for terminating it
startWorkers :: Int -> (Queue -> IO (IO ())) -> Push.PushParams IO () -> IO ()
startWorkers numWorkers mkProducer pushParams = do
  (newQueryQueue, newPushQueue, newLock) <-
    atomically $
      (,,)
        <$> TBQueue.newTBQueue queueCapacity
        <*> TBQueue.newTBQueue queueCapacity
        <*> Lock.new
  let queryWorkerState = QueryWorkerState newQueryQueue S.empty newLock
  -- start query worker
  withAsync (queryLoop queryWorkerState newPushQueue pushParams) $ \queryWorker -> do
    stopProducerCallback <- mkProducer newQueryQueue
    progress <- newTVarIO 0
    let pushWorkerState = PushWorkerState newPushQueue progress
    -- start push workers
    withAsync (replicateConcurrently_ numWorkers $ worker pushParams pushWorkerState) $ \pushWorker -> do
      let onSigInt =
            exitOnceQueueIsEmpty
              stopProducerCallback
              pushWorker
              queryWorker
              queryWorkerState
              pushWorkerState
      void $ Signals.installHandler Signals.sigINT (Signals.CatchOnce onSigInt) Nothing
      (_, eitherException) <- waitAnyCatchCancel [pushWorker, queryWorker]
      case eitherException of
        -- orderly shutdown requested by the SIGINT handler
        Left exc | fromException exc == Just StopWorker -> return ()
        Left exc -> throwIO exc
        Right () -> return ()
  where
    -- Capacity of both the query queue and the push queue.
    queueCapacity = 10000

-- | Exception used to cancel the worker threads once the queues have drained.
-- 'startWorkers' treats it as a normal termination rather than a failure.
data StopWorker = StopWorker
  deriving (Eq, Show)

instance Exception StopWorker

-- | Query worker loop: waits for store paths on the query queue, asks
-- 'Push.getMissingPathsForClosure' which paths still have to be pushed, and
-- puts the ones that were not queued before on the push queue. It then recurses
-- with an updated 'alreadyQueued' set, so it never returns normally.
--
-- The whole batch is processed while holding the worker's lock, which lets the
-- shutdown check see that a query is still in flight.
queryLoop :: QueryWorkerState -> Queue -> Push.PushParams IO () -> IO ()
queryLoop workerState pushqueue pushParams = do
  -- this blocks until item is available and doesn't remove it from the queue
  _ <- atomically $ TBQueue.peekTBQueue (queryQueue workerState)
  (missingStorePathsSet, alreadyQueuedSet) <- Lock.with (lock workerState) $ do
    storePaths <- atomically $ TBQueue.flushTBQueue (queryQueue workerState)
    -- if push queue is empty we can reset our store path cache here as getClosure will do its job
    knownPaths <- atomically $ do
      isEmpty <- TBQueue.isEmptyTBQueue pushqueue
      return $ if isEmpty then S.empty else alreadyQueued workerState
    missingStorePaths <- Push.getMissingPathsForClosure pushParams storePaths
    let missingPaths = S.fromList missingStorePaths
        uncachedMissingStorePaths = S.difference missingPaths knownPaths
    -- One transaction per path: writing the whole batch in a single
    -- transaction could never commit when the batch is larger than the free
    -- capacity of the bounded push queue, blocking this loop (and shutdown,
    -- since the lock is held) forever.
    for_ uncachedMissingStorePaths $ atomically . TBQueue.writeTBQueue pushqueue
    return (missingPaths, knownPaths)
  queryLoop
    (workerState {alreadyQueued = S.union missingStorePathsSet alreadyQueuedSet})
    pushqueue
    pushParams

-- | SIGINT handler body: stops the producer, then polls once a second until
-- both queues are empty, no upload is in progress and the query lock is free.
-- At that point both worker groups are cancelled with 'StopWorker'.
exitOnceQueueIsEmpty :: IO () -> Async () -> Async () -> QueryWorkerState -> PushWorkerState -> IO ()
exitOnceQueueIsEmpty stopProducerCallback pushWorker queryWorker queryWorkerState pushWorkerState =
  putText "Stopped watching /nix/store and waiting for queue to empty ..."
    >> stopProducerCallback
    >> drain
  where
    -- A single consistent view of both queues, the upload counter and the lock.
    snapshot = do
      pending <- TBQueue.lengthTBQueue (pushQueue pushWorkerState)
      unqueried <- TBQueue.lengthTBQueue (queryQueue queryWorkerState)
      uploading <- readTVar (inProgress pushWorkerState)
      querying <- Lock.locked (lock queryWorkerState)
      let idle = pending == 0 && unqueried == 0 && uploading == 0 && not querying
      return (idle, uploading, pending)

    -- Delay between two polls, in microseconds.
    pollInterval = 1000 * 1000

    drain = do
      (idle, uploading, pending) <- atomically snapshot
      if not idle
        then do
          putText $ "Waiting to finish: " <> show uploading <> " pushing, " <> show pending <> " in queue"
          threadDelay pollInterval
          drain
        else do
          putText "Done."
          cancelWith queryWorker StopWorker
          cancelWith pushWorker StopWorker