module Control.Concurrent.STM.TChunkedQueue (
TChunkedQueue,
ChunkedQueue,
consumeQueue,
newTChunkedQueue,
newTChunkedQueueIO,
drainTChunkedQueue,
tryDrainTChunkedQueue,
writeTChunkedQueue,
writeManyTChunkedQueue,
isEmptyTChunkedQueue,
drainAndSettleTChunkedQueue,
drainWithTimeoutTChunkedQueue,
) where
import Data.Typeable (Typeable)
import Prelude hiding (reads)
import Control.Applicative ((<$>))
import Control.Monad
import Control.Monad.STM (STM, retry, atomically)
import Control.Concurrent.STM.TVar
import Control.Concurrent.Async (race)
import Control.Concurrent (threadDelay)
import Control.Concurrent.STM.ChunkedQueue
-- | A queue handle for use in 'STM': a single 'TVar' holding the current
-- 'ChunkedQueue'. The field is strict, so the 'TVar' reference is evaluated
-- when the handle is built.
data TChunkedQueue a = TChunkedQueue !(TVar (ChunkedQueue a))
    deriving (Typeable)
-- | Create a new queue inside an 'STM' transaction. The queue starts out
-- holding no chunks.
newTChunkedQueue :: STM (TChunkedQueue a)
newTChunkedQueue = fmap TChunkedQueue (newTVar (ChunkedQueue []))
-- | 'IO' counterpart of 'newTChunkedQueue': allocates the backing 'TVar' with
-- 'newTVarIO', so no surrounding transaction is needed. The queue starts out
-- holding no chunks.
newTChunkedQueueIO :: IO (TChunkedQueue a)
newTChunkedQueueIO = do
    emptyVar <- newTVarIO (ChunkedQueue [])
    return (TChunkedQueue emptyVar)
-- | Take everything currently in the queue, leaving it empty.
--
-- If the queue holds no chunks the transaction calls 'retry', so the caller
-- blocks until the queue is changed by another transaction.
drainTChunkedQueue :: TChunkedQueue a -> STM (ChunkedQueue a)
drainTChunkedQueue (TChunkedQueue tChQueue) =
    readTVar tChQueue >>= takeAll
  where
    -- Nothing to hand out yet: wait for a writer.
    takeAll (ChunkedQueue []) = retry
    -- Reset the stored queue and return what was there.
    takeAll pending =
        writeTVar tChQueue (ChunkedQueue []) >> return pending
-- | Non-blocking variant of 'drainTChunkedQueue'.
--
-- Returns whatever the queue currently holds (possibly a queue with no
-- chunks) and leaves the queue empty. The 'TVar' is only written when there
-- was something to remove.
tryDrainTChunkedQueue :: TChunkedQueue a -> STM (ChunkedQueue a)
tryDrainTChunkedQueue (TChunkedQueue tChQueue) = do
    pending <- readTVar tChQueue
    unless (hasNoChunks pending) $
        writeTVar tChQueue (ChunkedQueue [])
    return pending
  where
    hasNoChunks (ChunkedQueue []) = True
    hasNoChunks _                 = False
-- | Add a batch of items to the queue via 'enqueueMany'.
--
-- An empty batch is a no-op: the 'TVar' is neither read nor written. The
-- drain and emptiness functions in this module treat a queue as empty only
-- when it has no chunks at all, so skipping the write guarantees that an
-- empty batch can never make the queue look non-empty.
-- NOTE(review): whether 'enqueueMany' itself would store an empty chunk for
-- @[]@ is not visible from this module; the guard makes that irrelevant.
--
-- For a non-empty batch the updated queue is forced to weak head normal form
-- before it is stored (via 'modifyTVar''), so consecutive writes do not leave
-- a chain of unevaluated updates inside the 'TVar'.
writeManyTChunkedQueue :: TChunkedQueue a -> [a] -> STM ()
writeManyTChunkedQueue _ [] = return ()
writeManyTChunkedQueue (TChunkedQueue tChQueue) xs =
    modifyTVar' tChQueue (`enqueueMany` xs)
-- | Add a single item to the queue via 'enqueueOne'.
--
-- The updated queue is forced to weak head normal form before it is stored
-- (via 'modifyTVar''), so a run of writes with no intervening read cannot
-- pile up a chain of unevaluated 'enqueueOne' applications inside the 'TVar'.
writeTChunkedQueue :: TChunkedQueue a -> a -> STM ()
writeTChunkedQueue (TChunkedQueue tChQueue) x =
    modifyTVar' tChQueue (`enqueueOne` x)
-- | Report whether the queue currently holds no chunks.
isEmptyTChunkedQueue :: TChunkedQueue a -> STM Bool
isEmptyTChunkedQueue (TChunkedQueue tChQueue) =
    readTVar tChQueue >>= \(ChunkedQueue chunks) -> return (null chunks)
-- | Drain the queue repeatedly until it stays empty across one full settle
-- period.
--
-- The first drain uses 'drainTChunkedQueue', so the call blocks while the
-- queue has no chunks. After that, each round sleeps for the settle period
-- and then drains without blocking; the loop ends the first time such a
-- drain finds nothing. Chunks picked up in later rounds are placed in front
-- of the ones collected earlier.
drainAndSettleTChunkedQueue :: Int -- ^ settle period, passed to 'threadDelay' (microseconds)
                            -> TChunkedQueue a
                            -> IO (ChunkedQueue a)
drainAndSettleTChunkedQueue delay queue = do
    ChunkedQueue firstBatch <- atomically (drainTChunkedQueue queue)
    settle firstBatch
  where
    settle collected = do
        threadDelay delay
        ChunkedQueue fresh <- atomically (tryDrainTChunkedQueue queue)
        if null fresh
            then return (ChunkedQueue collected)
            else settle (fresh ++ collected)
-- | Collect items from the queue for a fixed time window.
--
-- One blocking transfer runs first, so the call waits while the queue has no
-- chunks and the timer only starts once something has been moved. After
-- that, transfers are repeated in a loop that is raced against a
-- 'threadDelay' of the given length. When the delay wins, everything
-- gathered in the private stash queue is returned.
drainWithTimeoutTChunkedQueue :: Int -- ^ timeout, passed to 'threadDelay' (microseconds)
                              -> TChunkedQueue a
                              -> IO (ChunkedQueue a)
drainWithTimeoutTChunkedQueue delay queue = do
    stash <- newTChunkedQueueIO
    -- Drain and re-enqueue in a single transaction, so a transfer is either
    -- fully applied or not applied at all.
    let moveBatch = atomically $
            drainTChunkedQueue queue
                >>= writeManyTChunkedQueue stash . consumeQueue
    -- Run once outside the race so an empty queue blocks here, before timing.
    moveBatch
    void (race (forever moveBatch) (threadDelay delay))
    atomically (drainTChunkedQueue stash)