module System.Process.Streaming (
execute,
execute3,
exitCode,
separate,
LinePolicy,
linePolicy,
LeftoverPolicy,
ignoreLeftovers,
failOnLeftovers,
combineLines,
useConsumer,
useProducer,
surely,
safely,
fallibly,
monoidally,
exceptionally,
nop,
encoding,
Conc (..),
conc,
mapConc,
ForkProd (..),
forkProd
) where
import Data.Maybe
import Data.Functor.Identity
import Data.Either
import Data.Either.Combinators
import Data.Monoid
import Data.Traversable
import Data.Typeable
import Data.Text
import Control.Applicative
import Control.Monad
import Control.Monad.Trans.Free
import Control.Monad.Trans.Either
import Control.Monad.Error
import Control.Monad.State
import Control.Monad.Morph
import Control.Monad.Writer.Strict
import qualified Control.Monad.Catch as C
import Control.Exception
import Control.Concurrent
import Control.Concurrent.Async
import Pipes
import qualified Pipes as P
import qualified Pipes.Prelude as P
import Pipes.Lift
import Pipes.ByteString
import qualified Pipes.Text as T
import Pipes.Concurrent
import Pipes.Safe (SafeT, runSafeT)
import System.IO
import System.Process
import System.Process.Lens
import System.Exit
-- | Launch the process described by the 'CreateProcess' record, with its
-- @std_out@ and @std_err@ fields overridden to 'CreatePipe', and pass both
-- streams (as 'Producer's built with 'fromHandle') to the consuming function.
--
-- The 'IOException' handler is forwarded to 'executeX'. The two pipe handles
-- are closed by the cleanup action given to 'executeX'.
execute :: (Show e, Typeable e)
        => CreateProcess
        -> (IOException -> e)
        -> (Producer ByteString IO () -> Producer ByteString IO () -> IO (Either e a))
        -> IO (Either e (ExitCode,a))
execute spec ehandler consumefunc =
    executeX handle2 pipedSpec ehandler runWithHandles
  where
    -- Force stdout/stderr to be pipes regardless of what the caller set.
    pipedSpec = spec { std_out = CreatePipe, std_err = CreatePipe }
    -- Pair the consuming action with the action that releases the handles.
    runWithHandles (hout, herr) =
        ( consumefunc (fromHandle hout) (fromHandle herr)
        , hClose hout `finally` hClose herr
        )
-- | Like 'execute', but @std_in@ is also set to 'CreatePipe'.
--
-- The feeder receives a 'Consumer' that writes to the process' stdin
-- ('toHandle'); the stdin handle is closed as soon as the feeder finishes.
-- Feeder and consuming function are combined with 'conc', and their results
-- are returned as a pair next to the 'ExitCode'. All three handles are closed
-- by the cleanup action given to 'executeX'.
execute3 :: (Show e, Typeable e)
         => CreateProcess
         -> (IOException -> e)
         -> (Consumer ByteString IO () -> IO (Either e a))
         -> (Producer ByteString IO () -> Producer ByteString IO () -> IO (Either e b))
         -> IO (Either e (ExitCode,(a,b)))
execute3 spec ehandler feeder consumefunc =
    executeX handle3 pipedSpec ehandler runWithHandles
  where
    -- Force all three standard streams to be pipes.
    pipedSpec = spec { std_in = CreatePipe, std_out = CreatePipe, std_err = CreatePipe }
    -- Pair the combined feeding/consuming action with the handle cleanup.
    runWithHandles (hin, hout, herr) =
        ( conc (feeder (toHandle hin) `finally` hClose hin)
               (consumefunc (fromHandle hout) (fromHandle herr))
        , hClose hin `finally` hClose hout `finally` hClose herr
        )
-- | Run an action that already reports failures as 'Left', additionally
-- converting any 'IOException' it throws into a 'Left' via the handler.
try' :: (IOException -> e) -> IO (Either e a) -> IO (Either e a)
try' handler action = do
    outcome <- try action
    return $ case outcome of
        Left ioErr -> Left (handler ioErr)
        Right r    -> r
-- | 'createProcess' with any 'IOException' it throws returned as a 'Left'
-- instead of being propagated.
createProcess' :: CreateProcess
               -> IO (Either IOException (Maybe Handle, Maybe Handle, Maybe Handle, ProcessHandle))
createProcess' spec = try (createProcess spec)
-- | Terminate the process only if 'getProcessExitCode' reports that it has
-- not exited yet; otherwise do nothing.
terminateCarefully :: ProcessHandle -> IO ()
terminateCarefully pHandle =
    getProcessExitCode pHandle
        >>= maybe (terminateProcess pHandle) (const (return ()))
