{-# LANGUAGE DerivingVia, DeriveGeneric, RankNTypes, ScopedTypeVariables, MultiParamTypeClasses, OverloadedStrings, GeneralizedNewtypeDeriving, CPP, ExistentialQuantification, StandaloneDeriving, GADTs, UnboxedTuples, BangPatterns #-}
{-# OPTIONS_GHC -fno-warn-orphans #-}
{- HLINT ignore "Use lambda-case" -}
module Network.RPC.Curryer.Server where
import qualified Streamly.Data.Stream.Prelude as SP
#if MIN_VERSION_streamly(0,9,0)
import Streamly.Internal.Data.Stream.Concurrent as Stream
import Streamly.Internal.Serialize.FromBytes (word32be)
import qualified Streamly.Internal.Data.Array.Type as Arr
#else
import qualified Streamly.Data.Array as Arr
import Streamly.Data.Stream.Prelude as Stream hiding (foldr)
import Streamly.Internal.Data.Binary.Parser (word32be)
#endif
import Streamly.Data.Stream as Stream hiding (foldr)

import Streamly.Network.Socket as SSock
import Network.Socket as Socket
import Network.Socket.ByteString as Socket
import Streamly.Data.Parser as P
import Codec.Winery
import Codec.Winery.Internal (varInt, decodeVarInt, getBytes)
import GHC.Generics
import GHC.Fingerprint
import Data.Typeable
import Control.Concurrent.MVar (MVar, newMVar, withMVar)
import Control.Exception
import Data.Function ((&))
import Data.Word
import qualified Data.ByteString as BS
import qualified Data.ByteString.Lazy as BSL
import qualified Data.ByteString.FastBuilder as BB
import Streamly.Data.Fold as FL hiding (foldr)
--import qualified Streamly.Internal.Data.Stream.IsStream as P
import qualified Streamly.Data.Stream.Prelude as P
import qualified Streamly.External.ByteString as StreamlyBS
import qualified Data.Binary as B
import qualified Data.UUID as UUIDBase
import qualified Data.UUID.V4 as UUIDBase
import Control.Monad
import Data.Functor
import Control.Applicative

import qualified Network.RPC.Curryer.StreamlyAdditions as SA
import Data.Hashable
import System.Timeout
import qualified Network.ByteOrder as BO


#define CURRYER_SHOW_BYTES 0
#define CURRYER_PASS_SCHEMA 0

#if CURRYER_SHOW_BYTES == 1
import Debug.Trace
#endif

traceBytes :: Applicative f => String -> BS.ByteString -> f ()  
#if CURRYER_SHOW_BYTES == 1
traceBytes msg bs = traceShowM (msg, BS.length bs, bs)
#else
traceBytes :: forall (f :: * -> *). Applicative f => String -> ByteString -> f ()
traceBytes String
_ ByteString
_ = forall (f :: * -> *) a. Applicative f => a -> f a
pure ()
#endif

-- a level of indirection to be able to switch between serialising with and without the winery schema
msgSerialise :: Serialise a => a -> BS.ByteString
#if CURRYER_PASS_SCHEMA == 1
msgSerialise = serialise
#else
msgSerialise :: forall a. Serialise a => a -> ByteString
msgSerialise = forall a. Serialise a => a -> ByteString
serialiseOnly
#endif

msgDeserialise :: forall s. Serialise s => BS.ByteString -> Either WineryException s
#if CURRYER_PASS_SCHEMA == 1
msgDeserialise = deserialise
#else
msgDeserialise :: forall s. Serialise s => ByteString -> Either WineryException s
msgDeserialise = forall s. Serialise s => ByteString -> Either WineryException s
deserialiseOnly
#endif

data Locking a = Locking (MVar ()) a

newLock :: a -> IO (Locking a)
newLock :: forall a. a -> IO (Locking a)
newLock a
x = do
  MVar ()
lock <- forall a. a -> IO (MVar a)
newMVar ()
  forall (f :: * -> *) a. Applicative f => a -> f a
pure (forall a. MVar () -> a -> Locking a
Locking MVar ()
lock a
x)
  
withLock :: Locking a -> (a -> IO b) -> IO b
withLock :: forall a b. Locking a -> (a -> IO b) -> IO b
withLock (Locking MVar ()
mvar a
v) a -> IO b
m =
  forall a b. MVar a -> (a -> IO b) -> IO b
withMVar MVar ()
mvar forall a b. (a -> b) -> a -> b
$ \()
_ -> a -> IO b
m a
v

lockless :: Locking a -> a
lockless :: forall a. Locking a -> a
lockless (Locking MVar ()
_ a
a) = a
a

type Timeout = Word32

type BinaryMessage = BS.ByteString

--includes the fingerprint of the incoming data type (held in the BinaryMessage) to determine how to dispatch the message.
--add another envelope type for unencoded binary messages for any easy optimization for in-process communication
data Envelope = Envelope {
  Envelope -> Fingerprint
envFingerprint :: !Fingerprint,
  Envelope -> MessageType
envMessageType :: !MessageType,
  Envelope -> UUID
envMsgId :: !UUID,
  Envelope -> ByteString
envPayload :: !BinaryMessage
  }
  deriving (forall x. Rep Envelope x -> Envelope
forall x. Envelope -> Rep Envelope x
forall a.
(forall x. a -> Rep a x) -> (forall x. Rep a x -> a) -> Generic a
$cto :: forall x. Rep Envelope x -> Envelope
$cfrom :: forall x. Envelope -> Rep Envelope x
Generic, Int -> Envelope -> ShowS
[Envelope] -> ShowS
Envelope -> String
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
showList :: [Envelope] -> ShowS
$cshowList :: [Envelope] -> ShowS
show :: Envelope -> String
$cshow :: Envelope -> String
showsPrec :: Int -> Envelope -> ShowS
$cshowsPrec :: Int -> Envelope -> ShowS
Show)

type TimeoutMicroseconds = Int

#if MIN_VERSION_base(4,15,0)
#else
deriving instance Generic Fingerprint
#endif
deriving via WineryVariant Fingerprint instance Serialise Fingerprint

-- | Internal type used to mark envelope types.
data MessageType = RequestMessage TimeoutMicroseconds
                 | ResponseMessage
                 | TimeoutResponseMessage
                 | ExceptionResponseMessage
                 deriving (forall x. Rep MessageType x -> MessageType
forall x. MessageType -> Rep MessageType x
forall a.
(forall x. a -> Rep a x) -> (forall x. Rep a x -> a) -> Generic a
$cto :: forall x. Rep MessageType x -> MessageType
$cfrom :: forall x. MessageType -> Rep MessageType x
Generic, Int -> MessageType -> ShowS
[MessageType] -> ShowS
MessageType -> String
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
showList :: [MessageType] -> ShowS
$cshowList :: [MessageType] -> ShowS
show :: MessageType -> String
$cshow :: MessageType -> String
showsPrec :: Int -> MessageType -> ShowS
$cshowsPrec :: Int -> MessageType -> ShowS
Show)
                 deriving Typeable MessageType
BundleSerialise MessageType
Extractor MessageType
Decoder MessageType
Proxy MessageType -> SchemaGen Schema
MessageType -> Builder
forall a.
Typeable a
-> (Proxy a -> SchemaGen Schema)
-> (a -> Builder)
-> Extractor a
-> Decoder a
-> BundleSerialise a
-> Serialise a
bundleSerialise :: BundleSerialise MessageType
$cbundleSerialise :: BundleSerialise MessageType
decodeCurrent :: Decoder MessageType
$cdecodeCurrent :: Decoder MessageType
extractor :: Extractor MessageType
$cextractor :: Extractor MessageType
toBuilder :: MessageType -> Builder
$ctoBuilder :: MessageType -> Builder
schemaGen :: Proxy MessageType -> SchemaGen Schema
$cschemaGen :: Proxy MessageType -> SchemaGen Schema
Serialise via WineryVariant MessageType

-- | A list of `RequestHandler`s.
type RequestHandlers serverState = [RequestHandler serverState]

-- | Data types for server-side request handlers, in synchronous (client waits for return value) and asynchronous (client does not wait for return value) forms.
data RequestHandler serverState where
  -- | create a request handler with a response
  RequestHandler :: forall a b serverState. (Serialise a, Serialise b) => (ConnectionState serverState -> a -> IO b) -> RequestHandler serverState
  -- | create an asynchronous request handler where the client does not expect nor await a response
  AsyncRequestHandler :: forall a serverState. Serialise a => (ConnectionState serverState -> a -> IO ()) -> RequestHandler serverState

-- | Server state sent in via `serve` and passed to `RequestHandler`s.
data ConnectionState a = ConnectionState {
  forall a. ConnectionState a -> a
connectionServerState :: a,
  forall a. ConnectionState a -> Locking Socket
connectionSocket :: Locking Socket
  }

-- | Used by server-side request handlers to send additional messages to the client. This is useful for sending asynchronous responses to the client outside of the normal request-response flow. The locking socket can be found in the ConnectionState when a request handler is called.
sendMessage :: Serialise a => Locking Socket -> a -> IO ()
sendMessage :: forall a. Serialise a => Locking Socket -> a -> IO ()
sendMessage Locking Socket
lockSock a
msg = do
  UUID
requestID <- UUID -> UUID
UUID forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> IO UUID
UUIDBase.nextRandom
  let env :: Envelope
env =
        Fingerprint -> MessageType -> UUID -> ByteString -> Envelope
Envelope (forall a. Typeable a => a -> Fingerprint
fingerprint a
msg) (Int -> MessageType
RequestMessage Int
timeout') UUID
requestID (forall a. Serialise a => a -> ByteString
msgSerialise a
msg)
      timeout' :: Int
timeout' = Int
0
  Envelope -> Locking Socket -> IO ()
sendEnvelope Envelope
env Locking Socket
lockSock
  
--avoid orphan instance
newtype UUID = UUID { UUID -> UUID
_unUUID :: UUIDBase.UUID }
  deriving (Int -> UUID -> ShowS
[UUID] -> ShowS
UUID -> String
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
showList :: [UUID] -> ShowS
$cshowList :: [UUID] -> ShowS
show :: UUID -> String
$cshow :: UUID -> String
showsPrec :: Int -> UUID -> ShowS
$cshowsPrec :: Int -> UUID -> ShowS
Show, UUID -> UUID -> Bool
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
/= :: UUID -> UUID -> Bool
$c/= :: UUID -> UUID -> Bool
== :: UUID -> UUID -> Bool
$c== :: UUID -> UUID -> Bool
Eq, Get UUID
[UUID] -> Put
UUID -> Put
forall t. (t -> Put) -> Get t -> ([t] -> Put) -> Binary t
putList :: [UUID] -> Put
$cputList :: [UUID] -> Put
get :: Get UUID
$cget :: Get UUID
put :: UUID -> Put
$cput :: UUID -> Put
B.Binary, Eq UUID
Int -> UUID -> Int
UUID -> Int
forall a. Eq a -> (Int -> a -> Int) -> (a -> Int) -> Hashable a
hash :: UUID -> Int
$chash :: UUID -> Int
hashWithSalt :: Int -> UUID -> Int
$chashWithSalt :: Int -> UUID -> Int
Hashable)

instance Serialise UUID where
  schemaGen :: Proxy UUID -> SchemaGen Schema
schemaGen Proxy UUID
_ = forall (f :: * -> *) a. Applicative f => a -> f a
pure (forall a. Tag -> SchemaP a -> SchemaP a
STag (Text -> Tag
TagStr Text
"Data.UUID") forall a. SchemaP a
SBytes)
  toBuilder :: UUID -> Builder
toBuilder UUID
uuid = let bytes :: ByteString
bytes = ByteString -> ByteString
BSL.toStrict (forall a. Binary a => a -> ByteString
B.encode UUID
uuid) in
                     forall a. (Bits a, Integral a) => a -> Builder
varInt (ByteString -> Int
BS.length ByteString
bytes) forall a. Semigroup a => a -> a -> a
<> ByteString -> Builder
BB.byteString ByteString
bytes
  {-# INLINE toBuilder #-}
  extractor :: Extractor UUID
extractor = forall a.
Typeable a =>
(Schema -> Strategy' (Term -> a)) -> Extractor a
mkExtractor forall a b. (a -> b) -> a -> b
$
    \Schema
schema' -> case Schema
schema' of
                 STag (TagStr Text
"Data.UUID") Schema
SBytes ->
                   forall (f :: * -> *) a. Applicative f => a -> f a
pure forall a b. (a -> b) -> a -> b
$ \Term
term -> case Term
term of
                              TBytes ByteString
bs -> forall a. Binary a => ByteString -> a
B.decode (ByteString -> ByteString
BSL.fromStrict ByteString
bs)
                              Term
term' -> forall a e. Exception e => e -> a
throw (Term -> ExtractException
InvalidTerm Term
term')
                 Schema
x -> forall a. HasCallStack => String -> a
error forall a b. (a -> b) -> a -> b
$ String
"invalid schema element " forall a. Semigroup a => a -> a -> a
<> forall a. Show a => a -> String
show Schema
x
  decodeCurrent :: Decoder UUID
decodeCurrent = forall a. Binary a => ByteString -> a
B.decode forall b c a. (b -> c) -> (a -> b) -> a -> c
. ByteString -> ByteString
BSL.fromStrict forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> (forall a. (Num a, Bits a) => Decoder a
decodeVarInt forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= Int -> Decoder ByteString
getBytes)

-- | Errors from remote calls.
data ConnectionError = CodecError String -- show of WineryException from exception initiator which cannot otherwise be transmitted over a line due to dependencies on TypeReps
                     | TimeoutError
                     | ExceptionError String
                     deriving (forall x. Rep ConnectionError x -> ConnectionError
forall x. ConnectionError -> Rep ConnectionError x
forall a.
(forall x. a -> Rep a x) -> (forall x. Rep a x -> a) -> Generic a
$cto :: forall x. Rep ConnectionError x -> ConnectionError
$cfrom :: forall x. ConnectionError -> Rep ConnectionError x
Generic, Int -> ConnectionError -> ShowS
[ConnectionError] -> ShowS
ConnectionError -> String
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
showList :: [ConnectionError] -> ShowS
$cshowList :: [ConnectionError] -> ShowS
show :: ConnectionError -> String
$cshow :: ConnectionError -> String
showsPrec :: Int -> ConnectionError -> ShowS
$cshowsPrec :: Int -> ConnectionError -> ShowS
Show, ConnectionError -> ConnectionError -> Bool
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
/= :: ConnectionError -> ConnectionError -> Bool
$c/= :: ConnectionError -> ConnectionError -> Bool
== :: ConnectionError -> ConnectionError -> Bool
$c== :: ConnectionError -> ConnectionError -> Bool
Eq)
                     deriving Typeable ConnectionError
BundleSerialise ConnectionError
Extractor ConnectionError
Decoder ConnectionError
Proxy ConnectionError -> SchemaGen Schema
ConnectionError -> Builder
forall a.
Typeable a
-> (Proxy a -> SchemaGen Schema)
-> (a -> Builder)
-> Extractor a
-> Decoder a
-> BundleSerialise a
-> Serialise a
bundleSerialise :: BundleSerialise ConnectionError
$cbundleSerialise :: BundleSerialise ConnectionError
decodeCurrent :: Decoder ConnectionError
$cdecodeCurrent :: Decoder ConnectionError
extractor :: Extractor ConnectionError
$cextractor :: Extractor ConnectionError
toBuilder :: ConnectionError -> Builder
$ctoBuilder :: ConnectionError -> Builder
schemaGen :: Proxy ConnectionError -> SchemaGen Schema
$cschemaGen :: Proxy ConnectionError -> SchemaGen Schema
Serialise via WineryVariant ConnectionError

data TimeoutException = TimeoutException
  deriving Int -> TimeoutException -> ShowS
[TimeoutException] -> ShowS
TimeoutException -> String
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
showList :: [TimeoutException] -> ShowS
$cshowList :: [TimeoutException] -> ShowS
show :: TimeoutException -> String
$cshow :: TimeoutException -> String
showsPrec :: Int -> TimeoutException -> ShowS
$cshowsPrec :: Int -> TimeoutException -> ShowS
Show

instance Exception TimeoutException  

type HostAddr = (Word8, Word8, Word8, Word8)

type BParser a = Parser Word8 IO a

allHostAddrs,localHostAddr :: HostAddr
allHostAddrs :: HostAddr
allHostAddrs = (Word8
0,Word8
0,Word8
0,Word8
0)
localHostAddr :: HostAddr
localHostAddr = (Word8
127,Word8
0,Word8
0,Word8
1)


msgTypeP :: BParser MessageType
msgTypeP :: BParser MessageType
msgTypeP = (forall (m :: * -> *) a. Monad m => (a -> Bool) -> Parser a m a
P.satisfy (forall a. Eq a => a -> a -> Bool
== Word8
0) forall (f :: * -> *) a b. Applicative f => f a -> f b -> f b
*>
             (Int -> MessageType
RequestMessage forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall a b. (Integral a, Num b) => a -> b
fromIntegral forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> BParser Word32
word32P)) forall (f :: * -> *) a. Alternative f => f a -> f a -> f a
<|>
           (forall (m :: * -> *) a. Monad m => (a -> Bool) -> Parser a m a
P.satisfy (forall a. Eq a => a -> a -> Bool
== Word8
1) forall (f :: * -> *) a b. Functor f => f a -> b -> f b
$> MessageType
ResponseMessage) forall (f :: * -> *) a. Alternative f => f a -> f a -> f a
<|>
           (forall (m :: * -> *) a. Monad m => (a -> Bool) -> Parser a m a
P.satisfy (forall a. Eq a => a -> a -> Bool
== Word8
2) forall (f :: * -> *) a b. Functor f => f a -> b -> f b
$> MessageType
TimeoutResponseMessage) forall (f :: * -> *) a. Alternative f => f a -> f a -> f a
<|>
           (forall (m :: * -> *) a. Monad m => (a -> Bool) -> Parser a m a
P.satisfy (forall a. Eq a => a -> a -> Bool
== Word8
3) forall (f :: * -> *) a b. Functor f => f a -> b -> f b
$> MessageType
ExceptionResponseMessage)
                 
-- Each message is length-prefixed by a 32-bit unsigned length.
envelopeP :: BParser Envelope
envelopeP :: BParser Envelope
envelopeP = do
  let lenPrefixedByteStringP :: Parser Word8 IO ByteString
lenPrefixedByteStringP = do
        Int
c <- forall a b. (Integral a, Num b) => a -> b
fromIntegral forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> forall (m :: * -> *). Monad m => Parser Word8 m Word32
word32be
        --streamly can't handle takeEQ 0, so add special handling
--        traceShowM ("envelopeP payload byteCount"::String, c)
        if Int
c forall a. Eq a => a -> a -> Bool
== Int
0 then
          forall (f :: * -> *) a. Applicative f => a -> f a
pure forall a. Monoid a => a
mempty
          else do
          Array Word8
ps <- forall (m :: * -> *) a b.
Monad m =>
Int -> Fold m a b -> Parser a m b
P.takeEQ Int
c (forall (m :: * -> *) a.
(MonadIO m, Unbox a) =>
Int -> Fold m a (Array a)
Arr.writeN Int
c)
--          traceShowM ("envelopeP read bytes", c)
          let !bs :: ByteString
bs = Array Word8 -> ByteString
StreamlyBS.fromArray Array Word8
ps
--          traceShowM ("unoptimized bs")
          forall (f :: * -> *) a. Applicative f => a -> f a
pure ByteString
bs 
  Fingerprint -> MessageType -> UUID -> ByteString -> Envelope
Envelope forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> BParser Fingerprint
fingerprintP forall (f :: * -> *) a b. Applicative f => f (a -> b) -> f a -> f b
<*> BParser MessageType
msgTypeP forall (f :: * -> *) a b. Applicative f => f (a -> b) -> f a -> f b
<*> BParser UUID
uuidP forall (f :: * -> *) a b. Applicative f => f (a -> b) -> f a -> f b
<*> Parser Word8 IO ByteString
lenPrefixedByteStringP

--overhead is fingerprint (16 bytes), msgType (1+4 optional bytes for request message), msgId (4 bytes), uuid (16 bytes) = 41 bytes per request message, 37 bytes for all others
encodeEnvelope :: Envelope -> BS.ByteString
encodeEnvelope :: Envelope -> ByteString
encodeEnvelope (Envelope (Fingerprint Word64
fp1 Word64
fp2) MessageType
msgType UUID
msgId ByteString
bs) =
{-  traceShow ("encodeEnvelope"::String,
             ("fingerprint len"::String, BS.length fingerprintBs),
             ("msgtype length"::String,BS.length msgTypeBs),
             ("id len"::String, BS.length msgIdBs),
             ("payload len"::String, payloadLen),
             ("complete len"::String, BS.length completeMessage)) $-}
  ByteString
completeMessage
  where
    completeMessage :: ByteString
completeMessage = ByteString
fingerprintBs forall a. Semigroup a => a -> a -> a
<> ByteString
msgTypeBs forall a. Semigroup a => a -> a -> a
<> ByteString
msgIdBs forall a. Semigroup a => a -> a -> a
<> ByteString
lenPrefixedBs
    fingerprintBs :: ByteString
fingerprintBs = Word64 -> ByteString
BO.bytestring64 Word64
fp1 forall a. Semigroup a => a -> a -> a
<> Word64 -> ByteString
BO.bytestring64 Word64
fp2
    msgTypeBs :: ByteString
msgTypeBs = case MessageType
msgType of
      RequestMessage Int
timeoutms -> Word8 -> ByteString
BS.singleton Word8
0 forall a. Semigroup a => a -> a -> a
<> Word32 -> ByteString
BO.bytestring32 (forall a b. (Integral a, Num b) => a -> b
fromIntegral Int
timeoutms)
      MessageType
ResponseMessage -> Word8 -> ByteString
BS.singleton Word8
1
      MessageType
TimeoutResponseMessage -> Word8 -> ByteString
BS.singleton Word8
2
      MessageType
ExceptionResponseMessage -> Word8 -> ByteString
BS.singleton Word8
3
    msgIdBs :: ByteString
msgIdBs =
      case UUID -> (Word32, Word32, Word32, Word32)
UUIDBase.toWords (UUID -> UUID
_unUUID UUID
msgId) of
        (Word32
u1, Word32
u2, Word32
u3, Word32
u4) -> forall (t :: * -> *) a b.
Foldable t =>
(a -> b -> b) -> b -> t a -> b
foldr (forall a. Semigroup a => a -> a -> a
(<>) forall b c a. (b -> c) -> (a -> b) -> a -> c
. Word32 -> ByteString
BO.bytestring32) ByteString
BS.empty [Word32
u1, Word32
u2, Word32
u3, Word32
u4]
    lenPrefixedBs :: ByteString
lenPrefixedBs = Word32 -> ByteString
BO.bytestring32 Word32
payloadLen forall a. Semigroup a => a -> a -> a
<> ByteString
bs
    payloadLen :: Word32
payloadLen = forall a b. (Integral a, Num b) => a -> b
fromIntegral (ByteString -> Int
BS.length ByteString
bs)
    
    

fingerprintP :: BParser Fingerprint
fingerprintP :: BParser Fingerprint
fingerprintP =
  Word64 -> Word64 -> Fingerprint
Fingerprint forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> BParser Word64
word64P forall (f :: * -> *) a b. Applicative f => f (a -> b) -> f a -> f b
<*> BParser Word64
word64P

word64P :: BParser Word64
word64P :: BParser Word64
word64P = do
  let s :: Fold IO a [a]
s = forall (m :: * -> *) a. Monad m => Fold m a [a]
FL.toList
  [Word8]
b <- forall (m :: * -> *) a b.
Monad m =>
Int -> Fold m a b -> Parser a m b
P.takeEQ Int
8 forall {a}. Fold IO a [a]
s
  forall (f :: * -> *) a. Applicative f => a -> f a
pure (ByteString -> Word64
BO.word64 ([Word8] -> ByteString
BS.pack [Word8]
b))

--parse a 32-bit integer from network byte order
word32P :: BParser Word32
word32P :: BParser Word32
word32P = do
  let s :: Fold IO a [a]
s = forall (m :: * -> *) a. Monad m => Fold m a [a]
FL.toList
  [Word8]
w4x8 <- forall (m :: * -> *) a b.
Monad m =>
Int -> Fold m a b -> Parser a m b
P.takeEQ Int
4 forall {a}. Fold IO a [a]
s
--  traceShowM ("w4x8"::String, BO.word32 (BS.pack w4x8))
  forall (f :: * -> *) a. Applicative f => a -> f a
pure (ByteString -> Word32
BO.word32 ([Word8] -> ByteString
BS.pack [Word8]
w4x8))

-- uuid is encode as 4 32-bit words because of its convenient 32-bit tuple encoding
uuidP :: BParser UUID
uuidP :: BParser UUID
uuidP = do
  Word32
u1 <- BParser Word32
word32P
  Word32
u2 <- BParser Word32
word32P
  Word32
u3 <- BParser Word32
word32P
  --u4 <- word32P
  --pure (UUID (UUIDBase.fromWords u1 u2 u3 u4))-}
  --(UUID . UUIDBase.fromWords) <$> word32P <*> word32P <*> word32P <*> word32P
  UUID -> UUID
UUID forall b c a. (b -> c) -> (a -> b) -> a -> c
. Word32 -> Word32 -> Word32 -> Word32 -> UUID
UUIDBase.fromWords Word32
u1 Word32
u2 Word32
u3 forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> BParser Word32
word32P

type NewConnectionHandler msg = IO (Maybe msg)

type NewMessageHandler req resp = req -> IO resp

-- | Listen for new connections and handle requests which are passed the server state 's'. The MVar SockAddr can be be optionally used to know when the server is ready for processing requests.
serve :: 
         RequestHandlers s->
         s ->
         HostAddr ->
         PortNumber ->
         Maybe (MVar SockAddr) ->
         IO Bool
serve :: forall s.
RequestHandlers s
-> s -> HostAddr -> PortNumber -> Maybe (MVar SockAddr) -> IO Bool
serve RequestHandlers s
userMsgHandlers s
serverState HostAddr
hostaddr PortNumber
port Maybe (MVar SockAddr)
mSockLock = do
  let handleSock :: Socket -> IO ()
handleSock Socket
sock = do
        Locking Socket
lockingSocket <- forall a. a -> IO (Locking a)
newLock Socket
sock
        Socket -> EnvelopeHandler -> IO ()
drainSocketMessages Socket
sock (forall s.
Locking Socket -> RequestHandlers s -> s -> EnvelopeHandler
serverEnvelopeHandler Locking Socket
lockingSocket RequestHandlers s
userMsgHandlers s
serverState)
  forall (m :: * -> *) a b.
Applicative m =>
Unfold m a b -> a -> Stream m b
Stream.unfold (forall (m :: * -> *).
MonadIO m =>
[(SocketOption, Int)]
-> Maybe (MVar SockAddr) -> Unfold m (HostAddr, PortNumber) Socket
SA.acceptorOnAddr [(SocketOption
ReuseAddr, Int
1), (SocketOption
NoDelay, Int
1)] Maybe (MVar SockAddr)
mSockLock) (HostAddr
hostaddr, PortNumber
port) 
   forall a b. a -> (a -> b) -> b
& forall (m :: * -> *) a b.
MonadAsync m =>
(Config -> Config) -> (a -> m b) -> Stream m a -> Stream m b
Stream.parMapM forall a. a -> a
id Socket -> IO ()
handleSock
   forall a b. a -> (a -> b) -> b
& forall (m :: * -> *) a b.
Monad m =>
Fold m a b -> Stream m a -> m b
Stream.fold forall (m :: * -> *) a. Monad m => Fold m a ()
FL.drain
  forall (f :: * -> *) a. Applicative f => a -> f a
pure Bool
True

openEnvelope :: forall s. (Serialise s, Typeable s) => Envelope -> Maybe s
openEnvelope :: forall s. (Serialise s, Typeable s) => Envelope -> Maybe s
openEnvelope (Envelope Fingerprint
eprint MessageType
_ UUID
_ ByteString
bytes) =
  if Fingerprint
eprint forall a. Eq a => a -> a -> Bool
== forall a. Typeable a => a -> Fingerprint
fingerprint (forall a. HasCallStack => a
undefined :: s) then
    case forall s. Serialise s => ByteString -> Either WineryException s
msgDeserialise ByteString
bytes of
      Left WineryException
_e -> {-traceShow ("openEnv error"::String, _e)-} forall a. Maybe a
Nothing
      Right s
decoded -> forall a. a -> Maybe a
Just s
decoded
    else
    forall a. Maybe a
Nothing

--use winery to decode only the data structure and skip the schema
deserialiseOnly :: forall s. Serialise s => BS.ByteString -> Either WineryException s
deserialiseOnly :: forall s. Serialise s => ByteString -> Either WineryException s
deserialiseOnly ByteString
bytes = do
  Decoder s
dec <- forall a.
Serialise a =>
Schema -> Either WineryException (Decoder a)
getDecoder (forall (proxy :: * -> *) a. Serialise a => proxy a -> Schema
schema (forall {k} (t :: k). Proxy t
Proxy :: Proxy s))
  forall (f :: * -> *) a. Applicative f => a -> f a
pure (forall a. Decoder a -> ByteString -> a
evalDecoder Decoder s
dec ByteString
bytes)


matchEnvelope :: forall a b s. (Serialise a, Serialise b, Typeable b) =>
              Envelope -> 
              (ConnectionState s -> a -> IO b) ->
              Maybe (ConnectionState s -> a -> IO b, a)
matchEnvelope :: forall a b s.
(Serialise a, Serialise b, Typeable b) =>
Envelope
-> (ConnectionState s -> a -> IO b)
-> Maybe (ConnectionState s -> a -> IO b, a)
matchEnvelope Envelope
envelope ConnectionState s -> a -> IO b
dispatchf =
  case forall s. (Serialise s, Typeable s) => Envelope -> Maybe s
openEnvelope Envelope
envelope :: Maybe a of
    Maybe a
Nothing -> forall a. Maybe a
Nothing
    Just a
decoded -> forall a. a -> Maybe a
Just (ConnectionState s -> a -> IO b
dispatchf, a
decoded)

-- | Called by `serve` to process incoming envelope requests. Never returns, so use `async` to spin it off on another thread.
serverEnvelopeHandler :: 
                     Locking Socket
                     -> RequestHandlers s
                     -> s         
                     -> Envelope
                     -> IO ()
serverEnvelopeHandler :: forall s.
Locking Socket -> RequestHandlers s -> s -> EnvelopeHandler
serverEnvelopeHandler Locking Socket
_ RequestHandlers s
_ s
_ (Envelope Fingerprint
_ MessageType
TimeoutResponseMessage UUID
_ ByteString
_) = forall (f :: * -> *) a. Applicative f => a -> f a
pure ()
serverEnvelopeHandler Locking Socket
_ RequestHandlers s
_ s
_ (Envelope Fingerprint
_ MessageType
ExceptionResponseMessage UUID
_ ByteString
_) = forall (f :: * -> *) a. Applicative f => a -> f a
pure ()
serverEnvelopeHandler Locking Socket
_ RequestHandlers s
_ s
_ (Envelope Fingerprint
_ MessageType
ResponseMessage UUID
_ ByteString
_) = forall (f :: * -> *) a. Applicative f => a -> f a
pure ()
serverEnvelopeHandler Locking Socket
sockLock RequestHandlers s
msgHandlers s
serverState envelope :: Envelope
envelope@(Envelope Fingerprint
_ (RequestMessage Int
timeoutms) UUID
msgId ByteString
_) = do
  --find first matching handler
  let runTimeout :: IO b -> IO (Maybe b)
      runTimeout :: forall b. IO b -> IO (Maybe b)
runTimeout IO b
m = 
        if Int
timeoutms forall a. Eq a => a -> a -> Bool
== Int
0 then
          (forall a. a -> Maybe a
Just forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> IO b
m) forall e a. Exception e => IO a -> (e -> IO a) -> IO a
`catch` forall b. TimeoutException -> IO (Maybe b)
timeoutExcHandler
        else
          forall a. Int -> IO a -> IO (Maybe a)
timeout (forall a b. (Integral a, Num b) => a -> b
fromIntegral Int
timeoutms) IO b
m forall e a. Exception e => IO a -> (e -> IO a) -> IO a
`catch` forall b. TimeoutException -> IO (Maybe b)
timeoutExcHandler
      --allow server-side function to throw TimeoutError which is caught here and becomes TimeoutError value
      timeoutExcHandler :: TimeoutException -> IO (Maybe b)
      timeoutExcHandler :: forall b. TimeoutException -> IO (Maybe b)
timeoutExcHandler TimeoutException
_ = forall (f :: * -> *) a. Applicative f => a -> f a
pure forall a. Maybe a
Nothing
      
      sState :: ConnectionState s
sState = ConnectionState {
        connectionServerState :: s
connectionServerState = s
serverState,
        connectionSocket :: Locking Socket
connectionSocket = Locking Socket
sockLock
        }
            
      firstMatcher :: RequestHandler s -> Maybe () -> IO (Maybe ())
firstMatcher (RequestHandler ConnectionState s -> a -> IO b
msghandler) Maybe ()
Nothing =
        case forall a b s.
(Serialise a, Serialise b, Typeable b) =>
Envelope
-> (ConnectionState s -> a -> IO b)
-> Maybe (ConnectionState s -> a -> IO b, a)
matchEnvelope Envelope
envelope ConnectionState s -> a -> IO b
msghandler of
          Maybe (ConnectionState s -> a -> IO b, a)
Nothing -> forall (f :: * -> *) a. Applicative f => a -> f a
pure forall a. Maybe a
Nothing
          Just (ConnectionState s -> a -> IO b
dispatchf, a
decoded) -> do
            --TODO add exception handling
            Maybe b
mResponse <- forall b. IO b -> IO (Maybe b)
runTimeout (ConnectionState s -> a -> IO b
dispatchf ConnectionState s
sState a
decoded)
            let envelopeResponse :: Envelope
envelopeResponse =
                  case Maybe b
mResponse of
                        Just b
response ->
                          Fingerprint -> MessageType -> UUID -> ByteString -> Envelope
Envelope (forall a. Typeable a => a -> Fingerprint
fingerprint b
response) MessageType
ResponseMessage UUID
msgId (forall a. Serialise a => a -> ByteString
msgSerialise b
response)
                        Maybe b
Nothing -> 
                          Fingerprint -> MessageType -> UUID -> ByteString -> Envelope
Envelope (forall a. Typeable a => a -> Fingerprint
fingerprint ConnectionError
TimeoutError) MessageType
TimeoutResponseMessage UUID
msgId ByteString
BS.empty
            Envelope -> Locking Socket -> IO ()
sendEnvelope Envelope
envelopeResponse Locking Socket
sockLock
            forall (f :: * -> *) a. Applicative f => a -> f a
pure (forall a. a -> Maybe a
Just ())
      firstMatcher (AsyncRequestHandler ConnectionState s -> a -> IO ()
msghandler) Maybe ()
Nothing =        
        case forall a b s.
(Serialise a, Serialise b, Typeable b) =>
Envelope
-> (ConnectionState s -> a -> IO b)
-> Maybe (ConnectionState s -> a -> IO b, a)
matchEnvelope Envelope
envelope ConnectionState s -> a -> IO ()
msghandler of
          Maybe (ConnectionState s -> a -> IO (), a)
Nothing -> forall (f :: * -> *) a. Applicative f => a -> f a
pure forall a. Maybe a
Nothing
          Just (ConnectionState s -> a -> IO ()
dispatchf, a
decoded) -> do
              ()
_ <- ConnectionState s -> a -> IO ()
dispatchf ConnectionState s
sState a
decoded
              forall (f :: * -> *) a. Applicative f => a -> f a
pure (forall a. a -> Maybe a
Just ())
      firstMatcher RequestHandler s
_ Maybe ()
acc = forall (f :: * -> *) a. Applicative f => a -> f a
pure Maybe ()
acc
  Either SomeException ()
eExc <- forall e a. Exception e => IO a -> IO (Either e a)
try forall a b. (a -> b) -> a -> b
$ forall (t :: * -> *) (m :: * -> *) b a.
(Foldable t, Monad m) =>
(b -> a -> m b) -> b -> t a -> m ()
foldM_ (forall a b c. (a -> b -> c) -> b -> a -> c
flip RequestHandler s -> Maybe () -> IO (Maybe ())
firstMatcher) forall a. Maybe a
Nothing RequestHandlers s
msgHandlers :: IO (Either SomeException ())
  case Either SomeException ()
eExc of
    Left SomeException
exc ->
      let env :: Envelope
env = Fingerprint -> MessageType -> UUID -> ByteString -> Envelope
Envelope (forall a. Typeable a => a -> Fingerprint
fingerprint (forall a. Show a => a -> String
show SomeException
exc)) MessageType
ExceptionResponseMessage UUID
msgId (forall a. Serialise a => a -> ByteString
msgSerialise (forall a. Show a => a -> String
show SomeException
exc)) in
      Envelope -> Locking Socket -> IO ()
sendEnvelope Envelope
env Locking Socket
sockLock
    Right () -> forall (f :: * -> *) a. Applicative f => a -> f a
pure ()


type EnvelopeHandler = Envelope -> IO ()

drainSocketMessages :: Socket -> EnvelopeHandler -> IO ()
drainSocketMessages :: Socket -> EnvelopeHandler -> IO ()
drainSocketMessages Socket
sock EnvelopeHandler
envelopeHandler = do
  forall (m :: * -> *) a b.
Applicative m =>
Unfold m a b -> a -> Stream m b
SP.unfold forall (m :: * -> *). MonadIO m => Unfold m Socket Word8
SSock.reader Socket
sock
  forall a b. a -> (a -> b) -> b
& forall (m :: * -> *) a b.
Monad m =>
Parser a m b -> Stream m a -> Stream m (Either ParseError b)
P.parseMany BParser Envelope
envelopeP
  forall a b. a -> (a -> b) -> b
& forall (m :: * -> *) a b.
Monad m =>
Stream m (Either a b) -> Stream m b
SP.catRights
  forall a b. a -> (a -> b) -> b
& forall (m :: * -> *) a b.
MonadAsync m =>
(Config -> Config) -> (a -> m b) -> Stream m a -> Stream m b
SP.parMapM (Bool -> Config -> Config
SP.ordered Bool
False) EnvelopeHandler
envelopeHandler
  forall a b. a -> (a -> b) -> b
& forall (m :: * -> *) a b.
Monad m =>
Fold m a b -> Stream m a -> m b
SP.fold forall (m :: * -> *) a. Monad m => Fold m a ()
FL.drain

--send length-tagged bytestring, perhaps should be in network byte order?
sendEnvelope :: Envelope -> Locking Socket -> IO ()
sendEnvelope :: Envelope -> Locking Socket -> IO ()
sendEnvelope Envelope
envelope Locking Socket
sockLock = do
  let envelopebytes :: ByteString
envelopebytes = Envelope -> ByteString
encodeEnvelope Envelope
envelope
  --Socket.sendAll syscalls send() on a loop until all the bytes are sent, so we need socket locking here to account for serialized messages of size > PIPE_BUF
  forall a b. Locking a -> (a -> IO b) -> IO b
withLock Locking Socket
sockLock forall a b. (a -> b) -> a -> b
$ \Socket
socket' -> do
    {-traceShowM ("sendEnvelope"::String,
                ("type"::String, envMessageType envelope),
                socket',
                ("envelope len out"::String, BS.length envelopebytes),
                "payloadbytes"::String, envPayload envelope
               )-}
    Socket -> ByteString -> IO ()
Socket.sendAll Socket
socket' ByteString
envelopebytes
  forall (f :: * -> *). Applicative f => String -> ByteString -> f ()
traceBytes String
"sendEnvelope" ByteString
envelopebytes

fingerprint :: Typeable a => a -> Fingerprint
fingerprint :: forall a. Typeable a => a -> Fingerprint
fingerprint = TypeRep -> Fingerprint
typeRepFingerprint forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall a. Typeable a => a -> TypeRep
typeOf