{-# LANGUAGE DeriveFunctor #-} {-# LANGUAGE ExistentialQuantification #-} {-# LANGUAGE FlexibleContexts #-} {-# LANGUAGE FlexibleInstances #-} {-# LANGUAGE LambdaCase #-} {-# LANGUAGE MultiParamTypeClasses #-} {-# LANGUAGE RecordWildCards #-} {-# LANGUAGE TupleSections #-} {-# LANGUAGE TypeFamilies #-} module Database.CQRS.Transformer ( -- * Transformed stream Transformer , TransformedStream , transformStream -- * Transformed stream family , TransformedStreamFamily , transformStreamFamily -- * Transform monad , Transform , pushEvent , mergeEvents , flushEvents , failTransformer ) where import Control.Monad (forM_) import Control.Monad.Trans (lift) import Data.Functor ((<&>)) import Data.Hashable (Hashable) import qualified Data.HashMap.Strict as HM import qualified Control.Monad.Free as Free import qualified Control.Monad.State as St import qualified Pipes import qualified Pipes.Prelude as Pipes import qualified Database.CQRS.Stream as CQRS import qualified Database.CQRS.StreamFamily as CQRS type Transformer inputEvent eventId event = inputEvent -> Transform eventId event () data TransformedStream m identifier metadata event = forall stream. CQRS.Stream m stream => TransformedStream { transformer :: Transformer (Either (CQRS.EventIdentifier stream, String) (CQRS.EventWithContext' stream)) identifier (CQRS.EventWithContext identifier metadata event) , inputStream :: stream , reverseIdentifier :: identifier -> m (CQRS.EventIdentifier stream) } transformStream :: CQRS.Stream m stream => Transformer (Either (CQRS.EventIdentifier stream, String) (CQRS.EventWithContext' stream)) identifier (CQRS.EventWithContext identifier metadata event) -> (identifier -> m (CQRS.EventIdentifier stream)) -> stream -> TransformedStream m identifier metadata event transformStream transformer reverseIdentifier inputStream = TransformedStream{..} instance Monad m => CQRS.Stream m (TransformedStream m identifier metadata event) where type EventType (TransformedStream m identifier metadata event) = event type EventIdentifier (TransformedStream m identifier metadata event) = identifier type EventMetadata (TransformedStream m identifier metadata event) = metadata streamEvents = transformedStreamStreamEvents transformedStreamStreamEvents :: Monad m => TransformedStream m identifier metadata event -> CQRS.StreamBounds identifier -> Pipes.Producer [ Either (identifier, String) (CQRS.EventWithContext identifier metadata event) ] m () transformedStreamStreamEvents TransformedStream{..} bounds = do inputBounds <- lift $ traverse reverseIdentifier bounds let inputs = Pipes.hoist lift $ CQRS.streamEvents inputStream inputBounds finalEvents <- (\f -> Pipes.foldM f (pure []) pure inputs) $ \waitingEvents batch -> do let (batches, waitingEvents') = runTransform waitingEvents . mapM_ transformer $ batch Pipes.each $ batches <&> \case Left err -> [Left err] Right b -> map Right b -- If there are too many waiting events, we flush them anyway to avoid -- using too much memmory. case drop 100 waitingEvents' of [] -> pure waitingEvents' _ -> do Pipes.yield $ map Right waitingEvents' pure [] Pipes.yield $ map Right finalEvents data TransformedStreamFamily m streamId eventId metadata event = forall streamFamily. ( Hashable (CQRS.StreamIdentifier streamFamily) , Ord (CQRS.StreamIdentifier streamFamily) , CQRS.Stream m (CQRS.StreamType streamFamily) , CQRS.StreamFamily m streamFamily ) => TransformedStreamFamily { inputStreamFamily :: streamFamily , streamTransformer :: Transformer (Either (CQRS.EventIdentifier (CQRS.StreamType streamFamily), String) (CQRS.EventWithContext' (CQRS.StreamType streamFamily))) eventId (CQRS.EventWithContext eventId metadata event) , makeStreamIdentifier :: CQRS.StreamIdentifier streamFamily -> m streamId , reverseStreamIdentifier :: streamId -> m (CQRS.StreamIdentifier streamFamily) , makeEventIdentifier :: CQRS.EventIdentifier (CQRS.StreamType streamFamily) -> m eventId , reverseEventIdentifier :: eventId -> m (CQRS.EventIdentifier (CQRS.StreamType streamFamily)) } transformStreamFamily :: forall m streamId eventId metadata event streamFamily. ( Hashable (CQRS.StreamIdentifier streamFamily) , Ord (CQRS.StreamIdentifier streamFamily) , CQRS.Stream m (CQRS.StreamType streamFamily) , CQRS.StreamFamily m streamFamily ) => Transformer (Either (CQRS.EventIdentifier (CQRS.StreamType streamFamily), String) (CQRS.EventWithContext' (CQRS.StreamType streamFamily))) eventId (CQRS.EventWithContext eventId metadata event) -> (CQRS.StreamIdentifier streamFamily -> m streamId) -> (streamId -> m (CQRS.StreamIdentifier streamFamily)) -> (CQRS.EventIdentifier (CQRS.StreamType streamFamily) -> m eventId) -> (eventId -> m (CQRS.EventIdentifier (CQRS.StreamType streamFamily))) -> streamFamily -> TransformedStreamFamily m streamId eventId metadata event transformStreamFamily streamTransformer makeStreamIdentifier reverseStreamIdentifier makeEventIdentifier reverseEventIdentifier inputStreamFamily = TransformedStreamFamily{..} instance Monad m => CQRS.StreamFamily m (TransformedStreamFamily m streamId eventId metadata event) where type StreamType (TransformedStreamFamily m streamId eventId metadata event) = TransformedStream m eventId metadata event type StreamIdentifier (TransformedStreamFamily m streamId eventId metadata event) = streamId getStream = transformedStreamFamilyGetStream allNewEvents = transformedStreamFamilyAllNewEvents latestEventIdentifiers = transformedStreamFamilyLatestEventIdentifiers transformedStreamFamilyGetStream :: Monad m => TransformedStreamFamily m streamId eventId metadata event -> streamId -> m (TransformedStream m eventId metadata event) transformedStreamFamilyGetStream TransformedStreamFamily{..} streamId = do inputStreamId <- reverseStreamIdentifier streamId inputStream <- CQRS.getStream inputStreamFamily inputStreamId pure $ transformStream streamTransformer reverseEventIdentifier inputStream transformedStreamFamilyAllNewEvents :: Monad m => TransformedStreamFamily m streamId eventId metadata event -> m (Pipes.Producer [ ( streamId , Either (eventId, String) (CQRS.EventWithContext eventId metadata event) ) ] m a) transformedStreamFamilyAllNewEvents TransformedStreamFamily{..} = do inputNewEvents <- CQRS.allNewEvents inputStreamFamily pure $ Pipes.for inputNewEvents $ \batch -> do let eventsByStream = HM.toList . HM.fromListWith (++) . map (fmap pure) $ batch forM_ eventsByStream $ \(inputStreamId, events) -> do streamId <- lift $ makeStreamIdentifier inputStreamId let (batches, waitingEvents) = runTransform [] . mapM_ streamTransformer $ events Pipes.each $ (batches ++ [Right waitingEvents]) <&> \case Left err -> [(streamId, Left err)] Right b -> map ((streamId,) . Right) b transformedStreamFamilyLatestEventIdentifiers :: Monad m => TransformedStreamFamily m streamId eventId metadata event -> Pipes.Producer (streamId, eventId) m () transformedStreamFamilyLatestEventIdentifiers TransformedStreamFamily{..} = do let inputIds = CQRS.latestEventIdentifiers inputStreamFamily Pipes.for inputIds $ \(inputStreamId, inputEventId) -> do streamId <- lift $ makeStreamIdentifier inputStreamId eventId <- lift $ makeEventIdentifier inputEventId Pipes.yield (streamId, eventId) data TransformF eventId event a = PushEvent a event | MergeEvents ([event] -> (a, [event])) | FlushEvents a | Failure a eventId String deriving Functor -- | Monad in which you can push, merge and flush events. type Transform eventId event = Free.Free (TransformF eventId event) -- | Run the transformation starting with some waiting events and returning -- batches of events to flush and a new list of waiting events to be fed to the -- next call. If the transformer fails, return the error instead. -- -- All given events will be processed except in case of failure. runTransform :: [event] -> Transform eventId event () -> ([Either (eventId, String) [event]], [event]) runTransform lastWaitingEvents = flip St.execState ([], lastWaitingEvents) . Free.foldFree interpreter where interpreter :: TransformF eventId event a -> St.State ([Either (eventId, String) [event]], [event]) a interpreter = \case PushEvent x event -> do St.modify $ \(batches, waitingEvents) -> (batches, waitingEvents ++ [event]) pure x MergeEvents f -> St.state $ \(batches, waitingEvents) -> let (x, waitingEvents') = f waitingEvents st = (batches, waitingEvents') in (x, st) FlushEvents x -> do St.modify $ \(batches, waitingEvents) -> (batches ++ [Right waitingEvents], []) pure x Failure x eventId err -> do St.modify $ \(batches, waitingEvents) -> (batches ++ [Right waitingEvents, Left (eventId, err)], []) pure x -- | Push a new event at the end of the queue. pushEvent :: event -> Transform eventId event () pushEvent = Free.liftF . PushEvent () -- | Apply a function to the queue of event returning a value and a new queue, -- sets the queue to the new one and return the value. -- -- The intent is to allow a new event to be merged in a previous one if possible -- to make the new event stream more compact. mergeEvents :: ([event] -> (a, [event])) -> Transform eventId event a mergeEvents = Free.liftF . MergeEvents -- | Flush the queue so it can be processed downstream, e.g. sent to a message -- broker. -- -- Flushing may also occur automatically. flushEvents :: Transform eventId event () flushEvents = Free.liftF $ FlushEvents () -- | Flush the events and push an error downstream. failTransformer :: eventId -> String -> Transform eventId event () failTransformer eventId err = Free.liftF $ Failure () eventId err