module LaunchDarkly.Server.Network.Streaming (streamingThread) where

import           Data.Text                           (Text)
import qualified Data.Text as                        T
import           Data.Attoparsec.Text as             P hiding (Result, try)
import           Data.Function                       (fix)
import           Control.Monad                       (void, mzero)
import           Control.Monad.Catch                 (Exception, MonadCatch, MonadMask, try)
import           Control.Exception                   (throwIO)
import           Data.ByteString                     (ByteString)
import qualified Data.ByteString as                  B
import           Control.Applicative                 (many)
import           Data.Text.Encoding                  (decodeUtf8, encodeUtf8)
import           Network.HTTP.Client                 (Manager, Response(..), Request, HttpException(..), HttpExceptionContent(..), brRead, throwErrorStatusCodes)
import           Control.Monad.Logger                (MonadLogger, logInfo, logWarn, logError, logDebug)
import           Control.Monad.IO.Class              (MonadIO, liftIO)
import           Data.Aeson                          (FromJSON, parseJSON, withObject, eitherDecode, (.:), (.:?), (.!=), fromJSON, Result(..))
import qualified Data.ByteString.Lazy as             L
import           GHC.Natural                         (Natural)
import           GHC.Generics                        (Generic)
import           Control.Retry                       (RetryPolicyM, RetryStatus, fullJitterBackoff, capDelay, retrying)
import           Network.HTTP.Types.Status           (ok200, Status(statusCode))
import           System.Timeout                      (timeout)
import           System.Clock                        (getTime, Clock(Monotonic), TimeSpec(TimeSpec))

import           LaunchDarkly.Server.Config.HttpConfiguration (HttpConfiguration(..), prepareRequest)
import           LaunchDarkly.Server.Config.ClientContext     (ClientContext(..))
import           LaunchDarkly.Server.Store.Internal           (StoreResult)
import           LaunchDarkly.Server.DataSource.Internal      (DataSourceUpdates(..))
import           LaunchDarkly.Server.Features                 (Flag, Segment)
import           LaunchDarkly.Server.Network.Common           (handleUnauthorized, checkAuthorization, withResponseGeneric, tryHTTP)
import           LaunchDarkly.AesonCompat                     (KeyMap)


data PutBody = PutBody
    { PutBody -> KeyMap Flag
flags    :: !(KeyMap Flag)
    , PutBody -> KeyMap Segment
segments :: !(KeyMap Segment)
    } deriving ((forall x. PutBody -> Rep PutBody x)
-> (forall x. Rep PutBody x -> PutBody) -> Generic PutBody
forall x. Rep PutBody x -> PutBody
forall x. PutBody -> Rep PutBody x
forall a.
(forall x. a -> Rep a x) -> (forall x. Rep a x -> a) -> Generic a
$cto :: forall x. Rep PutBody x -> PutBody
$cfrom :: forall x. PutBody -> Rep PutBody x
Generic, Int -> PutBody -> ShowS
[PutBody] -> ShowS
PutBody -> String
(Int -> PutBody -> ShowS)
-> (PutBody -> String) -> ([PutBody] -> ShowS) -> Show PutBody
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
showList :: [PutBody] -> ShowS
$cshowList :: [PutBody] -> ShowS
show :: PutBody -> String
$cshow :: PutBody -> String
showsPrec :: Int -> PutBody -> ShowS
$cshowsPrec :: Int -> PutBody -> ShowS
Show, Value -> Parser [PutBody]
Value -> Parser PutBody
(Value -> Parser PutBody)
-> (Value -> Parser [PutBody]) -> FromJSON PutBody
forall a.
(Value -> Parser a) -> (Value -> Parser [a]) -> FromJSON a
parseJSONList :: Value -> Parser [PutBody]
$cparseJSONList :: Value -> Parser [PutBody]
parseJSON :: Value -> Parser PutBody
$cparseJSON :: Value -> Parser PutBody
FromJSON)

data PathData d = PathData
    { PathData d -> Text
path     :: !Text
    , PathData d -> d
pathData :: !d
    } deriving ((forall x. PathData d -> Rep (PathData d) x)
-> (forall x. Rep (PathData d) x -> PathData d)
-> Generic (PathData d)
forall x. Rep (PathData d) x -> PathData d
forall x. PathData d -> Rep (PathData d) x
forall a.
(forall x. a -> Rep a x) -> (forall x. Rep a x -> a) -> Generic a
forall d x. Rep (PathData d) x -> PathData d
forall d x. PathData d -> Rep (PathData d) x
$cto :: forall d x. Rep (PathData d) x -> PathData d
$cfrom :: forall d x. PathData d -> Rep (PathData d) x
Generic, Int -> PathData d -> ShowS
[PathData d] -> ShowS
PathData d -> String
(Int -> PathData d -> ShowS)
-> (PathData d -> String)
-> ([PathData d] -> ShowS)
-> Show (PathData d)
forall d. Show d => Int -> PathData d -> ShowS
forall d. Show d => [PathData d] -> ShowS
forall d. Show d => PathData d -> String
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
showList :: [PathData d] -> ShowS
$cshowList :: forall d. Show d => [PathData d] -> ShowS
show :: PathData d -> String
$cshow :: forall d. Show d => PathData d -> String
showsPrec :: Int -> PathData d -> ShowS
$cshowsPrec :: forall d. Show d => Int -> PathData d -> ShowS
Show)

data PathVersion = PathVersion
    { PathVersion -> Text
path    :: !Text
    , PathVersion -> Natural
version :: !Natural
    } deriving ((forall x. PathVersion -> Rep PathVersion x)
-> (forall x. Rep PathVersion x -> PathVersion)
-> Generic PathVersion
forall x. Rep PathVersion x -> PathVersion
forall x. PathVersion -> Rep PathVersion x
forall a.
(forall x. a -> Rep a x) -> (forall x. Rep a x -> a) -> Generic a
$cto :: forall x. Rep PathVersion x -> PathVersion
$cfrom :: forall x. PathVersion -> Rep PathVersion x
Generic, Int -> PathVersion -> ShowS
[PathVersion] -> ShowS
PathVersion -> String
(Int -> PathVersion -> ShowS)
-> (PathVersion -> String)
-> ([PathVersion] -> ShowS)
-> Show PathVersion
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
showList :: [PathVersion] -> ShowS
$cshowList :: [PathVersion] -> ShowS
show :: PathVersion -> String
$cshow :: PathVersion -> String
showsPrec :: Int -> PathVersion -> ShowS
$cshowsPrec :: Int -> PathVersion -> ShowS
Show, Value -> Parser [PathVersion]
Value -> Parser PathVersion
(Value -> Parser PathVersion)
-> (Value -> Parser [PathVersion]) -> FromJSON PathVersion
forall a.
(Value -> Parser a) -> (Value -> Parser [a]) -> FromJSON a
parseJSONList :: Value -> Parser [PathVersion]
$cparseJSONList :: Value -> Parser [PathVersion]
parseJSON :: Value -> Parser PathVersion
$cparseJSON :: Value -> Parser PathVersion
FromJSON)

instance FromJSON a => FromJSON (PathData a) where
    parseJSON :: Value -> Parser (PathData a)
