flink-statefulfun: Flink stateful functions SDK


Typeclasses for serving Flink stateful functions from Haskell. Check out the README for examples.

Change log CHANGELOG.md
Dependencies aeson (>=1.0 && <2.0), base (>=4.0 && <4.15), bytestring (==0.10.*), containers (>=0.5 && <0.7), either (>=5 && <5.1), http-media (>=0.7 && <0.9), http-types (>=0.10 && <0.13), lens-family (>=2 && <2.2), mtl (>=2 && <2.3), proto-lens (>=0.5 && <0.8), proto-lens-protobuf-types (>=0.5 && <0.8), proto-lens-runtime (>=0.5 && <0.8), servant (>=0.16 && <0.19), servant-server (>=0.16 && <0.19), text (>=1.0 && <1.3), wai (>=3 && <3.3), warp (>=3 && <3.4) [details]
License MPL-2.0
Author Timothy Bess
Maintainer tdbgamer@gmail.com
Category Flink, Web
Source repo head: git clone https://github.com/tdbgamer/flink-statefulfun-hs.git
Uploaded by tdbgamer at 2020-09-30T05:57:47Z

Flink Stateful Functions Haskell SDK

Join the chat at https://gitter.im/tdbgamer/flink-statefulfun-hs

Provides a typed API for creating Flink stateful functions. The Greeter example is in example/greeter/main/Main.hs.

How to run example

cd example
docker-compose build
docker-compose up -d
docker-compose logs -f event-generator

How to compile locally

  1. Install nix
  2. Install cachix
  3. Set up the nix cache:
cachix use iohk
cachix use static-haskell-nix
cachix use flink-statefulfun
  4. Compile inside a nix shell:
cabal build


Define our protobuf messages

// Example.proto
syntax = "proto3";

package example;

message GreeterRequest {
  string name = 1;
}

message GreeterResponse {
  string greeting = 1;
}

Declare a function

import Network.Flink.Stateful
import qualified Proto.Example as EX
import qualified Proto.Example_Fields as EX

printer :: StatefulFunc () m => EX.GreeterResponse -> m ()
printer msg = liftIO $ print msg

This declares a simple function that takes a GreeterResponse protobuf message as its argument and prints it. The StatefulFunc constraint makes this a Flink stateful function with a state type of () (meaning it requires no state).

Declaring a function with state

import Data.Aeson (FromJSON, ToJSON)
import Data.ProtoLens (defMessage)
import Data.Text (Text)
import qualified Data.Text as T
import GHC.Generics
import Lens.Family2 ((&), (.~), (^.))

newtype GreeterState = GreeterState
  { greeterStateCount :: Int
  }
  deriving (Generic, Show)

instance ToJSON GreeterState
instance FromJSON GreeterState

counter :: StatefulFunc GreeterState m => EX.GreeterRequest -> m ()
counter msg = do
  -- Read the current count from the function's state and increment it.
  newCount <- (+ 1) <$> insideCtx greeterStateCount
  let respMsg = "Saw " <> T.unpack name <> " " <> show newCount <> " time(s)"

  -- Forward the greeting to the printer function, then store the new count.
  sendMsg ("printing", "printer") (response $ T.pack respMsg)
  modifyCtx (\old -> old {greeterStateCount = newCount})
  where
    name = msg ^. EX.name
    response :: Text -> EX.GreeterResponse
    response greeting =
      defMessage
        & EX.greeting .~ greeting

The StatefulFunc typeclass gives us access to the GreeterState that we are sending to and from Flink on every batch of incoming messages our function receives. For every message, this function will calculate its new count, send a message to the printer function we made earlier, then update its state with the new count.

Internally this is batched over many events before sending state back to Flink for efficiency.
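To illustrate the batching idea, here is a minimal sketch that uses only base and does not touch the SDK. The names step and runBatch are hypothetical, and this GreeterState is a local stand-in for the one above. A batch of messages is threaded through a single copy of the state, and only the final state would need to be written back.

```haskell
import Data.List (mapAccumL)

-- Local stand-in for the README's GreeterState (illustration only).
newtype GreeterState = GreeterState {greeterStateCount :: Int}
  deriving (Eq, Show)

-- Handle one message: bump the count and build the greeting to emit.
step :: GreeterState -> String -> (GreeterState, String)
step old name = (new, greeting)
  where
    new = old {greeterStateCount = greeterStateCount old + 1}
    greeting = "Saw " <> name <> " " <> show (greeterStateCount new) <> " time(s)"

-- Thread one state value through the whole batch. The result is the final
-- state plus every outgoing greeting, in message order.
runBatch :: GreeterState -> [String] -> (GreeterState, [String])
runBatch = mapAccumL step
```

In GHCi, runBatch (GreeterState 0) ["Ann", "Bob"] yields a final count of 2 together with two greetings, so the state crosses the wire once rather than once per message.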

NOTE: For JSON (or anything other than protobuf) messages, you must use sendByteMsg instead. When communicating with other SDKs, you'll likely want to use sendMsg and protobuf.


import Data.Aeson (encode)
import qualified Data.ByteString.Lazy.Char8 as BSL
import qualified Data.Map as Map
import Network.Wai.Handler.Warp (run)
import Network.Wai.Middleware.RequestLogger

main :: IO ()
main = do
  putStrLn "http://localhost:8000/"
  run 8000 (logStdout $ createApp functionTable)

-- Initial state for the counter function, serialized as JSON.
greeterState = BSL.toStrict . encode $ GreeterState 0

functionTable :: FunctionTable
functionTable =
  Map.fromList
    [ (("printing", "printer"), ("", flinkWrapper . protoInput $ printer)),
      (("greeting", "counter"), (greeterState, flinkWrapper . protoInput . jsonState $ counter))
    ]

The FunctionTable is a Map of (namespace, functionType) to (initialState, wrappedFunction). jsonState is a helper that serializes your function state as JSON for storage in the Flink backend; protoState can be used instead if you prefer protobuf. flinkWrapper transforms your function into one that takes arbitrary ByteStrings, so that every function in the FunctionTable Map is homogeneous. protoInput indicates that the input message should be deserialized as protobuf; jsonInput can be used instead to deserialize the messages as JSON.
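The following sketch shows why wrapping makes the table homogeneous. It is not the SDK's implementation: String stands in for ByteString, Read/Show stand in for JSON or protobuf, and the names wrap, dispatch, Table and exampleTable are all hypothetical. Two handlers with different state types end up with the same raw type and can share one Map.

```haskell
import qualified Data.Map as Map
import Data.Maybe (fromMaybe)
import Text.Read (readMaybe)

type FuncType = (String, String) -- (namespace, functionType)

-- Every wrapped function has this shape: raw state -> raw message -> new raw state.
type RawFunction = String -> String -> Either String String

type Table = Map.Map FuncType (String, RawFunction) -- (initialState, wrappedFunction)

-- Hide the handler's state type behind a decode/encode pair.
wrap :: (Read s, Show s) => (s -> String -> s) -> RawFunction
wrap handler rawState msg =
  case readMaybe rawState of
    Nothing -> Left ("could not decode state: " ++ rawState)
    Just st -> Right (show (handler st msg))

countMsgs :: Int -> String -> Int
countMsgs n _ = n + 1

rememberNames :: [String] -> String -> [String]
rememberNames seen name = seen ++ [name]

exampleTable :: Table
exampleTable =
  Map.fromList
    [ (("greeting", "counter"), ("0", wrap countMsgs)),
      (("greeting", "seen"), ("[]", wrap rememberNames))
    ]

-- Look up a function, fall back to its initial state when nothing is stored
-- yet, and run it on one message.
dispatch :: Table -> FuncType -> Maybe String -> String -> Either String String
dispatch table key stored msg =
  case Map.lookup key table of
    Nothing -> Left ("unknown function: " ++ show key)
    Just (initial, f) -> f (fromMaybe initial stored) msg
```

The initial state stored next to each function plays the same role as greeterState above: it is what the function sees when the backend has no state for it yet.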

createApp is used to turn the FunctionTable into a Warp Application which can be served using the run function provided by Warp.

NOTE: JSON messages may not play nicely with other SDKs. Unless you are familiar with Flink Statefun internals, you'll probably want to stick with protobuf when communicating with another SDK.

Create module YAML

To use the functions that are now served from the API, we need to declare them in the module.yaml.

version: "1.0"
module:
  meta:
    type: remote
  spec:
    functions:
      - function:
          meta:
            kind: http
            type: greeting/counter
          spec:
            endpoint: http://localhost:8000/statefun
            states:
              - flink_state
            maxNumBatchRequests: 500
            timeout: 2min
      - function:
          meta:
            kind: http
            type: printing/printer
          spec:
            endpoint: http://localhost:8000/statefun
            maxNumBatchRequests: 500
            timeout: 2min

Flink Statefun supports multiple named states per function, but for simplicity the SDK serializes the whole state record and hard-codes flink_state as the only state value it uses.
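A rough sketch of that single-slot approach, again independent of the SDK: a record with several fields is serialized as one value under the flink_state key. Here Read/Show stand in for JSON, a Map of strings stands in for the state sent to and from Flink, and AppState, saveState and loadState are hypothetical names.

```haskell
import qualified Data.Map as Map
import Data.Maybe (fromMaybe)
import Text.Read (readMaybe)

-- Several logical pieces of state kept together in one record.
data AppState = AppState {visits :: Int, lastName :: String}
  deriving (Eq, Read, Show)

-- The only state name used, matching the states entry in module.yaml.
stateKey :: String
stateKey = "flink_state"

-- Serialize the whole record into the single slot.
saveState :: AppState -> Map.Map String String
saveState st = Map.singleton stateKey (show st)

-- Read the record back, falling back to a default when the slot is missing
-- or cannot be decoded.
loadState :: AppState -> Map.Map String String -> AppState
loadState def slots = fromMaybe def (Map.lookup stateKey slots >>= readMaybe)
```

The trade-off is that every field is rewritten on each update, in exchange for needing only one state declaration per function.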