flink-statefulfun-0.3.0.0: Flink stateful functions SDK
Safe HaskellNone
LanguageHaskell2010

Network.Flink.Internal.Stateful

Synopsis

Documentation

class MonadIO m => StatefulFunc s m | m -> s where Source #

Used to represent all Flink stateful function capabilities.

Contexts are received from Flink and deserialized into s. All modifications to state are shipped back to Flink at the end of the batch to be persisted.

Message passing is also queued up and passed back at the end of the current batch.

Methods

insideCtx :: (s -> a) -> m a Source #

getCtx :: m s Source #

setCtx :: s -> m () Source #

modifyCtx :: (s -> s) -> m () Source #

sendMsg Source #

Arguments

:: Message a 
=> (Text, Text, Text)

Function address (namespace, type, id)

-> a

protobuf message to send

-> m () 

sendMsgDelay Source #

Arguments

:: Message a 
=> (Text, Text, Text)

Function address (namespace, type, id)

-> Int

delay before message send

-> a

protobuf message to send

-> m () 

sendEgressMsg Source #

Arguments

:: Message a 
=> (Text, Text)

egress address (namespace, type)

-> a

protobuf message to send (should be a Kafka or Kinesis protobuf record)

-> m () 

sendByteMsg Source #

Arguments

:: Serde a 
=> (Text, Text, Text)

Function address (namespace, type, id)

-> a

message to send

-> m () 

sendByteMsgDelay Source #

Arguments

:: Serde a 
=> (Text, Text, Text)

Function address (namespace, type, id)

-> Int

delay before message send

-> a

message to send

-> m () 

Instances

Instances details
StatefulFunc s (Function s) Source # 
Instance details

Defined in Network.Flink.Internal.Stateful

Methods

setInitialCtx :: s -> Function s ()

insideCtx :: (s -> a) -> Function s a Source #

getCtx :: Function s s Source #

setCtx :: s -> Function s () Source #

modifyCtx :: (s -> s) -> Function s () Source #

sendMsg :: Message a => (Text, Text, Text) -> a -> Function s () Source #

sendMsgDelay :: Message a => (Text, Text, Text) -> Int -> a -> Function s () Source #

sendEgressMsg :: Message a => (Text, Text) -> a -> Function s () Source #

sendByteMsg :: Serde a => (Text, Text, Text) -> a -> Function s () Source #

sendByteMsgDelay :: Serde a => (Text, Text, Text) -> Int -> a -> Function s () Source #

flinkWrapper :: Serde s => (Any -> Function s ()) -> ByteString -> Env -> ToFunction'InvocationBatchRequest -> IO (Either FlinkError (FunctionState ByteString)) Source #

Takes a function taking an arbitrary state type and converts it to take ByteStrings. This allows each function in the FunctionTable to take its own individual type of state and just expose a function accepting ByteString to the library code.

createApp :: FunctionTable -> Application Source #

Takes a function table and creates a wai Application to serve Flink requests.

flinkServer :: FunctionTable -> Server FlinkApi Source #

Takes a function table and creates a servant Server to serve Flink requests.

flinkApi :: Proxy FlinkApi Source #

newtype Function s a Source #

Monad stack used for the execution of a Flink stateful function. Don't reference this directly in your code if possible.

Instances

Instances details
MonadReader Env (Function s) Source # 
Instance details

Defined in Network.Flink.Internal.Stateful

Methods

ask :: Function s Env #

local :: (Env -> Env) -> Function s a -> Function s a #

reader :: (Env -> a) -> Function s a #

MonadError FlinkError (Function s) Source # 
Instance details

Defined in Network.Flink.Internal.Stateful

Methods

throwError :: FlinkError -> Function s a #

catchError :: Function s a -> (FlinkError -> Function s a) -> Function s a #

StatefulFunc s (Function s) Source # 
Instance details

Defined in Network.Flink.Internal.Stateful

Methods

setInitialCtx :: s -> Function s ()

insideCtx :: (s -> a) -> Function s a Source #

getCtx :: Function s s Source #

setCtx :: s -> Function s () Source #

modifyCtx :: (s -> s) -> Function s () Source #

sendMsg :: Message a => (Text, Text, Text) -> a -> Function s () Source #

sendMsgDelay :: Message a => (Text, Text, Text) -> Int -> a -> Function s () Source #

