{-# language AllowAmbiguousTypes #-}
{-# language DataKinds #-}
{-# language DeriveFunctor #-}
{-# language FlexibleContexts #-}
{-# language FlexibleInstances #-}
{-# language GADTs #-}
{-# language MultiParamTypeClasses #-}
{-# language PolyKinds #-}
{-# language ScopedTypeVariables #-}
{-# language TypeApplications #-}
{-# language TypeFamilies #-}
{-# language TypeOperators #-}
{-# language UndecidableInstances #-}
module Mu.GRpc.Client.Internal where
import Control.Concurrent.Async
import Control.Concurrent.STM (atomically)
import Control.Concurrent.STM.TMChan
import Control.Concurrent.STM.TMVar
import Control.Monad.IO.Class
import qualified Data.ByteString.Char8 as BS
import Data.Conduit
import qualified Data.Conduit.Combinators as C
import Data.Conduit.TMChan
import Data.Kind
import Network.GRPC.Client (CompressMode (..), IncomingEvent (..),
OutgoingEvent (..), RawReply, StreamDone (..))
import Network.GRPC.Client.Helpers
import Network.GRPC.HTTP2.Encoding (GRPCInput, GRPCOutput)
import Network.HTTP2 (ErrorCode)
import Network.HTTP2.Client (ClientError, ClientIO, TooMuchConcurrency,
runExceptT)
import Mu.Adapter.ProtoBuf.Via
import Mu.GRpc.Avro
import Mu.GRpc.Bridge
import Mu.Rpc
import Mu.Schema
setupGrpcClient' :: GrpcClientConfig -> IO (Either ClientError GrpcClient)
setupGrpcClient' :: GrpcClientConfig -> IO (Either ClientError GrpcClient)
setupGrpcClient' = ExceptT ClientError IO GrpcClient
-> IO (Either ClientError GrpcClient)
forall e (m :: * -> *) a. ExceptT e m a -> m (Either e a)
runExceptT (ExceptT ClientError IO GrpcClient
-> IO (Either ClientError GrpcClient))
-> (GrpcClientConfig -> ExceptT ClientError IO GrpcClient)
-> GrpcClientConfig
-> IO (Either ClientError GrpcClient)
forall b c a. (b -> c) -> (a -> b) -> a -> c
. GrpcClientConfig -> ExceptT ClientError IO GrpcClient
setupGrpcClient
class GRpcServiceMethodCall (p :: GRpcMessageProtocol) (s :: Service snm mnm) (m :: Method mnm) h where
gRpcServiceMethodCall :: Proxy p -> Proxy s -> Proxy m -> GrpcClient -> h
instance ( KnownName serviceName, KnownName (FindPackageName anns), KnownName mname
, GRpcMethodCall p ('Method mname manns margs mret) h, MkRPC p )
=> GRpcServiceMethodCall p ('Service serviceName anns methods)
('Method mname manns margs mret) h where
gRpcServiceMethodCall :: Proxy p
-> Proxy ('Service serviceName anns methods)
-> Proxy ('Method mname manns margs mret)
-> GrpcClient
-> h
gRpcServiceMethodCall pro :: Proxy p
pro _ = RPCTy p
-> Proxy ('Method mname manns margs mret) -> GrpcClient -> h
forall k (p :: GRpcMessageProtocol) (method :: k) h.
GRpcMethodCall p method h =>
RPCTy p -> Proxy method -> GrpcClient -> h
gRpcMethodCall @p RPCTy p
rpc
where pkgName :: ByteString
pkgName = String -> ByteString
BS.pack (Proxy (FindPackageName anns) -> String
forall k (a :: k) (proxy :: k -> *).
KnownName a =>
proxy a -> String
nameVal (Proxy (FindPackageName anns)
forall k (t :: k). Proxy t
Proxy @(FindPackageName anns)))
svrName :: ByteString
svrName = String -> ByteString
BS.pack (Proxy serviceName -> String
forall k (a :: k) (proxy :: k -> *).
KnownName a =>
proxy a -> String
nameVal (Proxy serviceName
forall k (t :: k). Proxy t
Proxy @serviceName))
metName :: ByteString
metName = String -> ByteString
BS.pack (Proxy mname -> String
forall k (a :: k) (proxy :: k -> *).
KnownName a =>
proxy a -> String
nameVal (Proxy mname
forall k (t :: k). Proxy t
Proxy @mname))
rpc :: RPCTy p
rpc = Proxy p -> ByteString -> ByteString -> ByteString -> RPCTy p
forall (p :: GRpcMessageProtocol).
MkRPC p =>
Proxy p -> ByteString -> ByteString -> ByteString -> RPCTy p
mkRPC Proxy p
pro ByteString
pkgName ByteString
svrName ByteString
metName
data GRpcReply a
= GRpcTooMuchConcurrency TooMuchConcurrency
| GRpcErrorCode ErrorCode
| GRpcErrorString String
| GRpcClientError ClientError
| GRpcOk a
deriving (Int -> GRpcReply a -> ShowS
[GRpcReply a] -> ShowS
GRpcReply a -> String
(Int -> GRpcReply a -> ShowS)
-> (GRpcReply a -> String)
-> ([GRpcReply a] -> ShowS)
-> Show (GRpcReply a)
forall a. Show a => Int -> GRpcReply a -> ShowS
forall a. Show a => [GRpcReply a] -> ShowS
forall a. Show a => GRpcReply a -> String
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
showList :: [GRpcReply a] -> ShowS
$cshowList :: forall a. Show a => [GRpcReply a] -> ShowS
show :: GRpcReply a -> String
$cshow :: forall a. Show a => GRpcReply a -> String
showsPrec :: Int -> GRpcReply a -> ShowS
$cshowsPrec :: forall a. Show a => Int -> GRpcReply a -> ShowS
Show, a -> GRpcReply b -> GRpcReply a
(a -> b) -> GRpcReply a -> GRpcReply b
(forall a b. (a -> b) -> GRpcReply a -> GRpcReply b)
-> (forall a b. a -> GRpcReply b -> GRpcReply a)
-> Functor GRpcReply
forall a b. a -> GRpcReply b -> GRpcReply a
forall a b. (a -> b) -> GRpcReply a -> GRpcReply b
forall (f :: * -> *).
(forall a b. (a -> b) -> f a -> f b)
-> (forall a b. a -> f b -> f a) -> Functor f
<$ :: a -> GRpcReply b -> GRpcReply a
$c<$ :: forall a b. a -> GRpcReply b -> GRpcReply a
fmap :: (a -> b) -> GRpcReply a -> GRpcReply b
$cfmap :: forall a b. (a -> b) -> GRpcReply a -> GRpcReply b
Functor)
buildGRpcReply1 :: Either TooMuchConcurrency (RawReply a) -> GRpcReply a
buildGRpcReply1 :: Either TooMuchConcurrency (RawReply a) -> GRpcReply a
buildGRpcReply1 (Left tmc :: TooMuchConcurrency
tmc) = TooMuchConcurrency -> GRpcReply a
forall a. TooMuchConcurrency -> GRpcReply a
GRpcTooMuchConcurrency TooMuchConcurrency
tmc
buildGRpcReply1 (Right (Left ec :: ErrorCode
ec)) = ErrorCode -> GRpcReply a
forall a. ErrorCode -> GRpcReply a
GRpcErrorCode ErrorCode
ec
buildGRpcReply1 (Right (Right (_, _, Left es :: String
es))) = String -> GRpcReply a
forall a. String -> GRpcReply a
GRpcErrorString String
es
buildGRpcReply1 (Right (Right (_, _, Right r :: a
r))) = a -> GRpcReply a
forall a. a -> GRpcReply a
GRpcOk a
r
buildGRpcReply2 :: Either TooMuchConcurrency (r, RawReply a) -> GRpcReply a
buildGRpcReply2 :: Either TooMuchConcurrency (r, RawReply a) -> GRpcReply a
buildGRpcReply2 (Left tmc :: TooMuchConcurrency
tmc) = TooMuchConcurrency -> GRpcReply a
forall a. TooMuchConcurrency -> GRpcReply a
GRpcTooMuchConcurrency TooMuchConcurrency
tmc
buildGRpcReply2 (Right (_, Left ec :: ErrorCode
ec)) = ErrorCode -> GRpcReply a
forall a. ErrorCode -> GRpcReply a
GRpcErrorCode ErrorCode
ec
buildGRpcReply2 (Right (_, Right (_, _, Left es :: String
es))) = String -> GRpcReply a
forall a. String -> GRpcReply a
GRpcErrorString String
es
buildGRpcReply2 (Right (_, Right (_, _, Right r :: a
r))) = a -> GRpcReply a
forall a. a -> GRpcReply a
GRpcOk a
r
buildGRpcReply3 :: Either TooMuchConcurrency v -> GRpcReply ()
buildGRpcReply3 :: Either TooMuchConcurrency v -> GRpcReply ()
buildGRpcReply3 (Left tmc :: TooMuchConcurrency
tmc) = TooMuchConcurrency -> GRpcReply ()
forall a. TooMuchConcurrency -> GRpcReply a
GRpcTooMuchConcurrency TooMuchConcurrency
tmc
buildGRpcReply3 (Right _) = () -> GRpcReply ()
forall a. a -> GRpcReply a
GRpcOk ()
simplifyResponse :: ClientIO (GRpcReply a) -> IO (GRpcReply a)
simplifyResponse :: ClientIO (GRpcReply a) -> IO (GRpcReply a)
simplifyResponse reply :: ClientIO (GRpcReply a)
reply = do
Either ClientError (GRpcReply a)
r <- ClientIO (GRpcReply a) -> IO (Either ClientError (GRpcReply a))
forall e (m :: * -> *) a. ExceptT e m a -> m (Either e a)
runExceptT ClientIO (GRpcReply a)
reply
case Either ClientError (GRpcReply a)
r of
Left e :: ClientError
e -> GRpcReply a -> IO (GRpcReply a)
forall (m :: * -> *) a. Monad m => a -> m a
return (GRpcReply a -> IO (GRpcReply a))
-> GRpcReply a -> IO (GRpcReply a)
forall a b. (a -> b) -> a -> b
$ ClientError -> GRpcReply a
forall a. ClientError -> GRpcReply a
GRpcClientError ClientError
e
Right v :: GRpcReply a
v -> GRpcReply a -> IO (GRpcReply a)
forall (m :: * -> *) a. Monad m => a -> m a
return GRpcReply a
v
class GRPCInput (RPCTy p) (GRpcIWTy p ref r)
=> GRpcInputWrapper (p :: GRpcMessageProtocol) (ref :: TypeRef) (r :: Type) where
type GRpcIWTy p ref r :: Type
buildGRpcIWTy :: Proxy p -> Proxy ref -> r -> GRpcIWTy p ref r
instance ToProtoBufTypeRef ref r
=> GRpcInputWrapper 'MsgProtoBuf ref r where
type GRpcIWTy 'MsgProtoBuf ref r = ViaToProtoBufTypeRef ref r
buildGRpcIWTy :: Proxy 'MsgProtoBuf -> Proxy ref -> r -> GRpcIWTy 'MsgProtoBuf ref r
buildGRpcIWTy _ _ = r -> GRpcIWTy 'MsgProtoBuf ref r
forall (ref :: TypeRef) t. t -> ViaToProtoBufTypeRef ref t
ViaToProtoBufTypeRef
instance (GRPCInput AvroRPC (ViaToAvroTypeRef ('ViaSchema sch sty) r))
=> GRpcInputWrapper 'MsgAvro ('ViaSchema sch sty) r where
type GRpcIWTy 'MsgAvro ('ViaSchema sch sty) r = ViaToAvroTypeRef ('ViaSchema sch sty) r
buildGRpcIWTy :: Proxy 'MsgAvro
-> Proxy ('ViaSchema sch sty)
-> r
-> GRpcIWTy 'MsgAvro ('ViaSchema sch sty) r
buildGRpcIWTy _ _ = r -> GRpcIWTy 'MsgAvro ('ViaSchema sch sty) r
forall (ref :: TypeRef) t. t -> ViaToAvroTypeRef ref t
ViaToAvroTypeRef
class GRPCOutput (RPCTy p) (GRpcOWTy p ref r)
=> GRpcOutputWrapper (p :: GRpcMessageProtocol) (ref :: TypeRef) (r :: Type) where
type GRpcOWTy p ref r :: Type
unGRpcOWTy :: Proxy p -> Proxy ref -> GRpcOWTy p ref r -> r
instance FromProtoBufTypeRef ref r
=> GRpcOutputWrapper 'MsgProtoBuf ref r where
type GRpcOWTy 'MsgProtoBuf ref r = ViaFromProtoBufTypeRef ref r
unGRpcOWTy :: Proxy 'MsgProtoBuf -> Proxy ref -> GRpcOWTy 'MsgProtoBuf ref r -> r
unGRpcOWTy _ _ = GRpcOWTy 'MsgProtoBuf ref r -> r
forall (ref :: TypeRef) t. ViaFromProtoBufTypeRef ref t -> t
unViaFromProtoBufTypeRef
instance (GRPCOutput AvroRPC (ViaFromAvroTypeRef ('ViaSchema sch sty) r))
=> GRpcOutputWrapper 'MsgAvro ('ViaSchema sch sty) r where
type GRpcOWTy 'MsgAvro ('ViaSchema sch sty) r = ViaFromAvroTypeRef ('ViaSchema sch sty) r
unGRpcOWTy :: Proxy 'MsgAvro
-> Proxy ('ViaSchema sch sty)
-> GRpcOWTy 'MsgAvro ('ViaSchema sch sty) r
-> r
unGRpcOWTy _ _ = GRpcOWTy 'MsgAvro ('ViaSchema sch sty) r -> r
forall (ref :: TypeRef) t. ViaFromAvroTypeRef ref t -> t
unViaFromAvroTypeRef
class GRpcMethodCall (p :: GRpcMessageProtocol) method h where
gRpcMethodCall :: RPCTy p -> Proxy method -> GrpcClient -> h
instance ( KnownName name
, GRPCInput (RPCTy p) (), GRPCOutput (RPCTy p) ()
, handler ~ IO (GRpcReply ()) )
=> GRpcMethodCall p ('Method name anns '[ ] 'RetNothing) handler where
gRpcMethodCall :: RPCTy p
-> Proxy ('Method name anns '[] 'RetNothing)
-> GrpcClient
-> handler
gRpcMethodCall rpc :: RPCTy p
rpc _ client :: GrpcClient
client
= ClientIO (GRpcReply ()) -> handler
forall a. ClientIO (GRpcReply a) -> IO (GRpcReply a)
simplifyResponse (ClientIO (GRpcReply ()) -> handler)
-> ClientIO (GRpcReply ()) -> handler
forall a b. (a -> b) -> a -> b
$
Either TooMuchConcurrency (RawReply ()) -> GRpcReply ()
forall a. Either TooMuchConcurrency (RawReply a) -> GRpcReply a
buildGRpcReply1 (Either TooMuchConcurrency (RawReply ()) -> GRpcReply ())
-> ExceptT ClientError IO (Either TooMuchConcurrency (RawReply ()))
-> ClientIO (GRpcReply ())
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$>
RPCTy p
-> GrpcClient
-> ()
-> ExceptT ClientError IO (Either TooMuchConcurrency (RawReply ()))
forall r i o.
(GRPCInput r i, GRPCOutput r o) =>
r
-> GrpcClient
-> i
-> ClientIO (Either TooMuchConcurrency (RawReply o))
rawUnary RPCTy p
rpc GrpcClient
client ()
instance ( KnownName name
, GRPCInput (RPCTy p) (), GRpcOutputWrapper p rref r
, handler ~ IO (GRpcReply r) )
=> GRpcMethodCall p ('Method name anns '[ ] ('RetSingle rref)) handler where
gRpcMethodCall :: RPCTy p
-> Proxy ('Method name anns '[] ('RetSingle rref))
-> GrpcClient
-> handler
gRpcMethodCall rpc :: RPCTy p
rpc _ client :: GrpcClient
client
= (GRpcReply (GRpcOWTy p rref r) -> GRpcReply r)
-> IO (GRpcReply (GRpcOWTy p rref r)) -> IO (GRpcReply r)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap ((GRpcOWTy p rref r -> r)
-> GRpcReply (GRpcOWTy p rref r) -> GRpcReply r
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap (Proxy p -> Proxy rref -> GRpcOWTy p rref r -> r
forall (p :: GRpcMessageProtocol) (ref :: TypeRef) r.
GRpcOutputWrapper p ref r =>
Proxy p -> Proxy ref -> GRpcOWTy p ref r -> r
unGRpcOWTy (Proxy p
forall k (t :: k). Proxy t
Proxy @p) (Proxy rref
forall k (t :: k). Proxy t
Proxy @rref))) (IO (GRpcReply (GRpcOWTy p rref r)) -> handler)
-> IO (GRpcReply (GRpcOWTy p rref r)) -> handler
forall a b. (a -> b) -> a -> b
$
ClientIO (GRpcReply (GRpcOWTy p rref r))
-> IO (GRpcReply (GRpcOWTy p rref r))
forall a. ClientIO (GRpcReply a) -> IO (GRpcReply a)
simplifyResponse (ClientIO (GRpcReply (GRpcOWTy p rref r))
-> IO (GRpcReply (GRpcOWTy p rref r)))
-> ClientIO (GRpcReply (GRpcOWTy p rref r))
-> IO (GRpcReply (GRpcOWTy p rref r))
forall a b. (a -> b) -> a -> b
$
Either TooMuchConcurrency (RawReply (GRpcOWTy p rref r))
-> GRpcReply (GRpcOWTy p rref r)
forall a. Either TooMuchConcurrency (RawReply a) -> GRpcReply a
buildGRpcReply1 (Either TooMuchConcurrency (RawReply (GRpcOWTy p rref r))
-> GRpcReply (GRpcOWTy p rref r))
-> ExceptT
ClientError
IO
(Either TooMuchConcurrency (RawReply (GRpcOWTy p rref r)))
-> ClientIO (GRpcReply (GRpcOWTy p rref r))
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$>
RPCTy p
-> GrpcClient
-> ()
-> ExceptT
ClientError
IO
(Either TooMuchConcurrency (RawReply (GRpcOWTy p rref r)))
forall r i o.
(GRPCInput r i, GRPCOutput r o) =>
r
-> GrpcClient
-> i
-> ClientIO (Either TooMuchConcurrency (RawReply o))
rawUnary @_ @() @(GRpcOWTy p rref r) RPCTy p
rpc GrpcClient
client ()
instance ( KnownName name
, GRPCInput (RPCTy p) (), GRpcOutputWrapper p rref r
, handler ~ IO (ConduitT () (GRpcReply r) IO ()) )
=> GRpcMethodCall p ('Method name anns '[ ] ('RetStream rref)) handler where
gRpcMethodCall :: RPCTy p
-> Proxy ('Method name anns '[] ('RetStream rref))
-> GrpcClient
-> handler
gRpcMethodCall rpc :: RPCTy p
rpc _ client :: GrpcClient
client
= do
TMChan r
chan <- IO (TMChan r)
forall a. IO (TMChan a)
newTMChanIO :: IO (TMChan r)
TMVar (GRpcReply ())
var <- IO (TMVar (GRpcReply ()))
forall a. IO (TMVar a)
newEmptyTMVarIO
Async ()
_ <- IO () -> IO (Async ())
forall a. IO a -> IO (Async a)
async (IO () -> IO (Async ())) -> IO () -> IO (Async ())
forall a b. (a -> b) -> a -> b
$ do
GRpcReply ()
v <- ClientIO (GRpcReply ()) -> IO (GRpcReply ())
forall a. ClientIO (GRpcReply a) -> IO (GRpcReply a)
simplifyResponse (ClientIO (GRpcReply ()) -> IO (GRpcReply ()))
-> ClientIO (GRpcReply ()) -> IO (GRpcReply ())
forall a b. (a -> b) -> a -> b
$
Either TooMuchConcurrency ((), HeaderList, HeaderList)
-> GRpcReply ()
forall v. Either TooMuchConcurrency v -> GRpcReply ()
buildGRpcReply3 (Either TooMuchConcurrency ((), HeaderList, HeaderList)
-> GRpcReply ())
-> ExceptT
ClientError
IO
(Either TooMuchConcurrency ((), HeaderList, HeaderList))
-> ClientIO (GRpcReply ())
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$>
RPCTy p
-> GrpcClient
-> ()
-> ()
-> (() -> HeaderList -> GRpcOWTy p rref r -> ClientIO ())
-> ExceptT
ClientError
IO
(Either TooMuchConcurrency ((), HeaderList, HeaderList))
forall r i o a.
(GRPCInput r i, GRPCOutput r o) =>
r
-> GrpcClient
-> a
-> i
-> (a -> HeaderList -> o -> ClientIO a)
-> ClientIO (Either TooMuchConcurrency (a, HeaderList, HeaderList))
rawStreamServer @_ @() @(GRpcOWTy p rref r)
RPCTy p
rpc GrpcClient
client () ()
(\_ _ newVal :: GRpcOWTy p rref r
newVal -> IO () -> ClientIO ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> ClientIO ()) -> IO () -> ClientIO ()
forall a b. (a -> b) -> a -> b
$ STM () -> IO ()
forall a. STM a -> IO a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ do
Bool
_ <- TMVar (GRpcReply ()) -> GRpcReply () -> STM Bool
forall a. TMVar a -> a -> STM Bool
tryPutTMVar TMVar (GRpcReply ())
var (() -> GRpcReply ()
forall a. a -> GRpcReply a
GRpcOk ())
TMChan r -> r -> STM ()
forall a. TMChan a -> a -> STM ()
writeTMChan TMChan r
chan (Proxy p -> Proxy rref -> GRpcOWTy p rref r -> r
forall (p :: GRpcMessageProtocol) (ref :: TypeRef) r.
GRpcOutputWrapper p ref r =>
Proxy p -> Proxy ref -> GRpcOWTy p ref r -> r
unGRpcOWTy (Proxy p
forall k (t :: k). Proxy t
Proxy @p) (Proxy rref
forall k (t :: k). Proxy t
Proxy @rref) GRpcOWTy p rref r
newVal))
case GRpcReply ()
v of
GRpcOk () -> IO () -> IO ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ STM () -> IO ()
forall a. STM a -> IO a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ TMChan r -> STM ()
forall a. TMChan a -> STM ()
closeTMChan TMChan r
chan
_ -> IO () -> IO ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ STM () -> IO ()
forall a. STM a -> IO a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ TMVar (GRpcReply ()) -> GRpcReply () -> STM ()
forall a. TMVar a -> a -> STM ()
putTMVar TMVar (GRpcReply ())
var GRpcReply ()
v
let go :: ConduitT () (GRpcReply r) IO ()
go = do GRpcReply ()
firstResult <- IO (GRpcReply ()) -> ConduitT () (GRpcReply r) IO (GRpcReply ())
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (GRpcReply ()) -> ConduitT () (GRpcReply r) IO (GRpcReply ()))
-> IO (GRpcReply ()) -> ConduitT () (GRpcReply r) IO (GRpcReply ())
forall a b. (a -> b) -> a -> b
$ STM (GRpcReply ()) -> IO (GRpcReply ())
forall a. STM a -> IO a
atomically (STM (GRpcReply ()) -> IO (GRpcReply ()))
-> STM (GRpcReply ()) -> IO (GRpcReply ())
forall a b. (a -> b) -> a -> b
$ TMVar (GRpcReply ()) -> STM (GRpcReply ())
forall a. TMVar a -> STM a
takeTMVar TMVar (GRpcReply ())
var
case GRpcReply ()
firstResult of
GRpcOk _ ->
TMChan r -> ConduitT () r IO ()
forall (m :: * -> *) a. MonadIO m => TMChan a -> ConduitT () a m ()
sourceTMChan TMChan r
chan ConduitT () r IO ()
-> ConduitM r (GRpcReply r) IO ()
-> ConduitT () (GRpcReply r) IO ()
forall (m :: * -> *) a b c r.
Monad m =>
ConduitM a b m () -> ConduitM b c m r -> ConduitM a c m r
.| (r -> GRpcReply r) -> ConduitM r (GRpcReply r) IO ()
forall (m :: * -> *) a b. Monad m => (a -> b) -> ConduitT a b m ()
C.map r -> GRpcReply r
forall a. a -> GRpcReply a
GRpcOk
e :: GRpcReply ()
e -> GRpcReply r -> ConduitT () (GRpcReply r) IO ()
forall (m :: * -> *) o i. Monad m => o -> ConduitT i o m ()
yield (GRpcReply r -> ConduitT () (GRpcReply r) IO ())
-> GRpcReply r -> ConduitT () (GRpcReply r) IO ()
forall a b. (a -> b) -> a -> b
$ (\_ -> String -> r
forall a. HasCallStack => String -> a
error "this should never happen") (() -> r) -> GRpcReply () -> GRpcReply r
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> GRpcReply ()
e
ConduitT () (GRpcReply r) IO ()
-> IO (ConduitT () (GRpcReply r) IO ())
forall (m :: * -> *) a. Monad m => a -> m a
return ConduitT () (GRpcReply r) IO ()
go
instance ( KnownName name
, GRpcInputWrapper p vref v, GRPCOutput (RPCTy p) ()
, handler ~ (v -> IO (GRpcReply ())) )
=> GRpcMethodCall p ('Method name anns '[ 'ArgSingle vref ] 'RetNothing) handler where
gRpcMethodCall :: RPCTy p
-> Proxy ('Method name anns '[ 'ArgSingle vref] 'RetNothing)
-> GrpcClient
-> handler
gRpcMethodCall rpc :: RPCTy p
rpc _ client :: GrpcClient
client x :: v
x
= ClientIO (GRpcReply ()) -> IO (GRpcReply ())
forall a. ClientIO (GRpcReply a) -> IO (GRpcReply a)
simplifyResponse (ClientIO (GRpcReply ()) -> IO (GRpcReply ()))
-> ClientIO (GRpcReply ()) -> IO (GRpcReply ())
forall a b. (a -> b) -> a -> b
$
Either TooMuchConcurrency (RawReply ()) -> GRpcReply ()
forall a. Either TooMuchConcurrency (RawReply a) -> GRpcReply a
buildGRpcReply1 (Either TooMuchConcurrency (RawReply ()) -> GRpcReply ())
-> ExceptT ClientError IO (Either TooMuchConcurrency (RawReply ()))
-> ClientIO (GRpcReply ())
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$>
RPCTy p
-> GrpcClient
-> GRpcIWTy p vref v
-> ExceptT ClientError IO (Either TooMuchConcurrency (RawReply ()))
forall r i o.
(GRPCInput r i, GRPCOutput r o) =>
r
-> GrpcClient
-> i
-> ClientIO (Either TooMuchConcurrency (RawReply o))
rawUnary @_ @(GRpcIWTy p vref v) @() RPCTy p
rpc GrpcClient
client (Proxy p -> Proxy vref -> v -> GRpcIWTy p vref v
forall (p :: GRpcMessageProtocol) (ref :: TypeRef) r.
GRpcInputWrapper p ref r =>
Proxy p -> Proxy ref -> r -> GRpcIWTy p ref r
buildGRpcIWTy (Proxy p
forall k (t :: k). Proxy t
Proxy @p) (Proxy vref
forall k (t :: k). Proxy t
Proxy @vref) v
x)
instance ( KnownName name
, GRpcInputWrapper p vref v, GRpcOutputWrapper p rref r
, handler ~ (v -> IO (GRpcReply r)) )
=> GRpcMethodCall p ('Method name anns '[ 'ArgSingle vref ] ('RetSingle rref)) handler where
gRpcMethodCall :: RPCTy p
-> Proxy ('Method name anns '[ 'ArgSingle vref] ('RetSingle rref))
-> GrpcClient
-> handler
gRpcMethodCall rpc :: RPCTy p
rpc _ client :: GrpcClient
client x :: v
x
= (GRpcReply (GRpcOWTy p rref r) -> GRpcReply r)
-> IO (GRpcReply (GRpcOWTy p rref r)) -> IO (GRpcReply r)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap ((GRpcOWTy p rref r -> r)
-> GRpcReply (GRpcOWTy p rref r) -> GRpcReply r
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap (Proxy p -> Proxy rref -> GRpcOWTy p rref r -> r
forall (p :: GRpcMessageProtocol) (ref :: TypeRef) r.
GRpcOutputWrapper p ref r =>
Proxy p -> Proxy ref -> GRpcOWTy p ref r -> r
unGRpcOWTy (Proxy p
forall k (t :: k). Proxy t
Proxy @p) (Proxy rref
forall k (t :: k). Proxy t
Proxy @rref))) (IO (GRpcReply (GRpcOWTy p rref r)) -> IO (GRpcReply r))
-> IO (GRpcReply (GRpcOWTy p rref r)) -> IO (GRpcReply r)
forall a b. (a -> b) -> a -> b
$
ClientIO (GRpcReply (GRpcOWTy p rref r))
-> IO (GRpcReply (GRpcOWTy p rref r))
forall a. ClientIO (GRpcReply a) -> IO (GRpcReply a)
simplifyResponse (ClientIO (GRpcReply (GRpcOWTy p rref r))
-> IO (GRpcReply (GRpcOWTy p rref r)))
-> ClientIO (GRpcReply (GRpcOWTy p rref r))
-> IO (GRpcReply (GRpcOWTy p rref r))
forall a b. (a -> b) -> a -> b
$
Either TooMuchConcurrency (RawReply (GRpcOWTy p rref r))
-> GRpcReply (GRpcOWTy p rref r)
forall a. Either TooMuchConcurrency (RawReply a) -> GRpcReply a
buildGRpcReply1 (Either TooMuchConcurrency (RawReply (GRpcOWTy p rref r))
-> GRpcReply (GRpcOWTy p rref r))
-> ExceptT
ClientError
IO
(Either TooMuchConcurrency (RawReply (GRpcOWTy p rref r)))
-> ClientIO (GRpcReply (GRpcOWTy p rref r))
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$>
RPCTy p
-> GrpcClient
-> GRpcIWTy p vref v
-> ExceptT
ClientError
IO
(Either TooMuchConcurrency (RawReply (GRpcOWTy p rref r)))
forall r i o.
(GRPCInput r i, GRPCOutput r o) =>
r
-> GrpcClient
-> i
-> ClientIO (Either TooMuchConcurrency (RawReply o))
rawUnary @_ @(GRpcIWTy p vref v) @(GRpcOWTy p rref r)
RPCTy p
rpc GrpcClient
client (Proxy p -> Proxy vref -> v -> GRpcIWTy p vref v
forall (p :: GRpcMessageProtocol) (ref :: TypeRef) r.
GRpcInputWrapper p ref r =>
Proxy p -> Proxy ref -> r -> GRpcIWTy p ref r
buildGRpcIWTy (Proxy p
forall k (t :: k). Proxy t
Proxy @p) (Proxy vref
forall k (t :: k). Proxy t
Proxy @vref) v
x)
instance ( KnownName name
, GRpcInputWrapper p vref v, GRpcOutputWrapper p rref r
, handler ~ (CompressMode -> IO (ConduitT v Void IO (GRpcReply r))) )
=> GRpcMethodCall p ('Method name anns '[ 'ArgStream vref ] ('RetSingle rref)) handler where
gRpcMethodCall :: RPCTy p
-> Proxy ('Method name anns '[ 'ArgStream vref] ('RetSingle rref))
-> GrpcClient
-> handler
gRpcMethodCall rpc :: RPCTy p
rpc _ client :: GrpcClient
client compress :: CompressMode
compress
= do
TMChan v
chan <- IO (TMChan v)
forall a. IO (TMChan a)
newTMChanIO :: IO (TMChan v)
Async (GRpcReply r)
promise <- IO (GRpcReply r) -> IO (Async (GRpcReply r))
forall a. IO a -> IO (Async a)
async (IO (GRpcReply r) -> IO (Async (GRpcReply r)))
-> IO (GRpcReply r) -> IO (Async (GRpcReply r))
forall a b. (a -> b) -> a -> b
$
(GRpcReply (GRpcOWTy p rref r) -> GRpcReply r)
-> IO (GRpcReply (GRpcOWTy p rref r)) -> IO (GRpcReply r)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap ((GRpcOWTy p rref r -> r)
-> GRpcReply (GRpcOWTy p rref r) -> GRpcReply r
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap (Proxy p -> Proxy rref -> GRpcOWTy p rref r -> r
forall (p :: GRpcMessageProtocol) (ref :: TypeRef) r.
GRpcOutputWrapper p ref r =>
Proxy p -> Proxy ref -> GRpcOWTy p ref r -> r
unGRpcOWTy (Proxy p
forall k (t :: k). Proxy t
Proxy @p) (Proxy rref
forall k (t :: k). Proxy t
Proxy @rref))) (IO (GRpcReply (GRpcOWTy p rref r)) -> IO (GRpcReply r))
-> IO (GRpcReply (GRpcOWTy p rref r)) -> IO (GRpcReply r)
forall a b. (a -> b) -> a -> b
$
ClientIO (GRpcReply (GRpcOWTy p rref r))
-> IO (GRpcReply (GRpcOWTy p rref r))
forall a. ClientIO (GRpcReply a) -> IO (GRpcReply a)
simplifyResponse (ClientIO (GRpcReply (GRpcOWTy p rref r))
-> IO (GRpcReply (GRpcOWTy p rref r)))
-> ClientIO (GRpcReply (GRpcOWTy p rref r))
-> IO (GRpcReply (GRpcOWTy p rref r))
forall a b. (a -> b) -> a -> b
$
Either TooMuchConcurrency ((), RawReply (GRpcOWTy p rref r))
-> GRpcReply (GRpcOWTy p rref r)
forall r a.
Either TooMuchConcurrency (r, RawReply a) -> GRpcReply a
buildGRpcReply2 (Either TooMuchConcurrency ((), RawReply (GRpcOWTy p rref r))
-> GRpcReply (GRpcOWTy p rref r))
-> ExceptT
ClientError
IO
(Either TooMuchConcurrency ((), RawReply (GRpcOWTy p rref r)))
-> ClientIO (GRpcReply (GRpcOWTy p rref r))
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$>
RPCTy p
-> GrpcClient
-> ()
-> (()
-> ClientIO
((), Either StreamDone (CompressMode, GRpcIWTy p vref v)))
-> ExceptT
ClientError
IO
(Either TooMuchConcurrency ((), RawReply (GRpcOWTy p rref r)))
forall r i o a.
(GRPCInput r i, GRPCOutput r o) =>
r
-> GrpcClient
-> a
-> (a -> ClientIO (a, Either StreamDone (CompressMode, i)))
-> ClientIO (Either TooMuchConcurrency (a, RawReply o))
rawStreamClient @_ @(GRpcIWTy p vref v) @(GRpcOWTy p rref r) RPCTy p
rpc GrpcClient
client ()
(\_ -> do Maybe v
nextVal <- IO (Maybe v) -> ExceptT ClientError IO (Maybe v)
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (Maybe v) -> ExceptT ClientError IO (Maybe v))
-> IO (Maybe v) -> ExceptT ClientError IO (Maybe v)
forall a b. (a -> b) -> a -> b
$ STM (Maybe v) -> IO (Maybe v)
forall a. STM a -> IO a
atomically (STM (Maybe v) -> IO (Maybe v)) -> STM (Maybe v) -> IO (Maybe v)
forall a b. (a -> b) -> a -> b
$ TMChan v -> STM (Maybe v)
forall a. TMChan a -> STM (Maybe a)
readTMChan TMChan v
chan
case Maybe v
nextVal of
Nothing -> ((), Either StreamDone (CompressMode, GRpcIWTy p vref v))
-> ClientIO
((), Either StreamDone (CompressMode, GRpcIWTy p vref v))
forall (m :: * -> *) a. Monad m => a -> m a
return ((), StreamDone -> Either StreamDone (CompressMode, GRpcIWTy p vref v)
forall a b. a -> Either a b
Left StreamDone
StreamDone)
Just v :: v
v -> ((), Either StreamDone (CompressMode, GRpcIWTy p vref v))
-> ClientIO
((), Either StreamDone (CompressMode, GRpcIWTy p vref v))
forall (m :: * -> *) a. Monad m => a -> m a
return ((), (CompressMode, GRpcIWTy p vref v)
-> Either StreamDone (CompressMode, GRpcIWTy p vref v)
forall a b. b -> Either a b
Right (CompressMode
compress, Proxy p -> Proxy vref -> v -> GRpcIWTy p vref v
forall (p :: GRpcMessageProtocol) (ref :: TypeRef) r.
GRpcInputWrapper p ref r =>
Proxy p -> Proxy ref -> r -> GRpcIWTy p ref r
buildGRpcIWTy (Proxy p
forall k (t :: k). Proxy t
Proxy @p) (Proxy vref
forall k (t :: k). Proxy t
Proxy @vref) v
v)))
let go :: ConduitT v Void IO (GRpcReply r)
go = do Maybe v
x <- ConduitT v Void IO (Maybe v)
forall (m :: * -> *) i. Monad m => Consumer i m (Maybe i)
await
case Maybe v
x of
Just v :: v
v -> do IO () -> ConduitT v Void IO ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> ConduitT v Void IO ()) -> IO () -> ConduitT v Void IO ()
forall a b. (a -> b) -> a -> b
$ STM () -> IO ()
forall a. STM a -> IO a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ TMChan v -> v -> STM ()
forall a. TMChan a -> a -> STM ()
writeTMChan TMChan v
chan v
v
ConduitT v Void IO (GRpcReply r)
go
Nothing -> do IO () -> ConduitT v Void IO ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> ConduitT v Void IO ()) -> IO () -> ConduitT v Void IO ()
forall a b. (a -> b) -> a -> b
$ STM () -> IO ()
forall a. STM a -> IO a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ TMChan v -> STM ()
forall a. TMChan a -> STM ()
closeTMChan TMChan v
chan
IO (GRpcReply r) -> ConduitT v Void IO (GRpcReply r)
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (GRpcReply r) -> ConduitT v Void IO (GRpcReply r))
-> IO (GRpcReply r) -> ConduitT v Void IO (GRpcReply r)
forall a b. (a -> b) -> a -> b
$ Async (GRpcReply r) -> IO (GRpcReply r)
forall a. Async a -> IO a
wait Async (GRpcReply r)
promise
ConduitT v Void IO (GRpcReply r)
-> IO (ConduitT v Void IO (GRpcReply r))
forall (m :: * -> *) a. Monad m => a -> m a
return ConduitT v Void IO (GRpcReply r)
go
instance ( KnownName name
, GRpcInputWrapper p vref v, GRpcOutputWrapper p rref r
, handler ~ (v -> IO (ConduitT () (GRpcReply r) IO ())) )
=> GRpcMethodCall p ('Method name anns '[ 'ArgSingle vref ] ('RetStream rref)) handler where
gRpcMethodCall :: RPCTy p
-> Proxy ('Method name anns '[ 'ArgSingle vref] ('RetStream rref))
-> GrpcClient
-> handler
gRpcMethodCall rpc :: RPCTy p
rpc _ client :: GrpcClient
client x :: v
x
= do
TMChan r
chan <- IO (TMChan r)
forall a. IO (TMChan a)
newTMChanIO :: IO (TMChan r)
TMVar (GRpcReply ())
var <- IO (TMVar (GRpcReply ()))
forall a. IO (TMVar a)
newEmptyTMVarIO
Async ()
_ <- IO () -> IO (Async ())
forall a. IO a -> IO (Async a)
async (IO () -> IO (Async ())) -> IO () -> IO (Async ())
forall a b. (a -> b) -> a -> b
$ do
GRpcReply ()
v <- ClientIO (GRpcReply ()) -> IO (GRpcReply ())
forall a. ClientIO (GRpcReply a) -> IO (GRpcReply a)
simplifyResponse (ClientIO (GRpcReply ()) -> IO (GRpcReply ()))
-> ClientIO (GRpcReply ()) -> IO (GRpcReply ())
forall a b. (a -> b) -> a -> b
$
Either TooMuchConcurrency ((), HeaderList, HeaderList)
-> GRpcReply ()
forall v. Either TooMuchConcurrency v -> GRpcReply ()
buildGRpcReply3 (Either TooMuchConcurrency ((), HeaderList, HeaderList)
-> GRpcReply ())
-> ExceptT
ClientError
IO
(Either TooMuchConcurrency ((), HeaderList, HeaderList))
-> ClientIO (GRpcReply ())
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$>
RPCTy p
-> GrpcClient
-> ()
-> GRpcIWTy p vref v
-> (() -> HeaderList -> GRpcOWTy p rref r -> ClientIO ())
-> ExceptT
ClientError
IO
(Either TooMuchConcurrency ((), HeaderList, HeaderList))
forall r i o a.
(GRPCInput r i, GRPCOutput r o) =>
r
-> GrpcClient
-> a
-> i
-> (a -> HeaderList -> o -> ClientIO a)
-> ClientIO (Either TooMuchConcurrency (a, HeaderList, HeaderList))
rawStreamServer @_ @(GRpcIWTy p vref v) @(GRpcOWTy p rref r)
RPCTy p
rpc GrpcClient
client () (Proxy p -> Proxy vref -> v -> GRpcIWTy p vref v
forall (p :: GRpcMessageProtocol) (ref :: TypeRef) r.
GRpcInputWrapper p ref r =>
Proxy p -> Proxy ref -> r -> GRpcIWTy p ref r
buildGRpcIWTy (Proxy p
forall k (t :: k). Proxy t
Proxy @p) (Proxy vref
forall k (t :: k). Proxy t
Proxy @vref) v
x)
(\_ _ newVal :: GRpcOWTy p rref r
newVal -> IO () -> ClientIO ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> ClientIO ()) -> IO () -> ClientIO ()
forall a b. (a -> b) -> a -> b
$ STM () -> IO ()
forall a. STM a -> IO a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ do
Bool
_ <- TMVar (GRpcReply ()) -> GRpcReply () -> STM Bool
forall a. TMVar a -> a -> STM Bool
tryPutTMVar TMVar (GRpcReply ())
var (() -> GRpcReply ()
forall a. a -> GRpcReply a
GRpcOk ())
TMChan r -> r -> STM ()
forall a. TMChan a -> a -> STM ()
writeTMChan TMChan r
chan (Proxy p -> Proxy rref -> GRpcOWTy p rref r -> r
forall (p :: GRpcMessageProtocol) (ref :: TypeRef) r.
GRpcOutputWrapper p ref r =>
Proxy p -> Proxy ref -> GRpcOWTy p ref r -> r
unGRpcOWTy (Proxy p
forall k (t :: k). Proxy t
Proxy @p) (Proxy rref
forall k (t :: k). Proxy t
Proxy @rref) GRpcOWTy p rref r
newVal))
case GRpcReply ()
v of
GRpcOk () -> IO () -> IO ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ STM () -> IO ()
forall a. STM a -> IO a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ TMChan r -> STM ()
forall a. TMChan a -> STM ()
closeTMChan TMChan r
chan
_ -> IO () -> IO ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ STM () -> IO ()
forall a. STM a -> IO a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ TMVar (GRpcReply ()) -> GRpcReply () -> STM ()
forall a. TMVar a -> a -> STM ()
putTMVar TMVar (GRpcReply ())
var GRpcReply ()
v
let go :: ConduitT () (GRpcReply r) IO ()
go = do GRpcReply ()
firstResult <- IO (GRpcReply ()) -> ConduitT () (GRpcReply r) IO (GRpcReply ())
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (GRpcReply ()) -> ConduitT () (GRpcReply r) IO (GRpcReply ()))
-> IO (GRpcReply ()) -> ConduitT () (GRpcReply r) IO (GRpcReply ())
forall a b. (a -> b) -> a -> b
$ STM (GRpcReply ()) -> IO (GRpcReply ())
forall a. STM a -> IO a
atomically (STM (GRpcReply ()) -> IO (GRpcReply ()))
-> STM (GRpcReply ()) -> IO (GRpcReply ())
forall a b. (a -> b) -> a -> b
$ TMVar (GRpcReply ()) -> STM (GRpcReply ())
forall a. TMVar a -> STM a
takeTMVar TMVar (GRpcReply ())
var
case GRpcReply ()
firstResult of
GRpcOk _ ->
TMChan r -> ConduitT () r IO ()
forall (m :: * -> *) a. MonadIO m => TMChan a -> ConduitT () a m ()
sourceTMChan TMChan r
chan ConduitT () r IO ()
-> ConduitM r (GRpcReply r) IO ()
-> ConduitT () (GRpcReply r) IO ()
forall (m :: * -> *) a b c r.
Monad m =>
ConduitM a b m () -> ConduitM b c m r -> ConduitM a c m r
.| (r -> GRpcReply r) -> ConduitM r (GRpcReply r) IO ()
forall (m :: * -> *) a b. Monad m => (a -> b) -> ConduitT a b m ()
C.map r -> GRpcReply r
forall a. a -> GRpcReply a
GRpcOk
e :: GRpcReply ()
e -> GRpcReply r -> ConduitT () (GRpcReply r) IO ()
forall (m :: * -> *) o i. Monad m => o -> ConduitT i o m ()
yield (GRpcReply r -> ConduitT () (GRpcReply r) IO ())
-> GRpcReply r -> ConduitT () (GRpcReply r) IO ()
forall a b. (a -> b) -> a -> b
$ (\_ -> String -> r
forall a. HasCallStack => String -> a
error "this should never happen") (() -> r) -> GRpcReply () -> GRpcReply r
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> GRpcReply ()
e
ConduitT () (GRpcReply r) IO ()
-> IO (ConduitT () (GRpcReply r) IO ())
forall (m :: * -> *) a. Monad m => a -> m a
return ConduitT () (GRpcReply r) IO ()
go
instance ( KnownName name
, GRpcInputWrapper p vref v, GRpcOutputWrapper p rref r
, handler ~ (CompressMode -> IO (ConduitT v (GRpcReply r) IO ())) )
=> GRpcMethodCall p ('Method name anns '[ 'ArgStream vref ] ('RetStream rref)) handler where
gRpcMethodCall :: RPCTy p
-> Proxy ('Method name anns '[ 'ArgStream vref] ('RetStream rref))
-> GrpcClient
-> handler
gRpcMethodCall rpc :: RPCTy p
rpc _ client :: GrpcClient
client compress :: CompressMode
compress
= do
TMChan (GRpcReply r)
inchan <- IO (TMChan (GRpcReply r))
forall a. IO (TMChan a)
newTMChanIO :: IO (TMChan (GRpcReply r))
TMChan v
outchan <- IO (TMChan v)
forall a. IO (TMChan a)
newTMChanIO :: IO (TMChan v)
TMVar (GRpcReply ())
var <- IO (TMVar (GRpcReply ()))
forall a. IO (TMVar a)
newEmptyTMVarIO
Async ()
_ <- IO () -> IO (Async ())
forall a. IO a -> IO (Async a)
async (IO () -> IO (Async ())) -> IO () -> IO (Async ())
forall a b. (a -> b) -> a -> b
$ do
GRpcReply ()
v <- ClientIO (GRpcReply ()) -> IO (GRpcReply ())
forall a. ClientIO (GRpcReply a) -> IO (GRpcReply a)
simplifyResponse (ClientIO (GRpcReply ()) -> IO (GRpcReply ()))
-> ClientIO (GRpcReply ()) -> IO (GRpcReply ())
forall a b. (a -> b) -> a -> b
$
Either TooMuchConcurrency ((), ()) -> GRpcReply ()
forall v. Either TooMuchConcurrency v -> GRpcReply ()
buildGRpcReply3 (Either TooMuchConcurrency ((), ()) -> GRpcReply ())
-> ExceptT ClientError IO (Either TooMuchConcurrency ((), ()))
-> ClientIO (GRpcReply ())
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$>
RPCTy p
-> GrpcClient
-> ()
-> (() -> IncomingEvent (GRpcOWTy p rref r) () -> ClientIO ())
-> ()
-> (() -> ClientIO ((), OutgoingEvent (GRpcIWTy p vref v) ()))
-> ExceptT ClientError IO (Either TooMuchConcurrency ((), ()))
forall r i o a b.
(GRPCInput r i, GRPCOutput r o) =>
r
-> GrpcClient
-> a
-> (a -> IncomingEvent o a -> ClientIO a)
-> b
-> (b -> ClientIO (b, OutgoingEvent i b))
-> ClientIO (Either TooMuchConcurrency (a, b))
rawGeneralStream
@_ @(GRpcIWTy p vref v) @(GRpcOWTy p rref r)
RPCTy p
rpc GrpcClient
client
() (\_ ievent :: IncomingEvent (GRpcOWTy p rref r) ()
ievent -> do
Bool
_ <- IO Bool -> ExceptT ClientError IO Bool
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO Bool -> ExceptT ClientError IO Bool)
-> IO Bool -> ExceptT ClientError IO Bool
forall a b. (a -> b) -> a -> b
$ STM Bool -> IO Bool
forall a. STM a -> IO a
atomically (STM Bool -> IO Bool) -> STM Bool -> IO Bool
forall a b. (a -> b) -> a -> b
$ TMVar (GRpcReply ()) -> GRpcReply () -> STM Bool
forall a. TMVar a -> a -> STM Bool
tryPutTMVar TMVar (GRpcReply ())
var (() -> GRpcReply ()
forall a. a -> GRpcReply a
GRpcOk ())
case IncomingEvent (GRpcOWTy p rref r) ()
ievent of
RecvMessage o :: GRpcOWTy p rref r
o -> IO () -> ClientIO ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> ClientIO ()) -> IO () -> ClientIO ()
forall a b. (a -> b) -> a -> b
$ STM () -> IO ()
forall a. STM a -> IO a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ TMChan (GRpcReply r) -> GRpcReply r -> STM ()
forall a. TMChan a -> a -> STM ()
writeTMChan TMChan (GRpcReply r)
inchan (r -> GRpcReply r
forall a. a -> GRpcReply a
GRpcOk (r -> GRpcReply r) -> r -> GRpcReply r
forall a b. (a -> b) -> a -> b
$ Proxy p -> Proxy rref -> GRpcOWTy p rref r -> r
forall (p :: GRpcMessageProtocol) (ref :: TypeRef) r.
GRpcOutputWrapper p ref r =>
Proxy p -> Proxy ref -> GRpcOWTy p ref r -> r
unGRpcOWTy(Proxy p
forall k (t :: k). Proxy t
Proxy @p) (Proxy rref
forall k (t :: k). Proxy t
Proxy @rref) GRpcOWTy p rref r
o)
Invalid e :: SomeException
e -> IO () -> ClientIO ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> ClientIO ()) -> IO () -> ClientIO ()
forall a b. (a -> b) -> a -> b
$ STM () -> IO ()
forall a. STM a -> IO a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ TMChan (GRpcReply r) -> GRpcReply r -> STM ()
forall a. TMChan a -> a -> STM ()
writeTMChan TMChan (GRpcReply r)
inchan (String -> GRpcReply r
forall a. String -> GRpcReply a
GRpcErrorString (SomeException -> String
forall a. Show a => a -> String
show SomeException
e))
_ -> () -> ClientIO ()
forall (m :: * -> *) a. Monad m => a -> m a
return () )
() (\_ -> do
Maybe v
nextVal <- IO (Maybe v) -> ExceptT ClientError IO (Maybe v)
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (Maybe v) -> ExceptT ClientError IO (Maybe v))
-> IO (Maybe v) -> ExceptT ClientError IO (Maybe v)
forall a b. (a -> b) -> a -> b
$ STM (Maybe v) -> IO (Maybe v)
forall a. STM a -> IO a
atomically (STM (Maybe v) -> IO (Maybe v)) -> STM (Maybe v) -> IO (Maybe v)
forall a b. (a -> b) -> a -> b
$ TMChan v -> STM (Maybe v)
forall a. TMChan a -> STM (Maybe a)
readTMChan TMChan v
outchan
case Maybe v
nextVal of
Nothing -> ((), OutgoingEvent (GRpcIWTy p vref v) ())
-> ClientIO ((), OutgoingEvent (GRpcIWTy p vref v) ())
forall (m :: * -> *) a. Monad m => a -> m a
return ((), OutgoingEvent (GRpcIWTy p vref v) ()
forall i b. OutgoingEvent i b
Finalize)
Just v :: v
v -> ((), OutgoingEvent (GRpcIWTy p vref v) ())
-> ClientIO ((), OutgoingEvent (GRpcIWTy p vref v) ())
forall (m :: * -> *) a. Monad m => a -> m a
return ((), CompressMode
-> GRpcIWTy p vref v -> OutgoingEvent (GRpcIWTy p vref v) ()
forall i b. CompressMode -> i -> OutgoingEvent i b
SendMessage CompressMode
compress (Proxy p -> Proxy vref -> v -> GRpcIWTy p vref v
forall (p :: GRpcMessageProtocol) (ref :: TypeRef) r.
GRpcInputWrapper p ref r =>
Proxy p -> Proxy ref -> r -> GRpcIWTy p ref r
buildGRpcIWTy (Proxy p
forall k (t :: k). Proxy t
Proxy @p) (Proxy vref
forall k (t :: k). Proxy t
Proxy @vref) v
v)))
case GRpcReply ()
v of
GRpcOk () -> IO () -> IO ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ STM () -> IO ()
forall a. STM a -> IO a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ TMChan (GRpcReply r) -> STM ()
forall a. TMChan a -> STM ()
closeTMChan TMChan (GRpcReply r)
inchan
_ -> IO () -> IO ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ STM () -> IO ()
forall a. STM a -> IO a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ TMVar (GRpcReply ()) -> GRpcReply () -> STM ()
forall a. TMVar a -> a -> STM ()
putTMVar TMVar (GRpcReply ())
var GRpcReply ()
v
let go :: ConduitT v (GRpcReply r) IO ()
go = do GRpcReply ()
err <- IO (GRpcReply ()) -> ConduitT v (GRpcReply r) IO (GRpcReply ())
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (GRpcReply ()) -> ConduitT v (GRpcReply r) IO (GRpcReply ()))
-> IO (GRpcReply ()) -> ConduitT v (GRpcReply r) IO (GRpcReply ())
forall a b. (a -> b) -> a -> b
$ STM (GRpcReply ()) -> IO (GRpcReply ())
forall a. STM a -> IO a
atomically (STM (GRpcReply ()) -> IO (GRpcReply ()))
-> STM (GRpcReply ()) -> IO (GRpcReply ())
forall a b. (a -> b) -> a -> b
$ TMVar (GRpcReply ()) -> STM (GRpcReply ())
forall a. TMVar a -> STM a
takeTMVar TMVar (GRpcReply ())
var
case GRpcReply ()
err of
GRpcOk _ -> ConduitT v (GRpcReply r) IO ()
go2
e :: GRpcReply ()
e -> GRpcReply r -> ConduitT v (GRpcReply r) IO ()
forall (m :: * -> *) o i. Monad m => o -> ConduitT i o m ()
yield (GRpcReply r -> ConduitT v (GRpcReply r) IO ())
-> GRpcReply r -> ConduitT v (GRpcReply r) IO ()
forall a b. (a -> b) -> a -> b
$ (\_ -> String -> r
forall a. HasCallStack => String -> a
error "this should never happen") (() -> r) -> GRpcReply () -> GRpcReply r
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> GRpcReply ()
e
go2 :: ConduitT v (GRpcReply r) IO ()
go2 = do Maybe v
nextOut <- ConduitT v (GRpcReply r) IO (Maybe v)
forall (m :: * -> *) i. Monad m => Consumer i m (Maybe i)
await
case Maybe v
nextOut of
Just v :: v
v -> do IO () -> ConduitT v (GRpcReply r) IO ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> ConduitT v (GRpcReply r) IO ())
-> IO () -> ConduitT v (GRpcReply r) IO ()
forall a b. (a -> b) -> a -> b
$ STM () -> IO ()
forall a. STM a -> IO a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ TMChan v -> v -> STM ()
forall a. TMChan a -> a -> STM ()
writeTMChan TMChan v
outchan v
v
ConduitT v (GRpcReply r) IO ()
go2
Nothing -> do Maybe (Maybe (GRpcReply r))
r <- IO (Maybe (Maybe (GRpcReply r)))
-> ConduitT v (GRpcReply r) IO (Maybe (Maybe (GRpcReply r)))
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (Maybe (Maybe (GRpcReply r)))
-> ConduitT v (GRpcReply r) IO (Maybe (Maybe (GRpcReply r))))
-> IO (Maybe (Maybe (GRpcReply r)))
-> ConduitT v (GRpcReply r) IO (Maybe (Maybe (GRpcReply r)))
forall a b. (a -> b) -> a -> b
$ STM (Maybe (Maybe (GRpcReply r)))
-> IO (Maybe (Maybe (GRpcReply r)))
forall a. STM a -> IO a
atomically (STM (Maybe (Maybe (GRpcReply r)))
-> IO (Maybe (Maybe (GRpcReply r))))
-> STM (Maybe (Maybe (GRpcReply r)))
-> IO (Maybe (Maybe (GRpcReply r)))
forall a b. (a -> b) -> a -> b
$ TMChan (GRpcReply r) -> STM (Maybe (Maybe (GRpcReply r)))
forall a. TMChan a -> STM (Maybe (Maybe a))
tryReadTMChan TMChan (GRpcReply r)
inchan
case Maybe (Maybe (GRpcReply r))
r of
Nothing -> () -> ConduitT v (GRpcReply r) IO ()
forall (m :: * -> *) a. Monad m => a -> m a
return ()
Just Nothing -> ConduitT v (GRpcReply r) IO ()
go2
Just (Just nextIn :: GRpcReply r
nextIn) -> GRpcReply r -> ConduitT v (GRpcReply r) IO ()
forall (m :: * -> *) o i. Monad m => o -> ConduitT i o m ()
yield GRpcReply r
nextIn ConduitT v (GRpcReply r) IO ()
-> ConduitT v (GRpcReply r) IO () -> ConduitT v (GRpcReply r) IO ()
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> ConduitT v (GRpcReply r) IO ()
go2
ConduitT v (GRpcReply r) IO ()
-> IO (ConduitT v (GRpcReply r) IO ())
forall (m :: * -> *) a. Monad m => a -> m a
return ConduitT v (GRpcReply r) IO ()
go