module Data.CQRS.PostgreSQL.Internal.SnapshotStore
( newSnapshotStore
) where
import Control.Exception (catchJust)
import Data.ByteString (ByteString)
import Data.Pool (Pool, withResource)
import Data.CQRS.Types.Snapshot (Snapshot(..))
import Data.CQRS.Types.SnapshotStore (SnapshotStore(..))
import Data.CQRS.PostgreSQL.Internal.Utils (execSql, execSql', ioQuery, isDuplicateKey, withTransaction, badQueryResultMsg, SqlValue(..))
import Data.UUID.Types (UUID)
import Database.PostgreSQL.LibPQ (Connection)
import qualified System.IO.Streams as Streams
-- | Store a snapshot for an aggregate.
--
-- A connection is borrowed from the pool and, inside a single transaction,
-- an @UPDATE@ of the aggregate's existing snapshot row is attempted first.
-- When that reports no updated rows (a missing row count is treated the same
-- as zero), an @INSERT@ is issued instead.
--
-- Any exception accepted by 'isDuplicateKey' that escapes the transaction is
-- discarded, so the call returns normally in that case.
-- NOTE(review): presumably this covers a concurrent writer inserting the row
-- between our UPDATE and INSERT -- confirm against 'isDuplicateKey'.
writeSnapshot :: Pool Connection -> UUID -> Snapshot ByteString -> IO ()
writeSnapshot connectionPool aggregateId (Snapshot version payload) =
  withResource connectionPool $ \conn ->
    swallowDuplicateKey $
      withTransaction conn $ do
        updated <- execSql' conn updateSnapshotSql
          [payloadParam, versionParam, aggregateParam]
        case updated of
          -- At least one row was updated: nothing more to do.
          Just n | n > 0 -> return ()
          -- No row count, or zero rows touched: create the row instead.
          _ -> execSql conn insertSnapshotSql
                 [aggregateParam, payloadParam, versionParam]
  where
    -- Query parameters shared by both statements (their order differs).
    aggregateParam = SqlUUID (Just aggregateId)
    payloadParam = SqlByteArray (Just payload)
    versionParam = SqlInt32 (Just (fromIntegral version))

    updateSnapshotSql =
      "UPDATE snapshot SET data=$1, version=$2 WHERE aggregate_uuid=$3"

    insertSnapshotSql =
      "INSERT INTO snapshot \
      \ (aggregate_uuid, data, version) \
      \ VALUES ($1, $2, $3)"

    -- Run the action, turning a duplicate-key failure into a no-op.
    swallowDuplicateKey action =
      catchJust isDuplicateKey action (const (return ()))
-- | Fetch the stored snapshot for an aggregate, if there is one.
--
-- A connection is borrowed from the pool and the lookup is run inside a
-- transaction. Only the first row produced by the query is read; 'Nothing'
-- is returned when the query yields no rows.
--
-- A row that is not exactly a non-NULL byte array followed by a non-NULL
-- 32-bit integer raises an 'error' whose message is built by
-- 'badQueryResultMsg'.
readSnapshot :: Pool Connection -> UUID -> IO (Maybe (Snapshot ByteString))
readSnapshot connectionPool aggregateId =
  withResource connectionPool $ \conn ->
    withTransaction conn $ do
      firstRow <-
        ioQuery conn selectSnapshotSql [SqlUUID (Just aggregateId)] $ \rows ->
          fmap decodeRow <$> Streams.read rows
      -- Matching on the tuple forces the row decoding here, so a malformed
      -- row is reported while the transaction is still open.
      case firstRow of
        Nothing -> return Nothing
        Just (payload, version) -> return (Just (Snapshot version payload))
  where
    -- Convert one result row into (data, version).
    decodeRow :: [SqlValue] -> (ByteString, Int)
    decodeRow [SqlByteArray (Just payload), SqlInt32 (Just version)] =
      (payload, fromIntegral version)
    decodeRow unexpected =
      error (badQueryResultMsg [show aggregateId] unexpected)

    selectSnapshotSql =
      "SELECT data, version FROM snapshot WHERE aggregate_uuid=$1;"
-- | Build a 'SnapshotStore' whose write and read operations are
-- 'writeSnapshot' and 'readSnapshot', both bound to the given connection
-- pool. Constructing the store itself performs no database access.
newSnapshotStore :: Pool Connection -> IO (SnapshotStore ByteString)
newSnapshotStore connectionPool = return store
  where
    store =
      SnapshotStore
        (writeSnapshot connectionPool)
        (readSnapshot connectionPool)