{-# LANGUAGE BangPatterns , DeriveDataTypeable, CPP , ScopedTypeVariables #-}
{-# LANGUAGE TypeSynonymInstances, FlexibleInstances #-}
module Control.Concurrent.Chan.Unagi.Unboxed.Internal
#ifdef NOT_optimised
    {-# WARNING "This library is unlikely to perform well on architectures other than i386/x64/aarch64" #-}
#endif
    (sEGMENT_LENGTH
    , UnagiPrim(..)
    , InChan(..), OutChan(..), ChanEnd(..), Cell, Stream(..), ElementArray(..), SignalIntArray
    , readElementArray, writeElementArray
    , NextSegment(..), StreamHead(..), segSource
    , newChanStarting, writeChan, readChan, readChanOnException
    , dupChan, tryReadChan
    -- for NoBlocking.Unboxed
    , moveToNextCell, waitingAdvanceStream, cellEmpty
    )
    where

-- Forked from src/Control/Concurrent/Chan/Unagi/Internal.hs at 443465. See
-- that implementation for additional details and notes which we omit here.
--
-- Internals exposed for testing and for re-use in Unagi.NoBlocking.Unboxed
--
-- TODO integration w/ ByteString
--       - we'd need to make IndexedMVar () and always write to ByteString
--       - we'd need to switch to Storable probably.
--       - exporting slices of elements as ByteString 
--         - lazy bytestring would be easiest because of segment boundaries
--         - might be tricky to do blocking checking efficiently
--       - writing bytearrays
--         - fast with memcpy
--
-- TODO MAYBE another variation:
--    Either with a single reader, or a counter that tracks readers as they
--    exit a segment (so that we know when a segment can be manually 'free'd), allowing
--    use of unmanaged memory, and:
--         - creatable with calloc
--         - replace IORef with an unboxed 1-element array holding Addr of MutableByteArray? or something...
--         - nullPtr can be used to get us references of (Maybe a)
--            > IORef (StreamHead                                 )
--            >       Int + (Stream                               )
--            >             arr + arr + IndexedMVar + Maybe Stream
--           - We would need to move IndexedMVar into StreamHead
--         - No CAS for Ptr/ForeignPtr but we can probably extract the mutablebytearray for CAS
--            Data.Primitive.ByteArray.mutableByteArrayContents ~> Addr
--             , and ForeignPtr holds an Addr# + MutableByteArray internally...
--             , use GHC.ForeignPtr and wrap MutableByteArray in PlainPtr and we're off to the races
--         - we can re-use read segments as soon as they pass.
--
-- TODO GHC 7.10 and/or someday:
--       - use segment length of e.g. 1022 to account for MutableByteArray
--         header, then align to cache line (note: we don't really need to use
--         div/mod here; just subtraction) This could be done in all
--         implementations. (boxed arrays are: 3 + n/128 + n words?? Who knows...)
--       - use a smaller sigArr of 1024 bytes (just makes segSource a little cheaper)
--          - here use a clever fetchAndAdd to distinguish 4 different cells (+0001, vs +0100, etc)
--          - the NoBlocking can read/write to individual bytes
--       - calloc for mutableByteArray, when/if available
--       - non-temporal writes that bypass the cache? See: http://lwn.net/Articles/255364/
--       - SIMD stuff for batch writing, or zeroing, etc. etc


import Data.IORef
import Control.Exception
import Control.Monad.Primitive(RealWorld)
import Data.Atomics.Counter.Fat
import Data.Atomics
import qualified Data.Primitive as P
import Control.Monad
import Control.Applicative
import Data.Bits
import GHC.Exts(inline)
-- For instances:
import Data.Typeable(Typeable)
import Data.Int(Int8,Int16,Int32,Int64)
import Data.Word

import Utilities
import Control.Concurrent.Chan.Unagi.Constants
import qualified Control.Concurrent.Chan.Unagi.NoBlocking.Types as UT

import Prelude

-- primitive 0.7.0 got rid of Addr
#if MIN_VERSION_primitive(0,7,0)
import qualified Data.Primitive.Ptr as P
type Addr = P.Ptr Word8
-- | The null 'Addr'; used below as the 'atomicUnicorn' for 'Addr'.
nullAddr :: Addr
nullAddr = P.nullPtr
#else
import Data.Primitive (Addr, nullAddr)
#endif

-- | The write end of a channel created with 'newChan'.
newtype InChan a = InChan (ChanEnd a)
    deriving Typeable

-- | The read end of a channel created with 'newChan'.
newtype OutChan a = OutChan (ChanEnd a)
    deriving Typeable

-- Chan ends compare equal exactly when they hold the very same stream-head
-- 'IORef' (reference equality); the counters are not compared. Each end made
-- by 'newChanStarting' or 'dupChan' gets its own freshly allocated 'IORef'.
instance Eq (InChan a) where
    (InChan (ChanEnd _ headA)) == (InChan (ChanEnd _ headB))
        = headA == headB
instance Eq (OutChan a) where
    (OutChan (ChanEnd _ headA)) == (OutChan (ChanEnd _ headB))
        = headA == headB


-- InChan & OutChan are mostly identical, sharing a stream, but with
-- independent counters
data ChanEnd a = 
   ChanEnd  -- Both Chan ends must start with the same counter value.
            !AtomicCounter 
            -- the stream head; this must never point to a segment whose offset
            -- is greater than the counter value
            !(IORef (StreamHead a))
    deriving Typeable

-- The segment a 'ChanEnd' starts searching from, paired with the counter
-- value that corresponds to that segment's first cell. 'moveToNextCell'
-- subtracts this offset from the counter to find how far ahead to walk.
data StreamHead a = StreamHead !Int !(Stream a)


-- The array we actually store our Prim elements in
newtype ElementArray a = ElementArray (P.MutableByteArray RealWorld)

-- | Read the element at index @i@ of the array. The index counts elements of
-- type @a@, not bytes (as with 'P.readByteArray').
readElementArray :: (P.Prim a)=> ElementArray a -> Int -> IO a
{-# INLINE readElementArray #-}
readElementArray (ElementArray arr) i = P.readByteArray arr i

-- | Write an element at index @i@ of the array. The index counts elements of
-- type @a@, not bytes (as with 'P.writeByteArray').
writeElementArray :: (P.Prim a)=> ElementArray a -> Int -> a -> IO ()
{-# INLINE writeElementArray #-}
writeElementArray (ElementArray arr) i a = P.writeByteArray arr i a

-- We CAS on this, using Ints to signal (see 'Cell' below)
type SignalIntArray = P.MutableByteArray RealWorld

-- TRANSITIONS and POSSIBLE VALUES:
--   During Read:
--     Empty   -> Blocking
--     Written
--     Blocking (only when dupChan used)
--   During Write:
--     Empty   -> Written 
--     Blocking
{-
data Cell a = Empty    -- 0
            | Written  -- 1
            | Blocking -- 2
-}
-- NOTE: the readers and writers in this module pattern-match on the literal
-- values 0, 1 and 2 rather than on these names, so these values must not be
-- changed.
type Cell = Int
cellEmpty, cellWritten, cellBlocking :: Cell
cellEmpty    = 0
cellWritten  = 1
cellBlocking = 2


-- NOTE: attempts to make allocation and initialization faster via copying, or
-- other tricks failed; although a calloc was about 2x faster (but that was for
-- unmanaged memory)

-- | Allocate the arrays for one fresh segment:
--
--   * a pinned signal array of sEGMENT_LENGTH cells, all set to 'cellEmpty';
--
--   * a pinned element array of sEGMENT_LENGTH elements. When 'atomicUnicorn'
--     is @Just magic@, every element is pre-filled with @magic@. Otherwise it
--     is left uninitialized, because readers then always consult the signal
--     array first.
segSource :: forall a. (UnagiPrim a)=> IO (SignalIntArray, ElementArray a) --ScopedTypeVariables
{-# INLINE segSource #-}
segSource = do
    -- A largish pinned array seems like it would be the best choice here.
    sigArr <- P.newAlignedPinnedByteArray 
                (P.sizeOf    cellEmpty `unsafeShiftL` lOG_SEGMENT_LENGTH) -- times sEGMENT_LENGTH
                (P.alignment cellEmpty)
    -- NOTE: we need these to be aligned to (some multiple of) Word boundaries
    -- for magic trick to be correct, and for assumptions about atomicity of
    -- loads/stores to hold!
    eArr <- P.newAlignedPinnedByteArray 
                (P.sizeOf (undefined :: a) `unsafeShiftL` lOG_SEGMENT_LENGTH)
                (P.alignment (undefined :: a))
    P.setByteArray sigArr 0 sEGMENT_LENGTH cellEmpty
    -- If no atomicUnicorn then we always check in at sigArr, so no need to
    -- initialize eArr:
    maybe (return ()) 
        (P.setByteArray eArr 0 sEGMENT_LENGTH) (atomicUnicorn :: Maybe a)
    return (sigArr, ElementArray eArr)
    -- NOTE: We always CAS this into place which provides write barrier, such
    -- that arrays are fully initialized before they can be read. No
    -- corresponding barrier is needed in waitingAdvanceStream.


-- | Our class of types supporting primitive array operations. Instance method
-- definitions are architecture-dependent.
class (P.Prim a, Eq a)=> UnagiPrim a where
    -- | When the read and write operations of the underlying @Prim@ instances
    -- on aligned memory are atomic, this may be set to @Just x@ where @x@ is
    -- some rare (i.e. unlikely to occur frequently in your data) magic value;
    -- this might help speed up some @UnagiPrim@ operations.
    --
    -- Where those 'Prim' instance operations are not atomic, this *must* be
    -- set to @Nothing@.
    atomicUnicorn :: Maybe a
    atomicUnicorn = Nothing


-- These ought all to be atomic for 32-bit or 64-bit systems:
instance UnagiPrim Char where
    atomicUnicorn = Just '\1010101'
instance UnagiPrim Float where
    atomicUnicorn = Just 0xDADADA
instance UnagiPrim Int where
    atomicUnicorn = Just 0xDADADA
instance UnagiPrim Int8 where
    atomicUnicorn = Just 113
instance UnagiPrim Int16 where
    atomicUnicorn = Just 0xDAD
instance UnagiPrim Int32 where
    atomicUnicorn = Just 0xDADADA
instance UnagiPrim Word where
    atomicUnicorn = Just 0xDADADA
instance UnagiPrim Word8 where
    atomicUnicorn = Just 0xDA
instance UnagiPrim Word16 where
    atomicUnicorn = Just 0xDADA
instance UnagiPrim Word32 where
    atomicUnicorn = Just 0xDADADADA
instance UnagiPrim Addr where
    atomicUnicorn = Just nullAddr
-- These should conservatively be expected to be atomic only on 64-bit
-- machines; elsewhere they fall back to the default of 'Nothing':
instance UnagiPrim Int64 where
#ifdef IS_64_BIT
    atomicUnicorn = Just 0xDADADADADADA
#endif
instance UnagiPrim Word64 where
#ifdef IS_64_BIT
    atomicUnicorn = Just 0xDADADADADADA
#endif
instance UnagiPrim Double where
#ifdef IS_64_BIT
    atomicUnicorn = Just 0xDADADADADADA
#endif

-- | One segment of the stream: a signal array and an element array (both
-- allocated by 'segSource'), the blocking machinery for this segment, and a
-- link to the segment that follows it.
--
-- NOTE: we tried combining the SignalIntArray and ElementArray into a single
-- bytearray in the unagi-unboxed-combined-bytearray branch but saw no
-- significant improvement.
data Stream a
    = Stream
        !SignalIntArray           -- per-cell 'Cell' signals, updated by CAS
        !(ElementArray a)         -- per-cell element storage
        (IndexedMVar a)           -- reader/writer blocking; NOTE [1].
                                  -- N.B. must remain non-strict for
                                  -- NoBlocking.Unboxed
        !(IORef (NextSegment a))  -- the next segment in the stream; NOTE [2]
  -- [1] An important property: we can switch out this implementation as long
  -- as it utilizes a fresh MVar for each reader/writer pair.
  --
  -- [2] new segments are allocated and put here as we go, with threads
  -- cooperating to allocate new segments.

-- | The (possibly not yet allocated) segment following a 'Stream'.
data NextSegment a
    = NoSegment
    | Next !(Stream a)

-- | Create a new channel whose counters both start at @startingCellOffset@.
--
-- We expose `startingCellOffset` for debugging correct behavior with overflow.
newChanStarting :: UnagiPrim a=> Int -> IO (InChan a, OutChan a)
{-# INLINE newChanStarting #-}
newChanStarting !startingCellOffset = do
    (sigArr0, eArr0) <- segSource
    stream <- Stream sigArr0 eArr0 <$> newIndexedMVar <*> newIORef NoSegment
    -- `end` is an action: running it twice gives the InChan and OutChan
    -- separate counters and separate stream-head refs, both initialised to
    -- the same offset and the same first segment.
    let end = ChanEnd
                  <$> newCounter startingCellOffset
                  <*> newIORef (StreamHead startingCellOffset stream)
    liftA2 (,) (InChan <$> end) (OutChan <$> end)


-- | Duplicate a chan: the returned @OutChan@ begins empty, but data written to
-- the argument @InChan@ from then on will be available from both the original
-- @OutChan@ and the one returned here, creating a kind of broadcast channel.
dupChan :: InChan a -> IO (OutChan a)
{-# INLINE dupChan #-}
dupChan (InChan (ChanEnd counter streamHead)) = do
    hLoc <- readIORef streamHead
    -- Read the head strictly before the counter. The copied head's offset then
    -- cannot exceed the copied counter value, which is the ChanEnd invariant.
    loadLoadBarrier
    wCount <- readCounter counter
    OutChan <$> (ChanEnd <$> newCounter wCount <*> newIORef hLoc)


-- | Write a value to the channel.
--
-- Runs under 'mask_'. The element is stored first, and then its cell's signal
-- is CAS'd from 'cellEmpty' to 'cellWritten'. If the CAS instead finds
-- 'cellBlocking', a reader has already marked the cell as blocking, and the
-- value is handed over through the segment's 'IndexedMVar'. Writing the first
-- cell of a segment also tries to pre-allocate the next segment.
writeChan :: UnagiPrim a=> InChan a -> a -> IO ()
{-# INLINE writeChan #-}
writeChan (InChan ce) = \a-> mask_ $ do 
    (segIx, (Stream sigArr eArr mvarIndexed next), maybeUpdateStreamHead) <- moveToNextCell ce
    maybeUpdateStreamHead
    -- NOTE!: must write element before signaling with CAS:
    writeElementArray eArr segIx a
    actuallyWas <- casByteArrayInt sigArr segIx cellEmpty cellWritten -- NOTE[1]
    -- try to pre-allocate next segment:
    when (segIx == 0) $ void $
      waitingAdvanceStream next 0
    case actuallyWas of
         -- CAS SUCCEEDED: --
         0 {- Empty -} -> return ()
         -- CAS FAILED: --
         2 {- Blocking -} -> putMVarIx mvarIndexed segIx a

         1 {- Written -} -> error "Nearly Impossible! Expected Blocking"
         _ -> error "Invalid signal seen in writeChan!"
  -- [1] casByteArrayInt provides the write barrier we need here to make sure
  -- GHC maintains our ordering such that the element is written before we
  -- signal its availability with the CAS to sigArr that follows. See [2] in
  -- readSegIxUnmasked.


-- Core of blocking read functions, taking handler and output of moveToNextCell.
--
-- The handler @h@ wraps only the blocking MVar read; callers pass 'id' or an
-- exception handler. Fast path: when 'atomicUnicorn' is @Just magic@ and the
-- element slot holds something other than @magic@, that value is returned
-- without consulting sigArr. Otherwise the reader CASes the cell from Empty to
-- Blocking. If the CAS finds Written, the reader reads the element slot.
-- If it finds Empty or Blocking, the reader blocks on the segment's
-- IndexedMVar.
readSegIxUnmasked :: UnagiPrim a=> (IO a -> IO a) -> (Int, Stream a, IO ()) -> IO a
{-# INLINE readSegIxUnmasked #-}
readSegIxUnmasked h =
  \(segIx, (Stream sigArr eArr mvarIndexed _), maybeUpdateStreamHead)-> do
    maybeUpdateStreamHead
    let readBlocking = inline h $ readMVarIx mvarIndexed segIx    -- NOTE [1]
        readElem = readElementArray eArr segIx
        slowRead = do 
           -- Assume probably blocking (Note: casByteArrayInt is a full barrier)
           actuallyWas <- casByteArrayInt sigArr segIx cellEmpty cellBlocking -- NOTE [2]
           case actuallyWas of
                -- succeeded writing Empty; proceed with blocking
                0 {- Empty -} -> readBlocking
                -- else in the meantime, writer wrote
                1 {- Written -} -> readElem
                -- else in the meantime a dupChan reader read, blocking
                2 {- Blocking -} -> readBlocking
                _ -> error "Invalid signal seen in readSegIxUnmasked!"
    -- If we know writes of this element are atomic, we can determine if the
    -- element has been written, and possibly return it without consulting
    -- sigArr.
    case atomicUnicorn of
         Just magic -> do
            el <- readElem
            if (el /= magic) 
              -- We know `el` was atomically written:
              then return el
              else slowRead
         Nothing -> slowRead
  -- [1] we must use `readMVarIx` here to support `dupChan`. It's also
  -- important that the behavior of readMVarIx be identical to a readMVar on
  -- the same MVar.
  --
  -- [2] casByteArrayInt provides the loadLoadBarrier we need here. See [1] in
  -- writeChan.




-- | Returns immediately with:
--
--  - an @'UT.Element' a@ future, which returns one unique element when it
--  becomes available via 'UT.tryRead'.
--
--  - a blocking @IO@ action that returns the element when it becomes available.
--
-- /Note/: This is a destructive operation. See 'UT.Element' for more details.
--
-- If you're using this function exclusively you might find the implementation
-- in "Control.Concurrent.Chan.Unagi.NoBlocking.Unboxed" is faster.
--
-- /Note re. exceptions/: When an async exception is raised during a @tryReadChan@ 
-- the message that the read would have returned is likely to be lost, just as
-- it would be when raised directly after this function returns.
tryReadChan :: UnagiPrim a=> OutChan a -> IO (UT.Element a, IO a)
{-# INLINE tryReadChan #-}
tryReadChan (OutChan ce) = do -- no masking needed
    -- NOTE: implementation adapted from readSegIxUnmasked:
    (segIx, str@(Stream sigArr eArr mvarIndexed _), maybeUpdateStreamHead) <- moveToNextCell ce
    maybeUpdateStreamHead
    let readElem = readElementArray eArr segIx
        -- Non-blocking check of the cell's signal.
        slowRead = do 
           sig <- P.readByteArray sigArr segIx
           case (sig :: Int) of
                0 {- Empty -} -> return Nothing
                1 {- Written -} -> loadLoadBarrier  >>  Just <$> readElem
                2 {- Blocking -} -> tryReadMVarIx mvarIndexed segIx
                _ -> error "Invalid signal seen in tryReadChan!"
    return ( 
        UT.Element $
          case atomicUnicorn of
             Just magic -> do
                el <- readElem
                if (el /= magic) 
                  then return $ Just el
                  else slowRead
             Nothing -> slowRead

        -- The stream head was already updated above. Pass a no-op so that a
        -- later call of this blocking action cannot write a stale head again.
      , readSegIxUnmasked id (segIx, str, return ())
      )


-- | Read an element from the chan, blocking if the chan is empty.
--
-- /Note re. exceptions/: When an async exception is raised during a @readChan@ 
-- the message that the read would have returned is likely to be lost, even when
-- the read is known to be blocked on an empty queue. If you need to handle
-- this scenario, you can use 'readChanOnException'.
readChan :: UnagiPrim a=> OutChan a -> IO a
{-# INLINE readChan #-}
readChan = \(OutChan ce)-> moveToNextCell ce >>= readSegIxUnmasked id

-- | Like 'readChan' but allows recovery of the queue element which would have
-- been read, in the case that an async exception is raised during the read. To
-- be precise exceptions are raised, and the handler run, only when
-- @readChanOnException@ is blocking.
--
-- The second argument is a handler that takes a blocking IO action returning
-- the element, and performs some recovery action.  When the handler is called,
-- the passed @IO a@ is the only way to access the element.
readChanOnException :: UnagiPrim a=> OutChan a -> (IO a -> IO ()) -> IO a
{-# INLINE readChanOnException #-}
readChanOnException (OutChan ce) h = mask_ $ 
    -- The whole read is masked; the handler is attached only to the blocking
    -- MVar read inside readSegIxUnmasked.
    moveToNextCell ce >>=
      readSegIxUnmasked (\io-> io `onException` (h io))



-- increments counter, finds stream segment of corresponding cell (updating the
-- stream head pointer as needed), and returns the stream segment and relative
-- index of our cell.
--
-- The stream-head update is NOT performed here; it is returned as the third
-- component so that callers can run it at the point they choose.
moveToNextCell :: UnagiPrim a=> ChanEnd a -> IO (Int, Stream a, IO ())
{-# INLINE moveToNextCell #-}
moveToNextCell (ChanEnd counter streamHead) = do
    (StreamHead offset0 str0) <- readIORef streamHead
    ix <- incrCounter 1 counter
    -- How many segments past the head our cell is, and its index in that segment.
    let (segsAway, segIx) = assert ((ix - offset0) >= 0) $ 
                 divMod_sEGMENT_LENGTH $! (ix - offset0)
              -- (ix - offset0) `quotRem` sEGMENT_LENGTH
        -- Follow `next` links n times, allocating segments where missing.
        {-# INLINE go #-}
        go 0 str = return str
        go !n (Stream _ _ _ next) =
            waitingAdvanceStream next (nEW_SEGMENT_WAIT*segIx)
              >>= go (n-1)
    str <- go segsAway str0
    -- We need to return this continuation here for NoBlocking.Unboxed, which
    -- needs to perform this action at different points in the reader and
    -- writer.
    let !maybeUpdateStreamHead = do
          when (segsAway > 0) $ do
            let !offsetN = 
                  offset0 + (segsAway `unsafeShiftL` lOG_SEGMENT_LENGTH) --(segsAway*sEGMENT_LENGTH)
            writeIORef streamHead $ StreamHead offsetN str
          touchIORef streamHead -- NOTE [1]
    return (segIx, str, maybeUpdateStreamHead)
  -- [1] For NoBlocking.Unboxed: this helps ensure that streamHead is not GC'd
  -- until `maybeUpdateStreamHead` is run in calling function. For correctness
  -- of `isActive`.


-- | Thread-safely obtain the segment that follows the one owning
-- @nextSegRef@, allocating and installing it if it does not exist yet.
--
-- While @nextSegRef@ still holds 'NoSegment', we re-read it up to @wait@
-- more times. This gives a thread that is already allocating the next
-- segment a chance to install it before we allocate one ourselves. Once the
-- budget is exhausted we build a fresh segment and race to install it with
-- 'casIORef'. Whichever thread wins the race, every caller returns the
-- 'Stream' that ended up stored in @nextSegRef@.
--
-- The wait budget must be non-negative (checked with 'assert').
waitingAdvanceStream
  :: forall a. (UnagiPrim a)
  => IORef (NextSegment a) -> Int -> IO (Stream a)
waitingAdvanceStream nextSegRef = go where
  -- Monomorphic loop counter; 'a' is the one bound in the signature above.
  go :: Int -> IO (Stream a)
  go !wait = assert (wait >= 0) $ do
    tk <- readForCAS nextSegRef
    case peekTicket tk of
      NoSegment
        -- Spin: give another thread a chance to install the segment first.
        | wait > 0 -> go (wait - 1)
        -- Budget exhausted: create a potential next segment and try to
        -- insert it.
        | otherwise -> do
            potentialStrNext <- uncurry Stream
                                  <$> segSource
                                  <*> newIndexedMVar
                                  <*> newIORef NoSegment
            (_, tkDone) <- casIORef nextSegRef tk (Next potentialStrNext)
            -- If that failed another thread succeeded (no false negatives).
            -- Either way the returned ticket is expected to hold a 'Next'.
            case peekTicket tkDone of
              Next strNext -> return strNext
              _ -> error "Impossible! This should only have been Next segment"
      Next strNext -> return strNext