{-# LANGUAGE BangPatterns, TypeSynonymInstances, FlexibleInstances, TupleSections #-}
{-# LANGUAGE CPP #-}
{-# OPTIONS_GHC -O2 #-}
module System.Console.Concurrent.Internal where
import System.IO
#ifndef mingw32_HOST_OS
import System.Posix.IO
#endif
import System.Directory
import System.Exit
import Control.Monad
import Control.Monad.IO.Class (liftIO, MonadIO)
import System.IO.Unsafe (unsafePerformIO)
import Control.Concurrent
import Control.Concurrent.STM
import Control.Concurrent.Async
import Data.Maybe
import Data.List
import Data.Monoid
import qualified System.Process as P
import qualified Data.Text as T
import qualified Data.Text.IO as T
import Control.Applicative
import Prelude
import Utility.Monad
import Utility.Exception
data OutputHandle = OutputHandle
{ outputLock :: TMVar Lock
, outputBuffer :: TMVar OutputBuffer
, errorBuffer :: TMVar OutputBuffer
, outputThreads :: TMVar Integer
, processWaiters :: TMVar [Async ()]
, waitForProcessLock :: TMVar ()
}
data Lock = Locked
{-# NOINLINE globalOutputHandle #-}
globalOutputHandle :: OutputHandle
globalOutputHandle = unsafePerformIO $ OutputHandle
<$> newEmptyTMVarIO
<*> newTMVarIO (OutputBuffer [])
<*> newTMVarIO (OutputBuffer [])
<*> newTMVarIO 0
<*> newTMVarIO []
<*> newEmptyTMVarIO
lockOutput :: (MonadIO m, MonadMask m) => m a -> m a
lockOutput = bracket_ (liftIO takeOutputLock) (liftIO dropOutputLock)
takeOutputLock :: IO ()
takeOutputLock = void $ takeOutputLock' True
tryTakeOutputLock :: IO Bool
tryTakeOutputLock = takeOutputLock' False
withLock :: (TMVar Lock -> STM a) -> IO a
withLock a = atomically $ a (outputLock globalOutputHandle)
takeOutputLock' :: Bool -> IO Bool
takeOutputLock' block = do
locked <- withLock $ \l -> do
v <- tryTakeTMVar l
case v of
Just Locked
| block -> retry
| otherwise -> do
putTMVar l Locked
return False
Nothing -> do
putTMVar l Locked
return True
when locked $ do
(outbuf, errbuf) <- atomically $ (,)
<$> swapTMVar (outputBuffer globalOutputHandle) (OutputBuffer [])
<*> swapTMVar (errorBuffer globalOutputHandle) (OutputBuffer [])
emitOutputBuffer StdOut outbuf
emitOutputBuffer StdErr errbuf
return locked
dropOutputLock :: IO ()
dropOutputLock = withLock $ void . takeTMVar
withConcurrentOutput :: (MonadIO m, MonadMask m) => m a -> m a
withConcurrentOutput a = a `finally` liftIO flushConcurrentOutput
flushConcurrentOutput :: IO ()
flushConcurrentOutput = do
atomically $ do
r <- takeTMVar (outputThreads globalOutputHandle)
if r <= 0
then putTMVar (outputThreads globalOutputHandle) r
else retry
lockOutput $ return ()
class Outputable v where
toOutput :: v -> T.Text
instance Outputable T.Text where
toOutput = id
instance Outputable String where
toOutput = toOutput . T.pack
outputConcurrent :: Outputable v => v -> IO ()
outputConcurrent = outputConcurrent' StdOut
errorConcurrent :: Outputable v => v -> IO ()
errorConcurrent = outputConcurrent' StdErr
outputConcurrent' :: Outputable v => StdHandle -> v -> IO ()
outputConcurrent' stdh v = bracket setup cleanup go
where
setup = tryTakeOutputLock
cleanup False = return ()
cleanup True = dropOutputLock
go True = do
T.hPutStr h (toOutput v)
hFlush h
go False = do
oldbuf <- atomically $ takeTMVar bv
newbuf <- addOutputBuffer (Output (toOutput v)) oldbuf
atomically $ putTMVar bv newbuf
h = toHandle stdh
bv = bufferFor stdh
newtype ConcurrentProcessHandle = ConcurrentProcessHandle P.ProcessHandle
toConcurrentProcessHandle :: (Maybe Handle, Maybe Handle, Maybe Handle, P.ProcessHandle) -> (Maybe Handle, Maybe Handle, Maybe Handle, ConcurrentProcessHandle)
toConcurrentProcessHandle (i, o, e, h) = (i, o, e, ConcurrentProcessHandle h)
waitForProcessConcurrent :: ConcurrentProcessHandle -> IO ExitCode
waitForProcessConcurrent (ConcurrentProcessHandle h) =
bracket lock unlock checkexit
where
lck = waitForProcessLock globalOutputHandle
lock = atomically $ tryPutTMVar lck ()
unlock True = atomically $ takeTMVar lck
unlock False = return ()
checkexit locked = maybe (waitsome locked) return
=<< P.getProcessExitCode h
waitsome True = do
let v = processWaiters globalOutputHandle
l <- atomically $ readTMVar v
if null l
then P.waitForProcess h
else do
void $ tryIO $ waitAny l
checkexit True
waitsome False = do
atomically $ do
putTMVar lck ()
takeTMVar lck
checkexit False
asyncProcessWaiter :: IO () -> IO ()
asyncProcessWaiter waitaction = do
regdone <- newEmptyTMVarIO
waiter <- async $ do
self <- atomically (takeTMVar regdone)
waitaction `finally` unregister self
register waiter regdone
where
v = processWaiters globalOutputHandle
register waiter regdone = atomically $ do
l <- takeTMVar v
putTMVar v (waiter:l)
putTMVar regdone waiter
unregister waiter = atomically $ do
l <- takeTMVar v
putTMVar v (filter (/= waiter) l)
#ifndef mingw32_HOST_OS
createProcessConcurrent :: P.CreateProcess -> IO (Maybe Handle, Maybe Handle, Maybe Handle, ConcurrentProcessHandle)
createProcessConcurrent p
| willOutput (P.std_out p) || willOutput (P.std_err p) =
ifM tryTakeOutputLock
( fgProcess p
, bgProcess p
)
| otherwise = do
r@(_, _, _, h) <- P.createProcess p
asyncProcessWaiter $
void $ tryIO $ P.waitForProcess h
return (toConcurrentProcessHandle r)
#endif
createProcessForeground :: P.CreateProcess -> IO (Maybe Handle, Maybe Handle, Maybe Handle, ConcurrentProcessHandle)
createProcessForeground p = do
takeOutputLock
fgProcess p
fgProcess :: P.CreateProcess -> IO (Maybe Handle, Maybe Handle, Maybe Handle, ConcurrentProcessHandle)
fgProcess p = do
r@(_, _, _, h) <- P.createProcess p
`onException` dropOutputLock
registerOutputThread
asyncProcessWaiter $ do
void $ tryIO $ P.waitForProcess h
unregisterOutputThread
dropOutputLock
return (toConcurrentProcessHandle r)
#ifndef mingw32_HOST_OS
bgProcess :: P.CreateProcess -> IO (Maybe Handle, Maybe Handle, Maybe Handle, ConcurrentProcessHandle)
bgProcess p = do
(toouth, fromouth) <- pipe
(toerrh, fromerrh) <- pipe
let p' = p
{ P.std_out = rediroutput (P.std_out p) toouth
, P.std_err = rediroutput (P.std_err p) toerrh
}
registerOutputThread
r@(_, _, _, h) <- P.createProcess p'
`onException` unregisterOutputThread
asyncProcessWaiter $ void $ tryIO $ P.waitForProcess h
outbuf <- setupOutputBuffer StdOut toouth (P.std_out p) fromouth
errbuf <- setupOutputBuffer StdErr toerrh (P.std_err p) fromerrh
void $ async $ bufferWriter [outbuf, errbuf]
return (toConcurrentProcessHandle r)
where
pipe = do
(from, to) <- createPipe
(,) <$> fdToHandle to <*> fdToHandle from
rediroutput ss h
| willOutput ss = P.UseHandle h
| otherwise = ss
#endif
willOutput :: P.StdStream -> Bool
willOutput P.Inherit = True
willOutput _ = False
data OutputBuffer = OutputBuffer [OutputBufferedActivity]
deriving (Eq)
data StdHandle = StdOut | StdErr
toHandle :: StdHandle -> Handle
toHandle StdOut = stdout
toHandle StdErr = stderr
bufferFor :: StdHandle -> TMVar OutputBuffer
bufferFor StdOut = outputBuffer globalOutputHandle
bufferFor StdErr = errorBuffer globalOutputHandle
data OutputBufferedActivity
= Output T.Text
| InTempFile
{ tempFile :: FilePath
, endsInNewLine :: Bool
}
deriving (Eq)
data AtEnd = AtEnd
deriving Eq
data BufSig = BufSig
setupOutputBuffer :: StdHandle -> Handle -> P.StdStream -> Handle -> IO (StdHandle, MVar OutputBuffer, TMVar BufSig, TMVar AtEnd)
setupOutputBuffer h toh ss fromh = do
hClose toh
buf <- newMVar (OutputBuffer [])
bufsig <- atomically newEmptyTMVar
bufend <- atomically newEmptyTMVar
void $ async $ outputDrainer ss fromh buf bufsig bufend
return (h, buf, bufsig, bufend)
outputDrainer :: P.StdStream -> Handle -> MVar OutputBuffer -> TMVar BufSig -> TMVar AtEnd -> IO ()
outputDrainer ss fromh buf bufsig bufend
| willOutput ss = go
| otherwise = atend
where
go = do
t <- T.hGetChunk fromh
if T.null t
then atend
else do
modifyMVar_ buf $ addOutputBuffer (Output t)
changed
go
atend = do
atomically $ putTMVar bufend AtEnd
hClose fromh
changed = atomically $ do
void $ tryTakeTMVar bufsig
putTMVar bufsig BufSig
registerOutputThread :: IO ()
registerOutputThread = do
let v = outputThreads globalOutputHandle
atomically $ putTMVar v . succ =<< takeTMVar v
unregisterOutputThread :: IO ()
unregisterOutputThread = do
let v = outputThreads globalOutputHandle
atomically $ putTMVar v . pred =<< takeTMVar v
bufferWriter :: [(StdHandle, MVar OutputBuffer, TMVar BufSig, TMVar AtEnd)] -> IO ()
bufferWriter ts = do
activitysig <- atomically newEmptyTMVar
worker1 <- async $ lockOutput $
ifM (atomically $ tryPutTMVar activitysig ())
( void $ mapConcurrently displaybuf ts
, noop
)
worker2 <- async $ void $ globalbuf activitysig worker1
void $ async $ do
void $ waitCatch worker1
void $ waitCatch worker2
unregisterOutputThread
where
displaybuf v@(outh, buf, bufsig, bufend) = do
change <- atomically $
(Right <$> takeTMVar bufsig)
`orElse`
(Left <$> takeTMVar bufend)
l <- takeMVar buf
putMVar buf (OutputBuffer [])
emitOutputBuffer outh l
case change of
Right BufSig -> displaybuf v
Left AtEnd -> return ()
globalbuf activitysig worker1 = do
ok <- atomically $ do
ok <- tryPutTMVar activitysig ()
when ok $
mapM_ (\(_outh, _buf, _bufsig, bufend) -> takeTMVar bufend) ts
return ok
when ok $ do
bs <- forM ts $ \(outh, buf, _bufsig, _bufend) ->
(outh,) <$> takeMVar buf
atomically $
forM_ bs $ \(outh, b) ->
bufferOutputSTM' outh b
cancel worker1
addOutputBuffer :: OutputBufferedActivity -> OutputBuffer -> IO OutputBuffer
addOutputBuffer (Output t) (OutputBuffer buf)
| T.length t' <= 1048576 = return $ OutputBuffer (Output t' : other)
| otherwise = do
tmpdir <- getTemporaryDirectory
(tmp, h) <- openTempFile tmpdir "output.tmp"
let !endnl = endsNewLine t'
let i = InTempFile
{ tempFile = tmp
, endsInNewLine = endnl
}
T.hPutStr h t'
hClose h
return $ OutputBuffer (i : other)
where
!t' = T.concat (mapMaybe getOutput this) <> t
!(this, other) = partition isOutput buf
isOutput v = case v of
Output _ -> True
_ -> False
getOutput v = case v of
Output t'' -> Just t''
_ -> Nothing
addOutputBuffer v (OutputBuffer buf) = return $ OutputBuffer (v:buf)
bufferOutputSTM :: Outputable v => StdHandle -> v -> STM ()
bufferOutputSTM h v = bufferOutputSTM' h (OutputBuffer [Output (toOutput v)])
bufferOutputSTM' :: StdHandle -> OutputBuffer -> STM ()
bufferOutputSTM' h (OutputBuffer newbuf) = do
(OutputBuffer buf) <- takeTMVar bv
putTMVar bv (OutputBuffer (newbuf ++ buf))
where
bv = bufferFor h
outputBufferWaiterSTM :: (OutputBuffer -> (OutputBuffer, OutputBuffer)) -> STM (StdHandle, OutputBuffer)
outputBufferWaiterSTM selector = waitgetbuf StdOut `orElse` waitgetbuf StdErr
where
waitgetbuf h = do
let bv = bufferFor h
(selected, rest) <- selector <$> takeTMVar bv
when (selected == OutputBuffer [])
retry
putTMVar bv rest
return (h, selected)
waitAnyBuffer :: OutputBuffer -> (OutputBuffer, OutputBuffer)
waitAnyBuffer b = (b, OutputBuffer [])
waitCompleteLines :: OutputBuffer -> (OutputBuffer, OutputBuffer)
waitCompleteLines (OutputBuffer l) =
let (selected, rest) = span completeline l
in (OutputBuffer selected, OutputBuffer rest)
where
completeline (v@(InTempFile {})) = endsInNewLine v
completeline (Output b) = endsNewLine b
endsNewLine :: T.Text -> Bool
endsNewLine t = not (T.null t) && T.last t == '\n'
emitOutputBuffer :: StdHandle -> OutputBuffer -> IO ()
emitOutputBuffer stdh (OutputBuffer l) =
forM_ (reverse l) $ \ba -> case ba of
Output t -> emit t
InTempFile tmp _ -> do
emit =<< T.readFile tmp
void $ tryWhenExists $ removeFile tmp
where
outh = toHandle stdh
emit t = void $ tryIO $ do
T.hPutStr outh t
hFlush outh