| Safe Haskell | None |
|---|---|
| Language | Haskell2010 |
Network.Flink.Internal.Stateful
Synopsis
- class MonadIO m => StatefulFunc s m | m -> s where
- insideCtx :: (s -> a) -> m a
- getCtx :: m s
- setCtx :: s -> m ()
- modifyCtx :: (s -> s) -> m ()
- sendMsg :: Message a => (Text, Text, Text) -> a -> m ()
- sendMsgDelay :: Message a => (Text, Text, Text) -> Int -> a -> m ()
- sendEgressMsg :: Message a => (Text, Text) -> a -> m ()
- sendByteMsg :: Serde a => (Text, Text, Text) -> a -> m ()
- sendByteMsgDelay :: Serde a => (Text, Text, Text) -> Int -> a -> m ()
- flinkWrapper :: Serde s => (Any -> Function s ()) -> ByteString -> Env -> ToFunction'InvocationBatchRequest -> IO (Either FlinkError (FunctionState ByteString))
- createApp :: FunctionTable -> Application
- flinkServer :: FunctionTable -> Server FlinkApi
- flinkApi :: Proxy FlinkApi
- newtype Function s a = Function {
- runFunction :: ExceptT FlinkError (StateT (FunctionState s) (ReaderT Env IO)) a
- class Serde a where
- deserializeBytes :: ByteString -> Either String a
- serializeBytes :: a -> ByteString
- data FunctionState ctx = FunctionState {}
- data FlinkError
- type FunctionTable = Map (Text, Text) (ByteString, ByteString -> Env -> ToFunction'InvocationBatchRequest -> IO (Either FlinkError (FunctionState ByteString)))
- data Env = Env {}
- newState :: a -> FunctionState a
- newtype ProtoSerde a = ProtoSerde {
- getMessage :: a
- newtype JsonSerde a = JsonSerde {
- getJson :: a
- jsonState :: Json s => (a -> Function s ()) -> a -> Function (JsonSerde s) ()
- protoState :: Message s => (a -> Function s ()) -> a -> Function (ProtoSerde s) ()
- serdeInput :: (Serde s, Serde a, StatefulFunc s m, MonadError FlinkError m, MonadReader Env m) => (a -> m b) -> Any -> m b
- protoInput :: (Serde s, Message a, StatefulFunc s m, MonadError FlinkError m, MonadReader Env m) => (a -> m b) -> Any -> m b
- jsonInput :: (Serde s, Json a, StatefulFunc s m, MonadError FlinkError m, MonadReader Env m) => (a -> m b) -> Any -> m b
Documentation
class MonadIO m => StatefulFunc s m | m -> s where Source #
Used to represent all Flink stateful function capabilities.
Contexts are received from Flink and deserializeBytesd into s
all modifications to state are shipped back to Flink at the end of the
batch to be persisted.
Message passing is also queued up and passed back at the end of the current batch.
Minimal complete definition
setInitialCtx, insideCtx, getCtx, setCtx, modifyCtx, sendMsg, sendMsgDelay, sendEgressMsg, sendByteMsg, sendByteMsgDelay
Methods
insideCtx :: (s -> a) -> m a Source #
modifyCtx :: (s -> s) -> m () Source #
Arguments
| :: Message a | |
| => (Text, Text, Text) | Function address (namespace, type, id) |
| -> a | protobuf message to send |
| -> m () |
Arguments
| :: Message a | |
| => (Text, Text, Text) | Function address (namespace, type, id) |
| -> Int | delay before message send |
| -> a | protobuf message to send |
| -> m () |
Arguments
| :: Message a | |
| => (Text, Text) | egress address (namespace, type) |
| -> a | protobuf message to send (should be a Kafka or Kinesis protobuf record) |
| -> m () |
Instances
| StatefulFunc s (Function s) Source # | |
Defined in Network.Flink.Internal.Stateful Methods setInitialCtx :: s -> Function s () insideCtx :: (s -> a) -> Function s a Source # getCtx :: Function s s Source # setCtx :: s -> Function s () Source # modifyCtx :: (s -> s) -> Function s () Source # sendMsg :: Message a => (Text, Text, Text) -> a -> Function s () Source # sendMsgDelay :: Message a => (Text, Text, Text) -> Int -> a -> Function s () Source # sendEgressMsg :: Message a => (Text, Text) -> a -> Function s () Source # sendByteMsg :: Serde a => (Text, Text, Text) -> a -> Function s () Source # sendByteMsgDelay :: Serde a => (Text, Text, Text) -> Int -> a -> Function s () Source # | |
flinkWrapper :: Serde s => (Any -> Function s ()) -> ByteString -> Env -> ToFunction'InvocationBatchRequest -> IO (Either FlinkError (FunctionState ByteString)) Source #
Takes a function taking an arbitrary state type and converts it to take ByteStrings.
This allows each function in the FunctionTable to take its own individual type of state and just expose
a function accepting ByteString to the library code.
createApp :: FunctionTable -> Application Source #
Takes function table and creates a wai Application to serve flink requests
flinkServer :: FunctionTable -> Server FlinkApi Source #
Takes function table and creates a servant Server to serve flink requests
Monad stack used for the execution of a Flink stateful function Don't reference this directly in your code if possible
Constructors
| Function | |
Fields
| |
Instances
Methods
deserializeBytes :: ByteString -> Either String a Source #
decodes types from strict ByteStrings
serializeBytes :: a -> ByteString Source #
encodes types to strict ByteStrings
Instances
| Serde () Source # | |
Defined in Network.Flink.Internal.Stateful Methods deserializeBytes :: ByteString -> Either String () Source # serializeBytes :: () -> ByteString Source # | |
| Serde ByteString Source # | |
Defined in Network.Flink.Internal.Stateful Methods deserializeBytes :: ByteString -> Either String ByteString Source # | |
| Serde ByteString Source # | |
Defined in Network.Flink.Internal.Stateful Methods deserializeBytes :: ByteString0 -> Either String ByteString Source # | |
| Json a => Serde (JsonSerde a) Source # | |
Defined in Network.Flink.Internal.Stateful Methods deserializeBytes :: ByteString -> Either String (JsonSerde a) Source # serializeBytes :: JsonSerde a -> ByteString Source # | |
| Message a => Serde (ProtoSerde a) Source # | |
Defined in Network.Flink.Internal.Stateful Methods deserializeBytes :: ByteString -> Either String (ProtoSerde a) Source # serializeBytes :: ProtoSerde a -> ByteString Source # | |
data FunctionState ctx Source #
Constructors
| FunctionState | |
Instances
| Functor FunctionState Source # | |
Defined in Network.Flink.Internal.Stateful Methods fmap :: (a -> b) -> FunctionState a -> FunctionState b # (<$) :: a -> FunctionState b -> FunctionState a # | |
| Show ctx => Show (FunctionState ctx) Source # | |
Defined in Network.Flink.Internal.Stateful Methods showsPrec :: Int -> FunctionState ctx -> ShowS # show :: FunctionState ctx -> String # showList :: [FunctionState ctx] -> ShowS # | |
| MonadState (FunctionState s) (Function s) Source # | |
Defined in Network.Flink.Internal.Stateful Methods get :: Function s (FunctionState s) # put :: FunctionState s -> Function s () # state :: (FunctionState s -> (a, FunctionState s)) -> Function s a # | |
data FlinkError Source #
Constructors
| MissingInvocationBatch | |
| ProtodeserializeBytesError String | |
| StateDecodeError String | |
| MessageDecodeError String | |
| ProtoMessageDecodeError UnpackError | |
| NoSuchFunction (Text, Text) |
Instances
| Eq FlinkError Source # | |
Defined in Network.Flink.Internal.Stateful | |
| Show FlinkError Source # | |
Defined in Network.Flink.Internal.Stateful Methods showsPrec :: Int -> FlinkError -> ShowS # show :: FlinkError -> String # showList :: [FlinkError] -> ShowS # | |
| MonadError FlinkError (Function s) Source # | |
Defined in Network.Flink.Internal.Stateful Methods throwError :: FlinkError -> Function s a # catchError :: Function s a -> (FlinkError -> Function s a) -> Function s a # | |
type FunctionTable = Map (Text, Text) (ByteString, ByteString -> Env -> ToFunction'InvocationBatchRequest -> IO (Either FlinkError (FunctionState ByteString))) Source #
newState :: a -> FunctionState a Source #
newtype ProtoSerde a Source #
Constructors
| ProtoSerde | |
Fields
| |
Instances
| Functor ProtoSerde Source # | |
Defined in Network.Flink.Internal.Stateful Methods fmap :: (a -> b) -> ProtoSerde a -> ProtoSerde b # (<$) :: a -> ProtoSerde b -> ProtoSerde a # | |
| Message a => Serde (ProtoSerde a) Source # | |
Defined in Network.Flink.Internal.Stateful Methods deserializeBytes :: ByteString -> Either String (ProtoSerde a) Source # serializeBytes :: ProtoSerde a -> ByteString Source # | |
Instances
| Functor JsonSerde Source # | |
| Json a => Serde (JsonSerde a) Source # | |
Defined in Network.Flink.Internal.Stateful Methods deserializeBytes :: ByteString -> Either String (JsonSerde a) Source # serializeBytes :: JsonSerde a -> ByteString Source # | |
jsonState :: Json s => (a -> Function s ()) -> a -> Function (JsonSerde s) () Source #
Convenience function for wrapping state in newtype for JSON serialization
protoState :: Message s => (a -> Function s ()) -> a -> Function (ProtoSerde s) () Source #
Convenience function for wrapping state in newtype for Protobuf serialization
serdeInput :: (Serde s, Serde a, StatefulFunc s m, MonadError FlinkError m, MonadReader Env m) => (a -> m b) -> Any -> m b Source #
Deserializes input messages as arbitrary bytes by extracting them out of the protobuf Any and ignoring the type since that's protobuf specific
protoInput :: (Serde s, Message a, StatefulFunc s m, MonadError FlinkError m, MonadReader Env m) => (a -> m b) -> Any -> m b Source #
Deserializes input messages by unpacking the protobuf Any into the expected type.
If you are passing messages via protobuf, this is much more typesafe than serdeInput
jsonInput :: (Serde s, Json a, StatefulFunc s m, MonadError FlinkError m, MonadReader Env m) => (a -> m b) -> Any -> m b Source #
Deserializes input messages as arbitrary bytes by extracting them out of the protobuf Any and ignoring the type since that's protobuf specific