{-# LANGUAGE DeriveGeneric #-}
{-# LANGUAGE DerivingStrategies #-}
{-# LANGUAGE GeneralizedNewtypeDeriving #-}
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE ScopedTypeVariables #-}
module OM.Socket (
AddressDescription(..),
resolveAddr,
openIngress,
openEgress,
openServer,
Responded,
connectServer,
) where
import Control.Concurrent (MVar, forkIO, newEmptyMVar, putMVar, takeMVar)
import Control.Concurrent.STM (TVar, atomically, newTVarIO, readTVar,
retry, writeTVar)
import Control.Exception (SomeException, bracketOnError, throw)
import Control.Monad (when)
import Control.Monad.Catch (MonadThrow(throwM), MonadCatch, try)
import Control.Monad.IO.Unlift (MonadUnliftIO)
import Control.Monad.Logger.CallStack (LoggingT(runLoggingT),
MonadLoggerIO(askLoggerIO), NoLoggingT(runNoLoggingT), MonadLogger,
logDebug, logError, logWarn)
import Data.Aeson (FromJSON, FromJSONKey, ToJSON, ToJSONKey)
import Data.Binary (Binary(get), encode)
import Data.Binary.Get (Decoder(Done, Fail, Partial), pushChunk,
runGetIncremental)
import Data.ByteString (ByteString)
import Data.Foldable (traverse_)
import Data.Map (Map)
import Data.String (IsString)
import Data.Text (Text)
import Data.Void (Void)
import Data.Word (Word32)
import GHC.Generics (Generic)
import Network.Socket (AddrInfo(addrAddress), Family(AF_INET, AF_INET6,
AF_UNIX), SockAddr(SockAddrInet, SockAddrInet6, SockAddrUnix),
SocketOption(ReuseAddr), SocketType(Stream), HostName, ServiceName,
Socket, accept, bind, close, connect, defaultProtocol, getAddrInfo,
listen, setSocketOption, socket)
import Network.Socket.ByteString (recv)
import Network.Socket.ByteString.Lazy (sendAll)
import Network.TLS (ClientParams, Context, ServerParams, contextNew,
handshake, recvData, sendData)
import OM.Fork (Race, race)
import OM.Show (showt)
import Prelude (Applicative(pure), Bool(False, True), Bounded(minBound),
Either(Left, Right), Enum(succ), Eq((/=)), Functor(fmap), Maybe(Just,
Nothing), Monad((>>), (>>=), return), MonadFail(fail), Monoid(mempty),
Semigroup((<>)), Show(show), ($), (++), (.), (=<<), IO, Num, Ord,
String, flip, snd, userError)
import Streaming (Alternative((<|>)), MFunctor(hoist), MonadIO(liftIO),
MonadTrans(lift), Of, Stream, join, void)
import Streaming.Binary (decoded)
import Streaming.ByteString (ByteStream, reread)
import Text.Megaparsec (MonadParsec(eof), Parsec, many, oneOf, parse,
satisfy)
import Text.Megaparsec.Char (char)
import qualified Data.ByteString as BS
import qualified Data.ByteString.Lazy as BSL
import qualified Data.Map as Map
import qualified Data.Text as T
import qualified Streaming.Prelude as S
import qualified Text.Megaparsec as M
openIngress
:: forall i m never_returns.
( Binary i
, MonadFail m
, MonadIO m
, Race
)
=> AddressDescription
-> Stream (Of i) m never_returns
openIngress :: forall i (m :: * -> *) never_returns.
(Binary i, MonadFail m, MonadIO m, Race) =>
AddressDescription -> Stream (Of i) m never_returns
openIngress AddressDescription
bindAddr = do
Socket
so <- SockAddr -> Stream (Of i) m Socket
forall (m :: * -> *). MonadIO m => SockAddr -> m Socket
listenSocket (SockAddr -> Stream (Of i) m Socket)
-> Stream (Of i) m SockAddr -> Stream (Of i) m Socket
forall (m :: * -> *) a b. Monad m => (a -> m b) -> m a -> m b
=<< AddressDescription -> Stream (Of i) m SockAddr
forall (m :: * -> *).
(MonadIO m, MonadFail m) =>
AddressDescription -> m SockAddr
resolveAddr AddressDescription
bindAddr
MVar i
mvar <- IO (MVar i) -> Stream (Of i) m (MVar i)
forall a. IO a -> Stream (Of i) m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO IO (MVar i)
forall a. IO (MVar a)
newEmptyMVar
IO () -> Stream (Of i) m ()
forall a. IO a -> Stream (Of i) m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO
(IO () -> Stream (Of i) m ())
-> (IO () -> IO ()) -> IO () -> Stream (Of i) m ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. NoLoggingT IO () -> IO ()
forall (m :: * -> *) a. NoLoggingT m a -> m a
runNoLoggingT
(NoLoggingT IO () -> IO ())
-> (IO () -> NoLoggingT IO ()) -> IO () -> IO ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. ProcessName -> NoLoggingT IO () -> NoLoggingT IO ()
forall (m :: * -> *) a.
(MonadCatch m, MonadLogger m, MonadUnliftIO m, Race) =>
ProcessName -> m a -> m ()
race ProcessName
"ingress accept loop"
(NoLoggingT IO () -> NoLoggingT IO ())
-> (IO () -> NoLoggingT IO ()) -> IO () -> NoLoggingT IO ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. IO () -> NoLoggingT IO ()
forall a. IO a -> NoLoggingT IO a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO
(IO () -> Stream (Of i) m ()) -> IO () -> Stream (Of i) m ()
forall a b. (a -> b) -> a -> b
$ Socket -> MVar i -> IO ()
acceptLoop Socket
so MVar i
mvar
MVar i -> Stream (Of i) m never_returns
forall (m :: * -> *) i never_returns.
MonadIO m =>
MVar i -> Stream (Of i) m never_returns
mvarToStream MVar i
mvar
where
acceptLoop :: Socket -> MVar i -> IO ()
acceptLoop :: Socket -> MVar i -> IO ()
acceptLoop Socket
so MVar i
mvar = do
(Socket
conn, SockAddr
_) <- Socket -> IO (Socket, SockAddr)
accept Socket
so
IO ThreadId -> IO ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (IO ThreadId -> IO ()) -> (IO () -> IO ThreadId) -> IO () -> IO ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. IO () -> IO ThreadId
forkIO (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ Decoder i -> Socket -> MVar i -> IO ()
feed (Get i -> Decoder i
forall a. Get a -> Decoder a
runGetIncremental Get i
forall t. Binary t => Get t
get) Socket
conn MVar i
mvar
Socket -> MVar i -> IO ()
acceptLoop Socket
so MVar i
mvar
feed
:: Decoder i
-> Socket
-> MVar i
-> IO ()
feed :: Decoder i -> Socket -> MVar i -> IO ()
feed (Done ByteString
leftover ByteOffset
_ i
i) Socket
conn MVar i
mvar = do
MVar i -> i -> IO ()
forall a. MVar a -> a -> IO ()
putMVar MVar i
mvar i
i
Decoder i -> Socket -> MVar i -> IO ()
feed (Get i -> Decoder i
forall a. Get a -> Decoder a
runGetIncremental Get i
forall t. Binary t => Get t
get Decoder i -> ByteString -> Decoder i
forall a. Decoder a -> ByteString -> Decoder a
`pushChunk` ByteString
leftover) Socket
conn MVar i
mvar
feed (Partial Maybe ByteString -> Decoder i
k) Socket
conn MVar i
mvar = do
ByteString
bytes <- Socket -> Int -> IO ByteString
recv Socket
conn Int
4096
Bool -> IO () -> IO ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (ByteString -> Bool
BS.null ByteString
bytes) ([Char] -> IO ()
forall a. [Char] -> IO a
forall (m :: * -> *) a. MonadFail m => [Char] -> m a
fail [Char]
"Socket closed by peer.")
Decoder i -> Socket -> MVar i -> IO ()
feed (Maybe ByteString -> Decoder i
k (ByteString -> Maybe ByteString
forall a. a -> Maybe a
Just ByteString
bytes)) Socket
conn MVar i
mvar
feed (Fail ByteString
_ ByteOffset
_ [Char]
err) Socket
_conn MVar i
_chan =
[Char] -> IO ()
forall a. [Char] -> IO a
forall (m :: * -> *) a. MonadFail m => [Char] -> m a
fail ([Char] -> IO ()) -> [Char] -> IO ()
forall a b. (a -> b) -> a -> b
$ [Char]
"Socket crashed. Decoding error: " [Char] -> [Char] -> [Char]
forall a. [a] -> [a] -> [a]
++ [Char] -> [Char]
forall a. Show a => a -> [Char]
show [Char]
err
openEgress
:: ( Binary o
, MonadFail m
, MonadIO m
)
=> AddressDescription
-> Stream (Of o) m r
-> m r
openEgress :: forall o (m :: * -> *) r.
(Binary o, MonadFail m, MonadIO m) =>
AddressDescription -> Stream (Of o) m r -> m r
openEgress AddressDescription
addr Stream (Of o) m r
stream = do
Socket
so <- SockAddr -> m Socket
forall (m :: * -> *). MonadIO m => SockAddr -> m Socket
connectSocket (SockAddr -> m Socket) -> m SockAddr -> m Socket
forall (m :: * -> *) a b. Monad m => (a -> m b) -> m a -> m b
=<< AddressDescription -> m SockAddr
forall (m :: * -> *).
(MonadIO m, MonadFail m) =>
AddressDescription -> m SockAddr
resolveAddr AddressDescription
addr
r
result <-
(o -> m ()) -> Stream (Of o) m r -> m r
forall (m :: * -> *) a x r.
Monad m =>
(a -> m x) -> Stream (Of a) m r -> m r
S.mapM_
(IO () -> m ()
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> (o -> IO ()) -> o -> m ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Socket -> ByteString -> IO ()
sendAll Socket
so (ByteString -> IO ()) -> (o -> ByteString) -> o -> IO ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. o -> ByteString
forall a. Binary a => a -> ByteString
encode)
Stream (Of o) m r
stream
IO () -> m ()
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (Socket -> IO ()
close Socket
so)
r -> m r
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure r
result
fam :: SockAddr -> Family
fam :: SockAddr -> Family
fam SockAddrInet {} = Family
AF_INET
fam SockAddrInet6 {} = Family
AF_INET6
fam SockAddrUnix {} = Family
AF_UNIX
resolveAddr :: (MonadIO m, MonadFail m) => AddressDescription -> m SockAddr
resolveAddr :: forall (m :: * -> *).
(MonadIO m, MonadFail m) =>
AddressDescription -> m SockAddr
resolveAddr AddressDescription
addr = do
([Char]
host, [Char]
port) <- AddressDescription -> m ([Char], [Char])
forall (m :: * -> *).
MonadFail m =>
AddressDescription -> m ([Char], [Char])
parseAddr AddressDescription
addr
IO [AddrInfo] -> m [AddrInfo]
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (Maybe AddrInfo -> Maybe [Char] -> Maybe [Char] -> IO [AddrInfo]
getAddrInfo Maybe AddrInfo
forall a. Maybe a
Nothing ([Char] -> Maybe [Char]
forall a. a -> Maybe a
Just [Char]
host) ([Char] -> Maybe [Char]
forall a. a -> Maybe a
Just [Char]
port)) m [AddrInfo] -> ([AddrInfo] -> m SockAddr) -> m SockAddr
forall a b. m a -> (a -> m b) -> m b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \case
[] -> [Char] -> m SockAddr
forall a. [Char] -> m a
forall (m :: * -> *) a. MonadFail m => [Char] -> m a
fail [Char]
"Address not found: (host, port)"
AddrInfo
sa:[AddrInfo]
_ -> SockAddr -> m SockAddr
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return (AddrInfo -> SockAddr
addrAddress AddrInfo
sa)
parseAddr :: (MonadFail m) => AddressDescription -> m (HostName, ServiceName)
parseAddr :: forall (m :: * -> *).
MonadFail m =>
AddressDescription -> m ([Char], [Char])
parseAddr AddressDescription
addr =
case Parsec Void Text ([Char], [Char])
-> [Char]
-> Text
-> Either (ParseErrorBundle Text Void) ([Char], [Char])
forall e s a.
Parsec e s a -> [Char] -> s -> Either (ParseErrorBundle s e) a
parse Parsec Void Text ([Char], [Char])
parser [Char]
"$" (AddressDescription -> Text
unAddressDescription AddressDescription
addr) of
Left ParseErrorBundle Text Void
err -> [Char] -> m ([Char], [Char])
forall a. [Char] -> m a
forall (m :: * -> *) a. MonadFail m => [Char] -> m a
fail (ParseErrorBundle Text Void -> [Char]
forall a. Show a => a -> [Char]
show ParseErrorBundle Text Void
err)
Right ([Char]
host, [Char]
port) -> ([Char], [Char]) -> m ([Char], [Char])
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return ([Char]
host, [Char]
port)
where
parser :: Parsec Void Text (HostName, ServiceName)
parser :: Parsec Void Text ([Char], [Char])
parser = do
[Char]
host <- ParsecT Void Text Identity [Char]
-> ParsecT Void Text Identity [Char]
forall a.
ParsecT Void Text Identity a -> ParsecT Void Text Identity a
forall e s (m :: * -> *) a. MonadParsec e s m => m a -> m a
M.try ParsecT Void Text Identity [Char]
ipv6 ParsecT Void Text Identity [Char]
-> ParsecT Void Text Identity [Char]
-> ParsecT Void Text Identity [Char]
forall a.
ParsecT Void Text Identity a
-> ParsecT Void Text Identity a -> ParsecT Void Text Identity a
forall (f :: * -> *) a. Alternative f => f a -> f a -> f a
<|> ParsecT Void Text Identity [Char]
ipv4
ParsecT Void Text Identity Char -> ParsecT Void Text Identity ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (ParsecT Void Text Identity Char -> ParsecT Void Text Identity ())
-> ParsecT Void Text Identity Char -> ParsecT Void Text Identity ()
forall a b. (a -> b) -> a -> b
$ Token Text -> ParsecT Void Text Identity (Token Text)
forall e s (m :: * -> *).
(MonadParsec e s m, Token s ~ Char) =>
Token s -> m (Token s)
char Char
Token Text
':'
[Char]
port <- ParsecT Void Text Identity Char
-> ParsecT Void Text Identity [Char]
forall (m :: * -> *) a. MonadPlus m => m a -> m [a]
many ([Token Text] -> ParsecT Void Text Identity (Token Text)
forall (f :: * -> *) e s (m :: * -> *).
(Foldable f, MonadParsec e s m) =>
f (Token s) -> m (Token s)
oneOf ([Char]
"0123456789" :: String))
ParsecT Void Text Identity ()
forall e s (m :: * -> *). MonadParsec e s m => m ()
eof
([Char], [Char]) -> Parsec Void Text ([Char], [Char])
forall a. a -> ParsecT Void Text Identity a
forall (m :: * -> *) a. Monad m => a -> m a
return ([Char]
host, [Char]
port)
ipv6 :: Parsec Void Text HostName
ipv6 :: ParsecT Void Text Identity [Char]
ipv6 = do
ParsecT Void Text Identity Char -> ParsecT Void Text Identity ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (ParsecT Void Text Identity Char -> ParsecT Void Text Identity ())
-> ParsecT Void Text Identity Char -> ParsecT Void Text Identity ()
forall a b. (a -> b) -> a -> b
$ Token Text -> ParsecT Void Text Identity (Token Text)
forall e s (m :: * -> *).
(MonadParsec e s m, Token s ~ Char) =>
Token s -> m (Token s)
char Char
Token Text
'['
[Char]
host <- ParsecT Void Text Identity Char
-> ParsecT Void Text Identity [Char]
forall (m :: * -> *) a. MonadPlus m => m a -> m [a]
many ((Token Text -> Bool) -> ParsecT Void Text Identity (Token Text)
forall e s (m :: * -> *).
MonadParsec e s m =>
(Token s -> Bool) -> m (Token s)
satisfy (Token Text -> Token Text -> Bool
forall a. Eq a => a -> a -> Bool
/= Char
Token Text
']'))
ParsecT Void Text Identity Char -> ParsecT Void Text Identity ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (ParsecT Void Text Identity Char -> ParsecT Void Text Identity ())
-> ParsecT Void Text Identity Char -> ParsecT Void Text Identity ()
forall a b. (a -> b) -> a -> b
$ Token Text -> ParsecT Void Text Identity (Token Text)
forall e s (m :: * -> *).
(MonadParsec e s m, Token s ~ Char) =>
Token s -> m (Token s)
char Char
Token Text
']'
[Char] -> ParsecT Void Text Identity [Char]
forall a. a -> ParsecT Void Text Identity a
forall (m :: * -> *) a. Monad m => a -> m a
return [Char]
host
ipv4 :: Parsec Void Text HostName
ipv4 :: ParsecT Void Text Identity [Char]
ipv4 = ParsecT Void Text Identity Char
-> ParsecT Void Text Identity [Char]
forall (m :: * -> *) a. MonadPlus m => m a -> m [a]
many ((Token Text -> Bool) -> ParsecT Void Text Identity (Token Text)
forall e s (m :: * -> *).
MonadParsec e s m =>
(Token s -> Bool) -> m (Token s)
satisfy (Token Text -> Token Text -> Bool
forall a. Eq a => a -> a -> Bool
/= Char
Token Text
':'))
connectSocket :: (MonadIO m) => SockAddr -> m Socket
connectSocket :: forall (m :: * -> *). MonadIO m => SockAddr -> m Socket
connectSocket SockAddr
addr = IO Socket -> m Socket
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO Socket -> m Socket) -> IO Socket -> m Socket
forall a b. (a -> b) -> a -> b
$
IO Socket
-> (Socket -> IO ()) -> (Socket -> IO Socket) -> IO Socket
forall a b c. IO a -> (a -> IO b) -> (a -> IO c) -> IO c
bracketOnError
(Family -> SocketType -> ProtocolNumber -> IO Socket
socket (SockAddr -> Family
fam SockAddr
addr) SocketType
Stream ProtocolNumber
defaultProtocol)
Socket -> IO ()
close
(\Socket
so -> Socket -> SockAddr -> IO ()
connect Socket
so SockAddr
addr IO () -> IO Socket -> IO Socket
forall a b. IO a -> IO b -> IO b
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> Socket -> IO Socket
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return Socket
so)
listenSocket :: (MonadIO m) => SockAddr -> m Socket
listenSocket :: forall (m :: * -> *). MonadIO m => SockAddr -> m Socket
listenSocket SockAddr
addr = IO Socket -> m Socket
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO Socket -> m Socket) -> IO Socket -> m Socket
forall a b. (a -> b) -> a -> b
$ do
Socket
so <- Family -> SocketType -> ProtocolNumber -> IO Socket
socket (SockAddr -> Family
fam SockAddr
addr) SocketType
Stream ProtocolNumber
defaultProtocol
Socket -> SocketOption -> Int -> IO ()
setSocketOption Socket
so SocketOption
ReuseAddr Int
1
Socket -> SockAddr -> IO ()
bind Socket
so SockAddr
addr
Socket -> Int -> IO ()
listen Socket
so Int
5
Socket -> IO Socket
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return Socket
so
openServer
:: forall request response m never_returns.
( Binary request
, Binary response
, MonadLogger m
, MonadCatch m
, MonadFail m
, MonadUnliftIO m
, Race
)
=> AddressDescription
-> Maybe (IO ServerParams)
-> Stream (Of (request, response -> m Responded)) m never_returns
openServer :: forall request response (m :: * -> *) never_returns.
(Binary request, Binary response, MonadLogger m, MonadCatch m,
MonadFail m, MonadUnliftIO m, Race) =>
AddressDescription
-> Maybe (IO ServerParams)
-> Stream (Of (request, response -> m Responded)) m never_returns
openServer AddressDescription
bindAddr Maybe (IO ServerParams)
tls = do
Socket
so <- SockAddr -> Stream (Of (request, response -> m Responded)) m Socket
forall (m :: * -> *). MonadIO m => SockAddr -> m Socket
listenSocket (SockAddr
-> Stream (Of (request, response -> m Responded)) m Socket)
-> Stream (Of (request, response -> m Responded)) m SockAddr
-> Stream (Of (request, response -> m Responded)) m Socket
forall (m :: * -> *) a b. Monad m => (a -> m b) -> m a -> m b
=<< AddressDescription
-> Stream (Of (request, response -> m Responded)) m SockAddr
forall (m :: * -> *).
(MonadIO m, MonadFail m) =>
AddressDescription -> m SockAddr
resolveAddr AddressDescription
bindAddr
MVar (request, response -> m Responded)
requestMVar <- IO (MVar (request, response -> m Responded))
-> Stream
(Of (request, response -> m Responded))
m
(MVar (request, response -> m Responded))
forall a.
IO a -> Stream (Of (request, response -> m Responded)) m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO IO (MVar (request, response -> m Responded))
forall a. IO (MVar a)
newEmptyMVar
m () -> Stream (Of (request, response -> m Responded)) m ()
forall (m :: * -> *) a.
Monad m =>
m a -> Stream (Of (request, response -> m Responded)) m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(MonadTrans t, Monad m) =>
m a -> t m a
lift
(m () -> Stream (Of (request, response -> m Responded)) m ())
-> (m Any -> m ())
-> m Any
-> Stream (Of (request, response -> m Responded)) m ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. ProcessName -> m Any -> m ()
forall (m :: * -> *) a.
(MonadCatch m, MonadLogger m, MonadUnliftIO m, Race) =>
ProcessName -> m a -> m ()
race ProcessName
"server accept loop"
(m Any -> Stream (Of (request, response -> m Responded)) m ())
-> m Any -> Stream (Of (request, response -> m Responded)) m ()
forall a b. (a -> b) -> a -> b
$ Socket -> MVar (request, response -> m Responded) -> m Any
forall void.
Socket -> MVar (request, response -> m Responded) -> m void
acceptLoop Socket
so MVar (request, response -> m Responded)
requestMVar
MVar (request, response -> m Responded)
-> Stream (Of (request, response -> m Responded)) m never_returns
forall (m :: * -> *) i never_returns.
MonadIO m =>
MVar i -> Stream (Of i) m never_returns
mvarToStream MVar (request, response -> m Responded)
requestMVar
where
acceptLoop
:: Socket
-> MVar (request, response -> m Responded)
-> m void
acceptLoop :: forall void.
Socket -> MVar (request, response -> m Responded) -> m void
acceptLoop Socket
so MVar (request, response -> m Responded)
requestMVar = do
(Socket
conn, SockAddr
ra) <- IO (Socket, SockAddr) -> m (Socket, SockAddr)
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (Socket -> IO (Socket, SockAddr)
accept Socket
so)
Text -> m ()
forall (m :: * -> *). (HasCallStack, MonadLogger m) => Text -> m ()
logDebug (Text -> m ()) -> Text -> m ()
forall a b. (a -> b) -> a -> b
$ Text
"New connection: " Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> SockAddr -> Text
forall a b. (Show a, IsString b) => a -> b
showt SockAddr
ra
(ByteStream IO ()
input, ByteString -> m ()
send) <- Socket -> m (ByteStream IO (), ByteString -> m ())
prepareConnection Socket
conn
m ThreadId -> m ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (m ThreadId -> m ()) -> (IO () -> m ThreadId) -> IO () -> m ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. IO ThreadId -> m ThreadId
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO ThreadId -> m ThreadId)
-> (IO () -> IO ThreadId) -> IO () -> m ThreadId
forall b c a. (b -> c) -> (a -> b) -> a -> c
. IO () -> IO ThreadId
forkIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ ByteStream IO ()
-> (ByteString -> m ())
-> MVar (request, response -> m Responded)
-> IO ()
handleConnection ByteStream IO ()
input ByteString -> m ()
send MVar (request, response -> m Responded)
requestMVar
Socket -> MVar (request, response -> m Responded) -> m void
forall void.
Socket -> MVar (request, response -> m Responded) -> m void
acceptLoop Socket
so MVar (request, response -> m Responded)
requestMVar
handleConnection
:: ByteStream IO ()
-> (BSL.ByteString -> m ())
-> MVar (request, response -> m Responded)
-> IO ()
handleConnection :: ByteStream IO ()
-> (ByteString -> m ())
-> MVar (request, response -> m Responded)
-> IO ()
handleConnection ByteStream IO ()
input ByteString -> m ()
send MVar (request, response -> m Responded)
requestMVar =
IO (ByteStream IO (), ByteOffset, Either [Char] ()) -> IO ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (IO (ByteStream IO (), ByteOffset, Either [Char] ()) -> IO ())
-> IO (ByteStream IO (), ByteOffset, Either [Char] ()) -> IO ()
forall a b. (a -> b) -> a -> b
$
(Request request -> IO ())
-> Stream
(Of (Request request))
IO
(ByteStream IO (), ByteOffset, Either [Char] ())
-> IO (ByteStream IO (), ByteOffset, Either [Char] ())
forall (m :: * -> *) a x r.
Monad m =>
(a -> m x) -> Stream (Of a) m r -> m r
S.mapM_
Request request -> IO ()
sendRequestToMVar
(ByteStream IO ()
-> Stream
(Of (Request request))
IO
(ByteStream IO (), ByteOffset, Either [Char] ())
forall a (m :: * -> *) r.
(Binary a, Monad m) =>
ByteString m r
-> Stream (Of a) m (ByteString m r, ByteOffset, Either [Char] r)
decoded ByteStream IO ()
input)
where
sendRequestToMVar :: Request request -> IO ()
sendRequestToMVar :: Request request -> IO ()
sendRequestToMVar Request { MessageId
messageId :: MessageId
messageId :: forall p. Request p -> MessageId
messageId , request
payload :: request
payload :: forall p. Request p -> p
payload } =
MVar (request, response -> m Responded)
-> (request, response -> m Responded) -> IO ()
forall a. MVar a -> a -> IO ()
putMVar
MVar (request, response -> m Responded)
requestMVar
( request
payload
, MessageId -> response -> m Responded
respond MessageId
messageId
)
respond :: MessageId -> response -> m Responded
respond :: MessageId -> response -> m Responded
respond MessageId
responseTo response
response = do
ByteString -> m ()
send (ByteString -> m ())
-> (Response response -> ByteString) -> Response response -> m ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Response response -> ByteString
forall a. Binary a => a -> ByteString
encode (Response response -> m ()) -> Response response -> m ()
forall a b. (a -> b) -> a -> b
$ Response { MessageId
responseTo :: MessageId
responseTo :: MessageId
responseTo , response
response :: response
response :: response
response }
Responded -> m Responded
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure Responded
Responded
prepareConnection
:: Socket
-> m
( ByteStream IO ()
, BSL.ByteString -> m ()
)
prepareConnection :: Socket -> m (ByteStream IO (), ByteString -> m ())
prepareConnection Socket
conn =
case Maybe (IO ServerParams)
tls of
Maybe (IO ServerParams)
Nothing ->
(ByteStream IO (), ByteString -> m ())
-> m (ByteStream IO (), ByteString -> m ())
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure
( (Socket -> IO ByteString) -> Socket -> ByteStream IO ()
forall (m :: * -> *) c.
Monad m =>
(c -> m ByteString) -> c -> ByteStream m ()
rereadNull
((Socket -> Int -> IO ByteString) -> Int -> Socket -> IO ByteString
forall a b c. (a -> b -> c) -> b -> a -> c
flip Socket -> Int -> IO ByteString
recv Int
4096)
Socket
conn
, IO () -> m ()
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> (ByteString -> IO ()) -> ByteString -> m ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Socket -> ByteString -> IO ()
sendAll Socket
conn
)
Just IO ServerParams
getParams ->
IO (ByteStream IO (), ByteString -> m ())
-> m (ByteStream IO (), ByteString -> m ())
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (ByteStream IO (), ByteString -> m ())
-> m (ByteStream IO (), ByteString -> m ()))
-> IO (ByteStream IO (), ByteString -> m ())
-> m (ByteStream IO (), ByteString -> m ())
forall a b. (a -> b) -> a -> b
$ do
Context
ctx <- Socket -> ServerParams -> IO Context
forall (m :: * -> *) backend params.
(MonadIO m, HasBackend backend, TLSParams params) =>
backend -> params -> m Context
contextNew Socket
conn (ServerParams -> IO Context) -> IO ServerParams -> IO Context
forall (m :: * -> *) a b. Monad m => (a -> m b) -> m a -> m b
=<< IO ServerParams
getParams
Context -> IO ()
forall (m :: * -> *). MonadIO m => Context -> m ()
handshake Context
ctx
(ByteStream IO (), ByteString -> m ())
-> IO (ByteStream IO (), ByteString -> m ())
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure
( (Context -> IO ByteString) -> Context -> ByteStream IO ()
forall (m :: * -> *) c.
Monad m =>
(c -> m ByteString) -> c -> ByteStream m ()
rereadNull Context -> IO ByteString
forall (m :: * -> *). MonadIO m => Context -> m ByteString
recvData Context
ctx
, Context -> ByteString -> m ()
forall (m :: * -> *). MonadIO m => Context -> ByteString -> m ()
sendData Context
ctx
)
connectServer
:: forall n request m response.
( Binary request
, Binary response
, MonadIO m
, MonadLoggerIO n
, Show response
)
=> AddressDescription
-> Maybe ClientParams
-> n (request -> m response)
connectServer :: forall (n :: * -> *) request (m :: * -> *) response.
(Binary request, Binary response, MonadIO m, MonadLoggerIO n,
Show response) =>
AddressDescription
-> Maybe ClientParams -> n (request -> m response)
connectServer AddressDescription
addr Maybe ClientParams
tls = do
Loc -> Text -> LogLevel -> LogStr -> IO ()
logging <- n (Loc -> Text -> LogLevel -> LogStr -> IO ())
forall (m :: * -> *).
MonadLoggerIO m =>
m (Loc -> Text -> LogLevel -> LogStr -> IO ())
askLoggerIO
IO (request -> m response) -> n (request -> m response)
forall a. IO a -> n a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (request -> m response) -> n (request -> m response))
-> IO (request -> m response) -> n (request -> m response)
forall a b. (a -> b) -> a -> b
$ do
Socket
so <- SockAddr -> IO Socket
forall (m :: * -> *). MonadIO m => SockAddr -> m Socket
connectSocket (SockAddr -> IO Socket) -> IO SockAddr -> IO Socket
forall (m :: * -> *) a b. Monad m => (a -> m b) -> m a -> m b
=<< AddressDescription -> IO SockAddr
forall (m :: * -> *).
(MonadIO m, MonadFail m) =>
AddressDescription -> m SockAddr
resolveAddr AddressDescription
addr
TVar (ClientState request response)
state <-
ClientState request response
-> IO (TVar (ClientState request response))
forall a. a -> IO (TVar a)
newTVarIO
ClientState
{ csServerAlive :: Bool
csServerAlive = Bool
True
, csResponders :: Map MessageId (response -> IO ())
csResponders = Map MessageId (response -> IO ())
forall k a. Map k a
Map.empty
, csMessageId :: MessageId
csMessageId = MessageId
forall a. Bounded a => a
minBound
, csRequestQueue :: [(request, response -> IO ())]
csRequestQueue = []
}
(ByteString -> IO ()
send, ByteStream IO ()
responseSource) <- Socket -> IO (ByteString -> IO (), ByteStream IO ())
prepareConnection Socket
so
IO ThreadId -> IO ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (IO ThreadId -> IO ()) -> (IO () -> IO ThreadId) -> IO () -> IO ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. IO () -> IO ThreadId
forkIO (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ (LoggingT IO ()
-> (Loc -> Text -> LogLevel -> LogStr -> IO ()) -> IO ()
forall (m :: * -> *) a.
LoggingT m a -> (Loc -> Text -> LogLevel -> LogStr -> IO ()) -> m a
`runLoggingT` Loc -> Text -> LogLevel -> LogStr -> IO ()
logging) ((ByteString -> IO ())
-> TVar (ClientState request response) -> LoggingT IO ()
requestThread ByteString -> IO ()
send TVar (ClientState request response)
state)
IO ThreadId -> IO ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (IO ThreadId -> IO ()) -> (IO () -> IO ThreadId) -> IO () -> IO ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. IO () -> IO ThreadId
forkIO (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ (LoggingT IO ()
-> (Loc -> Text -> LogLevel -> LogStr -> IO ()) -> IO ()
forall (m :: * -> *) a.
LoggingT m a -> (Loc -> Text -> LogLevel -> LogStr -> IO ()) -> m a
`runLoggingT` Loc -> Text -> LogLevel -> LogStr -> IO ()
logging) (ByteStream IO ()
-> TVar (ClientState request response) -> LoggingT IO ()
responseThread ByteStream IO ()
responseSource TVar (ClientState request response)
state)
(request -> m response) -> IO (request -> m response)
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return (\request
i -> IO response -> m response
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO response -> m response) -> IO response -> m response
forall a b. (a -> b) -> a -> b
$ do
MVar response
mvar <- IO (MVar response)
forall a. IO (MVar a)
newEmptyMVar
IO (IO response) -> IO response
forall (m :: * -> *) a. Monad m => m (m a) -> m a
join (IO (IO response) -> IO response)
-> (STM (IO response) -> IO (IO response))
-> STM (IO response)
-> IO response
forall b c a. (b -> c) -> (a -> b) -> a -> c
. STM (IO response) -> IO (IO response)
forall a. STM a -> IO a
atomically (STM (IO response) -> IO response)
-> STM (IO response) -> IO response
forall a b. (a -> b) -> a -> b
$
TVar (ClientState request response)
-> STM (ClientState request response)
forall a. TVar a -> STM a
readTVar TVar (ClientState request response)
state STM (ClientState request response)
-> (ClientState request response -> STM (IO response))
-> STM (IO response)
forall a b. STM a -> (a -> STM b) -> STM b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \case
ClientState {csServerAlive :: forall i o. ClientState i o -> Bool
csServerAlive = Bool
False} -> IO response -> STM (IO response)
forall a. a -> STM a
forall (m :: * -> *) a. Monad m => a -> m a
return (IO response -> STM (IO response))
-> IO response -> STM (IO response)
forall a b. (a -> b) -> a -> b
$
IOError -> IO response
forall e a. (HasCallStack, Exception e) => e -> IO a
forall (m :: * -> *) e a.
(MonadThrow m, HasCallStack, Exception e) =>
e -> m a
throwM ([Char] -> IOError
userError [Char]
"Server connection died.")
s :: ClientState request response
s@ClientState {[(request, response -> IO ())]
csRequestQueue :: forall i o. ClientState i o -> [(i, o -> IO ())]
csRequestQueue :: [(request, response -> IO ())]
csRequestQueue} -> do
TVar (ClientState request response)
-> ClientState request response -> STM ()
forall a. TVar a -> a -> STM ()
writeTVar TVar (ClientState request response)
state ClientState request response
s {
csRequestQueue = csRequestQueue <> [(i, putMVar mvar)]
}
IO response -> STM (IO response)
forall a. a -> STM a
forall (m :: * -> *) a. Monad m => a -> m a
return (MVar response -> IO response
forall a. MVar a -> IO a
takeMVar MVar response
mvar)
)
where
prepareConnection
:: Socket
-> IO
( BSL.ByteString -> IO ()
, ByteStream IO ()
)
prepareConnection :: Socket -> IO (ByteString -> IO (), ByteStream IO ())
prepareConnection Socket
so =
case Maybe ClientParams
tls of
Maybe ClientParams
Nothing ->
(ByteString -> IO (), ByteStream IO ())
-> IO (ByteString -> IO (), ByteStream IO ())
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure
( Socket -> ByteString -> IO ()
sendAll Socket
so
, (Socket -> IO ByteString) -> Socket -> ByteStream IO ()
forall (m :: * -> *) c.
Monad m =>
(c -> m ByteString) -> c -> ByteStream m ()
rereadNull
((Socket -> Int -> IO ByteString) -> Int -> Socket -> IO ByteString
forall a b c. (a -> b -> c) -> b -> a -> c
flip Socket -> Int -> IO ByteString
recv Int
4096)
Socket
so
)
Just ClientParams
params -> do
Context
ctx <- Socket -> ClientParams -> IO Context
forall (m :: * -> *) backend params.
(MonadIO m, HasBackend backend, TLSParams params) =>
backend -> params -> m Context
contextNew Socket
so ClientParams
params
Context -> IO ()
forall (m :: * -> *). MonadIO m => Context -> m ()
handshake Context
ctx
(ByteString -> IO (), ByteStream IO ())
-> IO (ByteString -> IO (), ByteStream IO ())
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Context -> ByteString -> IO ()
send Context
ctx, Context -> ByteStream IO ()
resSource Context
ctx)
where
send :: Context -> BSL.ByteString -> IO ()
send :: Context -> ByteString -> IO ()
send = Context -> ByteString -> IO ()
forall (m :: * -> *). MonadIO m => Context -> ByteString -> m ()
sendData
resSource
:: Context
-> ByteStream IO ()
resSource :: Context -> ByteStream IO ()
resSource = do
(Context -> IO ByteString) -> Context -> ByteStream IO ()
forall (m :: * -> *) c.
Monad m =>
(c -> m ByteString) -> c -> ByteStream m ()
rereadNull Context -> IO ByteString
forall (m :: * -> *). MonadIO m => Context -> m ByteString
recvData
requestThread
:: (BSL.ByteString -> IO ())
-> TVar (ClientState request response)
-> LoggingT IO ()
requestThread :: (ByteString -> IO ())
-> TVar (ClientState request response) -> LoggingT IO ()
requestThread ByteString -> IO ()
send TVar (ClientState request response)
state =
LoggingT IO (LoggingT IO ()) -> LoggingT IO ()
forall (m :: * -> *) a. Monad m => m (m a) -> m a
join (LoggingT IO (LoggingT IO ()) -> LoggingT IO ())
-> (STM (LoggingT IO ()) -> LoggingT IO (LoggingT IO ()))
-> STM (LoggingT IO ())
-> LoggingT IO ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. IO (LoggingT IO ()) -> LoggingT IO (LoggingT IO ())
forall a. IO a -> LoggingT IO a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (LoggingT IO ()) -> LoggingT IO (LoggingT IO ()))
-> (STM (LoggingT IO ()) -> IO (LoggingT IO ()))
-> STM (LoggingT IO ())
-> LoggingT IO (LoggingT IO ())
forall b c a. (b -> c) -> (a -> b) -> a -> c
. STM (LoggingT IO ()) -> IO (LoggingT IO ())
forall a. STM a -> IO a
atomically (STM (LoggingT IO ()) -> LoggingT IO ())
-> STM (LoggingT IO ()) -> LoggingT IO ()
forall a b. (a -> b) -> a -> b
$
TVar (ClientState request response)
-> STM (ClientState request response)
forall a. TVar a -> STM a
readTVar TVar (ClientState request response)
state STM (ClientState request response)
-> (ClientState request response -> STM (LoggingT IO ()))
-> STM (LoggingT IO ())
forall a b. STM a -> (a -> STM b) -> STM b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \case
ClientState {csServerAlive :: forall i o. ClientState i o -> Bool
csServerAlive = Bool
False} -> LoggingT IO () -> STM (LoggingT IO ())
forall a. a -> STM a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (() -> LoggingT IO ()
forall a. a -> LoggingT IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure ())
ClientState {csRequestQueue :: forall i o. ClientState i o -> [(i, o -> IO ())]
csRequestQueue = []} -> STM (LoggingT IO ())
forall a. STM a
retry
s :: ClientState request response
s@ClientState {
csRequestQueue :: forall i o. ClientState i o -> [(i, o -> IO ())]
csRequestQueue = (request
m, response -> IO ()
r):[(request, response -> IO ())]
remaining,
Map MessageId (response -> IO ())
csResponders :: forall i o. ClientState i o -> Map MessageId (o -> IO ())
csResponders :: Map MessageId (response -> IO ())
csResponders,
MessageId
csMessageId :: forall i o. ClientState i o -> MessageId
csMessageId :: MessageId
csMessageId
}
-> do
TVar (ClientState request response)
-> ClientState request response -> STM ()
forall a. TVar a -> a -> STM ()
writeTVar TVar (ClientState request response)
state ClientState request response
s {
csRequestQueue = remaining,
csResponders = Map.insert csMessageId r csResponders,
csMessageId = succ csMessageId
}
LoggingT IO () -> STM (LoggingT IO ())
forall a. a -> STM a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (LoggingT IO () -> STM (LoggingT IO ()))
-> LoggingT IO () -> STM (LoggingT IO ())
forall a b. (a -> b) -> a -> b
$ do
IO () -> LoggingT IO ()
forall a. IO a -> LoggingT IO a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> LoggingT IO ()) -> IO () -> LoggingT IO ()
forall a b. (a -> b) -> a -> b
$ ByteString -> IO ()
send (Request request -> ByteString
forall a. Binary a => a -> ByteString
encode (MessageId -> request -> Request request
forall p. MessageId -> p -> Request p
Request MessageId
csMessageId request
m))
(ByteString -> IO ())
-> TVar (ClientState request response) -> LoggingT IO ()
requestThread ByteString -> IO ()
send TVar (ClientState request response)
state
responseThread
:: ByteStream IO ()
-> TVar (ClientState request response)
-> LoggingT IO ()
responseThread :: ByteStream IO ()
-> TVar (ClientState request response) -> LoggingT IO ()
responseThread ByteStream IO ()
resSource TVar (ClientState request response)
stateT = do
LoggingT IO () -> LoggingT IO (Either SomeException ())
forall (m :: * -> *) e a.
(HasCallStack, MonadCatch m, Exception e) =>
m a -> m (Either e a)
try
(
LoggingT IO (ByteStream IO (), ByteOffset, Either [Char] ())
-> LoggingT IO ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (LoggingT IO (ByteStream IO (), ByteOffset, Either [Char] ())
-> LoggingT IO ())
-> LoggingT IO (ByteStream IO (), ByteOffset, Either [Char] ())
-> LoggingT IO ()
forall a b. (a -> b) -> a -> b
$
(Response response -> LoggingT IO ())
-> Stream
(Of (Response response))
(LoggingT IO)
(ByteStream IO (), ByteOffset, Either [Char] ())
-> LoggingT IO (ByteStream IO (), ByteOffset, Either [Char] ())
forall (m :: * -> *) a x r.
Monad m =>
(a -> m x) -> Stream (Of a) m r -> m r
S.mapM_
Response response -> LoggingT IO ()
handleResponse
((forall a. IO a -> LoggingT IO a)
-> Stream
(Of (Response response))
IO
(ByteStream IO (), ByteOffset, Either [Char] ())
-> Stream
(Of (Response response))
(LoggingT IO)
(ByteStream IO (), ByteOffset, Either [Char] ())
forall {k} (t :: (* -> *) -> k -> *) (m :: * -> *) (n :: * -> *)
(b :: k).
(MFunctor t, Monad m) =>
(forall a. m a -> n a) -> t m b -> t n b
forall (m :: * -> *) (n :: * -> *) b.
Monad m =>
(forall a. m a -> n a)
-> Stream (Of (Response response)) m b
-> Stream (Of (Response response)) n b
hoist IO a -> LoggingT IO a
forall a. IO a -> LoggingT IO a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (ByteStream IO ()
-> Stream
(Of (Response response))
IO
(ByteStream IO (), ByteOffset, Either [Char] ())
forall a (m :: * -> *) r.
(Binary a, Monad m) =>
ByteString m r
-> Stream (Of a) m (ByteString m r, ByteOffset, Either [Char] r)
decoded ByteStream IO ()
resSource))
)
LoggingT IO (Either SomeException ())
-> (Either SomeException () -> LoggingT IO ()) -> LoggingT IO ()
forall a b. LoggingT IO a -> (a -> LoggingT IO b) -> LoggingT IO b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \case
Left SomeException
err -> do
Text -> LoggingT IO ()
forall (m :: * -> *). (HasCallStack, MonadLogger m) => Text -> m ()
logError
(Text -> LoggingT IO ()) -> Text -> LoggingT IO ()
forall a b. (a -> b) -> a -> b
$ Text
"Socket receive error: "
Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> SomeException -> Text
forall a b. (Show a, IsString b) => a -> b
showt (SomeException
err :: SomeException)
SomeException -> LoggingT IO ()
forall a e. Exception e => e -> a
throw SomeException
err
Right () ->
() -> LoggingT IO ()
forall a. a -> LoggingT IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()
LoggingT IO ()
closeClientState
where
closeClientState :: LoggingT IO ()
closeClientState =
LoggingT IO (LoggingT IO ()) -> LoggingT IO ()
forall (m :: * -> *) a. Monad m => m (m a) -> m a
join (LoggingT IO (LoggingT IO ()) -> LoggingT IO ())
-> (STM (LoggingT IO ()) -> LoggingT IO (LoggingT IO ()))
-> STM (LoggingT IO ())
-> LoggingT IO ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. IO (LoggingT IO ()) -> LoggingT IO (LoggingT IO ())
forall a. IO a -> LoggingT IO a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (LoggingT IO ()) -> LoggingT IO (LoggingT IO ()))
-> (STM (LoggingT IO ()) -> IO (LoggingT IO ()))
-> STM (LoggingT IO ())
-> LoggingT IO (LoggingT IO ())
forall b c a. (b -> c) -> (a -> b) -> a -> c
. STM (LoggingT IO ()) -> IO (LoggingT IO ())
forall a. STM a -> IO a
atomically (STM (LoggingT IO ()) -> LoggingT IO ())
-> STM (LoggingT IO ()) -> LoggingT IO ()
forall a b. (a -> b) -> a -> b
$ do
ClientState request response
state <- TVar (ClientState request response)
-> STM (ClientState request response)
forall a. TVar a -> STM a
readTVar TVar (ClientState request response)
stateT
TVar (ClientState request response)
-> ClientState request response -> STM ()
forall a. TVar a -> a -> STM ()
writeTVar TVar (ClientState request response)
stateT ClientState request response
state
{ csServerAlive = False
, csResponders = mempty
, csRequestQueue = mempty
}
LoggingT IO () -> STM (LoggingT IO ())
forall a. a -> STM a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (LoggingT IO () -> STM (LoggingT IO ()))
-> ([IO ()] -> LoggingT IO ()) -> [IO ()] -> STM (LoggingT IO ())
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (IO () -> LoggingT IO ()) -> [IO ()] -> LoggingT IO ()
forall (t :: * -> *) (f :: * -> *) a b.
(Foldable t, Applicative f) =>
(a -> f b) -> t a -> f ()
traverse_ IO () -> LoggingT IO ()
forall a. IO a -> LoggingT IO a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO ([IO ()] -> STM (LoggingT IO ()))
-> [IO ()] -> STM (LoggingT IO ())
forall a b. (a -> b) -> a -> b
$
[ response -> IO ()
respond (IOError -> response
forall a e. Exception e => e -> a
throw ([Char] -> IOError
userError [Char]
"Remote connection died."))
| response -> IO ()
respond <-
Map MessageId (response -> IO ()) -> [response -> IO ()]
forall k a. Map k a -> [a]
Map.elems (ClientState request response -> Map MessageId (response -> IO ())
forall i o. ClientState i o -> Map MessageId (o -> IO ())
csResponders ClientState request response
state)
[response -> IO ()] -> [response -> IO ()] -> [response -> IO ()]
forall a. Semigroup a => a -> a -> a
<> ((request, response -> IO ()) -> response -> IO ())
-> [(request, response -> IO ())] -> [response -> IO ()]
forall a b. (a -> b) -> [a] -> [b]
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap (request, response -> IO ()) -> response -> IO ()
forall a b. (a, b) -> b
snd (ClientState request response -> [(request, response -> IO ())]
forall i o. ClientState i o -> [(i, o -> IO ())]
csRequestQueue ClientState request response
state)
]
handleResponse :: Response response -> LoggingT IO ()
handleResponse :: Response response -> LoggingT IO ()
handleResponse
responsePackage :: Response response
responsePackage@Response
{ MessageId
responseTo :: forall p. Response p -> MessageId
responseTo :: MessageId
responseTo
, response
response :: forall p. Response p -> p
response :: response
response
}
= do
LoggingT IO (LoggingT IO ()) -> LoggingT IO ()
forall (m :: * -> *) a. Monad m => m (m a) -> m a
join (LoggingT IO (LoggingT IO ()) -> LoggingT IO ())
-> (STM (LoggingT IO ()) -> LoggingT IO (LoggingT IO ()))
-> STM (LoggingT IO ())
-> LoggingT IO ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. IO (LoggingT IO ()) -> LoggingT IO (LoggingT IO ())
forall (m :: * -> *) a. Monad m => m a -> LoggingT m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(MonadTrans t, Monad m) =>
m a -> t m a
lift (IO (LoggingT IO ()) -> LoggingT IO (LoggingT IO ()))
-> (STM (LoggingT IO ()) -> IO (LoggingT IO ()))
-> STM (LoggingT IO ())
-> LoggingT IO (LoggingT IO ())
forall b c a. (b -> c) -> (a -> b) -> a -> c
. STM (LoggingT IO ()) -> IO (LoggingT IO ())
forall a. STM a -> IO a
atomically (STM (LoggingT IO ()) -> LoggingT IO ())
-> STM (LoggingT IO ()) -> LoggingT IO ()
forall a b. (a -> b) -> a -> b
$ do
ClientState request response
state <- TVar (ClientState request response)
-> STM (ClientState request response)
forall a. TVar a -> STM a
readTVar TVar (ClientState request response)
stateT
case MessageId
-> Map MessageId (response -> IO ())
-> Maybe (response -> IO (), Map MessageId (response -> IO ()))
forall k v. Ord k => k -> Map k v -> Maybe (v, Map k v)
deleteFind MessageId
responseTo (ClientState request response -> Map MessageId (response -> IO ())
forall i o. ClientState i o -> Map MessageId (o -> IO ())
csResponders ClientState request response
state) of
Maybe (response -> IO (), Map MessageId (response -> IO ()))
Nothing ->
LoggingT IO () -> STM (LoggingT IO ())
forall a. a -> STM a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (LoggingT IO () -> STM (LoggingT IO ()))
-> (Text -> LoggingT IO ()) -> Text -> STM (LoggingT IO ())
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Text -> LoggingT IO ()
forall (m :: * -> *). (HasCallStack, MonadLogger m) => Text -> m ()
logWarn (Text -> STM (LoggingT IO ())) -> Text -> STM (LoggingT IO ())
forall a b. (a -> b) -> a -> b
$
Text
"Unexpected server response: " Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Response response -> Text
forall a b. (Show a, IsString b) => a -> b
showt Response response
responsePackage
Just (response -> IO ()
respond, Map MessageId (response -> IO ())
newResponders) -> do
TVar (ClientState request response)
-> ClientState request response -> STM ()
forall a. TVar a -> a -> STM ()
writeTVar TVar (ClientState request response)
stateT ClientState request response
state {csResponders = newResponders}
LoggingT IO () -> STM (LoggingT IO ())
forall a. a -> STM a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (LoggingT IO () -> STM (LoggingT IO ()))
-> (IO () -> LoggingT IO ()) -> IO () -> STM (LoggingT IO ())
forall b c a. (b -> c) -> (a -> b) -> a -> c
. IO () -> LoggingT IO ()
forall (m :: * -> *) a. Monad m => m a -> LoggingT m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(MonadTrans t, Monad m) =>
m a -> t m a
lift (IO () -> STM (LoggingT IO ())) -> IO () -> STM (LoggingT IO ())
forall a b. (a -> b) -> a -> b
$ response -> IO ()
respond response
response
data Endpoint = Endpoint {
Endpoint -> AddressDescription
bindAddr :: AddressDescription,
Endpoint -> Maybe (IO ServerParams)
tls :: Maybe (IO ServerParams)
}
deriving stock ((forall x. Endpoint -> Rep Endpoint x)
-> (forall x. Rep Endpoint x -> Endpoint) -> Generic Endpoint
forall x. Rep Endpoint x -> Endpoint
forall x. Endpoint -> Rep Endpoint x
forall a.
(forall x. a -> Rep a x) -> (forall x. Rep a x -> a) -> Generic a
$cfrom :: forall x. Endpoint -> Rep Endpoint x
from :: forall x. Endpoint -> Rep Endpoint x
$cto :: forall x. Rep Endpoint x -> Endpoint
to :: forall x. Rep Endpoint x -> Endpoint
Generic)
data Response p = Response
{ forall p. Response p -> MessageId
responseTo :: MessageId
, forall p. Response p -> p
response :: p
}
deriving stock ((forall x. Response p -> Rep (Response p) x)
-> (forall x. Rep (Response p) x -> Response p)
-> Generic (Response p)
forall x. Rep (Response p) x -> Response p
forall x. Response p -> Rep (Response p) x
forall a.
(forall x. a -> Rep a x) -> (forall x. Rep a x -> a) -> Generic a
forall p x. Rep (Response p) x -> Response p
forall p x. Response p -> Rep (Response p) x
$cfrom :: forall p x. Response p -> Rep (Response p) x
from :: forall x. Response p -> Rep (Response p) x
$cto :: forall p x. Rep (Response p) x -> Response p
to :: forall x. Rep (Response p) x -> Response p
Generic, Int -> Response p -> [Char] -> [Char]
[Response p] -> [Char] -> [Char]
Response p -> [Char]
(Int -> Response p -> [Char] -> [Char])
-> (Response p -> [Char])
-> ([Response p] -> [Char] -> [Char])
-> Show (Response p)
forall p. Show p => Int -> Response p -> [Char] -> [Char]
forall p. Show p => [Response p] -> [Char] -> [Char]
forall p. Show p => Response p -> [Char]
forall a.
(Int -> a -> [Char] -> [Char])
-> (a -> [Char]) -> ([a] -> [Char] -> [Char]) -> Show a
$cshowsPrec :: forall p. Show p => Int -> Response p -> [Char] -> [Char]
showsPrec :: Int -> Response p -> [Char] -> [Char]
$cshow :: forall p. Show p => Response p -> [Char]
show :: Response p -> [Char]
$cshowList :: forall p. Show p => [Response p] -> [Char] -> [Char]
showList :: [Response p] -> [Char] -> [Char]
Show)
instance (Binary p) => Binary (Response p)
newtype AddressDescription = AddressDescription {
AddressDescription -> Text
unAddressDescription :: Text
}
deriving stock ((forall x. AddressDescription -> Rep AddressDescription x)
-> (forall x. Rep AddressDescription x -> AddressDescription)
-> Generic AddressDescription
forall x. Rep AddressDescription x -> AddressDescription
forall x. AddressDescription -> Rep AddressDescription x
forall a.
(forall x. a -> Rep a x) -> (forall x. Rep a x -> a) -> Generic a
$cfrom :: forall x. AddressDescription -> Rep AddressDescription x
from :: forall x. AddressDescription -> Rep AddressDescription x
$cto :: forall x. Rep AddressDescription x -> AddressDescription
to :: forall x. Rep AddressDescription x -> AddressDescription
Generic)
deriving newtype
( Get AddressDescription
[AddressDescription] -> Put
AddressDescription -> Put
(AddressDescription -> Put)
-> Get AddressDescription
-> ([AddressDescription] -> Put)
-> Binary AddressDescription
forall t. (t -> Put) -> Get t -> ([t] -> Put) -> Binary t
$cput :: AddressDescription -> Put
put :: AddressDescription -> Put
$cget :: Get AddressDescription
get :: Get AddressDescription
$cputList :: [AddressDescription] -> Put
putList :: [AddressDescription] -> Put
Binary
, AddressDescription -> AddressDescription -> Bool
(AddressDescription -> AddressDescription -> Bool)
-> (AddressDescription -> AddressDescription -> Bool)
-> Eq AddressDescription
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
$c== :: AddressDescription -> AddressDescription -> Bool
== :: AddressDescription -> AddressDescription -> Bool
$c/= :: AddressDescription -> AddressDescription -> Bool
/= :: AddressDescription -> AddressDescription -> Bool
Eq
, Maybe AddressDescription
Value -> Parser [AddressDescription]
Value -> Parser AddressDescription
(Value -> Parser AddressDescription)
-> (Value -> Parser [AddressDescription])
-> Maybe AddressDescription
-> FromJSON AddressDescription
forall a.
(Value -> Parser a)
-> (Value -> Parser [a]) -> Maybe a -> FromJSON a
$cparseJSON :: Value -> Parser AddressDescription
parseJSON :: Value -> Parser AddressDescription
$cparseJSONList :: Value -> Parser [AddressDescription]
parseJSONList :: Value -> Parser [AddressDescription]
$comittedField :: Maybe AddressDescription
omittedField :: Maybe AddressDescription
FromJSON
, FromJSONKeyFunction [AddressDescription]
FromJSONKeyFunction AddressDescription
FromJSONKeyFunction AddressDescription
-> FromJSONKeyFunction [AddressDescription]
-> FromJSONKey AddressDescription
forall a.
FromJSONKeyFunction a -> FromJSONKeyFunction [a] -> FromJSONKey a
$cfromJSONKey :: FromJSONKeyFunction AddressDescription
fromJSONKey :: FromJSONKeyFunction AddressDescription
$cfromJSONKeyList :: FromJSONKeyFunction [AddressDescription]
fromJSONKeyList :: FromJSONKeyFunction [AddressDescription]
FromJSONKey
, [Char] -> AddressDescription
([Char] -> AddressDescription) -> IsString AddressDescription
forall a. ([Char] -> a) -> IsString a
$cfromString :: [Char] -> AddressDescription
fromString :: [Char] -> AddressDescription
IsString
, Semigroup AddressDescription
AddressDescription
Semigroup AddressDescription =>
AddressDescription
-> (AddressDescription -> AddressDescription -> AddressDescription)
-> ([AddressDescription] -> AddressDescription)
-> Monoid AddressDescription
[AddressDescription] -> AddressDescription
AddressDescription -> AddressDescription -> AddressDescription
forall a.
Semigroup a =>
a -> (a -> a -> a) -> ([a] -> a) -> Monoid a
$cmempty :: AddressDescription
mempty :: AddressDescription
$cmappend :: AddressDescription -> AddressDescription -> AddressDescription
mappend :: AddressDescription -> AddressDescription -> AddressDescription
$cmconcat :: [AddressDescription] -> AddressDescription
mconcat :: [AddressDescription] -> AddressDescription
Monoid
, Eq AddressDescription
Eq AddressDescription =>
(AddressDescription -> AddressDescription -> Ordering)
-> (AddressDescription -> AddressDescription -> Bool)
-> (AddressDescription -> AddressDescription -> Bool)
-> (AddressDescription -> AddressDescription -> Bool)
-> (AddressDescription -> AddressDescription -> Bool)
-> (AddressDescription -> AddressDescription -> AddressDescription)
-> (AddressDescription -> AddressDescription -> AddressDescription)
-> Ord AddressDescription
AddressDescription -> AddressDescription -> Bool
AddressDescription -> AddressDescription -> Ordering
AddressDescription -> AddressDescription -> AddressDescription
forall a.
Eq a =>
(a -> a -> Ordering)
-> (a -> a -> Bool)
-> (a -> a -> Bool)
-> (a -> a -> Bool)
-> (a -> a -> Bool)
-> (a -> a -> a)
-> (a -> a -> a)
-> Ord a
$ccompare :: AddressDescription -> AddressDescription -> Ordering
compare :: AddressDescription -> AddressDescription -> Ordering
$c< :: AddressDescription -> AddressDescription -> Bool
< :: AddressDescription -> AddressDescription -> Bool
$c<= :: AddressDescription -> AddressDescription -> Bool
<= :: AddressDescription -> AddressDescription -> Bool
$c> :: AddressDescription -> AddressDescription -> Bool
> :: AddressDescription -> AddressDescription -> Bool
$c>= :: AddressDescription -> AddressDescription -> Bool
>= :: AddressDescription -> AddressDescription -> Bool
$cmax :: AddressDescription -> AddressDescription -> AddressDescription
max :: AddressDescription -> AddressDescription -> AddressDescription
$cmin :: AddressDescription -> AddressDescription -> AddressDescription
min :: AddressDescription -> AddressDescription -> AddressDescription
Ord
, NonEmpty AddressDescription -> AddressDescription
AddressDescription -> AddressDescription -> AddressDescription
(AddressDescription -> AddressDescription -> AddressDescription)
-> (NonEmpty AddressDescription -> AddressDescription)
-> (forall b.
Integral b =>
b -> AddressDescription -> AddressDescription)
-> Semigroup AddressDescription
forall b.
Integral b =>
b -> AddressDescription -> AddressDescription
forall a.
(a -> a -> a)
-> (NonEmpty a -> a)
-> (forall b. Integral b => b -> a -> a)
-> Semigroup a
$c<> :: AddressDescription -> AddressDescription -> AddressDescription
<> :: AddressDescription -> AddressDescription -> AddressDescription
$csconcat :: NonEmpty AddressDescription -> AddressDescription
sconcat :: NonEmpty AddressDescription -> AddressDescription
$cstimes :: forall b.
Integral b =>
b -> AddressDescription -> AddressDescription
stimes :: forall b.
Integral b =>
b -> AddressDescription -> AddressDescription
Semigroup
, [AddressDescription] -> Value
[AddressDescription] -> Encoding
AddressDescription -> Bool
AddressDescription -> Value
AddressDescription -> Encoding
(AddressDescription -> Value)
-> (AddressDescription -> Encoding)
-> ([AddressDescription] -> Value)
-> ([AddressDescription] -> Encoding)
-> (AddressDescription -> Bool)
-> ToJSON AddressDescription
forall a.
(a -> Value)
-> (a -> Encoding)
-> ([a] -> Value)
-> ([a] -> Encoding)
-> (a -> Bool)
-> ToJSON a
$ctoJSON :: AddressDescription -> Value
toJSON :: AddressDescription -> Value
$ctoEncoding :: AddressDescription -> Encoding
toEncoding :: AddressDescription -> Encoding
$ctoJSONList :: [AddressDescription] -> Value
toJSONList :: [AddressDescription] -> Value
$ctoEncodingList :: [AddressDescription] -> Encoding
toEncodingList :: [AddressDescription] -> Encoding
$comitField :: AddressDescription -> Bool
omitField :: AddressDescription -> Bool
ToJSON
, ToJSONKeyFunction [AddressDescription]
ToJSONKeyFunction AddressDescription
ToJSONKeyFunction AddressDescription
-> ToJSONKeyFunction [AddressDescription]
-> ToJSONKey AddressDescription
forall a.
ToJSONKeyFunction a -> ToJSONKeyFunction [a] -> ToJSONKey a
$ctoJSONKey :: ToJSONKeyFunction AddressDescription
toJSONKey :: ToJSONKeyFunction AddressDescription
$ctoJSONKeyList :: ToJSONKeyFunction [AddressDescription]
toJSONKeyList :: ToJSONKeyFunction [AddressDescription]
ToJSONKey
)
instance Show AddressDescription where
show :: AddressDescription -> [Char]
show = Text -> [Char]
T.unpack (Text -> [Char])
-> (AddressDescription -> Text) -> AddressDescription -> [Char]
forall b c a. (b -> c) -> (a -> b) -> a -> c
. AddressDescription -> Text
unAddressDescription
data ClientState i o = ClientState
{ forall i o. ClientState i o -> Bool
csServerAlive :: Bool
, forall i o. ClientState i o -> Map MessageId (o -> IO ())
csResponders :: Map MessageId (o -> IO ())
, forall i o. ClientState i o -> MessageId
csMessageId :: MessageId
, forall i o. ClientState i o -> [(i, o -> IO ())]
csRequestQueue :: [(i, o -> IO ())]
}
data Request p = Request
{ forall p. Request p -> MessageId
messageId :: MessageId
, forall p. Request p -> p
payload :: p
}
deriving stock ((forall x. Request p -> Rep (Request p) x)
-> (forall x. Rep (Request p) x -> Request p)
-> Generic (Request p)
forall x. Rep (Request p) x -> Request p
forall x. Request p -> Rep (Request p) x
forall a.
(forall x. a -> Rep a x) -> (forall x. Rep a x -> a) -> Generic a
forall p x. Rep (Request p) x -> Request p
forall p x. Request p -> Rep (Request p) x
$cfrom :: forall p x. Request p -> Rep (Request p) x
from :: forall x. Request p -> Rep (Request p) x
$cto :: forall p x. Rep (Request p) x -> Request p
to :: forall x. Rep (Request p) x -> Request p
Generic, Int -> Request p -> [Char] -> [Char]
[Request p] -> [Char] -> [Char]
Request p -> [Char]
(Int -> Request p -> [Char] -> [Char])
-> (Request p -> [Char])
-> ([Request p] -> [Char] -> [Char])
-> Show (Request p)
forall p. Show p => Int -> Request p -> [Char] -> [Char]
forall p. Show p => [Request p] -> [Char] -> [Char]
forall p. Show p => Request p -> [Char]
forall a.
(Int -> a -> [Char] -> [Char])
-> (a -> [Char]) -> ([a] -> [Char] -> [Char]) -> Show a
$cshowsPrec :: forall p. Show p => Int -> Request p -> [Char] -> [Char]
showsPrec :: Int -> Request p -> [Char] -> [Char]
$cshow :: forall p. Show p => Request p -> [Char]
show :: Request p -> [Char]
$cshowList :: forall p. Show p => [Request p] -> [Char] -> [Char]
showList :: [Request p] -> [Char] -> [Char]
Show)
instance (Binary p) => Binary (Request p)
newtype MessageId = MessageId {
MessageId -> Word32
_unMessageId :: Word32
}
deriving newtype (Get MessageId
[MessageId] -> Put
MessageId -> Put
(MessageId -> Put)
-> Get MessageId -> ([MessageId] -> Put) -> Binary MessageId
forall t. (t -> Put) -> Get t -> ([t] -> Put) -> Binary t
$cput :: MessageId -> Put
put :: MessageId -> Put
$cget :: Get MessageId
get :: Get MessageId
$cputList :: [MessageId] -> Put
putList :: [MessageId] -> Put
Binary, Integer -> MessageId
MessageId -> MessageId
MessageId -> MessageId -> MessageId
(MessageId -> MessageId -> MessageId)
-> (MessageId -> MessageId -> MessageId)
-> (MessageId -> MessageId -> MessageId)
-> (MessageId -> MessageId)
-> (MessageId -> MessageId)
-> (MessageId -> MessageId)
-> (Integer -> MessageId)
-> Num MessageId
forall a.
(a -> a -> a)
-> (a -> a -> a)
-> (a -> a -> a)
-> (a -> a)
-> (a -> a)
-> (a -> a)
-> (Integer -> a)
-> Num a
$c+ :: MessageId -> MessageId -> MessageId
+ :: MessageId -> MessageId -> MessageId
$c- :: MessageId -> MessageId -> MessageId
- :: MessageId -> MessageId -> MessageId
$c* :: MessageId -> MessageId -> MessageId
* :: MessageId -> MessageId -> MessageId
$cnegate :: MessageId -> MessageId
negate :: MessageId -> MessageId
$cabs :: MessageId -> MessageId
abs :: MessageId -> MessageId
$csignum :: MessageId -> MessageId
signum :: MessageId -> MessageId
$cfromInteger :: Integer -> MessageId
fromInteger :: Integer -> MessageId
Num, MessageId
MessageId -> MessageId -> Bounded MessageId
forall a. a -> a -> Bounded a
$cminBound :: MessageId
minBound :: MessageId
$cmaxBound :: MessageId
maxBound :: MessageId
Bounded, MessageId -> MessageId -> Bool
(MessageId -> MessageId -> Bool)
-> (MessageId -> MessageId -> Bool) -> Eq MessageId
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
$c== :: MessageId -> MessageId -> Bool
== :: MessageId -> MessageId -> Bool
$c/= :: MessageId -> MessageId -> Bool
/= :: MessageId -> MessageId -> Bool
Eq, Eq MessageId
Eq MessageId =>
(MessageId -> MessageId -> Ordering)
-> (MessageId -> MessageId -> Bool)
-> (MessageId -> MessageId -> Bool)
-> (MessageId -> MessageId -> Bool)
-> (MessageId -> MessageId -> Bool)
-> (MessageId -> MessageId -> MessageId)
-> (MessageId -> MessageId -> MessageId)
-> Ord MessageId
MessageId -> MessageId -> Bool
MessageId -> MessageId -> Ordering
MessageId -> MessageId -> MessageId
forall a.
Eq a =>
(a -> a -> Ordering)
-> (a -> a -> Bool)
-> (a -> a -> Bool)
-> (a -> a -> Bool)
-> (a -> a -> Bool)
-> (a -> a -> a)
-> (a -> a -> a)
-> Ord a
$ccompare :: MessageId -> MessageId -> Ordering
compare :: MessageId -> MessageId -> Ordering
$c< :: MessageId -> MessageId -> Bool
< :: MessageId -> MessageId -> Bool
$c<= :: MessageId -> MessageId -> Bool
<= :: MessageId -> MessageId -> Bool
$c> :: MessageId -> MessageId -> Bool
> :: MessageId -> MessageId -> Bool
$c>= :: MessageId -> MessageId -> Bool
>= :: MessageId -> MessageId -> Bool
$cmax :: MessageId -> MessageId -> MessageId
max :: MessageId -> MessageId -> MessageId
$cmin :: MessageId -> MessageId -> MessageId
min :: MessageId -> MessageId -> MessageId
Ord, Int -> MessageId -> [Char] -> [Char]
[MessageId] -> [Char] -> [Char]
MessageId -> [Char]
(Int -> MessageId -> [Char] -> [Char])
-> (MessageId -> [Char])
-> ([MessageId] -> [Char] -> [Char])
-> Show MessageId
forall a.
(Int -> a -> [Char] -> [Char])
-> (a -> [Char]) -> ([a] -> [Char] -> [Char]) -> Show a
$cshowsPrec :: Int -> MessageId -> [Char] -> [Char]
showsPrec :: Int -> MessageId -> [Char] -> [Char]
$cshow :: MessageId -> [Char]
show :: MessageId -> [Char]
$cshowList :: [MessageId] -> [Char] -> [Char]
showList :: [MessageId] -> [Char] -> [Char]
Show, Int -> MessageId
MessageId -> Int
MessageId -> [MessageId]
MessageId -> MessageId
MessageId -> MessageId -> [MessageId]
MessageId -> MessageId -> MessageId -> [MessageId]
(MessageId -> MessageId)
-> (MessageId -> MessageId)
-> (Int -> MessageId)
-> (MessageId -> Int)
-> (MessageId -> [MessageId])
-> (MessageId -> MessageId -> [MessageId])
-> (MessageId -> MessageId -> [MessageId])
-> (MessageId -> MessageId -> MessageId -> [MessageId])
-> Enum MessageId
forall a.
(a -> a)
-> (a -> a)
-> (Int -> a)
-> (a -> Int)
-> (a -> [a])
-> (a -> a -> [a])
-> (a -> a -> [a])
-> (a -> a -> a -> [a])
-> Enum a
$csucc :: MessageId -> MessageId
succ :: MessageId -> MessageId
$cpred :: MessageId -> MessageId
pred :: MessageId -> MessageId
$ctoEnum :: Int -> MessageId
toEnum :: Int -> MessageId
$cfromEnum :: MessageId -> Int
fromEnum :: MessageId -> Int
$cenumFrom :: MessageId -> [MessageId]
enumFrom :: MessageId -> [MessageId]
$cenumFromThen :: MessageId -> MessageId -> [MessageId]
enumFromThen :: MessageId -> MessageId -> [MessageId]
$cenumFromTo :: MessageId -> MessageId -> [MessageId]
enumFromTo :: MessageId -> MessageId -> [MessageId]
$cenumFromThenTo :: MessageId -> MessageId -> MessageId -> [MessageId]
enumFromThenTo :: MessageId -> MessageId -> MessageId -> [MessageId]
Enum)
data Responded = Responded
mvarToStream
:: (MonadIO m)
=> MVar i
-> Stream (Of i) m never_returns
mvarToStream :: forall (m :: * -> *) i never_returns.
MonadIO m =>
MVar i -> Stream (Of i) m never_returns
mvarToStream MVar i
mvar = do
IO i -> Stream (Of i) m i
forall a. IO a -> Stream (Of i) m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (MVar i -> IO i
forall a. MVar a -> IO a
takeMVar MVar i
mvar) Stream (Of i) m i
-> (i -> Stream (Of i) m ()) -> Stream (Of i) m ()
forall a b.
Stream (Of i) m a -> (a -> Stream (Of i) m b) -> Stream (Of i) m b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= i -> Stream (Of i) m ()
forall (m :: * -> *) a. Monad m => a -> Stream (Of a) m ()
S.yield
MVar i -> Stream (Of i) m never_returns
forall (m :: * -> *) i never_returns.
MonadIO m =>
MVar i -> Stream (Of i) m never_returns
mvarToStream MVar i
mvar
rereadNull
:: (Monad m)
=> (c -> m ByteString)
-> c
-> ByteStream m ()
rereadNull :: forall (m :: * -> *) c.
Monad m =>
(c -> m ByteString) -> c -> ByteStream m ()
rereadNull c -> m ByteString
f =
(c -> m (Maybe ByteString)) -> c -> ByteStream m ()
forall (m :: * -> *) s.
Monad m =>
(s -> m (Maybe ByteString)) -> s -> ByteStream m ()
reread
(\c
c -> do
ByteString
bytes <- c -> m ByteString
f c
c
Maybe ByteString -> m (Maybe ByteString)
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Maybe ByteString -> m (Maybe ByteString))
-> Maybe ByteString -> m (Maybe ByteString)
forall a b. (a -> b) -> a -> b
$ if ByteString -> Bool
BS.null ByteString
bytes then Maybe ByteString
forall a. Maybe a
Nothing else ByteString -> Maybe ByteString
forall a. a -> Maybe a
Just ByteString
bytes
)
deleteFind
:: (Ord k)
=> k
-> Map k v
-> Maybe (v, Map k v)
deleteFind :: forall k v. Ord k => k -> Map k v -> Maybe (v, Map k v)
deleteFind k
key Map k v
m =
case k -> Map k v -> Maybe v
forall k a. Ord k => k -> Map k a -> Maybe a
Map.lookup k
key Map k v
m of
Maybe v
Nothing -> Maybe (v, Map k v)
forall a. Maybe a
Nothing
Just v
v ->
(v, Map k v) -> Maybe (v, Map k v)
forall a. a -> Maybe a
Just (v
v, k -> Map k v -> Map k v
forall k a. Ord k => k -> Map k a -> Map k a
Map.delete k
key Map k v
m)