module Network.Nakadi.Config
( newConfig
, newConfigIO
, newConfigWithDedicatedManager
, newConfigFromEnv
, setHttpManager
, setRequestModifier
, setDeserializationFailureCallback
, setStreamConnectCallback
, setHttpErrorCallback
, setLogFunc
, setRetryPolicy
, setWorkerThreads
, setMaxUncommittedEvents
, setBatchLimit
, setStreamLimit
, setBatchFlushTimeout
, setStreamTimeout
, setStreamKeepAliveLimit
, setFlowId
, setCommitStrategy
, setCommitTimeout
, setShowTimeLag
)
where
import Network.Nakadi.Internal.Prelude
import Control.Lens
import Control.Retry
import Network.HTTP.Client ( Manager
, ManagerSettings
)
import Network.HTTP.Client.TLS ( newTlsManagerWith )
import Network.Nakadi.Internal.HttpBackendIO
import qualified Network.Nakadi.Internal.Lenses
as L
import Network.Nakadi.Internal.Types
import Network.Nakadi.Internal.Config
-- | Build a configuration for the given 'Request' by passing
-- 'httpBackendIO' as the HTTP backend to 'newConfig'.
newConfigIO :: Request -> ConfigIO
newConfigIO request = newConfig httpBackendIO request
-- | Build a configuration that carries its own HTTP manager.
--
-- A new manager is created from the supplied 'ManagerSettings' via
-- 'newTlsManagerWith'. The configuration itself is produced by 'newConfig'
-- with 'httpBackendIO' and the given 'Request', and the new manager is then
-- stored in it with 'setHttpManager'.
newConfigWithDedicatedManager
  :: (MonadIO b, MonadMask b, MonadIO m)
  => ManagerSettings
  -> Request
  -> m (Config b)
newConfigWithDedicatedManager mngrSettings request = do
  dedicatedManager <- newTlsManagerWith mngrSettings
  let baseConfig = newConfig httpBackendIO request
  pure (setHttpManager dedicatedManager baseConfig)
-- | Store the given HTTP 'Manager' in the configuration (the @manager@
-- field is set to 'Just' the supplied value).
setHttpManager :: Manager -> Config m -> Config m
setHttpManager mngr conf = conf & L.manager ?~ mngr
-- | Replace the request modifier stored in the configuration with the
-- supplied function.
setRequestModifier :: (Request -> m Request) -> Config m -> Config m
setRequestModifier modifier conf = conf & L.requestModifier .~ modifier
-- | Store the given deserialization failure callback in the configuration
-- (the @deserializationFailureCallback@ field is set to 'Just' the supplied
-- function).
setDeserializationFailureCallback :: (ByteString -> Text -> m ()) -> Config m -> Config m
setDeserializationFailureCallback callback conf =
  conf & L.deserializationFailureCallback ?~ callback
-- | Store the given stream connect callback in the configuration (the
-- @streamConnectCallback@ field is set to 'Just' the supplied callback).
setStreamConnectCallback :: StreamConnectCallback m -> Config m -> Config m
setStreamConnectCallback callback conf =
  conf & L.streamConnectCallback ?~ callback
-- | Store the given HTTP error callback in the configuration (the
-- @httpErrorCallback@ field is set to 'Just' the supplied callback).
setHttpErrorCallback :: HttpErrorCallback m -> Config m -> Config m
setHttpErrorCallback callback conf = conf & L.httpErrorCallback ?~ callback
-- | Store the given logging function in the configuration (the @logFunc@
-- field is set to 'Just' the supplied function).
setLogFunc :: LogFunc m -> Config m -> Config m
setLogFunc logger conf = conf & L.logFunc ?~ logger
-- | Replace the retry policy stored in the configuration.
--
-- The policy is always a @'RetryPolicyM' 'IO'@, independent of the
-- configuration's monad parameter @m@.
setRetryPolicy :: RetryPolicyM IO -> Config m -> Config m
setRetryPolicy policy conf = conf & L.retryPolicy .~ policy
-- | Store the given 'FlowId' in the configuration (the @flowId@ field is
-- set to 'Just' the supplied value).
setFlowId :: FlowId -> Config m -> Config m
setFlowId fid conf = conf & L.flowId ?~ fid
-- | Replace the 'CommitStrategy' stored in the configuration.
setCommitStrategy :: CommitStrategy -> Config m -> Config m
setCommitStrategy strategy conf = conf & L.commitStrategy .~ strategy
-- | Store the given 'CommitTimeout' in the configuration (the
-- @commitTimeout@ field is set to 'Just' the supplied value).
setCommitTimeout :: CommitTimeout -> Config m -> Config m
setCommitTimeout timeout conf = conf & L.commitTimeout ?~ timeout
-- | Set the @nThreads@ field of the configuration's @worker@ settings to
-- the given number.
setWorkerThreads :: Int -> Config m -> Config m
setWorkerThreads n conf = conf & L.worker . L.nThreads .~ n
-- | Store the given value in the configuration's @maxUncommittedEvents@
-- field (the field is set to 'Just' the supplied number).
setMaxUncommittedEvents :: Int32 -> Config m -> Config m
setMaxUncommittedEvents limit conf = conf & L.maxUncommittedEvents ?~ limit
-- | Store the given value in the configuration's @batchLimit@ field (the
-- field is set to 'Just' the supplied number).
setBatchLimit :: Int32 -> Config m -> Config m
setBatchLimit limit conf = conf & L.batchLimit ?~ limit
-- | Store the given value in the configuration's @streamLimit@ field (the
-- field is set to 'Just' the supplied number).
setStreamLimit :: Int32 -> Config m -> Config m
setStreamLimit limit conf = conf & L.streamLimit ?~ limit
-- | Store the given value in the configuration's @batchFlushTimeout@ field
-- (the field is set to 'Just' the supplied number).
setBatchFlushTimeout :: Int32 -> Config m -> Config m
setBatchFlushTimeout timeout conf = conf & L.batchFlushTimeout ?~ timeout
-- | Store the given value in the configuration's @streamTimeout@ field
-- (the field is set to 'Just' the supplied number).
setStreamTimeout :: Int32 -> Config m -> Config m
setStreamTimeout timeout conf = conf & L.streamTimeout ?~ timeout
-- | Store the given value in the configuration's @streamKeepAliveLimit@
-- field (the field is set to 'Just' the supplied number).
setStreamKeepAliveLimit :: Int32 -> Config m -> Config m
setStreamKeepAliveLimit limit conf = conf & L.streamKeepAliveLimit ?~ limit
-- | Set the configuration's @subscriptionStats@ field to 'Just' a
-- 'SubscriptionStatsConf' whose @_showTimeLag@ field is the given flag.
--
-- The record is built from scratch here, so a 'SubscriptionStatsConf'
-- already present in the configuration is replaced rather than updated.
setShowTimeLag :: Bool -> Config m -> Config m
setShowTimeLag enabled conf = conf & L.subscriptionStats ?~ statsConf
  where
    statsConf = SubscriptionStatsConf { _showTimeLag = enabled }