module LaunchDarkly.Server.Network.Eventing (eventThread) where

import Control.Concurrent (killThread, myThreadId)
import Control.Concurrent.MVar (modifyMVar_, readMVar, swapMVar, takeMVar)
import Control.Monad (forever, unless, void, when)
import Control.Monad.Catch (MonadMask, MonadThrow)
import Control.Monad.IO.Class (MonadIO, liftIO)
import Control.Monad.Logger (MonadLogger, logDebug, logError, logWarn)
import Data.Aeson (encode)
import qualified Data.ByteString.Lazy as L
import Data.Function ((&))
import Data.Generics.Product (getField)
import Data.IORef (atomicModifyIORef', newIORef, readIORef)
import qualified Data.Text as T
import Data.Text.Encoding (decodeUtf8)
import Data.Tuple (swap)
import qualified Data.UUID as UUID
import Network.HTTP.Client (Manager, Request (..), RequestBody (..), httpLbs, responseStatus)
import Network.HTTP.Types.Status (Status (statusCode), status400)
import System.Random (newStdGen, random)
import System.Timeout (timeout)

import LaunchDarkly.Server.Client.Internal (Client, Status (ShuttingDown))
import LaunchDarkly.Server.Config.ClientContext
import LaunchDarkly.Server.Config.HttpConfiguration (prepareRequest)
import LaunchDarkly.Server.Events (EventState, processSummary)
import LaunchDarkly.Server.Network.Common (addToAL, checkAuthorization, getServerTime, isHttpUnrecoverable, tryAuthorized, tryHTTP)

-- A true result indicates a retry does not need to be attempted
processSend :: (MonadIO m, MonadLogger m, MonadMask m, MonadThrow m) => Manager -> Request -> m (Bool, Integer)
processSend :: forall (m :: * -> *).
(MonadIO m, MonadLogger m, MonadMask m, MonadThrow m) =>
Manager -> Request -> m (Bool, Integer)
processSend Manager
manager Request
req =
    (forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) a.
MonadCatch m =>
m a -> m (Either HttpException a)
tryHTTP forall a b. (a -> b) -> a -> b
$ Request -> Manager -> IO (Response ByteString)
httpLbs Request
req Manager
manager) forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \case
        (Left HttpException
err) -> $(logError) (String -> Text
T.pack forall a b. (a -> b) -> a -> b
$ forall a. Show a => a -> String
show HttpException
err) forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> forall (f :: * -> *) a. Applicative f => a -> f a
pure (Bool
False, Integer
0)
        (Right Response ByteString
response) -> do
            forall (m :: * -> *) body. MonadThrow m => Response body -> m ()
checkAuthorization Response ByteString
response
            let code :: Status
code = forall body. Response body -> Status
responseStatus Response ByteString
response
                serverTime :: Integer
serverTime = forall body. Response body -> Integer
getServerTime Response ByteString
response
             in $(logWarn) (Text -> Text -> Text
T.append Text
"@@@ server time from LD was determined to be: " forall a b. (a -> b) -> a -> b
$ String -> Text
T.pack forall a b. (a -> b) -> a -> b
$ forall a. Show a => a -> String
show Integer
serverTime)
                    forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> if Status
code forall a. Ord a => a -> a -> Bool
< Status
status400
                        then forall (f :: * -> *) a. Applicative f => a -> f a
pure (Bool
True, Integer
serverTime)
                        else
                            if Int -> Bool
isHttpUnrecoverable forall a b. (a -> b) -> a -> b
$ Status -> Int
statusCode forall a b. (a -> b) -> a -> b
$ Status
code
                                then $(logWarn) (Text -> Text -> Text
T.append Text
"got non recoverable event post response dropping payload: " forall a b. (a -> b) -> a -> b
$ String -> Text
T.pack forall a b. (a -> b) -> a -> b
$ forall a. Show a => a -> String
show Status
code) forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> forall (f :: * -> *) a. Applicative f => a -> f a
pure (Bool
True, Integer
serverTime)
                                else forall (f :: * -> *) a. Applicative f => a -> f a
pure (Bool
False, Integer
serverTime)

