{-# LANGUAGE OverloadedStrings #-} {-# LANGUAGE RecordWildCards #-} -------------------------------------------------------------------------------- -- | -- Module : Database.EventStore -- Copyright : (C) 2014 Yorick Laupa -- License : (see the file LICENSE) -- -- Maintainer : Yorick Laupa -- Stability : provisional -- Portability : non-portable -- -------------------------------------------------------------------------------- module Database.EventStore ( -- * Event Event , EventData , createEvent , withJson , withJsonAndMetadata -- * Connection , Connection , ConnectionException(..) , Credentials , Settings(..) , Retry , atMost , keepRetrying , credentials , defaultSettings , connect , shutdown -- * Read Operations , StreamMetadataResult(..) , readEvent , readAllEventsBackward , readAllEventsForward , readStreamEventsBackward , readStreamEventsForward , getStreamMetadata -- * Write Operations , StreamACL(..) , StreamMetadata(..) , streamMetadataGetCustomPropertyValue , streamMetadataGetCustomProperty , emptyStreamACL , emptyStreamMetadata , deleteStream , sendEvent , sendEvents , setStreamMetadata -- * Builder , Builder -- * Stream ACL Builder , StreamACLBuilder , buildStreamACL , modifyStreamACL , setReadRoles , setReadRole , setWriteRoles , setWriteRole , setDeleteRoles , setDeleteRole , setMetaReadRoles , setMetaReadRole , setMetaWriteRoles , setMetaWriteRole -- * Stream Metadata Builder , StreamMetadataBuilder , buildStreamMetadata , modifyStreamMetadata , setMaxCount , setMaxAge , setTruncateBefore , setCacheControl , setACL , modifyACL , setCustomProperty -- * TimeSpan , TimeSpan , timeSpanTicks , timeSpanHoursMinsSecs , timeSpanDaysHoursMinsSecs , timeSpanDaysHoursMinsSecsMillis , timeSpanGetTicks , timeSpanGetDays , timeSpanGetHours , timeSpanGetMinutes , timeSpanGetSeconds , timeSpanGetMillis , timeSpanFromSeconds , timeSpanFromMinutes , timeSpanFromHours , timeSpanFromDays , timeSpanTotalMillis -- * Transaction , Transaction , transactionStart , transactionCommit , transactionRollback , transactionSendEvents -- * Volatile Subscription , DropReason(..) , Identifiable , Subscription , NextEvent , Regular , Catchup , Persistent , subscribe , subscribeToAll , subNextEvent , subId , subStreamId , subIsSubscribedToAll , subResolveLinkTos , subLastCommitPos , subLastEventNumber , subUnsubscribe -- * Catch-up Subscription , CatchupError(..) , subscribeFrom , subscribeToAllFrom , waitTillCatchup , hasCaughtUp -- * Persistent Subscription , PersistentSubscriptionSettings(..) , SystemConsumerStrategy(..) , NakAction(..) , notifyEventsProcessed , notifyEventsFailed , defaultPersistentSubscriptionSettings , createPersistentSubscription , updatePersistentSubscription , deletePersistentSubscription , connectToPersistentSubscription -- * Results , AllEventsSlice(..) , DeleteResult(..) , WriteResult(..) , ReadResult(..) , RecordedEvent(..) , StreamEventsSlice(..) , Position(..) , ReadDirection(..) , ReadAllResult(..) , ReadEventResult(..) , ResolvedEvent(..) , ReadStreamResult(..) , OperationException(..) , eventResolved , resolvedEventOriginal , resolvedEventOriginalStreamId , resolvedEventOriginalId , positionStart , positionEnd -- * Misc , ExpectedVersion , anyStream , noStream , emptyStream , exactStream -- * Re-export , module Control.Concurrent.Async , (<>) ) where -------------------------------------------------------------------------------- import Control.Concurrent import Control.Concurrent.STM (atomically) import Control.Exception import Data.ByteString.Lazy (fromStrict) import Data.Int import Data.Monoid ((<>)) -------------------------------------------------------------------------------- import Control.Concurrent.Async import Data.Aeson (decode) import Data.Text hiding (group) -------------------------------------------------------------------------------- import Database.EventStore.Catchup import Database.EventStore.Internal.Manager.Subscription import Database.EventStore.Internal.Operation.DeleteStreamOperation import Database.EventStore.Internal.Operation.ReadAllEventsOperation import Database.EventStore.Internal.Operation.ReadEventOperation import Database.EventStore.Internal.Operation.ReadStreamEventsOperation import Database.EventStore.Internal.Operation.TransactionStartOperation import Database.EventStore.Internal.Operation.WriteEventsOperation import Database.EventStore.Internal.Processor import Database.EventStore.Internal.TimeSpan import Database.EventStore.Internal.Types -------------------------------------------------------------------------------- -- Connection -------------------------------------------------------------------------------- -- | Represents a connection to a single EventStore node. data Connection = Connection { _runCmd :: Cmd -> IO () , _settings :: Settings } -------------------------------------------------------------------------------- -- | Creates a new 'Connection' to a single node. It maintains a full duplex -- connection to the EventStore. An EventStore 'Connection' operates quite -- differently than say a SQL connection. Normally when you use a SQL -- connection you want to keep the connection open for a much longer of time -- than when you use a SQL connection. -- -- Another difference is that with the EventStore 'Connection' all operation -- are handled in a full async manner (even if you call the synchronous -- behaviors). Many threads can use an EvenStore 'Connection' at the same time -- or a single thread can make many asynchronous requests. To get the most -- performance out of the connection it is generally recommend to use it in -- this way. connect :: Settings -> String -- ^ HostName -> Int -- ^ Port -> IO Connection connect settings host port = do processor <- newProcessor settings processor (DoConnect host port) return $ Connection processor settings -------------------------------------------------------------------------------- -- | Asynchronously closes the 'Connection'. shutdown :: Connection -> IO () shutdown Connection{..} = _runCmd DoShutdown -------------------------------------------------------------------------------- -- | Sends a single 'Event' to given stream. sendEvent :: Connection -> Text -- ^ Stream name -> ExpectedVersion -> Event -> IO (Async WriteResult) sendEvent mgr evt_stream exp_ver evt = sendEvents mgr evt_stream exp_ver [evt] -------------------------------------------------------------------------------- -- | Sends a list of 'Event' to given stream. sendEvents :: Connection -> Text -- ^ Stream name -> ExpectedVersion -> [Event] -> IO (Async WriteResult) sendEvents Connection{..} evt_stream exp_ver evts = do (as, mvar) <- createAsync let op = writeEventsOperation _settings mvar evt_stream exp_ver evts _runCmd (NewOperation op) return as -------------------------------------------------------------------------------- -- | Deletes given stream. deleteStream :: Connection -> Text -- ^ Stream name -> ExpectedVersion -> Maybe Bool -- ^ Hard delete -> IO (Async DeleteResult) deleteStream Connection{..} evt_stream exp_ver hard_del = do (as, mvar) <- createAsync let op = deleteStreamOperation _settings mvar evt_stream exp_ver hard_del _runCmd (NewOperation op) return as -------------------------------------------------------------------------------- -- | Starts a transaction on given stream. transactionStart :: Connection -> Text -- ^ Stream name -> ExpectedVersion -> IO (Async Transaction) transactionStart Connection{..} evt_stream exp_ver = do (as, mvar) <- createAsync let op = transactionStartOperation _settings _runCmd mvar evt_stream exp_ver _runCmd (NewOperation op) return as -------------------------------------------------------------------------------- -- | Reads a single event from given stream. readEvent :: Connection -> Text -- ^ Stream name -> Int32 -- ^ Event number -> Bool -- ^ Resolve Link Tos -> IO (Async ReadResult) readEvent Connection{..} stream_id evt_num res_link_tos = do (as, mvar) <- createAsync let op = readEventOperation _settings mvar stream_id evt_num res_link_tos _runCmd (NewOperation op) return as -------------------------------------------------------------------------------- -- | Reads events from a given stream forward. readStreamEventsForward :: Connection -> Text -- ^ Stream name -> Int32 -- ^ From event number -> Int32 -- ^ Batch size -> Bool -- ^ Resolve Link Tos -> IO (Async StreamEventsSlice) readStreamEventsForward mgr = readStreamEventsCommon mgr Forward -------------------------------------------------------------------------------- -- | Reads events from a given stream backward. readStreamEventsBackward :: Connection -> Text -- ^ Stream name -> Int32 -- ^ From event number -> Int32 -- ^ Batch size -> Bool -- ^ Resolve Link Tos -> IO (Async StreamEventsSlice) readStreamEventsBackward mgr = readStreamEventsCommon mgr Backward -------------------------------------------------------------------------------- readStreamEventsCommon :: Connection -> ReadDirection -> Text -> Int32 -> Int32 -> Bool -> IO (Async StreamEventsSlice) readStreamEventsCommon Connection{..} dir stream_id start cnt res_link_tos = do (as, mvar) <- createAsync let op = readStreamEventsOperation _settings dir mvar stream_id start cnt res_link_tos _runCmd (NewOperation op) return as -------------------------------------------------------------------------------- -- | Reads events from the $all stream forward. readAllEventsForward :: Connection -> Position -> Int32 -- ^ Batch size -> Bool -- ^ Resolve Link Tos -> IO (Async AllEventsSlice) readAllEventsForward mgr = readAllEventsCommon mgr Forward -------------------------------------------------------------------------------- -- | Reads events from the $all stream backward readAllEventsBackward :: Connection -> Position -> Int32 -- ^ Batch size -> Bool -- ^ Resolve Link Tos -> IO (Async AllEventsSlice) readAllEventsBackward mgr = readAllEventsCommon mgr Backward -------------------------------------------------------------------------------- readAllEventsCommon :: Connection -> ReadDirection -> Position -> Int32 -> Bool -> IO (Async AllEventsSlice) readAllEventsCommon Connection{..} dir pos max_c res_link_tos = do (as, mvar) <- createAsync let op = readAllEventsOperation _settings dir mvar c_pos p_pos max_c res_link_tos _runCmd (NewOperation op) return as where Position c_pos p_pos = pos -------------------------------------------------------------------------------- -- | Subcribes to given stream. subscribe :: Connection -> Text -- ^ Stream name -> Bool -- ^ Resolve Link Tos -> IO (Async (Subscription Regular)) subscribe Connection{..} stream_id res_lnk_tos = do tmp <- newEmptyMVar _runCmd (NewSub stream_id res_lnk_tos (putMVar tmp)) async $ readMVar tmp -------------------------------------------------------------------------------- -- | Subcribes to $all stream. subscribeToAll :: Connection -> Bool -- ^ Resolve Link Tos -> IO (Async (Subscription Regular)) subscribeToAll Connection{..} res_lnk_tos = do tmp <- newEmptyMVar _runCmd (NewSub "" res_lnk_tos (putMVar tmp)) async $ readMVar tmp -------------------------------------------------------------------------------- -- | Subscribes to given stream. If last checkpoint is defined, this will -- 'readStreamEventsForward' from that event number, otherwise from the -- beginning. Once last stream event reached up, a subscription request will -- be sent using 'subscribe'. subscribeFrom :: Connection -> Text -- ^ Stream name -> Bool -- ^ Resolve Link Tos -> Maybe Int32 -- ^ Last checkpoint -> Maybe Int32 -- ^ Batch size -> IO (Subscription Catchup) subscribeFrom conn stream_id res_lnk_tos last_chk_pt batch_m = do catchupStart evts_fwd get_sub stream_id batch_m last_chk_pt where evts_fwd cur_num batch_size = readStreamEventsForward conn stream_id cur_num batch_size res_lnk_tos get_sub = subscribe conn stream_id res_lnk_tos -------------------------------------------------------------------------------- -- | Same as 'subscribeFrom' but applied to $all stream. subscribeToAllFrom :: Connection -> Bool -- ^ Resolve Link Tos -> Maybe Position -- ^ Last checkpoint -> Maybe Int32 -- ^ Batch size -> IO (Subscription Catchup) subscribeToAllFrom conn res_lnk_tos last_chk_pt batch_m = do catchupAllStart evts_fwd get_sub last_chk_pt batch_m where evts_fwd pos batch_size = readAllEventsForward conn pos batch_size res_lnk_tos get_sub = subscribeToAll conn res_lnk_tos -------------------------------------------------------------------------------- -- | Asynchronously sets the metadata for a stream. setStreamMetadata :: Connection -> Text -> ExpectedVersion -> StreamMetadata -> IO (Async WriteResult) setStreamMetadata conn evt_stream exp_ver metadata = let dat = withJson $ streamMetadataJSON metadata evt = createEvent "$metadata" Nothing dat in sendEvent conn (metaStreamOf evt_stream) exp_ver evt -------------------------------------------------------------------------------- -- | Asynchronously gets the metadata of a stream. getStreamMetadata :: Connection -> Text -> IO (Async StreamMetadataResult) getStreamMetadata conn evt_stream = do as <- readEvent conn (metaStreamOf evt_stream) (-1) False async $ atomically $ waitSTM as >>= extractStreamMetadataResult evt_stream -------------------------------------------------------------------------------- extractStreamMetadataResult :: Monad m => Text -> ReadResult -> m StreamMetadataResult extractStreamMetadataResult stream rres = case readResultStatus rres of RE_SUCCESS -> case action of Just orig -> case decode $ fromStrict $ recordedEventData orig of Just s -> let res = StreamMetadataResult { streamMetaResultStream = stream , streamMetaResultVersion = evt_number , streamMetaResultData = s } in return res Nothing -> fail "StreamMetadata: wrong format." Nothing -> fail "impossible: extractStreamMetadataResult" RE_STREAM_DELETED -> return $ DeletedStreamMetadataResult stream RE_NOT_FOUND -> return $ NotFoundStreamMetadataResult stream RE_NO_STREAM -> return $ NotFoundStreamMetadataResult stream _ -> fail "unexpected ReadEventResult" where action = readResultResolvedEvent rres >>= resolvedEventOriginal evt_number = readResultEventNumber rres -------------------------------------------------------------------------------- -- | Asynchronously create a persistent subscription group on a stream. createPersistentSubscription :: Connection -> Text -> Text -> PersistentSubscriptionSettings -> IO (Async ()) createPersistentSubscription Connection{..} group stream sett = do (as, mvar) <- createAsync _runCmd (CreatePersist group stream sett (putMVar mvar)) return as -------------------------------------------------------------------------------- -- | Asynchronously update a persistent subscription group on a stream. updatePersistentSubscription :: Connection -> Text -> Text -> PersistentSubscriptionSettings -> IO (Async ()) updatePersistentSubscription Connection{..} group stream sett = do (as, mvar) <- createAsync _runCmd (UpdatePersist group stream sett (putMVar mvar)) return as -------------------------------------------------------------------------------- -- | Asynchronously delete a persistent subscription group on a stream. deletePersistentSubscription :: Connection -> Text -> Text -> IO (Async ()) deletePersistentSubscription Connection{..} group stream = do (as, mvar) <- createAsync _runCmd (DeletePersist group stream (putMVar mvar)) return as -------------------------------------------------------------------------------- -- | Asynchronously connect to a persistent subscription given a group on a -- stream. connectToPersistentSubscription :: Connection -> Text -> Text -> Int32 -> IO (Async (Subscription Persistent)) connectToPersistentSubscription Connection{..} group stream bufSize = do mvar <- newEmptyMVar _runCmd (ConnectPersist group stream bufSize (putMVar mvar)) async $ readMVar mvar -------------------------------------------------------------------------------- createAsync :: IO (Async a, MVar (OperationExceptional a)) createAsync = do mvar <- newEmptyMVar as <- async $ do res <- readMVar mvar either throwIO return res return (as, mvar) -------------------------------------------------------------------------------- metaStreamOf :: Text -> Text metaStreamOf s = "$$" <> s