-- | Channels for CHP. This module exports the channel type and its creation
-- functions, the reading ('Chanin') and writing ('Chanout') channel ends, the
-- classes used to read from and write to channel ends, and shared channel
-- ends together with the 'claim' function needed to use them.
module Control.Concurrent.CHP.Channels (
Chan, Channel(..), newChannelWithLabel, newChannelWR, newChannelRW, ChannelTuple(..),
newChannelList, newChannelListWithLabels, newChannelListWithStem,
getChannelIdentifier,
Chanin, Chanout,
reader, writer, readers, writers,
ReadableChannel(..), WriteableChannel(..),
claim, Shared,
OneToOneChannel, oneToOneChannel,
OneToAnyChannel, oneToAnyChannel,
AnyToOneChannel, anyToOneChannel,
AnyToAnyChannel, anyToAnyChannel
)
where
import Control.Concurrent.STM.TVar
import Control.Monad
import Control.Monad.STM
import Control.Monad.Trans
import Data.Maybe
import Data.Unique
import Control.Concurrent.CHP.Base
import Control.Concurrent.CHP.CSP
import Control.Concurrent.CHP.Event
import Control.Concurrent.CHP.Monad
import Control.Concurrent.CHP.Mutex
import Control.Concurrent.CHP.Poison
import Control.Concurrent.CHP.Traces.Base
-- | The reading end of a channel; a wrapper around the underlying 'STMChannel'.
newtype Chanin a = Chanin (STMChannel a)
-- | The writing end of a channel; a wrapper around the underlying 'STMChannel'.
newtype Chanout a = Chanout (STMChannel a)
-- | The state shared by both ends of a channel: the channel's 'Event' paired
-- with a 'TVar' that holds either a poison marker or an optional value.
-- In this module the 'TVar' starts as @NoPoison Nothing@, is set to
-- @NoPoison (Just x)@ when a write finishes, is reset to @NoPoison Nothing@
-- when a read finishes, and becomes @PoisonItem@ when either end is poisoned.
newtype STMChannel a = STMChan (Event, TVar (WithPoison (Maybe a)))
-- | A channel whose reading end and writing end are both unshared.
type OneToOneChannel = Chan Chanin Chanout

-- | A channel with an unshared reading end and a 'Shared' writing end.
type AnyToOneChannel = Chan Chanin (Shared Chanout)

-- | A channel with a 'Shared' reading end and an unshared writing end.
type OneToAnyChannel = Chan (Shared Chanin) Chanout

-- | A channel whose reading end and writing end are both 'Shared'.
type AnyToAnyChannel = Chan (Shared Chanin) (Shared Chanout)
-- | Internal interface for the reading side of a channel implementation.
class ChaninC c a where
  -- | Yields the channel's event together with the STM action that obtains
  -- the value being read (or reports poison).
  startReadChannelC :: c a -> (Event, STM (WithPoison a))

  -- | STM action to run once the reader has finished with the value.
  endReadChannelC :: c a -> STM ()

  -- | Poisons the channel from the reading side.
  poisonReadC :: c a -> IO ()
-- | Internal interface for the writing side of a channel implementation.
class ChanoutC c a where
  -- | Yields the channel's event together with an STM action that reports
  -- whether the channel is poisoned.
  startWriteChannelC :: c a -> (Event, STM (WithPoison ()))

  -- | STM action that hands the given value over to the channel.
  endWriteChannelC :: c a -> a -> STM ()

  -- | Poisons the channel from the writing side.
  poisonWriteC :: c a -> IO ()
-- | Pairs of reading-end type @r@ and writing-end type @w@ for which a
-- channel can be created.
class Channel r w where
  -- | Creates a fresh channel with ends of types @r@ and @w@.
  newChannel :: MonadCHP m => m (Chan r w a)
-- | Channel ends that can be read from. Instances must define at least
-- 'extReadChannel'; 'readChannel' has a default written in terms of it.
class ReadableChannel chanEnd where
  -- | Reads a value from the channel end. By default this is an extended
  -- read whose extended action is simply 'return'.
  readChannel :: chanEnd a -> CHP a
  readChannel end = extReadChannel end return

  -- | Extended read: the supplied function receives the value read, and its
  -- result becomes the result of the whole read.
  extReadChannel :: chanEnd a -> (a -> CHP b) -> CHP b
