module Network.Flink.Test (testStatefulFunc) where
import Control.Monad.Except (runExceptT)
import Control.Monad.Reader (ReaderT (runReaderT))
import Control.Monad.State (StateT (..))
import Data.Either.Combinators (fromRight')
import Data.Functor (($>))
import Network.Flink.Internal.Stateful
( Env (Env),
Function (runFunction),
FunctionState,
newState,
)
-- | Run a stateful function handler once against a fresh state, for use in
-- tests.
--
-- The initial context is wrapped with 'newState', the handler is applied to
-- the input, and the resulting 'Function' is unwound through its
-- @ExceptT@ \/ @StateT@ \/ @ReaderT@ layers using a fixed test 'Env'.
--
-- Returns the 'FunctionState' left behind by the handler. If the handler ends
-- in a @Left@, the partial 'fromRight'' raises an exception /while this action
-- runs/, so a failing handler cannot go unnoticed even when the caller never
-- inspects the returned state.
testStatefulFunc ::
  -- | Initial context the function state is seeded with
  s ->
  -- | Handler under test
  (a -> Function s ()) ->
  -- | Input passed to the handler
  a ->
  IO (FunctionState s)
testStatefulFunc initialCtx func input = runner (newState initialCtx)
  where
    -- Fixed environment used for every test run.
    -- NOTE(review): 'Env' takes Text fields, so these literals presumably rely
    -- on OverloadedStrings being enabled for this module -- confirm.
    env = Env "test_namespace" "test_function" "placeholder_id"

    -- Peel the monad stack from the outside in: ExceptT, then StateT (seeded
    -- with the given state), then ReaderT (fed the test environment).
    runner state = do
      (res, state') <- runReaderT (runStateT (runExceptT $ runFunction (func input)) state) env
      -- Choosing the final IO action through 'fromRight'' forces the
      -- Left/Right decision as soon as the action is executed. Wrapping the
      -- partial call in 'return' instead would defer the error into a lazy
      -- thunk that only fires if the caller forces the result. 'state'' itself
      -- is not forced here.
      fromRight' (res $> pure state')