module Control.Concurrent.CHP.Channels (
Chan, Channel(..), writeChannelStrict, newChannelWithLabel, newChannelWR, newChannelRW, ChannelTuple(..),
newChannelList, newChannelListWithLabels, newChannelListWithStem,
getChannelIdentifier,
Chanin, Chanout,
reader, writer, readers, writers,
ReadableChannel(..), WriteableChannel(..),
claim, Shared,
OneToOneChannel, oneToOneChannel, oneToOneChannelWithLabel,
OneToAnyChannel, oneToAnyChannel, oneToAnyChannelWithLabel,
AnyToOneChannel, anyToOneChannel, anyToOneChannelWithLabel,
AnyToAnyChannel, anyToAnyChannel, anyToAnyChannelWithLabel
)
where
import Control.Concurrent.STM.TVar
import Control.Monad
import Control.Monad.STM
import Control.Monad.Trans
import Control.Parallel.Strategies
import Data.Maybe
import Data.Monoid
import Data.Unique
import Control.Concurrent.CHP.Base
import Control.Concurrent.CHP.CSP
import Control.Concurrent.CHP.Event
import Control.Concurrent.CHP.Guard
import Control.Concurrent.CHP.Monad
import Control.Concurrent.CHP.Mutex
import Control.Concurrent.CHP.Poison
import Control.Concurrent.CHP.Traces.Base
-- | The reading end of a channel: a wrapper around the underlying
-- 'STMChannel'. Two reading ends are equal exactly when the channels they
-- wrap are equal.
newtype Chanin a = Chanin (STMChannel a)
  deriving (Eq)
-- | The writing end of a channel: a wrapper around the underlying
-- 'STMChannel'. Two writing ends are equal exactly when the channels they
-- wrap are equal.
newtype Chanout a = Chanout (STMChannel a)
  deriving (Eq)
-- | The state shared by both ends of a channel: the 'Event' the two ends
-- synchronise on, paired with a 'TVar' that is either poisoned or holds an
-- optional data item (first component) and an optional acknowledgement
-- (second component).
newtype STMChannel a
  = STMChan (Event, TVar (WithPoison (Maybe a, Maybe ())))
  deriving (Eq)
-- | A channel whose reading end and writing end are both unshared.
type OneToOneChannel = Chan Chanin Chanout

-- | A channel with an unshared reading end and a 'Shared' writing end.
type AnyToOneChannel = Chan Chanin (Shared Chanout)

-- | A channel with a 'Shared' reading end and an unshared writing end.
type OneToAnyChannel = Chan (Shared Chanin) Chanout

-- | A channel whose reading end and writing end are both 'Shared'.
type AnyToAnyChannel = Chan (Shared Chanin) (Shared Chanout)
-- | Low-level reading operations supplied by a channel implementation @c@.
-- The notes on each method describe how the 'ReadableChannel' instance for
-- 'Chanin' in this module consumes the results.
class ChaninC c a where
  -- | Beginning of an extended read: the event to synchronise on, and a
  -- transaction run once that synchronisation has been built, yielding the
  -- value read or poison.
  startReadChannelC :: c a -> (Event, STM (WithPoison a))

  -- | Transaction run after the extended action of an extended read.
  endReadChannelC :: c a -> STM (WithPoison ())

  -- | A plain read: the event, a transaction supplied as the second field
  -- of the 'EventActions', and a transaction run afterwards that yields the
  -- value read or poison.
  readChannelC :: c a -> (Event, STM (), STM (WithPoison a))

  -- | Poisons the channel from the reading side.
  poisonReadC :: c a -> IO ()

  -- | Reports whether the channel is poisoned, as seen from the reading side.
  checkPoisonReadC :: c a -> IO (WithPoison ())
