module Data.CQRS.PostgreSQL.Internal.EventStore
( newEventStore
) where
import Control.Exception (throw, catchJust)
import Control.Monad (forM_, (>=>))
import Data.ByteString (ByteString)
import Data.Pool (Pool, withResource)
import Data.CQRS.Types.EventStore (EventStore(..), StoreError(..))
import Data.CQRS.Types.PersistedEvent (PersistedEvent(..))
import Data.CQRS.PostgreSQL.Internal.Utils (execSql, ioQuery, SqlValue(..), isDuplicateKey, withTransaction, badQueryResultMsg)
import Data.Time.Clock.POSIX (getPOSIXTime)
import Data.UUID.Types (UUID)
import Database.PostgreSQL.LibPQ (Connection)
import System.IO.Streams (InputStream)
import qualified System.IO.Streams.Combinators as SC
storeEvents :: Pool Connection -> UUID -> [PersistedEvent ByteString] -> IO ()
storeEvents cp aggregateId es = do
withResource cp $ \c -> do
translateExceptions aggregateId $ do
withTransaction c $ do
forM_ es $ \e -> do
timestamp <- fmap (\t -> round $ t * 1000) $ getPOSIXTime
execSql c sqlInsertEvent
[ SqlUUID $ Just aggregateId
, SqlUUID $ Just $ peEventId e
, SqlByteArray $ Just $ peEvent e
, SqlInt32 $ Just $ fromIntegral $ peSequenceNumber e
, SqlInt64 $ Just $ timestamp
]
where
translateExceptions uuid action =
catchJust isDuplicateKey action $ \_ ->
throw $ VersionConflict uuid
sqlInsertEvent =
"INSERT INTO event (\
\ aggregate_uuid, \
\ event_uuid, \
\ event_data, \
\ seq_no, \
\ \"timestamp\", \
\ archive_uuid \
\) VALUES ($1, $2, $3, $4, $5, NULL)"
retrieveEvents :: Pool Connection -> UUID -> Int -> (InputStream (PersistedEvent ByteString) -> IO a) -> IO a
retrieveEvents cp aggregateId v0 f = withResource cp $ \c -> withTransaction c $ do
ioQuery c sqlSelectEvent [ SqlUUID $ Just aggregateId
, SqlInt32 $ Just $ fromIntegral v0
] $ SC.map unpack >=> f
where
unpack [ SqlInt32 (Just sequenceNumber)
, SqlUUID (Just eventId)
, SqlByteArray (Just eventData)
] = PersistedEvent eventData (fromIntegral sequenceNumber) eventId
unpack columns = error $ badQueryResultMsg [show aggregateId, show v0] columns
sqlSelectEvent =
" SELECT seq_no, event_uuid, event_data \
\ FROM event \
\ WHERE aggregate_uuid = $1 \
\ AND seq_no > $2 \
\ORDER BY seq_no ASC"
retrieveAllEvents :: Pool Connection -> (InputStream (UUID, PersistedEvent ByteString) -> IO a) -> IO a
retrieveAllEvents cp f = withResource cp $ \c -> withTransaction c $ do
ioQuery c sqlSelectAllEvents [ ] $ SC.map unpack >=> f
where
unpack [ SqlUUID (Just aggregateId)
, SqlInt32 (Just sequenceNumber)
, SqlUUID (Just eventId)
, SqlByteArray (Just eventData)
] = (aggregateId, PersistedEvent eventData (fromIntegral sequenceNumber) eventId)
unpack columns = error $ badQueryResultMsg [] columns
sqlSelectAllEvents =
" SELECT aggregate_uuid, seq_no, event_uuid, event_data \
\ FROM event \
\ORDER BY aggregate_uuid, seq_no ASC"
newEventStore :: Pool Connection -> IO (EventStore ByteString)
newEventStore connectionPool = do
return $ EventStore
{ esStoreEvents = storeEvents connectionPool
, esRetrieveEvents = retrieveEvents connectionPool
, esRetrieveAllEvents = retrieveAllEvents connectionPool
}