module LaunchDarkly.Server.Network.Streaming (streamingThread) where

import           Data.Text                           (Text)
import qualified Data.Text as                        T
import           Data.Attoparsec.Text as             P hiding (Result, try)
import           Data.Function                       (fix)
import           Control.Monad                       (void, mzero)
import           Control.Monad.Catch                 (Exception, MonadCatch, MonadMask, try)
import           Control.Exception                   (throwIO)
import           Data.ByteString                     (ByteString)
import qualified Data.ByteString as                  B
import           Control.Applicative                 (many)
import           Data.Text.Encoding                  (decodeUtf8, encodeUtf8)
import           Network.HTTP.Client                 (Manager, Response(..), Request, HttpException(..), HttpExceptionContent(..), brRead, throwErrorStatusCodes)
import           Control.Monad.Logger                (MonadLogger, logInfo, logWarn, logError, logDebug)
import           Control.Monad.IO.Class              (MonadIO, liftIO)
import           Data.Aeson                          (FromJSON, parseJSON, withObject, eitherDecode, (.:), (.:?), (.!=), fromJSON, Result(..))
import qualified Data.ByteString.Lazy as             L
import           GHC.Natural                         (Natural)
import           GHC.Generics                        (Generic)
import           Control.Retry                       (RetryPolicyM, RetryStatus, fullJitterBackoff, capDelay, retrying)
import           Network.HTTP.Types.Status           (ok200, Status(statusCode))
import           System.Timeout                      (timeout)
import           System.Clock                        (getTime, Clock(Monotonic), TimeSpec(TimeSpec))

import           LaunchDarkly.Server.Config.HttpConfiguration (HttpConfiguration(..), prepareRequest)
import           LaunchDarkly.Server.Config.ClientContext     (ClientContext(..))
import           LaunchDarkly.Server.Store.Internal           (StoreResult)
import           LaunchDarkly.Server.DataSource.Internal      (DataSourceUpdates(..))
import           LaunchDarkly.Server.Features                 (Flag, Segment)
import           LaunchDarkly.Server.Network.Common           (handleUnauthorized, checkAuthorization, withResponseGeneric, tryHTTP)
import           LaunchDarkly.AesonCompat                     (KeyMap)


-- | Body of a @put@ event: the complete set of flags and segments used to
-- (re)initialise the data source.
data PutBody = PutBody
    { flags    :: !(KeyMap Flag)
      -- ^ Every flag, keyed by flag key.
    , segments :: !(KeyMap Segment)
      -- ^ Every segment, keyed by segment key.
    } deriving (Generic, Show, FromJSON)

-- | Envelope used by @put@ and @patch@ events: a payload together with the
-- path it applies to.
data PathData d = PathData
    { path     :: !Text
      -- ^ Target path of the payload.
    , pathData :: !d
      -- ^ The payload itself.
    } deriving (Generic, Show)

-- | Body of a @delete@ event: the path of the item to remove and the version
-- of the deletion.
data PathVersion = PathVersion
    { path    :: !Text
      -- ^ Path of the item being deleted.
    , version :: !Natural
      -- ^ Version associated with the deletion.
    } deriving (Generic, Show, FromJSON)

-- | Decodes the envelope from a JSON object. The @data@ key is mandatory;
-- @path@ is optional and falls back to @"/"@ when absent.
instance FromJSON a => FromJSON (PathData a) where
    parseJSON = withObject "Put" $ \obj -> do
        -- "data" is parsed first so that its failure is the one reported.
        payload  <- obj .: "data"
        location <- obj .:? "path" .!= "/"
        pure (PathData location payload)

-- | A single parsed server-sent event.
data SSE = SSE
    { name        :: !Text
      -- ^ Value of the @event@ field.
    , buffer      :: !Text
      -- ^ Accumulated @data@ field values, joined with newlines.
    , lastEventId :: !(Maybe Text)
      -- ^ Value of the @id@ field, if one was present.
    , retry       :: !(Maybe Text)
      -- ^ Raw value of the @retry@ field, if one was present.
    } deriving (Generic, Show, Eq)

-- | True for characters that may appear in an SSE field name, i.e. anything
-- other than a carriage return, a colon or a line feed.
nameCharPredicate :: Char -> Bool
nameCharPredicate c = c `notElem` ['\r', ':', '\n']

-- | True for characters that may appear in an SSE field value or comment,
-- i.e. anything other than a carriage return or a line feed.
anyCharPredicate :: Char -> Bool
anyCharPredicate = (`notElem` ['\r', '\n'])

