module Kafka.Avro.SchemaRegistry
( schemaRegistry, loadSchema, sendSchema
, SchemaId(..), Subject(..)
, SchemaRegistry, SchemaRegistryError(..)
, RegisteredSchema(..)
) where
import Control.Monad.IO.Class
import Control.Monad.Trans.Except (ExceptT (..), runExceptT, withExceptT)
import Data.Aeson
import Data.Avro.Schema (Schema, Type (..), typeName)
import Data.Cache as C
import Data.Hashable
import Data.Int
import Data.Proxy
import Data.Text (Text, append, cons)
import qualified Data.Text.Encoding as Text
import qualified Data.Text.Lazy.Encoding as LText
import GHC.Exception
import GHC.Generics (Generic)
import Network.HTTP.Client (Manager, defaultManagerSettings, newManager)
import Servant.API
import Servant.Client
-- | Numeric schema identifier: wraps the @Int32@ captured by the
-- @schemas/ids/{id}@ route and parsed from the @"id"@ field of a response.
newtype SchemaId = SchemaId Int32 deriving (Eq, Ord, Show, Hashable)
-- | Schema name as computed by @fullTypeName@. Used as part of the
-- reverse-cache key; it is not in the export list of this module.
newtype SchemaName = SchemaName Text deriving (Eq, Ord, Show, Hashable)
-- | Subject under which a schema is registered; rendered as a URL path piece
-- of the @subjects/{subject}/versions@ route.
newtype Subject = Subject Text deriving (Eq, Show, Generic, Hashable)
-- | Wire wrapper around a schema: (de)serialised as a JSON object whose
-- @"schema"@ field holds the schema itself encoded as a JSON string.
newtype RegisteredSchema = RegisteredSchema Schema deriving (Generic, Show)
-- | Handle to a schema registry: two in-memory caches plus the HTTP client
-- settings needed to reach the service. Which client fields are present
-- depends on the servant version the module is compiled against.
data SchemaRegistry = SchemaRegistry
  { srCache        :: Cache SchemaId Schema
    -- ^ Schemas already seen, keyed by their registry id.
  , srReverseCache :: Cache (Subject, SchemaName) SchemaId
    -- ^ Ids already seen, keyed by subject and schema name.
#if MIN_VERSION_servant(0,9,1)
  , srClientEnv    :: ClientEnv
    -- ^ Servant client environment built from a manager and a base URL.
#else
  , srManager      :: Manager
    -- ^ HTTP connection manager passed to every client call.
  , srBaseUrl      :: BaseUrl
    -- ^ Base URL passed to every client call.
#endif
  }
-- | Failures reported by the registry operations in this module.
data SchemaRegistryError
  = SchemaRegistryConnectError String
    -- ^ The HTTP client reported a connection error; carries the shown exception.
  | SchemaDecodeError SchemaId String
    -- ^ The response for the given schema id could not be decoded; carries
    -- the decoder message.
  | SchemaRegistryLoadError SchemaId
    -- ^ Loading the given schema id failed for any other reason.
  | SchemaRegistryError SchemaId
    -- ^ NOTE(review): not constructed by any code visible in this module.
  | SchemaRegistrySendError String
    -- ^ Registering a schema failed for a reason other than a connection error.
  deriving (Show, Eq)
-- | Build a registry handle for the service at the given URL.
--
-- Both caches are created with @newCache Nothing@, and the HTTP manager uses
-- @defaultManagerSettings@. The URL is parsed in @IO@ after the manager has
-- been created.
-- NOTE(review): a malformed URL presumably surfaces as an exception from
-- @parseBaseUrl@ rather than as a value; confirm against servant-client.
schemaRegistry :: MonadIO m => String -> m SchemaRegistry
schemaRegistry url = liftIO $ do
  schemaCache <- newCache Nothing
  idCache     <- newCache Nothing
  manager     <- newManager defaultManagerSettings
  baseUrl     <- parseBaseUrl url
#if MIN_VERSION_servant(0,9,1)
  return (SchemaRegistry schemaCache idCache (ClientEnv manager baseUrl))
#else
  return (SchemaRegistry schemaCache idCache manager baseUrl)
#endif
-- | Look up a schema by id, consulting the local cache first and falling back
-- to the registry on a miss.
--
-- A schema fetched successfully from the registry is stored in the cache
-- before being returned. A failed fetch is returned as @Left@ and nothing is
-- cached.
loadSchema :: MonadIO m => SchemaRegistry -> SchemaId -> m (Either SchemaRegistryError Schema)
loadSchema sr sid =
  cachedSchema sr sid >>= maybe fetchAndCache (return . Right)
  where
    -- Cache miss: ask the registry, remember a successful answer.
    fetchAndCache = do
