-- | A tour of "Network.QUIC.Simple", from raw byte streams up to stateful
-- servers with asynchronous replies.
--
-- Every section pairs a server with a client; 'main' runs the pairs one after
-- another against the same local address (127.0.0.1:14443).
module Main where

import Control.Concurrent
import Control.Concurrent.STM
import Control.Monad

import Control.Concurrent.Async (async, cancel, link, race_, replicateConcurrently_)
import Control.Exception (bracket)
import Data.ByteString qualified as BS
import Data.ByteString.Lazy qualified as BSL
import Data.IORef (newIORef, atomicModifyIORef')
import Data.IntMap.Strict qualified as IntMap
import Data.Text (Text)
import Data.Text.Encoding (encodeUtf8, decodeUtf8)
import GHC.Generics (Generic)
import Network.QUIC.Simple qualified as QUIC
import Network.QUIC.Simple.Stream (MessageQueues, streamCodec, streamSerialise)
import System.Timeout (timeout)

-- | Run every demo in order: print its title, race the server against the
-- client, then print a blank separator line.
main :: IO ()
main =
  forM_ demos \(title, server, client) -> do
    putStrLn title
    -- 'race_' returns as soon as either side finishes and cancels the other
    race_ server client
    putStrLn ""
  where
    demos :: [(String, IO (), IO ())]
    demos =
      [ ("Raw", serverRaw, clientRaw)
      , ("Mailbox", serverBox, clientBox)
      , ("Serialise", serverSerialise, clientSerialise)
      , ("Simple", serverSimple, clientSimple)
      , ("Stateful/Async", serverStateful, clientAsync)
      ]

-- * Raw

-- | Send one unframed chunk over the initial stream, print the reply, close.
clientRaw :: IO ()
clientRaw =
  QUIC.runClient "127.0.0.1" "14443" \conn stream -> do
    -- the initial stream comes pre-requested and ready to go
    putStrLn "Client connected:"
    QUIC.getConnectionInfo conn >>= print

    -- streams have no framing on their own
    -- but that's fine, for now...
    QUIC.sendStream stream "hi there"
    reply <- QUIC.recvStream stream 4096
    putStrLn $ "Client got reply: " <> show reply

    QUIC.closeStream stream
    putStrLn "Client quits"

-- | Echo the first chunk back with a prefix, then wait for the stream to end.
serverRaw :: IO ()
serverRaw =
  QUIC.runServer [("127.0.0.1", 14443)] \conn stream -> do
    -- the initial stream comes pre-accepted and ready to go
    putStrLn "Server accepted connection:"
    QUIC.getConnectionInfo conn >>= print

    -- wait until *something* arrives and take it all in
    query <- QUIC.recvStream stream 4096
    putStrLn $ "Server got query: " <> show query
    QUIC.sendStream stream $ "got yer bytes: " <> query

    -- the final bytes will be "", signalling the connection getting closed
    finalBytes <- QUIC.recvStream stream 4096
    putStrLn $ "Server quits after " <> show finalBytes

-- * Stream queues / framing

{- | Stateful codec that splits byte stream into text messages

Messages are NUL-terminated on the wire.
This allows sending empty messages, but may break if the Text has a NUL in there.
What kind of "text" is that anyway?!

NOTE: 'decodeUtf8' throws on malformed UTF-8, so a misbehaving peer can make
the decoder fail.
-}
cstringCodec :: QUIC.Stream -> IO (MessageQueues Text Text)
cstringCodec stream = do
  previous <- newIORef BSL.empty -- gotta store partial messages somewhere
  snd <$> streamCodec encode (decode previous) stream
  where
    -- add framing
    encode msg = BSL.fromChunks [encodeUtf8 msg, "\NUL"]

    -- strip framing; the result is (unconsumed bytes, decoded message if any)
    decode previous chunk =
      case BS.break (== 0) chunk of
        ("", "") ->
          -- the connection is closing
          pure ("", Nothing)
        (partial, "") -> do
          -- everything is consumed, but no frame marker is in sight
          atomicModifyIORef' previous \prefix ->
            (prefix <> BSL.fromStrict partial, ())
          pure ("", Nothing) -- don't emit a message just yet
        (suffix, leftovers) -> do
          -- found the marker: take the stored prefix and reset the store
          prefix <- atomicModifyIORef' previous ("",)
          pure
            ( BS.drop 1 leftovers -- remove the separator
            , Just . decodeUtf8 . BSL.toStrict $ prefix <> BSL.fromStrict suffix -- decode the whole message
            )

-- | Push three framed messages at once, then read the three replies in order.
clientBox :: IO ()
clientBox = do
  QUIC.runClient "127.0.0.1" "14443" \_conn stream -> do
    (writeQ, readQ) <- cstringCodec stream

    -- send all the messages at once
    atomically $ writeTBQueue writeQ "hi there"
    -- with the framing in place, this one is a valid message now
    atomically $ writeTBQueue writeQ ""
    atomically $ writeTBQueue writeQ "and again"

    -- read the replies one by one
    reply1 <- atomically $ readTBQueue readQ
    putStrLn $ "Client got reply 1: " <> show reply1
    reply2 <- atomically $ readTBQueue readQ
    putStrLn $ "Client got reply 2: " <> show reply2
    reply3 <- atomically $ readTBQueue readQ
    putStrLn $ "Client got reply 3: " <> show reply3

-- | Framed echo server: one reply per incoming message, forever.
serverBox :: IO ()
serverBox =
  QUIC.runServer [("127.0.0.1", 14443)] \_conn stream -> do
    putStrLn "Server accepted connection:"
    (writeQ, readQ) <- cstringCodec stream
    forever do
      -- a linearised logging echo server
      query <- atomically $ readTBQueue readQ
      putStrLn $ "Server got query: " <> show query
      atomically $ writeTBQueue writeQ $ "got yer bytes: " <> query

-- * Serialised messages

-- | Requests a client can send.
data ClientMessage
  = Hello
  | Bye
  deriving (Eq, Show, Ord, Generic) -- derive Generic so Serialise can do its thing

-- a plain empty instance relying on the class defaults (presumably Generic-based);
-- note that this is not StandaloneDeriving, there is no `deriving instance` here
instance QUIC.Serialise ClientMessage

-- | Replies a server can send.
data ServerMessage
  = Ok Int
  deriving (Eq, Show, Ord, Generic, QUIC.Serialise) -- using DeriveAnyClass

{- XXX: No turn structure is imposed by the protocol.

The messages get delivered in order, but the client and server have to coordinate
implicitly on when to wait for a reply and when to go without waiting.

Unless the messages themselves carry call IDs, there's no way to associate
replies with requests besides sending in lockstep.
-}

-- | Five lockstep Hello/reply rounds followed by a fire-and-forget Bye.
clientSerialise :: IO ()
clientSerialise = do
  QUIC.runClient "127.0.0.1" "14443" \_conn stream -> do
    (writeQ, readQ) <- snd <$> streamSerialise stream
    replicateM_ 5 do
      -- send messages one by one
      atomically $ writeTBQueue writeQ Hello
      -- wait for the reply before sending another one
      reply <- atomically $ readTBQueue @ServerMessage readQ
      -- here we know that the reply is sent in response to the Hello
      putStrLn $ "Client got reply: " <> show reply
    -- send, but don't wait for the reply
    atomically $ writeTBQueue writeQ Bye
    -- the connection is terminated

-- | Answer every Hello with a per-connection counter; stop on Bye.
serverSerialise :: IO ()
serverSerialise =
  QUIC.runServer [("127.0.0.1", 14443)] \_conn stream -> do
    putStrLn "Server accepted connection:"
    (writeQ, readQ) <- snd <$> streamSerialise stream
    let
      -- simple state-passing loop
      loop counter = do
        query <- atomically (readTBQueue readQ)
        putStrLn $ "Server got query: " <> show query
        case query of
          Hello -> do
            -- reply at once before reading the next message
            atomically $ writeTBQueue writeQ (Ok counter)
            loop (counter + 1)
          Bye ->
            -- don't reply
            -- XXX: the client will hang indefinitely if waiting for the reply here
            pure ()
    loop 0

-- * Sync client-driven RPC

{- | The wrappers from startClientSimple replace implementation details with a handle.

It also imposes a sync/linearised interaction -- every call gets a response.
-}
clientSimple :: IO ()
clientSimple =
  -- since this is a handle pattern, it should be properly scoped
  bracket open close \(_stop, call) -> do
    -- the flow is the same as in clientSerialise,
    replicateM_ 5 do
      -- no queues, just running a function to get a reply
      Ok n <- call Hello
      putStrLn $ "Client got reply " <> show n
    -- give the Bye call one second (1000000 microseconds); a timeout is the expected outcome
    timeout 1000000 (call Bye) >>= mapM_ \reply ->
      putStrLn $ "Shouldn't happen, the server errors out on this: " <> show reply
    putStrLn "Stopping"
  where
    open = QUIC.startClientSimple "127.0.0.1" "14443"
    close (stop, _call) = stop

{- | Wrappers in runServerSimple ensure that every call gets a response.
-}
serverSimple :: IO ()
serverSimple = do
  counter <- newIORef 0 -- some global state shared by all connections
  QUIC.runServerSimple "127.0.0.1" 14443 \case
    Hello -> do
      -- the connection handler is stateless
      self <- myThreadId
      -- but its thread id can be used as a key in the global state
      -- not now, though...
      putStrLn $ "Server got Hello from " <> show self
      n <- atomicModifyIORef' counter \old -> (old + 1, old)
      pure $ Ok n
    Bye -> do
      putStrLn "Server got Bye"
      -- trade-offs...
      error "Whelp, the serverSimple handler must reply, but the protocol should stop. Needs a re-design..."

-- * DIY async calls/events

{- | Wrappers from startClientAsync only do the connection setup and provide all the internals.

The actual messages implicitly wrap ClientMessage/ServerMessage with an optional call id.
This allows the messages to be treated as calls (sync request-response) or casts (no-reply messages).

The client now has a state of its own to track the requests in flight.
The calls are still sync, but the client now allows calling from multiple threads
without relying on message ordering.
-}
clientAsync :: IO ()
clientAsync = do
  -- wait for the connection
  (client, _conn, (writeQ, readQ)) <- QUIC.startClientAsync "127.0.0.1" "14443"

  -- the casts require no state, but have no response handlers
  let cast q = atomically $ writeTBQueue writeQ (Nothing, q)

  -- events, like casts have no call id, but originating from server
  -- XXX: this protocol has no notion of server-sent requests though
  events <- newTBQueueIO 16
  calls <- newTVarIO mempty

  void $ async do
    -- a dedicated thread now reads all messages and does the triage
    -- NOTE(review): 'link' re-throws the client's failures here but is documented
    -- to ignore AsyncCancelled, so `cancel client` below probably does not stop
    -- this thread -- confirm whether it needs an explicit cancel
    link client -- exit when the client stops
    forever do
      atomically (readTBQueue readQ) >>= \case
        (Nothing, e) ->
          -- no call id -- this is an event
          atomically $ writeTBQueue events e
        (Just callId, r) ->
          -- call id is present -- this is a response
          atomically $ modifyTVar' calls $ IntMap.insert callId r

  -- issuing calls from multiple threads requires avoiding getting someone else's reply
  counter <- newIORef 0 -- the simplest key is a counter
  let
    call q = do
      callId <- atomicModifyIORef' counter \old -> (old + 1, old)
      atomically $ writeTBQueue writeQ (Just callId, q) -- send the message annotated with call id
      -- the caller is blocked until the transaction succeeds
      atomically do
        pending <- readTVar calls
        case IntMap.lookup callId pending of
          Nothing ->
            -- the calls map is missing the call id, continue waiting
            retry
          Just reply ->
            -- pop the reply from the map and hand it to the caller
            reply <$ writeTVar calls (IntMap.delete callId pending)

  -- send all the requests at once, each in its own thread
  replicateConcurrently_ 5 do
    -- everyone will block until they receive their own response
    Ok n <- call Hello
    putStrLn $ "Client got reply " <> show n

  cast Bye -- no reply is even expected
  cancel client

  -- we have no event handler thread so we read the whole queue
  unread <- atomically (flushTBQueue events) -- the server does not send any events though
  putStrLn $ "Client events: " <> show unread

{- | Wrappers in runServerStateful provide a richer interface and impose less.
-}
serverStateful :: IO ()
serverStateful = do
  -- some global state
  conns <- newTVarIO mempty
  connIds <- newIORef 0
  QUIC.runServerStateful "127.0.0.1" 14443 (setup conns connIds) (teardown conns) handler
  where
    -- every connection has a local state too
    setup conns counter _conn writeQ = do
      -- generate an explicit key
      connId <- atomicModifyIORef' counter \old -> (old + 1, old)
      -- don't store ThreadIDs directly!
      me <- myThreadId >>= mkWeakThreadId
      atomically $ modifyTVar' conns $
        -- this per-connection state will be observable from outside
        -- everybody can send messages to this connection without a client to "request" them
        IntMap.insert connId (me, writeQ)
      pure (connId, writeQ, 0 :: Int) -- with writeQ the request handlers can use async replies too

    -- use the global and local state for cleanup ops
    teardown conns _conn (connId, _writeQ, _counter) =
      atomically $ modifyTVar' conns $ IntMap.delete connId

    -- takes the connection state and a message, returns the next state and an optional immediate reply
    handler connState@(connId, writeQ, counter) msg = do
      putStrLn $ "Server got " <> show msg
      case msg of
        (Just (rid :: Int), Hello) -> do
          -- this server replies later...
          void $ forkIO do
            threadDelay 1000000
            -- with the writeQ being accessible it is possible to send messages at any time
            atomically $ writeTBQueue writeQ (Just rid, Ok counter)
          -- ... but moves to process the next message ASAP
          let connState' = (connId, writeQ, counter + 1) -- update one part, keep the rest
          pure (connState', Nothing)
        _ ->
          -- ignore casted Hellos and any form of Bye
          pure (connState, Nothing)