module System.Process.Streaming (
execute
, exitCode
, safeExecute
, simpleSafeExecute
, PipingPolicy
, nopiping
, pipeoe
, pipeioe
, separated
, combined
, LinePolicy
, linePolicy
, encoding
, LeftoverPolicy(..)
, ignoreLeftovers
, failOnLeftovers
, useConsumer
, useProducer
, surely
, safely
, fallibly
, monoidally
, nop
, Conceit (..)
, conceit
, mapConceit
, Siphon (..)
, forkSiphon
, SiphonL (..)
, SiphonR (..)
, module System.Process
) where
import Data.Maybe
import Data.Bifunctor
import Data.Profunctor
import Data.Functor.Identity
import Data.Either
import Data.Monoid
import Data.Traversable
import Data.Typeable
import Data.Text
import Control.Applicative
import Control.Monad
import Control.Monad.Trans.Free
import Control.Monad.Error
import Control.Monad.State
import Control.Monad.Writer.Strict
import qualified Control.Monad.Catch as C
import Control.Exception
import Control.Concurrent
import Control.Concurrent.Async
import Pipes
import qualified Pipes as P
import qualified Pipes.Prelude as P
import Pipes.Lift
import Pipes.ByteString
import qualified Pipes.Text as T
import Pipes.Concurrent
import Pipes.Safe (SafeT, runSafeT)
import System.IO
import System.IO.Error
import System.Process
import System.Process.Lens
import System.Exit
execute :: PipingPolicy e a -> CreateProcess -> IO (Either e (ExitCode,a))
execute (PipingPolicy tr somePrism action) procSpec = mask $ \restore -> do
(min,mout,merr,phandle) <- createProcess (tr procSpec)
case getFirst . getConst . somePrism (Const . First . Just) $ (min,mout,merr) of
Nothing ->
throwIO (userError "stdin/stdout/stderr handle unexpectedly null")
`finally`
terminateCarefully phandle
Just t ->
let (a, cleanup) = action t in
(restore (terminateOnError phandle a) `onException` terminateCarefully phandle)
`finally`
cleanup
exitCode :: (ExitCode,a) -> Either Int a
exitCode (ec,a) = case ec of
ExitSuccess -> Right a
ExitFailure i -> Left i
safeExecute :: (IOError -> e) -> (Int -> e) -> PipingPolicy e a -> CreateProcess -> IO (Either e a)
safeExecute exh ech pp cp = collapseEithers <$> (tryIOError $ execute pp cp)
where
collapseEithers = join . join . bimap exh (fmap (bimap ech id . exitCode))
simpleSafeExecute :: PipingPolicy String a -> CreateProcess -> IO (Either String a)
simpleSafeExecute = safeExecute show (mappend "Exit code: " . show)
terminateCarefully :: ProcessHandle -> IO ()
terminateCarefully pHandle = do
mExitCode <- getProcessExitCode pHandle
case mExitCode of
Nothing -> terminateProcess pHandle
Just _ -> return ()
terminateOnError :: ProcessHandle
-> IO (Either e a)
-> IO (Either e (ExitCode,a))
terminateOnError pHandle action = do
result <- action
case result of
Left e -> do
terminateCarefully pHandle
return $ Left e
Right r -> do
exitCode <- waitForProcess pHandle
return $ Right (exitCode,r)
data PipingPolicy e a = forall t. PipingPolicy (CreateProcess -> CreateProcess) (forall m. Applicative m => (t -> m t) -> (Maybe Handle, Maybe Handle, Maybe Handle) -> m (Maybe Handle, Maybe Handle, Maybe Handle)) (t -> (IO (Either e a), IO ()))
instance Functor (PipingPolicy e) where
fmap f (PipingPolicy cpf prsm func) = PipingPolicy cpf prsm $
(fmap (bimap (fmap (fmap f)) id) func)
instance Bifunctor PipingPolicy where
bimap f g (PipingPolicy cpf prsm func) = PipingPolicy cpf prsm $
(fmap (bimap (fmap (bimap f g)) id) func)
nopiping :: PipingPolicy e ()
nopiping = PipingPolicy id nohandles (\() -> (return $ return (), return ()))
pipeoe :: (Producer ByteString IO () -> Producer ByteString IO () -> IO (Either e a))
-> PipingPolicy e a
pipeoe consumefunc = PipingPolicy changecp handlesoe handler
where handler (hout,herr) =
(,) (consumefunc (fromHandle hout) (fromHandle herr))
(hClose hout `finally` hClose herr)
changecp cp = cp { std_out = CreatePipe
, std_err = CreatePipe
}
pipeioe :: (Show e, Typeable e)
=> (Consumer ByteString IO () -> IO (Either e a))
-> (Producer ByteString IO () -> Producer ByteString IO () -> IO (Either e b))
-> PipingPolicy e (a,b)
pipeioe feeder consumefunc = PipingPolicy changecp handlesioe handler
where handler (hin,hout,herr) =
(,) (conceit (feeder (toHandle hin) `finally` hClose hin)
(consumefunc (fromHandle hout) (fromHandle herr)))
(hClose hin `finally` hClose hout `finally` hClose herr)
changecp cp = cp { std_in = CreatePipe
, std_out = CreatePipe
, std_err = CreatePipe
}
separated :: (Show e, Typeable e)
=> (Producer ByteString IO () -> IO (Either e a))
-> (Producer ByteString IO () -> IO (Either e b))
-> Producer ByteString IO () -> Producer ByteString IO () -> IO (Either e (a,b))
separated outfunc errfunc outprod errprod =
conceit (buffer_ outfunc outprod) (buffer_ errfunc errprod)
data LinePolicy e = LinePolicy ((FreeT (Producer T.Text IO) IO (Producer ByteString IO ()) -> IO (Producer ByteString IO ())) -> Producer ByteString IO () -> IO (Either e ()))
instance Functor LinePolicy where
fmap f (LinePolicy func) = LinePolicy $ fmap (fmap (fmap (bimap f id))) func
linePolicy :: (forall r. Producer ByteString IO r -> Producer T.Text IO (Producer ByteString IO r))
-> (forall r. Producer T.Text IO r -> Producer T.Text IO r)
-> (LeftoverPolicy () ByteString e)
-> LinePolicy e
linePolicy decoder transform lopo = LinePolicy $ \teardown producer -> do
let freeLines = transFreeT transform
. viewLines
. decoder
$ producer
viewLines = getConst . T.lines Const
teardown freeLines >>= runLeftoverPolicy lopo ()
data LeftoverPolicy a l e = LeftoverPolicy { runLeftoverPolicy :: a -> Producer l IO () -> IO (Either e a) }
instance Functor (LeftoverPolicy a l) where
fmap f (LeftoverPolicy x) = LeftoverPolicy $ fmap (fmap (fmap (bimap f id))) x
instance Profunctor (LeftoverPolicy a) where
dimap ab cd (LeftoverPolicy pf) = LeftoverPolicy $ \a p -> liftM (bimap cd id) $ pf a $ p >-> P.map ab
ignoreLeftovers :: LeftoverPolicy a l e
ignoreLeftovers = LeftoverPolicy $ pure . pure . pure
failOnLeftovers :: (a -> b -> e) -> LeftoverPolicy a b e
failOnLeftovers errh = LeftoverPolicy $ \a remainingBytes -> do
r <- next remainingBytes
return $ case r of
Left () -> Right a
Right (somebytes,_) -> Left $ errh a somebytes
combined :: (Show e, Typeable e)
=> LinePolicy e
-> LinePolicy e
-> (Producer T.Text IO () -> IO (Either e a))
-> Producer ByteString IO () -> Producer ByteString IO () -> IO (Either e a)
combined (LinePolicy fun1) (LinePolicy fun2) combinedConsumer prod1 prod2 =
manyCombined [fmap (($prod1).buffer_) fun1, fmap (($prod2).buffer_) fun2] combinedConsumer
manyCombined :: (Show e, Typeable e)
=> [((FreeT (Producer T.Text IO) IO (Producer ByteString IO ())) -> IO (Producer ByteString IO ())) -> IO (Either e ())]
-> (Producer T.Text IO () -> IO (Either e a))
-> IO (Either e a)
manyCombined actions consumer = do
(outbox, inbox, seal) <- spawn' Unbounded
mVar <- newMVar outbox
r <- conceit (mapConceit ($ iterTLines mVar) actions `finally` atomically seal)
(consumer (fromInput inbox) `finally` atomically seal)
return $ snd <$> r
where
iterTLines mvar = iterT $ \textProducer -> do
join $ withMVar mvar $ \output -> do
runEffect $ (textProducer <* P.yield (singleton '\n')) >-> (toOutput output >> P.drain)
useConsumer :: Monad m => Consumer b m () -> Producer b m () -> m ()
useConsumer consumer producer = runEffect $ producer >-> consumer
useProducer :: Monad m => Producer b m () -> Consumer b m () -> m ()
useProducer producer consumer = runEffect (producer >-> consumer)
surely :: (Functor f0, Functor f1) => f0 (f1 a) -> f0 (f1 (Either e a))
surely = fmap (fmap Right)
safely :: (MFunctor t, C.MonadMask m, MonadIO m)
=> (t (SafeT m) l -> (SafeT m) x)
-> t m l -> m x
safely activity = runSafeT . activity . hoist lift
fallibly :: (MFunctor t, Monad m, Error e)
=> (t (ErrorT e m) l -> (ErrorT e m) x)
-> t m l -> m (Either e x)
fallibly activity = runErrorT . activity . hoist lift
monoidally :: (MFunctor t,Monad m,Monoid w, Error e')
=> (e' -> w -> e)
-> (t (ErrorT e' (WriterT w m)) l -> ErrorT e' (WriterT w m) ())
-> t m l -> m (Either e w)
monoidally errh activity proxy = do
(r,w) <- runWriterT . runErrorT . activity . hoist (lift.lift) $ proxy
return $ case r of
Left e' -> Left $ errh e' w
Right () -> Right $ w
nop :: Applicative m => i -> m (Either e ())
nop = pure . pure . pure $ ()
buffer :: (Show e, Typeable e)
=> LeftoverPolicy a l e
-> (Producer b IO () -> IO (Either e a))
-> Producer b IO (Producer l IO ()) -> IO (Either e a)
buffer policy activity producer = do
(outbox,inbox,seal) <- spawn' Unbounded
r <- conceit
(do feeding <- async $ runEffect $
producer >-> (toOutput outbox >> P.drain)
Right <$> wait feeding `finally` atomically seal
)
(activity (fromInput inbox) `finally` atomically seal)
case r of
Left e -> return $ Left e
Right (lp,r') -> runLeftoverPolicy policy r' lp
buffer_ :: (Show e, Typeable e)
=> (Producer ByteString IO () -> IO (Either e a))
-> Producer ByteString IO () -> IO (Either e a)
buffer_ activity producer = do
(outbox,inbox,seal) <- spawn' Unbounded
r <- conceit
(do feeding <- async $ runEffect $
producer >-> (toOutput outbox >> P.drain)
Right <$> wait feeding `finally` atomically seal
)
(activity (fromInput inbox) `finally` atomically seal)
return $ fmap snd r
encoding :: (Show e, Typeable e)
=> (Producer b IO () -> Producer t IO (Producer b IO ()))
-> LeftoverPolicy a b e
-> (Producer t IO () -> IO (Either e a))
-> Producer b IO () -> IO (Either e a)
encoding decoder policy activity producer = buffer policy activity $ decoder producer
data WrappedError e = WrappedError e
deriving (Show, Typeable)
instance (Show e, Typeable e) => Exception (WrappedError e)
elideError :: (Show e, Typeable e) => IO (Either e a) -> IO a
elideError action = action >>= either (throwIO . WrappedError) return
revealError :: (Show e, Typeable e) => IO a -> IO (Either e a)
revealError action = catch (action >>= return . Right)
(\(WrappedError e) -> return . Left $ e)
newtype Conceit e a = Conceit { runConceit :: IO (Either e a) }
instance Functor (Conceit e) where
fmap f (Conceit x) = Conceit $ fmap (fmap f) x
instance Bifunctor Conceit where
bimap f g (Conceit x) = Conceit $ liftM (bimap f g) x
instance (Show e, Typeable e) => Applicative (Conceit e) where
pure = Conceit . pure . pure
Conceit fs <*> Conceit as =
Conceit . revealError $
uncurry ($) <$> concurrently (elideError fs) (elideError as)
instance (Show e, Typeable e) => Alternative (Conceit e) where
empty = Conceit $ forever (threadDelay maxBound)
Conceit as <|> Conceit bs =
Conceit $ either id id <$> race as bs
instance (Show e, Typeable e, Monoid a) => Monoid (Conceit e a) where
mempty = Conceit . pure . pure $ mempty
mappend c1 c2 = (<>) <$> c1 <*> c2
conceit :: (Show e, Typeable e)
=> IO (Either e a)
-> IO (Either e b)
-> IO (Either e (a,b))
conceit c1 c2 = runConceit $ (,) <$> Conceit c1 <*> Conceit c2
mapConceit :: (Show e, Typeable e, Traversable t) => (a -> IO (Either e b)) -> t a -> IO (Either e (t b))
mapConceit f = revealError . mapConcurrently (elideError . f)
newtype Siphon b e a = Siphon { runSiphon :: Producer b IO () -> IO (Either e a) }
instance Functor (Siphon b e) where
fmap f (Siphon x) = Siphon $ fmap (fmap (fmap f)) x
instance Bifunctor (Siphon b) where
bimap f g (Siphon x) = Siphon $ fmap (liftM (bimap f g)) x
instance (Show e, Typeable e) => Applicative (Siphon b e) where
pure = Siphon . pure . pure . pure
Siphon fs <*> Siphon as =
Siphon $ \producer -> do
(outbox1,inbox1,seal1) <- spawn' Unbounded
(outbox2,inbox2,seal2) <- spawn' Unbounded
r <- conceit (do
feeding <- async $ runEffect $
producer >-> P.tee (toOutput outbox1 >> P.drain)
>-> (toOutput outbox2 >> P.drain)
sealing <- async $ wait feeding `finally` atomically seal1
`finally` atomically seal2
return $ Right ()
)
(fmap (uncurry ($)) <$> conceit ((fs $ fromInput inbox1)
`finally` atomically seal1)
((as $ fromInput inbox2)
`finally` atomically seal2)
)
return $ fmap snd r
instance (Show e, Typeable e, Monoid a) => Monoid (Siphon b e a) where
mempty = Siphon . pure . pure . pure $ mempty
mappend s1 s2 = (<>) <$> s1 <*> s2
forkSiphon :: (Show e, Typeable e)
=> (Producer b IO () -> IO (Either e x))
-> (Producer b IO () -> IO (Either e y))
-> Producer b IO () -> IO (Either e (x,y))
forkSiphon c1 c2 = runSiphon $ (,) <$> Siphon c1 <*> Siphon c2
newtype SiphonL a b e = SiphonL { runSiphonL :: Producer b IO () -> IO (Either e a) }
instance Profunctor (SiphonL e) where
dimap ab cd (SiphonL pf) = SiphonL $ \p -> liftM (bimap cd id) $ pf $ p >-> P.map ab
newtype SiphonR e b a = SiphonR { runSiphonR :: Producer b IO () -> IO (Either e a) }
instance Profunctor (SiphonR e) where
dimap ab cd (SiphonR pf) = SiphonR $ \p -> liftM (fmap cd) $ pf $ p >-> P.map ab