{-# language DataKinds #-}
{-# language FlexibleContexts #-}
{-# language FlexibleInstances #-}
{-# language GADTs #-}
{-# language MultiParamTypeClasses #-}
{-# language PolyKinds #-}
{-# language RankNTypes #-}
{-# language ScopedTypeVariables #-}
{-# language TypeApplications #-}
{-# language TypeFamilies #-}
{-# language TypeOperators #-}
{-# language UndecidableInstances #-}
module Mu.GRpc.Server
(
GRpcMessageProtocol(..)
, msgProtoBuf, msgAvro
, runGRpcApp, runGRpcAppTrans
, runGRpcAppSettings, Settings
, runGRpcAppTLS, TLSSettings
, gRpcApp, gRpcAppTrans
, raiseErrors, liftServerConduit
, module Avro
) where
import Control.Concurrent.Async
import Control.Concurrent.STM (atomically)
import Control.Concurrent.STM.TMVar
import Control.Exception
import Control.Monad.Except
import Data.Avro
import Data.ByteString (ByteString)
import qualified Data.ByteString.Char8 as BS
import Data.Conduit
import Data.Conduit.TMChan
import Data.Kind
import Data.Proxy
import GHC.TypeLits
import Network.GRPC.HTTP2.Encoding (GRPCInput, GRPCOutput, gzip, uncompressed)
import Network.GRPC.HTTP2.Types (GRPCStatus (..), GRPCStatusCode (..))
import Network.GRPC.Server.Handlers.Trans
import Network.GRPC.Server.Wai as Wai
import Network.Wai (Application, Request, requestHeaders)
import Network.Wai.Handler.Warp (Port, Settings, run, runSettings)
import Network.Wai.Handler.WarpTLS (TLSSettings, runTLS)
import Mu.Adapter.ProtoBuf.Via
import Mu.GRpc.Avro
import qualified Mu.GRpc.Avro as Avro
import Mu.GRpc.Bridge
import Mu.Rpc
import Mu.Schema
import Mu.Server
-- | Serve a Mu server over gRPC on the given 'Port' when its handlers
--   already run in 'ServerErrorIO'.
--
--   Delegates to 'runGRpcAppTrans', passing 'id' as the transformation
--   into 'ServerErrorIO', so the handlers are used as they are.
runGRpcApp
  :: ( KnownName name
     , GRpcServiceHandlers ('Package ('Just name) services)
                           protocol ServerErrorIO chn services handlers )
  => Proxy protocol
  -> Port
  -> ServerT chn () ('Package ('Just name) services) ServerErrorIO handlers
  -> IO ()
runGRpcApp protocol port = runGRpcAppTrans protocol port id
-- | Serve a Mu server over gRPC on the given 'Port' using Warp's 'run'.
--
--   Handlers may run in any monad @m@; the supplied function translates
--   each handler action into 'ServerErrorIO'. The WAI 'Application' that
--   is served is the one built by 'gRpcAppTrans'.
runGRpcAppTrans
  :: ( KnownName name
     , GRpcServiceHandlers ('Package ('Just name) services)
                           protocol m chn services handlers )
  => Proxy protocol
  -> Port
  -> (forall a. m a -> ServerErrorIO a)
  -> ServerT chn () ('Package ('Just name) services) m handlers
  -> IO ()
runGRpcAppTrans protocol port f svr
  = run port (gRpcAppTrans protocol f svr)
-- | Serve a Mu server over gRPC using Warp's 'runSettings' with the
--   given 'Settings'.
--
--   The supplied function translates handler actions from @m@ into
--   'ServerErrorIO'; the served 'Application' is built by 'gRpcAppTrans'.
runGRpcAppSettings
  :: ( KnownName name
     , GRpcServiceHandlers ('Package ('Just name) services)
                           protocol m chn services handlers )
  => Proxy protocol
  -> Settings
  -> (forall a. m a -> ServerErrorIO a)
  -> ServerT chn () ('Package ('Just name) services) m handlers
  -> IO ()
runGRpcAppSettings protocol st f svr
  = runSettings st (gRpcAppTrans protocol f svr)
-- | Serve a Mu server over gRPC using 'runTLS' with the given
--   'TLSSettings' and Warp 'Settings'.
--
--   The supplied function translates handler actions from @m@ into
--   'ServerErrorIO'; the served 'Application' is built by 'gRpcAppTrans'.
runGRpcAppTLS
  :: ( KnownName name
     , GRpcServiceHandlers ('Package ('Just name) services)
                           protocol m chn services handlers )
  => Proxy protocol
  -> TLSSettings -> Settings
  -> (forall a. m a -> ServerErrorIO a)
  -> ServerT chn () ('Package ('Just name) services) m handlers
  -> IO ()
runGRpcAppTLS protocol tls st f svr
  = runTLS tls st (gRpcAppTrans protocol f svr)
-- | Turn a Mu server whose handlers run in 'ServerErrorIO' into a WAI
--   'Application'.
--
--   Delegates to 'gRpcAppTrans' with 'id' as the transformation into
--   'ServerErrorIO'.
gRpcApp
  :: ( KnownName name
     , GRpcServiceHandlers ('Package ('Just name) services)
                           protocol ServerErrorIO chn services handlers )
  => Proxy protocol
  -> ServerT chn () ('Package ('Just name) services) ServerErrorIO handlers
  -> Application
gRpcApp protocol = gRpcAppTrans protocol id
-- | Turn a Mu server whose handlers run in an arbitrary monad @m@ into a
--   WAI 'Application'.
--
--   The service handlers are produced by 'gRpcServerHandlers' and handed
--   to 'Wai.grpcApp' together with the 'uncompressed' and 'gzip'
--   compression options.
gRpcAppTrans
  :: ( KnownName name
     , GRpcServiceHandlers ('Package ('Just name) services)
                           protocol m chn services handlers )
  => Proxy protocol
  -> (forall a. m a -> ServerErrorIO a)
  -> ServerT chn () ('Package ('Just name) services) m handlers
  -> Application
