{-# LANGUAGE CPP #-}
{-# LANGUAGE DeriveGeneric #-}
{-# LANGUAGE GeneralizedNewtypeDeriving #-}
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE ScopedTypeVariables #-}
module Kafka.Avro.SchemaRegistry
( schemaRegistry, loadSchema, sendSchema
, loadSubjectSchema
, getGlobalConfig, getSubjectConfig
, getVersions, isCompatible
, getSubjects
, SchemaId(..), Subject(..)
, SchemaRegistry, SchemaRegistryError(..)
, Schema(..)
, Compatibility(..), Version(..)
) where
import Control.Arrow (first)
import Control.Exception (throwIO)
import Control.Lens (view, (&), (.~), (^.))
import Control.Monad (void)
import Control.Monad.IO.Class (MonadIO, liftIO)
import Data.Aeson
import Data.Aeson.Types (typeMismatch)
import Data.Avro.Schema.Schema (Schema (..), typeName)
import Data.Bifunctor (bimap)
import Data.Cache as C
import Data.Hashable (Hashable)
import qualified Data.HashMap.Lazy as HM
import Data.Int (Int32)
import Data.String (IsString)
import Data.Text (Text, append, cons, unpack)
import qualified Data.Text.Encoding as Text
import qualified Data.Text.Lazy.Encoding as LText
import Data.Word (Word32)
import GHC.Exception (SomeException, displayException, fromException)
import GHC.Generics (Generic)
import Network.HTTP.Client (HttpException (..), HttpExceptionContent (..), Manager, defaultManagerSettings, newManager)
import qualified Network.Wreq as Wreq
-- | Identifier of a schema in the registry (decoded from the @id@ field of
-- registry responses).
newtype SchemaId = SchemaId { unSchemaId :: Int32 }
  deriving (Eq, Ord, Show, Hashable)
-- | Full type name of an Avro schema, as produced by 'fullTypeName'; used as
-- part of the reverse-cache key.
newtype SchemaName = SchemaName Text
  deriving (Eq, Ord, Show, IsString, Hashable)
-- | Registry subject name; spliced into the @/subjects/...@ request paths.
newtype Subject = Subject { unSubject :: Text }
  deriving (Eq, Ord, Show, IsString, Generic, Hashable)
-- | Wire wrapper for a 'Schema': its JSON form is an object whose @schema@
-- field holds the Avro schema serialised as a string.
newtype RegisteredSchema = RegisteredSchema { unRegisteredSchema :: Schema }
  deriving (Show, Generic)
-- | Version number of a schema within a subject.
newtype Version = Version { unVersion :: Word32 }
  deriving (Eq, Ord, Show, Hashable)
-- | Compatibility level of the registry or of a subject, as decoded from the
-- registry's @compatibilityLevel@ field.
--
-- NOTE(review): only NONE, FULL, FORWARD and BACKWARD are decoded by the
-- 'FromJSON' instance; any other level string is rejected — confirm whether
-- further (e.g. transitive) levels need to be represented.
data Compatibility
  = NoCompatibility        -- ^ @NONE@
  | FullCompatibility      -- ^ @FULL@
  | ForwardCompatibility   -- ^ @FORWARD@
  | BackwardCompatibility  -- ^ @BACKWARD@
  deriving (Eq, Ord, Show)
-- | Client handle for a schema registry: its base URL plus two in-memory
-- lookup caches.
data SchemaRegistry = SchemaRegistry
  { srCache        :: Cache SchemaId Schema
    -- ^ Schemas keyed by their registry id.
  , srReverseCache :: Cache (Subject, SchemaName) SchemaId
    -- ^ Schema ids keyed by subject and full schema type name.
  , srBaseUrl      :: String
    -- ^ Base URL that every request path is appended to.
  }
-- | Failures reported by the schema registry client.
data SchemaRegistryError
  = SchemaRegistryConnectError String
    -- ^ Invalid URL or connection-level HTTP failure (see 'fromHttpError').
  | SchemaRegistryLoadError SchemaId
    -- ^ The schema with this id could not be decoded from the registry response.
  | SchemaRegistrySchemaNotFound SchemaId
    -- ^ Schema not found. NOTE(review): not constructed by the code visible here.
  | SchemaRegistrySendError String
    -- ^ Any other failure, carrying the rendered exception message.
  deriving (Show, Eq)
-- | Create a client for the schema registry at the given base URL.
--
-- Both lookup caches start empty and are created with @newCache Nothing@.
schemaRegistry :: MonadIO m => String -> m SchemaRegistry
schemaRegistry url = liftIO $ do
  byId <- newCache Nothing
  bySubjectAndName <- newCache Nothing
  pure (SchemaRegistry byId bySubjectAndName url)
-- | Fetch the schema registered under the given id.
--
-- The local id cache is consulted first. On a miss the schema is requested
-- via 'getSchemaById' and, if that succeeds, stored in the cache before being
-- returned. A response that cannot be decoded is reported as
-- 'SchemaRegistryLoadError'.
--
-- NOTE(review): exceptions raised by the HTTP request itself are not caught
-- here.
loadSchema :: MonadIO m => SchemaRegistry -> SchemaId -> m (Either SchemaRegistryError Schema)
loadSchema sr sid =
  cachedSchema sr sid >>= \case
    Just hit -> pure (Right hit)
    Nothing -> liftIO $ do
      fetched <- getSchemaById (srBaseUrl sr) sid
      traverse (\(RegisteredSchema schema) -> schema <$ cacheSchema sr sid schema) fetched
-- | Fetch the schema registered for a subject at a specific version, via
-- @GET /subjects/{subject}/versions/{version}@.
--
-- The response object is expected to carry both the schema (in its @schema@
-- field) and the schema id (in its @id@ field). On success the schema is
-- added to the id cache, so later 'loadSchema' calls for that id are served
-- locally.
--
-- A body that is not JSON, or that lacks a decodable @schema@ or @id@ field,
-- is returned as a 'Left' value instead of being thrown.
--
-- NOTE(review): the subject is spliced into the URL without percent-encoding,
-- and exceptions raised by the HTTP request itself are not caught here.
loadSubjectSchema :: MonadIO m => SchemaRegistry -> Subject -> Version -> m (Either SchemaRegistryError Schema)
loadSubjectSchema sr (Subject sbj) (Version version) = do
  let url = srBaseUrl sr ++ "/subjects/" ++ unpack sbj ++ "/versions/" ++ show version
  resp <- liftIO $ Wreq.getWith wreqOpts url
  let decoded = bimap wrapError (view Wreq.responseBody) (Wreq.asValue resp) >>= decodeBody
  case decoded of
    Left err -> pure (Left err)
    Right (RegisteredSchema schema, schemaId) -> Right schema <$ cacheSchema sr schemaId schema
  where
    -- The schema and its id live in the same JSON object; the
    -- 'RegisteredSchema' and 'SchemaId' instances each read their own field.
    decodeBody :: Value -> Either SchemaRegistryError (RegisteredSchema, SchemaId)
    decodeBody value = (,) <$> fromValue value <*> fromValue value

    -- Decode failures are reported the way 'wrapError' reports a non-HTTP
    -- exception (here a 'Wreq.JSONError'), rather than being thrown.
    fromValue :: FromJSON a => Value -> Either SchemaRegistryError a
    fromValue value = case fromJSON value of
      Success a -> Right a
      Error e -> Left (SchemaRegistrySendError (displayException (Wreq.JSONError e)))
-- | Register a schema under a subject and return the id the registry assigns.
--
-- Ids are cached per (subject, full type name of the schema). A cache hit is
-- returned without contacting the registry. After a successful registration
-- the id is cached under that key, and the schema is cached under the id.
-- A failed registration is returned as-is and nothing is cached.
--
-- NOTE(review): the cache key contains only the schema's full type name, so
-- a different schema with the same name sent to the same subject (e.g. an
-- evolved record) gets the id cached for the earlier schema.
sendSchema :: MonadIO m => SchemaRegistry -> Subject -> Schema -> m (Either SchemaRegistryError SchemaId)
sendSchema sr subj sc =
  cachedId sr subj key >>= \case
    Just known -> pure (Right known)
    Nothing -> do
      registered <- liftIO $ putSchema (srBaseUrl sr) subj (RegisteredSchema sc)
      case registered of
        Left _ -> pure ()
        Right newId -> do
          cacheId sr subj key newId
          cacheSchema sr newId sc
      pure registered
  where
    key = fullTypeName sc
-- | List the schema versions registered for a subject, via
-- @GET /subjects/{subject}/versions@.
--
-- Body decoding failures are converted with 'wrapError'.
getVersions :: MonadIO m => SchemaRegistry -> Subject -> m (Either SchemaRegistryError [Version])
getVersions sr (Subject sbj) = do
  resp <- liftIO $ Wreq.getWith wreqOpts versionsUrl
  pure $ case Wreq.asJSON resp of
    Left err -> Left (wrapError err)
    Right parsed -> Right (map Version (parsed ^. Wreq.responseBody))
  where
    versionsUrl = srBaseUrl sr ++ "/subjects/" ++ unpack sbj ++ "/versions"
-- | Ask the registry whether a schema is compatible with a given version of a
-- subject, via @POST /compatibility/subjects/{subject}/versions/{version}@.
--
-- The answer is read from the boolean @is_compatible@ field of the response.
-- A body that is not JSON is converted with 'wrapError'. A response without a
-- boolean @is_compatible@ field is returned as a 'Left' value instead of being
-- thrown.
--
-- NOTE(review): exceptions raised by the HTTP request itself are not caught
-- here.
isCompatible :: MonadIO m => SchemaRegistry -> Subject -> Version -> Schema -> m (Either SchemaRegistryError Bool)
isCompatible sr (Subject sbj) (Version version) schema = do
  let url = srBaseUrl sr ++ "/compatibility/subjects/" ++ unpack sbj ++ "/versions/" ++ show version
  resp <- liftIO $ Wreq.postWith wreqOpts url (toJSON $ RegisteredSchema schema)
  pure $ do
    body <- bimap wrapError (view Wreq.responseBody) (Wreq.asValue resp)
    maybe (Left missingKey) Right (viewCompatibility body)
  where
    -- Reported the way 'wrapError' would report the equivalent 'Wreq.JSONError'.
    missingKey :: SchemaRegistryError
    missingKey =
      SchemaRegistrySendError
        (displayException (Wreq.JSONError "Missing key 'is_compatible' in Schema Registry response"))

    viewCompatibility :: Value -> Maybe Bool
    viewCompatibility (Object obj) = HM.lookup "is_compatible" obj >>= toBool
    viewCompatibility _ = Nothing

    toBool :: Value -> Maybe Bool
    toBool (Bool b) = Just b
    toBool _ = Nothing
-- | Read the registry-wide compatibility level via @GET /config@.
--
-- Body decoding failures are converted with 'wrapError'.
getGlobalConfig :: MonadIO m => SchemaRegistry -> m (Either SchemaRegistryError Compatibility)
getGlobalConfig sr = do
  resp <- liftIO (Wreq.getWith wreqOpts (srBaseUrl sr ++ "/config"))
  pure (either (Left . wrapError) (Right . view Wreq.responseBody) (Wreq.asJSON resp))
-- | Read the compatibility level configured for one subject via
-- @GET /config/{subject}@.
--
-- Body decoding failures are converted with 'wrapError'.
getSubjectConfig :: MonadIO m => SchemaRegistry -> Subject -> m (Either SchemaRegistryError Compatibility)
getSubjectConfig sr (Subject sbj) =
  liftIO (Wreq.getWith wreqOpts configUrl) >>= \resp ->
    pure $ case Wreq.asJSON resp of
      Left err -> Left (wrapError err)
      Right parsed -> Right (parsed ^. Wreq.responseBody)
  where
    configUrl = srBaseUrl sr ++ "/config/" ++ unpack sbj
-- | List every subject known to the registry via @GET /subjects@.
--
-- Body decoding failures are converted with 'wrapError'.
getSubjects :: MonadIO m => SchemaRegistry -> m (Either SchemaRegistryError [Subject])
getSubjects sr = do
  resp <- liftIO $ Wreq.getWith wreqOpts (srBaseUrl sr ++ "/subjects")
  pure $ case Wreq.asJSON resp of
    Left err -> Left (wrapError err)
    Right parsed -> Right (Subject <$> parsed ^. Wreq.responseBody)
-- | Request options shared by every registry call: wreq defaults plus an
-- @Accept@ header listing the registry media types, then plain JSON.
wreqOpts :: Wreq.Options
wreqOpts = Wreq.defaults & Wreq.header "Accept" .~ acceptedTypes
  where
    acceptedTypes =
      [ "application/vnd.schemaregistry.v1+json"
      , "application/vnd.schemaregistry+json"
      , "application/json"
      ]
-- | Request @{baseUrl}/schemas/ids/{id}@ and decode the body as a
-- 'RegisteredSchema'.
--
-- Any decoding failure is collapsed into 'SchemaRegistryLoadError' for the
-- requested id; the underlying error detail is discarded.
getSchemaById :: String -> SchemaId -> IO (Either SchemaRegistryError RegisteredSchema)
getSchemaById baseUrl sid@(SchemaId i) = do
  resp <- Wreq.getWith wreqOpts (baseUrl ++ "/schemas/ids/" ++ show i)
  pure $ case Wreq.asJSON resp of
    Left _ -> Left (SchemaRegistryLoadError sid)
    Right parsed -> Right (parsed ^. Wreq.responseBody)
-- | POST a schema to @{baseUrl}/subjects/{subject}/versions@ and decode the
-- returned id.
--
-- Body decoding failures are converted with 'wrapError'.
putSchema :: String -> Subject -> RegisteredSchema -> IO (Either SchemaRegistryError SchemaId)
putSchema baseUrl (Subject sbj) schema = do
  let versionsUrl = concat [baseUrl, "/subjects/", unpack sbj, "/versions"]
  resp <- Wreq.postWith wreqOpts versionsUrl (toJSON schema)
  pure $ case Wreq.asJSON resp of
    Left err -> Left (wrapError err)
    Right parsed -> Right (parsed ^. Wreq.responseBody)
-- | Classify an 'HttpException'.
--
-- The following become 'SchemaRegistryConnectError':
--
-- * invalid URLs (reported as @field: reason@);
-- * connection failures (reported with the wrapped cause's message);
-- * timeouts, proxy-connect failures, closed connections, invalid
--   destination hosts and missing TLS support, plus invalid proxy settings
--   on http-client >= 0.5.7 (reported with the whole exception's message).
--
-- Every other request failure is handed to the supplied continuation.
fromHttpError :: HttpException -> (HttpExceptionContent -> SchemaRegistryError) -> SchemaRegistryError
fromHttpError httpErr onOther =
  case httpErr of
    InvalidUrlException url reason ->
      SchemaRegistryConnectError (url ++ ": " ++ reason)
    HttpExceptionRequest _ content ->
      case content of
        ConnectionFailure cause -> SchemaRegistryConnectError (displayException cause)
        ConnectionTimeout -> connectErr
        ProxyConnectException{} -> connectErr
        ConnectionClosed -> connectErr
        InvalidDestinationHost _ -> connectErr
        TlsNotSupported -> connectErr
#if MIN_VERSION_http_client(0,5,7)
        InvalidProxySettings _ -> connectErr
#endif
        other -> onOther other
  where
    connectErr = SchemaRegistryConnectError (displayException httpErr)
-- | Convert an arbitrary exception into a 'SchemaRegistryError'.
--
-- HTTP exceptions are classified by 'fromHttpError'. Everything else,
-- including HTTP failures that are not connection problems, becomes
-- 'SchemaRegistrySendError' carrying the exception's 'displayException' text.
wrapError :: SomeException -> SchemaRegistryError
wrapError someErr =
  maybe fallback (\httpErr -> fromHttpError httpErr (const fallback)) (fromException someErr)
  where
    fallback = SchemaRegistrySendError (displayException someErr)
-- | The schema's 'typeName', wrapped as a 'SchemaName'.
fullTypeName :: Schema -> SchemaName
fullTypeName = SchemaName . typeName
-- | Look a schema up in the id cache.
cachedSchema :: MonadIO m => SchemaRegistry -> SchemaId -> m (Maybe Schema)
cachedSchema sr = liftIO . C.lookup (srCache sr)
{-# INLINE cachedSchema #-}
-- | Store a schema in the id cache.
cacheSchema :: MonadIO m => SchemaRegistry -> SchemaId -> Schema -> m ()
cacheSchema sr k = liftIO . C.insert (srCache sr) k
{-# INLINE cacheSchema #-}
-- | Look up the id cached for a (subject, schema name) pair.
cachedId :: MonadIO m => SchemaRegistry -> Subject -> SchemaName -> m (Maybe SchemaId)
cachedId sr subj scn =
  let key = (subj, scn)
  in liftIO (C.lookup (srReverseCache sr) key)
{-# INLINE cachedId #-}
-- | Remember the id assigned to a (subject, schema name) pair.
cacheId :: MonadIO m => SchemaRegistry -> Subject -> SchemaName -> SchemaId -> m ()
cacheId sr subj scn sid =
  let key = (subj, scn)
  in liftIO (C.insert (srReverseCache sr) key sid)
{-# INLINE cacheId #-}
-- | Decode a registry response object that carries an Avro schema in its
-- @schema@ field.
--
-- The schema arrives as a JSON document serialised into a string (the
-- 'ToJSON' instance produces the same shape). The field is therefore read as
-- text and decoded a second time. Both a non-object input and an undecodable
-- schema string fail with a descriptive message instead of @mempty@.
instance FromJSON RegisteredSchema where
  parseJSON = withObject "RegisteredSchema" $ \obj -> do
    sch <- obj .: "schema"
    case eitherDecode (LText.encodeUtf8 sch) of
      Left err -> fail ("RegisteredSchema: invalid Avro schema in \"schema\" field: " ++ err)
      Right schema -> pure (RegisteredSchema schema)
-- | Encode as an object whose @schema@ field holds the Avro schema rendered to
-- a JSON string.
instance ToJSON RegisteredSchema where
  toJSON (RegisteredSchema sc) =
    let rendered = LText.decodeUtf8 (encode (toJSON sc))
    in object ["schema" .= rendered]
-- | Decode a schema id from the @id@ field of a registry response object.
-- Non-object input fails with a type-mismatch message rather than @mempty@.
instance FromJSON SchemaId where
  parseJSON = withObject "SchemaId" $ \obj -> SchemaId <$> obj .: "id"
-- | Decode the @compatibilityLevel@ field of a registry config object.
-- Values other than the four supported level strings are rejected with a
-- type-mismatch error.
instance FromJSON Compatibility where
  parseJSON = withObject "Compatibility" $ \obj ->
    obj .: "compatibilityLevel" >>= \case
      String "NONE"     -> pure NoCompatibility
      String "FULL"     -> pure FullCompatibility
      String "FORWARD"  -> pure ForwardCompatibility
      String "BACKWARD" -> pure BackwardCompatibility
      other             -> typeMismatch "Compatibility" other