module Data.CQRS.EventStore.Backend.Sqlite3
( SQLite3EventStoreBackend
, createBackendPool
) where
import Control.Exception (bracketOnError)
import Control.Monad (when, forM_)
import Data.ByteString (ByteString)
import Data.Conduit (ResourceT, Source, ($=), ($$), runResourceT)
import qualified Data.Conduit.List as CL
import Data.Conduit.Pool (Pool, createPool)
import Data.CQRS.EventStore.Backend (EventStoreBackend(..), RawEvent, RawSnapshot(..))
import Data.CQRS.EventStore.Backend.Sqlite3Utils (withTransaction, execSql, sourceQuery)
import Data.CQRS.GUID (GUID)
import qualified Data.CQRS.GUID as G
import Data.CQRS.PersistedEvent (PersistedEvent(..))
import Data.Text (Text)
import qualified Database.SQLite3 as SQL
import Database.SQLite3 (Database, SQLData(..))
-- | Values that can be converted to 'SQLData' so they can be passed as
-- bound parameters to the SQL statements used by this backend.
class ToSQLData a where
  -- | Convert a value into its 'SQLData' representation.
  toSQLData :: a -> SQLData

-- | A GUID is bound as a blob holding its 'G.toByteString' encoding.
instance ToSQLData GUID where
  toSQLData guid = SQLBlob (G.toByteString guid)

-- | An 'Int' is converted with 'fromIntegral' and bound as an SQL integer.
instance ToSQLData Int where
  toSQLData n = SQLInteger (fromIntegral n)

-- | A 'ByteString' is bound as a blob, unchanged.
instance ToSQLData ByteString where
  toSQLData bytes = SQLBlob bytes
-- | Creates the @events@ table (one row per event, keyed by aggregate
-- GUID and version) if it does not exist yet.
createEventsSql :: Text
createEventsSql =
  "CREATE TABLE IF NOT EXISTS events \
  \( guid BLOB \
  \, ev_data BLOB \
  \, version INTEGER \
  \, PRIMARY KEY (guid, version) \
  \);"

-- | Selects the events of one aggregate starting at a given version
-- (inclusive), in ascending version order.
selectEventsSql :: Text
selectEventsSql =
  "SELECT version, ev_data \
  \FROM events \
  \WHERE guid = ? AND version >= ? \
  \ORDER BY version ASC;"

-- | Selects every stored event, ordered by version only.
enumerateAllEventsSql :: Text
enumerateAllEventsSql =
  "SELECT guid, version, ev_data \
  \FROM events \
  \ORDER BY version ASC;"

-- | Inserts a single event row.
insertEventSql :: Text
insertEventSql =
  "INSERT INTO events ( guid, version, ev_data ) \
  \VALUES (?, ?, ?);"

-- | Creates the @versions@ table (one version row per aggregate GUID)
-- if it does not exist yet.
createAggregateVersionsSql :: Text
createAggregateVersionsSql =
  "CREATE TABLE IF NOT EXISTS versions \
  \( guid BLOB PRIMARY KEY \
  \, version INTEGER \
  \);"

-- | Selects the recorded version for one aggregate.
getCurrentVersionSql :: Text
getCurrentVersionSql =
  "SELECT version \
  \FROM versions \
  \WHERE guid = ?;"

-- | Inserts or overwrites the recorded version for one aggregate.
updateCurrentVersionSql :: Text
updateCurrentVersionSql =
  "INSERT OR REPLACE INTO versions ( guid, version ) \
  \VALUES (?,?);"

-- | Creates the @snapshots@ table (one snapshot row per aggregate GUID)
-- if it does not exist yet.
createSnapshotSql :: Text
createSnapshotSql =
  "CREATE TABLE IF NOT EXISTS snapshots \
  \( guid BLOB PRIMARY KEY \
  \, data BLOB \
  \, version INTEGER \
  \);"
-- | Build the error message used when a query returns a row whose
-- columns do not match the expected shape.
--
-- The first argument is the already-rendered query parameters, the
-- second is the offending row; both are included in the message via 'show'.
badQueryResultMsg :: [String] -> [SQLData] -> String
badQueryResultMsg params columns =
  "Invalid query result shape. Params: " ++ show params
    ++ ". Result columns: " ++ show columns
-- | Abort the current 'IO' action with a version-conflict error.
--
-- The first argument is the version the caller expected, the second is
-- the version actually found; both are rendered with 'show' in the
-- failure message. This action never returns normally.
versionConflict :: (Show a, Show b) => a -> b -> IO c
versionConflict ov cv = fail message
  where
    message =
      "Version conflict detected (expected " ++ show ov
        ++ ", saw " ++ show cv ++ ")"
-- | Append events to an aggregate, guarded by an optimistic version check.
--
-- The version currently recorded for @guid@ is read from the @versions@
-- table (an aggregate with no row counts as version 0). If it differs
-- from @originatingVersion@ the action fails via 'versionConflict'.
-- Otherwise the recorded version is advanced by the number of events and
-- each event is inserted using its own sequence number.
--
-- No transaction is opened here; presumably callers wrap this in
-- 'withTransaction' so the check and the writes are atomic (confirm
-- against callers).
storeEvents :: Database -> GUID -> Int -> [RawEvent] -> IO ()
storeEvents database guid originatingVersion events = do
  -- Take the maximum over the returned rows, seeded with 0.
  curVer <- runResourceT
    ( sourceQuery database getCurrentVersionSql [guidParam]
        $$ CL.fold (\acc row -> max acc (versionOf row)) 0
    )
  when (fromIntegral curVer /= originatingVersion) $
    versionConflict originatingVersion curVer
  -- Record the new version first, then write the events themselves.
  execSql database updateCurrentVersionSql
    [ guidParam
    , toSQLData (originatingVersion + length events)
    ]
  mapM_ insertEvent events
  where
    guidParam = toSQLData guid

    -- A version row must consist of exactly one integer column.
    versionOf [SQLInteger v] = v
    versionOf columns = error (badQueryResultMsg [show guid] columns)

    insertEvent e =
      execSql database insertEventSql
        [ guidParam
        , toSQLData (peSequenceNumber e)
        , toSQLData (peEvent e)
        ]
