{-# LANGUAGE DeriveGeneric #-}
{-# LANGUAGE DerivingStrategies #-}
{-# LANGUAGE GeneralizedNewtypeDeriving #-}
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE ScopedTypeVariables #-}

{- | Socket utilities. -}
module OM.Socket (
  -- * Socket Addresses
  AddressDescription(..),
  resolveAddr,

  -- * Ingress-only sockets
  openIngress,

  -- * Egress-only sockets
  openEgress,

  -- * Bidirectional request/response servers.
  openServer,
  Responded,
  connectServer,
) where


import Control.Concurrent (MVar, forkIO, newEmptyMVar, putMVar, takeMVar)
import Control.Concurrent.STM (TVar, atomically, newTVarIO, readTVar,
  retry, writeTVar)
import Control.Exception (SomeException, bracketOnError, throw)
import Control.Monad (when)
import Control.Monad.Catch (MonadThrow(throwM), MonadCatch, try)
import Control.Monad.IO.Unlift (MonadUnliftIO)
import Control.Monad.Logger.CallStack (LoggingT(runLoggingT),
  MonadLoggerIO(askLoggerIO), NoLoggingT(runNoLoggingT), MonadLogger,
  logDebug, logError, logWarn)
import Data.Aeson (FromJSON, FromJSONKey, ToJSON, ToJSONKey)
import Data.Binary (Binary(get), encode)
import Data.Binary.Get (Decoder(Done, Fail, Partial), pushChunk,
  runGetIncremental)
import Data.ByteString (ByteString)
import Data.Foldable (traverse_)
import Data.Map (Map)
import Data.String (IsString)
import Data.Text (Text)
import Data.Void (Void)
import Data.Word (Word32)
import GHC.Generics (Generic)
import Network.Socket (AddrInfo(addrAddress), Family(AF_INET, AF_INET6,
  AF_UNIX), SockAddr(SockAddrInet, SockAddrInet6, SockAddrUnix),
  SocketOption(ReuseAddr), SocketType(Stream), HostName, ServiceName,
  Socket, accept, bind, close, connect, defaultProtocol, getAddrInfo,
  listen, setSocketOption, socket)
import Network.Socket.ByteString (recv)
import Network.Socket.ByteString.Lazy (sendAll)
import Network.TLS (ClientParams, Context, ServerParams, contextNew,
  handshake, recvData, sendData)
import OM.Fork (Race, race)
import OM.Show (showt)
import Prelude (Applicative(pure), Bool(False, True), Bounded(minBound),
  Either(Left, Right), Enum(succ), Eq((/=)), Functor(fmap), Maybe(Just,
  Nothing), Monad((>>), (>>=), return), MonadFail(fail), Monoid(mempty),
  Semigroup((<>)), Show(show), ($), (++), (.), (=<<), IO, Num, Ord,
  String, flip, snd, userError)
import Streaming (Alternative((<|>)), MFunctor(hoist), MonadIO(liftIO),
  MonadTrans(lift), Of, Stream, join, void)
import Streaming.Binary (decoded)
import Streaming.ByteString (ByteStream, reread)
import Text.Megaparsec (MonadParsec(eof), Parsec, many, oneOf, parse,
  satisfy)
import Text.Megaparsec.Char (char)
import qualified Data.ByteString as BS
import qualified Data.ByteString.Lazy as BSL
import qualified Data.Map as Map
import qualified Data.Text as T
import qualified Streaming.Prelude as S
import qualified Text.Megaparsec as M


{- |
  Opens an "ingress" socket, which is a socket that accepts a stream
  of messages without responding. In particular, we listen on a socket,
  accepting new connections, and each connection concurrently reads its
  elements off the socket and pushes them onto the stream.

  Every connection thread hands its decoded messages to one shared
  'MVar', which is passed to 'mvarToStream' to produce the returned
  stream.

  The accept loop is started through 'race' (under 'runNoLoggingT',
  so nothing it logs is recorded). Each accepted connection gets its own
  'forkIO' thread.
-}
openIngress
  :: forall i m never_returns.
     ( Binary i
     , MonadFail m
     , MonadIO m
     , Race
     )
  => AddressDescription
  -> Stream (Of i) m never_returns
openIngress bindAddr = do
    so <- listenSocket =<< resolveAddr bindAddr
    mvar <- liftIO newEmptyMVar
    liftIO
      . runNoLoggingT
      . race "ingress accept loop"
      . liftIO
      $ acceptLoop so mvar
    mvarToStream mvar
  where
    {- Accept connections forever, forking one reader thread for each. -}
    acceptLoop :: Socket -> MVar i -> IO ()
    acceptLoop so mvar = do
      (conn, _) <- accept so
      {-
        'feed' never returns normally: it either loops or throws. So
        'bracketOnError' is enough to guarantee the connection socket
        is closed whenever the reader thread ends, which stops us from
        leaking one file descriptor per finished connection.
      -}
      void . forkIO $
        bracketOnError
          (pure conn)
          close
          (\c -> feed (runGetIncremental get) c mvar)
      acceptLoop so mvar

    {-
      Drive an incremental decoder with bytes read from the socket,
      publishing every completely decoded value on the 'MVar'.
    -}
    feed
      :: Decoder i
      -> Socket
      -> MVar i
      -> IO ()
    feed (Done leftover _ i) conn mvar = do
      putMVar mvar i
      {- Start the next message with whatever bytes were left over. -}
      feed (runGetIncremental get `pushChunk` leftover) conn mvar
    feed (Partial k) conn mvar = do
      bytes <- recv conn 4096
      {- An empty read means the peer closed the connection. -}
      when (BS.null bytes) (fail "Socket closed by peer.")
      feed (k (Just bytes)) conn mvar
    feed (Fail _ _ err) _conn _chan =
      fail $ "Socket crashed. Decoding error: " ++ show err


{- |
  Open an "egress" socket, which is a socket that sends a stream of messages
  without receiving responses.

  Connects to the given address, then 'encode's every element of the
  stream and writes it to the socket with 'sendAll'. When the stream
  ends, the socket is closed and the stream's return value is returned.

  Note that the socket is only closed on this success path; if the
  stream or a send fails part way through, no close is performed here.
-}
openEgress
  :: ( Binary o
     , MonadFail m
     , MonadIO m
     )
  => AddressDescription
  -> Stream (Of o) m r
  -> m r
openEgress addr stream = do
  so <- connectSocket =<< resolveAddr addr
  result <- S.mapM_ (liftIO . sendAll so . encode) stream
  liftIO (close so)
  pure result


{- |
  Guess the family of a `SockAddr`: the address family that corresponds
  to the constructor the address was built with.
-}
fam :: SockAddr -> Family
fam = \case
  SockAddrInet {} -> AF_INET
  SockAddrInet6 {} -> AF_INET6
  SockAddrUnix {} -> AF_UNIX


{- |
  Resolve a host:port address into a 'SockAddr'.

  The description is first split into host and port by 'parseAddr'
  (which fails in @m@ on malformed input), then looked up with
  'getAddrInfo'. The first result is used. If the lookup yields no
  results at all, this fails in @m@ with a message naming the host and
  port that could not be found.
-}
resolveAddr :: (MonadIO m, MonadFail m) => AddressDescription -> m SockAddr
resolveAddr addr = do
  (host, port) <- parseAddr addr
  liftIO (getAddrInfo Nothing (Just host) (Just port)) >>= \case
    {- Report the actual host and port rather than a fixed placeholder. -}
    [] -> fail ("Address not found: " ++ show (host, port))
    sa : _ -> pure (addrAddress sa)


{- |
  Parse a host:port address.

  Two shapes are accepted:

  * @[host]:port@, where the host is everything between the square
    brackets (intended for IPv6 literals), and
  * @host:port@, where the host is everything up to the first colon.

  The port is a run of decimal digits and must be followed by the end of
  the input. Both the host and the port are parsed with 'many', so
  either may be empty. On a parse failure this fails in @m@ with the
  pretty-printed megaparsec error.
-}
parseAddr :: (MonadFail m) => AddressDescription -> m (HostName, ServiceName)
parseAddr addr =
    case parse parser "$" (unAddressDescription addr) of
      Left err -> fail (M.errorBundlePretty err)
      Right hostAndPort -> pure hostAndPort
  where
    {- The whole address: a host, a colon, a numeric port, end of input. -}
    parser :: Parsec Void Text (HostName, ServiceName)
    parser = do
      host <- M.try bracketedHost <|> plainHost
      void (char ':')
      port <- many (oneOf ("0123456789" :: String))
      eof
      pure (host, port)

    {- A host wrapped in square brackets; the brackets are dropped. -}
    bracketedHost :: Parsec Void Text HostName
    bracketedHost = do
      void (char '[')
      host <- many (satisfy (/= ']'))
      void (char ']')
      pure host

    {- Any host without brackets: everything up to the first colon. -}
    plainHost :: Parsec Void Text HostName
    plainHost = many (satisfy (/= ':'))


{- |
  Create a connected socket.

  A stream socket of the address's family is created and connected to
  the address. The connected socket is returned; the caller is
  responsible for closing it.
-}
connectSocket :: (MonadIO m) => SockAddr -> m Socket
connectSocket addr = liftIO $
  {-
    Make sure to close the socket if an error happens during
    connection, because if not, we could easily run out of file
    descriptors in the case where we rapidly try to send thousands
    of message to the same peer, which could happen when one object
    is a hotspot.
  -}
  bracketOnError
    (socket (fam addr) Stream defaultProtocol)
    close
    (\so -> do
      connect so addr
      pure so
    )


{- |
  Create a listening socket.

  A stream socket of the address's family is created with 'ReuseAddr'
  enabled, bound to the address, and put into listening mode with a
  backlog of 5. The listening socket is returned.
-}
listenSocket :: (MonadIO m) => SockAddr -> m Socket
listenSocket addr = liftIO $
  {-
    As in 'connectSocket', close the socket if any of the setup steps
    throws (for instance when the address is already in use), so a
    failed attempt does not leak a file descriptor.
  -}
  bracketOnError
    (socket (fam addr) Stream defaultProtocol)
    close
    (\so -> do
      setSocketOption so ReuseAddr 1
      bind so addr
      listen so 5
      pure so
    )


{- |
  Open a "server" socket, which is a socket that accepts incoming requests
  and provides a way to respond to those requests.

  Each element of the resulting stream pairs a decoded request with the
  function used to send the response for that particular request back
  over the connection it arrived on.

  If TLS parameters are supplied, the parameter action is run once for
  every accepted connection and a TLS handshake is performed before any
  request is read; otherwise the connection is used in plain text.

  The accept loop is started through 'race'. Every accepted connection
  gets its own 'forkIO' thread, which prepares the connection (including
  the TLS handshake, if any) and then decodes requests from it.
-}
openServer
  :: forall request response m never_returns.
     ( Binary request
     , Binary response
     , MonadLogger m
     , MonadCatch m
     , MonadFail m
     , MonadUnliftIO m
     , Race
     )
  => AddressDescription
  -> Maybe (IO ServerParams)
  -> Stream (Of (request, response -> m Responded)) m never_returns
openServer bindAddr tls = do
    so <- listenSocket =<< resolveAddr bindAddr
    requestMVar <- liftIO newEmptyMVar
    lift
      . race "server accept loop"
      $ acceptLoop so requestMVar
    mvarToStream requestMVar
  where
    {- Accept connections forever, forking one handler thread for each. -}
    acceptLoop
      :: Socket
      -> MVar (request, response -> m Responded)
      -> m void
    acceptLoop so requestMVar = do
      (conn, ra) <- liftIO (accept so)
      logDebug $ "New connection: " <>  showt ra
      {-
        Prepare the connection inside the per-connection thread rather
        than here, so that a slow or failing TLS handshake from one peer
        can neither stall nor terminate the accept loop. If preparation
        throws, the connection socket is closed instead of being leaked.
      -}
      void . liftIO . forkIO $ do
        (input, send) <- bracketOnError (pure conn) close prepareConnection
        handleConnection input send requestMVar
      acceptLoop so requestMVar

    {-
      Decode requests from the connection's input and publish each one,
      together with its responder, on the 'MVar'.
    -}
    handleConnection
      :: ByteStream IO () {-^ raw bytes input from the socket -}
      -> (BSL.ByteString -> m ()) {-^ How to send bytes back -}
      -> MVar (request, response -> m Responded)
         {-^ how we stream (req, respond) tuples to the client code. -}
      -> IO ()
    handleConnection input send requestMVar =
        void $
          S.mapM_
            sendRequestToMVar
            (decoded input)
      where
        sendRequestToMVar :: Request request -> IO ()
        sendRequestToMVar Request { messageId , payload } =
          putMVar
            requestMVar
            ( payload
            , respond messageId
            )

        {- Send a response tagged with the id of the request it answers. -}
        respond :: MessageId -> response -> m Responded
        respond responseTo response = do
          send . encode $ Response { responseTo , response }
          pure Responded

    {-
      Maybe make a TLS connection. Yields the byte source to read
      requests from and the function used to write bytes back.
    -}
    prepareConnection
      :: Socket
      -> IO
          ( ByteStream IO ()
          , BSL.ByteString -> m ()
          )
    prepareConnection conn =
      case tls of
        Nothing ->
          pure
            ( rereadNull
                (flip recv 4096)
                conn
            , liftIO . sendAll conn
            )
        Just getParams -> do
          ctx <- contextNew conn =<< getParams
          handshake ctx
          pure
            ( rereadNull recvData ctx
            , sendData ctx
            )


{- |
  Connect to a server. Returns a function in 'MonadIO' that can be used
  to submit requests to (and returns the corresponding response from)
  the server.
-}
connectServer
  :: forall n request m response.
     ( Binary request
     , Binary response
     , MonadIO m
     , MonadLoggerIO n
     , Show response
     )
  => AddressDescription
  -> Maybe ClientParams
  -> n (request -> m response)
connectServer :: forall (n :: * -> *) request (m :: * -> *) response.
(Binary request, Binary response, MonadIO m, MonadLoggerIO n,
 Show response) =>
AddressDescription
-> Maybe ClientParams -> n (request -> m response)
connectServer AddressDescription
addr Maybe ClientParams
tls = do
    Loc -> Text -> LogLevel -> LogStr -> IO ()
logging <- n (Loc -> Text -> LogLevel -> LogStr -> IO ())
forall (m :: * -> *).
MonadLoggerIO m =>
m (Loc -> Text -> LogLevel -> LogStr -> IO ())
askLoggerIO
    IO (request -> m response) -> n (request -> m response)
forall a. IO a -> n a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (request -> m response) -> n (request -> m response))
-> IO (request -> m response) -> n (request -> m response)
forall a b. (a -> b) -> a -> b
$ do
      Socket
so <- SockAddr -> IO Socket
forall (m :: * -> *). MonadIO m => SockAddr -> m Socket
connectSocket (SockAddr -> IO Socket) -> IO SockAddr -> IO Socket
forall (m :: * -> *) a b. Monad m => (a -> m b) -> m a -> m b
=<< AddressDescription -> IO SockAddr
forall (m :: * -> *).
(MonadIO m, MonadFail m) =>
AddressDescription -> m SockAddr
resolveAddr AddressDescription
addr
      TVar (ClientState request response)
state <-
        ClientState request response
-> IO (TVar (ClientState request response))
forall a. a -> IO (TVar a)
newTVarIO
          ClientState
            { csServerAlive :: Bool
csServerAlive = Bool
True
            , csResponders :: Map MessageId (response -> IO ())
csResponders = Map MessageId (response -> IO ())
forall k a. Map k a
Map.empty
            , csMessageId :: MessageId
csMessageId = MessageId
forall a. Bounded a => a
minBound
            , csRequestQueue :: [(request, response -> IO ())]
csRequestQueue = []
            }
      (ByteString -> IO ()
send, ByteStream IO ()
responseSource) <- Socket -> IO (ByteString -> IO (), ByteStream IO ())
prepareConnection Socket
so
      IO ThreadId -> IO ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (IO ThreadId -> IO ()) -> (IO () -> IO ThreadId) -> IO () -> IO ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. IO () -> IO ThreadId
forkIO (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ (LoggingT IO ()
-> (Loc -> Text -> LogLevel -> LogStr -> IO ()) -> IO ()
forall (m :: * -> *) a.
LoggingT m a -> (Loc -> Text -> LogLevel -> LogStr -> IO ()) -> m a
`runLoggingT` Loc -> Text -> LogLevel -> LogStr -> IO ()
logging) ((ByteString -> IO ())
-> TVar (ClientState request response) -> LoggingT IO ()
requestThread ByteString -> IO ()
send TVar (ClientState request response)
state)
      IO ThreadId -> IO ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (IO ThreadId -> IO ()) -> (IO () -> IO ThreadId) -> IO () -> IO ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. IO () -> IO ThreadId
forkIO (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ (LoggingT IO ()
-> (Loc -> Text -> LogLevel -> LogStr -> IO ()) -> IO ()
forall (m :: * -> *) a.
LoggingT m a -> (Loc -> Text -> LogLevel -> LogStr -> IO ()) -> m a
`runLoggingT` Loc -> Text -> LogLevel -> LogStr -> IO ()
logging) (ByteStream IO ()
-> TVar (ClientState request response) -> LoggingT IO ()
responseThread ByteStream IO ()
responseSource TVar (ClientState request response)
state)
      (request -> m response) -> IO (request -> m response)
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return (\request
i -> IO response -> m response
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO response -> m response) -> IO response -> m response
forall a b. (a -> b) -> a -> b
$ do
          MVar response
mvar <- IO (MVar response)
forall a. IO (MVar a)
newEmptyMVar
          IO (IO response) -> IO response
forall (m :: * -> *) a. Monad m => m (m a) -> m a
join (IO (IO response) -> IO response)
-> (STM (IO response) -> IO (IO response))
-> STM (IO response)
-> IO response
forall b c a. (b -> c) -> (a -> b) -> a -> c
. STM (IO response) -> IO (IO response)
forall a. STM a -> IO a
atomically (STM (IO response) -> IO response)
-> STM (IO response) -> IO response
forall a b. (a -> b) -> a -> b
$
            TVar (ClientState request response)
-> STM (ClientState request response)
forall a. TVar a -> STM a
readTVar TVar (ClientState request response)
state STM (ClientState request response)
-> (ClientState request response -> STM (IO response))
-> STM (IO response)
forall a b. STM a -> (a -> STM b) -> STM b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \case
              ClientState {csServerAlive :: forall i o. ClientState i o -> Bool
csServerAlive = Bool
False} -> IO response -> STM (IO response)
forall a. a -> STM a
forall (m :: * -> *) a. Monad m => a -> m a
return (IO response -> STM (IO response))
-> IO response -> STM (IO response)
forall a b. (a -> b) -> a -> b
$
                IOError -> IO response
forall e a. (HasCallStack, Exception e) => e -> IO a
forall (m :: * -> *) e a.
(MonadThrow m, HasCallStack, Exception e) =>
e -> m a
throwM ([Char] -> IOError
userError [Char]
"Server connection died.")
              s :: ClientState request response
s@ClientState {[(request, response -> IO ())]
csRequestQueue :: forall i o. ClientState i o -> [(i, o -> IO ())]
csRequestQueue :: [(request, response -> IO ())]
csRequestQueue} -> do
                TVar (ClientState request response)
-> ClientState request response -> STM ()
forall a. TVar a -> a -> STM ()
writeTVar TVar (ClientState request response)
state ClientState request response
s {
                    csRequestQueue = csRequestQueue <> [(i, putMVar mvar)]
                  }
                IO response -> STM (IO response)
forall a. a -> STM a
forall (m :: * -> *) a. Monad m => a -> m a
return (MVar response -> IO response
forall a. MVar a -> IO a
takeMVar MVar response
mvar)
        )
  where
    {- |
      Build the pair of communication channels for a freshly connected
      socket: the first component sends a lazy 'BSL.ByteString' to the
      peer, the second is the stream of bytes received from it.

      When no TLS parameters are configured the socket is used directly.
      Otherwise a TLS context is created on top of the socket and the
      handshake is performed before the channels are returned.
    -}
    prepareConnection
      :: Socket
      -> IO
           ( BSL.ByteString -> IO ()
           , ByteStream IO ()
           )
    prepareConnection so =
        maybe plainChannels tlsChannels tls
      where
        {- Raw socket: send everything, read in chunks of up to 4096 bytes. -}
        plainChannels
          :: IO
               ( BSL.ByteString -> IO ()
               , ByteStream IO ()
               )
        plainChannels =
          pure (sendAll so, rereadNull (`recv` 4096) so)

        {- TLS: wrap the socket in a context and handshake first. -}
        tlsChannels
          :: ClientParams
          -> IO
               ( BSL.ByteString -> IO ()
               , ByteStream IO ()
               )
        tlsChannels params = do
          ctx <- contextNew so params
          handshake ctx
          pure (sendData ctx, rereadNull recvData ctx)

    {- |
      Receive requests from the client request function and send them
      to the server.

      Each iteration atomically pops the head of the request queue,
      registers its responder under the current message id, and advances
      the id. The encoded request is then written outside of the
      transaction and the loop repeats. The loop blocks (via 'retry')
      while the queue is empty and terminates once the server is marked
      as no longer alive.
    -}
    requestThread
      :: (BSL.ByteString -> IO ())
      -> TVar (ClientState request response)
      -> LoggingT IO ()
    requestThread send state =
      join . liftIO . atomically $
        readTVar state >>= \case
          ClientState {csServerAlive = False} ->
            {- The connection is gone, stop the loop. -}
            pure (pure ())
          ClientState {csRequestQueue = []} ->
            {- Nothing to send yet, wait for the state to change. -}
            retry
          s@ClientState {
                csRequestQueue = (m, r):remaining,
                csResponders,
                csMessageId
              }
            -> do
              writeTVar state s {
                  csRequestQueue = remaining,
                  csResponders = Map.insert csMessageId r csResponders,
                  {-
                    Advance with wrapping arithmetic rather than 'succ':
                    'succ' raises a runtime error at the maximum id,
                    which would kill this thread on a long-lived
                    connection.
                  -}
                  csMessageId = csMessageId + 1
                }
              pure $ do
                liftIO $ send (encode (Request csMessageId m))
                requestThread send state

    {- |
      Receive responses from the server and send them back to the
      client request function.

      Responses are decoded from the byte stream one at a time and
      dispatched to the responder registered under their message id.
      When the stream ends, whether normally or because an exception
      was raised while reading it, the client state is closed so that
      every in-flight and queued request is released with an error
      instead of waiting forever. An exception is logged and rethrown
      after the state has been closed.
    -}
    responseThread
      :: ByteStream IO ()
      -> TVar (ClientState request response)
      -> LoggingT IO ()
    responseThread resSource stateT = do
        outcome <- try consumeResponses
        case outcome of
          Left err -> do
            logError
              $ "Socket receive error: "
              <> showt (err :: SomeException)
            {-
              Close the state BEFORE rethrowing. Rethrowing first would
              skip the cleanup, leaving 'csServerAlive' set and all
              pending callers blocked on their responses.
            -}
            closeClientState
            throwM err
          Right () ->
            closeClientState
      where
        {- Decode the response stream and dispatch every response. -}
        consumeResponses :: LoggingT IO ()
        consumeResponses =
          void $
            S.mapM_
              handleResponse
              (hoist liftIO (decoded resSource))

        {-
          Mark the server as dead, drop all responders and queued
          requests, and hand each of them a response value that raises
          an 'IOError' when it is evaluated by the waiting caller.
        -}
        closeClientState :: LoggingT IO ()
        closeClientState =
          join . liftIO . atomically $ do
            state <- readTVar stateT
            writeTVar stateT state
              { csServerAlive = False
              , csResponders = mempty
              , csRequestQueue = mempty
              }

            pure . traverse_ liftIO $
              [ respond (throw (userError "Remote connection died."))
              | respond <-
                  Map.elems (csResponders state)
                  <> fmap snd (csRequestQueue state)
              ]

        {-
          Deliver one response to the responder registered for its
          message id, removing that responder from the state. A
          response with no registered responder is logged and dropped.
        -}
        handleResponse :: Response response -> LoggingT IO ()
        handleResponse
            responsePackage@Response
              { responseTo
              , response
              }
          =
            join . lift . atomically $ do
              state <- readTVar stateT
              case deleteFind responseTo (csResponders state) of
                Nothing ->
                  pure . logWarn $
                    "Unexpected server response: " <> showt responsePackage
                Just (respond, newResponders) -> do
                  writeTVar stateT state {csResponders = newResponders}
                  pure . lift $ respond response


{- | Configuration of a server endpoint. -}
data Endpoint = Endpoint
  { bindAddr :: AddressDescription
    {- ^ Address description for the endpoint (presumably the address
         the server binds to; confirm against the users of this field).
    -}
  , tls :: Maybe (IO ServerParams)
    {- ^ Optional action producing the TLS server parameters. -}
  }
  deriving stock (Generic)


{- | A response message, tagged with the id of the request it answers. -}
data Response p = Response
  { responseTo :: MessageId
    {- ^ Id used to look up the responder of the matching request. -}
  , response :: p
    {- ^ The response payload. -}
  }
  deriving stock (Generic, Show)

{- | Serialized through the 'Generic' representation. -}
instance (Binary p) => Binary (Response p)


{- |
  A textual description of a socket address on which a socket is, or
  should be, listening. Both IPv4 and IPv6 are supported.

  Examples:

  > AddressDescription "[::1]:80" -- IPv6 localhost, port 80
  > AddressDescription "127.0.0.1:80" -- IPv4 localhost, port 80
  > AddressDescription "somehost:80" -- IPv4 or IPv6 (depending on what name resolution returns), port 80
-}
newtype AddressDescription = AddressDescription
  { unAddressDescription :: Text
    {- ^ The underlying textual form of the address. -}
  }
  deriving stock (Generic)
  deriving newtype
    ( Binary, Eq, Ord
    , FromJSON, FromJSONKey
    , ToJSON, ToJSONKey
    , IsString
    , Semigroup, Monoid
    )
{- | Shows the raw address text, without constructor or quotes. -}
instance Show AddressDescription where
  show (AddressDescription txt) = T.unpack txt


{- |
  State shared between the client request function and the threads
  that service a client connection.
-}
data ClientState i o = ClientState
  { csServerAlive :: Bool
    {- ^ 'False' once the connection to the server has been closed. -}
  , csResponders :: Map MessageId (o -> IO ())
    {- ^ Callbacks for requests already sent, keyed by message id. -}
  , csMessageId :: MessageId
    {- ^ The id that will be given to the next request sent. -}
  , csRequestQueue :: [(i, o -> IO ())]
    {- ^ Requests not yet sent, each paired with its response callback. -}
  }


{- | A request message, tagged with the id its response must refer to. -}
data Request p = Request
  { messageId :: MessageId
    {- ^ Id assigned to this request. -}
  , payload :: p
    {- ^ The request payload. -}
  }
  deriving stock (Generic, Show)

{- | Serialized through the 'Generic' representation. -}
instance (Binary p) => Binary (Request p)


{- |
  A message identifier.

  All instances are derived with the @newtype@ strategy, so they are
  exactly the instances of the underlying 'Word32'. In particular,
  arithmetic wraps around modulo 2^32 and 'show' renders the bare
  number without the constructor name.
-}
newtype MessageId = MessageId {
    _unMessageId :: Word32
  }
  deriving newtype (Binary, Num, Bounded, Eq, Ord, Show, Enum)


{- |
  Proof that a response function was called on the server. Mainly
  useful for including in a type signature somewhere in your server
  implementation to help ensure that you actually responded to the
  request in all cases.

  The module's export list exposes only the type, not the data
  constructor, so code outside this module cannot construct a
  'Responded' value itself.
  NOTE(review): presumably the only way to obtain one is as the result
  of the response function supplied by 'openServer'; confirm.
-}
data Responded = Responded


{- |
  Turn an 'MVar' into an endless stream of the values put into it.

  Each step takes one value from the 'MVar' (blocking until one is
  available), yields it, and then loops. The stream never finishes on
  its own, which is why the return type is left fully polymorphic.
-}
mvarToStream
  :: (MonadIO m)
  => MVar i
  -> Stream (Of i) m never_returns
mvarToStream mvar = do
  liftIO (takeMVar mvar) >>= S.yield
  mvarToStream mvar


{- |
  Build a 'ByteStream' from a read action that reports "no more data"
  by returning an empty 'ByteString'.

  The read action is adapted to the @Maybe@-returning form that
  'reread' expects: an empty chunk becomes 'Nothing' and any other
  chunk is passed through as 'Just'.
-}
rereadNull
  :: (Monad m)
  => (c -> m ByteString)
  -> c
  -> ByteStream m ()
rereadNull f =
  reread
    (\c -> do
      bytes <- f c
      pure $ if BS.null bytes then Nothing else Just bytes
    )


{-|
  If the key exists in the map, delete it and return its value along
  with the new map.

  Returns 'Nothing' (and performs no deletion) when the key is absent.
-}
deleteFind
  :: (Ord k)
  => k
  -> Map k v
  -> Maybe (v, Map k v)
deleteFind key m =
  {- Pair the found value with the map minus the key. -}
  (\v -> (v, Map.delete key m)) <$> Map.lookup key m