module Data.CQRS.EventStore.Backend.Sqlite3
( openSqliteEventStore
) where
import Control.Exception (bracketOnError)
import Control.Monad (when, forM_, liftM)
import Data.ByteString (ByteString)
import Data.CQRS.EventStore.Backend (EventStoreBackend(..), RawEvent)
import Data.CQRS.EventStore.Backend.Sqlite3Utils (withTransaction, execSql, enumQueryResult)
import Data.CQRS.GUID (GUID)
import Data.CQRS.PersistedEvent (PersistedEvent(..))
import Data.CQRS.Serialize (decode')
import Data.Enumerator (Enumerator, (>>==), ($=), run_)
import qualified Data.Enumerator.List as EL
import Data.Serialize (encode)
import qualified Database.SQLite3 as SQL
import Database.SQLite3 (Database, SQLData(..))
import Prelude hiding (catch)
-- | Conversion of Haskell values into SQLite column values, used to build
-- the parameter lists passed to the queries in this module.
class ToSQLData a where
  toSQLData :: a -> SQLData

-- | A GUID is stored as a blob holding its 'encode'd form.
instance ToSQLData GUID where
  toSQLData guid = SQLBlob (encode guid)

-- | An 'Int' is converted with 'fromIntegral' and stored as an integer.
instance ToSQLData Int where
  toSQLData n = SQLInteger (fromIntegral n)

-- | A 'ByteString' is stored verbatim as a blob.
instance ToSQLData ByteString where
  toSQLData bytes = SQLBlob bytes
-- | DDL creating the @events@ table (if it does not exist yet), keyed on
-- the @(guid, version)@ pair.
createEventsSql :: String
createEventsSql =
  concat
    [ "CREATE TABLE IF NOT EXISTS events"
    , " ( guid BLOB , ev_data BLOB , version INTEGER , gversion INTEGER"
    , ", PRIMARY KEY (guid, version) );"
    ]
-- | Query selecting the events of one aggregate whose version is strictly
-- greater than the given one, in ascending version order.
-- Parameters: guid, version.
selectEventsSql :: String
selectEventsSql =
  concat
    [ "SELECT version, gversion, ev_data"
    , " FROM events"
    , " WHERE guid = ? AND version > ?"
    , " ORDER BY version ASC;"
    ]
-- | Query selecting every event whose global version is at least the given
-- one, in ascending global version order. Parameter: minimum gversion.
enumerateAllEventsSql :: String
enumerateAllEventsSql =
  concat
    [ "SELECT gversion, guid, version, ev_data"
    , " FROM events"
    , " WHERE gversion >= ?"
    , " ORDER BY gversion ASC;"
    ]
-- | Statement inserting a single event row.
-- Parameters, in order: guid, version, ev_data, gversion.
insertEventSql :: String
insertEventSql =
  concat
    [ "INSERT INTO events ( guid, version, ev_data, gversion )"
    , " VALUES (?, ?, ?, ?);"
    ]
-- | DDL creating the @versions@ table (if it does not exist yet), which
-- holds one version number per guid.
createAggregateVersionsSql :: String
createAggregateVersionsSql =
  concat
    [ "CREATE TABLE IF NOT EXISTS versions"
    , " ( guid BLOB PRIMARY KEY , version INTEGER );"
    ]
-- | Query reading the version recorded in @versions@ for one guid.
-- Parameter: guid.
getCurrentVersionSql :: String
getCurrentVersionSql =
  concat
    [ "SELECT version"
    , " FROM versions"
    , " WHERE guid = ?;"
    ]
-- | Query reading the highest global version present in @events@, or 0
-- when the table holds no events.
getLatestVersionSql :: String
getLatestVersionSql =
  concat
    [ "SELECT COALESCE(MAX(gversion), 0)"
    , " FROM events;"
    ]
-- | Statement recording the version for a guid in @versions@, replacing
-- any row already stored for that guid. Parameters: guid, version.
updateCurrentVersionSql :: String
updateCurrentVersionSql =
  concat
    [ "INSERT OR REPLACE INTO versions ( guid, version )"
    , " VALUES (?,?);"
    ]
-- | DDL creating the @snapshots@ table (if it does not exist yet), which
-- holds at most one snapshot row per guid.
createSnapshotSql :: String
createSnapshotSql =
  concat
    [ "CREATE TABLE IF NOT EXISTS snapshots"
    , " ( guid BLOB PRIMARY KEY , data BLOB , version INTEGER );"
    ]
-- | Statement storing a snapshot for a guid, replacing any snapshot
-- already stored for it. Parameters, in order: guid, data, version.
writeSnapshotSql :: String
writeSnapshotSql =
  concat
    [ "INSERT OR REPLACE INTO snapshots ( guid , data, version )"
    , " VALUES ( ?, ?, ? );"
    ]
-- | Query reading the snapshot data and version stored for one guid.
-- Parameter: guid.
selectSnapshotSql :: String
selectSnapshotSql =
  concat
    [ "SELECT data, version"
    , " FROM snapshots"
    , " WHERE guid = ?;"
    ]
-- | Build the message reported when a result row does not have the column
-- shape the caller expected. It quotes the (already rendered) query
-- parameters and the offending columns.
badQueryResultMsg :: [String] -> [SQLData] -> String
badQueryResultMsg params columns =
  "Invalid query result shape. Params: "
    ++ show params
    ++ ". Result columns: "
    ++ show columns
-- | Abort the current 'IO' action via 'fail', reporting that the expected
-- version (first argument) differs from the version actually seen (second
-- argument).
versionConflict :: (Show a, Show b) => a -> b -> IO c
versionConflict ov cv = fail message
  where
    message =
      "Version conflict detected (expected "
        ++ show ov
        ++ ", saw "
        ++ show cv
        ++ ")"
-- | Store new events for an aggregate, guarded by a version check.
--
-- The version currently recorded in @versions@ for the GUID (0 when no row
-- exists) must equal @originatingVersion@; otherwise the action fails via
-- 'versionConflict'. On success the recorded version becomes
-- @originatingVersion + length events@ and every event is inserted into
-- @events@.
storeEvents :: Database -> GUID -> Int -> [RawEvent] -> IO ()
storeEvents database guid originatingVersion events = do
  curVer <- run_ (EL.fold keepHighest 0 >>== versionRows)
  when (fromIntegral curVer /= originatingVersion) $
    versionConflict originatingVersion curVer
  execSql database updateCurrentVersionSql [guidParam, toSQLData newVersion]
  mapM_ insertEvent events
  where
    guidParam = toSQLData guid

    newVersion = originatingVersion + length events

    versionRows = enumQueryResult database getCurrentVersionSql [guidParam]

    -- Fold step: the largest version seen so far (starting from 0).
    keepHighest highest row = max highest (versionOf row)

    versionOf [SQLInteger v] = v
    versionOf columns = error $ badQueryResultMsg [show guid] columns

    insertEvent e =
      execSql database insertEventSql
        [ guidParam
        , toSQLData (peSequenceNumber e)
        , toSQLData (peEvent e)
        , toSQLData (peGlobalVer e)
        ]
-- | Load the events of one aggregate whose version is strictly greater
-- than @v0@, in ascending version order.
--
-- A row that is not an (integer, integer, blob) triple is reported via
-- 'error' when the corresponding event is evaluated.
retrieveEvents :: Database -> GUID -> Int -> IO [RawEvent]
retrieveEvents database guid v0 =
  run_ (EL.consume >>== (eventRows $= EL.map toEvent))
  where
    eventRows =
      enumQueryResult database selectEventsSql [toSQLData guid, toSQLData v0]

    toEvent [SQLInteger version, SQLInteger gversion, SQLBlob eventData] =
      PersistedEvent guid eventData (fromIntegral version) (fromIntegral gversion)
    toEvent columns =
      error $ badQueryResultMsg [show guid, show v0] columns
-- | Enumerate every stored event whose global version is at least
-- @minVersion@, in ascending global version order.
--
-- The GUID column is decoded with @decode'@. A row that is not an
-- (integer, blob, integer, blob) quadruple is reported via 'error'.
enumerateAllEvents :: Database -> Int -> Enumerator RawEvent IO a
enumerateAllEvents database minVersion =
  enumQueryResult database enumerateAllEventsSql [toSQLData minVersion]
    $= EL.map toEvent
  where
    toEvent [SQLInteger gv, SQLBlob g, SQLInteger v, SQLBlob ed] =
      PersistedEvent (decode' g) ed (fromIntegral v) (fromIntegral gv)
    toEvent columns =
      error $ badQueryResultMsg [show minVersion] columns
-- | Store a snapshot (version paired with serialized data) for an
-- aggregate, replacing any snapshot already stored for that GUID.
writeSnapshot :: Database -> GUID -> (Int, ByteString) -> IO ()
writeSnapshot database guid (v, a) =
  execSql database writeSnapshotSql params
  where
    -- Order matches the statement's columns: guid, data, version.
    params = [toSQLData guid, toSQLData a, toSQLData v]
-- | Look up the snapshot stored for an aggregate.
--
-- Returns 'Nothing' when @snapshots@ holds no row for the GUID, otherwise
-- the snapshot's version paired with its serialized data. A row that is not
-- a (blob, integer) pair is reported via 'error'.
getLatestSnapshot :: Database -> GUID -> IO (Maybe (Int, ByteString))
getLatestSnapshot database guid =
  run_ $ EL.fold keepLatest Nothing >>== (snapshotRows $= EL.map unpackColumns)
  where
    snapshotRows = enumQueryResult database selectSnapshotSql [toSQLData guid]

    -- 'EL.fold' hands the step function the accumulator first and the new
    -- element second, so the element is what must be kept. Returning the
    -- accumulator instead would leave the initial 'Nothing' untouched and
    -- no stored snapshot would ever be returned.
    keepLatest _ row = row

    unpackColumns :: [SQLData] -> Maybe (Int, ByteString)
    unpackColumns [SQLBlob a, SQLInteger v] = Just (fromIntegral v, a)
    unpackColumns columns = error $ badQueryResultMsg [show guid] columns
-- | Read the highest global version present in the @events@ table (the
-- query yields 0 when there are no events).
--
-- A row that is not a single integer column, or a result with no rows at
-- all, is reported via 'error' with a message naming the problem rather
-- than through a bare partial 'head'.
getLatestVersion :: Database -> IO Int
getLatestVersion database = do
  versions <- run_ $ EL.consume >>== (versionRows $= EL.map unpackColumns)
  case versions of
    v : _ -> return v
    -- The aggregate query is expected to always yield one row; keep the
    -- failure explicit in case that expectation is ever violated.
    [] -> error "getLatestVersion: query returned no rows"
  where
    versionRows = enumQueryResult database getLatestVersionSql []

    unpackColumns :: [SQLData] -> Int
    unpackColumns [SQLInteger v] = fromIntegral v
    unpackColumns columns = error $ badQueryResultMsg [] columns
-- | Open the SQLite database with the given file name, make sure the
-- @events@, @versions@ and @snapshots@ tables exist, and return an
-- 'EventStoreBackend' whose operations all run against that one database
-- handle.
--
-- If creating any of the tables throws, the freshly opened handle is closed
-- before the exception propagates, so a failed open does not leak it. On
-- success the handle stays open until the backend's
-- 'esbCloseEventStoreBackend' action is run.
openSqliteEventStore :: String -> IO EventStoreBackend
openSqliteEventStore databaseFileName =
  bracketOnError (SQL.open databaseFileName) SQL.close $ \database -> do
    execSql database createEventsSql []
    execSql database createAggregateVersionsSql []
    execSql database createSnapshotSql []
    return $ EventStoreBackend
      { esbStoreEvents = storeEvents database
      , esbRetrieveEvents = retrieveEvents database
      , esbEnumerateAllEvents = enumerateAllEvents database
      , esbWriteSnapshot = writeSnapshot database
      , esbGetLatestSnapshot = getLatestSnapshot database
      , esbGetLatestVersion = getLatestVersion database
      , esbWithTransaction = withTransaction database
      , esbCloseEventStoreBackend = SQL.close database
      }