{-# LANGUAGE Strict #-}
module Database.PostgreSQL.Replicant.ReplicationSlot where
import Control.Exception
import Data.ByteString (ByteString)
import Data.Maybe
import qualified Data.ByteString as B
import qualified Data.ByteString.Char8 as B8
import Database.PostgreSQL.LibPQ
import Database.PostgreSQL.Replicant.Connection
import Database.PostgreSQL.Replicant.Exception
import Database.PostgreSQL.Replicant.Types.Lsn
-- | Description of a single replication slot, as assembled by
-- 'getReplicationSlotSync' (from a @pg_replication_slots@ row) or by
-- 'createReplicationSlotSync' (from the reply to the create command).
data ReplicationSlotInfo = ReplicationSlotInfo
  { slotName :: ByteString
    -- ^ Name of the slot.
  , slotPlugin :: ByteString
    -- ^ Name of the output plugin attached to the slot.
  , slotType :: ReplicationSlotType
    -- ^ Whether the slot is logical or physical.
  , slotActive :: ReplicationSlotActive
    -- ^ Whether the slot is reported as active.
  , slotRestart :: LSN
    -- ^ LSN parsed from the @restart_lsn@ column, or from the consistent
    -- point returned when the slot was created.
  }
  deriving (Eq, Show)
-- | The kind of a replication slot, as decoded by 'parseSlotType'.
data ReplicationSlotType
  = Logical
  | Physical
  | UnknownSlotType
    -- ^ Any textual value other than @logical@ or @physical@.
  deriving (Eq, Show)
-- | Decode the textual slot type: @logical@ and @physical@ map to their
-- constructors, every other input becomes 'UnknownSlotType'.
parseSlotType :: ByteString -> ReplicationSlotType
parseSlotType raw = case raw of
  "logical" -> Logical
  "physical" -> Physical
  _ -> UnknownSlotType
-- | Whether a replication slot is reported as active, as decoded by
-- 'parseSlotActive'.
data ReplicationSlotActive
  = Active
  | Inactive
  deriving (Eq, Show)
-- | Decode the textual @active@ flag: only @t@ means 'Active'.
--
-- Both @f@ and any unrecognised input are treated as 'Inactive'.
parseSlotActive :: ByteString -> ReplicationSlotActive
parseSlotActive flag
  | flag == "t" = Active
  | otherwise = Inactive
-- | Build the command that creates a logical replication slot named
-- @slotName@ with the @wal2json@ output plugin.
--
-- The slot name is passed through 'escapeIdentifier' on the underlying
-- connection. If escaping fails, a 'ReplicantException' carrying
-- @Invalid slot name: ...@ is thrown.
--
-- The keyword, the escaped name and the plugin clause are joined with
-- single spaces, so the tokens are separated by whitespace instead of
-- relying on the quotes around the escaped identifier to delimit them.
createReplicationSlotCommand :: ReplicantConnection -> ByteString -> IO ByteString
createReplicationSlotCommand conn slotName = do
  escapedName <- escapeIdentifier (getConnection conn) slotName
  case escapedName of
    Nothing ->
      throwIO $ ReplicantException $ "Invalid slot name: " ++ show slotName
    Just escaped ->
      pure $
        B.intercalate
          " "
          [ "CREATE_REPLICATION_SLOT"
          , escaped
          , "LOGICAL wal2json"
          ]
-- | Create the replication slot @slotName@ on the server and describe it.
--
-- Runs the command built by 'createReplicationSlotCommand' and reads the
-- slot name, consistent point and output plugin from columns 0, 1 and 3
-- of the first result row. The returned info is always marked 'Logical'
-- and 'Active'.
--
-- Throws 'ReplicantException' when no result is returned, when the
-- status is not 'TuplesOk', when any of the three columns is missing
-- (each reporting the connection's error message), or when the
-- consistent point cannot be parsed as an LSN.
createReplicationSlotSync :: ReplicantConnection -> ByteString -> IO ReplicationSlotInfo
createReplicationSlotSync conn slotName = do
  command <- createReplicationSlotCommand conn slotName
  mResult <- exec (getConnection conn) command
  case mResult of
    Nothing -> raiseConnectionError (getConnection conn)
    Just res -> do
      status <- resultStatus res
      case status of
        TuplesOk -> readCreatedSlot res
        _ -> raiseConnectionError (getConnection conn)
  where
    -- Value of the first row at the given column index.
    firstRowField :: Result -> Integer -> IO (Maybe ByteString)
    firstRowField res col = getvalue' res (toRow (0 :: Integer)) (toColumn col)

    -- Assemble the slot description from the reply to the create command.
    readCreatedSlot :: Result -> IO ReplicationSlotInfo
    readCreatedSlot res = do
      mName <- firstRowField res 0
      mPoint <- firstRowField res 1
      mPlugin <- firstRowField res 3
      case (,,) <$> mName <*> mPoint <*> mPlugin of
        Nothing -> raiseConnectionError (getConnection conn)
        Just (name, point, plugin) ->
          either
            (const (throwIO (ReplicantException "createReplicationSlotSync: invalid LSN detected")))
            (pure . ReplicationSlotInfo name plugin Logical Active)
            (fromByteString point)

    -- Throw the connection's current error message, or a generic
    -- fallback when the connection reports none.
    raiseConnectionError :: Connection -> IO ReplicationSlotInfo
    raiseConnectionError pgConn = do
      mErr <- errorMessage pgConn
      throwIO
        ( ReplicantException
            (B8.unpack (fromMaybe "createReplicationSlotSync: unknown error" mErr))
        )
-- | Build the SQL query that selects the name, plugin, type, active flag
-- and restart LSN of the slot called @slotName@ from
-- @pg_replication_slots@.
--
-- The name is escaped with 'escapeStringConn' before being spliced
-- between single quotes. If escaping fails, a 'ReplicantException'
-- carrying @Invalid slot name: ...@ is thrown.
getReplicationSlotInfoCommand :: ReplicantConnection -> ByteString -> IO ByteString
getReplicationSlotInfoCommand conn slotName =
  escapeStringConn (getConnection conn) slotName >>= maybe rejectName (pure . buildQuery)
  where
    rejectName :: IO ByteString
    rejectName = throwIO (ReplicantException ("Invalid slot name: " ++ show slotName))

    buildQuery :: ByteString -> ByteString
    buildQuery escaped =
      B.concat
        [ "select slot_name, plugin, slot_type, active, restart_lsn from pg_replication_slots where slot_name = '"
        , escaped
        , "';"
        ]
-- | Look up the replication slot @slotName@ on the server.
--
-- Runs the query built by 'getReplicationSlotInfoCommand'. Returns
-- 'Nothing' when the status is not 'TuplesOk', when no rows come back,
-- when any of the five columns of the first row is missing, or when the
-- restart LSN cannot be parsed. Otherwise the first row is decoded into
-- a 'ReplicationSlotInfo'.
--
-- Throws 'ReplicantException' with the connection's error message only
-- when the query yields no result at all.
getReplicationSlotSync :: ReplicantConnection -> ByteString -> IO (Maybe ReplicationSlotInfo)
getReplicationSlotSync conn slotName = do
  query <- getReplicationSlotInfoCommand conn slotName
  mResult <- exec (getConnection conn) query
  case mResult of
    Nothing -> do
      mErr <- errorMessage (getConnection conn)
      throwIO
        ( ReplicantException
            (B8.unpack (fromMaybe "getReplicationSlotSync: unknown error" mErr))
        )
    Just res -> do
      status <- resultStatus res
      case status of
        TuplesOk -> do
          rowCount <- ntuples res
          if rowCount == 0
            then pure Nothing
            else decodeFirstRow res
        _ -> pure Nothing
  where
    -- Value of the first row at the given column index.
    firstRowField :: Result -> Integer -> IO (Maybe ByteString)
    firstRowField res col = getvalue' res (toRow (0 :: Integer)) (toColumn col)

    -- Read columns 0 through 4 in order and decode them; any missing
    -- value or unparsable LSN gives 'Nothing'.
    decodeFirstRow :: Result -> IO (Maybe ReplicationSlotInfo)
    decodeFirstRow res = do
      fields <- mapM (firstRowField res) [0 .. 4]
      case sequence fields of
        Just [name, plugin, kind, active, restart] ->
          either
            (const (pure Nothing))
            ( pure
                . Just
                . ReplicationSlotInfo name plugin (parseSlotType kind) (parseSlotActive active)
            )
            (fromByteString restart)
        _ -> pure Nothing
-- | Return the description of the slot @slotName@, creating the slot
-- first when 'getReplicationSlotSync' reports nothing for it.
--
-- Exceptions from either the lookup or the creation propagate to the
-- caller.
setupReplicationSlot :: ReplicantConnection -> ByteString -> IO ReplicationSlotInfo
setupReplicationSlot conn slotName =
  getReplicationSlotSync conn slotName
    >>= maybe (createReplicationSlotSync conn slotName) pure