module UnambCustom.Unamb
( race, unamb, rebootScheduler )
where
import Prelude hiding (catch)
import Control.Concurrent
import Data.IORef
import qualified Data.Map as Map
import System.IO.Unsafe (unsafePerformIO)
import Control.Exception
import Data.Unique
import Control.Monad.Writer
-- | 'takeMVar' with an extra leading label argument that is ignored.
-- Call sites in this module pass a string naming the caller.
takeMVar' :: label -> MVar a -> IO a
takeMVar' _label var = takeMVar var
-- | 'putMVar' with an extra leading label argument that is ignored.
-- Call sites in this module pass a string naming the caller.
putMVar' :: label -> MVar a -> a -> IO ()
putMVar' _label var value = putMVar var value
-- | Bookkeeping record for one managed thread.
data LiveEntry = LiveEntry
    { aliveVar    :: IORef Bool
      -- ^ Created as 'True' by 'newLiveEntry'; 'endMThread' writes 'False'.
      -- 'schedDaemon' reads the parent's flag before forking a child.
    , subthreads  :: IORef (Map.Map ThreadId ())
      -- ^ Ids of the child threads registered under this entry (used as a set).
    , parentEntry :: Maybe LiveEntry
      -- ^ Entry of the thread that requested this one, or 'Nothing' for a root.
    }
-- | A pending fork request, passed to the scheduler daemon through the
-- fork queue.
data ForkRecord = ForkRecord
    { frParent :: ThreadId
      -- ^ Id of the thread that issued the request (set by
      -- 'newMThreadWithParent').
    , frEntry  :: LiveEntry
      -- ^ Freshly created entry for the thread that is about to be forked.
    , frAction :: IO ()
      -- ^ The action the new thread will run.
    }
-- | Managed threads currently registered with a scheduler, keyed by thread id.
type LiveMap = Map.Map ThreadId LiveEntry
-- | Channel of fork requests consumed by the scheduler daemon.
type ForkQueue = Chan ForkRecord
-- | Shared scheduler state: the registry of managed threads and the queue
-- of fork requests waiting for the daemon.
data Scheduler = Scheduler
    { liveMap   :: MVar LiveMap
      -- ^ Registry of managed threads; the 'MVar' doubles as the lock taken
      -- by every operation that reads or updates the registry.
    , forkQueue :: ForkQueue
      -- ^ Requests written by 'newMThreadWithParent', read by 'schedDaemon'.
    }
-- | Build a scheduler with an empty thread registry and an empty fork
-- queue, and start its daemon thread ('schedDaemon') before returning it.
makeScheduler :: IO Scheduler
makeScheduler = do
    registry <- newMVar Map.empty
    requests <- newChan
    let sched = Scheduler { liveMap = registry, forkQueue = requests }
    -- The daemon's thread id is intentionally not kept.
    _ <- forkIO (schedDaemon sched)
    return sched
-- | Allocate the bookkeeping entry for a new managed thread: marked alive,
-- with no subthreads, and linked to the given parent entry (if any).
newLiveEntry :: Maybe LiveEntry -> IO LiveEntry
newLiveEntry parent = do
    aliveFlag <- newIORef True
    children <- newIORef Map.empty
    return LiveEntry
        { aliveVar    = aliveFlag
        , subthreads  = children
        , parentEntry = parent
        }
-- | Ask the scheduler daemon to fork @thr@ as a managed thread whose parent
-- entry is @parententry@ ('Nothing' for a root thread).
--
-- This only enqueues the request; the actual 'forkIO' happens later in
-- 'schedDaemon'. Runs with asynchronous exceptions blocked.
newMThreadWithParent :: Scheduler -> Maybe LiveEntry -> IO () -> IO ()
newMThreadWithParent sched parententry thr = block $ do
    requester <- myThreadId
    childEntry <- newLiveEntry parententry
    writeChan (forkQueue sched) (ForkRecord requester childEntry thr)
-- | Ask the scheduler to fork @thr@ as a managed child of the calling thread.
--
-- The caller's own entry is looked up in the registry and used as the
-- parent; if the caller is not registered the new thread has no parent.
-- The registry lock is held while the request is enqueued and the registry
-- is put back unchanged afterwards.
newMThread :: Scheduler -> IO () -> IO ()
newMThread sched thr = block $ do
    self <- myThreadId
    livemap <- takeMVar' "newMThread " registry
    let parententry = Map.lookup self livemap
    -- Force the lookup before enqueueing the request.
    parententry `seq` newMThreadWithParent sched parententry thr
    putMVar' "newMThread " registry livemap
  where
    registry = liveMap sched
-- | Retire a managed thread and all of its registered descendants.
--
-- With the registry lock held, the thread's entry and those of its
-- descendants are marked dead, and the thread is unlinked from its parent's
-- subthread set. The thread and every descendant are then dropped from the
-- registry. After the lock is released, every descendant is killed. The
-- thread identified by @threadid@ itself is /not/ killed here (see
-- 'killMThread').
endMThread :: Scheduler -> ThreadId -> IO ()
endMThread sched threadid = block $ do
    livemap <- takeMVar' "endMThread " (liveMap sched)
    death <- execWriterT (condemn livemap threadid)
    let survivors = Map.delete threadid (Map.difference livemap death)
    putMVar' "endMThread " (liveMap sched) survivors
    mapM_ killThread (Map.keys death)
  where
    -- Visit a thread id; ids that are not in the registry are skipped.
    condemn live tid =
        maybe (return ()) (retire live tid) (Map.lookup tid live)

    -- Unlink the thread from its parent, record its children in the writer
    -- output, mark it dead, then recurse into the children.
    retire live tid entry = do
        liftIO $ maybe (return ())
                       (\p -> safeModifyIORef (subthreads p) (Map.delete tid))
                       (parentEntry entry)
        subs <- liftIO (readIORef (subthreads entry))
        tell subs
        liftIO (writeIORef (aliveVar entry) False)
        mapM_ (condemn live) (Map.keys subs)