-- | Low-level writing operations supplied by a channel implementation @c@.
-- The notes on each method describe how the 'WriteableChannel' instance for
-- 'Chanout' in this module consumes the results.
class ChanoutC c a where
  -- | Beginning of an extended write: the event to synchronise on, and a
  -- transaction run once that synchronisation has been built.
  startWriteChannelC :: c a -> (Event, STM (WithPoison ()))

  -- | Transaction that hands the given value over during an extended write.
  sendWriteChannelC :: c a -> a -> STM (WithPoison ())

  -- | Transaction run after the value has been sent in an extended write.
  endWriteChannelC :: c a -> STM (WithPoison ())

  -- | A plain write of the given value: the event, a transaction supplied
  -- as the second field of the 'EventActions', and a transaction run
  -- afterwards.
  writeChannelC :: c a -> a -> (Event, STM (), STM (WithPoison ()))

  -- | Poisons the channel from the writing side.
  poisonWriteC :: c a -> IO ()

  -- | Reports whether the channel is poisoned, as seen from the writing side.
  checkPoisonWriteC :: c a -> IO (WithPoison ())
-- | Relates a reading-end type @r@ to a writing-end type @w@ that together
-- form a channel.
class Channel r w where
  -- | Allocates a fresh channel with ends of types @r@ and @w@.
  newChannel :: MonadCHP m => m (Chan r w a)

  -- | Decides whether a reading end and a writing end belong to the same
  -- channel. The instances in this module compare the underlying
  -- 'STMChannel' values.
  sameChannel :: r a -> w a -> Bool
-- | Channel ends that can be read from.
class ReadableChannel chanEnd where
  -- | Reads one value from the channel end. The default implementation is
  -- an extended read whose extended action simply returns the value.
  readChannel :: chanEnd a -> CHP a
  readChannel = flip extReadChannel return

  -- | Extended read: the value read is passed to the supplied action, and
  -- the action's result becomes the result of the whole read.
  extReadChannel :: chanEnd a -> (a -> CHP b) -> CHP b
-- | Channel ends that can be written to. Only 'extWriteChannel'' lacks a
-- default; the other two methods are defined in terms of it.
class WriteableChannel chanEnd where
  -- | Writes one value to the channel end. By default this is an extended
  -- write whose action just returns the value.
  writeChannel :: chanEnd a -> a -> CHP ()
  writeChannel c x = do
    extWriteChannel c (return x)
    return ()

  -- | Extended write: the supplied action produces the value to send. By
  -- default this defers to 'extWriteChannel'', pairing the value with @()@.
  extWriteChannel :: chanEnd a -> CHP a -> CHP ()
  extWriteChannel c m = extWriteChannel' c $ do
    x <- m
    return (x, ())

  -- | Extended write whose action yields both the value to send and an
  -- extra result that is returned to the caller.
  extWriteChannel' :: chanEnd a -> CHP (a, b) -> CHP b
-- | Tuples of channels that can all be allocated in a single action.
class ChannelTuple t where
  -- | Allocates every channel in the tuple.
  newChannels :: MonadCHP m => m t
-- | Like 'writeChannel', but the value is passed through the 'rnf' strategy
-- using '$|'. Presumably this forces the value fully before it is sent; the
-- exact evaluation timing is whatever '$|' provides.
writeChannelStrict :: (NFData a, WriteableChannel chanEnd) => chanEnd a -> a -> CHP ()
writeChannelStrict c = writeChannel c $| rnf
-- | Builds a 'Chan' from an action that yields an identifier together with
-- an underlying channel, wrapping that channel once with the reading-end
-- constructor and once with the writing-end constructor.
chan :: Monad m => m (Unique, c a) -> (c a -> r a) -> (c a -> w a) -> m (Chan r w a)
chan m r w =
  m >>= \(ident, underlying) ->
    return (Chan ident (r underlying) (w underlying))
-- | Allocates a new channel and registers the given label against the
-- channel's identifier (via 'labelUnique') before returning the channel.
newChannelWithLabel :: (Channel r w, MonadCHP m) => String -> m (Chan r w a)
newChannelWithLabel l = do
  c <- newChannel
  let ident = getChannelIdentifier c
  liftCHP (liftPoison (liftTrace (labelUnique ident l)))
  return c
