module Database.EventStore
(
Event
, EventData
, createEvent
, withJson
, withJsonAndMetadata
, Connection
, ConnectionException(..)
, Credentials
, Settings(..)
, Retry
, atMost
, keepRetrying
, credentials
, defaultSettings
, connect
, shutdown
, StreamMetadataResult(..)
, readEvent
, readAllEventsBackward
, readAllEventsForward
, readStreamEventsBackward
, readStreamEventsForward
, getStreamMetadata
, StreamACL(..)
, StreamMetadata(..)
, streamMetadataGetCustomPropertyValue
, streamMetadataGetCustomProperty
, emptyStreamACL
, emptyStreamMetadata
, deleteStream
, sendEvent
, sendEvents
, setStreamMetadata
, Builder
, StreamACLBuilder
, buildStreamACL
, modifyStreamACL
, setReadRoles
, setReadRole
, setWriteRoles
, setWriteRole
, setDeleteRoles
, setDeleteRole
, setMetaReadRoles
, setMetaReadRole
, setMetaWriteRoles
, setMetaWriteRole
, StreamMetadataBuilder
, buildStreamMetadata
, modifyStreamMetadata
, setMaxCount
, setMaxAge
, setTruncateBefore
, setCacheControl
, setACL
, modifyACL
, setCustomProperty
, TimeSpan
, timeSpanTicks
, timeSpanHoursMinsSecs
, timeSpanDaysHoursMinsSecs
, timeSpanDaysHoursMinsSecsMillis
, timeSpanGetTicks
, timeSpanGetDays
, timeSpanGetHours
, timeSpanGetMinutes
, timeSpanGetSeconds
, timeSpanGetMillis
, timeSpanFromSeconds
, timeSpanFromMinutes
, timeSpanFromHours
, timeSpanFromDays
, timeSpanTotalMillis
, Transaction
, transactionStart
, transactionCommit
, transactionRollback
, transactionSendEvents
, DropReason(..)
, Identifiable
, Subscription
, NextEvent
, Regular
, Catchup
, Persistent
, subscribe
, subscribeToAll
, subNextEvent
, subId
, subStreamId
, subIsSubscribedToAll
, subResolveLinkTos
, subLastCommitPos
, subLastEventNumber
, subUnsubscribe
, CatchupError(..)
, subscribeFrom
, subscribeToAllFrom
, waitTillCatchup
, hasCaughtUp
, PersistentSubscriptionSettings(..)
, SystemConsumerStrategy(..)
, NakAction(..)
, notifyEventsProcessed
, notifyEventsFailed
, defaultPersistentSubscriptionSettings
, createPersistentSubscription
, updatePersistentSubscription
, deletePersistentSubscription
, connectToPersistentSubscription
, AllEventsSlice(..)
, DeleteResult(..)
, WriteResult(..)
, ReadResult(..)
, RecordedEvent(..)
, StreamEventsSlice(..)
, Position(..)
, ReadDirection(..)
, ReadAllResult(..)
, ReadEventResult(..)
, ResolvedEvent(..)
, ReadStreamResult(..)
, OperationException(..)
, eventResolved
, resolvedEventOriginal
, resolvedEventOriginalStreamId
, resolvedEventOriginalId
, positionStart
, positionEnd
, ExpectedVersion
, anyStream
, noStream
, emptyStream
, exactStream
, module Control.Concurrent.Async
, (<>)
) where
import Control.Concurrent
import Control.Concurrent.STM (atomically)
import Control.Exception
import Data.ByteString.Lazy (fromStrict)
import Data.Int
import Data.Monoid ((<>))
import Control.Concurrent.Async
import Data.Aeson (decode)
import Data.Text hiding (group)
import Database.EventStore.Catchup
import Database.EventStore.Internal.Manager.Subscription
import Database.EventStore.Internal.Operation.DeleteStreamOperation
import Database.EventStore.Internal.Operation.ReadAllEventsOperation
import Database.EventStore.Internal.Operation.ReadEventOperation
import Database.EventStore.Internal.Operation.ReadStreamEventsOperation
import Database.EventStore.Internal.Operation.TransactionStartOperation
import Database.EventStore.Internal.Operation.WriteEventsOperation
import Database.EventStore.Internal.Processor
import Database.EventStore.Internal.TimeSpan
import Database.EventStore.Internal.Types
-- | Handle to an EventStore connection.
--
-- It pairs the command dispatcher obtained while connecting with the
-- 'Settings' the connection was created from.
data Connection = Connection
    { _runCmd   :: Cmd -> IO ()
      -- ^ Submits a command to the processor started by 'connect'.
    , _settings :: Settings
      -- ^ The settings given to 'connect'; operations are built from them.
    }