setEventHeaders :: Request -> Request
setEventHeaders :: Request -> Request
setEventHeaders Request
request =
    Request
request
        { requestHeaders :: RequestHeaders
requestHeaders =
            (Request -> RequestHeaders
requestHeaders Request
request)
                forall a b. a -> (a -> b) -> b
& \RequestHeaders
l ->
                    forall k v. Eq k => [(k, v)] -> k -> v -> [(k, v)]
addToAL RequestHeaders
l HeaderName
"Content-Type" ByteString
"application/json"
                        forall a b. a -> (a -> b) -> b
& \RequestHeaders
l -> forall k v. Eq k => [(k, v)] -> k -> v -> [(k, v)]
addToAL RequestHeaders
l HeaderName
"X-LaunchDarkly-Event-Schema" ByteString
"4"
        , method :: ByteString
method = ByteString
"POST"
        }

updateLastKnownServerTime :: EventState -> Integer -> IO ()
updateLastKnownServerTime :: EventState -> Integer -> IO ()
updateLastKnownServerTime EventState
state Integer
serverTime = forall a. MVar a -> (a -> IO a) -> IO ()
modifyMVar_ (forall (f :: Symbol) a s. HasField' f s a => s -> a
getField @"lastKnownServerTime" EventState
state) (\Integer
lastKnown -> forall (f :: * -> *) a. Applicative f => a -> f a
pure forall a b. (a -> b) -> a -> b
$ forall a. Ord a => a -> a -> a
max Integer
serverTime Integer
lastKnown)

eventThread :: (MonadIO m, MonadLogger m, MonadMask m) => Manager -> Client -> ClientContext -> m ()
eventThread :: forall (m :: * -> *).
(MonadIO m, MonadLogger m, MonadMask m) =>
Manager -> Client -> ClientContext -> m ()
eventThread Manager
manager Client
client ClientContext
clientContext = do
    let state :: EventState
state = forall (f :: Symbol) a s. HasField' f s a => s -> a
getField @"events" Client
client; config :: Config
config = forall (f :: Symbol) a s. HasField' f s a => s -> a
getField @"config" Client
client; httpConfig :: HttpConfiguration
httpConfig = ClientContext -> HttpConfiguration
httpConfiguration ClientContext
clientContext
    IORef StdGen
rngRef <- forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *). MonadIO m => m StdGen
newStdGen forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= forall a. a -> IO (IORef a)
newIORef
    Request
req <- (forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *).
MonadThrow m =>
HttpConfiguration -> String -> m Request
prepareRequest HttpConfiguration
httpConfig forall a b. (a -> b) -> a -> b
$ (Text -> String
T.unpack forall a b. (a -> b) -> a -> b
$ forall (f :: Symbol) a s. HasField' f s a => s -> a
getField @"eventsURI" Config
config) forall a. [a] -> [a] -> [a]
++ String
"/bulk") forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= forall (f :: * -> *) a. Applicative f => a -> f a
pure forall b c a. (b -> c) -> (a -> b) -> a -> c
. Request -> Request
setEventHeaders
    forall (f :: * -> *) a. Functor f => f a -> f ()
void forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) a.
(MonadIO m, MonadLogger m, MonadCatch m) =>
Client -> m a -> m ()
tryAuthorized Client
client forall a b. (a -> b) -> a -> b
$ forall (f :: * -> *) a b. Applicative f => f a -> f b
forever forall a b. (a -> b) -> a -> b
$ do
        forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ Config -> EventState -> IO ()
processSummary Config
config EventState
state
        [EventType]
