{-# LANGUAGE DeriveDataTypeable #-} -- | Functional channels -- | A channel data type which allows consumers to hold references to different points in a stream at the same time. Elements of a channel are kept alive only so long as there are references pointing before those elements. And producers on a channel are kept alive only so long as there are consumers. module Control.CUtils.FChan (Chan, DoneReadingException(..), takeChan, newChan, makeConsumer) where import Control.Concurrent.MVar import Control.Exception import System.Mem.Weak import Data.Typeable newtype Chan t = Chan (MVar (t, Chan t)) -- | Thrown by the writer function when the garbage collector detects that no one will read it. data DoneReadingException = DoneReadingException deriving (Typeable, Show) instance Exception DoneReadingException addChan vr x = modifyMVar_ vr (\weak -> do may <- deRefWeak weak case may of Just vr2 -> do vr' <- newEmptyMVar putMVar vr2 (x, Chan vr') mkWeak vr' vr' Nothing Nothing -> throwIO DoneReadingException) -- | Take the first element from a channel, and a channel representing the remainder of the output. takeChan (Chan vr) = readMVar vr -- | Create a new channel. The first return value is a function that can be used to add values to the channel. The second return value is the channel itself. newChan = do vr <- newEmptyMVar weak <- mkWeak vr vr Nothing vr2 <- newMVar weak return (addChan vr2, Chan vr) -- | The first return value is a thunk that returns values from the channel successively, starting from the position of the parameter channel. The second thunk can be used to retrieve the position of the channel after all the reads made using the first thunk. makeConsumer chn = do vr2 <- newMVar chn return (modifyMVar vr2 (\chn -> do (x, chn2) <- takeChan chn return (chn2, x)), readMVar vr2)