{-# LANGUAGE Trustworthy, ScopedTypeVariables, ConstraintKinds , DeriveDataTypeable #-}
module Control.CUtils.CPUMultiThreading (ExceptionList(..), ConcException(..), ConcurrentMethod, ConcurrentMethodAdapter, module Control.CUtils.ThreadPool, simpleConc_, concurrent_, throwToCallerAdapter_, toNewStyle) where
import Control.Exception
import Control.Concurrent
import Control.Concurrent.Chan
import Data.Typeable
import Control.Monad.Loops
import Data.IORef
import Data.List.Extra
import Data.Maybe
import Control.Monad
import Control.Monad.Identity
import Control.Category
import Control.Arrow
import System.IO
import Control.CUtils.Semaphore
import Control.CUtils.ThreadPool
import Prelude hiding (catch, (.), id)
data ExceptionList = ExceptionList[SomeException] deriving (Show, Typeable)
instance Exception ExceptionList
data ConcException = ConcException deriving (Show, Typeable)
instance Exception ConcException
type Over p f t u v w = p v(f w) -> t -> f u
type Over' p f t u = Over p f t t u u
type ConcurrentMethod t t2 = Over(Kleisli((->) Int)) IO () t () t2
type ConcurrentMethodAdapter t t2 = Pool -> (Int -> ConcurrentMethod t t2) -> Int -> ConcurrentMethod t t2
simpleConc_ :: (Foldable f)=> Pool -> f(IO()) -> () -> IO()
simpleConc_ ls mnds unit= do
sem <- newSem
ref <- newIORef 0
catch
(mapM_(\m ->do
addToPoolMulti ls(finally m(putSem sem 1))
modifyIORef' ref succ)
mnds)
(\(ex :: SomeException) -> do
hPrint stderr ex
nLen <- readIORef ref
takeSem sem nLen
throwIO ex)
nLen <- readIORef ref
takeSem sem nLen
chunkSize :: Int
chunkSize= 10000
concurrent_ :: Pool -> Int -> ConcurrentMethod() t
concurrent_ ls n mnds = simpleConc_ ls
(map
(\ii -> mapM_(runKleisli mnds()) [ii..pred(min n(ii+chunkSize))])
[0,chunkSize..n-1])
getExceptions exs = do
writeChan exs$!mzero
ls <- getChanContents exs
let exslst = map fromJust . takeWhile isJust$ls
when(not(null exslst))$throwIO(ExceptionList exslst)
throwToCallerAdapter_ ::
Pool
-> (t1 -> ConcurrentMethod() ())
-> t1 -> ConcurrentMethod() ()
throwToCallerAdapter_ _ method arg mnds unit = do
exs <- newChan
_ <- method arg(arr(f exs).mnds) unit
getExceptions exs
where
f exs mnd = catch mnd(\(ex::SomeException) -> writeChan exs$!return ex)
toNewStyle :: ((Int -> IO t2) ->IO t) -> ConcurrentMethod t t2
{-# INLINE toNewStyle #-}
toNewStyle x x2 unit= x(runKleisli x2 unit)