-- Communicating Haskell Processes.
-- Copyright (c) 2008, University of Kent.
-- All rights reserved.
--
-- Redistribution and use in source and binary forms, with or without
-- modification, are permitted provided that the following conditions are
-- met:
--
--  * Redistributions of source code must retain the above copyright
--    notice, this list of conditions and the following disclaimer.
--  * Redistributions in binary form must reproduce the above copyright
--    notice, this list of conditions and the following disclaimer in the
--    documentation and/or other materials provided with the distribution.
--  * Neither the name of the University of Kent nor the names of its
--    contributors may be used to endorse or promote products derived from
--    this software without specific prior written permission.
--
-- THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
-- IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
-- THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
-- PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
-- CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
-- EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
-- PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
-- PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
-- LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
-- NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
-- SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

-- | The module containing all the different types of channels in CHP.  Unlike
-- JCSP and C++CSP2, CHP does not offer buffered channels directly (see the
-- "Control.Concurrent.CHP.Buffers" module).
-- There are four different channel types, effectively
-- all possible combinations of:
--
-- * Shared reader vs non-shared reader
--
-- * Shared writer vs non-shared writer
--
-- For most applications you probably want just 'OneToOneChannel'.
--
-- It is possible for the type system to infer which channel you want when
-- you use 'newChannel'.  If the types of the ends are known by the type system,
-- the channel-type can be inferred.  So you can usually just write 'newChannel',
-- and depending on how you use the channel, the type system will figure out
-- which one you needed.
module Control.Concurrent.CHP.Channels (
  -- * Channel Creation
  Chan, Channel(..), writeChannelStrict, newChannelWithLabel, newChannelWR, newChannelRW,
  ChannelTuple(..), newChannelList, newChannelListWithLabels, newChannelListWithStem,
  getChannelIdentifier,

  -- * Channel-Ends
  Chanin, Chanout,
  reader, writer, readers, writers,

  -- * Reading and Writing with Channels
  ReadableChannel(..), WriteableChannel(..),

  -- * Shared Channels
  claim, Shared,

  -- * Specific Channel Types
  -- | All the functions here are equivalent to newChannel (or newChannelWithLabel), but typed.  So for
  -- example, @oneToOneChannel = newChannel :: MonadCHP m => m (OneToOneChannel a)@.
  OneToOneChannel, oneToOneChannel, oneToOneChannelWithLabel,
  OneToAnyChannel, oneToAnyChannel, oneToAnyChannelWithLabel,
  AnyToOneChannel, anyToOneChannel, anyToOneChannelWithLabel,
  AnyToAnyChannel, anyToAnyChannel, anyToAnyChannelWithLabel
  ) where

import Control.Concurrent.STM.TVar
import Control.Monad
import Control.Monad.STM
import Control.Monad.Trans
import Control.Parallel.Strategies
import Data.Maybe
import Data.Monoid
import Data.Unique

import Control.Concurrent.CHP.Base
import Control.Concurrent.CHP.CSP
import Control.Concurrent.CHP.Event
import Control.Concurrent.CHP.Guard
import Control.Concurrent.CHP.Monad
import Control.Concurrent.CHP.Mutex
import Control.Concurrent.CHP.Poison
import Control.Concurrent.CHP.Traces.Base

-- ======
-- Types:
-- ======

-- | A reading channel-end type that allows poison to be thrown
--
-- Eq instance added in version 1.1.1
newtype Chanin a = Chanin (STMChannel a) deriving Eq

-- | A writing channel-end type that allows poison to be thrown
--
-- Eq instance added in version 1.1.1
newtype Chanout a = Chanout (STMChannel a) deriving Eq

-- The underlying channel: the synchronisation event paired with a
-- transactional variable holding (pending data, pending acknowledgement),
-- or poison.
newtype STMChannel a = STMChan (Event, TVar (WithPoison (Maybe a, Maybe ()))) deriving Eq

