{-# LANGUAGE Strict #-}

{-|
Module      : Database.PostgreSQL.Replicant.Protocol
Description : Streaming replication protocol
Copyright   : (c) James King, 2020, 2021
License     : BSD3
Maintainer  : james@agentultra.com
Stability   : experimental
Portability : POSIX

This module implements the Postgres streaming replication protocol.

See: https://www.postgresql.org/docs/9.5/protocol-replication.html
-}
module Database.PostgreSQL.Replicant.Protocol where

import Control.Concurrent
import Control.Concurrent.Async
import Control.Concurrent.STM
import Control.Exception.Base
import Control.Monad (forever)
import Data.Aeson (eitherDecode')
import Data.ByteString (ByteString)
import qualified Data.ByteString.Lazy as BL
import qualified Data.ByteString.Char8 as B
import Data.Maybe
import Data.Serialize hiding (flush)
import Database.PostgreSQL.LibPQ

import Database.PostgreSQL.Replicant.Connection
import Database.PostgreSQL.Replicant.Exception
import Database.PostgreSQL.Replicant.Message
import Database.PostgreSQL.Replicant.PostgresUtils
import Database.PostgreSQL.Replicant.State
import Database.PostgreSQL.Replicant.Types.Lsn

-- | The information returned by the @IDENTIFY_SYSTEM@ command
-- establishes the stream's log start, position, and information about
-- the database.
data IdentifySystem
  = IdentifySystem
  { IdentifySystem -> ByteString
identifySystemSytemId  :: ByteString
  , IdentifySystem -> ByteString
identifySystemTimeline :: ByteString
  , IdentifySystem -> LSN
identifySystemLogPos   :: LSN
  , IdentifySystem -> Maybe ByteString
identifySystemDbName   :: Maybe ByteString
  }
  deriving (IdentifySystem -> IdentifySystem -> Bool
(IdentifySystem -> IdentifySystem -> Bool)
-> (IdentifySystem -> IdentifySystem -> Bool) -> Eq IdentifySystem
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
/= :: IdentifySystem -> IdentifySystem -> Bool
$c/= :: IdentifySystem -> IdentifySystem -> Bool
== :: IdentifySystem -> IdentifySystem -> Bool
$c== :: IdentifySystem -> IdentifySystem -> Bool
Eq, Int -> IdentifySystem -> ShowS
[IdentifySystem] -> ShowS
IdentifySystem -> String
(Int -> IdentifySystem -> ShowS)
-> (IdentifySystem -> String)
-> ([IdentifySystem] -> ShowS)
-> Show IdentifySystem
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
showList :: [IdentifySystem] -> ShowS
$cshowList :: [IdentifySystem] -> ShowS
show :: IdentifySystem -> String
$cshow :: IdentifySystem -> String
showsPrec :: Int -> IdentifySystem -> ShowS
$cshowsPrec :: Int -> IdentifySystem -> ShowS
Show)

identifySystemCommand :: ByteString
identifySystemCommand :: ByteString
identifySystemCommand = ByteString
"IDENTIFY_SYSTEM"

-- | Synchronously execute the @IDENTIFY SYSTEM@ command which returns
-- some basic system information about the server.
identifySystemSync :: ReplicantConnection -> IO (Maybe IdentifySystem)
identifySystemSync :: ReplicantConnection -> IO (Maybe IdentifySystem)
identifySystemSync ReplicantConnection
conn = do
  Maybe Result
result <- Connection -> ByteString -> IO (Maybe Result)
exec (ReplicantConnection -> Connection
getConnection ReplicantConnection
conn) ByteString
identifySystemCommand
  case Maybe Result
result of
    Just Result
r -> do
      ExecStatus
resultStatus <- Result -> IO ExecStatus
resultStatus Result
r
      case ExecStatus
resultStatus of
        ExecStatus
TuplesOk -> do
          Maybe ByteString
systemId <- Result -> Row -> Column -> IO (Maybe ByteString)
getvalue' Result
r (Integer -> Row
forall a. Integral a => a -> Row
toRow Integer
0) (Integer -> Column
forall a. Integral a => a -> Column
toColumn Integer
0)
          Maybe ByteString
timeline <- Result -> Row -> Column -> IO (Maybe ByteString)
getvalue' Result
r (Integer -> Row
forall a. Integral a => a -> Row
toRow Integer
0) (Integer -> Column
forall a. Integral a => a -> Column
toColumn Integer
1)
          Maybe ByteString
logpos   <- Result -> Row -> Column -> IO (Maybe ByteString)
getvalue' Result
r (Integer -> Row
forall a. Integral a => a -> Row
toRow Integer
0) (Integer -> Column
forall a. Integral a => a -> Column
toColumn Integer
2)
          Maybe ByteString
dbname   <- Result -> Row -> Column -> IO (Maybe ByteString)
getvalue' Result
r (Integer -> Row
forall a. Integral a => a -> Row
toRow Integer
0) (Integer -> Column
forall a. Integral a => a -> Column
toColumn Integer
3)
          case (Maybe ByteString
systemId, Maybe ByteString
timeline, Maybe ByteString
logpos, Maybe ByteString
dbname) of
            (Just ByteString
s, Just ByteString
t, Just ByteString
l, Maybe ByteString
d) -> do
              case ByteString -> Either String LSN
fromByteString ByteString
l of
                Left String
_ -> Maybe IdentifySystem -> IO (Maybe IdentifySystem)
forall (f :: * -> *) a. Applicative f => a -> f a
pure Maybe IdentifySystem
forall a. Maybe a
Nothing
                Right LSN
logPosLsn -> do
                  Maybe IdentifySystem -> IO (Maybe IdentifySystem)
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Maybe IdentifySystem -> IO (Maybe IdentifySystem))
-> Maybe IdentifySystem -> IO (Maybe IdentifySystem)
forall a b. (a -> b) -> a -> b
$ IdentifySystem -> Maybe IdentifySystem
forall a. a -> Maybe a
Just (ByteString
-> ByteString -> LSN -> Maybe ByteString -> IdentifySystem
IdentifySystem ByteString
s ByteString
t LSN
logPosLsn Maybe ByteString
d)
            (Maybe ByteString, Maybe ByteString, Maybe ByteString,
 Maybe ByteString)
_ -> Maybe IdentifySystem -> IO (Maybe IdentifySystem)
forall (f :: * -> *) a. Applicative f => a -> f a
pure Maybe IdentifySystem
forall a. Maybe a
Nothing
        ExecStatus
_ -> do
          ByteString
err <- ByteString -> Maybe ByteString -> ByteString
forall a. a -> Maybe a -> a
fromMaybe ByteString
"identifySystemSync: unknown error"
            (Maybe ByteString -> ByteString)
-> IO (Maybe ByteString) -> IO ByteString
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Connection -> IO (Maybe ByteString)
errorMessage (ReplicantConnection -> Connection
getConnection ReplicantConnection
conn)
          ReplicantException -> IO (Maybe IdentifySystem)
forall e a. Exception e => e -> IO a
throwIO (ReplicantException -> IO (Maybe IdentifySystem))
-> ReplicantException -> IO (Maybe IdentifySystem)
forall a b. (a -> b) -> a -> b
$ String -> ReplicantException
ReplicantException (ByteString -> String
B.unpack ByteString
err)
    Maybe Result
_ -> do
      ByteString
err <- ByteString -> Maybe ByteString -> ByteString
forall a. a -> Maybe a -> a
fromMaybe ByteString
"identifySystemSync: unknown error"
        (Maybe ByteString -> ByteString)
-> IO (Maybe ByteString) -> IO ByteString
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Connection -> IO (Maybe ByteString)
errorMessage (ReplicantConnection -> Connection
getConnection ReplicantConnection
conn)
      ReplicantException -> IO (Maybe IdentifySystem)
forall e a. Exception e => e -> IO a
throwIO (ReplicantException -> IO (Maybe IdentifySystem))
-> ReplicantException -> IO (Maybe IdentifySystem)
forall a b. (a -> b) -> a -> b
$ String -> ReplicantException
ReplicantException (ByteString -> String
B.unpack ByteString
err)

-- | Create a @START_REPLICATION_SLOT@ query, escaping the slot name
-- passed in by the user.
startReplicationCommand :: ReplicantConnection -> ByteString -> LSN -> IO ByteString
startReplicationCommand :: ReplicantConnection -> ByteString -> LSN -> IO ByteString
startReplicationCommand ReplicantConnection
conn ByteString
slotName LSN
systemLogPos = do
  Maybe ByteString
escapedName <- Connection -> ByteString -> IO (Maybe ByteString)
escapeIdentifier (ReplicantConnection -> Connection
getConnection ReplicantConnection
conn) ByteString
slotName
  case Maybe ByteString
escapedName of
    Maybe ByteString
Nothing -> ReplicantException -> IO ByteString
forall e a. Exception e => e -> IO a
throwIO (ReplicantException -> IO ByteString)
-> ReplicantException -> IO ByteString
forall a b. (a -> b) -> a -> b
$ String -> ReplicantException
ReplicantException (String -> ReplicantException) -> String -> ReplicantException
forall a b. (a -> b) -> a -> b
$ String
"Invalid slot name: " String -> ShowS
forall a. [a] -> [a] -> [a]
++ ByteString -> String
forall a. Show a => a -> String
show ByteString
slotName
    Just ByteString
escaped ->
      ByteString -> IO ByteString
forall (f :: * -> *) a. Applicative f => a -> f a
pure (ByteString -> IO ByteString) -> ByteString -> IO ByteString
forall a b. (a -> b) -> a -> b
$
      ByteString -> [ByteString] -> ByteString
B.intercalate
      ByteString
""
      [ ByteString
"START_REPLICATION SLOT "
      , ByteString
escaped
      , ByteString
" LOGICAL "
      , LSN -> ByteString
toByteString LSN
systemLogPos
      , ByteString
" (\"include-lsn\" 'on')"
      ]

-- | This handles the COPY OUT mode messages.  PostgreSQL uses this
-- mode to copy the data from a WAL log file to the socket in the
-- streaming replication protocol.
handleCopyOutData
  :: TChan PrimaryKeepAlive
  -> WalProgressState
  -> ReplicantConnection
  -> (Change -> IO LSN)
  -> IO ()
handleCopyOutData :: TChan PrimaryKeepAlive
-> WalProgressState
-> ReplicantConnection
-> (Change -> IO LSN)
-> IO ()
handleCopyOutData TChan PrimaryKeepAlive
chan WalProgressState
walState ReplicantConnection
conn Change -> IO LSN
cb = IO () -> IO ()
forall (f :: * -> *) a b. Applicative f => f a -> f b
forever (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ do
  CopyOutResult
d <- Connection -> Bool -> IO CopyOutResult
getCopyData (ReplicantConnection -> Connection
getConnection ReplicantConnection
conn) Bool
False
  case CopyOutResult
d of
    CopyOutRow ByteString
row -> TChan PrimaryKeepAlive
-> WalProgressState
-> ReplicantConnection
-> ByteString
-> (Change -> IO LSN)
-> IO ()
handleReplicationRow TChan PrimaryKeepAlive
chan WalProgressState
walState ReplicantConnection
conn ByteString
row Change -> IO LSN
cb
    CopyOutResult
CopyOutError   -> ReplicantConnection -> IO ()
handleReplicationError ReplicantConnection
conn
    CopyOutResult
_              -> IO ()
handleReplicationNoop

handleReplicationRow
  :: TChan PrimaryKeepAlive
  -> WalProgressState
  -> ReplicantConnection
  -> ByteString
  -> (Change -> IO LSN)
  -> IO ()
handleReplicationRow :: TChan PrimaryKeepAlive
-> WalProgressState
-> ReplicantConnection
-> ByteString
-> (Change -> IO LSN)
-> IO ()
handleReplicationRow TChan PrimaryKeepAlive
keepAliveChan WalProgressState
walState ReplicantConnection
_ ByteString
row Change -> IO LSN
cb =
  case ByteString -> Either String WalCopyData
forall a. Serialize a => ByteString -> Either String a
decode @WalCopyData ByteString
row of
    Left String
err ->
      ReplicantException -> IO ()
forall e a. Exception e => e -> IO a
throwIO
      (ReplicantException -> IO ()) -> ReplicantException -> IO ()
forall a b. (a -> b) -> a -> b
$ String -> ReplicantException
ReplicantException
      (String -> ReplicantException) -> String -> ReplicantException
forall a b. (a -> b) -> a -> b
$ String
"handleReplicationRow (decode error): " String -> ShowS
forall a. [a] -> [a] -> [a]
++ String
err
    Right WalCopyData
m  -> case WalCopyData
m of
      XLogDataM XLogData
xlog -> do
        case FromJSON Change => ByteString -> Either String Change
forall a. FromJSON a => ByteString -> Either String a
eitherDecode' @Change (ByteString -> Either String Change)
-> ByteString -> Either String Change
forall a b. (a -> b) -> a -> b
$ ByteString -> ByteString
BL.fromStrict (ByteString -> ByteString) -> ByteString -> ByteString
forall a b. (a -> b) -> a -> b
$ XLogData -> ByteString
xLogDataWalData XLogData
xlog of
          Left String
err ->
            ReplicantException -> IO ()
forall e a. Exception e => e -> IO a
throwIO
            (ReplicantException -> IO ()) -> ReplicantException -> IO ()
forall a b. (a -> b) -> a -> b
$ String -> ReplicantException
ReplicantException
            (String -> ReplicantException) -> String -> ReplicantException
forall a b. (a -> b) -> a -> b
$ String
"handleReplicationRow (parse error): " String -> ShowS
forall a. [a] -> [a] -> [a]
++ String
err
          Right Change
walLogData -> do
            LSN
consumedLSN <- Change -> IO LSN
cb Change
walLogData
            WalProgressState -> LSN -> IO ()
updateWalProgress WalProgressState
walState LSN
consumedLSN
      KeepAliveM PrimaryKeepAlive
keepAlive -> STM () -> IO ()
forall a. STM a -> IO a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ TChan PrimaryKeepAlive -> PrimaryKeepAlive -> STM ()
forall a. TChan a -> a -> STM ()
writeTChan TChan PrimaryKeepAlive
keepAliveChan PrimaryKeepAlive
keepAlive

-- | Used to re-throw an exception received from the server.
handleReplicationError :: ReplicantConnection -> IO ()
handleReplicationError :: ReplicantConnection -> IO ()
handleReplicationError ReplicantConnection
conn = do
  Maybe ByteString
err <- Connection -> IO (Maybe ByteString)
errorMessage (Connection -> IO (Maybe ByteString))
-> Connection -> IO (Maybe ByteString)
forall a b. (a -> b) -> a -> b
$ ReplicantConnection -> Connection
getConnection ReplicantConnection
conn
  ReplicantException -> IO Any
forall e a. Exception e => e -> IO a
throwIO (String -> ReplicantException
ReplicantException (String -> ReplicantException) -> String -> ReplicantException
forall a b. (a -> b) -> a -> b
$ ByteString -> String
B.unpack (ByteString -> String)
-> (Maybe ByteString -> ByteString) -> Maybe ByteString -> String
forall b c a. (b -> c) -> (a -> b) -> a -> c
. ByteString -> Maybe ByteString -> ByteString
forall a. a -> Maybe a -> a
fromMaybe ByteString
"Unknown error" (Maybe ByteString -> String) -> Maybe ByteString -> String
forall a b. (a -> b) -> a -> b
$ Maybe ByteString
err)
  () -> IO ()
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()

handleReplicationNoop :: IO ()
handleReplicationNoop :: IO ()
handleReplicationNoop = () -> IO ()
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()

-- | Initiate the streaming replication protocol handler.  This will
-- race the /keep-alive/ and /copy data/ handler threads.  It will
-- catch and rethrow exceptions from either thread if any fails or
-- returns.
startReplicationStream :: ReplicantConnection -> ByteString -> LSN -> Int -> (Change -> IO LSN) -> IO ()
startReplicationStream :: ReplicantConnection
-> ByteString -> LSN -> Int -> (Change -> IO LSN) -> IO ()
startReplicationStream ReplicantConnection
conn ByteString
slotName LSN
systemLogPos Int
_ Change -> IO LSN
cb = do
  let initialWalProgress :: WalProgress
initialWalProgress = LSN -> LSN -> LSN -> WalProgress
WalProgress LSN
systemLogPos LSN
systemLogPos LSN
systemLogPos
  WalProgressState
walProgressState <- MVar WalProgress -> WalProgressState
WalProgressState (MVar WalProgress -> WalProgressState)
-> IO (MVar WalProgress) -> IO WalProgressState
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> WalProgress -> IO (MVar WalProgress)
forall a. a -> IO (MVar a)
newMVar WalProgress
initialWalProgress
  ByteString
replicationCommandQuery <- ReplicantConnection -> ByteString -> LSN -> IO ByteString
startReplicationCommand ReplicantConnection
conn ByteString
slotName LSN
systemLogPos
  Maybe Result
result <- Connection -> ByteString -> IO (Maybe Result)
exec (ReplicantConnection -> Connection
getConnection ReplicantConnection
conn) ByteString
replicationCommandQuery
  case Maybe Result
result of
    Maybe Result
Nothing -> do
      ByteString
err <- ByteString -> Maybe ByteString -> ByteString
forall a. a -> Maybe a -> a
fromMaybe ByteString
"startReplicationStream: unknown error starting stream"
        (Maybe ByteString -> ByteString)
-> IO (Maybe ByteString) -> IO ByteString
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Connection -> IO (Maybe ByteString)
errorMessage (ReplicantConnection -> Connection
getConnection ReplicantConnection
conn)
      ReplicantException -> IO ()
forall e a. Exception e => e -> IO a
throwIO (ReplicantException -> IO ()) -> ReplicantException -> IO ()
forall a b. (a -> b) -> a -> b
$ String -> ReplicantException
ReplicantException (String -> ReplicantException) -> String -> ReplicantException
forall a b. (a -> b) -> a -> b
$ String
"startReplicationStream: " String -> ShowS
forall a. [a] -> [a] -> [a]
++ ByteString -> String
B.unpack ByteString
err
    Just Result
r  -> do
      ExecStatus
status <- Result -> IO ExecStatus
resultStatus Result
r
      case ExecStatus
status of
        ExecStatus
CopyBoth -> do
          TChan PrimaryKeepAlive
keepAliveChan <- STM (TChan PrimaryKeepAlive) -> IO (TChan PrimaryKeepAlive)
forall a. STM a -> IO a
atomically STM (TChan PrimaryKeepAlive)
forall a. STM (TChan a)
newTChan
          IO () -> IO () -> IO (Either () ())
forall a b. IO a -> IO b -> IO (Either a b)
race
            (ReplicantConnection
-> TChan PrimaryKeepAlive -> WalProgressState -> IO ()
keepAliveHandler ReplicantConnection
conn TChan PrimaryKeepAlive
keepAliveChan WalProgressState
walProgressState)
            (TChan PrimaryKeepAlive
-> WalProgressState
-> ReplicantConnection
-> (Change -> IO LSN)
-> IO ()
handleCopyOutData TChan PrimaryKeepAlive
keepAliveChan WalProgressState
walProgressState ReplicantConnection
conn Change -> IO LSN
cb)
            IO (Either () ())
-> (SomeException -> IO (Either () ())) -> IO (Either () ())
forall e a. Exception e => IO a -> (e -> IO a) -> IO a
`catch`
            \SomeException
exc -> do
              Connection -> IO ()
finish (ReplicantConnection -> Connection
getConnection ReplicantConnection
conn)
              SomeException -> IO (Either () ())
forall e a. Exception e => e -> IO a
throwIO @SomeException SomeException
exc
          () -> IO ()
forall (m :: * -> *) a. Monad m => a -> m a
return ()
        ExecStatus
_ -> do
          ByteString
err <- ByteString -> Maybe ByteString -> ByteString
forall a. a -> Maybe a -> a
fromMaybe ByteString
"startReplicationStream: unknown error entering COPY mode" (Maybe ByteString -> ByteString)
-> IO (Maybe ByteString) -> IO ByteString
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Connection -> IO (Maybe ByteString)
errorMessage (ReplicantConnection -> Connection
getConnection ReplicantConnection
conn)
          ReplicantException -> IO ()
forall e a. Exception e => e -> IO a
throwIO (ReplicantException -> IO ()) -> ReplicantException -> IO ()
forall a b. (a -> b) -> a -> b
$ String -> ReplicantException
ReplicantException (String -> ReplicantException) -> String -> ReplicantException
forall a b. (a -> b) -> a -> b
$ ByteString -> String
B.unpack ByteString
err

-- | This listens on the channel for /primary keep-alive messages/
-- from the server and responds to them with the /update status/
-- message using the current WAL stream state.  It will attempt to
-- buffer prior update messages when the socket is blocked.
keepAliveHandler :: ReplicantConnection -> TChan PrimaryKeepAlive -> WalProgressState -> IO ()
keepAliveHandler :: ReplicantConnection
-> TChan PrimaryKeepAlive -> WalProgressState -> IO ()
keepAliveHandler ReplicantConnection
conn TChan PrimaryKeepAlive
msgs WalProgressState
walProgressState = IO () -> IO ()
forall (f :: * -> *) a b. Applicative f => f a -> f b
forever (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ do
  Maybe PrimaryKeepAlive
mKeepAlive <- STM (Maybe PrimaryKeepAlive) -> IO (Maybe PrimaryKeepAlive)
forall a. STM a -> IO a
atomically (STM (Maybe PrimaryKeepAlive) -> IO (Maybe PrimaryKeepAlive))
-> STM (Maybe PrimaryKeepAlive) -> IO (Maybe PrimaryKeepAlive)
forall a b. (a -> b) -> a -> b
$ TChan PrimaryKeepAlive -> STM (Maybe PrimaryKeepAlive)
forall a. TChan a -> STM (Maybe a)
tryReadTChan TChan PrimaryKeepAlive
msgs
  case Maybe PrimaryKeepAlive
mKeepAlive of
    Maybe PrimaryKeepAlive
Nothing -> do
      ReplicantConnection -> WalProgressState -> IO ()
sendStatusUpdate ReplicantConnection
conn WalProgressState
walProgressState
      Int -> IO ()
threadDelay Int
3000000
    Just PrimaryKeepAlive
keepAlive' -> do
      case PrimaryKeepAlive -> ResponseExpectation
primaryKeepAliveResponseExpectation PrimaryKeepAlive
keepAlive' of
        ResponseExpectation
DoNotRespond -> do
          Int -> IO ()
threadDelay Int
1000
        ResponseExpectation
ShouldRespond -> do
          ReplicantConnection -> WalProgressState -> IO ()
sendStatusUpdate ReplicantConnection
conn WalProgressState
walProgressState

sendStatusUpdate
  :: ReplicantConnection
  -> WalProgressState
  -> IO ()
sendStatusUpdate :: ReplicantConnection -> WalProgressState -> IO ()
sendStatusUpdate ReplicantConnection
conn w :: WalProgressState
w@(WalProgressState MVar WalProgress
walState) = do
  (WalProgress LSN
received LSN
flushed LSN
applied) <- MVar WalProgress -> IO WalProgress
forall a. MVar a -> IO a
readMVar MVar WalProgress
walState
  Int64
timestamp <- IO Int64
postgresEpoch
  let statusUpdate :: StandbyStatusUpdate
statusUpdate =
        LSN
-> LSN
-> LSN
-> Int64
-> ResponseExpectation
-> StandbyStatusUpdate
StandbyStatusUpdate
        LSN
received
        LSN
flushed
        LSN
applied
        Int64
timestamp
        ResponseExpectation
DoNotRespond
  CopyInResult
copyResult <- Connection -> ByteString -> IO CopyInResult
putCopyData (ReplicantConnection -> Connection
getConnection ReplicantConnection
conn) (ByteString -> IO CopyInResult) -> ByteString -> IO CopyInResult
forall a b. (a -> b) -> a -> b
$ StandbyStatusUpdate -> ByteString
forall a. Serialize a => a -> ByteString
encode StandbyStatusUpdate
statusUpdate
  case CopyInResult
copyResult of
    CopyInResult
CopyInOk -> do
      FlushStatus
flushResult <- Connection -> IO FlushStatus
flush (ReplicantConnection -> Connection
getConnection ReplicantConnection
conn)
      case FlushStatus
flushResult of
        FlushStatus
FlushOk -> () -> IO ()
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()
        FlushStatus
FlushFailed -> do
          ByteString
err <- ByteString -> Maybe ByteString -> ByteString
forall a. a -> Maybe a -> a
fromMaybe ByteString
"sendStatusUpdate: error flushing message to server" (Maybe ByteString -> ByteString)
-> IO (Maybe ByteString) -> IO ByteString
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Connection -> IO (Maybe ByteString)
errorMessage (ReplicantConnection -> Connection
getConnection ReplicantConnection
conn)
          ReplicantException -> IO ()
forall e a. Exception e => e -> IO a
throwIO (ReplicantException -> IO ()) -> ReplicantException -> IO ()
forall a b. (a -> b) -> a -> b
$ String -> ReplicantException
ReplicantException (String -> ReplicantException) -> String -> ReplicantException
forall a b. (a -> b) -> a -> b
$ ByteString -> String
B.unpack ByteString
err
        FlushStatus
FlushWriting -> Connection -> WalProgressState -> IO ()
tryAgain (ReplicantConnection -> Connection
getConnection ReplicantConnection
conn) WalProgressState
w
    CopyInResult
CopyInError -> do
      ByteString
err <- ByteString -> Maybe ByteString -> ByteString
forall a. a -> Maybe a -> a
fromMaybe ByteString
"sendStatusUpdate: unknown error sending COPY IN" (Maybe ByteString -> ByteString)
-> IO (Maybe ByteString) -> IO ByteString
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Connection -> IO (Maybe ByteString)
errorMessage (ReplicantConnection -> Connection
getConnection ReplicantConnection
conn)
      ReplicantException -> IO ()
forall e a. Exception e => e -> IO a
throwIO (ReplicantException -> IO ()) -> ReplicantException -> IO ()
forall a b. (a -> b) -> a -> b
$ String -> ReplicantException
ReplicantException (String -> ReplicantException) -> String -> ReplicantException
forall a b. (a -> b) -> a -> b
$ ByteString -> String
B.unpack ByteString
err
    CopyInResult
CopyInWouldBlock -> Connection -> WalProgressState -> IO ()
tryAgain (ReplicantConnection -> Connection
getConnection ReplicantConnection
conn) WalProgressState
w
  where
    tryAgain :: Connection -> WalProgressState -> IO ()
tryAgain Connection
c WalProgressState
ws = do
      Maybe Fd
mSockFd <- Connection -> IO (Maybe Fd)
socket Connection
c
      case Maybe Fd
mSockFd of
        Maybe Fd
Nothing ->
          ReplicantException -> IO ()
forall e a. Exception e => e -> IO a
throwIO (ReplicantException -> IO ()) -> ReplicantException -> IO ()
forall a b. (a -> b) -> a -> b
$ String -> ReplicantException
ReplicantException String
"sendStatusUpdate: failed to get socket fd"
        Just Fd
sockFd -> do
          Fd -> IO ()
threadWaitWrite Fd
sockFd
          ReplicantConnection -> WalProgressState -> IO ()
sendStatusUpdate ReplicantConnection
conn WalProgressState
ws