{-# LANGUAGE FlexibleContexts, MultiParamTypeClasses, NamedFieldPuns,
RankNTypes, RecordWildCards #-}
module Streaming.Process
(
withStreamingProcess
, withStreamingCommand
, streamInput
, streamInputCommand
, withStreamingOutput
, withStreamingOutputCommand
, StreamProcess(..)
, switchOutputs
, WithStream(..)
, WithStream'
, SupplyStream(..)
, withStreamProcess
, withStreamCommand
, withProcessHandles
, processInput
, withProcessOutput
, StdOutErr
, withStreamOutputs
, module Data.Streaming.Process
, concurrently
) where
import Data.ByteString.Streaming (ByteString)
import qualified Data.ByteString.Streaming as SB
import Streaming (hoist)
import Streaming.Concurrent (unbounded, withMergedStreams)
import qualified Streaming.Prelude as S
import Control.Concurrent.Async.Lifted (concurrently)
import Control.Monad.Base (MonadBase)
import Control.Monad.Catch (MonadMask, finally, onException, throwM)
import Control.Monad.IO.Class (MonadIO, liftIO)
import Control.Monad.Trans.Control (MonadBaseControl)
import Data.Streaming.Process
import Data.Streaming.Process.Internal (InputSource(..), OutputSink(..))
import System.Exit (ExitCode(..))
import System.IO (hClose)
import System.Process (CreateProcess(..),
StdStream(CreatePipe), shell)
-- | Run a process, feeding it the supplied input stream while a
--   continuation consumes its @stdout@ and @stderr@.
--
--   This is 'withStreamProcess' with 'withProcessHandles' as the action
--   run against the process's streams, so the exit-code handling of
--   'withStreamProcess' applies.
withStreamingProcess :: (MonadBaseControl IO m, MonadIO m, MonadMask m
                        , MonadBase IO n)
                        => CreateProcess -> ByteString m v
                        -> (StdOutErr n () -> m r) -> m r
withStreamingProcess cp inp useOutputs =
  withStreamProcess cp $ \sp -> withProcessHandles inp sp useOutputs
-- | As 'withStreamingProcess', but the process is described by a shell
--   command line (turned into a 'CreateProcess' with 'shell').
withStreamingCommand :: (MonadBaseControl IO m, MonadIO m, MonadMask m
                        , MonadBase IO n)
                        => String -> ByteString m v
                        -> (StdOutErr n () -> m r) -> m r
withStreamingCommand cmd = withStreamingProcess (shell cmd)
-- | Run a process whose only connected stream is @stdin@, writing the
--   given stream into it.
--
--   The result is the return value of the input stream.  Process
--   start-up, waiting and exit-code checking are done by
--   'withStreamProcess'.
streamInput :: (MonadIO m, MonadMask m) => CreateProcess
               -> ByteString m r -> m r
streamInput cp inp = withStreamProcess cp (\sp -> processInput sp inp)
-- | As 'streamInput', but the process is described by a shell command
--   line (turned into a 'CreateProcess' with 'shell').
streamInputCommand :: (MonadIO m, MonadMask m) => String
                      -> ByteString m r -> m r
streamInputCommand cmd = streamInput (shell cmd)
-- | Run a process and hand its @stdout@ to a continuation as a streaming
--   'ByteString'.
--
--   Only @stdout@ is connected; see 'withProcessOutput' for the stream
--   configuration and 'withStreamProcess' for process lifetime handling.
withStreamingOutput :: (MonadIO n, MonadIO m, MonadMask m)
                       => CreateProcess
                       -> (ByteString n () -> m r) -> m r
withStreamingOutput cp useOutput =
  withStreamProcess cp (\sp -> withProcessOutput sp useOutput)
-- | As 'withStreamingOutput', but the process is described by a shell
--   command line (turned into a 'CreateProcess' with 'shell').
withStreamingOutputCommand :: (MonadIO n, MonadIO m, MonadMask m)
                              => String
                              -> (ByteString n () -> m r) -> m r
withStreamingOutputCommand cmd = withStreamingOutput (shell cmd)
-- | Feed an input stream to a process's @stdin@ while, concurrently,
--   passing its @stdout@ and @stderr@ to a continuation.
--
--   Both halves are run with 'concurrently'; the value returned is that
--   of the output continuation, and the result of the input stream is
--   discarded.
withProcessHandles :: (MonadBaseControl IO m, MonadIO m, MonadMask m, MonadBase IO n)
                      => ByteString m v
                      -> StreamProcess (SupplyStream m)
                                       (WithStream' m)
                                       (WithStream' m)
                      -> (StdOutErr n () -> m r) -> m r
withProcessHandles inp sp@StreamProcess{toStdin} f =
  fmap snd (concurrently feedStdin consumeOutputs)
  where
    -- Writes the caller's stream into the process.
    feedStdin = supplyStream toStdin inp

    -- Runs the caller's continuation over the merged outputs.
    consumeOutputs = withStreamOutputs sp f
-- | Write a stream to the @stdin@ of a process whose @stdout@ and
--   @stderr@ are both 'ClosedStream', returning the stream's result.
processInput :: (MonadIO m, MonadMask m)
                => StreamProcess (SupplyStream m) ClosedStream ClosedStream
                -> ByteString m r -> m r
processInput sp inp = supplyStream (toStdin sp) inp
-- | Pass the @stdout@ of a process to a continuation; the process's
--   @stdin@ and @stderr@ are both 'ClosedStream'.
withProcessOutput :: (MonadIO n, MonadIO m, MonadMask m)
                     => StreamProcess ClosedStream (WithStream n m) ClosedStream
                     -> (ByteString n () -> m r) -> m r
withProcessOutput sp useOutput = withStream (fromStdout sp) useOutput
-- | The three standard streams of a process, bundled into one value.
--
--   Each type parameter is the representation chosen for the
--   corresponding stream (for example 'SupplyStream', 'WithStream' or
--   'ClosedStream').  All fields are strict.
data StreamProcess stdin stdout stderr = StreamProcess
  { toStdin    :: !stdin   -- ^ How to provide the process's @stdin@.
  , fromStdout :: !stdout  -- ^ How to obtain the process's @stdout@.
  , fromStderr :: !stderr  -- ^ How to obtain the process's @stderr@.
  } deriving (Eq, Show)
-- | Exchange the @stdout@ and @stderr@ components of a 'StreamProcess',
--   leaving @stdin@ untouched.
--
--   The result's type parameters are swapped accordingly.
switchOutputs :: StreamProcess stdin stdout stderr
                 -> StreamProcess stdin stderr stdout
switchOutputs (StreamProcess inp out err) = StreamProcess inp err out
-- | Start a process, run an action against its three streams, then wait
--   for the process to exit.
--
--   * If the action throws, the process is terminated and the exception
--     is re-thrown.
--
--   * If the process exits with 'ExitFailure', a
--     'ProcessExitedUnsuccessfully' is thrown; otherwise the action's
--     result is returned.
--
--   * 'closeStreamingProcessHandle' is run on every exit path, including
--     when the action itself throws, so the process handle is not left
--     open after a failing action.
withStreamProcess :: (InputSource stdin, OutputSink stdout, OutputSink stderr
                     , MonadIO m, MonadMask m)
                     => CreateProcess
                     -> (StreamProcess stdin stdout stderr -> m r) -> m r
withStreamProcess cp f = do
  (inp, out, err, sph) <- streamingProcess cp
  let -- Run the caller's action and then wait for the process.  A failing
      -- action kills the process before the exception propagates.
      runToExit = do
        r  <- f (StreamProcess inp out err)
                `onException` terminateStreamingProcess sph
        ec <- waitForStreamingProcess sph
        return (r, ec)
  -- One 'finally' covers both the action and the wait, so the handle is
  -- closed whether we leave normally or via an exception.
  (r, ec) <- runToExit `finally` closeStreamingProcessHandle sph
  case ec of
    ExitSuccess   -> return r
    ExitFailure _ -> throwM (ProcessExitedUnsuccessfully cp ec)
-- | As 'withStreamProcess', but the process is described by a shell
--   command line (turned into a 'CreateProcess' with 'shell').
withStreamCommand :: (InputSource stdin, OutputSink stdout, OutputSink stderr
                     , MonadIO m, MonadMask m)
                     => String
                     -> (StreamProcess stdin stdout stderr -> m r) -> m r
withStreamCommand cmd = withStreamProcess (shell cmd)
-- | Call 'terminateProcess' on the raw process handle underlying a
--   'StreamingProcessHandle'.
terminateStreamingProcess :: (MonadIO m) => StreamingProcessHandle -> m ()
terminateStreamingProcess sph =
  liftIO (terminateProcess (streamingProcessHandleRaw sph))
-- | Two nested streaming 'ByteString's representing a process's output.
--
--   As constructed by 'withStreamOutputs', the outer layer carries
--   @stdout@ and the inner layer carries @stderr@.
type StdOutErr m r = ByteString (ByteString m) r
-- | Obtain both @stdout@ and @stderr@ of a process and pass them to a
--   continuation as a single 'StdOutErr' value.
--
--   The chunks of each output are tagged ('Left' for @stdout@, 'Right'
--   for @stderr@), merged through an 'unbounded' buffer with
--   'withMergedStreams', and then separated again so that @stdout@ forms
--   the outer 'ByteString' and @stderr@ the inner one.
withStreamOutputs :: ( MonadMask m, MonadIO m, MonadBaseControl IO m
                     , MonadBase IO n)
                     => StreamProcess stdin (WithStream' m) (WithStream' m)
                     -> (StdOutErr n () -> m r) -> m r
withStreamOutputs StreamProcess{fromStdout, fromStderr} f =
  withStream fromStdout $ \outBytes ->
    withStream fromStderr $ \errBytes ->
      withMergedStreams unbounded
                        [tagChunks Left outBytes, tagChunks Right errBytes]
                        (f . splitTagged)
  where
    -- Break a byte stream into chunks, labelling each with its origin.
    tagChunks tag = S.map tag . SB.toChunks

    -- Undo the tagging: 'Left' chunks become the outer byte stream and
    -- 'Right' chunks the inner one.
    splitTagged merged =
      SB.fromChunks (hoist SB.fromChunks (S.partitionEithers merged))
-- | A way of consuming a streaming 'ByteString' in @m@, yielding the
--   stream's final result.  Used as the @stdin@ representation of a
--   process: the 'InputSource' instance writes the stream to the
--   process's input handle.
newtype SupplyStream m = SupplyStream { supplyStream :: forall r. ByteString m r -> m r }
-- | Requests a pipe ('CreatePipe') for the process's @stdin@.  The
--   resulting 'SupplyStream' writes its argument to that pipe with
--   'SB.hPut' and closes the handle afterwards, whether or not the write
--   succeeded.
instance (MonadMask m, MonadIO m) => InputSource (SupplyStream m) where
  isStdStream = (fromPipe, Just CreatePipe)
    where
      fromPipe (Just h) = return (SupplyStream $ \inp ->
                                    SB.hPut h inp `finally` liftIO (hClose h))
      -- A pipe was requested, so a handle is expected here; report its
      -- absence explicitly rather than via a pattern-match failure.
      fromPipe Nothing  = ioError (userError
        "Streaming.Process: no stdin handle was provided for SupplyStream")
-- | A continuation-style source of a streaming 'ByteString': given a
--   function that consumes a @'ByteString' n ()@ in @m@, run it and
--   return its result.  Used as the @stdout@ \/ @stderr@ representation
--   of a process via the 'OutputSink' instance.
newtype WithStream n m = WithStream { withStream :: forall r. (ByteString n () -> m r) -> m r }
-- | A 'WithStream' whose byte stream and continuation share one monad.
type WithStream' m = WithStream m m
-- | Requests a pipe ('CreatePipe') for the process's output stream.  The
--   resulting 'WithStream' passes the handle's contents
--   ('SB.hGetContents') to the continuation and closes the handle
--   afterwards, whether or not the continuation succeeded.
instance (MonadIO m, MonadMask m, MonadIO n) => OutputSink (WithStream n m) where
  osStdStream = (fromPipe, Just CreatePipe)
    where
      fromPipe (Just h) = return (WithStream $ \f ->
                                    f (SB.hGetContents h) `finally` liftIO (hClose h))
      -- A pipe was requested, so a handle is expected here; report its
      -- absence explicitly rather than via a pattern-match failure.
      fromPipe Nothing  = ioError (userError
        "Streaming.Process: no output handle was provided for WithStream")