{-|
Module      : Database.MySQL.BinLog
Description : Binary protocol toolkit
Copyright   : (c) Winterland, 2016
License     : BSD
Maintainer  : drkoster@qq.com
Stability   : experimental
Portability : PORTABLE

This module provides tools for binlog listening and row-based binlog decoding.
-}

module Database.MySQL.BinLog
    ( -- * binlog utilities
      SlaveID
    , BinLogTracker(..)
    , registerPesudoSlave
    , dumpBinLog
    , RowBinLogEvent(..)
    , decodeRowBinLogEvent
    -- * helpers
    , getLastBinLogTracker
    , isCheckSumEnabled
    , isSemiSyncEnabled
    -- * re-export
    , module Database.MySQL.BinLogProtocol.BinLogEvent
    , module Database.MySQL.BinLogProtocol.BinLogValue
    , module Database.MySQL.BinLogProtocol.BinLogMeta
    ) where

import           Control.Applicative
import           Control.Exception                         (throwIO)
import           Control.Monad
import           Data.Binary.Put
import           Data.ByteString                           (ByteString)
import           Data.IORef                                (IORef, newIORef,
                                                            readIORef,
                                                            writeIORef)
import           Data.Text.Encoding                        (encodeUtf8)
import           Data.Word
import           Database.MySQL.Base
import           Database.MySQL.BinLogProtocol.BinLogEvent
import           Database.MySQL.BinLogProtocol.BinLogMeta
import           Database.MySQL.BinLogProtocol.BinLogValue
import           Database.MySQL.Connection
import           GHC.Generics                              (Generic)
import           System.IO.Streams                         (InputStream)
import qualified System.IO.Streams                         as Stream

-- | Numeric identifier under which this client presents itself to the master;
-- it is passed to 'registerPesudoSlave' and 'dumpBinLog'.
type SlaveID = Word32

