module System.Console.Concurrent (
withConcurrentOutput,
Outputable(..),
outputConcurrent,
createProcessConcurrent,
waitForProcessConcurrent,
flushConcurrentOutput,
lockOutput,
OutputBuffer,
StdHandle(..),
bufferOutputSTM,
outputBufferWaiterSTM,
waitAnyBuffer,
waitCompleteLines,
emitOutputBuffer,
) where
import System.IO
import System.Posix.IO
import System.Directory
import System.Exit
import Control.Monad
import Control.Monad.IO.Class (liftIO, MonadIO)
import Control.Applicative
import System.IO.Unsafe (unsafePerformIO)
import Control.Concurrent
import Control.Concurrent.STM
import Control.Concurrent.Async
import Data.Maybe
import Data.List
import Data.Monoid
import Data.Char
import qualified System.Process as P
import qualified Data.ByteString as B
import qualified Data.Text as T
import Data.Text.Encoding (encodeUtf8)
import Utility.Monad
import Utility.Exception
-- | Shared state used to coordinate all concurrent output.
data OutputHandle = OutputHandle
  { -- | Holds 'Locked' while the output lock is taken; empty when it is free.
    outputLock :: TMVar Lock
  , -- | Buffered output for stdout (see 'bufferFor').
    outputBuffer :: TMVar OutputBuffer
  , -- | Buffered output for stderr (see 'bufferFor').
    errorBuffer :: TMVar OutputBuffer
  , -- | Counter incremented by 'registerOutputThread' and decremented by
    -- 'unregisterOutputThread'; 'flushConcurrentOutput' waits for it to
    -- reach zero or below.
    outputThreads :: TMVar Integer
  }
