{-# LANGUAGE ScopedTypeVariables #-}
-- |
-- Module      : Data.Array.Accelerate.LLVM.PTX.Pool
-- Copyright   : [2017..2020] The Accelerate Team
-- License     : BSD3
--
-- Maintainer  : Trevor L. McDonell <trevor.mcdonell@gmail.com>
-- Stability   : experimental
-- Portability : non-portable (GHC extensions)
--

module Data.Array.Accelerate.LLVM.PTX.Pool (

  Pool,
  create, with, take, put,
  unsafeWith,

) where

import Control.Concurrent.MVar
import Control.Exception
import Data.Maybe
import System.IO.Unsafe
import Prelude                                                      hiding ( take )

import Data.Sequence                                                ( Seq )
import qualified Data.Sequence                                      as Seq


-- | An item pool
--
-- The pool is a single 'MVar' holding a pair: the list of items that are
-- currently available, and a queue of 'MVar's, one per thread that is blocked
-- waiting for an item.
--
-- Based on 'Control.Concurrent.QSem'
--
data Pool a = Pool {-# UNPACK #-} !(MVar ([a], Seq (MVar a)))

-- The semaphore state (as, bs):
--
-- * as   the currently available resources
--
-- * bs   is the queue of blocked threads, stored in FIFO order. New threads are
--        queued onto the right, and threads are woken up from the left.
--
-- A blocked thread is represented by an empty (MVar a). To unblock the thread,
-- we give it a resource via its MVar.
--
-- A thread can dequeue itself by filling its own MVar, which it must do if it
-- receives an exception while blocked in 'take'. This means that when
-- unblocking a thread in 'put' we must first check whether the MVar is already
-- full; the MVar lock on the semaphore itself resolves race conditions between
-- put and a thread attempting to dequeue itself.
--

-- | Build a new pool with the supplied initial quantity.
--
-- The given items are the ones initially available; the queue of blocked
-- threads starts out empty.
--
create :: [a] -> IO (Pool a)
create initial =
  Pool <$> newMVar (initial, Seq.empty)

-- | Wait for a unit of the resource to become available, and run the supplied
-- action given that resource.
--
-- The item is acquired with 'take' and handed back with 'put' via 'bracket',
-- so it is returned to the pool whether the action completes normally or
-- exits with an exception.
--
with :: Pool a -> (a -> IO b) -> IO b
with pool action =
  bracket (take pool) (put pool) action

-- | Apply a pure function to an item borrowed from the pool.
--
-- This is 'with' wrapped in 'unsafePerformIO': the item is taken from the pool
-- when the result is demanded, and returned once the function result has been
-- evaluated with 'evaluate'.
--
-- NOTE(review): 'evaluate' only forces the result to weak head normal form, so
-- any unevaluated parts of the result that still refer to the item may be
-- forced after the item has gone back to the pool. Callers presumably rely on
-- the result not capturing the item lazily; confirm at use sites.
--
unsafeWith :: Pool a -> (a -> b) -> b
unsafeWith pool action =
  unsafePerformIO $ with pool (evaluate . action)


-- | Wait for an item from the pool to become available.
--
-- If the pool currently holds an item, it is removed and returned right away.
-- Otherwise the calling thread appends a fresh empty 'MVar' to the right of
-- the queue of blocked threads and waits on it until an item is delivered.
--
-- If an exception arrives while the thread is waiting, the handler makes sure
-- no item is lost (an item that was delivered in the meantime is passed on via
-- 'signal') and then re-throws the exception.
--
take :: Pool a -> IO a
take (Pool ref) =
  mask_ $ do
    (r, bs) <- takeMVar ref
    case r of
      [] -> do
        -- Nothing available: queue ourselves up and block until woken.
        b <- newEmptyMVar
        putMVar ref (r, bs Seq.|> b)
        wait b

      (a:as) -> do
        putMVar ref (as, bs)
        return a
  where
    -- Block on our private MVar. 'takeMVar' is interruptible even under
    -- 'mask_', so an exception can be delivered here.
    wait b =
      takeMVar b `catch` \(e :: SomeException) ->
        uninterruptibleMask_ $ do  -- Note [signal interruptible]
          r  <- takeMVar ref
          ma <- tryTakeMVar b
          r' <- case ma of
                  -- We were handed an item just before the exception arrived:
                  -- pass it on so that we don't lose the resource.
                  Just a  -> signal a r
                  -- Still queued: fill our own MVar with a placeholder so that
                  -- the 'tryPutMVar' in 'signal' fails and our queue entry is
                  -- dropped rather than being given an item.
                  Nothing -> do putMVar b (throw e)
                                return r
          putMVar ref r'
          throwIO e


-- | Return a unit of the resource to the pool.
--
-- The item is passed to 'signal', which either hands it to a blocked thread or
-- adds it to the list of available items. The whole update runs under
-- 'uninterruptibleMask_' so that it cannot be interrupted while waiting for
-- the pool's 'MVar'; see Note [signal interruptible].
--
put :: Pool a -> a -> IO ()
put (Pool ref) a =
  uninterruptibleMask_ $ do   -- Note [signal interruptible]
    r  <- takeMVar ref
    r' <- signal a r
    putMVar ref r'


-- Note [signal interruptible]
--
-- If we have:
--
-- > bracket take put (...)
--
-- and an exception arrives at the put, then we must not lose the resource. The
-- put is masked by bracket, but taking the MVar might block, and so it would be
-- interruptible. Hence we need an uninterruptibleMask here.
--

-- Release one item into the given pool state and return the updated state.
--
-- * If items are already available, the new item is pushed onto the front of
--   the available list and the queue is left untouched.
--
-- * Otherwise the queue of blocked threads is scanned from the left. The item
--   is delivered to the first MVar that is still empty. MVars that are already
--   full belong to threads that gave up waiting, and are dropped.
--
-- * If no blocked thread accepts the item, it becomes the sole available item.
--
signal :: a -> ([a], Seq (MVar a)) -> IO ([a], Seq (MVar a))
signal a (as, blocked)
  | null as   = loop blocked              -- there may be waiting threads; wake one up
  | otherwise = return (a:as, blocked)    -- nobody waiting
  where
    loop waiting =
      case Seq.viewl waiting of
        Seq.EmptyL  -> return ([a], Seq.empty)
        b Seq.:< bs -> do
          woken <- tryPutMVar b a
          if woken
            then return ([], bs)          -- we woke up a thread
            else loop bs                  -- already unblocked; drop from the queue

{--
-- | An item pool
--
data Pool a = Pool {-# UNPACK #-} !(MVar (NonEmpty a))


-- | Create a new pooled resource containing the given items
--
create :: [a] -> IO (Pool a)
create []     = Pool <$> newEmptyMVar
create (x:xs) = Pool <$> newMVar (x :| xs)


-- | Execute an operation using an item from the pool. Like 'take', the function
-- blocks until one becomes available.
--
with :: Pool a -> (a -> IO b) -> IO b
with pool action =
  bracket (take pool) (put pool) action

unsafeWith :: Pool a -> (a -> b) -> b
unsafeWith pool action =
  unsafePerformIO $ with pool (pure . action)


-- | Take an item from the pool. This will block until one is available.
--
take :: Pool a -> IO a
take (Pool ref) = do
  x :| xs <- takeMVar ref -- blocking
  case xs of
    []     -> return ()   -- leave the pool empty; subsequent 'take's will block
    (a:as) -> mask_ $ do r <- tryTakeMVar ref
                         case r of
                           Nothing        -> putMVar ref (a :| as)
                           Just (b :| bs) -> putMVar ref (a :| b : bs ++ as)
  return x


-- | Return an item back to the pool for later reuse. This should be
-- a non-blocking operation.
--
put :: Pool a -> a -> IO ()
put (Pool ref) a =
  mask_ $ do
    it <- tryTakeMVar ref
    case it of
      Just (b :| bs) -> putMVar ref (a :| b : bs)
      Nothing        -> putMVar ref (a :| [])


#if __GLASGOW_HASKELL__ < 800
-- | Non-empty (and non-strict) list type.
--
infixr 5 :|
data NonEmpty a = a :| [a]
  deriving ( Eq, Ord, Show, Read )
#endif
--}