{-# language DataKinds             #-}
{-# language FlexibleContexts      #-}
{-# language FlexibleInstances     #-}
{-# language GADTs                 #-}
{-# language MultiParamTypeClasses #-}
{-# language PolyKinds             #-}
{-# language RankNTypes            #-}
{-# language ScopedTypeVariables   #-}
{-# language TypeApplications      #-}
{-# language TypeFamilies          #-}
{-# language TypeOperators         #-}
{-# language UndecidableInstances  #-}
{-|
Description : Execute a Mu 'Server' using gRPC as transport layer

This module allows you to server a Mu 'Server'
as a WAI 'Application' using gRPC as transport layer.

The simples way is to use 'runGRpcApp', all other
variants provide more control over the settings.
-}
module Mu.GRpc.Server
( -- * Supported messaging formats
  GRpcMessageProtocol(..)
, msgProtoBuf, msgAvro
  -- * Run a 'Server' directly
, runGRpcApp, runGRpcAppTrans
, runGRpcAppSettings, Settings
, runGRpcAppTLS, TLSSettings
  -- * Convert a 'Server' into a WAI application
, gRpcApp, gRpcAppTrans
  -- * Raise errors as exceptions in IO
, raiseErrors, liftServerConduit
  -- * Re-export useful instances
, module Avro
) where

import           Control.Concurrent.Async
import           Control.Concurrent.STM             (atomically)
import           Control.Concurrent.STM.TMVar
import           Control.Exception
import           Control.Monad.Except
import           Data.Avro
import           Data.ByteString                    (ByteString)
import qualified Data.ByteString.Char8              as BS
import           Data.Conduit
import           Data.Conduit.TMChan
import           Data.Kind
import           Data.Proxy
import           GHC.TypeLits
import           Network.GRPC.HTTP2.Encoding        (GRPCInput, GRPCOutput, gzip, uncompressed)
import           Network.GRPC.HTTP2.Types           (GRPCStatus (..), GRPCStatusCode (..))
import           Network.GRPC.Server.Handlers.Trans
import           Network.GRPC.Server.Wai            as Wai
import           Network.Wai                        (Application, Request, requestHeaders)
import           Network.Wai.Handler.Warp           (Port, Settings, run, runSettings)
import           Network.Wai.Handler.WarpTLS        (TLSSettings, runTLS)

import           Mu.Adapter.ProtoBuf.Via
import           Mu.GRpc.Avro
import qualified Mu.GRpc.Avro                       as Avro
import           Mu.GRpc.Bridge
import           Mu.Rpc
import           Mu.Schema
import           Mu.Server

-- | Run a Mu 'Server' on the given port.
runGRpcApp
  :: ( KnownName name
     , GRpcServiceHandlers ('Package ('Just name) services)
                           protocol ServerErrorIO chn services handlers )
  => Proxy protocol
  -> Port
  -> ServerT chn () ('Package ('Just name) services) ServerErrorIO handlers
  -> IO ()
runGRpcApp :: Proxy protocol
-> Port
-> ServerT
     chn () ('Package ('Just name) services) ServerErrorIO handlers
-> IO ()
runGRpcApp protocol :: Proxy protocol
protocol port :: Port
port = Proxy protocol
-> Port
-> (forall a. ServerErrorIO a -> ServerErrorIO a)
-> ServerT
     chn () ('Package ('Just name) services) ServerErrorIO handlers
-> IO ()
forall serviceName mnm anm (name :: serviceName)
       (services :: [Service serviceName mnm anm (TypeRef serviceName)])
       (protocol :: GRpcMessageProtocol) (m :: * -> *)
       (chn :: ServiceChain serviceName) (handlers :: [[*]]).
(KnownName name,
 GRpcServiceHandlers
   ('Package ('Just name) services)
   protocol
   m
   chn
   services
   handlers) =>
Proxy protocol
-> Port
-> (forall a. m a -> ServerErrorIO a)
-> ServerT chn () ('Package ('Just name) services) m handlers
-> IO ()
runGRpcAppTrans Proxy protocol
protocol Port
port forall a. a -> a
forall a. ServerErrorIO a -> ServerErrorIO a
id

-- | Run a Mu 'Server' on the given port.
runGRpcAppTrans
  :: ( KnownName name
     , GRpcServiceHandlers ('Package ('Just name) services)
                           protocol m chn services handlers )
  => Proxy protocol
  -> Port
  -> (forall a. m a -> ServerErrorIO a)
  -> ServerT chn () ('Package ('Just name) services) m handlers
  -> IO ()
runGRpcAppTrans :: Proxy protocol
-> Port
-> (forall a. m a -> ServerErrorIO a)
-> ServerT chn () ('Package ('Just name) services) m handlers
-> IO ()
runGRpcAppTrans protocol :: Proxy protocol
protocol port :: Port
port f :: forall a. m a -> ServerErrorIO a
f svr :: ServerT chn () ('Package ('Just name) services) m handlers
svr = Port -> Application -> IO ()
run Port
port (Proxy protocol
-> (forall a. m a -> ServerErrorIO a)
-> ServerT chn () ('Package ('Just name) services) m handlers
-> Application
forall serviceName mnm anm (name :: serviceName)
       (services :: [Service serviceName mnm anm (TypeRef serviceName)])
       (protocol :: GRpcMessageProtocol) (m :: * -> *)
       (chn :: ServiceChain serviceName) (handlers :: [[*]]).
(KnownName name,
 GRpcServiceHandlers
   ('Package ('Just name) services)
   protocol
   m
   chn
   services
   handlers) =>
Proxy protocol
-> (forall a. m a -> ServerErrorIO a)
-> ServerT chn () ('Package ('Just name) services) m handlers
-> Application
gRpcAppTrans Proxy protocol
protocol forall a. m a -> ServerErrorIO a
f ServerT chn () ('Package ('Just name) services) m handlers
svr)

-- | Run a Mu 'Server' using the given 'Settings'.
--
--   Go to 'Network.Wai.Handler.Warp' to declare 'Settings'.
runGRpcAppSettings
  :: ( KnownName name
     , GRpcServiceHandlers ('Package ('Just name) services)
                           protocol m chn services handlers )
  => Proxy protocol
  -> Settings
  -> (forall a. m a -> ServerErrorIO a)
  -> ServerT chn () ('Package ('Just name) services) m handlers
  -> IO ()
runGRpcAppSettings :: Proxy protocol
-> Settings
-> (forall a. m a -> ServerErrorIO a)
-> ServerT chn () ('Package ('Just name) services) m handlers
-> IO ()
runGRpcAppSettings protocol :: Proxy protocol
protocol st :: Settings
st f :: forall a. m a -> ServerErrorIO a
f svr :: ServerT chn () ('Package ('Just name) services) m handlers
svr = Settings -> Application -> IO ()
runSettings Settings
st (Proxy protocol
-> (forall a. m a -> ServerErrorIO a)
-> ServerT chn () ('Package ('Just name) services) m handlers
-> Application
forall serviceName mnm anm (name :: serviceName)
       (services :: [Service serviceName mnm anm (TypeRef serviceName)])
       (protocol :: GRpcMessageProtocol) (m :: * -> *)
       (chn :: ServiceChain serviceName) (handlers :: [[*]]).
(KnownName name,
 GRpcServiceHandlers
   ('Package ('Just name) services)
   protocol
   m
   chn
   services
   handlers) =>
Proxy protocol
-> (forall a. m a -> ServerErrorIO a)
-> ServerT chn () ('Package ('Just name) services) m handlers
-> Application
gRpcAppTrans Proxy protocol
protocol forall a. m a -> ServerErrorIO a
f ServerT chn () ('Package ('Just name) services) m handlers
svr)

-- | Run a Mu 'Server' using the given 'TLSSettings' and 'Settings'.
--
--   Go to 'Network.Wai.Handler.WarpTLS' to declare 'TLSSettings'
--   and to 'Network.Wai.Handler.Warp' to declare 'Settings'.
runGRpcAppTLS
  :: ( KnownName name
     , GRpcServiceHandlers ('Package ('Just name) services)
                           protocol m chn services handlers )
  => Proxy protocol
  -> TLSSettings -> Settings
  -> (forall a. m a -> ServerErrorIO a)
  -> ServerT chn () ('Package ('Just name) services) m handlers
  -> IO ()
runGRpcAppTLS :: Proxy protocol
-> TLSSettings
-> Settings
-> (forall a. m a -> ServerErrorIO a)
-> ServerT chn () ('Package ('Just name) services) m handlers
-> IO ()
runGRpcAppTLS protocol :: Proxy protocol
protocol tls :: TLSSettings
tls st :: Settings
st f :: forall a. m a -> ServerErrorIO a
f svr :: ServerT chn () ('Package ('Just name) services) m handlers
svr = TLSSettings -> Settings -> Application -> IO ()
runTLS TLSSettings
tls Settings
st (Proxy protocol
-> (forall a. m a -> ServerErrorIO a)
-> ServerT chn () ('Package ('Just name) services) m handlers
-> Application
forall serviceName mnm anm (name :: serviceName)
       (services :: [Service serviceName mnm anm (TypeRef serviceName)])
       (protocol :: GRpcMessageProtocol) (m :: * -> *)
       (chn :: ServiceChain serviceName) (handlers :: [[*]]).
(KnownName name,
 GRpcServiceHandlers
   ('Package ('Just name) services)
   protocol
   m
   chn
   services
   handlers) =>
Proxy protocol
-> (forall a. m a -> ServerErrorIO a)
-> ServerT chn () ('Package ('Just name) services) m handlers
-> Application
gRpcAppTrans Proxy protocol
protocol forall a. m a -> ServerErrorIO a
f ServerT chn () ('Package ('Just name) services) m handlers
svr)