-- | Channel ends that can be written to. Instances must define at least
-- 'extWriteChannel'; 'writeChannel' has a default written in terms of it.
class WriteableChannel chanEnd where
  -- | Writes a value to the channel end. By default this is an extended
  -- write whose extended action just returns the value.
  writeChannel :: chanEnd a -> a -> CHP ()
  writeChannel end val = extWriteChannel end (return val)

  -- | Extended write: the supplied action produces the value to be written.
  extWriteChannel :: chanEnd a -> CHP a -> CHP ()
-- | Tuples of channels that can be created in a single call.
class ChannelTuple t where
  -- | Creates every channel in the tuple.
  newChannels :: MonadCHP m => m t
-- | Builds a 'Chan' from an action that yields an identifier and an
-- underlying channel, wrapping the underlying channel once with each of the
-- two supplied functions to form the reading and writing ends.
chan :: Monad m => m (Unique, c a) -> (c a -> r a) -> (c a -> w a) -> m (Chan r w a)
chan create toReader toWriter =
  create >>= \(ident, inner) ->
    return (Chan ident (toReader inner) (toWriter inner))
-- | Blocks (via 'retry') until the variable holds either a value or poison.
-- Returns the value wrapped in @NoPoison@, or @PoisonItem@ if poisoned.
-- The variable itself is left unmodified.
waitForJustOrPoison :: TVar (WithPoison (Maybe a)) -> STM (WithPoison a)
waitForJustOrPoison tv = readTVar tv >>= inspect
  where
    inspect PoisonItem = return PoisonItem
    -- Nothing there yet: re-run the transaction when the variable changes.
    inspect (NoPoison Nothing) = retry
    inspect (NoPoison (Just val)) = return (NoPoison val)
-- | Creates a new channel and attaches the given label to the channel's
-- identifier via 'labelUnique'.
newChannelWithLabel :: (Channel r w, MonadCHP m) => String -> m (Chan r w a)
newChannelWithLabel label = do
  created <- newChannel
  liftCHP (liftPoison (liftTrace (labelUnique (getChannelIdentifier created) label)))
  return created
-- | Creates a new channel and returns its two ends as a
-- (reading end, writing end) pair.
newChannelRW :: (Channel r w, MonadCHP m) => m (r a, w a)
newChannelRW = liftM ends newChannel
  where
    ends c = (reader c, writer c)
-- | Creates a new channel and returns its two ends as a
-- (writing end, reading end) pair.
newChannelWR :: (Channel r w, MonadCHP m) => m (w a, r a)
newChannelWR = liftM ends newChannel
  where
    ends c = (writer c, reader c)
-- | Creates the requested number of new channels and returns them as a list.
newChannelList :: (Channel r w, MonadCHP m) => Int -> m [Chan r w a]
newChannelList count = count `replicateM` newChannel
-- | Creates @n@ labelled channels. Each label is the stem @s@ followed by the
-- channel's index, with indices running from @0@ to @n - 1@; for example a
-- stem of @"c"@ and a count of 3 gives the labels @"c0"@, @"c1"@ and @"c2"@.
-- A count of zero or less gives an empty list, because the range
-- @[0 .. n - 1]@ is then empty.
newChannelListWithStem :: (Channel r w, MonadCHP m) => Int -> String -> m [Chan r w a]
newChannelListWithStem n s = mapM labelled [0 .. n - 1]
  where
    -- Channel labelled with the stem followed by the given index.
    labelled i = newChannelWithLabel (s ++ show i)
-- | Creates one labelled channel per supplied label, in the same order.
newChannelListWithLabels :: (Channel r w, MonadCHP m) => [String] -> m [Chan r w a]
newChannelListWithLabels labels = forM labels newChannelWithLabel
-- | Extracts the reading end of every channel in the list.
readers :: [Chan r w a] -> [r a]
readers chans = [reader c | c <- chans]
-- | Extracts the writing end of every channel in the list.
writers :: [Chan r w a] -> [w a]
writers chans = [writer c | c <- chans]
-- | Allocates the state for a new channel: a fresh event (created with
-- @newEvent 2@ -- presumably one party each for reader and writer; confirm
-- against 'newEvent') and an unpoisoned, empty 'TVar'. Returns the event's
-- identifier alongside the new 'STMChannel'.
stmChannel :: MonadIO m => m (Unique, STMChannel a)
stmChannel = liftIO $ do
  ev@(Event (ident, _)) <- newEvent 2
  slot <- atomically (newTVar (NoPoison Nothing))
  return (ident, STMChan (ev, slot))
