{-# LANGUAGE DeriveGeneric #-} {-# LANGUAGE GeneralizedNewtypeDeriving #-} {-# LANGUAGE LambdaCase #-} {-# LANGUAGE OverloadedStrings #-} {-# LANGUAGE ScopedTypeVariables #-} module Kafka.Avro.SchemaRegistry ( schemaRegistry, loadSchema, sendSchema , SchemaId(..), Subject(..) , SchemaRegistry, SchemaRegistryError(..) , Schema(..) ) where import Control.Arrow (first) import Control.Lens (view, (&), (.~), (^.)) import Control.Monad (void) import Control.Monad.IO.Class (MonadIO, liftIO) import Data.Aeson import Data.Avro.Schema (Schema, Type (..), typeName) import Data.Bifunctor (bimap) import Data.Cache as C import Data.Hashable (Hashable) import Data.Int (Int32) import Data.String (IsString) import Data.Text (Text, append, cons, unpack) import qualified Data.Text.Encoding as Text import qualified Data.Text.Lazy.Encoding as LText import GHC.Exception (SomeException, displayException, fromException) import GHC.Generics (Generic) import Network.HTTP.Client (HttpException (..), HttpExceptionContent (..), Manager, defaultManagerSettings, newManager) import qualified Network.Wreq as Wreq newtype SchemaId = SchemaId { unSchemaId :: Int32} deriving (Eq, Ord, Show, Hashable) newtype SchemaName = SchemaName Text deriving (Eq, Ord, IsString, Show, Hashable) newtype Subject = Subject { unSubject :: Text} deriving (Eq, Show, IsString, Ord, Generic, Hashable) newtype RegisteredSchema = RegisteredSchema { unRegisteredSchema :: Schema} deriving (Generic, Show) data SchemaRegistry = SchemaRegistry { srCache :: Cache SchemaId Schema , srReverseCache :: Cache (Subject, SchemaName) SchemaId , srBaseUrl :: String } data SchemaRegistryError = SchemaRegistryConnectError String | SchemaRegistryLoadError SchemaId | SchemaRegistrySchemaNotFound SchemaId | SchemaRegistrySendError String deriving (Show, Eq) schemaRegistry :: MonadIO m => String -> m SchemaRegistry schemaRegistry url = liftIO $ SchemaRegistry <$> newCache Nothing <*> newCache Nothing <*> pure url loadSchema :: MonadIO m => SchemaRegistry -> SchemaId -> m (Either SchemaRegistryError Schema) loadSchema sr sid = do sc <- cachedSchema sr sid case sc of Just s -> return (Right s) Nothing -> liftIO $ do res <- getSchemaById (srBaseUrl sr) sid pure (unRegisteredSchema <$> res) sendSchema :: MonadIO m => SchemaRegistry -> Subject -> Schema -> m (Either SchemaRegistryError SchemaId) sendSchema sr subj sc = do sid <- cachedId sr subj schemaName case sid of Just sid' -> return (Right sid') Nothing -> do res <- liftIO $ putSchema (srBaseUrl sr) subj (RegisteredSchema sc) void $ traverse (cacheId sr subj schemaName) res void $ traverse (\sid' -> cacheSchema sr sid' sc) res pure res where schemaName = fullTypeName sc ------------------ PRIVATE: HELPERS -------------------------------------------- wreqOpts :: Wreq.Options wreqOpts = let accept = ["application/vnd.schemaregistry.v1+json", "application/vnd.schemaregistry+json", "application/json"] in Wreq.defaults & Wreq.header "Accept" .~ accept getSchemaById :: String -> SchemaId -> IO (Either SchemaRegistryError RegisteredSchema) getSchemaById baseUrl sid@(SchemaId i) = do let schemaUrl = baseUrl ++ "/schemas/ids/" ++ show i resp <- Wreq.getWith wreqOpts schemaUrl pure $ bimap (const (SchemaRegistryLoadError sid)) (view Wreq.responseBody) (Wreq.asJSON resp) where wrapError :: SomeException -> SchemaRegistryError wrapError someErr = case fromException someErr of Nothing -> SchemaRegistryLoadError sid Just httpErr -> fromHttpError httpErr $ \case StatusCodeException r _ | r ^. Wreq.responseStatus . Wreq.statusCode == 404 -> SchemaRegistrySchemaNotFound sid _ -> SchemaRegistryLoadError sid putSchema :: String -> Subject -> RegisteredSchema -> IO (Either SchemaRegistryError SchemaId) putSchema baseUrl (Subject sbj) schema = do let schemaUrl = baseUrl ++ "/subjects/" ++ unpack sbj ++ "/versions" resp <- Wreq.postWith wreqOpts schemaUrl (toJSON schema) pure $ bimap wrapError (view Wreq.responseBody) (Wreq.asJSON resp) where wrapError :: SomeException -> SchemaRegistryError wrapError someErr = case fromException someErr of Nothing -> SchemaRegistrySendError (displayException someErr) Just httpErr -> fromHttpError httpErr (\_ -> SchemaRegistrySendError (displayException someErr)) fromHttpError :: HttpException -> (HttpExceptionContent -> SchemaRegistryError) -> SchemaRegistryError fromHttpError err f = case err of InvalidUrlException fld err' -> SchemaRegistryConnectError (fld ++ ": " ++ err') HttpExceptionRequest _ (ConnectionFailure err) -> SchemaRegistryConnectError (displayException err) HttpExceptionRequest _ ConnectionTimeout -> SchemaRegistryConnectError (displayException err) HttpExceptionRequest _ ProxyConnectException{} -> SchemaRegistryConnectError (displayException err) HttpExceptionRequest _ ConnectionClosed -> SchemaRegistryConnectError (displayException err) HttpExceptionRequest _ (InvalidProxySettings _) -> SchemaRegistryConnectError (displayException err) HttpExceptionRequest _ (InvalidDestinationHost _) -> SchemaRegistryConnectError (displayException err) HttpExceptionRequest _ TlsNotSupported -> SchemaRegistryConnectError (displayException err) HttpExceptionRequest _ err' -> f err' --------------------------------------------------------------------- fullTypeName :: Schema -> SchemaName fullTypeName r = SchemaName $ case r of Record{} -> maybe (typeName r) (\ns -> ns `append` ('.' `cons` typeName r)) (namespace r) _ -> typeName r cachedSchema :: MonadIO m => SchemaRegistry -> SchemaId -> m (Maybe Schema) cachedSchema sr k = liftIO $ C.lookup (srCache sr) k {-# INLINE cachedSchema #-} cacheSchema :: MonadIO m => SchemaRegistry -> SchemaId -> Schema -> m () cacheSchema sr k v = liftIO $ C.insert (srCache sr) k v {-# INLINE cacheSchema #-} cachedId :: MonadIO m => SchemaRegistry -> Subject -> SchemaName -> m (Maybe SchemaId) cachedId sr subj scn = liftIO $ C.lookup (srReverseCache sr) (subj, scn) {-# INLINE cachedId #-} cacheId :: MonadIO m => SchemaRegistry -> Subject -> SchemaName -> SchemaId -> m () cacheId sr subj scn sid = liftIO $ C.insert (srReverseCache sr) (subj, scn) sid {-# INLINE cacheId #-} instance FromJSON RegisteredSchema where parseJSON (Object v) = withObject "expected schema" (\obj -> do sch <- obj .: "schema" maybe mempty (return . RegisteredSchema) (decode $ LText.encodeUtf8 sch) ) (Object v) parseJSON _ = mempty instance ToJSON RegisteredSchema where toJSON (RegisteredSchema v) = object ["schema" .= LText.decodeUtf8 (encode $ toJSON v)] instance FromJSON SchemaId where parseJSON (Object v) = SchemaId <$> v .: "id" parseJSON _ = mempty