module System.Process.Read.Chunks (
NonBlocking(..),
Output(..),
foldOutput,
foldOutputsL,
foldOutputsR,
readProcessChunks,
readProcessChunks'
) where
import Control.Applicative ((<$>))
import Control.Concurrent (forkIO, threadDelay, MVar, newEmptyMVar, putMVar, takeMVar)
import Control.DeepSeq (NFData)
import Control.Exception as E (onException, catch, mask, try, throwIO, SomeException)
import Control.Monad (unless)
import qualified Data.ByteString as B
import qualified Data.ByteString.Lazy as L
import Data.ListLike (ListLike(..), ListLikeIO(..))
import Data.Word (Word8)
import qualified GHC.IO.Exception as E
import GHC.IO.Exception (IOErrorType(ResourceVanished), IOException(ioe_type))
import Prelude hiding (null, length, rem)
import qualified Prelude
import System.Exit (ExitCode)
import System.IO hiding (hPutStr, hGetContents)
import System.IO.Error (mkIOError)
import System.Process.Read (ListLikePlus(..))
import System.IO.Unsafe (unsafeInterleaveIO)
import System.Process (CreateProcess(..), StdStream(CreatePipe), ProcessHandle,
createProcess, waitForProcess, terminateProcess)
-- | 'NFData' instance for 'ExitCode', declared without any explicit
-- methods so whatever defaults the class supplies are used.
--
-- NOTE(review): this is an orphan instance (neither the class nor the type
-- is defined in this module); newer @deepseq@ releases may already ship an
-- instance for 'ExitCode', which would make this a duplicate - confirm
-- against the dependency versions in use.
instance NFData ExitCode
-- | List-like types that can be read from a 'Handle' in pieces.  The two
-- methods are what the readers in this module need in addition to the
-- 'ListLikePlus' operations.
class ListLikePlus a c => NonBlocking a c where
-- | Read from the handle, given a requested size.  In this module it is
-- always called with 'bufSize', and a zero-length result is treated as end
-- of stream.
hGetSome :: Handle -> LengthType a -> IO a
-- | Split a value into a list of pieces of the same type; each piece is
-- delivered to the caller as a separate 'Stdout' or 'Stderr' element.
toChunks :: a -> [a]
-- | One element of the stream produced by running a process:
--
-- * 'Stdout' - a chunk read from the process's standard output
-- * 'Stderr' - a chunk read from the process's standard error
-- * 'Result' - the exit code obtained from @waitForProcess@
-- * 'Exception' - an 'IOError'; inside this module values whose error type
--   is EOF are used by @elements'@ as end-of-stream markers and are not
--   passed on to the caller
data Output a = Stdout a | Stderr a | Result ExitCode | Exception IOError deriving (Eq, Show)
-- | Case analysis on a single 'Output' value: exactly one of the four
-- supplied functions is applied, chosen by the constructor.
foldOutput :: ListLikePlus a c =>
              (ExitCode -> b)  -- ^ used for 'Result'
           -> (a -> b)         -- ^ used for 'Stdout'
           -> (a -> b)         -- ^ used for 'Stderr'
           -> (IOError -> b)   -- ^ used for 'Exception'
           -> Output a
           -> b
foldOutput codefn outfn errfn exnfn output =
  case output of
    Result code   -> codefn code
    Stdout out    -> outfn out
    Stderr err    -> errfn err
    Exception exn -> exnfn exn
-- | Left fold over a list of 'Output' values.  Starting from the seed, each
-- element is combined with the accumulator by the function that matches its
-- constructor (see 'foldOutput'), front of the list first.
--
-- The accumulator is not forced between steps, so this is a lazy left fold.
foldOutputsL :: ListLikePlus a c =>
                (b -> ExitCode -> b)  -- ^ step for 'Result'
             -> (b -> a -> b)         -- ^ step for 'Stdout'
             -> (b -> a -> b)         -- ^ step for 'Stderr'
             -> (b -> IOError -> b)   -- ^ step for 'Exception'
             -> b                     -- ^ initial accumulator
             -> [Output a]
             -> b
foldOutputsL codefn outfn errfn exnfn result outputs =
  Prelude.foldl
    (\ acc output -> foldOutput (codefn acc) (outfn acc) (errfn acc) (exnfn acc) output)
    result
    outputs
-- | Right fold over a list of 'Output' values.  The seed is combined with
-- the last element first; the head of the list is combined last, using the
-- function that matches each element's constructor (see 'foldOutput').
foldOutputsR :: forall a b c. ListLikePlus a c =>
                (b -> ExitCode -> b)  -- ^ step for 'Result'
             -> (b -> a -> b)         -- ^ step for 'Stdout'
             -> (b -> a -> b)         -- ^ step for 'Stderr'
             -> (b -> IOError -> b)   -- ^ step for 'Exception'
             -> b                     -- ^ seed, returned unchanged for @[]@
             -> [Output a]
             -> b
