{-# language AllowAmbiguousTypes   #-}
{-# language DataKinds             #-}
{-# language DeriveFunctor         #-}
{-# language FlexibleContexts      #-}
{-# language FlexibleInstances     #-}
{-# language GADTs                 #-}
{-# language MultiParamTypeClasses #-}
{-# language PolyKinds             #-}
{-# language ScopedTypeVariables   #-}
{-# language TypeApplications      #-}
{-# language TypeFamilies          #-}
{-# language TypeOperators         #-}
{-# language UndecidableInstances  #-}
-- | Client for gRPC services defined using Mu 'Service'
module Mu.GRpc.Client.Internal where

import           Control.Concurrent.Async
import           Control.Concurrent.STM        (atomically)
import           Control.Concurrent.STM.TMChan
import           Control.Concurrent.STM.TMVar
import           Control.Monad.IO.Class
import qualified Data.ByteString.Char8         as BS
import           Data.Conduit
import qualified Data.Conduit.Combinators      as C
import           Data.Conduit.TMChan
import           Data.Kind
import           Network.GRPC.Client           (CompressMode (..), IncomingEvent (..),
                                                OutgoingEvent (..), RawReply, StreamDone (..))
import           Network.GRPC.Client.Helpers
import           Network.GRPC.HTTP2.Encoding   (GRPCInput, GRPCOutput)
import           Network.HTTP2                 (ErrorCode)
import           Network.HTTP2.Client          (ClientError, ClientIO, TooMuchConcurrency,
                                                runExceptT)

import           Mu.Adapter.ProtoBuf.Via
import           Mu.GRpc.Avro
import           Mu.GRpc.Bridge
import           Mu.Rpc
import           Mu.Schema

setupGrpcClient' :: GrpcClientConfig -> IO (Either ClientError GrpcClient)
setupGrpcClient' :: GrpcClientConfig -> IO (Either ClientError GrpcClient)
setupGrpcClient' = ExceptT ClientError IO GrpcClient
-> IO (Either ClientError GrpcClient)
forall e (m :: * -> *) a. ExceptT e m a -> m (Either e a)
runExceptT (ExceptT ClientError IO GrpcClient
 -> IO (Either ClientError GrpcClient))
-> (GrpcClientConfig -> ExceptT ClientError IO GrpcClient)
-> GrpcClientConfig
-> IO (Either ClientError GrpcClient)
forall b c a. (b -> c) -> (a -> b) -> a -> c
. GrpcClientConfig -> ExceptT ClientError IO GrpcClient
setupGrpcClient

class GRpcServiceMethodCall (p :: GRpcMessageProtocol) (s :: Service snm mnm) (m :: Method mnm) h where
  gRpcServiceMethodCall :: Proxy p -> Proxy s -> Proxy m -> GrpcClient -> h
instance ( KnownName serviceName, KnownName (FindPackageName anns), KnownName mname
         , GRpcMethodCall p ('Method mname manns margs mret) h, MkRPC p )
         => GRpcServiceMethodCall p ('Service serviceName anns methods)
                                  ('Method mname manns margs mret) h where
  gRpcServiceMethodCall :: Proxy p
-> Proxy ('Service serviceName anns methods)
-> Proxy ('Method mname manns margs mret)
-> GrpcClient
-> h
gRpcServiceMethodCall pro :: Proxy p
pro _ = RPCTy p
-> Proxy ('Method mname manns margs mret) -> GrpcClient -> h
forall k (p :: GRpcMessageProtocol) (method :: k) h.
GRpcMethodCall p method h =>
RPCTy p -> Proxy method -> GrpcClient -> h
gRpcMethodCall @p RPCTy p
rpc
    where pkgName :: ByteString
pkgName = String -> ByteString
BS.pack (Proxy (FindPackageName anns) -> String
forall k (a :: k) (proxy :: k -> *).
KnownName a =>
proxy a -> String
nameVal (Proxy (FindPackageName anns)
forall k (t :: k). Proxy t
Proxy @(FindPackageName anns)))
          svrName :: ByteString
svrName = String -> ByteString
BS.pack (Proxy serviceName -> String
forall k (a :: k) (proxy :: k -> *).
KnownName a =>
proxy a -> String
nameVal (Proxy serviceName
forall k (t :: k). Proxy t
Proxy @serviceName))
          metName :: ByteString
metName = String -> ByteString
BS.pack (Proxy mname -> String
forall k (a :: k) (proxy :: k -> *).
KnownName a =>
proxy a -> String
nameVal (Proxy mname
forall k (t :: k). Proxy t
Proxy @mname))
          rpc :: RPCTy p
rpc = Proxy p -> ByteString -> ByteString -> ByteString -> RPCTy p
forall (p :: GRpcMessageProtocol).
MkRPC p =>
Proxy p -> ByteString -> ByteString -> ByteString -> RPCTy p
mkRPC Proxy p
pro ByteString
pkgName ByteString
svrName ByteString
metName

data GRpcReply a
  = GRpcTooMuchConcurrency TooMuchConcurrency
  | GRpcErrorCode ErrorCode
  | GRpcErrorString String
  | GRpcClientError ClientError
  | GRpcOk a
  deriving (Int -> GRpcReply a -> ShowS
[GRpcReply a] -> ShowS
GRpcReply a -> String
(Int -> GRpcReply a -> ShowS)
-> (GRpcReply a -> String)
-> ([GRpcReply a] -> ShowS)
-> Show (GRpcReply a)
forall a. Show a => Int -> GRpcReply a -> ShowS
forall a. Show a => [GRpcReply a] -> ShowS
forall a. Show a => GRpcReply a -> String
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
showList :: [GRpcReply a] -> ShowS
$cshowList :: forall a. Show a => [GRpcReply a] -> ShowS
show :: GRpcReply a -> String
$cshow :: forall a. Show a => GRpcReply a -> String
showsPrec :: Int -> GRpcReply a -> ShowS
$cshowsPrec :: forall a. Show a => Int -> GRpcReply a -> ShowS
Show, a -> GRpcReply b -> GRpcReply a
(a -> b) -> GRpcReply a -> GRpcReply b
(forall a b. (a -> b) -> GRpcReply a -> GRpcReply b)
-> (forall a b. a -> GRpcReply b -> GRpcReply a)
-> Functor GRpcReply
forall a b. a -> GRpcReply b -> GRpcReply a
forall a b. (a -> b) -> GRpcReply a -> GRpcReply b
forall (f :: * -> *).
(forall a b. (a -> b) -> f a -> f b)
-> (forall a b. a -> f b -> f a) -> Functor f
<$ :: a -> GRpcReply b -> GRpcReply a
$c<$ :: forall a b. a -> GRpcReply b -> GRpcReply a
fmap :: (a -> b) -> GRpcReply a -> GRpcReply b
$cfmap :: forall a b. (a -> b) -> GRpcReply a -> GRpcReply b
Functor)

buildGRpcReply1 :: Either TooMuchConcurrency (RawReply a) -> GRpcReply a
buildGRpcReply1 :: Either TooMuchConcurrency (RawReply a) -> GRpcReply a
buildGRpcReply1 (Left tmc :: TooMuchConcurrency
tmc)                      = TooMuchConcurrency -> GRpcReply a
forall a. TooMuchConcurrency -> GRpcReply a
GRpcTooMuchConcurrency TooMuchConcurrency
tmc
buildGRpcReply1 (Right (Left ec :: ErrorCode
ec))               = ErrorCode -> GRpcReply a
forall a. ErrorCode -> GRpcReply a
GRpcErrorCode ErrorCode
ec
buildGRpcReply1 (Right (Right (_, _, Left es :: String
es))) = String -> GRpcReply a
forall a. String -> GRpcReply a
GRpcErrorString String
es
buildGRpcReply1 (Right (Right (_, _, Right r :: a
r))) = a -> GRpcReply a
forall a. a -> GRpcReply a
GRpcOk a
r

buildGRpcReply2 :: Either TooMuchConcurrency (r, RawReply a) -> GRpcReply a
buildGRpcReply2 :: Either TooMuchConcurrency (r, RawReply a) -> GRpcReply a
buildGRpcReply2 (Left tmc :: TooMuchConcurrency
tmc)                         = TooMuchConcurrency -> GRpcReply a
forall a. TooMuchConcurrency -> GRpcReply a
GRpcTooMuchConcurrency TooMuchConcurrency
tmc
buildGRpcReply2 (Right (_, Left ec :: ErrorCode
ec))               = ErrorCode -> GRpcReply a
forall a. ErrorCode -> GRpcReply a
GRpcErrorCode ErrorCode
ec
buildGRpcReply2 (Right (_, Right (_, _, Left es :: String
es))) = String -> GRpcReply a
forall a. String -> GRpcReply a
GRpcErrorString String
es
buildGRpcReply2 (Right (_, Right (_, _, Right r :: a
r))) = a -> GRpcReply a
forall a. a -> GRpcReply a
GRpcOk a
r

buildGRpcReply3 :: Either TooMuchConcurrency v -> GRpcReply ()
buildGRpcReply3 :: Either TooMuchConcurrency v -> GRpcReply ()
buildGRpcReply3 (Left tmc :: TooMuchConcurrency
tmc) = TooMuchConcurrency -> GRpcReply ()
forall a. TooMuchConcurrency -> GRpcReply a
GRpcTooMuchConcurrency TooMuchConcurrency
tmc
buildGRpcReply3 (Right _)  = () -> GRpcReply ()
forall a. a -> GRpcReply a
GRpcOk ()

simplifyResponse :: ClientIO (GRpcReply a) -> IO (GRpcReply a)
simplifyResponse :: ClientIO (GRpcReply a) -> IO (GRpcReply a)
simplifyResponse reply :: ClientIO (GRpcReply a)
reply = do
  Either ClientError (GRpcReply a)
r <- ClientIO (GRpcReply a) -> IO (Either ClientError (GRpcReply a))
forall e (m :: * -> *) a. ExceptT e m a -> m (Either e a)
runExceptT ClientIO (GRpcReply a)
reply
  case Either ClientError (GRpcReply a)
r of
    Left e :: ClientError
e  -> GRpcReply a -> IO (GRpcReply a)
forall (m :: * -> *) a. Monad m => a -> m a
return (GRpcReply a -> IO (GRpcReply a))
-> GRpcReply a -> IO (GRpcReply a)
forall a b. (a -> b) -> a -> b
$ ClientError -> GRpcReply a
forall a. ClientError -> GRpcReply a
GRpcClientError ClientError
e
    Right v :: GRpcReply a
v -> GRpcReply a -> IO (GRpcReply a)
forall (m :: * -> *) a. Monad m => a -> m a
return GRpcReply a
v

-- These type classes allow us to abstract over
-- the choice of message protocol (PB or Avro)

class GRPCInput (RPCTy p) (GRpcIWTy p ref r)
      => GRpcInputWrapper (p :: GRpcMessageProtocol) (ref :: TypeRef) (r :: Type) where
  type GRpcIWTy p ref r :: Type
  buildGRpcIWTy :: Proxy p -> Proxy ref -> r -> GRpcIWTy p ref r

instance ToProtoBufTypeRef ref r
         => GRpcInputWrapper 'MsgProtoBuf ref r where
  type GRpcIWTy 'MsgProtoBuf ref r = ViaToProtoBufTypeRef ref r
  buildGRpcIWTy :: Proxy 'MsgProtoBuf -> Proxy ref -> r -> GRpcIWTy 'MsgProtoBuf ref r
buildGRpcIWTy _ _ = r -> GRpcIWTy 'MsgProtoBuf ref r
forall (ref :: TypeRef) t. t -> ViaToProtoBufTypeRef ref t
ViaToProtoBufTypeRef

instance (GRPCInput AvroRPC (ViaToAvroTypeRef ('ViaSchema sch sty) r))
         => GRpcInputWrapper 'MsgAvro ('ViaSchema sch sty) r where
  type GRpcIWTy 'MsgAvro ('ViaSchema sch sty) r = ViaToAvroTypeRef ('ViaSchema sch sty) r
  buildGRpcIWTy :: Proxy 'MsgAvro
-> Proxy ('ViaSchema sch sty)
-> r
-> GRpcIWTy 'MsgAvro ('ViaSchema sch sty) r
buildGRpcIWTy _ _ = r -> GRpcIWTy 'MsgAvro ('ViaSchema sch sty) r
forall (ref :: TypeRef) t. t -> ViaToAvroTypeRef ref t
ViaToAvroTypeRef

class GRPCOutput (RPCTy p) (GRpcOWTy p ref r)
      => GRpcOutputWrapper (p :: GRpcMessageProtocol) (ref :: TypeRef) (r :: Type) where
  type GRpcOWTy p ref r :: Type
  unGRpcOWTy :: Proxy p -> Proxy ref -> GRpcOWTy p ref r -> r

instance FromProtoBufTypeRef ref r
         => GRpcOutputWrapper 'MsgProtoBuf ref r where
  type GRpcOWTy 'MsgProtoBuf ref r = ViaFromProtoBufTypeRef ref r
  unGRpcOWTy :: Proxy 'MsgProtoBuf -> Proxy ref -> GRpcOWTy 'MsgProtoBuf ref r -> r
unGRpcOWTy _ _ = GRpcOWTy 'MsgProtoBuf ref r -> r
forall (ref :: TypeRef) t. ViaFromProtoBufTypeRef ref t -> t
unViaFromProtoBufTypeRef

instance (GRPCOutput AvroRPC (ViaFromAvroTypeRef ('ViaSchema sch sty) r))
         => GRpcOutputWrapper 'MsgAvro ('ViaSchema sch sty) r where
  type GRpcOWTy 'MsgAvro ('ViaSchema sch sty) r = ViaFromAvroTypeRef ('ViaSchema sch sty) r
  unGRpcOWTy :: Proxy 'MsgAvro
-> Proxy ('ViaSchema sch sty)
-> GRpcOWTy 'MsgAvro ('ViaSchema sch sty) r
-> r
unGRpcOWTy _ _ = GRpcOWTy 'MsgAvro ('ViaSchema sch sty) r -> r
forall (ref :: TypeRef) t. ViaFromAvroTypeRef ref t -> t
unViaFromAvroTypeRef

-- -----------------------------
-- IMPLEMENTATION OF THE METHODS
-- -----------------------------

class GRpcMethodCall (p :: GRpcMessageProtocol) method h where
  gRpcMethodCall :: RPCTy p -> Proxy method -> GrpcClient -> h

instance ( KnownName name
         , GRPCInput (RPCTy p) (), GRPCOutput (RPCTy p) ()
         , handler ~ IO (GRpcReply ()) )
         => GRpcMethodCall p ('Method name anns '[ ] 'RetNothing) handler where
  gRpcMethodCall :: RPCTy p
-> Proxy ('Method name anns '[] 'RetNothing)
-> GrpcClient
-> handler
gRpcMethodCall rpc :: RPCTy p
rpc _ client :: GrpcClient
client
    = ClientIO (GRpcReply ()) -> handler
forall a. ClientIO (GRpcReply a) -> IO (GRpcReply a)
simplifyResponse (ClientIO (GRpcReply ()) -> handler)
-> ClientIO (GRpcReply ()) -> handler
forall a b. (a -> b) -> a -> b
$
      Either TooMuchConcurrency (RawReply ()) -> GRpcReply ()
forall a. Either TooMuchConcurrency (RawReply a) -> GRpcReply a
buildGRpcReply1 (Either TooMuchConcurrency (RawReply ()) -> GRpcReply ())
-> ExceptT ClientError IO (Either TooMuchConcurrency (RawReply ()))
-> ClientIO (GRpcReply ())
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$>
      RPCTy p
-> GrpcClient
-> ()
-> ExceptT ClientError IO (Either TooMuchConcurrency (RawReply ()))
forall r i o.
(GRPCInput r i, GRPCOutput r o) =>
r
-> GrpcClient
-> i
-> ClientIO (Either TooMuchConcurrency (RawReply o))
rawUnary RPCTy p
rpc GrpcClient
client ()

instance ( KnownName name
         , GRPCInput (RPCTy p) (), GRpcOutputWrapper p rref r
         , handler ~ IO (GRpcReply r) )
         => GRpcMethodCall p ('Method name anns '[ ] ('RetSingle rref)) handler where
  gRpcMethodCall :: RPCTy p
-> Proxy ('Method name anns '[] ('RetSingle rref))
-> GrpcClient
-> handler
gRpcMethodCall rpc :: RPCTy p
rpc _ client :: GrpcClient
client
    = (GRpcReply (GRpcOWTy p rref r) -> GRpcReply r)
-> IO (GRpcReply (GRpcOWTy p rref r)) -> IO (GRpcReply r)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap ((GRpcOWTy p rref r -> r)
-> GRpcReply (GRpcOWTy p rref r) -> GRpcReply r
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap (Proxy p -> Proxy rref -> GRpcOWTy p rref r -> r
forall (p :: GRpcMessageProtocol) (ref :: TypeRef) r.
GRpcOutputWrapper p ref r =>
Proxy p -> Proxy ref -> GRpcOWTy p ref r -> r
unGRpcOWTy (Proxy p
forall k (t :: k). Proxy t
Proxy @p) (Proxy rref
forall k (t :: k). Proxy t
Proxy @rref))) (IO (GRpcReply (GRpcOWTy p rref r)) -> handler)
-> IO (GRpcReply (GRpcOWTy p rref r)) -> handler
forall a b. (a -> b) -> a -> b
$
      ClientIO (GRpcReply (GRpcOWTy p rref r))
-> IO (GRpcReply (GRpcOWTy p rref r))
forall a. ClientIO (GRpcReply a) -> IO (GRpcReply a)
simplifyResponse (ClientIO (GRpcReply (GRpcOWTy p rref r))
 -> IO (GRpcReply (GRpcOWTy p rref r)))
-> ClientIO (GRpcReply (GRpcOWTy p rref r))
-> IO (GRpcReply (GRpcOWTy p rref r))
forall a b. (a -> b) -> a -> b
$
      Either TooMuchConcurrency (RawReply (GRpcOWTy p rref r))
-> GRpcReply (GRpcOWTy p rref r)
forall a. Either TooMuchConcurrency (RawReply a) -> GRpcReply a
buildGRpcReply1 (Either TooMuchConcurrency (RawReply (GRpcOWTy p rref r))
 -> GRpcReply (GRpcOWTy p rref r))
-> ExceptT
     ClientError
     IO
     (Either TooMuchConcurrency (RawReply (GRpcOWTy p rref r)))
-> ClientIO (GRpcReply (GRpcOWTy p rref r))
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$>
      RPCTy p
-> GrpcClient
-> ()
-> ExceptT
     ClientError
     IO
     (Either TooMuchConcurrency (RawReply (GRpcOWTy p rref r)))
forall r i o.
(GRPCInput r i, GRPCOutput r o) =>
r
-> GrpcClient
-> i
-> ClientIO (Either TooMuchConcurrency (RawReply o))
rawUnary @_ @() @(GRpcOWTy p rref r) RPCTy p
rpc GrpcClient
client ()

instance ( KnownName name
         , GRPCInput (RPCTy p) (), GRpcOutputWrapper p rref r
         , handler ~ IO (ConduitT () (GRpcReply r) IO ()) )
         => GRpcMethodCall p ('Method name anns '[ ] ('RetStream rref)) handler where
  gRpcMethodCall :: RPCTy p
-> Proxy ('Method name anns '[] ('RetStream rref))
-> GrpcClient
-> handler
gRpcMethodCall rpc :: RPCTy p
rpc _ client :: GrpcClient
client
    = do -- Create a new TMChan
         TMChan r
chan <- IO (TMChan r)
forall a. IO (TMChan a)
newTMChanIO :: IO (TMChan r)
         TMVar (GRpcReply ())
var  <- IO (TMVar (GRpcReply ()))
forall a. IO (TMVar a)
newEmptyTMVarIO  -- if full, this means an error
         -- Start executing the client in another thread
         Async ()
_ <- IO () -> IO (Async ())
forall a. IO a -> IO (Async a)
async (IO () -> IO (Async ())) -> IO () -> IO (Async ())
forall a b. (a -> b) -> a -> b
$ do
            GRpcReply ()
v <- ClientIO (GRpcReply ()) -> IO (GRpcReply ())
forall a. ClientIO (GRpcReply a) -> IO (GRpcReply a)
simplifyResponse (ClientIO (GRpcReply ()) -> IO (GRpcReply ()))
-> ClientIO (GRpcReply ()) -> IO (GRpcReply ())
forall a b. (a -> b) -> a -> b
$
                 Either TooMuchConcurrency ((), HeaderList, HeaderList)
-> GRpcReply ()
forall v. Either TooMuchConcurrency v -> GRpcReply ()
buildGRpcReply3 (Either TooMuchConcurrency ((), HeaderList, HeaderList)
 -> GRpcReply ())
-> ExceptT
     ClientError
     IO
     (Either TooMuchConcurrency ((), HeaderList, HeaderList))
-> ClientIO (GRpcReply ())
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$>
                 RPCTy p
-> GrpcClient
-> ()
-> ()
-> (() -> HeaderList -> GRpcOWTy p rref r -> ClientIO ())
-> ExceptT
     ClientError
     IO
     (Either TooMuchConcurrency ((), HeaderList, HeaderList))
forall r i o a.
(GRPCInput r i, GRPCOutput r o) =>
r
-> GrpcClient
-> a
-> i
-> (a -> HeaderList -> o -> ClientIO a)
-> ClientIO (Either TooMuchConcurrency (a, HeaderList, HeaderList))
rawStreamServer @_ @() @(GRpcOWTy p rref r)
                                 RPCTy p
rpc GrpcClient
client () ()
                                 (\_ _ newVal :: GRpcOWTy p rref r
newVal -> IO () -> ClientIO ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> ClientIO ()) -> IO () -> ClientIO ()
forall a b. (a -> b) -> a -> b
$ STM () -> IO ()
forall a. STM a -> IO a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ do
                                   -- on the first iteration, say that everything is OK
                                   Bool
_ <- TMVar (GRpcReply ()) -> GRpcReply () -> STM Bool
forall a. TMVar a -> a -> STM Bool
tryPutTMVar TMVar (GRpcReply ())
var (() -> GRpcReply ()
forall a. a -> GRpcReply a
GRpcOk ())
                                   TMChan r -> r -> STM ()
forall a. TMChan a -> a -> STM ()
writeTMChan TMChan r
chan (Proxy p -> Proxy rref -> GRpcOWTy p rref r -> r
forall (p :: GRpcMessageProtocol) (ref :: TypeRef) r.
GRpcOutputWrapper p ref r =>
Proxy p -> Proxy ref -> GRpcOWTy p ref r -> r
unGRpcOWTy (Proxy p
forall k (t :: k). Proxy t
Proxy @p) (Proxy rref
forall k (t :: k). Proxy t
Proxy @rref) GRpcOWTy p rref r
newVal))
            case GRpcReply ()
v of
              GRpcOk () -> IO () -> IO ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ STM () -> IO ()
forall a. STM a -> IO a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ TMChan r -> STM ()
forall a. TMChan a -> STM ()
closeTMChan TMChan r
chan
              _         -> IO () -> IO ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ STM () -> IO ()
forall a. STM a -> IO a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ TMVar (GRpcReply ()) -> GRpcReply () -> STM ()
forall a. TMVar a -> a -> STM ()
putTMVar TMVar (GRpcReply ())
var GRpcReply ()
v
         -- This conduit feeds information to the other thread
         let go :: ConduitT () (GRpcReply r) IO ()
go = do GRpcReply ()
firstResult <- IO (GRpcReply ()) -> ConduitT () (GRpcReply r) IO (GRpcReply ())
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (GRpcReply ()) -> ConduitT () (GRpcReply r) IO (GRpcReply ()))
-> IO (GRpcReply ()) -> ConduitT () (GRpcReply r) IO (GRpcReply ())
forall a b. (a -> b) -> a -> b
$ STM (GRpcReply ()) -> IO (GRpcReply ())
forall a. STM a -> IO a
atomically (STM (GRpcReply ()) -> IO (GRpcReply ()))
-> STM (GRpcReply ()) -> IO (GRpcReply ())
forall a b. (a -> b) -> a -> b
$ TMVar (GRpcReply ()) -> STM (GRpcReply ())
forall a. TMVar a -> STM a
takeTMVar TMVar (GRpcReply ())
var
                     case GRpcReply ()
firstResult of
                       GRpcOk _ -> -- no error, everything is fine
                         TMChan r -> ConduitT () r IO ()
forall (m :: * -> *) a. MonadIO m => TMChan a -> ConduitT () a m ()
sourceTMChan TMChan r
chan ConduitT () r IO ()
-> ConduitM r (GRpcReply r) IO ()
-> ConduitT () (GRpcReply r) IO ()
forall (m :: * -> *) a b c r.
Monad m =>
ConduitM a b m () -> ConduitM b c m r -> ConduitM a c m r
.| (r -> GRpcReply r) -> ConduitM r (GRpcReply r) IO ()
forall (m :: * -> *) a b. Monad m => (a -> b) -> ConduitT a b m ()
C.map r -> GRpcReply r
forall a. a -> GRpcReply a
GRpcOk
                       e :: GRpcReply ()
e -> GRpcReply r -> ConduitT () (GRpcReply r) IO ()
forall (m :: * -> *) o i. Monad m => o -> ConduitT i o m ()
yield (GRpcReply r -> ConduitT () (GRpcReply r) IO ())
-> GRpcReply r -> ConduitT () (GRpcReply r) IO ()
forall a b. (a -> b) -> a -> b
$ (\_ -> String -> r
forall a. HasCallStack => String -> a
error "this should never happen") (() -> r) -> GRpcReply () -> GRpcReply r
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> GRpcReply ()
e
         ConduitT () (GRpcReply r) IO ()
-> IO (ConduitT () (GRpcReply r) IO ())
forall (m :: * -> *) a. Monad m => a -> m a
return ConduitT () (GRpcReply r) IO ()
go

instance ( KnownName name
         , GRpcInputWrapper p vref v, GRPCOutput (RPCTy p) ()
         , handler ~ (v -> IO (GRpcReply ())) )
         => GRpcMethodCall p ('Method name anns '[ 'ArgSingle vref ] 'RetNothing) handler where
  gRpcMethodCall :: RPCTy p
-> Proxy ('Method name anns '[ 'ArgSingle vref] 'RetNothing)
-> GrpcClient
-> handler
gRpcMethodCall rpc :: RPCTy p
rpc _ client :: GrpcClient
client x :: v
x
    = ClientIO (GRpcReply ()) -> IO (GRpcReply ())
forall a. ClientIO (GRpcReply a) -> IO (GRpcReply a)
simplifyResponse (ClientIO (GRpcReply ()) -> IO (GRpcReply ()))
-> ClientIO (GRpcReply ()) -> IO (GRpcReply ())
forall a b. (a -> b) -> a -> b
$
      Either TooMuchConcurrency (RawReply ()) -> GRpcReply ()
forall a. Either TooMuchConcurrency (RawReply a) -> GRpcReply a
buildGRpcReply1 (Either TooMuchConcurrency (RawReply ()) -> GRpcReply ())
-> ExceptT ClientError IO (Either TooMuchConcurrency (RawReply ()))
-> ClientIO (GRpcReply ())
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$>
      RPCTy p
-> GrpcClient
-> GRpcIWTy p vref v
-> ExceptT ClientError IO (Either TooMuchConcurrency (RawReply ()))
forall r i o.
(GRPCInput r i, GRPCOutput r o) =>
r
-> GrpcClient
-> i
-> ClientIO (Either TooMuchConcurrency (RawReply o))
rawUnary @_ @(GRpcIWTy p vref v) @() RPCTy p
rpc GrpcClient
client (Proxy p -> Proxy vref -> v -> GRpcIWTy p vref v
forall (p :: GRpcMessageProtocol) (ref :: TypeRef) r.
GRpcInputWrapper p ref r =>
Proxy p -> Proxy ref -> r -> GRpcIWTy p ref r
buildGRpcIWTy (Proxy p
forall k (t :: k). Proxy t
Proxy @p) (Proxy vref
forall k (t :: k). Proxy t
Proxy @vref) v
x)

instance ( KnownName name
         , GRpcInputWrapper p vref v, GRpcOutputWrapper p rref r
         , handler ~ (v -> IO (GRpcReply r)) )
         => GRpcMethodCall p ('Method name anns '[ 'ArgSingle vref ] ('RetSingle rref)) handler where
  gRpcMethodCall :: RPCTy p
-> Proxy ('Method name anns '[ 'ArgSingle vref] ('RetSingle rref))
-> GrpcClient
-> handler
gRpcMethodCall rpc :: RPCTy p
rpc _ client :: GrpcClient
client x :: v
x
    = (GRpcReply (GRpcOWTy p rref r) -> GRpcReply r)
-> IO (GRpcReply (GRpcOWTy p rref r)) -> IO (GRpcReply r)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap ((GRpcOWTy p rref r -> r)
-> GRpcReply (GRpcOWTy p rref r) -> GRpcReply r
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap (Proxy p -> Proxy rref -> GRpcOWTy p rref r -> r
forall (p :: GRpcMessageProtocol) (ref :: TypeRef) r.
GRpcOutputWrapper p ref r =>
Proxy p -> Proxy ref -> GRpcOWTy p ref r -> r
unGRpcOWTy (Proxy p
forall k (t :: k). Proxy t
Proxy @p) (Proxy rref
forall k (t :: k). Proxy t
Proxy @rref))) (IO (GRpcReply (GRpcOWTy p rref r)) -> IO (GRpcReply r))
-> IO (GRpcReply (GRpcOWTy p rref r)) -> IO (GRpcReply r)
forall a b. (a -> b) -> a -> b
$
      ClientIO (GRpcReply (GRpcOWTy p rref r))
-> IO (GRpcReply (GRpcOWTy p rref r))
forall a. ClientIO (GRpcReply a) -> IO (GRpcReply a)
simplifyResponse (ClientIO (GRpcReply (GRpcOWTy p rref r))
 -> IO (GRpcReply (GRpcOWTy p rref r)))
-> ClientIO (GRpcReply (GRpcOWTy p rref r))
-> IO (GRpcReply (GRpcOWTy p rref r))
forall a b. (a -> b) -> a -> b
$
      Either TooMuchConcurrency (RawReply (GRpcOWTy p rref r))
-> GRpcReply (GRpcOWTy p rref r)
forall a. Either TooMuchConcurrency (RawReply a) -> GRpcReply a
buildGRpcReply1 (Either TooMuchConcurrency (RawReply (GRpcOWTy p rref r))
 -> GRpcReply (GRpcOWTy p rref r))
-> ExceptT
     ClientError
     IO
     (Either TooMuchConcurrency (RawReply (GRpcOWTy p rref r)))
-> ClientIO (GRpcReply (GRpcOWTy p rref r))
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$>
      RPCTy p
-> GrpcClient
-> GRpcIWTy p vref v
-> ExceptT
     ClientError
     IO
     (Either TooMuchConcurrency (RawReply (GRpcOWTy p rref r)))
forall r i o.
(GRPCInput r i, GRPCOutput r o) =>
r
-> GrpcClient
-> i
-> ClientIO (Either TooMuchConcurrency (RawReply o))
rawUnary @_ @(GRpcIWTy p vref v) @(GRpcOWTy p rref r)
               RPCTy p
rpc GrpcClient
client (Proxy p -> Proxy vref -> v -> GRpcIWTy p vref v
forall (p :: GRpcMessageProtocol) (ref :: TypeRef) r.
GRpcInputWrapper p ref r =>
Proxy p -> Proxy ref -> r -> GRpcIWTy p ref r
buildGRpcIWTy (Proxy p
forall k (t :: k). Proxy t
Proxy @p) (Proxy vref
forall k (t :: k). Proxy t
Proxy @vref) v
x)

instance ( KnownName name
         , GRpcInputWrapper p vref v, GRpcOutputWrapper p rref r
         , handler ~ (CompressMode -> IO (ConduitT v Void IO (GRpcReply r))) )
         => GRpcMethodCall p ('Method name anns '[ 'ArgStream vref ] ('RetSingle rref)) handler where
  gRpcMethodCall :: RPCTy p
-> Proxy ('Method name anns '[ 'ArgStream vref] ('RetSingle rref))
-> GrpcClient
-> handler
gRpcMethodCall rpc :: RPCTy p
rpc _ client :: GrpcClient
client compress :: CompressMode
compress
    = do -- Create a new TMChan
         TMChan v
chan <- IO (TMChan v)
forall a. IO (TMChan a)
newTMChanIO :: IO (TMChan v)
         -- Start executing the client in another thread
         Async (GRpcReply r)
promise <- IO (GRpcReply r) -> IO (Async (GRpcReply r))
forall a. IO a -> IO (Async a)
async (IO (GRpcReply r) -> IO (Async (GRpcReply r)))
-> IO (GRpcReply r) -> IO (Async (GRpcReply r))
forall a b. (a -> b) -> a -> b
$
            (GRpcReply (GRpcOWTy p rref r) -> GRpcReply r)
-> IO (GRpcReply (GRpcOWTy p rref r)) -> IO (GRpcReply r)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap ((GRpcOWTy p rref r -> r)
-> GRpcReply (GRpcOWTy p rref r) -> GRpcReply r
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap (Proxy p -> Proxy rref -> GRpcOWTy p rref r -> r
forall (p :: GRpcMessageProtocol) (ref :: TypeRef) r.
GRpcOutputWrapper p ref r =>
Proxy p -> Proxy ref -> GRpcOWTy p ref r -> r
unGRpcOWTy (Proxy p
forall k (t :: k). Proxy t
Proxy @p) (Proxy rref
forall k (t :: k). Proxy t
Proxy @rref))) (IO (GRpcReply (GRpcOWTy p rref r)) -> IO (GRpcReply r))
-> IO (GRpcReply (GRpcOWTy p rref r)) -> IO (GRpcReply r)
forall a b. (a -> b) -> a -> b
$
            ClientIO (GRpcReply (GRpcOWTy p rref r))
-> IO (GRpcReply (GRpcOWTy p rref r))
forall a. ClientIO (GRpcReply a) -> IO (GRpcReply a)
simplifyResponse (ClientIO (GRpcReply (GRpcOWTy p rref r))
 -> IO (GRpcReply (GRpcOWTy p rref r)))
-> ClientIO (GRpcReply (GRpcOWTy p rref r))
-> IO (GRpcReply (GRpcOWTy p rref r))
forall a b. (a -> b) -> a -> b
$
            Either TooMuchConcurrency ((), RawReply (GRpcOWTy p rref r))
-> GRpcReply (GRpcOWTy p rref r)
forall r a.
Either TooMuchConcurrency (r, RawReply a) -> GRpcReply a
buildGRpcReply2 (Either TooMuchConcurrency ((), RawReply (GRpcOWTy p rref r))
 -> GRpcReply (GRpcOWTy p rref r))
-> ExceptT
     ClientError
     IO
     (Either TooMuchConcurrency ((), RawReply (GRpcOWTy p rref r)))
-> ClientIO (GRpcReply (GRpcOWTy p rref r))
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$>
            RPCTy p
-> GrpcClient
-> ()
-> (()
    -> ClientIO
         ((), Either StreamDone (CompressMode, GRpcIWTy p vref v)))
-> ExceptT
     ClientError
     IO
     (Either TooMuchConcurrency ((), RawReply (GRpcOWTy p rref r)))
forall r i o a.
(GRPCInput r i, GRPCOutput r o) =>
r
-> GrpcClient
-> a
-> (a -> ClientIO (a, Either StreamDone (CompressMode, i)))
-> ClientIO (Either TooMuchConcurrency (a, RawReply o))
rawStreamClient @_ @(GRpcIWTy p vref v) @(GRpcOWTy p rref r) RPCTy p
rpc GrpcClient
client ()
                            (\_ -> do Maybe v
nextVal <- IO (Maybe v) -> ExceptT ClientError IO (Maybe v)
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (Maybe v) -> ExceptT ClientError IO (Maybe v))
-> IO (Maybe v) -> ExceptT ClientError IO (Maybe v)
forall a b. (a -> b) -> a -> b
$ STM (Maybe v) -> IO (Maybe v)
forall a. STM a -> IO a
atomically (STM (Maybe v) -> IO (Maybe v)) -> STM (Maybe v) -> IO (Maybe v)
forall a b. (a -> b) -> a -> b
$ TMChan v -> STM (Maybe v)
forall a. TMChan a -> STM (Maybe a)
readTMChan TMChan v
chan
                                      case Maybe v
nextVal of
                                        Nothing -> ((), Either StreamDone (CompressMode, GRpcIWTy p vref v))
-> ClientIO
     ((), Either StreamDone (CompressMode, GRpcIWTy p vref v))
forall (m :: * -> *) a. Monad m => a -> m a
return ((), StreamDone -> Either StreamDone (CompressMode, GRpcIWTy p vref v)
forall a b. a -> Either a b
Left StreamDone
StreamDone)
                                        Just v :: v
v  -> ((), Either StreamDone (CompressMode, GRpcIWTy p vref v))
-> ClientIO
     ((), Either StreamDone (CompressMode, GRpcIWTy p vref v))
forall (m :: * -> *) a. Monad m => a -> m a
return ((), (CompressMode, GRpcIWTy p vref v)
-> Either StreamDone (CompressMode, GRpcIWTy p vref v)
forall a b. b -> Either a b
Right (CompressMode
compress, Proxy p -> Proxy vref -> v -> GRpcIWTy p vref v
forall (p :: GRpcMessageProtocol) (ref :: TypeRef) r.
GRpcInputWrapper p ref r =>
Proxy p -> Proxy ref -> r -> GRpcIWTy p ref r
buildGRpcIWTy (Proxy p
forall k (t :: k). Proxy t
Proxy @p) (Proxy vref
forall k (t :: k). Proxy t
Proxy @vref) v
v)))
         -- This conduit feeds information to the other thread
         let go :: ConduitT v Void IO (GRpcReply r)
go = do Maybe v
x <- ConduitT v Void IO (Maybe v)
forall (m :: * -> *) i. Monad m => Consumer i m (Maybe i)
await
                     case Maybe v
x of
                       Just v :: v
v  -> do IO () -> ConduitT v Void IO ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> ConduitT v Void IO ()) -> IO () -> ConduitT v Void IO ()
forall a b. (a -> b) -> a -> b
$ STM () -> IO ()
forall a. STM a -> IO a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ TMChan v -> v -> STM ()
forall a. TMChan a -> a -> STM ()
writeTMChan TMChan v
chan v
v
                                     ConduitT v Void IO (GRpcReply r)
go
                       Nothing -> do IO () -> ConduitT v Void IO ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> ConduitT v Void IO ()) -> IO () -> ConduitT v Void IO ()
forall a b. (a -> b) -> a -> b
$ STM () -> IO ()
forall a. STM a -> IO a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ TMChan v -> STM ()
forall a. TMChan a -> STM ()
closeTMChan TMChan v
chan
                                     IO (GRpcReply r) -> ConduitT v Void IO (GRpcReply r)
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (GRpcReply r) -> ConduitT v Void IO (GRpcReply r))
-> IO (GRpcReply r) -> ConduitT v Void IO (GRpcReply r)
forall a b. (a -> b) -> a -> b
$ Async (GRpcReply r) -> IO (GRpcReply r)
forall a. Async a -> IO a
wait Async (GRpcReply r)
promise
         ConduitT v Void IO (GRpcReply r)
-> IO (ConduitT v Void IO (GRpcReply r))
forall (m :: * -> *) a. Monad m => a -> m a
return ConduitT v Void IO (GRpcReply r)
go

instance ( KnownName name
         , GRpcInputWrapper p vref v, GRpcOutputWrapper p rref r
         , handler ~ (v -> IO (ConduitT () (GRpcReply r) IO ())) )
         => GRpcMethodCall p ('Method name anns '[ 'ArgSingle vref ] ('RetStream rref)) handler where
  gRpcMethodCall :: RPCTy p
-> Proxy ('Method name anns '[ 'ArgSingle vref] ('RetStream rref))
-> GrpcClient
-> handler
gRpcMethodCall rpc :: RPCTy p
rpc _ client :: GrpcClient
client x :: v
x
    = do -- Create a new TMChan
         TMChan r
chan <- IO (TMChan r)
forall a. IO (TMChan a)
newTMChanIO :: IO (TMChan r)
         TMVar (GRpcReply ())
var  <- IO (TMVar (GRpcReply ()))
forall a. IO (TMVar a)
newEmptyTMVarIO  -- if full, this means an error
         -- Start executing the client in another thread
         Async ()
_ <- IO () -> IO (Async ())
forall a. IO a -> IO (Async a)
async (IO () -> IO (Async ())) -> IO () -> IO (Async ())
forall a b. (a -> b) -> a -> b
$ do
            GRpcReply ()
v <- ClientIO (GRpcReply ()) -> IO (GRpcReply ())
forall a. ClientIO (GRpcReply a) -> IO (GRpcReply a)
simplifyResponse (ClientIO (GRpcReply ()) -> IO (GRpcReply ()))
-> ClientIO (GRpcReply ()) -> IO (GRpcReply ())
forall a b. (a -> b) -> a -> b
$
                 Either TooMuchConcurrency ((), HeaderList, HeaderList)
-> GRpcReply ()
forall v. Either TooMuchConcurrency v -> GRpcReply ()
buildGRpcReply3 (Either TooMuchConcurrency ((), HeaderList, HeaderList)
 -> GRpcReply ())
-> ExceptT
     ClientError
     IO
     (Either TooMuchConcurrency ((), HeaderList, HeaderList))
-> ClientIO (GRpcReply ())
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$>
                 RPCTy p
-> GrpcClient
-> ()
-> GRpcIWTy p vref v
-> (() -> HeaderList -> GRpcOWTy p rref r -> ClientIO ())
-> ExceptT
     ClientError
     IO
     (Either TooMuchConcurrency ((), HeaderList, HeaderList))
forall r i o a.
(GRPCInput r i, GRPCOutput r o) =>
r
-> GrpcClient
-> a
-> i
-> (a -> HeaderList -> o -> ClientIO a)
-> ClientIO (Either TooMuchConcurrency (a, HeaderList, HeaderList))
rawStreamServer @_ @(GRpcIWTy p vref v) @(GRpcOWTy p rref r)
                                 RPCTy p
rpc GrpcClient
client () (Proxy p -> Proxy vref -> v -> GRpcIWTy p vref v
forall (p :: GRpcMessageProtocol) (ref :: TypeRef) r.
GRpcInputWrapper p ref r =>
Proxy p -> Proxy ref -> r -> GRpcIWTy p ref r
buildGRpcIWTy (Proxy p
forall k (t :: k). Proxy t
Proxy @p) (Proxy vref
forall k (t :: k). Proxy t
Proxy @vref) v
x)
                                 (\_ _ newVal :: GRpcOWTy p rref r
newVal -> IO () -> ClientIO ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> ClientIO ()) -> IO () -> ClientIO ()
forall a b. (a -> b) -> a -> b
$ STM () -> IO ()
forall a. STM a -> IO a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ do
                                   -- on the first iteration, say that everything is OK
                                   Bool
_ <- TMVar (GRpcReply ()) -> GRpcReply () -> STM Bool
forall a. TMVar a -> a -> STM Bool
tryPutTMVar TMVar (GRpcReply ())
var (() -> GRpcReply ()
forall a. a -> GRpcReply a
GRpcOk ())
                                   TMChan r -> r -> STM ()
forall a. TMChan a -> a -> STM ()
writeTMChan TMChan r
chan (Proxy p -> Proxy rref -> GRpcOWTy p rref r -> r
forall (p :: GRpcMessageProtocol) (ref :: TypeRef) r.
GRpcOutputWrapper p ref r =>
Proxy p -> Proxy ref -> GRpcOWTy p ref r -> r
unGRpcOWTy (Proxy p
forall k (t :: k). Proxy t
Proxy @p) (Proxy rref
forall k (t :: k). Proxy t
Proxy @rref) GRpcOWTy p rref r
newVal))
            case GRpcReply ()
v of
              GRpcOk () -> IO () -> IO ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ STM () -> IO ()
forall a. STM a -> IO a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ TMChan r -> STM ()
forall a. TMChan a -> STM ()
closeTMChan TMChan r
chan
              _         -> IO () -> IO ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ STM () -> IO ()
forall a. STM a -> IO a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ TMVar (GRpcReply ()) -> GRpcReply () -> STM ()
forall a. TMVar a -> a -> STM ()
putTMVar TMVar (GRpcReply ())
var GRpcReply ()
v
         -- This conduit feeds information to the other thread
         let go :: ConduitT () (GRpcReply r) IO ()
go = do GRpcReply ()
firstResult <- IO (GRpcReply ()) -> ConduitT () (GRpcReply r) IO (GRpcReply ())
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (GRpcReply ()) -> ConduitT () (GRpcReply r) IO (GRpcReply ()))
-> IO (GRpcReply ()) -> ConduitT () (GRpcReply r) IO (GRpcReply ())
forall a b. (a -> b) -> a -> b
$ STM (GRpcReply ()) -> IO (GRpcReply ())
forall a. STM a -> IO a
atomically (STM (GRpcReply ()) -> IO (GRpcReply ()))
-> STM (GRpcReply ()) -> IO (GRpcReply ())
forall a b. (a -> b) -> a -> b
$ TMVar (GRpcReply ()) -> STM (GRpcReply ())
forall a. TMVar a -> STM a
takeTMVar TMVar (GRpcReply ())
var
                     case GRpcReply ()
firstResult of
                       GRpcOk _ -> -- no error, everything is fine
                         TMChan r -> ConduitT () r IO ()
forall (m :: * -> *) a. MonadIO m => TMChan a -> ConduitT () a m ()
sourceTMChan TMChan r
chan ConduitT () r IO ()
-> ConduitM r (GRpcReply r) IO ()
-> ConduitT () (GRpcReply r) IO ()
forall (m :: * -> *) a b c r.
Monad m =>
ConduitM a b m () -> ConduitM b c m r -> ConduitM a c m r
.| (r -> GRpcReply r) -> ConduitM r (GRpcReply r) IO ()
forall (m :: * -> *) a b. Monad m => (a -> b) -> ConduitT a b m ()
C.map r -> GRpcReply r
forall a. a -> GRpcReply a
GRpcOk
                       e :: GRpcReply ()
e -> GRpcReply r -> ConduitT () (GRpcReply r) IO ()
forall (m :: * -> *) o i. Monad m => o -> ConduitT i o m ()
yield (GRpcReply r -> ConduitT () (GRpcReply r) IO ())
-> GRpcReply r -> ConduitT () (GRpcReply r) IO ()
forall a b. (a -> b) -> a -> b
$ (\_ -> String -> r
forall a. HasCallStack => String -> a
error "this should never happen") (() -> r) -> GRpcReply () -> GRpcReply r
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> GRpcReply ()
e
         ConduitT () (GRpcReply r) IO ()
-> IO (ConduitT () (GRpcReply r) IO ())
forall (m :: * -> *) a. Monad m => a -> m a
return ConduitT () (GRpcReply r) IO ()
go

instance ( KnownName name
         , GRpcInputWrapper p vref v, GRpcOutputWrapper p rref r
         , handler ~ (CompressMode -> IO (ConduitT v (GRpcReply r) IO ())) )
         => GRpcMethodCall p ('Method name anns '[ 'ArgStream vref ] ('RetStream rref)) handler where
  gRpcMethodCall :: RPCTy p
-> Proxy ('Method name anns '[ 'ArgStream vref] ('RetStream rref))
-> GrpcClient
-> handler
gRpcMethodCall rpc :: RPCTy p
rpc _ client :: GrpcClient
client compress :: CompressMode
compress
    = do -- Create a new TMChan
         TMChan (GRpcReply r)
inchan <- IO (TMChan (GRpcReply r))
forall a. IO (TMChan a)
newTMChanIO :: IO (TMChan (GRpcReply r))
         TMChan v
outchan <- IO (TMChan v)
forall a. IO (TMChan a)
newTMChanIO :: IO (TMChan v)
         TMVar (GRpcReply ())
var <- IO (TMVar (GRpcReply ()))
forall a. IO (TMVar a)
newEmptyTMVarIO  -- if full, this means an error
         -- Start executing the client in another thread
         Async ()
_ <- IO () -> IO (Async ())
forall a. IO a -> IO (Async a)
async (IO () -> IO (Async ())) -> IO () -> IO (Async ())
forall a b. (a -> b) -> a -> b
$ do
            GRpcReply ()
v <- ClientIO (GRpcReply ()) -> IO (GRpcReply ())
forall a. ClientIO (GRpcReply a) -> IO (GRpcReply a)
simplifyResponse (ClientIO (GRpcReply ()) -> IO (GRpcReply ()))
-> ClientIO (GRpcReply ()) -> IO (GRpcReply ())
forall a b. (a -> b) -> a -> b
$
                 Either TooMuchConcurrency ((), ()) -> GRpcReply ()
forall v. Either TooMuchConcurrency v -> GRpcReply ()
buildGRpcReply3 (Either TooMuchConcurrency ((), ()) -> GRpcReply ())
-> ExceptT ClientError IO (Either TooMuchConcurrency ((), ()))
-> ClientIO (GRpcReply ())
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$>
                 RPCTy p
-> GrpcClient
-> ()
-> (() -> IncomingEvent (GRpcOWTy p rref r) () -> ClientIO ())
-> ()
-> (() -> ClientIO ((), OutgoingEvent (GRpcIWTy p vref v) ()))
-> ExceptT ClientError IO (Either TooMuchConcurrency ((), ()))
forall r i o a b.
(GRPCInput r i, GRPCOutput r o) =>
r
-> GrpcClient
-> a
-> (a -> IncomingEvent o a -> ClientIO a)
-> b
-> (b -> ClientIO (b, OutgoingEvent i b))
-> ClientIO (Either TooMuchConcurrency (a, b))
rawGeneralStream
                   @_ @(GRpcIWTy p vref v) @(GRpcOWTy p rref r)
                   RPCTy p
rpc GrpcClient
client
                   () (\_ ievent :: IncomingEvent (GRpcOWTy p rref r) ()
ievent -> do -- on the first iteration, say that everything is OK
                        Bool
_ <- IO Bool -> ExceptT ClientError IO Bool
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO Bool -> ExceptT ClientError IO Bool)
-> IO Bool -> ExceptT ClientError IO Bool
forall a b. (a -> b) -> a -> b
$ STM Bool -> IO Bool
forall a. STM a -> IO a
atomically (STM Bool -> IO Bool) -> STM Bool -> IO Bool
forall a b. (a -> b) -> a -> b
$ TMVar (GRpcReply ()) -> GRpcReply () -> STM Bool
forall a. TMVar a -> a -> STM Bool
tryPutTMVar TMVar (GRpcReply ())
var (() -> GRpcReply ()
forall a. a -> GRpcReply a
GRpcOk ())
                        case IncomingEvent (GRpcOWTy p rref r) ()
ievent of
                          RecvMessage o :: GRpcOWTy p rref r
o -> IO () -> ClientIO ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> ClientIO ()) -> IO () -> ClientIO ()
forall a b. (a -> b) -> a -> b
$ STM () -> IO ()
forall a. STM a -> IO a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ TMChan (GRpcReply r) -> GRpcReply r -> STM ()
forall a. TMChan a -> a -> STM ()
writeTMChan TMChan (GRpcReply r)
inchan (r -> GRpcReply r
forall a. a -> GRpcReply a
GRpcOk (r -> GRpcReply r) -> r -> GRpcReply r
forall a b. (a -> b) -> a -> b
$ Proxy p -> Proxy rref -> GRpcOWTy p rref r -> r
forall (p :: GRpcMessageProtocol) (ref :: TypeRef) r.
GRpcOutputWrapper p ref r =>
Proxy p -> Proxy ref -> GRpcOWTy p ref r -> r
unGRpcOWTy(Proxy p
forall k (t :: k). Proxy t
Proxy @p) (Proxy rref
forall k (t :: k). Proxy t
Proxy @rref) GRpcOWTy p rref r
o)
                          Invalid e :: SomeException
e -> IO () -> ClientIO ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> ClientIO ()) -> IO () -> ClientIO ()
forall a b. (a -> b) -> a -> b
$ STM () -> IO ()
forall a. STM a -> IO a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ TMChan (GRpcReply r) -> GRpcReply r -> STM ()
forall a. TMChan a -> a -> STM ()
writeTMChan TMChan (GRpcReply r)
inchan (String -> GRpcReply r
forall a. String -> GRpcReply a
GRpcErrorString (SomeException -> String
forall a. Show a => a -> String
show SomeException
e))
                          _ -> () -> ClientIO ()
forall (m :: * -> *) a. Monad m => a -> m a
return () )
                   () (\_ -> do
                        Maybe v
nextVal <- IO (Maybe v) -> ExceptT ClientError IO (Maybe v)
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (Maybe v) -> ExceptT ClientError IO (Maybe v))
-> IO (Maybe v) -> ExceptT ClientError IO (Maybe v)
forall a b. (a -> b) -> a -> b
$ STM (Maybe v) -> IO (Maybe v)
forall a. STM a -> IO a
atomically (STM (Maybe v) -> IO (Maybe v)) -> STM (Maybe v) -> IO (Maybe v)
forall a b. (a -> b) -> a -> b
$ TMChan v -> STM (Maybe v)
forall a. TMChan a -> STM (Maybe a)
readTMChan TMChan v
outchan
                        case Maybe v
nextVal of
                          Nothing -> ((), OutgoingEvent (GRpcIWTy p vref v) ())
-> ClientIO ((), OutgoingEvent (GRpcIWTy p vref v) ())
forall (m :: * -> *) a. Monad m => a -> m a
return ((), OutgoingEvent (GRpcIWTy p vref v) ()
forall i b. OutgoingEvent i b
Finalize)
                          Just v :: v
v  -> ((), OutgoingEvent (GRpcIWTy p vref v) ())
-> ClientIO ((), OutgoingEvent (GRpcIWTy p vref v) ())
forall (m :: * -> *) a. Monad m => a -> m a
return ((), CompressMode
-> GRpcIWTy p vref v -> OutgoingEvent (GRpcIWTy p vref v) ()
forall i b. CompressMode -> i -> OutgoingEvent i b
SendMessage CompressMode
compress (Proxy p -> Proxy vref -> v -> GRpcIWTy p vref v
forall (p :: GRpcMessageProtocol) (ref :: TypeRef) r.
GRpcInputWrapper p ref r =>
Proxy p -> Proxy ref -> r -> GRpcIWTy p ref r
buildGRpcIWTy (Proxy p
forall k (t :: k). Proxy t
Proxy @p) (Proxy vref
forall k (t :: k). Proxy t
Proxy @vref) v
v)))
            case GRpcReply ()
v of
              GRpcOk () -> IO () -> IO ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ STM () -> IO ()
forall a. STM a -> IO a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ TMChan (GRpcReply r) -> STM ()
forall a. TMChan a -> STM ()
closeTMChan TMChan (GRpcReply r)
inchan
              _         -> IO () -> IO ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ STM () -> IO ()
forall a. STM a -> IO a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ TMVar (GRpcReply ()) -> GRpcReply () -> STM ()
forall a. TMVar a -> a -> STM ()
putTMVar TMVar (GRpcReply ())
var GRpcReply ()
v
         -- This conduit feeds information to the other thread
         let go :: ConduitT v (GRpcReply r) IO ()
go = do GRpcReply ()
err <- IO (GRpcReply ()) -> ConduitT v (GRpcReply r) IO (GRpcReply ())
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (GRpcReply ()) -> ConduitT v (GRpcReply r) IO (GRpcReply ()))
-> IO (GRpcReply ()) -> ConduitT v (GRpcReply r) IO (GRpcReply ())
forall a b. (a -> b) -> a -> b
$ STM (GRpcReply ()) -> IO (GRpcReply ())
forall a. STM a -> IO a
atomically (STM (GRpcReply ()) -> IO (GRpcReply ()))
-> STM (GRpcReply ()) -> IO (GRpcReply ())
forall a b. (a -> b) -> a -> b
$ TMVar (GRpcReply ()) -> STM (GRpcReply ())
forall a. TMVar a -> STM a
takeTMVar TMVar (GRpcReply ())
var
                     case GRpcReply ()
err of
                       GRpcOk _ -> ConduitT v (GRpcReply r) IO ()
go2
                       e :: GRpcReply ()
e        -> GRpcReply r -> ConduitT v (GRpcReply r) IO ()
forall (m :: * -> *) o i. Monad m => o -> ConduitT i o m ()
yield (GRpcReply r -> ConduitT v (GRpcReply r) IO ())
-> GRpcReply r -> ConduitT v (GRpcReply r) IO ()
forall a b. (a -> b) -> a -> b
$ (\_ -> String -> r
forall a. HasCallStack => String -> a
error "this should never happen") (() -> r) -> GRpcReply () -> GRpcReply r
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> GRpcReply ()
e
             go2 :: ConduitT v (GRpcReply r) IO ()
go2 = do Maybe v
nextOut <- ConduitT v (GRpcReply r) IO (Maybe v)
forall (m :: * -> *) i. Monad m => Consumer i m (Maybe i)
await
                      case Maybe v
nextOut of
                        Just v :: v
v  -> do IO () -> ConduitT v (GRpcReply r) IO ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> ConduitT v (GRpcReply r) IO ())
-> IO () -> ConduitT v (GRpcReply r) IO ()
forall a b. (a -> b) -> a -> b
$ STM () -> IO ()
forall a. STM a -> IO a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ TMChan v -> v -> STM ()
forall a. TMChan a -> a -> STM ()
writeTMChan TMChan v
outchan v
v
                                      ConduitT v (GRpcReply r) IO ()
go2
                        Nothing -> do Maybe (Maybe (GRpcReply r))
r <- IO (Maybe (Maybe (GRpcReply r)))
-> ConduitT v (GRpcReply r) IO (Maybe (Maybe (GRpcReply r)))
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (Maybe (Maybe (GRpcReply r)))
 -> ConduitT v (GRpcReply r) IO (Maybe (Maybe (GRpcReply r))))
-> IO (Maybe (Maybe (GRpcReply r)))
-> ConduitT v (GRpcReply r) IO (Maybe (Maybe (GRpcReply r)))
forall a b. (a -> b) -> a -> b
$ STM (Maybe (Maybe (GRpcReply r)))
-> IO (Maybe (Maybe (GRpcReply r)))
forall a. STM a -> IO a
atomically (STM (Maybe (Maybe (GRpcReply r)))
 -> IO (Maybe (Maybe (GRpcReply r))))
-> STM (Maybe (Maybe (GRpcReply r)))
-> IO (Maybe (Maybe (GRpcReply r)))
forall a b. (a -> b) -> a -> b
$ TMChan (GRpcReply r) -> STM (Maybe (Maybe (GRpcReply r)))
forall a. TMChan a -> STM (Maybe (Maybe a))
tryReadTMChan TMChan (GRpcReply r)
inchan
                                      case Maybe (Maybe (GRpcReply r))
r of
                                        Nothing            -> () -> ConduitT v (GRpcReply r) IO ()
forall (m :: * -> *) a. Monad m => a -> m a
return () -- both are empty, end
                                        Just Nothing       -> ConduitT v (GRpcReply r) IO ()
go2
                                        Just (Just nextIn :: GRpcReply r
nextIn) -> GRpcReply r -> ConduitT v (GRpcReply r) IO ()
forall (m :: * -> *) o i. Monad m => o -> ConduitT i o m ()
yield GRpcReply r
nextIn ConduitT v (GRpcReply r) IO ()
-> ConduitT v (GRpcReply r) IO () -> ConduitT v (GRpcReply r) IO ()
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> ConduitT v (GRpcReply r) IO ()
go2
         ConduitT v (GRpcReply r) IO ()
-> IO (ConduitT v (GRpcReply r) IO ())
forall (m :: * -> *) a. Monad m => a -> m a
return ConduitT v (GRpcReply r) IO ()
go