{-# LANGUAGE DeriveFunctor #-}
{-# LANGUAGE ExistentialQuantification #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE GeneralizedNewtypeDeriving #-}
{-# LANGUAGE FlexibleInstances #-}
{-# LANGUAGE ViewPatterns #-}

module System.Process.Streaming.Internal ( 
        Piping(..), 
        Piap(..), 
        Pump(..),
        Siphon(..),
        runSiphon,
        runSiphonDumb,
        Siphon_(..),
        exhaustive,
        Lines(..),
        Splitter(..),
        combined,
        manyCombined,
        Stage(..)
    ) where

import Data.Bifunctor
import Data.Monoid
import Data.Text 
import Control.Applicative
import Control.Applicative.Lift
import Control.Monad
import Control.Monad.Trans.Free hiding (Pure)
import Control.Monad.Trans.Except
import Control.Exception
import Control.Concurrent
import Control.Concurrent.Conceit
import Pipes
import qualified Pipes as P
import qualified Pipes.Prelude as P
import Pipes.ByteString
import qualified Pipes.Text as T
import Pipes.Concurrent
import System.Process
import System.Exit

{-|
    A 'Piping' selects which of the standard streams of an external process
are piped, and carries the handler that feeds or consumes them.

    There is no need to set the 'std_in', 'std_out' and 'std_err' fields of
the 'CreateProcess' record to 'CreatePipe' by hand; that is done
automatically. 

    The type parameter @e@ is the type of the errors that can abort the
processing of the streams, and @a@ is the type of the result.

    NOTE(review): which 'Producer' is @stdout@ and which is @stderr@ is only
suggested by the constructor names; confirm against the code that builds the
handles. The @IO ()@ paired with each 'Consumer' is presumably the action
that closes @stdin@ (the 'Applicative' instance of 'Piap' names the
corresponding component @cleanup@).
 -}
-- A 'Piping' is aware that a stdin, a stdout and a stderr exist, but it
-- knows nothing about file handles or 'CreateProcess'.
data Piping e a 
    = -- | No handler, just a ready-made result.
      PPNone a
    | -- | Handler that receives a single 'Producer'.
      PPOutput (Producer ByteString IO () -> IO (Either e a))
    | -- | Handler that receives a single 'Producer'.
      PPError (Producer ByteString IO () -> IO (Either e a))
    | -- | Handler that receives a pair of 'Producer's.
      PPOutputError 
        ( (Producer ByteString IO (), Producer ByteString IO ()) 
          -> IO (Either e a)
        )
    | -- | Handler that receives a 'Consumer' and an @IO ()@ action.
      PPInput 
        ( (Consumer ByteString IO (), IO ()) 
          -> IO (Either e a)
        )
    | -- | Handler that receives a 'Consumer', an @IO ()@ action and one
      -- 'Producer'.
      PPInputOutput 
        ( (Consumer ByteString IO (), IO (), Producer ByteString IO ()) 
          -> IO (Either e a)
        )
    | -- | Handler that receives a 'Consumer', an @IO ()@ action and one
      -- 'Producer'.
      PPInputError 
        ( (Consumer ByteString IO (), IO (), Producer ByteString IO ()) 
          -> IO (Either e a)
        )
    | -- | Handler that receives a 'Consumer', an @IO ()@ action and two
      -- 'Producer's.
      PPInputOutputError 
        ( ( Consumer ByteString IO ()
          , IO ()
          , Producer ByteString IO ()
          , Producer ByteString IO ()
          ) 
          -> IO (Either e a)
        )
    deriving (Functor)


{-| 
    'bimap' applies its first function to the error and its second function
    to the result of whatever handler the 'Piping' carries.

    'first' is useful to massage errors.
-}
instance Bifunctor Piping where
  bimap f g piping = case piping of
      PPNone a -> PPNone (g a)
      PPOutput handler -> PPOutput (fmap onResult . handler)
      PPError handler -> PPError (fmap onResult . handler)
      PPOutputError handler -> PPOutputError (fmap onResult . handler)
      PPInput handler -> PPInput (fmap onResult . handler)
      PPInputOutput handler -> PPInputOutput (fmap onResult . handler)
      PPInputError handler -> PPInputError (fmap onResult . handler)
      PPInputOutputError handler -> PPInputOutputError (fmap onResult . handler)
    where
      -- Maps both sides of the 'Either' that every handler returns.
      onResult = bimap f g


{-| 
    'Pump's are actions that write data into a process' @stdin@. 

    A 'Pump' wraps a function that is given a 'Consumer' of @b@ values and
    returns, in 'IO', either an error of type @e@ or a result of type @a@.
 -}
newtype Pump b e a = Pump 
    { runPump :: Consumer b IO () -> IO (Either e a) 
    } 
    deriving Functor

{-| 
    'bimap' maps the error with its first function and the result with its
    second one. 'first' is useful to massage errors.
-}
instance Bifunctor (Pump b) where
  bimap f g (Pump run) = Pump (\consumer -> bimap f g <$> run consumer)


{-| 
    'pure' writes nothing to @stdin@.

    '<*>' sequences the writes to @stdin@: each argument writes into its own
    bounded channel, and a relay thread forwards the contents of the first
    channel, followed by the contents of the second, to the real 'Consumer'.
-}
instance Applicative (Pump b e) where
  pure a = Pump (\_ -> return (Right a))
  Pump writeFs <*> Pump writeAs = Pump $ \consumer -> do
      (firstOut, firstIn, sealFirst) <- spawn' (bounded 1)
      (secondOut, secondIn, sealSecond) <- spawn' (bounded 1)
      let -- Runs both writers one after the other, stopping at the first
          -- error. Each channel is sealed as soon as its writer is done.
          writers = runExceptT $ do
              f <- ExceptT (writeFs (toOutput firstOut) 
                               `finally` atomically sealFirst)
              x <- ExceptT (writeAs (toOutput secondOut) 
                               `finally` atomically sealSecond)
              return (f x)
          -- Forwards the first channel, then the second, to the consumer.
          relay = do
              runEffect ((fromInput firstIn >> fromInput secondIn) >-> consumer)
                  `finally` atomically sealFirst
                  `finally` atomically sealSecond
              return (Right ())
      runConceit (Conceit writers <* Conceit relay)

-- | 'mempty' writes nothing and returns 'mempty'. 'mappend' runs both
-- 'Pump's through '<*>' and combines their results.
instance (Monoid a) => Monoid (Pump b e a) where
   mempty = pure mempty
   mappend = liftA2 mappend

-- instance IsString b => IsString (Pump b e ()) where 
--    fromString = fromProducer . P.yield . fromString 

{-|
    An alternative to `Piping` for defining what to do with the
    standard streams. Unlike 'Piping', 'Piap' is an instance of
    'Applicative'.

    With 'Piap', the standard streams are always piped. The values of
    @std_in@, @std_out@ and @std_err@ in the 'CreateProcess' record are
    ignored.

    The wrapped function receives a 4-tuple: a 'Consumer', an @IO ()@
    action (called @cleanup@ in the 'Applicative' instance) and two
    'Producer's (which the 'Applicative' instance treats as @stdout@ and
    @stderr@, in that order).
 -}
newtype Piap e a = Piap 
    { runPiap :: ( Consumer ByteString IO ()
                 , IO ()
                 , Producer ByteString IO ()
                 , Producer ByteString IO ()
                 ) 
              -> IO (Either e a) 
    } 
    deriving (Functor)

-- | 'bimap' maps the error with its first function and the result with its
-- second one.
instance Bifunctor Piap where
  bimap f g (Piap action) = Piap (\streams -> bimap f g <$> action streams)


{-| 
    'pure' creates a 'Piap' that writes nothing to @stdin@ and drains
    @stdout@ and @stderr@, discarding the data.

    '<*>' schedules the writes to @stdin@ sequentially, and the reads from
    @stdout@ and @stderr@ concurrently.

    The right-hand argument is allowed to start writing to @stdin@ as soon
    as the left-hand argument has either invoked its @stdin@ cleanup action
    or returned successfully, whichever happens first.
-}
instance Applicative (Piap e) where
  pure a = Piap $ \(consumer, cleanup, producer1, producer2) -> do
      -- Nothing is written, but the stdin cleanup action still runs.
      let nullInput = runPump (pure ()) consumer `finally` cleanup
          drainOutput = runSiphonDumb (pure ()) producer1
          drainError = runSiphonDumb (pure ()) producer2
      runConceit $ 
          (\_ _ _ -> a)
          <$>
          Conceit nullInput
          <*>
          Conceit drainOutput
          <*>
          Conceit drainError

  (Piap fa) <*> (Piap fb) = Piap $ \(consumer, cleanup, producer1, producer2) -> do
        -- Gate that holds back the stdin writes of the right-hand argument.
        latch <- newEmptyMVar :: IO (MVar ())
        -- One bounded channel per stream and per argument.
        (ioutbox, iinbox, iseal) <- spawn' (bounded 1)
        (ooutbox, oinbox, oseal) <- spawn' (bounded 1)
        (eoutbox, einbox, eseal) <- spawn' (bounded 1)
        (ioutbox2, iinbox2, iseal2) <- spawn' (bounded 1)
        (ooutbox2, oinbox2, oseal2) <- spawn' (bounded 1)
        (eoutbox2, einbox2, eseal2) <- spawn' (bounded 1)
        let openGate = void (tryPutMVar latch ())
            -- 'tryPutMVar' makes opening the gate idempotent: calling the
            -- cleanup action more than once can never block.

            -- Opens the gate when the left-hand argument succeeded without
            -- ever calling its cleanup action; otherwise the right-hand
            -- argument would wait on the gate forever. On 'Left' the gate
            -- stays closed, because the whole computation is failing anyway.
            openGateOnSuccess result = do
                case result of
                    Right _ -> openGate
                    Left _ -> return ()
                return result
            -- Merges both stdin channels into the real consumer.
            reroutei = runEffect (fromInput (iinbox <> iinbox2) >-> consumer)
                       `finally` atomically iseal 
                       `finally` atomically iseal2
                       `finally` cleanup
            -- Copies the first producer into both stdout channels.
            rerouteo = runEffect (producer1 >-> toOutput (ooutbox <> ooutbox2))
                       `finally` atomically oseal 
                       `finally` atomically oseal2
            -- Copies the second producer into both stderr channels.
            reroutee = runEffect (producer2 >-> toOutput (eoutbox <> eoutbox2))
                       `finally` atomically eseal 
                       `finally` atomically eseal2
            runf = fa 
                (toOutput ioutbox, 
                 atomically iseal `finally` openGate, 
                 fromInput oinbox, 
                 fromInput einbox)
            deceivedf = (runf >>= openGateOnSuccess)
                 `finally` atomically iseal 
                 `finally` atomically oseal 
                 `finally` atomically eseal 
            -- 'readMVar' leaves the gate open, so the consumer handed to
            -- the right-hand argument can be run more than once.
            deceivedx = fb 
                (liftIO (readMVar latch) *> toOutput ioutbox2, 
                 atomically iseal2, 
                 fromInput oinbox2, 
                 fromInput einbox2)
                 `finally` atomically iseal2 
                 `finally` atomically oseal2 
                 `finally` atomically eseal2 
        runConceit $
            _Conceit reroutei 
            *>
            _Conceit rerouteo 
            *> 
            _Conceit reroutee 
            *> 
            (Conceit deceivedf <*> Conceit deceivedx)


{-| 
    A 'Siphon' represents a computation that completely drains
    a 'Producer', but which may fail early with an error of type @e@. 

    It is a thin wrapper around 'Lift' applied to 'Siphon_'.
 -}
newtype Siphon b e a 
    = Siphon (Lift (Siphon_ b e) a) 
    deriving (Functor)


{-| 
    'pure' creates a 'Siphon' that does nothing besides draining the
'Producer'. 

    '<*>' executes its arguments concurrently. The 'Producer' is forked so
    that each argument receives its own copy of the data.

    Both methods delegate to the 'Applicative' instance of the wrapped
    'Lift'.
-}
instance Applicative (Siphon b e) where
    pure = Siphon . Pure
    Siphon liftedF <*> Siphon liftedX = Siphon (liftedF <*> liftedX)

-- | The two ways of consuming a 'Producer' that a 'Siphon' can wrap.
data Siphon_ b e a 
    = -- | Works for a 'Producer' with any return type @r@ and hands that
      -- return value back next to its own result.
      Exhaustive (forall r. Producer b IO r -> IO (Either e (a,r)))
    | -- | Only accepts a 'Producer' that returns @()@ and yields just its
      -- own result.
      Nonexhaustive (Producer b IO () -> IO (Either e a))
    deriving (Functor)


-- | 'pure' drains the 'Producer' and pairs the given value with the
-- producer's return value.
--
-- '<*>' gives each argument its own bounded channel. One thread copies every
-- element of the 'Producer' into both channels while the two arguments run
-- concurrently on their copies.
instance Applicative (Siphon_ b e) where
    pure a = Exhaustive $ \producer -> 
        fmap (\r -> Right (a, r)) (runEffect (producer >-> P.drain))
    sf <*> sx = Exhaustive $ \producer -> do
        (outboxF, inboxF, sealF) <- spawn' (bounded 1)
        (outboxX, inboxX, sealX) <- spawn' (bounded 1)
        let runF = nonexhaustive sf (fromInput inboxF) `finally` atomically sealF
            runX = nonexhaustive sx (fromInput inboxX) `finally` atomically sealX
            -- Runs both sides concurrently and applies the function that
            -- the left side produced to the value of the right side.
            applied = fmap (\(f, x) -> f x) <$> conceit runF runX
            -- The trailing 'P.drain's keep consuming the producer when
            -- 'toOutput' stops early (presumably once a channel is sealed).
            fanOut = runEffect (producer >-> P.tee (toOutput outboxF >> P.drain) 
                                         >->       (toOutput outboxX >> P.drain))
                     `finally` atomically sealF 
                     `finally` atomically sealX
        runConceit ((,) <$> Conceit applied <*> _Conceit fanOut)

-- | Views a 'Siphon_' as a plain function over a 'Producer' that returns
-- @()@. For the 'Exhaustive' case the producer's return value is dropped.
nonexhaustive :: Siphon_ b e a -> Producer b IO () -> IO (Either e a)
nonexhaustive s = case s of
    Exhaustive e -> fmap (fmap fst) . e
    Nonexhaustive u -> u

-- | Views a 'Siphon_' as a function that accepts a 'Producer' with any
-- return type and hands that return value back.
--
-- An 'Exhaustive' siphon is returned as is. A 'Nonexhaustive' one reads from
-- a bounded channel while a second thread feeds the channel from the
-- producer.
exhaustive :: Siphon_ b e a -> Producer b IO r -> IO (Either e (a,r))
exhaustive (Exhaustive e) = e
exhaustive (Nonexhaustive activity) = \producer -> do 
    (outbox, inbox, seal) <- spawn' (bounded 1)
    let consume = activity (fromInput inbox) `finally` atomically seal
        -- The 'P.drain' keeps consuming the producer when 'toOutput' stops
        -- early (presumably once the channel is sealed).
        feed = runEffect (producer >-> (toOutput outbox >> P.drain)) 
               `finally` atomically seal
    runConceit ((,) <$> Conceit consume <*> _Conceit feed)

-- | Runs a 'Siphon' over a 'Producer', returning the siphon's result
-- together with the producer's return value.
runSiphon :: Siphon b e a -> Producer b IO r -> IO (Either e (a,r))
runSiphon (Siphon lifted) = exhaustive (unLift lifted)

-- | Runs a 'Siphon' over a 'Producer' that returns @()@, keeping only the
-- siphon's result. A 'Nonexhaustive' siphon is first turned into an
-- 'Exhaustive' one through 'exhaustive'.
runSiphonDumb :: Siphon b e a -> Producer b IO () -> IO (Either e a)
runSiphonDumb (Siphon lifted) = nonexhaustive (asExhaustive (unLift lifted))
  where
    asExhaustive s@(Exhaustive _) = s
    asExhaustive s@(Nonexhaustive _) = Exhaustive (exhaustive s)


-- | 'bimap' maps the error with its first function and the result with its
-- second one. In the 'Exhaustive' case the producer's return value is left
-- untouched.
instance Bifunctor (Siphon_ b) where
  bimap f g (Exhaustive u) = 
      Exhaustive (\producer -> fmap (bimap f (first g)) (u producer))
  bimap f g (Nonexhaustive h) = 
      Nonexhaustive (\producer -> fmap (bimap f g) (h producer))

{-| 
    'bimap' maps the error with its first function and the result with its
    second one. A 'Pure' value has no error, so only the second function is
    applied to it.

    'first' is useful to massage errors.
-}
instance Bifunctor (Siphon b) where
  bimap _ g (Siphon (Pure a)) = Siphon (Pure (g a))
  bimap f g (Siphon (Other o)) = Siphon (Other (bimap f g o))

-- | 'mempty' is the 'Pure' siphon holding 'mempty'. 'mappend' runs both
-- siphons through '<*>' and combines their results.
instance (Monoid a) => Monoid (Siphon b e a) where
   mempty = Siphon (Pure mempty)
   mappend = liftA2 mappend



-- Background on whether writing two streams to the same destination can mangle lines:
-- http://unix.stackexchange.com/questions/114182/can-redirecting-stdout-and-stderr-to-the-same-file-mangle-lines
-- | Merges the lines of two byte streams into a single stream of text and
-- hands that stream to one consumer.
--
-- Each 'Lines' value is paired with the byte 'Producer' in the same
-- position: the first 'Lines' handles the first producer, the second
-- 'Lines' the second. The actual merging is done by 'manyCombined'.
combined :: Lines e 
         -> Lines e 
         -> (Producer T.Text IO () -> IO (Either e a))
         -> Producer ByteString IO () 
         -> Producer ByteString IO () 
         -> IO (Either e a)
combined (Lines fun1 twk1) (Lines fun2 twk2) combinedConsumer prod1 prod2 = 
    manyCombined sources combinedConsumer 
  where
    -- Each source still waits for the function that consumes its lines.
    sources = 
        [ \eachLine -> fun1 twk1 eachLine prod1
        , \eachLine -> fun2 twk2 eachLine prod2
        ]

-- | Runs every action in the list concurrently, together with the consumer.
--
-- Each action is given a function that writes a stream of lines into a
-- shared bounded channel; the consumer reads the text from the other end of
-- that channel. The channel's output end is kept in an 'MVar', which is held
-- while one line (plus a trailing newline) is being written.
manyCombined :: [(FreeT (Producer T.Text IO) IO (Producer ByteString IO ()) -> IO (Producer ByteString IO ())) -> IO (Either e ())]
             -> (Producer T.Text IO () -> IO (Either e a))
             -> IO (Either e a) 
manyCombined actions consumer = do
    (outbox, inbox, seal) <- spawn' (bounded 1)
    lock <- newMVar outbox
    let writers = mapConceit (\action -> action (emitLines lock)) actions 
                  `finally` atomically seal
        reader = consumer (fromInput inbox) `finally` atomically seal
    runConceit (Conceit writers *> Conceit reader)
  where 
    -- Writes each line while holding the lock. The value computed inside
    -- 'withMVar' is itself an 'IO' action (the rest of the stream), which
    -- 'join' runs only after the lock has been released. The 'P.drain' is
    -- what makes the return types of both sides of '>->' line up.
    emitLines lock = iterT (\line -> 
        join (withMVar lock (\output -> 
            runEffect ((line <* P.yield (singleton '\n')) 
                       >-> (toOutput output >> P.drain)))))

{-|
    A configuration parameter used in functions that combine lines of text from
    multiple streams.
 -}
data Lines e = Lines 
    { -- | In 'combined' this function is applied, in order, to the
      -- 'lineTweaker', to a function that consumes the stream line by line,
      -- and to the raw byte 'Producer'. It may fail with an error of type
      -- @e@.
      teardown 
          :: (forall r. Producer T.Text IO r -> Producer T.Text IO r)
          -> (FreeT (Producer T.Text IO) IO (Producer ByteString IO ()) -> IO (Producer ByteString IO ())) 
          -> Producer ByteString IO () 
          -> IO (Either e ())
      -- | A transformation of text 'Producer's that preserves their return
      -- value. NOTE(review): presumably applied to every individual line;
      -- confirm against the code that builds 'teardown'.
    , lineTweaker 
          :: forall r. Producer T.Text IO r -> Producer T.Text IO r
    } 

-- | 'fmap' maps over the encoding error, that is, the error returned by
-- 'teardown'. The 'lineTweaker' is kept unchanged.
instance Functor Lines where
  fmap f (Lines func lt) = 
      Lines (\tweak eachLine bytes -> first f <$> func tweak eachLine bytes) lt


-- | Wraps a function that turns a 'Producer' into a 'FreeT'-delimited
-- sequence of 'Producer's while preserving the return value @r@.
newtype Splitter b = Splitter 
    { getSplitter :: forall r. Producer b IO r -> FreeT (Producer b IO) IO r 
    } 


{-|
   An individual stage in a process pipeline. 

   The type parameter @e@ is the error type shared by all the fields that
   can fail.
 -}
data Stage e = Stage 
    { -- | The 'CreateProcess' record for this stage.
      _processDefinition :: CreateProcess 
      -- | The 'Lines' configuration for this stage. NOTE(review): judging
      -- by the name it is used for the stage's @stderr@; confirm at the use
      -- site.
    , _stderrLines :: Lines e
      -- | Decides whether an 'ExitCode' counts as an error.
    , _exitCodePolicy :: ExitCode -> Either e ()
      -- | A transformation of byte 'Producer's that may fail with @e@ and
      -- preserves the return value. NOTE(review): presumably applied to the
      -- data entering the stage; confirm at the use site.
    , _inbound :: forall r. Producer ByteString IO r 
               -> Producer ByteString (ExceptT e IO) r 
    } 

-- | 'fmap' maps over the error type in every field that mentions it. The
-- process definition is kept unchanged.
instance Functor (Stage) where
    fmap f (Stage definition stderrLines policy inbound) = 
        Stage definition 
              (fmap f stderrLines) 
              (\exitCode -> first f (policy exitCode))
              (\producer -> hoist (mapExceptT (fmap (first f))) (inbound producer))