-- | Run the action and then settle the fate of the process:
--
-- * on 'Left', the process is terminated (if still running) and the error is
--   returned unchanged;
-- * on 'Right', wait for the process to exit and pair its 'ExitCode' with
--   the result.
terminateOnError :: ProcessHandle
                 -> IO (Either e a)
                 -> IO (Either e (ExitCode,a))
terminateOnError pHandle action = action >>= either onFailure onSuccess
  where
    onFailure e = terminateCarefully pHandle >> return (Left e)
    onSuccess r = do
        code <- waitForProcess pHandle
        return (Right (code, r))
-- | Shared driver behind 'execute' and 'execute3'.
--
-- Arguments:
--
-- * a prism-like traversal that extracts the handles of interest (plus the
--   'ProcessHandle') from the 4-tuple returned by 'createProcess';
-- * the process specification;
-- * a conversion from 'IOException' to the caller's error type;
-- * a function that, given the extracted handles, returns the action to run
--   and a cleanup action.
--
-- The whole computation runs under 'mask'; only the user action is run with
-- the @restore@ function. 'IOException's raised while creating the process or
-- while running the action are turned into 'Left' with the supplied handler.
--
-- If the traversal finds no match, a 'userError' is raised and the process is
-- terminated; any pipe handles that were nevertheless created are closed so
-- they are not leaked.
executeX :: ((forall m. Applicative m => ((t, ProcessHandle) -> m (t, ProcessHandle)) -> (Maybe Handle, Maybe Handle, Maybe Handle, ProcessHandle) -> m (Maybe Handle, Maybe Handle, Maybe Handle, ProcessHandle)))
         -> CreateProcess
         -> (IOException -> e)
         -> (t -> (IO (Either e a), IO()))
         -> IO (Either e (ExitCode,a))
executeX somePrism procSpec exHandler action = mask $ \restore -> runEitherT $ do
    maybeHtuple <- bimapEitherT exHandler id $ EitherT $ createProcess' procSpec
    EitherT $ try' exHandler $
        -- Instantiating the traversal at 'Const (First ...)' yields the first
        -- match, if there is one.
        case getFirst . getConst . somePrism (Const . First . Just) $ maybeHtuple of
            Nothing ->
                throwIO (userError "stdin/stdout/stderr handle unexpectedly null")
                `finally`
                (let (mhin, mhout, mherr, phandle) = maybeHtuple
                 in terminateCarefully phandle
                    -- Release whichever pipe ends did get created.
                    `finally` mapM_ hClose (catMaybes [mhin, mhout, mherr]))
            Just (htuple,phandle) -> let (a, cleanup) = action htuple in
                -- If the action throws, the process is terminated; if it returns,
                -- 'terminateOnError' terminates it or waits for its exit code.
                (terminateOnError phandle $ restore a `onException` terminateCarefully phandle)
                `finally`
                cleanup
-- | Fold the process 'ExitCode' into the error channel: 'ExitSuccess' keeps
-- the result, while @'ExitFailure' i@ becomes @'Left' (f i)@. An existing
-- 'Left' is passed through untouched.
exitCode :: Functor c => (Int -> e) -> c (Either e (ExitCode,a)) -> c (Either e a)
exitCode f m = fmap (either Left checkCode) m
  where
    checkCode (ExitSuccess, a)   = Right a
    checkCode (ExitFailure i, _) = Left (f i)
-- | Consume two byte streams independently: each consuming function is
-- wrapped with 'buffer_' and the two are combined with 'conc', returning both
-- results as a pair.
separate :: (Show e, Typeable e)
         => (Producer ByteString IO () -> IO (Either e a))
         -> (Producer ByteString IO () -> IO (Either e b))
         -> Producer ByteString IO () -> Producer ByteString IO () -> IO (Either e (a,b))
separate outfunc errfunc outprod errprod = conc bufferedOut bufferedErr
  where
    bufferedOut = buffer_ outfunc outprod
    bufferedErr = buffer_ errfunc errprod