-- | Matches one SSE line terminator: CRLF, a lone CR, a lone LF, or the end
-- of the input. CRLF is tried first so it is consumed as a single terminator.
endOfLineSSE :: Parser ()
endOfLineSSE = choice (map (void . string) ["\r\n", "\r", "\n"] ++ [endOfInput])

-- | Consumes one SSE comment line: a leading colon, the rest of the line, and
-- its terminator. The comment text is discarded.
comment :: Parser ()
comment = char ':' *> P.takeWhile anyCharPredicate *> endOfLineSSE

-- | Parses one @name[:[ ]value]@ line and returns the field name and value.
--
-- The name must be non-empty. The colon and a single space after it are both
-- optional and are not part of the returned value, which may be empty.
parseField :: Parser (Text, Text)
parseField =
    (,) <$> (P.takeWhile1 nameCharPredicate <* separator)
        <*> (P.takeWhile anyCharPredicate <* endOfLineSSE)
  where
    -- An optional ':' followed by an optional single ' '.
    separator = option ' ' (char ':') *> option ' ' (char ' ')

-- | Applies a single parsed field to the event being assembled.
--
-- @event@, @id@ and @retry@ overwrite the corresponding slot; @data@ is
-- appended to the buffer, separated from existing content by a newline.
-- Unrecognised field names leave the event untouched.
processField :: (Text, Text) -> SSE -> SSE
processField (fieldName, fieldValue) event
    | fieldName == "event" = event { name = fieldValue }
    | fieldName == "id"    = event { lastEventId = Just fieldValue }
    | fieldName == "retry" = event { retry = Just fieldValue }
    | fieldName == "data"  = event { buffer = appendLine (buffer event) }
    | otherwise            = event
  where
    -- No separator is inserted while the buffer is still empty.
    appendLine existing
        | T.null existing = fieldValue
        | otherwise       = T.concat [existing, "\n", fieldValue]

-- | Parses the next complete server-sent event.
--
-- An event is a run of field lines (each optionally preceded by comment
-- lines) terminated by a blank line. Events that end up without a name or
-- without any data are discarded and parsing continues with the next one.
parseEvent :: Parser SSE
parseEvent = do
    fields <- many (many comment >> parseField)
    endOfLineSSE
    -- Fields must be applied in the order they were received so that
    -- multi-line data is joined top to bottom and the last occurrence of
    -- "event"/"id"/"retry" wins. 'foldr' applies its rightmost element first,
    -- so the list is reversed to get stream order.
    let event = foldr processField emptyEvent (reverse fields)
    if T.null (name event) || T.null (buffer event) then parseEvent else pure event
  where
    -- 'mzero' is 'Nothing' for the two 'Maybe' slots.
    emptyEvent = SSE "" "" mzero mzero

-- | Handles the body of a @put@ event.
--
-- Decodes the body as a 'PathData' wrapping a 'PutBody' and initialises the
-- data source with the contained flags and segments. Decode failures and
-- initialisation failures are logged; nothing is thrown from here.
processPut :: (MonadIO m, MonadLogger m) => DataSourceUpdates -> L.ByteString -> m ()
processPut dataSourceUpdates value = case eitherDecode value of
    Right (PathData _ (PutBody flags segments)) -> do
        $(logInfo) "initializing dataSourceUpdates with put"
        liftIO (dataSourceUpdatesInit dataSourceUpdates flags segments) >>= \case
            Left err -> $(logError) $ T.append "dataSourceUpdates failed put: " err
            _        -> pure ()
    -- Separator added so the decoder message is not glued onto the prefix.
    Left err -> $(logError) $ T.append "failed to parse put body: " (T.pack err)

-- | Handles the body of a @patch@ event.
--
-- The body is first decoded generically to obtain the path; the path prefix
-- then selects whether the payload is decoded and inserted as a flag or as a
-- segment. Unknown paths, decode failures and insert failures are logged.
processPatch :: forall m. (MonadIO m, MonadLogger m) => DataSourceUpdates -> L.ByteString -> m ()
processPatch dataSourceUpdates value = case eitherDecode value of
    Right (PathData path body)
        | T.isPrefixOf "/flags/" path    -> insPatch "flag" path dataSourceUpdatesInsertFlag (fromJSON body)
        | T.isPrefixOf "/segments/" path -> insPatch "segment" path dataSourceUpdatesInsertSegment (fromJSON body)
        | otherwise                      -> $(logError) "unknown patch path"
    -- Separator added so the decoder message is not glued onto the prefix.
    Left err                             -> $(logError) $ T.append "failed to parse patch generic: " (T.pack err)
    where
      -- Inserts a successfully decoded item, or logs why it could not be decoded.
      insPatch :: Text -> Text -> (DataSourceUpdates -> a -> IO (Either Text ())) -> Result a -> m ()
      insPatch name _ _ (Error err) = $(logError) $ T.concat ["failed to parse patch ", name, ": ", T.pack err]
      insPatch name path insert (Success item) = do
        $(logInfo) $ T.concat ["patching ", name, " with path: ", path]
        status <- liftIO $ insert dataSourceUpdates item
        either (\err -> $(logError) $ T.concat ["dataSourceUpdates failed ", name, " patch: ", err]) pure status

