module Database.EventStore
(
Connection
, ConnectionType(..)
, ConnectionException(..)
, Credentials
, Settings(..)
, Retry
, atMost
, keepRetrying
, credentials
, defaultSettings
, defaultSSLSettings
, connect
, shutdown
, waitTillClosed
, connectionSettings
, ClusterSettings(..)
, DnsServer(..)
, GossipSeed
, gossipSeed
, gossipSeedWithHeader
, gossipSeedHost
, gossipSeedHeader
, gossipSeedPort
, gossipSeedClusterSettings
, dnsClusterSettings
, Event
, EventData
, EventType(..)
, createEvent
, withJson
, withJsonAndMetadata
, withBinary
, withBinaryAndMetadata
, StreamMetadataResult(..)
, readEvent
, readAllEventsBackward
, readAllEventsForward
, readStreamEventsBackward
, readStreamEventsForward
, getStreamMetadata
, StreamACL(..)
, StreamMetadata(..)
, getCustomPropertyValue
, getCustomProperty
, emptyStreamACL
, emptyStreamMetadata
, deleteStream
, sendEvent
, sendEvents
, setStreamMetadata
, Builder
, StreamACLBuilder
, buildStreamACL
, modifyStreamACL
, setReadRoles
, setReadRole
, setWriteRoles
, setWriteRole
, setDeleteRoles
, setDeleteRole
, setMetaReadRoles
, setMetaReadRole
, setMetaWriteRoles
, setMetaWriteRole
, StreamMetadataBuilder
, buildStreamMetadata
, modifyStreamMetadata
, setMaxCount
, setMaxAge
, setTruncateBefore
, setCacheControl
, setACL
, modifyACL
, setCustomProperty
, Transaction
, TransactionId
, startTransaction
, transactionId
, transactionCommit
, transactionRollback
, transactionWrite
, SubscriptionClosed(..)
, SubscriptionId
, Subscription
, SubDropReason(..)
, waitConfirmation
, unsubscribeConfirmed
, unsubscribeConfirmedSTM
, waitUnsubscribeConfirmed
, Regular
, subscribe
, subscribeToAll
, getSubId
, getSubStream
, isSubscribedToAll
, unsubscribe
, nextEvent
, nextEventMaybe
, getSubResolveLinkTos
, getSubLastCommitPos
, getSubLastEventNumber
, Catchup
, subscribeFrom
, subscribeToAllFrom
, waitTillCatchup
, hasCaughtUp
, hasCaughtUpSTM
, Persistent
, PersistentSubscriptionSettings(..)
, SystemConsumerStrategy(..)
, NakAction(..)
, PersistActionException(..)
, acknowledge
, acknowledgeEvents
, failed
, eventsFailed
, notifyEventsProcessed
, notifyEventsFailed
, defaultPersistentSubscriptionSettings
, createPersistentSubscription
, updatePersistentSubscription
, deletePersistentSubscription
, connectToPersistentSubscription
, Slice(..)
, AllSlice
, Op.DeleteResult(..)
, WriteResult(..)
, ReadResult(..)
, RecordedEvent(..)
, Op.ReadEvent(..)
, StreamType(..)
, StreamSlice
, Position(..)
, ReadDirection(..)
, ResolvedEvent(..)
, OperationError(..)
, StreamName(..)
, isEventResolvedLink
, resolvedEventOriginal
, resolvedEventDataAsJson
, resolvedEventOriginalStreamId
, resolvedEventOriginalId
, recordedEventDataAsJson
, positionStart
, positionEnd
, Command
, DropReason(..)
, ExpectedVersion
, anyVersion
, noStreamVersion
, emptyStreamVersion
, exactEventVersion
, streamExists
, waitAsync
, (<>)
, NonEmpty(..)
, nonEmpty
, TLSSettings
) where
import Data.Int
import Data.Maybe
import ClassyPrelude hiding (Builder, group)
import Data.List.NonEmpty(NonEmpty(..), nonEmpty)
import Network.Connection (TLSSettings)
import Database.EventStore.Internal.Command
import Database.EventStore.Internal.Connection
import Database.EventStore.Internal.Discovery
import Database.EventStore.Internal.Subscription
import Database.EventStore.Internal.Manager.Subscription.Driver hiding (unsubscribe)
import Database.EventStore.Internal.Manager.Subscription.Message
import Database.EventStore.Internal.Operation (OperationError(..))
import qualified Database.EventStore.Internal.Operations as Op
import Database.EventStore.Internal.Operation.Read.Common
import Database.EventStore.Internal.Operation.Write.Common
import Database.EventStore.Internal.Stream
import Database.EventStore.Internal.Types
import Database.EventStore.Internal.Execution.Production
data ConnectionType
= Static String Int
| Cluster ClusterSettings
| Dns ByteString (Maybe DnsServer) Int
data Connection
= Connection
{ _prod :: Production
, _settings :: Settings
, _type :: ConnectionType
}
connect :: Settings -> ConnectionType -> IO Connection
connect settings tpe = do
disc <- case tpe of
Static host port -> return $ staticEndPointDiscovery host port
Cluster setts -> clusterDnsEndPointDiscovery setts
Dns dom srv port -> return $ simpleDnsEndPointDiscovery dom srv port
prod <- newExecutionModel settings disc
return $ Connection prod settings tpe
waitTillClosed :: Connection -> IO ()
waitTillClosed Connection{..} = prodWaitTillClosed _prod
connectionSettings :: Connection -> Settings
connectionSettings = _settings
shutdown :: Connection -> IO ()
shutdown Connection{..} = shutdownExecutionModel _prod
sendEvent :: Connection
-> Text
-> ExpectedVersion
-> Event
-> IO (Async WriteResult)
sendEvent mgr evt_stream exp_ver evt =
sendEvents mgr evt_stream exp_ver [evt]
sendEvents :: Connection
-> Text
-> ExpectedVersion
-> [Event]
-> IO (Async WriteResult)
sendEvents Connection{..} evt_stream exp_ver evts = do
(k, as) <- createOpAsync
let op = Op.writeEvents _settings evt_stream exp_ver evts
pushOperation _prod k op
return as
deleteStream :: Connection
-> Text
-> ExpectedVersion
-> Maybe Bool
-> IO (Async Op.DeleteResult)
deleteStream Connection{..} evt_stream exp_ver hard_del = do
(k, as) <- createOpAsync
let op = Op.deleteStream _settings evt_stream exp_ver hard_del
pushOperation _prod k op
return as
data Transaction =
Transaction
{ _tStream :: Text
, _tTransId :: TransactionId
, _tExpVer :: ExpectedVersion
, _tConn :: Connection
}
newtype TransactionId =
TransactionId { _unTransId :: Int64 }
deriving (Eq, Ord, Show)
transactionId :: Transaction -> TransactionId
transactionId = _tTransId
startTransaction :: Connection
-> Text
-> ExpectedVersion
-> IO (Async Transaction)
startTransaction conn@Connection{..} evt_stream exp_ver = do
(k, as) <- createOpAsync
let op = Op.transactionStart _settings evt_stream exp_ver
pushOperation _prod k op
let _F trans_id =
Transaction
{ _tStream = evt_stream
, _tTransId = TransactionId trans_id
, _tExpVer = exp_ver
, _tConn = conn
}
return $ fmap _F as
transactionWrite :: Transaction -> [Event] -> IO (Async ())
transactionWrite Transaction{..} evts = do
(k, as) <- createOpAsync
let Connection{..} = _tConn
raw_id = _unTransId _tTransId
op = Op.transactionWrite _settings _tStream _tExpVer raw_id evts
pushOperation _prod k op
return as
transactionCommit :: Transaction -> IO (Async WriteResult)
transactionCommit Transaction{..} = do
(k, as) <- createOpAsync
let Connection{..} = _tConn
raw_id = _unTransId _tTransId
op = Op.transactionCommit _settings _tStream _tExpVer raw_id
pushOperation _prod k op
return as
transactionRollback :: Transaction -> IO ()
transactionRollback _ = return ()
readEvent :: Connection
-> Text
-> Int32
-> Bool
-> IO (Async (ReadResult 'RegularStream Op.ReadEvent))
readEvent Connection{..} stream_id evt_num res_link_tos = do
(k, as) <- createOpAsync
let op = Op.readEvent _settings stream_id evt_num res_link_tos
pushOperation _prod k op
return as
readStreamEventsForward :: Connection
-> Text
-> Int32
-> Int32
-> Bool
-> IO (Async (ReadResult 'RegularStream StreamSlice))
readStreamEventsForward mgr =
readStreamEventsCommon mgr Forward
readStreamEventsBackward :: Connection
-> Text
-> Int32
-> Int32
-> Bool
-> IO (Async (ReadResult 'RegularStream StreamSlice))
readStreamEventsBackward mgr =
readStreamEventsCommon mgr Backward
readStreamEventsCommon :: Connection
-> ReadDirection
-> Text
-> Int32
-> Int32
-> Bool
-> IO (Async (ReadResult 'RegularStream StreamSlice))
readStreamEventsCommon Connection{..} dir stream_id start cnt res_link_tos = do
(k, as) <- createOpAsync
let op = Op.readStreamEvents _settings dir stream_id start cnt res_link_tos
pushOperation _prod k op
return as
readAllEventsForward :: Connection
-> Position
-> Int32
-> Bool
-> IO (Async AllSlice)
readAllEventsForward mgr =
readAllEventsCommon mgr Forward
readAllEventsBackward :: Connection
-> Position
-> Int32
-> Bool
-> IO (Async AllSlice)
readAllEventsBackward mgr =
readAllEventsCommon mgr Backward
readAllEventsCommon :: Connection
-> ReadDirection
-> Position
-> Int32
-> Bool
-> IO (Async AllSlice)
readAllEventsCommon Connection{..} dir pos max_c res_link_tos = do
(k, as) <- createOpAsync
let op = Op.readAllEvents _settings c_pos p_pos max_c res_link_tos dir
pushOperation _prod k op
return as
where
Position c_pos p_pos = pos
mkSubEnv :: Connection -> SubEnv
mkSubEnv Connection{..} =
SubEnv
{ subSettings = _settings
, subPushOp = pushOperation _prod
, subPushConnect = \k cmd ->
case cmd of
PushRegular stream tos ->
pushConnectStream _prod k stream tos
PushPersistent group stream size ->
pushConnectPersist _prod k group stream size
, subPushUnsub = pushUnsubscribe _prod
, subAckCmd = \cmd run uuids ->
case cmd of
AckCmd -> pushAckPersist _prod run uuids
NakCmd act res -> pushNakPersist _prod run act res uuids
, subForceReconnect = \node ->
pushForceReconnect _prod node
}
subscribe :: Connection
-> Text
-> Bool
-> IO (Subscription Regular)
subscribe conn streamId resLnkTos =
regularSub (mkSubEnv conn) streamId resLnkTos
subscribeToAll :: Connection
-> Bool
-> IO (Subscription Regular)
subscribeToAll conn = subscribe conn ""
subscribeFrom :: Connection
-> Text
-> Bool
-> Maybe Int32
-> Maybe Int32
-> IO (Subscription Catchup)
subscribeFrom conn streamId resLnkTos lastChkPt batch =
subscribeFromCommon conn resLnkTos batch tpe
where
tpe = Op.RegularCatchup streamId (fromMaybe 0 lastChkPt)
subscribeToAllFrom :: Connection
-> Bool
-> Maybe Position
-> Maybe Int32
-> IO (Subscription Catchup)
subscribeToAllFrom conn resLnkTos lastChkPt batch =
subscribeFromCommon conn resLnkTos batch tpe
where
Position cPos pPos = fromMaybe positionStart lastChkPt
tpe = Op.AllCatchup cPos pPos
subscribeFromCommon :: Connection
-> Bool
-> Maybe Int32
-> Op.CatchupState
-> IO (Subscription Catchup)
subscribeFromCommon conn resLnkTos batch tpe =
catchupSub (mkSubEnv conn) params
where
params = CatchupParams { catchupResLnkTos = resLnkTos
, catchupState = tpe
, catchupBatchSize = batch
}
setStreamMetadata :: Connection
-> Text
-> ExpectedVersion
-> StreamMetadata
-> IO (Async WriteResult)
setStreamMetadata Connection{..} evt_stream exp_ver metadata = do
(k, as) <- createOpAsync
let op = Op.setMetaStream _settings evt_stream exp_ver metadata
pushOperation _prod k op
return as
getStreamMetadata :: Connection -> Text -> IO (Async StreamMetadataResult)
getStreamMetadata Connection{..} evt_stream = do
(k, as) <- createOpAsync
let op = Op.readMetaStream _settings evt_stream
pushOperation _prod k op
return as
createPersistentSubscription :: Connection
-> Text
-> Text
-> PersistentSubscriptionSettings
-> IO (Async (Maybe PersistActionException))
createPersistentSubscription Connection{..} group stream sett = do
mvar <- newEmptyTMVarIO
let _F res = atomically $
case res of
Left e -> putTMVar mvar (Just e)
_ -> putTMVar mvar Nothing
pushCreatePersist _prod _F group stream sett
async $ atomically $ readTMVar mvar
updatePersistentSubscription :: Connection
-> Text
-> Text
-> PersistentSubscriptionSettings
-> IO (Async (Maybe PersistActionException))
updatePersistentSubscription Connection{..} group stream sett = do
mvar <- newEmptyTMVarIO
let _F res = atomically $
case res of
Left e -> putTMVar mvar (Just e)
_ -> putTMVar mvar Nothing
pushUpdatePersist _prod _F group stream sett
async $ atomically $ readTMVar mvar
deletePersistentSubscription :: Connection
-> Text
-> Text
-> IO (Async (Maybe PersistActionException))
deletePersistentSubscription Connection{..} group stream = do
mvar <- newEmptyTMVarIO
let _F res = atomically $
case res of
Left e -> putTMVar mvar (Just e)
_ -> putTMVar mvar Nothing
pushDeletePersist _prod _F group stream
async $ atomically $ readTMVar mvar
connectToPersistentSubscription :: Connection
-> Text
-> Text
-> Int32
-> IO (Subscription Persistent)
connectToPersistentSubscription conn group stream bufSize =
persistentSub (mkSubEnv conn) group stream bufSize
createOpAsync :: IO (Either OperationError a -> IO (), Async a)
createOpAsync = do
mvar <- newEmptyMVar
as <- async $ do
res <- readMVar mvar
either throwIO return res
return (putMVar mvar, as)