module LaunchDarkly.Server.Network.Eventing (eventThread) where
import Data.Aeson (encode)
import Data.Function ((&))
import Data.Tuple (swap)
import Data.IORef (newIORef, readIORef, atomicModifyIORef')
import qualified Data.UUID as UUID
import Control.Monad.Logger (MonadLogger, logDebug, logWarn, logError)
import Control.Monad.IO.Class (MonadIO, liftIO)
import Network.HTTP.Client (Manager, Request(..), RequestBody(..), httpLbs, responseStatus)
import Data.Generics.Product (getField)
import qualified Data.Text as T
import Control.Concurrent (killThread, myThreadId)
import Control.Monad (forever, when, void, unless)
import Control.Monad.Catch (MonadMask, MonadThrow)
import Control.Concurrent.MVar (takeMVar, readMVar, swapMVar, modifyMVar_)
import System.Timeout (timeout)
import System.Random (newStdGen, random)
import Data.Text.Encoding (decodeUtf8)
import qualified Data.ByteString.Lazy as L
import Network.HTTP.Types.Status (status400, status408, status429, status500)
import LaunchDarkly.Server.Client.Internal (ClientI, Status(ShuttingDown))
import LaunchDarkly.Server.Network.Common (tryAuthorized, checkAuthorization, getServerTime, tryHTTP, addToAL)
import LaunchDarkly.Server.Events (processSummary, EventState)
import LaunchDarkly.Server.Config.ClientContext
import LaunchDarkly.Server.Config.HttpConfiguration (prepareRequest)
processSend :: (MonadIO m, MonadLogger m, MonadMask m, MonadThrow m) => Manager -> Request -> m (Bool, Integer)
processSend manager req = (liftIO $ tryHTTP $ httpLbs req manager) >>= \case
(Left err) -> $(logError) (T.pack $ show err) >> pure (False, 0)
(Right response) -> do
checkAuthorization response
let code = responseStatus response
serverTime = getServerTime response in
$(logWarn) (T.append "@@@ server time from LD was determined to be: " $ T.pack $ show serverTime) >>
if code < status400
then pure (True, serverTime)
else if (elem code [status400, status408, status429]) || code >= status500
then pure (False, serverTime)
else $(logWarn) (T.append "got non recoverable event post response dropping payload: " $ T.pack $ show code) >> pure (True, serverTime)
setEventHeaders :: Request -> Request
setEventHeaders request = request
{ requestHeaders = (requestHeaders request)
& \l -> addToAL l "Content-Type" "application/json"
& \l -> addToAL l "X-LaunchDarkly-Event-Schema" "3"
, method = "POST"
}
updateLastKnownServerTime :: EventState -> Integer -> IO ()
updateLastKnownServerTime state serverTime = modifyMVar_ (getField @"lastKnownServerTime" state) (\lastKnown -> pure $ max serverTime lastKnown)
eventThread :: (MonadIO m, MonadLogger m, MonadMask m) => Manager -> ClientI -> ClientContext -> m ()
eventThread manager client clientContext = do
let state = getField @"events" client; config = getField @"config" client; httpConfig = httpConfiguration clientContext
rngRef <- liftIO $ newStdGen >>= newIORef
req <- (liftIO $ prepareRequest httpConfig $ (T.unpack $ getField @"eventsURI" config) ++ "/bulk") >>= pure . setEventHeaders
void $ tryAuthorized client $ forever $ do
liftIO $ processSummary config state
events' <- liftIO $ swapMVar (getField @"events" state) []
when (not $ null events') $ do
payloadId <- liftIO $ atomicModifyIORef' rngRef (swap . random)
let
encoded = encode events'
thisReq = req
{ requestBody = RequestBodyLBS encoded
, requestHeaders = (requestHeaders req)
& \l -> addToAL l "X-LaunchDarkly-Payload-ID" (UUID.toASCIIBytes payloadId)
}
(success, serverTime) <- processSend manager thisReq
$(logDebug) $ T.append "sending events: " $ decodeUtf8 $ L.toStrict encoded
_ <- case success of
True -> liftIO $ updateLastKnownServerTime state serverTime
False -> do
$(logWarn) "retrying event delivery after one second"
liftIO $ void $ timeout (1 * 1000000) $ readMVar $ getField @"flush" state
(success', serverTime') <- processSend manager thisReq
unless success' $ do
$(logWarn) "failed sending events on retry, dropping event batch"
liftIO $ updateLastKnownServerTime state serverTime'
$(logDebug) "finished send of event batch"
status <- liftIO $ readIORef $ getField @"status" client
liftIO $ when (status == ShuttingDown) (myThreadId >>= killThread)
liftIO $ void $ timeout ((*) 1000000 $ fromIntegral $ getField @"flushIntervalSeconds" config) $ takeMVar $ getField @"flush" state