-- | Allocates a new channel and returns its two ends, reading end first.
newChannelRW :: (Channel r w, MonadCHP m) => m (r a, w a)
newChannelRW = liftM (\c -> (reader c, writer c)) newChannel
-- | Allocates a new channel and returns its two ends, writing end first.
newChannelWR :: (Channel r w, MonadCHP m) => m (w a, r a)
newChannelWR = liftM (\c -> (writer c, reader c)) newChannel
-- | Allocates the requested number of channels, in order.
newChannelList :: (Channel r w, MonadCHP m) => Int -> m [Chan r w a]
newChannelList n = n `replicateM` newChannel
-- | Allocates @n@ labelled channels. The label of the channel at index @i@
-- is the stem @s@ followed by the decimal rendering of @i@, for @i@ running
-- from @0@ to @n - 1@; for example a stem of @"c"@ gives @"c0"@, @"c1"@ and
-- so on. When @n@ is zero or negative the index range is empty, so the
-- result is an empty list.
newChannelListWithStem :: (Channel r w, MonadCHP m) => Int -> String -> m [Chan r w a]
newChannelListWithStem n s =
  -- The upper bound must be the subtraction @n - 1@; writing it as the
  -- application @n 1@ is a type error.
  mapM (\i -> newChannelWithLabel (s ++ show i)) [0 .. n - 1]
-- | Allocates one labelled channel per label, in the order given.
newChannelListWithLabels :: (Channel r w, MonadCHP m) => [String] -> m [Chan r w a]
newChannelListWithLabels labels = mapM newChannelWithLabel labels
-- | Extracts the reading end of every channel in the list, keeping order.
readers :: [Chan r w a] -> [r a]
readers cs = [reader c | c <- cs]
-- | Extracts the writing end of every channel in the list, keeping order.
writers :: [Chan r w a] -> [w a]
writers cs = [writer c | c <- cs]
-- | Allocates the underlying state for a channel: a new 'ChannelComm' event
-- and a 'TVar' that starts unpoisoned with neither data nor acknowledgement
-- present. The event's unique identifier is returned alongside the channel.
stmChannel :: MonadIO m => m (Unique, STMChannel a)
stmChannel = liftIO $ do
  -- NOTE(review): the 2 is presumably the number of parties on the event
  -- (one reader, one writer) -- confirm against newEvent.
  event <- newEvent ChannelComm 2
  state <- atomically (newTVar (NoPoison (Nothing, Nothing)))
  return (getEventUnique event, STMChan (event, state))
-- | Allocates a new one-to-one channel; this is 'newChannel' with its type
-- fixed to 'OneToOneChannel'.
oneToOneChannel :: MonadCHP m => m (OneToOneChannel a)
oneToOneChannel = newChannel
-- | Allocates a new labelled one-to-one channel; this is
-- 'newChannelWithLabel' with its type fixed to 'OneToOneChannel'.
oneToOneChannelWithLabel :: MonadCHP m => String -> m (OneToOneChannel a)
oneToOneChannelWithLabel label = newChannelWithLabel label
-- | Runs an action with exclusive use of a shared channel end. The end's
-- mutex is claimed first, the action is run on the unwrapped end, and the
-- mutex is released once the action has finished. The final argument to
-- 'scopeBlock' also releases the mutex; presumably it is the clean-up that
-- 'scopeBlock' runs when the body is abandoned (e.g. through poison).
claim :: Shared c a -> (c a -> CHP b) -> CHP b
claim (Shared (lock, end)) body =
  scopeBlock
    (claimMutex lock >> return end)
    ( \claimed ->
        body claimed >>= \result ->
          liftIO (releaseMutex lock) >> return result
    )
    (releaseMutex lock)
