import Control.Concurrent (forkIO)
import Control.Concurrent.STM (atomically, TChan)
import qualified Control.Concurrent.STM.TChan as C
import Control.Concurrent.STM.TVar (TVar, newTVarIO)
import Control.Monad (void, forM_)
import Control.Monad.Trans.Class (lift)
import Data.Conduit (runResourceT, ($$), awaitForever)
import Data.CQRS.EventStore.Backend.Memory (createBackendPool)
import Data.CQRS.Query (EventStoreBackend, Repository, enumerateAndStreamEvents)
import Data.CQRS.Repository (Settings(..))
import qualified Data.CQRS.Repository as R
import Network.Wai.EventSource (ServerEvent(..))
import Web.Scotty (scotty)

import CQRSExample.Events
import CQRSExample.Instances ()
import CQRSExample.Notifications
import CQRSExample.Query
import CQRSExample.Routing

-- | Source of refresh events to the browser.
--
-- Streams batches of events out of the repository. For every batch it
-- first applies each event to the shared query state and then writes a
-- single 'ServerEvent', derived from the notifications calculated for
-- that batch, to the @serverEvents@ channel.
--
-- The sink is built with 'awaitForever', so this action keeps running
-- for as long as the upstream source keeps producing batches.
eventSourcingThread
  :: EventStoreBackend b
  => TVar QueryState       -- ^ query state updated by each sourced event
  -> TChan ServerEvent     -- ^ channel the resulting server events are written to
  -> Repository Event b    -- ^ repository whose events are streamed
  -> IO ()
eventSourcingThread qs serverEvents repository = do
  putStrLn "Sourcing events..."
  runResourceT $ enumerateAndStreamEvents repository $$ sink
  where
    -- Sink for sourced events. The two 'lift's carry the IO action
    -- through the conduit layer and the resource layer introduced by
    -- 'runResourceT'.
    sink = awaitForever $ \events -> lift $ lift $ do
      -- Update the query state before publishing the notification.
      forM_ events $ runQuery qs . reactToEvent
      atomically . C.writeTChan serverEvents . toServerEvent $ calculateNotifications events

-- | Start serving the application.
--
-- Creates the shared query state, the channel of server events and the
-- repository, then forks two threads: one running 'eventSourcingThread'
-- and one running the Scotty web server on port 8000. Returns as soon as
-- both threads have been forked; the resulting thread ids are discarded.
startServing :: IO ()
startServing = do
  qState <- newTVarIO newQueryState

  -- Queue of json events to send to browser. This is a broadcast
  -- channel: per the stm documentation it is write-only, so consumers
  -- have to read from a duplicate of it.
  -- NOTE(review): presumably 'routes' duplicates it per client; confirm.
  serverEvents <- C.newBroadcastTChanIO

  -- Create the repository.
  repository <- do
    let repositorySettings = R.defaultSettings { settingsSnapshotFrequency = Just 10 }
    -- NOTE(review): 5 is presumably the size of the backend pool; confirm
    -- against 'createBackendPool'.
    pool <- createBackendPool 5
    R.newRepository repositorySettings pool

  -- Start sourcing events.
  void $ forkIO $ eventSourcingThread qState serverEvents repository

  -- Web serving thread.
  void $ forkIO $ scotty 8000 $ routes qState repository serverEvents

-- | Program entry point: starts the background threads and then blocks
-- until a line is read from standard input, after which the program
-- exits.
main :: IO ()
main = do
  putStrLn "Starting..."
  startServing
  -- NOTE(review): this prompt looks like it lost a key name (perhaps
  -- "<ENTER>") somewhere along the way; left unchanged, confirm the
  -- intended wording.
  putStrLn "Press to quit"
  void getLine