module Data.CQRS.EventStore.Backend.Sqlite3
( openSqliteEventStore
) where
import Control.Exception (onException)
import Control.Monad (when, forM_, liftM)
import Data.ByteString (ByteString)
import Data.Conduit (Source, ($=), ($$), runResourceT)
import qualified Data.Conduit.List as CL
import Data.CQRS.EventStore.Backend (EventStoreBackend(..), RawEvent, RawSnapshot(..))
import Data.CQRS.EventStore.Backend.Sqlite3Utils (withTransaction, execSql, sourceQuery)
import Data.CQRS.GUID (GUID)
import Data.CQRS.PersistedEvent (PersistedEvent(..))
import Data.CQRS.Serialize (decode', encode)
import qualified Database.SQLite3 as SQL
import Database.SQLite3 (Database, SQLData(..))
import Prelude hiding (catch)
-- | Types whose values can be handed to SQLite as statement parameters.
class ToSQLData v where
  -- | Convert a value into its 'SQLData' representation.
  toSQLData :: v -> SQLData
-- | A 'GUID' is serialized with 'encode' and passed as a blob.
instance ToSQLData GUID where
  toSQLData guid = SQLBlob (encode guid)
-- | An 'Int' is widened with 'fromIntegral' and passed as an SQL integer.
instance ToSQLData Int where
  toSQLData n = SQLInteger (fromIntegral n)
-- | A 'ByteString' is passed unchanged as a blob.
instance ToSQLData ByteString where
  toSQLData bytes = SQLBlob bytes
-- | DDL for the @events@ table (columns @guid@, @ev_data@, @version@,
-- @gversion@) with primary key @(guid, version)@.  Because of
-- @IF NOT EXISTS@ it is a no-op when the table is already present.
createEventsSql :: String
createEventsSql = "CREATE TABLE IF NOT EXISTS events ( guid BLOB , ev_data BLOB , version INTEGER , gversion INTEGER, PRIMARY KEY (guid, version) );"
-- | Query for the events of one aggregate whose @version@ is strictly
-- greater than the given bound, ordered by ascending @version@.
-- Parameters, in order: guid, version bound.
-- Result columns, in order: @version@, @gversion@, @ev_data@.
selectEventsSql :: String
selectEventsSql = "SELECT version, gversion, ev_data FROM events WHERE guid = ? AND version > ? ORDER BY version ASC;"
-- | Query for all events, across aggregates, whose @gversion@ is greater
-- than or equal to the single parameter, ordered by ascending @gversion@.
-- Result columns, in order: @gversion@, @guid@, @version@, @ev_data@.
enumerateAllEventsSql :: String
enumerateAllEventsSql = "SELECT gversion, guid, version, ev_data FROM events WHERE gversion >= ? ORDER BY gversion ASC;"
-- | Statement inserting one row into @events@.
-- Parameters, in order: @guid@, @version@, @ev_data@, @gversion@.
insertEventSql :: String
insertEventSql = "INSERT INTO events ( guid, version, ev_data, gversion ) VALUES (?, ?, ?, ?);"
-- | DDL for the @versions@ table, which maps a @guid@ (primary key) to a
-- @version@.  It is a no-op when the table is already present.
createAggregateVersionsSql :: String
createAggregateVersionsSql = "CREATE TABLE IF NOT EXISTS versions ( guid BLOB PRIMARY KEY , version INTEGER );"
-- | Query for the @version@ recorded in the @versions@ table for the guid
-- given as the single parameter.
getCurrentVersionSql :: String
getCurrentVersionSql = "SELECT version FROM versions WHERE guid = ?;"
-- | Query for the highest @gversion@ present in @events@; the @COALESCE@
-- makes it yield 0 when the table holds no events.  Takes no parameters.
getLatestVersionSql :: String
getLatestVersionSql = "SELECT COALESCE(MAX(gversion), 0) FROM events;"
-- | Statement that inserts the @versions@ row for a guid or replaces the
-- existing one.  Parameters, in order: @guid@, @version@.
updateCurrentVersionSql :: String
updateCurrentVersionSql = "INSERT OR REPLACE INTO versions ( guid, version ) VALUES (?,?);"
-- | DDL for the @snapshots@ table (columns @guid@, @data@, @version@) with
-- @guid@ as primary key, so there is at most one row per guid.  It is a
-- no-op when the table is already present.
createSnapshotSql :: String
createSnapshotSql = "CREATE TABLE IF NOT EXISTS snapshots ( guid BLOB PRIMARY KEY , data BLOB , version INTEGER );"
-- | Statement that inserts the @snapshots@ row for a guid or replaces the
-- existing one.  Parameters, in order: @guid@, @data@, @version@.
writeSnapshotSql :: String
writeSnapshotSql = "INSERT OR REPLACE INTO snapshots ( guid , data, version ) VALUES ( ?, ?, ? );"
-- | Query for the snapshot stored for the guid given as the single
-- parameter.  Result columns, in order: @data@, @version@.
selectSnapshotSql :: String
selectSnapshotSql = "SELECT data, version FROM snapshots WHERE guid = ?;"
-- | Build the diagnostic text used when a result row does not have the
-- column layout the caller expected.
--
-- The first argument holds the already-rendered query parameters, the
-- second the columns of the offending row; both are included via 'show'.
badQueryResultMsg :: [String] -> [SQLData] -> String
badQueryResultMsg params columns =
  "Invalid query result shape. Params: " ++ show params
    ++ ". Result columns: " ++ show columns
-- | Abort the current 'IO' action with a user error reporting a version
-- mismatch.
--
-- The first argument is the version the caller expected, the second the
-- version that was actually found; both appear in the error text.
versionConflict :: (Show a, Show b) => a -> b -> IO c
versionConflict expected actual = ioError (userError message)
  where
    message =
      "Version conflict detected (expected " ++ show expected
        ++ ", saw " ++ show actual ++ ")"
-- | Append a batch of events to the stream of one aggregate.
--
-- The version recorded for @guid@ in the @versions@ table (taken as 0 when
-- no row exists) must equal @originatingVersion@; otherwise the call fails
-- through 'versionConflict' before anything is written.  On success the
-- recorded version is set to @originatingVersion + length events@ and each
-- event is inserted into the @events@ table.
--
-- This function does not open a transaction itself.
storeEvents :: Database -> GUID -> Int -> [RawEvent] -> IO ()
storeEvents database guid originatingVersion events = do
  storedVersion <-
    runResourceT $
      sourceQuery database getCurrentVersionSql [guidParam] $$ CL.fold newest 0
  when (fromIntegral storedVersion /= originatingVersion) $
    versionConflict originatingVersion storedVersion
  execSql database updateCurrentVersionSql [guidParam, toSQLData nextVersion]
  mapM_ insertEvent events
  where
    guidParam = toSQLData guid
    nextVersion = originatingVersion + length events
    -- Keep the largest version among the returned rows.
    newest acc row = max acc (versionOf row)
    versionOf [SQLInteger v] = v
    versionOf columns = error $ badQueryResultMsg [show guid] columns
    insertEvent e =
      execSql database insertEventSql
        [ guidParam
        , toSQLData $ peSequenceNumber e
        , toSQLData $ peEvent e
        , toSQLData $ peGlobalVer e
        ]
-- | Stream the events of one aggregate whose version is strictly greater
-- than @v0@, in ascending version order (see 'selectEventsSql').
--
-- A row that does not consist of two integers followed by a blob results in
-- an 'error' built with 'badQueryResultMsg'.
retrieveEvents :: Database -> GUID -> Int -> Source IO RawEvent
retrieveEvents database guid v0 =
  fmap toEvent (sourceQuery database selectEventsSql queryParams)
  where
    queryParams = [toSQLData guid, toSQLData v0]
    toEvent [SQLInteger version, SQLInteger gversion, SQLBlob eventData] =
      PersistedEvent guid eventData (fromIntegral version) (fromIntegral gversion)
    toEvent columns = error $ badQueryResultMsg [show guid, show v0] columns
-- | Stream every stored event whose global version is at least
-- @minVersion@, in ascending global-version order (see
-- 'enumerateAllEventsSql').
--
-- The aggregate GUID of each event is recovered from its blob column with
-- @decode'@.  A row with an unexpected column layout results in an 'error'
-- built with 'badQueryResultMsg'.
enumerateAllEvents :: Database -> Int -> Source IO RawEvent
enumerateAllEvents database minVersion =
  sourceQuery database enumerateAllEventsSql [toSQLData minVersion]
    $= CL.map toEvent
  where
    toEvent [SQLInteger gv, SQLBlob g, SQLInteger v, SQLBlob ed] =
      PersistedEvent (decode' g) ed (fromIntegral v) (fromIntegral gv)
    toEvent columns = error $ badQueryResultMsg [show minVersion] columns
-- | Store a snapshot for the given aggregate.  The statement is an
-- @INSERT OR REPLACE@ (see 'writeSnapshotSql'), so a snapshot previously
-- stored for the same GUID is overwritten.
writeSnapshot :: Database -> GUID -> RawSnapshot -> IO ()
writeSnapshot database guid (RawSnapshot version payload) =
  execSql database writeSnapshotSql params
  where
    -- Order must match the placeholders: guid, data, version.
    params = [toSQLData guid, toSQLData payload, toSQLData version]
-- | Look up the snapshot stored for an aggregate.
--
-- Returns 'Nothing' when the @snapshots@ table has no row for @guid@, and
-- @Just@ the stored snapshot otherwise.  A row that is not a blob followed
-- by an integer results in an 'error' built with 'badQueryResultMsg'.
getLatestSnapshot :: Database -> GUID -> IO (Maybe RawSnapshot)
getLatestSnapshot database guid =
  runResourceT $
    (sourceQuery database selectSnapshotSql [toSQLData guid] $= CL.map toSnapshot)
      $$ CL.fold keepLast Nothing
  where
    -- The fold step receives the accumulator first and the new row second.
    -- Folding with 'const' would therefore keep the initial 'Nothing'
    -- forever; we must return the row instead.
    keepLast _ snapshot = snapshot
    toSnapshot [SQLBlob d, SQLInteger v] = Just $ RawSnapshot (fromIntegral v) d
    toSnapshot columns = error $ badQueryResultMsg [show guid] columns
-- | Return the highest global version present in the @events@ table, or 0
-- when the table is empty (see 'getLatestVersionSql').
--
-- Calls 'error' with a message built by 'badQueryResultMsg' if the query
-- yields no row at all or a first row that is not a single integer.
getLatestVersion :: Database -> IO Int
getLatestVersion database = do
  rows <- runResourceT $ sourceQuery database getLatestVersionSql [] $$ CL.consume
  -- Match the rows explicitly rather than using the partial 'head', so that
  -- an unexpected result is reported with context at the point of the query.
  case rows of
    ([SQLInteger v] : _) -> return (fromIntegral v)
    (columns : _) -> error $ badQueryResultMsg [] columns
    [] -> error $ badQueryResultMsg [] []
-- | Open (or create) the SQLite database at the given file name, make sure
-- the @events@, @versions@ and @snapshots@ tables exist, and return an
-- 'EventStoreBackend' whose operations all run against that database.
--
-- If creating the tables throws, the database handle is closed before the
-- exception propagates.  After a successful return the handle stays open
-- until 'esbCloseEventStoreBackend' is run.
openSqliteEventStore :: String -> IO EventStoreBackend
openSqliteEventStore databaseFileName = do
  database <- SQL.open databaseFileName
  -- Do not leak the freshly opened handle when schema creation fails.
  mapM_ (\ddl -> execSql database ddl []) schemaStatements
    `onException` SQL.close database
  return $ EventStoreBackend
    { esbStoreEvents = storeEvents database
    , esbRetrieveEvents = retrieveEvents database
    , esbEnumerateAllEvents = enumerateAllEvents database
    , esbWriteSnapshot = writeSnapshot database
    , esbGetLatestSnapshot = getLatestSnapshot database
    , esbGetLatestVersion = getLatestVersion database
    , esbWithTransaction = withTransaction database
    , esbCloseEventStoreBackend = SQL.close database
    }
  where
    schemaStatements =
      [ createEventsSql
      , createAggregateVersionsSql
      , createSnapshotSql
      ]