{-# LANGUAGE RecordWildCards #-}
--------------------------------------------------------------------------------
-- |
-- Module : Database.EventStore.Internal.Operation.WriteEvents
-- Copyright : (C) 2015 Yorick Laupa
-- License : (see the file LICENSE)
--
-- Maintainer : Yorick Laupa <yo.eight@gmail.com>
-- Stability : provisional
-- Portability : non-portable
--
--------------------------------------------------------------------------------
module Database.EventStore.Internal.Operation.WriteEvents
    ( writeEvents ) where

--------------------------------------------------------------------------------
import Data.Maybe

--------------------------------------------------------------------------------
import Data.ProtocolBuffers

--------------------------------------------------------------------------------
import Database.EventStore.Internal.Command
import Database.EventStore.Internal.Communication (Transmit(..))
import Database.EventStore.Internal.Control (publishWith)
import Database.EventStore.Internal.Exec
import Database.EventStore.Internal.Operation (OpResult(..))
import Database.EventStore.Internal.Operation
import Database.EventStore.Internal.Operation.Write.Common
import Database.EventStore.Internal.Operation.WriteEvents.Message
import Database.EventStore.Internal.Prelude
import Database.EventStore.Internal.Settings
import Database.EventStore.Internal.Stream
import Database.EventStore.Internal.Types

-------------------------------------------------------------------------------
-- | Sends a write-events request for the given stream and returns an 'Async'
--   that yields the 'WriteResult' once the server reports success.
--
--   The request is built from the stream name, the expected version
--   (converted with 'expVersionInt64'), the events (converted with
--   'eventToNewEventIO') and the @s_requireMaster@ setting. The same package
--   is published again whenever the server answers with a prepare, forward or
--   commit timeout.
--
--   The following errors are thrown inside the asynchronous action:
--
--   * any 'OperationError' returned while reading the response from the
--     mailbox;
--   * 'WrongExpectedVersion', 'StreamDeleted', 'InvalidTransaction' or
--     'AccessDenied', depending on the result code carried by the response.
writeEvents
  :: Settings
  -> Exec
  -> Text
  -> ExpectedVersion
  -> Maybe Credentials
  -> [Event]
  -> IO (Async WriteResult)
writeEvents setts exec stream version creds evts
  = do m <- mailboxNew
       async $
         do nevts <- traverse eventToNewEventIO evts
            let req = newRequest stream (expVersionInt64 version) nevts (s_requireMaster setts)

            -- The package is created once, outside of the loop, so every
            -- iteration publishes the very same package.
            pkg <- createPkg writeEventsCmd creds req

            keepLooping $ do
              publishWith exec (Transmit m OneTime pkg)
              outcome <- mailboxReadDecoded m
              case outcome of
                Left e
                  -> throw e
                Right resp
                  -> let r = getField $ _result resp
                         com_pos = getField $ _commitPosition resp
                         prep_pos = getField $ _preparePosition resp
                         lst_num = getField $ _lastNumber resp
                         -- Commit and prepare positions are optional in the
                         -- response; -1 is used when they are absent.
                         com_pos_int = fromMaybe (-1) com_pos
                         prep_pos_int = fromMaybe (-1) prep_pos
                         pos = Position com_pos_int prep_pos_int
                         res = WriteResult lst_num pos in
                     case r of
                         OP_SUCCESS -> pure $ Break res
                         -- Timeouts yield 'Loop', which asks 'keepLooping'
                         -- for another iteration.
                         OP_PREPARE_TIMEOUT -> pure Loop
                         OP_FORWARD_TIMEOUT -> pure Loop
                         OP_COMMIT_TIMEOUT -> pure Loop
                         OP_WRONG_EXPECTED_VERSION -> throw $ WrongExpectedVersion stream version
                         OP_STREAM_DELETED -> throw $ StreamDeleted $ StreamName stream
                         OP_INVALID_TRANSACTION -> throw InvalidTransaction
                         OP_ACCESS_DENIED -> throw $ AccessDenied (StreamName stream)