{-# language DataKinds             #-}
{-# language FlexibleContexts      #-}
{-# language FlexibleInstances     #-}
{-# language GADTs                 #-}
{-# language MultiParamTypeClasses #-}
{-# language PolyKinds             #-}
{-# language RankNTypes            #-}
{-# language ScopedTypeVariables   #-}
{-# language TypeApplications      #-}
{-# language TypeFamilies          #-}
{-# language TypeOperators         #-}
{-# language UndecidableInstances  #-}
{-|
Description : Execute a Mu 'Server' using gRPC as transport layer

This module allows you to server a Mu 'Server'
as a WAI 'Application' using gRPC as transport layer.

The simples way is to use 'runGRpcApp', all other
variants provide more control over the settings.
-}
module Mu.GRpc.Server
( -- * Supported messaging formats
  GRpcMessageProtocol(..)
, msgProtoBuf, msgAvro
  -- * Run a 'Server' directly
, runGRpcApp, runGRpcAppTrans
, runGRpcAppSettings, Settings
, runGRpcAppTLS, TLSSettings
  -- * Convert a 'Server' into a WAI application
, gRpcApp
  -- * Raise errors as exceptions in IO
, raiseErrors, liftServerConduit
  -- * Re-export useful instances
, module Avro
) where

import           Control.Concurrent.Async
import           Control.Concurrent.STM       (atomically)
import           Control.Concurrent.STM.TMVar
import           Control.Exception
import           Control.Monad.Except
import           Data.ByteString              (ByteString)
import qualified Data.ByteString.Char8        as BS
import           Data.Conduit
import           Data.Conduit.TMChan
import           Data.Kind
import           Data.Proxy
import           Network.GRPC.HTTP2.Encoding  (GRPCInput, GRPCOutput, gzip, uncompressed)
import           Network.GRPC.HTTP2.Types     (GRPCStatus (..), GRPCStatusCode (..))
import           Network.GRPC.Server.Handlers
import           Network.GRPC.Server.Wai      as Wai
import           Network.Wai                  (Application)
import           Network.Wai.Handler.Warp     (Port, Settings, run, runSettings)
import           Network.Wai.Handler.WarpTLS  (TLSSettings, runTLS)

import           Mu.Adapter.ProtoBuf.Via
import           Mu.GRpc.Avro
import qualified Mu.GRpc.Avro                 as Avro
import           Mu.GRpc.Bridge
import           Mu.Rpc
import           Mu.Schema
import           Mu.Server

-- | Run a Mu 'Server' on the given port.
runGRpcApp
  :: ( KnownName name, KnownName (FindPackageName anns)
     , GRpcMethodHandlers protocol ServerErrorIO methods handlers )
  => Proxy protocol
  -> Port
  -> ServerT f ('Service name anns methods) ServerErrorIO handlers
  -> IO ()
runGRpcApp :: Proxy protocol
-> Port
-> ServerT f ('Service name anns methods) ServerErrorIO handlers
-> IO ()
runGRpcApp protocol :: Proxy protocol
protocol port :: Port
port = Proxy protocol
-> Port
-> (forall a. ServerErrorIO a -> ServerErrorIO a)
-> ServerT f ('Service name anns methods) ServerErrorIO handlers
-> IO ()
forall snm mnm (name :: snm) (anns :: [*])
       (protocol :: GRpcMessageProtocol) (m :: * -> *)
       (methods :: [Method mnm]) (handlers :: [*]) (f :: * -> *).
(KnownName name, KnownName (FindPackageName anns),
 GRpcMethodHandlers protocol m methods handlers) =>
Proxy protocol
-> Port
-> (forall a. m a -> ServerErrorIO a)
-> ServerT f ('Service name anns methods) m handlers
-> IO ()
runGRpcAppTrans Proxy protocol
protocol Port
port forall a. a -> a
forall a. ServerErrorIO a -> ServerErrorIO a
id

-- | Run a Mu 'Server' on the given port.
runGRpcAppTrans
  :: ( KnownName name, KnownName (FindPackageName anns)
     , GRpcMethodHandlers protocol m methods handlers )
  => Proxy protocol
  -> Port
  -> (forall a. m a -> ServerErrorIO a)
  -> ServerT f ('Service name anns methods) m handlers
  -> IO ()
runGRpcAppTrans :: Proxy protocol
-> Port
-> (forall a. m a -> ServerErrorIO a)
-> ServerT f ('Service name anns methods) m handlers
-> IO ()
runGRpcAppTrans protocol :: Proxy protocol
protocol port :: Port
port f :: forall a. m a -> ServerErrorIO a
f svr :: ServerT f ('Service name anns methods) m handlers
svr = Port -> Application -> IO ()
run Port
port (Proxy protocol
-> (forall a. m a -> ServerErrorIO a)
-> ServerT f ('Service name anns methods) m handlers
-> Application
forall snm mnm (name :: snm) (anns :: [*])
       (protocol :: GRpcMessageProtocol) (m :: * -> *)
       (methods :: [Method mnm]) (handlers :: [*]) (f :: * -> *).
(KnownName name, KnownName (FindPackageName anns),
 GRpcMethodHandlers protocol m methods handlers) =>
Proxy protocol
-> (forall a. m a -> ServerErrorIO a)
-> ServerT f ('Service name anns methods) m handlers
-> Application
gRpcAppTrans Proxy protocol
protocol forall a. m a -> ServerErrorIO a
f ServerT f ('Service name anns methods) m handlers
svr)

-- | Run a Mu 'Server' using the given 'Settings'.
--
--   Go to 'Network.Wai.Handler.Warp' to declare 'Settings'.
runGRpcAppSettings
  :: ( KnownName name, KnownName (FindPackageName anns)
     , GRpcMethodHandlers protocol m methods handlers )
  => Proxy protocol
  -> Settings
  -> (forall a. m a -> ServerErrorIO a)
  -> ServerT f ('Service name anns methods) m handlers
  -> IO ()
runGRpcAppSettings :: Proxy protocol
-> Settings
-> (forall a. m a -> ServerErrorIO a)
-> ServerT f ('Service name anns methods) m handlers
-> IO ()
runGRpcAppSettings protocol :: Proxy protocol
protocol st :: Settings
st f :: forall a. m a -> ServerErrorIO a
f svr :: ServerT f ('Service name anns methods) m handlers
svr = Settings -> Application -> IO ()
runSettings Settings
st (Proxy protocol
-> (forall a. m a -> ServerErrorIO a)
-> ServerT f ('Service name anns methods) m handlers
-> Application
forall snm mnm (name :: snm) (anns :: [*])
       (protocol :: GRpcMessageProtocol) (m :: * -> *)
       (methods :: [Method mnm]) (handlers :: [*]) (f :: * -> *).
(KnownName name, KnownName (FindPackageName anns),
 GRpcMethodHandlers protocol m methods handlers) =>
Proxy protocol
-> (forall a. m a -> ServerErrorIO a)
-> ServerT f ('Service name anns methods) m handlers
-> Application
gRpcAppTrans Proxy protocol
protocol forall a. m a -> ServerErrorIO a
f ServerT f ('Service name anns methods) m handlers
svr)

-- | Run a Mu 'Server' using the given 'TLSSettings' and 'Settings'.
--
--   Go to 'Network.Wai.Handler.WarpTLS' to declare 'TLSSettings'
--   and to 'Network.Wai.Handler.Warp' to declare 'Settings'.
runGRpcAppTLS
  :: ( KnownName name, KnownName (FindPackageName anns)
     , GRpcMethodHandlers protocol m methods handlers )
  => Proxy protocol
  -> TLSSettings -> Settings
  -> (forall a. m a -> ServerErrorIO a)
  -> ServerT f ('Service name anns methods) m handlers
  -> IO ()
runGRpcAppTLS :: Proxy protocol
-> TLSSettings
-> Settings
-> (forall a. m a -> ServerErrorIO a)
-> ServerT f ('Service name anns methods) m handlers
-> IO ()
runGRpcAppTLS protocol :: Proxy protocol
protocol tls :: TLSSettings
tls st :: Settings
st f :: forall a. m a -> ServerErrorIO a
f svr :: ServerT f ('Service name anns methods) m handlers
svr = TLSSettings -> Settings -> Application -> IO ()
runTLS TLSSettings
tls Settings
st (Proxy protocol
-> (forall a. m a -> ServerErrorIO a)
-> ServerT f ('Service name anns methods) m handlers
-> Application
forall snm mnm (name :: snm) (anns :: [*])
       (protocol :: GRpcMessageProtocol) (m :: * -> *)
       (methods :: [Method mnm]) (handlers :: [*]) (f :: * -> *).
(KnownName name, KnownName (FindPackageName anns),
 GRpcMethodHandlers protocol m methods handlers) =>
Proxy protocol
-> (forall a. m a -> ServerErrorIO a)
-> ServerT f ('Service name anns methods) m handlers
-> Application
gRpcAppTrans Proxy protocol
protocol forall a. m a -> ServerErrorIO a
f ServerT f ('Service name anns methods) m handlers
svr)

-- | Turn a Mu 'Server' into a WAI 'Application'.
--
--   These 'Application's can be later combined using,
--   for example, @wai-routes@, or you can add middleware
--   from @wai-extra@, among others.
gRpcApp
  :: ( KnownName name, KnownName (FindPackageName anns)
     , GRpcMethodHandlers protocol ServerErrorIO methods handlers )
  => Proxy protocol
  -> ServerT f ('Service name anns methods) ServerErrorIO handlers
  -> Application
gRpcApp :: Proxy protocol
-> ServerT f ('Service name anns methods) ServerErrorIO handlers
-> Application
gRpcApp protocol :: Proxy protocol
protocol = Proxy protocol
-> (forall a. ServerErrorIO a -> ServerErrorIO a)
-> ServerT f ('Service name anns methods) ServerErrorIO handlers
-> Application
forall snm mnm (name :: snm) (anns :: [*])
       (protocol :: GRpcMessageProtocol) (m :: * -> *)
       (methods :: [Method mnm]) (handlers :: [*]) (f :: * -> *).
(KnownName name, KnownName (FindPackageName anns),
 GRpcMethodHandlers protocol m methods handlers) =>
Proxy protocol
-> (forall a. m a -> ServerErrorIO a)
-> ServerT f ('Service name anns methods) m handlers
-> Application
gRpcAppTrans Proxy protocol
protocol forall a. a -> a
forall a. ServerErrorIO a -> ServerErrorIO a
id

-- | Turn a Mu 'Server' into a WAI 'Application'.
--
--   These 'Application's can be later combined using,
--   for example, @wai-routes@, or you can add middleware
--   from @wai-extra@, among others.
gRpcAppTrans
  :: ( KnownName name, KnownName (FindPackageName anns)
     , GRpcMethodHandlers protocol m methods handlers )
  => Proxy protocol
  -> (forall a. m a -> ServerErrorIO a)
  -> ServerT f ('Service name anns methods) m handlers
  -> Application
gRpcAppTrans :: Proxy protocol
-> (forall a. m a -> ServerErrorIO a)
-> ServerT f ('Service name anns methods) m handlers
-> Application
gRpcAppTrans protocol :: Proxy protocol
protocol f :: forall a. m a -> ServerErrorIO a
f svr :: ServerT f ('Service name anns methods) m handlers
svr
  = [Compression] -> [ServiceHandler] -> Application
Wai.grpcApp [Compression
uncompressed, Compression
gzip]
                (Proxy protocol
-> (forall a. m a -> ServerErrorIO a)
-> ServerT f ('Service name anns methods) m handlers
-> [ServiceHandler]
forall snm mnm (name :: snm) (anns :: [*])
       (methods :: [Method mnm]) (handlers :: [*]) (m :: * -> *)
       (protocol :: GRpcMessageProtocol) (w :: * -> *).
(KnownName name, KnownName (FindPackageName anns),
 GRpcMethodHandlers protocol m methods handlers) =>
Proxy protocol
-> (forall a. m a -> ServerErrorIO a)
-> ServerT w ('Service name anns methods) m handlers
-> [ServiceHandler]
gRpcServiceHandlers Proxy protocol
protocol forall a. m a -> ServerErrorIO a
f ServerT f ('Service name anns methods) m handlers
svr)

gRpcServiceHandlers
  :: forall name anns methods handlers m protocol w.
     ( KnownName name, KnownName (FindPackageName anns)
     , GRpcMethodHandlers protocol m methods handlers )
  => Proxy protocol
  -> (forall a. m a -> ServerErrorIO a)
  -> ServerT w ('Service name anns methods) m handlers
  -> [ServiceHandler]
gRpcServiceHandlers :: Proxy protocol
-> (forall a. m a -> ServerErrorIO a)
-> ServerT w ('Service name anns methods) m handlers
-> [ServiceHandler]
gRpcServiceHandlers pr :: Proxy protocol
pr f :: forall a. m a -> ServerErrorIO a
f (Server svr :: HandlersT w methods m handlers
svr) = (forall a. m a -> ServerErrorIO a)
-> Proxy protocol
-> ByteString
-> ByteString
-> HandlersT w methods m handlers
-> [ServiceHandler]
forall mnm (p :: GRpcMessageProtocol) (m :: * -> *)
       (ms :: [Method mnm]) (hs :: [*]) (f :: * -> *).
GRpcMethodHandlers p m ms hs =>
(forall a. m a -> ServerErrorIO a)
-> Proxy p
-> ByteString
-> ByteString
-> HandlersT f ms m hs
-> [ServiceHandler]
gRpcMethodHandlers forall a. m a -> ServerErrorIO a
f Proxy protocol
pr ByteString
packageName ByteString
serviceName HandlersT w methods m handlers
svr
  where packageName :: ByteString
packageName = String -> ByteString
BS.pack (Proxy (FindPackageName anns) -> String
forall k (a :: k) (proxy :: k -> *).
KnownName a =>
proxy a -> String
nameVal (Proxy (FindPackageName anns)
forall k (t :: k). Proxy t
Proxy @(FindPackageName anns)))
        serviceName :: ByteString
serviceName = String -> ByteString
BS.pack (Proxy name -> String
forall k (a :: k) (proxy :: k -> *).
KnownName a =>
proxy a -> String
nameVal (Proxy name
forall k (t :: k). Proxy t
Proxy @name))

class GRpcMethodHandlers (p :: GRpcMessageProtocol) (m :: Type -> Type)
                         (ms :: [Method mnm]) (hs :: [Type]) where
  gRpcMethodHandlers :: (forall a. m a -> ServerErrorIO a)
                     -> Proxy p -> ByteString -> ByteString
                     -> HandlersT f ms m hs -> [ServiceHandler]

instance GRpcMethodHandlers p m '[] '[] where
  gRpcMethodHandlers :: (forall a. m a -> ServerErrorIO a)
-> Proxy p
-> ByteString
-> ByteString
-> HandlersT f '[] m '[]
-> [ServiceHandler]
gRpcMethodHandlers _ _ _ _ H0 = []
instance (KnownName name, GRpcMethodHandler p m args r h, GRpcMethodHandlers p m rest hs, MkRPC p)
         => GRpcMethodHandlers p m ('Method name anns args r ': rest) (h ': hs) where
  gRpcMethodHandlers :: (forall a. m a -> ServerErrorIO a)
-> Proxy p
-> ByteString
-> ByteString
-> HandlersT f ('Method name anns args r : rest) m (h : hs)
-> [ServiceHandler]
gRpcMethodHandlers f :: forall a. m a -> ServerErrorIO a
f pr :: Proxy p
pr p :: ByteString
p s :: ByteString
s (h :: h
h :<|>: rest :: HandlersT f ms m hs1
rest)
    = (forall a. m a -> ServerErrorIO a)
-> Proxy p
-> Proxy args
-> Proxy r
-> RPCTy p
-> h
-> ServiceHandler
forall k k (p :: GRpcMessageProtocol) (m :: * -> *) (args :: k)
       (r :: k) h.
GRpcMethodHandler p m args r h =>
(forall a. m a -> ServerErrorIO a)
-> Proxy p
-> Proxy args
-> Proxy r
-> RPCTy p
-> h
-> ServiceHandler
gRpcMethodHandler forall a. m a -> ServerErrorIO a
f Proxy p
pr (Proxy args
forall k (t :: k). Proxy t
Proxy @args) (Proxy r
forall k (t :: k). Proxy t
Proxy @r) (Proxy p -> ByteString -> ByteString -> ByteString -> RPCTy p
forall (p :: GRpcMessageProtocol).
MkRPC p =>
Proxy p -> ByteString -> ByteString -> ByteString -> RPCTy p
mkRPC Proxy p
pr ByteString
p ByteString
s ByteString
methodName) h
h
      ServiceHandler -> [ServiceHandler] -> [ServiceHandler]
forall a. a -> [a] -> [a]
: (forall a. m a -> ServerErrorIO a)
-> Proxy p
-> ByteString
-> ByteString
-> HandlersT f ms m hs1
-> [ServiceHandler]
forall mnm (p :: GRpcMessageProtocol) (m :: * -> *)
       (ms :: [Method mnm]) (hs :: [*]) (f :: * -> *).
GRpcMethodHandlers p m ms hs =>
(forall a. m a -> ServerErrorIO a)
-> Proxy p
-> ByteString
-> ByteString
-> HandlersT f ms m hs
-> [ServiceHandler]
gRpcMethodHandlers forall a. m a -> ServerErrorIO a
f Proxy p
pr ByteString
p ByteString
s HandlersT f ms m hs1
rest
    where methodName :: ByteString
methodName = String -> ByteString
BS.pack (Proxy name -> String
forall k (a :: k) (proxy :: k -> *).
KnownName a =>
proxy a -> String
nameVal (Proxy name
forall k (t :: k). Proxy t
Proxy @name))

class GRpcMethodHandler p m args r h where
  gRpcMethodHandler :: (forall a. m a -> ServerErrorIO a)
                    -> Proxy p -> Proxy args -> Proxy r
                    -> RPCTy p -> h -> ServiceHandler

-- | Turns a 'Conduit' working on 'ServerErrorIO'
--   into any other base monad which supports 'IO',
--   by raising any error as an exception.
--
--   This function is useful to interoperate with
--   libraries which generate 'Conduit's with other
--   base monads, such as @persistent@.
liftServerConduit
  :: MonadIO m
  => ConduitT a b ServerErrorIO r -> ConduitT a b m r
liftServerConduit :: ConduitT a b ServerErrorIO r -> ConduitT a b m r
liftServerConduit = (forall a. ServerErrorIO a -> m a)
-> ConduitT a b ServerErrorIO r -> ConduitT a b m r
forall (m :: * -> *) (n :: * -> *) i o r.
Monad m =>
(forall a. m a -> n a) -> ConduitT i o m r -> ConduitT i o n r
transPipe forall a. ServerErrorIO a -> m a
forall (m :: * -> *) a. MonadIO m => ServerErrorIO a -> m a
raiseErrors

-- | Raises errors from 'ServerErrorIO' as exceptions
--   in a monad which supports 'IO'.
--
--   This function is useful to interoperate with other
--   libraries which cannot handle the additional error
--   layer. In particular, with Conduit, as witnessed
--   by 'liftServerConduit'.
raiseErrors :: MonadIO m => ServerErrorIO a -> m a
raiseErrors :: ServerErrorIO a -> m a
raiseErrors h :: ServerErrorIO a
h
  = IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO a -> m a) -> IO a -> m a
forall a b. (a -> b) -> a -> b
$ do
      Either ServerError a
h' <- ServerErrorIO a -> IO (Either ServerError a)
forall e (m :: * -> *) a. ExceptT e m a -> m (Either e a)
runExceptT ServerErrorIO a
h
      case Either ServerError a
h' of
        Right r :: a
r -> a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return a
r
        Left (ServerError code :: ServerErrorCode
code msg :: String
msg)
          -> GRPCStatus -> IO a
forall a. GRPCStatus -> IO a
closeEarly (GRPCStatus -> IO a) -> GRPCStatus -> IO a
forall a b. (a -> b) -> a -> b
$ GRPCStatusCode -> ByteString -> GRPCStatus
GRPCStatus (ServerErrorCode -> GRPCStatusCode
serverErrorToGRpcError ServerErrorCode
code)
                                     (String -> ByteString
BS.pack String
msg)
    IO a -> [Handler a] -> IO a
forall a. IO a -> [Handler a] -> IO a
`catches`
    [ (GRPCStatus -> IO a) -> Handler a
forall a e. Exception e => (e -> IO a) -> Handler a
Handler (\(GRPCStatus
e :: GRPCStatus) -> GRPCStatus -> IO a
forall e a. Exception e => e -> IO a
throwIO GRPCStatus
e)
    , (SomeException -> IO a) -> Handler a
forall a e. Exception e => (e -> IO a) -> Handler a
Handler (\(SomeException
e :: SomeException) -> GRPCStatus -> IO a
forall a. GRPCStatus -> IO a
closeEarly (GRPCStatus -> IO a) -> GRPCStatus -> IO a
forall a b. (a -> b) -> a -> b
$ GRPCStatusCode -> ByteString -> GRPCStatus
GRPCStatus GRPCStatusCode
INTERNAL (String -> ByteString
BS.pack (String -> ByteString) -> String -> ByteString
forall a b. (a -> b) -> a -> b
$ SomeException -> String
forall a. Show a => a -> String
show SomeException
e))
    ]

  where
    serverErrorToGRpcError :: ServerErrorCode -> GRPCStatusCode
    serverErrorToGRpcError :: ServerErrorCode -> GRPCStatusCode
serverErrorToGRpcError Unknown         = GRPCStatusCode
UNKNOWN
    serverErrorToGRpcError Unavailable     = GRPCStatusCode
UNAVAILABLE
    serverErrorToGRpcError Unimplemented   = GRPCStatusCode
UNIMPLEMENTED
    serverErrorToGRpcError Unauthenticated = GRPCStatusCode
UNAUTHENTICATED
    serverErrorToGRpcError Internal        = GRPCStatusCode
INTERNAL
    serverErrorToGRpcError NotFound        = GRPCStatusCode
NOT_FOUND
    serverErrorToGRpcError Invalid         = GRPCStatusCode
INVALID_ARGUMENT

-----
-- IMPLEMENTATION OF THE METHODS
-----

-- These type classes allow us to abstract over
-- the choice of message protocol (PB or Avro)

class GRPCOutput (RPCTy p) (GRpcOWTy p ref r)
      => GRpcOutputWrapper (p :: GRpcMessageProtocol) (ref :: TypeRef) (r :: Type) where
  type GRpcOWTy p ref r :: Type
  buildGRpcOWTy :: Proxy p -> Proxy ref -> r -> GRpcOWTy p ref r

instance ToProtoBufTypeRef ref r
         => GRpcOutputWrapper 'MsgProtoBuf ref r where
  type GRpcOWTy 'MsgProtoBuf ref r = ViaToProtoBufTypeRef ref r
  buildGRpcOWTy :: Proxy 'MsgProtoBuf -> Proxy ref -> r -> GRpcOWTy 'MsgProtoBuf ref r
buildGRpcOWTy _ _ = r -> GRpcOWTy 'MsgProtoBuf ref r
forall (ref :: TypeRef) t. t -> ViaToProtoBufTypeRef ref t
ViaToProtoBufTypeRef

instance (GRPCOutput AvroRPC (ViaToAvroTypeRef ('ViaSchema sch sty) r))
         => GRpcOutputWrapper 'MsgAvro ('ViaSchema sch sty) r where
  type GRpcOWTy 'MsgAvro ('ViaSchema sch sty) r = ViaToAvroTypeRef ('ViaSchema sch sty) r
  buildGRpcOWTy :: Proxy 'MsgAvro
-> Proxy ('ViaSchema sch sty)
-> r
-> GRpcOWTy 'MsgAvro ('ViaSchema sch sty) r
buildGRpcOWTy _ _ = r -> GRpcOWTy 'MsgAvro ('ViaSchema sch sty) r
forall (ref :: TypeRef) t. t -> ViaToAvroTypeRef ref t
ViaToAvroTypeRef

class GRPCInput (RPCTy p) (GRpcIWTy p ref r)
      => GRpcInputWrapper (p :: GRpcMessageProtocol) (ref :: TypeRef) (r :: Type) where
  type GRpcIWTy p ref r :: Type
  unGRpcIWTy :: Proxy p -> Proxy ref -> GRpcIWTy p ref r -> r

instance FromProtoBufTypeRef ref r
         => GRpcInputWrapper 'MsgProtoBuf ref r where
  type GRpcIWTy 'MsgProtoBuf ref r = ViaFromProtoBufTypeRef ref r
  unGRpcIWTy :: Proxy 'MsgProtoBuf -> Proxy ref -> GRpcIWTy 'MsgProtoBuf ref r -> r
unGRpcIWTy _ _ = GRpcIWTy 'MsgProtoBuf ref r -> r
forall (ref :: TypeRef) t. ViaFromProtoBufTypeRef ref t -> t
unViaFromProtoBufTypeRef

instance (GRPCInput AvroRPC (ViaFromAvroTypeRef ('ViaSchema sch sty) r))
         => GRpcInputWrapper 'MsgAvro ('ViaSchema sch sty) r where
  type GRpcIWTy 'MsgAvro ('ViaSchema sch sty) r = ViaFromAvroTypeRef ('ViaSchema sch sty) r
  unGRpcIWTy :: Proxy 'MsgAvro
-> Proxy ('ViaSchema sch sty)
-> GRpcIWTy 'MsgAvro ('ViaSchema sch sty) r
-> r
unGRpcIWTy _ _ = GRpcIWTy 'MsgAvro ('ViaSchema sch sty) r -> r
forall (ref :: TypeRef) t. ViaFromAvroTypeRef ref t -> t
unViaFromAvroTypeRef

---

instance (GRPCInput (RPCTy p) (), GRPCOutput (RPCTy p) ())
         => GRpcMethodHandler p m '[ ] 'RetNothing (m ()) where
  gRpcMethodHandler :: (forall a. m a -> ServerErrorIO a)
-> Proxy p
-> Proxy '[]
-> Proxy 'RetNothing
-> RPCTy p
-> m ()
-> ServiceHandler
gRpcMethodHandler f :: forall a. m a -> ServerErrorIO a
f _ _ _ rpc :: RPCTy p
rpc h :: m ()
h
    = RPCTy p -> UnaryHandler () () -> ServiceHandler
forall r i o.
(GRPCInput r i, GRPCOutput r o) =>
r -> UnaryHandler i o -> ServiceHandler
unary @_ @() @() RPCTy p
rpc (\_ _ -> ServerErrorIO () -> IO ()
forall (m :: * -> *) a. MonadIO m => ServerErrorIO a -> m a
raiseErrors (m () -> ServerErrorIO ()
forall a. m a -> ServerErrorIO a
f m ()
h))

-----

instance (GRPCInput (RPCTy p) (), GRpcOutputWrapper p rref r)
         => GRpcMethodHandler p m '[ ] ('RetSingle rref) (m r) where
  gRpcMethodHandler :: (forall a. m a -> ServerErrorIO a)
-> Proxy p
-> Proxy '[]
-> Proxy ('RetSingle rref)
-> RPCTy p
-> m r
-> ServiceHandler
gRpcMethodHandler f :: forall a. m a -> ServerErrorIO a
f _ _ _ rpc :: RPCTy p
rpc h :: m r
h
    = RPCTy p -> UnaryHandler () (GRpcOWTy p rref r) -> ServiceHandler
forall r i o.
(GRPCInput r i, GRPCOutput r o) =>
r -> UnaryHandler i o -> ServiceHandler
unary @_ @() @(GRpcOWTy p rref r)
            RPCTy p
rpc (\_ _ -> Proxy p -> Proxy rref -> r -> GRpcOWTy p rref r
forall (p :: GRpcMessageProtocol) (ref :: TypeRef) r.
GRpcOutputWrapper p ref r =>
Proxy p -> Proxy ref -> r -> GRpcOWTy p ref r
buildGRpcOWTy (Proxy p
forall k (t :: k). Proxy t
Proxy @p) (Proxy rref
forall k (t :: k). Proxy t
Proxy @rref) (r -> GRpcOWTy p rref r) -> IO r -> IO (GRpcOWTy p rref r)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> ServerErrorIO r -> IO r
forall (m :: * -> *) a. MonadIO m => ServerErrorIO a -> m a
raiseErrors (m r -> ServerErrorIO r
forall a. m a -> ServerErrorIO a
f m r
h))

-----

instance (GRPCInput (RPCTy p) (), GRpcOutputWrapper p rref r, MonadIO m)
         => GRpcMethodHandler p m '[ ] ('RetStream rref)
                              (ConduitT r Void m () -> m ()) where
  gRpcMethodHandler :: (forall a. m a -> ServerErrorIO a)
-> Proxy p
-> Proxy '[]
-> Proxy ('RetStream rref)
-> RPCTy p
-> (ConduitT r Void m () -> m ())
-> ServiceHandler
gRpcMethodHandler f :: forall a. m a -> ServerErrorIO a
f _ _ _ rpc :: RPCTy p
rpc h :: ConduitT r Void m () -> m ()
h
    = RPCTy p
-> ServerStreamHandler () (GRpcOWTy p rref r) () -> ServiceHandler
forall r i o a.
(GRPCInput r i, GRPCOutput r o) =>
r -> ServerStreamHandler i o a -> ServiceHandler
serverStream @_ @() @(GRpcOWTy p rref r) RPCTy p
rpc ServerStreamHandler () (GRpcOWTy p rref r) ()
forall req.
req -> () -> IO ((), ServerStream (GRpcOWTy p rref r) ())
sstream
    where sstream :: req -> ()
                  -> IO ((), ServerStream (GRpcOWTy p rref r) ())
          sstream :: req -> () -> IO ((), ServerStream (GRpcOWTy p rref r) ())
sstream _ _ = do
            -- Variable to connect input and output
            TMVar (Maybe r)
var <- IO (TMVar (Maybe r))
forall a. IO (TMVar a)
newEmptyTMVarIO :: IO (TMVar (Maybe r))
            -- Start executing the handler
            Async ()
promise <- IO () -> IO (Async ())
forall a. IO a -> IO (Async a)
async (ServerErrorIO () -> IO ()
forall (m :: * -> *) a. MonadIO m => ServerErrorIO a -> m a
raiseErrors (ServerErrorIO () -> IO ()) -> ServerErrorIO () -> IO ()
forall a b. (a -> b) -> a -> b
$ m () -> ServerErrorIO ()
forall a. m a -> ServerErrorIO a
f (ConduitT r Void m () -> m ()
h (TMVar (Maybe r) -> ConduitT r Void m ()
forall (m :: * -> *) r.
MonadIO m =>
TMVar (Maybe r) -> ConduitT r Void m ()
toTMVarConduit TMVar (Maybe r)
var)))
              -- Return the information
            let readNext :: () -> IO (Maybe ((), GRpcOWTy p rref r))
readNext _
                  = do Maybe r
nextOutput <- STM (Maybe r) -> IO (Maybe r)
forall a. STM a -> IO a
atomically (STM (Maybe r) -> IO (Maybe r)) -> STM (Maybe r) -> IO (Maybe r)
forall a b. (a -> b) -> a -> b
$ TMVar (Maybe r) -> STM (Maybe r)
forall a. TMVar a -> STM a
takeTMVar TMVar (Maybe r)
var
                       case Maybe r
nextOutput of
                         Just o :: r
o  -> Maybe ((), GRpcOWTy p rref r) -> IO (Maybe ((), GRpcOWTy p rref r))
forall (m :: * -> *) a. Monad m => a -> m a
return (Maybe ((), GRpcOWTy p rref r)
 -> IO (Maybe ((), GRpcOWTy p rref r)))
-> Maybe ((), GRpcOWTy p rref r)
-> IO (Maybe ((), GRpcOWTy p rref r))
forall a b. (a -> b) -> a -> b
$ ((), GRpcOWTy p rref r) -> Maybe ((), GRpcOWTy p rref r)
forall a. a -> Maybe a
Just ((), Proxy p -> Proxy rref -> r -> GRpcOWTy p rref r
forall (p :: GRpcMessageProtocol) (ref :: TypeRef) r.
GRpcOutputWrapper p ref r =>
Proxy p -> Proxy ref -> r -> GRpcOWTy p ref r
buildGRpcOWTy (Proxy p
forall k (t :: k). Proxy t
Proxy @p) (Proxy rref
forall k (t :: k). Proxy t
Proxy @rref) r
o)
                         Nothing -> do Async () -> IO ()
forall a. Async a -> IO ()
cancel Async ()
promise
                                       Maybe ((), GRpcOWTy p rref r) -> IO (Maybe ((), GRpcOWTy p rref r))
forall (m :: * -> *) a. Monad m => a -> m a
return Maybe ((), GRpcOWTy p rref r)
forall a. Maybe a
Nothing
            ((), ServerStream (GRpcOWTy p rref r) ())
-> IO ((), ServerStream (GRpcOWTy p rref r) ())
forall (m :: * -> *) a. Monad m => a -> m a
return ((), (() -> IO (Maybe ((), GRpcOWTy p rref r)))
-> ServerStream (GRpcOWTy p rref r) ()
forall o a. (a -> IO (Maybe (a, o))) -> ServerStream o a
ServerStream () -> IO (Maybe ((), GRpcOWTy p rref r))
readNext)

-----

instance (GRpcInputWrapper p vref v, GRPCOutput (RPCTy p) ())
         => GRpcMethodHandler p m '[ 'ArgSingle vref ] 'RetNothing (v -> m ()) where
  gRpcMethodHandler :: (forall a. m a -> ServerErrorIO a)
-> Proxy p
-> Proxy '[ 'ArgSingle vref]
-> Proxy 'RetNothing
-> RPCTy p
-> (v -> m ())
-> ServiceHandler
gRpcMethodHandler f :: forall a. m a -> ServerErrorIO a
f _ _ _ rpc :: RPCTy p
rpc h :: v -> m ()
h
    = RPCTy p -> UnaryHandler (GRpcIWTy p vref v) () -> ServiceHandler
forall r i o.
(GRPCInput r i, GRPCOutput r o) =>
r -> UnaryHandler i o -> ServiceHandler
unary @_ @(GRpcIWTy p vref v) @()
            RPCTy p
rpc (\_ -> ServerErrorIO () -> IO ()
forall (m :: * -> *) a. MonadIO m => ServerErrorIO a -> m a
raiseErrors (ServerErrorIO () -> IO ())
-> (GRpcIWTy p vref v -> ServerErrorIO ())
-> GRpcIWTy p vref v
-> IO ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. m () -> ServerErrorIO ()
forall a. m a -> ServerErrorIO a
f (m () -> ServerErrorIO ())
-> (GRpcIWTy p vref v -> m ())
-> GRpcIWTy p vref v
-> ServerErrorIO ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. v -> m ()
h (v -> m ())
-> (GRpcIWTy p vref v -> v) -> GRpcIWTy p vref v -> m ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Proxy p -> Proxy vref -> GRpcIWTy p vref v -> v
forall (p :: GRpcMessageProtocol) (ref :: TypeRef) r.
GRpcInputWrapper p ref r =>
Proxy p -> Proxy ref -> GRpcIWTy p ref r -> r
unGRpcIWTy (Proxy p
forall k (t :: k). Proxy t
Proxy @p) (Proxy vref
forall k (t :: k). Proxy t
Proxy @vref))

-----

instance (GRpcInputWrapper p vref v, GRpcOutputWrapper p rref r)
         => GRpcMethodHandler p m '[ 'ArgSingle vref ] ('RetSingle rref) (v -> m r) where
  gRpcMethodHandler :: (forall a. m a -> ServerErrorIO a)
-> Proxy p
-> Proxy '[ 'ArgSingle vref]
-> Proxy ('RetSingle rref)
-> RPCTy p
-> (v -> m r)
-> ServiceHandler
gRpcMethodHandler f :: forall a. m a -> ServerErrorIO a
f _ _ _ rpc :: RPCTy p
rpc h :: v -> m r
h
    = RPCTy p
-> UnaryHandler (GRpcIWTy p vref v) (GRpcOWTy p rref r)
-> ServiceHandler
forall r i o.
(GRPCInput r i, GRPCOutput r o) =>
r -> UnaryHandler i o -> ServiceHandler
unary @_ @(GRpcIWTy p vref v) @(GRpcOWTy p rref r)
            RPCTy p
rpc (\_ -> (Proxy p -> Proxy rref -> r -> GRpcOWTy p rref r
forall (p :: GRpcMessageProtocol) (ref :: TypeRef) r.
GRpcOutputWrapper p ref r =>
Proxy p -> Proxy ref -> r -> GRpcOWTy p ref r
buildGRpcOWTy (Proxy p
forall k (t :: k). Proxy t
Proxy @p) (Proxy rref
forall k (t :: k). Proxy t
Proxy @rref) (r -> GRpcOWTy p rref r) -> IO r -> IO (GRpcOWTy p rref r)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$>)
                       (IO r -> IO (GRpcOWTy p rref r))
-> (GRpcIWTy p vref v -> IO r)
-> GRpcIWTy p vref v
-> IO (GRpcOWTy p rref r)
forall b c a. (b -> c) -> (a -> b) -> a -> c
. ServerErrorIO r -> IO r
forall (m :: * -> *) a. MonadIO m => ServerErrorIO a -> m a
raiseErrors (ServerErrorIO r -> IO r)
-> (GRpcIWTy p vref v -> ServerErrorIO r)
-> GRpcIWTy p vref v
-> IO r
forall b c a. (b -> c) -> (a -> b) -> a -> c
. m r -> ServerErrorIO r
forall a. m a -> ServerErrorIO a
f (m r -> ServerErrorIO r)
-> (GRpcIWTy p vref v -> m r)
-> GRpcIWTy p vref v
-> ServerErrorIO r
forall b c a. (b -> c) -> (a -> b) -> a -> c
. v -> m r
h
                       (v -> m r) -> (GRpcIWTy p vref v -> v) -> GRpcIWTy p vref v -> m r
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Proxy p -> Proxy vref -> GRpcIWTy p vref v -> v
forall (p :: GRpcMessageProtocol) (ref :: TypeRef) r.
GRpcInputWrapper p ref r =>
Proxy p -> Proxy ref -> GRpcIWTy p ref r -> r
unGRpcIWTy (Proxy p
forall k (t :: k). Proxy t
Proxy @p) (Proxy vref
forall k (t :: k). Proxy t
Proxy @vref))

-----

instance (GRpcInputWrapper p vref v, GRpcOutputWrapper p rref r, MonadIO m)
         => GRpcMethodHandler p m '[ 'ArgStream vref ] ('RetSingle rref)
                              (ConduitT () v m () -> m r) where
  gRpcMethodHandler :: (forall a. m a -> ServerErrorIO a)
-> Proxy p
-> Proxy '[ 'ArgStream vref]
-> Proxy ('RetSingle rref)
-> RPCTy p
-> (ConduitT () v m () -> m r)
-> ServiceHandler
gRpcMethodHandler f :: forall a. m a -> ServerErrorIO a
f _ _ _ rpc :: RPCTy p
rpc h :: ConduitT () v m () -> m r
h
    = RPCTy p
-> ClientStreamHandler (GRpcIWTy p vref v) (GRpcOWTy p rref r) ()
-> ServiceHandler
forall r i o a.
(GRPCInput r i, GRPCOutput r o) =>
r -> ClientStreamHandler i o a -> ServiceHandler
clientStream @_ @(GRpcIWTy p vref v) @(GRpcOWTy p rref r)
                   RPCTy p
rpc ClientStreamHandler (GRpcIWTy p vref v) (GRpcOWTy p rref r) ()
forall req.
req
-> IO ((), ClientStream (GRpcIWTy p vref v) (GRpcOWTy p rref r) ())
cstream
    where cstream :: req
                  -> IO ((), ClientStream (GRpcIWTy p vref v)
                        (GRpcOWTy p rref r) ())
          cstream :: req
-> IO ((), ClientStream (GRpcIWTy p vref v) (GRpcOWTy p rref r) ())
cstream _ = do
            -- Create a new TMChan
            TMChan v
chan <- IO (TMChan v)
forall a. IO (TMChan a)
newTMChanIO :: IO (TMChan v)
            let producer :: ConduitT () v m ()
producer = TMChan v -> ConduitT () v m ()
forall (m :: * -> *) a. MonadIO m => TMChan a -> ConduitT () a m ()
sourceTMChan @m TMChan v
chan
            -- Start executing the handler in another thread
            Async (GRpcOWTy p rref r)
promise <- IO (GRpcOWTy p rref r) -> IO (Async (GRpcOWTy p rref r))
forall a. IO a -> IO (Async a)
async (ServerErrorIO (GRpcOWTy p rref r) -> IO (GRpcOWTy p rref r)
forall (m :: * -> *) a. MonadIO m => ServerErrorIO a -> m a
raiseErrors (ServerErrorIO (GRpcOWTy p rref r) -> IO (GRpcOWTy p rref r))
-> ServerErrorIO (GRpcOWTy p rref r) -> IO (GRpcOWTy p rref r)
forall a b. (a -> b) -> a -> b
$ Proxy p -> Proxy rref -> r -> GRpcOWTy p rref r
forall (p :: GRpcMessageProtocol) (ref :: TypeRef) r.
GRpcOutputWrapper p ref r =>
Proxy p -> Proxy ref -> r -> GRpcOWTy p ref r
buildGRpcOWTy (Proxy p
forall k (t :: k). Proxy t
Proxy @p) (Proxy rref
forall k (t :: k). Proxy t
Proxy @rref) (r -> GRpcOWTy p rref r)
-> ExceptT ServerError IO r -> ServerErrorIO (GRpcOWTy p rref r)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> m r -> ExceptT ServerError IO r
forall a. m a -> ServerErrorIO a
f (ConduitT () v m () -> m r
h ConduitT () v m ()
producer))
            -- Build the actual handler
            let cstreamHandler :: () -> GRpcIWTy p vref v -> IO ()
cstreamHandler _ newInput :: GRpcIWTy p vref v
newInput
                  = STM () -> IO ()
forall a. STM a -> IO a
atomically (TMChan v -> v -> STM ()
forall a. TMChan a -> a -> STM ()
writeTMChan TMChan v
chan (Proxy p -> Proxy vref -> GRpcIWTy p vref v -> v
forall (p :: GRpcMessageProtocol) (ref :: TypeRef) r.
GRpcInputWrapper p ref r =>
Proxy p -> Proxy ref -> GRpcIWTy p ref r -> r
unGRpcIWTy (Proxy p
forall k (t :: k). Proxy t
Proxy @p) (Proxy vref
forall k (t :: k). Proxy t
Proxy @vref) GRpcIWTy p vref v
newInput))
                cstreamFinalizer :: () -> IO (GRpcOWTy p rref r)
cstreamFinalizer _
                  = STM () -> IO ()
forall a. STM a -> IO a
atomically (TMChan v -> STM ()
forall a. TMChan a -> STM ()
closeTMChan TMChan v
chan) IO () -> IO (GRpcOWTy p rref r) -> IO (GRpcOWTy p rref r)
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> Async (GRpcOWTy p rref r) -> IO (GRpcOWTy p rref r)
forall a. Async a -> IO a
wait Async (GRpcOWTy p rref r)
promise
            -- Return the information
            ((), ClientStream (GRpcIWTy p vref v) (GRpcOWTy p rref r) ())
-> IO ((), ClientStream (GRpcIWTy p vref v) (GRpcOWTy p rref r) ())
forall (m :: * -> *) a. Monad m => a -> m a
return ((), (() -> GRpcIWTy p vref v -> IO ())
-> (() -> IO (GRpcOWTy p rref r))
-> ClientStream (GRpcIWTy p vref v) (GRpcOWTy p rref r) ()
forall i o a. (a -> i -> IO a) -> (a -> IO o) -> ClientStream i o a
ClientStream () -> GRpcIWTy p vref v -> IO ()
cstreamHandler () -> IO (GRpcOWTy p rref r)
cstreamFinalizer)

-----

instance (GRpcInputWrapper p vref v, GRpcOutputWrapper p rref r, MonadIO m)
         => GRpcMethodHandler p m '[ 'ArgSingle vref ] ('RetStream rref)
                              (v -> ConduitT r Void m () -> m ()) where
  gRpcMethodHandler :: (forall a. m a -> ServerErrorIO a)
-> Proxy p
-> Proxy '[ 'ArgSingle vref]
-> Proxy ('RetStream rref)
-> RPCTy p
-> (v -> ConduitT r Void m () -> m ())
-> ServiceHandler
gRpcMethodHandler f :: forall a. m a -> ServerErrorIO a
f _ _ _ rpc :: RPCTy p
rpc h :: v -> ConduitT r Void m () -> m ()
h
    = RPCTy p
-> ServerStreamHandler (GRpcIWTy p vref v) (GRpcOWTy p rref r) ()
-> ServiceHandler
forall r i o a.
(GRPCInput r i, GRPCOutput r o) =>
r -> ServerStreamHandler i o a -> ServiceHandler
serverStream @_ @(GRpcIWTy p vref v) @(GRpcOWTy p rref r)
                   RPCTy p
rpc ServerStreamHandler (GRpcIWTy p vref v) (GRpcOWTy p rref r) ()
forall req.
req
-> GRpcIWTy p vref v
-> IO ((), ServerStream (GRpcOWTy p rref r) ())
sstream
    where sstream :: req -> GRpcIWTy p vref v
                  -> IO ((), ServerStream (GRpcOWTy p rref r) ())
          sstream :: req
-> GRpcIWTy p vref v
-> IO ((), ServerStream (GRpcOWTy p rref r) ())
sstream _ v :: GRpcIWTy p vref v
v = do
            -- Variable to connect input and output
            TMVar (Maybe r)
var <- IO (TMVar (Maybe r))
forall a. IO (TMVar a)
newEmptyTMVarIO :: IO (TMVar (Maybe r))
            -- Start executing the handler
            let v' :: v
v' = Proxy p -> Proxy vref -> GRpcIWTy p vref v -> v
forall (p :: GRpcMessageProtocol) (ref :: TypeRef) r.
GRpcInputWrapper p ref r =>
Proxy p -> Proxy ref -> GRpcIWTy p ref r -> r
unGRpcIWTy (Proxy p
forall k (t :: k). Proxy t
Proxy @p) (Proxy vref
forall k (t :: k). Proxy t
Proxy @vref) GRpcIWTy p vref v
v
            Async ()
promise <- IO () -> IO (Async ())
forall a. IO a -> IO (Async a)
async (ServerErrorIO () -> IO ()
forall (m :: * -> *) a. MonadIO m => ServerErrorIO a -> m a
raiseErrors (ServerErrorIO () -> IO ()) -> ServerErrorIO () -> IO ()
forall a b. (a -> b) -> a -> b
$ m () -> ServerErrorIO ()
forall a. m a -> ServerErrorIO a
f (v -> ConduitT r Void m () -> m ()
h v
v' (TMVar (Maybe r) -> ConduitT r Void m ()
forall (m :: * -> *) r.
MonadIO m =>
TMVar (Maybe r) -> ConduitT r Void m ()
toTMVarConduit TMVar (Maybe r)
var)))
              -- Return the information
            let readNext :: () -> IO (Maybe ((), GRpcOWTy p rref r))
readNext _
                  = do Maybe r
nextOutput <- STM (Maybe r) -> IO (Maybe r)
forall a. STM a -> IO a
atomically (STM (Maybe r) -> IO (Maybe r)) -> STM (Maybe r) -> IO (Maybe r)
forall a b. (a -> b) -> a -> b
$ TMVar (Maybe r) -> STM (Maybe r)
forall a. TMVar a -> STM a
takeTMVar TMVar (Maybe r)
var
                       case Maybe r
nextOutput of
                         Just o :: r
o  -> Maybe ((), GRpcOWTy p rref r) -> IO (Maybe ((), GRpcOWTy p rref r))
forall (m :: * -> *) a. Monad m => a -> m a
return (Maybe ((), GRpcOWTy p rref r)
 -> IO (Maybe ((), GRpcOWTy p rref r)))
-> Maybe ((), GRpcOWTy p rref r)
-> IO (Maybe ((), GRpcOWTy p rref r))
forall a b. (a -> b) -> a -> b
$ ((), GRpcOWTy p rref r) -> Maybe ((), GRpcOWTy p rref r)
forall a. a -> Maybe a
Just ((), Proxy p -> Proxy rref -> r -> GRpcOWTy p rref r
forall (p :: GRpcMessageProtocol) (ref :: TypeRef) r.
GRpcOutputWrapper p ref r =>
Proxy p -> Proxy ref -> r -> GRpcOWTy p ref r
buildGRpcOWTy (Proxy p
forall k (t :: k). Proxy t
Proxy @p) (Proxy rref
forall k (t :: k). Proxy t
Proxy @rref) r
o)
                         Nothing -> do Async () -> IO ()
forall a. Async a -> IO ()
cancel Async ()
promise
                                       Maybe ((), GRpcOWTy p rref r) -> IO (Maybe ((), GRpcOWTy p rref r))
forall (m :: * -> *) a. Monad m => a -> m a
return Maybe ((), GRpcOWTy p rref r)
forall a. Maybe a
Nothing
            ((), ServerStream (GRpcOWTy p rref r) ())
-> IO ((), ServerStream (GRpcOWTy p rref r) ())
forall (m :: * -> *) a. Monad m => a -> m a
return ((), (() -> IO (Maybe ((), GRpcOWTy p rref r)))
-> ServerStream (GRpcOWTy p rref r) ()
forall o a. (a -> IO (Maybe (a, o))) -> ServerStream o a
ServerStream () -> IO (Maybe ((), GRpcOWTy p rref r))
readNext)

-----

instance (GRpcInputWrapper p vref v, GRpcOutputWrapper p rref r, MonadIO m)
         => GRpcMethodHandler p m '[ 'ArgStream vref ] ('RetStream rref)
                              (ConduitT () v m () -> ConduitT r Void m () -> m ()) where
  gRpcMethodHandler :: (forall a. m a -> ServerErrorIO a)
-> Proxy p
-> Proxy '[ 'ArgStream vref]
-> Proxy ('RetStream rref)
-> RPCTy p
-> (ConduitT () v m () -> ConduitT r Void m () -> m ())
-> ServiceHandler
gRpcMethodHandler f :: forall a. m a -> ServerErrorIO a
f _ _ _ rpc :: RPCTy p
rpc h :: ConduitT () v m () -> ConduitT r Void m () -> m ()
h
    = RPCTy p
-> GeneralStreamHandler
     (GRpcIWTy p vref v) (GRpcOWTy p rref r) () ()
-> ServiceHandler
forall r i o a b.
(GRPCInput r i, GRPCOutput r o) =>
r -> GeneralStreamHandler i o a b -> ServiceHandler
generalStream @_ @(GRpcIWTy p vref v) @(GRpcOWTy p rref r)
                    RPCTy p
rpc GeneralStreamHandler (GRpcIWTy p vref v) (GRpcOWTy p rref r) () ()
forall req.
req
-> IO
     ((), IncomingStream (GRpcIWTy p vref v) (), (),
      OutgoingStream (GRpcOWTy p rref r) ())
bdstream
    where bdstream :: req -> IO ( (), IncomingStream (GRpcIWTy p vref v) ()
                                , (), OutgoingStream (GRpcOWTy p rref r) () )
          bdstream :: req
-> IO
     ((), IncomingStream (GRpcIWTy p vref v) (), (),
      OutgoingStream (GRpcOWTy p rref r) ())
bdstream _ = do
            -- Create a new TMChan and a new variable
            TMChan v
chan <- IO (TMChan v)
forall a. IO (TMChan a)
newTMChanIO :: IO (TMChan v)
            let producer :: ConduitT () v m ()
producer = TMChan v -> ConduitT () v m ()
forall (m :: * -> *) a. MonadIO m => TMChan a -> ConduitT () a m ()
sourceTMChan @m TMChan v
chan
            TMVar (Maybe r)
var <- IO (TMVar (Maybe r))
forall a. IO (TMVar a)
newEmptyTMVarIO :: IO (TMVar (Maybe r))
            -- Start executing the handler
            Async ()
promise <- IO () -> IO (Async ())
forall a. IO a -> IO (Async a)
async (ServerErrorIO () -> IO ()
forall (m :: * -> *) a. MonadIO m => ServerErrorIO a -> m a
raiseErrors (ServerErrorIO () -> IO ()) -> ServerErrorIO () -> IO ()
forall a b. (a -> b) -> a -> b
$ m () -> ServerErrorIO ()
forall a. m a -> ServerErrorIO a
f (m () -> ServerErrorIO ()) -> m () -> ServerErrorIO ()
forall a b. (a -> b) -> a -> b
$ ConduitT () v m () -> ConduitT r Void m () -> m ()
h ConduitT () v m ()
producer (TMVar (Maybe r) -> ConduitT r Void m ()
forall (m :: * -> *) r.
MonadIO m =>
TMVar (Maybe r) -> ConduitT r Void m ()
toTMVarConduit TMVar (Maybe r)
var))
            -- Build the actual handler
            let cstreamHandler :: () -> GRpcIWTy p vref v -> IO ()
cstreamHandler _ newInput :: GRpcIWTy p vref v
newInput
                  = STM () -> IO ()
forall a. STM a -> IO a
atomically (TMChan v -> v -> STM ()
forall a. TMChan a -> a -> STM ()
writeTMChan TMChan v
chan (Proxy p -> Proxy vref -> GRpcIWTy p vref v -> v
forall (p :: GRpcMessageProtocol) (ref :: TypeRef) r.
GRpcInputWrapper p ref r =>
Proxy p -> Proxy ref -> GRpcIWTy p ref r -> r
unGRpcIWTy (Proxy p
forall k (t :: k). Proxy t
Proxy @p) (Proxy vref
forall k (t :: k). Proxy t
Proxy @vref) GRpcIWTy p vref v
newInput))
                cstreamFinalizer :: () -> IO ()
cstreamFinalizer _
                  = STM () -> IO ()
forall a. STM a -> IO a
atomically (TMChan v -> STM ()
forall a. TMChan a -> STM ()
closeTMChan TMChan v
chan) IO () -> IO () -> IO ()
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> Async () -> IO ()
forall a. Async a -> IO a
wait Async ()
promise
                readNext :: () -> IO (Maybe ((), GRpcOWTy p rref r))
readNext _
                  = do Maybe (Maybe r)
nextOutput <- STM (Maybe (Maybe r)) -> IO (Maybe (Maybe r))
forall a. STM a -> IO a
atomically (STM (Maybe (Maybe r)) -> IO (Maybe (Maybe r)))
-> STM (Maybe (Maybe r)) -> IO (Maybe (Maybe r))
forall a b. (a -> b) -> a -> b
$ TMVar (Maybe r) -> STM (Maybe (Maybe r))
forall a. TMVar a -> STM (Maybe a)
tryTakeTMVar TMVar (Maybe r)
var
                       case Maybe (Maybe r)
nextOutput of
                         Just (Just o :: r
o) ->
                           Maybe ((), GRpcOWTy p rref r) -> IO (Maybe ((), GRpcOWTy p rref r))
forall (m :: * -> *) a. Monad m => a -> m a
return (Maybe ((), GRpcOWTy p rref r)
 -> IO (Maybe ((), GRpcOWTy p rref r)))
-> Maybe ((), GRpcOWTy p rref r)
-> IO (Maybe ((), GRpcOWTy p rref r))
forall a b. (a -> b) -> a -> b
$ ((), GRpcOWTy p rref r) -> Maybe ((), GRpcOWTy p rref r)
forall a. a -> Maybe a
Just ((), Proxy p -> Proxy rref -> r -> GRpcOWTy p rref r
forall (p :: GRpcMessageProtocol) (ref :: TypeRef) r.
GRpcOutputWrapper p ref r =>
Proxy p -> Proxy ref -> r -> GRpcOWTy p ref r
buildGRpcOWTy (Proxy p
forall k (t :: k). Proxy t
Proxy @p) (Proxy rref
forall k (t :: k). Proxy t
Proxy @rref) r
o)
                         Just Nothing  -> do
                           Async () -> IO ()
forall a. Async a -> IO ()
cancel Async ()
promise
                           Maybe ((), GRpcOWTy p rref r) -> IO (Maybe ((), GRpcOWTy p rref r))
forall (m :: * -> *) a. Monad m => a -> m a
return Maybe ((), GRpcOWTy p rref r)
forall a. Maybe a
Nothing
                         Nothing -> -- no new elements to output
                           () -> IO (Maybe ((), GRpcOWTy p rref r))
readNext ()
            ((), IncomingStream (GRpcIWTy p vref v) (), (),
 OutgoingStream (GRpcOWTy p rref r) ())
-> IO
     ((), IncomingStream (GRpcIWTy p vref v) (), (),
      OutgoingStream (GRpcOWTy p rref r) ())
forall (m :: * -> *) a. Monad m => a -> m a
return ((), (() -> GRpcIWTy p vref v -> IO ())
-> (() -> IO ()) -> IncomingStream (GRpcIWTy p vref v) ()
forall i a. (a -> i -> IO a) -> (a -> IO ()) -> IncomingStream i a
IncomingStream () -> GRpcIWTy p vref v -> IO ()
cstreamHandler () -> IO ()
cstreamFinalizer, (), (() -> IO (Maybe ((), GRpcOWTy p rref r)))
-> OutgoingStream (GRpcOWTy p rref r) ()
forall o a. (a -> IO (Maybe (a, o))) -> OutgoingStream o a
OutgoingStream () -> IO (Maybe ((), GRpcOWTy p rref r))
readNext)

-----

toTMVarConduit :: MonadIO m => TMVar (Maybe r) -> ConduitT r Void m ()
toTMVarConduit :: TMVar (Maybe r) -> ConduitT r Void m ()
toTMVarConduit var :: TMVar (Maybe r)
var = do
  Maybe r
x <- ConduitT r Void m (Maybe r)
forall (m :: * -> *) i. Monad m => Consumer i m (Maybe i)
await
  IO () -> ConduitT r Void m ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> ConduitT r Void m ()) -> IO () -> ConduitT r Void m ()
forall a b. (a -> b) -> a -> b
$ STM () -> IO ()
forall a. STM a -> IO a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ TMVar (Maybe r) -> Maybe r -> STM ()
forall a. TMVar a -> a -> STM ()
putTMVar TMVar (Maybe r)
var Maybe r
x
  TMVar (Maybe r) -> ConduitT r Void m ()
forall (m :: * -> *) r.
MonadIO m =>
TMVar (Maybe r) -> ConduitT r Void m ()
toTMVarConduit TMVar (Maybe r)
var