{-# language DataKinds #-}
{-# language FlexibleContexts #-}
{-# language FlexibleInstances #-}
{-# language GADTs #-}
{-# language MultiParamTypeClasses #-}
{-# language PolyKinds #-}
{-# language RankNTypes #-}
{-# language ScopedTypeVariables #-}
{-# language TypeApplications #-}
{-# language TypeFamilies #-}
{-# language TypeOperators #-}
{-# language UndecidableInstances #-}
module Mu.GRpc.Server
(
GRpcMessageProtocol(..)
, msgProtoBuf, msgAvro
, runGRpcApp, runGRpcAppTrans
, runGRpcAppSettings, Settings
, runGRpcAppTLS, TLSSettings
, gRpcApp
, raiseErrors, liftServerConduit
, module Avro
) where
import Control.Concurrent.Async
import Control.Concurrent.STM (atomically)
import Control.Concurrent.STM.TMVar
import Control.Exception
import Control.Monad.Except
import Data.ByteString (ByteString)
import qualified Data.ByteString.Char8 as BS
import Data.Conduit
import Data.Conduit.TMChan
import Data.Kind
import Data.Proxy
import Network.GRPC.HTTP2.Encoding (GRPCInput, GRPCOutput, gzip, uncompressed)
import Network.GRPC.HTTP2.Types (GRPCStatus (..), GRPCStatusCode (..))
import Network.GRPC.Server.Handlers
import Network.GRPC.Server.Wai as Wai
import Network.Wai (Application)
import Network.Wai.Handler.Warp (Port, Settings, run, runSettings)
import Network.Wai.Handler.WarpTLS (TLSSettings, runTLS)
import Mu.Adapter.ProtoBuf.Via
import Mu.GRpc.Avro
import qualified Mu.GRpc.Avro as Avro
import Mu.GRpc.Bridge
import Mu.Rpc
import Mu.Schema
import Mu.Server
-- | Run a Mu 'ServerT' whose handlers already live in 'ServerErrorIO'
--   on the given port.
--
--   This is 'runGRpcAppTrans' with 'id' as the natural transformation.
runGRpcApp
  :: ( KnownName name, KnownName (FindPackageName anns)
     , GRpcMethodHandlers protocol ServerErrorIO methods handlers )
  => Proxy protocol   -- ^ message protocol used on the wire
  -> Port             -- ^ port to listen on
  -> ServerT f ('Service name anns methods) ServerErrorIO handlers
                      -- ^ service implementation
  -> IO ()
runGRpcApp protocol port = runGRpcAppTrans protocol port id
-- | Run a Mu 'ServerT' on the given port, for handlers written in an
--   arbitrary monad @m@.
--
--   The second-to-last argument converts every @m@ action into
--   'ServerErrorIO'. The resulting WAI 'Application' (built by
--   'gRpcAppTrans') is handed to Warp's 'run'.
runGRpcAppTrans
  :: ( KnownName name, KnownName (FindPackageName anns)
     , GRpcMethodHandlers protocol m methods handlers )
  => Proxy protocol                        -- ^ message protocol used on the wire
  -> Port                                  -- ^ port to listen on
  -> (forall a. m a -> ServerErrorIO a)    -- ^ how to run @m@ in 'ServerErrorIO'
  -> ServerT f ('Service name anns methods) m handlers
                                           -- ^ service implementation
  -> IO ()
runGRpcAppTrans protocol port f svr
  = run port (gRpcAppTrans protocol f svr)
-- | Like 'runGRpcAppTrans', but takes full Warp 'Settings' instead of
--   just a port. The application is started with 'runSettings'.
runGRpcAppSettings
  :: ( KnownName name, KnownName (FindPackageName anns)
     , GRpcMethodHandlers protocol m methods handlers )
  => Proxy protocol                        -- ^ message protocol used on the wire
  -> Settings                              -- ^ Warp settings
  -> (forall a. m a -> ServerErrorIO a)    -- ^ how to run @m@ in 'ServerErrorIO'
  -> ServerT f ('Service name anns methods) m handlers
                                           -- ^ service implementation
  -> IO ()
runGRpcAppSettings protocol st f svr
  = runSettings st (gRpcAppTrans protocol f svr)
-- | Like 'runGRpcAppSettings', but serves over TLS: the application
--   built by 'gRpcAppTrans' is started with 'runTLS'.
runGRpcAppTLS
  :: ( KnownName name, KnownName (FindPackageName anns)
     , GRpcMethodHandlers protocol m methods handlers )
  => Proxy protocol                        -- ^ message protocol used on the wire
  -> TLSSettings -> Settings               -- ^ TLS and Warp settings
  -> (forall a. m a -> ServerErrorIO a)    -- ^ how to run @m@ in 'ServerErrorIO'
  -> ServerT f ('Service name anns methods) m handlers
                                           -- ^ service implementation
  -> IO ()
runGRpcAppTLS protocol tls st f svr
  = runTLS tls st (gRpcAppTrans protocol f svr)
-- | Turn a Mu 'ServerT' whose handlers live in 'ServerErrorIO' into a
--   WAI 'Application', without running it.
--
--   This is 'gRpcAppTrans' with 'id' as the natural transformation.
gRpcApp
  :: ( KnownName name, KnownName (FindPackageName anns)
     , GRpcMethodHandlers protocol ServerErrorIO methods handlers )
  => Proxy protocol   -- ^ message protocol used on the wire
  -> ServerT f ('Service name anns methods) ServerErrorIO handlers
                      -- ^ service implementation
  -> Application
gRpcApp protocol = gRpcAppTrans protocol id
-- | Turn a Mu 'ServerT' with handlers in an arbitrary monad @m@ into a
--   WAI 'Application', without running it.
--
--   The application is built with 'Wai.grpcApp', offering the
--   'uncompressed' and 'gzip' compressions, from the handlers produced
--   by 'gRpcServiceHandlers'.
gRpcAppTrans
  :: ( KnownName name, KnownName (FindPackageName anns)
     , GRpcMethodHandlers protocol m methods handlers )
  => Proxy protocol                        -- ^ message protocol used on the wire
  -> (forall a. m a -> ServerErrorIO a)    -- ^ how to run @m@ in 'ServerErrorIO'
  -> ServerT f ('Service name anns methods) m handlers
                                           -- ^ service implementation
  -> Application
gRpcAppTrans protocol f svr
  = Wai.grpcApp [uncompressed, gzip]
                (gRpcServiceHandlers protocol f svr)
-- | Build the list of 'ServiceHandler's for a whole service.
--
--   The package name (taken from the @FindPackageName anns@ type-level
--   name) and the service name (taken from @name@) are turned into
--   'ByteString's with 'nameVal' and passed, together with the
--   handlers unwrapped from 'Server', to 'gRpcMethodHandlers'.
gRpcServiceHandlers
  :: forall name anns methods handlers m protocol w.
     ( KnownName name, KnownName (FindPackageName anns)
     , GRpcMethodHandlers protocol m methods handlers )
  => Proxy protocol
  -> (forall a. m a -> ServerErrorIO a)
  -> ServerT w ('Service name anns methods) m handlers
  -> [ServiceHandler]
gRpcServiceHandlers pr f (Server svr)
  = gRpcMethodHandlers f pr packageName serviceName svr
  where
    -- Type-level names reflected to the value level.
    packageName = BS.pack (nameVal (Proxy @(FindPackageName anns)))
    serviceName = BS.pack (nameVal (Proxy @name))
-- | Conversion of a list of method handlers @hs@, implementing the
--   methods @ms@ in monad @m@, into 'ServiceHandler's for protocol @p@.
class GRpcMethodHandlers (p :: GRpcMessageProtocol) (m :: Type -> Type)
                         (ms :: [Method mnm]) (hs :: [Type]) where
  -- | Arguments, in order: the transformation from @m@ into
  --   'ServerErrorIO', the protocol proxy, two 'ByteString's (the
  --   callers in this module pass the package name and then the
  --   service name), and the handlers themselves.
  gRpcMethodHandlers :: (forall a. m a -> ServerErrorIO a)
                     -> Proxy p -> ByteString -> ByteString
                     -> HandlersT f ms m hs -> [ServiceHandler]
-- | Base case: a service without methods has no handlers.
instance GRpcMethodHandlers p m '[] '[] where
  gRpcMethodHandlers _ _ _ _ H0 = []
-- | Inductive case: convert the first handler with 'gRpcMethodHandler'
--   and recurse on the remaining ones.
--
--   The RPC descriptor for the first method is built with 'mkRPC' from
--   the two incoming 'ByteString's plus the method name reflected from
--   the type-level @name@.
instance (KnownName name, GRpcMethodHandler p m args r h, GRpcMethodHandlers p m rest hs, MkRPC p)
         => GRpcMethodHandlers p m ('Method name anns args r ': rest) (h ': hs) where
  gRpcMethodHandlers f pr p s (h :<|>: rest)
    = gRpcMethodHandler f pr (Proxy @args) (Proxy @r) (mkRPC pr p s methodName) h
      : gRpcMethodHandlers f pr p s rest
    where methodName = BS.pack (nameVal (Proxy @name))
-- | Conversion of a single handler of type @h@, running in monad @m@
--   and implementing a method with arguments @args@ and return @r@,
--   into a 'ServiceHandler' for protocol @p@.
class GRpcMethodHandler p m args r h where
  -- | Arguments, in order: the transformation from @m@ into
  --   'ServerErrorIO', proxies for protocol, arguments and return,
  --   the RPC descriptor, and the handler itself.
  gRpcMethodHandler :: (forall a. m a -> ServerErrorIO a)
                    -> Proxy p -> Proxy args -> Proxy r
                    -> RPCTy p -> h -> ServiceHandler
-- | Change the base monad of a conduit from 'ServerErrorIO' to any
--   'MonadIO', by applying 'raiseErrors' to every underlying action
--   (via 'transPipe').
liftServerConduit
  :: MonadIO m
  => ConduitT a b ServerErrorIO r -> ConduitT a b m r
liftServerConduit = transPipe raiseErrors
-- | Run a 'ServerErrorIO' action in any 'MonadIO', turning failures
--   into gRPC statuses.
--
--   * A @Right@ result is returned as is.
--   * A @Left ('ServerError' code msg)@ is converted into a
--     'GRPCStatus' (see the local @serverErrorToGRpcError@) and passed
--     to 'closeEarly'.
--   * A 'GRPCStatus' exception raised while running is rethrown
--     unchanged.
--   * Any other exception is passed to 'closeEarly' as an 'INTERNAL'
--     status whose message is @show@ of the exception.
raiseErrors :: MonadIO m => ServerErrorIO a -> m a
raiseErrors h
  = liftIO $ do
      h' <- runExceptT h
      case h' of
        Right r -> return r
        Left (ServerError code msg)
          -> closeEarly $ GRPCStatus (serverErrorToGRpcError code)
                                     (BS.pack msg)
    -- The handlers below cover the whole @do@ block above.
    -- NOTE(review): the 'SomeException' handler also matches
    -- asynchronous exceptions (e.g. from 'cancel') -- confirm that
    -- reporting those as INTERNAL is intended.
    `catches`
    [ Handler (\(e :: GRPCStatus) -> throwIO e)
    , Handler (\(e :: SomeException) -> closeEarly $ GRPCStatus INTERNAL (BS.pack $ show e))
    ]
  where
    -- Mapping from Mu error codes to gRPC status codes.
    serverErrorToGRpcError :: ServerErrorCode -> GRPCStatusCode
    serverErrorToGRpcError Unknown         = UNKNOWN
    serverErrorToGRpcError Unavailable     = UNAVAILABLE
    serverErrorToGRpcError Unimplemented   = UNIMPLEMENTED
    serverErrorToGRpcError Unauthenticated = UNAUTHENTICATED
    serverErrorToGRpcError Internal        = INTERNAL
    serverErrorToGRpcError NotFound        = NOT_FOUND
    serverErrorToGRpcError Invalid         = INVALID_ARGUMENT
-- | Wrapping of a handler result of type @r@, described by the type
--   reference @ref@, into the type actually sent under protocol @p@.
--   That type must have a 'GRPCOutput' instance for @RPCTy p@.
class GRPCOutput (RPCTy p) (GRpcOWTy p ref r)
      => GRpcOutputWrapper (p :: GRpcMessageProtocol) (ref :: TypeRef) (r :: Type) where
  -- | The wrapper type sent on the wire.
  type GRpcOWTy p ref r :: Type
  -- | Wrap a value before handing it to the gRPC layer.
  buildGRpcOWTy :: Proxy p -> Proxy ref -> r -> GRpcOWTy p ref r
-- | Protocol Buffers output: values are wrapped in
--   'ViaToProtoBufTypeRef'.
instance ToProtoBufTypeRef ref r
         => GRpcOutputWrapper 'MsgProtoBuf ref r where
  type GRpcOWTy 'MsgProtoBuf ref r = ViaToProtoBufTypeRef ref r
  buildGRpcOWTy _ _ = ViaToProtoBufTypeRef
-- | Avro output, only for references of the form @'ViaSchema sch sty@:
--   values are wrapped in 'ViaToAvroTypeRef'.
instance (GRPCOutput AvroRPC (ViaToAvroTypeRef ('ViaSchema sch sty) r))
         => GRpcOutputWrapper 'MsgAvro ('ViaSchema sch sty) r where
  type GRpcOWTy 'MsgAvro ('ViaSchema sch sty) r = ViaToAvroTypeRef ('ViaSchema sch sty) r
  buildGRpcOWTy _ _ = ViaToAvroTypeRef
-- | Unwrapping of the type actually received under protocol @p@ into
--   the handler argument type @r@, described by the type reference
--   @ref@. The received type must have a 'GRPCInput' instance for
--   @RPCTy p@.
class GRPCInput (RPCTy p) (GRpcIWTy p ref r)
      => GRpcInputWrapper (p :: GRpcMessageProtocol) (ref :: TypeRef) (r :: Type) where
  -- | The wrapper type received from the wire.
  type GRpcIWTy p ref r :: Type
  -- | Unwrap a received value before handing it to the handler.
  unGRpcIWTy :: Proxy p -> Proxy ref -> GRpcIWTy p ref r -> r
-- | Protocol Buffers input: values arrive wrapped in
--   'ViaFromProtoBufTypeRef'.
instance FromProtoBufTypeRef ref r
         => GRpcInputWrapper 'MsgProtoBuf ref r where
  type GRpcIWTy 'MsgProtoBuf ref r = ViaFromProtoBufTypeRef ref r
  unGRpcIWTy _ _ = unViaFromProtoBufTypeRef
-- | Avro input, only for references of the form @'ViaSchema sch sty@:
--   values arrive wrapped in 'ViaFromAvroTypeRef'.
instance (GRPCInput AvroRPC (ViaFromAvroTypeRef ('ViaSchema sch sty) r))
         => GRpcInputWrapper 'MsgAvro ('ViaSchema sch sty) r where
  type GRpcIWTy 'MsgAvro ('ViaSchema sch sty) r = ViaFromAvroTypeRef ('ViaSchema sch sty) r
  unGRpcIWTy _ _ = unViaFromAvroTypeRef
-- | No arguments, no result: exposed as a 'unary' call from @()@ to
--   @()@. Both arguments given by the gRPC layer are ignored; the
--   handler is run through @f@ and then 'raiseErrors'.
instance (GRPCInput (RPCTy p) (), GRPCOutput (RPCTy p) ())
         => GRpcMethodHandler p m '[ ] 'RetNothing (m ()) where
  gRpcMethodHandler f _ _ _ rpc h
    = unary @_ @() @() rpc (\_ _ -> raiseErrors (f h))
-- | No arguments, single result: exposed as a 'unary' call from @()@
--   to the wrapped result type. The handler is run through @f@ and
--   'raiseErrors', and its result wrapped with 'buildGRpcOWTy'.
instance (GRPCInput (RPCTy p) (), GRpcOutputWrapper p rref r)
         => GRpcMethodHandler p m '[ ] ('RetSingle rref) (m r) where
  gRpcMethodHandler f _ _ _ rpc h
    = unary @_ @() @(GRpcOWTy p rref r)
            rpc (\_ _ -> buildGRpcOWTy (Proxy @p) (Proxy @rref) <$> raiseErrors (f h))
-- | No arguments, streamed result: exposed as a 'serverStream' call.
--
--   The handler receives a conduit sink built by @toTMVarConduit@ over
--   a fresh 'TMVar' and is started in a separate thread with 'async'.
--   Each time the gRPC layer asks for the next output, a value is
--   taken from that 'TMVar': @Just o@ is wrapped and emitted, whereas
--   @Nothing@ cancels the handler thread and ends the stream.
instance (GRPCInput (RPCTy p) (), GRpcOutputWrapper p rref r, MonadIO m)
         => GRpcMethodHandler p m '[ ] ('RetStream rref)
                              (ConduitT r Void m () -> m ()) where
  gRpcMethodHandler f _ _ _ rpc h
    = serverStream @_ @() @(GRpcOWTy p rref r) rpc sstream
    where sstream :: req -> ()
                  -> IO ((), ServerStream (GRpcOWTy p rref r) ())
          sstream _ _ = do
            -- Variable connecting the handler's output to the stream
            var <- newEmptyTMVarIO :: IO (TMVar (Maybe r))
            -- Start executing the handler in the background.
            -- NOTE(review): the outcome of this thread is never waited
            -- on; if the handler fails before a final value is put in
            -- 'var', 'readNext' would presumably block -- confirm.
            promise <- async (raiseErrors $ f (h (toTMVarConduit var)))
            -- Pull the next element whenever the gRPC layer asks for it
            let readNext _
                  = do nextOutput <- atomically $ takeTMVar var
                       case nextOutput of
                         Just o  -> return $ Just ((), buildGRpcOWTy (Proxy @p) (Proxy @rref) o)
                         Nothing -> do cancel promise
                                       return Nothing
            return ((), ServerStream readNext)
-- | Single argument, no result: exposed as a 'unary' call from the
--   wrapped argument type to @()@. The incoming value is unwrapped
--   with 'unGRpcIWTy', given to the handler, and the resulting action
--   run through @f@ and 'raiseErrors'.
instance (GRpcInputWrapper p vref v, GRPCOutput (RPCTy p) ())
         => GRpcMethodHandler p m '[ 'ArgSingle vref ] 'RetNothing (v -> m ()) where
  gRpcMethodHandler f _ _ _ rpc h
    = unary @_ @(GRpcIWTy p vref v) @()
            rpc (\_ -> raiseErrors . f . h . unGRpcIWTy (Proxy @p) (Proxy @vref))
-- | Single argument, single result: exposed as a 'unary' call between
--   the wrapped argument and result types. The incoming value is
--   unwrapped with 'unGRpcIWTy', the handler run through @f@ and
--   'raiseErrors', and its result wrapped with 'buildGRpcOWTy'.
instance (GRpcInputWrapper p vref v, GRpcOutputWrapper p rref r)
         => GRpcMethodHandler p m '[ 'ArgSingle vref ] ('RetSingle rref) (v -> m r) where
  gRpcMethodHandler f _ _ _ rpc h
    = unary @_ @(GRpcIWTy p vref v) @(GRpcOWTy p rref r)
            rpc (\_ -> (buildGRpcOWTy (Proxy @p) (Proxy @rref) <$>)
                       . raiseErrors . f . h
                       . unGRpcIWTy (Proxy @p) (Proxy @vref))
-- | Streamed argument, single result: exposed as a 'clientStream'
--   call.
--
--   The handler receives a conduit source reading from a fresh
--   'TMChan' and is started in a separate thread with 'async'. Every
--   incoming message is unwrapped and written to the channel; when the
--   gRPC layer finalizes the stream the channel is closed and the
--   (wrapped) result of the handler is awaited and returned.
instance (GRpcInputWrapper p vref v, GRpcOutputWrapper p rref r, MonadIO m)
         => GRpcMethodHandler p m '[ 'ArgStream vref ] ('RetSingle rref)
                              (ConduitT () v m () -> m r) where
  gRpcMethodHandler f _ _ _ rpc h
    = clientStream @_ @(GRpcIWTy p vref v) @(GRpcOWTy p rref r)
                   rpc cstream
    where cstream :: req
                  -> IO ((), ClientStream (GRpcIWTy p vref v)
                                          (GRpcOWTy p rref r) ())
          cstream _ = do
            -- Channel connecting incoming messages to the handler
            chan <- newTMChanIO :: IO (TMChan v)
            let producer = sourceTMChan @m chan
            -- Start executing the handler in the background
            promise <- async (raiseErrors $ buildGRpcOWTy (Proxy @p) (Proxy @rref) <$> f (h producer))
            -- Feed each new message into the channel
            let cstreamHandler _ newInput
                  = atomically (writeTMChan chan (unGRpcIWTy (Proxy @p) (Proxy @vref) newInput))
                -- On end of input: close the channel, then wait for the result
                cstreamFinalizer _
                  = atomically (closeTMChan chan) >> wait promise
            return ((), ClientStream cstreamHandler cstreamFinalizer)
-- | Server-streaming methods: one request message comes in, and the
--   user-supplied function pushes its replies into a conduit sink.
instance (GRpcInputWrapper p vref v, GRpcOutputWrapper p rref r, MonadIO m)
         => GRpcMethodHandler p m '[ 'ArgSingle vref ] ('RetStream rref)
                              (v -> ConduitT r Void m () -> m ()) where
  gRpcMethodHandler nat _ _ _ rpc handler
    = serverStream @_ @(GRpcIWTy p vref v) @(GRpcOWTy p rref r) rpc serve
    where
      serve :: req -> GRpcIWTy p vref v
            -> IO ((), ServerStream (GRpcOWTy p rref r) ())
      serve _ wrapped = do
        -- One-slot mailbox shared by the handler thread and the reply loop.
        slot <- newEmptyTMVarIO :: IO (TMVar (Maybe r))
        -- Run the user handler in the background; it receives the unwrapped
        -- request and a sink that publishes every reply into the mailbox.
        worker <- async . raiseErrors . nat $
                    handler (unGRpcIWTy (Proxy @p) (Proxy @vref) wrapped)
                            (toTMVarConduit slot)
        let -- Block until the handler publishes something, then dispatch.
            pull :: () -> IO (Maybe ((), GRpcOWTy p rref r))
            pull _ = atomically (takeTMVar slot) >>= maybe stop emit
            -- End of stream: cancel the handler thread and close the stream.
            stop :: IO (Maybe ((), GRpcOWTy p rref r))
            stop = cancel worker >> pure Nothing
            -- A reply value: wrap it for the wire and keep the stream open.
            emit :: r -> IO (Maybe ((), GRpcOWTy p rref r))
            emit o = pure (Just ((), buildGRpcOWTy (Proxy @p) (Proxy @rref) o))
        pure ((), ServerStream pull)
-- | Bidirectional-streaming methods: the user-supplied function receives a
--   conduit source of incoming messages and a conduit sink for its replies.
instance (GRpcInputWrapper p vref v, GRpcOutputWrapper p rref r, MonadIO m)
         => GRpcMethodHandler p m '[ 'ArgStream vref ] ('RetStream rref)
                              (ConduitT () v m () -> ConduitT r Void m () -> m ()) where
  gRpcMethodHandler f _ _ _ rpc h
    = generalStream @_ @(GRpcIWTy p vref v) @(GRpcOWTy p rref r)
                    rpc bdstream
    where bdstream :: req -> IO ( (), IncomingStream (GRpcIWTy p vref v) ()
                                , (), OutgoingStream (GRpcOWTy p rref r) () )
          bdstream _ = do
            -- Channel carrying incoming messages to the handler, exposed to
            -- it as a conduit source.
            chan <- newTMChanIO :: IO (TMChan v)
            let producer = sourceTMChan @m chan
            -- One-slot mailbox carrying the handler's replies back to us.
            var <- newEmptyTMVarIO :: IO (TMVar (Maybe r))
            -- Start executing the handler in the background.
            promise <- async (raiseErrors $ f $ h producer (toTMVarConduit var))
            let -- Each incoming message is unwrapped and queued for the handler.
                cstreamHandler _ newInput
                  = atomically (writeTMChan chan (unGRpcIWTy (Proxy @p) (Proxy @vref) newInput))
                -- When the incoming side finishes, close the channel so the
                -- source ends, then wait for the handler.
                cstreamFinalizer _
                  = atomically (closeTMChan chan) >> wait promise
                -- Block until the handler publishes a reply or end-of-stream.
                -- A blocking 'takeTMVar' replaces the previous
                -- 'tryTakeTMVar' retry loop, which spun at full speed
                -- whenever no output was pending.
                -- NOTE(review): this assumes 'generalStream' runs the
                -- outgoing loop on its own thread, so blocking here cannot
                -- stall the incoming side -- confirm against warp-grpc.
                readNext _
                  = do nextOutput <- atomically $ takeTMVar var
                       case nextOutput of
                         Just o ->
                           return $ Just ((), buildGRpcOWTy (Proxy @p) (Proxy @rref) o)
                         Nothing -> do
                           -- End of stream: stop the handler thread.
                           cancel promise
                           return Nothing
            return ((), IncomingStream cstreamHandler cstreamFinalizer, (), OutgoingStream readNext)
-- | Conduit sink that forwards every value it receives into the given
--   'TMVar', one at a time.
--
--   Each upstream value is published as @Just value@, and the end of the
--   upstream is published once as @Nothing@. 'putTMVar' blocks while the
--   variable is still full, so the sink does not advance until the previous
--   item has been taken.
--
--   The sink returns after publishing @Nothing@. Previously it recursed
--   unconditionally, so it kept publishing @Nothing@ and then blocked
--   forever, which meant the enclosing handler could only end by being
--   cancelled.
toTMVarConduit :: MonadIO m => TMVar (Maybe r) -> ConduitT r Void m ()
toTMVarConduit var = do
  x <- await
  liftIO $ atomically $ putTMVar var x
  case x of
    Just _  -> toTMVarConduit var
    Nothing -> return ()