module Control.Concurrent.Bag.Task
( TaskIO
, runTaskIO
, addTaskIO
, Interruptible (..)
, runInterrupted
, runInterruptible )
where
import Control.Applicative
import Control.Monad.Reader
import Control.Monad.Writer
import Control.Concurrent (forkIOWithUnmask, forkIO, threadDelay)
import Control.Concurrent.Bag.TaskBuffer
import Control.Concurrent.MVar
import Control.Concurrent (killThread)
import Control.Exception
import qualified Control.Concurrent.Bag.Basic as Basic
-- | A task computation: a 'ReaderT' over 'IO' whose environment is a function
-- of type @IO (Maybe r) -> IO ()@. 'addTaskIO' reads that function and calls
-- it to hand over a further task; 'runTaskIO' supplies it.
newtype TaskIO r a = TaskIO { getTaskReader :: ReaderT (IO (Maybe r) -> IO ()) IO a }
-- | Maps over the result by delegating to the wrapped 'ReaderT'.
instance Functor (TaskIO r) where
  fmap f (TaskIO inner) = TaskIO (fmap f inner)
-- | Applicative structure borrowed from the wrapped 'ReaderT'.
--
-- Both methods are defined directly on the underlying 'ReaderT' rather than
-- through this type's 'Monad' instance, so 'pure' does not depend on how
-- 'return' is defined.
instance Applicative (TaskIO r) where
  pure = TaskIO . pure
  TaskIO mf <*> TaskIO mx = TaskIO (mf <*> mx)
-- | Sequencing is delegated to the wrapped 'ReaderT'.
instance Monad (TaskIO r) where
  -- Defined on the wrapped 'ReaderT' rather than through this type's own
  -- 'pure', so the instance does not depend on how 'pure' is defined.
  return x = TaskIO (return x)
  TaskIO m >>= k = TaskIO (m >>= \x -> getTaskReader (k x))
-- | Plain 'IO' actions are lifted through the 'ReaderT' layer unchanged.
instance MonadIO (TaskIO r) where
  liftIO = TaskIO . lift
-- | Run a task in 'IO'.
--
-- The second argument becomes the environment of the task; it is the function
-- that 'addTaskIO' calls with every further task the running task hands over.
runTaskIO :: TaskIO r (Maybe r) -> (IO (Maybe r) -> IO ()) -> IO (Maybe r)
runTaskIO (TaskIO reader) add = runReaderT reader add
-- | Hand a further task over from inside a running task.
--
-- The task is turned into an 'IO' action with 'runTaskIO', using the same
-- environment function as the current task, and that action is passed to the
-- environment function.
addTaskIO :: TaskIO r (Maybe r) -> TaskIO r ()
addTaskIO task = TaskIO (ReaderT enqueue)
  where
    -- The environment function is used twice: once to run the new task with
    -- the same environment, and once to receive the resulting action.
    enqueue add = add (runTaskIO task add)
-- | One step of a computation, as consumed by 'runInterruptible' and
-- 'runInterrupted'.
data Interruptible r
  = NoResult
    -- ^ The step yields nothing.
  | OneResult r
    -- ^ The step yields a single result.
  | AddInterruptibles [Interruptible r]
    -- ^ The step yields further computations; the runners add each of them
    -- as a separate task.
-- | Run one step of an 'Interruptible' without any time limit.
--
-- * 'NoResult' yields 'Nothing'.
-- * 'OneResult' forces its value with 'evaluate' and yields it in 'Just'.
-- * 'AddInterruptibles' forces every child with 'evaluateList', adds each
--   child as a new task via 'addTaskIO', and yields 'Nothing'.
--
-- Exceptions raised while forcing are thrown from the resulting action.
runInterruptible :: Interruptible r -> TaskIO r (Maybe r)
runInterruptible NoResult = return Nothing
runInterruptible (OneResult r) = liftIO $ Just <$> evaluate r
runInterruptible (AddInterruptibles inters) = do
  forced <- liftIO (evaluateList inters)
  -- Only the effects matter here, so use 'mapM_' rather than collecting a
  -- list of units that is immediately thrown away.
  mapM_ (addTaskIO . runInterruptible) forced
  return Nothing
-- | Run one step of an 'Interruptible' for at most one short time slice.
--
-- The step is forced on a helper thread while a second \"stopper\" thread
-- kills the helper after 1000 microseconds.
--
-- * If the helper is killed before it finishes, the same step is added again
--   as a new task via 'addTaskIO' and 'Nothing' is returned.
-- * 'NoResult' yields 'Nothing'; 'OneResult' yields its forced value in
--   'Just'.
-- * 'AddInterruptibles' adds every (forced) child as a new interrupted task
--   and yields 'Nothing'.
-- * Any other exception raised while forcing the step is rethrown to the
--   caller. It is not treated as an interruption, because re-queueing a step
--   that fails deterministically would retry it forever.
runInterrupted :: Interruptible r
               -> TaskIO r (Maybe r)
runInterrupted cur = do
  resultVar <- liftIO newEmptyMVar
  -- The helper is forked with asynchronous exceptions masked, and only the
  -- forcing itself runs unmasked. Once forcing has finished or failed, the
  -- outcome is therefore always delivered through 'resultVar'.
  tid <- liftIO $ uninterruptibleMask_ $ forkIOWithUnmask $ \restore ->
    try (restore (forceStep cur)) >>= putMVar resultVar
  stopper <- liftIO $ forkIO (threadDelay 1000 >> killThread tid)
  outcome <- liftIO $ takeMVar resultVar
  -- The helper has reported back, so the stopper is no longer needed.
  liftIO $ killThread stopper
  case outcome of
    Left err
      -- Killed by the stopper: the time slice ran out, so try again later.
      | Just ThreadKilled <- fromException err -> do
          addTaskIO $ runInterrupted cur
          return Nothing
      -- A genuine failure of the computation itself.
      | otherwise -> liftIO $ throwIO err
    Right NoResult -> return Nothing
    Right (OneResult r) -> return $ Just r
    Right (AddInterruptibles inters) -> do
      mapM_ (addTaskIO . runInterrupted) inters
      return Nothing
  where
    -- Force the top level of a step (and, for a list of children, each
    -- child) so that the work happens on the helper thread.
    forceStep NoResult = return NoResult
    forceStep (OneResult r) = OneResult <$> evaluate r
    forceStep (AddInterruptibles inters) =
      AddInterruptibles <$> evaluateList inters
-- | Force the value inside a 'Maybe', if there is one, with 'evaluate'.
-- 'Nothing' is returned unchanged.
evaluateMaybe :: Maybe a -> IO (Maybe a)
evaluateMaybe = maybe (return Nothing) (fmap Just . evaluate)
-- | Force every element of a list with 'evaluate', from left to right, and
-- return the forced elements in the same order.
--
-- An exception raised while forcing an element is thrown from the resulting
-- 'IO' action.
evaluateList :: [a] -> IO [a]
evaluateList = mapM evaluate