module Avers.Storage where
import Control.Applicative
import Control.Concurrent.STM
import Control.Monad.Random (getRandomR, evalRandIO)
import Control.Monad.State
import Control.Monad.Except
import Data.Char
import Data.Monoid
import Data.Maybe
import qualified Data.ByteString.Lazy as BL
import qualified Data.ByteString.Base16 as BS16
import Data.Vector (Vector)
import qualified Data.Vector as V
import Data.Text (Text)
import qualified Data.Text as T
import qualified Data.Text.Encoding as T
import qualified Data.Map as M
import Data.Time
import Data.List (find)
import Data.Aeson (Value)
import Data.Aeson.Types (emptyObject)
import qualified Crypto.Hash.SHA3 as SHA3
import qualified Database.RethinkDB as R
import Crypto.Scrypt
import Avers.Metrics
import Avers.Types
import Avers.Patching
import Avers.TH
import Avers.Views
import Avers.Storage.Backend
import Avers.Storage.Expressions
-- | Unwrap a 'Maybe' inside 'Avers': a 'Just' yields its value, a 'Nothing'
-- fails with the supplied error.
requireResult :: AversError -> Maybe a -> Avers a
requireResult err = maybe (throwError err) return
-- | Table expressions for the RethinkDB tables this module reads and writes.
-- Each one is built from the table name only; the first argument of
-- 'R.Table' is left as 'Nothing' (presumably "use the default database" --
-- TODO confirm against the RethinkDB bindings).
objectsTable, sessionsTable, snapshotsTable, patchesTable, secretsTable, blobsTable :: R.Exp R.Table
blobsTable     = R.Table Nothing (R.lift ("blobs"     :: Text))
objectsTable   = R.Table Nothing (R.lift ("objects"   :: Text))
patchesTable   = R.Table Nothing (R.lift ("patches"   :: Text))
secretsTable   = R.Table Nothing (R.lift ("secrets"   :: Text))
sessionsTable  = R.Table Nothing (R.lift ("sessions"  :: Text))
snapshotsTable = R.Table Nothing (R.lift ("snapshots" :: Text))
-- | Run an action and report its wall-clock duration under the metric name
-- @avers.storage.\<label\>.duration@.
--
-- The duration is the difference between two 'getCurrentTime' samples taken
-- immediately before and after the action. The report step is sequenced
-- after the action, so it is only reached when the action completes
-- without short-circuiting.
trace :: String -> Avers a -> Avers a
trace label act = do
    startedAt  <- liftIO getCurrentTime
    result     <- act
    finishedAt <- liftIO getCurrentTime
    let metric  = "avers.storage." <> T.pack label <> ".duration"
        elapsed = realToFrac (diffUTCTime finishedAt startedAt)
    reportPoint metric elapsed
    return result
-- | Check whether a document for the given object id is present in the
-- objects table. The query is timed under the @exists@ label.
exists :: ObjId -> Avers Bool
exists objId =
    trace "exists" (existsDocument objectsTable (BaseObjectId objId))
-- | Fetch the 'Object' record for the given id from the objects table.
-- Fails with 'ObjectNotFound' when no such document exists.
lookupObject :: ObjId -> Avers Object
lookupObject objId = trace "lookupObject" $
    lookupDocument objectsTable (BaseObjectId objId)
        >>= requireResult (ObjectNotFound objId)
-- | Create a new object of the given type with the given initial content.
--
-- The id is produced by the object type's own 'otId' action. Two documents
-- are inserted: the 'Object' record (with its last field set to 'Nothing')
-- and an initial 'Snapshot' at 'zeroRevId' holding the JSON encoding of the
-- content. Afterwards the object's views are updated with the content.
--
-- The second argument is the id recorded as the object's creator.
-- Returns the id of the newly created object.
createObject :: (ToJSON a) => ObjectType a -> ObjId -> a -> Avers ObjId
createObject ot createdBy content = do
    objId <- otId ot
    now   <- liftIO getCurrentTime

    insertDocument objectsTable $
        Object objId (otType ot) now createdBy Nothing
    insertDocument snapshotsTable $
        Snapshot (BaseObjectId objId) zeroRevId (toJSON content)

    updateObjectViews ot objId (Just content)
    return objId