-- | Creates a new 'Connection'.
--
-- A processor is started for the given settings and is then asked to
-- connect to the supplied host and port. The returned handle keeps both the
-- processor's command function and the settings.
connect :: Settings -- ^ Connection settings
        -> String   -- ^ Host
        -> Int      -- ^ Port
        -> IO Connection
connect settings host port = do
    runCmd <- newProcessor settings
    runCmd (DoConnect host port)
    return Connection
        { _runCmd   = runCmd
        , _settings = settings
        }
-- | Submits a 'DoShutdown' command to the connection's processor.
shutdown :: Connection -> IO ()
shutdown conn = _runCmd conn DoShutdown
-- | Sends a single 'Event' to a stream.
--
-- This is 'sendEvents' applied to a one-element list.
sendEvent :: Connection
          -> Text             -- ^ Stream name
          -> ExpectedVersion
          -> Event
          -> IO (Async WriteResult)
sendEvent conn stream version event =
    sendEvents conn stream version singleEvent
  where
    singleEvent = [event]
-- | Sends a list of 'Event's to a stream.
--
-- A write-events operation is built from the connection settings and
-- submitted to the processor. The returned 'Async' yields the operation's
-- 'WriteResult', or rethrows the exception the operation reported.
sendEvents :: Connection
           -> Text             -- ^ Stream name
           -> ExpectedVersion
           -> [Event]
           -> IO (Async WriteResult)
sendEvents conn stream version events = do
    (result, slot) <- createAsync
    _runCmd conn . NewOperation $
        writeEventsOperation (_settings conn) slot stream version events
    return result
-- | Deletes a stream.
--
-- A delete-stream operation is built from the connection settings and
-- submitted to the processor. The returned 'Async' yields the operation's
-- 'DeleteResult', or rethrows the exception the operation reported.
deleteStream :: Connection
             -> Text             -- ^ Stream name
             -> ExpectedVersion
             -> Maybe Bool       -- ^ Hard delete
             -> IO (Async DeleteResult)
deleteStream conn stream version hardDelete = do
    (result, slot) <- createAsync
    let deletion =
            deleteStreamOperation (_settings conn) slot stream version hardDelete
    _runCmd conn $ NewOperation deletion
    return result
-- | Starts a transaction on a stream.
--
-- The transaction-start operation receives the connection's command function
-- in addition to the settings. The returned 'Async' yields the 'Transaction',
-- or rethrows the exception the operation reported.
transactionStart :: Connection
                 -> Text             -- ^ Stream name
                 -> ExpectedVersion
                 -> IO (Async Transaction)
transactionStart conn stream version = do
    (result, slot) <- createAsync
    _runCmd conn . NewOperation $
        transactionStartOperation (_settings conn)
                                  (_runCmd conn)
                                  slot
                                  stream
                                  version
    return result
-- | Reads a single event from a stream.
--
-- A read-event operation is built from the connection settings and submitted
-- to the processor. The returned 'Async' yields the operation's 'ReadResult',
-- or rethrows the exception the operation reported.
readEvent :: Connection
          -> Text   -- ^ Stream name
          -> Int32  -- ^ Event number
          -> Bool   -- ^ Resolve link tos
          -> IO (Async ReadResult)
readEvent conn stream eventNumber resolveLinks = do
    (result, slot) <- createAsync
    let readOp =
            readEventOperation (_settings conn) slot stream eventNumber resolveLinks
    _runCmd conn $ NewOperation readOp
    return result
