{-# LANGUAGE RecursiveDo, ScopedTypeVariables #-} module Main (main) where import Control.Concurrent.Priority.Room import Control.Concurrent.Priority.Queue import Control.Concurrent.Priority.TaskPool import Control.Concurrent.MVar import Control.Monad import System.Random import Data.Set as Set import GHC.Conc import System.Environment import System.IO.Unsafe import System.Exit {-# NOINLINE fail_strs #-} fail_strs :: MVar [String] fail_strs = unsafePerformIO $ newMVar [] failed :: String -> IO () failed s = modifyMVar_ fail_strs $ \strs -> do putStrLn s return $ strs ++ [s] testRoom :: IO () testRoom = do putStrLn "testRoom" putStrLn "Simple test of room reentrancy." m <- newRoom () me <- myThreadId let check s b = do ok <- atomically $ liftM ((== b) . member me) $ inUse m when (not ok) $ failed $ "testRoom: " ++ s check "testRoom-1" False claim Acquire [m] $ check "testRoom-2" True check "testRoom-3" False claim Release [m] $ check "testRoom-4" False check "testRoom-5" False claim Acquire [m] $ claim Acquire [m] (check "testRoom-6" True) >> check "testRoom-7" True >> claim Release [m] (check "testRoom-8" False) testMaxThreads :: IO () testMaxThreads = do putStrLn "testMaxThreads" putStrLn "Various threads run in a pair of rooms. The large room has four slots, while the small room has two slots." putStrLn $ "12 large, 4 small, 8 large+small, 4 unconstrained that occupy a slot in large and small" io_sem <- newMVar () c <- newMVar 0 let runThread s = do threadDelay 2000000 modifyMVar_ c (return . (+1)) withMVar io_sem $ const $ putStrLn s large <- newRoom (MaxThreads 4) small <- newRoom (MaxThreads 2) claim Acquire [large,small] $ do forM_ [1..8] $ const $ forkIO $ claim Acquire [large,small] $ runThread "large+small" forM_ [1..12] $ const $ forkIO $ claim Acquire [large] $ runThread "large" forM_ [1..4] $ const $ forkIO $ claim Acquire [small] $ runThread "small" forM_ [1..4] $ const $ forkIO $ claim Acquire (Unconstrained,[large,small]) $ runThread "unconstrained occupant (large+small)" threadDelay 3000000 withMVar c $ \x -> when (x < 4) $ failed "testMaxThreads: should have completed at least 4 tasks within 3 seconds" withMVar c $ \x -> when (x > 10) $ failed "testMaxThreads: should not have completed more than 10 tasks within 3 seconds" withMVar io_sem $ const $ putStrLn "--" threadDelay 3000000 withMVar io_sem $ const $ putStrLn "--" threadDelay 3000000 withMVar io_sem $ const $ putStrLn "--" threadDelay 3000000 withMVar io_sem $ const $ putStrLn "--" threadDelay 3000000 withMVar c $ \x -> when (x /= 28) $ failed "testMaxThreads: did not complete after 15 seconds." testQueue :: IO () testQueue = mdo putStrLn "testQueue" putStrLn "Perform some tasks in priority order, with constraints enforced at queue-level (to govern input), priority level (priority-1 tasks require small load)," putStrLn "and task level (priority-2 tasks only work when the counter is even)." need_to_print <- newTVarIO False value_to_print <- newTVarIO "" q <- newQueue $ fair_queue_configuration { queue_predicate = flip when retry =<< readTVar need_to_print, priority_indexed_predicate = \x -> do l <- load q; if x == 1 && l > 10 then retry else return () } counter <- newTVarIO 0 str <- newTVarIO "" let incCounter x s = do n <- readTVar counter writeTVar counter $ 1 + n writeTVar need_to_print True writeTVar value_to_print (s ++ " " ++ show n) writeTVar str . (++ show x) =<< readTVar str atomically $ do forM [1..4] $ const $ putTask q 0 $ incCounter 0 "priority-0" forM [1..4] $ const $ putTask q 1 $ incCounter 1 "priority-1, load <= 10" forM [1..4] $ const $ putTask q 2 $ do n <- readTVar counter when (n `mod` 2 /= 0) retry incCounter 2 "priority-2, counter is even" forM [1..4] $ const $ putTask q 3 $ incCounter 3 "priority-3" forM_ [1..32] $ const $ do m_s <- atomically $ (do b <- readTVar need_to_print; if b then liftM Just (readTVar value_to_print) else retry) `orElse` (pullTask q >> return Nothing) maybe (return ()) (\s -> putStrLn s >> atomically (writeTVar need_to_print False)) m_s ok <- atomically $ liftM (== "0000231111232323") $ readTVar str when (not ok) $ failed "testQueue" testTaskPool :: IO () testTaskPool = do putStrLn "testTaskPool" putStrLn "Threads should complete in priority order over a duration of one and a half seconds after a one second delay." putStrLn "Room has two open slots, so order of evaluation may be off by one task." pool <- newTaskPool fair_queue_configuration 2 () m_inversions <- newMVar 0 m_count <- newMVar 0 m_last_prio <- newMVar 0 let testPrio n = modifyMVar_ m_last_prio $ \last_prio -> do when (last_prio > n) $ modifyMVar_ m_inversions (return . (+1)) modifyMVar_ m_count (return . (+1)) return n forM_ [1..10] $ const $ forkIO $ claim Acquire (schedule pool 4) $ testPrio 4 >> threadDelay 200000 >> putStrLn "finished-4" forM_ [1..4] $ const $ forkIO $ claim Acquire (schedule pool 2) $ testPrio 2 >> threadDelay 200000 >> putStrLn "finished-2" forkIO $ claim Acquire (schedule pool 1) $ testPrio 1 >> threadDelay 200000 >> putStrLn "finished-1" threadDelay 1000000 putStrLn "Starting testTaskPool:" startQueue pool threadDelay 4000000 stopQueue pool forkIO $ claim Acquire (schedule pool 0) $ failed "testTaskPool: This task should never run!" withMVar m_inversions $ \inversions -> when (inversions > 2) $ failed "testTaskPool: too many priority inversions" withMVar m_count $ \count -> when (count /= 15) $ failed "testTaskPool: did not complete all tasks within 4 seconds" putStrLn "Finished testTaskPool:" stress :: forall a. (Ord a) => QueueConfigurationRecord a -> (IO a) -> IO () stress config prioIO = do putStrLn "stressTest" putStrLn "Create 10,000 threads in a room of size 100, each test needs half a second to complete, and see what happens." threadDelay 3000000 pool <- newTaskPool config 100 () startQueue pool counter <- newMVar 0 forM_ [1..10000] $ \_ -> do prio <- prioIO forkIO $ claim Acquire (schedule pool prio) $ threadDelay 500000 >> modifyMVar_ counter (return . (+1)) threadDelay 50000000 atomically $ flip unless retry . (== 0) =<< activity pool withMVar counter $ putStrLn . show _example1 :: IO () _example1 = do (pool :: TaskPool () ()) <- simpleTaskPool forkIO $ claim Acquire pool $ putStrLn "Hello world!" forkIO $ claim Acquire pool $ putStrLn "Goodbye world!" startQueue pool _example2 :: IO () _example2 = do prio_pool <- simpleTaskPool forkIO $ claim Acquire (schedule prio_pool 1) $ putStrLn "Hello world!" forkIO $ claim Acquire (schedule prio_pool 2) $ putStrLn "Goodbye world!" startQueue prio_pool main :: IO () main = do args <- liftM (\args -> if Prelude.null args then ["help"] else args) getArgs let shouldRun s@('s':'t':'r':'e':'s':'s':_) = s `elem` args shouldRun s = s `elem` args || "all" `elem` args when (shouldRun "help") $ putStrLn "tests: all, testRoom, testMaxThreads, testQueue, testTaskPool, stressInt, stressIntFair, stressInt2, stressUnit, stressUnitFILO, stressUnitFair" when (shouldRun "testRoom") testRoom when (shouldRun "testMaxThreads") testMaxThreads when (shouldRun "testQueue") testQueue when (shouldRun "testTaskPool") testTaskPool when (shouldRun "stressInt") $ stress fast_queue_configuration $ randomRIO (0,1000 :: Int) when (shouldRun "stressIntFair") $ stress fair_queue_configuration $ randomRIO (0,1000 :: Int) when (shouldRun "stressInt2") $ stress fast_queue_configuration $ randomRIO (0,2 :: Int) when (shouldRun "stressUnit") $ stress fast_queue_configuration $ return () when (shouldRun "stressUnitFILO") $ stress (fast_queue_configuration { queue_order = FILO }) $ return () when (shouldRun "stressUnitFair") $ stress fair_queue_configuration $ return () withMVar fail_strs $ \strs -> do forM strs $ \s -> putStrLn $ "FAILED: " ++ s when (not $ Prelude.null strs) $ exitFailure putStrLn "Done."