-- | Soft-delete an object: the stored 'Object' record is rewritten with
-- @objectDeleted = Just True@, then 'updateObjectViews' is invoked with
-- 'Nothing' as the content (presumably clearing the object's view rows --
-- TODO confirm in "Avers.Views").
--
-- The record is marked as deleted before the object type is resolved, so a
-- failing type lookup leaves the deleted flag in place.
deleteObject :: ObjId -> Avers ()
deleteObject objId = do
    object <- lookupObject objId
    let tombstone = object { objectDeleted = Just True }
    upsertDocument objectsTable tombstone

    SomeObjectType ot <- lookupObjectType (objectType object)
    updateObjectViews ot objId Nothing
-- | Permanently remove an object that was previously marked as deleted.
--
-- Fails (via 'strErr') unless the object's deleted flag is @Just True@.
-- Otherwise the following documents are deleted, in this order:
--
--   1. the patches returned by 'patchesAfterRevision' for the base object
--      id, starting from 'zeroRevId';
--   2. the snapshots found through the @objectSnapshotSequence@ index
--      between the keys @[pk, 0]@ (inclusive) and @[pk, 99999999]@
--      (exclusive);
--   3. the object's row in the table of every view of its object type;
--   4. the object record itself.
--
-- NOTE(review): only the base object id is queried here; documents stored
-- under other 'ObjectId' forms (e.g. releases) do not appear to be covered
-- -- confirm whether that is intended.
pruneObject :: ObjId -> Avers ()
pruneObject objId = do
    obj <- lookupObject objId
    SomeObjectType objType <- lookupObjectType (objectType obj)

    unless (objectDeleted obj == Just True) $
        strErr $ "pruneObject: object " ++ show objId ++ " is not deleted"

    patches <- patchesAfterRevision (BaseObjectId objId) zeroRevId
    mapM_ (deleteDocument patchesTable) patches

    snapshotDatums <- runQueryCollect $
        R.BetweenIndexed "objectSnapshotSequence" (lowest, highest) snapshotsTable
    snapshots <- V.mapM parseDatum snapshotDatums :: Avers (Vector Snapshot)
    V.mapM_ (deleteDocument snapshotsTable) snapshots

    forM_ (otViews objType) $ \(SomeView view) ->
        deleteDocument (viewTable view) objId

    void $ deleteDocument objectsTable objId
  where
    -- Compound index key: [primary key of the object, revision number].
    key rev = R.Array $ V.fromList
        [ R.String (toPk objId)
        , R.Number (fromIntegral (rev :: Int))
        ]

    lowest  = R.Closed (key 0)
    highest = R.Open (key 99999999)
-- | Fetch the latest snapshot of an object and decode its content with
-- 'parseValue' into the requested type.
objectContent :: (FromJSON a) => ObjectId -> Avers a
objectContent objId =
    lookupLatestSnapshot objId >>= parseValue . snapshotContent
-- | Compute the current state of an object: take the newest stored
-- snapshot and apply every patch recorded after that snapshot's revision.
lookupLatestSnapshot :: ObjectId -> Avers Snapshot
lookupLatestSnapshot objId = trace "lookupLatestSnapshot" $ do
    stored <- newestSnapshot objId
    newer  <- patchesAfterRevision objId (snapshotRevisionId stored)
    applyPatches stored newer
-- | Apply a single patch to a snapshot.
--
-- On success the result carries the new content and the patch's revision
-- id; all other snapshot fields are kept. If the operation cannot be
-- applied, the failure is reported through 'patchError'.
applyPatchToSnapshot :: Snapshot -> Patch -> Avers Snapshot
applyPatchToSnapshot snapshot patch =
    either patchError (return . advance) $
        applyOperation (snapshotContent snapshot) (patchOperation patch)
  where
    advance content = snapshot
        { snapshotContent    = content
        , snapshotRevisionId = patchRevisionId patch
        }