-- | Reads events from a stream in the 'Forward' direction.
readStreamEventsForward :: Connection
                        -> Text   -- ^ Stream name
                        -> Int32  -- ^ From event number
                        -> Int32  -- ^ Batch size
                        -> Bool   -- ^ Resolve link tos
                        -> IO (Async StreamEventsSlice)
readStreamEventsForward conn stream from count resolveLinks =
    readStreamEventsCommon conn Forward stream from count resolveLinks
-- | Reads events from a stream in the 'Backward' direction.
readStreamEventsBackward :: Connection
                         -> Text   -- ^ Stream name
                         -> Int32  -- ^ From event number
                         -> Int32  -- ^ Batch size
                         -> Bool   -- ^ Resolve link tos
                         -> IO (Async StreamEventsSlice)
readStreamEventsBackward conn stream from count resolveLinks =
    readStreamEventsCommon conn Backward stream from count resolveLinks
-- | Shared implementation of 'readStreamEventsForward' and
-- 'readStreamEventsBackward'.
--
-- A read-stream-events operation for the requested direction is built from
-- the connection settings and submitted to the processor. The returned
-- 'Async' yields the 'StreamEventsSlice', or rethrows the exception the
-- operation reported.
readStreamEventsCommon :: Connection
                       -> ReadDirection
                       -> Text   -- ^ Stream name
                       -> Int32  -- ^ From event number
                       -> Int32  -- ^ Batch size
                       -> Bool   -- ^ Resolve link tos
                       -> IO (Async StreamEventsSlice)
readStreamEventsCommon conn direction stream from count resolveLinks = do
    (result, slot) <- createAsync
    _runCmd conn . NewOperation $
        readStreamEventsOperation (_settings conn)
                                  direction
                                  slot
                                  stream
                                  from
                                  count
                                  resolveLinks
    return result
-- | Reads events across all streams in the 'Forward' direction.
readAllEventsForward :: Connection
                     -> Position
                     -> Int32  -- ^ Batch size
                     -> Bool   -- ^ Resolve link tos
                     -> IO (Async AllEventsSlice)
readAllEventsForward conn position maxCount resolveLinks =
    readAllEventsCommon conn Forward position maxCount resolveLinks
-- | Reads events across all streams in the 'Backward' direction.
readAllEventsBackward :: Connection
                      -> Position
                      -> Int32  -- ^ Batch size
                      -> Bool   -- ^ Resolve link tos
                      -> IO (Async AllEventsSlice)
readAllEventsBackward conn position maxCount resolveLinks =
    readAllEventsCommon conn Backward position maxCount resolveLinks
-- | Shared implementation of 'readAllEventsForward' and
-- 'readAllEventsBackward'.
--
-- The two components of the 'Position' are passed separately to the
-- read-all-events operation, which is then submitted to the processor. The
-- returned 'Async' yields the 'AllEventsSlice', or rethrows the exception
-- the operation reported.
readAllEventsCommon :: Connection
                    -> ReadDirection
                    -> Position
                    -> Int32  -- ^ Batch size
                    -> Bool   -- ^ Resolve link tos
                    -> IO (Async AllEventsSlice)
readAllEventsCommon conn direction position maxCount resolveLinks = do
    (result, slot) <- createAsync
    -- Lazy pattern binding: the position is only taken apart when the
    -- operation actually looks at its components.
    let Position commitPos preparePos = position
        readOp = readAllEventsOperation (_settings conn)
                                        direction
                                        slot
                                        commitPos
                                        preparePos
                                        maxCount
                                        resolveLinks
    _runCmd conn (NewOperation readOp)
    return result
-- | Subscribes to a stream.
--
-- A 'NewSub' command is submitted with a callback that fills an 'MVar'. The
-- returned 'Async' completes with the subscription once that 'MVar' has been
-- filled.
subscribe :: Connection
          -> Text  -- ^ Stream name
          -> Bool  -- ^ Resolve link tos
          -> IO (Async (Subscription Regular))
subscribe conn stream resolveLinks = do
    slot <- newEmptyMVar
    _runCmd conn $ NewSub stream resolveLinks (putMVar slot)
    async (readMVar slot)
