module Network.Flink.Internal.Stateful
  ( StatefulFunc
      ( insideCtx,
        getCtx,
        setCtx,
        modifyCtx,
        sendMsg,
        sendMsgDelay,
        sendEgressMsg,
        sendByteMsg,
        sendByteMsgDelay
      ),
    flinkWrapper,
    createApp,
    flinkServer,
    flinkApi,
    Function (..),
    Serde (..),
    FunctionState (..),
    FlinkError (..),
    FunctionTable,
    Env (..),
    newState,
    ProtoSerde (..),
    JsonSerde (..),
    jsonState,
    protoState,
    serdeInput,
    protoInput,
    jsonInput,
  )
where

import Control.Monad.Except
import Control.Monad.Reader
import Control.Monad.State (MonadState, StateT (..), gets, modify)
import Data.Aeson (FromJSON, ToJSON, eitherDecode, encode)
import Data.ByteString (ByteString)
import qualified Data.ByteString.Lazy.Char8 as BSL
import Data.Either.Combinators (mapLeft)
import Data.Foldable (Foldable (toList))
import Data.Map (Map)
import qualified Data.Map as Map
import Data.Maybe (listToMaybe)
import Data.ProtoLens (Message, defMessage, encodeMessage)
import Data.ProtoLens.Any (UnpackError)
import qualified Data.ProtoLens.Any as Any
import Data.ProtoLens.Encoding (decodeMessage)
import Data.ProtoLens.Prism
import Data.Sequence (Seq)
import qualified Data.Sequence as Seq
import Data.Text (Text)
import Data.Text.Lazy (fromStrict)
import qualified Data.Text.Lazy.Encoding as T
import Lens.Family2
import Network.Flink.Internal.ProtoServant (Proto)
import Proto.Google.Protobuf.Any (Any)
import qualified Proto.Google.Protobuf.Any_Fields as Any
import Proto.RequestReply (FromFunction, ToFunction)
import qualified Proto.RequestReply as PR
import qualified Proto.RequestReply_Fields as PR
import Servant