foldOutputsR codefn outfn errfn exnfn result outputs =
  Prelude.foldr
    (\ output acc -> foldOutput (codefn acc) (outfn acc) (errfn acc) (exnfn acc) output)
    result
    outputs
-- | Run a process with stdin, stdout and stderr all connected to pipes,
-- feed it @input@, and return its output as a list of 'Output' values
-- produced by 'elements' (chunks followed by the exit code).
--
-- Set-up runs with asynchronous exceptions masked; the reading and writing
-- phase runs under @restore@.  If that phase throws, all three handles are
-- closed and the process is terminated and waited for before the exception
-- propagates.
--
-- A broken pipe ('ResourceVanished') while writing the input or while
-- closing stdin is ignored: the child simply stopped reading.  The write
-- and the close are guarded separately so that a failed write can no longer
-- skip the @hClose inh@ and leak the handle.
--
-- NOTE(review): @binary@ is a 'ListLikePlus' method whose definition is not
-- visible here; presumably it adjusts the handles' binary mode for the
-- input type - confirm.
readProcessChunks :: (NonBlocking a c) =>
                     CreateProcess
                  -> a
                  -> IO [Output a]
readProcessChunks p input = mask $ \ restore -> do
  (Just inh, Just outh, Just errh, pid) <-
    createProcess (p { std_in = CreatePipe, std_out = CreatePipe, std_err = CreatePipe })
  binary input [inh, outh, errh]
  let cleanup = do
        hClose inh
        hClose outh
        hClose errh
        terminateProcess pid
        waitForProcess pid
  flip onException cleanup $ restore $ do
    -- Start collecting output before writing, so the child cannot block on
    -- a full output pipe while we are still feeding it input.
    waitOut <- forkWait $ elements pid (Just outh) (Just errh)
    ignoreBrokenPipe $ unless (null input) (hPutStr inh input >> hFlush inh)
    ignoreBrokenPipe $ hClose inh
    waitOut
  where
    -- Run an action, discarding only 'ResourceVanished' errors.
    ignoreBrokenPipe :: IO () -> IO ()
    ignoreBrokenPipe action = action `E.catch` resourceVanished (\ _e -> return ())
-- | Produce the output list for a process whose stdout/stderr are read by
-- polling.  A handle argument of 'Nothing' means that stream is finished.
--
-- Once both streams are finished the process is waited for and its exit
-- code becomes the final 'Result' element.  Otherwise one batch of chunks
-- is fetched with 'ready' and the remainder of the list is deferred with
-- 'unsafeInterleaveIO', so further reads happen only as the list is
-- consumed.
elements :: NonBlocking a c => ProcessHandle -> Maybe Handle -> Maybe Handle -> IO [Output a]
elements pid Nothing Nothing = do
  code <- waitForProcess pid
  return [Result code]
elements pid outh errh = do
  (outh', errh', batch) <- ready uSecs outh errh
  rest <- unsafeInterleaveIO (elements pid outh' errh')
  return (batch ++ rest)
-- | Result of polling a handle with @hReady'@: 'Ready' when @hReady@
-- reports input available, 'Unready' when it reports none, and 'Closed'
-- when the handle is already closed or @hReady@ raised an EOF error.
data Readyness = Ready | Unready | Closed
-- | Poll a handle without blocking and classify the result as a
-- 'Readyness'.  A handle that is already closed, or for which 'hReady'
-- raises an EOF error, is reported as 'Closed'; any other 'IOError' from
-- 'hReady' propagates.
hReady' :: Handle -> IO Readyness
hReady' h = do
  alreadyClosed <- hIsClosed h
  if alreadyClosed
    then return Closed
    else probe `E.catch` endOfFile (\ _ -> return Closed)
  where
    probe = do
      flag <- hReady h
      return (if flag then Ready else Unready)
-- | Poll stdout and stderr until at least one of them yields data or both
-- are finished.
--
-- Arguments: the current back-off delay in microseconds, then the stdout
-- and stderr handles ('Nothing' = already finished).  The result holds the
-- handles to use for the next call and the chunks read this time (stderr
-- chunks first).
--
-- * Both streams closed: the handles are closed and @(Nothing, Nothing, [])@
--   is returned.  'hReady'' reports 'Closed' for a handle that merely hit
--   EOF, so without the explicit 'hClose' such handles were never released.
--   Closing an already closed handle has no effect.
-- * At least one stream ready: read from whichever is ready via 'nextOut'.
-- * Otherwise (nothing ready, at least one stream still open): sleep, then
--   retry with the delay doubled, capped at 'maxUSecs'.  This also covers
--   one stream being closed while the other is merely quiet; previously
--   that combination returned at once with no data, making the caller spin.
ready :: (NonBlocking a c) =>
         Int -> Maybe Handle -> Maybe Handle
      -> IO (Maybe Handle, Maybe Handle, [Output a])