-- | Token stored in 'outputLock' while the output lock is held.
data Lock = Locked
-- | The single 'OutputHandle' shared by every function in this module.
--
-- It is created with 'unsafePerformIO', so it must never be inlined: if
-- the definition were duplicated at its use sites, each copy could
-- allocate its own TMVars and the lock and buffers would no longer be
-- shared between callers. The NOINLINE pragma guarantees one instance.
{-# NOINLINE globalOutputHandle #-}
globalOutputHandle :: OutputHandle
globalOutputHandle = unsafePerformIO $ OutputHandle
  <$> newEmptyTMVarIO               -- the lock starts out free
  <*> newTMVarIO (OutputBuffer [])  -- nothing buffered for stdout
  <*> newTMVarIO (OutputBuffer [])  -- nothing buffered for stderr
  <*> newTMVarIO 0                  -- no output threads registered
-- | Runs an action with the output lock held. The lock is acquired with
-- 'takeOutputLock' and handed back with 'dropOutputLock' through
-- 'bracket_'.
lockOutput :: (MonadIO m, MonadMask m) => m a -> m a
lockOutput action = bracket_ acquire release action
  where
    acquire = liftIO takeOutputLock
    release = liftIO dropOutputLock
-- | Takes the output lock, waiting until it becomes available.
takeOutputLock :: IO ()
takeOutputLock = do
  _ <- takeOutputLock' True
  return ()
-- | Attempts to take the output lock without waiting. Returns 'True' when
-- the lock was obtained and 'False' when it is already held.
tryTakeOutputLock :: IO Bool
tryTakeOutputLock = takeOutputLock' waitForIt
  where
    waitForIt = False
-- | Runs an STM action on the global output lock variable, atomically.
withLock :: (TMVar Lock -> STM a) -> IO a
withLock act = atomically (act lockVar)
  where
    lockVar = outputLock globalOutputHandle
-- | Takes the output lock.
--
-- When the argument is 'True', waits until the lock is free. When it is
-- 'False', returns 'False' immediately if the lock is already held.
-- The result says whether the lock was obtained.
--
-- After obtaining the lock, any output that accumulated in the global
-- stdout and stderr buffers is emitted. If emitting fails, the lock is
-- dropped again before the exception propagates; otherwise it would stay
-- held forever, because the caller has not yet had the chance to arrange
-- for its release.
takeOutputLock' :: Bool -> IO Bool
takeOutputLock' block = do
  locked <- withLock $ \l -> do
    v <- tryTakeTMVar l
    case v of
      Just Locked
        | block -> retry
        | otherwise -> do
            -- Put back the value that was just taken.
            putTMVar l Locked
            return False
      Nothing -> do
        putTMVar l Locked
        return True
  when locked $
    (emitPending `onException` dropOutputLock)
  return locked
  where
    -- Atomically empties both global buffers, then writes out what was
    -- in them.
    emitPending = do
      (outbuf, errbuf) <- atomically $ (,)
        <$> swapTMVar (outputBuffer globalOutputHandle) (OutputBuffer [])
        <*> swapTMVar (errorBuffer globalOutputHandle) (OutputBuffer [])
      emitOutputBuffer StdOut outbuf
      emitOutputBuffer StdErr errbuf
-- | Releases the output lock by emptying the lock variable. Since this
-- uses 'takeTMVar', it blocks if the lock is not currently held.
dropOutputLock :: IO ()
dropOutputLock = withLock $ \lockVar -> do
  _ <- takeTMVar lockVar
  return ()
-- | Runs an action and, once it finishes (normally or by exception),
-- calls 'flushConcurrentOutput'.
withConcurrentOutput :: (MonadIO m, MonadMask m) => m a -> m a
withConcurrentOutput a = finally a flush
  where
    flush = liftIO flushConcurrentOutput
-- | Blocks until the count of registered output threads has dropped to
-- zero or below, then briefly takes the output lock, which emits any
-- output still held in the global buffers.
flushConcurrentOutput :: IO ()
flushConcurrentOutput = do
  atomically $ do
    remaining <- readTMVar threadCount
    when (remaining > 0) retry
  lockOutput (return ())
  where
    threadCount = outputThreads globalOutputHandle
-- | Values that can be rendered to the bytes written to the console.
class Outputable v where
  -- | Converts the value to the bytes to output.
  toOutput :: v -> B.ByteString

-- | Bytes are output unchanged.
instance Outputable B.ByteString where
  toOutput bytes = bytes

-- | Text is output UTF-8 encoded.
instance Outputable T.Text where
  toOutput txt = encodeUtf8 txt

-- | Strings are packed into 'T.Text' and then UTF-8 encoded.
instance Outputable String where
  toOutput str = encodeUtf8 (T.pack str)
-- | Outputs a value to stdout without interleaving it with other
-- concurrent output.
--
-- If the output lock can be taken immediately, the value is written to
-- stdout right away and flushed, and the lock is then released.
-- Otherwise the value is appended to the global stdout buffer, to be
-- emitted by whoever next takes the lock.
outputConcurrent :: Outputable v => v -> IO ()
outputConcurrent v = bracket setup cleanup go
  where
    setup = tryTakeOutputLock
    cleanup False = return ()
    cleanup True = dropOutputLock
    go True = do
      B.hPut stdout (toOutput v)
      hFlush stdout
    go False = do
      let bv = outputBuffer globalOutputHandle
      oldbuf <- atomically $ takeTMVar bv
      -- addOutputBuffer can fail (it may need to create a temp file).
      -- Put the old buffer back in that case: leaving the TMVar empty
      -- would block every later use of the global buffer.
      newbuf <- addOutputBuffer (Output (toOutput v)) oldbuf
        `onException` atomically (putTMVar bv oldbuf)
      atomically $ putTMVar bv newbuf
-- | Waits for a process to exit and returns its exit code.
--
-- 'P.waitForProcess' is run under 'tryWhenExists'; when that yields
-- 'Nothing' (presumably because the wait failed with a does-not-exist
-- error -- confirm against Utility.Exception), the exit code is looked up
-- with 'P.getProcessExitCode', and the whole wait is retried if it is
-- not yet available.
waitForProcessConcurrent :: P.ProcessHandle -> IO ExitCode
waitForProcessConcurrent h = do
  waited <- tryWhenExists (P.waitForProcess h)
  case waited of
    Just code -> return code
    Nothing -> do
      polled <- P.getProcessExitCode h
      case polled of
        Just code -> return code
        Nothing -> waitForProcessConcurrent h
-- | Wrapper around 'P.createProcess' that prevents a process's inherited
-- stdout/stderr from interleaving with other concurrent output.
--
-- When neither stream is 'P.Inherit', the process is started unchanged.
-- Otherwise:
--
-- * If the output lock can be taken, the process is started as-is and a
--   background thread drops the lock once the process has exited.
-- * If the lock is held, the inherited streams are redirected to pipes
--   and the process's output is buffered by background threads.
createProcessConcurrent :: P.CreateProcess -> IO (Maybe Handle, Maybe Handle, Maybe Handle, P.ProcessHandle)
createProcessConcurrent p
  | willOutput (P.std_out p) || willOutput (P.std_err p) =
      ifM tryTakeOutputLock
        ( firstprocess
        , concurrentprocess
        )
  | otherwise = P.createProcess p
  where
    -- Only streams that would have been inherited are redirected.
    rediroutput ss h
      | willOutput ss = P.UseHandle h
      | otherwise = ss

    firstprocess = do
      r@(_, _, _, h) <- P.createProcess p
        `onException` dropOutputLock
      -- The lock is held until the process exits.
      void $ async $ do
        void $ tryIO $ waitForProcessConcurrent h
        dropOutputLock
      return r

    concurrentprocess = do
      (toouth, fromouth) <- pipe
      (toerrh, fromerrh) <- pipe
      let p' = p
            { P.std_out = rediroutput (P.std_out p) toouth
            , P.std_err = rediroutput (P.std_err p) toerrh
            }
      -- If the process cannot be started, nothing else will ever close
      -- the pipe handles, so close them here. Errors from closing are
      -- ignored so that the original exception is the one reported.
      let closequietly = void . tryIO . hClose
      let abandon = do
            unregisterOutputThread
            mapM_ closequietly [toouth, fromouth, toerrh, fromerrh]
      registerOutputThread
      r <- P.createProcess p'
        `onException` abandon
      outbuf <- setupOutputBuffer StdOut toouth (P.std_out p) fromouth
      errbuf <- setupOutputBuffer StdErr toerrh (P.std_err p) fromerrh
      void $ async $ bufferWriter [outbuf, errbuf]
      return r

    -- Returns (write end, read end) of a fresh pipe, as Handles.
    pipe = do
      (from, to) <- createPipe
      (,) <$> fdToHandle to <*> fdToHandle from
-- | Whether a stream setting means the process writes to the stream it
-- inherits from this program. Only 'P.Inherit' does.
willOutput :: P.StdStream -> Bool
willOutput stream = case stream of
  P.Inherit -> True
  _ -> False
-- | Buffered output. New activity is added at the head of the list, and
-- 'emitOutputBuffer' reverses the list before writing, so the head is the
-- most recently buffered entry.
data OutputBuffer = OutputBuffer [OutputBufferedActivity]
  deriving (Eq)
-- | Selects one of the two output streams: stdout or stderr.
data StdHandle = StdOut | StdErr
-- | The 'Handle' that a 'StdHandle' refers to.
toHandle :: StdHandle -> Handle
toHandle which = case which of
  StdOut -> stdout
  StdErr -> stderr
-- | The global buffer that holds pending output for the given stream.
bufferFor :: StdHandle -> TMVar OutputBuffer
bufferFor which = field globalOutputHandle
  where
    field = case which of
      StdOut -> outputBuffer
      StdErr -> errorBuffer
-- | One entry of an 'OutputBuffer'.
data OutputBufferedActivity
  = -- | Output held in memory.
    Output B.ByteString
  | -- | Output that was written out to a temporary file
    -- ('addOutputBuffer' does this once the in-memory data exceeds
    -- 1048576 bytes).
    InTempFile
      { -- | Path of the temporary file holding the output.
        tempFile :: FilePath
      , -- | Whether the data written to the file ended with a newline.
        endsInNewLine :: Bool
      }
  deriving (Eq)
-- | Signal put by 'outputDrainer' when it has finished reading a stream.
data AtEnd = AtEnd
  deriving Eq
-- | Signal put by 'outputDrainer' each time it adds data to a buffer.
data BufSig = BufSig
-- | Prepares buffering for one stream of a process.
--
-- Closes this program's copy of the write end of the pipe, creates an
-- empty buffer together with its change and end signals, and starts an
-- 'outputDrainer' thread reading from the read end. Returns the pieces
-- that 'bufferWriter' consumes.
setupOutputBuffer :: StdHandle -> Handle -> P.StdStream -> Handle -> IO (StdHandle, MVar OutputBuffer, TMVar BufSig, TMVar AtEnd)
setupOutputBuffer h toh ss fromh = do
  hClose toh
  buf <- newMVar (OutputBuffer [])
  bufsig <- newEmptyTMVarIO
  bufend <- newEmptyTMVarIO
  _ <- async (outputDrainer ss fromh buf bufsig bufend)
  return (h, buf, bufsig, bufend)
-- | Reads everything a process writes to one stream into a buffer.
--
-- When the stream was not going to be inherited there is nothing to read,
-- so the end is signalled straight away. Otherwise chunks of up to
-- 1048576 bytes are read and appended to the buffer, with a change signal
-- after each one, until a read fails or returns no data.
outputDrainer :: P.StdStream -> Handle -> MVar OutputBuffer -> TMVar BufSig -> TMVar AtEnd -> IO ()
outputDrainer ss fromh buf bufsig bufend
  | willOutput ss = drain
  | otherwise = finish
  where
    drain = do
      chunk <- tryIO (B.hGetSome fromh 1048576)
      case chunk of
        Left _ -> finish
        Right b
          | B.null b -> finish
          | otherwise -> do
              modifyMVar_ buf (addOutputBuffer (Output b))
              signalChange
              drain
    -- Announce the end of the stream, then close the read end.
    finish = do
      atomically (putTMVar bufend AtEnd)
      hClose fromh
    -- Replace any signal not yet consumed, so the put never blocks.
    signalChange = atomically $ do
      _ <- tryTakeTMVar bufsig
      putTMVar bufsig BufSig
-- | Flushes stdout, then increments the count of output threads.
registerOutputThread :: IO ()
registerOutputThread = do
  hFlush stdout
  atomically $ do
    count <- takeTMVar counter
    putTMVar counter (succ count)
  where
    counter = outputThreads globalOutputHandle
-- | Flushes stdout, then decrements the count of output threads.
unregisterOutputThread :: IO ()
unregisterOutputThread = do
  hFlush stdout
  atomically $ do
    count <- takeTMVar counter
    putTMVar counter (pred count)
  where
    counter = outputThreads globalOutputHandle
-- | Gets the buffered output of one concurrent process to the display,
-- and unregisters the output thread once that is done.
--
-- Two workers compete to fill @activitysig@; only one of them can:
--
-- * @worker1@ waits for the output lock. If it then fills @activitysig@,
--   it displays each stream's buffer as it changes, until each stream
--   has ended.
-- * @worker2@ fills @activitysig@ only in the same transaction in which
--   every stream's end signal is available. If it succeeds, it moves the
--   buffers into the global buffers with 'bufferOutputSTM''.
bufferWriter :: [(StdHandle, MVar OutputBuffer, TMVar BufSig, TMVar AtEnd)] -> IO ()
bufferWriter ts = do
  activitysig <- atomically newEmptyTMVar
  worker1 <- async $ lockOutput $
    ifM (atomically $ tryPutTMVar activitysig ())
      ( void $ mapConcurrently displaybuf ts
      , noop
      )
  worker2 <- async $ void $ globalbuf activitysig
  -- Wait for both workers to finish, whatever their outcome, before
  -- unregistering.
  void $ async $ do
    void $ waitCatch worker1
    void $ waitCatch worker2
    unregisterOutputThread
  where
    -- Wait for either a change or the end of this stream, emit whatever
    -- is in the buffer, and loop unless the stream has ended. The change
    -- signal is checked first, so a pending change is handled before the
    -- end signal.
    displaybuf v@(outh, buf, bufsig, bufend) = do
      change <- atomically $
        (Right <$> takeTMVar bufsig)
          `orElse`
        (Left <$> takeTMVar bufend)
      l <- takeMVar buf
      putMVar buf (OutputBuffer [])
      emitOutputBuffer outh l
      case change of
        Right BufSig -> displaybuf v
        Left AtEnd -> return ()
    globalbuf activitysig = do
      ok <- atomically $ do
        -- Yields False if the other worker already filled activitysig.
        ok <- tryPutTMVar activitysig ()
        -- Taking an end signal blocks until that stream has ended, which
        -- makes the whole transaction (including the put above) retry.
        when ok $
          mapM_ (\(_outh, _buf, _bufsig, bufend) -> takeTMVar bufend) ts
        return ok
      when ok $ do
        bs <- forM ts $ \(outh, buf, _bufsig, _bufend) ->
          (outh,) <$> takeMVar buf
        -- All streams' buffers are added in a single transaction.
        atomically $
          forM_ bs $ \(outh, b) ->
            bufferOutputSTM' outh b
-- | Adds an activity to the head (newest end) of an 'OutputBuffer'.
--
-- New in-memory output is merged with the entry at the head of the buffer
-- when that entry is also in-memory output. If the merged data exceeds
-- 1048576 bytes it is written to a temporary file and replaced by an
-- 'InTempFile' entry. Any other activity is simply prepended.
--
-- Only the head entry is merged. The buffer is ordered newest first, so
-- merging older 'Output' entries from deeper in the list would move their
-- data past any 'InTempFile' entries buffered after them, and would join
-- the chunks in reverse order.
addOutputBuffer :: OutputBufferedActivity -> OutputBuffer -> IO OutputBuffer
addOutputBuffer (Output b) (OutputBuffer buf)
  | B.length b' <= 1048576 = return $ OutputBuffer (Output b' : other)
  | otherwise = do
      tmpdir <- getTemporaryDirectory
      (tmp, h) <- openTempFile tmpdir "output.tmp"
      let !endnl = endsNewLine b'
      let i = InTempFile
            { tempFile = tmp
            , endsInNewLine = endnl
            }
      -- Close the handle even if the write fails.
      B.hPut h b' `finally` hClose h
      return $ OutputBuffer (i : other)
  where
    !b' = prev <> b
    -- The newest entry's data, if it is in memory, and the remaining
    -- (older) entries.
    (prev, other) = case buf of
      Output old : rest -> (old, rest)
      _ -> (B.empty, buf)