-- | A position inside the master's binary log: the binlog file name plus the
-- offset from which listening should start.
--
data BinLogTracker = BinLogTracker
    { btFileName :: {-# UNPACK #-} !ByteString
      -- ^ name of the binlog file
    , btNextPos  :: {-# UNPACK #-} !Word32
      -- ^ position inside that file
    } deriving (Show, Eq, Generic)

-- | Announce ourselves to the master as a pseudo slave with the given id.
--
-- The MySQL documentation suggests doing this before calling 'dumpBinLog',
-- although it does not seem to be strictly necessary.
--
-- Only the slave id is filled in; the remaining fields of the registration
-- command are sent as empty strings and zeros.
--
registerPesudoSlave :: MySQLConn -> SlaveID -> IO OK
registerPesudoSlave conn sid = command conn registration
  where
    registration = COM_REGISTER_SLAVE sid "" "" "" 0 0 0

-- | Set up binlog listening on the given connection. While listening,
-- the connection CAN NOT be used to run queries, or an 'UnconsumedResultSet' will be thrown.
--
-- The function sends the dump command, then skips forward to the first rotate
-- event (which provides the initial binlog file name) and to the format
-- description event, and finally returns a stream of the packets that follow.
--
-- Throws 'NetworkException' if the packet stream ends before those two initial
-- events were seen, 'ERRException' if the server answers with an ERR packet,
-- and an 'IOError' if a packet is neither OK, EOF nor ERR.
--
dumpBinLog :: MySQLConn               -- ^ connection to be listened
           -> SlaveID                 -- ^ a number for our pseudo slave.
           -> BinLogTracker           -- ^ binlog position
           -> Bool                    -- ^ if master supports semi-ack, do we want to enable it?
                                      -- if master doesn't support it, this parameter will be ignored.
           -> IO (FormatDescription, IORef ByteString, InputStream BinLogPacket)
                -- ^ 'FormatDescription', 'IORef' contains current binlog filename, 'BinLogPacket' stream.
dumpBinLog conn@(MySQLConn is wp _ consumed) sid (BinLogTracker initfn initpos) wantAck = do
    guardUnconsumed conn
    checksum <- isCheckSumEnabled conn
    -- When checksums are on, copy the master's global setting into this session.
    when checksum $ void $ execute_ conn "SET @master_binlog_checksum = @@global.binlog_checksum"
    semiAck  <- isSemiSyncEnabled conn
    -- Acks are only exchanged when the master has semi-sync on AND the caller asked for it.
    let needAck = semiAck && wantAck
    when needAck . void $ execute_ conn "SET @rpl_semi_sync_slave = 1"
    writeCommand (COM_BINLOG_DUMP initpos 0x00 sid initfn) wp
    -- The flag stays False until the packet stream below reports EOF.
    writeIORef consumed False

    -- The first rotate event carries the name of the binlog file being read.
    rp <- skipToPacketT (readBinLogPacket checksum needAck is) BINLOG_ROTATE_EVENT
    re <- getFromBinLogPacket getRotateEvent rp
    fref <- newIORef (rFileName re)

    p <- skipToPacketT (readBinLogPacket checksum needAck is) BINLOG_FORMAT_DESCRIPTION_EVENT
    replyAck needAck p fref wp
    fmt <- getFromBinLogPacket getFormatDescription p

    es <- Stream.makeInputStream $ do
        q <- readBinLogPacket checksum needAck is
        case q of
            Nothing -> writeIORef consumed True >> return Nothing
            Just q' -> do
                -- A rotate event switches the current binlog file name.
                when (blEventType q' == BINLOG_ROTATE_EVENT) $ do
                    e <- getFromBinLogPacket getRotateEvent q'
                    writeIORef' fref (rFileName e)
                replyAck needAck q' fref wp
                return q
    return (fmt, fref, es)
  where
    -- Keep reading packets until one of the requested event type shows up;
    -- a stream that ends first is reported as a 'NetworkException'.
    skipToPacketT iop typ = do
        p <- iop
        case p of
            Just p'
                | blEventType p' == typ -> return p'
                | otherwise             -> skipToPacketT iop typ
            Nothing -> throwIO NetworkException

    -- Send a semi-sync ack for the packet, but only when acks are in use and
    -- this particular packet asks for one.
    replyAck needAck p fref wp' = when (needAck && blSemiAck p) $ do
        fn <- readIORef fref
        wp' (makeSemiAckPacket (blLogPos p) fn)

    -- Ack payload: marker byte, log position, then the binlog file name.
    makeSemiAckPacket pos fn = putToPacket 0 $ do
        putWord8 0xEF      -- semi-ack
        putWord64le pos
        putByteString fn

    -- Read one packet from the wire: OK packets are decoded as binlog events,
    -- EOF ends the stream, ERR is decoded and rethrown as 'ERRException'.
    readBinLogPacket checksum needAck is' = do
        p <- readPacket is'
        if  | isOK p  -> Just <$> getFromPacket (getBinLogPacket checksum needAck) p
            | isEOF p -> return Nothing
            | isERR p -> decodeFromPacket p >>= throwIO . ERRException
            -- Without this branch an unrecognised packet would abort with an
            -- opaque "non-exhaustive guards" runtime error.
            -- NOTE(review): if the connection module offers a dedicated
            -- exception for unexpected packets, prefer throwing that here.
            | otherwise -> throwIO (userError "dumpBinLog: unexpected packet (neither OK, EOF nor ERR)")

-- | Events produced when decoding a row based binlog.
--
-- It is recommended to enable row query events before calling 'dumpBinLog', so
-- that 'RowQueryEvent's show up in the row based binlog (this matters, for
-- example, when you need to detect a table change). For more information
-- please refer to <http://dev.mysql.com/doc/refman/5.7/en/replication-options-binary-log.html#sysvar_binlog_rows_query_log_events sysvar_binlog_rows_query_log_events>
--
-- Every event carries a 'BinLogTracker' so that you can roll your own HA
-- solution, for example by writing the tracker to zookeeper once you are done
-- with an event.
--
-- The leading 'Word32' field is the timestamp recorded when the event was logged.
--
data RowBinLogEvent
    = RowQueryEvent
        {-# UNPACK #-} !Word32
        !BinLogTracker
        !QueryEvent'
    | RowDeleteEvent
        {-# UNPACK #-} !Word32
        !BinLogTracker
        !TableMapEvent
        !DeleteRowsEvent
    | RowWriteEvent
        {-# UNPACK #-} !Word32
        !BinLogTracker
        !TableMapEvent
        !WriteRowsEvent
    | RowUpdateEvent
        {-# UNPACK #-} !Word32
        !BinLogTracker
        !TableMapEvent
        !UpdateRowsEvent
  deriving (Show, Eq, Generic)

-- | Decode row based events from a 'BinLogPacket' stream, taking the triple
-- returned by 'dumpBinLog'.
--
-- Output is produced for rows-query events, and for table-map events that are
-- directly followed by a write, delete or update rows event. Every other
-- packet is skipped. Note that a table-map event whose follower is not a rows
-- event is discarded together with that follower.
decodeRowBinLogEvent :: (FormatDescription, IORef ByteString, InputStream BinLogPacket)
                     -> IO (InputStream RowBinLogEvent)
decodeRowBinLogEvent (fmt, fileRef, packets) = Stream.makeInputStream next
  where
    -- Pull packets until one of them yields an event or the input is exhausted.
    next = Stream.read packets >>= maybe (return Nothing) dispatch

    -- Decide what to do with a freshly read packet.
    dispatch pkt
        | kind == BINLOG_ROWS_QUERY_EVENT = do
            tracker <- track pkt fileRef
            ev <- getFromBinLogPacket getQueryEvent' pkt
            pure (Just (RowQueryEvent (blTimestamp pkt) tracker ev))
        | kind == BINLOG_TABLE_MAP_EVENT = do
            tme <- getFromBinLogPacket (getTableMapEvent fmt) pkt
            Stream.read packets >>= maybe (return Nothing) (afterTableMap tme)
        | otherwise = next
      where
        kind = blEventType pkt

    -- Handle the packet that directly follows a table-map event.
    afterTableMap tme pkt
        | kind `elem` [BINLOG_WRITE_ROWS_EVENTv1, BINLOG_WRITE_ROWS_EVENTv2] =
            rowsEvent RowWriteEvent getWriteRowEvent fmt fileRef tme pkt
        | kind `elem` [BINLOG_DELETE_ROWS_EVENTv1, BINLOG_DELETE_ROWS_EVENTv2] =
            rowsEvent RowDeleteEvent getDeleteRowEvent fmt fileRef tme pkt
        | kind `elem` [BINLOG_UPDATE_ROWS_EVENTv1, BINLOG_UPDATE_ROWS_EVENTv2] =
            rowsEvent RowUpdateEvent getUpdateRowEvent fmt fileRef tme pkt
        | otherwise = next
      where
        kind = blEventType pkt

    -- Shared shape of the three rows-event cases: record the tracker first,
    -- then decode the rows, then wrap both with the given constructor.
    -- Everything it needs is passed in explicitly so that it can be used at
    -- three different event types.
    rowsEvent wrap getter fd fref tme pkt = do
        tracker <- track pkt fref
        ev <- getFromBinLogPacket' (getter fd tme) pkt
        pure (Just (wrap (blTimestamp pkt) tracker tme ev))

    -- Current binlog file name paired with the packet's log position
    -- (narrowed to 'Word32').
    track pkt fref = do
        fileName <- readIORef fref
        return (BinLogTracker fileName (fromIntegral (blLogPos pkt)))

-- | Ask the master for its latest binlog file name and position.
--
-- Runs @SHOW MASTER STATUS@ and looks at the first row only. The result is
-- 'Nothing' when there is no row, or when the row does not start with a text
-- column followed by an unsigned 64-bit integer column. The position is
-- narrowed to 'Word32'.
--
getLastBinLogTracker :: MySQLConn -> IO (Maybe BinLogTracker)
getLastBinLogTracker conn = do
    (_, rows) <- query_ conn "SHOW MASTER STATUS"
    firstRow <- Stream.read rows
    -- Drain the rest of the result set before returning.
    Stream.skipToEof rows
    return (firstRow >>= toTracker)
  where
    toTracker (MySQLText file : MySQLInt64U offset : _) =
        Just (BinLogTracker (encodeUtf8 file) (fromIntegral offset))
    toTracker _ = Nothing

-- | Tell whether the server reports @binlog_checksum = CRC32@. Only for MySQL > 5.6
--
-- The answer is 'True' only when the first result row has exactly two columns
-- and the second one is the text @CRC32@.
--
isCheckSumEnabled :: MySQLConn -> IO Bool
isCheckSumEnabled conn = do
    (_, rows) <- query_ conn "SHOW GLOBAL VARIABLES LIKE 'binlog_checksum'"
    firstRow <- Stream.read rows
    -- Drain the rest of the result set before returning.
    Stream.skipToEof rows
    return (reportsCrc32 firstRow)
  where
    reportsCrc32 (Just [_, MySQLText "CRC32"]) = True
    reportsCrc32 _                             = False

-- | Tell whether the server reports @rpl_semi_sync_master_enabled = ON@. Only for MySQL > 5.5
--
-- The answer is 'True' only when the first result row has exactly two columns
-- and the second one is the text @ON@.
--
isSemiSyncEnabled :: MySQLConn -> IO Bool
isSemiSyncEnabled conn = do
    (_, rows) <- query_ conn "SHOW VARIABLES LIKE 'rpl_semi_sync_master_enabled'"
    firstRow <- Stream.read rows
    -- Drain the rest of the result set before returning.
    Stream.skipToEof rows
    return (reportsOn firstRow)
  where
    reportsOn (Just [_, MySQLText "ON"]) = True
    reportsOn _                          = False