type OneToOneChannel = Chan Chanin Chanout
type AnyToOneChannel = Chan (Chanin) (Shared Chanout)
type OneToAnyChannel = Chan (Shared Chanin) (Chanout)
type AnyToAnyChannel = Chan (Shared Chanin) (Shared Chanout)

-- ========
-- Classes:
-- ========

class ChaninC c a where
  -- Start gets the event and the transaction that will wait for data.  You
  -- sync on the event (possible extended write occurs) then wait for data
  startReadChannelC :: c a -> (Event, STM (WithPoison a))
  -- (extended read action goes here)
  -- Read releases the writer
  endReadChannelC :: c a -> STM (WithPoison ())

  -- First action is to be done as part of the completion:
  readChannelC :: c a -> (Event, STM (), STM (WithPoison a))

  poisonReadC :: c a -> IO ()
  checkPoisonReadC :: c a -> IO (WithPoison ())

class ChanoutC c a where
  -- Start checks for poison and gets the event:
  startWriteChannelC :: c a -> (Event, STM (WithPoison ()))
  -- (extended write action goes here)
  -- Send actually transmits the value:
  sendWriteChannelC :: c a -> a -> STM (WithPoison ())
  -- (extended read action goes here)
  -- End waits for the reader to tell us we're done, must be done in a different
  -- transaction to the send
  endWriteChannelC :: c a -> STM (WithPoison ())

  -- First action is to be done as part of the completion:
  writeChannelC :: c a -> a -> (Event, STM (), STM (WithPoison ()))

  poisonWriteC :: c a -> IO ()
  checkPoisonWriteC :: c a -> IO (WithPoison ())

-- | A class used for allocating new channels, and getting the reading and
-- writing ends.  There is a bijective association between the channel, and
-- its pair of end types.  You can see the types in the list of instances below.
-- Thus, 'newChannel' may be used, and the compiler will infer which type of
-- channel is required based on what end-types you get from 'reader' and 'writer'.
-- Alternatively, if you explicitly type the return of 'newChannel', it will
-- be definite which ends you will use.  If you do want to fix the type of
-- the channel you are using when you allocate it, consider using one of the
-- many 'oneToOneChannel'-like shorthand functions that fix the type.
class Channel r w where
  -- | Allocates a new channel.  Nothing need be done to
  -- destroy\/de-allocate the channel when it is no longer in use.
  newChannel :: MonadCHP m => m (Chan r w a)

  -- | Determines if two channel-ends refer to the same channel.
  --
  -- This function was added in version 1.4.0.
  sameChannel :: r a -> w a -> Bool

-- | A class indicating that a channel can be read from.
class ReadableChannel chanEnd where -- minimal implementation: extReadChannel
  -- | Reads from the given reading channel-end
  readChannel :: chanEnd a -> CHP a
  readChannel c = extReadChannel c return

  -- | Performs an extended read from the channel, performing the given action
  -- before freeing the writer
  extReadChannel :: chanEnd a -> (a -> CHP b) -> CHP b

-- | A class indicating that a channel can be written to.
class WriteableChannel chanEnd where -- minimal implementation: extWriteChannel'
  -- | Writes the given value to the given writing channel-end
  writeChannel :: chanEnd a -> a -> CHP ()
  -- extWriteChannel already yields CHP (), so no trailing 'return ()' is needed.
  writeChannel c x = extWriteChannel c (return x)

  -- | Starts the communication, then performs the given extended action, then
  -- sends the result of that down the channel.
  extWriteChannel :: chanEnd a -> CHP a -> CHP ()
  -- Pairs the value with a unit result and defers to extWriteChannel'.
  extWriteChannel c m = extWriteChannel' c (liftM (flip (,) ()) m)

  -- | Like extWriteChannel, but allows a value to be returned from the inner action.
  --
  -- This function was added in version 1.4.0.
  extWriteChannel' :: chanEnd a -> CHP (a, b) -> CHP b

