module CQRSExample.Query ( setupQueryDatabase , qFindUserPassword , qProjectList , qTaskList , qTimeSheet , qUserIdFromUserName , reactToUnseenEvents ) where import Control.Monad (liftM, void) import CQRSExample.Aggregates (UserId) import CQRSExample.Events (Event(..)) import Data.Convertible (Convertible(..), convError) import Data.CQRS (enumerateEventStore, GUID, EventStore) import qualified Data.CQRS.GUID as G import Data.CQRS.PersistedEvent (PersistedEvent(..)) import Data.Enumerator (run_, (>>==)) import qualified Data.Enumerator.List as EL import Data.Foldable (forM_) import Data.Text (Text) import Data.Time.Calendar (Day) import Database.HDBC (IConnection, SqlValue(..), fromSql, quickQuery', run, runRaw, toSql, withTransaction) -- Conversion between GUID and SQLValue. instance Convertible GUID SqlValue where safeConvert g = Right $ toSql $ G.toByteString g instance Convertible SqlValue GUID where safeConvert (SqlByteString s) = Right $ G.fromByteString s safeConvert a = convError "Invalid source value" a -- Simple query with result field mapping. smq :: IConnection c => c -> String -> [SqlValue] -> ([SqlValue] -> a) -> IO [a] smq c sql params f = withTransaction c $ \_ -> fmap (map f) $ quickQuery' c sql params -- Extract a single result (if any); fails if there is more than one result. extract1 :: [a] -> Maybe a extract1 [] = Nothing extract1 [r] = Just r extract1 _ = error "Too many results" -- Initialize query database if necessary. setupQueryDatabase :: IConnection c => c -> IO () setupQueryDatabase c = do runRaw c "CREATE TABLE IF NOT EXISTS gversion ( gversion INTEGER )" runRaw c "CREATE TABLE IF NOT EXISTS projects ( guid BLOB , name TEXT , short_desc TEXT )" runRaw c "CREATE TABLE IF NOT EXISTS tasks ( guid BLOB , project_guid BLOB , short_desc TEXT , long_desc TEXT )" runRaw c "CREATE TABLE IF NOT EXISTS users ( user_id BLOB , user_name TEXT , user_pass TEXT , first_name TEXT , last_name TEXT )" runRaw c "CREATE TABLE IF NOT EXISTS work_units ( task_guid BLOB , wu_guid BLOB , wu_date TEXT , wu_duration INTEGER , wu_comment TEXT , user_id BLOB )" runRaw c "CREATE TABLE IF NOT EXISTS tasks_starred_by ( task_id BLOB, user_id BLOB , user_name TEXT )" -- Find user password from database. qFindUserPassword :: IConnection c => c -> Text -> IO (Maybe Text) qFindUserPassword c userName = fmap extract1 $ smq c "SELECT user_pass FROM users WHERE user_name = ?" [ toSql userName ] convert where convert :: [SqlValue] -> Text convert [p] = fromSql p convert _ = error "Invalid number of columns in SQL query result" -- Query project list qProjectList :: IConnection c => c -> IO [(GUID, Text, Text)] qProjectList c = smq c "SELECT guid, name, short_desc FROM projects" [ ] convert where convert :: [SqlValue] -> (GUID, Text, Text) convert [spg, spn, spsd] = (fromSql spg, fromSql spn, fromSql spsd) convert _ = error "Invalid number of columns in SQL query result" -- Query task list qTaskList :: IConnection c => String -> Maybe GUID -> c -> IO [(GUID, Text, Bool)] qTaskList userName mProjectId c = case mProjectId of Nothing -> smq c "SELECT T.guid, T.short_desc, TS.user_id IS NULL FROM tasks T LEFT OUTER JOIN tasks_starred_by TS ON T.guid = TS.task_id WHERE (TS.user_name = ? OR TS.user_name IS NULL)" [ toSql userName ] convert Just projectId -> smq c "SELECT T.guid, T.short_desc, TS.user_id IS NULL FROM tasks T LEFT OUTER JOIN tasks_starred_by TS ON T.guid = TS.task_id WHERE (TS.user_name = ? OR TS.user_name IS NULL) AND T.project_guid = ?" [ toSql userName, toSql projectId ] convert where convert :: [SqlValue] -> (GUID,Text,Bool) convert [ guid, short_desc, starred_by_user ] = (fromSql guid, fromSql short_desc, case (fromSql starred_by_user :: Int) of 0 -> True 1 -> False _ -> error "Invalid query result") convert _ = error "Invalid columns in SQL query result" -- Query work units in a date interval. The result is a list of tasks -- (IDs, short description), with daily totals (in minutes) for each task. qTimeSheet :: IConnection c => Day -> Day -> String -> c -> IO [(GUID, Text, Maybe Day, Int)] qTimeSheet startDate endDate userName c = (smq c "SELECT T.guid, W.wu_date, T.short_desc, COALESCE(SUM(W.wu_duration),0) FROM tasks T LEFT OUTER JOIN work_units W ON W.task_guid=T.guid LEFT OUTER JOIN tasks_starred_by ST ON ST.task_id=T.guid WHERE ((W.wu_duration IS NOT NULL) AND (W.wu_date>=? AND W.wu_date (GUID, Text, Maybe Day, Int) convert [ taskId, day, taskShortDesc, total ] = (fromSql taskId, fromSql taskShortDesc, fromSql day, fromSql total) convert _ = error "Invalid columns in SQL query result" -- Get user name for a given user ID. qUserNameFromUserId :: IConnection c => c -> GUID -> IO (Maybe Text) qUserNameFromUserId c userId = fmap extract1 $ smq c "SELECT U.user_name FROM users U WHERE U.user_id = ?" [ toSql userId ] convert where convert :: [SqlValue] -> Text convert [u] = fromSql u convert _ = error "Invalid columns in SQL query result" -- Get user ID from user name. qUserIdFromUserName :: IConnection c => c -> Text -> IO (Maybe GUID) qUserIdFromUserName c userName = fmap extract1 $ smq c "SELECT U.user_id FROM users U WHERE U.user_name = ?" [ toSql $ userName ] convert where convert :: [SqlValue] -> GUID convert [u] = fromSql u convert _ = error "Invalid columns in SQL query result" -- Event sourcing. reactToUnseenEvents :: IConnection c => c -> EventStore Event -> IO () reactToUnseenEvents connection eventStore = withTransaction connection $ \_ -> do -- Find the version number to start at. startVersion <- getStartVersion -- Enumerate all the events form the event store since last run. evs <- run_ (EL.consume >>== enumerateEventStore eventStore startVersion) -- React to all the events. forM_ evs $ reactToEvent connection -- Update the global version number. updateStartVersion $ startVersion + length evs -- Done. return () where getStartVersion :: IO Int getStartVersion = do liftM head $ smq connection "SELECT COALESCE(MAX(gversion), 1) FROM gversion" [ ] (\columns -> case columns of [i] -> fromSql i _ -> error "Invalid columns in SQL query result") updateStartVersion :: Int -> IO () updateStartVersion newStartVersion = do _ <- run connection "DELETE FROM gversion;" [] _ <- run connection "INSERT INTO gversion (gversion) VALUES (?);" [ toSql newStartVersion ] return () reactToEvent :: IConnection c => c -> PersistedEvent Event -> IO () reactToEvent connection ev = do let gv = peGlobalVer ev let e = peEvent ev putStrLn $ show $ "gv(" ++ show gv ++ "): guid: " ++ show g ++ " event: " ++ show e react e where g = peAggregateGUID ev react (ProjectCreated pn psd) = do go "INSERT INTO projects ( guid , name , short_desc ) VALUES ( ? , ? , ? );" [ toSql g , toSql pn , toSql psd ] react (ProjectRenamed pn) = do go "UPDATE projects SET name = ? WHERE guid = ?;" [ toSql pn , toSql g ] react (TaskAdded pid tsd) = do go "INSERT INTO tasks ( guid , project_guid , short_desc ) VALUES (?, ?, ?);" [ toSql g , toSql pid , toSql tsd ] react (RecordedWorkUnit wuId wuDay wuDuration wuComment wuUserId) = do go "INSERT INTO work_units ( task_guid , wu_guid , wu_date , wu_duration , wu_comment , user_id ) VALUES ( ? , ? , ? , ? , ? , ? );" [ toSql g , toSql wuId , toSql $ show wuDay , toSql $ wuDuration , toSql $ wuComment , toSql wuUserId ] react (UserCreated ucUserName ucPassword ucFirstName ucLastName) = do go "INSERT INTO users ( user_id , user_name , user_pass, first_name , last_name ) VALUES ( ? , ? , ? , ? , ? );" [ toSql g , toSql ucUserName , toSql ucPassword , toSql ucFirstName , toSql ucLastName ] react (TaskStarred userId) = do mUserName <- qUserNameFromUserId connection userId forM_ mUserName $ \userName -> do go "INSERT INTO tasks_starred_by ( task_id , user_id , user_name ) VALUES (?, ?, ?);" [ toSql g , toSql userId , toSql $ userName ] react (TaskUnstarred userId) = do go "DELETE FROM tasks_starred_by WHERE task_id = ? AND user_id = ?;" [ toSql g , toSql userId ] react (UserRegistered _) = do return () -- No need to record go :: String -> [SqlValue] -> IO () go sql parameters = void $ run connection sql parameters