parseJSON = String
-> (Object -> Parser (PathData a)) -> Value -> Parser (PathData a)
forall a. String -> (Object -> Parser a) -> Value -> Parser a
withObject String
"Put" ((Object -> Parser (PathData a)) -> Value -> Parser (PathData a))
-> (Object -> Parser (PathData a)) -> Value -> Parser (PathData a)
forall a b. (a -> b) -> a -> b
$ \Object
o -> do
        a
pathData <- Object
o Object -> Key -> Parser a
forall a. FromJSON a => Object -> Key -> Parser a
.: Key
"data"
        Text
path     <- Object
o Object -> Key -> Parser (Maybe Text)
forall a. FromJSON a => Object -> Key -> Parser (Maybe a)
.:? Key
"path" Parser (Maybe Text) -> Text -> Parser Text
forall a. Parser (Maybe a) -> a -> Parser a
.!= Text
"/"
        PathData a -> Parser (PathData a)
forall (f :: * -> *) a. Applicative f => a -> f a
pure (PathData a -> Parser (PathData a))
-> PathData a -> Parser (PathData a)
forall a b. (a -> b) -> a -> b
$ PathData :: forall d. Text -> d -> PathData d
PathData { $sel:path:PathData :: Text
path = Text
path, $sel:pathData:PathData :: a
pathData = a
pathData }

data SSE = SSE
    { SSE -> Text
name        :: !Text
    , SSE -> Text
buffer      :: !Text
    , SSE -> Maybe Text
lastEventId :: !(Maybe Text)
    , SSE -> Maybe Text
retry       :: !(Maybe Text)
    } deriving ((forall x. SSE -> Rep SSE x)
-> (forall x. Rep SSE x -> SSE) -> Generic SSE
forall x. Rep SSE x -> SSE
forall x. SSE -> Rep SSE x
forall a.
(forall x. a -> Rep a x) -> (forall x. Rep a x -> a) -> Generic a
$cto :: forall x. Rep SSE x -> SSE
$cfrom :: forall x. SSE -> Rep SSE x
Generic, Int -> SSE -> ShowS
[SSE] -> ShowS
SSE -> String
(Int -> SSE -> ShowS)
-> (SSE -> String) -> ([SSE] -> ShowS) -> Show SSE
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
showList :: [SSE] -> ShowS
$cshowList :: [SSE] -> ShowS
show :: SSE -> String
$cshow :: SSE -> String
showsPrec :: Int -> SSE -> ShowS
$cshowsPrec :: Int -> SSE -> ShowS
Show, SSE -> SSE -> Bool
(SSE -> SSE -> Bool) -> (SSE -> SSE -> Bool) -> Eq SSE
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
/= :: SSE -> SSE -> Bool
$c/= :: SSE -> SSE -> Bool
== :: SSE -> SSE -> Bool
$c== :: SSE -> SSE -> Bool
Eq)

nameCharPredicate :: Char -> Bool
nameCharPredicate :: Char -> Bool
nameCharPredicate Char
x = Char
x Char -> Char -> Bool
forall a. Eq a => a -> a -> Bool
/= Char
'\r' Bool -> Bool -> Bool
&& Char
x Char -> Char -> Bool
forall a. Eq a => a -> a -> Bool
/= Char
':' Bool -> Bool -> Bool
&& Char
x Char -> Char -> Bool
forall a. Eq a => a -> a -> Bool
/= Char
'\n'

anyCharPredicate :: Char -> Bool
anyCharPredicate :: Char -> Bool
anyCharPredicate Char
x = Char
x Char -> Char -> Bool
forall a. Eq a => a -> a -> Bool
/= Char
'\r' Bool -> Bool -> Bool
&& Char
x Char -> Char -> Bool
forall a. Eq a => a -> a -> Bool
/= Char
'\n'

endOfLineSSE :: Parser ()
endOfLineSSE :: Parser ()
endOfLineSSE = [Parser ()] -> Parser ()
forall (f :: * -> *) a. Alternative f => [f a] -> f a
choice [Parser Text Text -> Parser ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (Parser Text Text -> Parser ()) -> Parser Text Text -> Parser ()
forall a b. (a -> b) -> a -> b
$ Text -> Parser Text Text
string Text
"\r\n", Parser Text Text -> Parser ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (Parser Text Text -> Parser ()) -> Parser Text Text -> Parser ()
forall a b. (a -> b) -> a -> b
$ Text -> Parser Text Text
string Text
"\r", Parser Text Text -> Parser ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (Parser Text Text -> Parser ()) -> Parser Text Text -> Parser ()
forall a b. (a -> b) -> a -> b
$ Text -> Parser Text Text
string Text
"\n", Parser ()
forall t. Chunk t => Parser t ()
endOfInput]

comment :: Parser ()
comment :: Parser ()
comment = Char -> Parser Char
char Char
':' Parser Char -> Parser Text Text -> Parser Text Text
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> (Char -> Bool) -> Parser Text Text
P.takeWhile Char -> Bool
anyCharPredicate Parser Text Text -> Parser () -> Parser ()
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> Parser ()
endOfLineSSE Parser () -> Parser () -> Parser ()
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> () -> Parser ()
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()

parseField :: Parser (Text, Text)
parseField :: Parser (Text, Text)
parseField = do
    Text
fieldName <- (Char -> Bool) -> Parser Text Text
P.takeWhile1 Char -> Bool
nameCharPredicate
    Parser Char -> Parser ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (Parser Char -> Parser ()) -> Parser Char -> Parser ()
forall a b. (a -> b) -> a -> b
$ Char -> Parser Char -> Parser Char
forall (f :: * -> *) a. Alternative f => a -> f a -> f a
option Char
' ' (Parser Char -> Parser Char) -> Parser Char -> Parser Char
forall a b. (a -> b) -> a -> b
$ Char -> Parser Char
char Char
':'
    Parser Char -> Parser ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (Parser Char -> Parser ()) -> Parser Char -> Parser ()
forall a b. (a -> b) -> a -> b
$ Char -> Parser Char -> Parser Char
forall (f :: * -> *) a. Alternative f => a -> f a -> f a
option Char
' ' (Parser Char -> Parser Char) -> Parser Char -> Parser Char
forall a b. (a -> b) -> a -> b
$ Char -> Parser Char
char Char
' '
    Text
fieldValue <- (Char -> Bool) -> Parser Text Text
P.takeWhile Char -> Bool
anyCharPredicate
    Parser ()
endOfLineSSE
    (Text, Text) -> Parser (Text, Text)
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Text
fieldName, Text
fieldValue)

processField :: (Text, Text) -> SSE -> SSE
processField :: (Text, Text) -> SSE -> SSE
processField (Text
fieldName, Text
fieldValue) SSE
event = case Text
fieldName of
    Text
"event"   -> SSE
event { $sel:name:SSE :: Text
name = Text
fieldValue }
    Text
"id"      -> SSE
event { $sel:lastEventId:SSE :: Maybe Text
lastEventId = Text -> Maybe Text
forall a. a -> Maybe a
Just Text
fieldValue }
    Text
