module Control.Concurrent.CHP.Channels.BroadcastReduce (BroadcastChanin, BroadcastChanout,
OneToManyChannel, AnyToManyChannel, oneToManyChannel, anyToManyChannel,
oneToManyChannel', anyToManyChannel', ReduceChanin,
ReduceChanout, sameReduceChannel, ManyToOneChannel, ManyToAnyChannel, manyToOneChannel,
manyToAnyChannel, manyToOneChannel', manyToAnyChannel')
where
import Control.Arrow
import Control.Concurrent.STM
import Control.Monad
import Control.Monad.Trans
import Data.Monoid
import Control.Concurrent.CHP.Barriers
import Control.Concurrent.CHP.Base
import Control.Concurrent.CHP.Channels
import Control.Concurrent.CHP.Channels.Base
import Control.Concurrent.CHP.CSP
import Control.Concurrent.CHP.Enroll
import Control.Concurrent.CHP.Event
import Control.Concurrent.CHP.Mutex
import Control.Concurrent.CHP.Traces.Base
-- | Shared state behind both ends of a broadcast channel.  The tuple holds:
--
-- * the 'Barrier' that the writer and the readers synchronise on before
--   each communication;
--
-- * the 'TVar' through which the writer publishes the value (readers wait
--   while it holds 'Nothing');
--
-- * a 'ManyToOneTVar' counter that each reader decrements once it has
--   finished with the value and that the writer waits on.
newtype BroadcastChannel a = BC (Barrier, TVar (Maybe a), ManyToOneTVar Int)
-- | Two broadcast channels are equal exactly when they share the same
-- value-carrying 'TVar'; the barrier and the acknowledgement counter are
-- not compared.
instance Eq (BroadcastChannel a) where
  BC (_, slotA, _) == BC (_, slotB, _) = slotA == slotB
-- | The reading end of a broadcast channel: a wrapper around the shared
-- 'BroadcastChannel' state.  Equality is that of the underlying channel.
newtype BroadcastChanin a = BI (BroadcastChannel a) deriving (Eq)
-- | The writing end of a broadcast channel: a wrapper around the shared
-- 'BroadcastChannel' state.  Equality is that of the underlying channel.
newtype BroadcastChanout a = BO (BroadcastChannel a) deriving (Eq)
-- | Enrolling on (or resigning from) the reading end is delegated to the
-- channel's barrier; the enrolled body is handed the channel end itself,
-- wrapped in 'Enrolled', rather than the enrolled barrier.
instance Enrollable BroadcastChanin a where
  enroll chanEnd@(BI (BC (bar, _, _))) body =
    enroll bar (\_ -> body (Enrolled chanEnd))
  resign (Enrolled (BI (BC (bar, _, _)))) action =
    resign (Enrolled bar) action
-- | Writing to a broadcast channel.
--
-- The writer first synchronises on the channel's barrier (recording a
-- 'ChannelWrite'); the barrier's completion action re-initialises the
-- acknowledgement counter from the value the barrier passes in, minus one.
-- It then runs the extended action @m@ to obtain the value @x@ to send and
-- the result @r@ to return, publishes @x@ in the send slot, and blocks
-- until the acknowledgement counter can be read.
--
-- Once all acknowledgements are in, the send slot is cleared back to
-- 'Nothing'.  Without this, the previous value would still be in the slot
-- at the start of the next communication, and a reader released from the
-- barrier could pick up that stale value while the writer is still running
-- its extended action.  Clearing here is safe because every reader
-- acknowledges only after it has read the value, and the next barrier
-- synchronisation cannot complete until this writer takes part in it again.
instance WriteableChannel BroadcastChanout where
  extWriteChannel' (BO (BC (b, tvSend, tvAck))) m
    = do syncBarrierWith (indivRecJust ChannelWrite)
           (resetManyToOneTVar tvAck . pred) $ Enrolled b
         (x, r) <- m
         -- Publish the value for the readers waiting on the slot.
         liftIO . atomically $ writeTVar tvSend $ Just x
         -- Wait for the readers' acknowledgements.
         _ <- liftIO . atomically $ readManyToOneTVar tvAck
         -- Empty the slot so the next round's readers wait for a fresh value.
         liftIO . atomically $ writeTVar tvSend Nothing
         return r
