{-# language DataKinds #-}
{-# language FlexibleContexts #-}
{-# language FlexibleInstances #-}
{-# language GADTs #-}
{-# language MultiParamTypeClasses #-}
{-# language PolyKinds #-}
{-# language RankNTypes #-}
{-# language ScopedTypeVariables #-}
{-# language TypeApplications #-}
{-# language TypeFamilies #-}
{-# language TypeOperators #-}
{-# language UndecidableInstances #-}
module Mu.GRpc.Server
(
GRpcMessageProtocol(..)
, msgProtoBuf, msgAvro
, runGRpcApp, runGRpcAppTrans
, runGRpcAppSettings, Settings
, runGRpcAppTLS, TLSSettings
, gRpcApp, gRpcAppTrans
, raiseErrors, liftServerConduit
, module Avro
) where
import Control.Concurrent.Async
import Control.Concurrent.STM (atomically)
import Control.Concurrent.STM.TMVar
import Control.Exception
import Control.Monad.Except
import Data.Avro
import Data.ByteString (ByteString)
import qualified Data.ByteString.Char8 as BS
import Data.Conduit
import Data.Conduit.TMChan
import Data.Kind
import Data.Proxy
import Network.GRPC.HTTP2.Encoding (GRPCInput, GRPCOutput, gzip, uncompressed)
import Network.GRPC.HTTP2.Types (GRPCStatus (..), GRPCStatusCode (..))
import Network.GRPC.Server.Handlers.Trans
import Network.GRPC.Server.Wai as Wai
import Network.Wai (Application)
import Network.Wai.Handler.Warp (Port, Settings, run, runSettings)
import Network.Wai.Handler.WarpTLS (TLSSettings, runTLS)
import Mu.Adapter.ProtoBuf.Via
import Mu.GRpc.Avro
import qualified Mu.GRpc.Avro as Avro
import Mu.GRpc.Bridge
import Mu.Rpc
import Mu.Schema
import Mu.Server
-- | Serve a Mu server over gRPC on the given port, for handlers that
--   already run in 'ServerErrorIO'.
--
--   This is 'runGRpcAppTrans' with 'id' as the natural transformation.
runGRpcApp
  :: ( KnownName name, GRpcServiceHandlers protocol ServerErrorIO chn services handlers )
  => Proxy protocol  -- ^ message protocol used on the wire
  -> Port            -- ^ port to listen on
  -> ServerT chn ('Package ('Just name) services) ServerErrorIO handlers
                     -- ^ server whose handlers are exposed
  -> IO ()
runGRpcApp protocol port = runGRpcAppTrans protocol port id
-- | Serve a Mu server over gRPC on the given port, for handlers running
--   in an arbitrary monad @m@.
--
--   The WAI application is built with 'gRpcAppTrans' and handed to 'run'
--   (imported from "Network.Wai.Handler.Warp").
runGRpcAppTrans
  :: ( KnownName name, GRpcServiceHandlers protocol m chn services handlers )
  => Proxy protocol                       -- ^ message protocol used on the wire
  -> Port                                 -- ^ port to listen on
  -> (forall a. m a -> ServerErrorIO a)   -- ^ how to run @m@ inside 'ServerErrorIO'
  -> ServerT chn ('Package ('Just name) services) m handlers
                                          -- ^ server whose handlers are exposed
  -> IO ()
runGRpcAppTrans protocol port f svr = run port (gRpcAppTrans protocol f svr)
-- | Serve a Mu server over gRPC using explicit Warp 'Settings'.
--
--   The WAI application is built with 'gRpcAppTrans' and handed to
--   'runSettings' (imported from "Network.Wai.Handler.Warp").
runGRpcAppSettings
  :: ( KnownName name, GRpcServiceHandlers protocol m chn services handlers )
  => Proxy protocol                       -- ^ message protocol used on the wire
  -> Settings                             -- ^ Warp settings
  -> (forall a. m a -> ServerErrorIO a)   -- ^ how to run @m@ inside 'ServerErrorIO'
  -> ServerT chn ('Package ('Just name) services) m handlers
                                          -- ^ server whose handlers are exposed
  -> IO ()
runGRpcAppSettings protocol st f svr = runSettings st (gRpcAppTrans protocol f svr)
-- | Serve a Mu server over gRPC with TLS.
--
--   The WAI application is built with 'gRpcAppTrans' and handed to
--   'runTLS' (imported from "Network.Wai.Handler.WarpTLS").
runGRpcAppTLS
  :: ( KnownName name, GRpcServiceHandlers protocol m chn services handlers )
  => Proxy protocol                       -- ^ message protocol used on the wire
  -> TLSSettings -> Settings              -- ^ TLS settings, then Warp settings
  -> (forall a. m a -> ServerErrorIO a)   -- ^ how to run @m@ inside 'ServerErrorIO'
  -> ServerT chn ('Package ('Just name) services) m handlers
                                          -- ^ server whose handlers are exposed
  -> IO ()
runGRpcAppTLS protocol tls st f svr = runTLS tls st (gRpcAppTrans protocol f svr)
-- | Turn a Mu server whose handlers run in 'ServerErrorIO' into a WAI
--   'Application'.
--
--   This is 'gRpcAppTrans' with 'id' as the natural transformation.
gRpcApp
  :: ( KnownName name, GRpcServiceHandlers protocol ServerErrorIO chn services handlers )
  => Proxy protocol  -- ^ message protocol used on the wire
  -> ServerT chn ('Package ('Just name) services) ServerErrorIO handlers
                     -- ^ server whose handlers are exposed
  -> Application
gRpcApp protocol = gRpcAppTrans protocol id
-- | Turn a Mu server whose handlers run in an arbitrary monad @m@ into a
--   WAI 'Application'.
--
--   The handlers produced by 'gRpcServerHandlers' are passed to
--   'Wai.grpcApp' together with the 'uncompressed' and 'gzip'
--   compression entries.
gRpcAppTrans
  :: ( KnownName name, GRpcServiceHandlers protocol m chn services handlers )
  => Proxy protocol                       -- ^ message protocol used on the wire
  -> (forall a. m a -> ServerErrorIO a)   -- ^ how to run @m@ inside 'ServerErrorIO'
  -> ServerT chn ('Package ('Just name) services) m handlers
                                          -- ^ server whose handlers are exposed
  -> Application
gRpcAppTrans protocol f svr
  = Wai.grpcApp [uncompressed, gzip]
                (gRpcServerHandlers protocol f svr)
-- | Collect the 'ServiceHandler's for every service of a package.
--
--   The type-level package @name@ is rendered with 'nameVal', packed into
--   a 'ByteString' and passed on to 'gRpcServiceHandlers' along with the
--   unwrapped services.
gRpcServerHandlers
  :: forall name services handlers m protocol chn.
     ( KnownName name, GRpcServiceHandlers protocol m chn services handlers )
  => Proxy protocol                       -- ^ message protocol used on the wire
  -> (forall a. m a -> ServerErrorIO a)   -- ^ how to run @m@ inside 'ServerErrorIO'
  -> ServerT chn ('Package ('Just name) services) m handlers
                                          -- ^ server whose handlers are exposed
  -> [ServiceHandler]
gRpcServerHandlers pr f (Services svr)
  = gRpcServiceHandlers f pr packageName svr
  where
    -- @name@ is in scope here thanks to the explicit @forall@ above.
    packageName = BS.pack (nameVal (Proxy @name))
-- | Translation of the handlers for a list of services @ss@ into a flat
--   list of 'ServiceHandler's.
class GRpcServiceHandlers (p :: GRpcMessageProtocol) (m :: Type -> Type)
                          (chn :: ServiceChain snm)
                          (ss :: [Service snm mnm anm]) (hs :: [[Type]]) where
  -- | Arguments, in order: how to run @m@ inside 'ServerErrorIO', a proxy
  --   for the message protocol, a 'ByteString' (callers in this module
  --   pass the package name), and the handlers for the services.
  gRpcServiceHandlers :: (forall a. m a -> ServerErrorIO a)
                      -> Proxy p -> ByteString
                      -> ServicesT chn ss m hs -> [ServiceHandler]
-- | Base case: an empty list of services yields no handlers.
instance GRpcServiceHandlers p m chn '[] '[] where
  gRpcServiceHandlers _ _ _ S0 = []
-- | Inductive case: the handlers for the methods of the first service,
--   followed by the handlers for the remaining services.
instance ( KnownName name, GRpcMethodHandlers p m chn (MappingRight chn name) methods h
         , GRpcServiceHandlers p m chn rest hs )
         => GRpcServiceHandlers p m chn ('Service name anns methods ': rest) (h ': hs) where
  gRpcServiceHandlers f pr packageName (svr :<&>: rest)
    =  gRpcMethodHandlers f pr packageName serviceName svr
    ++ gRpcServiceHandlers f pr packageName rest
    where
      -- Type-level service name rendered as a 'ByteString'.
      serviceName = BS.pack (nameVal (Proxy @name))
-- | Translation of the handlers for a list of methods @ms@ into a flat
--   list of 'ServiceHandler's.
class GRpcMethodHandlers (p :: GRpcMessageProtocol) (m :: Type -> Type)
                         (chn :: ServiceChain snm) (inh :: Type)
                         (ms :: [Method snm mnm anm]) (hs :: [Type]) where
  -- | Arguments, in order: how to run @m@ inside 'ServerErrorIO', a proxy
  --   for the message protocol, two 'ByteString's (callers in this module
  --   pass the package name and then the service name), and the handlers
  --   for the methods.
  gRpcMethodHandlers :: (forall a. m a -> ServerErrorIO a)
                     -> Proxy p -> ByteString -> ByteString
                     -> HandlersT chn inh ms m hs -> [ServiceHandler]
-- | Base case: an empty list of methods yields no handlers.
instance GRpcMethodHandlers p m chn inh '[] '[] where
  gRpcMethodHandlers _ _ _ _ H0 = []
-- | Inductive case: the handler for the first method, followed by the
--   handlers for the remaining methods.
instance ( KnownName name, MkRPC p
         , GRpcMethodHandler p m args r h
         , GRpcMethodHandlers p m chn () rest hs)
         => GRpcMethodHandlers p m chn () ('Method name anns args r ': rest) (h ': hs) where
  gRpcMethodHandlers f pr p s (h :<||>: rest)
    = gRpcMethodHandler f pr (Proxy @args) (Proxy @r)
                        -- RPC descriptor built from the two incoming
                        -- 'ByteString's plus this method's name.
                        (mkRPC pr p s methodName)
                        -- The stored handler expects the inherited value,
                        -- which is '()' for this instance.
                        (h ())
      : gRpcMethodHandlers f pr p s rest
    where
      -- Type-level method name rendered as a 'ByteString'.
      methodName = BS.pack (nameVal (Proxy @name))
-- | Translation of a single handler of type @h@, for a method with
--   arguments @args@ and return @r@, into a 'ServiceHandler'.
class GRpcMethodHandler p m args r h where
  -- | Arguments, in order: how to run @m@ inside 'ServerErrorIO', proxies
  --   for the protocol, the method arguments and the method return, the
  --   RPC descriptor for the method, and the handler itself.
  gRpcMethodHandler :: (forall a. m a -> ServerErrorIO a)
                    -> Proxy p -> Proxy args -> Proxy r
                    -> RPCTy p -> h -> ServiceHandler
-- | Change the base monad of a conduit from 'ServerErrorIO' to any
--   'MonadIO', by applying 'raiseErrors' through 'transPipe'.
liftServerConduit
  :: MonadIO m
  => ConduitT a b ServerErrorIO r -> ConduitT a b m r
liftServerConduit = transPipe raiseErrors
-- | Run a 'ServerErrorIO' action in any 'MonadIO'.
--
--   * A @Right@ result is returned as is.
--   * A @Left ('ServerError' code msg)@ result is passed to 'closeEarly'
--     as a 'GRPCStatus' whose code comes from the mapping in the @where@
--     clause and whose message is @msg@.
--   * A 'GRPCStatus' exception raised while running the action is
--     rethrown unchanged.
--   * Any other exception is passed to 'closeEarly' as an 'INTERNAL'
--     status carrying @show@ of the exception.
raiseErrors :: MonadIO m => ServerErrorIO a -> m a
raiseErrors h
  = liftIO $ do
      h' <- runExceptT h
      case h' of
        Right r -> pure r
        Left (ServerError code msg)
          -> closeEarly $ GRPCStatus (serverErrorToGRpcError code)
                                     (BS.pack msg)
    `catches`
    -- Handlers are tried in order: 'GRPCStatus' is matched first so that
    -- it is not re-wrapped as INTERNAL by the catch-all below.
    [ Handler (\(e :: GRPCStatus) -> throwIO e)
      -- NOTE(review): 'SomeException' also matches asynchronous
      -- exceptions (e.g. thread cancellation); confirm that turning
      -- those into an INTERNAL status is intended.
    , Handler (\(e :: SomeException) -> closeEarly $ GRPCStatus INTERNAL (BS.pack $ show e))
    ]
  where
    -- One-to-one mapping from Mu error codes to gRPC status codes.
    serverErrorToGRpcError :: ServerErrorCode -> GRPCStatusCode
    serverErrorToGRpcError Unknown         = UNKNOWN
    serverErrorToGRpcError Unavailable     = UNAVAILABLE
    serverErrorToGRpcError Unimplemented   = UNIMPLEMENTED
    serverErrorToGRpcError Unauthenticated = UNAUTHENTICATED
    serverErrorToGRpcError Internal        = INTERNAL
    serverErrorToGRpcError NotFound        = NOT_FOUND
    serverErrorToGRpcError Invalid         = INVALID_ARGUMENT
-- | How a handler result of type @r@ is wrapped for output under
--   protocol @p@ and type reference @ref@.
--
--   The superclass requires the wrapped type to be a 'GRPCOutput' for the
--   protocol's RPC descriptor type.
class GRPCOutput (RPCTy p) (GRpcOWTy p ref r)
      => GRpcOutputWrapper (p :: GRpcMessageProtocol) (ref :: TypeRef snm) (r :: Type) where
  -- | The wrapper type sent through gRPC.
  type GRpcOWTy p ref r :: Type
  -- | Wrap a value of the handler's result type.
  buildGRpcOWTy :: Proxy p -> Proxy ref -> r -> GRpcOWTy p ref r
-- | Protocol Buffers output: values are wrapped in 'ViaToProtoBufTypeRef'.
instance ToProtoBufTypeRef ref r
         => GRpcOutputWrapper 'MsgProtoBuf ref r where
  type GRpcOWTy 'MsgProtoBuf ref r = ViaToProtoBufTypeRef ref r
  buildGRpcOWTy _ _ = ViaToProtoBufTypeRef
-- | Avro output for schema references: values are wrapped in
--   'ViaToAvroTypeRef'.
instance forall (sch :: Schema') sty (r :: Type).
         ( ToSchema sch sty r
         , ToAvro (WithSchema sch sty r)
         , HasAvroSchema (WithSchema sch sty r) )
         => GRpcOutputWrapper 'MsgAvro ('SchemaRef sch sty) r where
  type GRpcOWTy 'MsgAvro ('SchemaRef sch sty) r = ViaToAvroTypeRef ('SchemaRef sch sty) r
  buildGRpcOWTy _ _ = ViaToAvroTypeRef
-- | How an incoming message is unwrapped into a handler argument of type
--   @r@ under protocol @p@ and type reference @ref@.
--
--   The superclass requires the wrapped type to be a 'GRPCInput' for the
--   protocol's RPC descriptor type.
class GRPCInput (RPCTy p) (GRpcIWTy p ref r)
      => GRpcInputWrapper (p :: GRpcMessageProtocol) (ref :: TypeRef snm) (r :: Type) where
  -- | The wrapper type received through gRPC.
  type GRpcIWTy p ref r :: Type
  -- | Unwrap a received value into the handler's argument type.
  unGRpcIWTy :: Proxy p -> Proxy ref -> GRpcIWTy p ref r -> r
-- | Protocol Buffers input: values arrive wrapped in
--   'ViaFromProtoBufTypeRef'.
instance FromProtoBufTypeRef ref r
         => GRpcInputWrapper 'MsgProtoBuf ref r where
  type GRpcIWTy 'MsgProtoBuf ref r = ViaFromProtoBufTypeRef ref r
  unGRpcIWTy _ _ = unViaFromProtoBufTypeRef
-- | Avro input for schema references: values arrive wrapped in
--   'ViaFromAvroTypeRef'.
instance forall (sch :: Schema') sty (r :: Type).
         ( FromSchema sch sty r
         , FromAvro (WithSchema sch sty r)
         , HasAvroSchema (WithSchema sch sty r) )
         => GRpcInputWrapper 'MsgAvro ('SchemaRef sch sty) r where
  type GRpcIWTy 'MsgAvro ('SchemaRef sch sty) r = ViaFromAvroTypeRef ('SchemaRef sch sty) r
  unGRpcIWTy _ _ = unViaFromAvroTypeRef
-- | Method with no arguments and no result: a unary call from @()@ to
--   @()@ that just runs the handler.
instance (MonadIO m, GRPCInput (RPCTy p) (), GRPCOutput (RPCTy p) ())
         => GRpcMethodHandler p m '[ ] 'RetNothing (m ()) where
  gRpcMethodHandler f _ _ _ rpc h
    = unary @m @_ @() @()
            -- Run @m@ in 'ServerErrorIO', then surface errors in 'IO'.
            (raiseErrors . f) rpc
            -- Both arguments passed to the unary handler are ignored.
            (\_ _ -> h)
-- | Method with no arguments and a single result: a unary call from
--   @()@ whose result is wrapped with 'buildGRpcOWTy'.
instance (MonadIO m, GRPCInput (RPCTy p) (), GRpcOutputWrapper p rref r)
         => GRpcMethodHandler p m '[ ] ('RetSingle rref) (m r) where
  gRpcMethodHandler f _ _ _ rpc h
    = unary @m @_ @() @(GRpcOWTy p rref r)
            -- Run @m@ in 'ServerErrorIO', then surface errors in 'IO'.
            (raiseErrors . f) rpc
            -- Both arguments passed to the unary handler are ignored.
            (\_ _ -> buildGRpcOWTy (Proxy @p) (Proxy @rref) <$> h)
-- | Method with no arguments and a streamed result.
--
--   The handler receives a sink conduit; it is run in a separate 'async'
--   thread, and the values it produces are handed to the gRPC server
--   stream one by one through a 'TMVar'.
instance (MonadIO m, GRPCInput (RPCTy p) (), GRpcOutputWrapper p rref r)
         => GRpcMethodHandler p m '[ ] ('RetStream rref)
                              (ConduitT r Void m () -> m ()) where
  gRpcMethodHandler f _ _ _ rpc h
    = serverStream @m @_ @() @(GRpcOWTy p rref r) (raiseErrors . f) rpc sstream
    where sstream :: req -> ()
                  -> m ((), ServerStream m (GRpcOWTy p rref r) ())
          sstream _ _ = do
            -- Variable connecting the handler's sink with 'readNext'.
            -- NOTE(review): 'toTMVarConduit' is defined outside this view;
            -- presumably it writes each value the sink receives into @var@.
            var <- liftIO newEmptyTMVarIO :: m (TMVar (Maybe r))
            -- Start executing the handler in the background.
            promise <- liftIO $ async (raiseErrors $ f (h (toTMVarConduit var)))
            -- Each step blocks until a value is available in @var@.
            let readNext _
                  = do nextOutput <- liftIO $ atomically $ takeTMVar var
                       case nextOutput of
                         -- A value: wrap it and keep the stream going.
                         Just o  -> pure $ Just ((), buildGRpcOWTy (Proxy @p) (Proxy @rref) o)
                         -- 'Nothing' ends the stream; the background
                         -- handler is cancelled before finishing.
                         Nothing -> do liftIO $ cancel promise
                                       pure Nothing
            pure ((), ServerStream readNext)
-- | Method with a single argument and no result: a unary call whose
--   input is unwrapped with 'unGRpcIWTy' before reaching the handler.
instance (MonadIO m, GRpcInputWrapper p vref v, GRPCOutput (RPCTy p) ())
         => GRpcMethodHandler p m '[ 'ArgSingle aname anns vref ] 'RetNothing (v -> m ()) where
  gRpcMethodHandler f _ _ _ rpc h
    = unary @m @_ @(GRpcIWTy p vref v) @()
            -- Run @m@ in 'ServerErrorIO', then surface errors in 'IO'.
            (raiseErrors . f) rpc
            -- The first argument passed to the unary handler is ignored.
            (\_ -> h . unGRpcIWTy (Proxy @p) (Proxy @vref))
-- | Method with a single argument and a single result: a unary call
--   whose input is unwrapped with 'unGRpcIWTy' and whose result is
--   wrapped with 'buildGRpcOWTy'.
instance (MonadIO m, GRpcInputWrapper p vref v, GRpcOutputWrapper p rref r)
         => GRpcMethodHandler p m '[ 'ArgSingle aname anns vref ] ('RetSingle rref) (v -> m r) where
  gRpcMethodHandler f _ _ _ rpc h
    = unary @m @_ @(GRpcIWTy p vref v) @(GRpcOWTy p rref r)
            -- Run @m@ in 'ServerErrorIO', then surface errors in 'IO'.
            (raiseErrors . f) rpc
            -- The first argument passed to the unary handler is ignored;
            -- the pipeline reads right to left: unwrap, handle, wrap.
            (\_ -> (buildGRpcOWTy (Proxy @p) (Proxy @rref) <$>)
                   . h
                   . unGRpcIWTy (Proxy @p) (Proxy @vref))
-- | Server-streaming methods: one unwrapped argument comes in, and the
--   handler pushes its results into the conduit sink it is given.
--
--   The handler runs in a separate 'Async' thread; each element it yields
--   is handed over through a 'TMVar' and re-wrapped with 'buildGRpcOWTy'
--   before being returned to the gRPC layer.
instance (GRpcInputWrapper p vref v, GRpcOutputWrapper p rref r, MonadIO m)
         => GRpcMethodHandler p m '[ 'ArgSingle aname anns vref ] ('RetStream rref)
                              (v -> ConduitT r Void m () -> m ()) where
  gRpcMethodHandler f _ _ _ rpc h
    = serverStream @m @_ @(GRpcIWTy p vref v) @(GRpcOWTy p rref r)
                   (raiseErrors . f) rpc startStream
    where
      startStream :: req -> GRpcIWTy p vref v
                  -> m ((), ServerStream m (GRpcOWTy p rref r) ())
      startStream _ wrappedArg = do
        -- Hand-over point between the handler thread and the reply loop.
        outbox <- liftIO (newEmptyTMVarIO :: IO (TMVar (Maybe r)))
        -- Run the user handler in the background, feeding 'outbox'.
        worker <- liftIO . async . raiseErrors . f $
                    h (unGRpcIWTy (Proxy @p) (Proxy @vref) wrappedArg)
                      (toTMVarConduit outbox)
        -- Each call blocks until the handler has published something.
        let pull _ = do
              slot <- liftIO (atomically (takeTMVar outbox))
              case slot of
                -- End of stream: stop the handler thread and finish.
                Nothing -> Nothing <$ liftIO (cancel worker)
                Just o  -> pure (Just ((), buildGRpcOWTy (Proxy @p) (Proxy @rref) o))
        pure ((), ServerStream pull)
-- | Client-streaming methods without a result: the handler consumes a
--   conduit source of unwrapped inputs.
--
--   Incoming messages are unwrapped with 'unGRpcIWTy' and written to a
--   'TMChan'; the handler runs in a separate 'Async' thread reading from
--   that channel. When the client is done the channel is closed and the
--   finalizer waits for the handler to complete.
instance (MonadIO m, GRpcInputWrapper p vref v, GRPCOutput (RPCTy p) ())
         => GRpcMethodHandler p m '[ 'ArgStream aname anns vref ] 'RetNothing
                              (ConduitT () v m () -> m ()) where
  gRpcMethodHandler f _ _ _ rpc h
    = clientStream @m @_ @(GRpcIWTy p vref v) @()
                   (raiseErrors . f) rpc cstream
    where
      cstream :: req
              -> m ((), ClientStream m (GRpcIWTy p vref v) () ())
      cstream _ = do
        -- Channel carrying the client's messages to the handler.
        chan <- liftIO (newTMChanIO :: IO (TMChan v))
        let producer = sourceTMChan @m chan
        -- Start executing the handler in another thread.
        promise <- liftIO . async . raiseErrors . f $ h producer
        let -- Called once per incoming message.
            cstreamHandler _ newInput =
              liftIO . atomically $
                writeTMChan chan (unGRpcIWTy (Proxy @p) (Proxy @vref) newInput)
            -- Called when the client closes its side: end the source,
            -- then wait for the handler to finish.
            cstreamFinalizer _ =
              liftIO $ atomically (closeTMChan chan) >> wait promise
        pure ((), ClientStream cstreamHandler cstreamFinalizer)
-- | Client-streaming methods with a single result: the handler consumes
--   a conduit source of unwrapped inputs and returns one value.
--
--   Incoming messages are unwrapped with 'unGRpcIWTy' and written to a
--   'TMChan'; the handler runs in a separate 'Async' thread reading from
--   that channel. When the client is done the channel is closed and the
--   finalizer waits for the handler's result, which has already been
--   wrapped with 'buildGRpcOWTy'.
instance (MonadIO m, GRpcInputWrapper p vref v, GRpcOutputWrapper p rref r)
         => GRpcMethodHandler p m '[ 'ArgStream aname anns vref ] ('RetSingle rref)
                              (ConduitT () v m () -> m r) where
  gRpcMethodHandler f _ _ _ rpc h
    = clientStream @m @_ @(GRpcIWTy p vref v) @(GRpcOWTy p rref r)
                   (raiseErrors . f) rpc cstream
    where
      cstream :: req
              -> m ((), ClientStream m (GRpcIWTy p vref v)
                                       (GRpcOWTy p rref r) ())
      cstream _ = do
        -- Channel carrying the client's messages to the handler.
        chan <- liftIO (newTMChanIO :: IO (TMChan v))
        let producer = sourceTMChan @m chan
        -- Start executing the handler in another thread; its result is
        -- wrapped for the wire as soon as it is available.
        promise <- liftIO . async . raiseErrors $
                     buildGRpcOWTy (Proxy @p) (Proxy @rref) <$> f (h producer)
        let -- Called once per incoming message.
            cstreamHandler _ newInput =
              liftIO . atomically $
                writeTMChan chan (unGRpcIWTy (Proxy @p) (Proxy @vref) newInput)
            -- Called when the client closes its side: end the source,
            -- then wait for (and return) the handler's result.
            cstreamFinalizer _ =
              liftIO $ atomically (closeTMChan chan) >> wait promise
        pure ((), ClientStream cstreamHandler cstreamFinalizer)
-- | Bidirectional-streaming methods: the handler consumes a conduit
--   source of unwrapped inputs and pushes results into a conduit sink.
--
--   Incoming messages are unwrapped with 'unGRpcIWTy' and written to a
--   'TMChan'; outgoing elements are handed over through a 'TMVar' and
--   wrapped with 'buildGRpcOWTy'. The handler itself runs in a separate
--   'Async' thread.
instance (GRpcInputWrapper p vref v, GRpcOutputWrapper p rref r, MonadIO m)
         => GRpcMethodHandler p m '[ 'ArgStream aname anns vref ] ('RetStream rref)
                              (ConduitT () v m () -> ConduitT r Void m () -> m ()) where
  gRpcMethodHandler f _ _ _ rpc h
    = generalStream @m @_ @(GRpcIWTy p vref v) @(GRpcOWTy p rref r)
                    (raiseErrors . f) rpc bdstream
    where
      bdstream :: req -> m ( (), IncomingStream m (GRpcIWTy p vref v) ()
                           , (), OutgoingStream m (GRpcOWTy p rref r) () )
      bdstream _ = do
        -- Channel carrying the client's messages to the handler.
        chan <- liftIO (newTMChanIO :: IO (TMChan v))
        let producer = sourceTMChan @m chan
        -- Hand-over point for the elements produced by the handler.
        var <- liftIO (newEmptyTMVarIO :: IO (TMVar (Maybe r)))
        -- Start executing the handler in another thread.
        promise <- liftIO . async . raiseErrors . f $
                     h producer (toTMVarConduit var)
        let -- Called once per incoming message.
            cstreamHandler _ newInput =
              liftIO . atomically $
                writeTMChan chan (unGRpcIWTy (Proxy @p) (Proxy @vref) newInput)
            -- Called when the client closes its side: end the source,
            -- then wait for the handler to finish.
            cstreamFinalizer _ =
              liftIO $ atomically (closeTMChan chan) >> wait promise
            -- Blocks in STM until the handler publishes the next element,
            -- instead of polling with 'tryTakeTMVar' in a tight loop.
            readNext _ = do
              nextOutput <- liftIO . atomically $ takeTMVar var
              case nextOutput of
                Just o ->
                  pure $ Just ((), buildGRpcOWTy (Proxy @p) (Proxy @rref) o)
                -- End of the output stream: stop the handler thread.
                Nothing -> do
                  liftIO $ cancel promise
                  pure Nothing
        pure ( (), IncomingStream cstreamHandler cstreamFinalizer
             , (), OutgoingStream readNext )
-- | Conduit sink which publishes everything it receives into a 'TMVar'.
--
--   Every awaited element is stored as @Just x@. Once upstream is
--   exhausted a single @Nothing@ is stored to mark the end of the stream
--   and the sink returns, so the enclosing pipeline can terminate on its
--   own instead of looping until the thread is cancelled.
--
--   Each store uses 'putTMVar', so it waits until the previous value has
--   been taken out of the variable by the reader.
toTMVarConduit :: MonadIO m => TMVar (Maybe r) -> ConduitT r Void m ()
toTMVarConduit var = loop
  where
    loop = do
      next <- await
      liftIO . atomically $ putTMVar var next
      case next of
        Just _  -> loop
        -- End of input has been signalled exactly once; stop here.
        Nothing -> pure ()