{-# LANGUAGE BangPatterns #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE CPP #-}
{-# LANGUAGE MultiWayIf #-}

{-|
Module      : Z.IO.UV.Manager
Description : IO manager based on libuv
Copyright   : (c) Dong Han, 2017-2018
License     : BSD
Maintainer  : winterland1989@gmail.com
Stability   : experimental
Portability : non-portable

This module provide IO manager which bridge libuv's async interface with ghc's light weight thread.

The main procedures for doing event IO is:

  * Allocate uv_handle in C side, get its slot number with 'getUVSlot', or allocate uv_request with 'withUVRequest'.
  * Prepare you IO buffer with 'pokeBufferTable'(both read and write).
  * Call C side IO functions with predefined callbacks.
  * Block your thread with the 'MVar' from 'getBlockMVar'.
  * Read the result by 'takeMVar' on that 'MVar', it will be the value pushed on C side.
  * Slot is freed on C side, either via callbacks, or when handle is closed.

Usually slots are cache in the IO device so that you don't have to allocate new one before each IO operation.
Check "System.IO.Socket.TCP" as an example.

-}

module Z.IO.UV.Manager
  ( UVManager
  , getUVManager
  , getBlockMVar
  , peekBufferTable
  , pokeBufferTable
  , withUVManager
  , withUVManager'
  , getUVSlot
  -- * request based async function helper
  , withUVRequest
  , withUVRequest_
  , withUVRequest'
  , withUVRequestEx
  -- * uv_stream abstraction
  , initUVStream
  , UVStream(..)
  -- * concurrent helpers
  , forkBa
  ) where

import           Control.Concurrent
import           Control.Monad
import           Control.Monad.IO.Class
import           Data.IORef
import           Data.Bits (shiftL)
import           Data.Word
import           Foreign.Ptr
import           Foreign.Storable
import           GHC.Conc.Sync            (labelThread)
import           System.IO.Unsafe
import           Z.Data.Array
import           Z.Data.PrimRef.PrimIORef
import           Z.IO.Buffered
import           Z.IO.Exception
import           Z.IO.UV.Errno
import           Z.IO.Resource
import           Z.IO.UV.FFI

#define IDLE_LIMIT 20

--------------------------------------------------------------------------------

data UVManager = UVManager
    { UVManager -> IORef (UnliftedArray (MVar Int))
uvmBlockTable :: {-# UNPACK #-} !(IORef (UnliftedArray (MVar Int))) -- a array to store threads blocked on async IO.

    , UVManager -> Ptr UVLoop
uvmLoop       :: {-# UNPACK #-} !(Ptr UVLoop)        -- the uv loop refrerence

    , UVManager -> Ptr UVLoopData
uvmLoopData   :: {-# UNPACK #-} !(Ptr UVLoopData)    -- cached pointer to uv_loop_t's data field

    , UVManager -> MVar Bool
uvmRunning    :: {-# UNPACK #-} !(MVar Bool)     -- only uv manager thread will modify this value.
                                                        -- 'True' druing uv_run and 'False' otherwise.
                                                        --
                                                        -- unlike epoll/ONESHOT, uv loop are NOT thread safe,
                                                        -- we have to wake up the loop before mutating uv_loop's
                                                        -- state.
    , UVManager -> Int
uvmCap        ::  {-# UNPACK #-} !Int                -- the capability uv manager run on.
    }

instance Show UVManager where
    show :: UVManager -> String
show UVManager
uvm = String
"UVManager on capability " String -> ShowS
forall a. [a] -> [a] -> [a]
++ Int -> String
forall a. Show a => a -> String
show (UVManager -> Int
uvmCap UVManager
uvm)

instance Eq UVManager where
    UVManager
uvm == :: UVManager -> UVManager -> Bool
== UVManager
uvm' =
        UVManager -> Int
uvmCap UVManager
uvm Int -> Int -> Bool
forall a. Eq a => a -> a -> Bool
== UVManager -> Int
uvmCap UVManager
uvm'

uvManagerArray :: IORef (Array UVManager)
{-# NOINLINE uvManagerArray #-}
uvManagerArray :: IORef (Array UVManager)
uvManagerArray = IO (IORef (Array UVManager)) -> IORef (Array UVManager)
forall a. IO a -> a
unsafePerformIO (IO (IORef (Array UVManager)) -> IORef (Array UVManager))
-> IO (IORef (Array UVManager)) -> IORef (Array UVManager)
forall a b. (a -> b) -> a -> b
$ do
    Int
numCaps <- IO Int
getNumCapabilities
    MutableArray RealWorld UVManager
uvmArray <- Int -> IO (MArr Array RealWorld UVManager)
forall (arr :: * -> *) a (m :: * -> *) s.
(Arr arr a, PrimMonad m, PrimState m ~ s) =>
Int -> m (MArr arr s a)
newArr Int
numCaps
    QSemN
s <- Int -> IO QSemN
newQSemN Int
0
    [Int] -> (Int -> IO ThreadId) -> IO ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
t a -> (a -> m b) -> m ()
forM_ [Int
0..Int
numCapsInt -> Int -> Int
forall a. Num a => a -> a -> a
-Int
1] ((Int -> IO ThreadId) -> IO ()) -> (Int -> IO ThreadId) -> IO ()
forall a b. (a -> b) -> a -> b
$ \ Int
i -> do
        -- fork uv manager thread
        Int -> IO () -> IO ThreadId
forkOn Int
i (IO () -> IO ThreadId)
-> ((UVManager -> IO ()) -> IO ())
-> (UVManager -> IO ())
-> IO ThreadId
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Resource UVManager -> (UVManager -> IO ()) -> IO ()
forall (m :: * -> *) a b.
(MonadMask m, MonadIO m, HasCallStack) =>
Resource a -> (a -> m b) -> m b
withResource (HasCallStack => Int -> Int -> Resource UVManager
Int -> Int -> Resource UVManager
initUVManager Int
INIT_LOOP_SIZE Int
i) ((UVManager -> IO ()) -> IO ThreadId)
-> (UVManager -> IO ()) -> IO ThreadId
forall a b. (a -> b) -> a -> b
$ \ UVManager
m -> do
            IO ThreadId
myThreadId IO ThreadId -> (ThreadId -> IO ()) -> IO ()
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= (ThreadId -> String -> IO ()
`labelThread` (String
"uv manager on " String -> ShowS
forall a. [a] -> [a] -> [a]
++ Int -> String
forall a. Show a => a -> String
show Int
i))
            MArr Array RealWorld UVManager -> Int -> UVManager -> IO ()
forall (arr :: * -> *) a (m :: * -> *) s.
(Arr arr a, PrimMonad m, PrimState m ~ s) =>
MArr arr s a -> Int -> a -> m ()
writeArr MArr Array RealWorld UVManager
MutableArray RealWorld UVManager
uvmArray Int
i UVManager
m
            QSemN -> Int -> IO ()
signalQSemN QSemN
s Int
1
            HasCallStack => UVManager -> IO ()
UVManager -> IO ()
startUVManager UVManager
m
    QSemN -> Int -> IO ()
waitQSemN QSemN
s Int
numCaps
    Array UVManager
iuvmArray <- MArr Array RealWorld UVManager -> IO (Array UVManager)
forall (arr :: * -> *) a (m :: * -> *) s.
(Arr arr a, PrimMonad m, PrimState m ~ s) =>
MArr arr s a -> m (arr a)
unsafeFreezeArr MArr Array RealWorld UVManager
MutableArray RealWorld UVManager
uvmArray
    Array UVManager -> IO (IORef (Array UVManager))
forall a. a -> IO (IORef a)
newIORef Array UVManager
iuvmArray

-- | Get 'UVManager' runing on the same capability.
--
getUVManager :: IO UVManager
{-# INLINABLE getUVManager #-}
getUVManager :: IO UVManager
getUVManager = do
    (Int
cap, Bool
_) <- ThreadId -> IO (Int, Bool)
threadCapability (ThreadId -> IO (Int, Bool)) -> IO ThreadId -> IO (Int, Bool)
forall (m :: * -> *) a b. Monad m => (a -> m b) -> m a -> m b
=<< IO ThreadId
myThreadId
    Array UVManager
uvmArray <- IORef (Array UVManager) -> IO (Array UVManager)
forall a. IORef a -> IO a
readIORef IORef (Array UVManager)
uvManagerArray
    Array UVManager -> Int -> IO UVManager
forall (arr :: * -> *) a (m :: * -> *).
(Arr arr a, Monad m) =>
arr a -> Int -> m a
indexArrM Array UVManager
uvmArray (Int
cap Int -> Int -> Int
forall a. Integral a => a -> a -> a
`rem` Array UVManager -> Int
forall (arr :: * -> *) a. Arr arr a => arr a -> Int
sizeofArr Array UVManager
uvmArray)

-- | Get 'MVar' from blocking table with given slot.
--
getBlockMVar :: UVManager -> UVSlot -> IO (MVar Int)
{-# INLINABLE getBlockMVar #-}
getBlockMVar :: UVManager -> Int -> IO (MVar Int)
getBlockMVar UVManager
uvm Int
slot = do
    UnliftedArray (MVar Int)
blockTable <- IORef (UnliftedArray (MVar Int)) -> IO (UnliftedArray (MVar Int))
forall a. IORef a -> IO a
readIORef (UVManager -> IORef (UnliftedArray (MVar Int))
uvmBlockTable UVManager
uvm)
    UnliftedArray (MVar Int) -> Int -> IO (MVar Int)
forall (arr :: * -> *) a (m :: * -> *).
(Arr arr a, Monad m) =>
arr a -> Int -> m a
indexArrM UnliftedArray (MVar Int)
blockTable Int
slot

-- | Poke a prepared buffer and size into loop data under given slot.
--
-- NOTE, this action is not protected with 'withUVManager' for effcient reason, you should merge this action
-- with other uv action and put them together inside a 'withUVManager' or 'withUVManager\''. for example:
--
-- @
--    ...
--    withUVManager' uvm $ do
--        pokeBufferTable uvm slot buf len
--        uvReadStart handle
--    ...
-- @
--
pokeBufferTable :: UVManager    -- ^ uv manager
                -> UVSlot       -- ^ uv slot
                -> Ptr Word8    -- ^ buffer pointer
                -> Int          -- ^ buffer length
                -> IO ()
{-# INLINABLE pokeBufferTable #-}
pokeBufferTable :: UVManager -> Int -> Ptr Word8 -> Int -> IO ()
pokeBufferTable UVManager
uvm Int
slot Ptr Word8
buf Int
bufSiz = do
    (Ptr (Ptr Word8)
bufTable, Ptr CSsize
bufSizTable) <- Ptr UVLoopData -> IO (Ptr (Ptr Word8), Ptr CSsize)
peekUVBufferTable (UVManager -> Ptr UVLoopData
uvmLoopData UVManager
uvm)
    Ptr (Ptr Word8) -> Int -> Ptr Word8 -> IO ()
forall a. Storable a => Ptr a -> Int -> a -> IO ()
pokeElemOff Ptr (Ptr Word8)
bufTable Int
slot Ptr Word8
buf
    Ptr CSsize -> Int -> CSsize -> IO ()
forall a. Storable a => Ptr a -> Int -> a -> IO ()
pokeElemOff Ptr CSsize
bufSizTable Int
slot (Int -> CSsize
forall a b. (Integral a, Num b) => a -> b
fromIntegral Int
bufSiz)

peekBufferTable :: UVManager -> UVSlot -> IO Int
{-# INLINABLE peekBufferTable #-}
peekBufferTable :: UVManager -> Int -> IO Int
peekBufferTable UVManager
uvm Int
slot = do
    (Ptr (Ptr Word8)
_, Ptr CSsize
bufSizTable) <- Ptr UVLoopData -> IO (Ptr (Ptr Word8), Ptr CSsize)
peekUVBufferTable (UVManager -> Ptr UVLoopData
uvmLoopData UVManager
uvm)
    CSsize -> Int
forall a b. (Integral a, Num b) => a -> b
fromIntegral (CSsize -> Int) -> IO CSsize -> IO Int
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Ptr CSsize -> Int -> IO CSsize
forall a. Storable a => Ptr a -> Int -> IO a
peekElemOff Ptr CSsize
bufSizTable Int
slot

initUVManager :: HasCallStack => Int -> Int -> Resource UVManager
initUVManager :: Int -> Int -> Resource UVManager
initUVManager Int
siz Int
cap = do
    Ptr UVLoop
loop  <- IO (Ptr UVLoop) -> (Ptr UVLoop -> IO ()) -> Resource (Ptr UVLoop)
forall a. IO a -> (a -> IO ()) -> Resource a
initResource
                (IO (Ptr UVLoop) -> IO (Ptr UVLoop)
forall a. HasCallStack => IO (Ptr a) -> IO (Ptr a)
throwOOMIfNull (IO (Ptr UVLoop) -> IO (Ptr UVLoop))
-> IO (Ptr UVLoop) -> IO (Ptr UVLoop)
forall a b. (a -> b) -> a -> b
$ Int -> IO (Ptr UVLoop)
hs_uv_loop_init (Int -> Int
forall a b. (Integral a, Num b) => a -> b
fromIntegral Int
siz))
                Ptr UVLoop -> IO ()
hs_uv_loop_close
    IO UVManager -> Resource UVManager
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO UVManager -> Resource UVManager)
-> IO UVManager -> Resource UVManager
forall a b. (a -> b) -> a -> b
$ do
        MutableUnliftedArray RealWorld (MVar Int)
mblockTable <- Int -> IO (MArr UnliftedArray RealWorld (MVar Int))
forall (arr :: * -> *) a (m :: * -> *) s.
(Arr arr a, PrimMonad m, PrimState m ~ s) =>
Int -> m (MArr arr s a)
newArr Int
siz
        [Int] -> (Int -> IO ()) -> IO ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
t a -> (a -> m b) -> m ()
forM_ [Int
0..Int
sizInt -> Int -> Int
forall a. Num a => a -> a -> a
-Int
1] ((Int -> IO ()) -> IO ()) -> (Int -> IO ()) -> IO ()
forall a b. (a -> b) -> a -> b
$ \ Int
i -> MArr UnliftedArray RealWorld (MVar Int) -> Int -> MVar Int -> IO ()
forall (arr :: * -> *) a (m :: * -> *) s.
(Arr arr a, PrimMonad m, PrimState m ~ s) =>
MArr arr s a -> Int -> a -> m ()
writeArr MArr UnliftedArray RealWorld (MVar Int)
MutableUnliftedArray RealWorld (MVar Int)
mblockTable Int
i (MVar Int -> IO ()) -> IO (MVar Int) -> IO ()
forall (m :: * -> *) a b. Monad m => (a -> m b) -> m a -> m b
=<< IO (MVar Int)
forall a. IO (MVar a)
newEmptyMVar
        UnliftedArray (MVar Int)
blockTable <- MArr UnliftedArray RealWorld (MVar Int)
-> IO (UnliftedArray (MVar Int))
forall (arr :: * -> *) a (m :: * -> *) s.
(Arr arr a, PrimMonad m, PrimState m ~ s) =>
MArr arr s a -> m (arr a)
unsafeFreezeArr MArr UnliftedArray RealWorld (MVar Int)
MutableUnliftedArray RealWorld (MVar Int)
mblockTable
        IORef (UnliftedArray (MVar Int))
blockTableRef <- UnliftedArray (MVar Int) -> IO (IORef (UnliftedArray (MVar Int)))
forall a. a -> IO (IORef a)
newIORef UnliftedArray (MVar Int)
blockTable
        Ptr UVLoopData
loopData <- Ptr UVLoop -> IO (Ptr UVLoopData)
peekUVLoopData Ptr UVLoop
loop
        MVar Bool
runningLock <- Bool -> IO (MVar Bool)
forall a. a -> IO (MVar a)
newMVar Bool
False
        UVManager -> IO UVManager
forall (m :: * -> *) a. Monad m => a -> m a
return (IORef (UnliftedArray (MVar Int))
-> Ptr UVLoop -> Ptr UVLoopData -> MVar Bool -> Int -> UVManager
UVManager IORef (UnliftedArray (MVar Int))
blockTableRef Ptr UVLoop
loop Ptr UVLoopData
loopData MVar Bool
runningLock Int
cap)

-- | Lock an uv mananger, so that we can safely mutate its uv_loop's state.
--
-- libuv is not thread safe, use this function to perform any action which will mutate uv_loop's state.
--
withUVManager :: HasCallStack => UVManager -> (Ptr UVLoop -> IO a) -> IO a
withUVManager :: UVManager -> (Ptr UVLoop -> IO a) -> IO a
withUVManager (UVManager IORef (UnliftedArray (MVar Int))
_ Ptr UVLoop
loop Ptr UVLoopData
loopData MVar Bool
runningLock Int
_) Ptr UVLoop -> IO a
f = IO a
go
  where
    go :: IO a
go = do
        Maybe a
r <- MVar Bool -> (Bool -> IO (Maybe a)) -> IO (Maybe a)
forall a b. MVar a -> (a -> IO b) -> IO b
withMVar MVar Bool
runningLock ((Bool -> IO (Maybe a)) -> IO (Maybe a))
-> (Bool -> IO (Maybe a)) -> IO (Maybe a)
forall a b. (a -> b) -> a -> b
$ \ Bool
running -> do
            if Bool
running
            then do
                -- if uv_run is running, it will stop
                -- if uv_run is not running, next running won't block
                IO CInt -> IO ()
forall a. (HasCallStack, Integral a) => IO a -> IO ()
throwUVIfMinus_ (Ptr UVLoopData -> IO CInt
hs_uv_wake_up_async Ptr UVLoopData
loopData)
                Maybe a -> IO (Maybe a)
forall (m :: * -> *) a. Monad m => a -> m a
return Maybe a
forall a. Maybe a
Nothing
            else do
                a
r <- Ptr UVLoop -> IO a
f Ptr UVLoop
loop
                Maybe a -> IO (Maybe a)
forall (m :: * -> *) a. Monad m => a -> m a
return (a -> Maybe a
forall a. a -> Maybe a
Just a
r)
        case Maybe a
r of
            Just a
r' -> a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return a
r'
            Maybe a
_       -> IO ()
yield IO () -> IO a -> IO a
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> IO a
go -- we yield here, because uv_run is probably not finished yet

-- | Lock an uv mananger, so that we can safely mutate its uv_loop's state.
--
-- Some action did not request uv_loop pointer explicitly, but will mutate uv_loop underhood, for example:
-- @uv_read_start@. These actions have to be protected by locking the uv_loop.
--
-- In fact most of the libuv's functions are not thread safe, so watch out!
--
withUVManager' :: HasCallStack => UVManager -> IO a -> IO a
withUVManager' :: UVManager -> IO a -> IO a
withUVManager' UVManager
uvm IO a
f = UVManager -> (Ptr UVLoop -> IO a) -> IO a
forall a. HasCallStack => UVManager -> (Ptr UVLoop -> IO a) -> IO a
withUVManager UVManager
uvm (\ Ptr UVLoop
_ -> IO a
f)

-- | Start the uv loop
--
startUVManager :: HasCallStack => UVManager -> IO ()
startUVManager :: UVManager -> IO ()
startUVManager uvm :: UVManager
uvm@(UVManager IORef (UnliftedArray (MVar Int))
_ Ptr UVLoop
_ Ptr UVLoopData
_ MVar Bool
runningLock Int
_) = IO ()
forall b. IO b
poll -- use a closure capture uvm in case of stack memory leaking
  where
    -- we borrow mio's non-blocking/blocking poll strategy here
    poll :: IO b
poll = do
        Int
e <- MVar Bool -> (Bool -> IO Int) -> IO Int
forall a b. MVar a -> (a -> IO b) -> IO b
withMVar MVar Bool
runningLock ((Bool -> IO Int) -> IO Int) -> (Bool -> IO Int) -> IO Int
forall a b. (a -> b) -> a -> b
$ \ Bool
_ -> UVManager -> Bool -> IO Int
step UVManager
uvm Bool
False
        if Int
e Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
> Int
0                                        -- first we do a non-blocking poll, if we got events
        then IO ()
yield IO () -> IO b -> IO b
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> IO b
poll                              -- we yield here, to let other threads do actual work
        else do                                         -- otherwise we still yield once
            IO ()
yield                                       -- in case other threads can still progress
            Int
e' <- MVar Bool -> (Bool -> IO Int) -> IO Int
forall a b. MVar a -> (a -> IO b) -> IO b
withMVar MVar Bool
runningLock ((Bool -> IO Int) -> IO Int) -> (Bool -> IO Int) -> IO Int
forall a b. (a -> b) -> a -> b
$ \ Bool
_ -> UVManager -> Bool -> IO Int
step UVManager
uvm Bool
False   -- now we do another non-blocking poll to make sure
            if Int
e' Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
> Int
0 then IO ()
yield IO () -> IO b -> IO b
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> IO b
poll             -- if we got events somehow, we yield and go back
            else do                                 -- if there's still no events, we directly jump to safe blocking poll
                Bool
_ <- MVar Bool -> Bool -> IO Bool
forall a. MVar a -> a -> IO a
swapMVar MVar Bool
runningLock Bool
True          -- after swap this lock, other thread can wake up us
                Int
_ <- UVManager -> Bool -> IO Int
step UVManager
uvm Bool
True                  -- by send async handler, and it's thread safe
                Bool
_ <- MVar Bool -> Bool -> IO Bool
forall a. MVar a -> a -> IO a
swapMVar MVar Bool
runningLock Bool
False

                IO ()
yield                               -- we yield here, to let other threads do actual work
                IO b
poll

    -- call uv_run, return the event number
    step :: UVManager -> Bool -> IO Int
    step :: UVManager -> Bool -> IO Int
step (UVManager IORef (UnliftedArray (MVar Int))
blockTableRef Ptr UVLoop
loop Ptr UVLoopData
loopData MVar Bool
_ Int
_) Bool
block = do
            UnliftedArray (MVar Int)
blockTable <- IORef (UnliftedArray (MVar Int)) -> IO (UnliftedArray (MVar Int))
forall a. IORef a -> IO a
readIORef IORef (UnliftedArray (MVar Int))
blockTableRef
            Ptr UVLoopData -> IO ()
clearUVEventCounter Ptr UVLoopData
loopData        -- clean event counter

            if Bool
block
            then if Bool
rtsSupportsBoundThreads
                then IO CInt -> IO ()
forall a. (HasCallStack, Integral a) => IO a -> IO ()
throwUVIfMinus_ (IO CInt -> IO ()) -> IO CInt -> IO ()
forall a b. (a -> b) -> a -> b
$ Ptr UVLoop -> UVRunMode -> IO CInt
uv_run_safe Ptr UVLoop
loop UVRunMode
UV_RUN_ONCE
                else do
                    -- use a 1ms timeout blocking poll on non-threaded rts
                    IO CInt -> IO ()
forall a. (HasCallStack, Integral a) => IO a -> IO ()
throwUVIfMinus_ (Ptr UVLoopData -> IO CInt
hs_uv_wake_up_timer Ptr UVLoopData
loopData)
                    IO CInt -> IO ()
forall a. (HasCallStack, Integral a) => IO a -> IO ()
throwUVIfMinus_ (Ptr UVLoop -> UVRunMode -> IO CInt
uv_run Ptr UVLoop
loop UVRunMode
UV_RUN_ONCE)
            else IO CInt -> IO ()
forall a. (HasCallStack, Integral a) => IO a -> IO ()
throwUVIfMinus_ (Ptr UVLoop -> UVRunMode -> IO CInt
uv_run Ptr UVLoop
loop UVRunMode
UV_RUN_NOWAIT)

            (Int
c, Ptr Int
q) <- Ptr UVLoopData -> IO (Int, Ptr Int)
peekUVEventQueue Ptr UVLoopData
loopData
            [Int] -> (Int -> IO Bool) -> IO ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
t a -> (a -> m b) -> m ()
forM_ [Int
0..Int
cInt -> Int -> Int
forall a. Num a => a -> a -> a
-Int
1] ((Int -> IO Bool) -> IO ()) -> (Int -> IO Bool) -> IO ()
forall a b. (a -> b) -> a -> b
$ \ Int
i -> do
                Int
slot <- Ptr Int -> Int -> IO Int
forall a. Storable a => Ptr a -> Int -> IO a
peekElemOff Ptr Int
q Int
i
                MVar Int
lock <- UnliftedArray (MVar Int) -> Int -> IO (MVar Int)
forall (arr :: * -> *) a (m :: * -> *).
(Arr arr a, Monad m) =>
arr a -> Int -> m a
indexArrM UnliftedArray (MVar Int)
blockTable Int
slot
                -- It's important to read the buffer size table inside running lock and
                -- unlock ghc thread with the result, where 'tryPutMVar' will mutate waiting
                -- thread's stack to ensure it will receive the result after get resumed.
                --
                -- After step finished, other threads are free to take the same slot,
                -- thus can overwrite the buffer size table, i.e. the previous result.
                --
                Int
r <- UVManager -> Int -> IO Int
peekBufferTable UVManager
uvm Int
slot
                MVar Int -> Int -> IO Bool
forall a. MVar a -> a -> IO Bool
tryPutMVar MVar Int
lock Int
r
            Int -> IO Int
forall (m :: * -> *) a. Monad m => a -> m a
return Int
c

-- | Run a libuv FFI to get a 'UVSlotUnSafe' (which may exceed block table size),
-- resize the block table in that case, so that the returned slot always has an
-- accompanying 'MVar' in block table.
--
-- Always use this function to turn an 'UVSlotUnsafe' into 'UVSlot', so that the block
-- table size synchronize with libuv side's slot table.
getUVSlot :: HasCallStack => UVManager -> IO UVSlotUnSafe -> IO UVSlot
{-# INLINE getUVSlot #-}
getUVSlot :: UVManager -> IO UVSlotUnSafe -> IO Int
getUVSlot (UVManager IORef (UnliftedArray (MVar Int))
blockTableRef Ptr UVLoop
_ Ptr UVLoopData
_ MVar Bool
_ Int
_) IO UVSlotUnSafe
f = do
    Int
slot <- IO Int -> IO Int
forall a. (HasCallStack, Integral a) => IO a -> IO a
throwUVIfMinus (UVSlotUnSafe -> Int
unsafeGetSlot (UVSlotUnSafe -> Int) -> IO UVSlotUnSafe -> IO Int
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> IO UVSlotUnSafe
f)
    UnliftedArray (MVar Int)
blockTable <- IORef (UnliftedArray (MVar Int)) -> IO (UnliftedArray (MVar Int))
forall a. IORef a -> IO a
readIORef IORef (UnliftedArray (MVar Int))
blockTableRef
    let oldSiz :: Int
oldSiz = UnliftedArray (MVar Int) -> Int
forall (arr :: * -> *) a. Arr arr a => arr a -> Int
sizeofArr UnliftedArray (MVar Int)
blockTable
    Bool -> IO () -> IO ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (Int
slot Int -> Int -> Bool
forall a. Eq a => a -> a -> Bool
== Int
oldSiz) (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ do
        let newSiz :: Int
newSiz = Int
oldSiz Int -> Int -> Int
forall a. Bits a => a -> Int -> a
`shiftL` Int
2
        MutableUnliftedArray RealWorld (MVar Int)
blockTable' <- Int -> IO (MArr UnliftedArray RealWorld (MVar Int))
forall (arr :: * -> *) a (m :: * -> *) s.
(Arr arr a, PrimMonad m, PrimState m ~ s) =>
Int -> m (MArr arr s a)
newArr Int
newSiz
        MArr UnliftedArray RealWorld (MVar Int)
-> Int -> UnliftedArray (MVar Int) -> Int -> Int -> IO ()
forall (arr :: * -> *) a (m :: * -> *) s.
(Arr arr a, PrimMonad m, PrimState m ~ s) =>
MArr arr s a -> Int -> arr a -> Int -> Int -> m ()
copyArr MArr UnliftedArray RealWorld (MVar Int)
MutableUnliftedArray RealWorld (MVar Int)
blockTable' Int
0 UnliftedArray (MVar Int)
blockTable Int
0 Int
oldSiz
        [Int] -> (Int -> IO ()) -> IO ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
t a -> (a -> m b) -> m ()
forM_ [Int
oldSiz..Int
newSizInt -> Int -> Int
forall a. Num a => a -> a -> a
-Int
1] ((Int -> IO ()) -> IO ()) -> (Int -> IO ()) -> IO ()
forall a b. (a -> b) -> a -> b
$ \ Int
i ->
            MArr UnliftedArray RealWorld (MVar Int) -> Int -> MVar Int -> IO ()
forall (arr :: * -> *) a (m :: * -> *) s.
(Arr arr a, PrimMonad m, PrimState m ~ s) =>
MArr arr s a -> Int -> a -> m ()
writeArr MArr UnliftedArray RealWorld (MVar Int)
MutableUnliftedArray RealWorld (MVar Int)
blockTable' Int
i (MVar Int -> IO ()) -> IO (MVar Int) -> IO ()
forall (m :: * -> *) a b. Monad m => (a -> m b) -> m a -> m b
=<< IO (MVar Int)
forall a. IO (MVar a)
newEmptyMVar
        !UnliftedArray (MVar Int)
iBlockTable' <- MArr UnliftedArray RealWorld (MVar Int)
-> IO (UnliftedArray (MVar Int))
forall (arr :: * -> *) a (m :: * -> *) s.
(Arr arr a, PrimMonad m, PrimState m ~ s) =>
MArr arr s a -> m (arr a)
unsafeFreezeArr MArr UnliftedArray RealWorld (MVar Int)
MutableUnliftedArray RealWorld (MVar Int)
blockTable'
        IORef (UnliftedArray (MVar Int))
-> UnliftedArray (MVar Int) -> IO ()
forall a. IORef a -> a -> IO ()
writeIORef IORef (UnliftedArray (MVar Int))
blockTableRef UnliftedArray (MVar Int)
iBlockTable'
    Int -> IO Int
forall (m :: * -> *) a. Monad m => a -> m a
return Int
slot

--------------------------------------------------------------------------------

-- | Cancel uv async function (actions which can be cancelled with 'uv_cancel') with
-- best effort, if the action is already performed, run an extra clean up action.
cancelUVReq :: UVManager -> UVSlot -> (Int -> IO ()) -> IO ()
cancelUVReq :: UVManager -> Int -> (Int -> IO ()) -> IO ()
cancelUVReq UVManager
uvm Int
slot Int -> IO ()
extra_cleanup = UVManager -> (Ptr UVLoop -> IO ()) -> IO ()
forall a. HasCallStack => UVManager -> (Ptr UVLoop -> IO a) -> IO a
withUVManager UVManager
uvm ((Ptr UVLoop -> IO ()) -> IO ()) -> (Ptr UVLoop -> IO ()) -> IO ()
forall a b. (a -> b) -> a -> b
$ \ Ptr UVLoop
loop -> do
    MVar Int
m <- UVManager -> Int -> IO (MVar Int)
getBlockMVar UVManager
uvm Int
slot
    Maybe Int
r <- MVar Int -> IO (Maybe Int)
forall a. MVar a -> IO (Maybe a)
tryTakeMVar MVar Int
m
    case Maybe Int
r of
        Just Int
r' -> Int -> IO ()
extra_cleanup Int
r'             -- It's too late
        Maybe Int
_ -> do
            UVManager -> Int -> Ptr Word8 -> Int -> IO ()
pokeBufferTable UVManager
uvm Int
slot Ptr Word8
forall a. Ptr a
nullPtr Int
0  -- doing this let libuv side knows that
                                                -- we won't keep buffer alive in callbacks
            Ptr UVLoop -> Int -> IO ()
hs_uv_cancel Ptr UVLoop
loop Int
slot              -- then we cancel the io with best efforts

-- | Exception safe uv request helper
--
-- This helper will run a libuv's async function, which will return a
-- libuv side's slot, then we will accommodate a 'MVar' in block table and
-- wait on that 'MVar', until the async function finished or an exception
-- is received, in later case we will call 'cancelUVReq' to cancel the on-going
-- async function with best efforts,
withUVRequest :: HasCallStack
              => UVManager -> (Ptr UVLoop -> IO UVSlotUnSafe) -> IO Int
withUVRequest :: UVManager -> (Ptr UVLoop -> IO UVSlotUnSafe) -> IO Int
withUVRequest UVManager
uvm Ptr UVLoop -> IO UVSlotUnSafe
f = do
    (Int
slot, MVar Int
m) <- UVManager
-> (Ptr UVLoop -> IO (Int, MVar Int)) -> IO (Int, MVar Int)
forall a. HasCallStack => UVManager -> (Ptr UVLoop -> IO a) -> IO a
withUVManager UVManager
uvm ((Ptr UVLoop -> IO (Int, MVar Int)) -> IO (Int, MVar Int))
-> (Ptr UVLoop -> IO (Int, MVar Int)) -> IO (Int, MVar Int)
forall a b. (a -> b) -> a -> b
$ \ Ptr UVLoop
loop -> IO (Int, MVar Int) -> IO (Int, MVar Int)
forall a. IO a -> IO a
mask_ (IO (Int, MVar Int) -> IO (Int, MVar Int))
-> IO (Int, MVar Int) -> IO (Int, MVar Int)
forall a b. (a -> b) -> a -> b
$ do
        Int
slot <- HasCallStack => UVManager -> IO UVSlotUnSafe -> IO Int
UVManager -> IO UVSlotUnSafe -> IO Int
getUVSlot UVManager
uvm (Ptr UVLoop -> IO UVSlotUnSafe
f Ptr UVLoop
loop)
        MVar Int
m <- UVManager -> Int -> IO (MVar Int)
getBlockMVar UVManager
uvm Int
slot
        Maybe Int
_ <- MVar Int -> IO (Maybe Int)
forall a. MVar a -> IO (Maybe a)
tryTakeMVar MVar Int
m
        (Int, MVar Int) -> IO (Int, MVar Int)
forall (m :: * -> *) a. Monad m => a -> m a
return (Int
slot, MVar Int
m)
    IO Int -> IO Int
forall a. (HasCallStack, Integral a) => IO a -> IO a
throwUVIfMinus (MVar Int -> IO Int
forall a. MVar a -> IO a
takeMVar MVar Int
m IO Int -> IO () -> IO Int
forall a b. IO a -> IO b -> IO a
`onException` UVManager -> Int -> (Int -> IO ()) -> IO ()
cancelUVReq UVManager
uvm Int
slot Int -> IO ()
forall b. b -> IO ()
no_extra_cleanup)
  where no_extra_cleanup :: b -> IO ()
no_extra_cleanup = IO () -> b -> IO ()
forall a b. a -> b -> a
const (IO () -> b -> IO ()) -> IO () -> b -> IO ()
forall a b. (a -> b) -> a -> b
$ () -> IO ()
forall (m :: * -> *) a. Monad m => a -> m a
return ()

-- | Same with 'withUVRequest' but disgard the result.
withUVRequest_ :: HasCallStack
               => UVManager -> (Ptr UVLoop -> IO UVSlotUnSafe) -> IO ()
withUVRequest_ :: UVManager -> (Ptr UVLoop -> IO UVSlotUnSafe) -> IO ()
withUVRequest_ UVManager
uvm Ptr UVLoop -> IO UVSlotUnSafe
f = IO Int -> IO ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (HasCallStack =>
UVManager -> (Ptr UVLoop -> IO UVSlotUnSafe) -> IO Int
UVManager -> (Ptr UVLoop -> IO UVSlotUnSafe) -> IO Int
withUVRequest UVManager
uvm Ptr UVLoop -> IO UVSlotUnSafe
f)

-- | Same with 'withUVRequest' but apply an convert function to result.
--
-- The convert function have all access to the returned value including
-- negative ones, it's convert funtions's responsiblity to throw an exception
-- if appropriate.
withUVRequest' :: HasCallStack
               => UVManager
               -> (Ptr UVLoop -> IO UVSlotUnSafe)
               -> (Int -> IO b)     -- ^ convert function
               -> IO b
withUVRequest' :: UVManager
-> (Ptr UVLoop -> IO UVSlotUnSafe) -> (Int -> IO b) -> IO b
withUVRequest' UVManager
uvm Ptr UVLoop -> IO UVSlotUnSafe
f Int -> IO b
g = do
    (Int
slot, MVar Int
m) <- UVManager
-> (Ptr UVLoop -> IO (Int, MVar Int)) -> IO (Int, MVar Int)
forall a. HasCallStack => UVManager -> (Ptr UVLoop -> IO a) -> IO a
withUVManager UVManager
uvm ((Ptr UVLoop -> IO (Int, MVar Int)) -> IO (Int, MVar Int))
-> (Ptr UVLoop -> IO (Int, MVar Int)) -> IO (Int, MVar Int)
forall a b. (a -> b) -> a -> b
$ \ Ptr UVLoop
loop -> IO (Int, MVar Int) -> IO (Int, MVar Int)
forall a. IO a -> IO a
mask_ (IO (Int, MVar Int) -> IO (Int, MVar Int))
-> IO (Int, MVar Int) -> IO (Int, MVar Int)
forall a b. (a -> b) -> a -> b
$ do
        Int
slot <- HasCallStack => UVManager -> IO UVSlotUnSafe -> IO Int
UVManager -> IO UVSlotUnSafe -> IO Int
getUVSlot UVManager
uvm (Ptr UVLoop -> IO UVSlotUnSafe
f Ptr UVLoop
loop)
        MVar Int
m <- UVManager -> Int -> IO (MVar Int)
getBlockMVar UVManager
uvm Int
slot
        -- since we locked uv manager here, it won't affect next event
        Maybe Int
_ <- MVar Int -> IO (Maybe Int)
forall a. MVar a -> IO (Maybe a)
tryTakeMVar MVar Int
m
        (Int, MVar Int) -> IO (Int, MVar Int)
forall (m :: * -> *) a. Monad m => a -> m a
return (Int
slot, MVar Int
m)
    Int -> IO b
g (Int -> IO b) -> IO Int -> IO b
forall (m :: * -> *) a b. Monad m => (a -> m b) -> m a -> m b
=<< (MVar Int -> IO Int
forall a. MVar a -> IO a
takeMVar MVar Int
m IO Int -> IO () -> IO Int
forall a b. IO a -> IO b -> IO a
`onException` UVManager -> Int -> (Int -> IO ()) -> IO ()
cancelUVReq UVManager
uvm Int
slot Int -> IO ()
forall b. b -> IO ()
no_extra_cleanup)
  where no_extra_cleanup :: b -> IO ()
no_extra_cleanup = IO () -> b -> IO ()
forall a b. a -> b -> a
const (IO () -> b -> IO ()) -> IO () -> b -> IO ()
forall a b. (a -> b) -> a -> b
$ () -> IO ()
forall (m :: * -> *) a. Monad m => a -> m a
return ()

-- | Same with 'withUVRequest', but will also run an extra cleanup function
-- if async exception hit this thread but the async action is already successfully performed,
-- e.g. release result memory.
withUVRequestEx :: HasCallStack
                => UVManager -> (Ptr UVLoop -> IO UVSlotUnSafe) -> (Int -> IO ()) -> IO Int
withUVRequestEx :: UVManager
-> (Ptr UVLoop -> IO UVSlotUnSafe) -> (Int -> IO ()) -> IO Int
withUVRequestEx UVManager
uvm Ptr UVLoop -> IO UVSlotUnSafe
f Int -> IO ()
extra_cleanup = do
    (Int
slot, MVar Int
m) <- UVManager
-> (Ptr UVLoop -> IO (Int, MVar Int)) -> IO (Int, MVar Int)
forall a. HasCallStack => UVManager -> (Ptr UVLoop -> IO a) -> IO a
withUVManager UVManager
uvm ((Ptr UVLoop -> IO (Int, MVar Int)) -> IO (Int, MVar Int))
-> (Ptr UVLoop -> IO (Int, MVar Int)) -> IO (Int, MVar Int)
forall a b. (a -> b) -> a -> b
$ \ Ptr UVLoop
loop -> IO (Int, MVar Int) -> IO (Int, MVar Int)
forall a. IO a -> IO a
mask_ (IO (Int, MVar Int) -> IO (Int, MVar Int))
-> IO (Int, MVar Int) -> IO (Int, MVar Int)
forall a b. (a -> b) -> a -> b
$ do
        Int
slot <- HasCallStack => UVManager -> IO UVSlotUnSafe -> IO Int
UVManager -> IO UVSlotUnSafe -> IO Int
getUVSlot UVManager
uvm (Ptr UVLoop -> IO UVSlotUnSafe
f Ptr UVLoop
loop)
        MVar Int
m <- UVManager -> Int -> IO (MVar Int)
getBlockMVar UVManager
uvm Int
slot
        Maybe Int
_ <- MVar Int -> IO (Maybe Int)
forall a. MVar a -> IO (Maybe a)
tryTakeMVar MVar Int
m
        (Int, MVar Int) -> IO (Int, MVar Int)
forall (m :: * -> *) a. Monad m => a -> m a
return (Int
slot, MVar Int
m)
    IO Int -> IO Int
forall a. (HasCallStack, Integral a) => IO a -> IO a
throwUVIfMinus (MVar Int -> IO Int
forall a. MVar a -> IO a
takeMVar MVar Int
m IO Int -> IO () -> IO Int
forall a b. IO a -> IO b -> IO a
`onException` UVManager -> Int -> (Int -> IO ()) -> IO ()
cancelUVReq UVManager
uvm Int
slot Int -> IO ()
extra_cleanup)

--------------------------------------------------------------------------------

-- | Fork a new GHC thread with active load-balancing.
--
-- Using libuv based IO solution has a disadvantage that file handlers are bound to certain
-- uv_loop, thus certain uv mananger/capability. Worker threads that migrate to other capability
-- will lead contention since various APIs here is protected by manager's lock, this makes GHC's
-- work-stealing strategy unsuitable for certain workload, such as a webserver.
-- we solve this problem with simple round-robin load-balancing: forkBa will automatically
-- distribute new threads to all capabilities in round-robin manner. Thus its name forkBa(lance).
forkBa :: IO () -> IO ThreadId
forkBa :: IO () -> IO ThreadId
forkBa IO ()
io = do
    Int
i <- Counter -> Int -> IO Int
atomicAddCounter Counter
counter Int
1
    Int -> IO () -> IO ThreadId
forkOn Int
i IO ()
io
  where
    counter :: Counter
    {-# NOINLINE counter #-}
    counter :: Counter
counter = IO Counter -> Counter
forall a. IO a -> a
unsafePerformIO (IO Counter -> Counter) -> IO Counter -> Counter
forall a b. (a -> b) -> a -> b
$ Int -> IO Counter
newCounter Int
0

--------------------------------------------------------------------------------
-- UVStream

-- | A haskell data type wrap an @uv_stream_t@ inside
--
-- 'UVStream' DO NOT provide thread safety! Use 'UVStream' concurrently in multiple
-- threads will lead to undefined behavior.
data UVStream = UVStream
    { UVStream -> Ptr UVHandle
uvsHandle  :: {-# UNPACK #-} !(Ptr UVHandle)
    , UVStream -> Int
uvsSlot    :: {-# UNPACK #-} !UVSlot
    , UVStream -> UVManager
uvsManager :: UVManager
    , UVStream -> IORef Bool
uvsClosed  :: {-# UNPACK #-} !(IORef Bool)    -- We have no thread-safe guarantee,
                                                    -- so no need to use atomic read&write
    }

instance Show UVStream where
    show :: UVStream -> String
show (UVStream Ptr UVHandle
hdl Int
slot UVManager
uvm IORef Bool
_) =
        String
"UVStream{uvsHandle=" String -> ShowS
forall a. [a] -> [a] -> [a]
++ Ptr UVHandle -> String
forall a. Show a => a -> String
show Ptr UVHandle
hdl String -> ShowS
forall a. [a] -> [a] -> [a]
++
                String
",uvsSlot=" String -> ShowS
forall a. [a] -> [a] -> [a]
++ Int -> String
forall a. Show a => a -> String
show Int
slot String -> ShowS
forall a. [a] -> [a] -> [a]
++
                String
",uvsManager=" String -> ShowS
forall a. [a] -> [a] -> [a]
++ UVManager -> String
forall a. Show a => a -> String
show UVManager
uvm String -> ShowS
forall a. [a] -> [a] -> [a]
++ String
"}"

-- | Safely lock an uv manager and perform uv_handle initialization.
--
-- Initialization an UV stream usually take two step:
--
--   * allocate an uv_stream struct with proper size
--   * lock a particular uv_loop from a uv manager, and perform custom initialization, such as @uv_tcp_init@.
--
-- And this is what 'initUVStream' do, all you need to do is to provide the manager you want to hook the handle
-- onto(usually the one on the same capability, i.e. the one obtained by 'getUVManager'),
-- and provide a custom initialization function (which should throw an exception if failed).
--
initUVStream :: HasCallStack
             => (Ptr UVLoop -> Ptr UVHandle -> IO ())
             -> UVManager
             -> Resource UVStream
initUVStream :: (Ptr UVLoop -> Ptr UVHandle -> IO ())
-> UVManager -> Resource UVStream
initUVStream Ptr UVLoop -> Ptr UVHandle -> IO ()
f UVManager
uvm = IO UVStream -> (UVStream -> IO ()) -> Resource UVStream
forall a. IO a -> (a -> IO ()) -> Resource a
initResource
    (UVManager -> (Ptr UVLoop -> IO UVStream) -> IO UVStream
forall a. HasCallStack => UVManager -> (Ptr UVLoop -> IO a) -> IO a
withUVManager UVManager
uvm ((Ptr UVLoop -> IO UVStream) -> IO UVStream)
-> (Ptr UVLoop -> IO UVStream) -> IO UVStream
forall a b. (a -> b) -> a -> b
$ \ Ptr UVLoop
loop -> do
        Ptr UVHandle
hdl <- Ptr UVLoop -> IO (Ptr UVHandle)
hs_uv_handle_alloc Ptr UVLoop
loop
        Int
slot <- HasCallStack => UVManager -> IO UVSlotUnSafe -> IO Int
UVManager -> IO UVSlotUnSafe -> IO Int
getUVSlot UVManager
uvm (Ptr UVHandle -> IO UVSlotUnSafe
peekUVHandleData Ptr UVHandle
hdl)
        Maybe Int
_ <- MVar Int -> IO (Maybe Int)
forall a. MVar a -> IO (Maybe a)
tryTakeMVar (MVar Int -> IO (Maybe Int)) -> IO (MVar Int) -> IO (Maybe Int)
forall (m :: * -> *) a b. Monad m => (a -> m b) -> m a -> m b
=<< UVManager -> Int -> IO (MVar Int)
getBlockMVar UVManager
uvm Int
slot   -- clear the parking spot
        Ptr UVLoop -> Ptr UVHandle -> IO ()
f Ptr UVLoop
loop Ptr UVHandle
hdl IO () -> IO () -> IO ()
forall a b. IO a -> IO b -> IO a
`onException` Ptr UVHandle -> IO ()
hs_uv_handle_free Ptr UVHandle
hdl
        IORef Bool
closed <- Bool -> IO (IORef Bool)
forall a. a -> IO (IORef a)
newIORef Bool
False
        UVStream -> IO UVStream
forall (m :: * -> *) a. Monad m => a -> m a
return (Ptr UVHandle -> Int -> UVManager -> IORef Bool -> UVStream
UVStream Ptr UVHandle
hdl Int
slot UVManager
uvm IORef Bool
closed))
    UVStream -> IO ()
closeUVStream

closeUVStream :: UVStream -> IO ()
closeUVStream :: UVStream -> IO ()
closeUVStream (UVStream Ptr UVHandle
hdl Int
_ UVManager
uvm IORef Bool
closed) = UVManager -> IO () -> IO ()
forall a. HasCallStack => UVManager -> IO a -> IO a
withUVManager' UVManager
uvm (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ do
    Bool
c <- IORef Bool -> IO Bool
forall a. IORef a -> IO a
readIORef IORef Bool
closed
    -- hs_uv_handle_close won't return error
    Bool -> IO () -> IO ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
unless Bool
c (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ IORef Bool -> Bool -> IO ()
forall a. IORef a -> a -> IO ()
writeIORef IORef Bool
closed Bool
True IO () -> IO () -> IO ()
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> Ptr UVHandle -> IO ()
hs_uv_handle_close Ptr UVHandle
hdl

instance Input UVStream where
    -- readInput :: HasCallStack => UVStream -> Ptr Word8 ->  Int -> IO Int
    {-# INLINABLE readInput  #-}
    readInput :: UVStream -> Ptr Word8 -> Int -> IO Int
readInput (UVStream Ptr UVHandle
hdl Int
slot UVManager
uvm IORef Bool
closed) Ptr Word8
buf Int
len = IO Int -> IO Int
forall a. IO a -> IO a
mask_ (IO Int -> IO Int) -> IO Int -> IO Int
forall a b. (a -> b) -> a -> b
$ do
        Bool
c <- IORef Bool -> IO Bool
forall a. IORef a -> IO a
readIORef IORef Bool
closed
        Bool -> IO () -> IO ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when Bool
c IO ()
forall a. HasCallStack => IO a
throwECLOSED
        -- set up buffer
        UVManager -> Int -> Ptr Word8 -> Int -> IO ()
pokeBufferTable UVManager
uvm Int
slot Ptr Word8
buf Int
len
        MVar Int
m <- UVManager -> Int -> IO (MVar Int)
getBlockMVar UVManager
uvm Int
slot
        -- clean up
        Maybe Int
_ <- MVar Int -> IO (Maybe Int)
forall a. MVar a -> IO (Maybe a)
tryTakeMVar MVar Int
m

        IO CInt -> IO ()
forall a. (HasCallStack, Integral a) => IO a -> IO ()
throwUVIfMinus_ (IO CInt -> IO ()) -> IO CInt -> IO ()
forall a b. (a -> b) -> a -> b
$ UVManager -> IO CInt -> IO CInt
forall a. HasCallStack => UVManager -> IO a -> IO a
withUVManager' UVManager
uvm (Ptr UVHandle -> IO CInt
hs_uv_read_start Ptr UVHandle
hdl)
        -- since we are inside mask, this is the only place
        -- async exceptions could possibly kick in, and we should stop reading
        Int
r <- MVar Int -> IO Int
forall a. MVar a -> IO a
takeMVar MVar Int
m IO Int -> IO () -> IO Int
forall a b. IO a -> IO b -> IO a
`onException` (do
                -- normally we call 'uv_read_stop' in C read callback
                -- but when exception raise, here's the place to stop
                IO CInt -> IO ()
forall a. (HasCallStack, Integral a) => IO a -> IO ()
throwUVIfMinus_ (IO CInt -> IO ()) -> IO CInt -> IO ()
forall a b. (a -> b) -> a -> b
$ UVManager -> IO CInt -> IO CInt
forall a. HasCallStack => UVManager -> IO a -> IO a
withUVManager' UVManager
uvm (Ptr UVHandle -> IO CInt
uv_read_stop Ptr UVHandle
hdl)
                IO (Maybe Int) -> IO ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (MVar Int -> IO (Maybe Int)
forall a. MVar a -> IO (Maybe a)
tryTakeMVar MVar Int
m))

        if  | Int
r Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
> Int
0  -> Int -> IO Int
forall (m :: * -> *) a. Monad m => a -> m a
return Int
r
            -- r == 0 should be impossible, since we guard this situation in c side
            | Int
r Int -> Int -> Bool
forall a. Eq a => a -> a -> Bool
== CInt -> Int
forall a b. (Integral a, Num b) => a -> b
fromIntegral CInt
UV_EOF -> Int -> IO Int
forall (m :: * -> *) a. Monad m => a -> m a
return Int
0
            | Int
r Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
< Int
0 ->  IO Int -> IO Int
forall a. (HasCallStack, Integral a) => IO a -> IO a
throwUVIfMinus (Int -> IO Int
forall (m :: * -> *) a. Monad m => a -> m a
return Int
r)

instance Output UVStream where
    -- writeOutput :: HasCallStack => UVStream -> Ptr Word8 -> Int -> IO ()
    {-# INLINABLE writeOutput  #-}
    writeOutput :: UVStream -> Ptr Word8 -> Int -> IO ()
writeOutput (UVStream Ptr UVHandle
hdl Int
_ UVManager
uvm IORef Bool
closed) Ptr Word8
buf Int
len = IO () -> IO ()
forall a. IO a -> IO a
mask_ (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ do
        Bool
c <- IORef Bool -> IO Bool
forall a. IORef a -> IO a
readIORef IORef Bool
closed
        Bool -> IO () -> IO ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when Bool
c IO ()
forall a. HasCallStack => IO a
throwECLOSED
        MVar Int
m <- UVManager -> IO (MVar Int) -> IO (MVar Int)
forall a. HasCallStack => UVManager -> IO a -> IO a
withUVManager' UVManager
uvm (IO (MVar Int) -> IO (MVar Int)) -> IO (MVar Int) -> IO (MVar Int)
forall a b. (a -> b) -> a -> b
$ do
            Int
reqSlot <- HasCallStack => UVManager -> IO UVSlotUnSafe -> IO Int
UVManager -> IO UVSlotUnSafe -> IO Int
getUVSlot UVManager
uvm (Ptr UVHandle -> Ptr Word8 -> Int -> IO UVSlotUnSafe
hs_uv_write Ptr UVHandle
hdl Ptr Word8
buf Int
len)
            MVar Int
m <- UVManager -> Int -> IO (MVar Int)
getBlockMVar UVManager
uvm Int
reqSlot
            Maybe Int
_ <- MVar Int -> IO (Maybe Int)
forall a. MVar a -> IO (Maybe a)
tryTakeMVar MVar Int
m
            MVar Int -> IO (MVar Int)
forall (m :: * -> *) a. Monad m => a -> m a
return MVar Int
m
        -- we can't cancel uv_write_t with current libuv,
        -- otherwise disaster will happen if buffer got collected.
        -- so we have to turn to uninterruptibleMask_'s help.
        -- i.e. writing UVStream is an uninterruptible operation.
        -- OS will guarantee writing TTY and socket will not
        -- hang forever anyway.
        IO Int -> IO ()
forall a. (HasCallStack, Integral a) => IO a -> IO ()
throwUVIfMinus_  (IO Int -> IO Int
forall a. IO a -> IO a
uninterruptibleMask_ (IO Int -> IO Int) -> IO Int -> IO Int
forall a b. (a -> b) -> a -> b
$ MVar Int -> IO Int
forall a. MVar a -> IO a
takeMVar MVar Int
m)

--------------------------------------------------------------------------------