module Control.Concurrent.CHP.Channels (
Chan, Channel(..), writeChannelStrict, newChannelWithLabel, newChannelWR, newChannelRW, ChannelTuple(..),
newChannelList, newChannelListWithLabels, newChannelListWithStem,
getChannelIdentifier,
Chanin, Chanout,
reader, writer, readers, writers,
ReadableChannel(..), WriteableChannel(..),
claim, Shared,
OneToOneChannel, oneToOneChannel,
OneToAnyChannel, oneToAnyChannel,
AnyToOneChannel, anyToOneChannel,
AnyToAnyChannel, anyToAnyChannel
)
where
import Control.Concurrent.STM.TVar
import Control.Monad
import Control.Monad.STM
import Control.Monad.Trans
import Control.Parallel.Strategies
import Data.Maybe
import Data.Unique
import Control.Concurrent.CHP.Base
import Control.Concurrent.CHP.CSP
import Control.Concurrent.CHP.Event
import Control.Concurrent.CHP.Monad
import Control.Concurrent.CHP.Mutex
import Control.Concurrent.CHP.Poison
import Control.Concurrent.CHP.Traces.Base
-- | The reading end of a channel: a wrapper around the underlying
-- 'STMChannel'.  Values are read from it through its 'ReadableChannel'
-- instance.
newtype Chanin a = Chanin (STMChannel a) deriving Eq
-- | The writing end of a channel: a wrapper around the underlying
-- 'STMChannel'.  Values are written to it through its 'WriteableChannel'
-- instance.
newtype Chanout a = Chanout (STMChannel a) deriving Eq
-- | The shared state behind both ends of a channel: the 'Event' the two
-- ends synchronise on, paired with a 'TVar' holding the channel contents.
-- The 'TVar' holds 'PoisonItem' once the channel is poisoned,
-- @NoPoison Nothing@ when no value is pending, and @NoPoison (Just x)@ while
-- a written value is waiting to be taken by the reader.
newtype STMChannel a = STMChan (Event, TVar (WithPoison (Maybe a))) deriving
  Eq
-- | A channel whose reading and writing ends are both unshared.
type OneToOneChannel = Chan Chanin Chanout
-- | A channel with an unshared reading end and a 'Shared' writing end.
type AnyToOneChannel = Chan (Chanin) (Shared Chanout)
-- | A channel with a 'Shared' reading end and an unshared writing end.
type OneToAnyChannel = Chan (Shared Chanin) (Chanout)
-- | A channel whose reading and writing ends are both 'Shared'.
type AnyToAnyChannel = Chan (Shared Chanin) (Shared Chanout)
-- | Internal interface for the reading side of a channel implementation.
-- The read is split into a start and an end step so that other work can be
-- performed between taking the value and completing the communication.
class ChaninC c a where
  -- | The event to synchronise on, paired with an STM action that yields
  -- the value being communicated (or poison).
  startReadChannelC :: c a -> (Event, STM (WithPoison a))
  -- | STM action that completes a read started with 'startReadChannelC'.
  endReadChannelC :: c a -> STM (WithPoison ())
  -- | Poisons the channel from the reading side.
  poisonReadC :: c a -> IO ()
  -- | Reports whether the channel is currently poisoned.
  checkPoisonReadC :: c a -> IO (WithPoison ())
-- | Internal interface for the writing side of a channel implementation.
-- A write is split into three steps: start, send the value, and end.
class ChanoutC c a where
  -- | The event to synchronise on, paired with an STM action that reports
  -- whether the channel is poisoned at the start of the write.
  startWriteChannelC :: c a -> (Event, STM (WithPoison ()))
  -- | STM action that hands the given value over to the channel.
  sendWriteChannelC :: c a -> a -> STM (WithPoison ())
  -- | STM action that completes the write.
  endWriteChannelC :: c a -> STM (WithPoison ())
  -- | Poisons the channel from the writing side.
  poisonWriteC :: c a -> IO ()
  -- | Reports whether the channel is currently poisoned.
  checkPoisonWriteC :: c a -> IO (WithPoison ())
