{-# LANGUAGE BangPatterns , DeriveDataTypeable, CPP #-}
module Control.Concurrent.Chan.Unagi.Internal
#ifdef NOT_optimised
    {-# WARNING "This library is unlikely to perform well on architectures other than i386/x64/aarch64" #-}
#endif
    (sEGMENT_LENGTH
    , InChan(..), OutChan(..), ChanEnd(..), StreamSegment, Cell(..), Stream(..)
    , NextSegment(..), StreamHead(..)
    , newChanStarting, writeChan, readChan, readChanOnException
    , dupChan, tryReadChan
    -- For Unagi.NoBlocking:
    , moveToNextCell, waitingAdvanceStream, newSegmentSource
    -- sanity tests:
    , assertionCanary
    )
    where

-- Internals exposed for testing.

import Control.Concurrent.MVar
import Data.IORef
import Control.Exception
import Control.Monad.Primitive(RealWorld)
import Data.Atomics.Counter.Fat
import Data.Atomics
import qualified Data.Primitive as P
import Control.Monad
import Control.Applicative
import Data.Bits
import Data.Typeable(Typeable)
import GHC.Exts(inline)

import Control.Concurrent.Chan.Unagi.Constants
import qualified Control.Concurrent.Chan.Unagi.NoBlocking.Types as UT
import Utilities(touchIORef)

import Prelude

-- | The write end of a channel created with 'newChan'.
--
-- Besides its 'ChanEnd', an @InChan@ carries a 'Ticket' for the 'Empty' cell
-- value that fresh segments are filled with; 'writeChan' uses it as the
-- expected value of its compare-and-swap.
data InChan a = InChan !(Ticket (Cell a)) !(ChanEnd (Cell a))
    deriving Typeable

-- | Two @InChan@s are equal when they share the same stream-head 'IORef';
-- the saved ticket, segment source and counter are not compared.
instance Eq (InChan a) where
    (InChan _ (ChanEnd _ _ headA)) == (InChan _ (ChanEnd _ _ headB))
        = headA == headB

-- | The read end of a channel created with 'newChan'.
--
-- Equality is inherited from 'ChanEnd': two @OutChan@s are equal when they
-- share the same stream-head 'IORef'.
newtype OutChan a = OutChan (ChanEnd (Cell a))
    deriving (Eq,Typeable)


-- TODO POTENTIAL CPP FLAGS (or functions)
--   - Strict element (or lazy? maybe also expose a writeChan' when relevant?)
--   - sEGMENT_LENGTH
--   - reads that clear the element immediately (or export as a special function?)

-- InChan & OutChan are mostly identical, sharing a stream, but with
-- independent counters.
--
--   NOTE: we parameterize this, and its child types, by `cell_a` (instantiated
--   to `Cell a` in this module) instead of `a` so that we can use
--   `moveToNextCell`, `waitingAdvanceStream`, and `newSegmentSource` and all
--   the types below in Unagi.NoBlocking, which uses a different type `Cell a`;
--   Sorry!
data ChanEnd cell_a =
           -- an efficient producer of segments of length sEGMENT_LENGTH:
    ChanEnd !(SegSource cell_a)
            -- Both Chan ends must start with the same counter value.
            !AtomicCounter
            -- the stream head; this must never point to a segment whose offset
            -- is greater than the counter value
            !(IORef (StreamHead cell_a))
    deriving Typeable

-- | Equality is identity of the stream-head 'IORef'; the segment source and
-- the counter are not compared.
instance Eq (ChanEnd a) where
    (ChanEnd _ _ headA) == (ChanEnd _ _ headB) = headA == headB


-- | The current head of a stream, as seen from one chan end.
data StreamHead cell_a =
    StreamHead
        -- counter value corresponding to index 0 of the segment below
        !Int
        -- the segment (and the link to its successors)
        !(Stream cell_a)

--TODO later see if we get a benefit from the small array primops in 7.10,
--     which omit card-marking overhead and might have faster clone.
-- | A single segment: a mutable boxed array of cells.
type StreamSegment cell_a = P.MutableArray RealWorld cell_a

-- | The contents of one slot of a 'StreamSegment'.
--
-- Possible values and transitions:
--
--   * During a read: 'Empty' -> 'Blocking'; or already 'Written'; or already
--     'Blocking' (only when 'dupChan' is used).
--
--   * During a write: 'Empty' -> 'Written'; or already 'Blocking'.
data Cell a
    = Empty
    | Written a
    | Blocking !(MVar a)


-- NOTE In general we'll have two segments allocated at any given time in
-- addition to the segment template, so in the worst case, when the program
-- exits we will have allocated ~3 more segments' worth of memory than was
-- actually required.

-- | One segment of the stream together with a mutable link to the next one.
data Stream cell_a =
    Stream
        !(StreamSegment cell_a)
        -- The next segment in the stream; new segments are allocated and
        -- put here as we go, with threads cooperating to allocate new
        -- segments:
        !(IORef (NextSegment cell_a))

-- | The link to the following segment; it starts as 'NoSegment' and is filled
-- in on demand (see 'waitingAdvanceStream').
data NextSegment cell_a
    = NoSegment
    | Next !(Stream cell_a)

-- | Create a new channel whose counters start at @startingCellOffset@.
-- We expose @startingCellOffset@ for debugging correct behavior with
-- overflow.
--
-- The @end@ action below is run once per end, so the 'InChan' and 'OutChan'
-- each get their own 'AtomicCounter' and stream-head 'IORef'. Both are
-- initialised to @startingCellOffset@ and point at the same first segment.
-- The segment source is shared.
newChanStarting :: Int -> IO (InChan a, OutChan a)
{-# INLINE newChanStarting #-}
newChanStarting !startingCellOffset = do
    segSource <- newSegmentSource Empty
    firstSeg <- segSource
    -- collect a ticket to save for writer CAS; every cell of every segment
    -- from segSource starts out as this same Empty value.
    savedEmptyTkt <- readArrayElem firstSeg 0
    stream <- Stream firstSeg <$> newIORef NoSegment
    let end = ChanEnd segSource
                  <$> newCounter startingCellOffset
                  <*> newIORef (StreamHead startingCellOffset stream)
    liftA2 (,) (InChan savedEmptyTkt <$> end) (OutChan <$> end)

-- | Duplicate a chan: the returned @OutChan@ begins empty, but data written to
-- the argument @InChan@ from then on will be available from both the original
-- @OutChan@ and the one returned here, creating a kind of broadcast channel.
dupChan :: InChan a -> IO (OutChan a)
{-# INLINE dupChan #-}
dupChan (InChan _ (ChanEnd segSource counter streamHead)) = do
    hLoc <- readIORef streamHead
    loadLoadBarrier  -- NOTE [1]
    wCount <- readCounter counter

    -- The new reader starts at the writers' current position, with its own
    -- counter and its own stream-head reference (sharing the segment source).
    counter' <- newCounter wCount
    streamHead' <- newIORef hLoc
    return $ OutChan (ChanEnd segSource counter' streamHead')
  -- [1] We must read the streamHead before inspecting the counter; otherwise,
  -- as writers write, the stream head pointer may advance past the cell
  -- indicated by wCount. For the corresponding store-store barrier see [*] in
  -- moveToNextCell

-- | Write a value to the channel.
--
-- The body runs under 'mask_'. Asynchronous exceptions are therefore deferred
-- (except at interruptible operations) between claiming a cell in
-- 'moveToNextCell' and filling it.
writeChan :: InChan a -> a -> IO ()
{-# INLINE writeChan #-}
writeChan (InChan savedEmptyTkt ce@(ChanEnd segSource _ _)) = \a-> mask_ $ do
    (segIx, (Stream seg next), maybeUpdateStreamHead) <- moveToNextCell ce
    maybeUpdateStreamHead
    (success,nonEmptyTkt) <- casArrayElem seg segIx savedEmptyTkt (Written a)
    -- try to pre-allocate next segment; NOTE [1]
    when (segIx == 0) $ void $
      waitingAdvanceStream next segSource 0
    -- The CAS against the saved Empty ticket failed, so the cell no longer
    -- holds Empty. Normally that means a reader parked an MVar here; hand the
    -- value over through it.
    unless success $
        case peekTicket nonEmptyTkt of
             Blocking v -> putMVar v a
             Empty      -> error "Stored Empty Ticket went stale!" -- NOTE [2]
             Written _  -> error "Nearly Impossible! Expected Blocking"
  -- [1] the writer assigned the first cell of a new segment (segIx == 0) is
  -- tasked (somewhat arbitrarily) with trying to pre-allocate the *next*
  -- segment hopefully ahead of any readers or writers who might need it. This
  -- will race with any reader *or* writer that tries to read the next segment
  -- and finds it's empty (see `waitingAdvanceStream`); when this wins
  -- (hopefully the vast majority of the time) we avoid a throughput hit.
  --
  -- [2] this assumes that the compiler is statically-allocating Empty, sharing
  -- the constructor among all uses, and that it never moves it between
  -- checking the pointer stored in the array and checking the pointer in the
  -- cached Any Empty value. If this is incorrect then the Ticket approach to
  -- CAS is equally incorrect (though maybe less likely to fail).


-- We would like our queue to behave like Chan in that an async exception
-- raised in a reader known to be blocked or about to block on an empty queue
-- never results in a lost message; this matches our simple intuition about the
-- mechanics of a queue: if you're last in line for a cupcake and have to leave
-- you wouldn't expect the cake that would have gone to you to disappear.
--
-- But there are other systems that a busy cake shop might use that you could
-- easily imagine resulting in a lost cake, and that's a different question
-- from whether cakes are given to well-behaved customers in the order they
-- came out of the oven, or whether a customer leaving at the wrong moment
-- might cause the cake shop to burn down...
-- | Read the cell identified by a 'moveToNextCell' result, blocking on an
-- 'MVar' if nothing has been written to it yet.
--
-- The tuple's stream-head update action is run first. The first argument
-- wraps every blocking 'readMVar' this function performs, and only those. This
-- lets 'readChanOnException' attach a handler to the blocking part. Callers
-- needing no handler pass 'id'.
readSegIxUnmasked :: (IO a -> IO a) -> (Int, Stream (Cell a), IO ()) -> IO a
{-# INLINE readSegIxUnmasked #-}
readSegIxUnmasked h =
  \(segIx, (Stream seg _), maybeUpdateStreamHead)-> do
    maybeUpdateStreamHead
    cellTkt <- readArrayElem seg segIx
    case peekTicket cellTkt of
         Written a -> return a
         Empty -> do
            -- Nothing written yet: try to park an MVar in the cell for the
            -- writer to fill, then wait on it.
            v <- newEmptyMVar
            (success,elseWrittenCell) <- casArrayElem seg segIx cellTkt (Blocking v)
            if success
              then readBlocking v
              else case peekTicket elseWrittenCell of
                        -- In the meantime a writer has written. Good!
                        Written a -> return a
                        -- ...or a dupChan reader initiated blocking:
                        Blocking v2 -> readBlocking v2
                        _ -> error "Impossible! Expecting Written or Blocking"
         Blocking v -> readBlocking v
  -- N.B. must use `readMVar` here to support `dupChan`: it leaves the value in
  -- the MVar, so a reader of the same cell through a duplicated OutChan can
  -- also receive it.
  where readBlocking v = inline h $ readMVar v


-- | Returns immediately with:
--
--  - an @'UT.Element' a@ future, which returns one unique element when it
--  becomes available via 'UT.tryRead'.
--
--  - a blocking @IO@ action that returns the element when it becomes available.
--
-- /Note/: This is a destructive operation. See 'UT.Element' for more details.
--
-- If you're using this function exclusively you might find the implementation
-- in "Control.Concurrent.Chan.Unagi.NoBlocking" is faster.
--
-- /Note re. exceptions/: When an async exception is raised during a @tryReadChan@
-- the message that the read would have returned is likely to be lost, just as
-- it would be when raised directly after this function returns.
tryReadChan :: OutChan a -> IO (UT.Element a, IO a)
{-# INLINE tryReadChan #-}
tryReadChan (OutChan ce) = do -- no masking needed
    (segIx, str@(Stream seg _), maybeUpdateStreamHead) <- moveToNextCell ce
    -- Advance the stream head exactly once, here. The blocking action returned
    -- below gets a no-op in place of this update. That action may run much
    -- later, after this end has moved on; replaying the update then would
    -- redundantly rewrite the head and could move it back to an older segment.
    maybeUpdateStreamHead
    return (
        UT.Element $ do
          cell <- P.readArray seg segIx
          case cell of
               Written a  -> return $ Just a
               Empty      -> return Nothing
               Blocking v -> tryReadMVar v

      , readSegIxUnmasked id (segIx, str, return ())
      )


-- | Read an element from the chan, blocking if the chan is empty.
--
-- /Note re. exceptions/: When an async exception is raised during a @readChan@
-- the message that the read would have returned is likely to be lost, even when
-- the read is known to be blocked on an empty queue. If you need to handle
-- this scenario, you can use 'readChanOnException'.
readChan :: OutChan a -> IO a
{-# INLINE readChan #-}
-- Claim the next cell, then read it with no handler around the blocking part.
readChan = \(OutChan ce)-> moveToNextCell ce >>= readSegIxUnmasked id

-- | Like 'readChan' but allows recovery of the queue element which would have
-- been read, in the case that an async exception is raised during the read. To
-- be precise exceptions are raised, and the handler run, only when
-- @readChanOnException@ is blocking.
--
-- The second argument is a handler that takes a blocking IO action returning
-- the element, and performs some recovery action.  When the handler is called,
-- the passed @IO a@ is the only way to access the element.
readChanOnException :: OutChan a -> (IO a -> IO ()) -> IO a
{-# INLINE readChanOnException #-}
readChanOnException (OutChan ce) h = mask_ (
    -- Under mask_, asynchronous exceptions can only arrive at interruptible
    -- points such as the blocking readMVar. 'readSegIxUnmasked' wraps exactly
    -- that readMVar with our handler.
    moveToNextCell ce >>=
      readSegIxUnmasked (\io-> io `onException` (h io)) )


------------ NOTE: ALL CODE BELOW IS RE-USED IN Unagi.NoBlocking --------------


-- | Claim the next cell for this chan end.
--
-- Increments the end's counter and finds the stream segment holding the
-- corresponding cell, waiting for or allocating segments as needed. Returns:
--
--   * the index of the cell within that segment,
--   * the segment itself, and
--   * an action that, if the segment is not the current head, writes a new
--     'StreamHead' pointing at it, and then touches the head 'IORef'.
--
-- The head update is returned rather than run so that callers (notably
-- Unagi.NoBlocking) control when it happens; see NOTE [5].
moveToNextCell :: ChanEnd cell_a -> IO (Int, Stream cell_a, IO ())
{-# INLINE moveToNextCell #-}
moveToNextCell (ChanEnd segSource counter streamHead) = do
    (StreamHead offset0 str0) <- readIORef streamHead
    -- NOTE [3/4]
    ix <- incrCounter 1 counter  -- [*]
    let (segsAway, segIx) = assert ((ix - offset0) >= 0) $
                 divMod_sEGMENT_LENGTH $! (ix - offset0)
              -- (ix - offset0) `quotRem` sEGMENT_LENGTH
        -- walk (or extend) the stream `segsAway` segments past the head
        {-# INLINE go #-}
        go 0 str = return str
        go !n (Stream _ next) =
            waitingAdvanceStream next segSource (nEW_SEGMENT_WAIT*segIx) -- NOTE [1]
              >>= go (n-1)
    str <- go segsAway str0
    -- In Unagi.NoBlocking we need to control when this is run (see also [5]):
    let !maybeUpdateStreamHead = do
          when (segsAway > 0) $ do
            let !offsetN =
                  offset0 + (segsAway `unsafeShiftL` lOG_SEGMENT_LENGTH) --(segsAway*sEGMENT_LENGTH)
            writeIORef streamHead $ StreamHead offsetN str
          touchIORef streamHead -- NOTE [5]
    return (segIx,str, maybeUpdateStreamHead)
  -- [1] All readers or writers needing to work with a not-yet-created segment
  -- race to create it, but those past index 0 have progressively longer
  -- waits: the thread at segIx polls nEW_SEGMENT_WAIT*segIx times before
  -- allocating. The intent is that this is more than enough time for
  -- writer/reader 0 to add the new segment (if it's not descheduled).
  -- NOTE(review): earlier versions of this note said the unit wait was 20
  -- readIORefs; confirm against nEW_SEGMENT_WAIT in the Constants module.
  --
  -- [2] advancing the stream head pointer on segIx == sEGMENT_LENGTH - 1 would
  -- be more correct, but this is simpler here. This may move the head pointer
  -- BACKWARDS if the thread was descheduled, but that's not a correctness
  -- issue.
  --
  -- [3] There is a theoretical race condition here: thread reads head and is
  -- descheduled, meanwhile other readers/writers increment counter one full
  -- lap; when we increment we think we've found our cell in what is actually a
  -- very old segment. However in this scenario all addressable memory will
  -- have been consumed just by the array pointers which haven't been able to
  -- be GC'd. So I don't think this is something to worry about.
  --
  -- [4] The readIORef of the stream head above must not be reordered after
  -- our incrCounter; otherwise the head could already have advanced past our
  -- cell. But fetchAddByteArrayInt is meant to be a full barrier (for
  -- compiler and processor) across architectures, so no explicit barrier is
  -- needed here.
  --
  -- [[5]] FOR Unagi.NoBlocking: This helps ensure that our (possibly last) use
  -- of streamHead occurs after our (possibly last) write, for correctness of
  -- 'isActive'.  See NOTE 1 of 'Unagi.NoBlocking.writeChan'


-- | Thread-safely obtain the segment linked from @nextSegRef@, creating it if
-- necessary.
--
-- While the link is 'NoSegment', the 'IORef' is re-read up to @wait@ more
-- times, giving another thread the chance to install a segment. After that,
-- a fresh segment is taken from @segSource@ and installed with a CAS. The
-- segment actually installed is returned whether or not our CAS won, so all
-- racing threads agree on the same next segment.
waitingAdvanceStream :: IORef (NextSegment cell_a) -> SegSource cell_a
                     -> Int -> IO (Stream cell_a)
{-# NOINLINE waitingAdvanceStream #-}
waitingAdvanceStream nextSegRef segSource = go where
  go !wait = assert (wait >= 0) $ do
    tk <- readForCAS nextSegRef
    case peekTicket tk of
         NoSegment
           | wait > 0 -> go (wait - 1)
             -- Create a potential next segment and try to insert it:
           | otherwise -> do
               potentialStrNext <- Stream <$> segSource
                                          <*> newIORef NoSegment
               (_,tkDone) <- casIORef nextSegRef tk (Next potentialStrNext)
               -- If that failed another thread succeeded (no false negatives);
               -- either way tkDone now holds the installed segment.
               case peekTicket tkDone of
                 Next strNext -> return strNext
                 _ -> error "Impossible! This should only have been Next segment"
         Next strNext -> return strNext


-- copying a template array with cloneMutableArray is much faster than creating
-- a new one; in fact it seems we need this in order to scale, since as cores
-- increase we don't have enough "runway" and can't allocate fast enough:
type SegSource cell_a = IO (StreamSegment cell_a)

-- | Build a 'SegSource' from the given empty-cell value.
--
-- A template array of length 'sEGMENT_LENGTH' is allocated once here, with
-- every element set to the evaluated @cell_empty@. Each run of the returned
-- action yields a fresh clone of that template.
newSegmentSource :: cell_a -> IO (SegSource cell_a)
newSegmentSource cell_empty = do
    -- NOTE: evaluate Empty seems to be required here in order to not raise
    -- "Stored Empty Ticket went stale!"  exception when in GHCi.
    arr <- evaluate cell_empty >>= P.newArray sEGMENT_LENGTH
    return (P.cloneMutableArray arr 0 sEGMENT_LENGTH)

-- ----------
-- CELLS AND GC:
--
--   Each cell in a segment is assigned at most one reader and one writer
--
--   When all readers disappear and writers continue we'll have at most one
--   segment-worth of garbage that can't be collected at a time; when writers
--   advance the head segment pointer, the previous may be GC'd.
--
--   Readers blocked indefinitely should eventually raise a
--   BlockedIndefinitelyOnMVar.
-- ----------


-- | Returns 'True' iff 'assert' is active in this module, i.e. assertions were
-- not compiled out (for instance by @-fignore-asserts@, which @-O@ implies).
--
-- This could go anywhere, and lets us ensure that assertions are turned on
-- when running test suite.
assertionCanary :: IO Bool
assertionCanary = do
    assertionsWorking <- try $ assert False $ return ()
    return $
      case assertionsWorking of
           Left (AssertionFailed _) -> True
           _                        -> False