module Cachix.Client.PushQueue
( startWorkers,
Queue,
)
where
import qualified Cachix.Client.Push as Push
import Cachix.Client.Retry (retryAll)
import Control.Concurrent.Async
import Control.Concurrent.STM (TVar, modifyTVar', newTVarIO, readTVar)
import qualified Control.Concurrent.STM.Lock as Lock
import qualified Control.Concurrent.STM.TBQueue as TBQueue
import qualified Data.Set as S
import Protolude
import qualified System.Posix.Signals as Signals
type StorePath = Text
type Queue = TBQueue.TBQueue StorePath
data PushWorkerState
= PushWorkerState
{ PushWorkerState -> Queue
pushQueue :: Queue,
PushWorkerState -> TVar Int
inProgress :: TVar Int
}
data QueryWorkerState
= QueryWorkerState
{ QueryWorkerState -> Queue
queryQueue :: Queue,
QueryWorkerState -> Set StorePath
alreadyQueued :: S.Set StorePath,
QueryWorkerState -> Lock
lock :: Lock.Lock
}
worker :: Push.PushParams IO () -> PushWorkerState -> IO ()
worker :: PushParams IO () -> PushWorkerState -> IO ()
worker pushParams :: PushParams IO ()
pushParams workerState :: PushWorkerState
workerState = IO () -> IO ()
forall (f :: * -> *) a b. Applicative f => f a -> f b
forever (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ do
StorePath
storePath <- STM StorePath -> IO StorePath
forall a. STM a -> IO a
atomically (STM StorePath -> IO StorePath) -> STM StorePath -> IO StorePath
forall a b. (a -> b) -> a -> b
$ Queue -> STM StorePath
forall a. TBQueue a -> STM a
TBQueue.readTBQueue (Queue -> STM StorePath) -> Queue -> STM StorePath
forall a b. (a -> b) -> a -> b
$ PushWorkerState -> Queue
pushQueue PushWorkerState
workerState
IO () -> IO () -> IO () -> IO ()
forall a b c. IO a -> IO b -> IO c -> IO c
bracket_ ((Int -> Int) -> IO ()
inProgresModify (Int -> Int -> Int
forall a. Num a => a -> a -> a
+ 1)) ((Int -> Int) -> IO ()
inProgresModify (\x :: Int
x -> Int
x Int -> Int -> Int
forall a. Num a => a -> a -> a
- 1))
(IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ (RetryStatus -> IO ()) -> IO ()
forall (m :: * -> *) a.
(MonadIO m, MonadMask m) =>
(RetryStatus -> m a) -> m a
retryAll
((RetryStatus -> IO ()) -> IO ())
-> (RetryStatus -> IO ()) -> IO ()
forall a b. (a -> b) -> a -> b
$ PushParams IO () -> StorePath -> RetryStatus -> IO ()
forall (m :: * -> *) r.
(MonadMask m, MonadIO m) =>
PushParams m r -> StorePath -> RetryStatus -> m r
Push.uploadStorePath PushParams IO ()
pushParams StorePath
storePath
where
inProgresModify :: (Int -> Int) -> IO ()
inProgresModify f :: Int -> Int
f =
STM () -> IO ()
forall a. STM a -> IO a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ TVar Int -> (Int -> Int) -> STM ()
forall a. TVar a -> (a -> a) -> STM ()
modifyTVar' (PushWorkerState -> TVar Int
inProgress PushWorkerState
workerState) Int -> Int
f
startWorkers :: Int -> (Queue -> IO (IO ())) -> Push.PushParams IO () -> IO ()
startWorkers :: Int -> (Queue -> IO (IO ())) -> PushParams IO () -> IO ()
startWorkers numWorkers :: Int
numWorkers mkProducer :: Queue -> IO (IO ())
mkProducer pushParams :: PushParams IO ()
pushParams = do
(newQueryQueue :: Queue
newQueryQueue, newPushQueue :: Queue
newPushQueue, newLock :: Lock
newLock) <-
STM (Queue, Queue, Lock) -> IO (Queue, Queue, Lock)
forall a. STM a -> IO a
atomically (STM (Queue, Queue, Lock) -> IO (Queue, Queue, Lock))
-> STM (Queue, Queue, Lock) -> IO (Queue, Queue, Lock)
forall a b. (a -> b) -> a -> b
$
(,,) (Queue -> Queue -> Lock -> (Queue, Queue, Lock))
-> STM Queue -> STM (Queue -> Lock -> (Queue, Queue, Lock))
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Natural -> STM Queue
forall a. Natural -> STM (TBQueue a)
TBQueue.newTBQueue 10000 STM (Queue -> Lock -> (Queue, Queue, Lock))
-> STM Queue -> STM (Lock -> (Queue, Queue, Lock))
forall (f :: * -> *) a b. Applicative f => f (a -> b) -> f a -> f b
<*> Natural -> STM Queue
forall a. Natural -> STM (TBQueue a)
TBQueue.newTBQueue 10000 STM (Lock -> (Queue, Queue, Lock))
-> STM Lock -> STM (Queue, Queue, Lock)
forall (f :: * -> *) a b. Applicative f => f (a -> b) -> f a -> f b
<*> STM Lock
Lock.new
let queryWorkerState :: QueryWorkerState
queryWorkerState = Queue -> Set StorePath -> Lock -> QueryWorkerState
QueryWorkerState Queue
newQueryQueue Set StorePath
forall a. Set a
S.empty Lock
newLock
Async ()
queryWorker <- IO () -> IO (Async ())
forall a. IO a -> IO (Async a)
async (IO () -> IO (Async ())) -> IO () -> IO (Async ())
forall a b. (a -> b) -> a -> b
$ QueryWorkerState -> Queue -> PushParams IO () -> IO ()
queryLoop QueryWorkerState
queryWorkerState Queue
newPushQueue PushParams IO ()
pushParams
IO ()
stopProducerCallback <- Queue -> IO (IO ())
mkProducer Queue
newQueryQueue
TVar Int
progress <- Int -> IO (TVar Int)
forall a. a -> IO (TVar a)
newTVarIO 0
let pushWorkerState :: PushWorkerState
pushWorkerState = Queue -> TVar Int -> PushWorkerState
PushWorkerState Queue
newPushQueue TVar Int
progress
Async ()
pushWorker <- IO () -> IO (Async ())
forall a. IO a -> IO (Async a)
async (IO () -> IO (Async ())) -> IO () -> IO (Async ())
forall a b. (a -> b) -> a -> b
$ Int -> IO () -> IO ()
forall a. Int -> IO a -> IO ()
replicateConcurrently_ Int
numWorkers (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ PushParams IO () -> PushWorkerState -> IO ()
worker PushParams IO ()
pushParams PushWorkerState
pushWorkerState
IO Handler -> IO ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (IO Handler -> IO ()) -> IO Handler -> IO ()
forall a b. (a -> b) -> a -> b
$ Signal -> Handler -> Maybe SignalSet -> IO Handler
Signals.installHandler Signal
Signals.sigINT (IO () -> Handler
Signals.CatchOnce (IO ()
-> Async ()
-> Async ()
-> QueryWorkerState
-> PushWorkerState
-> IO ()
exitOnceQueueIsEmpty IO ()
stopProducerCallback Async ()
pushWorker Async ()
queryWorker QueryWorkerState
queryWorkerState PushWorkerState
pushWorkerState)) Maybe SignalSet
forall a. Maybe a
Nothing
(_, eitherException :: Either SomeException ()
eitherException) <- [Async ()] -> IO (Async (), Either SomeException ())
forall a. [Async a] -> IO (Async a, Either SomeException a)
waitAnyCatchCancel [Async ()
pushWorker, Async ()
queryWorker]
case Either SomeException ()
eitherException of
Left exc :: SomeException
exc | SomeException -> Maybe StopWorker
forall e. Exception e => SomeException -> Maybe e
fromException SomeException
exc Maybe StopWorker -> Maybe StopWorker -> Bool
forall a. Eq a => a -> a -> Bool
== StopWorker -> Maybe StopWorker
forall a. a -> Maybe a
Just StopWorker
StopWorker -> () -> IO ()
forall (m :: * -> *) a. Monad m => a -> m a
return ()
Left exc :: SomeException
exc -> SomeException -> IO ()
forall (m :: * -> *) e a. (MonadIO m, Exception e) => e -> m a
throwIO SomeException
exc
Right () -> () -> IO ()
forall (m :: * -> *) a. Monad m => a -> m a
return ()
data StopWorker = StopWorker
deriving (StopWorker -> StopWorker -> Bool
(StopWorker -> StopWorker -> Bool)
-> (StopWorker -> StopWorker -> Bool) -> Eq StopWorker
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
/= :: StopWorker -> StopWorker -> Bool
$c/= :: StopWorker -> StopWorker -> Bool
== :: StopWorker -> StopWorker -> Bool
$c== :: StopWorker -> StopWorker -> Bool
Eq, Int -> StopWorker -> ShowS
[StopWorker] -> ShowS
StopWorker -> String
(Int -> StopWorker -> ShowS)
-> (StopWorker -> String)
-> ([StopWorker] -> ShowS)
-> Show StopWorker
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
showList :: [StopWorker] -> ShowS
$cshowList :: [StopWorker] -> ShowS
show :: StopWorker -> String
$cshow :: StopWorker -> String
showsPrec :: Int -> StopWorker -> ShowS
$cshowsPrec :: Int -> StopWorker -> ShowS
Show)
instance Exception StopWorker
queryLoop :: QueryWorkerState -> Queue -> Push.PushParams IO () -> IO ()
queryLoop :: QueryWorkerState -> Queue -> PushParams IO () -> IO ()
queryLoop workerState :: QueryWorkerState
workerState pushqueue :: Queue
pushqueue pushParams :: PushParams IO ()
pushParams = do
StorePath
_ <- STM StorePath -> IO StorePath
forall a. STM a -> IO a
atomically (STM StorePath -> IO StorePath) -> STM StorePath -> IO StorePath
forall a b. (a -> b) -> a -> b
$ Queue -> STM StorePath
forall a. TBQueue a -> STM a
TBQueue.peekTBQueue (QueryWorkerState -> Queue
queryQueue QueryWorkerState
workerState)
(missingStorePathsSet :: Set StorePath
missingStorePathsSet, alreadyQueuedSet :: Set StorePath
alreadyQueuedSet) <- Lock
-> IO (Set StorePath, Set StorePath)
-> IO (Set StorePath, Set StorePath)
forall a. Lock -> IO a -> IO a
Lock.with (QueryWorkerState -> Lock
lock QueryWorkerState
workerState) (IO (Set StorePath, Set StorePath)
-> IO (Set StorePath, Set StorePath))
-> IO (Set StorePath, Set StorePath)
-> IO (Set StorePath, Set StorePath)
forall a b. (a -> b) -> a -> b
$ do
[StorePath]
storePaths <- STM [StorePath] -> IO [StorePath]
forall a. STM a -> IO a
atomically (STM [StorePath] -> IO [StorePath])
-> STM [StorePath] -> IO [StorePath]
forall a b. (a -> b) -> a -> b
$ Queue -> STM [StorePath]
forall a. TBQueue a -> STM [a]
TBQueue.flushTBQueue (QueryWorkerState -> Queue
queryQueue QueryWorkerState
workerState)
Set StorePath
alreadyQueuedSet <- STM (Set StorePath) -> IO (Set StorePath)
forall a. STM a -> IO a
atomically (STM (Set StorePath) -> IO (Set StorePath))
-> STM (Set StorePath) -> IO (Set StorePath)
forall a b. (a -> b) -> a -> b
$ do
Bool
isEmpty <- Queue -> STM Bool
forall a. TBQueue a -> STM Bool
TBQueue.isEmptyTBQueue Queue
pushqueue
if Bool
isEmpty
then Set StorePath -> STM (Set StorePath)
forall (m :: * -> *) a. Monad m => a -> m a
return Set StorePath
forall a. Set a
S.empty
else Set StorePath -> STM (Set StorePath)
forall (m :: * -> *) a. Monad m => a -> m a
return (Set StorePath -> STM (Set StorePath))
-> Set StorePath -> STM (Set StorePath)
forall a b. (a -> b) -> a -> b
$ QueryWorkerState -> Set StorePath
alreadyQueued QueryWorkerState
workerState
[StorePath]
missingStorePaths <- PushParams IO () -> [StorePath] -> IO [StorePath]
forall (m :: * -> *) r.
(MonadIO m, MonadMask m) =>
PushParams m r -> [StorePath] -> m [StorePath]
Push.getMissingPathsForClosure PushParams IO ()
pushParams [StorePath]
storePaths
let missingStorePathsSet :: Set StorePath
missingStorePathsSet = [StorePath] -> Set StorePath
forall a. Ord a => [a] -> Set a
S.fromList [StorePath]
missingStorePaths
uncachedMissingStorePaths :: Set StorePath
uncachedMissingStorePaths = Set StorePath -> Set StorePath -> Set StorePath
forall a. Ord a => Set a -> Set a -> Set a
S.difference Set StorePath
missingStorePathsSet Set StorePath
alreadyQueuedSet
STM () -> IO ()
forall a. STM a -> IO a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ Set StorePath -> (StorePath -> STM ()) -> STM ()
forall (t :: * -> *) (f :: * -> *) a b.
(Foldable t, Applicative f) =>
t a -> (a -> f b) -> f ()
for_ Set StorePath
uncachedMissingStorePaths ((StorePath -> STM ()) -> STM ())
-> (StorePath -> STM ()) -> STM ()
forall a b. (a -> b) -> a -> b
$ Queue -> StorePath -> STM ()
forall a. TBQueue a -> a -> STM ()
TBQueue.writeTBQueue Queue
pushqueue
(Set StorePath, Set StorePath) -> IO (Set StorePath, Set StorePath)
forall (m :: * -> *) a. Monad m => a -> m a
return (Set StorePath
missingStorePathsSet, Set StorePath
alreadyQueuedSet)
QueryWorkerState -> Queue -> PushParams IO () -> IO ()
queryLoop (QueryWorkerState
workerState {alreadyQueued :: Set StorePath
alreadyQueued = Set StorePath -> Set StorePath -> Set StorePath
forall a. Ord a => Set a -> Set a -> Set a
S.union Set StorePath
missingStorePathsSet Set StorePath
alreadyQueuedSet}) Queue
pushqueue PushParams IO ()
pushParams
exitOnceQueueIsEmpty :: IO () -> Async () -> Async () -> QueryWorkerState -> PushWorkerState -> IO ()
exitOnceQueueIsEmpty :: IO ()
-> Async ()
-> Async ()
-> QueryWorkerState
-> PushWorkerState
-> IO ()
exitOnceQueueIsEmpty stopProducerCallback :: IO ()
stopProducerCallback pushWorker :: Async ()
pushWorker queryWorker :: Async ()
queryWorker queryWorkerState :: QueryWorkerState
queryWorkerState pushWorkerState :: PushWorkerState
pushWorkerState = do
StorePath -> IO ()
forall (m :: * -> *). MonadIO m => StorePath -> m ()
putText "Stopped watching /nix/store and waiting for queue to empty ..."
IO ()
stopProducerCallback
IO ()
go
where
go :: IO ()
go = do
(isDone :: Bool
isDone, inprogress :: Int
inprogress, queueLength :: Natural
queueLength) <- STM (Bool, Int, Natural) -> IO (Bool, Int, Natural)
forall a. STM a -> IO a
atomically (STM (Bool, Int, Natural) -> IO (Bool, Int, Natural))
-> STM (Bool, Int, Natural) -> IO (Bool, Int, Natural)
forall a b. (a -> b) -> a -> b
$ do
Natural
pushQueueLength <- Queue -> STM Natural
forall a. TBQueue a -> STM Natural
TBQueue.lengthTBQueue (Queue -> STM Natural) -> Queue -> STM Natural
forall a b. (a -> b) -> a -> b
$ PushWorkerState -> Queue
pushQueue PushWorkerState
pushWorkerState
Natural
queryQueueLength <- Queue -> STM Natural
forall a. TBQueue a -> STM Natural
TBQueue.lengthTBQueue (Queue -> STM Natural) -> Queue -> STM Natural
forall a b. (a -> b) -> a -> b
$ QueryWorkerState -> Queue
queryQueue QueryWorkerState
queryWorkerState
Int
inprogress <- TVar Int -> STM Int
forall a. TVar a -> STM a
readTVar (TVar Int -> STM Int) -> TVar Int -> STM Int
forall a b. (a -> b) -> a -> b
$ PushWorkerState -> TVar Int
inProgress PushWorkerState
pushWorkerState
Bool
isLocked <- Lock -> STM Bool
Lock.locked (QueryWorkerState -> Lock
lock QueryWorkerState
queryWorkerState)
let isDone :: Bool
isDone = Natural
pushQueueLength Natural -> Natural -> Bool
forall a. Eq a => a -> a -> Bool
== 0 Bool -> Bool -> Bool
&& Natural
queryQueueLength Natural -> Natural -> Bool
forall a. Eq a => a -> a -> Bool
== 0 Bool -> Bool -> Bool
&& Int
inprogress Int -> Int -> Bool
forall a. Eq a => a -> a -> Bool
== 0 Bool -> Bool -> Bool
&& Bool -> Bool
not Bool
isLocked
(Bool, Int, Natural) -> STM (Bool, Int, Natural)
forall (m :: * -> *) a. Monad m => a -> m a
return (Bool
isDone, Int
inprogress, Natural
pushQueueLength)
if Bool
isDone
then do
StorePath -> IO ()
forall (m :: * -> *). MonadIO m => StorePath -> m ()
putText "Done."
Async () -> StopWorker -> IO ()
forall e a. Exception e => Async a -> e -> IO ()
cancelWith Async ()
queryWorker StopWorker
StopWorker
Async () -> StopWorker -> IO ()
forall e a. Exception e => Async a -> e -> IO ()
cancelWith Async ()
pushWorker StopWorker
StopWorker
else do
StorePath -> IO ()
forall (m :: * -> *). MonadIO m => StorePath -> m ()
putText (StorePath -> IO ()) -> StorePath -> IO ()
forall a b. (a -> b) -> a -> b
$ "Waiting to finish: " StorePath -> StorePath -> StorePath
forall a. Semigroup a => a -> a -> a
<> Int -> StorePath
forall a b. (Show a, ConvertText String b) => a -> b
show Int
inprogress StorePath -> StorePath -> StorePath
forall a. Semigroup a => a -> a -> a
<> " pushing, " StorePath -> StorePath -> StorePath
forall a. Semigroup a => a -> a -> a
<> Natural -> StorePath
forall a b. (Show a, ConvertText String b) => a -> b
show Natural
queueLength StorePath -> StorePath -> StorePath
forall a. Semigroup a => a -> a -> a
<> " in queue"
Int -> IO ()
threadDelay (1000 Int -> Int -> Int
forall a. Num a => a -> a -> a
* 1000)
IO ()
go