-------------------------------------------------------------------------------- -- Copyright © 2009, Galois, Inc. -- Copyright © 2018, DFINITY Stiftung -- All rights reserved. -- -- Redistribution and use in source and binary forms, with or without -- modification, are permitted provided that the following conditions -- are met: -- -- * Redistributions of source code must retain the above copyright -- notice, this list of conditions and the following disclaimer. -- * Redistributions in binary form must reproduce the above copyright -- notice, this list of conditions and the following disclaimer in -- the documentation and/or other materials provided with the -- distribution. -- * Neither the name of the Galois, Inc. nor the names of its -- contributors may be used to endorse or promote products derived -- from this software without specific prior written permission. -- -- THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS -- "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT -- LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS -- FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE -- COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, -- INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, -- BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; -- LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER -- CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT -- LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN -- ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE -- POSSIBILITY OF SUCH DAMAGE. -------------------------------------------------------------------------------- -- | -- Module : Control.Concurrent.Classy.BoundedChan -- Copyright : © 2009 Galois Inc. -- , © 2018 DFINITY Stiftung -- Maintainer : DFINITY USA Research -- -- Implements bounded channels. These channels differ from normal 'Chan's in -- that they are guaranteed to contain no more than a certain number of -- elements. This is ideal when you may be writing to a channel faster than -- you are able to read from it. -- -- This module supports all the functions of "Control.Concurrent.Chan" except -- 'unGetChan' and 'dupChan', which are not supported for bounded channels. -- -- Extra consistency: This version enforces that if thread Alice writes -- e1 followed by e2 then e1 will be returned by readBoundedChan before e2. -- Conversely, if thead Bob reads e1 followed by e2 then it was true that -- writeBoundedChan e1 preceded writeBoundedChan e2. -- -- Previous versions did not enforce this consistency: if writeBoundedChan were -- preempted between putMVars or killThread arrived between putMVars then it -- can fail. Similarly it might fail if readBoundedChan were stopped after putMVar -- and before the second takeMVar. An unlucky pattern of several such deaths -- might actually break the invariants of the array in an unrecoverable way -- causing all future reads and writes to block. -------------------------------------------------------------------------------- module Control.Concurrent.Classy.BoundedChan ( BoundedChan , newBoundedChan , writeBoundedChan , trywriteBoundedChan , readBoundedChan , tryreadBoundedChan , isEmptyBoundedChan , writeList2BoundedChan ) where -------------------------------------------------------------------------------- import Control.Monad (replicateM) import Data.Array (Array, listArray, (!)) import qualified Control.Concurrent.Classy.MVar as MVar import Control.Monad.Catch (mask_, onException) import Control.Monad.Conc.Class (MonadConc(MVar)) -------------------------------------------------------------------------------- -- | A 'BoundedChan' is an abstract data type representing a bounded channel. -- -- @since 1.6.2.0 data BoundedChan m a = BoundedChan { _size :: Int , _contents :: Array Int (MVar m a) , _writePos :: MVar m Int , _readPos :: MVar m Int } deriving () -- TODO: check if the fields of BoundedChan could be strict / unpacked -------------------------------------------------------------------------------- -- Versions of modifyMVar and withMVar that do not 'restore' the previous mask -- state when running 'io', with added modification strictness. -- The lack of 'restore' may make these perform better than the normal version. -- Moving strictness here makes using them more pleasant. {-# INLINE modifyMVarMask #-} modifyMVarMask :: (MonadConc m) => MVar m a -> (a -> m (a, b)) -> m b modifyMVarMask m callback = mask_ $ do a <- MVar.takeMVar m (a', b) <- callback a `onException` MVar.putMVar m a MVar.putMVar m $! a' pure b {-# INLINE modifyMVarMask_ #-} modifyMVarMask_ :: (MonadConc m) => MVar m a -> (a -> m a) -> m () modifyMVarMask_ m callback = mask_ $ do a <- MVar.takeMVar m a' <- callback a `onException` MVar.putMVar m a MVar.putMVar m $! a' {-# INLINE withMVarMask #-} withMVarMask :: (MonadConc m) => MVar m a -> (a -> m b) -> m b withMVarMask m callback = mask_ $ do a <- MVar.takeMVar m b <- callback a `onException` MVar.putMVar m a MVar.putMVar m a pure b -------------------------------------------------------------------------------- -- | -- @newBoundedChan n@ returns a channel than can contain no more than @n@ -- elements. -- -- @since 1.6.2.0 newBoundedChan :: (MonadConc m) => Int -> m (BoundedChan m a) newBoundedChan x = do entls <- replicateM x MVar.newEmptyMVar wpos <- MVar.newMVar 0 rpos <- MVar.newMVar 0 let entries = listArray (0, x - 1) entls pure (BoundedChan x entries wpos rpos) -- | -- Write an element to the channel. If the channel is full, this routine will -- block until it is able to write. Blockers wait in a fair FIFO queue. -- -- @since 1.6.2.0 writeBoundedChan :: (MonadConc m) => BoundedChan m a -> a -> m () writeBoundedChan (BoundedChan size contents wposMV _) x = modifyMVarMask_ wposMV $ \wpos -> do MVar.putMVar (contents ! wpos) x pure (succ wpos `mod` size) -- only advance when putMVar succeeds -- | -- A variant of 'writeBoundedChan' which, instead of blocking when the channel is -- full, simply aborts and does not write the element. Note that this routine -- can still block while waiting for write access to the channel. -- -- @since 1.6.2.0 trywriteBoundedChan :: (MonadConc m) => BoundedChan m a -> a -> m Bool trywriteBoundedChan (BoundedChan size contents wposMV _) x = modifyMVarMask wposMV $ \wpos -> do success <- MVar.tryPutMVar (contents ! wpos) x -- only advance when putMVar succeeds let wpos' = if success then succ wpos `mod` size else wpos pure (wpos', success) -- | -- Read an element from the channel. If the channel is empty, this routine -- will block until it is able to read. Blockers wait in a fair FIFO queue. -- -- @since 1.6.2.0 readBoundedChan :: (MonadConc m) => BoundedChan m a -> m a readBoundedChan (BoundedChan size contents _ rposMV) = modifyMVarMask rposMV $ \rpos -> do a <- MVar.takeMVar (contents ! rpos) pure (succ rpos `mod` size, a) -- only advance when takeMVar succeeds -- | -- A variant of 'readBoundedChan' which, instead of blocking when the channel is -- empty, immediately returns 'Nothing'. Otherwise, 'tryreadBoundedChan' returns -- @'Just' a@ where @a@ is the element read from the channel. Note that this -- routine can still block while waiting for read access to the channel. -- -- @since 1.6.2.0 tryreadBoundedChan :: (MonadConc m) => BoundedChan m a -> m (Maybe a) tryreadBoundedChan (BoundedChan size contents _ rposMV) = modifyMVarMask rposMV $ \rpos -> do ma <- MVar.tryTakeMVar (contents ! rpos) -- only advance when takeMVar succeeds let rpos' = case ma of Just _ -> succ rpos `mod` size Nothing -> rpos pure (rpos', ma) -- | -- Returns 'True' if the supplied channel is empty. -- -- NOTE: This may block on an empty channel if there is a blocked reader. -- NOTE: This function is deprecated. -- -- @since 1.6.2.0 {-# DEPRECATED isEmptyBoundedChan "This isEmptyBoundedChan can block, no non-blocking substitute yet" #-} isEmptyBoundedChan :: (MonadConc m) => BoundedChan m a -> m Bool isEmptyBoundedChan (BoundedChan _ contents _ rposMV) = withMVarMask rposMV $ \rpos -> MVar.isEmptyMVar (contents ! rpos) -- | -- Write a list of elements to the channel. -- If the channel becomes full, this routine will block until it can write. -- Competing writers may interleave with this one. -- -- @since 1.6.2.0 writeList2BoundedChan :: (MonadConc m) => BoundedChan m a -> [a] -> m () writeList2BoundedChan = mapM_ . writeBoundedChan --------------------------------------------------------------------------------