-- | Postgres events with state as an IORef
module DomainDriven.Persistance.Postgres.Internal where

import Control.Monad
import Control.Monad.Catch
import Control.Monad.IO.Class
import Data.Aeson
import Data.Foldable
import Data.Generics.Product
import Data.IORef
import Data.Int
import Data.Sequence (Seq (..))
import qualified Data.Sequence as Seq
import Data.String
import Database.PostgreSQL.Simple as PG
import qualified Database.PostgreSQL.Simple.Cursor as Cursor
import DomainDriven.Persistance.Class
import DomainDriven.Persistance.Postgres.Types
import GHC.Generics (Generic)
import Lens.Micro
    ( to
    , (^.)
    )
import qualified Streamly.Data.Unfold as Unfold
import qualified Streamly.Prelude as S
import UnliftIO (MonadUnliftIO (..))
import UnliftIO.Pool
    ( LocalPool
    , Pool
    , createPool
    , destroyResource
    , putResource
    , takeResource
    , withResource
    )
import Prelude

-- | Handle to an event-sourced model whose events live in a Postgres table
-- while the current state is kept in an 'IORef'.
data PostgresEvent model event = PostgresEvent
    { connectionPool :: Pool Connection
    -- ^ Pool the database connections are taken from
    , eventTableName :: EventTableName
    -- ^ Name of the table the events are read from
    , modelIORef :: IORef (NumberedModel model)
    -- ^ Reference holding the model together with an event number
    , app :: model -> Stored event -> model
    -- ^ Folds a single stored event into the model
    , seed :: model
    -- ^ Model value the persistance is created with
    , chunkSize :: ChunkSize
    -- ^ Number of events read from postgres per batch
    }
    deriving stock (Generic)

-- | The same information as 'PostgresEvent', but bound to an already started
-- transaction instead of a connection pool.
data PostgresEventTrans model event = PostgresEventTrans
    { transaction :: OngoingTransaction
    -- ^ The transaction (and its connection) all queries are run on
    , eventTableName :: EventTableName
    -- ^ Name of the table the events are read from
    , modelIORef :: IORef (NumberedModel model)
    -- ^ Reference holding the model together with an event number
    , app :: model -> Stored event -> model
    -- ^ Folds a single stored event into the model
    , seed :: model
    -- ^ Model value the persistance was created with
    , chunkSize :: ChunkSize
    -- ^ Number of events read from postgres per batch
    }
    deriving stock (Generic)

-- | Read access to the model and its events.
--
-- 'getModel' runs inside a transaction obtained with 'withIOTrans',
-- 'getEventList' borrows a plain pooled connection and 'getEventStream'
-- streams inside a transaction from 'withStreamReadTransaction'.
instance (FromJSON e) => ReadModel (PostgresEvent m e) where
    type Model (PostgresEvent m e) = m
    type Event (PostgresEvent m e) = e

    applyEvent persistance = persistance ^. field @"app"

    getModel persistance = withIOTrans persistance getModel'

    getEventList persistance =
        withResource (connectionPool persistance) $ \conn -> do
            numbered <- queryEvents conn (persistance ^. field @"eventTableName")
            -- Callers only want the events; drop the event numbers.
            pure (map fst numbered)

    getEventStream persistance =
        withStreamReadTransaction persistance getEventStream'

-- | Name of the table backing an 'EventTable'.
--
-- The name is the initial version's base name followed by @_v@ and a version
-- number: 1 for an 'InitialVersion', plus one for every 'MigrateUsing'
-- wrapped around it.
getEventTableName :: EventTable -> EventTableName
getEventTableName = versioned 1
  where
    versioned :: Int -> EventTable -> String
    versioned version (MigrateUsing _ previous) = versioned (version + 1) previous
    versioned version (InitialVersion baseName) = baseName <> "_v" <> show version

-- | Make sure the event table exists.
--
-- First tries to read the model. If that raises an 'SqlError' the table is
-- created with 'createEventTable'' and the model is refreshed.
--
-- NOTE(review): the handler runs on the same transaction whose query just
-- failed; confirm that the connection still accepts statements at that point.
createEventTable
    :: (FromJSON e, WriteModel (PostgresEventTrans m e))
    => PostgresEventTrans m e
    -> IO ()
