-- Copyright (c) 2020-present, EMQX, Inc.
-- All rights reserved.
--
-- This source code is distributed under the terms of a MIT license,
-- found in the LICENSE file.

----------------------------------------------------------------------


{-# LANGUAGE FlexibleContexts #-}

-- | This module provides functions for handling data streaming communications.
--   For internal use only.
-- 

module Database.ClickHouseDriver.Block
  ( BlockInfo (..),
    writeInfo,
    readInfo,
    readBlockInputStream,
    Block (..),
    defaultBlockInfo,
    writeBlockOutputStream,
    defaultBlock
  )
where

import Database.ClickHouseDriver.Column
    (readColumn, writeColumn )
import Database.ClickHouseDriver.Defines as Defines
    ( _DBMS_MIN_REVISION_WITH_BLOCK_INFO )
import Database.ClickHouseDriver.Types
    ( writeBlockInfo,
      Block(..),
      BlockInfo(..),
      Context(Context),
      ServerInfo(revision),
      ClickhouseType )
import Database.ClickHouseDriver.IO.BufferedReader
    ( readBinaryInt32,
      readBinaryStr,
      readBinaryUInt8,
      readVarInt,
      Reader )
import Database.ClickHouseDriver.IO.BufferedWriter
    ( writeBinaryInt32,
      writeBinaryStr,
      writeBinaryUInt8,
      writeVarUInt,
      Writer )
import Data.ByteString ( ByteString )
import Data.ByteString.Builder ( Builder )
import           Data.Vector                        (Vector)
import           Data.Vector                        ((!))
import qualified Data.Vector                        as V
import           Data.Time.LocalTime          (CalendarDiffTime (..))
--Debug
--import           Debug.Trace

-- | Block information used when nothing else is known: the overflow flag is
--   cleared and the bucket number is @-1@.
defaultBlockInfo :: BlockInfo
defaultBlockInfo =
  Info
    { is_overflows = False
    , bucket_num = -1
    }

-- | A column-oriented block with no column descriptions and no column data,
--   carrying 'defaultBlockInfo' as its block information.
defaultBlock :: Block
defaultBlock =
  ColumnOrientedBlock
    { columns_with_type = V.empty
    , cdata = V.empty
    , info = defaultBlockInfo
    }

-- | Serialize block information into the output builder.
--
--   The fields are written as numbered entries: field number @1@ followed by
--   the overflow flag as a single byte (@1@ when set, @0@ otherwise), field
--   number @2@ followed by the bucket number as a 32-bit integer, and finally
--   field number @0@, which 'readInfo' treats as the end of the list.
writeInfo :: BlockInfo -> Writer Builder
writeInfo Info {is_overflows = overflows, bucket_num = bucket} = do
  -- Field 1: overflow flag.
  writeVarUInt 1
  writeBinaryUInt8 overflowByte
  -- Field 2: bucket number.
  writeVarUInt 2
  writeBinaryInt32 bucket
  -- Field 0: terminator.
  writeVarUInt 0
  where
    overflowByte
      | overflows = 1
      | otherwise = 0

-- | Read block information from the input stream.
--
--   The stream carries a sequence of numbered fields. Field @1@ is followed by
--   one byte holding the overflow flag (any non-zero byte means 'True'), and
--   field @2@ is followed by a 32-bit bucket number. Any other field number
--   ends the sequence.
--
--   The argument supplies the values for fields that do not occur in the
--   stream; each field that is read replaces the corresponding value before
--   reading continues.
readInfo :: BlockInfo -> Reader BlockInfo
readInfo current = do
  fieldNum <- readVarInt
  case fieldNum of
    1 -> do
      flag <- readBinaryUInt8
      readInfo current {is_overflows = flag /= 0}
    2 -> do
      bucket <- readBinaryInt32
      readInfo current {bucket_num = bucket}
    -- Anything else (the writer emits 0) terminates the field list.
    _ -> return current

-- | Read one block from the input stream in column-oriented form.
--
--   Consumed in order:
--
--   1. the block information, but only when the server revision is at least
--      '_DBMS_MIN_REVISION_WITH_BLOCK_INFO' (the same condition
--      'writeBlockOutputStream' uses before writing it); otherwise
--      'defaultBlockInfo' is used and nothing is read;
--   2. the number of columns, then the number of rows;
--   3. for every column: its name, its type name, and its values (decoded by
--      'readColumn' using the row count and the type name).
--
--   The resulting block stores the values per column in 'cdata' and the
--   @(name, type)@ pairs, in the same order, in 'columns_with_type'.
readBlockInputStream :: ServerInfo -> Reader Block
readBlockInputStream server_info = do
  let serverRevision = fromIntegral (revision server_info)
  blockInfo <-
    if serverRevision >= Defines._DBMS_MIN_REVISION_WITH_BLOCK_INFO
      then readInfo defaultBlockInfo
      else return defaultBlockInfo
  n_columns <- readVarInt
  n_rows <- readVarInt
  let rowCount = fromIntegral n_rows
      -- Reads a single column: header (name, type) followed by its values.
      readOneColumn = do
        column_name <- readBinaryStr
        column_type <- readBinaryStr
        column <- readColumn server_info rowCount column_type
        return (column, column_name, column_type)
  triples <- V.replicateM (fromIntegral n_columns) readOneColumn
  let (datas, names, types) = V.unzip3 triples
  return
    ColumnOrientedBlock
      { cdata = datas,
        info = blockInfo,
        columns_with_type = V.zip names types
      }

-- | Serialize a column-oriented block into the output builder.
--
--   Written in order:
--
--   1. the block information via 'writeBlockInfo', but only when the server
--      revision is at least '_DBMS_MIN_REVISION_WITH_BLOCK_INFO';
--   2. the number of columns (length of 'cdata'), then the number of rows
--      (length of the first column, or @0@ when the block has no columns) --
--      the same order in which 'readBlockInputStream' reads them;
--   3. for every entry of 'columns_with_type': the column name, the type name,
--      and the column's values via 'writeColumn'.
--
--   Calls 'error' when the context carries no server information, because the
--   revision is needed to decide whether block information is written.
writeBlockOutputStream :: Context -> Block -> Writer Builder
writeBlockOutputStream ctx@(Context _ server_info _)
  (ColumnOrientedBlock columns_with_type cdata info) = do
  let serverRevision =
        fromIntegral $
          maybe
            (error "writeBlockOutputStream: no server info in the connection context")
            revision
            server_info
  if serverRevision >= Defines._DBMS_MIN_REVISION_WITH_BLOCK_INFO
    then writeBlockInfo info
    else return ()
  -- 'cdata' holds one vector per column, so its length is the column count
  -- and the length of any single column is the row count.
  let columnCount = fromIntegral $ V.length cdata
      rowCount = fromIntegral $ maybe 0 V.length (cdata V.!? 0)
  writeVarUInt columnCount
  writeVarUInt rowCount
  V.imapM_
    ( \i (col, t) -> do
        writeBinaryStr col
        writeBinaryStr t
        -- The i-th description belongs to the i-th data column.
        writeColumn ctx col t (cdata ! i)
    )
    columns_with_type