module Network.WebSockets.RPC.ACKable
( ackableRPCServer
, ackableRPCClient
, ACKable (..)
) where
import Network.WebSockets.RPC
import Data.UUID (UUID)
import Data.UUID.V4 (nextRandom)
import qualified Data.HashMap.Lazy as HM
import qualified Data.HashSet as HS
import Data.IORef (newIORef, readIORef, writeIORef)
import Data.Hashable (Hashable)
import Control.Applicative ((<|>))
import Control.Monad (when)
import Control.Monad.IO.Class (MonadIO, liftIO)
import Control.Concurrent (threadDelay)
import Control.Concurrent.Async (async)
import qualified Control.Concurrent.Async as Async
import Control.Concurrent.STM (atomically)
import Control.Concurrent.STM.TVar (newTVarIO, readTVarIO, writeTVar, modifyTVar, TVar)
data ACKable owner a = ACKable
{ ackableID :: UUID
, ackableOwner :: owner
, ackableData :: Maybe a
}
ack :: UUID -> owner -> ACKable owner a
ack ackableID ackableOwner = ACKable
{ ackableID
, ackableOwner
, ackableData = Nothing
}
ackableRPCServer :: forall sub sup rep com m owner
. ( MonadIO m
, Eq owner
, Hashable owner
)
=> (forall a. m a -> IO a)
-> owner
-> RPCServer sub sup rep com m
-> RPCServer (ACKable owner sub) (ACKable owner sup) (ACKable owner rep) com m
ackableRPCServer runM serverOwner rpc RPCServerParams{reply,complete} eSubSup = do
replyMailbox <- liftIO $ newTVarIO HM.empty
ownerPending <- liftIO $ newTVarIO (HM.empty :: HM.HashMap owner (HS.HashSet UUID))
let params :: owner -> RPCServerParams rep com m
params clientOwner = RPCServerParams
{ reply = \r -> do
ackableID <- liftIO nextRandom
let op =
reply ACKable
{ ackableID
, ackableOwner = serverOwner
, ackableData = Just r
}
liftIO $ do
expBackoff <- mkBackoff (runM op) $ do
replies <- readTVarIO replyMailbox
atomically $ do
modifyTVar replyMailbox $ HM.delete ackableID
modifyTVar ownerPending $ HM.delete clientOwner
case HM.lookup ackableID replies of
Nothing -> pure ()
Just (_,expBackoff) -> Async.cancel expBackoff
atomically $ do
modifyTVar replyMailbox $ HM.insert ackableID (r, expBackoff)
modifyTVar ownerPending $ HM.insertWith HS.union clientOwner (HS.singleton ackableID)
op
, complete
}
case eSubSup of
Left ACKable{ackableID,ackableOwner,ackableData} ->
case ackableData of
Nothing ->
liftIO $ putStrLn "Somehow received an ACK from a Sub on a server"
Just sub -> do
reply (ack ackableID serverOwner)
rpc (params ackableOwner) (Left sub)
Right ACKable{ackableID,ackableOwner,ackableData} ->
case ackableData of
Nothing -> do
replies <- liftIO $ readTVarIO replyMailbox
owners <- liftIO $ readTVarIO ownerPending
case HM.lookup ackableID replies of
Nothing -> liftIO $ putStrLn $ "Somehow received an ACK that doesn't exist: " ++ show ackableID
Just (_,expBackoff) -> liftIO $ do
Async.cancel expBackoff
atomically $ do
writeTVar replyMailbox $ HM.delete ackableID replies
writeTVar ownerPending $ HM.adjust (HS.delete ackableID) ackableOwner owners
Just sup -> do
reply (ack ackableID serverOwner)
rpc (params ackableOwner) (Right sup)
ackableRPCClient :: forall sub sup rep com m owner
. ( MonadIO m
, Eq owner
, Hashable owner
)
=> (forall a. m a -> IO a)
-> owner
-> RPCClient sub sup rep com m
-> m (RPCClient (ACKable owner sub) (ACKable owner sup) (ACKable owner rep) com m)
ackableRPCClient runM clientOwner RPCClient{subscription,onSubscribe,onReply,onComplete} = do
subscriptionMailbox <- liftIO $ newTVarIO HM.empty
supplyMailbox <- liftIO $ newTVarIO HM.empty
ownerPending <- liftIO $ newTVarIO (HM.empty :: HM.HashMap owner (HS.HashSet UUID))
ackableID <- liftIO nextRandom
let ackParams :: Maybe owner -> RPCClientParams (ACKable owner sup) m -> RPCClientParams sup m
ackParams mOwner RPCClientParams{supply,cancel} = RPCClientParams
{ supply = \s -> do
ackableID <- liftIO nextRandom
let op =
supply ACKable
{ ackableID
, ackableOwner = clientOwner
, ackableData = Just s
}
liftIO $ do
expBackoff <- mkBackoff (runM op) $ do
subscriptions <- readTVarIO subscriptionMailbox
supplies <- readTVarIO supplyMailbox
atomically $ do
modifyTVar subscriptionMailbox $ HM.delete ackableID
modifyTVar supplyMailbox $ HM.delete ackableID
case mOwner of
Nothing -> pure ()
Just serverOwner -> modifyTVar ownerPending $ HM.delete serverOwner
case Left <$> HM.lookup ackableID subscriptions <|> Right <$> HM.lookup ackableID supplies of
Nothing -> pure ()
Just (Left (_,expBackoff)) -> Async.cancel expBackoff
Just (Right (_,expBackoff)) -> Async.cancel expBackoff
atomically $ do
modifyTVar supplyMailbox $ HM.insert ackableID (s,expBackoff)
case mOwner of
Nothing -> pure ()
Just serverOwner -> modifyTVar ownerPending $ HM.insertWith HS.union serverOwner (HS.singleton ackableID)
op
, cancel
}
pure RPCClient
{ subscription = ACKable
{ ackableID
, ackableOwner = clientOwner
, ackableData = Just subscription
}
, onSubscribe = onSubscribe . ackParams Nothing
, onReply = \params ACKable{ackableID,ackableData,ackableOwner} -> case ackableData of
Nothing -> do
subscriptions <- liftIO $ readTVarIO subscriptionMailbox
supplies <- liftIO $ readTVarIO supplyMailbox
owners <- liftIO $ readTVarIO ownerPending
case Left <$> HM.lookup ackableID subscriptions <|> Right <$> HM.lookup ackableID supplies of
Nothing -> pure ()
Just (Left (_,expBackoff)) -> liftIO $ do
atomically $ do
writeTVar subscriptionMailbox $ HM.delete ackableID subscriptions
writeTVar ownerPending $ HM.adjust (HS.delete ackableID) ackableOwner owners
Async.cancel expBackoff
Just (Right (_,expBackoff)) -> liftIO $ do
atomically $ do
writeTVar supplyMailbox $ HM.delete ackableID supplies
writeTVar ownerPending $ HM.adjust (HS.delete ackableID) ackableOwner owners
Async.cancel expBackoff
Just rep -> do
(supply params) (ack ackableID clientOwner)
onReply (ackParams (Just ackableOwner) params) rep
, onComplete
}
second = 1000000
minute = 60 * second
hour = 60 * minute
day = 24 * hour
week = 7 * day
mkBackoff op x = do
spentWaiting <- newIORef (0 :: Int)
async $ do
toWait <- readIORef spentWaiting
writeIORef spentWaiting (toWait + 1)
let toWait' = 2 ^ toWait
soFar = sum $ (\x -> (2 ^ x) * second) <$> [0..toWait]
when (soFar > week) x
threadDelay $ second * (toWait' + 10)
op