addOutputBuffer v (OutputBuffer buf) = return $ OutputBuffer (v:buf)
-- | Adds a value to the global buffer of the given stream, within an STM
-- transaction.
bufferOutputSTM :: Outputable v => StdHandle -> v -> STM ()
bufferOutputSTM h v = bufferOutputSTM' h single
  where
    single = OutputBuffer [Output (toOutput v)]
-- | Puts the entries of a buffer in front of those already held in the
-- global buffer of the given stream.
bufferOutputSTM' :: StdHandle -> OutputBuffer -> STM ()
bufferOutputSTM' h (OutputBuffer newbuf) = do
  OutputBuffer existing <- takeTMVar target
  putTMVar target (OutputBuffer (newbuf ++ existing))
  where
    target = bufferFor h
-- | Applies a selector to the global stdout and stderr buffers. The
-- selector splits a buffer into the part to hand out and the part to
-- keep. The transaction retries until the selected part of at least one
-- of the buffers is non-empty, and then returns the selected part of
-- each, paired with its stream.
outputBufferWaiterSTM :: (OutputBuffer -> (OutputBuffer, OutputBuffer)) -> STM [(StdHandle, OutputBuffer)]
outputBufferWaiterSTM selector = do
  picked <- mapM pick streams
  when (all (== OutputBuffer []) picked) retry
  return (zip streams picked)
  where
    streams = [StdOut, StdErr]
    -- Split one global buffer, keeping the unselected part in place.
    pick h = do
      let var = bufferFor h
      (selected, rest) <- selector <$> takeTMVar var
      putTMVar var rest
      return selected