-- | Stream the events of one aggregate whose version is at least @v0@,
-- in ascending version order (as defined by 'selectEventsSql').
--
-- A row that is not an integer followed by a blob raises an 'error'
-- built with 'badQueryResultMsg'.
retrieveEvents :: Database -> GUID -> Int -> Source (ResourceT IO) RawEvent
retrieveEvents database guid v0 =
  sourceQuery database selectEventsSql queryParams $= CL.map toEvent
  where
    queryParams = [toSQLData guid, toSQLData v0]

    toEvent [SQLInteger version, SQLBlob payload] =
      PersistedEvent guid payload (fromIntegral version)
    toEvent columns =
      error (badQueryResultMsg [show guid, show v0] columns)
-- | Stream every event in the store, in the order produced by
-- 'enumerateAllEventsSql' (ascending version).
--
-- A row that is not a blob, an integer and a blob (in that order)
-- raises an 'error' built with 'badQueryResultMsg'.
enumerateAllEvents :: Database -> Source (ResourceT IO) RawEvent
enumerateAllEvents database =
  sourceQuery database enumerateAllEventsSql [] $= CL.map toEvent
  where
    toEvent [SQLBlob guidBytes, SQLInteger version, SQLBlob payload] =
      PersistedEvent (G.fromByteString guidBytes) payload (fromIntegral version)
    toEvent columns =
      error (badQueryResultMsg [] columns)
-- | Store a snapshot for an aggregate, replacing any snapshot already
-- stored under the same GUID.
writeSnapshot :: Database -> GUID -> RawSnapshot -> IO ()
writeSnapshot database guid (RawSnapshot v d) =
  execSql database upsertSnapshotSql params
  where
    upsertSnapshotSql :: Text
    upsertSnapshotSql = "INSERT OR REPLACE INTO snapshots ( guid , data, version ) VALUES ( ?, ?, ? );"

    -- Parameter order follows the column list in the statement.
    params = [toSQLData guid, toSQLData d, toSQLData v]
-- | Look up the snapshot stored for an aggregate.
--
-- Returns 'Nothing' when the query yields no row. If several rows were
-- returned, the last one wins. A row that is not a blob followed by an
-- integer raises an 'error' built with 'badQueryResultMsg'.
getLatestSnapshot :: Database -> GUID -> IO (Maybe RawSnapshot)
getLatestSnapshot database guid =
  runResourceT $
    (sourceQuery database selectSnapshotSql [toSQLData guid] $= CL.map toSnapshot)
      $$ CL.fold keepLast Nothing
  where
    selectSnapshotSql = "SELECT data, version FROM snapshots WHERE guid = ?;"

    -- Fold step: discard the accumulator and keep the most recent row.
    keepLast _ latest = latest

    toSnapshot [SQLBlob d, SQLInteger v] = Just (RawSnapshot (fromIntegral v) d)
    toSnapshot columns = error (badQueryResultMsg [show guid] columns)
-- | Event store backend that wraps a single SQLite3 'Database' handle.
newtype SQLite3EventStoreBackend = ESB Database
-- | Every backend operation unwraps the 'Database' handle and delegates
-- to the corresponding SQLite3-specific function in this module.
instance EventStoreBackend SQLite3EventStoreBackend where
  esbStoreEvents (ESB db) = storeEvents db
  esbRetrieveEvents (ESB db) = retrieveEvents db
  esbEnumerateAllEvents (ESB db) = enumerateAllEvents db
  esbWriteSnapshot (ESB db) = writeSnapshot db
  esbGetLatestSnapshot (ESB db) = getLatestSnapshot db
  esbWithTransaction (ESB db) = withTransaction db
-- | Create a pool of SQLite3 event store backends over one database file.
--
-- The first argument is passed to 'createPool' as its final argument and
-- the second is the file name given to 'SQL.open'. Each pooled resource
-- opens its own connection and makes sure the @events@, @versions@ and
-- @snapshots@ tables exist before it is handed out.
--
-- NOTE(review): the literal arguments @1 1@ are presumably the stripe
-- count and the idle timeout in seconds, with @n@ the maximum number of
-- connections per stripe -- confirm against the 'createPool' in use.
createBackendPool :: Int -> Text -> IO (Pool SQLite3EventStoreBackend)
createBackendPool n databaseFileName =
  createPool open close 1 1 n
  where
    -- Close the freshly opened handle if schema creation throws, so a
    -- failed initialisation does not leak the connection.
    open =
      bracketOnError (SQL.open databaseFileName) SQL.close $ \database -> do
        execSql database createEventsSql []
        execSql database createAggregateVersionsSql []
        execSql database createSnapshotSql []
        return (ESB database)

    close (ESB db) = SQL.close db