-- | Apply a list of patches to a snapshot, left to right, stopping at the
-- first patch that fails to apply.
applyPatches :: Snapshot -> [Patch] -> Avers Snapshot
applyPatches = foldM applyPatchToSnapshot
-- | Look up the revision remembered for an object in the in-memory
-- recent-revision cache (a 'TVar' holding a map keyed by 'ObjectId').
-- Returns 'Nothing' when the object has no entry.
lookupRecentRevision :: ObjectId -> Avers (Maybe RevId)
lookupRecentRevision objId = do
    cacheVar <- gets recentRevisionCache
    cache    <- liftIO (readTVarIO cacheVar)
    return (M.lookup objId cache)
-- | Record a revision for an object in the recent-revision cache.
--
-- An existing entry is combined with the new value using 'max', so the
-- cached revision never decreases.
updateRecentRevision :: ObjectId -> RevId -> Avers ()
updateRecentRevision objId revId = do
    cacheVar <- gets recentRevisionCache
    liftIO . atomically $
        modifyTVar' cacheVar (M.insertWith max objId revId)
-- | Fetch the stored snapshot with the highest revision for an object.
--
-- The search through the @objectSnapshotSequence@ index starts at the
-- revision remembered in the recent-revision cache (or 'zeroRevId' when
-- there is no entry), inclusive, and ends below revision 99999999. The
-- results are ordered descending and the first one is taken. The revision
-- of the snapshot found is written back to the cache.
newestSnapshot :: ObjectId -> Avers Snapshot
newestSnapshot objId = trace "newestSnapshot" $ do
    RevId cachedRev <- fromMaybe zeroRevId <$> lookupRecentRevision objId

    let range = ( R.Closed (key (fromIntegral cachedRev))
                , R.Open   (key (fromIntegral (99999999 :: Int)))
                )

    found <- runQueryDatum $ headE $
        R.OrderByIndexed (R.Descending "objectSnapshotSequence") $
        R.BetweenIndexed "objectSnapshotSequence" range $
        snapshotsTable

    updateRecentRevision objId (snapshotRevisionId found)
    return found
  where
    -- Compound index key: [primary key of the object, revision number].
    key n = R.Array $ V.fromList [R.String (toPk objId), R.Number n]
-- | Reconstruct the state of an object at a specific revision.
--
-- The newest stored snapshot whose revision lies between 0 and the
-- requested revision (both inclusive) is fetched through the
-- @objectSnapshotSequence@ index. Patches recorded after that snapshot are
-- then applied, restricted to those whose revision does not exceed the
-- requested one.
lookupSnapshot :: ObjectId -> RevId -> Avers Snapshot
lookupSnapshot objId (RevId revId) = trace "lookupSnapshot" $ do
    stored <- runQueryDatum $ headE $
        R.OrderByIndexed (R.Descending "objectSnapshotSequence") $
        R.BetweenIndexed "objectSnapshotSequence" (key 0, key (fromIntegral revId)) $
        snapshotsTable

    later <- patchesAfterRevision objId (snapshotRevisionId stored)
    applyPatches stored [ p | p <- later, patchRevisionId p <= RevId revId ]
  where
    -- Inclusive bound on the compound key [primary key, revision number].
    key n = R.Closed $ R.Array $ V.fromList [R.String (toPk objId), R.Number n]
-- | Insert a patch document into the patches table.
savePatch :: Patch -> Avers ()
savePatch patch = insertDocument patchesTable patch
-- | Insert a snapshot document into the snapshots table.
saveSnapshot :: Snapshot -> Avers ()
saveSnapshot snapshot = insertDocument snapshotsTable snapshot
-- | Set the secret stored under the given id. The plaintext is UTF-8
-- encoded, hashed with scrypt using 'defaultParams', and the resulting
-- hash is persisted with 'saveSecretValue'.
updateSecret :: SecretId -> Text -> Avers ()
updateSecret secId secret =
    liftIO (encryptPassIO defaultParams (Pass (T.encodeUtf8 secret)))
        >>= saveSecretValue secId
