module LaunchDarkly.Server.Network.Eventing (eventThread) where
import qualified Codec.Compression.GZip as GZip
import Control.Concurrent (killThread, myThreadId)
import Control.Concurrent.MVar (modifyMVar_, readMVar, swapMVar, takeMVar)
import Control.Monad (forever, unless, void, when)
import Control.Monad.Catch (MonadMask, MonadThrow)
import Control.Monad.IO.Class (MonadIO, liftIO)
import Control.Monad.Logger (MonadLogger, logDebug, logError, logWarn)
import Data.Aeson (encode)
import qualified Data.ByteString.Lazy as L
import Data.Function ((&))
import Data.Generics.Product (getField)
import Data.IORef (atomicModifyIORef', newIORef, readIORef)
import qualified Data.Text as T
import Data.Text.Encoding (decodeUtf8)
import Data.Tuple (swap)
import qualified Data.UUID as UUID
import Network.HTTP.Client (Manager, Request (..), RequestBody (..), httpLbs, responseStatus)
import Network.HTTP.Types.Status (Status (statusCode), status400)
import System.Random (newStdGen, random)
import System.Timeout (timeout)
import LaunchDarkly.Server.Client.Internal (Client, Status (ShuttingDown))
import LaunchDarkly.Server.Config.ClientContext
import LaunchDarkly.Server.Config.HttpConfiguration (prepareRequest)
import LaunchDarkly.Server.Events (EventState, processSummary)
import LaunchDarkly.Server.Network.Common (addToAL, checkAuthorization, getServerTime, isHttpUnrecoverable, tryAuthorized, tryHTTP)
-- | POST a prepared event request and classify the outcome.
--
-- The result is a pair @(done, serverTime)@:
--
-- * @done@ is 'True' when no retry should be attempted: either the response
--   status was below 400, or the status is one that 'isHttpUnrecoverable'
--   reports as unrecoverable (in which case the payload is dropped).
--   It is 'False' when the request raised an HTTP exception or the server
--   answered with a recoverable error status.
-- * @serverTime@ is whatever 'getServerTime' extracts from the response, or
--   @0@ when the request failed before any response was received.
--
-- 'checkAuthorization' runs on every response before classification; it is
-- 'MonadThrow'-constrained, so it presumably throws on an authorization
-- failure -- confirm against "LaunchDarkly.Server.Network.Common".
processSend :: (MonadIO m, MonadLogger m, MonadMask m, MonadThrow m) => Manager -> Request -> m (Bool, Integer)
processSend manager req =
    liftIO (tryHTTP $ httpLbs req manager) >>= \case
        Left err -> do
            -- Transport-level failure: there is no response to take a time from.
            $(logError) (T.pack $ show err)
            pure (False, 0)
        Right response -> do
            checkAuthorization response
            let code = responseStatus response
                serverTime = getServerTime response
            -- NOTE(review): warn level and the "@@@" prefix look like leftover
            -- debugging output; kept unchanged so log consumers are not affected.
            $(logWarn) (T.append "@@@ server time from LD was determined to be: " $ T.pack $ show serverTime)
            if code < status400
                then pure (True, serverTime)
                else
                    if isHttpUnrecoverable (statusCode code)
                        then do
                            -- Retrying cannot help, so report "done" and let the
                            -- caller drop the payload.
                            $(logWarn) (T.append "got non recoverable event post response dropping payload: " $ T.pack $ show code)
                            pure (True, serverTime)
                        else pure (False, serverTime)
-- | Turn a base request into an event-delivery request.
--
-- Sets the method to @POST@ and adds the @Content-Type@ and
-- @X-LaunchDarkly-Event-Schema@ headers via 'addToAL' (which presumably
-- replaces an existing entry with the same name -- confirm against
-- "LaunchDarkly.Server.Network.Common"). All other request fields are
-- left untouched.
setEventHeaders :: Request -> Request
setEventHeaders request =
    request
        { requestHeaders =
            requestHeaders request
                & (\hs -> addToAL hs "Content-Type" "application/json")
                & (\hs -> addToAL hs "X-LaunchDarkly-Event-Schema" "4")
        , method = "POST"
        }
-- | Record a server time observed on a response.
--
-- The stored @lastKnownServerTime@ only ever moves forward: the new value is
-- the maximum of the supplied time and the one already held, so passing a
-- stale or zero time is a no-op.
updateLastKnownServerTime :: EventState -> Integer -> IO ()
updateLastKnownServerTime state serverTime =
    modifyMVar_ (getField @"lastKnownServerTime" state) (pure . max serverTime)
-- | Body of the background thread that delivers analytics events.
--
-- A single request template targeting @<eventsURI>/bulk@ is prepared up
-- front. The thread then loops forever, and on each iteration it:
--
-- 1. runs 'processSummary' for the current event state;
-- 2. atomically swaps the pending event list for an empty one;
-- 3. if there were events, JSON-encodes them (gzip-compressed when
--    @compressEvents@ is set), tags the request with a fresh random payload
--    ID and posts it via 'processSend', retrying once after about a second
--    if the first attempt asks for a retry. The same payload ID is reused
--    for the retry;
-- 4. kills its own thread if the client status is 'ShuttingDown';
-- 5. otherwise waits for @flushIntervalSeconds@ or until the @flush@ MVar is
--    filled, whichever comes first.
--
-- The whole loop runs under 'tryAuthorized' (which presumably handles the
-- authorization failure raised from 'processSend' -- confirm against
-- "LaunchDarkly.Server.Network.Common").
eventThread :: (MonadIO m, MonadLogger m, MonadMask m) => Manager -> Client -> ClientContext -> m ()
eventThread manager client clientContext = do
    let state = getField @"events" client
        config = getField @"config" client
        compressEvents = getField @"compressEvents" config
        httpConfig = httpConfiguration clientContext
    -- Generator used to mint one payload ID per batch.
    rngRef <- liftIO $ newStdGen >>= newIORef
    req <-
        setEventHeaders
            <$> liftIO (prepareRequest httpConfig $ T.unpack (getField @"eventsURI" config) ++ "/bulk")
    void $ tryAuthorized client $ forever $ do
        liftIO $ processSummary config state
        events' <- liftIO $ swapMVar (getField @"events" state) []
        unless (null events') $ do
            payloadId <- liftIO $ atomicModifyIORef' rngRef (swap . random)
            let encoded = encode events'
                payload = if compressEvents then GZip.compress encoded else encoded
                thisReq =
                    req
                        { requestBody = RequestBodyLBS payload
                        , requestHeaders =
                            requestHeaders req
                                & (\hs -> addToAL hs "X-LaunchDarkly-Payload-ID" (UUID.toASCIIBytes payloadId))
                                & (\hs -> if compressEvents then addToAL hs "Content-Encoding" "gzip" else hs)
                        }
            -- Logged before the request goes out so the message matches what
            -- is actually happening; the uncompressed JSON is what is shown.
            $(logDebug) $ T.append "sending events: " $ decodeUtf8 $ L.toStrict encoded
            (success, serverTime) <- processSend manager thisReq
            if success
                then liftIO $ updateLastKnownServerTime state serverTime
                else do
                    $(logWarn) "retrying event delivery after one second"
                    -- Wait up to one second; 'readMVar' returns early if a
                    -- flush has been requested but leaves the signal in place
                    -- for the wait at the end of the iteration.
                    liftIO $ void $ timeout (1 * 1000000) $ readMVar $ getField @"flush" state
                    (success', serverTime') <- processSend manager thisReq
                    unless success' $
                        $(logWarn) "failed sending events on retry, dropping event batch"
                    -- Record the server time whether or not the retry
                    -- succeeded: a successful retry carries a valid time too,
                    -- and the update keeps the maximum, so a zero time from a
                    -- failed request changes nothing.
                    liftIO $ updateLastKnownServerTime state serverTime'
            $(logDebug) "finished send of event batch"
        status <- liftIO $ readIORef $ getField @"status" client
        -- On shutdown the thread ends itself after the final flush above.
        liftIO $ when (status == ShuttingDown) (myThreadId >>= killThread)
        -- Sleep until the flush interval elapses or a flush is signalled;
        -- 'takeMVar' consumes the signal.
        liftIO $
            void $
                timeout (1000000 * fromIntegral (getField @"flushIntervalSeconds" config)) $
                    takeMVar $
                        getField @"flush" state