{-# LANGUAGE PatternSynonyms #-}
module Network.Flink.Internal.Stateful
  ( StatefulFunc
      ( insideCtx,
        getCtx,
        setCtx,
        modifyCtx,
        sendMsg,
        sendMsgDelay,
        sendEgressMsg
      ),
    flinkWrapper,
    createApp,
    flinkServer,
    flinkApi,
    Address(.., Address'),
    FuncType (..),
    Function (..),
    Serde (..),
    FunctionState (..),
    FlinkError (..),
    FunctionTable,
    Env (..),
    Expiration(..),
    ExpirationMode(..),
    newState,
    ProtoSerde (..),
    JsonSerde (..),
    jsonState,
    protoState,
    sendProtoMsg,
    sendProtoMsgDelay
  )
where

import Control.Monad.Except
import Control.Monad.Reader
import Control.Monad.State (MonadState, StateT (..), gets, modify)
import Data.Aeson (FromJSON, ToJSON, eitherDecode, encode)
import Data.ByteString (ByteString)
import qualified Data.ByteString.Lazy.Char8 as BSL
import Data.Either.Combinators (mapLeft)
import Data.Foldable (Foldable (toList))
import Data.Map (Map)
import qualified Data.Map as Map
import Data.ProtoLens (Message, defMessage, encodeMessage, messageName)
import Data.ProtoLens.Any (UnpackError)
import Data.ProtoLens.Encoding (decodeMessage)
import Data.ProtoLens.Prism
import Data.Sequence (Seq)
import qualified Data.Sequence as Seq
import Data.Text (Text)
import Data.Text.Lazy (fromStrict)
import Data.Coerce ( coerce )
import qualified Data.Text.Lazy.Encoding as T
import Lens.Family2
import Lens.Micro ( traversed, filtered )
import Network.Flink.Internal.ProtoServant (Proto)
import Proto.RequestReply (FromFunction, ToFunction)
import qualified Proto.RequestReply as PR
import qualified Proto.RequestReply_Fields as PR
import Servant

import Data.Time.Clock ( NominalDiffTime )

data FuncType = FuncType Text Text deriving (FuncType -> FuncType -> Bool
(FuncType -> FuncType -> Bool)
-> (FuncType -> FuncType -> Bool) -> Eq FuncType
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
/= :: FuncType -> FuncType -> Bool
$c/= :: FuncType -> FuncType -> Bool
== :: FuncType -> FuncType -> Bool
$c== :: FuncType -> FuncType -> Bool
Eq, Eq FuncType
Eq FuncType =>
(FuncType -> FuncType -> Ordering)
-> (FuncType -> FuncType -> Bool)
-> (FuncType -> FuncType -> Bool)
-> (FuncType -> FuncType -> Bool)
-> (FuncType -> FuncType -> Bool)
-> (FuncType -> FuncType -> FuncType)
-> (FuncType -> FuncType -> FuncType)
-> Ord FuncType
FuncType -> FuncType -> Bool
FuncType -> FuncType -> Ordering
FuncType -> FuncType -> FuncType
forall a.
Eq a =>
(a -> a -> Ordering)
-> (a -> a -> Bool)
-> (a -> a -> Bool)
-> (a -> a -> Bool)
-> (a -> a -> Bool)
-> (a -> a -> a)
-> (a -> a -> a)
-> Ord a
min :: FuncType -> FuncType -> FuncType
$cmin :: FuncType -> FuncType -> FuncType
max :: FuncType -> FuncType -> FuncType
$cmax :: FuncType -> FuncType -> FuncType
>= :: FuncType -> FuncType -> Bool
$c>= :: FuncType -> FuncType -> Bool
> :: FuncType -> FuncType -> Bool
$c> :: FuncType -> FuncType -> Bool
<= :: FuncType -> FuncType -> Bool
$c<= :: FuncType -> FuncType -> Bool
< :: FuncType -> FuncType -> Bool
$c< :: FuncType -> FuncType -> Bool
compare :: FuncType -> FuncType -> Ordering
$ccompare :: FuncType -> FuncType -> Ordering
$cp1Ord :: Eq FuncType
Ord)
data Address = Address FuncType Text

{-# COMPLETE Address' #-}
pattern Address' :: Text -> Text -> Text -> Address
pattern $bAddress' :: Text -> Text -> Text -> Address
$mAddress' :: forall r.
Address -> (Text -> Text -> Text -> r) -> (Void# -> r) -> r
Address' fnamespace fnTp fid = Address (FuncType fnamespace fnTp) fid

data FuncRes = IncompleteContext Expiration Text | UpdatedState (FunctionState PR.TypedValue) deriving Int -> FuncRes -> ShowS
[FuncRes] -> ShowS
FuncRes -> String
(Int -> FuncRes -> ShowS)
-> (FuncRes -> String) -> ([FuncRes] -> ShowS) -> Show FuncRes
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
showList :: [FuncRes] -> ShowS
$cshowList :: [FuncRes] -> ShowS
show :: FuncRes -> String
$cshow :: FuncRes -> String
showsPrec :: Int -> FuncRes -> ShowS
$cshowsPrec :: Int -> FuncRes -> ShowS
Show
type FuncExec = Env -> PR.ToFunction'InvocationBatchRequest -> IO (Either FlinkError FuncRes)
--- | Table of stateful functions `(functionNamespace, functionType) -> function
type FunctionTable = Map FuncType FuncExec

data Env = Env
  { Env -> Text
envFunctionNamespace :: Text,
    Env -> Text
envFunctionType :: Text,
    Env -> Text
envFunctionId :: Text
  }
  deriving (Int -> Env -> ShowS
[Env] -> ShowS
Env -> String
(Int -> Env -> ShowS)
-> (Env -> String) -> ([Env] -> ShowS) -> Show Env
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
showList :: [Env] -> ShowS
$cshowList :: [Env] -> ShowS
show :: Env -> String
$cshow :: Env -> String
showsPrec :: Int -> Env -> ShowS
$cshowsPrec :: Int -> Env -> ShowS
Show)

data FunctionState ctx = FunctionState
  { FunctionState ctx -> ctx
functionStateCtx :: ctx,
    FunctionState ctx -> Bool
functionStateMutated :: Bool,
    FunctionState ctx -> Seq FromFunction'Invocation
functionStateInvocations :: Seq PR.FromFunction'Invocation,
    FunctionState ctx -> Seq FromFunction'DelayedInvocation
functionStateDelayedInvocations :: Seq PR.FromFunction'DelayedInvocation,
    FunctionState ctx -> Seq FromFunction'EgressMessage
functionStateEgressMessages :: Seq PR.FromFunction'EgressMessage
  }
  deriving (Int -> FunctionState ctx -> ShowS
[FunctionState ctx] -> ShowS
FunctionState ctx -> String
(Int -> FunctionState ctx -> ShowS)
-> (FunctionState ctx -> String)
-> ([FunctionState ctx] -> ShowS)
-> Show (FunctionState ctx)
forall ctx. Show ctx => Int -> FunctionState ctx -> ShowS
forall ctx. Show ctx => [FunctionState ctx] -> ShowS
forall ctx. Show ctx => FunctionState ctx -> String
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
showList :: [FunctionState ctx] -> ShowS
$cshowList :: forall ctx. Show ctx => [FunctionState ctx] -> ShowS
show :: FunctionState ctx -> String
$cshow :: forall ctx. Show ctx => FunctionState ctx -> String
showsPrec :: Int -> FunctionState ctx -> ShowS
$cshowsPrec :: forall ctx. Show ctx => Int -> FunctionState ctx -> ShowS
Show, a -> FunctionState b -> FunctionState a
(a -> b) -> FunctionState a -> FunctionState b
(forall a b. (a -> b) -> FunctionState a -> FunctionState b)
-> (forall a b. a -> FunctionState b -> FunctionState a)
-> Functor FunctionState
forall a b. a -> FunctionState b -> FunctionState a
forall a b. (a -> b) -> FunctionState a -> FunctionState b
forall (f :: * -> *).
(forall a b. (a -> b) -> f a -> f b)
-> (forall a b. a -> f b -> f a) -> Functor f
<$ :: a -> FunctionState b -> FunctionState a
$c<$ :: forall a b. a -> FunctionState b -> FunctionState a
fmap :: (a -> b) -> FunctionState a -> FunctionState b
$cfmap :: forall a b. (a -> b) -> FunctionState a -> FunctionState b
Functor)

newState :: a -> FunctionState a
newState :: a -> FunctionState a
newState initialCtx :: a
initialCtx = a
-> Bool
-> Seq FromFunction'Invocation
-> Seq FromFunction'DelayedInvocation
-> Seq FromFunction'EgressMessage
-> FunctionState a
forall ctx.
ctx
-> Bool
-> Seq FromFunction'Invocation
-> Seq FromFunction'DelayedInvocation
-> Seq FromFunction'EgressMessage
-> FunctionState ctx
FunctionState a
initialCtx Bool
False Seq FromFunction'Invocation
forall a. Monoid a => a
mempty Seq FromFunction'DelayedInvocation
forall a. Monoid a => a
mempty Seq FromFunction'EgressMessage
forall a. Monoid a => a
mempty

data ExpirationMode =  NONE | AFTER_WRITE | AFTER_CALL deriving (Int -> ExpirationMode -> ShowS
[ExpirationMode] -> ShowS
ExpirationMode -> String
(Int -> ExpirationMode -> ShowS)
-> (ExpirationMode -> String)
-> ([ExpirationMode] -> ShowS)
-> Show ExpirationMode
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
showList :: [ExpirationMode] -> ShowS
$cshowList :: [ExpirationMode] -> ShowS
show :: ExpirationMode -> String
$cshow :: ExpirationMode -> String
showsPrec :: Int -> ExpirationMode -> ShowS
$cshowsPrec :: Int -> ExpirationMode -> ShowS
Show, ExpirationMode -> ExpirationMode -> Bool
(ExpirationMode -> ExpirationMode -> Bool)
-> (ExpirationMode -> ExpirationMode -> Bool) -> Eq ExpirationMode
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
/= :: ExpirationMode -> ExpirationMode -> Bool
$c/= :: ExpirationMode -> ExpirationMode -> Bool
== :: ExpirationMode -> ExpirationMode -> Bool
$c== :: ExpirationMode -> ExpirationMode -> Bool
Eq)

data Expiration = Expiration {
  Expiration -> ExpirationMode
emode :: ExpirationMode,
  Expiration -> NominalDiffTime
expireAfterMillis :: NominalDiffTime
} deriving (Int -> Expiration -> ShowS
[Expiration] -> ShowS
Expiration -> String
(Int -> Expiration -> ShowS)
-> (Expiration -> String)
-> ([Expiration] -> ShowS)
-> Show Expiration
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
showList :: [Expiration] -> ShowS
$cshowList :: [Expiration] -> ShowS
show :: Expiration -> String
$cshow :: Expiration -> String
showsPrec :: Int -> Expiration -> ShowS
$cshowsPrec :: Int -> Expiration -> ShowS
Show, Expiration -> Expiration -> Bool
(Expiration -> Expiration -> Bool)
-> (Expiration -> Expiration -> Bool) -> Eq Expiration
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
/= :: Expiration -> Expiration -> Bool
$c/= :: Expiration -> Expiration -> Bool
== :: Expiration -> Expiration -> Bool
$c== :: Expiration -> Expiration -> Bool
Eq)

-- | Monad stack used for the execution of a Flink stateful function
-- Don't reference this directly in your code if possible
newtype Function s a = Function {Function s a
-> ExceptT FlinkError (StateT (FunctionState s) (ReaderT Env IO)) a
runFunction :: ExceptT FlinkError (StateT (FunctionState s) (ReaderT Env IO)) a}
  deriving (Applicative (Function s)
a -> Function s a
Applicative (Function s) =>
(forall a b. Function s a -> (a -> Function s b) -> Function s b)
-> (forall a b. Function s a -> Function s b -> Function s b)
-> (forall a. a -> Function s a)
-> Monad (Function s)
Function s a -> (a -> Function s b) -> Function s b
Function s a -> Function s b -> Function s b
forall s. Applicative (Function s)
forall a. a -> Function s a
forall s a. a -> Function s a
forall a b. Function s a -> Function s b -> Function s b
forall a b. Function s a -> (a -> Function s b) -> Function s b
forall s a b. Function s a -> Function s b -> Function s b
forall s a b. Function s a -> (a -> Function s b) -> Function s b
forall (m :: * -> *).
Applicative m =>
(forall a b. m a -> (a -> m b) -> m b)
-> (forall a b. m a -> m b -> m b)
-> (forall a. a -> m a)
-> Monad m
return :: a -> Function s a
$creturn :: forall s a. a -> Function s a
>> :: Function s a -> Function s b -> Function s b
$c>> :: forall s a b. Function s a -> Function s b -> Function s b
>>= :: Function s a -> (a -> Function s b) -> Function s b
$c>>= :: forall s a b. Function s a -> (a -> Function s b) -> Function s b
$cp1Monad :: forall s. Applicative (Function s)
Monad, Functor (Function s)
a -> Function s a
Functor (Function s) =>
(forall a. a -> Function s a)
-> (forall a b.
    Function s (a -> b) -> Function s a -> Function s b)
-> (forall a b c.
    (a -> b -> c) -> Function s a -> Function s b -> Function s c)
-> (forall a b. Function s a -> Function s b -> Function s b)
-> (forall a b. Function s a -> Function s b -> Function s a)
-> Applicative (Function s)
Function s a -> Function s b -> Function s b
Function s a -> Function s b -> Function s a
Function s (a -> b) -> Function s a -> Function s b
(a -> b -> c) -> Function s a -> Function s b -> Function s c
forall s. Functor (Function s)
forall a. a -> Function s a
forall s a. a -> Function s a
forall a b. Function s a -> Function s b -> Function s a
forall a b. Function s a -> Function s b -> Function s b
forall a b. Function s (a -> b) -> Function s a -> Function s b
forall s a b. Function s a -> Function s b -> Function s a
forall s a b. Function s a -> Function s b -> Function s b
forall s a b. Function s (a -> b) -> Function s a -> Function s b
forall a b c.
(a -> b -> c) -> Function s a -> Function s b -> Function s c
forall s a b c.
(a -> b -> c) -> Function s a -> Function s b -> Function s c
forall (f :: * -> *).
Functor f =>
(forall a. a -> f a)
-> (forall a b. f (a -> b) -> f a -> f b)
-> (forall a b c. (a -> b -> c) -> f a -> f b -> f c)
-> (forall a b. f a -> f b -> f b)
-> (forall a b. f a -> f b -> f a)
-> Applicative f
<* :: Function s a -> Function s b -> Function s a
$c<* :: forall s a b. Function s a -> Function s b -> Function s a
*> :: Function s a -> Function s b -> Function s b
$c*> :: forall s a b. Function s a -> Function s b -> Function s b
liftA2 :: (a -> b -> c) -> Function s a -> Function s b -> Function s c
$cliftA2 :: forall s a b c.
(a -> b -> c) -> Function s a -> Function s b -> Function s c
<*> :: Function s (a -> b) -> Function s a -> Function s b
$c<*> :: forall s a b. Function s (a -> b) -> Function s a -> Function s b
pure :: a -> Function s a
$cpure :: forall s a. a -> Function s a
$cp1Applicative :: forall s. Functor (Function s)
Applicative, a -> Function s b -> Function s a
(a -> b) -> Function s a -> Function s b
(forall a b. (a -> b) -> Function s a -> Function s b)
-> (forall a b. a -> Function s b -> Function s a)
-> Functor (Function s)
forall a b. a -> Function s b -> Function s a
forall a b. (a -> b) -> Function s a -> Function s b
forall s a b. a -> Function s b -> Function s a
forall s a b. (a -> b) -> Function s a -> Function s b
forall (f :: * -> *).
(forall a b. (a -> b) -> f a -> f b)
-> (forall a b. a -> f b -> f a) -> Functor f
<$ :: a -> Function s b -> Function s a
$c<$ :: forall s a b. a -> Function s b -> Function s a
fmap :: (a -> b) -> Function s a -> Function s b
$cfmap :: forall s a b. (a -> b) -> Function s a -> Function s b
Functor, MonadState (FunctionState s), MonadError FlinkError, Monad (Function s)
Monad (Function s) =>
(forall a. IO a -> Function s a) -> MonadIO (Function s)
IO a -> Function s a
forall s. Monad (Function s)
forall a. IO a -> Function s a
forall s a. IO a -> Function s a
forall (m :: * -> *).
Monad m =>
(forall a. IO a -> m a) -> MonadIO m
liftIO :: IO a -> Function s a
$cliftIO :: forall s a. IO a -> Function s a
$cp1MonadIO :: forall s. Monad (Function s)
MonadIO, MonadReader Env)

class Serde a where
  -- | Type name
  tpName :: Proxy a -> Text

  -- | decodes types from strict 'ByteString's
  deserializeBytes :: ByteString -> Either String a

  -- | encodes types to strict 'ByteString's
  serializeBytes :: a -> ByteString

newtype ProtoSerde a = ProtoSerde {ProtoSerde a -> a
getProto :: a}
  deriving (a -> ProtoSerde b -> ProtoSerde a
(a -> b) -> ProtoSerde a -> ProtoSerde b
(forall a b. (a -> b) -> ProtoSerde a -> ProtoSerde b)
-> (forall a b. a -> ProtoSerde b -> ProtoSerde a)
-> Functor ProtoSerde
forall a b. a -> ProtoSerde b -> ProtoSerde a
forall a b. (a -> b) -> ProtoSerde a -> ProtoSerde b
forall (f :: * -> *).
(forall a b. (a -> b) -> f a -> f b)
-> (forall a b. a -> f b -> f a) -> Functor f
<$ :: a -> ProtoSerde b -> ProtoSerde a
$c<$ :: forall a b. a -> ProtoSerde b -> ProtoSerde a
fmap :: (a -> b) -> ProtoSerde a -> ProtoSerde b
$cfmap :: forall a b. (a -> b) -> ProtoSerde a -> ProtoSerde b
Functor)

instance Message a => Serde (ProtoSerde a) where
  tpName :: Proxy (ProtoSerde a) -> Text
tpName px :: Proxy (ProtoSerde a)
px = "type.googleapis.com/" Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Proxy a -> Text
forall msg. Message msg => Proxy msg -> Text
messageName (Proxy (ProtoSerde a) -> Proxy a
forall (f :: * -> *). Proxy (f a) -> Proxy a
unliftP Proxy (ProtoSerde a)
px)
    where unliftP :: Proxy (f a) -> Proxy a
          unliftP :: Proxy (f a) -> Proxy a
unliftP Proxy = Proxy a
forall k (t :: k). Proxy t
Proxy
  deserializeBytes :: ByteString -> Either String (ProtoSerde a)
deserializeBytes a :: ByteString
a = a -> ProtoSerde a
forall a. a -> ProtoSerde a
ProtoSerde (a -> ProtoSerde a)
-> Either String a -> Either String (ProtoSerde a)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> ByteString -> Either String a
forall msg. Message msg => ByteString -> Either String msg
decodeMessage ByteString
a
  serializeBytes :: ProtoSerde a -> ByteString
serializeBytes (ProtoSerde a :: a
a) = a -> ByteString
forall msg. Message msg => msg -> ByteString
encodeMessage a
a

type Json a = (FromJSON a, ToJSON a)

newtype JsonSerde a = JsonSerde {JsonSerde a -> a
getJson :: a}
  deriving (a -> JsonSerde b -> JsonSerde a
(a -> b) -> JsonSerde a -> JsonSerde b
(forall a b. (a -> b) -> JsonSerde a -> JsonSerde b)
-> (forall a b. a -> JsonSerde b -> JsonSerde a)
-> Functor JsonSerde
forall a b. a -> JsonSerde b -> JsonSerde a
forall a b. (a -> b) -> JsonSerde a -> JsonSerde b
forall (f :: * -> *).
(forall a b. (a -> b) -> f a -> f b)
-> (forall a b. a -> f b -> f a) -> Functor f
<$ :: a -> JsonSerde b -> JsonSerde a
$c<$ :: forall a b. a -> JsonSerde b -> JsonSerde a
fmap :: (a -> b) -> JsonSerde a -> JsonSerde b
$cfmap :: forall a b. (a -> b) -> JsonSerde a -> JsonSerde b
Functor)

instance Json a => Serde (JsonSerde a) where
  tpName :: Proxy (JsonSerde a) -> Text
tpName _ = "json/json" -- TODO: add adt name
  deserializeBytes :: ByteString -> Either String (JsonSerde a)
deserializeBytes a :: ByteString
a = a -> JsonSerde a
forall a. a -> JsonSerde a
JsonSerde (a -> JsonSerde a)
-> Either String a -> Either String (JsonSerde a)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> ByteString -> Either String a
forall a. FromJSON a => ByteString -> Either String a
eitherDecode (ByteString -> ByteString
BSL.fromStrict ByteString
a)
  serializeBytes :: JsonSerde a -> ByteString
serializeBytes (JsonSerde a :: a
a) = ByteString -> ByteString
BSL.toStrict (ByteString -> ByteString) -> ByteString -> ByteString
forall a b. (a -> b) -> a -> b
$ a -> ByteString
forall a. ToJSON a => a -> ByteString
encode a
a

instance Serde () where
  tpName :: Proxy () -> Text
tpName _ = "ghc/Unit"
  deserializeBytes :: ByteString -> Either String ()
deserializeBytes _ = () -> Either String ()
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()
  serializeBytes :: () -> ByteString
serializeBytes _ = ""

instance Serde ByteString where
  tpName :: Proxy ByteString -> Text
tpName _ = "ghc/Data.ByteString"
  deserializeBytes :: ByteString -> Either String ByteString
deserializeBytes = ByteString -> Either String ByteString
forall (f :: * -> *) a. Applicative f => a -> f a
pure
  serializeBytes :: ByteString -> ByteString
serializeBytes = ByteString -> ByteString
forall a. a -> a
id

instance Serde BSL.ByteString where
  tpName :: Proxy ByteString -> Text
tpName _ = "ghc/Data.ByteString.Lazy"
  deserializeBytes :: ByteString -> Either String ByteString
deserializeBytes = ByteString -> Either String ByteString
forall (f :: * -> *) a. Applicative f => a -> f a
pure (ByteString -> Either String ByteString)
-> (ByteString -> ByteString)
-> ByteString
-> Either String ByteString
forall b c a. (b -> c) -> (a -> b) -> a -> c
. ByteString -> ByteString
BSL.fromStrict
  serializeBytes :: ByteString -> ByteString
serializeBytes = ByteString -> ByteString
BSL.toStrict

-- | Used to represent all Flink stateful function capabilities.
--
-- Contexts are received from Flink and deserializeBytesd into `s`
-- all modifications to state are shipped back to Flink at the end of the
-- batch to be persisted.
--
-- Message passing is also queued up and passed back at the end of the current
-- batch.
class MonadIO m => StatefulFunc s m | m -> s where
  -- Internal
  setInitialCtx :: s -> m ()

  -- Public
  insideCtx :: (s -> a) -> m a
  getCtx :: m s
  setCtx :: s -> m ()
  modifyCtx :: (s -> s) -> m ()
  sendEgressMsg ::
    Message a =>
    -- | egress address (namespace, type)
    (Text, Text) ->
    -- | protobuf message to send (should be a Kafka or Kinesis protobuf record)
    a ->
    m ()
  sendMsg ::
    Serde a =>
    -- | Function address (namespace, type, id)
    Address ->
    -- | message to send
    a ->
    m ()
  sendMsgDelay ::
    Serde a =>
    -- | Function address (namespace, type, id)
    Address ->
    -- | delay before message send
    NominalDiffTime ->
    -- | message to send
    a ->
    -- | returns cancelation token with which delivery of the message could be canceled
    m ()

instance StatefulFunc s (Function s) where
  setInitialCtx :: s -> Function s ()
setInitialCtx ctx :: s
ctx = (FunctionState s -> FunctionState s) -> Function s ()
forall s (m :: * -> *). MonadState s m => (s -> s) -> m ()
modify (\old :: FunctionState s
old -> FunctionState s
old {functionStateCtx :: s
functionStateCtx = s
ctx})

  insideCtx :: (s -> a) -> Function s a
insideCtx func :: s -> a
func = s -> a
func (s -> a) -> Function s s -> Function s a
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Function s s
forall s (m :: * -> *). StatefulFunc s m => m s
getCtx
  getCtx :: Function s s
getCtx = (FunctionState s -> s) -> Function s s
forall s (m :: * -> *) a. MonadState s m => (s -> a) -> m a
gets FunctionState s -> s
forall ctx. FunctionState ctx -> ctx
functionStateCtx
  setCtx :: s -> Function s ()
setCtx new :: s
new = (FunctionState s -> FunctionState s) -> Function s ()
forall s (m :: * -> *). MonadState s m => (s -> s) -> m ()
modify (\old :: FunctionState s
old -> FunctionState s
old {functionStateCtx :: s
functionStateCtx = s
new, functionStateMutated :: Bool
functionStateMutated = Bool
True})
  modifyCtx :: (s -> s) -> Function s ()
modifyCtx mutator :: s -> s
mutator = Function s s
forall s (m :: * -> *). StatefulFunc s m => m s
getCtx Function s s -> (s -> Function s ()) -> Function s ()
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= s -> Function s ()
forall s (m :: * -> *). StatefulFunc s m => s -> m ()
setCtx (s -> Function s ()) -> (s -> s) -> s -> Function s ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. s -> s
mutator

  sendEgressMsg :: (Text, Text) -> a -> Function s ()
sendEgressMsg (namespace :: Text
namespace, egressType :: Text
egressType) msg :: a
msg = do
    Seq FromFunction'EgressMessage
egresses <- (FunctionState s -> Seq FromFunction'EgressMessage)
-> Function s (Seq FromFunction'EgressMessage)
forall s (m :: * -> *) a. MonadState s m => (s -> a) -> m a
gets FunctionState s -> Seq FromFunction'EgressMessage
forall ctx. FunctionState ctx -> Seq FromFunction'EgressMessage
functionStateEgressMessages
    (FunctionState s -> FunctionState s) -> Function s ()
forall s (m :: * -> *). MonadState s m => (s -> s) -> m ()
modify (\old :: FunctionState s
old -> FunctionState s
old {functionStateEgressMessages :: Seq FromFunction'EgressMessage
functionStateEgressMessages = Seq FromFunction'EgressMessage
egresses Seq FromFunction'EgressMessage
-> FromFunction'EgressMessage -> Seq FromFunction'EgressMessage
forall a. Seq a -> a -> Seq a
Seq.:|> FromFunction'EgressMessage
egressMsg})
    where
      wmsg :: ProtoSerde a
wmsg = a -> ProtoSerde a
forall a. a -> ProtoSerde a
ProtoSerde a
msg
      egressMsg :: PR.FromFunction'EgressMessage
      egressMsg :: FromFunction'EgressMessage
egressMsg =
        FromFunction'EgressMessage
forall msg. Message msg => msg
defMessage
          FromFunction'EgressMessage
-> (FromFunction'EgressMessage -> FromFunction'EgressMessage)
-> FromFunction'EgressMessage
forall s t. s -> (s -> t) -> t
& forall (f :: * -> *).
Identical f =>
LensLike' f FromFunction'EgressMessage Text
forall (f :: * -> *) s a.
(Functor f, HasField s "egressNamespace" a) =>
LensLike' f s a
PR.egressNamespace (forall (f :: * -> *).
 Identical f =>
 LensLike' f FromFunction'EgressMessage Text)
-> Text -> FromFunction'EgressMessage -> FromFunction'EgressMessage
forall s t a b. Setter s t a b -> b -> s -> t
.~ Text
namespace
          FromFunction'EgressMessage
-> (FromFunction'EgressMessage -> FromFunction'EgressMessage)
-> FromFunction'EgressMessage
forall s t. s -> (s -> t) -> t
& forall (f :: * -> *).
Identical f =>
LensLike' f FromFunction'EgressMessage Text
forall (f :: * -> *) s a.
(Functor f, HasField s "egressType" a) =>
LensLike' f s a
PR.egressType (forall (f :: * -> *).
 Identical f =>
 LensLike' f FromFunction'EgressMessage Text)
-> Text -> FromFunction'EgressMessage -> FromFunction'EgressMessage
forall s t a b. Setter s t a b -> b -> s -> t
.~ Text
egressType
          FromFunction'EgressMessage
-> (FromFunction'EgressMessage -> FromFunction'EgressMessage)
-> FromFunction'EgressMessage
forall s t. s -> (s -> t) -> t
& forall (f :: * -> *).
Identical f =>
LensLike' f FromFunction'EgressMessage TypedValue
forall (f :: * -> *) s a.
(Functor f, HasField s "argument" a) =>
LensLike' f s a
PR.argument (forall (f :: * -> *).
 Identical f =>
 LensLike' f FromFunction'EgressMessage TypedValue)
-> TypedValue
-> FromFunction'EgressMessage
-> FromFunction'EgressMessage
forall s t a b. Setter s t a b -> b -> s -> t
.~ TypedValue
tpValue
      tpValue :: TypedValue
tpValue =
        TypedValue
forall msg. Message msg => msg
defMessage
          TypedValue -> (TypedValue -> TypedValue) -> TypedValue
forall s t. s -> (s -> t) -> t
& forall (f :: * -> *). Identical f => LensLike' f TypedValue Text
forall (f :: * -> *) s a.
(Functor f, HasField s "typename" a) =>
LensLike' f s a
PR.typename (forall (f :: * -> *). Identical f => LensLike' f TypedValue Text)
-> Text -> TypedValue -> TypedValue
forall s t a b. Setter s t a b -> b -> s -> t
.~ Proxy (ProtoSerde a) -> Text
forall a. Serde a => Proxy a -> Text
tpName (ProtoSerde a -> Proxy (ProtoSerde a)
forall (f :: * -> *) a. Applicative f => a -> f a
pure ProtoSerde a
wmsg)
          TypedValue -> (TypedValue -> TypedValue) -> TypedValue
forall s t. s -> (s -> t) -> t
& forall (f :: * -> *). Identical f => LensLike' f TypedValue Bool
forall (f :: * -> *) s a.
(Functor f, HasField s "hasValue" a) =>
LensLike' f s a
PR.hasValue (forall (f :: * -> *). Identical f => LensLike' f TypedValue Bool)
-> Bool -> TypedValue -> TypedValue
forall s t a b. Setter s t a b -> b -> s -> t
.~ Bool
True          
          TypedValue -> (TypedValue -> TypedValue) -> TypedValue
forall s t. s -> (s -> t) -> t
& forall (f :: * -> *).
Identical f =>
LensLike' f TypedValue ByteString
forall (f :: * -> *) s a.
(Functor f, HasField s "value" a) =>
LensLike' f s a
PR.value (forall (f :: * -> *).
 Identical f =>
 LensLike' f TypedValue ByteString)
-> ByteString -> TypedValue -> TypedValue
forall s t a b. Setter s t a b -> b -> s -> t
.~ ProtoSerde a -> ByteString
forall a. Serde a => a -> ByteString
serializeBytes ProtoSerde a
wmsg

  sendMsg :: Address -> a -> Function s ()
sendMsg (Address' namespace :: Text
namespace funcType :: Text
funcType id' :: Text
id') msg :: a
msg = do
    Seq FromFunction'Invocation
invocations <- (FunctionState s -> Seq FromFunction'Invocation)
-> Function s (Seq FromFunction'Invocation)
forall s (m :: * -> *) a. MonadState s m => (s -> a) -> m a
gets FunctionState s -> Seq FromFunction'Invocation
forall ctx. FunctionState ctx -> Seq FromFunction'Invocation
functionStateInvocations
    (FunctionState s -> FunctionState s) -> Function s ()
forall s (m :: * -> *). MonadState s m => (s -> s) -> m ()
modify (\old :: FunctionState s
old -> FunctionState s
old {functionStateInvocations :: Seq FromFunction'Invocation
functionStateInvocations = Seq FromFunction'Invocation
invocations Seq FromFunction'Invocation
-> FromFunction'Invocation -> Seq FromFunction'Invocation
forall a. Seq a -> a -> Seq a
Seq.:|> FromFunction'Invocation
invocation})
    where
      target :: PR.Address
      target :: Address
target =
        Address
forall msg. Message msg => msg
defMessage
          Address -> (Address -> Address) -> Address
forall s t. s -> (s -> t) -> t
& forall (f :: * -> *). Identical f => LensLike' f Address Text
forall (f :: * -> *) s a.
(Functor f, HasField s "namespace" a) =>
LensLike' f s a
PR.namespace (forall (f :: * -> *). Identical f => LensLike' f Address Text)
-> Text -> Address -> Address
forall s t a b. Setter s t a b -> b -> s -> t
.~ Text
namespace
          Address -> (Address -> Address) -> Address
forall s t. s -> (s -> t) -> t
& forall (f :: * -> *). Identical f => LensLike' f Address Text
forall (f :: * -> *) s a.
(Functor f, HasField s "type'" a) =>
LensLike' f s a
PR.type' (forall (f :: * -> *). Identical f => LensLike' f Address Text)
-> Text -> Address -> Address
forall s t a b. Setter s t a b -> b -> s -> t
.~ Text
funcType
          Address -> (Address -> Address) -> Address
forall s t. s -> (s -> t) -> t
& forall (f :: * -> *). Identical f => LensLike' f Address Text
forall (f :: * -> *) s a.
(Functor f, HasField s "id" a) =>
LensLike' f s a
PR.id (forall (f :: * -> *). Identical f => LensLike' f Address Text)
-> Text -> Address -> Address
forall s t a b. Setter s t a b -> b -> s -> t
.~ Text
id'
      invocation :: PR.FromFunction'Invocation
      invocation :: FromFunction'Invocation
invocation =
        FromFunction'Invocation
forall msg. Message msg => msg
defMessage
          FromFunction'Invocation
-> (FromFunction'Invocation -> FromFunction'Invocation)
-> FromFunction'Invocation
forall s t. s -> (s -> t) -> t
& forall (f :: * -> *).
Identical f =>
LensLike' f FromFunction'Invocation Address
forall (f :: * -> *) s a.
(Functor f, HasField s "target" a) =>
LensLike' f s a
PR.target (forall (f :: * -> *).
 Identical f =>
 LensLike' f FromFunction'Invocation Address)
-> Address -> FromFunction'Invocation -> FromFunction'Invocation
forall s t a b. Setter s t a b -> b -> s -> t
.~ Address
target
          FromFunction'Invocation
-> (FromFunction'Invocation -> FromFunction'Invocation)
-> FromFunction'Invocation
forall s t. s -> (s -> t) -> t
& forall (f :: * -> *).
Identical f =>
LensLike' f FromFunction'Invocation TypedValue
forall (f :: * -> *) s a.
(Functor f, HasField s "argument" a) =>
LensLike' f s a
PR.argument (forall (f :: * -> *).
 Identical f =>
 LensLike' f FromFunction'Invocation TypedValue)
-> TypedValue -> FromFunction'Invocation -> FromFunction'Invocation
forall s t a b. Setter s t a b -> b -> s -> t
.~ TypedValue
tpValue
      tpValue :: TypedValue
tpValue =
        TypedValue
forall msg. Message msg => msg
defMessage
          TypedValue -> (TypedValue -> TypedValue) -> TypedValue
forall s t. s -> (s -> t) -> t
& forall (f :: * -> *). Identical f => LensLike' f TypedValue Text
forall (f :: * -> *) s a.
(Functor f, HasField s "typename" a) =>
LensLike' f s a
PR.typename (forall (f :: * -> *). Identical f => LensLike' f TypedValue Text)
-> Text -> TypedValue -> TypedValue
forall s t a b. Setter s t a b -> b -> s -> t
.~ Proxy a -> Text
forall a. Serde a => Proxy a -> Text
tpName (a -> Proxy a
forall (f :: * -> *) a. Applicative f => a -> f a
pure a
msg)
          TypedValue -> (TypedValue -> TypedValue) -> TypedValue
forall s t. s -> (s -> t) -> t
& forall (f :: * -> *). Identical f => LensLike' f TypedValue Bool
forall (f :: * -> *) s a.
(Functor f, HasField s "hasValue" a) =>
LensLike' f s a
PR.hasValue (forall (f :: * -> *). Identical f => LensLike' f TypedValue Bool)
-> Bool -> TypedValue -> TypedValue
forall s t a b. Setter s t a b -> b -> s -> t
.~ Bool
True
          TypedValue -> (TypedValue -> TypedValue) -> TypedValue
forall s t. s -> (s -> t) -> t
& forall (f :: * -> *).
Identical f =>
LensLike' f TypedValue ByteString
forall (f :: * -> *) s a.
(Functor f, HasField s "value" a) =>
LensLike' f s a
PR.value (forall (f :: * -> *).
 Identical f =>
 LensLike' f TypedValue ByteString)
-> ByteString -> TypedValue -> TypedValue
forall s t a b. Setter s t a b -> b -> s -> t
.~ a -> ByteString
forall a. Serde a => a -> ByteString
serializeBytes a
msg

  sendMsgDelay :: Address -> NominalDiffTime -> a -> Function s ()
sendMsgDelay (Address' namespace :: Text
namespace funcType :: Text
funcType id' :: Text
id') delay :: NominalDiffTime
delay msg :: a
msg = do
    Seq FromFunction'DelayedInvocation
invocations <- (FunctionState s -> Seq FromFunction'DelayedInvocation)
-> Function s (Seq FromFunction'DelayedInvocation)
forall s (m :: * -> *) a. MonadState s m => (s -> a) -> m a
gets FunctionState s -> Seq FromFunction'DelayedInvocation
forall ctx. FunctionState ctx -> Seq FromFunction'DelayedInvocation
functionStateDelayedInvocations
    (FunctionState s -> FunctionState s) -> Function s ()
forall s (m :: * -> *). MonadState s m => (s -> s) -> m ()
modify (\old :: FunctionState s
old -> FunctionState s
old {functionStateDelayedInvocations :: Seq FromFunction'DelayedInvocation
functionStateDelayedInvocations = Seq FromFunction'DelayedInvocation
invocations Seq FromFunction'DelayedInvocation
-> FromFunction'DelayedInvocation
-> Seq FromFunction'DelayedInvocation
forall a. Seq a -> a -> Seq a
Seq.:|> FromFunction'DelayedInvocation
invocation})
    where
      target :: PR.Address
      target :: Address
target =
        Address
forall msg. Message msg => msg
defMessage
          Address -> (Address -> Address) -> Address
forall s t. s -> (s -> t) -> t
& forall (f :: * -> *). Identical f => LensLike' f Address Text
forall (f :: * -> *) s a.
(Functor f, HasField s "namespace" a) =>
LensLike' f s a
PR.namespace (forall (f :: * -> *). Identical f => LensLike' f Address Text)
-> Text -> Address -> Address
forall s t a b. Setter s t a b -> b -> s -> t
.~ Text
namespace
          Address -> (Address -> Address) -> Address
forall s t. s -> (s -> t) -> t
& forall (f :: * -> *). Identical f => LensLike' f Address Text
forall (f :: * -> *) s a.
(Functor f, HasField s "type'" a) =>
LensLike' f s a
PR.type' (forall (f :: * -> *). Identical f => LensLike' f Address Text)
-> Text -> Address -> Address
forall s t a b. Setter s t a b -> b -> s -> t
.~ Text
funcType
          Address -> (Address -> Address) -> Address
forall s t. s -> (s -> t) -> t
& forall (f :: * -> *). Identical f => LensLike' f Address Text
forall (f :: * -> *) s a.
(Functor f, HasField s "id" a) =>
LensLike' f s a
PR.id (forall (f :: * -> *). Identical f => LensLike' f Address Text)
-> Text -> Address -> Address
forall s t a b. Setter s t a b -> b -> s -> t
.~ Text
id'
      invocation :: PR.FromFunction'DelayedInvocation
      invocation :: FromFunction'DelayedInvocation
invocation =
        FromFunction'DelayedInvocation
forall msg. Message msg => msg
defMessage
          FromFunction'DelayedInvocation
-> (FromFunction'DelayedInvocation
    -> FromFunction'DelayedInvocation)
-> FromFunction'DelayedInvocation
forall s t. s -> (s -> t) -> t
& forall (f :: * -> *).
Identical f =>
LensLike' f FromFunction'DelayedInvocation Int64
forall (f :: * -> *) s a.
(Functor f, HasField s "delayInMs" a) =>
LensLike' f s a
PR.delayInMs (forall (f :: * -> *).
 Identical f =>
 LensLike' f FromFunction'DelayedInvocation Int64)
-> Int64
-> FromFunction'DelayedInvocation
-> FromFunction'DelayedInvocation
forall s t a b. Setter s t a b -> b -> s -> t
.~ NominalDiffTime -> Int64
forall a b. (RealFrac a, Integral b) => a -> b
round (NominalDiffTime
delay NominalDiffTime -> NominalDiffTime -> NominalDiffTime
forall a. Num a => a -> a -> a
* 1000)
          FromFunction'DelayedInvocation
-> (FromFunction'DelayedInvocation
    -> FromFunction'DelayedInvocation)
-> FromFunction'DelayedInvocation
forall s t. s -> (s -> t) -> t
& forall (f :: * -> *).
Identical f =>
LensLike' f FromFunction'DelayedInvocation Address
forall (f :: * -> *) s a.
(Functor f, HasField s "target" a) =>
LensLike' f s a
PR.target (forall (f :: * -> *).
 Identical f =>
 LensLike' f FromFunction'DelayedInvocation Address)
-> Address
-> FromFunction'DelayedInvocation
-> FromFunction'DelayedInvocation
forall s t a b. Setter s t a b -> b -> s -> t
.~ Address
target
          FromFunction'DelayedInvocation
-> (FromFunction'DelayedInvocation
    -> FromFunction'DelayedInvocation)
-> FromFunction'DelayedInvocation
forall s t. s -> (s -> t) -> t
& forall (f :: * -> *).
Identical f =>
LensLike' f FromFunction'DelayedInvocation TypedValue
forall (f :: * -> *) s a.
(Functor f, HasField s "argument" a) =>
LensLike' f s a
PR.argument (forall (f :: * -> *).
 Identical f =>
 LensLike' f FromFunction'DelayedInvocation TypedValue)
-> TypedValue
-> FromFunction'DelayedInvocation
-> FromFunction'DelayedInvocation
forall s t a b. Setter s t a b -> b -> s -> t
.~ TypedValue
tpValue
      tpValue :: TypedValue
tpValue =
        TypedValue
forall msg. Message msg => msg
defMessage
          TypedValue -> (TypedValue -> TypedValue) -> TypedValue
forall s t. s -> (s -> t) -> t
& forall (f :: * -> *). Identical f => LensLike' f TypedValue Text
forall (f :: * -> *) s a.
(Functor f, HasField s "typename" a) =>
LensLike' f s a
PR.typename (forall (f :: * -> *). Identical f => LensLike' f TypedValue Text)
-> Text -> TypedValue -> TypedValue
forall s t a b. Setter s t a b -> b -> s -> t
.~ Proxy a -> Text
forall a. Serde a => Proxy a -> Text
tpName (a -> Proxy a
forall (f :: * -> *) a. Applicative f => a -> f a
pure a
msg)
          TypedValue -> (TypedValue -> TypedValue) -> TypedValue
forall s t. s -> (s -> t) -> t
& forall (f :: * -> *). Identical f => LensLike' f TypedValue Bool
forall (f :: * -> *) s a.
(Functor f, HasField s "hasValue" a) =>
LensLike' f s a
PR.hasValue (forall (f :: * -> *). Identical f => LensLike' f TypedValue Bool)
-> Bool -> TypedValue -> TypedValue
forall s t a b. Setter s t a b -> b -> s -> t
.~ Bool
True
          TypedValue -> (TypedValue -> TypedValue) -> TypedValue
forall s t. s -> (s -> t) -> t
& forall (f :: * -> *).
Identical f =>
LensLike' f TypedValue ByteString
forall (f :: * -> *) s a.
(Functor f, HasField s "value" a) =>
LensLike' f s a
PR.value (forall (f :: * -> *).
 Identical f =>
 LensLike' f TypedValue ByteString)
-> ByteString -> TypedValue -> TypedValue
forall s t a b. Setter s t a b -> b -> s -> t
.~ a -> ByteString
forall a. Serde a => a -> ByteString
serializeBytes a
msg

-- | Convinience function to send protobuf messages
sendProtoMsg :: (StatefulFunc s m, Message a) => Address -> a -> m ()
sendProtoMsg :: Address -> a -> m ()
sendProtoMsg addr :: Address
addr = Address -> ProtoSerde a -> m ()
forall s (m :: * -> *) a.
(StatefulFunc s m, Serde a) =>
Address -> a -> m ()
sendMsg Address
addr (ProtoSerde a -> m ()) -> (a -> ProtoSerde a) -> a -> m ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. a -> ProtoSerde a
forall a. a -> ProtoSerde a
ProtoSerde

-- | Convinience function to send delayed protobuf messages
sendProtoMsgDelay :: (StatefulFunc s m, Message a) => Address -> NominalDiffTime -> a -> m ()
sendProtoMsgDelay :: Address -> NominalDiffTime -> a -> m ()
sendProtoMsgDelay addr :: Address
addr delay :: NominalDiffTime
delay = Address -> NominalDiffTime -> ProtoSerde a -> m ()
forall s (m :: * -> *) a.
(StatefulFunc s m, Serde a) =>
Address -> NominalDiffTime -> a -> m ()
sendMsgDelay Address
addr NominalDiffTime
delay (ProtoSerde a -> m ()) -> (a -> ProtoSerde a) -> a -> m ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. a -> ProtoSerde a
forall a. a -> ProtoSerde a
ProtoSerde

data FlinkError
  = MissingInvocationBatch
  | ProtodeserializeBytesError String
  | InvalidTypePassedError Text Text
  | EmptyArgumentPassed
  | StateDecodeError String
  | MessageDecodeError String
  | ProtoMessageDecodeError UnpackError
  | NoSuchFunction (Text, Text)
  deriving (Int -> FlinkError -> ShowS
[FlinkError] -> ShowS
FlinkError -> String
(Int -> FlinkError -> ShowS)
-> (FlinkError -> String)
-> ([FlinkError] -> ShowS)
-> Show FlinkError
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
showList :: [FlinkError] -> ShowS
$cshowList :: [FlinkError] -> ShowS
show :: FlinkError -> String
$cshow :: FlinkError -> String
showsPrec :: Int -> FlinkError -> ShowS
$cshowsPrec :: Int -> FlinkError -> ShowS
Show, FlinkError -> FlinkError -> Bool
(FlinkError -> FlinkError -> Bool)
-> (FlinkError -> FlinkError -> Bool) -> Eq FlinkError
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
/= :: FlinkError -> FlinkError -> Bool
$c/= :: FlinkError -> FlinkError -> Bool
== :: FlinkError -> FlinkError -> Bool
$c== :: FlinkError -> FlinkError -> Bool
Eq)

-- | Convenience function for wrapping state in newtype for JSON serialization
jsonState :: Json s => Function s () -> Function (JsonSerde s) ()
jsonState :: Function s () -> Function (JsonSerde s) ()
jsonState = Function s () -> Function (JsonSerde s) ()
forall a b. Coercible a b => a -> b
coerce

-- | Convenience function for wrapping state in newtype for Protobuf serialization
protoState :: Message s => Function s () -> Function (ProtoSerde s) ()
protoState :: Function s () -> Function (ProtoSerde s) ()
protoState = Function s () -> Function (ProtoSerde s) ()
forall a b. Coercible a b => a -> b
coerce

-- | Tries to unwrap typed value, possibly throwing FlinkError on broken input
unwrapA :: forall a m. (Serde a, MonadError FlinkError m) => PR.TypedValue -> m (Maybe a)
unwrapA :: TypedValue -> m (Maybe a)
unwrapA arg :: TypedValue
arg = let
      atp :: Text
atp = TypedValue
arg TypedValue -> FoldLike Text TypedValue TypedValue Text Text -> Text
forall s a t b. s -> FoldLike a s t a b -> a
^. FoldLike Text TypedValue TypedValue Text Text
forall (f :: * -> *) s a.
(Functor f, HasField s "typename" a) =>
LensLike' f s a
PR.typename
      ctp :: Text
ctp = Proxy a -> Text
forall a. Serde a => Proxy a -> Text
tpName @a Proxy a
forall k (t :: k). Proxy t
Proxy
      in if Bool -> Bool
not (TypedValue
argTypedValue -> FoldLike Bool TypedValue TypedValue Bool Bool -> Bool
forall s a t b. s -> FoldLike a s t a b -> a
^. FoldLike Bool TypedValue TypedValue Bool Bool
forall (f :: * -> *) s a.
(Functor f, HasField s "hasValue" a) =>
LensLike' f s a
PR.hasValue) then Maybe a -> m (Maybe a)
forall (f :: * -> *) a. Applicative f => a -> f a
pure Maybe a
forall a. Maybe a
Nothing else
           if Text
atp Text -> Text -> Bool
forall a. Eq a => a -> a -> Bool
/= Text
ctp
              then FlinkError -> m (Maybe a)
forall e (m :: * -> *) a. MonadError e m => e -> m a
throwError (Text -> Text -> FlinkError
InvalidTypePassedError Text
ctp Text
atp)
              else a -> Maybe a
forall a. a -> Maybe a
Just (a -> Maybe a) -> m a -> m (Maybe a)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> (Either FlinkError a -> m a
forall e (m :: * -> *) a. MonadError e m => Either e a -> m a
liftEither (Either FlinkError a -> m a)
-> (Either String a -> Either FlinkError a)
-> Either String a
-> m a
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (String -> FlinkError) -> Either String a -> Either FlinkError a
forall a c b. (a -> c) -> Either a b -> Either c b
mapLeft String -> FlinkError
MessageDecodeError (Either String a -> m a) -> Either String a -> m a
forall a b. (a -> b) -> a -> b
$ ByteString -> Either String a
forall a. Serde a => ByteString -> Either String a
deserializeBytes @a (TypedValue
arg TypedValue
-> FoldLike ByteString TypedValue TypedValue ByteString ByteString
-> ByteString
forall s a t b. s -> FoldLike a s t a b -> a
^. FoldLike ByteString TypedValue TypedValue ByteString ByteString
forall (f :: * -> *) s a.
(Functor f, HasField s "value" a) =>
LensLike' f s a
PR.value))

-- | Takes a function taking an arbitrary state type and converts it to take 'ByteString's.
-- This allows each function in the 'FunctionTable' to take its own individual type of state and just expose
-- a function accepting 'ByteString' to the library code.
flinkWrapper :: forall a s. (Serde a, Serde s) => s -> Expiration -> (a -> Function s ()) -> FuncExec
flinkWrapper :: s -> Expiration -> (a -> Function s ()) -> FuncExec
flinkWrapper s0 :: s
s0 expr :: Expiration
expr func :: a -> Function s ()
func env :: Env
env invocationBatch :: ToFunction'InvocationBatchRequest
invocationBatch = ExceptT FlinkError IO FuncRes -> IO (Either FlinkError FuncRes)
forall e (m :: * -> *) a. ExceptT e m a -> m (Either e a)
runExceptT (ExceptT FlinkError IO FuncRes -> IO (Either FlinkError FuncRes))
-> ExceptT FlinkError IO FuncRes -> IO (Either FlinkError FuncRes)
forall a b. (a -> b) -> a -> b
$ do
  (eiRes :: Either FlinkError FuncRes
eiRes, _) <- IO (Either FlinkError FuncRes, FunctionState s)
-> ExceptT
     FlinkError IO (Either FlinkError FuncRes, FunctionState s)
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (Either FlinkError FuncRes, FunctionState s)
 -> ExceptT
      FlinkError IO (Either FlinkError FuncRes, FunctionState s))
-> IO (Either FlinkError FuncRes, FunctionState s)
-> ExceptT
     FlinkError IO (Either FlinkError FuncRes, FunctionState s)
forall a b. (a -> b) -> a -> b
$ FunctionState s -> IO (Either FlinkError FuncRes, FunctionState s)
runner (s -> FunctionState s
forall a. a -> FunctionState a
newState s
s0)
  Either FlinkError FuncRes -> ExceptT FlinkError IO FuncRes
forall e (m :: * -> *) a. MonadError e m => Either e a -> m a
liftEither Either FlinkError FuncRes
eiRes
  where
    passedArgs :: [TypedValue]
passedArgs = ToFunction'InvocationBatchRequest
invocationBatch ToFunction'InvocationBatchRequest
-> Fold
     ToFunction'InvocationBatchRequest
     ToFunction'InvocationBatchRequest
     TypedValue
     TypedValue
-> [TypedValue]
forall s t a b. s -> Fold s t a b -> [a]
^.. LensLike'
  f ToFunction'InvocationBatchRequest [ToFunction'Invocation]
forall (f :: * -> *) s a.
(Functor f, HasField s "invocations" a) =>
LensLike' f s a
PR.invocations LensLike'
  f ToFunction'InvocationBatchRequest [ToFunction'Invocation]
-> ((TypedValue -> f TypedValue)
    -> [ToFunction'Invocation] -> f [ToFunction'Invocation])
-> (TypedValue -> f TypedValue)
-> ToFunction'InvocationBatchRequest
-> f ToFunction'InvocationBatchRequest
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (ToFunction'Invocation -> f ToFunction'Invocation)
-> [ToFunction'Invocation] -> f [ToFunction'Invocation]
forall (f :: * -> *) a b.
Traversable f =>
Traversal (f a) (f b) a b
traversed ((ToFunction'Invocation -> f ToFunction'Invocation)
 -> [ToFunction'Invocation] -> f [ToFunction'Invocation])
-> ((TypedValue -> f TypedValue)
    -> ToFunction'Invocation -> f ToFunction'Invocation)
-> (TypedValue -> f TypedValue)
-> [ToFunction'Invocation]
-> f [ToFunction'Invocation]
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (TypedValue -> f TypedValue)
-> ToFunction'Invocation -> f ToFunction'Invocation
forall (f :: * -> *) s a.
(Functor f, HasField s "argument" a) =>
LensLike' f s a
PR.argument
    mbInitCtx :: Maybe TypedValue
mbInitCtx = (LensLike'
  f ToFunction'InvocationBatchRequest [ToFunction'PersistedValue]
forall (f :: * -> *) s a.
(Functor f, HasField s "state" a) =>
LensLike' f s a
PR.state LensLike'
  f ToFunction'InvocationBatchRequest [ToFunction'PersistedValue]
-> ((TypedValue -> f TypedValue)
    -> [ToFunction'PersistedValue] -> f [ToFunction'PersistedValue])
-> (TypedValue -> f TypedValue)
-> ToFunction'InvocationBatchRequest
-> f ToFunction'InvocationBatchRequest
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (ToFunction'PersistedValue -> f ToFunction'PersistedValue)
-> [ToFunction'PersistedValue] -> f [ToFunction'PersistedValue]
forall (f :: * -> *) a b.
Traversable f =>
Traversal (f a) (f b) a b
traversed ((ToFunction'PersistedValue -> f ToFunction'PersistedValue)
 -> [ToFunction'PersistedValue] -> f [ToFunction'PersistedValue])
-> ((TypedValue -> f TypedValue)
    -> ToFunction'PersistedValue -> f ToFunction'PersistedValue)
-> (TypedValue -> f TypedValue)
-> [ToFunction'PersistedValue]
-> f [ToFunction'PersistedValue]
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (ToFunction'PersistedValue -> Bool)
-> Traversal' ToFunction'PersistedValue ToFunction'PersistedValue
forall a. (a -> Bool) -> Traversal' a a
filtered ((Text -> Text -> Bool
forall a. Eq a => a -> a -> Bool
== "flink_state") (Text -> Bool)
-> (ToFunction'PersistedValue -> Text)
-> ToFunction'PersistedValue
-> Bool
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (ToFunction'PersistedValue
-> FoldLike
     Text ToFunction'PersistedValue ToFunction'PersistedValue Text Text
-> Text
forall s a t b. s -> FoldLike a s t a b -> a
^. FoldLike
  Text ToFunction'PersistedValue ToFunction'PersistedValue Text Text
forall (f :: * -> *) s a.
(Functor f, HasField s "stateName" a) =>
LensLike' f s a
PR.stateName)) ((ToFunction'PersistedValue -> f ToFunction'PersistedValue)
 -> ToFunction'PersistedValue -> f ToFunction'PersistedValue)
-> ((TypedValue -> f TypedValue)
    -> ToFunction'PersistedValue -> f ToFunction'PersistedValue)
-> (TypedValue -> f TypedValue)
-> ToFunction'PersistedValue
-> f ToFunction'PersistedValue
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (TypedValue -> f TypedValue)
-> ToFunction'PersistedValue -> f ToFunction'PersistedValue
forall (f :: * -> *) s a.
(Functor f, HasField s "stateValue" a) =>
LensLike' f s a
PR.stateValue) 
      Fold
  ToFunction'InvocationBatchRequest
  ToFunction'InvocationBatchRequest
  TypedValue
  TypedValue
-> ToFunction'InvocationBatchRequest -> Maybe TypedValue
forall s t a b. Fold s t a b -> s -> Maybe a
`firstOf` ToFunction'InvocationBatchRequest
invocationBatch
    runWithCtx :: Function s FuncRes
runWithCtx = do
      case Maybe TypedValue
mbInitCtx of 
        Nothing -> FuncRes -> Function s FuncRes
forall (m :: * -> *) a. Monad m => a -> m a
return (FuncRes -> Function s FuncRes) -> FuncRes -> Function s FuncRes
forall a b. (a -> b) -> a -> b
$ Expiration -> Text -> FuncRes
IncompleteContext Expiration
expr (Proxy s -> Text
forall a. Serde a => Proxy a -> Text
tpName @s Proxy s
forall k (t :: k). Proxy t
Proxy) -- if state was not propagated to the function - shorcut to incomplete context reponse
        Just tv :: TypedValue
tv -> do 
          Maybe s
mbCtx <- TypedValue -> Function s (Maybe s)
forall a (m :: * -> *).
(Serde a, MonadError FlinkError m) =>
TypedValue -> m (Maybe a)
unwrapA @s TypedValue
tv
          case Maybe s
mbCtx of
            Nothing -> () -> Function s ()
forall (f :: * -> *) a. Applicative f => a -> f a
pure () -- if null state value was propagated
            Just s1 :: s
s1 -> s -> Function s ()
forall s (m :: * -> *). StatefulFunc s m => s -> m ()
setInitialCtx s
s1
          [Maybe a]
mbArgs <- (TypedValue -> Function s (Maybe a))
-> [TypedValue] -> Function s [Maybe a]
forall (t :: * -> *) (f :: * -> *) a b.
(Traversable t, Applicative f) =>
(a -> f b) -> t a -> f (t b)
traverse (forall a (m :: * -> *).
(Serde a, MonadError FlinkError m) =>
TypedValue -> m (Maybe a)
forall (m :: * -> *).
(Serde a, MonadError FlinkError m) =>
TypedValue -> m (Maybe a)
unwrapA @a) [TypedValue]
passedArgs
          [a]
args <- (Maybe a -> Function s a) -> [Maybe a] -> Function s [a]
forall (t :: * -> *) (f :: * -> *) a b.
(Traversable t, Applicative f) =>
(a -> f b) -> t a -> f (t b)
traverse (Function s a -> (a -> Function s a) -> Maybe a -> Function s a
forall b a. b -> (a -> b) -> Maybe a -> b
maybe (FlinkError -> Function s a
forall e (m :: * -> *) a. MonadError e m => e -> m a
throwError FlinkError
EmptyArgumentPassed) a -> Function s a
forall (f :: * -> *) a. Applicative f => a -> f a
pure) [Maybe a]
mbArgs
          (a -> Function s ()) -> [a] -> Function s ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
(a -> m b) -> t a -> m ()
mapM_ a -> Function s ()
func [a]
args
          (FunctionState s -> FuncRes) -> Function s FuncRes
forall s (m :: * -> *) a. MonadState s m => (s -> a) -> m a
gets (FunctionState TypedValue -> FuncRes
UpdatedState (FunctionState TypedValue -> FuncRes)
-> (FunctionState s -> FunctionState TypedValue)
-> FunctionState s
-> FuncRes
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (s -> TypedValue) -> FunctionState s -> FunctionState TypedValue
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap s -> TypedValue
forall t a.
(Message t, Serde a, HasField t "typename" Text,
 HasField t "value" ByteString) =>
a -> t
outS)
    runner :: FunctionState s -> IO (Either FlinkError FuncRes, FunctionState s)
runner state :: FunctionState s
state = ReaderT Env IO (Either FlinkError FuncRes, FunctionState s)
-> Env -> IO (Either FlinkError FuncRes, FunctionState s)
forall r (m :: * -> *) a. ReaderT r m a -> r -> m a
runReaderT (StateT
  (FunctionState s) (ReaderT Env IO) (Either FlinkError FuncRes)
-> FunctionState s
-> ReaderT Env IO (Either FlinkError FuncRes, FunctionState s)
forall s (m :: * -> *) a. StateT s m a -> s -> m (a, s)
runStateT (ExceptT
  FlinkError (StateT (FunctionState s) (ReaderT Env IO)) FuncRes
-> StateT
     (FunctionState s) (ReaderT Env IO) (Either FlinkError FuncRes)
forall e (m :: * -> *) a. ExceptT e m a -> m (Either e a)
runExceptT (ExceptT
   FlinkError (StateT (FunctionState s) (ReaderT Env IO)) FuncRes
 -> StateT
      (FunctionState s) (ReaderT Env IO) (Either FlinkError FuncRes))
-> ExceptT
     FlinkError (StateT (FunctionState s) (ReaderT Env IO)) FuncRes
-> StateT
     (FunctionState s) (ReaderT Env IO) (Either FlinkError FuncRes)
forall a b. (a -> b) -> a -> b
$ Function s FuncRes
-> ExceptT
     FlinkError (StateT (FunctionState s) (ReaderT Env IO)) FuncRes
forall s a.
Function s a
-> ExceptT FlinkError (StateT (FunctionState s) (ReaderT Env IO)) a
runFunction Function s FuncRes
runWithCtx) FunctionState s
state) Env
env
    outS :: a -> t
outS fstate :: a
fstate = t
forall msg. Message msg => msg
defMessage
        t -> (t -> t) -> t
forall s t. s -> (s -> t) -> t
& forall (f :: * -> *). Identical f => LensLike' f t Text
forall (f :: * -> *) s a.
(Functor f, HasField s "typename" a) =>
LensLike' f s a
PR.typename (forall (f :: * -> *). Identical f => LensLike' f t Text)
-> Text -> t -> t
forall s t a b. Setter s t a b -> b -> s -> t
.~ Proxy a -> Text
forall a. Serde a => Proxy a -> Text
tpName (a -> Proxy a
forall (f :: * -> *) a. Applicative f => a -> f a
pure a
fstate)
        t -> (t -> t) -> t
forall s t. s -> (s -> t) -> t
& forall (f :: * -> *). Identical f => LensLike' f t ByteString
forall (f :: * -> *) s a.
(Functor f, HasField s "value" a) =>
LensLike' f s a
PR.value (forall (f :: * -> *). Identical f => LensLike' f t ByteString)
-> ByteString -> t -> t
forall s t a b. Setter s t a b -> b -> s -> t
.~ a -> ByteString
forall a. Serde a => a -> ByteString
serializeBytes a
fstate

createFlinkResp :: FuncRes -> FromFunction
createFlinkResp :: FuncRes -> FromFunction
createFlinkResp (UpdatedState (FunctionState state :: TypedValue
state mutated :: Bool
mutated invocations :: Seq FromFunction'Invocation
invocations delayedInvocations :: Seq FromFunction'DelayedInvocation
delayedInvocations egresses :: Seq FromFunction'EgressMessage
egresses)) =
  FromFunction
forall msg. Message msg => msg
defMessage FromFunction -> (FromFunction -> FromFunction) -> FromFunction
forall s t. s -> (s -> t) -> t
& forall (f :: * -> *).
Identical f =>
LensLike' f FromFunction FromFunction'InvocationResponse
forall (f :: * -> *) s a.
(Functor f, HasField s "invocationResult" a) =>
LensLike' f s a
PR.invocationResult
    (forall (f :: * -> *).
 Identical f =>
 LensLike' f FromFunction FromFunction'InvocationResponse)
-> FromFunction'InvocationResponse -> FromFunction -> FromFunction
forall s t a b. Setter s t a b -> b -> s -> t
.~ ( FromFunction'InvocationResponse
forall msg. Message msg => msg
defMessage
           FromFunction'InvocationResponse
-> (FromFunction'InvocationResponse
    -> FromFunction'InvocationResponse)
-> FromFunction'InvocationResponse
forall s t. s -> (s -> t) -> t
& forall (f :: * -> *).
Identical f =>
LensLike'
  f
  FromFunction'InvocationResponse
  [FromFunction'PersistedValueMutation]
forall (f :: * -> *) s a.
(Functor f, HasField s "stateMutations" a) =>
LensLike' f s a
PR.stateMutations (forall (f :: * -> *).
 Identical f =>
 LensLike'
   f
   FromFunction'InvocationResponse
   [FromFunction'PersistedValueMutation])
-> [FromFunction'PersistedValueMutation]
-> FromFunction'InvocationResponse
-> FromFunction'InvocationResponse
forall s t a b. Setter s t a b -> b -> s -> t
.~ [FromFunction'PersistedValueMutation]
-> [FromFunction'PersistedValueMutation]
forall (t :: * -> *) a. Foldable t => t a -> [a]
toList [FromFunction'PersistedValueMutation]
stateMutations
           FromFunction'InvocationResponse
-> (FromFunction'InvocationResponse
    -> FromFunction'InvocationResponse)
-> FromFunction'InvocationResponse
forall s t. s -> (s -> t) -> t
& forall (f :: * -> *).
Identical f =>
LensLike'
  f FromFunction'InvocationResponse [FromFunction'Invocation]
forall (f :: * -> *) s a.
(Functor f, HasField s "outgoingMessages" a) =>
LensLike' f s a
PR.outgoingMessages (forall (f :: * -> *).
 Identical f =>
 LensLike'
   f FromFunction'InvocationResponse [FromFunction'Invocation])
-> [FromFunction'Invocation]
-> FromFunction'InvocationResponse
-> FromFunction'InvocationResponse
forall s t a b. Setter s t a b -> b -> s -> t
.~ Seq FromFunction'Invocation -> [FromFunction'Invocation]
forall (t :: * -> *) a. Foldable t => t a -> [a]
toList Seq FromFunction'Invocation
invocations
           FromFunction'InvocationResponse
-> (FromFunction'InvocationResponse
    -> FromFunction'InvocationResponse)
-> FromFunction'InvocationResponse
forall s t. s -> (s -> t) -> t
& forall (f :: * -> *).
Identical f =>
LensLike'
  f FromFunction'InvocationResponse [FromFunction'DelayedInvocation]
forall (f :: * -> *) s a.
(Functor f, HasField s "delayedInvocations" a) =>
LensLike' f s a
PR.delayedInvocations (forall (f :: * -> *).
 Identical f =>
 LensLike'
   f FromFunction'InvocationResponse [FromFunction'DelayedInvocation])
-> [FromFunction'DelayedInvocation]
-> FromFunction'InvocationResponse
-> FromFunction'InvocationResponse
forall s t a b. Setter s t a b -> b -> s -> t
.~ Seq FromFunction'DelayedInvocation
-> [FromFunction'DelayedInvocation]
forall (t :: * -> *) a. Foldable t => t a -> [a]
toList Seq FromFunction'DelayedInvocation
delayedInvocations
           FromFunction'InvocationResponse
-> (FromFunction'InvocationResponse
    -> FromFunction'InvocationResponse)
-> FromFunction'InvocationResponse
forall s t. s -> (s -> t) -> t
& forall (f :: * -> *).
Identical f =>
LensLike'
  f FromFunction'InvocationResponse [FromFunction'EgressMessage]
forall (f :: * -> *) s a.
(Functor f, HasField s "outgoingEgresses" a) =>
LensLike' f s a
PR.outgoingEgresses (forall (f :: * -> *).
 Identical f =>
 LensLike'
   f FromFunction'InvocationResponse [FromFunction'EgressMessage])
-> [FromFunction'EgressMessage]
-> FromFunction'InvocationResponse
-> FromFunction'InvocationResponse
forall s t a b. Setter s t a b -> b -> s -> t
.~ Seq FromFunction'EgressMessage -> [FromFunction'EgressMessage]
forall (t :: * -> *) a. Foldable t => t a -> [a]
toList Seq FromFunction'EgressMessage
egresses
       )
  where
    stateMutations :: [PR.FromFunction'PersistedValueMutation]
    stateMutations :: [FromFunction'PersistedValueMutation]
stateMutations =
      [ FromFunction'PersistedValueMutation
forall msg. Message msg => msg
defMessage
          FromFunction'PersistedValueMutation
-> (FromFunction'PersistedValueMutation
    -> FromFunction'PersistedValueMutation)
-> FromFunction'PersistedValueMutation
forall s t. s -> (s -> t) -> t
& forall (f :: * -> *).
Identical f =>
LensLike'
  f
  FromFunction'PersistedValueMutation
  FromFunction'PersistedValueMutation'MutationType
forall (f :: * -> *) s a.
(Functor f, HasField s "mutationType" a) =>
LensLike' f s a
PR.mutationType (forall (f :: * -> *).
 Identical f =>
 LensLike'
   f
   FromFunction'PersistedValueMutation
   FromFunction'PersistedValueMutation'MutationType)
-> FromFunction'PersistedValueMutation'MutationType
-> FromFunction'PersistedValueMutation
-> FromFunction'PersistedValueMutation
forall s t a b. Setter s t a b -> b -> s -> t
.~ FromFunction'PersistedValueMutation'MutationType
PR.FromFunction'PersistedValueMutation'MODIFY
          FromFunction'PersistedValueMutation
-> (FromFunction'PersistedValueMutation
    -> FromFunction'PersistedValueMutation)
-> FromFunction'PersistedValueMutation
forall s t. s -> (s -> t) -> t
& forall (f :: * -> *).
Identical f =>
LensLike' f FromFunction'PersistedValueMutation Text
forall (f :: * -> *) s a.
(Functor f, HasField s "stateName" a) =>
LensLike' f s a
PR.stateName (forall (f :: * -> *).
 Identical f =>
 LensLike' f FromFunction'PersistedValueMutation Text)
-> Text
-> FromFunction'PersistedValueMutation
-> FromFunction'PersistedValueMutation
forall s t a b. Setter s t a b -> b -> s -> t
.~ "flink_state"
          FromFunction'PersistedValueMutation
-> (FromFunction'PersistedValueMutation
    -> FromFunction'PersistedValueMutation)
-> FromFunction'PersistedValueMutation
forall s t. s -> (s -> t) -> t
& forall (f :: * -> *).
Identical f =>
LensLike' f FromFunction'PersistedValueMutation TypedValue
forall (f :: * -> *) s a.
(Functor f, HasField s "stateValue" a) =>
LensLike' f s a
PR.stateValue (forall (f :: * -> *).
 Identical f =>
 LensLike' f FromFunction'PersistedValueMutation TypedValue)
-> TypedValue
-> FromFunction'PersistedValueMutation
-> FromFunction'PersistedValueMutation
forall s t a b. Setter s t a b -> b -> s -> t
.~ TypedValue
state
        | Bool
mutated
      ]
createFlinkResp (IncompleteContext (Expiration mode :: ExpirationMode
mode expireTime :: NominalDiffTime
expireTime) typeName :: Text
typeName) = 
  FromFunction
forall msg. Message msg => msg
defMessage FromFunction -> (FromFunction -> FromFunction) -> FromFunction
forall s t. s -> (s -> t) -> t
& forall (f :: * -> *).
Identical f =>
LensLike' f FromFunction FromFunction'IncompleteInvocationContext
forall (f :: * -> *) s a.
(Functor f, HasField s "incompleteInvocationContext" a) =>
LensLike' f s a
PR.incompleteInvocationContext (forall (f :: * -> *).
 Identical f =>
 LensLike' f FromFunction FromFunction'IncompleteInvocationContext)
-> FromFunction'IncompleteInvocationContext
-> FromFunction
-> FromFunction
forall s t a b. Setter s t a b -> b -> s -> t
.~ ( 
    FromFunction'IncompleteInvocationContext
forall msg. Message msg => msg
defMessage FromFunction'IncompleteInvocationContext
-> (FromFunction'IncompleteInvocationContext
    -> FromFunction'IncompleteInvocationContext)
-> FromFunction'IncompleteInvocationContext
forall s t. s -> (s -> t) -> t
& forall (f :: * -> *).
Identical f =>
LensLike'
  f
  FromFunction'IncompleteInvocationContext
  [FromFunction'PersistedValueSpec]
forall (f :: * -> *) s a.
(Functor f, HasField s "missingValues" a) =>
LensLike' f s a
PR.missingValues (forall (f :: * -> *).
 Identical f =>
 LensLike'
   f
   FromFunction'IncompleteInvocationContext
   [FromFunction'PersistedValueSpec])
-> [FromFunction'PersistedValueSpec]
-> FromFunction'IncompleteInvocationContext
-> FromFunction'IncompleteInvocationContext
forall s t a b. Setter s t a b -> b -> s -> t
.~ [
      FromFunction'PersistedValueSpec
forall msg. Message msg => msg
defMessage
        FromFunction'PersistedValueSpec
-> (FromFunction'PersistedValueSpec
    -> FromFunction'PersistedValueSpec)
-> FromFunction'PersistedValueSpec
forall s t. s -> (s -> t) -> t
& forall (f :: * -> *).
Identical f =>
LensLike' f FromFunction'PersistedValueSpec Text
forall (f :: * -> *) s a.
(Functor f, HasField s "stateName" a) =>
LensLike' f s a
PR.stateName (forall (f :: * -> *).
 Identical f =>
 LensLike' f FromFunction'PersistedValueSpec Text)
-> Text
-> FromFunction'PersistedValueSpec
-> FromFunction'PersistedValueSpec
forall s t a b. Setter s t a b -> b -> s -> t
.~ "flink_state"
        FromFunction'PersistedValueSpec
-> (FromFunction'PersistedValueSpec
    -> FromFunction'PersistedValueSpec)
-> FromFunction'PersistedValueSpec
forall s t. s -> (s -> t) -> t
& forall (f :: * -> *).
Identical f =>
LensLike' f FromFunction'PersistedValueSpec Text
forall (f :: * -> *) s a.
(Functor f, HasField s "typeTypename" a) =>
LensLike' f s a
PR.typeTypename (forall (f :: * -> *).
 Identical f =>
 LensLike' f FromFunction'PersistedValueSpec Text)
-> Text
-> FromFunction'PersistedValueSpec
-> FromFunction'PersistedValueSpec
forall s t a b. Setter s t a b -> b -> s -> t
.~ Text
typeName
        FromFunction'PersistedValueSpec
-> (FromFunction'PersistedValueSpec
    -> FromFunction'PersistedValueSpec)
-> FromFunction'PersistedValueSpec
forall s t. s -> (s -> t) -> t
& forall (f :: * -> *).
Identical f =>
LensLike'
  f FromFunction'PersistedValueSpec FromFunction'ExpirationSpec
forall (f :: * -> *) s a.
(Functor f, HasField s "expirationSpec" a) =>
LensLike' f s a
PR.expirationSpec (forall (f :: * -> *).
 Identical f =>
 LensLike'
   f FromFunction'PersistedValueSpec FromFunction'ExpirationSpec)
-> FromFunction'ExpirationSpec
-> FromFunction'PersistedValueSpec
-> FromFunction'PersistedValueSpec
forall s t a b. Setter s t a b -> b -> s -> t
.~ (
          FromFunction'ExpirationSpec
forall msg. Message msg => msg
defMessage 
            FromFunction'ExpirationSpec
-> (FromFunction'ExpirationSpec -> FromFunction'ExpirationSpec)
-> FromFunction'ExpirationSpec
forall s t. s -> (s -> t) -> t
& forall (f :: * -> *).
Identical f =>
LensLike' f FromFunction'ExpirationSpec Int64
forall (f :: * -> *) s a.
(Functor f, HasField s "expireAfterMillis" a) =>
LensLike' f s a
PR.expireAfterMillis (forall (f :: * -> *).
 Identical f =>
 LensLike' f FromFunction'ExpirationSpec Int64)
-> Int64
-> FromFunction'ExpirationSpec
-> FromFunction'ExpirationSpec
forall s t a b. Setter s t a b -> b -> s -> t
.~ NominalDiffTime -> Int64
forall a b. (RealFrac a, Integral b) => a -> b
round (NominalDiffTime
expireTime NominalDiffTime -> NominalDiffTime -> NominalDiffTime
forall a. Num a => a -> a -> a
* 1000.0)
            FromFunction'ExpirationSpec
-> (FromFunction'ExpirationSpec -> FromFunction'ExpirationSpec)
-> FromFunction'ExpirationSpec
forall s t. s -> (s -> t) -> t
& forall (f :: * -> *).
Identical f =>
LensLike'
  f
  FromFunction'ExpirationSpec
  FromFunction'ExpirationSpec'ExpireMode
forall (f :: * -> *) s a.
(Functor f, HasField s "mode" a) =>
LensLike' f s a
PR.mode (forall (f :: * -> *).
 Identical f =>
 LensLike'
   f
   FromFunction'ExpirationSpec
   FromFunction'ExpirationSpec'ExpireMode)
-> FromFunction'ExpirationSpec'ExpireMode
-> FromFunction'ExpirationSpec
-> FromFunction'ExpirationSpec
forall s t a b. Setter s t a b -> b -> s -> t
.~ ExpirationMode -> FromFunction'ExpirationSpec'ExpireMode
pbmode ExpirationMode
mode)
      ])
  where pbmode :: ExpirationMode -> FromFunction'ExpirationSpec'ExpireMode
pbmode NONE = FromFunction'ExpirationSpec'ExpireMode
PR.FromFunction'ExpirationSpec'NONE 
        pbmode AFTER_CALL = FromFunction'ExpirationSpec'ExpireMode
PR.FromFunction'ExpirationSpec'AFTER_INVOKE  
        pbmode AFTER_WRITE = FromFunction'ExpirationSpec'ExpireMode
PR.FromFunction'ExpirationSpec'AFTER_WRITE  

type FlinkApi =
  "statefun" :> ReqBody '[Proto] ToFunction :> Post '[Proto] FromFunction

flinkApi :: Proxy FlinkApi
flinkApi :: Proxy FlinkApi
flinkApi = Proxy FlinkApi
forall k (t :: k). Proxy t
Proxy

-- | Takes function table and creates a wai 'Application' to serve flink requests
createApp :: FunctionTable -> Application
createApp :: FunctionTable -> Application
createApp funcs :: FunctionTable
funcs = Proxy FlinkApi -> Server FlinkApi -> Application
forall api.
HasServer api '[] =>
Proxy api -> Server api -> Application
serve Proxy FlinkApi
flinkApi (FunctionTable -> Server FlinkApi
flinkServer FunctionTable
funcs)

-- | Takes function table and creates a servant 'Server' to serve flink requests
flinkServer :: FunctionTable -> Server FlinkApi
flinkServer :: FunctionTable -> Server FlinkApi
flinkServer functions :: FunctionTable
functions toFunction :: ToFunction
toFunction = do
  ToFunction'InvocationBatchRequest
batch <- ToFunction -> Handler ToFunction'InvocationBatchRequest
forall (m :: * -> *) t.
(MonadError ServerError m,
 HasField t "maybe'request" (Maybe ToFunction'Request)) =>
t -> m ToFunction'InvocationBatchRequest
getBatch ToFunction
toFunction
  (function :: FuncExec
function, (namespace :: Text
namespace, type' :: Text
type', id' :: Text
id')) <- Address -> Handler (FuncExec, (Text, Text, Text))
forall (m :: * -> *) t c.
(MonadError ServerError m, HasField t "id" c,
 HasField t "namespace" Text, HasField t "type'" Text) =>
t -> m (FuncExec, (Text, Text, c))
findFunc (ToFunction'InvocationBatchRequest
batch ToFunction'InvocationBatchRequest
-> FoldLike
     Address
     ToFunction'InvocationBatchRequest
     ToFunction'InvocationBatchRequest
     Address
     Address
-> Address
forall s a t b. s -> FoldLike a s t a b -> a
^. FoldLike
  Address
  ToFunction'InvocationBatchRequest
  ToFunction'InvocationBatchRequest
  Address
  Address
forall (f :: * -> *) s a.
(Functor f, HasField s "target" a) =>
LensLike' f s a
PR.target)
  Either FlinkError FuncRes
result <- IO (Either FlinkError FuncRes)
-> Handler (Either FlinkError FuncRes)
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (Either FlinkError FuncRes)
 -> Handler (Either FlinkError FuncRes))
-> IO (Either FlinkError FuncRes)
-> Handler (Either FlinkError FuncRes)
forall a b. (a -> b) -> a -> b
$ FuncExec
function (Text -> Text -> Text -> Env
Env Text
namespace Text
type' Text
id') ToFunction'InvocationBatchRequest
batch
  FuncRes
finalState <- Either ServerError FuncRes -> Handler FuncRes
forall e (m :: * -> *) a. MonadError e m => Either e a -> m a
liftEither (Either ServerError FuncRes -> Handler FuncRes)
-> Either ServerError FuncRes -> Handler FuncRes
forall a b. (a -> b) -> a -> b
$ (FlinkError -> ServerError)
-> Either FlinkError FuncRes -> Either ServerError FuncRes
forall a c b. (a -> c) -> Either a b -> Either c b
mapLeft FlinkError -> ServerError
flinkErrToServant Either FlinkError FuncRes
result
  FromFunction -> Handler FromFunction
forall (m :: * -> *) a. Monad m => a -> m a
return (FromFunction -> Handler FromFunction)
-> FromFunction -> Handler FromFunction
forall a b. (a -> b) -> a -> b
$ FuncRes -> FromFunction
createFlinkResp FuncRes
finalState
  where
    getBatch :: t -> m ToFunction'InvocationBatchRequest
getBatch input :: t
input = m ToFunction'InvocationBatchRequest
-> (ToFunction'InvocationBatchRequest
    -> m ToFunction'InvocationBatchRequest)
-> Maybe ToFunction'InvocationBatchRequest
-> m ToFunction'InvocationBatchRequest
forall b a. b -> (a -> b) -> Maybe a -> b
maybe (ServerError -> m ToFunction'InvocationBatchRequest
forall e (m :: * -> *) a. MonadError e m => e -> m a
throwError (ServerError -> m ToFunction'InvocationBatchRequest)
-> ServerError -> m ToFunction'InvocationBatchRequest
forall a b. (a -> b) -> a -> b
$ FlinkError -> ServerError
flinkErrToServant FlinkError
MissingInvocationBatch) ToFunction'InvocationBatchRequest
-> m ToFunction'InvocationBatchRequest
forall (m :: * -> *) a. Monad m => a -> m a
return (t
input t
-> Fold
     t
     t
     ToFunction'InvocationBatchRequest
     ToFunction'InvocationBatchRequest
-> Maybe ToFunction'InvocationBatchRequest
forall s t a b. s -> Fold s t a b -> Maybe a
^? LensLike' f t (Maybe ToFunction'Request)
forall (f :: * -> *) s a.
(Functor f, HasField s "maybe'request" a) =>
LensLike' f s a
PR.maybe'request LensLike' f t (Maybe ToFunction'Request)
-> ((ToFunction'InvocationBatchRequest
     -> f ToFunction'InvocationBatchRequest)
    -> Maybe ToFunction'Request -> f (Maybe ToFunction'Request))
-> (ToFunction'InvocationBatchRequest
    -> f ToFunction'InvocationBatchRequest)
-> t
-> f t
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (ToFunction'Request -> f ToFunction'Request)
-> Maybe ToFunction'Request -> f (Maybe ToFunction'Request)
forall a b. Prism (Maybe a) (Maybe b) a b
_Just ((ToFunction'Request -> f ToFunction'Request)
 -> Maybe ToFunction'Request -> f (Maybe ToFunction'Request))
-> ((ToFunction'InvocationBatchRequest
     -> f ToFunction'InvocationBatchRequest)
    -> ToFunction'Request -> f ToFunction'Request)
-> (ToFunction'InvocationBatchRequest
    -> f ToFunction'InvocationBatchRequest)
-> Maybe ToFunction'Request
-> f (Maybe ToFunction'Request)
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (ToFunction'InvocationBatchRequest
 -> f ToFunction'InvocationBatchRequest)
-> ToFunction'Request -> f ToFunction'Request
Prism' ToFunction'Request ToFunction'InvocationBatchRequest
PR._ToFunction'Invocation')
    findFunc :: t -> m (FuncExec, (Text, Text, c))
findFunc addr :: t
addr = do
      FuncExec
res <- m FuncExec
-> (FuncExec -> m FuncExec) -> Maybe FuncExec -> m FuncExec
forall b a. b -> (a -> b) -> Maybe a -> b
maybe (ServerError -> m FuncExec
forall e (m :: * -> *) a. MonadError e m => e -> m a
throwError (ServerError -> m FuncExec) -> ServerError -> m FuncExec
forall a b. (a -> b) -> a -> b
$ FlinkError -> ServerError
flinkErrToServant (FlinkError -> ServerError) -> FlinkError -> ServerError
forall a b. (a -> b) -> a -> b
$ (Text, Text) -> FlinkError
NoSuchFunction (Text
namespace, Text
type')) FuncExec -> m FuncExec
forall (m :: * -> *) a. Monad m => a -> m a
return (FuncType -> FunctionTable -> Maybe FuncExec
forall k a. Ord k => k -> Map k a -> Maybe a
Map.lookup (Text -> Text -> FuncType
FuncType Text
namespace Text
type') FunctionTable
functions)
      (FuncExec, (Text, Text, c)) -> m (FuncExec, (Text, Text, c))
forall (m :: * -> *) a. Monad m => a -> m a
return (FuncExec
res, (Text, Text, c)
address)
      where
        address :: (Text, Text, c)
address@(namespace :: Text
namespace, type' :: Text
type', _) = (t
addr t -> FoldLike Text t t Text Text -> Text
forall s a t b. s -> FoldLike a s t a b -> a
^. FoldLike Text t t Text Text
forall (f :: * -> *) s a.
(Functor f, HasField s "namespace" a) =>
LensLike' f s a
PR.namespace, t
addr t -> FoldLike Text t t Text Text -> Text
forall s a t b. s -> FoldLike a s t a b -> a
^. FoldLike Text t t Text Text
forall (f :: * -> *) s a.
(Functor f, HasField s "type'" a) =>
LensLike' f s a
PR.type', t
addr t -> FoldLike c t t c c -> c
forall s a t b. s -> FoldLike a s t a b -> a
^. FoldLike c t t c c
forall (f :: * -> *) s a.
(Functor f, HasField s "id" a) =>
LensLike' f s a
PR.id)

flinkErrToServant :: FlinkError -> ServerError
flinkErrToServant :: FlinkError -> ServerError
flinkErrToServant err :: FlinkError
err = case FlinkError
err of
  MissingInvocationBatch -> ServerError
err400 {errBody :: ByteString
errBody = "Invocation batch missing"}
  ProtodeserializeBytesError protoErr :: String
protoErr -> ServerError
err400 {errBody :: ByteString
errBody = "Could not deserializeBytes protobuf " ByteString -> ByteString -> ByteString
forall a. Semigroup a => a -> a -> a
<> String -> ByteString
BSL.pack String
protoErr}
  StateDecodeError decodeErr :: String
decodeErr -> ServerError
err400 {errBody :: ByteString
errBody = "Invalid JSON " ByteString -> ByteString -> ByteString
forall a. Semigroup a => a -> a -> a
<> String -> ByteString
BSL.pack String
decodeErr}
  MessageDecodeError msg :: String
msg -> ServerError
err400 {errBody :: ByteString
errBody = "Failed to decode message " ByteString -> ByteString -> ByteString
forall a. Semigroup a => a -> a -> a
<> String -> ByteString
BSL.pack String
msg}
  ProtoMessageDecodeError msg :: UnpackError
msg -> ServerError
err400 {errBody :: ByteString
errBody = "Failed to decode message " ByteString -> ByteString -> ByteString
forall a. Semigroup a => a -> a -> a
<> String -> ByteString
BSL.pack (UnpackError -> String
forall a. Show a => a -> String
show UnpackError
msg)}
  NoSuchFunction (namespace :: Text
namespace, type' :: Text
type') -> ServerError
err400 {errBody :: ByteString
errBody = "No such function " ByteString -> ByteString -> ByteString
forall a. Semigroup a => a -> a -> a
<> Text -> ByteString
T.encodeUtf8 (Text -> Text
fromStrict Text
namespace) ByteString -> ByteString -> ByteString
forall a. Semigroup a => a -> a -> a
<> Text -> ByteString
T.encodeUtf8 (Text -> Text
fromStrict Text
type')}
  InvalidTypePassedError expected :: Text
expected passed :: Text
passed -> ServerError
err400 {errBody :: ByteString
errBody = "Expected type " ByteString -> ByteString -> ByteString
forall a. Semigroup a => a -> a -> a
<> Text -> ByteString
T.encodeUtf8 (Text -> Text
fromStrict Text
expected) ByteString -> ByteString -> ByteString
forall a. Semigroup a => a -> a -> a
<> ", got " ByteString -> ByteString -> ByteString
forall a. Semigroup a => a -> a -> a
<>  Text -> ByteString
T.encodeUtf8 (Text -> Text
fromStrict Text
passed)}
  EmptyArgumentPassed -> ServerError
err400 {errBody :: ByteString
errBody = "Empty argument was passed to the function" }