-- | Handles the body of a @delete@ event.
--
-- The body is decoded as a 'PathVersion'. The path prefix selects whether a
-- flag or a segment is deleted, and the remainder of the path is used as the
-- key. Unknown paths, decode failures and delete failures are logged.
processDelete :: forall m. (MonadIO m, MonadLogger m) => DataSourceUpdates -> L.ByteString -> m ()
processDelete dataSourceUpdates value = case eitherDecode value :: Either String PathVersion of
    Right (PathVersion path version)
        -- 'T.stripPrefix' both tests the prefix and yields the key, so no
        -- hand-counted prefix lengths are needed.
        | Just key <- T.stripPrefix "/flags/" path ->
            logDelete "flag" path (dataSourceUpdatesDeleteFlag dataSourceUpdates key version)
        | Just key <- T.stripPrefix "/segments/" path ->
            logDelete "segment" path (dataSourceUpdatesDeleteSegment dataSourceUpdates key version)
        | otherwise -> $(logError) "unknown delete path"
    -- Separator added so the decoder message is not glued onto the prefix.
    Left err -> $(logError) $ T.append "failed to parse delete: " (T.pack err)
    where
      -- Runs the delete action, logging the attempt and any failure it reports.
      logDelete :: Text -> Text -> StoreResult () -> m ()
      logDelete name path action = do
        $(logInfo) $ T.concat ["deleting ", name, " with path: ", path]
        status <- liftIO action
        either (\err -> $(logError) $ T.concat ["dataSourceUpdates failed ", name, " delete: ", err]) pure status

-- | Dispatches an event body to the handler matching the event name.
-- Names other than @put@, @patch@ and @delete@ are logged as a warning and
-- otherwise ignored.
processEvent :: (MonadIO m, MonadLogger m) => DataSourceUpdates -> Text -> L.ByteString -> m ()
processEvent dataSourceUpdates eventName payload
    | eventName == "put"    = processPut dataSourceUpdates payload
    | eventName == "patch"  = processPatch dataSourceUpdates payload
    | eventName == "delete" = processDelete dataSourceUpdates payload
    | otherwise             = $(logWarn) "unknown event type"

-- | Reasons a read from the streaming response body is abandoned.
data ReadE
    = ReadETimeout
      -- ^ No chunk arrived within the read timeout.
    | ReadEClosed
      -- ^ The body yielded an empty chunk.
    deriving (Show, Exception)

-- | 'try' specialised to 'ReadE', so call sites need no type annotation to
-- select which exception is caught.
tryReadE :: MonadCatch m => m a -> m (Either ReadE a)
tryReadE action = try action

-- | Reads the next chunk of the response body and decodes it as UTF-8.
--
-- A heartbeat is expected every 120 seconds; a read that yields nothing for
-- 300 seconds throws 'ReadETimeout'. An empty chunk throws 'ReadEClosed'.
--
-- NOTE(review): 'decodeUtf8' throws on invalid UTF-8, and each chunk is
-- decoded on its own. A multi-byte character split across two chunks would
-- presumably fail to decode; confirm whether that can happen here.
readWithException :: IO ByteString -> IO Text
readWithException body = do
    chunk <- timeout readTimeoutMicros (brRead body)
    case chunk of
        Nothing -> throwIO ReadETimeout
        Just bytes
            | B.null bytes -> throwIO ReadEClosed
            | otherwise    -> pure (decodeUtf8 bytes)
  where
    readTimeoutMicros :: Int
    readTimeoutMicros = 1000000 * 300

-- | Reads server-sent events from the response body until the stream ends.
--
-- Each parsed event is handed to 'processEvent', and any input left over
-- after an event seeds the parse of the next one. The loop stops, after
-- logging the reason, when a read times out, the connection is closed, or a
-- frame cannot be parsed.
readStream :: (MonadIO m, MonadLogger m, MonadMask m) => IO ByteString -> DataSourceUpdates -> m ()
readStream body dataSourceUpdates = loop "" where
    loop initial = tryReadE (parseWith (liftIO $ readWithException body) parseEvent initial) >>= \case
        -- These two messages were previously attached to the wrong
        -- constructors, which made the logs misreport why the stream ended.
        (Left ReadETimeout) -> $(logError) "timeout waiting for SSE event"
        (Left ReadEClosed)  -> $(logError) "streaming connection unexpectedly closed"
        (Right parsed)      -> case parsed of
            Done remaining event -> do
                processEvent dataSourceUpdates (name event) (L.fromStrict $ encodeUtf8 $ buffer event)
                loop remaining
            Fail _ context err ->
                $(logError) $ T.intercalate " " ["failed parsing SSE frame", T.pack $ show context, T.pack err]
            Partial _ -> $(logError) "failed parsing SSE frame unexpected partial"

