module LaunchDarkly.Server.Network.Streaming (streamingThread) where
import Data.Text (Text)
import qualified Data.Text as T
import Data.Attoparsec.Text as P hiding (Result, try)
import Data.Function (fix)
import Control.Monad (void, mzero)
import Control.Monad.Catch (Exception, MonadCatch, MonadMask, try)
import Control.Exception (throwIO)
import Data.ByteString (ByteString)
import qualified Data.ByteString as B
import Control.Applicative (many)
import Data.Text.Encoding (decodeUtf8, encodeUtf8)
import Network.HTTP.Client (Manager, Response(..), Request, HttpException(..), HttpExceptionContent(..), brRead, throwErrorStatusCodes)
import Control.Monad.Logger (MonadLogger, logInfo, logWarn, logError, logDebug)
import Control.Monad.IO.Class (MonadIO, liftIO)
import Data.Aeson (FromJSON, parseJSON, withObject, eitherDecode, (.:), (.:?), (.!=), fromJSON, Result(..))
import qualified Data.ByteString.Lazy as L
import GHC.Natural (Natural)
import GHC.Generics (Generic)
import Control.Retry (RetryPolicyM, RetryStatus, fullJitterBackoff, capDelay, retrying)
import Network.HTTP.Types.Status (ok200, Status(statusCode))
import System.Timeout (timeout)
import System.Clock (getTime, Clock(Monotonic), TimeSpec(TimeSpec))
import LaunchDarkly.Server.Config.HttpConfiguration (HttpConfiguration(..), prepareRequest)
import LaunchDarkly.Server.Config.ClientContext (ClientContext(..))
import LaunchDarkly.Server.Store.Internal (StoreResult)
import LaunchDarkly.Server.DataSource.Internal (DataSourceUpdates(..))
import LaunchDarkly.Server.Features (Flag, Segment)
import LaunchDarkly.Server.Network.Common (handleUnauthorized, checkAuthorization, withResponseGeneric, tryHTTP)
import LaunchDarkly.AesonCompat (KeyMap)
data PutBody = PutBody
{ PutBody -> KeyMap Flag
flags :: !(KeyMap Flag)
, PutBody -> KeyMap Segment
segments :: !(KeyMap Segment)
} deriving (forall x. Rep PutBody x -> PutBody
forall x. PutBody -> Rep PutBody x
forall a.
(forall x. a -> Rep a x) -> (forall x. Rep a x -> a) -> Generic a
$cto :: forall x. Rep PutBody x -> PutBody
$cfrom :: forall x. PutBody -> Rep PutBody x
Generic, Int -> PutBody -> ShowS
[PutBody] -> ShowS
PutBody -> String
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
showList :: [PutBody] -> ShowS
$cshowList :: [PutBody] -> ShowS
show :: PutBody -> String
$cshow :: PutBody -> String
showsPrec :: Int -> PutBody -> ShowS
$cshowsPrec :: Int -> PutBody -> ShowS
Show, Value -> Parser [PutBody]
Value -> Parser PutBody
forall a.
(Value -> Parser a) -> (Value -> Parser [a]) -> FromJSON a
parseJSONList :: Value -> Parser [PutBody]
$cparseJSONList :: Value -> Parser [PutBody]
parseJSON :: Value -> Parser PutBody
$cparseJSON :: Value -> Parser PutBody
FromJSON)
data PathData d = PathData
{ forall d. PathData d -> Text
path :: !Text
, forall d. PathData d -> d
pathData :: !d
} deriving (forall a.
(forall x. a -> Rep a x) -> (forall x. Rep a x -> a) -> Generic a
forall d x. Rep (PathData d) x -> PathData d
forall d x. PathData d -> Rep (PathData d) x
$cto :: forall d x. Rep (PathData d) x -> PathData d
$cfrom :: forall d x. PathData d -> Rep (PathData d) x
Generic, Int -> PathData d -> ShowS
forall d. Show d => Int -> PathData d -> ShowS
forall d. Show d => [PathData d] -> ShowS
forall d. Show d => PathData d -> String
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
showList :: [PathData d] -> ShowS
$cshowList :: forall d. Show d => [PathData d] -> ShowS
show :: PathData d -> String
$cshow :: forall d. Show d => PathData d -> String
showsPrec :: Int -> PathData d -> ShowS
$cshowsPrec :: forall d. Show d => Int -> PathData d -> ShowS
Show)
data PathVersion = PathVersion
{ PathVersion -> Text
path :: !Text
, PathVersion -> Natural
version :: !Natural
} deriving (forall x. Rep PathVersion x -> PathVersion
forall x. PathVersion -> Rep PathVersion x
forall a.
(forall x. a -> Rep a x) -> (forall x. Rep a x -> a) -> Generic a
$cto :: forall x. Rep PathVersion x -> PathVersion
$cfrom :: forall x. PathVersion -> Rep PathVersion x
Generic, Int -> PathVersion -> ShowS
[PathVersion] -> ShowS
PathVersion -> String
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
showList :: [PathVersion] -> ShowS
$cshowList :: [PathVersion] -> ShowS
show :: PathVersion -> String
$cshow :: PathVersion -> String
showsPrec :: Int -> PathVersion -> ShowS
$cshowsPrec :: Int -> PathVersion -> ShowS
Show, Value -> Parser [PathVersion]
Value -> Parser PathVersion
forall a.
(Value -> Parser a) -> (Value -> Parser [a]) -> FromJSON a
parseJSONList :: Value -> Parser [PathVersion]
$cparseJSONList :: Value -> Parser [PathVersion]
parseJSON :: Value -> Parser PathVersion
$cparseJSON :: Value -> Parser PathVersion
FromJSON)
instance FromJSON a => FromJSON (PathData a) where
parseJSON :: Value -> Parser (PathData a)
parseJSON = forall a. String -> (Object -> Parser a) -> Value -> Parser a
withObject String
"Put" forall a b. (a -> b) -> a -> b
$ \Object
o -> do
a
pathData <- Object
o forall a. FromJSON a => Object -> Key -> Parser a
.: Key
"data"
Text
path <- Object
o forall a. FromJSON a => Object -> Key -> Parser (Maybe a)
.:? Key
"path" forall a. Parser (Maybe a) -> a -> Parser a
.!= Text
"/"
forall (f :: * -> *) a. Applicative f => a -> f a
pure forall a b. (a -> b) -> a -> b
$ PathData { $sel:path:PathData :: Text
path = Text
path, $sel:pathData:PathData :: a
pathData = a
pathData }
data SSE = SSE
{ SSE -> Text
name :: !Text
, SSE -> Text
buffer :: !Text
, SSE -> Maybe Text
lastEventId :: !(Maybe Text)
, SSE -> Maybe Text
retry :: !(Maybe Text)
} deriving (forall x. Rep SSE x -> SSE
forall x. SSE -> Rep SSE x
forall a.
(forall x. a -> Rep a x) -> (forall x. Rep a x -> a) -> Generic a
$cto :: forall x. Rep SSE x -> SSE
$cfrom :: forall x. SSE -> Rep SSE x
Generic, Int -> SSE -> ShowS
[SSE] -> ShowS
SSE -> String
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
showList :: [SSE] -> ShowS
$cshowList :: [SSE] -> ShowS
show :: SSE -> String
$cshow :: SSE -> String
showsPrec :: Int -> SSE -> ShowS
$cshowsPrec :: Int -> SSE -> ShowS
Show, SSE -> SSE -> Bool
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
/= :: SSE -> SSE -> Bool
$c/= :: SSE -> SSE -> Bool
== :: SSE -> SSE -> Bool
$c== :: SSE -> SSE -> Bool
Eq)
nameCharPredicate :: Char -> Bool
nameCharPredicate :: Char -> Bool
nameCharPredicate Char
x = Char
x forall a. Eq a => a -> a -> Bool
/= Char
'\r' Bool -> Bool -> Bool
&& Char
x forall a. Eq a => a -> a -> Bool
/= Char
':' Bool -> Bool -> Bool
&& Char
x forall a. Eq a => a -> a -> Bool
/= Char
'\n'
anyCharPredicate :: Char -> Bool
anyCharPredicate :: Char -> Bool
anyCharPredicate Char
x = Char
x forall a. Eq a => a -> a -> Bool
/= Char
'\r' Bool -> Bool -> Bool
&& Char
x forall a. Eq a => a -> a -> Bool
/= Char
'\n'
endOfLineSSE :: Parser ()
endOfLineSSE :: Parser ()
endOfLineSSE = forall (f :: * -> *) a. Alternative f => [f a] -> f a
choice [forall (f :: * -> *) a. Functor f => f a -> f ()
void forall a b. (a -> b) -> a -> b
$ Text -> Parser Text
string Text
"\r\n", forall (f :: * -> *) a. Functor f => f a -> f ()
void forall a b. (a -> b) -> a -> b
$ Text -> Parser Text
string Text
"\r", forall (f :: * -> *) a. Functor f => f a -> f ()
void forall a b. (a -> b) -> a -> b
$ Text -> Parser Text
string Text
"\n", forall t. Chunk t => Parser t ()
endOfInput]
comment :: Parser ()
= Char -> Parser Char
char Char
':' forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> (Char -> Bool) -> Parser Text
P.takeWhile Char -> Bool
anyCharPredicate forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> Parser ()
endOfLineSSE forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> forall (f :: * -> *) a. Applicative f => a -> f a
pure ()
parseField :: Parser (Text, Text)
parseField :: Parser (Text, Text)
parseField = do
Text
fieldName <- (Char -> Bool) -> Parser Text
P.takeWhile1 Char -> Bool
nameCharPredicate
forall (f :: * -> *) a. Functor f => f a -> f ()
void forall a b. (a -> b) -> a -> b
$ forall (f :: * -> *) a. Alternative f => a -> f a -> f a
option Char
' ' forall a b. (a -> b) -> a -> b
$ Char -> Parser Char
char Char
':'
forall (f :: * -> *) a. Functor f => f a -> f ()
void forall a b. (a -> b) -> a -> b
$ forall (f :: * -> *) a. Alternative f => a -> f a -> f a
option Char
' ' forall a b. (a -> b) -> a -> b
$ Char -> Parser Char
char Char
' '
Text
fieldValue <- (Char -> Bool) -> Parser Text
P.takeWhile Char -> Bool
anyCharPredicate
Parser ()
endOfLineSSE
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Text
fieldName, Text
fieldValue)
processField :: (Text, Text) -> SSE -> SSE
processField :: (Text, Text) -> SSE -> SSE
processField (Text
fieldName, Text
fieldValue) SSE
event = case Text
fieldName of
Text
"event" -> SSE
event { $sel:name:SSE :: Text
name = Text
fieldValue }
Text
"id" -> SSE
event { $sel:lastEventId:SSE :: Maybe Text
lastEventId = forall a. a -> Maybe a
Just Text
fieldValue }
Text
"retry" -> SSE
event { $sel:retry:SSE :: Maybe Text
retry = forall a. a -> Maybe a
Just Text
fieldValue }
Text
"data" -> SSE
event { $sel:buffer:SSE :: Text
buffer = [Text] -> Text
T.concat [ SSE -> Text
buffer SSE
event, if Text -> Bool
T.null (SSE -> Text
buffer SSE
event) then Text
"" else Text
"\n", Text
fieldValue] }
Text
_ -> SSE
event
parseEvent :: Parser SSE
parseEvent :: Parser SSE
parseEvent = do
[(Text, Text)]
fields <- forall (f :: * -> *) a. Alternative f => f a -> f [a]
many (forall (f :: * -> *) a. Alternative f => f a -> f [a]
many Parser ()
comment forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> Parser (Text, Text)
parseField forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= forall (f :: * -> *) a. Applicative f => a -> f a
pure)
Parser ()
endOfLineSSE
let event :: SSE
event = forall (t :: * -> *) a b.
Foldable t =>
(a -> b -> b) -> b -> t a -> b
foldr (Text, Text) -> SSE -> SSE
processField (Text -> Text -> Maybe Text -> Maybe Text -> SSE
SSE Text
"" Text
"" forall (m :: * -> *) a. MonadPlus m => m a
mzero forall (m :: * -> *) a. MonadPlus m => m a
mzero) [(Text, Text)]
fields
if Text -> Bool
T.null (SSE -> Text
name SSE
event) Bool -> Bool -> Bool
|| Text -> Bool
T.null (SSE -> Text
buffer SSE
event) then Parser SSE
parseEvent else forall (f :: * -> *) a. Applicative f => a -> f a
pure SSE
event
processPut :: (MonadIO m, MonadLogger m) => DataSourceUpdates -> L.ByteString -> m ()
processPut :: forall (m :: * -> *).
(MonadIO m, MonadLogger m) =>
DataSourceUpdates -> ByteString -> m ()
processPut DataSourceUpdates
dataSourceUpdates ByteString
value = case forall a. FromJSON a => ByteString -> Either String a
eitherDecode ByteString
value of
Right (PathData Text
_ (PutBody KeyMap Flag
flags KeyMap Segment
segments)) -> do
$(logInfo) Text
"initializing dataSourceUpdates with put"
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (DataSourceUpdates
-> KeyMap Flag -> KeyMap Segment -> IO (Either Text ())
dataSourceUpdatesInit DataSourceUpdates
dataSourceUpdates KeyMap Flag
flags KeyMap Segment
segments) forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \case
Left Text
err -> $(logError) forall a b. (a -> b) -> a -> b
$ Text -> Text -> Text
T.append Text
"dataSourceUpdates failed put: " Text
err
Either Text ()
_ -> forall (f :: * -> *) a. Applicative f => a -> f a
pure ()
Left String
err -> $(logError) forall a b. (a -> b) -> a -> b
$ Text -> Text -> Text
T.append Text
"failed to parse put body" (String -> Text
T.pack String
err)
processPatch :: forall m. (MonadIO m, MonadLogger m) => DataSourceUpdates -> L.ByteString -> m ()
processPatch :: forall (m :: * -> *).
(MonadIO m, MonadLogger m) =>
DataSourceUpdates -> ByteString -> m ()
processPatch DataSourceUpdates
dataSourceUpdates ByteString
value = case forall a. FromJSON a => ByteString -> Either String a
eitherDecode ByteString
value of
Right (PathData Text
path Value
body)
| Text -> Text -> Bool
T.isPrefixOf Text
"/flags/" Text
path -> forall a.
Text
-> Text
-> (DataSourceUpdates -> a -> IO (Either Text ()))
-> Result a
-> m ()
insPatch Text
"flag" Text
path DataSourceUpdates -> Flag -> IO (Either Text ())
dataSourceUpdatesInsertFlag (forall a. FromJSON a => Value -> Result a
fromJSON Value
body)
| Text -> Text -> Bool
T.isPrefixOf Text
"/segments/" Text
path -> forall a.
Text
-> Text
-> (DataSourceUpdates -> a -> IO (Either Text ()))
-> Result a
-> m ()
insPatch Text
"segment" Text
path DataSourceUpdates -> Segment -> IO (Either Text ())
dataSourceUpdatesInsertSegment (forall a. FromJSON a => Value -> Result a
fromJSON Value
body)
| Bool
otherwise -> $(logError) Text
"unknown patch path"
Left String
err -> $(logError) forall a b. (a -> b) -> a -> b
$ Text -> Text -> Text
T.append Text
"failed to parse patch generic" (String -> Text
T.pack String
err)
where
insPatch :: Text -> Text -> (DataSourceUpdates -> a -> IO (Either Text ())) -> Result a -> m ()
insPatch :: forall a.
Text
-> Text
-> (DataSourceUpdates -> a -> IO (Either Text ()))
-> Result a
-> m ()
insPatch Text
name Text
_ DataSourceUpdates -> a -> IO (Either Text ())
_ (Error String
err) = $(logError) forall a b. (a -> b) -> a -> b
$ [Text] -> Text
T.concat [Text
"failed to parse patch ", Text
name, Text
": ", String -> Text
T.pack String
err]
insPatch Text
name Text
path DataSourceUpdates -> a -> IO (Either Text ())
insert (Success a
item) = do
$(logInfo) forall a b. (a -> b) -> a -> b
$ [Text] -> Text
T.concat [Text
"patching ", Text
name, Text
" with path: ", Text
path]
Either Text ()
status <- forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ DataSourceUpdates -> a -> IO (Either Text ())
insert DataSourceUpdates
dataSourceUpdates a
item
forall a c b. (a -> c) -> (b -> c) -> Either a b -> c
either (\Text
err -> $(logError) forall a b. (a -> b) -> a -> b
$ [Text] -> Text
T.concat [Text
"dataSourceUpdates failed ", Text
name, Text
" patch: ", Text
err]) forall (f :: * -> *) a. Applicative f => a -> f a
pure Either Text ()
status
processDelete :: forall m. (MonadIO m, MonadLogger m) => DataSourceUpdates -> L.ByteString -> m ()
processDelete :: forall (m :: * -> *).
(MonadIO m, MonadLogger m) =>
DataSourceUpdates -> ByteString -> m ()
processDelete DataSourceUpdates
dataSourceUpdates ByteString
value = case forall a. FromJSON a => ByteString -> Either String a
eitherDecode ByteString
value :: Either String PathVersion of
Right (PathVersion Text
path Natural
version)
| Text -> Text -> Bool
T.isPrefixOf Text
"/flags/" Text
path -> Text -> Text -> IO (Either Text ()) -> m ()
logDelete Text
"flag" Text
path (DataSourceUpdates -> Text -> Natural -> IO (Either Text ())
dataSourceUpdatesDeleteFlag DataSourceUpdates
dataSourceUpdates (Int -> Text -> Text
T.drop Int
7 Text
path) Natural
version)
| Text -> Text -> Bool
T.isPrefixOf Text
"/segments/" Text
path -> Text -> Text -> IO (Either Text ()) -> m ()
logDelete Text
"segment" Text
path (DataSourceUpdates -> Text -> Natural -> IO (Either Text ())
dataSourceUpdatesDeleteSegment DataSourceUpdates
dataSourceUpdates (Int -> Text -> Text
T.drop Int
10 Text
path) Natural
version)
| Bool
otherwise -> $(logError) Text
"unknown delete path"
Left String
err -> $(logError) forall a b. (a -> b) -> a -> b
$ Text -> Text -> Text
T.append Text
"failed to parse delete" (String -> Text
T.pack String
err)
where logDelete :: Text -> Text -> StoreResult () -> m ()
logDelete :: Text -> Text -> IO (Either Text ()) -> m ()
logDelete Text
name Text
path IO (Either Text ())
action = do
$(logInfo) forall a b. (a -> b) -> a -> b
$ [Text] -> Text
T.concat [Text
"deleting ", Text
name, Text
" with path: ", Text
path]
Either Text ()
status <- forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO IO (Either Text ())
action
forall a c b. (a -> c) -> (b -> c) -> Either a b -> c
either (\Text
err -> $(logError) forall a b. (a -> b) -> a -> b
$ [Text] -> Text
T.concat [Text
"dataSourceUpdates failed ", Text
name, Text
" delete: ", Text
err]) forall (f :: * -> *) a. Applicative f => a -> f a
pure Either Text ()
status
processEvent :: (MonadIO m, MonadLogger m) => DataSourceUpdates -> Text -> L.ByteString -> m ()
processEvent :: forall (m :: * -> *).
(MonadIO m, MonadLogger m) =>
DataSourceUpdates -> Text -> ByteString -> m ()
processEvent DataSourceUpdates
dataSourceUpdates Text
name ByteString
value = case Text
name of
Text
"put" -> forall (m :: * -> *).
(MonadIO m, MonadLogger m) =>
DataSourceUpdates -> ByteString -> m ()
processPut DataSourceUpdates
dataSourceUpdates ByteString
value
Text
"patch" -> forall (m :: * -> *).
(MonadIO m, MonadLogger m) =>
DataSourceUpdates -> ByteString -> m ()
processPatch DataSourceUpdates
dataSourceUpdates ByteString
value
Text
"delete" -> forall (m :: * -> *).
(MonadIO m, MonadLogger m) =>
DataSourceUpdates -> ByteString -> m ()
processDelete DataSourceUpdates
dataSourceUpdates ByteString
value
Text
_ -> $(logWarn) Text
"unknown event type"
data ReadE = ReadETimeout | ReadEClosed deriving (Int -> ReadE -> ShowS
[ReadE] -> ShowS
ReadE -> String
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
showList :: [ReadE] -> ShowS
$cshowList :: [ReadE] -> ShowS
show :: ReadE -> String
$cshow :: ReadE -> String
showsPrec :: Int -> ReadE -> ShowS
$cshowsPrec :: Int -> ReadE -> ShowS
Show, Show ReadE
Typeable ReadE
SomeException -> Maybe ReadE
ReadE -> String
ReadE -> SomeException
forall e.
Typeable e
-> Show e
-> (e -> SomeException)
-> (SomeException -> Maybe e)
-> (e -> String)
-> Exception e
displayException :: ReadE -> String
$cdisplayException :: ReadE -> String
fromException :: SomeException -> Maybe ReadE
$cfromException :: SomeException -> Maybe ReadE
toException :: ReadE -> SomeException
$ctoException :: ReadE -> SomeException
Exception)
tryReadE :: MonadCatch m => m a -> m (Either ReadE a)
tryReadE :: forall (m :: * -> *) a. MonadCatch m => m a -> m (Either ReadE a)
tryReadE = forall (m :: * -> *) e a.
(MonadCatch m, Exception e) =>
m a -> m (Either e a)
try
readWithException :: IO ByteString -> IO Text
readWithException :: IO ByteString -> IO Text
readWithException IO ByteString
body = forall a. Int -> IO a -> IO (Maybe a)
timeout (Int
1000000 forall a. Num a => a -> a -> a
* Int
300) (IO ByteString -> IO ByteString
brRead IO ByteString
body) forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \case
Maybe ByteString
Nothing -> forall e a. Exception e => e -> IO a
throwIO ReadE
ReadETimeout
Just ByteString
bytes -> if ByteString
bytes forall a. Eq a => a -> a -> Bool
== ByteString
B.empty then forall e a. Exception e => e -> IO a
throwIO ReadE
ReadEClosed else forall (f :: * -> *) a. Applicative f => a -> f a
pure (ByteString -> Text
decodeUtf8 ByteString
bytes)
readStream :: (MonadIO m, MonadLogger m, MonadMask m) => IO ByteString -> DataSourceUpdates -> m ()
readStream :: forall (m :: * -> *).
(MonadIO m, MonadLogger m, MonadMask m) =>
IO ByteString -> DataSourceUpdates -> m ()
readStream IO ByteString
body DataSourceUpdates
dataSourceUpdates = Text -> m ()
loop Text
"" where
loop :: Text -> m ()
loop Text
initial = forall (m :: * -> *) a. MonadCatch m => m a -> m (Either ReadE a)
tryReadE (forall (m :: * -> *) a.
Monad m =>
m Text -> Parser a -> Text -> m (Result a)
parseWith (forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ IO ByteString -> IO Text
readWithException IO ByteString
body) Parser SSE
parseEvent Text
initial) forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \case
(Left ReadE
ReadETimeout) -> $(logError) Text
"streaming connection unexpectedly closed"
(Left ReadE
ReadEClosed) -> $(logError) Text
"timeout waiting for SSE event"
(Right Result SSE
parsed) -> case Result SSE
parsed of
Done Text
remaining SSE
event -> do
forall (m :: * -> *).
(MonadIO m, MonadLogger m) =>
DataSourceUpdates -> Text -> ByteString -> m ()
processEvent DataSourceUpdates
dataSourceUpdates (SSE -> Text
name SSE
event) (ByteString -> ByteString
L.fromStrict forall a b. (a -> b) -> a -> b
$ Text -> ByteString
encodeUtf8 forall a b. (a -> b) -> a -> b
$ SSE -> Text
buffer SSE
event)
Text -> m ()
loop Text
remaining
Fail Text
_ [String]
context String
err ->
$(logError) forall a b. (a -> b) -> a -> b
$ Text -> [Text] -> Text
T.intercalate Text
" " [Text
"failed parsing SSE frame", String -> Text
T.pack forall a b. (a -> b) -> a -> b
$ forall a. Show a => a -> String
show [String]
context, String -> Text
T.pack String
err]
Partial Text -> Result SSE
_ -> $(logError) Text
"failed parsing SSE frame unexpected partial"
retryPolicy :: MonadIO m => RetryPolicyM m
retryPolicy :: forall (m :: * -> *). MonadIO m => RetryPolicyM m
retryPolicy = forall (m :: * -> *).
Monad m =>
Int -> RetryPolicyM m -> RetryPolicyM m
capDelay (Int
30 forall a. Num a => a -> a -> a
* Int
1000000) forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *). MonadIO m => Int -> RetryPolicyM m
fullJitterBackoff (Int
1 forall a. Num a => a -> a -> a
* Int
1000000)
data Failure = FailurePermanent | FailureTemporary | FailureReset deriving (Failure -> Failure -> Bool
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
/= :: Failure -> Failure -> Bool
$c/= :: Failure -> Failure -> Bool
== :: Failure -> Failure -> Bool
$c== :: Failure -> Failure -> Bool
Eq)
handleStream :: (MonadIO m, MonadLogger m, MonadMask m) => Manager -> Request -> DataSourceUpdates -> RetryStatus -> m Failure
handleStream :: forall (m :: * -> *).
(MonadIO m, MonadLogger m, MonadMask m) =>
Manager -> Request -> DataSourceUpdates -> RetryStatus -> m Failure
handleStream Manager
manager Request
request DataSourceUpdates
dataSourceUpdates RetryStatus
_ = do
$(logDebug) Text
"starting new streaming connection"
TimeSpec
startTime <- forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ Clock -> IO TimeSpec
getTime Clock
Monotonic
Either HttpException ()
status <- forall (m :: * -> *) a.
MonadCatch m =>
m a -> m (Either HttpException a)
tryHTTP forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) a.
(MonadIO m, MonadMask m) =>
Request -> Manager -> (Response (IO ByteString) -> m a) -> m a
withResponseGeneric Request
request Manager
manager forall a b. (a -> b) -> a -> b
$ \Response (IO ByteString)
response -> forall (m :: * -> *) body. MonadThrow m => Response body -> m ()
checkAuthorization Response (IO ByteString)
response forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>>
if forall body. Response body -> Status
responseStatus Response (IO ByteString)
response forall a. Eq a => a -> a -> Bool
/= Status
ok200 then forall (m :: * -> *).
MonadIO m =>
Request -> Response (IO ByteString) -> m ()
throwErrorStatusCodes Request
request Response (IO ByteString)
response else
forall (m :: * -> *).
(MonadIO m, MonadLogger m, MonadMask m) =>
IO ByteString -> DataSourceUpdates -> m ()
readStream (forall body. Response body -> body
responseBody Response (IO ByteString)
response) DataSourceUpdates
dataSourceUpdates
case Either HttpException ()
status of
(Right ()) -> forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (Clock -> IO TimeSpec
getTime Clock
Monotonic) forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \TimeSpec
now -> if TimeSpec
now forall a. Ord a => a -> a -> Bool
>= TimeSpec
startTime forall a. Num a => a -> a -> a
+ (Int64 -> Int64 -> TimeSpec
TimeSpec Int64
60 Int64
0)
then do
$(logError) Text
"streaming connection closed after 60 seconds, retrying instantly"
forall (f :: * -> *) a. Applicative f => a -> f a
pure Failure
FailureReset
else do
$(logError) Text
"streaming connection closed before 60 seconds, retrying after backoff"
forall (f :: * -> *) a. Applicative f => a -> f a
pure Failure
FailureTemporary
(Left HttpException
err) -> do
$(logError) (Text -> [Text] -> Text
T.intercalate Text
" " [Text
"HTTP Exception", String -> Text
T.pack forall a b. (a -> b) -> a -> b
$ forall a. Show a => a -> String
show HttpException
err])
forall (f :: * -> *) a. Applicative f => a -> f a
pure forall a b. (a -> b) -> a -> b
$ case HttpException
err of
(InvalidUrlException String
_ String
_) -> Failure
FailurePermanent
(HttpExceptionRequest Request
_ (StatusCodeException Response ()
response ByteString
_)) ->
let code :: Int
code = Status -> Int
statusCode (forall body. Response body -> Status
responseStatus Response ()
response) in if Int
code forall a. Ord a => a -> a -> Bool
>= Int
400 Bool -> Bool -> Bool
&& Int
code forall a. Ord a => a -> a -> Bool
< Int
500
then if forall (t :: * -> *) a. (Foldable t, Eq a) => a -> t a -> Bool
elem Int
code [Int
400, Int
408, Int
429] then Failure
FailureTemporary else Failure
FailurePermanent
else Failure
FailureTemporary
HttpException
_ -> Failure
FailureTemporary
streamingThread :: (MonadIO m, MonadLogger m, MonadMask m) => Text -> ClientContext -> DataSourceUpdates -> m ()
streamingThread :: forall (m :: * -> *).
(MonadIO m, MonadLogger m, MonadMask m) =>
Text -> ClientContext -> DataSourceUpdates -> m ()
streamingThread Text
streamURI ClientContext
clientContext DataSourceUpdates
dataSourceUpdates = do
let manager :: Manager
manager = HttpConfiguration -> Manager
tlsManager forall a b. (a -> b) -> a -> b
$ ClientContext -> HttpConfiguration
httpConfiguration ClientContext
clientContext
Request
req <- forall (m :: * -> *).
MonadThrow m =>
HttpConfiguration -> String -> m Request
prepareRequest (ClientContext -> HttpConfiguration
httpConfiguration ClientContext
clientContext) (Text -> String
T.unpack Text
streamURI forall a. [a] -> [a] -> [a]
++ String
"/all")
forall (m :: * -> *).
(MonadIO m, MonadLogger m, MonadCatch m) =>
DataSourceUpdates -> m () -> m ()
handleUnauthorized DataSourceUpdates
dataSourceUpdates forall a b. (a -> b) -> a -> b
$ forall a. (a -> a) -> a
fix forall a b. (a -> b) -> a -> b
$ \m ()
loop ->
forall (m :: * -> *) b.
MonadIO m =>
RetryPolicyM m
-> (RetryStatus -> b -> m Bool) -> (RetryStatus -> m b) -> m b
retrying forall (m :: * -> *). MonadIO m => RetryPolicyM m
retryPolicy (\RetryStatus
_ Failure
status -> forall (f :: * -> *) a. Applicative f => a -> f a
pure forall a b. (a -> b) -> a -> b
$ Failure
status forall a. Eq a => a -> a -> Bool
== Failure
FailureTemporary) (forall (m :: * -> *).
(MonadIO m, MonadLogger m, MonadMask m) =>
Manager -> Request -> DataSourceUpdates -> RetryStatus -> m Failure
handleStream Manager
manager Request
req DataSourceUpdates
dataSourceUpdates) forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>=
\case Failure
FailureReset -> m ()
loop; Failure
_ -> forall (f :: * -> *) a. Applicative f => a -> f a
pure ()