{-# LANGUAGE DeriveGeneric #-}
{-# LANGUAGE DerivingStrategies #-}
{-# LANGUAGE GeneralisedNewtypeDeriving #-}
module Database.Franz.Internal.Protocol
  ( apiVersion
  , defaultPort
  , IndexName
  , StreamName(..)
  , streamNameToPath
  , FranzException(..)
  , RequestType(..)
  , ItemRef(..)
  , Query(..)
  , RawRequest(..)
  , ResponseId
  , ResponseHeader(..)
  , PayloadHeader(..)) where

import Control.Exception (Exception)
import qualified Data.ByteString.Char8 as B
import Data.Serialize hiding (getInt64le)
import Database.Franz.Internal.IO (getInt64le)
import Data.Hashable (Hashable)
import Data.String
import qualified Data.Text as T
import qualified Data.Text.Encoding as T
import Network.Socket (PortNumber)
import GHC.Generics (Generic)
import qualified Data.ByteString.FastBuilder as BB
import qualified Data.Vector as V

-- | API version identifier exported by this protocol module; currently the
-- one-byte string @\"0\"@.
apiVersion :: B.ByteString
apiVersion = B.pack "0"

-- | Default port number: 1886.
defaultPort :: PortNumber
defaultPort = 1886

-- | Name of an index, represented as a strict 'B.ByteString'.
type IndexName = B.ByteString

-- | Name of a stream, stored as a strict 'B.ByteString'.
--
-- All instances are reused from the underlying 'B.ByteString' via
-- @deriving newtype@, so a 'StreamName' shows, compares, hashes and
-- serialises exactly like the bytes it wraps.
newtype StreamName = StreamName { unStreamName :: B.ByteString }
  deriving newtype (Show, Eq, Ord, Hashable, Serialize)

-- | Convert a 'StreamName' to a 'FilePath' by decoding its bytes as UTF-8.
--
-- NOTE: 'T.decodeUtf8' is partial; it throws an exception when the bytes are
-- not valid UTF-8, so this function does too.
streamNameToPath :: StreamName -> FilePath
streamNameToPath = T.unpack . T.decodeUtf8 . unStreamName

-- | UTF-8 encoded: a 'String' literal is rendered to bytes with
-- 'BB.stringUtf8' and wrapped in 'StreamName'.
instance IsString StreamName where
  fromString = StreamName . BB.toStrictByteString . BB.stringUtf8

-- | Errors used by the protocol.
--
-- The type is 'Serialize' (it is carried inside 'ResponseError') and also an
-- 'Exception', so values can be thrown.
--
-- NOTE(review): the 'Serialize' instance comes from the 'Generic' defaults,
-- whose encoding is expected to depend on constructor order. Confirm before
-- reordering or inserting constructors.
data FranzException = MalformedRequest !String -- ^ message text
  | StreamNotFound !FilePath -- ^ path of the stream
  | IndexNotFound !IndexName ![IndexName] -- ^ an index name and a list of index names (presumably the missing one and the available ones; confirm)
  | InternalError !String -- ^ message text
  | ClientError !String -- ^ message text
  deriving (Show, Generic)
instance Serialize FranzException
instance Exception FranzException

-- | Kind of request carried in the 'reqType' field of a 'Query'.
--
-- NOTE(review): the 'Serialize' instance comes from the 'Generic' defaults,
-- whose encoding is expected to depend on constructor order. Confirm before
-- reordering constructors.
data RequestType = AllItems
  | LastItem
  | FirstItem
  deriving (Show, Eq, Generic)
instance Serialize RequestType

-- | Reference to an item, either by sequential number or by an index value.
-- Serialised through the 'Generic' defaults of 'Serialize'.
data ItemRef = BySeqNum !Int -- ^ sequential number
  | ByIndex !IndexName !Int -- ^ index name and value
  deriving (Show, Eq, Generic)
instance Serialize ItemRef

-- | A query against one stream, delimited by two 'ItemRef's.
-- Serialised through the 'Generic' defaults of 'Serialize'.
data Query = Query
  { reqStream :: !StreamName -- ^ stream to query
  , reqFrom :: !ItemRef -- ^ item reference the query starts from (presumably the beginning of the range; confirm inclusivity against the server)
  , reqTo :: !ItemRef -- ^ item reference the query runs to (presumably the end of the range; confirm inclusivity against the server)
  , reqType :: !RequestType -- ^ kind of request
  } deriving (Show, Eq, Generic)
instance Serialize Query

-- | Request message. Each constructor carries a 'ResponseId'.
-- Serialised through the 'Generic' defaults of 'Serialize'.
data RawRequest
  = RawRequest !ResponseId !Query -- ^ a query tagged with a response ID
  | RawClean !ResponseId -- ^ a response ID only (presumably asks to discard that request; confirm)
  deriving Generic
instance Serialize RawRequest

-- | Identifier carried by the 'RawRequest' and 'ResponseHeader' constructors.
type ResponseId = Int

-- | Header of a response. Every constructor carries the 'ResponseId' it
-- relates to. Serialised through the 'Generic' defaults of 'Serialize'.
data ResponseHeader = Response !ResponseId
    -- ^ response ID; there are items satisfying the query
    | ResponseWait !ResponseId -- ^ response ID; requested elements are not available right now
    | ResponseError !ResponseId !FranzException -- ^ response ID and the error; something went wrong
    deriving (Show, Generic)
instance Serialize ResponseHeader

-- | Header preceding a payload. Fields, in order: initial seqno, final seqno,
-- base offset, and index names.
data PayloadHeader = PayloadHeader !Int !Int !Int !(V.Vector IndexName)

-- | Hand-written encoding of 'PayloadHeader'.
--
-- Layout, in order:
--
-- 1. the three 'Int' fields, each written with 'putInt64le' and read back
--    with 'getInt64le';
-- 2. the number of index names, using the default 'Serialize' encoding of 'Int';
-- 3. each index name, using the 'Serialize' encoding of its 'B.ByteString'.
instance Serialize PayloadHeader where
  put (PayloadHeader s t u xs) = putI64 s *> putI64 t *> putI64 u
    *> put (V.length xs) *> mapM_ put xs where
    -- Widen to Int64 so the field is always eight bytes on the wire.
    putI64 :: Int -> Put
    putI64 = putInt64le . fromIntegral
  get = PayloadHeader <$> getInt64le <*> getInt64le <*> getInt64le
    <*> do
      -- NOTE(review): len is decoded from the input and passed to
      -- 'V.replicateM' without a bounds check.
      len <- get
      V.replicateM len get