{-# LANGUAGE DeriveAnyClass, DeriveGeneric, ScopedTypeVariables, BangPatterns #-} {-| Module: ProjectM36.Client Client interface to local and remote Project:M36 databases. To get started, connect with 'connectProjectM36', then run some database changes with 'executeDatabaseContextExpr', and issue queries using 'executeRelationalExpr'. -} module ProjectM36.Client (ConnectionInfo(..), Connection(..), Port, Hostname, DatabaseName, ConnectionError(..), connectProjectM36, close, closeRemote_, executeRelationalExpr, executeDatabaseContextExpr, executeDatabaseContextIOExpr, executeGraphExpr, executeSchemaExpr, executeTransGraphRelationalExpr, commit, rollback, typeForRelationalExpr, inclusionDependencies, planForDatabaseContextExpr, currentSchemaName, SchemaName, HeadName, setCurrentSchemaName, transactionGraphAsRelation, relationVariablesAsRelation, headName, remoteDBLookupName, defaultServerPort, headTransactionId, defaultDatabaseName, defaultRemoteConnectionInfo, defaultHeadName, PersistenceStrategy(..), RelationalExpr, RelationalExprBase(..), DatabaseContextExpr(..), DatabaseContextIOExpr(..), Attribute(..), attributesFromList, createNodeId, createSessionAtCommit, createSessionAtHead, closeSession, addClientNode, callTestTimeout_, RelationCardinality(..), TransactionGraphOperator(..), CommitOption(..), transactionGraph_, disconnectedTransaction_, TransGraphRelationalExpr, TransactionIdLookup(..), TransactionIdHeadBacktrack(..), NodeId(..), Atom(..), Session, SessionId, NotificationCallback, emptyNotificationCallback, EvaluatedNotification(..), atomTypesAsRelation, AttributeExpr, inclusionDependencyForKey, databaseContextExprForUniqueKey, databaseContextExprForForeignKey, createScriptedAtomFunction, AttributeExprBase(..), TypeConstructor(..), TypeConstructorDef(..), DataConstructorDef(..), AttributeNames(..), RelVarName, IncDepName, InclusionDependency(..), AttributeName, RequestTimeoutException(..), RemoteProcessDiedException(..), AtomType(..), Atomable(..), TupleExprBase(..), AtomExprBase(..), RestrictionPredicateExprBase(..) ) where import ProjectM36.Base hiding (inclusionDependencies) --defined in this module as well import qualified ProjectM36.Base as B import ProjectM36.Error import ProjectM36.Atomable import ProjectM36.AtomFunction import ProjectM36.StaticOptimizer import ProjectM36.Key import qualified ProjectM36.IsomorphicSchema as Schema import Control.Monad.State import Control.Monad.Trans.Reader import qualified ProjectM36.RelationalExpression as RE import ProjectM36.DatabaseContext (basicDatabaseContext) import ProjectM36.TransactionGraph import qualified ProjectM36.Transaction as Trans import ProjectM36.TransactionGraph.Persist import ProjectM36.Attribute hiding (atomTypes) import ProjectM36.TransGraphRelationalExpression (TransGraphRelationalExpr, evalTransGraphRelationalExpr) import ProjectM36.Persist (DiskSync(..)) import ProjectM36.FileLock import ProjectM36.Notifications import ProjectM36.Server.RemoteCallTypes import qualified ProjectM36.DisconnectedTransaction as Discon import ProjectM36.Relation (typesAsRelation) import ProjectM36.ScriptSession (initScriptSession, ScriptSession) import qualified ProjectM36.Relation as R import qualified ProjectM36.DatabaseContext as DBC import Control.Exception.Base import GHC.Conc.Sync import Network.Transport (Transport(closeTransport)) import Network.Transport.TCP (createTransport, defaultTCPParameters, encodeEndPointAddress) import Control.Distributed.Process.Node (newLocalNode, initRemoteTable, runProcess, LocalNode, forkProcess, closeLocalNode) import Control.Distributed.Process.Extras.Internal.Types (whereisRemote) import Control.Distributed.Process.ManagedProcess.Client (call, safeCall) import Control.Distributed.Process (NodeId(..), reconnect) import Data.UUID.V4 (nextRandom) import Data.Word import Control.Distributed.Process (ProcessId, Process, receiveWait, send, match) import Control.Exception (IOException, handle, AsyncException, throwIO, fromException, Exception) import Control.Concurrent.MVar import qualified Data.Map as M import Control.Distributed.Process.Serializable (Serializable) import qualified STMContainers.Map as STMMap import qualified STMContainers.Set as STMSet import qualified ProjectM36.Session as Sess import ProjectM36.Session import ProjectM36.Sessions import ListT import Data.Binary (Binary) import GHC.Generics (Generic) import Control.DeepSeq (force) import System.IO type Hostname = String type Port = Word16 type DatabaseName = String -- | The type for notifications callbacks in the client. When a registered notification fires due to a changed relational expression evaluation, the server propagates the notifications to the clients in the form of the callback. type NotificationCallback = NotificationName -> EvaluatedNotification -> IO () -- | The empty notification callback ignores all callbacks. emptyNotificationCallback :: NotificationCallback emptyNotificationCallback _ _ = pure () type GhcPkgPath = String data RemoteProcessDiedException = RemoteProcessDiedException deriving (Show, Eq) instance Exception RemoteProcessDiedException data RequestTimeoutException = RequestTimeoutException deriving (Show, Eq) instance Exception RequestTimeoutException -- | Construct a 'ConnectionInfo' to describe how to make the 'Connection'. The database can be run within the current process or running remotely via distributed-process. data ConnectionInfo = InProcessConnectionInfo PersistenceStrategy NotificationCallback [GhcPkgPath]| RemoteProcessConnectionInfo DatabaseName NodeId NotificationCallback type EvaluatedNotifications = M.Map NotificationName EvaluatedNotification -- | Used for callbacks from the server when monitored changes have been made. data NotificationMessage = NotificationMessage EvaluatedNotifications deriving (Binary, Eq, Show, Generic) -- | When a notification is fired, the 'reportExpr' is evaluated in the commit's context, so that is returned along with the original notification. data EvaluatedNotification = EvaluatedNotification { notification :: Notification, reportRelation :: Either RelationalError Relation } deriving(Binary, Eq, Show, Generic) -- | Create a 'NodeId' for use in connecting to a remote server using distributed-process. createNodeId :: Hostname -> Port -> NodeId createNodeId host port = NodeId $ encodeEndPointAddress host (show port) 1 -- | Use this for connecting to remote servers on the default port. defaultServerPort :: Port defaultServerPort = 6543 -- | Use this for connecting to remote servers with the default database name. defaultDatabaseName :: DatabaseName defaultDatabaseName = "base" -- | Use this for connecting to remote servers with the default head name. defaultHeadName :: HeadName defaultHeadName = "master" -- | Create a connection configuration which connects to the localhost on the default server port and default server database name. The configured notification callback is set to ignore all events. defaultRemoteConnectionInfo :: ConnectionInfo defaultRemoteConnectionInfo = RemoteProcessConnectionInfo defaultDatabaseName (createNodeId "127.0.0.1" defaultServerPort) emptyNotificationCallback -- | The 'Connection' represents either local or remote access to a database. All operations flow through the connection. type ClientNodes = STMSet.Set ProcessId type TransactionGraphLockHandle = Handle -- internal structure specific to in-process connections data InProcessConnectionConf = InProcessConnectionConf { ipPersistenceStrategy :: PersistenceStrategy, ipClientNodes :: ClientNodes, ipSessions :: Sessions, ipTransactionGraph :: TVar TransactionGraph, ipScriptSession :: Maybe ScriptSession, ipLocalNode :: LocalNode, ipTransport :: Transport, -- we hold onto this so that we can close it gracefully ipLocks :: Maybe (TransactionGraphLockHandle, MVar LockFileHash) -- nothing when NoPersistence } data RemoteProcessConnectionConf = RemoteProcessConnectionConf { rLocalNode :: LocalNode, rProcessId :: ProcessId, --remote processId rTransport :: Transport --the TCP socket transport } data Connection = InProcessConnection InProcessConnectionConf | RemoteProcessConnection RemoteProcessConnectionConf -- | There are several reasons why a connection can fail. data ConnectionError = SetupDatabaseDirectoryError PersistenceError | IOExceptionError IOException | NoSuchDatabaseByNameError DatabaseName | LoginError deriving (Show, Eq, Generic) remoteDBLookupName :: DatabaseName -> String remoteDBLookupName = (++) "db-" createLocalNode :: IO (LocalNode, Transport) createLocalNode = do eLocalTransport <- createTransport "127.0.0.1" "0" defaultTCPParameters case eLocalTransport of Left err -> error ("failed to create transport: " ++ show err) Right localTransport -> do localNode <- newLocalNode localTransport initRemoteTable pure (localNode, localTransport) notificationListener :: NotificationCallback -> Process () notificationListener callback = do --pid <- getSelfPid --liftIO $ putStrLn $ "LISTENER THREAD START " ++ show pid _ <- forever $ do receiveWait [ match (\(NotificationMessage eNots) -> do --say $ "NOTIFICATION: " ++ show eNots -- when notifications are thrown, they are not adjusted for the current schema which could be problematic, but we don't have access to the current session here liftIO $ mapM_ (uncurry callback) (M.toList eNots) ) ] --say "LISTENER THREAD EXIT" pure () startNotificationListener :: LocalNode -> NotificationCallback -> IO (ProcessId) startNotificationListener localNode callback = forkProcess localNode (notificationListener callback) createScriptSession :: [String] -> IO (Maybe ScriptSession) createScriptSession ghcPkgPaths = do eScriptSession <- initScriptSession ghcPkgPaths case eScriptSession of Left err -> hPutStrLn stderr ("Failed to load scripting engine- scripting disabled: " ++ (show err)) >> pure Nothing --not a fatal error, but the scripting feature must be disabled Right s -> pure (Just s) -- | To create a 'Connection' to a remote or local database, create a 'ConnectionInfo' and call 'connectProjectM36'. connectProjectM36 :: ConnectionInfo -> IO (Either ConnectionError Connection) --create a new in-memory database/transaction graph connectProjectM36 (InProcessConnectionInfo strat notificationCallback ghcPkgPaths) = do freshId <- nextRandom let bootstrapContext = basicDatabaseContext freshGraph = bootstrapTransactionGraph freshId bootstrapContext case strat of --create date examples graph for now- probably should be empty context in the future NoPersistence -> do graphTvar <- newTVarIO freshGraph clientNodes <- STMSet.newIO sessions <- STMMap.newIO (localNode, transport) <- createLocalNode notificationPid <- startNotificationListener localNode notificationCallback mScriptSession <- createScriptSession ghcPkgPaths let conn = InProcessConnection (InProcessConnectionConf { ipPersistenceStrategy = strat, ipClientNodes = clientNodes, ipSessions = sessions, ipTransactionGraph = graphTvar, ipScriptSession = mScriptSession, ipLocalNode = localNode, ipTransport = transport, ipLocks = Nothing}) addClientNode conn notificationPid pure (Right conn) MinimalPersistence dbdir -> connectPersistentProjectM36 strat NoDiskSync dbdir freshGraph notificationCallback ghcPkgPaths CrashSafePersistence dbdir -> connectPersistentProjectM36 strat FsyncDiskSync dbdir freshGraph notificationCallback ghcPkgPaths connectProjectM36 (RemoteProcessConnectionInfo databaseName serverNodeId notificationCallback) = do connStatus <- newEmptyMVar (localNode, transport) <- createLocalNode let dbName = remoteDBLookupName databaseName --putStrLn $ "Connecting to " ++ show serverNodeId ++ " " ++ dbName notificationListenerPid <- startNotificationListener localNode notificationCallback runProcess localNode $ do --even a failed connection retains an open socket! mServerProcessId <- whereisRemote serverNodeId dbName case mServerProcessId of Nothing -> liftIO $ putMVar connStatus $ Left (NoSuchDatabaseByNameError databaseName) Just serverProcessId -> do loginConfirmation <- safeLogin (Login notificationListenerPid) serverProcessId if not loginConfirmation then liftIO $ putMVar connStatus (Left LoginError) else do liftIO $ putMVar connStatus (Right $ RemoteProcessConnection (RemoteProcessConnectionConf {rLocalNode = localNode, rProcessId = serverProcessId, rTransport = transport})) status <- takeMVar connStatus pure status connectPersistentProjectM36 :: PersistenceStrategy -> DiskSync -> FilePath -> TransactionGraph -> NotificationCallback -> [GhcPkgPath] -> IO (Either ConnectionError Connection) connectPersistentProjectM36 strat sync dbdir freshGraph notificationCallback ghcPkgPaths = do err <- setupDatabaseDir sync dbdir freshGraph case err of Left err' -> return $ Left (SetupDatabaseDirectoryError err') Right (lockFileH, digest) -> do mScriptSession <- createScriptSession ghcPkgPaths graph <- transactionGraphLoad dbdir emptyTransactionGraph mScriptSession case graph of Left err' -> return $ Left (SetupDatabaseDirectoryError err') Right graph' -> do tvarGraph <- newTVarIO graph' sessions <- STMMap.newIO clientNodes <- STMSet.newIO (localNode, transport) <- createLocalNode lockMVar <- newMVar digest let conn = InProcessConnection (InProcessConnectionConf { ipPersistenceStrategy = strat, ipClientNodes = clientNodes, ipSessions = sessions, ipTransactionGraph = tvarGraph, ipScriptSession = mScriptSession, ipLocalNode = localNode, ipTransport = transport, ipLocks = Just (lockFileH, lockMVar) }) notificationPid <- startNotificationListener localNode notificationCallback addClientNode conn notificationPid pure (Right conn) -- | Create a new session at the transaction id and return the session's Id. createSessionAtCommit :: TransactionId -> Connection -> IO (Either RelationalError SessionId) createSessionAtCommit commitId conn@(InProcessConnection _) = do newSessionId <- nextRandom atomically $ do createSessionAtCommit_ commitId newSessionId conn createSessionAtCommit uuid conn@(RemoteProcessConnection _) = remoteCall conn (CreateSessionAtCommit uuid) createSessionAtCommit_ :: TransactionId -> SessionId -> Connection -> STM (Either RelationalError SessionId) createSessionAtCommit_ commitId newSessionId (InProcessConnection conf) = do let sessions = ipSessions conf graphTvar = ipTransactionGraph conf graph <- readTVar graphTvar case transactionForId commitId graph of Left err -> pure (Left err) Right transaction -> do let freshDiscon = DisconnectedTransaction commitId (Trans.schemas transaction) False keyDuplication <- STMMap.lookup newSessionId sessions case keyDuplication of Just _ -> pure $ Left (SessionIdInUseError newSessionId) Nothing -> do STMMap.insert (Session freshDiscon defaultSchemaName) newSessionId sessions pure $ Right newSessionId createSessionAtCommit_ _ _ (RemoteProcessConnection _) = error "createSessionAtCommit_ called on remote connection" -- | Call 'createSessionAtHead' with a transaction graph's head's name to create a new session pinned to that head. This function returns a 'SessionId' which can be used in other function calls to reference the point in the transaction graph. createSessionAtHead :: HeadName -> Connection -> IO (Either RelationalError SessionId) createSessionAtHead headn conn@(InProcessConnection conf) = do let graphTvar = ipTransactionGraph conf newSessionId <- nextRandom atomically $ do graph <- readTVar graphTvar case transactionForHead headn graph of Nothing -> pure $ Left (NoSuchHeadNameError headn) Just trans -> createSessionAtCommit_ (transactionId trans) newSessionId conn createSessionAtHead headn conn@(RemoteProcessConnection _) = remoteCall conn (CreateSessionAtHead headn) -- | Used internally for server connections to keep track of remote nodes for the purpose of sending notifications later. addClientNode :: Connection -> ProcessId -> IO () addClientNode (RemoteProcessConnection _) _ = error "addClientNode called on remote connection" addClientNode (InProcessConnection conf) newProcessId = atomically (STMSet.insert newProcessId (ipClientNodes conf)) -- | Discards a session, eliminating any uncommitted changes present in the session. closeSession :: SessionId -> Connection -> IO () closeSession sessionId (InProcessConnection conf) = do atomically $ STMMap.delete sessionId (ipSessions conf) closeSession sessionId conn@(RemoteProcessConnection _) = remoteCall conn (CloseSession sessionId) -- | 'close' cleans up the database access connection and closes any relevant sockets. close :: Connection -> IO () close (InProcessConnection conf) = do atomically $ do let sessions = ipSessions conf STMMap.deleteAll sessions pure () closeLocalNode (ipLocalNode conf) closeTransport (ipTransport conf) close conn@(RemoteProcessConnection conf) = do _ <- (remoteCall conn Logout) :: IO Bool closeLocalNode (rLocalNode conf) closeTransport (rTransport conf) --used only by the server EntryPoints closeRemote_ :: Connection -> IO () closeRemote_ (InProcessConnection _) = error "invalid call of closeRemote_ on InProcessConnection" closeRemote_ (RemoteProcessConnection conf) = runProcess (rLocalNode conf) (reconnect (rProcessId conf)) --we need to actually close the localNode's connection to the remote --within the database server, we must catch and handle all exception lest they take down the database process- this handling might be different for other use-cases --exceptions should generally *NOT* be thrown from any Project:M36 code paths, but third-party code such as AtomFunction scripts could conceivably throw undefined, etc. excMaybe :: IO (Maybe RelationalError) -> IO (Maybe RelationalError) excMaybe m = handle handler m where handler exc | Just (_ :: AsyncException) <- fromException exc = throwIO exc | otherwise = pure (Just (UnhandledExceptionError (show exc))) excEither :: IO (Either RelationalError a) -> IO (Either RelationalError a) excEither m = handle handler m where handler exc | Just (_ :: AsyncException) <- fromException exc = throwIO exc | otherwise = pure (Left (UnhandledExceptionError (show exc))) runProcessResult :: LocalNode -> Process a -> IO a runProcessResult localNode proc = do ret <- newEmptyMVar runProcess localNode $ do val <- proc liftIO $ putMVar ret val takeMVar ret safeLogin :: Login -> ProcessId -> Process (Bool) safeLogin login procId = do ret <- call procId login case ret of Left (_ :: ServerError) -> pure False Right val -> pure val remoteCall :: (Serializable a, Serializable b) => Connection -> a -> IO b remoteCall (InProcessConnection _ ) _ = error "remoteCall called on local connection" remoteCall (RemoteProcessConnection conf) arg = runProcessResult localNode $ do ret <- safeCall serverProcessId arg case ret of Left err -> error ("server died: " ++ show err) Right ret' -> case ret' of Left RequestTimeoutError -> liftIO (throwIO RequestTimeoutException) Left (ProcessDiedError _) -> liftIO (throwIO RemoteProcessDiedException) Right val -> pure val where localNode = rLocalNode conf serverProcessId = rProcessId conf sessionForSessionId :: SessionId -> Sessions -> STM (Either RelationalError Session) sessionForSessionId sessionId sessions = do maybeSession <- STMMap.lookup sessionId sessions pure $ maybe (Left $ NoSuchSessionError sessionId) Right maybeSession schemaForSessionId :: Session -> STM (Either RelationalError Schema) schemaForSessionId session = do let sname = schemaName session if sname == defaultSchemaName then pure (Right (Schema [])) -- the main schema includes no transformations (but neither do empty schemas :/ ) else case M.lookup sname (subschemas session) of Nothing -> pure (Left (SubschemaNameNotInUseError sname)) Just schema -> pure (Right schema) sessionAndSchema :: SessionId -> Sessions -> STM (Either RelationalError (Session, Schema)) sessionAndSchema sessionId sessions = do eSession <- sessionForSessionId sessionId sessions case eSession of Left err -> pure (Left err) Right session -> do eSchema <- schemaForSessionId session case eSchema of Left err -> pure (Left err) Right schema -> pure (Right (session, schema)) -- | Returns the name of the currently selected isomorphic schema. currentSchemaName :: SessionId -> Connection -> IO (Maybe SchemaName) currentSchemaName sessionId (InProcessConnection conf) = atomically $ do let sessions = ipSessions conf eSession <- sessionForSessionId sessionId sessions case eSession of Left _ -> pure Nothing Right session -> pure (Just (Sess.schemaName session)) currentSchemaName sessionId conn@(RemoteProcessConnection _) = remoteCall conn (RetrieveCurrentSchemaName sessionId) -- | Switch to the named isomorphic schema. setCurrentSchemaName :: SessionId -> Connection -> SchemaName -> IO (Maybe RelationalError) setCurrentSchemaName sessionId (InProcessConnection conf) sname = atomically $ do let sessions = ipSessions conf eSession <- sessionForSessionId sessionId sessions case eSession of Left _ -> pure Nothing Right session -> case Sess.setSchemaName sname session of Left err -> pure (Just err) Right newSession -> STMMap.insert newSession sessionId sessions >> pure Nothing setCurrentSchemaName sessionId conn@(RemoteProcessConnection _) sname = remoteCall conn (ExecuteSetCurrentSchema sessionId sname) -- | Execute a relational expression in the context of the session and connection. Relational expressions are queries and therefore cannot alter the database. executeRelationalExpr :: SessionId -> Connection -> RelationalExpr -> IO (Either RelationalError Relation) executeRelationalExpr sessionId (InProcessConnection conf) expr = excEither $ atomically $ do let sessions = ipSessions conf eSession <- sessionAndSchema sessionId sessions case eSession of Left err -> pure $ Left err Right (session, schema) -> do let expr' = if schemaName session /= defaultSchemaName then Schema.processRelationalExprInSchema schema expr else Right expr case expr' of Left err -> pure (Left err) Right expr'' -> case runReader (RE.evalRelationalExpr expr'') (RE.mkRelationalExprState (Sess.concreteDatabaseContext session)) of Left err -> pure (Left err) Right rel -> pure (force (Right rel)) -- this is necessary so that any undefined/error exceptions are spit out here executeRelationalExpr sessionId conn@(RemoteProcessConnection _) relExpr = remoteCall conn (ExecuteRelationalExpr sessionId relExpr) -- | Execute a database context expression in the context of the session and connection. Database expressions modify the current session's disconnected transaction but cannot modify the transaction graph. executeDatabaseContextExpr :: SessionId -> Connection -> DatabaseContextExpr -> IO (Maybe RelationalError) executeDatabaseContextExpr sessionId (InProcessConnection conf) expr = excMaybe $ atomically $ do let sessions = ipSessions conf eSession <- sessionAndSchema sessionId sessions case eSession of Left err -> pure $ Just err Right (session, schema) -> do let expr' = if schemaName session == defaultSchemaName then Right expr else Schema.processDatabaseContextExprInSchema schema expr case expr' of Left err -> pure (Just err) Right expr'' -> case runState (RE.evalDatabaseContextExpr expr'') (RE.freshDatabaseState (Sess.concreteDatabaseContext session)) of (Just err,_) -> return $ Just err (Nothing, (_,_,False)) -> pure Nothing --optimization- if nothing was dirtied, nothing to do (Nothing, (!context',_,True)) -> do let newDiscon = DisconnectedTransaction (Sess.parentId session) newSchemas True newSubschemas = Schema.processDatabaseContextExprSchemasUpdate (Sess.subschemas session) expr newSchemas = Schemas context' newSubschemas newSession = Session newDiscon (Sess.schemaName session) STMMap.insert newSession sessionId sessions pure Nothing executeDatabaseContextExpr sessionId conn@(RemoteProcessConnection _) dbExpr = remoteCall conn (ExecuteDatabaseContextExpr sessionId dbExpr) -- | Execute a database context IO-monad-based expression for the given session and connection. `DatabaseContextIOExpr`s modify the DatabaseContext but cannot be purely implemented. --this is almost completely identical to executeDatabaseContextExpr above executeDatabaseContextIOExpr :: SessionId -> Connection -> DatabaseContextIOExpr -> IO (Maybe RelationalError) executeDatabaseContextIOExpr sessionId (InProcessConnection conf) expr = excMaybe $ do let sessions = ipSessions conf scriptSession = ipScriptSession conf eSession <- atomically $ sessionForSessionId sessionId sessions --potentially race condition due to interleaved IO? case eSession of Left err -> pure $ Just err Right session -> do res <- RE.evalDatabaseContextIOExpr scriptSession (Sess.concreteDatabaseContext session) expr case res of Left err -> pure (Just err) Right context' -> do let newDiscon = DisconnectedTransaction (Sess.parentId session) newSchemas True newSchemas = Schemas context' (Sess.subschemas session) newSession = Session newDiscon (Sess.schemaName session) atomically $ STMMap.insert newSession sessionId sessions pure Nothing executeDatabaseContextIOExpr sessionId conn@(RemoteProcessConnection _) dbExpr = remoteCall conn (ExecuteDatabaseContextIOExpr sessionId dbExpr) executeGraphExprSTM_ :: Bool -> TransactionId -> SessionId -> Session -> Sessions -> TransactionGraphOperator -> TransactionGraph -> TVar TransactionGraph -> STM (Either RelationalError TransactionGraph) executeGraphExprSTM_ updateGraphOnError freshId sessionId session sessions graphExpr graph graphTVar= do case evalGraphOp freshId (Sess.disconnectedTransaction session) graph graphExpr of Left err -> do when updateGraphOnError (writeTVar graphTVar graph) pure $ Left err Right (discon', graph') -> do writeTVar graphTVar graph' let newSession = Session discon' (Sess.schemaName session) STMMap.insert newSession sessionId sessions pure $ Right graph' -- process notifications for commits executeCommitExprSTM_ :: DatabaseContext -> DatabaseContext -> ClientNodes -> STM (EvaluatedNotifications, ClientNodes) executeCommitExprSTM_ oldContext newContext nodes = do let nots = notifications oldContext fireNots = notificationChanges nots oldContext newContext evaldNots = M.map mkEvaldNot fireNots mkEvaldNot notif = EvaluatedNotification { notification = notif, reportRelation = runReader (RE.evalRelationalExpr (reportExpr notif)) (RE.mkRelationalExprState oldContext) } pure (evaldNots, nodes) -- | Execute a transaction graph expression in the context of the session and connection. Transaction graph operators modify the transaction graph state. -- OPTIMIZATION OPPORTUNITY: no locks are required to write new transaction data, only to update the transaction graph id file -- if writing data is re-entrant, we may be able to use unsafeIOtoSTM -- perhaps keep hash of data file instead of checking if our head was updated on every write executeGraphExpr :: SessionId -> Connection -> TransactionGraphOperator -> IO (Maybe RelationalError) executeGraphExpr sessionId (InProcessConnection conf) graphExpr = excMaybe $ do let strat = ipPersistenceStrategy conf clientNodes = ipClientNodes conf sessions = ipSessions conf graphTvar = ipTransactionGraph conf mLockFileH = ipLocks conf lockHandler body = case graphExpr of Commit _ -> case mLockFileH of Nothing -> body False Just (lockFileH, lockMVar) -> let acquireLocks = do lastWrittenDigest <- takeMVar lockMVar lockFile lockFileH WriteLock latestDigest <- readGraphTransactionIdDigest strat pure (latestDigest /= lastWrittenDigest) releaseLocks _ = do --still holding the lock- get the latest digest gDigest <- readGraphTransactionIdDigest strat unlockFile lockFileH putMVar lockMVar gDigest in bracket acquireLocks releaseLocks body _ -> body False freshId <- nextRandom lockHandler $ \dbWrittenByOtherProcess -> do --if the database file has been updated since we wrote it last, load it before trying to sync our version done- this can result in TransactionNotAHeadErrors --read transaction data and compare to existing graph --in the future, we can detect if updated transaction graph can be safely merged (such as with a transaction on a separate head) (rebase-able commits should force the user to rebase from the client to confirm that the action makes sense) manip <- atomically $ do eSession <- sessionForSessionId sessionId sessions --handle graph update by other process oldGraph <- readTVar graphTvar case eSession of Left err -> pure (Left err) Right session -> do let mScriptSession = ipScriptSession conf dbdir = case strat of MinimalPersistence x -> x CrashSafePersistence x -> x _ -> error "accessing dbdir on non-persisted connection" eRefreshedGraph <- if dbWrittenByOtherProcess then unsafeIOToSTM (transactionGraphLoad dbdir oldGraph mScriptSession) else pure (Right oldGraph) case eRefreshedGraph of Left err -> pure (Left (DatabaseLoadError err)) Right refreshedGraph -> do if not (isDirty session) && graphExpr == Commit IgnoreEmptyCommitOption then pure (Right (M.empty, [], oldGraph)) else do eGraph <- executeGraphExprSTM_ dbWrittenByOtherProcess freshId sessionId session sessions graphExpr refreshedGraph graphTvar case eGraph of Left err -> pure (Left err) Right newGraph -> do --handle commit if not (isDirty session) && graphExpr == Commit ForbidEmptyCommitOption then pure (Left EmptyCommitError) else if not (isDirty session) && graphExpr == Commit IgnoreEmptyCommitOption then pure (Right (M.empty, [], newGraph)) else if isCommit graphExpr then do case transactionForId (Sess.parentId session) oldGraph of Left err -> pure $ Left err Right previousTrans -> do (evaldNots, nodes) <- executeCommitExprSTM_ (Trans.concreteDatabaseContext previousTrans) (Sess.concreteDatabaseContext session) clientNodes nodesToNotify <- toList (STMSet.stream nodes) pure $ Right (evaldNots, nodesToNotify, newGraph) else pure $ Right (M.empty, [], newGraph) case manip of Left err -> return $ Just err Right (notsToFire, nodesToNotify, newGraph) -> do --update filesystem database, if necessary processTransactionGraphPersistence strat newGraph sendNotifications nodesToNotify (ipLocalNode conf) notsToFire pure Nothing executeGraphExpr sessionId conn@(RemoteProcessConnection _) graphExpr = remoteCall conn (ExecuteGraphExpr sessionId graphExpr) -- | A trans-graph expression is a relational query executed against the entirety of a transaction graph. executeTransGraphRelationalExpr :: SessionId -> Connection -> TransGraphRelationalExpr -> IO (Either RelationalError Relation) executeTransGraphRelationalExpr _ (InProcessConnection conf) tgraphExpr = excEither . atomically $ do let graphTvar = ipTransactionGraph conf graph <- readTVar graphTvar case evalTransGraphRelationalExpr tgraphExpr graph of Left err -> pure (Left err) Right relExpr -> case runReader (RE.evalRelationalExpr relExpr) (RE.mkRelationalExprState DBC.empty) of Left err -> pure (Left err) Right rel -> pure (force (Right rel)) executeTransGraphRelationalExpr sessionId conn@(RemoteProcessConnection _) tgraphExpr = remoteCall conn (ExecuteTransGraphRelationalExpr sessionId tgraphExpr) -- | Schema expressions manipulate the isomorphic schemas for the current 'DatabaseContext'. executeSchemaExpr :: SessionId -> Connection -> Schema.SchemaExpr -> IO (Maybe RelationalError) executeSchemaExpr sessionId (InProcessConnection conf) schemaExpr = atomically $ do let sessions = ipSessions conf eSession <- sessionAndSchema sessionId sessions case eSession of Left err -> pure (Just err) Right (session, _) -> do let subschemas' = subschemas session case Schema.evalSchemaExpr schemaExpr (Sess.concreteDatabaseContext session) subschemas' of Left err -> pure (Just err) Right (newSubschemas, newContext) -> do --hm- maybe we should start using lenses let discon = Sess.disconnectedTransaction session newSchemas = Schemas newContext newSubschemas newSession = Session (DisconnectedTransaction (Discon.parentId discon) newSchemas True) (Sess.schemaName session) STMMap.insert newSession sessionId sessions pure Nothing executeSchemaExpr sessionId conn@(RemoteProcessConnection _) schemaExpr = remoteCall conn (ExecuteSchemaExpr sessionId schemaExpr) -- | After modifying a 'DatabaseContext', 'commit' the transaction to the transaction graph at the head which the session is referencing. This will also trigger checks for any notifications which need to be propagated. commit :: SessionId -> Connection -> CommitOption -> IO (Maybe RelationalError) commit sessionId conn@(InProcessConnection _) cOpt = executeGraphExpr sessionId conn (Commit cOpt) commit sessionId conn@(RemoteProcessConnection _) cOpt = remoteCall conn (ExecuteGraphExpr sessionId (Commit cOpt)) sendNotifications :: [ProcessId] -> LocalNode -> EvaluatedNotifications -> IO () sendNotifications pids localNode nots = mapM_ sendNots pids where sendNots remoteClientPid = do when (not (M.null nots)) $ runProcess localNode $ send remoteClientPid (NotificationMessage nots) -- | Discard any changes made in the current 'Session' and 'DatabaseContext'. This resets the disconnected transaction to reference the original database context of the parent transaction and is a very cheap operation. rollback :: SessionId -> Connection -> IO (Maybe RelationalError) rollback sessionId conn@(InProcessConnection _) = executeGraphExpr sessionId conn Rollback rollback sessionId conn@(RemoteProcessConnection _) = remoteCall conn (ExecuteGraphExpr sessionId Rollback) -- | Write the transaction graph to disk. This function can be used to incrementally write new transactions to disk. processTransactionGraphPersistence :: PersistenceStrategy -> TransactionGraph -> IO () processTransactionGraphPersistence NoPersistence _ = pure () processTransactionGraphPersistence (MinimalPersistence dbdir) graph = transactionGraphPersist NoDiskSync dbdir graph >> pure () processTransactionGraphPersistence (CrashSafePersistence dbdir) graph = transactionGraphPersist FsyncDiskSync dbdir graph >> pure () readGraphTransactionIdDigest :: PersistenceStrategy -> IO (LockFileHash) readGraphTransactionIdDigest NoPersistence = error "attempt to read digest from transaction log without persistence enabled" readGraphTransactionIdDigest (MinimalPersistence dbdir) = readGraphTransactionIdFileDigest dbdir readGraphTransactionIdDigest (CrashSafePersistence dbdir) = readGraphTransactionIdFileDigest dbdir -- | Return a relation whose type would match that of the relational expression if it were executed. This is useful for checking types and validating a relational expression's types. typeForRelationalExpr :: SessionId -> Connection -> RelationalExpr -> IO (Either RelationalError Relation) typeForRelationalExpr sessionId conn@(InProcessConnection _) relExpr = atomically $ typeForRelationalExprSTM sessionId conn relExpr typeForRelationalExpr sessionId conn@(RemoteProcessConnection _) relExpr = remoteCall conn (ExecuteTypeForRelationalExpr sessionId relExpr) typeForRelationalExprSTM :: SessionId -> Connection -> RelationalExpr -> STM (Either RelationalError Relation) typeForRelationalExprSTM sessionId (InProcessConnection conf) relExpr = do let sessions = ipSessions conf eSession <- sessionAndSchema sessionId sessions case eSession of Left err -> pure $ Left err Right (session, schema) -> do let processed = if schemaName session == defaultSchemaName then Right relExpr else Schema.processRelationalExprInSchema schema relExpr case processed of Left err -> pure (Left err) Right relExpr' -> pure $ runReader (RE.typeForRelationalExpr relExpr') (RE.mkRelationalExprState (Sess.concreteDatabaseContext session)) typeForRelationalExprSTM _ _ _ = error "typeForRelationalExprSTM called on non-local connection" -- | Return a 'Map' of the database's constraints at the context of the session and connection. inclusionDependencies :: SessionId -> Connection -> IO (Either RelationalError InclusionDependencies) inclusionDependencies sessionId (InProcessConnection conf) = do let sessions = ipSessions conf atomically $ do eSession <- sessionAndSchema sessionId sessions case eSession of Left err -> pure $ Left err Right (session, schema) -> do let context = Sess.concreteDatabaseContext session if schemaName session == defaultSchemaName then pure $ Right (B.inclusionDependencies context) else pure (Schema.inclusionDependenciesInSchema schema (B.inclusionDependencies context)) inclusionDependencies sessionId conn@(RemoteProcessConnection _) = remoteCall conn (RetrieveInclusionDependencies sessionId) -- | Return an optimized database expression which is logically equivalent to the input database expression. This function can be used to determine which expression will actually be evaluated. planForDatabaseContextExpr :: SessionId -> Connection -> DatabaseContextExpr -> IO (Either RelationalError DatabaseContextExpr) planForDatabaseContextExpr sessionId (InProcessConnection conf) dbExpr = do let sessions = ipSessions conf atomically $ do eSession <- sessionAndSchema sessionId sessions case eSession of Left err -> pure $ Left err Right (session, _) -> if schemaName session == defaultSchemaName then pure $ evalState (applyStaticDatabaseOptimization dbExpr) (RE.freshDatabaseState (Sess.concreteDatabaseContext session)) else -- don't show any optimization because the current optimization infrastructure relies on access to the base context- this probably underscores the need for each schema to have its own DatabaseContext, even if it is generated on-the-fly pure (Right dbExpr) planForDatabaseContextExpr sessionId conn@(RemoteProcessConnection _) dbExpr = remoteCall conn (RetrievePlanForDatabaseContextExpr sessionId dbExpr) -- | Return a relation which represents the current state of the global transaction graph. The attributes are -- * current- boolean attribute representing whether or not the current session references this transaction -- * head- text attribute which is a non-empty 'HeadName' iff the transaction references a head. -- * id- id attribute of the transaction -- * parents- a relation-valued attribute which contains a relation of transaction ids which are parent transaction to the transaction transactionGraphAsRelation :: SessionId -> Connection -> IO (Either RelationalError Relation) transactionGraphAsRelation sessionId (InProcessConnection conf) = do let sessions = ipSessions conf tvar = ipTransactionGraph conf atomically $ do eSession <- sessionForSessionId sessionId sessions case eSession of Left err -> pure $ Left err Right session -> do graph <- readTVar tvar pure $ graphAsRelation (Sess.disconnectedTransaction session) graph transactionGraphAsRelation sessionId conn@(RemoteProcessConnection _) = remoteCall conn (RetrieveTransactionGraph sessionId) -- | Returns the names and types of the relation variables in the current 'Session'. relationVariablesAsRelation :: SessionId -> Connection -> IO (Either RelationalError Relation) relationVariablesAsRelation sessionId (InProcessConnection conf) = do let sessions = ipSessions conf atomically $ do eSession <- sessionAndSchema sessionId sessions case eSession of Left err -> pure (Left err) Right (session, schema) -> do let context = Sess.concreteDatabaseContext session if Sess.schemaName session == defaultSchemaName then pure $ R.relationVariablesAsRelation (relationVariables context) else case Schema.relationVariablesInSchema schema context of Left err -> pure (Left err) Right relvars -> pure $ R.relationVariablesAsRelation relvars relationVariablesAsRelation sessionId conn@(RemoteProcessConnection _) = remoteCall conn (RetrieveRelationVariableSummary sessionId) -- | Returns the transaction id for the connection's disconnected transaction committed parent transaction. headTransactionId :: SessionId -> Connection -> IO (Maybe TransactionId) headTransactionId sessionId (InProcessConnection conf) = do let sessions = ipSessions conf atomically $ do eSession <- sessionForSessionId sessionId sessions case eSession of Left _ -> pure Nothing Right session -> pure $ Just (Sess.parentId session) headTransactionId sessionId conn@(RemoteProcessConnection _) = remoteCall conn (RetrieveHeadTransactionId sessionId) headNameSTM_ :: SessionId -> Sessions -> TVar TransactionGraph -> STM (Maybe HeadName) headNameSTM_ sessionId sessions graphTvar = do graph <- readTVar graphTvar eSession <- sessionForSessionId sessionId sessions case eSession of Left _ -> pure $ Nothing Right session -> pure $ case transactionForId (Sess.parentId session) graph of Left _ -> Nothing Right parentTrans -> headNameForTransaction parentTrans graph -- | Returns Just the name of the head of the current disconnected transaction or Nothing. headName :: SessionId -> Connection -> IO (Maybe HeadName) headName sessionId (InProcessConnection conf) = do let sessions = ipSessions conf graphTvar = ipTransactionGraph conf atomically (headNameSTM_ sessionId sessions graphTvar) headName sessionId conn@(RemoteProcessConnection _) = remoteCall conn (ExecuteHeadName sessionId) -- | Returns a listing of all available atom types. atomTypesAsRelation :: SessionId -> Connection -> IO (Either RelationalError Relation) atomTypesAsRelation sessionId (InProcessConnection conf) = do let sessions = ipSessions conf atomically $ do eSession <- sessionForSessionId sessionId sessions case eSession of Left err -> pure (Left err) Right session -> do case typesAsRelation (typeConstructorMapping (Sess.concreteDatabaseContext session)) of Left err -> pure (Left err) Right rel -> pure (Right rel) atomTypesAsRelation sessionId conn@(RemoteProcessConnection _) = remoteCall conn (RetrieveAtomTypesAsRelation sessionId) --used only for testing- we expect this to throw an exception callTestTimeout_ :: SessionId -> Connection -> IO Bool callTestTimeout_ _ (InProcessConnection _) = error "bad testing call" callTestTimeout_ sessionId conn@(RemoteProcessConnection _) = remoteCall conn (TestTimeout sessionId) --used in tests only transactionGraph_ :: Connection -> IO TransactionGraph transactionGraph_ (InProcessConnection conf) = atomically $ readTVar (ipTransactionGraph conf) transactionGraph_ _ = error "remote connection used" --used in tests only disconnectedTransaction_ :: SessionId -> Connection -> IO DisconnectedTransaction disconnectedTransaction_ sessionId (InProcessConnection conf) = do let sessions = ipSessions conf mSession <- atomically $ do STMMap.lookup sessionId sessions case mSession of Nothing -> error "No such session" Just (Sess.Session discon _) -> pure discon disconnectedTransaction_ _ _= error "remote connection used"