"retry"   -> SSE
event { $sel:retry:SSE :: Maybe Text
retry = Text -> Maybe Text
forall a. a -> Maybe a
Just Text
fieldValue }
    Text
"data"    -> SSE
event { $sel:buffer:SSE :: Text
buffer = [Text] -> Text
T.concat [ SSE -> Text
buffer SSE
event, if Text -> Bool
T.null (SSE -> Text
buffer SSE
event) then Text
"" else Text
"\n", Text
fieldValue] }
    Text
_         -> SSE
event

parseEvent :: Parser SSE
parseEvent :: Parser SSE
parseEvent = do
    [(Text, Text)]
fields <- Parser (Text, Text) -> Parser Text [(Text, Text)]
forall (f :: * -> *) a. Alternative f => f a -> f [a]
many (Parser () -> Parser Text [()]
forall (f :: * -> *) a. Alternative f => f a -> f [a]
many Parser ()
comment Parser Text [()] -> Parser (Text, Text) -> Parser (Text, Text)
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> Parser (Text, Text)
parseField Parser (Text, Text)
-> ((Text, Text) -> Parser (Text, Text)) -> Parser (Text, Text)
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= (Text, Text) -> Parser (Text, Text)
forall (f :: * -> *) a. Applicative f => a -> f a
pure)
    Parser ()
endOfLineSSE
    let event :: SSE
event = ((Text, Text) -> SSE -> SSE) -> SSE -> [(Text, Text)] -> SSE
forall (t :: * -> *) a b.
Foldable t =>
(a -> b -> b) -> b -> t a -> b
foldr (Text, Text) -> SSE -> SSE
processField (Text -> Text -> Maybe Text -> Maybe Text -> SSE
SSE Text
"" Text
"" Maybe Text
forall (m :: * -> *) a. MonadPlus m => m a
mzero Maybe Text
forall (m :: * -> *) a. MonadPlus m => m a
mzero) [(Text, Text)]
fields
    if Text -> Bool
T.null (SSE -> Text
name SSE
event) Bool -> Bool -> Bool
|| Text -> Bool
T.null (SSE -> Text
buffer SSE
event) then Parser SSE
parseEvent else SSE -> Parser SSE
forall (f :: * -> *) a. Applicative f => a -> f a
pure SSE
event

processPut :: (MonadIO m, MonadLogger m) => DataSourceUpdates -> L.ByteString -> m ()
processPut :: DataSourceUpdates -> ByteString -> m ()
processPut DataSourceUpdates
dataSourceUpdates ByteString
value = case ByteString -> Either String (PathData PutBody)
forall a. FromJSON a => ByteString -> Either String a
eitherDecode ByteString
value of
    Right (PathData Text
_ (PutBody KeyMap Flag
flags KeyMap Segment
segments)) -> do
        $(Int
String
LogLevel
String -> Text
String -> String -> String -> CharPos -> CharPos -> Loc
Text -> Text
Loc -> Text -> LogLevel -> Text -> m ()
(Text -> m ()) -> (Text -> Text) -> Text -> m ()
forall a. a -> a
forall b c a. (b -> c) -> (a -> b) -> a -> c
forall (m :: * -> *) msg.
(MonadLogger m, ToLogStr msg) =>
Loc -> Text -> LogLevel -> msg -> m ()
pack :: String -> Text
monadLoggerLog :: forall (m :: * -> *) msg.
(MonadLogger m, ToLogStr msg) =>
Loc -> Text -> LogLevel -> msg -> m ()
. :: forall b c a. (b -> c) -> (a -> b) -> a -> c
id :: forall a. a -> a
logInfo) Text
"initializing dataSourceUpdates with put"
        IO (Either Text ()) -> m (Either Text ())
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (DataSourceUpdates
-> KeyMap Flag -> KeyMap Segment -> IO (Either Text ())
dataSourceUpdatesInit DataSourceUpdates
dataSourceUpdates KeyMap Flag
flags KeyMap Segment
segments) m (Either Text ()) -> (Either Text () -> m ()) -> m ()
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \case
            Left Text
err -> $(Int
String
LogLevel
String -> Text
String -> String -> String -> CharPos -> CharPos -> Loc
Text -> Text
Loc -> Text -> LogLevel -> Text -> m ()
(Text -> m ()) -> (Text -> Text) -> Text -> m ()
forall a. a -> a
forall b c a. (b -> c) -> (a -> b) -> a -> c
forall (m :: * -> *) msg.
(MonadLogger m, ToLogStr msg) =>
Loc -> Text -> LogLevel -> msg -> m ()
pack :: String -> Text
monadLoggerLog :: forall (m :: * -> *) msg.
(MonadLogger m, ToLogStr msg) =>
Loc -> Text -> LogLevel -> msg -> m ()
. :: forall b c a. (b -> c) -> (a -> b) -> a -> c
id :: forall a. a -> a
logError) (Text -> m ()) -> Text -> m ()
forall a b. (a -> b) -> a -> b
$ Text -> Text -> Text
T.append Text
"dataSourceUpdates failed put: " Text
err
            Either Text ()
_        -> () -> m ()
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()
    Left String
err -> $(Int
String
LogLevel
String -> Text
String -> String -> String -> CharPos -> CharPos -> Loc
Text -> Text
Loc -> Text -> LogLevel -> Text -> m ()
(Text -> m ()) -> (Text -> Text) -> Text -> m ()
forall a. a -> a
forall b c a. (b -> c) -> (a -> b) -> a -> c
forall (m :: * -> *) msg.
(MonadLogger m, ToLogStr msg) =>
Loc -> Text -> LogLevel -> msg -> m ()
pack :: String -> Text
monadLoggerLog :: forall (m :: * -> *) msg.
(MonadLogger m, ToLogStr msg) =>
Loc -> Text -> LogLevel -> msg -> m ()
. :: forall b c a. (b -> c) -> (a -> b) -> a -> c
id :: forall a. a -> a
logError) (Text -> m ()) -> Text -> m ()
forall a b. (a -> b) -> a -> b
$ Text -> Text -> Text
T.append Text
"failed to parse put body" (String -> Text
T.pack String
err)

processPatch :: forall m. (MonadIO m, MonadLogger m) => DataSourceUpdates -> L.ByteString -> m ()
processPatch :: DataSourceUpdates -> ByteString -> m ()
processPatch DataSourceUpdates
dataSourceUpdates ByteString
value = case ByteString -> Either String (PathData Value)
forall a. FromJSON a => ByteString -> Either String a
eitherDecode ByteString
value of
    Right (PathData Text
path Value
body)
        | Text -> Text -> Bool
T.isPrefixOf Text
"/flags/" Text
path    -> Text
-> Text
-> (DataSourceUpdates -> Flag -> IO (Either Text ()))
-> Result Flag
-> m ()
forall a.
Text
-> Text
-> (DataSourceUpdates -> a -> IO (Either Text ()))
-> Result a
-> m ()
insPatch Text
"flag" Text
path DataSourceUpdates -> Flag -> IO (Either Text ())
dataSourceUpdatesInsertFlag (Value -> Result Flag
forall a. FromJSON a => Value -> Result a
fromJSON Value
body)
        | Text -> Text -> Bool
