{-# LANGUAGE DerivingVia, DeriveGeneric, RankNTypes, ScopedTypeVariables, MultiParamTypeClasses, OverloadedStrings, GeneralizedNewtypeDeriving, CPP, ExistentialQuantification, StandaloneDeriving, GADTs, UnboxedTuples, BangPatterns #-}
{-# OPTIONS_GHC -fno-warn-orphans #-}
module Network.RPC.Curryer.Server where
import qualified Streamly.Data.Stream.Prelude as SP
#if MIN_VERSION_streamly(0,9,0)
import Streamly.Internal.Data.Stream.Concurrent as Stream
import Streamly.Internal.Serialize.FromBytes (word32be)
import qualified Streamly.Internal.Data.Array.Type as Arr
#else
import qualified Streamly.Data.Array as Arr
import Streamly.Data.Stream.Prelude as Stream hiding (foldr)
import Streamly.Internal.Data.Binary.Parser (word32be)
#endif
import Streamly.Data.Stream as Stream hiding (foldr)
import Streamly.Network.Socket as SSock
import Network.Socket as Socket
import Network.Socket.ByteString as Socket
import Streamly.Data.Parser as P
import Codec.Winery
import Codec.Winery.Internal (varInt, decodeVarInt, getBytes)
import GHC.Generics
import GHC.Fingerprint
import Data.Typeable
import Control.Concurrent.MVar (MVar, newMVar, withMVar)
import Control.Exception
import Data.Function ((&))
import Data.Word
import qualified Data.ByteString as BS
import qualified Data.ByteString.Lazy as BSL
import qualified Data.ByteString.FastBuilder as BB
import Streamly.Data.Fold as FL hiding (foldr)
import qualified Streamly.Data.Stream.Prelude as P
import qualified Streamly.External.ByteString as StreamlyBS
import qualified Data.Binary as B
import qualified Data.UUID as UUIDBase
import qualified Data.UUID.V4 as UUIDBase
import Control.Monad
import Data.Functor
import Control.Applicative
import qualified Network.RPC.Curryer.StreamlyAdditions as SA
import Data.Hashable
import System.Timeout
import qualified Network.ByteOrder as BO
#define CURRYER_SHOW_BYTES 0
#define CURRYER_PASS_SCHEMA 0
#if CURRYER_SHOW_BYTES == 1
import Debug.Trace
#endif
traceBytes :: Applicative f => String -> BS.ByteString -> f ()
#if CURRYER_SHOW_BYTES == 1
traceBytes msg bs = traceShowM (msg, BS.length bs, bs)
#else
traceBytes :: forall (f :: * -> *). Applicative f => String -> ByteString -> f ()
traceBytes String
_ ByteString
_ = forall (f :: * -> *) a. Applicative f => a -> f a
pure ()
#endif
msgSerialise :: Serialise a => a -> BS.ByteString
#if CURRYER_PASS_SCHEMA == 1
msgSerialise = serialise
#else
msgSerialise :: forall a. Serialise a => a -> ByteString
msgSerialise = forall a. Serialise a => a -> ByteString
serialiseOnly
#endif
msgDeserialise :: forall s. Serialise s => BS.ByteString -> Either WineryException s
#if CURRYER_PASS_SCHEMA == 1
msgDeserialise = deserialise
#else
msgDeserialise :: forall s. Serialise s => ByteString -> Either WineryException s
msgDeserialise = forall s. Serialise s => ByteString -> Either WineryException s
deserialiseOnly
#endif
data Locking a = Locking (MVar ()) a
newLock :: a -> IO (Locking a)
newLock :: forall a. a -> IO (Locking a)
newLock a
x = do
MVar ()
lock <- forall a. a -> IO (MVar a)
newMVar ()
forall (f :: * -> *) a. Applicative f => a -> f a
pure (forall a. MVar () -> a -> Locking a
Locking MVar ()
lock a
x)
withLock :: Locking a -> (a -> IO b) -> IO b
withLock :: forall a b. Locking a -> (a -> IO b) -> IO b
withLock (Locking MVar ()
mvar a
v) a -> IO b
m =
forall a b. MVar a -> (a -> IO b) -> IO b
withMVar MVar ()
mvar forall a b. (a -> b) -> a -> b
$ \()
_ -> a -> IO b
m a
v
lockless :: Locking a -> a
lockless :: forall a. Locking a -> a
lockless (Locking MVar ()
_ a
a) = a
a
type Timeout = Word32
type BinaryMessage = BS.ByteString
data Envelope = Envelope {
Envelope -> Fingerprint
envFingerprint :: !Fingerprint,
Envelope -> MessageType
envMessageType :: !MessageType,
Envelope -> UUID
envMsgId :: !UUID,
Envelope -> ByteString
envPayload :: !BinaryMessage
}
deriving (forall x. Rep Envelope x -> Envelope
forall x. Envelope -> Rep Envelope x
forall a.
(forall x. a -> Rep a x) -> (forall x. Rep a x -> a) -> Generic a
$cto :: forall x. Rep Envelope x -> Envelope
$cfrom :: forall x. Envelope -> Rep Envelope x
Generic, Int -> Envelope -> ShowS
[Envelope] -> ShowS
Envelope -> String
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
showList :: [Envelope] -> ShowS
$cshowList :: [Envelope] -> ShowS
show :: Envelope -> String
$cshow :: Envelope -> String
showsPrec :: Int -> Envelope -> ShowS
$cshowsPrec :: Int -> Envelope -> ShowS
Show)
type TimeoutMicroseconds = Int
#if MIN_VERSION_base(4,15,0)
#else
deriving instance Generic Fingerprint
#endif
deriving via WineryVariant Fingerprint instance Serialise Fingerprint
data MessageType = RequestMessage TimeoutMicroseconds
| ResponseMessage
| TimeoutResponseMessage
| ExceptionResponseMessage
deriving (forall x. Rep MessageType x -> MessageType
forall x. MessageType -> Rep MessageType x
forall a.
(forall x. a -> Rep a x) -> (forall x. Rep a x -> a) -> Generic a
$cto :: forall x. Rep MessageType x -> MessageType
$cfrom :: forall x. MessageType -> Rep MessageType x
Generic, Int -> MessageType -> ShowS
[MessageType] -> ShowS
MessageType -> String
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
showList :: [MessageType] -> ShowS
$cshowList :: [MessageType] -> ShowS
show :: MessageType -> String
$cshow :: MessageType -> String
showsPrec :: Int -> MessageType -> ShowS
$cshowsPrec :: Int -> MessageType -> ShowS
Show)
deriving Typeable MessageType
BundleSerialise MessageType
Extractor MessageType
Decoder MessageType
Proxy MessageType -> SchemaGen Schema
MessageType -> Builder
forall a.
Typeable a
-> (Proxy a -> SchemaGen Schema)
-> (a -> Builder)
-> Extractor a
-> Decoder a
-> BundleSerialise a
-> Serialise a
bundleSerialise :: BundleSerialise MessageType
$cbundleSerialise :: BundleSerialise MessageType
decodeCurrent :: Decoder MessageType
$cdecodeCurrent :: Decoder MessageType
extractor :: Extractor MessageType
$cextractor :: Extractor MessageType
toBuilder :: MessageType -> Builder
$ctoBuilder :: MessageType -> Builder
schemaGen :: Proxy MessageType -> SchemaGen Schema
$cschemaGen :: Proxy MessageType -> SchemaGen Schema
Serialise via WineryVariant MessageType
type RequestHandlers serverState = [RequestHandler serverState]
data RequestHandler serverState where
RequestHandler :: forall a b serverState. (Serialise a, Serialise b) => (ConnectionState serverState -> a -> IO b) -> RequestHandler serverState
AsyncRequestHandler :: forall a serverState. Serialise a => (ConnectionState serverState -> a -> IO ()) -> RequestHandler serverState
data ConnectionState a = ConnectionState {
forall a. ConnectionState a -> a
connectionServerState :: a,
forall a. ConnectionState a -> Locking Socket
connectionSocket :: Locking Socket
}
sendMessage :: Serialise a => Locking Socket -> a -> IO ()
sendMessage :: forall a. Serialise a => Locking Socket -> a -> IO ()
sendMessage Locking Socket
lockSock a
msg = do
UUID
requestID <- UUID -> UUID
UUID forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> IO UUID
UUIDBase.nextRandom
let env :: Envelope
env =
Fingerprint -> MessageType -> UUID -> ByteString -> Envelope
Envelope (forall a. Typeable a => a -> Fingerprint
fingerprint a
msg) (Int -> MessageType
RequestMessage Int
timeout') UUID
requestID (forall a. Serialise a => a -> ByteString
msgSerialise a
msg)
timeout' :: Int
timeout' = Int
0
Envelope -> Locking Socket -> IO ()
sendEnvelope Envelope
env Locking Socket
lockSock
newtype UUID = UUID { UUID -> UUID
_unUUID :: UUIDBase.UUID }
deriving (Int -> UUID -> ShowS
[UUID] -> ShowS
UUID -> String
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
showList :: [UUID] -> ShowS
$cshowList :: [UUID] -> ShowS
show :: UUID -> String
$cshow :: UUID -> String
showsPrec :: Int -> UUID -> ShowS
$cshowsPrec :: Int -> UUID -> ShowS
Show, UUID -> UUID -> Bool
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
/= :: UUID -> UUID -> Bool
$c/= :: UUID -> UUID -> Bool
== :: UUID -> UUID -> Bool
$c== :: UUID -> UUID -> Bool
Eq, Get UUID
[UUID] -> Put
UUID -> Put
forall t. (t -> Put) -> Get t -> ([t] -> Put) -> Binary t
putList :: [UUID] -> Put
$cputList :: [UUID] -> Put
get :: Get UUID
$cget :: Get UUID
put :: UUID -> Put
$cput :: UUID -> Put
B.Binary, Eq UUID
Int -> UUID -> Int
UUID -> Int
forall a. Eq a -> (Int -> a -> Int) -> (a -> Int) -> Hashable a
hash :: UUID -> Int
$chash :: UUID -> Int
hashWithSalt :: Int -> UUID -> Int
$chashWithSalt :: Int -> UUID -> Int
Hashable)
instance Serialise UUID where
schemaGen :: Proxy UUID -> SchemaGen Schema
schemaGen Proxy UUID
_ = forall (f :: * -> *) a. Applicative f => a -> f a
pure (forall a. Tag -> SchemaP a -> SchemaP a
STag (Text -> Tag
TagStr Text
"Data.UUID") forall a. SchemaP a
SBytes)
toBuilder :: UUID -> Builder
toBuilder UUID
uuid = let bytes :: ByteString
bytes = ByteString -> ByteString
BSL.toStrict (forall a. Binary a => a -> ByteString
B.encode UUID
uuid) in
forall a. (Bits a, Integral a) => a -> Builder
varInt (ByteString -> Int
BS.length ByteString
bytes) forall a. Semigroup a => a -> a -> a
<> ByteString -> Builder
BB.byteString ByteString
bytes
{-# INLINE toBuilder #-}
extractor :: Extractor UUID
extractor = forall a.
Typeable a =>
(Schema -> Strategy' (Term -> a)) -> Extractor a
mkExtractor forall a b. (a -> b) -> a -> b
$
\Schema
schema' -> case Schema
schema' of
STag (TagStr Text
"Data.UUID") Schema
SBytes ->
forall (f :: * -> *) a. Applicative f => a -> f a
pure forall a b. (a -> b) -> a -> b
$ \Term
term -> case Term
term of
TBytes ByteString
bs -> forall a. Binary a => ByteString -> a
B.decode (ByteString -> ByteString
BSL.fromStrict ByteString
bs)
Term
term' -> forall a e. Exception e => e -> a
throw (Term -> ExtractException
InvalidTerm Term
term')
Schema
x -> forall a. HasCallStack => String -> a
error forall a b. (a -> b) -> a -> b
$ String
"invalid schema element " forall a. Semigroup a => a -> a -> a
<> forall a. Show a => a -> String
show Schema
x
decodeCurrent :: Decoder UUID
decodeCurrent = forall a. Binary a => ByteString -> a
B.decode forall b c a. (b -> c) -> (a -> b) -> a -> c
. ByteString -> ByteString
BSL.fromStrict forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> (forall a. (Num a, Bits a) => Decoder a
decodeVarInt forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= Int -> Decoder ByteString
getBytes)
data ConnectionError = CodecError String
| TimeoutError
| ExceptionError String
deriving (forall x. Rep ConnectionError x -> ConnectionError
forall x. ConnectionError -> Rep ConnectionError x
forall a.
(forall x. a -> Rep a x) -> (forall x. Rep a x -> a) -> Generic a
$cto :: forall x. Rep ConnectionError x -> ConnectionError
$cfrom :: forall x. ConnectionError -> Rep ConnectionError x
Generic, Int -> ConnectionError -> ShowS
[ConnectionError] -> ShowS
ConnectionError -> String
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
showList :: [ConnectionError] -> ShowS
$cshowList :: [ConnectionError] -> ShowS
show :: ConnectionError -> String
$cshow :: ConnectionError -> String
showsPrec :: Int -> ConnectionError -> ShowS
$cshowsPrec :: Int -> ConnectionError -> ShowS
Show, ConnectionError -> ConnectionError -> Bool
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
/= :: ConnectionError -> ConnectionError -> Bool
$c/= :: ConnectionError -> ConnectionError -> Bool
== :: ConnectionError -> ConnectionError -> Bool
$c== :: ConnectionError -> ConnectionError -> Bool
Eq)
deriving Typeable ConnectionError
BundleSerialise ConnectionError
Extractor ConnectionError
Decoder ConnectionError
Proxy ConnectionError -> SchemaGen Schema
ConnectionError -> Builder
forall a.
Typeable a
-> (Proxy a -> SchemaGen Schema)
-> (a -> Builder)
-> Extractor a
-> Decoder a
-> BundleSerialise a
-> Serialise a
bundleSerialise :: BundleSerialise ConnectionError
$cbundleSerialise :: BundleSerialise ConnectionError
decodeCurrent :: Decoder ConnectionError
$cdecodeCurrent :: Decoder ConnectionError
extractor :: Extractor ConnectionError
$cextractor :: Extractor ConnectionError
toBuilder :: ConnectionError -> Builder
$ctoBuilder :: ConnectionError -> Builder
schemaGen :: Proxy ConnectionError -> SchemaGen Schema
$cschemaGen :: Proxy ConnectionError -> SchemaGen Schema
Serialise via WineryVariant ConnectionError
data TimeoutException = TimeoutException
deriving Int -> TimeoutException -> ShowS
[TimeoutException] -> ShowS
TimeoutException -> String
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
showList :: [TimeoutException] -> ShowS
$cshowList :: [TimeoutException] -> ShowS
show :: TimeoutException -> String
$cshow :: TimeoutException -> String
showsPrec :: Int -> TimeoutException -> ShowS
$cshowsPrec :: Int -> TimeoutException -> ShowS
Show
instance Exception TimeoutException
type HostAddr = (Word8, Word8, Word8, Word8)
type BParser a = Parser Word8 IO a
allHostAddrs,localHostAddr :: HostAddr
allHostAddrs :: HostAddr
allHostAddrs = (Word8
0,Word8
0,Word8
0,Word8
0)
localHostAddr :: HostAddr
localHostAddr = (Word8
127,Word8
0,Word8
0,Word8
1)
msgTypeP :: BParser MessageType
msgTypeP :: BParser MessageType
msgTypeP = (forall (m :: * -> *) a. Monad m => (a -> Bool) -> Parser a m a
P.satisfy (forall a. Eq a => a -> a -> Bool
== Word8
0) forall (f :: * -> *) a b. Applicative f => f a -> f b -> f b
*>
(Int -> MessageType
RequestMessage forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall a b. (Integral a, Num b) => a -> b
fromIntegral forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> BParser Word32
word32P)) forall (f :: * -> *) a. Alternative f => f a -> f a -> f a
<|>
(forall (m :: * -> *) a. Monad m => (a -> Bool) -> Parser a m a
P.satisfy (forall a. Eq a => a -> a -> Bool
== Word8
1) forall (f :: * -> *) a b. Functor f => f a -> b -> f b
$> MessageType
ResponseMessage) forall (f :: * -> *) a. Alternative f => f a -> f a -> f a
<|>
(forall (m :: * -> *) a. Monad m => (a -> Bool) -> Parser a m a
P.satisfy (forall a. Eq a => a -> a -> Bool
== Word8
2) forall (f :: * -> *) a b. Functor f => f a -> b -> f b
$> MessageType
TimeoutResponseMessage) forall (f :: * -> *) a. Alternative f => f a -> f a -> f a
<|>
(forall (m :: * -> *) a. Monad m => (a -> Bool) -> Parser a m a
P.satisfy (forall a. Eq a => a -> a -> Bool
== Word8
3) forall (f :: * -> *) a b. Functor f => f a -> b -> f b
$> MessageType
ExceptionResponseMessage)
envelopeP :: BParser Envelope
envelopeP :: BParser Envelope
envelopeP = do
let lenPrefixedByteStringP :: Parser Word8 IO ByteString
lenPrefixedByteStringP = do
Int
c <- forall a b. (Integral a, Num b) => a -> b
fromIntegral forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> forall (m :: * -> *). Monad m => Parser Word8 m Word32
word32be
if Int
c forall a. Eq a => a -> a -> Bool
== Int
0 then
forall (f :: * -> *) a. Applicative f => a -> f a
pure forall a. Monoid a => a
mempty
else do
Array Word8
ps <- forall (m :: * -> *) a b.
Monad m =>
Int -> Fold m a b -> Parser a m b
P.takeEQ Int
c (forall (m :: * -> *) a.
(MonadIO m, Unbox a) =>
Int -> Fold m a (Array a)
Arr.writeN Int
c)
let !bs :: ByteString
bs = Array Word8 -> ByteString
StreamlyBS.fromArray Array Word8
ps
forall (f :: * -> *) a. Applicative f => a -> f a
pure ByteString
bs
Fingerprint -> MessageType -> UUID -> ByteString -> Envelope
Envelope forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> BParser Fingerprint
fingerprintP forall (f :: * -> *) a b. Applicative f => f (a -> b) -> f a -> f b
<*> BParser MessageType
msgTypeP forall (f :: * -> *) a b. Applicative f => f (a -> b) -> f a -> f b
<*> BParser UUID
uuidP forall (f :: * -> *) a b. Applicative f => f (a -> b) -> f a -> f b
<*> Parser Word8 IO ByteString
lenPrefixedByteStringP
encodeEnvelope :: Envelope -> BS.ByteString
encodeEnvelope :: Envelope -> ByteString
encodeEnvelope (Envelope (Fingerprint Word64
fp1 Word64
fp2) MessageType
msgType UUID
msgId ByteString
bs) =
ByteString
completeMessage
where
completeMessage :: ByteString
completeMessage = ByteString
fingerprintBs forall a. Semigroup a => a -> a -> a
<> ByteString
msgTypeBs forall a. Semigroup a => a -> a -> a
<> ByteString
msgIdBs forall a. Semigroup a => a -> a -> a
<> ByteString
lenPrefixedBs
fingerprintBs :: ByteString
fingerprintBs = Word64 -> ByteString
BO.bytestring64 Word64
fp1 forall a. Semigroup a => a -> a -> a
<> Word64 -> ByteString
BO.bytestring64 Word64
fp2
msgTypeBs :: ByteString
msgTypeBs = case MessageType
msgType of
RequestMessage Int
timeoutms -> Word8 -> ByteString
BS.singleton Word8
0 forall a. Semigroup a => a -> a -> a
<> Word32 -> ByteString
BO.bytestring32 (forall a b. (Integral a, Num b) => a -> b
fromIntegral Int
timeoutms)
MessageType
ResponseMessage -> Word8 -> ByteString
BS.singleton Word8
1
MessageType
TimeoutResponseMessage -> Word8 -> ByteString
BS.singleton Word8
2
MessageType
ExceptionResponseMessage -> Word8 -> ByteString
BS.singleton Word8
3
msgIdBs :: ByteString
msgIdBs =
case UUID -> (Word32, Word32, Word32, Word32)
UUIDBase.toWords (UUID -> UUID
_unUUID UUID
msgId) of
(Word32
u1, Word32
u2, Word32
u3, Word32
u4) -> forall (t :: * -> *) a b.
Foldable t =>
(a -> b -> b) -> b -> t a -> b
foldr (forall a. Semigroup a => a -> a -> a
(<>) forall b c a. (b -> c) -> (a -> b) -> a -> c
. Word32 -> ByteString
BO.bytestring32) ByteString
BS.empty [Word32
u1, Word32
u2, Word32
u3, Word32
u4]
lenPrefixedBs :: ByteString
lenPrefixedBs = Word32 -> ByteString
BO.bytestring32 Word32
payloadLen forall a. Semigroup a => a -> a -> a
<> ByteString
bs
payloadLen :: Word32
payloadLen = forall a b. (Integral a, Num b) => a -> b
fromIntegral (ByteString -> Int
BS.length ByteString
bs)
fingerprintP :: BParser Fingerprint
fingerprintP :: BParser Fingerprint
fingerprintP =
Word64 -> Word64 -> Fingerprint
Fingerprint forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> BParser Word64
word64P forall (f :: * -> *) a b. Applicative f => f (a -> b) -> f a -> f b
<*> BParser Word64
word64P
word64P :: BParser Word64
word64P :: BParser Word64
word64P = do
let s :: Fold IO a [a]
s = forall (m :: * -> *) a. Monad m => Fold m a [a]
FL.toList
[Word8]
b <- forall (m :: * -> *) a b.
Monad m =>
Int -> Fold m a b -> Parser a m b
P.takeEQ Int
8 forall {a}. Fold IO a [a]
s
forall (f :: * -> *) a. Applicative f => a -> f a
pure (ByteString -> Word64
BO.word64 ([Word8] -> ByteString
BS.pack [Word8]
b))
word32P :: BParser Word32
word32P :: BParser Word32
word32P = do
let s :: Fold IO a [a]
s = forall (m :: * -> *) a. Monad m => Fold m a [a]
FL.toList
[Word8]
w4x8 <- forall (m :: * -> *) a b.
Monad m =>
Int -> Fold m a b -> Parser a m b
P.takeEQ Int
4 forall {a}. Fold IO a [a]
s
forall (f :: * -> *) a. Applicative f => a -> f a
pure (ByteString -> Word32
BO.word32 ([Word8] -> ByteString
BS.pack [Word8]
w4x8))
uuidP :: BParser UUID
uuidP :: BParser UUID
uuidP = do
Word32
u1 <- BParser Word32
word32P
Word32
u2 <- BParser Word32
word32P
Word32
u3 <- BParser Word32
word32P
UUID -> UUID
UUID forall b c a. (b -> c) -> (a -> b) -> a -> c
. Word32 -> Word32 -> Word32 -> Word32 -> UUID
UUIDBase.fromWords Word32
u1 Word32
u2 Word32
u3 forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> BParser Word32
word32P
type NewConnectionHandler msg = IO (Maybe msg)
type NewMessageHandler req resp = req -> IO resp
serve ::
RequestHandlers s->
s ->
HostAddr ->
PortNumber ->
Maybe (MVar SockAddr) ->
IO Bool
serve :: forall s.
RequestHandlers s
-> s -> HostAddr -> PortNumber -> Maybe (MVar SockAddr) -> IO Bool
serve RequestHandlers s
userMsgHandlers s
serverState HostAddr
hostaddr PortNumber
port Maybe (MVar SockAddr)
mSockLock = do
let handleSock :: Socket -> IO ()
handleSock Socket
sock = do
Locking Socket
lockingSocket <- forall a. a -> IO (Locking a)
newLock Socket
sock
Socket -> EnvelopeHandler -> IO ()
drainSocketMessages Socket
sock (forall s.
Locking Socket -> RequestHandlers s -> s -> EnvelopeHandler
serverEnvelopeHandler Locking Socket
lockingSocket RequestHandlers s
userMsgHandlers s
serverState)
forall (m :: * -> *) a b.
Applicative m =>
Unfold m a b -> a -> Stream m b
Stream.unfold (forall (m :: * -> *).
MonadIO m =>
[(SocketOption, Int)]
-> Maybe (MVar SockAddr) -> Unfold m (HostAddr, PortNumber) Socket
SA.acceptorOnAddr [(SocketOption
ReuseAddr, Int
1), (SocketOption
NoDelay, Int
1)] Maybe (MVar SockAddr)
mSockLock) (HostAddr
hostaddr, PortNumber
port)
forall a b. a -> (a -> b) -> b
& forall (m :: * -> *) a b.
MonadAsync m =>
(Config -> Config) -> (a -> m b) -> Stream m a -> Stream m b
Stream.parMapM forall a. a -> a
id Socket -> IO ()
handleSock
forall a b. a -> (a -> b) -> b
& forall (m :: * -> *) a b.
Monad m =>
Fold m a b -> Stream m a -> m b
Stream.fold forall (m :: * -> *) a. Monad m => Fold m a ()
FL.drain
forall (f :: * -> *) a. Applicative f => a -> f a
pure Bool
True
openEnvelope :: forall s. (Serialise s, Typeable s) => Envelope -> Maybe s
openEnvelope :: forall s. (Serialise s, Typeable s) => Envelope -> Maybe s
openEnvelope (Envelope Fingerprint
eprint MessageType
_ UUID
_ ByteString
bytes) =
if Fingerprint
eprint forall a. Eq a => a -> a -> Bool
== forall a. Typeable a => a -> Fingerprint
fingerprint (forall a. HasCallStack => a
undefined :: s) then
case forall s. Serialise s => ByteString -> Either WineryException s
msgDeserialise ByteString
bytes of
Left WineryException
_e -> forall a. Maybe a
Nothing
Right s
decoded -> forall a. a -> Maybe a
Just s
decoded
else
forall a. Maybe a
Nothing
deserialiseOnly :: forall s. Serialise s => BS.ByteString -> Either WineryException s
deserialiseOnly :: forall s. Serialise s => ByteString -> Either WineryException s
deserialiseOnly ByteString
bytes = do
Decoder s
dec <- forall a.
Serialise a =>
Schema -> Either WineryException (Decoder a)
getDecoder (forall (proxy :: * -> *) a. Serialise a => proxy a -> Schema
schema (forall {k} (t :: k). Proxy t
Proxy :: Proxy s))
forall (f :: * -> *) a. Applicative f => a -> f a
pure (forall a. Decoder a -> ByteString -> a
evalDecoder Decoder s
dec ByteString
bytes)
matchEnvelope :: forall a b s. (Serialise a, Serialise b, Typeable b) =>
Envelope ->
(ConnectionState s -> a -> IO b) ->
Maybe (ConnectionState s -> a -> IO b, a)
matchEnvelope :: forall a b s.
(Serialise a, Serialise b, Typeable b) =>
Envelope
-> (ConnectionState s -> a -> IO b)
-> Maybe (ConnectionState s -> a -> IO b, a)
matchEnvelope Envelope
envelope ConnectionState s -> a -> IO b
dispatchf =
case forall s. (Serialise s, Typeable s) => Envelope -> Maybe s
openEnvelope Envelope
envelope :: Maybe a of
Maybe a
Nothing -> forall a. Maybe a
Nothing
Just a
decoded -> forall a. a -> Maybe a
Just (ConnectionState s -> a -> IO b
dispatchf, a
decoded)
serverEnvelopeHandler ::
Locking Socket
-> RequestHandlers s
-> s
-> Envelope
-> IO ()
serverEnvelopeHandler :: forall s.
Locking Socket -> RequestHandlers s -> s -> EnvelopeHandler
serverEnvelopeHandler Locking Socket
_ RequestHandlers s
_ s
_ (Envelope Fingerprint
_ MessageType
TimeoutResponseMessage UUID
_ ByteString
_) = forall (f :: * -> *) a. Applicative f => a -> f a
pure ()
serverEnvelopeHandler Locking Socket
_ RequestHandlers s
_ s
_ (Envelope Fingerprint
_ MessageType
ExceptionResponseMessage UUID
_ ByteString
_) = forall (f :: * -> *) a. Applicative f => a -> f a
pure ()
serverEnvelopeHandler Locking Socket
_ RequestHandlers s
_ s
_ (Envelope Fingerprint
_ MessageType
ResponseMessage UUID
_ ByteString
_) = forall (f :: * -> *) a. Applicative f => a -> f a
pure ()
serverEnvelopeHandler Locking Socket
sockLock RequestHandlers s
msgHandlers s
serverState envelope :: Envelope
envelope@(Envelope Fingerprint
_ (RequestMessage Int
timeoutms) UUID
msgId ByteString
_) = do
let runTimeout :: IO b -> IO (Maybe b)
runTimeout :: forall b. IO b -> IO (Maybe b)
runTimeout IO b
m =
if Int
timeoutms forall a. Eq a => a -> a -> Bool
== Int
0 then
(forall a. a -> Maybe a
Just forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> IO b
m) forall e a. Exception e => IO a -> (e -> IO a) -> IO a
`catch` forall b. TimeoutException -> IO (Maybe b)
timeoutExcHandler
else
forall a. Int -> IO a -> IO (Maybe a)
timeout (forall a b. (Integral a, Num b) => a -> b
fromIntegral Int
timeoutms) IO b
m forall e a. Exception e => IO a -> (e -> IO a) -> IO a
`catch` forall b. TimeoutException -> IO (Maybe b)
timeoutExcHandler
timeoutExcHandler :: TimeoutException -> IO (Maybe b)
timeoutExcHandler :: forall b. TimeoutException -> IO (Maybe b)
timeoutExcHandler TimeoutException
_ = forall (f :: * -> *) a. Applicative f => a -> f a
pure forall a. Maybe a
Nothing
sState :: ConnectionState s
sState = ConnectionState {
connectionServerState :: s
connectionServerState = s
serverState,
connectionSocket :: Locking Socket
connectionSocket = Locking Socket
sockLock
}
firstMatcher :: RequestHandler s -> Maybe () -> IO (Maybe ())
firstMatcher (RequestHandler ConnectionState s -> a -> IO b
msghandler) Maybe ()
Nothing =
case forall a b s.
(Serialise a, Serialise b, Typeable b) =>
Envelope
-> (ConnectionState s -> a -> IO b)
-> Maybe (ConnectionState s -> a -> IO b, a)
matchEnvelope Envelope
envelope ConnectionState s -> a -> IO b
msghandler of
Maybe (ConnectionState s -> a -> IO b, a)
Nothing -> forall (f :: * -> *) a. Applicative f => a -> f a
pure forall a. Maybe a
Nothing
Just (ConnectionState s -> a -> IO b
dispatchf, a
decoded) -> do
Maybe b
mResponse <- forall b. IO b -> IO (Maybe b)
runTimeout (ConnectionState s -> a -> IO b
dispatchf ConnectionState s
sState a
decoded)
let envelopeResponse :: Envelope
envelopeResponse =
case Maybe b
mResponse of
Just b
response ->
Fingerprint -> MessageType -> UUID -> ByteString -> Envelope
Envelope (forall a. Typeable a => a -> Fingerprint
fingerprint b
response) MessageType
ResponseMessage UUID
msgId (forall a. Serialise a => a -> ByteString
msgSerialise b
response)
Maybe b
Nothing ->
Fingerprint -> MessageType -> UUID -> ByteString -> Envelope
Envelope (forall a. Typeable a => a -> Fingerprint
fingerprint ConnectionError
TimeoutError) MessageType
TimeoutResponseMessage UUID
msgId ByteString
BS.empty
Envelope -> Locking Socket -> IO ()
sendEnvelope Envelope
envelopeResponse Locking Socket
sockLock
forall (f :: * -> *) a. Applicative f => a -> f a
pure (forall a. a -> Maybe a
Just ())
firstMatcher (AsyncRequestHandler ConnectionState s -> a -> IO ()
msghandler) Maybe ()
Nothing =
case forall a b s.
(Serialise a, Serialise b, Typeable b) =>
Envelope
-> (ConnectionState s -> a -> IO b)
-> Maybe (ConnectionState s -> a -> IO b, a)
matchEnvelope Envelope
envelope ConnectionState s -> a -> IO ()
msghandler of
Maybe (ConnectionState s -> a -> IO (), a)
Nothing -> forall (f :: * -> *) a. Applicative f => a -> f a
pure forall a. Maybe a
Nothing
Just (ConnectionState s -> a -> IO ()
dispatchf, a
decoded) -> do
()
_ <- ConnectionState s -> a -> IO ()
dispatchf ConnectionState s
sState a
decoded
forall (f :: * -> *) a. Applicative f => a -> f a
pure (forall a. a -> Maybe a
Just ())
firstMatcher RequestHandler s
_ Maybe ()
acc = forall (f :: * -> *) a. Applicative f => a -> f a
pure Maybe ()
acc
Either SomeException ()
eExc <- forall e a. Exception e => IO a -> IO (Either e a)
try forall a b. (a -> b) -> a -> b
$ forall (t :: * -> *) (m :: * -> *) b a.
(Foldable t, Monad m) =>
(b -> a -> m b) -> b -> t a -> m ()
foldM_ (forall a b c. (a -> b -> c) -> b -> a -> c
flip RequestHandler s -> Maybe () -> IO (Maybe ())
firstMatcher) forall a. Maybe a
Nothing RequestHandlers s
msgHandlers :: IO (Either SomeException ())
case Either SomeException ()
eExc of
Left SomeException
exc ->
let env :: Envelope
env = Fingerprint -> MessageType -> UUID -> ByteString -> Envelope
Envelope (forall a. Typeable a => a -> Fingerprint
fingerprint (forall a. Show a => a -> String
show SomeException
exc)) MessageType
ExceptionResponseMessage UUID
msgId (forall a. Serialise a => a -> ByteString
msgSerialise (forall a. Show a => a -> String
show SomeException
exc)) in
Envelope -> Locking Socket -> IO ()
sendEnvelope Envelope
env Locking Socket
sockLock
Right () -> forall (f :: * -> *) a. Applicative f => a -> f a
pure ()
type EnvelopeHandler = Envelope -> IO ()
drainSocketMessages :: Socket -> EnvelopeHandler -> IO ()
drainSocketMessages :: Socket -> EnvelopeHandler -> IO ()
drainSocketMessages Socket
sock EnvelopeHandler
envelopeHandler = do
forall (m :: * -> *) a b.
Applicative m =>
Unfold m a b -> a -> Stream m b
SP.unfold forall (m :: * -> *). MonadIO m => Unfold m Socket Word8
SSock.reader Socket
sock
forall a b. a -> (a -> b) -> b
& forall (m :: * -> *) a b.
Monad m =>
Parser a m b -> Stream m a -> Stream m (Either ParseError b)
P.parseMany BParser Envelope
envelopeP
forall a b. a -> (a -> b) -> b
& forall (m :: * -> *) a b.
Monad m =>
Stream m (Either a b) -> Stream m b
SP.catRights
forall a b. a -> (a -> b) -> b
& forall (m :: * -> *) a b.
MonadAsync m =>
(Config -> Config) -> (a -> m b) -> Stream m a -> Stream m b
SP.parMapM (Bool -> Config -> Config
SP.ordered Bool
False) EnvelopeHandler
envelopeHandler
forall a b. a -> (a -> b) -> b
& forall (m :: * -> *) a b.
Monad m =>
Fold m a b -> Stream m a -> m b
SP.fold forall (m :: * -> *) a. Monad m => Fold m a ()
FL.drain
sendEnvelope :: Envelope -> Locking Socket -> IO ()
sendEnvelope :: Envelope -> Locking Socket -> IO ()
sendEnvelope Envelope
envelope Locking Socket
sockLock = do
let envelopebytes :: ByteString
envelopebytes = Envelope -> ByteString
encodeEnvelope Envelope
envelope
forall a b. Locking a -> (a -> IO b) -> IO b
withLock Locking Socket
sockLock forall a b. (a -> b) -> a -> b
$ \Socket
socket' -> do
Socket -> ByteString -> IO ()
Socket.sendAll Socket
socket' ByteString
envelopebytes
forall (f :: * -> *). Applicative f => String -> ByteString -> f ()
traceBytes String
"sendEnvelope" ByteString
envelopebytes
fingerprint :: Typeable a => a -> Fingerprint
fingerprint :: forall a. Typeable a => a -> Fingerprint
fingerprint = TypeRep -> Fingerprint
typeRepFingerprint forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall a. Typeable a => a -> TypeRep
typeOf