{-|
Module      : Z.IO.Network.UDP
Description : UDP servers and clients
Copyright   : (c) Dong Han, 2018
License     : BSD
Maintainer  : winterland1989@gmail.com
Stability   : experimental
Portability : non-portable

This module provides an API for creating UDP sender and receiver.

* Socket FD is created lazily if no local address is provided, that means various functions
  that need FD will throw bad FD exception if you 'initUDP' with no local address e.g. 'setTTL'.

* If you want to create socket FD but don't care about which port or interface you're using,
  use 'SocketAddrInet' 'portAny' 'inetAny' when 'initUDP'.

* Prefer 'recvUDPLoop' because it can reuse receive buffer.

-}

module Z.IO.Network.UDP (
  -- * TCP Client
    UDP
  , initUDP
  , UDPConfig(..)
  , defaultUDPConfig
  , sendUDP
  , UDPRecvConfig(..)
  , defaultUDPRecvConfig
  , recvUDPLoop
  , recvUDP
  , getSockName
  -- * Connected UDP Client
  , ConnectedUDP
  , connectUDP
  , disconnectUDP
  , getPeerName
  , sendConnectedUDP
  -- * multicast and broadcast
  , setMembership
  , setSourceMembership
  , setMulticastLoop
  , setMulticastTTL
  , setMulticastInterface
  , setBroadcast
  , setTTL
  -- * Constants
  -- ** UDPFlag
  , UDPFlag
  , pattern UDP_DEFAULT
  , pattern UDP_IPV6ONLY
  , pattern UDP_REUSEADDR
  -- ** Membership
  , Membership
  , pattern JOIN_GROUP
  , pattern LEAVE_GROUP
  ) where

import Control.Concurrent
import Control.Monad
import Data.Primitive.PrimArray as A
import Data.IORef
import Data.Word
import Data.Int
import Data.Bits ((.&.))
import Foreign.Storable (peek, poke)
import Foreign.Ptr (plusPtr)
import Foreign.C
import Z.Data.Array           as A
import Z.Data.Vector.Base     as V
import Z.Data.Vector.Extra    as V
import Z.Data.CBytes          as CBytes
import Z.IO.Network.SocketAddr
import Z.Foreign
import Z.IO.UV.FFI
import Z.IO.UV.Manager
import Z.IO.Exception
import Z.IO.Resource

