module Database.EventStore.Internal.Operation.WriteEventsOperation
( writeEventsOperation ) where
import Control.Concurrent
import Data.Int
import Data.Maybe
import Data.Traversable
import GHC.Generics (Generic)
import Data.ProtocolBuffers
import Data.Text
import Database.EventStore.Internal.Manager.Operation
import Database.EventStore.Internal.Types
data WriteEvents
= WriteEvents
{ writeStreamId :: Required 1 (Value Text)
, writeExpectedVersion :: Required 2 (Value Int32)
, writeEvents :: Repeated 3 (Message NewEvent)
, writeRequireMaster :: Required 4 (Value Bool)
}
deriving (Generic, Show)
instance Encode WriteEvents
newWriteEvents :: Text
-> Int32
-> [NewEvent]
-> Bool
-> WriteEvents
newWriteEvents stream_id exp_ver evts req_master =
WriteEvents
{ writeStreamId = putField stream_id
, writeExpectedVersion = putField exp_ver
, writeEvents = putField evts
, writeRequireMaster = putField req_master
}
data WriteEventsCompleted
= WriteEventsCompleted
{ writeCompletedResult :: Required 1 (Enumeration OpResult)
, writeCompletedMessage :: Optional 2 (Value Text)
, writeCompletedFirstNumber :: Required 3 (Value Int32)
, writeCompletedLastNumber :: Required 4 (Value Int32)
, writeCompletedPreparePosition :: Optional 5 (Value Int64)
, writeCompletedCommitPosition :: Optional 6 (Value Int64)
}
deriving (Generic, Show)
instance Decode WriteEventsCompleted
writeEventsOperation :: Settings
-> MVar (OperationExceptional WriteResult)
-> Text
-> ExpectedVersion
-> [Event]
-> OperationParams
writeEventsOperation settings mvar evt_stream exp_ver evts =
OperationParams
{ opSettings = settings
, opRequestCmd = 0x82
, opResponseCmd = 0x83
, opRequest = do
new_evts <- traverse eventToNewEvent evts
let require_master = s_requireMaster settings
exp_ver_int32 = expVersionInt32 exp_ver
request = newWriteEvents evt_stream
exp_ver_int32
new_evts
require_master
return request
, opSuccess = inspect mvar evt_stream exp_ver
, opFailure = failed mvar
}
inspect :: MVar (OperationExceptional WriteResult)
-> Text
-> ExpectedVersion
-> WriteEventsCompleted
-> IO Decision
inspect mvar stream exp_ver wec = go (getField $ writeCompletedResult wec)
where
go OP_SUCCESS = succeed mvar wec
go OP_PREPARE_TIMEOUT = return Retry
go OP_FORWARD_TIMEOUT = return Retry
go OP_COMMIT_TIMEOUT = return Retry
go OP_WRONG_EXPECTED_VERSION = failed mvar wrong_version
go OP_STREAM_DELETED = failed mvar (StreamDeleted stream)
go OP_INVALID_TRANSACTION = failed mvar InvalidTransaction
go OP_ACCESS_DENIED = failed mvar (AccessDenied stream)
wrong_version = WrongExpectedVersion stream exp_ver
succeed :: MVar (OperationExceptional WriteResult)
-> WriteEventsCompleted
-> IO Decision
succeed mvar wec = do
putMVar mvar (Right wr)
return EndOperation
where
last_evt_num = getField $ writeCompletedLastNumber wec
com_pos = getField $ writeCompletedCommitPosition wec
pre_pos = getField $ writeCompletedPreparePosition wec
com_pos_int = fromMaybe (1) com_pos
pre_pos_int = fromMaybe (1) pre_pos
pos = Position com_pos_int pre_pos_int
wr = WriteResult last_evt_num pos
failed :: MVar (OperationExceptional WriteResult)
-> OperationException
-> IO Decision
failed mvar e = do
putMVar mvar (Left e)
return EndOperation
eventToNewEvent :: Event -> IO NewEvent
eventToNewEvent evt =
newEvent evt_type
evt_data_type
evt_metadata_type
evt_data_bytes
evt_metadata_bytes
where
evt_type = eventType evt
evt_data_bytes = eventDataBytes $ eventData evt
evt_data_type = eventDataType $ eventData evt
evt_metadata_bytes = eventMetadataBytes $ eventData evt
evt_metadata_type = eventMetadataType $ eventData evt