{-# LANGUAGE Strict #-}

{-|
Module      : Database.PostgreSQL.Replicant.ReplicationSlot
Description : Replication slot query commands
Copyright   : (c) James King, 2020, 2021
License     : BSD3
Maintainer  : james@agentultra.com
Stability   : experimental
Portability : POSIX

This module contains the PostgreSQL queries, types, and functions for
working with querying, creating, and working with replication slots.
-}
module Database.PostgreSQL.Replicant.ReplicationSlot where

import Control.Exception
import Data.ByteString (ByteString)
import Data.Maybe
import qualified Data.ByteString as B
import qualified Data.ByteString.Char8 as B8
import Database.PostgreSQL.LibPQ

import Database.PostgreSQL.Replicant.Connection
import Database.PostgreSQL.Replicant.Exception
import Database.PostgreSQL.Replicant.Types.Lsn

data ReplicationSlotInfo
  = ReplicationSlotInfo
  { ReplicationSlotInfo -> ByteString
slotName    :: ByteString
  , ReplicationSlotInfo -> ByteString
slotPlugin  :: ByteString
  , ReplicationSlotInfo -> ReplicationSlotType
slotType    :: ReplicationSlotType
  , ReplicationSlotInfo -> ReplicationSlotActive
slotActive  :: ReplicationSlotActive
  , ReplicationSlotInfo -> LSN
slotRestart :: LSN
  }
  deriving (ReplicationSlotInfo -> ReplicationSlotInfo -> Bool
(ReplicationSlotInfo -> ReplicationSlotInfo -> Bool)
-> (ReplicationSlotInfo -> ReplicationSlotInfo -> Bool)
-> Eq ReplicationSlotInfo
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
/= :: ReplicationSlotInfo -> ReplicationSlotInfo -> Bool
$c/= :: ReplicationSlotInfo -> ReplicationSlotInfo -> Bool
== :: ReplicationSlotInfo -> ReplicationSlotInfo -> Bool
$c== :: ReplicationSlotInfo -> ReplicationSlotInfo -> Bool
Eq, Int -> ReplicationSlotInfo -> ShowS
[ReplicationSlotInfo] -> ShowS
ReplicationSlotInfo -> String
(Int -> ReplicationSlotInfo -> ShowS)
-> (ReplicationSlotInfo -> String)
-> ([ReplicationSlotInfo] -> ShowS)
-> Show ReplicationSlotInfo
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
showList :: [ReplicationSlotInfo] -> ShowS
$cshowList :: [ReplicationSlotInfo] -> ShowS
show :: ReplicationSlotInfo -> String
$cshow :: ReplicationSlotInfo -> String
showsPrec :: Int -> ReplicationSlotInfo -> ShowS
$cshowsPrec :: Int -> ReplicationSlotInfo -> ShowS
Show)

data ReplicationSlotType = Logical | Physical | UnknownSlotType
  deriving (ReplicationSlotType -> ReplicationSlotType -> Bool
(ReplicationSlotType -> ReplicationSlotType -> Bool)
-> (ReplicationSlotType -> ReplicationSlotType -> Bool)
-> Eq ReplicationSlotType
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
/= :: ReplicationSlotType -> ReplicationSlotType -> Bool
$c/= :: ReplicationSlotType -> ReplicationSlotType -> Bool
== :: ReplicationSlotType -> ReplicationSlotType -> Bool
$c== :: ReplicationSlotType -> ReplicationSlotType -> Bool
Eq, Int -> ReplicationSlotType -> ShowS
[ReplicationSlotType] -> ShowS
ReplicationSlotType -> String
(Int -> ReplicationSlotType -> ShowS)
-> (ReplicationSlotType -> String)
-> ([ReplicationSlotType] -> ShowS)
-> Show ReplicationSlotType
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
showList :: [ReplicationSlotType] -> ShowS
$cshowList :: [ReplicationSlotType] -> ShowS
show :: ReplicationSlotType -> String
$cshow :: ReplicationSlotType -> String
showsPrec :: Int -> ReplicationSlotType -> ShowS
$cshowsPrec :: Int -> ReplicationSlotType -> ShowS
Show)

parseSlotType :: ByteString -> ReplicationSlotType
parseSlotType :: ByteString -> ReplicationSlotType
parseSlotType ByteString
"logical"  = ReplicationSlotType
Logical
parseSlotType ByteString
"physical" = ReplicationSlotType
Physical
parseSlotType ByteString
_          = ReplicationSlotType
UnknownSlotType

data ReplicationSlotActive = Active | Inactive
  deriving (ReplicationSlotActive -> ReplicationSlotActive -> Bool
(ReplicationSlotActive -> ReplicationSlotActive -> Bool)
-> (ReplicationSlotActive -> ReplicationSlotActive -> Bool)
-> Eq ReplicationSlotActive
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
/= :: ReplicationSlotActive -> ReplicationSlotActive -> Bool
$c/= :: ReplicationSlotActive -> ReplicationSlotActive -> Bool
== :: ReplicationSlotActive -> ReplicationSlotActive -> Bool
$c== :: ReplicationSlotActive -> ReplicationSlotActive -> Bool
Eq, Int -> ReplicationSlotActive -> ShowS
[ReplicationSlotActive] -> ShowS
ReplicationSlotActive -> String
(Int -> ReplicationSlotActive -> ShowS)
-> (ReplicationSlotActive -> String)
-> ([ReplicationSlotActive] -> ShowS)
-> Show ReplicationSlotActive
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
showList :: [ReplicationSlotActive] -> ShowS
$cshowList :: [ReplicationSlotActive] -> ShowS
show :: ReplicationSlotActive -> String
$cshow :: ReplicationSlotActive -> String
showsPrec :: Int -> ReplicationSlotActive -> ShowS
$cshowsPrec :: Int -> ReplicationSlotActive -> ShowS
Show)

parseSlotActive :: ByteString -> ReplicationSlotActive
parseSlotActive :: ByteString -> ReplicationSlotActive
parseSlotActive ByteString
"t" = ReplicationSlotActive
Active
parseSlotActive ByteString
"f" = ReplicationSlotActive
Inactive
parseSlotActive ByteString
_   = ReplicationSlotActive
Inactive

createReplicationSlotCommand :: ReplicantConnection -> ByteString -> IO ByteString
createReplicationSlotCommand :: ReplicantConnection -> ByteString -> IO ByteString
createReplicationSlotCommand ReplicantConnection
conn ByteString
slotName = do
  Maybe ByteString
escapedName <- Connection -> ByteString -> IO (Maybe ByteString)
escapeIdentifier (ReplicantConnection -> Connection
getConnection ReplicantConnection
conn) ByteString
slotName
  case Maybe ByteString
escapedName of
    Maybe ByteString
Nothing -> ReplicantException -> IO ByteString
forall e a. Exception e => e -> IO a
throwIO (ReplicantException -> IO ByteString)
-> ReplicantException -> IO ByteString
forall a b. (a -> b) -> a -> b
$ String -> ReplicantException
ReplicantException (String -> ReplicantException) -> String -> ReplicantException
forall a b. (a -> b) -> a -> b
$ String
"Invalid slot name: " String -> ShowS
forall a. [a] -> [a] -> [a]
++ ByteString -> String
forall a. Show a => a -> String
show ByteString
slotName
    Just ByteString
escaped ->
      ByteString -> IO ByteString
forall (f :: * -> *) a. Applicative f => a -> f a
pure (ByteString -> IO ByteString) -> ByteString -> IO ByteString
forall a b. (a -> b) -> a -> b
$
      ByteString -> [ByteString] -> ByteString
B.intercalate
      ByteString
""
      [ ByteString
"CREATE_REPLICATION_SLOT"
      , ByteString
escaped
      , ByteString
"LOGICAL wal2json"
      ]

-- | Create a replication slot using synchronous query execution.
--
-- May throw an exception if the command fails.
createReplicationSlotSync :: ReplicantConnection -> ByteString -> IO ReplicationSlotInfo
createReplicationSlotSync :: ReplicantConnection -> ByteString -> IO ReplicationSlotInfo
createReplicationSlotSync ReplicantConnection
conn ByteString
slotName = do
  ByteString
createReplicationSlotQuery <- ReplicantConnection -> ByteString -> IO ByteString
createReplicationSlotCommand ReplicantConnection
conn ByteString
slotName
  Maybe Result
result <- Connection -> ByteString -> IO (Maybe Result)
exec (ReplicantConnection -> Connection
getConnection ReplicantConnection
conn) ByteString
createReplicationSlotQuery
  case Maybe Result
result of
    Just Result
r -> do
      ExecStatus
resultStatus <- Result -> IO ExecStatus
resultStatus Result
r
      case ExecStatus
resultStatus of
        ExecStatus
TuplesOk -> do
          Maybe ByteString
sName           <- Result -> Row -> Column -> IO (Maybe ByteString)
getvalue' Result
r (Integer -> Row
forall a. Integral a => a -> Row
toRow Integer
0) (Integer -> Column
forall a. Integral a => a -> Column
toColumn Integer
0)
          Maybe ByteString
consistentPoint <- Result -> Row -> Column -> IO (Maybe ByteString)
getvalue' Result
r (Integer -> Row
forall a. Integral a => a -> Row
toRow Integer
0) (Integer -> Column
forall a. Integral a => a -> Column
toColumn Integer
1)
          Maybe ByteString
outputPlugin    <- Result -> Row -> Column -> IO (Maybe ByteString)
getvalue' Result
r (Integer -> Row
forall a. Integral a => a -> Row
toRow Integer
0) (Integer -> Column
forall a. Integral a => a -> Column
toColumn Integer
3)
          case (Maybe ByteString
sName, Maybe ByteString
consistentPoint, Maybe ByteString
outputPlugin) of
            (Just ByteString
s, Just ByteString
c, Just ByteString
op) ->
              case ByteString -> Either String LSN
fromByteString ByteString
c of
                Left String
_ -> ReplicantException -> IO ReplicationSlotInfo
forall e a. Exception e => e -> IO a
throwIO (ReplicantException -> IO ReplicationSlotInfo)
-> ReplicantException -> IO ReplicationSlotInfo
forall a b. (a -> b) -> a -> b
$ String -> ReplicantException
ReplicantException String
"createReplicationSlotSync: invalid LSN detected"
                Right LSN
lsn -> ReplicationSlotInfo -> IO ReplicationSlotInfo
forall (f :: * -> *) a. Applicative f => a -> f a
pure (ReplicationSlotInfo -> IO ReplicationSlotInfo)
-> ReplicationSlotInfo -> IO ReplicationSlotInfo
forall a b. (a -> b) -> a -> b
$ ByteString
-> ByteString
-> ReplicationSlotType
-> ReplicationSlotActive
-> LSN
-> ReplicationSlotInfo
ReplicationSlotInfo ByteString
s ByteString
op ReplicationSlotType
Logical ReplicationSlotActive
Active LSN
lsn
            (Maybe ByteString, Maybe ByteString, Maybe ByteString)
_ -> do
              ByteString
err <- ByteString -> Maybe ByteString -> ByteString
forall a. a -> Maybe a -> a
fromMaybe ByteString
"createReplicationSlotSync: unknown error" (Maybe ByteString -> ByteString)
-> IO (Maybe ByteString) -> IO ByteString
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Connection -> IO (Maybe ByteString)
errorMessage (ReplicantConnection -> Connection
getConnection ReplicantConnection
conn)
              ReplicantException -> IO ReplicationSlotInfo
forall e a. Exception e => e -> IO a
throwIO (ReplicantException -> IO ReplicationSlotInfo)
-> ReplicantException -> IO ReplicationSlotInfo
forall a b. (a -> b) -> a -> b
$ String -> ReplicantException
ReplicantException (ByteString -> String
B8.unpack ByteString
err)
        ExecStatus
_ -> do
          ByteString
err <- ByteString -> Maybe ByteString -> ByteString
forall a. a -> Maybe a -> a
fromMaybe ByteString
"createReplicationSlotSync: unknown error" (Maybe ByteString -> ByteString)
-> IO (Maybe ByteString) -> IO ByteString
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Connection -> IO (Maybe ByteString)
errorMessage (ReplicantConnection -> Connection
getConnection ReplicantConnection
conn)
          ReplicantException -> IO ReplicationSlotInfo
forall e a. Exception e => e -> IO a
throwIO (ReplicantException -> IO ReplicationSlotInfo)
-> ReplicantException -> IO ReplicationSlotInfo
forall a b. (a -> b) -> a -> b
$ String -> ReplicantException
ReplicantException (ByteString -> String
B8.unpack ByteString
err)
    Maybe Result
_ -> do
      ByteString
err <- ByteString -> Maybe ByteString -> ByteString
forall a. a -> Maybe a -> a
fromMaybe ByteString
"createReplicationSlotSync: unknown error" (Maybe ByteString -> ByteString)
-> IO (Maybe ByteString) -> IO ByteString
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Connection -> IO (Maybe ByteString)
errorMessage (ReplicantConnection -> Connection
getConnection ReplicantConnection
conn)
      ReplicantException -> IO ReplicationSlotInfo
forall e a. Exception e => e -> IO a
throwIO (ReplicantException -> IO ReplicationSlotInfo)
-> ReplicantException -> IO ReplicationSlotInfo
forall a b. (a -> b) -> a -> b
$ String -> ReplicantException
ReplicantException (ByteString -> String
B8.unpack ByteString
err)

getReplicationSlotInfoCommand :: ReplicantConnection -> ByteString -> IO ByteString
getReplicationSlotInfoCommand :: ReplicantConnection -> ByteString -> IO ByteString
getReplicationSlotInfoCommand ReplicantConnection
conn ByteString
slotName = do
  Maybe ByteString
escapedName <- Connection -> ByteString -> IO (Maybe ByteString)
escapeStringConn (ReplicantConnection -> Connection
getConnection ReplicantConnection
conn) ByteString
slotName
  case Maybe ByteString
escapedName of
    Maybe ByteString
Nothing -> ReplicantException -> IO ByteString
forall e a. Exception e => e -> IO a
throwIO (ReplicantException -> IO ByteString)
-> ReplicantException -> IO ByteString
forall a b. (a -> b) -> a -> b
$ String -> ReplicantException
ReplicantException (String -> ReplicantException) -> String -> ReplicantException
forall a b. (a -> b) -> a -> b
$ String
"Invalid slot name: " String -> ShowS
forall a. [a] -> [a] -> [a]
++ ByteString -> String
forall a. Show a => a -> String
show ByteString
slotName
    Just ByteString
escaped ->
      ByteString -> IO ByteString
forall (f :: * -> *) a. Applicative f => a -> f a
pure (ByteString -> IO ByteString) -> ByteString -> IO ByteString
forall a b. (a -> b) -> a -> b
$
      ByteString -> [ByteString] -> ByteString
B.intercalate
      ByteString
""
      [ ByteString
"select slot_name, plugin, slot_type, active, restart_lsn from pg_replication_slots where slot_name = '"
      , ByteString
escaped
      , ByteString
"';"
      ]

-- | Get information about an existing replication slot.  Returns
-- @Nothing@ when the requested slot cannot be found.
--
-- May throw an exception if the command query fails.
getReplicationSlotSync :: ReplicantConnection -> ByteString -> IO (Maybe ReplicationSlotInfo)
getReplicationSlotSync :: ReplicantConnection -> ByteString -> IO (Maybe ReplicationSlotInfo)
getReplicationSlotSync ReplicantConnection
conn ByteString
slotName = do
  ByteString
replicationSlotInfoQuery <- ReplicantConnection -> ByteString -> IO ByteString
getReplicationSlotInfoCommand ReplicantConnection
conn ByteString
slotName
  Maybe Result
result <- Connection -> ByteString -> IO (Maybe Result)
exec (ReplicantConnection -> Connection
getConnection ReplicantConnection
conn) ByteString
replicationSlotInfoQuery
  case Maybe Result
result of
    Just Result
r -> do
      ExecStatus
resultStatus <- Result -> IO ExecStatus
resultStatus Result
r
      case ExecStatus
resultStatus of
        ExecStatus
TuplesOk -> do
          Row
nRows <- Result -> IO Row
ntuples Result
r
          if Row
nRows Row -> Row -> Bool
forall a. Eq a => a -> a -> Bool
== Row
0
            then Maybe ReplicationSlotInfo -> IO (Maybe ReplicationSlotInfo)
forall (f :: * -> *) a. Applicative f => a -> f a
pure Maybe ReplicationSlotInfo
forall a. Maybe a
Nothing
            else do
            Maybe ByteString
slotName    <- Result -> Row -> Column -> IO (Maybe ByteString)
getvalue' Result
r (Integer -> Row
forall a. Integral a => a -> Row
toRow Integer
0) (Integer -> Column
forall a. Integral a => a -> Column
toColumn Integer
0)
            Maybe ByteString
slotPlugin  <- Result -> Row -> Column -> IO (Maybe ByteString)
getvalue' Result
r (Integer -> Row
forall a. Integral a => a -> Row
toRow Integer
0) (Integer -> Column
forall a. Integral a => a -> Column
toColumn Integer
1)
            Maybe ByteString
slotType    <- Result -> Row -> Column -> IO (Maybe ByteString)
getvalue' Result
r (Integer -> Row
forall a. Integral a => a -> Row
toRow Integer
0) (Integer -> Column
forall a. Integral a => a -> Column
toColumn Integer
2)
            Maybe ByteString
slotActive  <- Result -> Row -> Column -> IO (Maybe ByteString)
getvalue' Result
r (Integer -> Row
forall a. Integral a => a -> Row
toRow Integer
0) (Integer -> Column
forall a. Integral a => a -> Column
toColumn Integer
3)
            Maybe ByteString
slotRestart <- Result -> Row -> Column -> IO (Maybe ByteString)
getvalue' Result
r (Integer -> Row
forall a. Integral a => a -> Row
toRow Integer
0) (Integer -> Column
forall a. Integral a => a -> Column
toColumn Integer
4)
            case (Maybe ByteString
slotName, Maybe ByteString
slotPlugin, Maybe ByteString
slotType, Maybe ByteString
slotActive, Maybe ByteString
slotRestart) of
              (Just ByteString
n, Just ByteString
p, Just ByteString
t, Just ByteString
a, Just ByteString
restart) -> do
                case ByteString -> Either String LSN
fromByteString ByteString
restart of
                  Left String
_ -> Maybe ReplicationSlotInfo -> IO (Maybe ReplicationSlotInfo)
forall (f :: * -> *) a. Applicative f => a -> f a
pure Maybe ReplicationSlotInfo
forall a. Maybe a
Nothing -- TODO: this shouldn't happen...
                  Right LSN
lsn -> Maybe ReplicationSlotInfo -> IO (Maybe ReplicationSlotInfo)
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Maybe ReplicationSlotInfo -> IO (Maybe ReplicationSlotInfo))
-> Maybe ReplicationSlotInfo -> IO (Maybe ReplicationSlotInfo)
forall a b. (a -> b) -> a -> b
$ ReplicationSlotInfo -> Maybe ReplicationSlotInfo
forall a. a -> Maybe a
Just (ReplicationSlotInfo -> Maybe ReplicationSlotInfo)
-> ReplicationSlotInfo -> Maybe ReplicationSlotInfo
forall a b. (a -> b) -> a -> b
$ ByteString
-> ByteString
-> ReplicationSlotType
-> ReplicationSlotActive
-> LSN
-> ReplicationSlotInfo
ReplicationSlotInfo ByteString
n ByteString
p (ByteString -> ReplicationSlotType
parseSlotType ByteString
t) (ByteString -> ReplicationSlotActive
parseSlotActive ByteString
a) LSN
lsn
              (Maybe ByteString, Maybe ByteString, Maybe ByteString,
 Maybe ByteString, Maybe ByteString)
