-- | A variant of "Pipes.Concurrent" that uses a Finger Tree-based Priority
-- Queue ('TPQueue.TPQueue') instead of a normal 'TQueue'.
module Pipes.Concurrent.PQueue
  ( spawn
  , withSpawn
  -- * Re-exports from "Pipes.Concurrent"
  , Input (..)
  , Output (..)
  , fromInput
  , toOutput
  ) where

import           Control.Applicative
import           Control.Concurrent.STM         as STM
import qualified Control.Concurrent.STM.TPQueue as TPQueue
import           Control.Exception              (bracket)
import           Control.Monad
import           Pipes.Concurrent
    ( Input (..)
    , Output (..)
    , fromInput
    , toOutput
    )


-- | Spawn a mailbox to store prioritized messages in a Mailbox. Using 'recv' on
-- the 'Input' will return 'Just' the minimal element, or 'Nothing' if the
-- mailbox is closed.
--
-- This function is analogous to
-- @"Pipes.Concurrent".'Pipes.Concurrent.spawn'' 'Pipes.Concurrent.Unbounded'@,
-- but it uses a 'TPQueue.TPQueue' instead of a 'TQueue' to store messages.
spawn :: Ord p => IO (Output (p, a), Input a, STM ())
spawn = do
    q <- TPQueue.newTPQueueIO
    sealed <- STM.newTVarIO False
    let seal = STM.writeTVar sealed True

    {- Use weak TVars to keep track of whether the 'Input' or 'Output' has been
       garbage collected.  Seal the mailbox when either of them becomes garbage
       collected.
    -}
    rSend <- STM.newTVarIO ()
    void (STM.mkWeakTVar rSend (STM.atomically seal))
    rRecv <- STM.newTVarIO ()
    void (STM.mkWeakTVar rRecv (STM.atomically seal))

    let sendOrEnd p a = do
            isSealed <- readTVar sealed
            if isSealed
                then pure False
                else TPQueue.writeTPQueue q p a >> pure True

        readOrEnd = fmap Just (TPQueue.readTPQueue q)
                <|> (readTVar sealed >>= check >> pure Nothing)

        _send (p, a) = sendOrEnd p a <* readTVar rSend
        _recv        = readOrEnd     <* readTVar rRecv
    return (Output _send, Input _recv, seal)
{-# INLINABLE spawn #-}

-- | 'withSpawn' passes its enclosed action an 'Output' and 'Input' like you'd
-- get from 'spawn', but automatically @seal@s them after the action completes.
-- This can be used when you need the @seal@ing behavior available from 'spawn',
-- but want to work at a bit higher level:
--
-- > withSpawn buffer $ \(output, input) -> ...
--
-- 'withSpawn' is exception-safe, since it uses 'bracket' internally.
withSpawn :: Ord p => ((Output (p, a), Input a) -> IO r) -> IO r
withSpawn action = bracket spawn
    (\(_, _, seal) -> atomically seal)
    (\(output, input, _) -> action (output, input))