{-# LANGUAGE CPP #-} {-# LANGUAGE DeriveGeneric #-} {-# LANGUAGE GeneralizedNewtypeDeriving #-} {-# LANGUAGE LambdaCase #-} {-# LANGUAGE OverloadedStrings #-} {-# LANGUAGE ScopedTypeVariables #-} module Kafka.Avro.SchemaRegistry ( schemaRegistry, loadSchema, sendSchema , loadSubjectSchema , getGlobalConfig, getSubjectConfig , getVersions, isCompatible , getSubjects , SchemaId(..), Subject(..) , SchemaRegistry, SchemaRegistryError(..) , Schema(..) , Compatibility(..), Version(..) ) where import Control.Arrow (first) import Control.Exception (throwIO) import Control.Lens (view, (&), (.~), (^.)) import Control.Monad (void) import Control.Monad.IO.Class (MonadIO, liftIO) import Data.Aeson import Data.Aeson.Types (typeMismatch) import Data.Avro.Schema.Schema (Schema (..), typeName) import Data.Bifunctor (bimap) import Data.Cache as C import Data.Hashable (Hashable) import qualified Data.HashMap.Lazy as HM import Data.Int (Int32) import Data.String (IsString) import Data.Text (Text, append, cons, unpack) import qualified Data.Text.Encoding as Text import qualified Data.Text.Lazy.Encoding as LText import Data.Word (Word32) import GHC.Exception (SomeException, displayException, fromException) import GHC.Generics (Generic) import Network.HTTP.Client (HttpException (..), HttpExceptionContent (..), Manager, defaultManagerSettings, newManager) import qualified Network.Wreq as Wreq newtype SchemaId = SchemaId { unSchemaId :: Int32} deriving (Eq, Ord, Show, Hashable) newtype SchemaName = SchemaName Text deriving (Eq, Ord, IsString, Show, Hashable) newtype Subject = Subject { unSubject :: Text} deriving (Eq, Show, IsString, Ord, Generic, Hashable) newtype RegisteredSchema = RegisteredSchema { unRegisteredSchema :: Schema} deriving (Generic, Show) newtype Version = Version { unVersion :: Word32 } deriving (Eq, Ord, Show, Hashable) data Compatibility = NoCompatibility | FullCompatibility | ForwardCompatibility | BackwardCompatibility deriving (Eq, Show, Ord) data SchemaRegistry = SchemaRegistry { srCache :: Cache SchemaId Schema , srReverseCache :: Cache (Subject, SchemaName) SchemaId , srBaseUrl :: String } data SchemaRegistryError = SchemaRegistryConnectError String | SchemaRegistryLoadError SchemaId | SchemaRegistrySchemaNotFound SchemaId | SchemaRegistrySendError String deriving (Show, Eq) schemaRegistry :: MonadIO m => String -> m SchemaRegistry schemaRegistry url = liftIO $ SchemaRegistry <$> newCache Nothing <*> newCache Nothing <*> pure url loadSchema :: MonadIO m => SchemaRegistry -> SchemaId -> m (Either SchemaRegistryError Schema) loadSchema sr sid = do sc <- cachedSchema sr sid case sc of Just s -> return (Right s) Nothing -> liftIO $ do res <- getSchemaById (srBaseUrl sr) sid traverse ((\schema -> schema <$ cacheSchema sr sid schema) . unRegisteredSchema) res loadSubjectSchema :: MonadIO m => SchemaRegistry -> Subject -> Version -> m (Either SchemaRegistryError Schema) loadSubjectSchema sr (Subject sbj) (Version version) = do let url = (srBaseUrl sr) ++ "/subjects/" ++ unpack sbj ++ "/versions/" ++ show version resp <- liftIO $ Wreq.getWith wreqOpts url wrapped <- pure $ bimap wrapError (view Wreq.responseBody) (Wreq.asValue resp) schema <- getData "schema" wrapped schemaId <- getData "id" wrapped case (,) <$> schema <*> schemaId of Left err -> return $ Left err Right ((RegisteredSchema schema), schemaId) -> cacheSchema sr schemaId schema *> (return $ Right schema) where getData :: (MonadIO m, FromJSON a) => String -> Either e Value -> m (Either e a) getData key = either (pure . Left) (viewData key) viewData :: (MonadIO m, FromJSON a) => String -> Value -> m (Either e a) viewData key value = liftIO $ either (throwIO . Wreq.JSONError) (return . return) (toData value) toData :: FromJSON a => Value -> Either String a toData value = case fromJSON value of Success a -> Right a Error e -> Left e sendSchema :: MonadIO m => SchemaRegistry -> Subject -> Schema -> m (Either SchemaRegistryError SchemaId) sendSchema sr subj sc = do sid <- cachedId sr subj schemaName case sid of Just sid' -> return (Right sid') Nothing -> do res <- liftIO $ putSchema (srBaseUrl sr) subj (RegisteredSchema sc) void $ traverse (cacheId sr subj schemaName) res void $ traverse (\sid' -> cacheSchema sr sid' sc) res pure res where schemaName = fullTypeName sc getVersions :: MonadIO m => SchemaRegistry -> Subject -> m (Either SchemaRegistryError [Version]) getVersions sr (Subject sbj) = do let url = (srBaseUrl sr) ++ "/subjects/" ++ unpack sbj ++ "/versions" resp <- liftIO $ Wreq.getWith wreqOpts url pure $ bimap wrapError (fmap Version . view Wreq.responseBody) (Wreq.asJSON resp) isCompatible :: MonadIO m => SchemaRegistry -> Subject -> Version -> Schema -> m (Either SchemaRegistryError Bool) isCompatible sr (Subject sbj) (Version version) schema = do let url = (srBaseUrl sr) ++ "/compatibility/subjects/" ++ unpack sbj ++ "/versions/" ++ show version resp <- liftIO $ Wreq.postWith wreqOpts url (toJSON $ RegisteredSchema schema) wrapped <- pure $ bimap wrapError (view Wreq.responseBody) (Wreq.asValue resp) either (return . Left) getCompatibility wrapped where getCompatibility :: MonadIO m => Value -> m (Either e Bool) getCompatibility = liftIO . maybe (throwIO $ Wreq.JSONError "Missing key 'is_compatible' in Schema Registry response") (return . return) . viewCompatibility viewCompatibility :: Value -> Maybe Bool viewCompatibility (Object obj) = HM.lookup "is_compatible" obj >>= toBool viewCompatibility _ = Nothing toBool :: Value -> Maybe Bool toBool (Bool b) = Just b toBool _ = Nothing getGlobalConfig :: MonadIO m => SchemaRegistry -> m (Either SchemaRegistryError Compatibility) getGlobalConfig sr = do let url = (srBaseUrl sr) ++ "/config" resp <- liftIO $ Wreq.getWith wreqOpts url pure $ bimap wrapError (view Wreq.responseBody) (Wreq.asJSON resp) getSubjectConfig :: MonadIO m => SchemaRegistry -> Subject -> m (Either SchemaRegistryError Compatibility) getSubjectConfig sr (Subject sbj) = do let url = (srBaseUrl sr) ++ "/config/" ++ unpack sbj resp <- liftIO $ Wreq.getWith wreqOpts url pure $ bimap wrapError (view Wreq.responseBody) (Wreq.asJSON resp) getSubjects :: MonadIO m => SchemaRegistry -> m (Either SchemaRegistryError [Subject]) getSubjects sr = do let url = (srBaseUrl sr) ++ "/subjects" resp <- liftIO $ Wreq.getWith wreqOpts url pure $ bimap wrapError (fmap Subject . view Wreq.responseBody) (Wreq.asJSON resp) ------------------ PRIVATE: HELPERS -------------------------------------------- wreqOpts :: Wreq.Options wreqOpts = let accept = ["application/vnd.schemaregistry.v1+json", "application/vnd.schemaregistry+json", "application/json"] in Wreq.defaults & Wreq.header "Accept" .~ accept getSchemaById :: String -> SchemaId -> IO (Either SchemaRegistryError RegisteredSchema) getSchemaById baseUrl sid@(SchemaId i) = do let schemaUrl = baseUrl ++ "/schemas/ids/" ++ show i resp <- Wreq.getWith wreqOpts schemaUrl pure $ bimap (const (SchemaRegistryLoadError sid)) (view Wreq.responseBody) (Wreq.asJSON resp) putSchema :: String -> Subject -> RegisteredSchema -> IO (Either SchemaRegistryError SchemaId) putSchema baseUrl (Subject sbj) schema = do let schemaUrl = baseUrl ++ "/subjects/" ++ unpack sbj ++ "/versions" resp <- Wreq.postWith wreqOpts schemaUrl (toJSON schema) pure $ bimap wrapError (view Wreq.responseBody) (Wreq.asJSON resp) fromHttpError :: HttpException -> (HttpExceptionContent -> SchemaRegistryError) -> SchemaRegistryError fromHttpError err f = case err of InvalidUrlException fld err' -> SchemaRegistryConnectError (fld ++ ": " ++ err') HttpExceptionRequest _ (ConnectionFailure err) -> SchemaRegistryConnectError (displayException err) HttpExceptionRequest _ ConnectionTimeout -> SchemaRegistryConnectError (displayException err) HttpExceptionRequest _ ProxyConnectException{} -> SchemaRegistryConnectError (displayException err) HttpExceptionRequest _ ConnectionClosed -> SchemaRegistryConnectError (displayException err) HttpExceptionRequest _ (InvalidDestinationHost _) -> SchemaRegistryConnectError (displayException err) HttpExceptionRequest _ TlsNotSupported -> SchemaRegistryConnectError (displayException err) #if MIN_VERSION_http_client(0,5,7) HttpExceptionRequest _ (InvalidProxySettings _) -> SchemaRegistryConnectError (displayException err) #endif HttpExceptionRequest _ err' -> f err' wrapError :: SomeException -> SchemaRegistryError wrapError someErr = case fromException someErr of Nothing -> SchemaRegistrySendError (displayException someErr) Just httpErr -> fromHttpError httpErr (\_ -> SchemaRegistrySendError (displayException someErr)) --------------------------------------------------------------------- fullTypeName :: Schema -> SchemaName fullTypeName r = SchemaName $ typeName r cachedSchema :: MonadIO m => SchemaRegistry -> SchemaId -> m (Maybe Schema) cachedSchema sr k = liftIO $ C.lookup (srCache sr) k {-# INLINE cachedSchema #-} cacheSchema :: MonadIO m => SchemaRegistry -> SchemaId -> Schema -> m () cacheSchema sr k v = liftIO $ C.insert (srCache sr) k v {-# INLINE cacheSchema #-} cachedId :: MonadIO m => SchemaRegistry -> Subject -> SchemaName -> m (Maybe SchemaId) cachedId sr subj scn = liftIO $ C.lookup (srReverseCache sr) (subj, scn) {-# INLINE cachedId #-} cacheId :: MonadIO m => SchemaRegistry -> Subject -> SchemaName -> SchemaId -> m () cacheId sr subj scn sid = liftIO $ C.insert (srReverseCache sr) (subj, scn) sid {-# INLINE cacheId #-} instance FromJSON RegisteredSchema where parseJSON (Object v) = withObject "expected schema" (\obj -> do sch <- obj .: "schema" maybe mempty (return . RegisteredSchema) (decode $ LText.encodeUtf8 sch) ) (Object v) parseJSON _ = mempty instance ToJSON RegisteredSchema where toJSON (RegisteredSchema v) = object ["schema" .= LText.decodeUtf8 (encode $ toJSON v)] instance FromJSON SchemaId where parseJSON (Object v) = SchemaId <$> v .: "id" parseJSON _ = mempty instance FromJSON Compatibility where parseJSON = withObject "Compatibility" $ \v -> do compatibility <- v .: "compatibilityLevel" case compatibility of "NONE" -> return $ NoCompatibility "FULL" -> return $ FullCompatibility "FORWARD" -> return $ ForwardCompatibility "BACKWARD" -> return $ BackwardCompatibility _ -> typeMismatch "Compatibility" compatibility