module Control.Concurrent.CHP.BroadcastChannels (BroadcastChanin, BroadcastChanout,
OneToManyChannel, AnyToManyChannel, oneToManyChannel, anyToManyChannel,
oneToManyChannelWithLabel, anyToManyChannelWithLabel, ReduceChanin,
ReduceChanout, sameReduceChannel, ManyToOneChannel, ManyToAnyChannel, manyToOneChannel,
manyToAnyChannel, manyToOneChannelWithLabel, manyToAnyChannelWithLabel)
where
import Control.Concurrent.STM
import Control.Monad.Trans
import Data.Monoid
import Control.Concurrent.CHP.Barriers
import Control.Concurrent.CHP.Base
import Control.Concurrent.CHP.Channels
import Control.Concurrent.CHP.CSP
import Control.Concurrent.CHP.Enroll
import Control.Concurrent.CHP.Event
import Control.Concurrent.CHP.Mutex
import Control.Concurrent.CHP.Traces.Base
-- | Phase tag carried by the 'PhasedBarrier' behind every channel in this
-- module.  New barriers are created in 'Neutral', and enrolling on (or
-- re-joining after a resign) a channel end calls @waitForPhase Neutral@.
--
-- NOTE(review): every communication performs three barrier syncs, which
-- presumably step through the three phases in turn; the advance rule is
-- defined by the barrier, not here -- confirm before reordering constructors.
data Phase
  = Agreement
  | Reading
  | Neutral
  deriving (Enum, Bounded, Eq)
-- | Identity function whose body mentions every 'Phase' constructor.
--
-- NOTE(review): judging by the name, the list exists only so that each
-- constructor counts as used and the compiler does not warn -- confirm.
dontWarnMe :: a -> a
dontWarnMe passthrough = const passthrough [Agreement, Reading, Neutral]
-- | Shared state of a broadcast channel: the phased barrier that both ends
-- synchronise on, paired with the 'TVar' that holds the communicated value.
newtype BroadcastChannel a = BC (PhasedBarrier Phase, TVar a)

-- Two broadcast channels are equal exactly when they share the same 'TVar';
-- the barrier component is not compared.
instance Eq (BroadcastChannel a) where
  BC (_, leftVar) == BC (_, rightVar) = leftVar == rightVar
-- | Reading end of a broadcast channel.  It is read through its 'Enrolled'
-- form (see the 'ReadableChannel' instance in this module).
newtype BroadcastChanin a
  = BI (BroadcastChannel a)
  deriving Eq

-- | Writing end of a broadcast channel; written to directly, without
-- enrolling (see the 'WriteableChannel' instance in this module).
newtype BroadcastChanout a
  = BO (BroadcastChannel a)
  deriving Eq
-- Enrolling on the reading end enrolls on the underlying barrier.
instance Enrollable BroadcastChanin a where
  -- Enroll on the barrier, call @waitForPhase Neutral@ on the enrolled
  -- barrier, and only then hand the enrolled channel end to the body.
  enroll chanEnd@(BI (BC (bar, _))) body =
    enroll bar $ \enrolledBar -> do
      waitForPhase Neutral enrolledBar
      body (Enrolled chanEnd)

  -- Run the action while resigned from the barrier; afterwards call
  -- @waitForPhase Neutral@ again before returning the action's result.
  resign (Enrolled (BI (BC (bar, _)))) action = do
    result <- resign (Enrolled bar) action
    waitForPhase Neutral (Enrolled bar)
    return result
-- Extended write on a broadcast channel.  The order of the barrier syncs
-- relative to the 'TVar' write is the protocol shared with the readers and
-- must not be changed.
instance WriteableChannel BroadcastChanout where
  extWriteChannel' (BO (BC (bar, var))) extended = do
      -- First sync passes the 'ChannelWrite' record builder to the barrier.
      syncBarrierWith (indivRecJust ChannelWrite) enrolledBar
      -- Run the extended action to obtain the value to send and the result.
      (value, result) <- extended
      liftIO (atomically (writeTVar var value))
      -- Two further syncs that supply no record (always 'Nothing').
      quietSync
      quietSync
      return result
    where
      enrolledBar = Enrolled bar
      quietSync = syncBarrierWith (const $ const Nothing) enrolledBar
