-- Communicating Haskell Processes. -- Copyright (c) 2008, University of Kent. -- All rights reserved. -- -- Redistribution and use in source and binary forms, with or without -- modification, are permitted provided that the following conditions are -- met: -- -- * Redistributions of source code must retain the above copyright -- notice, this list of conditions and the following disclaimer. -- * Redistributions in binary form must reproduce the above copyright -- notice, this list of conditions and the following disclaimer in the -- documentation and/or other materials provided with the distribution. -- * Neither the name of the University of Kent nor the names of its -- contributors may be used to endorse or promote products derived from -- this software without specific prior written permission. -- -- THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS -- IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, -- THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR -- PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR -- CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, -- EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, -- PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR -- PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF -- LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING -- NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS -- SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. -- | The module containing all the different types of channels in CHP. Unlike -- JCSP and C++CSP2, CHP does not offer buffered channels directly (see the -- "Control.Concurrent.CHP.Buffers" module). There are four different channel types, effectively -- all possible combinations of: -- -- * Shared reader vs non-shared reader -- -- * Shared writer vs non-shared writer -- -- For most applications you probably want just 'OneToOneChannel'. -- -- It is possible for the type system to infer which channel you want when -- you use 'newChannel'. If the types of the ends are known by the type system, -- the channel-type can be inferred. So you can usually just write 'newChannel', -- and depending on how you use the channel, the type system will figure out -- which one you needed. module Control.Concurrent.CHP.Channels ( -- * Channel Creation Chan, Channel(..), writeChannelStrict, newChannelWithLabel, newChannelWR, newChannelRW, ChannelTuple(..), newChannelList, newChannelListWithLabels, newChannelListWithStem, getChannelIdentifier, -- * Channel-Ends Chanin, Chanout, reader, writer, readers, writers, -- * Reading and Writing with Channels ReadableChannel(..), WriteableChannel(..), -- * Shared Channels claim, Shared, -- * Specific Channel Types -- | All the functions here are equivalent to newChannel (or newChannelWithLabel), but typed. So for -- example, @oneToOneChannel = newChannel :: MonadCHP m => m OneToOneChannel@. OneToOneChannel, oneToOneChannel, oneToOneChannelWithLabel, OneToAnyChannel, oneToAnyChannel, oneToAnyChannelWithLabel, AnyToOneChannel, anyToOneChannel, anyToOneChannelWithLabel, AnyToAnyChannel, anyToAnyChannel, anyToAnyChannelWithLabel ) where import Control.Concurrent.STM.TVar import Control.Monad import Control.Monad.STM import Control.Monad.Trans import Control.Parallel.Strategies import Data.Maybe import Data.Unique import Control.Concurrent.CHP.Base import Control.Concurrent.CHP.CSP import Control.Concurrent.CHP.Event import Control.Concurrent.CHP.Monad import Control.Concurrent.CHP.Mutex import Control.Concurrent.CHP.Poison import Control.Concurrent.CHP.Traces.Base -- ====== -- Types: -- ====== -- | A reading channel-end type that allows poison to be thrown -- -- Eq instance added in version 1.1.1 newtype Chanin a = Chanin (STMChannel a) deriving Eq -- | A writing channel-end type that allows poison to be thrown -- -- Eq instance added in version 1.1.1 newtype Chanout a = Chanout (STMChannel a) deriving Eq newtype STMChannel a = STMChan (Event, TVar (WithPoison (Maybe a))) deriving Eq type OneToOneChannel = Chan Chanin Chanout type AnyToOneChannel = Chan (Chanin) (Shared Chanout) type OneToAnyChannel = Chan (Shared Chanin) (Chanout) type AnyToAnyChannel = Chan (Shared Chanin) (Shared Chanout) -- ======== -- Classes: -- ======== class ChaninC c a where -- Start gets the event and the transaction that will wait for data. You -- sync on the event (possible extended write occurs) then wait for data startReadChannelC :: c a -> (Event, STM (WithPoison a)) -- (extended read action goes here) -- Read releases the writer endReadChannelC :: c a -> STM (WithPoison ()) poisonReadC :: c a -> IO () checkPoisonReadC :: c a -> IO (WithPoison ()) class ChanoutC c a where -- Start checks for poison and gets the event: startWriteChannelC :: c a -> (Event, STM (WithPoison ())) -- (extended write action goes here) -- Send actually transmits the value: sendWriteChannelC :: c a -> a -> STM (WithPoison ()) -- (extended read action goes here) -- End waits for the reader to tell us we're done, must be done in a different -- transaction to the send endWriteChannelC :: c a -> STM (WithPoison ()) poisonWriteC :: c a -> IO () checkPoisonWriteC :: c a -> IO (WithPoison ()) -- | A class used for allocating new channels, and getting the reading and -- writing ends. There is a bijective assocation between the channel, and -- its pair of end types. You can see the types in the list of instances below. -- Thus, 'newChannel' may be used, and the compiler will infer which type of -- channel is required based on what end-types you get from 'reader' and 'writer'. -- Alternatively, if you explicitly type the return of 'newChannel', it will -- be definite which ends you will use. If you do want to fix the type of -- the channel you are using when you allocate it, consider using one of the -- many 'oneToOneChannel'-like shorthand functions that fix the type. class Channel r w where -- | Allocates a new channel. Nothing need be done to -- destroy\/de-allocate the channel when it is no longer in use. newChannel :: MonadCHP m => m (Chan r w a) -- | A class indicating that a channel can be read from. class ReadableChannel chanEnd where -- minimal implementation: extReadChannel -- | Reads from the given reading channel-end readChannel :: chanEnd a -> CHP a readChannel c = extReadChannel c return -- | Performs an extended read from the channel, performing the given action -- before freeing the writer extReadChannel :: chanEnd a -> (a -> CHP b) -> CHP b -- | A class indicating that a channel can be written to. class WriteableChannel chanEnd where -- minimal implementation: extWriteChannel -- | Writes from the given writing channel-end writeChannel :: chanEnd a -> a -> CHP () writeChannel c x = extWriteChannel c (return x) -- | Starts the communication, then performs the given extended action, then -- sends the result of that down the channel extWriteChannel :: chanEnd a -> CHP a -> CHP () -- | A helper class for easily creating several channels of the same type. -- The same type refers not only to what type the channel carries, but -- also to the type of channel (one-to-one no poison, one-to-any with -- poison, etc). You can write code like this: -- -- > (a, b, c, d, e) <- newChannels -- -- To create five channels of the same type. class ChannelTuple t where newChannels :: MonadCHP m => m t -- ========== -- Functions: -- ========== -- | A helper function that uses the parallel strategies library (see the -- paper: \"Algorithm + Strategy = Parallelism\", P.W. Trinder et al, JFP -- 8(1) 1998, -- ) -- to make sure that the value sent down a channel is strictly evaluated -- by the sender before transmission. -- -- This is useful when you want to write worker processes that evaluate data -- and send it back to some \"harvester\" process. By default the values sent -- back may be unevaluated, and thus the harvester might end up doing the evaluation. -- If you use this function, the value is guaranteed to be completely evaluated -- before sending. -- -- Added in version 1.0.2. writeChannelStrict :: (NFData a, WriteableChannel chanEnd) => chanEnd a -> a -> CHP () writeChannelStrict c x = (writeChannel c $| rnf) x chan :: Monad m => m (Unique, c a) -> (c a -> r a) -> (c a -> w a) -> m (Chan r w a) chan m r w = do (u, x) <- m return $ Chan u (r x) (w x) -- | Like 'newChannel' but also associates a label with that channel in a -- trace. You can use this function whether tracing is turned on or not, -- so if you ever use tracing, you should use this rather than 'newChannel'. newChannelWithLabel :: (Channel r w, MonadCHP m) => String -> m (Chan r w a) newChannelWithLabel l = do c <- newChannel liftCHP . liftPoison . liftTrace $ labelUnique (getChannelIdentifier c) l return c -- | A helper that is like 'newChannel' but returns the reading and writing -- end of the channels directly. newChannelRW :: (Channel r w, MonadCHP m) => m (r a, w a) newChannelRW = do c <- newChannel return (reader c, writer c) -- | A helper that is like 'newChannel' but returns the writing and reading -- end of the channels directly. newChannelWR :: (Channel r w, MonadCHP m) => m (w a, r a) newChannelWR = do c <- newChannel return (writer c, reader c) -- | Creates a list of channels of the same type with the given length. If -- you need to access some channels by index, use this function. Otherwise -- you may find using 'newChannels' to be easier. newChannelList :: (Channel r w, MonadCHP m) => Int -> m [Chan r w a] newChannelList n = replicateM n newChannel -- | A helper that is like 'newChannelList', but labels the channels according -- to a pattern. Given a stem such as foo, it names the channels in the list -- foo0, foo1, foo2, etc. newChannelListWithStem :: (Channel r w, MonadCHP m) => Int -> String -> m [Chan r w a] newChannelListWithStem n s = sequence [newChannelWithLabel (s ++ show i) | i <- [0 .. (n - 1)]] -- | A helper that is like 'newChannelList', but labels the channels with the -- given list. The number of channels returned is the same as the length of -- the list of labels newChannelListWithLabels :: (Channel r w, MonadCHP m) => [String] -> m [Chan r w a] newChannelListWithLabels = mapM newChannelWithLabel -- | Gets all the reading ends of a list of channels. A shorthand for @map -- reader@. readers :: [Chan r w a] -> [r a] readers = map reader -- | Gets all the writing ends of a list of channels. A shorthand for @map -- writer@. writers :: [Chan r w a] -> [w a] writers = map writer stmChannel :: MonadIO m => m (Unique, STMChannel a) stmChannel = liftIO $ do e <- newEvent ChannelComm 2 c <- atomically $ newTVar $ NoPoison Nothing return (getEventUnique e, STMChan (e,c)) -- | A type-constrained version of newChannel. oneToOneChannel :: MonadCHP m => m (OneToOneChannel a) oneToOneChannel = newChannel -- | A type-constrained version of newChannelWithLabel. -- -- Added in version 1.2.0. oneToOneChannelWithLabel :: MonadCHP m => String -> m (OneToOneChannel a) oneToOneChannelWithLabel = newChannelWithLabel -- | Claims the given channel-end, executes the given block, then releases -- the channel-end and returns the output value. If poison or an IO -- exception is thrown inside the block, the channel is released and the -- poison\/exception re-thrown. claim :: Shared c a -> (c a -> CHP b) -> CHP b claim (Shared (lock, c)) body = scopeBlock (claimMutex lock >> return c) (\y -> do x <- body y liftIO $ releaseMutex lock return x) (releaseMutex lock) -- | A type-constrained version of newChannel. anyToOneChannel :: MonadCHP m => m (AnyToOneChannel a) anyToOneChannel = newChannel -- | A type-constrained version of newChannel. oneToAnyChannel :: MonadCHP m => m (OneToAnyChannel a) oneToAnyChannel = newChannel -- | A type-constrained version of newChannel. anyToAnyChannel :: MonadCHP m => m (AnyToAnyChannel a) anyToAnyChannel = newChannel -- | A type-constrained version of newChannelWithLabel. -- -- Added in version 1.2.0. anyToOneChannelWithLabel :: MonadCHP m => String -> m (AnyToOneChannel a) anyToOneChannelWithLabel = newChannelWithLabel -- | A type-constrained version of newChannelWithLabel. -- -- Added in version 1.2.0. oneToAnyChannelWithLabel :: MonadCHP m => String -> m (OneToAnyChannel a) oneToAnyChannelWithLabel = newChannelWithLabel -- | A type-constrained version of newChannelWithLabel. -- -- Added in version 1.2.0. anyToAnyChannelWithLabel :: MonadCHP m => String -> m (AnyToAnyChannel a) anyToAnyChannelWithLabel = newChannelWithLabel -- ========== -- Instances: -- ========== instance ReadableChannel Chanin where readChannel (Chanin c) = let (e, m) = startReadChannelC c in buildOnEventPoison (indivRecJust ChannelRead) e (return ()) (liftSTM $ do x <- m endReadChannelC c return x) >>= checkPoison extReadChannel (Chanin c) body = let (e, m) = startReadChannelC c in scopeBlock (buildOnEventPoison (indivRecJust ChannelRead) e (return ()) (liftSTM m) >>= checkPoison) (\val -> do x <- body val liftSTM $ endReadChannelC c return x) (poisonReadC c) instance WriteableChannel Chanout where writeChannel (Chanout c) x = let (e, m) = startWriteChannelC c in buildOnEventPoison (indivRecJust ChannelWrite) e (return ()) (liftM2 (++) (liftSTM $ sequence [m, sendWriteChannelC c x]) (liftSTM $ sequence [endWriteChannelC c])) >>= checkPoison . mergeWithPoison extWriteChannel (Chanout c) body = let (e, m) = startWriteChannelC c in scopeBlock (buildOnEventPoison (indivRecJust ChannelWrite) e (return ()) (liftSTM m) >>= checkPoison) (const $ sequence [body >>= liftSTM . sendWriteChannelC c ,liftSTM (endWriteChannelC c)] >>= checkPoison . mergeWithPoison) (poisonWriteC c) instance Poisonable (Chanin a) where poison (Chanin c) = liftIO $ poisonReadC c checkForPoison (Chanin c) = liftCHP $ liftIO (checkPoisonReadC c) >>= checkPoison instance Poisonable (Chanout a) where poison (Chanout c) = liftIO $ poisonWriteC c checkForPoison (Chanout c) = liftCHP $ liftIO (checkPoisonWriteC c) >>= checkPoison instance (Channel r w) => ChannelTuple (Chan r w a, Chan r w a) where newChannels = do c0 <- newChannel c1 <- newChannel return (c0, c1) instance (Channel r w) => ChannelTuple (Chan r w a, Chan r w a, Chan r w a) where newChannels = do c0 <- newChannel c1 <- newChannel c2 <- newChannel return (c0, c1, c2) instance (Channel r w) => ChannelTuple (Chan r w a, Chan r w a, Chan r w a, Chan r w a) where newChannels = do c0 <- newChannel c1 <- newChannel c2 <- newChannel c3 <- newChannel return (c0, c1, c2, c3) instance (Channel r w) => ChannelTuple (Chan r w a, Chan r w a, Chan r w a, Chan r w a, Chan r w a) where newChannels = do c0 <- newChannel c1 <- newChannel c2 <- newChannel c3 <- newChannel c4 <- newChannel return (c0, c1, c2, c3, c4) instance (Channel r w) => ChannelTuple (Chan r w a, Chan r w a, Chan r w a, Chan r w a, Chan r w a, Chan r w a) where newChannels = do c0 <- newChannel c1 <- newChannel c2 <- newChannel c3 <- newChannel c4 <- newChannel c5 <- newChannel return (c0, c1, c2, c3, c4, c5) -- Some of this is defensive programming -- the writer should never be able -- to discover poison in the channel variable, for example instance ChaninC STMChannel a where startReadChannelC (STMChan (e,tv)) = (e, waitForJustOrPoison tv) endReadChannelC (STMChan (_,tv)) = do x <- readTVar tv case x of PoisonItem -> return PoisonItem NoPoison _ -> do writeTVar tv $ NoPoison Nothing return $ NoPoison () poisonReadC (STMChan (e,tv)) = liftSTM $ do poisonEvent e writeTVar tv PoisonItem checkPoisonReadC (STMChan (e,_)) = liftSTM $ checkEventForPoison e instance ChanoutC STMChannel a where startWriteChannelC (STMChan (e,tv)) = (e, do x <- readTVar tv case x of PoisonItem -> return PoisonItem NoPoison _ -> return $ NoPoison ()) sendWriteChannelC (STMChan (_, tv)) val = do x <- readTVar tv case x of PoisonItem -> return PoisonItem NoPoison _ -> do writeTVar tv $ NoPoison $ Just val return $ NoPoison () endWriteChannelC (STMChan (_, tv)) = waitForNothingOrPoison tv poisonWriteC (STMChan (e,tv)) = liftSTM $ do poisonEvent e writeTVar tv PoisonItem checkPoisonWriteC (STMChan (e,_)) = liftSTM $ checkEventForPoison e instance Channel Chanin Chanout where newChannel = chan stmChannel Chanin Chanout instance Channel (Shared Chanin) Chanout where newChannel = do m <- newMutex c <- newChannel return $ Chan (getChannelIdentifier c) (Shared (m, reader c)) (writer c) instance Channel Chanin (Shared Chanout) where newChannel = do m <- newMutex c <- newChannel return $ Chan (getChannelIdentifier c) (reader c) (Shared (m, writer c)) instance Channel (Shared Chanin) (Shared Chanout) where newChannel = do m <- newMutex m' <- newMutex c <- newChannel return $ Chan (getChannelIdentifier c) (Shared (m, reader c)) (Shared (m', writer c))