gRpcAppTrans protocol f svr
  = Wai.grpcApp [uncompressed, gzip]
                (gRpcServerHandlers protocol f svr)
-- | Collect the 'ServiceHandler's for every service of a named package.
--
--   Unwraps the 'Services' constructor and defers to
--   'gRpcServiceHandlers', passing the package name reflected from the
--   type-level @name@ as a 'ByteString'.
gRpcServerHandlers
  :: forall name services handlers m protocol chn.
     ( KnownName name
     , GRpcServiceHandlers ('Package ('Just name) services)
                           protocol m chn services handlers )
  => Proxy protocol
  -> (forall a. m a -> ServerErrorIO a)
  -> ServerT chn () ('Package ('Just name) services) m handlers
  -> [ServiceHandler]
gRpcServerHandlers pr f (Services svr)
  = gRpcServiceHandlers f (Proxy @('Package ('Just name) services)) pr packageName svr
  where
    -- Value-level copy of the type-level package name.
    packageName :: ByteString
    packageName = BS.pack (nameVal (Proxy @name))
-- | Builds the gRPC 'ServiceHandler's for the list of services @ss@ of
--   package @fullP@, whose handlers (of types @hs@) run in monad @m@
--   and are encoded with message protocol @p@.
class GRpcServiceHandlers (fullP :: Package snm mnm anm (TypeRef snm))
                          (p :: GRpcMessageProtocol) (m :: Type -> Type)
                          (chn :: ServiceChain snm)
                          (ss :: [Service snm mnm anm (TypeRef snm)]) (hs :: [[Type]]) where
  -- | Arguments, in order: the transformation from @m@ into
  --   'ServerErrorIO', proxies for the package and the protocol, the
  --   package name, and the handlers of the services.
  gRpcServiceHandlers :: (forall a. m a -> ServerErrorIO a)
                      -> Proxy fullP -> Proxy p -> ByteString
                      -> ServicesT chn () ss m hs -> [ServiceHandler]
-- | Base case: an empty list of services yields no handlers.
instance GRpcServiceHandlers fullP p m chn '[] '[] where
  gRpcServiceHandlers _ _ _ _ S0 = []