-- | The class of (reading-end, writing-end) type pairs for which a channel
-- can be created.  The instances in this module cover every combination of
-- shared and unshared 'Chanin' \/ 'Chanout'.
class Channel r w where
  -- | Creates a fresh channel with ends of types @r@ and @w@.
  newChannel :: MonadCHP m => m (Chan r w a)
-- | Channel ends that can be read from.  Only 'extReadChannel' has to be
-- supplied by an instance; 'readChannel' defaults to an extended read whose
-- extended action is just 'return'.
class ReadableChannel chanEnd where
  -- | Reads one value from the channel end.
  readChannel :: chanEnd a -> CHP a
  readChannel = flip extReadChannel return

  -- | Extended read: reads a value, runs the supplied action on it, and
  -- returns that action's result.
  extReadChannel :: chanEnd a -> (a -> CHP b) -> CHP b
-- | Channel ends that can be written to.  Only 'extWriteChannel' has to be
-- supplied by an instance; 'writeChannel' defaults to an extended write
-- whose value-producing action simply returns the given value.
class WriteableChannel chanEnd where
  -- | Writes one value to the channel end.
  writeChannel :: chanEnd a -> a -> CHP ()
  writeChannel c = extWriteChannel c . return

  -- | Extended write: the value to send is produced by running the supplied
  -- action.
  extWriteChannel :: chanEnd a -> CHP a -> CHP ()
-- | Tuples of channels that can be created in one go.  This module provides
-- instances for tuples of two to six channels of the same type.
class ChannelTuple t where
  -- | Creates every channel in the tuple.
  newChannels :: MonadCHP m => m t
-- | Like 'writeChannel', but the write is applied through '$|' with the
-- 'rnf' strategy, so the value is evaluated with 'rnf' as part of the
-- application rather than being sent as an unevaluated thunk.
writeChannelStrict :: (NFData a, WriteableChannel chanEnd) => chanEnd a -> a -> CHP ()
writeChannelStrict c x = strictWrite x
  where
    -- The plain write on this channel end, wrapped with the rnf strategy.
    strictWrite = writeChannel c $| rnf
-- | Builds a 'Chan' from an action that yields an identifier together with
-- the raw channel, plus the two functions that wrap the raw channel into
-- its reading and writing ends.
chan :: Monad m => m (Unique, c a) -> (c a -> r a) -> (c a -> w a) -> m (Chan r w a)
chan m r w =
  m >>= \(ident, raw) -> return (Chan ident (r raw) (w raw))
-- | Blocks (via 'retry') until the variable holds a value or poison.
-- Returns the value wrapped in 'NoPoison', or 'PoisonItem' if the variable
-- is poisoned.  The variable itself is not modified.
waitForJustOrPoison :: TVar (WithPoison (Maybe a)) -> STM (WithPoison a)
waitForJustOrPoison tv = readTVar tv >>= decide
  where
    decide PoisonItem = return PoisonItem
    -- Nothing pending yet: retry until the variable changes.
    decide (NoPoison pending) = maybe retry (return . NoPoison) pending
-- | Blocks (via 'retry') until the variable is empty or poisoned.  Returns
-- @NoPoison ()@ once the variable holds 'Nothing', or 'PoisonItem' if it is
-- poisoned.  The variable itself is not modified.
waitForNothingOrPoison :: TVar (WithPoison (Maybe a)) -> STM (WithPoison ())
waitForNothingOrPoison tv = readTVar tv >>= decide
  where
    decide PoisonItem = return PoisonItem
    decide (NoPoison Nothing) = return (NoPoison ())
    -- A value is still pending: retry until it has been removed.
    decide (NoPoison (Just _)) = retry
-- | Creates a new channel and registers the given label against the
-- channel's identifier (via 'labelUnique') before returning the channel.
newChannelWithLabel :: (Channel r w, MonadCHP m) => String -> m (Chan r w a)
newChannelWithLabel l = do
  ch <- newChannel
  let ident = getChannelIdentifier ch
  liftCHP (liftPoison (liftTrace (labelUnique ident l)))
  return ch
-- | Creates a new channel and returns its two ends as a
-- (reading end, writing end) pair.
newChannelRW :: (Channel r w, MonadCHP m) => m (r a, w a)
newChannelRW = liftM ends newChannel
  where
    ends ch = (reader ch, writer ch)