-- | Check a plaintext secret against the hash stored under the given id.
--
-- Fails with 'DocumentNotFound' when no secret is stored, and with the
-- string error @"Not Found"@ when the plaintext does not match. When
-- 'verifyPass' also hands back a replacement hash, that hash is saved in
-- place of the stored one.
verifySecret :: SecretId -> Text -> Avers ()
verifySecret secId secret = do
    mbStored <- lookupDocument secretsTable secId
    stored   <- requireResult (DocumentNotFound $ toPk secId) mbStored

    let candidate      = Pass (T.encodeUtf8 secret)
        expected       = EncryptedPass (T.encodeUtf8 (secretValue stored))
        (ok, rehashed) = verifyPass defaultParams candidate expected

    unless ok $ strErr $ "Not Found"

    case rehashed of
        Nothing -> return ()
        Just ep -> saveSecretValue secId ep
-- | Upsert the given hash into the secrets table under the given id.
-- The hash bytes are decoded as UTF-8 before being stored as text.
saveSecretValue :: SecretId -> EncryptedPass -> Avers ()
saveSecretValue secId (EncryptedPass raw) =
    upsertDocument secretsTable (Secret secId (T.decodeUtf8 raw))
-- | Fetch the patches of an object that come after the given revision,
-- ordered ascending by their @revisionId@ field.
--
-- The range is taken from the @objectPatchSequence@ index and is exclusive
-- on both ends: from the given revision up to that revision plus 999999.
patchesAfterRevision :: ObjectId -> RevId -> Avers [Patch]
patchesAfterRevision objId (RevId revId) = trace "patchesAfterRevision" $ do
    datums <- runQuery $
        R.OrderBy [R.Ascending "revisionId"] $
        R.BetweenIndexed "objectPatchSequence" (after, before) $
        patchesTable
    patches <- V.mapM parseDatum datums
    return (V.toList patches)
  where
    -- Exclusive bound on the compound key [primary key, revision number].
    bound n = R.Open $ R.Array $ V.fromList [R.String (toPk objId), R.Number n]

    after  = bound (fromIntegral revId)
    before = bound (fromIntegral (revId + 999999))
-- | Fetch a single patch by its primary key, which is built as
-- @\<object pk\>\@\<revision pk\>@.
lookupPatch :: ObjectId -> RevId -> Avers Patch
lookupPatch objId revId = trace "lookupPatch" $
    runQuerySingleSelection (R.Get patchesTable (R.lift patchKey))
  where
    patchKey = toPk objId <> "@" <> toPk revId
-- | Find the registered object type with the given type name among the
-- object types listed in the configuration. Fails with
-- 'UnknownObjectType' when none matches.
lookupObjectType :: Text -> Avers SomeObjectType
lookupObjectType objType = do
    registered <- gets (objectTypes . config)
    maybe (throwError $ UnknownObjectType objType) return $
        find isRequested registered
  where
    isRequested (SomeObjectType ot) = otType ot == objType
-- | Apply a list of operations, authored against a given revision, to an
-- object.
--
-- The snapshot at the given revision is looked up, the patches recorded
-- after that revision are fetched and applied to obtain the latest state,
-- and 'patchHandler' is then run over the operations. Unless validation is
-- disabled, the resulting content is parsed and the object's views are
-- updated with it.
--
-- Arguments, in order: the object id, the revision the operations were
-- authored against, the committer's id, the operations, and a flag that
-- disables validation (and the view update) when 'True'.
--
-- Returns the patches that were already stored after the given revision,
-- the number of consumed operations, and the patches created by this call.
applyObjectUpdates
    :: ObjectId
    -> RevId
    -> ObjId
    -> [Operation]
    -> Bool
    -> Avers ([Patch], Int, [Patch])