-- | Subscribes to all streams.
--
-- This submits the same 'NewSub' command as 'subscribe', with the empty
-- stream name.
subscribeToAll :: Connection
               -> Bool  -- ^ Resolve link tos
               -> IO (Async (Subscription Regular))
subscribeToAll conn resolveLinks = do
    slot <- newEmptyMVar
    _runCmd conn $ NewSub "" resolveLinks (putMVar slot)
    async (readMVar slot)
-- | Creates a catch-up subscription on a stream.
--
-- 'catchupStart' is given a way to read the stream forward in batches and a
-- way to open a regular subscription on the same stream.
subscribeFrom :: Connection
              -> Text         -- ^ Stream name
              -> Bool         -- ^ Resolve link tos
              -> Maybe Int32  -- ^ Last checkpoint
              -> Maybe Int32  -- ^ Batch size
              -> IO (Subscription Catchup)
subscribeFrom conn stream resolveLinks lastCheckpoint batchSize =
    -- 'catchupStart' takes the batch size before the checkpoint.
    catchupStart readForward openSubscription stream batchSize lastCheckpoint
  where
    openSubscription = subscribe conn stream resolveLinks

    readForward from count =
        readStreamEventsForward conn stream from count resolveLinks
-- | Creates a catch-up subscription on all streams.
--
-- 'catchupAllStart' is given a way to read all events forward in batches and
-- a way to open a regular subscription to all streams.
subscribeToAllFrom :: Connection
                   -> Bool            -- ^ Resolve link tos
                   -> Maybe Position  -- ^ Last checkpoint
                   -> Maybe Int32     -- ^ Batch size
                   -> IO (Subscription Catchup)
subscribeToAllFrom conn resolveLinks lastCheckpoint batchSize =
    -- 'catchupAllStart' takes the checkpoint before the batch size.
    catchupAllStart readForward openSubscription lastCheckpoint batchSize
  where
    openSubscription = subscribeToAll conn resolveLinks

    readForward position count =
        readAllEventsForward conn position count resolveLinks
-- | Sets the metadata of a stream.
--
-- The metadata is serialised to JSON, wrapped in a @$metadata@ event and
-- sent to the stream's metadata stream (see 'metaStreamOf').
setStreamMetadata :: Connection
                  -> Text             -- ^ Stream name
                  -> ExpectedVersion
                  -> StreamMetadata
                  -> IO (Async WriteResult)
setStreamMetadata conn stream version metadata =
    sendEvent conn (metaStreamOf stream) version metaEvent
  where
    payload   = withJson (streamMetadataJSON metadata)
    metaEvent = createEvent "$metadata" Nothing payload
-- | Asynchronously gets the metadata of a stream.
--
-- The metadata lives in the stream's metadata stream (see 'metaStreamOf'),
-- to which 'setStreamMetadata' appends a new @$metadata@ event on every
-- call. The current metadata is therefore the /last/ event of that stream,
-- which is requested with event number @-1@. Asking for a fixed event number
-- would return an outdated entry, or nothing at all when the metadata stream
-- holds fewer events.
--
-- The returned 'Async' yields the decoded 'StreamMetadataResult'; it fails
-- if the read fails or if 'extractStreamMetadataResult' rejects the result.
getStreamMetadata :: Connection -> Text -> IO (Async StreamMetadataResult)
getStreamMetadata conn evt_stream = do
    -- (-1) selects the most recent event of the metadata stream.
    as <- readEvent conn (metaStreamOf evt_stream) (-1) False
    async $ atomically $ waitSTM as >>= extractStreamMetadataResult evt_stream
-- | Turns the 'ReadResult' of a metadata-stream read into a
-- 'StreamMetadataResult'.
--
-- * On success the original event's payload is decoded as JSON; a missing
--   event or an undecodable payload is reported through 'fail'.
-- * A deleted stream yields 'DeletedStreamMetadataResult'.
-- * A missing event or missing stream yields 'NotFoundStreamMetadataResult'.
-- * Any other status is reported through 'fail'.
extractStreamMetadataResult :: Monad m
                            => Text        -- ^ Stream name
                            -> ReadResult
                            -> m StreamMetadataResult
