module Control.Concurrent.Output (
withConcurrentOutput,
flushConcurrentOutput,
Outputable(..),
outputConcurrent,
createProcessConcurrent,
waitForProcessConcurrent,
lockOutput,
) where
import System.IO
import System.Posix.IO
import System.Directory
import System.Exit
import Control.Monad
import Control.Monad.IO.Class (liftIO, MonadIO)
import Control.Applicative
import System.IO.Unsafe (unsafePerformIO)
import Control.Concurrent
import Control.Concurrent.STM
import Control.Concurrent.Async
import Data.Maybe
import Data.List
import Data.Monoid
import qualified System.Process as P
import qualified Data.Set as S
import qualified Data.ByteString as B
import qualified Data.Text as T
import Data.Text.Encoding (encodeUtf8)
import Utility.Monad
import Utility.Exception
data OutputHandle = OutputHandle
{ outputLock :: TMVar Lock
, outputBuffer :: TMVar Buffer
, outputThreads :: TMVar (S.Set (Async ()))
}
data Lock = Locked
globalOutputHandle :: MVar OutputHandle
globalOutputHandle = unsafePerformIO $
newMVar =<< OutputHandle
<$> newEmptyTMVarIO
<*> newTMVarIO []
<*> newTMVarIO S.empty
getOutputHandle :: IO OutputHandle
getOutputHandle = readMVar globalOutputHandle
lockOutput :: (MonadIO m, MonadMask m) => m a -> m a
lockOutput = bracket_ (liftIO takeOutputLock) (liftIO dropOutputLock)
takeOutputLock :: IO ()
takeOutputLock = void $ takeOutputLock' True
tryTakeOutputLock :: IO Bool
tryTakeOutputLock = takeOutputLock' False
withLock :: (TMVar Lock -> STM a) -> IO a
withLock a = do
lck <- outputLock <$> getOutputHandle
atomically (a lck)
takeOutputLock' :: Bool -> IO Bool
takeOutputLock' block = do
locked <- withLock $ \l -> do
v <- tryTakeTMVar l
case v of
Just Locked
| block -> retry
| otherwise -> do
putTMVar l Locked
return False
Nothing -> do
putTMVar l Locked
return True
when locked $ do
bv <- outputBuffer <$> getOutputHandle
buf <- atomically $ swapTMVar bv []
emitBuffer stdout buf
return locked
dropOutputLock :: IO ()
dropOutputLock = withLock $ void . takeTMVar
withConcurrentOutput :: IO a -> IO a
withConcurrentOutput a = a `finally` flushConcurrentOutput
flushConcurrentOutput :: IO ()
flushConcurrentOutput = do
v <- outputThreads <$> getOutputHandle
atomically $ do
r <- takeTMVar v
if r == S.empty
then putTMVar v r
else retry
lockOutput $ return ()
class Outputable v where
toOutput :: v -> B.ByteString
instance Outputable B.ByteString where
toOutput = id
instance Outputable T.Text where
toOutput = encodeUtf8
instance Outputable String where
toOutput = toOutput . T.pack
outputConcurrent :: Outputable v => v -> IO ()
outputConcurrent v = bracket setup cleanup go
where
setup = tryTakeOutputLock
cleanup False = return ()
cleanup True = dropOutputLock
go True = do
B.hPut stdout (toOutput v)
hFlush stdout
go False = do
bv <- outputBuffer <$> getOutputHandle
oldbuf <- atomically $ takeTMVar bv
newbuf <- addBuffer (Output (toOutput v)) oldbuf
atomically $ putTMVar bv newbuf
waitForProcessConcurrent :: P.ProcessHandle -> IO ExitCode
waitForProcessConcurrent h = do
v <- tryWhenExists (P.waitForProcess h)
case v of
Just r -> return r
Nothing -> maybe (waitForProcessConcurrent h) return =<< P.getProcessExitCode h
createProcessConcurrent :: P.CreateProcess -> IO (Maybe Handle, Maybe Handle, Maybe Handle, P.ProcessHandle)
createProcessConcurrent p
| willOutput (P.std_out p) || willOutput (P.std_err p) =
ifM tryTakeOutputLock
( firstprocess
, concurrentprocess
)
| otherwise = P.createProcess p
where
rediroutput ss h
| willOutput ss = P.UseHandle h
| otherwise = ss
firstprocess = do
r@(_, _, _, h) <- P.createProcess p
`onException` dropOutputLock
void $ async $ do
void $ tryIO $ waitForProcessConcurrent h
dropOutputLock
return r
concurrentprocess = do
(toouth, fromouth) <- pipe
(toerrh, fromerrh) <- pipe
let p' = p
{ P.std_out = rediroutput (P.std_out p) toouth
, P.std_err = rediroutput (P.std_err p) toerrh
}
r <- P.createProcess p'
outbuf <- setupBuffer stdout toouth (P.std_out p) fromouth
errbuf <- setupBuffer stderr toerrh (P.std_err p) fromerrh
void $ async $ bufferWriter [outbuf, errbuf]
return r
pipe = do
(from, to) <- createPipe
(,) <$> fdToHandle to <*> fdToHandle from
willOutput :: P.StdStream -> Bool
willOutput P.Inherit = True
willOutput _ = False
type Buffer = [BufferedActivity]
data BufferedActivity
= ReachedEnd
| Output B.ByteString
| InTempFile FilePath
deriving (Eq)
setupBuffer :: Handle -> Handle -> P.StdStream -> Handle -> IO (Handle, MVar Buffer, TMVar ())
setupBuffer h toh ss fromh = do
hClose toh
buf <- newMVar []
bufsig <- atomically newEmptyTMVar
void $ async $ outputDrainer ss fromh buf bufsig
return (h, buf, bufsig)
outputDrainer :: P.StdStream -> Handle -> MVar Buffer -> TMVar () -> IO ()
outputDrainer ss fromh buf bufsig
| willOutput ss = go
| otherwise = atend
where
go = do
v <- tryIO $ B.hGetSome fromh 1048576
case v of
Right b | not (B.null b) -> do
modifyMVar_ buf $ addBuffer (Output b)
changed
go
_ -> atend
atend = do
modifyMVar_ buf $ pure . (ReachedEnd :)
changed
hClose fromh
changed = atomically $ do
void $ tryTakeTMVar bufsig
putTMVar bufsig ()
bufferWriter :: [(Handle, MVar Buffer, TMVar ())] -> IO ()
bufferWriter ts = do
worker <- async $ void $ lockOutput $ mapConcurrently go ts
v <- outputThreads <$> getOutputHandle
atomically $ do
s <- takeTMVar v
putTMVar v (S.insert worker s)
void $ async $ do
void $ waitCatch worker
atomically $ do
s <- takeTMVar v
putTMVar v (S.delete worker s)
where
go v@(outh, buf, bufsig) = do
void $ atomically $ takeTMVar bufsig
l <- takeMVar buf
putMVar buf []
emitBuffer outh l
if any (== ReachedEnd) l
then return ()
else go v
emitBuffer :: Handle -> Buffer -> IO ()
emitBuffer outh l = forM_ (reverse l) $ \ba -> case ba of
Output b -> do
B.hPut outh b
hFlush outh
InTempFile tmp -> do
B.hPut outh =<< B.readFile tmp
void $ tryWhenExists $ removeFile tmp
ReachedEnd -> return ()
addBuffer :: BufferedActivity -> Buffer -> IO Buffer
addBuffer (Output b) buf
| B.length b' <= 1048576 = return (Output b' : other)
| otherwise = do
tmpdir <- getTemporaryDirectory
(tmp, h) <- openTempFile tmpdir "output.tmp"
B.hPut h b'
hClose h
return (InTempFile tmp : other)
where
!b' = B.concat (mapMaybe getOutput this) <> b
!(this, other) = partition isOutput buf
isOutput v = case v of
Output _ -> True
_ -> False
getOutput v = case v of
Output b'' -> Just b''
_ -> Nothing
addBuffer v buf = return (v:buf)