-- | Turn a Mu 'Server' into a WAI 'Application'.
--
--   These 'Application's can be later combined using,
--   for example, @wai-routes@, or you can add middleware
--   from @wai-extra@, among others.
gRpcApp
  :: ( KnownName name
     , GRpcServiceHandlers ('Package ('Just name) services)
                           protocol ServerErrorIO chn services handlers )
  => Proxy protocol
  -> ServerT chn () ('Package ('Just name) services) ServerErrorIO handlers
  -> Application
gRpcApp :: Proxy protocol
-> ServerT
     chn () ('Package ('Just name) services) ServerErrorIO handlers
-> Application
gRpcApp protocol :: Proxy protocol
protocol = Proxy protocol
-> (forall a. ServerErrorIO a -> ServerErrorIO a)
-> ServerT
     chn () ('Package ('Just name) services) ServerErrorIO handlers
-> Application
forall serviceName mnm anm (name :: serviceName)
       (services :: [Service serviceName mnm anm (TypeRef serviceName)])
       (protocol :: GRpcMessageProtocol) (m :: * -> *)
       (chn :: ServiceChain serviceName) (handlers :: [[*]]).
(KnownName name,
 GRpcServiceHandlers
   ('Package ('Just name) services)
   protocol
   m
   chn
   services
   handlers) =>
Proxy protocol
-> (forall a. m a -> ServerErrorIO a)
-> ServerT chn () ('Package ('Just name) services) m handlers
-> Application
gRpcAppTrans Proxy protocol
protocol forall a. a -> a
forall a. ServerErrorIO a -> ServerErrorIO a
id

-- | Turn a Mu 'Server' into a WAI 'Application'.
--
--   These 'Application's can be later combined using,
--   for example, @wai-routes@, or you can add middleware
--   from @wai-extra@, among others.
gRpcAppTrans
  :: ( KnownName name
     , GRpcServiceHandlers ('Package ('Just name) services)
                           protocol m chn services handlers )
  => Proxy protocol
  -> (forall a. m a -> ServerErrorIO a)
  -> ServerT chn () ('Package ('Just name) services) m handlers
  -> Application
gRpcAppTrans :: Proxy protocol
-> (forall a. m a -> ServerErrorIO a)
-> ServerT chn () ('Package ('Just name) services) m handlers
-> Application
gRpcAppTrans protocol :: Proxy protocol
protocol f :: forall a. m a -> ServerErrorIO a
f svr :: ServerT chn () ('Package ('Just name) services) m handlers
svr
  = [Compression] -> [ServiceHandler] -> Application
Wai.grpcApp [Compression
uncompressed, Compression
gzip]
                (Proxy protocol
-> (forall a. m a -> ServerErrorIO a)
-> ServerT chn () ('Package ('Just name) services) m handlers
-> [ServiceHandler]
forall serviceName mnm anm (name :: serviceName)
       (services :: [Service serviceName mnm anm (TypeRef serviceName)])
       (handlers :: [[*]]) (m :: * -> *) (protocol :: GRpcMessageProtocol)
       (chn :: ServiceChain serviceName).
(KnownName name,
 GRpcServiceHandlers
   ('Package ('Just name) services)
   protocol
   m
   chn
   services
   handlers) =>
Proxy protocol
-> (forall a. m a -> ServerErrorIO a)
-> ServerT chn () ('Package ('Just name) services) m handlers
-> [ServiceHandler]
gRpcServerHandlers Proxy protocol
protocol forall a. m a -> ServerErrorIO a
f ServerT chn () ('Package ('Just name) services) m handlers
svr)

gRpcServerHandlers
  :: forall name services handlers m protocol chn.
     ( KnownName name
     , GRpcServiceHandlers ('Package ('Just name) services)
                           protocol m chn services handlers )
  => Proxy protocol
  -> (forall a. m a -> ServerErrorIO a)
  -> ServerT chn () ('Package ('Just name) services) m handlers
  -> [ServiceHandler]
gRpcServerHandlers :: Proxy protocol
-> (forall a. m a -> ServerErrorIO a)
-> ServerT chn () ('Package ('Just name) services) m handlers
-> [ServiceHandler]
gRpcServerHandlers pr :: Proxy protocol
pr f :: forall a. m a -> ServerErrorIO a
f (Services svr :: ServicesT chn () s1 m handlers
svr)
  = (forall a. m a -> ServerErrorIO a)
-> Proxy ('Package ('Just name) services)
-> Proxy protocol
-> ByteString
-> ServicesT chn () s1 m handlers
-> [ServiceHandler]
forall snm mnm anm (fullP :: Package snm mnm anm (TypeRef snm))
       (p :: GRpcMessageProtocol) (m :: * -> *) (chn :: ServiceChain snm)
       (ss :: [Service snm mnm anm (TypeRef snm)]) (hs :: [[*]]).
GRpcServiceHandlers fullP p m chn ss hs =>
(forall a. m a -> ServerErrorIO a)
-> Proxy fullP
-> Proxy p
-> ByteString
-> ServicesT chn () ss m hs
-> [ServiceHandler]
gRpcServiceHandlers forall a. m a -> ServerErrorIO a
f (Proxy ('Package ('Just name) services)
forall k (t :: k). Proxy t
Proxy @('Package ('Just name) services)) Proxy protocol
pr ByteString
packageName ServicesT chn () s1 m handlers
svr
  where packageName :: ByteString
packageName = String -> ByteString
BS.pack (Proxy name -> String
forall k (a :: k) (proxy :: k -> *).
KnownName a =>
proxy a -> String
nameVal (Proxy name
forall k (t :: k). Proxy t
Proxy @name))

class GRpcServiceHandlers (fullP :: Package snm mnm anm (TypeRef snm))
                          (p :: GRpcMessageProtocol) (m :: Type -> Type)
                          (chn :: ServiceChain snm)
                          (ss :: [Service snm mnm anm (TypeRef snm)]) (hs :: [[Type]]) where
  gRpcServiceHandlers :: (forall a. m a -> ServerErrorIO a)
                      -> Proxy fullP -> Proxy p -> ByteString
                      -> ServicesT chn () ss m hs -> [ServiceHandler]

instance GRpcServiceHandlers fullP p m chn '[] '[] where
  gRpcServiceHandlers :: (forall a. m a -> ServerErrorIO a)