-- Extended read on an enrolled broadcast reading end.  The order of the
-- barrier syncs relative to the 'TVar' read is the protocol shared with the
-- writer and must not be changed.
instance ReadableChannel (Enrolled BroadcastChanin) where
  extReadChannel (Enrolled (BI (BC (bar, var)))) extended = do
      -- First sync passes the 'ChannelRead' record builder to the barrier.
      syncBarrierWith (indivRecJust ChannelRead) enrolledBar
      -- Second sync supplies no record; the value is read only after it.
      quietSync
      value <- liftIO (atomically (readTVar var))
      -- The extended action runs before the final sync.
      result <- extended value
      quietSync
      return result
    where
      enrolledBar = Enrolled bar
      quietSync = syncBarrierWith (const $ const Nothing) enrolledBar
-- Poison operations on the writing end are forwarded to the channel's
-- barrier, wrapped as 'Enrolled'.
instance Poisonable (BroadcastChanout a) where
  poison (BO (BC (bar, _))) = poison (Enrolled bar)
  checkForPoison (BO (BC (bar, _))) = checkForPoison (Enrolled bar)
-- Poison operations on an enrolled reading end are forwarded to the
-- channel's barrier, wrapped as 'Enrolled'.
instance Poisonable (Enrolled BroadcastChanin a) where
  poison (Enrolled (BI (BC (bar, _)))) = poison (Enrolled bar)
  checkForPoison (Enrolled (BI (BC (bar, _)))) = checkForPoison (Enrolled bar)
-- | Allocate the shared state for a new broadcast channel.
--
-- Creates a phased barrier starting in 'Neutral', enrolls once on the
-- barrier's event, and allocates the 'TVar' that will carry the value.
--
-- NOTE(review): the up-front 'enrollEvent' is presumably on behalf of the
-- writing end, which is used without an explicit enroll -- confirm.
newBroadcastChannel :: CHP (BroadcastChannel a)
newBroadcastChannel = dontWarnMe $ do
  b@(Barrier (e, _, _)) <- newPhasedBarrier Neutral
  liftIO $ atomically $ enrollEvent e
  -- The cell has no meaningful initial value: the writer stores one before
  -- the readers look at it.  Use a descriptive bottom rather than a bare
  -- 'undefined' so that any protocol violation is diagnosable.
  tv <- liftIO $ atomically $ newTVar $
    error "Control.Concurrent.CHP.BroadcastChannels.newBroadcastChannel: broadcast value read before any write"
  return $ BC (b, tv)
-- One-writer broadcast channel: both ends wrap the same 'BroadcastChannel'.
instance Channel BroadcastChanin BroadcastChanout where
  -- The channel identifier is taken from the underlying barrier.
  newChannel =
    liftCHP $
      newBroadcastChannel >>= \chan@(BC (bar, _)) ->
        return (Chan (getBarrierIdentifier bar) (BI chan) (BO chan))

  -- Ends belong together when their underlying channels compare equal.
  sameChannel (BI readSide) (BO writeSide) = readSide == writeSide
-- Broadcast channel whose writing end is wrapped in 'Shared' with a mutex.
instance Channel BroadcastChanin (Shared BroadcastChanout) where
  -- Create the mutex first, then an ordinary channel, and pair the mutex
  -- with that channel's writing end.  The identifier and reading end are
  -- reused from the inner channel.
  newChannel = liftCHP $ do
    lock <- newMutex
    inner <- newChannel
    let sharedWriter = Shared (lock, writer inner)
    return (Chan (getChannelIdentifier inner) (reader inner) sharedWriter)

  -- The mutex is ignored; only the underlying channels are compared.
  sameChannel (BI readSide) (Shared (_, BO writeSide)) = readSide == writeSide
-- | A 'Chan' pairing a 'BroadcastChanin' reading end with a plain
-- (unshared) 'BroadcastChanout' writing end.
type OneToManyChannel = Chan BroadcastChanin BroadcastChanout
-- | A 'Chan' pairing a 'BroadcastChanin' reading end with a 'Shared'
-- 'BroadcastChanout' writing end.
type AnyToManyChannel = Chan BroadcastChanin (Shared BroadcastChanout)
-- | Create a new 'OneToManyChannel'; this is 'newChannel' at that type.
oneToManyChannel :: MonadCHP m => m (OneToManyChannel a)
oneToManyChannel = newChannel
-- | Create a new 'AnyToManyChannel'; this is 'newChannel' at that type.
anyToManyChannel :: MonadCHP m => m (AnyToManyChannel a)
anyToManyChannel = newChannel
-- | Create a new 'OneToManyChannel', passing the given label on to
-- 'newChannelWithLabel'.
oneToManyChannelWithLabel :: MonadCHP m => String -> m (OneToManyChannel a)
oneToManyChannelWithLabel label = newChannelWithLabel label