createEventTable pgt = void (getModel pgt) `catch` onSqlError
  where
    onSqlError :: SqlError -> IO ()
    onSqlError _ = do
        _ <- createEventTable' conn tableName
        void (refreshModel pgt)

    conn = pgt ^. field @"transaction" . to connection

    tableName = pgt ^. field @"eventTableName"

-- | Issue @create table if not exists@ for an event table with the columns
-- @id@, @event_number@, @timestamp@ and @event@.
--
-- Returns whatever 'execute_' reports for the statement.
createEventTable' :: Connection -> EventTableName -> IO Int64
createEventTable' conn eventTable = execute_ conn ddl
  where
    ddl :: PG.Query
    ddl =
        mconcat
            [ "create table if not exists \""
            , fromString eventTable
            , "\" ( id uuid primary key"
            , ", event_number bigint not null generated always as identity"
            , ", timestamp timestamptz not null default now()"
            , ", event jsonb not null"
            , ");"
            ]

-- | Block further inserts into a table.
--
-- Ensures the @retired_table()@ function exists and then installs a
-- @before insert@ trigger named @retired@ that executes it.
retireTable :: Connection -> EventTableName -> IO ()
retireTable conn tableName = do
    createRetireFunction conn
    _ <- execute_ conn triggerDdl
    pure ()
  where
    triggerDdl :: PG.Query
    triggerDdl =
        mconcat
            [ "create trigger retired before insert on \""
            , fromString tableName
            , "\" execute procedure retired_table()"
            ]

-- | Create (or replace) the plpgsql function @retired_table()@, which raises
-- the exception @Event table has been retired.@
createRetireFunction :: Connection -> IO ()
createRetireFunction conn = void (execute_ conn functionDdl)
  where
    functionDdl :: PG.Query
    functionDdl =
        "create or replace function retired_table() returns trigger as \
        \$$ begin raise exception 'Event table has been retired.'; end; $$ \
        \language plpgsql;"

-- | Like 'simplePool', but the connections are opened with 'PG.connect' from
-- the given connection info.
simplePool' :: MonadUnliftIO m => PG.ConnectInfo -> m (Pool Connection)
simplePool' = simplePool . PG.connect

