Safe Haskell | None |
---|---|
Language | Haskell2010 |
Primary module containing everything needed to create stateful functions with Flink.
All stateful functions should have a single record type that represents the entire internal state
of the function. The Stateful Functions API provides many "slots" to store state, but for the purposes of this library
the slot is hardcoded to the single key flink_state, which you can see in the example module.yaml.
The Serde typeclass abstracts serialization away from the library so that users can decide how
state should be serialized. Aeson is very convenient so I use it in the example, but protobuf or any other
binary format is also acceptable. Flink essentially stores function state as an opaque ByteString
regardless.
When running your program, don't forget to pass +RTS -N to your binary so that it runs on all cores.
Synopsis
- class MonadIO m => StatefulFunc s m | m -> s where
- insideCtx :: (s -> a) -> m a
- getCtx :: m s
- setCtx :: s -> m ()
- modifyCtx :: (s -> s) -> m ()
- sendEgressMsg :: Message a => (Text, Text) -> a -> m ()
- sendMsg :: Serde a => Address -> a -> m ()
- sendMsgDelay :: Serde a => Address -> NominalDiffTime -> a -> m ()
- flinkWrapper :: forall a s. (Serde a, Serde s) => s -> Expiration -> (a -> Function s ()) -> FuncExec
- createApp :: FunctionTable -> Application
- flinkServer :: FunctionTable -> Server FlinkApi
- flinkApi :: Proxy FlinkApi
- data Address where
- data FuncType = FuncType Text Text
- class Serde a where
- tpName :: Proxy a -> Text
- deserializeBytes :: ByteString -> Either String a
- serializeBytes :: a -> ByteString
- type FunctionTable = Map FuncType FuncExec
- newtype JsonSerde a = JsonSerde {
- getJson :: a
- newtype ProtoSerde a = ProtoSerde {
- getProto :: a
- data Expiration = Expiration {}
- data ExpirationMode
- jsonState :: Json s => Function s () -> Function (JsonSerde s) ()
- protoState :: Message s => Function s () -> Function (ProtoSerde s) ()
- sendProtoMsg :: (StatefulFunc s m, Message a) => Address -> a -> m ()
- sendProtoMsgDelay :: (StatefulFunc s m, Message a) => Address -> NominalDiffTime -> a -> m ()
Documentation
class MonadIO m => StatefulFunc s m | m -> s where Source #
Used to represent all Flink stateful function capabilities.
Contexts are received from Flink and deserialized into s.
All modifications to state are shipped back to Flink at the end of the
batch to be persisted.
Message passing is also queued up and passed back at the end of the current batch.
setInitialCtx, insideCtx, getCtx, setCtx, modifyCtx, sendEgressMsg, sendMsg, sendMsgDelay
insideCtx :: (s -> a) -> m a Source #
modifyCtx :: (s -> s) -> m () Source #
:: Message a | |
=> (Text, Text) | egress address (namespace, type) |
-> a | protobuf message to send (should be a Kafka or Kinesis protobuf record) |
-> m () |
:: Serde a | |
=> Address | Function address (namespace, type, id) |
-> NominalDiffTime | delay before message send |
-> a | message to send |
-> m () | returns cancelation token with which delivery of the message could be canceled |
Instances
StatefulFunc s (Function s) Source # | |
Defined in Network.Flink.Internal.Stateful setInitialCtx :: s -> Function s () insideCtx :: (s -> a) -> Function s a Source # getCtx :: Function s s Source # setCtx :: s -> Function s () Source # modifyCtx :: (s -> s) -> Function s () Source # sendEgressMsg :: Message a => (Text, Text) -> a -> Function s () Source # sendMsg :: Serde a => Address -> a -> Function s () Source # sendMsgDelay :: Serde a => Address -> NominalDiffTime -> a -> Function s () Source # |
flinkWrapper :: forall a s. (Serde a, Serde s) => s -> Expiration -> (a -> Function s ()) -> FuncExec Source #
Takes a function taking an arbitrary state type and converts it to take ByteStrings.
This allows each function in the FunctionTable to take its own individual type of state
and just expose a function accepting ByteString to the library code.
createApp :: FunctionTable -> Application Source #
Takes a function table and creates a wai Application to serve Flink requests.
flinkServer :: FunctionTable -> Server FlinkApi Source #
Takes a function table and creates a servant Server to serve Flink requests.
tpName :: Proxy a -> Text Source #
Type name
deserializeBytes :: ByteString -> Either String a Source #
Decodes types from strict ByteStrings.
serializeBytes :: a -> ByteString Source #
Encodes types to strict ByteStrings.
Instances
type FunctionTable = Map FuncType FuncExec Source #
newtype ProtoSerde a Source #
ProtoSerde | |
|
Instances
Functor ProtoSerde Source # | |
Defined in Network.Flink.Internal.Stateful fmap :: (a -> b) -> ProtoSerde a -> ProtoSerde b # (<$) :: a -> ProtoSerde b -> ProtoSerde a # | |
Message a => Serde (ProtoSerde a) Source # | |
Defined in Network.Flink.Internal.Stateful tpName :: Proxy (ProtoSerde a) -> Text Source # deserializeBytes :: ByteString -> Either String (ProtoSerde a) Source # serializeBytes :: ProtoSerde a -> ByteString Source # |
data Expiration Source #
Instances
Eq Expiration Source # | |
Defined in Network.Flink.Internal.Stateful (==) :: Expiration -> Expiration -> Bool # (/=) :: Expiration -> Expiration -> Bool # | |
Show Expiration Source # | |
Defined in Network.Flink.Internal.Stateful showsPrec :: Int -> Expiration -> ShowS # show :: Expiration -> String # showList :: [Expiration] -> ShowS # |
data ExpirationMode Source #
Instances
Eq ExpirationMode Source # | |
Defined in Network.Flink.Internal.Stateful (==) :: ExpirationMode -> ExpirationMode -> Bool # (/=) :: ExpirationMode -> ExpirationMode -> Bool # | |
Show ExpirationMode Source # | |
Defined in Network.Flink.Internal.Stateful showsPrec :: Int -> ExpirationMode -> ShowS # show :: ExpirationMode -> String # showList :: [ExpirationMode] -> ShowS # |
jsonState :: Json s => Function s () -> Function (JsonSerde s) () Source #
Convenience function for wrapping state in newtype for JSON serialization
protoState :: Message s => Function s () -> Function (ProtoSerde s) () Source #
Convenience function for wrapping state in newtype for Protobuf serialization
sendProtoMsg :: (StatefulFunc s m, Message a) => Address -> a -> m () Source #
Convenience function to send protobuf messages
sendProtoMsgDelay :: (StatefulFunc s m, Message a) => Address -> NominalDiffTime -> a -> m () Source #
Convenience function to send delayed protobuf messages