{-# LANGUAGE DataKinds #-}
{-# LANGUAGE DuplicateRecordFields #-}
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE GADTs #-}
{-# LANGUAGE KindSignatures #-}
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TupleSections #-}

-- |
-- Module      : Simplex.Messaging.Server
-- Copyright   : (c) simplex.chat
-- License     : AGPL-3
--
-- Maintainer  : chat@simplex.chat
-- Stability   : experimental
-- Portability : non-portable
--
-- This module defines SMP protocol server with in-memory persistence
-- and optional append only log of SMP queue records.
--
-- See https://github.com/simplex-chat/simplexmq/blob/master/protocol/simplex-messaging.md
module Simplex.Messaging.Server (runSMPServer, runSMPServerBlocking) where

import Control.Concurrent.STM (stateTVar)
import Control.Monad
import Control.Monad.Except
import Control.Monad.IO.Unlift
import Control.Monad.Reader
import Crypto.Random
import Data.ByteString.Char8 (ByteString)
import qualified Data.ByteString.Char8 as B
import Data.Functor (($>))
import qualified Data.Map.Strict as M
import Data.Time.Clock
import Network.Socket (ServiceName)
import qualified Simplex.Messaging.Crypto as C
import Simplex.Messaging.Protocol
import Simplex.Messaging.Server.Env.STM
import Simplex.Messaging.Server.MsgStore
import Simplex.Messaging.Server.MsgStore.STM (MsgQueue)
import Simplex.Messaging.Server.QueueStore
import Simplex.Messaging.Server.QueueStore.STM (QueueStore)
import Simplex.Messaging.Server.StoreLog
import Simplex.Messaging.Transport
import Simplex.Messaging.Util
import UnliftIO.Concurrent
import UnliftIO.Exception
import UnliftIO.IO
import UnliftIO.STM

-- | Runs an SMP server using passed configuration.
--
-- See a full server here: https://github.com/simplex-chat/simplexmq/blob/master/apps/smp-server/Main.hs
runSMPServer :: (MonadRandom m, MonadUnliftIO m) => ServerConfig -> m ()
runSMPServer :: ServerConfig -> m ()
runSMPServer ServerConfig
cfg = do
  TMVar Bool
started <- m (TMVar Bool)
forall (m :: * -> *) a. MonadIO m => m (TMVar a)
newEmptyTMVarIO
  TMVar Bool -> ServerConfig -> m ()
forall (m :: * -> *).
(MonadRandom m, MonadUnliftIO m) =>
TMVar Bool -> ServerConfig -> m ()
runSMPServerBlocking TMVar Bool
started ServerConfig
cfg

-- | Runs an SMP server using passed configuration with signalling.
--
-- This function uses passed TMVar to signal when the server is ready to accept TCP requests (True)
-- and when it is disconnected from the TCP socket once the server thread is killed (False).
runSMPServerBlocking :: (MonadRandom m, MonadUnliftIO m) => TMVar Bool -> ServerConfig -> m ()
runSMPServerBlocking :: TMVar Bool -> ServerConfig -> m ()
runSMPServerBlocking TMVar Bool
started cfg :: ServerConfig
cfg@ServerConfig {[(ServiceName, ATransport)]
$sel:transports:ServerConfig :: ServerConfig -> [(ServiceName, ATransport)]
transports :: [(ServiceName, ATransport)]
transports} = do
  Env
env <- ServerConfig -> m Env
forall (m :: * -> *).
(MonadUnliftIO m, MonadRandom m) =>
ServerConfig -> m Env
newEnv ServerConfig
cfg
  ReaderT Env m () -> Env -> m ()
forall r (m :: * -> *) a. ReaderT r m a -> r -> m a
runReaderT ReaderT Env m ()
forall (m' :: * -> *).
(MonadUnliftIO m', MonadReader Env m') =>
m' ()
smpServer Env
env
  where
    smpServer :: (MonadUnliftIO m', MonadReader Env m') => m' ()
    smpServer :: m' ()
smpServer = do
      Server
s <- (Env -> Server) -> m' Server
forall r (m :: * -> *) a. MonadReader r m => (r -> a) -> m a
asks Env -> Server
server
      [m' ()] -> m' ()
forall (m :: * -> *) a. MonadUnliftIO m => [m a] -> m ()
raceAny_ (Server -> m' ()
forall (m' :: * -> *). MonadUnliftIO m' => Server -> m' ()
serverThread Server
s m' () -> [m' ()] -> [m' ()]
forall a. a -> [a] -> [a]
: ((ServiceName, ATransport) -> m' ())
-> [(ServiceName, ATransport)] -> [m' ()]
forall a b. (a -> b) -> [a] -> [b]
map (ServiceName, ATransport) -> m' ()
forall (m' :: * -> *).
(MonadUnliftIO m', MonadReader Env m') =>
(ServiceName, ATransport) -> m' ()
runServer [(ServiceName, ATransport)]
transports)
        m' () -> m' () -> m' ()
forall (m :: * -> *) a b. MonadUnliftIO m => m a -> m b -> m a
`finally` (StoreLog 'WriteMode -> IO ()) -> m' ()
forall (m :: * -> *) a.
(MonadUnliftIO m, MonadReader Env m) =>
(StoreLog 'WriteMode -> IO a) -> m ()
withLog StoreLog 'WriteMode -> IO ()
forall (a :: IOMode). StoreLog a -> IO ()
closeStoreLog

    runServer :: (MonadUnliftIO m', MonadReader Env m') => (ServiceName, ATransport) -> m' ()
    runServer :: (ServiceName, ATransport) -> m' ()
runServer (ServiceName
tcpPort, ATransport TProxy c
t) = TMVar Bool -> ServiceName -> (c -> m' ()) -> m' ()
forall c (m :: * -> *).
(Transport c, MonadUnliftIO m) =>
TMVar Bool -> ServiceName -> (c -> m ()) -> m ()
runTransportServer TMVar Bool
started ServiceName
tcpPort (TProxy c -> c -> m' ()
forall c (m :: * -> *).
(Transport c, MonadUnliftIO m, MonadReader Env m) =>
TProxy c -> c -> m ()
runClient TProxy c
t)

    serverThread :: MonadUnliftIO m' => Server -> m' ()
    serverThread :: Server -> m' ()
serverThread Server {TBQueue (RecipientId, Client)
$sel:subscribedQ:Server :: Server -> TBQueue (RecipientId, Client)
subscribedQ :: TBQueue (RecipientId, Client)
subscribedQ, TVar (Map RecipientId Client)
$sel:subscribers:Server :: Server -> TVar (Map RecipientId Client)
subscribers :: TVar (Map RecipientId Client)
subscribers} = m' () -> m' ()
forall (f :: * -> *) a b. Applicative f => f a -> f b
forever (m' () -> m' ()) -> m' () -> m' ()
forall a b. (a -> b) -> a -> b
$ do
      STM (Maybe (RecipientId, Client))
-> m' (Maybe (RecipientId, Client))
forall (m :: * -> *) a. MonadIO m => STM a -> m a
atomically STM (Maybe (RecipientId, Client))
updateSubscribers m' (Maybe (RecipientId, Client))
-> (Maybe (RecipientId, Client) -> m' ()) -> m' ()
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \case
        Just (RecipientId
rId, Client {TBQueue Transmission
$sel:rcvQ:Client :: Client -> TBQueue Transmission
rcvQ :: TBQueue Transmission
rcvQ}) ->
          m' ThreadId -> m' ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (m' ThreadId -> m' ())
-> (STM () -> m' ThreadId) -> STM () -> m' ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. m' () -> m' ThreadId
forall (m :: * -> *). MonadUnliftIO m => m () -> m ThreadId
forkIO (m' () -> m' ThreadId)
-> (STM () -> m' ()) -> STM () -> m' ThreadId
forall b c a. (b -> c) -> (a -> b) -> a -> c
. STM () -> m' ()
forall (m :: * -> *) a. MonadIO m => STM a -> m a
atomically (STM () -> m' ()) -> STM () -> m' ()
forall a b. (a -> b) -> a -> b
$
            TBQueue Transmission -> Transmission -> STM ()
forall a. TBQueue a -> a -> STM ()
writeTBQueue TBQueue Transmission
rcvQ (RecipientId -> CorrId
CorrId RecipientId
"", RecipientId
rId, SParty 'Broker -> Command 'Broker -> Cmd
forall (a :: Party). SParty a -> Command a -> Cmd
Cmd SParty 'Broker
SBroker Command 'Broker
END)
        Maybe (RecipientId, Client)
_ -> () -> m' ()
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()
      where
        updateSubscribers :: STM (Maybe (RecipientId, Client))
        updateSubscribers :: STM (Maybe (RecipientId, Client))
updateSubscribers = do
          (RecipientId
rId, Client
c) <- TBQueue (RecipientId, Client) -> STM (RecipientId, Client)
forall a. TBQueue a -> STM a
readTBQueue TBQueue (RecipientId, Client)
subscribedQ
          TVar (Map RecipientId Client)
-> (Map RecipientId Client
    -> (Maybe Client, Map RecipientId Client))
-> STM (Maybe Client)
forall s a. TVar s -> (s -> (a, s)) -> STM a
stateTVar TVar (Map RecipientId Client)
subscribers (\Map RecipientId Client
cs -> (RecipientId -> Map RecipientId Client -> Maybe Client
forall k a. Ord k => k -> Map k a -> Maybe a
M.lookup RecipientId
rId Map RecipientId Client
cs, RecipientId
-> Client -> Map RecipientId Client -> Map RecipientId Client
forall k a. Ord k => k -> a -> Map k a -> Map k a
M.insert RecipientId
rId Client
c Map RecipientId Client
cs)) STM (Maybe Client)
-> (Maybe Client -> STM (Maybe (RecipientId, Client)))
-> STM (Maybe (RecipientId, Client))
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \case
            Just Client
c' -> RecipientId
-> Client -> Client -> STM (Maybe (RecipientId, Client))
clientToBeNotified RecipientId
rId Client
c Client
c'
            Maybe Client
_ -> Maybe (RecipientId, Client) -> STM (Maybe (RecipientId, Client))
forall (f :: * -> *) a. Applicative f => a -> f a
pure Maybe (RecipientId, Client)
forall a. Maybe a
Nothing
        clientToBeNotified :: RecipientId -> Client -> Client -> STM (Maybe (RecipientId, Client))
        clientToBeNotified :: RecipientId
-> Client -> Client -> STM (Maybe (RecipientId, Client))
clientToBeNotified RecipientId
rId Client
c c' :: Client
c'@Client {TVar Bool
$sel:connected:Client :: Client -> TVar Bool
connected :: TVar Bool
connected}
          | Client -> Natural
clientId Client
c Natural -> Natural -> Bool
forall a. Eq a => a -> a -> Bool
/= Client -> Natural
clientId Client
c' = do
            Bool
yes <- TVar Bool -> STM Bool
forall a. TVar a -> STM a
readTVar TVar Bool
connected
            Maybe (RecipientId, Client) -> STM (Maybe (RecipientId, Client))
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Maybe (RecipientId, Client) -> STM (Maybe (RecipientId, Client)))
-> Maybe (RecipientId, Client) -> STM (Maybe (RecipientId, Client))
forall a b. (a -> b) -> a -> b
$ if Bool
yes then (RecipientId, Client) -> Maybe (RecipientId, Client)
forall a. a -> Maybe a
Just (RecipientId
rId, Client
c') else Maybe (RecipientId, Client)
forall a. Maybe a
Nothing
          | Bool
otherwise = Maybe (RecipientId, Client) -> STM (Maybe (RecipientId, Client))
forall (f :: * -> *) a. Applicative f => a -> f a
pure Maybe (RecipientId, Client)
forall a. Maybe a
Nothing

runClient :: (Transport c, MonadUnliftIO m, MonadReader Env m) => TProxy c -> c -> m ()
runClient :: TProxy c -> c -> m ()
runClient TProxy c
_ c
h = do
  FullKeyPair
keyPair <- (Env -> FullKeyPair) -> m FullKeyPair
forall r (m :: * -> *) a. MonadReader r m => (r -> a) -> m a
asks Env -> FullKeyPair
serverKeyPair
  ServerConfig {Int
$sel:blockSize:ServerConfig :: ServerConfig -> Int
blockSize :: Int
blockSize} <- (Env -> ServerConfig) -> m ServerConfig
forall r (m :: * -> *) a. MonadReader r m => (r -> a) -> m a
asks Env -> ServerConfig
config
  IO (Either TransportError (THandle c))
-> m (Either TransportError (THandle c))
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (ExceptT TransportError IO (THandle c)
-> IO (Either TransportError (THandle c))
forall e (m :: * -> *) a. ExceptT e m a -> m (Either e a)
runExceptT (ExceptT TransportError IO (THandle c)
 -> IO (Either TransportError (THandle c)))
-> ExceptT TransportError IO (THandle c)
-> IO (Either TransportError (THandle c))
forall a b. (a -> b) -> a -> b
$ c -> Int -> FullKeyPair -> ExceptT TransportError IO (THandle c)
forall c.
Transport c =>
c -> Int -> FullKeyPair -> ExceptT TransportError IO (THandle c)
serverHandshake c
h Int
blockSize FullKeyPair
keyPair) m (Either TransportError (THandle c))
-> (Either TransportError (THandle c) -> m ()) -> m ()
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \case
    Right THandle c
th -> THandle c -> m ()
forall c (m :: * -> *).
(Transport c, MonadUnliftIO m, MonadReader Env m) =>
THandle c -> m ()
runClientTransport THandle c
th
    Left TransportError
_ -> () -> m ()
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()

runClientTransport :: (Transport c, MonadUnliftIO m, MonadReader Env m) => THandle c -> m ()
runClientTransport :: THandle c -> m ()
runClientTransport THandle c
th = do
  Natural
q <- (Env -> Natural) -> m Natural
forall r (m :: * -> *) a. MonadReader r m => (r -> a) -> m a
asks ((Env -> Natural) -> m Natural) -> (Env -> Natural) -> m Natural
forall a b. (a -> b) -> a -> b
$ ServerConfig -> Natural
tbqSize (ServerConfig -> Natural)
-> (Env -> ServerConfig) -> Env -> Natural
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Env -> ServerConfig
config
  Server
s <- (Env -> Server) -> m Server
forall r (m :: * -> *) a. MonadReader r m => (r -> a) -> m a
asks Env -> Server
server
  Client
c <- STM Client -> m Client
forall (m :: * -> *) a. MonadIO m => STM a -> m a
atomically (STM Client -> m Client) -> STM Client -> m Client
forall a b. (a -> b) -> a -> b
$ Server -> Natural -> STM Client
newClient Server
s Natural
q
  [m ()] -> m ()
forall (m :: * -> *) a. MonadUnliftIO m => [m a] -> m ()
raceAny_ [THandle c -> Client -> m ()
forall c (m :: * -> *).
(Transport c, MonadUnliftIO m) =>
THandle c -> Client -> m ()
send THandle c
th Client
c, Client -> Server -> m ()
forall (m :: * -> *).
(MonadUnliftIO m, MonadReader Env m) =>
Client -> Server -> m ()
client Client
c Server
s, THandle c -> Client -> m ()
forall c (m :: * -> *).
(Transport c, MonadUnliftIO m, MonadReader Env m) =>
THandle c -> Client -> m ()
receive THandle c
th Client
c]
    m () -> m () -> m ()
forall (m :: * -> *) a b. MonadUnliftIO m => m a -> m b -> m a
`finally` Client -> m ()
forall (m :: * -> *).
(MonadUnliftIO m, MonadReader Env m) =>
Client -> m ()
clientDisconnected Client
c

clientDisconnected :: (MonadUnliftIO m, MonadReader Env m) => Client -> m ()
clientDisconnected :: Client -> m ()
clientDisconnected c :: Client
c@Client {TVar (Map RecipientId Sub)
$sel:subscriptions:Client :: Client -> TVar (Map RecipientId Sub)
subscriptions :: TVar (Map RecipientId Sub)
subscriptions, TVar Bool
connected :: TVar Bool
$sel:connected:Client :: Client -> TVar Bool
connected} = do
  STM () -> m ()
forall (m :: * -> *) a. MonadIO m => STM a -> m a
atomically (STM () -> m ()) -> STM () -> m ()
forall a b. (a -> b) -> a -> b
$ TVar Bool -> Bool -> STM ()
forall a. TVar a -> a -> STM ()
writeTVar TVar Bool
connected Bool
False
  Map RecipientId Sub
subs <- TVar (Map RecipientId Sub) -> m (Map RecipientId Sub)
forall (m :: * -> *) a. MonadIO m => TVar a -> m a
readTVarIO TVar (Map RecipientId Sub)
subscriptions
  (Sub -> m ()) -> Map RecipientId Sub -> m ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
(a -> m b) -> t a -> m ()
mapM_ Sub -> m ()
forall (m :: * -> *). MonadUnliftIO m => Sub -> m ()
cancelSub Map RecipientId Sub
subs
  TVar (Map RecipientId Client)
cs <- (Env -> TVar (Map RecipientId Client))
-> m (TVar (Map RecipientId Client))
forall r (m :: * -> *) a. MonadReader r m => (r -> a) -> m a
asks ((Env -> TVar (Map RecipientId Client))
 -> m (TVar (Map RecipientId Client)))
-> (Env -> TVar (Map RecipientId Client))
-> m (TVar (Map RecipientId Client))
forall a b. (a -> b) -> a -> b
$ Server -> TVar (Map RecipientId Client)
subscribers (Server -> TVar (Map RecipientId Client))
-> (Env -> Server) -> Env -> TVar (Map RecipientId Client)
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Env -> Server
server
  STM () -> m ()
forall (m :: * -> *) a. MonadIO m => STM a -> m a
atomically (STM () -> m ())
-> ([RecipientId] -> STM ()) -> [RecipientId] -> m ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (RecipientId -> STM ()) -> [RecipientId] -> STM ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
(a -> m b) -> t a -> m ()
mapM_ (TVar (Map RecipientId Client)
-> (Map RecipientId Client -> Map RecipientId Client) -> STM ()
forall a. TVar a -> (a -> a) -> STM ()
modifyTVar TVar (Map RecipientId Client)
cs ((Map RecipientId Client -> Map RecipientId Client) -> STM ())
-> (RecipientId
    -> Map RecipientId Client -> Map RecipientId Client)
-> RecipientId
-> STM ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (Client -> Maybe Client)
-> RecipientId -> Map RecipientId Client -> Map RecipientId Client
forall k a. Ord k => (a -> Maybe a) -> k -> Map k a -> Map k a
M.update Client -> Maybe Client
deleteCurrentClient) ([RecipientId] -> m ()) -> [RecipientId] -> m ()
forall a b. (a -> b) -> a -> b
$ Map RecipientId Sub -> [RecipientId]
forall k a. Map k a -> [k]
M.keys Map RecipientId Sub
subs
  where
    deleteCurrentClient :: Client -> Maybe Client
    deleteCurrentClient :: Client -> Maybe Client
deleteCurrentClient Client
c'
      | Client -> Natural
clientId Client
c Natural -> Natural -> Bool
forall a. Eq a => a -> a -> Bool
== Client -> Natural
clientId Client
c' = Maybe Client
forall a. Maybe a
Nothing
      | Bool
otherwise = Client -> Maybe Client
forall a. a -> Maybe a
Just Client
c'

cancelSub :: MonadUnliftIO m => Sub -> m ()
cancelSub :: Sub -> m ()
cancelSub = \case
  Sub {$sel:subThread:Sub :: Sub -> SubscriptionThread
subThread = SubThread ThreadId
t} -> ThreadId -> m ()
forall (m :: * -> *). MonadIO m => ThreadId -> m ()
killThread ThreadId
t
  Sub
_ -> () -> m ()
forall (m :: * -> *) a. Monad m => a -> m a
return ()

receive :: (Transport c, MonadUnliftIO m, MonadReader Env m) => THandle c -> Client -> m ()
receive :: THandle c -> Client -> m ()
receive THandle c
h Client {TBQueue Transmission
rcvQ :: TBQueue Transmission
$sel:rcvQ:Client :: Client -> TBQueue Transmission
rcvQ} = m () -> m ()
forall (f :: * -> *) a b. Applicative f => f a -> f b
forever (m () -> m ()) -> m () -> m ()
forall a b. (a -> b) -> a -> b
$ do
  (Signature
signature, (CorrId
corrId, RecipientId
queueId, Either ErrorType Cmd
cmdOrError)) <- (Cmd -> Either ErrorType Cmd)
-> THandle c -> m (Signature, TransmissionOrError)
forall c (m :: * -> *).
(Transport c, MonadIO m) =>
(Cmd -> Either ErrorType Cmd)
-> THandle c -> m (Signature, TransmissionOrError)
tGet Cmd -> Either ErrorType Cmd
fromClient THandle c
h
  Transmission
t <- case Either ErrorType Cmd
cmdOrError of
    Left ErrorType
e -> Transmission -> m Transmission
forall (m :: * -> *) a. Monad m => a -> m a
return (Transmission -> m Transmission)
-> (Command 'Broker -> Transmission)
-> Command 'Broker
-> m Transmission
forall b c a. (b -> c) -> (a -> b) -> a -> c
. CorrId -> RecipientId -> Command 'Broker -> Transmission
mkResp CorrId
corrId RecipientId
queueId (Command 'Broker -> m Transmission)
-> Command 'Broker -> m Transmission
forall a b. (a -> b) -> a -> b
$ ErrorType -> Command 'Broker
ERR ErrorType
e
    Right Cmd
cmd -> SignedTransmission -> m Transmission
forall (m :: * -> *).
(MonadUnliftIO m, MonadReader Env m) =>
SignedTransmission -> m Transmission
verifyTransmission (Signature
signature, (CorrId
corrId, RecipientId
queueId, Cmd
cmd))
  STM () -> m ()
forall (m :: * -> *) a. MonadIO m => STM a -> m a
atomically (STM () -> m ()) -> STM () -> m ()
forall a b. (a -> b) -> a -> b
$ TBQueue Transmission -> Transmission -> STM ()
forall a. TBQueue a -> a -> STM ()
writeTBQueue TBQueue Transmission
rcvQ Transmission
t

send :: (Transport c, MonadUnliftIO m) => THandle c -> Client -> m ()
send :: THandle c -> Client -> m ()
send THandle c
h Client {TBQueue Transmission
$sel:sndQ:Client :: Client -> TBQueue Transmission
sndQ :: TBQueue Transmission
sndQ} = m (Either TransportError ()) -> m ()
forall (f :: * -> *) a b. Applicative f => f a -> f b
forever (m (Either TransportError ()) -> m ())
-> m (Either TransportError ()) -> m ()
forall a b. (a -> b) -> a -> b
$ do
  Transmission
t <- STM Transmission -> m Transmission
forall (m :: * -> *) a. MonadIO m => STM a -> m a
atomically (STM Transmission -> m Transmission)
-> STM Transmission -> m Transmission
forall a b. (a -> b) -> a -> b
$ TBQueue Transmission -> STM Transmission
forall a. TBQueue a -> STM a
readTBQueue TBQueue Transmission
sndQ
  IO (Either TransportError ()) -> m (Either TransportError ())
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (Either TransportError ()) -> m (Either TransportError ()))
-> IO (Either TransportError ()) -> m (Either TransportError ())
forall a b. (a -> b) -> a -> b
$ THandle c -> SignedRawTransmission -> IO (Either TransportError ())
forall c.
Transport c =>
THandle c -> SignedRawTransmission -> IO (Either TransportError ())
tPut THandle c
h (Signature
"", Transmission -> RecipientId
serializeTransmission Transmission
t)

mkResp :: CorrId -> QueueId -> Command 'Broker -> Transmission
mkResp :: CorrId -> RecipientId -> Command 'Broker -> Transmission
mkResp CorrId
corrId RecipientId
queueId Command 'Broker
command = (CorrId
corrId, RecipientId
queueId, SParty 'Broker -> Command 'Broker -> Cmd
forall (a :: Party). SParty a -> Command a -> Cmd
Cmd SParty 'Broker
SBroker Command 'Broker
command)

verifyTransmission :: forall m. (MonadUnliftIO m, MonadReader Env m) => SignedTransmission -> m Transmission
verifyTransmission :: SignedTransmission -> m Transmission
verifyTransmission (Signature
sig, t :: Transmission
t@(CorrId
corrId, RecipientId
queueId, Cmd
cmd)) = do
  (CorrId
corrId,RecipientId
queueId,) (Cmd -> Transmission) -> m Cmd -> m Transmission
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> case Cmd
cmd of
    Cmd SParty a
SBroker Command a
_ -> Cmd -> m Cmd
forall (m :: * -> *) a. Monad m => a -> m a
return (Cmd -> m Cmd) -> Cmd -> m Cmd
forall a b. (a -> b) -> a -> b
$ ErrorType -> Cmd
smpErr ErrorType
INTERNAL -- it can only be client command, because `fromClient` was used
    Cmd SParty a
SRecipient (NEW RecipientPublicKey
k) -> Cmd -> m Cmd
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Cmd -> m Cmd) -> Cmd -> m Cmd
forall a b. (a -> b) -> a -> b
$ RecipientPublicKey -> Cmd
verifySignature RecipientPublicKey
k
    Cmd SParty a
SRecipient Command a
_ -> SParty 'Recipient -> (QueueRec -> Cmd) -> m Cmd
forall (p :: Party). SParty p -> (QueueRec -> Cmd) -> m Cmd
verifyCmd SParty 'Recipient
SRecipient ((QueueRec -> Cmd) -> m Cmd) -> (QueueRec -> Cmd) -> m Cmd
forall a b. (a -> b) -> a -> b
$ RecipientPublicKey -> Cmd
verifySignature (RecipientPublicKey -> Cmd)
-> (QueueRec -> RecipientPublicKey) -> QueueRec -> Cmd
forall b c a. (b -> c) -> (a -> b) -> a -> c
. QueueRec -> RecipientPublicKey
recipientKey
    Cmd SParty a
SSender (SEND RecipientId
_) -> SParty 'Sender -> (QueueRec -> Cmd) -> m Cmd
forall (p :: Party). SParty p -> (QueueRec -> Cmd) -> m Cmd
verifyCmd SParty 'Sender
SSender ((QueueRec -> Cmd) -> m Cmd) -> (QueueRec -> Cmd) -> m Cmd
forall a b. (a -> b) -> a -> b
$ Signature -> Maybe RecipientPublicKey -> Cmd
verifySend Signature
sig (Maybe RecipientPublicKey -> Cmd)
-> (QueueRec -> Maybe RecipientPublicKey) -> QueueRec -> Cmd
forall b c a. (b -> c) -> (a -> b) -> a -> c
. QueueRec -> Maybe RecipientPublicKey
senderKey
    Cmd SParty a
SSender Command a
PING -> Cmd -> m Cmd
forall (m :: * -> *) a. Monad m => a -> m a
return Cmd
cmd
  where
    verifyCmd :: SParty p -> (QueueRec -> Cmd) -> m Cmd
    verifyCmd :: SParty p -> (QueueRec -> Cmd) -> m Cmd
verifyCmd SParty p
party QueueRec -> Cmd
f = do
      QueueStore
st <- (Env -> QueueStore) -> m QueueStore
forall r (m :: * -> *) a. MonadReader r m => (r -> a) -> m a
asks Env -> QueueStore
queueStore
      Either ErrorType QueueRec
q <- STM (Either ErrorType QueueRec) -> m (Either ErrorType QueueRec)
forall (m :: * -> *) a. MonadIO m => STM a -> m a
atomically (STM (Either ErrorType QueueRec) -> m (Either ErrorType QueueRec))
-> STM (Either ErrorType QueueRec) -> m (Either ErrorType QueueRec)
forall a b. (a -> b) -> a -> b
$ QueueStore
-> SParty p -> RecipientId -> STM (Either ErrorType QueueRec)
forall s (m :: * -> *) (a :: Party).
MonadQueueStore s m =>
s -> SParty a -> RecipientId -> m (Either ErrorType QueueRec)
getQueue QueueStore
st SParty p
party RecipientId
queueId
      Cmd -> m Cmd
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Cmd -> m Cmd) -> Cmd -> m Cmd
forall a b. (a -> b) -> a -> b
$ (ErrorType -> Cmd)
-> (QueueRec -> Cmd) -> Either ErrorType QueueRec -> Cmd
forall a c b. (a -> c) -> (b -> c) -> Either a b -> c
either (Cmd -> ErrorType -> Cmd
forall a b. a -> b -> a
const (Cmd -> ErrorType -> Cmd) -> Cmd -> ErrorType -> Cmd
forall a b. (a -> b) -> a -> b
$ Cmd -> Cmd
forall a. a -> a
dummyVerify Cmd
authErr) QueueRec -> Cmd
f Either ErrorType QueueRec
q
    verifySend :: C.Signature -> Maybe SenderPublicKey -> Cmd
    verifySend :: Signature -> Maybe RecipientPublicKey -> Cmd
verifySend Signature
"" = Cmd
-> (RecipientPublicKey -> Cmd) -> Maybe RecipientPublicKey -> Cmd
forall b a. b -> (a -> b) -> Maybe a -> b
maybe Cmd
cmd (Cmd -> RecipientPublicKey -> Cmd
forall a b. a -> b -> a
const Cmd
authErr)
    verifySend Signature
_ = Cmd
-> (RecipientPublicKey -> Cmd) -> Maybe RecipientPublicKey -> Cmd
forall b a. b -> (a -> b) -> Maybe a -> b
maybe Cmd
authErr RecipientPublicKey -> Cmd
verifySignature
    verifySignature :: C.PublicKey -> Cmd
    verifySignature :: RecipientPublicKey -> Cmd
verifySignature RecipientPublicKey
key = if RecipientPublicKey -> Bool
verify RecipientPublicKey
key then Cmd
cmd else Cmd
authErr
    verify :: RecipientPublicKey -> Bool
verify RecipientPublicKey
key
      | RecipientPublicKey -> Int
C.publicKeySize RecipientPublicKey
key Int -> Int -> Bool
forall a. Eq a => a -> a -> Bool
== Int
sigLen = RecipientPublicKey -> Bool
cryptoVerify RecipientPublicKey
key
      | Bool
otherwise = Bool -> Bool
forall a. a -> a
dummyVerify Bool
False
    cryptoVerify :: RecipientPublicKey -> Bool
cryptoVerify RecipientPublicKey
key = RecipientPublicKey -> Signature -> RecipientId -> Bool
C.verify RecipientPublicKey
key Signature
sig (Transmission -> RecipientId
serializeTransmission Transmission
t)
    smpErr :: ErrorType -> Cmd
smpErr = SParty 'Broker -> Command 'Broker -> Cmd
forall (a :: Party). SParty a -> Command a -> Cmd
Cmd SParty 'Broker
SBroker (Command 'Broker -> Cmd)
-> (ErrorType -> Command 'Broker) -> ErrorType -> Cmd
forall b c a. (b -> c) -> (a -> b) -> a -> c
. ErrorType -> Command 'Broker
ERR
    authErr :: Cmd
authErr = ErrorType -> Cmd
smpErr ErrorType
AUTH
    dummyVerify :: a -> a
    dummyVerify :: a -> a
dummyVerify = Bool -> a -> a
seq (Bool -> a -> a) -> Bool -> a -> a
forall a b. (a -> b) -> a -> b
$
      RecipientPublicKey -> Bool
cryptoVerify (RecipientPublicKey -> Bool) -> RecipientPublicKey -> Bool
forall a b. (a -> b) -> a -> b
$ case Int
sigLen of
        Int
128 -> RecipientPublicKey
dummyKey128
        Int
256 -> RecipientPublicKey
dummyKey256
        Int
384 -> RecipientPublicKey
dummyKey384
        Int
512 -> RecipientPublicKey
dummyKey512
        Int
_ -> RecipientPublicKey
dummyKey256
    sigLen :: Int
sigLen = RecipientId -> Int
B.length (RecipientId -> Int) -> RecipientId -> Int
forall a b. (a -> b) -> a -> b
$ Signature -> RecipientId
C.unSignature Signature
sig

-- These dummy keys are used with `dummyVerify` function to mitigate timing attacks
-- by having the same time of the response whether a queue exists or nor, for all valid key/signature sizes
dummyKey128 :: C.PublicKey
dummyKey128 :: RecipientPublicKey
dummyKey128 = RecipientPublicKey
"MIIBIDANBgkqhkiG9w0BAQEFAAOCAQ0AMIIBCAKBgQC2oeA7s4roXN5K2N6022I1/2CTeMKjWH0m00bSZWa4N8LDKeFcShh8YUxZea5giAveViTRNOOVLgcuXbKvR3u24szN04xP0+KnYUuUUIIoT3YSjX0IlomhDhhSyup4BmA0gAZ+D1OaIKZFX6J8yQ1Lr/JGLEfSRsBjw8l+4hs9OwKBgQDKA+YlZvGb3BcpDwKmatiCXN7ZRDWkjXbj8VAW5zV95tSRCCVN48hrFM1H4Ju2QMMUc6kPUVX+eW4ZjdCl5blIqIHMcTmsdcmsDDCg3PjUNrwc6bv/1TcirbAKcmnKt9iurIt6eerxSO7TZUXXMUVsi7eRwb/RUNhpCrpJ/hpIOw=="

dummyKey256 :: C.PublicKey
dummyKey256 :: RecipientPublicKey
dummyKey256 = RecipientPublicKey
"MIIBoDANBgkqhkiG9w0BAQEFAAOCAY0AMIIBiAKCAQEAxwmTvaqmdTbkfUGNi8Yu0L/T4cxuOlQlx3zGZ9X9Qx0+oZjknWK+QHrdWTcpS+zH4Hi7fP6kanOQoQ90Hj6Ghl57VU1GEdUPywSw4i1/7t0Wv9uT9Q2ktHp2rqVo3xkC9IVIpL7EZAxdRviIN2OsOB3g4a/F1ZpjxcAaZeOMUugiAX1+GtkLuE0Xn4neYjCaOghLxQTdhybN70VtnkiQLx/X9NjkDIl/spYGm3tQFMyYKkP6IWoEpj0926hJ0fmlmhy8tAOhlZsb/baW5cgkEZ3E9jVVrySCgQzoLQgma610FIISRpRJbSyv26jU7MkMxiyuBiDaFOORkXFttoKbtQKBgEbDS9II2brsz+vfI7uP8atFcawkE52cx4M1UWQhqb1H3tBiRl+qO+dMq1pPQF2bW7dlZAWYzS4W/367bTAuALHBDGB8xi1P4Njhh9vaOgTvuqrHG9NJQ85BLy0qGw8rjIWSIXVmVpfrXFJ8po5l04UE258Ll2yocv3QRQmddQW9"

dummyKey384 :: C.PublicKey
dummyKey384 :: RecipientPublicKey
dummyKey384 = RecipientPublicKey
"MIICITANBgkqhkiG9w0BAQEFAAOCAg4AMIICCQKCAYEAthExp77lSFBMB0RedjgKIU+oNH5lMGdMqDCG0E5Ly7X49rFpfDMMN08GDIgvzg9kcwV3ScbPcjUE19wmAShX9f9k3w38KM3wmIBKSiuCREQl0V3xAYp1SYwiAkMNSSwxuIkDEeSOR56WdEcZvqbB4lY9MQlUv70KriPDxZaqKCTKslUezXHQuYPQX6eMnGFK7hxz5Kl5MajV52d+5iXsa8CA+m/e1KVnbelCO+xhN89xG8ALt0CJ9k5Wwo3myLgXi4dmNankCmg8jkh+7y2ywkzxMwH1JydDtV/FLzkbZsbPR2w93TNrTq1RJOuqMyh0VtdBSpxNW/Ft988TkkX2BAWzx82INw7W6/QbHGNtHNB995R4sgeYy8QbEpNGBhQnfQh7yRWygLTVXWKApQzzfCeIoDDWUS7dMv/zXoasAnpDBj+6UhHv3BHrps7kBvRyZQ2d/nUuAqiGd43ljJ++n6vNyFLgZoiV7HLia/FOGMkdt7j92CNmFHxiT6Xl7kRHAoGBAPNoWny2O7LBxzAKMLmQVHBAiKp6RMx+7URvtQDHDHPaZ7F3MvtvmYWwGzund3cQFAaV1EkJoYeI3YRuj6xdXgMyMaP54On++btArb6jUtZuvlC98qE8dEEHQNh+7TsCiMU+ivbeKFxS9A/B7OVedoMnPoJWhatbA9zB/6L1GNPh"

dummyKey512 :: C.PublicKey
dummyKey512 :: RecipientPublicKey
dummyKey512 = RecipientPublicKey
"MIICoDANBgkqhkiG9w0BAQEFAAOCAo0AMIICiAKCAgEArkCY9DuverJ4mmzDektv9aZMFyeRV46WZK9NsOBKEc+1ncqMs+LhLti9asKNgUBRbNzmbOe0NYYftrUpwnATaenggkTFxxbJ4JGJuGYbsEdFWkXSvrbWGtM8YUmn5RkAGme12xQ89bSM4VoJAGnrYPHwmcQd+KYCPZvTUsxaxgrJTX65ejHN9BsAn8XtGViOtHTDJO9yUMD2WrJvd7wnNa+0ugEteDLzMU++xS98VC+uA1vfauUqi3yXVchdfrLdVUuM+JE0gUEXCgzjuHkaoHiaGNiGhdPYoAJJdOKQOIHAKdk7Th6OPhirPhc9XYNB4O8JDthKhNtfokvFIFlC4QBRzJhpLIENaEBDt08WmgpOnecZB/CuxkqqOrNa8j5K5jNrtXAI67W46VEC2jeQy/gZwb64Zit2A4D00xXzGbQTPGj4ehcEMhLx5LSCygViEf0w0tN3c3TEyUcgPzvECd2ZVpQLr9Z4a07Ebr+YSuxcHhjg4Rg1VyJyOTTvaCBGm5X2B3+tI4NUttmikIHOYpBnsLmHY2BgfH2KcrIsDyAhInXmTFr/L2+erFarUnlfATd2L8Ti43TNHDedO6k6jI5Gyi62yPwjqPLEIIK8l+pIeNfHJ3pPmjhHBfzFcQLMMMXffHWNK8kWklrQXK+4j4HiPcTBvlO1FEtG9nEIZhUCgYA4a6WtI2k5YNli1C89GY5rGUY7RP71T6RWri/D3Lz9T7GvU+FemAyYmsvCQwqijUOur0uLvwSP8VdxpSUcrjJJSWur2hrPWzWlu0XbNaeizxpFeKbQP+zSrWJ1z8RwfAeUjShxt8q1TuqGqY10wQyp3nyiTGvS+KwZVj5h5qx8NQ=="

client :: forall m. (MonadUnliftIO m, MonadReader Env m) => Client -> Server -> m ()
client :: Client -> Server -> m ()
client clnt :: Client
clnt@Client {TVar (Map RecipientId Sub)
subscriptions :: TVar (Map RecipientId Sub)
$sel:subscriptions:Client :: Client -> TVar (Map RecipientId Sub)
subscriptions, TBQueue Transmission
rcvQ :: TBQueue Transmission
$sel:rcvQ:Client :: Client -> TBQueue Transmission
rcvQ, TBQueue Transmission
sndQ :: TBQueue Transmission
$sel:sndQ:Client :: Client -> TBQueue Transmission
sndQ} Server {TBQueue (RecipientId, Client)
subscribedQ :: TBQueue (RecipientId, Client)
$sel:subscribedQ:Server :: Server -> TBQueue (RecipientId, Client)
subscribedQ} =
  m () -> m ()
forall (f :: * -> *) a b. Applicative f => f a -> f b
forever (m () -> m ()) -> m () -> m ()
forall a b. (a -> b) -> a -> b
$
    STM Transmission -> m Transmission
forall (m :: * -> *) a. MonadIO m => STM a -> m a
atomically (TBQueue Transmission -> STM Transmission
forall a. TBQueue a -> STM a
readTBQueue TBQueue Transmission
rcvQ)
      m Transmission
-> (Transmission -> m Transmission) -> m Transmission
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= Transmission -> m Transmission
processCommand
      m Transmission -> (Transmission -> m ()) -> m ()
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= STM () -> m ()
forall (m :: * -> *) a. MonadIO m => STM a -> m a
atomically (STM () -> m ())
-> (Transmission -> STM ()) -> Transmission -> m ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. TBQueue Transmission -> Transmission -> STM ()
forall a. TBQueue a -> a -> STM ()
writeTBQueue TBQueue Transmission
sndQ
  where
    processCommand :: Transmission -> m Transmission
    processCommand :: Transmission -> m Transmission
processCommand (CorrId
corrId, RecipientId
queueId, Cmd
cmd) = do
      QueueStore
st <- (Env -> QueueStore) -> m QueueStore
forall r (m :: * -> *) a. MonadReader r m => (r -> a) -> m a
asks Env -> QueueStore
queueStore
      case Cmd
cmd of
        Cmd SParty a
SBroker Command a
END -> m ()
unsubscribeQueue m () -> Transmission -> m Transmission
forall (f :: * -> *) a b. Functor f => f a -> b -> f b
$> (CorrId
corrId, RecipientId
queueId, Cmd
cmd)
        Cmd SParty a
SBroker Command a
_ -> Transmission -> m Transmission
forall (m :: * -> *) a. Monad m => a -> m a
return (CorrId
corrId, RecipientId
queueId, Cmd
cmd)
        Cmd SParty a
SSender Command a
command -> case Command a
command of
          SEND RecipientId
msgBody -> QueueStore -> RecipientId -> m Transmission
sendMessage QueueStore
st RecipientId
msgBody
          Command a
PING -> Transmission -> m Transmission
forall (m :: * -> *) a. Monad m => a -> m a
return (CorrId
corrId, RecipientId
queueId, SParty 'Broker -> Command 'Broker -> Cmd
forall (a :: Party). SParty a -> Command a -> Cmd
Cmd SParty 'Broker
SBroker Command 'Broker
PONG)
        Cmd SParty a
SRecipient Command a
command -> case Command a
command of
          NEW RecipientPublicKey
rKey -> QueueStore -> RecipientPublicKey -> m Transmission
createQueue QueueStore
st RecipientPublicKey
rKey
          Command a
SUB -> RecipientId -> m Transmission
subscribeQueue RecipientId
queueId
          Command a
ACK -> m Transmission
acknowledgeMsg
          KEY RecipientPublicKey
sKey -> QueueStore -> RecipientPublicKey -> m Transmission
secureQueue_ QueueStore
st RecipientPublicKey
sKey
          Command a
OFF -> QueueStore -> m Transmission
suspendQueue_ QueueStore
st
          Command a
DEL -> QueueStore -> m Transmission
delQueueAndMsgs QueueStore
st
      where
        createQueue :: QueueStore -> RecipientPublicKey -> m Transmission
        createQueue :: QueueStore -> RecipientPublicKey -> m Transmission
createQueue QueueStore
st RecipientPublicKey
rKey =
          RecipientPublicKey -> m (Command 'Broker) -> m Transmission
forall (m' :: * -> *).
Monad m' =>
RecipientPublicKey -> m' (Command 'Broker) -> m' Transmission
checkKeySize RecipientPublicKey
rKey m (Command 'Broker)
addSubscribe
          where
            addSubscribe :: m (Command 'Broker)
addSubscribe =
              Int -> m (Either ErrorType (RecipientId, RecipientId))
addQueueRetry Int
3 m (Either ErrorType (RecipientId, RecipientId))
-> (Either ErrorType (RecipientId, RecipientId)
    -> m (Command 'Broker))
-> m (Command 'Broker)
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \case
                Left ErrorType
e -> Command 'Broker -> m (Command 'Broker)
forall (m :: * -> *) a. Monad m => a -> m a
return (Command 'Broker -> m (Command 'Broker))
-> Command 'Broker -> m (Command 'Broker)
forall a b. (a -> b) -> a -> b
$ ErrorType -> Command 'Broker
ERR ErrorType
e
                Right (RecipientId
rId, RecipientId
sId) -> do
                  (StoreLog 'WriteMode -> IO ()) -> m ()
forall (m :: * -> *) a.
(MonadUnliftIO m, MonadReader Env m) =>
(StoreLog 'WriteMode -> IO a) -> m ()
withLog (StoreLog 'WriteMode -> RecipientId -> IO ()
`logCreateById` RecipientId
rId)
                  RecipientId -> m Transmission
subscribeQueue RecipientId
rId m Transmission -> Command 'Broker -> m (Command 'Broker)
forall (f :: * -> *) a b. Functor f => f a -> b -> f b
$> RecipientId -> RecipientId -> Command 'Broker
IDS RecipientId
rId RecipientId
sId

            addQueueRetry :: Int -> m (Either ErrorType (RecipientId, SenderId))
            addQueueRetry :: Int -> m (Either ErrorType (RecipientId, RecipientId))
addQueueRetry Int
0 = Either ErrorType (RecipientId, RecipientId)
-> m (Either ErrorType (RecipientId, RecipientId))
forall (m :: * -> *) a. Monad m => a -> m a
return (Either ErrorType (RecipientId, RecipientId)
 -> m (Either ErrorType (RecipientId, RecipientId)))
-> Either ErrorType (RecipientId, RecipientId)
-> m (Either ErrorType (RecipientId, RecipientId))
forall a b. (a -> b) -> a -> b
$ ErrorType -> Either ErrorType (RecipientId, RecipientId)
forall a b. a -> Either a b
Left ErrorType
INTERNAL
            addQueueRetry Int
n = do
              (RecipientId, RecipientId)
ids <- m (RecipientId, RecipientId)
getIds
              STM (Either ErrorType ()) -> m (Either ErrorType ())
forall (m :: * -> *) a. MonadIO m => STM a -> m a
atomically (QueueStore
-> RecipientPublicKey
-> (RecipientId, RecipientId)
-> STM (Either ErrorType ())
forall s (m :: * -> *).
MonadQueueStore s m =>
s
-> RecipientPublicKey
-> (RecipientId, RecipientId)
-> m (Either ErrorType ())
addQueue QueueStore
st RecipientPublicKey
rKey (RecipientId, RecipientId)
ids) m (Either ErrorType ())
-> (Either ErrorType ()
    -> m (Either ErrorType (RecipientId, RecipientId)))
-> m (Either ErrorType (RecipientId, RecipientId))
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \case
                Left ErrorType
DUPLICATE_ -> Int -> m (Either ErrorType (RecipientId, RecipientId))
addQueueRetry (Int -> m (Either ErrorType (RecipientId, RecipientId)))
-> Int -> m (Either ErrorType (RecipientId, RecipientId))
forall a b. (a -> b) -> a -> b
$ Int
n Int -> Int -> Int
forall a. Num a => a -> a -> a
- Int
1
                Left ErrorType
e -> Either ErrorType (RecipientId, RecipientId)
-> m (Either ErrorType (RecipientId, RecipientId))
forall (m :: * -> *) a. Monad m => a -> m a
return (Either ErrorType (RecipientId, RecipientId)
 -> m (Either ErrorType (RecipientId, RecipientId)))
-> Either ErrorType (RecipientId, RecipientId)
-> m (Either ErrorType (RecipientId, RecipientId))
forall a b. (a -> b) -> a -> b
$ ErrorType -> Either ErrorType (RecipientId, RecipientId)
forall a b. a -> Either a b
Left ErrorType
e
                Right ()
_ -> Either ErrorType (RecipientId, RecipientId)
-> m (Either ErrorType (RecipientId, RecipientId))
forall (m :: * -> *) a. Monad m => a -> m a
return (Either ErrorType (RecipientId, RecipientId)
 -> m (Either ErrorType (RecipientId, RecipientId)))
-> Either ErrorType (RecipientId, RecipientId)
-> m (Either ErrorType (RecipientId, RecipientId))
forall a b. (a -> b) -> a -> b
$ (RecipientId, RecipientId)
-> Either ErrorType (RecipientId, RecipientId)
forall a b. b -> Either a b
Right (RecipientId, RecipientId)
ids

            logCreateById :: StoreLog 'WriteMode -> RecipientId -> IO ()
            logCreateById :: StoreLog 'WriteMode -> RecipientId -> IO ()
logCreateById StoreLog 'WriteMode
s RecipientId
rId =
              STM (Either ErrorType QueueRec) -> IO (Either ErrorType QueueRec)
forall (m :: * -> *) a. MonadIO m => STM a -> m a
atomically (QueueStore
-> SParty 'Recipient
-> RecipientId
-> STM (Either ErrorType QueueRec)
forall s (m :: * -> *) (a :: Party).
MonadQueueStore s m =>
s -> SParty a -> RecipientId -> m (Either ErrorType QueueRec)
getQueue QueueStore
st SParty 'Recipient
SRecipient RecipientId
rId) IO (Either ErrorType QueueRec)
-> (Either ErrorType QueueRec -> IO ()) -> IO ()
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \case
                Right QueueRec
q -> StoreLog 'WriteMode -> QueueRec -> IO ()
logCreateQueue StoreLog 'WriteMode
s QueueRec
q
                Either ErrorType QueueRec
_ -> () -> IO ()
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()

            getIds :: m (RecipientId, SenderId)
            getIds :: m (RecipientId, RecipientId)
getIds = do
              Int
n <- (Env -> Int) -> m Int
forall r (m :: * -> *) a. MonadReader r m => (r -> a) -> m a
asks ((Env -> Int) -> m Int) -> (Env -> Int) -> m Int
forall a b. (a -> b) -> a -> b
$ ServerConfig -> Int
queueIdBytes (ServerConfig -> Int) -> (Env -> ServerConfig) -> Env -> Int
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Env -> ServerConfig
config
              (RecipientId -> RecipientId -> (RecipientId, RecipientId))
-> m RecipientId -> m RecipientId -> m (RecipientId, RecipientId)
forall (m :: * -> *) a1 a2 r.
Monad m =>
(a1 -> a2 -> r) -> m a1 -> m a2 -> m r
liftM2 (,) (Int -> m RecipientId
forall (m :: * -> *).
(MonadUnliftIO m, MonadReader Env m) =>
Int -> m RecipientId
randomId Int
n) (Int -> m RecipientId
forall (m :: * -> *).
(MonadUnliftIO m, MonadReader Env m) =>
Int -> m RecipientId
randomId Int
n)

        secureQueue_ :: QueueStore -> SenderPublicKey -> m Transmission
        secureQueue_ :: QueueStore -> RecipientPublicKey -> m Transmission
secureQueue_ QueueStore
st RecipientPublicKey
sKey = do
          (StoreLog 'WriteMode -> IO ()) -> m ()
forall (m :: * -> *) a.
(MonadUnliftIO m, MonadReader Env m) =>
(StoreLog 'WriteMode -> IO a) -> m ()
withLog ((StoreLog 'WriteMode -> IO ()) -> m ())
-> (StoreLog 'WriteMode -> IO ()) -> m ()
forall a b. (a -> b) -> a -> b
$ \StoreLog 'WriteMode
s -> StoreLog 'WriteMode -> RecipientId -> RecipientPublicKey -> IO ()
logSecureQueue StoreLog 'WriteMode
s RecipientId
queueId RecipientPublicKey
sKey
          STM Transmission -> m Transmission
forall (m :: * -> *) a. MonadIO m => STM a -> m a
atomically (STM Transmission -> m Transmission)
-> (STM (Command 'Broker) -> STM Transmission)
-> STM (Command 'Broker)
-> m Transmission
forall b c a. (b -> c) -> (a -> b) -> a -> c
. RecipientPublicKey -> STM (Command 'Broker) -> STM Transmission
forall (m' :: * -> *).
Monad m' =>
RecipientPublicKey -> m' (Command 'Broker) -> m' Transmission
checkKeySize RecipientPublicKey
sKey (STM (Command 'Broker) -> m Transmission)
-> STM (Command 'Broker) -> m Transmission
forall a b. (a -> b) -> a -> b
$ (ErrorType -> Command 'Broker)
-> (() -> Command 'Broker)
-> Either ErrorType ()
-> Command 'Broker
forall a c b. (a -> c) -> (b -> c) -> Either a b -> c
either ErrorType -> Command 'Broker
ERR (Command 'Broker -> () -> Command 'Broker
forall a b. a -> b -> a
const Command 'Broker
OK) (Either ErrorType () -> Command 'Broker)
-> STM (Either ErrorType ()) -> STM (Command 'Broker)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> QueueStore
-> RecipientId -> RecipientPublicKey -> STM (Either ErrorType ())
forall s (m :: * -> *).
MonadQueueStore s m =>
s -> RecipientId -> RecipientPublicKey -> m (Either ErrorType ())
secureQueue QueueStore
st RecipientId
queueId RecipientPublicKey
sKey

        checkKeySize :: Monad m' => C.PublicKey -> m' (Command 'Broker) -> m' Transmission
        checkKeySize :: RecipientPublicKey -> m' (Command 'Broker) -> m' Transmission
checkKeySize RecipientPublicKey
key m' (Command 'Broker)
action =
          CorrId -> RecipientId -> Command 'Broker -> Transmission
mkResp CorrId
corrId RecipientId
queueId
            (Command 'Broker -> Transmission)
-> m' (Command 'Broker) -> m' Transmission
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> if Int -> Bool
C.validKeySize (Int -> Bool) -> Int -> Bool
forall a b. (a -> b) -> a -> b
$ RecipientPublicKey -> Int
C.publicKeySize RecipientPublicKey
key
              then m' (Command 'Broker)
action
              else Command 'Broker -> m' (Command 'Broker)
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Command 'Broker -> m' (Command 'Broker))
-> (ErrorType -> Command 'Broker)
-> ErrorType
-> m' (Command 'Broker)
forall b c a. (b -> c) -> (a -> b) -> a -> c
. ErrorType -> Command 'Broker
ERR (ErrorType -> m' (Command 'Broker))
-> ErrorType -> m' (Command 'Broker)
forall a b. (a -> b) -> a -> b
$ CommandError -> ErrorType
CMD CommandError
KEY_SIZE

        suspendQueue_ :: QueueStore -> m Transmission
        suspendQueue_ :: QueueStore -> m Transmission
suspendQueue_ QueueStore
st = do
          (StoreLog 'WriteMode -> IO ()) -> m ()
forall (m :: * -> *) a.
(MonadUnliftIO m, MonadReader Env m) =>
(StoreLog 'WriteMode -> IO a) -> m ()
withLog (StoreLog 'WriteMode -> RecipientId -> IO ()
`logDeleteQueue` RecipientId
queueId)
          Either ErrorType () -> Transmission
okResp (Either ErrorType () -> Transmission)
-> m (Either ErrorType ()) -> m Transmission
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> STM (Either ErrorType ()) -> m (Either ErrorType ())
forall (m :: * -> *) a. MonadIO m => STM a -> m a
atomically (QueueStore -> RecipientId -> STM (Either ErrorType ())
forall s (m :: * -> *).
MonadQueueStore s m =>
s -> RecipientId -> m (Either ErrorType ())
suspendQueue QueueStore
st RecipientId
queueId)

        subscribeQueue :: RecipientId -> m Transmission
        subscribeQueue :: RecipientId -> m Transmission
subscribeQueue RecipientId
rId =
          STM Sub -> m Sub
forall (m :: * -> *) a. MonadIO m => STM a -> m a
atomically (RecipientId -> STM Sub
getSubscription RecipientId
rId) m Sub -> (Sub -> m Transmission) -> m Transmission
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= (MsgQueue -> STM (Maybe Message))
-> RecipientId -> Sub -> m Transmission
deliverMessage MsgQueue -> STM (Maybe Message)
forall q (m :: * -> *). MonadMsgQueue q m => q -> m (Maybe Message)
tryPeekMsg RecipientId
rId

        getSubscription :: RecipientId -> STM Sub
        getSubscription :: RecipientId -> STM Sub
getSubscription RecipientId
rId = do
          Map RecipientId Sub
subs <- TVar (Map RecipientId Sub) -> STM (Map RecipientId Sub)
forall a. TVar a -> STM a
readTVar TVar (Map RecipientId Sub)
subscriptions
          case RecipientId -> Map RecipientId Sub -> Maybe Sub
forall k a. Ord k => k -> Map k a -> Maybe a
M.lookup RecipientId
rId Map RecipientId Sub
subs of
            Just Sub
s -> TMVar () -> STM (Maybe ())
forall a. TMVar a -> STM (Maybe a)
tryTakeTMVar (Sub -> TMVar ()
delivered Sub
s) STM (Maybe ()) -> Sub -> STM Sub
forall (f :: * -> *) a b. Functor f => f a -> b -> f b
$> Sub
s
            Maybe Sub
Nothing -> do
              TBQueue (RecipientId, Client) -> (RecipientId, Client) -> STM ()
forall a. TBQueue a -> a -> STM ()
writeTBQueue TBQueue (RecipientId, Client)
subscribedQ (RecipientId
rId, Client
clnt)
              Sub
s <- STM Sub
newSubscription
              TVar (Map RecipientId Sub) -> Map RecipientId Sub -> STM ()
forall a. TVar a -> a -> STM ()
writeTVar TVar (Map RecipientId Sub)
subscriptions (Map RecipientId Sub -> STM ()) -> Map RecipientId Sub -> STM ()
forall a b. (a -> b) -> a -> b
$ RecipientId -> Sub -> Map RecipientId Sub -> Map RecipientId Sub
forall k a. Ord k => k -> a -> Map k a -> Map k a
M.insert RecipientId
rId Sub
s Map RecipientId Sub
subs
              Sub -> STM Sub
forall (m :: * -> *) a. Monad m => a -> m a
return Sub
s

        unsubscribeQueue :: m ()
        unsubscribeQueue :: m ()
unsubscribeQueue = do
          Maybe Sub
sub <- STM (Maybe Sub) -> m (Maybe Sub)
forall (m :: * -> *) a. MonadIO m => STM a -> m a
atomically (STM (Maybe Sub) -> m (Maybe Sub))
-> ((Map RecipientId Sub -> (Maybe Sub, Map RecipientId Sub))
    -> STM (Maybe Sub))
-> (Map RecipientId Sub -> (Maybe Sub, Map RecipientId Sub))
-> m (Maybe Sub)
forall b c a. (b -> c) -> (a -> b) -> a -> c
. TVar (Map RecipientId Sub)
-> (Map RecipientId Sub -> (Maybe Sub, Map RecipientId Sub))
-> STM (Maybe Sub)
forall s a. TVar s -> (s -> (a, s)) -> STM a
stateTVar TVar (Map RecipientId Sub)
subscriptions ((Map RecipientId Sub -> (Maybe Sub, Map RecipientId Sub))
 -> m (Maybe Sub))
-> (Map RecipientId Sub -> (Maybe Sub, Map RecipientId Sub))
-> m (Maybe Sub)
forall a b. (a -> b) -> a -> b
$
            \Map RecipientId Sub
cs -> (RecipientId -> Map RecipientId Sub -> Maybe Sub
forall k a. Ord k => k -> Map k a -> Maybe a
M.lookup RecipientId
queueId Map RecipientId Sub
cs, RecipientId -> Map RecipientId Sub -> Map RecipientId Sub
forall k a. Ord k => k -> Map k a -> Map k a
M.delete RecipientId
queueId Map RecipientId Sub
cs)
          (Sub -> m ()) -> Maybe Sub -> m ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
(a -> m b) -> t a -> m ()
mapM_ Sub -> m ()
forall (m :: * -> *). MonadUnliftIO m => Sub -> m ()
cancelSub Maybe Sub
sub

        acknowledgeMsg :: m Transmission
        acknowledgeMsg :: m Transmission
acknowledgeMsg =
          STM (Maybe (Maybe Sub)) -> m (Maybe (Maybe Sub))
forall (m :: * -> *) a. MonadIO m => STM a -> m a
atomically (RecipientId -> (Sub -> STM (Maybe Sub)) -> STM (Maybe (Maybe Sub))
forall a. RecipientId -> (Sub -> STM a) -> STM (Maybe a)
withSub RecipientId
queueId ((Sub -> STM (Maybe Sub)) -> STM (Maybe (Maybe Sub)))
-> (Sub -> STM (Maybe Sub)) -> STM (Maybe (Maybe Sub))
forall a b. (a -> b) -> a -> b
$ \Sub
s -> Sub -> () -> Sub
forall a b. a -> b -> a
const Sub
s (() -> Sub) -> STM (Maybe ()) -> STM (Maybe Sub)
forall (f :: * -> *) (g :: * -> *) a b.
(Functor f, Functor g) =>
(a -> b) -> f (g a) -> f (g b)
<$$> TMVar () -> STM (Maybe ())
forall a. TMVar a -> STM (Maybe a)
tryTakeTMVar (Sub -> TMVar ()
delivered Sub
s))
            m (Maybe (Maybe Sub))
-> (Maybe (Maybe Sub) -> m Transmission) -> m Transmission
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \case
              Just (Just Sub
s) -> (MsgQueue -> STM (Maybe Message))
-> RecipientId -> Sub -> m Transmission
deliverMessage MsgQueue -> STM (Maybe Message)
forall q (m :: * -> *). MonadMsgQueue q m => q -> m (Maybe Message)
tryDelPeekMsg RecipientId
queueId Sub
s
              Maybe (Maybe Sub)
_ -> Transmission -> m Transmission
forall (m :: * -> *) a. Monad m => a -> m a
return (Transmission -> m Transmission) -> Transmission -> m Transmission
forall a b. (a -> b) -> a -> b
$ ErrorType -> Transmission
err ErrorType
NO_MSG

        withSub :: RecipientId -> (Sub -> STM a) -> STM (Maybe a)
        withSub :: RecipientId -> (Sub -> STM a) -> STM (Maybe a)
withSub RecipientId
rId Sub -> STM a
f = TVar (Map RecipientId Sub) -> STM (Map RecipientId Sub)
forall a. TVar a -> STM a
readTVar TVar (Map RecipientId Sub)
subscriptions STM (Map RecipientId Sub)
-> (Map RecipientId Sub -> STM (Maybe a)) -> STM (Maybe a)
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= (Sub -> STM a) -> Maybe Sub -> STM (Maybe a)
forall (t :: * -> *) (m :: * -> *) a b.
(Traversable t, Monad m) =>
(a -> m b) -> t a -> m (t b)
mapM Sub -> STM a
f (Maybe Sub -> STM (Maybe a))
-> (Map RecipientId Sub -> Maybe Sub)
-> Map RecipientId Sub
-> STM (Maybe a)
forall b c a. (b -> c) -> (a -> b) -> a -> c
. RecipientId -> Map RecipientId Sub -> Maybe Sub
forall k a. Ord k => k -> Map k a -> Maybe a
M.lookup RecipientId
rId

        sendMessage :: QueueStore -> MsgBody -> m Transmission
        sendMessage :: QueueStore -> RecipientId -> m Transmission
sendMessage QueueStore
st RecipientId
msgBody = do
          Either ErrorType QueueRec
qr <- STM (Either ErrorType QueueRec) -> m (Either ErrorType QueueRec)
forall (m :: * -> *) a. MonadIO m => STM a -> m a
atomically (STM (Either ErrorType QueueRec) -> m (Either ErrorType QueueRec))
-> STM (Either ErrorType QueueRec) -> m (Either ErrorType QueueRec)
forall a b. (a -> b) -> a -> b
$ QueueStore
-> SParty 'Sender -> RecipientId -> STM (Either ErrorType QueueRec)
forall s (m :: * -> *) (a :: Party).
MonadQueueStore s m =>
s -> SParty a -> RecipientId -> m (Either ErrorType QueueRec)
getQueue QueueStore
st SParty 'Sender
SSender RecipientId
queueId
          (ErrorType -> m Transmission)
-> (QueueRec -> m Transmission)
-> Either ErrorType QueueRec
-> m Transmission
forall a c b. (a -> c) -> (b -> c) -> Either a b -> c
either (Transmission -> m Transmission
forall (m :: * -> *) a. Monad m => a -> m a
return (Transmission -> m Transmission)
-> (ErrorType -> Transmission) -> ErrorType -> m Transmission
forall b c a. (b -> c) -> (a -> b) -> a -> c
. ErrorType -> Transmission
err) QueueRec -> m Transmission
storeMessage Either ErrorType QueueRec
qr
          where
            mkMessage :: m Message
            mkMessage :: m Message
mkMessage = do
              RecipientId
msgId <- (Env -> Int) -> m Int
forall r (m :: * -> *) a. MonadReader r m => (r -> a) -> m a
asks (ServerConfig -> Int
msgIdBytes (ServerConfig -> Int) -> (Env -> ServerConfig) -> Env -> Int
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Env -> ServerConfig
config) m Int -> (Int -> m RecipientId) -> m RecipientId
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= Int -> m RecipientId
forall (m :: * -> *).
(MonadUnliftIO m, MonadReader Env m) =>
Int -> m RecipientId
randomId
              UTCTime
ts <- IO UTCTime -> m UTCTime
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO IO UTCTime
getCurrentTime
              Message -> m Message
forall (m :: * -> *) a. Monad m => a -> m a
return (Message -> m Message) -> Message -> m Message
forall a b. (a -> b) -> a -> b
$ Message :: RecipientId -> UTCTime -> RecipientId -> Message
Message {RecipientId
msgId :: RecipientId
msgId :: RecipientId
msgId, UTCTime
ts :: UTCTime
ts :: UTCTime
ts, RecipientId
msgBody :: RecipientId
msgBody :: RecipientId
msgBody}

            storeMessage :: QueueRec -> m Transmission
            storeMessage :: QueueRec -> m Transmission
storeMessage QueueRec
qr = case QueueRec -> QueueStatus
status QueueRec
qr of
              QueueStatus
QueueOff -> Transmission -> m Transmission
forall (m :: * -> *) a. Monad m => a -> m a
return (Transmission -> m Transmission) -> Transmission -> m Transmission
forall a b. (a -> b) -> a -> b
$ ErrorType -> Transmission
err ErrorType
AUTH
              QueueStatus
QueueActive -> do
                STMMsgStore
ms <- (Env -> STMMsgStore) -> m STMMsgStore
forall r (m :: * -> *) a. MonadReader r m => (r -> a) -> m a
asks Env -> STMMsgStore
msgStore
                Message
msg <- m Message
mkMessage
                Natural
quota <- (Env -> Natural) -> m Natural
forall r (m :: * -> *) a. MonadReader r m => (r -> a) -> m a
asks ((Env -> Natural) -> m Natural) -> (Env -> Natural) -> m Natural
forall a b. (a -> b) -> a -> b
$ ServerConfig -> Natural
msgQueueQuota (ServerConfig -> Natural)
-> (Env -> ServerConfig) -> Env -> Natural
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Env -> ServerConfig
config
                STM Transmission -> m Transmission
forall (m :: * -> *) a. MonadIO m => STM a -> m a
atomically (STM Transmission -> m Transmission)
-> STM Transmission -> m Transmission
forall a b. (a -> b) -> a -> b
$ do
                  MsgQueue
q <- STMMsgStore -> RecipientId -> Natural -> STM MsgQueue
forall s q (m :: * -> *).
MonadMsgStore s q m =>
s -> RecipientId -> Natural -> m q
getMsgQueue STMMsgStore
ms (QueueRec -> RecipientId
recipientId QueueRec
qr) Natural
quota
                  MsgQueue -> STM Bool
forall q (m :: * -> *). MonadMsgQueue q m => q -> m Bool
isFull MsgQueue
q STM Bool -> (Bool -> STM Transmission) -> STM Transmission
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \case
                    Bool
False -> MsgQueue -> Message -> STM ()
forall q (m :: * -> *). MonadMsgQueue q m => q -> Message -> m ()
writeMsg MsgQueue
q Message
msg STM () -> Transmission -> STM Transmission
forall (f :: * -> *) a b. Functor f => f a -> b -> f b
$> Transmission
ok
                    Bool
True -> Transmission -> STM Transmission
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Transmission -> STM Transmission)
-> Transmission -> STM Transmission
forall a b. (a -> b) -> a -> b
$ ErrorType -> Transmission
err ErrorType
QUOTA

        deliverMessage :: (MsgQueue -> STM (Maybe Message)) -> RecipientId -> Sub -> m Transmission
        deliverMessage :: (MsgQueue -> STM (Maybe Message))
-> RecipientId -> Sub -> m Transmission
deliverMessage MsgQueue -> STM (Maybe Message)
tryPeek RecipientId
rId = \case
          Sub {$sel:subThread:Sub :: Sub -> SubscriptionThread
subThread = SubscriptionThread
NoSub} -> do
            STMMsgStore
ms <- (Env -> STMMsgStore) -> m STMMsgStore
forall r (m :: * -> *) a. MonadReader r m => (r -> a) -> m a
asks Env -> STMMsgStore
msgStore
            Natural
quota <- (Env -> Natural) -> m Natural
forall r (m :: * -> *) a. MonadReader r m => (r -> a) -> m a
asks ((Env -> Natural) -> m Natural) -> (Env -> Natural) -> m Natural
forall a b. (a -> b) -> a -> b
$ ServerConfig -> Natural
msgQueueQuota (ServerConfig -> Natural)
-> (Env -> ServerConfig) -> Env -> Natural
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Env -> ServerConfig
config
            MsgQueue
q <- STM MsgQueue -> m MsgQueue
forall (m :: * -> *) a. MonadIO m => STM a -> m a
atomically (STM MsgQueue -> m MsgQueue) -> STM MsgQueue -> m MsgQueue
forall a b. (a -> b) -> a -> b
$ STMMsgStore -> RecipientId -> Natural -> STM MsgQueue
forall s q (m :: * -> *).
MonadMsgStore s q m =>
s -> RecipientId -> Natural -> m q
getMsgQueue STMMsgStore
ms RecipientId
rId Natural
quota
            STM (Maybe Message) -> m (Maybe Message)
forall (m :: * -> *) a. MonadIO m => STM a -> m a
atomically (MsgQueue -> STM (Maybe Message)
tryPeek MsgQueue
q) m (Maybe Message)
-> (Maybe Message -> m Transmission) -> m Transmission
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \case
              Maybe Message
Nothing -> MsgQueue -> m ()
forkSub MsgQueue
q m () -> Transmission -> m Transmission
forall (f :: * -> *) a b. Functor f => f a -> b -> f b
$> Transmission
ok
              Just Message
msg -> STM (Maybe Bool) -> m (Maybe Bool)
forall (m :: * -> *) a. MonadIO m => STM a -> m a
atomically STM (Maybe Bool)
setDelivered m (Maybe Bool) -> Transmission -> m Transmission
forall (f :: * -> *) a b. Functor f => f a -> b -> f b
$> CorrId -> RecipientId -> Command 'Broker -> Transmission
mkResp CorrId
corrId RecipientId
rId (Message -> Command 'Broker
msgCmd Message
msg)
          Sub
_ -> Transmission -> m Transmission
forall (m :: * -> *) a. Monad m => a -> m a
return Transmission
ok
          where
            forkSub :: MsgQueue -> m ()
            forkSub :: MsgQueue -> m ()
forkSub MsgQueue
q = do
              STM () -> m ()
forall (m :: * -> *) a. MonadIO m => STM a -> m a
atomically (STM () -> m ())
-> ((Sub -> Sub) -> STM ()) -> (Sub -> Sub) -> m ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (Sub -> Sub) -> STM ()
setSub ((Sub -> Sub) -> m ()) -> (Sub -> Sub) -> m ()
forall a b. (a -> b) -> a -> b
$ \Sub
s -> Sub
s {$sel:subThread:Sub :: SubscriptionThread
subThread = SubscriptionThread
SubPending}
              ThreadId
t <- m () -> m ThreadId
forall (m :: * -> *). MonadUnliftIO m => m () -> m ThreadId
forkIO (m () -> m ThreadId) -> m () -> m ThreadId
forall a b. (a -> b) -> a -> b
$ MsgQueue -> m ()
subscriber MsgQueue
q
              STM () -> m ()
forall (m :: * -> *) a. MonadIO m => STM a -> m a
atomically (STM () -> m ())
-> ((Sub -> Sub) -> STM ()) -> (Sub -> Sub) -> m ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (Sub -> Sub) -> STM ()
setSub ((Sub -> Sub) -> m ()) -> (Sub -> Sub) -> m ()
forall a b. (a -> b) -> a -> b
$ \case
                s :: Sub
s@Sub {$sel:subThread:Sub :: Sub -> SubscriptionThread
subThread = SubscriptionThread
SubPending} -> Sub
s {$sel:subThread:Sub :: SubscriptionThread
subThread = ThreadId -> SubscriptionThread
SubThread ThreadId
t}
                Sub
s -> Sub
s

            subscriber :: MsgQueue -> m ()
            subscriber :: MsgQueue -> m ()
subscriber MsgQueue
q = STM () -> m ()
forall (m :: * -> *) a. MonadIO m => STM a -> m a
atomically (STM () -> m ()) -> STM () -> m ()
forall a b. (a -> b) -> a -> b
$ do
              Message
msg <- MsgQueue -> STM Message
forall q (m :: * -> *). MonadMsgQueue q m => q -> m Message
peekMsg MsgQueue
q
              TBQueue Transmission -> Transmission -> STM ()
forall a. TBQueue a -> a -> STM ()
writeTBQueue TBQueue Transmission
sndQ (Transmission -> STM ()) -> Transmission -> STM ()
forall a b. (a -> b) -> a -> b
$ CorrId -> RecipientId -> Command 'Broker -> Transmission
mkResp (RecipientId -> CorrId
CorrId RecipientId
"") RecipientId
rId (Message -> Command 'Broker
msgCmd Message
msg)
              (Sub -> Sub) -> STM ()
setSub (\Sub
s -> Sub
s {$sel:subThread:Sub :: SubscriptionThread
subThread = SubscriptionThread
NoSub})
              STM (Maybe Bool) -> STM ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void STM (Maybe Bool)
setDelivered

            setSub :: (Sub -> Sub) -> STM ()
            setSub :: (Sub -> Sub) -> STM ()
setSub Sub -> Sub
f = TVar (Map RecipientId Sub)
-> (Map RecipientId Sub -> Map RecipientId Sub) -> STM ()
forall a. TVar a -> (a -> a) -> STM ()
modifyTVar TVar (Map RecipientId Sub)
subscriptions ((Map RecipientId Sub -> Map RecipientId Sub) -> STM ())
-> (Map RecipientId Sub -> Map RecipientId Sub) -> STM ()
forall a b. (a -> b) -> a -> b
$ (Sub -> Sub)
-> RecipientId -> Map RecipientId Sub -> Map RecipientId Sub
forall k a. Ord k => (a -> a) -> k -> Map k a -> Map k a
M.adjust Sub -> Sub
f RecipientId
rId

            setDelivered :: STM (Maybe Bool)
            setDelivered :: STM (Maybe Bool)
setDelivered = RecipientId -> (Sub -> STM Bool) -> STM (Maybe Bool)
forall a. RecipientId -> (Sub -> STM a) -> STM (Maybe a)
withSub RecipientId
rId ((Sub -> STM Bool) -> STM (Maybe Bool))
-> (Sub -> STM Bool) -> STM (Maybe Bool)
forall a b. (a -> b) -> a -> b
$ \Sub
s -> TMVar () -> () -> STM Bool
forall a. TMVar a -> a -> STM Bool
tryPutTMVar (Sub -> TMVar ()
delivered Sub
s) ()

        delQueueAndMsgs :: QueueStore -> m Transmission
        delQueueAndMsgs :: QueueStore -> m Transmission
delQueueAndMsgs QueueStore
st = do
          (StoreLog 'WriteMode -> IO ()) -> m ()
forall (m :: * -> *) a.
(MonadUnliftIO m, MonadReader Env m) =>
(StoreLog 'WriteMode -> IO a) -> m ()
withLog (StoreLog 'WriteMode -> RecipientId -> IO ()
`logDeleteQueue` RecipientId
queueId)
          STMMsgStore
ms <- (Env -> STMMsgStore) -> m STMMsgStore
forall r (m :: * -> *) a. MonadReader r m => (r -> a) -> m a
asks Env -> STMMsgStore
msgStore
          STM Transmission -> m Transmission
forall (m :: * -> *) a. MonadIO m => STM a -> m a
atomically (STM Transmission -> m Transmission)
-> STM Transmission -> m Transmission
forall a b. (a -> b) -> a -> b
$
            QueueStore -> RecipientId -> STM (Either ErrorType ())
forall s (m :: * -> *).
MonadQueueStore s m =>
s -> RecipientId -> m (Either ErrorType ())
deleteQueue QueueStore
st RecipientId
queueId STM (Either ErrorType ())
-> (Either ErrorType () -> STM Transmission) -> STM Transmission
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \case
              Left ErrorType
e -> Transmission -> STM Transmission
forall (m :: * -> *) a. Monad m => a -> m a
return (Transmission -> STM Transmission)
-> Transmission -> STM Transmission
forall a b. (a -> b) -> a -> b
$ ErrorType -> Transmission
err ErrorType
e
              Right ()
_ -> STMMsgStore -> RecipientId -> STM ()
forall s q (m :: * -> *).
MonadMsgStore s q m =>
s -> RecipientId -> m ()
delMsgQueue STMMsgStore
ms RecipientId
queueId STM () -> Transmission -> STM Transmission
forall (f :: * -> *) a b. Functor f => f a -> b -> f b
$> Transmission
ok

        ok :: Transmission
        ok :: Transmission
ok = CorrId -> RecipientId -> Command 'Broker -> Transmission
mkResp CorrId
corrId RecipientId
queueId Command 'Broker
OK

        err :: ErrorType -> Transmission
        err :: ErrorType -> Transmission
err = CorrId -> RecipientId -> Command 'Broker -> Transmission
mkResp CorrId
corrId RecipientId
queueId (Command 'Broker -> Transmission)
-> (ErrorType -> Command 'Broker) -> ErrorType -> Transmission
forall b c a. (b -> c) -> (a -> b) -> a -> c
. ErrorType -> Command 'Broker
ERR

        okResp :: Either ErrorType () -> Transmission
        okResp :: Either ErrorType () -> Transmission
okResp = (ErrorType -> Transmission)
-> (() -> Transmission) -> Either ErrorType () -> Transmission
forall a c b. (a -> c) -> (b -> c) -> Either a b -> c
either ErrorType -> Transmission
err ((() -> Transmission) -> Either ErrorType () -> Transmission)
-> (() -> Transmission) -> Either ErrorType () -> Transmission
forall a b. (a -> b) -> a -> b
$ Transmission -> () -> Transmission
forall a b. a -> b -> a
const Transmission
ok

        msgCmd :: Message -> Command 'Broker
        msgCmd :: Message -> Command 'Broker
msgCmd Message {RecipientId
msgId :: RecipientId
msgId :: Message -> RecipientId
msgId, UTCTime
ts :: UTCTime
ts :: Message -> UTCTime
ts, RecipientId
msgBody :: RecipientId
msgBody :: Message -> RecipientId
msgBody} = RecipientId -> UTCTime -> RecipientId -> Command 'Broker
MSG RecipientId
msgId UTCTime
ts RecipientId
msgBody

withLog :: (MonadUnliftIO m, MonadReader Env m) => (StoreLog 'WriteMode -> IO a) -> m ()
withLog :: (StoreLog 'WriteMode -> IO a) -> m ()
withLog StoreLog 'WriteMode -> IO a
action = do
  Env
env <- m Env
forall r (m :: * -> *). MonadReader r m => m r
ask
  IO () -> m ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ())
-> (Maybe (StoreLog 'WriteMode) -> IO ())
-> Maybe (StoreLog 'WriteMode)
-> m ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (StoreLog 'WriteMode -> IO a)
-> Maybe (StoreLog 'WriteMode) -> IO ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
(a -> m b) -> t a -> m ()
mapM_ StoreLog 'WriteMode -> IO a
action (Maybe (StoreLog 'WriteMode) -> m ())
-> Maybe (StoreLog 'WriteMode) -> m ()
forall a b. (a -> b) -> a -> b
$ Env -> Maybe (StoreLog 'WriteMode)
storeLog (Env
env :: Env)

randomId :: (MonadUnliftIO m, MonadReader Env m) => Int -> m Encoded
randomId :: Int -> m RecipientId
randomId Int
n = do
  TVar ChaChaDRG
gVar <- (Env -> TVar ChaChaDRG) -> m (TVar ChaChaDRG)
forall r (m :: * -> *) a. MonadReader r m => (r -> a) -> m a
asks Env -> TVar ChaChaDRG
idsDrg
  STM RecipientId -> m RecipientId
forall (m :: * -> *) a. MonadIO m => STM a -> m a
atomically (Int -> TVar ChaChaDRG -> STM RecipientId
randomBytes Int
n TVar ChaChaDRG
gVar)

randomBytes :: Int -> TVar ChaChaDRG -> STM ByteString
randomBytes :: Int -> TVar ChaChaDRG -> STM RecipientId
randomBytes Int
n TVar ChaChaDRG
gVar = do
  ChaChaDRG
g <- TVar ChaChaDRG -> STM ChaChaDRG
forall a. TVar a -> STM a
readTVar TVar ChaChaDRG
gVar
  let (RecipientId
bytes, ChaChaDRG
g') = Int -> ChaChaDRG -> (RecipientId, ChaChaDRG)
forall gen byteArray.
(DRG gen, ByteArray byteArray) =>
Int -> gen -> (byteArray, gen)
randomBytesGenerate Int
n ChaChaDRG
g
  TVar ChaChaDRG -> ChaChaDRG -> STM ()
forall a. TVar a -> a -> STM ()
writeTVar TVar ChaChaDRG
gVar ChaChaDRG
g'
  RecipientId -> STM RecipientId
forall (m :: * -> *) a. Monad m => a -> m a
return RecipientId
bytes