-> Proxy fullP
-> Proxy p
-> ByteString
-> ServicesT chn () '[] m '[]
-> [ServiceHandler]
gRpcServiceHandlers _ _ _ _ S0 = []
instance ( KnownName name
         , GRpcMethodHandlers fullP ('Service name methods)
                              p m chn (MappingRight chn name) methods h
         , GRpcServiceHandlers fullP p m chn rest hs )
         => GRpcServiceHandlers fullP p m chn ('Service name methods ': rest) (h ': hs) where
  gRpcServiceHandlers :: (forall a. m a -> ServerErrorIO a)
-> Proxy fullP
-> Proxy p
-> ByteString
-> ServicesT chn () ('Service name methods : rest) m (h : hs)
-> [ServiceHandler]
gRpcServiceHandlers f :: forall a. m a -> ServerErrorIO a
f pfullP :: Proxy fullP
pfullP pr :: Proxy p
pr packageName :: ByteString
packageName (ProperSvc svr :: HandlersT chn () (MappingRight chn sname) methods m hs1
svr :<&>: rest :: ServicesT chn () rest m hss
rest)
    =  (forall a. m a -> ServerErrorIO a)
-> Proxy fullP
-> Proxy ('Service name methods)
-> Proxy p
-> ByteString
-> ByteString
-> HandlersT chn () (MappingRight chn name) methods m hs1
-> [ServiceHandler]
forall snm mnm anm (fullP :: Package snm mnm anm (TypeRef snm))
       (fullS :: Service snm mnm anm (TypeRef snm))
       (p :: GRpcMessageProtocol) (m :: * -> *) (chn :: ServiceChain snm)
       inh (ms :: [Method snm mnm anm (TypeRef snm)]) (hs :: [*]).
GRpcMethodHandlers fullP fullS p m chn inh ms hs =>
(forall a. m a -> ServerErrorIO a)
-> Proxy fullP
-> Proxy fullS
-> Proxy p
-> ByteString
-> ByteString
-> HandlersT chn () inh ms m hs
-> [ServiceHandler]
gRpcMethodHandlers forall a. m a -> ServerErrorIO a
f Proxy fullP
pfullP (Proxy ('Service name methods)
forall k (t :: k). Proxy t
Proxy @('Service name methods)) Proxy p
pr
                          ByteString
packageName ByteString
serviceName HandlersT chn () (MappingRight chn name) methods m hs1
HandlersT chn () (MappingRight chn sname) methods m hs1
svr
    [ServiceHandler] -> [ServiceHandler] -> [ServiceHandler]
forall a. [a] -> [a] -> [a]
++ (forall a. m a -> ServerErrorIO a)
-> Proxy fullP
-> Proxy p
-> ByteString
-> ServicesT chn () rest m hss
-> [ServiceHandler]
forall snm mnm anm (fullP :: Package snm mnm anm (TypeRef snm))
       (p :: GRpcMessageProtocol) (m :: * -> *) (chn :: ServiceChain snm)
       (ss :: [Service snm mnm anm (TypeRef snm)]) (hs :: [[*]]).
GRpcServiceHandlers fullP p m chn ss hs =>
(forall a. m a -> ServerErrorIO a)
-> Proxy fullP
-> Proxy p
-> ByteString
-> ServicesT chn () ss m hs
-> [ServiceHandler]
gRpcServiceHandlers forall a. m a -> ServerErrorIO a
f Proxy fullP
pfullP Proxy p
pr ByteString
packageName ServicesT chn () rest m hss
rest
    where serviceName :: ByteString
serviceName = String -> ByteString
BS.pack (Proxy name -> String
forall k (a :: k) (proxy :: k -> *).
KnownName a =>
proxy a -> String
nameVal (Proxy name
forall k (t :: k). Proxy t
Proxy @name))

instance ( GHC.TypeLits.TypeError ('Text "unions are not supported in gRPC") )
         => GRpcServiceHandlers fullP p m chn ('OneOf name methods ': rest) hs where
  gRpcServiceHandlers :: (forall a. m a -> ServerErrorIO a)
-> Proxy fullP
-> Proxy p
-> ByteString
-> ServicesT chn () ('OneOf name methods : rest) m hs
-> [ServiceHandler]
gRpcServiceHandlers _ = String
-> Proxy fullP
-> Proxy p
-> ByteString
-> ServicesT chn () ('OneOf name methods : rest) m hs
-> [ServiceHandler]
forall a. HasCallStack => String -> a
error "unions are not supported in gRPC"

class GRpcMethodHandlers (fullP :: Package snm mnm anm (TypeRef snm))
                         (fullS :: Service snm mnm anm (TypeRef snm))
                         (p :: GRpcMessageProtocol) (m :: Type -> Type)
                         (chn :: ServiceChain snm) (inh :: Type)
                         (ms :: [Method snm mnm anm (TypeRef snm)]) (hs :: [Type]) where
  gRpcMethodHandlers :: (forall a. m a -> ServerErrorIO a)
                     -> Proxy fullP -> Proxy fullS -> Proxy p -> ByteString -> ByteString
                     -> HandlersT chn () inh ms m hs -> [ServiceHandler]

instance GRpcMethodHandlers fullP fullS p m chn inh '[] '[] where
  gRpcMethodHandlers :: (forall a. m a -> ServerErrorIO a)
-> Proxy fullP
-> Proxy fullS
-> Proxy p
-> ByteString
-> ByteString
-> HandlersT chn () inh '[] m '[]
-> [ServiceHandler]
gRpcMethodHandlers _ _ _ _ _ _ H0 = []
instance ( KnownName name, MkRPC p
         , ReflectRpcInfo fullP fullS ('Method name args r)
         , GRpcMethodHandler p m args r h
         , GRpcMethodHandlers fullP fullS p m chn () rest hs)
         => GRpcMethodHandlers fullP fullS p m chn ()
                               ('Method name args r ': rest) (h ': hs) where
  gRpcMethodHandlers :: (forall a. m a -> ServerErrorIO a)
-> Proxy fullP
-> Proxy fullS
-> Proxy p
-> ByteString
-> ByteString
-> HandlersT chn () () ('Method name args r : rest) m (h : hs)
-> [ServiceHandler]
gRpcMethodHandlers f :: forall a. m a -> ServerErrorIO a
f pfullP :: Proxy fullP
pfullP pfullS :: Proxy fullS
pfullS pr :: Proxy p
pr p :: ByteString
p s :: ByteString
s (Hmore _ _ h :: RpcInfo () -> () -> h
h rest :: HandlersT chn () () ms m hs1
rest)
    = (forall a. m a -> ServerErrorIO a)
-> Proxy p
-> Proxy args
-> Proxy r
-> RPCTy p
-> (Request -> h)
-> ServiceHandler
forall k snm anm (p :: GRpcMessageProtocol) (m :: * -> *)
       (args :: [Argument snm anm (TypeRef snm)]) (r :: k) h.
GRpcMethodHandler p m args r h =>
(forall a. m a -> ServerErrorIO a)
-> Proxy p
-> Proxy args
-> Proxy r
-> RPCTy p
-> (Request -> h)
-> ServiceHandler
gRpcMethodHandler forall a. m a -> ServerErrorIO a
f Proxy p
pr (Proxy args
forall k (t :: k). Proxy t
Proxy @args) (Proxy r
forall k (t :: k). Proxy t
Proxy @r) (Proxy p -> ByteString -> ByteString -> ByteString -> RPCTy p
forall (p :: GRpcMessageProtocol).
MkRPC p =>
Proxy p -> ByteString -> ByteString -> ByteString -> RPCTy p
mkRPC Proxy p
pr ByteString
p ByteString
s ByteString
methodName)
                        (\req :: Request
req -> RpcInfo () -> () -> h
h (RequestHeaders -> RpcInfo ()
reflectInfo (Request -> RequestHeaders
requestHeaders Request
req)) ())
      ServiceHandler -> [ServiceHandler] -> [ServiceHandler]
forall a. a -> [a] -> [a]
: (forall a. m a -> ServerErrorIO a)
-> Proxy fullP
-> Proxy fullS
-> Proxy p
-> ByteString
-> ByteString
-> HandlersT chn () () ms m hs1
-> [ServiceHandler]
forall snm mnm anm (fullP :: Package snm mnm anm (TypeRef snm))
       (fullS :: Service snm mnm anm (TypeRef snm))
       (p :: GRpcMessageProtocol) (m :: * -> *) (chn :: ServiceChain snm)
       inh (ms :: [Method snm mnm anm (TypeRef snm)]) (hs :: [*]).
GRpcMethodHandlers fullP fullS p m chn inh ms hs =>
(forall a. m a -> ServerErrorIO a)
-> Proxy fullP
-> Proxy fullS
-> Proxy p
-> ByteString
-> ByteString
-> HandlersT chn () inh ms m hs
-> [ServiceHandler]
gRpcMethodHandlers forall a. m a -> ServerErrorIO a
f Proxy fullP
pfullP Proxy fullS
pfullS Proxy p
pr ByteString
p ByteString
s HandlersT chn () () ms m hs1
rest
    where methodName :: ByteString
methodName = String -> ByteString
BS.pack (Proxy name -> String
forall k (a :: k) (proxy :: k -> *).
KnownName a =>
proxy a -> String
nameVal (Proxy name
forall k (t :: k). Proxy t
Proxy @name))
          reflectInfo :: RequestHeaders -> RpcInfo ()
reflectInfo hdrs :: RequestHeaders
hdrs
            = Proxy fullP
-> Proxy fullS
-> Proxy ('Method name args r)
-> RequestHeaders
-> ()
-> RpcInfo ()
forall (p :: Package Symbol Symbol Symbol (TypeRef Symbol))
       (s :: Service Symbol Symbol Symbol (TypeRef Symbol))
       (m :: Method Symbol Symbol Symbol (TypeRef Symbol)) i.
ReflectRpcInfo p s m =>
Proxy p -> Proxy s -> Proxy m -> RequestHeaders -> i -> RpcInfo i
reflectRpcInfo Proxy fullP
pfullP Proxy fullS
pfullS (Proxy ('Method name args r)
forall k (t :: k). Proxy t
Proxy @('Method name args r)) RequestHeaders
hdrs ()

class GRpcMethodHandler p m (args :: [Argument snm anm (TypeRef snm)]) r h where
  gRpcMethodHandler :: (forall a. m a -> ServerErrorIO a)
                    -> Proxy p -> Proxy args -> Proxy r
                    -> RPCTy p -> (Request -> h) -> ServiceHandler

-- | Turns a 'Conduit' working on 'ServerErrorIO'
--   into any other base monad which supports 'IO',
--   by raising any error as an exception.
--
--   This function is useful to interoperate with
--   libraries which generate 'Conduit's with other
--   base monads, such as @persistent@.
liftServerConduit
  :: MonadIO m
  => ConduitT a b ServerErrorIO r -> ConduitT a b m r
liftServerConduit :: ConduitT a b ServerErrorIO r -> ConduitT a b m r
liftServerConduit = (forall a. ServerErrorIO a -> m a)
-> ConduitT a b ServerErrorIO r -> ConduitT a b m r
forall (m :: * -> *) (n :: * -> *) i o r.
Monad m =>
(forall a. m a -> n a) -> ConduitT i o m r -> ConduitT i o n r
transPipe forall a. ServerErrorIO a -> m a
forall (m :: * -> *) a. MonadIO m => ServerErrorIO a -> m a
raiseErrors

-- | Raises errors from 'ServerErrorIO' as exceptions
--   in a monad which supports 'IO'.
--
--   This function is useful to interoperate with other
--   libraries which cannot handle the additional error
--   layer. In particular, with Conduit, as witnessed
--   by 'liftServerConduit'.
raiseErrors :: MonadIO m => ServerErrorIO a -> m a
raiseErrors :: ServerErrorIO a -> m a
raiseErrors h :: ServerErrorIO a
h
  = IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO a -> m a) -> IO a -> m a
forall a b. (a -> b) -> a -> b
$ do
      Either ServerError a
h' <- ServerErrorIO a -> IO (Either ServerError a)
forall e (m :: * -> *) a. ExceptT e m a -> m (Either e a)
runExceptT ServerErrorIO a
h
      case Either ServerError a
h' of
        Right r :: a
r -> a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure a
r
        Left (ServerError code :: ServerErrorCode
code msg :: String
msg)
          -> GRPCStatus -> IO a
forall (m :: * -> *) a. MonadIO m => GRPCStatus -> m a
closeEarly (GRPCStatus -> IO a) -> GRPCStatus -> IO a
forall a b. (a -> b) -> a -> b
$ GRPCStatusCode -> ByteString -> GRPCStatus
GRPCStatus (ServerErrorCode -> GRPCStatusCode
serverErrorToGRpcError ServerErrorCode
code)
                                     (String -> ByteString
BS.pack String
msg)
    IO a -> [Handler a] -> IO a
forall a. IO a -> [Handler a] -> IO a
`catches`
    [ (GRPCStatus -> IO a) -> Handler a
forall a e. Exception e => (e -> IO a) -> Handler a
Handler (\(GRPCStatus
e :: GRPCStatus) -> GRPCStatus -> IO a
forall e a. Exception e => e -> IO a
throwIO GRPCStatus
e)
    , (SomeException -> IO a) -> Handler a
forall a e. Exception e => (e -> IO a) -> Handler a
Handler (\(SomeException
e :: SomeException) -> GRPCStatus -> IO a
forall (m :: * -> *) a. MonadIO m => GRPCStatus -> m a
closeEarly (GRPCStatus -> IO a) -> GRPCStatus -> IO a
forall a b. (a -> b) -> a -> b
$ GRPCStatusCode -> ByteString -> GRPCStatus
GRPCStatus GRPCStatusCode
INTERNAL (String -> ByteString
BS.pack (String -> ByteString) -> String -> ByteString
forall a b. (a -> b) -> a -> b
$ SomeException -> String
forall a. Show a => a -> String
show SomeException
e))
    ]

  where
    serverErrorToGRpcError :: ServerErrorCode -> GRPCStatusCode
    serverErrorToGRpcError :: ServerErrorCode -> GRPCStatusCode
serverErrorToGRpcError Unknown         = GRPCStatusCode
UNKNOWN
    serverErrorToGRpcError Unavailable     = GRPCStatusCode
UNAVAILABLE
    serverErrorToGRpcError Unimplemented   = GRPCStatusCode
UNIMPLEMENTED
    serverErrorToGRpcError Unauthenticated = GRPCStatusCode
UNAUTHENTICATED
    serverErrorToGRpcError Internal        = GRPCStatusCode
INTERNAL
    serverErrorToGRpcError NotFound        = GRPCStatusCode
NOT_FOUND
    serverErrorToGRpcError Invalid         = GRPCStatusCode
INVALID_ARGUMENT

-----
-- IMPLEMENTATION OF THE METHODS
-----

-- These type classes allow us to abstract over
-- the choice of message protocol (PB or Avro)

class GRPCOutput (RPCTy p) (GRpcOWTy p ref r)
      => GRpcOutputWrapper (p :: GRpcMessageProtocol) (ref :: TypeRef snm) (r :: Type) where
  type GRpcOWTy p ref r :: Type
  buildGRpcOWTy :: Proxy p -> Proxy ref -> r -> GRpcOWTy p ref r

instance ToProtoBufTypeRef ref r
         => GRpcOutputWrapper 'MsgProtoBuf ref r where
  type GRpcOWTy 'MsgProtoBuf ref r = ViaToProtoBufTypeRef ref r
  buildGRpcOWTy :: Proxy 'MsgProtoBuf -> Proxy ref -> r -> GRpcOWTy 'MsgProtoBuf ref r
buildGRpcOWTy _ _ = r -> GRpcOWTy 'MsgProtoBuf ref r
forall snm (ref :: TypeRef snm) t. t -> ViaToProtoBufTypeRef ref t
ViaToProtoBufTypeRef

instance forall (sch :: Schema') sty (r :: Type).
         ( ToSchema sch sty r
         , ToAvro (WithSchema sch sty r)
         , HasAvroSchema (WithSchema sch sty r) )
         => GRpcOutputWrapper 'MsgAvro ('SchemaRef sch sty) r where
  type GRpcOWTy 'MsgAvro ('SchemaRef sch sty) r = ViaToAvroTypeRef ('SchemaRef sch sty) r
  buildGRpcOWTy :: Proxy 'MsgAvro
-> Proxy ('SchemaRef sch sty)
-> r
-> GRpcOWTy 'MsgAvro ('SchemaRef sch sty) r
buildGRpcOWTy _ _ = r -> GRpcOWTy 'MsgAvro ('SchemaRef sch sty) r
forall snm (ref :: TypeRef snm) t. t -> ViaToAvroTypeRef ref t
ViaToAvroTypeRef

class GRPCInput (RPCTy p) (GRpcIWTy p ref r)
      => GRpcInputWrapper (p :: GRpcMessageProtocol) (ref :: TypeRef snm) (r :: Type) where
  type GRpcIWTy p ref r :: Type
  unGRpcIWTy :: Proxy p -> Proxy ref -> GRpcIWTy p ref r -> r

instance FromProtoBufTypeRef ref r
         => GRpcInputWrapper 'MsgProtoBuf ref r where
  type GRpcIWTy 'MsgProtoBuf ref r = ViaFromProtoBufTypeRef ref r
  unGRpcIWTy :: Proxy 'MsgProtoBuf -> Proxy ref -> GRpcIWTy 'MsgProtoBuf ref r -> r
unGRpcIWTy _ _ = GRpcIWTy 'MsgProtoBuf ref r -> r
forall snm (ref :: TypeRef snm) t.
ViaFromProtoBufTypeRef ref t -> t
unViaFromProtoBufTypeRef

instance forall (sch :: Schema') sty (r :: Type).
         ( FromSchema sch sty r
         , FromAvro (WithSchema sch sty r)
         , HasAvroSchema (WithSchema sch sty r) )
         => GRpcInputWrapper 'MsgAvro ('SchemaRef sch sty) r where
  type GRpcIWTy 'MsgAvro ('SchemaRef sch sty) r = ViaFromAvroTypeRef ('SchemaRef sch sty) r
  unGRpcIWTy :: Proxy 'MsgAvro
-> Proxy ('SchemaRef sch sty)
-> GRpcIWTy 'MsgAvro ('SchemaRef sch sty) r
-> r
unGRpcIWTy _ _ = GRpcIWTy 'MsgAvro ('SchemaRef sch sty) r -> r
forall snm (ref :: TypeRef snm) t. ViaFromAvroTypeRef ref t -> t
unViaFromAvroTypeRef

---

instance (MonadIO m, GRPCInput (RPCTy p) (), GRPCOutput (RPCTy p) ())
         => GRpcMethodHandler p m '[ ] 'RetNothing (m ()) where
  gRpcMethodHandler :: (forall a. m a -> ServerErrorIO a)
-> Proxy p
-> Proxy '[]
-> Proxy 'RetNothing
-> RPCTy p
-> (Request -> m ())
-> ServiceHandler
gRpcMethodHandler f :: forall a. m a -> ServerErrorIO a
f _ _ _ rpc :: RPCTy p
rpc h :: Request -> m ()
h
    = (forall x. m x -> IO x)
-> RPCTy p -> UnaryHandler m () () -> ServiceHandler
forall (m :: * -> *) r i o.
(MonadIO m, GRPCInput r i, GRPCOutput r o) =>
(forall x. m x -> IO x)
-> r -> UnaryHandler m i o -> ServiceHandler
unary @m @_ @() @() (ServerErrorIO x -> IO x
forall (m :: * -> *) a. MonadIO m => ServerErrorIO a -> m a
raiseErrors (ServerErrorIO x -> IO x)
-> (m x -> ServerErrorIO x) -> m x -> IO x
forall b c a. (b -> c) -> (a -> b) -> a -> c
. m x -> ServerErrorIO x
forall a. m a -> ServerErrorIO a
f) RPCTy p
rpc (\req :: Request
req _ -> Request -> m ()
h Request
req)

-----

instance (MonadIO m, GRPCInput (RPCTy p) (), GRpcOutputWrapper p rref r)
         => GRpcMethodHandler p m '[ ] ('RetSingle rref) (m r) where
  gRpcMethodHandler :: (forall a. m a -> ServerErrorIO a)
-> Proxy p
-> Proxy '[]
-> Proxy ('RetSingle rref)
-> RPCTy p
-> (Request -> m r)
-> ServiceHandler
gRpcMethodHandler f :: forall a. m a -> ServerErrorIO a
f _ _ _ rpc :: RPCTy p
rpc h :: Request -> m r
h
    = (forall x. m x -> IO x)
-> RPCTy p
-> UnaryHandler m () (GRpcOWTy p rref r)
-> ServiceHandler
forall (m :: * -> *) r i o.
(MonadIO m, GRPCInput r i, GRPCOutput r o) =>
(forall x. m x -> IO x)
-> r -> UnaryHandler m i o -> ServiceHandler
unary @m @_ @() @(GRpcOWTy p rref r)
            (ServerErrorIO x -> IO x
forall (m :: * -> *) a. MonadIO m => ServerErrorIO a -> m a
raiseErrors (ServerErrorIO x -> IO x)
-> (m x -> ServerErrorIO x) -> m x -> IO x
forall b c a. (b -> c) -> (a -> b) -> a -> c
. m x -> ServerErrorIO x
forall a. m a -> ServerErrorIO a
f) RPCTy p
rpc
            (\req :: Request
req _ -> Proxy p -> Proxy rref -> r -> GRpcOWTy p rref r
forall snm (p :: GRpcMessageProtocol) (ref :: TypeRef snm) r.
GRpcOutputWrapper p ref r =>
Proxy p -> Proxy ref -> r -> GRpcOWTy p ref r
buildGRpcOWTy (Proxy p
forall k (t :: k). Proxy t
Proxy @p) (Proxy rref
forall k (t :: k). Proxy t
Proxy @rref) (r -> GRpcOWTy p rref r) -> m r -> m (GRpcOWTy p rref r)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Request -> m r
h Request
req)

-----

instance (MonadIO m, GRPCInput (RPCTy p) (), GRpcOutputWrapper p rref r, MonadIO m)
         => GRpcMethodHandler p m '[ ] ('RetStream rref)
                              (ConduitT r Void m () -> m ()) where
  gRpcMethodHandler :: (forall a. m a -> ServerErrorIO a)
-> Proxy p
-> Proxy '[]
-> Proxy ('RetStream rref)
-> RPCTy p
-> (Request -> ConduitT r Void m () -> m ())
-> ServiceHandler
gRpcMethodHandler f :: forall a. m a -> ServerErrorIO a
f _ _ _ rpc :: RPCTy p
rpc h :: Request -> ConduitT r Void m () -> m ()
h
    = (forall x. m x -> IO x)
-> RPCTy p
-> ServerStreamHandler m () (GRpcOWTy p rref r) ()
-> ServiceHandler
forall (m :: * -> *) r i o a.
(MonadIO m, GRPCInput r i, GRPCOutput r o) =>
(forall x. m x -> IO x)
-> r -> ServerStreamHandler m i o a -> ServiceHandler
serverStream @m @_ @() @(GRpcOWTy p rref r) (ServerErrorIO x -> IO x
forall (m :: * -> *) a. MonadIO m => ServerErrorIO a -> m a
raiseErrors (ServerErrorIO x -> IO x)
-> (m x -> ServerErrorIO x) -> m x -> IO x
forall b c a. (b -> c) -> (a -> b) -> a -> c
. m x -> ServerErrorIO x
forall a. m a -> ServerErrorIO a
f) RPCTy p
rpc ServerStreamHandler m () (GRpcOWTy p rref r) ()
sstream
    where sstream :: Request -> ()
                  -> m ((), ServerStream m (GRpcOWTy p rref r) ())
          sstream :: ServerStreamHandler m () (GRpcOWTy p rref r) ()
sstream req :: Request
req _ = do
            -- Variable to connect input and output
            TMVar (Maybe r)
var <- IO (TMVar (Maybe r)) -> m (TMVar (Maybe r))
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO IO (TMVar (Maybe r))
forall a. IO (TMVar a)
newEmptyTMVarIO :: m (TMVar (Maybe r))
            -- Start executing the handler
            Async ()
promise <- IO (Async ()) -> m (Async ())
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (Async ()) -> m (Async ())) -> IO (Async ()) -> m (Async ())
forall a b. (a -> b) -> a -> b
$ IO () -> IO (Async ())
forall a. IO a -> IO (Async a)
async (ServerErrorIO () -> IO ()
forall (m :: * -> *) a. MonadIO m => ServerErrorIO a -> m a
raiseErrors (ServerErrorIO () -> IO ()) -> ServerErrorIO () -> IO ()
forall a b. (a -> b) -> a -> b
$ m () -> ServerErrorIO ()
forall a. m a -> ServerErrorIO a
f (Request -> ConduitT r Void m () -> m ()
h Request
req (TMVar (Maybe r) -> ConduitT r Void m ()
forall (m :: * -> *) r.
MonadIO m =>
TMVar (Maybe r) -> ConduitT r Void m ()
toTMVarConduit TMVar (Maybe r)
var)))
            -- Return the information
            let readNext :: () -> m (Maybe ((), GRpcOWTy p rref r))
readNext _
                  = do Maybe r
nextOutput <- IO (Maybe r) -> m (Maybe r)
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (Maybe r) -> m (Maybe r)) -> IO (Maybe r) -> m (Maybe r)
forall a b. (a -> b) -> a -> b
$ STM (Maybe r) -> IO (Maybe r)
forall a. STM a -> IO a
atomically (STM (Maybe r) -> IO (Maybe r)) -> STM (Maybe r) -> IO (Maybe r)
forall a b. (a -> b) -> a -> b
$ TMVar (Maybe r) -> STM (Maybe r)
forall a. TMVar a -> STM a
takeTMVar TMVar (Maybe r)
var
                       case Maybe r
nextOutput of
                         Just o :: r
o  -> Maybe ((), GRpcOWTy p rref r) -> m (Maybe ((), GRpcOWTy p rref r))
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Maybe ((), GRpcOWTy p rref r)
 -> m (Maybe ((), GRpcOWTy p rref r)))
-> Maybe ((), GRpcOWTy p rref r)
-> m (Maybe ((), GRpcOWTy p rref r))
forall a b. (a -> b) -> a -> b
$ ((), GRpcOWTy p rref r) -> Maybe ((), GRpcOWTy p rref r)
forall a. a -> Maybe a
Just ((), Proxy p -> Proxy rref -> r -> GRpcOWTy p rref r
forall snm (p :: GRpcMessageProtocol) (ref :: TypeRef snm) r.
GRpcOutputWrapper p ref r =>
Proxy p -> Proxy ref -> r -> GRpcOWTy p ref r
buildGRpcOWTy (Proxy p
forall k (t :: k). Proxy t
Proxy @p) (Proxy rref
forall k (t :: k). Proxy t
Proxy @rref) r
o)
                         Nothing -> do IO () -> m ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ Async () -> IO ()
forall a. Async a -> IO ()
cancel Async ()
promise
                                       Maybe ((), GRpcOWTy p rref r) -> m (Maybe ((), GRpcOWTy p rref r))
forall (f :: * -> *) a. Applicative f => a -> f a
pure Maybe ((), GRpcOWTy p rref r)
forall a. Maybe a
Nothing
            ((), ServerStream m (GRpcOWTy p rref r) ())
-> m ((), ServerStream m (GRpcOWTy p rref r) ())
forall (f :: * -> *) a. Applicative f => a -> f a
pure ((), (() -> m (Maybe ((), GRpcOWTy p rref r)))
-> ServerStream m (GRpcOWTy p rref r) ()
forall (m :: * -> *) o a.
(a -> m (Maybe (a, o))) -> ServerStream m o a
ServerStream () -> m (Maybe ((), GRpcOWTy p rref r))
readNext)

-----

instance (MonadIO m, GRpcInputWrapper p vref v, GRPCOutput (RPCTy p) ())
         => GRpcMethodHandler p m '[ 'ArgSingle aname vref ] 'RetNothing (v -> m ()) where
  gRpcMethodHandler :: (forall a. m a -> ServerErrorIO a)
-> Proxy p
-> Proxy '[ 'ArgSingle aname vref]
-> Proxy 'RetNothing
-> RPCTy p
-> (Request -> v -> m ())
-> ServiceHandler
gRpcMethodHandler f :: forall a. m a -> ServerErrorIO a
f _ _ _ rpc :: RPCTy p
rpc h :: Request -> v -> m ()
h
    = (forall x. m x -> IO x)
-> RPCTy p
-> UnaryHandler m (GRpcIWTy p vref v) ()
-> ServiceHandler
forall (m :: * -> *) r i o.
(MonadIO m, GRPCInput r i, GRPCOutput r o) =>
(forall x. m x -> IO x)
-> r -> UnaryHandler m i o -> ServiceHandler
unary @m @_ @(GRpcIWTy p vref v) @()
            (ServerErrorIO x -> IO x
forall (m :: * -> *) a. MonadIO m => ServerErrorIO a -> m a
raiseErrors (ServerErrorIO x -> IO x)
-> (m x -> ServerErrorIO x) -> m x -> IO x
forall b c a. (b -> c) -> (a -> b) -> a -> c
. m x -> ServerErrorIO x
forall a. m a -> ServerErrorIO a
f) RPCTy p
rpc
            (\req :: Request
req -> Request -> v -> m ()
h Request
req (v -> m ())
-> (GRpcIWTy p vref v -> v) -> GRpcIWTy p vref v -> m ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Proxy p -> Proxy vref -> GRpcIWTy p vref v -> v
forall snm (p :: GRpcMessageProtocol) (ref :: TypeRef snm) r.
GRpcInputWrapper p ref r =>
Proxy p -> Proxy ref -> GRpcIWTy p ref r -> r
unGRpcIWTy (Proxy p
forall k (t :: k). Proxy t
Proxy @p) (Proxy vref
forall k (t :: k). Proxy t
Proxy @vref))

-----

instance (MonadIO m, GRpcInputWrapper p vref v, GRpcOutputWrapper p rref r)
         => GRpcMethodHandler p m '[ 'ArgSingle aname vref ] ('RetSingle rref) (v -> m r) where
  gRpcMethodHandler :: (forall a. m a -> ServerErrorIO a)
-> Proxy p
-> Proxy '[ 'ArgSingle aname vref]
-> Proxy ('RetSingle rref)
-> RPCTy p
-> (Request -> v -> m r)
-> ServiceHandler
gRpcMethodHandler f :: forall a. m a -> ServerErrorIO a
f _ _ _ rpc :: RPCTy p
rpc h :: Request -> v -> m r
h
    = (forall x. m x -> IO x)
-> RPCTy p
-> UnaryHandler m (GRpcIWTy p vref v) (GRpcOWTy p rref r)
-> ServiceHandler
forall (m :: * -> *) r i o.
(MonadIO m, GRPCInput r i, GRPCOutput r o) =>
(forall x. m x -> IO x)
-> r -> UnaryHandler m i o -> ServiceHandler
unary @m @_ @(GRpcIWTy p vref v) @(GRpcOWTy p rref r)
            (ServerErrorIO x -> IO x
forall (m :: * -> *) a. MonadIO m => ServerErrorIO a -> m a
raiseErrors (ServerErrorIO x -> IO x)
-> (m x -> ServerErrorIO x) -> m x -> IO x
forall b c a. (b -> c) -> (a -> b) -> a -> c
. m x -> ServerErrorIO x
forall a. m a -> ServerErrorIO a
f) RPCTy p
rpc
            (\req :: Request
req -> (Proxy p -> Proxy rref -> r -> GRpcOWTy p rref r
forall snm (p :: GRpcMessageProtocol) (ref :: TypeRef snm) r.
GRpcOutputWrapper p ref r =>
Proxy p -> Proxy ref -> r -> GRpcOWTy p ref r
buildGRpcOWTy (Proxy p
forall k (t :: k). Proxy t
Proxy @p) (Proxy rref
forall k (t :: k). Proxy t
Proxy @rref) (r -> GRpcOWTy p rref r) -> m r -> m (GRpcOWTy p rref r)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$>)
                     (m r -> m (GRpcOWTy p rref r))
-> (GRpcIWTy p vref v -> m r)
-> GRpcIWTy p vref v
-> m (GRpcOWTy p rref r)
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Request -> v -> m r
h Request
req
                     (v -> m r) -> (GRpcIWTy p vref v -> v) -> GRpcIWTy p vref v -> m r
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Proxy p -> Proxy vref -> GRpcIWTy p vref v -> v
forall snm (p :: GRpcMessageProtocol) (ref :: TypeRef snm) r.
GRpcInputWrapper p ref r =>
Proxy p -> Proxy ref -> GRpcIWTy p ref r -> r
unGRpcIWTy (Proxy p
forall k (t :: k). Proxy t
Proxy @p) (Proxy vref
forall k (t :: k). Proxy t
Proxy @vref))

-----

instance (GRpcInputWrapper p vref v, GRpcOutputWrapper p rref r, MonadIO m)
         => GRpcMethodHandler p m '[ 'ArgSingle aname vref ] ('RetStream rref)
                              (v -> ConduitT r Void m () -> m ()) where
  gRpcMethodHandler :: (forall a. m a -> ServerErrorIO a)
-> Proxy p
-> Proxy '[ 'ArgSingle aname vref]
-> Proxy ('RetStream rref)
-> RPCTy p
-> (Request -> v -> ConduitT r Void m () -> m ())
-> ServiceHandler
gRpcMethodHandler f :: forall a. m a -> ServerErrorIO a
f _ _ _ rpc :: RPCTy p
rpc h :: Request -> v -> ConduitT r Void m () -> m ()
h
    = (forall x. m x -> IO x)
-> RPCTy p
-> ServerStreamHandler m (GRpcIWTy p vref v) (GRpcOWTy p rref r) ()
-> ServiceHandler
forall (m :: * -> *) r i o a.
(MonadIO m, GRPCInput r i, GRPCOutput r o) =>
(forall x. m x -> IO x)
-> r -> ServerStreamHandler m i o a -> ServiceHandler
serverStream @m @_ @(GRpcIWTy p vref v) @(GRpcOWTy p rref r)
                   (ServerErrorIO x -> IO x
forall (m :: * -> *) a. MonadIO m => ServerErrorIO a -> m a
raiseErrors (ServerErrorIO x -> IO x)
-> (m x -> ServerErrorIO x) -> m x -> IO x
forall b c a. (b -> c) -> (a -> b) -> a -> c
. m x -> ServerErrorIO x
forall a. m a -> ServerErrorIO a
f) RPCTy p
rpc ServerStreamHandler m (GRpcIWTy p vref v) (GRpcOWTy p rref r) ()
sstream
    where sstream :: Request -> GRpcIWTy p vref v
                  -> m ((), ServerStream m (GRpcOWTy p rref r) ())
          sstream :: ServerStreamHandler m (GRpcIWTy p vref v) (GRpcOWTy p rref r) ()
sstream req :: Request
req v :: GRpcIWTy p vref v
v = do
            -- Variable to connect input and output
            TMVar (Maybe r)
var <- IO (TMVar (Maybe r)) -> m (TMVar (Maybe r))
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO IO (TMVar (Maybe r))
forall a. IO (TMVar a)
newEmptyTMVarIO :: m (TMVar (Maybe r))
            -- Start executing the handler
            let v' :: v
v' = Proxy p -> Proxy vref -> GRpcIWTy p vref v -> v
forall snm (p :: GRpcMessageProtocol) (ref :: TypeRef snm) r.
GRpcInputWrapper p ref r =>
Proxy p -> Proxy ref -> GRpcIWTy p ref r -> r
unGRpcIWTy (Proxy p
forall k (t :: k). Proxy t
Proxy @p) (Proxy vref
forall k (t :: k). Proxy t
Proxy @vref) GRpcIWTy p vref v
v
            Async ()
promise <- IO (Async ()) -> m (Async ())
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (Async ()) -> m (Async ())) -> IO (Async ()) -> m (Async ())
forall a b. (a -> b) -> a -> b
$ IO () -> IO (Async ())
forall a. IO a -> IO (Async a)
async (ServerErrorIO () -> IO ()
forall (m :: * -> *) a. MonadIO m => ServerErrorIO a -> m a
raiseErrors (ServerErrorIO () -> IO ()) -> ServerErrorIO () -> IO ()
forall a b. (a -> b) -> a -> b
$ m () -> ServerErrorIO ()
forall a. m a -> ServerErrorIO a
f (Request -> v -> ConduitT r Void m () -> m ()
h Request
req v
v' (TMVar (Maybe r) -> ConduitT r Void m ()
forall (m :: * -> *) r.
MonadIO m =>
TMVar (Maybe r) -> ConduitT r Void m ()
toTMVarConduit TMVar (Maybe r)
var)))
            -- Return the information
            let readNext :: () -> m (Maybe ((), GRpcOWTy p rref r))
readNext _
                  = do Maybe r
nextOutput <- IO (Maybe r) -> m (Maybe r)
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (Maybe r) -> m (Maybe r)) -> IO (Maybe r) -> m (Maybe r)
forall a b. (a -> b) -> a -> b
$ STM (Maybe r) -> IO (Maybe r)
forall a. STM a -> IO a
atomically (STM (Maybe r) -> IO (Maybe r)) -> STM (Maybe r) -> IO (Maybe r)
forall a b. (a -> b) -> a -> b
$ TMVar (Maybe r) -> STM (Maybe r)
forall a. TMVar a -> STM a
takeTMVar TMVar (Maybe r)
var
                       case Maybe r
nextOutput of
                         Just o :: r
o  -> Maybe ((), GRpcOWTy p rref r) -> m (Maybe ((), GRpcOWTy p rref r))
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Maybe ((), GRpcOWTy p rref r)
 -> m (Maybe ((), GRpcOWTy p rref r)))
-> Maybe ((), GRpcOWTy p rref r)
-> m (Maybe ((), GRpcOWTy p rref r))
forall a b. (a -> b) -> a -> b
$ ((), GRpcOWTy p rref r) -> Maybe ((), GRpcOWTy p rref r)
forall a. a -> Maybe a
Just ((), Proxy p -> Proxy rref -> r -> GRpcOWTy p rref r
forall snm (p :: GRpcMessageProtocol) (ref :: TypeRef snm) r.
GRpcOutputWrapper p ref r =>
Proxy p -> Proxy ref -> r -> GRpcOWTy p ref r
buildGRpcOWTy (Proxy p
forall k (t :: k). Proxy t
Proxy @p) (Proxy rref
forall k (t :: k). Proxy t
Proxy @rref) r
o)
                         Nothing -> do IO () -> m ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ Async () -> IO ()
forall a. Async a -> IO ()
cancel Async ()
promise
                                       Maybe ((), GRpcOWTy p rref r) -> m (Maybe ((), GRpcOWTy p rref r))
forall (f :: * -> *) a. Applicative f => a -> f a
pure Maybe ((), GRpcOWTy p rref r)
forall a. Maybe a
Nothing
            ((), ServerStream m (GRpcOWTy p rref r) ())
-> m ((), ServerStream m (GRpcOWTy p rref r) ())
forall (f :: * -> *) a. Applicative f => a -> f a
pure ((), (() -> m (Maybe ((), GRpcOWTy p rref r)))
-> ServerStream m (GRpcOWTy p rref r) ()
forall (m :: * -> *) o a.
(a -> m (Maybe (a, o))) -> ServerStream m o a
ServerStream () -> m (Maybe ((), GRpcOWTy p rref r))
readNext)

-----

instance (MonadIO m, GRpcInputWrapper p vref v, GRPCOutput (RPCTy p) (), MonadIO m)
         => GRpcMethodHandler p m '[ 'ArgStream aname vref ] 'RetNothing
                              (ConduitT () v m () -> m ()) where
  gRpcMethodHandler :: (forall a. m a -> ServerErrorIO a)
-> Proxy p
-> Proxy '[ 'ArgStream aname vref]
-> Proxy 'RetNothing
-> RPCTy p
-> (Request -> ConduitT () v m () -> m ())
-> ServiceHandler
gRpcMethodHandler f :: forall a. m a -> ServerErrorIO a
f _ _ _ rpc :: RPCTy p
rpc h :: Request -> ConduitT () v m () -> m ()
h
    = (forall x. m x -> IO x)
-> RPCTy p
-> ClientStreamHandler m (GRpcIWTy p vref v) () ()
-> ServiceHandler
forall (m :: * -> *) r i o a.
(MonadIO m, GRPCInput r i, GRPCOutput r o) =>
(forall x. m x -> IO x)
-> r -> ClientStreamHandler m i o a -> ServiceHandler
clientStream @m @_ @(GRpcIWTy p vref v) @()
                   (ServerErrorIO x -> IO x
forall (m :: * -> *) a. MonadIO m => ServerErrorIO a -> m a
raiseErrors (ServerErrorIO x -> IO x)
-> (m x -> ServerErrorIO x) -> m x -> IO x
forall b c a. (b -> c) -> (a -> b) -> a -> c
. m x -> ServerErrorIO x
forall a. m a -> ServerErrorIO a
f) RPCTy p
rpc ClientStreamHandler m (GRpcIWTy p vref v) () ()
cstream
    where cstream :: Request
                  -> m ((), ClientStream m (GRpcIWTy p vref v) () ())
          cstream :: ClientStreamHandler m (GRpcIWTy p vref v) () ()
cstream req :: Request
req = do
            -- Create a new TMChan
            TMChan v
chan <- IO (TMChan v) -> m (TMChan v)
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO IO (TMChan v)
forall a. IO (TMChan a)
newTMChanIO :: m (TMChan v)
            let producer :: ConduitT () v m ()
producer = TMChan v -> ConduitT () v m ()
forall (m :: * -> *) a. MonadIO m => TMChan a -> ConduitT () a m ()
sourceTMChan @m TMChan v
chan
            -- Start executing the handler in another thread
            Async ()
promise <- IO (Async ()) -> m (Async ())
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (Async ()) -> m (Async ())) -> IO (Async ()) -> m (Async ())
forall a b. (a -> b) -> a -> b
$ IO () -> IO (Async ())
forall a. IO a -> IO (Async a)
async (ServerErrorIO () -> IO ()
forall (m :: * -> *) a. MonadIO m => ServerErrorIO a -> m a
raiseErrors (ServerErrorIO () -> IO ()) -> ServerErrorIO () -> IO ()
forall a b. (a -> b) -> a -> b
$ m () -> ServerErrorIO ()
forall a. m a -> ServerErrorIO a
f (Request -> ConduitT () v m () -> m ()
h Request
req ConduitT () v m ()
producer))
            -- Build the actual handler
            let cstreamHandler :: () -> GRpcIWTy p vref v -> m ()
cstreamHandler _ newInput :: GRpcIWTy p vref v
newInput
                  = IO () -> m ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ STM () -> IO ()
forall a. STM a -> IO a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$
                      TMChan v -> v -> STM ()
forall a. TMChan a -> a -> STM ()
writeTMChan TMChan v
chan (Proxy p -> Proxy vref -> GRpcIWTy p vref v -> v
forall snm (p :: GRpcMessageProtocol) (ref :: TypeRef snm) r.
GRpcInputWrapper p ref r =>
Proxy p -> Proxy ref -> GRpcIWTy p ref r -> r
unGRpcIWTy (Proxy p
forall k (t :: k). Proxy t
Proxy @p) (Proxy vref
forall k (t :: k). Proxy t
Proxy @vref) GRpcIWTy p vref v
newInput)
                cstreamFinalizer :: () -> m ()
cstreamFinalizer _
                  = IO () -> m ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ STM () -> IO ()
forall a. STM a -> IO a
atomically (TMChan v -> STM ()
forall a. TMChan a -> STM ()
closeTMChan TMChan v
chan) IO () -> IO () -> IO ()
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> Async () -> IO ()
forall a. Async a -> IO a
wait Async ()
promise
            -- Return the information
            ((), ClientStream m (GRpcIWTy p vref v) () ())
-> m ((), ClientStream m (GRpcIWTy p vref v) () ())
forall (f :: * -> *) a. Applicative f => a -> f a
pure ((), (() -> GRpcIWTy p vref v -> m ())
-> (() -> m ()) -> ClientStream m (GRpcIWTy p vref v) () ()
forall (m :: * -> *) i o a.
(a -> i -> m a) -> (a -> m o) -> ClientStream m i o a
ClientStream () -> GRpcIWTy p vref v -> m ()
cstreamHandler () -> m ()
cstreamFinalizer)

-----

instance (MonadIO m, GRpcInputWrapper p vref v, GRpcOutputWrapper p rref r, MonadIO m)
         => GRpcMethodHandler p m '[ 'ArgStream aname vref ] ('RetSingle rref)
                              (ConduitT () v m () -> m r) where
  gRpcMethodHandler :: (forall a. m a -> ServerErrorIO a)
-> Proxy p
-> Proxy '[ 'ArgStream aname vref]
-> Proxy ('RetSingle rref)
-> RPCTy p
-> (Request -> ConduitT () v m () -> m r)
-> ServiceHandler
gRpcMethodHandler f :: forall a. m a -> ServerErrorIO a
f _ _ _ rpc :: RPCTy p
rpc h :: Request -> ConduitT () v m () -> m r
h
    = (forall x. m x -> IO x)
-> RPCTy p
-> ClientStreamHandler m (GRpcIWTy p vref v) (GRpcOWTy p rref r) ()
-> ServiceHandler
forall (m :: * -> *) r i o a.
(MonadIO m, GRPCInput r i, GRPCOutput r o) =>
(forall x. m x -> IO x)
-> r -> ClientStreamHandler m i o a -> ServiceHandler
clientStream @m @_ @(GRpcIWTy p vref v) @(GRpcOWTy p rref r)
                   (ServerErrorIO x -> IO x
forall (m :: * -> *) a. MonadIO m => ServerErrorIO a -> m a
raiseErrors (ServerErrorIO x -> IO x)
-> (m x -> ServerErrorIO x) -> m x -> IO x
forall b c a. (b -> c) -> (a -> b) -> a -> c
. m x -> ServerErrorIO x
forall a. m a -> ServerErrorIO a
f) RPCTy p
rpc ClientStreamHandler m (GRpcIWTy p vref v) (GRpcOWTy p rref r) ()
cstream
    where cstream :: Request
                  -> m ((), ClientStream m (GRpcIWTy p vref v)
                        (GRpcOWTy p rref r) ())
          cstream :: ClientStreamHandler m (GRpcIWTy p vref v) (GRpcOWTy p rref r) ()
cstream req :: Request
req = do
            -- Create a new TMChan
            TMChan v
chan <- IO (TMChan v) -> m (TMChan v)
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO IO (TMChan v)
forall a. IO (TMChan a)
newTMChanIO :: m (TMChan v)
            let producer :: ConduitT () v m ()
producer = TMChan v -> ConduitT () v m ()
forall (m :: * -> *) a. MonadIO m => TMChan a -> ConduitT () a m ()
sourceTMChan @m TMChan v
chan
            -- Start executing the handler in another thread
            Async (GRpcOWTy p rref r)
promise <- IO (Async (GRpcOWTy p rref r)) -> m (Async (GRpcOWTy p rref r))
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (Async (GRpcOWTy p rref r)) -> m (Async (GRpcOWTy p rref r)))
-> IO (Async (GRpcOWTy p rref r)) -> m (Async (GRpcOWTy p rref r))
forall a b. (a -> b) -> a -> b
$ IO (GRpcOWTy p rref r) -> IO (Async (GRpcOWTy p rref r))
forall a. IO a -> IO (Async a)
async
              (ServerErrorIO (GRpcOWTy p rref r) -> IO (GRpcOWTy p rref r)
forall (m :: * -> *) a. MonadIO m => ServerErrorIO a -> m a
raiseErrors
                 (ServerErrorIO (GRpcOWTy p rref r) -> IO (GRpcOWTy p rref r))
-> ServerErrorIO (GRpcOWTy p rref r) -> IO (GRpcOWTy p rref r)
forall a b. (a -> b) -> a -> b
$ Proxy p -> Proxy rref -> r -> GRpcOWTy p rref r
forall snm (p :: GRpcMessageProtocol) (ref :: TypeRef snm) r.
GRpcOutputWrapper p ref r =>
Proxy p -> Proxy ref -> r -> GRpcOWTy p ref r
buildGRpcOWTy (Proxy p
forall k (t :: k). Proxy t
Proxy @p) (Proxy rref
forall k (t :: k). Proxy t
Proxy @rref)
                 (r -> GRpcOWTy p rref r)
-> ExceptT ServerError IO r -> ServerErrorIO (GRpcOWTy p rref r)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> m r -> ExceptT ServerError IO r
forall a. m a -> ServerErrorIO a
f (Request -> ConduitT () v m () -> m r
h Request
req ConduitT () v m ()
producer))
            -- Build the actual handler
            let cstreamHandler :: () -> GRpcIWTy p vref v -> m ()
cstreamHandler _ newInput :: GRpcIWTy p vref v
newInput
                  = IO () -> m ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ STM () -> IO ()
forall a. STM a -> IO a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$
                      TMChan v -> v -> STM ()
forall a. TMChan a -> a -> STM ()
writeTMChan TMChan v
chan (Proxy p -> Proxy vref -> GRpcIWTy p vref v -> v
forall snm (p :: GRpcMessageProtocol) (ref :: TypeRef snm) r.
GRpcInputWrapper p ref r =>
Proxy p -> Proxy ref -> GRpcIWTy p ref r -> r
unGRpcIWTy (Proxy p
forall k (t :: k). Proxy t
Proxy @p) (Proxy vref
forall k (t :: k). Proxy t
Proxy @vref) GRpcIWTy p vref v
newInput)
                cstreamFinalizer :: () -> m (GRpcOWTy p rref r)
cstreamFinalizer _
                  = IO (GRpcOWTy p rref r) -> m (GRpcOWTy p rref r)
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (GRpcOWTy p rref r) -> m (GRpcOWTy p rref r))
-> IO (GRpcOWTy p rref r) -> m (GRpcOWTy p rref r)
forall a b. (a -> b) -> a -> b
$ STM () -> IO ()
forall a. STM a -> IO a
atomically (TMChan v -> STM ()
forall a. TMChan a -> STM ()
closeTMChan TMChan v
chan) IO () -> IO (GRpcOWTy p rref r) -> IO (GRpcOWTy p rref r)
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> Async (GRpcOWTy p rref r) -> IO (GRpcOWTy p rref r)
forall a. Async a -> IO a
wait Async (GRpcOWTy p rref r)
promise
            -- Return the information
            ((), ClientStream m (GRpcIWTy p vref v) (GRpcOWTy p rref r) ())
-> m ((),
      ClientStream m (GRpcIWTy p vref v) (GRpcOWTy p rref r) ())
forall (f :: * -> *) a. Applicative f => a -> f a
pure ((), (() -> GRpcIWTy p vref v -> m ())
-> (() -> m (GRpcOWTy p rref r))
-> ClientStream m (GRpcIWTy p vref v) (GRpcOWTy p rref r) ()
forall (m :: * -> *) i o a.
(a -> i -> m a) -> (a -> m o) -> ClientStream m i o a
ClientStream () -> GRpcIWTy p vref v -> m ()
cstreamHandler () -> m (GRpcOWTy p rref r)
cstreamFinalizer)

-----

instance (GRpcInputWrapper p vref v, GRpcOutputWrapper p rref r, MonadIO m)
         => GRpcMethodHandler p m '[ 'ArgStream aname vref ] ('RetStream rref)
                              (ConduitT () v m () -> ConduitT r Void m () -> m ()) where
  gRpcMethodHandler :: (forall a. m a -> ServerErrorIO a)
-> Proxy p
-> Proxy '[ 'ArgStream aname vref]
-> Proxy ('RetStream rref)
-> RPCTy p
-> (Request -> ConduitT () v m () -> ConduitT r Void m () -> m ())
-> ServiceHandler
gRpcMethodHandler f :: forall a. m a -> ServerErrorIO a
f _ _ _ rpc :: RPCTy p
rpc h :: Request -> ConduitT () v m () -> ConduitT r Void m () -> m ()
h
    = (forall x. m x -> IO x)
-> RPCTy p
-> GeneralStreamHandler
     m (GRpcIWTy p vref v) (GRpcOWTy p rref r) () ()
-> ServiceHandler
forall (m :: * -> *) r i o a b.
(MonadIO m, GRPCInput r i, GRPCOutput r o) =>
(forall x. m x -> IO x)
-> r -> GeneralStreamHandler m i o a b -> ServiceHandler
generalStream @m @_ @(GRpcIWTy p vref v) @(GRpcOWTy p rref r)
                    (ServerErrorIO x -> IO x
forall (m :: * -> *) a. MonadIO m => ServerErrorIO a -> m a
raiseErrors (ServerErrorIO x -> IO x)
-> (m x -> ServerErrorIO x) -> m x -> IO x
forall b c a. (b -> c) -> (a -> b) -> a -> c
. m x -> ServerErrorIO x
forall a. m a -> ServerErrorIO a
f) RPCTy p
rpc GeneralStreamHandler
  m (GRpcIWTy p vref v) (GRpcOWTy p rref r) () ()
bdstream
    where bdstream :: Request
                   -> m ( (), IncomingStream m (GRpcIWTy p vref v) ()
                        , (), OutgoingStream m (GRpcOWTy p rref r) () )
          bdstream :: GeneralStreamHandler
  m (GRpcIWTy p vref v) (GRpcOWTy p rref r) () ()
bdstream req :: Request
req = do
            -- Create a new TMChan and a new variable
            TMChan v
chan <- IO (TMChan v) -> m (TMChan v)
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO IO (TMChan v)
forall a. IO (TMChan a)
newTMChanIO :: m (TMChan v)
            let producer :: ConduitT () v m ()
producer = TMChan v -> ConduitT () v m ()
forall (m :: * -> *) a. MonadIO m => TMChan a -> ConduitT () a m ()
sourceTMChan @m TMChan v
chan
            TMVar (Maybe r)
var <- IO (TMVar (Maybe r)) -> m (TMVar (Maybe r))
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO IO (TMVar (Maybe r))
forall a. IO (TMVar a)
newEmptyTMVarIO :: m (TMVar (Maybe r))
            -- Start executing the handler
            Async ()
promise <- IO (Async ()) -> m (Async ())
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (Async ()) -> m (Async ())) -> IO (Async ()) -> m (Async ())
forall a b. (a -> b) -> a -> b
$ IO () -> IO (Async ())
forall a. IO a -> IO (Async a)
async
              (ServerErrorIO () -> IO ()
forall (m :: * -> *) a. MonadIO m => ServerErrorIO a -> m a
raiseErrors (ServerErrorIO () -> IO ()) -> ServerErrorIO () -> IO ()
forall a b. (a -> b) -> a -> b
$ m () -> ServerErrorIO ()
forall a. m a -> ServerErrorIO a
f (m () -> ServerErrorIO ()) -> m () -> ServerErrorIO ()
forall a b. (a -> b) -> a -> b
$ Request -> ConduitT () v m () -> ConduitT r Void m () -> m ()
h Request
req ConduitT () v m ()
producer (TMVar (Maybe r) -> ConduitT r Void m ()
forall (m :: * -> *) r.
MonadIO m =>
TMVar (Maybe r) -> ConduitT r Void m ()
toTMVarConduit TMVar (Maybe r)
var))
            -- Build the actual handler
            let cstreamHandler :: () -> GRpcIWTy p vref v -> m ()
