{-# LANGUAGE DoRec, ScopedTypeVariables #-}

-- Manual test and stress driver for the PrioritySync library.
--
-- Each test prints progress to stdout and records problems through 'failed'.
-- 'main' selects tests by command-line argument, lists every recorded
-- failure at the end and exits with a failure status if there were any.
module Main
    (main)
    where

import PrioritySync.PrioritySync
import qualified PrioritySync.Internal.Queue as Queue
import Control.Concurrent
import Control.Concurrent.STM
import Control.Monad
import System.Random
import Data.Set as Set
import System.Environment
import System.IO.Unsafe
import System.Exit
import Control.Exception

{-# NOINLINE fail_strs #-}
-- | Process-wide list of the failure messages recorded so far, oldest first.
--
-- This is a top-level mutable variable built with 'unsafePerformIO'; the
-- NOINLINE pragma above is what keeps the definition from being duplicated
-- by inlining.
fail_strs :: MVar [String]
fail_strs = unsafePerformIO $ newMVar []

-- | Report a test failure: print the message immediately and append it to
-- 'fail_strs' so that 'main' can list it again at the end of the run.
failed :: String -> IO ()
failed s = modifyMVar_ fail_strs $ \strs -> do
    putStrLn s
    return $ strs ++ [s]

-- | Check that the calling thread is a member of the room's 'inUse' set
-- exactly while it is inside @claim Acquire@, including when claims on the
-- same room are nested and when a @claim Release@ is nested inside an
-- @Acquire@.
testRoom :: IO ()
testRoom = do
    putStrLn "testRoom"
    putStrLn "Simple test of room reentrancy."
    m <- newRoom ()
    me <- myThreadId
    -- Records a failure labelled @s@ unless "this thread is in the room"
    -- equals the expected value @b@.
    let f s b = do
            ok <- liftM ((== b) . member me) $ inUse m
            unless ok $ failed $ "testRoom: " ++ s
    f "testRoom-1" False
    claim Acquire (Constrained,[m]) $ f "testRoom-2" True
    f "testRoom-3" False
    claim Release (Constrained,[m]) $ f "testRoom-4" False
    f "testRoom-5" False
    claim Acquire (Constrained,[m]) $ do
        claim Acquire (Constrained,[m]) (f "testRoom-6" True)
        -- Leaving the inner claim must not evict us from the outer one.
        f "testRoom-7" True
        claim Release (Constrained,[m]) (f "testRoom-8" False)

-- | Run 28 two-second tasks against a room of four slots and a room of two
-- slots, and check how many have completed after 3 and after 15 seconds.
testMaxThreads :: IO ()
testMaxThreads = do
    putStrLn "testMaxThreads"
    putStrLn "Various threads run in a pair of rooms. The large room has four slots, while the small room has two slots."
    putStrLn "12 large, 4 small, 8 large+small, 4 unconstrained that occupy a slot in large and small"
    io_sem <- newMVar ()
    c <- newMVar 0
    -- One unit of work: sleep two seconds, bump the completion counter, then
    -- print the label while holding io_sem so output lines do not interleave.
    let runThread s = do
            threadDelay 2000000
            modifyMVar_ c (return . (+1))
            withMVar io_sem $ const $ putStrLn s
    large <- newRoom (MaxThreads 4)
    small <- newRoom (MaxThreads 2)
    -- NOTE(review): the extent of this block was reconstructed from
    -- whitespace-mangled source.  It is assumed to cover only the forks, so
    -- that the main thread occupies both rooms while the workers are being
    -- created and gives its slots back before the timing checks -- confirm.
    claim Acquire (Constrained,[large,small]) $ do
        replicateM_ 8 $ forkIO $ claim Acquire (Constrained,[large,small]) $ runThread "large+small"
        replicateM_ 12 $ forkIO $ claim Acquire (Constrained,[large]) $ runThread "large"
        replicateM_ 4 $ forkIO $ claim Acquire (Constrained,[small]) $ runThread "small"
        replicateM_ 4 $ forkIO $ claim Acquire (Unconstrained,[large,small]) $ runThread "unconstrained occupant (large+small)"
    threadDelay 3000000
    -- Both bounds are checked against a single snapshot of the counter.
    withMVar c $ \x -> do
        when (x < 4) $ failed "testMaxThreads: should have completed at least 4 tasks within 3 seconds"
        when (x > 10) $ failed "testMaxThreads: should not have completed more than 10 tasks within 3 seconds"
    -- Four more three-second periods, each introduced by a separator line.
    replicateM_ 4 $ do
        withMVar io_sem $ const $ putStrLn "--"
        threadDelay 3000000
    withMVar c $ \x ->
        when (x /= 28) $ failed "testMaxThreads: did not complete after 15 seconds."

-- | Drive a 'Queue' directly inside STM with three kinds of predicate
-- installed (queue-wide, per-priority and per-task) and check the order in
-- which the sixteen tasks ran.
testQueue :: IO ()
testQueue = do
    putStrLn "testQueue"
    putStrLn "Perform some tasks in priority order, with constraints enforced at queue-level (to govern input), priority level (priority-1 tasks require small load),"
    putStrLn "and task level (priority-2 tasks only work when the counter is even)."
    need_to_print <- newTVarIO False
    value_to_print <- newTVarIO ""
    -- 'rec' is needed because the priority predicate refers to the queue
    -- that is being constructed.
    rec q <- Queue.newQueue $ fair_queue_configuration
            { -- will stall with the default value, b/c the failover requires
              -- a significant ordering inversion
              allowed_ordering_inversion = 15
              -- Retry for as long as a message is still waiting to be printed.
            , queue_predicate = flip when retry =<< readTVar need_to_print
              -- Priority 1 may only proceed while the load is at most 10.
            , priority_indexed_predicate = \x -> do
                l <- Queue.load q
                when (x == 1 && l > 10) retry
            }
    counter <- newTVarIO 0
    str <- newTVarIO ""
    -- Body shared by all tasks: bump the counter, leave a message for the
    -- loop below to print, and append the priority digit to the trace.
    let incCounter x s = do
            n <- readTVar counter
            writeTVar counter $ 1 + n
            writeTVar need_to_print True
            writeTVar value_to_print (s ++ " " ++ show n)
            writeTVar str . (++ show x) =<< readTVar str
    atomically $ do
        replicateM_ 4 $ Queue.putTask q 0 $ incCounter 0 "priority-0"
        replicateM_ 4 $ Queue.putTask q 1 $ incCounter 1 "priority-1, load <= 10"
        replicateM_ 4 $ Queue.putTask q 2 $ do
            n <- readTVar counter
            when (n `mod` 2 /= 0) retry
            incCounter 2 "priority-2, counter is even"
        replicateM_ 4 $ Queue.putTask q 3 $ incCounter 3 "priority-3"
    -- 32 transactions: each one either fetches a pending message (which is
    -- then printed and acknowledged) or, failing that, pulls a task.
    replicateM_ 32 $ do
        m_s <- atomically $
            (do b <- readTVar need_to_print
                if b then liftM Just (readTVar value_to_print) else retry)
            `orElse`
            (Queue.pullTask q >> return Nothing)
        case m_s of
            Just s -> putStrLn s >> atomically (writeTVar need_to_print False)
            Nothing -> return ()
    ok <- atomically $ liftM (== "0000231111232323") $ readTVar str
    unless ok $ failed "testQueue"

-- | Queue fifteen tasks at priorities 4, 2 and 1 on a pool created with
-- size argument 2, start the pool after one second and check that all of them ran with at
-- most two priority inversions.  A task scheduled after 'stopQueue' must
-- never run.
testTaskPool :: IO ()
testTaskPool = do
    putStrLn "testTaskPool"
    putStrLn "Threads should complete in priority order over a duration of one and a half seconds after a one second delay."
    putStrLn "Room has two open slots, so order of evaluation may be off by one task."
    pool <- newTaskPool fair_queue_configuration 2 ()
    m_inversions <- newMVar 0
    m_count <- newMVar 0
    m_greatest_prio <- newMVar 0
    -- Called when a task starts: counts the task, and counts an inversion if
    -- a task with a numerically greater priority has already started.
    let testPrio n = modifyMVar_ m_greatest_prio $ \greatest_prio -> do
            when (greatest_prio > n) $ modifyMVar_ m_inversions (return . (+1))
            modifyMVar_ m_count (return . (+1))
            return $ max greatest_prio n
    replicateM_ 10 $ forkIO $ claim Acquire (schedule pool 4) $ testPrio 4 >> threadDelay 200000 >> putStrLn "finished-4"
    replicateM_ 4 $ forkIO $ claim Acquire (schedule pool 2) $ testPrio 2 >> threadDelay 200000 >> putStrLn "finished-2"
    _ <- forkIO $ claim Acquire (schedule pool 1) $ testPrio 1 >> threadDelay 200000 >> putStrLn "finished-1"
    threadDelay 1000000
    putStrLn "Starting testTaskPool:"
    startQueue pool
    threadDelay 4000000
    stopQueue pool
    -- With the queue stopped this claim should block forever; the 'finally'
    -- handler prints when the blocked thread is torn down by an exception.
    _ <- forkIO $
        (claim Acquire (schedule pool 0) $ failed "testTaskPool: This task should never run!")
            `finally`
            putStrLn "testTaskPool: runtime discovered that never-running task was hung (this is good)"
    withMVar m_inversions $ \inversions ->
        when (inversions > 2) $ failed "testTaskPool: too many priority inversions"
    withMVar m_count $ \count ->
        when (count /= 15) $ failed "testTaskPool: did not complete all tasks within 4 seconds"
    putStrLn "Finished testTaskPool:"

-- | Dispatch eight two-second tasks at priorities 1 to 8 into a pool
-- created with size argument 3, move the priority-7 task to priority 0 after one second, and
-- check the completion count 3 and 10 seconds after the pool was started.
testDispatch :: IO ()
testDispatch = do
    putStrLn "testDispatch"
    putStrLn "Dispatch 8 tasks into a room of size 3. Reprioritize one task from 7 to 0."
    io_sem <- newMVar ()
    c <- newMVar 0
    -- Brackets the two-second sleep with "{label" and "label}" lines, each
    -- printed while holding io_sem so output lines do not interleave.
    let runThread s = do
            withMVar io_sem $ const $ putStrLn $ "{" ++ s
            threadDelay 2000000
            modifyMVar_ c (return . (+1))
            withMVar io_sem $ const $ putStrLn $ s ++ "}"
    pool <- newTaskPool fair_queue_configuration 3 ()
    task_handles <- forM [1..8 :: Integer] $ \n ->
        dispatch (schedule pool n) (runThread $ "priority-" ++ show n)
    startQueue pool
    threadDelay 1000000
    -- Index 6 is the handle of the priority-7 task; the list always has
    -- eight elements because it is built from [1..8] just above.
    reprioritize (task_handles !! 6) (const 0)
    putStrLn "Just reprioritized 7 -> 0 after waiting on second."
    threadDelay 2000000
    -- Both bounds are checked against a single snapshot of the counter.
    withMVar c $ \x -> do
        when (x < 3) $ failed "testDispatch: should have completed at least 3 tasks within 3 seconds"
        when (x > 5) $ failed "testDispatch: should not have completed more than 5 tasks within 3 seconds"
    threadDelay 7000000
    withMVar c $ \x ->
        when (x /= 8) $ failed "testDispatch: did not complete after 7 seconds."

-- | Stress test: fork 10,000 half-second tasks onto a pool created with
-- size argument 100, drawing each task's priority from @prioIO@, then print how
-- many tasks completed.
--
-- Records no failures itself; the printed count is the only output to
-- inspect.
stress :: forall a. (Ord a) => QueueConfigurationRecord a -> (IO a) -> IO ()
stress config prioIO = do
    putStrLn "stressTest"
    putStrLn "Create 10,000 threads in a room of size 100, each test needs half a second to complete, and see what happens."
    threadDelay 3000000
    pool <- newTaskPool config 100 ()
    startQueue pool
    counter <- newMVar 0
    forM_ [1..10000] $ \_ -> do
        prio <- prioIO
        forkIO $ claim Acquire (schedule pool prio) $
            threadDelay 500000 >> modifyMVar_ counter (return . (+1))
    threadDelay 50000000
    waitUntilFinished pool
    withMVar counter print

-- | Minimal usage example: three one-second tasks are scheduled at
-- priorities 1, 3 and 2 before the pool is started.
example :: IO ()
example = do
    let expensiveTask = threadDelay 1000000
    pool <- simpleTaskPool
    _ <- forkIO $ claim Acquire (schedule pool 1) $ putStrLn "Task 1 started . . ." >> expensiveTask >> putStrLn "Task 1 completed."
    _ <- forkIO $ claim Acquire (schedule pool 3) $ putStrLn "Task 3 started . . ." >> expensiveTask >> putStrLn "Task 3 completed."
    _ <- forkIO $ claim Acquire (schedule pool 2) $ putStrLn "Task 2 started . . ." >> expensiveTask >> putStrLn "Task 2 completed."
    -- contrive to wait for all tasks to become enqueued
    threadDelay 100000
    putStrLn "Starting pool: "
    startQueue pool
    -- contrive to wait for all tasks to become dequeued
    threadDelay 4000000

-- | Run the tests named on the command line.
--
-- With no arguments the argument list is treated as @["help"]@.  The
-- argument @all@ selects everything except @example@ and the @stress*@
-- tests, which run only when named explicitly.  After the selected tests
-- have run, every recorded failure is printed with a @FAILED: @ prefix and
-- the process exits with a failure status if there was at least one.
main :: IO ()
main = do
    args <- liftM (\given -> if Prelude.null given then ["help"] else given) getArgs
    let shouldRun s@('s':'t':'r':'e':'s':'s':_) = s `elem` args
        shouldRun "example" = "example" `elem` args
        shouldRun s = s `elem` args || "all" `elem` args
    when (shouldRun "help") $
        putStrLn "tests: all, testRoom, testMaxThreads, testQueue, testTaskPool, testDispatch, stressInt, stressIntFair, stressInt2, stressUnit, stressUnitFILO, stressUnitFair"
    when (shouldRun "example") example
    when (shouldRun "testRoom") testRoom
    when (shouldRun "testMaxThreads") testMaxThreads
    when (shouldRun "testQueue") testQueue
    when (shouldRun "testTaskPool") testTaskPool
    when (shouldRun "testDispatch") testDispatch
    when (shouldRun "stressInt") $ stress fast_queue_configuration $ randomRIO (0,1000 :: Int)
    when (shouldRun "stressIntFair") $ stress fair_queue_configuration $ randomRIO (0,1000 :: Int)
    when (shouldRun "stressInt2") $ stress fast_queue_configuration $ randomRIO (0,2 :: Int)
    when (shouldRun "stressUnit") $ stress fast_queue_configuration $ return ()
    when (shouldRun "stressUnitFILO") $ stress (fast_queue_configuration { queue_order = FILO }) $ return ()
    when (shouldRun "stressUnitFair") $ stress fair_queue_configuration $ return ()
    withMVar fail_strs $ \strs -> do
        forM_ strs $ \s -> putStrLn $ "FAILED: " ++ s
        unless (Prelude.null strs) exitFailure
    putStrLn "Done."