module Cook.State.Manager
( StateManager, HashManager(..)
, createStateManager, markUsingImage
, isImageKnown, fastFileHash
, syncImages, waitForWrites
, getImageId, setImageId
, _STATE_DIR_NAME_, findStateDirectory
)
where
import Cook.Util
import Cook.State.Model
import Cook.Types
import Control.Applicative
import Control.Concurrent
import Control.Concurrent.STM
import Control.Exception
import Control.Monad
import Control.Monad.Logger hiding (logInfo, logDebug, logError, logWarn)
import Control.Monad.State
import Control.Monad.Trans.Resource
import Control.Retry
import Data.Maybe
import Data.Time.Clock
import Data.Time.Clock.POSIX
import Database.Persist.Sqlite
import System.Directory
import System.FilePath
import System.Posix.Files
import qualified Data.HashMap.Strict as HM
import qualified Data.Text as T
import qualified Data.Vector as V
data StateManager
= StateManager
{ sm_runSqlGet :: forall a. SqlPersistM a -> IO a
, sm_runSqlWrite :: SqlPersistM () -> IO ()
, sm_databaseFile :: FilePath
, sm_waitForWrites :: IO ()
}
data HashManager
= HashManager
{ hm_lookup :: forall m. MonadIO m => FilePath -> m SHA1 -> m SHA1
, hm_didFileChange :: forall m. MonadIO m => FilePath -> m Bool
}
fastFileHash :: forall m. MonadIO m => HashManager -> FilePath -> m SHA1 -> m SHA1
fastFileHash hm = hm_lookup hm
_STATE_DIR_NAME_ = ".kitchen"
findStateDirectory :: IO FilePath
findStateDirectory =
do currentDir <- getCurrentDirectory
checkLoop currentDir
where
checkLoop parentDir =
do logDebug $ "Looking for a " ++ _STATE_DIR_NAME_ ++ " directory in " ++ parentDir
isThere <- doesDirectoryExist (parentDir </> _STATE_DIR_NAME_)
if isThere
then return (parentDir </> _STATE_DIR_NAME_)
else do when (normalise parentDir == "/" || normalise parentDir == "") $
fail "Can't find my kitchen! Did you run dockercook init?"
checkLoop (normalise $ takeDirectory parentDir)
waitForWrites :: StateManager -> IO ()
waitForWrites st =
sm_waitForWrites st
createStateManager :: FilePath -> IO (StateManager, HashManager)
createStateManager stateDirectory =
do logDebug $ "Creating state manager with directory " ++ stateDirectory
pool <- runNoLoggingT $ createSqlitePool (T.pack sqlLoc) 5
let runSql action =
let tryTx = (runResourceT . runNoLoggingT . ((flip runSqlPool) pool)) action
in (recoverAll (exponentialBackoff 500000 <> limitRetries 5) tryTx) `catch` \(e :: SomeException) ->
fail $ "Sqlite-Transaction finally failed: " ++ show e
runSql (runMigration migrateState)
sqlQueue <- newTBQueueIO 100
allHashes <- runSql $ selectList [] []
logDebug $ "Initializing hash manager"
let hashMap =
foldl (\hm entity ->
let v = entityVal entity
in HM.insert (dbHashCacheFullPath v) (dbHashCacheMtime v, SHA1 $ dbHashCacheHash v) hm
) HM.empty allHashes
hashMapV <- newTVarIO hashMap
hashWriteChan <- newTBQueueIO 1000
workV <- newTVarIO False
let waitWrites =
atomically $
do emptyQ <- isEmptyTBQueue sqlQueue
unless emptyQ $ retry
stateMgr =
StateManager
{ sm_runSqlGet =
\action ->
do liftIO waitWrites
runSql action
, sm_runSqlWrite =
\writeReq ->
atomically $ writeTBQueue sqlQueue writeReq
, sm_databaseFile = stateDirectory </> "database.db"
, sm_waitForWrites =
do waitForHashes workV hashWriteChan
waitWrites
}
_ <- forkIO (hashManagerPersistWorker stateMgr workV hashWriteChan)
_ <- forkIO (queueWorker runSql sqlQueue)
let hashMgr =
HashManager
{ hm_lookup = hashManagerLookup stateMgr hashMapV hashWriteChan
, hm_didFileChange = didFileChange stateMgr hashMapV
}
return (stateMgr, hashMgr)
where
sqlLoc = stateDirectory </> "database.db"
queueWorker runSql queue =
do action <- atomically $ readTBQueue queue
_ <- runSql action
queueWorker runSql queue
waitForHashes :: TVar Bool -> TBQueue DbHashCache -> IO ()
waitForHashes workEnqueuedVar hashWriteChan =
atomically $
do queueEmpty <- isEmptyTBQueue hashWriteChan
workEnqueued <- readTVar workEnqueuedVar
when ((not queueEmpty) || workEnqueued) retry
hashManagerPersistWorker :: StateManager -> TVar Bool -> TBQueue DbHashCache -> IO ()
hashManagerPersistWorker (StateManager{..}) workEnqueuedVar hashWriteChan =
do batchV <- newTVarIO V.empty
_ <- forkIO (batchBuilder batchV)
loop batchV
where
batchBuilder :: TVar (V.Vector DbHashCache) -> IO ()
batchBuilder batchV =
do writeOp <-
atomically $
do writeTVar workEnqueuedVar True
readTBQueue hashWriteChan
atomically $ modifyTVar' batchV (\v -> V.snoc v writeOp)
batchBuilder batchV
loop :: TVar (V.Vector DbHashCache) -> IO ()
loop batchV =
do logDebug $ "Waiting for next hash batch to arrive"
writeBatch <-
atomically $
do v <- readTVar batchV
when (V.null v) retry
writeTVar batchV V.empty
return v
let sqlAction =
sm_runSqlWrite $
do let xs = V.toList writeBatch
mapM_ (\oldHash -> deleteWhere [DbHashCacheFullPath ==. oldHash]) (map dbHashCacheFullPath xs)
_ <- insertMany xs
logDebug $ "Stored " ++ (show $ V.length writeBatch) ++ " hashes"
return ()
logDebug $ "Storing " ++ (show $ V.length writeBatch) ++ " hashes in database."
atomically $ writeTVar workEnqueuedVar False
sqlAction `catch` \(e :: SomeException) ->
do logError $ "Hash persist error: " ++ show e
atomically $
do modifyTVar' batchV (\newV -> V.concat [writeBatch, newV])
writeTVar workEnqueuedVar True
threadDelay 1000000
loop batchV
didFileChange :: MonadIO m
=> StateManager
-> TVar (HM.HashMap FilePath (UTCTime, SHA1))
-> FilePath
-> m Bool
didFileChange (StateManager{..}) hashMapV fullFilePath =
liftIO $
do fileExists <- doesFileExist fullFilePath
if not fileExists
then return True
else do currModTime <- getModTime fullFilePath
mEntry <- liftIO $ atomically $ HM.lookup fullFilePath <$> readTVar hashMapV
case mEntry of
Nothing ->
return True
Just (oldMtime, _) ->
return (oldMtime /= currModTime)
getModTime :: FilePath -> IO UTCTime
getModTime fullFilePath =
do stat <- liftIO $ getFileStatus fullFilePath
return $ (\t -> posixSecondsToUTCTime (realToFrac t :: POSIXTime)) $ modificationTime stat
hashManagerLookup :: MonadIO m
=> StateManager
-> TVar (HM.HashMap FilePath (UTCTime, SHA1))
-> TBQueue DbHashCache
-> FilePath
-> m SHA1
-> m SHA1
hashManagerLookup (StateManager{..}) hashMapV hashWriteChan fullFilePath computeHash =
do modTime <- liftIO $ getModTime fullFilePath
mEntry <- liftIO $ atomically $ HM.lookup fullFilePath <$> readTVar hashMapV
let recomputeHash =
do newHash <- computeHash
liftIO $ atomically $
do modifyTVar' hashMapV (HM.insert fullFilePath (modTime, newHash))
writeTBQueue hashWriteChan (DbHashCache fullFilePath modTime (unSha1 newHash))
return newHash
case mEntry of
Just (mtime, oldHash) ->
if mtime == modTime
then return oldHash
else recomputeHash
Nothing ->
recomputeHash
syncImages :: StateManager -> (DockerImage -> IO Bool) -> IO ()
syncImages (StateManager{..}) imageStillExists =
do x <- sm_runSqlGet $ selectList [] []
forM_ x $ \entity ->
do let dockerImage = entityVal entity
name = dbDockerImageName dockerImage
exists <- imageStillExists (DockerImage name)
unless exists $
do logInfo ("The image " ++ T.unpack name
++ " doesn't exist on remote docker server. Removing it from local state.")
sm_runSqlWrite $ delete (entityKey entity)
sm_waitForWrites
isImageKnown :: StateManager -> DockerImage -> IO Bool
isImageKnown (StateManager{..}) (DockerImage imageName) =
do x <- sm_runSqlGet $ getBy (UniqueDbDockerImage imageName)
return (isJust x)
getImageId :: StateManager -> DockerImage -> IO (Maybe DockerImageId)
getImageId (StateManager{..}) (DockerImage imageName) =
do x <- sm_runSqlGet $ getBy (UniqueDbDockerImage imageName)
case x of
Nothing -> return Nothing
Just entity ->
return $ fmap DockerImageId (dbDockerImageRawImageId $ entityVal entity)
setImageId :: StateManager -> DockerImage -> DockerImageId -> IO ()
setImageId (StateManager{..}) (DockerImage imageName) (DockerImageId imageId) =
sm_runSqlWrite $ updateWhere [ DbDockerImageName ==. imageName ] [ DbDockerImageRawImageId =. (Just imageId) ]
markUsingImage :: StateManager -> DockerImage -> IO ()
markUsingImage (StateManager{..}) (DockerImage imageName) =
do mImageEntity <- sm_runSqlGet $ getBy (UniqueDbDockerImage imageName)
now <- getCurrentTime
case mImageEntity of
Nothing ->
sm_runSqlWrite $
do _ <- insert $ DbDockerImage imageName Nothing now now 1
return ()
Just imageEntity ->
sm_runSqlGet $ update (entityKey imageEntity) [ DbDockerImageUsageCount +=. 1
, DbDockerImageLastUsed =. now
]