Safe Haskell | None |
---|---|
Language | Haskell2010 |
Network.Flink.Internal.Stateful
Synopsis
- class MonadIO m => StatefulFunc s m | m -> s where
- insideCtx :: (s -> a) -> m a
- getCtx :: m s
- setCtx :: s -> m ()
- modifyCtx :: (s -> s) -> m ()
- sendEgressMsg :: Message a => (Text, Text) -> a -> m ()
- sendMsg :: Serde a => Address -> a -> m ()
- sendMsgDelay :: Serde a => Address -> NominalDiffTime -> a -> m ()
- flinkWrapper :: forall a s. (Serde a, Serde s) => s -> Expiration -> (a -> Function s ()) -> FuncExec
- createApp :: FunctionTable -> Application
- flinkServer :: FunctionTable -> Server FlinkApi
- flinkApi :: Proxy FlinkApi
- data Address where
- data FuncType = FuncType Text Text
- newtype Function s a = Function {
- runFunction :: ExceptT FlinkError (StateT (FunctionState s) (ReaderT Env IO)) a
- class Serde a where
- tpName :: Proxy a -> Text
- deserializeBytes :: ByteString -> Either String a
- serializeBytes :: a -> ByteString
- data FunctionState ctx = FunctionState {}
- data FlinkError
- type FunctionTable = Map FuncType FuncExec
- data Env = Env {}
- data Expiration = Expiration {}
- data ExpirationMode
- newState :: a -> FunctionState a
- newtype ProtoSerde a = ProtoSerde {
- getProto :: a
- newtype JsonSerde a = JsonSerde {
- getJson :: a
- jsonState :: Json s => Function s () -> Function (JsonSerde s) ()
- protoState :: Message s => Function s () -> Function (ProtoSerde s) ()
- sendProtoMsg :: (StatefulFunc s m, Message a) => Address -> a -> m ()
- sendProtoMsgDelay :: (StatefulFunc s m, Message a) => Address -> NominalDiffTime -> a -> m ()
Documentation
class MonadIO m => StatefulFunc s m | m -> s where Source #
Used to represent all Flink stateful function capabilities.
Contexts are received from Flink and deserializeBytesd into s
all modifications to state are shipped back to Flink at the end of the
batch to be persisted.
Message passing is also queued up and passed back at the end of the current batch.
Minimal complete definition
setInitialCtx, insideCtx, getCtx, setCtx, modifyCtx, sendEgressMsg, sendMsg, sendMsgDelay
Methods
insideCtx :: (s -> a) -> m a Source #
modifyCtx :: (s -> s) -> m () Source #
Arguments
:: Message a | |
=> (Text, Text) | egress address (namespace, type) |
-> a | protobuf message to send (should be a Kafka or Kinesis protobuf record) |
-> m () |
Arguments
:: Serde a | |
=> Address | Function address (namespace, type, id) |
-> NominalDiffTime | delay before message send |
-> a | message to send |
-> m () | returns cancelation token with which delivery of the message could be canceled |
Instances
StatefulFunc s (Function s) Source # | |
Defined in Network.Flink.Internal.Stateful Methods setInitialCtx :: s -> Function s () insideCtx :: (s -> a) -> Function s a Source # getCtx :: Function s s Source # setCtx :: s -> Function s () Source # modifyCtx :: (s -> s) -> Function s () Source # sendEgressMsg :: Message a => (Text, Text) -> a -> Function s () Source # sendMsg :: Serde a => Address -> a -> Function s () Source # sendMsgDelay :: Serde a => Address -> NominalDiffTime -> a -> Function s () Source # |
flinkWrapper :: forall a s. (Serde a, Serde s) => s -> Expiration -> (a -> Function s ()) -> FuncExec Source #
Takes a function taking an arbitrary state type and converts it to take ByteString
s.
This allows each function in the FunctionTable
to take its own individual type of state and just expose
a function accepting ByteString
to the library code.
createApp :: FunctionTable -> Application Source #
Takes function table and creates a wai Application
to serve flink requests
flinkServer :: FunctionTable -> Server FlinkApi Source #
Takes function table and creates a servant Server
to serve flink requests
Monad stack used for the execution of a Flink stateful function Don't reference this directly in your code if possible
Constructors
Function | |
Fields
|
Instances
Methods
tpName :: Proxy a -> Text Source #
Type name
deserializeBytes :: ByteString -> Either String a Source #
decodes types from strict ByteString
s
serializeBytes :: a -> ByteString Source #
encodes types to strict ByteString
s
Instances
data FunctionState ctx Source #
Constructors
FunctionState | |
Instances
Functor FunctionState Source # | |
Defined in Network.Flink.Internal.Stateful Methods fmap :: (a -> b) -> FunctionState a -> FunctionState b # (<$) :: a -> FunctionState b -> FunctionState a # | |
Show ctx => Show (FunctionState ctx) Source # | |
Defined in Network.Flink.Internal.Stateful Methods showsPrec :: Int -> FunctionState ctx -> ShowS # show :: FunctionState ctx -> String # showList :: [FunctionState ctx] -> ShowS # | |
MonadState (FunctionState s) (Function s) Source # | |
Defined in Network.Flink.Internal.Stateful Methods get :: Function s (FunctionState s) # put :: FunctionState s -> Function s () # state :: (FunctionState s -> (a, FunctionState s)) -> Function s a # |
data FlinkError Source #
Constructors
Instances
Eq FlinkError Source # | |
Defined in Network.Flink.Internal.Stateful | |
Show FlinkError Source # | |
Defined in Network.Flink.Internal.Stateful Methods showsPrec :: Int -> FlinkError -> ShowS # show :: FlinkError -> String # showList :: [FlinkError] -> ShowS # | |
MonadError FlinkError (Function s) Source # | |
Defined in Network.Flink.Internal.Stateful Methods throwError :: FlinkError -> Function s a # catchError :: Function s a -> (FlinkError -> Function s a) -> Function s a # |
type FunctionTable = Map FuncType FuncExec Source #
data Expiration Source #
Constructors
Expiration | |
Fields |
Instances
Eq Expiration Source # | |
Defined in Network.Flink.Internal.Stateful | |
Show Expiration Source # | |
Defined in Network.Flink.Internal.Stateful Methods showsPrec :: Int -> Expiration -> ShowS # show :: Expiration -> String # showList :: [Expiration] -> ShowS # |
data ExpirationMode Source #
Constructors
NONE | |
AFTER_WRITE | |
AFTER_CALL |
Instances
Eq ExpirationMode Source # | |
Defined in Network.Flink.Internal.Stateful Methods (==) :: ExpirationMode -> ExpirationMode -> Bool # (/=) :: ExpirationMode -> ExpirationMode -> Bool # | |
Show ExpirationMode Source # | |
Defined in Network.Flink.Internal.Stateful Methods showsPrec :: Int -> ExpirationMode -> ShowS # show :: ExpirationMode -> String # showList :: [ExpirationMode] -> ShowS # |
newState :: a -> FunctionState a Source #
newtype ProtoSerde a Source #
Constructors
ProtoSerde | |
Fields
|
Instances
Functor ProtoSerde Source # | |
Defined in Network.Flink.Internal.Stateful Methods fmap :: (a -> b) -> ProtoSerde a -> ProtoSerde b # (<$) :: a -> ProtoSerde b -> ProtoSerde a # | |
Message a => Serde (ProtoSerde a) Source # | |
Defined in Network.Flink.Internal.Stateful Methods tpName :: Proxy (ProtoSerde a) -> Text Source # deserializeBytes :: ByteString -> Either String (ProtoSerde a) Source # serializeBytes :: ProtoSerde a -> ByteString Source # |
jsonState :: Json s => Function s () -> Function (JsonSerde s) () Source #
Convenience function for wrapping state in newtype for JSON serialization
protoState :: Message s => Function s () -> Function (ProtoSerde s) () Source #
Convenience function for wrapping state in newtype for Protobuf serialization
sendProtoMsg :: (StatefulFunc s m, Message a) => Address -> a -> m () Source #
Convinience function to send protobuf messages
sendProtoMsgDelay :: (StatefulFunc s m, Message a) => Address -> NominalDiffTime -> a -> m () Source #
Convinience function to send delayed protobuf messages