-- | A helper class for easily creating several channels of the same type.
-- The same type refers not only to what type the channel carries, but
-- also to the type of channel (one-to-one no poison, one-to-any with
-- poison, etc).  You can write code like this:
--
-- > (a, b, c, d, e) <- newChannels
--
-- To create five channels of the same type.
class ChannelTuple t where
  newChannels :: MonadCHP m => m t

-- ==========
-- Functions:
-- ==========

-- | A helper function that uses the parallel strategies library (see the
-- paper: \"Algorithm + Strategy = Parallelism\", P.W. Trinder et al, JFP
-- 8(1) 1998)
-- to make sure that the value sent down a channel is strictly evaluated
-- by the sender before transmission.
--
-- This is useful when you want to write worker processes that evaluate data
-- and send it back to some \"harvester\" process.  By default the values sent
-- back may be unevaluated, and thus the harvester might end up doing the evaluation.
-- If you use this function, the value is guaranteed to be completely evaluated
-- before sending.
--
-- Added in version 1.0.2.
writeChannelStrict :: (NFData a, WriteableChannel chanEnd) => chanEnd a -> a -> CHP ()
writeChannelStrict c x = (writeChannel c $| rnf) x

-- Runs the given allocator and wraps its identifier and both derived ends
-- into a 'Chan'.
chan :: Monad m => m (Unique, c a) -> (c a -> r a) -> (c a -> w a) -> m (Chan r w a)
chan m r w = do
  (u, x) <- m
  return $ Chan u (r x) (w x)

-- | Like 'newChannel' but also associates a label with that channel in a
-- trace.  You can use this function whether tracing is turned on or not,
-- so if you ever use tracing, you should use this rather than 'newChannel'.
newChannelWithLabel :: (Channel r w, MonadCHP m) => String -> m (Chan r w a)
newChannelWithLabel l = do
  c <- newChannel
  liftCHP . liftPoison . liftTrace $ labelUnique (getChannelIdentifier c) l
  return c

-- | A helper that is like 'newChannel' but returns the reading and writing
-- end of the channels directly.
newChannelRW :: (Channel r w, MonadCHP m) => m (r a, w a)
newChannelRW = do
  c <- newChannel
  return (reader c, writer c)

-- | A helper that is like 'newChannel' but returns the writing and reading
-- end of the channels directly.
newChannelWR :: (Channel r w, MonadCHP m) => m (w a, r a)
newChannelWR = do
  c <- newChannel
  return (writer c, reader c)

-- | Creates a list of channels of the same type with the given length.  If
-- you need to access some channels by index, use this function.  Otherwise
-- you may find using 'newChannels' to be easier.
newChannelList :: (Channel r w, MonadCHP m) => Int -> m [Chan r w a]
newChannelList count = replicateM count newChannel

-- | A variant of 'newChannelList' that also labels each channel.  The label
-- is the given stem followed by the channel's index in the list, so a stem
-- of foo yields foo0, foo1, foo2, etc.
newChannelListWithStem :: (Channel r w, MonadCHP m) => Int -> String -> m [Chan r w a]
newChannelListWithStem n s = mapM labelled [0 .. n - 1]
  where
    -- Allocates the channel whose label is the stem plus this index.
    labelled i = newChannelWithLabel (s ++ show i)

-- | A variant of 'newChannelList' that takes the labels explicitly: one
-- channel is allocated per label, in the order the labels are given.
newChannelListWithLabels :: (Channel r w, MonadCHP m) => [String] -> m [Chan r w a]
newChannelListWithLabels labels = mapM newChannelWithLabel labels

-- | Extracts the reading end of every channel in the list.  Equivalent to
-- @map reader@.
readers :: [Chan r w a] -> [r a]
readers chans = [reader c | c <- chans]

-- | Extracts the writing end of every channel in the list.  Equivalent to
-- @map writer@.
writers :: [Chan r w a] -> [w a]
writers chans = [writer c | c <- chans]