-- | Inductive case: the handlers of the first service (obtained from
--   'gRpcMethodHandlers') followed by the handlers of the remaining
--   services.
instance ( KnownName name
         , GRpcMethodHandlers fullP ('Service name methods)
                              p m chn (MappingRight chn name) methods h
         , GRpcServiceHandlers fullP p m chn rest hs )
         => GRpcServiceHandlers fullP p m chn ('Service name methods ': rest) (h ': hs) where
  gRpcServiceHandlers f pfullP pr packageName (ProperSvc svr :<&>: rest)
    =  gRpcMethodHandlers f pfullP (Proxy @('Service name methods)) pr
                          packageName serviceName svr
    ++ gRpcServiceHandlers f pfullP pr packageName rest
    where
      -- Value-level copy of the type-level service name.
      serviceName :: ByteString
      serviceName = BS.pack (nameVal (Proxy @name))
-- | Union services ('OneOf') are rejected: the 'TypeError' constraint
--   turns any use of this instance into a compile-time error carrying
--   the message below. The method body exists only to complete the
--   instance and fails with the same message if it is ever evaluated.
instance ( GHC.TypeLits.TypeError ('Text "unions are not supported in gRPC") )
         => GRpcServiceHandlers fullP p m chn ('OneOf name methods ': rest) hs where
  gRpcServiceHandlers _ = error "unions are not supported in gRPC"
-- | Builds the gRPC 'ServiceHandler's for the list of methods @ms@ of
--   service @fullS@ in package @fullP@, whose handlers (of types @hs@)
--   run in monad @m@ and are encoded with message protocol @p@.
class GRpcMethodHandlers (fullP :: Package snm mnm anm (TypeRef snm))
                         (fullS :: Service snm mnm anm (TypeRef snm))
                         (p :: GRpcMessageProtocol) (m :: Type -> Type)
                         (chn :: ServiceChain snm) (inh :: Type)
                         (ms :: [Method snm mnm anm (TypeRef snm)]) (hs :: [Type]) where
  -- | Arguments, in order: the transformation from @m@ into
  --   'ServerErrorIO', proxies for the package, the service and the
  --   protocol, the package name, the service name, and the handlers of
  --   the methods.
  gRpcMethodHandlers :: (forall a. m a -> ServerErrorIO a)
                     -> Proxy fullP -> Proxy fullS -> Proxy p -> ByteString -> ByteString
                     -> HandlersT chn () inh ms m hs -> [ServiceHandler]
-- | Base case: an empty list of methods yields no handlers.
instance GRpcMethodHandlers fullP fullS p m chn inh '[] '[] where
  gRpcMethodHandlers _ _ _ _ _ _ H0 = []
-- | Inductive case: the handler of the first method followed by the
--   handlers of the remaining methods.
--
--   The RPC descriptor for the method is built with 'mkRPC' from the
--   package, service and method names. For each incoming 'Request' the
--   stored handler is applied to the 'RpcInfo' reflected from the
--   request headers and to @()@ as its second argument.
instance ( KnownName name, MkRPC p
         , ReflectRpcInfo fullP fullS ('Method name args r)
         , GRpcMethodHandler p m args r h
         , GRpcMethodHandlers fullP fullS p m chn () rest hs)
         => GRpcMethodHandlers fullP fullS p m chn ()
                               ('Method name args r ': rest) (h ': hs) where
  gRpcMethodHandlers f pfullP pfullS pr p s (Hmore _ _ h rest)
    = gRpcMethodHandler f pr (Proxy @args) (Proxy @r) (mkRPC pr p s methodName)
                        (\req -> h (reflectInfo (requestHeaders req)) ())
      : gRpcMethodHandlers f pfullP pfullS pr p s rest
    where
      -- Value-level copy of the type-level method name.
      methodName :: ByteString
      methodName = BS.pack (nameVal (Proxy @name))
      -- Describe the call being served, given the request headers.
      -- (No local signature: the headers type is not imported here.)
      reflectInfo hdrs
        = reflectRpcInfo pfullP pfullS (Proxy @('Method name args r)) hdrs ()
-- | Builds the 'ServiceHandler' for a single method with arguments
--   @args@ and return description @r@, implemented by a handler of
--   type @h@ running in monad @m@ under message protocol @p@.
class GRpcMethodHandler p m (args :: [Argument snm anm (TypeRef snm)]) r h where
  -- | Arguments, in order: the transformation from @m@ into
  --   'ServerErrorIO', proxies for the protocol, the arguments and the
  --   return description, the RPC descriptor, and the handler as a
  --   function of the incoming 'Request'.
  gRpcMethodHandler :: (forall a. m a -> ServerErrorIO a)
                    -> Proxy p -> Proxy args -> Proxy r
                    -> RPCTy p -> (Request -> h) -> ServiceHandler
-- | Change the base monad of a conduit from 'ServerErrorIO' to any
--   monad with 'MonadIO'.
--
--   Every underlying action is converted with 'raiseErrors' (applied
--   through 'transPipe'), so server errors are no longer carried in the
--   'ServerErrorIO' error layer.
liftServerConduit
  :: MonadIO m
  => ConduitT a b ServerErrorIO r -> ConduitT a b m r
liftServerConduit = transPipe raiseErrors
-- | Run a 'ServerErrorIO' action in any monad with 'MonadIO', reporting
--   failures as gRPC statuses instead of 'Left' values.
--
--   * A 'Left' 'ServerError' is passed to 'closeEarly' as a 'GRPCStatus'
--     whose code comes from the local @serverErrorToGRpcError@ table and
--     whose message is the packed error string.
--   * A 'GRPCStatus' exception raised while running the action is
--     rethrown unchanged.
--   * Any other exception is passed to 'closeEarly' as an 'INTERNAL'
--     status whose message is the 'show'n exception.
raiseErrors :: MonadIO m => ServerErrorIO a -> m a
raiseErrors h
  = liftIO $ do
      h' <- runExceptT h
      case h' of
        Right r -> pure r
        Left (ServerError code msg)
          -> closeEarly $ GRPCStatus (serverErrorToGRpcError code)
                                     (BS.pack msg)
    -- 'catches' binds tighter than '$', so it guards the whole do-block
    -- above and the result is what gets lifted with 'liftIO'.
    `catches`
    [ -- Handlers are tried in order: statuses must come first so that
      -- they are not rewrapped as INTERNAL by the catch-all below.
      -- NOTE(review): presumably 'closeEarly' signals its status by
      -- throwing it -- confirm against warp-grpc.
      Handler (\(e :: GRPCStatus) -> throwIO e)
    , Handler (\(e :: SomeException) -> closeEarly $ GRPCStatus INTERNAL (BS.pack $ show e))
    ]
  where
    -- Translation from Mu's error codes to gRPC status codes.
    serverErrorToGRpcError :: ServerErrorCode -> GRPCStatusCode
    serverErrorToGRpcError Unknown         = UNKNOWN
    serverErrorToGRpcError Unavailable     = UNAVAILABLE
    serverErrorToGRpcError Unimplemented   = UNIMPLEMENTED
    serverErrorToGRpcError Unauthenticated = UNAUTHENTICATED
    serverErrorToGRpcError Internal        = INTERNAL
    serverErrorToGRpcError NotFound        = NOT_FOUND
    serverErrorToGRpcError Invalid         = INVALID_ARGUMENT
-- | Abstracts over the message protocol @p@ when sending a value of
--   type @r@ described by the type reference @ref@.
class GRPCOutput (RPCTy p) (GRpcOWTy p ref r)
      => GRpcOutputWrapper (p :: GRpcMessageProtocol) (ref :: TypeRef snm) (r :: Type) where
  -- | The wrapper type that actually goes through 'GRPCOutput'.
  type GRpcOWTy p ref r :: Type
  -- | Wrap a value so that it can be sent.
  buildGRpcOWTy :: Proxy p -> Proxy ref -> r -> GRpcOWTy p ref r
-- | Protocol Buffers output: values are wrapped in 'ViaToProtoBufTypeRef'.
instance ToProtoBufTypeRef ref r
         => GRpcOutputWrapper 'MsgProtoBuf ref r where
  type GRpcOWTy 'MsgProtoBuf ref r = ViaToProtoBufTypeRef ref r
  buildGRpcOWTy _ _ = ViaToProtoBufTypeRef
-- | Avro output for schema references: values are wrapped in
--   'ViaToAvroTypeRef'.
instance forall (sch :: Schema') sty (r :: Type).
         ( ToSchema sch sty r
         , ToAvro (WithSchema sch sty r)
         , HasAvroSchema (WithSchema sch sty r) )
         => GRpcOutputWrapper 'MsgAvro ('SchemaRef sch sty) r where
  type GRpcOWTy 'MsgAvro ('SchemaRef sch sty) r = ViaToAvroTypeRef ('SchemaRef sch sty) r
  buildGRpcOWTy _ _ = ViaToAvroTypeRef
-- | Abstracts over the message protocol @p@ when receiving a value of
--   type @r@ described by the type reference @ref@.
class GRPCInput (RPCTy p) (GRpcIWTy p ref r)
      => GRpcInputWrapper (p :: GRpcMessageProtocol) (ref :: TypeRef snm) (r :: Type) where
  -- | The wrapper type that actually goes through 'GRPCInput'.
  type GRpcIWTy p ref r :: Type
  -- | Unwrap a received value.
  unGRpcIWTy :: Proxy p -> Proxy ref -> GRpcIWTy p ref r -> r
-- | Protocol Buffers input: values arrive wrapped in
--   'ViaFromProtoBufTypeRef' and are unwrapped with its field accessor.
instance FromProtoBufTypeRef ref r
         => GRpcInputWrapper 'MsgProtoBuf ref r where
  type GRpcIWTy 'MsgProtoBuf ref r = ViaFromProtoBufTypeRef ref r
  unGRpcIWTy _ _ = unViaFromProtoBufTypeRef
-- | Avro input for schema references: values arrive wrapped in
--   'ViaFromAvroTypeRef' and are unwrapped with its field accessor.
instance forall (sch :: Schema') sty (r :: Type).
         ( FromSchema sch sty r
         , FromAvro (WithSchema sch sty r)
         , HasAvroSchema (WithSchema sch sty r) )
         => GRpcInputWrapper 'MsgAvro ('SchemaRef sch sty) r where
  type GRpcIWTy 'MsgAvro ('SchemaRef sch sty) r = ViaFromAvroTypeRef ('SchemaRef sch sty) r
  unGRpcIWTy _ _ = unViaFromAvroTypeRef
-- | Method with no arguments and no result: a 'unary' handler whose
--   input and output messages are both @()@.
--
--   Handler actions are run by translating them into 'ServerErrorIO'
--   with the supplied function and then applying 'raiseErrors'. The
--   (unit) input message is ignored; only the 'Request' reaches the
--   user handler.
instance (MonadIO m, GRPCInput (RPCTy p) (), GRPCOutput (RPCTy p) ())
         => GRpcMethodHandler p m '[ ] 'RetNothing (m ()) where
  gRpcMethodHandler f _ _ _ rpc h
    = unary @m @_ @() @() (raiseErrors . f) rpc (\req _ -> h req)
-- | Method with no arguments and a single result: a 'unary' handler
--   whose input message is @()@ and whose output is the protocol
--   wrapper for @r@.
--
--   Handler actions are run by translating them into 'ServerErrorIO'
--   with the supplied function and then applying 'raiseErrors'. The
--   (unit) input message is ignored, and the handler's result is
--   wrapped with 'buildGRpcOWTy' before being returned.
instance (MonadIO m, GRPCInput (RPCTy p) (), GRpcOutputWrapper p rref r)
         => GRpcMethodHandler p m '[ ] ('RetSingle rref) (m r) where
  gRpcMethodHandler f _ _ _ rpc h
    = unary @m @_ @() @(GRpcOWTy p rref r)
            (raiseErrors . f) rpc
            (\req _ -> buildGRpcOWTy (Proxy @p) (Proxy @rref) <$> h req)
instance (MonadIO m, GRPCInput (RPCTy p) (), GRpcOutputWrapper p rref r, MonadIO m)
=> GRpcMethodHandler p m '[ ] ('RetStream rref)
(ConduitT r Void m () -> m ()) where
gRpcMethodHandler :: (forall a. m a -> ServerErrorIO a)
-> Proxy p
-> Proxy '[]
-> Proxy ('RetStream rref)
-> RPCTy p
-> (Request -> ConduitT r Void m () -> m ())
-> ServiceHandler
gRpcMethodHandler f :: forall a. m a -> ServerErrorIO a
f _ _ _ rpc :: RPCTy p
rpc h :: Request -> ConduitT r Void m () -> m ()
h
= (forall x. m x -> IO x)
-> RPCTy p
-> ServerStreamHandler m () (GRpcOWTy p rref r) ()
-> ServiceHandler
forall (m :: * -> *) r i o a.
(MonadIO m, GRPCInput r i, GRPCOutput r o) =>
(forall x. m x -> IO x)
-> r -> ServerStreamHandler m i o a -> ServiceHandler
serverStream @m @_ @() @(GRpcOWTy p rref r) (ServerErrorIO x -> IO x
forall (m :: * -> *) a. MonadIO m => ServerErrorIO a -> m a
raiseErrors (ServerErrorIO x -> IO x)
-> (m x -> ServerErrorIO x) -> m x -> IO x
forall b c a. (b -> c) -> (a -> b) -> a -> c
. m x -> ServerErrorIO x
forall a. m a -> ServerErrorIO a
f) RPCTy p
rpc ServerStreamHandler m () (GRpcOWTy p rref r) ()
sstream
where sstream :: Request -> ()
-> m ((), ServerStream m (GRpcOWTy p rref r) ())
sstream :: ServerStreamHandler m () (GRpcOWTy p rref r) ()
sstream req :: Request
req _ = do
TMVar (Maybe r)
var <- IO (TMVar (Maybe r)) -> m (TMVar (Maybe r))
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO IO (TMVar (Maybe r))
forall a. IO (TMVar a)
newEmptyTMVarIO :: m (TMVar (Maybe r))
Async ()
promise <- IO (Async ()) -> m (Async ())
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (Async ()) -> m (Async ())) -> IO (Async ()) -> m (Async ())
forall a b. (a -> b) -> a -> b
$ IO () -> IO (Async ())
forall a. IO a -> IO (Async a)
async (ServerErrorIO () -> IO ()
forall (m :: * -> *) a. MonadIO m => ServerErrorIO a -> m a
raiseErrors (ServerErrorIO () -> IO ()) -> ServerErrorIO () -> IO ()
forall a b. (a -> b) -> a -> b
$ m () -> ServerErrorIO ()
forall a. m a -> ServerErrorIO a
f (Request -> ConduitT r Void m () -> m ()
h Request
req (TMVar (Maybe r) -> ConduitT r Void m ()
forall (m :: * -> *) r.
MonadIO m =>
TMVar (Maybe r) -> ConduitT r Void m ()
toTMVarConduit TMVar (Maybe r)
var)))
let readNext :: () -> m (Maybe ((), GRpcOWTy p rref r))
readNext _
= do Maybe r
nextOutput <- IO (Maybe r) -> m (Maybe r)
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (Maybe r) -> m (Maybe r)) -> IO (Maybe r) -> m (Maybe r)
forall a b. (a -> b) -> a -> b
$ STM (Maybe r) -> IO (Maybe r)
forall a. STM a -> IO a
atomically (STM (Maybe r) -> IO (Maybe r)) -> STM (Maybe r) -> IO (Maybe r)
forall a b. (a -> b) -> a -> b
$ TMVar (Maybe r) -> STM (Maybe r)
forall a. TMVar a -> STM a
takeTMVar TMVar (Maybe r)
var
case Maybe r
nextOutput of
Just o :: r
o -> Maybe ((), GRpcOWTy p rref r) -> m (Maybe ((), GRpcOWTy p rref r))
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Maybe ((), GRpcOWTy p rref r)
-> m (Maybe ((), GRpcOWTy p rref r)))
-> Maybe ((), GRpcOWTy p rref r)
-> m (Maybe ((), GRpcOWTy p rref r))
forall a b. (a -> b) -> a -> b
$ ((), GRpcOWTy p rref r) -> Maybe ((), GRpcOWTy p rref r)
forall a. a -> Maybe a
Just ((), Proxy p -> Proxy rref -> r -> GRpcOWTy p rref r
forall snm (p :: GRpcMessageProtocol) (ref :: TypeRef snm) r.
GRpcOutputWrapper p ref r =>
Proxy p -> Proxy ref -> r -> GRpcOWTy p ref r
buildGRpcOWTy (Proxy p
forall k (t :: k). Proxy t
Proxy @p) (Proxy rref
forall k (t :: k). Proxy t
Proxy @rref) r
o)
Nothing -> do IO () -> m ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ Async () -> IO ()
forall a. Async a -> IO ()
cancel Async ()
promise
Maybe ((), GRpcOWTy p rref r) -> m (Maybe ((), GRpcOWTy p rref r))
forall (f :: * -> *) a. Applicative f => a -> f a
pure Maybe ((), GRpcOWTy p rref r)
forall a. Maybe a
Nothing
((), ServerStream m (GRpcOWTy p rref r) ())
-> m ((), ServerStream m (GRpcOWTy p rref r) ())
forall (f :: * -> *) a. Applicative f => a -> f a
pure ((), (() -> m (Maybe ((), GRpcOWTy p rref r)))
-> ServerStream m (GRpcOWTy p rref r) ()
forall (m :: * -> *) o a.
(a -> m (Maybe (a, o))) -> ServerStream m o a
ServerStream () -> m (Maybe ((), GRpcOWTy p rref r))
readNext)
-- | Unary method taking a single argument and returning nothing.
--
-- The wire-level request is unwrapped with 'unGRpcIWTy' before it is passed
-- to the user handler; the response type is fixed to @()@.
instance (MonadIO m, GRpcInputWrapper p vref v, GRPCOutput (RPCTy p) ())
         => GRpcMethodHandler p m '[ 'ArgSingle aname vref ] 'RetNothing (v -> m ()) where
  gRpcMethodHandler nat _ _ _ rpc handler
    = unary @m @_ @(GRpcIWTy p vref v) @()
            (\action -> raiseErrors (nat action)) rpc unwrapAndRun
    where
      -- Decode the wire argument, then delegate to the user handler.
      unwrapAndRun :: Request -> GRpcIWTy p vref v -> m ()
      unwrapAndRun req wireArg
        = handler req (unGRpcIWTy (Proxy @p) (Proxy @vref) wireArg)
-- | Unary method taking a single argument and returning a single result.
--
-- The wire-level request is unwrapped with 'unGRpcIWTy', the user handler is
-- run, and its result is wrapped back with 'buildGRpcOWTy'.
instance (MonadIO m, GRpcInputWrapper p vref v, GRpcOutputWrapper p rref r)
         => GRpcMethodHandler p m '[ 'ArgSingle aname vref ] ('RetSingle rref) (v -> m r) where
  gRpcMethodHandler nat _ _ _ rpc handler
    = unary @m @_ @(GRpcIWTy p vref v) @(GRpcOWTy p rref r)
            (\action -> raiseErrors (nat action)) rpc roundTrip
    where
      -- Decode the argument, run the handler, encode its answer.
      roundTrip :: Request -> GRpcIWTy p vref v -> m (GRpcOWTy p rref r)
      roundTrip req wireArg
        = fmap (buildGRpcOWTy (Proxy @p) (Proxy @rref))
               (handler req (unGRpcIWTy (Proxy @p) (Proxy @vref) wireArg))
-- | Server-streaming method: a single argument in, a stream of results out.
--
-- The user handler runs on its own 'Async' thread and pushes results into a
-- sink built by 'toTMVarConduit'; the gRPC layer pulls them out one at a time
-- through the returned 'ServerStream'.
instance (GRpcInputWrapper p vref v, GRpcOutputWrapper p rref r, MonadIO m)
         => GRpcMethodHandler p m '[ 'ArgSingle aname vref ] ('RetStream rref)
                              (v -> ConduitT r Void m () -> m ()) where
  gRpcMethodHandler nat _ _ _ rpc handler
    = serverStream @m @_ @(GRpcIWTy p vref v) @(GRpcOWTy p rref r)
                   (\action -> raiseErrors (nat action)) rpc startStream
    where
      startStream :: Request -> GRpcIWTy p vref v
                  -> m ((), ServerStream m (GRpcOWTy p rref r) ())
      startStream req wireArg = do
        -- Single-slot hand-off between the handler's sink and the puller below.
        slot <- liftIO newEmptyTMVarIO :: m (TMVar (Maybe r))
        let arg = unGRpcIWTy (Proxy @p) (Proxy @vref) wireArg
        -- Run the user handler concurrently, feeding the slot.
        worker <- liftIO . async . raiseErrors . nat
                    $ handler req arg (toTMVarConduit slot)
        let -- Block until the handler publishes the next item; 'Nothing'
            -- marks the end of the stream, at which point the worker is
            -- cancelled.
            pull :: () -> m (Maybe ((), GRpcOWTy p rref r))
            pull _ = do
              item <- liftIO (atomically (takeTMVar slot))
              case item of
                Nothing  -> Nothing <$ liftIO (cancel worker)
                Just out ->
                  pure (Just ((), buildGRpcOWTy (Proxy @p) (Proxy @rref) out))
        pure ((), ServerStream pull)
-- | Client-streaming method: a stream of arguments in, no result out.
--
-- Each incoming wire message is unwrapped with 'unGRpcIWTy' and written to a
-- 'TMChan'; the user handler runs on its own 'Async' thread and reads that
-- channel through a source conduit. When the incoming stream ends the channel
-- is closed and the finalizer waits for the handler thread to finish.
instance (MonadIO m, GRpcInputWrapper p vref v, GRPCOutput (RPCTy p) ())
         => GRpcMethodHandler p m '[ 'ArgStream aname vref ] 'RetNothing
                              (ConduitT () v m () -> m ()) where
  gRpcMethodHandler f _ _ _ rpc h
    = clientStream @m @_ @(GRpcIWTy p vref v) @()
                   (raiseErrors . f) rpc cstream
    where cstream :: Request
                  -> m ((), ClientStream m (GRpcIWTy p vref v) () ())
          cstream req = do
            -- Channel connecting the gRPC input side to the handler's source.
            chan <- liftIO newTMChanIO :: m (TMChan v)
            let producer = sourceTMChan @m chan
            -- Start executing the user handler on a separate thread.
            promise <- liftIO $ async (raiseErrors $ f (h req producer))
            let -- Forward one decoded message to the handler.
                cstreamHandler _ newInput
                  = liftIO $ atomically $
                      writeTMChan chan (unGRpcIWTy (Proxy @p) (Proxy @vref) newInput)
                -- Signal end of input, then wait for the handler to finish
                -- ('wait' rethrows any exception the handler raised).
                cstreamFinalizer _
                  = liftIO $ atomically (closeTMChan chan) >> wait promise
            pure ((), ClientStream cstreamHandler cstreamFinalizer)
-- | Client-streaming method: a stream of arguments in, a single result out.
--
-- Each incoming wire message is unwrapped with 'unGRpcIWTy' and written to a
-- 'TMChan'; the user handler runs on its own 'Async' thread and reads that
-- channel through a source conduit. When the incoming stream ends the channel
-- is closed and the finalizer returns the handler's result, already wrapped
-- with 'buildGRpcOWTy'.
instance (MonadIO m, GRpcInputWrapper p vref v, GRpcOutputWrapper p rref r)
         => GRpcMethodHandler p m '[ 'ArgStream aname vref ] ('RetSingle rref)
                              (ConduitT () v m () -> m r) where
  gRpcMethodHandler f _ _ _ rpc h
    = clientStream @m @_ @(GRpcIWTy p vref v) @(GRpcOWTy p rref r)
                   (raiseErrors . f) rpc cstream
    where cstream :: Request
                  -> m ((), ClientStream m (GRpcIWTy p vref v)
                                           (GRpcOWTy p rref r) ())
          cstream req = do
            -- Channel connecting the gRPC input side to the handler's source.
            chan <- liftIO newTMChanIO :: m (TMChan v)
            let producer = sourceTMChan @m chan
            -- Start executing the user handler on a separate thread; its
            -- result is wrapped into the wire type inside that thread.
            promise <- liftIO $ async
              (raiseErrors $ buildGRpcOWTy (Proxy @p) (Proxy @rref)
                               <$> f (h req producer))
            let -- Forward one decoded message to the handler.
                cstreamHandler _ newInput
                  = liftIO $ atomically $
                      writeTMChan chan (unGRpcIWTy (Proxy @p) (Proxy @vref) newInput)
                -- Signal end of input, then wait for the handler's result
                -- ('wait' rethrows any exception the handler raised).
                cstreamFinalizer _
                  = liftIO $ atomically (closeTMChan chan) >> wait promise
            pure ((), ClientStream cstreamHandler cstreamFinalizer)
-- | Bidirectional-streaming method: a stream of arguments in, a stream of
-- results out.
--
-- Incoming wire messages are unwrapped with 'unGRpcIWTy' and written to a
-- 'TMChan' that feeds the handler's source conduit. The handler runs on its
-- own 'Async' thread and publishes results through a sink built by
-- 'toTMVarConduit'; the outgoing side pulls them from the shared 'TMVar'.
instance (GRpcInputWrapper p vref v, GRpcOutputWrapper p rref r, MonadIO m)
         => GRpcMethodHandler p m '[ 'ArgStream aname vref ] ('RetStream rref)
                              (ConduitT () v m () -> ConduitT r Void m () -> m ()) where
  gRpcMethodHandler f _ _ _ rpc h
    = generalStream @m @_ @(GRpcIWTy p vref v) @(GRpcOWTy p rref r)
                    (raiseErrors . f) rpc bdstream
    where bdstream :: Request
                   -> m ( (), IncomingStream m (GRpcIWTy p vref v) ()
                        , (), OutgoingStream m (GRpcOWTy p rref r) () )
          bdstream req = do
            -- Channel connecting the gRPC input side to the handler's source.
            chan <- liftIO newTMChanIO :: m (TMChan v)
            let producer = sourceTMChan @m chan
            -- Single-slot hand-off from the handler's sink to the output side.
            var <- liftIO newEmptyTMVarIO :: m (TMVar (Maybe r))
            -- Start executing the user handler on a separate thread.
            promise <- liftIO $ async
              (raiseErrors $ f $ h req producer (toTMVarConduit var))
            let -- Forward one decoded message to the handler.
                cstreamHandler _ newInput
                  = liftIO $ atomically $
                      writeTMChan chan (unGRpcIWTy (Proxy @p) (Proxy @vref) newInput)
                -- Signal end of input, then wait for the handler to finish.
                cstreamFinalizer _
                  = liftIO $ atomically (closeTMChan chan) >> wait promise
                -- Block in STM until the handler publishes the next item.
                -- A blocking 'takeTMVar' is used instead of polling with
                -- 'tryTakeTMVar' so an idle stream does not spin a CPU core.
                readNext _
                  = do nextOutput <- liftIO $ atomically $ takeTMVar var
                       case nextOutput of
                         Just o ->
                           pure $ Just ((), buildGRpcOWTy (Proxy @p) (Proxy @rref) o)
                         Nothing -> do
                           -- End of output: stop the handler thread.
                           -- NOTE(review): if 'cstreamFinalizer' runs its
                           -- 'wait' after this 'cancel' hits a still-running
                           -- handler, that 'wait' will rethrow
                           -- 'AsyncCancelled' -- confirm this is intended.
                           liftIO $ cancel promise
                           pure Nothing
            pure ( (), IncomingStream cstreamHandler cstreamFinalizer
                 , (), OutgoingStream readNext )
-- | Sink that hands every value it receives to a 'TMVar', one at a time.
--
-- Each upstream element @x@ is published as @Just x@; when upstream is
-- exhausted a single @Nothing@ is published as the end-of-stream marker and
-- the sink returns. 'putTMVar' blocks while the variable is full, so the
-- sink advances only as fast as the reader empties the variable.
--
-- The sink stops after the end marker rather than looping forever, so the
-- code that runs this conduit can finish normally instead of relying on
-- being cancelled from outside.
toTMVarConduit :: MonadIO m => TMVar (Maybe r) -> ConduitT r Void m ()
toTMVarConduit var = do
  x <- await
  liftIO $ atomically $ putTMVar var x
  case x of
    Just _  -> toTMVarConduit var
    -- 'await' returned 'Nothing': upstream is done and the marker is sent.
    Nothing -> pure ()