{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE FlexibleInstances #-}
{-# LANGUAGE MultiParamTypeClasses #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TypeFamilies #-}
{-# OPTIONS_GHC -Wall #-}
{-# OPTIONS_GHC -fno-warn-type-defaults #-}

-- | queues
-- Follows [pipes-concurrency](https://hackage.haskell.org/package/pipes-concurrency)
module Box.Queue
  ( Queue (..),
    queue,
    queueC,
    queueE,
    queueCM,
    queueEM,
    waitCancel,
    ends,
    withQ,
    withQE,
    withQC,
    toBox,
    concurrentlyLeft,
    concurrentlyRight,
  )
where

import Box.Box
import Box.Committer
import Box.Emitter
import Control.Applicative
import Control.Concurrent.Classy.Async as C
import Control.Concurrent.Classy.STM as C
import Control.Monad.Catch as C
import Control.Monad.Conc.Class as C

-- | 'Queue' specifies how messages are queued
data Queue a
  = Unbounded
  | Bounded Int
  | Single
  | Latest a
  | Newest Int
  | New

-- | create a queue, returning the ends
ends :: MonadSTM stm => Queue a -> stm (a -> stm (), stm a)
ends qu =
  case qu of
    Bounded n -> do
      q <- newTBQueue n
      return (writeTBQueue q, readTBQueue q)
    Unbounded -> do
      q <- newTQueue
      return (writeTQueue q, readTQueue q)
    Single -> do
      m <- newEmptyTMVar
      return (putTMVar m, takeTMVar m)
    Latest a -> do
      t <- newTVar a
      return (writeTVar t, readTVar t)
    New -> do
      m <- newEmptyTMVar
      return (\x -> tryTakeTMVar m *> putTMVar m x, takeTMVar m)
    Newest n -> do
      q <- newTBQueue n
      let write x = writeTBQueue q x <|> (tryReadTBQueue q *> write x)
      return (write, readTBQueue q)

-- | write to a queue, checking the seal
writeCheck :: (MonadSTM stm) => TVar stm Bool -> (a -> stm ()) -> a -> stm Bool
writeCheck sealed i a = do
  b <- readTVar sealed
  if b
    then pure False
    else do
      i a
      pure True

-- | read from a queue, and retry if not sealed
readCheck :: MonadSTM stm => TVar stm Bool -> stm a -> stm (Maybe a)
readCheck sealed o =
  (Just <$> o)
    <|> ( do
            b <- readTVar sealed
            C.check b
            pure Nothing
        )

-- | turn a queue into a box (and a seal)
toBox ::
  (MonadSTM stm) =>
  Queue a ->
  stm (Box stm a a, stm ())
toBox q = do
  (i, o) <- ends q
  sealed <- newTVarN "sealed" False
  let seal = writeTVar sealed True
  pure
    ( Box
        (Committer (writeCheck sealed i))
        (Emitter (readCheck sealed o)),
      seal
    )

toBoxM ::
  (MonadConc m) =>
  Queue a ->
  m (Box m a a, m ())
toBoxM q = do
  (i, o) <- atomically $ ends q
  sealed <- atomically $ newTVarN "sealed" False
  let seal = atomically $ writeTVar sealed True
  pure
    ( Box
        (Committer (atomically . writeCheck sealed i))
        (Emitter (atomically $ readCheck sealed o)),
      seal
    )

-- | wait for the first action, and then cancel the second
waitCancel :: (MonadConc m) => m b -> m a -> m b
waitCancel a b =
  withAsync a $ \a' ->
    withAsync b $ \b' -> do
      a'' <- wait a'
      cancel b'
      pure a''

-- | run two actions concurrently, but wait and return on the left result.
concurrentlyLeft :: MonadConc m => m a -> m b -> m a
concurrentlyLeft left right =
  withAsync left $ \a ->
    withAsync right $ \_ ->
      wait a

-- | run two actions concurrently, but wait and return on the right result.
concurrentlyRight :: MonadConc m => m a -> m b -> m b
concurrentlyRight left right =
  withAsync left $ \_ ->
    withAsync right $ \b ->
      wait b

-- | connect a committer and emitter action via spawning a queue, and wait for both to complete.
withQ ::
  (MonadConc m) =>
  Queue a ->
  (Queue a -> (STM m) (Box (STM m) a a, (STM m) ())) ->
  (Committer (STM m) a -> m l) ->
  (Emitter (STM m) a -> m r) ->
  m (l, r)
withQ q spawner cio eio =
  C.bracket
    (atomically $ spawner q)
    (\(_, seal) -> atomically seal)
    ( \(box, seal) ->
        concurrently
          (cio (committer box) `C.finally` atomically seal)
          (eio (emitter box) `C.finally` atomically seal)
    )

-- | connect a committer and emitter action via spawning a queue, and wait for committer to complete.
withQC ::
  (MonadConc m) =>
  Queue a ->
  (Queue a -> (STM m) (Box (STM m) a a, (STM m) ())) ->
  (Committer (STM m) a -> m l) ->
  (Emitter (STM m) a -> m r) ->
  m l
withQC q spawner cio eio =
  C.bracket
    (atomically $ spawner q)
    (\(_, seal) -> atomically seal)
    ( \(box, seal) ->
        concurrentlyLeft
          (cio (committer box) `C.finally` atomically seal)
          (eio (emitter box) `C.finally` atomically seal)
    )

-- | connect a committer and emitter action via spawning a queue, and wait for emitter to complete.
withQE ::
  (MonadConc m) =>
  Queue a ->
  (Queue a -> (STM m) (Box (STM m) a a, (STM m) ())) ->
  (Committer (STM m) a -> m l) ->
  (Emitter (STM m) a -> m r) ->
  m r
withQE q spawner cio eio =
  C.bracket
    (atomically $ spawner q)
    (\(_, seal) -> atomically seal)
    ( \(box, seal) ->
        concurrentlyRight
          (cio (committer box) `C.finally` atomically seal)
          (eio (emitter box) `C.finally` atomically seal)
    )

-- | connect a committer and emitter action via spawning a queue, and wait for both to complete.
withQM ::
  (MonadConc m) =>
  Queue a ->
  (Queue a -> m (Box m a a, m ())) ->
  (Committer m a -> m l) ->
  (Emitter m a -> m r) ->
  m (l, r)
withQM q spawner cio eio =
  C.bracket
    (spawner q)
    snd
    ( \(box, seal) ->
        concurrently
          (cio (committer box) `C.finally` seal)
          (eio (emitter box) `C.finally` seal)
    )

-- | create an unbounded queue
queue ::
  (MonadConc m) =>
  (Committer (STM m) a -> m l) ->
  (Emitter (STM m) a -> m r) ->
  m (l, r)
queue = withQ Unbounded toBox

-- | create an unbounded queue, returning the emitter result
queueE ::
  (MonadConc m) =>
  (Committer (STM m) a -> m l) ->
  (Emitter (STM m) a -> m r) ->
  m r
queueE cm em = snd <$> withQ Unbounded toBox cm em

-- | create an unbounded queue, returning the committer result
queueC ::
  (MonadConc m) =>
  (Committer (STM m) a -> m l) ->
  (Emitter (STM m) a -> m r) ->
  m l
queueC cm em = fst <$> withQ Unbounded toBox cm em

-- | create an unbounded queue, returning the emitter result
queueCM ::
  (MonadConc m) =>
  (Committer m a -> m l) ->
  (Emitter m a -> m r) ->
  m l
queueCM cm em = fst <$> withQM Unbounded toBoxM cm em

-- | create an unbounded queue, returning the emitter result
queueEM ::
  (MonadConc m) =>
  (Committer m a -> m l) ->
  (Emitter m a -> m r) ->
  m r
queueEM cm em = snd <$> withQM Unbounded toBoxM cm em

-- |
--
-- The one-in-the-chamber problem
--
-- This is the referential transparency refactoring I did to solve the one-in-the-chamber problem.  An etc process wasn't closing down when it should, until the committer fired once more:
--
-- -- etc () (Transducer $ \s -> s & S.takeWhile (/="q")) (Box <$> cStdout 2 <*> eStdin 2)
--
-- On entering a 'q' in stdin, this code piece requires another input from stdin before it shuts down.

-- > etc () (Transducer $ \s -> s & S.takeWhile (/="q")) (Box <$> cStdout 2 <*> eStdin 2)
-- etc substitution
-- > with (Box <$> cStdout 2 <*> eStdin 2) $ \(Box c e) -> (e & toStream & transduce (Transducer $ \s -> s & S.takeWhile (/="q")) & fromStream) c & flip execStateT ()
-- no state & transduction unwrapping
-- > with (Box <$> cStdout 2 <*> eStdin 2) $ \(Box c e) -> (e & toStream & S.takeWhile (/="q") & fromStream) c
-- subbing the IO's
-- > with (Box <$> (eStdout 2 & commitPlug) <*> (cStdin 2 & emitPlug)) $ \(Box c e) -> (e & toStream & transduce (Transducer $ \s -> s & S.takeWhile (/="q")) & fromStream) c
-- unplugging
-- > with (Box <$> (Cont $ \cio -> queueC cio (eStdout 2)) <*> (Cont $ \eio -> queueE (cStdin 2) eio)) $ \(Box c e) -> (e & toStream & S.takeWhile (/="q") & fromStream) c
-- fmapping the Box
-- > Cont (\r_ -> (Cont $ \cio -> queueC cio (eStdout 2)) `with` \x -> r_ (Box x))
-- twisting the with simplifying the Cont
-- > Cont (\r_ -> queueC (r_ . Box) (eStdout 2))
-- spaceship time!
-- > Cont (\r_ -> (Cont (\e -> queueC (e . Box) (eStdout 2))) `with` \f -> (Cont $ \eio -> queueE (cStdin 2) eio) `with` \x -> r_ (f x))
-- flipping the withs
-- > Cont (\r_ -> with (Cont (\e -> queueC (e . Box) (eStdout 2))) (\f -> with (Cont $ \eio -> queueE (cStdin 2) eio) (r_ . f)))
-- swallowing the withs
-- > with Cont (\r_ -> queueC ((\f -> queueE (cStdin 2) (r_ . f)) . Box) (eStdout 2))

-- subbing back in mainline
-- > with (Cont (\r_ -> queueC ((\f -> queueE (cStdin 2) (r_ . f)) . Box) (eStdout 2))) (\(Box c e) -> (e & toStream & S.takeWhile (/="q") & fromStream) c)
-- > queueC ((\f -> queueE (cStdin 2) ((\(Box c e) -> (e & toStream & S.takeWhile (/="q") & fromStream) c) . f)) . Box) (eStdout 2)
-- subbing queues
-- > fmap fst (withQ Unbounded toBox ((\f -> queueE (cStdin 2) ((\(Box c e) -> (e & toStream & S.takeWhile (/="q") & fromStream) c) . f)) . Box) (eStdout 2))
-- subbing stdin and stdout
-- > fmap fst (withQ Unbounded toBox ((\f -> fmap snd (withQ Unbounded toBox (\c -> cStdin_ c *> cStdin_ c) ((\(Box c e) -> (e & toStream & S.takeWhile (/="q") & fromStream) c) . f))) . Box) (\e -> eStdout_ e *> eStdout_ e))
-- removes the second eStdout_ (still requires another stdin input before it closes up)
-- fmap fst (withQ Unbounded toBox ((\f -> fmap snd (withQ Unbounded toBox (\c -> cStdin_ c *> cStdin_ c) ((\(Box c e) -> (e & toStream & S.takeWhile (/="q") & fromStream) c) . f))) . Box) eStdout_)
-- remove surperfluous fsts and snds
-- > withQ Unbounded toBox ((\f -> (withQ Unbounded toBox (\c -> cStdin_ c *> cStdin_ c) ((\(Box c e) -> (e & toStream & S.takeWhile (/="q") & fromStream) c) . f))) . Box) eStdout_
-- IO (((),()), ())
-- an intuitive unwrapping of the f
-- > withQ Unbounded toBox (\c -> (withQ Unbounded toBox (\c' -> cStdin_ c' *> cStdin_ c') ((\e -> (e & toStream & S.takeWhile (/="q") & fromStream) c)))) eStdout_

{-

And here was the problem is much easier to see. The withQ's were waiting on both sides of the queue.

I replaced `snd <$> withQ` with `withQE`

-}

-- subbing withQE fixes!
-- withQ Unbounded toBox (\c -> (withQE Unbounded toBox (\c' -> cStdin_ c' *> cStdin_ c') ((\e -> (fromStream . S.takeWhile (/="q") . toStream $ e) c)))) eStdout_