| Safe Haskell | None |
|---|---|
| Language | Haskell2010 |
Network.Flink.Stateful
Synopsis
- class MonadIO m => StatefulFunc s m | m -> s where
- makeConcrete :: (FlinkState s, Message a) => (a -> Function s ()) -> ByteString -> Env -> ToFunction'InvocationBatchRequest -> IO (Either FlinkError (FunctionState ByteString))
- createApp :: FunctionTable -> Application
- flinkServer :: FunctionTable -> Server FlinkApi
- flinkApi :: Proxy FlinkApi
- kafkaRecord :: Message v => Text -> Text -> v -> KafkaProducerRecord
- data Function s a
- class FlinkState s where
- decodeState :: ByteString -> Either String s
- encodeState :: s -> ByteString
- type FunctionTable = Map (Text, Text) (ByteString, ByteString -> Env -> ToFunction'InvocationBatchRequest -> IO (Either FlinkError (FunctionState ByteString)))
Documentation
class MonadIO m => StatefulFunc s m | m -> s where Source #
Used to represent all Flink stateful function capabilities.
Contexts are received from Flink and deserialized into s
all modifications to state are shipped back to Flink at the end of the
batch to be persisted.
Message passing is also queued up and passed back at the end of the current batch.
Example of a stateless function (done by setting s to ()) that adds one
to a number and puts the protobuf response on Kafka via an egress message:
adder :: StatefulFunc () m => AdderRequest -> m ()
adder msg = sendEgressMsg ("adder", "added") (kafkaRecord "added" name added)
where
num = msg ^. AdderRequest.num
added = defMessage & AdderResponse.num .~ (num + 1)
Example of a stateful function:
newtype GreeterState = GreeterState
{ greeterStateCount :: Int
}
deriving (Generic, Show, ToJSON, FromJSON)
instance FlinkState GreeterState where
decodeState = eitherDecode . BSL.fromStrict
encodeState = BSL.toStrict . Data.Aeson.encode
counter :: StatefulFunc GreeterState m => EX.GreeterRequest -> m ()
counter msg = do
newCount <- (+ 1) <$> insideCtx greeterStateCount
let respMsg = "Saw " <> T.unpack name <> " " <> show newCount <> " time(s)"
sendEgressMsg ("greeting", "greets") (kafkaRecord "greets" name $ response (T.pack respMsg))
modifyCtx (old -> old {greeterStateCount = newCount})
where
name = msg ^. EX.name
response :: Text -> EX.GreeterResponse
response greeting =
defMessage
& EX.greeting .~ greeting
This will respond to each event by counting how many times it has been called for the name it was passed. The final state is taken and sent back to Flink. Failures of any kind will cause state to rollback to previous values seamlessly without double counting.
Minimal complete definition
setInitialCtx, insideCtx, getCtx, setCtx, modifyCtx, sendMsg, sendMsgDelay, sendEgressMsg
Methods
insideCtx :: (s -> a) -> m a Source #
modifyCtx :: (s -> s) -> m () Source #
Arguments
| :: Message a | |
| => (Text, Text, Text) | Function address (namespace, type, id) |
| -> a | protobuf message to send |
| -> m () |
Instances
| FlinkState s => StatefulFunc s (Function s) Source # | |
Defined in Network.Flink.Stateful Methods setInitialCtx :: s -> Function s () insideCtx :: (s -> a) -> Function s a Source # getCtx :: Function s s Source # setCtx :: s -> Function s () Source # modifyCtx :: (s -> s) -> Function s () Source # sendMsg :: Message a => (Text, Text, Text) -> a -> Function s () Source # sendMsgDelay :: Message a => (Text, Text, Text) -> Int -> a -> Function s () Source # sendEgressMsg :: Message a => (Text, Text) -> a -> Function s () Source # | |
makeConcrete :: (FlinkState s, Message a) => (a -> Function s ()) -> ByteString -> Env -> ToFunction'InvocationBatchRequest -> IO (Either FlinkError (FunctionState ByteString)) Source #
Takes a function taking an abstract state/message type and converts it to take concrete ByteStrings
This allows each function in the FunctionTable to take its own individual type of state and just expose
a function accepting ByteString to the library code.
createApp :: FunctionTable -> Application Source #
Takes function table and creates a wai Application to serve flink requests
flinkServer :: FunctionTable -> Server FlinkApi Source #
Takes function table and creates a servant Server to serve flink requests
kafkaRecord :: Message v => Text -> Text -> v -> KafkaProducerRecord Source #
Takes a topic, key, and protobuf value to construct KafkaProducerRecords for egress
Monad stack used for the execution of a Flink stateful function Don't reference this directly in your code if possible
Instances
| FlinkState s => StatefulFunc s (Function s) Source # | |
Defined in Network.Flink.Stateful Methods setInitialCtx :: s -> Function s () insideCtx :: (s -> a) -> Function s a Source # getCtx :: Function s s Source # setCtx :: s -> Function s () Source # modifyCtx :: (s -> s) -> Function s () Source # sendMsg :: Message a => (Text, Text, Text) -> a -> Function s () Source # sendMsgDelay :: Message a => (Text, Text, Text) -> Int -> a -> Function s () Source # sendEgressMsg :: Message a => (Text, Text) -> a -> Function s () Source # | |
| Monad (Function s) Source # | |
| Functor (Function s) Source # | |
| Applicative (Function s) Source # | |
Defined in Network.Flink.Stateful | |
| MonadIO (Function s) Source # | |
Defined in Network.Flink.Stateful | |
class FlinkState s where Source #
Provides functions for Flink state SerDe
Methods
decodeState :: ByteString -> Either String s Source #
decodes Flink state types from strict ByteStrings
encodeState :: s -> ByteString Source #
encodes Flink state types to strict ByteStrings
Instances
| FlinkState () Source # | |
Defined in Network.Flink.Stateful Methods decodeState :: ByteString -> Either String () Source # encodeState :: () -> ByteString Source # | |
type FunctionTable = Map (Text, Text) (ByteString, ByteString -> Env -> ToFunction'InvocationBatchRequest -> IO (Either FlinkError (FunctionState ByteString))) Source #