-- | Reading from a broadcast channel (only possible on an enrolled end).
--
-- The reader synchronises on the channel's barrier (recording a
-- 'ChannelRead'), waits until the send slot holds a value, runs the
-- extended action on that value, and only then decrements the
-- acknowledgement counter.  The result of the extended action is returned.
instance ReadableChannel (Enrolled BroadcastChanin) where
  extReadChannel (Enrolled (BI (BC (bar, sendVar, ackVar)))) consume = do
      syncBarrierWith (indivRecJust ChannelRead)
        (\count -> resetManyToOneTVar ackVar (pred count))
        (Enrolled bar)
      received <- liftIO (atomically awaitValue)
      result <- consume received
      liftIO (atomically acknowledge)
      return result
    where
      -- Retry the transaction for as long as the slot is empty.
      awaitValue = readTVar sendVar >>= maybe retry return
      -- Count this reader off; zero is the final value, and the counter's
      -- reset value is 0.
      acknowledge = writeManyToOneTVar ((== 0), return 0) pred ackVar
-- | Poisoning (and checking for poison on) the writing end is forwarded to
-- the channel's barrier.
instance Poisonable (BroadcastChanout a) where
  poison (BO (BC (bar, _, _))) = poison (Enrolled bar)
  checkForPoison (BO (BC (bar, _, _))) = checkForPoison (Enrolled bar)
-- | Poisoning (and checking for poison on) an enrolled reading end is
-- forwarded to the channel's barrier.
instance Poisonable (Enrolled BroadcastChanin a) where
  poison (Enrolled (BI (BC (bar, _, _)))) = poison (Enrolled bar)
  checkForPoison (Enrolled (BI (BC (bar, _, _)))) =
    checkForPoison (Enrolled bar)
-- | Allocate the shared state for a new broadcast channel: a fresh barrier
-- (whose event is enrolled on once, straight away), an empty send slot and
-- an acknowledgement counter starting at 0.
newBroadcastChannel :: CHP (BroadcastChannel a)
newBroadcastChannel = do
    bar@(Barrier (ev, _, _)) <- newBarrier
    inSTM (enrollEvent ev)
    sendVar <- inSTM (newTVar Nothing)
    ackVar <- inSTM (newManyToOneTVar 0)
    return (BC (bar, sendVar, ackVar))
  where
    -- Run a single STM transaction from within the surrounding monad.
    inSTM transaction = liftIO (atomically transaction)
-- | One-to-many broadcast channels.  The channel options are ignored; the
-- channel's identifier is taken from its barrier.  A reading end and a
-- writing end belong to the same channel when their underlying
-- 'BroadcastChannel' states are equal.
instance Channel BroadcastChanin BroadcastChanout where
  newChannel' _sh = liftCHP (liftM wrap newBroadcastChannel)
    where
      -- Package the shared state as a channel with both of its ends.
      wrap chanState@(BC (bar, _, _)) =
        Chan (getBarrierIdentifier bar) (BI chanState) (BO chanState)
  sameChannel (BI readSide) (BO writeSide) = readSide == writeSide
-- | Any-to-many broadcast channels: an ordinary broadcast channel whose
-- writing end is paired with a freshly created mutex inside 'Shared'.  The
-- channel options are ignored; the identifier and the reading end are
-- those of the inner channel.
instance Channel BroadcastChanin (Shared BroadcastChanout) where
  newChannel' _sh = liftCHP $ do
    lock <- newMutex
    inner <- newChannel
    let sharedWriter = Shared (lock, writer inner)
    return (Chan (getChannelIdentifier inner) (reader inner) sharedWriter)
  sameChannel (BI readSide) (Shared (_, BO writeSide)) = readSide == writeSide
