-- | Channels assembled from linked 'Buffer' cells.
--
-- A 'Chan' keeps two buffers, a read end and a write end, each holding a
-- reference to a cell in a chain of 'Item's. This module exports creation
-- ('newChan'), single-element access ('writeChan', 'readChan'), list-based
-- helpers ('writeChanContents', 'getChanContents') and 'mergeChan', which
-- feeds two lists into one channel concurrently.
module Control.Concurrent.Futures.Chan (
Chan,
ChanType,
newChan,
writeChan,
readChan,
writeChanContents,
getChanContents,
mergeChan
) where
import qualified Control.Concurrent
import System.IO.Unsafe ( unsafeInterleaveIO )
import Control.Concurrent.Futures.Buffer
import Control.Concurrent.Futures.Futures
-- | A channel carrying values of type @a@.
--
-- Both fields are buffers that hold a reference to a cell of the item chain
-- (an 'ItemType'). 'newChan' initialises both to the same cell.
data Chan a =
  Chan
    (Buffer (ItemType a)) -- read end: the cell 'readChan' consumes next
    (Buffer (ItemType a)) -- write end: the cell 'writeChan' fills next
-- | A pair of buffers with the same component types as the two fields of
-- 'Chan'.
type ChanType a = (Buffer (ItemType a), Buffer (ItemType a))
-- | One cell of the channel's chain: a buffer that holds an 'Item' once the
-- cell has been written.
type ItemType a = Buffer (Item a)
-- | A filled cell: the stored value together with the next cell of the chain.
data Item a = Item a (ItemType a)
-- | Create a new channel.
--
-- Three buffers are allocated: one serves as the first cell of the chain,
-- the other two become the read end and the write end. Both ends are
-- initialised with that same first cell.
newChan :: IO (Chan a)
newChan = do
  firstCell <- newBuf
  readVar <- newBuf
  writeVar <- newBuf
  -- Reader and writer start out looking at the same cell.
  putBuf readVar firstCell
  putBuf writeVar firstCell
  return (Chan readVar writeVar)
-- | Append a value to the channel.
--
-- A fresh cell is allocated and installed in the write end, and the cell
-- that was there before is filled with the value plus a link to the fresh
-- cell. The read end is not touched.
--
-- NOTE(review): presumably 'getBuf' empties the buffer and blocks while it is
-- empty (MVar-like), which would serialise concurrent writers -- confirm
-- against "Control.Concurrent.Futures.Buffer".
writeChan :: Chan a -> a -> IO ()
writeChan (Chan _ writeVar) val = do
  nextCell <- newBuf
  currentCell <- getBuf writeVar
  -- Publish the new tail before filling the old one.
  putBuf writeVar nextCell
  putBuf currentCell (Item val nextCell)
-- | Take the next value from the channel.
--
-- The current head cell is fetched from the read end, its 'Item' is
-- extracted, the read end is advanced to the item's successor cell, and the
-- item's value is returned. The write end is not touched.
--
-- NOTE(review): presumably 'getBuf' blocks on an empty buffer, so this would
-- wait until a value has been written -- confirm against
-- "Control.Concurrent.Futures.Buffer".
readChan :: Chan a -> IO a
readChan (Chan readVar _) = do
  headCell <- getBuf readVar
  Item val rest <- getBuf headCell
  -- Advance the read end past the consumed cell.
  putBuf readVar rest
  return val
-- | Write every element of a list to the channel.
--
-- Each element is written by its own thread, forked in list order. The
-- function returns once all threads have been forked; it does not wait for
-- the writes to finish, so the order in which elements end up in the channel
-- is not fixed by this code.
writeChanContents :: Chan a -> [a] -> IO ()
writeChanContents chan xs =
  mapM_ (Control.Concurrent.forkIO . writeChan chan) xs
-- | Return the channel's contents as a lazily produced list.
--
-- 'unsafeInterleaveIO' defers the 'readChan' until the corresponding part of
-- the result list is demanded. The recursive call is wrapped the same way,
-- so only the demanded prefix of the list causes reads. The recursion has no
-- terminating case, so the list never ends.
getChanContents :: Chan a -> IO [a]
getChanContents ch =
  unsafeInterleaveIO $
    readChan ch >>= \x ->
      fmap (x :) (getChanContents ch)
-- | Feed two lists into the given channel concurrently.
--
-- One thread is forked per list (first @l1@, then @l2@), each running
-- 'merge' on its list. The channel @cm@ is returned immediately, without
-- waiting for the forked threads.
mergeChan :: [a] -> [a] -> Chan a -> IO (Chan a)
mergeChan l1 l2 cm = do
  mapM_ (\source -> Control.Concurrent.forkIO (merge source cm)) [l1, l2]
  return cm
-- | Write every element of a list to the channel @c@, one forked thread per
-- element.
--
-- This is 'writeChanContents' with the arguments flipped; delegating keeps
-- the fork-per-element logic in a single place. As there, the call returns
-- after forking and does not wait for the writes to complete.
merge :: [a] -> Chan a -> IO ()
merge xs c = writeChanContents c xs