-- | Creates a new channel and returns its two ends as a
-- (writing end, reading end) pair.
newChannelWR :: (Channel r w, MonadCHP m) => m (w a, r a)
newChannelWR = liftM ends newChannel
  where
    ends ch = (writer ch, reader ch)
-- | Creates a list of the given number of new channels.
newChannelList :: (Channel r w, MonadCHP m) => Int -> m [Chan r w a]
newChannelList n = sequence (replicate n newChannel)
-- | Creates @n@ new channels, each labelled with the given stem followed by
-- its index: @stem0@, @stem1@, ..., up to index @n - 1@.  A count of zero
-- (or a negative count) yields an empty list.
newChannelListWithStem :: (Channel r w, MonadCHP m) => Int -> String -> m [Chan r w a]
newChannelListWithStem n s = mapM labelled [0 .. n - 1]
  where
    -- The upper bound must be @n - 1@: the previous @(n 1)@ applied @n@ to
    -- @1@ and did not type-check.
    labelled i = newChannelWithLabel (s ++ show i)
-- | Creates one new labelled channel per label, in the order of the given
-- list.
newChannelListWithLabels :: (Channel r w, MonadCHP m) => [String] -> m [Chan r w a]
newChannelListWithLabels labels = mapM newChannelWithLabel labels
-- | Extracts the reading end of every channel in the list, preserving order.
readers :: [Chan r w a] -> [r a]
readers chans = [reader ch | ch <- chans]
-- | Extracts the writing end of every channel in the list, preserving order.
writers :: [Chan r w a] -> [w a]
writers chans = [writer ch | ch <- chans]
-- | Allocates the raw state for a new channel: a fresh 'ChannelComm' event
-- and a 'TVar' that starts out empty and unpoisoned.  The event's unique
-- identifier is returned alongside the channel.
stmChannel :: MonadIO m => m (Unique, STMChannel a)
stmChannel = liftIO $ do
  -- NOTE(review): the 2 is presumably the number of parties that must
  -- synchronise (reader and writer) -- confirm against 'newEvent'.
  ev <- newEvent ChannelComm 2
  slot <- atomically (newTVar (NoPoison Nothing))
  return (getEventUnique ev, STMChan (ev, slot))
-- | 'newChannel' specialised to 'OneToOneChannel': a channel whose reading
-- and writing ends are both unshared.
oneToOneChannel :: MonadCHP m => m (OneToOneChannel a)
oneToOneChannel = newChannel
-- | Claims a shared channel end for the duration of the given action.  The
-- end's mutex is claimed first, the action is run on the underlying channel
-- end, and the mutex is released afterwards before the action's result is
-- returned.
claim :: Shared c a -> (c a -> CHP b) -> CHP b
claim (Shared (lock, c)) body =
  scopeBlock acquire runThenRelease (releaseMutex lock)
  -- NOTE(review): the third argument to 'scopeBlock' presumably runs when
  -- the body is cut short (e.g. by poison), so the mutex is released on
  -- that path too -- confirm against 'scopeBlock'.
  where
    acquire = claimMutex lock >> return c
    runThenRelease end = do
      result <- body end
      liftIO (releaseMutex lock)
      return result