-- | Allocates a new channel with a shared writing end; this is 'newChannel'
-- with its type fixed to 'AnyToOneChannel'.
anyToOneChannel :: MonadCHP m => m (AnyToOneChannel a)
anyToOneChannel = newChannel
-- | Allocates a new channel with a shared reading end; this is 'newChannel'
-- with its type fixed to 'OneToAnyChannel'.
oneToAnyChannel :: MonadCHP m => m (OneToAnyChannel a)
oneToAnyChannel = newChannel
-- | Allocates a new channel with both ends shared; this is 'newChannel'
-- with its type fixed to 'AnyToAnyChannel'.
anyToAnyChannel :: MonadCHP m => m (AnyToAnyChannel a)
anyToAnyChannel = newChannel
-- | Allocates a new labelled channel with a shared writing end; this is
-- 'newChannelWithLabel' with its type fixed to 'AnyToOneChannel'.
anyToOneChannelWithLabel :: MonadCHP m => String -> m (AnyToOneChannel a)
anyToOneChannelWithLabel label = newChannelWithLabel label
-- | Allocates a new labelled channel with a shared reading end; this is
-- 'newChannelWithLabel' with its type fixed to 'OneToAnyChannel'.
oneToAnyChannelWithLabel :: MonadCHP m => String -> m (OneToAnyChannel a)
oneToAnyChannelWithLabel label = newChannelWithLabel label
-- | Allocates a new labelled channel with both ends shared; this is
-- 'newChannelWithLabel' with its type fixed to 'AnyToAnyChannel'.
anyToAnyChannelWithLabel :: MonadCHP m => String -> m (AnyToAnyChannel a)
anyToAnyChannelWithLabel label = newChannelWithLabel label
-- | Reading from a 'Chanin' is implemented with the 'ChaninC' operations of
-- the wrapped channel.
instance ReadableChannel Chanin where
  -- Plain read: synchronise on the channel's event with the "during"
  -- transaction attached as the second field of the event actions, run the
  -- "after" transaction, and turn a poisoned result into poison via
  -- checkPoison.
  readChannel (Chanin c) =
    buildOnEventPoison
      (indivRecJust ChannelRead)
      ev
      (EventActions (return ()) during)
      (liftSTM afterwards)
      >>= checkPoison
    where
      (ev, during, afterwards) = readChannelC c

  -- Extended read: obtain the value, run the caller's action on it, then
  -- run the end-of-read transaction. The channel is poisoned from the
  -- reading side by the clean-up handler given to scopeBlock.
  extReadChannel (Chanin c) body =
    scopeBlock begin finish (poisonReadC c)
    where
      (ev, takeValue) = startReadChannelC c

      begin =
        buildOnEventPoison (indivRecJust ChannelRead) ev mempty (liftSTM takeValue)
          >>= checkPoison

      finish val = do
        result <- body val
        -- The poison status returned here is deliberately not inspected,
        -- matching the established behaviour.
        liftSTM (endReadChannelC c)
        return result
-- | Writing to a 'Chanout' is implemented with the 'ChanoutC' operations of
-- the wrapped channel.
instance WriteableChannel Chanout where
  -- Plain write: synchronise on the channel's event with the "during"
  -- transaction attached as the second field of the event actions, run the
  -- "after" transaction, and turn a poisoned result into poison via
  -- checkPoison.
  writeChannel (Chanout c) x =
    buildOnEventPoison
      (indivRecJust ChannelWrite)
      ev
      (EventActions (return ()) during)
      (liftSTM afterwards)
      >>= checkPoison
    where
      (ev, during, afterwards) = writeChannelC c x

  -- Extended write: after the initial synchronisation, run the caller's
  -- action to obtain the value and the extra result, send the value and run
  -- the end-of-write transaction as two separate STM steps, and check the
  -- merged poison status of both. The channel is poisoned from the writing
  -- side by the clean-up handler given to scopeBlock.
  extWriteChannel' (Chanout c) body =
    scopeBlock begin finish (poisonWriteC c)
    where
      (ev, ready) = startWriteChannelC c

      begin =
        buildOnEventPoison (indivRecJust ChannelWrite) ev mempty (liftSTM ready)
          >>= checkPoison

      finish _ = do
        (x, r) <- body
        mapM liftSTM [sendWriteChannelC c x, endWriteChannelC c]
          >>= (checkPoison . mergeWithPoison)
        return r
-- | Poisoning and poison checks for a reading end defer to the read-side
-- operations of the wrapped channel.
instance Poisonable (Chanin a) where
  poison (Chanin c) = liftIO (poisonReadC c)

  -- Look up the poison status in IO, then raise poison if it is set.
  checkForPoison (Chanin c) =
    liftCHP (liftIO (checkPoisonReadC c) >>= checkPoison)
-- | Poisoning and poison checks for a writing end defer to the write-side
-- operations of the wrapped channel.
instance Poisonable (Chanout a) where
  poison (Chanout c) = liftIO (poisonWriteC c)

  -- Look up the poison status in IO, then raise poison if it is set.
  checkForPoison (Chanout c) =
    liftCHP (liftIO (checkPoisonWriteC c) >>= checkPoison)