T.isPrefixOf Text
"/segments/" Text
path -> Text
-> Text
-> (DataSourceUpdates -> Segment -> IO (Either Text ()))
-> Result Segment
-> m ()
forall a.
Text
-> Text
-> (DataSourceUpdates -> a -> IO (Either Text ()))
-> Result a
-> m ()
insPatch Text
"segment" Text
path DataSourceUpdates -> Segment -> IO (Either Text ())
dataSourceUpdatesInsertSegment (Value -> Result Segment
forall a. FromJSON a => Value -> Result a
fromJSON Value
body)
        | Bool
otherwise                      -> $(Int
String
LogLevel
String -> Text
String -> String -> String -> CharPos -> CharPos -> Loc
Text -> Text
Loc -> Text -> LogLevel -> Text -> m ()
(Text -> m ()) -> (Text -> Text) -> Text -> m ()
forall a. a -> a
forall b c a. (b -> c) -> (a -> b) -> a -> c
forall (m :: * -> *) msg.
(MonadLogger m, ToLogStr msg) =>
Loc -> Text -> LogLevel -> msg -> m ()
pack :: String -> Text
monadLoggerLog :: forall (m :: * -> *) msg.
(MonadLogger m, ToLogStr msg) =>
Loc -> Text -> LogLevel -> msg -> m ()
. :: forall b c a. (b -> c) -> (a -> b) -> a -> c
id :: forall a. a -> a
logError) Text
"unknown patch path"
    Left String
err                             -> $(Int
String
LogLevel
String -> Text
String -> String -> String -> CharPos -> CharPos -> Loc
Text -> Text
Loc -> Text -> LogLevel -> Text -> m ()
(Text -> m ()) -> (Text -> Text) -> Text -> m ()
forall a. a -> a
forall b c a. (b -> c) -> (a -> b) -> a -> c
forall (m :: * -> *) msg.
(MonadLogger m, ToLogStr msg) =>
Loc -> Text -> LogLevel -> msg -> m ()
pack :: String -> Text
monadLoggerLog :: forall (m :: * -> *) msg.
(MonadLogger m, ToLogStr msg) =>
Loc -> Text -> LogLevel -> msg -> m ()
. :: forall b c a. (b -> c) -> (a -> b) -> a -> c
id :: forall a. a -> a
logError) (Text -> m ()) -> Text -> m ()
forall a b. (a -> b) -> a -> b
$ Text -> Text -> Text
T.append Text
"failed to parse patch generic" (String -> Text
T.pack String
err)
    where
      insPatch :: Text -> Text -> (DataSourceUpdates -> a -> IO (Either Text ())) -> Result a -> m ()
      insPatch :: Text
-> Text
-> (DataSourceUpdates -> a -> IO (Either Text ()))
-> Result a
-> m ()
insPatch Text
name Text
_ DataSourceUpdates -> a -> IO (Either Text ())
_ (Error String
err) = $(Int
String
LogLevel
String -> Text
String -> String -> String -> CharPos -> CharPos -> Loc
Text -> Text
Loc -> Text -> LogLevel -> Text -> m ()
(Text -> m ()) -> (Text -> Text) -> Text -> m ()
forall a. a -> a
forall b c a. (b -> c) -> (a -> b) -> a -> c
forall (m :: * -> *) msg.
(MonadLogger m, ToLogStr msg) =>
Loc -> Text -> LogLevel -> msg -> m ()
pack :: String -> Text
monadLoggerLog :: forall (m :: * -> *) msg.
(MonadLogger m, ToLogStr msg) =>
Loc -> Text -> LogLevel -> msg -> m ()
. :: forall b c a. (b -> c) -> (a -> b) -> a -> c
id :: forall a. a -> a
logError) (Text -> m ()) -> Text -> m ()
forall a b. (a -> b) -> a -> b
$ [Text] -> Text
T.concat [Text
"failed to parse patch ", Text
name, Text
": ", String -> Text
T.pack String
err]
      insPatch Text
name Text
path DataSourceUpdates -> a -> IO (Either Text ())
insert (Success a
item) = do
        $(Int
String
LogLevel
String -> Text
String -> String -> String -> CharPos -> CharPos -> Loc
Text -> Text
Loc -> Text -> LogLevel -> Text -> m ()
(Text -> m ()) -> (Text -> Text) -> Text -> m ()
forall a. a -> a
forall b c a. (b -> c) -> (a -> b) -> a -> c
forall (m :: * -> *) msg.
(MonadLogger m, ToLogStr msg) =>
Loc -> Text -> LogLevel -> msg -> m ()
pack :: String -> Text
monadLoggerLog :: forall (m :: * -> *) msg.
(MonadLogger m, ToLogStr msg) =>
Loc -> Text -> LogLevel -> msg -> m ()
. :: forall b c a. (b -> c) -> (a -> b) -> a -> c
id :: forall a. a -> a
logInfo) (Text -> m ()) -> Text -> m ()
forall a b. (a -> b) -> a -> b
$ [Text] -> Text
T.concat [Text
"patching ", Text
name, Text
" with path: ", Text
path]
        Either Text ()
status <- IO (Either Text ()) -> m (Either Text ())
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (Either Text ()) -> m (Either Text ()))
-> IO (Either Text ()) -> m (Either Text ())
forall a b. (a -> b) -> a -> b
$ DataSourceUpdates -> a -> IO (Either Text ())
insert DataSourceUpdates
dataSourceUpdates a
item
        (Text -> m ()) -> (() -> m ()) -> Either Text () -> m ()
forall a c b. (a -> c) -> (b -> c) -> Either a b -> c
either (\Text
err -> $(Int
String
LogLevel
String -> Text
String -> String -> String -> CharPos -> CharPos -> Loc
Text -> Text
Loc -> Text -> LogLevel -> Text -> m ()
(Text -> m ()) -> (Text -> Text) -> Text -> m ()
forall a. a -> a
forall b c a. (b -> c) -> (a -> b) -> a -> c
forall (m :: * -> *) msg.
(MonadLogger m, ToLogStr msg) =>
Loc -> Text -> LogLevel -> msg -> m ()
pack :: String -> Text
monadLoggerLog :: forall (m :: * -> *) msg.
(MonadLogger m, ToLogStr msg) =>
Loc -> Text -> LogLevel -> msg -> m ()
. :: forall b c a. (b -> c) -> (a -> b) -> a -> c
id :: forall a. a -> a
logError) (Text -> m ()) -> Text -> m ()
forall a b. (a -> b) -> a -> b
$ [Text] -> Text
T.concat [Text
"dataSourceUpdates failed ", Text
name, Text
" patch: ", Text
err]) () -> m ()
forall (f :: * -> *) a. Applicative f => a -> f a
pure Either Text ()
status

processDelete :: forall m. (MonadIO m, MonadLogger m) => DataSourceUpdates -> L.ByteString -> m ()
processDelete :: DataSourceUpdates -> ByteString -> m ()
processDelete DataSourceUpdates
dataSourceUpdates ByteString
value = case ByteString -> Either String PathVersion
forall a. FromJSON a => ByteString -> Either String a
eitherDecode ByteString
value :: Either String PathVersion of
    Right (PathVersion Text
path Natural
version)
        | Text -> Text -> Bool