-- | 'newChannel' specialised to 'AnyToOneChannel': a channel with a shared
-- writing end and an unshared reading end.
anyToOneChannel :: MonadCHP m => m (AnyToOneChannel a)
anyToOneChannel = newChannel
-- | 'newChannel' specialised to 'OneToAnyChannel': a channel with an
-- unshared writing end and a shared reading end.
oneToAnyChannel :: MonadCHP m => m (OneToAnyChannel a)
oneToAnyChannel = newChannel
-- | 'newChannel' specialised to 'AnyToAnyChannel': a channel whose reading
-- and writing ends are both shared.
anyToAnyChannel :: MonadCHP m => m (AnyToAnyChannel a)
anyToAnyChannel = newChannel
-- | Reading from an unshared reading end.
instance ReadableChannel Chanin where
  -- Plain read: synchronise on the channel event, then take the value and
  -- complete the read inside a single 'liftSTM' call.
  readChannel (Chanin c) =
    buildOnEventPoison (Just . ChannelRead) ev (return ()) (liftSTM takeAndFinish)
      >>= checkPoison
    where
      (ev, awaitValue) = startReadChannelC c
      takeAndFinish = do
        val <- awaitValue
        endReadChannelC c
        return val

  -- Extended read: the read is only completed (with 'endReadChannelC')
  -- after the extended action has run on the value.  'poisonReadC' is
  -- handed to 'scopeBlock' as its third argument.
  extReadChannel (Chanin c) body =
    scopeBlock obtainValue finish (poisonReadC c)
    where
      (ev, awaitValue) = startReadChannelC c
      obtainValue =
        buildOnEventPoison (Just . ChannelRead) ev (return ()) (liftSTM awaitValue)
          >>= checkPoison
      finish val = do
        result <- body val
        liftSTM (endReadChannelC c)
        return result
-- | Writing to an unshared writing end.
instance WriteableChannel Chanout where
  -- Plain write: synchronise on the channel event, then start the write and
  -- send the value in one 'liftSTM' call and end the write in a second one.
  -- The two calls are kept separate because ending the write waits for the
  -- stored value to be removed again.
  writeChannel (Chanout c) x =
    buildOnEventPoison (Just . ChannelWrite) ev (return ()) handshake
      >>= checkPoison . mergeWithPoison
    where
      (ev, begin) = startWriteChannelC c
      handshake = do
        sent <- liftSTM (sequence [begin, sendWriteChannelC c x])
        acked <- liftSTM (sequence [endWriteChannelC c])
        return (sent ++ acked)

  -- Extended write: after synchronising, the value-producing action is run,
  -- its result is sent, and then the write is ended.  'poisonWriteC' is
  -- handed to 'scopeBlock' as its third argument.
  extWriteChannel (Chanout c) body =
    scopeBlock offer complete (poisonWriteC c)
    where
      (ev, begin) = startWriteChannelC c
      offer =
        buildOnEventPoison (Just . ChannelWrite) ev (return ()) (liftSTM begin)
          >>= checkPoison
      complete _ =
        sequence
          [ body >>= liftSTM . sendWriteChannelC c
          , liftSTM (endWriteChannelC c)
          ]
          >>= checkPoison . mergeWithPoison
-- | Poison support for reading ends, delegating to the underlying channel's
-- 'poisonReadC' and 'checkPoisonReadC'.
instance Poisonable (Chanin a) where
  poison (Chanin c) = liftIO (poisonReadC c)
  checkForPoison (Chanin c) = liftCHP $ do
    status <- liftIO (checkPoisonReadC c)
    checkPoison status
-- | Poison support for writing ends, delegating to the underlying channel's
-- 'poisonWriteC' and 'checkPoisonWriteC'.
instance Poisonable (Chanout a) where
  poison (Chanout c) = liftIO (poisonWriteC c)
  checkForPoison (Chanout c) = liftCHP $ do
    status <- liftIO (checkPoisonWriteC c)
    checkPoison status
-- | Creates a pair of channels, left to right.
instance (Channel r w) => ChannelTuple (Chan r w a, Chan r w a) where
  newChannels = liftM2 (,) newChannel newChannel
-- | Creates a triple of channels, left to right.
instance (Channel r w) => ChannelTuple (Chan r w a, Chan r w a, Chan r w a) where
  newChannels = liftM3 (,,) newChannel newChannel newChannel
-- | Creates a 4-tuple of channels, left to right.
instance (Channel r w) => ChannelTuple (Chan r w a, Chan r w a, Chan r w a,
  Chan r w a) where
  newChannels = liftM4 (,,,) newChannel newChannel newChannel newChannel
-- | Creates a 5-tuple of channels, left to right.
instance (Channel r w) => ChannelTuple (Chan r w a, Chan r w a, Chan r w a,
  Chan r w a, Chan r w a) where
  newChannels =
    liftM5 (,,,,) newChannel newChannel newChannel newChannel newChannel