-- | Create a new 'AnyToManyChannel', passing the given label on to
-- 'newChannelWithLabel'.
anyToManyChannelWithLabel :: MonadCHP m => String -> m (AnyToManyChannel a)
anyToManyChannelWithLabel label = newChannelWithLabel label
-- | Shared state of a reduce channel: the phased barrier both ends
-- synchronise on, the 'TVar' accumulating the value, and the pair of
-- (combining function, reset value) used by the writing and reading ends.
newtype ReduceChannel a = GC (PhasedBarrier Phase, TVar a, (a -> a -> a, a))

-- Two reduce channels are equal exactly when they share the same 'TVar';
-- the barrier and the combining pair are not compared.
instance Eq (ReduceChannel a) where
  GC (_, leftVar, _) == GC (_, rightVar, _) = leftVar == rightVar
-- | Reading end of a reduce channel; read from directly, without enrolling
-- (see the 'ReadableChannel' instance in this module).
newtype ReduceChanin a
  = GI (ReduceChannel a)
  deriving Eq

-- | Writing end of a reduce channel.  It is written through its 'Enrolled'
-- form (see the 'WriteableChannel' instance in this module).
newtype ReduceChanout a
  = GO (ReduceChannel a)
  deriving Eq
-- Enrolling on the writing end enrolls on the underlying barrier.
instance Enrollable ReduceChanout a where
  -- Enroll on the barrier, call @waitForPhase Neutral@ on the enrolled
  -- barrier, and only then hand the enrolled channel end to the body.
  enroll chanEnd@(GO (GC (bar, _, _))) body =
    enroll bar $ \enrolledBar -> do
      waitForPhase Neutral enrolledBar
      body (Enrolled chanEnd)

  -- Run the action while resigned from the barrier; afterwards call
  -- @waitForPhase Neutral@ again before returning the action's result.
  resign (Enrolled (GO (GC (bar, _, _)))) action = do
    result <- resign (Enrolled bar) action
    waitForPhase Neutral (Enrolled bar)
    return result
-- Extended write on an enrolled reduce writing end.  The order of the
-- barrier syncs relative to the 'TVar' update is the protocol shared with
-- the reader and must not be changed.
instance WriteableChannel (Enrolled ReduceChanout) where
  extWriteChannel' (Enrolled (GO (GC (bar, var, (combine, _))))) extended = do
      -- First sync passes the 'ChannelWrite' record builder to the barrier.
      syncBarrierWith (indivRecJust ChannelWrite) enrolledBar
      -- Run the extended action to obtain this writer's value and result.
      (value, result) <- extended
      -- Fold the value into the accumulator in one transaction; the new
      -- value is the left argument and the stored one the right.
      liftIO . atomically $ do
        stored <- readTVar var
        writeTVar var (combine value stored)
      -- Two further syncs that supply no record (always 'Nothing').
      quietSync
      quietSync
      return result
    where
      enrolledBar = Enrolled bar
      quietSync = syncBarrierWith (const $ const Nothing) enrolledBar
-- Extended read on a reduce reading end.  The order of the barrier syncs
-- relative to the 'TVar' read and reset is the protocol shared with the
-- writers and must not be changed.
instance ReadableChannel ReduceChanin where
  extReadChannel (GI (GC (bar, var, (_, resetValue)))) extended = do
      -- First sync passes the 'ChannelRead' record builder to the barrier.
      syncBarrierWith (indivRecJust ChannelRead) enrolledBar
      -- Second sync supplies no record; the accumulator is read after it.
      quietSync
      combined <- liftIO (atomically (readTVar var))
      result <- extended combined
      -- Reset the accumulator before the final sync.
      liftIO (atomically (writeTVar var resetValue))
      quietSync
      return result
    where
      enrolledBar = Enrolled bar
      quietSync = syncBarrierWith (const $ const Nothing) enrolledBar
