{-# LANGUAGE DerivingVia, DeriveGeneric, RankNTypes, ScopedTypeVariables, MultiParamTypeClasses, OverloadedStrings, GeneralizedNewtypeDeriving, CPP, ExistentialQuantification, StandaloneDeriving, GADTs, UnboxedTuples, BangPatterns #-}
{-# OPTIONS_GHC -fno-warn-orphans #-}
{- HLINT ignore "Use lambda-case" -}
module Network.RPC.Curryer.Server where
import qualified Streamly.Data.Stream.Prelude as SP
import Streamly.Data.Stream as Stream hiding (foldr)
import Streamly.Internal.Data.Stream.Concurrent as Stream
import Streamly.Internal.Serialize.FromBytes (word32be)
import Streamly.Network.Socket as SSock
import Network.Socket as Socket
import Network.Socket.ByteString as Socket
import Streamly.Data.Parser as P
import Codec.Winery
import Codec.Winery.Internal (varInt, decodeVarInt, getBytes)
import GHC.Generics
import GHC.Fingerprint
import Data.Typeable
import Control.Concurrent.MVar (MVar, newMVar, withMVar)
import Control.Exception
import Data.Function ((&))
import Data.Word
import qualified Data.ByteString as BS
import qualified Data.ByteString.Lazy as BSL
import qualified Data.ByteString.FastBuilder as BB
import Streamly.Data.Fold as FL hiding (foldr)
--import qualified Streamly.Internal.Data.Stream.IsStream as P
import qualified Streamly.Data.Stream.Prelude as P
import qualified Streamly.External.ByteString as StreamlyBS
import qualified Data.Binary as B
import qualified Data.UUID as UUIDBase
import qualified Data.UUID.V4 as UUIDBase
import Control.Monad
import Data.Functor
import Control.Applicative

import qualified Network.RPC.Curryer.StreamlyAdditions as SA
import Data.Hashable
import System.Timeout
import qualified Network.ByteOrder as BO
import qualified Streamly.Internal.Data.Array.Type as Arr


#define CURRYER_SHOW_BYTES 0
#define CURRYER_PASS_SCHEMA 0

#if CURRYER_SHOW_BYTES == 1
import Debug.Trace
#endif

-- | Debugging hook for dumping raw bytes.
--
-- When @CURRYER_SHOW_BYTES@ is defined as @1@ the label, the length of the
-- buffer and the buffer itself are traced via 'traceShowM'; otherwise the
-- arguments are ignored and nothing happens.
traceBytes :: Applicative f => String -> BS.ByteString -> f ()
#if CURRYER_SHOW_BYTES == 1
traceBytes msg bs = traceShowM (msg, BS.length bs, bs)
#else
traceBytes _ _ = pure ()
#endif

-- | Serialise a message payload.
--
-- A level of indirection so that serialising with or without the winery
-- schema can be switched in one place: with @CURRYER_PASS_SCHEMA@ set to @1@
-- the schema is included ('serialise'), otherwise only the value is written
-- ('serialiseOnly').
msgSerialise :: Serialise a => a -> BS.ByteString
#if CURRYER_PASS_SCHEMA == 1
msgSerialise = serialise
#else
msgSerialise = serialiseOnly
#endif

-- | Decode a message payload; the counterpart of 'msgSerialise'.
--
-- With @CURRYER_PASS_SCHEMA@ set to @1@ the input is decoded with
-- 'deserialise', otherwise with 'deserialiseOnly'. Decoding failures are
-- returned as a 'WineryException' in the 'Left' case.
msgDeserialise :: forall s. Serialise s => BS.ByteString -> Either WineryException s
#if CURRYER_PASS_SCHEMA == 1
msgDeserialise = deserialise
#else
msgDeserialise = deserialiseOnly
#endif

-- | A value paired with an 'MVar' that serves as its lock.
data Locking a = Locking (MVar ()) a

-- | Wrap a value together with a fresh lock.
newLock :: a -> IO (Locking a)
newLock x = do
  lock <- newMVar ()
  pure (Locking lock x)

-- | Run an action on the wrapped value while holding its lock.
--
-- The lock is taken and released by 'withMVar' around the action.
withLock :: Locking a -> (a -> IO b) -> IO b
withLock (Locking mvar v) m =
  withMVar mvar $ \_ -> m v

-- | Access the wrapped value without taking the lock.
lockless :: Locking a -> a
lockless (Locking _ a) = a

-- | A timeout held as an unsigned 32-bit value.
-- NOTE(review): the unit is not stated here and nothing in the visible part
-- of this module uses the alias -- confirm before relying on it.
type Timeout = Word32

-- | A serialised message payload, held as a strict 'BS.ByteString'.
type BinaryMessage = BS.ByteString

-- | The unit that travels over the wire.
--
-- It includes the fingerprint of the incoming data type (held in the
-- 'BinaryMessage') to determine how to dispatch the message.
--
-- TODO: add another envelope type for unencoded binary messages as an easy
-- optimization for in-process communication.
data Envelope = Envelope {
  envFingerprint :: !Fingerprint,   -- ^ fingerprint of the payload's type
  envMessageType :: !MessageType,   -- ^ request / response marker
  envMsgId :: !UUID,                -- ^ identifier of this message
  envPayload :: !BinaryMessage      -- ^ serialised payload bytes
  }
  deriving (Generic, Show)

-- | A timeout in microseconds (going by the name); this is the value carried
-- by a 'RequestMessage'.
type TimeoutMicroseconds = Int

-- Orphan instances for 'Fingerprint' (hence @-fno-warn-orphans@ above).
-- The 'Generic' instance is only derived here for base versions before 4.15;
-- presumably newer base versions already provide one -- confirm.
#if MIN_VERSION_base(4,15,0)
#else
deriving instance Generic Fingerprint
#endif
-- 'Serialise' for 'Fingerprint' is obtained through winery's 'WineryVariant' wrapper.
deriving via WineryVariant Fingerprint instance Serialise Fingerprint

-- | Internal type used to mark envelope types.
--
-- On the wire ('encodeEnvelope' / 'msgTypeP') each constructor is written as
-- a one-byte tag: 0, 1, 2 and 3 in declaration order. A request additionally
-- carries its timeout.
data MessageType = RequestMessage TimeoutMicroseconds -- ^ a request, with its timeout
                 | ResponseMessage                    -- ^ tag 1
                 | TimeoutResponseMessage             -- ^ tag 2
                 | ExceptionResponseMessage           -- ^ tag 3
                 deriving (Generic, Show)
                 deriving Serialise via WineryVariant MessageType

-- | A list of 'RequestHandler's, all sharing the same @serverState@ type.
-- This is the form in which handlers are passed to 'serve'.
type RequestHandlers serverState = [RequestHandler serverState]

-- | Server-side request handlers. They come in two forms: synchronous, where
-- the client waits for a return value, and asynchronous, where the client
-- does not wait for one.
data RequestHandler serverState where
  -- | A handler that takes a request of type @a@ and produces a response of
  -- type @b@.
  RequestHandler
    :: forall a b serverState. (Serialise a, Serialise b)
    => (ConnectionState serverState -> a -> IO b)
    -> RequestHandler serverState
  -- | An asynchronous handler: the client neither expects nor awaits a
  -- response.
  AsyncRequestHandler
    :: forall a serverState. Serialise a
    => (ConnectionState serverState -> a -> IO ())
    -> RequestHandler serverState

-- | Server state sent in via 'serve' and passed to 'RequestHandler's.
data ConnectionState a = ConnectionState {
  connectionServerState :: a,        -- ^ the user-supplied server state
  connectionSocket :: Locking Socket -- ^ the connection's socket together with its lock
  }

-- | Used by server-side request handlers to send additional messages to the
-- client. This is useful for sending asynchronous responses to the client
-- outside of the normal request-response flow. The locking socket can be
-- found in the 'ConnectionState' when a request handler is called.
--
-- The message is wrapped in a 'RequestMessage' envelope carrying a freshly
-- generated random message id and a timeout of 0.
sendMessage :: Serialise a => Locking Socket -> a -> IO ()
sendMessage lockSock msg = do
  requestID <- UUID <$> UUIDBase.nextRandom
  let timeout' = 0
      env = Envelope (fingerprint msg) (RequestMessage timeout') requestID (msgSerialise msg)
  sendEnvelope env lockSock
  
-- | Wrapper around 'UUIDBase.UUID' so that the 'Serialise' instance can be
-- defined on a local type and an orphan instance is avoided.
newtype UUID = UUID { _unUUID :: UUIDBase.UUID }
  deriving (Show, Eq, B.Binary, Hashable)

-- | Winery encoding of 'UUID': the schema is a byte string tagged
-- @"Data.UUID"@ and the value is the 'B.Binary' encoding of the UUID written
-- as a varint length followed by the bytes.
instance Serialise UUID where
  schemaGen _ = pure (STag (TagStr "Data.UUID") SBytes)
  toBuilder uuid =
    let bytes = BSL.toStrict (B.encode uuid)
    in varInt (BS.length bytes) <> BB.byteString bytes
  {-# INLINE toBuilder #-}
  extractor = mkExtractor $
    \schema' -> case schema' of
      STag (TagStr "Data.UUID") SBytes ->
        pure $ \term -> case term of
          -- NOTE(review): 'B.decode' is partial and calls 'error' on
          -- malformed input.
          TBytes bs -> B.decode (BSL.fromStrict bs)
          term' -> throw (InvalidTerm term')
      -- any other schema cannot be read as a UUID
      x -> error $ "invalid schema element " <> show x
  -- mirror of 'toBuilder': read the varint length, then that many bytes
  decodeCurrent = B.decode . BSL.fromStrict <$> (decodeVarInt >>= getBytes)

-- | Errors from remote calls.
data ConnectionError = CodecError String -- ^ show of WineryException from exception initiator which cannot otherwise be transmitted over a line due to dependencies on TypeReps
                     | TimeoutError
                     | ExceptionError String
                     deriving (Generic, Show, Eq)
                     deriving Serialise via WineryVariant ConnectionError

-- | Exception value representing a timeout.
data TimeoutException = TimeoutException
  deriving Show

instance Exception TimeoutException

-- | A host address as four octets, e.g. @(127,0,0,1)@.
type HostAddr = (Word8, Word8, Word8, Word8)

-- | A 'Parser' that consumes bytes ('Word8') and runs in 'IO'.
type BParser a = Parser Word8 IO a

-- | @0.0.0.0@ and @127.0.0.1@ respectively.
allHostAddrs, localHostAddr :: HostAddr
allHostAddrs = (0, 0, 0, 0)
localHostAddr = (127, 0, 0, 1)


-- | Parse the message-type marker written by 'encodeEnvelope': a one-byte tag
-- (0 = request, 1 = response, 2 = timeout response, 3 = exception response).
-- A request tag is followed by its 32-bit timeout.
msgTypeP :: BParser MessageType
msgTypeP =
  (P.satisfy (== 0) *> (RequestMessage . fromIntegral <$> word32P)) <|>
  (P.satisfy (== 1) $> ResponseMessage) <|>
  (P.satisfy (== 2) $> TimeoutResponseMessage) <|>
  (P.satisfy (== 3) $> ExceptionResponseMessage)
                 
-- | Parse one 'Envelope': fingerprint, message type, message id and then the
-- payload, which is length-prefixed by a 32-bit unsigned big-endian length.
envelopeP :: BParser Envelope
envelopeP = do
  let lenPrefixedByteStringP = do
        c <- fromIntegral <$> word32be
        -- streamly can't handle takeEQ 0, so an empty payload is special-cased
        if c == 0
          then pure mempty
          else do
            -- NOTE(review): the length comes straight off the wire and is not
            -- bounded here, so the peer decides how large a buffer is requested.
            ps <- P.takeEQ c (Arr.writeN c)
            -- force the conversion before returning
            let !bs = StreamlyBS.fromArray ps
            pure bs
  Envelope <$> fingerprintP <*> msgTypeP <*> uuidP <*> lenPrefixedByteStringP

-- | Render an 'Envelope' into its wire format. Fields are written in this
-- order, multi-byte numbers in network byte order:
--
-- * fingerprint: 16 bytes (two 64-bit words)
-- * message type: 1 tag byte, plus a 4-byte timeout for request messages only
-- * message id: 16 bytes (the UUID as four 32-bit words)
-- * payload length: 4 bytes
-- * payload
--
-- The overhead is therefore 41 bytes per request message and 37 bytes for all
-- other message types.
--
-- NOTE(review): both the timeout and the payload length are narrowed to
-- 'Word32' with 'fromIntegral', so values outside that range wrap silently.
encodeEnvelope :: Envelope -> BS.ByteString
encodeEnvelope (Envelope (Fingerprint fp1 fp2) msgType msgId bs) =
  completeMessage
  where
    completeMessage = fingerprintBs <> msgTypeBs <> msgIdBs <> lenPrefixedBs
    fingerprintBs = BO.bytestring64 fp1 <> BO.bytestring64 fp2
    msgTypeBs = case msgType of
      RequestMessage timeoutMicros -> BS.singleton 0 <> BO.bytestring32 (fromIntegral timeoutMicros)
      ResponseMessage -> BS.singleton 1
      TimeoutResponseMessage -> BS.singleton 2
      ExceptionResponseMessage -> BS.singleton 3
    msgIdBs =
      case UUIDBase.toWords (_unUUID msgId) of
        (u1, u2, u3, u4) -> foldMap BO.bytestring32 [u1, u2, u3, u4]
    lenPrefixedBs = BO.bytestring32 payloadLen <> bs
    payloadLen = fromIntegral (BS.length bs)
    
    

-- | Parse a 'Fingerprint' as two consecutive 64-bit words (16 bytes), the
-- counterpart of the fingerprint section written by 'encodeEnvelope'.
fingerprintP :: BParser Fingerprint
fingerprintP = Fingerprint <$> word64P <*> word64P

-- | Parse a 64-bit integer: take exactly 8 bytes and decode them with
-- 'BO.word64'.
word64P :: BParser Word64
word64P = BO.word64 . BS.pack <$> P.takeEQ 8 FL.toList

-- | Parse a 32-bit integer from network byte order: take exactly 4 bytes and
-- decode them with 'BO.word32'.
word32P :: BParser Word32
word32P = BO.word32 . BS.pack <$> P.takeEQ 4 FL.toList

-- | Parse a 'UUID'. It is encoded as four 32-bit words because of the
-- convenient 32-bit tuple encoding offered by 'UUIDBase.toWords' and
-- 'UUIDBase.fromWords'.
uuidP :: BParser UUID
uuidP = do
  u1 <- word32P
  u2 <- word32P
  u3 <- word32P
  UUID . UUIDBase.fromWords u1 u2 u3 <$> word32P

-- | An 'IO' action that may produce a message.
-- NOTE(review): not used in the visible part of this module; presumably run
-- when a connection is established -- confirm against the call site.
type NewConnectionHandler msg = IO (Maybe msg)

-- | An 'IO' function from a request to a response.
-- NOTE(review): not used in the visible part of this module -- confirm usage.
type NewMessageHandler req resp = req -> IO resp

-- | Listen for new connections and handle requests which are passed the server state 's'.
-- The @MVar SockAddr@ can optionally be used to know when the server is ready for
-- processing requests (it is handed to 'SA.acceptorOnAddr').
--
-- Every accepted socket is processed through 'Stream.parMapM': it is wrapped in a
-- lock, its messages are drained through 'serverEnvelopeHandler', and the socket is
-- closed once its message stream ends or fails.
--
-- Returns 'True' after the stream of accepted sockets has been fully consumed.
serve ::
         RequestHandlers s->
         s ->
         HostAddr ->
         PortNumber ->
         Maybe (MVar SockAddr) ->
         IO Bool
serve userMsgHandlers serverState hostaddr port mSockLock = do
  let handleSock sock =
        -- Close the accepted socket deterministically, even when draining throws;
        -- otherwise each finished connection leaks its descriptor.
        (do lockingSocket <- newLock sock
            drainSocketMessages sock (serverEnvelopeHandler lockingSocket userMsgHandlers serverState))
          `Control.Exception.finally` Socket.close sock
  Stream.unfold (SA.acceptorOnAddr [(ReuseAddr, 1)] mSockLock) (hostaddr, port)
   & Stream.parMapM id handleSock
   & Stream.fold FL.drain
  pure True

-- | Try to decode the payload of an 'Envelope' as a value of type @s@.
--
-- Yields 'Nothing' when the envelope's fingerprint differs from the fingerprint
-- of @s@, or when 'msgDeserialise' reports an error; otherwise the decoded value.
openEnvelope :: forall s. (Serialise s, Typeable s) => Envelope -> Maybe s
openEnvelope (Envelope eprint _ _ bytes)
  | eprint /= wantedPrint = Nothing
  | otherwise = either (const Nothing) Just (msgDeserialise bytes)
  where
    -- only the type of the argument is inspected, so 'undefined' is never forced
    wantedPrint = fingerprint (undefined :: s)

-- | Use winery to decode only the data structure and skip the schema: the
-- decoder is built from the schema of the target type @s@ itself and then run
-- over the given bytes. Fails with a 'WineryException' if no decoder can be built.
deserialiseOnly :: forall s. Serialise s => BS.ByteString -> Either WineryException s
deserialiseOnly bytes =
  fmap (\decoder -> evalDecoder decoder bytes) (getDecoder ownSchema)
  where
    ownSchema = schema (Proxy :: Proxy s)


-- | Pair a handler with the request decoded from an envelope.
--
-- The envelope is opened as a value of the handler's argument type @a@; when
-- that succeeds the handler is returned together with the decoded value,
-- otherwise the result is 'Nothing'.
matchEnvelope :: forall a b s. (Serialise a, Serialise b, Typeable b) =>
              Envelope ->
              (ConnectionState s -> a -> IO b) ->
              Maybe (ConnectionState s -> a -> IO b, a)
matchEnvelope envelope dispatchf =
  fmap (\request -> (dispatchf, request)) (openEnvelope envelope :: Maybe a)

-- | Called by `serve` to process a single incoming envelope.
--
-- Response-type envelopes (timeout, exception and plain responses) are ignored.
-- For a request envelope the handlers are tried in order and the first one whose
-- argument type matches the envelope is run:
--
-- * a 'RequestHandler' result is sent back as a 'ResponseMessage'; if the handler
--   exceeds the request's timeout (a timeout of 0 disables the limit) or throws a
--   'TimeoutException', a 'TimeoutResponseMessage' is sent instead;
-- * an 'AsyncRequestHandler' is run and nothing is sent back.
--
-- Any other exception raised while dispatching is caught and reported to the peer
-- as an 'ExceptionResponseMessage' carrying the shown exception.
serverEnvelopeHandler ::
                     Locking Socket
                     -> RequestHandlers s
                     -> s
                     -> Envelope
                     -> IO ()
serverEnvelopeHandler _ _ _ (Envelope _ TimeoutResponseMessage _ _) = pure ()
serverEnvelopeHandler _ _ _ (Envelope _ ExceptionResponseMessage _ _) = pure ()
serverEnvelopeHandler _ _ _ (Envelope _ ResponseMessage _ _) = pure ()
serverEnvelopeHandler sockLock msgHandlers serverState envelope@(Envelope _ (RequestMessage timeoutms) msgId _) = do
  outcome <- try (foldM_ tryHandler Nothing msgHandlers) :: IO (Either SomeException ())
  case outcome of
    Right () -> pure ()
    Left exc -> do
      let rendered = show exc
      sendEnvelope (Envelope (fingerprint rendered) ExceptionResponseMessage msgId (msgSerialise rendered)) sockLock
  where
    -- run an action under the request's time limit; 0 means "no limit"
    withDeadline :: IO b -> IO (Maybe b)
    withDeadline action
      | timeoutms == 0 = (Just <$> action) `catch` onTimeoutExc
      | otherwise = timeout timeoutms action `catch` onTimeoutExc

    --allow server-side function to throw TimeoutError which is caught here and becomes TimeoutError value
    onTimeoutExc :: TimeoutException -> IO (Maybe b)
    onTimeoutExc _ = pure Nothing

    connState = ConnectionState
      { connectionServerState = serverState
      , connectionSocket = sockLock
      }

    -- fold step: the accumulator stays Nothing until the first handler matches
    tryHandler Nothing (RequestHandler handlerFn) =
      case matchEnvelope envelope handlerFn of
        Nothing -> pure Nothing
        Just (run, request) -> do
          --TODO add exception handling
          mReply <- withDeadline (run connState request)
          let timedOut = Envelope (fingerprint TimeoutError) TimeoutResponseMessage msgId BS.empty
              answered reply = Envelope (fingerprint reply) ResponseMessage msgId (msgSerialise reply)
          sendEnvelope (maybe timedOut answered mReply) sockLock
          pure (Just ())
    tryHandler Nothing (AsyncRequestHandler handlerFn) =
      case matchEnvelope envelope handlerFn of
        Nothing -> pure Nothing
        Just (run, request) -> run connState request $> Just ()
    tryHandler done _ = pure done


-- | Callback invoked with each 'Envelope' read from a socket (see 'drainSocketMessages').
type EnvelopeHandler = Envelope -> IO ()

-- | Read bytes from the socket, repeatedly parse them into envelopes with
-- 'envelopeP', and pass every successfully parsed envelope to the handler.
-- Parse failures are discarded. Handlers run through 'SP.parMapM' with output
-- ordering disabled. Returns once the resulting stream has been drained.
drainSocketMessages :: Socket -> EnvelopeHandler -> IO ()
drainSocketMessages sock envelopeHandler =
  SP.fold FL.drain handled
  where
    rawBytes = SP.unfold SSock.reader sock
    envelopes = SP.catRights (P.parseMany envelopeP rawBytes)
    handled = SP.parMapM (SP.ordered False) envelopeHandler envelopes

-- | Encode an envelope and write it to the locked socket, then trace the bytes sent.
-- (send length-tagged bytestring, perhaps should be in network byte order?)
sendEnvelope :: Envelope -> Locking Socket -> IO ()
sendEnvelope envelope sockLock = do
  --Socket.sendAll syscalls send() on a loop until all the bytes are sent, so we need socket locking here to account for serialized messages of size > PIPE_BUF
  withLock sockLock (`Socket.sendAll` wireBytes)
  traceBytes "sendEnvelope" wireBytes
  where
    wireBytes = encodeEnvelope envelope

-- | Fingerprint of a value's type. Only the type matters: two values of the
-- same type always yield the same fingerprint.
fingerprint :: Typeable a => a -> Fingerprint
fingerprint value = typeRepFingerprint (typeOf value)