-- | This module is intended to be imported qualified:
--
-- > import TimerWheel (TimerWheel)
-- > import TimerWheel qualified
module TimerWheel
  ( -- * Timer wheel
    TimerWheel,

    -- * Timer wheel configuration
    Config (..),
    Seconds,

    -- * Timer
    Timer,

    -- * Constructing a timer wheel
    create,
    with,

    -- * Querying a timer wheel
    count,

    -- * Registering timers in a timer wheel
    register,
    register_,
    recurring,
    recurring_,

    -- * Canceling timers
    cancel,
  )
where

import Control.Exception (mask_)
import qualified Data.Atomics as Atomics
import Data.Functor (void)
import Data.Primitive.Array (MutableArray)
import qualified Data.Primitive.Array as Array
import GHC.Base (RealWorld)
import qualified Ki
import TimerWheel.Internal.Bucket (Bucket)
import qualified TimerWheel.Internal.Bucket as Bucket
import TimerWheel.Internal.Counter (Counter, decrCounter_, incrCounter, incrCounter_, newCounter, readCounter)
import TimerWheel.Internal.Nanoseconds (Nanoseconds (..))
import qualified TimerWheel.Internal.Nanoseconds as Nanoseconds
import TimerWheel.Internal.Prelude
import TimerWheel.Internal.Timestamp (Timestamp)
import qualified TimerWheel.Internal.Timestamp as Timestamp

