{-# LANGUAGE BangPatterns #-}
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE RecordWildCards #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TemplateHaskell #-}

{-|

This module abstracts the storage engine and provides functions to
manipulate objects stored in it.

-}

module Avers.Storage where


import Control.Applicative

import Control.Concurrent.STM

import Control.Monad.Random (getRandomR, evalRandIO)
import Control.Monad.State
import Control.Monad.Except

import Data.Char
import Data.Monoid
import Data.Maybe

import qualified Data.ByteString.Lazy as BL
import qualified Data.ByteString.Base16 as BS16

import Data.Vector (Vector)
import qualified Data.Vector as V

import Data.Text (Text)
import qualified Data.Text as T
import qualified Data.Text.Encoding as T

import qualified Data.Map as M

import Data.Time
import Data.List (find)
import Data.Aeson (Value)
import Data.Aeson.Types (emptyObject)

import qualified Crypto.Hash.SHA3 as SHA3

import qualified Database.RethinkDB as R

import Crypto.Scrypt

import Avers.Metrics
import Avers.Types
import Avers.Patching
import Avers.TH
import Avers.Views

import Avers.Storage.Backend
import Avers.Storage.Expressions


-- | Unwrap a 'Maybe': yield the value inside a 'Just', or throw the supplied
-- error when there is 'Nothing'.
requireResult :: AversError -> Maybe a -> Avers a
requireResult err = maybe (throwError err) return


-- All the tables which this storage engine uses.
objectsTable, sessionsTable, snapshotsTable, patchesTable, secretsTable, blobsTable :: R.Exp R.Table
blobsTable     = R.Table Nothing (R.lift ("blobs"     :: Text))
objectsTable   = R.Table Nothing (R.lift ("objects"   :: Text))
patchesTable   = R.Table Nothing (R.lift ("patches"   :: Text))
secretsTable   = R.Table Nothing (R.lift ("secrets"   :: Text))
sessionsTable  = R.Table Nothing (R.lift ("sessions"  :: Text))
snapshotsTable = R.Table Nothing (R.lift ("snapshots" :: Text))


-- | Run the action and report its wall-clock duration as the metric
-- @avers.storage.\<label\>.duration@. The point is reported after the action
-- has returned; the action's result is passed through unchanged.
trace :: String -> Avers a -> Avers a
trace label act = do
    startedAt  <- liftIO getCurrentTime
    result     <- act
    finishedAt <- liftIO getCurrentTime

    reportPoint
        ("avers.storage." <> T.pack label <> ".duration")
        (realToFrac (diffUTCTime finishedAt startedAt))

    return result


-- | True if the object exists.
exists :: ObjId -> Avers Bool
exists objId =
    trace "exists" (existsDocument objectsTable (BaseObjectId objId))


-- | Lookup an 'Object' by its 'ObjId'. Throws 'ObjectNotFound' if the object
-- doesn't exist.
lookupObject :: ObjId -> Avers Object
lookupObject objId = trace "lookupObject" $
    lookupDocument objectsTable (BaseObjectId objId)
        >>= requireResult (ObjectNotFound objId)


-- | Create a new object of the given type. An initial snapshot ('RevId' 0)
-- is created from the supplied content.
--
-- The id of the new object is obtained from the object type's 'otId' action.
-- The second argument is recorded as the creator of the object. Returns the
-- id of the newly created object.
createObject :: (ToJSON a) => ObjectType a -> ObjId -> a -> Avers ObjId
createObject ot createdBy content = do
    objId <- otId ot
    now   <- liftIO getCurrentTime

    -- Store the object metadata first, then its initial snapshot.
    insertDocument objectsTable $
        Object objId (otType ot) now createdBy Nothing
    insertDocument snapshotsTable $
        Snapshot (BaseObjectId objId) zeroRevId (toJSON content)

    updateObjectViews ot objId (Just content)

    return objId


-- | Mark the object as deleted.
deleteObject :: ObjId -> Avers ()
deleteObject objId = do
    obj <- lookupObject objId
    upsertDocument objectsTable (obj { objectDeleted = Just True })

    -- Passing 'Nothing' as the content to 'updateObjectViews' because the
    -- object is now flagged as deleted.
    SomeObjectType objType <- lookupObjectType (objectType obj)
    updateObjectViews objType objId Nothing


-- | Prune the object from the database. This is only allowed if the object is
-- marked as deleted. Note that this is a very dangerous operation, it can not
-- be undone.
--
-- Removes, in this order: the patches of the object, its snapshots, its
-- entries in all views of its object type, and finally the object itself.
pruneObject :: ObjId -> Avers ()
pruneObject objId = do
    obj <- lookupObject objId
    SomeObjectType objType <- lookupObjectType (objectType obj)

    unless (objectDeleted obj == Just True) $
        strErr $ "pruneObject: object " ++ show objId ++ " is not deleted"

    -- Drop all related patches. Note that there will never be a patch with
    -- revision zero, so it's safe to use 'patchesAfterRevision'.
    patches <- patchesAfterRevision (BaseObjectId objId) zeroRevId
    mapM_ (deleteDocument patchesTable) patches

    -- Drop all related snapshots.
    snapshotDatums <- runQueryCollect $
        R.BetweenIndexed "objectSnapshotSequence"
            ( R.Closed $ R.Array $ V.fromList [R.String $ toPk objId, R.Number $ fromIntegral (0 :: Int)]
            , R.Open   $ R.Array $ V.fromList [R.String $ toPk objId, R.Number $ fromIntegral (99999999 :: Int)]
            )
            snapshotsTable

    snapshots <- V.mapM parseDatum snapshotDatums :: Avers (Vector Snapshot)
    V.mapM_ (deleteDocument snapshotsTable) snapshots

    -- Remove related entries from all views.
    forM_ (otViews objType) $ \(SomeView view) -> do
        deleteDocument (viewTable view) objId

    -- Drop the object itself.
    void $ deleteDocument objectsTable objId


-----------------------------------------------------------------------------
-- | Fetch the content of the object and try to parse it.
--
-- This function will fail with a 'ParseError' if the content can not be
-- decoded into the desired type.
objectContent :: (FromJSON a) => ObjectId -> Avers a
objectContent objId =
    lookupLatestSnapshot objId >>= parseValue . snapshotContent


-----------------------------------------------------------------------------
-- | Get the snapshot of the newest revision of the given object.
--
-- Starts from the newest snapshot stored in the database and applies all
-- patches with a later revision on top of it.
lookupLatestSnapshot :: ObjectId -> Avers Snapshot
lookupLatestSnapshot objId = trace "lookupLatestSnapshot" $ do
    stored  <- newestSnapshot objId
    pending <- patchesAfterRevision objId (snapshotRevisionId stored)
    applyPatches stored pending


-- | Apply the operation of a single patch to the snapshot content. On success
-- the resulting snapshot carries the new content and the revision of the
-- patch; on failure the error is raised through 'patchError'.
--
-- TODO: Verify that the patch is applicable to the snapshot, ie.
-- patch revId == snapshot revId + 1
applyPatchToSnapshot :: Snapshot -> Patch -> Avers Snapshot
applyPatchToSnapshot snapshot patch =
    either patchError (return . advance) $
        applyOperation (snapshotContent snapshot) (patchOperation patch)
  where
    advance content = snapshot
        { snapshotContent    = content
        , snapshotRevisionId = patchRevisionId patch
        }


-- | Apply the patches to the snapshot, one after the other, in list order.
applyPatches :: Snapshot -> [Patch] -> Avers Snapshot
applyPatches = foldM applyPatchToSnapshot


-- | Look up the revision remembered for the object in the recent revision
-- cache, if there is one.
lookupRecentRevision :: ObjectId -> Avers (Maybe RevId)
lookupRecentRevision objId = do
    cacheVar  <- gets recentRevisionCache
    revisions <- liftIO $ readTVarIO cacheVar
    return $ M.lookup objId revisions


-- | Remember the revision for the object in the recent revision cache. If the
-- cache already holds an entry for the object, the larger of the two
-- revisions is kept.
updateRecentRevision :: ObjectId -> RevId -> Avers ()
updateRecentRevision objId revId =
    gets recentRevisionCache >>= \cacheVar ->
        liftIO . atomically $
            modifyTVar' cacheVar (M.insertWith max objId revId)


-----------------------------------------------------------------------------
-- | Get the newest snapshot which is stored in the database. The object may
-- be at a higher revision if the later snapshots are missing from the
-- database.
--
-- This is an internal function. If you want the latest snapshot, you should
-- use 'lookupLatestSnapshot'.
newestSnapshot :: ObjectId -> Avers Snapshot
newestSnapshot objId = trace "newestSnapshot" $ do
    -- The cached revision (or zero when nothing is cached) is the lower
    -- bound of the index range that is searched.
    RevId cachedRev <- fromMaybe zeroRevId <$> lookupRecentRevision objId

    let range =
            ( R.Closed $ sequenceKey $ fromIntegral cachedRev
            , R.Open   $ sequenceKey $ fromIntegral (99999999 :: Int)
            )

    snapshot <- runQueryDatum $ headE $
        R.OrderByIndexed (R.Descending "objectSnapshotSequence") $
        R.BetweenIndexed "objectSnapshotSequence" range $
        snapshotsTable

    updateRecentRevision objId (snapshotRevisionId snapshot)

    return snapshot

  where
    -- Key into the "objectSnapshotSequence" index: [object id, revision].
    sequenceKey num = R.Array $ V.fromList [R.String $ toPk objId, R.Number num]


-- | Get the snapshot of the object at the given revision. The newest stored
-- snapshot at or below that revision is fetched, and then all patches up to
-- and including the requested revision are applied on top of it.
lookupSnapshot :: ObjectId -> RevId -> Avers Snapshot
lookupSnapshot objId target@(RevId revId) = trace "lookupSnapshot" $ do
    base <- runQueryDatum $ headE $
        R.OrderByIndexed (R.Descending "objectSnapshotSequence") $
        R.BetweenIndexed "objectSnapshotSequence" range $
        snapshotsTable

    -- Get all patches which we need to apply on top of the snapshot to
    -- arrive at the desired revision.
    laterPatches <- patchesAfterRevision objId (snapshotRevisionId base)
    applyPatches base $ filter ((<= target) . patchRevisionId) laterPatches

  where
    range =
        ( R.Closed $ sequenceKey 0
        , R.Closed $ sequenceKey $ fromIntegral revId
        )

    -- Key into the "objectSnapshotSequence" index: [object id, revision].
    sequenceKey num = R.Array $ V.fromList [R.String $ toPk objId, R.Number num]


-- | Insert the patch into the patches table.
savePatch :: Patch -> Avers ()
savePatch patch = insertDocument patchesTable patch


-- | Insert the snapshot into the snapshots table.
saveSnapshot :: Snapshot -> Avers ()
saveSnapshot snapshot = insertDocument snapshotsTable snapshot


-----------------------------------------------------------------------------
-- Update the secret value. If a secret with the given 'SecretId' does not
-- exist yet, one will be created (IOW this function has upsert semantics).
updateSecret :: SecretId -> Text -> Avers ()
updateSecret secId secret =
    liftIO (encryptPassIO defaultParams (Pass (T.encodeUtf8 secret)))
        >>= saveSecretValue secId


-----------------------------------------------------------------------------
-- | Verify the value against the secret. If that fails, then this function
-- throws an error.
--
-- This function automatically updates the secret in the database if the
-- scrypt params have changed.
verifySecret :: SecretId -> Text -> Avers ()
verifySecret secId secret = do
    stored <- lookupDocument secretsTable secId
        >>= requireResult (DocumentNotFound $ toPk secId)

    let candidate      = Pass $ T.encodeUtf8 secret
        current        = EncryptedPass $ T.encodeUtf8 (secretValue stored)
        (ok, rehashed) = verifyPass defaultParams candidate current

    -- If the secret doesn't match, return NotFound
    unless ok $
        strErr "Not Found"

    -- Update the database if we got a new value from the verify step. This
    -- is the case when the scrypt params have changed. See documentation of
    -- 'verifyPass'.
    case rehashed of
        Nothing -> return ()
        Just ep -> saveSecretValue secId ep


-----------------------------------------------------------------------------
-- | Internal function which actually saves a secret in the database.
saveSecretValue :: SecretId -> EncryptedPass -> Avers ()
saveSecretValue secId (EncryptedPass x) = do
    upsertDocument secretsTable $ Secret secId $ T.decodeUtf8 x


-- | Fetch all patches of the object whose revision lies strictly after the
-- given one, ordered by ascending revision.
--
-- The index range is open on both ends and its upper bound is the given
-- revision plus 999999.
patchesAfterRevision :: ObjectId -> RevId -> Avers [Patch]
patchesAfterRevision objId (RevId revId) = trace "patchesAfterRevision" $ do
    res <- runQuery $
        R.OrderBy [R.Ascending "revisionId"] $
        R.BetweenIndexed "objectPatchSequence" (lowerBound, upperBound) $
        patchesTable

    V.toList <$> V.mapM parseDatum res

  where
    lowerBound = R.Open $ R.Array $ V.fromList
        [R.String $ toPk objId, R.Number $ fromIntegral revId]
    upperBound = R.Open $ R.Array $ V.fromList
        [R.String $ toPk objId, R.Number $ fromIntegral (revId + 999999)]


-- | Fetch a single patch. The primary key which is looked up has the form
-- @\<object id\>\@\<revision id\>@.
lookupPatch :: ObjectId -> RevId -> Avers Patch
lookupPatch objId revId = trace "lookupPatch" $ do
    runQuerySingleSelection $ R.Get patchesTable $ R.lift $
        toPk objId <> "@" <> toPk revId


-- | Lookup an object type which is registered in the Avers monad.
--
-- Throws 'UnknownObjectType' if no registered type has the given name.
lookupObjectType :: Text -> Avers SomeObjectType
lookupObjectType objType = do
    types <- objectTypes <$> gets config
    case find (\(SomeObjectType ObjectType{..}) -> otType == objType) types of
        Nothing -> throwError $ UnknownObjectType objType
        Just x  -> return x


-- | Apply the operations to the object and persist the resulting patches and
-- snapshots.
--
-- The result is a triple of: the patches which were already stored after the
-- given revision, the number of consumed operations, and the patches which
-- were created by this call.
applyObjectUpdates
    :: ObjectId    -- ^ The object which you want to update
    -> RevId       -- ^ The 'RevId' against which the operations were created
    -> ObjId       -- ^ Committer
    -> [Operation] -- ^ The operations to apply
    -> Bool        -- ^ True if validation should be skipped
    -> Avers ([Patch], Int, [Patch])
applyObjectUpdates objId revId committerId ops novalidate = trace "applyObjectUpdates" $ do
    -- First check that the object exists. We'll need its metadata later.
    obj <- lookupObject baseObjId
    SomeObjectType ot <- lookupObjectType (objectType obj)

    -- The 'Snapshot' against which the submitted operations were created.
    baseSnapshot <- lookupSnapshot objId revId

    -- If there are any patches which the client doesn't know about we need
    -- to let her know.
    previousPatches <- patchesAfterRevision objId revId
    latestSnapshot <- applyPatches baseSnapshot previousPatches

    -- Apply the operations and get the final snapshot.
    (Snapshot{..}, PatchState{..}) <- runStateT (patchHandler novalidate) $
        PatchState ot objId revId committerId ops 0
            baseSnapshot latestSnapshot previousPatches []

    -- Update object views.
    unless novalidate $ do
        content <- parseValue snapshotContent
        updateObjectViews ot baseObjId (Just content)

    return (previousPatches, psNumConsumedOperations, psPatches)

  where
    baseObjId = objectIdBase objId


-- | The state which is threaded through 'patchHandler' while the submitted
-- operations are applied one by one.
data PatchState a = PatchState
    { psObjectType :: ObjectType a
      -- ^ Type of the object, used to validate new content.
    , psObjectId :: ObjectId
      -- ^ The object which is being updated.
    , psRevisionId :: RevId
      -- ^ The revision against which the operations were created.
    , psCommitterId :: ObjId
      -- ^ Recorded as the committer in every created patch.
    , psOperations :: [ Operation ]
      -- ^ The submitted operations.
    , psNumConsumedOperations :: Int
      -- ^ Incremented for each operation which resulted in a saved patch.
    , psBaseSnapshot :: Snapshot
      -- ^ Snapshot at the revision against which the operations were created.
    , psLatestSnapshot :: Snapshot
      -- ^ Base snapshot with all previous patches applied.
    , psPreviousPatches :: [ Patch ]
      -- ^ Patches which were stored after 'psRevisionId' before this update.
    , psPatches :: [ Patch ]
      -- ^ Patches created during this update, oldest first.
    }


type AversPatch a b = StateT (PatchState a) Avers b


-- | Rebase each submitted operation on top of the previous patches and apply
-- it to the latest snapshot. Every operation which changes the content is
-- saved as a new patch together with a new snapshot. Returns the final
-- snapshot.
--
-- Operations which can not be rebased, or which do not change the content,
-- are skipped without touching the database. If a rebased operation can not
-- be applied, the failure is raised through 'patchError' (the same way as in
-- 'applyPatchToSnapshot') instead of crashing with 'error'.
patchHandler :: (FromJSON a) => Bool -> AversPatch a Snapshot
patchHandler novalidate = do
    PatchState{..} <- get
    foldM (saveOperation $ snapshotContent psBaseSnapshot) psLatestSnapshot psOperations

  where
    saveOperation baseContent snapshot@Snapshot{..} op = do
        PatchState{..} <- get

        case rebaseOperation baseContent op psPreviousPatches of
            Nothing  -> return snapshot
            Just op' -> do
                now <- liftIO getCurrentTime

                let revId = succ snapshotRevisionId
                    patch = Patch psObjectId revId psCommitterId now op'

                case applyOperation snapshotContent op' of
                    Left e -> lift $ patchError e
                    Right newContent
                        | newContent /= snapshotContent -> do
                            unless novalidate $ do
                                lift $ validateWithType psObjectType newContent

                            let newSnapshot = snapshot
                                    { snapshotContent    = newContent
                                    , snapshotRevisionId = revId
                                    }

                            -- Now we know that the patch can be applied cleanly, so
                            -- we can save it in the database.
                            lift $ savePatch patch

                            modify $ \s -> s
                                { psPatches = psPatches ++ [patch]
                                , psNumConsumedOperations = psNumConsumedOperations + 1
                                }

                            lift $ saveSnapshot newSnapshot
                            return newSnapshot

                        | otherwise -> return snapshot


-- | True if a blob with the given id is present in the blobs table.
existsBlob :: BlobId -> Avers Bool
existsBlob = existsDocument blobsTable


-- | Lookup a 'Blob' by its id. Throws 'DocumentNotFound' if it doesn't exist.
lookupBlob :: BlobId -> Avers Blob
lookupBlob bId = lookupDocument blobsTable bId >>=
    requireResult (DocumentNotFound $ toPk bId)


-- | Insert the blob into the blobs table.
insertBlob :: Blob -> Avers ()
insertBlob = insertDocument blobsTable


-- | Hand the blob content over to the 'putBlob' function of the config.
saveBlobContent :: Blob -> BL.ByteString -> Avers ()
saveBlobContent Blob{..} content = do
    cfg <- gets config
    liftIO $ (putBlob cfg) blobId blobContentType content


-- | Insert the session into the sessions table.
saveSession :: Session -> Avers ()
saveSession = insertDocument sessionsTable


-- | Lookup a 'Session' by its id. Throws 'DocumentNotFound' if it doesn't
-- exist.
lookupSession :: SessionId -> Avers Session
lookupSession sessId = lookupDocument sessionsTable sessId >>=
    requireResult (DocumentNotFound $ toPk sessId)


-- | Delete the session from the sessions table.
dropSession :: SessionId -> Avers ()
dropSession sessId = void $ deleteDocument sessionsTable sessId


-- | Generate a random identifier of the given length. The characters are
-- drawn from @A-Z@, @a-z@ and @0-9@.
--
-- Exactly @n@ random numbers are drawn with 'replicateM', so this does not
-- depend on the random monad being lazy enough to consume an infinite
-- sequence.
newId :: Int -> IO Text
newId n = T.pack . map alnum <$> evalRandIO (replicateM n (getRandomR (0, 61)))
  where
    -- Map 0..61 onto upper case letters, lower case letters and digits.
    alnum :: Int -> Char
    alnum x
        | x < 26    = chr ((x     ) + 65)
        | x < 52    = chr ((x - 26) + 97)
        | x < 62    = chr ((x - 52) + 48)
        | otherwise = error $ "Out of range: " ++ show x


-- | Validate the value against the object type with the given name. Throws
-- 'UnknownObjectType' if the type is not registered.
validateObject :: Text -> Value -> Avers ()
validateObject objType value = do
    (SomeObjectType ot) <- lookupObjectType objType
    validateWithType ot value


-- | Check that the value can be parsed as content of the given object type.
-- The parse error is thrown if it can not; the parsed value is discarded.
validateWithType :: (FromJSON a) => ObjectType a -> Value -> Avers ()
validateWithType ot value = case parseValueAs ot value of
    Left e  -> throwError e
    Right _ -> return ()


-- | Fetch and parse the content of the release of the object at the given
-- revision.
lookupRelease :: ObjId -> RevId -> Avers Release
lookupRelease objId revId = objectContent (ReleaseObjectId objId revId)


-- | Create a new release of the given revision. If the object doesn't exist,
-- it will fail with 'ObjectNotFound'.
createRelease :: ObjId -> RevId -> Avers ()
createRelease objId revId = do
    -- The object must exist.
    objectExists <- exists objId
    unless objectExists $
        throwError (ObjectNotFound objId)

    -- The release objects are not patchable at the moment. If one already
    -- exists, then there is nothing to do. Otherwise create the initial
    -- snapshot with empty content.
    alreadyReleased <- existsDocument snapshotsTable (toPk releaseSnapshot)
    unless alreadyReleased $
        insertDocument snapshotsTable releaseSnapshot

  where
    releaseSnapshot =
        Snapshot (ReleaseObjectId objId revId) zeroRevId emptyObject


-- | Find the revision of the latest release of the object. Returns 'Nothing'
-- if the object id of the selected snapshot is not a release object id.
--
-- NOTE(review): the snapshots are ordered by their "objectId" field. If that
-- field is a string with an unpadded decimal revision, the ordering is
-- lexicographic rather than numeric - confirm against the 'ObjectId'
-- encoding.
lookupLatestRelease :: ObjId -> Avers (Maybe RevId)
lookupLatestRelease objId = do
    newestId <- runQueryDatum
        $ (objectFieldE "objectId" :: R.Exp R.Object -> R.Exp R.Datum)
        $ headE
        $ R.OrderBy [R.Descending "objectId"]
        $ R.Filter isReleaseOfObject snapshotsTable

    case newestId of
        ReleaseObjectId releasedObjId revId -> do
            when (objId /= releasedObjId) $
                databaseError "lookupLatestRelease: objId do not match"

            return $ Just revId

        _ -> return Nothing

  where
    -- Pattern which the "objectId" field of a release snapshot has to match.
    releasePattern = R.lift $ "^" <> toPk objId <> "/release/"

    isReleaseOfObject :: R.Exp R.Object -> R.Exp Bool
    isReleaseOfObject x = R.Coerce
        (R.Match (objectFieldE "objectId" x) releasePattern)
        (R.lift ("bool" :: Text))


-------------------------------------------------------------------------
-- Blobs
-------------------------------------------------------------------------

-- | Create a blob from the body and content type. The blob id is the
-- hex-encoded SHA3 hash of the body.
createBlob :: BL.ByteString -> Text -> Avers Blob
createBlob body contentType = do
    -- Only create the blob if it doesn't already exist. That way we can
    -- avoid having to upload the blob to the storage.
    alreadyStored <- existsBlob (blobId blob)
    unless alreadyStored $ do
        saveBlobContent blob body
        insertBlob blob

    return blob

  where
    digest = BS16.encode (SHA3.hashlazy 256 body)
    blob   = Blob
        (BlobId (T.decodeUtf8 digest))
        (fromIntegral (BL.length body))
        contentType


-- | Ids of all objects of the given type which are not marked as deleted,
-- newest first (ties are broken by ascending id).
objectsOfType :: ObjectType a -> Avers (Vector ObjId)
objectsOfType objType = V.map ObjId <$> runQueryCollect query
  where
    hasType :: R.Exp R.Object -> R.Exp Bool
    hasType = objectFieldEqE "type" (otType objType)

    query =
        R.Map mapId $
        R.OrderBy [R.Descending "createdAt", R.Ascending "id"] $
        R.Filter isNotDeleted $
        R.Filter hasType objectsTable


-- | Ids of all objects of the given type, including those which are marked
-- as deleted, newest first (ties are broken by ascending id).
allObjectsOfType :: ObjectType a -> Avers (Vector ObjId)
allObjectsOfType objType = V.map ObjId <$> runQueryCollect query
  where
    hasType :: R.Exp R.Object -> R.Exp Bool
    hasType = objectFieldEqE "type" (otType objType)

    query =
        R.Map mapId $
        R.OrderBy [R.Descending "createdAt", R.Ascending "id"] $
        R.Filter hasType objectsTable


-- | Query predicate: the object either has no "deleted" field, or that field
-- is false.
isNotDeleted :: R.Exp R.Object -> R.Exp Bool
isNotDeleted x = R.Any [lacksDeletedField, deletedIsFalse]
  where
    lacksDeletedField = R.Not $ R.HasFields ["deleted"] x
    deletedIsFalse    = objectFieldEqE "deleted" False x


-- | Query expression which projects the "id" field out of an object.
mapId :: R.Exp R.Object -> R.Exp Text
mapId = R.GetField "id"