module Data.CQRS.EventStore.Sqlite3
( openSqliteEventStore
, closeEventStore
) where
import Control.Exception (catch, bracket, onException, SomeException)
import Control.Monad (when, forM_)
import Data.Binary (encode, decode, Binary)
import qualified Data.ByteString.Char8 as B8
import qualified Data.ByteString.Lazy.Char8 as BSL
import Data.CQRS.EventStore
import Data.CQRS.Event (Event)
import Data.CQRS.GUID
import qualified Database.SQLite3 as SQL
import Database.SQLite3 (Database, Statement, SQLData(..), StepResult(..))
import Prelude hiding (catch)
-- | DDL that creates the @events@ table when it does not exist yet.
--
-- Each row holds a @guid@ blob, an @ev_data@ blob and an integer
-- @version@; the primary key is the pair @(guid, version)@.
createEventsSql :: String
createEventsSql =
  unwords
    [ "CREATE TABLE IF NOT EXISTS events ("
    , "guid BLOB"
    , ", ev_data BLOB"
    , ", version INTEGER"
    , ", PRIMARY KEY (guid, version)"
    , ");"
    ]
-- | Query returning the @version@ and @ev_data@ columns of every event
-- stored for one aggregate. Takes a single parameter: the aggregate's
-- @guid@ blob.
--
-- The explicit @ORDER BY version ASC@ is required: without an @ORDER BY@
-- clause SQLite leaves the row order undefined, and the events are handed
-- back to callers in exactly the order the rows arrive.
selectEventsSql :: String
selectEventsSql =
  "SELECT version, ev_data FROM events WHERE guid = ? ORDER BY version ASC;"
-- | Statement inserting one row into the @events@ table. The three
-- parameters are, in order: @guid@, @version@ and @ev_data@.
insertEventSql :: String
insertEventSql =
  unwords
    [ "INSERT INTO events"
    , "( guid, version, ev_data )"
    , "VALUES (?, ?, ?);"
    ]
-- | DDL that creates the @versions@ table when it does not exist yet.
--
-- The table is keyed by the @guid@ blob and stores one integer @version@
-- per key.
createAggregateVersionsSql :: String
createAggregateVersionsSql =
  unwords
    [ "CREATE TABLE IF NOT EXISTS versions ("
    , "guid BLOB PRIMARY KEY"
    , ", version INTEGER"
    , ");"
    ]
-- | Query that looks up the @version@ recorded in the @versions@ table for
-- one @guid@ (the single parameter).
getCurrentVersionSql :: String
getCurrentVersionSql =
  unwords
    [ "SELECT version"
    , "FROM versions"
    , "WHERE guid = ?;"
    ]
-- | Statement that inserts or replaces the @versions@ row for a @guid@.
-- The two parameters are, in order: @guid@ and @version@.
updateCurrentVersionSql :: String
updateCurrentVersionSql =
  unwords
    [ "INSERT OR REPLACE INTO versions"
    , "( guid, version )"
    , "VALUES (?,?);"
    ]
-- | Run @BEGIN TRANSACTION;@ (with no bound parameters) on the database.
beginTransaction :: Database -> IO ()
beginTransaction db = execSql db statement []
  where
    statement = "BEGIN TRANSACTION;"
-- | Run @COMMIT TRANSACTION;@ (with no bound parameters) on the database.
commitTransaction :: Database -> IO ()
commitTransaction db = execSql db statement []
  where
    statement = "COMMIT TRANSACTION;"
-- | Run @ROLLBACK TRANSACTION;@ (with no bound parameters) on the database.
rollbackTransaction :: Database -> IO ()
rollbackTransaction db = execSql db statement []
  where
    statement = "ROLLBACK TRANSACTION;"
-- | Serialise a value with its 'Binary' instance and wrap the resulting
-- bytes in an 'SQLBlob'.
toSQLBlob :: (Binary a) => a -> SQLData
toSQLBlob = SQLBlob . strictBytes . encode
  where
    -- 'encode' yields a lazy ByteString; glue its chunks into a strict one.
    strictBytes = B8.concat . BSL.toChunks
-- | Prepare @sql@ against the database, bind @parameters@ to the prepared
-- statement, and run @action@ on it.
--
-- The statement is finalised through 'bracket', so it is released whether
-- binding or the action completes normally or throws.
withSqlStatement :: Database -> String -> [SQLData] -> (Statement -> IO a) -> IO a
withSqlStatement database sql parameters action =
  bracket acquire SQL.finalize use
  where
    acquire = SQL.prepare database sql
    use stmt = SQL.bind stmt parameters >> action stmt
-- | Prepare a statement with the given parameters and step it exactly
-- once, discarding the step result.
execSql :: Database -> String -> [SQLData] -> IO ()
execSql database sql parameters =
  withSqlStatement database sql parameters stepOnce
  where
    stepOnce stmt = SQL.step stmt >> return ()
