{-# LANGUAGE DeriveGeneric, ScopedTypeVariables, MonoLocalBinds, DerivingVia #-}
{-|
Module: ProjectM36.Client

Client interface to local and remote Project:M36 databases. To get started, connect with 'connectProjectM36', then run some database changes with 'executeDatabaseContextExpr', and issue queries using 'executeRelationalExpr'.
-}
module ProjectM36.Client
       (ConnectionInfo(..),
       Connection(..),
       Port,
       Hostname,
       ServiceName,
       DatabaseName,
       ConnectionError(..),
       connectProjectM36,
       close,
       closeRemote_,
       executeRelationalExpr,
       executeDatabaseContextExpr,
       executeDatabaseContextIOExpr,
       executeDataFrameExpr,
       executeGraphExpr,
       executeSchemaExpr,
       executeTransGraphRelationalExpr,
       commit,
       rollback,
       typeForRelationalExpr,
       inclusionDependencies,
       ProjectM36.Client.typeConstructorMapping,
       ProjectM36.Client.databaseContextFunctionsAsRelation,      
       planForDatabaseContextExpr,
       currentSchemaName,
       SchemaName,
       HeadName,
       setCurrentSchemaName,
       transactionGraphAsRelation,
       relationVariablesAsRelation,
       registeredQueriesAsRelation,
       ddlAsRelation,
       ProjectM36.Client.atomFunctionsAsRelation,
       disconnectedTransactionIsDirty,
       headName,
       remoteDBLookupName,
       defaultServerPort,
       headTransactionId,
       defaultDatabaseName,
       defaultRemoteConnectionInfo,
       defaultHeadName,
       addClientNode,
       getDDLHash,
       PersistenceStrategy(..),
       RelationalExpr,
       RelationalExprBase(..),
       DatabaseContextExprBase(..),
       DatabaseContextExpr,
       DatabaseContextIOExprBase(..),
       DatabaseContextIOExpr,
       Attribute(..),
       MergeStrategy(..),
       attributesFromList,
       createSessionAtCommit,
       createSessionAtHead,
       closeSession,
       callTestTimeout_,
       RelationCardinality(..),
       TransactionGraphOperator(..),
       ProjectM36.Client.autoMergeToHead,
       transactionGraph_,
       disconnectedTransaction_,
       TransGraphRelationalExpr,
       TransactionIdLookup(..),
       TransactionIdHeadBacktrack(..),
       Atom(..),
       Session,
       SessionId,
       NotificationCallback,
       emptyNotificationCallback,
       EvaluatedNotification(..),
       atomTypesAsRelation,
       AttributeExpr,
       inclusionDependencyForKey,
       databaseContextExprForUniqueKey,
       databaseContextExprForForeignKey,
       createScriptedAtomFunction,
       ProjectM36.Client.validateMerkleHashes,
       AttributeExprBase(..),
       TypeConstructorBase(..),
       TypeConstructorDef(..),
       DataConstructorDef(..),
       AttributeNamesBase(..),
       RelVarName,
       IncDepName,
       InclusionDependency(..),
       AttributeName,
       DF.DataFrame,
       DF.DataFrameExpr,
       DF.AttributeOrderExpr,
       DF.Order(..),
       RelationalError(..),
       RequestTimeoutException(..),
       RemoteProcessDiedException(..),
       AtomType(..),
       Atomable(..),
       TupleExprBase(..),
       TupleExprsBase(..),
       AtomExprBase(..),
       RestrictionPredicateExprBase(..),
       withTransaction
       ) where
import ProjectM36.Base hiding (inclusionDependencies) --defined in this module as well
import qualified ProjectM36.Base as B
import ProjectM36.Serialise.Error ()
import ProjectM36.Error
import ProjectM36.Atomable
import ProjectM36.AtomFunction as AF
import ProjectM36.StaticOptimizer
import ProjectM36.Key
import qualified ProjectM36.DataFrame as DF
import ProjectM36.DatabaseContextFunction as DCF
import qualified ProjectM36.IsomorphicSchema as Schema
import Control.Monad.State
import qualified ProjectM36.RelationalExpression as RE
import ProjectM36.DatabaseContext (basicDatabaseContext)
import qualified ProjectM36.TransactionGraph as Graph
import ProjectM36.TransactionGraph as TG
import qualified ProjectM36.Transaction as Trans
import ProjectM36.TransactionGraph.Persist
import ProjectM36.Attribute
import ProjectM36.TransGraphRelationalExpression as TGRE (TransGraphRelationalExpr)
import ProjectM36.Persist (DiskSync(..))
import ProjectM36.FileLock
import ProjectM36.DDLType
import ProjectM36.NormalizeExpr
import ProjectM36.Notifications
import ProjectM36.Server.RemoteCallTypes
import qualified ProjectM36.DisconnectedTransaction as Discon
import ProjectM36.Relation (typesAsRelation)
import ProjectM36.ScriptSession (initScriptSession, ScriptSession)
import qualified ProjectM36.Relation as R
import Control.Exception.Base
import Control.Concurrent.STM
import Control.Concurrent.Async

import Data.Either (isRight)
import Data.UUID.V4 (nextRandom)
import Data.Word
import Data.Hashable
import Control.Concurrent.MVar
import Codec.Winery hiding (Schema, schema)
import qualified Data.Map as M
#if MIN_VERSION_stm_containers(1,0,0)
import qualified StmContainers.Map as StmMap
import qualified StmContainers.Set as StmSet
#else
import qualified STMContainers.Map as StmMap
import qualified STMContainers.Set as StmSet
#endif
import qualified ProjectM36.Session as Sess
import ProjectM36.Session
import ProjectM36.Sessions
import ProjectM36.HashSecurely (SecureHash)
import ProjectM36.RegisteredQuery
import GHC.Generics (Generic)
import Control.DeepSeq (force)
import System.IO
import Data.Time.Clock
import qualified Network.RPC.Curryer.Client as RPC
import qualified Network.RPC.Curryer.Server as RPC
import Network.Socket (Socket, AddrInfo(..), getAddrInfo, defaultHints, AddrInfoFlag(..), SocketType(..), ServiceName, hostAddressToTuple, SockAddr(..))
import GHC.Conc (unsafeIOToSTM)

type Hostname = String

type Port = Word16

-- | The type for notifications callbacks in the client. When a registered notification fires due to a changed relational expression evaluation, the server propagates the notifications to the clients in the form of the callback.
type NotificationCallback = NotificationName -> EvaluatedNotification -> IO ()

-- | The empty notification callback ignores all callbacks.
emptyNotificationCallback :: NotificationCallback
emptyNotificationCallback :: NotificationCallback
emptyNotificationCallback SchemaName
_ EvaluatedNotification
_ = forall (f :: * -> *) a. Applicative f => a -> f a
pure ()

type GhcPkgPath = String

data RemoteProcessDiedException = RemoteProcessDiedException
                                  deriving (Int -> RemoteProcessDiedException -> ShowS
[RemoteProcessDiedException] -> ShowS
RemoteProcessDiedException -> [Char]
forall a.
(Int -> a -> ShowS) -> (a -> [Char]) -> ([a] -> ShowS) -> Show a
showList :: [RemoteProcessDiedException] -> ShowS
$cshowList :: [RemoteProcessDiedException] -> ShowS
show :: RemoteProcessDiedException -> [Char]
$cshow :: RemoteProcessDiedException -> [Char]
showsPrec :: Int -> RemoteProcessDiedException -> ShowS
$cshowsPrec :: Int -> RemoteProcessDiedException -> ShowS
Show, RemoteProcessDiedException -> RemoteProcessDiedException -> Bool
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
/= :: RemoteProcessDiedException -> RemoteProcessDiedException -> Bool
$c/= :: RemoteProcessDiedException -> RemoteProcessDiedException -> Bool
== :: RemoteProcessDiedException -> RemoteProcessDiedException -> Bool
$c== :: RemoteProcessDiedException -> RemoteProcessDiedException -> Bool
Eq)
                                           
instance Exception RemoteProcessDiedException                                          
  
data RequestTimeoutException = RequestTimeoutException
                             deriving (Int -> RequestTimeoutException -> ShowS
[RequestTimeoutException] -> ShowS
RequestTimeoutException -> [Char]
forall a.
(Int -> a -> ShowS) -> (a -> [Char]) -> ([a] -> ShowS) -> Show a
showList :: [RequestTimeoutException] -> ShowS
$cshowList :: [RequestTimeoutException] -> ShowS
show :: RequestTimeoutException -> [Char]
$cshow :: RequestTimeoutException -> [Char]
showsPrec :: Int -> RequestTimeoutException -> ShowS
$cshowsPrec :: Int -> RequestTimeoutException -> ShowS
Show, RequestTimeoutException -> RequestTimeoutException -> Bool
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
/= :: RequestTimeoutException -> RequestTimeoutException -> Bool
$c/= :: RequestTimeoutException -> RequestTimeoutException -> Bool
== :: RequestTimeoutException -> RequestTimeoutException -> Bool
$c== :: RequestTimeoutException -> RequestTimeoutException -> Bool
Eq)

instance Exception RequestTimeoutException

-- | Construct a 'ConnectionInfo' to describe how to make the 'Connection'. The database can be run within the current process or running remotely via RPC.
data ConnectionInfo = InProcessConnectionInfo PersistenceStrategy NotificationCallback [GhcPkgPath] |
                      RemoteConnectionInfo DatabaseName Hostname ServiceName NotificationCallback
                      
type EvaluatedNotifications = M.Map NotificationName EvaluatedNotification

-- | Used for callbacks from the server when monitored changes have been made.
newtype NotificationMessage = NotificationMessage EvaluatedNotifications
                           deriving (NotificationMessage -> NotificationMessage -> Bool
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
/= :: NotificationMessage -> NotificationMessage -> Bool
$c/= :: NotificationMessage -> NotificationMessage -> Bool
== :: NotificationMessage -> NotificationMessage -> Bool
$c== :: NotificationMessage -> NotificationMessage -> Bool
Eq, Int -> NotificationMessage -> ShowS
[NotificationMessage] -> ShowS
NotificationMessage -> [Char]
forall a.
(Int -> a -> ShowS) -> (a -> [Char]) -> ([a] -> ShowS) -> Show a
showList :: [NotificationMessage] -> ShowS
$cshowList :: [NotificationMessage] -> ShowS
show :: NotificationMessage -> [Char]
$cshow :: NotificationMessage -> [Char]
showsPrec :: Int -> NotificationMessage -> ShowS
$cshowsPrec :: Int -> NotificationMessage -> ShowS
Show, forall x. Rep NotificationMessage x -> NotificationMessage
forall x. NotificationMessage -> Rep NotificationMessage x
forall a.
(forall x. a -> Rep a x) -> (forall x. Rep a x -> a) -> Generic a
$cto :: forall x. Rep NotificationMessage x -> NotificationMessage
$cfrom :: forall x. NotificationMessage -> Rep NotificationMessage x
Generic)
                           deriving Typeable NotificationMessage
BundleSerialise NotificationMessage
Extractor NotificationMessage
Decoder NotificationMessage
Proxy NotificationMessage -> SchemaGen Schema
NotificationMessage -> Builder
forall a.
Typeable a
-> (Proxy a -> SchemaGen Schema)
-> (a -> Builder)
-> Extractor a
-> Decoder a
-> BundleSerialise a
-> Serialise a
bundleSerialise :: BundleSerialise NotificationMessage
$cbundleSerialise :: BundleSerialise NotificationMessage
decodeCurrent :: Decoder NotificationMessage
$cdecodeCurrent :: Decoder NotificationMessage
extractor :: Extractor NotificationMessage
$cextractor :: Extractor NotificationMessage
toBuilder :: NotificationMessage -> Builder
$ctoBuilder :: NotificationMessage -> Builder
schemaGen :: Proxy NotificationMessage -> SchemaGen Schema
$cschemaGen :: Proxy NotificationMessage -> SchemaGen Schema
Serialise via WineryVariant NotificationMessage

-- | When a notification is fired, the 'reportOldExpr' is evaluated in the commit's pre-change context while the 'reportNewExpr' is evaluated in the post-change context and they are returned along with the original notification.
data EvaluatedNotification = EvaluatedNotification {
  EvaluatedNotification -> Notification
notification :: Notification,
  EvaluatedNotification -> Either RelationalError Relation
reportOldRelation :: Either RelationalError Relation,
  EvaluatedNotification -> Either RelationalError Relation
reportNewRelation :: Either RelationalError Relation
  }
  deriving (EvaluatedNotification -> EvaluatedNotification -> Bool
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
/= :: EvaluatedNotification -> EvaluatedNotification -> Bool
$c/= :: EvaluatedNotification -> EvaluatedNotification -> Bool
== :: EvaluatedNotification -> EvaluatedNotification -> Bool
$c== :: EvaluatedNotification -> EvaluatedNotification -> Bool
Eq, Int -> EvaluatedNotification -> ShowS
[EvaluatedNotification] -> ShowS
EvaluatedNotification -> [Char]
forall a.
(Int -> a -> ShowS) -> (a -> [Char]) -> ([a] -> ShowS) -> Show a
showList :: [EvaluatedNotification] -> ShowS
$cshowList :: [EvaluatedNotification] -> ShowS
show :: EvaluatedNotification -> [Char]
$cshow :: EvaluatedNotification -> [Char]
showsPrec :: Int -> EvaluatedNotification -> ShowS
$cshowsPrec :: Int -> EvaluatedNotification -> ShowS
Show, forall x. Rep EvaluatedNotification x -> EvaluatedNotification
forall x. EvaluatedNotification -> Rep EvaluatedNotification x
forall a.
(forall x. a -> Rep a x) -> (forall x. Rep a x -> a) -> Generic a
$cto :: forall x. Rep EvaluatedNotification x -> EvaluatedNotification
$cfrom :: forall x. EvaluatedNotification -> Rep EvaluatedNotification x
Generic)
  deriving Typeable EvaluatedNotification
BundleSerialise EvaluatedNotification
Extractor EvaluatedNotification
Decoder EvaluatedNotification
Proxy EvaluatedNotification -> SchemaGen Schema
EvaluatedNotification -> Builder
forall a.
Typeable a
-> (Proxy a -> SchemaGen Schema)
-> (a -> Builder)
-> Extractor a
-> Decoder a
-> BundleSerialise a
-> Serialise a
bundleSerialise :: BundleSerialise EvaluatedNotification
$cbundleSerialise :: BundleSerialise EvaluatedNotification
decodeCurrent :: Decoder EvaluatedNotification
$cdecodeCurrent :: Decoder EvaluatedNotification
extractor :: Extractor EvaluatedNotification
$cextractor :: Extractor EvaluatedNotification
toBuilder :: EvaluatedNotification -> Builder
$ctoBuilder :: EvaluatedNotification -> Builder
schemaGen :: Proxy EvaluatedNotification -> SchemaGen Schema
$cschemaGen :: Proxy EvaluatedNotification -> SchemaGen Schema
Serialise via WineryRecord EvaluatedNotification
                      

-- | Use this for connecting to remote servers on the default port.
defaultServerPort :: Port
defaultServerPort :: Port
defaultServerPort = Port
6543

-- | Use this for connecting to remote servers with the default database name.
defaultDatabaseName :: DatabaseName
defaultDatabaseName :: [Char]
defaultDatabaseName = [Char]
"base"

-- | Use this for connecting to remote servers with the default head name.
defaultHeadName :: HeadName
defaultHeadName :: SchemaName
defaultHeadName = SchemaName
"master"

-- | Create a connection configuration which connects to the localhost on the default server port and default server database name. The configured notification callback is set to ignore all events.
defaultRemoteConnectionInfo :: ConnectionInfo
defaultRemoteConnectionInfo :: ConnectionInfo
defaultRemoteConnectionInfo =
  [Char]
-> [Char] -> [Char] -> NotificationCallback -> ConnectionInfo
RemoteConnectionInfo [Char]
defaultDatabaseName [Char]
defaultServerHostname (forall a. Show a => a -> [Char]
show Port
defaultServerPort) NotificationCallback
emptyNotificationCallback

defaultServerHostname :: Hostname
defaultServerHostname :: [Char]
defaultServerHostname = [Char]
"localhost"

newtype RemoteConnectionConf = RemoteConnectionConf RPC.Connection
  
data Connection = InProcessConnection InProcessConnectionConf |
                  RemoteConnection RemoteConnectionConf
                  
-- | There are several reasons why a connection can fail.
data ConnectionError = SetupDatabaseDirectoryError PersistenceError |
                       IOExceptionError IOException |
                       NoSuchDatabaseByNameError DatabaseName |
                       DatabaseValidationError [MerkleValidationError] |
                       LoginError 
                       deriving (Int -> ConnectionError -> ShowS
[ConnectionError] -> ShowS
ConnectionError -> [Char]
forall a.
(Int -> a -> ShowS) -> (a -> [Char]) -> ([a] -> ShowS) -> Show a
showList :: [ConnectionError] -> ShowS
$cshowList :: [ConnectionError] -> ShowS
show :: ConnectionError -> [Char]
$cshow :: ConnectionError -> [Char]
showsPrec :: Int -> ConnectionError -> ShowS
$cshowsPrec :: Int -> ConnectionError -> ShowS
Show, ConnectionError -> ConnectionError -> Bool
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
/= :: ConnectionError -> ConnectionError -> Bool
$c/= :: ConnectionError -> ConnectionError -> Bool
== :: ConnectionError -> ConnectionError -> Bool
$c== :: ConnectionError -> ConnectionError -> Bool
Eq, forall x. Rep ConnectionError x -> ConnectionError
forall x. ConnectionError -> Rep ConnectionError x
forall a.
(forall x. a -> Rep a x) -> (forall x. Rep a x -> a) -> Generic a
$cto :: forall x. Rep ConnectionError x -> ConnectionError
$cfrom :: forall x. ConnectionError -> Rep ConnectionError x
Generic)
                  
remoteDBLookupName :: DatabaseName -> String    
remoteDBLookupName :: ShowS
remoteDBLookupName = forall a. [a] -> [a] -> [a]
(++) [Char]
"db-" 

createScriptSession :: [String] -> IO (Maybe ScriptSession)  
createScriptSession :: [[Char]] -> IO (Maybe ScriptSession)
createScriptSession [[Char]]
ghcPkgPaths = do
  Either ScriptSessionError ScriptSession
eScriptSession <- [[Char]] -> IO (Either ScriptSessionError ScriptSession)
initScriptSession [[Char]]
ghcPkgPaths
  case Either ScriptSessionError ScriptSession
eScriptSession of
    Left ScriptSessionError
err -> Handle -> [Char] -> IO ()
hPutStrLn Handle
stderr ([Char]
"Warning: Haskell scripting disabled: " forall a. [a] -> [a] -> [a]
++ forall a. Show a => a -> [Char]
show ScriptSessionError
err) forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> forall (f :: * -> *) a. Applicative f => a -> f a
pure forall a. Maybe a
Nothing --not a fatal error, but the scripting feature must be disabled
    Right ScriptSession
s -> forall (f :: * -> *) a. Applicative f => a -> f a
pure (forall a. a -> Maybe a
Just ScriptSession
s)


-- | To create a 'Connection' to a remote or local database, create a 'ConnectionInfo' and call 'connectProjectM36'.
connectProjectM36 :: ConnectionInfo -> IO (Either ConnectionError Connection)
--create a new in-memory database/transaction graph
connectProjectM36 :: ConnectionInfo -> IO (Either ConnectionError Connection)
connectProjectM36 (InProcessConnectionInfo PersistenceStrategy
strat NotificationCallback
notificationCallback [[Char]]
ghcPkgPaths) = do
  SessionId
freshId <- IO SessionId
nextRandom
  UTCTime
tstamp <- IO UTCTime
getCurrentTime
  let bootstrapContext :: DatabaseContext
bootstrapContext = DatabaseContext
basicDatabaseContext 
      freshGraph :: TransactionGraph
freshGraph = UTCTime -> SessionId -> DatabaseContext -> TransactionGraph
bootstrapTransactionGraph UTCTime
tstamp SessionId
freshId DatabaseContext
bootstrapContext
  case PersistenceStrategy
strat of
    --create date examples graph for now- probably should be empty context in the future
    PersistenceStrategy
NoPersistence -> do
        TVar TransactionGraph
graphTvar <- forall a. a -> IO (TVar a)
newTVarIO TransactionGraph
freshGraph
        Set ClientInfo
clientNodes <- forall item. IO (Set item)
StmSet.newIO
        Map SessionId Session
sessions <- forall key value. IO (Map key value)
StmMap.newIO
        Maybe ScriptSession
mScriptSession <- [[Char]] -> IO (Maybe ScriptSession)
createScriptSession [[Char]]
ghcPkgPaths
        Async ()
notifAsync <- Set ClientInfo -> NotificationCallback -> IO (Async ())
startNotificationListener Set ClientInfo
clientNodes NotificationCallback
notificationCallback
        let conn :: Connection
conn = InProcessConnectionConf -> Connection
InProcessConnection InProcessConnectionConf {
                                           ipPersistenceStrategy :: PersistenceStrategy
ipPersistenceStrategy = PersistenceStrategy
strat, 
                                           ipClientNodes :: Set ClientInfo
ipClientNodes = Set ClientInfo
clientNodes, 
                                           ipSessions :: Map SessionId Session
ipSessions = Map SessionId Session
sessions, 
                                           ipTransactionGraph :: TVar TransactionGraph
ipTransactionGraph = TVar TransactionGraph
graphTvar, 
                                           ipScriptSession :: Maybe ScriptSession
ipScriptSession = Maybe ScriptSession
mScriptSession,
                                           ipLocks :: Maybe (LockFile, MVar LockFileHash)
ipLocks = forall a. Maybe a
Nothing,
                                           ipCallbackAsync :: Async ()
ipCallbackAsync = Async ()
notifAsync
                                           }
        forall (f :: * -> *) a. Applicative f => a -> f a
pure (forall a b. b -> Either a b
Right Connection
conn)
    MinimalPersistence [Char]
dbdir -> PersistenceStrategy
-> DiskSync
-> [Char]
-> TransactionGraph
-> NotificationCallback
-> [[Char]]
-> IO (Either ConnectionError Connection)
connectPersistentProjectM36 PersistenceStrategy
strat DiskSync
NoDiskSync [Char]
dbdir TransactionGraph
freshGraph NotificationCallback
notificationCallback [[Char]]
ghcPkgPaths
    CrashSafePersistence [Char]
dbdir -> PersistenceStrategy
-> DiskSync
-> [Char]
-> TransactionGraph
-> NotificationCallback
-> [[Char]]
-> IO (Either ConnectionError Connection)
connectPersistentProjectM36 PersistenceStrategy
strat DiskSync
FsyncDiskSync [Char]
dbdir TransactionGraph
freshGraph NotificationCallback
notificationCallback [[Char]]
ghcPkgPaths
        
connectProjectM36 (RemoteConnectionInfo [Char]
dbName [Char]
hostName [Char]
servicePort NotificationCallback
notificationCallback) = do
  --TODO- add notification callback thread
  let resolutionHints :: AddrInfo
resolutionHints = AddrInfo
defaultHints { addrFlags :: [AddrInfoFlag]
addrFlags = [AddrInfoFlag
AI_NUMERICHOST, AddrInfoFlag
AI_NUMERICSERV],
                                       addrSocketType :: SocketType
addrSocketType = SocketType
Stream
                                       }
  [AddrInfo]
resolved <- Maybe AddrInfo -> Maybe [Char] -> Maybe [Char] -> IO [AddrInfo]
getAddrInfo (forall a. a -> Maybe a
Just AddrInfo
resolutionHints) (forall a. a -> Maybe a
Just [Char]
hostName) (forall a. a -> Maybe a
Just [Char]
servicePort)
  case [AddrInfo]
resolved of
    [] -> forall a. HasCallStack => [Char] -> a
error ([Char]
"DNS resolution failed for" forall a. Semigroup a => a -> a -> a
<> [Char]
hostName forall a. Semigroup a => a -> a -> a
<> [Char]
":" forall a. Semigroup a => a -> a -> a
<> [Char]
servicePort)
    AddrInfo
addrInfo:[AddrInfo]
_ -> do
      --supports IPv4 only for now
      let (PortNumber
port, HostAddress
addr) = case AddrInfo -> SockAddr
addrAddress AddrInfo
addrInfo of
                           SockAddrInet PortNumber
p HostAddress
a -> (PortNumber
p, HostAddress
a)
                           SockAddr
_ -> forall a. HasCallStack => [Char] -> a
error [Char]
"no IPv4 address available (IPv6 not implemented)"
          notificationHandlers :: [ClientAsyncRequestHandler]
notificationHandlers =
            [forall a. Serialise a => (a -> IO ()) -> ClientAsyncRequestHandler
RPC.ClientAsyncRequestHandler forall a b. (a -> b) -> a -> b
$
             \(NotificationMessage EvaluatedNotifications
notifications') ->
               forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
t a -> (a -> m b) -> m ()
forM_ (forall k a. Map k a -> [(k, a)]
M.toList EvaluatedNotifications
notifications') (forall a b c. (a -> b -> c) -> (a, b) -> c
uncurry NotificationCallback
notificationCallback)
            ]
      let connectExcHandler :: IOException -> f (Either ConnectionError b)
connectExcHandler (IOException
e :: IOException) = forall (f :: * -> *) a. Applicative f => a -> f a
pure forall a b. (a -> b) -> a -> b
$ forall a b. a -> Either a b
Left (IOException -> ConnectionError
IOExceptionError IOException
e)
      Either ConnectionError Connection
eConn <- (forall a b. b -> Either a b
Right forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> [ClientAsyncRequestHandler]
-> HostAddr -> PortNumber -> IO Connection
RPC.connect [ClientAsyncRequestHandler]
notificationHandlers (HostAddress -> HostAddr
hostAddressToTuple HostAddress
addr) PortNumber
port) forall e a. Exception e => IO a -> (e -> IO a) -> IO a
`catch` forall {f :: * -> *} {b}.
Applicative f =>
IOException -> f (Either ConnectionError b)
connectExcHandler
      case Either ConnectionError Connection
eConn of
        Left ConnectionError
err -> forall (f :: * -> *) a. Applicative f => a -> f a
pure (forall a b. a -> Either a b
Left ConnectionError
err)
        Right Connection
conn -> do
          Either ConnectionError Bool
eRet <- forall request response.
(Serialise request, Serialise response) =>
Connection -> request -> IO (Either ConnectionError response)
RPC.call Connection
conn ([Char] -> Login
Login [Char]
dbName)
          case Either ConnectionError Bool
eRet of
            Left ConnectionError
err -> forall a. HasCallStack => [Char] -> a
error (forall a. Show a => a -> [Char]
show ConnectionError
err)
            Right Bool
False -> forall a. HasCallStack => [Char] -> a
error [Char]
"wtf"
            Right Bool
True ->
      --TODO handle connection errors!
              forall (f :: * -> *) a. Applicative f => a -> f a
pure (forall a b. b -> Either a b
Right (RemoteConnectionConf -> Connection
RemoteConnection (Connection -> RemoteConnectionConf
RemoteConnectionConf Connection
conn)))

--convert RPC errors into exceptions
convertRPCErrors :: RPC.ConnectionError -> IO a
convertRPCErrors :: forall a. ConnectionError -> IO a
convertRPCErrors ConnectionError
err =
  case ConnectionError
err of
    ConnectionError
RPC.TimeoutError -> forall a e. Exception e => e -> a
throw RequestTimeoutException
RequestTimeoutException
    RPC.CodecError [Char]
msg -> forall a. HasCallStack => [Char] -> a
error forall a b. (a -> b) -> a -> b
$ [Char]
"decoding message failed on server: " forall a. Semigroup a => a -> a -> a
<> [Char]
msg
    RPC.ExceptionError [Char]
msg -> forall a. HasCallStack => [Char] -> a
error forall a b. (a -> b) -> a -> b
$ [Char]
"server threw exception: " forall a. Semigroup a => a -> a -> a
<> [Char]
msg

addClientNode :: Connection -> RPC.Locking Socket -> IO ()
addClientNode :: Connection -> Locking Socket -> IO ()
addClientNode (RemoteConnection RemoteConnectionConf
_) Locking Socket
_ = forall a. HasCallStack => [Char] -> a
error [Char]
"addClientNode called on remote connection"
addClientNode (InProcessConnection InProcessConnectionConf
conf) Locking Socket
lockSock = forall a. STM a -> IO a
atomically (forall item. Hashable item => item -> Set item -> STM ()
StmSet.insert ClientInfo
clientInfo (InProcessConnectionConf -> Set ClientInfo
ipClientNodes InProcessConnectionConf
conf))
  where
    clientInfo :: ClientInfo
clientInfo = Locking Socket -> ClientInfo
RemoteClientInfo Locking Socket
lockSock

connectPersistentProjectM36 :: PersistenceStrategy ->
                               DiskSync ->
                               FilePath -> 
                               TransactionGraph ->
                               NotificationCallback ->
                               [GhcPkgPath] -> 
                               IO (Either ConnectionError Connection)      
connectPersistentProjectM36 :: PersistenceStrategy
-> DiskSync
-> [Char]
-> TransactionGraph
-> NotificationCallback
-> [[Char]]
-> IO (Either ConnectionError Connection)
connectPersistentProjectM36 PersistenceStrategy
strat DiskSync
sync [Char]
dbdir TransactionGraph
freshGraph NotificationCallback
notificationCallback [[Char]]
ghcPkgPaths = do
  Either PersistenceError (LockFile, LockFileHash)
err <- DiskSync
-> [Char]
-> TransactionGraph
-> IO (Either PersistenceError (LockFile, LockFileHash))
setupDatabaseDir DiskSync
sync [Char]
dbdir TransactionGraph
freshGraph 
  case Either PersistenceError (LockFile, LockFileHash)
err of
    Left PersistenceError
err' -> forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ forall a b. a -> Either a b
Left (PersistenceError -> ConnectionError
SetupDatabaseDirectoryError PersistenceError
err')
    Right (LockFile
lockFileH, LockFileHash
digest) -> do
      Maybe ScriptSession
mScriptSession <- [[Char]] -> IO (Maybe ScriptSession)
createScriptSession [[Char]]
ghcPkgPaths
      Either PersistenceError TransactionGraph
graph <- [Char]
-> TransactionGraph
-> Maybe ScriptSession
-> IO (Either PersistenceError TransactionGraph)
transactionGraphLoad [Char]
dbdir TransactionGraph
emptyTransactionGraph Maybe ScriptSession
mScriptSession
      case Either PersistenceError TransactionGraph
graph of
        Left PersistenceError
err' -> forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ forall a b. a -> Either a b
Left (PersistenceError -> ConnectionError
SetupDatabaseDirectoryError PersistenceError
err')
        Right TransactionGraph
graph' -> do
          case TransactionGraph -> Either [MerkleValidationError] ()
TG.validateMerkleHashes TransactionGraph
graph' of
            Left [MerkleValidationError]
merkleErrs -> forall (f :: * -> *) a. Applicative f => a -> f a
pure (forall a b. a -> Either a b
Left ([MerkleValidationError] -> ConnectionError
DatabaseValidationError [MerkleValidationError]
merkleErrs))
            Right ()
_ -> do
              TVar TransactionGraph
tvarGraph <- forall a. a -> IO (TVar a)
newTVarIO TransactionGraph
graph'
              Map SessionId Session
sessions <- forall key value. IO (Map key value)
StmMap.newIO
              Set ClientInfo
clientNodes <- forall item. IO (Set item)
StmSet.newIO
              MVar LockFileHash
lockMVar <- forall a. a -> IO (MVar a)
newMVar LockFileHash
digest
              Async ()
notifAsync <- Set ClientInfo -> NotificationCallback -> IO (Async ())
startNotificationListener Set ClientInfo
clientNodes NotificationCallback
notificationCallback
              let conn :: Connection
conn = InProcessConnectionConf -> Connection
InProcessConnection InProcessConnectionConf {
                                             ipPersistenceStrategy :: PersistenceStrategy
ipPersistenceStrategy = PersistenceStrategy
strat,
                                             ipClientNodes :: Set ClientInfo
ipClientNodes = Set ClientInfo
clientNodes,
                                             ipSessions :: Map SessionId Session
ipSessions = Map SessionId Session
sessions,
                                             ipTransactionGraph :: TVar TransactionGraph
ipTransactionGraph = TVar TransactionGraph
tvarGraph,
                                             ipScriptSession :: Maybe ScriptSession
ipScriptSession = Maybe ScriptSession
mScriptSession,
                                             ipLocks :: Maybe (LockFile, MVar LockFileHash)
ipLocks = forall a. a -> Maybe a
Just (LockFile
lockFileH, MVar LockFileHash
lockMVar),
                                             ipCallbackAsync :: Async ()
ipCallbackAsync = Async ()
notifAsync
                                             }
              forall (f :: * -> *) a. Applicative f => a -> f a
pure (forall a b. b -> Either a b
Right Connection
conn)

--startup local async process to handle notification callbacks
startNotificationListener :: ClientNodes -> NotificationCallback -> IO (Async ())
startNotificationListener :: Set ClientInfo -> NotificationCallback -> IO (Async ())
startNotificationListener Set ClientInfo
cNodes NotificationCallback
notificationCallback = do
  inProcessClientInfo :: ClientInfo
inProcessClientInfo@(InProcessClientInfo MVar EvaluatedNotifications
notifMVar) <- MVar EvaluatedNotifications -> ClientInfo
InProcessClientInfo forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> forall a. IO (MVar a)
newEmptyMVar          
  forall a. STM a -> IO a
atomically forall a b. (a -> b) -> a -> b
$ forall item. Hashable item => item -> Set item -> STM ()
StmSet.insert ClientInfo
inProcessClientInfo Set ClientInfo
cNodes 
  forall a. IO a -> IO (Async a)
async forall a b. (a -> b) -> a -> b
$ forall (f :: * -> *) a b. Applicative f => f a -> f b
forever forall a b. (a -> b) -> a -> b
$ do
    EvaluatedNotifications
notifs <- forall a. MVar a -> IO a
takeMVar MVar EvaluatedNotifications
notifMVar
    forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
t a -> (a -> m b) -> m ()
forM_ (forall k a. Map k a -> [(k, a)]
M.toList EvaluatedNotifications
notifs) forall a b. (a -> b) -> a -> b
$ forall a b c. (a -> b -> c) -> (a, b) -> c
uncurry NotificationCallback
notificationCallback

-- | Create a new session at the transaction id and return the session's Id.
createSessionAtCommit :: Connection -> TransactionId -> IO (Either RelationalError SessionId)
createSessionAtCommit :: Connection -> SessionId -> IO (Either RelationalError SessionId)
createSessionAtCommit conn :: Connection
conn@(InProcessConnection InProcessConnectionConf
_) SessionId
commitId = do
   SessionId
newSessionId <- IO SessionId
nextRandom
   forall a. STM a -> IO a
atomically forall a b. (a -> b) -> a -> b
$ SessionId
-> SessionId
-> Connection
-> STM (Either RelationalError SessionId)
createSessionAtCommit_ SessionId
commitId SessionId
newSessionId Connection
conn
createSessionAtCommit conn :: Connection
conn@(RemoteConnection RemoteConnectionConf
_) SessionId
uuid = forall a b. (Serialise a, Serialise b) => Connection -> a -> IO b
remoteCall Connection
conn (SessionId -> CreateSessionAtCommit
CreateSessionAtCommit SessionId
uuid)

createSessionAtCommit_ :: TransactionId -> SessionId -> Connection -> STM (Either RelationalError SessionId)
createSessionAtCommit_ :: SessionId
-> SessionId
-> Connection
-> STM (Either RelationalError SessionId)
createSessionAtCommit_ SessionId
commitId SessionId
newSessionId (InProcessConnection InProcessConnectionConf
conf) = do
    let sessions :: Map SessionId Session
sessions = InProcessConnectionConf -> Map SessionId Session
ipSessions InProcessConnectionConf
conf
        graphTvar :: TVar TransactionGraph
graphTvar = InProcessConnectionConf -> TVar TransactionGraph
ipTransactionGraph InProcessConnectionConf
conf
    TransactionGraph
graph <- forall a. TVar a -> STM a
readTVar TVar TransactionGraph
graphTvar
    case SessionId -> TransactionGraph -> Either RelationalError Transaction
RE.transactionForId SessionId
commitId TransactionGraph
graph of
        Left RelationalError
err -> forall (f :: * -> *) a. Applicative f => a -> f a
pure (forall a b. a -> Either a b
Left RelationalError
err)
        Right Transaction
transaction -> do
            let freshDiscon :: DisconnectedTransaction
freshDiscon = SessionId -> Schemas -> Bool -> DisconnectedTransaction
DisconnectedTransaction SessionId
commitId (SessionId -> Schemas -> Schemas
Discon.loadGraphRefRelVarsOnly SessionId
commitId (Transaction -> Schemas
Trans.schemas Transaction
transaction)) Bool
False
            Maybe Session
keyDuplication <- forall key value.
Hashable key =>
key -> Map key value -> STM (Maybe value)
StmMap.lookup SessionId
newSessionId Map SessionId Session
sessions
            case Maybe Session
keyDuplication of
                Just Session
_ -> forall (f :: * -> *) a. Applicative f => a -> f a
pure forall a b. (a -> b) -> a -> b
$ forall a b. a -> Either a b
Left (SessionId -> RelationalError
SessionIdInUseError SessionId
newSessionId)
                Maybe Session
Nothing -> do
                   forall key value.
Hashable key =>
value -> key -> Map key value -> STM ()
StmMap.insert (DisconnectedTransaction -> SchemaName -> Session
Session DisconnectedTransaction
freshDiscon SchemaName
defaultSchemaName) SessionId
newSessionId Map SessionId Session
sessions
                   forall (f :: * -> *) a. Applicative f => a -> f a
pure forall a b. (a -> b) -> a -> b
$ forall a b. b -> Either a b
Right SessionId
newSessionId
createSessionAtCommit_ SessionId
_ SessionId
_ (RemoteConnection RemoteConnectionConf
_) = forall a. HasCallStack => [Char] -> a
error [Char]
"createSessionAtCommit_ called on remote connection"
  
-- | Call 'createSessionAtHead' with a transaction graph's head's name to create a new session pinned to that head. This function returns a 'SessionId' which can be used in other function calls to reference the point in the transaction graph.
createSessionAtHead :: Connection -> HeadName -> IO (Either RelationalError SessionId)
createSessionAtHead :: Connection -> SchemaName -> IO (Either RelationalError SessionId)
createSessionAtHead conn :: Connection
conn@(InProcessConnection InProcessConnectionConf
conf) SchemaName
headn = do
    let graphTvar :: TVar TransactionGraph
graphTvar = InProcessConnectionConf -> TVar TransactionGraph
ipTransactionGraph InProcessConnectionConf
conf
    SessionId
newSessionId <- IO SessionId
nextRandom
    forall a. STM a -> IO a
atomically forall a b. (a -> b) -> a -> b
$ do
        TransactionGraph
graph <- forall a. TVar a -> STM a
readTVar TVar TransactionGraph
graphTvar
        case SchemaName -> TransactionGraph -> Maybe Transaction
transactionForHead SchemaName
headn TransactionGraph
graph of
            Maybe Transaction
Nothing -> forall (f :: * -> *) a. Applicative f => a -> f a
pure forall a b. (a -> b) -> a -> b
$ forall a b. a -> Either a b
Left (SchemaName -> RelationalError
NoSuchHeadNameError SchemaName
headn)
            Just Transaction
trans -> SessionId
-> SessionId
-> Connection
-> STM (Either RelationalError SessionId)
createSessionAtCommit_ (Transaction -> SessionId
transactionId Transaction
trans) SessionId
newSessionId Connection
conn
createSessionAtHead conn :: Connection
conn@(RemoteConnection RemoteConnectionConf
_) SchemaName
headn = forall a b. (Serialise a, Serialise b) => Connection -> a -> IO b
remoteCall Connection
conn (SchemaName -> CreateSessionAtHead
CreateSessionAtHead SchemaName
headn)

-- | Discards a session, eliminating any uncommitted changes present in the session.
closeSession :: SessionId -> Connection -> IO ()
closeSession :: SessionId -> Connection -> IO ()
closeSession SessionId
sessionId (InProcessConnection InProcessConnectionConf
conf) = 
    forall a. STM a -> IO a
atomically forall a b. (a -> b) -> a -> b
$ forall key value. Hashable key => key -> Map key value -> STM ()
StmMap.delete SessionId
sessionId (InProcessConnectionConf -> Map SessionId Session
ipSessions InProcessConnectionConf
conf)
closeSession SessionId
sessionId conn :: Connection
conn@(RemoteConnection RemoteConnectionConf
_) = forall a b. (Serialise a, Serialise b) => Connection -> a -> IO b
remoteCall Connection
conn (SessionId -> CloseSession
CloseSession SessionId
sessionId)       

-- | 'close' cleans up the database access connection and closes any relevant sockets.
close :: Connection -> IO ()
close :: Connection -> IO ()
close (InProcessConnection InProcessConnectionConf
conf) = do
  forall a. Async a -> IO ()
cancel (InProcessConnectionConf -> Async ()
ipCallbackAsync InProcessConnectionConf
conf)
  forall a. STM a -> IO a
atomically forall a b. (a -> b) -> a -> b
$ do
    let sessions :: Map SessionId Session
sessions = InProcessConnectionConf -> Map SessionId Session
ipSessions InProcessConnectionConf
conf
#if MIN_VERSION_stm_containers(1,0,0)        
    forall key value. Map key value -> STM ()
StmMap.reset Map SessionId Session
sessions
#else
    StmMap.deleteAll sessions
#endif
    forall (f :: * -> *) a. Applicative f => a -> f a
pure ()
  let mLocks :: Maybe (LockFile, MVar LockFileHash)
mLocks = InProcessConnectionConf -> Maybe (LockFile, MVar LockFileHash)
ipLocks InProcessConnectionConf
conf
  case Maybe (LockFile, MVar LockFileHash)
mLocks of
    Maybe (LockFile, MVar LockFileHash)
Nothing -> forall (f :: * -> *) a. Applicative f => a -> f a
pure ()
    Just (LockFile
lockFileH, MVar LockFileHash
_) -> LockFile -> IO ()
closeLockFile LockFile
lockFileH

close (RemoteConnection (RemoteConnectionConf Connection
conn)) =
  Connection -> IO ()
RPC.close Connection
conn

--used only by the server EntryPoints
closeRemote_ :: Connection -> IO ()
closeRemote_ :: Connection -> IO ()
closeRemote_ (InProcessConnection InProcessConnectionConf
_) = forall a. HasCallStack => [Char] -> a
error [Char]
"invalid call of closeRemote_ on InProcessConnection"
closeRemote_ (RemoteConnection (RemoteConnectionConf Connection
conn)) = Connection -> IO ()
RPC.close Connection
conn

  --we need to actually close the localNode's connection to the remote
--within the database server, we must catch and handle all exception lest they take down the database process- this handling might be different for other use-cases
--exceptions should generally *NOT* be thrown from any Project:M36 code paths, but third-party code such as AtomFunction scripts could conceivably throw undefined, etc.

excEither :: IO (Either RelationalError a) -> IO (Either RelationalError a)
excEither :: forall a.
IO (Either RelationalError a) -> IO (Either RelationalError a)
excEither = forall e a. Exception e => (e -> IO a) -> IO a -> IO a
handle forall {b}. SomeException -> IO (Either RelationalError b)
handler
  where
    handler :: SomeException -> IO (Either RelationalError b)
handler SomeException
exc | Just (AsyncException
_ :: AsyncException) <- forall e. Exception e => SomeException -> Maybe e
fromException SomeException
exc = forall e a. Exception e => e -> IO a
throwIO SomeException
exc
                | Bool
otherwise = forall (f :: * -> *) a. Applicative f => a -> f a
pure (forall a b. a -> Either a b
Left ([Char] -> RelationalError
UnhandledExceptionError (forall a. Show a => a -> [Char]
show SomeException
exc)))


remoteCall :: (Serialise a, Serialise b) => Connection -> a -> IO b
remoteCall :: forall a b. (Serialise a, Serialise b) => Connection -> a -> IO b
remoteCall (InProcessConnection InProcessConnectionConf
_ ) a
_ = forall a. HasCallStack => [Char] -> a
error [Char]
"remoteCall called on local connection"
remoteCall (RemoteConnection (RemoteConnectionConf Connection
rpcConn)) a
arg = do
  Either ConnectionError b
eRet <- forall request response.
(Serialise request, Serialise response) =>
Connection -> request -> IO (Either ConnectionError response)
RPC.call Connection
rpcConn a
arg
  case Either ConnectionError b
eRet of
    Left ConnectionError
err -> forall a. ConnectionError -> IO a
convertRPCErrors ConnectionError
err
    Right b
val -> forall (f :: * -> *) a. Applicative f => a -> f a
pure b
val

sessionForSessionId :: SessionId -> Sessions -> STM (Either RelationalError Session)
sessionForSessionId :: SessionId
-> Map SessionId Session -> STM (Either RelationalError Session)
sessionForSessionId SessionId
sessionId Map SessionId Session
sessions = 
  forall b a. b -> (a -> b) -> Maybe a -> b
maybe (forall a b. a -> Either a b
Left forall a b. (a -> b) -> a -> b
$ SessionId -> RelationalError
NoSuchSessionError SessionId
sessionId) forall a b. b -> Either a b
Right forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> forall key value.
Hashable key =>
key -> Map key value -> STM (Maybe value)
StmMap.lookup SessionId
sessionId Map SessionId Session
sessions
  
schemaForSessionId :: Session -> STM (Either RelationalError Schema)  
schemaForSessionId :: Session -> STM (Either RelationalError Schema)
schemaForSessionId Session
session = do
  let sname :: SchemaName
sname = Session -> SchemaName
schemaName Session
session
  if SchemaName
sname forall a. Eq a => a -> a -> Bool
== SchemaName
defaultSchemaName then
    forall (f :: * -> *) a. Applicative f => a -> f a
pure (forall a b. b -> Either a b
Right (SchemaIsomorphs -> Schema
Schema [])) -- the main schema includes no transformations (but neither do empty schemas :/ )
    else
    case forall k a. Ord k => k -> Map k a -> Maybe a
M.lookup SchemaName
sname (Session -> Subschemas
subschemas Session
session) of
      Maybe Schema
Nothing -> forall (f :: * -> *) a. Applicative f => a -> f a
pure (forall a b. a -> Either a b
Left (SchemaName -> RelationalError
SubschemaNameNotInUseError SchemaName
sname))
      Just Schema
schema -> forall (f :: * -> *) a. Applicative f => a -> f a
pure (forall a b. b -> Either a b
Right Schema
schema)
  
sessionAndSchema :: SessionId -> Sessions -> STM (Either RelationalError (Session, Schema))
sessionAndSchema :: SessionId
-> Map SessionId Session
-> STM (Either RelationalError (Session, Schema))
sessionAndSchema SessionId
sessionId Map SessionId Session
sessions = do
  Either RelationalError Session
eSession <- SessionId
-> Map SessionId Session -> STM (Either RelationalError Session)
sessionForSessionId SessionId
sessionId Map SessionId Session
sessions
  case Either RelationalError Session
eSession of
    Left RelationalError
err -> forall (f :: * -> *) a. Applicative f => a -> f a
pure (forall a b. a -> Either a b
Left RelationalError
err)
    Right Session
session -> do  
      Either RelationalError Schema
eSchema <- Session -> STM (Either RelationalError Schema)
schemaForSessionId Session
session
      case Either RelationalError Schema
eSchema of
        Left RelationalError
err -> forall (f :: * -> *) a. Applicative f => a -> f a
pure (forall a b. a -> Either a b
Left RelationalError
err)
        Right Schema
schema -> forall (f :: * -> *) a. Applicative f => a -> f a
pure (forall a b. b -> Either a b
Right (Session
session, Schema
schema))
  
-- | Returns the name of the currently selected isomorphic schema.
currentSchemaName :: SessionId -> Connection -> IO (Either RelationalError SchemaName)
currentSchemaName :: SessionId -> Connection -> IO (Either RelationalError SchemaName)
currentSchemaName SessionId
sessionId (InProcessConnection InProcessConnectionConf
conf) = forall a. STM a -> IO a
atomically forall a b. (a -> b) -> a -> b
$ do
  let sessions :: Map SessionId Session
sessions = InProcessConnectionConf -> Map SessionId Session
ipSessions InProcessConnectionConf
conf
  Either RelationalError Session
eSession <- SessionId
-> Map SessionId Session -> STM (Either RelationalError Session)
sessionForSessionId SessionId
sessionId Map SessionId Session
sessions
  case Either RelationalError Session
eSession of
    Left RelationalError
err -> forall (f :: * -> *) a. Applicative f => a -> f a
pure (forall a b. a -> Either a b
Left RelationalError
err)
    Right Session
session -> forall (f :: * -> *) a. Applicative f => a -> f a
pure (forall a b. b -> Either a b
Right (Session -> SchemaName
Sess.schemaName Session
session))
currentSchemaName SessionId
sessionId conn :: Connection
conn@(RemoteConnection RemoteConnectionConf
_) = forall a b. (Serialise a, Serialise b) => Connection -> a -> IO b
remoteCall Connection
conn (SessionId -> RetrieveCurrentSchemaName
RetrieveCurrentSchemaName SessionId
sessionId)

-- | Switch to the named isomorphic schema.
setCurrentSchemaName :: SessionId -> Connection -> SchemaName -> IO (Either RelationalError ())
setCurrentSchemaName :: SessionId
-> Connection -> SchemaName -> IO (Either RelationalError ())
setCurrentSchemaName SessionId
sessionId (InProcessConnection InProcessConnectionConf
conf) SchemaName
sname = forall a. STM a -> IO a
atomically forall a b. (a -> b) -> a -> b
$ do
  let sessions :: Map SessionId Session
sessions = InProcessConnectionConf -> Map SessionId Session
ipSessions InProcessConnectionConf
conf
  Either RelationalError Session
eSession <- SessionId
-> Map SessionId Session -> STM (Either RelationalError Session)
sessionForSessionId SessionId
sessionId Map SessionId Session
sessions
  case Either RelationalError Session
eSession of
    Left RelationalError
err -> forall (f :: * -> *) a. Applicative f => a -> f a
pure (forall a b. a -> Either a b
Left RelationalError
err)
    Right Session
session -> case SchemaName -> Session -> Either RelationalError Session
Sess.setSchemaName SchemaName
sname Session
session of
      Left RelationalError
err -> forall (f :: * -> *) a. Applicative f => a -> f a
pure (forall a b. a -> Either a b
Left RelationalError
err)
      Right Session
newSession -> forall key value.
Hashable key =>
value -> key -> Map key value -> STM ()
StmMap.insert Session
newSession SessionId
sessionId Map SessionId Session
sessions forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> forall (f :: * -> *) a. Applicative f => a -> f a
pure (forall a b. b -> Either a b
Right ())
setCurrentSchemaName SessionId
sessionId conn :: Connection
conn@(RemoteConnection RemoteConnectionConf
_) SchemaName
sname = forall a b. (Serialise a, Serialise b) => Connection -> a -> IO b
remoteCall Connection
conn (SessionId -> SchemaName -> ExecuteSetCurrentSchema
ExecuteSetCurrentSchema SessionId
sessionId SchemaName
sname)

-- | Execute a relational expression in the context of the session and connection. Relational expressions are queries and therefore cannot alter the database.
executeRelationalExpr :: SessionId -> Connection -> RelationalExpr -> IO (Either RelationalError Relation)
executeRelationalExpr :: SessionId
-> Connection
-> RelationalExpr
-> IO (Either RelationalError Relation)
executeRelationalExpr SessionId
sessionId (InProcessConnection InProcessConnectionConf
conf) RelationalExpr
expr = forall a.
IO (Either RelationalError a) -> IO (Either RelationalError a)
excEither forall a b. (a -> b) -> a -> b
$ forall a. STM a -> IO a
atomically forall a b. (a -> b) -> a -> b
$ do
  let sessions :: Map SessionId Session
sessions = InProcessConnectionConf -> Map SessionId Session
ipSessions InProcessConnectionConf
conf
  Either RelationalError (Session, Schema)
eSession <- SessionId
-> Map SessionId Session
-> STM (Either RelationalError (Session, Schema))
sessionAndSchema SessionId
sessionId Map SessionId Session
sessions
  case Either RelationalError (Session, Schema)
eSession of
    Left RelationalError
err -> forall (f :: * -> *) a. Applicative f => a -> f a
pure forall a b. (a -> b) -> a -> b
$ forall a b. a -> Either a b
Left RelationalError
err
    Right (Session
session, Schema
schema) -> do
      let expr' :: Either RelationalError RelationalExpr
expr' = if Session -> SchemaName
schemaName Session
session forall a. Eq a => a -> a -> Bool
/= SchemaName
defaultSchemaName then
                    Schema -> RelationalExpr -> Either RelationalError RelationalExpr
Schema.processRelationalExprInSchema Schema
schema RelationalExpr
expr
                  else
                    forall a b. b -> Either a b
Right RelationalExpr
expr
      case Either RelationalError RelationalExpr
expr' of
        Left RelationalError
err -> forall (f :: * -> *) a. Applicative f => a -> f a
pure (forall a b. a -> Either a b
Left RelationalError
err)
        Right RelationalExpr
expr'' -> do
          let graphTvar :: TVar TransactionGraph
graphTvar = InProcessConnectionConf -> TVar TransactionGraph
ipTransactionGraph InProcessConnectionConf
conf
          TransactionGraph
graph <- forall a. TVar a -> STM a
readTVar TVar TransactionGraph
graphTvar
          let reEnv :: RelationalExprEnv
reEnv = DatabaseContext -> TransactionGraph -> RelationalExprEnv
RE.mkRelationalExprEnv (Session -> DatabaseContext
Sess.concreteDatabaseContext Session
session) TransactionGraph
graph
          case RelationalExprEnv
-> RelationalExpr -> Either RelationalError Relation
optimizeAndEvalRelationalExpr RelationalExprEnv
reEnv RelationalExpr
expr'' of
            Right Relation
rel -> forall (f :: * -> *) a. Applicative f => a -> f a
pure (forall a. NFData a => a -> a
force (forall a b. b -> Either a b
Right Relation
rel)) -- this is necessary so that any undefined/error exceptions are spit out here 
            Left RelationalError
err -> forall (f :: * -> *) a. Applicative f => a -> f a
pure (forall a b. a -> Either a b
Left RelationalError
err)

executeRelationalExpr SessionId
sessionId conn :: Connection
conn@(RemoteConnection RemoteConnectionConf
_) RelationalExpr
relExpr = forall a b. (Serialise a, Serialise b) => Connection -> a -> IO b
remoteCall Connection
conn (SessionId -> RelationalExpr -> ExecuteRelationalExpr
ExecuteRelationalExpr SessionId
sessionId RelationalExpr
relExpr)

-- | Execute a database context expression in the context of the session and connection. Database expressions modify the current session's disconnected transaction but cannot modify the transaction graph.
executeDatabaseContextExpr :: SessionId -> Connection -> DatabaseContextExpr -> IO (Either RelationalError ())
executeDatabaseContextExpr :: SessionId
-> Connection
-> DatabaseContextExpr
-> IO (Either RelationalError ())
executeDatabaseContextExpr SessionId
sessionId (InProcessConnection InProcessConnectionConf
conf) DatabaseContextExpr
expr = forall a.
IO (Either RelationalError a) -> IO (Either RelationalError a)
excEither forall a b. (a -> b) -> a -> b
$ forall a. STM a -> IO a
atomically forall a b. (a -> b) -> a -> b
$ do
  let sessions :: Map SessionId Session
sessions = InProcessConnectionConf -> Map SessionId Session
ipSessions InProcessConnectionConf
conf
  Either RelationalError (Session, Schema)
eSession <- SessionId
-> Map SessionId Session
-> STM (Either RelationalError (Session, Schema))
sessionAndSchema SessionId
sessionId Map SessionId Session
sessions
  case Either RelationalError (Session, Schema)
eSession of
    Left RelationalError
err -> forall (f :: * -> *) a. Applicative f => a -> f a
pure (forall a b. a -> Either a b
Left RelationalError
err)
    Right (Session
session, Schema
schema) -> do
      let expr' :: Either RelationalError DatabaseContextExpr
expr' = if Session -> SchemaName
schemaName Session
session forall a. Eq a => a -> a -> Bool
== SchemaName
defaultSchemaName then
                    forall a b. b -> Either a b
Right DatabaseContextExpr
expr
                  else
                    Schema
-> DatabaseContextExpr
-> Either RelationalError DatabaseContextExpr
Schema.processDatabaseContextExprInSchema Schema
schema DatabaseContextExpr
expr
      case Either RelationalError DatabaseContextExpr
expr' of 
        Left RelationalError
err -> forall (f :: * -> *) a. Applicative f => a -> f a
pure (forall a b. a -> Either a b
Left RelationalError
err)
        Right DatabaseContextExpr
expr'' -> do
          TransactionGraph
graph <- forall a. TVar a -> STM a
readTVar (InProcessConnectionConf -> TVar TransactionGraph
ipTransactionGraph InProcessConnectionConf
conf)
          let ctx :: DatabaseContext
ctx = Session -> DatabaseContext
Sess.concreteDatabaseContext Session
session
              env :: DatabaseContextEvalEnv
env = SessionId -> TransactionGraph -> DatabaseContextEvalEnv
RE.mkDatabaseContextEvalEnv SessionId
transId TransactionGraph
graph
              transId :: SessionId
transId = Session -> SessionId
Sess.parentId Session
session
          case DatabaseContext
-> DatabaseContextEvalEnv
-> DatabaseContextEvalMonad ()
-> Either RelationalError DatabaseContextEvalState
RE.runDatabaseContextEvalMonad DatabaseContext
ctx DatabaseContextEvalEnv
env (Bool -> DatabaseContextExpr -> DatabaseContextEvalMonad ()
optimizeAndEvalDatabaseContextExpr Bool
True DatabaseContextExpr
expr'') of
            Left RelationalError
err -> forall (f :: * -> *) a. Applicative f => a -> f a
pure (forall a b. a -> Either a b
Left RelationalError
err)
            Right DatabaseContextEvalState
newState ->
              if Bool -> Bool
not (DatabaseContextEvalState -> Bool
RE.dbc_dirty DatabaseContextEvalState
newState) then --nothing dirtied, nothing to do
                forall (f :: * -> *) a. Applicative f => a -> f a
pure (forall a b. b -> Either a b
Right ())
              else do
                let newDiscon :: DisconnectedTransaction
newDiscon = SessionId -> Schemas -> Bool -> DisconnectedTransaction
DisconnectedTransaction (Session -> SessionId
Sess.parentId Session
session) Schemas
newSchemas Bool
True
                    context' :: DatabaseContext
context' = DatabaseContextEvalState -> DatabaseContext
RE.dbc_context DatabaseContextEvalState
newState
                    newSubschemas :: Subschemas
newSubschemas = Subschemas -> DatabaseContextExpr -> Subschemas
Schema.processDatabaseContextExprSchemasUpdate (Session -> Subschemas
Sess.subschemas Session
session) DatabaseContextExpr
expr
                    newSchemas :: Schemas
newSchemas = DatabaseContext -> Subschemas -> Schemas
Schemas DatabaseContext
context' Subschemas
newSubschemas
                    newSession :: Session
newSession = DisconnectedTransaction -> SchemaName -> Session
Session DisconnectedTransaction
newDiscon (Session -> SchemaName
Sess.schemaName Session
session)
                forall key value.
Hashable key =>
value -> key -> Map key value -> STM ()
StmMap.insert Session
newSession SessionId
sessionId Map SessionId Session
sessions
                forall (f :: * -> *) a. Applicative f => a -> f a
pure (forall a b. b -> Either a b
Right ())
executeDatabaseContextExpr SessionId
sessionId conn :: Connection
conn@(RemoteConnection RemoteConnectionConf
_) DatabaseContextExpr
dbExpr = forall a b. (Serialise a, Serialise b) => Connection -> a -> IO b
remoteCall Connection
conn (SessionId -> DatabaseContextExpr -> ExecuteDatabaseContextExpr
ExecuteDatabaseContextExpr SessionId
sessionId DatabaseContextExpr
dbExpr)

-- | Similar to a git rebase, 'autoMergeToHead' atomically creates a temporary branch and merges it to the latest commit of the branch referred to by the 'HeadName' and commits the merge. This is useful to reduce incidents of 'TransactionIsNotAHeadError's but at the risk of merge errors (thus making it similar to rebasing). Alternatively, as an optimization, if a simple commit is possible (meaning that the head has not changed), then a fast-forward commit takes place instead.
autoMergeToHead :: SessionId -> Connection -> MergeStrategy -> HeadName -> IO (Either RelationalError ())
autoMergeToHead :: SessionId
-> Connection
-> MergeStrategy
-> SchemaName
-> IO (Either RelationalError ())
autoMergeToHead SessionId
sessionId (InProcessConnection InProcessConnectionConf
conf) MergeStrategy
strat SchemaName
headName' = do
  let sessions :: Map SessionId Session
sessions = InProcessConnectionConf -> Map SessionId Session
ipSessions InProcessConnectionConf
conf
  SessionId
id1 <- IO SessionId
nextRandom
  SessionId
id2 <- IO SessionId
nextRandom
  SessionId
id3 <- IO SessionId
nextRandom
  UTCTime
tstamp <- IO UTCTime
getCurrentTime
  SessionId
-> InProcessConnectionConf
-> (TransactionGraph
    -> STM
         (Either
            RelationalError
            (DisconnectedTransaction, TransactionGraph, [SessionId])))
-> IO (Either RelationalError ())
commitLock_ SessionId
sessionId InProcessConnectionConf
conf forall a b. (a -> b) -> a -> b
$ \TransactionGraph
graph -> do
    Either RelationalError Session
eSession <- SessionId
-> Map SessionId Session -> STM (Either RelationalError Session)
sessionForSessionId SessionId
sessionId Map SessionId Session
sessions  
    case Either RelationalError Session
eSession of
      Left RelationalError
err -> forall (f :: * -> *) a. Applicative f => a -> f a
pure (forall a b. a -> Either a b
Left RelationalError
err)
      Right Session
session -> 
        case SchemaName -> TransactionGraph -> Maybe Transaction
Graph.transactionForHead SchemaName
headName' TransactionGraph
graph of
          Maybe Transaction
Nothing -> forall (f :: * -> *) a. Applicative f => a -> f a
pure (forall a b. a -> Either a b
Left (SchemaName -> RelationalError
NoSuchHeadNameError SchemaName
headName'))
          Just Transaction
headTrans -> do
            --attempt fast-forward commit, if possible
            let graphInfo :: Either
  RelationalError
  ((DisconnectedTransaction, TransactionGraph), [SessionId])
graphInfo = if Session -> SessionId
Sess.parentId Session
session forall a. Eq a => a -> a -> Bool
== Transaction -> SessionId
transactionId Transaction
headTrans then do
                              (DisconnectedTransaction, TransactionGraph)
ret <- UTCTime
-> SessionId
-> DisconnectedTransaction
-> TransactionGraph
-> TransactionGraphOperator
-> Either
     RelationalError (DisconnectedTransaction, TransactionGraph)
Graph.evalGraphOp UTCTime
tstamp SessionId
id1 (Session -> DisconnectedTransaction
Sess.disconnectedTransaction Session
session) TransactionGraph
graph TransactionGraphOperator
Commit
                              forall (f :: * -> *) a. Applicative f => a -> f a
pure ((DisconnectedTransaction, TransactionGraph)
ret, [SessionId
id1])
                            else do
                              (DisconnectedTransaction, TransactionGraph)
ret <- UTCTime
-> (SessionId, SessionId, SessionId)
-> DisconnectedTransaction
-> SchemaName
-> MergeStrategy
-> TransactionGraph
-> Either
     RelationalError (DisconnectedTransaction, TransactionGraph)
Graph.autoMergeToHead UTCTime
tstamp (SessionId
id1, SessionId
id2, SessionId
id3) (Session -> DisconnectedTransaction
Sess.disconnectedTransaction Session
session) SchemaName
headName' MergeStrategy
strat TransactionGraph
graph 
                              forall (f :: * -> *) a. Applicative f => a -> f a
pure ((DisconnectedTransaction, TransactionGraph)
ret, [SessionId
id1,SessionId
id2,SessionId
id3])
            case Either
  RelationalError
  ((DisconnectedTransaction, TransactionGraph), [SessionId])
graphInfo of
              Left RelationalError
err -> forall (f :: * -> *) a. Applicative f => a -> f a
pure (forall a b. a -> Either a b
Left RelationalError
err)
              Right ((DisconnectedTransaction
discon', TransactionGraph
graph'), [SessionId]
transactionIdsAdded) ->
                forall (f :: * -> *) a. Applicative f => a -> f a
pure (forall a b. b -> Either a b
Right (DisconnectedTransaction
discon', TransactionGraph
graph', [SessionId]
transactionIdsAdded))
autoMergeToHead SessionId
sessionId conn :: Connection
conn@(RemoteConnection RemoteConnectionConf
_) MergeStrategy
strat SchemaName
headName' = forall a b. (Serialise a, Serialise b) => Connection -> a -> IO b
remoteCall Connection
conn (SessionId -> MergeStrategy -> SchemaName -> ExecuteAutoMergeToHead
ExecuteAutoMergeToHead SessionId
sessionId MergeStrategy
strat SchemaName
headName')
      
-- | Execute a database context IO-monad-based expression for the given session and connection. `DatabaseContextIOExpr`s modify the DatabaseContext but cannot be purely implemented.
--this is almost completely identical to executeDatabaseContextExpr above
executeDatabaseContextIOExpr :: SessionId -> Connection -> DatabaseContextIOExpr -> IO (Either RelationalError ())
executeDatabaseContextIOExpr :: SessionId
-> Connection
-> DatabaseContextIOExpr
-> IO (Either RelationalError ())
executeDatabaseContextIOExpr SessionId
sessionId (InProcessConnection InProcessConnectionConf
conf) DatabaseContextIOExpr
expr = forall a.
IO (Either RelationalError a) -> IO (Either RelationalError a)
excEither forall a b. (a -> b) -> a -> b
$ do
  let sessions :: Map SessionId Session
sessions = InProcessConnectionConf -> Map SessionId Session
ipSessions InProcessConnectionConf
conf
      scriptSession :: Maybe ScriptSession
scriptSession = InProcessConnectionConf -> Maybe ScriptSession
ipScriptSession InProcessConnectionConf
conf
  Either RelationalError Session
eSession <- forall a. STM a -> IO a
atomically forall a b. (a -> b) -> a -> b
$ SessionId
-> Map SessionId Session -> STM (Either RelationalError Session)
sessionForSessionId SessionId
sessionId Map SessionId Session
sessions --potentially race condition due to interleaved IO?
  case Either RelationalError Session
eSession of
    Left RelationalError
err -> forall (f :: * -> *) a. Applicative f => a -> f a
pure (forall a b. a -> Either a b
Left RelationalError
err)
    Right Session
session -> do
      TransactionGraph
graph <- forall a. TVar a -> IO a
readTVarIO (InProcessConnectionConf -> TVar TransactionGraph
ipTransactionGraph InProcessConnectionConf
conf)
      let env :: DatabaseContextIOEvalEnv
env = SessionId
-> TransactionGraph
-> Maybe ScriptSession
-> Maybe [Char]
-> DatabaseContextIOEvalEnv
RE.DatabaseContextIOEvalEnv SessionId
transId TransactionGraph
graph Maybe ScriptSession
scriptSession Maybe [Char]
objFilesPath
          objFilesPath :: Maybe [Char]
objFilesPath = ShowS
objectFilesPath forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> PersistenceStrategy -> Maybe [Char]
persistenceDirectory (InProcessConnectionConf -> PersistenceStrategy
ipPersistenceStrategy InProcessConnectionConf
conf)
          transId :: SessionId
transId = Session -> SessionId
Sess.parentId Session
session
          context :: DatabaseContext
context = Session -> DatabaseContext
Sess.concreteDatabaseContext Session
session
      Either RelationalError DatabaseContextEvalState
res <- DatabaseContextIOEvalEnv
-> DatabaseContext
-> DatabaseContextIOEvalMonad (Either RelationalError ())
-> IO (Either RelationalError DatabaseContextEvalState)
RE.runDatabaseContextIOEvalMonad DatabaseContextIOEvalEnv
env DatabaseContext
context (DatabaseContextIOExpr
-> DatabaseContextIOEvalMonad (Either RelationalError ())
optimizeAndEvalDatabaseContextIOExpr DatabaseContextIOExpr
expr)
      case Either RelationalError DatabaseContextEvalState
res of
        Left RelationalError
err -> forall (f :: * -> *) a. Applicative f => a -> f a
pure (forall a b. a -> Either a b
Left RelationalError
err)
        Right DatabaseContextEvalState
newState -> do
          let newDiscon :: DisconnectedTransaction
newDiscon = SessionId -> Schemas -> Bool -> DisconnectedTransaction
DisconnectedTransaction (Session -> SessionId
Sess.parentId Session
session) Schemas
newSchemas Bool
False
              newSchemas :: Schemas
newSchemas = DatabaseContext -> Subschemas -> Schemas
Schemas DatabaseContext
context' (Session -> Subschemas
Sess.subschemas Session
session)
              newSession :: Session
newSession = DisconnectedTransaction -> SchemaName -> Session
Session DisconnectedTransaction
newDiscon (Session -> SchemaName
Sess.schemaName Session
session)
              context' :: DatabaseContext
context' = DatabaseContextEvalState -> DatabaseContext
RE.dbc_context DatabaseContextEvalState
newState
          forall a. STM a -> IO a
atomically forall a b. (a -> b) -> a -> b
$ forall key value.
Hashable key =>
value -> key -> Map key value -> STM ()
StmMap.insert Session
newSession SessionId
sessionId Map SessionId Session
sessions
          forall (f :: * -> *) a. Applicative f => a -> f a
pure (forall a b. b -> Either a b
Right ())
executeDatabaseContextIOExpr SessionId
sessionId conn :: Connection
conn@(RemoteConnection RemoteConnectionConf
_) DatabaseContextIOExpr
dbExpr = forall a b. (Serialise a, Serialise b) => Connection -> a -> IO b
remoteCall Connection
conn (SessionId -> DatabaseContextIOExpr -> ExecuteDatabaseContextIOExpr
ExecuteDatabaseContextIOExpr SessionId
sessionId DatabaseContextIOExpr
dbExpr)
         
-- process notifications for commits
executeCommitExprSTM_
  :: TransactionGraph
  -> DatabaseContext
  -> DatabaseContext
  -> ClientNodes
  -> STM (EvaluatedNotifications, ClientNodes)
executeCommitExprSTM_ :: TransactionGraph
-> DatabaseContext
-> DatabaseContext
-> Set ClientInfo
-> STM (EvaluatedNotifications, Set ClientInfo)
executeCommitExprSTM_ TransactionGraph
graph DatabaseContext
oldContext DatabaseContext
newContext Set ClientInfo
nodes = do
  let nots :: Notifications
nots = DatabaseContext -> Notifications
notifications DatabaseContext
oldContext
      fireNots :: Notifications
fireNots = Notifications
-> TransactionGraph
-> DatabaseContext
-> DatabaseContext
-> Notifications
notificationChanges Notifications
nots TransactionGraph
graph DatabaseContext
oldContext DatabaseContext
newContext 
      evaldNots :: EvaluatedNotifications
evaldNots = forall a b k. (a -> b) -> Map k a -> Map k b
M.map Notification -> EvaluatedNotification
mkEvaldNot Notifications
fireNots
      evalInContext :: RelationalExpr
-> DatabaseContext -> Either RelationalError Relation
evalInContext RelationalExpr
expr DatabaseContext
ctx = RelationalExprEnv
-> RelationalExpr -> Either RelationalError Relation
optimizeAndEvalRelationalExpr (DatabaseContext -> TransactionGraph -> RelationalExprEnv
RE.mkRelationalExprEnv DatabaseContext
ctx TransactionGraph
graph) RelationalExpr
expr
 
      mkEvaldNot :: Notification -> EvaluatedNotification
mkEvaldNot Notification
notif = EvaluatedNotification { notification :: Notification
notification = Notification
notif, 
                                                 reportOldRelation :: Either RelationalError Relation
reportOldRelation = RelationalExpr
-> DatabaseContext -> Either RelationalError Relation
evalInContext (Notification -> RelationalExpr
reportOldExpr Notification
notif) DatabaseContext
oldContext,
                                                 reportNewRelation :: Either RelationalError Relation
reportNewRelation = RelationalExpr
-> DatabaseContext -> Either RelationalError Relation
evalInContext (Notification -> RelationalExpr
reportNewExpr Notification
notif) DatabaseContext
newContext}
  forall (f :: * -> *) a. Applicative f => a -> f a
pure (EvaluatedNotifications
evaldNots, Set ClientInfo
nodes)
  
-- | Execute a transaction graph expression in the context of the session and connection. Transaction graph operators modify the transaction graph state.

-- OPTIMIZATION OPPORTUNITY: no locks are required to write new transaction data, only to update the transaction graph id file
-- if writing data is re-entrant, we may be able to use unsafeIOtoSTM
-- perhaps keep hash of data file instead of checking if our head was updated on every write
executeGraphExpr :: SessionId -> Connection -> TransactionGraphOperator -> IO (Either RelationalError ())
executeGraphExpr :: SessionId
-> Connection
-> TransactionGraphOperator
-> IO (Either RelationalError ())
executeGraphExpr SessionId
sessionId (InProcessConnection InProcessConnectionConf
conf) TransactionGraphOperator
graphExpr = forall a.
IO (Either RelationalError a) -> IO (Either RelationalError a)
excEither forall a b. (a -> b) -> a -> b
$ do
  let sessions :: Map SessionId Session
sessions = InProcessConnectionConf -> Map SessionId Session
ipSessions InProcessConnectionConf
conf
  SessionId
freshId <- IO SessionId
nextRandom
  UTCTime
tstamp <- IO UTCTime
getCurrentTime
  SessionId
-> InProcessConnectionConf
-> (TransactionGraph
    -> STM
         (Either
            RelationalError
            (DisconnectedTransaction, TransactionGraph, [SessionId])))
-> IO (Either RelationalError ())
commitLock_ SessionId
sessionId InProcessConnectionConf
conf forall a b. (a -> b) -> a -> b
$ \TransactionGraph
updatedGraph -> do
    Either RelationalError Session
eSession <- SessionId
-> Map SessionId Session -> STM (Either RelationalError Session)
sessionForSessionId SessionId
sessionId Map SessionId Session
sessions
    case Either RelationalError Session
eSession of
      Left RelationalError
err -> forall (f :: * -> *) a. Applicative f => a -> f a
pure (forall a b. a -> Either a b
Left RelationalError
err)
      Right Session
session -> do
        let discon :: DisconnectedTransaction
discon = Session -> DisconnectedTransaction
Sess.disconnectedTransaction Session
session
        case UTCTime
-> SessionId
-> DisconnectedTransaction
-> TransactionGraph
-> TransactionGraphOperator
-> Either
     RelationalError (DisconnectedTransaction, TransactionGraph)
evalGraphOp UTCTime
tstamp SessionId
freshId DisconnectedTransaction
discon TransactionGraph
updatedGraph TransactionGraphOperator
graphExpr of
          Left RelationalError
err -> forall (f :: * -> *) a. Applicative f => a -> f a
pure (forall a b. a -> Either a b
Left RelationalError
err)
          Right (DisconnectedTransaction
discon', TransactionGraph
graph') -> do
            --if freshId appears in the graph, then we need to pass it on
            let transIds :: [SessionId]
transIds = [SessionId
freshId | forall a b. Either a b -> Bool
isRight (SessionId -> TransactionGraph -> Either RelationalError Transaction
RE.transactionForId SessionId
freshId TransactionGraph
graph')]
            forall (f :: * -> *) a. Applicative f => a -> f a
pure (forall a b. b -> Either a b
Right (DisconnectedTransaction
discon', TransactionGraph
graph', [SessionId]
transIds))

executeGraphExpr SessionId
sessionId conn :: Connection
conn@(RemoteConnection RemoteConnectionConf
_) TransactionGraphOperator
graphExpr = forall a b. (Serialise a, Serialise b) => Connection -> a -> IO b
remoteCall Connection
conn (SessionId -> TransactionGraphOperator -> ExecuteGraphExpr
ExecuteGraphExpr SessionId
sessionId TransactionGraphOperator
graphExpr)

-- | A trans-graph expression is a relational query executed against the entirety of a transaction graph.
executeTransGraphRelationalExpr :: SessionId -> Connection -> TransGraphRelationalExpr -> IO (Either RelationalError Relation)
executeTransGraphRelationalExpr :: SessionId
-> Connection
-> TransGraphRelationalExpr
-> IO (Either RelationalError Relation)
executeTransGraphRelationalExpr SessionId
_ (InProcessConnection InProcessConnectionConf
conf) TransGraphRelationalExpr
tgraphExpr = forall a.
IO (Either RelationalError a) -> IO (Either RelationalError a)
excEither forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall a. STM a -> IO a
atomically forall a b. (a -> b) -> a -> b
$ do
  let graphTvar :: TVar TransactionGraph
graphTvar = InProcessConnectionConf -> TVar TransactionGraph
ipTransactionGraph InProcessConnectionConf
conf
  TransactionGraph
graph <- forall a. TVar a -> STM a
readTVar TVar TransactionGraph
graphTvar
  forall (f :: * -> *) a. Applicative f => a -> f a
pure forall a b. (a -> b) -> a -> b
$ forall a. NFData a => a -> a
force forall a b. (a -> b) -> a -> b
$ TransactionGraph
-> TransGraphRelationalExpr -> Either RelationalError Relation
optimizeAndEvalTransGraphRelationalExpr TransactionGraph
graph TransGraphRelationalExpr
tgraphExpr
executeTransGraphRelationalExpr SessionId
sessionId conn :: Connection
conn@(RemoteConnection RemoteConnectionConf
_) TransGraphRelationalExpr
tgraphExpr = forall a b. (Serialise a, Serialise b) => Connection -> a -> IO b
remoteCall Connection
conn (SessionId
-> TransGraphRelationalExpr -> ExecuteTransGraphRelationalExpr
ExecuteTransGraphRelationalExpr SessionId
sessionId TransGraphRelationalExpr
tgraphExpr)  

-- | Schema expressions manipulate the isomorphic schemas for the current 'DatabaseContext'.
executeSchemaExpr :: SessionId -> Connection -> Schema.SchemaExpr -> IO (Either RelationalError ())
executeSchemaExpr :: SessionId
-> Connection -> SchemaExpr -> IO (Either RelationalError ())
executeSchemaExpr SessionId
sessionId (InProcessConnection InProcessConnectionConf
conf) SchemaExpr
schemaExpr = forall a. STM a -> IO a
atomically forall a b. (a -> b) -> a -> b
$ do
  let sessions :: Map SessionId Session
sessions = InProcessConnectionConf -> Map SessionId Session
ipSessions InProcessConnectionConf
conf
  Either RelationalError (Session, Schema)
eSession <- SessionId
-> Map SessionId Session
-> STM (Either RelationalError (Session, Schema))
sessionAndSchema SessionId
sessionId Map SessionId Session
sessions  
  case Either RelationalError (Session, Schema)
eSession of
    Left RelationalError
err -> forall (f :: * -> *) a. Applicative f => a -> f a
pure (forall a b. a -> Either a b
Left RelationalError
err)
    Right (Session
session, Schema
_) -> do
      let subschemas' :: Subschemas
subschemas' = Session -> Subschemas
subschemas Session
session
          transId :: SessionId
transId = Session -> SessionId
Sess.parentId Session
session
          context :: DatabaseContext
context = Session -> DatabaseContext
Sess.concreteDatabaseContext Session
session
      TransactionGraph
graph <- forall a. TVar a -> STM a
readTVar (InProcessConnectionConf -> TVar TransactionGraph
ipTransactionGraph InProcessConnectionConf
conf)
      case SchemaExpr
-> DatabaseContext
-> SessionId
-> TransactionGraph
-> Subschemas
-> Either RelationalError (Subschemas, DatabaseContext)
Schema.evalSchemaExpr SchemaExpr
schemaExpr DatabaseContext
context SessionId
transId TransactionGraph
graph Subschemas
subschemas' of
        Left RelationalError
err -> forall (f :: * -> *) a. Applicative f => a -> f a
pure (forall a b. a -> Either a b
Left RelationalError
err)
        Right (Subschemas
newSubschemas, DatabaseContext
newContext) -> do
          --hm- maybe we should start using lenses
          let discon :: DisconnectedTransaction
discon = Session -> DisconnectedTransaction
Sess.disconnectedTransaction Session
session 
              newSchemas :: Schemas
newSchemas = DatabaseContext -> Subschemas -> Schemas
Schemas DatabaseContext
newContext Subschemas
newSubschemas
              newSession :: Session
newSession = DisconnectedTransaction -> SchemaName -> Session
Session (SessionId -> Schemas -> Bool -> DisconnectedTransaction
DisconnectedTransaction (DisconnectedTransaction -> SessionId
Discon.parentId DisconnectedTransaction
discon) Schemas
newSchemas Bool
False) (Session -> SchemaName
Sess.schemaName Session
session)
          forall key value.
Hashable key =>
value -> key -> Map key value -> STM ()
StmMap.insert Session
newSession SessionId
sessionId Map SessionId Session
sessions
          forall (f :: * -> *) a. Applicative f => a -> f a
pure (forall a b. b -> Either a b
Right ())
executeSchemaExpr SessionId
sessionId conn :: Connection
conn@(RemoteConnection RemoteConnectionConf
_) SchemaExpr
schemaExpr = forall a b. (Serialise a, Serialise b) => Connection -> a -> IO b
remoteCall Connection
conn (SessionId -> SchemaExpr -> ExecuteSchemaExpr
ExecuteSchemaExpr SessionId
sessionId SchemaExpr
schemaExpr)          

-- | After modifying a 'DatabaseContext', 'commit' the transaction to the transaction graph at the head which the session is referencing. This will also trigger checks for any notifications which need to be propagated.
commit :: SessionId -> Connection -> IO (Either RelationalError ())
commit :: SessionId -> Connection -> IO (Either RelationalError ())
commit SessionId
sessionId conn :: Connection
conn@(InProcessConnection InProcessConnectionConf
_) = SessionId
-> Connection
-> TransactionGraphOperator
-> IO (Either RelationalError ())
executeGraphExpr SessionId
sessionId Connection
conn TransactionGraphOperator
Commit 
commit SessionId
sessionId conn :: Connection
conn@(RemoteConnection RemoteConnectionConf
_) = forall a b. (Serialise a, Serialise b) => Connection -> a -> IO b
remoteCall Connection
conn (SessionId -> TransactionGraphOperator -> ExecuteGraphExpr
ExecuteGraphExpr SessionId
sessionId TransactionGraphOperator
Commit)

sendNotifications :: [ClientInfo] -> EvaluatedNotifications -> IO ()
sendNotifications :: [ClientInfo] -> EvaluatedNotifications -> IO ()
sendNotifications [ClientInfo]
clients EvaluatedNotifications
notifs =
  forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
unless (forall k a. Map k a -> Bool
M.null EvaluatedNotifications
notifs) forall a b. (a -> b) -> a -> b
$ forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
t a -> (a -> m b) -> m ()
forM_ [ClientInfo]
clients ClientInfo -> IO ()
sender
 where
  sender :: ClientInfo -> IO ()
sender (RemoteClientInfo Locking Socket
sock) = forall a. Serialise a => Locking Socket -> a -> IO ()
RPC.sendMessage Locking Socket
sock (EvaluatedNotifications -> NotificationMessage
NotificationMessage EvaluatedNotifications
notifs)
  sender (InProcessClientInfo MVar EvaluatedNotifications
tvar) = forall a. MVar a -> a -> IO ()
putMVar MVar EvaluatedNotifications
tvar EvaluatedNotifications
notifs

-- | Discard any changes made in the current 'Session' and 'DatabaseContext'. This resets the disconnected transaction to reference the original database context of the parent transaction and is a very cheap operation.
rollback :: SessionId -> Connection -> IO (Either RelationalError ())
rollback :: SessionId -> Connection -> IO (Either RelationalError ())
rollback SessionId
sessionId conn :: Connection
conn@(InProcessConnection InProcessConnectionConf
_) = SessionId
-> Connection
-> TransactionGraphOperator
-> IO (Either RelationalError ())
executeGraphExpr SessionId
sessionId Connection
conn TransactionGraphOperator
Rollback      
rollback SessionId
sessionId conn :: Connection
conn@(RemoteConnection RemoteConnectionConf
_) = forall a b. (Serialise a, Serialise b) => Connection -> a -> IO b
remoteCall Connection
conn (SessionId -> TransactionGraphOperator -> ExecuteGraphExpr
ExecuteGraphExpr SessionId
sessionId TransactionGraphOperator
Rollback)

-- | Write the transaction graph to disk. This function can be used to incrementally write new transactions to disk.
processTransactionGraphPersistence :: PersistenceStrategy -> [TransactionId] -> TransactionGraph -> IO ()
processTransactionGraphPersistence :: PersistenceStrategy -> [SessionId] -> TransactionGraph -> IO ()
processTransactionGraphPersistence PersistenceStrategy
NoPersistence [SessionId]
_ TransactionGraph
_ = forall (f :: * -> *) a. Applicative f => a -> f a
pure ()
processTransactionGraphPersistence (MinimalPersistence [Char]
dbdir) [SessionId]
transIds TransactionGraph
graph = forall (f :: * -> *) a. Functor f => f a -> f ()
void forall a b. (a -> b) -> a -> b
$ DiskSync
-> [Char] -> [SessionId] -> TransactionGraph -> IO LockFileHash
transactionGraphPersist DiskSync
NoDiskSync [Char]
dbdir [SessionId]
transIds TransactionGraph
graph
processTransactionGraphPersistence (CrashSafePersistence [Char]
dbdir) [SessionId]
transIds TransactionGraph
graph = forall (f :: * -> *) a. Functor f => f a -> f ()
void forall a b. (a -> b) -> a -> b
$ DiskSync
-> [Char] -> [SessionId] -> TransactionGraph -> IO LockFileHash
transactionGraphPersist DiskSync
FsyncDiskSync [Char]
dbdir [SessionId]
transIds TransactionGraph
graph

readGraphTransactionIdDigest :: PersistenceStrategy -> IO LockFileHash
readGraphTransactionIdDigest :: PersistenceStrategy -> IO LockFileHash
readGraphTransactionIdDigest PersistenceStrategy
NoPersistence = forall a. HasCallStack => [Char] -> a
error [Char]
"attempt to read digest from transaction log without persistence enabled"
readGraphTransactionIdDigest (MinimalPersistence [Char]
dbdir) = [Char] -> IO LockFileHash
readGraphTransactionIdFileDigest [Char]
dbdir 
readGraphTransactionIdDigest (CrashSafePersistence [Char]
dbdir) = [Char] -> IO LockFileHash
readGraphTransactionIdFileDigest [Char]
dbdir 

-- | Return a relation whose type would match that of the relational expression if it were executed. This is useful for checking types and validating a relational expression's types.
typeForRelationalExpr :: SessionId -> Connection -> RelationalExpr -> IO (Either RelationalError Relation)
typeForRelationalExpr :: SessionId
-> Connection
-> RelationalExpr
-> IO (Either RelationalError Relation)
typeForRelationalExpr SessionId
sessionId conn :: Connection
conn@(InProcessConnection InProcessConnectionConf
_) RelationalExpr
relExpr = forall a. STM a -> IO a
atomically forall a b. (a -> b) -> a -> b
$ SessionId
-> Connection
-> RelationalExpr
-> STM (Either RelationalError Relation)
typeForRelationalExprSTM SessionId
sessionId Connection
conn RelationalExpr
relExpr
typeForRelationalExpr SessionId
sessionId conn :: Connection
conn@(RemoteConnection RemoteConnectionConf
_) RelationalExpr
relExpr = forall a b. (Serialise a, Serialise b) => Connection -> a -> IO b
remoteCall Connection
conn (SessionId -> RelationalExpr -> ExecuteTypeForRelationalExpr
ExecuteTypeForRelationalExpr SessionId
sessionId RelationalExpr
relExpr)
    
typeForRelationalExprSTM :: SessionId -> Connection -> RelationalExpr -> STM (Either RelationalError Relation)    
typeForRelationalExprSTM :: SessionId
-> Connection
-> RelationalExpr
-> STM (Either RelationalError Relation)
typeForRelationalExprSTM SessionId
sessionId (InProcessConnection InProcessConnectionConf
conf) RelationalExpr
relExpr = do
  let sessions :: Map SessionId Session
sessions = InProcessConnectionConf -> Map SessionId Session
ipSessions InProcessConnectionConf
conf
  Either RelationalError (Session, Schema)
eSession <- SessionId
-> Map SessionId Session
-> STM (Either RelationalError (Session, Schema))
sessionAndSchema SessionId
sessionId Map SessionId Session
sessions
  case Either RelationalError (Session, Schema)
eSession of
    Left RelationalError
err -> forall (f :: * -> *) a. Applicative f => a -> f a
pure forall a b. (a -> b) -> a -> b
$ forall a b. a -> Either a b
Left RelationalError
err
    Right (Session
session, Schema
schema) -> do
      let processed :: Either RelationalError RelationalExpr
processed = if Session -> SchemaName
schemaName Session
session forall a. Eq a => a -> a -> Bool
== SchemaName
defaultSchemaName then
                       forall a b. b -> Either a b
Right RelationalExpr
relExpr
                     else
                       Schema -> RelationalExpr -> Either RelationalError RelationalExpr
Schema.processRelationalExprInSchema Schema
schema RelationalExpr
relExpr
      case Either RelationalError RelationalExpr
processed of
        Left RelationalError
err -> forall (f :: * -> *) a. Applicative f => a -> f a
pure (forall a b. a -> Either a b
Left RelationalError
err)
        Right RelationalExpr
relExpr' -> do
          TransactionGraph
graph <- forall a. TVar a -> STM a
readTVar (InProcessConnectionConf -> TVar TransactionGraph
ipTransactionGraph InProcessConnectionConf
conf)          
          let reEnv :: RelationalExprEnv
reEnv = DatabaseContext -> TransactionGraph -> RelationalExprEnv
RE.mkRelationalExprEnv (Session -> DatabaseContext
Sess.concreteDatabaseContext Session
session) TransactionGraph
graph
          forall (f :: * -> *) a. Applicative f => a -> f a
pure forall a b. (a -> b) -> a -> b
$ forall a.
RelationalExprEnv -> RelationalExprM a -> Either RelationalError a
RE.runRelationalExprM RelationalExprEnv
reEnv (RelationalExpr -> RelationalExprM Relation
RE.typeForRelationalExpr RelationalExpr
relExpr') 
    
typeForRelationalExprSTM SessionId
_ Connection
_ RelationalExpr
_ = forall a. HasCallStack => [Char] -> a
error [Char]
"typeForRelationalExprSTM called on non-local connection"

-- | Return a 'Map' of the database's constraints at the context of the session and connection.
inclusionDependencies :: SessionId -> Connection -> IO (Either RelationalError InclusionDependencies)
inclusionDependencies :: SessionId
-> Connection -> IO (Either RelationalError InclusionDependencies)
inclusionDependencies SessionId
sessionId (InProcessConnection InProcessConnectionConf
conf) = do
  let sessions :: Map SessionId Session
sessions = InProcessConnectionConf -> Map SessionId Session
ipSessions InProcessConnectionConf
conf
  forall a. STM a -> IO a
atomically forall a b. (a -> b) -> a -> b
$ do
    Either RelationalError (Session, Schema)
eSession <- SessionId
-> Map SessionId Session
-> STM (Either RelationalError (Session, Schema))
sessionAndSchema SessionId
sessionId Map SessionId Session
sessions
    case Either RelationalError (Session, Schema)
eSession of
      Left RelationalError
err -> forall (f :: * -> *) a. Applicative f => a -> f a
pure forall a b. (a -> b) -> a -> b
$ forall a b. a -> Either a b
Left RelationalError
err 
      Right (Session
session, Schema
schema) -> do
            let context :: DatabaseContext
context = Session -> DatabaseContext
Sess.concreteDatabaseContext Session
session
            if Session -> SchemaName
schemaName Session
session forall a. Eq a => a -> a -> Bool
== SchemaName
defaultSchemaName then
              forall (f :: * -> *) a. Applicative f => a -> f a
pure forall a b. (a -> b) -> a -> b
$ forall a b. b -> Either a b
Right (DatabaseContext -> InclusionDependencies
B.inclusionDependencies DatabaseContext
context)
              else
              forall (f :: * -> *) a. Applicative f => a -> f a
pure (Schema
-> InclusionDependencies
-> Either RelationalError InclusionDependencies
Schema.inclusionDependenciesInSchema Schema
schema (DatabaseContext -> InclusionDependencies
B.inclusionDependencies DatabaseContext
context))

inclusionDependencies SessionId
sessionId conn :: Connection
conn@(RemoteConnection RemoteConnectionConf
_) = forall a b. (Serialise a, Serialise b) => Connection -> a -> IO b
remoteCall Connection
conn (SessionId -> RetrieveInclusionDependencies
RetrieveInclusionDependencies SessionId
sessionId)

typeConstructorMapping :: SessionId -> Connection -> IO (Either RelationalError TypeConstructorMapping)
typeConstructorMapping :: SessionId
-> Connection -> IO (Either RelationalError TypeConstructorMapping)
typeConstructorMapping SessionId
sessionId (InProcessConnection InProcessConnectionConf
conf) = do
  let sessions :: Map SessionId Session
sessions = InProcessConnectionConf -> Map SessionId Session
ipSessions InProcessConnectionConf
conf
  forall a. STM a -> IO a
atomically forall a b. (a -> b) -> a -> b
$ do
    Either RelationalError (Session, Schema)
eSession <- SessionId
-> Map SessionId Session
-> STM (Either RelationalError (Session, Schema))
sessionAndSchema SessionId
sessionId Map SessionId Session
sessions
    case Either RelationalError (Session, Schema)
eSession of
      Left RelationalError
err -> forall (f :: * -> *) a. Applicative f => a -> f a
pure forall a b. (a -> b) -> a -> b
$ forall a b. a -> Either a b
Left RelationalError
err 
      Right (Session
session, Schema
_) -> --warning, no schema support for typeconstructors
        forall (f :: * -> *) a. Applicative f => a -> f a
pure (forall a b. b -> Either a b
Right (DatabaseContext -> TypeConstructorMapping
B.typeConstructorMapping (Session -> DatabaseContext
Sess.concreteDatabaseContext Session
session)))
typeConstructorMapping SessionId
sessionId conn :: Connection
conn@(RemoteConnection RemoteConnectionConf
_) = forall a b. (Serialise a, Serialise b) => Connection -> a -> IO b
remoteCall Connection
conn (SessionId -> RetrieveTypeConstructorMapping
RetrieveTypeConstructorMapping SessionId
sessionId)
  
-- | Return an optimized database expression which is logically equivalent to the input database expression. This function can be used to determine which expression will actually be evaluated.
planForDatabaseContextExpr :: SessionId -> Connection -> DatabaseContextExpr -> IO (Either RelationalError GraphRefDatabaseContextExpr)  
planForDatabaseContextExpr :: SessionId
-> Connection
-> DatabaseContextExpr
-> IO (Either RelationalError GraphRefDatabaseContextExpr)
planForDatabaseContextExpr SessionId
sessionId (InProcessConnection InProcessConnectionConf
conf) DatabaseContextExpr
dbExpr = do
  let sessions :: Map SessionId Session
sessions = InProcessConnectionConf -> Map SessionId Session
ipSessions InProcessConnectionConf
conf
  forall a. STM a -> IO a
atomically forall a b. (a -> b) -> a -> b
$ do
    TransactionGraph
graph <- forall a. TVar a -> STM a
readTVar (InProcessConnectionConf -> TVar TransactionGraph
ipTransactionGraph InProcessConnectionConf
conf)    
    Either RelationalError (Session, Schema)
eSession <- SessionId
-> Map SessionId Session
-> STM (Either RelationalError (Session, Schema))
sessionAndSchema SessionId
sessionId Map SessionId Session
sessions
    case Either RelationalError (Session, Schema)
eSession of
      Left RelationalError
err -> forall (f :: * -> *) a. Applicative f => a -> f a
pure forall a b. (a -> b) -> a -> b
$ forall a b. a -> Either a b
Left RelationalError
err 
      Right (Session
session, Schema
_) ->
        if Session -> SchemaName
schemaName Session
session forall a. Eq a => a -> a -> Bool
== SchemaName
defaultSchemaName then do
          let ctx :: DatabaseContext
ctx = Session -> DatabaseContext
Sess.concreteDatabaseContext Session
session
              transId :: SessionId
transId = Session -> SessionId
Sess.parentId Session
session
              gfExpr :: GraphRefDatabaseContextExpr
gfExpr = forall a. GraphRefTransactionMarker -> ProcessExprM a -> a
runProcessExprM GraphRefTransactionMarker
UncommittedContextMarker (DatabaseContextExpr -> ProcessExprM GraphRefDatabaseContextExpr
processDatabaseContextExpr DatabaseContextExpr
dbExpr)
          forall (f :: * -> *) a. Applicative f => a -> f a
pure forall a b. (a -> b) -> a -> b
$ forall a.
SessionId
-> DatabaseContext
-> TransactionGraph
-> GraphRefSOptDatabaseContextExprM a
-> Either RelationalError a
runGraphRefSOptDatabaseContextExprM SessionId
transId DatabaseContext
ctx TransactionGraph
graph (GraphRefDatabaseContextExpr
-> GraphRefSOptDatabaseContextExprM GraphRefDatabaseContextExpr
optimizeGraphRefDatabaseContextExpr GraphRefDatabaseContextExpr
gfExpr)
        else -- don't show any optimization because the current optimization infrastructure relies on access to the base context- this probably underscores the need for each schema to have its own DatabaseContext, even if it is generated on-the-fly-}
          forall (f :: * -> *) a. Applicative f => a -> f a
pure (forall a b. a -> Either a b
Left RelationalError
NonConcreteSchemaPlanError)

planForDatabaseContextExpr SessionId
sessionId conn :: Connection
conn@(RemoteConnection RemoteConnectionConf
_) DatabaseContextExpr
dbExpr = forall a b. (Serialise a, Serialise b) => Connection -> a -> IO b
remoteCall Connection
conn (SessionId
-> DatabaseContextExpr -> RetrievePlanForDatabaseContextExpr
RetrievePlanForDatabaseContextExpr SessionId
sessionId DatabaseContextExpr
dbExpr)
             
-- | Return a relation which represents the current state of the global transaction graph. The attributes are 
--    * current- boolean attribute representing whether or not the current session references this transaction
--    * head- text attribute which is a non-empty 'HeadName' iff the transaction references a head.
--    * id- id attribute of the transaction
--    * parents- a relation-valued attribute which contains a relation of transaction ids which are parent transaction to the transaction
transactionGraphAsRelation :: SessionId -> Connection -> IO (Either RelationalError Relation)
transactionGraphAsRelation :: SessionId -> Connection -> IO (Either RelationalError Relation)
transactionGraphAsRelation SessionId
sessionId (InProcessConnection InProcessConnectionConf
conf) = do
  let sessions :: Map SessionId Session
sessions = InProcessConnectionConf -> Map SessionId Session
ipSessions InProcessConnectionConf
conf
      tvar :: TVar TransactionGraph
tvar = InProcessConnectionConf -> TVar TransactionGraph
ipTransactionGraph InProcessConnectionConf
conf
  forall a. STM a -> IO a
atomically forall a b. (a -> b) -> a -> b
$ do
    Either RelationalError Session
eSession <- SessionId
-> Map SessionId Session -> STM (Either RelationalError Session)
sessionForSessionId SessionId
sessionId Map SessionId Session
sessions
    case Either RelationalError Session
eSession of
      Left RelationalError
err -> forall (f :: * -> *) a. Applicative f => a -> f a
pure forall a b. (a -> b) -> a -> b
$ forall a b. a -> Either a b
Left RelationalError
err
      Right Session
session ->
        DisconnectedTransaction
-> TransactionGraph -> Either RelationalError Relation
graphAsRelation (Session -> DisconnectedTransaction
Sess.disconnectedTransaction Session
session) forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> forall a. TVar a -> STM a
readTVar TVar TransactionGraph
tvar
    
transactionGraphAsRelation SessionId
sessionId conn :: Connection
conn@(RemoteConnection RemoteConnectionConf
_) = forall a b. (Serialise a, Serialise b) => Connection -> a -> IO b
remoteCall Connection
conn (SessionId -> RetrieveTransactionGraph
RetrieveTransactionGraph SessionId
sessionId) 

-- | Returns the names and types of the relation variables in the current 'Session'.
relationVariablesAsRelation :: SessionId -> Connection -> IO (Either RelationalError Relation)
relationVariablesAsRelation :: SessionId -> Connection -> IO (Either RelationalError Relation)
relationVariablesAsRelation SessionId
sessionId (InProcessConnection InProcessConnectionConf
conf) = do
  let sessions :: Map SessionId Session
sessions = InProcessConnectionConf -> Map SessionId Session
ipSessions InProcessConnectionConf
conf
  forall a. STM a -> IO a
atomically forall a b. (a -> b) -> a -> b
$ do
    TransactionGraph
graph <- forall a. TVar a -> STM a
readTVar (InProcessConnectionConf -> TVar TransactionGraph
ipTransactionGraph InProcessConnectionConf
conf)
    Either RelationalError (Session, Schema)
eSession <- SessionId
-> Map SessionId Session
-> STM (Either RelationalError (Session, Schema))
sessionAndSchema SessionId
sessionId Map SessionId Session
sessions
    case Either RelationalError (Session, Schema)
eSession of
      Left RelationalError
err -> forall (f :: * -> *) a. Applicative f => a -> f a
pure (forall a b. a -> Either a b
Left RelationalError
err)
      Right (Session
session, Schema
schema) -> do
        let context :: DatabaseContext
context = Session -> DatabaseContext
Sess.concreteDatabaseContext Session
session
        forall (f :: * -> *) a. Applicative f => a -> f a
pure forall a b. (a -> b) -> a -> b
$ DatabaseContext
-> Schema -> TransactionGraph -> Either RelationalError Relation
Schema.relationVariablesAsRelationInSchema DatabaseContext
context Schema
schema TransactionGraph
graph
      
relationVariablesAsRelation SessionId
sessionId conn :: Connection
conn@(RemoteConnection RemoteConnectionConf
_) = forall a b. (Serialise a, Serialise b) => Connection -> a -> IO b
remoteCall Connection
conn (SessionId -> RetrieveRelationVariableSummary
RetrieveRelationVariableSummary SessionId
sessionId)

-- | Returns a relation representing the complete DDL of the current `DatabaseContext`.
ddlAsRelation :: SessionId -> Connection -> IO (Either RelationalError Relation)
ddlAsRelation :: SessionId -> Connection -> IO (Either RelationalError Relation)
ddlAsRelation SessionId
sessionId (InProcessConnection InProcessConnectionConf
conf) = do
  let sessions :: Map SessionId Session
sessions = InProcessConnectionConf -> Map SessionId Session
ipSessions InProcessConnectionConf
conf
  forall a. STM a -> IO a
atomically forall a b. (a -> b) -> a -> b
$ do
    TransactionGraph
graph <- forall a. TVar a -> STM a
readTVar (InProcessConnectionConf -> TVar TransactionGraph
ipTransactionGraph InProcessConnectionConf
conf)
    Either RelationalError (Session, Schema)
eSession <- SessionId
-> Map SessionId Session
-> STM (Either RelationalError (Session, Schema))
sessionAndSchema SessionId
sessionId Map SessionId Session
sessions
    case Either RelationalError (Session, Schema)
eSession of
      Left RelationalError
err -> forall (f :: * -> *) a. Applicative f => a -> f a
pure (forall a b. a -> Either a b
Left RelationalError
err)
      Right (Session
session, Schema
schema) -> do
        let context :: DatabaseContext
context = Session -> DatabaseContext
Sess.concreteDatabaseContext Session
session
        forall (f :: * -> *) a. Applicative f => a -> f a
pure (Schema
-> DatabaseContext
-> TransactionGraph
-> Either RelationalError Relation
ddlType Schema
schema DatabaseContext
context TransactionGraph
graph)
ddlAsRelation SessionId
sessionId conn :: Connection
conn@RemoteConnection{} = forall a b. (Serialise a, Serialise b) => Connection -> a -> IO b
remoteCall Connection
conn (SessionId -> RetrieveDDLAsRelation
RetrieveDDLAsRelation SessionId
sessionId)

-- | Returns the names and types of the atom functions in the current 'Session'.
atomFunctionsAsRelation :: SessionId -> Connection -> IO (Either RelationalError Relation)
atomFunctionsAsRelation :: SessionId -> Connection -> IO (Either RelationalError Relation)
atomFunctionsAsRelation SessionId
sessionId (InProcessConnection InProcessConnectionConf
conf) = do
  let sessions :: Map SessionId Session
sessions = InProcessConnectionConf -> Map SessionId Session
ipSessions InProcessConnectionConf
conf
  forall a. STM a -> IO a
atomically forall a b. (a -> b) -> a -> b
$ do
    Either RelationalError (Session, Schema)
eSession <- SessionId
-> Map SessionId Session
-> STM (Either RelationalError (Session, Schema))
sessionAndSchema SessionId
sessionId Map SessionId Session
sessions
    case Either RelationalError (Session, Schema)
eSession of
      Left RelationalError
err -> forall (f :: * -> *) a. Applicative f => a -> f a
pure (forall a b. a -> Either a b
Left RelationalError
err)
      Right (Session
session, Schema
_) -> 
        forall (f :: * -> *) a. Applicative f => a -> f a
pure (AtomFunctions -> Either RelationalError Relation
AF.atomFunctionsAsRelation (DatabaseContext -> AtomFunctions
atomFunctions (Session -> DatabaseContext
concreteDatabaseContext Session
session)))
        
atomFunctionsAsRelation SessionId
sessionId conn :: Connection
conn@(RemoteConnection RemoteConnectionConf
_) = forall a b. (Serialise a, Serialise b) => Connection -> a -> IO b
remoteCall Connection
conn (SessionId -> RetrieveAtomFunctionSummary
RetrieveAtomFunctionSummary SessionId
sessionId)        

databaseContextFunctionsAsRelation :: SessionId -> Connection -> IO (Either RelationalError Relation)
databaseContextFunctionsAsRelation :: SessionId -> Connection -> IO (Either RelationalError Relation)
databaseContextFunctionsAsRelation SessionId
sessionId (InProcessConnection InProcessConnectionConf
conf) = do
  let sessions :: Map SessionId Session
sessions = InProcessConnectionConf -> Map SessionId Session
ipSessions InProcessConnectionConf
conf
  forall a. STM a -> IO a
atomically forall a b. (a -> b) -> a -> b
$ do
    Either RelationalError (Session, Schema)
eSession <- SessionId
-> Map SessionId Session
-> STM (Either RelationalError (Session, Schema))
sessionAndSchema SessionId
sessionId Map SessionId Session
sessions
    case Either RelationalError (Session, Schema)
eSession of
      Left RelationalError
err -> forall (f :: * -> *) a. Applicative f => a -> f a
pure (forall a b. a -> Either a b
Left RelationalError
err)
      Right (Session
session, Schema
_) ->
        forall (f :: * -> *) a. Applicative f => a -> f a
pure (DatabaseContextFunctions -> Either RelationalError Relation
DCF.databaseContextFunctionsAsRelation (DatabaseContext -> DatabaseContextFunctions
dbcFunctions (Session -> DatabaseContext
concreteDatabaseContext Session
session)))

databaseContextFunctionsAsRelation SessionId
sessionId conn :: Connection
conn@(RemoteConnection RemoteConnectionConf
_) = forall a b. (Serialise a, Serialise b) => Connection -> a -> IO b
remoteCall Connection
conn (SessionId -> RetrieveDatabaseContextFunctionSummary
RetrieveDatabaseContextFunctionSummary SessionId
sessionId)        

-- | Returns the transaction id for the connection's disconnected transaction committed parent transaction.  
headTransactionId :: SessionId -> Connection -> IO (Either RelationalError TransactionId)
headTransactionId :: SessionId -> Connection -> IO (Either RelationalError SessionId)
headTransactionId SessionId
sessionId (InProcessConnection InProcessConnectionConf
conf) = do
  let sessions :: Map SessionId Session
sessions = InProcessConnectionConf -> Map SessionId Session
ipSessions InProcessConnectionConf
conf  
  forall a. STM a -> IO a
atomically forall a b. (a -> b) -> a -> b
$ do
    Either RelationalError Session
eSession <- SessionId
-> Map SessionId Session -> STM (Either RelationalError Session)
sessionForSessionId SessionId
sessionId Map SessionId Session
sessions
    case Either RelationalError Session
eSession of
      Left RelationalError
err -> forall (f :: * -> *) a. Applicative f => a -> f a
pure (forall a b. a -> Either a b
Left RelationalError
err)
      Right Session
session -> forall (f :: * -> *) a. Applicative f => a -> f a
pure forall a b. (a -> b) -> a -> b
$ forall a b. b -> Either a b
Right (Session -> SessionId
Sess.parentId Session
session)
headTransactionId SessionId
sessionId conn :: Connection
conn@(RemoteConnection RemoteConnectionConf
_) = forall a b. (Serialise a, Serialise b) => Connection -> a -> IO b
remoteCall Connection
conn (SessionId -> RetrieveHeadTransactionId
RetrieveHeadTransactionId SessionId
sessionId)
    
headNameSTM_ :: SessionId -> Sessions -> TVar TransactionGraph -> STM (Either RelationalError HeadName)  
headNameSTM_ :: SessionId
-> Map SessionId Session
-> TVar TransactionGraph
-> STM (Either RelationalError SchemaName)
headNameSTM_ SessionId
sessionId Map SessionId Session
sessions TVar TransactionGraph
graphTvar = do
    TransactionGraph
graph <- forall a. TVar a -> STM a
readTVar TVar TransactionGraph
graphTvar
    Either RelationalError Session
eSession <- SessionId
-> Map SessionId Session -> STM (Either RelationalError Session)
sessionForSessionId SessionId
sessionId Map SessionId Session
sessions
    case Either RelationalError Session
eSession of
      Left RelationalError
err -> forall (f :: * -> *) a. Applicative f => a -> f a
pure (forall a b. a -> Either a b
Left RelationalError
err)
      Right Session
session -> case SessionId -> TransactionGraph -> Either RelationalError Transaction
RE.transactionForId (Session -> SessionId
Sess.parentId Session
session) TransactionGraph
graph of
        Left RelationalError
err -> forall (f :: * -> *) a. Applicative f => a -> f a
pure (forall a b. a -> Either a b
Left RelationalError
err)
        Right Transaction
parentTrans -> case Transaction -> TransactionGraph -> Maybe SchemaName
headNameForTransaction Transaction
parentTrans TransactionGraph
graph of
          Maybe SchemaName
Nothing -> forall (f :: * -> *) a. Applicative f => a -> f a
pure (forall a b. a -> Either a b
Left RelationalError
UnknownHeadError)
          Just SchemaName
headName' -> forall (f :: * -> *) a. Applicative f => a -> f a
pure (forall a b. b -> Either a b
Right SchemaName
headName')
  
-- | Returns Just the name of the head of the current disconnected transaction or Nothing.    
headName :: SessionId -> Connection -> IO (Either RelationalError HeadName)
headName :: SessionId -> Connection -> IO (Either RelationalError SchemaName)
headName SessionId
sessionId (InProcessConnection InProcessConnectionConf
conf) = do
  let sessions :: Map SessionId Session
sessions = InProcessConnectionConf -> Map SessionId Session
ipSessions InProcessConnectionConf
conf
      graphTvar :: TVar TransactionGraph
graphTvar = InProcessConnectionConf -> TVar TransactionGraph
ipTransactionGraph InProcessConnectionConf
conf
  forall a. STM a -> IO a
atomically (SessionId
-> Map SessionId Session
-> TVar TransactionGraph
-> STM (Either RelationalError SchemaName)
headNameSTM_ SessionId
sessionId Map SessionId Session
sessions TVar TransactionGraph
graphTvar)
headName SessionId
sessionId conn :: Connection
conn@(RemoteConnection RemoteConnectionConf
_) = forall a b. (Serialise a, Serialise b) => Connection -> a -> IO b
remoteCall Connection
conn (SessionId -> ExecuteHeadName
ExecuteHeadName SessionId
sessionId)

-- | Returns a listing of all available atom types.
atomTypesAsRelation :: SessionId -> Connection -> IO (Either RelationalError Relation)
atomTypesAsRelation :: SessionId -> Connection -> IO (Either RelationalError Relation)
atomTypesAsRelation SessionId
sessionId (InProcessConnection InProcessConnectionConf
conf) = do
  let sessions :: Map SessionId Session
sessions = InProcessConnectionConf -> Map SessionId Session
ipSessions InProcessConnectionConf
conf
  forall a. STM a -> IO a
atomically forall a b. (a -> b) -> a -> b
$ do
    Either RelationalError Session
eSession <- SessionId
-> Map SessionId Session -> STM (Either RelationalError Session)
sessionForSessionId SessionId
sessionId Map SessionId Session
sessions
    case Either RelationalError Session
eSession of
      Left RelationalError
err -> forall (f :: * -> *) a. Applicative f => a -> f a
pure (forall a b. a -> Either a b
Left RelationalError
err)
      Right Session
session ->
        case TypeConstructorMapping -> Either RelationalError Relation
typesAsRelation (DatabaseContext -> TypeConstructorMapping
B.typeConstructorMapping (Session -> DatabaseContext
Sess.concreteDatabaseContext Session
session)) of
          Left RelationalError
err -> forall (f :: * -> *) a. Applicative f => a -> f a
pure (forall a b. a -> Either a b
Left RelationalError
err)
          Right Relation
rel -> forall (f :: * -> *) a. Applicative f => a -> f a
pure (forall a b. b -> Either a b
Right Relation
rel)
atomTypesAsRelation SessionId
sessionId conn :: Connection
conn@(RemoteConnection RemoteConnectionConf
_) = forall a b. (Serialise a, Serialise b) => Connection -> a -> IO b
remoteCall Connection
conn (SessionId -> RetrieveAtomTypesAsRelation
RetrieveAtomTypesAsRelation SessionId
sessionId)

disconnectedTransactionIsDirty :: SessionId -> Connection -> IO (Either RelationalError Bool)
disconnectedTransactionIsDirty :: SessionId -> Connection -> IO (Either RelationalError Bool)
disconnectedTransactionIsDirty SessionId
sessionId (InProcessConnection InProcessConnectionConf
conf) = do
  let sessions :: Map SessionId Session
sessions = InProcessConnectionConf -> Map SessionId Session
ipSessions InProcessConnectionConf
conf
  forall a. STM a -> IO a
atomically forall a b. (a -> b) -> a -> b
$ do
    Either RelationalError Session
eSession <- SessionId
-> Map SessionId Session -> STM (Either RelationalError Session)
sessionForSessionId SessionId
sessionId Map SessionId Session
sessions
    case Either RelationalError Session
eSession of
      Left RelationalError
err -> forall (f :: * -> *) a. Applicative f => a -> f a
pure (forall a b. a -> Either a b
Left RelationalError
err)
      Right Session
session ->
        forall (f :: * -> *) a. Applicative f => a -> f a
pure (forall a b. b -> Either a b
Right (Session -> Bool
isDirty Session
session))
disconnectedTransactionIsDirty SessionId
sessionId conn :: Connection
conn@(RemoteConnection RemoteConnectionConf
_) = forall a b. (Serialise a, Serialise b) => Connection -> a -> IO b
remoteCall Connection
conn (SessionId -> RetrieveSessionIsDirty
RetrieveSessionIsDirty SessionId
sessionId)
        
--used only for testing- we expect this to throw an exception
callTestTimeout_ :: SessionId -> Connection -> IO Bool
callTestTimeout_ :: SessionId -> Connection -> IO Bool
callTestTimeout_ SessionId
_ (InProcessConnection InProcessConnectionConf
_) = forall a. HasCallStack => [Char] -> a
error [Char]
"bad testing call"
callTestTimeout_ SessionId
sessionId conn :: Connection
conn@(RemoteConnection RemoteConnectionConf
_) = forall a b. (Serialise a, Serialise b) => Connection -> a -> IO b
remoteCall Connection
conn (SessionId -> TestTimeout
TestTimeout SessionId
sessionId)

--used in tests only
transactionGraph_ :: Connection -> IO TransactionGraph
transactionGraph_ :: Connection -> IO TransactionGraph
transactionGraph_ (InProcessConnection InProcessConnectionConf
conf) = forall a. TVar a -> IO a
readTVarIO (InProcessConnectionConf -> TVar TransactionGraph
ipTransactionGraph InProcessConnectionConf
conf)
transactionGraph_ Connection
_ = forall a. HasCallStack => [Char] -> a
error [Char]
"remote connection used"

--used in tests only
disconnectedTransaction_ :: SessionId -> Connection -> IO DisconnectedTransaction
disconnectedTransaction_ :: SessionId -> Connection -> IO DisconnectedTransaction
disconnectedTransaction_ SessionId
sessionId (InProcessConnection InProcessConnectionConf
conf) = do
  let sessions :: Map SessionId Session
sessions = InProcessConnectionConf -> Map SessionId Session
ipSessions InProcessConnectionConf
conf
  Maybe Session
mSession <- forall a. STM a -> IO a
atomically forall a b. (a -> b) -> a -> b
$ forall key value.
Hashable key =>
key -> Map key value -> STM (Maybe value)
StmMap.lookup SessionId
sessionId Map SessionId Session
sessions
  case Maybe Session
mSession of
    Maybe Session
Nothing -> forall a. HasCallStack => [Char] -> a
error [Char]
"No such session"
    Just (Sess.Session DisconnectedTransaction
discon SchemaName
_) -> forall (f :: * -> *) a. Applicative f => a -> f a
pure DisconnectedTransaction
discon
disconnectedTransaction_ SessionId
_ Connection
_= forall a. HasCallStack => [Char] -> a
error [Char]
"remote connection used"

-- wrap a graph evaluation in file locking
commitLock_ :: SessionId -> 
               InProcessConnectionConf -> 
               (TransactionGraph -> 
                STM (Either RelationalError (DisconnectedTransaction, TransactionGraph, [TransactionId]))) -> 
               IO (Either RelationalError ())
commitLock_ :: SessionId
-> InProcessConnectionConf
-> (TransactionGraph
    -> STM
         (Either
            RelationalError
            (DisconnectedTransaction, TransactionGraph, [SessionId])))
-> IO (Either RelationalError ())
commitLock_ SessionId
sessionId InProcessConnectionConf
conf TransactionGraph
-> STM
     (Either
        RelationalError
        (DisconnectedTransaction, TransactionGraph, [SessionId]))
stmBlock = do
  let sessions :: Map SessionId Session
sessions = InProcessConnectionConf -> Map SessionId Session
ipSessions InProcessConnectionConf
conf
      strat :: PersistenceStrategy
strat = InProcessConnectionConf -> PersistenceStrategy
ipPersistenceStrategy InProcessConnectionConf
conf      
      mScriptSession :: Maybe ScriptSession
mScriptSession = InProcessConnectionConf -> Maybe ScriptSession
ipScriptSession InProcessConnectionConf
conf              
      graphTvar :: TVar TransactionGraph
graphTvar = InProcessConnectionConf -> TVar TransactionGraph
ipTransactionGraph InProcessConnectionConf
conf
      clientNodes :: Set ClientInfo
clientNodes = InProcessConnectionConf -> Set ClientInfo
ipClientNodes InProcessConnectionConf
conf      
      mLockFileH :: Maybe (LockFile, MVar LockFileHash)
mLockFileH = InProcessConnectionConf -> Maybe (LockFile, MVar LockFileHash)
ipLocks InProcessConnectionConf
conf
      lockHandler :: (Bool
 -> IO
      (Either
         RelationalError
         (EvaluatedNotifications, [ClientInfo], TransactionGraph,
          [SessionId])))
-> IO
     (Either
        RelationalError
        (EvaluatedNotifications, [ClientInfo], TransactionGraph,
         [SessionId]))
lockHandler Bool
-> IO
     (Either
        RelationalError
        (EvaluatedNotifications, [ClientInfo], TransactionGraph,
         [SessionId]))
body = case Maybe (LockFile, MVar LockFileHash)
mLockFileH of
        Maybe (LockFile, MVar LockFileHash)
Nothing -> Bool
-> IO
     (Either
        RelationalError
        (EvaluatedNotifications, [ClientInfo], TransactionGraph,
         [SessionId]))
body Bool
False
        Just (LockFile
lockFileH, MVar LockFileHash
lockMVar) ->
          let acquireLocks :: IO Bool
acquireLocks = do
                LockFileHash
lastWrittenDigest <- forall a. MVar a -> IO a
takeMVar MVar LockFileHash
lockMVar 
                LockFile -> LockType -> IO ()
lockFile LockFile
lockFileH LockType
WriteLock
                LockFileHash
latestDigest <- PersistenceStrategy -> IO LockFileHash
readGraphTransactionIdDigest PersistenceStrategy
strat
                forall (f :: * -> *) a. Applicative f => a -> f a
pure (LockFileHash
latestDigest forall a. Eq a => a -> a -> Bool
/= LockFileHash
lastWrittenDigest)
              releaseLocks :: Bool -> IO ()
releaseLocks Bool
_ = do
                --still holding the lock- get the latest digest
                LockFileHash
gDigest <- PersistenceStrategy -> IO LockFileHash
readGraphTransactionIdDigest PersistenceStrategy
strat
                LockFile -> IO ()
unlockFile LockFile
lockFileH 
                forall a. MVar a -> a -> IO ()
putMVar MVar LockFileHash
lockMVar LockFileHash
gDigest
          in forall a b c. IO a -> (a -> IO b) -> (a -> IO c) -> IO c
bracket IO Bool
acquireLocks Bool -> IO ()
releaseLocks Bool
-> IO
     (Either
        RelationalError
        (EvaluatedNotifications, [ClientInfo], TransactionGraph,
         [SessionId]))
body
  Either
  RelationalError
  (EvaluatedNotifications, [ClientInfo], TransactionGraph,
   [SessionId])
manip <- (Bool
 -> IO
      (Either
         RelationalError
         (EvaluatedNotifications, [ClientInfo], TransactionGraph,
          [SessionId])))
-> IO
     (Either
        RelationalError
        (EvaluatedNotifications, [ClientInfo], TransactionGraph,
         [SessionId]))
lockHandler forall a b. (a -> b) -> a -> b
$ \Bool
dbWrittenByOtherProcess -> forall a. STM a -> IO a
atomically forall a b. (a -> b) -> a -> b
$ do
     Either RelationalError Session
eSession <- SessionId
-> Map SessionId Session -> STM (Either RelationalError Session)
sessionForSessionId SessionId
sessionId Map SessionId Session
sessions
     --handle graph update by other process
     TransactionGraph
oldGraph <- forall a. TVar a -> STM a
readTVar TVar TransactionGraph
graphTvar
     case Either RelationalError Session
eSession of
      Left RelationalError
err -> forall (f :: * -> *) a. Applicative f => a -> f a
pure (forall a b. a -> Either a b
Left RelationalError
err)
      Right Session
session -> do
        let dbdir :: [Char]
dbdir = case PersistenceStrategy
strat of
              MinimalPersistence [Char]
x -> [Char]
x
              CrashSafePersistence [Char]
x -> [Char]
x
              PersistenceStrategy
_ -> forall a. HasCallStack => [Char] -> a
error [Char]
"accessing dbdir on non-persisted connection"
        --this should also happen for non-commit expressions
        Either PersistenceError TransactionGraph
eRefreshedGraph <- if Bool
dbWrittenByOtherProcess then
                             forall a. IO a -> STM a
unsafeIOToSTM ([Char]
-> TransactionGraph
-> Maybe ScriptSession
-> IO (Either PersistenceError TransactionGraph)
transactionGraphLoad [Char]
dbdir TransactionGraph
oldGraph Maybe ScriptSession
mScriptSession)
                           else
                             forall (f :: * -> *) a. Applicative f => a -> f a
pure (forall a b. b -> Either a b
Right TransactionGraph
oldGraph)
        case Either PersistenceError TransactionGraph
eRefreshedGraph of
          Left PersistenceError
err -> forall (f :: * -> *) a. Applicative f => a -> f a
pure (forall a b. a -> Either a b
Left (PersistenceError -> RelationalError
DatabaseLoadError PersistenceError
err))
          Right TransactionGraph
refreshedGraph -> do
            Either
  RelationalError
  (DisconnectedTransaction, TransactionGraph, [SessionId])
eGraph <- TransactionGraph
-> STM
     (Either
        RelationalError
        (DisconnectedTransaction, TransactionGraph, [SessionId]))
stmBlock TransactionGraph
refreshedGraph
            case Either
  RelationalError
  (DisconnectedTransaction, TransactionGraph, [SessionId])
eGraph of
              Left RelationalError
err -> forall (f :: * -> *) a. Applicative f => a -> f a
pure (forall a b. a -> Either a b
Left RelationalError
err)
              Right (DisconnectedTransaction
discon', TransactionGraph
graph', [SessionId]
transactionIdsToPersist) -> do
                forall a. TVar a -> a -> STM ()
writeTVar TVar TransactionGraph
graphTvar TransactionGraph
graph'
                let newSession :: Session
newSession = DisconnectedTransaction -> SchemaName -> Session
Session DisconnectedTransaction
discon' (Session -> SchemaName
Sess.schemaName Session
session)
                forall key value.
Hashable key =>
value -> key -> Map key value -> STM ()
StmMap.insert Session
newSession SessionId
sessionId Map SessionId Session
sessions
                case SessionId -> TransactionGraph -> Either RelationalError Transaction
RE.transactionForId (Session -> SessionId
Sess.parentId Session
session) TransactionGraph
oldGraph of
                  Left RelationalError
err -> forall (f :: * -> *) a. Applicative f => a -> f a
pure forall a b. (a -> b) -> a -> b
$ forall a b. a -> Either a b
Left RelationalError
err
                  Right Transaction
previousTrans ->
                    if Bool -> Bool
not (forall (t :: * -> *) a. Foldable t => t a -> Bool
Prelude.null [SessionId]
transactionIdsToPersist) then do
                      (EvaluatedNotifications
evaldNots, Set ClientInfo
nodes) <- TransactionGraph
-> DatabaseContext
-> DatabaseContext
-> Set ClientInfo
-> STM (EvaluatedNotifications, Set ClientInfo)
executeCommitExprSTM_ TransactionGraph
graph' (Transaction -> DatabaseContext
Trans.concreteDatabaseContext Transaction
previousTrans) (Session -> DatabaseContext
Sess.concreteDatabaseContext Session
session) Set ClientInfo
clientNodes
                      [ClientInfo]
nodesToNotify <- forall v. Set v -> STM [v]
stmSetToList Set ClientInfo
nodes
                      forall (f :: * -> *) a. Applicative f => a -> f a
pure forall a b. (a -> b) -> a -> b
$ forall a b. b -> Either a b
Right (EvaluatedNotifications
evaldNots, [ClientInfo]
nodesToNotify, TransactionGraph
graph', [SessionId]
transactionIdsToPersist)
                    else forall (f :: * -> *) a. Applicative f => a -> f a
pure (forall a b. b -> Either a b
Right (forall k a. Map k a
M.empty, [], TransactionGraph
graph', []))

      --handle notification firing                
  case Either
  RelationalError
  (EvaluatedNotifications, [ClientInfo], TransactionGraph,
   [SessionId])
manip of 
    Left RelationalError
err -> forall (f :: * -> *) a. Applicative f => a -> f a
pure (forall a b. a -> Either a b
Left RelationalError
err)
    Right (EvaluatedNotifications
notsToFire, [ClientInfo]
nodesToNotify, TransactionGraph
newGraph, [SessionId]
transactionIdsToPersist) -> do
      --update filesystem database, if necessary
      PersistenceStrategy -> [SessionId] -> TransactionGraph -> IO ()
processTransactionGraphPersistence PersistenceStrategy
strat [SessionId]
transactionIdsToPersist TransactionGraph
newGraph
      [ClientInfo] -> EvaluatedNotifications -> IO ()
sendNotifications [ClientInfo]
nodesToNotify EvaluatedNotifications
notsToFire
      forall (f :: * -> *) a. Applicative f => a -> f a
pure (forall a b. b -> Either a b
Right ())

-- | Runs an IO monad, commits the result when the monad returns no errors, otherwise, rolls back the changes and the error.
withTransaction :: SessionId -> Connection -> IO (Either RelationalError a) -> IO (Either RelationalError ()) -> IO (Either RelationalError a)
withTransaction :: forall a.
SessionId
-> Connection
-> IO (Either RelationalError a)
-> IO (Either RelationalError ())
-> IO (Either RelationalError a)
withTransaction SessionId
sessionId Connection
conn IO (Either RelationalError a)
io IO (Either RelationalError ())
successFunc = forall a b c. IO a -> (a -> IO b) -> (a -> IO c) -> IO c
bracketOnError (forall (f :: * -> *) a. Applicative f => a -> f a
pure ()) (forall a b. a -> b -> a
const IO (Either RelationalError ())
do_rollback) () -> IO (Either RelationalError a)
block
  where
    do_rollback :: IO (Either RelationalError ())
do_rollback = SessionId -> Connection -> IO (Either RelationalError ())
rollback SessionId
sessionId Connection
conn
    block :: () -> IO (Either RelationalError a)
block ()
_ = do
      Either RelationalError a
eErr <- IO (Either RelationalError a)
io
      case Either RelationalError a
eErr of 
        Left RelationalError
err -> do
          Either RelationalError ()
_ <- IO (Either RelationalError ())
do_rollback
          forall (f :: * -> *) a. Applicative f => a -> f a
pure (forall a b. a -> Either a b
Left RelationalError
err)
        Right a
val -> do
            Either RelationalError Bool
eIsDirty <- SessionId -> Connection -> IO (Either RelationalError Bool)
disconnectedTransactionIsDirty SessionId
sessionId Connection
conn
            case Either RelationalError Bool
eIsDirty of
              Left RelationalError
err -> forall (f :: * -> *) a. Applicative f => a -> f a
pure (forall a b. a -> Either a b
Left RelationalError
err)
              Right Bool
dirty -> 
                if Bool
dirty then do
                  Either RelationalError ()
res <- IO (Either RelationalError ())
successFunc
                  case Either RelationalError ()
res of
                    Left RelationalError
err -> forall (f :: * -> *) a. Applicative f => a -> f a
pure (forall a b. a -> Either a b
Left RelationalError
err)
                    Right ()
_ -> forall (f :: * -> *) a. Applicative f => a -> f a
pure (forall a b. b -> Either a b
Right a
val)
                  else -- no updates executed, so don't create a commit
                  forall (f :: * -> *) a. Applicative f => a -> f a
pure (forall a b. b -> Either a b
Right a
val)

executeDataFrameExpr :: SessionId -> Connection -> DF.DataFrameExpr -> IO (Either RelationalError DF.DataFrame)
executeDataFrameExpr :: SessionId
-> Connection
-> DataFrameExpr
-> IO (Either RelationalError DataFrame)
executeDataFrameExpr SessionId
sessionId conn :: Connection
conn@(InProcessConnection InProcessConnectionConf
_) DataFrameExpr
dfExpr = do
  Either RelationalError Relation
eRel <- SessionId
-> Connection
-> RelationalExpr
-> IO (Either RelationalError Relation)
executeRelationalExpr SessionId
sessionId Connection
conn (DataFrameExpr -> RelationalExpr
DF.convertExpr DataFrameExpr
dfExpr)
  case Either RelationalError Relation
eRel of
    Left RelationalError
err -> forall (f :: * -> *) a. Applicative f => a -> f a
pure (forall a b. a -> Either a b
Left RelationalError
err)
    Right Relation
rel -> do
      let relAttrs :: Attributes
relAttrs = Relation -> Attributes
R.attributes Relation
rel
          attrName :: AttributeOrderExpr -> SchemaName
attrName (DF.AttributeOrderExpr SchemaName
name Order
_) = SchemaName
name
          order :: AttributeOrderExpr -> Order
order (DF.AttributeOrderExpr SchemaName
_ Order
ord) = Order
ord
          orders :: [Order]
orders = forall a b. (a -> b) -> [a] -> [b]
map AttributeOrderExpr -> Order
order (DataFrameExpr -> [AttributeOrderExpr]
DF.orderExprs DataFrameExpr
dfExpr)
          attributeForName' :: SchemaName -> Either RelationalError Attribute
attributeForName' = forall a b c. (a -> b -> c) -> b -> a -> c
flip SchemaName -> Attributes -> Either RelationalError Attribute
attributeForName Attributes
relAttrs 
          attrNames :: [SchemaName]
attrNames = forall a b. (a -> b) -> [a] -> [b]
map AttributeOrderExpr -> SchemaName
attrName (DataFrameExpr -> [AttributeOrderExpr]
DF.orderExprs DataFrameExpr
dfExpr)
          verified :: Either RelationalError [Attribute]
verified = forall (t :: * -> *) (m :: * -> *) a b.
(Traversable t, Monad m) =>
t a -> (a -> m b) -> m (t b)
forM [SchemaName]
attrNames SchemaName -> Either RelationalError Attribute
attributeForName'
      case Either RelationalError [Attribute]
verified of
        Left RelationalError
err -> forall (f :: * -> *) a. Applicative f => a -> f a
pure (forall a b. a -> Either a b
Left RelationalError
err)
        Right [Attribute]
attrs -> do
          let attrOrders :: [AttributeOrder]
attrOrders = forall a b c. (a -> b -> c) -> [a] -> [b] -> [c]
zipWith
                            (SchemaName -> Order -> AttributeOrder
DF.AttributeOrder forall b c a. (b -> c) -> (a -> b) -> a -> c
. Attribute -> SchemaName
attributeName)
                           [Attribute]
attrs
                           [Order]
orders
          case [AttributeOrder] -> DataFrame -> Either RelationalError DataFrame
DF.sortDataFrameBy [AttributeOrder]
attrOrders forall b c a. (b -> c) -> (a -> b) -> a -> c
. Relation -> DataFrame
DF.toDataFrame forall a b. (a -> b) -> a -> b
$ Relation
rel of
            Left RelationalError
err -> forall (f :: * -> *) a. Applicative f => a -> f a
pure (forall a b. a -> Either a b
Left RelationalError
err)
            Right DataFrame
dFrame -> do
              let dFrame' :: DataFrame
dFrame' = forall b a. b -> (a -> b) -> Maybe a -> b
maybe DataFrame
dFrame (Integer -> DataFrame -> DataFrame
`DF.drop'` DataFrame
dFrame) (DataFrameExpr -> Maybe Integer
DF.offset DataFrameExpr
dfExpr)
                  dFrame'' :: DataFrame
dFrame'' = forall b a. b -> (a -> b) -> Maybe a -> b
maybe DataFrame
dFrame' (Integer -> DataFrame -> DataFrame
`DF.take'` DataFrame
dFrame') (DataFrameExpr -> Maybe Integer
DF.limit DataFrameExpr
dfExpr)
              forall (f :: * -> *) a. Applicative f => a -> f a
pure (forall a b. b -> Either a b
Right DataFrame
dFrame'')
executeDataFrameExpr SessionId
sessionId conn :: Connection
conn@(RemoteConnection RemoteConnectionConf
_) DataFrameExpr
dfExpr = forall a b. (Serialise a, Serialise b) => Connection -> a -> IO b
remoteCall Connection
conn (SessionId -> DataFrameExpr -> ExecuteDataFrameExpr
ExecuteDataFrameExpr SessionId
sessionId DataFrameExpr
dfExpr)
        
validateMerkleHashes :: SessionId -> Connection -> IO (Either RelationalError ())
validateMerkleHashes :: SessionId -> Connection -> IO (Either RelationalError ())
validateMerkleHashes SessionId
sessionId (InProcessConnection InProcessConnectionConf
conf) = do
  let sessions :: Map SessionId Session
sessions = InProcessConnectionConf -> Map SessionId Session
ipSessions InProcessConnectionConf
conf
  forall a. STM a -> IO a
atomically forall a b. (a -> b) -> a -> b
$ do
    Either RelationalError Session
eSession <- SessionId
-> Map SessionId Session -> STM (Either RelationalError Session)
sessionForSessionId SessionId
sessionId Map SessionId Session
sessions
    case Either RelationalError Session
eSession of
      Left RelationalError
err -> forall (f :: * -> *) a. Applicative f => a -> f a
pure (forall a b. a -> Either a b
Left RelationalError
err)
      Right Session
_ -> do
        TransactionGraph
graph <- forall a. TVar a -> STM a
readTVar (InProcessConnectionConf -> TVar TransactionGraph
ipTransactionGraph InProcessConnectionConf
conf)
        case TransactionGraph -> Either [MerkleValidationError] ()
Graph.validateMerkleHashes TransactionGraph
graph of
          Left [MerkleValidationError]
merkleErrs -> forall (f :: * -> *) a. Applicative f => a -> f a
pure forall a b. (a -> b) -> a -> b
$ forall a b. a -> Either a b
Left forall a b. (a -> b) -> a -> b
$ [RelationalError] -> RelationalError
someErrors (forall a b. (a -> b) -> [a] -> [b]
map (\(MerkleValidationError SessionId
tid MerkleHash
expected MerkleHash
actual) -> SessionId -> MerkleHash -> MerkleHash -> RelationalError
MerkleHashValidationError SessionId
tid MerkleHash
expected MerkleHash
actual) [MerkleValidationError]
merkleErrs)
          Right () -> forall (f :: * -> *) a. Applicative f => a -> f a
pure (forall a b. b -> Either a b
Right ())
validateMerkleHashes SessionId
sessionId conn :: Connection
conn@RemoteConnection{} = forall a b. (Serialise a, Serialise b) => Connection -> a -> IO b
remoteCall Connection
conn (SessionId -> ExecuteValidateMerkleHashes
ExecuteValidateMerkleHashes SessionId
sessionId)

-- | Calculate a hash on the DDL of the current database context (not the graph). This is useful for validating on the client that the database schema meets the client's expectation. Any DDL change will change this hash. This hash does not change based on the current isomorphic schema being examined. This function is not affected by the current schema (since they are all isomorphic anyway, they should return the same hash).
getDDLHash :: SessionId -> Connection -> IO (Either RelationalError SecureHash)
getDDLHash :: SessionId -> Connection -> IO (Either RelationalError SecureHash)
getDDLHash SessionId
sessionId (InProcessConnection InProcessConnectionConf
conf) = do
  let sessions :: Map SessionId Session
sessions = InProcessConnectionConf -> Map SessionId Session
ipSessions InProcessConnectionConf
conf
  forall a. STM a -> IO a
atomically forall a b. (a -> b) -> a -> b
$ do
    Either RelationalError Session
eSession <- SessionId
-> Map SessionId Session -> STM (Either RelationalError Session)
sessionForSessionId SessionId
sessionId Map SessionId Session
sessions
    case Either RelationalError Session
eSession of
      Left RelationalError
err -> forall (f :: * -> *) a. Applicative f => a -> f a
pure (forall a b. a -> Either a b
Left RelationalError
err)
      Right Session
session -> do
        let ctx :: DatabaseContext
ctx = Session -> DatabaseContext
Sess.concreteDatabaseContext Session
session            
        TransactionGraph
graph <- forall a. TVar a -> STM a
readTVar (InProcessConnectionConf -> TVar TransactionGraph
ipTransactionGraph InProcessConnectionConf
conf)
        forall (f :: * -> *) a. Applicative f => a -> f a
pure (DatabaseContext
-> TransactionGraph -> Either RelationalError SecureHash
ddlHash DatabaseContext
ctx TransactionGraph
graph)
getDDLHash SessionId
sessionId conn :: Connection
conn@RemoteConnection{} = forall a b. (Serialise a, Serialise b) => Connection -> a -> IO b
remoteCall Connection
conn (SessionId -> GetDDLHash
GetDDLHash SessionId
sessionId)

registeredQueriesAsRelation :: SessionId -> Connection -> IO (Either RelationalError Relation)
registeredQueriesAsRelation :: SessionId -> Connection -> IO (Either RelationalError Relation)
registeredQueriesAsRelation SessionId
sessionId (InProcessConnection InProcessConnectionConf
conf) = do
  let sessions :: Map SessionId Session
sessions = InProcessConnectionConf -> Map SessionId Session
ipSessions InProcessConnectionConf
conf
  forall a. STM a -> IO a
atomically forall a b. (a -> b) -> a -> b
$ do
    Either RelationalError (Session, Schema)
eSession <- SessionId
-> Map SessionId Session
-> STM (Either RelationalError (Session, Schema))
sessionAndSchema SessionId
sessionId Map SessionId Session
sessions
    case Either RelationalError (Session, Schema)
eSession of
      Left RelationalError
err -> forall (f :: * -> *) a. Applicative f => a -> f a
pure (forall a b. a -> Either a b
Left RelationalError
err)
      Right (Session
session, Schema
schema) -> do
        let ctx :: DatabaseContext
ctx = Session -> DatabaseContext
Sess.concreteDatabaseContext Session
session        
        forall (f :: * -> *) a. Applicative f => a -> f a
pure forall a b. (a -> b) -> a -> b
$ Schema -> RegisteredQueries -> Either RelationalError Relation
registeredQueriesAsRelationInSchema Schema
schema (DatabaseContext -> RegisteredQueries
registeredQueries DatabaseContext
ctx)
registeredQueriesAsRelation SessionId
sessionId conn :: Connection
conn@RemoteConnection{} = forall a b. (Serialise a, Serialise b) => Connection -> a -> IO b
remoteCall Connection
conn (SessionId -> RetrieveRegisteredQueries
RetrieveRegisteredQueries SessionId
sessionId)        

type ClientNodes = StmSet.Set ClientInfo

-- internal structure specific to in-process connections
data InProcessConnectionConf = InProcessConnectionConf {
  InProcessConnectionConf -> PersistenceStrategy
ipPersistenceStrategy :: PersistenceStrategy, 
  InProcessConnectionConf -> Set ClientInfo
ipClientNodes :: ClientNodes, 
  InProcessConnectionConf -> Map SessionId Session
ipSessions :: Sessions, 
  InProcessConnectionConf -> TVar TransactionGraph
ipTransactionGraph :: TVar TransactionGraph,
  InProcessConnectionConf -> Maybe ScriptSession
ipScriptSession :: Maybe ScriptSession,
  InProcessConnectionConf -> Maybe (LockFile, MVar LockFileHash)
ipLocks :: Maybe (LockFile, MVar LockFileHash), -- nothing when NoPersistence
  InProcessConnectionConf -> Async ()
ipCallbackAsync :: Async ()
  }

-- clients may connect associate one socket/mvar with the server to register for change callbacks
data ClientInfo = RemoteClientInfo (RPC.Locking Socket) |
                  InProcessClientInfo (MVar EvaluatedNotifications)

instance Eq ClientInfo where
  (RemoteClientInfo Locking Socket
a) == :: ClientInfo -> ClientInfo -> Bool
== (RemoteClientInfo Locking Socket
b) = forall a. Locking a -> a
RPC.lockless Locking Socket
a forall a. Eq a => a -> a -> Bool
== forall a. Locking a -> a
RPC.lockless Locking Socket
b
  (InProcessClientInfo MVar EvaluatedNotifications
a) == (InProcessClientInfo MVar EvaluatedNotifications
b) = MVar EvaluatedNotifications
a forall a. Eq a => a -> a -> Bool
== MVar EvaluatedNotifications
b
  ClientInfo
_ == ClientInfo
_ = Bool
False

instance Hashable ClientInfo where
  hashWithSalt :: Int -> ClientInfo -> Int
hashWithSalt Int
salt (RemoteClientInfo Locking Socket
sock) = forall a. Hashable a => Int -> a -> Int
hashWithSalt Int
salt (forall a. Show a => a -> [Char]
show (forall a. Locking a -> a
RPC.lockless Locking Socket
sock))
  hashWithSalt Int
salt (InProcessClientInfo MVar EvaluatedNotifications
_) = forall a. Hashable a => Int -> a -> Int
hashWithSalt Int
salt (Int
1::Int)