-- | A broadcast channel with a plain (unshared) writing end.
type OneToManyChannel = Chan BroadcastChanin BroadcastChanout
-- | A broadcast channel whose writing end is wrapped in 'Shared'.
type AnyToManyChannel = Chan BroadcastChanin (Shared BroadcastChanout)
-- | Create a new 'OneToManyChannel' (this is 'newChannel' at that type).
oneToManyChannel :: MonadCHP m => m (OneToManyChannel a)
oneToManyChannel = newChannel
-- | Create a new 'AnyToManyChannel' (this is 'newChannel' at that type).
anyToManyChannel :: MonadCHP m => m (AnyToManyChannel a)
anyToManyChannel = newChannel
-- | Like 'oneToManyChannel', but takes channel options (this is
-- 'newChannel'' at that type).  NOTE(review): the 'Channel' instance for
-- this channel type in this module ignores its options argument.
oneToManyChannel' :: MonadCHP m => ChanOpts a -> m (OneToManyChannel a)
oneToManyChannel' = newChannel'
-- | Like 'anyToManyChannel', but takes channel options (this is
-- 'newChannel'' at that type).  NOTE(review): the 'Channel' instance for
-- this channel type in this module ignores its options argument.
anyToManyChannel' :: MonadCHP m => ChanOpts a -> m (AnyToManyChannel a)
anyToManyChannel' = newChannel'
-- | Shared state behind both ends of a reduce channel.  The tuple holds:
--
-- * the 'Barrier' that the writers and the reader synchronise on before
--   each communication;
--
-- * a 'ManyToOneTVar' pairing a counter with the value combined so far
--   ('Nothing' until the first contribution) and a 'TVar' flag that the
--   reader sets to 'True' when it has finished and that writers wait on;
--
-- * the combining function together with a further value of the element
--   type ('newReduceChannel' stores 'mappend' and 'mempty' here; the
--   instances in this module only use the function).
newtype ReduceChannel a = GC (Barrier, ManyToOneTVar (Int, Maybe (a, TVar Bool)), (a -> a -> a, a))
-- | Two reduce channels are equal exactly when they share the same
-- 'ManyToOneTVar'; the barrier and the combining function are not compared.
instance Eq (ReduceChannel a) where
  GC (_, accA, _) == GC (_, accB, _) = accA == accB
-- | The reading end of a reduce channel: a wrapper around the shared
-- 'ReduceChannel' state.  Equality is that of the underlying channel.
newtype ReduceChanin a = GI (ReduceChannel a) deriving (Eq)
-- | The writing end of a reduce channel: a wrapper around the shared
-- 'ReduceChannel' state.  Equality is that of the underlying channel.
newtype ReduceChanout a = GO (ReduceChannel a) deriving (Eq)
-- | Enrolling on (or resigning from) the writing end is delegated to the
-- channel's barrier; the enrolled body is handed the channel end itself,
-- wrapped in 'Enrolled', rather than the enrolled barrier.
instance Enrollable ReduceChanout a where
  enroll chanEnd@(GO (GC (bar, _, _))) body =
    enroll bar (\_ -> body (Enrolled chanEnd))
  resign (Enrolled (GO (GC (bar, _, _)))) action =
    resign (Enrolled bar) action
-- | Writing to a reduce channel (only possible on an enrolled end).
--
-- The writer synchronises on the channel's barrier (recording a
-- 'ChannelWrite'), runs the extended action to obtain its item and its
-- result, folds the item into the shared accumulator, and then blocks
-- until the completion flag stored alongside the accumulated value becomes
-- 'True'.
instance WriteableChannel (Enrolled ReduceChanout) where
  extWriteChannel' (Enrolled (GO (GC (bar, accVar, (combine, _))))) produce = do
      syncBarrierWith (indivRecJust ChannelWrite)
        (\count -> resetManyToOneTVar accVar (pred count, Nothing))
        (Enrolled bar)
      (item, result) <- produce
      (_, Just (_, doneFlag)) <- liftIO (atomically (contribute item))
      liftIO . atomically $ do
        finished <- readTVar doneFlag
        unless finished retry
      return result
    where
      -- Decrement the counter and merge the item into the accumulator.  The
      -- first contribution installs its own fresh flag; later ones keep the
      -- flag already stored and put their item on the left of the
      -- combining function.
      contribute item = do
        freshFlag <- newTVar False
        let step (remaining, acc) =
              ( pred remaining
              , Just (maybe (item, freshFlag) (first (combine item)) acc)
              )
        writeManyToOneTVar ((== 0) . fst, return (0, Nothing)) step accVar
-- | Reading from a reduce channel.
--
-- The reader synchronises on the channel's barrier (recording a
-- 'ChannelRead'), reads the combined value and its completion flag from
-- the shared accumulator, runs the extended action on the combined value,
-- and finally sets the flag to 'True'.  The result of the extended action
-- is returned.
instance ReadableChannel ReduceChanin where
  extReadChannel (GI (GC (bar, accVar, (_, _)))) consume = do
    syncBarrierWith (indivRecJust ChannelRead)
      (\count -> resetManyToOneTVar accVar (pred count, Nothing))
      (Enrolled bar)
    (_, Just (combined, doneFlag)) <-
      liftIO (atomically (readManyToOneTVar accVar))
    result <- consume combined
    liftIO (atomically (writeTVar doneFlag True))
    return result
