-- Hoogle documentation, generated by Haddock -- See Hoogle, http://www.haskell.org/hoogle/ -- | PostgreSQL logical streaming replication library -- -- Please see the README on GitHub at -- https://github.com/agentultra/postgresql-replicant#readme @package postgresql-replicant @version 0.2.0.1 module Database.PostgreSQL.Replicant.Exception newtype ReplicantException ReplicantException :: String -> ReplicantException instance GHC.Show.Show Database.PostgreSQL.Replicant.Exception.ReplicantException instance GHC.Exception.Type.Exception Database.PostgreSQL.Replicant.Exception.ReplicantException module Database.PostgreSQL.Replicant.PostgresUtils postgresEpoch :: IO Int64 mkUTCTime :: (Integer, Int, Int) -> (Int, Int, Pico) -> UTCTime module Database.PostgreSQL.Replicant.Serialize -- | Consume the rest of the Get a input as a ByteString consumeByteStringToEnd :: Get ByteString module Database.PostgreSQL.Replicant.Settings data PgSettings PgSettings :: String -> Maybe String -> String -> String -> String -> String -> String -> PgSettings [pgUser] :: PgSettings -> String [pgPassword] :: PgSettings -> Maybe String [pgDbName] :: PgSettings -> String [pgHost] :: PgSettings -> String [pgPort] :: PgSettings -> String [pgSlotName] :: PgSettings -> String -- | Controls how frequently the primaryKeepAlive thread updates -- PostgresSQL in ms [pgUpdateDelay] :: PgSettings -> String pgConnectionString :: PgSettings -> ByteString instance GHC.Show.Show Database.PostgreSQL.Replicant.Settings.PgSettings instance GHC.Classes.Eq Database.PostgreSQL.Replicant.Settings.PgSettings -- | Log Sequence Number or LSN is a pointer to a place inside of a -- WAL log file. It contains the file name and an offset in bytes encoded -- in two parts. -- -- LSNs can be serialized into 64-bit big-endian numbers in the binary -- protocol but are also represented textually in query results and other -- places. -- -- This module follows a similar convention to many containers libraries -- and should probably be imported qualified to avoid name clashes if -- needed. -- -- See: https://www.postgresql.org/docs/10/datatype-pg-lsn.html module Database.PostgreSQL.Replicant.Types.Lsn data LSN LSN :: !Int32 -> !Int32 -> LSN -- | Filepart [filepart] :: LSN -> !Int32 -- | Offset [offset] :: LSN -> !Int32 -- | Convert an LSN to a 64-bit integer toInt64 :: LSN -> Int64 -- | Convert a 64-bit integer to an LSN fromInt64 :: Int64 -> LSN lsnParser :: Parser LSN fromByteString :: ByteString -> Either String LSN -- | Note that as of bytestring ~0.10.12.0 we don't have upper-case hex -- encoders but the patch to add them has been merged and when available -- we should switch to them toByteString :: LSN -> ByteString -- | Add a number of bytes to an LSN add :: LSN -> Int64 -> LSN -- | Subtract a number of bytes from an LSN sub :: LSN -> Int64 -> LSN -- | Subtract two LSN's to calculate the difference of bytes between them. subLsn :: LSN -> LSN -> Int64 instance GHC.Classes.Eq Database.PostgreSQL.Replicant.Types.Lsn.LSN instance GHC.Show.Show Database.PostgreSQL.Replicant.Types.Lsn.LSN instance GHC.Classes.Ord Database.PostgreSQL.Replicant.Types.Lsn.LSN instance Data.Serialize.Serialize Database.PostgreSQL.Replicant.Types.Lsn.LSN instance Data.Aeson.Types.ToJSON.ToJSON Database.PostgreSQL.Replicant.Types.Lsn.LSN instance Data.Aeson.Types.FromJSON.FromJSON Database.PostgreSQL.Replicant.Types.Lsn.LSN -- | This module has the types and functions for maintaining the client -- stream state. -- -- After initiating a replication stream the wal_sender process on the -- server may require clients to periodically send progress updates. The -- wal_sender process uses those updates to maintain its internal view of -- the clients' state. -- -- This enables the server to report on things like replication lag and -- enables the client to disconnect and restart the stream where it left -- off. module Database.PostgreSQL.Replicant.State data WalProgress WalProgress :: LSN -> LSN -> LSN -> WalProgress [walProgressReceived] :: WalProgress -> LSN [walProgressFlushed] :: WalProgress -> LSN [walProgressApplied] :: WalProgress -> LSN newtype WalProgressState WalProgressState :: MVar WalProgress -> WalProgressState updateWalProgress :: WalProgressState -> LSN -> IO () instance GHC.Show.Show Database.PostgreSQL.Replicant.State.WalProgress instance GHC.Classes.Eq Database.PostgreSQL.Replicant.State.WalProgress -- | This module contains the binary protocols messages used in the -- streaming replication protocol as well as the messages used in the -- body of the logical stream messages. module Database.PostgreSQL.Replicant.Message -- | Indicates whether the server or the client should respond to the -- message. data ResponseExpectation ShouldRespond :: ResponseExpectation DoNotRespond :: ResponseExpectation -- | The Postgres WAL sender thread may periodically send these messages. -- When the server expects the client to respond it updates its internal -- state of the client based on the response. Failure to respond results -- in the dreaded "WAL timeout" error. -- -- See StandbyStatusUpdate for the message the client should respond -- with. data PrimaryKeepAlive PrimaryKeepAlive :: !Int64 -> !Int64 -> !ResponseExpectation -> PrimaryKeepAlive [primaryKeepAliveWalEnd] :: PrimaryKeepAlive -> !Int64 [primaryKeepAliveSendTime] :: PrimaryKeepAlive -> !Int64 [primaryKeepAliveResponseExpectation] :: PrimaryKeepAlive -> !ResponseExpectation -- | Sent by the client. Can be sent periodically or in response to a -- PrimaryKeepAlive message from the server. Gives the server information -- about the client stream state. data StandbyStatusUpdate StandbyStatusUpdate :: !LSN -> !LSN -> !LSN -> !Int64 -> !ResponseExpectation -> StandbyStatusUpdate [standbyStatuUpdateLastWalByteReceived] :: StandbyStatusUpdate -> !LSN [standbyStatusUpdateLastWalByteFlushed] :: StandbyStatusUpdate -> !LSN [standbyStatusUpdateLastWalByteApplied] :: StandbyStatusUpdate -> !LSN [standbyStatusUpdateSendTime] :: StandbyStatusUpdate -> !Int64 [standbyStatusUpdateResponseExpectation] :: StandbyStatusUpdate -> !ResponseExpectation -- | Carries WAL segments in the streaming protocol. data XLogData XLogData :: !LSN -> !LSN -> !Int64 -> !ByteString -> XLogData [xLogDataWalStart] :: XLogData -> !LSN [xLogDataWalEnd] :: XLogData -> !LSN [xLogDataSendTime] :: XLogData -> !Int64 [xLogDataWalData] :: XLogData -> !ByteString -- | Not used yet but enables streaming in hot-standby mode. data HotStandbyFeedback HotStandbyFeedback :: !Int64 -> !Int32 -> !Int32 -> HotStandbyFeedback [hotStandbyFeedbackClientSendTime] :: HotStandbyFeedback -> !Int64 [hotStandbyFeedbackCurrentXmin] :: HotStandbyFeedback -> !Int32 [hotStandbyFeedbackCurrentEpoch] :: HotStandbyFeedback -> !Int32 -- | This structure wraps the two messages sent by the server so that we -- get a Serialize instance for both. data WalCopyData XLogDataM :: !XLogData -> WalCopyData KeepAliveM :: !PrimaryKeepAlive -> WalCopyData -- | Wraps Postgres values. Since this library currently supports only the -- wal2json logical decoder plugin we have the JSON values much -- like Aeson does. data WalValue WalString :: !Text -> WalValue WalNumber :: !Scientific -> WalValue WalBool :: !Bool -> WalValue WalNull :: WalValue -- | Represents a single table column. We only support the -- wal2json logical decoder plugin and make no attempt to parse -- anything but JSON-like primitives. data Column Column :: !Text -> !Text -> !WalValue -> Column [columnName] :: Column -> !Text [columnType] :: Column -> !Text [columnValue] :: Column -> !WalValue data ColumnParseError ColumnLengthMatchError :: ColumnParseError -- | Some WAL output plugins encode column values in three, equal length, -- heterogeneous lists. columns :: [Text] -> [Text] -> [WalValue] -> Either ColumnParseError [Column] fromColumn :: Column -> (Text, Text, WalValue) fromColumns :: [Column] -> ([Text], [Text], [WalValue]) -- | Represents a single insert query in the logical replication format. data Insert Insert :: !String -> !String -> ![Column] -> Insert [insertSchema] :: Insert -> !String [insertTable] :: Insert -> !String [insertColumns] :: Insert -> ![Column] -- | Represents a single update query in the logical replication format. data Update Update :: !Text -> !Text -> ![Column] -> Update [updateSchema] :: Update -> !Text [updateTable] :: Update -> !Text [updateColumns] :: Update -> ![Column] -- | Represents a single delete query in the logical replication format data Delete Delete :: !Text -> !Text -> ![Column] -> Delete [deleteSchema] :: Delete -> !Text [deleteTable] :: Delete -> !Text [deleteColumns] :: Delete -> ![Column] -- | Occasionally the server may also send these for informational purposes -- and can be ignored. May be used internally. data Message Message :: !Bool -> !Text -> !Text -> Message [messageTransactional] :: Message -> !Bool [messagePrefix] :: Message -> !Text [messageContent] :: Message -> !Text data WalLogData WInsert :: !Insert -> WalLogData WUpdate :: !Update -> WalLogData WDelete :: !Delete -> WalLogData WMessage :: !Message -> WalLogData data Change Change :: LSN -> [WalLogData] -> Change -- | Return this LSN in your callback to update the stream state in -- replicant [changeNextLSN] :: Change -> LSN -- | The list of WAL log changes in this transaction. [changeDeltas] :: Change -> [WalLogData] instance GHC.Show.Show Database.PostgreSQL.Replicant.Message.ResponseExpectation instance GHC.Generics.Generic Database.PostgreSQL.Replicant.Message.ResponseExpectation instance GHC.Classes.Eq Database.PostgreSQL.Replicant.Message.ResponseExpectation instance GHC.Show.Show Database.PostgreSQL.Replicant.Message.PrimaryKeepAlive instance GHC.Generics.Generic Database.PostgreSQL.Replicant.Message.PrimaryKeepAlive instance GHC.Classes.Eq Database.PostgreSQL.Replicant.Message.PrimaryKeepAlive instance GHC.Show.Show Database.PostgreSQL.Replicant.Message.StandbyStatusUpdate instance GHC.Generics.Generic Database.PostgreSQL.Replicant.Message.StandbyStatusUpdate instance GHC.Classes.Eq Database.PostgreSQL.Replicant.Message.StandbyStatusUpdate instance GHC.Show.Show Database.PostgreSQL.Replicant.Message.XLogData instance GHC.Generics.Generic Database.PostgreSQL.Replicant.Message.XLogData instance GHC.Classes.Eq Database.PostgreSQL.Replicant.Message.XLogData instance GHC.Show.Show Database.PostgreSQL.Replicant.Message.HotStandbyFeedback instance GHC.Generics.Generic Database.PostgreSQL.Replicant.Message.HotStandbyFeedback instance GHC.Classes.Eq Database.PostgreSQL.Replicant.Message.HotStandbyFeedback instance GHC.Show.Show Database.PostgreSQL.Replicant.Message.WalCopyData instance GHC.Generics.Generic Database.PostgreSQL.Replicant.Message.WalCopyData instance GHC.Classes.Eq Database.PostgreSQL.Replicant.Message.WalCopyData instance GHC.Show.Show Database.PostgreSQL.Replicant.Message.WalValue instance GHC.Generics.Generic Database.PostgreSQL.Replicant.Message.WalValue instance GHC.Classes.Eq Database.PostgreSQL.Replicant.Message.WalValue instance GHC.Show.Show Database.PostgreSQL.Replicant.Message.Column instance GHC.Classes.Eq Database.PostgreSQL.Replicant.Message.Column instance GHC.Show.Show Database.PostgreSQL.Replicant.Message.ColumnParseError instance GHC.Classes.Eq Database.PostgreSQL.Replicant.Message.ColumnParseError instance GHC.Show.Show Database.PostgreSQL.Replicant.Message.Insert instance GHC.Classes.Eq Database.PostgreSQL.Replicant.Message.Insert instance GHC.Show.Show Database.PostgreSQL.Replicant.Message.Update instance GHC.Classes.Eq Database.PostgreSQL.Replicant.Message.Update instance GHC.Show.Show Database.PostgreSQL.Replicant.Message.Delete instance GHC.Classes.Eq Database.PostgreSQL.Replicant.Message.Delete instance GHC.Show.Show Database.PostgreSQL.Replicant.Message.Message instance GHC.Classes.Eq Database.PostgreSQL.Replicant.Message.Message instance GHC.Show.Show Database.PostgreSQL.Replicant.Message.WalLogData instance GHC.Generics.Generic Database.PostgreSQL.Replicant.Message.WalLogData instance GHC.Classes.Eq Database.PostgreSQL.Replicant.Message.WalLogData instance GHC.Show.Show Database.PostgreSQL.Replicant.Message.Change instance GHC.Generics.Generic Database.PostgreSQL.Replicant.Message.Change instance GHC.Classes.Eq Database.PostgreSQL.Replicant.Message.Change instance Data.Aeson.Types.ToJSON.ToJSON Database.PostgreSQL.Replicant.Message.Change instance Data.Aeson.Types.FromJSON.FromJSON Database.PostgreSQL.Replicant.Message.Change instance Data.Aeson.Types.ToJSON.ToJSON Database.PostgreSQL.Replicant.Message.WalLogData instance Data.Aeson.Types.FromJSON.FromJSON Database.PostgreSQL.Replicant.Message.WalLogData instance Data.Aeson.Types.FromJSON.FromJSON Database.PostgreSQL.Replicant.Message.Message instance Data.Aeson.Types.ToJSON.ToJSON Database.PostgreSQL.Replicant.Message.Message instance Data.Aeson.Types.FromJSON.FromJSON Database.PostgreSQL.Replicant.Message.Delete instance Data.Aeson.Types.ToJSON.ToJSON Database.PostgreSQL.Replicant.Message.Delete instance Data.Aeson.Types.FromJSON.FromJSON Database.PostgreSQL.Replicant.Message.Update instance Data.Aeson.Types.ToJSON.ToJSON Database.PostgreSQL.Replicant.Message.Update instance Data.Aeson.Types.FromJSON.FromJSON Database.PostgreSQL.Replicant.Message.Insert instance Data.Aeson.Types.ToJSON.ToJSON Database.PostgreSQL.Replicant.Message.Insert instance Data.Aeson.Types.FromJSON.FromJSON Database.PostgreSQL.Replicant.Message.WalValue instance Data.Aeson.Types.ToJSON.ToJSON Database.PostgreSQL.Replicant.Message.WalValue instance Data.Serialize.Serialize Database.PostgreSQL.Replicant.Message.WalCopyData instance Data.Serialize.Serialize Database.PostgreSQL.Replicant.Message.HotStandbyFeedback instance Data.Serialize.Serialize Database.PostgreSQL.Replicant.Message.XLogData instance Data.Serialize.Serialize Database.PostgreSQL.Replicant.Message.StandbyStatusUpdate instance Data.Serialize.Serialize Database.PostgreSQL.Replicant.Message.PrimaryKeepAlive instance Data.Serialize.Serialize Database.PostgreSQL.Replicant.Message.ResponseExpectation module Database.PostgreSQL.Replicant.Util mkInt64 :: Int -> Int64 maybeThrow :: Exception e => e -> Maybe a -> IO a module Database.PostgreSQL.Replicant.Connection data ReplicantConnection -- | Connect to the PostgreSQL server in replication mode connect :: PgSettings -> IO ReplicantConnection getConnection :: ReplicantConnection -> Connection -- | Unsafe function for wrapping regular libpq Connection. This is unsafe -- because the Connection needs to be set up to send replication -- commands. Improperly constructed connections can lead to runtime -- exceptions. unsafeCreateConnection :: Connection -> ReplicantConnection instance GHC.Classes.Eq Database.PostgreSQL.Replicant.Connection.ReplicantConnection instance GHC.Show.Show Database.PostgreSQL.Replicant.Connection.ConnectResult instance GHC.Classes.Eq Database.PostgreSQL.Replicant.Connection.ConnectResult -- | This module contains the PostgreSQL queries, types, and functions for -- working with querying, creating, and working with replication slots. module Database.PostgreSQL.Replicant.ReplicationSlot data ReplicationSlotInfo ReplicationSlotInfo :: ByteString -> ByteString -> ReplicationSlotType -> ReplicationSlotActive -> LSN -> ReplicationSlotInfo [slotName] :: ReplicationSlotInfo -> ByteString [slotPlugin] :: ReplicationSlotInfo -> ByteString [slotType] :: ReplicationSlotInfo -> ReplicationSlotType [slotActive] :: ReplicationSlotInfo -> ReplicationSlotActive [slotRestart] :: ReplicationSlotInfo -> LSN data ReplicationSlotType Logical :: ReplicationSlotType Physical :: ReplicationSlotType UnknownSlotType :: ReplicationSlotType parseSlotType :: ByteString -> ReplicationSlotType data ReplicationSlotActive Active :: ReplicationSlotActive Inactive :: ReplicationSlotActive parseSlotActive :: ByteString -> ReplicationSlotActive createReplicationSlotCommand :: ReplicantConnection -> ByteString -> IO ByteString -- | Create a replication slot using synchronous query execution. -- -- May throw an exception if the command fails. createReplicationSlotSync :: ReplicantConnection -> ByteString -> IO ReplicationSlotInfo getReplicationSlotInfoCommand :: ReplicantConnection -> ByteString -> IO ByteString -- | Get information about an existing replication slot. Returns -- Nothing when the requested slot cannot be found. -- -- May throw an exception if the command query fails. getReplicationSlotSync :: ReplicantConnection -> ByteString -> IO (Maybe ReplicationSlotInfo) -- | Create replication slot or retrieve an existing slot. -- -- Can throw exceptions from getReplicationSlotSync or -- createReplicationSlotSync. setupReplicationSlot :: ReplicantConnection -> ByteString -> IO ReplicationSlotInfo instance GHC.Show.Show Database.PostgreSQL.Replicant.ReplicationSlot.ReplicationSlotType instance GHC.Classes.Eq Database.PostgreSQL.Replicant.ReplicationSlot.ReplicationSlotType instance GHC.Show.Show Database.PostgreSQL.Replicant.ReplicationSlot.ReplicationSlotActive instance GHC.Classes.Eq Database.PostgreSQL.Replicant.ReplicationSlot.ReplicationSlotActive instance GHC.Show.Show Database.PostgreSQL.Replicant.ReplicationSlot.ReplicationSlotInfo instance GHC.Classes.Eq Database.PostgreSQL.Replicant.ReplicationSlot.ReplicationSlotInfo -- | This module implements the Postgres streaming replication protocol. -- -- See: -- https://www.postgresql.org/docs/9.5/protocol-replication.html module Database.PostgreSQL.Replicant.Protocol -- | The information returned by the IDENTIFY_SYSTEM command -- establishes the stream's log start, position, and information about -- the database. data IdentifySystem IdentifySystem :: ByteString -> ByteString -> LSN -> Maybe ByteString -> IdentifySystem [identifySystemSytemId] :: IdentifySystem -> ByteString [identifySystemTimeline] :: IdentifySystem -> ByteString [identifySystemLogPos] :: IdentifySystem -> LSN [identifySystemDbName] :: IdentifySystem -> Maybe ByteString identifySystemCommand :: ByteString -- | Synchronously execute the IDENTIFY SYSTEM command which -- returns some basic system information about the server. identifySystemSync :: ReplicantConnection -> IO (Maybe IdentifySystem) -- | Create a START_REPLICATION_SLOT query, escaping the slot name -- passed in by the user. startReplicationCommand :: ReplicantConnection -> ByteString -> LSN -> IO ByteString -- | This handles the COPY OUT mode messages. PostgreSQL uses this mode to -- copy the data from a WAL log file to the socket in the streaming -- replication protocol. handleCopyOutData :: TChan PrimaryKeepAlive -> WalProgressState -> ReplicantConnection -> (Change -> IO LSN) -> IO () handleReplicationRow :: TChan PrimaryKeepAlive -> WalProgressState -> ReplicantConnection -> ByteString -> (Change -> IO LSN) -> IO () -- | Used to re-throw an exception received from the server. handleReplicationError :: ReplicantConnection -> IO () handleReplicationNoop :: IO () -- | Initiate the streaming replication protocol handler. This will race -- the keep-alive and copy data handler threads. It will -- catch and rethrow exceptions from either thread if any fails or -- returns. startReplicationStream :: ReplicantConnection -> ByteString -> LSN -> Int -> (Change -> IO LSN) -> IO () -- | This listens on the channel for primary keep-alive messages -- from the server and responds to them with the update status -- message using the current WAL stream state. It will attempt to buffer -- prior update messages when the socket is blocked. keepAliveHandler :: ReplicantConnection -> TChan PrimaryKeepAlive -> WalProgressState -> IO () sendStatusUpdate :: ReplicantConnection -> WalProgressState -> IO () instance GHC.Show.Show Database.PostgreSQL.Replicant.Protocol.IdentifySystem instance GHC.Classes.Eq Database.PostgreSQL.Replicant.Protocol.IdentifySystem -- | Connect to a PostgreSQL server as a logical replication client and -- receive changes. -- -- The basic API is this: -- --
--   withLogicalStream defaultSettings $ change -> do
--     print change
--     catch err -> do
--       show err
--   
-- -- This is a low-level library meant to give the primitives necessary to -- library authors to add streaming replication support. The API here to -- rather simplistic but should be hooked up to something like conduit to -- provide better ergonomics. module Database.PostgreSQL.Replicant data Change Change :: LSN -> [WalLogData] -> Change -- | Return this LSN in your callback to update the stream state in -- replicant [changeNextLSN] :: Change -> LSN -- | The list of WAL log changes in this transaction. [changeDeltas] :: Change -> [WalLogData] -- | Represents a single table column. We only support the -- wal2json logical decoder plugin and make no attempt to parse -- anything but JSON-like primitives. data Column Column :: !Text -> !Text -> !WalValue -> Column [columnName] :: Column -> !Text [columnType] :: Column -> !Text [columnValue] :: Column -> !WalValue -- | Represents a single delete query in the logical replication format data Delete Delete :: !Text -> !Text -> ![Column] -> Delete [deleteSchema] :: Delete -> !Text [deleteTable] :: Delete -> !Text [deleteColumns] :: Delete -> ![Column] -- | Represents a single insert query in the logical replication format. data Insert Insert :: !String -> !String -> ![Column] -> Insert [insertSchema] :: Insert -> !String [insertTable] :: Insert -> !String [insertColumns] :: Insert -> ![Column] -- | Occasionally the server may also send these for informational purposes -- and can be ignored. May be used internally. data Message Message :: !Bool -> !Text -> !Text -> Message [messageTransactional] :: Message -> !Bool [messagePrefix] :: Message -> !Text [messageContent] :: Message -> !Text data PgSettings PgSettings :: String -> Maybe String -> String -> String -> String -> String -> String -> PgSettings [pgUser] :: PgSettings -> String [pgPassword] :: PgSettings -> Maybe String [pgDbName] :: PgSettings -> String [pgHost] :: PgSettings -> String [pgPort] :: PgSettings -> String [pgSlotName] :: PgSettings -> String -- | Controls how frequently the primaryKeepAlive thread updates -- PostgresSQL in ms [pgUpdateDelay] :: PgSettings -> String data ReplicantConnection -- | Represents a single update query in the logical replication format. data Update Update :: !Text -> !Text -> ![Column] -> Update [updateSchema] :: Update -> !Text [updateTable] :: Update -> !Text [updateColumns] :: Update -> ![Column] data WalLogData WInsert :: !Insert -> WalLogData WUpdate :: !Update -> WalLogData WDelete :: !Delete -> WalLogData WMessage :: !Message -> WalLogData -- | Connect to the PostgreSQL server in replication mode connect :: PgSettings -> IO ReplicantConnection getConnection :: ReplicantConnection -> Connection -- | Unsafe function for wrapping regular libpq Connection. This is unsafe -- because the Connection needs to be set up to send replication -- commands. Improperly constructed connections can lead to runtime -- exceptions. unsafeCreateConnection :: Connection -> ReplicantConnection -- | Connect to a PostgreSQL database as a user with the replication -- attribute and start receiving changes using the logical replication -- protocol. Logical replication happens at the query level so the -- changes you get represent the set of queries in a transaction: -- insert, update, and delete. -- -- This function will create the replication slot, if it doesn't exist, -- or reconnect to it otherwise and restart the stream from where the -- replication slot left off. -- -- This function can throw exceptions in IO and shut-down the -- socket in case of any error. withLogicalStream :: PgSettings -> (Change -> IO LSN) -> IO ()