flink-statefulfun-0.4.0.0: Flink stateful functions SDK
Safe HaskellNone
LanguageHaskell2010

Network.Flink.Stateful

Description

Primary module containing everything needed to create stateful functions with Flink.

All stateful functions should have a single record type that represents the entire internal state of the function. Stateful functions API provides many "slots" to store state, but for the purposes of this library that is hardcoded to the single key flink_state which you can see in the example module.yaml.

The Serde typeclass abstracts serialization away from the library so that users can decide how state should be serialized. Aeson is very convenient so I use it in the example, but protobuf or any other binary format is also acceptable. Flink essentially stores function state as an opaque ByteString regardless.

When running your program don't forget to pass +RTS -N to your binary to run on all cores.

Synopsis

Documentation

class MonadIO m => StatefulFunc s m | m -> s where Source #

Used to represent all Flink stateful function capabilities.

Contexts are received from Flink and deserializeBytesd into s all modifications to state are shipped back to Flink at the end of the batch to be persisted.

Message passing is also queued up and passed back at the end of the current batch.

Minimal complete definition

setInitialCtx, insideCtx, getCtx, setCtx, modifyCtx, sendEgressMsg, sendMsg, sendMsgDelay

Methods

insideCtx :: (s -> a) -> m a Source #

getCtx :: m s Source #

setCtx :: s -> m () Source #

modifyCtx :: (s -> s) -> m () Source #

sendEgressMsg Source #

Arguments

:: Message a 
=> (Text, Text)

egress address (namespace, type)

-> a

protobuf message to send (should be a Kafka or Kinesis protobuf record)

-> m () 

sendMsg Source #

Arguments

:: Serde a 
=> Address

Function address (namespace, type, id)

-> a

message to send

-> m () 

sendMsgDelay Source #

Arguments

:: Serde a 
=> Address

Function address (namespace, type, id)

-> NominalDiffTime

delay before message send

-> a

message to send

-> m ()

returns cancelation token with which delivery of the message could be canceled

Instances

Instances details
StatefulFunc s (Function s) Source # 
Instance details

Defined in Network.Flink.Internal.Stateful

Methods

setInitialCtx :: s -> Function s ()

insideCtx :: (s -> a) -> Function s a Source #

getCtx :: Function s s Source #

setCtx :: s -> Function s () Source #

modifyCtx :: (s -> s) -> Function s () Source #

sendEgressMsg :: Message a => (Text, Text) -> a -> Function s () Source #

sendMsg :: Serde a => Address -> a -> Function s () Source #

sendMsgDelay :: Serde a => Address -> NominalDiffTime -> a -> Function s () Source #

flinkWrapper :: forall a s. (Serde a, Serde s) => s -> Expiration -> (a -> Function s ()) -> FuncExec Source #

Takes a function taking an arbitrary state type and converts it to take ByteStrings. This allows each function in the FunctionTable to take its own individual type of state and just expose a function accepting ByteString to the library code.

createApp :: FunctionTable -> Application Source #

Takes function table and creates a wai Application to serve flink requests

flinkServer :: FunctionTable -> Server FlinkApi Source #

Takes function table and creates a servant Server to serve flink requests

flinkApi :: Proxy FlinkApi Source #

data Address Source #

Constructors

Address FuncType Text 

Bundled Patterns

pattern Address' :: Text -> Text -> Text -> Address 

data FuncType Source #

Constructors

FuncType Text Text 

Instances

Instances details
Eq FuncType Source # 
Instance details

Defined in Network.Flink.Internal.Stateful

Ord FuncType Source # 
Instance details

Defined in Network.Flink.Internal.Stateful

class Serde a where Source #

Methods

tpName :: Proxy a -> Text Source #

Type name

deserializeBytes :: ByteString -> Either String a Source #

decodes types from strict ByteStrings

serializeBytes :: a -> ByteString Source #

encodes types to strict ByteStrings

newtype JsonSerde a Source #

Constructors

JsonSerde 

Fields

Instances

Instances details
Functor JsonSerde Source # 
Instance details

Defined in Network.Flink.Internal.Stateful

Methods

fmap :: (a -> b) -> JsonSerde a -> JsonSerde b #

(<$) :: a -> JsonSerde b -> JsonSerde a #

Json a => Serde (JsonSerde a) Source # 
Instance details

Defined in Network.Flink.Internal.Stateful

newtype ProtoSerde a Source #

Constructors

ProtoSerde 

Fields

Instances

Instances details
Functor ProtoSerde Source # 
Instance details

Defined in Network.Flink.Internal.Stateful

Methods

fmap :: (a -> b) -> ProtoSerde a -> ProtoSerde b #

(<$) :: a -> ProtoSerde b -> ProtoSerde a #

Message a => Serde (ProtoSerde a) Source # 
Instance details

Defined in Network.Flink.Internal.Stateful

jsonState :: Json s => Function s () -> Function (JsonSerde s) () Source #

Convenience function for wrapping state in newtype for JSON serialization

protoState :: Message s => Function s () -> Function (ProtoSerde s) () Source #

Convenience function for wrapping state in newtype for Protobuf serialization

sendProtoMsg :: (StatefulFunc s m, Message a) => Address -> a -> m () Source #

Convinience function to send protobuf messages

sendProtoMsgDelay :: (StatefulFunc s m, Message a) => Address -> NominalDiffTime -> a -> m () Source #

Convinience function to send delayed protobuf messages