-- Allocates the raw channel: a fresh 'ChannelComm' event created with the
-- argument 2, plus a variable that starts with neither data nor an
-- acknowledgement.  The event's unique doubles as the channel identifier.
stmChannel :: MonadIO m => m (Unique, STMChannel a)
stmChannel = liftIO $ do
  event <- newEvent ChannelComm 2
  slot <- atomically (newTVar (NoPoison (Nothing, Nothing)))
  return (getEventUnique event, STMChan (event, slot))

-- | 'newChannel' specialised to 'OneToOneChannel'.
oneToOneChannel :: MonadCHP m => m (OneToOneChannel a)
oneToOneChannel = newChannel

-- | 'newChannelWithLabel' specialised to 'OneToOneChannel'.
--
-- Added in version 1.2.0.
oneToOneChannelWithLabel :: MonadCHP m => String -> m (OneToOneChannel a)
oneToOneChannelWithLabel label = newChannelWithLabel label

-- | Claims the given channel-end, executes the given block, then releases
-- the channel-end and returns the output value.  If poison or an IO
-- exception is thrown inside the block, the channel is released and the
-- poison\/exception re-thrown.
claim :: Shared c a -> (c a -> CHP b) -> CHP b
claim (Shared (mutex, end)) body = scopeBlock acquire guarded (releaseMutex mutex)
  where
    -- Take the mutex, then hand the underlying channel-end to the block.
    acquire = claimMutex mutex >> return end

    -- Normal path: run the user's block, release the mutex, pass the
    -- block's result back out.
    guarded claimed = do
      result <- body claimed
      liftIO (releaseMutex mutex)
      return result

-- | 'newChannel' specialised to 'AnyToOneChannel'.
anyToOneChannel :: MonadCHP m => m (AnyToOneChannel a)
anyToOneChannel = newChannel

-- | 'newChannel' specialised to 'OneToAnyChannel'.
oneToAnyChannel :: MonadCHP m => m (OneToAnyChannel a)
oneToAnyChannel = newChannel

-- | 'newChannel' specialised to 'AnyToAnyChannel'.
anyToAnyChannel :: MonadCHP m => m (AnyToAnyChannel a)
anyToAnyChannel = newChannel

-- | 'newChannelWithLabel' specialised to 'AnyToOneChannel'.
--
-- Added in version 1.2.0.
anyToOneChannelWithLabel :: MonadCHP m => String -> m (AnyToOneChannel a)
anyToOneChannelWithLabel label = newChannelWithLabel label

-- | 'newChannelWithLabel' specialised to 'OneToAnyChannel'.
--
-- Added in version 1.2.0.
oneToAnyChannelWithLabel :: MonadCHP m => String -> m (OneToAnyChannel a)
oneToAnyChannelWithLabel label = newChannelWithLabel label

-- | A type-constrained version of newChannelWithLabel.
--
-- Added in version 1.2.0.
anyToAnyChannelWithLabel :: MonadCHP m => String -> m (AnyToAnyChannel a)
anyToAnyChannelWithLabel label = newChannelWithLabel label

-- ==========
-- Instances:
-- ==========

instance ReadableChannel Chanin where
  -- Plain read: the acknowledgement transaction is handed to the event as
  -- the second component of 'EventActions', and the data transaction is run
  -- afterwards.
  readChannel (Chanin c) =
      buildOnEventPoison (indivRecJust ChannelRead) ev
        (EventActions (return ()) during) (liftSTM after)
        >>= checkPoison
    where
      (ev, during, after) = readChannelC c

  -- Extended read: obtain the value, run the caller's action on it, and only
  -- then acknowledge to the writer.  The final argument poisons the channel;
  -- presumably 'scopeBlock' runs it if the body fails -- TODO confirm.
  extReadChannel (Chanin c) body = scopeBlock begin finish (poisonReadC c)
    where
      (ev, awaitData) = startReadChannelC c
      begin =
        buildOnEventPoison (indivRecJust ChannelRead) ev mempty (liftSTM awaitData)
          >>= checkPoison
      finish val = do
        result <- body val
        liftSTM (endReadChannelC c)
        return result