sendEgressMsg :: Message a => (Text, Text) -> a -> Function s () Source #

sendByteMsg :: Serde a => (Text, Text, Text) -> a -> Function s () Source #

sendByteMsgDelay :: Serde a => (Text, Text, Text) -> Int -> a -> Function s () Source #

Monad (Function s) Source # 
Instance details

Defined in Network.Flink.Internal.Stateful

Methods

(>>=) :: Function s a -> (a -> Function s b) -> Function s b #

(>>) :: Function s a -> Function s b -> Function s b #

return :: a -> Function s a #

Functor (Function s) Source # 
Instance details

Defined in Network.Flink.Internal.Stateful

Methods

fmap :: (a -> b) -> Function s a -> Function s b #

(<$) :: a -> Function s b -> Function s a #

Applicative (Function s) Source # 
Instance details

Defined in Network.Flink.Internal.Stateful

Methods

pure :: a -> Function s a #

(<*>) :: Function s (a -> b) -> Function s a -> Function s b #

liftA2 :: (a -> b -> c) -> Function s a -> Function s b -> Function s c #

(*>) :: Function s a -> Function s b -> Function s b #

(<*) :: Function s a -> Function s b -> Function s a #

MonadIO (Function s) Source # 
Instance details

Defined in Network.Flink.Internal.Stateful

Methods

liftIO :: IO a -> Function s a #

MonadState (FunctionState s) (Function s) Source # 
Instance details

Defined in Network.Flink.Internal.Stateful

Methods

get :: Function s (FunctionState s) #

put :: FunctionState s -> Function s () #

state :: (FunctionState s -> (a, FunctionState s)) -> Function s a #

data Env Source #

Instances

Instances details
Show Env Source # 
Instance details

Defined in Network.Flink.Internal.Stateful

Methods

showsPrec :: Int -> Env -> ShowS #

show :: Env -> String #

showList :: [Env] -> ShowS #

MonadReader Env (Function s) Source # 
Instance details

Defined in Network.Flink.Internal.Stateful

Methods

ask :: Function s Env #

local :: (Env -> Env) -> Function s a -> Function s a #

reader :: (Env -> a) -> Function s a #

newtype ProtoSerde a Source #

Constructors

ProtoSerde 

Fields

Instances

Instances details
Functor ProtoSerde Source # 
Instance details

Defined in Network.Flink.Internal.Stateful

Methods

fmap :: (a -> b) -> ProtoSerde a -> ProtoSerde b #

(<$) :: a -> ProtoSerde b -> ProtoSerde a #

Message a => Serde (ProtoSerde a) Source # 
Instance details

Defined in Network.Flink.Internal.Stateful

newtype JsonSerde a Source #

Constructors

JsonSerde 

Fields

Instances

Instances details
Functor JsonSerde Source # 
Instance details

Defined in Network.Flink.Internal.Stateful

Methods

fmap :: (a -> b) -> JsonSerde a -> JsonSerde b #

(<$) :: a -> JsonSerde b -> JsonSerde a #

Json a => Serde (JsonSerde a) Source # 
Instance details

Defined in Network.Flink.Internal.Stateful

jsonState :: Json s => (a -> Function s ()) -> a -> Function (JsonSerde s) () Source #

Convenience function for wrapping state in newtype for JSON serialization

protoState :: Message s => (a -> Function s ()) -> a -> Function (ProtoSerde s) () Source #

Convenience function for wrapping state in newtype for Protobuf serialization

serdeInput :: (Serde s, Serde a, StatefulFunc s m, MonadError FlinkError m, MonadReader Env m) => (a -> m b) -> Any -> m b Source #

Deserializes input messages as arbitrary bytes by extracting them out of the protobuf Any and ignoring its type, since the type is protobuf-specific.

protoInput :: (Serde s, Message a, StatefulFunc s m, MonadError FlinkError m, MonadReader Env m) => (a -> m b) -> Any -> m b Source #

Deserializes input messages by unpacking the protobuf Any into the expected type. If you are passing messages via protobuf, this is much more typesafe than serdeInput

jsonInput :: (Serde s, Json a, StatefulFunc s m, MonadError FlinkError m, MonadReader Env m) => (a -> m b) -> Any -> m b Source #

Deserializes input messages as JSON by extracting the raw bytes out of the protobuf Any and ignoring its type, since the type is protobuf-specific.