cstreamHandler _ newInput :: GRpcIWTy p vref v
newInput
                  = IO () -> m ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ STM () -> IO ()
forall a. STM a -> IO a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$
                      TMChan v -> v -> STM ()
forall a. TMChan a -> a -> STM ()
writeTMChan TMChan v
chan (Proxy p -> Proxy vref -> GRpcIWTy p vref v -> v
forall snm (p :: GRpcMessageProtocol) (ref :: TypeRef snm) r.
GRpcInputWrapper p ref r =>
Proxy p -> Proxy ref -> GRpcIWTy p ref r -> r
unGRpcIWTy (Proxy p
forall k (t :: k). Proxy t
Proxy @p) (Proxy vref
forall k (t :: k). Proxy t
Proxy @vref) GRpcIWTy p vref v
newInput)
                cstreamFinalizer :: () -> m ()
cstreamFinalizer _
                  = IO () -> m ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ STM () -> IO ()
forall a. STM a -> IO a
atomically (TMChan v -> STM ()
forall a. TMChan a -> STM ()
closeTMChan TMChan v
chan) IO () -> IO () -> IO ()
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> Async () -> IO ()
forall a. Async a -> IO a
wait Async ()
promise
                readNext :: () -> m (Maybe ((), GRpcOWTy p rref r))