-- | Reconnect policy: full-jitter exponential backoff starting from a one
-- second base, with each delay capped at thirty seconds.
--
-- https://aws.amazon.com/blogs/architecture/exponential-backoff-and-jitter/
retryPolicy :: MonadIO m => RetryPolicyM m
retryPolicy = capDelay maxDelayMicros (fullJitterBackoff baseDelayMicros)
  where
    maxDelayMicros, baseDelayMicros :: Int
    maxDelayMicros  = 30 * 1000000
    baseDelayMicros = 1 * 1000000

-- | Outcome of one streaming connection attempt, used to decide what the
-- reconnect loop does next.
data Failure
    = FailurePermanent
      -- ^ Give up; the reconnect loop ends.
    | FailureTemporary
      -- ^ Try again under the retry policy.
    | FailureReset
      -- ^ Start over with a fresh retry policy.
    deriving (Eq)

-- | Runs a single streaming connection and reports how it ended.
--
-- The request is issued and authorization is checked. A non-200 response is
-- turned into an HTTP exception; otherwise the body is consumed with
-- 'readStream'. When the stream ends without an HTTP exception the result is
-- 'FailureReset' if at least 60 seconds elapsed since the attempt started,
-- and 'FailureTemporary' otherwise. HTTP exceptions are logged and then
-- classified by the local @classify@ helper. The 'RetryStatus' argument is
-- ignored.
handleStream :: (MonadIO m, MonadLogger m, MonadMask m) => Manager -> Request -> DataSourceUpdates -> RetryStatus -> m Failure
handleStream manager request dataSourceUpdates _ = do
    $(logDebug) "starting new streaming connection"
    startTime <- liftIO $ getTime Monotonic
    outcome <- tryHTTP $ withResponseGeneric request manager $ \response -> do
        checkAuthorization response
        if responseStatus response == ok200
            then readStream (responseBody response) dataSourceUpdates
            else throwErrorStatusCodes request response
    case outcome of
        Left err -> do
            $(logError) (T.intercalate " " ["HTTP Exception", T.pack $ show err])
            pure (classify err)
        Right () -> do
            now <- liftIO (getTime Monotonic)
            if now < startTime + TimeSpec 60 0
                then do
                    $(logError) "streaming connection closed before 60 seconds, retrying after backoff"
                    pure FailureTemporary
                else do
                    $(logError) "streaming connection closed after 60 seconds, retrying instantly"
                    pure FailureReset
  where
    -- Invalid URLs and most 4xx responses are permanent. 400, 408 and 429, as
    -- well as anything outside the 4xx range and every other exception, are
    -- treated as temporary.
    classify :: HttpException -> Failure
    classify (InvalidUrlException _ _) = FailurePermanent
    classify (HttpExceptionRequest _ (StatusCodeException response _))
        | code < 400 || code >= 500   = FailureTemporary
        | code `elem` [400, 408, 429] = FailureTemporary
        | otherwise                   = FailurePermanent
      where
        code = statusCode (responseStatus response)
    classify _ = FailureTemporary

-- | Entry point of the streaming data source.
--
-- Builds the request for @\<streamURI\>/all@ and then repeatedly runs
-- 'handleStream' under 'retryPolicy'. A 'FailureTemporary' outcome is retried
-- by the policy, 'FailureReset' restarts the policy from scratch, and any
-- other outcome ends the loop. The whole loop runs inside
-- 'handleUnauthorized'.
streamingThread :: (MonadIO m, MonadLogger m, MonadMask m) => Text -> ClientContext -> DataSourceUpdates -> m ()
streamingThread streamURI clientContext dataSourceUpdates = do
    req <- prepareRequest config (T.unpack streamURI ++ "/all")
    let manager = tlsManager config
    handleUnauthorized dataSourceUpdates $ fix $ \loop -> do
        outcome <- retrying retryPolicy shouldRetry (handleStream manager req dataSourceUpdates)
        case outcome of
            FailureReset -> loop
            _            -> pure ()
  where
    config = httpConfiguration clientContext
    -- Only temporary failures are retried within one run of the policy.
    shouldRetry _ outcome = pure (outcome == FailureTemporary)