{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE NoMonomorphismRestriction #-}
{-# LANGUAGE RankNTypes #-}

-- | Miscellaneous utilities for pipes, required by glazier-tutorial
module Pipes.Misc where

import Control.Arrow
import Control.Concurrent.STM
import Control.Lens
import Control.Monad
import Control.Monad.State.Strict
import Control.Monad.Trans.Maybe
import qualified Pipes as P
import qualified Pipes.Concurrent as PC
import qualified Pipes.Shaft as PS
import qualified Pipes.Prelude as PP
import qualified Data.List.NonEmpty as NE
import Control.Monad.Except
import Control.Applicative

-- | STM counterpart of Pipes.Concurrent.fromInput: the resulting producer
-- stays in the 'STM' monad.
--
-- Every step receives one value from the input and yields it downstream.
-- The producer returns as soon as a receive gives back 'Nothing'.
--
-- Using @hoist atomically@ to convert to the IO monad seems to work.
-- Do not use @unsafeHoist atomically@.
fromInputSTM :: PC.Input a -> P.Producer' a STM ()
fromInputSTM input = loop
  where
    loop = do
        received <- lift (PC.recv input)
        case received of
            -- Nothing more to receive: finish the producer.
            Nothing -> pure ()
            Just value -> do
                P.yield value
                loop

-- | STM counterpart of Pipes.Concurrent.toOutput: the resulting consumer
-- stays in the 'STM' monad.
--
-- Every step awaits one value and sends it to the output. The consumer
-- keeps going while the send reports 'True' and returns on the first 'False'.
--
-- Using @hoist atomically@ to convert to the IO monad seems to work.
-- Do not use @unsafeHoist atomically@.
toOutputSTM :: PC.Output a -> P.Consumer' a STM ()
toOutputSTM output = loop
  where
    loop = do
        value <- P.await
        accepted <- lift (PC.send output value)
        -- A rejected send ends the consumer.
        when accepted loop

-- | Reads as much as possible from an input and returns all the values that
-- could be read without blocking.
--
-- Blocks if the first read blocks. If the first read gives back 'Nothing',
-- the batched input gives back 'Nothing' as well.
--
-- After the first value, further reads are attempted until one of them would
-- block or gives back 'Nothing'; the values collected so far are then
-- returned.
--
-- Note on ordering: every newly read value is prepended, so the resulting
-- 'NE.NonEmpty' holds the most recently read value first and the first value
-- read last.
batch :: PC.Input a -> PC.Input (NE.NonEmpty a)
batch (PC.Input recvOne) = PC.Input $
    -- The first read is not guarded, so it blocks (retries) like the
    -- underlying input does; a 'Nothing' result is passed through unchanged.
    recvOne >>= traverse (drain . (NE.:| []))
  where
    -- Keep reading while values are immediately available.
    drain acc = do
        -- 'optional' turns a read that would block into 'Nothing' instead of
        -- retrying; 'join' merges that case with a read that itself gave
        -- back 'Nothing'. Either way the batch collected so far is complete.
        next <- join <$> optional recvOne
        case next of
            Nothing -> pure acc
            Just value -> drain (value NE.<| acc)

-- | Given a size and an initial tail, create a pipe that buffers the output
-- of a producer.
--
-- For every value awaited, the pipe yields a list holding that value followed
-- by the previously buffered values, truncated to at most @n@ elements.
--
-- This pipe is stateful, and will only buffer until the immediate connecting
-- producer is finished.
-- @
-- forever $ do
--   a <- await
--   yield a >-> buffer 2 [] -- will only ever result in a producer of single 'a : []'.
-- @
-- @
-- (forever $ do
--   a <- await
--   yield a
-- ) >-> buffer 2 [] -- will buffer properly and produce '[latest, old]'
-- @
buffer :: Monad m => Int -> [a] -> P.Pipe a [a] m r
buffer n = go
  where
    go older = do
        newest <- P.await
        let window = take n (newest : older)
        -- Walk the spine of the window (not its elements) before yielding,
        -- so the truncation by 'take' is performed at this step.
        -- TODO: can we leave this lazy?
        length window `seq` P.yield window
        go window

-- | Record a view of every value passing through the pipe in the
-- 'MonadState' state.
--
-- For each value awaited, the getter is applied to it, the result is written
-- into the state through the setter, and the value itself is yielded
-- unchanged.
store :: MonadState s m => Getter a b -> Setter' s b -> P.Pipe a a m r
store v s = P.for P.cat $ \a -> do
    s .= view v a
    P.yield a

-- | Pair every value passing through the pipe with a view into the current
-- 'MonadState' state.
--
-- For each value awaited, the state is read, the getter is applied to it, and
-- the tuple @(view of the state, awaited value)@ is yielded.
retrieve :: MonadState s m => Getter s b -> P.Pipe a (b, a) m r
retrieve v = P.for P.cat $ \a -> do
    b <- gets (view v)
    P.yield (b, a)

-- | Run a pipe inside a larger stream, given a view function and a modify
-- function for the elements of the larger stream.
--
-- The pipeline has three stages:
--
-- 1. every incoming @s@ is duplicated into a pair @(s, s)@;
-- 2. the first component goes through @viewf@ and then the inner pipe @p@,
--    using the 'first' arrow combinator of 'PS.Shaft', while the second
--    component is carried along;
-- 3. the output of the inner pipe and the carried @s@ are combined with
--    @modifyf@.
locally ::
  Monad m =>
     (s -> a)
  -> (b -> s -> t)
  -> P.Pipe a b m r
  -> P.Pipe s t m r
locally viewf modifyf p = duplicate P.>-> focused P.>-> recombine
  where
    -- Keep a copy of the original element next to the one being transformed.
    duplicate = PP.map (\s -> (s, s))
    -- Apply the inner pipe to the first component only.
    focused = PS.runShaft . first . PS.Shaft $ PP.map viewf P.>-> p
    -- Merge the inner pipe's output back into the original element.
    recombine = PP.map (uncurry modifyf)