{-# LANGUAGE PatternSynonyms #-}
module Network.Flink.Internal.Stateful
( StatefulFunc
( insideCtx,
getCtx,
setCtx,
modifyCtx,
sendMsg,
sendMsgDelay,
sendEgressMsg
),
flinkWrapper,
createApp,
flinkServer,
flinkApi,
Address(.., Address'),
FuncType (..),
Function (..),
Serde (..),
FunctionState (..),
FlinkError (..),
FunctionTable,
Env (..),
Expiration(..),
ExpirationMode(..),
newState,
ProtoSerde (..),
JsonSerde (..),
jsonState,
protoState,
sendProtoMsg,
sendProtoMsgDelay
)
where
import Control.Monad.Except
import Control.Monad.Reader
import Control.Monad.State (MonadState, StateT (..), gets, modify)
import Data.Aeson (FromJSON, ToJSON, eitherDecode, encode)
import Data.ByteString (ByteString)
import qualified Data.ByteString.Lazy.Char8 as BSL
import Data.Either.Combinators (mapLeft)
import Data.Foldable (Foldable (toList))
import Data.Map (Map)
import qualified Data.Map as Map
import Data.ProtoLens (Message, defMessage, encodeMessage, messageName)
import Data.ProtoLens.Any (UnpackError)
import Data.ProtoLens.Encoding (decodeMessage)
import Data.ProtoLens.Prism
import Data.Sequence (Seq)
import qualified Data.Sequence as Seq
import Data.Text (Text)
import Data.Text.Lazy (fromStrict)
import Data.Coerce ( coerce )
import qualified Data.Text.Lazy.Encoding as T
import Lens.Family2
import Lens.Micro ( traversed, filtered )
import Network.Flink.Internal.ProtoServant (Proto)
import Proto.RequestReply (FromFunction, ToFunction)
import qualified Proto.RequestReply as PR
import qualified Proto.RequestReply_Fields as PR
import Servant
import Data.Time.Clock ( NominalDiffTime )
data FuncType = FuncType Text Text deriving (FuncType -> FuncType -> Bool
(FuncType -> FuncType -> Bool)
-> (FuncType -> FuncType -> Bool) -> Eq FuncType
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
/= :: FuncType -> FuncType -> Bool
$c/= :: FuncType -> FuncType -> Bool
== :: FuncType -> FuncType -> Bool
$c== :: FuncType -> FuncType -> Bool
Eq, Eq FuncType
Eq FuncType =>
(FuncType -> FuncType -> Ordering)
-> (FuncType -> FuncType -> Bool)
-> (FuncType -> FuncType -> Bool)
-> (FuncType -> FuncType -> Bool)
-> (FuncType -> FuncType -> Bool)
-> (FuncType -> FuncType -> FuncType)
-> (FuncType -> FuncType -> FuncType)
-> Ord FuncType
FuncType -> FuncType -> Bool
FuncType -> FuncType -> Ordering
FuncType -> FuncType -> FuncType
forall a.
Eq a =>
(a -> a -> Ordering)
-> (a -> a -> Bool)
-> (a -> a -> Bool)
-> (a -> a -> Bool)
-> (a -> a -> Bool)
-> (a -> a -> a)
-> (a -> a -> a)
-> Ord a
min :: FuncType -> FuncType -> FuncType
$cmin :: FuncType -> FuncType -> FuncType
max :: FuncType -> FuncType -> FuncType
$cmax :: FuncType -> FuncType -> FuncType
>= :: FuncType -> FuncType -> Bool
$c>= :: FuncType -> FuncType -> Bool
> :: FuncType -> FuncType -> Bool
$c> :: FuncType -> FuncType -> Bool
<= :: FuncType -> FuncType -> Bool
$c<= :: FuncType -> FuncType -> Bool
< :: FuncType -> FuncType -> Bool
$c< :: FuncType -> FuncType -> Bool
compare :: FuncType -> FuncType -> Ordering
$ccompare :: FuncType -> FuncType -> Ordering
$cp1Ord :: Eq FuncType
Ord)
data Address = Address FuncType Text
{-# COMPLETE Address' #-}
pattern Address' :: Text -> Text -> Text -> Address
pattern $bAddress' :: Text -> Text -> Text -> Address
$mAddress' :: forall r.
Address -> (Text -> Text -> Text -> r) -> (Void# -> r) -> r
Address' fnamespace fnTp fid = Address (FuncType fnamespace fnTp) fid
data FuncRes = IncompleteContext Expiration Text | UpdatedState (FunctionState PR.TypedValue) deriving Int -> FuncRes -> ShowS
[FuncRes] -> ShowS
FuncRes -> String
(Int -> FuncRes -> ShowS)
-> (FuncRes -> String) -> ([FuncRes] -> ShowS) -> Show FuncRes
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
showList :: [FuncRes] -> ShowS
$cshowList :: [FuncRes] -> ShowS
show :: FuncRes -> String
$cshow :: FuncRes -> String
showsPrec :: Int -> FuncRes -> ShowS
$cshowsPrec :: Int -> FuncRes -> ShowS
Show
type FuncExec = Env -> PR.ToFunction'InvocationBatchRequest -> IO (Either FlinkError FuncRes)
type FunctionTable = Map FuncType FuncExec
data Env = Env
{ Env -> Text
envFunctionNamespace :: Text,
Env -> Text
envFunctionType :: Text,
Env -> Text
envFunctionId :: Text
}
deriving (Int -> Env -> ShowS
[Env] -> ShowS
Env -> String
(Int -> Env -> ShowS)
-> (Env -> String) -> ([Env] -> ShowS) -> Show Env
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
showList :: [Env] -> ShowS
$cshowList :: [Env] -> ShowS
show :: Env -> String
$cshow :: Env -> String
showsPrec :: Int -> Env -> ShowS
$cshowsPrec :: Int -> Env -> ShowS
Show)
data FunctionState ctx = FunctionState
{ FunctionState ctx -> ctx
functionStateCtx :: ctx,
FunctionState ctx -> Bool
functionStateMutated :: Bool,
FunctionState ctx -> Seq FromFunction'Invocation
functionStateInvocations :: Seq PR.FromFunction'Invocation,
FunctionState ctx -> Seq FromFunction'DelayedInvocation
functionStateDelayedInvocations :: Seq PR.FromFunction'DelayedInvocation,
FunctionState ctx -> Seq FromFunction'EgressMessage
functionStateEgressMessages :: Seq PR.FromFunction'EgressMessage
}
deriving (Int -> FunctionState ctx -> ShowS
[FunctionState ctx] -> ShowS
FunctionState ctx -> String
(Int -> FunctionState ctx -> ShowS)
-> (FunctionState ctx -> String)
-> ([FunctionState ctx] -> ShowS)
-> Show (FunctionState ctx)
forall ctx. Show ctx => Int -> FunctionState ctx -> ShowS
forall ctx. Show ctx => [FunctionState ctx] -> ShowS
forall ctx. Show ctx => FunctionState ctx -> String
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
showList :: [FunctionState ctx] -> ShowS
$cshowList :: forall ctx. Show ctx => [FunctionState ctx] -> ShowS
show :: FunctionState ctx -> String
$cshow :: forall ctx. Show ctx => FunctionState ctx -> String
showsPrec :: Int -> FunctionState ctx -> ShowS
$cshowsPrec :: forall ctx. Show ctx => Int -> FunctionState ctx -> ShowS
Show, a -> FunctionState b -> FunctionState a
(a -> b) -> FunctionState a -> FunctionState b
(forall a b. (a -> b) -> FunctionState a -> FunctionState b)
-> (forall a b. a -> FunctionState b -> FunctionState a)
-> Functor FunctionState
forall a b. a -> FunctionState b -> FunctionState a
forall a b. (a -> b) -> FunctionState a -> FunctionState b
forall (f :: * -> *).
(forall a b. (a -> b) -> f a -> f b)
-> (forall a b. a -> f b -> f a) -> Functor f
<$ :: a -> FunctionState b -> FunctionState a
$c<$ :: forall a b. a -> FunctionState b -> FunctionState a
fmap :: (a -> b) -> FunctionState a -> FunctionState b
$cfmap :: forall a b. (a -> b) -> FunctionState a -> FunctionState b
Functor)
newState :: a -> FunctionState a
newState :: a -> FunctionState a
newState initialCtx :: a
initialCtx = a
-> Bool
-> Seq FromFunction'Invocation
-> Seq FromFunction'DelayedInvocation
-> Seq FromFunction'EgressMessage
-> FunctionState a
forall ctx.
ctx
-> Bool
-> Seq FromFunction'Invocation
-> Seq FromFunction'DelayedInvocation
-> Seq FromFunction'EgressMessage
-> FunctionState ctx
FunctionState a
initialCtx Bool
False Seq FromFunction'Invocation
forall a. Monoid a => a
mempty Seq FromFunction'DelayedInvocation
forall a. Monoid a => a
mempty Seq FromFunction'EgressMessage
forall a. Monoid a => a
mempty
data ExpirationMode = NONE | AFTER_WRITE | AFTER_CALL deriving (Int -> ExpirationMode -> ShowS
[ExpirationMode] -> ShowS
ExpirationMode -> String
(Int -> ExpirationMode -> ShowS)
-> (ExpirationMode -> String)
-> ([ExpirationMode] -> ShowS)
-> Show ExpirationMode
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
showList :: [ExpirationMode] -> ShowS
$cshowList :: [ExpirationMode] -> ShowS
show :: ExpirationMode -> String
$cshow :: ExpirationMode -> String
showsPrec :: Int -> ExpirationMode -> ShowS
$cshowsPrec :: Int -> ExpirationMode -> ShowS
Show, ExpirationMode -> ExpirationMode -> Bool
(ExpirationMode -> ExpirationMode -> Bool)
-> (ExpirationMode -> ExpirationMode -> Bool) -> Eq ExpirationMode
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
/= :: ExpirationMode -> ExpirationMode -> Bool
$c/= :: ExpirationMode -> ExpirationMode -> Bool
== :: ExpirationMode -> ExpirationMode -> Bool
$c== :: ExpirationMode -> ExpirationMode -> Bool
Eq)
data Expiration = Expiration {
Expiration -> ExpirationMode
emode :: ExpirationMode,
Expiration -> NominalDiffTime
expireAfterMillis :: NominalDiffTime
} deriving (Int -> Expiration -> ShowS
[Expiration] -> ShowS
Expiration -> String
(Int -> Expiration -> ShowS)
-> (Expiration -> String)
-> ([Expiration] -> ShowS)
-> Show Expiration
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
showList :: [Expiration] -> ShowS
$cshowList :: [Expiration] -> ShowS
show :: Expiration -> String
$cshow :: Expiration -> String
showsPrec :: Int -> Expiration -> ShowS
$cshowsPrec :: Int -> Expiration -> ShowS
Show, Expiration -> Expiration -> Bool
(Expiration -> Expiration -> Bool)
-> (Expiration -> Expiration -> Bool) -> Eq Expiration
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
/= :: Expiration -> Expiration -> Bool
$c/= :: Expiration -> Expiration -> Bool
== :: Expiration -> Expiration -> Bool
$c== :: Expiration -> Expiration -> Bool
Eq)
newtype Function s a = Function {Function s a
-> ExceptT FlinkError (StateT (FunctionState s) (ReaderT Env IO)) a
runFunction :: ExceptT FlinkError (StateT (FunctionState s) (ReaderT Env IO)) a}
deriving (Applicative (Function s)
a -> Function s a
Applicative (Function s) =>
(forall a b. Function s a -> (a -> Function s b) -> Function s b)
-> (forall a b. Function s a -> Function s b -> Function s b)
-> (forall a. a -> Function s a)
-> Monad (Function s)
Function s a -> (a -> Function s b) -> Function s b
Function s a -> Function s b -> Function s b
forall s. Applicative (Function s)
forall a. a -> Function s a
forall s a. a -> Function s a
forall a b. Function s a -> Function s b -> Function s b
forall a b. Function s a -> (a -> Function s b) -> Function s b
forall s a b. Function s a -> Function s b -> Function s b
forall s a b. Function s a -> (a -> Function s b) -> Function s b
forall (m :: * -> *).
Applicative m =>
(forall a b. m a -> (a -> m b) -> m b)
-> (forall a b. m a -> m b -> m b)
-> (forall a. a -> m a)
-> Monad m
return :: a -> Function s a
$creturn :: forall s a. a -> Function s a
>> :: Function s a -> Function s b -> Function s b
$c>> :: forall s a b. Function s a -> Function s b -> Function s b
>>= :: Function s a -> (a -> Function s b) -> Function s b
$c>>= :: forall s a b. Function s a -> (a -> Function s b) -> Function s b
$cp1Monad :: forall s. Applicative (Function s)
Monad, Functor (Function s)
a -> Function s a
Functor (Function s) =>
(forall a. a -> Function s a)
-> (forall a b.
Function s (a -> b) -> Function s a -> Function s b)
-> (forall a b c.
(a -> b -> c) -> Function s a -> Function s b -> Function s c)
-> (forall a b. Function s a -> Function s b -> Function s b)
-> (forall a b. Function s a -> Function s b -> Function s a)
-> Applicative (Function s)
Function s a -> Function s b -> Function s b
Function s a -> Function s b -> Function s a
Function s (a -> b) -> Function s a -> Function s b
(a -> b -> c) -> Function s a -> Function s b -> Function s c
forall s. Functor (Function s)
forall a. a -> Function s a
forall s a. a -> Function s a
forall a b. Function s a -> Function s b -> Function s a
forall a b. Function s a -> Function s b -> Function s b
forall a b. Function s (a -> b) -> Function s a -> Function s b
forall s a b. Function s a -> Function s b -> Function s a
forall s a b. Function s a -> Function s b -> Function s b
forall s a b. Function s (a -> b) -> Function s a -> Function s b
forall a b c.
(a -> b -> c) -> Function s a -> Function s b -> Function s c
forall s a b c.
(a -> b -> c) -> Function s a -> Function s b -> Function s c
forall (f :: * -> *).
Functor f =>
(forall a. a -> f a)
-> (forall a b. f (a -> b) -> f a -> f b)
-> (forall a b c. (a -> b -> c) -> f a -> f b -> f c)
-> (forall a b. f a -> f b -> f b)
-> (forall a b. f a -> f b -> f a)
-> Applicative f
<* :: Function s a -> Function s b -> Function s a
$c<* :: forall s a b. Function s a -> Function s b -> Function s a
*> :: Function s a -> Function s b -> Function s b
$c*> :: forall s a b. Function s a -> Function s b -> Function s b
liftA2 :: (a -> b -> c) -> Function s a -> Function s b -> Function s c
$cliftA2 :: forall s a b c.
(a -> b -> c) -> Function s a -> Function s b -> Function s c
<*> :: Function s (a -> b) -> Function s a -> Function s b
$c<*> :: forall s a b. Function s (a -> b) -> Function s a -> Function s b
pure :: a -> Function s a
$cpure :: forall s a. a -> Function s a
$cp1Applicative :: forall s. Functor (Function s)
Applicative, a -> Function s b -> Function s a
(a -> b) -> Function s a -> Function s b
(forall a b. (a -> b) -> Function s a -> Function s b)
-> (forall a b. a -> Function s b -> Function s a)
-> Functor (Function s)
forall a b. a -> Function s b -> Function s a
forall a b. (a -> b) -> Function s a -> Function s b
forall s a b. a -> Function s b -> Function s a
forall s a b. (a -> b) -> Function s a -> Function s b
forall (f :: * -> *).
(forall a b. (a -> b) -> f a -> f b)
-> (forall a b. a -> f b -> f a) -> Functor f
<$ :: a -> Function s b -> Function s a
$c<$ :: forall s a b. a -> Function s b -> Function s a
fmap :: (a -> b) -> Function s a -> Function s b
$cfmap :: forall s a b. (a -> b) -> Function s a -> Function s b
Functor, MonadState (FunctionState s), MonadError FlinkError, Monad (Function s)
Monad (Function s) =>
(forall a. IO a -> Function s a) -> MonadIO (Function s)
IO a -> Function s a
forall s. Monad (Function s)
forall a. IO a -> Function s a
forall s a. IO a -> Function s a
forall (m :: * -> *).
Monad m =>
(forall a. IO a -> m a) -> MonadIO m
liftIO :: IO a -> Function s a
$cliftIO :: forall s a. IO a -> Function s a
$cp1MonadIO :: forall s. Monad (Function s)
MonadIO, MonadReader Env)
class Serde a where
tpName :: Proxy a -> Text
deserializeBytes :: ByteString -> Either String a
serializeBytes :: a -> ByteString
newtype ProtoSerde a = ProtoSerde {ProtoSerde a -> a
getProto :: a}
deriving (a -> ProtoSerde b -> ProtoSerde a
(a -> b) -> ProtoSerde a -> ProtoSerde b
(forall a b. (a -> b) -> ProtoSerde a -> ProtoSerde b)
-> (forall a b. a -> ProtoSerde b -> ProtoSerde a)
-> Functor ProtoSerde
forall a b. a -> ProtoSerde b -> ProtoSerde a
forall a b. (a -> b) -> ProtoSerde a -> ProtoSerde b
forall (f :: * -> *).
(forall a b. (a -> b) -> f a -> f b)
-> (forall a b. a -> f b -> f a) -> Functor f
<$ :: a -> ProtoSerde b -> ProtoSerde a
$c<$ :: forall a b. a -> ProtoSerde b -> ProtoSerde a
fmap :: (a -> b) -> ProtoSerde a -> ProtoSerde b
$cfmap :: forall a b. (a -> b) -> ProtoSerde a -> ProtoSerde b
Functor)
instance Message a => Serde (ProtoSerde a) where
tpName :: Proxy (ProtoSerde a) -> Text
tpName px :: Proxy (ProtoSerde a)
px = "type.googleapis.com/" Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Proxy a -> Text
forall msg. Message msg => Proxy msg -> Text
messageName (Proxy (ProtoSerde a) -> Proxy a
forall (f :: * -> *). Proxy (f a) -> Proxy a
unliftP Proxy (ProtoSerde a)
px)
where unliftP :: Proxy (f a) -> Proxy a
unliftP :: Proxy (f a) -> Proxy a
unliftP Proxy = Proxy a
forall k (t :: k). Proxy t
Proxy
deserializeBytes :: ByteString -> Either String (ProtoSerde a)
deserializeBytes a :: ByteString
a = a -> ProtoSerde a
forall a. a -> ProtoSerde a
ProtoSerde (a -> ProtoSerde a)
-> Either String a -> Either String (ProtoSerde a)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> ByteString -> Either String a
forall msg. Message msg => ByteString -> Either String msg
decodeMessage ByteString
a
serializeBytes :: ProtoSerde a -> ByteString
serializeBytes (ProtoSerde a :: a
a) = a -> ByteString
forall msg. Message msg => msg -> ByteString
encodeMessage a
a
type Json a = (FromJSON a, ToJSON a)
newtype JsonSerde a = JsonSerde {JsonSerde a -> a
getJson :: a}
deriving (a -> JsonSerde b -> JsonSerde a
(a -> b) -> JsonSerde a -> JsonSerde b
(forall a b. (a -> b) -> JsonSerde a -> JsonSerde b)
-> (forall a b. a -> JsonSerde b -> JsonSerde a)
-> Functor JsonSerde
forall a b. a -> JsonSerde b -> JsonSerde a
forall a b. (a -> b) -> JsonSerde a -> JsonSerde b
forall (f :: * -> *).
(forall a b. (a -> b) -> f a -> f b)
-> (forall a b. a -> f b -> f a) -> Functor f
<$ :: a -> JsonSerde b -> JsonSerde a
$c<$ :: forall a b. a -> JsonSerde b -> JsonSerde a
fmap :: (a -> b) -> JsonSerde a -> JsonSerde b
$cfmap :: forall a b. (a -> b) -> JsonSerde a -> JsonSerde b
Functor)
instance Json a => Serde (JsonSerde a) where
tpName :: Proxy (JsonSerde a) -> Text
tpName _ = "json/json"
deserializeBytes :: ByteString -> Either String (JsonSerde a)
deserializeBytes a :: ByteString
a = a -> JsonSerde a
forall a. a -> JsonSerde a
JsonSerde (a -> JsonSerde a)
-> Either String a -> Either String (JsonSerde a)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> ByteString -> Either String a
forall a. FromJSON a => ByteString -> Either String a
eitherDecode (ByteString -> ByteString
BSL.fromStrict ByteString
a)
serializeBytes :: JsonSerde a -> ByteString
serializeBytes (JsonSerde a :: a
a) = ByteString -> ByteString
BSL.toStrict (ByteString -> ByteString) -> ByteString -> ByteString
forall a b. (a -> b) -> a -> b
$ a -> ByteString
forall a. ToJSON a => a -> ByteString
encode a
a
instance Serde () where
tpName :: Proxy () -> Text
tpName _ = "ghc/Unit"
deserializeBytes :: ByteString -> Either String ()
deserializeBytes _ = () -> Either String ()
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()
serializeBytes :: () -> ByteString
serializeBytes _ = ""
instance Serde ByteString where
tpName :: Proxy ByteString -> Text
tpName _ = "ghc/Data.ByteString"
deserializeBytes :: ByteString -> Either String ByteString
deserializeBytes = ByteString -> Either String ByteString
forall (f :: * -> *) a. Applicative f => a -> f a
pure
serializeBytes :: ByteString -> ByteString
serializeBytes = ByteString -> ByteString
forall a. a -> a
id
instance Serde BSL.ByteString where
tpName :: Proxy ByteString -> Text
tpName _ = "ghc/Data.ByteString.Lazy"
deserializeBytes :: ByteString -> Either String ByteString
deserializeBytes = ByteString -> Either String ByteString
forall (f :: * -> *) a. Applicative f => a -> f a
pure (ByteString -> Either String ByteString)
-> (ByteString -> ByteString)
-> ByteString
-> Either String ByteString
forall b c a. (b -> c) -> (a -> b) -> a -> c
. ByteString -> ByteString
BSL.fromStrict
serializeBytes :: ByteString -> ByteString
serializeBytes = ByteString -> ByteString
BSL.toStrict
class MonadIO m => StatefulFunc s m | m -> s where
setInitialCtx :: s -> m ()
insideCtx :: (s -> a) -> m a
getCtx :: m s
setCtx :: s -> m ()
modifyCtx :: (s -> s) -> m ()
sendEgressMsg ::
Message a =>
(Text, Text) ->
a ->
m ()
sendMsg ::
Serde a =>
Address ->
a ->
m ()
sendMsgDelay ::
Serde a =>
Address ->
NominalDiffTime ->
a ->
m ()
instance StatefulFunc s (Function s) where
setInitialCtx :: s -> Function s ()
setInitialCtx ctx :: s
ctx = (FunctionState s -> FunctionState s) -> Function s ()
forall s (m :: * -> *). MonadState s m => (s -> s) -> m ()
modify (\old :: FunctionState s
old -> FunctionState s
old {functionStateCtx :: s
functionStateCtx = s
ctx})
insideCtx :: (s -> a) -> Function s a
insideCtx func :: s -> a
func = s -> a
func (s -> a) -> Function s s -> Function s a
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Function s s
forall s (m :: * -> *). StatefulFunc s m => m s
getCtx
getCtx :: Function s s
getCtx = (FunctionState s -> s) -> Function s s
forall s (m :: * -> *) a. MonadState s m => (s -> a) -> m a
gets FunctionState s -> s
forall ctx. FunctionState ctx -> ctx
functionStateCtx
setCtx :: s -> Function s ()
setCtx new :: s
new = (FunctionState s -> FunctionState s) -> Function s ()
forall s (m :: * -> *). MonadState s m => (s -> s) -> m ()
modify (\old :: FunctionState s
old -> FunctionState s
old {functionStateCtx :: s
functionStateCtx = s
new, functionStateMutated :: Bool
functionStateMutated = Bool
True})
modifyCtx :: (s -> s) -> Function s ()
modifyCtx mutator :: s -> s
mutator = Function s s
forall s (m :: * -> *). StatefulFunc s m => m s
getCtx Function s s -> (s -> Function s ()) -> Function s ()
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= s -> Function s ()
forall s (m :: * -> *). StatefulFunc s m => s -> m ()
setCtx (s -> Function s ()) -> (s -> s) -> s -> Function s ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. s -> s
mutator
sendEgressMsg :: (Text, Text) -> a -> Function s ()
sendEgressMsg (namespace :: Text
namespace, egressType :: Text
egressType) msg :: a
msg = do
Seq FromFunction'EgressMessage
egresses <- (FunctionState s -> Seq FromFunction'EgressMessage)
-> Function s (Seq FromFunction'EgressMessage)
forall s (m :: * -> *) a. MonadState s m => (s -> a) -> m a
gets FunctionState s -> Seq FromFunction'EgressMessage
forall ctx. FunctionState ctx -> Seq FromFunction'EgressMessage
functionStateEgressMessages
(FunctionState s -> FunctionState s) -> Function s ()
forall s (m :: * -> *). MonadState s m => (s -> s) -> m ()
modify (\old :: FunctionState s
old -> FunctionState s
old {functionStateEgressMessages :: Seq FromFunction'EgressMessage
functionStateEgressMessages = Seq FromFunction'EgressMessage
egresses Seq FromFunction'EgressMessage
-> FromFunction'EgressMessage -> Seq FromFunction'EgressMessage
forall a. Seq a -> a -> Seq a
Seq.:|> FromFunction'EgressMessage
egressMsg})
where
wmsg :: ProtoSerde a
wmsg = a -> ProtoSerde a
forall a. a -> ProtoSerde a
ProtoSerde a
msg
egressMsg :: PR.FromFunction'EgressMessage
egressMsg :: FromFunction'EgressMessage
egressMsg =
FromFunction'EgressMessage
forall msg. Message msg => msg
defMessage
FromFunction'EgressMessage
-> (FromFunction'EgressMessage -> FromFunction'EgressMessage)
-> FromFunction'EgressMessage
forall s t. s -> (s -> t) -> t
& forall (f :: * -> *).
Identical f =>
LensLike' f FromFunction'EgressMessage Text
forall (f :: * -> *) s a.
(Functor f, HasField s "egressNamespace" a) =>
LensLike' f s a
PR.egressNamespace (forall (f :: * -> *).
Identical f =>
LensLike' f FromFunction'EgressMessage Text)
-> Text -> FromFunction'EgressMessage -> FromFunction'EgressMessage
forall s t a b. Setter s t a b -> b -> s -> t
.~ Text
namespace
FromFunction'EgressMessage
-> (FromFunction'EgressMessage -> FromFunction'EgressMessage)
-> FromFunction'EgressMessage
forall s t. s -> (s -> t) -> t
& forall (f :: * -> *).
Identical f =>
LensLike' f FromFunction'EgressMessage Text
forall (f :: * -> *) s a.
(Functor f, HasField s "egressType" a) =>
LensLike' f s a
PR.egressType (forall (f :: * -> *).
Identical f =>
LensLike' f FromFunction'EgressMessage Text)
-> Text -> FromFunction'EgressMessage -> FromFunction'EgressMessage
forall s t a b. Setter s t a b -> b -> s -> t
.~ Text
egressType
FromFunction'EgressMessage
-> (FromFunction'EgressMessage -> FromFunction'EgressMessage)
-> FromFunction'EgressMessage
forall s t. s -> (s -> t) -> t
& forall (f :: * -> *).
Identical f =>
LensLike' f FromFunction'EgressMessage TypedValue
forall (f :: * -> *) s a.
(Functor f, HasField s "argument" a) =>
LensLike' f s a
PR.argument (forall (f :: * -> *).
Identical f =>
LensLike' f FromFunction'EgressMessage TypedValue)
-> TypedValue
-> FromFunction'EgressMessage
-> FromFunction'EgressMessage
forall s t a b. Setter s t a b -> b -> s -> t
.~ TypedValue
tpValue
tpValue :: TypedValue
tpValue =
TypedValue
forall msg. Message msg => msg
defMessage
TypedValue -> (TypedValue -> TypedValue) -> TypedValue
forall s t. s -> (s -> t) -> t
& forall (f :: * -> *). Identical f => LensLike' f TypedValue Text
forall (f :: * -> *) s a.
(Functor f, HasField s "typename" a) =>
LensLike' f s a
PR.typename (forall (f :: * -> *). Identical f => LensLike' f TypedValue Text)
-> Text -> TypedValue -> TypedValue
forall s t a b. Setter s t a b -> b -> s -> t
.~ Proxy (ProtoSerde a) -> Text
forall a. Serde a => Proxy a -> Text
tpName (ProtoSerde a -> Proxy (ProtoSerde a)
forall (f :: * -> *) a. Applicative f => a -> f a
pure ProtoSerde a
wmsg)
TypedValue -> (TypedValue -> TypedValue) -> TypedValue
forall s t. s -> (s -> t) -> t
& forall (f :: * -> *). Identical f => LensLike' f TypedValue Bool
forall (f :: * -> *) s a.
(Functor f, HasField s "hasValue" a) =>
LensLike' f s a
PR.hasValue (forall (f :: * -> *). Identical f => LensLike' f TypedValue Bool)
-> Bool -> TypedValue -> TypedValue
forall s t a b. Setter s t a b -> b -> s -> t
.~ Bool
True
TypedValue -> (TypedValue -> TypedValue) -> TypedValue
forall s t. s -> (s -> t) -> t
& forall (f :: * -> *).
Identical f =>
LensLike' f TypedValue ByteString
forall (f :: * -> *) s a.
(Functor f, HasField s "value" a) =>
LensLike' f s a
PR.value (forall (f :: * -> *).
Identical f =>
LensLike' f TypedValue ByteString)
-> ByteString -> TypedValue -> TypedValue
forall s t a b. Setter s t a b -> b -> s -> t
.~ ProtoSerde a -> ByteString
forall a. Serde a => a -> ByteString
serializeBytes ProtoSerde a
wmsg
sendMsg :: Address -> a -> Function s ()
sendMsg (Address' namespace :: Text
namespace funcType :: Text
funcType id' :: Text
id') msg :: a
msg = do
Seq FromFunction'Invocation
invocations <- (FunctionState s -> Seq FromFunction'Invocation)
-> Function s (Seq FromFunction'Invocation)
forall s (m :: * -> *) a. MonadState s m => (s -> a) -> m a
gets FunctionState s -> Seq FromFunction'Invocation
forall ctx. FunctionState ctx -> Seq FromFunction'Invocation
functionStateInvocations
(FunctionState s -> FunctionState s) -> Function s ()
forall s (m :: * -> *). MonadState s m => (s -> s) -> m ()
modify (\old :: FunctionState s
old -> FunctionState s
old {functionStateInvocations :: Seq FromFunction'Invocation
functionStateInvocations = Seq FromFunction'Invocation
invocations Seq FromFunction'Invocation
-> FromFunction'Invocation -> Seq FromFunction'Invocation
forall a. Seq a -> a -> Seq a
Seq.:|> FromFunction'Invocation
invocation})
where
target :: PR.Address
target :: Address
target =
Address
forall msg. Message msg => msg
defMessage
Address -> (Address -> Address) -> Address
forall s t. s -> (s -> t) -> t
& forall (f :: * -> *). Identical f => LensLike' f Address Text
forall (f :: * -> *) s a.
(Functor f, HasField s "namespace" a) =>
LensLike' f s a
PR.namespace (forall (f :: * -> *). Identical f => LensLike' f Address Text)
-> Text -> Address -> Address
forall s t a b. Setter s t a b -> b -> s -> t
.~ Text
namespace
Address -> (Address -> Address) -> Address
forall s t. s -> (s -> t) -> t
& forall (f :: * -> *). Identical f => LensLike' f Address Text
forall (f :: * -> *) s a.
(Functor f, HasField s "type'" a) =>
LensLike' f s a
PR.type' (forall (f :: * -> *). Identical f => LensLike' f Address Text)
-> Text -> Address -> Address
forall s t a b. Setter s t a b -> b -> s -> t
.~ Text
funcType
Address -> (Address -> Address) -> Address
forall s t. s -> (s -> t) -> t
& forall (f :: * -> *). Identical f => LensLike' f Address Text
forall (f :: * -> *) s a.
(Functor f, HasField s "id" a) =>
LensLike' f s a
PR.id (forall (f :: * -> *). Identical f => LensLike' f Address Text)
-> Text -> Address -> Address
forall s t a b. Setter s t a b -> b -> s -> t
.~ Text
id'
invocation :: PR.FromFunction'Invocation
invocation :: FromFunction'Invocation
invocation =
FromFunction'Invocation
forall msg. Message msg => msg
defMessage
FromFunction'Invocation
-> (FromFunction'Invocation -> FromFunction'Invocation)
-> FromFunction'Invocation
forall s t. s -> (s -> t) -> t
& forall (f :: * -> *).
Identical f =>
LensLike' f FromFunction'Invocation Address
forall (f :: * -> *) s a.
(Functor f, HasField s "target" a) =>
LensLike' f s a
PR.target (forall (f :: * -> *).
Identical f =>
LensLike' f FromFunction'Invocation Address)
-> Address -> FromFunction'Invocation -> FromFunction'Invocation
forall s t a b. Setter s t a b -> b -> s -> t
.~ Address
target
FromFunction'Invocation
-> (FromFunction'Invocation -> FromFunction'Invocation)
-> FromFunction'Invocation
forall s t. s -> (s -> t) -> t
& forall (f :: * -> *).
Identical f =>
LensLike' f FromFunction'Invocation TypedValue
forall (f :: * -> *) s a.
(Functor f, HasField s "argument" a) =>
LensLike' f s a
PR.argument (forall (f :: * -> *).
Identical f =>
LensLike' f FromFunction'Invocation TypedValue)
-> TypedValue -> FromFunction'Invocation -> FromFunction'Invocation
forall s t a b. Setter s t a b -> b -> s -> t
.~ TypedValue
tpValue
tpValue :: TypedValue
tpValue =
TypedValue
forall msg. Message msg => msg
defMessage
TypedValue -> (TypedValue -> TypedValue) -> TypedValue
forall s t. s -> (s -> t) -> t
& forall (f :: * -> *). Identical f => LensLike' f TypedValue Text
forall (f :: * -> *) s a.
(Functor f, HasField s "typename" a) =>
LensLike' f s a
PR.typename (forall (f :: * -> *). Identical f => LensLike' f TypedValue Text)
-> Text -> TypedValue -> TypedValue
forall s t a b. Setter s t a b -> b -> s -> t
.~ Proxy a -> Text
forall a. Serde a => Proxy a -> Text
tpName (a -> Proxy a
forall (f :: * -> *) a. Applicative f => a -> f a
pure a
msg)
TypedValue -> (TypedValue -> TypedValue) -> TypedValue
forall s t. s -> (s -> t) -> t
& forall (f :: * -> *). Identical f => LensLike' f TypedValue Bool
forall (f :: * -> *) s a.
(Functor f, HasField s "hasValue" a) =>
LensLike' f s a
PR.hasValue (forall (f :: * -> *). Identical f => LensLike' f TypedValue Bool)
-> Bool -> TypedValue -> TypedValue
forall s t a b. Setter s t a b -> b -> s -> t
.~ Bool
True
TypedValue -> (TypedValue -> TypedValue) -> TypedValue
forall s t. s -> (s -> t) -> t
& forall (f :: * -> *).
Identical f =>
LensLike' f TypedValue ByteString
forall (f :: * -> *) s a.
(Functor f, HasField s "value" a) =>
LensLike' f s a
PR.value (forall (f :: * -> *).
Identical f =>
LensLike' f TypedValue ByteString)
-> ByteString -> TypedValue -> TypedValue
forall s t a b. Setter s t a b -> b -> s -> t
.~ a -> ByteString
forall a. Serde a => a -> ByteString
serializeBytes a
msg
sendMsgDelay :: Address -> NominalDiffTime -> a -> Function s ()
sendMsgDelay (Address' namespace :: Text
namespace funcType :: Text
funcType id' :: Text
id') delay :: NominalDiffTime
delay msg :: a
msg = do
Seq FromFunction'DelayedInvocation
invocations <- (FunctionState s -> Seq FromFunction'DelayedInvocation)
-> Function s (Seq FromFunction'DelayedInvocation)
forall s (m :: * -> *) a. MonadState s m => (s -> a) -> m a
gets FunctionState s -> Seq FromFunction'DelayedInvocation
forall ctx. FunctionState ctx -> Seq FromFunction'DelayedInvocation
functionStateDelayedInvocations
(FunctionState s -> FunctionState s) -> Function s ()
forall s (m :: * -> *). MonadState s m => (s -> s) -> m ()
modify (\old :: FunctionState s
old -> FunctionState s
old {functionStateDelayedInvocations :: Seq FromFunction'DelayedInvocation
functionStateDelayedInvocations = Seq FromFunction'DelayedInvocation
invocations Seq FromFunction'DelayedInvocation
-> FromFunction'DelayedInvocation
-> Seq FromFunction'DelayedInvocation
forall a. Seq a -> a -> Seq a
Seq.:|> FromFunction'DelayedInvocation
invocation})
where
target :: PR.Address
target :: Address
target =
Address
forall msg. Message msg => msg
defMessage
Address -> (Address -> Address) -> Address
forall s t. s -> (s -> t) -> t
& forall (f :: * -> *). Identical f => LensLike' f Address Text
forall (f :: * -> *) s a.
(Functor f, HasField s "namespace" a) =>
LensLike' f s a
PR.namespace (forall (f :: * -> *). Identical f => LensLike' f Address Text)
-> Text -> Address -> Address
forall s t a b. Setter s t a b -> b -> s -> t
.~ Text
namespace
Address -> (Address -> Address) -> Address
forall s t. s -> (s -> t) -> t
& forall (f :: * -> *). Identical f => LensLike' f Address Text
forall (f :: * -> *) s a.
(Functor f, HasField s "type'" a) =>
LensLike' f s a
PR.type' (forall (f :: * -> *). Identical f => LensLike' f Address Text)
-> Text -> Address -> Address
forall s t a b. Setter s t a b -> b -> s -> t
.~ Text
funcType
Address -> (Address -> Address) -> Address
forall s t. s -> (s -> t) -> t
& forall (f :: * -> *). Identical f => LensLike' f Address Text
forall (f :: * -> *) s a.
(Functor f, HasField s "id" a) =>
LensLike' f s a
PR.id (forall (f :: * -> *). Identical f => LensLike' f Address Text)
-> Text -> Address -> Address
forall s t a b. Setter s t a b -> b -> s -> t
.~ Text
id'
invocation :: PR.FromFunction'DelayedInvocation
invocation :: FromFunction'DelayedInvocation
invocation =
FromFunction'DelayedInvocation
forall msg. Message msg => msg
defMessage
FromFunction'DelayedInvocation
-> (FromFunction'DelayedInvocation
-> FromFunction'DelayedInvocation)
-> FromFunction'DelayedInvocation
forall s t. s -> (s -> t) -> t
& forall (f :: * -> *).
Identical f =>
LensLike' f FromFunction'DelayedInvocation Int64
forall (f :: * -> *) s a.
(Functor f, HasField s "delayInMs" a) =>
LensLike' f s a
PR.delayInMs (forall (f :: * -> *).
Identical f =>
LensLike' f FromFunction'DelayedInvocation Int64)
-> Int64
-> FromFunction'DelayedInvocation
-> FromFunction'DelayedInvocation
forall s t a b. Setter s t a b -> b -> s -> t
.~ NominalDiffTime -> Int64
forall a b. (RealFrac a, Integral b) => a -> b
round (NominalDiffTime
delay NominalDiffTime -> NominalDiffTime -> NominalDiffTime
forall a. Num a => a -> a -> a
* 1000)
FromFunction'DelayedInvocation
-> (FromFunction'DelayedInvocation
-> FromFunction'DelayedInvocation)
-> FromFunction'DelayedInvocation
forall s t. s -> (s -> t) -> t
& forall (f :: * -> *).
Identical f =>
LensLike' f FromFunction'DelayedInvocation Address
forall (f :: * -> *) s a.
(Functor f, HasField s "target" a) =>
LensLike' f s a
PR.target (forall (f :: * -> *).
Identical f =>
LensLike' f FromFunction'DelayedInvocation Address)
-> Address
-> FromFunction'DelayedInvocation
-> FromFunction'DelayedInvocation
forall s t a b. Setter s t a b -> b -> s -> t
.~ Address
target
FromFunction'DelayedInvocation
-> (FromFunction'DelayedInvocation
-> FromFunction'DelayedInvocation)
-> FromFunction'DelayedInvocation
forall s t. s -> (s -> t) -> t
& forall (f :: * -> *).
Identical f =>
LensLike' f FromFunction'DelayedInvocation TypedValue
forall (f :: * -> *) s a.
(Functor f, HasField s "argument" a) =>
LensLike' f s a
PR.argument (forall (f :: * -> *).
Identical f =>
LensLike' f FromFunction'DelayedInvocation TypedValue)
-> TypedValue
-> FromFunction'DelayedInvocation
-> FromFunction'DelayedInvocation
forall s t a b. Setter s t a b -> b -> s -> t
.~ TypedValue
tpValue
tpValue :: TypedValue
tpValue =
TypedValue
forall msg. Message msg => msg
defMessage
TypedValue -> (TypedValue -> TypedValue) -> TypedValue
forall s t. s -> (s -> t) -> t
& forall (f :: * -> *). Identical f => LensLike' f TypedValue Text
forall (f :: * -> *) s a.
(Functor f, HasField s "typename" a) =>
LensLike' f s a
PR.typename (forall (f :: * -> *). Identical f => LensLike' f TypedValue Text)
-> Text -> TypedValue -> TypedValue
forall s t a b. Setter s t a b -> b -> s -> t
.~ Proxy a -> Text
forall a. Serde a => Proxy a -> Text
tpName (a -> Proxy a
forall (f :: * -> *) a. Applicative f => a -> f a
pure a
msg)
TypedValue -> (TypedValue -> TypedValue) -> TypedValue
forall s t. s -> (s -> t) -> t
& forall (f :: * -> *). Identical f => LensLike' f TypedValue Bool
forall (f :: * -> *) s a.
(Functor f, HasField s "hasValue" a) =>
LensLike' f s a
PR.hasValue (forall (f :: * -> *). Identical f => LensLike' f TypedValue Bool)
-> Bool -> TypedValue -> TypedValue
forall s t a b. Setter s t a b -> b -> s -> t
.~ Bool
True
TypedValue -> (TypedValue -> TypedValue) -> TypedValue
forall s t. s -> (s -> t) -> t
& forall (f :: * -> *).
Identical f =>
LensLike' f TypedValue ByteString
forall (f :: * -> *) s a.
(Functor f, HasField s "value" a) =>
LensLike' f s a
PR.value (forall (f :: * -> *).
Identical f =>
LensLike' f TypedValue ByteString)
-> ByteString -> TypedValue -> TypedValue
forall s t a b. Setter s t a b -> b -> s -> t
.~ a -> ByteString
forall a. Serde a => a -> ByteString
serializeBytes a
msg
sendProtoMsg :: (StatefulFunc s m, Message a) => Address -> a -> m ()
sendProtoMsg :: Address -> a -> m ()
sendProtoMsg addr :: Address
addr = Address -> ProtoSerde a -> m ()
forall s (m :: * -> *) a.
(StatefulFunc s m, Serde a) =>
Address -> a -> m ()
sendMsg Address
addr (ProtoSerde a -> m ()) -> (a -> ProtoSerde a) -> a -> m ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. a -> ProtoSerde a
forall a. a -> ProtoSerde a
ProtoSerde
sendProtoMsgDelay :: (StatefulFunc s m, Message a) => Address -> NominalDiffTime -> a -> m ()
sendProtoMsgDelay :: Address -> NominalDiffTime -> a -> m ()
sendProtoMsgDelay addr :: Address
addr delay :: NominalDiffTime
delay = Address -> NominalDiffTime -> ProtoSerde a -> m ()
forall s (m :: * -> *) a.
(StatefulFunc s m, Serde a) =>
Address -> NominalDiffTime -> a -> m ()
sendMsgDelay Address
addr NominalDiffTime
delay (ProtoSerde a -> m ()) -> (a -> ProtoSerde a) -> a -> m ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. a -> ProtoSerde a
forall a. a -> ProtoSerde a
ProtoSerde
data FlinkError
= MissingInvocationBatch
| ProtodeserializeBytesError String
| InvalidTypePassedError Text Text
| EmptyArgumentPassed
| StateDecodeError String
| MessageDecodeError String
| ProtoMessageDecodeError UnpackError
| NoSuchFunction (Text, Text)
deriving (Int -> FlinkError -> ShowS
[FlinkError] -> ShowS
FlinkError -> String
(Int -> FlinkError -> ShowS)
-> (FlinkError -> String)
-> ([FlinkError] -> ShowS)
-> Show FlinkError
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
showList :: [FlinkError] -> ShowS
$cshowList :: [FlinkError] -> ShowS
show :: FlinkError -> String
$cshow :: FlinkError -> String
showsPrec :: Int -> FlinkError -> ShowS
$cshowsPrec :: Int -> FlinkError -> ShowS
Show, FlinkError -> FlinkError -> Bool
(FlinkError -> FlinkError -> Bool)
-> (FlinkError -> FlinkError -> Bool) -> Eq FlinkError
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
/= :: FlinkError -> FlinkError -> Bool
$c/= :: FlinkError -> FlinkError -> Bool
== :: FlinkError -> FlinkError -> Bool
$c== :: FlinkError -> FlinkError -> Bool
Eq)
jsonState :: Json s => Function s () -> Function (JsonSerde s) ()
jsonState :: Function s () -> Function (JsonSerde s) ()
jsonState = Function s () -> Function (JsonSerde s) ()
forall a b. Coercible a b => a -> b
coerce
protoState :: Message s => Function s () -> Function (ProtoSerde s) ()
protoState :: Function s () -> Function (ProtoSerde s) ()
protoState = Function s () -> Function (ProtoSerde s) ()
forall a b. Coercible a b => a -> b
coerce
unwrapA :: forall a m. (Serde a, MonadError FlinkError m) => PR.TypedValue -> m (Maybe a)
unwrapA :: TypedValue -> m (Maybe a)
unwrapA arg :: TypedValue
arg = let
atp :: Text
atp = TypedValue
arg TypedValue -> FoldLike Text TypedValue TypedValue Text Text -> Text
forall s a t b. s -> FoldLike a s t a b -> a
^. FoldLike Text TypedValue TypedValue Text Text
forall (f :: * -> *) s a.
(Functor f, HasField s "typename" a) =>
LensLike' f s a
PR.typename
ctp :: Text
ctp = Proxy a -> Text
forall a. Serde a => Proxy a -> Text
tpName @a Proxy a
forall k (t :: k). Proxy t
Proxy
in if Bool -> Bool
not (TypedValue
argTypedValue -> FoldLike Bool TypedValue TypedValue Bool Bool -> Bool
forall s a t b. s -> FoldLike a s t a b -> a
^. FoldLike Bool TypedValue TypedValue Bool Bool
forall (f :: * -> *) s a.
(Functor f, HasField s "hasValue" a) =>
LensLike' f s a
PR.hasValue) then Maybe a -> m (Maybe a)
forall (f :: * -> *) a. Applicative f => a -> f a
pure Maybe a
forall a. Maybe a
Nothing else
if Text
atp Text -> Text -> Bool
forall a. Eq a => a -> a -> Bool
/= Text
ctp
then FlinkError -> m (Maybe a)
forall e (m :: * -> *) a. MonadError e m => e -> m a
throwError (Text -> Text -> FlinkError
InvalidTypePassedError Text
ctp Text
atp)
else a -> Maybe a
forall a. a -> Maybe a
Just (a -> Maybe a) -> m a -> m (Maybe a)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> (Either FlinkError a -> m a
forall e (m :: * -> *) a. MonadError e m => Either e a -> m a
liftEither (Either FlinkError a -> m a)
-> (Either String a -> Either FlinkError a)
-> Either String a
-> m a
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (String -> FlinkError) -> Either String a -> Either FlinkError a
forall a c b. (a -> c) -> Either a b -> Either c b
mapLeft String -> FlinkError
MessageDecodeError (Either String a -> m a) -> Either String a -> m a
forall a b. (a -> b) -> a -> b
$ ByteString -> Either String a
forall a. Serde a => ByteString -> Either String a
deserializeBytes @a (TypedValue
arg TypedValue
-> FoldLike ByteString TypedValue TypedValue ByteString ByteString
-> ByteString
forall s a t b. s -> FoldLike a s t a b -> a
^. FoldLike ByteString TypedValue TypedValue ByteString ByteString
forall (f :: * -> *) s a.
(Functor f, HasField s "value" a) =>
LensLike' f s a
PR.value))
flinkWrapper :: forall a s. (Serde a, Serde s) => s -> Expiration -> (a -> Function s ()) -> FuncExec
flinkWrapper :: s -> Expiration -> (a -> Function s ()) -> FuncExec
flinkWrapper s0 :: s
s0 expr :: Expiration
expr func :: a -> Function s ()
func env :: Env
env invocationBatch :: ToFunction'InvocationBatchRequest
invocationBatch = ExceptT FlinkError IO FuncRes -> IO (Either FlinkError FuncRes)
forall e (m :: * -> *) a. ExceptT e m a -> m (Either e a)
runExceptT (ExceptT FlinkError IO FuncRes -> IO (Either FlinkError FuncRes))
-> ExceptT FlinkError IO FuncRes -> IO (Either FlinkError FuncRes)
forall a b. (a -> b) -> a -> b
$ do
(eiRes :: Either FlinkError FuncRes
eiRes, _) <- IO (Either FlinkError FuncRes, FunctionState s)
-> ExceptT
FlinkError IO (Either FlinkError FuncRes, FunctionState s)
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (Either FlinkError FuncRes, FunctionState s)
-> ExceptT
FlinkError IO (Either FlinkError FuncRes, FunctionState s))
-> IO (Either FlinkError FuncRes, FunctionState s)
-> ExceptT
FlinkError IO (Either FlinkError FuncRes, FunctionState s)
forall a b. (a -> b) -> a -> b
$ FunctionState s -> IO (Either FlinkError FuncRes, FunctionState s)
runner (s -> FunctionState s
forall a. a -> FunctionState a
newState s
s0)
Either FlinkError FuncRes -> ExceptT FlinkError IO FuncRes
forall e (m :: * -> *) a. MonadError e m => Either e a -> m a
liftEither Either FlinkError FuncRes
eiRes
where
passedArgs :: [TypedValue]
passedArgs = ToFunction'InvocationBatchRequest
invocationBatch ToFunction'InvocationBatchRequest
-> Fold
ToFunction'InvocationBatchRequest
ToFunction'InvocationBatchRequest
TypedValue
TypedValue
-> [TypedValue]
forall s t a b. s -> Fold s t a b -> [a]
^.. LensLike'
f ToFunction'InvocationBatchRequest [ToFunction'Invocation]
forall (f :: * -> *) s a.
(Functor f, HasField s "invocations" a) =>
LensLike' f s a
PR.invocations LensLike'
f ToFunction'InvocationBatchRequest [ToFunction'Invocation]
-> ((TypedValue -> f TypedValue)
-> [ToFunction'Invocation] -> f [ToFunction'Invocation])
-> (TypedValue -> f TypedValue)
-> ToFunction'InvocationBatchRequest
-> f ToFunction'InvocationBatchRequest
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (ToFunction'Invocation -> f ToFunction'Invocation)
-> [ToFunction'Invocation] -> f [ToFunction'Invocation]
forall (f :: * -> *) a b.
Traversable f =>
Traversal (f a) (f b) a b
traversed ((ToFunction'Invocation -> f ToFunction'Invocation)
-> [ToFunction'Invocation] -> f [ToFunction'Invocation])
-> ((TypedValue -> f TypedValue)
-> ToFunction'Invocation -> f ToFunction'Invocation)
-> (TypedValue -> f TypedValue)
-> [ToFunction'Invocation]
-> f [ToFunction'Invocation]
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (TypedValue -> f TypedValue)
-> ToFunction'Invocation -> f ToFunction'Invocation
forall (f :: * -> *) s a.
(Functor f, HasField s "argument" a) =>
LensLike' f s a
PR.argument
mbInitCtx :: Maybe TypedValue
mbInitCtx = (LensLike'
f ToFunction'InvocationBatchRequest [ToFunction'PersistedValue]
forall (f :: * -> *) s a.
(Functor f, HasField s "state" a) =>
LensLike' f s a
PR.state LensLike'
f ToFunction'InvocationBatchRequest [ToFunction'PersistedValue]
-> ((TypedValue -> f TypedValue)
-> [ToFunction'PersistedValue] -> f [ToFunction'PersistedValue])
-> (TypedValue -> f TypedValue)
-> ToFunction'InvocationBatchRequest
-> f ToFunction'InvocationBatchRequest
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (ToFunction'PersistedValue -> f ToFunction'PersistedValue)
-> [ToFunction'PersistedValue] -> f [ToFunction'PersistedValue]
forall (f :: * -> *) a b.
Traversable f =>
Traversal (f a) (f b) a b
traversed ((ToFunction'PersistedValue -> f ToFunction'PersistedValue)
-> [ToFunction'PersistedValue] -> f [ToFunction'PersistedValue])
-> ((TypedValue -> f TypedValue)
-> ToFunction'PersistedValue -> f ToFunction'PersistedValue)
-> (TypedValue -> f TypedValue)
-> [ToFunction'PersistedValue]
-> f [ToFunction'PersistedValue]
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (ToFunction'PersistedValue -> Bool)
-> Traversal' ToFunction'PersistedValue ToFunction'PersistedValue
forall a. (a -> Bool) -> Traversal' a a
filtered ((Text -> Text -> Bool
forall a. Eq a => a -> a -> Bool
== "flink_state") (Text -> Bool)
-> (ToFunction'PersistedValue -> Text)
-> ToFunction'PersistedValue
-> Bool
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (ToFunction'PersistedValue
-> FoldLike
Text ToFunction'PersistedValue ToFunction'PersistedValue Text Text
-> Text
forall s a t b. s -> FoldLike a s t a b -> a
^. FoldLike
Text ToFunction'PersistedValue ToFunction'PersistedValue Text Text
forall (f :: * -> *) s a.
(Functor f, HasField s "stateName" a) =>
LensLike' f s a
PR.stateName)) ((ToFunction'PersistedValue -> f ToFunction'PersistedValue)
-> ToFunction'PersistedValue -> f ToFunction'PersistedValue)
-> ((TypedValue -> f TypedValue)
-> ToFunction'PersistedValue -> f ToFunction'PersistedValue)
-> (TypedValue -> f TypedValue)
-> ToFunction'PersistedValue
-> f ToFunction'PersistedValue
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (TypedValue -> f TypedValue)
-> ToFunction'PersistedValue -> f ToFunction'PersistedValue
forall (f :: * -> *) s a.
(Functor f, HasField s "stateValue" a) =>
LensLike' f s a
PR.stateValue)
Fold
ToFunction'InvocationBatchRequest
ToFunction'InvocationBatchRequest
TypedValue
TypedValue
-> ToFunction'InvocationBatchRequest -> Maybe TypedValue
forall s t a b. Fold s t a b -> s -> Maybe a
`firstOf` ToFunction'InvocationBatchRequest
invocationBatch
runWithCtx :: Function s FuncRes
runWithCtx = do
case Maybe TypedValue
mbInitCtx of
Nothing -> FuncRes -> Function s FuncRes
forall (m :: * -> *) a. Monad m => a -> m a
return (FuncRes -> Function s FuncRes) -> FuncRes -> Function s FuncRes
forall a b. (a -> b) -> a -> b
$ Expiration -> Text -> FuncRes
IncompleteContext Expiration
expr (Proxy s -> Text
forall a. Serde a => Proxy a -> Text
tpName @s Proxy s
forall k (t :: k). Proxy t
Proxy)
Just tv :: TypedValue
tv -> do
Maybe s
mbCtx <- TypedValue -> Function s (Maybe s)
forall a (m :: * -> *).
(Serde a, MonadError FlinkError m) =>
TypedValue -> m (Maybe a)
unwrapA @s TypedValue
tv
case Maybe s
mbCtx of
Nothing -> () -> Function s ()
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()
Just s1 :: s
s1 -> s -> Function s ()
forall s (m :: * -> *). StatefulFunc s m => s -> m ()
setInitialCtx s
s1
[Maybe a]
mbArgs <- (TypedValue -> Function s (Maybe a))
-> [TypedValue] -> Function s [Maybe a]
forall (t :: * -> *) (f :: * -> *) a b.
(Traversable t, Applicative f) =>
(a -> f b) -> t a -> f (t b)
traverse (forall a (m :: * -> *).
(Serde a, MonadError FlinkError m) =>
TypedValue -> m (Maybe a)
forall (m :: * -> *).
(Serde a, MonadError FlinkError m) =>
TypedValue -> m (Maybe a)
unwrapA @a) [TypedValue]
passedArgs
[a]
args <- (Maybe a -> Function s a) -> [Maybe a] -> Function s [a]
forall (t :: * -> *) (f :: * -> *) a b.
(Traversable t, Applicative f) =>
(a -> f b) -> t a -> f (t b)
traverse (Function s a -> (a -> Function s a) -> Maybe a -> Function s a
forall b a. b -> (a -> b) -> Maybe a -> b
maybe (FlinkError -> Function s a
forall e (m :: * -> *) a. MonadError e m => e -> m a
throwError FlinkError
EmptyArgumentPassed) a -> Function s a
forall (f :: * -> *) a. Applicative f => a -> f a
pure) [Maybe a]
mbArgs
(a -> Function s ()) -> [a] -> Function s ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
(a -> m b) -> t a -> m ()
mapM_ a -> Function s ()
func [a]
args
(FunctionState s -> FuncRes) -> Function s FuncRes
forall s (m :: * -> *) a. MonadState s m => (s -> a) -> m a
gets (FunctionState TypedValue -> FuncRes
UpdatedState (FunctionState TypedValue -> FuncRes)
-> (FunctionState s -> FunctionState TypedValue)
-> FunctionState s
-> FuncRes
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (s -> TypedValue) -> FunctionState s -> FunctionState TypedValue
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap s -> TypedValue
forall t a.
(Message t, Serde a, HasField t "typename" Text,
HasField t "value" ByteString) =>
a -> t
outS)
runner :: FunctionState s -> IO (Either FlinkError FuncRes, FunctionState s)
runner state :: FunctionState s
state = ReaderT Env IO (Either FlinkError FuncRes, FunctionState s)
-> Env -> IO (Either FlinkError FuncRes, FunctionState s)
forall r (m :: * -> *) a. ReaderT r m a -> r -> m a
runReaderT (StateT
(FunctionState s) (ReaderT Env IO) (Either FlinkError FuncRes)
-> FunctionState s
-> ReaderT Env IO (Either FlinkError FuncRes, FunctionState s)
forall s (m :: * -> *) a. StateT s m a -> s -> m (a, s)
runStateT (ExceptT
FlinkError (StateT (FunctionState s) (ReaderT Env IO)) FuncRes
-> StateT
(FunctionState s) (ReaderT Env IO) (Either FlinkError FuncRes)
forall e (m :: * -> *) a. ExceptT e m a -> m (Either e a)
runExceptT (ExceptT
FlinkError (StateT (FunctionState s) (ReaderT Env IO)) FuncRes
-> StateT
(FunctionState s) (ReaderT Env IO) (Either FlinkError FuncRes))
-> ExceptT
FlinkError (StateT (FunctionState s) (ReaderT Env IO)) FuncRes
-> StateT
(FunctionState s) (ReaderT Env IO) (Either FlinkError FuncRes)
forall a b. (a -> b) -> a -> b
$ Function s FuncRes
-> ExceptT
FlinkError (StateT (FunctionState s) (ReaderT Env IO)) FuncRes
forall s a.
Function s a
-> ExceptT FlinkError (StateT (FunctionState s) (ReaderT Env IO)) a
runFunction Function s FuncRes
runWithCtx) FunctionState s
state) Env
env
outS :: a -> t
outS fstate :: a
fstate = t
forall msg. Message msg => msg
defMessage
t -> (t -> t) -> t
forall s t. s -> (s -> t) -> t
& forall (f :: * -> *). Identical f => LensLike' f t Text
forall (f :: * -> *) s a.
(Functor f, HasField s "typename" a) =>
LensLike' f s a
PR.typename (forall (f :: * -> *). Identical f => LensLike' f t Text)
-> Text -> t -> t
forall s t a b. Setter s t a b -> b -> s -> t
.~ Proxy a -> Text
forall a. Serde a => Proxy a -> Text
tpName (a -> Proxy a
forall (f :: * -> *) a. Applicative f => a -> f a
pure a
fstate)
t -> (t -> t) -> t
forall s t. s -> (s -> t) -> t
& forall (f :: * -> *). Identical f => LensLike' f t ByteString
forall (f :: * -> *) s a.
(Functor f, HasField s "value" a) =>
LensLike' f s a
PR.value (forall (f :: * -> *). Identical f => LensLike' f t ByteString)
-> ByteString -> t -> t
forall s t a b. Setter s t a b -> b -> s -> t
.~ a -> ByteString
forall a. Serde a => a -> ByteString
serializeBytes a
fstate
createFlinkResp :: FuncRes -> FromFunction
createFlinkResp :: FuncRes -> FromFunction
createFlinkResp (UpdatedState (FunctionState state :: TypedValue
state mutated :: Bool
mutated invocations :: Seq FromFunction'Invocation
invocations delayedInvocations :: Seq FromFunction'DelayedInvocation
delayedInvocations egresses :: Seq FromFunction'EgressMessage
egresses)) =
FromFunction
forall msg. Message msg => msg
defMessage FromFunction -> (FromFunction -> FromFunction) -> FromFunction
forall s t. s -> (s -> t) -> t
& forall (f :: * -> *).
Identical f =>
LensLike' f FromFunction FromFunction'InvocationResponse
forall (f :: * -> *) s a.
(Functor f, HasField s "invocationResult" a) =>
LensLike' f s a
PR.invocationResult
(forall (f :: * -> *).
Identical f =>
LensLike' f FromFunction FromFunction'InvocationResponse)
-> FromFunction'InvocationResponse -> FromFunction -> FromFunction
forall s t a b. Setter s t a b -> b -> s -> t
.~ ( FromFunction'InvocationResponse
forall msg. Message msg => msg
defMessage
FromFunction'InvocationResponse
-> (FromFunction'InvocationResponse
-> FromFunction'InvocationResponse)
-> FromFunction'InvocationResponse
forall s t. s -> (s -> t) -> t
& forall (f :: * -> *).
Identical f =>
LensLike'
f
FromFunction'InvocationResponse
[FromFunction'PersistedValueMutation]
forall (f :: * -> *) s a.
(Functor f, HasField s "stateMutations" a) =>
LensLike' f s a
PR.stateMutations (forall (f :: * -> *).
Identical f =>
LensLike'
f
FromFunction'InvocationResponse
[FromFunction'PersistedValueMutation])
-> [FromFunction'PersistedValueMutation]
-> FromFunction'InvocationResponse
-> FromFunction'InvocationResponse
forall s t a b. Setter s t a b -> b -> s -> t
.~ [FromFunction'PersistedValueMutation]
-> [FromFunction'PersistedValueMutation]
forall (t :: * -> *) a. Foldable t => t a -> [a]
toList [FromFunction'PersistedValueMutation]
stateMutations
FromFunction'InvocationResponse
-> (FromFunction'InvocationResponse
-> FromFunction'InvocationResponse)
-> FromFunction'InvocationResponse
forall s t. s -> (s -> t) -> t
& forall (f :: * -> *).
Identical f =>
LensLike'
f FromFunction'InvocationResponse [FromFunction'Invocation]
forall (f :: * -> *) s a.
(Functor f, HasField s "outgoingMessages" a) =>
LensLike' f s a
PR.outgoingMessages (forall (f :: * -> *).
Identical f =>
LensLike'
f FromFunction'InvocationResponse [FromFunction'Invocation])
-> [FromFunction'Invocation]
-> FromFunction'InvocationResponse
-> FromFunction'InvocationResponse
forall s t a b. Setter s t a b -> b -> s -> t
.~ Seq FromFunction'Invocation -> [FromFunction'Invocation]
forall (t :: * -> *) a. Foldable t => t a -> [a]
toList Seq FromFunction'Invocation
invocations
FromFunction'InvocationResponse
-> (FromFunction'InvocationResponse
-> FromFunction'InvocationResponse)
-> FromFunction'InvocationResponse
forall s t. s -> (s -> t) -> t
& forall (f :: * -> *).
Identical f =>
LensLike'
f FromFunction'InvocationResponse [FromFunction'DelayedInvocation]
forall (f :: * -> *) s a.
(Functor f, HasField s "delayedInvocations" a) =>
LensLike' f s a
PR.delayedInvocations (forall (f :: * -> *).
Identical f =>
LensLike'
f FromFunction'InvocationResponse [FromFunction'DelayedInvocation])
-> [FromFunction'DelayedInvocation]
-> FromFunction'InvocationResponse
-> FromFunction'InvocationResponse
forall s t a b. Setter s t a b -> b -> s -> t
.~ Seq FromFunction'DelayedInvocation
-> [FromFunction'DelayedInvocation]
forall (t :: * -> *) a. Foldable t => t a -> [a]
toList Seq FromFunction'DelayedInvocation
delayedInvocations
FromFunction'InvocationResponse
-> (FromFunction'InvocationResponse
-> FromFunction'InvocationResponse)
-> FromFunction'InvocationResponse
forall s t. s -> (s -> t) -> t
& forall (f :: * -> *).
Identical f =>
LensLike'
f FromFunction'InvocationResponse [FromFunction'EgressMessage]
forall (f :: * -> *) s a.
(Functor f, HasField s "outgoingEgresses" a) =>
LensLike' f s a
PR.outgoingEgresses (forall (f :: * -> *).
Identical f =>
LensLike'
f FromFunction'InvocationResponse [FromFunction'EgressMessage])
-> [FromFunction'EgressMessage]
-> FromFunction'InvocationResponse
-> FromFunction'InvocationResponse
forall s t a b. Setter s t a b -> b -> s -> t
.~ Seq FromFunction'EgressMessage -> [FromFunction'EgressMessage]
forall (t :: * -> *) a. Foldable t => t a -> [a]
toList Seq FromFunction'EgressMessage
egresses
)
where
stateMutations :: [PR.FromFunction'PersistedValueMutation]
stateMutations :: [FromFunction'PersistedValueMutation]
stateMutations =
[ FromFunction'PersistedValueMutation
forall msg. Message msg => msg
defMessage
FromFunction'PersistedValueMutation
-> (FromFunction'PersistedValueMutation
-> FromFunction'PersistedValueMutation)
-> FromFunction'PersistedValueMutation
forall s t. s -> (s -> t) -> t
& forall (f :: * -> *).
Identical f =>
LensLike'
f
FromFunction'PersistedValueMutation
FromFunction'PersistedValueMutation'MutationType
forall (f :: * -> *) s a.
(Functor f, HasField s "mutationType" a) =>
LensLike' f s a
PR.mutationType (forall (f :: * -> *).
Identical f =>
LensLike'
f
FromFunction'PersistedValueMutation
FromFunction'PersistedValueMutation'MutationType)
-> FromFunction'PersistedValueMutation'MutationType
-> FromFunction'PersistedValueMutation
-> FromFunction'PersistedValueMutation
forall s t a b. Setter s t a b -> b -> s -> t
.~ FromFunction'PersistedValueMutation'MutationType
PR.FromFunction'PersistedValueMutation'MODIFY
FromFunction'PersistedValueMutation
-> (FromFunction'PersistedValueMutation
-> FromFunction'PersistedValueMutation)
-> FromFunction'PersistedValueMutation
forall s t. s -> (s -> t) -> t
& forall (f :: * -> *).
Identical f =>
LensLike' f FromFunction'PersistedValueMutation Text
forall (f :: * -> *) s a.
(Functor f, HasField s "stateName" a) =>
LensLike' f s a
PR.stateName (forall (f :: * -> *).
Identical f =>
LensLike' f FromFunction'PersistedValueMutation Text)
-> Text
-> FromFunction'PersistedValueMutation
-> FromFunction'PersistedValueMutation
forall s t a b. Setter s t a b -> b -> s -> t
.~ "flink_state"
FromFunction'PersistedValueMutation
-> (FromFunction'PersistedValueMutation
-> FromFunction'PersistedValueMutation)
-> FromFunction'PersistedValueMutation
forall s t. s -> (s -> t) -> t
& forall (f :: * -> *).
Identical f =>
LensLike' f FromFunction'PersistedValueMutation TypedValue
forall (f :: * -> *) s a.
(Functor f, HasField s "stateValue" a) =>
LensLike' f s a
PR.stateValue (forall (f :: * -> *).
Identical f =>
LensLike' f FromFunction'PersistedValueMutation TypedValue)
-> TypedValue
-> FromFunction'PersistedValueMutation
-> FromFunction'PersistedValueMutation
forall s t a b. Setter s t a b -> b -> s -> t
.~ TypedValue
state
| Bool
mutated
]
createFlinkResp (IncompleteContext (Expiration mode :: ExpirationMode
mode expireTime :: NominalDiffTime
expireTime) typeName :: Text
typeName) =
FromFunction
forall msg. Message msg => msg
defMessage FromFunction -> (FromFunction -> FromFunction) -> FromFunction
forall s t. s -> (s -> t) -> t
& forall (f :: * -> *).
Identical f =>
LensLike' f FromFunction FromFunction'IncompleteInvocationContext
forall (f :: * -> *) s a.
(Functor f, HasField s "incompleteInvocationContext" a) =>
LensLike' f s a
PR.incompleteInvocationContext (forall (f :: * -> *).
Identical f =>
LensLike' f FromFunction FromFunction'IncompleteInvocationContext)
-> FromFunction'IncompleteInvocationContext
-> FromFunction
-> FromFunction
forall s t a b. Setter s t a b -> b -> s -> t
.~ (
FromFunction'IncompleteInvocationContext
forall msg. Message msg => msg
defMessage FromFunction'IncompleteInvocationContext
-> (FromFunction'IncompleteInvocationContext
-> FromFunction'IncompleteInvocationContext)
-> FromFunction'IncompleteInvocationContext
forall s t. s -> (s -> t) -> t
& forall (f :: * -> *).
Identical f =>
LensLike'
f
FromFunction'IncompleteInvocationContext
[FromFunction'PersistedValueSpec]
forall (f :: * -> *) s a.
(Functor f, HasField s "missingValues" a) =>
LensLike' f s a
PR.missingValues (forall (f :: * -> *).
Identical f =>
LensLike'
f
FromFunction'IncompleteInvocationContext
[FromFunction'PersistedValueSpec])
-> [FromFunction'PersistedValueSpec]
-> FromFunction'IncompleteInvocationContext
-> FromFunction'IncompleteInvocationContext
forall s t a b. Setter s t a b -> b -> s -> t
.~ [
FromFunction'PersistedValueSpec
forall msg. Message msg => msg
defMessage
FromFunction'PersistedValueSpec
-> (FromFunction'PersistedValueSpec
-> FromFunction'PersistedValueSpec)
-> FromFunction'PersistedValueSpec
forall s t. s -> (s -> t) -> t
& forall (f :: * -> *).
Identical f =>
LensLike' f FromFunction'PersistedValueSpec Text
forall (f :: * -> *) s a.
(Functor f, HasField s "stateName" a) =>
LensLike' f s a
PR.stateName (forall (f :: * -> *).
Identical f =>
LensLike' f FromFunction'PersistedValueSpec Text)
-> Text
-> FromFunction'PersistedValueSpec
-> FromFunction'PersistedValueSpec
forall s t a b. Setter s t a b -> b -> s -> t
.~ "flink_state"
FromFunction'PersistedValueSpec
-> (FromFunction'PersistedValueSpec
-> FromFunction'PersistedValueSpec)
-> FromFunction'PersistedValueSpec
forall s t. s -> (s -> t) -> t
& forall (f :: * -> *).
Identical f =>
LensLike' f FromFunction'PersistedValueSpec Text
forall (f :: * -> *) s a.
(Functor f, HasField s "typeTypename" a) =>
LensLike' f s a
PR.typeTypename (forall (f :: * -> *).
Identical f =>
LensLike' f FromFunction'PersistedValueSpec Text)
-> Text
-> FromFunction'PersistedValueSpec
-> FromFunction'PersistedValueSpec
forall s t a b. Setter s t a b -> b -> s -> t
.~ Text
typeName
FromFunction'PersistedValueSpec
-> (FromFunction'PersistedValueSpec
-> FromFunction'PersistedValueSpec)
-> FromFunction'PersistedValueSpec
forall s t. s -> (s -> t) -> t
& forall (f :: * -> *).
Identical f =>
LensLike'
f FromFunction'PersistedValueSpec FromFunction'ExpirationSpec
forall (f :: * -> *) s a.
(Functor f, HasField s "expirationSpec" a) =>
LensLike' f s a
PR.expirationSpec (forall (f :: * -> *).
Identical f =>
LensLike'
f FromFunction'PersistedValueSpec FromFunction'ExpirationSpec)
-> FromFunction'ExpirationSpec
-> FromFunction'PersistedValueSpec
-> FromFunction'PersistedValueSpec
forall s t a b. Setter s t a b -> b -> s -> t
.~ (
FromFunction'ExpirationSpec
forall msg. Message msg => msg
defMessage
FromFunction'ExpirationSpec
-> (FromFunction'ExpirationSpec -> FromFunction'ExpirationSpec)
-> FromFunction'ExpirationSpec
forall s t. s -> (s -> t) -> t
& forall (f :: * -> *).
Identical f =>
LensLike' f FromFunction'ExpirationSpec Int64
forall (f :: * -> *) s a.
(Functor f, HasField s "expireAfterMillis" a) =>
LensLike' f s a
PR.expireAfterMillis (forall (f :: * -> *).
Identical f =>
LensLike' f FromFunction'ExpirationSpec Int64)
-> Int64
-> FromFunction'ExpirationSpec
-> FromFunction'ExpirationSpec
forall s t a b. Setter s t a b -> b -> s -> t
.~ NominalDiffTime -> Int64
forall a b. (RealFrac a, Integral b) => a -> b
round (NominalDiffTime
expireTime NominalDiffTime -> NominalDiffTime -> NominalDiffTime
forall a. Num a => a -> a -> a
* 1000.0)
FromFunction'ExpirationSpec
-> (FromFunction'ExpirationSpec -> FromFunction'ExpirationSpec)
-> FromFunction'ExpirationSpec
forall s t. s -> (s -> t) -> t
& forall (f :: * -> *).
Identical f =>
LensLike'
f
FromFunction'ExpirationSpec
FromFunction'ExpirationSpec'ExpireMode
forall (f :: * -> *) s a.
(Functor f, HasField s "mode" a) =>
LensLike' f s a
PR.mode (forall (f :: * -> *).
Identical f =>
LensLike'
f
FromFunction'ExpirationSpec
FromFunction'ExpirationSpec'ExpireMode)
-> FromFunction'ExpirationSpec'ExpireMode
-> FromFunction'ExpirationSpec
-> FromFunction'ExpirationSpec
forall s t a b. Setter s t a b -> b -> s -> t
.~ ExpirationMode -> FromFunction'ExpirationSpec'ExpireMode
pbmode ExpirationMode
mode)
])
where pbmode :: ExpirationMode -> FromFunction'ExpirationSpec'ExpireMode
pbmode NONE = FromFunction'ExpirationSpec'ExpireMode
PR.FromFunction'ExpirationSpec'NONE
pbmode AFTER_CALL = FromFunction'ExpirationSpec'ExpireMode
PR.FromFunction'ExpirationSpec'AFTER_INVOKE
pbmode AFTER_WRITE = FromFunction'ExpirationSpec'ExpireMode
PR.FromFunction'ExpirationSpec'AFTER_WRITE
type FlinkApi =
"statefun" :> ReqBody '[Proto] ToFunction :> Post '[Proto] FromFunction
flinkApi :: Proxy FlinkApi
flinkApi :: Proxy FlinkApi
flinkApi = Proxy FlinkApi
forall k (t :: k). Proxy t
Proxy
createApp :: FunctionTable -> Application
createApp :: FunctionTable -> Application
createApp funcs :: FunctionTable
funcs = Proxy FlinkApi -> Server FlinkApi -> Application
forall api.
HasServer api '[] =>
Proxy api -> Server api -> Application
serve Proxy FlinkApi
flinkApi (FunctionTable -> Server FlinkApi
flinkServer FunctionTable
funcs)
flinkServer :: FunctionTable -> Server FlinkApi
flinkServer :: FunctionTable -> Server FlinkApi
flinkServer functions :: FunctionTable
functions toFunction :: ToFunction
toFunction = do
ToFunction'InvocationBatchRequest
batch <- ToFunction -> Handler ToFunction'InvocationBatchRequest
forall (m :: * -> *) t.
(MonadError ServerError m,
HasField t "maybe'request" (Maybe ToFunction'Request)) =>
t -> m ToFunction'InvocationBatchRequest
getBatch ToFunction
toFunction
(function :: FuncExec
function, (namespace :: Text
namespace, type' :: Text
type', id' :: Text
id')) <- Address -> Handler (FuncExec, (Text, Text, Text))
forall (m :: * -> *) t c.
(MonadError ServerError m, HasField t "id" c,
HasField t "namespace" Text, HasField t "type'" Text) =>
t -> m (FuncExec, (Text, Text, c))
findFunc (ToFunction'InvocationBatchRequest
batch ToFunction'InvocationBatchRequest
-> FoldLike
Address
ToFunction'InvocationBatchRequest
ToFunction'InvocationBatchRequest
Address
Address
-> Address
forall s a t b. s -> FoldLike a s t a b -> a
^. FoldLike
Address
ToFunction'InvocationBatchRequest
ToFunction'InvocationBatchRequest
Address
Address
forall (f :: * -> *) s a.
(Functor f, HasField s "target" a) =>
LensLike' f s a
PR.target)
Either FlinkError FuncRes
result <- IO (Either FlinkError FuncRes)
-> Handler (Either FlinkError FuncRes)
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (Either FlinkError FuncRes)
-> Handler (Either FlinkError FuncRes))
-> IO (Either FlinkError FuncRes)
-> Handler (Either FlinkError FuncRes)
forall a b. (a -> b) -> a -> b
$ FuncExec
function (Text -> Text -> Text -> Env
Env Text
namespace Text
type' Text
id') ToFunction'InvocationBatchRequest
batch
FuncRes
finalState <- Either ServerError FuncRes -> Handler FuncRes
forall e (m :: * -> *) a. MonadError e m => Either e a -> m a
liftEither (Either ServerError FuncRes -> Handler FuncRes)
-> Either ServerError FuncRes -> Handler FuncRes
forall a b. (a -> b) -> a -> b
$ (FlinkError -> ServerError)
-> Either FlinkError FuncRes -> Either ServerError FuncRes
forall a c b. (a -> c) -> Either a b -> Either c b
mapLeft FlinkError -> ServerError
flinkErrToServant Either FlinkError FuncRes
result
FromFunction -> Handler FromFunction
forall (m :: * -> *) a. Monad m => a -> m a
return (FromFunction -> Handler FromFunction)
-> FromFunction -> Handler FromFunction
forall a b. (a -> b) -> a -> b
$ FuncRes -> FromFunction
createFlinkResp FuncRes
finalState
where
getBatch :: t -> m ToFunction'InvocationBatchRequest
getBatch input :: t
input = m ToFunction'InvocationBatchRequest
-> (ToFunction'InvocationBatchRequest
-> m ToFunction'InvocationBatchRequest)
-> Maybe ToFunction'InvocationBatchRequest
-> m ToFunction'InvocationBatchRequest
forall b a. b -> (a -> b) -> Maybe a -> b
maybe (ServerError -> m ToFunction'InvocationBatchRequest
forall e (m :: * -> *) a. MonadError e m => e -> m a
throwError (ServerError -> m ToFunction'InvocationBatchRequest)
-> ServerError -> m ToFunction'InvocationBatchRequest
forall a b. (a -> b) -> a -> b
$ FlinkError -> ServerError
flinkErrToServant FlinkError
MissingInvocationBatch) ToFunction'InvocationBatchRequest
-> m ToFunction'InvocationBatchRequest
forall (m :: * -> *) a. Monad m => a -> m a
return (t
input t
-> Fold
t
t
ToFunction'InvocationBatchRequest
ToFunction'InvocationBatchRequest
-> Maybe ToFunction'InvocationBatchRequest
forall s t a b. s -> Fold s t a b -> Maybe a
^? LensLike' f t (Maybe ToFunction'Request)
forall (f :: * -> *) s a.
(Functor f, HasField s "maybe'request" a) =>
LensLike' f s a
PR.maybe'request LensLike' f t (Maybe ToFunction'Request)
-> ((ToFunction'InvocationBatchRequest
-> f ToFunction'InvocationBatchRequest)
-> Maybe ToFunction'Request -> f (Maybe ToFunction'Request))
-> (ToFunction'InvocationBatchRequest
-> f ToFunction'InvocationBatchRequest)
-> t
-> f t
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (ToFunction'Request -> f ToFunction'Request)
-> Maybe ToFunction'Request -> f (Maybe ToFunction'Request)
forall a b. Prism (Maybe a) (Maybe b) a b
_Just ((ToFunction'Request -> f ToFunction'Request)
-> Maybe ToFunction'Request -> f (Maybe ToFunction'Request))
-> ((ToFunction'InvocationBatchRequest
-> f ToFunction'InvocationBatchRequest)
-> ToFunction'Request -> f ToFunction'Request)
-> (ToFunction'InvocationBatchRequest
-> f ToFunction'InvocationBatchRequest)
-> Maybe ToFunction'Request
-> f (Maybe ToFunction'Request)
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (ToFunction'InvocationBatchRequest
-> f ToFunction'InvocationBatchRequest)
-> ToFunction'Request -> f ToFunction'Request
Prism' ToFunction'Request ToFunction'InvocationBatchRequest
PR._ToFunction'Invocation')
findFunc :: t -> m (FuncExec, (Text, Text, c))
findFunc addr :: t
addr = do
FuncExec
res <- m FuncExec
-> (FuncExec -> m FuncExec) -> Maybe FuncExec -> m FuncExec
forall b a. b -> (a -> b) -> Maybe a -> b
maybe (ServerError -> m FuncExec
forall e (m :: * -> *) a. MonadError e m => e -> m a
throwError (ServerError -> m FuncExec) -> ServerError -> m FuncExec
forall a b. (a -> b) -> a -> b
$ FlinkError -> ServerError
flinkErrToServant (FlinkError -> ServerError) -> FlinkError -> ServerError
forall a b. (a -> b) -> a -> b
$ (Text, Text) -> FlinkError
NoSuchFunction (Text
namespace, Text
type')) FuncExec -> m FuncExec
forall (m :: * -> *) a. Monad m => a -> m a
return (FuncType -> FunctionTable -> Maybe FuncExec
forall k a. Ord k => k -> Map k a -> Maybe a
Map.lookup (Text -> Text -> FuncType
FuncType Text
namespace Text
type') FunctionTable
functions)
(FuncExec, (Text, Text, c)) -> m (FuncExec, (Text, Text, c))
forall (m :: * -> *) a. Monad m => a -> m a
return (FuncExec
res, (Text, Text, c)
address)
where
address :: (Text, Text, c)
address@(namespace :: Text
namespace, type' :: Text
type', _) = (t
addr t -> FoldLike Text t t Text Text -> Text
forall s a t b. s -> FoldLike a s t a b -> a
^. FoldLike Text t t Text Text
forall (f :: * -> *) s a.
(Functor f, HasField s "namespace" a) =>
LensLike' f s a
PR.namespace, t
addr t -> FoldLike Text t t Text Text -> Text
forall s a t b. s -> FoldLike a s t a b -> a
^. FoldLike Text t t Text Text
forall (f :: * -> *) s a.
(Functor f, HasField s "type'" a) =>
LensLike' f s a
PR.type', t
addr t -> FoldLike c t t c c -> c
forall s a t b. s -> FoldLike a s t a b -> a
^. FoldLike c t t c c
forall (f :: * -> *) s a.
(Functor f, HasField s "id" a) =>
LensLike' f s a
PR.id)
flinkErrToServant :: FlinkError -> ServerError
flinkErrToServant :: FlinkError -> ServerError
flinkErrToServant err :: FlinkError
err = case FlinkError
err of
MissingInvocationBatch -> ServerError
err400 {errBody :: ByteString
errBody = "Invocation batch missing"}
ProtodeserializeBytesError protoErr :: String
protoErr -> ServerError
err400 {errBody :: ByteString
errBody = "Could not deserializeBytes protobuf " ByteString -> ByteString -> ByteString
forall a. Semigroup a => a -> a -> a
<> String -> ByteString
BSL.pack String
protoErr}
StateDecodeError decodeErr :: String
decodeErr -> ServerError
err400 {errBody :: ByteString
errBody = "Invalid JSON " ByteString -> ByteString -> ByteString
forall a. Semigroup a => a -> a -> a
<> String -> ByteString
BSL.pack String
decodeErr}
MessageDecodeError msg :: String
msg -> ServerError
err400 {errBody :: ByteString
errBody = "Failed to decode message " ByteString -> ByteString -> ByteString
forall a. Semigroup a => a -> a -> a
<> String -> ByteString
BSL.pack String
msg}
ProtoMessageDecodeError msg :: UnpackError
msg -> ServerError
err400 {errBody :: ByteString
errBody = "Failed to decode message " ByteString -> ByteString -> ByteString
forall a. Semigroup a => a -> a -> a
<> String -> ByteString
BSL.pack (UnpackError -> String
forall a. Show a => a -> String
show UnpackError
msg)}
NoSuchFunction (namespace :: Text
namespace, type' :: Text
type') -> ServerError
err400 {errBody :: ByteString
errBody = "No such function " ByteString -> ByteString -> ByteString
forall a. Semigroup a => a -> a -> a
<> Text -> ByteString
T.encodeUtf8 (Text -> Text
fromStrict Text
namespace) ByteString -> ByteString -> ByteString
forall a. Semigroup a => a -> a -> a
<> Text -> ByteString
T.encodeUtf8 (Text -> Text
fromStrict Text
type')}
InvalidTypePassedError expected :: Text
expected passed :: Text
passed -> ServerError
err400 {errBody :: ByteString
errBody = "Expected type " ByteString -> ByteString -> ByteString
forall a. Semigroup a => a -> a -> a
<> Text -> ByteString
T.encodeUtf8 (Text -> Text
fromStrict Text
expected) ByteString -> ByteString -> ByteString
forall a. Semigroup a => a -> a -> a
<> ", got " ByteString -> ByteString -> ByteString
forall a. Semigroup a => a -> a -> a
<> Text -> ByteString
T.encodeUtf8 (Text -> Text
fromStrict Text
passed)}
EmptyArgumentPassed -> ServerError
err400 {errBody :: ByteString
errBody = "Empty argument was passed to the function" }