-- | A timer wheel is a vector-of-collections-of timers to fire. Timers may be one-shot or recurring, and may be
-- scheduled arbitrarily far in the future.
--
-- A timer wheel is configured with a /spoke count/ and /resolution/:
--
-- * The /spoke count/ determines the size of the timer vector.
--
--     A __larger spoke count__ will require __more memory__, but will result in __less insert contention__.
--
-- * The /resolution/ determines the duration of time that each spoke corresponds to, and thus how often timers are
--   checked for expiry.
--
--     For example, in a timer wheel with a /resolution/ of __@1 second@__, a timer that is scheduled to fire at
--     __@8.4 o'clock@__ will end up firing around __@9.0 o'clock@__ instead (that is, on the
--     __@1 second@__-boundary).
--
--     A __larger resolution__ will result in __more insert contention__ and __less accurate timers__, but will require
--     __fewer wakeups__ by the timeout thread.
--
-- The timeout thread has some important properties:
--
--     * There is only one, and it fires expired timers synchronously. If your timer actions execute quickly, you can
--       'register' them directly. Otherwise, consider registering an action that enqueues the real action to be
--       performed on a job queue.
--
--     * A synchronous exception thrown by a registered timer will bring the timeout thread down, and the exception will
--       be propagated to the thread that created the timer wheel. If you want to log and ignore exceptions, for example,
--       you will have to bake this into the registered actions yourself.
--
-- __API summary__
--
-- +----------+---------+----------------+
-- | Create   | Query   | Modify         |
-- +==========+=========+================+
-- | 'create' | 'count' | 'register'     |
-- +----------+---------+----------------+
-- | 'with'   |         | 'register_'    |
-- +----------+         +----------------+
-- |          |         | 'recurring'    |
-- |          |         +----------------+
-- |          |         | 'recurring_'   |
-- +----------+---------+----------------+
data TimerWheel = TimerWheel
  { -- The array of timer buckets, one per spoke.
    buckets :: {-# UNPACK #-} !(MutableArray RealWorld (Bucket Timer0)),
    -- The span of time that each bucket covers.
    resolution :: {-# UNPACK #-} !Nanoseconds,
    -- The number of timers currently registered; this is what 'count' reads.
    numTimers :: {-# UNPACK #-} !Counter,
    -- A supply of fresh ints used as timer ids, which is how a registered timer is found again when it is canceled.
    timerIdSupply :: {-# UNPACK #-} !Counter
  }

-- | A timer wheel config.
--
-- * @spokes@ must be ∈ @[1, maxBound]@, and is set to @1024@ if invalid.
-- * @resolution@ must be ∈ @(0, ∞]@, and is set to @1@ if invalid.
--
-- __API summary__
--
-- +----------+
-- | Create   |
-- +==========+
-- | 'Config' |
-- +----------+
data Config = Config
  { -- | The number of spokes, i.e. the size of the timer vector
    spokes :: {-# UNPACK #-} !Int,
    -- | The duration of time that each spoke corresponds to
    resolution :: !Seconds
  }
  deriving stock (Generic, Show)

-- | Create a timer wheel in a scope.
create ::
  -- | The scope that the timeout thread is forked in
  Ki.Scope ->
  -- | The timer wheel config
  Config ->
  -- | The timer wheel
  IO TimerWheel
create scope (Config requestedSpokes requestedResolution) = do
  wheelBuckets <- Array.newArray spokeCount Bucket.empty
  timerCounter <- newCounter
  idSupply <- newCounter
  -- The reaper thread lives in the caller's scope and shares the bucket array and timer counter with the wheel.
  Ki.fork_ scope (runTimerReaperThread wheelBuckets timerCounter tick)
  pure
    TimerWheel
      { buckets = wheelBuckets,
        numTimers = timerCounter,
        resolution = tick,
        timerIdSupply = idSupply
      }
  where
    -- A non-positive spoke count is invalid; fall back to 1024.
    spokeCount :: Int
    spokeCount
      | requestedSpokes <= 0 = 1024
      | otherwise = requestedSpokes

    -- A non-positive resolution is invalid; fall back to 1 second.
    tick :: Nanoseconds
    tick =
      Nanoseconds.fromNonNegativeSeconds validResolution
      where
        validResolution :: Seconds
        validResolution
          | requestedResolution <= 0 = 1
          | otherwise = requestedResolution

-- | Perform an action with a timer wheel.
with ::
  -- | The timer wheel config
  Config ->
  -- | The action to perform with the timer wheel
  (TimerWheel -> IO a) ->
  -- | The result of the action
  IO a
with config action =
  -- The wheel is created in a fresh scope that is opened around the action.
  Ki.scoped \scope ->
    create scope config >>= action

-- | Get the number of timers in a timer wheel.
--
-- /O(1)/.
count :: TimerWheel -> IO Int
count TimerWheel {numTimers = timerCounter} =
  readCounter timerCounter

-- | @register wheel delay action@ registers __@action@__ in __@wheel@__ to fire after __@delay@__ seconds.
--
-- When canceled, the timer returns whether or not the cancelation was successful; @False@ means the timer had either
-- already fired, or had already been canceled.
register ::
  -- | The timer wheel
  TimerWheel ->
  -- | The delay before the action is fired
  Seconds ->
  -- | The action to fire
  IO () ->
  -- | The timer
  IO (Timer Bool)
register TimerWheel {buckets, numTimers, resolution, timerIdSupply} delay action = do
  now <- Timestamp.now
  let expiry = Timestamp.plus now (Nanoseconds.fromSeconds delay)
      bucketIndex = timestampToIndex buckets resolution expiry
  timerId <- incrCounter timerIdSupply
  -- Insert the timer and bump the timer count with asynchronous exceptions masked.
  mask_ do
    atomicModifyArray buckets bucketIndex (Bucket.insert timerId expiry (OneShot1 action))
    incrCounter_ numTimers
  pure (Timer (cancelTimer bucketIndex timerId))
  where
    -- The cancel action: try to delete the timer from the bucket it was inserted into, and only decrement the timer
    -- count if it was actually still there.
    cancelTimer :: Int -> TimerId -> IO Bool
    cancelTimer bucketIndex timerId =
      mask_ do
        deleted <- atomicMaybeModifyArray buckets bucketIndex (Bucket.deleteExpectingHit timerId)
        when deleted (decrCounter_ numTimers)
        pure deleted

-- | Like 'register', but for when you don't intend to cancel the timer.
register_ ::
  -- | The timer wheel
  TimerWheel ->
  -- | The delay before the action is fired
  Seconds ->
  -- | The action to fire
  IO () ->
  IO ()
register_ wheel delay action = do
  -- Register as usual, then drop the timer handle.
  _timer <- register wheel delay action
  pure ()

-- | @recurring wheel delay action@ registers __@action@__ in __@wheel@__ to fire in __@delay@__ seconds, and every
-- __@delay@__ seconds thereafter.
recurring ::
  -- | The timer wheel
  TimerWheel ->
  -- | The delay before each action is fired
  Seconds ->
  -- | The action to fire repeatedly
  IO () ->
  -- | The timer
  IO (Timer ())
recurring TimerWheel {buckets, numTimers, resolution, timerIdSupply} (Nanoseconds.fromSeconds -> delay) action = do
  now <- Timestamp.now
  let timestamp = now `Timestamp.plus` delay
  let index = timestampToIndex buckets resolution timestamp
  timerId <- incrCounter timerIdSupply
  -- Set to True by the cancel action; the reaper thread reads it before firing each occurrence.
  canceledRef <- newIORef False
  -- Insert the timer and bump the timer count with asynchronous exceptions masked.
  mask_ do
    atomicModifyArray buckets index (Bucket.insert timerId timestamp (Recurring1 action delay canceledRef))
    incrCounter_ numTimers
  coerce @(IO (IO ())) @(IO (Timer ())) do
    pure do
      mask_ do
        -- Atomically flip the flag and learn whether it was already set, so that canceling the same timer more than
        -- once (possibly from several threads) decrements the timer count exactly once.
        alreadyCanceled <- Atomics.atomicModifyIORefCAS canceledRef (\wasCanceled -> (True, wasCanceled))
        if alreadyCanceled
          then pure ()
          else decrCounter_ numTimers

-- | Like 'recurring', but for when you don't intend to cancel the timer.
recurring_ ::
  -- | The timer wheel
  TimerWheel ->
  -- | The delay before each action is fired
  Seconds ->
  -- | The action to fire repeatedly
  IO () ->
  IO ()
recurring_ TimerWheel {buckets, numTimers, resolution, timerIdSupply} delay action = do
  now <- Timestamp.now
  let period = Nanoseconds.fromSeconds delay
      firstExpiry = Timestamp.plus now period
      bucketIndex = timestampToIndex buckets resolution firstExpiry
  timerId <- incrCounter timerIdSupply
  -- Insert the timer and bump the timer count with asynchronous exceptions masked. No cancel action is built,
  -- because this timer is never handed back to the caller.
  mask_ do
    atomicModifyArray buckets bucketIndex (Bucket.insert timerId firstExpiry (Recurring1_ action period))
    incrCounter_ numTimers

-- | A registered timer, parameterized by the result of attempting to cancel it:
--
--     * A one-shot timer may only be canceled if it has not already fired.
--     * A recurring timer can always be canceled.
--
-- __API summary__
--
-- +-------------+----------+
-- | Create      | Modify   |
-- +=============+==========+
-- | 'register'  | 'cancel' |
-- +-------------+----------+
-- | 'recurring' |          |
-- +-------------+----------+
newtype Timer a
  = -- The wrapped action performs the cancelation; 'cancel' merely unwraps it.
    Timer (IO a)

-- | Cancel a timer.
cancel :: Timer a -> IO a
cancel (Timer cancelAction) =
  cancelAction

-- `timestampToIndex buckets resolution timestamp` figures out which index `timestamp` corresponds to in `buckets`,
-- where each bucket corresponds to `resolution` nanoseconds.
--
-- For example, consider a three-element `buckets` with resolution `1000000000`.
--
--   +--------------------------------------+
--   | 1000000000 | 1000000000 | 1000000000 |
--   +--------------------------------------+
--
-- Some timestamp like `1053298012387` gets binned to one of the three indices 0, 1, or 2, with quick and easy maffs:
--
--   1. Figure out which index the timestamp corresponds to, if there were infinitely many:
--
--        1053298012387 `div` 1000000000 = 1053
--
--   2. Wrap around per the actual length of the array:
--
--        1053 `rem` 3 = 0
timestampToIndex :: MutableArray RealWorld bucket -> Nanoseconds -> Timestamp -> Int
timestampToIndex buckets resolution timestamp =
  -- Narrowing back to `Int` is safe: the remainder is smaller than the bucket count, which itself is an `Int`.
  fromIntegral @Word64 @Int (epoch `rem` bucketCount)
  where
    -- Which bucket the timestamp would land in if there were infinitely many.
    epoch :: Word64
    epoch = Timestamp.epoch resolution timestamp

    -- The actual length of the array, to wrap around by.
    bucketCount :: Word64
    bucketCount = fromIntegral @Int @Word64 (Array.sizeofMutableArray buckets)

-- The internal representation of a registered timer, as stored in a bucket.
data Timer0
  = -- A one-shot timer: just the action to fire.
    OneShot1 !(IO ())
  | -- A cancelable recurring timer: the action, the delay between occurrences, and a flag that the cancel action
    -- sets to True.
    Recurring1 !(IO ()) !Nanoseconds !(IORef Bool)
  | -- A recurring timer registered without a cancel action: the action and the delay between occurrences.
    Recurring1_ !(IO ()) !Nanoseconds

-- A timer's id, drawn from the wheel's timer id supply when the timer is registered.
type TimerId =
  Int

------------------------------------------------------------------------------------------------------------------------
-- Atomic operations on arrays

-- Atomically apply a function to one element of an array, retrying the compare-and-swap until it succeeds.
atomicModifyArray :: forall a. MutableArray RealWorld a -> Int -> (a -> a) -> IO ()
atomicModifyArray array index f =
  Atomics.readArrayElem array index >>= retry
  where
    -- Try to swap in the modified element; on failure, retry with the ticket that the failed swap handed back.
    retry :: Atomics.Ticket a -> IO ()
    retry seen =
      Atomics.casArrayElem array index seen (f (Atomics.peekTicket seen)) >>= \case
        (True, _) -> pure ()
        (False, latest) -> retry latest

-- Atomically apply a partial modification to one element of an array. Returns False, leaving the array untouched, as
-- soon as the function yields Nothing for the element currently seen; returns True once a replacement is swapped in.
atomicMaybeModifyArray :: forall a. MutableArray RealWorld a -> Int -> (a -> Maybe a) -> IO Bool
atomicMaybeModifyArray array index f =
  Atomics.readArrayElem array index >>= retry
  where
    retry :: Atomics.Ticket a -> IO Bool
    retry seen =
      case f (Atomics.peekTicket seen) of
        Nothing -> pure False
        Just replacement ->
          -- On a failed swap, retry with the ticket that the failed swap handed back.
          Atomics.casArrayElem array index seen replacement >>= \case
            (True, _) -> pure True
            (False, latest) -> retry latest

-- Atomically split the bucket at the given index around the given timestamp: the expired part is returned, and the
-- rest is written back to the array. If nothing has expired, the array is left untouched.
atomicExtractExpiredTimersFromBucket :: MutableArray RealWorld (Bucket Timer0) -> Int -> Timestamp -> IO (Bucket Timer0)
atomicExtractExpiredTimersFromBucket buckets index now =
  Atomics.readArrayElem buckets index >>= retry
  where
    retry :: Atomics.Ticket (Bucket Timer0) -> IO (Bucket Timer0)
    retry seen =
      case Bucket.partition now (Atomics.peekTicket seen) of
        Bucket.Pair expired remaining
          -- Nothing to fire, so there is no need to write anything back.
          | Bucket.isEmpty expired -> pure Bucket.empty
          | otherwise ->
              -- On a failed swap, re-partition whatever the bucket holds now.
              Atomics.casArrayElem buckets index seen remaining >>= \case
                (True, _) -> pure expired
                (False, latest) -> retry latest

------------------------------------------------------------------------------------------------------------------------
-- Timer reaper thread
--
-- The main loop is rather simple, but the code is somewhat fiddly. In brief, the reaper thread wakes up to fire all of
-- the expired timers in bucket N, then sleeps, then wakes up to fire all of the expired timers in bucket N+1, then
-- sleeps, and so on, forever.
--
-- It wakes up on the "bucket boundaries", that is,
--
--   +------+------+------+------+------+------+------+------+------+------+
--   |      |      |      |      |      |      |      |      |      |      |
--   |      |      |      |      |      |      |      |      |      |      |
--   +------+------+------+------+------+------+------+------+------+------+
--                           ^   ^
--                           |   we wake up around here
--                           |
--                           to fire all of the expired timers stored here
--
-- It's entirely possible the reaper thread gets hopelessly behind, that is, it's taken so long to expire all of the
-- timers in previous buckets that we're behind schedule an entire bucket or more. That might look like this:
--
--   +------+------+------+------+------+------+------+------+------+------+
--   |      |      |      |      |      |      |      |      |      |      |
--   |      |      |      |      |      |      |      |      |      |      |
--   +------+------+------+------+------+------+------+------+------+------+
--                           ^                    ^
--                           |                    we are very behind, and enter the loop around here
--                           |
--                           yet we nonetheless fire all of the expired timers stored here, as if we were on time
--
-- That's accomplished simply by maintaining in the loop state the "ideal" time that we wake up, ignoring reality. We
-- only ultimately check the *actual* current time when determining how long to *sleep* after expiring all of the timers
-- in the current bucket. If we're behind schedule, we won't sleep at all.
--
--   +------+------+------+------+------+------+------+------+------+------+
--   |      |      |      |      |      |      |      |      |      |      |
--   |      |      |      |      |      |      |      |      |      |      |
--   +------+------+------+------+------+------+------+------+------+------+
--                           ^   ^                  ^
--                           |   |                  |
--                           |   we enter the loop with this "ideal" time
--                           |                      |
--                           to fire timers in here |
--                                                  |
--                                                  not caring how far ahead the actual current time is
--
-- On to expiring timers: a "bucket" of timers is stored at each array index, which can be partitioned into "expired"
-- (meant to fire at or before the ideal time) and "not expired" (to expire on a subsequent wrap around the bucket
-- array).
--
--   +-----------------------+
--   |           /           |
--   | expired  /            |
--   |         / not expired |
--   |        /              |
--   +-----------------------+
--
-- The reaper thread simply atomically partitions the bucket, keeping the expired collection for itself, and putting the
-- not-expired collection back in the array.
--
-- Next, the timers are carefully fired one-by-one, in timestamp order. It's possible that two or more timers are
-- scheduled to expire concurrently (i.e. on the same nanosecond); that's fine: we fire them in the order they were
-- scheduled.
--
-- Let's say this is our set of timers to fire.
--
--    Ideal time         Timers to fire
--   +--------------+   +-----------------------------+
--   | 700          |   | Expiry | Type               |
--   +--------------+   +--------+--------------------+
--                      | 630    | One-shot           |
--    Next ideal time   | 643    | Recurring every 10 |
--   +--------------+   | 643    | One-shot           |
--   | 800          |   | 689    | Recurring every 80 |
--   +--------------+   +--------+--------------------+
--
-- Expiring a one-shot timer is simple: call the IO action and move on.
--
-- Expiring a recurring timer is less simple (but still simple): call the IO action, then schedule the next occurrence.
-- There are two possibilities.
--
--   1. The next occurrence is *at or before* the ideal time, which means it ought to fire along with the other timers
--      in the queue, right now. So, insert it into the collection of timers to fire.
--
--   2. The next occurrence is *after* the ideal time, so enqueue it in the array of buckets wherever it belongs.
--
-- After all expired timers are fired, the reaper thread has one last decision to make: how long should we sleep? We
-- get the current timestamp, and if it's still before the next ideal time (i.e. the current ideal time plus the wheel
-- resolution), then we sleep for the difference.
--
-- If the actual time is at or after the next ideal time, that's kind of bad - it means the reaper thread is behind
-- schedule. The user's enqueued actions have taken too long, or their wheel resolution is too short. Anyway, it's not
-- our problem, our behavior doesn't change per whether we are behind schedule or not.
runTimerReaperThread :: MutableArray RealWorld (Bucket Timer0) -> Counter -> Nanoseconds -> IO void
runTimerReaperThread :: forall void.
MutableArray RealWorld (Bucket Timer0)
-> Counter -> Nanoseconds -> IO void
runTimerReaperThread MutableArray RealWorld (Bucket Timer0)
buckets Counter
numTimers Nanoseconds
resolution = do
  -- Sleep until the very first bucket of timers expires
  --
  --     resolution                         = 100
  --     now                                = 184070
  --     progress   = now % resolution      = 70
  --     remaining  = resolution - progress = 30
  --     idealTime  = now + remaining       = 184100
  --
  --   +-------------------------+----------------+---------
  --   | progress = 70           | remaining = 30 |
  --   +-------------------------+----------------+
  --   | resolution = 100                         |
  --   +------------------------------------------+---------
  --                             ^                ^
  --                             now              idealTime
  Timestamp
now <- IO Timestamp
Timestamp.now
  let progress :: Nanoseconds
progress = Timestamp
now Timestamp -> Nanoseconds -> Nanoseconds
`Timestamp.intoEpoch` Nanoseconds
resolution
  let remaining :: Nanoseconds
remaining = Nanoseconds
resolution Nanoseconds -> Nanoseconds -> Nanoseconds
`Nanoseconds.unsafeMinus` Nanoseconds
progress
  Nanoseconds -> IO ()
Nanoseconds.sleep Nanoseconds
remaining
  -- Enter the Loop™
  let idealTime :: Timestamp
idealTime = Timestamp
now Timestamp -> Nanoseconds -> Timestamp
`Timestamp.plus` Nanoseconds
remaining
  Timestamp -> Int -> IO void
forall void. Timestamp -> Int -> IO void
theLoop Timestamp
idealTime (MutableArray RealWorld (Bucket Timer0)
-> Nanoseconds -> Timestamp -> Int
forall bucket.
MutableArray RealWorld bucket -> Nanoseconds -> Timestamp -> Int
timestampToIndex MutableArray RealWorld (Bucket Timer0)
buckets Nanoseconds
resolution Timestamp
now)
  where
    -- `index` could be derived from `idealTime`, but it's cheaper to just store it separately and bump by 1 as we go
    theLoop :: Timestamp -> Int -> IO void
    theLoop :: forall void. Timestamp -> Int -> IO void
theLoop !Timestamp
idealTime !Int
index = do
      Bucket Timer0
expired2 <- MutableArray RealWorld (Bucket Timer0)
-> Int -> Timestamp -> IO (Bucket Timer0)
atomicExtractExpiredTimersFromBucket MutableArray RealWorld (Bucket Timer0)
buckets Int
index Timestamp
idealTime
      Bucket Timer0 -> IO ()
fireTimerBucket Bucket Timer0
expired2
      let !nextIdealTime :: Timestamp
nextIdealTime = Timestamp
idealTime Timestamp -> Nanoseconds -> Timestamp
`Timestamp.plus` Nanoseconds
resolution
      Timestamp
now <- IO Timestamp
Timestamp.now
      Bool -> IO () -> IO ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (Timestamp
nextIdealTime Timestamp -> Timestamp -> Bool
forall a. Ord a => a -> a -> Bool
> Timestamp
now) (Nanoseconds -> IO ()
Nanoseconds.sleep (Timestamp
nextIdealTime Timestamp -> Timestamp -> Nanoseconds
`Timestamp.unsafeMinus` Timestamp
now))
      Timestamp -> Int -> IO void
forall void. Timestamp -> Int -> IO void
theLoop Timestamp
nextIdealTime ((Int
index Int -> Int -> Int
forall a. Num a => a -> a -> a
+ Int
1) Int -> Int -> Int
forall a. Integral a => a -> a -> a
`rem` MutableArray RealWorld (Bucket Timer0) -> Int
forall s a. MutableArray s a -> Int
Array.sizeofMutableArray MutableArray RealWorld (Bucket Timer0)
buckets)
      where
        -- Fire the timers in a bucket of expired timers one at a time, until `Bucket.pop` reports that it is empty.
        -- Each call to `fireTimer` returns the bucket to continue with, which is what gets popped next.
        fireTimerBucket :: Bucket Timer0 -> IO ()
        fireTimerBucket bucket0 =
          case Bucket.pop bucket0 of
            Bucket.PopNada -> pure ()
            Bucket.PopAlgo timerId timestamp timer bucket1 ->
              fireTimer bucket1 timerId timestamp timer >>= fireTimerBucket

        -- Fire a single timer that was popped from a bucket of expired timers, and return the bucket of expired
        -- timers that is left to fire afterwards.
        --
        -- * A one-shot timer runs its action, then decrements the `numTimers` counter.
        -- * A recurring timer whose `canceledRef` holds `True` is dropped without running its action.
        -- * Any other recurring timer runs its action and is rescheduled `delay` after the timestamp it was popped
        --   with (not after the current time).
        fireTimer :: Bucket Timer0 -> TimerId -> Timestamp -> Timer0 -> IO (Bucket Timer0)
        fireTimer bucket timerId timestamp timer =
          case timer of
            OneShot1 action -> do
              action
              decrCounter_ numTimers
              pure bucket
            Recurring1 action delay canceledRef ->
              readIORef canceledRef >>= \case
                True -> pure bucket
                False -> do
                  action
                  scheduleNextOccurrence (timestamp `Timestamp.plus` delay)
            Recurring1_ action delay -> do
              action
              scheduleNextOccurrence (timestamp `Timestamp.plus` delay)
          where
            -- Re-insert this timer so that it is due again at `nextOccurrence`, and return the bucket of expired
            -- timers to keep firing.
            --
            -- When `nextOccurrence` is earlier than `idealTime`, the timer goes straight back into the bucket that is
            -- currently being fired (forced with `$!`) instead of into the buckets array. Otherwise it is inserted
            -- into the array slot chosen by `timestampToIndex`, and the bucket being fired is returned unchanged.
            scheduleNextOccurrence :: Timestamp -> IO (Bucket Timer0)
            scheduleNextOccurrence nextOccurrence =
              if nextOccurrence < idealTime
                then pure $! insertNextOccurrence bucket
                else do
                  atomicModifyArray
                    buckets
                    (timestampToIndex buckets resolution nextOccurrence)
                    insertNextOccurrence
                  pure bucket
              where
                -- Insert this timer into a bucket, keeping its existing `timerId` and using `nextOccurrence` as its
                -- timestamp.
                insertNextOccurrence :: Bucket Timer0 -> Bucket Timer0
                insertNextOccurrence =
                  Bucket.insert timerId nextOccurrence timer