extractStreamMetadataResult stream rres =
    case readResultStatus rres of
        RE_SUCCESS        -> maybe missingEvent decodeMetadata original
        RE_STREAM_DELETED -> return (DeletedStreamMetadataResult stream)
        RE_NOT_FOUND      -> notFound
        RE_NO_STREAM      -> notFound
        _                 -> fail "unexpected ReadEventResult"
  where
    -- The original event behind the resolved event, if there is one.
    original = readResultResolvedEvent rres >>= resolvedEventOriginal

    notFound = return (NotFoundStreamMetadataResult stream)

    missingEvent = fail "impossible: extractStreamMetadataResult"

    decodeMetadata evt =
        case decode (fromStrict (recordedEventData evt)) of
            Nothing   -> fail "StreamMetadata: wrong format."
            Just meta ->
                return $ StreamMetadataResult
                    { streamMetaResultStream  = stream
                    , streamMetaResultVersion = readResultEventNumber rres
                    , streamMetaResultData    = meta
                    }
-- | Asynchronously creates a persistent subscription group on a stream.
--
-- A 'CreatePersist' command is submitted with a callback that delivers the
-- outcome. The returned 'Async' completes with @()@ on success and rethrows
-- the reported exception otherwise.
createPersistentSubscription :: Connection
                             -> Text  -- ^ Group name
                             -> Text  -- ^ Stream name
                             -> PersistentSubscriptionSettings
                             -> IO (Async ())
createPersistentSubscription conn groupName stream settings = do
    (result, slot) <- createAsync
    _runCmd conn $ CreatePersist groupName stream settings (putMVar slot)
    return result
-- | Asynchronously updates a persistent subscription group on a stream.
--
-- An 'UpdatePersist' command is submitted with a callback that delivers the
-- outcome. The returned 'Async' completes with @()@ on success and rethrows
-- the reported exception otherwise.
updatePersistentSubscription :: Connection
                             -> Text  -- ^ Group name
                             -> Text  -- ^ Stream name
                             -> PersistentSubscriptionSettings
                             -> IO (Async ())
updatePersistentSubscription conn groupName stream settings = do
    (result, slot) <- createAsync
    _runCmd conn $ UpdatePersist groupName stream settings (putMVar slot)
    return result
-- | Asynchronously deletes a persistent subscription group on a stream.
--
-- A 'DeletePersist' command is submitted with a callback that delivers the
-- outcome. The returned 'Async' completes with @()@ on success and rethrows
-- the reported exception otherwise.
deletePersistentSubscription :: Connection
                             -> Text  -- ^ Group name
                             -> Text  -- ^ Stream name
                             -> IO (Async ())
deletePersistentSubscription conn groupName stream = do
    (result, slot) <- createAsync
    _runCmd conn $ DeletePersist groupName stream (putMVar slot)
    return result
-- | Asynchronously connects to a persistent subscription group on a stream.
--
-- A 'ConnectPersist' command is submitted with a callback that fills an
-- 'MVar'. The returned 'Async' completes with the subscription once that
-- 'MVar' has been filled.
connectToPersistentSubscription :: Connection
                                -> Text   -- ^ Group name
                                -> Text   -- ^ Stream name
                                -> Int32  -- ^ Buffer size
                                -> IO (Async (Subscription Persistent))
connectToPersistentSubscription conn groupName stream bufferSize = do
    slot <- newEmptyMVar
    _runCmd conn $ ConnectPersist groupName stream bufferSize (putMVar slot)
    async (readMVar slot)
-- | Creates an 'Async' together with the 'MVar' that completes it.
--
-- The 'Async' waits for the 'MVar' to be filled. A 'Right' value becomes the
-- result of the 'Async'; a 'Left' value is thrown inside it with 'throwIO',
-- so it resurfaces when the 'Async' is waited on.
createAsync :: IO (Async a, MVar (OperationExceptional a))
createAsync = do
    slot   <- newEmptyMVar
    result <- async (readMVar slot >>= either throwIO return)
    return (result, slot)
-- | Name of the metadata stream belonging to a stream: the stream name
-- prefixed with @$$@.
metaStreamOf :: Text -> Text
metaStreamOf = ("$$" <>)