{-# LANGUAGE DerivingVia, DeriveGeneric, RankNTypes, ScopedTypeVariables, MultiParamTypeClasses, OverloadedStrings, GeneralizedNewtypeDeriving, CPP, ExistentialQuantification, StandaloneDeriving, GADTs #-}
{-# OPTIONS_GHC -fno-warn-orphans #-}
{- HLINT ignore "Use lambda-case" -}
module Network.RPC.Curryer.Server where
import qualified Streamly.Prelude as S
import Streamly.Network.Socket as SSock
import Network.Socket as Socket
import Network.Socket.ByteString as Socket
import Streamly.Internal.Data.Parser as P hiding (concatMap)
import Codec.Winery
import Codec.Winery.Internal (varInt, decodeVarInt, getBytes)
import GHC.Generics
import GHC.Fingerprint
import Data.Typeable
import Control.Concurrent.MVar (MVar, newMVar, withMVar)
import Control.Exception
import Data.Function ((&))
import Data.Word
import qualified Data.ByteString as BS
import qualified Data.ByteString.Lazy as BSL
import qualified Data.ByteString.FastBuilder as BB
import Streamly.Data.Fold as FL hiding (foldr)
import qualified Streamly.Internal.Data.Stream.IsStream as P
import qualified Data.Binary as B
import qualified Data.UUID as UUIDBase
import qualified Data.UUID.V4 as UUIDBase
import Control.Monad
import Data.Functor
import Control.Applicative

import qualified Network.RPC.Curryer.StreamlyAdditions as SA
import Data.Hashable
import System.Timeout
import qualified Network.ByteOrder as BO


-- for toArrayS conversion
import qualified Data.ByteString.Internal as BSI
import qualified Streamly.Internal.Data.Array.Foreign.Type as Arr
import qualified Streamly.Internal.Data.Array.Foreign.Mut.Type as ArrT
import GHC.ForeignPtr (ForeignPtr(ForeignPtr))
import GHC.Ptr (minusPtr, Ptr(..))

--define CURRYER_SHOW_BYTES 1

#if CURRYER_SHOW_BYTES == 1
import Debug.Trace
#endif

traceBytes :: Applicative f => String -> BS.ByteString -> f ()  
#if CURRYER_SHOW_BYTES == 1
traceBytes msg bs = traceShowM (msg, BS.length bs, bs)
#else
traceBytes :: String -> ByteString -> f ()
traceBytes String
_ ByteString
_ = () -> f ()
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()
#endif

-- a level of indirection to be able to switch between serialising with and without the winery schema
msgSerialise :: Serialise a => a -> BS.ByteString
msgSerialise :: a -> ByteString
msgSerialise = a -> ByteString
forall a. Serialise a => a -> ByteString
serialiseOnly

msgDeserialise :: forall s. Serialise s => BS.ByteString -> Either WineryException s
msgDeserialise :: ByteString -> Either WineryException s
msgDeserialise = ByteString -> Either WineryException s
forall s. Serialise s => ByteString -> Either WineryException s
deserialiseOnly

data Locking a = Locking (MVar ()) a

newLock :: a -> IO (Locking a)
newLock :: a -> IO (Locking a)
newLock a
x = do
  MVar ()
lock <- () -> IO (MVar ())
forall a. a -> IO (MVar a)
newMVar ()
  Locking a -> IO (Locking a)
forall (f :: * -> *) a. Applicative f => a -> f a
pure (MVar () -> a -> Locking a
forall a. MVar () -> a -> Locking a
Locking MVar ()
lock a
x)
  
withLock :: Locking a -> (a -> IO b) -> IO b
withLock :: Locking a -> (a -> IO b) -> IO b
withLock (Locking MVar ()
mvar a
v) a -> IO b
m =
  MVar () -> (() -> IO b) -> IO b
forall a b. MVar a -> (a -> IO b) -> IO b
withMVar MVar ()
mvar ((() -> IO b) -> IO b) -> (() -> IO b) -> IO b
forall a b. (a -> b) -> a -> b
$ \()
_ -> a -> IO b
m a
v

lockless :: Locking a -> a
lockless :: Locking a -> a
lockless (Locking MVar ()
_ a
a) = a
a

type Timeout = Word32

type BinaryMessage = BS.ByteString

--includes the fingerprint of the incoming data type (held in the BinaryMessage) to determine how to dispatch the message.
--add another envelope type for unencoded binary messages for any easy optimization for in-process communication
data Envelope = Envelope {
  Envelope -> Fingerprint
envFingerprint :: !Fingerprint,
  Envelope -> MessageType
envMessageType :: !MessageType,
  Envelope -> UUID
envMsgId :: !UUID,
  Envelope -> ByteString
envPayload :: !BinaryMessage
  }
  deriving ((forall x. Envelope -> Rep Envelope x)
-> (forall x. Rep Envelope x -> Envelope) -> Generic Envelope
forall x. Rep Envelope x -> Envelope
forall x. Envelope -> Rep Envelope x
forall a.
(forall x. a -> Rep a x) -> (forall x. Rep a x -> a) -> Generic a
$cto :: forall x. Rep Envelope x -> Envelope
$cfrom :: forall x. Envelope -> Rep Envelope x
Generic, Int -> Envelope -> ShowS
[Envelope] -> ShowS
Envelope -> String
(Int -> Envelope -> ShowS)
-> (Envelope -> String) -> ([Envelope] -> ShowS) -> Show Envelope
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
showList :: [Envelope] -> ShowS
$cshowList :: [Envelope] -> ShowS
show :: Envelope -> String
$cshow :: Envelope -> String
showsPrec :: Int -> Envelope -> ShowS
$cshowsPrec :: Int -> Envelope -> ShowS
Show)

type TimeoutMicroseconds = Int

deriving instance Generic Fingerprint
deriving via WineryVariant Fingerprint instance Serialise Fingerprint

-- | Internal type used to mark envelope types.
data MessageType = RequestMessage TimeoutMicroseconds
                 | ResponseMessage
                 | TimeoutResponseMessage
                 | ExceptionResponseMessage
                 deriving ((forall x. MessageType -> Rep MessageType x)
-> (forall x. Rep MessageType x -> MessageType)
-> Generic MessageType
forall x. Rep MessageType x -> MessageType
forall x. MessageType -> Rep MessageType x
forall a.
(forall x. a -> Rep a x) -> (forall x. Rep a x -> a) -> Generic a
$cto :: forall x. Rep MessageType x -> MessageType
$cfrom :: forall x. MessageType -> Rep MessageType x
Generic, Int -> MessageType -> ShowS
[MessageType] -> ShowS
MessageType -> String
(Int -> MessageType -> ShowS)
-> (MessageType -> String)
-> ([MessageType] -> ShowS)
-> Show MessageType
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
showList :: [MessageType] -> ShowS
$cshowList :: [MessageType] -> ShowS
show :: MessageType -> String
$cshow :: MessageType -> String
showsPrec :: Int -> MessageType -> ShowS
$cshowsPrec :: Int -> MessageType -> ShowS
Show)
                 deriving Typeable MessageType
BundleSerialise MessageType
Extractor MessageType
Decoder MessageType
Typeable MessageType
-> (Proxy MessageType -> SchemaGen Schema)
-> (MessageType -> Builder)
-> Extractor MessageType
-> Decoder MessageType
-> BundleSerialise MessageType
-> Serialise MessageType
Proxy MessageType -> SchemaGen Schema
MessageType -> Builder
forall a.
Typeable a
-> (Proxy a -> SchemaGen Schema)
-> (a -> Builder)
-> Extractor a
-> Decoder a
-> BundleSerialise a
-> Serialise a
bundleSerialise :: BundleSerialise MessageType
$cbundleSerialise :: BundleSerialise MessageType
decodeCurrent :: Decoder MessageType
$cdecodeCurrent :: Decoder MessageType
extractor :: Extractor MessageType
$cextractor :: Extractor MessageType
toBuilder :: MessageType -> Builder
$ctoBuilder :: MessageType -> Builder
schemaGen :: Proxy MessageType -> SchemaGen Schema
$cschemaGen :: Proxy MessageType -> SchemaGen Schema
$cp1Serialise :: Typeable MessageType
Serialise via WineryVariant MessageType

-- | A list of `RequestHandler`s.
type RequestHandlers serverState = [RequestHandler serverState]

-- | Data types for server-side request handlers, in synchronous (client waits for return value) and asynchronous (client does not wait for return value) forms.
data RequestHandler serverState where
  -- | create a request handler with a response
  RequestHandler :: forall a b serverState. (Serialise a, Serialise b) => (ConnectionState serverState -> a -> IO b) -> RequestHandler serverState
  -- | create an asynchronous request handler where the client does not expect nor await a response
  AsyncRequestHandler :: forall a serverState. Serialise a => (ConnectionState serverState -> a -> IO ()) -> RequestHandler serverState

-- | Server state sent in via `serve` and passed to `RequestHandler`s.
data ConnectionState a = ConnectionState {
  ConnectionState a -> a
connectionServerState :: a,
  ConnectionState a -> Locking Socket
connectionSocket :: Locking Socket
  }

-- | Used by server-side request handlers to send additional messages to the client. This is useful for sending asynchronous responses to the client outside of the normal request-response flow. The locking socket can be found in the ConnectionState when a request handler is called.
sendMessage :: Serialise a => Locking Socket -> a -> IO ()
sendMessage :: Locking Socket -> a -> IO ()
sendMessage Locking Socket
lockSock a
msg = do
  UUID
requestID <- UUID -> UUID
UUID (UUID -> UUID) -> IO UUID -> IO UUID
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> IO UUID
UUIDBase.nextRandom
  let env :: Envelope
env =
        Fingerprint -> MessageType -> UUID -> ByteString -> Envelope
Envelope (a -> Fingerprint
forall a. Typeable a => a -> Fingerprint
fingerprint a
msg) (Int -> MessageType
RequestMessage Int
timeout') UUID
requestID (a -> ByteString
forall a. Serialise a => a -> ByteString
msgSerialise a
msg)
      timeout' :: Int
timeout' = Int
0
  Envelope -> Locking Socket -> IO ()
sendEnvelope Envelope
env Locking Socket
lockSock
  
--avoid orphan instance
newtype UUID = UUID { UUID -> UUID
_unUUID :: UUIDBase.UUID }
  deriving (Int -> UUID -> ShowS
[UUID] -> ShowS
UUID -> String
(Int -> UUID -> ShowS)
-> (UUID -> String) -> ([UUID] -> ShowS) -> Show UUID
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
showList :: [UUID] -> ShowS
$cshowList :: [UUID] -> ShowS
show :: UUID -> String
$cshow :: UUID -> String
showsPrec :: Int -> UUID -> ShowS
$cshowsPrec :: Int -> UUID -> ShowS
Show, UUID -> UUID -> Bool
(UUID -> UUID -> Bool) -> (UUID -> UUID -> Bool) -> Eq UUID
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
/= :: UUID -> UUID -> Bool
$c/= :: UUID -> UUID -> Bool
== :: UUID -> UUID -> Bool
$c== :: UUID -> UUID -> Bool
Eq, Get UUID
[UUID] -> Put
UUID -> Put
(UUID -> Put) -> Get UUID -> ([UUID] -> Put) -> Binary UUID
forall t. (t -> Put) -> Get t -> ([t] -> Put) -> Binary t
putList :: [UUID] -> Put
$cputList :: [UUID] -> Put
get :: Get UUID
$cget :: Get UUID
put :: UUID -> Put
$cput :: UUID -> Put
B.Binary, Eq UUID
Eq UUID -> (Int -> UUID -> Int) -> (UUID -> Int) -> Hashable UUID
Int -> UUID -> Int
UUID -> Int
forall a. Eq a -> (Int -> a -> Int) -> (a -> Int) -> Hashable a
hash :: UUID -> Int
$chash :: UUID -> Int
hashWithSalt :: Int -> UUID -> Int
$chashWithSalt :: Int -> UUID -> Int
$cp1Hashable :: Eq UUID
Hashable)

instance Serialise UUID where
  schemaGen :: Proxy UUID -> SchemaGen Schema
schemaGen Proxy UUID
_ = Schema -> SchemaGen Schema
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Tag -> Schema -> Schema
forall a. Tag -> SchemaP a -> SchemaP a
STag (Text -> Tag
TagStr Text
"Data.UUID") Schema
forall a. SchemaP a
SBytes)
  toBuilder :: UUID -> Builder
toBuilder UUID
uuid = let bytes :: ByteString
bytes = ByteString -> ByteString
BSL.toStrict (UUID -> ByteString
forall a. Binary a => a -> ByteString
B.encode UUID
uuid) in
                     Int -> Builder
forall a. (Bits a, Integral a) => a -> Builder
varInt (ByteString -> Int
BS.length ByteString
bytes) Builder -> Builder -> Builder
forall a. Semigroup a => a -> a -> a
<> ByteString -> Builder
BB.byteString ByteString
bytes
  {-# INLINE toBuilder #-}
  extractor :: Extractor UUID
extractor = (Schema -> Strategy' (Term -> UUID)) -> Extractor UUID
forall a.
Typeable a =>
(Schema -> Strategy' (Term -> a)) -> Extractor a
mkExtractor ((Schema -> Strategy' (Term -> UUID)) -> Extractor UUID)
-> (Schema -> Strategy' (Term -> UUID)) -> Extractor UUID
forall a b. (a -> b) -> a -> b
$
    \Schema
schema' -> case Schema
schema' of
                 STag (TagStr Text
"Data.UUID") Schema
SBytes ->
                   (Term -> UUID) -> Strategy' (Term -> UUID)
forall (f :: * -> *) a. Applicative f => a -> f a
pure ((Term -> UUID) -> Strategy' (Term -> UUID))
-> (Term -> UUID) -> Strategy' (Term -> UUID)
forall a b. (a -> b) -> a -> b
$ \Term
term -> case Term
term of
                              TBytes ByteString
bs -> ByteString -> UUID
forall a. Binary a => ByteString -> a
B.decode (ByteString -> ByteString
BSL.fromStrict ByteString
bs)
                              Term
term' -> ExtractException -> UUID
forall a e. Exception e => e -> a
throw (Term -> ExtractException
InvalidTerm Term
term')
                 Schema
x -> String -> Strategy' (Term -> UUID)
forall a. HasCallStack => String -> a
error (String -> Strategy' (Term -> UUID))
-> String -> Strategy' (Term -> UUID)
forall a b. (a -> b) -> a -> b
$ String
"invalid schema element " String -> ShowS
forall a. Semigroup a => a -> a -> a
<> Schema -> String
forall a. Show a => a -> String
show Schema
x
  decodeCurrent :: Decoder UUID
decodeCurrent = ByteString -> UUID
forall a. Binary a => ByteString -> a
B.decode (ByteString -> UUID)
-> (ByteString -> ByteString) -> ByteString -> UUID
forall b c a. (b -> c) -> (a -> b) -> a -> c
. ByteString -> ByteString
BSL.fromStrict (ByteString -> UUID) -> Decoder ByteString -> Decoder UUID
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> (Decoder Int
forall a. (Num a, Bits a) => Decoder a
decodeVarInt Decoder Int -> (Int -> Decoder ByteString) -> Decoder ByteString
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= Int -> Decoder ByteString
getBytes)

-- | Errors from remote calls.
data ConnectionError = CodecError String -- show of WineryException from exception initiator which cannot otherwise be transmitted over a line due to dependencies on TypeReps
                     | TimeoutError
                     | ExceptionError String
                     deriving ((forall x. ConnectionError -> Rep ConnectionError x)
-> (forall x. Rep ConnectionError x -> ConnectionError)
-> Generic ConnectionError
forall x. Rep ConnectionError x -> ConnectionError
forall x. ConnectionError -> Rep ConnectionError x
forall a.
(forall x. a -> Rep a x) -> (forall x. Rep a x -> a) -> Generic a
$cto :: forall x. Rep ConnectionError x -> ConnectionError
$cfrom :: forall x. ConnectionError -> Rep ConnectionError x
Generic, Int -> ConnectionError -> ShowS
[ConnectionError] -> ShowS
ConnectionError -> String
(Int -> ConnectionError -> ShowS)
-> (ConnectionError -> String)
-> ([ConnectionError] -> ShowS)
-> Show ConnectionError
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
showList :: [ConnectionError] -> ShowS
$cshowList :: [ConnectionError] -> ShowS
show :: ConnectionError -> String
$cshow :: ConnectionError -> String
showsPrec :: Int -> ConnectionError -> ShowS
$cshowsPrec :: Int -> ConnectionError -> ShowS
Show, ConnectionError -> ConnectionError -> Bool
(ConnectionError -> ConnectionError -> Bool)
-> (ConnectionError -> ConnectionError -> Bool)
-> Eq ConnectionError
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
/= :: ConnectionError -> ConnectionError -> Bool
$c/= :: ConnectionError -> ConnectionError -> Bool
== :: ConnectionError -> ConnectionError -> Bool
$c== :: ConnectionError -> ConnectionError -> Bool
Eq)
                     deriving Typeable ConnectionError
BundleSerialise ConnectionError
Extractor ConnectionError
Decoder ConnectionError
Typeable ConnectionError
-> (Proxy ConnectionError -> SchemaGen Schema)
-> (ConnectionError -> Builder)
-> Extractor ConnectionError
-> Decoder ConnectionError
-> BundleSerialise ConnectionError
-> Serialise ConnectionError
Proxy ConnectionError -> SchemaGen Schema
ConnectionError -> Builder
forall a.
Typeable a
-> (Proxy a -> SchemaGen Schema)
-> (a -> Builder)
-> Extractor a
-> Decoder a
-> BundleSerialise a
-> Serialise a
bundleSerialise :: BundleSerialise ConnectionError
$cbundleSerialise :: BundleSerialise ConnectionError
decodeCurrent :: Decoder ConnectionError
$cdecodeCurrent :: Decoder ConnectionError
extractor :: Extractor ConnectionError
$cextractor :: Extractor ConnectionError
toBuilder :: ConnectionError -> Builder
$ctoBuilder :: ConnectionError -> Builder
schemaGen :: Proxy ConnectionError -> SchemaGen Schema
$cschemaGen :: Proxy ConnectionError -> SchemaGen Schema
$cp1Serialise :: Typeable ConnectionError
Serialise via WineryVariant ConnectionError

data TimeoutException = TimeoutException
  deriving Int -> TimeoutException -> ShowS
[TimeoutException] -> ShowS
TimeoutException -> String
(Int -> TimeoutException -> ShowS)
-> (TimeoutException -> String)
-> ([TimeoutException] -> ShowS)
-> Show TimeoutException
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
showList :: [TimeoutException] -> ShowS
$cshowList :: [TimeoutException] -> ShowS
show :: TimeoutException -> String
$cshow :: TimeoutException -> String
showsPrec :: Int -> TimeoutException -> ShowS
$cshowsPrec :: Int -> TimeoutException -> ShowS
Show

instance Exception TimeoutException  

type HostAddr = (Word8, Word8, Word8, Word8)

allHostAddrs,localHostAddr :: HostAddr
allHostAddrs :: HostAddr
allHostAddrs = (Word8
0,Word8
0,Word8
0,Word8
0)
localHostAddr :: HostAddr
localHostAddr = (Word8
127,Word8
0,Word8
0,Word8
1)

msgTypeP :: Parser IO Word8 MessageType
msgTypeP :: Parser IO Word8 MessageType
msgTypeP = ((Word8 -> Bool) -> Parser IO Word8 Word8
forall (m :: * -> *) a. MonadCatch m => (a -> Bool) -> Parser m a a
P.satisfy (Word8 -> Word8 -> Bool
forall a. Eq a => a -> a -> Bool
== Word8
0) Parser IO Word8 Word8
-> Parser IO Word8 MessageType -> Parser IO Word8 MessageType
forall (f :: * -> *) a b. Applicative f => f a -> f b -> f b
*>
             (Int -> MessageType
RequestMessage (Int -> MessageType) -> (Word32 -> Int) -> Word32 -> MessageType
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Word32 -> Int
forall a b. (Integral a, Num b) => a -> b
fromIntegral (Word32 -> MessageType)
-> Parser IO Word8 Word32 -> Parser IO Word8 MessageType
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Parser IO Word8 Word32
word32P)) Parser IO Word8 MessageType
-> Parser IO Word8 MessageType -> Parser IO Word8 MessageType
forall (f :: * -> *) a. Alternative f => f a -> f a -> f a
<|>
           ((Word8 -> Bool) -> Parser IO Word8 Word8
forall (m :: * -> *) a. MonadCatch m => (a -> Bool) -> Parser m a a
P.satisfy (Word8 -> Word8 -> Bool
forall a. Eq a => a -> a -> Bool
== Word8
1) Parser IO Word8 Word8 -> MessageType -> Parser IO Word8 MessageType
forall (f :: * -> *) a b. Functor f => f a -> b -> f b
$> MessageType
ResponseMessage) Parser IO Word8 MessageType
-> Parser IO Word8 MessageType -> Parser IO Word8 MessageType
forall (f :: * -> *) a. Alternative f => f a -> f a -> f a
<|>
           ((Word8 -> Bool) -> Parser IO Word8 Word8
forall (m :: * -> *) a. MonadCatch m => (a -> Bool) -> Parser m a a
P.satisfy (Word8 -> Word8 -> Bool
forall a. Eq a => a -> a -> Bool
== Word8
2) Parser IO Word8 Word8 -> MessageType -> Parser IO Word8 MessageType
forall (f :: * -> *) a b. Functor f => f a -> b -> f b
$> MessageType
TimeoutResponseMessage) Parser IO Word8 MessageType
-> Parser IO Word8 MessageType -> Parser IO Word8 MessageType
forall (f :: * -> *) a. Alternative f => f a -> f a -> f a
<|>
           ((Word8 -> Bool) -> Parser IO Word8 Word8
forall (m :: * -> *) a. MonadCatch m => (a -> Bool) -> Parser m a a
P.satisfy (Word8 -> Word8 -> Bool
forall a. Eq a => a -> a -> Bool
== Word8
3) Parser IO Word8 Word8 -> MessageType -> Parser IO Word8 MessageType
forall (f :: * -> *) a b. Functor f => f a -> b -> f b
$> MessageType
ExceptionResponseMessage)
                 
-- Each message is length-prefixed by a 32-bit unsigned length.
envelopeP :: Parser IO Word8 Envelope
envelopeP :: Parser IO Word8 Envelope
envelopeP = do
  let lenPrefixedByteStringP :: Parser IO Word8 ByteString
lenPrefixedByteStringP = do
        Int
c <- Word32 -> Int
forall a b. (Integral a, Num b) => a -> b
fromIntegral (Word32 -> Int) -> Parser IO Word8 Word32 -> Parser IO Word8 Int
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Parser IO Word8 Word32
word32P
        --streamly can't handle takeEQ 0, so add special handling
--        traceShowM ("envelopeP payload byteCount"::String, c)
        if Int
c Int -> Int -> Bool
forall a. Eq a => a -> a -> Bool
== Int
0 then
          ByteString -> Parser IO Word8 ByteString
forall (f :: * -> *) a. Applicative f => a -> f a
pure ByteString
BS.empty
          else
          Array Word8 -> ByteString
fromArray (Array Word8 -> ByteString)
-> Parser IO Word8 (Array Word8) -> Parser IO Word8 ByteString
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Int -> Fold IO Word8 (Array Word8) -> Parser IO Word8 (Array Word8)
forall (m :: * -> *) a b.
MonadCatch m =>
Int -> Fold m a b -> Parser m a b
P.takeEQ Int
c (Int -> Fold IO Word8 (Array Word8)
forall (m :: * -> *) a.
(MonadIO m, Storable a) =>
Int -> Fold m a (Array a)
Arr.writeN Int
c)
  Fingerprint -> MessageType -> UUID -> ByteString -> Envelope
Envelope (Fingerprint -> MessageType -> UUID -> ByteString -> Envelope)
-> Parser IO Word8 Fingerprint
-> Parser IO Word8 (MessageType -> UUID -> ByteString -> Envelope)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Parser IO Word8 Fingerprint
fingerprintP Parser IO Word8 (MessageType -> UUID -> ByteString -> Envelope)
-> Parser IO Word8 MessageType
-> Parser IO Word8 (UUID -> ByteString -> Envelope)
forall (f :: * -> *) a b. Applicative f => f (a -> b) -> f a -> f b
<*> Parser IO Word8 MessageType
msgTypeP Parser IO Word8 (UUID -> ByteString -> Envelope)
-> Parser IO Word8 UUID -> Parser IO Word8 (ByteString -> Envelope)
forall (f :: * -> *) a b. Applicative f => f (a -> b) -> f a -> f b
<*> Parser IO Word8 UUID
uuidP Parser IO Word8 (ByteString -> Envelope)
-> Parser IO Word8 ByteString -> Parser IO Word8 Envelope
forall (f :: * -> *) a b. Applicative f => f (a -> b) -> f a -> f b
<*> Parser IO Word8 ByteString
lenPrefixedByteStringP

--overhead is fingerprint (16 bytes), msgType (1+4 optional bytes for request message), msgId (4 bytes), uuid (16 bytes) = 41 bytes per request message, 37 bytes for all others
encodeEnvelope :: Envelope -> BS.ByteString
encodeEnvelope :: Envelope -> ByteString
encodeEnvelope (Envelope (Fingerprint Word64
fp1 Word64
fp2) MessageType
msgType UUID
msgId ByteString
bs) =
{-  traceShow ("encodeEnvelope"::String,
             ("fingerprint len"::String, BS.length fingerprintBs),
             ("msgtype length"::String,BS.length msgTypeBs),
             ("id len"::String, BS.length msgIdBs),
             ("payload len"::String, payloadLen),
             ("complete len"::String, BS.length completeMessage)) $-}
  ByteString
completeMessage
  where
    completeMessage :: ByteString
completeMessage = ByteString
fingerprintBs ByteString -> ByteString -> ByteString
forall a. Semigroup a => a -> a -> a
<> ByteString
msgTypeBs ByteString -> ByteString -> ByteString
forall a. Semigroup a => a -> a -> a
<> ByteString
msgIdBs ByteString -> ByteString -> ByteString
forall a. Semigroup a => a -> a -> a
<> ByteString
lenPrefixedBs
    fingerprintBs :: ByteString
fingerprintBs = Word64 -> ByteString
BO.bytestring64 Word64
fp1 ByteString -> ByteString -> ByteString
forall a. Semigroup a => a -> a -> a
<> Word64 -> ByteString
BO.bytestring64 Word64
fp2
    msgTypeBs :: ByteString
msgTypeBs = case MessageType
msgType of
      RequestMessage Int
timeoutms -> Word8 -> ByteString
BS.singleton Word8
0 ByteString -> ByteString -> ByteString
forall a. Semigroup a => a -> a -> a
<> Word32 -> ByteString
BO.bytestring32 (Int -> Word32
forall a b. (Integral a, Num b) => a -> b
fromIntegral Int
timeoutms)
      MessageType
ResponseMessage -> Word8 -> ByteString
BS.singleton Word8
1
      MessageType
TimeoutResponseMessage -> Word8 -> ByteString
BS.singleton Word8
2
      MessageType
ExceptionResponseMessage -> Word8 -> ByteString
BS.singleton Word8
3
    msgIdBs :: ByteString
msgIdBs =
      case UUID -> (Word32, Word32, Word32, Word32)
UUIDBase.toWords (UUID -> UUID
_unUUID UUID
msgId) of
        (Word32
u1, Word32
u2, Word32
u3, Word32
u4) -> (Word32 -> ByteString -> ByteString)
-> ByteString -> [Word32] -> ByteString
forall (t :: * -> *) a b.
Foldable t =>
(a -> b -> b) -> b -> t a -> b
foldr (ByteString -> ByteString -> ByteString
forall a. Semigroup a => a -> a -> a
(<>) (ByteString -> ByteString -> ByteString)
-> (Word32 -> ByteString) -> Word32 -> ByteString -> ByteString
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Word32 -> ByteString
BO.bytestring32) ByteString
BS.empty [Word32
u1, Word32
u2, Word32
u3, Word32
u4]
    lenPrefixedBs :: ByteString
lenPrefixedBs = Word32 -> ByteString
BO.bytestring32 Word32
payloadLen ByteString -> ByteString -> ByteString
forall a. Semigroup a => a -> a -> a
<> ByteString
bs
    payloadLen :: Word32
payloadLen = Int -> Word32
forall a b. (Integral a, Num b) => a -> b
fromIntegral (ByteString -> Int
BS.length ByteString
bs)
    
    

fingerprintP :: Parser IO Word8 Fingerprint
fingerprintP :: Parser IO Word8 Fingerprint
fingerprintP =
  Word64 -> Word64 -> Fingerprint
Fingerprint (Word64 -> Word64 -> Fingerprint)
-> Parser IO Word8 Word64
-> Parser IO Word8 (Word64 -> Fingerprint)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Parser IO Word8 Word64
word64P Parser IO Word8 (Word64 -> Fingerprint)
-> Parser IO Word8 Word64 -> Parser IO Word8 Fingerprint
forall (f :: * -> *) a b. Applicative f => f (a -> b) -> f a -> f b
<*> Parser IO Word8 Word64
word64P

word64P :: Parser IO Word8 Word64
word64P :: Parser IO Word8 Word64
word64P = do
  let s :: Fold IO a [a]
s = Fold IO a [a]
forall (m :: * -> *) a. Monad m => Fold m a [a]
FL.toList
  [Word8]
b <- Int -> Fold IO Word8 [Word8] -> Parser IO Word8 [Word8]
forall (m :: * -> *) a b.
MonadCatch m =>
Int -> Fold m a b -> Parser m a b
P.takeEQ Int
8 Fold IO Word8 [Word8]
forall a. Fold IO a [a]
s
  Word64 -> Parser IO Word8 Word64
forall (f :: * -> *) a. Applicative f => a -> f a
pure (ByteString -> Word64
BO.word64 ([Word8] -> ByteString
BS.pack [Word8]
b))

--parse a 32-bit integer from network byte order
word32P :: Parser IO Word8 Word32
word32P :: Parser IO Word8 Word32
word32P = do
  let s :: Fold IO a [a]
s = Fold IO a [a]
forall (m :: * -> *) a. Monad m => Fold m a [a]
FL.toList
  [Word8]
w4x8 <- Int -> Fold IO Word8 [Word8] -> Parser IO Word8 [Word8]
forall (m :: * -> *) a b.
MonadCatch m =>
Int -> Fold m a b -> Parser m a b
P.takeEQ Int
4 Fold IO Word8 [Word8]
forall a. Fold IO a [a]
s
--  traceShowM ("w4x8"::String, BO.word32 (BS.pack w4x8))
  Word32 -> Parser IO Word8 Word32
forall (f :: * -> *) a. Applicative f => a -> f a
pure (ByteString -> Word32
BO.word32 ([Word8] -> ByteString
BS.pack [Word8]
w4x8))

-- uuid is encode as 4 32-bit words because of its convenient 32-bit tuple encoding
uuidP :: Parser IO Word8 UUID
uuidP :: Parser IO Word8 UUID
uuidP = do
  Word32
u1 <- Parser IO Word8 Word32
word32P
  Word32
u2 <- Parser IO Word8 Word32
word32P
  Word32
u3 <- Parser IO Word8 Word32
word32P
  --u4 <- word32P
  --pure (UUID (UUIDBase.fromWords u1 u2 u3 u4))-}
  --(UUID . UUIDBase.fromWords) <$> word32P <*> word32P <*> word32P <*> word32P
  UUID -> UUID
UUID (UUID -> UUID) -> (Word32 -> UUID) -> Word32 -> UUID
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Word32 -> Word32 -> Word32 -> Word32 -> UUID
UUIDBase.fromWords Word32
u1 Word32
u2 Word32
u3 (Word32 -> UUID) -> Parser IO Word8 Word32 -> Parser IO Word8 UUID
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Parser IO Word8 Word32
word32P

type NewConnectionHandler msg = IO (Maybe msg)

type NewMessageHandler req resp = req -> IO resp

-- | Listen for new connections and handle requests which are passed the server state 's'. The MVar SockAddr can be be optionally used to know when the server is ready for processing requests.
serve :: 
         RequestHandlers s->
         s ->
         HostAddr ->
         PortNumber ->
         Maybe (MVar SockAddr) ->
         IO Bool
serve :: RequestHandlers s
-> s -> HostAddr -> PortNumber -> Maybe (MVar SockAddr) -> IO Bool
serve RequestHandlers s
userMsgHandlers s
serverState HostAddr
hostaddr PortNumber
port Maybe (MVar SockAddr)
mSockLock = do
  let
      handleSock :: Socket -> IO ()
handleSock Socket
sock = do
        Locking Socket
lockingSocket <- Socket -> IO (Locking Socket)
forall a. a -> IO (Locking a)
newLock Socket
sock
        Socket -> EnvelopeHandler -> IO ()
drainSocketMessages Socket
sock (Locking Socket -> RequestHandlers s -> s -> EnvelopeHandler
forall s.
Locking Socket -> RequestHandlers s -> s -> EnvelopeHandler
serverEnvelopeHandler Locking Socket
lockingSocket RequestHandlers s
userMsgHandlers s
serverState)

  SerialT IO Socket -> ParallelT IO Socket
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
IsStream t =>
SerialT m a -> t m a
S.fromSerial (Unfold IO (HostAddr, PortNumber) Socket
-> (HostAddr, PortNumber) -> SerialT IO Socket
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a b.
(IsStream t, Monad m) =>
Unfold m a b -> a -> t m b
S.unfold ([(SocketOption, Int)]
-> Maybe (MVar SockAddr) -> Unfold IO (HostAddr, PortNumber) Socket
forall (m :: * -> *).
MonadIO m =>
[(SocketOption, Int)]
-> Maybe (MVar SockAddr) -> Unfold m (HostAddr, PortNumber) Socket
SA.acceptOnAddrWith [(SocketOption
ReuseAddr,Int
1)] Maybe (MVar SockAddr)
mSockLock) (HostAddr
hostaddr, PortNumber
port)) ParallelT IO Socket
-> (ParallelT IO Socket -> SerialT IO ()) -> SerialT IO ()
forall a b. a -> (a -> b) -> b
& ParallelT IO () -> SerialT IO ()
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
IsStream t =>
ParallelT m a -> t m a
S.fromParallel (ParallelT IO () -> SerialT IO ())
-> (ParallelT IO Socket -> ParallelT IO ())
-> ParallelT IO Socket
-> SerialT IO ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (Socket -> IO ()) -> ParallelT IO Socket -> ParallelT IO ()
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a b.
(IsStream t, MonadAsync m) =>
(a -> m b) -> t m a -> t m b
S.mapM ((Socket -> IO ()) -> Socket -> IO ()
forall (m :: * -> *).
(MonadMask m, MonadIO m) =>
(Socket -> m ()) -> Socket -> m ()
forSocketM Socket -> IO ()
handleSock) SerialT IO () -> (SerialT IO () -> IO ()) -> IO ()
forall a b. a -> (a -> b) -> b
& SerialT IO () -> IO ()
forall (m :: * -> *) a. Monad m => SerialT m a -> m ()
S.drain
  Bool -> IO Bool
forall (f :: * -> *) a. Applicative f => a -> f a
pure Bool
True

openEnvelope :: forall s. (Serialise s, Typeable s) => Envelope -> Maybe s
openEnvelope :: Envelope -> Maybe s
openEnvelope (Envelope Fingerprint
eprint MessageType
_ UUID
_ ByteString
bytes) =
  if Fingerprint
eprint Fingerprint -> Fingerprint -> Bool
forall a. Eq a => a -> a -> Bool
== s -> Fingerprint
forall a. Typeable a => a -> Fingerprint
fingerprint (s
forall a. HasCallStack => a
undefined :: s) then
    case ByteString -> Either WineryException s
forall s. Serialise s => ByteString -> Either WineryException s
msgDeserialise ByteString
bytes of
      Left WineryException
_e -> {-traceShow ("openEnv error"::String, e) $-} Maybe s
forall a. Maybe a
Nothing
      Right s
decoded -> s -> Maybe s
forall a. a -> Maybe a
Just s
decoded
    else
    Maybe s
forall a. Maybe a
Nothing

--use winery to decode only the data structure and skip the schema
deserialiseOnly :: forall s. Serialise s => BS.ByteString -> Either WineryException s
deserialiseOnly :: ByteString -> Either WineryException s
deserialiseOnly ByteString
bytes = do
  Decoder s
dec <- Schema -> Either WineryException (Decoder s)
forall a.
Serialise a =>
Schema -> Either WineryException (Decoder a)
getDecoder (Proxy s -> Schema
forall (proxy :: * -> *) a. Serialise a => proxy a -> Schema
schema (Proxy s
forall k (t :: k). Proxy t
Proxy :: Proxy s))
  s -> Either WineryException s
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Decoder s -> ByteString -> s
forall a. Decoder a -> ByteString -> a
evalDecoder Decoder s
dec ByteString
bytes)


matchEnvelope :: forall a b s. (Serialise a, Serialise b, Typeable b) =>
              Envelope -> 
              (ConnectionState s -> a -> IO b) ->
              Maybe (ConnectionState s -> a -> IO b, a)
matchEnvelope :: Envelope
-> (ConnectionState s -> a -> IO b)
-> Maybe (ConnectionState s -> a -> IO b, a)
matchEnvelope Envelope
envelope ConnectionState s -> a -> IO b
dispatchf =
  case Envelope -> Maybe a
forall s. (Serialise s, Typeable s) => Envelope -> Maybe s
openEnvelope Envelope
envelope :: Maybe a of
    Maybe a
Nothing -> Maybe (ConnectionState s -> a -> IO b, a)
forall a. Maybe a
Nothing
    Just a
decoded -> (ConnectionState s -> a -> IO b, a)
-> Maybe (ConnectionState s -> a -> IO b, a)
forall a. a -> Maybe a
Just (ConnectionState s -> a -> IO b
dispatchf, a
decoded)

-- | Called by `serve` to process incoming envelope requests. Never returns, so use `async` to spin it off on another thread.
serverEnvelopeHandler :: 
                     Locking Socket
                     -> RequestHandlers s
                     -> s         
                     -> Envelope
                     -> IO ()
serverEnvelopeHandler :: Locking Socket -> RequestHandlers s -> s -> EnvelopeHandler
serverEnvelopeHandler Locking Socket
_ RequestHandlers s
_ s
_ (Envelope Fingerprint
_ MessageType
TimeoutResponseMessage UUID
_ ByteString
_) = () -> IO ()
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()
serverEnvelopeHandler Locking Socket
_ RequestHandlers s
_ s
_ (Envelope Fingerprint
_ MessageType
ExceptionResponseMessage UUID
_ ByteString
_) = () -> IO ()
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()
serverEnvelopeHandler Locking Socket
_ RequestHandlers s
_ s
_ (Envelope Fingerprint
_ MessageType
ResponseMessage UUID
_ ByteString
_) = () -> IO ()
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()
serverEnvelopeHandler Locking Socket
sockLock RequestHandlers s
msgHandlers s
serverState envelope :: Envelope
envelope@(Envelope Fingerprint
_ (RequestMessage Int
timeoutms) UUID
msgId ByteString
_) = do
  --find first matching handler
  let runTimeout :: IO b -> IO (Maybe b)
      runTimeout :: IO b -> IO (Maybe b)
runTimeout IO b
m = 
        if Int
timeoutms Int -> Int -> Bool
forall a. Eq a => a -> a -> Bool
== Int
0 then
          (b -> Maybe b
forall a. a -> Maybe a
Just (b -> Maybe b) -> IO b -> IO (Maybe b)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> IO b
m) IO (Maybe b) -> (TimeoutException -> IO (Maybe b)) -> IO (Maybe b)
forall e a. Exception e => IO a -> (e -> IO a) -> IO a
`catch` TimeoutException -> IO (Maybe b)
forall b. TimeoutException -> IO (Maybe b)
timeoutExcHandler
        else
          (Int -> IO b -> IO (Maybe b)
forall a. Int -> IO a -> IO (Maybe a)
timeout (Int -> Int
forall a b. (Integral a, Num b) => a -> b
fromIntegral Int
timeoutms) IO b
m) IO (Maybe b) -> (TimeoutException -> IO (Maybe b)) -> IO (Maybe b)
forall e a. Exception e => IO a -> (e -> IO a) -> IO a
`catch` TimeoutException -> IO (Maybe b)
forall b. TimeoutException -> IO (Maybe b)
timeoutExcHandler
      --allow server-side function to throw TimeoutError which is caught here and becomes TimeoutError value
      timeoutExcHandler :: TimeoutException -> IO (Maybe b)
      timeoutExcHandler :: TimeoutException -> IO (Maybe b)
timeoutExcHandler TimeoutException
_ = Maybe b -> IO (Maybe b)
forall (f :: * -> *) a. Applicative f => a -> f a
pure Maybe b
forall a. Maybe a
Nothing
      
      sState :: ConnectionState s
sState = ConnectionState :: forall a. a -> Locking Socket -> ConnectionState a
ConnectionState {
        connectionServerState :: s
connectionServerState = s
serverState,
        connectionSocket :: Locking Socket
connectionSocket = Locking Socket
sockLock
        }
            
      firstMatcher :: RequestHandler s -> Maybe () -> IO (Maybe ())
firstMatcher (RequestHandler ConnectionState s -> a -> IO b
msghandler) Maybe ()
Nothing =
        case Envelope
-> (ConnectionState s -> a -> IO b)
-> Maybe (ConnectionState s -> a -> IO b, a)
forall a b s.
(Serialise a, Serialise b, Typeable b) =>
Envelope
-> (ConnectionState s -> a -> IO b)
-> Maybe (ConnectionState s -> a -> IO b, a)
matchEnvelope Envelope
envelope ConnectionState s -> a -> IO b
msghandler of
          Maybe (ConnectionState s -> a -> IO b, a)
Nothing -> Maybe () -> IO (Maybe ())
forall (f :: * -> *) a. Applicative f => a -> f a
pure Maybe ()
forall a. Maybe a
Nothing
          Just (ConnectionState s -> a -> IO b
dispatchf, a
decoded) -> do
            --TODO add exception handling
            Maybe b
mResponse <- IO b -> IO (Maybe b)
forall b. IO b -> IO (Maybe b)
runTimeout (ConnectionState s -> a -> IO b
dispatchf ConnectionState s
sState a
decoded)
            let envelopeResponse :: Envelope
envelopeResponse =
                  case Maybe b
mResponse of
                        Just b
response ->
                          Fingerprint -> MessageType -> UUID -> ByteString -> Envelope
Envelope (b -> Fingerprint
forall a. Typeable a => a -> Fingerprint
fingerprint b
response) MessageType
ResponseMessage UUID
msgId (b -> ByteString
forall a. Serialise a => a -> ByteString
msgSerialise b
response)
                        Maybe b
Nothing -> 
                          Fingerprint -> MessageType -> UUID -> ByteString -> Envelope
Envelope (ConnectionError -> Fingerprint
forall a. Typeable a => a -> Fingerprint
fingerprint ConnectionError
TimeoutError) MessageType
TimeoutResponseMessage UUID
msgId ByteString
BS.empty
            Envelope -> Locking Socket -> IO ()
sendEnvelope Envelope
envelopeResponse Locking Socket
sockLock
            Maybe () -> IO (Maybe ())
forall (f :: * -> *) a. Applicative f => a -> f a
pure (() -> Maybe ()
forall a. a -> Maybe a
Just ())
      firstMatcher (AsyncRequestHandler ConnectionState s -> a -> IO ()
msghandler) Maybe ()
Nothing =        
        case Envelope
-> (ConnectionState s -> a -> IO ())
-> Maybe (ConnectionState s -> a -> IO (), a)
forall a b s.
(Serialise a, Serialise b, Typeable b) =>
Envelope
-> (ConnectionState s -> a -> IO b)
-> Maybe (ConnectionState s -> a -> IO b, a)
matchEnvelope Envelope
envelope ConnectionState s -> a -> IO ()
msghandler of
          Maybe (ConnectionState s -> a -> IO (), a)
Nothing -> Maybe () -> IO (Maybe ())
forall (f :: * -> *) a. Applicative f => a -> f a
pure Maybe ()
forall a. Maybe a
Nothing
          Just (ConnectionState s -> a -> IO ()
dispatchf, a
decoded) -> do
              ()
_ <- ConnectionState s -> a -> IO ()
dispatchf ConnectionState s
sState a
decoded
              Maybe () -> IO (Maybe ())
forall (f :: * -> *) a. Applicative f => a -> f a
pure (() -> Maybe ()
forall a. a -> Maybe a
Just ())
      firstMatcher RequestHandler s
_ Maybe ()
acc = Maybe () -> IO (Maybe ())
forall (f :: * -> *) a. Applicative f => a -> f a
pure Maybe ()
acc
  Either SomeException ()
eExc <- IO () -> IO (Either SomeException ())
forall e a. Exception e => IO a -> IO (Either e a)
try (IO () -> IO (Either SomeException ()))
-> IO () -> IO (Either SomeException ())
forall a b. (a -> b) -> a -> b
$ (Maybe () -> RequestHandler s -> IO (Maybe ()))
-> Maybe () -> RequestHandlers s -> IO ()
forall (t :: * -> *) (m :: * -> *) b a.
(Foldable t, Monad m) =>
(b -> a -> m b) -> b -> t a -> m ()
foldM_ ((RequestHandler s -> Maybe () -> IO (Maybe ()))
-> Maybe () -> RequestHandler s -> IO (Maybe ())
forall a b c. (a -> b -> c) -> b -> a -> c
flip RequestHandler s -> Maybe () -> IO (Maybe ())
firstMatcher) Maybe ()
forall a. Maybe a
Nothing RequestHandlers s
msgHandlers :: IO (Either SomeException ())
  case Either SomeException ()
eExc of
    Left SomeException
exc ->
      let env :: Envelope
env = Fingerprint -> MessageType -> UUID -> ByteString -> Envelope
Envelope (String -> Fingerprint
forall a. Typeable a => a -> Fingerprint
fingerprint (SomeException -> String
forall a. Show a => a -> String
show SomeException
exc)) MessageType
ExceptionResponseMessage UUID
msgId (String -> ByteString
forall a. Serialise a => a -> ByteString
msgSerialise (SomeException -> String
forall a. Show a => a -> String
show SomeException
exc)) in
      Envelope -> Locking Socket -> IO ()
sendEnvelope Envelope
env Locking Socket
sockLock
    Right () -> () -> IO ()
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()


type EnvelopeHandler = Envelope -> IO ()

drainSocketMessages :: Socket -> EnvelopeHandler -> IO ()
drainSocketMessages :: Socket -> EnvelopeHandler -> IO ()
drainSocketMessages Socket
sock EnvelopeHandler
envelopeHandler = do
  Unfold IO Socket Word8 -> Socket -> AsyncT IO Word8
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a b.
(IsStream t, Monad m) =>
Unfold m a b -> a -> t m b
S.unfold Unfold IO Socket Word8
forall (m :: * -> *). MonadIO m => Unfold m Socket Word8
SSock.read Socket
sock
  AsyncT IO Word8
-> (AsyncT IO Word8 -> AsyncT IO Envelope) -> AsyncT IO Envelope
forall a b. a -> (a -> b) -> b
& Parser IO Word8 Envelope -> AsyncT IO Word8 -> AsyncT IO Envelope
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a b.
(IsStream t, MonadThrow m) =>
Parser m a b -> t m a -> t m b
P.parseMany Parser IO Word8 Envelope
envelopeP
  AsyncT IO Envelope
-> (AsyncT IO Envelope -> AsyncT IO ()) -> AsyncT IO ()
forall a b. a -> (a -> b) -> b
& EnvelopeHandler -> AsyncT IO Envelope -> AsyncT IO ()
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a b.
(IsStream t, MonadAsync m) =>
(a -> m b) -> t m a -> t m b
S.mapM EnvelopeHandler
envelopeHandler
  AsyncT IO () -> (AsyncT IO () -> SerialT IO ()) -> SerialT IO ()
forall a b. a -> (a -> b) -> b
& AsyncT IO () -> SerialT IO ()
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
IsStream t =>
AsyncT m a -> t m a
S.fromAsync
  SerialT IO () -> (SerialT IO () -> IO ()) -> IO ()
forall a b. a -> (a -> b) -> b
& SerialT IO () -> IO ()
forall (m :: * -> *) a. Monad m => SerialT m a -> m ()
S.drain

--send length-tagged bytestring, perhaps should be in network byte order?
sendEnvelope :: Envelope -> Locking Socket -> IO ()
sendEnvelope :: Envelope -> Locking Socket -> IO ()
sendEnvelope Envelope
envelope Locking Socket
sockLock = do
  let envelopebytes :: ByteString
envelopebytes = Envelope -> ByteString
encodeEnvelope Envelope
envelope
  --Socket.sendAll syscalls send() on a loop until all the bytes are sent, so we need socket locking here to account for serialized messages of size > PIPE_BUF
  Locking Socket -> (Socket -> IO ()) -> IO ()
forall a b. Locking a -> (a -> IO b) -> IO b
withLock Locking Socket
sockLock ((Socket -> IO ()) -> IO ()) -> (Socket -> IO ()) -> IO ()
forall a b. (a -> b) -> a -> b
$ \Socket
socket' -> do
    {-traceShowM ("sendEnvelope"::String,
                ("type"::String, envMessageType envelope),
                socket', ("env len"::String, BS.length envelopebytes),
                "payloadbytes"::String, envPayload envelope)-}
    Socket -> ByteString -> IO ()
Socket.sendAll Socket
socket' ByteString
envelopebytes
--  traceBytes "sendEnvelope" envelopebytes

fingerprint :: Typeable a => a -> Fingerprint
fingerprint :: a -> Fingerprint
fingerprint = TypeRep -> Fingerprint
typeRepFingerprint (TypeRep -> Fingerprint) -> (a -> TypeRep) -> a -> Fingerprint
forall b c a. (b -> c) -> (a -> b) -> a -> c
. a -> TypeRep
forall a. Typeable a => a -> TypeRep
typeOf

fromArray :: Arr.Array Word8 -> BSI.ByteString
fromArray :: Array Word8 -> ByteString
fromArray Array Word8
arr 
    | Int
aLen Int -> Int -> Bool
forall a. Eq a => a -> a -> Bool
== Int
0 = ByteString
forall a. Monoid a => a
mempty
    | Bool
otherwise = {-traceShow ("bsi len"::String, aLen, Arr.byteLength arr) $-} ForeignPtr Word8 -> Int -> Int -> ByteString
BSI.PS ForeignPtr Word8
aStartFPtr Int
0 Int
aLen
  where
    aStart :: Ptr Word8
aStart = Array Word8 -> Ptr Word8
forall a. Array a -> Ptr a
Arr.arrStart Array Word8
arr
    aEnd :: Ptr Word8
aEnd = Array Word8 -> Ptr Word8
forall a. Array a -> Ptr a
Arr.aEnd Array Word8
arr
    aStartFPtr :: ForeignPtr Word8
aStartFPtr = case Array Word8 -> Ptr Word8
forall a. Array a -> Ptr a
Arr.arrStart Array Word8
arr of
      Ptr Addr#
addr -> Addr# -> ForeignPtrContents -> ForeignPtr Word8
forall a. Addr# -> ForeignPtrContents -> ForeignPtr a
ForeignPtr Addr#
addr (ArrayContents -> ForeignPtrContents
ArrT.arrayToFptrContents (Array Word8 -> ArrayContents
forall a. Array a -> ArrayContents
Arr.arrContents Array Word8
arr))
    aLen :: Int
aLen = Ptr Word8
aEnd Ptr Word8 -> Ptr Word8 -> Int
forall a b. Ptr a -> Ptr b -> Int
`minusPtr` Ptr Word8
aStart