-- | Table of stateful functions, keyed by @(functionNamespace, functionType)@.
--
-- Each entry pairs the serialized initial state with the function itself. The
-- function receives a strict 'ByteString', the 'Env', and the invocation batch
-- request, and produces either a 'FlinkError' or the resulting 'FunctionState'
-- over a strict 'ByteString' context.
type FunctionTable = Map (Text, Text) (ByteString, ByteString -> Env -> PR.ToFunction'InvocationBatchRequest -> IO (Either FlinkError (FunctionState ByteString)))

-- | Read-only environment carried by the 'Function' monad stack (exposed
-- through its 'MonadReader' 'Env' instance).
--
-- NOTE(review): the three fields mirror the @(namespace, type, id)@ address
-- triple used by 'sendMsg'; presumably they identify the function instance
-- currently being invoked -- confirm against the code that builds 'Env'.
data Env = Env
  { envFunctionNamespace :: Text,
    envFunctionType :: Text,
    envFunctionId :: Text
  }
  deriving (Show)

-- | State threaded through a 'Function' while it runs.
--
-- Besides the user context it accumulates everything the function has queued
-- so far: plain invocations, delayed invocations and egress messages.
data FunctionState ctx = FunctionState
  { -- | The user-defined context value.
    functionStateCtx :: ctx,
    -- | Set to 'True' by 'setCtx'; 'newState' starts it at 'False'.
    functionStateMutated :: Bool,
    -- | Messages queued by 'sendMsg' / 'sendByteMsg', oldest first.
    functionStateInvocations :: Seq PR.FromFunction'Invocation,
    -- | Messages queued by 'sendMsgDelay' / 'sendByteMsgDelay', oldest first.
    functionStateDelayedInvocations :: Seq PR.FromFunction'DelayedInvocation,
    -- | Messages queued by 'sendEgressMsg', oldest first.
    functionStateEgressMessages :: Seq PR.FromFunction'EgressMessage
  }
  deriving (Show, Functor)

-- | Build a fresh 'FunctionState' around an initial context.
--
-- The mutated flag starts as 'False' and all three outgoing message queues
-- start empty.
newState :: a -> FunctionState a
newState initialCtx =
  FunctionState
    { functionStateCtx = initialCtx,
      functionStateMutated = False,
      functionStateInvocations = mempty,
      functionStateDelayedInvocations = mempty,
      functionStateEgressMessages = mempty
    }

-- | Monad stack used for the execution of a Flink stateful function.
-- Don't reference this directly in your code if possible.
--
-- From the outside in, the stack provides: failure with a 'FlinkError'
-- ('ExceptT'), a mutable 'FunctionState' over the user context @s@
-- ('StateT'), read-only access to the 'Env' ('ReaderT'), and 'IO' at the base.
-- All the corresponding @mtl@ classes are derived from the underlying stack.
newtype Function s a = Function
  { runFunction :: ExceptT FlinkError (StateT (FunctionState s) (ReaderT Env IO)) a
  }
  deriving
    ( Monad,
      Applicative,
      Functor,
      MonadState (FunctionState s),
      MonadError FlinkError,
      MonadIO,
      MonadReader Env
    )

-- | Types that can be converted to and from strict 'ByteString's.
class Serde a where
  -- | Decodes a value from a strict 'ByteString'; a 'Left' carries an error
  -- message describing why decoding failed.
  deserializeBytes :: ByteString -> Either String a

  -- | Encodes a value to a strict 'ByteString'.
  serializeBytes :: a -> ByteString

-- | Wrapper that selects the protobuf-based 'Serde' instance for a 'Message'
-- type; unwrap it with 'getMessage'.
newtype ProtoSerde a = ProtoSerde {getMessage :: a}
  deriving (Functor)

-- | Any protobuf 'Message' can be (de)serialized through 'decodeMessage' and
-- 'encodeMessage' by wrapping it in 'ProtoSerde'.
instance Message a => Serde (ProtoSerde a) where
  deserializeBytes a = ProtoSerde <$> decodeMessage a
  serializeBytes (ProtoSerde a) = encodeMessage a

-- | Constraint synonym for types that can be both decoded from and encoded to
-- JSON.
type Json a = (FromJSON a, ToJSON a)

-- | Wrapper that selects the JSON-based 'Serde' instance for a type; unwrap it
-- with 'getJson'.
newtype JsonSerde a = JsonSerde {getJson :: a}
  deriving (Functor)

-- | Any type with both JSON instances can be (de)serialized through
-- 'eitherDecode' and 'encode' by wrapping it in 'JsonSerde'.
instance Json a => Serde (JsonSerde a) where
  -- aeson works on lazy ByteStrings, so convert at the boundary.
  deserializeBytes a = JsonSerde <$> eitherDecode (BSL.fromStrict a)
  serializeBytes (JsonSerde a) = BSL.toStrict $ encode a

-- | The unit value carries no data: decoding ignores its input and always
-- succeeds, encoding produces an empty 'ByteString'.
instance Serde () where
  deserializeBytes _ = pure ()
  serializeBytes _ = ""

-- | Strict 'ByteString's pass through unchanged in both directions.
instance Serde ByteString where
  deserializeBytes = pure
  serializeBytes = id

-- | Lazy 'BSL.ByteString's are converted to and from their strict form;
-- decoding always succeeds.
instance Serde BSL.ByteString where
  deserializeBytes = pure . BSL.fromStrict
  serializeBytes = BSL.toStrict

-- | Used to represent all Flink stateful function capabilities.
--
-- Contexts are received from Flink and deserialized into @s@.
-- All modifications to state are shipped back to Flink at the end of the
-- batch to be persisted.
--
-- Message passing is also queued up and passed back at the end of the current
-- batch.
class MonadIO m => StatefulFunc s m | m -> s where
  -- Internal

  -- | Install a context value. In the 'Function' instance this replaces the
  -- context without setting the mutated flag (unlike 'setCtx').
  setInitialCtx :: s -> m ()

  -- Public

  -- | Apply a function to the current context and return the result.
  insideCtx :: (s -> a) -> m a

  -- | Fetch the current context.
  getCtx :: m s

  -- | Replace the current context. The 'Function' instance also marks the
  -- state as mutated.
  setCtx :: s -> m ()

  -- | Transform the current context with the given function.
  modifyCtx :: (s -> s) -> m ()

  -- | Queue a protobuf message for another function.
  sendMsg ::
    Message a =>
    -- | Function address (namespace, type, id)
    (Text, Text, Text) ->
    -- | protobuf message to send
    a ->
    m ()

  -- | Queue a protobuf message for another function, to be sent after a
  -- delay. The 'Function' instance stores the delay in the message's
  -- @delayInMs@ field.
  sendMsgDelay ::
    Message a =>
    -- | Function address (namespace, type, id)
    (Text, Text, Text) ->
    -- | delay before message send
    Int ->
    -- | protobuf message to send
    a ->
    m ()

  -- | Queue a protobuf message for an egress.
  sendEgressMsg ::
    Message a =>
    -- | egress address (namespace, type)
    (Text, Text) ->
    -- | protobuf message to send (should be a Kafka or Kinesis protobuf record)
    a ->
    m ()

  -- | Queue a message for another function, encoded with its 'Serde' instance
  -- rather than as a protobuf 'Message'.
  sendByteMsg ::
    Serde a =>
    -- | Function address (namespace, type, id)
    (Text, Text, Text) ->
    -- | message to send
    a ->
    m ()

  -- | Delayed variant of 'sendByteMsg'.
  sendByteMsgDelay ::
    Serde a =>
    -- | Function address (namespace, type, id)
    (Text, Text, Text) ->
    -- | delay before message send
    Int ->
    -- | message to send
    a ->
    m ()

instance StatefulFunc s (Function s) where
  -- Replace the context only; 'functionStateMutated' is deliberately left
  -- untouched, which is what distinguishes this from 'setCtx'.
  setInitialCtx ctx = modify (\old -> old {functionStateCtx = ctx})

  -- Project a value out of the current context.
  insideCtx func = func <$> getCtx
  -- Read the user context out of the function state.
  getCtx = gets functionStateCtx
  -- Store the new context and flag the state as mutated.
  setCtx new = modify (\old -> old {functionStateCtx = new, functionStateMutated = True})
  -- Read the context, transform it, and write it back through 'setCtx' (so
  -- the mutated flag is set as well).
  modifyCtx mutator = getCtx >>= setCtx . mutator
  -- Append an invocation for the addressed function to the end of the
  -- pending invocation queue. The payload is wrapped with 'Any.pack'.
  sendMsg (namespace, funcType, id') msg =
    modify (\old -> old {functionStateInvocations = functionStateInvocations old Seq.:|> invocation})
    where
      target :: PR.Address
      target =
        defMessage
          & PR.namespace .~ namespace
          & PR.type' .~ funcType
          & PR.id .~ id'
      invocation :: PR.FromFunction'Invocation
      invocation =
        defMessage
          & PR.target .~ target
          & PR.argument .~ Any.pack msg
  -- Append a delayed invocation for the addressed function to the end of the
  -- pending delayed-invocation queue. The delay is written to the message's
  -- @delayInMs@ field and the payload is wrapped with 'Any.pack'.
  sendMsgDelay (namespace, funcType, id') delay msg =
    modify (\old -> old {functionStateDelayedInvocations = functionStateDelayedInvocations old Seq.:|> invocation})
    where
      target :: PR.Address
      target =
        defMessage
          & PR.namespace .~ namespace
          & PR.type' .~ funcType
          & PR.id .~ id'
      invocation :: PR.FromFunction'DelayedInvocation
      invocation =
        defMessage
          & PR.delayInMs .~ fromIntegral delay
          & PR.target .~ target
          & PR.argument .~ Any.pack msg
  -- Append an egress message for the addressed egress to the end of the
  -- pending egress queue. The payload is wrapped with 'Any.pack'.
  sendEgressMsg (namespace, egressType) msg =
    modify (\old -> old {functionStateEgressMessages = functionStateEgressMessages old Seq.:|> egressMsg})
    where
      egressMsg :: PR.FromFunction'EgressMessage
      egressMsg =
        defMessage
          & PR.egressNamespace .~ namespace
          & PR.egressType .~ egressType
          & PR.argument .~ Any.pack msg
  -- Append an invocation for the addressed function to the end of the
  -- pending invocation queue, encoding the payload with its 'Serde' instance.
  sendByteMsg (namespace, funcType, id') msg =
    modify (\old -> old {functionStateInvocations = functionStateInvocations old Seq.:|> invocation})
    where
      -- Only the @value@ field of the 'Any' is populated; every other field
      -- keeps its 'defMessage' default.
      argument :: Any
      argument = defMessage & Any.value .~ serializeBytes msg
      target :: PR.Address
      target =
        defMessage
          & PR.namespace .~ namespace
          & PR.type' .~ funcType
          & PR.id .~ id'
      invocation :: PR.FromFunction'Invocation
      invocation =
        defMessage
          & PR.target .~ target
          & PR.argument .~ argument
  -- Queue a delayed invocation of the function addressed by
  -- @(namespace, funcType, id')@.
  --
  -- The payload is @serializeBytes msg@ stored in the @value@ field of a
  -- protobuf 'Any'; no other field of the 'Any' is set here. @delay@ is
  -- converted with 'fromIntegral' and written to the @delayInMs@ field.
  -- The new invocation is appended to the end of
  -- 'functionStateDelayedInvocations', so queueing order is preserved.
  sendByteMsgDelay (namespace, funcType, id') delay msg = do
    invocations <- gets functionStateDelayedInvocations
    modify (\old -> old {functionStateDelayedInvocations = invocations Seq.:|> invocation})
    where
      -- Raw serialized payload wrapped in an otherwise default 'Any'.
      argument :: Any
      argument = defMessage & Any.value .~ serializeBytes msg
      -- Address of the receiving function.
      target :: PR.Address
      target =
        defMessage
          & PR.namespace .~ namespace
          & PR.type' .~ funcType
          & PR.id .~ id'
      -- The delayed invocation record that gets appended to the state.
      invocation :: PR.FromFunction'DelayedInvocation
      invocation =
        defMessage
          & PR.delayInMs .~ fromIntegral delay
          & PR.target .~ target
          & PR.argument .~ argument

-- | Errors that can be produced while decoding a request or running a
-- stateful function.
data FlinkError
  = -- | NOTE(review): presumably raised when the incoming request carries no
    -- invocation batch; the raising site is outside this section -- confirm.
    MissingInvocationBatch
  | -- | NOTE(review): not raised anywhere in this section; presumably a
    -- protobuf decoding failure -- confirm against its use site.
    ProtodeserializeBytesError String
  | -- | The function's state bytes could not be deserialized
    -- (raised by 'flinkWrapper').
    StateDecodeError String
  | -- | An incoming message payload could not be deserialized
    -- (raised by 'serdeInput').
    MessageDecodeError String
  | -- | An incoming protobuf 'Any' could not be unpacked into the expected
    -- message type (raised by 'protoInput').
    ProtoMessageDecodeError UnpackError
  | -- | NOTE(review): presumably the @(namespace, type)@ key that was not
    -- found in the 'FunctionTable'; the lookup is outside this section --
    -- confirm.
    NoSuchFunction (Text, Text)
  deriving (Show, Eq)

-- | Convenience function for wrapping state in newtype for JSON serialization.
--
-- Lifts a handler whose state type is @s@ into one whose state type is
-- @'JsonSerde' s@: the incoming state is unwrapped with 'getJson', the handler
-- @f a@ is run against it, and the resulting state is re-wrapped with
-- 'JsonSerde'. The handler's 'Either' result is passed through unchanged.
jsonState :: Json s => (a -> Function s ()) -> a -> Function (JsonSerde s) ()
jsonState f a = jsonWrapper
  where
    -- Rebuild the monad stack by hand so the state type can change between
    -- the inner and outer 'Function'. @(fmap . fmap)@ maps over the
    -- 'FunctionState' in the second tuple component.
    jsonWrapper =
      Function
        ( ExceptT
            ( StateT
                ( \s ->
                    ReaderT
                      ( \env ->
                          (fmap . fmap) JsonSerde <$> runner (f a) env (getJson <$> s)
                      )
                )
            )
        )
    -- Run a 'Function' down to 'IO', given its environment and starting state.
    runner res env state = runReaderT (runStateT (runExceptT $ runFunction res) state) env

-- | Convenience function for wrapping state in newtype for Protobuf serialization.
--
-- Lifts a handler whose state type is @s@ into one whose state type is
-- @'ProtoSerde' s@: the incoming state is unwrapped with 'getMessage', the
-- handler @f a@ is run against it, and the resulting state is re-wrapped with
-- 'ProtoSerde'. The handler's 'Either' result is passed through unchanged.
protoState :: Message s => (a -> Function s ()) -> a -> Function (ProtoSerde s) ()
protoState f a = protoWrapper
  where
    -- Rebuild the monad stack by hand so the state type can change between
    -- the inner and outer 'Function'. @(fmap . fmap)@ maps over the
    -- 'FunctionState' in the second tuple component.
    protoWrapper =
      Function
        ( ExceptT
            ( StateT
                ( \s ->
                    ReaderT
                      ( \env ->
                          (fmap . fmap) ProtoSerde <$> runner (f a) env (getMessage <$> s)
                      )
                )
            )
        )
    -- Run a 'Function' down to 'IO', given its environment and starting state.
    runner res env state = runReaderT (runStateT (runExceptT $ runFunction res) state) env

-- | Takes a function taking an arbitrary state type and converts it to take 'ByteString's.
-- This allows each function in the 'FunctionTable' to take its own individual type of state and just expose
-- a function accepting 'ByteString' to the library code.
--
-- Steps performed:
--
-- 1. The initial context bytes are deserialized; failure yields 'StateDecodeError'.
-- 2. The first persisted value of the batch (if any) replaces that default
--    context; an absent or empty persisted value keeps the default.
-- 3. @func@ is applied, in order, to the @argument@ of every invocation in the batch.
-- 4. If the function failed, its error is returned and the final state is
--    discarded; otherwise the final state is returned with its context
--    serialized back to bytes.
flinkWrapper :: Serde s => (Any -> Function s ()) -> ByteString -> Env -> PR.ToFunction'InvocationBatchRequest -> IO (Either FlinkError (FunctionState ByteString))
flinkWrapper func initialContext env invocationBatch = runExceptT $ do
  decodedContext <- liftEither $ mapLeft StateDecodeError $ deserializeBytes initialContext
  (err, finalState) <- liftIO $ runner (newState decodedContext)
  -- Surface any error raised inside the user function before returning state.
  liftEither err
  return $ serializeBytes <$> finalState
  where
    -- Run the whole batch down to 'IO' from the given starting state.
    runner state = runReaderT (runStateT (runExceptT $ runFunction runWithCtx) state) env

    -- Install the persisted context (if present), then process every
    -- invocation of the batch in order.
    runWithCtx = do
      defaultCtx <- gets functionStateCtx
      let initialCtx = getInitialCtx defaultCtx (listToMaybe $ invocationBatch ^. PR.state)
      either throwError setInitialCtx initialCtx
      mapM_ func ((^. PR.argument) <$> invocationBatch ^. PR.invocations)

    -- Decode the optional persisted value, falling back to the default.
    getInitialCtx def pv = handleEmptyState def $ (^. PR.stateValue) <$> pv

    -- A missing persisted value and an empty one both mean "use the default";
    -- anything else must deserialize successfully.
    handleEmptyState def state' = case state' of
      Just "" -> Right def
      Just other -> mapLeft StateDecodeError $ deserializeBytes other
      Nothing -> Right def

-- | Deserializes input messages as arbitrary bytes by extracting them out of the protobuf Any
-- and ignoring the type since that's protobuf specific.
--
-- Only the @value@ field of the 'Any' is read. If 'deserializeBytes' fails,
-- the error is raised as 'MessageDecodeError' and the handler is not called.
serdeInput :: (Serde s, Serde a, StatefulFunc s m, MonadError FlinkError m, MonadReader Env m) => (a -> m b) -> Any -> m b
serdeInput f input =
  f =<< liftEither (mapLeft MessageDecodeError $ deserializeBytes (input ^. Any.value))

-- | Deserializes input messages through the 'Serde' instance of 'JsonSerde':
-- the raw bytes are taken out of the protobuf Any by 'serdeInput' (its type is
-- ignored), decoded as a @'JsonSerde' a@, and the unwrapped value is handed to
-- the handler. Decoding failures are reported by 'serdeInput'.
jsonInput :: (Serde s, Json a, StatefulFunc s m, MonadError FlinkError m, MonadReader Env m) => (a -> m b) -> Any -> m b
jsonInput f = serdeInput wrapJson
  where
    -- Strip the 'JsonSerde' newtype before calling the user's handler.
    wrapJson (JsonSerde msg) = f msg

-- | Deserializes input messages by unpacking the protobuf Any into the expected type.
-- If you are passing messages via protobuf, this is much more typesafe than 'serdeInput'.
--
-- If 'Any.unpack' fails, the error is raised as 'ProtoMessageDecodeError' and
-- the handler is not called.
protoInput :: (Serde s, Message a, StatefulFunc s m, MonadError FlinkError m, MonadReader Env m) => (a -> m b) -> Any -> m b
protoInput f input =
  f =<< liftEither (mapLeft ProtoMessageDecodeError $ Any.unpack input)

-- | Builds the 'FromFunction' response from a function's final state.
--
-- Outgoing messages, delayed invocations and egress messages are copied into
-- the invocation result in their queued order. A single @MODIFY@ mutation for
-- the state named @"flink_state"@ is included only when the state was marked
-- as mutated; otherwise the mutation list is empty.
createFlinkResp :: FunctionState ByteString -> FromFunction
createFlinkResp (FunctionState state mutated invocations delayedInvocations egresses) =
  defMessage
    & PR.invocationResult
      .~ ( defMessage
             & PR.stateMutations .~ stateMutations
             & PR.outgoingMessages .~ toList invocations
             & PR.delayedInvocations .~ toList delayedInvocations
             & PR.outgoingEgresses .~ toList egresses
         )
  where
    -- Zero or one mutation, guarded on the @mutated@ flag.
    stateMutations :: [PR.FromFunction'PersistedValueMutation]
    stateMutations =
      [ defMessage
          & PR.mutationType .~ PR.FromFunction'PersistedValueMutation'MODIFY
          & PR.stateName .~ "flink_state"
          & PR.stateValue .~ state
        | mutated
      ]

-- | Servant description of the HTTP API served by this module.
--
-- It consists of a single route: a @POST@ to @/statefun@ whose request body is
-- a 'ToFunction' message and whose response is a 'FromFunction' message, both
-- using the 'Proto' content type.
type FlinkApi =
  "statefun" :> ReqBody '[Proto] ToFunction :> Post '[Proto] FromFunction

-- | Value-level witness of the 'FlinkApi' type, for servant functions such as
-- 'serve' that take the API as a 'Proxy' argument.
flinkApi :: Proxy FlinkApi
flinkApi = Proxy

-- | Build a wai 'Application' that answers flink requests for the functions
-- registered in the given 'FunctionTable'.
--
-- This is 'flinkServer' served through servant's 'serve' at the 'flinkApi'
-- route.
createApp :: FunctionTable -> Application
createApp = serve flinkApi . flinkServer

-- | Build the servant 'Server' that handles flink requests for the functions
-- registered in the given 'FunctionTable'.
--
-- For each incoming 'ToFunction' message the handler:
--
-- 1. extracts the invocation batch, failing with 'MissingInvocationBatch' when
--    the request does not carry one;
-- 2. looks up the batch target's @(namespace, type)@ in the table, failing with
--    'NoSuchFunction' when it is not registered;
-- 3. runs the registered function with its initial context, an 'Env' built from
--    the target address, and the batch;
-- 4. converts the resulting state into a 'FromFunction' response.
--
-- Every failure is turned into a 'ServerError' via 'flinkErrToServant'.
flinkServer :: FunctionTable -> Server FlinkApi
flinkServer functions toFunction = do
  batch <- requireBatch
  let target = batch ^. PR.target
      namespace = target ^. PR.namespace
      type' = target ^. PR.type'
      id' = target ^. PR.id
  (initialCtx, function) <- requireFunc (namespace, type')
  outcome <- liftIO (function initialCtx (Env namespace type' id') batch)
  either (throwError . flinkErrToServant) (pure . createFlinkResp) outcome
  where
    -- The invocation batch carried by the request, if the request is an
    -- invocation at all.
    requireBatch =
      case toFunction ^? PR.maybe'request . _Just . PR._ToFunction'Invocation' of
        Just found -> pure found
        Nothing -> throwError (flinkErrToServant MissingInvocationBatch)

    -- The table entry (initial state and function) registered under the key.
    requireFunc key =
      case Map.lookup key functions of
        Just entry -> pure entry
        Nothing -> throwError (flinkErrToServant (NoSuchFunction key))

-- | Translate a 'FlinkError' into the servant 'ServerError' sent back to the
-- caller.
--
-- Every error is reported as an HTTP 400 ('err400') whose body is a short
-- description followed by the detail carried by the error, if any.
flinkErrToServant :: FlinkError -> ServerError
flinkErrToServant err = err400 {errBody = message}
  where
    -- NOTE(review): 'BSL.pack' is the Char8 packer, so non-Latin-1 characters
    -- in the detail strings are truncated to their low byte.
    message = case err of
      MissingInvocationBatch ->
        "Invocation batch missing"
      ProtodeserializeBytesError protoErr ->
        "Could not deserializeBytes protobuf " <> BSL.pack protoErr
      StateDecodeError decodeErr ->
        "Invalid JSON " <> BSL.pack decodeErr
      MessageDecodeError msg ->
        "Failed to decode message " <> BSL.pack msg
      ProtoMessageDecodeError msg ->
        "Failed to decode message " <> BSL.pack (show msg)
      -- Namespace and type are separated by "/" so the two parts of the
      -- missing function's name can be told apart in the message.
      NoSuchFunction (namespace, type') ->
        "No such function "
          <> T.encodeUtf8 (fromStrict namespace)
          <> "/"
          <> T.encodeUtf8 (fromStrict type')