{-# LANGUAGE BangPatterns , DeriveDataTypeable, CPP , ScopedTypeVariables #-} module Control.Concurrent.Chan.Unagi.Unboxed.Internal #ifdef NOT_x86 {-# WARNING "This library is unlikely to perform well on architectures without a fetch-and-add instruction" #-} #endif (sEGMENT_LENGTH , UnagiPrim(..) , InChan(..), OutChan(..), ChanEnd(..), Cell, Stream(..), ElementArray(..), SignalIntArray , readElementArray, writeElementArray , NextSegment(..), StreamHead(..), segSource , newChanStarting, writeChan, readChan, readChanOnException , dupChan, tryReadChan -- for NoBlocking.Unboxed , moveToNextCell, waitingAdvanceStream, cellEmpty ) where -- Forked from src/Control/Concurrent/Chan/Unagi/Internal.hs at 443465. See -- that implementation for additional details and notes which we omit here. -- -- Internals exposed for testing and for re-use in Unagi.NoBlocking.Unboxed -- -- TODO integration w/ ByteString -- - we'd need to make IndexedMVar () and always write to ByteString -- - we'd need to switch to Storable probably. -- - exporting slices of elements as ByteString -- - lazy bytestring would be easiest because of segment boundaries -- - might be tricky to do blocking checking efficiently -- - writing bytearrays -- - fast with memcpy -- -- TODO MAYBE another variation: -- Either with a single reader, or a counter that tracks readers as they -- exit a segment (so that we know when can be manually 'free'd), allowing -- use of unmanaged memory, and: -- - creatable with calloc -- - replace IORef with an unboxed 1-element array holding Addr of MutableByteArray? or something... 
-- - nullPtr can be used to get us references of (Maybe a) -- > IORef (StreamHead ) -- > Int + (Stream ) -- > arr + arr + IndexedMvar + Maybe Stream -- - We would need to move IndexedMvar into streamhead -- - No CAS for Ptr/ForeignPtr but we can probably extract the mutablebytearray for CAS -- Data.Primitive.ByteArray.mutableByteArrayContents ~> Addr -- , and ForeignPtr holds an Addr# + MutableByteArray internally... -- , use GHC.ForeignPtr and wrap MutableByteArray in PlainPtr and off to races -- - we can re-use read segments as soon as they pass. -- -- TODO GHC 7.10 and/or someday: -- - use segment length of e.g. 1022 to account for MutableByteArray -- header, then align to cache line (note: we don't really need to use -- div/mod here; just subtraction) This could be done in all -- implementations. (boxed arrays are: 3 + n/128 + n words?? Who knows...) -- - use a smaller sigArr of 1024 bytes (just makes segSource a little cheaper) -- - here use a clever fetchAndAdd to distinguish 4 different cells (+0001, vs +0100, etc) -- - the NoBlocking can read/write to individual bytes -- - calloc for mutableByteArray, when/if available -- - non-temporal writes that bypass the cache? See: http://lwn.net/Articles/255364/ -- - SIMD stuff for batch writing, or zeroing, etc. etc import Data.IORef import Control.Exception import Control.Monad.Primitive(RealWorld) import Data.Atomics.Counter.Fat import Data.Atomics import qualified Data.Primitive as P import Control.Monad import Control.Applicative import Data.Bits import GHC.Exts(inline) -- For instances: import Data.Typeable(Typeable) import Data.Int(Int8,Int16,Int32,Int64) import Data.Word(Word,Word8,Word16,Word32,Word64) import Utilities import Control.Concurrent.Chan.Unagi.Constants import qualified Control.Concurrent.Chan.Unagi.NoBlocking.Types as UT import Prelude -- | The write end of a channel created with 'newChan'. 
newtype InChan a = InChan (ChanEnd a)
    deriving Typeable

-- | The read end of a channel created with 'newChan'.
newtype OutChan a = OutChan (ChanEnd a)
    deriving Typeable

-- Equality on either end compares only the stream-head references; the
-- counters are not consulted.
instance Eq (InChan a) where
    InChan (ChanEnd _ refA) == InChan (ChanEnd _ refB) = refA == refB

instance Eq (OutChan a) where
    OutChan (ChanEnd _ refA) == OutChan (ChanEnd _ refB) = refA == refB

-- InChan & OutChan are mostly identical, sharing a stream, but with
-- independent counters
data ChanEnd a = ChanEnd
    -- Both Chan ends must start with the same counter value.
    !AtomicCounter
    -- the stream head; this must never point to a segment whose offset
    -- is greater than the counter value
    !(IORef (StreamHead a))
    deriving Typeable

-- An offset paired with a stream segment.
data StreamHead a = StreamHead !Int !(Stream a)

-- The array we actually store our Prim elements in
newtype ElementArray a = ElementArray (P.MutableByteArray RealWorld)

-- Read the element at the given index of the wrapped byte array.
readElementArray :: (P.Prim a)=> ElementArray a -> Int -> IO a
{-# INLINE readElementArray #-}
readElementArray (ElementArray bytes) ix = P.readByteArray bytes ix

-- Write an element at the given index of the wrapped byte array.
writeElementArray :: (P.Prim a)=> ElementArray a -> Int -> a -> IO ()
{-# INLINE writeElementArray #-}
writeElementArray (ElementArray bytes) ix el = P.writeByteArray bytes ix el

-- We CAS on this, using Ints to signal (see below)
type SignalIntArray = P.MutableByteArray RealWorld

-- TRANSITIONS and POSSIBLE VALUES:
--   During Read:
--     Empty    -> Blocking
--     Written
--     Blocking (only when dupChan used)
--   During Write:
--     Empty    -> Written
--     Blocking
{-
data Cell a = Empty    -- 0
            | Written  -- 1
            | Blocking -- 2
-}
type Cell = Int

cellEmpty, cellWritten, cellBlocking :: Cell
cellEmpty    = 0
cellWritten  = 1
cellBlocking = 2

-- Allocate one segment's worth of storage: a signal array with every cell set
-- to 'cellEmpty', and an element array.
--
-- NOTE: attempts to make allocation and initialization faster via copying, or
-- other tricks failed; although a calloc was about 2x faster (but that was for
-- unmanaged memory)
segSource :: forall a. (UnagiPrim a)=> IO (SignalIntArray, ElementArray a) --ScopedTypeVariables
{-# INLINE segSource #-}
segSource = do
    -- A largish pinned array seems like it would be the best choice here.
    sigArr <- P.newAlignedPinnedByteArray sigBytes (P.alignment cellEmpty)
    -- NOTE: we need these to be aligned to (some multiple of) Word boundaries
    -- for magic trick to be correct, and for assumptions about atomicity of
    -- loads/stores to hold!
    eArr <- P.newAlignedPinnedByteArray elemBytes (P.alignment elemProxy)
    P.setByteArray sigArr 0 sEGMENT_LENGTH cellEmpty
    -- If no atomicUnicorn then we always check in at sigArr, so no need to
    -- initialize eArr:
    case (atomicUnicorn :: Maybe a) of
        Nothing    -> return ()
        Just magic -> P.setByteArray eArr 0 sEGMENT_LENGTH magic
    return (sigArr, ElementArray eArr)
  where
    -- Only ever used as a type witness; never evaluated.
    elemProxy = undefined :: a
    -- Shifting by lOG_SEGMENT_LENGTH multiplies by sEGMENT_LENGTH.
    sigBytes  = P.sizeOf cellEmpty `unsafeShiftL` lOG_SEGMENT_LENGTH
    elemBytes = P.sizeOf elemProxy `unsafeShiftL` lOG_SEGMENT_LENGTH
-- NOTE: We always CAS this into place which provides write barrier, such
-- that arrays are fully initialized before they can be read. No
-- corresponding barrier is needed in waitingAdvanceStream.

-- | Our class of types supporting primitive array operations. Instance method
-- definitions are architecture-dependent.
class (P.Prim a, Eq a)=> UnagiPrim a where
    -- | When the read and write operations of the underlying @Prim@ instances
    -- on aligned memory are atomic, this may be set to @Just x@ where @x@ is
    -- some rare (i.e. unlikely to occur frequently in your data) magic value;
    -- this might help speed up some @UnagiPrim@ operations.
    --
    -- Where those 'Prim' instance operations are not atomic, this *must* be
    -- set to @Nothing@.
    atomicUnicorn :: Maybe a
    atomicUnicorn = Nothing

-- These ought all to be atomic for 32-bit or 64-bit systems:
instance UnagiPrim Char where
    atomicUnicorn = Just '\1010101'
instance UnagiPrim Float where
    atomicUnicorn = Just 0xDADADA
instance UnagiPrim Int where
    atomicUnicorn = Just 0xDADADA
instance UnagiPrim Int8 where
    atomicUnicorn = Just 113
instance UnagiPrim Int16 where
    atomicUnicorn = Just 0xDAD
instance UnagiPrim Int32 where
    atomicUnicorn = Just 0xDADADA
instance UnagiPrim Word where
    atomicUnicorn = Just 0xDADADA
instance UnagiPrim Word8 where
    atomicUnicorn = Just 0xDA
instance UnagiPrim Word16 where
    atomicUnicorn = Just 0xDADA
instance UnagiPrim Word32 where
    atomicUnicorn = Just 0xDADADADA
instance UnagiPrim P.Addr where
    atomicUnicorn = Just P.nullAddr

-- These should conservatively be expected to be atomic only on 64-bit
-- machines:
instance UnagiPrim Int64 where
#ifdef IS_64_BIT
    atomicUnicorn = Just 0xDADADADADADA
#endif
instance UnagiPrim Word64 where
#ifdef IS_64_BIT
    atomicUnicorn = Just 0xDADADADADADA
#endif
instance UnagiPrim Double where
#ifdef IS_64_BIT
    atomicUnicorn = Just 0xDADADADADADA
#endif

-- NOTE: we tried combining the SignalIntArray and ElementArray into a single
-- bytearray in the unagi-unboxed-combined-bytearray branch but saw no
-- significant improvement.
data Stream a = Stream
    !SignalIntArray
    !(ElementArray a)
    -- For coordinating blocking between reader/writer; NOTE [1]
    (IndexedMVar a) -- N.B. must remain non-strict for NoBlocking.Unboxed
    -- The next segment in the stream; NOTE [2]
    !(IORef (NextSegment a))
-- [1] An important property: we can switch out this implementation as long
-- as it utilizes a fresh MVar for each reader/writer pair.
--
-- [2] new segments are allocated and put here as we go, with threads
-- cooperating to allocate new segments:
data NextSegment a
    = NoSegment
    | Next !(Stream a)

-- Build a fresh channel whose two ends both start counting at the given
-- offset and share one initial segment.
--
-- we expose `startingCellOffset` for debugging correct behavior with overflow:
newChanStarting :: UnagiPrim a=> Int -> IO (InChan a, OutChan a)
{-# INLINE newChanStarting #-}
newChanStarting !startingCellOffset = do
    (sigs, elems) <- segSource
    blockers <- newIndexedMVar
    nextRef <- newIORef NoSegment
    let firstSeg = Stream sigs elems blockers nextRef
        -- Run once per end, so each end gets its own counter and its own
        -- stream-head reference:
        freshEnd = do
            ctr <- newCounter startingCellOffset
            hd <- newIORef (StreamHead startingCellOffset firstSeg)
            return (ChanEnd ctr hd)
    inEnd <- freshEnd
    outEnd <- freshEnd
    return (InChan inEnd, OutChan outEnd)

-- | Duplicate a chan: the returned @OutChan@ begins empty, but data written to
-- the argument @InChan@ from then on will be available from both the original
-- @OutChan@ and the one returned here, creating a kind of broadcast channel.
dupChan :: InChan a -> IO (OutChan a)
{-# INLINE dupChan #-}
dupChan (InChan (ChanEnd writerCounter writerHead)) = do
    -- The stream head is read before the counter, with a barrier in between.
    hLoc <- readIORef writerHead
    loadLoadBarrier
    wCount <- readCounter writerCounter
    dupCounter <- newCounter wCount
    dupHead <- newIORef hLoc
    return (OutChan (ChanEnd dupCounter dupHead))

-- | Write a value to the channel.
writeChan :: UnagiPrim a=> InChan a -> a -> IO ()
{-# INLINE writeChan #-}
writeChan (InChan ce) = \a-> mask_ $ do
    (segIx, Stream sigArr eArr mvarIndexed next, maybeUpdateStreamHead) <- moveToNextCell ce
    maybeUpdateStreamHead
    -- NOTE!: must write element before signaling with CAS:
    writeElementArray eArr segIx a
    prior <- casByteArrayInt sigArr segIx cellEmpty cellWritten -- NOTE[1]
    -- try to pre-allocate next segment:
    when (segIx == 0) $ void $ waitingAdvanceStream next 0
    let settle was
          -- CAS SUCCEEDED:
          | was == cellEmpty    = return ()
          -- CAS FAILED:
          | was == cellBlocking = putMVarIx mvarIndexed segIx a
          | was == cellWritten  = error "Nearly Impossible! Expected Blocking"
          | otherwise           = error "Invalid signal seen in writeChan!"
    settle prior
-- [1] casByteArrayInt provides the write barrier we need here to make sure
-- GHC maintains our ordering such that the element is written before we
-- signal its availability with the CAS to sigArr that follows. See [2] in
-- readSegIxUnmasked.

-- Core of blocking read functions, taking handler and output of moveToNextCell
readSegIxUnmasked :: UnagiPrim a=> (IO a -> IO a) -> (Int, Stream a, IO ()) -> IO a
{-# INLINE readSegIxUnmasked #-}
readSegIxUnmasked h =
  \(segIx, Stream sigArr eArr mvarIndexed _, maybeUpdateStreamHead)-> do
    maybeUpdateStreamHead
    let awaitWriter = inline h $ readMVarIx mvarIndexed segIx -- NOTE [1]
        fetchElem = readElementArray eArr segIx
        -- Pick the read strategy from the signal value the CAS observed:
        dispatch was
          -- succeeded writing Empty; proceed with blocking
          | was == cellEmpty    = awaitWriter
          -- else in the meantime, writer wrote
          | was == cellWritten  = fetchElem
          -- else in the meantime a dupChan reader read, blocking
          | was == cellBlocking = awaitWriter
          | otherwise           = error "Invalid signal seen in readSegIxUnmasked!"
        viaSignal = do
            -- Assume probably blocking (Note: casByteArrayInt is a full barrier)
            prior <- casByteArrayInt sigArr segIx cellEmpty cellBlocking -- NOTE [2]
            dispatch prior
    -- If we know writes of this element are atomic, we can determine if the
    -- element has been written, and possibly return it without consulting
    -- sigArr.
    case atomicUnicorn of
        Nothing -> viaSignal
        Just magic -> do
            el <- fetchElem
            -- When `el` differs from the magic value we know it was
            -- atomically written:
            if el /= magic
              then return el
              else viaSignal
-- [1] we must use `readMVarIx` here to support `dupChan`. It's also
-- important that the behavior of readMVarIx be identical to a readMVar on
-- the same MVar.
--
-- [2] casByteArrayInt provides the loadLoadBarrier we need here. See [1] in
-- writeChan.

-- TODO we might want also a blocking `IO a` returned here, or use an opaque
-- Element type supporting blocking, since otherwise calling `tryReadChan` we
-- give up the ability to block on that element. Please open an issue if you
-- need this in the meantime.
-- TODO (cont.): also handling of lost elements on async exceptions. And also
-- isActive...

-- | Returns immediately with:
--
--  - an @'UT.Element' a@ future, which returns one unique element when it
--    becomes available via 'UT.tryRead'.
--
--  - a blocking @IO@ action that returns the element when it becomes available.
--
-- If you're using this function exclusively you might find the implementation
-- in "Control.Concurrent.Chan.Unagi.NoBlocking.Unboxed" is faster.
--
-- /Note re. exceptions/: When an async exception is raised during a @tryReadChan@
-- the message that the read would have returned is likely to be lost, just as
-- it would be when raised directly after this function returns.
tryReadChan :: UnagiPrim a=> OutChan a -> IO (UT.Element a, IO a)
{-# INLINE tryReadChan #-}
tryReadChan (OutChan ce) = do -- no masking needed
    -- NOTE: implementation adapted from readSegIxUnmasked:
    claimed@(segIx, Stream sigArr eArr mvarIndexed _, maybeUpdateStreamHead) <- moveToNextCell ce
    maybeUpdateStreamHead
    let fetchElem = readElementArray eArr segIx
        -- Non-blocking probe that consults the signal array:
        viaSignal = do
            sig <- P.readByteArray sigArr segIx :: IO Cell
            probe sig
        probe sig
          | sig == cellEmpty    = return Nothing
          | sig == cellWritten  = loadLoadBarrier >> Just <$> fetchElem
          | sig == cellBlocking = tryReadMVarIx mvarIndexed segIx
          | otherwise           = error "Invalid signal seen in tryReadChan!"
        -- Non-blocking probe; tries the element array first when a magic
        -- value is available for this type:
        poll = case atomicUnicorn of
            Nothing -> viaSignal
            Just magic -> do
                el <- fetchElem
                if el /= magic
                  then return (Just el)
                  else viaSignal
    return (UT.Element poll, readSegIxUnmasked id claimed)

-- | Read an element from the chan, blocking if the chan is empty.
--
-- /Note re. exceptions/: When an async exception is raised during a @readChan@
-- the message that the read would have returned is likely to be lost, even when
-- the read is known to be blocked on an empty queue. If you need to handle
-- this scenario, you can use 'readChanOnException'.
readChan :: UnagiPrim a=> OutChan a -> IO a
{-# INLINE readChan #-}
readChan = \(OutChan ce)-> readSegIxUnmasked id =<< moveToNextCell ce

-- | Like 'readChan' but allows recovery of the queue element which would have
-- been read, in the case that an async exception is raised during the read. To
-- be precise exceptions are raised, and the handler run, only when
-- @readChanOnException@ is blocking.
--
-- The second argument is a handler that takes a blocking IO action returning
-- the element, and performs some recovery action.  When the handler is called,
-- the passed @IO a@ is the only way to access the element.
readChanOnException :: UnagiPrim a=> OutChan a -> (IO a -> IO ()) -> IO a
{-# INLINE readChanOnException #-}
readChanOnException (OutChan ce) h = mask_ $ do
    claimed <- moveToNextCell ce
    readSegIxUnmasked recovering claimed
  where
    -- Wrap the blocking read so the caller's handler is given that same
    -- action if an exception is raised while it runs.
    recovering io = io `onException` h io

-- increments counter, finds stream segment of corresponding cell (updating the
-- stream head pointer as needed), and returns the stream segment and relative
-- index of our cell.
moveToNextCell :: UnagiPrim a=> ChanEnd a -> IO (Int, Stream a, IO ())
{-# INLINE moveToNextCell #-}
moveToNextCell (ChanEnd counter streamHead) = do
    StreamHead offset0 str0 <- readIORef streamHead
    ix <- incrCounter 1 counter
    let relIx = ix - offset0
        (segsAway, segIx) =
            assert (relIx >= 0) $ divMod_sEGMENT_LENGTH $! relIx
        -- (ix - offset0) `quotRem` sEGMENT_LENGTH
        {-# INLINE walk #-}
        walk 0 seg = return seg
        walk !n (Stream _ _ _ next) = do
            seg' <- waitingAdvanceStream next (nEW_SEGMENT_WAIT*segIx)
            walk (n-1) seg'
    str <- walk segsAway str0
    -- We need to return this continuation here for NoBlocking.Unboxed, which
    -- needs to perform this action at different points in the reader and
    -- writer.
    let !maybeUpdateStreamHead = do
          when (segsAway > 0) $ do
            let !offsetN = offset0 + (segsAway `unsafeShiftL` lOG_SEGMENT_LENGTH) --(segsAway*sEGMENT_LENGTH)
            writeIORef streamHead (StreamHead offsetN str)
          touchIORef streamHead -- NOTE [1]
    return (segIx, str, maybeUpdateStreamHead)
-- [1] For NoBlocking.Unboxed: this helps ensure that streamHead is not GC'd
-- until `maybeUpdateStreamHead` is run in calling function. For correctness
-- of `isActive`.

-- thread-safely try to fill `nextSegRef` at the next offset with a new
-- segment, waiting some number of iterations (for other threads to handle it).
-- Returns nextSegRef's StreamSegment.
waitingAdvanceStream :: (UnagiPrim a)=> IORef (NextSegment a) -> Int -> IO (Stream a)
waitingAdvanceStream nextSegRef = spin
  where
    spin !wait = assert (wait >= 0) $ do
        tk <- readForCAS nextSegRef
        case peekTicket tk of
            Next strNext -> return strNext
            NoSegment
              | wait > 0 -> spin (wait - 1)
              -- Create a potential next segment and try to insert it:
              | otherwise -> do
                  (sigs, elems) <- segSource
                  blockers <- newIndexedMVar
                  nextRef <- newIORef NoSegment
                  let candidate = Stream sigs elems blockers nextRef
                  (_, tkDone) <- casIORef nextSegRef tk (Next candidate)
                  -- If that failed another thread succeeded (no false negatives)
                  case peekTicket tkDone of
                      Next strNext -> return strNext
                      _ -> error "Impossible! This should only have been Next segment"