-- Communicating Haskell Processes. -- Copyright (c) 2008, University of Kent. -- All rights reserved. -- -- Redistribution and use in source and binary forms, with or without -- modification, are permitted provided that the following conditions are -- met: -- -- * Redistributions of source code must retain the above copyright -- notice, this list of conditions and the following disclaimer. -- * Redistributions in binary form must reproduce the above copyright -- notice, this list of conditions and the following disclaimer in the -- documentation and/or other materials provided with the distribution. -- * Neither the name of the University of Kent nor the names of its -- contributors may be used to endorse or promote products derived from -- this software without specific prior written permission. -- -- THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS -- IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, -- THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR -- PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR -- CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, -- EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, -- PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR -- PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF -- LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING -- NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS -- SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. -- | A module containing broadcast channels (one-to-many). Whereas a one-to-any -- channel features one writer sending a /single/ value to /one/ (of many) readers, a -- one-to-many channel features one writer sending the /same/ value to /many/ -- readers. So a one-to-any channel involves claiming the channel-end to ensure -- exclusivity, but a one-to-many channel involves enrolling on the channel-end -- (subscribing) before it can engage in communication. -- -- A communication on a one-to-many channel only takes place when the writer -- and all readers currently enrolled agree to communicate. What happens when -- the writer wants to communicate and no readers are enrolled is undefined -- (the writer may block, or may communicate happily to no-one). module Control.Concurrent.CHP.BroadcastChannels (BroadcastChanin, BroadcastChanout, OneToManyChannel, AnyToManyChannel, oneToManyChannel, anyToManyChannel) where import Control.Concurrent.STM import Control.Monad.Trans import Control.Concurrent.CHP.Barriers import Control.Concurrent.CHP.Base import Control.Concurrent.CHP.Channels import Control.Concurrent.CHP.CSP import Control.Concurrent.CHP.Enroll import Control.Concurrent.CHP.Event import Control.Concurrent.CHP.Mutex import Control.Concurrent.CHP.Traces.Base -- The general pattern of a broadcast channel is as follows: -- SYNC -> Agreement; the readers indicate they are all willing to read, and the -- writer indicates it is ready to write. Either side may ALT. -- -- After this synchronisation, the writer can write his data to the TVar, possibly -- following an extended action -- -- SYNC -> Reading; everyone syncs (no-one ALTs) to move to the reading phase -- -- After this synchronisation, the readers can all read the data from the TVar, -- and possibly complete an extended action -- -- SYNC -> Neutral; everyone syncs (no-one ALTs) to indicate one communication -- cycle has finished. After this the writer may proceed on their way (the -- main reason for needing a third sync). -- There used to be a warning that the first two constructors are never used, but they -- do need to be there for the Enum and Bounded instances... data Phase = Agreement | Reading | Neutral deriving (Enum, Bounded, Eq) -- So I constructed this horrendous hack to suppress the warning: dontWarnMe :: a -> a dontWarnMe = flip const [Agreement, Reading, Neutral] newtype BroadcastChannel a = BC (PhasedBarrier Phase, TVar a) -- | The reading end of a broadcast channel. You must enroll on it before -- you can read from it or poison it. newtype BroadcastChanin a = BI (BroadcastChannel a) -- | The writing end of a broadcast channel. newtype BroadcastChanout a = BO (BroadcastChannel a) instance Enrollable BroadcastChanin a where enroll c@(BI (BC (b,_))) f = enroll b (\eb -> waitForPhase Neutral eb >> f (Enrolled c)) resign (Enrolled (BI (BC (b,_)))) m = do x <- resign (Enrolled b) m waitForPhase Neutral (Enrolled b) return x instance WriteableChannel BroadcastChanout where extWriteChannel (BO (BC (b, tv))) m = do syncBarrierWith (Just . ChannelWrite) $ Enrolled b m >>= liftIO . atomically . writeTVar tv syncBarrierWith (const Nothing) $ Enrolled b syncBarrierWith (const Nothing) $ Enrolled b return () instance ReadableChannel (Enrolled BroadcastChanin) where extReadChannel (Enrolled (BI (BC (b, tv)))) f = do syncBarrierWith (Just . ChannelRead) $ Enrolled b syncBarrierWith (const Nothing) $ Enrolled b x <- liftIO (atomically $ readTVar tv) y <- f x syncBarrierWith (const Nothing) $ Enrolled b return y instance Poisonable (BroadcastChanout a) where poison (BO (BC (b,_))) = poison $ Enrolled b checkForPoison (BO (BC (b,_))) = checkForPoison $ Enrolled b instance Poisonable (Enrolled BroadcastChanin a) where poison (Enrolled (BI (BC (b,_)))) = poison $ Enrolled b checkForPoison (Enrolled (BI (BC (b,_)))) = checkForPoison $ Enrolled b newBroadcastChannel :: CHP (BroadcastChannel a) newBroadcastChannel = dontWarnMe {- see above -} $ do do b@(Barrier (e,_)) <- newPhasedBarrier Neutral -- Writer is always enrolled: liftIO $ atomically $ enrollEvent e tv <- liftIO $ atomically $ newTVar undefined return $ BC (b, tv) instance Channel BroadcastChanin BroadcastChanout where newChannel = liftCHP $ do c@(BC (b,_)) <- newBroadcastChannel return $ Chan (getBarrierIdentifier b) (BI c) (BO c) instance Channel BroadcastChanin (Shared BroadcastChanout) where newChannel = liftCHP $ do m <- newMutex c <- newChannel return $ Chan (getChannelIdentifier c) (reader c) (Shared (m, writer c)) type OneToManyChannel = Chan BroadcastChanin BroadcastChanout type AnyToManyChannel = Chan BroadcastChanin (Shared BroadcastChanout) oneToManyChannel :: CHP (OneToManyChannel a) oneToManyChannel = newChannel anyToManyChannel :: CHP (AnyToManyChannel a) anyToManyChannel = newChannel