-- Poison operations on an enrolled writing end are forwarded to the
-- channel's barrier, wrapped as 'Enrolled'.
instance Poisonable (Enrolled ReduceChanout a) where
  poison (Enrolled (GO (GC (bar, _, _)))) = poison (Enrolled bar)
  checkForPoison (Enrolled (GO (GC (bar, _, _)))) = checkForPoison (Enrolled bar)
-- Poison operations on the reading end are forwarded to the channel's
-- barrier, wrapped as 'Enrolled'.
instance Poisonable (ReduceChanin a) where
  poison (GI (GC (bar, _, _))) = poison (Enrolled bar)
  checkForPoison (GI (GC (bar, _, _))) = checkForPoison (Enrolled bar)
-- | Allocate the shared state for a new reduce channel.
--
-- Creates a phased barrier starting in 'Neutral', enrolls once on the
-- barrier's event, and allocates the accumulator 'TVar' holding 'mempty'.
-- The channel combines values with 'mappend' and resets to 'mempty'.
--
-- NOTE(review): the up-front 'enrollEvent' is presumably on behalf of the
-- reading end, which is used without an explicit enroll -- confirm.
newReduceChannel :: Monoid a => CHP (ReduceChannel a)
newReduceChannel = dontWarnMe $ do
  bar@(Barrier (ev, _, _)) <- newPhasedBarrier Neutral
  liftIO (atomically (enrollEvent ev))
  accVar <- liftIO (atomically (newTVar mempty))
  return (GC (bar, accVar, (mappend, mempty)))
-- | Test whether a reading end and a writing end wrap the same underlying
-- reduce channel (compared with the channel's 'Eq' instance).
sameReduceChannel :: ReduceChanin a -> ReduceChanout a -> Bool
sameReduceChannel (GI readSide) (GO writeSide) = readSide == writeSide
-- | A 'Chan' pairing a plain (unshared) 'ReduceChanin' reading end with a
-- 'ReduceChanout' writing end.
type ManyToOneChannel = Chan ReduceChanin ReduceChanout
-- | A 'Chan' pairing a 'Shared' 'ReduceChanin' reading end with a
-- 'ReduceChanout' writing end.
type ManyToAnyChannel = Chan (Shared ReduceChanin) ReduceChanout
-- | Create a new 'ManyToOneChannel'.  Both ends wrap the same freshly
-- created reduce channel, and the channel identifier is taken from its
-- barrier.
manyToOneChannel :: (Monoid a, MonadCHP m) => m (ManyToOneChannel a)
manyToOneChannel =
  liftCHP newReduceChannel >>= \chan@(GC (bar, _, _)) ->
    return (Chan (getBarrierIdentifier bar) (GI chan) (GO chan))
-- | Create a new 'ManyToAnyChannel'.  A mutex is created first, then a
-- 'manyToOneChannel'; the mutex is paired with that channel's reading end
-- inside 'Shared', while the identifier and writing end are reused as-is.
manyToAnyChannel :: (Monoid a, MonadCHP m) => m (ManyToAnyChannel a)
manyToAnyChannel = do
  lock <- newMutex
  base <- manyToOneChannel
  let sharedReader = Shared (lock, reader base)
  return (Chan (getChannelIdentifier base) sharedReader (writer base))
-- | Create a new 'ManyToOneChannel' and register the given label against
-- its channel identifier via 'labelUnique' before returning it.
manyToOneChannelWithLabel :: (Monoid a, MonadCHP m) => String -> m (ManyToOneChannel a)
manyToOneChannelWithLabel label = do
  chan <- manyToOneChannel
  liftCHP (liftPoison (liftTrace (labelUnique (getChannelIdentifier chan) label)))
  return chan
-- | Create a new 'ManyToAnyChannel' and register the given label against
-- its channel identifier via 'labelUnique' before returning it.
manyToAnyChannelWithLabel :: (Monoid a, MonadCHP m) => String -> m (ManyToAnyChannel a)
manyToAnyChannelWithLabel label = do
  chan <- manyToAnyChannel
  liftCHP (liftPoison (liftTrace (labelUnique (getChannelIdentifier chan) label)))
  return chan