-- | A way of processing a byte stream as lines of text. It is given a
-- \"teardown\" function, which consumes a 'FreeT'-delimited sequence of
-- 'T.Text' producers and returns the final 'ByteString' producer, followed by
-- the 'ByteString' producer to read from. It returns either an error @e@ or
-- @()@.
type LinePolicy e = (FreeT (Producer T.Text IO) IO (Producer ByteString IO ()) -> IO (Producer ByteString IO ())) -> Producer ByteString IO () -> IO (Either e ())
-- | Build a 'LinePolicy' from:
--
-- * a decoder turning bytes into text and returning the undecoded remainder;
-- * a transformation applied to every individual line;
-- * a 'LeftoverPolicy' applied (with result @()@) to the producer that the
--   teardown function returns.
linePolicy :: (forall r. Producer ByteString IO r -> Producer T.Text IO (Producer ByteString IO r))
           -> (forall r. Producer T.Text IO r -> Producer T.Text IO r)
           -> (LeftoverPolicy (Producer ByteString IO ()) e ())
           -> LinePolicy e
linePolicy decoder transform lopo teardown producer =
    lopo () =<< teardown lineGroups
  where
    decoded    = decoder producer
    -- Split the decoded text into lines, then transform each line.
    lineGroups = transFreeT transform (splitLines decoded)
    -- View through the 'T.lines' lens by instantiating it at 'Const'.
    splitLines = getConst . T.lines Const
-- | Given a result @a@ and some leftovers @l@, either keep a result or fail
-- with an error @e@.
type LeftoverPolicy l e a = a -> l -> IO (Either e a)
-- | A 'LeftoverPolicy' that never fails: the leftovers are discarded and the
-- result is returned as is.
ignoreLeftovers :: LeftoverPolicy l e a
ignoreLeftovers result _leftovers = return (Right result)
-- | A 'LeftoverPolicy' that fails when the leftover producer yields at least
-- one value. The error is built from the result so far and the first leftover
-- value; an exhausted producer leaves the result untouched.
failOnLeftovers :: (a -> b -> e) -> LeftoverPolicy (Producer b IO ()) e a
failOnLeftovers errh a remainingBytes =
    either (const (Right a)) (Left . errh a . fst) <$> next remainingBytes
-- | Merge the lines of two byte streams into a single text stream handed to
-- one consumer. Each stream has its own 'LinePolicy'; both are run through
-- 'buffer_' and combined by 'combineManyLines'.
combineLines :: (Show e, Typeable e)
             => LinePolicy e
             -> LinePolicy e
             -> (Producer T.Text IO () -> IO (Either e a))
             -> Producer ByteString IO () -> Producer ByteString IO () -> IO (Either e a)
combineLines fun1 fun2 combinedConsumer prod1 prod2 =
    combineManyLines [bufferedPolicy fun1 prod1, bufferedPolicy fun2 prod2] combinedConsumer
  where
    -- Fix the source of a policy, leaving only the teardown function open.
    bufferedPolicy policy prod teardown = buffer_ (policy teardown) prod
-- | Run every line-producing action (via 'mapConc') together with a single
-- consumer that reads the merged text stream from an unbounded mailbox.
--
-- Each action receives a teardown function that forwards its lines to the
-- mailbox, appending a newline after each one. The mailbox is sealed when
-- either side finishes. Only the consumer's result is returned.
combineManyLines :: (Show e, Typeable e)
                 => [((FreeT (Producer T.Text IO) IO (Producer ByteString IO ())) -> IO (Producer ByteString IO ())) -> IO (Either e ())]
                 -> (Producer T.Text IO () -> IO (Either e a))
                 -> IO (Either e a)
combineManyLines actions consumer = do
    (outbox, inbox, seal) <- spawn' Unbounded
    outboxVar <- newMVar outbox
    let sealing   = atomically seal
        producing = mapConc ($ forwardLines outboxVar) actions `finally` sealing
        consuming = consumer (fromInput inbox) `finally` sealing
    fmap snd <$> conc producing consuming
  where
    -- The MVar is held while one whole line is written, so a line is sent
    -- without another writer's output in between. 'join' then runs the
    -- continuation returned by the line producer, after the MVar is released.
    forwardLines var = iterT $ \textProducer ->
        join $ withMVar var $ \output ->
            runEffect $ (textProducer <* P.yield (singleton '\n')) >-> (toOutput output >> P.drain)
