-- Hoogle documentation, generated by Haddock
-- See Hoogle, http://www.haskell.org/hoogle/
-- | PostgreSQL adaptor for eventsourcing.
--
-- Adaptor to use PostgreSQL as a back-end with eventsourcing.
@package eventsourcing-postgresql
@version 0.9.0
module Database.CQRS.PostgreSQL.SQLQuery
-- | A wrapper around a SELECT query that instantiates ReadModel
-- so that it can be used by the application layer without said layer to
-- be aware of SQL. The implementation can then be swapped for something
-- else, e.g. for tests.
data SQLQuery req resp
SQLQuery :: (forall a. (Connection -> IO a) -> IO a) -> Query -> SQLQuery req resp
[connectionPool] :: SQLQuery req resp -> forall a. (Connection -> IO a) -> IO a
[queryTemplate] :: SQLQuery req resp -> Query
instance (Control.Monad.IO.Class.MonadIO m, Database.PostgreSQL.Simple.FromRow.FromRow resp, Database.PostgreSQL.Simple.ToRow.ToRow req) => Database.CQRS.ReadModel.ReadModel m (Database.CQRS.PostgreSQL.SQLQuery.SQLQuery req resp)
module Database.CQRS.PostgreSQL.Stream
-- | Stream of events stored in a PostgreSQL relation.
--
-- The job of sharding streams in different tables is left to the
-- database. If this is something you want to do, you can create a view
-- and a trigger on insert into that view.
data Stream identifier metadata event
-- | Make a Stream from basic information about the relation name
-- and columns.
makeStream :: forall identifier metadata event. (WritableEvent event, ToField (EncodingFormat event), ToField identifier, ToRow metadata) => (forall a. (Connection -> IO a) -> IO a) -> Query -> Query -> [Query] -> Query -> Stream identifier metadata event
-- | Make a stream from queries.
--
-- This function is less safe than makeStream and should only be
-- used when makeStream is not flexible enough. It is also closer
-- to the implementation and more subject to changes.
makeStream' :: (ToRow r, ToRow r') => (forall a. (Connection -> IO a) -> IO a) -> (Query, r) -> (forall encEvent. (ToField encEvent, ToRow metadata) => encEvent -> metadata -> ConsistencyCheck identifier -> (Query, r')) -> Query -> Stream identifier metadata event
instance (Database.CQRS.Event.Event event, Control.Monad.Error.Class.MonadError Database.CQRS.Error.Error m, Control.Monad.IO.Class.MonadIO m, GHC.Classes.Ord identifier, Database.PostgreSQL.Simple.FromField.FromField identifier, Database.PostgreSQL.Simple.ToField.ToField identifier, Database.PostgreSQL.Simple.FromRow.FromRow metadata, Database.PostgreSQL.Simple.FromField.FromField (Database.CQRS.Event.EncodingFormat event)) => Database.CQRS.Stream.Stream m (Database.CQRS.PostgreSQL.Stream.Stream identifier metadata event)
instance (Database.CQRS.Event.WritableEvent event, Control.Monad.Error.Class.MonadError Database.CQRS.Error.Error m, Control.Monad.IO.Class.MonadIO m, GHC.Classes.Ord identifier, Database.PostgreSQL.Simple.FromField.FromField identifier, Database.PostgreSQL.Simple.ToField.ToField identifier, Database.PostgreSQL.Simple.FromField.FromField (Database.CQRS.Event.EncodingFormat event), Database.PostgreSQL.Simple.ToField.ToField (Database.CQRS.Event.EncodingFormat event), Database.PostgreSQL.Simple.FromRow.FromRow metadata, Database.PostgreSQL.Simple.ToRow.ToRow metadata) => Database.CQRS.Stream.WritableStream m (Database.CQRS.PostgreSQL.Stream.Stream identifier metadata event)
module Database.CQRS.PostgreSQL.StreamFamily
-- | Family of event streams stored in a PostgreSQL relation.
--
-- Each stream should have a unique stream identifier and event
-- identifiers must be unique within a stream, but not necessarily across
-- them.
--
-- allNewEvents starts a new thread which reads notifications on
-- the given channel and writes them to a transactional bounded queue (a
-- TBQueue) which is then consumed by the returned
-- Producer. The maximum size of this queue is hard-coded to
-- 100. Should an exception be raised in the listening thread, it is
-- thrown back by the producer.
data StreamFamily streamId eventId metadata event
StreamFamily :: (forall a. (Connection -> IO a) -> IO a) -> Query -> Query -> (ByteString -> Either String (streamId, eventId)) -> Query -> Query -> [Query] -> Query -> StreamFamily streamId eventId metadata event
[connectionPool] :: StreamFamily streamId eventId metadata event -> forall a. (Connection -> IO a) -> IO a
[relation] :: StreamFamily streamId eventId metadata event -> Query
[notificationChannel] :: StreamFamily streamId eventId metadata event -> Query
[parseNotification] :: StreamFamily streamId eventId metadata event -> ByteString -> Either String (streamId, eventId)
[streamIdentifierColumn] :: StreamFamily streamId eventId metadata event -> Query
[eventIdentifierColumn] :: StreamFamily streamId eventId metadata event -> Query
[metadataColumns] :: StreamFamily streamId eventId metadata event -> [Query]
[eventColumn] :: StreamFamily streamId eventId metadata event -> Query
makeStreamFamily :: (forall a. (Connection -> IO a) -> IO a) -> Query -> Query -> (ByteString -> Either String (streamId, eventId)) -> Query -> Query -> [Query] -> Query -> StreamFamily streamId eventId metadata event
instance (Database.PostgreSQL.Simple.ToField.ToField a, Database.PostgreSQL.Simple.ToField.ToField b) => Database.PostgreSQL.Simple.ToField.ToField (Database.CQRS.PostgreSQL.StreamFamily.Pair a b)
instance (Database.CQRS.Event.Event event, Control.Monad.Error.Class.MonadError Database.CQRS.Error.Error m, Control.Monad.IO.Class.MonadIO m, Database.PostgreSQL.Simple.FromField.FromField eventId, Database.PostgreSQL.Simple.FromField.FromField streamId, Database.PostgreSQL.Simple.FromField.FromField (Database.CQRS.Event.EncodingFormat event), Database.PostgreSQL.Simple.FromRow.FromRow metadata, Database.PostgreSQL.Simple.ToField.ToField eventId, Database.PostgreSQL.Simple.ToField.ToField streamId) => Database.CQRS.StreamFamily.StreamFamily m (Database.CQRS.PostgreSQL.StreamFamily.StreamFamily streamId eventId metadata event)
module Database.CQRS.PostgreSQL.Migration
-- | Migrate a stream family stored in a PostgreSQL database to the same
-- database. It is meant to run in parallel with the application using
-- the stream family without disturbing it.
--
-- An alternative use of this is to migrate a stream family to a new
-- relation without swapping the tables at the end. The old table stays
-- in use by the application and the new one can be read by an external
-- system for instance.
--
-- If the new table already exists (and the initialisation query does not
-- fail in that case,) the migration will start over from the point it
-- left off.
migrate :: forall streamId eventId metadata event transformedStreamFamily m. (WritableEvent (EventType (StreamType transformedStreamFamily)), Stream m (StreamType transformedStreamFamily), StreamFamily m transformedStreamFamily, MonadError Error m, Hashable (StreamIdentifier transformedStreamFamily), MonadIO m, Ord (EventIdentifier (StreamType transformedStreamFamily)), Ord (StreamIdentifier transformedStreamFamily), FromField (StreamIdentifier transformedStreamFamily), FromField (EventIdentifier (StreamType transformedStreamFamily)), FromRow (EventMetadata (StreamType transformedStreamFamily)), FromField (EncodingFormat (EventType (StreamType transformedStreamFamily))), ToField (StreamIdentifier transformedStreamFamily), ToField (EventIdentifier (StreamType transformedStreamFamily)), ToRow (EventMetadata (StreamType transformedStreamFamily)), ToField (EncodingFormat (EventType (StreamType transformedStreamFamily))), Show (EventIdentifier (StreamType transformedStreamFamily))) => StreamFamily streamId eventId metadata event -> (StreamFamily streamId eventId metadata event -> transformedStreamFamily) -> Query -> Query -> Query -> [Query] -> Query -> (Query -> Query) -> (Query -> Query) -> (Query -> Query) -> m ()
module Database.CQRS.PostgreSQL.TrackingTable
data TrackingTable streamId eventId st
-- | Create tracking table if it doesn't exist already.
--
-- A tracking table is a table used to track the last events processed by
-- a projection for each stream in a stream family. It allows them to
-- restart from where they have left off.
createTrackingTable :: (MonadError Error m, MonadIO m) => (forall r. (Connection -> IO r) -> IO r) -> Query -> Query -> Query -> Query -> m (TrackingTable streamId eventId st)
-- | Return SQL query to upsert a row in a tracking table.
upsertTrackingTable :: (ToField streamId, ToField eventId, ToField st) => TrackingTable streamId eventId st -> streamId -> eventId -> Either String st -> SqlAction
-- | Update the tracking table for the given stream.
doUpsertTrackingTable :: (MonadError Error m, MonadIO m, ToField eventId, ToField streamId, ToField st) => TrackingTable streamId eventId st -> streamId -> eventId -> Either String st -> m ()
instance (Control.Monad.Error.Class.MonadError Database.CQRS.Error.Error m, Control.Monad.IO.Class.MonadIO m, Database.PostgreSQL.Simple.FromField.FromField eventId, Database.PostgreSQL.Simple.FromField.FromField st, Database.PostgreSQL.Simple.FromField.FromField streamId, Database.PostgreSQL.Simple.ToField.ToField eventId, Database.PostgreSQL.Simple.ToField.ToField st, Database.PostgreSQL.Simple.ToField.ToField streamId) => Database.CQRS.Projection.TrackingTable m (Database.CQRS.PostgreSQL.TrackingTable.TrackingTable streamId eventId st) streamId eventId st
instance Database.PostgreSQL.Simple.FromField.FromField st => Database.PostgreSQL.Simple.FromField.FromField (Database.CQRS.PostgreSQL.TrackingTable.OptionalState st)
module Database.CQRS.PostgreSQL.Projection
type Projection event st = Projection event st SqlAction
-- | Execute the SQL actions and update the tracking table in one
-- transaction.
--
-- The custom actions are transformed into a list of SQL actions by the
-- given function. See fromTabularDataActions for an example.
executeSqlActions :: forall streamId eventId action m st. (MonadError Error m, MonadIO m, ToField eventId, ToField streamId, ToField st) => ([action] -> [SqlAction]) -> (forall r. (Connection -> IO r) -> IO r) -> TrackingTable streamId eventId st -> Consumer (st, [action], streamId, eventId) m ()
-- | Execute custom actions by calling the runner function on each action
-- in turn and updating the tracking table accordingly.
executeCustomActions :: forall streamId eventId action m st. (MonadError Error m, MonadIO m, ToField eventId, ToField streamId, ToField st) => (action -> m (Either String (m ()))) -> TrackingTable streamId eventId st -> Consumer (st, [action], streamId, eventId) m ()
fromTabularDataActions :: FromTabularDataAction cols => Query -> [TabularDataAction cols] -> [SqlAction]
instance (Database.CQRS.TabularData.Internal.AllColumns Database.PostgreSQL.Simple.ToField.ToField (Database.CQRS.TabularData.Internal.Flatten ('Database.CQRS.TabularData.Internal.WithUniqueKey keyCols cols)), Database.CQRS.TabularData.Internal.AllColumns Database.PostgreSQL.Simple.ToField.ToField keyCols, Database.CQRS.TabularData.Internal.AllColumns Database.PostgreSQL.Simple.ToField.ToField cols, Database.CQRS.TabularData.Internal.MergeSplitTuple keyCols cols) => Database.CQRS.PostgreSQL.Projection.FromTabularDataAction ('Database.CQRS.TabularData.Internal.WithUniqueKey keyCols cols)
instance Database.CQRS.TabularData.Internal.AllColumns Database.PostgreSQL.Simple.ToField.ToField cols => Database.CQRS.PostgreSQL.Projection.FromTabularDataAction ('Database.CQRS.TabularData.Internal.Flat cols)
module Database.CQRS.PostgreSQL
-- | Stream of events stored in a PostgreSQL relation.
--
-- The job of sharding streams in different tables is left to the
-- database. If this is something you want to do, you can create a view
-- and a trigger on insert into that view.
data Stream identifier metadata event
-- | Make a Stream from basic information about the relation name
-- and columns.
makeStream :: forall identifier metadata event. (WritableEvent event, ToField (EncodingFormat event), ToField identifier, ToRow metadata) => (forall a. (Connection -> IO a) -> IO a) -> Query -> Query -> [Query] -> Query -> Stream identifier metadata event
-- | Make a stream from queries.
--
-- This function is less safe than makeStream and should only be
-- used when makeStream is not flexible enough. It is also closer
-- to the implementation and more subject to changes.
makeStream' :: (ToRow r, ToRow r') => (forall a. (Connection -> IO a) -> IO a) -> (Query, r) -> (forall encEvent. (ToField encEvent, ToRow metadata) => encEvent -> metadata -> ConsistencyCheck identifier -> (Query, r')) -> Query -> Stream identifier metadata event
-- | Family of event streams stored in a PostgreSQL relation.
--
-- Each stream should have a unique stream identifier and event
-- identifiers must be unique within a stream, but not necessarily across
-- them.
--
-- allNewEvents starts a new thread which reads notifications on
-- the given channel and writes them to a transactional bounded queue (a
-- TBQueue) which is then consumed by the returned
-- Producer. The maximum size of this queue is hard-coded to
-- 100. Should an exception be raised in the listening thread, it is
-- thrown back by the producer.
data StreamFamily streamId eventId metadata event
makeStreamFamily :: (forall a. (Connection -> IO a) -> IO a) -> Query -> Query -> (ByteString -> Either String (streamId, eventId)) -> Query -> Query -> [Query] -> Query -> StreamFamily streamId eventId metadata event
type Projection event st = Projection event st SqlAction
-- | Execute the SQL actions and update the tracking table in one
-- transaction.
--
-- The custom actions are transformed into a list of SQL actions by the
-- given function. See fromTabularDataActions for an example.
executeSqlActions :: forall streamId eventId action m st. (MonadError Error m, MonadIO m, ToField eventId, ToField streamId, ToField st) => ([action] -> [SqlAction]) -> (forall r. (Connection -> IO r) -> IO r) -> TrackingTable streamId eventId st -> Consumer (st, [action], streamId, eventId) m ()
-- | Execute custom actions by calling the runner function on each action
-- in turn and updating the tracking table accordingly.
executeCustomActions :: forall streamId eventId action m st. (MonadError Error m, MonadIO m, ToField eventId, ToField streamId, ToField st) => (action -> m (Either String (m ()))) -> TrackingTable streamId eventId st -> Consumer (st, [action], streamId, eventId) m ()
fromTabularDataActions :: FromTabularDataAction cols => Query -> [TabularDataAction cols] -> [SqlAction]
-- | Create tracking table if it doesn't exist already.
--
-- A tracking table is a table used to track the last events processed by
-- a projection for each stream in a stream family. It allows them to
-- restart from where they have left off.
createTrackingTable :: (MonadError Error m, MonadIO m) => (forall r. (Connection -> IO r) -> IO r) -> Query -> Query -> Query -> Query -> m (TrackingTable streamId eventId st)