{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE NumericUnderscores #-}

module LaunchDarkly.Server.Network.Streaming (streamingThread) where

import Control.Applicative (many)
import Control.Concurrent (threadDelay)
import Control.Exception (throwIO)
import Control.Monad (mzero, void)
import Control.Monad.Catch (Exception, MonadCatch, MonadMask, try)
import Control.Monad.IO.Class (MonadIO, liftIO)
import Control.Monad.Logger (MonadLogger, logDebug, logError, logInfo, logWarn)
import Control.Monad.Loops (iterateUntilM)
import Data.Aeson (FromJSON, Result (..), eitherDecode, fromJSON, parseJSON, withObject, (.!=), (.:), (.:?))
import Data.Attoparsec.Text as P hiding (Result, try)
import Data.ByteString (ByteString)
import qualified Data.ByteString as B
import qualified Data.ByteString.Lazy as L
import Data.Text (Text)
import qualified Data.Text as T
import Data.Text.Encoding (decodeUtf8, encodeUtf8)
import GHC.Generics (Generic)
import GHC.Natural (Natural)
import Network.HTTP.Client (HttpException (..), HttpExceptionContent (..), Manager, Request, Response (..), brRead)
import Network.HTTP.Types.Status (Status (statusCode))
import System.Clock (Clock (Monotonic), TimeSpec (TimeSpec), getTime)
import System.Random (Random (randomR), newStdGen)
import System.Timeout (timeout)

import LaunchDarkly.AesonCompat (KeyMap)
import LaunchDarkly.Server.Config.ClientContext (ClientContext (..))
import LaunchDarkly.Server.Config.HttpConfiguration (HttpConfiguration (..), prepareRequest)
import LaunchDarkly.Server.DataSource.Internal (DataSourceUpdates (..))
import LaunchDarkly.Server.Features (Flag, Segment)
import LaunchDarkly.Server.Network.Common (checkAuthorization, handleUnauthorized, isHttpUnrecoverable, throwIfNot200, tryHTTP, withResponseGeneric)
import LaunchDarkly.Server.Store.Internal (StoreResult)

-- | Payload carried in the @data@ member of a @put@ event: the complete set of
-- flags and segments handed to the data source on initialisation.
--
-- The 'FromJSON' instance is the generic one, so the JSON keys are the field
-- names themselves.
data PutBody = PutBody
    { flags :: !(KeyMap Flag)
    -- ^ Every flag contained in the put payload.
    , segments :: !(KeyMap Segment)
    -- ^ Every segment contained in the put payload.
    }
    deriving (Generic, Show, FromJSON)

-- | Envelope shared by @put@ and @patch@ events: a path plus a payload of
-- type @d@ taken from the JSON @data@ member.
data PathData d = PathData
    { path :: !Text
    -- ^ Path the payload applies to (for example a @/flags/@ or @/segments/@ prefix).
    , pathData :: !d
    -- ^ The decoded payload.
    }
    deriving (Generic, Show)

-- | Body of a @delete@ event: the path of the item to remove together with
-- the version that is forwarded to the data source delete call.
--
-- The 'FromJSON' instance is the generic one, so the JSON keys are the field
-- names themselves.
data PathVersion = PathVersion
    { path :: !Text
    -- ^ Path of the item being deleted.
    , version :: !Natural
    -- ^ Version passed along with the delete request.
    }
    deriving (Generic, Show, FromJSON)

-- | Decode the event envelope from a JSON object.
--
-- @data@ is mandatory; @path@ is optional and falls back to @"/"@ when absent.
instance FromJSON a => FromJSON (PathData a) where
    parseJSON = withObject "Put" $ \obj ->
        -- "data" is parsed before "path", so a missing payload is the first
        -- failure reported.
        flip PathData
            <$> obj .: "data"
            <*> (obj .:? "path" .!= "/")

-- | A single server-sent event as assembled from its individual field lines.
data SSE = SSE
    { name :: !Text
    -- ^ Value of the @event@ field.
    , buffer :: !Text
    -- ^ Accumulated @data@ field values, joined with newlines.
    , lastEventId :: !(Maybe Text)
    -- ^ Value of the @id@ field, when one was present.
    , retry :: !(Maybe Text)
    -- ^ Raw value of the @retry@ field, when one was present.
    }
    deriving (Generic, Show, Eq)

-- | True for characters allowed inside an SSE field name: anything except a
-- carriage return, a colon or a line feed.
nameCharPredicate :: Char -> Bool
nameCharPredicate c = c `notElem` ['\r', ':', '\n']

-- | True for every character that does not terminate a line, i.e. anything
-- other than a carriage return or a line feed.
anyCharPredicate :: Char -> Bool
anyCharPredicate c = not (c == '\r' || c == '\n')

-- | Match one SSE line terminator: @\\r\\n@, a lone @\\r@, a lone @\\n@, or the
-- end of the input. Alternatives are tried in that order, so @\\r\\n@ is
-- consumed as a single terminator.
endOfLineSSE :: Parser ()
endOfLineSSE = choice (map (void . string) ["\r\n", "\r", "\n"] ++ [endOfInput])

-- | Consume one SSE comment line: a leading colon, the rest of the line, and
-- its terminator. The comment text is discarded.
comment :: Parser ()
comment = char ':' *> P.takeWhile anyCharPredicate *> endOfLineSSE

-- | Parse a single @name: value@ line and return the pair @(name, value)@.
--
-- The name must be non-empty. The colon and one following space are each
-- optional and are dropped when present; the value is the remainder of the
-- line (possibly empty) up to, but not including, the line terminator.
parseField :: Parser (Text, Text)
parseField =
    (,)
        <$> P.takeWhile1 nameCharPredicate
        <* skipOptional ':'
        <* skipOptional ' '
        <*> P.takeWhile anyCharPredicate
        <* endOfLineSSE
  where
    -- Consume the given character if it is next in the input; never fails.
    skipOptional :: Char -> Parser ()
    skipOptional c = void (option ' ' (char c))

-- | Fold one parsed field into an event under construction.
--
-- * @event@ replaces the event name.
-- * @id@ and @retry@ replace the corresponding optional values.
-- * @data@ is appended to the buffer, separated from earlier data by a
--   newline when the buffer is non-empty.
-- * Any other field name leaves the event untouched.
processField :: (Text, Text) -> SSE -> SSE
processField (key, payload) event
    | key == "event" = event {name = payload}
    | key == "id" = event {lastEventId = Just payload}
    | key == "retry" = event {retry = Just payload}
    | key == "data" = event {buffer = appendLine (buffer event)}
    | otherwise = event
  where
    -- Join the new data line onto whatever has been buffered so far.
    appendLine :: Text -> Text
    appendLine existing
        | T.null existing = payload
        | otherwise = T.concat [existing, "\n", payload]

-- | Parse the next complete server-sent event.
--
-- An event is a run of field lines (each optionally preceded by comment
-- lines) closed by a line terminator. Events that end up with an empty name
-- or an empty data buffer are skipped and parsing continues with the
-- following event.
--
-- Fields are applied in the order they appear on the wire, so multiple
-- @data@ lines are joined first-to-last and, when a field such as @event@ or
-- @id@ is repeated, the last occurrence wins.
parseEvent :: Parser SSE
parseEvent = do
    fields <- many (many comment >> parseField)
    endOfLineSSE
    -- 'foldr' consumes its list from the right; reversing first makes
    -- 'processField' see the fields in stream order.
    let event = foldr processField (SSE "" "" mzero mzero) (reverse fields)
    if T.null (name event) || T.null (buffer event) then parseEvent else pure event

-- | Handle a @put@ event by decoding its body and initialising the data
-- source with the contained flags and segments.
--
-- Returns 'True' when the data source accepted the payload. Returns 'False'
-- (after logging the reason) when the body cannot be decoded or when the data
-- source reports an error; the stream reader stops consuming on 'False'.
processPut :: (MonadIO m, MonadLogger m) => DataSourceUpdates -> L.ByteString -> m Bool
processPut dataSourceUpdates value = case eitherDecode value of
    Right (PathData _ (PutBody flags segments)) -> do
        $(logInfo) "initializing dataSourceUpdates with put"
        liftIO (dataSourceUpdatesInit dataSourceUpdates flags segments) >>= \case
            Left err -> do
                $(logError) $ T.append "dataSourceUpdates failed put: " err
                pure False
            _ -> pure True
    Left err -> do
        -- Separate the fixed prefix from the decoder message so the log line is readable.
        $(logError) $ T.append "failed to parse put body: " (T.pack err)
        pure False

-- | Handle a @patch@ event by decoding its envelope and upserting the flag or
-- segment it carries.
--
-- The envelope path selects the item kind: a @/flags/@ prefix inserts a flag,
-- a @/segments/@ prefix inserts a segment. Any other path is logged and
-- treated as handled ('True'), so the stream keeps being read.
--
-- Returns 'False' (after logging) when the envelope or the item cannot be
-- decoded, or when the data source rejects the insert.
processPatch :: forall m. (MonadIO m, MonadLogger m) => DataSourceUpdates -> L.ByteString -> m Bool
processPatch dataSourceUpdates value = case eitherDecode value of
    Right (PathData path body)
        | T.isPrefixOf "/flags/" path -> insPatch "flag" path dataSourceUpdatesInsertFlag (fromJSON body)
        | T.isPrefixOf "/segments/" path -> insPatch "segment" path dataSourceUpdatesInsertSegment (fromJSON body)
        | otherwise -> do
            $(logError) "unknown patch path"
            pure True
    Left err -> do
        -- Separate the fixed prefix from the decoder message so the log line is readable.
        $(logError) $ T.append "failed to parse patch generic: " (T.pack err)
        pure False
  where
    -- Insert an already-decoded item, or report why decoding it failed.
    -- The first argument is only a human-readable label used in log messages.
    insPatch :: Text -> Text -> (DataSourceUpdates -> a -> IO (Either Text ())) -> Result a -> m Bool
    insPatch name _ _ (Error err) = do
        $(logError) $ T.concat ["failed to parse patch ", name, ": ", T.pack err]
        pure False
    insPatch name path insert (Success item) = do
        $(logInfo) $ T.concat ["patching ", name, " with path: ", path]
        status <- liftIO $ insert dataSourceUpdates item
        either
            ( \err -> do
                $(logError) $ T.concat ["dataSourceUpdates failed ", name, " patch: ", err]
                pure False
            )
            (const $ pure True)
            status

-- | Handle a @delete@ event by decoding its body and removing the referenced
-- flag or segment from the data source.
--
-- The path prefix (@/flags/@ or @/segments/@) selects the item kind; the
-- remainder of the path is the key passed to the data source together with
-- the version from the body.
--
-- Returns 'True' when the data source accepted the delete. Returns 'False'
-- (after logging) when the body cannot be decoded, the path has neither
-- prefix, or the data source reports an error.
processDelete :: forall m. (MonadIO m, MonadLogger m) => DataSourceUpdates -> L.ByteString -> m Bool
processDelete dataSourceUpdates value = case eitherDecode value :: Either String PathVersion of
    Right (PathVersion path version)
        -- 'T.stripPrefix' both tests the prefix and yields the key, avoiding
        -- hand-counted prefix lengths.
        | Just key <- T.stripPrefix "/flags/" path ->
            logDelete "flag" path (dataSourceUpdatesDeleteFlag dataSourceUpdates key version)
        | Just key <- T.stripPrefix "/segments/" path ->
            logDelete "segment" path (dataSourceUpdatesDeleteSegment dataSourceUpdates key version)
        | otherwise -> do
            -- NOTE(review): processPatch returns True for an unknown path while
            -- this returns False (which ends the current stream read) — confirm
            -- the difference is intentional.
            $(logError) "unknown delete path"
            pure False
    Left err -> do
        -- Separate the fixed prefix from the decoder message so the log line is readable.
        $(logError) $ T.append "failed to parse delete: " (T.pack err)
        pure False
  where
    -- Run the delete action, logging the attempt and any failure it reports.
    -- The first argument is only a human-readable label used in log messages.
    logDelete :: Text -> Text -> StoreResult () -> m Bool
    logDelete name path action = do
        $(logInfo) $ T.concat ["deleting ", name, " with path: ", path]
        status <- liftIO action
        either
            ( \err -> do
                $(logError) $ T.concat ["dataSourceUpdates failed ", name, " delete: ", err]
                pure False
            )
            (const $ pure True)
            status

-- | Route an event to its handler based on the event name.
--
-- @put@, @patch@ and @delete@ are forwarded to their processors and the
-- processor's result is returned. Any other name is logged as a warning and
-- reported as handled ('True').
processEvent :: (MonadIO m, MonadLogger m) => DataSourceUpdates -> Text -> L.ByteString -> m Bool
processEvent dataSourceUpdates name value
    | name == "put" = processPut dataSourceUpdates value
    | name == "patch" = processPatch dataSourceUpdates value
    | name == "delete" = processDelete dataSourceUpdates value
    | otherwise = do
        $(logWarn) "unknown event type"
        pure True

-- | Exceptions raised while reading from the streaming response body.
data ReadE
    = -- | No chunk arrived before the read timeout elapsed.
      ReadETimeout
    | -- | The body reader returned an empty chunk.
      ReadEClosed
    deriving (Show, Exception)

-- | 'try' specialised to 'ReadE': run the action and capture a 'ReadE'
-- exception as a 'Left'. Other exception types are not caught.
tryReadE :: MonadCatch m => m a -> m (Either ReadE a)
tryReadE action = try action

-- | Read the next chunk of the response body and decode it as UTF-8 text.
--
-- A heartbeat is expected every 120 seconds; the read itself is abandoned
-- after 300 seconds.
--
-- Throws 'ReadETimeout' when no chunk arrives in time and 'ReadEClosed' when
-- the reader yields an empty chunk.
--
-- NOTE(review): each chunk is decoded independently with 'decodeUtf8';
-- presumably a chunk boundary that splits a multi-byte character would make
-- decoding fail — confirm and consider a streaming decoder.
readWithException :: IO ByteString -> IO Text
readWithException body = do
    chunk <- timeout readTimeoutMicros (brRead body)
    case chunk of
        Nothing -> throwIO ReadETimeout
        Just bytes
            | B.null bytes -> throwIO ReadEClosed
            | otherwise -> pure (decodeUtf8 bytes)
  where
    -- 300 seconds expressed in microseconds, as 'timeout' expects.
    readTimeoutMicros :: Int
    readTimeoutMicros = 300 * 1_000_000

-- | Consume server-sent events from a response body until the stream ends or
-- an event cannot be handled.
--
-- Events are parsed incrementally; input left over after one event seeds the
-- parse of the next. Reading stops when:
--
-- * no data arrives before the read timeout ('ReadETimeout'),
-- * the body is closed ('ReadEClosed'),
-- * the SSE parser fails, or
-- * an event processor returns 'False'.
--
-- The result is 'True' if at least one event had been processed successfully
-- before the stream stopped.
readStream :: (MonadIO m, MonadLogger m, MonadMask m) => IO ByteString -> DataSourceUpdates -> m Bool
readStream body dataSourceUpdates = do
    -- Routine start-up message; not an error condition.
    $(logDebug) "starting readStream"
    loop "" False
  where
    -- @initial@ is unconsumed input from the previous parse; @processedEvent@
    -- records whether any earlier event succeeded.
    loop initial processedEvent =
        tryReadE (parseWith (liftIO $ readWithException body) parseEvent initial) >>= \case
            (Left ReadETimeout) -> do
                $(logError) "timeout waiting for SSE event"
                pure processedEvent
            (Left ReadEClosed) -> do
                $(logError) "streaming connection unexpectedly closed"
                pure processedEvent
            (Right parsed) -> case parsed of
                Done remaining event -> do
                    processed <- processEvent dataSourceUpdates (name event) (L.fromStrict $ encodeUtf8 $ buffer event)
                    if processed then loop remaining True else pure processedEvent
                Fail _ context err -> do
                    $(logError) $ T.intercalate " " ["failed parsing SSE frame", T.pack $ show context, T.pack err]
                    pure processedEvent
                Partial _ -> do
                    $(logError) "failed parsing SSE frame unexpected partial"
                    pure processedEvent

-- This function is responsible for consuming a streaming connection.
--
-- It is responsible for opening the connection, parsing the body for as long as it is able, and then handling shut down
-- cleanly. Once the connection has ended, it is responsible for implementing a sane retry policy that prevents the
-- stampeding herd problem in the unlikely event of upstream failures.
--
-- The returned state is either marked as cancelled (no further attempts should be made) or carries the updated
-- attempt bookkeeping; in the latter case this function has already slept for the computed retry delay.
startNewConnection :: (MonadIO m, MonadLogger m, MonadMask m) => Manager -> Request -> DataSourceUpdates -> StreamingState -> m StreamingState
startNewConnection manager request dataSourceUpdates state = do
    $(logDebug) "starting new streaming connection"
    status <- tryHTTP $ withResponseGeneric request manager $ \response -> do
        checkAuthorization response
            >> throwIfNot200 request response
            >> readStream (responseBody response) dataSourceUpdates
    now <- liftIO $ getTime Monotonic
    handleResponse state now status
  where
    -- This function is responsible for parsing the final output from a now closed streaming connection.
    --
    -- Given the result of the stream run, this function can choose to either mark the state as cancelled, stopping all
    -- further attempts, or it can update the state and wait some amount of time before starting another attempt.
    handleResponse :: (MonadIO m, MonadLogger m, MonadMask m) => StreamingState -> TimeSpec -> (Either HttpException Bool) -> m StreamingState
    handleResponse current now result =
        let next = updateState current now result
         in if cancel next
                then pure next
                else do
                    delay <- calculateDelay next
                    _ <- liftIO $ threadDelay delay
                    pure next

    -- Once a streaming connection run has ended, we need to update the streaming state.
    --
    -- Given the result of a stream run, this function can either choose to mark the state as cancelled, meaning halt all
    -- further attempts, or it can be updated according to the rules of our streaming specification:
    --
    --   * a run that ended without an HTTP exception resets the attempt counter and records the current time;
    --   * an unrecoverable HTTP status cancels the stream;
    --   * any other HTTP status bumps the attempt counter, unless the recorded active time is at least 60 seconds
    --     old, in which case the counter restarts at 1;
    --   * every other HTTP exception bumps the attempt counter.
    updateState :: StreamingState -> TimeSpec -> (Either HttpException Bool) -> StreamingState
    updateState current now (Right _) = current {initialConnection = False, activeSince = Just now, attempt = 1}
    updateState current@(StreamingState {attempt = att, activeSince = since}) now (Left (HttpExceptionRequest _ (StatusCodeException response _)))
        | isHttpUnrecoverable code = current {cancel = True}
        | otherwise =
            case since of
                (Just time)
                    | now >= time + TimeSpec 60 0 -> current {attempt = 1, activeSince = Nothing}
                    | otherwise -> current {attempt = att + 1}
                Nothing -> current {attempt = att + 1}
      where
        code = statusCode (responseStatus response)
    updateState current@(StreamingState {attempt = att}) _ _ = current {attempt = att + 1}

    -- Calculate the next delay period (in microseconds) following a backoff + jitter + max delay algorithm.
    --
    -- The base delay is @initialRetryDelay@ (milliseconds) doubled for every attempt after the first, capped at 30
    -- seconds; a random jitter of up to half that value is then subtracted.
    --
    -- See https://aws.amazon.com/blogs/architecture/exponential-backoff-and-jitter/
    calculateDelay :: (MonadIO m, MonadLogger m, MonadMask m) => StreamingState -> m Int
    calculateDelay StreamingState {initialRetryDelay, attempt = att} =
        liftIO $ do
            gen <- newStdGen
            let maxDelayMicros :: Integer
                maxDelayMicros = 30 * 1_000_000
                -- Clamp the exponent: a negative value would make (^) throw, and an unbounded one would overflow
                -- Int and collapse the delay to zero or below after enough consecutive failures. Any positive base
                -- delay already exceeds the cap at 2^30, so the clamp never changes a sane result.
                doublings :: Int
                doublings = max 0 (min 30 (att - 1))
                -- Computed in Integer so the product cannot wrap before the cap is applied.
                backoffMicros :: Integer
                backoffMicros = toInteger initialRetryDelay * 1_000 * 2 ^ doublings
                timespan :: Int
                timespan = fromInteger (max 0 (min maxDelayMicros backoffMicros))
                jitter :: Int
                jitter = fst $ randomR (0, timespan `div` 2) gen
            pure (timespan - jitter)

-- | Bookkeeping carried from one streaming connection attempt to the next.
data StreamingState = StreamingState
    { initialConnection :: Bool
    -- ^ Marker used to determine the first time the streamer connects.
    , initialRetryDelay :: Int
    -- ^ The base duration used for the retry delay calculation.
    , activeSince :: Maybe TimeSpec
    -- ^ TimeSpec to denote the last time the SDK successfully connected to the stream.
    , attempt :: Int
    -- ^ A number representing the attempt # of this connection.
    , cancel :: Bool
    -- ^ A marker to shortcut logic and halt the streaming process.
    }
    deriving (Generic, Show)

-- Start a thread for streaming events back from LaunchDarkly services.
--
-- The request targets the given stream URI with @/all@ appended. Connections
-- are then (re)started one after another until the streaming state is marked
-- as cancelled. The whole loop runs under 'handleUnauthorized'.
streamingThread :: (MonadIO m, MonadLogger m, MonadMask m) => Text -> Int -> ClientContext -> DataSourceUpdates -> m ()
streamingThread streamURI initialRetryDelay clientContext dataSourceUpdates = do
    let httpConfig = httpConfiguration clientContext
    req <- prepareRequest httpConfig (T.unpack streamURI ++ "/all")
    handleUnauthorized dataSourceUpdates (processStream (tlsManager httpConfig) req)
  where
    -- State handed to the very first connection attempt.
    initialState :: StreamingState
    initialState =
        StreamingState
            { initialConnection = True
            , initialRetryDelay
            , activeSince = Nothing
            , attempt = 0
            , cancel = False
            }

    -- Keep opening connections, threading the state through, until it is cancelled.
    processStream :: (MonadIO m, MonadLogger m, MonadMask m) => Manager -> Request -> m ()
    processStream manager req =
        void $ iterateUntilM cancel (startNewConnection manager req dataSourceUpdates) initialState