instance WriteableChannel Chanout where
  -- Plain write: the send transaction is handed to the event as the second
  -- component of 'EventActions', and the wait for the acknowledgement is run
  -- afterwards.
  writeChannel (Chanout c) x =
      buildOnEventPoison (indivRecJust ChannelWrite) ev
        (EventActions (return ()) during) (liftSTM after)
        >>= checkPoison
    where
      (ev, during, after) = writeChannelC c x

  -- Extended write: synchronise, run the caller's action to obtain the value,
  -- then send it and wait for the acknowledgement as two separate
  -- transactions.
  extWriteChannel' (Chanout c) body = scopeBlock begin finish (poisonWriteC c)
    where
      (ev, checkStart) = startWriteChannelC c
      begin =
        buildOnEventPoison (indivRecJust ChannelWrite) ev mempty (liftSTM checkStart)
          >>= checkPoison
      finish _ = do
        (x, r) <- body
        mapM liftSTM [sendWriteChannelC c x, endWriteChannelC c]
          >>= checkPoison . mergeWithPoison
        return r

instance Poisonable (Chanin a) where
  poison (Chanin c) = liftIO (poisonReadC c)
  checkForPoison (Chanin c) = liftCHP (liftIO (checkPoisonReadC c) >>= checkPoison)

instance Poisonable (Chanout a) where
  poison (Chanout c) = liftIO (poisonWriteC c)
  checkForPoison (Chanout c) = liftCHP (liftIO (checkPoisonWriteC c) >>= checkPoison)

-- The tuple instances allocate their channels left to right.

instance (Channel r w) => ChannelTuple (Chan r w a, Chan r w a) where
  newChannels = liftM2 (,) newChannel newChannel

instance (Channel r w) => ChannelTuple (Chan r w a, Chan r w a, Chan r w a) where
  newChannels = liftM3 (,,) newChannel newChannel newChannel

instance (Channel r w) => ChannelTuple (Chan r w a, Chan r w a, Chan r w a, Chan r w a) where
  newChannels = liftM4 (,,,) newChannel newChannel newChannel newChannel

instance (Channel r w) => ChannelTuple (Chan r w a, Chan r w a, Chan r w a, Chan r w a, Chan r w a) where
  newChannels = liftM5 (,,,,) newChannel newChannel newChannel newChannel newChannel

instance (Channel r w) => ChannelTuple (Chan r w a, Chan r w a, Chan r w a, Chan r w a, Chan r w a, Chan r w a) where
  -- Control.Monad stops at liftM5, so the sixth channel is applied with 'ap'.
  newChannels =
    liftM5 (,,,,,) newChannel newChannel newChannel newChannel newChannel
      `ap` newChannel

-- Some of this is defensive programming -- the writer should never be able
-- to discover poison in the channel variable, for example

-- Takes the pending value out of the channel variable, retrying until one is
-- present; the acknowledgement slot is left as it was.
consumeData :: TVar (WithPoison (Maybe a, Maybe ())) -> STM (WithPoison a)
consumeData tv = readTVar tv >>= takeValue
  where
    takeValue PoisonItem = return PoisonItem
    takeValue (NoPoison (Nothing, _)) = retry
    takeValue (NoPoison (Just x, ack)) = do
      writeTVar tv (NoPoison (Nothing, ack))
      return (NoPoison x)

-- Places a value in the channel variable; finding a value already there is
-- reported with 'error'.
sendData :: TVar (WithPoison (Maybe a, Maybe ())) -> a -> STM (WithPoison ())
sendData tv x = readTVar tv >>= place
  where
    place PoisonItem = return PoisonItem
    place (NoPoison (Just _, _)) = error "CHP: Found data while sending data"
    place (NoPoison (Nothing, ack)) = do
      writeTVar tv (NoPoison (Just x, ack))
      return (NoPoison ())

