{- |
Module      :  <File name or $Header$ to be replaced automatically>
Description :  This module implements a channel concurrency primitive using Buffers.
Maintainer  :  willig@ki.informatik.uni-frankfurt.de
Stability   :  experimental
Portability :  non-portable (requires Futures)

This module implements a channel synchronisation primitive using buffers that block on
futures. A channel is a linked list of buffers. It has a read-end at one side and a
write-end at the other. Elements put into the channel can be read out in
a first in, first out order. Read and write operations can be executed in
parallel by several threads. A channel has no capacity bound.

The module provides functions similar to those of 'Control.Concurrent.Chan'.

Warning: All operations on channels should only be used within the global wrapper function
'Futures.withFuturesDo'!
-}
module Control.Concurrent.Futures.Chan (
             Chan,
             ChanType,
           	 newChan,
             writeChan,
             readChan,
             writeChanContents,
             getChanContents,
             mergeChan
  ) where
  
-- qualified, because channel functions are also defined in 
-- Control.Concurrent
import qualified Control.Concurrent
import System.IO.Unsafe		( unsafeInterleaveIO )
import Control.Concurrent.Futures.Buffer
import Control.Concurrent.Futures.Futures

-- | A channel, represented by its two ends.
-- Each end is a 'Buffer' that holds an 'ItemType' link into the chain of
-- buffers making up the channel: the first field is the read end, the
-- second field is the write end.
data Chan a = Chan
  (Buffer (ItemType a)) -- read end
  (Buffer (ItemType a)) -- write end

-- | A pair of end buffers, with the same component types as the two
-- fields of 'Chan'.
type ChanType a = (Buffer (ItemType a), Buffer (ItemType a))

-- Link between two cells of the chain: a buffer that holds the next 'Item'.
type ItemType a = Buffer (Item a)

-- One cell of the chain: a payload value plus the link to the following cell.
data Item a = Item a (ItemType a)

-- | Creates a new empty channel.
--
-- One fresh buffer serves as the initial hole (the place where the first
-- item will be stored); it is put into both the read end and the write end.
newChan :: IO (Chan a)
newChan = do
  firstHole <- newBuf
  readEnd <- newBuf
  writeEnd <- newBuf
  putBuf readEnd firstHole
  putBuf writeEnd firstHole
  return (Chan readEnd writeEnd)
 
-- | Writes one value to the write end of a channel.
--
-- The hole currently held by the write end is fetched with 'getBuf', a
-- fresh hole is stored in the write end in its place, and the old hole is
-- filled with the value linked to the fresh hole. Channels have no capacity
-- bound, so a write never has to wait for free space.
writeChan :: Chan a -> a -> IO ()
writeChan (Chan _ writeEnd) val = do
  nextHole <- newBuf
  currentHole <- getBuf writeEnd
  putBuf writeEnd nextHole
  putBuf currentHole (Item val nextHole)

  
-- | Reads the item at the read end of the channel and returns its value.
-- It blocks on an empty channel.
--
-- After the front cell has been fetched, the read end is advanced to the
-- link stored in that cell.
readChan :: Chan a -> IO a
readChan (Chan readEnd _) = do
  front <- getBuf readEnd
  Item val rest <- getBuf front
  putBuf readEnd rest
  return val


-- | Writes every element of a list to the channel, in list order.
--
-- Counterpart of @writeList2Chan@ from "Control.Concurrent.Chan": the
-- elements are written one after another by the calling thread, so their
-- relative order in the channel matches the order of the list. (Forking a
-- separate thread per element would leave that order to the scheduler.)
-- For an empty list nothing is written.
writeChanContents :: Chan a -> [a] -> IO ()
writeChanContents chan xs = mapM_ (writeChan chan) xs

-- | Returns the contents of the channel as a lazy list, like
-- @getChanContents@ from "Control.Concurrent.Chan".
--
-- The list never ends: every cell is produced by a further 'readChan'.
-- Each read is deferred with 'unsafeInterleaveIO', so it is only performed
-- when the corresponding part of the list is demanded.
getChanContents :: Chan a -> IO [a]
getChanContents ch = unsafeInterleaveIO nextCell
  where
    -- Read one item now and defer the rest of the list recursively.
    nextCell = do
      firstItem <- readChan ch
      fmap (firstItem :) (getChanContents ch)

-- | Writes two equally typed lists to a given channel in parallel.
--
-- Each list is handed to the internal 'merge' helper in its own forked
-- thread (first list first). The channel is returned immediately, without
-- waiting for the forked threads to finish.
mergeChan :: [a] -> [a] -> Chan a -> IO (Chan a)
mergeChan l1 l2 cm = do
  mapM_ (\source -> Control.Concurrent.forkIO (merge source cm)) [l1, l2]
  return cm

-- Internal helper: writes the elements of a list to the channel one after
-- another, in list order.
--
-- The writes are performed sequentially by the calling thread, so the
-- relative order of the elements of one list is preserved in the channel.
-- Forking a thread per element is unnecessary here (the caller already
-- runs each list in its own thread) and would leave the order of the
-- elements to the scheduler.
merge :: [a] -> Chan a -> IO ()
merge xs c = mapM_ (writeChan c) xs