{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE NumericUnderscores #-}
module LaunchDarkly.Server.Network.Streaming (streamingThread) where
import Control.Applicative (many, (<|>))
import Control.Concurrent (threadDelay)
import Control.Exception (throwIO)
import Control.Monad (mzero, void)
import Control.Monad.Catch (Exception, MonadCatch, MonadMask, try)
import Control.Monad.IO.Class (MonadIO, liftIO)
import Control.Monad.Logger (MonadLogger, logDebug, logError, logWarn)
import Control.Monad.Loops (iterateUntilM)
import Data.Aeson (FromJSON, Result (..), eitherDecode, fromJSON, parseJSON, withObject, (.!=), (.:), (.:?))
import Data.Attoparsec.Text as P hiding (Result, try)
import Data.ByteString (ByteString)
import qualified Data.ByteString as B
import qualified Data.ByteString.Lazy as L
import Data.Text (Text)
import qualified Data.Text as T
import Data.Text.Encoding (decodeUtf8, encodeUtf8)
import GHC.Generics (Generic)
import GHC.Natural (Natural)
import Network.HTTP.Client (HttpException (..), HttpExceptionContent (..), Manager, Request, Response (..), brRead)
import Network.HTTP.Types.Status (Status (statusCode))
import System.Clock (Clock (Monotonic), TimeSpec (TimeSpec), getTime)
import System.Random (Random (randomR), newStdGen)
import System.Timeout (timeout)
import LaunchDarkly.AesonCompat (KeyMap, emptyObject)
import LaunchDarkly.Server.Config.ClientContext (ClientContext (..))
import LaunchDarkly.Server.Config.HttpConfiguration (HttpConfiguration (..), prepareRequest)
import LaunchDarkly.Server.DataSource.Internal (DataSourceUpdates (..))
import LaunchDarkly.Server.Features (Flag, Segment)
import LaunchDarkly.Server.Network.Common (checkAuthorization, handleUnauthorized, isHttpUnrecoverable, throwIfNot200, tryHTTP, withResponseGeneric)
import LaunchDarkly.Server.Store.Internal (StoreResult)
-- | Payload of a @put@ event: the flag and segment maps that 'processPut'
-- passes to the data source initialiser.
data PutBody = PutBody
    { flags :: !(KeyMap Flag)
    -- ^ Decoded from the required @flags@ JSON field.
    , segments :: !(KeyMap Segment)
    -- ^ Decoded from the optional @segments@ JSON field.
    }
    deriving (Generic, Show)
-- | Envelope used by @put@ and @patch@ events: a path plus a payload of
-- type @d@.
data PathData d = PathData
    { path :: !Text
    -- ^ Decoded from the optional @path@ JSON field.
    , pathData :: !d
    -- ^ Decoded from the required @data@ JSON field.
    }
    deriving (Generic, Show)
-- | Body of a @delete@ event as decoded by 'processDelete'. The 'FromJSON'
-- instance is derived rather than hand-written.
data PathVersion = PathVersion
    { path :: !Text
    -- ^ Path of the item to delete; matched against the @/flags/@ and
    -- @/segments/@ prefixes.
    , version :: !Natural
    -- ^ Version forwarded to the delete operation.
    }
    deriving (Generic, Show, FromJSON)
-- | @flags@ is mandatory; a missing @segments@ key decodes as an empty map.
instance FromJSON PutBody where
    parseJSON = withObject "PutBody" $ \obj ->
        PutBody
            <$> obj .: "flags"
            <*> obj .:? "segments" .!= emptyObject
-- | @data@ is mandatory; a missing @path@ key decodes as @"/"@.
instance FromJSON a => FromJSON (PathData a) where
    parseJSON = withObject "Put" $ \obj ->
        -- The payload is parsed before the path, hence the flipped constructor.
        flip PathData
            <$> obj .: "data"
            <*> obj .:? "path" .!= "/"
-- | A single server-sent event assembled from its field lines by
-- 'processField'.
data SSE = SSE
    { name :: !Text
    -- ^ Value of the @event@ field.
    , buffer :: !Text
    -- ^ Accumulated @data@ field values, joined with @"\\n"@.
    , lastEventId :: !(Maybe Text)
    -- ^ Value of the @id@ field, if one was seen.
    , retry :: !(Maybe Text)
    -- ^ Raw value of the @retry@ field, if one was seen.
    }
    deriving (Generic, Show, Eq)
-- | True for characters allowed in an SSE field name: anything except
-- carriage return, line feed and the @:@ separator.
nameCharPredicate :: Char -> Bool
nameCharPredicate '\r' = False
nameCharPredicate ':' = False
nameCharPredicate '\n' = False
nameCharPredicate _ = True
-- | True for every character that is not a line terminator (CR or LF).
anyCharPredicate :: Char -> Bool
anyCharPredicate '\r' = False
anyCharPredicate '\n' = False
anyCharPredicate _ = True
-- | Consume one SSE line terminator: CRLF, a lone CR, a lone LF, or end of
-- input. Alternatives are tried in that order so CRLF is consumed as a unit.
endOfLineSSE :: Parser ()
endOfLineSSE = choice (lineBreaks ++ [endOfInput])
  where
    lineBreaks = [void (string terminator) | terminator <- ["\r\n", "\r", "\n"]]
-- | Parse and discard an SSE comment line: a leading @:@, the rest of the
-- line, and its terminator.
comment :: Parser ()
comment = char ':' *> P.takeWhile anyCharPredicate *> endOfLineSSE
-- | Parse one @name[:[ ]value]@ field line and return @(name, value)@.
--
-- The name must be non-empty. The colon and a single following space are
-- both optional, and the value may be empty.
parseField :: Parser (Text, Text)
parseField =
    (,)
        <$> P.takeWhile1 nameCharPredicate
        <* optionalChar ':'
        <* optionalChar ' '
        <*> P.takeWhile anyCharPredicate
        <* endOfLineSSE
  where
    -- Consume the given character if present; never fails.
    optionalChar c = void (option ' ' (char c))
-- | Fold a single parsed field into the event being built.
--
-- @event@, @id@ and @retry@ overwrite their slot. @data@ is appended to the
-- buffer, separated by a newline when the buffer is non-empty. Any other
-- field name leaves the event unchanged.
processField :: (Text, Text) -> SSE -> SSE
processField (fieldName, fieldValue) event
    | fieldName == "event" = event {name = fieldValue}
    | fieldName == "id" = event {lastEventId = Just fieldValue}
    | fieldName == "retry" = event {retry = Just fieldValue}
    | fieldName == "data" = event {buffer = appendLine (buffer event)}
    | otherwise = event
  where
    appendLine existing
        | T.null existing = fieldValue
        | otherwise = T.concat [existing, "\n", fieldValue]
-- | Parse the next dispatchable SSE event.
--
-- Reads comment and field lines up to the terminating blank line and folds
-- the fields into an 'SSE'. Events with an empty name or an empty data
-- buffer are skipped, and parsing continues with the next event.
parseEvent :: Parser SSE
parseEvent = do
    fields <- concat <$> many (([] <$ comment) <|> (pure <$> parseField))
    endOfLineSSE
    let blank = SSE "" "" mzero mzero
        -- 'foldr' applies its function to the last element first, so fold
        -- over the reversed list. Fields are then applied in stream order:
        -- multi-line @data@ is joined top to bottom and the last duplicate
        -- of a field wins.
        event = foldr processField blank (reverse fields)
    if T.null (name event) || T.null (buffer event) then parseEvent else pure event
-- | Handle a @put@ event by decoding the body and initialising the data
-- source with the full flag and segment set.
--
-- Returns 'True' when the data source accepted the update. Returns 'False'
-- after logging an error when the body fails to decode or the data source
-- reports a failure.
processPut :: (MonadIO m, MonadLogger m) => DataSourceUpdates -> L.ByteString -> m Bool
processPut dataSourceUpdates value = case eitherDecode value of
    Left err -> do
        -- Separator added so the decoder error does not run into the message.
        $(logError) $ T.append "failed to parse put body: " (T.pack err)
        pure False
    Right (PathData _ (PutBody flags segments)) -> do
        $(logDebug) "initializing dataSourceUpdates with put"
        liftIO (dataSourceUpdatesInit dataSourceUpdates flags segments) >>= \case
            Left err -> do
                $(logError) $ T.append "dataSourceUpdates failed put: " err
                pure False
            Right () -> pure True
-- | Handle a @patch@ event by decoding the envelope and upserting the flag
-- or segment it carries, chosen by the path prefix.
--
-- Returns 'True' when the item was stored, and also for an unrecognised
-- path, which is logged and ignored. Returns 'False' after logging when the
-- envelope or item fails to decode or the data source reports a failure.
processPatch :: forall m. (MonadIO m, MonadLogger m) => DataSourceUpdates -> L.ByteString -> m Bool
processPatch dataSourceUpdates value = case eitherDecode value of
    Right (PathData path body)
        | T.isPrefixOf "/flags/" path -> insPatch "flag" path dataSourceUpdatesInsertFlag (fromJSON body)
        | T.isPrefixOf "/segments/" path -> insPatch "segment" path dataSourceUpdatesInsertSegment (fromJSON body)
        | otherwise -> do
            $(logError) "unknown patch path"
            pure True
    Left err -> do
        -- Separator added so the decoder error does not run into the message.
        $(logError) $ T.append "failed to parse patch generic: " (T.pack err)
        pure False
  where
    -- Insert an already-decoded item, logging under the given kind label.
    insPatch :: Text -> Text -> (DataSourceUpdates -> a -> IO (Either Text ())) -> Result a -> m Bool
    insPatch kind _ _ (Error err) = do
        $(logError) $ T.concat ["failed to parse patch ", kind, ": ", T.pack err]
        pure False
    insPatch kind itemPath insert (Success item) = do
        $(logDebug) $ T.concat ["patching ", kind, " with path: ", itemPath]
        liftIO (insert dataSourceUpdates item) >>= \case
            Left err -> do
                $(logError) $ T.concat ["dataSourceUpdates failed ", kind, " patch: ", err]
                pure False
            Right () -> pure True
-- | Handle a @delete@ event by decoding the path and version and deleting
-- the flag or segment it names, chosen by the path prefix.
--
-- Returns 'True' when the delete succeeded, and also for an unrecognised
-- path, which is logged and ignored in the same way as 'processPatch' does.
-- Returns 'False' after logging when the body fails to decode or the data
-- source reports a failure.
processDelete :: forall m. (MonadIO m, MonadLogger m) => DataSourceUpdates -> L.ByteString -> m Bool
processDelete dataSourceUpdates value = case eitherDecode value :: Either String PathVersion of
    Right (PathVersion path version)
        -- 'T.stripPrefix' yields the item key directly, replacing the
        -- hard-coded prefix lengths.
        | Just key <- T.stripPrefix "/flags/" path ->
            logDelete "flag" path (dataSourceUpdatesDeleteFlag dataSourceUpdates key version)
        | Just key <- T.stripPrefix "/segments/" path ->
            logDelete "segment" path (dataSourceUpdatesDeleteSegment dataSourceUpdates key version)
        | otherwise -> do
            $(logError) "unknown delete path"
            -- An unknown kind is skipped rather than treated as a stream
            -- failure, matching the unknown-path handling of patch events.
            pure True
    Left err -> do
        -- Separator added so the decoder error does not run into the message.
        $(logError) $ T.append "failed to parse delete: " (T.pack err)
        pure False
  where
    -- Run the delete action, logging under the given kind label.
    logDelete :: Text -> Text -> StoreResult () -> m Bool
    logDelete kind itemPath action = do
        $(logDebug) $ T.concat ["deleting ", kind, " with path: ", itemPath]
        liftIO action >>= \case
            Left err -> do
                $(logError) $ T.concat ["dataSourceUpdates failed ", kind, " delete: ", err]
                pure False
            Right () -> pure True
-- | Dispatch an SSE event to its handler by event name.
--
-- @put@, @patch@ and @delete@ go to their handlers. Any other name is
-- logged as a warning and reported as handled ('True').
processEvent :: (MonadIO m, MonadLogger m) => DataSourceUpdates -> Text -> L.ByteString -> m Bool
processEvent dataSourceUpdates name value
    | name == "put" = processPut dataSourceUpdates value
    | name == "patch" = processPatch dataSourceUpdates value
    | name == "delete" = processDelete dataSourceUpdates value
    | otherwise = do
        $(logWarn) "unknown event type"
        pure True
-- | Exceptions thrown by 'readWithException' while reading the stream body.
data ReadE
    = -- | No chunk arrived within the read timeout.
      ReadETimeout
    | -- | The body reader returned an empty chunk.
      ReadEClosed
    deriving (Show, Exception)
-- | 'try' specialised to 'ReadE', so only stream-read failures are caught.
tryReadE :: MonadCatch m => m a -> m (Either ReadE a)
tryReadE action = try action
-- | Read the next chunk of the response body as 'Text'.
--
-- Throws 'ReadETimeout' when no chunk arrives within 300 seconds and
-- 'ReadEClosed' when the chunk is empty.
--
-- NOTE(review): 'decodeUtf8' is applied to each chunk on its own. A
-- multi-byte character split across two chunks would presumably fail to
-- decode here; confirm whether a streaming decoder is needed.
readWithException :: IO ByteString -> IO Text
readWithException body = do
    chunk <- timeout readTimeoutMicros (brRead body)
    case chunk of
        Nothing -> throwIO ReadETimeout
        Just bytes
            | B.null bytes -> throwIO ReadEClosed
            | otherwise -> pure (decodeUtf8 bytes)
  where
    -- 'timeout' takes microseconds: 300 seconds.
    readTimeoutMicros = 300 * 1_000_000
-- | Read and process SSE events from a response body until the stream
-- fails or an event cannot be processed.
--
-- Returns whether at least one event had been processed successfully
-- before the loop stopped. Every stop condition is logged as an error.
readStream :: (MonadIO m, MonadLogger m, MonadMask m) => IO ByteString -> DataSourceUpdates -> m Bool
readStream body dataSourceUpdates = loop "" False
  where
    -- @initial@ is unparsed input left over from the previous event;
    -- @processedEvent@ records whether any earlier event succeeded.
    loop initial processedEvent =
        tryReadE (parseWith (liftIO $ readWithException body) parseEvent initial) >>= \case
            -- Each message now describes its own constructor; the two
            -- texts were previously swapped.
            Left ReadETimeout -> do
                $(logError) "timeout waiting for SSE event"
                pure processedEvent
            Left ReadEClosed -> do
                $(logError) "streaming connection unexpectedly closed"
                pure processedEvent
            Right (Done remaining event) -> do
                processed <- processEvent dataSourceUpdates (name event) (L.fromStrict $ encodeUtf8 $ buffer event)
                if processed then loop remaining True else pure processedEvent
            Right (Fail _ context err) -> do
                $(logError) $ T.intercalate " " ["failed parsing SSE frame", T.pack $ show context, T.pack err]
                pure processedEvent
            Right (Partial _) -> do
                $(logError) "failed parsing SSE frame unexpected partial"
                pure processedEvent
startNewConnection :: (MonadIO m, MonadLogger m, MonadMask m) => Manager -> Request -> DataSourceUpdates -> StreamingState -> m StreamingState
startNewConnection :: forall (m :: * -> *).
(MonadIO m, MonadLogger m, MonadMask m) =>
Manager
-> Request
-> DataSourceUpdates
-> StreamingState
-> m StreamingState
startNewConnection Manager
manager Request
request DataSourceUpdates
dataSourceUpdates state :: StreamingState
state@(StreamingState {Maybe TimeSpec
activeSince :: Maybe TimeSpec
$sel:activeSince:StreamingState :: StreamingState -> Maybe TimeSpec
activeSince}) = do
$(logDebug) Text
"starting new streaming connection"
Either HttpException Bool
status <- m Bool -> m (Either HttpException Bool)
forall (m :: * -> *) a.
MonadCatch m =>
m a -> m (Either HttpException a)
tryHTTP (m Bool -> m (Either HttpException Bool))
-> m Bool -> m (Either HttpException Bool)
forall a b. (a -> b) -> a -> b
$ Request
-> Manager -> (Response (IO ByteString) -> m Bool) -> m Bool
forall (m :: * -> *) a.
(MonadIO m, MonadMask m) =>
Request -> Manager -> (Response (IO ByteString) -> m a) -> m a
withResponseGeneric Request
request Manager
manager ((Response (IO ByteString) -> m Bool) -> m Bool)
-> (Response (IO ByteString) -> m Bool) -> m Bool
forall a b. (a -> b) -> a -> b
$ \Response (IO ByteString)
response -> do
Response (IO ByteString) -> m ()
forall (m :: * -> *) body. MonadThrow m => Response body -> m ()
checkAuthorization Response (IO ByteString)
response
m () -> m () -> m ()
forall a b. m a -> m b -> m b
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> Request -> Response (IO ByteString) -> m ()
forall (m :: * -> *).
MonadIO m =>
Request -> Response (IO ByteString) -> m ()
throwIfNot200 Request
request Response (IO ByteString)
response
m () -> m Bool -> m Bool
forall a b. m a -> m b -> m b
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> IO ByteString -> DataSourceUpdates -> m Bool
forall (m :: * -> *).
(MonadIO m, MonadLogger m, MonadMask m) =>
IO ByteString -> DataSourceUpdates -> m Bool
readStream (Response (IO ByteString) -> IO ByteString
forall body. Response body -> body
responseBody Response (IO ByteString)
response) DataSourceUpdates
dataSourceUpdates
TimeSpec
now <- IO TimeSpec -> m TimeSpec
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO TimeSpec -> m TimeSpec) -> IO TimeSpec -> m TimeSpec
forall a b. (a -> b) -> a -> b
$ Clock -> IO TimeSpec
getTime Clock
Monotonic
StreamingState
-> TimeSpec -> Either HttpException Bool -> m StreamingState
forall (m :: * -> *).
(MonadIO m, MonadLogger m, MonadMask m) =>
StreamingState
-> TimeSpec -> Either HttpException Bool -> m StreamingState
handleResponse StreamingState
state TimeSpec
now Either HttpException Bool
status
where
-- | Fold the outcome of one connection attempt into the streaming state
-- and, unless the new state is cancelled, sleep for the backoff delay
-- before handing the state back to the caller.
--
-- The returned state is always the result of 'updateState'; the only
-- difference between the two branches is whether the thread sleeps first.
handleResponse :: (MonadIO m, MonadLogger m, MonadMask m) => StreamingState -> TimeSpec -> (Either HttpException Bool) -> m StreamingState
handleResponse state now result
    -- A cancelled state ends the retry loop, so there is nothing to wait for.
    | cancel nextState = pure nextState
    -- Otherwise wait out the computed delay (microseconds, as expected by
    -- 'threadDelay') and then yield the updated state.
    | otherwise = nextState <$ (calculateDelay nextState >>= liftIO . threadDelay)
  where
    nextState :: StreamingState
    nextState = updateState state now result