-- Removes the acknowledgement from the channel variable, retrying until one
-- is present; the data slot is left as it was.
consumeAck :: TVar (WithPoison (Maybe a, Maybe ())) -> STM (WithPoison ())
consumeAck tv = readTVar tv >>= takeAck
  where
    takeAck PoisonItem = return PoisonItem
    takeAck (NoPoison (_, Nothing)) = retry
    takeAck (NoPoison (pending, Just _)) = do
      writeTVar tv (NoPoison (pending, Nothing))
      return (NoPoison ())

-- Places an acknowledgement in the channel variable; finding one already
-- there is reported with 'error'.
sendAck :: TVar (WithPoison (Maybe a, Maybe ())) -> STM (WithPoison ())
sendAck tv = readTVar tv >>= place
  where
    place PoisonItem = return PoisonItem
    place (NoPoison (_, Just _)) = error "CHP: Found ack while placing ack!"
    place (NoPoison (pending, Nothing)) = do
      writeTVar tv (NoPoison (pending, Just ()))
      return (NoPoison ())

instance ChaninC STMChannel a where
  startReadChannelC (STMChan (ev, tv)) = (ev, consumeData tv)
  endReadChannelC (STMChan (_, tv)) = sendAck tv
  -- The result of the acknowledgement transaction is discarded here.
  readChannelC (STMChan (ev, tv)) = (ev, sendAck tv >> return (), consumeData tv)
  -- Poisons both the event and the channel variable in one transaction.
  poisonReadC (STMChan (ev, tv)) = liftSTM (poisonEvent ev >> writeTVar tv PoisonItem)
  checkPoisonReadC (STMChan (ev, _)) = liftSTM (checkEventForPoison ev)

instance ChanoutC STMChannel a where
  startWriteChannelC (STMChan (ev, tv)) = (ev, readTVar tv >>= check)
    where
      -- Only reports whether the variable is poisoned; the contents are ignored.
      check PoisonItem = return PoisonItem
      check (NoPoison _) = return (NoPoison ())
  sendWriteChannelC (STMChan (_, tv)) val = sendData tv val
  endWriteChannelC (STMChan (_, tv)) = consumeAck tv
  -- The result of the send transaction is discarded here.
  writeChannelC (STMChan (ev, tv)) val = (ev, sendData tv val >> return (), consumeAck tv)
  -- Poisons both the event and the channel variable in one transaction.
  poisonWriteC (STMChan (ev, tv)) = liftSTM (poisonEvent ev >> writeTVar tv PoisonItem)
  checkPoisonWriteC (STMChan (ev, _)) = liftSTM (checkEventForPoison ev)

instance Channel Chanin Chanout where
  newChannel = chan stmChannel Chanin Chanout
  sameChannel (Chanin x) (Chanout y) = x == y

-- The shared variants wrap a plain one-to-one channel, pairing each shared
-- end with its own freshly-created mutex and reusing the inner channel's
-- identifier.

instance Channel (Shared Chanin) Chanout where
  newChannel = do
    readLock <- newMutex
    inner <- newChannel
    return (Chan (getChannelIdentifier inner) (Shared (readLock, reader inner)) (writer inner))
  sameChannel (Shared (_, Chanin x)) (Chanout y) = x == y

instance Channel Chanin (Shared Chanout) where
  newChannel = do
    writeLock <- newMutex
    inner <- newChannel
    return (Chan (getChannelIdentifier inner) (reader inner) (Shared (writeLock, writer inner)))
  sameChannel (Chanin x) (Shared (_, Chanout y)) = x == y

instance Channel (Shared Chanin) (Shared Chanout) where
  newChannel = do
    readLock <- newMutex
    writeLock <- newMutex
    inner <- newChannel
    return (Chan (getChannelIdentifier inner)
                 (Shared (readLock, reader inner))
                 (Shared (writeLock, writer inner)))
  sameChannel (Shared (_, Chanin x)) (Shared (_, Chanout y)) = x == y