{-# LANGUAGE BangPatterns, TypeSynonymInstances, FlexibleInstances, TupleSections #-} {-# LANGUAGE CPP #-} {-# OPTIONS_GHC -O2 #-} {- Building this module with -O0 causes streams not to fuse and too much - memory to be used. -} -- | -- Copyright: 2015 Joey Hess -- License: BSD-2-clause -- -- Concurrent output handling, internals. -- -- May change at any time. module System.Console.Concurrent.Internal where import System.IO #ifndef mingw32_HOST_OS import System.Posix.IO #endif import System.Directory import System.Exit import Control.Monad import Control.Monad.IO.Class (liftIO, MonadIO) import System.IO.Unsafe (unsafePerformIO) import Control.Concurrent import Control.Concurrent.STM import Control.Concurrent.Async import Data.Maybe import Data.List import Data.Monoid import qualified System.Process as P import qualified Data.Text as T import qualified Data.Text.IO as T import Control.Applicative import Prelude import Utility.Monad import Utility.Exception data OutputHandle = OutputHandle { outputLock :: TMVar Lock , outputBuffer :: TMVar OutputBuffer , errorBuffer :: TMVar OutputBuffer , outputThreads :: TMVar Integer , processWaiters :: TMVar [Async ()] , waitForProcessLock :: TMVar () } data Lock = Locked -- | A shared global variable for the OutputHandle. {-# NOINLINE globalOutputHandle #-} globalOutputHandle :: OutputHandle globalOutputHandle = unsafePerformIO $ OutputHandle <$> newEmptyTMVarIO <*> newTMVarIO (OutputBuffer []) <*> newTMVarIO (OutputBuffer []) <*> newTMVarIO 0 <*> newTMVarIO [] <*> newEmptyTMVarIO -- | Holds a lock while performing an action. This allows the action to -- perform its own output to the console, without using functions from this -- module. -- -- While this is running, other threads that try to lockOutput will block. -- Any calls to `outputConcurrent` and `createProcessConcurrent` will not -- block, but the output will be buffered and displayed only once the -- action is done. lockOutput :: (MonadIO m, MonadMask m) => m a -> m a lockOutput = bracket_ (liftIO takeOutputLock) (liftIO dropOutputLock) -- | Blocks until we have the output lock. takeOutputLock :: IO () takeOutputLock = void $ takeOutputLock' True -- | Tries to take the output lock, without blocking. tryTakeOutputLock :: IO Bool tryTakeOutputLock = takeOutputLock' False withLock :: (TMVar Lock -> STM a) -> IO a withLock a = atomically $ a (outputLock globalOutputHandle) takeOutputLock' :: Bool -> IO Bool takeOutputLock' block = do locked <- withLock $ \l -> do v <- tryTakeTMVar l case v of Just Locked | block -> retry | otherwise -> do -- Restore value we took. putTMVar l Locked return False Nothing -> do putTMVar l Locked return True when locked $ do (outbuf, errbuf) <- atomically $ (,) <$> swapTMVar (outputBuffer globalOutputHandle) (OutputBuffer []) <*> swapTMVar (errorBuffer globalOutputHandle) (OutputBuffer []) emitOutputBuffer StdOut outbuf emitOutputBuffer StdErr errbuf return locked -- | Only safe to call after taking the output lock. dropOutputLock :: IO () dropOutputLock = withLock $ void . takeTMVar -- | Use this around any actions that use `outputConcurrent` -- or `createProcessConcurrent` -- -- This is necessary to ensure that buffered concurrent output actually -- gets displayed before the program exits. withConcurrentOutput :: (MonadIO m, MonadMask m) => m a -> m a withConcurrentOutput a = a `finally` liftIO flushConcurrentOutput -- | Blocks until any processes started by `createProcessConcurrent` have -- finished, and any buffered output is displayed. Also blocks while -- `lockOutput` is is use. -- -- `withConcurrentOutput` calls this at the end, so you do not normally -- need to use this. flushConcurrentOutput :: IO () flushConcurrentOutput = do atomically $ do r <- takeTMVar (outputThreads globalOutputHandle) if r <= 0 then putTMVar (outputThreads globalOutputHandle) r else retry -- Take output lock to wait for anything else that might be -- currently generating output. lockOutput $ return () -- | Values that can be output. class Outputable v where toOutput :: v -> T.Text instance Outputable T.Text where toOutput = id instance Outputable String where toOutput = toOutput . T.pack -- | Displays a value to stdout. -- -- No newline is appended to the value, so if you want a newline, be sure -- to include it yourself. -- -- Uses locking to ensure that the whole output occurs atomically -- even when other threads are concurrently generating output. -- -- When something else is writing to the console at the same time, this does -- not block. It buffers the value, so it will be displayed once the other -- writer is done. outputConcurrent :: Outputable v => v -> IO () outputConcurrent = outputConcurrent' StdOut -- | Like `outputConcurrent`, but displays to stderr. -- -- (Does not throw an exception.) errorConcurrent :: Outputable v => v -> IO () errorConcurrent = outputConcurrent' StdErr outputConcurrent' :: Outputable v => StdHandle -> v -> IO () outputConcurrent' stdh v = bracket setup cleanup go where setup = tryTakeOutputLock cleanup False = return () cleanup True = dropOutputLock go True = do T.hPutStr h (toOutput v) hFlush h go False = do oldbuf <- atomically $ takeTMVar bv newbuf <- addOutputBuffer (Output (toOutput v)) oldbuf atomically $ putTMVar bv newbuf h = toHandle stdh bv = bufferFor stdh newtype ConcurrentProcessHandle = ConcurrentProcessHandle P.ProcessHandle toConcurrentProcessHandle :: (Maybe Handle, Maybe Handle, Maybe Handle, P.ProcessHandle) -> (Maybe Handle, Maybe Handle, Maybe Handle, ConcurrentProcessHandle) toConcurrentProcessHandle (i, o, e, h) = (i, o, e, ConcurrentProcessHandle h) -- | Use this to wait for processes started with -- `createProcessConcurrent` and `createProcessForeground`, and get their -- exit status. -- -- Note that such processes are actually automatically waited for -- internally, so not calling this explicitly will not result -- in zombie processes. This behavior differs from `P.waitForProcess` waitForProcessConcurrent :: ConcurrentProcessHandle -> IO ExitCode waitForProcessConcurrent (ConcurrentProcessHandle h) = bracket lock unlock checkexit where lck = waitForProcessLock globalOutputHandle lock = atomically $ tryPutTMVar lck () unlock True = atomically $ takeTMVar lck unlock False = return () checkexit locked = maybe (waitsome locked) return =<< P.getProcessExitCode h waitsome True = do let v = processWaiters globalOutputHandle l <- atomically $ readTMVar v if null l -- Avoid waitAny [] which blocks forever then P.waitForProcess h else do -- Wait for any of the running -- processes to exit. It may or may not -- be the one corresponding to the -- ProcessHandle. If it is, -- getProcessExitCode will succeed. void $ tryIO $ waitAny l checkexit True waitsome False = do -- Another thread took the lck first. Wait for that thread to -- wait for one of the running processes to exit. atomically $ do putTMVar lck () takeTMVar lck checkexit False -- Registers an action that waits for a process to exit, -- adding it to the processWaiters list, and removing it once the action -- completes. asyncProcessWaiter :: IO () -> IO () asyncProcessWaiter waitaction = do regdone <- newEmptyTMVarIO waiter <- async $ do self <- atomically (takeTMVar regdone) waitaction `finally` unregister self register waiter regdone where v = processWaiters globalOutputHandle register waiter regdone = atomically $ do l <- takeTMVar v putTMVar v (waiter:l) putTMVar regdone waiter unregister waiter = atomically $ do l <- takeTMVar v putTMVar v (filter (/= waiter) l) -- | Wrapper around `System.Process.createProcess` that prevents -- multiple processes that are running concurrently from writing -- to stdout/stderr at the same time. -- -- If the process does not output to stdout or stderr, it's run -- by createProcess entirely as usual. Only processes that can generate -- output are handled specially: -- -- A process is allowed to write to stdout and stderr in the usual -- way, assuming it can successfully take the output lock. -- -- When the output lock is held (ie, by another concurrent process, -- or because `outputConcurrent` is being called at the same time), -- the process is instead run with its stdout and stderr -- redirected to a buffer. The buffered output will be displayed as soon -- as the output lock becomes free. -- -- Currently only available on Unix systems, not Windows. #ifndef mingw32_HOST_OS createProcessConcurrent :: P.CreateProcess -> IO (Maybe Handle, Maybe Handle, Maybe Handle, ConcurrentProcessHandle) createProcessConcurrent p | willOutput (P.std_out p) || willOutput (P.std_err p) = ifM tryTakeOutputLock ( fgProcess p , bgProcess p ) | otherwise = do r@(_, _, _, h) <- P.createProcess p asyncProcessWaiter $ void $ tryIO $ P.waitForProcess h return (toConcurrentProcessHandle r) #endif -- | Wrapper around `System.Process.createProcess` that makes sure a process -- is run in the foreground, with direct access to stdout and stderr. -- Useful when eg, running an interactive process. createProcessForeground :: P.CreateProcess -> IO (Maybe Handle, Maybe Handle, Maybe Handle, ConcurrentProcessHandle) createProcessForeground p = do takeOutputLock fgProcess p fgProcess :: P.CreateProcess -> IO (Maybe Handle, Maybe Handle, Maybe Handle, ConcurrentProcessHandle) fgProcess p = do r@(_, _, _, h) <- P.createProcess p `onException` dropOutputLock registerOutputThread -- Wait for the process to exit and drop the lock. asyncProcessWaiter $ do void $ tryIO $ P.waitForProcess h unregisterOutputThread dropOutputLock return (toConcurrentProcessHandle r) #ifndef mingw32_HOST_OS bgProcess :: P.CreateProcess -> IO (Maybe Handle, Maybe Handle, Maybe Handle, ConcurrentProcessHandle) bgProcess p = do (toouth, fromouth) <- pipe (toerrh, fromerrh) <- pipe let p' = p { P.std_out = rediroutput (P.std_out p) toouth , P.std_err = rediroutput (P.std_err p) toerrh } registerOutputThread r@(_, _, _, h) <- P.createProcess p' `onException` unregisterOutputThread asyncProcessWaiter $ void $ tryIO $ P.waitForProcess h outbuf <- setupOutputBuffer StdOut toouth (P.std_out p) fromouth errbuf <- setupOutputBuffer StdErr toerrh (P.std_err p) fromerrh void $ async $ bufferWriter [outbuf, errbuf] return (toConcurrentProcessHandle r) where pipe = do (from, to) <- createPipe (,) <$> fdToHandle to <*> fdToHandle from rediroutput ss h | willOutput ss = P.UseHandle h | otherwise = ss #endif willOutput :: P.StdStream -> Bool willOutput P.Inherit = True willOutput _ = False -- | Buffered output. data OutputBuffer = OutputBuffer [OutputBufferedActivity] deriving (Eq) data StdHandle = StdOut | StdErr toHandle :: StdHandle -> Handle toHandle StdOut = stdout toHandle StdErr = stderr bufferFor :: StdHandle -> TMVar OutputBuffer bufferFor StdOut = outputBuffer globalOutputHandle bufferFor StdErr = errorBuffer globalOutputHandle data OutputBufferedActivity = Output T.Text | InTempFile { tempFile :: FilePath , endsInNewLine :: Bool } deriving (Eq) data AtEnd = AtEnd deriving Eq data BufSig = BufSig setupOutputBuffer :: StdHandle -> Handle -> P.StdStream -> Handle -> IO (StdHandle, MVar OutputBuffer, TMVar BufSig, TMVar AtEnd) setupOutputBuffer h toh ss fromh = do hClose toh buf <- newMVar (OutputBuffer []) bufsig <- atomically newEmptyTMVar bufend <- atomically newEmptyTMVar void $ async $ outputDrainer ss fromh buf bufsig bufend return (h, buf, bufsig, bufend) -- Drain output from the handle, and buffer it. outputDrainer :: P.StdStream -> Handle -> MVar OutputBuffer -> TMVar BufSig -> TMVar AtEnd -> IO () outputDrainer ss fromh buf bufsig bufend | willOutput ss = go | otherwise = atend where go = do t <- T.hGetChunk fromh if T.null t then atend else do modifyMVar_ buf $ addOutputBuffer (Output t) changed go atend = do atomically $ putTMVar bufend AtEnd hClose fromh changed = atomically $ do void $ tryTakeTMVar bufsig putTMVar bufsig BufSig registerOutputThread :: IO () registerOutputThread = do let v = outputThreads globalOutputHandle atomically $ putTMVar v . succ =<< takeTMVar v unregisterOutputThread :: IO () unregisterOutputThread = do let v = outputThreads globalOutputHandle atomically $ putTMVar v . pred =<< takeTMVar v -- Wait to lock output, and once we can, display everything -- that's put into the buffers, until the end. -- -- If end is reached before lock is taken, instead add the command's -- buffers to the global outputBuffer and errorBuffer. bufferWriter :: [(StdHandle, MVar OutputBuffer, TMVar BufSig, TMVar AtEnd)] -> IO () bufferWriter ts = do activitysig <- atomically newEmptyTMVar worker1 <- async $ lockOutput $ ifM (atomically $ tryPutTMVar activitysig ()) ( void $ mapConcurrently displaybuf ts , noop -- buffers already moved to global ) worker2 <- async $ void $ globalbuf activitysig worker1 void $ async $ do void $ waitCatch worker1 void $ waitCatch worker2 unregisterOutputThread where displaybuf v@(outh, buf, bufsig, bufend) = do change <- atomically $ (Right <$> takeTMVar bufsig) `orElse` (Left <$> takeTMVar bufend) l <- takeMVar buf putMVar buf (OutputBuffer []) emitOutputBuffer outh l case change of Right BufSig -> displaybuf v Left AtEnd -> return () globalbuf activitysig worker1 = do ok <- atomically $ do -- signal we're going to handle it -- (returns false if the displaybuf already did) ok <- tryPutTMVar activitysig () -- wait for end of all buffers when ok $ mapM_ (\(_outh, _buf, _bufsig, bufend) -> takeTMVar bufend) ts return ok when ok $ do -- add all of the command's buffered output to the -- global output buffer, atomically bs <- forM ts $ \(outh, buf, _bufsig, _bufend) -> (outh,) <$> takeMVar buf atomically $ forM_ bs $ \(outh, b) -> bufferOutputSTM' outh b -- worker1 might be blocked waiting for the output -- lock, and we've already done its job, so cancel it cancel worker1 -- Adds a value to the OutputBuffer. When adding Output to a Handle, -- it's cheaper to combine it with any already buffered Output to that -- same Handle. -- -- When the total buffered Output exceeds 1 mb in size, it's moved out of -- memory, to a temp file. This should only happen rarely, but is done to -- avoid some verbose process unexpectedly causing excessive memory use. addOutputBuffer :: OutputBufferedActivity -> OutputBuffer -> IO OutputBuffer addOutputBuffer (Output t) (OutputBuffer buf) | T.length t' <= 1048576 = return $ OutputBuffer (Output t' : other) | otherwise = do tmpdir <- getTemporaryDirectory (tmp, h) <- openTempFile tmpdir "output.tmp" let !endnl = endsNewLine t' let i = InTempFile { tempFile = tmp , endsInNewLine = endnl } T.hPutStr h t' hClose h return $ OutputBuffer (i : other) where !t' = T.concat (mapMaybe getOutput this) <> t !(this, other) = partition isOutput buf isOutput v = case v of Output _ -> True _ -> False getOutput v = case v of Output t'' -> Just t'' _ -> Nothing addOutputBuffer v (OutputBuffer buf) = return $ OutputBuffer (v:buf) -- | Adds a value to the output buffer for later display. -- -- Note that buffering large quantities of data this way will keep it -- resident in memory until it can be displayed. While `outputConcurrent` -- uses temp files if the buffer gets too big, this STM function cannot do -- so. bufferOutputSTM :: Outputable v => StdHandle -> v -> STM () bufferOutputSTM h v = bufferOutputSTM' h (OutputBuffer [Output (toOutput v)]) bufferOutputSTM' :: StdHandle -> OutputBuffer -> STM () bufferOutputSTM' h (OutputBuffer newbuf) = do (OutputBuffer buf) <- takeTMVar bv putTMVar bv (OutputBuffer (newbuf ++ buf)) where bv = bufferFor h -- | A STM action that waits for some buffered output to become -- available, and returns it. -- -- The function can select a subset of output when only some is desired; -- the fst part is returned and the snd is left in the buffer. -- -- This will prevent it from being displayed in the usual way, so you'll -- need to use `emitOutputBuffer` to display it yourself. outputBufferWaiterSTM :: (OutputBuffer -> (OutputBuffer, OutputBuffer)) -> STM (StdHandle, OutputBuffer) outputBufferWaiterSTM selector = waitgetbuf StdOut `orElse` waitgetbuf StdErr where waitgetbuf h = do let bv = bufferFor h (selected, rest) <- selector <$> takeTMVar bv when (selected == OutputBuffer []) retry putTMVar bv rest return (h, selected) waitAnyBuffer :: OutputBuffer -> (OutputBuffer, OutputBuffer) waitAnyBuffer b = (b, OutputBuffer []) -- | Use with `outputBufferWaiterSTM` to make it only return buffered -- output that ends with a newline. Anything buffered without a newline -- is left in the buffer. waitCompleteLines :: OutputBuffer -> (OutputBuffer, OutputBuffer) waitCompleteLines (OutputBuffer l) = let (selected, rest) = span completeline l in (OutputBuffer selected, OutputBuffer rest) where completeline (v@(InTempFile {})) = endsInNewLine v completeline (Output b) = endsNewLine b endsNewLine :: T.Text -> Bool endsNewLine t = not (T.null t) && T.last t == '\n' -- | Emits the content of the OutputBuffer to the Handle -- -- If you use this, you should use `lockOutput` to ensure you're the only -- thread writing to the console. emitOutputBuffer :: StdHandle -> OutputBuffer -> IO () emitOutputBuffer stdh (OutputBuffer l) = forM_ (reverse l) $ \ba -> case ba of Output t -> emit t InTempFile tmp _ -> do emit =<< T.readFile tmp void $ tryWhenExists $ removeFile tmp where outh = toHandle stdh emit t = void $ tryIO $ do T.hPutStr outh t hFlush outh