module System.Process.Streaming (
execute
, executeFallibly
, PipingPolicy
, nopiping
, pipeo
, pipee
, pipeoe
, pipeoec
, pipei
, pipeio
, pipeie
, pipeioe
, pipeioec
, Pump (..)
, fromProducer
, fromSafeProducer
, fromFallibleProducer
, Siphon
, siphon
, siphon'
, fromFold
, fromFold'
, fromFold'_
, fromConsumer
, fromSafeConsumer
, fromFallibleConsumer
, fromParser
, unwanted
, DecodingFunction
, encoded
, LinePolicy
, linePolicy
, tweakLines
, executePipeline
, executePipelineFallibly
, Stage
, stage
, pipefail
, inbound
, module System.Process
) where
import Data.Maybe
import Data.Bifunctor
import Data.Functor.Identity
import Data.Either
import Data.Monoid
import Data.Foldable
import Data.Traversable
import Data.Tree
import Data.Text
import Data.Text.Encoding
import Data.Void
import Data.List.NonEmpty
import qualified Data.List.NonEmpty as N
import Control.Applicative
import Control.Monad
import Control.Monad.Trans.Free
import Control.Monad.Trans.Except
import Control.Monad.Trans.State
import Control.Monad.Trans.Writer.Strict
import qualified Control.Monad.Catch as C
import Control.Exception
import Control.Concurrent
import Control.Concurrent.Conceit
import Pipes
import qualified Pipes as P
import qualified Pipes.Prelude as P
import Pipes.Lift
import Pipes.ByteString
import Pipes.Parse
import qualified Pipes.Text as T
import Pipes.Concurrent
import Pipes.Safe (SafeT, runSafeT)
import System.IO
import System.IO.Error
import System.Process
import System.Process.Lens
import System.Exit
execute :: PipingPolicy Void a -> CreateProcess -> IO (ExitCode,a)
execute pp cprocess = either absurd id <$> executeFallibly pp cprocess
executeFallibly :: PipingPolicy e a -> CreateProcess -> IO (Either e (ExitCode,a))
executeFallibly pp record = case pp of
PPNone a -> executeInternal record nohandles $
\() -> (return . Right $ a,return ())
PPOutput action -> executeInternal (record{std_out = CreatePipe}) handleso $
\h->(action (fromHandle h),hClose h)
PPError action -> executeInternal (record{std_err = CreatePipe}) handlese $
\h->(action (fromHandle h),hClose h)
PPOutputError action -> executeInternal (record{std_out = CreatePipe, std_err = CreatePipe}) handlesoe $
\(hout,herr)->(action (fromHandle hout,fromHandle herr),hClose hout `finally` hClose herr)
PPInput action -> executeInternal (record{std_in = CreatePipe}) handlesi $
\h -> (action (toHandle h, hClose h), return ())
PPInputOutput action -> executeInternal (record{std_in = CreatePipe,std_out = CreatePipe}) handlesio $
\(hin,hout) -> (action (toHandle hin,hClose hin,fromHandle hout), hClose hout)
PPInputError action -> executeInternal (record{std_in = CreatePipe,std_err = CreatePipe}) handlesie $
\(hin,herr) -> (action (toHandle hin,hClose hin,fromHandle herr), hClose herr)
PPInputOutputError action -> executeInternal (record{std_in = CreatePipe, std_out = CreatePipe, std_err = CreatePipe}) handlesioe $
\(hin,hout,herr) -> (action (toHandle hin,hClose hin,fromHandle hout,fromHandle herr), hClose hout `finally` hClose herr)
executeInternal :: CreateProcess -> (forall m. Applicative m => (t -> m t) -> (Maybe Handle, Maybe Handle, Maybe Handle) -> m (Maybe Handle, Maybe Handle, Maybe Handle)) -> (t ->(IO (Either e a),IO ())) -> IO (Either e (ExitCode,a))
executeInternal record somePrism allocator = mask $ \restore -> do
(min,mout,merr,phandle) <- createProcess record
case getFirst . getConst . somePrism (Const . First . Just) $ (min,mout,merr) of
Nothing ->
throwIO (userError "stdin/stdout/stderr handle unwantedly null")
`finally`
terminateCarefully phandle
Just t ->
let (action,cleanup) = allocator t in
(restore (terminateOnError phandle action) `onException` terminateCarefully phandle) `finally` cleanup
exitCode :: (ExitCode,a) -> Either Int a
exitCode (ec,a) = case ec of
ExitSuccess -> Right a
ExitFailure i -> Left i
terminateCarefully :: ProcessHandle -> IO ()
terminateCarefully pHandle = do
mExitCode <- getProcessExitCode pHandle
case mExitCode of
Nothing -> terminateProcess pHandle
Just _ -> return ()
terminateOnError :: ProcessHandle
-> IO (Either e a)
-> IO (Either e (ExitCode,a))
terminateOnError pHandle action = do
result <- action
case result of
Left e -> do
terminateCarefully pHandle
return $ Left e
Right r -> do
exitCode <- waitForProcess pHandle
return $ Right (exitCode,r)
data PipingPolicy e a =
PPNone a
| PPOutput (Producer ByteString IO () -> IO (Either e a))
| PPError (Producer ByteString IO () -> IO (Either e a))
| PPOutputError ((Producer ByteString IO (),Producer ByteString IO ()) -> IO (Either e a))
| PPInput ((Consumer ByteString IO (), IO ()) -> IO (Either e a))
| PPInputOutput ((Consumer ByteString IO (), IO (),Producer ByteString IO ()) -> IO (Either e a))
| PPInputError ((Consumer ByteString IO (), IO (), Producer ByteString IO ()) -> IO (Either e a))
| PPInputOutputError ((Consumer ByteString IO (),IO (),Producer ByteString IO (),Producer ByteString IO ()) -> IO (Either e a))
deriving (Functor)
instance Bifunctor PipingPolicy where
bimap f g pp = case pp of
PPNone a -> PPNone $ g a
PPOutput action -> PPOutput $ fmap (fmap (bimap f g)) action
PPError action -> PPError $ fmap (fmap (bimap f g)) action
PPOutputError action -> PPOutputError $ fmap (fmap (bimap f g)) action
PPInput action -> PPInput $ fmap (fmap (bimap f g)) action
PPInputOutput action -> PPInputOutput $ fmap (fmap (bimap f g)) action
PPInputError action -> PPInputError $ fmap (fmap (bimap f g)) action
PPInputOutputError action -> PPInputOutputError $ fmap (fmap (bimap f g)) action
nopiping :: PipingPolicy e ()
nopiping = PPNone ()
pipeo :: Siphon ByteString e a -> PipingPolicy e a
pipeo (runSiphon -> siphonout) = PPOutput $ siphonout
pipee :: Siphon ByteString e a -> PipingPolicy e a
pipee (runSiphon -> siphonout) = PPError $ siphonout
pipeoe :: Siphon ByteString e a -> Siphon ByteString e b -> PipingPolicy e (a,b)
pipeoe (runSiphon -> siphonout) (runSiphon -> siphonerr) =
PPOutputError $ uncurry $ separated siphonout siphonerr
pipeoec :: LinePolicy e -> LinePolicy e -> Siphon Text e a -> PipingPolicy e a
pipeoec policy1 policy2 (runSiphon -> siphon) =
PPOutputError $ uncurry $ combined policy1 policy2 siphon
pipei :: Pump ByteString e i -> PipingPolicy e i
pipei (Pump feeder) = PPInput $ \(consumer,cleanup) -> feeder consumer `finally` cleanup
pipeio :: Pump ByteString e i -> Siphon ByteString e a -> PipingPolicy e (i,a)
pipeio (Pump feeder) (runSiphon -> siphonout) = PPInputOutput $ \(consumer,cleanup,producer) ->
(conceit (feeder consumer `finally` cleanup) (siphonout producer))
pipeie :: Pump ByteString e i -> Siphon ByteString e a -> PipingPolicy e (i,a)
pipeie (Pump feeder) (runSiphon -> siphonerr) = PPInputError $ \(consumer,cleanup,producer) ->
(conceit (feeder consumer `finally` cleanup) (siphonerr producer))
pipeioe :: Pump ByteString e i -> Siphon ByteString e a -> Siphon ByteString e b -> PipingPolicy e (i,a,b)
pipeioe (Pump feeder) (runSiphon -> siphonout) (runSiphon -> siphonerr) = fmap flattenTuple $ PPInputOutputError $
\(consumer,cleanup,outprod,errprod) ->
(conceit (feeder consumer `finally` cleanup)
(separated siphonout siphonerr outprod errprod))
where
flattenTuple (i, (a, b)) = (i,a,b)
pipeioec :: Pump ByteString e i -> LinePolicy e -> LinePolicy e -> Siphon Text e a -> PipingPolicy e (i,a)
pipeioec (Pump feeder) policy1 policy2 (runSiphon -> siphon) = PPInputOutputError $
\(consumer,cleanup,outprod,errprod) ->
(conceit (feeder consumer `finally` cleanup)
(combined policy1 policy2 siphon outprod errprod))
separated :: (Producer ByteString IO () -> IO (Either e a))
-> (Producer ByteString IO () -> IO (Either e b))
-> Producer ByteString IO () -> Producer ByteString IO () -> IO (Either e (a,b))
separated outfunc errfunc outprod errprod =
conceit (outfunc outprod) (errfunc errprod)
data LinePolicy e = LinePolicy
{
teardown :: (forall r. Producer T.Text IO r -> Producer T.Text IO r)
-> (FreeT (Producer T.Text IO) IO (Producer ByteString IO ()) -> IO (Producer ByteString IO ()))
-> Producer ByteString IO () -> IO (Either e ())
, lineTweaker :: forall r. Producer T.Text IO r -> Producer T.Text IO r
}
instance Functor LinePolicy where
fmap f (LinePolicy func lt) = LinePolicy (\x y z -> fmap (bimap f id) $ func x y z) lt
tweakLines :: (forall r. Producer T.Text IO r -> Producer T.Text IO r) -> LinePolicy e -> LinePolicy e
tweakLines lt' (LinePolicy tear lt) = LinePolicy tear (lt' . lt)
linePolicy :: DecodingFunction ByteString Text
-> Siphon ByteString e ()
-> LinePolicy e
linePolicy decoder lopo = LinePolicy
(\tweaker teardown producer -> do
let freeLines = transFreeT tweaker
. viewLines
. decoder
$ producer
viewLines = getConst . T.lines Const
teardown freeLines >>= runSiphon lopo)
id
combined :: LinePolicy e
-> LinePolicy e
-> (Producer T.Text IO () -> IO (Either e a))
-> Producer ByteString IO () -> Producer ByteString IO () -> IO (Either e a)
combined (LinePolicy fun1 twk1) (LinePolicy fun2 twk2) combinedConsumer prod1 prod2 =
manyCombined [fmap ($prod1) (fun1 twk1), fmap ($prod2) (fun2 twk2)] combinedConsumer
where
manyCombined :: [(FreeT (Producer T.Text IO) IO (Producer ByteString IO ()) -> IO (Producer ByteString IO ())) -> IO (Either e ())]
-> (Producer T.Text IO () -> IO (Either e a))
-> IO (Either e a)
manyCombined actions consumer = do
(outbox, inbox, seal) <- spawn' Single
mVar <- newMVar outbox
runConceit $
Conceit (mapConceit ($ iterTLines mVar) actions `finally` atomically seal)
*>
Conceit (consumer (fromInput inbox) `finally` atomically seal)
where
iterTLines mvar = iterT $ \textProducer -> do
join $ withMVar mvar $ \output -> do
runEffect $ (textProducer <* P.yield (singleton '\n')) >-> (toOutput output >> P.drain)
fromProducer :: Producer b IO r -> Pump b e ()
fromProducer producer = Pump $ \consumer -> fmap pure $ runEffect (mute producer >-> consumer)
fromSafeProducer :: Producer b (SafeT IO) r -> Pump b e ()
fromSafeProducer producer = Pump $ safely $ \consumer -> fmap pure $ runEffect (mute producer >-> consumer)
fromFallibleProducer :: Producer b (ExceptT e IO) r -> Pump b e ()
fromFallibleProducer producer = Pump $ \consumer -> runExceptT $ runEffect (mute producer >-> hoist lift consumer)
safely :: (MFunctor t, C.MonadMask m, MonadIO m)
=> (t (SafeT m) l -> (SafeT m) x)
-> t m l -> m x
safely activity = runSafeT . activity . hoist lift
type DecodingFunction bytes text = forall r. Producer bytes IO r -> Producer text IO (Producer bytes IO r)
encoded :: DecodingFunction bytes text
-> Siphon bytes e (a -> b)
-> Siphon text e a
-> Siphon bytes e b
encoded decoder policy activity =
Unhalting $ \producer ->
runExceptT $ do
(a,leftovers) <- ExceptT $ unhalting activity $ decoder producer
(f,r) <- ExceptT $ unhalting policy leftovers
pure (f a,r)
newtype Pump b e a = Pump { runPump :: Consumer b IO () -> IO (Either e a) } deriving Functor
instance Bifunctor (Pump b) where
bimap f g (Pump x) = Pump $ fmap (liftM (bimap f g)) x
instance Applicative (Pump b e) where
pure = Pump . pure . pure . pure
Pump fs <*> Pump as =
Pump $ \consumer -> do
(outbox1,inbox1,seal1) <- spawn' Single
(outbox2,inbox2,seal2) <- spawn' Single
runConceit $
Conceit (runExceptT $ do
r1 <- ExceptT $ (fs $ toOutput outbox1)
`finally` atomically seal1
r2 <- ExceptT $ (as $ toOutput outbox2)
`finally` atomically seal2
return $ r1 r2
)
<*
Conceit (do
(runEffect $
(fromInput inbox1 >> fromInput inbox2) >-> consumer)
`finally` atomically seal1
`finally` atomically seal2
runExceptT $ pure ()
)
instance (Monoid a) => Monoid (Pump b e a) where
mempty = Pump . pure . pure . pure $ mempty
mappend s1 s2 = (<>) <$> s1 <*> s2
data Siphon b e a =
Trivial a
| Unhalting (forall r. Producer b IO r -> IO (Either e (a,r)))
| Halting (Producer b IO () -> IO (Either e a))
deriving (Functor)
instance Bifunctor (Siphon b) where
bimap f g s = case s of
Trivial a -> Trivial $ g a
Unhalting u -> Unhalting $ fmap (liftM (bimap f (bimap g id))) u
Halting h -> Halting $ fmap (liftM (bimap f g)) h
instance Applicative (Siphon b e) where
pure = Trivial
s1 <*> s2 = case (s1,s2) of
(Trivial f,_) -> fmap f s2
(_,Trivial a) -> fmap ($ a) s1
(_,_) -> bifurcate (halting s1) (halting s2)
where
bifurcate fs as =
Unhalting $ \producer -> do
(outbox1,inbox1,seal1) <- spawn' Single
(outbox2,inbox2,seal2) <- spawn' Single
runConceit $
(,)
<$>
Conceit (fmap (uncurry ($)) <$> conceit ((fs $ fromInput inbox1)
`finally` atomically seal1)
((as $ fromInput inbox2)
`finally` atomically seal2)
)
<*>
Conceit ((fmap pure $ runEffect $
producer >-> P.tee (toOutput outbox1 >> P.drain)
>-> (toOutput outbox2 >> P.drain))
`finally` atomically seal1 `finally` atomically seal2
)
runSiphon :: Siphon b e a -> Producer b IO () -> IO (Either e a)
runSiphon s = case s of
h@(Halting _) -> halting $ Unhalting $ unhalting h
_ -> halting s
halting :: Siphon b e a -> Producer b IO () -> IO (Either e a)
halting s = case s of
a@(Trivial _) -> halting $ Unhalting $ unhalting a
Unhalting u -> \producer -> liftM (fmap fst) $ u producer
Halting h -> h
unhalting :: Siphon b e a -> Producer b IO r -> IO (Either e (a,r))
unhalting s = case s of
Trivial a -> \producer -> do
r <- (runEffect $ producer >-> P.drain)
pure . pure $ (a,r)
Unhalting u -> u
Halting activity -> \producer -> do
(outbox,inbox,seal) <- spawn' Single
runConceit $
(,)
<$>
Conceit (activity (fromInput inbox) `finally` atomically seal)
<*>
Conceit ((fmap pure $ runEffect $
producer >-> (toOutput outbox >> P.drain))
`finally` atomically seal
)
instance (Monoid a) => Monoid (Siphon b e a) where
mempty = pure mempty
mappend s1 s2 = (<>) <$> s1 <*> s2
fromConsumer :: Consumer b IO r -> Siphon b e ()
fromConsumer consumer = siphon $ \producer -> fmap pure $ runEffect $ producer >-> mute consumer
fromSafeConsumer :: Consumer b (SafeT IO) r -> Siphon b e ()
fromSafeConsumer consumer = siphon $ safely $ \producer -> fmap pure $ runEffect $ producer >-> mute consumer
fromFallibleConsumer :: Consumer b (ExceptT e IO) r -> Siphon b e ()
fromFallibleConsumer consumer = siphon $ \producer -> runExceptT $ runEffect (hoist lift producer >-> mute consumer)
fromParser :: Parser b IO (Either e a) -> Siphon b e a
fromParser parser = siphon $ Pipes.Parse.evalStateT parser
siphon :: (Producer b IO () -> IO (Either e a))
-> Siphon b e a
siphon = Halting
siphon' :: (forall r. Producer b IO r -> IO (Either e (a,r))) -> Siphon b e a
siphon' = Unhalting
fromFold :: (Producer b IO () -> IO a) -> Siphon b e a
fromFold aFold = siphon $ fmap (fmap pure) $ aFold
fromFold' :: (forall r. Producer b IO r -> IO (a,r)) -> Siphon b e a
fromFold' aFold = siphon' $ fmap (fmap pure) aFold
fromFold'_ :: (forall r. Producer b IO r -> IO r) -> Siphon b e ()
fromFold'_ aFold = fromFold' $ fmap (fmap ((,) ())) aFold
unwanted :: a -> Siphon b b a
unwanted a = Unhalting $ \producer -> do
n <- next producer
return $ case n of
Left r -> Right (a,r)
Right (b,_) -> Left b
executePipeline :: PipingPolicy Void a -> Tree (Stage Void) -> IO a
executePipeline pp pipeline = either absurd id <$> executePipelineFallibly pp pipeline
executePipelineFallibly :: PipingPolicy e a -> Tree (Stage e) -> IO (Either e a)
executePipelineFallibly policy (Node (Stage cp lpol ecpol _) []) = case policy of
PPNone a -> blende ecpol <$> executeFallibly policy cp
PPOutput action -> blende ecpol <$> executeFallibly policy cp
PPError action -> do
(eoutbox, einbox, eseal) <- spawn' Single
errf <- errorSiphonUTF8 <$> newMVar eoutbox
runConceit $
(Conceit $ action $ fromInput einbox)
<*
(Conceit $ blende ecpol <$> executeFallibly (pipee (errf lpol)) cp `finally` atomically eseal)
PPOutputError action -> do
(outbox, inbox, seal) <- spawn' Single
(eoutbox, einbox, eseal) <- spawn' Single
errf <- errorSiphonUTF8 <$> newMVar eoutbox
runConceit $
(Conceit $ action $ (fromInput inbox,fromInput einbox))
<*
(Conceit $ blende ecpol <$> executeFallibly
(pipeoe (fromConsumer.toOutput $ outbox) (errf lpol)) cp
`finally` atomically seal `finally` atomically eseal
)
PPInput action -> blende ecpol <$> executeFallibly policy cp
PPInputOutput action -> blende ecpol <$> executeFallibly policy cp
PPInputError action -> do
(outbox, inbox, seal) <- spawn' Single
(eoutbox, einbox, eseal) <- spawn' Single
errf <- errorSiphonUTF8 <$> newMVar eoutbox
runConceit $
(Conceit $ action (toOutput outbox,atomically seal,fromInput einbox))
<*
(Conceit $ blende ecpol <$> executeFallibly
(pipeie (fromProducer . fromInput $ inbox) (errf lpol)) cp
`finally` atomically seal `finally` atomically eseal
)
PPInputOutputError action -> do
(ioutbox, iinbox, iseal) <- spawn' Single
(ooutbox, oinbox, oseal) <- spawn' Single
(eoutbox, einbox, eseal) <- spawn' Single
errf <- errorSiphonUTF8 <$> newMVar eoutbox
runConceit $
(Conceit $ action (toOutput ioutbox,atomically iseal,fromInput oinbox,fromInput einbox))
<*
(Conceit $ blende ecpol <$> executeFallibly
(pipeioe (fromProducer . fromInput $ iinbox)
(fromConsumer . toOutput $ ooutbox)
(errf lpol)
)
cp
`finally` atomically iseal `finally` atomically oseal `finally` atomically eseal
)
executePipelineFallibly policy (Node s (s':ss)) =
let pipeline = CreatePipeline s $ s' :| ss
in case policy of
PPNone a -> fmap (fmap (const a)) $
executePipelineInternal
(\o _ -> mute $ pipeo o)
(\i o _ -> mute $ pipeio i o)
(\i _ -> mute $ pipei i)
(\i _ -> mute $ pipei i)
pipeline
PPOutput action -> do
(outbox, inbox, seal) <- spawn' Single
runConceit $
(Conceit $ action $ fromInput inbox)
<*
(Conceit $ executePipelineInternal
(\o _ -> pipeo o)
(\i o _ -> mute $ pipeio i o)
(\i _ -> mute $ pipeio i (fromConsumer . toOutput $ outbox))
(\i _ -> mute $ pipei i)
pipeline
`finally` atomically seal
)
PPError action -> do
(eoutbox, einbox, eseal) <- spawn' Single
errf <- errorSiphonUTF8 <$> newMVar eoutbox
runConceit $
(Conceit $ action $ fromInput einbox)
<*
(Conceit $ executePipelineInternal
(\o l -> mute $ pipeoe o (errf l))
(\i o l -> mute $ pipeioe i o (errf l))
(\i l -> mute $ pipeie i (errf l))
(\i l -> mute $ pipeie i (errf l))
pipeline
`finally` atomically eseal)
PPOutputError action -> do
(outbox, inbox, seal) <- spawn' Single
(eoutbox, einbox, eseal) <- spawn' Single
errf <- errorSiphonUTF8 <$> newMVar eoutbox
runConceit $
(Conceit $ action $ (fromInput inbox,fromInput einbox))
<*
(Conceit $ executePipelineInternal
(\o l -> mute $ pipeoe o (errf l))
(\i o l -> mute $ pipeioe i o (errf l))
(\i l -> mute $ pipeioe i (fromConsumer . toOutput $ outbox) (errf l))
(\i l -> mute $ pipeie i (errf l))
pipeline
`finally` atomically seal `finally` atomically eseal
)
PPInput action -> do
(outbox, inbox, seal) <- spawn' Single
runConceit $
(Conceit $ action (toOutput outbox,atomically seal))
<*
(Conceit $ executePipelineInternal
(\o _ -> mute $ pipeio (fromProducer . fromInput $ inbox) o)
(\i o _ -> mute $ pipeio i o)
(\i _ -> mute $ pipei i)
(\i _ -> mute $ pipei i)
pipeline
`finally` atomically seal
)
PPInputOutput action -> do
(ioutbox, iinbox, iseal) <- spawn' Single
(ooutbox, oinbox, oseal) <- spawn' Single
runConceit $
(Conceit $ action (toOutput ioutbox,atomically iseal,fromInput oinbox))
<*
(Conceit $ executePipelineInternal
(\o _ -> mute $ pipeio (fromProducer . fromInput $ iinbox) o)
(\i o _ -> mute $ pipeio i o)
(\i _ -> mute $ pipeio i (fromConsumer . toOutput $ ooutbox))
(\i _ -> mute $ pipei i)
pipeline
`finally` atomically iseal `finally` atomically oseal
)
PPInputError action -> do
(outbox, inbox, seal) <- spawn' Single
(eoutbox, einbox, eseal) <- spawn' Single
errf <- errorSiphonUTF8 <$> newMVar eoutbox
runConceit $
(Conceit $ action (toOutput outbox,atomically seal,fromInput einbox))
<*
(Conceit $ executePipelineInternal
(\o l -> mute $ pipeioe (fromProducer . fromInput $ inbox) o (errf l))
(\i o l -> mute $ pipeioe i o (errf l))
(\i l -> mute $ pipeie i (errf l))
(\i l -> mute $ pipeie i (errf l))
pipeline
`finally` atomically seal `finally` atomically eseal
)
PPInputOutputError action -> do
(ioutbox, iinbox, iseal) <- spawn' Single
(ooutbox, oinbox, oseal) <- spawn' Single
(eoutbox, einbox, eseal) <- spawn' Single
errf <- errorSiphonUTF8 <$> newMVar eoutbox
runConceit $
(Conceit $ action (toOutput ioutbox,atomically iseal,fromInput oinbox,fromInput einbox))
<*
(Conceit $ executePipelineInternal
(\o l -> mute $ pipeioe (fromProducer . fromInput $ iinbox) o (errf l))
(\i o l -> mute $ pipeioe i o (errf l))
(\i l -> mute $ pipeioe i (fromConsumer . toOutput $ ooutbox) (errf l))
(\i l -> mute $ pipeie i (errf l))
pipeline
`finally` atomically iseal `finally` atomically oseal `finally` atomically eseal
)
errorSiphonUTF8 :: MVar (Output ByteString) -> LinePolicy e -> Siphon ByteString e ()
errorSiphonUTF8 mvar (LinePolicy fun twk) = Halting $ fun twk iterTLines
where
iterTLines = iterT $ \textProducer -> do
join $ withMVar mvar $ \output -> do
runEffect $ (textProducer <* P.yield (singleton '\n'))
>-> P.map Data.Text.Encoding.encodeUtf8
>-> (toOutput output >> P.drain)
mute :: Functor f => f a -> f ()
mute = fmap (const ())
data Stage e = Stage
{
processDefinition' :: CreateProcess
, stderrLinePolicy' :: LinePolicy e
, exitCodePolicy' :: ExitCode -> Either e ()
, inbound' :: forall r. Producer ByteString IO r -> Producer ByteString (ExceptT e IO) r
}
instance Functor (Stage) where
fmap f (Stage a b c d) = Stage a (fmap f b) (bimap f id . c) (hoist (mapExceptT $ liftM (bimap f id)) . d)
stage :: LinePolicy e -> (ExitCode -> Either e ()) -> CreateProcess -> Stage e
stage lp ec cp = Stage cp lp ec (hoist lift)
inbound :: (forall r. Producer ByteString (ExceptT e IO) r -> Producer ByteString (ExceptT e IO) r)
-> Stage e -> Stage e
inbound f (Stage a b c d) = Stage a b c (f . d)
data CreatePipeline e = CreatePipeline (Stage e) (NonEmpty (Tree (Stage e))) deriving (Functor)
executePipelineInternal :: (Siphon ByteString e () -> LinePolicy e -> PipingPolicy e ())
-> (Pump ByteString e () -> Siphon ByteString e () -> LinePolicy e -> PipingPolicy e ())
-> (Pump ByteString e () -> LinePolicy e -> PipingPolicy e ())
-> (Pump ByteString e () -> LinePolicy e -> PipingPolicy e ())
-> CreatePipeline e
-> IO (Either e ())
executePipelineInternal ppinitial ppmiddle ppend ppend' (CreatePipeline (Stage cp lpol ecpol _) a) =
blende ecpol <$> executeFallibly (ppinitial (runNonEmpty ppend ppend' a) lpol) cp
where
runTree ppend ppend' (Node (Stage cp lpol ecpol pipe) forest) = case forest of
[] -> Halting $ \producer ->
blende ecpol <$> executeFallibly (ppend (fromFallibleProducer $ pipe producer) lpol) cp
c1 : cs -> Halting $ \producer ->
blende ecpol <$> executeFallibly (ppmiddle (fromFallibleProducer $ pipe producer) (runNonEmpty ppend ppend' (c1 :| cs)) lpol) cp
runNonEmpty ppend ppend' (b :| bs) =
runTree ppend ppend' b <* Prelude.foldr (<*) (pure ()) (runTree ppend' ppend' <$> bs)
blende :: (ExitCode -> Either e ()) -> Either e (ExitCode,a) -> Either e a
blende f r = r >>= \(ec,a) -> f ec *> pure a
pipefail :: ExitCode -> Either Int ()
pipefail ec = case ec of
ExitSuccess -> Right ()
ExitFailure i -> Left i