-- | Creates a 6-tuple of channels, left to right.
instance (Channel r w) => ChannelTuple (Chan r w a, Chan r w a, Chan r w a,
  Chan r w a, Chan r w a, Chan r w a) where
  -- There is no liftM6, so the tuple constructor is applied one monadic
  -- argument at a time with 'ap'; effects still run left to right.
  newChannels =
    return (,,,,,)
      `ap` newChannel
      `ap` newChannel
      `ap` newChannel
      `ap` newChannel
      `ap` newChannel
      `ap` newChannel
-- | Reading side of the 'TVar'-based channel implementation.
instance ChaninC STMChannel a where
  -- Wait until a value (or poison) appears in the variable.
  startReadChannelC (STMChan (e, tv)) = (e, waitForJustOrPoison tv)

  -- Complete the read by emptying the variable, unless it is poisoned, in
  -- which case the poison is left in place and reported.
  endReadChannelC (STMChan (_, tv)) = readTVar tv >>= clear
    where
      clear PoisonItem = return PoisonItem
      clear (NoPoison _) = writeTVar tv (NoPoison Nothing) >> return (NoPoison ())

  -- Poison both the event and the variable.
  poisonReadC (STMChan (e, tv)) =
    liftSTM (poisonEvent e >> writeTVar tv PoisonItem)

  -- Poison status is taken from the event alone.
  checkPoisonReadC (STMChan (e, _)) = liftSTM (checkEventForPoison e)
-- | Writing side of the 'TVar'-based channel implementation.
instance ChanoutC STMChannel a where
  -- Starting a write only inspects the variable for poison.
  startWriteChannelC (STMChan (e, tv)) = (e, readTVar tv >>= poisonStatus)
    where
      poisonStatus PoisonItem = return PoisonItem
      poisonStatus (NoPoison _) = return (NoPoison ())

  -- Store the value in the variable unless the channel is poisoned.
  sendWriteChannelC (STMChan (_, tv)) val = readTVar tv >>= store
    where
      store PoisonItem = return PoisonItem
      store (NoPoison _) = writeTVar tv (NoPoison (Just val)) >> return (NoPoison ())

  -- The write ends once the variable is empty again (or poisoned).
  endWriteChannelC (STMChan (_, tv)) = waitForNothingOrPoison tv

  -- Poison both the event and the variable.
  poisonWriteC (STMChan (e, tv)) =
    liftSTM (poisonEvent e >> writeTVar tv PoisonItem)

  -- Poison status is taken from the event alone.
  checkPoisonWriteC (STMChan (e, _)) = liftSTM (checkEventForPoison e)
-- | One-to-one channels: a fresh 'STMChannel' from 'stmChannel', wrapped as
-- a 'Chanin' for the reading end and a 'Chanout' for the writing end.
instance Channel Chanin Chanout where
  newChannel = chan stmChannel Chanin Chanout
-- | One-to-any channels: an ordinary channel whose reading end is paired
-- with a fresh mutex.  The result keeps the inner channel's identifier.
instance Channel (Shared Chanin) Chanout where
  newChannel = do
    lock <- newMutex
    inner <- newChannel
    let sharedReader = Shared (lock, reader inner)
    return (Chan (getChannelIdentifier inner) sharedReader (writer inner))
-- | Any-to-one channels: an ordinary channel whose writing end is paired
-- with a fresh mutex.  The result keeps the inner channel's identifier.
instance Channel Chanin (Shared Chanout) where
  newChannel = do
    lock <- newMutex
    inner <- newChannel
    let sharedWriter = Shared (lock, writer inner)
    return (Chan (getChannelIdentifier inner) (reader inner) sharedWriter)
-- | Any-to-any channels: an ordinary channel whose reading and writing ends
-- are each paired with their own fresh mutex.  The result keeps the inner
-- channel's identifier.
instance Channel (Shared Chanin) (Shared Chanout) where
  newChannel = do
    readLock <- newMutex
    writeLock <- newMutex
    inner <- newChannel
    let sharedReader = Shared (readLock, reader inner)
        sharedWriter = Shared (writeLock, writer inner)
    return (Chan (getChannelIdentifier inner) sharedReader sharedWriter)