-- Communicating Haskell Processes. -- Copyright (c) 2008--2009, University of Kent. -- All rights reserved. -- -- Redistribution and use in source and binary forms, with or without -- modification, are permitted provided that the following conditions are -- met: -- -- * Redistributions of source code must retain the above copyright -- notice, this list of conditions and the following disclaimer. -- * Redistributions in binary form must reproduce the above copyright -- notice, this list of conditions and the following disclaimer in the -- documentation and/or other materials provided with the distribution. -- * Neither the name of the University of Kent nor the names of its -- contributors may be used to endorse or promote products derived from -- this software without specific prior written permission. -- -- THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS -- IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, -- THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR -- PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR -- CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, -- EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, -- PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR -- PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF -- LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING -- NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS -- SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. -- | A module containing broadcast channels (one-to-many). Whereas a one-to-any -- channel features one writer sending a /single/ value to /one/ (of many) readers, a -- one-to-many channel features one writer sending the /same/ value to /many/ -- readers. So a one-to-any channel involves claiming the channel-end to ensure -- exclusivity, but a one-to-many channel involves enrolling on the channel-end -- (subscribing) before it can engage in communication. -- -- A communication on a one-to-many channel only takes place when the writer -- and all readers currently enrolled agree to communicate. What happens when -- the writer wants to communicate and no readers are enrolled is undefined -- (the writer may block, or may communicate happily to no-one). -- -- This module also contains reduce channels (added in version 1.1.1). Because -- in CHP channels must have the same type at both ends, we use the Monoid -- type-class. It is important to be aware that the order of mappends will be -- non-deterministic, and thus you should either use an mappend that is commutative -- or code around this restruction. -- -- For example, a common thing to do would be to use lists as the type for -- reduce channels, make each writer write a single item list (but more is -- possible), then use the list afterwards, but be aware that it is unordered. -- If it is important to have an ordered list, make each writer write a pair -- containing a (unique) index value and the real data, then sort by the index -- value and discard it. -- -- Since reduce channels were added after the initial library design, there -- is a slight complication: it is not possible to use newChannel (and all -- similar functions) with reduce channels because it is impossible to express -- the Monoid constraint for the Channel instance. Instead, you must use manyToOneChannel -- and manyToAnyChannel. module Control.Concurrent.CHP.Channels.BroadcastReduce (BroadcastChanin, BroadcastChanout, OneToManyChannel, AnyToManyChannel, oneToManyChannel, anyToManyChannel, oneToManyChannel', anyToManyChannel', ReduceChanin, ReduceChanout, sameReduceChannel, ManyToOneChannel, ManyToAnyChannel, manyToOneChannel, manyToAnyChannel, manyToOneChannel', manyToAnyChannel') where import Control.Arrow import Control.Concurrent.STM import Control.Monad import Control.Monad.Trans import Data.Monoid import Control.Concurrent.CHP.Barriers import Control.Concurrent.CHP.Base import Control.Concurrent.CHP.Channels import Control.Concurrent.CHP.Channels.Base import Control.Concurrent.CHP.CSP import Control.Concurrent.CHP.Enroll import Control.Concurrent.CHP.Event import Control.Concurrent.CHP.Mutex import Control.Concurrent.CHP.Traces.Base -- | The Eq instance was added in version 1.4.0. -- -- In version 1.5.0, the broadcast and reduce channels do not appear correctly -- in the traces. newtype BroadcastChannel a = BC (Barrier, TVar (Maybe a), ManyToOneTVar Int) instance Eq (BroadcastChannel a) where (BC (_, tvX, _)) == (BC (_, tvY, _)) = tvX == tvY -- | The reading end of a broadcast channel. You must enroll on it before -- you can read from it or poison it. -- -- The Eq instance was added in version 1.4.0. newtype BroadcastChanin a = BI (BroadcastChannel a) deriving (Eq) -- | The writing end of a broadcast channel. -- -- The Eq instance was added in version 1.4.0. newtype BroadcastChanout a = BO (BroadcastChannel a) deriving (Eq) instance Enrollable BroadcastChanin a where enroll c@(BI (BC (b,_,_))) f = enroll b (const $ f (Enrolled c)) resign (Enrolled (BI (BC (b,_,_)))) m = resign (Enrolled b) m instance WriteableChannel BroadcastChanout where extWriteChannel' (BO (BC (b, tvSend, tvAck))) m = do syncBarrierWith (indivRecJust ChannelWrite) (resetManyToOneTVar tvAck . pred) $ Enrolled b -- subtract one for writer (x, r) <- m liftIO . atomically $ writeTVar tvSend $ Just x -- Must be two separate transactions: liftIO . atomically $ readManyToOneTVar tvAck return r instance ReadableChannel (Enrolled BroadcastChanin) where extReadChannel (Enrolled (BI (BC (b, tvSend, tvAck)))) f = do syncBarrierWith (indivRecJust ChannelRead) (resetManyToOneTVar tvAck . pred) $ Enrolled b x <- liftIO $ atomically $ readTVar tvSend >>= maybe retry return y <- f x liftIO $ atomically $ writeManyToOneTVar ((== 0), return 0) pred tvAck return y instance Poisonable (BroadcastChanout a) where poison (BO (BC (b,_,_))) = poison $ Enrolled b checkForPoison (BO (BC (b,_,_))) = checkForPoison $ Enrolled b instance Poisonable (Enrolled BroadcastChanin a) where poison (Enrolled (BI (BC (b,_,_)))) = poison $ Enrolled b checkForPoison (Enrolled (BI (BC (b,_,_)))) = checkForPoison $ Enrolled b newBroadcastChannel :: CHP (BroadcastChannel a) newBroadcastChannel = do do b@(Barrier (e, _, _)) <- newBarrier -- Writer is always enrolled: liftIO $ atomically $ enrollEvent e tvSend <- liftIO $ atomically $ newTVar Nothing tvAck <- liftIO $ atomically $ newManyToOneTVar 0 return $ BC (b, tvSend, tvAck) instance Channel BroadcastChanin BroadcastChanout where newChannel' _sh = liftCHP $ do c@(BC (b, _, _)) <- newBroadcastChannel return $ Chan (getBarrierIdentifier b) (BI c) (BO c) sameChannel (BI x) (BO y) = x == y instance Channel BroadcastChanin (Shared BroadcastChanout) where newChannel' _sh = liftCHP $ do m <- newMutex c <- newChannel return $ Chan (getChannelIdentifier c) (reader c) (Shared (m, writer c)) sameChannel (BI x) (Shared (_, BO y)) = x == y type OneToManyChannel = Chan BroadcastChanin BroadcastChanout type AnyToManyChannel = Chan BroadcastChanin (Shared BroadcastChanout) oneToManyChannel :: MonadCHP m => m (OneToManyChannel a) oneToManyChannel = newChannel anyToManyChannel :: MonadCHP m => m (AnyToManyChannel a) anyToManyChannel = newChannel -- | Added in version 1.5.0. -- -- In version 1.5.0, the broadcast and reduce channels do not appear correctly -- in the traces. oneToManyChannel' :: MonadCHP m => ChanOpts a -> m (OneToManyChannel a) oneToManyChannel' = newChannel' -- | Added in version 1.5.0. -- -- In version 1.5.0, the broadcast and reduce channels do not appear correctly -- in the traces. anyToManyChannel' :: MonadCHP m => ChanOpts a -> m (AnyToManyChannel a) anyToManyChannel' = newChannel' -- | The Eq instance was added in version 1.4.0. -- -- In version 1.5.0, the broadcast and reduce channels do not appear correctly -- in the traces. newtype ReduceChannel a = GC (Barrier, ManyToOneTVar (Int, Maybe (a, TVar Bool)), (a -> a -> a, a)) instance Eq (ReduceChannel a) where (GC (_, tvX, _)) == (GC (_, tvY, _)) = tvX == tvY -- | The reading end of a reduce channel. -- -- The Eq instance was added in version 1.4.0. newtype ReduceChanin a = GI (ReduceChannel a) deriving (Eq) -- | The writing end of a reduce channel. You must enroll on it before -- you can read from it or poison it. -- -- The Eq instance was added in version 1.4.0. newtype ReduceChanout a = GO (ReduceChannel a) deriving (Eq) instance Enrollable ReduceChanout a where enroll c@(GO (GC (b,_,_))) f = enroll b (const $ f (Enrolled c)) resign (Enrolled (GO (GC (b,_,_)))) m = resign (Enrolled b) m instance WriteableChannel (Enrolled ReduceChanout) where extWriteChannel' (Enrolled (GO (GC (b, tv, (f,_))))) m = do syncBarrierWith (indivRecJust ChannelWrite) (\n -> resetManyToOneTVar tv (pred n, Nothing)) $ Enrolled b -- Subtract one for reader (x, r) <- m (_, Just (_, rtvb)) <- liftIO . atomically $ do tvb <- newTVar False let upd (n, mx) = (pred n, Just $ maybe (x, tvb) (first $ f x) mx) writeManyToOneTVar ((== 0) . fst, return (0, Nothing)) upd tv -- Has to be two separate transactions liftIO $ atomically $ readTVar rtvb >>= flip unless retry return r instance ReadableChannel ReduceChanin where extReadChannel (GI (GC (b, tv, (_, _empty)))) f = do syncBarrierWith (indivRecJust ChannelRead) (\n -> resetManyToOneTVar tv (pred n, Nothing)) $ Enrolled b -- Subtract one for reader (_, Just (x, tvb)) <- liftIO $ atomically $ readManyToOneTVar tv y <- f x liftIO $ atomically $ writeTVar tvb True return y instance Poisonable (Enrolled ReduceChanout a) where poison (Enrolled (GO (GC (b,_,_)))) = poison $ Enrolled b checkForPoison (Enrolled (GO (GC (b,_,_)))) = checkForPoison $ Enrolled b instance Poisonable (ReduceChanin a) where poison (GI (GC (b,_,_))) = poison $ Enrolled b checkForPoison (GI (GC (b,_,_))) = checkForPoison $ Enrolled b newReduceChannel :: Monoid a => CHP (ReduceChannel a) newReduceChannel = do do b@(Barrier (e, _, _)) <- newBarrier -- Writer is always enrolled: liftIO $ atomically $ enrollEvent e mtv <- liftIO $ atomically $ newManyToOneTVar (0, Nothing) return $ GC (b, mtv, (mappend, mempty)) -- | The reduce channel version of sameChannel. -- -- This function was added in version 1.4.0. sameReduceChannel :: ReduceChanin a -> ReduceChanout a -> Bool sameReduceChannel (GI x) (GO y) = x == y type ManyToOneChannel = Chan ReduceChanin ReduceChanout type ManyToAnyChannel = Chan (Shared ReduceChanin) ReduceChanout manyToOneChannel :: (Monoid a, MonadCHP m) => m (ManyToOneChannel a) manyToOneChannel = do c@(GC (b,_,_)) <- liftCHP newReduceChannel return $ Chan (getBarrierIdentifier b) (GI c) (GO c) manyToAnyChannel :: (Monoid a, MonadCHP m) => m (ManyToAnyChannel a) manyToAnyChannel = do m <- newMutex c <- manyToOneChannel return $ Chan (getChannelIdentifier c) (Shared (m, reader c)) (writer c) -- | Added in version 1.5.0. -- -- In version 1.5.0, the broadcast and reduce channels do not appear correctly -- in the traces. manyToOneChannel' :: (Monoid a, MonadCHP m) => ChanOpts a -> m (ManyToOneChannel a) manyToOneChannel' = const manyToOneChannel --TODO -- | Added in version 1.5.0. -- -- In version 1.5.0, the broadcast and reduce channels do not appear correctly -- in the traces. manyToAnyChannel' :: (Monoid a, MonadCHP m) => ChanOpts a -> m (ManyToAnyChannel a) manyToAnyChannel' = const manyToAnyChannel --TODO