{-# LANGUAGE BangPatterns, TypeSynonymInstances, FlexibleInstances, TupleSections #-}
{-# LANGUAGE CPP #-}
{-# OPTIONS_GHC -O2 #-}
{- Building this module with -O0 causes streams not to fuse and too much
 - memory to be used. -}

-- | 
-- Copyright: 2015 Joey Hess <id@joeyh.name>
-- License: BSD-2-clause
-- 
-- Concurrent output handling, internals.
--
-- May change at any time.

module System.Console.Concurrent.Internal where

import System.IO
#ifndef mingw32_HOST_OS
import System.Posix.IO
#endif
import System.Directory
import System.Exit
import Control.Monad
import Control.Monad.IO.Class (liftIO, MonadIO)
import System.IO.Unsafe (unsafePerformIO)
import Control.Concurrent
import Control.Concurrent.STM
import Control.Concurrent.Async
import Data.Maybe
import Data.List
import Data.Monoid
import qualified System.Process as P
import qualified Data.Text as T
import qualified Data.Text.IO as T
import Control.Applicative
import Prelude

import Utility.Monad
import Utility.Exception

data OutputHandle = OutputHandle
        { outputLock :: TMVar Lock
        , outputBuffer :: TMVar OutputBuffer
        , errorBuffer :: TMVar OutputBuffer
        , outputThreads :: TMVar Integer
        , processWaiters :: TMVar [Async ()]
        , waitForProcessLock :: TMVar ()
        }

data Lock = Locked

-- | A shared global variable for the OutputHandle.
{-# NOINLINE globalOutputHandle #-}
globalOutputHandle :: OutputHandle
globalOutputHandle = unsafePerformIO $ OutputHandle
        <$> newEmptyTMVarIO
        <*> newTMVarIO (OutputBuffer [])
        <*> newTMVarIO (OutputBuffer [])
        <*> newTMVarIO 0
        <*> newTMVarIO []
        <*> newEmptyTMVarIO

-- | Holds a lock while performing an action. This allows the action to
-- perform its own output to the console, without using functions from this
-- module.
--
-- While this is running, other threads that try to lockOutput will block.
-- Any calls to `outputConcurrent` and `createProcessConcurrent` will not
-- block, but the output will be buffered and displayed only once the
-- action is done.
lockOutput :: (MonadIO m, MonadMask m) => m a -> m a
lockOutput = bracket_ (liftIO takeOutputLock) (liftIO dropOutputLock)

-- | Blocks until we have the output lock.
takeOutputLock :: IO ()
takeOutputLock = void $ takeOutputLock' True

-- | Tries to take the output lock, without blocking.
tryTakeOutputLock :: IO Bool
tryTakeOutputLock = takeOutputLock' False

withLock :: (TMVar Lock -> STM a) -> IO a
withLock a = atomically $ a (outputLock globalOutputHandle)

takeOutputLock' :: Bool -> IO Bool
takeOutputLock' block = do
        locked <- withLock $ \l -> do
                v <- tryTakeTMVar l
                case v of
                        Just Locked
                                | block -> retry
                                | otherwise -> do
                                        -- Restore value we took.
                                        putTMVar l Locked
                                        return False
                        Nothing -> do
                                putTMVar l Locked
                                return True
        when locked $ do
                (outbuf, errbuf) <- atomically $ (,)
                        <$> swapTMVar (outputBuffer globalOutputHandle) (OutputBuffer [])
                        <*> swapTMVar (errorBuffer globalOutputHandle) (OutputBuffer [])
                emitOutputBuffer StdOut outbuf
                emitOutputBuffer StdErr errbuf
        return locked

-- | Only safe to call after taking the output lock.
dropOutputLock :: IO ()
dropOutputLock = withLock $ void . takeTMVar

-- | Use this around any actions that use `outputConcurrent`
-- or `createProcessConcurrent`
--
-- This is necessary to ensure that buffered concurrent output actually
-- gets displayed before the program exits.
withConcurrentOutput :: (MonadIO m, MonadMask m) => m a -> m a
withConcurrentOutput a = a `finally` liftIO flushConcurrentOutput

-- | Blocks until any processes started by `createProcessConcurrent` have
-- finished, and any buffered output is displayed. Also blocks while
-- `lockOutput` is is use.
--
-- `withConcurrentOutput` calls this at the end, so you do not normally
-- need to use this.
flushConcurrentOutput :: IO ()
flushConcurrentOutput = do
        atomically $ do
                r <- takeTMVar (outputThreads globalOutputHandle)
                if r <= 0
                        then putTMVar (outputThreads globalOutputHandle) r
                        else retry
        -- Take output lock to wait for anything else that might be
        -- currently generating output.
        lockOutput $ return ()

-- | Values that can be output.
class Outputable v where
        toOutput :: v -> T.Text

instance Outputable T.Text where
        toOutput = id

instance Outputable String where
        toOutput = toOutput . T.pack

-- | Displays a value to stdout.
--
-- No newline is appended to the value, so if you want a newline, be sure
-- to include it yourself.
--
-- Uses locking to ensure that the whole output occurs atomically
-- even when other threads are concurrently generating output.
--
-- When something else is writing to the console at the same time, this does
-- not block. It buffers the value, so it will be displayed once the other
-- writer is done.
outputConcurrent :: Outputable v => v -> IO ()
outputConcurrent = outputConcurrent' StdOut

-- | Like `outputConcurrent`, but displays to stderr.
--
-- (Does not throw an exception.)
errorConcurrent :: Outputable v => v -> IO ()
errorConcurrent = outputConcurrent' StdErr

outputConcurrent' :: Outputable v => StdHandle -> v -> IO ()
outputConcurrent' stdh v = bracket setup cleanup go
  where
        setup = tryTakeOutputLock
        cleanup False = return ()
        cleanup True = dropOutputLock
        go True = do
                T.hPutStr h (toOutput v)
                hFlush h
        go False = do
                oldbuf <- atomically $ takeTMVar bv
                newbuf <- addOutputBuffer (Output (toOutput v)) oldbuf
                atomically $ putTMVar bv newbuf
        h = toHandle stdh
        bv = bufferFor stdh

newtype ConcurrentProcessHandle = ConcurrentProcessHandle P.ProcessHandle

toConcurrentProcessHandle :: (Maybe Handle, Maybe Handle, Maybe Handle, P.ProcessHandle) -> (Maybe Handle, Maybe Handle, Maybe Handle, ConcurrentProcessHandle)
toConcurrentProcessHandle (i, o, e, h) = (i, o, e, ConcurrentProcessHandle h)

-- | Use this to wait for processes started with 
-- `createProcessConcurrent` and `createProcessForeground`, and get their
-- exit status.
--
-- Note that such processes are actually automatically waited for
-- internally, so not calling this explicitly will not result
-- in zombie processes. This behavior differs from `P.waitForProcess`
waitForProcessConcurrent :: ConcurrentProcessHandle -> IO ExitCode
waitForProcessConcurrent (ConcurrentProcessHandle h) =
        bracket lock unlock checkexit
  where
        lck = waitForProcessLock globalOutputHandle
        lock = atomically $ tryPutTMVar lck ()
        unlock True = atomically $ takeTMVar lck
        unlock False = return ()
        checkexit locked = maybe (waitsome locked) return
                =<< P.getProcessExitCode h
        waitsome True = do
                let v = processWaiters globalOutputHandle
                l <- atomically $ readTMVar v
                if null l
                        -- Avoid waitAny [] which blocks forever
                        then P.waitForProcess h
                        else do
                                -- Wait for any of the running
                                -- processes to exit. It may or may not
                                -- be the one corresponding to the
                                -- ProcessHandle. If it is,
                                -- getProcessExitCode will succeed.
                                void $ tryIO $ waitAny l
                                checkexit True
        waitsome False = do
                -- Another thread took the lck first. Wait for that thread to
                -- wait for one of the running processes to exit.
                atomically $ do
                        putTMVar lck ()
                        takeTMVar lck
                checkexit False

-- Registers an action that waits for a process to exit,
-- adding it to the processWaiters list, and removing it once the action
-- completes.
asyncProcessWaiter :: IO () -> IO ()
asyncProcessWaiter waitaction = do
        regdone <- newEmptyTMVarIO
        waiter <- async $ do
                self <- atomically (takeTMVar regdone)
                waitaction `finally` unregister self
        register waiter regdone
  where
        v = processWaiters globalOutputHandle
        register waiter regdone = atomically $ do
                l <- takeTMVar v
                putTMVar v (waiter:l)
                putTMVar regdone waiter
        unregister waiter = atomically $ do
                l <- takeTMVar v
                putTMVar v (filter (/= waiter) l)

-- | Wrapper around `System.Process.createProcess` that prevents 
-- multiple processes that are running concurrently from writing
-- to stdout/stderr at the same time.
--
-- If the process does not output to stdout or stderr, it's run
-- by createProcess entirely as usual. Only processes that can generate
-- output are handled specially:
--
-- A process is allowed to write to stdout and stderr in the usual
-- way, assuming it can successfully take the output lock.
--
-- When the output lock is held (ie, by another concurrent process,
-- or because `outputConcurrent` is being called at the same time),
-- the process is instead run with its stdout and stderr
-- redirected to a buffer. The buffered output will be displayed as soon
-- as the output lock becomes free.
--
-- Currently only available on Unix systems, not Windows.
#ifndef mingw32_HOST_OS
createProcessConcurrent :: P.CreateProcess -> IO (Maybe Handle, Maybe Handle, Maybe Handle, ConcurrentProcessHandle)
createProcessConcurrent p
        | willOutput (P.std_out p) || willOutput (P.std_err p) =
                ifM tryTakeOutputLock
                        ( fgProcess p
                        , bgProcess p
                        )
        | otherwise = do
                r@(_, _, _, h) <- P.createProcess p
                asyncProcessWaiter $
                        void $ tryIO $ P.waitForProcess h
                return (toConcurrentProcessHandle r)
#endif

-- | Wrapper around `System.Process.createProcess` that makes sure a process
-- is run in the foreground, with direct access to stdout and stderr.
-- Useful when eg, running an interactive process.
createProcessForeground :: P.CreateProcess -> IO (Maybe Handle, Maybe Handle, Maybe Handle, ConcurrentProcessHandle)
createProcessForeground p = do
        takeOutputLock
        fgProcess p

fgProcess :: P.CreateProcess -> IO (Maybe Handle, Maybe Handle, Maybe Handle, ConcurrentProcessHandle)
fgProcess p = do
        r@(_, _, _, h) <- P.createProcess p
                `onException` dropOutputLock
        registerOutputThread
        -- Wait for the process to exit and drop the lock.
        asyncProcessWaiter $ do
                void $ tryIO $ P.waitForProcess h
                unregisterOutputThread
                dropOutputLock
        return (toConcurrentProcessHandle r)

#ifndef mingw32_HOST_OS
bgProcess :: P.CreateProcess -> IO (Maybe Handle, Maybe Handle, Maybe Handle, ConcurrentProcessHandle)
bgProcess p = do
        (toouth, fromouth) <- pipe
        (toerrh, fromerrh) <- pipe
        let p' = p
                { P.std_out = rediroutput (P.std_out p) toouth
                , P.std_err = rediroutput (P.std_err p) toerrh
                }
        registerOutputThread
        r@(_, _, _, h) <- P.createProcess p'
                `onException` unregisterOutputThread
        asyncProcessWaiter $ void $ tryIO $ P.waitForProcess h
        outbuf <- setupOutputBuffer StdOut toouth (P.std_out p) fromouth
        errbuf <- setupOutputBuffer StdErr toerrh (P.std_err p) fromerrh
        void $ async $ bufferWriter [outbuf, errbuf]
        return (toConcurrentProcessHandle r)
  where
        pipe = do
                (from, to) <- createPipe
                (,) <$> fdToHandle to <*> fdToHandle from
        rediroutput ss h
                | willOutput ss = P.UseHandle h
                | otherwise = ss
#endif

willOutput :: P.StdStream -> Bool
willOutput P.Inherit = True
willOutput _ = False

-- | Buffered output.
data OutputBuffer = OutputBuffer [OutputBufferedActivity]
        deriving (Eq)

data StdHandle = StdOut | StdErr

toHandle :: StdHandle -> Handle
toHandle StdOut = stdout
toHandle StdErr = stderr

bufferFor :: StdHandle -> TMVar OutputBuffer
bufferFor StdOut = outputBuffer globalOutputHandle
bufferFor StdErr = errorBuffer globalOutputHandle

data OutputBufferedActivity
        = Output T.Text
        | InTempFile
                { tempFile :: FilePath
                , endsInNewLine :: Bool
                }
        deriving (Eq)

data AtEnd = AtEnd
        deriving Eq

data BufSig = BufSig

setupOutputBuffer :: StdHandle -> Handle -> P.StdStream -> Handle -> IO (StdHandle, MVar OutputBuffer, TMVar BufSig, TMVar AtEnd)
setupOutputBuffer h toh ss fromh = do
        hClose toh
        buf <- newMVar (OutputBuffer [])
        bufsig <- atomically newEmptyTMVar
        bufend <- atomically newEmptyTMVar
        void $ async $ outputDrainer ss fromh buf bufsig bufend
        return (h, buf, bufsig, bufend)

-- Drain output from the handle, and buffer it.
outputDrainer :: P.StdStream -> Handle -> MVar OutputBuffer -> TMVar BufSig -> TMVar AtEnd -> IO ()
outputDrainer ss fromh buf bufsig bufend
        | willOutput ss = go
        | otherwise = atend
  where
        go = do
                t <- T.hGetChunk fromh
                if T.null t
                        then atend
                        else do
                                modifyMVar_ buf $ addOutputBuffer (Output t)
                                changed
                                go
        atend = do
                atomically $ putTMVar bufend AtEnd
                hClose fromh
        changed = atomically $ do
                void $ tryTakeTMVar bufsig
                putTMVar bufsig BufSig

registerOutputThread :: IO ()
registerOutputThread = do
        let v = outputThreads globalOutputHandle
        atomically $ putTMVar v . succ =<< takeTMVar v

unregisterOutputThread :: IO ()
unregisterOutputThread = do
        let v = outputThreads globalOutputHandle
        atomically $ putTMVar v . pred =<< takeTMVar v

-- Wait to lock output, and once we can, display everything 
-- that's put into the buffers, until the end.
--
-- If end is reached before lock is taken, instead add the command's
-- buffers to the global outputBuffer and errorBuffer.
bufferWriter :: [(StdHandle, MVar OutputBuffer, TMVar BufSig, TMVar AtEnd)] -> IO ()
bufferWriter ts = do
        activitysig <- atomically newEmptyTMVar
        worker1 <- async $ lockOutput $
                ifM (atomically $ tryPutTMVar activitysig ())
                        ( void $ mapConcurrently displaybuf ts
                        , noop -- buffers already moved to global
                        )
        worker2 <- async $ void $ globalbuf activitysig worker1
        void $ async $ do
                void $ waitCatch worker1
                void $ waitCatch worker2
                unregisterOutputThread
  where
        displaybuf v@(outh, buf, bufsig, bufend) = do
                change <- atomically $
                        (Right <$> takeTMVar bufsig)
                                `orElse`
                        (Left <$> takeTMVar bufend)
                l <- takeMVar buf
                putMVar buf (OutputBuffer [])
                emitOutputBuffer outh l
                case change of
                        Right BufSig -> displaybuf v
                        Left AtEnd -> return ()
        globalbuf activitysig worker1 = do
                ok <- atomically $ do
                        -- signal we're going to handle it
                        -- (returns false if the displaybuf already did)
                        ok <- tryPutTMVar activitysig ()
                        -- wait for end of all buffers
                        when ok $
                                mapM_ (\(_outh, _buf, _bufsig, bufend) -> takeTMVar bufend) ts
                        return ok
                when ok $ do
                        -- add all of the command's buffered output to the
                        -- global output buffer, atomically
                        bs <- forM ts $ \(outh, buf, _bufsig, _bufend) ->
                                (outh,) <$> takeMVar buf
                        atomically $
                                forM_ bs $ \(outh, b) ->
                                        bufferOutputSTM' outh b
                        -- worker1 might be blocked waiting for the output
                        -- lock, and we've already done its job, so cancel it
                        cancel worker1

-- Adds a value to the OutputBuffer. When adding Output to a Handle,
-- it's cheaper to combine it with any already buffered Output to that
-- same Handle.
--
-- When the total buffered Output exceeds 1 mb in size, it's moved out of
-- memory, to a temp file. This should only happen rarely, but is done to
-- avoid some verbose process unexpectedly causing excessive memory use.
addOutputBuffer :: OutputBufferedActivity -> OutputBuffer -> IO OutputBuffer
addOutputBuffer (Output t) (OutputBuffer buf)
        | T.length t' <= 1048576 = return $ OutputBuffer (Output t' : other)
        | otherwise = do
                tmpdir <- getTemporaryDirectory
                (tmp, h) <- openTempFile tmpdir "output.tmp"
                let !endnl = endsNewLine t'
                let i = InTempFile
                        { tempFile = tmp
                        , endsInNewLine = endnl
                        }
                T.hPutStr h t'
                hClose h
                return $ OutputBuffer (i : other)
  where
        !t' = T.concat (mapMaybe getOutput this) <> t
        !(this, other) = partition isOutput buf
        isOutput v = case v of
                Output _ -> True
                _ -> False
        getOutput v = case v of
                Output t'' -> Just t''
                _ -> Nothing
addOutputBuffer v (OutputBuffer buf) = return $ OutputBuffer (v:buf)

-- | Adds a value to the output buffer for later display.
--
-- Note that buffering large quantities of data this way will keep it
-- resident in memory until it can be displayed. While `outputConcurrent`
-- uses temp files if the buffer gets too big, this STM function cannot do
-- so.
bufferOutputSTM :: Outputable v => StdHandle -> v -> STM ()
bufferOutputSTM h v = bufferOutputSTM' h (OutputBuffer [Output (toOutput v)])

bufferOutputSTM' :: StdHandle -> OutputBuffer -> STM ()
bufferOutputSTM' h (OutputBuffer newbuf) = do
        (OutputBuffer buf) <- takeTMVar bv
        putTMVar bv (OutputBuffer (newbuf ++ buf))
  where
        bv = bufferFor h

-- | A STM action that waits for some buffered output to become
-- available, and returns it.
--
-- The function can select a subset of output when only some is desired;
-- the fst part is returned and the snd is left in the buffer.
--
-- This will prevent it from being displayed in the usual way, so you'll
-- need to use `emitOutputBuffer` to display it yourself.
outputBufferWaiterSTM :: (OutputBuffer -> (OutputBuffer, OutputBuffer)) -> STM (StdHandle, OutputBuffer)
outputBufferWaiterSTM selector = waitgetbuf StdOut `orElse` waitgetbuf StdErr
  where
        waitgetbuf h = do
                let bv = bufferFor h
                (selected, rest) <- selector <$> takeTMVar bv
                when (selected == OutputBuffer [])
                        retry
                putTMVar bv rest
                return (h, selected)

waitAnyBuffer :: OutputBuffer -> (OutputBuffer, OutputBuffer)
waitAnyBuffer b = (b, OutputBuffer [])

-- | Use with `outputBufferWaiterSTM` to make it only return buffered
-- output that ends with a newline. Anything buffered without a newline
-- is left in the buffer.
waitCompleteLines :: OutputBuffer -> (OutputBuffer, OutputBuffer)
waitCompleteLines (OutputBuffer l) =
        let (selected, rest) = span completeline l
        in (OutputBuffer selected, OutputBuffer rest)
  where
        completeline (v@(InTempFile {})) = endsInNewLine v
        completeline (Output b) = endsNewLine b

endsNewLine :: T.Text -> Bool
endsNewLine t = not (T.null t) && T.last t == '\n'

-- | Emits the content of the OutputBuffer to the Handle
--
-- If you use this, you should use `lockOutput` to ensure you're the only
-- thread writing to the console.
emitOutputBuffer :: StdHandle -> OutputBuffer -> IO ()
emitOutputBuffer stdh (OutputBuffer l) =
        forM_ (reverse l) $ \ba -> case ba of
                Output t -> emit t
                InTempFile tmp _ -> do
                        emit =<< T.readFile tmp
                        void $ tryWhenExists $ removeFile tmp
  where
        outh = toHandle stdh
        emit t = void $ tryIO $ do
                T.hPutStr outh t
                hFlush outh