applyObjectUpdates objId revId committerId ops novalidate = trace "applyObjectUpdates" $ do
    let baseObjId = objectIdBase objId

    obj <- lookupObject baseObjId
    SomeObjectType ot <- lookupObjectType (objectType obj)

    baseSnapshot    <- lookupSnapshot objId revId
    previousPatches <- patchesAfterRevision objId revId
    latestSnapshot  <- applyPatches baseSnapshot previousPatches

    let initialState = PatchState ot objId revId committerId ops 0
            baseSnapshot latestSnapshot previousPatches []

    (finalSnapshot, finalState) <-
        runStateT (patchHandler novalidate) initialState

    unless novalidate $ do
        content <- parseValue (snapshotContent finalSnapshot)
        updateObjectViews ot baseObjId (Just content)

    return
        ( previousPatches
        , psNumConsumedOperations finalState
        , psPatches finalState
        )
-- | State threaded through 'patchHandler' while a batch of operations is
-- applied to an object.
data PatchState a = PatchState
    { psObjectType :: ObjectType a
      -- ^ Object type used to validate new content.
    , psObjectId :: ObjectId
      -- ^ Object the operations are applied to.
    , psRevisionId :: RevId
      -- ^ Revision the incoming operations were authored against.
    , psCommitterId :: ObjId
      -- ^ Id recorded as the author of every patch that is created.
    , psOperations :: [ Operation ]
      -- ^ The incoming operations, in the order they are applied.
    , psNumConsumedOperations :: Int
      -- ^ Counter incremented each time a patch is saved; starts at 0.
    , psBaseSnapshot :: Snapshot
      -- ^ Snapshot at 'psRevisionId'; its content is the base for rebasing.
    , psLatestSnapshot :: Snapshot
      -- ^ Base snapshot with 'psPreviousPatches' applied.
    , psPreviousPatches :: [ Patch ]
      -- ^ Patches already stored after 'psRevisionId'.
    , psPatches :: [ Patch ]
      -- ^ Patches created so far by the handler, oldest first.
    }
-- | The monad 'patchHandler' runs in: 'Avers' extended with a 'PatchState'.
type AversPatch a b = StateT (PatchState a) Avers b
-- | Apply the operations held in the 'PatchState' one after another,
-- starting from the latest snapshot, and return the resulting snapshot.
--
-- Each operation is first rebased over the previously stored patches,
-- using the base snapshot's content as the reference:
--
--   * if rebasing yields nothing, the operation is skipped;
--   * if applying the rebased operation leaves the content unchanged, it
--     is skipped as well;
--   * otherwise the new content is validated (unless the flag is 'True'),
--     a patch with the next revision id is saved, the state is updated
--     (patch appended, consumed counter incremented) and a snapshot with
--     the new content and revision is saved.
--
-- A rebased operation that cannot be applied is reported through
-- 'patchError', the same channel 'applyPatchToSnapshot' uses, instead of
-- raising an impure exception with 'error'.
patchHandler :: (FromJSON a) => Bool -> AversPatch a Snapshot
patchHandler novalidate = do
    PatchState{..} <- get
    foldM (saveOperation $ snapshotContent psBaseSnapshot)
        psLatestSnapshot psOperations
  where
    saveOperation baseContent snapshot@Snapshot{..} op = do
        PatchState{..} <- get
        case rebaseOperation baseContent op psPreviousPatches of
            Nothing  -> return snapshot
            Just op' -> do
                now <- liftIO $ getCurrentTime
                let revId = succ snapshotRevisionId
                    patch = Patch psObjectId revId psCommitterId now op'
                case applyOperation snapshotContent op' of
                    -- Surface the failure as a regular Avers error so that
                    -- callers can handle it like any other storage error.
                    Left e -> lift $ patchError e
                    Right newContent
                        | newContent /= snapshotContent -> do
                            unless novalidate $ do
                                lift $ validateWithType psObjectType newContent
                            let newSnapshot = snapshot
                                    { snapshotContent    = newContent
                                    , snapshotRevisionId = revId
                                    }
                            lift $ savePatch patch
                            modify $ \s -> s
                                { psPatches = psPatches ++ [patch]
                                , psNumConsumedOperations = psNumConsumedOperations + 1
                                }
                            lift $ saveSnapshot newSnapshot
                            return newSnapshot
                        | otherwise -> return snapshot
