module TimerWheel.Internal.Wheel ( Wheel (resolution), create, lenMicros, insert, reap, ) where import Control.Monad (join, replicateM, when) import Data.Array (Array) import qualified Data.Array as Array import Data.IORef import TimerWheel.Internal.Entries (Entries) import qualified TimerWheel.Internal.Entries as Entries import TimerWheel.Internal.Micros (Micros (..)) import qualified TimerWheel.Internal.Micros as Micros import TimerWheel.Internal.Timestamp (Timestamp) import qualified TimerWheel.Internal.Timestamp as Timestamp data Wheel = Wheel { Wheel -> Array Int (IORef Entries) buckets :: {-# UNPACK #-} !(Array Int (IORef Entries)), Wheel -> Micros resolution :: {-# UNPACK #-} !Micros } create :: Int -> Micros -> IO Wheel create :: Int -> Micros -> IO Wheel create Int spokes Micros resolution = do [IORef Entries] refs <- forall (m :: * -> *) a. Applicative m => Int -> m a -> m [a] replicateM Int spokes (forall a. a -> IO (IORef a) newIORef Entries Entries.empty) let buckets :: Array Int (IORef Entries) buckets = forall i e. Ix i => (i, i) -> [e] -> Array i e Array.listArray (Int 0, Int spokes forall a. Num a => a -> a -> a - Int 1) [IORef Entries] refs forall (f :: * -> *) a. Applicative f => a -> f a pure Wheel {Array Int (IORef Entries) buckets :: Array Int (IORef Entries) buckets :: Array Int (IORef Entries) buckets, Micros resolution :: Micros resolution :: Micros resolution} numSpokes :: Wheel -> Int numSpokes :: Wheel -> Int numSpokes Wheel wheel = forall (t :: * -> *) a. Foldable t => t a -> Int length (Wheel -> Array Int (IORef Entries) buckets Wheel wheel) lenMicros :: Wheel -> Micros lenMicros :: Wheel -> Micros lenMicros Wheel wheel = Int -> Micros -> Micros Micros.scale (Wheel -> Int numSpokes Wheel wheel) (Wheel -> Micros resolution Wheel wheel) bucket :: Wheel -> Timestamp -> IORef Entries bucket :: Wheel -> Timestamp -> IORef Entries bucket Wheel wheel Timestamp timestamp = Wheel -> Array Int (IORef Entries) buckets Wheel wheel forall i e. Ix i => Array i e -> i -> e Array.! Wheel -> Timestamp -> Int index Wheel wheel Timestamp timestamp index :: Wheel -> Timestamp -> Int index :: Wheel -> Timestamp -> Int index wheel :: Wheel wheel@Wheel {Micros resolution :: Micros resolution :: Wheel -> Micros resolution} Timestamp timestamp = forall a b. (Integral a, Num b) => a -> b fromIntegral (Micros -> Timestamp -> Word64 Timestamp.epoch Micros resolution Timestamp timestamp) forall a. Integral a => a -> a -> a `rem` Wheel -> Int numSpokes Wheel wheel insert :: Wheel -> Int -> Micros -> IO () -> IO (IO Bool) insert :: Wheel -> Int -> Micros -> IO () -> IO (IO Bool) insert Wheel wheel Int key Micros delay IO () action = do IORef Entries bucketRef <- do Timestamp now <- IO Timestamp Timestamp.now forall (f :: * -> *) a. Applicative f => a -> f a pure (Wheel -> Timestamp -> IORef Entries bucket Wheel wheel (Timestamp now Timestamp -> Micros -> Timestamp `Timestamp.plus` Micros delay)) forall a b. IORef a -> (a -> (a, b)) -> IO b atomicModifyIORef' IORef Entries bucketRef (\Entries entries -> (Entries -> Entries insertEntry Entries entries, ())) forall (f :: * -> *) a. Applicative f => a -> f a pure do forall a b. IORef a -> (a -> (a, b)) -> IO b atomicModifyIORef' IORef Entries bucketRef \Entries entries -> case Int -> Entries -> Maybe Entries Entries.delete Int key Entries entries of Maybe Entries Nothing -> (Entries entries, Bool False) Just Entries entries' -> (Entries entries', Bool True) where insertEntry :: Entries -> Entries insertEntry :: Entries -> Entries insertEntry = Int -> Word64 -> IO () -> Entries -> Entries Entries.insert Int key (Micros -> Word64 unMicros (Micros delay Micros -> Micros -> Micros `Micros.div` Wheel -> Micros lenMicros Wheel wheel)) IO () action reap :: Wheel -> IO a reap :: forall a. Wheel -> IO a reap wheel :: Wheel wheel@Wheel {Array Int (IORef Entries) buckets :: Array Int (IORef Entries) buckets :: Wheel -> Array Int (IORef Entries) buckets, Micros resolution :: Micros resolution :: Wheel -> Micros resolution} = do Timestamp now <- IO Timestamp Timestamp.now let remainingBucketMicros :: Micros remainingBucketMicros = Micros resolution Micros -> Micros -> Micros `Micros.minus` (Timestamp now Timestamp -> Micros -> Micros `Timestamp.rem` Micros resolution) Micros -> IO () Micros.sleep Micros remainingBucketMicros forall a. Timestamp -> Int -> IO a loop (Timestamp now Timestamp -> Micros -> Timestamp `Timestamp.plus` Micros remainingBucketMicros Timestamp -> Micros -> Timestamp `Timestamp.plus` Micros resolution) (Wheel -> Timestamp -> Int index Wheel wheel Timestamp now) where loop :: Timestamp -> Int -> IO a loop :: forall a. Timestamp -> Int -> IO a loop Timestamp nextTime Int i = do forall (m :: * -> *) a. Monad m => m (m a) -> m a join (forall a b. IORef a -> (a -> (a, b)) -> IO b atomicModifyIORef' (Array Int (IORef Entries) buckets forall i e. Ix i => Array i e -> i -> e Array.! Int i) Entries -> (Entries, IO ()) expire) Timestamp afterTime <- IO Timestamp Timestamp.now forall (f :: * -> *). Applicative f => Bool -> f () -> f () when (Timestamp afterTime forall a. Ord a => a -> a -> Bool < Timestamp nextTime) (Micros -> IO () Micros.sleep (Timestamp nextTime Timestamp -> Timestamp -> Micros `Timestamp.minus` Timestamp afterTime)) forall a. Timestamp -> Int -> IO a loop (Timestamp nextTime Timestamp -> Micros -> Timestamp `Timestamp.plus` Micros resolution) ((Int i forall a. Num a => a -> a -> a + Int 1) forall a. Integral a => a -> a -> a `rem` Wheel -> Int numSpokes Wheel wheel) expire :: Entries -> (Entries, IO ()) expire :: Entries -> (Entries, IO ()) expire Entries entries | Entries -> Bool Entries.null Entries entries = (Entries entries, forall (f :: * -> *) a. Applicative f => a -> f a pure ()) | Bool otherwise = (Entries alive, forall (t :: * -> *) (m :: * -> *) a. (Foldable t, Monad m) => t (m a) -> m () sequence_ [IO ()] expired) where ([IO ()] expired, Entries alive) = Entries -> ([IO ()], Entries) Entries.partition Entries entries