{-# LANGUAGE RecordWildCards #-}
--------------------------------------------------------------------------------
-- |
-- Module : Database.EventStore.Internal.Operation.ReadAllEvents
-- Copyright : (C) 2015 Yorick Laupa
-- License : (see the file LICENSE)
--
-- Maintainer : Yorick Laupa <yo.eight@gmail.com>
-- Stability : provisional
-- Portability : non-portable
--
--------------------------------------------------------------------------------
module Database.EventStore.Internal.Operation.ReadAllEvents
    ( readAllEvents ) where

--------------------------------------------------------------------------------
import Data.Int
import Data.Maybe

--------------------------------------------------------------------------------
import Data.ProtocolBuffers

--------------------------------------------------------------------------------
import Database.EventStore.Internal.Command
import Database.EventStore.Internal.Control (publishWith)
import Database.EventStore.Internal.Communication (Transmit(..))
import Database.EventStore.Internal.Exec (Exec)
import Database.EventStore.Internal.Operation
import Database.EventStore.Internal.Operation.Read.Common
import Database.EventStore.Internal.Operation.ReadAllEvents.Message
import Database.EventStore.Internal.Prelude
import Database.EventStore.Internal.Settings
import Database.EventStore.Internal.Stream
import Database.EventStore.Internal.Types

--------------------------------------------------------------------------------
-- | Batch read on $all stream operation.
readAllEvents
  :: Settings
  -> Exec
  -> Int64
  -> Int64
  -> Int32
  -> Bool
  -> ReadDirection
  -> Maybe Credentials
  -> IO (Async AllSlice)
readAllEvents :: Settings
-> Exec
-> Int64
-> Int64
-> Int32
-> Bool
-> ReadDirection
-> Maybe Credentials
-> IO (Async AllSlice)
readAllEvents Settings{Bool
Maybe Text
Maybe TLSSettings
Maybe Credentials
LogType
NominalDiffTime
LoggerFilter
MonitoringBackend
Retry
s_defaultUserCredentials :: Settings -> Maybe Credentials
s_defaultConnectionName :: Settings -> Maybe Text
s_monitoring :: Settings -> MonitoringBackend
s_operationRetry :: Settings -> Retry
s_operationTimeout :: Settings -> NominalDiffTime
s_loggerDetailed :: Settings -> Bool
s_loggerFilter :: Settings -> LoggerFilter
s_loggerType :: Settings -> LogType
s_ssl :: Settings -> Maybe TLSSettings
s_reconnect_delay :: Settings -> NominalDiffTime
s_retry :: Settings -> Retry
s_requireMaster :: Settings -> Bool
s_heartbeatTimeout :: Settings -> NominalDiffTime
s_heartbeatInterval :: Settings -> NominalDiffTime
s_defaultUserCredentials :: Maybe Credentials
s_defaultConnectionName :: Maybe Text
s_monitoring :: MonitoringBackend
s_operationRetry :: Retry
s_operationTimeout :: NominalDiffTime
s_loggerDetailed :: Bool
s_loggerFilter :: LoggerFilter
s_loggerType :: LogType
s_ssl :: Maybe TLSSettings
s_reconnect_delay :: NominalDiffTime
s_retry :: Retry
s_requireMaster :: Bool
s_heartbeatTimeout :: NominalDiffTime
s_heartbeatInterval :: NominalDiffTime
..} Exec
exec Int64
c_pos Int64
p_pos Int32
max_c Bool
tos ReadDirection
dir Maybe Credentials
cred
  = do Mailbox
m <- IO Mailbox
forall (m :: * -> *). MonadBase IO m => m Mailbox
mailboxNew
       IO AllSlice -> IO (Async (StM IO AllSlice))
forall (m :: * -> *) a.
MonadBaseControl IO m =>
m a -> m (Async (StM m a))
async (IO AllSlice -> IO (Async (StM IO AllSlice)))
-> IO AllSlice -> IO (Async (StM IO AllSlice))
forall a b. (a -> b) -> a -> b
$
         do let req :: Request
req = Int64 -> Int64 -> Int32 -> Bool -> Bool -> Request
newRequest Int64
c_pos Int64
p_pos Int32
max_c Bool
tos Bool
s_requireMaster
                cmd :: Command
cmd =
                  case ReadDirection
dir of
                    ReadDirection
Forward  -> Command
readAllEventsForwardCmd
                    ReadDirection
Backward -> Command
readAllEventsBackwardCmd

            Package
pkg <- Command -> Maybe Credentials -> Request -> IO Package
forall msg (m :: * -> *).
(Encode msg, MonadIO m) =>
Command -> Maybe Credentials -> msg -> m Package
createPkg Command
cmd Maybe Credentials
cred Request
req
            Exec -> Transmit -> IO ()
forall p a (m :: * -> *).
(Pub p, Typeable a, MonadIO m) =>
p -> a -> m ()
publishWith Exec
exec (Mailbox -> Lifetime -> Package -> Transmit
Transmit Mailbox
m Lifetime
OneTime Package
pkg)
            Either OperationError Response
outcome <- Mailbox -> IO (Either OperationError Response)
forall (m :: * -> *) resp.
(MonadBase IO m, Decode resp) =>
Mailbox -> m (Either OperationError resp)
mailboxReadDecoded Mailbox
m
            case Either OperationError Response
outcome of
              Left OperationError
e
                -> OperationError -> IO AllSlice
forall (m :: * -> *) e a. (MonadThrow m, Exception e) => e -> m a
throw OperationError
e
              Right Response
resp
                -> let r :: FieldType (Field 6 (OptionalField (Last (Enumeration Result))))
r = Field 6 (OptionalField (Last (Enumeration Result)))
-> FieldType (Field 6 (OptionalField (Last (Enumeration Result))))
forall a. HasField a => a -> FieldType a
getField (Field 6 (OptionalField (Last (Enumeration Result)))
 -> FieldType (Field 6 (OptionalField (Last (Enumeration Result)))))
-> Field 6 (OptionalField (Last (Enumeration Result)))
-> FieldType (Field 6 (OptionalField (Last (Enumeration Result))))
forall a b. (a -> b) -> a -> b
$ Response -> Optional 6 (Enumeration Result)
_Result Response
resp
                       err :: FieldType (Field 7 (OptionalField (Last (Value Text))))
err = Field 7 (OptionalField (Last (Value Text)))
-> FieldType (Field 7 (OptionalField (Last (Value Text))))
forall a. HasField a => a -> FieldType a
getField (Field 7 (OptionalField (Last (Value Text)))
 -> FieldType (Field 7 (OptionalField (Last (Value Text)))))
-> Field 7 (OptionalField (Last (Value Text)))
-> FieldType (Field 7 (OptionalField (Last (Value Text))))
forall a b. (a -> b) -> a -> b
$ Response -> Optional 7 (Value Text)
_Error Response
resp
                       nc_pos :: FieldType (Field 4 (RequiredField (Always (Value Int64))))
nc_pos = Field 4 (RequiredField (Always (Value Int64)))
-> FieldType (Field 4 (RequiredField (Always (Value Int64))))
forall a. HasField a => a -> FieldType a
getField (Field 4 (RequiredField (Always (Value Int64)))
 -> FieldType (Field 4 (RequiredField (Always (Value Int64)))))
-> Field 4 (RequiredField (Always (Value Int64)))
-> FieldType (Field 4 (RequiredField (Always (Value Int64))))
forall a b. (a -> b) -> a -> b
$ Response -> Required 4 (Value Int64)
_NextCommitPosition Response
resp
                       np_pos :: FieldType (Field 5 (RequiredField (Always (Value Int64))))
np_pos = Field 5 (RequiredField (Always (Value Int64)))
-> FieldType (Field 5 (RequiredField (Always (Value Int64))))
forall a. HasField a => a -> FieldType a
getField (Field 5 (RequiredField (Always (Value Int64)))
 -> FieldType (Field 5 (RequiredField (Always (Value Int64)))))
-> Field 5 (RequiredField (Always (Value Int64)))
-> FieldType (Field 5 (RequiredField (Always (Value Int64))))
forall a b. (a -> b) -> a -> b
$ Response -> Required 5 (Value Int64)
_NextPreparePosition Response
resp
                       es :: FieldType (Repeated 3 (Message ResolvedEventBuf))
es = Repeated 3 (Message ResolvedEventBuf)
-> FieldType (Repeated 3 (Message ResolvedEventBuf))
forall a. HasField a => a -> FieldType a
getField (Repeated 3 (Message ResolvedEventBuf)
 -> FieldType (Repeated 3 (Message ResolvedEventBuf)))
-> Repeated 3 (Message ResolvedEventBuf)
-> FieldType (Repeated 3 (Message ResolvedEventBuf))
forall a b. (a -> b) -> a -> b
$ Response -> Repeated 3 (Message ResolvedEventBuf)
_Events Response
resp
                       evts :: [ResolvedEvent]
evts = (ResolvedEventBuf -> ResolvedEvent)
-> [ResolvedEventBuf] -> [ResolvedEvent]
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap ResolvedEventBuf -> ResolvedEvent
newResolvedEventFromBuf [ResolvedEventBuf]
FieldType (Repeated 3 (Message ResolvedEventBuf))
es
                       eos :: Bool
eos = [ResolvedEvent] -> Bool
forall mono. MonoFoldable mono => mono -> Bool
null [ResolvedEvent]
evts
                       n_pos :: Position
n_pos = Int64 -> Int64 -> Position
Position Int64
FieldType (Field 4 (RequiredField (Always (Value Int64))))
nc_pos Int64
FieldType (Field 5 (RequiredField (Always (Value Int64))))
np_pos
                       slice :: AllSlice
slice =
                           if Bool
eos then AllSlice
forall t. Slice t
SliceEndOfStream else [ResolvedEvent] -> Maybe Position -> AllSlice
forall t. [ResolvedEvent] -> Maybe t -> Slice t
Slice [ResolvedEvent]
evts (Position -> Maybe Position
forall a. a -> Maybe a
Just Position
n_pos) in
                   case Result -> Maybe Result -> Result
forall a. a -> Maybe a -> a
fromMaybe Result
SUCCESS Maybe Result
FieldType (Field 6 (OptionalField (Last (Enumeration Result))))
r of
                     Result
ERROR -> OperationError -> IO AllSlice
forall (m :: * -> *) e a. (MonadThrow m, Exception e) => e -> m a
throw (OperationError -> IO AllSlice) -> OperationError -> IO AllSlice
forall a b. (a -> b) -> a -> b
$ Maybe Text -> OperationError
ServerError Maybe Text
FieldType (Field 7 (OptionalField (Last (Value Text))))
err
                     Result
ACCESS_DENIED -> OperationError -> IO AllSlice
forall (m :: * -> *) e a. (MonadThrow m, Exception e) => e -> m a
throw (OperationError -> IO AllSlice) -> OperationError -> IO AllSlice
forall a b. (a -> b) -> a -> b
$ StreamId Position -> OperationError
forall t. StreamId t -> OperationError
AccessDenied StreamId Position
All
                     Result
_ -> AllSlice -> IO AllSlice
forall (f :: * -> *) a. Applicative f => a -> f a
pure AllSlice
slice