-- | A variant of "Pipes.Concurrent" that uses a Finger Tree-based Priority
-- Queue ('TPQueue.TPQueue') instead of a normal 'TQueue'.
module Pipes.Concurrent.PQueue
  ( spawn
  , withSpawn
  -- * Re-exports from "Pipes.Concurrent"
  , Input (..)
  , Output (..)
  , fromInput
  , toOutput
  ) where

import           Control.Applicative
import           Control.Concurrent.STM         as STM
import qualified Control.Concurrent.STM.TPQueue as TPQueue
import           Control.Exception              (bracket)
import           Control.Monad
import           Pipes.Concurrent
    ( Input (..)
    , Output (..)
    , fromInput
    , toOutput
    )


-- | Spawn a mailbox to store prioritized messages in a Mailbox. Using 'recv' on
-- the 'Input' will return 'Just' the minimal element, or 'Nothing' if the
-- mailbox is closed.
--
-- This function is analogous to
-- @"Pipes.Concurrent".'Pipes.Concurrent.spawn'' 'Pipes.Concurrent.Unbounded'@,
-- but it uses a 'TPQueue.TPQueue' instead of a 'TQueue' to store messages.
spawn :: Ord p => IO (Output (p, a), Input a, STM ())
spawn :: IO (Output (p, a), Input a, STM ())
spawn = do
    TPQueue p a
q <- IO (TPQueue p a)
forall k v. Ord k => IO (TPQueue k v)
TPQueue.newTPQueueIO
    TVar Bool
sealed <- Bool -> IO (TVar Bool)
forall a. a -> IO (TVar a)
STM.newTVarIO Bool
False
    let seal :: STM ()
seal = TVar Bool -> Bool -> STM ()
forall a. TVar a -> a -> STM ()
STM.writeTVar TVar Bool
sealed Bool
True

    {- Use weak TVars to keep track of whether the 'Input' or 'Output' has been
       garbage collected.  Seal the mailbox when either of them becomes garbage
       collected.
    -}
    TVar ()
rSend <- () -> IO (TVar ())
forall a. a -> IO (TVar a)
STM.newTVarIO ()
    IO (Weak (TVar ())) -> IO ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (TVar () -> IO () -> IO (Weak (TVar ()))
forall a. TVar a -> IO () -> IO (Weak (TVar a))
STM.mkWeakTVar TVar ()
rSend (STM () -> IO ()
forall a. STM a -> IO a
STM.atomically STM ()
seal))
    TVar ()
rRecv <- () -> IO (TVar ())
forall a. a -> IO (TVar a)
STM.newTVarIO ()
    IO (Weak (TVar ())) -> IO ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (TVar () -> IO () -> IO (Weak (TVar ()))
forall a. TVar a -> IO () -> IO (Weak (TVar a))
STM.mkWeakTVar TVar ()
rRecv (STM () -> IO ()
forall a. STM a -> IO a
STM.atomically STM ()
seal))

    let sendOrEnd :: p -> a -> STM Bool
sendOrEnd p
p a
a = do
            Bool
isSealed <- TVar Bool -> STM Bool
forall a. TVar a -> STM a
readTVar TVar Bool
sealed
            if Bool
isSealed
                then Bool -> STM Bool
forall (f :: * -> *) a. Applicative f => a -> f a
pure Bool
False
                else TPQueue p a -> p -> a -> STM ()
forall k v. Ord k => TPQueue k v -> k -> v -> STM ()
TPQueue.writeTPQueue TPQueue p a
q p
p a
a STM () -> STM Bool -> STM Bool
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> Bool -> STM Bool
forall (f :: * -> *) a. Applicative f => a -> f a
pure Bool
True

        readOrEnd :: STM (Maybe a)
readOrEnd = (a -> Maybe a) -> STM a -> STM (Maybe a)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap a -> Maybe a
forall a. a -> Maybe a
Just (TPQueue p a -> STM a
forall k v. Ord k => TPQueue k v -> STM v
TPQueue.readTPQueue TPQueue p a
q)
                STM (Maybe a) -> STM (Maybe a) -> STM (Maybe a)
forall (f :: * -> *) a. Alternative f => f a -> f a -> f a
<|> (TVar Bool -> STM Bool
forall a. TVar a -> STM a
readTVar TVar Bool
sealed STM Bool -> (Bool -> STM ()) -> STM ()
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= Bool -> STM ()
check STM () -> STM (Maybe a) -> STM (Maybe a)
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> Maybe a -> STM (Maybe a)
forall (f :: * -> *) a. Applicative f => a -> f a
pure Maybe a
forall a. Maybe a
Nothing)

        _send :: (p, a) -> STM Bool
_send (p
p, a
a) = p -> a -> STM Bool
sendOrEnd p
p a
a STM Bool -> STM () -> STM Bool
forall (f :: * -> *) a b. Applicative f => f a -> f b -> f a
<* TVar () -> STM ()
forall a. TVar a -> STM a
readTVar TVar ()
rSend
        _recv :: STM (Maybe a)
_recv        = STM (Maybe a)
readOrEnd     STM (Maybe a) -> STM () -> STM (Maybe a)
forall (f :: * -> *) a b. Applicative f => f a -> f b -> f a
<* TVar () -> STM ()
forall a. TVar a -> STM a
readTVar TVar ()
rRecv
    (Output (p, a), Input a, STM ())
-> IO (Output (p, a), Input a, STM ())
forall (m :: * -> *) a. Monad m => a -> m a
return (((p, a) -> STM Bool) -> Output (p, a)
forall a. (a -> STM Bool) -> Output a
Output (p, a) -> STM Bool
_send, STM (Maybe a) -> Input a
forall a. STM (Maybe a) -> Input a
Input STM (Maybe a)
_recv, STM ()
seal)
{-# INLINABLE spawn #-}

-- | 'withSpawn' passes its enclosed action an 'Output' and 'Input' like you'd
-- get from 'spawn', but automatically @seal@s them after the action completes.
-- This can be used when you need the @seal@ing behavior available from 'spawn',
-- but want to work at a bit higher level:
--
-- > withSpawn buffer $ \(output, input) -> ...
--
-- 'withSpawn' is exception-safe, since it uses 'bracket' internally.
withSpawn :: Ord p => ((Output (p, a), Input a) -> IO r) -> IO r
withSpawn :: ((Output (p, a), Input a) -> IO r) -> IO r
withSpawn (Output (p, a), Input a) -> IO r
action = IO (Output (p, a), Input a, STM ())
-> ((Output (p, a), Input a, STM ()) -> IO ())
-> ((Output (p, a), Input a, STM ()) -> IO r)
-> IO r
forall a b c. IO a -> (a -> IO b) -> (a -> IO c) -> IO c
bracket IO (Output (p, a), Input a, STM ())
forall p a. Ord p => IO (Output (p, a), Input a, STM ())
spawn
    (\(Output (p, a)
_, Input a
_, STM ()
seal) -> STM () -> IO ()
forall a. STM a -> IO a
atomically STM ()
seal)
    (\(Output (p, a)
output, Input a
input, STM ()
_) -> (Output (p, a), Input a) -> IO r
action (Output (p, a)
output, Input a
input))