-- | Poisoning (and checking for poison on) an enrolled writing end is
-- forwarded to the channel's barrier.
instance Poisonable (Enrolled ReduceChanout a) where
  poison (Enrolled (GO (GC (bar, _, _)))) = poison (Enrolled bar)
  checkForPoison (Enrolled (GO (GC (bar, _, _)))) =
    checkForPoison (Enrolled bar)
-- | Poisoning (and checking for poison on) the reading end is forwarded to
-- the channel's barrier.
instance Poisonable (ReduceChanin a) where
  poison (GI (GC (bar, _, _))) = poison (Enrolled bar)
  checkForPoison (GI (GC (bar, _, _))) = checkForPoison (Enrolled bar)
-- | Allocate the shared state for a new reduce channel: a fresh barrier
-- (whose event is enrolled on once, straight away), an accumulator starting
-- at @(0, Nothing)@, and the element type's 'mappend' and 'mempty'.
newReduceChannel :: Monoid a => CHP (ReduceChannel a)
newReduceChannel = do
  bar@(Barrier (ev, _, _)) <- newBarrier
  liftIO (atomically (enrollEvent ev))
  accVar <- liftIO (atomically (newManyToOneTVar (0, Nothing)))
  return (GC (bar, accVar, (mappend, mempty)))
-- | Whether a reading end and a writing end belong to the same reduce
-- channel, i.e. whether their underlying 'ReduceChannel' states are equal.
sameReduceChannel :: ReduceChanin a -> ReduceChanout a -> Bool
sameReduceChannel (GI readSide) (GO writeSide) = readSide == writeSide
-- | A reduce channel with a plain (unshared) reading end.
type ManyToOneChannel = Chan ReduceChanin ReduceChanout
-- | A reduce channel whose reading end is wrapped in 'Shared'.
type ManyToAnyChannel = Chan (Shared ReduceChanin) ReduceChanout
-- | Create a new 'ManyToOneChannel'.  The channel's identifier is taken
-- from the barrier of the freshly allocated reduce-channel state, and both
-- ends wrap that same state.
manyToOneChannel :: (Monoid a, MonadCHP m) => m (ManyToOneChannel a)
manyToOneChannel =
  liftCHP newReduceChannel >>= \chanState@(GC (bar, _, _)) ->
    return (Chan (getBarrierIdentifier bar) (GI chanState) (GO chanState))
-- | Create a new 'ManyToAnyChannel': a 'manyToOneChannel' whose reading end
-- is paired with a freshly created mutex inside 'Shared'.  The identifier
-- and the writing end are those of the inner channel.
manyToAnyChannel :: (Monoid a, MonadCHP m) => m (ManyToAnyChannel a)
manyToAnyChannel = do
  lock <- newMutex
  inner <- manyToOneChannel
  let sharedReader = Shared (lock, reader inner)
  return (Chan (getChannelIdentifier inner) sharedReader (writer inner))
-- | Like 'manyToOneChannel', but accepting channel options.
--
-- TODO: the supplied options are currently ignored.
manyToOneChannel' :: (Monoid a, MonadCHP m) => ChanOpts a -> m (ManyToOneChannel a)
manyToOneChannel' _opts = manyToOneChannel
-- | Like 'manyToAnyChannel', but accepting channel options.
--
-- TODO: the supplied options are currently ignored.
manyToAnyChannel' :: (Monoid a, MonadCHP m) => ChanOpts a -> m (ManyToAnyChannel a)
manyToAnyChannel' _opts = manyToAnyChannel