{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE FlexibleInstances #-}
{-# LANGUAGE MultiParamTypeClasses #-}
{-# LANGUAGE UndecidableInstances #-}
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE TypeFamilies #-}

module Pipes.Async where

import Control.Exception (AsyncException(..))
import Control.Concurrent.Lifted
import Control.Concurrent.Async.Lifted
import Control.Concurrent.STM
import Control.Monad (liftM)
import Control.Monad.Base as Base
import Control.Monad.Trans.Control
import Pipes
import Pipes.Internal
import Pipes.Safe as Safe

infixl 7 >&>
-- | An operator version of `buffer`, which assumes a queue size of 16
-- elements.
(>&>) :: (MonadBaseControl IO m, MonadBaseControl IO (Base m), MonadSafe m)
      => Proxy a' a () b m r    -- ^ Upstream producer
      -> Proxy () b c' c m r    -- ^ Downstream consumer
      -> Proxy a' a c' c m r
(>&>) = buffer 16

-- | A substitute for 'Pipes.>->' that executes both the upstream producer and
-- downstream consumer in separate threads (see '>&>' for an operator version,
-- with a default queue size of 16 slots). The reason separate threads are
-- used for both sides is so that the current thread (running 'runEffect' or
-- 'toListM', for example) can manage the bidirectional semantics for the
-- resulting Proxy. That is:
-- Upstream is executed in task A, downstream in task B, and 'runEffect' in
-- the parent thread. Tasks A and B are connected so that 'b' values produced
-- in A are immediately enqueued and available to B. 'runEffect' does not manage
-- passing 'b' values from A to B, as it normally would; rather they flow
-- directly through a 'TBQueue' side-channel.
-- If upstream should attempt to send an @a'@ value further upstream,
-- expecting an 'a' in return, this will block task A as 'runEffect' sends the
-- request further up the chain. Or, should downstream send a 'c' value
-- downstream and expect a @c'@, it will block task B as 'runEffect' sends the
-- response further down the chain.
-- If upstream exits, its result value is enqueued until downstream sees it,
-- at which point 'runEffect' terminates with this value. However, if
-- downstream should exit first, this result is communicated directly to
-- 'runEffect', which returns it immediately, canceling both threads. Thus,
-- execution lifetime is biased toward the downstream consumer, since it is
-- more likely that downstream will consume elements until there are none
-- left, than that upstream would produce elements while waiting for
-- downstream to terminate.
-- If an exception occurs in either upstream or downstream, it is re-thrown in
-- the 'runEffect' thread. Also, no matter what happens, both the upstream and
-- downstream threads are canceled at the conclusion of the enclosing
-- 'MonadSafe' block.
-- Note: Using '>&>' should be a drop-in replacement for 'Pipes.>->' without
-- changing the meaning of the pipeline; however, how the composition is
-- associated has an effect on the concurrency. For example, @a >-> (b >&> c)@
-- causes 'b' and 'c' to be executed concurrently, with effects from 'a'
-- occuring in the parent thread (while 'b' blocks waiting on the response).
-- By contrast, @(a >-> b) >&> c@ executes @a >-> b@ and 'c' concurrently,
-- with nothing happening in the parent thread except to wait on the final
-- result. This will generally be faster since value passing through 'MVar'
-- will not be necessary. This is also the default interpretation of @a >-> b
-- >&> c@, since both operators left-associate at the same level.

buffer :: (MonadBaseControl IO m, MonadBaseControl IO (Base m), MonadSafe m)
       => Int                    -- ^ Number of slots in the bounded queue
       -> Proxy a' a () b m r    -- ^ Upstream producer
       -> Proxy () b c' c m r    -- ^ Downstream consumer
       -> Proxy a' a c' c m r
buffer sz ups downs = M $ do
    -- 'qeb' is a bounded queue of 'b' values flowing from upstream to
    -- downstream
    qeb <- Base.liftBase $ newTBQueueIO sz
    me  <- myThreadId
    q   <- Base.liftBase $ newTBQueueIO 3  -- control channel
    hd  <- spawn $ toDowns me q qeb
    hu  <- spawn $ fromUps me q qeb
    mainLoop q $ \r -> do
        release hu
        release hd
        return $ Pure r
    spawn f = mask $ \_unmask -> do
        h <- async f
        link h
        register $ cancel h

    readQ q    = Base.liftBase $ atomically $ readTBQueue q
    writeQ q x = Base.liftBase $ atomically $ writeTBQueue q x

    mainLoop q done = loop
        loop = readQ q >>= \case
            Left u -> case u of
                Request a' fa  -> return $ Request a' $ (>> M loop) . fa
                Respond _  _   -> error "Respond never comes from ups"
                M          _   -> error "M never comes from ups"
                Pure       r   -> done r
            Right d -> case d of
                Request _  _   -> error "Request never comes from downs"
                Respond c  fc' -> return $ Respond c $ (>> M loop) . fc'
                M          _   -> error "M never comes from downs"
                Pure       r   -> done r

    guarded :: (MonadBaseControl IO m, MonadBaseControl IO (Base m),
                MonadSafe m)
            => ThreadId
            -> ((Proxy a' a b' b m r -> m ()) -> Proxy a' a b' b m r -> m ())
            -> Proxy a' a b' b m r
            -> m ()
    guarded parent f = loop
        loop p = f loop p
            `catch` (\e -> case e :: AsyncException of
                ThreadKilled -> return ()
                _ -> Safe.liftBase $ throwTo parent e)
            `catch` (\e -> Safe.liftBase $ throwTo parent (e :: SomeException))

    fromUps parent q qeb = flip (guarded parent) ups $ \loop -> \case
        Request a' fa  -> throughVar q (Left . Request a') fa >>= loop
        Respond b  fb' -> writeQ qeb (Right b) >> loop (fb' ())
        M       m      -> m >>= loop
        Pure    r      -> writeQ qeb (Left r) -- this enqueues exit

    toDowns parent q qeb = flip (guarded parent) downs $ \loop -> \case
        Request () fb  -> readQ qeb >>= \case
            Left  r -> writeQ q (Right (Pure r))
            Right b -> loop (fb b)
        Respond c  fc' -> throughVar q (Right . Respond c) fc' >>= loop
        M       m      -> m >>= loop
        Pure    r      -> writeQ q (Right (Pure r)) -- this causes exit

    throughVar q x f = do
        var <- newEmptyMVar
        writeQ q $ x $ \v -> M $ do
            putMVar var v
            return $ Pure $ error "(>> M loop) throws this value away"
        f `liftM` takeMVar var