#if MIN_VERSION_servant(0,9,1)
      fetched <- loadSchemaFromSR (srClientEnv sr) sid
#else
      fetched <- loadSchemaFromSR (srManager sr) (srBaseUrl sr) sid
#endif
      -- Only runs the cache insert when the fetch produced a Right value.
      mapM_ (cacheSchema sr sid) fetched
      return fetched
-- | Register a schema under a subject and return the id assigned to it.
--
-- The reverse cache is consulted first, keyed by the subject and the name
-- computed by @fullTypeName@. On a miss the schema is posted to the registry;
-- on success both the id and the schema are cached before the id is returned.
--
-- NOTE(review): the cache key does not include the schema contents, so a
-- different schema with the same name under the same subject gets the
-- previously cached id without contacting the registry.
sendSchema :: MonadIO m => SchemaRegistry -> Subject -> Schema -> m (Either SchemaRegistryError SchemaId)
sendSchema sr subj sc =
  cachedId sr subj schemaName >>= maybe register (return . Right)
  where
    schemaName = fullTypeName sc

    -- Cache miss: post to the registry, remember a successful answer.
    register = do
#if MIN_VERSION_servant(0,9,1)
      posted <- sendSchemaToSR (srClientEnv sr) subj sc
#else
      posted <- sendSchemaToSR (srManager sr) (srBaseUrl sr) subj sc
#endif
      -- Both inserts only run when the post produced a Right value.
      mapM_ (cacheId sr subj schemaName) posted
      mapM_ (\newId -> cacheSchema sr newId sc) posted
      return posted
-- | The two registry endpoints used by this module:
--
-- * @GET schemas/ids/{id}@, returning a @RegisteredSchema@ as JSON;
-- * @POST subjects/{subject}/versions@, taking a @RegisteredSchema@ JSON body
--   and returning a @SchemaId@ as JSON.
type API
  =    "schemas" :> "ids" :> Capture "id" Int32
         :> Get '[JSON] RegisteredSchema
  :<|> "subjects" :> Capture "subject" Subject :> "versions"
         :> ReqBody '[JSON] RegisteredSchema
         :> Post '[JSON] SchemaId
-- | Proxy carrying the 'API' type; passed to @client@ to derive the client
-- functions.
api :: Proxy API
api = Proxy
-- Client functions derived from the API type via @client api@. The pattern
-- binding at the bottom binds them in the same order as the routes appear in
-- the API type: first the schema lookup by id, then the schema registration.
-- In the older-servant branch each call also takes the Manager and BaseUrl
-- explicitly.
#if MIN_VERSION_servant(0,9,1)
getSchemaById :: Int32 -> ClientM RegisteredSchema
putSchema :: Subject -> RegisteredSchema -> ClientM SchemaId
#else
getSchemaById :: Int32 -> Manager -> BaseUrl -> ClientM RegisteredSchema
putSchema :: Subject -> RegisteredSchema -> Manager -> BaseUrl -> ClientM SchemaId
#endif
getSchemaById :<|> putSchema = client api
-- | Alias for 'ServantError'.
-- NOTE(review): not referenced by any code visible in this module.
type P = ServantError
-- Post a schema to the registry under the given subject, bypassing the caches.
--
-- Client errors are translated as follows: a connection error becomes
-- SchemaRegistryConnectError with the shown exception; a decode failure
-- becomes SchemaRegistrySendError with the decoder message; anything else
-- becomes SchemaRegistrySendError with the shown error.
#if MIN_VERSION_servant(0,9,1)
sendSchemaToSR :: MonadIO m => ClientEnv -> Subject -> Schema -> m (Either SchemaRegistryError SchemaId)
sendSchemaToSR env subj schema =
  runExceptT . withExceptT toSRError . runServant env $
    putSchema subj (RegisteredSchema schema)
#else
sendSchemaToSR :: MonadIO m => Manager -> BaseUrl -> Subject -> Schema -> m (Either SchemaRegistryError SchemaId)
sendSchemaToSR mgr url subj schema =
  liftExceptT (withExceptT toSRError (putSchema subj (RegisteredSchema schema) mgr url))
#endif
  where
    toSRError (ConnectionError ex)   = SchemaRegistryConnectError (show ex)
    toSRError (DecodeFailure de _ _) = SchemaRegistrySendError de
    toSRError other                  = SchemaRegistrySendError (show other)
-- Fetch the schema with the given id from the registry, bypassing the caches.
--
-- Client errors are translated as follows: a connection error becomes
-- SchemaRegistryConnectError with the shown exception; a decode failure
-- becomes SchemaDecodeError for this id with the decoder message; anything
-- else, failure responses included, becomes SchemaRegistryLoadError for
-- this id.
#if MIN_VERSION_servant(0,9,1)
loadSchemaFromSR :: MonadIO m => ClientEnv -> SchemaId -> m (Either SchemaRegistryError Schema)
loadSchemaFromSR env sid@(SchemaId rawId) =
  runExceptT . withExceptT toSRError . runServant env $
    unwrap <$> getSchemaById rawId