-- | Pure state transition applied after a connection attempt finishes.
--
-- * A 'Right' result clears @initialConnection@, records @now@ in
--   @activeSince@ and resets @attempt@ to 1.
-- * A 'StatusCodeException' whose status code satisfies
--   'isHttpUnrecoverable' sets @cancel@.
-- * Any other 'StatusCodeException' resets @attempt@ to 1 (and clears
--   @activeSince@) when at least 60 seconds have passed since the recorded
--   @activeSince@; otherwise it increments @attempt@.
-- * Every other failure increments @attempt@.
--
-- NOTE(review): @activeSince@ below is not taken from this function's own
-- @state@ argument; it is a @Maybe TimeSpec@ bound in the enclosing scope
-- (presumably the enclosing definition's pattern) -- confirm against caller.
updateState :: StreamingState -> TimeSpec -> (Either HttpException Bool) -> StreamingState
updateState state@(StreamingState {attempt = att}) now result =
    case result of
        Right _ ->
            state {initialConnection = False, activeSince = Just now, attempt = 1}
        Left (HttpExceptionRequest _ (StatusCodeException response _))
            | isHttpUnrecoverable (statusCode (responseStatus response)) -> state {cancel = True}
            | activeLongEnough -> state {attempt = 1, activeSince = Nothing}
            | otherwise -> nextAttempt
        Left _ ->
            nextAttempt
  where
    -- Same state with the attempt counter advanced by one.
    nextAttempt :: StreamingState
    nextAttempt = state {attempt = att + 1}

    -- True when an activity timestamp exists and is at least 60 seconds old.
    activeLongEnough :: Bool
    activeLongEnough = case activeSince of
        Just time -> now >= time + TimeSpec 60 0
        Nothing -> False
-- | Compute how long to wait before the next connection attempt.
--
-- The base delay is @initialRetryDelay * 1_000 * 2 ^ (attempt - 1)@, capped
-- at 30 seconds; a random jitter of up to half that value is then
-- subtracted. The result is what the caller passes to 'threadDelay', i.e.
-- it is expressed in microseconds.
--
-- The exponential term is evaluated in 'Integer' and clamped to
-- @[0, 30 s]@ before being narrowed back to 'Int'. Doing the multiplication
-- directly in 'Int' wraps around once @attempt@ grows large enough, which
-- yields a zero or negative delay and therefore a reconnect loop with no
-- backoff at all after a long run of failures.
calculateDelay :: (MonadIO m, MonadLogger m, MonadMask m) => StreamingState -> m Int
calculateDelay StreamingState {initialRetryDelay, attempt = att} =
    liftIO $ do
        gen <- newStdGen
        let jitter = fst $ randomR (0, timespan `div` 2) gen
        pure (timespan - jitter)
  where
    -- Upper bound on the pre-jitter delay: 30 seconds in microseconds.
    ceilingMicros :: Integer
    ceilingMicros = 30 * 1_000_000

    -- Number of doublings to apply. Bounded below so a non-positive attempt
    -- count cannot raise a negative-exponent error, and above so the
    -- intermediate 'Integer' stays small; any positive delay already
    -- exceeds the 30 s ceiling long before 62 doublings.
    doublings :: Int
    doublings = min 62 (max 0 (att - 1))

    -- Capped, non-negative pre-jitter delay in microseconds.
    timespan :: Int
    timespan =
        fromInteger . max 0 . min ceilingMicros $
            toInteger initialRetryDelay * 1_000 * 2 ^ doublings
-- | Bookkeeping threaded through successive streaming connection attempts.
--
-- Fields are strict, matching the other records in this module: the value
-- is rebuilt on every iteration of the reconnect loop, so there is no
-- benefit in letting updates such as @attempt + 1@ linger as thunks.
data StreamingState = StreamingState
    { initialConnection :: !Bool
    -- ^ 'True' in the state the loop starts from; set to 'False' once a
    -- connection attempt completes with a 'Right' result.
    , initialRetryDelay :: !Int
    -- ^ Base retry delay. It is multiplied by 1,000 when the backoff delay
    -- (in microseconds) is computed, i.e. it is treated as milliseconds.
    , activeSince :: !(Maybe TimeSpec)
    -- ^ Monotonic timestamp recorded after a connection attempt completes
    -- with a 'Right' result; cleared again when the attempt counter is
    -- reset after a recoverable HTTP status error.
    , attempt :: !Int
    -- ^ Attempt counter driving the exponential backoff. Starts at 0, is
    -- reset to 1 after a 'Right' result and incremented after failures.
    , cancel :: !Bool
    -- ^ When 'True', the reconnect loop stops. Set after an HTTP status
    -- error classified as unrecoverable.
    }
    deriving (Generic, Show)
-- | Entry point of the streaming data source.
--
-- Builds a request for @streamURI@ with @"/all"@ appended, then keeps
-- calling 'startNewConnection' with the evolving 'StreamingState' until the
-- state's @cancel@ flag is set. The whole loop runs inside
-- 'handleUnauthorized', which is given the 'DataSourceUpdates' handle.
--
-- @initialRetryDelay@ is stored in the initial state and used as the base
-- of the reconnect backoff.
streamingThread :: (MonadIO m, MonadLogger m, MonadMask m) => Text -> Int -> ClientContext -> DataSourceUpdates -> m ()
streamingThread streamURI initialRetryDelay clientContext dataSourceUpdates = do
    request <- prepareRequest httpConfig (T.unpack streamURI ++ "/all")
    handleUnauthorized dataSourceUpdates (reconnectLoop request)
  where
    -- HTTP settings shared by request preparation and the connection manager.
    httpConfig = httpConfiguration clientContext

    -- Re-run 'startNewConnection' until the state reports cancellation; the
    -- final state itself is of no interest to the caller.
    reconnectLoop :: (MonadIO m, MonadLogger m, MonadMask m) => Request -> m ()
    reconnectLoop request =
        void $
            iterateUntilM
                cancel
                (startNewConnection (tlsManager httpConfig) request dataSourceUpdates)
                freshState

    -- State before the very first connection attempt.
    freshState =
        StreamingState
            { initialConnection = True
            , initialRetryDelay
            , activeSince = Nothing
            , attempt = 0
            , cancel = False
            }