module Control.Concurrent.Bag.Task
( TaskIO
, runTaskIO
, addTaskIO
, writeResult
, Interruptible (..)
, runInterrupted
, runInterruptible
, WriteResult (..)
, AddTask (..) )
where
import Control.Applicative
import Control.Monad.Reader
import Control.Monad.Writer
import Control.Concurrent (forkIOWithUnmask, forkIO, threadDelay)
import Control.Concurrent.Bag.TaskBuffer
import Control.Concurrent.MVar
import Control.Concurrent (killThread)
import Control.Exception
import qualified Control.Concurrent.Bag.Basic as Basic
newtype TaskIO r a = TaskIO { getTaskReader :: ReaderT (IO (Maybe r) -> IO (), r -> IO ()) IO a }
instance Functor (TaskIO r) where
fmap = liftM
instance Applicative (TaskIO r) where
pure = return
(<*>) = ap
instance Monad (TaskIO r) where
return = TaskIO . return
(TaskIO a) >>= b = TaskIO $ a >>= getTaskReader . b
instance MonadIO (TaskIO r) where
liftIO act = TaskIO $ lift act
type WriteResult r = r -> IO ()
type AddTask r = IO (Maybe r) -> IO ()
runTaskIO :: TaskIO r (Maybe r)
-> AddTask r
-> WriteResult r
-> IO (Maybe r)
runTaskIO tio addTask addResult = runReaderT (getTaskReader tio) (addTask, addResult)
addTaskIO :: TaskIO r (Maybe r) -> TaskIO r ()
addTaskIO task =
TaskIO $ do
(add, addR) <- ask
liftIO $ add (runTaskIO task add addR)
writeResult :: r -> TaskIO r ()
writeResult x =
TaskIO $ do
(_, add) <- ask
liftIO $ add x
data Interruptible r = NoResult
| OneResult r
| AddInterruptibles [Interruptible r]
runInterruptible :: Interruptible r -> TaskIO r (Maybe r)
runInterruptible cur = do
case cur of
NoResult -> return Nothing
OneResult r ->
liftIO $ evaluate r >>= return . Just
AddInterruptibles inters -> do
liftIO (evaluateList inters) >>= mapM (\i -> addTaskIO $ runInterruptible i)
return Nothing
runInterrupted :: Interruptible r
-> TaskIO r (Maybe r)
runInterrupted cur = do
resultVar <- liftIO newEmptyMVar
tid <- liftIO $ uninterruptibleMask_ $ forkIOWithUnmask $ \restore -> do
r <- restore (do
case cur of
NoResult -> return NoResult
OneResult r ->
evaluate r >>= return . OneResult
AddInterruptibles inters ->
evaluateList inters >>= return . AddInterruptibles)
`onException`
(putMVar resultVar Nothing)
putMVar resultVar $ Just r
stopper <- liftIO $ forkIO $ (threadDelay 1000 >> killThread tid)
rs <- liftIO $ takeMVar resultVar
liftIO $ killThread stopper
case rs of
Nothing -> do
addTaskIO $ runInterrupted cur
return Nothing
Just NoResult -> return Nothing
Just (OneResult r) -> return $ Just r
Just (AddInterruptibles inters) -> do
mapM (\i -> addTaskIO $ runInterrupted i) inters
return Nothing
evaluateList [] = return []
evaluateList (x:xs) = do
xs' <- evaluateList xs
return $ x:xs'