readNext _
                  = do Maybe (Maybe r)
nextOutput <- IO (Maybe (Maybe r)) -> m (Maybe (Maybe r))
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (Maybe (Maybe r)) -> m (Maybe (Maybe r)))
-> IO (Maybe (Maybe r)) -> m (Maybe (Maybe r))
forall a b. (a -> b) -> a -> b
$ STM (Maybe (Maybe r)) -> IO (Maybe (Maybe r))
forall a. STM a -> IO a
atomically (STM (Maybe (Maybe r)) -> IO (Maybe (Maybe r)))
-> STM (Maybe (Maybe r)) -> IO (Maybe (Maybe r))
forall a b. (a -> b) -> a -> b
$ TMVar (Maybe r) -> STM (Maybe (Maybe r))
forall a. TMVar a -> STM (Maybe a)
tryTakeTMVar TMVar (Maybe r)
var
                       case Maybe (Maybe r)
nextOutput of
                         Just (Just o :: r
o) ->
                           Maybe ((), GRpcOWTy p rref r) -> m (Maybe ((), GRpcOWTy p rref r))
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Maybe ((), GRpcOWTy p rref r)
 -> m (Maybe ((), GRpcOWTy p rref r)))
-> Maybe ((), GRpcOWTy p rref r)
-> m (Maybe ((), GRpcOWTy p rref r))
forall a b. (a -> b) -> a -> b
$ ((), GRpcOWTy p rref r) -> Maybe ((), GRpcOWTy p rref r)
forall a. a -> Maybe a
Just ((), Proxy p -> Proxy rref -> r -> GRpcOWTy p rref r
forall snm (p :: GRpcMessageProtocol) (ref :: TypeRef snm) r.
GRpcOutputWrapper p ref r =>
Proxy p -> Proxy ref -> r -> GRpcOWTy p ref r
buildGRpcOWTy (Proxy p
forall k (t :: k). Proxy t
Proxy @p) (Proxy rref
forall k (t :: k). Proxy t
Proxy @rref) r
o)
                         Just Nothing  -> do
                           IO () -> m ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ Async () -> IO ()