-- | Connect a 'Producer' to the given 'Consumer' and run the resulting
-- effect.
useConsumer :: Monad m => Consumer b m () -> Producer b m () -> m ()
useConsumer consumer producer = runEffect (producer >-> consumer)
-- | Connect the given 'Producer' to a 'Consumer' and run the resulting
-- effect.
useProducer :: Monad m => Producer b m () -> Consumer b m () -> m ()
useProducer producer consumer = runEffect $ producer >-> consumer
-- | Wrap the result found under two functor layers in 'Right', i.e. mark it
-- as a result that cannot fail.
surely :: (Functor f, Functor f') => f (f' a) -> f (f' (Either e a))
surely = (fmap . fmap) Right
-- | Adapt an activity that works in 'SafeT' so it can be applied to a value
-- over the base monad: the argument is hoisted into 'SafeT' with 'lift' and
-- the result is run with 'runSafeT'.
safely :: (MFunctor t, C.MonadCatch m, MonadIO m)
       => (t (SafeT m) l -> (SafeT m) x)
       -> t m l -> m x
safely activity proxy = runSafeT (activity (hoist lift proxy))
-- | Adapt an activity that works in 'ErrorT' so it can be applied to a value
-- over the base monad: the argument is hoisted into 'ErrorT' with 'lift' and
-- the outcome is exposed as an 'Either' by 'runErrorT'.
fallibly :: (MFunctor t, Monad m, Error e)
         => (t (ErrorT e m) l -> (ErrorT e m) x)
         -> t m l -> m (Either e x)
fallibly activity proxy = runErrorT (activity (hoist lift proxy))
-- | Run an activity that accumulates a monoidal value in 'WriterT' and may
-- fail in 'ErrorT'. On success the accumulated value is returned; on failure
-- the error is combined with whatever had been accumulated so far, using the
-- supplied function.
monoidally :: (MFunctor t,Monad m,Monoid w, Error e')
           => (e' -> w -> e)
           -> (t (ErrorT e' (WriterT w m)) l -> ErrorT e' (WriterT w m) ())
           -> t m l -> m (Either e w)
monoidally errh activity proxy = do
    (outcome, acc) <- runWriterT (runErrorT (activity (hoist (lift . lift) proxy)))
    return (either (\err -> Left (errh err acc)) (\() -> Right acc) outcome)
-- | Wrap a fallible operation so that any 'IOException' it throws is turned
-- into a 'Left' by the handler (see 'try'').
exceptionally :: (IOException -> e)
              -> (x -> IO (Either e a))
              -> (x -> IO (Either e a))
exceptionally handler operation = try' handler . operation
-- | Ignore the argument entirely and succeed with @()@.
nop :: (MFunctor t, Monad m) => t m l -> m (Either e ())
nop _ = return (Right ())
-- | Decouple a producer from its consumer through an unbounded mailbox.
--
-- A feeder thread copies everything the producer yields into the mailbox
-- while the activity reads from it; the two sides are combined with 'conc'.
-- When both are done, the 'LeftoverPolicy' is applied to the activity's
-- result and the producer's return value.
--
-- The feeder is started with 'withAsync', so it is cancelled if this branch
-- is interrupted (for example because the activity failed) instead of being
-- left running in the background.
buffer :: (Show e, Typeable e)
       => LeftoverPolicy l e a
       -> (Producer b IO () -> IO (Either e a))
       -> Producer b IO l -> IO (Either e a)
buffer policy activity producer = do
    (outbox,inbox,seal) <- spawn' Unbounded
    -- NOTE(review): the trailing 'P.drain' presumably keeps running the
    -- producer to completion after the mailbox stops accepting values;
    -- confirm against the pipes-concurrency 'toOutput' documentation.
    r <- conc (withAsync (runEffect $ producer >-> (toOutput outbox >> P.drain)) $ \feeding ->
                  Right <$> wait feeding `finally` atomically seal)
              (activity (fromInput inbox) `finally` atomically seal)
    case r of
        Left e -> return $ Left e
        Right (lp,r') -> policy r' lp
-- | Decode a producer before consuming it: the decoder's output is passed to
-- 'buffer', so the activity sees the decoded stream and the 'LeftoverPolicy'
-- sees the producer that the decoder returns.
encoding :: (Show e, Typeable e)
         => (Producer b IO r -> Producer t IO (Producer b IO r))
         -> LeftoverPolicy (Producer b IO r) e x
         -> (Producer t IO () -> IO (Either e x))
         -> Producer b IO r -> IO (Either e x)
encoding decoder policy activity = buffer policy activity . decoder
-- | 'buffer' specialised to 'ByteString' producers that return @()@, using
-- 'ignoreLeftovers' as the leftover policy.
buffer_ :: (Show e, Typeable e)
        => (Producer ByteString IO () -> IO (Either e a))
        -> Producer ByteString IO () -> IO (Either e a)
buffer_ activity producer = buffer ignoreLeftovers activity producer
-- | Carrier that lets an arbitrary error value travel as an exception; see
-- 'elideError' and 'revealError'.
data WrappedError e = WrappedError e deriving (Show, Typeable)

instance (Show e, Typeable e) => Exception (WrappedError e)
-- | Turn a 'Left' result into a thrown 'WrappedError'; a 'Right' result is
-- returned unwrapped.
elideError :: (Show e, Typeable e) => IO (Either e a) -> IO a
elideError action = do
    outcome <- action
    case outcome of
        Left e  -> throwIO (WrappedError e)
        Right a -> return a
-- | Inverse of 'elideError': run the action and convert a thrown
-- 'WrappedError' (of the matching error type) back into a 'Left'.
revealError :: (Show e, Typeable e) => IO a -> IO (Either e a)
revealError action = (Right <$> action) `catch` unwrap
  where
    unwrap (WrappedError e) = return (Left e)
-- | An 'IO' action that may fail with @e@, with instances that combine
-- actions concurrently.
newtype Conc e a = Conc { runConc :: IO (Either e a) }

instance Functor (Conc e) where
    fmap f (Conc x) = Conc (fmap f <$> x)

-- | '<*>' runs both sides with 'concurrently'. A 'Left' from either side is
-- thrown as a 'WrappedError' inside the concurrent section and turned back
-- into a 'Left' afterwards by 'revealError'.
instance (Show e, Typeable e) => Applicative (Conc e) where
    pure = Conc . return . Right
    Conc fs <*> Conc as = Conc $ revealError $ do
        (f, a) <- concurrently (elideError fs) (elideError as)
        return (f a)

-- | '<|>' runs both sides with 'race' and returns the result that 'race'
-- reports; 'empty' never finishes.
instance (Show e, Typeable e) => Alternative (Conc e) where
    empty = Conc $ forever (threadDelay maxBound)
    Conc as <|> Conc bs = Conc (either id id <$> race as bs)
-- | Run two fallible actions through the 'Conc' applicative and pair their
-- results.
conc :: (Show e, Typeable e)
     => IO (Either e a)
     -> IO (Either e b)
     -> IO (Either e (a,b))
conc c1 c2 = runConc (liftA2 (,) (Conc c1) (Conc c2))
-- | Apply a fallible action to every element of a container using
-- 'mapConcurrently'. A 'Left' travels as a 'WrappedError' exception inside
-- the concurrent section and is recovered as a 'Left' by 'revealError'.
mapConc :: (Show e, Typeable e, Traversable t) => (a -> IO (Either e b)) -> t a -> IO (Either e (t b))
mapConc f xs = revealError (mapConcurrently (elideError . f) xs)
-- | A fallible consumer of a 'Producer', with an 'Applicative' instance that
-- feeds the same producer to both sides.
newtype ForkProd b e a = ForkProd { runForkProd :: Producer b IO () -> IO (Either e a) }

instance Functor (ForkProd b e) where
    fmap f (ForkProd x) = ForkProd (fmap (fmap f) . x)

-- | '<*>' copies every value of the producer into two unbounded mailboxes
-- (one per side) and combines the two consumers with 'conc'.
instance (Show e, Typeable e) => Applicative (ForkProd b e) where
    pure a = ForkProd (\_ -> return (Right a))
    ForkProd fs <*> ForkProd as = ForkProd $ \producer -> do
        (outbox1, inbox1, seal1) <- spawn' Unbounded
        (outbox2, inbox2, seal2) <- spawn' Unbounded
        let -- Send each value to the first mailbox ('P.tee') and pass it on
            -- to the second.
            feedBoth = runEffect $
                producer >-> P.tee (toOutput outbox1 >> P.drain)
                         >->       (toOutput outbox2 >> P.drain)
            -- Start the feeder and a second thread that seals both
            -- mailboxes once the feeder is done; neither thread is waited
            -- for here.
            launchFeeder = do
                feeding <- async feedBoth
                _ <- async $ wait feeding `finally` atomically seal1
                                          `finally` atomically seal2
                return $ Right ()
            runFs = fs (fromInput inbox1) `finally` atomically seal1
            runAs = as (fromInput inbox2) `finally` atomically seal2
        r <- conc launchFeeder (fmap (uncurry ($)) <$> conc runFs runAs)
        return $ fmap snd r
-- | Feed one producer to two consumers through the 'ForkProd' applicative
-- and pair their results.
forkProd :: (Show e, Typeable e)
         => (Producer b IO () -> IO (Either e x))
         -> (Producer b IO () -> IO (Either e y))
         -> (Producer b IO () -> IO (Either e (x,y)))
forkProd c1 c2 = runForkProd (liftA2 (,) (ForkProd c1) (ForkProd c2))