-- | A timer wheel: create one with 'create' or 'with', then register one-shot
-- ('register', 'register_') or recurring ('recurring', 'recurring_') @IO@
-- actions that the wheel's background thread runs once their delay elapses.
module TimerWheel
(
-- * Timer wheel
TimerWheel,
-- * Timer wheel configuration
Config (..),
Seconds,
-- * Timer
Timer,
-- * Constructing a timer wheel
create,
with,
-- * Querying a timer wheel
count,
-- * Registering timers in a timer wheel
register,
register_,
recurring,
recurring_,
-- * Canceling timers
cancel,
)
where
import Control.Exception (mask_)
import Data.IORef (atomicModifyIORef')
import Data.Primitive.Array qualified as Array
import Ki qualified
import TimerWheel.Internal.Alarm (Alarm (..))
import TimerWheel.Internal.AlarmBuckets (AlarmBuckets, AlarmId)
import TimerWheel.Internal.AlarmBuckets qualified as AlarmBuckets
import TimerWheel.Internal.Bucket (Bucket)
import TimerWheel.Internal.Bucket qualified as Bucket
import TimerWheel.Internal.Counter (Counter, decrCounter_, incrCounter, incrCounter_, newCounter, readCounter)
import TimerWheel.Internal.Nanoseconds (Nanoseconds (..))
import TimerWheel.Internal.Nanoseconds qualified as Nanoseconds
import TimerWheel.Internal.Prelude
import TimerWheel.Internal.Timer (Timer (..), cancel)
import TimerWheel.Internal.Timestamp (Timestamp)
import TimerWheel.Internal.Timestamp qualified as Timestamp
-- | A timer wheel: a fixed array of alarm buckets plus the bookkeeping needed
-- to register, count and cancel alarms.
data TimerWheel = TimerWheel
  { -- | The bucket array; 'create' allocates one (initially empty) bucket per
    -- spoke.
    buckets :: {-# UNPACK #-} !AlarmBuckets,
    -- | The resolution that is passed along with every bucket insert/delete
    -- and that the reaper thread advances by on each iteration.
    resolution :: {-# UNPACK #-} !Nanoseconds,
    -- | Number of registered alarms: incremented on insert, decremented when
    -- a one-shot alarm fires or a timer is canceled.
    count :: {-# UNPACK #-} !Counter,
    -- | Source of fresh alarm ids; each registration increments it to obtain
    -- the id under which the alarm is stored (and later deleted).
    supply :: {-# UNPACK #-} !Counter
  }
-- | Timer wheel configuration, consumed by 'create' and 'with'.
data Config = Config
  { -- | Number of buckets in the wheel. A value @<= 0@ is replaced by @1024@
    -- when the wheel is created.
    spokes :: {-# UNPACK #-} !Int,
    -- | Resolution of the wheel. A value @<= 0@ is replaced by @1@ when the
    -- wheel is created.
    resolution :: !Seconds
  }
  deriving stock (Generic, Show)
-- | Create a timer wheel, forking its reaper thread in the given scope.
--
-- Non-positive configuration values are replaced by defaults: @1024@ spokes
-- and a resolution of @1@.
create ::
  Ki.Scope ->
  Config ->
  IO TimerWheel
create scope config = do
  wheelBuckets <- Array.newArray spokeCount Bucket.empty
  alarmCount <- newCounter
  idSupply <- newCounter
  -- The reaper runs forever (its result type is polymorphic), firing expired
  -- alarms out of the shared bucket array.
  Ki.fork_ scope (runTimerReaperThread wheelBuckets tick)
  pure
    TimerWheel
      { buckets = wheelBuckets,
        count = alarmCount,
        resolution = tick,
        supply = idSupply
      }
  where
    -- Requested spoke count, or 1024 if the request is not positive.
    spokeCount :: Int
    spokeCount =
      if config.spokes > 0 then config.spokes else 1024

    -- Requested resolution (or 1 if the request is not positive), converted
    -- to nanoseconds.
    tick :: Nanoseconds
    tick =
      Nanoseconds.fromNonNegativeSeconds
        (if config.resolution > 0 then config.resolution else 1)
-- | Run an action with a timer wheel that is created inside a fresh
-- 'Ki.scoped' scope; the result of the action is returned.
with ::
  Config ->
  (TimerWheel -> IO a) ->
  IO a
with config action =
  Ki.scoped \scope ->
    create scope config >>= action
-- | Read the wheel's current alarm counter.
count :: TimerWheel -> IO Int
count TimerWheel {count = alarmCount} =
  readCounter alarmCount
-- | Register a one-shot action to run after the given delay.
--
-- The returned 'Timer' attempts to remove the alarm from the wheel and yields
-- whether it actually deleted it.
register ::
  TimerWheel ->
  Seconds ->
  IO () ->
  IO (Timer Bool)
register wheel delay action = do
  registeredAt <- Timestamp.now
  let deadline = registeredAt `Timestamp.plus` Nanoseconds.fromSeconds delay
  timerId <- incrCounter wheel.supply
  insertAlarm wheel timerId deadline (OneShot fireOnce)
  pure (coerce @(IO Bool) @(Timer Bool) (cancelAlarm timerId deadline))
  where
    -- What the reaper thread runs: the user's action, then the bookkeeping
    -- that takes this alarm out of the wheel's count.
    fireOnce :: IO ()
    fireOnce =
      action >> decrCounter_ wheel.count

    -- Delete the alarm, decrementing the count only if the alarm was actually
    -- removed. Deletion and decrement run together under 'mask_'.
    cancelAlarm :: AlarmId -> Timestamp -> IO Bool
    cancelAlarm timerId deadline =
      mask_ do
        removed <- AlarmBuckets.delete wheel.buckets wheel.resolution timerId deadline
        when removed (decrCounter_ wheel.count)
        pure removed
-- | Like 'register', but the timer handle is thrown away, so the action
-- cannot be canceled through this call's result.
register_ ::
  TimerWheel ->
  Seconds ->
  IO () ->
  IO ()
register_ wheel delay action = do
  _timer <- register wheel delay action
  pure ()
-- | Register a recurring action. Its first run is scheduled one delay after
-- registration.
--
-- The returned 'Timer' cancels the recurrence by setting a flag that the
-- reaper thread checks before each run. Cancellation is idempotent: only the
-- first cancellation decrements the wheel's alarm count, so canceling the same
-- timer repeatedly (or from several threads) does not make 'count'
-- under-report.
recurring ::
  TimerWheel ->
  Seconds ->
  IO () ->
  IO (Timer ())
recurring wheel (Nanoseconds.fromSeconds -> delay) action = do
  now <- Timestamp.now
  alarmId <- incrCounter wheel.supply
  canceledRef <- newIORef False
  insertAlarm wheel alarmId (now `Timestamp.plus` delay) (Recurring action delay canceledRef)
  coerce @(IO (IO ())) @(IO (Timer ())) do
    pure do
      mask_ do
        -- Atomically set the flag and learn its previous value, so that
        -- exactly one caller observes the False -> True transition and
        -- performs the decrement.
        atomicModifyIORef' canceledRef (\wasCanceled -> (True, wasCanceled)) >>= \case
          True -> pure ()
          False -> decrCounter_ wheel.count
-- | Like 'recurring', but no timer handle is returned, so the recurrence
-- cannot be canceled through this call's result.
recurring_ ::
  TimerWheel ->
  Seconds ->
  IO () ->
  IO ()
recurring_ wheel delay action = do
  registeredAt <- Timestamp.now
  timerId <- incrCounter wheel.supply
  let period = Nanoseconds.fromSeconds delay
  -- The first run is due one period after registration.
  insertAlarm wheel timerId (registeredAt `Timestamp.plus` period) (Recurring_ action period)
-- | Count a new alarm and store it in the wheel's buckets. Both steps run
-- together under 'mask_'.
insertAlarm :: TimerWheel -> AlarmId -> Timestamp -> Alarm -> IO ()
insertAlarm wheel alarmId ringsAt alarm =
  mask_
    ( incrCounter_ wheel.count
        >> AlarmBuckets.insert wheel.buckets wheel.resolution alarmId ringsAt alarm
    )
-- | The reaper thread: loops forever, and on each iteration removes the
-- expired alarms from one bucket, fires them, then sleeps until the next
-- iteration is due.
runTimerReaperThread :: AlarmBuckets -> Nanoseconds -> IO v
runTimerReaperThread buckets resolution = do
  startedAt <- Timestamp.now
  -- NOTE(review): 'Timestamp.intoEpoch' presumably yields how far 'startedAt'
  -- is into its resolution-sized interval, making 'untilBoundary' the time
  -- left until the next interval boundary -- confirm against
  -- TimerWheel.Internal.Timestamp.
  let elapsed = startedAt `Timestamp.intoEpoch` resolution
      untilBoundary = resolution `Nanoseconds.unsafeMinus` elapsed
  Nanoseconds.sleep untilBoundary
  -- Start with the bucket that 'startedAt' maps to, and use the time we
  -- intended to wake up at (not the time we actually woke up) as the deadline.
  reap
    (startedAt `Timestamp.plus` untilBoundary)
    (AlarmBuckets.timestampToIndex buckets resolution startedAt)
  where
    -- One iteration per bucket: @deadline@ is the ideal time of this
    -- iteration, @spoke@ the index of the bucket to process.
    reap :: Timestamp -> Int -> IO v
    reap !deadline !spoke = do
      AlarmBuckets.deleteExpiredAt buckets spoke deadline >>= drain
      finishedAt <- Timestamp.now
      -- Advance from the ideal time rather than the observed clock; the sleep
      -- is skipped entirely when the next deadline has already passed.
      let !nextDeadline = deadline `Timestamp.plus` resolution
      when (nextDeadline > finishedAt) do
        Nanoseconds.sleep (nextDeadline `Timestamp.unsafeMinus` finishedAt)
      reap nextDeadline ((spoke + 1) `rem` Array.sizeofMutableArray buckets)
      where
        -- Pop and fire alarms until the bucket is empty. Firing an alarm may
        -- hand back a bucket with that alarm re-inserted, which is then
        -- drained as well.
        drain :: Bucket Alarm -> IO ()
        drain pending =
          case Bucket.pop pending of
            Bucket.PopNada -> pure ()
            Bucket.PopAlgo alarmId ringsAt alarm rest ->
              fire rest alarmId ringsAt alarm >>= drain

        -- Run one alarm and return the bucket that remains to be drained.
        fire :: Bucket Alarm -> AlarmId -> Timestamp -> Alarm -> IO (Bucket Alarm)
        fire rest alarmId ringsAt alarm =
          case alarm of
            OneShot action -> do
              action
              pure rest
            Recurring action period canceledRef -> do
              canceled <- readIORef canceledRef
              -- A canceled recurring alarm is dropped without running.
              if canceled
                then pure rest
                else action >> reschedule period
            Recurring_ action period ->
              action >> reschedule period
          where
            -- Schedule the next occurrence one period after the previous
            -- ring time. If that is still before this iteration's deadline,
            -- put the alarm back into the bucket being drained so it fires
            -- again in this iteration; otherwise store it in the wheel.
            reschedule :: Nanoseconds -> IO (Bucket Alarm)
            reschedule period = do
              let ringsAtNext = ringsAt `Timestamp.plus` period
              if ringsAtNext < deadline
                then pure $! Bucket.insert alarmId ringsAtNext alarm rest
                else do
                  AlarmBuckets.insert buckets resolution alarmId ringsAtNext alarm
                  pure rest