-- | Check whether a blob document with the given id is present in the
-- blobs table.
existsBlob :: BlobId -> Avers Bool
existsBlob bId = existsDocument blobsTable bId
-- | Fetch the blob document with the given id. Fails with
-- 'DocumentNotFound' when it does not exist.
lookupBlob :: BlobId -> Avers Blob
lookupBlob bId = do
    mbBlob <- lookupDocument blobsTable bId
    requireResult (DocumentNotFound (toPk bId)) mbBlob
-- | Insert a blob document into the blobs table.
insertBlob :: Blob -> Avers ()
insertBlob blob = insertDocument blobsTable blob
-- | Hand the blob's bytes to the 'putBlob' callback from the
-- configuration, together with the blob's id and content type.
saveBlobContent :: Blob -> BL.ByteString -> Avers ()
saveBlobContent blob content = do
    store <- gets (putBlob . config)
    liftIO $ store (blobId blob) (blobContentType blob) content
-- | Insert a session document into the sessions table.
saveSession :: Session -> Avers ()
saveSession session = insertDocument sessionsTable session
-- | Fetch the session with the given id. Fails with 'DocumentNotFound'
-- when it does not exist.
lookupSession :: SessionId -> Avers Session
lookupSession sessId = do
    mbSession <- lookupDocument sessionsTable sessId
    requireResult (DocumentNotFound (toPk sessId)) mbSession
-- | Delete the session with the given id from the sessions table. The
-- result of the delete is discarded.
dropSession :: SessionId -> Avers ()
dropSession = void . deleteDocument sessionsTable
-- | Generate a random identifier of the given length, made up of the
-- characters @A-Z@, @a-z@ and @0-9@.
--
-- Exactly @n@ random numbers in the range 0..61 are drawn and each is
-- mapped to one character. A non-positive length yields the empty text.
newId :: Int -> IO Text
newId n = T.pack . map alnum <$> evalRandIO (replicateM n (getRandomR (0, 61)))
  where
    -- Map 0..25 to 'A'..'Z', 26..51 to 'a'..'z' and 52..61 to '0'..'9'.
    alnum :: Int -> Char
    alnum x
        | x < 26    = chr (x + 65)
        | x < 52    = chr ((x - 26) + 97)
        | x < 62    = chr ((x - 52) + 48)
        | otherwise = error $ "Out of range: " ++ show x
-- | Validate a JSON value against the object type registered under the
-- given type name. Fails if the type is unknown or the value does not
-- parse as that type.
validateObject :: Text -> Value -> Avers ()
validateObject objType value =
    lookupObjectType objType >>= \(SomeObjectType ot) ->
        validateWithType ot value
-- | Validate a JSON value by parsing it as the given object type. The
-- parsed result is discarded; a parse failure is rethrown as an error.
validateWithType :: (FromJSON a) => ObjectType a -> Value -> Avers ()
validateWithType ot value =
    either throwError (const (return ())) (parseValueAs ot value)
-- | Fetch the content stored under the release object id built from the
-- given object id and revision, decoded as a 'Release'.
lookupRelease :: ObjId -> RevId -> Avers Release
lookupRelease objId = objectContent . ReleaseObjectId objId
-- | Create a release of an object at the given revision.
--
-- Fails with 'ObjectNotFound' when the object does not exist. Otherwise a
-- snapshot with empty-object content at 'zeroRevId' is inserted under the
-- release object id, unless a snapshot with that key is already present.
createRelease :: ObjId -> RevId -> Avers ()
createRelease objId revId = do
    found <- exists objId
    if found
        then do
            alreadyThere <- existsDocument snapshotsTable (toPk marker)
            unless alreadyThere $ insertDocument snapshotsTable marker
        else throwError (ObjectNotFound objId)
  where
    marker = Snapshot (ReleaseObjectId objId revId) zeroRevId emptyObject