#else
loadSchemaFromSR :: MonadIO m => Manager -> BaseUrl -> SchemaId -> m (Either SchemaRegistryError Schema)
loadSchemaFromSR mgr url sid@(SchemaId rawId) =
  liftExceptT . withExceptT toSRError $
    unwrap <$> getSchemaById rawId mgr url
#endif
  where
    -- Strip the wire wrapper to expose the schema itself.
    unwrap (RegisteredSchema schema) = schema

    toSRError (ConnectionError ex)   = SchemaRegistryConnectError (show ex)
    toSRError (DecodeFailure de _ _) = SchemaDecodeError sid de
    -- Failure responses and every remaining error are reported as a load error.
    toSRError _                      = SchemaRegistryLoadError sid
-- | Compute the name used as part of the reverse-cache key for a schema.
--
-- For a @Record@ with a namespace the result is the namespace, a dot, and the
-- type name. For a @Record@ without a namespace, and for every other schema,
-- it is just the type name.
-- NOTE(review): if other constructors can carry a namespace it is ignored
-- here; confirm against the avro version in use.
fullTypeName :: Schema -> SchemaName
fullTypeName sc = SchemaName qualified
  where
    base = typeName sc
    qualified = case sc of
      Record{ namespace = Just ns } -> ns `append` cons '.' base
      _                             -> base
-- | Look up a schema in the id-to-schema cache of the registry handle.
cachedSchema :: MonadIO m => SchemaRegistry -> SchemaId -> m (Maybe Schema)
cachedSchema sr = liftIO . C.lookup (srCache sr)
-- | Store a schema in the id-to-schema cache of the registry handle.
cacheSchema :: MonadIO m => SchemaRegistry -> SchemaId -> Schema -> m ()
cacheSchema sr sid schema = liftIO (C.insert (srCache sr) sid schema)
-- | Look up a schema id in the reverse cache, keyed by subject and schema name.
cachedId :: MonadIO m => SchemaRegistry -> Subject -> SchemaName -> m (Maybe SchemaId)
cachedId sr subj scn = liftIO (C.lookup reverseCache key)
  where
    reverseCache = srReverseCache sr
    key          = (subj, scn)
-- | Store a schema id in the reverse cache, keyed by subject and schema name.
cacheId :: MonadIO m => SchemaRegistry -> Subject -> SchemaName -> SchemaId -> m ()
cacheId sr subj scn = liftIO . C.insert (srReverseCache sr) (subj, scn)
-- | Parse a registry response of the form @{"schema": "<json text>"}@.
--
-- The @"schema"@ field is a string that itself contains the JSON encoding of
-- the schema, so it is decoded a second time. A failure of that inner decode
-- is reported with the decoder message instead of being discarded, and a
-- non-object input is rejected by @withObject@ with a type-mismatch message.
instance FromJSON RegisteredSchema where
  parseJSON = withObject "RegisteredSchema" $ \obj -> do
    sch <- obj .: "schema"
    -- eitherDecode keeps the reason the embedded schema failed to parse.
    either fail (return . RegisteredSchema) (eitherDecode (LText.encodeUtf8 sch))
-- | Render as a JSON object whose @"schema"@ field is a string holding the
-- JSON encoding of the wrapped schema.
instance ToJSON RegisteredSchema where
  toJSON (RegisteredSchema schema) = object ["schema" .= schemaText]
    where
      schemaText = LText.decodeUtf8 (encode (toJSON schema))
-- | Parse a registry response of the form @{"id": <number>}@.
--
-- A non-object input is rejected by @withObject@, which reports what was
-- expected and what was encountered instead of failing with an empty parser.
instance FromJSON SchemaId where
  parseJSON = withObject "SchemaId" $ \obj -> SchemaId <$> obj .: "id"
-- | A subject is rendered in URLs exactly as its underlying text is.
instance ToHttpApiData Subject where
  toUrlPiece (Subject subjectText) = toUrlPiece subjectText
#if MIN_VERSION_servant(0,9,1)
-- | Run a servant client action in the given environment and expose its
-- @Either@ result as an @ExceptT@ over any @MonadIO@.
runServant :: MonadIO m => ClientEnv -> ClientM a -> ExceptT ServantError m a
runServant env action = ExceptT . liftIO $ runClientM action env
#else
-- | Run an @ExceptT@ action whose base monad is @IO@ and lift the resulting
-- @Either@ into any @MonadIO@.
liftExceptT :: MonadIO m => ExceptT l IO r -> m (Either l r)
liftExceptT action = liftIO (runExceptT action)
#endif