{-# LANGUAGE BangPatterns #-}
-- |
-- Module      : Data.Array.Accelerate.LLVM.PTX.Execute.Stream.Reservoir
-- Copyright   : [2016..2020] The Accelerate Team
-- License     : BSD3
--
-- Maintainer  : Trevor L. McDonell <trevor.mcdonell@gmail.com>
-- Stability   : experimental
-- Portability : non-portable (GHC extensions)
--

module Data.Array.Accelerate.LLVM.PTX.Execute.Stream.Reservoir (

  Reservoir,
  new, malloc, insert,

) where

import Data.Array.Accelerate.LLVM.PTX.Context                       ( Context )
import qualified Data.Array.Accelerate.LLVM.PTX.Debug               as Debug

import Control.Concurrent.MVar
import Data.Sequence                                                ( Seq )
import qualified Data.Sequence                                      as Seq
import qualified Foreign.CUDA.Driver.Stream                         as Stream


-- | The reservoir is a place to store CUDA execution streams that are currently
-- inactive. When a new stream is requested one is provided from the reservoir
-- if available, otherwise a fresh execution stream is created.
--
-- Concretely, it is a sequence of streams held in an 'MVar': 'insert' appends
-- to the right-hand end and 'malloc' scans from the left-hand end.
--
type Reservoir = MVar (Seq Stream.Stream)


-- | Generate a new empty reservoir. It is not necessary to pre-populate it with
-- any streams because stream creation does not cause a device synchronisation.
--
-- Additionally, we do not need to finalise any of the streams. A reservoir is
-- tied to a specific execution context, so when the reservoir dies it is
-- because the PTX state and contained CUDA context have died, so there is
-- nothing more to do.
--
-- The 'Context' argument is not inspected; the result is always a fresh 'MVar'
-- holding an empty sequence.
--
{-# INLINEABLE new #-}
new :: Context -> IO Reservoir
new _ctx = newMVar Seq.empty


-- | Retrieve an execution stream from the reservoir, if one is available.
--
-- Since we put streams back onto the reservoir once we have finished adding
-- work to them, not once they have completed execution of the tasks, we must
-- check for one which has actually completed.
--
-- Returns @Just s@ for the first stream (scanning from the left) for which
-- 'Stream.finished' reports 'True', removing it from the reservoir while
-- keeping the remaining streams in their original relative order. Returns
-- 'Nothing', leaving the contents unchanged, if no stream qualifies.
--
-- The whole scan runs inside 'modifyMVar', so the reservoir is taken for the
-- duration of the search.
--
-- See note: [Finalising execution streams]
--
{-# INLINEABLE malloc #-}
malloc :: Reservoir -> IO (Maybe Stream.Stream)
malloc !ref =
  modifyMVar ref (search Seq.empty)
  where
    -- scan through the streams in the reservoir looking for the first inactive
    -- one. Optimistically adding the streams to the end of the reservoir as
    -- soon as we stop assigning new work to them (c.f. async), and just
    -- checking they have completed before reusing them, is quicker than having
    -- a finaliser thread block until completion before retiring them.
    --
    -- 'acc' collects the still-busy streams already inspected (in order);
    -- 'rsv' is the part of the reservoir not yet inspected.
    --
    search
        :: Seq Stream.Stream
        -> Seq Stream.Stream
        -> IO (Seq Stream.Stream, Maybe Stream.Stream)
    search !acc !rsv =
      case Seq.viewl rsv of
        -- nothing left to inspect: every stream was busy, put them all back
        Seq.EmptyL  -> return (acc, Nothing)
        s Seq.:< ss -> do
          done <- Stream.finished s
          if done
            -- hand out 's'; rejoin the busy prefix with the unexamined suffix
            then return (acc Seq.>< ss, Just s)
            -- still busy: remember it and keep looking
            else search (acc Seq.|> s) ss


-- | Add a stream to the reservoir.
--
-- The stream is appended to the right-hand end of the sequence. No check is
-- made here that the stream has finished executing its work; 'malloc' performs
-- that check when a stream is requested. A debug message naming the stream is
-- passed to 'message' before the reservoir is updated.
--
{-# INLINEABLE insert #-}
insert :: Reservoir -> Stream.Stream -> IO ()
insert !ref !stream = do
  message ("stash stream " ++ showStream stream)
  modifyMVar_ ref $ \rsv -> return (rsv Seq.|> stream)


-- Debug
-- -----

-- | Hand a message, prefixed with @"stream: "@, to 'Debug.traceIO' under the
-- 'Debug.dump_sched' flag, then run the given action and return its result.
--
-- NOTE(review): presumably the message is only printed when that debug flag is
-- enabled; confirm against the Debug module.
--
{-# INLINE trace #-}
trace :: String -> IO a -> IO a
trace msg next = do
  Debug.traceIO Debug.dump_sched ("stream: " ++ msg)
  next

-- | Emit a debug message via 'trace' without running any further action.
--
{-# INLINE message #-}
message :: String -> IO ()
message s = s `trace` return ()

-- | Render a stream for debug output by showing the pointer wrapped by the
-- 'Stream.Stream' constructor.
--
{-# INLINE showStream #-}
showStream :: Stream.Stream -> String
showStream (Stream.Stream s) = show s