{-# LANGUAGE CPP #-}
{-# LANGUAGE FlexibleInstances #-}
{-# LANGUAGE Arrows #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE TypeSynonymInstances #-}
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE MultiParamTypeClasses #-}

#if __GLASGOW_HASKELL__ >= 708
{-# LANGUAGE Safe #-}
#else
{-# LANGUAGE Trustworthy #-}
#endif

module
    Control.Arrow.Machine.Utils
      (
        -- * AFRP-like utilities
        hold,
        dHold,
        accum,
        dAccum,
        edge,

        -- * Switches
        -- | Switches inspired by the Yampa library.
        -- Signatures are almost the same, but the collection requirement is not only 'Functor',
        -- but also 'Tv.Traversable'. This is because of side effects.
        switch,
        dSwitch,
        rSwitch,
        drSwitch,
        kSwitch,
        dkSwitch,
        pSwitch,
        pSwitchB,
        rpSwitch,
        rpSwitchB,

        -- * Sources
        -- $sources

        source,
        blockingSource,
        interleave,
        blocking,

        -- * Other utility arrows
        tee,
        gather,
        fork,
        filter,
        echo,
        anytime,
        par,
        parB,
        oneshot,
        now,
        onEnd,
#if defined(MIN_VERSION_arrows)
        -- * Transformer
        readerProc
#endif
     )
where

import Prelude hiding (filter)

import qualified Data.List.NonEmpty as NonEmpty
import qualified Data.Foldable as Fd
import qualified Control.Category as Cat
import Control.Monad.Trans
import Control.Monad.State
import Control.Arrow
import Control.Applicative
#if defined(MIN_VERSION_arrows)
import Control.Arrow.Transformer.Reader (ArrowAddReader(..))
#endif
import Control.Arrow.Machine.ArrowUtil
import Control.Arrow.Machine.Types





-- | Turn an event stream into a continuous value.
--
-- The output starts as the given initial value. Every incoming event value
-- @v@ is wrapped into the constant process @pure v@ and handed to 'rSwitch',
-- which replaces the running constant process with it.
hold ::
    ArrowApply a => b -> ProcessA a (Event b) b
hold initial =
    arr (\ev -> ((), fmap pure ev)) >>> rSwitch (pure initial)

-- | Delayed counterpart of 'hold'.
--
-- Works like 'hold' but is built on 'drSwitch' instead of 'rSwitch'.
-- NOTE(review): presumably, as with Yampa's @dHold@, a new value only becomes
-- visible after the instant of the event that carries it -- confirm against
-- 'drSwitch'.
dHold ::
    ArrowApply a => b -> ProcessA a (Event b) b
dHold initial =
    arr (\ev -> ((), fmap pure ev)) >>> drSwitch (pure initial)

-- | Accumulate a value by applying the function carried by each event.
--
-- The process outputs the current value until an event arrives; 'switch' then
-- hands the updated value to a helper that outputs it and, through a
-- 'dSwitch' fired by the very same event, resumes 'accum' from that value.
accum ::
    ArrowApply a => b -> ProcessA a (Event (b->b)) b
accum initial =
    switch (pure initial &&& arr (fmap ($ initial))) emitThenResume
  where
    -- Output the freshly computed value, then restart accumulation from it.
    emitThenResume updated =
        dSwitch (pure updated &&& Cat.id) (\_ -> accum updated)

-- | Variant of 'accum' built on 'dSwitch' only.
--
-- The current value is output continuously; when an event arrives, its
-- function is applied to the current value and 'dSwitch' restarts 'dAccum'
-- from the result.
dAccum ::
    ArrowApply a => b -> ProcessA a (Event (b->b)) b
dAccum initial =
    dSwitch (pure initial &&& arr (fmap (\update -> update initial))) dAccum


-- | Emit an event carrying the input value whenever that value differs from
-- the one remembered from the previously emitted event.
--
-- The remembered value is kept by 'dHold' and starts as 'Nothing', so the
-- comparison against the first input never reports equality.
edge ::
    (ArrowApply a, Eq b) =>
    ProcessA a b (Event b)
edge = proc current ->
  do
    rec
        changed <- unsafeExhaust (arr detect) -< (previous, current)
        previous <- dHold Nothing -< Just current <$ changed
    returnA -< changed
  where
    -- 'Nothing' when the value is unchanged, otherwise the new value.
    detect (remembered, value)
      | remembered == Just value = Nothing
      | otherwise = Just value


-- $sources
-- In addition to the main event stream provided by `run`,
-- there are two other ways to provide additional input streams:
-- "interleaved" sources and "blocking" sources.
--
-- Interleaved sources are actually Event -> Event transformers
-- that don't see the values of the input events.
-- They discard input values and emit their values according to input event timing.
--
-- Blocking sources emit their events independent from upstream.
-- Until they exhaust their values, they block upstream transducers.
--
-- Here is a demonstration of the two kinds of sources.
--
-- @
-- a = proc x ->
--   do
--     y1 <- source [1, 2, 3] -< x
--     y2 <- source [4, 5, 6] -< x
--
--     gather -< [y1, y2]
-- -- run a (repeat ()) => [1, 4, 2, 5, 3, 6]
--
-- b = proc _ ->
--   do
--     y1 <- blockingSource [1, 2, 3] -< ()
--     y2 <- blockingSource [4, 5, 6] -< ()
--
--     gather -< [y1, y2]
-- -- run b [] => [4, 5, 6, 1, 2, 3]
-- @
--
-- In the code above, you can see that the output values of `source`
-- (an interleaved source) are actually interleaved,
-- while `blockingSource` blocks the other upstream source.
--
-- Both kinds can be implemented using `PlanT`.
-- The only difference is the `await` call that listens for upstream event timing.
--
-- An example is below.
--
-- @
-- interleavedStdin = constructT kleisli0 (forever pl)
--   where
--     pl =
--       do
--         _ <- await
--         eof <- isEOF
--         if eof then stop else return ()
--         getLine >>= yield
--
-- blockingStdin = pure noEvent >>> constructT kleisli0 (forever pl)
--   where
--     pl =
--       do
--         -- No await here
--         eof <- isEOF
--         if eof then stop else return ()
--         getLine >>= yield
-- @
--
-- They differ in their end behavior.
-- When upstream stops, an interleaved source stops because its await call fails.
-- But a blocking source doesn't stop until its own termination.


-- | Provide an interleaved source event stream.
--
-- Each element of the given container is emitted in response to one input
-- event; the values of the input events are ignored, so a dummy input event
-- stream is needed to drive the source.
--
-- @
--   run af [...]
-- @
--
-- is equivalent to
--
-- @
--   run (source [...] >>> af) (repeat ())
-- @
source ::
    (ArrowApply a, Fd.Foldable f) =>
    f c -> ProcessA a (Event b) (Event c)
source items =
    construct (Fd.forM_ items (\item -> await >> yield item))

-- | Provide a blocking source event stream.
--
-- The elements of the given container are yielded one after another without
-- awaiting any input; the plan is fed a constant 'noEvent'.
blockingSource ::
    (ArrowApply a, Fd.Foldable f) =>
    f c -> ProcessA a () (Event c)
blockingSource items =
    pure noEvent >>> construct (Fd.forM_ items yield)

-- | Make a blocking source interleaved.
--
-- The given source is first fed a constant @()@ input (@pure () >>> bs0@) so
-- that it accepts the event-typed input of the result. The result then
-- alternates between two phases built from the local helpers below:
-- @sweep1@ runs the source until it produces an event, and @waiting@ holds
-- that value back until an event arrives on the input.
interleave ::
    ArrowApply a =>
    ProcessA a () (Event c) ->
    ProcessA a (Event b) (Event c)
interleave bs0 = sweep1 (pure () >>> bs0)
  where
    -- Holding phase for a value @r@ taken from the source: run @handler@,
    -- and when it requests a switch, continue with @sweep1@ on the process
    -- it hands over ('dSwitch' is used, i.e. the delayed switch variant).
    waiting bs r =
        dSwitch
            (handler bs r)
            sweep1
    -- Running phase: run the source @bs@, watching only its output
    -- (@arr snd@ drops the input half of the pair given to the test arrow);
    -- an output event carrying @r@ triggers @waiting@.
    -- NOTE(review): presumably 'kSwitch' passes the current continuation of
    -- @bs@ to @waiting@, as Yampa's kSwitch does -- confirm in Types.
    sweep1 bs =
        kSwitch
            bs
            (arr snd)
            waiting
    -- Splits the single event produced by @splitter@ into the pair expected
    -- by 'dSwitch': the output event (only when the first component is
    -- 'Just') and the switching event carrying the next process.
    handler bs r = proc ev ->
      do
        ev' <- splitter bs r -< ev
        returnA -< (filterJust (fst <$> ev'), snd <$> ev')
    -- Waits for one input event and then yields @(Just r, bs)@. If that plan
    -- fails (e.g. 'await' fails because the input has ended), the 'catchP'
    -- handler yields @(Nothing, bs >>> muted)@ instead, so no value is
    -- emitted for it.
    splitter bs r =
        construct $
          do
            _ <- await
            yield (Just r, bs)
          `catchP`
            yield (Nothing, bs >>> muted)

-- | Make an interleaved source blocking.
--
-- The interleaved source is driven by an endless blocking stream of @()@
-- events. Its output is passed through unchanged, and once 'onEnd' fires on
-- that output the whole process is replaced (via 'dSwitch') by 'stopped'.
blocking ::
    ArrowApply a =>
    ProcessA a (Event ()) (Event c) ->
    ProcessA a () (Event c)
blocking is = dSwitch driven (\_ -> stopped)
  where
    -- The source fed with dummy events, paired with its end-of-stream signal.
    driven = blockingSource (repeat ()) >>> is >>> (Cat.id &&& onEnd)


--
-- other utility arrow

-- |Merge two event streams into one, tagging events of the first stream with
-- 'Left' and events of the second with 'Right'.
--
-- `gather` is more general and usually more convenient;
--
-- @... \<- tee -\< (e1, e2)@
--
-- is equivalent to
--
-- @... \<- gather -\< [Left \<$\> e1, Right \<$\> e2]@
--
tee ::
    ArrowApply a => ProcessA a (Event b1, Event b2) (Event (Either b1 b2))
tee =
    arr (\(lefts, rights) -> [fmap Left lefts, fmap Right rights]) >>> gather



-- |Merge multiple event channels into one.
-- If events occur simultaneously, the leftmost one is emitted first.
--
-- Every event value is wrapped into a one-element non-empty list, the
-- channels are combined with 'Fd.foldMap', and the combined list is then
-- split back into individual events by 'fork'.
gather ::
    (ArrowApply a, Fd.Foldable f) =>
    ProcessA a (f (Event b)) (Event b)
gather = arr (Fd.foldMap (fmap (NonEmpty.:| []))) >>> fork


-- |Given a container-valued event, emit its elements as individual events.
fork ::
    (ArrowApply a, Fd.Foldable f) =>
    ProcessA a (Event (f b)) (Event b)

fork = repeatedly $
  do
    elems <- await
    Fd.forM_ elems yield

-- |Run the given arrow once for every input event and emit its result as an
-- output event.
anytime ::
    ArrowApply a =>
    a b c ->
    ProcessA a (Event b) (Event c)

anytime action = repeatedlyT (ary0 unArrowMonad) $
    await >>= \input ->
    lift (arrowMonad action input) >>= yield


-- |Let through only those events whose value satisfies the given predicate
-- arrow. The predicate is run once per input event.
filter ::
    ArrowApply a =>
    a b Bool ->
    ProcessA a (Event b) (Event b)
filter cond = repeatedlyT (ary0 unArrowMonad) $
    await >>= \candidate ->
    lift (arrowMonad cond candidate) >>= \accepted ->
    case accepted of
        True -> yield candidate
        False -> return ()


-- |Pass every input event through unchanged.
-- Defined as 'filter' with a predicate that always holds.
echo ::
    ArrowApply a =>
    ProcessA a (Event b) (Event b)

echo = filter (arr (\_ -> True))

-- |Emit one event carrying the given value as soon as possible.
--
-- The input is ignored (it is replaced by 'noEvent'); after yielding the
-- value, the underlying plan only keeps awaiting.
oneshot ::
    ArrowApply a =>
    c ->
    ProcessA a b (Event c)
oneshot value =
    arr (\_ -> noEvent) >>> construct (yield value >> forever await)

-- |Emit a single @()@ event as soon as possible.
--
-- @
--  now = oneshot ()
-- @
now :: ArrowApply a => ProcessA a b (Event ())
now = oneshot ()

-- |Emit an event at the end of the input stream.
--
-- The input is first reduced with 'collapse'. The plan then keeps awaiting;
-- when 'await' fails, the 'catchP' handler yields @()@ and stops.
onEnd ::
    (ArrowApply a, Occasional' b) =>
    ProcessA a b (Event ())
onEnd =
    arr collapse >>> repeatedly (catchP await (yield () >> stop))


#if defined(MIN_VERSION_arrows)
-- | Run the reader layer of the base arrow.
--
-- The resulting process takes the reader environment as the second component
-- of its input and supplies it to the base arrow through 'elimReader'.
readerProc ::
    (ArrowApply a, ArrowApply a', ArrowAddReader r a a') =>
    ProcessA a b c ->
    ProcessA a' (b, r) c
readerProc pa =
    arr flipPair >>> fitW snd (\ar -> arr flipPair >>> elimReader ar) pa
  where
    -- Exchange the components of a pair; the pattern is deliberately lazy.
    flipPair :: (x, y) -> (y, x)
    flipPair ~(x, y) = (y, x)
#endif