-- | Find the revision of the latest release of an object.
--
-- Snapshots whose @objectId@ field matches the regular expression
-- @^\<object pk\>/release/@ are ordered descending by @objectId@ and the
-- first one's @objectId@ is decoded. A release object id yields its
-- revision (after checking that it belongs to the requested object); any
-- other form of object id yields 'Nothing'.
--
-- NOTE(review): the ordering is on the @objectId@ field itself; if that is
-- a plain string, release 10 would sort below release 9 -- confirm how
-- release ids are encoded.
lookupLatestRelease :: ObjId -> Avers (Maybe RevId)
lookupLatestRelease objId = do
    let releasePrefix = R.lift $ "^" <> toPk objId <> "/release/"

        isReleaseOfObject :: R.Exp R.Object -> R.Exp Bool
        isReleaseOfObject doc = R.Coerce
            (R.Match (objectFieldE "objectId" doc) releasePrefix)
            (R.lift ("bool" :: Text))

    newest <- runQueryDatum $
        (objectFieldE "objectId" :: R.Exp R.Object -> R.Exp R.Datum) $
        headE $
        R.OrderBy [R.Descending "objectId"] $
        R.Filter isReleaseOfObject snapshotsTable

    case newest of
        ReleaseObjectId owner revId -> do
            unless (objId == owner) $
                databaseError "lookupLatestRelease: objId do not match"
            return (Just revId)
        _ -> return Nothing
-- | Store a blob with the given body and content type and return its
-- descriptor.
--
-- The blob id is the hex-encoded SHA3-256 hash of the body, so identical
-- bodies share an id. When a blob with that id already exists, nothing is
-- written; otherwise the content is saved first and the blob document is
-- inserted afterwards.
createBlob :: BL.ByteString -> Text -> Avers Blob
createBlob body contentType = do
    known <- existsBlob (blobId blob)
    unless known $ do
        saveBlobContent blob body
        insertBlob blob
    return blob
  where
    digest = T.decodeUtf8 (BS16.encode (SHA3.hashlazy 256 body))
    blob   = Blob (BlobId digest) (fromIntegral (BL.length body)) contentType
-- | List the ids of all objects of the given type that are not marked as
-- deleted, ordered by @createdAt@ descending and then by @id@ ascending.
objectsOfType :: ObjectType a -> Avers (Vector ObjId)
objectsOfType objType = do
    ids <- runQueryCollect $
        R.Map mapId $
        R.OrderBy [R.Descending "createdAt", R.Ascending "id"] $
        R.Filter isNotDeleted $
        R.Filter hasRequestedType objectsTable
    return (V.map ObjId ids)
  where
    hasRequestedType :: R.Exp R.Object -> R.Exp Bool
    hasRequestedType = objectFieldEqE "type" (otType objType)
-- | List the ids of all objects of the given type, including those marked
-- as deleted, ordered by @createdAt@ descending and then by @id@
-- ascending.
allObjectsOfType :: ObjectType a -> Avers (Vector ObjId)
allObjectsOfType objType = do
    ids <- runQueryCollect $
        R.Map mapId $
        R.OrderBy [R.Descending "createdAt", R.Ascending "id"] $
        R.Filter hasRequestedType objectsTable
    return (V.map ObjId ids)
  where
    hasRequestedType :: R.Exp R.Object -> R.Exp Bool
    hasRequestedType = objectFieldEqE "type" (otType objType)
-- | Query predicate that holds for documents which either have no
-- @deleted@ field or have it set to 'False'.
isNotDeleted :: R.Exp R.Object -> R.Exp Bool
isNotDeleted doc = R.Any [lacksFlag, flagIsFalse]
  where
    lacksFlag   = R.Not (R.HasFields ["deleted"] doc)
    flagIsFalse = objectFieldEqE "deleted" False doc
-- | Query expression that projects the @id@ field out of a document.
mapId :: R.Exp R.Object -> R.Exp Text
mapId doc = R.GetField "id" doc