events' <- forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall a. MVar a -> a -> IO a
swapMVar (forall (f :: Symbol) a s. HasField' f s a => s -> a
getField @"events" EventState
state) []
        forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (Bool -> Bool
not forall a b. (a -> b) -> a -> b
$ forall (t :: * -> *) a. Foldable t => t a -> Bool
null [EventType]
events') forall a b. (a -> b) -> a -> b
$ do
            UUID
payloadId <- forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall a b. IORef a -> (a -> (a, b)) -> IO b
atomicModifyIORef' IORef StdGen
rngRef (forall a b. (a, b) -> (b, a)
swap forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall a g. (Random a, RandomGen g) => g -> (a, g)
random)
            let
                encoded :: ByteString
encoded = forall a. ToJSON a => a -> ByteString
encode [EventType]
events'
                thisReq :: Request
thisReq =
                    Request
req
                        { requestBody :: RequestBody
requestBody = ByteString -> RequestBody
RequestBodyLBS ByteString
encoded
                        , requestHeaders :: RequestHeaders
requestHeaders =
                            (Request -> RequestHeaders
requestHeaders Request
req)
                                forall a b. a -> (a -> b) -> b
& \RequestHeaders
l -> forall k v. Eq k => [(k, v)] -> k -> v -> [(k, v)]
addToAL RequestHeaders
l HeaderName
"X-LaunchDarkly-Payload-ID" (UUID -> ByteString
UUID.toASCIIBytes UUID
payloadId)
                        }
            (Bool
success, Integer
serverTime) <- forall (m :: * -> *).
(MonadIO m, MonadLogger m, MonadMask m, MonadThrow m) =>
Manager -> Request -> m (Bool, Integer)
processSend Manager
manager Request
thisReq
            $(logDebug) forall a b. (a -> b) -> a -> b
$ Text -> Text -> Text
T.append Text
"sending events: " forall a b. (a -> b) -> a -> b
$ ByteString -> Text
decodeUtf8 forall a b. (a -> b) -> a -> b
$ ByteString -> ByteString
L.toStrict ByteString
encoded
            ()
_ <- case Bool
success of
                Bool
True -> forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ EventState -> Integer -> IO ()
updateLastKnownServerTime EventState
state Integer
serverTime
                Bool
False -> do
                    $(logWarn) Text
"retrying event delivery after one second"
                    forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall (f :: * -> *) a. Functor f => f a -> f ()
void forall a b. (a -> b) -> a -> b
$ forall a. Int -> IO a -> IO (Maybe a)
timeout (Int
1 forall a. Num a => a -> a -> a
* Int
1000000) forall a b. (a -> b) -> a -> b
$ forall a. MVar a -> IO a
readMVar forall a b. (a -> b) -> a -> b
$ forall (f :: Symbol) a s. HasField' f s a => s -> a
getField @"flush" EventState
state
                    (Bool
success', Integer
serverTime') <- forall (m :: * -> *).
(MonadIO m, MonadLogger m, MonadMask m, MonadThrow m) =>
Manager -> Request -> m (Bool, Integer)
processSend Manager
manager Request
thisReq
                    forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
unless Bool
success' forall a b. (a -> b) -> a -> b
$ do
                        $(logWarn) Text
"failed sending events on retry, dropping event batch"
                    forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ EventState -> Integer -> IO ()
updateLastKnownServerTime EventState
state Integer
serverTime'
            $(logDebug) Text
"finished send of event batch"
        Status
status <- forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall a. IORef a -> IO a
readIORef forall a b. (a -> b) -> a -> b
$ forall (f :: Symbol) a s. HasField' f s a => s -> a
getField @"status" Client
client
        forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (Status
status forall a. Eq a => a -> a -> Bool
== Status
ShuttingDown) (IO ThreadId
myThreadId forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= ThreadId -> IO ()
killThread)
        forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall (f :: * -> *) a. Functor f => f a -> f ()
void forall a b. (a -> b) -> a -> b
$ forall a. Int -> IO a -> IO (Maybe a)
timeout (forall a. Num a => a -> a -> a
(*) Int
1000000 forall a b. (a -> b) -> a -> b
$ forall a b. (Integral a, Num b) => a -> b
fromIntegral forall a b. (a -> b) -> a -> b
$ forall (f :: Symbol) a s. HasField' f s a => s -> a
getField @"flushIntervalSeconds" Config
config) forall a b. (a -> b) -> a -> b
$ forall a. MVar a -> IO a
takeMVar forall a b. (a -> b) -> a -> b
$ forall (f :: Symbol) a s. HasField' f s a => s -> a
getField @"flush" EventState
state