module LaunchDarkly.Server.Network.Streaming (streamingThread) where
import Data.Text (Text)
import qualified Data.Text as T
import Data.Attoparsec.Text as P hiding (Result, try)
import Data.Function (fix)
import Control.Monad (void, mzero)
import Control.Monad.Catch (Exception, MonadCatch, try)
import Control.Exception (throwIO)
import Data.ByteString (ByteString)
import qualified Data.ByteString as B
import Control.Applicative (many)
import Data.Text.Encoding (decodeUtf8, encodeUtf8)
import Data.HashMap.Strict (HashMap)
import Network.HTTP.Client (Manager, Response(..), Request, HttpException(..), HttpExceptionContent(..), parseRequest, brRead, throwErrorStatusCodes)
import Control.Monad.Logger (MonadLogger, logInfo, logWarn, logError, logDebug)
import Control.Monad.IO.Class (MonadIO, liftIO)
import Data.Generics.Product (getField)
import Data.Aeson (FromJSON, parseJSON, withObject, eitherDecode, (.:), fromJSON, Result(..))
import qualified Data.ByteString.Lazy as L
import GHC.Natural (Natural)
import GHC.Generics (Generic)
import Control.Monad.Catch (MonadMask)
import Control.Retry (RetryPolicyM, RetryStatus, fullJitterBackoff, capDelay, retrying)
import Network.HTTP.Types.Status (ok200, Status(statusCode))
import System.Timeout (timeout)
import System.Clock (getTime, Clock(Monotonic), TimeSpec(TimeSpec))
import LaunchDarkly.Server.Client.Internal (ClientI)
import LaunchDarkly.Server.Store.Internal (StoreHandle, StoreResult, initializeStore, insertFlag, deleteFlag, deleteSegment, insertSegment)
import LaunchDarkly.Server.Features (Flag, Segment)
import LaunchDarkly.Server.Network.Common (tryAuthorized, checkAuthorization, prepareRequest, withResponseGeneric, tryHTTP)
data PutBody = PutBody
{ PutBody -> HashMap Text Flag
flags :: !(HashMap Text Flag)
, PutBody -> HashMap Text Segment
segments :: !(HashMap Text Segment)
} deriving ((forall x. PutBody -> Rep PutBody x)
-> (forall x. Rep PutBody x -> PutBody) -> Generic PutBody
forall x. Rep PutBody x -> PutBody
forall x. PutBody -> Rep PutBody x
forall a.
(forall x. a -> Rep a x) -> (forall x. Rep a x -> a) -> Generic a
$cto :: forall x. Rep PutBody x -> PutBody
$cfrom :: forall x. PutBody -> Rep PutBody x
Generic, Int -> PutBody -> ShowS
[PutBody] -> ShowS
PutBody -> String
(Int -> PutBody -> ShowS)
-> (PutBody -> String) -> ([PutBody] -> ShowS) -> Show PutBody
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
showList :: [PutBody] -> ShowS
$cshowList :: [PutBody] -> ShowS
show :: PutBody -> String
$cshow :: PutBody -> String
showsPrec :: Int -> PutBody -> ShowS
$cshowsPrec :: Int -> PutBody -> ShowS
Show, Value -> Parser [PutBody]
Value -> Parser PutBody
(Value -> Parser PutBody)
-> (Value -> Parser [PutBody]) -> FromJSON PutBody
forall a.
(Value -> Parser a) -> (Value -> Parser [a]) -> FromJSON a
parseJSONList :: Value -> Parser [PutBody]
$cparseJSONList :: Value -> Parser [PutBody]
parseJSON :: Value -> Parser PutBody
$cparseJSON :: Value -> Parser PutBody
FromJSON)
data PathData d = PathData
{ PathData d -> Text
path :: !Text
, PathData d -> d
pathData :: !d
} deriving ((forall x. PathData d -> Rep (PathData d) x)
-> (forall x. Rep (PathData d) x -> PathData d)
-> Generic (PathData d)
forall x. Rep (PathData d) x -> PathData d
forall x. PathData d -> Rep (PathData d) x
forall a.
(forall x. a -> Rep a x) -> (forall x. Rep a x -> a) -> Generic a
forall d x. Rep (PathData d) x -> PathData d
forall d x. PathData d -> Rep (PathData d) x
$cto :: forall d x. Rep (PathData d) x -> PathData d
$cfrom :: forall d x. PathData d -> Rep (PathData d) x
Generic, Int -> PathData d -> ShowS
[PathData d] -> ShowS
PathData d -> String
(Int -> PathData d -> ShowS)
-> (PathData d -> String)
-> ([PathData d] -> ShowS)
-> Show (PathData d)
forall d. Show d => Int -> PathData d -> ShowS
forall d. Show d => [PathData d] -> ShowS
forall d. Show d => PathData d -> String
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
showList :: [PathData d] -> ShowS
$cshowList :: forall d. Show d => [PathData d] -> ShowS
show :: PathData d -> String
$cshow :: forall d. Show d => PathData d -> String
showsPrec :: Int -> PathData d -> ShowS
$cshowsPrec :: forall d. Show d => Int -> PathData d -> ShowS
Show)
data PathVersion = PathVersion
{ PathVersion -> Text
path :: !Text
, PathVersion -> Natural
version :: !Natural
} deriving ((forall x. PathVersion -> Rep PathVersion x)
-> (forall x. Rep PathVersion x -> PathVersion)
-> Generic PathVersion
forall x. Rep PathVersion x -> PathVersion
forall x. PathVersion -> Rep PathVersion x
forall a.
(forall x. a -> Rep a x) -> (forall x. Rep a x -> a) -> Generic a
$cto :: forall x. Rep PathVersion x -> PathVersion
$cfrom :: forall x. PathVersion -> Rep PathVersion x
Generic, Int -> PathVersion -> ShowS
[PathVersion] -> ShowS
PathVersion -> String
(Int -> PathVersion -> ShowS)
-> (PathVersion -> String)
-> ([PathVersion] -> ShowS)
-> Show PathVersion
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
showList :: [PathVersion] -> ShowS
$cshowList :: [PathVersion] -> ShowS
show :: PathVersion -> String
$cshow :: PathVersion -> String
showsPrec :: Int -> PathVersion -> ShowS
$cshowsPrec :: Int -> PathVersion -> ShowS
Show, Value -> Parser [PathVersion]
Value -> Parser PathVersion
(Value -> Parser PathVersion)
-> (Value -> Parser [PathVersion]) -> FromJSON PathVersion
forall a.
(Value -> Parser a) -> (Value -> Parser [a]) -> FromJSON a
parseJSONList :: Value -> Parser [PathVersion]
$cparseJSONList :: Value -> Parser [PathVersion]
parseJSON :: Value -> Parser PathVersion
$cparseJSON :: Value -> Parser PathVersion
FromJSON)
instance FromJSON a => FromJSON (PathData a) where
parseJSON :: Value -> Parser (PathData a)
parseJSON = String
-> (Object -> Parser (PathData a)) -> Value -> Parser (PathData a)
forall a. String -> (Object -> Parser a) -> Value -> Parser a
withObject String
"Put" ((Object -> Parser (PathData a)) -> Value -> Parser (PathData a))
-> (Object -> Parser (PathData a)) -> Value -> Parser (PathData a)
forall a b. (a -> b) -> a -> b
$ \Object
o -> do
a
pathData <- Object
o Object -> Text -> Parser a
forall a. FromJSON a => Object -> Text -> Parser a
.: Text
"data"
Text
path <- Object
o Object -> Text -> Parser Text
forall a. FromJSON a => Object -> Text -> Parser a
.: Text
"path"
PathData a -> Parser (PathData a)
forall (f :: * -> *) a. Applicative f => a -> f a
pure (PathData a -> Parser (PathData a))
-> PathData a -> Parser (PathData a)
forall a b. (a -> b) -> a -> b
$ PathData :: forall d. Text -> d -> PathData d
PathData { $sel:path:PathData :: Text
path = Text
path, $sel:pathData:PathData :: a
pathData = a
pathData }
data SSE = SSE
{ SSE -> Text
name :: !Text
, SSE -> Text
buffer :: !Text
, SSE -> Maybe Text
lastEventId :: !(Maybe Text)
, SSE -> Maybe Text
retry :: !(Maybe Text)
} deriving ((forall x. SSE -> Rep SSE x)
-> (forall x. Rep SSE x -> SSE) -> Generic SSE
forall x. Rep SSE x -> SSE
forall x. SSE -> Rep SSE x
forall a.
(forall x. a -> Rep a x) -> (forall x. Rep a x -> a) -> Generic a
$cto :: forall x. Rep SSE x -> SSE
$cfrom :: forall x. SSE -> Rep SSE x
Generic, Int -> SSE -> ShowS
[SSE] -> ShowS
SSE -> String
(Int -> SSE -> ShowS)
-> (SSE -> String) -> ([SSE] -> ShowS) -> Show SSE
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
showList :: [SSE] -> ShowS
$cshowList :: [SSE] -> ShowS
show :: SSE -> String
$cshow :: SSE -> String
showsPrec :: Int -> SSE -> ShowS
$cshowsPrec :: Int -> SSE -> ShowS
Show, SSE -> SSE -> Bool
(SSE -> SSE -> Bool) -> (SSE -> SSE -> Bool) -> Eq SSE
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
/= :: SSE -> SSE -> Bool
$c/= :: SSE -> SSE -> Bool
== :: SSE -> SSE -> Bool
$c== :: SSE -> SSE -> Bool
Eq)
nameCharPredicate :: Char -> Bool
nameCharPredicate :: Char -> Bool
nameCharPredicate Char
x = Char
x Char -> Char -> Bool
forall a. Eq a => a -> a -> Bool
/= Char
'\r' Bool -> Bool -> Bool
&& Char
x Char -> Char -> Bool
forall a. Eq a => a -> a -> Bool
/= Char
':' Bool -> Bool -> Bool
&& Char
x Char -> Char -> Bool
forall a. Eq a => a -> a -> Bool
/= Char
'\n'
anyCharPredicate :: Char -> Bool
anyCharPredicate :: Char -> Bool
anyCharPredicate Char
x = Char
x Char -> Char -> Bool
forall a. Eq a => a -> a -> Bool
/= Char
'\r' Bool -> Bool -> Bool
&& Char
x Char -> Char -> Bool
forall a. Eq a => a -> a -> Bool
/= Char
'\n'
endOfLineSSE :: Parser ()
endOfLineSSE :: Parser ()
endOfLineSSE = [Parser ()] -> Parser ()
forall (f :: * -> *) a. Alternative f => [f a] -> f a
choice [Parser Text Text -> Parser ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (Parser Text Text -> Parser ()) -> Parser Text Text -> Parser ()
forall a b. (a -> b) -> a -> b
$ Text -> Parser Text Text
string Text
"\r\n", Parser Text Text -> Parser ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (Parser Text Text -> Parser ()) -> Parser Text Text -> Parser ()
forall a b. (a -> b) -> a -> b
$ Text -> Parser Text Text
string Text
"\r", Parser Text Text -> Parser ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (Parser Text Text -> Parser ()) -> Parser Text Text -> Parser ()
forall a b. (a -> b) -> a -> b
$ Text -> Parser Text Text
string Text
"\n", Parser ()
forall t. Chunk t => Parser t ()
endOfInput]
comment :: Parser ()
= Char -> Parser Char
char Char
':' Parser Char -> Parser Text Text -> Parser Text Text
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> (Char -> Bool) -> Parser Text Text
P.takeWhile Char -> Bool
anyCharPredicate Parser Text Text -> Parser () -> Parser ()
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> Parser ()
endOfLineSSE Parser () -> Parser () -> Parser ()
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> () -> Parser ()
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()
parseField :: Parser (Text, Text)
parseField :: Parser (Text, Text)
parseField = do
Text
fieldName <- (Char -> Bool) -> Parser Text Text
P.takeWhile1 Char -> Bool
nameCharPredicate
Parser Char -> Parser ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (Parser Char -> Parser ()) -> Parser Char -> Parser ()
forall a b. (a -> b) -> a -> b
$ Char -> Parser Char -> Parser Char
forall (f :: * -> *) a. Alternative f => a -> f a -> f a
option Char
' ' (Parser Char -> Parser Char) -> Parser Char -> Parser Char
forall a b. (a -> b) -> a -> b
$ Char -> Parser Char
char Char
':'
Parser Char -> Parser ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (Parser Char -> Parser ()) -> Parser Char -> Parser ()
forall a b. (a -> b) -> a -> b
$ Char -> Parser Char -> Parser Char
forall (f :: * -> *) a. Alternative f => a -> f a -> f a
option Char
' ' (Parser Char -> Parser Char) -> Parser Char -> Parser Char
forall a b. (a -> b) -> a -> b
$ Char -> Parser Char
char Char
' '
Text
fieldValue <- (Char -> Bool) -> Parser Text Text
P.takeWhile Char -> Bool
anyCharPredicate
Parser ()
endOfLineSSE
(Text, Text) -> Parser (Text, Text)
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Text
fieldName, Text
fieldValue)
processField :: (Text, Text) -> SSE -> SSE
processField :: (Text, Text) -> SSE -> SSE
processField (Text
fieldName, Text
fieldValue) SSE
event = case Text
fieldName of
Text
"event" -> SSE
event { $sel:name:SSE :: Text
name = Text
fieldValue }
Text
"id" -> SSE
event { $sel:lastEventId:SSE :: Maybe Text
lastEventId = Text -> Maybe Text
forall a. a -> Maybe a
Just Text
fieldValue }
Text
"retry" -> SSE
event { $sel:retry:SSE :: Maybe Text
retry = Text -> Maybe Text
forall a. a -> Maybe a
Just Text
fieldValue }
Text
"data" -> SSE
event { $sel:buffer:SSE :: Text
buffer = [Text] -> Text
T.concat [ SSE -> Text
buffer SSE
event, if Text -> Bool
T.null (SSE -> Text
buffer SSE
event) then Text
"" else Text
"\n", Text
fieldValue] }
Text
_ -> SSE
event
parseEvent :: Parser SSE
parseEvent :: Parser SSE
parseEvent = do
[(Text, Text)]
fields <- Parser (Text, Text) -> Parser Text [(Text, Text)]
forall (f :: * -> *) a. Alternative f => f a -> f [a]
many (Parser () -> Parser Text [()]
forall (f :: * -> *) a. Alternative f => f a -> f [a]
many Parser ()
comment Parser Text [()] -> Parser (Text, Text) -> Parser (Text, Text)
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> Parser (Text, Text)
parseField Parser (Text, Text)
-> ((Text, Text) -> Parser (Text, Text)) -> Parser (Text, Text)
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= (Text, Text) -> Parser (Text, Text)
forall (f :: * -> *) a. Applicative f => a -> f a
pure)
Parser ()
endOfLineSSE
let event :: SSE
event = ((Text, Text) -> SSE -> SSE) -> SSE -> [(Text, Text)] -> SSE
forall (t :: * -> *) a b.
Foldable t =>
(a -> b -> b) -> b -> t a -> b
foldr (Text, Text) -> SSE -> SSE
processField (Text -> Text -> Maybe Text -> Maybe Text -> SSE
SSE Text
"" Text
"" Maybe Text
forall (m :: * -> *) a. MonadPlus m => m a
mzero Maybe Text
forall (m :: * -> *) a. MonadPlus m => m a
mzero) [(Text, Text)]
fields
if Text -> Bool
T.null (SSE -> Text
name SSE
event) Bool -> Bool -> Bool
|| Text -> Bool
T.null (SSE -> Text
buffer SSE
event) then Parser SSE
parseEvent else SSE -> Parser SSE
forall (f :: * -> *) a. Applicative f => a -> f a
pure SSE
event
processPut :: (MonadIO m, MonadLogger m) => StoreHandle IO -> L.ByteString -> m ()
processPut :: StoreHandle IO -> ByteString -> m ()
processPut StoreHandle IO
store ByteString
value = case ByteString -> Either String (PathData PutBody)
forall a. FromJSON a => ByteString -> Either String a
eitherDecode ByteString
value of
Right (PathData Text
_ (PutBody HashMap Text Flag
flags HashMap Text Segment
segments)) -> do
$(Int
String
LogLevel
String -> Text
String -> String -> String -> CharPos -> CharPos -> Loc
Text -> Text
Loc -> Text -> LogLevel -> Text -> m ()
(Text -> m ()) -> (Text -> Text) -> Text -> m ()
forall a. a -> a
forall b c a. (b -> c) -> (a -> b) -> a -> c
forall (m :: * -> *) msg.
(MonadLogger m, ToLogStr msg) =>
Loc -> Text -> LogLevel -> msg -> m ()
monadLoggerLog :: forall (m :: * -> *) msg.
(MonadLogger m, ToLogStr msg) =>
Loc -> Text -> LogLevel -> msg -> m ()
pack :: String -> Text
. :: forall b c a. (b -> c) -> (a -> b) -> a -> c
id :: forall a. a -> a
logInfo) Text
"initializing store with put"
IO (Either Text ()) -> m (Either Text ())
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (StoreHandle IO
-> HashMap Text Flag -> HashMap Text Segment -> IO (Either Text ())
forall store (m :: * -> *).
(LaunchDarklyStoreWrite store m, Monad m) =>
store
-> HashMap Text Flag -> HashMap Text Segment -> StoreResultM m ()
initializeStore StoreHandle IO
store HashMap Text Flag
flags HashMap Text Segment
segments) m (Either Text ()) -> (Either Text () -> m ()) -> m ()
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \case
Left Text
err -> $(Int
String
LogLevel
String -> Text
String -> String -> String -> CharPos -> CharPos -> Loc
Text -> Text
Loc -> Text -> LogLevel -> Text -> m ()
(Text -> m ()) -> (Text -> Text) -> Text -> m ()
forall a. a -> a
forall b c a. (b -> c) -> (a -> b) -> a -> c
forall (m :: * -> *) msg.
(MonadLogger m, ToLogStr msg) =>
Loc -> Text -> LogLevel -> msg -> m ()
monadLoggerLog :: forall (m :: * -> *) msg.
(MonadLogger m, ToLogStr msg) =>
Loc -> Text -> LogLevel -> msg -> m ()
pack :: String -> Text
. :: forall b c a. (b -> c) -> (a -> b) -> a -> c
id :: forall a. a -> a
logError) (Text -> m ()) -> Text -> m ()
forall a b. (a -> b) -> a -> b
$ Text -> Text -> Text
T.append Text
"store failed put: " Text
err
Either Text ()
_ -> () -> m ()
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()
Left String
err -> $(Int
String
LogLevel
String -> Text
String -> String -> String -> CharPos -> CharPos -> Loc
Text -> Text
Loc -> Text -> LogLevel -> Text -> m ()
(Text -> m ()) -> (Text -> Text) -> Text -> m ()
forall a. a -> a
forall b c a. (b -> c) -> (a -> b) -> a -> c
forall (m :: * -> *) msg.
(MonadLogger m, ToLogStr msg) =>
Loc -> Text -> LogLevel -> msg -> m ()
monadLoggerLog :: forall (m :: * -> *) msg.
(MonadLogger m, ToLogStr msg) =>
Loc -> Text -> LogLevel -> msg -> m ()
pack :: String -> Text
. :: forall b c a. (b -> c) -> (a -> b) -> a -> c
id :: forall a. a -> a
logError) (Text -> m ()) -> Text -> m ()
forall a b. (a -> b) -> a -> b
$ Text -> Text -> Text
T.append Text
"failed to parse put body" (String -> Text
T.pack String
err)
processPatch :: forall m. (MonadIO m, MonadLogger m) => StoreHandle IO -> L.ByteString -> m ()
processPatch :: StoreHandle IO -> ByteString -> m ()
processPatch StoreHandle IO
store ByteString
value = case ByteString -> Either String (PathData Value)
forall a. FromJSON a => ByteString -> Either String a
eitherDecode ByteString
value of
Right (PathData Text
path Value
body)
| Text -> Text -> Bool
T.isPrefixOf Text
"/flags/" Text
path -> Text
-> Text
-> (StoreHandle IO -> Flag -> IO (Either Text ()))
-> Result Flag
-> m ()
forall a.
Text
-> Text
-> (StoreHandle IO -> a -> IO (Either Text ()))
-> Result a
-> m ()
insPatch Text
"flag" Text
path StoreHandle IO -> Flag -> IO (Either Text ())
forall store (m :: * -> *).
(LaunchDarklyStoreWrite store m, Monad m) =>
store -> Flag -> StoreResultM m ()
insertFlag (Value -> Result Flag
forall a. FromJSON a => Value -> Result a
fromJSON Value
body)
| Text -> Text -> Bool
T.isPrefixOf Text
"/segments/" Text
path -> Text
-> Text
-> (StoreHandle IO -> Segment -> IO (Either Text ()))
-> Result Segment
-> m ()
forall a.
Text
-> Text
-> (StoreHandle IO -> a -> IO (Either Text ()))
-> Result a
-> m ()
insPatch Text
"segment" Text
path StoreHandle IO -> Segment -> IO (Either Text ())
forall store (m :: * -> *).
(LaunchDarklyStoreWrite store m, Monad m) =>
store -> Segment -> StoreResultM m ()
insertSegment (Value -> Result Segment
forall a. FromJSON a => Value -> Result a
fromJSON Value
body)
| Bool
otherwise -> $(Int
String
LogLevel
String -> Text
String -> String -> String -> CharPos -> CharPos -> Loc
Text -> Text
Loc -> Text -> LogLevel -> Text -> m ()
(Text -> m ()) -> (Text -> Text) -> Text -> m ()
forall a. a -> a
forall b c a. (b -> c) -> (a -> b) -> a -> c
forall (m :: * -> *) msg.
(MonadLogger m, ToLogStr msg) =>
Loc -> Text -> LogLevel -> msg -> m ()
monadLoggerLog :: forall (m :: * -> *) msg.
(MonadLogger m, ToLogStr msg) =>
Loc -> Text -> LogLevel -> msg -> m ()
pack :: String -> Text
. :: forall b c a. (b -> c) -> (a -> b) -> a -> c
id :: forall a. a -> a
logError) Text
"unknown patch path"
Left String
err -> $(Int
String
LogLevel
String -> Text
String -> String -> String -> CharPos -> CharPos -> Loc
Text -> Text
Loc -> Text -> LogLevel -> Text -> m ()
(Text -> m ()) -> (Text -> Text) -> Text -> m ()
forall a. a -> a
forall b c a. (b -> c) -> (a -> b) -> a -> c
forall (m :: * -> *) msg.
(MonadLogger m, ToLogStr msg) =>
Loc -> Text -> LogLevel -> msg -> m ()
monadLoggerLog :: forall (m :: * -> *) msg.
(MonadLogger m, ToLogStr msg) =>
Loc -> Text -> LogLevel -> msg -> m ()
pack :: String -> Text
. :: forall b c a. (b -> c) -> (a -> b) -> a -> c
id :: forall a. a -> a
logError) (Text -> m ()) -> Text -> m ()
forall a b. (a -> b) -> a -> b
$ Text -> Text -> Text
T.append Text
"failed to parse patch generic" (String -> Text
T.pack String
err)
where
insPatch :: Text -> Text -> (StoreHandle IO -> a -> StoreResult ()) -> Result a -> m ()
insPatch :: Text
-> Text
-> (StoreHandle IO -> a -> IO (Either Text ()))
-> Result a
-> m ()
insPatch Text
name Text
_ StoreHandle IO -> a -> IO (Either Text ())
_ (Error String
err) = $(Int
String
LogLevel
String -> Text
String -> String -> String -> CharPos -> CharPos -> Loc
Text -> Text
Loc -> Text -> LogLevel -> Text -> m ()
(Text -> m ()) -> (Text -> Text) -> Text -> m ()
forall a. a -> a
forall b c a. (b -> c) -> (a -> b) -> a -> c
forall (m :: * -> *) msg.
(MonadLogger m, ToLogStr msg) =>
Loc -> Text -> LogLevel -> msg -> m ()
monadLoggerLog :: forall (m :: * -> *) msg.
(MonadLogger m, ToLogStr msg) =>
Loc -> Text -> LogLevel -> msg -> m ()
pack :: String -> Text
. :: forall b c a. (b -> c) -> (a -> b) -> a -> c
id :: forall a. a -> a
logError) (Text -> m ()) -> Text -> m ()
forall a b. (a -> b) -> a -> b
$ [Text] -> Text
T.concat [Text
"failed to parse patch ", Text
name, Text
": ", String -> Text
T.pack String
err]
insPatch Text
name Text
path StoreHandle IO -> a -> IO (Either Text ())
insert (Success a
item) = do
$(Int
String
LogLevel
String -> Text
String -> String -> String -> CharPos -> CharPos -> Loc
Text -> Text
Loc -> Text -> LogLevel -> Text -> m ()
(Text -> m ()) -> (Text -> Text) -> Text -> m ()
forall a. a -> a
forall b c a. (b -> c) -> (a -> b) -> a -> c
forall (m :: * -> *) msg.
(MonadLogger m, ToLogStr msg) =>
Loc -> Text -> LogLevel -> msg -> m ()
monadLoggerLog :: forall (m :: * -> *) msg.
(MonadLogger m, ToLogStr msg) =>
Loc -> Text -> LogLevel -> msg -> m ()
pack :: String -> Text
. :: forall b c a. (b -> c) -> (a -> b) -> a -> c
id :: forall a. a -> a
logInfo) (Text -> m ()) -> Text -> m ()
forall a b. (a -> b) -> a -> b
$ [Text] -> Text
T.concat [Text
"patching ", Text
name, Text
" with path: ", Text
path]
Either Text ()
status <- IO (Either Text ()) -> m (Either Text ())
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (Either Text ()) -> m (Either Text ()))
-> IO (Either Text ()) -> m (Either Text ())
forall a b. (a -> b) -> a -> b
$ StoreHandle IO -> a -> IO (Either Text ())
insert StoreHandle IO
store a
item
(Text -> m ()) -> (() -> m ()) -> Either Text () -> m ()
forall a c b. (a -> c) -> (b -> c) -> Either a b -> c
either (\Text
err -> $(Int
String
LogLevel
String -> Text
String -> String -> String -> CharPos -> CharPos -> Loc
Text -> Text
Loc -> Text -> LogLevel -> Text -> m ()
(Text -> m ()) -> (Text -> Text) -> Text -> m ()
forall a. a -> a
forall b c a. (b -> c) -> (a -> b) -> a -> c
forall (m :: * -> *) msg.
(MonadLogger m, ToLogStr msg) =>
Loc -> Text -> LogLevel -> msg -> m ()
monadLoggerLog :: forall (m :: * -> *) msg.
(MonadLogger m, ToLogStr msg) =>
Loc -> Text -> LogLevel -> msg -> m ()
pack :: String -> Text
. :: forall b c a. (b -> c) -> (a -> b) -> a -> c
id :: forall a. a -> a
logError) (Text -> m ()) -> Text -> m ()
forall a b. (a -> b) -> a -> b
$ [Text] -> Text
T.concat [Text
"store failed ", Text
name, Text
" patch: ", Text
err]) () -> m ()
forall (f :: * -> *) a. Applicative f => a -> f a
pure Either Text ()
status
processDelete :: forall m. (MonadIO m, MonadLogger m) => StoreHandle IO -> L.ByteString -> m ()
processDelete :: StoreHandle IO -> ByteString -> m ()
processDelete StoreHandle IO
store ByteString
value = case ByteString -> Either String PathVersion
forall a. FromJSON a => ByteString -> Either String a
eitherDecode ByteString
value :: Either String PathVersion of
Right (PathVersion Text
path Natural
version)
| Text -> Text -> Bool
T.isPrefixOf Text
"/flags/" Text
path -> Text -> Text -> IO (Either Text ()) -> m ()
logDelete Text
"flag" Text
path (StoreHandle IO -> Text -> Natural -> IO (Either Text ())
forall store (m :: * -> *).
(LaunchDarklyStoreWrite store m, Monad m) =>
store -> Text -> Natural -> StoreResultM m ()
deleteFlag StoreHandle IO
store (Int -> Text -> Text
T.drop Int
7 Text
path) Natural
version)
| Text -> Text -> Bool
T.isPrefixOf Text
"/segments/" Text
path -> Text -> Text -> IO (Either Text ()) -> m ()
logDelete Text
"segment" Text
path (StoreHandle IO -> Text -> Natural -> IO (Either Text ())
forall store (m :: * -> *).
(LaunchDarklyStoreWrite store m, Monad m) =>
store -> Text -> Natural -> StoreResultM m ()
deleteSegment StoreHandle IO
store (Int -> Text -> Text
T.drop Int
10 Text
path) Natural
version)
| Bool
otherwise -> $(Int
String
LogLevel
String -> Text
String -> String -> String -> CharPos -> CharPos -> Loc
Text -> Text
Loc -> Text -> LogLevel -> Text -> m ()
(Text -> m ()) -> (Text -> Text) -> Text -> m ()
forall a. a -> a
forall b c a. (b -> c) -> (a -> b) -> a -> c
forall (m :: * -> *) msg.
(MonadLogger m, ToLogStr msg) =>
Loc -> Text -> LogLevel -> msg -> m ()
monadLoggerLog :: forall (m :: * -> *) msg.
(MonadLogger m, ToLogStr msg) =>
Loc -> Text -> LogLevel -> msg -> m ()
pack :: String -> Text
. :: forall b c a. (b -> c) -> (a -> b) -> a -> c
id :: forall a. a -> a
logError) Text
"unknown delete path"
Left String
err -> $(Int
String
LogLevel
String -> Text
String -> String -> String -> CharPos -> CharPos -> Loc
Text -> Text
Loc -> Text -> LogLevel -> Text -> m ()
(Text -> m ()) -> (Text -> Text) -> Text -> m ()
forall a. a -> a
forall b c a. (b -> c) -> (a -> b) -> a -> c
forall (m :: * -> *) msg.
(MonadLogger m, ToLogStr msg) =>
Loc -> Text -> LogLevel -> msg -> m ()
monadLoggerLog :: forall (m :: * -> *) msg.
(MonadLogger m, ToLogStr msg) =>
Loc -> Text -> LogLevel -> msg -> m ()
pack :: String -> Text
. :: forall b c a. (b -> c) -> (a -> b) -> a -> c
id :: forall a. a -> a
logError) (Text -> m ()) -> Text -> m ()
forall a b. (a -> b) -> a -> b
$ Text -> Text -> Text
T.append Text
"failed to parse delete" (String -> Text
T.pack String
err)
where logDelete :: Text -> Text -> StoreResult () -> m ()
logDelete :: Text -> Text -> IO (Either Text ()) -> m ()
logDelete Text
name Text
path IO (Either Text ())
action = do
$(Int
String
LogLevel
String -> Text
String -> String -> String -> CharPos -> CharPos -> Loc
Text -> Text
Loc -> Text -> LogLevel -> Text -> m ()
(Text -> m ()) -> (Text -> Text) -> Text -> m ()
forall a. a -> a
forall b c a. (b -> c) -> (a -> b) -> a -> c
forall (m :: * -> *) msg.
(MonadLogger m, ToLogStr msg) =>
Loc -> Text -> LogLevel -> msg -> m ()
monadLoggerLog :: forall (m :: * -> *) msg.
(MonadLogger m, ToLogStr msg) =>
Loc -> Text -> LogLevel -> msg -> m ()
pack :: String -> Text
. :: forall b c a. (b -> c) -> (a -> b) -> a -> c
id :: forall a. a -> a
logInfo) (Text -> m ()) -> Text -> m ()
forall a b. (a -> b) -> a -> b
$ [Text] -> Text
T.concat [Text
"deleting ", Text
name, Text
" with path: ", Text
path]
Either Text ()
status <- IO (Either Text ()) -> m (Either Text ())
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO IO (Either Text ())
action
(Text -> m ()) -> (() -> m ()) -> Either Text () -> m ()
forall a c b. (a -> c) -> (b -> c) -> Either a b -> c
either (\Text
err -> $(Int
String
LogLevel
String -> Text
String -> String -> String -> CharPos -> CharPos -> Loc
Text -> Text
Loc -> Text -> LogLevel -> Text -> m ()
(Text -> m ()) -> (Text -> Text) -> Text -> m ()
forall a. a -> a
forall b c a. (b -> c) -> (a -> b) -> a -> c
forall (m :: * -> *) msg.
(MonadLogger m, ToLogStr msg) =>
Loc -> Text -> LogLevel -> msg -> m ()
monadLoggerLog :: forall (m :: * -> *) msg.
(MonadLogger m, ToLogStr msg) =>
Loc -> Text -> LogLevel -> msg -> m ()
pack :: String -> Text
. :: forall b c a. (b -> c) -> (a -> b) -> a -> c
id :: forall a. a -> a
logError) (Text -> m ()) -> Text -> m ()
forall a b. (a -> b) -> a -> b
$ [Text] -> Text
T.concat [Text
"store failed ", Text
name, Text
" delete: ", Text
err]) () -> m ()
forall (f :: * -> *) a. Applicative f => a -> f a
pure Either Text ()
status
processEvent :: (MonadIO m, MonadLogger m) => StoreHandle IO -> Text -> L.ByteString -> m ()
processEvent :: StoreHandle IO -> Text -> ByteString -> m ()
processEvent StoreHandle IO
store Text
name ByteString
value = case Text
name of
Text
"put" -> StoreHandle IO -> ByteString -> m ()
forall (m :: * -> *).
(MonadIO m, MonadLogger m) =>
StoreHandle IO -> ByteString -> m ()
processPut StoreHandle IO
store ByteString
value
Text
"patch" -> StoreHandle IO -> ByteString -> m ()
forall (m :: * -> *).
(MonadIO m, MonadLogger m) =>
StoreHandle IO -> ByteString -> m ()
processPatch StoreHandle IO
store ByteString
value
Text
"delete" -> StoreHandle IO -> ByteString -> m ()
forall (m :: * -> *).
(MonadIO m, MonadLogger m) =>
StoreHandle IO -> ByteString -> m ()
processDelete StoreHandle IO
store ByteString
value
Text
_ -> $(Int
String
LogLevel
String -> Text
String -> String -> String -> CharPos -> CharPos -> Loc
Text -> Text
Loc -> Text -> LogLevel -> Text -> m ()
(Text -> m ()) -> (Text -> Text) -> Text -> m ()
forall a. a -> a
forall b c a. (b -> c) -> (a -> b) -> a -> c
forall (m :: * -> *) msg.
(MonadLogger m, ToLogStr msg) =>
Loc -> Text -> LogLevel -> msg -> m ()
monadLoggerLog :: forall (m :: * -> *) msg.
(MonadLogger m, ToLogStr msg) =>
Loc -> Text -> LogLevel -> msg -> m ()
pack :: String -> Text
. :: forall b c a. (b -> c) -> (a -> b) -> a -> c
id :: forall a. a -> a
logWarn) Text
"unknown event type"
data ReadE = ReadETimeout | ReadEClosed deriving (Int -> ReadE -> ShowS
[ReadE] -> ShowS
ReadE -> String
(Int -> ReadE -> ShowS)
-> (ReadE -> String) -> ([ReadE] -> ShowS) -> Show ReadE
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
showList :: [ReadE] -> ShowS
$cshowList :: [ReadE] -> ShowS
show :: ReadE -> String
$cshow :: ReadE -> String
showsPrec :: Int -> ReadE -> ShowS
$cshowsPrec :: Int -> ReadE -> ShowS
Show, Show ReadE
Typeable ReadE
Typeable ReadE
-> Show ReadE
-> (ReadE -> SomeException)
-> (SomeException -> Maybe ReadE)
-> (ReadE -> String)
-> Exception ReadE
SomeException -> Maybe ReadE
ReadE -> String
ReadE -> SomeException
forall e.
Typeable e
-> Show e
-> (e -> SomeException)
-> (SomeException -> Maybe e)
-> (e -> String)
-> Exception e
displayException :: ReadE -> String
$cdisplayException :: ReadE -> String
fromException :: SomeException -> Maybe ReadE
$cfromException :: SomeException -> Maybe ReadE
toException :: ReadE -> SomeException
$ctoException :: ReadE -> SomeException
$cp2Exception :: Show ReadE
$cp1Exception :: Typeable ReadE
Exception)
tryReadE :: MonadCatch m => m a -> m (Either ReadE a)
tryReadE :: m a -> m (Either ReadE a)
tryReadE = m a -> m (Either ReadE a)
forall (m :: * -> *) e a.
(MonadCatch m, Exception e) =>
m a -> m (Either e a)
try
readWithException :: IO ByteString -> IO Text
readWithException :: IO ByteString -> IO Text
readWithException IO ByteString
body = Int -> IO ByteString -> IO (Maybe ByteString)
forall a. Int -> IO a -> IO (Maybe a)
timeout (Int
1000000 Int -> Int -> Int
forall a. Num a => a -> a -> a
* Int
300) (IO ByteString -> IO ByteString
brRead IO ByteString
body) IO (Maybe ByteString) -> (Maybe ByteString -> IO Text) -> IO Text
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \case
Maybe ByteString
Nothing -> ReadE -> IO Text
forall e a. Exception e => e -> IO a
throwIO ReadE
ReadETimeout
Just ByteString
bytes -> if ByteString
bytes ByteString -> ByteString -> Bool
forall a. Eq a => a -> a -> Bool
== ByteString
B.empty then ReadE -> IO Text
forall e a. Exception e => e -> IO a
throwIO ReadE
ReadEClosed else Text -> IO Text
forall (f :: * -> *) a. Applicative f => a -> f a
pure (ByteString -> Text
decodeUtf8 ByteString
bytes)
readStream :: (MonadIO m, MonadLogger m, MonadMask m) => IO ByteString -> StoreHandle IO -> m ()
readStream :: IO ByteString -> StoreHandle IO -> m ()
readStream IO ByteString
body StoreHandle IO
store = Text -> m ()
loop Text
"" where
loop :: Text -> m ()
loop Text
initial = m (Result SSE) -> m (Either ReadE (Result SSE))
forall (m :: * -> *) a. MonadCatch m => m a -> m (Either ReadE a)
tryReadE (m Text -> Parser SSE -> Text -> m (Result SSE)
forall (m :: * -> *) a.
Monad m =>
m Text -> Parser a -> Text -> m (Result a)
parseWith (IO Text -> m Text
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO Text -> m Text) -> IO Text -> m Text
forall a b. (a -> b) -> a -> b
$ IO ByteString -> IO Text
readWithException IO ByteString
body) Parser SSE
parseEvent Text
initial) m (Either ReadE (Result SSE))
-> (Either ReadE (Result SSE) -> m ()) -> m ()
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \case
(Left ReadE
ReadETimeout) -> $(Int
String
LogLevel
String -> Text
String -> String -> String -> CharPos -> CharPos -> Loc
Text -> Text
Loc -> Text -> LogLevel -> Text -> m ()
(Text -> m ()) -> (Text -> Text) -> Text -> m ()
forall a. a -> a
forall b c a. (b -> c) -> (a -> b) -> a -> c
forall (m :: * -> *) msg.
(MonadLogger m, ToLogStr msg) =>
Loc -> Text -> LogLevel -> msg -> m ()
monadLoggerLog :: forall (m :: * -> *) msg.
(MonadLogger m, ToLogStr msg) =>
Loc -> Text -> LogLevel -> msg -> m ()
pack :: String -> Text
. :: forall b c a. (b -> c) -> (a -> b) -> a -> c
id :: forall a. a -> a
logError) Text
"streaming connection unexpectedly closed"
(Left ReadE
ReadEClosed) -> $(Int
String
LogLevel
String -> Text
String -> String -> String -> CharPos -> CharPos -> Loc
Text -> Text
Loc -> Text -> LogLevel -> Text -> m ()
(Text -> m ()) -> (Text -> Text) -> Text -> m ()
forall a. a -> a
forall b c a. (b -> c) -> (a -> b) -> a -> c
forall (m :: * -> *) msg.
(MonadLogger m, ToLogStr msg) =>
Loc -> Text -> LogLevel -> msg -> m ()
monadLoggerLog :: forall (m :: * -> *) msg.
(MonadLogger m, ToLogStr msg) =>
Loc -> Text -> LogLevel -> msg -> m ()
pack :: String -> Text
. :: forall b c a. (b -> c) -> (a -> b) -> a -> c
id :: forall a. a -> a
logError) Text
"timeout waiting for SSE event"
(Right Result SSE
parsed) -> case Result SSE
parsed of
Done Text
remaining SSE
event -> do
StoreHandle IO -> Text -> ByteString -> m ()
forall (m :: * -> *).
(MonadIO m, MonadLogger m) =>
StoreHandle IO -> Text -> ByteString -> m ()
processEvent StoreHandle IO
store (SSE -> Text
name SSE
event) (ByteString -> ByteString
L.fromStrict (ByteString -> ByteString) -> ByteString -> ByteString
forall a b. (a -> b) -> a -> b
$ Text -> ByteString
encodeUtf8 (Text -> ByteString) -> Text -> ByteString
forall a b. (a -> b) -> a -> b
$ SSE -> Text
buffer SSE
event)
Text -> m ()
loop Text
remaining
Fail Text
_ [String]
context String
err ->
$(Int
String
LogLevel
String -> Text
String -> String -> String -> CharPos -> CharPos -> Loc
Text -> Text
Loc -> Text -> LogLevel -> Text -> m ()
(Text -> m ()) -> (Text -> Text) -> Text -> m ()
forall a. a -> a
forall b c a. (b -> c) -> (a -> b) -> a -> c
forall (m :: * -> *) msg.
(MonadLogger m, ToLogStr msg) =>
Loc -> Text -> LogLevel -> msg -> m ()
monadLoggerLog :: forall (m :: * -> *) msg.
(MonadLogger m, ToLogStr msg) =>
Loc -> Text -> LogLevel -> msg -> m ()
pack :: String -> Text
. :: forall b c a. (b -> c) -> (a -> b) -> a -> c
id :: forall a. a -> a
logError) (Text -> m ()) -> Text -> m ()
forall a b. (a -> b) -> a -> b
$ Text -> [Text] -> Text
T.intercalate Text
" " [Text
"failed parsing SSE frame", String -> Text
T.pack (String -> Text) -> String -> Text
forall a b. (a -> b) -> a -> b
$ [String] -> String
forall a. Show a => a -> String
show [String]
context, String -> Text
T.pack String
err]
Partial Text -> Result SSE
_ -> $(Int
String
LogLevel
String -> Text
String -> String -> String -> CharPos -> CharPos -> Loc
Text -> Text
Loc -> Text -> LogLevel -> Text -> m ()
(Text -> m ()) -> (Text -> Text) -> Text -> m ()
forall a. a -> a
forall b c a. (b -> c) -> (a -> b) -> a -> c
forall (m :: * -> *) msg.
(MonadLogger m, ToLogStr msg) =>
Loc -> Text -> LogLevel -> msg -> m ()
monadLoggerLog :: forall (m :: * -> *) msg.
(MonadLogger m, ToLogStr msg) =>
Loc -> Text -> LogLevel -> msg -> m ()
pack :: String -> Text
. :: forall b c a. (b -> c) -> (a -> b) -> a -> c
id :: forall a. a -> a
logError) Text
"failed parsing SSE frame unexpected partial"
retryPolicy :: MonadIO m => RetryPolicyM m
retryPolicy :: RetryPolicyM m
retryPolicy = Int -> RetryPolicyM m -> RetryPolicyM m
forall (m :: * -> *).
Monad m =>
Int -> RetryPolicyM m -> RetryPolicyM m
capDelay (Int
30 Int -> Int -> Int
forall a. Num a => a -> a -> a
* Int
1000000) (RetryPolicyM m -> RetryPolicyM m)
-> RetryPolicyM m -> RetryPolicyM m
forall a b. (a -> b) -> a -> b
$ Int -> RetryPolicyM m
forall (m :: * -> *). MonadIO m => Int -> RetryPolicyM m
fullJitterBackoff (Int
1 Int -> Int -> Int
forall a. Num a => a -> a -> a
* Int
1000000)
data Failure = FailurePermanent | FailureTemporary | FailureReset deriving (Failure -> Failure -> Bool
(Failure -> Failure -> Bool)
-> (Failure -> Failure -> Bool) -> Eq Failure
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
/= :: Failure -> Failure -> Bool
$c/= :: Failure -> Failure -> Bool
== :: Failure -> Failure -> Bool
$c== :: Failure -> Failure -> Bool
Eq)
handleStream :: (MonadIO m, MonadLogger m, MonadMask m) => Manager -> Request -> StoreHandle IO -> RetryStatus -> m Failure
handleStream :: Manager -> Request -> StoreHandle IO -> RetryStatus -> m Failure
handleStream Manager
manager Request
request StoreHandle IO
store RetryStatus
_ = do
$(Int
String
LogLevel
String -> Text
String -> String -> String -> CharPos -> CharPos -> Loc
Text -> Text
Loc -> Text -> LogLevel -> Text -> m ()
(Text -> m ()) -> (Text -> Text) -> Text -> m ()
forall a. a -> a
forall b c a. (b -> c) -> (a -> b) -> a -> c
forall (m :: * -> *) msg.
(MonadLogger m, ToLogStr msg) =>
Loc -> Text -> LogLevel -> msg -> m ()
monadLoggerLog :: forall (m :: * -> *) msg.
(MonadLogger m, ToLogStr msg) =>
Loc -> Text -> LogLevel -> msg -> m ()
pack :: String -> Text
. :: forall b c a. (b -> c) -> (a -> b) -> a -> c
id :: forall a. a -> a
logDebug) Text
"starting new streaming connection"
TimeSpec
startTime <- IO TimeSpec -> m TimeSpec
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO TimeSpec -> m TimeSpec) -> IO TimeSpec -> m TimeSpec
forall a b. (a -> b) -> a -> b
$ Clock -> IO TimeSpec
getTime Clock
Monotonic
Either HttpException ()
status <- m () -> m (Either HttpException ())
forall (m :: * -> *) a.
MonadCatch m =>
m a -> m (Either HttpException a)
tryHTTP (m () -> m (Either HttpException ()))
-> m () -> m (Either HttpException ())
forall a b. (a -> b) -> a -> b
$ Request -> Manager -> (Response (IO ByteString) -> m ()) -> m ()
forall (m :: * -> *) a.
(MonadIO m, MonadMask m) =>
Request -> Manager -> (Response (IO ByteString) -> m a) -> m a
withResponseGeneric Request
request Manager
manager ((Response (IO ByteString) -> m ()) -> m ())
-> (Response (IO ByteString) -> m ()) -> m ()
forall a b. (a -> b) -> a -> b
$ \Response (IO ByteString)
response -> Response (IO ByteString) -> m ()
forall (m :: * -> *) body. MonadThrow m => Response body -> m ()
checkAuthorization Response (IO ByteString)
response m () -> m () -> m ()
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>>
if Response (IO ByteString) -> Status
forall body. Response body -> Status
responseStatus Response (IO ByteString)
response Status -> Status -> Bool
forall a. Eq a => a -> a -> Bool
/= Status
ok200 then Request -> Response (IO ByteString) -> m ()
forall (m :: * -> *).
MonadIO m =>
Request -> Response (IO ByteString) -> m ()
throwErrorStatusCodes Request
request Response (IO ByteString)
response else
IO ByteString -> StoreHandle IO -> m ()
forall (m :: * -> *).
(MonadIO m, MonadLogger m, MonadMask m) =>
IO ByteString -> StoreHandle IO -> m ()
readStream (Response (IO ByteString) -> IO ByteString
forall body. Response body -> body
responseBody Response (IO ByteString)
response) StoreHandle IO
store
case Either HttpException ()
status of
(Right ()) -> IO TimeSpec -> m TimeSpec
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (Clock -> IO TimeSpec
getTime Clock
Monotonic) m TimeSpec -> (TimeSpec -> m Failure) -> m Failure
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \TimeSpec
now -> if TimeSpec
now TimeSpec -> TimeSpec -> Bool
forall a. Ord a => a -> a -> Bool
>= TimeSpec
startTime TimeSpec -> TimeSpec -> TimeSpec
forall a. Num a => a -> a -> a
+ (Int64 -> Int64 -> TimeSpec
TimeSpec Int64
60 Int64
0)
then do
$(Int
String
LogLevel
String -> Text
String -> String -> String -> CharPos -> CharPos -> Loc
Text -> Text
Loc -> Text -> LogLevel -> Text -> m ()
(Text -> m ()) -> (Text -> Text) -> Text -> m ()
forall a. a -> a
forall b c a. (b -> c) -> (a -> b) -> a -> c
forall (m :: * -> *) msg.
(MonadLogger m, ToLogStr msg) =>
Loc -> Text -> LogLevel -> msg -> m ()
monadLoggerLog :: forall (m :: * -> *) msg.
(MonadLogger m, ToLogStr msg) =>
Loc -> Text -> LogLevel -> msg -> m ()
pack :: String -> Text
. :: forall b c a. (b -> c) -> (a -> b) -> a -> c
id :: forall a. a -> a
logError) Text
"streaming connection closed after 60 seconds, retrying instantly"
Failure -> m Failure
forall (f :: * -> *) a. Applicative f => a -> f a
pure Failure
FailureReset
else do
$(Int
String
LogLevel
String -> Text
String -> String -> String -> CharPos -> CharPos -> Loc
Text -> Text
Loc -> Text -> LogLevel -> Text -> m ()
(Text -> m ()) -> (Text -> Text) -> Text -> m ()
forall a. a -> a
forall b c a. (b -> c) -> (a -> b) -> a -> c
forall (m :: * -> *) msg.
(MonadLogger m, ToLogStr msg) =>
Loc -> Text -> LogLevel -> msg -> m ()
monadLoggerLog :: forall (m :: * -> *) msg.
(MonadLogger m, ToLogStr msg) =>
Loc -> Text -> LogLevel -> msg -> m ()
pack :: String -> Text
. :: forall b c a. (b -> c) -> (a -> b) -> a -> c
id :: forall a. a -> a
logError) Text
"streaming connection closed before 60 seconds, retrying after backoff"
Failure -> m Failure
forall (f :: * -> *) a. Applicative f => a -> f a
pure Failure
FailureTemporary
(Left HttpException
err) -> do
$(Int
String
LogLevel
String -> Text
String -> String -> String -> CharPos -> CharPos -> Loc
Text -> Text
Loc -> Text -> LogLevel -> Text -> m ()
(Text -> m ()) -> (Text -> Text) -> Text -> m ()
forall a. a -> a
forall b c a. (b -> c) -> (a -> b) -> a -> c
forall (m :: * -> *) msg.
(MonadLogger m, ToLogStr msg) =>
Loc -> Text -> LogLevel -> msg -> m ()
monadLoggerLog :: forall (m :: * -> *) msg.
(MonadLogger m, ToLogStr msg) =>
Loc -> Text -> LogLevel -> msg -> m ()
pack :: String -> Text
. :: forall b c a. (b -> c) -> (a -> b) -> a -> c
id :: forall a. a -> a
logError) (Text -> [Text] -> Text
T.intercalate Text
" " [Text
"HTTP Exception", String -> Text
T.pack (String -> Text) -> String -> Text
forall a b. (a -> b) -> a -> b
$ HttpException -> String
forall a. Show a => a -> String
show HttpException
err])
Failure -> m Failure
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Failure -> m Failure) -> Failure -> m Failure
forall a b. (a -> b) -> a -> b
$ case HttpException
err of
(InvalidUrlException String
_ String
_) -> Failure
FailurePermanent
(HttpExceptionRequest Request
_ (StatusCodeException Response ()
response ByteString
_)) ->
let code :: Int
code = Status -> Int
statusCode (Response () -> Status
forall body. Response body -> Status
responseStatus Response ()
response) in if Int
code Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
>= Int
400 Bool -> Bool -> Bool
&& Int
code Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
< Int
500
then if Int -> [Int] -> Bool
forall (t :: * -> *) a. (Foldable t, Eq a) => a -> t a -> Bool
elem Int
code [Int
400, Int
408, Int
429] then Failure
FailureTemporary else Failure
FailurePermanent
else Failure
FailureTemporary
HttpException
_ -> Failure
FailureTemporary
streamingThread :: (MonadIO m, MonadLogger m, MonadMask m) => Manager -> ClientI -> m ()
streamingThread :: Manager -> ClientI -> m ()
streamingThread Manager
manager ClientI
client = do
let config :: ConfigI
config = ClientI -> ConfigI
forall (f :: Symbol) a s. HasField' f s a => s -> a
getField @"config" ClientI
client; store :: StoreHandle IO
store = ClientI -> StoreHandle IO
forall (f :: Symbol) a s. HasField' f s a => s -> a
getField @"store" ClientI
client;
Request
req <- ConfigI -> Request -> Request
prepareRequest ConfigI
config (Request -> Request) -> m Request -> m Request
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> IO Request -> m Request
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (String -> IO Request
forall (m :: * -> *). MonadThrow m => String -> m Request
parseRequest (String -> IO Request) -> String -> IO Request
forall a b. (a -> b) -> a -> b
$ Text -> String
T.unpack (ConfigI -> Text
forall (f :: Symbol) a s. HasField' f s a => s -> a
getField @"streamURI" ConfigI
config) String -> ShowS
forall a. [a] -> [a] -> [a]
++ String
"/all")
ClientI -> m () -> m ()
forall (m :: * -> *) a.
(MonadIO m, MonadLogger m, MonadCatch m) =>
ClientI -> m a -> m ()
tryAuthorized ClientI
client (m () -> m ()) -> m () -> m ()
forall a b. (a -> b) -> a -> b
$ (m () -> m ()) -> m ()
forall a. (a -> a) -> a
fix ((m () -> m ()) -> m ()) -> (m () -> m ()) -> m ()
forall a b. (a -> b) -> a -> b
$ \m ()
loop ->
RetryPolicyM m
-> (RetryStatus -> Failure -> m Bool)
-> (RetryStatus -> m Failure)
-> m Failure
forall (m :: * -> *) b.
MonadIO m =>
RetryPolicyM m
-> (RetryStatus -> b -> m Bool) -> (RetryStatus -> m b) -> m b
retrying RetryPolicyM m
forall (m :: * -> *). MonadIO m => RetryPolicyM m
retryPolicy (\RetryStatus
_ Failure
status -> Bool -> m Bool
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Bool -> m Bool) -> Bool -> m Bool
forall a b. (a -> b) -> a -> b
$ Failure
status Failure -> Failure -> Bool
forall a. Eq a => a -> a -> Bool
== Failure
FailureTemporary) (Manager -> Request -> StoreHandle IO -> RetryStatus -> m Failure
forall (m :: * -> *).
(MonadIO m, MonadLogger m, MonadMask m) =>
Manager -> Request -> StoreHandle IO -> RetryStatus -> m Failure
handleStream Manager
manager Request
req StoreHandle IO
store) m Failure -> (Failure -> m ()) -> m ()
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>=
\case Failure
FailureReset -> m ()
loop; Failure
_ -> () -> m ()
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()