module Kafka.Avro
( module X
, propagateKeySchema
, propagateValueSchema
) where

import Control.Monad.IO.Class
import Data.ByteString.Lazy

import Kafka.Avro.Decode         as X
import Kafka.Avro.Encode         as X
import Kafka.Avro.SchemaRegistry as X

-- | Registers schema that was used for a given payload against the specified subject as a key shema.
-- It is possible that a given payload doesn't have schema registered against it, in this case no prapagation happens.
propagateKeySchema :: MonadIO m => SchemaRegistry -> Subject -> ByteString -> m (Either SchemaRegistryError (Maybe SchemaId))
propagateKeySchema :: SchemaRegistry
-> Subject
-> ByteString
-> m (Either SchemaRegistryError (Maybe SchemaId))
propagateKeySchema SchemaRegistry
sr Subject
subj = SchemaRegistry
-> Subject
-> ByteString
-> m (Either SchemaRegistryError (Maybe SchemaId))
forall (m :: * -> *).
MonadIO m =>
SchemaRegistry
-> Subject
-> ByteString
-> m (Either SchemaRegistryError (Maybe SchemaId))
propagateSchema SchemaRegistry
sr (Subject -> Subject
keySubject Subject
subj)

-- | Registers schema that was used for a given payload against the specified subject as a value schema.
-- It is possible that a given payload doesn't have schema registered against it, in this case no prapagation happens.
propagateValueSchema :: MonadIO m => SchemaRegistry -> Subject -> ByteString -> m (Either SchemaRegistryError (Maybe SchemaId))
propagateValueSchema :: SchemaRegistry
-> Subject
-> ByteString
-> m (Either SchemaRegistryError (Maybe SchemaId))
propagateValueSchema SchemaRegistry
sr Subject
subj = SchemaRegistry
-> Subject
-> ByteString
-> m (Either SchemaRegistryError (Maybe SchemaId))
forall (m :: * -> *).
MonadIO m =>
SchemaRegistry
-> Subject
-> ByteString
-> m (Either SchemaRegistryError (Maybe SchemaId))
propagateSchema SchemaRegistry
sr (Subject -> Subject
valueSubject Subject
subj)

propagateSchema :: MonadIO m
                => SchemaRegistry
                -> Subject
                -> ByteString
                -> m (Either SchemaRegistryError (Maybe SchemaId))
propagateSchema :: SchemaRegistry
-> Subject
-> ByteString
-> m (Either SchemaRegistryError (Maybe SchemaId))
propagateSchema SchemaRegistry
sr Subject
subj ByteString
bs =
  case ByteString -> Maybe (SchemaId, ByteString)
extractSchemaId ByteString
bs of
    Maybe (SchemaId, ByteString)
Nothing -> Either SchemaRegistryError (Maybe SchemaId)
-> m (Either SchemaRegistryError (Maybe SchemaId))
forall (m :: * -> *) a. Monad m => a -> m a
return (Either SchemaRegistryError (Maybe SchemaId)
 -> m (Either SchemaRegistryError (Maybe SchemaId)))
-> Either SchemaRegistryError (Maybe SchemaId)
-> m (Either SchemaRegistryError (Maybe SchemaId))
forall a b. (a -> b) -> a -> b
$ Maybe SchemaId -> Either SchemaRegistryError (Maybe SchemaId)
forall a b. b -> Either a b
Right Maybe SchemaId
forall a. Maybe a
Nothing
    Just (SchemaId
sid, ByteString
_) -> do
      Either SchemaRegistryError Schema
mSchema <- SchemaRegistry -> SchemaId -> m (Either SchemaRegistryError Schema)
forall (m :: * -> *).
MonadIO m =>
SchemaRegistry -> SchemaId -> m (Either SchemaRegistryError Schema)
loadSchema SchemaRegistry
sr SchemaId
sid
      case Either SchemaRegistryError Schema
mSchema of
        Left (SchemaRegistrySchemaNotFound SchemaId
_) -> Either SchemaRegistryError (Maybe SchemaId)
-> m (Either SchemaRegistryError (Maybe SchemaId))
forall (m :: * -> *) a. Monad m => a -> m a
return (Either SchemaRegistryError (Maybe SchemaId)
 -> m (Either SchemaRegistryError (Maybe SchemaId)))
-> Either SchemaRegistryError (Maybe SchemaId)
-> m (Either SchemaRegistryError (Maybe SchemaId))
forall a b. (a -> b) -> a -> b
$ Maybe SchemaId -> Either SchemaRegistryError (Maybe SchemaId)
forall a b. b -> Either a b
Right Maybe SchemaId
forall a. Maybe a
Nothing
        Left SchemaRegistryError
err                              -> Either SchemaRegistryError (Maybe SchemaId)
-> m (Either SchemaRegistryError (Maybe SchemaId))
forall (m :: * -> *) a. Monad m => a -> m a
return (Either SchemaRegistryError (Maybe SchemaId)
 -> m (Either SchemaRegistryError (Maybe SchemaId)))
-> Either SchemaRegistryError (Maybe SchemaId)
-> m (Either SchemaRegistryError (Maybe SchemaId))
forall a b. (a -> b) -> a -> b
$ SchemaRegistryError -> Either SchemaRegistryError (Maybe SchemaId)
forall a b. a -> Either a b
Left SchemaRegistryError
err
        Right Schema
schema                          -> (SchemaId -> Maybe SchemaId)
-> Either SchemaRegistryError SchemaId
-> Either SchemaRegistryError (Maybe SchemaId)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap SchemaId -> Maybe SchemaId
forall a. a -> Maybe a
Just (Either SchemaRegistryError SchemaId
 -> Either SchemaRegistryError (Maybe SchemaId))
-> m (Either SchemaRegistryError SchemaId)
-> m (Either SchemaRegistryError (Maybe SchemaId))
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> SchemaRegistry
-> Subject -> Schema -> m (Either SchemaRegistryError SchemaId)
forall (m :: * -> *).
MonadIO m =>
SchemaRegistry
-> Subject -> Schema -> m (Either SchemaRegistryError SchemaId)
sendSchema SchemaRegistry
sr Subject
subj Schema
schema