{-# LANGUAGE OverloadedStrings #-}

module Kafka.Consumer.Subscription
( Subscription(..)
, topics
, offsetReset
, autoCommit
, extraSubscriptionProps
)
where

import qualified Data.Text            as Text
import           Data.Text            (Text)
import           Data.Map             (Map)
import qualified Data.Map             as M
import           Data.Semigroup       as Sem
import           Kafka.Consumer.Types (OffsetReset(..))
import           Kafka.Types          (TopicName(..), Millis(..))
import           Data.Set             (Set)
import qualified Data.Set             as Set

data Subscription = Subscription (Set TopicName) (Map Text Text)

instance Sem.Semigroup Subscription where
  (Subscription ts1 m1) <> (Subscription ts2 m2) =
    let ts' = Set.union ts1 ts2
        ps' = M.union m1 m2
     in Subscription ts' ps'
  {-# INLINE (<>) #-}

instance Monoid Subscription where
  mempty = Subscription Set.empty M.empty
  {-# INLINE mempty #-}
  mappend = (Sem.<>)
  {-# INLINE mappend #-}

topics :: [TopicName] -> Subscription
topics ts = Subscription (Set.fromList ts) M.empty

offsetReset :: OffsetReset -> Subscription
offsetReset o =
  let o' = case o of
             Earliest -> "earliest"
             Latest   -> "latest"
   in Subscription (Set.empty) (M.fromList [("auto.offset.reset", o')])

autoCommit :: Millis -> Subscription
autoCommit (Millis ms) = Subscription (Set.empty) $
  M.fromList
    [ ("enable.auto.commit", "true")
    , ("auto.commit.interval.ms", Text.pack $ show ms)
    ]

extraSubscriptionProps :: Map Text Text -> Subscription
extraSubscriptionProps = Subscription (Set.empty)