module Control.Concurrent.CHP.BroadcastChannels (BroadcastChanin, BroadcastChanout,
OneToManyChannel, AnyToManyChannel, oneToManyChannel, anyToManyChannel, ReduceChanin,
ReduceChanout, ManyToOneChannel, ManyToAnyChannel, manyToOneChannel, manyToAnyChannel)
where
import Control.Concurrent.STM
import Control.Monad.Trans
import Data.Monoid
import Control.Concurrent.CHP.Barriers
import Control.Concurrent.CHP.Base
import Control.Concurrent.CHP.Channels
import Control.Concurrent.CHP.CSP
import Control.Concurrent.CHP.Enroll
import Control.Concurrent.CHP.Event
import Control.Concurrent.CHP.Mutex
import Control.Concurrent.CHP.Traces.Base
data Phase = Agreement | Reading | Neutral deriving (Enum, Bounded, Eq)
dontWarnMe :: a -> a
dontWarnMe = flip const [Agreement, Reading, Neutral]
newtype BroadcastChannel a = BC (PhasedBarrier Phase, TVar a)
newtype BroadcastChanin a = BI (BroadcastChannel a)
newtype BroadcastChanout a = BO (BroadcastChannel a)
instance Enrollable BroadcastChanin a where
enroll c@(BI (BC (b,_))) f = enroll b (\eb -> waitForPhase Neutral eb >> f (Enrolled c))
resign (Enrolled (BI (BC (b,_)))) m
= do x <- resign (Enrolled b) m
waitForPhase Neutral (Enrolled b)
return x
instance WriteableChannel BroadcastChanout where
extWriteChannel (BO (BC (b, tv))) m
= do syncBarrierWith (Just . ChannelWrite)
$ Enrolled b
m >>= liftIO . atomically . writeTVar tv
syncBarrierWith (const Nothing)
$ Enrolled b
syncBarrierWith (const Nothing)
$ Enrolled b
return ()
instance ReadableChannel (Enrolled BroadcastChanin) where
extReadChannel (Enrolled (BI (BC (b, tv)))) f
= do syncBarrierWith (Just . ChannelRead)
$ Enrolled b
syncBarrierWith (const Nothing)
$ Enrolled b
x <- liftIO (atomically $ readTVar tv)
y <- f x
syncBarrierWith (const Nothing)
$ Enrolled b
return y
instance Poisonable (BroadcastChanout a) where
poison (BO (BC (b,_))) = poison $ Enrolled b
checkForPoison (BO (BC (b,_))) = checkForPoison $ Enrolled b
instance Poisonable (Enrolled BroadcastChanin a) where
poison (Enrolled (BI (BC (b,_)))) = poison $ Enrolled b
checkForPoison (Enrolled (BI (BC (b,_)))) = checkForPoison $ Enrolled b
newBroadcastChannel :: CHP (BroadcastChannel a)
newBroadcastChannel = dontWarnMe $ do
do b@(Barrier (e,_)) <- newPhasedBarrier Neutral
liftIO $ atomically $ enrollEvent e
tv <- liftIO $ atomically $ newTVar undefined
return $ BC (b, tv)
instance Channel BroadcastChanin BroadcastChanout where
newChannel = liftCHP $ do
c@(BC (b,_)) <- newBroadcastChannel
return $ Chan (getBarrierIdentifier b) (BI c) (BO c)
instance Channel BroadcastChanin (Shared BroadcastChanout) where
newChannel = liftCHP $ do
m <- newMutex
c <- newChannel
return $ Chan (getChannelIdentifier c) (reader c) (Shared (m, writer c))
type OneToManyChannel = Chan BroadcastChanin BroadcastChanout
type AnyToManyChannel = Chan BroadcastChanin (Shared BroadcastChanout)
oneToManyChannel :: CHP (OneToManyChannel a)
oneToManyChannel = newChannel
anyToManyChannel :: CHP (AnyToManyChannel a)
anyToManyChannel = newChannel
newtype ReduceChannel a = GC (PhasedBarrier Phase, TVar a, (a -> a -> a, a))
newtype ReduceChanin a = GI (ReduceChannel a)
newtype ReduceChanout a = GO (ReduceChannel a)
instance Enrollable ReduceChanout a where
enroll c@(GO (GC (b,_,_))) f = enroll b (\eb -> waitForPhase Neutral eb >> f (Enrolled c))
resign (Enrolled (GO (GC (b,_,_)))) m
= do x <- resign (Enrolled b) m
waitForPhase Neutral (Enrolled b)
return x
instance WriteableChannel (Enrolled ReduceChanout) where
extWriteChannel (Enrolled (GO (GC (b, tv, (f,_))))) m
= do syncBarrierWith (Just . ChannelWrite)
$ Enrolled b
m >>= liftIO . atomically . \x -> readTVar tv >>= writeTVar tv . f x
syncBarrierWith (const Nothing)
$ Enrolled b
syncBarrierWith (const Nothing)
$ Enrolled b
return ()
instance ReadableChannel ReduceChanin where
extReadChannel (GI (GC (b, tv, (_, empty)))) f
= do syncBarrierWith (Just . ChannelRead)
$ Enrolled b
syncBarrierWith (const Nothing)
$ Enrolled b
x <- liftIO (atomically $ readTVar tv)
y <- f x
liftIO (atomically $ writeTVar tv empty)
syncBarrierWith (const Nothing)
$ Enrolled b
return y
instance Poisonable (Enrolled ReduceChanout a) where
poison (Enrolled (GO (GC (b,_,_)))) = poison $ Enrolled b
checkForPoison (Enrolled (GO (GC (b,_,_)))) = checkForPoison $ Enrolled b
instance Poisonable (ReduceChanin a) where
poison (GI (GC (b,_,_))) = poison $ Enrolled b
checkForPoison (GI (GC (b,_,_))) = checkForPoison $ Enrolled b
newReduceChannel :: Monoid a => CHP (ReduceChannel a)
newReduceChannel = dontWarnMe $ do
do b@(Barrier (e,_)) <- newPhasedBarrier Neutral
liftIO $ atomically $ enrollEvent e
tv <- liftIO $ atomically $ newTVar mempty
return $ GC (b, tv, (mappend, mempty))
type ManyToOneChannel = Chan ReduceChanin ReduceChanout
type ManyToAnyChannel = Chan (Shared ReduceChanin) ReduceChanout
manyToOneChannel :: Monoid a => CHP (ManyToOneChannel a)
manyToOneChannel = do
c@(GC (b,_,_)) <- newReduceChannel
return $ Chan (getBarrierIdentifier b) (GI c) (GO c)
manyToAnyChannel :: Monoid a => CHP (ManyToAnyChannel a)
manyToAnyChannel = do
m <- newMutex
c <- manyToOneChannel
return $ Chan (getChannelIdentifier c) (Shared (m, reader c)) (writer c)