{-# LANGUAGE  LambdaCase, ScopedTypeVariables, GADTs, DeriveDataTypeable, Trustworthy #-}
module Control.CUtils.ThreadPool (Pool, addToPoolMulti, newPool, stopPool_,
-- ** Global thread pools
globalPool,
-- ** Compatibility shims
ThreadPool(..), Interruptible(..), NoPool(..), BoxedThreadPool(..)) where

import Control.Exception
import Control.Concurrent



import Data.Data
import Data.Array
import Data.Foldable
import Data.IORef



import Data.List.Extras.Argmax
import Data.Maybe
import Control.Monad
import Control.Monad.Identity
import Control.Monad.Loops
--import Control.Monad.Random
import Control.CUtils.BoundedQueue
import System.IO.Unsafe
import System.IO
import Prelude hiding (mapM_)
-- | A message consumed by a worker thread's processing loop.
data Instruction
        = NextTask (IO ())  -- ^ Run this action.
        | S                 -- ^ Stop: the worker's loop exits after reading this.
        deriving (Typeable)

-- | State shared between a worker thread and the code that feeds it.
data Worker = Worker
        { instructions :: !(BoundedQueue Instruction)
          -- ^ Queue of instructions the worker thread reads.
        , counter      :: !(IORef Int)
          -- ^ Tasks enqueued for this worker but not yet taken off the queue.
        }
        deriving (Typeable)

-- 'Instruction' and 'Worker' hold IO actions, an 'IORef' and a queue, none of
-- which have 'Data' instances, so these types cannot derive 'Data'. The
-- hand-written instances below treat both types as opaque: 'gfoldl' visits no
-- children and 'dataTypeOf' reports a non-representable type. Generic code can
-- therefore pass over them instead of hitting an undefined method. The 'Worker'
-- instance is what allows 'Pool' (a newtype over @Array Int Worker@) to derive
-- 'Data'.
instance Data Instruction where
        gfoldl _ z = z
        gunfold _ _ _ = error "Control.CUtils.ThreadPool: gunfold(Instruction)"
        toConstr _ = error "Control.CUtils.ThreadPool: toConstr(Instruction)"
        dataTypeOf _ = mkNoRepType "Control.CUtils.ThreadPool.Instruction"

instance Data Worker where
        gfoldl _ z = z
        gunfold _ _ _ = error "Control.CUtils.ThreadPool: gunfold(Worker)"
        toConstr _ = error "Control.CUtils.ThreadPool: toConstr(Worker)"
        dataTypeOf _ = mkNoRepType "Control.CUtils.ThreadPool.Worker"

-- | A fixed set of worker threads, created by 'newPool'. The constructor is
-- not exported, so the number of workers is an implementation detail.
newtype Pool = Pool
        { workers_ :: Array Int Worker
        }
        deriving (Typeable, Data)

-- | Enqueue a task on one particular worker and increment that worker's
-- counter of queued tasks. The worker decrements the counter when it takes
-- the task off its queue. The task's result is discarded.
addToWorker :: Worker -> IO t -> IO()
{-# INLINE addToWorker #-}
addToWorker mv mnd = mask_ $ do
        let adjust f = atomicModifyIORef' (counter mv) (\n -> (f n, ()))
        adjust succ
        -- If 'writeRB' throws, the task presumably was not enqueued. This could
        -- happen, for instance, if it is interrupted while waiting on a full
        -- queue (depends on BoundedQueue; NOTE(review): confirm). Undo the
        -- increment so the load estimate read by 'addToPoolMulti' does not stay
        -- permanently inflated for this worker.
        (writeRB (instructions mv) $! NextTask (void mnd)) `onException` adjust pred

-- | Hand a task to the worker with the fewest queued tasks, based on an
-- unsynchronised (possibly stale) read of the per-worker counters. The
-- task's result is discarded. Throws 'ErrorCall' if the pool has no workers.
addToPoolMulti :: Pool -> IO t -> IO()
{-# INLINABLE addToPoolMulti #-}
addToPoolMulti (Pool arr) task
        | rangeSize (bounds arr) <= 0 =
                throwIO $ ErrorCall "addToPoolMulti: pool is empty"
        | otherwise = do
                let candidates = toList arr
                -- Snapshot of each worker's queued-task count.
                loads <- forM candidates (readIORef . counter)
                let leastLoaded = fst (argmin snd (zip candidates loads))
                addToWorker leastLoaded task

-- For performance, each worker thread should be kept busy as much as
-- possible. The simplest way to achieve this is a single shared work queue
-- from which the threads take work greedily (or a work-stealing scheme), but
-- that introduces a central point of contention.
-- An observation: pretty good load balancing can be achieved with only an
-- imprecise estimate of how busy each thread is. That removes the central
-- point of contention in principle, because the caller of 'addToPoolMulti'
-- selects the worker thread that receives the work. (The estimate is
-- imprecise because it is read with limited synchronization, so it is
-- sometimes out of date.)

-- | Create one worker. It consists of:
--
-- * a bounded instruction queue ('newRB' is given 10000, presumably its
--   capacity);
-- * a counter of queued tasks;
-- * a thread that serves the queue until it reads 'S'.
newWorker :: IO Worker
newWorker = do
        rb <- newRB 10000
        ref <- newIORef 0
        let worker = Worker rb ref
        -- A plain 'forkIO' makes the new thread inherit the caller's masking
        -- state. A pool created while asynchronous exceptions are masked (for
        -- example in the acquire action of 'bracket') would then run every task
        -- masked. 'forkIOWithUnmask' lets the worker loop always run unmasked.
        _ <- forkIOWithUnmask (\unmask -> unmask (loop worker))
        return $! worker
    where
        -- Task processing loop: handle one instruction per iteration until
        -- 'step' reports that the loop should stop.
        loop worker = whileM_ (readRB (instructions worker) >>= step worker) (return ())

        step worker = \case
                NextTask mnd -> do
                        -- For every task taken off the work queue, decrement the counter.
                        atomicModifyIORef' (counter worker) (\n -> (pred n, ()))
                        -- The default exception handling policy for worker threads is to
                        -- print the exception to stderr.
                        catch mnd (\(ex :: SomeException) -> hPrint stderr ex)
                        return $! True
                -- Serve a request to terminate the thread pool.
                S -> return $! False

-- | Create a pool of @n@ workers, indexed @0 .. n-1@. A non-positive @n@
-- yields an empty pool.
newPool :: Int -> IO Pool
newPool n = do
        ws <- replicateM n newWorker
        return $ Pool (listArray (0, n - 1) ws)

-- | Ask a worker thread to exit by enqueueing the 'S' instruction. Its
-- processing loop ends when it reads that instruction.
stopWorker :: Worker -> IO()
stopWorker worker = writeRB (instructions worker) S

-- | Stop each worker thread in turn by sending it a message. This only
-- enqueues the stop messages; it does not wait for the threads to exit.
stopPool_ :: Pool -> IO()
stopPool_ pool = forM_ (workers_ pool) stopWorker

--------------------------------------------
-- Global thread pools

-- | A process-wide pool shared by all callers. It is created the first time
-- it is demanded, with one worker per capability as reported by
-- 'getNumCapabilities' at that moment.
globalPool :: Pool
-- NOINLINE keeps GHC from inlining (and thereby duplicating) the
-- 'unsafePerformIO' call, so every use refers to the same pool. Because the
-- 'Pool' type is abstract, the implementation can choose the number of
-- workers based on various considerations without breaking referential
-- transparency.
{-# NOINLINE globalPool #-}
globalPool = unsafePerformIO (newPool =<< getNumCapabilities)

--------------------------------------------
-- Compatibility shims

-- | Thread pools support some standard operations. 'addToPool' submits an
-- action to the pool for execution; the action's result is discarded.
class ThreadPool p where
        addToPool :: p -> IO t -> IO()

-- | Pools that can be asked to shut down their workers.
class Interruptible p where
        stopPool :: p -> IO()

-- | Tasks go to the least-loaded worker (see 'addToPoolMulti').
instance ThreadPool Pool where
        addToPool pool task = addToPoolMulti pool task

-- | Stopping a 'Pool' sends a stop message to every worker (see 'stopPool_').
instance Interruptible Pool where
        stopPool pool = stopPool_ pool

-- | Use if you don't want to use a thread pool. The implementation of
-- 'addToPool' spawns a green thread for each task.
data NoPool = NoPool
        deriving (Typeable, Data)

instance ThreadPool NoPool where
        -- One fresh green thread per task; its 'ThreadId' is not kept.
        addToPool _ task = void (forkIO (void task))

-- | Existential wrapper that hides the concrete pool type, so that pools of
-- different types can be used through the one 'ThreadPool' interface.
data BoxedThreadPool where
        BoxedThreadPool :: ThreadPool p => p -> BoxedThreadPool

-- | Forwards to the wrapped pool's 'addToPool'.
instance ThreadPool BoxedThreadPool where
        addToPool boxed task = case boxed of
                BoxedThreadPool inner -> addToPool inner task

-- Request specialised copies of 'addToPool' at each concrete pool type
-- defined in this module.
-- NOTE(review): 'addToPool' is a class-method selector; confirm that GHC
-- actually honours SPECIALIZE here rather than warning about or ignoring it.
{-# SPECIALIZE addToPool :: Pool -> IO t -> IO() #-}
{-# SPECIALIZE addToPool :: NoPool -> IO t -> IO() #-}
{-# SPECIALIZE addToPool :: BoxedThreadPool -> IO t -> IO() #-}