forall a. Async a -> IO ()
cancel Async ()
promise
                           Maybe ((), GRpcOWTy p rref r) -> m (Maybe ((), GRpcOWTy p rref r))
forall (f :: * -> *) a. Applicative f => a -> f a
pure Maybe ((), GRpcOWTy p rref r)
forall a. Maybe a
Nothing
                         Nothing -> -- no new elements to output
                           () -> m (Maybe ((), GRpcOWTy p rref r))
readNext ()
            ((), IncomingStream m (GRpcIWTy p vref v) (), (),
 OutgoingStream m (GRpcOWTy p rref r) ())
-> m ((), IncomingStream m (GRpcIWTy p vref v) (), (),
      OutgoingStream m (GRpcOWTy p rref r) ())
forall (f :: * -> *) a. Applicative f => a -> f a
pure ((), (() -> GRpcIWTy p vref v -> m ())
-> (() -> m ()) -> IncomingStream m (GRpcIWTy p vref v) ()
forall (m :: * -> *) i a.
(a -> i -> m a) -> (a -> m ()) -> IncomingStream m i a
IncomingStream () -> GRpcIWTy p vref v -> m ()
cstreamHandler () -> m ()
cstreamFinalizer, (), (() -> m (Maybe ((), GRpcOWTy p rref r)))
-> OutgoingStream m (GRpcOWTy p rref r) ()
forall (m :: * -> *) o a.
(a -> m (Maybe (a, o))) -> OutgoingStream m o a
OutgoingStream () -> m (Maybe ((), GRpcOWTy p rref r))
readNext)

-----

toTMVarConduit :: MonadIO m => TMVar (Maybe r) -> ConduitT r Void m ()
toTMVarConduit :: TMVar (Maybe r) -> ConduitT r Void m ()
toTMVarConduit var :: TMVar (Maybe r)
var = do
  Maybe r
x <- ConduitT r Void m (Maybe r)
forall (m :: * -> *) i. Monad m => Consumer i m (Maybe i)
await
  IO () -> ConduitT r Void m ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> ConduitT r Void m ()) -> IO () -> ConduitT r Void m ()
forall a b. (a -> b) -> a -> b
$ STM () -> IO ()
forall a. STM a -> IO a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ TMVar (Maybe r) -> Maybe r -> STM ()
forall a. TMVar a -> a -> STM ()
putTMVar TMVar (Maybe r)
var Maybe r
x
  TMVar (Maybe r) -> ConduitT r Void m ()
forall (m :: * -> *) r.
MonadIO m =>
TMVar (Maybe r) -> ConduitT r Void m ()
toTMVarConduit TMVar (Maybe r)
var