-- | Retire a managed thread (and its descendants) via 'endMThread', then
-- kill the thread itself.
killMThread :: Scheduler -> ThreadId -> IO ()
killMThread sched threadid =
    endMThread sched threadid >> killThread threadid
-- | Scheduler daemon loop: repeatedly take one fork request off the queue
-- and, with the registry lock held, fork and register the requested thread.
--
-- * A request without a parent is always forked.
-- * A request with a parent is forked only while the parent is still marked
--   alive; otherwise it is dropped and the registry is put back unchanged.
--   A forked child is also added to its parent's subthread set.
--
-- Every forked thread runs its action and then retires itself through
-- 'endMThread'.
schedDaemon :: Scheduler -> IO ()
schedDaemon sched = forever . block $ do
    record <- readChan (forkQueue sched)
    livemap <- takeMVar' "schedDaemon" (liveMap sched)
    let entry = frEntry record

        -- The fork happens inside 'block', and a forked thread inherits the
        -- blocked state of its creator. The action therefore runs under
        -- 'unblock' so the thread can be killed while it is busy (for
        -- example while evaluating a diverging pure value). The cleanup is
        -- attached with 'finally' so it also runs when the action is
        -- interrupted or throws.
        launch = forkIO $
            unblock (frAction record)
                `finally` (endMThread sched =<< myThreadId)

        -- Publish the new thread in the registry, releasing the lock.
        register childid =
            putMVar' "schedDaemon" (liveMap sched)
                     (Map.insert childid entry livemap)

    case parentEntry entry of
        Nothing -> launch >>= register
        Just parent -> do
            alive <- readIORef (aliveVar parent)
            if alive
                then do
                    childid <- launch
                    safeModifyIORef (subthreads parent) (Map.insert childid ())
                    register childid
                -- Parent already retired: drop the request.
                else putMVar' "schedDaemon" (liveMap sched) livemap
-- | Kill every thread currently in the scheduler's registry and reset the
-- registry to empty. The registry lock is held for the whole operation.
rebootSched :: Scheduler -> IO ()
rebootSched sched = block $ do
    livemap <- takeMVar registry
    forM_ (Map.keys livemap) killThread
    putMVar registry Map.empty
  where
    registry = liveMap sched
-- | The process-wide scheduler used by 'race', 'unamb' and
-- 'rebootScheduler'.
--
-- It is created with 'unsafePerformIO' the first time it is demanded. The
-- NOINLINE pragma is essential: if the compiler inlined this definition at
-- its use sites, each site could run 'makeScheduler' on its own, producing
-- several independent schedulers (and daemon threads) instead of one shared
-- instance.
theScheduler :: Scheduler
theScheduler = unsafePerformIO makeScheduler
{-# NOINLINE theScheduler #-}
-- | Race two actions on the given scheduler and return the result of the
-- first one to produce a value.
--
-- If the calling thread is already registered with the scheduler the race
-- runs directly in it. Otherwise a new root managed thread is requested to
-- run the race, and the caller waits for its result.
raceOn :: Scheduler -> IO a -> IO a -> IO a
raceOn sched ioa iob = do
    livemap <- readMVar (liveMap sched)
    self <- myThreadId
    if Map.member self livemap
        then unsafeRaceOn sched ioa iob
        else do
            result <- newEmptyMVar
            newMThreadWithParent sched Nothing
                (unsafeRaceOn sched ioa iob >>= putMVar result)
            takeMVar result
-- | Race two actions as managed children of the calling thread and return
-- whichever result arrives first.
--
-- Each contender runs its action and puts the result into a shared 'MVar';
-- any exception a contender raises is discarded, so a failing contender
-- simply never delivers a result.
unsafeRaceOn :: Scheduler -> IO a -> IO a -> IO a
unsafeRaceOn sched ioa iob = do
    result <- newEmptyMVar
    let contender action =
            newMThread sched (quietly (action >>= putMVar result))
    contender ioa
    contender iob
    takeMVar result
  where
    quietly io = io `catch` discard

    -- Deliberately swallows every exception.
    discard :: SomeException -> IO ()
    discard _ = return ()
-- | Run two actions concurrently on the global scheduler and return the
-- result of whichever finishes first.
race :: IO a -> IO a -> IO a
race ioa iob = raceOn theScheduler ioa iob
-- | Evaluate both arguments to weak head normal form concurrently and
-- return whichever finishes first.
unamb :: a -> a -> a
unamb a b = unsafePerformIO (race (whnf a) (whnf b))
  where
    -- Force the value when the action runs, not when the result is used.
    whnf x = return $! x
-- | Kill every thread registered with the global scheduler and clear its
-- registry (see 'rebootSched').
rebootScheduler :: IO ()
rebootScheduler = rebootSched theScheduler
-- | Atomically apply a function to the contents of an 'IORef'.
--
-- The new value is forced to weak head normal form once the atomic swap has
-- completed. Without this, each call leaves an unevaluated application of
-- @f@ in the reference, and repeated updates build up a chain of thunks
-- that is only collapsed when somebody finally reads the value.
safeModifyIORef :: IORef a -> (a -> a) -> IO ()
safeModifyIORef r f = do
    new <- atomicModifyIORef r (\x -> let y = f x in (y, y))
    new `seq` return ()