ready waitUSecs outh errh = do
  outReady <- maybe (return Closed) hReady' outh
  errReady <- maybe (return Closed) hReady' errh
  case (outReady, errReady) of
    (Closed, Closed) -> do
      closeIfPresent outh
      closeIfPresent errh
      return (Nothing, Nothing, [])
    _ | isReady outReady || isReady errReady -> do
          (errh', out1) <- nextOut errh errReady Stderr
          (outh', out2) <- nextOut outh outReady Stdout
          return (outh', errh', out1 ++ out2)
      | otherwise -> do
          threadDelay waitUSecs
          ready (min maxUSecs (2 * waitUSecs)) outh errh
  where
    -- 'Readyness' has no 'Eq' instance, so test by pattern.
    isReady :: Readyness -> Bool
    isReady Ready = True
    isReady _ = False

    closeIfPresent :: Maybe Handle -> IO ()
    closeIfPresent mh = maybe (return ()) hClose mh
-- | Try to read one chunk from a stream.
--
-- Only a present handle whose readiness is 'Ready' is read: up to
-- 'bufSize' elements are requested with 'hGetSome'.  A zero-length read is
-- taken as end of stream: the handle is closed and 'Nothing' is returned in
-- its place.  A non-empty read is wrapped with the given constructor.  In
-- every other situation the handle is handed back untouched with no output.
nextOut :: NonBlocking a c => Maybe Handle -> Readyness -> (a -> Output a) -> IO (Maybe Handle, [Output a])
nextOut mh readiness constructor =
  case (mh, readiness) of
    (Just h, Ready) -> do
      chunk <- hGetSome h (fromIntegral bufSize)
      if length chunk == 0
        then do
          hClose h
          return (Nothing, [])
        else return (Just h, [constructor chunk])
    _ -> return (mh, [])
-- | Exception-handler combinator: pass an 'IOError' to the supplied handler
-- when its error type is EOF, and rethrow it with 'ioError' otherwise.
endOfFile :: (IOError -> IO a) -> IOError -> IO a
endOfFile eeof e
  | E.ioe_type e == E.EOF = eeof e
  | otherwise = ioError e
-- | Size passed to 'hGetSome' for each read performed by 'nextOut'.
bufSize :: Int
bufSize = 4096
-- | Initial polling delay handed to 'ready' by 'elements'; it is used as
-- the argument of 'threadDelay', i.e. it is in microseconds.
uSecs :: Int
uSecs = 8
-- | Upper bound on the polling delay: 'ready' doubles its delay on each
-- retry but never beyond this value (microseconds, so 0.1 seconds).
maxUSecs :: Int
maxUSecs = 100000
-- | Variant of 'readProcessChunks' that gathers output with 'elements''
-- instead of the polling loop in 'elements'.
--
-- The process is started with stdin, stdout and stderr connected to pipes,
-- @input@ is written to its stdin, and the list built by 'elements'' is
-- returned.  Set-up runs with asynchronous exceptions masked; the reading
-- and writing phase runs under @restore@.  If that phase throws, all three
-- handles are closed and the process is terminated and waited for before
-- the exception propagates.
--
-- A broken pipe ('ResourceVanished') while writing the input or while
-- closing stdin is ignored.  The write and the close are guarded separately
-- so that a failed write can no longer skip the @hClose inh@ and leak the
-- handle.
--
-- NOTE(review): @binary@ is a 'ListLikePlus' method whose definition is not
-- visible here; presumably it adjusts the handles' binary mode for the
-- input type - confirm.
readProcessChunks' :: (NonBlocking a c) =>
                      CreateProcess
                   -> a
                   -> IO [Output a]
readProcessChunks' p input = mask $ \ restore -> do
  (Just inh, Just outh, Just errh, pid) <-
    createProcess (p { std_in = CreatePipe, std_out = CreatePipe, std_err = CreatePipe })
  binary input [inh, outh, errh]
  let cleanup = do
        hClose inh
        hClose outh
        hClose errh
        terminateProcess pid
        waitForProcess pid
  flip onException cleanup $ restore $ do
    -- Start collecting output before writing, so the child cannot block on
    -- a full output pipe while we are still feeding it input.
    waitOut <- forkWait $ elements' pid outh errh
    ignoreBrokenPipe $ unless (null input) (hPutStr inh input >> hFlush inh)
    ignoreBrokenPipe $ hClose inh
    waitOut
  where
    -- Run an action, discarding only 'ResourceVanished' errors.
    ignoreBrokenPipe :: IO () -> IO ()
    ignoreBrokenPipe action = action `E.catch` resourceVanished (\ _e -> return ())
-- | Collect a process's output using one reader thread per stream.
--
-- Each reader obtains its stream with 'hGetContents', splits it with
-- 'toChunks', and posts every chunk to a shared 'MVar' tagged 'Stdout' or
-- 'Stderr'.  When a reader is done it closes its handle and posts an
-- EOF-typed 'Exception' as an end-of-stream marker.  The consumer emits the
-- posted elements lazily (via 'unsafeInterleaveIO'), swallows the markers,
-- and after seeing two of them waits for the process and ends the list
-- with its 'Result'.
--
-- An 'IOError' raised while reading or closing is caught in the reader and
-- forwarded to the caller as an 'Exception' element, and the end-of-stream
-- marker is posted regardless.  Previously such an error killed the reader
-- before the marker was posted, so the consumer waited forever for a
-- second marker and the error itself was lost.
elements' :: forall a c. NonBlocking a c => ProcessHandle -> Handle -> Handle -> IO [Output a]
elements' pid outh errh = do
  res <- newEmptyMVar
  _outtid <- forkIO $ reader res outh Stdout
  _errtid <- forkIO $ reader res errh Stderr
  takeChunks 0 res
  where
    -- Body of a reader thread for one stream.
    reader :: MVar (Output a) -> Handle -> (a -> Output a) -> IO ()
    reader res h constructor = do
      readResult <- try (hGetContents h >>= mapM_ (putMVar res . constructor) . toChunks)
      -- Always attempt the close, even after a failed read.
      closeResult <- try (hClose h)
      either (report res) return readResult
      either (report res) return closeResult
      putMVar res eofMarker

    -- Forward a genuine failure.  EOF-typed errors are not forwarded
    -- because the consumer would count them as end-of-stream markers.
    report :: MVar (Output a) -> IOError -> IO ()
    report res e = unless (isEofMarker e) (putMVar res (Exception e))

    eofMarker :: Output a
    eofMarker = Exception (mkIOError E.EOF "EOF" Nothing Nothing)

    isEofMarker :: IOError -> Bool
    isEofMarker e = E.ioe_type e == E.EOF

    -- The 'Int' counts the end-of-stream markers seen so far.
    takeChunks :: Int -> MVar (Output a) -> IO [Output a]
    takeChunks 2 _ = waitForProcess pid >>= \ result -> return [Result result]
    takeChunks closedCount res = takeMVar res >>= takeChunk closedCount res

    takeChunk :: Int -> MVar (Output a) -> Output a -> IO [Output a]
    takeChunk closedCount res (Exception e)
      | isEofMarker e = takeChunks (closedCount + 1) res
    takeChunk closedCount res x = do
      xs <- unsafeInterleaveIO $ takeChunks closedCount res
      return (x : xs)
-- | Start an action in a new thread and return an action that waits for
-- it.  The waiting action yields the forked action's result, or rethrows
-- whatever exception the forked action raised.
--
-- The thread is forked with asynchronous exceptions masked and the action
-- itself runs with the caller's masking state restored, so the outcome is
-- delivered to the 'MVar' whether the action returns or throws.
forkWait :: IO a -> IO (IO a)
forkWait a = do
  outcomeVar <- newEmptyMVar
  _ <- mask $ \ unmask -> forkIO (try (unmask a) >>= putMVar outcomeVar)
  return $ do
    outcome <- takeMVar outcomeVar
    case outcome of
      Left ex -> throwIO (ex :: SomeException)
      Right value -> return value
-- | Exception-handler combinator: pass an 'IOError' to the supplied handler
-- when its error type is 'ResourceVanished' (e.g. writing to a pipe whose
-- reader has gone away), and rethrow it with 'ioError' otherwise.
resourceVanished :: (IOError -> IO a) -> IOError -> IO a
resourceVanished epipe e
  | ioe_type e == ResourceVanished = epipe e
  | otherwise = ioError e
-- | Strict 'B.ByteString': reads go straight to 'B.hGetSome', and a value
-- is always a single chunk (a one-element list, even when it is empty).
instance NonBlocking B.ByteString Word8 where
  hGetSome h n = B.hGetSome h n
  toChunks bytes = [bytes]
-- | Lazy 'L.ByteString': a read fetches one strict chunk with 'B.hGetSome'
-- and wraps it as a single-chunk lazy string; 'toChunks' yields one
-- single-chunk lazy string per underlying strict chunk.
instance NonBlocking L.ByteString Word8 where
  hGetSome h n = do
    strict <- B.hGetSome h (fromIntegral n)
    return (L.fromChunks [strict])
  toChunks whole = [L.fromChunks [strict] | strict <- L.toChunks whole]