{-# LANGUAGE Rank2Types, KindSignatures #-}
-- | Contains a simple source and sink linking together conduits in
--   different threads. For extended examples of usage and bottlenecks
--   see 'Data.Conduit.TMChan'.
--
--   TQueue is an amoritized FIFO queue behaves like TChan, with two
--   important differences:
--
--     * it's faster (but amortized thus the cost of individual operations
--     may vary a lot)
--
--     * it doesn't provide equivalent of the dupTChan and cloneTChan
--     operations
--
--
--   Here is short description of data structures:
--     
--     * TQueue   - unbounded infinite queue
--     * TBQueue  - bounded infinite queue
--     * TMQueue  - unbounded finite (closable) queue
--     * TBMQueue - bounded finite (closable) queue
--
-- Caveats
-- 
--   Infinite operations means that source doesn't know when stream is
--   ended so one need to use other methods of finishing stream like
--   sending an exception or finish conduit in downstream.
--

module Data.Conduit.TQueue
  ( -- * Connectors
    -- ** TQueue connectors 
    sourceTQueue
  , sinkTQueue
    -- ** TBQueue connectors
  , sourceTBQueue
  , sinkTBQueue
    -- ** TMQueue connectors
  , sourceTMQueue
  , sinkTMQueue
    -- ** TBMQueue connectors
  , sourceTBMQueue
  , sinkTBMQueue
  , module Control.Concurrent.STM.TQueue
  ) where

import Control.Concurrent.STM
import Control.Concurrent.STM.TQueue
import Control.Concurrent.STM.TBMQueue
import Control.Concurrent.STM.TMQueue
import Control.Monad.IO.Class
import Data.Conduit
import Data.Conduit.Internal



sourceTQueue :: MonadIO m => TQueue a -> Source m a
sourceTQueue q = ConduitM src
  where src = PipeM pull
        pull = do x <- liftSTM $ readTQueue q
                  return $ HaveOutput src close x
        close = return ()

sinkTQueue :: MonadIO m => TQueue a -> Sink a m ()
sinkTQueue q = ConduitM src
  where src        = sink
        sink       = NeedInput push close
        push input = PipeM ((liftSTM $ writeTQueue q input)
                            >> (return $ NeedInput push close))
        close _    = return ()

sourceTBQueue :: MonadIO m => TBQueue a -> Source m a
sourceTBQueue q = ConduitM src
  where src = PipeM pull
        pull = do x <- liftSTM $ readTBQueue q
                  return $ HaveOutput src close x
        close = return ()

sinkTBQueue :: MonadIO m => TBQueue a -> Sink a m ()
sinkTBQueue q = ConduitM src
  where src        = sink
        sink       = NeedInput push close
        push input = PipeM ((liftSTM $ writeTBQueue q input)
                            >> (return $ NeedInput push close))
        close _    = return ()

sourceTMQueue :: MonadIO m => TMQueue a -> Source m a
sourceTMQueue q = ConduitM src
  where src = PipeM pull
        pull = do mx <- liftSTM $ readTMQueue q
                  case mx of
                      Nothing -> return $ Done ()
                      Just x -> return $ HaveOutput src close x
        close = do liftSTM $ closeTMQueue q
                   return ()

sinkTMQueue :: MonadIO m => TMQueue a -> Sink a m ()
sinkTMQueue q = ConduitM src
  where src = sink
        sink =  NeedInput push close
        push input = PipeM ((liftSTM $ writeTMQueue q input)
                            >> (return $ NeedInput push close))
        close _ = do liftSTM $ closeTMQueue q
                     return ()

sourceTBMQueue :: MonadIO m => TBMQueue a -> Source m a
sourceTBMQueue q = ConduitM src
  where src = PipeM pull
        pull = do mx <- liftSTM $ readTBMQueue q
                  case mx of
                      Nothing -> return $ Done ()
                      Just x -> return $ HaveOutput src close x
        close = do liftSTM $ closeTBMQueue q
                   return ()

sinkTBMQueue :: MonadIO m => TBMQueue a -> Sink a m ()
sinkTBMQueue q = ConduitM src
  where src = sink
        sink =  NeedInput push close
        push input = PipeM ((liftSTM $ writeTBMQueue q input)
                            >> (return $ NeedInput push close))
        close _ = do liftSTM $ closeTBMQueue q
                     return ()


liftSTM :: forall (m :: * -> *) a. MonadIO m => STM a -> m a
liftSTM = liftIO . atomically