{-# LANGUAGE CPP                        #-}
{-# LANGUAGE DataKinds                  #-}
{-# LANGUAGE DeriveGeneric              #-}
{-# LANGUAGE GeneralizedNewtypeDeriving #-}
{-# LANGUAGE OverloadedStrings          #-}
{-# LANGUAGE ScopedTypeVariables        #-}
{-# LANGUAGE TypeOperators              #-}

module Kafka.Avro.SchemaRegistry
( schemaRegistry, loadSchema, sendSchema
, SchemaId(..), Subject(..)
, SchemaRegistry, SchemaRegistryError(..)
, RegisteredSchema(..)
) where

import           Control.Monad.IO.Class
import           Control.Monad.Trans.Except (ExceptT (..), runExceptT, withExceptT)
import           Data.Aeson
import           Data.Avro.Schema           (Schema, Type (..), typeName)
import           Data.Cache                 as C
import           Data.Hashable
import           Data.Int
import           Data.Proxy
import           Data.Text                  (Text, append, cons)
import qualified Data.Text.Encoding         as Text
import qualified Data.Text.Lazy.Encoding    as LText
import           GHC.Exception
import           GHC.Generics               (Generic)
import           Network.HTTP.Client        (Manager, defaultManagerSettings, newManager)
import           Servant.API
import           Servant.Client

newtype SchemaId = SchemaId Int32 deriving (Eq, Ord, Show, Hashable)
newtype SchemaName = SchemaName Text deriving (Eq, Ord, Show, Hashable)

newtype Subject = Subject Text deriving (Eq, Show, Generic, Hashable)

newtype RegisteredSchema = RegisteredSchema Schema deriving (Generic, Show)

data SchemaRegistry = SchemaRegistry
  { srCache        :: Cache SchemaId Schema
  , srReverseCache :: Cache (Subject, SchemaName) SchemaId
#if MIN_VERSION_servant(0,9,1)
  , srClientEnv    :: ClientEnv
#else
  , srManager      :: Manager
  , srBaseUrl      :: BaseUrl
#endif
  }

data SchemaRegistryError = SchemaRegistryConnectError String
                         | SchemaDecodeError SchemaId String
                         | SchemaRegistryLoadError SchemaId
                         | SchemaRegistryError SchemaId
                         | SchemaRegistrySendError String
                         deriving (Show, Eq)

schemaRegistry :: MonadIO m => String -> m SchemaRegistry
schemaRegistry url = liftIO $
  SchemaRegistry
  <$> newCache Nothing
  <*> newCache Nothing
#if MIN_VERSION_servant(0,9,1)
  <*> (ClientEnv <$> newManager defaultManagerSettings <*> parseBaseUrl url)
#else
  <*> newManager defaultManagerSettings
  <*> parseBaseUrl url
#endif

loadSchema :: MonadIO m => SchemaRegistry -> SchemaId -> m (Either SchemaRegistryError Schema)
loadSchema sr sid = do
  sc <- cachedSchema sr sid
  case sc of
    Just s  -> return (Right s)
    Nothing -> do
#if MIN_VERSION_servant(0,9,1)
       res <- loadSchemaFromSR (srClientEnv sr) sid
#else
       res <- loadSchemaFromSR (srManager sr) (srBaseUrl sr) sid
#endif
       _   <- traverse (cacheSchema sr sid) res
       return res

sendSchema :: MonadIO m => SchemaRegistry -> Subject -> Schema -> m (Either SchemaRegistryError SchemaId)
sendSchema sr subj sc = do
  sid <- cachedId sr subj schemaName
  case sid of
    Just sid' -> return (Right sid')
    Nothing   -> do
#if MIN_VERSION_servant(0,9,1)
      res <- sendSchemaToSR (srClientEnv sr) subj sc
#else
      res <- sendSchemaToSR (srManager sr) (srBaseUrl sr) subj sc
#endif
      _   <- traverse (cacheId sr subj schemaName) res
      _   <- traverse (\sid' -> cacheSchema sr sid' sc) res
      return res
  where
    schemaName = fullTypeName sc

------------------ PRIVATE: HELPERS --------------------------------------------

type API = "schemas" :> "ids" :> Capture "id" Int32 :> Get '[JSON] RegisteredSchema
      :<|> "subjects" :> Capture "subject" Subject :> "versions" :> ReqBody '[JSON] RegisteredSchema :> Post '[JSON] SchemaId
api :: Proxy API
api = Proxy

#if MIN_VERSION_servant(0,9,1)
--ExceptT ServantError IO SchemaId
getSchemaById :: Int32 -> ClientM RegisteredSchema
putSchema :: Subject -> RegisteredSchema -> ClientM SchemaId
#else
getSchemaById :: Int32 -> Manager -> BaseUrl -> ClientM RegisteredSchema
putSchema :: Subject -> RegisteredSchema -> Manager -> BaseUrl -> ClientM SchemaId
#endif
getSchemaById :<|> putSchema = client api

type P = ServantError

#if MIN_VERSION_servant(0,9,1)
sendSchemaToSR :: MonadIO m => ClientEnv -> Subject -> Schema -> m (Either SchemaRegistryError SchemaId)
sendSchemaToSR env subj s =
  runExceptT $ withExceptT toSRError $ runServant env $ putSchema subj (RegisteredSchema s)
#else
sendSchemaToSR :: MonadIO m => Manager -> BaseUrl -> Subject -> Schema -> m (Either SchemaRegistryError SchemaId)
sendSchemaToSR m u subj s =
  liftExceptT . withExceptT toSRError $ putSchema subj (RegisteredSchema s) m u
#endif
  where
    toSRError msg = case msg of
      ConnectionError ex   -> SchemaRegistryConnectError (show ex)
      DecodeFailure de _ _ -> SchemaRegistrySendError de
      err                  -> SchemaRegistrySendError (show err)

#if MIN_VERSION_servant(0,9,1)
loadSchemaFromSR :: MonadIO m => ClientEnv -> SchemaId -> m (Either SchemaRegistryError Schema)
loadSchemaFromSR env sid@(SchemaId i) =
  runExceptT (withExceptT toSRError $ runServant env $ unwrapResponse <$> getSchemaById i)
#else
loadSchemaFromSR :: MonadIO m => Manager -> BaseUrl -> SchemaId -> m (Either SchemaRegistryError Schema)
loadSchemaFromSR m u sid@(SchemaId i) =
  liftExceptT (withExceptT toSRError $ unwrapResponse <$> getSchemaById i m u)
#endif
  where
    unwrapResponse (RegisteredSchema s) = s
    toSRError msg = case msg of
      ConnectionError ex   -> SchemaRegistryConnectError (show ex)
      FailureResponse{}    -> SchemaRegistryLoadError sid
      DecodeFailure de _ _ -> SchemaDecodeError sid de
      _                    -> SchemaRegistryLoadError sid

---------------------------------------------------------------------
fullTypeName :: Schema -> SchemaName
fullTypeName r = SchemaName $ case r of
  Record{} -> maybe (typeName r)
                    (\ns -> ns `append` ('.' `cons` typeName r))
                    (namespace r)
  _        -> typeName r

cachedSchema :: MonadIO m => SchemaRegistry -> SchemaId -> m (Maybe Schema)
cachedSchema sr k = liftIO $ C.lookup (srCache sr) k
{-# INLINE cachedSchema #-}

cacheSchema :: MonadIO m => SchemaRegistry -> SchemaId -> Schema -> m ()
cacheSchema sr k v = liftIO $ C.insert (srCache sr) k v
{-# INLINE cacheSchema #-}

cachedId :: MonadIO m => SchemaRegistry -> Subject -> SchemaName -> m (Maybe SchemaId)
cachedId sr subj scn = liftIO $ C.lookup (srReverseCache sr) (subj, scn)
{-# INLINE cachedId #-}

cacheId :: MonadIO m => SchemaRegistry -> Subject -> SchemaName -> SchemaId -> m ()
cacheId sr subj scn sid = liftIO $ C.insert (srReverseCache sr) (subj, scn) sid
{-# INLINE cacheId #-}

instance FromJSON RegisteredSchema where
  parseJSON (Object v) =
    withObject "expected schema" (\obj -> do
      sch <- obj .: "schema"
      maybe mempty (return . RegisteredSchema) (decode $ LText.encodeUtf8 sch)
    ) (Object v)

  parseJSON _ = mempty

instance ToJSON RegisteredSchema where
  toJSON (RegisteredSchema v) = object ["schema" .= LText.decodeUtf8 (encode $ toJSON v)]

instance FromJSON SchemaId where
  parseJSON (Object v) = SchemaId <$> v .: "id"
  parseJSON _          = mempty

instance ToHttpApiData Subject where
  toUrlPiece (Subject s) = toUrlPiece s

#if MIN_VERSION_servant(0,9,1)
runServant :: MonadIO m => ClientEnv -> ClientM a -> ExceptT ServantError m a
runServant env cli = ExceptT $ liftIO (runClientM cli env)
#else
liftExceptT :: MonadIO m => ExceptT l IO r -> m (Either l r)
liftExceptT = liftIO . runExceptT
#endif