{-# language BangPatterns #-}
{-# language LambdaCase #-}
{-# language MagicHash #-}
{-# language MultiWayIf #-}
{-# language UnboxedTuples #-}
module Stream.Receive.Indefinite
( receiveExactly
, receiveOnce
, receiveBetween
) where
import Control.Monad (when)
import Control.Concurrent.STM (TVar)
import Foreign.C.Error (Errno(..), eAGAIN, eWOULDBLOCK, eCONNRESET, eHOSTUNREACH)
import Foreign.C.Types (CSize)
import GHC.Exts (RealWorld,State#,Int(I#),Int#)
import GHC.IO (IO(IO))
import Socket.Error (die)
import Socket.EventManager (Token)
import Socket.Stream (ReceiveException(..),Connection(..))
import Socket.Buffer (Buffer)
import Socket.Interrupt (Interrupt,Intr,wait,tokenToStreamReceiveException)
import System.Posix.Types (Fd)
import qualified Foreign.C.Error.Describe as D
import qualified Socket.EventManager as EM
import qualified Socket.Buffer as Buffer
import qualified Stream.Receive as Receive
-- | Receive until the receive loop reports that the whole buffer has
-- been satisfied.
--
-- The loop is started with a minimum equal to @Buffer.length buf@ and
-- a running total of zero. On success the byte count produced by the
-- loop is discarded and @Right ()@ is returned; any receive exception
-- is passed through unchanged.
receiveExactly ::
     Interrupt
  -> Connection
  -> Buffer
  -> IO (Either (ReceiveException Intr) ())
receiveExactly !intr (Connection !fd) !buf = do
  let !manager = EM.manager
  !readiness <- EM.reader manager fd
  box (receiveLoop intr fd readiness buf (Buffer.length buf) 0) >>= \case
    Left err -> pure (Left err)
    -- The total is dropped: the caller asked for the entire buffer.
    Right _ -> pure (Right ())
-- | Receive at least one byte into the buffer.
--
-- The receive loop is started with a minimum of one byte and a running
-- total of zero. The result is the number of bytes received, or the
-- exception produced by the loop.
--
-- NOTE(review): the minimum of one is used regardless of
-- @Buffer.length buf@. With a zero-length buffer the underlying receive
-- presumably reports zero bytes, which the loop treats as a shutdown;
-- confirm whether callers may pass an empty buffer.
receiveOnce ::
     Interrupt
  -> Connection
  -> Buffer
  -> IO (Either (ReceiveException Intr) Int)
receiveOnce !intr (Connection !fd) !buf = do
  let !manager = EM.manager
  !readiness <- EM.reader manager fd
  box (receiveLoop intr fd readiness buf 1 0)
-- | Receive at least @minLen@ bytes into the buffer, and possibly more.
--
-- The receive loop is started with the given minimum and a running
-- total of zero; the result is the number of bytes received or the
-- exception produced by the loop.
--
-- Preconditions, checked before any receive is attempted:
--
-- * @minLen@ must not be negative.
-- * @minLen@ must not exceed @Buffer.length buf@.
--
-- Violating either precondition calls 'die' with a message naming the
-- violated condition. An empty buffer with @minLen == 0@ yields
-- @Right 0@ without touching the event manager.
receiveBetween ::
     Interrupt
  -> Connection
  -> Buffer
  -> Int
  -> IO (Either (ReceiveException Intr) Int)
receiveBetween !intr (Connection !conn) !buf !minLen
  | minLen < 0 =
      die "Socket.Stream.IPv4.receive: negative slice length"
  | minLen > bufLen =
      -- Also covers an empty buffer paired with a positive minimum.
      die "Socket.Stream.IPv4.receive: minimum length exceeds buffer length"
  | bufLen == 0 = pure (Right 0)
  | otherwise = do
      -- Here 0 <= minLen <= bufLen and bufLen > 0.
      let !mngr = EM.manager
      !tv <- EM.reader mngr conn
      box $ receiveLoop intr conn tv buf minLen 0
  where
  bufLen = Buffer.length buf
-- | The unwrapped form of the receive loop's result: a function from a
-- @State# RealWorld@ token to a new token paired with an unboxed sum
-- that holds either a receive exception or the byte count as an 'Int#'.
-- 'box' and 'unbox' convert between this and
-- @IO (Either (ReceiveException Intr) Int)@.
type Result# = State# RealWorld -> (# State# RealWorld, (# ReceiveException Intr | Int# #) #)
-- | Strip the 'IO' wrapper from an action and re-encode its 'Either'
-- result as an unboxed sum: @Left@ becomes the first alternative and
-- @Right@ becomes the second, carrying the raw 'Int#'.
unbox :: IO (Either (ReceiveException Intr) Int) -> Result#
{-# inline unbox #-}
unbox (IO action) = \st0 ->
  case action st0 of
    (# st1, Left err #) -> (# st1, (# err | #) #)
    (# st1, Right (I# n) #) -> (# st1, (# | n #) #)
-- | Wrap a state-passing result function back into 'IO', turning the
-- unboxed sum into an 'Either': the first alternative becomes @Left@
-- and the second becomes @Right@ with the 'Int#' reboxed as an 'Int'.
box :: Result# -> IO (Either (ReceiveException Intr) Int)
box run = IO $ \st0 ->
  case run st0 of
    (# st1, (# err | #) #) -> (# st1, Left err #)
    (# st1, (# | n #) #) -> (# st1, Right (I# n) #)
-- | Receive repeatedly until the outstanding minimum has been met.
--
-- Arguments, in order: the interrupt, the file descriptor, the
-- readiness variable obtained from the event manager, the buffer still
-- to be filled, the number of bytes still required, and the number of
-- bytes received so far.
--
-- Once the required count is zero or less, the running total is
-- returned. Otherwise each iteration:
--
-- 1. waits on the readiness variable (subject to the interrupt) and
--    returns early if the resulting token converts to an exception;
-- 2. performs a single receive into the buffer;
-- 3. on @EAGAIN@/@EWOULDBLOCK@, marks the descriptor unready and
--    retries with the same arguments;
-- 4. on @ECONNRESET@ or @EHOSTUNREACH@, returns the matching
--    exception; any other error code is reported through 'die';
-- 5. on a zero-byte receive, returns 'ReceiveShutdown';
-- 6. otherwise advances the buffer, reduces the requirement, grows the
--    total, and loops.
receiveLoop ::
     Interrupt
  -> Fd
  -> TVar Token
  -> Buffer
  -> Int
  -> Int
  -> Result#
receiveLoop !intr !conn !tv !buf !minLen !total
  | minLen <= 0 = unbox (pure (Right total))
  | otherwise = unbox $ do
      let !capacity = Buffer.length buf
      !token <- wait intr tv
      case tokenToStreamReceiveException token total of
        Left interrupted -> pure (Left interrupted)
        Right _ -> do
          outcome <- Receive.receiveOnce conn buf
          case outcome of
            Left errno
              | errno == eAGAIN || errno == eWOULDBLOCK -> do
                  EM.persistentUnready token tv
                  box (receiveLoop intr conn tv buf minLen total)
              | errno == eCONNRESET -> pure (Left ReceiveReset)
              | errno == eHOSTUNREACH -> pure (Left ReceiveHostUnreachable)
              | otherwise ->
                  die ("Socket.Stream.IPv4.receive: " ++ describeErrorCode errno)
            Right received
              | received == 0 -> pure (Left ReceiveShutdown)
              | otherwise -> do
                  let byteCount = csizeToInt received
                  -- A receive that did not fill the buffer marks the
                  -- descriptor unready; presumably a short read means
                  -- no more data is pending (confirm against the event
                  -- manager's contract).
                  when (byteCount < capacity) (EM.unready token tv)
                  box $ receiveLoop intr conn tv
                    (Buffer.advance buf byteCount)
                    (minLen - byteCount)
                    (total + byteCount)
-- | Convert a 'CSize' byte count to an 'Int' using 'fromIntegral'.
csizeToInt :: CSize -> Int
csizeToInt sz = fromIntegral sz
-- | Render an 'Errno' for an error message: the text produced by
-- @D.string@ followed by the numeric code in parentheses.
describeErrorCode :: Errno -> String
describeErrorCode errno@(Errno code) =
  concat ["error code ", D.string errno, " (", show code, ")"]