-- | Build a connection pool from an action that opens a connection.
--
-- Connections are closed with 'PG.close'. The numeric settings handed to
-- 'createPool' are fixed: 1, 5 and 5 (stripes, idle time and resources per
-- stripe in 'createPool''s argument order).
simplePool :: MonadUnliftIO m => IO Connection -> m (Pool Connection)
simplePool getConn = createPool open close stripes idleTime perStripe
  where
    open = liftIO getConn
    close = liftIO . PG.close
    stripes = 1
    idleTime = 5
    perStripe = 5

-- | Set up the persistance for a fixed table name, without running any
-- migrations.
--
-- After creating the handle, 'createEventTable' is run in a transaction so
-- that the event table is available.
postgresWriteModelNoMigration
    :: (FromJSON e, WriteModel (PostgresEventTrans m e))
    => Pool Connection
    -> EventTableName
    -> (m -> Stored e -> m)
    -> m
    -> IO (PostgresEvent m e)
postgresWriteModelNoMigration pool eventTable app' seed' =
    createPostgresPersistance pool eventTable app' seed' >>= \pg ->
        pg <$ withIOTrans pg createEventTable

-- | Set up the persistance for a versioned 'EventTable'.
--
-- The handle is created for the table name derived with 'getEventTableName',
-- after which 'runMigrations' is run in a transaction.
postgresWriteModel
    :: Pool Connection
    -> EventTable
    -> (m -> Stored e -> m)
    -> m
    -> IO (PostgresEvent m e)
postgresWriteModel pool eventTable app' seed' = do
    pg <- createPostgresPersistance pool tableName app' seed'
    pg <$ withIOTrans pg (\pgt -> runMigrations (pgt ^. field @"transaction") eventTable)
  where
    tableName :: EventTableName
    tableName = getEventTableName eventTable

-- | A single boolean column, readable as a result row through its 'FromRow'
-- instance.
newtype Exists = Exists
    { exists :: Bool
    }
    deriving stock (Show, Eq, Generic)
    deriving anyclass (FromRow)

-- | Bring the database up to the given 'EventTable' version.
--
-- If the table for this version already exists nothing is done. Otherwise:
--
-- * an 'InitialVersion' just gets its table created;
-- * a 'MigrateUsing' first migrates the previous version recursively, creates
--   the new table, runs the migration from the previous table to the new one
--   and finally retires the previous table.
--
-- Fails (via 'fail') when the existence query returns anything other than a
-- single boolean row.
runMigrations :: OngoingTransaction -> EventTable -> IO ()
runMigrations trans et = do
    existsRows <- query conn tableExistsSql (Only currentName)
    case (et, existsRows) of
        (InitialVersion _, [Only True]) -> pure ()
        (MigrateUsing{}, [Only True]) -> pure ()
        (InitialVersion _, [Only False]) -> createCurrentTable
        (MigrateUsing migrate prevEt, [Only False]) -> do
            let previousName = getEventTableName prevEt
            -- The previous version has to exist before events can be moved.
            runMigrations trans prevEt
            createCurrentTable
            migrate previousName currentName conn
            retireTable conn previousName
        (_, unexpected) ->
            fail $ "Unexpected table query result: " <> show unexpected
  where
    conn :: Connection
    conn = connection trans

    currentName :: EventTableName
    currentName = getEventTableName et

    tableExistsSql :: PG.Query
    tableExistsSql =
        "select exists (select * from information_schema.tables where table_schema='public' and table_name=?)"

    createCurrentTable :: IO ()
    createCurrentTable = void (createEventTable' conn currentName)

-- | Build a 'PostgresEvent' handle without touching the database.
--
-- The model reference starts out as the seed paired with event number 0 and
-- the chunk size is set to 50.
createPostgresPersistance
    :: forall event model
     . Pool Connection
    -> EventTableName
    -> (model -> Stored event -> model)
    -> model
    -> IO (PostgresEvent model event)
createPostgresPersistance pool eventTable app' seed' =
    mkPersistance <$> newIORef (NumberedModel seed' 0)
  where
    mkPersistance :: IORef (NumberedModel model) -> PostgresEvent model event
    mkPersistance ref =
        PostgresEvent
            { connectionPool = pool
            , eventTableName = eventTable
            , modelIORef = ref
            , app = app'
            , seed = seed'
            , chunkSize = 50
            }

-- | Fetch every event of the table, ordered by event number, and decode each
-- row with 'fromEventRow'.
queryEvents
    :: (FromJSON a)
    => Connection
    -> EventTableName
    -> IO [(Stored a, EventNumber)]
queryEvents conn eventTable = do
    rows <- query_ conn (getPgQuery (mkEventQuery eventTable))
    traverse fromEventRow rows

-- | Fetch the events whose event number is greater than the given one,
-- ordered by event number, and decode each row with 'fromEventRow'.
queryEventsAfter
    :: (FromJSON a)
    => Connection
    -> EventTableName
    -> EventNumber
    -> IO [(Stored a, EventNumber)]
queryEventsAfter conn eventTable lastEventNumber = do
    rows <- query_ conn (getPgQuery (mkEventsAfterQuery eventTable lastEventNumber))
    traverse fromEventRow rows

-- | Wrapper around a 'PG.Query'; built by 'mkEventQuery' and
-- 'mkEventsAfterQuery' to select event rows.
newtype EventQuery = EventQuery
    { getPgQuery :: PG.Query
    }
    deriving stock (Show, Generic)

-- | Query selecting @id@, @event_number@, @timestamp@ and @event@ of all rows
-- with an event number greater than the given one, ordered by event number.
mkEventsAfterQuery :: EventTableName -> EventNumber -> EventQuery
mkEventsAfterQuery eventTable (EventNumber lastEvent) =
    EventQuery
        ( mconcat
            [ "select id, event_number,timestamp,event from \""
            , fromString eventTable
            , "\" where event_number > "
            , fromString (show lastEvent)
            , " order by event_number"
            ]
        )

-- | Query selecting @id@, @event_number@, @timestamp@ and @event@ of every
-- row in the table, ordered by event number.
mkEventQuery :: EventTableName -> EventQuery
mkEventQuery eventTable =
    EventQuery
        ( mconcat
            [ "select id, event_number,timestamp,event from \""
            , fromString eventTable
            , "\" order by event_number"
            ]
        )

-- | Total variant of @head@: the first element, or 'Nothing' for an empty
-- list.
headMay :: [a] -> Maybe a
headMay = foldr (\firstElem _ -> Just firstElem) Nothing

-- | Ask the database whether the table has any event with an event number
-- greater than the given one.
--
-- Should the query return no row at all, the answer defaults to 'True'.
queryHasEventsAfter :: Connection -> EventTableName -> EventNumber -> IO Bool
queryHasEventsAfter conn eventTable (EventNumber lastEvent) = do
    rows <- query_ conn countQuery
    pure $ case rows of
        Only hasNewer : _ -> hasNewer
        [] -> True
  where
    countQuery :: PG.Query
    countQuery =
        mconcat
            [ "select count(*) > 0 from \""
            , fromString eventTable
            , "\" where event_number > "
            , fromString (show lastEvent)
            ]

-- | Insert the given events and report the highest event number in the table.
--
-- Each event is written as its UUID, its timestamp and its JSON encoding; the
-- event number is assigned by the database.
--
-- The returned number is @max(event_number)@ of the table after the insert.
-- For a table that is still empty (nothing stored before and an empty list of
-- events) the result is 0. The identity column hands out 1 to the first real
-- event, so reporting 1 here would make a later @event_number > 1@ query skip
-- that first event.
writeEvents
    :: forall a
     . (ToJSON a)
    => Connection
    -> EventTableName
    -> [Stored a]
    -> IO EventNumber
writeEvents conn eventTable storedEvents = do
    _ <- executeMany conn insertQuery (eventRow <$> storedEvents)
    foldl' max 0 . fmap fromOnly <$> query_ conn lastNumberQuery
  where
    quotedTable :: PG.Query
    quotedTable = "\"" <> fromString eventTable <> "\""

    insertQuery :: PG.Query
    insertQuery =
        "insert into " <> quotedTable <> " (id, timestamp, event) values (?, ?, ?)"

    -- Coalesce to 0 (not 1): no event has been numbered in an empty table.
    lastNumberQuery :: PG.Query
    lastNumberQuery =
        "select coalesce(max(event_number),0) from " <> quotedTable

    eventRow x = (storedUUID x, storedTimestamp x, encode $ storedEvent x)

-- | Stream all events of the table on the transaction's connection.
--
-- The stream is produced by 'mkEventStream' from the configured chunk size
-- and the query built by 'mkEventQuery'; the event numbers are dropped.
getEventStream'
    :: FromJSON event => PostgresEventTrans model event -> S.SerialT IO (Stored event)
getEventStream' pgt =
    S.map fst (mkEventStream batchSize conn allEvents)
  where
    batchSize = pgt ^. field @"chunkSize"

    conn = pgt ^. field @"transaction" . field @"connection"

    allEvents = mkEventQuery (pgt ^. field @"eventTableName")

-- | A transaction that is always rolled back at the end.
-- This is useful when using cursors as they can only be used inside a transaction.
--
-- A connection is taken from the pool and a transaction is begun on it before
-- the stream starts. When the stream is done the transaction is rolled back
-- and the connection is put back. If rolling back or returning the
-- connection throws, the connection is destroyed instead.
--
-- Should beginning the transaction fail, the connection is destroyed right
-- away so that it is not lost from the pool, and the exception is rethrown.
withStreamReadTransaction
    :: forall t m a model event
     . (S.IsStream t, S.MonadAsync m, MonadCatch m)
    => PostgresEvent model event
    -> (PostgresEventTrans model event -> t m a)
    -> t m a
withStreamReadTransaction pg = S.bracket startTrans rollbackTrans
  where
    pool :: Pool Connection
    pool = connectionPool pg

    startTrans :: m (PostgresEventTrans model event)
    startTrans = liftIO $ do
        (conn, localPool) <- takeResource pool
        -- 'rollbackTrans' only sees connections for which the acquire step
        -- succeeded, so a failing BEGIN has to clean up after itself here.
        PG.begin conn `onException` destroyResource pool localPool conn
        pure $
            PostgresEventTrans
                { transaction = OngoingTransaction conn localPool
                , eventTableName = pg ^. field @"eventTableName"
                , modelIORef = pg ^. field @"modelIORef"
                , app = pg ^. field @"app"
                , seed = pg ^. field @"seed"
                , chunkSize = pg ^. field @"chunkSize"
                }

    rollbackTrans :: PostgresEventTrans model event -> m ()
    rollbackTrans pgt = liftIO $ do
        -- Nothing changes. We just need the transaction to be able to stream events.
        let OngoingTransaction conn localPool = pgt ^. field' @"transaction"

            giveBackConn :: IO ()
            giveBackConn = do
                PG.rollback conn
                putResource localPool conn
        -- Best effort: a connection that cannot be rolled back or returned is
        -- discarded rather than handed back to the pool.
        giveBackConn `catchAll` \_ ->
            destroyResource pool localPool conn

-- | Run an action inside a database transaction on a pooled connection.
--
-- A connection is taken from the pool of the given 'PostgresEvent' and a
-- transaction is started on it before the callback runs. If the callback
-- returns normally the transaction is committed; if it throws, the
-- transaction is rolled back. Afterwards the connection is handed back to the
-- pool, or destroyed when finishing the transaction failed.
--
-- A failure to commit is rethrown to the caller (after the connection has
-- been destroyed) so that a transaction that was never committed is not
-- reported as successful. A failure to roll back is deliberately swallowed:
-- in that case the callback's own exception is already propagating.
withIOTrans
    :: forall a model event
     . PostgresEvent model event
    -> (PostgresEventTrans model event -> IO a)
    -> IO a
withIOTrans pg f = do
    transactionCompleted <- newIORef False
    bracket acquire (cleanup transactionCompleted) $ \pgt -> do
        a <- f pgt
        -- Only reached when the callback did not throw; tells 'cleanup' to commit.
        writeIORef transactionCompleted True
        pure a
  where
    pool :: Pool Connection
    pool = connectionPool pg

    -- Take a connection and open the transaction. This is the acquire step of
    -- 'bracket', so the connection cannot be lost between leaving the pool and
    -- being registered for cleanup. If starting the transaction fails the
    -- connection is destroyed instead of leaked.
    acquire :: IO (PostgresEventTrans model event)
    acquire = do
        (conn, localPool) <- takeResource pool
        prepareTransaction conn localPool
            `onException` destroyResource pool localPool conn

    -- Commit or roll back depending on the completion flag, then return the
    -- connection to the pool. Any failure destroys the connection.
    cleanup :: IORef Bool -> PostgresEventTrans model event -> IO ()
    cleanup completedRef pgt = do
        let OngoingTransaction conn localPool = pgt ^. field' @"transaction"
        completed <- readIORef completedRef
        let finish = if completed then PG.commit conn else PG.rollback conn
        (finish *> putResource localPool conn) `catchAll` \err -> do
            destroyResource pool localPool conn
            -- The callback succeeded but its effects could not be finalised:
            -- surface that instead of pretending the transaction went through.
            when completed (throwM err)

    -- Begin a transaction on the connection and build the transaction-scoped
    -- handle, copying the remaining settings over from the 'PostgresEvent'.
    prepareTransaction :: Connection -> LocalPool Connection -> IO (PostgresEventTrans model event)
    prepareTransaction conn localPool = do
        PG.begin conn
        pure $
            PostgresEventTrans
                { transaction = OngoingTransaction conn localPool
                , eventTableName = pg ^. field @"eventTableName"
                , modelIORef = pg ^. field @"modelIORef"
                , app = pg ^. field @"app"
                , seed = pg ^. field @"seed"
                , chunkSize = pg ^. field @"chunkSize"
                }

-- | Stream the stored events selected by a query together with their event
-- numbers.
--
-- A cursor is declared for the query on the given connection. Rows are then
-- pulled from that cursor in batches, requesting @chunkSize@ rows per fetch,
-- and every row is decoded with 'fromEventRow'.
mkEventStream
    :: FromJSON event
    => ChunkSize
    -> Connection
    -> EventQuery
    -> S.SerialT IO (Stored event, EventNumber)
mkEventStream chunkSize conn q = do
    cursor <- liftIO (Cursor.declareCursor conn (getPgQuery q))
    S.mapM fromEventRow
        . S.unfoldMany Unfold.fromList
        . fmap toList
        $ S.unfoldrM fetchChunk cursor
  where
    -- Fetch the next batch of rows. The stream ends once a fetch comes back
    -- as 'Left' with no rows at all; any other result is emitted as a chunk.
    fetchChunk :: Cursor.Cursor -> IO (Maybe (Seq EventRowOut, Cursor.Cursor))
    fetchChunk cursor = do
        fetched <-
            Cursor.foldForward
                cursor
                chunkSize
                (\acc row -> pure (acc :|> row))
                Seq.Empty
        pure $ case fetched of
            Left rows | Seq.null rows -> Nothing
            _ -> Just (either id id fetched, cursor)

-- | Get the current model within an ongoing transaction.
--
-- The cached model is read from the 'IORef'. If the event table holds events
-- after the cached event number the model is rebuilt with 'refreshModel',
-- otherwise the cached model is returned as is.
getModel' :: forall e m. (FromJSON e) => PostgresEventTrans m e -> IO m
getModel' pgt = do
    NumberedModel cached cachedEventNo <- readIORef (pgt ^. field @"modelIORef")
    let conn = pgt ^. field @"transaction" . to connection
        tableName = pgt ^. field @"eventTableName"
    stale <- queryHasEventsAfter conn tableName cachedEventNo
    if stale
        then fst <$> refreshModel pgt
        else pure cached

-- | Bring the cached model up to date with the event table.
--
-- Takes the exclusive table lock, reads the cached model and its event
-- number, applies every event stored after that number and writes the result
-- back to the 'IORef'. Returns the updated model together with the number of
-- the last event applied (the previously cached number if there were none).
refreshModel
    :: forall m e
     . (FromJSON e)
    => PostgresEventTrans m e
    -> IO (m, EventNumber)
refreshModel pg = do
    -- refresh doesn't write any events but changes the state and thus needs a lock
    exclusiveLock (pg ^. field @"transaction") tableName
    cached@(NumberedModel _ lastEventNo) <- readIORef modelRef
    refreshed@(NumberedModel newModel lastNewEventNo) <-
        S.foldl' applyEvent cached (eventsAfter lastEventNo)
    writeIORef modelRef refreshed
    pure (newModel, lastNewEventNo)
  where
    tableName :: EventTableName
    tableName = pg ^. field @"eventTableName"

    modelRef :: IORef (NumberedModel m)
    modelRef = pg ^. field @"modelIORef"

    -- Events stored after the given event number, read on the transaction's
    -- own connection.
    eventsAfter :: EventNumber -> S.SerialT IO (Stored e, EventNumber)
    eventsAfter eventNo =
        mkEventStream
            (pg ^. field @"chunkSize")
            (pg ^. field @"transaction" . field @"connection")
            (mkEventsAfterQuery tableName eventNo)

    -- Apply one event to the model and remember that event's number.
    applyEvent :: NumberedModel m -> (Stored e, EventNumber) -> NumberedModel m
    applyEvent (NumberedModel current _) (ev, evNumber) =
        NumberedModel ((pg ^. field @"app") current ev) evNumber

-- | Lock the event table in exclusive mode on the transaction's connection.
--
-- The table name is spliced into the statement between double quotes without
-- any escaping.
exclusiveLock :: OngoingTransaction -> EventTableName -> IO ()
exclusiveLock (OngoingTransaction conn _) etName = void (execute_ conn lockStatement)
  where
    lockStatement :: Query
    lockStatement = fromString ("lock \"" <> etName <> "\" in exclusive mode")

-- | Commands run inside a single database transaction while holding an
-- exclusive lock on the event table.
--
-- The command is handed the current model and yields the events to store plus
-- a function producing the result. The events are stamped with 'toStored',
-- applied to the cached model, written to the event table, and the cache is
-- updated with the new model and the number returned by 'writeEvents'.
instance (ToJSON e, FromJSON e) => WriteModel (PostgresEvent m e) where
    transactionalUpdate pg cmd = withRunInIO $ \runInIO ->
        withIOTrans pg $ \pgt -> do
            let eventTable = pg ^. field @"eventTableName"
                modelRef = pg ^. field @"modelIORef"
                conn = pgt ^. field @"transaction" . to connection
            exclusiveLock (pgt ^. field @"transaction") eventTable
            current <- getModel' pgt
            (returnFun, evs) <- runInIO (cmd current)
            -- The new events are folded onto whatever the cache holds at this
            -- point, not onto the value handed to the command.
            NumberedModel base _ <- readIORef modelRef
            storedEvs <- traverse toStored evs
            let updated = foldl' (pg ^. field @"app") base storedEvs
            lastEventNo <- writeEvents conn eventTable storedEvs
            -- NOTE(review): the cache is updated here, inside the 'withIOTrans'
            -- callback, i.e. before the transaction has been committed.
            -- Confirm this is acceptable if the commit can fail.
            writeIORef modelRef (NumberedModel updated lastEventNo)
            pure (returnFun updated)