-- | A pair of channels of the same type.
instance (Channel r w) => ChannelTuple (Chan r w a, Chan r w a) where
  -- The channels are allocated left to right.
  newChannels = liftM2 (,) newChannel newChannel
-- | A triple of channels of the same type.
instance (Channel r w) => ChannelTuple (Chan r w a, Chan r w a, Chan r w a) where
  -- The channels are allocated left to right.
  newChannels = liftM3 (,,) newChannel newChannel newChannel
-- | A 4-tuple of channels of the same type.
instance
  (Channel r w) =>
  ChannelTuple (Chan r w a, Chan r w a, Chan r w a, Chan r w a)
  where
  -- The channels are allocated left to right.
  newChannels = liftM4 (,,,) newChannel newChannel newChannel newChannel
-- | A 5-tuple of channels of the same type.
instance
  (Channel r w) =>
  ChannelTuple (Chan r w a, Chan r w a, Chan r w a, Chan r w a, Chan r w a)
  where
  -- The channels are allocated left to right.
  newChannels =
    liftM5 (,,,,) newChannel newChannel newChannel newChannel newChannel
-- | A 6-tuple of channels of the same type.
instance
  (Channel r w) =>
  ChannelTuple
    (Chan r w a, Chan r w a, Chan r w a, Chan r w a, Chan r w a, Chan r w a)
  where
  -- The channels are allocated left to right; Control.Monad has no liftM6,
  -- so the tuple constructor is applied one monadic argument at a time.
  newChannels =
    return (,,,,,)
      `ap` newChannel
      `ap` newChannel
      `ap` newChannel
      `ap` newChannel
      `ap` newChannel
      `ap` newChannel
-- | Takes the data item out of the channel state. Yields poison if the
-- state is poisoned, retries the transaction while no data is present, and
-- otherwise clears the data slot (leaving the acknowledgement slot as it
-- was) and yields the item.
consumeData :: TVar (WithPoison (Maybe a, Maybe ())) -> STM (WithPoison a)
consumeData tv = readTVar tv >>= step
  where
    step PoisonItem = return PoisonItem
    step (NoPoison (Nothing, _)) = retry
    step (NoPoison (Just x, ack)) =
      writeTVar tv (NoPoison (Nothing, ack)) >> return (NoPoison x)
-- | Places a data item into the channel state. Yields poison if the state
-- is poisoned; otherwise fills the data slot (leaving the acknowledgement
-- slot as it was). Finding the data slot already occupied is treated as an
-- internal error.
sendData :: TVar (WithPoison (Maybe a, Maybe ())) -> a -> STM (WithPoison ())
sendData tv x = readTVar tv >>= step
  where
    step PoisonItem = return PoisonItem
    step (NoPoison (Just _, _)) = error "CHP: Found data while sending data"
    step (NoPoison (Nothing, ack)) =
      writeTVar tv (NoPoison (Just x, ack)) >> return (NoPoison ())
-- | Takes the acknowledgement out of the channel state. Yields poison if
-- the state is poisoned, retries the transaction while no acknowledgement
-- is present, and otherwise clears the acknowledgement slot (leaving the
-- data slot as it was).
consumeAck :: TVar (WithPoison (Maybe a, Maybe ())) -> STM (WithPoison ())
consumeAck tv = readTVar tv >>= step
  where
    step PoisonItem = return PoisonItem
    step (NoPoison (_, Nothing)) = retry
    step (NoPoison (item, Just _)) =
      writeTVar tv (NoPoison (item, Nothing)) >> return (NoPoison ())
-- | Places an acknowledgement into the channel state. Yields poison if the
-- state is poisoned; otherwise fills the acknowledgement slot (leaving the
-- data slot as it was). Finding an acknowledgement already present is
-- treated as an internal error.
sendAck :: TVar (WithPoison (Maybe a, Maybe ())) -> STM (WithPoison ())
sendAck tv = readTVar tv >>= step
  where
    step PoisonItem = return PoisonItem
    step (NoPoison (_, Just _)) = error "CHP: Found ack while placing ack!"
    step (NoPoison (item, Nothing)) =
      writeTVar tv (NoPoison (item, Just ())) >> return (NoPoison ())