T.isPrefixOf Text
"/flags/" Text
path    -> Text -> Text -> IO (Either Text ()) -> m ()
logDelete Text
"flag" Text
path (DataSourceUpdates -> Text -> Natural -> IO (Either Text ())
dataSourceUpdatesDeleteFlag DataSourceUpdates
dataSourceUpdates (Int -> Text -> Text
T.drop Int
7 Text
path) Natural
version)
        | Text -> Text -> Bool
T.isPrefixOf Text
"/segments/" Text
path -> Text -> Text -> IO (Either Text ()) -> m ()
logDelete Text
"segment" Text
path (DataSourceUpdates -> Text -> Natural -> IO (Either Text ())
dataSourceUpdatesDeleteSegment DataSourceUpdates
dataSourceUpdates (Int -> Text -> Text
T.drop Int
10 Text
path) Natural
version)
        | Bool
otherwise                      -> $(Int
String
LogLevel
String -> Text
String -> String -> String -> CharPos -> CharPos -> Loc
Text -> Text
Loc -> Text -> LogLevel -> Text -> m ()
(Text -> m ()) -> (Text -> Text) -> Text -> m ()
forall a. a -> a
forall b c a. (b -> c) -> (a -> b) -> a -> c
forall (m :: * -> *) msg.
(MonadLogger m, ToLogStr msg) =>
Loc -> Text -> LogLevel -> msg -> m ()
pack :: String -> Text
monadLoggerLog :: forall (m :: * -> *) msg.
(MonadLogger m, ToLogStr msg) =>
Loc -> Text -> LogLevel -> msg -> m ()
. :: forall b c a. (b -> c) -> (a -> b) -> a -> c
id :: forall a. a -> a
logError) Text
"unknown delete path"
    Left String
err                             -> $(Int
String
LogLevel
String -> Text
String -> String -> String -> CharPos -> CharPos -> Loc
Text -> Text
Loc -> Text -> LogLevel -> Text -> m ()
(Text -> m ()) -> (Text -> Text) -> Text -> m ()
forall a. a -> a
forall b c a. (b -> c) -> (a -> b) -> a -> c
forall (m :: * -> *) msg.
(MonadLogger m, ToLogStr msg) =>
Loc -> Text -> LogLevel -> msg -> m ()
pack :: String -> Text
monadLoggerLog :: forall (m :: * -> *) msg.
(MonadLogger m, ToLogStr msg) =>
Loc -> Text -> LogLevel -> msg -> m ()
. :: forall b c a. (b -> c) -> (a -> b) -> a -> c
id :: forall a. a -> a
logError) (Text -> m ()) -> Text -> m ()
forall a b. (a -> b) -> a -> b
$ Text -> Text -> Text
T.append Text
"failed to parse delete" (String -> Text
T.pack String
err)
    where logDelete :: Text -> Text -> StoreResult () -> m ()
          logDelete :: Text -> Text -> IO (Either Text ()) -> m ()
logDelete Text
name Text
path IO (Either Text ())
action = do
            $(Int
String
LogLevel
String -> Text
String -> String -> String -> CharPos -> CharPos -> Loc
Text -> Text
Loc -> Text -> LogLevel -> Text -> m ()
(Text -> m ()) -> (Text -> Text) -> Text -> m ()
forall a. a -> a
forall b c a. (b -> c) -> (a -> b) -> a -> c
forall (m :: * -> *) msg.
(MonadLogger m, ToLogStr msg) =>
Loc -> Text -> LogLevel -> msg -> m ()
pack :: String -> Text
monadLoggerLog :: forall (m :: * -> *) msg.
(MonadLogger m, ToLogStr msg) =>
Loc -> Text -> LogLevel -> msg -> m ()
. :: forall b c a. (b -> c) -> (a -> b) -> a -> c
id :: forall a. a -> a
logInfo) (Text -> m ()) -> Text -> m ()
forall a b. (a -> b) -> a -> b
$ [Text] -> Text
T.concat [Text
"deleting ", Text
name, Text
" with path: ", Text
path]
            Either Text ()
status <- IO (Either Text ()) -> m (Either Text ())
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO IO (Either Text ())
action
            (Text -> m ()) -> (() -> m ()) -> Either Text () -> m ()
forall a c b. (a -> c) -> (b -> c) -> Either a b -> c
either (\Text
err -> $(Int
String
LogLevel
String -> Text
String -> String -> String -> CharPos -> CharPos -> Loc
Text -> Text
Loc -> Text -> LogLevel -> Text -> m ()
(Text -> m ()) -> (Text -> Text) -> Text -> m ()
forall a. a -> a
forall b c a. (b -> c) -> (a -> b) -> a -> c
forall (m :: * -> *) msg.
(MonadLogger m, ToLogStr msg) =>
Loc -> Text -> LogLevel -> msg -> m ()
pack :: String -> Text
monadLoggerLog :: forall (m :: * -> *) msg.
(MonadLogger m, ToLogStr msg) =>
Loc -> Text -> LogLevel -> msg -> m ()
. :: forall b c a. (b -> c) -> (a -> b) -> a -> c
id :: forall a. a -> a
logError) (Text -> m ()) -> Text -> m ()
forall a b. (a -> b) -> a -> b
$ [Text] -> Text
T.concat [Text
"dataSourceUpdates failed ", Text
name, Text
" delete: ", Text
err]) () -> m ()
forall (f :: * -> *) a. Applicative f => a -> f a
pure Either Text ()
status

processEvent :: (MonadIO m, MonadLogger m) => DataSourceUpdates -> Text -> L.ByteString -> m ()
processEvent :: DataSourceUpdates -> Text -> ByteString -> m ()
processEvent DataSourceUpdates
dataSourceUpdates Text
name ByteString
value = case Text
name of
    Text
"put"    -> DataSourceUpdates -> ByteString -> m ()
forall (m :: * -> *).
(MonadIO m, MonadLogger m) =>
DataSourceUpdates -> ByteString -> m ()
processPut DataSourceUpdates
dataSourceUpdates ByteString
value
    Text
"patch"  -> DataSourceUpdates -> ByteString -> m ()
forall (m :: * -> *).
(MonadIO m, MonadLogger m) =>
DataSourceUpdates -> ByteString -> m ()
processPatch DataSourceUpdates
dataSourceUpdates ByteString
value
    Text
"delete" -> DataSourceUpdates -> ByteString -> m ()
forall (m :: * -> *).
(MonadIO m, MonadLogger m) =>
DataSourceUpdates -> ByteString -> m ()
processDelete DataSourceUpdates
dataSourceUpdates ByteString
value
    Text
_        -> $(Int
String
LogLevel
String -> Text
String -> String -> String -> CharPos -> CharPos -> Loc
Text -> Text
Loc -> Text -> LogLevel -> Text -> m ()
(Text -> m ()) -> (Text -> Text) -> Text -> m ()
forall a. a -> a
forall b c a. (b -> c) -> (a -> b) -> a -> c
forall (m :: * -> *) msg.
(MonadLogger m, ToLogStr msg) =>
Loc -> Text -> LogLevel -> msg -> m ()
pack :: String -> Text
monadLoggerLog :: forall (m :: * -> *) msg.
(MonadLogger m, ToLogStr msg) =>
Loc -> Text -> LogLevel -> msg -> m ()
. :: forall b c a. (b -> c) -> (a -> b) -> a -> c
id :: forall a. a -> a
logWarn) Text
"unknown event type"

