{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE TupleSections #-}
{-# LANGUAGE TypeFamilies #-}

-- | This module provides an EQueue implimentation that uses STM to wait on the first
--   available event among a set of possible events. When Waited on it can provide up to one
--   event from every available event source when it provides one event, allowing coalescing.

module Control.Concurrent.EQueue.STMEQueue where

import           Control.Concurrent.EQueue.Class
import           Control.Concurrent.STM
import           Control.Monad.Trans
import           Data.Foldable
import           Data.Map (Map)
import qualified Data.Map as Map
import           Data.Maybe (catMaybes)
import           Data.Semigroup
import           Data.Unique

-- | A basic example implimentation of an EQueue using STM.
--   This implimentation must look at every registered event source
--   leading to inefficiency in systems with a very large number of
--   sources. For most systems it should be a sufficient implimentation
--   though.
data STMEQueue a =
    STMEQueue
    { _eqActiveSources :: TVar (Map Unique (STM (Maybe a)))
    }

-- | Passed an STM dequeueing the current value of this signal.
--   Returns an action to unregister said.
register :: (MonadIO m) => STMEQueue a -> STM (Maybe a) -> m (IO ())
register (STMEQueue tqm) g = liftIO $ do
  u <- newUnique
  atomically $ do
    modifyTVar tqm (Map.insert u g)
    return . atomically $ modifyTVar tqm (Map.delete u)

-- | Create a new STMEQueue which initally has no event sources registered.
newSTMEQueue :: MonadIO m => m (STMEQueue a)
newSTMEQueue = liftIO $ STMEQueue <$> newTVarIO mempty

instance EQueue STMEQueue where
  registerSemi eq f = liftIO $ do
    t <- newEmptyTMVarIO
    (mappendTMVar t,) <$> register eq ((fmap f) <$> tryTakeTMVar t)
    where
      mappendTMVar :: Semigroup a => TMVar a -> a -> IO ()
      mappendTMVar t a = atomically $ do
        mv <- tryTakeTMVar t
        case mv of
          Nothing -> putTMVar t a
          Just v  -> putTMVar t (v <> a)

  registerQueued eq = liftIO $ do
    t <- newTQueueIO
    (atomically . writeTQueue t,) <$> register eq (tryReadTQueue t)

-- | The policy for waiting on an STMEQueue.
data STMEQueueWait =
    ReturnImmediate
    -- ^ Immediately return, even if no events are available.
  | RequireEvent
    -- ^ Wait for at least one event to be available before returning.
  deriving (Eq)

instance EQueueW STMEQueue where
  type WaitPolicy STMEQueue = STMEQueueWait

  waitEQ (STMEQueue tqm) wp = liftIO . atomically $ do
    qm <- readTVar tqm
    es <- catMaybes <$> (sequenceA . toList $ qm)
    if (null es && wp == RequireEvent)
      then retry
      else return es