module UnagiNoBlocking (unagiNoBlockingMain) where -- Unagi-chan-specific tests import Control.Concurrent.Chan.Unagi.NoBlocking import qualified Control.Concurrent.Chan.Unagi.NoBlocking.Internal as UI import Control.Monad import qualified Data.Primitive as P import Data.IORef import System.Mem(performGC) import Data.List(sort) import Control.Concurrent(forkIO,yield,threadDelay) import Control.Concurrent.MVar import Control.Exception import Data.Atomics.Counter.Fat unagiNoBlockingMain :: IO () unagiNoBlockingMain = do putStrLn "===================" putStrLn "Testing Unagi.NoBlocking details:" -- ------ putStr "Smoke test at different starting offsets, spanning overflow... " mapM_ smoke $ [ (maxBound - UI.sEGMENT_LENGTH - 1) .. maxBound] ++ [minBound .. (minBound + UI.sEGMENT_LENGTH + 1)] putStrLn "OK" -- ------ putStr "Correct first write... " mapM_ correctFirstWrite [ (maxBound - 7), maxBound, minBound, 0] putStrLn "OK" -- ------ putStr "Correct initial writes... " mapM_ correctInitialWrites [ (maxBound - UI.sEGMENT_LENGTH), (maxBound - UI.sEGMENT_LENGTH) - 1, maxBound, minBound, 0] putStrLn "OK" -- ------ putStr "Checking isActive... " replicateM_ 10 isActiveTest replicateM_ 10 readChanYieldTest putStrLn "OK" -- ------ putStr "Checking streamChan... " streamChanSmoke replicateM_ 3 $ streamChanConcurrentStreamerReader 10000000 replicateM_ 3 $ streamChanConcurrentStreamerWriter 10000000 putStrLn "OK" -- ------ let tries = 10000 putStrLn $ "Checking for deadlocks from killed Unagi reader in a fancy way, x"++show tries checkDeadlocksReaderUnagi tries -- Helper for when we know a read should succeed immediately: tryReadChanErr :: OutChan a -> IO a tryReadChanErr oc = tryReadChan oc >>= tryRead >>= maybe (error "A read we expected to succeed failed!") return -- TODO CONSIDER ADDING newChanStarting (or raplcing newChan) TO IMPLEMENTATIONS, AND CONSOLIDATE THESE IN Smoke.hs smoke :: Int -> IO () smoke n = smoke1 n >> smoke2 n -- www.../rrr... spanning overflow smoke1 :: Int -> IO () smoke1 n = do (i,o) <- UI.newChanStarting n let inp = [0 .. (UI.sEGMENT_LENGTH * 3)] mapM_ (writeChan i) inp forM_ inp $ \inx-> do outx <- tryReadChanErr o unless (inx == outx) $ error $ "Smoke test failed with starting offset of: "++(show n) -- w/r/w/r... spanning overflow smoke2 :: Int -> IO () smoke2 n = do (i,o) <- UI.newChanStarting n let inp = [0 .. (UI.sEGMENT_LENGTH * 3)] mapM_ (check i o) inp where check i o x = do writeChan i x x' <- tryReadChanErr o if x == x' then return () else error $ "Smoke test failed with starting offset of: "++(show n)++"at write: "++(show x) correctFirstWrite :: Int -> IO () correctFirstWrite n = do (i,oc@(UI.OutChan _ (UI.ChanEnd _ _ arrRef))) <- UI.newChanStarting n actv <- isActive oc unless actv $ error "not isActive before first and only write" writeChan i () (UI.StreamHead _ (UI.Stream arr _)) <- readIORef arrRef cell <- P.readArray arr 0 case cell of Just () -> return () _ -> error "Expected a Write at index 0" -- check writes by doing a segment+1-worth of reads by hand -- also check that segments pre-allocated as expected correctInitialWrites :: Int -> IO () correctInitialWrites startN = do (i,(UI.OutChan _ (UI.ChanEnd _ _ arrRef))) <- UI.newChanStarting startN let writes = [0..UI.sEGMENT_LENGTH] mapM_ (writeChan i) writes (UI.StreamHead _ (UI.Stream arr next)) <- readIORef arrRef -- check all of first segment: forM_ (init writes) $ \ix-> do cell <- P.readArray arr ix case cell of Just n | n == ix -> return () | otherwise -> error $ "Expected a Write at index "++(show ix)++" of same value but got "++(show n) _ -> error $ "Expected a Write at index "++(show ix) -- check last write: lastSeg <- readIORef next case lastSeg of (UI.Next (UI.Stream arr2 next2)) -> do cell <- P.readArray arr2 0 case cell of Just n | n == last writes -> return () | otherwise -> error $ "Expected last write at index "++(show $ last writes)++" of same value but got "++(show n) _ -> error "Expected a last Write" -- check pre-allocation: n2 <- readIORef next2 case n2 of (UI.Next (UI.Stream _ next3)) -> do n3 <- readIORef next3 case n3 of UI.NoSegment -> return () _ -> error "Too many segments pre-allocated!" _ -> error "Next segment was not pre-allocated!" _ -> error "No last segment present!" {- NOTE: we would like this to pass (see trac #9030) but are happy to note that - it fails which somewhat validates the test below testBlockedRecovery = do (i,o) <- newChan v <- newEmptyMVar rid <- forkIO (putMVar v () >> readChan o) takeMVar v threadDelay 1000 throwTo rid ThreadKilled -- we race the exception-handler in `readChan` here... writeChan i () -- In a buggy implementation, this would consistently win failing by losing -- the message and raising BlockedIndefinitely here: readChan o -} -- test for deadlocks caused by async exceptions in reader. checkDeadlocksReaderUnagi :: Int -> IO () checkDeadlocksReaderUnagi times = do let run 0 normalRetries numRace = putStrLn $ "Lates: "++(show normalRetries)++", Races: "++(show numRace) run n normalRetries numRace | (normalRetries + numRace) > (times `div` 3) = error "This test is taking too long. Please retry, and if still failing send the log to me" | otherwise = do (i,o) <- UI.newChanStarting 0 -- preload a chan with 0s let numPreloaded = 10000 replicateM_ numPreloaded $ writeChan i (0::Int) rStart <- newEmptyMVar rid <- forkIO $ (putMVar rStart () >> (forever $ void $ readChan yield o)) takeMVar rStart >> threadDelay 1 throwTo rid ThreadKilled -- did killing reader damage queue for reads or writes? writeChan i 1 `onException` ( putStrLn "Exception from first writeChan!") writeChan i 2 `onException` ( putStrLn "Exception from second writeChan!") finalRead <- tryReadChanErr o `onException` ( putStrLn "Exception from final tryReadChan!") oCnt <- readCounter $ (\(UI.OutChan _ (UI.ChanEnd _ cntr _))-> cntr) o iCnt <- readCounter $ (\(UI.InChan _ (UI.ChanEnd _ cntr _))-> cntr) i unless (iCnt == numPreloaded + 2) $ error "The InChan counter doesn't match what we'd expect from numPreloaded!" case finalRead of 0 -> if oCnt <= 0 -- (technically, -1 means hasn't started yet) -- reader didn't have a chance to get started then putStr "0" >> run n (normalRetries+1) numRace -- normal run; we tested that killing a reader didn't -- break chan for other readers/writers: else putStr "." >> run (n-1) normalRetries numRace -- -- Rare. Reader was killed after reading all pre-loaded messages -- but before starting what would be the blocking read: 1 | oCnt == numPreloaded+1 -> putStr "X" >> run n normalRetries (numRace + 1) | otherwise -> error $ "Having read final 1, "++ "Expecting a counter value of "++(show $ numPreloaded+1)++ " but got: "++(show oCnt) 2 -> do unless (oCnt == numPreloaded + 2) $ error $ "Having read final 2, "++ "Expecting a counter value of "++(show $ numPreloaded+2)++ " but got: "++(show oCnt) putStr "+" >> run n (normalRetries + 1) numRace _ -> error "Fix your #$%@ test!" run times 0 0 putStrLn "" -- do a series of writes, forcing GC making sure remains true, then do the last -- write and loop on forcing GC until we see False. isActiveTest :: IO () isActiveTest = do let n = 100000 k = n `div` 100 let testWrites (inc,outc) = replicateM_ 100 $ do replicateM_ k $ writeChan inc () performGC actv <- isActive outc unless actv $ error "isActive returned False before last write!" lastWriteWait iters (inc,outc) = writeChan inc () >> go iters where go i | i < (0::Int) = error "Timed out waiting for isActive to return False. Anomaly or possible bug." | otherwise = do actv <- isActive outc when actv $ performGC >> go (i-1) -- first with newChan: c1 <- newChan testWrites c1 lastWriteWait 1000 c1 -- then with a duplicated channel: (inc2,_) <- newChan outc2 <- dupChan inc2 testWrites (inc2,outc2) lastWriteWait 1000 (inc2,outc2) -- Concurrently write [1..100000], while reading 100001 and prepending in a -- IORef. Then a handler catches and puts () in an MVar which we wait on. -- Then check that the elements were correct. readChanYieldTest :: IO () readChanYieldTest = do let n = 100000 :: Int (inc,outc) <- newChan saving <- newIORef [] exceptionRaised <- newIORef False goAhead <- newEmptyMVar let handling io = Control.Exception.catch io $ \BlockedIndefinitelyOnMVar -> writeIORef exceptionRaised True void $ forkIO $ replicateM_ (n+1) $ handling $ do x <- readChan yield outc modifyIORef' saving (x:) when (x == n) $ -- about to do final deadlocking loop: putMVar goAhead () void $ forkIO $ forM_ [1..n] $ writeChan inc takeMVar goAhead out <- readIORef saving unless (out == [n,n-1..1]) $ error "readChanYieldTest reads incorrect!" performGC threadDelay 100000 raised <- readIORef exceptionRaised unless raised $ error "Handler doesn't seem to have run in readChanYieldTest. Either a testing fluke or a bug." -- Smoke tests for different strides of interleaved streams, at different -- offsets, with concurrent stream readers. streamChanSmoke :: IO () streamChanSmoke = -- A few odd starting offsets, spanning Int/Counter overflow: forM_ [0, maxBound - UI.sEGMENT_LENGTH, maxBound - UI.sEGMENT_LENGTH + 1, maxBound, minBound] $ \startingOffset -> -- And few odd numbers of streams, where we especially want to exercise -- skips of entire segments: forM_ [1,17, UI.sEGMENT_LENGTH, UI.sEGMENT_LENGTH+1, UI.sEGMENT_LENGTH*3+1] $ \numStreams-> do (i,o) <- UI.newChanStarting startingOffset let payload = 100000 :: Int -- and these are what we'll expect to see returned: let payloadPartsRev = map reverse $ [ [s,(s+numStreams).. payload ] | s<-[1..numStreams]] forM_ [1..payload] (writeChan i) strms <- streamChan numStreams o strmsReadOut <- replicateM numStreams newEmptyMVar unless (length strms == numStreams) $ error $ "numStreams /= length strms: " ++(show numStreams)++" vs "++(show $ length strms) ++" at offset: "++(show startingOffset) forM_ (zip strms strmsReadOut) (forkIO . consumeUntilEmpty []) parts <- forM (zip strmsReadOut payloadPartsRev) $ \(v,expectedStack)-> do stack <- takeMVar v unless (stack == expectedStack) $ error $ "Incorrect stream reads: "++(show stack) return stack unless ((sort $ concat parts) == [1..payload]) $ error $ "Somehow read parts weren't what we expected: "++(show parts) where consumeUntilEmpty stack (strm,v) = do h <- tryReadNext strm case h of (Next x xs) -> consumeUntilEmpty (x:stack) (xs,v) Pending -> putMVar v stack -- Done -- Simple writer/streamer concurrency test streamChanConcurrentStreamerWriter :: Int -> IO () streamChanConcurrentStreamerWriter n = do (i,o) <- newChan [strm] <- streamChan 1 o v <- newEmptyMVar let streamReader s stack itr failCnt | failCnt > 4 = putMVar v $ Left "failCnt exceeded; possibly bug, but probably anomaly" | itr > n = putMVar v $ Right stack | otherwise = do xs <- tryReadNext s case xs of Pending -> threadDelay 1000 >> streamReader s stack itr (failCnt+1) Next x xs' -> streamReader xs' (x:stack) (itr+1) 0 void $ forkIO $ streamReader strm [] (1::Int) (0::Int) void $ forkIO $ mapM_ (writeChan i) [1..n] strmOut <- either error return =<< takeMVar v unless (strmOut == [n,n-1..1]) $ error $ "Stream reads were incorrect: "++(show strmOut) -- Simple reader/streamer concurrency test streamChanConcurrentStreamerReader :: Int -> IO () streamChanConcurrentStreamerReader n = do (i,o) <- newChan [strm] <- streamChan 1 o mapM_ (writeChan i) [1..n] vStream <- newEmptyMVar vOutchan <- newEmptyMVar let streamReader s stack = do xs <- tryReadNext s case xs of Pending -> putMVar vStream stack Next x xs' -> streamReader xs' (x:stack) outchanReader stack = do tryReadChan o >>= tryRead >>= maybe (putMVar vOutchan stack) (outchanReader . (:stack)) void $ forkIO $ streamReader strm [] void $ forkIO $ outchanReader [] strmOut <- takeMVar vStream `onException` putStr " :in takeMVar vStream: " rdOut <- takeMVar vOutchan `onException` putStr " :in takeMVar vOutchan: " let correctOut = [n,n-1..1] unless (strmOut == correctOut) $ error $ "Stream reads were incorrect: "++(show strmOut) unless (rdOut == correctOut) $ error $ "OutChan reads were incorrect: "++(show rdOut)