{-# LANGUAGE ScopedTypeVariables #-} -- | -- Module : Data.Array.Accelerate.LLVM.PTX.Pool -- Copyright : [2017] Trevor L. McDonell -- License : BSD3 -- -- Maintainer : Trevor L. McDonell -- Stability : experimental -- Portability : non-portable (GHC extensions) -- module Data.Array.Accelerate.LLVM.PTX.Pool ( Pool, create, with, take, put, unsafeWith, ) where import Control.Concurrent.MVar import Control.Exception import Data.Maybe import System.IO.Unsafe import Prelude hiding ( take ) import Data.Sequence ( Seq ) import qualified Data.Sequence as Seq -- | An item pool -- -- Based on 'Control.Concurrent.QSem' -- data Pool a = Pool {-# UNPACK #-} !(MVar ([a], Seq (MVar a))) -- The semaphore state (as, bs): -- -- * as the currently available resources -- -- * bs is the queue of blocked threads, stored in FIFO order. New threads are -- queued onto the right, and threads are woken up from the left. -- -- A blocked thread is represented by an empty (MVar a). To unblock the thread, -- we give it a resource via its MVar. -- -- A thread can deque itself by also putting () into the MVar, which it must do -- if it receives an exception while blocked in 'take'. This means that when -- unblocking a thread in 'put' we must first check whether the MVar is already -- full; the MVar lock on the semaphore itself resolves race conditions between -- put and a thread attempting to deque itself. -- -- | Build a new pool with the supplied initial quantity. -- create :: [a] -> IO (Pool a) create initial = Pool <$> newMVar (initial, Seq.empty) -- | Wait for a unit of the resource to become available, and run the supplied -- action given that resource. -- with :: Pool a -> (a -> IO b) -> IO b with pool action = bracket (take pool) (put pool) action unsafeWith :: Pool a -> (a -> b) -> b unsafeWith pool action = unsafePerformIO $ with pool (evaluate . action) -- | Wait for an item from the pool to become available. -- take :: Pool a -> IO a take (Pool ref) = mask_ $ do (r, bs) <- takeMVar ref case r of [] -> do b <- newEmptyMVar putMVar ref (r, bs Seq.|> b) wait b (a:as) -> do putMVar ref (as, bs) return a where wait b = takeMVar b `catch` \(e :: SomeException) -> uninterruptibleMask_ $ do -- Note [signal interruptible] r <- takeMVar ref ma <- tryTakeMVar b r' <- case ma of Just a -> signal a r -- make sure we don't lose the resource Nothing -> do putMVar b (throw e) -- unblock the thread?? return r putMVar ref r' throwIO e -- | Return a unit of the resource to the pool. -- put :: Pool a -> a -> IO () put (Pool ref) a = uninterruptibleMask_ $ do -- Note [signal interruptible] r <- takeMVar ref r' <- signal a r putMVar ref r' -- Note [signal interruptible] -- -- If we have: -- -- > bracket take put (...) -- -- and an exception arrives at the put, then we must not lose the resource. The -- put is masked by bracket, but taking the MVar might block, and so it would be -- interruptible. Hence we need an uninterruptibleMask here. -- signal :: a -> ([a], Seq (MVar a)) -> IO ([a], Seq (MVar a)) signal a (as, blocked) = if null as then loop blocked -- there may be waiting threads; wake one up else return (a:as, blocked) -- nobody waiting where loop blocked' = case Seq.viewl blocked' of Seq.EmptyL -> return ([a], Seq.empty) b Seq.:< bs -> do r <- tryPutMVar b a if r then return ([], bs) -- we woke up a thread else loop bs -- already unblocked; drop from the queue {-- -- | An item pool -- data Pool a = Pool {-# UNPACK #-} !(MVar (NonEmpty a)) -- | Create a new pooled resource containing the given items -- create :: [a] -> IO (Pool a) create [] = Pool <$> newEmptyMVar create (x:xs) = Pool <$> newMVar (x :| xs) -- | Execute an operation using an item from the pool. Like 'take', the function -- blocks until one becomes available. -- with :: Pool a -> (a -> IO b) -> IO b with pool action = bracket (take pool) (put pool) action unsafeWith :: Pool a -> (a -> b) -> b unsafeWith pool action = unsafePerformIO $ with pool (pure . action) -- | Take an item from the pool. This will block until one is available. -- take :: Pool a -> IO a take (Pool ref) = do x :| xs <- takeMVar ref -- blocking case xs of [] -> return () -- leave the pool empty; subsequent 'take's will block (a:as) -> mask_ $ do r <- tryTakeMVar ref case r of Nothing -> putMVar ref (a :| as) Just (b :| bs) -> putMVar ref (a :| b : bs ++ as) return x -- | Return an item back to the pool for later reuse. This should be -- a non-blocking operation. -- put :: Pool a -> a -> IO () put (Pool ref) a = mask_ $ do it <- tryTakeMVar ref case it of Just (b :| bs) -> putMVar ref (a :| b : bs) Nothing -> putMVar ref (a :| []) #if __GLASGOW_HASKELL__ < 800 -- | Non-empty (and non-strict) list type. -- infixr 5 :| data NonEmpty a = a :| [a] deriving ( Eq, Ord, Show, Read ) #endif --}