| 1 | ------------------------------------------------------------------------------- |
|---|
| 2 | -- minimal example of a much bigger program (which is my excuse for strange |
|---|
| 3 | -- looking code) |
|---|
| 4 | -- |
|---|
| 5 | -- compile with |
|---|
| 6 | -- |
|---|
| 7 | -- ghc --make -O2 -threaded Pi.hs -o pi -rtsopts -fforce-recomp |
|---|
| 8 | -- |
|---|
| 9 | -- and execute it (for example, on a 2 core system) with |
|---|
| 10 | -- |
|---|
| 11 | -- pi +RTS -N2 |
|---|
| 12 | -- |
|---|
| 13 | -- For infinite runs, try |
|---|
| 14 | -- |
|---|
| 15 | -- i=0; while true;do printf "%4d\n" $((i=$i+1));pi +RTS -N2;done |
|---|
| 16 | -- |
|---|
| 17 | -- on a system with bash. |
|---|
| 18 | ------------------------------------------------------------------------------- |
|---|
| 19 | |
|---|
| 20 | {-# LANGUAGE FlexibleContexts, MultiParamTypeClasses, FlexibleInstances#-} |
|---|
| 21 | module Main where |
|---|
| 22 | import Control.Monad |
|---|
| 23 | import Control.Concurrent |
|---|
| 24 | import Control.Parallel.Strategies |
|---|
| 25 | import System.Environment |
|---|
| 26 | import Data.Array.IO |
|---|
| 27 | import Control.Concurrent.STM |
|---|
| 28 | import System.Posix.Signals |
|---|
| 29 | import Data.Maybe |
|---|
| 30 | import GHC.Conc |
|---|
| 31 | import System.IO |
|---|
| 32 | import System.IO.Unsafe |
|---|
| 33 | import qualified Data.Set as Set |
|---|
| 34 | import Data.Set (Set) |
|---|
| 35 | import Data.Time.Format |
|---|
| 36 | import System.Locale |
|---|
| 37 | import Data.Time |
|---|
| 38 | import Data.Time.Clock.POSIX |
|---|
| 39 | import Text.Printf |
|---|
| 40 | |
|---|
| 41 | |
|---|
-- | Program entry point: install the SIGINT logger, enqueue 64 dummy tasks,
-- start one worker thread per capability, wait until the pool has drained,
-- then dump the debug log and print the exit marker.
main :: IO ()
main = do
    installLogger
    pool <- newSTMPQueue 16
    -- Every task is the number 1; it is passed to 'calcPi' as its "digits".
    forM_ (replicate 64 1) (putSTMPQueue pool)
    -- Worker n uses row n of the pool's private task array.
    forM_ [0 .. numCapabilities - 1] $ \n ->
        forkIO (thread n calcPi pool)
    waitSTMPQueue pool
    debug "__________________________________"
    -- True forces the log to be printed regardless of the LOG variable.
    writeLog_ True
    putStrLn "SHOULD EXIT"
|---|
| 53 | |
|---|
| 54 | |
|---|
-- | Worker loop: repeatedly fetch a task for worker slot @n@ from @pool@ and
-- run @f pool task@ on it. The loop ends when 'getSTMPQueue' returns
-- 'Nothing'.
thread :: Show a => Int -> (STMPQueue a -> a -> IO b) -> STMPQueue a -> IO ()
thread n f pool = getSTMPQueue n pool >>= maybe (return ()) runAndContinue
  where
    -- Execute one task, discard its result, then go back for the next one.
    runAndContinue t = f pool t >> thread n f pool
|---|
| 62 | |
|---|
| 63 | |
|---|
| 64 | --- pi calculation functions for number crunching ---------------------------- |
|---|
| 65 | -- doing nothing to allow compilation with GHC HEAD!!! the bug still occurs. |
|---|
| 66 | |
|---|
| 67 | -- calcPiPure :: Int -> Int |
|---|
| 68 | -- calcPiPure digits = showCReal (fromEnum digits) pi `pseq` 1 |
|---|
| 69 | |
|---|
| 70 | |
|---|
-- | Stand-in for the real number-crunching task. The actual computation is
-- disabled, so this only logs a start and a finish message. The first
-- argument is ignored.
calcPi :: t -> Int -> IO ()
calcPi _ digits =
    debug ("T calc " ++ show digits)
        >> debug "T finished"
|---|
| 76 | |
|---|
| 77 | |
|---|
| 78 | |
|---|
| 79 | --- STM based global queue with an additional private part ------------------ |
|---|
-- | Shared task pool: a global STM queue plus one private batch of tasks per
-- worker.
--
-- The former @Show a =>@ datatype context has been dropped. Datatype contexts
-- are a deprecated language feature that the LANGUAGE pragma of this file
-- does not enable, and they add no guarantee: functions that need 'Show'
-- already state it in their own signatures.
--
-- The field order is significant because the constructor is matched
-- positionally elsewhere in this file.
data STMPQueue a = STMPQueue
    { stmChan :: STMCQueue a
      -- ^ global queue that producers write to
    , stmState :: TVar STMState
      -- ^ current mode of the pool (see 'STMState')
    , stmFinished :: TChan ()
      -- ^ created together with the pool; not otherwise used in this file
    , stmWorking :: TVar (Set ThreadId)
      -- ^ threads that currently hold tasks fetched from the global queue
    , stmPrivate :: TArray (Int, Int) a
      -- ^ private batches, indexed by (worker index, slot); currently a bit
      --   slow
    , stmIndex :: TArray Int (Int, Int)
      -- ^ per worker: (next slot to read, number of filled slots)
    , stmSize :: Int
      -- ^ number of private slots per worker
    }
|---|
| 91 | |
|---|
-- | Mode of the task pool as seen by the workers.
data STMState
    = SPut
      -- ^ more tasks may still arrive: a worker that finds the global queue
      --   empty blocks
    | SWait
      -- ^ the pool is being drained: a worker that finds the global queue
      --   empty and no thread working gets 'Nothing'
    deriving (Eq, Show)
|---|
| 96 | |
|---|
| 97 | |
|---|
-- | Create an empty task pool in state 'SPut'.
--
-- @size@ is the number of private slots reserved for each worker; one row of
-- slots is allocated per capability ('numCapabilities').
newSTMPQueue :: Show a => Int -> IO (STMPQueue a)
newSTMPQueue size = do
    globalQueue <- newSTMCQueue
    mode <- newTVarIO SPut
    finishedChan <- newTChanIO
    workers <- newTVarIO Set.empty
    let lastWorker = numCapabilities - 1
    -- Both arrays are allocated in a single transaction. The slot array is
    -- left uninitialised; every (read, filled) cursor starts at (0,0).
    (slots, cursors) <-
        atomically $
            liftM2
                (,)
                (newArray_ ((0, 0), (lastWorker, size - 1)))
                (newArray (0, lastWorker) (0, 0))
    return $
        STMPQueue
            { stmChan = globalQueue
            , stmState = mode
            , stmFinished = finishedChan
            , stmWorking = workers
            , stmPrivate = slots
            , stmIndex = cursors
            , stmSize = size
            }
|---|
| 110 | |
|---|
| 111 | |
|---|
-- | Append one task to the global queue of the pool.
putSTMPQueue :: Show a => STMPQueue a -> a -> IO ()
putSTMPQueue pool a = atomically (writeSTMCQueue (stmChan pool) a)
|---|
| 115 | |
|---|
| 116 | |
|---|
-- | Block until the pool is drained: switch the pool to 'SWait', wait until
-- the global queue is empty and no thread is registered as working, then
-- switch back to 'SPut'.
waitSTMPQueue :: Show a => STMPQueue a -> IO ()
waitSTMPQueue pool = do
    -- This must be its own transaction so that workers can observe SWait
    -- while the second transaction is still blocked.
    atomically (writeTVar mode SWait)
    atomically $ do
        active <- readTVar (stmWorking pool)
        drained <- isEmptySTMCQueue (stmChan pool)
        unless (Set.null active && drained) retry
        writeTVar mode SPut
  where
    mode = stmState pool
|---|
| 125 | |
|---|
| 126 | |
|---|
-- | Fetch the next task for worker @idx@.
--
-- Tasks are served from the worker's private batch first. When that batch is
-- exhausted the worker unregisters itself from the working set and tries to
-- pull up to @size + 1@ tasks from the global queue: the first one is
-- returned, the rest become the new private batch.
--
-- Returns 'Nothing' only when the global queue is empty, the pool is in
-- state 'SWait' and no thread is registered as working. In every other
-- "nothing to do" situation the call blocks.
--
-- Debug messages are written outside of the STM transactions.
-- 'unsafeIOToSTM' is deliberately not used here: 'debug' swaps an 'MVar',
-- and a transaction that is aborted in the middle of such an action can
-- leave the 'MVar' empty and deadlock every later 'debug' call.
getSTMPQueue :: Show a => Int -> STMPQueue a -> IO (Maybe a)
getSTMPQueue idx (STMPQueue chan state _ working private index size) = do
    -- Try the private batch first; reading the slot and advancing the cursor
    -- happen in one transaction.
    cached <- atomically $ do
        (curidx, midx) <- readArray index idx
        if curidx == midx
            then return Nothing
            else do
                task <- readArray private (idx, curidx)
                writeArray index idx (curidx + 1, midx)
                return (Just task)
    case cached of
        Just task -> return (Just task)
        Nothing -> do
            debug "private queue empty"
            refill
  where
    refill = do
        tid <- myThreadId

        -- Unregister first, in a separate transaction, so other threads can
        -- see that this worker is idle while it waits below.
        atomically $ do
            work <- Set.delete tid `fmap` readTVar working
            writeTVar working $! work

        fetched <- atomically $ do
            empty <- isEmptySTMCQueue chan
            work <- readTVar working
            op <- readTVar state
            if not empty
                then do
                    batch <- readSTMCQueue chan (size + 1)
                    case batch of
                        -- Not expected after the emptiness check above;
                        -- block instead of failing on a pattern match.
                        [] -> retry
                        task : rest -> do
                            forM_ (zip [0 ..] rest) $ \(col, v) ->
                                writeArray private (idx, col) v
                            writeArray index idx $! (0, length rest)
                            writeTVar working $! Set.insert tid work
                            return (Just (task, rest))
                else case op of
                    SPut -> retry
                    SWait
                        | Set.null work -> return Nothing
                        | otherwise -> retry

        case fetched of
            Nothing -> return Nothing
            Just (task, rest) -> do
                -- Logged once, after the transaction has committed.
                debug ("my tasks: " ++ show (task : rest))
                return (Just task)
|---|
| 167 | |
|---|
| 168 | |
|---|
| 169 | --- For debugging output ---------------------------------------------------- |
|---|
-- | Global channel that collects all debug messages until they are printed.
--
-- It is created with 'unsafePerformIO', so it must never be inlined:
-- without the NOINLINE pragma the optimiser may duplicate the 'newChan'
-- call, and different use sites would then talk to different channels.
debugChan :: Chan String
debugChan = unsafePerformIO newChan
{-# NOINLINE debugChan #-}
|---|
| 172 | |
|---|
-- | Global cell holding the time stamp of the most recent debug message;
-- initialised with the current time on first use.
--
-- It is created with 'unsafePerformIO', so it must never be inlined:
-- without the NOINLINE pragma the optimiser may duplicate the 'newMVar'
-- call, and different use sites would then see different cells.
debugTime :: MVar UTCTime
debugTime = unsafePerformIO (newMVar =<< getCurrentTime)
{-# NOINLINE debugTime #-}
|---|
| 175 | |
|---|
-- | Format @msg@ with 'debugStr' and append the result to the global debug
-- channel. Nothing is printed here.
debug :: String -> IO ()
debug msg = debugStr msg >>= writeChan debugChan
|---|
| 178 | |
|---|
-- | Like 'debug', but additionally prints the formatted message to stderr
-- right away. The message is formatted twice: once for stderr and once more
-- inside 'debug'.
debug_ :: String -> IO ()
debug_ msg =
    debugStr msg
        >>= hPutStrLn stderr
        >> debug msg
|---|
| 184 | |
|---|
-- | Build one log line: the time elapsed since the previous log line, the
-- calling thread's identifier and the message itself.
--
-- As a side effect the global time stamp in 'debugTime' is replaced by the
-- current time.
debugStr :: String -> IO String
debugStr msg = do
    -- "ThreadId " is nine characters long; keep only what follows it.
    threadLabel <- fmap (drop 9 . show) myThreadId
    now <- getCurrentTime
    previous <- swapMVar debugTime now
    let -- 'show' on the time difference ends in a unit suffix; drop it.
        elapsed = init (show (diffUTCTime now previous))
        column
            -- A negative difference is reported as zero.
            | (read elapsed :: Double) < 0.0 = "0.000000 "
            | otherwise = printf "%-10s" elapsed
    return (column ++ " " ++ threadLabel ++ " " ++ msg)
|---|
| 196 | |
|---|
| 197 | |
|---|
-- | Print the collected debug log, but only if the environment variable
-- @LOG@ is present (see 'writeLog_').
writeLog :: IO ()
writeLog = writeLog_ False
|---|
| 200 | |
|---|
| 201 | |
|---|
-- | Print every message currently stored in the debug channel to stdout.
--
-- When the argument is 'True' the log is always printed. When it is 'False'
-- the log is printed only if an environment variable named @LOG@ exists.
--
-- NOTE(review): this relies on 'isEmptyChan', which is believed to be
-- deprecated or removed in newer versions of base -- confirm against the
-- targeted GHC.
writeLog_ :: Bool -> IO ()
writeLog_ checkEnv = do
    names <- fmap (map fst) getEnvironment
    when (checkEnv || elem "LOG" names) drain
  where
    -- Pop and print messages until the channel reports empty.
    drain = do
        exhausted <- isEmptyChan debugChan
        if exhausted
            then return ()
            else readChan debugChan >>= putStrLn >> drain
|---|
| 213 | |
|---|
-- | Read the environment variable @ss@ and parse its value as an 'Int'.
--
-- The value is validated immediately. If the variable is not set, or its
-- value is not an integer (surrounding white space is allowed), an 'IOError'
-- naming the variable is raised, rather than a bare @fromJust@ or @read@
-- failure surfacing later when the result is first used.
numEnv :: String -> IO Int
numEnv ss = do
    env <- getEnvironment
    case lookup ss env of
        Nothing ->
            ioError . userError $
                "numEnv: environment variable " ++ show ss ++ " is not set"
        Just raw ->
            case reads raw of
                -- 'words' yields [] exactly when only white space is left.
                [(n, rest)] | null (words rest) -> return n
                _ ->
                    ioError . userError $
                        "numEnv: environment variable " ++ show ss
                            ++ " is not an integer: " ++ show raw
|---|
| 217 | |
|---|
| 218 | |
|---|
-- | Run the action @f@ only if an environment variable named @ss@ exists;
-- the variable's value is not inspected.
whenEnv :: String -> IO () -> IO ()
whenEnv ss f =
    getEnvironment >>= \env ->
        when (any ((== ss) . fst) env) f
|---|
| 223 | |
|---|
| 224 | |
|---|
-- | Install a SIGINT handler that dumps the debug log before the signal is
-- raised again.
--
-- The handler is registered with 'CatchOnce'.
-- NOTE(review): presumably the default disposition is back in place when the
-- handler re-raises SIGINT, so the process then terminates -- confirm
-- against the unix package documentation.
installLogger :: IO ()
installLogger = do
    -- The previously installed handler is not needed.
    _ <- installHandler sigINT (CatchOnce onInterrupt) Nothing
    return ()
  where
    onInterrupt =
        sequence_
            [ putStrLn "" -- keeps "^C" off the first log line
            , debug "SIGINT caught."
            , writeLog_ True
            , raiseSignal sigINT
            ]
|---|
| 234 | |
|---|
| 235 | |
|---|
| 236 | --- Counting Chan using STM ------------------------------------------------- |
|---|
-- | A 'TChan' paired with an explicit element counter, so that the number of
-- queued elements can be read inside a transaction.
data STMCQueue a = STMCQueue
    { scqList :: TChan a
      -- ^ the queued elements
    , scqSize :: TVar Int
      -- ^ how many elements 'scqList' currently holds
    }
|---|
| 241 | |
|---|
| 242 | |
|---|
-- | Create an empty counting queue: a fresh channel and a counter set to 0.
newSTMCQueue :: IO (STMCQueue a)
newSTMCQueue = liftM2 STMCQueue newTChanIO (newTVarIO 0)
|---|
| 248 | |
|---|
| 249 | |
|---|
-- | 'True' when the element counter of the queue is zero. Only the counter
-- is consulted; the channel itself is not inspected.
isEmptySTMCQueue :: STMCQueue a -> STM Bool
isEmptySTMCQueue queue =
    readTVar (scqSize queue) >>= \count ->
        return $! count == 0
|---|
| 254 | |
|---|
| 255 | |
|---|
-- | Append one element to the queue and increase its counter by one.
writeSTMCQueue :: STMCQueue a -> a -> STM ()
writeSTMCQueue queue value = do
    writeTChan (scqList queue) value
    count <- readTVar counter
    -- Forced so that no chain of (+1) thunks builds up in the counter.
    writeTVar counter $! count + 1
  where
    counter = scqSize queue
|---|
| 261 | |
|---|
| 262 | |
|---|
-- | Append all elements of a list to the queue, in list order, and increase
-- the counter by the length of the list.
writeList2STMCQueue :: STMCQueue a -> [a] -> STM ()
writeList2STMCQueue queue values = do
    before <- readTVar counter
    forM_ values (writeTChan (scqList queue))
    writeTVar counter $! before + length values
  where
    counter = scqSize queue
|---|
| 268 | |
|---|
| 269 | |
|---|
-- | Remove and return @min n size@ elements from the front of the queue,
-- where @size@ is the current element counter. A non-positive @n@ yields
-- an empty list and leaves the queue untouched.
--
-- No debug logging is done here. Logging via 'unsafeIOToSTM' is unsafe
-- inside a transaction: the logger swaps an 'MVar', and a transaction that
-- is aborted in the middle of that action can leave the 'MVar' empty, which
-- deadlocks all later logging.
readSTMCQueue :: STMCQueue a -> Int -> STM [a]
readSTMCQueue (STMCQueue list size) n = do
    ioSize <- readTVar size
    -- Clamp at zero so that a negative request cannot drive the counter up
    -- or block the caller.
    let taken = max 0 (min ioSize n)
    values <- replicateM taken (readTChan list)
    -- Forced so that no chain of subtraction thunks builds up in the counter.
    writeTVar size $! (ioSize - taken)
    return values
|---|
| 280 | |
|---|