data ReadE = ReadETimeout | ReadEClosed deriving (Int -> ReadE -> ShowS
[ReadE] -> ShowS
ReadE -> String
(Int -> ReadE -> ShowS)
-> (ReadE -> String) -> ([ReadE] -> ShowS) -> Show ReadE
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
showList :: [ReadE] -> ShowS
$cshowList :: [ReadE] -> ShowS
show :: ReadE -> String
$cshow :: ReadE -> String
showsPrec :: Int -> ReadE -> ShowS
$cshowsPrec :: Int -> ReadE -> ShowS
Show, Show ReadE
Typeable ReadE
Typeable ReadE
-> Show ReadE
-> (ReadE -> SomeException)
-> (SomeException -> Maybe ReadE)
-> (ReadE -> String)
-> Exception ReadE
SomeException -> Maybe ReadE
ReadE -> String
ReadE -> SomeException
forall e.
Typeable e
-> Show e
-> (e -> SomeException)
-> (SomeException -> Maybe e)
-> (e -> String)
-> Exception e
displayException :: ReadE -> String
$cdisplayException :: ReadE -> String
fromException :: SomeException -> Maybe ReadE
$cfromException :: SomeException -> Maybe ReadE
toException :: ReadE -> SomeException
$ctoException :: ReadE -> SomeException
$cp2Exception :: Show ReadE
$cp1Exception :: Typeable ReadE
Exception)

tryReadE :: MonadCatch m => m a -> m (Either ReadE a)
tryReadE :: m a -> m (Either ReadE a)
tryReadE = m a -> m (Either ReadE a)
forall (m :: * -> *) e a.
(MonadCatch m, Exception e) =>
m a -> m (Either e a)
try

-- heartbeat expected every 120 seconds
readWithException :: IO ByteString -> IO Text
readWithException :: IO ByteString -> IO Text
readWithException IO ByteString
body = Int -> IO ByteString -> IO (Maybe ByteString)
forall a. Int -> IO a -> IO (Maybe a)
timeout (Int
1000000 Int -> Int -> Int
forall a. Num a => a -> a -> a
* Int
300) (IO ByteString -> IO ByteString
brRead IO ByteString
body) IO (Maybe ByteString) -> (Maybe ByteString -> IO Text) -> IO Text
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \case
    Maybe ByteString
Nothing    -> ReadE -> IO Text
forall e a. Exception e => e -> IO a
throwIO ReadE
ReadETimeout
    Just ByteString
bytes -> if ByteString
bytes ByteString -> ByteString -> Bool
forall a. Eq a => a -> a -> Bool
== ByteString
B.empty then ReadE -> IO Text
forall e a. Exception e => e -> IO a
throwIO ReadE
ReadEClosed else Text -> IO Text
forall (f :: * -> *) a. Applicative f => a -> f a
pure (ByteString -> Text
decodeUtf8 ByteString
bytes)

readStream :: (MonadIO m, MonadLogger m, MonadMask m) => IO ByteString -> DataSourceUpdates -> m ()
readStream :: IO ByteString -> DataSourceUpdates -> m ()
readStream IO ByteString
body DataSourceUpdates
dataSourceUpdates = Text -> m ()
loop Text
"" where
    loop :: Text -> m ()
loop Text
initial = m (Result SSE) -> m (Either ReadE (Result SSE))
forall (m :: * -> *) a. MonadCatch m => m a -> m (Either ReadE a)
tryReadE (m Text -> Parser SSE -> Text -> m (Result SSE)
forall (m :: * -> *) a.
Monad m =>
m Text -> Parser a -> Text -> m (Result a)
parseWith (IO Text -> m Text
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO Text -> m Text) -> IO Text -> m Text
forall a b. (a -> b) -> a -> b
$ IO ByteString -> IO Text
readWithException IO ByteString
body) Parser SSE
parseEvent Text
initial) m (Either ReadE (Result SSE))
-> (Either ReadE (Result SSE) -> m ()) -> m ()
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \case
        (Left ReadE
ReadETimeout) -> $(Int
String
LogLevel
String -> Text
String -> String -> String -> CharPos -> CharPos -> Loc
Text -> Text
Loc -> Text -> LogLevel -> Text -> m ()
(Text -> m ()) -> (Text -> Text) -> Text -> m ()
forall a. a -> a
forall b c a. (b -> c) -> (a -> b) -> a -> c
forall (m :: * -> *) msg.
(MonadLogger m, ToLogStr msg) =>
Loc -> Text -> LogLevel -> msg -> m ()
pack :: String -> Text
monadLoggerLog :: forall (m :: * -> *) msg.
(MonadLogger m, ToLogStr msg) =>
Loc -> Text -> LogLevel -> msg -> m ()
. :: forall b c a. (b -> c) -> (a -> b) -> a -> c
id :: forall a. a -> a
logError) Text
"streaming connection unexpectedly closed"
        (Left ReadE
ReadEClosed)  -> $(Int
String
LogLevel
String -> Text
String -> String -> String -> CharPos -> CharPos -> Loc
Text -> Text
Loc -> Text -> LogLevel -> Text -> m ()
(Text -> m ()) -> (Text -> Text) -> Text -> m ()
forall a. a -> a
forall b c a. (b -> c) -> (a -> b) -> a -> c
forall (m :: * -> *) msg.
(MonadLogger m, ToLogStr msg) =>
Loc -> Text -> LogLevel -> msg -> m ()
pack :: String -> Text
monadLoggerLog :: forall (m :: * -> *) msg.
(MonadLogger m, ToLogStr msg) =>
Loc -> Text -> LogLevel -> msg -> m ()
. :: forall b c a. (b -> c) -> (a -> b) -> a -> c
id :: forall a. a -> a
logError) Text
"timeout waiting for SSE event"
        (Right Result SSE
parsed)      -> case Result SSE
parsed of
            Done Text
remaining SSE
event -> do
                DataSourceUpdates -> Text -> ByteString -> m ()
forall (m :: * -> *).
(MonadIO m, MonadLogger m) =>
DataSourceUpdates -> Text -> ByteString -> m ()
processEvent DataSourceUpdates
dataSourceUpdates (SSE -> Text
name SSE
event) (ByteString -> ByteString
L.fromStrict (ByteString -> ByteString) -> ByteString -> ByteString
forall a b. (a -> b) -> a -> b
$ Text -> ByteString
encodeUtf8 (Text -> ByteString) -> Text -> ByteString
forall a b. (a -> b) -> a -> b
$ SSE -> Text
buffer SSE
event)
                Text -> m ()
loop Text
remaining
            Fail Text