_ ->  Maybe ReplicationSlotInfo -> IO (Maybe ReplicationSlotInfo)
forall (f :: * -> *) a. Applicative f => a -> f a
pure Maybe ReplicationSlotInfo
forall a. Maybe a
Nothing
        ExecStatus
_ -> Maybe ReplicationSlotInfo -> IO (Maybe ReplicationSlotInfo)
forall (f :: * -> *) a. Applicative f => a -> f a
pure Maybe ReplicationSlotInfo
forall a. Maybe a
Nothing
    Maybe Result
_ -> do
      ByteString
err <- ByteString -> Maybe ByteString -> ByteString
forall a. a -> Maybe a -> a
fromMaybe ByteString
"getReplicationSlotSync: unknown error" (Maybe ByteString -> ByteString)
-> IO (Maybe ByteString) -> IO ByteString
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Connection -> IO (Maybe ByteString)
errorMessage (ReplicantConnection -> Connection
getConnection ReplicantConnection
conn)
      ReplicantException -> IO (Maybe ReplicationSlotInfo)
forall e a. Exception e => e -> IO a
throwIO (ReplicantException -> IO (Maybe ReplicationSlotInfo))
-> ReplicantException -> IO (Maybe ReplicationSlotInfo)
forall a b. (a -> b) -> a -> b
$ String -> ReplicantException
ReplicantException (ByteString -> String
B8.unpack ByteString
err)

-- | Create replication slot or retrieve an existing slot.
--
-- Can throw exceptions from @getReplicationSlotSync@ or
-- @createReplicationSlotSync@.
setupReplicationSlot :: ReplicantConnection -> ByteString -> IO ReplicationSlotInfo
setupReplicationSlot :: ReplicantConnection -> ByteString -> IO ReplicationSlotInfo
setupReplicationSlot ReplicantConnection
conn ByteString
slotName = do
  Maybe ReplicationSlotInfo
maybeSlot <- ReplicantConnection -> ByteString -> IO (Maybe ReplicationSlotInfo)
getReplicationSlotSync ReplicantConnection
conn ByteString
slotName
  case Maybe ReplicationSlotInfo
maybeSlot of
    Just ReplicationSlotInfo
slot -> ReplicationSlotInfo -> IO ReplicationSlotInfo
forall (f :: * -> *) a. Applicative f => a -> f a
pure ReplicationSlotInfo
slot
    Maybe ReplicationSlotInfo
Nothing   -> ReplicantConnection -> ByteString -> IO ReplicationSlotInfo
createReplicationSlotSync ReplicantConnection
conn ByteString
slotName