module Test.IOSpec.Concurrent
(
IOConc
, runIOConc
, ThreadId
, MVar
, newEmptyMVar
, takeMVar
, putMVar
, forkIO
, Scheduler(..)
, streamSched
, roundRobin
)
where
import Data.Dynamic
import Data.Maybe (fromJust)
import Data.List (nub)
import Control.Monad.State
import qualified Data.Stream as Stream
newtype ThreadId = ThreadId Int deriving (Eq, Show)
type Data = Dynamic
type Loc = Int
data IOConc a =
NewEmptyMVar (Loc -> IOConc a)
| TakeMVar Loc (Data -> IOConc a)
| PutMVar Loc Data (IOConc a)
| forall b . Fork (IOConc b) (ThreadId -> IOConc a)
| Return a
instance Functor IOConc where
fmap f (Return x) = Return (f x)
fmap f (NewEmptyMVar io) = NewEmptyMVar (\l -> fmap f (io l))
fmap f (TakeMVar l io) = TakeMVar l (\d -> fmap f (io d))
fmap f (PutMVar l d io) = PutMVar l d (fmap f io)
fmap f (Fork l io) = Fork l (\tid -> fmap f (io tid))
instance Monad IOConc where
return = Return
(Return x) >>= g = g x
(NewEmptyMVar f) >>= g = NewEmptyMVar (\l -> f l >>= g)
(TakeMVar l f) >>= g = TakeMVar l (\d -> f d >>= g)
PutMVar c d f >>= g = PutMVar c d (f >>= g)
Fork p1 p2 >>= g = Fork p1 (\tid -> p2 tid >>= g)
newtype MVar a = MVar Loc deriving Typeable
newEmptyMVar :: IOConc (MVar a)
newEmptyMVar = NewEmptyMVar (Return . MVar)
takeMVar :: Typeable a => MVar a -> IOConc a
takeMVar (MVar l) = TakeMVar l (Return . unsafeFromDynamic)
putMVar :: Typeable a => MVar a -> a -> IOConc ()
putMVar (MVar l) d = PutMVar l (toDyn d) (Return ())
forkIO :: IOConc a -> IOConc ThreadId
forkIO p = Fork p Return
newtype Scheduler =
Scheduler (Int -> (ThreadId, Scheduler))
data ThreadStatus =
forall b . Running (IOConc b)
| Finished
type Heap = Loc -> Maybe Data
data Store = Store { fresh :: Loc
, heap :: Heap
, nextTid :: ThreadId
, soup :: ThreadId -> ThreadStatus
, scheduler :: Scheduler
, blockedThreads :: [ThreadId]
}
initStore :: Scheduler -> Store
initStore s = Store { fresh = 0
, nextTid = ThreadId 1
, scheduler = s
, blockedThreads = []
}
runIOConc :: IOConc a -> Scheduler -> Maybe a
runIOConc io s = evalState (interleave io) (initStore s)
data Status a = Stop a | Step (IOConc a) | Blocked
step :: IOConc a -> State Store (Status a)
step (Return a) = return (Stop a)
step (NewEmptyMVar f)
= do loc <- alloc
modifyHeap (update loc Nothing)
return (Step (f loc))
step (TakeMVar l f)
= do var <- lookupHeap l
case var of
Nothing -> return Blocked
(Just d) -> do emptyMVar l
return (Step (f d))
step (PutMVar l d p)
= do var <- lookupHeap l
case var of
Nothing -> do fillMVar l d
return (Step p)
(Just d) -> return Blocked
step (Fork l r)
= do tid <- freshThreadId
extendSoup l tid
return (Step (r tid))
emptyMVar :: Loc -> State Store ()
emptyMVar l = modifyHeap (update l Nothing)
fillMVar :: Loc -> Data -> State Store ()
fillMVar l d = modifyHeap (update l (Just d))
extendSoup :: IOConc a -> ThreadId -> State Store ()
extendSoup p tid = modifySoup (update tid (Running p))
data Process a =
Main (IOConc a)
| forall b . Aux (IOConc b)
interleave :: IOConc a -> State Store (Maybe a)
interleave main
= do (tid,t) <- schedule main
case t of
Main p ->
do x <- step p
case x of
Stop r -> return (Just r)
Step p -> do resetBlockedThreads
interleave p
Blocked -> do isDeadlock <- detectDeadlock
if isDeadlock
then return Nothing
else interleave main
Aux p ->
do x <- step p
case x of
Stop _ -> do resetBlockedThreads
finishThread tid
interleave main
Step q -> do resetBlockedThreads
extendSoup q tid
interleave main
Blocked -> do recordBlockedThread tid
interleave main
schedule :: IOConc a -> State Store (ThreadId, Process a)
schedule main = do (ThreadId tid) <- getNextThreadId
if tid == 0
then return (ThreadId 0, Main main)
else do
tsoup <- gets soup
case tsoup (ThreadId tid) of
Finished -> schedule main
Running p -> return (ThreadId tid, Aux p)
getNextThreadId :: State Store ThreadId
getNextThreadId = do Scheduler sch <- gets scheduler
(ThreadId n) <- gets nextTid
let (tid,s) = sch n
modifyScheduler (const s)
return tid
streamSched :: Stream.Stream Int -> Scheduler
streamSched xs =
Scheduler (\k -> (ThreadId (Stream.head xs `mod` k), streamSched (Stream.tail xs)))
roundRobin :: Scheduler
roundRobin = streamSched (Stream.unfold (\k -> (k, k+1)) 0)
freshThreadId :: State Store ThreadId
freshThreadId = do tid <- gets nextTid
modifyTid (\(ThreadId k) -> ThreadId (k + 1))
return tid
alloc :: State Store Loc
alloc = do loc <- gets fresh
modifyFresh ((+) 1)
return loc
lookupHeap :: Loc -> State Store (Maybe Data)
lookupHeap l = do h <- gets heap
return (h l)
extendHeap :: Loc -> Data -> State Store ()
extendHeap l d = modifyHeap (update l (Just d))
finishThread :: ThreadId -> State Store ()
finishThread tid = modifySoup (update tid Finished)
resetBlockedThreads :: State Store ()
resetBlockedThreads = modifyBlockedThreads (const [])
recordBlockedThread :: ThreadId -> State Store ()
recordBlockedThread tid = do
tids <- gets blockedThreads
if tid `elem` tids
then return ()
else modifyBlockedThreads (tid :)
detectDeadlock :: State Store Bool
detectDeadlock = do blockedThreads <- liftM length (gets blockedThreads)
(ThreadId nrThreads) <- gets nextTid
threadSoup <- gets soup
let allThreadIds = [ThreadId x | x <- [1 .. (nrThreads 1)]]
let finishedThreads = length $ filter isFinished (map threadSoup allThreadIds)
return (blockedThreads + finishedThreads == nrThreads 1)
isFinished :: ThreadStatus -> Bool
isFinished Finished = True
isFinished _ = False
update :: Eq a => a -> b -> (a -> b) -> (a -> b)
update l d h k
| l == k = d
| otherwise = h k
unsafeFromDynamic :: Typeable a => Dynamic -> a
unsafeFromDynamic = fromJust . fromDynamic
modifyHeap f = do s <- get
put (s {heap = f (heap s)})
modifyScheduler f = do s <- get
put (s {scheduler = f (scheduler s)})
modifyFresh f = do s <- get
put (s {fresh = f (fresh s)})
modifyTid f = do s <- get
put (s {nextTid = f (nextTid s)})
modifySoup f = do s <- get
put (s {soup = f (soup s)})
modifyBlockedThreads f = do s <- get
put (s {blockedThreads = f (blockedThreads s)})