{-# OPTIONS -cpp #-}
-- Haskell Ports Library: Process management
--
-- Author : Manuel M T Chakravarty
-- Created: 14 May 2001
--
-- Version $Revision: 1.12 $ from $Date: 2004/05/14 07:20:52 $
--
-- Copyright (c) [2001..2003] Manuel M T Chakravarty
--
-- This file is free software; you can redistribute it and/or modify
-- it under the terms of the GNU General Public License as published by
-- the Free Software Foundation; either version 2 of the License, or
-- (at your option) any later version.
--
-- This file is distributed in the hope that it will be useful,
-- but WITHOUT ANY WARRANTY; without even the implied warranty of
-- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
-- GNU General Public License for more details.
--
--- DESCRIPTION ---------------------------------------------------------------
--
-- This module provides support for handling child processes and their input
-- and output streams.
--
--- DOCU ----------------------------------------------------------------------
--
-- language: Haskell 98 + HsLibs POSIX library + Concurrency + Exceptions
--
-- Process Proxies
-- ~~~~~~~~~~~~~~~
--
-- Any child process is associated with a Haskell thread called its *process
-- proxy*.  The process proxy implements the conversion between streams (ie,
-- lazy Haskell character lists) and the OS streams represented by the file
-- descriptors for stdin, stdout & stderr.  Moreover, the proxy acts as a
-- supervisor of the process and as such handles its termination etc.
-- Internally, the proxy may use a number of threads to implement the overall
-- proxy functionality.
--
-- I/O & Threads
-- ~~~~~~~~~~~~~
--
-- Currently, we rely on standard handle-based I/O to be thread friendly, ie,
-- to block only individual threads and not the whole Haskell runtime.  If
-- this turns out to impose noticeable overhead, we can always switch to
-- operating directly on POSIX file descriptors.
--
--- TODO ----------------------------------------------------------------------
--
-- * `String' for representing input and output streams may not be a good
--   choice in the presence of Unicode support; but Word8 seems to be a bit
--   unwieldy...
--
-- * possible error conditions have to be handled better
--
-- * Benchmarking: how much throughput can we get with a char-by-char copy;
--   how much better is reading in blocks and using `<==' to send; how much
--   better would a `Port String' be?
--
-- * An alternative design: instead of using streams for stdin, stdout, and
--   stderr, a process provides a port for each.  Piping one into another
--   would, then, be simple port linking.  Same for merging stderr streams.
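--
--- EXAMPLE --------------------------------------------------------------------
--
-- A minimal usage sketch of the exported interface, added for illustration
-- only.  The result of `listenToNewPort' (a port paired with the lazy stream
-- fed through it) is assumed from its use in `>|' below rather than taken
-- from the Ports documentation, and the invoked executable is made up.
--
-- > do (out, outStream) <- listenToNewPort undefined  -- stdout port + stream
-- >    (err, _)         <- listenToNewPort undefined  -- stderr port + stream
-- >    pid <- proc "ls" ["-l"] "" out err             -- "" = empty stdin
-- >    putStr outStream                               -- consume stdout lazily
-- >    print (procStatus pid)                         -- blocks until exit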
--
module Control.Concurrent.Processes (
  Proc, ProcId, toProc, proc, procGrp, (>|), procStatus, procKill, procGrpKill
) where

import Prelude   hiding (catch)
import Directory (setCurrentDirectory)
import IO        (Handle, BufferMode(..), hClose, hPutChar, hPutStr, hGetChar,
                  isEOFError, hSetBuffering)
import qualified IO (catch)
import Maybe     (isJust)
import Monad     (unless, when)
import System    (ExitCode(..))

import Control.Concurrent (ThreadId, forkIO, mergeIO, threadDelay)
import Control.Exception  (Exception(..), throw, throwIO, catch, finally)
import System.Posix       (ProcessID, FdOption(..), ProcessStatus(..), Signal,
                           stdInput, stdOutput, stdError, forkProcess,
                           executeFile, createPipe, fdToHandle, dupTo,
                           setFdOption, closeFd, getProcessStatus,
                           signalProcess, joinProcessGroup)

import Control.Concurrent.PortsConfig
import Control.Concurrent.SVars       (V, newV, valV, (<<))
import Control.Concurrent.Ports       (Port, newPort, openPort, closePort,
                                       listenToPort, listenToNewPort,
                                       (<--), (<==))
import Control.Concurrent.PortThreads (spawn_)

infixl 1 >|


-- processes & process creation
-- ----------------------------

-- a process consumes a stdin stream, writes its stdout and stderr to the
-- given ports, and yields a process id (EXPORTED)
--
type Proc = String          -- stdin
          -> Port Char      -- stdout
          -> Port Char      -- stderr
          -> IO ProcId

-- abstract process handle (EXPORTED ABSTRACTLY)
--
-- * the unit in `NoProcId' and the exit code in `ProcId' are implicit
--   synchronisation points; when they are available, the process has
--   completed (and won't touch any ports anymore)
--
data ProcId = NoProcId ()
            | ProcId   ProcessID [ThreadId] ExitCode
            | PairId   ProcId ProcId

-- turn a stream-processor function into a process (EXPORTED)
--
-- * the stream processor is executed asynchronously
--
toProc :: (String -> String) -> Proc
toProc f  = \stdin stdout _ -> do
              sync <- newV
              forkIODebug "toProc" $ (stdout <== f stdin) `finally` (sync << ())
              return $ NoProcId (valV sync)

-- |Execute a sub-process asynchronously.
--
-- * The sub-process reads stdin from a lazy list and writes stdout and stderr
--   to a port.  These two ports are closed when the stdout and stderr streams
--   are closed.
--
-- * The executable is searched for in the current PATH.
--
proc :: FilePath            -- name of executable
     -> [String]            -- arguments
     -> Proc
proc  = procGen False

-- |Execute a sub-process asynchronously in its own process group.
--
-- * The sub-process reads stdin from a lazy list and writes stdout and stderr
--   to a port.  These two ports are closed when the stdout and stderr streams
--   are closed.
--
-- * The executable is searched for in the current PATH.
--
-- * The process group id is the same as the new process's process id.
--
procGrp :: FilePath         -- name of executable
        -> [String]         -- arguments
        -> Proc
procGrp  = procGen True

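-- A sketch of how `toProc' and `>|' compose, added for illustration only:
-- `toUpper' comes from `Data.Char', which this module does not import, and
-- "grep" is just an example executable.
--
-- > shout :: Proc
-- > shout  = toProc (map toUpper)
-- >
-- > grepAndShout :: Proc
-- > grepAndShout  = proc "grep" ["foo"] >| shout
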
-- Execute a sub-process asynchronously (maybe in its own process group)
--
-- * The sub-process reads stdin from a lazy list and writes stdout and stderr
--   to a port.  These two ports are closed when the stdout and stderr streams
--   are closed.
--
-- * The executable is searched for in the current PATH.
--
procGen :: Bool             -- put process into new process group?
        -> FilePath         -- name of executable
        -> [String]         -- arguments
        -> Proc
procGen newGrp fname args  = \stdin stdout stderr -> do
    --
    -- fork child process with redirected standard I/O streams
    --
    logDebug $ "Processes.proc: Fork()ing `" ++ fname ++ "'"
    (pid, stdinWriteHandle, stdoutReadHandle, stderrReadHandle) <-
      forkExec fname args Nothing newGrp
    hSetBuffering stdinWriteHandle  NoBuffering
    hSetBuffering stdoutReadHandle  NoBuffering
    hSetBuffering stderrReadHandle  NoBuffering
    logDebug $ "Processes.proc: Fork()ed `" ++ fname ++ "' with pipes"
    --
    -- start the process proxies
    --
    logDebug $ "Processes.proc: Starting proxies for `" ++ fname ++ "'"
    stdinProxy  <- forkIODebug stdinDesc  $ writer stdinWriteHandle stdin
    stdoutProxy <- forkIODebug stdoutDesc $ reader stdoutReadHandle stdout True
--    stderrProxy <- forkIODebug stderrDesc $ reader stderrReadHandle stderr False
    stderrProxy <- forkIODebug stderrDesc $ reader stderrReadHandle stderr True
    --
    -- obtain the process status
    --
    vec <- newV
    forkIODebug ecDesc $ waitForExitCode pid vec
--    return $ ProcId pid [stdinProxy, stdoutProxy, stderrProxy] (valV vec)
-- FIXME: the thread ids aren't used yet and they are a potential risk given
--        the current handling of TCBs by the storage manager
    return $ ProcId pid [] (valV vec)
  where
    writer hdl str = do
      hPutStr hdl str
      logDebug $ "Processes: `" ++ fname ++ "': Writer finished"
      hClose hdl
    --
    reader hdl po close =
      let read = do
                   c <- hGetChar hdl
                   po <-- c
                   read
      in
      read `catch` \e -> do
        unless (isEOFException e) $ throwIO e
        logDebug $ "Processes: `" ++ fname ++ "': A reader finished"
        when close $ closePort po    -- FIXME: KLUDGE
        -- FIXME: For some strange reason, the stdout port is sometimes
        --        finalised too late; something must be hanging on to it.
        --        Having this explicit closePort is BAD, as it screws things
        --        up when the same port is used for stdout and stderr and in
        --        similar situations.
      where
        isEOFException (IOException ioe) = isEOFError ioe
        isEOFException _                 = False
    --
    -- FIXME: the following is awful and ugly, but doing it properly requires
    --        support for OS threads in the Haskell runtime
    waitForExitCode pid vec = do
      status <- getProcessStatus False False pid
                `IO.catch` \ioe -> do
                  logDebug $ "Processes: `" ++ fname ++ "': Ooops - child vanished"
                  return $ Just (Exited ExitSuccess)
      when (isJust status) $
        logDebug $ "Processes: `" ++ fname ++ "': Terminated"
      case status of
        Nothing               -> do
                                   threadDelay 10000    -- wait 10ms
                                   waitForExitCode pid vec
        Just (Exited ec     ) -> vec << ec
        Just (Terminated sig) -> vec << ExitFailure (128 + fromIntegral sig)
        Just _                -> -- can't happen, as second argument = `False'
                                 error "Processes.proc: Stopped?"
    --
    stdinDesc  = "`" ++ fname ++ "' stdin writer"
    stdoutDesc = "`" ++ fname ++ "' stdout reader"
    stderrDesc = "`" ++ fname ++ "' stderr reader"
    ecDesc     = "`" ++ fname ++ "' exit code poller"


-- process combinators
-- -------------------

-- pipes the output of one process into another; stderr is merged (EXPORTED)
--
(>|) :: Proc -> Proc -> Proc
p1 >| p2  = \stdin stdout stderr -> do
              (pipeIn, pipeOut) <- listenToNewPort undefined
              openPort stderr        -- to account for the double usage
              pid1 <- p1 stdin pipeIn stderr
              pid2 <- p2 pipeOut stdout stderr
              return $ PairId pid1 pid2

{- This is problematic as p2 is fork()ed in a different thread and
   System.Posix.forkProcess has a problem with fork()ing in threads other
   than the main thread.
p1 >| p2  = \stdin stdout stderr -> do
              pid2   <- newV
              pipeIn <- spawn_ $ \pipeOut ->
                          p2 pipeOut stdout stderr >>= (pid2 <<)
              pid1   <- p1 stdin pipeIn stderr
              return $ PairId pid1 (valV pid2)
 -}


-- process operators
-- -----------------

-- |Returns the exit status of the given process (group).
--
-- * Blocks until the value is available.
--
-- * In case of a `NoProcId', the availability of the unit argument implements
--   a synchronisation point.
--
-- * In case of a process group, the status is `ExitSuccess' iff the exit
--   status of all processes is `ExitSuccess'; otherwise, it is the failure
--   status of one of the failed processes (which one is picked is, of course,
--   deterministic).
--
procStatus :: ProcId -> ExitCode
procStatus (NoProcId () )  = ExitSuccess    -- performs synchronisation
procStatus (ProcId _ _ ec) = ec
procStatus (PairId p1 p2 ) = case procStatus p1 of
                               ExitSuccess        -> procStatus p2
                               ec@(ExitFailure _) -> ec

-- |Signal a process.
--
procKill :: Signal -> ProcId -> IO ()
procKill sig (NoProcId _    ) = return ()
procKill sig (ProcId pid _ _) = signalProcess sig pid
procKill sig (PairId p1 p2  ) = procKill sig p1 >> procKill sig p2

-- |Signal a process group.
--
procGrpKill :: Signal -> ProcId -> IO ()
procGrpKill sig (NoProcId _    ) = return ()
procGrpKill sig (ProcId pid _ _) = signalProcess sig (-pid)
procGrpKill sig (PairId p1 p2  ) = procGrpKill sig p1 >> procGrpKill sig p2

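-- A sketch of shutting down a pipeline started with `procGrp', added for
-- illustration only; `sigTERM' comes from System.Posix and is not in this
-- module's import list.
--
-- > shutdown :: ProcId -> IO ()
-- > shutdown pid = do
-- >   procGrpKill sigTERM pid     -- signal every process in the group(s)
-- >   print (procStatus pid)      -- blocks until the processes have exited
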
-- auxiliary routines
-- ------------------

-- Fork a child process running the given executable on the given arguments
-- and with the given environment
--
-- * The standard input, output, and error streams of the child process are
--   redirected to three Haskell file handles
--
-- * The structure of this routine was inspired by David Sankel's
--   `shell-haskell' code
--
forkExec :: FilePath                    -- path of executable
         -> [String]                    -- command line arguments
         -> Maybe [(String, String)]    -- environment
         -> Bool                        -- start a new process group?
         -> IO (ProcessID,              -- child pid
                Handle, Handle, Handle) -- stdin, stdout, and stderr pipe hdls
forkExec path args env newGrp = do
    (stdinReadFD , stdinWriteFD ) <- createPipe
    (stdoutReadFD, stdoutWriteFD) <- createPipe
    (stderrReadFD, stderrWriteFD) <- createPipe
    let child = do
          when newGrp $ joinProcessGroup 0    -- 0 means that grp id = pid
          dupTo stdinReadFD   stdInput
          dupTo stdoutWriteFD stdOutput
          dupTo stderrWriteFD stdError
          mapM_ closeFd [stdinReadFD , stdinWriteFD ,
                         stdoutReadFD, stdoutWriteFD,
                         stderrReadFD, stderrWriteFD]
          executeFile path True args env
          error "Processes.forkExec: executeFile: failed"
    let parent pid = do
          mapM_ closeFd [stdinReadFD, stdoutWriteFD, stderrWriteFD]
          mapM_ (\fd -> setFdOption fd CloseOnExec True)
                [stdinWriteFD, stdoutReadFD, stderrReadFD]
          stdinWriteHandle <- fdToHandle stdinWriteFD
          stdoutReadHandle <- fdToHandle stdoutReadFD
          stderrReadHandle <- fdToHandle stderrReadFD
          return (pid, stdinWriteHandle, stdoutReadHandle, stderrReadHandle)
#if __GLASGOW_HASKELL__ >= 601
    pid <- forkProcess child            -- fork child
    parent pid                          -- and run parent code
#else
    pid <- forkProcess
    case pid of
      Just pid -> parent pid
      Nothing  -> child
#endif


-- auxiliary routines for debugging
-- --------------------------------

-- a variant of `forkIO' that installs an exception sentry if compiled with
-- debugging enabled
--
forkIODebug :: String -> IO () -> IO ThreadId
forkIODebug desc com = forkIO $ excSentry desc com

-- report exceptions caught by the given computation
--
-- * only in effect when compiled with debugging enabled
--
excSentry :: String      -- thread descriptor
          -> IO ()       -- computation to watch
          -> IO ()
{-# INLINE excSentry #-}
excSentry desc com
  | not debug = com
  | otherwise = com `catch` logExc
  where
    logExc exc = do
      putStrLn $ "Processes: Thread \"" ++ desc ++ "\": " ++ show exc
      throw exc

-- emit debugging message
--
logDebug :: String -> IO ()
{-# INLINE logDebug #-}
logDebug msg
  | not debug = return ()
  | otherwise = putStrLn msg
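
-- A usage sketch for the internal `forkExec' helper, added for illustration
-- only; the executable and its arguments are made up, and `hGetContents'
-- (from System.IO) is not imported by this module.
--
-- > do (pid, toChild, fromChild, _) <- forkExec "tr" ["a-z", "A-Z"] Nothing False
-- >    hPutStr toChild "hello, world\n"
-- >    hClose toChild
-- >    hGetContents fromChild >>= putStr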