module CQRSExample.Query
       ( Refresh(..)
       , setupQueryDatabase
       , qFindUser
       , qProjectList
       , qTaskList
       , qTimeSheet
       , reactToUnseenEvents
       ) where

import Control.Monad (liftM, void)
import qualified Data.ByteString.Char8 as BS8
import Data.Conduit (runResourceT, ($$))
import qualified Data.Conduit.List as CL
import Data.Convertible (Convertible(..), convError)
import Data.CQRS (enumerateEventStore, GUID, EventStore)
import qualified Data.CQRS.GUID as G
import Data.CQRS.PersistedEvent (PersistedEvent(..))
import Data.Foldable (forM_)
import Data.Set (Set)
import qualified Data.Set as S
import Data.Text (Text)
import Data.Time.Calendar (Day)
import Database.HDBC (IConnection, SqlValue(..), fromSql, quickQuery', run, runRaw, toSql, withTransaction)
import Text.Read (readMaybe)

import CQRSExample.Aggregates (UserId)
import CQRSExample.Duration (Duration)
import qualified CQRSExample.Duration as D
import CQRSExample.Events (Event(..), ProjectEvent(..), TaskEvent(..), UserEvent(..), SiteEvent(..))

-- | Refresh event: names a part of the query model that changed.
-- 'reactToUnseenEvents' returns the set of refreshes caused by the events it
-- has just processed.
data Refresh = RefreshTasks
             | RefreshProjects
             | RefreshTimesheet
             deriving (Show, Eq, Ord)

-- Conversion between Duration and SqlValue.

-- | A 'Duration' is written to SQL as the value of 'D.toSeconds'.
instance Convertible Duration SqlValue where
  safeConvert duration = Right $ toSql $ D.toSeconds duration

-- | A 'Duration' is read back through 'D.seconds', either from an integer
-- column value or from the textual form of a number.
instance Convertible SqlValue Duration where
  safeConvert (SqlInteger i) = Right $ D.seconds i
  -- Parse with 'readMaybe' so that malformed text is reported through the
  -- conversion error channel instead of throwing from 'read'.
  safeConvert v@(SqlByteString s) =
    case readMaybe (BS8.unpack s) of
      Just i -> Right $ D.seconds i
      Nothing -> convError "Invalid source value" v
  safeConvert a = convError "Invalid source value" a

-- Conversion between GUID and SqlValue.

-- | A 'GUID' is written to SQL as the byte string from 'G.toByteString'.
instance Convertible GUID SqlValue where
  safeConvert g = Right $ toSql $ G.toByteString g

-- | A 'GUID' can only be read back from a byte-string value; every other
-- 'SqlValue' is a conversion error.
instance Convertible SqlValue GUID where
  safeConvert (SqlByteString s) = Right $ G.fromByteString s
  safeConvert a = convError "Invalid source value" a

-- Simple query with result field mapping.
-- Runs @sql@ with @params@ through 'quickQuery'' inside 'withTransaction' and
-- applies @f@ to every result row.
smq :: IConnection c => c -> String -> [SqlValue] -> ([SqlValue] -> a) -> IO [a]
smq c sql params f =
  withTransaction c $ \_ ->
    fmap (map f) $ quickQuery' c sql params

-- | Extract a single result (if any).
--
-- Calls 'error' if the list holds more than one element.
extract1 :: [a] -> Maybe a
extract1 [] = Nothing
extract1 [r] = Just r
extract1 _ = error "Too many results"

-- | Initialize query database if necessary.
--
-- Every statement is @CREATE TABLE IF NOT EXISTS@, so running this against an
-- already initialized database leaves existing tables alone. The statements
-- run inside 'withTransaction' so that the schema is committed as a unit
-- (and rolled back if any statement fails).
setupQueryDatabase :: IConnection c => c -> IO ()
setupQueryDatabase c =
  withTransaction c $ \conn -> mapM_ (runRaw conn) schema
  where
    schema :: [String]
    schema =
      [ "CREATE TABLE IF NOT EXISTS gversion ( gversion INTEGER )"
      , "CREATE TABLE IF NOT EXISTS projects ( guid BLOB , name TEXT , short_desc TEXT )"
      , "CREATE TABLE IF NOT EXISTS tasks ( guid BLOB , project_guid BLOB , short_desc TEXT , long_desc TEXT , estimate INTEGER , user_guid BLOB , created_ts TEXT )"
      , "CREATE TABLE IF NOT EXISTS users ( user_id BLOB , user_name TEXT , user_pass TEXT , first_name TEXT , last_name TEXT )"
      , "CREATE TABLE IF NOT EXISTS work_units ( task_guid BLOB , wu_guid BLOB , wu_date TEXT , wu_duration INTEGER , wu_comment TEXT , user_id BLOB )"
      , "CREATE TABLE IF NOT EXISTS tasks_starred_by ( task_id BLOB, user_id BLOB )"
      ]

-- | Find user password from database.
--
-- Returns the @user_id@ and @user_pass@ columns of the row whose @user_name@
-- equals the given name, or 'Nothing' if there is no such row. More than one
-- matching row is reported with 'error' (see 'extract1').
qFindUser :: IConnection c => c -> Text -> IO (Maybe (GUID, Text))
qFindUser c userName =
  fmap extract1 $
    smq c "SELECT U.user_id, U.user_pass FROM users U WHERE U.user_name = ?"
      [ toSql userName ]
      convert
  where
    convert :: [SqlValue] -> (GUID, Text)
    convert [uid, p] = (fromSql uid, fromSql p)
    convert _ = error "Invalid number of columns in SQL query result"

-- | Query project list: one @(guid, name, short_desc)@ triple per row of the
-- @projects@ table.
qProjectList :: IConnection c => c -> IO [(GUID, Text, Text)]
qProjectList c =
  smq c "SELECT guid, name, short_desc FROM projects" [ ] convert
  where
    convert :: [SqlValue] -> (GUID, Text, Text)
    convert [spg, spn, spsd] = (fromSql spg, fromSql spn, fromSql spsd)
    convert _ = error "Invalid number of columns in SQL query result"

-- | Query task list, optionally restricted to a single project.
--
-- Each result is @(task guid, short description, starred)@ where @starred@ is
-- 'True' when the given user has starred the task.
--
-- The user filter is part of the join condition rather than the @WHERE@
-- clause: filtering the outer-joined table in @WHERE@ dropped every task that
-- had been starred only by /other/ users, because such rows have a non-NULL
-- @user_id@ that does not match.
qTaskList :: IConnection c => UserId -> Maybe GUID -> c -> IO [(GUID, Text, Bool)]
qTaskList userId mProjectId c =
  case mProjectId of
    Nothing ->
      smq c baseQuery [ toSql userId ] convert
    Just projectId ->
      smq c (baseQuery ++ " WHERE T.project_guid = ?") [ toSql userId, toSql projectId ] convert
  where
    -- The first placeholder is the user id (join condition); the optional
    -- project filter appended above supplies the second.
    baseQuery :: String
    baseQuery = "SELECT T.guid, T.short_desc, TS.user_id IS NULL FROM tasks T LEFT OUTER JOIN tasks_starred_by TS ON T.guid = TS.task_id AND TS.user_id = ?"

    convert :: [SqlValue] -> (GUID, Text, Bool)
    convert [ guid, shortDesc, notStarred ] =
      ( fromSql guid
      , fromSql shortDesc
        -- The third column is @TS.user_id IS NULL@: 0 means a star row for
        -- this user exists, 1 means it does not.
      , case (fromSql notStarred :: Int) of
          0 -> True
          1 -> False
          _ -> error "Invalid query result"
      )
    convert _ = error "Invalid columns in SQL query result"

-- Query work units in a date interval. The result is a list of tasks
-- (IDs, short description), with daily totals (in minutes) for each task.
--
-- NOTE(review): this definition is damaged in the reviewed copy. The SQL
-- string literal below is cut off partway through its WHERE clause, and the
-- remainder of the query and its parameter list are missing. The surviving
-- text is kept exactly as found; restore the definition from version control
-- before building.
qTimeSheet :: IConnection c => Day -> Day -> UserId -> c -> IO [(GUID, Text, Maybe Day, Duration)]
qTimeSheet startDate endDate userId c = (smq c "SELECT T.guid, W.wu_date, T.short_desc, COALESCE(SUM(W.wu_duration),0) FROM tasks T LEFT OUTER JOIN work_units W ON W.task_guid=T.guid LEFT OUTER JOIN tasks_starred_by ST ON ST.task_id=T.guid WHERE ((W.wu_duration IS NOT NULL) AND (W.wu_date>=? 
AND W.wu_date (GUID, Text, Maybe Day, Duration) convert [ taskId, day, taskShortDesc, total ] = (fromSql taskId, fromSql taskShortDesc, fromSql day, fromSql total) convert _ = error "Invalid columns in SQL query result"

-- NOTE(review): the line above is the surviving tail of a damaged definition
-- (its SQL string and the start of the @convert@ signature are missing). It is
-- kept exactly as found; restore it from version control before building.

-- | Event sourcing: bring the query database up to date with the event store.
--
-- Inside a single 'withTransaction' this reads the stored global version,
-- enumerates the event store from that version, applies every event with
-- 'reactToEvent', and stores the version advanced by the number of events
-- processed. The result is the set of 'Refresh' values implied by those
-- events.
reactToUnseenEvents :: IConnection c => c -> EventStore Event -> IO (Set Refresh)
reactToUnseenEvents connection eventStore = withTransaction connection $ \_ -> do
  -- Find the version number to start at.
  startVersion <- getStartVersion
  -- Enumerate all the events from the event store since last run.
  evs <- runResourceT $ enumerateEventStore eventStore startVersion $$ CL.consume
  -- React to all the events.
  forM_ evs $ reactToEvent connection
  -- Update the global version number.
  updateStartVersion $ startVersion + length evs
  -- Done.
  return $ S.fromList $ calculateRefresh $ evs
  where
    -- Stored global version, or 1 when the gversion table is empty.
    -- NOTE(review): 'smq' wraps the query in its own 'withTransaction', which
    -- is nested inside the one above; confirm the nested commit is intended.
    getStartVersion :: IO Int
    getStartVersion =
      liftM firstRow $
        smq connection "SELECT COALESCE(MAX(gversion), 1) FROM gversion" [ ] convert
      where
        -- The aggregate query is expected to yield a row; fail with a clear
        -- message rather than through a bare 'head'.
        firstRow (v : _) = v
        firstRow [] = error "No rows in SQL query result"
        convert [i] = fromSql i
        convert _ = error "Invalid columns in SQL query result"

    -- Replace the contents of the gversion table with the new version.
    updateStartVersion :: Int -> IO ()
    updateStartVersion newStartVersion = do
      _ <- run connection "DELETE FROM gversion;" []
      _ <- run connection "INSERT INTO gversion (gversion) VALUES (?);" [ toSql newStartVersion ]
      return ()

    -- Refreshes implied by each event, in event order (duplicates included;
    -- the caller collapses them into a set).
    calculateRefresh :: [PersistedEvent Event] -> [Refresh]
    calculateRefresh pes = concatMap (f . peEvent) pes
      where
        f (ProjectEvent _) = [RefreshProjects]
        f (TaskEvent te) = fte te
        f (UserEvent _) = []
        f (SiteEvent _) = []

        fte (TaskAdded _ _ _ _ _) = [RefreshTasks]
        fte (TaskStarred _) = [RefreshTasks, RefreshTimesheet]
        fte (TaskUnstarred _) = [RefreshTasks, RefreshTimesheet]
        fte (RecordedWorkUnit _ _ _ _ _) = [RefreshTimesheet]

-- | Apply a single persisted event to the query database.
--
-- A line describing the event is written to standard output first; the event
-- is then turned into the matching @INSERT@, @UPDATE@ or @DELETE@ statement,
-- with the aggregate GUID of the event identifying the affected row.
reactToEvent :: IConnection c => c -> PersistedEvent Event -> IO ()
reactToEvent connection ev = do
  let gv = peGlobalVer ev
  let e = peEvent ev
  -- The message goes through 'show', so it is printed as a quoted, escaped
  -- string literal.
  putStrLn $ show $ "gv(" ++ show gv ++ "): guid: " ++ show g ++ " event: " ++ show e
  react e
  where
    -- GUID of the aggregate the event belongs to.
    g = peAggregateGUID ev

    reactProject (ProjectCreated pn psd) =
      go "INSERT INTO projects ( guid , name , short_desc ) VALUES ( ? , ? , ? );"
         [ toSql g , toSql pn , toSql psd ]
    reactProject (ProjectRenamed pn) =
      go "UPDATE projects SET name = ? WHERE guid = ?;"
         [ toSql pn , toSql g ]

    reactTask (TaskAdded pid tsd est uid ts) =
      go "INSERT INTO tasks ( guid , project_guid , short_desc , estimate , user_guid , created_ts) VALUES (?, ?, ?, ? , ?, ?);"
         [ toSql g , toSql pid , toSql tsd , toSql est , toSql uid , toSql ts ]
    reactTask (TaskStarred userId) =
      go "INSERT INTO tasks_starred_by ( task_id , user_id ) VALUES (?, ?);"
         [ toSql g , toSql userId ]
    reactTask (TaskUnstarred userId) =
      go "DELETE FROM tasks_starred_by WHERE task_id = ? AND user_id = ?;"
         [ toSql g , toSql userId ]
    reactTask (RecordedWorkUnit wuId wuDay wuDuration wuComment wuUserId) =
      go "INSERT INTO work_units ( task_guid , wu_guid , wu_date , wu_duration , wu_comment , user_id ) VALUES ( ? , ? , ? , ? , ? , ? );"
         [ toSql g
         , toSql wuId
         , toSql $ show wuDay  -- the day is stored in its 'show' form
         , toSql $ wuDuration
         , toSql $ wuComment
         , toSql wuUserId
         ]

    reactUser (UserCreated ucUserName ucPassword ucFirstName ucLastName) =
      go "INSERT INTO users ( user_id , user_name , user_pass, first_name , last_name ) VALUES ( ? , ? , ? , ? , ? );"
         [ toSql g , toSql ucUserName , toSql ucPassword , toSql ucFirstName , toSql ucLastName ]

    reactSite (UserRegistered _) = return () -- No need to record

    -- Dispatch on the kind of event.
    react (ProjectEvent p) = reactProject p
    react (TaskEvent e) = reactTask e
    react (UserEvent e) = reactUser e
    react (SiteEvent e) = reactSite e

    -- Run one statement on the connection, discarding the affected-row count.
    go :: String -> [SqlValue] -> IO ()
    go sql parameters = void $ run connection sql parameters