-- | UDP socket client.
--
-- UDP is not a sequential protocol, thus not an instance of 'Input\/Output'.
-- Message are received or sent individually, UDP socket client is NOT thread safe!
-- Use 'MVar' 'UDP' in multiple threads.
--
data UDP = UDP
    { UDP -> Ptr UVHandle
udpHandle  :: {-# UNPACK #-} !(Ptr UVHandle)
    , UDP -> UVSlot
udpSlot    :: {-# UNPACK #-} !UVSlot
    , UDP -> UVManager
udpManager :: UVManager
    , UDP -> MutablePrimArray RealWorld Word8
udpSendBuffer ::  {-# UNPACK #-} !(A.MutablePrimArray RealWorld Word8)
    , UDP -> IORef Bool
udpClosed  :: {-# UNPACK #-} !(IORef Bool)
    }

instance Show UDP where
    show :: UDP -> String
show (UDP Ptr UVHandle
hdl UVSlot
slot UVManager
uvm MutablePrimArray RealWorld Word8
_ IORef Bool
_) =
        String
"UDP{udpHandle=" String -> ShowS
forall a. [a] -> [a] -> [a]
++ Ptr UVHandle -> String
forall a. Show a => a -> String
show Ptr UVHandle
hdl String -> ShowS
forall a. [a] -> [a] -> [a]
++
                String
",udpSlot=" String -> ShowS
forall a. [a] -> [a] -> [a]
++ UVSlot -> String
forall a. Show a => a -> String
show UVSlot
slot String -> ShowS
forall a. [a] -> [a] -> [a]
++
                String
",udpManager=" String -> ShowS
forall a. [a] -> [a] -> [a]
++ UVManager -> String
forall a. Show a => a -> String
show UVManager
uvm String -> ShowS
forall a. [a] -> [a] -> [a]
++ String
"}"

-- | UDP options.
--
-- Though technically message length field in the UDP header is a max of 65535, but large packets
-- could be more likely dropped by routers,
-- usually a packet(IPV4) with a payload <= 508 bytes is considered safe.
data UDPConfig = UDPConfig
    { UDPConfig -> UVSlot
udpSendMsgSize :: {-# UNPACK #-} !Int         -- ^ maximum size of sending buffer
    , UDPConfig -> Maybe (SocketAddr, UDPFlag)
udpLocalAddr :: Maybe (SocketAddr, UDPFlag)   -- ^ do we want bind a local address before receiving & sending?
                                                    --   set to Nothing to let OS pick a random one.
    } deriving (UVSlot -> UDPConfig -> ShowS
[UDPConfig] -> ShowS
UDPConfig -> String
(UVSlot -> UDPConfig -> ShowS)
-> (UDPConfig -> String)
-> ([UDPConfig] -> ShowS)
-> Show UDPConfig
forall a.
(UVSlot -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
showList :: [UDPConfig] -> ShowS
$cshowList :: [UDPConfig] -> ShowS
show :: UDPConfig -> String
$cshow :: UDPConfig -> String
showsPrec :: UVSlot -> UDPConfig -> ShowS
$cshowsPrec :: UVSlot -> UDPConfig -> ShowS
Show, UDPConfig -> UDPConfig -> Bool
(UDPConfig -> UDPConfig -> Bool)
-> (UDPConfig -> UDPConfig -> Bool) -> Eq UDPConfig
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
/= :: UDPConfig -> UDPConfig -> Bool
$c/= :: UDPConfig -> UDPConfig -> Bool
== :: UDPConfig -> UDPConfig -> Bool
$c== :: UDPConfig -> UDPConfig -> Bool
Eq, Eq UDPConfig
Eq UDPConfig
-> (UDPConfig -> UDPConfig -> Ordering)
-> (UDPConfig -> UDPConfig -> Bool)
-> (UDPConfig -> UDPConfig -> Bool)
-> (UDPConfig -> UDPConfig -> Bool)
-> (UDPConfig -> UDPConfig -> Bool)
-> (UDPConfig -> UDPConfig -> UDPConfig)
-> (UDPConfig -> UDPConfig -> UDPConfig)
-> Ord UDPConfig
UDPConfig -> UDPConfig -> Bool
UDPConfig -> UDPConfig -> Ordering
UDPConfig -> UDPConfig -> UDPConfig
forall a.
Eq a
-> (a -> a -> Ordering)
-> (a -> a -> Bool)
-> (a -> a -> Bool)
-> (a -> a -> Bool)
-> (a -> a -> Bool)
-> (a -> a -> a)
-> (a -> a -> a)
-> Ord a
min :: UDPConfig -> UDPConfig -> UDPConfig
$cmin :: UDPConfig -> UDPConfig -> UDPConfig
max :: UDPConfig -> UDPConfig -> UDPConfig
$cmax :: UDPConfig -> UDPConfig -> UDPConfig
>= :: UDPConfig -> UDPConfig -> Bool
$c>= :: UDPConfig -> UDPConfig -> Bool
> :: UDPConfig -> UDPConfig -> Bool
$c> :: UDPConfig -> UDPConfig -> Bool
<= :: UDPConfig -> UDPConfig -> Bool
$c<= :: UDPConfig -> UDPConfig -> Bool
< :: UDPConfig -> UDPConfig -> Bool
$c< :: UDPConfig -> UDPConfig -> Bool
compare :: UDPConfig -> UDPConfig -> Ordering
$ccompare :: UDPConfig -> UDPConfig -> Ordering
$cp1Ord :: Eq UDPConfig
Ord)

-- | @UDPConfig 512 Nothing@
defaultUDPConfig :: UDPConfig
defaultUDPConfig :: UDPConfig
defaultUDPConfig = UVSlot -> Maybe (SocketAddr, UDPFlag) -> UDPConfig
UDPConfig UVSlot
512 Maybe (SocketAddr, UDPFlag)
forall a. Maybe a
Nothing

-- | Initialize a UDP socket.
--
initUDP :: UDPConfig -> Resource UDP
initUDP :: UDPConfig -> Resource UDP
initUDP (UDPConfig UVSlot
sbsiz Maybe (SocketAddr, UDPFlag)
maddr) = IO UDP -> (UDP -> IO ()) -> Resource UDP
forall a. IO a -> (a -> IO ()) -> Resource a
initResource
    (do UVManager
uvm <- IO UVManager
getUVManager
        (Ptr UVHandle
hdl, UVSlot
slot) <- UVManager
-> (Ptr UVLoop -> IO (Ptr UVHandle, UVSlot))
-> IO (Ptr UVHandle, UVSlot)
forall a. HasCallStack => UVManager -> (Ptr UVLoop -> IO a) -> IO a
withUVManager UVManager
uvm ((Ptr UVLoop -> IO (Ptr UVHandle, UVSlot))
 -> IO (Ptr UVHandle, UVSlot))
-> (Ptr UVLoop -> IO (Ptr UVHandle, UVSlot))
-> IO (Ptr UVHandle, UVSlot)
forall a b. (a -> b) -> a -> b
$ \ Ptr UVLoop
loop -> do
            Ptr UVHandle
hdl <- Ptr UVLoop -> IO (Ptr UVHandle)
hs_uv_handle_alloc Ptr UVLoop
loop
            UVSlot
slot <- HasCallStack => UVManager -> IO UVSlotUnsafe -> IO UVSlot
UVManager -> IO UVSlotUnsafe -> IO UVSlot
getUVSlot UVManager
uvm (Ptr UVHandle -> IO UVSlotUnsafe
peekUVHandleData Ptr UVHandle
hdl)
            -- init uv struct
            IO UDPFlag -> IO ()
forall a. (HasCallStack, Integral a) => IO a -> IO ()
throwUVIfMinus_ (Ptr UVLoop -> Ptr UVHandle -> IO UDPFlag
uv_udp_init Ptr UVLoop
loop Ptr UVHandle
hdl)
            (Ptr UVHandle, UVSlot) -> IO (Ptr UVHandle, UVSlot)
forall (m :: * -> *) a. Monad m => a -> m a
return (Ptr UVHandle
hdl, UVSlot
slot)

        -- bind the socket if address is available
        -- This is safe without lock UV manager
        Maybe (SocketAddr, UDPFlag)
-> ((SocketAddr, UDPFlag) -> IO ()) -> IO ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
t a -> (a -> m b) -> m ()
forM_ Maybe (SocketAddr, UDPFlag)
maddr (((SocketAddr, UDPFlag) -> IO ()) -> IO ())
-> ((SocketAddr, UDPFlag) -> IO ()) -> IO ()
forall a b. (a -> b) -> a -> b
$ \ (SocketAddr
addr, UDPFlag
flag) ->
            SocketAddr -> (MBA# SocketAddr -> IO ()) -> IO ()
forall a. SocketAddr -> (MBA# SocketAddr -> IO a) -> IO a
withSocketAddrUnsafe SocketAddr
addr ((MBA# SocketAddr -> IO ()) -> IO ())
-> (MBA# SocketAddr -> IO ()) -> IO ()
forall a b. (a -> b) -> a -> b
$ \ MBA# SocketAddr
p ->
                IO UDPFlag -> IO ()
forall a. (HasCallStack, Integral a) => IO a -> IO ()
throwUVIfMinus_ (Ptr UVHandle -> MBA# SocketAddr -> UDPFlag -> IO UDPFlag
uv_udp_bind Ptr UVHandle
hdl MBA# SocketAddr
p UDPFlag
flag)

        MutablePrimArray RealWorld Word8
sbuf <- UVSlot -> IO (MutablePrimArray (PrimState IO) Word8)
forall (m :: * -> *) a.
(PrimMonad m, Prim a) =>
UVSlot -> m (MutablePrimArray (PrimState m) a)
A.newPinnedPrimArray (UVSlot -> UVSlot -> UVSlot
forall a. Ord a => a -> a -> a
max UVSlot
0 UVSlot
sbsiz)
        IORef Bool
closed <- Bool -> IO (IORef Bool)
forall a. a -> IO (IORef a)
newIORef Bool
False
        UDP -> IO UDP
forall (m :: * -> *) a. Monad m => a -> m a
return (Ptr UVHandle
-> UVSlot
-> UVManager
-> MutablePrimArray RealWorld Word8
-> IORef Bool
-> UDP
UDP Ptr UVHandle
hdl UVSlot
slot UVManager
uvm MutablePrimArray RealWorld Word8
sbuf IORef Bool
closed))
    (\ (UDP Ptr UVHandle
hdl UVSlot
_ UVManager
uvm MutablePrimArray RealWorld Word8
_  IORef Bool
closed) -> UVManager -> IO () -> IO ()
forall a. HasCallStack => UVManager -> IO a -> IO a
withUVManager' UVManager
uvm (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ do
        Bool
c <- IORef Bool -> IO Bool
forall a. IORef a -> IO a
readIORef IORef Bool
closed
        -- hs_uv_handle_close won't return error
        Bool -> IO () -> IO ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
unless Bool
c (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ IORef Bool -> Bool -> IO ()
forall a. IORef a -> a -> IO ()
writeIORef IORef Bool
closed Bool
True IO () -> IO () -> IO ()
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> Ptr UVHandle -> IO ()
hs_uv_handle_close Ptr UVHandle
hdl)

checkUDPClosed :: HasCallStack => UDP -> IO ()
checkUDPClosed :: UDP -> IO ()
checkUDPClosed UDP
udp = do
    Bool
c <- IORef Bool -> IO Bool
forall a. IORef a -> IO a
readIORef (UDP -> IORef Bool
udpClosed UDP
udp)
    Bool -> IO () -> IO ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when Bool
c IO ()
forall a. HasCallStack => IO a
throwECLOSED

-- | Get the local IP and port of the 'UDP'.
getSockName :: HasCallStack => UDP -> IO SocketAddr
getSockName :: UDP -> IO SocketAddr
getSockName udp :: UDP
udp@(UDP Ptr UVHandle
hdl UVSlot
_ UVManager
_ MutablePrimArray RealWorld Word8
_ IORef Bool
_) = do
    HasCallStack => UDP -> IO ()
UDP -> IO ()
checkUDPClosed UDP
udp
    (MBA# SocketAddr -> IO ()) -> IO SocketAddr
withSocketAddrStorageUnsafe ((MBA# SocketAddr -> IO ()) -> IO SocketAddr)
-> (MBA# SocketAddr -> IO ()) -> IO SocketAddr
forall a b. (a -> b) -> a -> b
$ \ MBA# SocketAddr
paddr ->
        IO (UDPFlag, ()) -> IO ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (IO (UDPFlag, ()) -> IO ()) -> IO (UDPFlag, ()) -> IO ()
forall a b. (a -> b) -> a -> b
$ UDPFlag -> (MBA# SocketAddr -> IO ()) -> IO (UDPFlag, ())
forall a b. Prim a => a -> (MBA# SocketAddr -> IO b) -> IO (a, b)
withPrimUnsafe (CSize -> UDPFlag
forall a b. (Integral a, Num b) => a -> b
fromIntegral CSize
sizeOfSocketAddrStorage :: CInt) ((MBA# SocketAddr -> IO ()) -> IO (UDPFlag, ()))
-> (MBA# SocketAddr -> IO ()) -> IO (UDPFlag, ())
forall a b. (a -> b) -> a -> b
$ \ MBA# SocketAddr
plen ->
            IO UDPFlag -> IO ()
forall a. (HasCallStack, Integral a) => IO a -> IO ()
throwUVIfMinus_ (Ptr UVHandle -> MBA# SocketAddr -> MBA# SocketAddr -> IO UDPFlag
uv_udp_getsockname Ptr UVHandle
hdl MBA# SocketAddr
paddr MBA# SocketAddr
plen)

-- | Wrapper for a connected 'UDP'.
newtype ConnectedUDP = ConnectedUDP UDP deriving UVSlot -> ConnectedUDP -> ShowS
[ConnectedUDP] -> ShowS
ConnectedUDP -> String
(UVSlot -> ConnectedUDP -> ShowS)
-> (ConnectedUDP -> String)
-> ([ConnectedUDP] -> ShowS)
-> Show ConnectedUDP
forall a.
(UVSlot -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
showList :: [ConnectedUDP] -> ShowS
$cshowList :: [ConnectedUDP] -> ShowS
show :: ConnectedUDP -> String
$cshow :: ConnectedUDP -> String
showsPrec :: UVSlot -> ConnectedUDP -> ShowS
$cshowsPrec :: UVSlot -> ConnectedUDP -> ShowS
Show

-- | Associate the UDP handle to a remote address and port,
-- so every message sent by this handle is automatically sent to that destination
connectUDP :: HasCallStack => UDP -> SocketAddr -> IO ConnectedUDP
connectUDP :: UDP -> SocketAddr -> IO ConnectedUDP
connectUDP udp :: UDP
udp@(UDP Ptr UVHandle
hdl UVSlot
_ UVManager
_ MutablePrimArray RealWorld Word8
_ IORef Bool
_) SocketAddr
addr = do
    HasCallStack => UDP -> IO ()
UDP -> IO ()
checkUDPClosed UDP
udp
    SocketAddr -> (MBA# SocketAddr -> IO ()) -> IO ()
forall a. SocketAddr -> (MBA# SocketAddr -> IO a) -> IO a
withSocketAddrUnsafe SocketAddr
addr ((MBA# SocketAddr -> IO ()) -> IO ())
-> (MBA# SocketAddr -> IO ()) -> IO ()
forall a b. (a -> b) -> a -> b
$ \ MBA# SocketAddr
paddr ->
        IO UDPFlag -> IO ()
forall a. (HasCallStack, Integral a) => IO a -> IO ()
throwUVIfMinus_ (Ptr UVHandle -> MBA# SocketAddr -> IO UDPFlag
uv_udp_connect Ptr UVHandle
hdl MBA# SocketAddr
paddr)
    ConnectedUDP -> IO ConnectedUDP
forall (m :: * -> *) a. Monad m => a -> m a
return (UDP -> ConnectedUDP
ConnectedUDP UDP
udp)

-- | Disconnect the UDP handle from a remote address and port.
disconnectUDP :: HasCallStack => ConnectedUDP -> IO UDP
disconnectUDP :: ConnectedUDP -> IO UDP
disconnectUDP (ConnectedUDP udp :: UDP
udp@(UDP Ptr UVHandle
hdl UVSlot
_ UVManager
_ MutablePrimArray RealWorld Word8
_ IORef Bool
_)) = do
    HasCallStack => UDP -> IO ()
UDP -> IO ()
checkUDPClosed UDP
udp
    IO UDPFlag -> IO ()
forall a. (HasCallStack, Integral a) => IO a -> IO ()
throwUVIfMinus_ (Ptr UVHandle -> Ptr SocketAddr -> IO UDPFlag
uv_udp_disconnect Ptr UVHandle
hdl Ptr SocketAddr
forall a. Ptr a
nullPtr)
    UDP -> IO UDP
forall (m :: * -> *) a. Monad m => a -> m a
return UDP
udp

-- | Get the remote IP and port on 'ConnectedUDP'.
getPeerName :: HasCallStack => ConnectedUDP -> IO SocketAddr
getPeerName :: ConnectedUDP -> IO SocketAddr
getPeerName (ConnectedUDP udp :: UDP
udp@(UDP Ptr UVHandle
hdl UVSlot
_ UVManager
_ MutablePrimArray RealWorld Word8
_ IORef Bool
_)) = do
    HasCallStack => UDP -> IO ()
UDP -> IO ()
checkUDPClosed UDP
udp
    (MBA# SocketAddr -> IO ()) -> IO SocketAddr
withSocketAddrStorageUnsafe ((MBA# SocketAddr -> IO ()) -> IO SocketAddr)
-> (MBA# SocketAddr -> IO ()) -> IO SocketAddr
forall a b. (a -> b) -> a -> b
$ \ MBA# SocketAddr
paddr ->
        IO (UDPFlag, ()) -> IO ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (IO (UDPFlag, ()) -> IO ()) -> IO (UDPFlag, ()) -> IO ()
forall a b. (a -> b) -> a -> b
$ UDPFlag -> (MBA# SocketAddr -> IO ()) -> IO (UDPFlag, ())
forall a b. Prim a => a -> (MBA# SocketAddr -> IO b) -> IO (a, b)
withPrimUnsafe (CSize -> UDPFlag
forall a b. (Integral a, Num b) => a -> b
fromIntegral CSize
sizeOfSocketAddrStorage :: CInt) ((MBA# SocketAddr -> IO ()) -> IO (UDPFlag, ()))
-> (MBA# SocketAddr -> IO ()) -> IO (UDPFlag, ())
forall a b. (a -> b) -> a -> b
$ \ MBA# SocketAddr
plen ->
            IO UDPFlag -> IO ()
forall a. (HasCallStack, Integral a) => IO a -> IO ()
throwUVIfMinus_ (Ptr UVHandle -> MBA# SocketAddr -> MBA# SocketAddr -> IO UDPFlag
uv_udp_getpeername Ptr UVHandle
hdl MBA# SocketAddr
paddr MBA# SocketAddr
plen)

-- | Send a UDP message with a connected UDP.
--
-- WARNING: A 'InvalidArgument' with errno 'UV_EMSGSIZE' will be thrown
-- if message is larger than 'sendMsgSize'.
sendConnectedUDP :: HasCallStack => ConnectedUDP -> V.Bytes -> IO ()
sendConnectedUDP :: ConnectedUDP -> Bytes -> IO ()
sendConnectedUDP (ConnectedUDP udp :: UDP
udp@(UDP Ptr UVHandle
hdl UVSlot
_ UVManager
uvm MutablePrimArray RealWorld Word8
sbuf IORef Bool
_)) (V.PrimVector PrimArray Word8
ba UVSlot
s UVSlot
la) = IO () -> IO ()
forall a. IO a -> IO a
mask_ (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ do
    HasCallStack => UDP -> IO ()
UDP -> IO ()
checkUDPClosed UDP
udp
    -- copy message to pinned buffer
    UVSlot
lb <- MutablePrimArray (PrimState IO) Word8 -> IO UVSlot
forall (m :: * -> *) a.
(PrimMonad m, Prim a) =>
MutablePrimArray (PrimState m) a -> m UVSlot
getSizeofMutablePrimArray MutablePrimArray RealWorld Word8
MutablePrimArray (PrimState IO) Word8
sbuf
    Bool -> IO () -> IO ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (UVSlot
la UVSlot -> UVSlot -> Bool
forall a. Ord a => a -> a -> Bool
> UVSlot
lb) (IO UDPFlag -> IO ()
forall a. (HasCallStack, Integral a) => IO a -> IO ()
throwUVIfMinus_ (UDPFlag -> IO UDPFlag
forall (m :: * -> *) a. Monad m => a -> m a
return UDPFlag
UV_EMSGSIZE))
    MutablePrimArray (PrimState IO) Word8
-> UVSlot -> PrimArray Word8 -> UVSlot -> UVSlot -> IO ()
forall (m :: * -> *) a.
(PrimMonad m, Prim a) =>
MutablePrimArray (PrimState m) a
-> UVSlot -> PrimArray a -> UVSlot -> UVSlot -> m ()
copyPrimArray MutablePrimArray RealWorld Word8
MutablePrimArray (PrimState IO) Word8
sbuf UVSlot
0 PrimArray Word8
ba UVSlot
s UVSlot
la
    MutablePrimArray RealWorld Word8 -> (Ptr Word8 -> IO ()) -> IO ()
forall a b. MutablePrimArray RealWorld a -> (Ptr a -> IO b) -> IO b
withMutablePrimArrayContents MutablePrimArray RealWorld Word8
sbuf ((Ptr Word8 -> IO ()) -> IO ()) -> (Ptr Word8 -> IO ()) -> IO ()
forall a b. (a -> b) -> a -> b
$ \ Ptr Word8
pbuf -> do
        MVar UVSlot
m <- UVManager -> IO (MVar UVSlot) -> IO (MVar UVSlot)
forall a. HasCallStack => UVManager -> IO a -> IO a
withUVManager' UVManager
uvm (IO (MVar UVSlot) -> IO (MVar UVSlot))
-> IO (MVar UVSlot) -> IO (MVar UVSlot)
forall a b. (a -> b) -> a -> b
$ do
            UVSlot
reqSlot <- HasCallStack => UVManager -> IO UVSlotUnsafe -> IO UVSlot
UVManager -> IO UVSlotUnsafe -> IO UVSlot
getUVSlot UVManager
uvm (Ptr UVHandle -> Ptr Word8 -> UVSlot -> IO UVSlotUnsafe
hs_uv_udp_send_connected Ptr UVHandle
hdl Ptr Word8
pbuf UVSlot
la)
            MVar UVSlot
reqMVar <- UVManager -> UVSlot -> IO (MVar UVSlot)
getBlockMVar UVManager
uvm UVSlot
reqSlot
            -- since we locked uv manager here, it won't affect next event
            Maybe UVSlot
_ <- MVar UVSlot -> IO (Maybe UVSlot)
forall a. MVar a -> IO (Maybe a)
tryTakeMVar MVar UVSlot
reqMVar
            MVar UVSlot -> IO (MVar UVSlot)
forall (m :: * -> *) a. Monad m => a -> m a
return MVar UVSlot
reqMVar
        -- we can't cancel uv_udp_send_t in current libuv
        -- and disaster will happen if buffer got collected.
        -- so we have to turn to uninterruptibleMask_'s help.
        -- i.e. sendUDP is an uninterruptible operation.
        -- OS will guarantee writing a socket will not
        -- hang forever anyway.
        IO UVSlot -> IO ()
forall a. (HasCallStack, Integral a) => IO a -> IO ()
throwUVIfMinus_  (IO UVSlot -> IO UVSlot
forall a. IO a -> IO a
uninterruptibleMask_ (IO UVSlot -> IO UVSlot) -> IO UVSlot -> IO UVSlot
forall a b. (a -> b) -> a -> b
$ MVar UVSlot -> IO UVSlot
forall a. MVar a -> IO a
takeMVar MVar UVSlot
m)

-- | Send a UDP message to target address.
--
-- WARNING: A 'InvalidArgument' with errno 'UV_EMSGSIZE' will be thrown
-- if message is larger than 'sendMsgSize'.
sendUDP :: HasCallStack => UDP -> SocketAddr -> V.Bytes -> IO ()
sendUDP :: UDP -> SocketAddr -> Bytes -> IO ()
sendUDP udp :: UDP
udp@(UDP Ptr UVHandle
hdl UVSlot
_ UVManager
uvm MutablePrimArray RealWorld Word8
sbuf IORef Bool
_) SocketAddr
addr (V.PrimVector PrimArray Word8
ba UVSlot
s UVSlot
la) = IO () -> IO ()
forall a. IO a -> IO a
mask_ (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ do
    HasCallStack => UDP -> IO ()
UDP -> IO ()
checkUDPClosed UDP
udp
    -- copy message to pinned buffer
    UVSlot
lb <- MutablePrimArray (PrimState IO) Word8 -> IO UVSlot
forall (m :: * -> *) a.
(PrimMonad m, Prim a) =>
MutablePrimArray (PrimState m) a -> m UVSlot
getSizeofMutablePrimArray MutablePrimArray RealWorld Word8
MutablePrimArray (PrimState IO) Word8
sbuf
    Bool -> IO () -> IO ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (UVSlot
la UVSlot -> UVSlot -> Bool
forall a. Ord a => a -> a -> Bool
> UVSlot
lb) (IO UDPFlag -> IO ()
forall a. (HasCallStack, Integral a) => IO a -> IO ()
throwUVIfMinus_ (UDPFlag -> IO UDPFlag
forall (m :: * -> *) a. Monad m => a -> m a
return UDPFlag
UV_EMSGSIZE))
    MutablePrimArray (PrimState IO) Word8
-> UVSlot -> PrimArray Word8 -> UVSlot -> UVSlot -> IO ()
forall (m :: * -> *) a.
(PrimMonad m, Prim a) =>
MutablePrimArray (PrimState m) a
-> UVSlot -> PrimArray a -> UVSlot -> UVSlot -> m ()
copyPrimArray MutablePrimArray RealWorld Word8
MutablePrimArray (PrimState IO) Word8
sbuf UVSlot
0 PrimArray Word8
ba UVSlot
s UVSlot
la
    MutablePrimArray RealWorld Word8 -> (Ptr Word8 -> IO ()) -> IO ()
forall a b. MutablePrimArray RealWorld a -> (Ptr a -> IO b) -> IO b
withMutablePrimArrayContents MutablePrimArray RealWorld Word8
sbuf ((Ptr Word8 -> IO ()) -> IO ()) -> (Ptr Word8 -> IO ()) -> IO ()
forall a b. (a -> b) -> a -> b
$ \ Ptr Word8
pbuf -> do
        MVar UVSlot
m <- UVManager -> IO (MVar UVSlot) -> IO (MVar UVSlot)
forall a. HasCallStack => UVManager -> IO a -> IO a
withUVManager' UVManager
uvm (IO (MVar UVSlot) -> IO (MVar UVSlot))
-> IO (MVar UVSlot) -> IO (MVar UVSlot)
forall a b. (a -> b) -> a -> b
$ do
            UVSlot
reqSlot <- SocketAddr -> (MBA# SocketAddr -> IO UVSlot) -> IO UVSlot
forall a. SocketAddr -> (MBA# SocketAddr -> IO a) -> IO a
withSocketAddrUnsafe SocketAddr
addr ((MBA# SocketAddr -> IO UVSlot) -> IO UVSlot)
-> (MBA# SocketAddr -> IO UVSlot) -> IO UVSlot
forall a b. (a -> b) -> a -> b
$ \ MBA# SocketAddr
paddr ->
                HasCallStack => UVManager -> IO UVSlotUnsafe -> IO UVSlot
UVManager -> IO UVSlotUnsafe -> IO UVSlot
getUVSlot UVManager
uvm (Ptr UVHandle
-> MBA# SocketAddr -> Ptr Word8 -> UVSlot -> IO UVSlotUnsafe
hs_uv_udp_send Ptr UVHandle
hdl MBA# SocketAddr
paddr Ptr Word8
pbuf UVSlot
la)
            MVar UVSlot
reqMVar <- UVManager -> UVSlot -> IO (MVar UVSlot)
getBlockMVar UVManager
uvm UVSlot
reqSlot
            -- since we locked uv manager here, it won't affect next event
            Maybe UVSlot
_ <- MVar UVSlot -> IO (Maybe UVSlot)
forall a. MVar a -> IO (Maybe a)
tryTakeMVar MVar UVSlot
reqMVar
            MVar UVSlot -> IO (MVar UVSlot)
forall (m :: * -> *) a. Monad m => a -> m a
return MVar UVSlot
reqMVar
        -- we can't cancel uv_udp_send_t in current libuv
        -- and disaster will happen if buffer got collected.
        -- so we have to turn to uninterruptibleMask_'s help.
        -- i.e. sendUDP is an uninterruptible operation.
        -- OS will guarantee writing a socket will not
        -- hang forever anyway.
        IO UVSlot -> IO ()
forall a. (HasCallStack, Integral a) => IO a -> IO ()
throwUVIfMinus_  (IO UVSlot -> IO UVSlot
forall a. IO a -> IO a
uninterruptibleMask_ (IO UVSlot -> IO UVSlot) -> IO UVSlot -> IO UVSlot
forall a b. (a -> b) -> a -> b
$ MVar UVSlot -> IO UVSlot
forall a. MVar a -> IO a
takeMVar MVar UVSlot
m)

-- | Set IP multicast loop flag. Makes multicast packets loop back to local sockets.
setMulticastLoop :: HasCallStack => UDP -> Bool -> IO ()
setMulticastLoop :: UDP -> Bool -> IO ()
setMulticastLoop udp :: UDP
udp@(UDP Ptr UVHandle
hdl UVSlot
_ UVManager
_ MutablePrimArray RealWorld Word8
_ IORef Bool
_) Bool
loop = do
    HasCallStack => UDP -> IO ()
UDP -> IO ()
checkUDPClosed UDP
udp
    IO UDPFlag -> IO ()
forall a. (HasCallStack, Integral a) => IO a -> IO ()
throwUVIfMinus_ (Ptr UVHandle -> UDPFlag -> IO UDPFlag
uv_udp_set_multicast_loop Ptr UVHandle
hdl (if Bool
loop then UDPFlag
1 else UDPFlag
0))

-- | Set the multicast ttl.
setMulticastTTL :: HasCallStack => UDP -> Int -> IO ()
setMulticastTTL :: UDP -> UVSlot -> IO ()
setMulticastTTL udp :: UDP
udp@(UDP Ptr UVHandle
hdl UVSlot
_ UVManager
_ MutablePrimArray RealWorld Word8
_ IORef Bool
_) UVSlot
ttl = do
    HasCallStack => UDP -> IO ()
UDP -> IO ()
checkUDPClosed UDP
udp
    IO UDPFlag -> IO ()
forall a. (HasCallStack, Integral a) => IO a -> IO ()
throwUVIfMinus_ (Ptr UVHandle -> UDPFlag -> IO UDPFlag
uv_udp_set_multicast_ttl Ptr UVHandle
hdl (UVSlot -> UDPFlag
forall a b. (Integral a, Num b) => a -> b
fromIntegral UVSlot
ttl'))
  where ttl' :: UVSlot
ttl' = UVSlot -> UVSlot -> UVSlot -> UVSlot
V.rangeCut UVSlot
ttl UVSlot
1 UVSlot
255

-- | Set the multicast interface to send or receive data on.
setMulticastInterface :: HasCallStack => UDP -> CBytes ->IO ()
setMulticastInterface :: UDP -> CBytes -> IO ()
setMulticastInterface udp :: UDP
udp@(UDP Ptr UVHandle
hdl UVSlot
_ UVManager
_ MutablePrimArray RealWorld Word8
_ IORef Bool
_) CBytes
iaddr = do
    HasCallStack => UDP -> IO ()
UDP -> IO ()
checkUDPClosed UDP
udp
    CBytes -> (BA# Word8 -> IO ()) -> IO ()
forall a. CBytes -> (BA# Word8 -> IO a) -> IO a
withCBytesUnsafe CBytes
iaddr ((BA# Word8 -> IO ()) -> IO ()) -> (BA# Word8 -> IO ()) -> IO ()
forall a b. (a -> b) -> a -> b
$ \ BA# Word8
iaddrp ->
        IO UDPFlag -> IO ()
forall a. (HasCallStack, Integral a) => IO a -> IO ()
throwUVIfMinus_ (Ptr UVHandle -> BA# Word8 -> IO UDPFlag
uv_udp_set_multicast_interface Ptr UVHandle
hdl BA# Word8
iaddrp)

-- | Set broadcast on or off.
setBroadcast :: HasCallStack => UDP -> Bool -> IO ()
setBroadcast :: UDP -> Bool -> IO ()
setBroadcast udp :: UDP
udp@(UDP Ptr UVHandle
hdl UVSlot
_ UVManager
_ MutablePrimArray RealWorld Word8
_ IORef Bool
_) Bool
b = do
    HasCallStack => UDP -> IO ()
UDP -> IO ()
checkUDPClosed UDP
udp
    IO UDPFlag -> IO ()
forall a. (HasCallStack, Integral a) => IO a -> IO ()
throwUVIfMinus_ (Ptr UVHandle -> UDPFlag -> IO UDPFlag
uv_udp_set_broadcast Ptr UVHandle
hdl (if Bool
b then UDPFlag
1 else UDPFlag
0))

-- | Set the time to live.
setTTL :: HasCallStack
       => UDP
       -> Int       -- ^ 1 ~ 255
       -> IO ()
setTTL :: UDP -> UVSlot -> IO ()
setTTL udp :: UDP
udp@(UDP Ptr UVHandle
hdl UVSlot
_ UVManager
_ MutablePrimArray RealWorld Word8
_ IORef Bool
_) UVSlot
ttl = do
    HasCallStack => UDP -> IO ()
UDP -> IO ()
checkUDPClosed UDP
udp
    IO UDPFlag -> IO ()
forall a. (HasCallStack, Integral a) => IO a -> IO ()
throwUVIfMinus_ (Ptr UVHandle -> UDPFlag -> IO UDPFlag
uv_udp_set_ttl Ptr UVHandle
hdl (UVSlot -> UDPFlag
forall a b. (Integral a, Num b) => a -> b
fromIntegral UVSlot
ttl))

-- | Set membership for a multicast group.
setMembership :: HasCallStack
              => UDP
              -> CBytes             -- ^ Multicast address to set membership for.
              -> CBytes             -- ^ Interface address.
              -> Membership       -- ^ UV_JOIN_GROUP | UV_LEAVE_GROUP
              -> IO ()
setMembership :: UDP -> CBytes -> CBytes -> UDPFlag -> IO ()
setMembership udp :: UDP
udp@(UDP Ptr UVHandle
hdl UVSlot
_ UVManager
_ MutablePrimArray RealWorld Word8
_ IORef Bool
_) CBytes
gaddr CBytes
iaddr UDPFlag
member = do
    HasCallStack => UDP -> IO ()
UDP -> IO ()
checkUDPClosed UDP
udp
    CBytes -> (BA# Word8 -> IO ()) -> IO ()
forall a. CBytes -> (BA# Word8 -> IO a) -> IO a
withCBytesUnsafe CBytes
gaddr ((BA# Word8 -> IO ()) -> IO ()) -> (BA# Word8 -> IO ()) -> IO ()
forall a b. (a -> b) -> a -> b
$ \ BA# Word8
gaddrp ->
        CBytes -> (BA# Word8 -> IO ()) -> IO ()
forall a. CBytes -> (BA# Word8 -> IO a) -> IO a
withCBytesUnsafe CBytes
iaddr ((BA# Word8 -> IO ()) -> IO ()) -> (BA# Word8 -> IO ()) -> IO ()
forall a b. (a -> b) -> a -> b
$ \ BA# Word8
iaddrp ->
            IO UDPFlag -> IO ()
forall a. (HasCallStack, Integral a) => IO a -> IO ()
throwUVIfMinus_ (Ptr UVHandle -> BA# Word8 -> BA# Word8 -> UDPFlag -> IO UDPFlag
uv_udp_set_membership Ptr UVHandle
hdl BA# Word8
gaddrp BA# Word8
iaddrp UDPFlag
member)

-- | Set membership for a source-specific multicast group.
setSourceMembership :: HasCallStack
                    => UDP
                    -> CBytes           -- ^ Multicast address to set membership for.
                    -> CBytes           -- ^ Interface address.
                    -> CBytes           -- ^ Source address.
                    -> Membership     -- ^ UV_JOIN_GROUP | UV_LEAVE_GROUP
                    -> IO ()
setSourceMembership :: UDP -> CBytes -> CBytes -> CBytes -> UDPFlag -> IO ()
setSourceMembership udp :: UDP
udp@(UDP Ptr UVHandle
hdl UVSlot
_ UVManager
_ MutablePrimArray RealWorld Word8
_ IORef Bool
_) CBytes
gaddr CBytes
iaddr CBytes
source UDPFlag
member = do
    HasCallStack => UDP -> IO ()
UDP -> IO ()
checkUDPClosed UDP
udp
    CBytes -> (BA# Word8 -> IO ()) -> IO ()
forall a. CBytes -> (BA# Word8 -> IO a) -> IO a
withCBytesUnsafe CBytes
gaddr ((BA# Word8 -> IO ()) -> IO ()) -> (BA# Word8 -> IO ()) -> IO ()
forall a b. (a -> b) -> a -> b
$ \ BA# Word8
gaddrp ->
        CBytes -> (BA# Word8 -> IO ()) -> IO ()
forall a. CBytes -> (BA# Word8 -> IO a) -> IO a
withCBytesUnsafe CBytes
iaddr ((BA# Word8 -> IO ()) -> IO ()) -> (BA# Word8 -> IO ()) -> IO ()
forall a b. (a -> b) -> a -> b
$ \ BA# Word8
iaddrp ->
            CBytes -> (BA# Word8 -> IO ()) -> IO ()
forall a. CBytes -> (BA# Word8 -> IO a) -> IO a
withCBytesUnsafe CBytes
source ((BA# Word8 -> IO ()) -> IO ()) -> (BA# Word8 -> IO ()) -> IO ()
forall a b. (a -> b) -> a -> b
$ \ BA# Word8
sourcep ->
                IO UDPFlag -> IO ()
forall a. (HasCallStack, Integral a) => IO a -> IO ()
throwUVIfMinus_ (Ptr UVHandle
-> BA# Word8 -> BA# Word8 -> BA# Word8 -> UDPFlag -> IO UDPFlag
uv_udp_set_source_membership Ptr UVHandle
hdl BA# Word8
gaddrp BA# Word8
iaddrp BA# Word8
sourcep UDPFlag
member)

--------------------------------------------------------------------------------

-- | Receiving buffering config.
--
data UDPRecvConfig = UDPRecvConfig
    { UDPRecvConfig -> Int32
recvMsgSize :: {-# UNPACK #-} !Int32      -- ^ maximum size of a received message
    , UDPRecvConfig -> UVSlot
recvBatchSize :: {-# UNPACK #-} !Int      -- ^ how many messages we want to receive per uv loop,
                                                --   inside each uv_run, we do batch receiving,
                                                --   increase this number can improve receiving performance,
                                                --   at the cost of memory and potential GHC thread starving.
    }

-- | @UDPRecvConfig 512 6@
defaultUDPRecvConfig :: UDPRecvConfig
defaultUDPRecvConfig :: UDPRecvConfig
defaultUDPRecvConfig = Int32 -> UVSlot -> UDPRecvConfig
UDPRecvConfig Int32
512 UVSlot
6


-- The buffer passing of UDP is a litte complicated here, to get maximum performance,
-- we do batch receiving. i.e. recv multiple messages inside libuv's event loop:
--
--   udpRecvLargeBuffer:
--
--   +---------+--------------+-----------+----------+--------+---------+------------
--   | buf siz | partial flag | addr flag |   addr   | buffer | buf siz | partial ...
--   +--4bytes-+----4bytes----+--4bytes---+-128bytes-+-bufsiz-+---------+------------
--   ^                                                        ^
--   |                                                        |
--   +---------------------+       +--------------------------+
--                         |       |
--                      +--+---+---+--+----
--   udpRecvBufferArray | buf0 | buf1 | ...
--                      +------+------+----
--
-- We allocate a large buffer (buffer_size * buffer_number),
-- each time we poke the udpRecvBufferArray and its last index (size - 1) to uv manager's buffer table.
--
-- On libuv side each alloc callback picks the last pointer from udpRecvBufferArray, decrease last index by 1
-- the read result is write into the `buf siz` cell, then followed with partial flag, if addr is not NULL
-- then addr flag is 1 (otherwise 0), following addr if not NULL, the buffer is already written when
-- recv callback is called.
--
-- On haskell side, we read buffer table's size, which is decreased by n(which is the times callback are called).
-- Then we poke those cells out.
--
newRecvBuf :: Int32 -> Int -> IO (A.MutablePrimArray RealWorld Word8, A.MutablePrimArray RealWorld (Ptr Word8))
newRecvBuf :: Int32
-> UVSlot
-> IO
     (MutablePrimArray RealWorld Word8,
      MutablePrimArray RealWorld (Ptr Word8))
newRecvBuf Int32
bufSiz UVSlot
bufArrSiz = do
    MutablePrimArray RealWorld Word8
rbuf <- UVSlot -> IO (MutablePrimArray (PrimState IO) Word8)
forall (m :: * -> *) a.
(PrimMonad m, Prim a) =>
UVSlot -> m (MutablePrimArray (PrimState m) a)
A.newPinnedPrimArray (Int32 -> UVSlot
forall a b. (Integral a, Num b) => a -> b
fromIntegral Int32
bufsiz' UVSlot -> UVSlot -> UVSlot
forall a. Num a => a -> a -> a
* UVSlot
bufArrSiz')
    MutablePrimArray RealWorld (Ptr Word8)
rbufArr <- UVSlot -> IO (MutablePrimArray (PrimState IO) (Ptr Word8))
forall (m :: * -> *) a.
(PrimMonad m, Prim a) =>
UVSlot -> m (MutablePrimArray (PrimState m) a)
A.newPinnedPrimArray UVSlot
bufArrSiz'

    -- initialize buffer array with right index
    MutablePrimArray RealWorld Word8 -> (Ptr Word8 -> IO ()) -> IO ()
forall a b. MutablePrimArray RealWorld a -> (Ptr a -> IO b) -> IO b
withMutablePrimArrayContents MutablePrimArray RealWorld Word8
rbuf ((Ptr Word8 -> IO ()) -> IO ()) -> (Ptr Word8 -> IO ()) -> IO ()
forall a b. (a -> b) -> a -> b
$ \ Ptr Word8
p ->
        [UVSlot] -> (UVSlot -> IO ()) -> IO ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
t a -> (a -> m b) -> m ()
forM_ [UVSlot
0..UVSlot
bufArrSiz'UVSlot -> UVSlot -> UVSlot
forall a. Num a => a -> a -> a
-UVSlot
1] ((UVSlot -> IO ()) -> IO ()) -> (UVSlot -> IO ()) -> IO ()
forall a b. (a -> b) -> a -> b
$ \ UVSlot
i -> do
            let bufNPtr :: Ptr Word8
bufNPtr = Ptr Word8
p Ptr Word8 -> UVSlot -> Ptr Word8
forall a b. Ptr a -> UVSlot -> Ptr b
`plusPtr` (UVSlot
i UVSlot -> UVSlot -> UVSlot
forall a. Num a => a -> a -> a
* Int32 -> UVSlot
forall a b. (Integral a, Num b) => a -> b
fromIntegral Int32
bufsiz')
            MutablePrimArray (PrimState IO) (Ptr Word8)
-> UVSlot -> Ptr Word8 -> IO ()
forall a (m :: * -> *).
(Prim a, PrimMonad m) =>
MutablePrimArray (PrimState m) a -> UVSlot -> a -> m ()
writePrimArray MutablePrimArray RealWorld (Ptr Word8)
MutablePrimArray (PrimState IO) (Ptr Word8)
rbufArr UVSlot
i Ptr Word8
bufNPtr
    (MutablePrimArray RealWorld Word8,
 MutablePrimArray RealWorld (Ptr Word8))
-> IO
     (MutablePrimArray RealWorld Word8,
      MutablePrimArray RealWorld (Ptr Word8))
forall (m :: * -> *) a. Monad m => a -> m a
return (MutablePrimArray RealWorld Word8
rbuf, MutablePrimArray RealWorld (Ptr Word8)
rbufArr)
  where
    -- (message size + sockaddr flag + + flag size) + sockaddr_in size + buffer
    -- see diagram above
    bufsiz' :: Int32
bufsiz' = Int32
140 Int32 -> Int32 -> Int32
forall a. Num a => a -> a -> a
+ (Int32 -> Int32 -> Int32
forall a. Ord a => a -> a -> a
max Int32
0 Int32
bufSiz)
    bufArrSiz' :: UVSlot
bufArrSiz' = UVSlot -> UVSlot -> UVSlot
forall a. Ord a => a -> a -> a
max UVSlot
1 UVSlot
bufArrSiz

-- | Recv UDP message within a loop
--
-- Loop receiving can be faster since it can reuse receiving buffer.
recvUDPLoop :: HasCallStack
            => UDPRecvConfig
            -> UDP
            -> ((Maybe SocketAddr, Bool, V.Bytes) -> IO a)
            -> IO ()
recvUDPLoop :: UDPRecvConfig
-> UDP -> ((Maybe SocketAddr, Bool, Bytes) -> IO a) -> IO ()
recvUDPLoop (UDPRecvConfig Int32
bufSiz UVSlot
bufArrSiz) udp :: UDP
udp@(UDP Ptr UVHandle
hdl UVSlot
slot UVManager
uvm MutablePrimArray RealWorld Word8
_ IORef Bool
_) (Maybe SocketAddr, Bool, Bytes) -> IO a
worker = do
    IO (Ptr UVHandle)
-> (Ptr UVHandle -> IO ()) -> (Ptr UVHandle -> IO ()) -> IO ()
forall a b c. IO a -> (a -> IO b) -> (a -> IO c) -> IO c
bracket
        (do Ptr UVHandle
check <- IO (Ptr UVHandle) -> IO (Ptr UVHandle)
forall a. HasCallStack => IO (Ptr a) -> IO (Ptr a)
throwOOMIfNull (IO (Ptr UVHandle) -> IO (Ptr UVHandle))
-> IO (Ptr UVHandle) -> IO (Ptr UVHandle)
forall a b. (a -> b) -> a -> b
$ IO (Ptr UVHandle)
hs_uv_check_alloc
            IO UDPFlag -> IO ()
forall a. (HasCallStack, Integral a) => IO a -> IO ()
throwUVIfMinus_ (Ptr UVHandle -> Ptr UVHandle -> IO UDPFlag
hs_uv_check_init Ptr UVHandle
check Ptr UVHandle
hdl)
            Ptr UVHandle -> IO (Ptr UVHandle)
forall (m :: * -> *) a. Monad m => a -> m a
return Ptr UVHandle
check)
        Ptr UVHandle -> IO ()
hs_uv_check_close ((Ptr UVHandle -> IO ()) -> IO ())
-> (Ptr UVHandle -> IO ()) -> IO ()
forall a b. (a -> b) -> a -> b
$
        \ Ptr UVHandle
check -> do
            buf :: (MutablePrimArray RealWorld Word8,
 MutablePrimArray RealWorld (Ptr Word8))
buf@(MutablePrimArray RealWorld Word8
_, MutablePrimArray RealWorld (Ptr Word8)
rbufArr) <- Int32
-> UVSlot
-> IO
     (MutablePrimArray RealWorld Word8,
      MutablePrimArray RealWorld (Ptr Word8))
newRecvBuf Int32
bufSiz UVSlot
bufArrSiz
            MutablePrimArray RealWorld (Ptr Word8)
-> (Ptr (Ptr Word8) -> IO ()) -> IO ()
forall a b. MutablePrimArray RealWorld a -> (Ptr a -> IO b) -> IO b
withMutablePrimArrayContents MutablePrimArray RealWorld (Ptr Word8)
rbufArr ((Ptr (Ptr Word8) -> IO ()) -> IO ())
-> (Ptr (Ptr Word8) -> IO ()) -> IO ()
forall a b. (a -> b) -> a -> b
$ \ Ptr (Ptr Word8)
p -> do
                UVManager -> UVSlot -> Ptr Word8 -> UVSlot -> IO ()
pokeBufferTable UVManager
uvm UVSlot
slot (Ptr (Ptr Word8) -> Ptr Word8
forall a b. Ptr a -> Ptr b
castPtr Ptr (Ptr Word8)
p) (UVSlot
bufArrSizUVSlot -> UVSlot -> UVSlot
forall a. Num a => a -> a -> a
-UVSlot
1)
                -- init uv_check_t must come after poking buffer
                IO UDPFlag -> IO ()
forall a. (HasCallStack, Integral a) => IO a -> IO ()
throwUVIfMinus_ (IO UDPFlag -> IO ()) -> IO UDPFlag -> IO ()
forall a b. (a -> b) -> a -> b
$ Ptr UVHandle -> IO UDPFlag
hs_uv_udp_check_start Ptr UVHandle
check
            IO () -> IO ()
forall (f :: * -> *) a b. Applicative f => f a -> f b
forever (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ do
                [(Maybe SocketAddr, Bool, Bytes)]
msgs <- HasCallStack =>
UDP
-> (MutablePrimArray RealWorld Word8,
    MutablePrimArray RealWorld (Ptr Word8))
-> Int32
-> IO [(Maybe SocketAddr, Bool, Bytes)]
UDP
-> (MutablePrimArray RealWorld Word8,
    MutablePrimArray RealWorld (Ptr Word8))
-> Int32
-> IO [(Maybe SocketAddr, Bool, Bytes)]
recvUDPWith UDP
udp (MutablePrimArray RealWorld Word8,
 MutablePrimArray RealWorld (Ptr Word8))
buf Int32
bufSiz
                UVManager -> UVSlot -> UVSlot -> IO ()
pokeBufferSizeTable UVManager
uvm UVSlot
slot (UVSlot
bufArrSizUVSlot -> UVSlot -> UVSlot
forall a. Num a => a -> a -> a
-UVSlot
1)
                [(Maybe SocketAddr, Bool, Bytes)]
-> ((Maybe SocketAddr, Bool, Bytes) -> IO a) -> IO ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
t a -> (a -> m b) -> m ()
forM_ [(Maybe SocketAddr, Bool, Bytes)]
msgs (Maybe SocketAddr, Bool, Bytes) -> IO a
worker

-- | Recv messages from UDP socket, return source address if available, and a `Bool`
-- to indicate if the message is partial (larger than receive buffer size).
--
recvUDP :: HasCallStack => UDPRecvConfig -> UDP -> IO [(Maybe SocketAddr, Bool, V.Bytes)]
recvUDP :: UDPRecvConfig -> UDP -> IO [(Maybe SocketAddr, Bool, Bytes)]
recvUDP (UDPRecvConfig Int32
bufSiz UVSlot
bufArrSiz) udp :: UDP
udp@(UDP Ptr UVHandle
hdl UVSlot
slot UVManager
uvm MutablePrimArray RealWorld Word8
_ IORef Bool
_)  = do
    IO (Ptr UVHandle)
-> (Ptr UVHandle -> IO ())
-> (Ptr UVHandle -> IO [(Maybe SocketAddr, Bool, Bytes)])
-> IO [(Maybe SocketAddr, Bool, Bytes)]
forall a b c. IO a -> (a -> IO b) -> (a -> IO c) -> IO c
bracket
        (do Ptr UVHandle
check <- IO (Ptr UVHandle) -> IO (Ptr UVHandle)
forall a. HasCallStack => IO (Ptr a) -> IO (Ptr a)
throwOOMIfNull (IO (Ptr UVHandle) -> IO (Ptr UVHandle))
-> IO (Ptr UVHandle) -> IO (Ptr UVHandle)
forall a b. (a -> b) -> a -> b
$ IO (Ptr UVHandle)
hs_uv_check_alloc
            IO UDPFlag -> IO ()
forall a. (HasCallStack, Integral a) => IO a -> IO ()
throwUVIfMinus_ (Ptr UVHandle -> Ptr UVHandle -> IO UDPFlag
hs_uv_check_init Ptr UVHandle
check Ptr UVHandle
hdl)
            Ptr UVHandle -> IO (Ptr UVHandle)
forall (m :: * -> *) a. Monad m => a -> m a
return Ptr UVHandle
check)
        Ptr UVHandle -> IO ()
hs_uv_check_close ((Ptr UVHandle -> IO [(Maybe SocketAddr, Bool, Bytes)])
 -> IO [(Maybe SocketAddr, Bool, Bytes)])
-> (Ptr UVHandle -> IO [(Maybe SocketAddr, Bool, Bytes)])
-> IO [(Maybe SocketAddr, Bool, Bytes)]
forall a b. (a -> b) -> a -> b
$
        \ Ptr UVHandle
check -> do
            buf :: (MutablePrimArray RealWorld Word8,
 MutablePrimArray RealWorld (Ptr Word8))
buf@(MutablePrimArray RealWorld Word8
_, MutablePrimArray RealWorld (Ptr Word8)
rbufArr) <- Int32
-> UVSlot
-> IO
     (MutablePrimArray RealWorld Word8,
      MutablePrimArray RealWorld (Ptr Word8))
newRecvBuf Int32
bufSiz UVSlot
bufArrSiz
            MutablePrimArray RealWorld (Ptr Word8)
-> (Ptr (Ptr Word8) -> IO ()) -> IO ()
forall a b. MutablePrimArray RealWorld a -> (Ptr a -> IO b) -> IO b
withMutablePrimArrayContents MutablePrimArray RealWorld (Ptr Word8)
rbufArr ((Ptr (Ptr Word8) -> IO ()) -> IO ())
-> (Ptr (Ptr Word8) -> IO ()) -> IO ()
forall a b. (a -> b) -> a -> b
$ \ Ptr (Ptr Word8)
p -> do
                UVManager -> UVSlot -> Ptr Word8 -> UVSlot -> IO ()
pokeBufferTable UVManager
uvm UVSlot
slot (Ptr (Ptr Word8) -> Ptr Word8
forall a b. Ptr a -> Ptr b
castPtr Ptr (Ptr Word8)
p) (UVSlot
bufArrSizUVSlot -> UVSlot -> UVSlot
forall a. Num a => a -> a -> a
-UVSlot
1)
                -- init uv_check_t must come after poking buffer
                IO UDPFlag -> IO ()
forall a. (HasCallStack, Integral a) => IO a -> IO ()
throwUVIfMinus_ (IO UDPFlag -> IO ()) -> IO UDPFlag -> IO ()
forall a b. (a -> b) -> a -> b
$ Ptr UVHandle -> IO UDPFlag
hs_uv_udp_check_start Ptr UVHandle
check
            HasCallStack =>
UDP
-> (MutablePrimArray RealWorld Word8,
    MutablePrimArray RealWorld (Ptr Word8))
-> Int32
-> IO [(Maybe SocketAddr, Bool, Bytes)]
UDP
-> (MutablePrimArray RealWorld Word8,
    MutablePrimArray RealWorld (Ptr Word8))
-> Int32
-> IO [(Maybe SocketAddr, Bool, Bytes)]
recvUDPWith UDP
udp (MutablePrimArray RealWorld Word8,
 MutablePrimArray RealWorld (Ptr Word8))
buf Int32
bufSiz

recvUDPWith :: HasCallStack
            => UDP
            -> (A.MutablePrimArray RealWorld Word8, A.MutablePrimArray RealWorld (Ptr Word8))
            -> Int32
            -> IO [(Maybe SocketAddr, Bool, V.Bytes)]
recvUDPWith :: UDP
-> (MutablePrimArray RealWorld Word8,
    MutablePrimArray RealWorld (Ptr Word8))
-> Int32
-> IO [(Maybe SocketAddr, Bool, Bytes)]
recvUDPWith udp :: UDP
udp@(UDP Ptr UVHandle
hdl UVSlot
slot UVManager
uvm MutablePrimArray RealWorld Word8
_ IORef Bool
_) (MutablePrimArray RealWorld Word8
rubf, MutablePrimArray RealWorld (Ptr Word8)
rbufArr) Int32
bufSiz =
    -- It's important to keep recv buffer alive, even if we don't directly use it
    IO [(Maybe SocketAddr, Bool, Bytes)]
-> IO [(Maybe SocketAddr, Bool, Bytes)]
forall a. IO a -> IO a
mask_ (IO [(Maybe SocketAddr, Bool, Bytes)]
 -> IO [(Maybe SocketAddr, Bool, Bytes)])
-> ((Ptr Word8 -> IO [(Maybe SocketAddr, Bool, Bytes)])
    -> IO [(Maybe SocketAddr, Bool, Bytes)])
-> (Ptr Word8 -> IO [(Maybe SocketAddr, Bool, Bytes)])
-> IO [(Maybe SocketAddr, Bool, Bytes)]
forall b c a. (b -> c) -> (a -> b) -> a -> c
. MutablePrimArray RealWorld Word8
-> (Ptr Word8 -> IO [(Maybe SocketAddr, Bool, Bytes)])
-> IO [(Maybe SocketAddr, Bool, Bytes)]
forall a b. MutablePrimArray RealWorld a -> (Ptr a -> IO b) -> IO b
withMutablePrimArrayContents MutablePrimArray RealWorld Word8
rubf ((Ptr Word8 -> IO [(Maybe SocketAddr, Bool, Bytes)])
 -> IO [(Maybe SocketAddr, Bool, Bytes)])
-> (Ptr Word8 -> IO [(Maybe SocketAddr, Bool, Bytes)])
-> IO [(Maybe SocketAddr, Bool, Bytes)]
forall a b. (a -> b) -> a -> b
$ \ Ptr Word8
_ -> do
        HasCallStack => UDP -> IO ()
UDP -> IO ()
checkUDPClosed UDP
udp

        UVSlot
bufArrSiz <- MutablePrimArray (PrimState IO) (Ptr Word8) -> IO UVSlot
forall (m :: * -> *) a.
(PrimMonad m, Prim a) =>
MutablePrimArray (PrimState m) a -> m UVSlot
getSizeofMutablePrimArray MutablePrimArray RealWorld (Ptr Word8)
MutablePrimArray (PrimState IO) (Ptr Word8)
rbufArr
        -- we have to reset the buffer size, during receiving it'll be overwritten
        [UVSlot] -> (UVSlot -> IO ()) -> IO ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
t a -> (a -> m b) -> m ()
forM_ [UVSlot
0..UVSlot
bufArrSizUVSlot -> UVSlot -> UVSlot
forall a. Num a => a -> a -> a
-UVSlot
1] ((UVSlot -> IO ()) -> IO ()) -> (UVSlot -> IO ()) -> IO ()
forall a b. (a -> b) -> a -> b
$ \ UVSlot
i -> do
            Ptr Word8
p <- MutablePrimArray (PrimState IO) (Ptr Word8)
-> UVSlot -> IO (Ptr Word8)
forall a (m :: * -> *).
(Prim a, PrimMonad m) =>
MutablePrimArray (PrimState m) a -> UVSlot -> m a
readPrimArray MutablePrimArray RealWorld (Ptr Word8)
MutablePrimArray (PrimState IO) (Ptr Word8)
rbufArr UVSlot
i
            Ptr Int32 -> Int32 -> IO ()
forall a. Storable a => Ptr a -> a -> IO ()
poke (Ptr Word8 -> Ptr Int32
forall a b. Ptr a -> Ptr b
castPtr Ptr Word8
p :: Ptr Int32) Int32
bufSiz

        MVar UVSlot
m <- UVManager -> UVSlot -> IO (MVar UVSlot)
getBlockMVar UVManager
uvm UVSlot
slot

        IO UDPFlag -> IO ()
forall a. (HasCallStack, Integral a) => IO a -> IO ()
throwUVIfMinus_ (IO UDPFlag -> IO ())
-> (IO UDPFlag -> IO UDPFlag) -> IO UDPFlag -> IO ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. UVManager -> IO UDPFlag -> IO UDPFlag
forall a. HasCallStack => UVManager -> IO a -> IO a
withUVManager' UVManager
uvm (IO UDPFlag -> IO ()) -> IO UDPFlag -> IO ()
forall a b. (a -> b) -> a -> b
$ do
            -- clean up
            Maybe UVSlot
_ <- MVar UVSlot -> IO (Maybe UVSlot)
forall a. MVar a -> IO (Maybe a)
tryTakeMVar MVar UVSlot
m
            Ptr UVHandle -> IO UDPFlag
hs_uv_udp_recv_start Ptr UVHandle
hdl

        UVSlot
r <- MVar UVSlot -> IO UVSlot
forall a. MVar a -> IO a
takeMVar MVar UVSlot
m IO UVSlot -> IO () -> IO UVSlot
forall a b. IO a -> IO b -> IO a
`onException` (do
                -- normally we call 'uv_udp_recv_stop' in C read callback
                -- but when exception raise, here's the place to stop
                -- stop a handle twice will be a libuv error, so we don't check result
                UDPFlag
_ <- UVManager -> IO UDPFlag -> IO UDPFlag
forall a. HasCallStack => UVManager -> IO a -> IO a
withUVManager' UVManager
uvm (Ptr UVHandle -> IO UDPFlag
uv_udp_recv_stop Ptr UVHandle
hdl)
                IO (Maybe UVSlot) -> IO ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (MVar UVSlot -> IO (Maybe UVSlot)
forall a. MVar a -> IO (Maybe a)
tryTakeMVar MVar UVSlot
m))

        [UVSlot]
-> (UVSlot -> IO (Maybe SocketAddr, Bool, Bytes))
-> IO [(Maybe SocketAddr, Bool, Bytes)]
forall (t :: * -> *) (m :: * -> *) a b.
(Traversable t, Monad m) =>
t a -> (a -> m b) -> m (t b)
forM [UVSlot
rUVSlot -> UVSlot -> UVSlot
forall a. Num a => a -> a -> a
+UVSlot
1..UVSlot
bufArrSizUVSlot -> UVSlot -> UVSlot
forall a. Num a => a -> a -> a
-UVSlot
1] ((UVSlot -> IO (Maybe SocketAddr, Bool, Bytes))
 -> IO [(Maybe SocketAddr, Bool, Bytes)])
-> (UVSlot -> IO (Maybe SocketAddr, Bool, Bytes))
-> IO [(Maybe SocketAddr, Bool, Bytes)]
forall a b. (a -> b) -> a -> b
$ \ UVSlot
i -> do
            Ptr Word8
p        <- MutablePrimArray (PrimState IO) (Ptr Word8)
-> UVSlot -> IO (Ptr Word8)
forall a (m :: * -> *).
(Prim a, PrimMonad m) =>
MutablePrimArray (PrimState m) a -> UVSlot -> m a
readPrimArray MutablePrimArray RealWorld (Ptr Word8)
MutablePrimArray (PrimState IO) (Ptr Word8)
rbufArr UVSlot
i
            -- see the buffer struct diagram above
            UVSlot
result   <- IO UVSlot -> IO UVSlot
forall a. (HasCallStack, Integral a) => IO a -> IO a
throwUVIfMinus (Int32 -> UVSlot
forall a b. (Integral a, Num b) => a -> b
fromIntegral (Int32 -> UVSlot) -> IO Int32 -> IO UVSlot
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Ptr Int32 -> IO Int32
forall a. Storable a => Ptr a -> IO a
peek @Int32 (Ptr Word8 -> Ptr Int32
forall a b. Ptr a -> Ptr b
castPtr Ptr Word8
p))
            Int32
flag     <- Ptr Int32 -> IO Int32
forall a. Storable a => Ptr a -> IO a
peek @Int32 (Ptr Any -> Ptr Int32
forall a b. Ptr a -> Ptr b
castPtr (Ptr Word8
p Ptr Word8 -> UVSlot -> Ptr Any
forall a b. Ptr a -> UVSlot -> Ptr b
`plusPtr` UVSlot
4))
            Int32
addrFlag <- Ptr Int32 -> IO Int32
forall a. Storable a => Ptr a -> IO a
peek @Int32 (Ptr Any -> Ptr Int32
forall a b. Ptr a -> Ptr b
castPtr (Ptr Word8
p Ptr Word8 -> UVSlot -> Ptr Any
forall a b. Ptr a -> UVSlot -> Ptr b
`plusPtr` UVSlot
8))
            !Maybe SocketAddr
addr <- if Int32
addrFlag Int32 -> Int32 -> Bool
forall a. Eq a => a -> a -> Bool
== Int32
1
                then SocketAddr -> Maybe SocketAddr
forall a. a -> Maybe a
Just (SocketAddr -> Maybe SocketAddr)
-> IO SocketAddr -> IO (Maybe SocketAddr)
forall (m :: * -> *) a b. Monad m => (a -> b) -> m a -> m b
<$!> HasCallStack => Ptr SocketAddr -> IO SocketAddr
Ptr SocketAddr -> IO SocketAddr
peekSocketAddr (Ptr Any -> Ptr SocketAddr
forall a b. Ptr a -> Ptr b
castPtr (Ptr Word8
p Ptr Word8 -> UVSlot -> Ptr Any
forall a b. Ptr a -> UVSlot -> Ptr b
`plusPtr` UVSlot
12))
                else Maybe SocketAddr -> IO (Maybe SocketAddr)
forall (m :: * -> *) a. Monad m => a -> m a
return Maybe SocketAddr
forall a. Maybe a
Nothing
            let !partial :: Bool
partial = Int32
flag Int32 -> Int32 -> Int32
forall a. Bits a => a -> a -> a
.&. Int32
UV_UDP_PARTIAL Int32 -> Int32 -> Bool
forall a. Eq a => a -> a -> Bool
/= Int32
0
            MutablePrimArray RealWorld Word8
mba <- UVSlot -> IO (MutablePrimArray (PrimState IO) Word8)
forall (m :: * -> *) a.
(PrimMonad m, Prim a) =>
UVSlot -> m (MutablePrimArray (PrimState m) a)
A.newPrimArray UVSlot
result
            MutablePrimArray (PrimState IO) Word8
-> UVSlot -> Ptr Word8 -> UVSlot -> IO ()
forall (m :: * -> *) a.
(PrimMonad m, Prim a) =>
MutablePrimArray (PrimState m) a
-> UVSlot -> Ptr a -> UVSlot -> m ()
copyPtrToMutablePrimArray MutablePrimArray RealWorld Word8
MutablePrimArray (PrimState IO) Word8
mba UVSlot
0 (Ptr Word8
p Ptr Word8 -> UVSlot -> Ptr Word8
forall a b. Ptr a -> UVSlot -> Ptr b
`plusPtr` UVSlot
140) UVSlot
result
            PrimArray Word8
ba <- MutablePrimArray (PrimState IO) Word8 -> IO (PrimArray Word8)
forall (m :: * -> *) a.
PrimMonad m =>
MutablePrimArray (PrimState m) a -> m (PrimArray a)
A.unsafeFreezePrimArray MutablePrimArray RealWorld Word8
MutablePrimArray (PrimState IO) Word8
mba
            (Maybe SocketAddr, Bool, Bytes)
-> IO (Maybe SocketAddr, Bool, Bytes)
forall (m :: * -> *) a. Monad m => a -> m a
return (Maybe SocketAddr
addr, Bool
partial, PrimArray Word8 -> UVSlot -> UVSlot -> Bytes
forall a. PrimArray a -> UVSlot -> UVSlot -> PrimVector a
V.PrimVector PrimArray Word8
ba UVSlot
0 UVSlot
result)