-- | * Introduction
--   Contains a simple source and sink for linking together conduits in
--   in different threads. Usage is so easy, it's best explained with an
--   example:
--   We first create a channel for communication...
--   > do chan <- atomically $ newTBMChan 16
--   Then we fork a new thread loading a wackton of pictures into memory. The
--   data (pictures, in this case) will be streamed down the channel to whatever
--   is on the other side.
--   >    _ <- forkIO . runResourceT $ loadTextures lotsOfPictures $$ sinkTBMChan chan
--   Finally, we connect something to the other end of the channel. In this
--   case, we connect a sink which uploads the textures one by one to the
--   graphics card.
--   >    runResourceT $ sourceTBMChan chan $$ Conduit.mapM_ (liftIO . uploadToGraphicsCard)
--   By running the two tasks in parallel, we no longer have to wait for one
--   texture to upload to the graphics card before reading the next one from
--   disk. This avoids the common switching of bottlenecks (such as between the
--   disk and graphics memory) that most loading processes seem to love.
--   Control.Concurrent.STM.TMChan and Control.Concurrent.STM.TBMChan are
--   re-exported for convenience.
--   * Caveats
--   It is recommended to use TBMChan as much as possible, and generally avoid
--   TMChan usage. TMChans are unbounded, and if used, the conduit pipeline
--   will no longer use a bounded amount of space. They will essentially leak
--   memory if the writer is faster than the reader.
--   Therefore, use bounded channels as much as possible, preferably with a
--   high bound so it will be hit infrequently.
module Data.Conduit.TMChan ( module Control.Concurrent.STM.TBMChan
                           , sourceTBMChan
                           , sinkTBMChan
                           , module Control.Concurrent.STM.TMChan
                           , sourceTMChan
                           , sinkTMChan
                           ) where

import Control.Monad.IO.Class ( liftIO )
import Control.Concurrent.STM
import Control.Concurrent.STM.TBMChan
import Control.Concurrent.STM.TMChan

import Data.Conduit

chanSource :: chan -- ^ The channel.
           -> (chan -> STM (Maybe a)) -- ^ The 'read' function.
           -> (chan -> STM ()) -- ^ The 'close' function.
           -> Source IO a
chanSource ch reader closer = src
        src = Source pull close
        pull = do a <- liftIO . atomically $ reader ch
                  case a of
                    Just x  -> return $ Open src x
                    Nothing -> return Closed
        close = liftIO . atomically $ closer ch
{-# INLINE chanSource #-}

chanSink :: chan -- ^ The channel.
         -> (chan -> a -> STM ()) -- ^ The 'write' function.
         -> (chan -> STM ()) -- ^ The 'close' function.
         -> Sink a IO ()
chanSink ch writer closer = sink
        sink = SinkData push close
        push input = do liftIO . atomically $ writer ch input
                        return $ Processing push close
        close = liftIO . atomically $ closer ch
{-# INLINE chanSink #-}

-- | A simple wrapper around a TBMChan. As data is pushed into the channel, the
--   source will read it and pass it down the conduit pipeline. When the
--   channel is closed, the source will close also.
--   If the channel fills up, the pipeline will stall until values are read.
sourceTBMChan :: TBMChan a -> Source IO a
sourceTBMChan ch = chanSource ch readTBMChan closeTBMChan
{-# INLINE sourceTBMChan #-}

-- | A simple wrapper around a TMChan. As data is pushed into the channel, the
--   source will read it and pass it down the conduit pipeline. When the
--   channel is closed, the source will close also.
sourceTMChan :: TMChan a -> Source IO a
sourceTMChan ch = chanSource ch readTMChan closeTMChan
{-# INLINE sourceTMChan #-}

-- | A simple wrapper around a TBMChan. As data is pushed into the sink, it
--   will magically begin to appear in the channel. If the channel is full,
--   the sink will block until space frees up. When the sink is closed, the
--   channel will close too.
sinkTBMChan :: TBMChan a -> Sink a IO ()
sinkTBMChan ch = chanSink ch writeTBMChan closeTBMChan
{-# INLINE sinkTBMChan #-}

-- | A simple wrapper around a TMChan. As data is pushed into this sink, it
--   will magically begin to appear in the channel. When the sink is closed,
--   the channel will close too.
sinkTMChan :: TMChan a -> Sink a IO ()
sinkTMChan ch = chanSink ch writeTMChan closeTMChan
{-# INLINE sinkTMChan #-}