{-# LANGUAGE DataKinds                 #-}
{-# LANGUAGE DeriveDataTypeable        #-}
{-# LANGUAGE ExistentialQuantification #-}
{-# LANGUAGE OverloadedStrings         #-}
{-# LANGUAGE RecordWildCards           #-}
{-# LANGUAGE ScopedTypeVariables       #-}
--------------------------------------------------------------------------------
-- |
-- Module : Database.EventStore.Internal
-- Copyright : (C) 2014 Yorick Laupa
-- License : (see the file LICENSE)
--
-- Maintainer : Yorick Laupa <yo.eight@gmail.com>
-- Stability : provisional
-- Portability : non-portable
--
--------------------------------------------------------------------------------
module Database.EventStore.Internal where

--------------------------------------------------------------------------------
import Prelude (String)
import Data.Int
import Data.Maybe

--------------------------------------------------------------------------------
import           Database.EventStore.Internal.Communication
import           Database.EventStore.Internal.Connection (connectionBuilder)
import           Database.EventStore.Internal.Control hiding (subscribe)
import           Database.EventStore.Internal.Discovery
import           Database.EventStore.Internal.Exec
import           Database.EventStore.Internal.Subscription.Catchup
import           Database.EventStore.Internal.Subscription.Persistent
import           Database.EventStore.Internal.Subscription.Types
import           Database.EventStore.Internal.Subscription.Regular
import           Database.EventStore.Internal.Logger
import qualified Database.EventStore.Internal.Operations as Op
import           Database.EventStore.Internal.Operation.Read.Common
import           Database.EventStore.Internal.Operation.Write.Common
import           Database.EventStore.Internal.Prelude
import           Database.EventStore.Internal.Stream
import           Database.EventStore.Internal.Types

--------------------------------------------------------------------------------
-- Connection
--------------------------------------------------------------------------------
-- | Enumerates the ways the client can be told where the EventStore lives.
data ConnectionType
    = -- | Fixed endpoint: host name and port.
      Static String Int
    | -- | Cluster connection described by 'ClusterSettings'.
      Cluster ClusterSettings
    | -- | DNS-based connection: domain name, optional DNS server and port.
      Dns ByteString (Maybe DnsServer) Int

--------------------------------------------------------------------------------
-- | Represents a connection to a single EventStore node.
data Connection
    = Connection
      { _exec     :: Exec
        -- ^ Execution handle every operation of this module is issued through.
      , _settings :: Settings
        -- ^ 'Settings' the connection was created with.
      , _type     :: ConnectionType
        -- ^ 'ConnectionType' the connection was created with.
      }

--------------------------------------------------------------------------------
-- | Creates a new 'Connection' to a single node. It maintains a full duplex
--   connection to the EventStore. An EventStore 'Connection' operates quite
--   differently than say a SQL connection. Normally when you use an EventStore
--   connection you want to keep the connection open for a much longer period
--   of time than when you use a SQL connection.
--
--   Another difference is that with the EventStore 'Connection' all operations
--   are handled in a full async manner (even if you call the synchronous
--   behaviors). Many threads can use an EventStore 'Connection' at the same
--   time or a single thread can make many asynchronous requests. To get the
--   most performance out of the connection it is generally recommended to use
--   it in this way.
connect :: Settings -> ConnectionType -> IO Connection
connect settings@Settings{..} tpe = do
    -- Pick the endpoint discovery matching the requested connection type. Only
    -- the cluster variant is built in 'IO'; the other two are pure values.
    disc <- case tpe of
        Static host port -> return $ staticEndPointDiscovery host port
        Cluster setts    -> clusterDnsEndPointDiscovery setts
        Dns dom srv port -> return $ simpleDnsEndPointDiscovery dom srv port

    -- The logger is configured from the 's_logger*' fields of 'Settings'.
    logRef  <- newLoggerRef s_loggerType s_loggerFilter s_loggerDetailed
    mainBus <- newBus logRef settings
    builder <- connectionBuilder settings
    exec    <- newExec settings mainBus builder disc
    return $ Connection exec settings tpe

--------------------------------------------------------------------------------
-- | Waits for the 'Connection' to be closed. Delegates to
--   'execWaitTillClosed' on the connection's execution handle.
waitTillClosed :: Connection -> IO ()
waitTillClosed Connection{..} = execWaitTillClosed _exec

--------------------------------------------------------------------------------
-- | Returns the 'Settings' a 'Connection' was created with.
connectionSettings :: Connection -> Settings
connectionSettings = _settings

--------------------------------------------------------------------------------
-- | Asynchronously closes the 'Connection' by publishing a 'SystemShutdown'
--   message on its execution handle. Use 'waitTillClosed' to block until the
--   connection is actually closed.
shutdown :: Connection -> IO ()
shutdown Connection{..} = publishWith _exec SystemShutdown

--------------------------------------------------------------------------------
-- | Sends a single 'Event' to given stream. This is 'sendEvents' applied to a
--   one-element list.
sendEvent :: Connection
          -> StreamName
          -> ExpectedVersion
          -> Event
          -> Maybe Credentials
          -> IO (Async WriteResult)
sendEvent mgr evt_stream exp_ver evt cred =
    sendEvents mgr evt_stream exp_ver [evt] cred

--------------------------------------------------------------------------------
-- | Sends a list of 'Event' to given stream. The write is issued through
--   'Op.writeEvents' and its outcome is delivered by the returned 'Async'.
sendEvents :: Connection
           -> StreamName
           -> ExpectedVersion
           -> [Event]
           -> Maybe Credentials
           -> IO (Async WriteResult)
sendEvents Connection{..} evt_stream exp_ver evts cred =
    Op.writeEvents _settings _exec (streamIdRaw evt_stream) exp_ver cred evts

--------------------------------------------------------------------------------
-- | Deletes given stream. The request is issued through 'Op.deleteStream' and
--   its outcome is delivered by the returned 'Async'.
deleteStream :: Connection
             -> StreamName
             -> ExpectedVersion
             -> Maybe Bool       -- ^ Hard delete
             -> Maybe Credentials
             -> IO (Async Op.DeleteResult)
deleteStream Connection{..} stream expVer hardDel cred =
    Op.deleteStream _settings _exec (streamIdRaw stream) expVer hardDel cred

--------------------------------------------------------------------------------
-- | Represents a multi-request transaction with the EventStore.
data Transaction =
    Transaction
    { _tStream  :: Text
      -- ^ Raw name of the stream the transaction was started on.
    , _tTransId :: TransactionId
      -- ^ Id of the transaction.
    , _tExpVer  :: ExpectedVersion
      -- ^ Expected version the transaction was started with.
    , _tConn    :: Connection
      -- ^ 'Connection' the transaction was started on.
    }

--------------------------------------------------------------------------------
-- | The id of a 'Transaction'. Wraps the raw 'Int64' transaction id.
newtype TransactionId =
    TransactionId { _unTransId :: Int64 }
    deriving (Eq, Ord, Show)

--------------------------------------------------------------------------------
-- | Gets the id of a 'Transaction'.
transactionId :: Transaction -> TransactionId
transactionId = _tTransId

--------------------------------------------------------------------------------
-- | Starts a transaction on given stream. The returned 'Async' yields the
--   'Transaction' once the start operation has delivered the raw transaction
--   id.
startTransaction :: Connection
                 -> StreamName            -- ^ Stream name
                 -> ExpectedVersion
                 -> Maybe Credentials
                 -> IO (Async Transaction)
startTransaction conn@Connection{..} evt_stream exp_ver cred = do
    as <- Op.transactionStart _settings _exec (streamIdRaw evt_stream) exp_ver cred
    -- Wrap the raw id into a 'Transaction' in a separate asynchronous action so
    -- the caller is not blocked while the start operation is in flight.
    async $ do
        tid <- wait as
        return Transaction
               { _tStream  = streamIdRaw evt_stream
               , _tTransId = TransactionId tid
               , _tExpVer  = exp_ver
               , _tConn    = conn
               }

--------------------------------------------------------------------------------
-- | Asynchronously writes to a transaction in the EventStore. The write goes
--   through the 'Connection' the 'Transaction' was started on.
transactionWrite :: Transaction
                 -> [Event]
                 -> Maybe Credentials
                 -> IO (Async ())
transactionWrite Transaction{..} evts cred = do
    let Connection{..} = _tConn
        raw_id         = _unTransId _tTransId
    Op.transactionWrite _settings _exec _tStream _tExpVer raw_id evts cred

--------------------------------------------------------------------------------
-- | Asynchronously commits this transaction. The commit goes through the
--   'Connection' the 'Transaction' was started on.
transactionCommit :: Transaction -> Maybe Credentials -> IO (Async WriteResult)
transactionCommit Transaction{..} cred = do
    let Connection{..} = _tConn
        raw_id         = _unTransId _tTransId
    Op.transactionCommit _settings _exec _tStream _tExpVer raw_id cred

--------------------------------------------------------------------------------
-- | There is no such thing as a rollback in EventStore parlance. Basically, if
--   you want to rollback, you just have to not 'transactionCommit' a
--   'Transaction'. This function therefore does nothing.
transactionRollback :: Transaction -> IO ()
transactionRollback _ = return ()

--------------------------------------------------------------------------------
-- | Reads a single event from given stream. The event number and the
--   link-resolution flag are converted to their raw representations before
--   being handed to 'Op.readEvent'.
readEvent :: Connection
          -> StreamName
          -> EventNumber
          -> ResolveLink
          -> Maybe Credentials
          -> IO (Async (ReadResult EventNumber Op.ReadEvent))
readEvent Connection{..} stream evtNum resLinkTos cred = do
    let evtNumRaw = eventNumberToInt64 evtNum
        linkTos   = resolveLinkToBool resLinkTos
    Op.readEvent _settings _exec (streamIdRaw stream) evtNumRaw linkTos cred

--------------------------------------------------------------------------------
-- | Type-level function giving the result of a batch read, depending on
--   whether a regular stream or the $all stream is read. Reading a regular
--   stream can end in a read-error (for instance the stream got deleted), so
--   its slice comes wrapped in a 'ReadResult'. Reading $all cannot fail that
--   way (because $all cannot get deleted), so an 'AllSlice' is returned
--   directly.
type family BatchResult t where
    BatchResult EventNumber = ReadResult EventNumber StreamSlice
    BatchResult Position = AllSlice

--------------------------------------------------------------------------------
-- | Reads events from a stream forward. The result type depends on the kind of
--   stream, see 'BatchResult'.
readEventsForward :: Connection
                  -> StreamId t
                  -> t
                  -> Int32      -- ^ Batch size
                  -> ResolveLink
                  -> Maybe Credentials
                  -> IO (Async (BatchResult t))
readEventsForward conn = readEventsCommon conn Forward

--------------------------------------------------------------------------------
-- | Reads events from a stream backward. The result type depends on the kind
--   of stream, see 'BatchResult'.
readEventsBackward :: Connection
                   -> StreamId t
                   -> t
                   -> Int32      -- ^ Batch size
                   -> ResolveLink
                   -> Maybe Credentials
                   -> IO (Async (BatchResult t))
readEventsBackward conn = readEventsCommon conn Backward

--------------------------------------------------------------------------------
-- | Shared implementation of 'readEventsForward' and 'readEventsBackward'.
--   Dispatches on the kind of stream: a named stream is read with
--   'Op.readStreamEvents', the $all stream with 'Op.readAllEvents'.
readEventsCommon :: Connection
                 -> ReadDirection
                 -> StreamId t
                 -> t
                 -> Int32
                 -> ResolveLink
                 -> Maybe Credentials
                 -> IO (Async (BatchResult t))
readEventsCommon Connection{..} dir streamId start cnt resLinkTos cred = do
    let res_link_tos = resolveLinkToBool resLinkTos
    -- Matching on the stream id fixes the type of 'start': an 'EventNumber'
    -- for a named stream, a 'Position' for $all.
    case streamId of
        StreamName{} ->
            let name   = streamIdRaw streamId
                evtNum = eventNumberToInt64 start in
            Op.readStreamEvents _settings _exec dir name evtNum cnt res_link_tos cred
        All ->
            let Position c_pos p_pos = start in
            Op.readAllEvents _settings _exec c_pos p_pos cnt res_link_tos dir cred

--------------------------------------------------------------------------------
-- | Subscribes to a stream. The subscription is created with
--   'newRegularSubscription' on the connection's execution handle.
subscribe :: Connection
          -> StreamId t
          -> ResolveLink
          -> Maybe Credentials
          -> IO (RegularSubscription t)
subscribe Connection{..} stream resLinkTos cred =
    newRegularSubscription _exec stream resLnkTos cred
  where
    resLnkTos = resolveLinkToBool resLinkTos

--------------------------------------------------------------------------------
-- | Subscribes to a stream. If last checkpoint is defined, events are read
--   forward from that checkpoint, otherwise from the beginning ('streamStart'
--   for a named stream, 'positionStart' for $all). Once last stream event
--   reached up, a subscription request will be sent, as with 'subscribe'.
subscribeFrom :: Connection
              -> StreamId t
              -> ResolveLink
              -> Maybe t
              -> Maybe Int32 -- ^ Batch size
              -> Maybe Credentials
              -> IO (CatchupSubscription t)
subscribeFrom Connection{..} streamId resLinkTos lastChkPt batch cred =
    newCatchupSubscription _exec resLnkTos batch cred streamId $
        -- Matching on the stream id fixes the checkpoint type, which selects
        -- the right default starting point.
        case streamId of
            StreamName{} -> fromMaybe streamStart lastChkPt
            All          -> fromMaybe positionStart lastChkPt
  where
    resLnkTos = resolveLinkToBool resLinkTos

--------------------------------------------------------------------------------
-- | Creates a catch-up subscription on given stream starting from an explicit
--   seed (an event number or a position, depending on the kind of stream).
--   Thin wrapper over 'newCatchupSubscription' that converts the 'ResolveLink'
--   flag to its raw 'Bool' form.
subscribeFromCommon :: Connection
                    -> ResolveLink
                    -> Maybe Int32
                    -> Maybe Credentials
                    -> StreamId t
                    -> t
                    -> IO (CatchupSubscription t)
subscribeFromCommon Connection{..} resLinkTos batch cred kind seed =
    newCatchupSubscription _exec resLnkTos batch cred kind seed
  where
    resLnkTos = resolveLinkToBool resLinkTos

--------------------------------------------------------------------------------
-- | Asynchronously sets the metadata for a stream. The request is issued
--   through 'Op.setMetaStream'.
setStreamMetadata :: Connection
                  -> StreamName
                  -> ExpectedVersion
                  -> StreamMetadata
                  -> Maybe Credentials
                  -> IO (Async WriteResult)
setStreamMetadata Connection{..} evt_stream exp_ver metadata cred = do
    let name = streamIdRaw evt_stream
    Op.setMetaStream _settings _exec name exp_ver cred metadata

--------------------------------------------------------------------------------
-- | Asynchronously gets the metadata of a stream. The request is issued
--   through 'Op.readMetaStream'.
getStreamMetadata :: Connection
                  -> StreamName
                  -> Maybe Credentials
                  -> IO (Async StreamMetadataResult)
getStreamMetadata Connection{..} stream cred =
    Op.readMetaStream _settings _exec (streamIdRaw stream) cred

--------------------------------------------------------------------------------
-- | Asynchronously create a persistent subscription group on a stream. The
--   returned 'Async' yields 'Just' a 'PersistActionException' or 'Nothing',
--   as reported by 'Op.createPersist'.
createPersistentSubscription :: Connection
                             -> Text     -- ^ Group name
                             -> StreamName
                             -> PersistentSubscriptionSettings
                             -> Maybe Credentials
                             -> IO (Async (Maybe PersistActionException))
createPersistentSubscription Connection{..} grp stream sett cred =
    Op.createPersist _exec grp (streamIdRaw stream) sett cred

--------------------------------------------------------------------------------
-- | Asynchronously update a persistent subscription group on a stream. The
--   returned 'Async' yields 'Just' a 'PersistActionException' or 'Nothing',
--   as reported by 'Op.updatePersist'.
updatePersistentSubscription :: Connection
                             -> Text     -- ^ Group name
                             -> StreamName
                             -> PersistentSubscriptionSettings
                             -> Maybe Credentials
                             -> IO (Async (Maybe PersistActionException))
updatePersistentSubscription Connection{..} grp stream sett cred =
    Op.updatePersist _exec grp (streamIdRaw stream) sett cred

--------------------------------------------------------------------------------
-- | Asynchronously delete a persistent subscription group on a stream. The
--   returned 'Async' yields 'Just' a 'PersistActionException' or 'Nothing',
--   as reported by 'Op.deletePersist'.
deletePersistentSubscription :: Connection
                             -> Text     -- ^ Group name
                             -> StreamName
                             -> Maybe Credentials
                             -> IO (Async (Maybe PersistActionException))
deletePersistentSubscription Connection{..} grp stream cred =
    Op.deletePersist _exec grp (streamIdRaw stream) cred

--------------------------------------------------------------------------------
-- | Asynchronously connect to a persistent subscription given a group on a
--   stream. The subscription is created with 'newPersistentSubscription' on
--   the connection's execution handle.
connectToPersistentSubscription :: Connection
                                -> Text     -- ^ Group name
                                -> StreamName
                                -> Int32    -- ^ Buffer size
                                -> Maybe Credentials
                                -> IO PersistentSubscription
connectToPersistentSubscription Connection{..} group stream bufSize cred =
    newPersistentSubscription _exec group stream bufSize cred