-- | Selector that takes the entire buffer and leaves nothing behind.
waitAnyBuffer :: OutputBuffer -> (OutputBuffer, OutputBuffer)
waitAnyBuffer everything = (everything, nothing)
  where
    nothing = OutputBuffer []
-- | Selector that takes the leading run of entries whose data ends with a
-- newline and leaves the remaining entries behind.
waitCompleteLines :: OutputBuffer -> (OutputBuffer, OutputBuffer)
waitCompleteLines (OutputBuffer l) = (OutputBuffer ready, OutputBuffer pending)
  where
    (ready, pending) = span isComplete l
    isComplete activity = case activity of
      InTempFile {} -> endsInNewLine activity
      Output b -> endsNewLine b
-- | Whether the bytes are non-empty and the last byte is a newline.
endsNewLine :: B.ByteString -> Bool
endsNewLine b
  | B.null b = False
  | otherwise = B.last b == newline
  where
    newline = fromIntegral (ord '\n')
-- | Writes the contents of a buffer to the given stream, oldest entry
-- first (the buffer is stored newest first, hence the 'reverse').
--
-- Every entry is flushed after it is written, including data read back
-- from a temporary file; otherwise that data could stay in the handle's
-- buffer and appear after later output written to the same stream by
-- other means. The temporary file is removed even if writing its
-- contents fails.
emitOutputBuffer :: StdHandle -> OutputBuffer -> IO ()
emitOutputBuffer stdh (OutputBuffer l) =
  forM_ (reverse l) $ \ba -> case ba of
    Output b -> emit b
    InTempFile tmp _ ->
      (emit =<< B.readFile tmp)
        `finally` void (tryWhenExists (removeFile tmp))
  where
    outh = toHandle stdh
    emit b = do
      B.hPut outh b
      hFlush outh