{-# LANGUAGE BangPatterns    #-}
{-# LANGUAGE MagicHash       #-}
{-# LANGUAGE RecordWildCards #-}
-- |
-- Module      : Data.Array.Accelerate.LLVM.PTX.Execute.Stream
-- Copyright   : [2014..2017] Trevor L. McDonell
--               [2014..2014] Vinod Grover (NVIDIA Corporation)
-- License     : BSD3
--
-- Maintainer  : Trevor L. McDonell
-- Stability   : experimental
-- Portability : non-portable (GHC extensions)
--
-- CUDA execution streams for the PTX backend. Streams are taken from, and
-- returned to, a 'Reservoir' so they can be reused; a fresh stream is only
-- created when the reservoir has none available.
--

module Data.Array.Accelerate.LLVM.PTX.Execute.Stream (

  Reservoir, new,
  Stream,
  create, destroy, streaming,

) where

-- accelerate
import Data.Array.Accelerate.Lifetime
import qualified Data.Array.Accelerate.Array.Remote.LRU             as Remote

-- Empty import list: this import brings only class instances into scope.
-- NOTE(review): presumably the remote-memory instance needed by
-- 'Remote.reclaim' for the PTX target; confirm.
import Data.Array.Accelerate.LLVM.PTX.Array.Remote                  ( )
import Data.Array.Accelerate.LLVM.PTX.Execute.Event                 ( Event )
import Data.Array.Accelerate.LLVM.PTX.Target                        ( PTX(..) )
import Data.Array.Accelerate.LLVM.State
import qualified Data.Array.Accelerate.LLVM.PTX.Debug               as Debug
import qualified Data.Array.Accelerate.LLVM.PTX.Execute.Event       as Event
-- Not qualified: names are in scope both bare (the re-exported 'Reservoir'
-- and 'new') and under the 'RSV.' prefix.
import Data.Array.Accelerate.LLVM.PTX.Execute.Stream.Reservoir      as RSV

-- cuda
import Foreign.CUDA.Driver.Error
import qualified Foreign.CUDA.Driver.Stream                         as Stream

-- standard library
import Control.Exception
import Control.Monad.State


-- | A 'Stream' represents an independent sequence of computations executed on
-- the GPU. Operations in different streams may be executed concurrently with
-- each other, but operations in the same stream can never overlap.
-- 'Data.Array.Accelerate.LLVM.PTX.Execute.Event.Event's can be used for
-- efficient cross-stream synchronisation.
--
-- The raw CUDA stream is wrapped in a 'Lifetime' so that a finaliser (attached
-- in 'create') can hand it back to the reservoir when it is released.
--
type Stream = Lifetime Stream.Stream


-- Executing operations in streams
-- -------------------------------

-- | Execute an operation in a unique execution stream. The (asynchronous)
-- result is passed to a second operation together with an event that will be
-- signalled once the operation is complete. The stream and event are released
-- after the second operation completes.
--
-- NOTE(review): the release is not exception-safe. If either operation
-- throws, the explicit cleanup is skipped, and the stream is only recovered
-- when its 'Lifetime' is finalised (presumably on garbage collection).
--
{-# INLINEABLE streaming #-}
streaming
    :: (Stream -> LLVM PTX a)
    -> (Event -> a -> LLVM PTX b)
    -> LLVM PTX b
streaming !action !after = do
  -- No fields of the target are needed here; 'create' looks up the stream
  -- reservoir itself.
  stream <- create
  first  <- action stream
  end    <- Event.waypoint stream
  final  <- after end first
  -- Release both resources. 'destroy' runs the stream's finaliser, which puts
  -- it back in the reservoir. Nothing here blocks on the stream first; see
  -- Note: [Finalising execution streams].
  liftIO $ do
    destroy stream
    Event.destroy end
  return final


-- Primitive operations
-- --------------------

{--
-- | Delete all execution streams from the reservoir
--
{-# INLINEABLE flush #-}
flush :: Context -> Reservoir -> IO ()
flush !Context{..} !ref = do
  mc <- deRefWeak weakContext
  case mc of
    Nothing     -> message "delete reservoir/dead context"
    Just ctx    -> do
      message "flush reservoir"
      old <- swapMVar ref Seq.empty
      bracket_ (CUDA.push ctx) CUDA.pop $ Seq.mapM_ Stream.destroy old
--}


-- | Create a CUDA execution stream. If an inactive stream is available for use,
-- use that, otherwise generate a fresh stream.
--
-- Note: [Finalising execution streams]
--
-- We don't actually ensure that the stream has executed all of its operations
-- to completion before attempting to return it to the reservoir for reuse.
-- Doing so increases overhead of the LLVM RTS due to 'forkIO', and consumes CPU
-- time as 'Stream.block' busy-waits for the stream to complete. It is quicker
-- to optimistically return the streams to the end of the reservoir immediately,
-- and just check whether the stream is done before reusing it.
--
-- > void . forkIO $ do
-- >   Stream.block stream
-- >   modifyMVar_ ref $ \rsv -> return (rsv Seq.|> stream)
--
{-# INLINEABLE create #-}
create :: LLVM PTX Stream
create = do
  PTX{..} <- gets llvmTarget
  s       <- create'
  stream  <- liftIO $ newLifetime s
  -- When the lifetime is finalised (e.g. via 'destroy'), put the raw stream
  -- back into the reservoir instead of destroying it.
  liftIO $ addFinalizer stream (RSV.insert ptxStreamReservoir s)
  return stream

-- | Obtain a raw CUDA stream. The sources are tried in order:
--
--   1. a stream taken from the reservoir;
--   2. a freshly created stream;
--   3. after calling 'Remote.reclaim' on the memory table, one more attempt
--      to create a fresh stream.
--
-- Stream creation failing with 'OutOfMemory' moves on to the next step. Any
-- other CUDA exception is re-thrown. If every step fails, an
-- @ExitCode OutOfMemory@ exception is thrown.
--
create' :: LLVM PTX Stream.Stream
create' = do
  PTX{..} <- gets llvmTarget
  ms      <- attempt "create/reservoir" (liftIO $ RSV.malloc ptxStreamReservoir)
             `orElse`
             attempt "create/new" (liftIO . catchOOM $ Stream.create [])
             `orElse` do
               Remote.reclaim ptxMemoryTable
               liftIO $ do
                 message "create/new: failed (purging)"
                 catchOOM $ Stream.create []
  case ms of
    Just s  -> return s
    Nothing -> liftIO $ do
      message "create/new: failed (non-recoverable)"
      throwIO (ExitCode OutOfMemory)
  where
    -- Turn an out-of-memory CUDA failure into 'Nothing'; any other
    -- 'CUDAException' propagates unchanged.
    catchOOM :: IO a -> IO (Maybe a)
    catchOOM it =
      fmap Just it `catch` \e ->
        case e of
          ExitCode OutOfMemory -> return Nothing
          _                    -> throwIO e

    -- Run the action; when it produces a value, emit the debug message.
    attempt :: MonadIO m => String -> m (Maybe a) -> m (Maybe a)
    attempt msg ea = do
      ma <- ea
      case ma of
        Nothing -> return Nothing
        Just a  -> do
          liftIO (message msg)
          return (Just a)

    -- Left-biased choice: run the second action only if the first yields
    -- 'Nothing'.
    orElse :: MonadIO m => m (Maybe a) -> m (Maybe a) -> m (Maybe a)
    orElse ea eb = do
      ma <- ea
      case ma of
        Just a  -> return (Just a)
        Nothing -> eb


-- | Merge a stream back into the reservoir by running the finaliser that
-- 'create' attached to its 'Lifetime'.
--
-- Operations queued on the stream may still be pending at this point.
-- Note: [Finalising execution streams] describes this: streams are returned
-- to the reservoir optimistically, and 'streaming' itself does not block on
-- the stream before calling this.
-- NOTE(review): this relies on the reservoir ('RSV.malloc') checking that a
-- stream is idle before handing it out again; confirm in the Reservoir module.
--
{-# INLINEABLE destroy #-}
destroy :: Stream -> IO ()
destroy = finalize


-- Debug
-- -----

-- | Emit a debug message, prefixed with @"stream: "@, on the
-- 'Debug.dump_sched' channel, then run the continuation.
{-# INLINE trace #-}
trace :: String -> IO a -> IO a
trace msg next = do
  Debug.traceIO Debug.dump_sched ("stream: " ++ msg)
  next

-- | Emit a scheduling debug message only.
{-# INLINE message #-}
message :: String -> IO ()
message s = s `trace` return ()