-- Sync. -- Copyright (c) 2010, Neil Brown. -- All rights reserved. -- -- Redistribution and use in source and binary forms, with or without -- modification, are permitted provided that the following conditions are -- met: -- -- * Redistributions of source code must retain the above copyright -- notice, this list of conditions and the following disclaimer. -- * Redistributions in binary form must reproduce the above copyright -- notice, this list of conditions and the following disclaimer in the -- documentation and/or other materials provided with the distribution. -- * The name of Neil Brown may not be used to endorse or promote products derived from -- this software without specific prior written permission. -- -- THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS -- IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, -- THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR -- PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR -- CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, -- EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, -- PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR -- PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF -- LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING -- NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS -- SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. -- | This library supports synchronous message-passing with choice in Haskell. -- It is similar to the CML package for Haskell -- (), and shares a similar API. It -- avoids some of the problems the CML package has with choose on GHC 6.12.1 -- (), -- and also deliberately leaves out some of the features in the CML package. -- -- The implementation is explained in this blog post: -- . -- The algorithm uses STM rather than spawning threads to implement choice. 
--
-- At the moment the library is fairly unfeatured; if you want more features I
-- would suggest using my more powerful CHP library
-- (<https://hackage.haskell.org/package/chp>)
-- but then I am biased!
module Control.Concurrent.Sync (Event, Channel, choose, send, recv, newChannel, sync) where

import Control.Applicative
import Control.Concurrent
-- stm 2.3 and later export their own modifyTVar' (TVar first, function
-- second).  This module defines a helper of the same name with the arguments
-- the other way round, so the library version is hidden to keep every use of
-- the name unambiguous.  On older stm releases the hiding clause merely
-- triggers GHC's dodgy-imports warning.
import Control.Concurrent.STM hiding (modifyTVar')
import Control.Monad
import Data.Function

----------------------------------------------------------------------
-- Internal implementation:
----------------------------------------------------------------------

-- | An offer made by one thread to take part in any one of a set of events.
data Offer = Offer { offerThreadId :: ThreadId, offerEvents :: [IntEvent] }

-- Offers are identified purely by the thread that made them.
instance Eq Offer where
  (==) = (==) `on` offerThreadId

-- | The internal representation of an event: the number of participants
-- that must offer before the event can complete, and the offers currently
-- pending on it (each paired with the STM action to run on completion).
data IntEvent = IntEvent { enrollCount :: Int, offersTV :: TVar [(STM (), Offer)] }

-- | Creates an event with the given participant count and no pending offers.
newIntEvent :: Int -> IO IntEvent
newIntEvent n = IntEvent n <$> atomically (newTVar [])

-- | Applies the function to the list held in the TVar and writes the result
-- back.  The spine of the new list is forced (by computing its length)
-- before the write, so unevaluated applications of the function do not pile
-- up inside the TVar.
--
-- Note the argument order: function first, TVar second.
modifyTVar' :: ([a] -> [a]) -> TVar [a] -> STM ()
modifyTVar' f tv = do
  xs <- f <$> readTVar tv
  length xs `seq` writeTVar tv xs

-- | Executes the actions until it finds one that returns True (at which point
-- it will execute no further actions).  Returns True if an action did, False
-- if none of them did.
anyM :: Monad m => [m Bool] -> m Bool
anyM = foldM step False
  where
    -- Once an action has succeeded, the remaining ones are skipped unexecuted.
    step found action
      | found = return True
      | otherwise = action

-- | Puts the offer, paired with its completion action, at the front of the
-- event's list of pending offers.
recordOffer :: Offer -> (STM (), IntEvent) -> STM ()
recordOffer offer (action, ev) = modifyTVar' (entry :) (offersTV ev)
  where
    entry = (action, offer)

-- | Offers to take part in every listed event and blocks until one of them
-- has completed, returning the value attached to the event that did.
--
-- This takes two transactions.  The first either completes an event that is
-- already waiting for just this participant, or else records the offer on
-- every event.  The second waits for the result variable to be filled in,
-- which happens as part of whichever transaction completes one of the events.
offerAll :: [(STM (), IntEvent, a)] -> IO a
offerAll choices = do
  me <- myThreadId
  resultVar <- atomically (submit me)
  atomically $ do
    outcome <- readTVar resultVar
    case outcome of
      Nothing -> retry
      Just x -> return x
  where
    submit me = do
      resultVar <- newTVar Nothing
      -- Each completion action also stores the result for the chosen event.
      let tagged = [(act >> writeTVar resultVar (Just x), e) | (act, e, x) <- choices]
          mine = Offer me [e | (_, e, _) <- choices]
      done <- anyM (map checkComplete tagged)
      unless done $ mapM_ (recordOffer mine) tagged
      return resultVar

-- | Checks whether the pending offers on the event, together with the one
-- being made now, reach the event's enroll count.  If they do, runs the
-- completion action of every participant, withdraws the other participants'
-- offers from all of their events, and returns True; otherwise does nothing
-- and returns False.
checkComplete :: (STM (), IntEvent) -> STM Bool
checkComplete (act, e) = do
  pending <- readTVar (offersTV e)
  let ready = enrollCount e == length pending + 1
  when ready $ do
    sequence_ (act : map fst pending)
    mapM_ (revoke . snd) pending
  return ready

-- | Removes the offer from the pending list of every event it was made on.
revoke :: Offer -> STM ()
revoke off = mapM_ withdraw (offerEvents off)
  where
    withdraw ev = modifyTVar' (filter keep) (offersTV ev)
    keep (_, other) = other /= off

----------------------------------------------------------------------
-- Public API:
----------------------------------------------------------------------

-- | A synchronous communication channel: the writer has to wait until the
-- reader is willing to read the value.  A channel should only ever be used
-- by one writer and one reader -- the algorithm is not currently designed
-- for anything else.
data Channel a = Channel { _chanEvent :: IntEvent, chanTVar :: TVar a }

-- Two channels are equal exactly when they share the same underlying TVar.
instance Eq (Channel a) where
  c1 == c2 = chanTVar c1 == chanTVar c2

-- | A synchronisation that has not yet been executed, and that yields a
-- value once it has.  The functor instance lets you transform that value
-- after the synchronisation has occurred.
data Event a = Event {events :: [((STM (), STM a), IntEvent)]}

instance Functor Event where
  fmap f (Event es) = Event (map onResult es)
    where
      -- Only the action that produces the result is mapped over.
      onResult ((dur, aft), e) = ((dur, fmap f aft), e)

-- | Synchronises on an event.  This blocks the thread until the Event can occur.
-- This may be a choice of several different events, via the 'choose' function.
sync :: Event a -> IO a
sync ev = do
  -- offerAll hands back the follow-up action of whichever event completed;
  -- that action is then run in a transaction of its own to get the result.
  after <- offerAll [(during, e, aft) | ((during, aft), e) <- events ev]
  atomically after

-- | Builds the event of sending the given value on the given channel.
send :: Channel a -> a -> Event ()
send (Channel ev slot) val = Event [((store, done), ev)]
  where
    store = writeTVar slot val
    done = return ()

-- | Builds the event of receiving a value from the given channel.
recv :: Channel a -> Event a
recv (Channel ev slot) = Event [((noop, fetch), ev)]
  where
    noop = return ()
    fetch = readTVar slot

-- | Builds an event that is the choice between the events in the given list.
--
-- For a singleton list this is equivalent to calling 'head'.  For an empty
-- list, calling 'sync' on the resulting event will block forever (or GHC
-- will throw you an exception because of it).
--
-- You should not pass more than one event from each channel in the list (nor
-- combine two events that are themselves choices, such that you end up
-- combining more than one event from a channel) or undefined behaviour will result.
choose :: [Event a] -> Event a
choose evs = Event (concatMap events evs)

-- | Creates a new communication channel.
newChannel :: IO (Channel a)
newChannel = do
  ev <- newIntEvent 2
  -- The slot starts out holding an error value.
  slot <- atomically (newTVar (error "sync:newChannel"))
  return (Channel ev slot)