-- | Creates a new channel with unshared ends; this is 'newChannel' with its
-- result type fixed to 'OneToOneChannel'.
oneToOneChannel :: MonadCHP m => m (OneToOneChannel a)
oneToOneChannel = newChannel
-- | Claims a shared channel end for the duration of the given body.
-- The mutex guarding the end is claimed first, the body is run with the
-- underlying (unshared) end, and the mutex is released once the body
-- completes. The release action is also handed to 'scopeBlock' as its final
-- argument -- presumably so the mutex is released if the body is cut short
-- (e.g. by poison); confirm against 'scopeBlock'.
claim :: Shared c a -> (c a -> CHP b) -> CHP b
claim (Shared (lock, end)) body =
  scopeBlock acquire useThenRelease (releaseMutex lock)
  where
    acquire = claimMutex lock >> return end
    useThenRelease claimed = do
      result <- body claimed
      liftIO (releaseMutex lock)
      return result
-- | Creates a new channel with a shared writing end; this is 'newChannel'
-- with its result type fixed to 'AnyToOneChannel'.
anyToOneChannel :: MonadCHP m => m (AnyToOneChannel a)
anyToOneChannel = newChannel
-- | Creates a new channel with a shared reading end; this is 'newChannel'
-- with its result type fixed to 'OneToAnyChannel'.
oneToAnyChannel :: MonadCHP m => m (OneToAnyChannel a)
oneToAnyChannel = newChannel
-- | Creates a new channel with both ends shared; this is 'newChannel' with
-- its result type fixed to 'AnyToAnyChannel'.
anyToAnyChannel :: MonadCHP m => m (AnyToAnyChannel a)
anyToAnyChannel = newChannel
-- | Reading from an unshared channel end.
instance ReadableChannel Chanin where
  -- Plain read: after the event step, the value is taken and the read is
  -- marked as finished inside one STM action; the result is then checked
  -- for poison.
  readChannel (Chanin c) = synchronise >>= checkPoison
    where
      (ev, fetch) = startReadChannelC c
      synchronise =
        buildOnEventPoison
          (recAs (Just . ChannelComm) (Just . ChannelRead))
          ev
          (return ())
          (liftSTM takeAndFinish)
      takeAndFinish = do
        val <- fetch
        endReadChannelC c
        return val

  -- Extended read: the value is obtained first, the caller's body runs with
  -- it, and only afterwards is the read marked as finished. The channel
  -- poisoning action is passed to 'scopeBlock' as its final argument.
  extReadChannel (Chanin c) body = scopeBlock begin finish (poisonReadC c)
    where
      (ev, fetch) = startReadChannelC c
      begin =
        buildOnEventPoison
          (recAs (Just . ChannelComm) (Just . ChannelRead))
          ev
          (return ())
          (liftSTM fetch)
          >>= checkPoison
      finish val = do
        result <- body val
        liftSTM (endReadChannelC c)
        return result
-- | Writing to an unshared channel end.
instance WriteableChannel Chanout where
  -- Plain write: after the event step, the poison status is read and the
  -- value is handed over inside one STM action; the status is then checked
  -- for poison.
  writeChannel (Chanout c) x = synchronise >>= checkPoison
    where
      (ev, status) = startWriteChannelC c
      synchronise =
        buildOnEventPoison
          (recAs (Just . ChannelComm) (Just . ChannelWrite))
          ev
          (return ())
          (liftSTM checkAndSend)
      checkAndSend = do
        outcome <- status
        endWriteChannelC c x
        return outcome

  -- Extended write: the poison check happens first, then the caller's body
  -- produces the value, which is handed over to the channel. The channel
  -- poisoning action is passed to 'scopeBlock' as its final argument.
  extWriteChannel (Chanout c) body = scopeBlock begin finish (poisonWriteC c)
    where
      (ev, status) = startWriteChannelC c
      begin =
        buildOnEventPoison
          (recAs (Just . ChannelComm) (Just . ChannelWrite))
          ev
          (return ())
          (liftSTM status)
          >>= checkPoison
      finish _ = do
        val <- body
        liftSTM (endWriteChannelC c val)
-- | Poisoning a reading end poisons the underlying channel from the read side.
instance Poisonable (Chanin a) where
  poison (Chanin inner) = liftIO (poisonReadC inner)
-- | Poisoning a writing end poisons the underlying channel from the write side.
instance Poisonable (Chanout a) where
  poison (Chanout inner) = liftIO (poisonWriteC inner)
-- | Creates a pair of channels of the same type.
instance (Channel r w) => ChannelTuple (Chan r w a, Chan r w a) where
  newChannels = liftM2 (,) newChannel newChannel