_ [String]
context String
err ->
                $(Int
String
LogLevel
String -> Text
String -> String -> String -> CharPos -> CharPos -> Loc
Text -> Text
Loc -> Text -> LogLevel -> Text -> m ()
(Text -> m ()) -> (Text -> Text) -> Text -> m ()
forall a. a -> a
forall b c a. (b -> c) -> (a -> b) -> a -> c
forall (m :: * -> *) msg.
(MonadLogger m, ToLogStr msg) =>
Loc -> Text -> LogLevel -> msg -> m ()
pack :: String -> Text
monadLoggerLog :: forall (m :: * -> *) msg.
(MonadLogger m, ToLogStr msg) =>
Loc -> Text -> LogLevel -> msg -> m ()
. :: forall b c a. (b -> c) -> (a -> b) -> a -> c
id :: forall a. a -> a
logError) (Text -> m ()) -> Text -> m ()
forall a b. (a -> b) -> a -> b
$ Text -> [Text] -> Text
T.intercalate Text
" " [Text
"failed parsing SSE frame", String -> Text
T.pack (String -> Text) -> String -> Text
forall a b. (a -> b) -> a -> b
$ [String] -> String
forall a. Show a => a -> String
show [String]
context, String -> Text
T.pack String
err]
            Partial Text -> Result SSE
_ -> $(Int
String
LogLevel
String -> Text
String -> String -> String -> CharPos -> CharPos -> Loc
Text -> Text
Loc -> Text -> LogLevel -> Text -> m ()
(Text -> m ()) -> (Text -> Text) -> Text -> m ()
forall a. a -> a
forall b c a. (b -> c) -> (a -> b) -> a -> c
forall (m :: * -> *) msg.
(MonadLogger m, ToLogStr msg) =>
Loc -> Text -> LogLevel -> msg -> m ()
pack :: String -> Text
monadLoggerLog :: forall (m :: * -> *) msg.
(MonadLogger m, ToLogStr msg) =>
Loc -> Text -> LogLevel -> msg -> m ()
. :: forall b c a. (b -> c) -> (a -> b) -> a -> c
id :: forall a. a -> a
logError) Text
"failed parsing SSE frame unexpected partial"

-- https://aws.amazon.com/blogs/architecture/exponential-backoff-and-jitter/
retryPolicy :: MonadIO m => RetryPolicyM m
retryPolicy :: RetryPolicyM m
retryPolicy = Int -> RetryPolicyM m -> RetryPolicyM m
forall (m :: * -> *).
Monad m =>
Int -> RetryPolicyM m -> RetryPolicyM m
capDelay (Int
30 Int -> Int -> Int
forall a. Num a => a -> a -> a
* Int
1000000) (RetryPolicyM m -> RetryPolicyM m)
-> RetryPolicyM m -> RetryPolicyM m
forall a b. (a -> b) -> a -> b
$ Int -> RetryPolicyM m
forall (m :: * -> *). MonadIO m => Int -> RetryPolicyM m
fullJitterBackoff (Int
1 Int -> Int -> Int
forall a. Num a => a -> a -> a
* Int
1000000)

data Failure = FailurePermanent | FailureTemporary | FailureReset deriving (Failure -> Failure -> Bool
(Failure -> Failure -> Bool)
-> (Failure -> Failure -> Bool) -> Eq Failure
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
/= :: Failure -> Failure -> Bool
$c/= :: Failure -> Failure -> Bool
== :: Failure -> Failure -> Bool
$c== :: Failure -> Failure -> Bool
Eq)

handleStream :: (MonadIO m, MonadLogger m, MonadMask m) => Manager -> Request -> DataSourceUpdates -> RetryStatus -> m Failure
handleStream :: Manager -> Request -> DataSourceUpdates -> RetryStatus -> m Failure
handleStream Manager
manager Request
request DataSourceUpdates
dataSourceUpdates RetryStatus
_ = do
    $(Int
String
LogLevel
String -> Text
String -> String -> String -> CharPos -> CharPos -> Loc
Text -> Text
Loc -> Text -> LogLevel -> Text -> m ()
(Text -> m ()) -> (Text -> Text) -> Text -> m ()
forall a. a -> a
forall b c a. (b -> c) -> (a -> b) -> a -> c
forall (m :: * -> *) msg.
(MonadLogger m, ToLogStr msg) =>
Loc -> Text -> LogLevel -> msg -> m ()
pack :: String -> Text
monadLoggerLog :: forall (m :: * -> *) msg.
(MonadLogger m, ToLogStr msg) =>
Loc -> Text -> LogLevel -> msg -> m ()
. :: forall b c a. (b -> c) -> (a -> b) -> a -> c
id :: forall a. a -> a
logDebug) Text
"starting new streaming connection"
    TimeSpec
startTime <- IO TimeSpec -> m TimeSpec
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO TimeSpec -> m TimeSpec) -> IO TimeSpec -> m TimeSpec
forall a b. (a -> b) -> a -> b
$ Clock -> IO TimeSpec
getTime Clock
Monotonic
    Either HttpException ()
status <- m () -> m (Either HttpException ())
forall (m :: * -> *) a.
MonadCatch m =>
m a -> m (Either HttpException a)
tryHTTP (m () -> m (Either HttpException ()))
-> m () -> m (Either HttpException ())
forall a b. (a -> b) -> a -> b
$ Request -> Manager -> (Response (IO ByteString) -> m ()) -> m ()
forall (m :: * -> *) a.
(MonadIO m, MonadMask m) =>
Request -> Manager -> (Response (IO ByteString) -> m a) -> m a
withResponseGeneric Request
request Manager
manager ((Response (IO ByteString) -> m ()) -> m ())
-> (Response (IO ByteString) -> m ()) -> m ()
forall a b. (a -> b) -> a -> b
$ \Response (IO ByteString)
response -> Response (IO ByteString) -> m ()
forall (m :: * -> *) body. MonadThrow m => Response body -> m ()
checkAuthorization Response (IO ByteString)
response m () -> m () -> m ()
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>>
        if Response (IO ByteString) -> Status
forall body. Response body -> Status
responseStatus Response (IO ByteString)
response Status -> Status -> Bool
forall a. Eq a => a -> a -> Bool
/= Status
ok200 then Request -> Response (IO ByteString) -> m ()
forall (m :: * -> *).
MonadIO m =>
Request -> Response (IO ByteString) -> m ()
throwErrorStatusCodes Request
request Response (IO ByteString)
response else
            IO ByteString -> DataSourceUpdates -> m ()
forall (m :: * -> *).
(MonadIO m, MonadLogger m, MonadMask m) =>
IO ByteString -> DataSourceUpdates -> m ()
readStream (Response (IO ByteString) -> IO ByteString
forall body. Response body -> body
responseBody Response (IO ByteString)
response) DataSourceUpdates
dataSourceUpdates
    case Either HttpException ()