-- | Run a query and collect one result per row.
--
-- The statement is stepped until it reports 'Done'. For every 'Row' the
-- current columns are passed to @reader@, and the values it produces are
-- returned in the order the rows were delivered.
querySql :: Database -> String -> [SQLData] -> ([SQLData] -> IO a) -> IO [a]
querySql database sql parameters reader =
  withSqlStatement database sql parameters (collect [])
  where
    -- Results are consed onto the accumulator, hence the final 'reverse'.
    collect rows stmt =
      SQL.step stmt >>= \outcome ->
        case outcome of
          Done -> return (reverse rows)
          Row -> do
            row <- SQL.columns stmt >>= reader
            collect (row : rows) stmt
-- | Fail in 'IO' with a message naming the GUID and the unexpected columns
-- that a query returned for it.
badQueryResult :: GUID a -> [SQLData] -> IO b
badQueryResult guid columns = fail message
  where
    message = "Invalid query result for " ++ show guid ++ ": " ++ show columns
-- | Fail in 'IO' with a version-conflict message that reports the expected
-- version @ov@ and the version actually seen, @cv@.
versionConflict :: (Show a, Show b) => a -> b -> IO c
versionConflict ov cv = fail message
  where
    message =
      "Version conflict detected (expected " ++ show ov
        ++ ", saw " ++ show cv ++ ")"
-- | Append @events@ to the aggregate identified by @guid@.
--
-- The version recorded in the @versions@ table (0 when there is no row)
-- must equal @originatingVersion@; otherwise the call fails through
-- 'versionConflict'. On success the recorded version is advanced by the
-- number of events, and each event is inserted with consecutive versions
-- starting at @originatingVersion + 1@.
--
-- No transaction is opened here.
storeEvents_ :: Database -> (Event e a, Binary e) => GUID a -> Int -> [e] -> IO ()
storeEvents_ database guid originatingVersion events = do
  storedVersions <- querySql database getCurrentVersionSql [guidBlob] readVersion
  let currentVersion = maximum (0 : storedVersions)
  when (fromIntegral currentVersion /= originatingVersion) $
    versionConflict originatingVersion currentVersion
  execSql database updateCurrentVersionSql
    [guidBlob, SQLInteger (fromIntegral newVersion)]
  mapM_ insertEvent (zip [originatingVersion + 1 ..] events)
  where
    guidBlob = toSQLBlob guid
    newVersion = originatingVersion + length events

    -- A version row must consist of exactly one integer column.
    readVersion [SQLInteger v] = return v
    readVersion columns = badQueryResult guid columns

    insertEvent (version, event) =
      execSql database insertEventSql
        [guidBlob, SQLInteger (fromIntegral version), toSQLBlob event]
-- | Load every stored event for the aggregate identified by @guid@.
--
-- Returns the highest version found (0 when there are no rows) together
-- with the decoded events, in the order the query delivered them. A row
-- that is not an integer column followed by a blob column fails through
-- 'badQueryResult'.
retrieveEvents_ :: (Event e a, Binary e) => Database -> GUID a -> IO (Int,[e])
retrieveEvents_ database guid = do
  rows <- querySql database selectEventsSql [toSQLBlob guid] readRow
  let (versions, events) = unzip rows
  return (fromIntegral (maximum (0 : versions)), events)
  where
    readRow [SQLInteger version, SQLBlob eventData] =
      return (version, decode (BSL.fromChunks [eventData]))
    readRow columns = badQueryResult guid columns
-- | Run @action@ between a @BEGIN TRANSACTION@ and a @COMMIT TRANSACTION@.
--
-- If the action or the commit throws, a rollback is attempted and the
-- exception is re-raised by 'onException'. Any exception thrown by the
-- rollback itself is discarded.
withTransaction_ :: forall a . Database -> IO a -> IO a
withTransaction_ database action =
  beginTransaction database >> (commitAfter `onException` rollbackQuietly)
  where
    commitAfter =
      action >>= \result -> commitTransaction database >> return result

    rollbackQuietly = rollbackTransaction database `catch` ignore

    ignore :: SomeException -> IO ()
    ignore _ = return ()
-- | Open the SQLite database at @databaseFileName@ and build an
-- 'EventStore' backed by it.
--
-- The @events@ and @versions@ tables are created if they do not exist.
-- If creating them throws, the database handle is closed before the
-- exception propagates; at that point no 'EventStore' has been returned,
-- so the caller has no way to close it.
--
-- The returned store's 'closeEventStore' field closes the database.
openSqliteEventStore :: String -> IO EventStore
openSqliteEventStore databaseFileName = do
  database <- SQL.open databaseFileName
  createSchema database `onException` SQL.close database
  return $ EventStore { storeEvents = storeEvents_ database
                      , retrieveEvents = retrieveEvents_ database
                      , withTransaction = withTransaction_ database
                      , closeEventStore = SQL.close database
                      }
  where
    -- Create both tables; each statement is a no-op on an existing table.
    createSchema db = do
      execSql db createEventsSql []
      execSql db createAggregateVersionsSql []