flink-statefulfun-0.1.0.0: Flink stateful functions SDK
Safe HaskellNone
LanguageHaskell2010

Network.Flink.Stateful

Synopsis

Documentation

class MonadIO m => StatefulFunc s m | m -> s where Source #

Used to represent all Flink stateful function capabilities.

Contexts are received from Flink and deserialized into s all modifications to state are shipped back to Flink at the end of the batch to be persisted.

Message passing is also queued up and passed back at the end of the current batch.

Example of a stateless function (done by setting s to ()) that adds one to a number and puts the protobuf response on Kafka via an egress message:

adder :: StatefulFunc () m => AdderRequest -> m ()
adder msg = sendEgressMsg ("adder", "added") (kafkaRecord "added" name added)
  where
    num = msg ^. AdderRequest.num
    added = defMessage & AdderResponse.num .~ (num + 1)

Example of a stateful function:

newtype GreeterState = GreeterState
  { greeterStateCount :: Int
  }
  deriving (Generic, Show, ToJSON, FromJSON)

instance FlinkState GreeterState where
  decodeState = eitherDecode . BSL.fromStrict
  encodeState = BSL.toStrict . Data.Aeson.encode

counter :: StatefulFunc GreeterState m => EX.GreeterRequest -> m ()
counter msg = do
  newCount <- (+ 1) <$> insideCtx greeterStateCount
  let respMsg = "Saw " <> T.unpack name <> " " <> show newCount <> " time(s)"

  sendEgressMsg ("greeting", "greets") (kafkaRecord "greets" name $ response (T.pack respMsg))
  modifyCtx (old -> old {greeterStateCount = newCount})
  where
    name = msg ^. EX.name
    response :: Text -> EX.GreeterResponse
    response greeting =
      defMessage
        & EX.greeting .~ greeting

This will respond to each event by counting how many times it has been called for the name it was passed. The final state is taken and sent back to Flink. Failures of any kind will cause state to rollback to previous values seamlessly without double counting.

Minimal complete definition

setInitialCtx, insideCtx, getCtx, setCtx, modifyCtx, sendMsg, sendMsgDelay, sendEgressMsg

Methods

insideCtx :: (s -> a) -> m a Source #

getCtx :: m s Source #

setCtx :: s -> m () Source #

modifyCtx :: (s -> s) -> m () Source #

sendMsg Source #

Arguments

:: Message a 
=> (Text, Text, Text)

Function address (namespace, type, id)

-> a

protobuf message to send

-> m () 

sendMsgDelay Source #

Arguments

:: Message a 
=> (Text, Text, Text)

Function address (namespace, type, id)

-> Int

delay before message send

-> a

protobuf message to send

-> m () 

sendEgressMsg Source #

Arguments

:: Message a 
=> (Text, Text)

egress address (namespace, type)

-> a

protobuf message to send (should be a Kafka or Kinesis protobuf record)

-> m () 

Instances

Instances details
FlinkState s => StatefulFunc s (Function s) Source # 
Instance details

Defined in Network.Flink.Stateful

Methods

setInitialCtx :: s -> Function s ()

insideCtx :: (s -> a) -> Function s a Source #

getCtx :: Function s s Source #

setCtx :: s -> Function s () Source #

modifyCtx :: (s -> s) -> Function s () Source #

sendMsg :: Message a => (Text, Text, Text) -> a -> Function s () Source #

sendMsgDelay :: Message a => (Text, Text, Text) -> Int -> a -> Function s () Source #

sendEgressMsg :: Message a => (Text, Text) -> a -> Function s () Source #

makeConcrete :: (FlinkState s, Message a) => (a -> Function s ()) -> ByteString -> Env -> ToFunction'InvocationBatchRequest -> IO (Either FlinkError (FunctionState ByteString)) Source #

Takes a function taking an abstract state/message type and converts it to take concrete ByteStrings This allows each function in the FunctionTable to take its own individual type of state and just expose a function accepting ByteString to the library code.

flinkApi :: Proxy FlinkApi Source #

kafkaRecord :: Message v => Text -> Text -> v -> KafkaProducerRecord Source #

Takes a topic, key, and protobuf value to construct KafkaProducerRecords for egress

data Function s a Source #

Monad stack used for the execution of a Flink stateful function Don't reference this directly in your code if possible

Instances

Instances details
FlinkState s => StatefulFunc s (Function s) Source # 
Instance details

Defined in Network.Flink.Stateful

Methods

setInitialCtx :: s -> Function s ()

insideCtx :: (s -> a) -> Function s a Source #

getCtx :: Function s s Source #

setCtx :: s -> Function s () Source #

modifyCtx :: (s -> s) -> Function s () Source #

sendMsg :: Message a => (Text, Text, Text) -> a -> Function s () Source #

sendMsgDelay :: Message a => (Text, Text, Text) -> Int -> a -> Function s () Source #

sendEgressMsg :: Message a => (Text, Text) -> a -> Function s () Source #

Monad (Function s) Source # 
Instance details

Defined in Network.Flink.Stateful

Methods

(>>=) :: Function s a -> (a -> Function s b) -> Function s b #

(>>) :: Function s a -> Function s b -> Function s b #

return :: a -> Function s a #

Functor (Function s) Source # 
Instance details

Defined in Network.Flink.Stateful

Methods

fmap :: (a -> b) -> Function s a -> Function s b #

(<$) :: a -> Function s b -> Function s a #

Applicative (Function s) Source # 
Instance details

Defined in Network.Flink.Stateful

Methods

pure :: a -> Function s a #

(<*>) :: Function s (a -> b) -> Function s a -> Function s b #

liftA2 :: (a -> b -> c) -> Function s a -> Function s b -> Function s c #

(*>) :: Function s a -> Function s b -> Function s b #

(<*) :: Function s a -> Function s b -> Function s a #

MonadIO (Function s) Source # 
Instance details

Defined in Network.Flink.Stateful

Methods

liftIO :: IO a -> Function s a #

class FlinkState s where Source #

Provides functions for Flink state SerDe

Methods

decodeState :: ByteString -> Either String s Source #

decodes Flink state types from strict ByteStrings

encodeState :: s -> ByteString Source #

encodes Flink state types to strict ByteStrings

Instances

Instances details
FlinkState () Source # 
Instance details

Defined in Network.Flink.Stateful