{- Copyright 2016 Joey Hess - - Licensed under the GNU AGPL version 3 or higher. -} module HTTP.RateLimit where import Types.Cost import HTTP import HTTP.ProofOfWork import HTTP.Logger import Tunables import CmdLine (ServerConfig(..)) import Types.Storage import Storage.Local import Servant import Control.Concurrent import Control.Concurrent.STM import Control.Concurrent.TokenBucket import qualified Control.Concurrent.FairRWLock as FairRWLock import Control.Concurrent.Thread.Delay import qualified Data.BloomFilter.Mutable as BloomFilter import qualified Data.BloomFilter.Hash as BloomFilter import Data.BloomFilter.Easy (suggestSizing) import Control.Monad import Control.Monad.ST import Control.Exception.Lifted (bracket) import System.DiskSpace import Data.Maybe import Data.Word import Control.Monad.IO.Class -- | A rate limiter is a series of buckets. Each bucket has a -- successively more difficult proof of work access requirement. -- -- To guard against DOS attacks that reuse the same proof of work, -- bloom filters keep track of RequestIDs that have been used before. data RateLimiter = RateLimiter { buckets :: TMVar [Bucket] , unusedBuckets :: TMVar [Bucket] , fallbackQueue :: FallbackQueue , usedRequestIDs :: BloomFilter , usedRequestIDsOld :: BloomFilter , numUsedRequestIDs :: TMVar Int , requestIDSecret :: RequestIDSecret , requestCounter :: TMVar Integer } type BloomFilter = TMVar (BloomFilter.MBloom RealWorld RequestID) -- | Buckets fill up at a fixed rate, and accessing a bucket -- removes one unit from it. data Bucket = Bucket { tokenBucket :: TokenBucket , proofOfWorkRequired :: Seconds , fillInterval :: Word64 } minFillInterval :: Word64 minFillInterval = 2 * 60 * 1000000 -- 1 token every other minute -- | Size of the bucket. This allows a burst of accesses after an idle -- period, which is especially useful when retrieving keys that were -- split into multiple chunks. However, setting this too high lets clients -- cheaply store lots of data on a server that has been idle for a while, -- which could be an attractive way to abuse keysafe servers. burstSize :: Word64 burstSize = 4 -- 256 kb immediate storage newRateLimiter :: ServerConfig -> Maybe LocalStorageDirectory -> Logger -> IO RateLimiter newRateLimiter cfg storedir logger = do rl <- RateLimiter <$> (newTMVarIO =<< mkbuckets (sdiv maxProofOfWork 2) []) <*> newTMVarIO [] <*> newFallbackQueue <*> mkBloomFilter <*> mkBloomFilter <*> newTMVarIO 0 <*> newRequestIDSecret <*> newTMVarIO 0 _ <- forkIO (adjusterThread cfg storedir rl logger) return rl where -- The last bucket takes half of maxProofOfWork to access, and -- each earlier bucket quarters that time, down to the first bucket, -- which needs no proof of work. This ensures that in the edge case -- where a client keeps getting bumped up to more and more expensive -- buckets, it doesn't need to do more than maxProofOfWork total work. mkbuckets s@(Seconds n) bs | n <= 0 = finalbucket bs | otherwise = do case mkProofOfWorkRequirement s of Nothing -> finalbucket bs Just _ -> do b <- Bucket <$> newTokenBucket <*> pure s <*> pure minFillInterval mkbuckets (sdiv s 4) (b:bs) finalbucket bs = do b <- Bucket <$> newTokenBucket <*> pure (Seconds 0) <*> pure minFillInterval return (b:bs) sdiv (Seconds n) d = Seconds (n / d) mkBloomFilter :: IO BloomFilter mkBloomFilter = do b <- stToIO $ BloomFilter.new (BloomFilter.cheapHashes bloomhashes) bloomsize newTMVarIO b where -- Size the bloom filter to hold 1 million items, with a false -- positive rate of 1 in 100 thousand. This will use around 32 mb -- of memory. (bloomsize, bloomhashes) = suggestSizing bloomMaxSize (1/100000) -- | Maximum number of RequestIDs that can be stored in a bloom filter -- without the false positive rate getting bad. bloomMaxSize :: Int bloomMaxSize = 1000000 -- A request is tried in each bucket in turn which its proof of work allows -- access to, until one is found that accepts it. rateLimit :: POWIdent p => RateLimiter -> Logger -> Maybe ProofOfWork -> p -> Handler a -> Handler (POWGuarded a) rateLimit ratelimiter logger mpow p a = do bs <- getBuckets ratelimiter validrequest <- liftIO $ checkValidRequestID ratelimiter logger mpow if validrequest then go bs else assignWork ratelimiter bs where go [] = fallback ratelimiter logger a go (b:bs) = case mkProofOfWorkRequirement (proofOfWorkRequired b) of Nothing -> checkbucket b bs Just mkreq -> case mpow of Nothing -> assignWork ratelimiter (b:bs) Just pow@(ProofOfWork _ rid) -> if isValidProofOfWork pow (mkreq rid) p then checkbucket b bs else assignWork ratelimiter (b:bs) checkbucket b bs = do allowed <- liftIO $ tokenBucketTryAlloc (tokenBucket b) burstSize (fillInterval b) 1 if allowed then allowRequest ratelimiter a else go bs checkValidRequestID :: RateLimiter -> Logger -> Maybe ProofOfWork -> IO Bool checkValidRequestID _ _ Nothing = return True checkValidRequestID rl logger (Just (ProofOfWork _ rid)) | validRequestID (requestIDSecret rl) rid = do used <- iselem usedRequestIDs oldused <- iselem usedRequestIDsOld if not used && not oldused then do withBloomFilter rl usedRequestIDs (`BloomFilter.insert` rid) checkbloomsize return True else return False | otherwise = return False where iselem f = withBloomFilter rl f (BloomFilter.elem rid) checkbloomsize = do needrot <- atomically $ do n <- takeTMVar (numUsedRequestIDs rl) if n > bloomMaxSize `div` 2 then return (Just n) else do putTMVar (numUsedRequestIDs rl) (n+1) return Nothing handlerotation needrot handlerotation Nothing = return () handlerotation (Just n) = do logStderr logger $ "rotating bloom filters after processing " ++ show n ++ " requests" newused <- mkBloomFilter atomically $ do oldused <- takeTMVar (usedRequestIDs rl) putTMVar (usedRequestIDsOld rl) oldused putTMVar (usedRequestIDs rl) =<< takeTMVar newused putTMVar (numUsedRequestIDs rl) 0 assignWork :: RateLimiter -> [Bucket] -> Handler (POWGuarded a) assignWork ratelimiter bs = case mapMaybe (mkProofOfWorkRequirement . proofOfWorkRequired) bs of [] -> throwError err404 (mkreq:_) -> do rid <- liftIO $ mkRequestID $ requestIDSecret ratelimiter return $ NeedProofOfWork $ mkreq rid withBloomFilter :: RateLimiter -> (RateLimiter -> BloomFilter) -> (BloomFilter.MBloom RealWorld RequestID -> ST RealWorld a) -> IO a withBloomFilter rl field a = do b <- atomically $ readTMVar (field rl) stToIO (a b) getBuckets :: MonadIO m => RateLimiter -> m [Bucket] getBuckets = liftIO . atomically . readTMVar . buckets putBuckets :: MonadIO m => RateLimiter -> [Bucket] -> m () putBuckets rl bs = liftIO $ atomically $ do _ <- takeTMVar (buckets rl) putTMVar (buckets rl) bs -- The fallback queue is used when a client has provided a good enough -- proof of work to access all buckets, but all are empty. -- -- Only a limited number of requests can be in the queue, since they take -- up server memory while blocked, and since too large a queue would stall -- requests for too long. -- -- Once in the queue, requests are run in FIFO order. -- -- A separate bucket is used to rate limit requests in the fallback queue, -- so requests in the queue do not need to contend with requests not in the -- queue. data FallbackQueue = FallbackQueue { fallbackBucket :: TokenBucket , blockedRequestLock :: FairRWLock.RWLock , fallbackQueueSlots :: TMVar Int } newFallbackQueue :: IO FallbackQueue newFallbackQueue = FallbackQueue <$> newTokenBucket <*> FairRWLock.new <*> newTMVarIO 100 fallback :: RateLimiter -> Logger -> Handler a -> Handler (POWGuarded a) fallback ratelimiter logger a = bracket (liftIO addq) (liftIO . removeq) go where q = fallbackQueueSlots (fallbackQueue ratelimiter) addq = liftIO $ atomically $ do n <- takeTMVar q if n <= 0 then do putTMVar q n return False else do putTMVar q (n-1) return True removeq False = return () removeq True = liftIO $ atomically $ do n <- takeTMVar q putTMVar q (n+1) -- tokenBucketWait is not fair, so use the blockedRequestLock -- to get fair FIFO ordering. waitbucket = do logStderr logger "** warning: All token buckets are empty. Delaying request.." FairRWLock.withWrite (blockedRequestLock (fallbackQueue ratelimiter)) $ do -- For simplicity, use the same fillInterval as the -- last bucket in the rate limiter for the fallback -- bucket. bs <- getBuckets ratelimiter case reverse bs of (lastb:_) -> tokenBucketWait (fallbackBucket (fallbackQueue ratelimiter)) burstSize (fillInterval lastb) [] -> return () go False = giveup go True = do liftIO waitbucket allowRequest ratelimiter a giveup = do liftIO $ logStderr logger "** warning: All token buckets are empty and request queue is large; possible DOS attack? Rejected request.." assignWork ratelimiter =<< getBuckets ratelimiter -- | How much data could be stored, in bytes per second, assuming all -- buckets in the rate limiter being constantly drained by requests, -- and all requests store objects. maximumStorageRate :: RateLimiter -> IO Integer maximumStorageRate ratelimiter = do bs <- getBuckets ratelimiter -- The last bucket is counted a second time, because the fallback -- request queue has its own bucket with the same characteristics -- as this bucket. let fallbackb = take 1 (reverse bs) return $ sum $ map calc (bs ++ fallbackb) where storesize = maximum knownObjectSizes calc b = fromIntegral $ (storesize * 1000000) `div` fromIntegral (fillInterval b) describeRateLimiter :: RateLimiter -> IO String describeRateLimiter ratelimiter = do storerate <- maximumStorageRate ratelimiter bs <- getBuckets ratelimiter return $ concat [ "rate limiter buckets: " ++ show bs , " ; maximum allowed storage rate: " , showBytes (storerate * 60 * 60 * 24 * 31) ++ "/month" ] showBytes :: Integer -> String showBytes n | n <= 1024*1024 = show (n `div` 1024) ++ " KiB" | n <= 1024*1024*1024 = show (n `div` (1024 * 1024)) ++ " MiB" | otherwise = show (n `div` (1024 * 1024 * 1024)) ++ " GiB" instance Show Bucket where show b = show (fillInterval b `div` (60 * 1000000)) ++ " Second/Request" ++ " (PoW=" ++ show (proofOfWorkRequired b) ++ ")" increaseDifficulty :: Logger -> RateLimiter -> IO () increaseDifficulty logger ratelimiter = do bs <- getBuckets ratelimiter case bs of [] -> unable (b:[]) | fillInterval b < maxBound `div` 2 -> do -- Make the remaining bucket take longer to fill. let b' = b { fillInterval = fillInterval b * 2 } putBuckets ratelimiter [b'] done | otherwise -> unable (b:rest) -> do -- Remove less expensive to access buckets, -- so that clients have to do some work. -- This is done first to cut off any freeloaders -- that may be abusing the keysafe server. atomically $ do unused <- takeTMVar (unusedBuckets ratelimiter) putTMVar (unusedBuckets ratelimiter) (b:unused) putBuckets ratelimiter rest done where unable = logStderr logger "Unable to increase difficulty any further!" done = do desc <- describeRateLimiter ratelimiter logStdout logger $ "increased difficulty -- " ++ desc -- Should undo the effect of increaseDifficulty. reduceDifficulty :: Logger -> RateLimiter -> IO () reduceDifficulty logger ratelimiter = do bs <- getBuckets ratelimiter case bs of (b:[]) | fillInterval b > minFillInterval -> do let b' = b { fillInterval = fillInterval b `div` 2 } putBuckets ratelimiter [b'] done _ -> do mb <- getunused case mb of Nothing -> unable Just b -> do putBuckets ratelimiter (b:bs) done where getunused = atomically $ do unused <- takeTMVar (unusedBuckets ratelimiter) case unused of (b:bs) -> do putTMVar (unusedBuckets ratelimiter) bs return (Just b) [] -> do putTMVar (unusedBuckets ratelimiter) [] return Nothing unable = return () done = do desc <- describeRateLimiter ratelimiter logStdout logger $ "reduced difficulty -- " ++ desc allowRequest :: RateLimiter -> Handler a -> Handler (POWGuarded a) allowRequest ratelimiter a = do liftIO $ addRequest ratelimiter 1 Result <$> a addRequest :: RateLimiter -> Integer -> IO () addRequest ratelimiter n = liftIO $ atomically $ do v <- takeTMVar c putTMVar c (v + n) where c = requestCounter ratelimiter -- Thread that wakes up periodically and checks the request rate -- against the available disk space. If the disk is filling too quickly, -- the difficulty is increased. adjusterThread :: ServerConfig -> Maybe LocalStorageDirectory -> RateLimiter -> Logger -> IO () adjusterThread cfg storedir ratelimiter logger = forever $ do delay (1000000 * intervalsecs) checkRequestRate cfg storedir ratelimiter logger intervalsecs where intervalsecs = 60*15 checkRequestRate :: ServerConfig -> Maybe LocalStorageDirectory -> RateLimiter -> Logger -> Integer -> IO () checkRequestRate cfg storedir ratelimiter logger intervalsecs = do let storesize = maximum knownObjectSizes n <- liftIO $ atomically $ swapTMVar (requestCounter ratelimiter) 0 let maxstoredinterval = n * fromIntegral storesize let maxstoredthismonth = maxstoredinterval * (intervalsecs * 24*31 `div` (60*60)) freespace <- diskFree <$> localDiskUsage storedir let target = monthsToFillHalfDisk cfg let estimate = if maxstoredthismonth <= 0 then 10000 else freespace `div` maxstoredthismonth `div` 2 logStdout logger $ unlines [ "rate limit check" , " free disk space: " ++ showBytes freespace , " number of requests since last check: " ++ show n , " estimated max incoming data in the next month: " ++ showBytes maxstoredthismonth , " estimate min " ++ show estimate ++ " months to fill half of disk" ] if estimate > target * 2 then reduceDifficulty logger ratelimiter else if estimate < target then increaseDifficulty logger ratelimiter else return ()