status of
        (Right ()) -> IO TimeSpec -> m TimeSpec
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (Clock -> IO TimeSpec
getTime Clock
Monotonic) m TimeSpec -> (TimeSpec -> m Failure) -> m Failure
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \TimeSpec
now -> if TimeSpec
now TimeSpec -> TimeSpec -> Bool
forall a. Ord a => a -> a -> Bool
>= TimeSpec
startTime TimeSpec -> TimeSpec -> TimeSpec
forall a. Num a => a -> a -> a
+ (Int64 -> Int64 -> TimeSpec
TimeSpec Int64
60 Int64
0)
            then do
                $(Int
String
LogLevel
String -> Text
String -> String -> String -> CharPos -> CharPos -> Loc
Text -> Text
Loc -> Text -> LogLevel -> Text -> m ()
(Text -> m ()) -> (Text -> Text) -> Text -> m ()
forall a. a -> a
forall b c a. (b -> c) -> (a -> b) -> a -> c
forall (m :: * -> *) msg.
(MonadLogger m, ToLogStr msg) =>
Loc -> Text -> LogLevel -> msg -> m ()
pack :: String -> Text
monadLoggerLog :: forall (m :: * -> *) msg.
(MonadLogger m, ToLogStr msg) =>
Loc -> Text -> LogLevel -> msg -> m ()
. :: forall b c a. (b -> c) -> (a -> b) -> a -> c
id :: forall a. a -> a
logError) Text
"streaming connection closed after 60 seconds, retrying instantly"
                Failure -> m Failure
forall (f :: * -> *) a. Applicative f => a -> f a
pure Failure
FailureReset
            else do
                $(Int
String
LogLevel
String -> Text
String -> String -> String -> CharPos -> CharPos -> Loc
Text -> Text
Loc -> Text -> LogLevel -> Text -> m ()
(Text -> m ()) -> (Text -> Text) -> Text -> m ()
forall a. a -> a
forall b c a. (b -> c) -> (a -> b) -> a -> c
forall (m :: * -> *) msg.
(MonadLogger m, ToLogStr msg) =>
Loc -> Text -> LogLevel -> msg -> m ()
pack :: String -> Text
monadLoggerLog :: forall (m :: * -> *) msg.
(MonadLogger m, ToLogStr msg) =>
Loc -> Text -> LogLevel -> msg -> m ()
. :: forall b c a. (b -> c) -> (a -> b) -> a -> c
id :: forall a. a -> a
logError) Text
"streaming connection closed before 60 seconds, retrying after backoff"
                Failure -> m Failure
forall (f :: * -> *) a. Applicative f => a -> f a
pure Failure
FailureTemporary
        (Left HttpException
err) -> do
            $(Int
String
LogLevel
String -> Text
String -> String -> String -> CharPos -> CharPos -> Loc
Text -> Text
Loc -> Text -> LogLevel -> Text -> m ()
(Text -> m ()) -> (Text -> Text) -> Text -> m ()
forall a. a -> a
forall b c a. (b -> c) -> (a -> b) -> a -> c
forall (m :: * -> *) msg.
(MonadLogger m, ToLogStr msg) =>
Loc -> Text -> LogLevel -> msg -> m ()
pack :: String -> Text
monadLoggerLog :: forall (m :: * -> *) msg.
(MonadLogger m, ToLogStr msg) =>
Loc -> Text -> LogLevel -> msg -> m ()
. :: forall b c a. (b -> c) -> (a -> b) -> a -> c
id :: forall a. a -> a
logError) (Text -> [Text] -> Text
T.intercalate Text
" " [Text
"HTTP Exception", String -> Text
T.pack (String -> Text) -> String -> Text
forall a b. (a -> b) -> a -> b
$ HttpException -> String
forall a. Show a => a -> String
show HttpException
err])
            Failure -> m Failure
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Failure -> m Failure) -> Failure -> m Failure
forall a b. (a -> b) -> a -> b
$ case HttpException
err of
                (InvalidUrlException String
_ String
_)                                 -> Failure
FailurePermanent
                (HttpExceptionRequest Request
_ (StatusCodeException Response ()
response ByteString
_)) ->
                    let code :: Int
code = Status -> Int
statusCode (Response () -> Status
forall body. Response body -> Status
responseStatus Response ()
response) in if Int
code Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
>= Int
400 Bool -> Bool -> Bool
&& Int
code Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
< Int
500
                        then if Int -> [Int] -> Bool
forall (t :: * -> *) a. (Foldable t, Eq a) => a -> t a -> Bool
elem Int
code [Int
400, Int
408, Int
429] then Failure
FailureTemporary else Failure
FailurePermanent
                        else Failure
FailureTemporary
                HttpException
_                                                         -> Failure
FailureTemporary

streamingThread :: (MonadIO m, MonadLogger m, MonadMask m) => Text -> ClientContext -> DataSourceUpdates -> m ()
streamingThread :: Text -> ClientContext -> DataSourceUpdates -> m ()
streamingThread Text
streamURI ClientContext
clientContext DataSourceUpdates
dataSourceUpdates = do
    let manager :: Manager
manager = HttpConfiguration -> Manager
tlsManager (HttpConfiguration -> Manager) -> HttpConfiguration -> Manager
forall a b. (a -> b) -> a -> b
$ ClientContext -> HttpConfiguration
httpConfiguration ClientContext
clientContext
    Request
req <- HttpConfiguration -> String -> m Request
forall (m :: * -> *).
MonadThrow m =>
HttpConfiguration -> String -> m Request
prepareRequest (ClientContext -> HttpConfiguration
httpConfiguration ClientContext
clientContext) (Text -> String
T.unpack Text
streamURI  String -> ShowS
forall a. [a] -> [a] -> [a]
++ String
"/all")
    DataSourceUpdates -> m () -> m ()
forall (m :: * -> *).
(MonadIO m, MonadLogger m, MonadCatch m) =>
DataSourceUpdates -> m () -> m ()
handleUnauthorized DataSourceUpdates
dataSourceUpdates (m () -> m ()) -> m () -> m ()
forall a b. (a -> b) -> a -> b
$ (m () -> m ()) -> m ()
forall a. (a -> a) -> a
fix ((m () -> m ()) -> m ()) -> (m () -> m ()) -> m ()
forall a b. (a -> b) -> a -> b
$ \m ()
loop ->
        RetryPolicyM m
-> (RetryStatus -> Failure -> m Bool)
-> (RetryStatus -> m Failure)
-> m Failure
forall (m :: * -> *) b.
MonadIO m =>
RetryPolicyM m
-> (RetryStatus -> b -> m Bool) -> (RetryStatus -> m b) -> m b
retrying RetryPolicyM m
forall (m :: * -> *). MonadIO m => RetryPolicyM m
retryPolicy (\RetryStatus
_ Failure
status -> Bool -> m Bool
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Bool -> m Bool) -> Bool -> m Bool
forall a b. (a -> b) -> a -> b
$ Failure
status Failure -> Failure -> Bool
forall a. Eq a => a -> a -> Bool
== Failure
FailureTemporary) (Manager -> Request -> DataSourceUpdates -> RetryStatus -> m Failure
forall (m :: * -> *).
(MonadIO m, MonadLogger m, MonadMask m) =>
Manager -> Request -> DataSourceUpdates -> RetryStatus -> m Failure
handleStream Manager
manager Request
req DataSourceUpdates
dataSourceUpdates) m Failure -> (Failure -> m ()) -> m ()
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>=
            \case Failure
FailureReset -> m ()
loop; Failure
_ -> () -> m ()
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()