-- | Creates a triple of channels of the same type.
instance (Channel r w) => ChannelTuple (Chan r w a, Chan r w a, Chan r w a) where
  newChannels = liftM3 (,,) newChannel newChannel newChannel
-- | Creates a 4-tuple of channels of the same type.
instance (Channel r w) =>
         ChannelTuple (Chan r w a, Chan r w a, Chan r w a, Chan r w a) where
  newChannels = liftM4 (,,,) newChannel newChannel newChannel newChannel
-- | Creates a 5-tuple of channels of the same type.
instance (Channel r w) =>
         ChannelTuple (Chan r w a, Chan r w a, Chan r w a, Chan r w a, Chan r w a) where
  newChannels =
    liftM5 (,,,,) newChannel newChannel newChannel newChannel newChannel
-- | Creates a 6-tuple of channels of the same type.
instance (Channel r w) =>
         ChannelTuple (Chan r w a, Chan r w a, Chan r w a,
                       Chan r w a, Chan r w a, Chan r w a) where
  -- Control.Monad has no liftM6, so the tuple constructor is applied
  -- one channel at a time with 'ap'; channels are created left to right.
  newChannels =
    return (,,,,,)
      `ap` newChannel
      `ap` newChannel
      `ap` newChannel
      `ap` newChannel
      `ap` newChannel
      `ap` newChannel
-- | Reading-side operations for the STM-backed channel.
instance ChaninC STMChannel a where
  -- The read waits on the channel's event and then for a value or poison.
  startReadChannelC (STMChan (ev, slot)) = (ev, waitForJustOrPoison slot)

  -- Finishing a read empties the slot, unless the channel is poisoned, in
  -- which case the poison marker is left in place.
  endReadChannelC (STMChan (_, slot)) = readTVar slot >>= clear
    where
      clear PoisonItem = return ()
      clear (NoPoison _) = writeTVar slot (NoPoison Nothing)

  -- Poisons the event first, then overwrites the slot with the poison marker.
  poisonReadC (STMChan (ev, slot)) =
    liftSTM (poisonEvent ev >> writeTVar slot PoisonItem)
-- | Writing-side operations for the STM-backed channel.
instance ChanoutC STMChannel a where
  -- The write waits on the channel's event; the STM action only reports
  -- whether the channel is currently poisoned.
  startWriteChannelC (STMChan (ev, slot)) = (ev, readTVar slot >>= status)
    where
      status PoisonItem = return PoisonItem
      status (NoPoison _) = return (NoPoison ())

  -- Finishing a write stores the value in the slot, unless the channel is
  -- poisoned, in which case the poison marker is left in place.
  endWriteChannelC (STMChan (_, slot)) val = readTVar slot >>= store
    where
      store PoisonItem = return ()
      store (NoPoison _) = writeTVar slot (NoPoison (Just val))

  -- Poisons the event first, then overwrites the slot with the poison marker.
  poisonWriteC (STMChan (ev, slot)) =
    liftSTM (poisonEvent ev >> writeTVar slot PoisonItem)
-- | Unshared channel: both ends wrap the same freshly created 'STMChannel'.
instance Channel Chanin Chanout where
  newChannel = chan stmChannel Chanin Chanout
-- | Channel with a shared reading end: an unshared channel is created and
-- its reading end is paired with a new mutex. The identifier of the
-- underlying channel is reused.
instance Channel (Shared Chanin) Chanout where
  newChannel = do
    lock <- newMutex
    plain <- newChannel
    let sharedReader = Shared (lock, reader plain)
    return (Chan (getChannelIdentifier plain) sharedReader (writer plain))
-- | Channel with a shared writing end: an unshared channel is created and
-- its writing end is paired with a new mutex. The identifier of the
-- underlying channel is reused.
instance Channel Chanin (Shared Chanout) where
  newChannel = do
    lock <- newMutex
    plain <- newChannel
    let sharedWriter = Shared (lock, writer plain)
    return (Chan (getChannelIdentifier plain) (reader plain) sharedWriter)
-- | Channel with both ends shared: an unshared channel is created and each
-- end is paired with its own new mutex. The identifier of the underlying
-- channel is reused.
instance Channel (Shared Chanin) (Shared Chanout) where
  newChannel = do
    readLock <- newMutex
    writeLock <- newMutex
    plain <- newChannel
    let sharedReader = Shared (readLock, reader plain)
        sharedWriter = Shared (writeLock, writer plain)
    return (Chan (getChannelIdentifier plain) sharedReader sharedWriter)