{-# LANGUAGE DeriveGeneric #-}
{-# LANGUAGE GeneralizedNewtypeDeriving #-}
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE ScopedTypeVariables #-}
module Kafka.Avro.SchemaRegistry
( schemaRegistry, loadSchema, sendSchema
, SchemaId(..), Subject(..)
, SchemaRegistry, SchemaRegistryError(..)
, Schema(..)
) where
import Control.Arrow (first)
import Control.Exception (try)
import Control.Lens (view, (&), (.~), (^.))
import Control.Monad (void)
import Control.Monad.IO.Class (MonadIO, liftIO)
import Data.Aeson
import Data.Avro.Schema (Schema, Type (..), typeName)
import Data.Bifunctor (bimap)
import Data.Cache as C
import Data.Hashable (Hashable)
import Data.Int (Int32)
import Data.String (IsString)
import Data.Text (Text, append, cons, unpack)
import qualified Data.Text.Encoding as Text
import qualified Data.Text.Lazy.Encoding as LText
import GHC.Exception (SomeException, displayException, fromException)
import GHC.Generics (Generic)
import Network.HTTP.Client (HttpException (..), HttpExceptionContent (..), Manager, defaultManagerSettings, newManager)
import qualified Network.Wreq as Wreq
-- | Numeric identifier of a schema in the registry (the @id@ in
-- @/schemas/ids/<id>@ and in the registry's registration response).
newtype SchemaId = SchemaId { unSchemaId :: Int32 }
  deriving (Eq, Ord, Show, Hashable)

-- | Full type name of an Avro schema (namespace-qualified for records that
-- have a namespace); part of the reverse-lookup cache key.
newtype SchemaName = SchemaName Text
  deriving (Eq, Ord, IsString, Show, Hashable)

-- | Registry subject under which schemas are registered
-- (@/subjects/<subject>/versions@).
newtype Subject = Subject { unSubject :: Text }
  deriving (Eq, Show, IsString, Ord, Generic, Hashable)

-- | Wrapper that gives an Avro 'Schema' the JSON shape used on the wire by
-- the registry: the schema serialised as a string under a @"schema"@ key.
newtype RegisteredSchema = RegisteredSchema { unRegisteredSchema :: Schema }
  deriving (Generic, Show)
-- | Handle to a schema registry: the base URL requests are sent to, plus two
-- in-process caches (created by 'schemaRegistry').
data SchemaRegistry = SchemaRegistry
  { srCache        :: Cache SchemaId Schema
    -- ^ Schemas indexed by their registry id.
  , srReverseCache :: Cache (Subject, SchemaName) SchemaId
    -- ^ Ids of schemas already registered, keyed by subject and full type name.
  , srBaseUrl      :: String
    -- ^ Base URL that request paths such as @/schemas/ids/…@ are appended to.
  }
-- | Failures reported by the registry operations in this module.
data SchemaRegistryError
  = SchemaRegistryConnectError String
    -- ^ The registry could not be reached (malformed URL, connection
    -- failure or timeout, proxy/TLS problems); carries a description.
  | SchemaRegistryLoadError SchemaId
    -- ^ Fetching or decoding the schema with this id failed.
  | SchemaRegistrySchemaNotFound SchemaId
    -- ^ Used for an HTTP 404 when fetching the schema with this id.
  | SchemaRegistrySendError String
    -- ^ Registering a schema failed; carries a description of the error.
  deriving (Show, Eq)
-- | Create a registry handle for the given base URL.
--
-- Both caches start empty and are created with no default expiration
-- (@newCache Nothing@).
schemaRegistry :: MonadIO m => String -> m SchemaRegistry
schemaRegistry url = liftIO $ do
  byId   <- newCache Nothing
  byName <- newCache Nothing
  pure SchemaRegistry
    { srCache        = byId
    , srReverseCache = byName
    , srBaseUrl      = url
    }
-- | Resolve a schema by its registry id.
--
-- The id-keyed cache is consulted first. On a miss the schema is fetched with
-- 'getSchemaById'; a successfully fetched schema is stored in the cache so
-- later lookups of the same id are answered locally.
loadSchema :: MonadIO m => SchemaRegistry -> SchemaId -> m (Either SchemaRegistryError Schema)
loadSchema sr sid = do
  sc <- cachedSchema sr sid
  case sc of
    Just s  -> return (Right s)
    Nothing -> liftIO $ do
      res <- getSchemaById (srBaseUrl sr) sid
      let schema = unRegisteredSchema <$> res
      -- Only successes are cached; errors are returned as-is so the next
      -- call retries the registry.
      mapM_ (cacheSchema sr sid) schema
      pure schema
-- | Register a schema under a subject and return the id the registry assigns.
--
-- If this (subject, full type name) pair has already been registered through
-- this handle, the id is taken from the reverse cache and no request is made.
-- Otherwise the schema is POSTed with 'putSchema'. On success the id is
-- recorded in the reverse cache and the schema in the id-keyed cache. Errors
-- are returned unchanged and nothing is cached.
--
-- NOTE(review): the reverse-cache key is (subject, 'fullTypeName'), not the
-- schema body, so a different schema with the same full name under the same
-- subject (e.g. an evolved record) is answered with the cached id — confirm
-- this is intended.
sendSchema :: MonadIO m => SchemaRegistry -> Subject -> Schema -> m (Either SchemaRegistryError SchemaId)
sendSchema sr subj sc =
  cachedId sr subj name >>= \case
    Just known -> pure (Right known)
    Nothing -> do
      registered <- liftIO $ putSchema (srBaseUrl sr) subj (RegisteredSchema sc)
      case registered of
        Right newId -> do
          cacheId sr subj name newId
          cacheSchema sr newId sc
        Left _ -> pure ()
      pure registered
  where
    name = fullTypeName sc
-- | Request options shared by all registry calls: wreq defaults plus an
-- @Accept@ header listing the registry media types in preference order.
wreqOpts :: Wreq.Options
wreqOpts = Wreq.defaults & Wreq.header "Accept" .~ acceptedTypes
  where
    acceptedTypes =
      [ "application/vnd.schemaregistry.v1+json"
      , "application/vnd.schemaregistry+json"
      , "application/json"
      ]
-- | Fetch the schema registered under an id via @GET <baseUrl>/schemas/ids/<id>@.
--
-- HTTP failures raised by wreq are caught and classified:
--
-- * connection-level problems become 'SchemaRegistryConnectError'
--   (via 'fromHttpError');
-- * an HTTP 404 becomes 'SchemaRegistrySchemaNotFound';
-- * any other HTTP error becomes 'SchemaRegistryLoadError'.
--
-- A response body that cannot be decoded also becomes 'SchemaRegistryLoadError'.
getSchemaById :: String -> SchemaId -> IO (Either SchemaRegistryError RegisteredSchema)
getSchemaById baseUrl sid@(SchemaId i) = do
  let schemaUrl = baseUrl ++ "/schemas/ids/" ++ show i
  -- wreq throws 'HttpException' for transport failures and non-2xx statuses;
  -- catch it so it is reported through the 'Either' instead of escaping.
  result <- try (Wreq.getWith wreqOpts schemaUrl)
  pure $ case result of
    Left httpErr -> Left (wrapError httpErr)
    Right resp   -> bimap (const (SchemaRegistryLoadError sid)) (view Wreq.responseBody) (Wreq.asJSON resp)
  where
    wrapError :: HttpException -> SchemaRegistryError
    wrapError httpErr = fromHttpError httpErr $ \case
      StatusCodeException r _ | r ^. Wreq.responseStatus . Wreq.statusCode == 404 -> SchemaRegistrySchemaNotFound sid
      _ -> SchemaRegistryLoadError sid
-- | Register a schema under a subject via
-- @POST <baseUrl>/subjects/<subject>/versions@ and return the assigned id.
--
-- HTTP failures raised by wreq are caught: connection-level problems become
-- 'SchemaRegistryConnectError' (via 'fromHttpError'), and every other HTTP
-- error becomes 'SchemaRegistrySendError' carrying the exception text. A
-- response body that does not decode as a 'SchemaId' also becomes
-- 'SchemaRegistrySendError'.
--
-- NOTE(review): the subject is spliced into the URL path without
-- percent-encoding; confirm subjects never contain reserved URL characters.
putSchema :: String -> Subject -> RegisteredSchema -> IO (Either SchemaRegistryError SchemaId)
putSchema baseUrl (Subject sbj) schema = do
  let schemaUrl = baseUrl ++ "/subjects/" ++ unpack sbj ++ "/versions"
  -- wreq throws 'HttpException' for transport failures and non-2xx statuses;
  -- catch it so it is reported through the 'Either' instead of escaping.
  result <- try (Wreq.postWith wreqOpts schemaUrl (toJSON schema))
  pure $ case result of
    Left httpErr -> Left (wrapHttpError httpErr)
    Right resp   -> bimap wrapDecodeError (view Wreq.responseBody) (Wreq.asJSON resp)
  where
    wrapHttpError :: HttpException -> SchemaRegistryError
    wrapHttpError httpErr =
      fromHttpError httpErr (\_ -> SchemaRegistrySendError (displayException httpErr))

    -- Failures from 'Wreq.asJSON' (undecodable response body).
    wrapDecodeError :: SomeException -> SchemaRegistryError
    wrapDecodeError = SchemaRegistrySendError . displayException
-- | Classify an 'HttpException' as a 'SchemaRegistryError'.
--
-- The following always map to 'SchemaRegistryConnectError':
--
-- * malformed URLs (@\"<url>: <reason>\"@);
-- * 'ConnectionFailure' (described by its wrapped cause);
-- * 'ConnectionTimeout', 'ProxyConnectException', 'ConnectionClosed',
--   'InvalidProxySettings', 'InvalidDestinationHost' and 'TlsNotSupported'
--   (described by the whole exception).
--
-- Any other request failure is handed to the caller-supplied classifier @f@.
fromHttpError :: HttpException -> (HttpExceptionContent -> SchemaRegistryError) -> SchemaRegistryError
fromHttpError err f = case err of
  InvalidUrlException url reason -> SchemaRegistryConnectError (url ++ ": " ++ reason)
  HttpExceptionRequest _ content -> classify content
  where
    connectError = SchemaRegistryConnectError (displayException err)

    classify (ConnectionFailure cause)  = SchemaRegistryConnectError (displayException cause)
    classify ConnectionTimeout          = connectError
    classify ProxyConnectException{}    = connectError
    classify ConnectionClosed           = connectError
    classify (InvalidProxySettings _)   = connectError
    classify (InvalidDestinationHost _) = connectError
    classify TlsNotSupported            = connectError
    classify other                      = f other
-- | Name used to identify a schema in the reverse cache.
--
-- For a 'Record' that has a namespace this is @namespace.name@; for a record
-- without one, and for every other type, it is just 'typeName'.
fullTypeName :: Schema -> SchemaName
fullTypeName r = SchemaName (qualified r)
  where
    qualified t@Record{} = case namespace t of
      Nothing -> typeName t
      Just ns -> ns `append` ('.' `cons` typeName t)
    qualified t = typeName t
-- | Look up a schema in the id-keyed cache.
cachedSchema :: MonadIO m => SchemaRegistry -> SchemaId -> m (Maybe Schema)
cachedSchema registry = liftIO . C.lookup (srCache registry)
{-# INLINE cachedSchema #-}
-- | Store a schema in the id-keyed cache.
cacheSchema :: MonadIO m => SchemaRegistry -> SchemaId -> Schema -> m ()
cacheSchema registry schemaId schema =
  let byId = srCache registry
  in liftIO (C.insert byId schemaId schema)
{-# INLINE cacheSchema #-}
-- | Look up the id previously recorded for a (subject, full type name) pair.
cachedId :: MonadIO m => SchemaRegistry -> Subject -> SchemaName -> m (Maybe SchemaId)
cachedId registry subject name =
  liftIO $ C.lookup (srReverseCache registry) key
  where
    key = (subject, name)
{-# INLINE cachedId #-}
-- | Record the id assigned to a (subject, full type name) pair.
cacheId :: MonadIO m => SchemaRegistry -> Subject -> SchemaName -> SchemaId -> m ()
cacheId registry subject name schemaId =
  liftIO $ C.insert (srReverseCache registry) key schemaId
  where
    key = (subject, name)
{-# INLINE cacheId #-}
-- | Decode a registry response whose @"schema"@ field holds the Avro schema
-- as a JSON-encoded string.
--
-- Fails with a descriptive message if the input is not an object, the field is
-- missing, or its contents are not a valid Avro schema.
instance FromJSON RegisteredSchema where
  parseJSON = withObject "expected schema" $ \obj -> do
    encoded <- obj .: "schema"
    case eitherDecode (LText.encodeUtf8 encoded) of
      Left err  -> fail ("invalid Avro schema in \"schema\" field: " ++ err)
      Right sch -> pure (RegisteredSchema sch)
-- | Encode as @{"schema": "<schema JSON as a string>"}@, the request body
-- shape the registry expects when registering a schema.
instance ToJSON RegisteredSchema where
  toJSON rs = object [ "schema" .= schemaText ]
    where
      schemaText = LText.decodeUtf8 . encode . toJSON $ unRegisteredSchema rs
-- | Decode the registry's @{"id": <n>}@ response.
--
-- Non-object input fails with a type-mismatch message rather than the opaque
-- "mempty" failure, so callers that display the decode error get a useful
-- description.
instance FromJSON SchemaId where
  parseJSON = withObject "SchemaId" $ \obj -> SchemaId <$> obj .: "id"