-- | UDP sockets on top of the libuv bindings in "Z.IO.UV.FFI" and "Z.IO.UV.Manager".
--
-- The module exports a resource-managed 'UDP' socket ('initUDP'), sending on
-- plain and connected sockets, batched receiving, and socket\/multicast options.
module Z.IO.Network.UDP (
  -- * UDP socket
    UDP
  , initUDP
  , UDPConfig(..)
  , defaultUDPConfig
  , sendUDP
  , UDPRecvConfig(..)
  , defaultUDPRecvConfig
  , recvUDPLoop
  , recvUDP
  , getSockName
  -- * Connected UDP socket
  , ConnectedUDP
  , connectUDP
  , disconnectUDP
  , getPeerName
  , sendConnectedUDP
  -- * Multicast and socket options
  , setMembership
  , setSourceMembership
  , setMulticastLoop
  , setMulticastTTL
  , setMulticastInterface
  , setBroadcast
  , setTTL
  -- * Constants
  -- ** UDPFlag
  , UDPFlag
  , pattern UDP_DEFAULT
  , pattern UDP_IPV6ONLY
  , pattern UDP_REUSEADDR
  -- ** Membership
  , Membership
  , pattern JOIN_GROUP
  , pattern LEAVE_GROUP
  ) where
import Control.Concurrent
import Control.Monad
import Data.Primitive.PrimArray as A
import Data.IORef
import Data.Word
import Data.Int
import Data.Bits ((.&.))
import Foreign.Storable (peek, poke)
import Foreign.Ptr (plusPtr)
import Foreign.C
import Z.Data.Array as A
import Z.Data.Vector.Base as V
import Z.Data.Vector.Extra as V
import Z.Data.CBytes as CBytes
import Z.IO.Network.SocketAddr
import Z.Foreign
import Z.IO.UV.Errno (pattern UV_EMSGSIZE)
import Z.IO.UV.FFI
import Z.IO.UV.Manager
import Z.IO.Exception
import Z.IO.Resource
-- | A UDP socket together with the state needed to drive it.
--
-- Values are created by 'initUDP'.
data UDP = UDP
    { udpHandle :: {-# UNPACK #-} !(Ptr UVHandle)
        -- ^ handle passed to the @uv_udp_*@ functions
    , udpSlot :: {-# UNPACK #-} !UVSlot
        -- ^ slot obtained from the 'UVManager' for this handle
    , udpManager :: UVManager
        -- ^ manager the handle was allocated under
    , udpSendBuffer :: {-# UNPACK #-} !(A.MutablePrimArray RealWorld Word8)
        -- ^ pinned buffer that outgoing messages are copied into before sending
    , udpClosed :: {-# UNPACK #-} !(IORef Bool)
        -- ^ set to 'True' when the resource created by 'initUDP' is released
    }
-- | Renders the handle, slot and manager; the send buffer and the closed flag
-- are left out.
instance Show UDP where
    show (UDP uvHdl uvSlot manager _ _) = concat
        [ "UDP{udpHandle=", show uvHdl
        , ",udpSlot=", show uvSlot
        , ",udpManager=", show manager
        , "}"
        ]
-- | Options consumed by 'initUDP'.
data UDPConfig = UDPConfig
    { udpSendMsgSize :: {-# UNPACK #-} !Int
        -- ^ size of the send buffer allocated by 'initUDP'; 'sendUDP' and
        --   'sendConnectedUDP' reject longer messages with @UV_EMSGSIZE@
    , udpLocalAddr :: Maybe (SocketAddr, UDPFlag)
        -- ^ address and flags to bind right after the handle is created;
        --   'Nothing' skips the bind call
    } deriving (Show, Eq, Ord)
-- | Default configuration, equal to @UDPConfig 512 Nothing@: a 512 element
-- send buffer and no local bind.
defaultUDPConfig :: UDPConfig
defaultUDPConfig = UDPConfig
    { udpSendMsgSize = 512
    , udpLocalAddr = Nothing
    }
-- | Create a UDP socket as a 'Resource'.
--
-- Acquisition allocates and initialises a handle under the current UV manager,
-- binds it when 'udpLocalAddr' is a 'Just', and allocates a pinned send buffer
-- of 'udpSendMsgSize' elements (negative sizes are treated as 0).
--
-- Release closes the handle unless the closed flag is already set, and sets
-- that flag.
--
-- Failures of @uv_udp_init@ or @uv_udp_bind@ are raised through
-- 'throwUVIfMinus_'; in both cases the handle is released before the exception
-- propagates.
initUDP :: HasCallStack
        => UDPConfig
        -> Resource UDP
initUDP (UDPConfig sbsiz maddr) = initResource
    (do uvm <- getUVManager
        (hdl, slot) <- withUVManager uvm $ \ loop -> do
            hdl <- hs_uv_handle_alloc loop
            slot <- getUVSlot uvm (peekUVHandleData hdl)
            -- empty the slot's blocking MVar in case a value was left behind
            _ <- tryTakeMVar =<< getBlockMVar uvm slot
            -- the handle is not initialised yet, so on failure it is only freed
            throwUVIfMinus_ (uv_udp_init loop hdl)
                `onException` hs_uv_handle_free hdl
            return (hdl, slot)

        -- From here on the handle is initialised. If binding throws, the
        -- acquire action aborts before a 'UDP' value exists for the release
        -- action to work on, so the handle has to be closed right here or it
        -- would leak.
        forM_ maddr (\ (addr, flag) ->
            withSocketAddrUnsafe addr $ \ p ->
                throwUVIfMinus_ (uv_udp_bind hdl p flag))
            `onException` withUVManager' uvm (hs_uv_handle_close hdl)

        sbuf <- A.newPinnedPrimArray (max 0 sbsiz)
        closed <- newIORef False
        return (UDP hdl slot uvm sbuf closed))
    (\ (UDP hdl _ uvm _ closed) -> withUVManager' uvm $ do
        c <- readIORef closed
        -- close at most once, even if release runs again
        unless c $ writeIORef closed True >> hs_uv_handle_close hdl)
-- | Raise via 'throwECLOSED' when the socket's closed flag is set; otherwise
-- do nothing.
checkUDPClosed :: HasCallStack => UDP -> IO ()
checkUDPClosed udp =
    readIORef (udpClosed udp) >>= \ isClosed ->
        when isClosed throwECLOSED
-- | Query the socket's local address with @uv_udp_getsockname@.
--
-- Throws if the socket is already closed or if the libuv call fails.
getSockName :: HasCallStack => UDP -> IO SocketAddr
getSockName udp = do
    checkUDPClosed udp
    withSocketAddrStorageUnsafe $ \ addrStorage ->
        void . withPrimUnsafe storageLen $ \ lenRef ->
            throwUVIfMinus_ (uv_udp_getsockname (udpHandle udp) addrStorage lenRef)
  where
    -- size of the address storage handed to libuv as the in/out length
    storageLen = fromIntegral sizeOfSocketAddrStorage :: CInt
-- | Wrapper around a 'UDP' socket returned by 'connectUDP'; 'disconnectUDP'
-- unwraps it again.
newtype ConnectedUDP = ConnectedUDP UDP deriving Show
-- | Call @uv_udp_connect@ with the given address and wrap the socket as a
-- 'ConnectedUDP'.
--
-- Throws if the socket is already closed or if the libuv call fails.
connectUDP :: HasCallStack => UDP -> SocketAddr -> IO ConnectedUDP
connectUDP udp addr = do
    checkUDPClosed udp
    withSocketAddrUnsafe addr $ \ peerAddr ->
        throwUVIfMinus_ $ uv_udp_connect (udpHandle udp) peerAddr
    pure (ConnectedUDP udp)
-- | Call @uv_udp_disconnect@ on a connected socket and hand back the
-- underlying 'UDP'.
--
-- Throws if the socket is already closed or if the libuv call fails.
disconnectUDP :: HasCallStack => ConnectedUDP -> IO UDP
disconnectUDP (ConnectedUDP udp) = do
    checkUDPClosed udp
    throwUVIfMinus_ $ uv_udp_disconnect (udpHandle udp) nullPtr
    pure udp
-- | Query the peer address of a connected socket with @uv_udp_getpeername@.
--
-- Throws if the socket is already closed or if the libuv call fails.
getPeerName :: HasCallStack => ConnectedUDP -> IO SocketAddr
getPeerName (ConnectedUDP udp) = do
    checkUDPClosed udp
    withSocketAddrStorageUnsafe $ \ addrStorage ->
        void . withPrimUnsafe storageLen $ \ lenRef ->
            throwUVIfMinus_ (uv_udp_getpeername (udpHandle udp) addrStorage lenRef)
  where
    -- size of the address storage handed to libuv as the in/out length
    storageLen = fromIntegral sizeOfSocketAddrStorage :: CInt
-- | Send one message on a connected socket.
--
-- The message is copied into the socket's pinned send buffer first; if it does
-- not fit, the error for @UV_EMSGSIZE@ is raised. The call runs under 'mask_'
-- and waits for the send result under 'uninterruptibleMask_'.
sendConnectedUDP :: HasCallStack => ConnectedUDP -> V.Bytes -> IO ()
sendConnectedUDP (ConnectedUDP udp@(UDP hdl _ uvm sbuf _)) (V.PrimVector msg off len) = mask_ $ do
    checkUDPClosed udp
    capacity <- getSizeofMutablePrimArray sbuf
    unless (len <= capacity) $ throwUVIfMinus_ (return UV_EMSGSIZE)
    copyPrimArray sbuf 0 msg off len
    withMutablePrimArrayContents sbuf $ \ bufPtr -> do
        done <- withUVManager' uvm $ do
            sendSlot <- getUVSlot uvm (hs_uv_udp_send_connected hdl bufPtr len)
            doneVar <- getBlockMVar uvm sendSlot
            -- discard any value left in the slot's MVar before waiting on it
            void (tryTakeMVar doneVar)
            return doneVar
        -- block, uninterruptibly, until the send result is delivered
        throwUVIfMinus_ . uninterruptibleMask_ $ takeMVar done
-- | Send one message to the given address.
--
-- The message is copied into the socket's pinned send buffer first; if it does
-- not fit, the error for @UV_EMSGSIZE@ is raised. The call runs under 'mask_'
-- and waits for the send result under 'uninterruptibleMask_'.
sendUDP :: HasCallStack => UDP -> SocketAddr -> V.Bytes -> IO ()
sendUDP udp@(UDP hdl _ uvm sbuf _) addr (V.PrimVector msg off len) = mask_ $ do
    checkUDPClosed udp
    capacity <- getSizeofMutablePrimArray sbuf
    unless (len <= capacity) $ throwUVIfMinus_ (return UV_EMSGSIZE)
    copyPrimArray sbuf 0 msg off len
    withMutablePrimArrayContents sbuf $ \ bufPtr -> do
        done <- withUVManager' uvm $ do
            sendSlot <- withSocketAddrUnsafe addr $ \ destAddr ->
                getUVSlot uvm (hs_uv_udp_send hdl destAddr bufPtr len)
            doneVar <- getBlockMVar uvm sendSlot
            -- discard any value left in the slot's MVar before waiting on it
            void (tryTakeMVar doneVar)
            return doneVar
        -- block, uninterruptibly, until the send result is delivered
        throwUVIfMinus_ . uninterruptibleMask_ $ takeMVar done
-- | Switch the multicast loop option on ('True', passed as 1) or off
-- ('False', passed as 0) via @uv_udp_set_multicast_loop@.
setMulticastLoop :: HasCallStack => UDP -> Bool -> IO ()
setMulticastLoop udp enable = do
    checkUDPClosed udp
    throwUVIfMinus_ $ uv_udp_set_multicast_loop (udpHandle udp) onOff
  where
    onOff
        | enable = 1
        | otherwise = 0
-- | Set the multicast TTL via @uv_udp_set_multicast_ttl@.
--
-- The value is first passed through @V.rangeCut ttl 1 255@.
setMulticastTTL :: HasCallStack => UDP -> Int -> IO ()
setMulticastTTL udp ttl = do
    checkUDPClosed udp
    throwUVIfMinus_ $
        uv_udp_set_multicast_ttl (udpHandle udp) (fromIntegral boundedTTL)
  where
    boundedTTL = V.rangeCut ttl 1 255
-- | Pass an interface address string to @uv_udp_set_multicast_interface@.
setMulticastInterface :: HasCallStack => UDP -> CBytes ->IO ()
setMulticastInterface udp iaddr = do
    checkUDPClosed udp
    withCBytesUnsafe iaddr $ \ ifaceAddr ->
        throwUVIfMinus_ $
            uv_udp_set_multicast_interface (udpHandle udp) ifaceAddr
-- | Switch broadcast on ('True', passed as 1) or off ('False', passed as 0)
-- via @uv_udp_set_broadcast@.
setBroadcast :: HasCallStack => UDP -> Bool -> IO ()
setBroadcast udp b = do
    checkUDPClosed udp
    -- fromEnum maps False to 0 and True to 1
    throwUVIfMinus_ $
        uv_udp_set_broadcast (udpHandle udp) (fromIntegral (fromEnum b))
-- | Set the time to live via @uv_udp_set_ttl@.
--
-- The value is handed to libuv as is (no clamping, unlike 'setMulticastTTL').
setTTL :: HasCallStack
       => UDP
       -> Int       -- ^ ttl value, converted with 'fromIntegral'
       -> IO ()
setTTL udp ttl =
    checkUDPClosed udp
        >> throwUVIfMinus_ (uv_udp_set_ttl (udpHandle udp) (fromIntegral ttl))
-- | Change multicast group membership via @uv_udp_set_membership@.
setMembership :: HasCallStack
              => UDP
              -> CBytes       -- ^ first address argument (named @gaddr@ here)
              -> CBytes       -- ^ second address argument (named @iaddr@ here)
              -> Membership   -- ^ e.g. 'JOIN_GROUP' or 'LEAVE_GROUP'
              -> IO ()
setMembership udp gaddr iaddr member = do
    checkUDPClosed udp
    withCBytesUnsafe gaddr $ \ groupAddr ->
        withCBytesUnsafe iaddr $ \ ifaceAddr ->
            throwUVIfMinus_ $
                uv_udp_set_membership (udpHandle udp) groupAddr ifaceAddr member
-- | Change source-specific multicast membership via
-- @uv_udp_set_source_membership@.
setSourceMembership :: HasCallStack
                    => UDP
                    -> CBytes       -- ^ first address argument (named @gaddr@ here)
                    -> CBytes       -- ^ second address argument (named @iaddr@ here)
                    -> CBytes       -- ^ third address argument (named @source@ here)
                    -> Membership   -- ^ e.g. 'JOIN_GROUP' or 'LEAVE_GROUP'
                    -> IO ()
setSourceMembership udp gaddr iaddr source member = do
    checkUDPClosed udp
    withCBytesUnsafe gaddr $ \ groupAddr ->
        withCBytesUnsafe iaddr $ \ ifaceAddr ->
            withCBytesUnsafe source $ \ sourceAddr ->
                throwUVIfMinus_ $
                    uv_udp_set_source_membership
                        (udpHandle udp) groupAddr ifaceAddr sourceAddr member
-- | Buffer sizing used by 'recvUDP' and 'recvUDPLoop'.
data UDPRecvConfig = UDPRecvConfig
    { recvMsgSize :: {-# UNPACK #-} !Int32
        -- ^ payload bytes reserved per message; each receive cell is this
        --   size (at least 0) plus a 140 byte header
    , recvBatchSize :: {-# UNPACK #-} !Int
        -- ^ number of receive cells allocated (at least 1)
    }
-- | Default receive configuration, equal to @UDPRecvConfig 512 6@.
defaultUDPRecvConfig :: UDPRecvConfig
defaultUDPRecvConfig = UDPRecvConfig
    { recvMsgSize = 512
    , recvBatchSize = 6
    }
-- | Allocate the receive storage and its pointer table.
--
-- The first array is one pinned block holding @max 1 bufArrSiz@ cells of
-- @140 + max 0 bufSiz@ bytes each. The second is a pinned array with one
-- pointer per cell, where entry @i@ points at the start of cell @i@.
newRecvBuf :: Int32 -> Int -> IO (A.MutablePrimArray RealWorld Word8, A.MutablePrimArray RealWorld (Ptr Word8))
newRecvBuf bufSiz bufArrSiz = do
    storage <- A.newPinnedPrimArray (cellBytes * cellCount)
    table <- A.newPinnedPrimArray cellCount
    withMutablePrimArrayContents storage $ \ base -> do
        -- successive cell start addresses: base, base + cellBytes, ...
        let cellPtrs = take cellCount (iterate (`plusPtr` cellBytes) base)
        zipWithM_ (writePrimArray table) [0 ..] cellPtrs
    return (storage, table)
  where
    -- 140 header bytes in front of every payload; the sum is formed in Int32
    cellBytes = fromIntegral (140 + max 0 bufSiz) :: Int
    cellCount = max 1 bufArrSiz
-- | Receive messages forever, handing each one to @worker@.
--
-- The receive buffers are allocated once and reused for every round. Each
-- delivered triple holds the sender address when one was recorded, a flag that
-- is 'True' when @UV_UDP_PARTIAL@ was set for the message, and the payload.
--
-- The loop only ends through an exception (for example when the socket is
-- closed); the check handle allocated here is released by 'bracket' in that case.
--
-- Non-positive 'recvBatchSize' and negative 'recvMsgSize' values are handled
-- the same way 'newRecvBuf' handles them: at least one cell, at least 0 bytes.
recvUDPLoop :: HasCallStack
            => UDPRecvConfig
            -> UDP
            -> ((Maybe SocketAddr, Bool, V.Bytes) -> IO a)
            -> IO ()
recvUDPLoop (UDPRecvConfig bufSiz bufArrSiz) udp@(UDP hdl slot uvm _ _) worker = do
    bracket
        (throwOOMIfNull $ hs_uv_udp_check_alloc hdl)
        hs_uv_udp_check_close $
        \ check -> do
            buf@(_, rbufArr) <- newRecvBuf bufSiz bufArrSiz
            -- 'newRecvBuf' clamps the batch size, so take the last index from
            -- the array that was really allocated, not from the raw config value
            lastIdx <- subtract 1 <$> getSizeofMutablePrimArray rbufArr
            let -- same clamp 'newRecvBuf' applied when sizing each cell
                msgSiz = max 0 bufSiz
                -- point the manager's buffer table at the cell pointer array
                resetBufferTable = withMutablePrimArrayContents rbufArr $ \ p ->
                    pokeBufferTable uvm slot (castPtr p) lastIdx
            resetBufferTable
            -- kept after the buffer table is populated, as in the original ordering
            throwUVIfMinus_ $ hs_uv_udp_check_init check
            forever $ do
                msgs <- recvUDPWith udp check buf msgSiz
                -- re-arm the table for the next round before running user code
                resetBufferTable
                forM_ msgs worker
-- | Receive one batch of messages.
--
-- Fresh receive buffers and a check handle are allocated for the call; the
-- check handle is released by 'bracket' afterwards. Each returned triple holds
-- the sender address when one was recorded, a flag that is 'True' when
-- @UV_UDP_PARTIAL@ was set for the message, and the payload.
--
-- Non-positive 'recvBatchSize' and negative 'recvMsgSize' values are handled
-- the same way 'newRecvBuf' handles them: at least one cell, at least 0 bytes.
recvUDP :: HasCallStack => UDPRecvConfig -> UDP -> IO [(Maybe SocketAddr, Bool, V.Bytes)]
recvUDP (UDPRecvConfig bufSiz bufArrSiz) udp@(UDP hdl slot uvm _ _) = do
    bracket
        (throwOOMIfNull $ hs_uv_udp_check_alloc hdl)
        hs_uv_udp_check_close $
        \ check -> do
            buf@(_, rbufArr) <- newRecvBuf bufSiz bufArrSiz
            -- 'newRecvBuf' clamps the batch size, so take the last index from
            -- the array that was really allocated, not from the raw config value
            lastIdx <- subtract 1 <$> getSizeofMutablePrimArray rbufArr
            withMutablePrimArrayContents rbufArr $ \ p ->
                pokeBufferTable uvm slot (castPtr p) lastIdx
            -- kept after the buffer table is populated, as in the original ordering
            throwUVIfMinus_ $ hs_uv_udp_check_init check
            -- pass the same clamped message size 'newRecvBuf' used for the cells
            recvUDPWith udp check buf (max 0 bufSiz)
-- | Run one receive round on already-allocated buffers and decode the result.
--
-- Cell layout as read below (offsets in bytes from the cell pointer):
-- 0 = 'Int32' result\/size, 4 = 'Int32' flags, 8 = 'Int32' address-present
-- marker, 12 = socket address, 140 = payload.
--
-- NOTE(review): the @check@ argument is not used in this body.
recvUDPWith :: HasCallStack
            => UDP
            -> Ptr UVHandle
            -> (A.MutablePrimArray RealWorld Word8, A.MutablePrimArray RealWorld (Ptr Word8))
            -> Int32
            -> IO [(Maybe SocketAddr, Bool, V.Bytes)]
recvUDPWith udp@(UDP hdl slot uvm _ _) check (rubf, rbufArr) bufSiz =
    -- The storage pointer itself is ignored; presumably the wrapper is only
    -- here to keep the backing storage alive while the cell pointers are in use.
    mask_ . withMutablePrimArrayContents rubf $ \ _ -> do
        checkUDPClosed udp
        bufArrSiz <- getSizeofMutablePrimArray rbufArr
        -- write the buffer size into the first Int32 of every cell; the same
        -- field is read back as the per-message result further down
        forM_ [0..bufArrSiz-1] $ \ i -> do
            p <- readPrimArray rbufArr i
            poke (castPtr p :: Ptr Int32) bufSiz
        m <- getBlockMVar uvm slot
        throwUVIfMinus_ . withUVManager' uvm $ do
            -- discard any value left in the slot's MVar before starting
            _ <- tryTakeMVar m
            hs_uv_udp_recv_start hdl
        -- wait for the round to finish; if an exception arrives while waiting,
        -- stop receiving and drain the MVar
        r <- takeMVar m `onException` (do
                throwUVIfMinus_ $ withUVManager' uvm (uv_udp_recv_stop hdl)
                void (tryTakeMVar m))
        -- cells r+1 .. last are decoded as received messages
        forM [r+1..bufArrSiz-1] $ \ i -> do
            p <- readPrimArray rbufArr i
            -- a negative result is raised as an error, otherwise it is the payload length
            result <- throwUVIfMinus (fromIntegral <$> peek @Int32 (castPtr p))
            flag <- peek @Int32 (castPtr (p `plusPtr` 4))
            addrFlag <- peek @Int32 (castPtr (p `plusPtr` 8))
            !addr <- if addrFlag == 1
                then Just <$!> peekSocketAddr (castPtr (p `plusPtr` 12))
                else return Nothing
            let !partial = flag .&. UV_UDP_PARTIAL /= 0
            -- copy the payload out of the shared cell into a fresh array
            mba <- A.newPrimArray result
            copyPtrToMutablePrimArray mba 0 (p `plusPtr` 140) result
            ba <- A.unsafeFreezePrimArray mba
            return (addr, partial, V.PrimVector ba 0 result)