{-# LANGUAGE BangPatterns #-}
module Control.Concurrent.BFQueue
( BFQueue
, newBFQueue
, writeBFQueue
, tryWriteBFQueue
, takeBFQueue
, flushBFQueue
, lengthBFQueue
, isEmptyBFQueue
) where
import Control.Concurrent.BQueue
import Control.Concurrent.MVar
import Control.Monad
import Data.Atomics (atomicModifyIORefCAS)
import Data.IORef
import Numeric.Natural
newtype BFQueue a = BFQueue (IORef (BQueue a, MVar ()))
newBFQueue :: Natural -> IO (BFQueue a)
newBFQueue bound = do
baton <- newEmptyMVar
bQueueIORef <- newIORef (newBQueue $ fromIntegral bound, baton)
return $ BFQueue bQueueIORef
writeBFQueue :: BFQueue a -> a -> IO ()
writeBFQueue (BFQueue bQueueIORef) x = inner
where
inner = join $ atomicModifyIORefCAS bQueueIORef $ \bbQueue@(bQueue, baton) ->
case pushBQueue x bQueue of
Just newQueue -> ((newQueue, baton), pure ())
Nothing -> (bbQueue, readMVar baton >> inner)
tryWriteBFQueue :: BFQueue a -> a -> IO Bool
tryWriteBFQueue (BFQueue bQueueIORef) x =
atomicModifyIORefCAS bQueueIORef $ \bbQueue@(bQueue, baton) ->
case pushBQueue x bQueue of
Just newQueue -> ((newQueue, baton), True)
Nothing -> (bbQueue, False)
flushBFQueue :: BFQueue a -> IO [a]
flushBFQueue (BFQueue bQueueIORef) = do
newBaton <- newEmptyMVar
join $
atomicModifyIORefCAS bQueueIORef $ \(bQueue, baton) ->
let !(queue, newQueue) = flushBQueue bQueue
in ((newQueue, newBaton), queue <$ putMVar baton ())
takeBFQueue :: Natural -> BFQueue a -> IO [a]
takeBFQueue i (BFQueue bQueueIORef)
| i == 0 = return []
| otherwise = do
newBaton <- newEmptyMVar
join $
atomicModifyIORefCAS bQueueIORef $ \(bQueue, baton) ->
let !(queue, newQueue) = takeBQueue (fromIntegral i) bQueue
in ((newQueue, newBaton), queue <$ putMVar baton ())
lengthBFQueue :: BFQueue a -> IO Natural
lengthBFQueue (BFQueue bQueueIORef) = fromIntegral . lengthBQueue . fst <$> readIORef bQueueIORef
isEmptyBFQueue :: BFQueue a -> IO Bool
isEmptyBFQueue = fmap (==0) . lengthBFQueue