module System.Process.Streaming (
execute
, executeFallibly
, PipingPolicy
, nopiping
, pipeo
, pipee
, pipeoe
, pipeoec
, pipei
, pipeio
, pipeie
, pipeioe
, pipeioec
, Pump (..)
, fromProducer
, fromSafeProducer
, fromFallibleProducer
, Siphon
, siphon
, siphon'
, fromFold
, fromFold'
, fromFold'_
, fromConsumer
, fromSafeConsumer
, fromFallibleConsumer
, fromParser
, unwanted
, DecodingFunction
, encoded
, LinePolicy
, linePolicy
, tweakLines
, executePipeline
, executePipelineFallibly
, Stage
, stage
, pipefail
, inbound
, module System.Process
) where
import Data.Maybe
import Data.Bifunctor
import Data.Functor.Identity
import Data.Either
import Data.Monoid
import Data.Foldable
import Data.Traversable
import Data.Typeable
import Data.Tree
import Data.Text
import Data.Text.Encoding
import Data.Void
import Data.List.NonEmpty
import qualified Data.List.NonEmpty as N
import Control.Applicative
import Control.Monad
import Control.Monad.Trans.Free
import Control.Monad.Trans.Except
import Control.Monad.Trans.State
import Control.Monad.Trans.Writer.Strict
import qualified Control.Monad.Catch as C
import Control.Exception
import Control.Concurrent
import Control.Concurrent.Async
import Pipes
import qualified Pipes as P
import qualified Pipes.Prelude as P
import Pipes.Lift
import Pipes.ByteString
import Pipes.Parse
import qualified Pipes.Text as T
import Pipes.Concurrent
import Pipes.Safe (SafeT, runSafeT)
import System.IO
import System.IO.Error
import System.Process
import System.Process.Lens
import System.Exit
execute :: PipingPolicy Void a -> CreateProcess -> IO (ExitCode,a)
execute pp cprocess = either absurd id <$> executeFallibly pp cprocess
executeFallibly :: PipingPolicy e a -> CreateProcess -> IO (Either e (ExitCode,a))
executeFallibly pp record = case pp of
PPNone a -> executeInternal record nohandles $
\() -> (return . Right $ a,return ())
PPOutput action -> executeInternal (record{std_out = CreatePipe}) handleso $
\h->(action (fromHandle h),hClose h)
PPError action -> executeInternal (record{std_err = CreatePipe}) handlese $
\h->(action (fromHandle h),hClose h)
PPOutputError action -> executeInternal (record{std_out = CreatePipe, std_err = CreatePipe}) handlesoe $
\(hout,herr)->(action (fromHandle hout,fromHandle herr),hClose hout `finally` hClose herr)
PPInput action -> executeInternal (record{std_in = CreatePipe}) handlesi $
\h -> (action (toHandle h, hClose h), return ())
PPInputOutput action -> executeInternal (record{std_in = CreatePipe,std_out = CreatePipe}) handlesio $
\(hin,hout) -> (action (toHandle hin,hClose hin,fromHandle hout), hClose hout)
PPInputError action -> executeInternal (record{std_in = CreatePipe,std_err = CreatePipe}) handlesie $
\(hin,herr) -> (action (toHandle hin,hClose hin,fromHandle herr), hClose herr)
PPInputOutputError action -> executeInternal (record{std_in = CreatePipe, std_out = CreatePipe, std_err = CreatePipe}) handlesioe $
\(hin,hout,herr) -> (action (toHandle hin,hClose hin,fromHandle hout,fromHandle herr), hClose hout `finally` hClose herr)
executeInternal :: CreateProcess -> (forall m. Applicative m => (t -> m t) -> (Maybe Handle, Maybe Handle, Maybe Handle) -> m (Maybe Handle, Maybe Handle, Maybe Handle)) -> (t ->(IO (Either e a),IO ())) -> IO (Either e (ExitCode,a))
executeInternal record somePrism allocator = mask $ \restore -> do
(min,mout,merr,phandle) <- createProcess record
case getFirst . getConst . somePrism (Const . First . Just) $ (min,mout,merr) of
Nothing ->
throwIO (userError "stdin/stdout/stderr handle unwantedly null")
`finally`
terminateCarefully phandle
Just t ->
let (action,cleanup) = allocator t in
(restore (terminateOnError phandle action) `onException` terminateCarefully phandle) `finally` cleanup
exitCode :: (ExitCode,a) -> Either Int a
exitCode (ec,a) = case ec of
ExitSuccess -> Right a
ExitFailure i -> Left i
terminateCarefully :: ProcessHandle -> IO ()
terminateCarefully pHandle = do
mExitCode <- getProcessExitCode pHandle
case mExitCode of
Nothing -> terminateProcess pHandle
Just _ -> return ()
terminateOnError :: ProcessHandle
-> IO (Either e a)
-> IO (Either e (ExitCode,a))
terminateOnError pHandle action = do
result <- action
case result of
Left e -> do
terminateCarefully pHandle
return $ Left e
Right r -> do
exitCode <- waitForProcess pHandle
return $ Right (exitCode,r)
data PipingPolicy e a =
PPNone a
| PPOutput (Producer ByteString IO () -> IO (Either e a))
| PPError (Producer ByteString IO () -> IO (Either e a))
| PPOutputError ((Producer ByteString IO (),Producer ByteString IO ()) -> IO (Either e a))
| PPInput ((Consumer ByteString IO (), IO ()) -> IO (Either e a))
| PPInputOutput ((Consumer ByteString IO (), IO (),Producer ByteString IO ()) -> IO (Either e a))
| PPInputError ((Consumer ByteString IO (), IO (), Producer ByteString IO ()) -> IO (Either e a))
| PPInputOutputError ((Consumer ByteString IO (),IO (),Producer ByteString IO (),Producer ByteString IO ()) -> IO (Either e a))
deriving (Functor)
instance Bifunctor PipingPolicy where
bimap f g pp = case pp of
PPNone a -> PPNone $ g a
PPOutput action -> PPOutput $ fmap (fmap (bimap f g)) action
PPError action -> PPError $ fmap (fmap (bimap f g)) action
PPOutputError action -> PPOutputError $ fmap (fmap (bimap f g)) action
PPInput action -> PPInput $ fmap (fmap (bimap f g)) action
PPInputOutput action -> PPInputOutput $ fmap (fmap (bimap f g)) action
PPInputError action -> PPInputError $ fmap (fmap (bimap f g)) action
PPInputOutputError action -> PPInputOutputError $ fmap (fmap (bimap f g)) action
nopiping :: PipingPolicy e ()
nopiping = PPNone ()
pipeo :: (Show e,Typeable e) => Siphon ByteString e a -> PipingPolicy e a
pipeo (runSiphon -> siphonout) = PPOutput $ siphonout
pipee :: (Show e,Typeable e) => Siphon ByteString e a -> PipingPolicy e a
pipee (runSiphon -> siphonout) = PPError $ siphonout
pipeoe :: (Show e,Typeable e) => Siphon ByteString e a -> Siphon ByteString e b -> PipingPolicy e (a,b)
pipeoe (runSiphon -> siphonout) (runSiphon -> siphonerr) =
PPOutputError $ uncurry $ separated siphonout siphonerr
pipeoec :: (Show e,Typeable e) => LinePolicy e -> LinePolicy e -> Siphon Text e a -> PipingPolicy e a
pipeoec policy1 policy2 (runSiphon -> siphon) =
PPOutputError $ uncurry $ combined policy1 policy2 siphon
pipei :: (Show e, Typeable e) => Pump ByteString e i -> PipingPolicy e i
pipei (Pump feeder) = PPInput $ \(consumer,cleanup) -> feeder consumer `finally` cleanup
pipeio :: (Show e, Typeable e)
=> Pump ByteString e i -> Siphon ByteString e a -> PipingPolicy e (i,a)
pipeio (Pump feeder) (runSiphon -> siphonout) = PPInputOutput $ \(consumer,cleanup,producer) ->
(conceit (feeder consumer `finally` cleanup) (siphonout producer))
pipeie :: (Show e, Typeable e)
=> Pump ByteString e i -> Siphon ByteString e a -> PipingPolicy e (i,a)
pipeie (Pump feeder) (runSiphon -> siphonerr) = PPInputError $ \(consumer,cleanup,producer) ->
(conceit (feeder consumer `finally` cleanup) (siphonerr producer))
pipeioe :: (Show e, Typeable e)
=> Pump ByteString e i -> Siphon ByteString e a -> Siphon ByteString e b -> PipingPolicy e (i,a,b)
pipeioe (Pump feeder) (runSiphon -> siphonout) (runSiphon -> siphonerr) = fmap flattenTuple $ PPInputOutputError $
\(consumer,cleanup,outprod,errprod) ->
(conceit (feeder consumer `finally` cleanup)
(separated siphonout siphonerr outprod errprod))
where
flattenTuple (i, (a, b)) = (i,a,b)
pipeioec :: (Show e, Typeable e)
=> Pump ByteString e i -> LinePolicy e -> LinePolicy e -> Siphon Text e a -> PipingPolicy e (i,a)
pipeioec (Pump feeder) policy1 policy2 (runSiphon -> siphon) = PPInputOutputError $
\(consumer,cleanup,outprod,errprod) ->
(conceit (feeder consumer `finally` cleanup)
(combined policy1 policy2 siphon outprod errprod))
separated :: (Show e, Typeable e)
=> (Producer ByteString IO () -> IO (Either e a))
-> (Producer ByteString IO () -> IO (Either e b))
-> Producer ByteString IO () -> Producer ByteString IO () -> IO (Either e (a,b))
separated outfunc errfunc outprod errprod =
conceit (outfunc outprod) (errfunc errprod)
data LinePolicy e = LinePolicy
{
teardown :: (forall r. Producer T.Text IO r -> Producer T.Text IO r)
-> (FreeT (Producer T.Text IO) IO (Producer ByteString IO ()) -> IO (Producer ByteString IO ()))
-> Producer ByteString IO () -> IO (Either e ())
, lineTweaker :: forall r. Producer T.Text IO r -> Producer T.Text IO r
}
instance Functor LinePolicy where
fmap f (LinePolicy func lt) = LinePolicy (\x y z -> fmap (bimap f id) $ func x y z) lt
tweakLines :: (forall r. Producer T.Text IO r -> Producer T.Text IO r) -> LinePolicy e -> LinePolicy e
tweakLines lt' (LinePolicy tear lt) = LinePolicy tear (lt' . lt)
linePolicy :: (Show e, Typeable e)
=> DecodingFunction ByteString Text
-> Siphon ByteString e ()
-> LinePolicy e
linePolicy decoder lopo = LinePolicy
(\tweaker teardown producer -> do
let freeLines = transFreeT tweaker
. viewLines
. decoder
$ producer
viewLines = getConst . T.lines Const
teardown freeLines >>= runSiphon lopo)
id
combined :: (Show e, Typeable e)
=> LinePolicy e
-> LinePolicy e
-> (Producer T.Text IO () -> IO (Either e a))
-> Producer ByteString IO () -> Producer ByteString IO () -> IO (Either e a)
combined (LinePolicy fun1 twk1) (LinePolicy fun2 twk2) combinedConsumer prod1 prod2 =
manyCombined [fmap ($prod1) (fun1 twk1), fmap ($prod2) (fun2 twk2)] combinedConsumer
where
manyCombined :: (Show e, Typeable e)
=> [(FreeT (Producer T.Text IO) IO (Producer ByteString IO ()) -> IO (Producer ByteString IO ())) -> IO (Either e ())]
-> (Producer T.Text IO () -> IO (Either e a))
-> IO (Either e a)
manyCombined actions consumer = do
(outbox, inbox, seal) <- spawn' Single
mVar <- newMVar outbox
runConceit $
Conceit (mapConceit ($ iterTLines mVar) actions `finally` atomically seal)
*>
Conceit (consumer (fromInput inbox) `finally` atomically seal)
where
iterTLines mvar = iterT $ \textProducer -> do
join $ withMVar mvar $ \output -> do
runEffect $ (textProducer <* P.yield (singleton '\n')) >-> (toOutput output >> P.drain)
fromProducer :: Producer b IO r -> Pump b e ()
fromProducer producer = Pump $ \consumer -> fmap pure $ runEffect (mute producer >-> consumer)
fromSafeProducer :: Producer b (SafeT IO) r -> Pump b e ()
fromSafeProducer producer = Pump $ safely $ \consumer -> fmap pure $ runEffect (mute producer >-> consumer)
fromFallibleProducer :: Producer b (ExceptT e IO) r -> Pump b e ()
fromFallibleProducer producer = Pump $ \consumer -> runExceptT $ runEffect (mute producer >-> hoist lift consumer)
safely :: (MFunctor t, C.MonadMask m, MonadIO m)
=> (t (SafeT m) l -> (SafeT m) x)
-> t m l -> m x
safely activity = runSafeT . activity . hoist lift
type DecodingFunction bytes text = forall r. Producer bytes IO r -> Producer text IO (Producer bytes IO r)
encoded :: (Show e, Typeable e)
=> DecodingFunction bytes text
-> Siphon bytes e (a -> b)
-> Siphon text e a
-> Siphon bytes e b
encoded decoder policy activity =
Unhalting $ \producer ->
runExceptT $ do
(a,leftovers) <- ExceptT $ unhalting activity $ decoder producer
(f,r) <- ExceptT $ unhalting policy leftovers
pure (f a,r)
data WrappedError e = WrappedError e
deriving (Show, Typeable)
instance (Show e, Typeable e) => Exception (WrappedError e)
elideError :: (Show e, Typeable e) => IO (Either e a) -> IO a
elideError action = action >>= either (throwIO . WrappedError) return
revealError :: (Show e, Typeable e) => IO a -> IO (Either e a)
revealError action = catch (action >>= return . Right)
(\(WrappedError e) -> return . Left $ e)
newtype Conceit e a = Conceit { runConceit :: IO (Either e a) } deriving Functor
instance Bifunctor Conceit where
bimap f g (Conceit x) = Conceit $ liftM (bimap f g) x
instance (Show e, Typeable e) => Applicative (Conceit e) where
pure = Conceit . pure . pure
Conceit fs <*> Conceit as =
Conceit . revealError $
uncurry ($) <$> concurrently (elideError fs) (elideError as)
instance (Show e, Typeable e) => Alternative (Conceit e) where
empty = Conceit $ forever (threadDelay maxBound)
Conceit as <|> Conceit bs =
Conceit $ either id id <$> race as bs
instance (Show e, Typeable e, Monoid a) => Monoid (Conceit e a) where
mempty = Conceit . pure . pure $ mempty
mappend c1 c2 = (<>) <$> c1 <*> c2
conceit :: (Show e, Typeable e)
=> IO (Either e a)
-> IO (Either e b)
-> IO (Either e (a,b))
conceit c1 c2 = runConceit $ (,) <$> Conceit c1 <*> Conceit c2
mapConceit :: (Show e, Typeable e, Traversable t) => (a -> IO (Either e b)) -> t a -> IO (Either e (t b))
mapConceit f = revealError . mapConcurrently (elideError . f)
newtype Pump b e a = Pump { runPump :: Consumer b IO () -> IO (Either e a) } deriving Functor
instance Bifunctor (Pump b) where
bimap f g (Pump x) = Pump $ fmap (liftM (bimap f g)) x
instance (Show e, Typeable e) => Applicative (Pump b e) where
pure = Pump . pure . pure . pure
Pump fs <*> Pump as =
Pump $ \consumer -> do
(outbox1,inbox1,seal1) <- spawn' Single
(outbox2,inbox2,seal2) <- spawn' Single
runConceit $
Conceit (runExceptT $ do
r1 <- ExceptT $ (fs $ toOutput outbox1)
`finally` atomically seal1
r2 <- ExceptT $ (as $ toOutput outbox2)
`finally` atomically seal2
return $ r1 r2
)
<*
Conceit (do
(runEffect $
(fromInput inbox1 >> fromInput inbox2) >-> consumer)
`finally` atomically seal1
`finally` atomically seal2
runExceptT $ pure ()
)
instance (Show e, Typeable e, Monoid a) => Monoid (Pump b e a) where
mempty = Pump . pure . pure . pure $ mempty
mappend s1 s2 = (<>) <$> s1 <*> s2
data Siphon b e a =
Trivial a
| Unhalting (forall r. Producer b IO r -> IO (Either e (a,r)))
| Halting (Producer b IO () -> IO (Either e a))
deriving (Functor)
instance Bifunctor (Siphon b) where
bimap f g s = case s of
Trivial a -> Trivial $ g a
Unhalting u -> Unhalting $ fmap (liftM (bimap f (bimap g id))) u
Halting h -> Halting $ fmap (liftM (bimap f g)) h
instance (Show e, Typeable e) => Applicative (Siphon b e) where
pure = Trivial
s1 <*> s2 = case (s1,s2) of
(Trivial f,_) -> fmap f s2
(_,Trivial a) -> fmap ($ a) s1
(_,_) -> bifurcate (halting s1) (halting s2)
where
bifurcate fs as =
Unhalting $ \producer -> do
(outbox1,inbox1,seal1) <- spawn' Single
(outbox2,inbox2,seal2) <- spawn' Single
runConceit $
(,)
<$>
Conceit (fmap (uncurry ($)) <$> conceit ((fs $ fromInput inbox1)
`finally` atomically seal1)
((as $ fromInput inbox2)
`finally` atomically seal2)
)
<*>
Conceit ((fmap pure $ runEffect $
producer >-> P.tee (toOutput outbox1 >> P.drain)
>-> (toOutput outbox2 >> P.drain))
`finally` atomically seal1 `finally` atomically seal2
)
runSiphon :: (Show e, Typeable e) => Siphon b e a -> Producer b IO () -> IO (Either e a)
runSiphon s = case s of
h@(Halting _) -> halting $ Unhalting $ unhalting h
_ -> halting s
halting :: (Show e, Typeable e) => Siphon b e a -> Producer b IO () -> IO (Either e a)
halting s = case s of
a@(Trivial _) -> halting $ Unhalting $ unhalting a
Unhalting u -> \producer -> liftM (fmap fst) $ u producer
Halting h -> h
unhalting :: (Show e, Typeable e) => Siphon b e a -> Producer b IO r -> IO (Either e (a,r))
unhalting s = case s of
Trivial a -> \producer -> do
r <- (runEffect $ producer >-> P.drain)
pure . pure $ (a,r)
Unhalting u -> u
Halting activity -> \producer -> do
(outbox,inbox,seal) <- spawn' Single
runConceit $
(,)
<$>
Conceit (activity (fromInput inbox) `finally` atomically seal)
<*>
Conceit ((fmap pure $ runEffect $
producer >-> (toOutput outbox >> P.drain))
`finally` atomically seal
)
instance (Show e, Typeable e, Monoid a) => Monoid (Siphon b e a) where
mempty = pure mempty
mappend s1 s2 = (<>) <$> s1 <*> s2
fromConsumer :: Consumer b IO r -> Siphon b e ()
fromConsumer consumer = siphon $ \producer -> fmap pure $ runEffect $ producer >-> mute consumer
fromSafeConsumer :: Consumer b (SafeT IO) r -> Siphon b e ()
fromSafeConsumer consumer = siphon $ safely $ \producer -> fmap pure $ runEffect $ producer >-> mute consumer
fromFallibleConsumer :: Consumer b (ExceptT e IO) r -> Siphon b e ()
fromFallibleConsumer consumer = siphon $ \producer -> runExceptT $ runEffect (hoist lift producer >-> mute consumer)
fromParser :: Parser b IO (Either e a) -> Siphon b e a
fromParser parser = siphon $ Pipes.Parse.evalStateT parser
siphon :: (Producer b IO () -> IO (Either e a))
-> Siphon b e a
siphon = Halting
siphon' :: (forall r. Producer b IO r -> IO (Either e (a,r))) -> Siphon b e a
siphon' = Unhalting
fromFold :: (Producer b IO () -> IO a) -> Siphon b e a
fromFold aFold = siphon $ fmap (fmap pure) $ aFold
fromFold' :: (forall r. Producer b IO r -> IO (a,r)) -> Siphon b e a
fromFold' aFold = siphon' $ fmap (fmap pure) aFold
fromFold'_ :: (forall r. Producer b IO r -> IO r) -> Siphon b e ()
fromFold'_ aFold = fromFold' $ fmap (fmap ((,) ())) aFold
unwanted :: a -> Siphon b b a
unwanted a = Unhalting $ \producer -> do
n <- next producer
return $ case n of
Left r -> Right (a,r)
Right (b,_) -> Left b
executePipeline :: PipingPolicy Void a -> Tree (Stage Void) -> IO a
executePipeline pp pipeline = either absurd id <$> executePipelineFallibly pp pipeline
executePipelineFallibly :: (Show e,Typeable e) => PipingPolicy e a -> Tree (Stage e) -> IO (Either e a)
executePipelineFallibly policy (Node (Stage cp lpol ecpol _) []) = case policy of
PPNone a -> blende ecpol <$> executeFallibly policy cp
PPOutput action -> blende ecpol <$> executeFallibly policy cp
PPError action -> do
(eoutbox, einbox, eseal) <- spawn' Single
errf <- errorSiphonUTF8 <$> newMVar eoutbox
runConceit $
(Conceit $ action $ fromInput einbox)
<*
(Conceit $ blende ecpol <$> executeFallibly (pipee (errf lpol)) cp `finally` atomically eseal)
PPOutputError action -> do
(outbox, inbox, seal) <- spawn' Single
(eoutbox, einbox, eseal) <- spawn' Single
errf <- errorSiphonUTF8 <$> newMVar eoutbox
runConceit $
(Conceit $ action $ (fromInput inbox,fromInput einbox))
<*
(Conceit $ blende ecpol <$> executeFallibly
(pipeoe (fromConsumer.toOutput $ outbox) (errf lpol)) cp
`finally` atomically seal `finally` atomically eseal
)
PPInput action -> blende ecpol <$> executeFallibly policy cp
PPInputOutput action -> blende ecpol <$> executeFallibly policy cp
PPInputError action -> do
(outbox, inbox, seal) <- spawn' Single
(eoutbox, einbox, eseal) <- spawn' Single
errf <- errorSiphonUTF8 <$> newMVar eoutbox
runConceit $
(Conceit $ action (toOutput outbox,atomically seal,fromInput einbox))
<*
(Conceit $ blende ecpol <$> executeFallibly
(pipeie (fromProducer . fromInput $ inbox) (errf lpol)) cp
`finally` atomically seal `finally` atomically eseal
)
PPInputOutputError action -> do
(ioutbox, iinbox, iseal) <- spawn' Single
(ooutbox, oinbox, oseal) <- spawn' Single
(eoutbox, einbox, eseal) <- spawn' Single
errf <- errorSiphonUTF8 <$> newMVar eoutbox
runConceit $
(Conceit $ action (toOutput ioutbox,atomically iseal,fromInput oinbox,fromInput einbox))
<*
(Conceit $ blende ecpol <$> executeFallibly
(pipeioe (fromProducer . fromInput $ iinbox)
(fromConsumer . toOutput $ ooutbox)
(errf lpol)
)
cp
`finally` atomically iseal `finally` atomically oseal `finally` atomically eseal
)
executePipelineFallibly policy (Node s (s':ss)) =
let pipeline = CreatePipeline s $ s' :| ss
in case policy of
PPNone a -> fmap (fmap (const a)) $
executePipelineInternal
(\o _ -> mute $ pipeo o)
(\i o _ -> mute $ pipeio i o)
(\i _ -> mute $ pipei i)
(\i _ -> mute $ pipei i)
pipeline
PPOutput action -> do
(outbox, inbox, seal) <- spawn' Single
runConceit $
(Conceit $ action $ fromInput inbox)
<*
(Conceit $ executePipelineInternal
(\o _ -> pipeo o)
(\i o _ -> mute $ pipeio i o)
(\i _ -> mute $ pipeio i (fromConsumer . toOutput $ outbox))
(\i _ -> mute $ pipei i)
pipeline
`finally` atomically seal
)
PPError action -> do
(eoutbox, einbox, eseal) <- spawn' Single
errf <- errorSiphonUTF8 <$> newMVar eoutbox
runConceit $
(Conceit $ action $ fromInput einbox)
<*
(Conceit $ executePipelineInternal
(\o l -> mute $ pipeoe o (errf l))
(\i o l -> mute $ pipeioe i o (errf l))
(\i l -> mute $ pipeie i (errf l))
(\i l -> mute $ pipeie i (errf l))
pipeline
`finally` atomically eseal)
PPOutputError action -> do
(outbox, inbox, seal) <- spawn' Single
(eoutbox, einbox, eseal) <- spawn' Single
errf <- errorSiphonUTF8 <$> newMVar eoutbox
runConceit $
(Conceit $ action $ (fromInput inbox,fromInput einbox))
<*
(Conceit $ executePipelineInternal
(\o l -> mute $ pipeoe o (errf l))
(\i o l -> mute $ pipeioe i o (errf l))
(\i l -> mute $ pipeioe i (fromConsumer . toOutput $ outbox) (errf l))
(\i l -> mute $ pipeie i (errf l))
pipeline
`finally` atomically seal `finally` atomically eseal
)
PPInput action -> do
(outbox, inbox, seal) <- spawn' Single
runConceit $
(Conceit $ action (toOutput outbox,atomically seal))
<*
(Conceit $ executePipelineInternal
(\o _ -> mute $ pipeio (fromProducer . fromInput $ inbox) o)
(\i o _ -> mute $ pipeio i o)
(\i _ -> mute $ pipei i)
(\i _ -> mute $ pipei i)
pipeline
`finally` atomically seal
)
PPInputOutput action -> do
(ioutbox, iinbox, iseal) <- spawn' Single
(ooutbox, oinbox, oseal) <- spawn' Single
runConceit $
(Conceit $ action (toOutput ioutbox,atomically iseal,fromInput oinbox))
<*
(Conceit $ executePipelineInternal
(\o _ -> mute $ pipeio (fromProducer . fromInput $ iinbox) o)
(\i o _ -> mute $ pipeio i o)
(\i _ -> mute $ pipeio i (fromConsumer . toOutput $ ooutbox))
(\i _ -> mute $ pipei i)
pipeline
`finally` atomically iseal `finally` atomically oseal
)
PPInputError action -> do
(outbox, inbox, seal) <- spawn' Single
(eoutbox, einbox, eseal) <- spawn' Single
errf <- errorSiphonUTF8 <$> newMVar eoutbox
runConceit $
(Conceit $ action (toOutput outbox,atomically seal,fromInput einbox))
<*
(Conceit $ executePipelineInternal
(\o l -> mute $ pipeioe (fromProducer . fromInput $ inbox) o (errf l))
(\i o l -> mute $ pipeioe i o (errf l))
(\i l -> mute $ pipeie i (errf l))
(\i l -> mute $ pipeie i (errf l))
pipeline
`finally` atomically seal `finally` atomically eseal
)
PPInputOutputError action -> do
(ioutbox, iinbox, iseal) <- spawn' Single
(ooutbox, oinbox, oseal) <- spawn' Single
(eoutbox, einbox, eseal) <- spawn' Single
errf <- errorSiphonUTF8 <$> newMVar eoutbox
runConceit $
(Conceit $ action (toOutput ioutbox,atomically iseal,fromInput oinbox,fromInput einbox))
<*
(Conceit $ executePipelineInternal
(\o l -> mute $ pipeioe (fromProducer . fromInput $ iinbox) o (errf l))
(\i o l -> mute $ pipeioe i o (errf l))
(\i l -> mute $ pipeioe i (fromConsumer . toOutput $ ooutbox) (errf l))
(\i l -> mute $ pipeie i (errf l))
pipeline
`finally` atomically iseal `finally` atomically oseal `finally` atomically eseal
)
errorSiphonUTF8 :: MVar (Output ByteString) -> LinePolicy e -> Siphon ByteString e ()
errorSiphonUTF8 mvar (LinePolicy fun twk) = Halting $ fun twk iterTLines
where
iterTLines = iterT $ \textProducer -> do
join $ withMVar mvar $ \output -> do
runEffect $ (textProducer <* P.yield (singleton '\n'))
>-> P.map Data.Text.Encoding.encodeUtf8
>-> (toOutput output >> P.drain)
mute :: Functor f => f a -> f ()
mute = fmap (const ())
data Stage e = Stage
{
processDefinition' :: CreateProcess
, stderrLinePolicy' :: LinePolicy e
, exitCodePolicy' :: ExitCode -> Either e ()
, inbound' :: forall r. Producer ByteString IO r -> Producer ByteString (ExceptT e IO) r
}
instance Functor (Stage) where
fmap f (Stage a b c d) = Stage a (fmap f b) (bimap f id . c) (hoist (mapExceptT $ liftM (bimap f id)) . d)
stage :: LinePolicy e -> (ExitCode -> Either e ()) -> CreateProcess -> Stage e
stage lp ec cp = Stage cp lp ec (hoist lift)
inbound :: (forall r. Producer ByteString (ExceptT e IO) r -> Producer ByteString (ExceptT e IO) r)
-> Stage e -> Stage e
inbound f (Stage a b c d) = Stage a b c (f . d)
data CreatePipeline e = CreatePipeline (Stage e) (NonEmpty (Tree (Stage e))) deriving (Functor)
executePipelineInternal :: (Show e,Typeable e)
=> (Siphon ByteString e () -> LinePolicy e -> PipingPolicy e ())
-> (Pump ByteString e () -> Siphon ByteString e () -> LinePolicy e -> PipingPolicy e ())
-> (Pump ByteString e () -> LinePolicy e -> PipingPolicy e ())
-> (Pump ByteString e () -> LinePolicy e -> PipingPolicy e ())
-> CreatePipeline e
-> IO (Either e ())
executePipelineInternal ppinitial ppmiddle ppend ppend' (CreatePipeline (Stage cp lpol ecpol _) a) =
blende ecpol <$> executeFallibly (ppinitial (runNonEmpty ppend ppend' a) lpol) cp
where
runTree ppend ppend' (Node (Stage cp lpol ecpol pipe) forest) = case forest of
[] -> Halting $ \producer ->
blende ecpol <$> executeFallibly (ppend (fromFallibleProducer $ pipe producer) lpol) cp
c1 : cs -> Halting $ \producer ->
blende ecpol <$> executeFallibly (ppmiddle (fromFallibleProducer $ pipe producer) (runNonEmpty ppend ppend' (c1 :| cs)) lpol) cp
runNonEmpty ppend ppend' (b :| bs) =
runTree ppend ppend' b <* Prelude.foldr (<*) (pure ()) (runTree ppend' ppend' <$> bs)
blende :: (ExitCode -> Either e ()) -> Either e (ExitCode,a) -> Either e a
blende f r = r >>= \(ec,a) -> f ec *> pure a
pipefail :: ExitCode -> Either Int ()
pipefail ec = case ec of
ExitSuccess -> Right ()
ExitFailure i -> Left i