{-| Implementation of an SQLite3-based event store. -} module Data.CQRS.EventStore.Backend.Sqlite3 ( openSqliteEventStore ) where import Control.Monad (when, forM_, liftM) import Data.ByteString (ByteString) import Data.Conduit (Source, ($=), ($$), runResourceT) import qualified Data.Conduit.List as CL import Data.CQRS.EventStore.Backend (EventStoreBackend(..), RawEvent, RawSnapshot(..)) import Data.CQRS.EventStore.Backend.Sqlite3Utils (withTransaction, execSql, sourceQuery) import Data.CQRS.GUID (GUID) import Data.CQRS.PersistedEvent (PersistedEvent(..)) import Data.CQRS.Serialize (decode', encode) import qualified Database.SQLite3 as SQL import Database.SQLite3 (Database, SQLData(..)) import Prelude hiding (catch) -- Convenience class for converting values to SQLData. class ToSQLData a where toSQLData :: a -> SQLData instance ToSQLData GUID where toSQLData = SQLBlob . encode instance ToSQLData Int where toSQLData = SQLInteger . fromIntegral instance ToSQLData ByteString where toSQLData = SQLBlob -- SQL createEventsSql :: String createEventsSql = "CREATE TABLE IF NOT EXISTS events ( guid BLOB , ev_data BLOB , version INTEGER , gversion INTEGER, PRIMARY KEY (guid, version) );" selectEventsSql :: String selectEventsSql = "SELECT version, gversion, ev_data FROM events WHERE guid = ? AND version > ? ORDER BY version ASC;" enumerateAllEventsSql :: String enumerateAllEventsSql = "SELECT gversion, guid, version, ev_data FROM events WHERE gversion >= ? ORDER BY gversion ASC;" insertEventSql :: String insertEventSql = "INSERT INTO events ( guid, version, ev_data, gversion ) VALUES (?, ?, ?, ?);" createAggregateVersionsSql :: String createAggregateVersionsSql = "CREATE TABLE IF NOT EXISTS versions ( guid BLOB PRIMARY KEY , version INTEGER );" getCurrentVersionSql :: String getCurrentVersionSql = "SELECT version FROM versions WHERE guid = ?;" getLatestVersionSql :: String getLatestVersionSql = "SELECT COALESCE(MAX(gversion), 0) FROM events;" updateCurrentVersionSql :: String updateCurrentVersionSql = "INSERT OR REPLACE INTO versions ( guid, version ) VALUES (?,?);" createSnapshotSql :: String createSnapshotSql = "CREATE TABLE IF NOT EXISTS snapshots ( guid BLOB PRIMARY KEY , data BLOB , version INTEGER );" writeSnapshotSql :: String writeSnapshotSql = "INSERT OR REPLACE INTO snapshots ( guid , data, version ) VALUES ( ?, ?, ? );" selectSnapshotSql :: String selectSnapshotSql = "SELECT data, version FROM snapshots WHERE guid = ?;" badQueryResultMsg :: [String] -> [SQLData] -> String badQueryResultMsg params columns = concat ["Invalid query result shape. Params: ", show params, ". Result columns: ", show columns] versionConflict :: (Show a, Show b) => a -> b -> IO c versionConflict ov cv = fail $ concat [ "Version conflict detected (expected ", show ov , ", saw ", show cv, ")" ] storeEvents :: Database -> GUID -> Int -> [RawEvent] -> IO () storeEvents database guid originatingVersion events = do -- Column unpacking. let unpackColumns [ SQLInteger v ] = v unpackColumns columns = error $ badQueryResultMsg [show guid] columns -- Get the current version number of the aggregate. curVer <- runResourceT $ (sourceQuery database getCurrentVersionSql [toSQLData guid]) $$ CL.fold (\x -> max x . unpackColumns) 0 -- Sanity check current version number. when (fromIntegral curVer /= originatingVersion) $ versionConflict originatingVersion curVer -- Update de-normalized version number. execSql database updateCurrentVersionSql [ toSQLData guid , toSQLData $ originatingVersion + length events ] -- Store the supplied events. forM_ events $ \e -> do execSql database insertEventSql [ toSQLData guid , toSQLData $ peSequenceNumber e , toSQLData $ peEvent e , toSQLData $ peGlobalVer e ] retrieveEvents :: Database -> GUID -> Int -> Source IO RawEvent retrieveEvents database guid v0 = do -- Unpack the columns into tuples. let unpackColumns [SQLInteger version, SQLInteger gversion, SQLBlob eventData] = PersistedEvent guid eventData (fromIntegral version) (fromIntegral gversion) unpackColumns columns = error $ badQueryResultMsg [show guid, show v0] columns -- Find events with version numbers. fmap unpackColumns $ sourceQuery database selectEventsSql [toSQLData guid, toSQLData v0] enumerateAllEvents :: Database -> Int -> Source IO RawEvent enumerateAllEvents database minVersion = do sourceQuery database enumerateAllEventsSql [toSQLData minVersion] $= CL.map (\columns -> do case columns of [ SQLInteger gv, SQLBlob g, SQLInteger v, SQLBlob ed ] -> PersistedEvent (decode' g) ed (fromIntegral v) (fromIntegral gv) _ -> error $ badQueryResultMsg [show minVersion] columns) writeSnapshot :: Database -> GUID -> RawSnapshot -> IO () writeSnapshot database guid (RawSnapshot v d) = do execSql database writeSnapshotSql [ toSQLData guid , toSQLData d , toSQLData v ] getLatestSnapshot :: Database -> GUID -> IO (Maybe RawSnapshot) getLatestSnapshot database guid = do -- Unpack columns from result. let unpackColumns [SQLBlob d, SQLInteger v] = Just $ RawSnapshot (fromIntegral v) d unpackColumns columns = error $ badQueryResultMsg [show guid] columns -- Run the query. runResourceT $ (sourceQuery database selectSnapshotSql [toSQLData guid] $= (CL.map unpackColumns)) $$ CL.fold const Nothing getLatestVersion :: Database -> IO Int getLatestVersion database = do -- Unpack columns from result. let unpackColumns :: [SQLData] -> Int unpackColumns [SQLInteger v] = fromIntegral v unpackColumns columns = error $ badQueryResultMsg [] columns -- Run the query. liftM head $ runResourceT $ (sourceQuery database getLatestVersionSql [] $= (CL.map unpackColumns)) $$ CL.consume -- | Open an SQLite3-based event store using the named SQLite database file. -- The database file is created if it does not exist. openSqliteEventStore :: String -> IO EventStoreBackend openSqliteEventStore databaseFileName = do -- Create the database. database <- SQL.open databaseFileName -- Set up tables. execSql database createEventsSql [] execSql database createAggregateVersionsSql [] execSql database createSnapshotSql [] -- Return event store. return $ EventStoreBackend { esbStoreEvents = storeEvents database , esbRetrieveEvents = retrieveEvents database , esbEnumerateAllEvents = enumerateAllEvents database , esbWriteSnapshot = writeSnapshot database , esbGetLatestSnapshot = getLatestSnapshot database , esbGetLatestVersion = getLatestVersion database , esbWithTransaction = withTransaction database , esbCloseEventStoreBackend = SQL.close database }