module Cachix.Client.PushQueue
( startWorkers,
Queue,
)
where
import qualified Cachix.Client.Push as Push
import Cachix.Client.Retry (retryAll)
import Control.Concurrent.Async
import Control.Concurrent.STM (TVar, modifyTVar', newTVarIO, readTVar)
import qualified Control.Concurrent.STM.Lock as Lock
import qualified Control.Concurrent.STM.TBQueue as TBQueue
import qualified Data.Set as S
import Hercules.CNix.Store (StorePath)
import Protolude
import qualified System.Posix.Signals as Signals
type Queue = TBQueue.TBQueue StorePath
data PushWorkerState = PushWorkerState
{ PushWorkerState -> Queue
pushQueue :: Queue,
PushWorkerState -> TVar Int
inProgress :: TVar Int
}
data QueryWorkerState = QueryWorkerState
{ QueryWorkerState -> Queue
queryQueue :: Queue,
QueryWorkerState -> Set StorePath
alreadyQueued :: S.Set StorePath,
QueryWorkerState -> Lock
lock :: Lock.Lock
}
worker :: Push.PushParams IO () -> PushWorkerState -> IO ()
worker :: PushParams IO () -> PushWorkerState -> IO ()
worker PushParams IO ()
pushParams PushWorkerState
workerState = IO () -> IO ()
forall (f :: * -> *) a b. Applicative f => f a -> f b
forever (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ do
StorePath
storePath <- STM StorePath -> IO StorePath
forall a. STM a -> IO a
atomically (STM StorePath -> IO StorePath) -> STM StorePath -> IO StorePath
forall a b. (a -> b) -> a -> b
$ Queue -> STM StorePath
forall a. TBQueue a -> STM a
TBQueue.readTBQueue (Queue -> STM StorePath) -> Queue -> STM StorePath
forall a b. (a -> b) -> a -> b
$ PushWorkerState -> Queue
pushQueue PushWorkerState
workerState
IO () -> IO () -> IO () -> IO ()
forall a b c. IO a -> IO b -> IO c -> IO c
bracket_ ((Int -> Int) -> IO ()
inProgresModify (Int -> Int -> Int
forall a. Num a => a -> a -> a
+ Int
1)) ((Int -> Int) -> IO ()
inProgresModify (\Int
x -> Int
x Int -> Int -> Int
forall a. Num a => a -> a -> a
- Int
1)) (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$
(RetryStatus -> IO ()) -> IO ()
forall (m :: * -> *) a.
(MonadIO m, MonadMask m) =>
(RetryStatus -> m a) -> m a
retryAll ((RetryStatus -> IO ()) -> IO ())
-> (RetryStatus -> IO ()) -> IO ()
forall a b. (a -> b) -> a -> b
$
PushParams IO () -> StorePath -> RetryStatus -> IO ()
forall (m :: * -> *) r.
(MonadMask m, MonadIO m) =>
PushParams m r -> StorePath -> RetryStatus -> m r
Push.uploadStorePath PushParams IO ()
pushParams StorePath
storePath
where
inProgresModify :: (Int -> Int) -> IO ()
inProgresModify Int -> Int
f =
STM () -> IO ()
forall a. STM a -> IO a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ TVar Int -> (Int -> Int) -> STM ()
forall a. TVar a -> (a -> a) -> STM ()
modifyTVar' (PushWorkerState -> TVar Int
inProgress PushWorkerState
workerState) Int -> Int
f
startWorkers :: Int -> (Queue -> IO (IO ())) -> Push.PushParams IO () -> IO ()
startWorkers :: Int -> (Queue -> IO (IO ())) -> PushParams IO () -> IO ()
startWorkers Int
numWorkers Queue -> IO (IO ())
mkProducer PushParams IO ()
pushParams = do
(Queue
newQueryQueue, Queue
newPushQueue, Lock
newLock) <-
STM (Queue, Queue, Lock) -> IO (Queue, Queue, Lock)
forall a. STM a -> IO a
atomically (STM (Queue, Queue, Lock) -> IO (Queue, Queue, Lock))
-> STM (Queue, Queue, Lock) -> IO (Queue, Queue, Lock)
forall a b. (a -> b) -> a -> b
$
(,,) (Queue -> Queue -> Lock -> (Queue, Queue, Lock))
-> STM Queue -> STM (Queue -> Lock -> (Queue, Queue, Lock))
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Natural -> STM Queue
forall a. Natural -> STM (TBQueue a)
TBQueue.newTBQueue Natural
10000 STM (Queue -> Lock -> (Queue, Queue, Lock))
-> STM Queue -> STM (Lock -> (Queue, Queue, Lock))
forall (f :: * -> *) a b. Applicative f => f (a -> b) -> f a -> f b
<*> Natural -> STM Queue
forall a. Natural -> STM (TBQueue a)
TBQueue.newTBQueue Natural
10000 STM (Lock -> (Queue, Queue, Lock))
-> STM Lock -> STM (Queue, Queue, Lock)
forall (f :: * -> *) a b. Applicative f => f (a -> b) -> f a -> f b
<*> STM Lock
Lock.new
let queryWorkerState :: QueryWorkerState
queryWorkerState = Queue -> Set StorePath -> Lock -> QueryWorkerState
QueryWorkerState Queue
newQueryQueue Set StorePath
forall a. Set a
S.empty Lock
newLock
Async ()
queryWorker <- IO () -> IO (Async ())
forall a. IO a -> IO (Async a)
async (IO () -> IO (Async ())) -> IO () -> IO (Async ())
forall a b. (a -> b) -> a -> b
$ QueryWorkerState -> Queue -> PushParams IO () -> IO ()
queryLoop QueryWorkerState
queryWorkerState Queue
newPushQueue PushParams IO ()
pushParams
IO ()
stopProducerCallback <- Queue -> IO (IO ())
mkProducer Queue
newQueryQueue
TVar Int
progress <- Int -> IO (TVar Int)
forall a. a -> IO (TVar a)
newTVarIO Int
0
let pushWorkerState :: PushWorkerState
pushWorkerState = Queue -> TVar Int -> PushWorkerState
PushWorkerState Queue
newPushQueue TVar Int
progress
Async ()
pushWorker <- IO () -> IO (Async ())
forall a. IO a -> IO (Async a)
async (IO () -> IO (Async ())) -> IO () -> IO (Async ())
forall a b. (a -> b) -> a -> b
$ Int -> IO () -> IO ()
forall a. Int -> IO a -> IO ()
replicateConcurrently_ Int
numWorkers (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ PushParams IO () -> PushWorkerState -> IO ()
worker PushParams IO ()
pushParams PushWorkerState
pushWorkerState
IO Handler -> IO ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (IO Handler -> IO ()) -> IO Handler -> IO ()
forall a b. (a -> b) -> a -> b
$ Signal -> Handler -> Maybe SignalSet -> IO Handler
Signals.installHandler Signal
Signals.sigINT (IO () -> Handler
Signals.CatchOnce (IO ()
-> Async ()
-> Async ()
-> QueryWorkerState
-> PushWorkerState
-> IO ()
exitOnceQueueIsEmpty IO ()
stopProducerCallback Async ()
pushWorker Async ()
queryWorker QueryWorkerState
queryWorkerState PushWorkerState
pushWorkerState)) Maybe SignalSet
forall a. Maybe a
Nothing
(Async ()
_, Either SomeException ()
eitherException) <- [Async ()] -> IO (Async (), Either SomeException ())
forall a. [Async a] -> IO (Async a, Either SomeException a)
waitAnyCatchCancel [Async ()
pushWorker, Async ()
queryWorker]
case Either SomeException ()
eitherException of
Left SomeException
exc | SomeException -> Maybe StopWorker
forall e. Exception e => SomeException -> Maybe e
fromException SomeException
exc Maybe StopWorker -> Maybe StopWorker -> Bool
forall a. Eq a => a -> a -> Bool
== StopWorker -> Maybe StopWorker
forall a. a -> Maybe a
Just StopWorker
StopWorker -> () -> IO ()
forall (m :: * -> *) a. Monad m => a -> m a
return ()
Left SomeException
exc -> SomeException -> IO ()
forall (m :: * -> *) e a. (MonadIO m, Exception e) => e -> m a
throwIO SomeException
exc
Right () -> () -> IO ()
forall (m :: * -> *) a. Monad m => a -> m a
return ()
data StopWorker = StopWorker
deriving (StopWorker -> StopWorker -> Bool
(StopWorker -> StopWorker -> Bool)
-> (StopWorker -> StopWorker -> Bool) -> Eq StopWorker
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
/= :: StopWorker -> StopWorker -> Bool
$c/= :: StopWorker -> StopWorker -> Bool
== :: StopWorker -> StopWorker -> Bool
$c== :: StopWorker -> StopWorker -> Bool
Eq, Int -> StopWorker -> ShowS
[StopWorker] -> ShowS
StopWorker -> String
(Int -> StopWorker -> ShowS)
-> (StopWorker -> String)
-> ([StopWorker] -> ShowS)
-> Show StopWorker
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
showList :: [StopWorker] -> ShowS
$cshowList :: [StopWorker] -> ShowS
show :: StopWorker -> String
$cshow :: StopWorker -> String
showsPrec :: Int -> StopWorker -> ShowS
$cshowsPrec :: Int -> StopWorker -> ShowS
Show)
instance Exception StopWorker
queryLoop :: QueryWorkerState -> Queue -> Push.PushParams IO () -> IO ()
queryLoop :: QueryWorkerState -> Queue -> PushParams IO () -> IO ()
queryLoop QueryWorkerState
workerState Queue
pushqueue PushParams IO ()
pushParams = do
StorePath
_ <- STM StorePath -> IO StorePath
forall a. STM a -> IO a
atomically (STM StorePath -> IO StorePath) -> STM StorePath -> IO StorePath
forall a b. (a -> b) -> a -> b
$ Queue -> STM StorePath
forall a. TBQueue a -> STM a
TBQueue.peekTBQueue (QueryWorkerState -> Queue
queryQueue QueryWorkerState
workerState)
(Set StorePath
missingStorePathsSet, Set StorePath
alreadyQueuedSet) <- Lock
-> IO (Set StorePath, Set StorePath)
-> IO (Set StorePath, Set StorePath)
forall a. Lock -> IO a -> IO a
Lock.with (QueryWorkerState -> Lock
lock QueryWorkerState
workerState) (IO (Set StorePath, Set StorePath)
-> IO (Set StorePath, Set StorePath))
-> IO (Set StorePath, Set StorePath)
-> IO (Set StorePath, Set StorePath)
forall a b. (a -> b) -> a -> b
$ do
[StorePath]
storePaths <- STM [StorePath] -> IO [StorePath]
forall a. STM a -> IO a
atomically (STM [StorePath] -> IO [StorePath])
-> STM [StorePath] -> IO [StorePath]
forall a b. (a -> b) -> a -> b
$ Queue -> STM [StorePath]
forall a. TBQueue a -> STM [a]
TBQueue.flushTBQueue (QueryWorkerState -> Queue
queryQueue QueryWorkerState
workerState)
Set StorePath
alreadyQueuedSet <- STM (Set StorePath) -> IO (Set StorePath)
forall a. STM a -> IO a
atomically (STM (Set StorePath) -> IO (Set StorePath))
-> STM (Set StorePath) -> IO (Set StorePath)
forall a b. (a -> b) -> a -> b
$ do
Bool
isEmpty <- Queue -> STM Bool
forall a. TBQueue a -> STM Bool
TBQueue.isEmptyTBQueue Queue
pushqueue
if Bool
isEmpty
then Set StorePath -> STM (Set StorePath)
forall (m :: * -> *) a. Monad m => a -> m a
return Set StorePath
forall a. Set a
S.empty
else Set StorePath -> STM (Set StorePath)
forall (m :: * -> *) a. Monad m => a -> m a
return (Set StorePath -> STM (Set StorePath))
-> Set StorePath -> STM (Set StorePath)
forall a b. (a -> b) -> a -> b
$ QueryWorkerState -> Set StorePath
alreadyQueued QueryWorkerState
workerState
[StorePath]
missingStorePaths <- PushParams IO () -> [StorePath] -> IO [StorePath]
forall (m :: * -> *) r.
(MonadIO m, MonadMask m) =>
PushParams m r -> [StorePath] -> m [StorePath]
Push.getMissingPathsForClosure PushParams IO ()
pushParams [StorePath]
storePaths
let missingStorePathsSet :: Set StorePath
missingStorePathsSet = [StorePath] -> Set StorePath
forall a. Ord a => [a] -> Set a
S.fromList [StorePath]
missingStorePaths
uncachedMissingStorePaths :: Set StorePath
uncachedMissingStorePaths = Set StorePath -> Set StorePath -> Set StorePath
forall a. Ord a => Set a -> Set a -> Set a
S.difference Set StorePath
missingStorePathsSet Set StorePath
alreadyQueuedSet
STM () -> IO ()
forall a. STM a -> IO a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ Set StorePath -> (StorePath -> STM ()) -> STM ()
forall (t :: * -> *) (f :: * -> *) a b.
(Foldable t, Applicative f) =>
t a -> (a -> f b) -> f ()
for_ Set StorePath
uncachedMissingStorePaths ((StorePath -> STM ()) -> STM ())
-> (StorePath -> STM ()) -> STM ()
forall a b. (a -> b) -> a -> b
$ Queue -> StorePath -> STM ()
forall a. TBQueue a -> a -> STM ()
TBQueue.writeTBQueue Queue
pushqueue
(Set StorePath, Set StorePath) -> IO (Set StorePath, Set StorePath)
forall (m :: * -> *) a. Monad m => a -> m a
return (Set StorePath
missingStorePathsSet, Set StorePath
alreadyQueuedSet)
QueryWorkerState -> Queue -> PushParams IO () -> IO ()
queryLoop (QueryWorkerState
workerState {alreadyQueued :: Set StorePath
alreadyQueued = Set StorePath -> Set StorePath -> Set StorePath
forall a. Ord a => Set a -> Set a -> Set a
S.union Set StorePath
missingStorePathsSet Set StorePath
alreadyQueuedSet}) Queue
pushqueue PushParams IO ()
pushParams
exitOnceQueueIsEmpty :: IO () -> Async () -> Async () -> QueryWorkerState -> PushWorkerState -> IO ()
exitOnceQueueIsEmpty :: IO ()
-> Async ()
-> Async ()
-> QueryWorkerState
-> PushWorkerState
-> IO ()
exitOnceQueueIsEmpty IO ()
stopProducerCallback Async ()
pushWorker Async ()
queryWorker QueryWorkerState
queryWorkerState PushWorkerState
pushWorkerState = do
Text -> IO ()
forall (m :: * -> *). MonadIO m => Text -> m ()
putText Text
"Stopped watching /nix/store and waiting for queue to empty ..."
IO ()
stopProducerCallback
IO ()
go
where
go :: IO ()
go = do
(Bool
isDone, Int
inprogress, Natural
queueLength) <- STM (Bool, Int, Natural) -> IO (Bool, Int, Natural)
forall a. STM a -> IO a
atomically (STM (Bool, Int, Natural) -> IO (Bool, Int, Natural))
-> STM (Bool, Int, Natural) -> IO (Bool, Int, Natural)
forall a b. (a -> b) -> a -> b
$ do
Natural
pushQueueLength <- Queue -> STM Natural
forall a. TBQueue a -> STM Natural
TBQueue.lengthTBQueue (Queue -> STM Natural) -> Queue -> STM Natural
forall a b. (a -> b) -> a -> b
$ PushWorkerState -> Queue
pushQueue PushWorkerState
pushWorkerState
Natural
queryQueueLength <- Queue -> STM Natural
forall a. TBQueue a -> STM Natural
TBQueue.lengthTBQueue (Queue -> STM Natural) -> Queue -> STM Natural
forall a b. (a -> b) -> a -> b
$ QueryWorkerState -> Queue
queryQueue QueryWorkerState
queryWorkerState
Int
inprogress <- TVar Int -> STM Int
forall a. TVar a -> STM a
readTVar (TVar Int -> STM Int) -> TVar Int -> STM Int
forall a b. (a -> b) -> a -> b
$ PushWorkerState -> TVar Int
inProgress PushWorkerState
pushWorkerState
Bool
isLocked <- Lock -> STM Bool
Lock.locked (QueryWorkerState -> Lock
lock QueryWorkerState
queryWorkerState)
let isDone :: Bool
isDone = Natural
pushQueueLength Natural -> Natural -> Bool
forall a. Eq a => a -> a -> Bool
== Natural
0 Bool -> Bool -> Bool
&& Natural
queryQueueLength Natural -> Natural -> Bool
forall a. Eq a => a -> a -> Bool
== Natural
0 Bool -> Bool -> Bool
&& Int
inprogress Int -> Int -> Bool
forall a. Eq a => a -> a -> Bool
== Int
0 Bool -> Bool -> Bool
&& Bool -> Bool
not Bool
isLocked
(Bool, Int, Natural) -> STM (Bool, Int, Natural)
forall (m :: * -> *) a. Monad m => a -> m a
return (Bool
isDone, Int
inprogress, Natural
pushQueueLength)
if Bool
isDone
then do
Text -> IO ()
forall (m :: * -> *). MonadIO m => Text -> m ()
putText Text
"Done."
Async () -> StopWorker -> IO ()
forall e a. Exception e => Async a -> e -> IO ()
cancelWith Async ()
queryWorker StopWorker
StopWorker
Async () -> StopWorker -> IO ()
forall e a. Exception e => Async a -> e -> IO ()
cancelWith Async ()
pushWorker StopWorker
StopWorker
else do
Text -> IO ()
forall (m :: * -> *). MonadIO m => Text -> m ()
putText (Text -> IO ()) -> Text -> IO ()
forall a b. (a -> b) -> a -> b
$ Text
"Waiting to finish: " Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Int -> Text
forall a b. (Show a, ConvertText String b) => a -> b
show Int
inprogress Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
" pushing, " Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Natural -> Text
forall a b. (Show a, ConvertText String b) => a -> b
show Natural
queueLength Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
" in queue"
Int -> IO ()
threadDelay (Int
1000 Int -> Int -> Int
forall a. Num a => a -> a -> a
* Int
1000)
IO ()
go