module LaunchDarkly.Server.Network.Eventing (eventThread) where

import           Data.Aeson                          (encode)
import           Data.Function                       ((&))
import           Data.Tuple                          (swap)
import           Data.IORef                          (newIORef, readIORef, atomicModifyIORef')
import qualified Data.UUID as                        UUID
import           Control.Monad.Logger                (MonadLogger, logDebug, logWarn, logError)
import           Control.Monad.IO.Class              (MonadIO, liftIO)
import           Network.HTTP.Client                 (Manager, Request(..), RequestBody(..), httpLbs, parseRequest, responseStatus)
import           Data.Generics.Product               (getField)
import qualified Data.Text as                        T
import           Control.Concurrent                  (killThread, myThreadId)
import           Control.Monad                       (forever, when, void, unless)
import           Control.Monad.Catch                 (MonadMask, MonadThrow)
import           Control.Concurrent.MVar             (takeMVar, readMVar, swapMVar, modifyMVar_)
import           System.Timeout                      (timeout)
import           System.Random                       (newStdGen, random)
import           Data.Text.Encoding                  (decodeUtf8)
import qualified Data.ByteString.Lazy as             L
import           Network.HTTP.Types.Status           (status400, status408, status429, status500)

import           LaunchDarkly.Server.Client.Internal (ClientI, Status(ShuttingDown))
import           LaunchDarkly.Server.Network.Common  (tryAuthorized, checkAuthorization, getServerTime, prepareRequest, tryHTTP, addToAL)
import           LaunchDarkly.Server.Events          (processSummary, EventState)

-- | Send a prepared event request and classify the outcome.
--
-- Returns a pair @(done, serverTime)@. A 'True' first component indicates a
-- retry does not need to be attempted: either the post was accepted (status
-- below 400) or it was answered with a status treated as non-recoverable, in
-- which case the payload is dropped. 'False' means the caller may retry: the
-- HTTP call itself failed, or the status was 400, 408, 429 or at least 500.
--
-- @serverTime@ is whatever 'getServerTime' extracts from the response, and @0@
-- when no response was received. 'checkAuthorization' is run on every
-- response and may throw (it carries a 'MonadThrow' constraint).
processSend :: (MonadIO m, MonadLogger m, MonadMask m, MonadThrow m) => Manager -> Request -> m (Bool, Integer)
processSend manager req = liftIO (tryHTTP $ httpLbs req manager) >>= \case
    Left err -> do
        $(logError) (T.pack $ show err)
        pure (False, 0)
    Right response -> do
        checkAuthorization response
        let code       = responseStatus response
            serverTime = getServerTime response
        -- Diagnostic detail emitted for every post, so it belongs at debug
        -- level rather than warn.
        $(logDebug) (T.append "@@@ server time from LD was determined to be: " $ T.pack $ show serverTime)
        classify code serverTime
  where
    -- Statuses for which sending the same payload again is worthwhile.
    retryable code = code `elem` [status400, status408, status429] || code >= status500

    classify code serverTime
        | code < status400 = pure (True, serverTime)
        | retryable code   = pure (False, serverTime)
        | otherwise        = do
            $(logWarn) (T.append "got non recoverable event post response dropping payload: " $ T.pack $ show code)
            pure (True, serverTime)

-- | Prepare a request for posting to the events endpoint: the method becomes
-- @POST@, and the @Content-Type@ and @X-LaunchDarkly-Event-Schema@ headers are
-- added through 'addToAL' on top of the headers the request already carries.
setEventHeaders :: Request -> Request
setEventHeaders request = request
    { method         = "POST"
    , requestHeaders = requestHeaders request
        & withHeader "Content-Type" "application/json"
        & withHeader "X-LaunchDarkly-Event-Schema" "3"
    }
  where
    -- 'addToAL' with the header list moved last so it slots into a '&' chain.
    withHeader name value headers = addToAL headers name value

-- | Record a server time observed on an events response.
--
-- The @lastKnownServerTime@ 'MVar' of the event state is replaced with the
-- larger of its current content and @serverTime@, so the stored value never
-- moves backwards.
updateLastKnownServerTime :: EventState -> Integer -> IO ()
updateLastKnownServerTime state serverTime =
    modifyMVar_ (getField @"lastKnownServerTime" state) $ \lastKnown ->
        -- Force the maximum before storing it: this runs once per delivery in
        -- a long-lived loop, and a lazy update would leave a growing chain of
        -- unevaluated 'max' applications in the MVar until something reads it.
        pure $! max serverTime lastKnown

-- | Body of the background thread that delivers queued events.
--
-- Setup: a random generator is seeded for payload IDs, and a base request is
-- built for @eventsURI ++ "/bulk"@ using 'prepareRequest' and
-- 'setEventHeaders'.
--
-- The loop, run under 'tryAuthorized' (presumably so an authorization failure
-- ends the loop -- confirm against its definition), repeats forever:
--
-- 1. 'processSummary' is run and the queued events are swapped out for @[]@.
-- 2. If there were any, they are JSON-encoded and posted with a fresh
--    @X-LaunchDarkly-Payload-ID@. When 'processSend' asks for a retry, the
--    thread waits until the @flush@ 'MVar' is filled or one second elapses and
--    posts the same request (same payload ID) exactly once more; a second
--    failure drops the batch.
-- 3. If the client status is 'ShuttingDown' the thread kills itself.
-- 4. Otherwise it waits for a flush signal ('takeMVar' on @flush@) or for
--    @flushIntervalSeconds@ to pass, whichever comes first.
eventThread :: (MonadIO m, MonadLogger m, MonadMask m) => Manager -> ClientI -> m ()
eventThread manager client = do
    let state  = getField @"events" client
        config = getField @"config" client
    rngRef <- liftIO $ newStdGen >>= newIORef
    req <- liftIO $ setEventHeaders . prepareRequest config
        <$> parseRequest (T.unpack (getField @"eventsURI" config) ++ "/bulk")
    void $ tryAuthorized client $ forever $ do
        liftIO $ processSummary config state
        events' <- liftIO $ swapMVar (getField @"events" state) []
        unless (null events') $ do
            payloadId <- liftIO $ atomicModifyIORef' rngRef (swap . random)
            let encoded = encode events'
                thisReq = req
                    { requestBody    = RequestBodyLBS encoded
                    , requestHeaders = addToAL (requestHeaders req) "X-LaunchDarkly-Payload-ID" (UUID.toASCIIBytes payloadId)
                    }
            -- Announce the payload before it goes out so this line precedes
            -- whatever 'processSend' logs about the attempt.
            $(logDebug) $ T.append "sending events: " $ decodeUtf8 $ L.toStrict encoded
            (success, serverTime) <- processSend manager thisReq
            if success
                then liftIO $ updateLastKnownServerTime state serverTime
                else do
                    $(logWarn) "retrying event delivery after one second"
                    -- 'readMVar' leaves the flush signal in place; it only cuts
                    -- the one-second wait short when a flush is already pending.
                    liftIO $ void $ timeout (1 * 1000000) $ readMVar $ getField @"flush" state
                    (success', serverTime') <- processSend manager thisReq
                    unless success' $ do
                        $(logWarn) "failed sending events on retry, dropping event batch"
                    liftIO $ updateLastKnownServerTime state serverTime'
            $(logDebug) "finished send of event batch"
        status <- liftIO $ readIORef $ getField @"status" client
        liftIO $ when (status == ShuttingDown) (myThreadId >>= killThread)
        -- Sleep until the flush interval expires or a flush is requested.
        liftIO $ void $ timeout ((*) 1000000 $ fromIntegral $ getField @"flushIntervalSeconds" config) $ takeMVar $ getField @"flush" state