{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE BangPatterns #-}
{- |
Module: Data.Queue
Description: A real-time, concurrent, and mutable queue
Copyright: (c) Samuel Schlesinger 2020
License: MIT
Maintainer: sgschlesinger@gmail.com
Stability: experimental
Portability: POSIX, Windows
-}
module Data.Queue
( Queue
, newQueue
, peek
, tryPeek
, enqueue
, dequeue
, tryDequeue
, flush
) where

import Control.Concurrent
import Control.Concurrent.STM

-- | Real time 'Queue' backed by transactional variables ('TVar's)
--
-- The queue is made of two cells, each holding a pair of lists. In both
-- cells the first list carries queue elements, and the second list is
-- pattern-matched two cells at a time by the operations below; once fewer
-- than two cells are left, the operation rebuilds both cells with @rotate@.
data Queue a = Queue
  -- Top cell: its first list is the front of the queue, in dequeue order.
  -- Read by 'peek', 'tryPeek', 'dequeue' and 'tryDequeue'.
  {-# UNPACK #-} !(TVar ([a], [a]))
  -- Bottom cell: its first list receives newly enqueued elements, most
  -- recent first ('enqueue' conses onto it).
  {-# UNPACK #-} !(TVar ([a], [a]))

-- | Create a new, empty 'Queue'.
--
-- Both the top and the bottom cell are initialised to @([], [])@, so the
-- first 'enqueue' immediately takes the rebuilding path and the first
-- 'dequeue' on an untouched queue 'retry's.
newQueue :: STM (Queue a)
newQueue = Queue <$> newTVar ([], []) <*> newTVar ([], [])

-- Combine a front list (in dequeue order) with a back list (most recently
-- enqueued element first) into a single list in dequeue order, i.e.
-- @rotate xs ys == xs ++ reverse ys@.
--
-- The result is produced incrementally: each cell of the front list that is
-- emitted also moves one element of the back list onto the accumulator, so
-- the reversal is spread over the traversal of the front list instead of
-- being paid for all at once.
rotate :: [a] -> [a] -> [a]
rotate xs ys = go xs ys []
  where
    -- @acc@ holds the already-processed (newest) back elements, oldest first.
    go :: [a] -> [a] -> [a] -> [a]
    -- Front exhausted: whatever is left of the back list is older than
    -- everything in @acc@ and still newest-first, so it must be reversed.
    -- (When at most one element is left this is the same as plain '++'.)
    go [] bottom acc = reverse bottom ++ acc
    go (t : ts) (b : bs) acc = t : go ts bs (b : acc)
    -- Back exhausted: the rest of the front precedes the reversed back.
    go ts [] acc = ts ++ acc

-- | Enqueue a single item onto the 'Queue'.
--
-- The item is consed onto the bottom cell's element list. If the bottom
-- cell's second list still has at least two cells, two of them are dropped
-- and only the bottom cell is written. Otherwise the top and bottom element
-- lists are merged with @rotate@, the bottom element list is reset to empty,
-- and the merged list is stored as the top element list and as the second
-- list of both cells.
enqueue :: Queue a -> a -> STM ()
enqueue (Queue top bottom) a = do
  (bs, sbs) <- readTVar bottom
  let bs' = a : bs
  case sbs of
    -- Fast path: only the bottom cell is touched.
    _ : _ : sbs' -> writeTVar bottom (bs', sbs')
    -- Second list exhausted: rebuild both cells from the merged contents.
    _ -> do
      (ts, _sts) <- readTVar top
      let ts' = rotate ts bs'
      writeTVar bottom ([], ts')
      writeTVar top (ts', ts')

-- | Dequeue a single item from the 'Queue', 'retry'ing if there is nothing
-- there. This is the motivating use case of this library, allowing a thread to
-- register its interest in the head of a 'Queue' and be woken up by the
-- runtime system to read from the top of that 'Queue' when an item has
-- been made available.
--
-- When the top cell's second list still has at least two cells, only the
-- top cell is written. Otherwise the remaining top elements are merged with
-- the bottom element list via @rotate@ and both cells are rewritten.
dequeue :: Queue a -> STM a
dequeue (Queue top bottom) = do
  (ts, sts) <- readTVar top
  case ts of
    -- Nothing at the front: block until another transaction changes the cell.
    [] -> retry
    t : ts' ->
      case sts of
        -- Fast path: only the top cell is touched.
        _ : _ : sts' -> do
          writeTVar top (ts', sts')
          pure t
        -- Second list exhausted: rebuild both cells from the merged contents.
        _ -> do
          (bs, _) <- readTVar bottom
          -- Forced to weak head normal form before being stored.
          let !ts'' = rotate ts' bs
          writeTVar bottom ([], ts'')
          writeTVar top (ts'', ts'')
          pure t

-- | Try to 'dequeue' a single item. This function is offered to allow
-- users to easily port from the 'TQueue' offered in the stm package,
-- but is not the intended usage of the library.
--
-- Returns 'Nothing' instead of 'retry'ing when the top element list is
-- empty; otherwise removes the head element exactly as 'dequeue' does and
-- returns it wrapped in 'Just'.
tryDequeue :: Queue a -> STM (Maybe a)
tryDequeue (Queue top bottom) = do
  (ts, sts) <- readTVar top
  case ts of
    [] -> pure Nothing
    t : ts' ->
      case sts of
        -- Fast path: only the top cell is touched.
        _ : _ : sts' -> do
          writeTVar top (ts', sts')
          pure (Just t)
        -- Second list exhausted: rebuild both cells from the merged contents.
        _ -> do
          (bs, _) <- readTVar bottom
          -- Forced to weak head normal form before being stored.
          let !ts'' = rotate ts' bs
          writeTVar bottom ([], ts'')
          writeTVar top (ts'', ts'')
          pure (Just t)

-- | Peek at the top of the 'Queue', returning the top element without
-- removing it. 'retry's when the top element list is empty.
--
-- Only the top cell is read; nothing is written.
peek :: Queue a -> STM a
peek (Queue top _bottom) =
  readTVar top >>= \case
    (x : _, _) -> pure x
    ([], _) -> retry

-- | Try to 'peek' for the top item of the 'Queue'. This function is
-- offered to easily port from the 'TQueue' offered in the stm package,
-- but is not the intended usage of the library.
--
-- Returns 'Nothing' instead of 'retry'ing when the top element list is
-- empty. Only the top cell is read; nothing is written.
tryPeek :: Queue a -> STM (Maybe a)
tryPeek (Queue top _bottom) =
  readTVar top >>= \case
    (x : _, _) -> pure (Just x)
    ([], _) -> pure Nothing

-- | Efficiently read the entire contents of a 'Queue' into a list.
--
-- Both cells are reset to @([], [])@ as part of the same transaction, so the
-- 'Queue' is empty afterwards. The returned list is the old top element
-- list merged with the old bottom element list via @rotate@.
flush :: Queue a -> STM [a]
flush (Queue top bottom) = do
  (xs, _) <- swapTVar top ([], [])
  (ys, _) <- swapTVar bottom ([], [])
  pure (rotate xs ys)