{-# LANGUAGE RankNTypes #-}

module Pipes.Misc.Concurrent where

import Control.Applicative
import Control.Concurrent
import Control.Concurrent.STM
import Control.Monad
import Control.Monad.Morph
import Control.Monad.Trans.Except
import Control.Monad.Trans.Maybe
import qualified Data.List.NonEmpty as NE
import qualified Pipes as P
import qualified Pipes.Concurrent as PC
import qualified Pipes.Prelude as PP

-- | An 'STM' counterpart of 'Pipes.Concurrent.fromInput': repeatedly receives
-- from the input and yields every value, finishing as soon as 'PC.recv'
-- returns 'Nothing'.
--
-- Using @hoist atomically@ to convert to the 'IO' monad seems to work.
-- Do not use @unsafeHoist atomically@.
--
-- Each transaction is scoped by 'atomically' around a single yield, so be
-- careful when using 'Pipes.Prelude.filter' or similar pipes that remove
-- yields: this results in larger transactions and may cause
-- @BlockIndefinitelyOnSTM@ exceptions.
-- Instead, use a 'Monoid' and yield 'mempty' so that the STM state changes.
fromInputSTM :: PC.Input a -> P.Producer' a STM ()
fromInputSTM input = loop
  where
    -- Receive one value; stop on 'Nothing', otherwise yield it and go again.
    loop = do
        received <- lift (PC.recv input)
        case received of
            Nothing -> pure ()
            Just value -> P.yield value >> loop

-- | An 'STM' counterpart of 'Pipes.Concurrent.toOutput': awaits values and
-- sends each one to the output, finishing as soon as 'PC.send' returns 'False'.
--
-- Using @hoist atomically@ to convert to the 'IO' monad seems to work.
-- Do not use @unsafeHoist atomically@.
toOutputSTM :: PC.Output a -> P.Consumer' a STM ()
toOutputSTM output = loop
  where
    -- Forward one value; only keep going while the output accepts it.
    loop = do
        value <- P.await
        accepted <- lift (PC.send output value)
        when accepted loop

-- | Adapt the @a -> STM Bool@ sender of a 'PC.Output' to @a -> MaybeT STM ()@:
-- a 'True' from 'PC.send' becomes @Just ()@ and a 'False' becomes 'Nothing'.
toOutputMaybeT :: PC.Output a -> a -> MaybeT STM ()
toOutputMaybeT output value = MaybeT (guard <$> PC.send output value)

-- | Converts a Producer in the 'IO' monad to a producer in the 'STM' monad.
--
-- A mailbox is spawned with the given buffer and a thread is forked that
-- pipes the 'IO' producer into the mailbox's output; the returned producer
-- reads from the mailbox's input via 'fromInputSTM'.
--
-- The forked pipeline is wrapped in 'forever', so every time it returns it is
-- run again from the start.
-- NOTE(review): this means a finite producer is replayed and the forked
-- thread never exits on its own -- confirm that this is intended.
mkProducerSTM :: PC.Buffer a -> P.Producer a IO () -> IO (P.Producer a STM ())
mkProducerSTM buffer producer = do
    (output, input) <- PC.spawn buffer
    -- The thread id is not needed; the worker is never explicitly killed here.
    _ <- forkIO . forever . P.runEffect $ producer P.>-> PC.toOutput output
    pure (fromInputSTM input)

-- | Converts a Producer in the 'IO' monad to a producer in the 'STM' monad.
-- Also returns the seal obtained from 'PC.spawn''.
--
-- A mailbox is spawned with the given buffer and a thread is forked that
-- pipes the 'IO' producer into the mailbox's output; the returned producer
-- reads from the mailbox's input via 'fromInputSTM'.
--
-- The forked pipeline is wrapped in 'forever', so every time it returns it is
-- run again from the start.
-- NOTE(review): this means the forked thread keeps re-running the producer
-- even after the seal has been used -- confirm that this is intended.
mkProducerSTM' :: PC.Buffer a -> P.Producer a IO () -> IO (STM (), P.Producer a STM ())
mkProducerSTM' buffer producer = do
    (output, input, seal) <- PC.spawn' buffer
    -- The thread id is not needed; the worker is never explicitly killed here.
    _ <- forkIO . forever . P.runEffect $ producer P.>-> PC.toOutput output
    pure (seal, fromInputSTM input)

-- | Reads as much as possible from an input and returns all the values that
-- could be read without blocking.
-- Blocks if the first value read is blocked.
--
-- If the first read reports 'Nothing', so does the batched input. After the
-- first value, reading continues until a read would block or reports
-- 'Nothing'; the values gathered so far are then returned.
--
-- Each further value is prepended to the accumulator, so the head of the
-- result is the value that was read last.
-- NOTE(review): callers wanting arrival order need to reverse the result --
-- confirm whether newest-first is the intended contract.
batch :: PC.Input a -> PC.Input (NE.NonEmpty a)
batch (PC.Input recvOne) = PC.Input $ recvOne >>= maybe (pure Nothing) collect
  where
    -- Seed the accumulator with the first value and drain whatever else is
    -- immediately available. 'drain' only ever finishes through 'Left', so
    -- both sides of the 'Either' carry the same accumulator.
    collect v = Just . either id id <$> runExceptT (drain (v NE.:| []))
    -- Keep stepping until a step short-circuits with the accumulated values.
    drain acc = ExceptT (step acc) >>= drain
    -- Try one more read; fall back to the values so far if it would block.
    step acc = (extend acc <$> recvOne) <|> pure (Left acc)
    -- 'Nothing' ends the batch with the successful reads so far.
    extend acc = maybe (Left acc) (Right . (NE.<| acc))

-- | Combine a 'Pipes.Concurrent.Input' and an @a -> t STM b@ into a
-- 'Pipes.Producer' of the results @b@.
-- That is, given an input of messages and something that executes a message
-- to produce a result @b@, combine them to get a Producer of the executed
-- results.
execInput
    :: (MonadTrans t, Monad (t STM))
    => PC.Input a -> (a -> (t STM) b) -> P.Producer' b (t STM) ()
execInput input f = messages P.>-> PP.mapM f
  where
    -- The raw messages, lifted from 'STM' into the transformer @t STM@.
    messages = hoist lift (fromInputSTM input)