module LaunchDarkly.Server.Network.Eventing (eventThread) where
import Data.Aeson (encode)
import Data.Function ((&))
import Data.Tuple (swap)
import Data.IORef (newIORef, readIORef, atomicModifyIORef')
import qualified Data.UUID as UUID
import Control.Monad.Logger (MonadLogger, logDebug, logWarn, logError)
import Control.Monad.IO.Class (MonadIO, liftIO)
import Network.HTTP.Client (Manager, Request(..), RequestBody(..), httpLbs, parseRequest, responseStatus)
import Data.Generics.Product (getField)
import qualified Data.Text as T
import Control.Concurrent (killThread, myThreadId)
import Control.Monad (forever, when, void, unless)
import Control.Monad.Catch (MonadMask, MonadThrow)
import Control.Concurrent.MVar (takeMVar, readMVar, swapMVar)
import System.Timeout (timeout)
import System.Random (newStdGen, random)
import Data.Text.Encoding (decodeUtf8)
import qualified Data.ByteString.Lazy as L
import Network.HTTP.Types.Status (status400, status408, status429, status500)
import LaunchDarkly.Server.Client.Internal (ClientI, Status(ShuttingDown))
import LaunchDarkly.Server.Network.Common (tryAuthorized, checkAuthorization, prepareRequest, tryHTTP, addToAL)
import LaunchDarkly.Server.Events (processSummary)
-- | Post a single prepared event request and report whether the caller is
-- done with this payload.
--
-- The result is 'True' when no retry should be attempted: either the response
-- status was below 400, or it was an error status outside the retryable set,
-- in which case a warning is logged and the payload is considered dropped.
-- The result is 'False' when the caller may retry: the request raised an
-- 'HttpException' (logged at error level), or the status was 400, 408, 429,
-- or anything from 500 upwards.
--
-- Every response is passed through 'checkAuthorization' before its status is
-- classified. NOTE(review): presumably that throws on an unauthorized
-- response so that 'tryAuthorized' in the caller can react -- confirm in
-- "LaunchDarkly.Server.Network.Common".
processSend :: (MonadIO m, MonadLogger m, MonadMask m, MonadThrow m) => Manager -> Request -> m Bool
processSend manager req = liftIO (tryHTTP $ httpLbs req manager) >>= \case
    Left err -> do
        $(logError) (T.pack $ show err)
        pure False
    Right response -> do
        checkAuthorization response
        classify (responseStatus response)
  where
    -- Statuses for which sending the same payload again may succeed.
    retryable code = code `elem` [status400, status408, status429] || code >= status500

    classify code
        | code < status400 = pure True
        | retryable code   = pure False
        | otherwise        = do
            -- Any other 4xx is permanent: report "done" so the batch is not resent.
            $(logWarn) (T.append "got non recoverable event post response dropping payload: " $ T.pack $ show code)
            pure True
-- | Turn a base request into an event-post request: the method becomes
-- @POST@ and the @Content-Type@ (@application/json@) and
-- @X-LaunchDarkly-Event-Schema@ (@3@) headers are added via 'addToAL', in
-- that order. All other fields of the request are left as they were.
--
-- NOTE(review): 'addToAL' is assumed to replace an existing entry with the
-- same header name rather than duplicate it -- confirm in
-- "LaunchDarkly.Server.Network.Common".
setEventHeaders :: Request -> Request
setEventHeaders request = request
    { requestHeaders = requestHeaders request
        & withHeader "Content-Type" "application/json"
        & withHeader "X-LaunchDarkly-Event-Schema" "3"
    , method = "POST"
    }
  where
    -- 'addToAL' with the association list moved last so it chains with '&'.
    withHeader name value headers = addToAL headers name value
-- | Body of the background thread that delivers analytics events.
--
-- Setup: a random generator is stored in an 'IORef' for payload ids, and a
-- request template for @<eventsURI>/bulk@ is built once using
-- 'prepareRequest' and 'setEventHeaders'.
--
-- The loop, run forever inside 'tryAuthorized', does the following on each
-- iteration:
--
-- 1. Runs 'processSummary' on the config and event state.
--    NOTE(review): presumably this moves the pending summary into the event
--    buffer -- confirm in "LaunchDarkly.Server.Events".
-- 2. Swaps the buffered events out for an empty list.
-- 3. If there were events, JSON-encodes them, tags the request with a fresh
--    @X-LaunchDarkly-Payload-ID@, and sends it with 'processSend'. When that
--    reports a retryable failure, it waits up to one second and sends the same
--    request (same payload id) once more; a second failure drops the batch.
-- 4. Reads the client status and kills the current thread when it is
--    'ShuttingDown'.
-- 5. Waits for the flush signal, giving up after @flushIntervalSeconds@.
eventThread :: (MonadIO m, MonadLogger m, MonadMask m) => Manager -> ClientI -> m ()
eventThread manager client = do
    let state       = getField @"events" client
        config      = getField @"config" client
        flushSignal = getField @"flush" state
        bulkURI     = T.unpack (getField @"eventsURI" config) ++ "/bulk"
    rngRef <- liftIO $ newStdGen >>= newIORef
    req <- setEventHeaders . prepareRequest config <$> liftIO (parseRequest bulkURI)
    tryAuthorized client $ forever $ do
        liftIO $ processSummary config state
        events' <- liftIO $ swapMVar (getField @"events" state) []
        unless (null events') $ do
            payloadId <- liftIO $ atomicModifyIORef' rngRef (swap . random)
            let encoded = encode events'
                thisReq = req
                    { requestBody    = RequestBodyLBS encoded
                    , requestHeaders =
                        addToAL (requestHeaders req) "X-LaunchDarkly-Payload-ID" (UUID.toASCIIBytes payloadId)
                    }
            $(logDebug) $ T.append "sending events: " $ decodeUtf8 $ L.toStrict encoded
            success <- processSend manager thisReq
            unless success $ do
                $(logWarn) "retrying event delivery after one second"
                -- 'readMVar' leaves the flush signal in place, so a flush
                -- requested during the back-off still ends the wait below.
                liftIO $ void $ timeout (1 * 1000000) $ readMVar flushSignal
                success' <- processSend manager thisReq
                unless success' $ $(logWarn) "failed sending events on retry, dropping event batch"
            $(logDebug) "finished send of event batch"
        status <- liftIO $ readIORef $ getField @"status" client
        -- Checked after the send so that buffered events go out before exit.
        liftIO $ when (status == ShuttingDown) (myThreadId >>= killThread)
        -- 'takeMVar' consumes the flush signal; 'timeout' takes microseconds.
        liftIO $ void $ timeout (1000000 * fromIntegral (getField @"flushIntervalSeconds" config)) $ takeMVar flushSignal