-- | Read-side operations on the TVar-backed channel.
instance ChaninC STMChannel a where
  -- Extended read start: wait for and take the data item.
  startReadChannelC (STMChan (ev, var)) = (ev, consumeData var)

  -- Extended read end: place the acknowledgement.
  endReadChannelC (STMChan (_, var)) = sendAck var

  -- Plain read: the first transaction places the acknowledgement (its
  -- poison status is discarded), the second takes the data item.
  readChannelC (STMChan (ev, var)) = (ev, placeAck, consumeData var)
    where
      placeAck = sendAck var >> return ()

  -- Poison both the event and the shared state.
  poisonReadC (STMChan (ev, var)) =
    liftSTM (poisonEvent ev >> writeTVar var PoisonItem)

  -- Poison status is read from the event, not from the TVar.
  checkPoisonReadC (STMChan (ev, _)) = liftSTM (checkEventForPoison ev)
-- | Write-side operations on the TVar-backed channel.
instance ChanoutC STMChannel a where
  -- Extended write start: only inspects the state for poison; nothing is
  -- written at this point.
  startWriteChannelC (STMChan (ev, var)) = (ev, readTVar var >>= poisonOnly)
    where
      poisonOnly PoisonItem = return PoisonItem
      poisonOnly (NoPoison _) = return (NoPoison ())

  -- Extended write, middle step: place the data item.
  sendWriteChannelC (STMChan (_, var)) val = sendData var val

  -- Extended write end: wait for and take the acknowledgement.
  endWriteChannelC (STMChan (_, var)) = consumeAck var

  -- Plain write: the first transaction places the data item (its poison
  -- status is discarded), the second takes the acknowledgement.
  writeChannelC (STMChan (ev, var)) val = (ev, placeData, consumeAck var)
    where
      placeData = sendData var val >> return ()

  -- Poison both the event and the shared state.
  poisonWriteC (STMChan (ev, var)) =
    liftSTM (poisonEvent ev >> writeTVar var PoisonItem)

  -- Poison status is read from the event, not from the TVar.
  checkPoisonWriteC (STMChan (ev, _)) = liftSTM (checkEventForPoison ev)
-- | One-to-one channels: both ends wrap the same freshly allocated
-- 'STMChannel'.
instance Channel Chanin Chanout where
  newChannel = chan stmChannel Chanin Chanout

  -- Ends match when they wrap the same underlying channel.
  sameChannel (Chanin readSide) (Chanout writeSide) = readSide == writeSide
-- | One-to-any channels: a one-to-one channel whose reading end is wrapped
-- in 'Shared' together with a new mutex.
instance Channel (Shared Chanin) Chanout where
  newChannel = do
    -- The mutex is allocated before the underlying channel.
    readLock <- newMutex
    base <- newChannel
    return
      ( Chan
          (getChannelIdentifier base)
          (Shared (readLock, reader base))
          (writer base)
      )

  -- The mutex plays no part in the comparison.
  sameChannel (Shared (_, Chanin readSide)) (Chanout writeSide) =
    readSide == writeSide
-- | Any-to-one channels: a one-to-one channel whose writing end is wrapped
-- in 'Shared' together with a new mutex.
instance Channel Chanin (Shared Chanout) where
  newChannel = do
    -- The mutex is allocated before the underlying channel.
    writeLock <- newMutex
    base <- newChannel
    return
      ( Chan
          (getChannelIdentifier base)
          (reader base)
          (Shared (writeLock, writer base))
      )

  -- The mutex plays no part in the comparison.
  sameChannel (Chanin readSide) (Shared (_, Chanout writeSide)) =
    readSide == writeSide
-- | Any-to-any channels: a one-to-one channel with each end wrapped in
-- 'Shared', each with its own new mutex.
instance Channel (Shared Chanin) (Shared Chanout) where
  newChannel = do
    -- Allocation order: reading-end mutex, writing-end mutex, then the
    -- underlying channel.
    readLock <- newMutex
    writeLock <- newMutex
    base <- newChannel
    return
      ( Chan
          (getChannelIdentifier base)
          (Shared (readLock, reader base))
          (Shared (writeLock, writer base))
      )

  -- The mutexes play no part in the comparison.
  sameChannel (Shared (_, Chanin readSide)) (Shared (_, Chanout writeSide)) =
    readSide == writeSide