#include "inline.hs"

-- |
-- Module      : Streamly.Internal.Network.Socket
-- Copyright   : (c) 2018 Composewell Technologies
--
-- License     : BSD3
-- Maintainer  : streamly@composewell.com
-- Stability   : experimental
-- Portability : GHC
--
module Streamly.Internal.Network.Socket
    (
    SockSpec (..)
    -- * Use a socket
    , forSocketM
    , withSocket

    -- * Accept connections
    , accept
    , connections
    , connect
    , connectFrom

    -- * Read from connection
    , read
    , readWithBufferOf
    -- , readUtf8
    -- , readLines
    -- , readFrames
    -- , readByChunks

    -- -- * Array Read
    -- , readArrayUpto
    -- , readChunksUpto
    , readChunk
    , readChunks
    , readChunksWithBufferOf

    , toChunksWithBufferOf
    , toChunks
    , toBytes

    -- * Write to connection
    , write
    -- , writeUtf8
    -- , writeUtf8ByLines
    -- , writeByFrames
    , writeWithBufferOf
    , writeMaybesWithBufferOf

    , putChunks
    , putBytesWithBufferOf
    , putBytes

    -- -- * Array Write
    , writeChunk
    , writeChunks
    , writeChunksWithBufferOf

    -- reading/writing datagrams
    )
where

import Control.Concurrent (threadWaitWrite, rtsSupportsBoundThreads)
import Control.Exception (bracketOnError, onException)
import Control.Monad.Catch (MonadCatch, finally, MonadMask)
import Control.Monad.IO.Class (MonadIO(..))
import Control.Monad (forM_, when)
import Data.Maybe (isNothing, fromJust)
import Data.Word (Word8)
import Foreign.Ptr (minusPtr, plusPtr, Ptr, castPtr)
import Foreign.Storable (Storable(..))
import GHC.ForeignPtr (mallocPlainForeignPtrBytes)
import Network.Socket
       (Socket, SocketOption(..), Family(..), SockAddr(..),
        ProtocolNumber, withSocketsDo, SocketType(..), socket, bind,
        setSocketOption, sendBuf, recvBuf)
#if MIN_VERSION_network(3,1,0)
import Network.Socket (withFdSocket)
#else
import Network.Socket (fdSocket)
#endif
import Prelude hiding (read)

import qualified Network.Socket as Net

import Streamly.Internal.BaseCompat
import Streamly.Internal.Control.Concurrent (MonadAsync)
import Streamly.Internal.Data.Array.Foreign.Mut.Type
    (fromForeignPtrUnsafe, touch)
import Streamly.Internal.Data.Array.Foreign.Type (Array(..))
import Streamly.Internal.Data.Array.Stream.Foreign (lpackArraysChunksOf)
import Streamly.Internal.Data.Fold (Fold)
import Streamly.Internal.Data.Stream.IsStream.Type
    (IsStream, mkStream, fromStreamD)
import Streamly.Internal.Data.Stream.Serial (SerialT)
import Streamly.Internal.Data.Unfold.Type (Unfold(..))
-- import Streamly.String (encodeUtf8, decodeUtf8, foldLines)
import Streamly.Internal.System.IO (defaultChunkSize)

import qualified Streamly.Internal.Data.Array.Foreign as A
import qualified Streamly.Internal.Data.Array.Foreign.Type as A
import qualified Streamly.Internal.Data.Array.Stream.Foreign as AS
import qualified Streamly.Internal.Data.Fold as FL
import qualified Streamly.Internal.Data.Stream.IsStream as S
import qualified Streamly.Internal.Data.Stream.StreamD.Type as D
import qualified Streamly.Internal.Data.Unfold as UF

-- | @'forSocketM' action socket@ runs the monadic computation @action@ passing
-- the socket to it.  The socket is closed on exit from 'forSocketM', whether
-- by normal termination or by raising an exception.  If closing the socket
-- raises an exception, then that exception is raised by 'forSocketM' rather
-- than any exception raised by @action@.
--
-- @since 0.8.0
{-# INLINE forSocketM #-}
forSocketM :: (MonadMask m, MonadIO m) => (Socket -> m ()) -> Socket -> m ()
forSocketM f sk = f sk `finally` liftIO (Net.close sk)

-- | Like 'forSocketM' but runs a streaming computation instead of a monadic
-- computation. The action closing the socket is attached to the stream
-- returned by the supplied function using the stream level @finally@
-- combinator.
--
-- /Inhibits stream fusion/
--
-- /Internal/
{-# INLINE withSocket #-}
withSocket :: (IsStream t, MonadAsync m, MonadCatch m)
    => Socket -> (Socket -> t m a) -> t m a
withSocket sk f = S.finally closeSocket (f sk)

    where

    closeSocket = liftIO $ Net.close sk

-------------------------------------------------------------------------------
-- Accept (Unfolds)
-------------------------------------------------------------------------------

-- XXX Protocol specific socket options should be separated from socket level
-- options.
--
-- | Specify the socket protocol details.
data SockSpec = SockSpec
    {
      sockFamily :: !Family
      -- ^ Address family passed to 'socket' when creating the socket.
    , sockType   :: !SocketType
      -- ^ Socket type passed to 'socket' when creating the socket.
    , sockProto  :: !ProtocolNumber
      -- ^ Protocol number passed to 'socket' when creating the socket.
    , sockOpts   :: ![(SocketOption, Int)]
      -- ^ Options and their values, applied with 'setSocketOption' right
      -- after the socket is created.
    }

-- | Create a listening socket described by the given 'SockSpec': the socket
-- options are applied, the socket is bound to @addr@ and put in listening
-- mode with a backlog of @listenQLen@.
--
-- The socket is closed if anything fails after it has been created.
-- 'bracketOnError' is used so that an asynchronous exception arriving right
-- after the socket is created cannot leak the socket.
initListener :: Int -> SockSpec -> SockAddr -> IO Socket
initListener listenQLen SockSpec{..} addr =
    withSocketsDo
        $ bracketOnError (socket sockFamily sockType sockProto) Net.close
        $ \sock -> do
            mapM_ (uncurry (setSocketOption sock)) sockOpts
            bind sock addr
            Net.listen sock listenQLen
            return sock

-- | Unfold @(listenQLen, spec, addr)@ into a stream of the
-- @(socket, address)@ pairs returned by 'Net.accept' on a listening socket
-- created from the seed. The unfold never stops by itself; if accepting a
-- connection throws an exception the listening socket is closed before the
-- exception propagates.
{-# INLINE listenTuples #-}
listenTuples :: MonadIO m
    => Unfold m (Int, SockSpec, SockAddr) (Socket, SockAddr)
listenTuples = Unfold step inject

    where

    -- The unfold state is the listening socket created from the seed.
    inject (listenQLen, spec, addr) =
        liftIO $ initListener listenQLen spec addr

    -- Accept one connection and carry the listener over to the next step.
    step listener = do
        conn <- liftIO $ Net.accept listener `onException` Net.close listener
        return $ D.Yield conn listener

-- | Unfold a three tuple @(listenQLen, spec, addr)@ into a stream of connected
-- protocol sockets corresponding to incoming connections. @listenQLen@ is the
-- maximum number of pending connections in the backlog. @spec@ is the socket
-- protocol and options specification and @addr@ is the protocol address where
-- the server listens for incoming connections.
--
-- @since 0.7.0
{-# INLINE accept #-}
accept :: MonadIO m => Unfold m (Int, SockSpec, SockAddr) Socket
accept = UF.map fst listenTuples

-- | Create a socket described by the given 'SockSpec', apply the socket
-- options, optionally bind it to the @local@ address and connect it to the
-- @remote@ address.
--
-- The socket is closed if anything fails after it has been created.
-- 'bracketOnError' is used so that an asynchronous exception arriving right
-- after the socket is created cannot leak the socket.
{-# INLINE connectCommon #-}
connectCommon :: SockSpec -> Maybe SockAddr -> SockAddr -> IO Socket
connectCommon SockSpec{..} local remote =
    withSocketsDo
        $ bracketOnError (socket sockFamily sockType sockProto) Net.close
        $ \sock -> do
            mapM_ (uncurry (setSocketOption sock)) sockOpts
            -- Bind only when a local address has been supplied.
            forM_ local (bind sock)
            Net.connect sock remote
            return sock

-- | Connect to a remote host using the given socket specification and remote
-- address. The socket is not explicitly bound to a local address. Returns a
-- connected socket or throws an exception.
--
-- /Pre-release/
--
{-# INLINE connect #-}
connect :: SockSpec -> SockAddr -> IO Socket
connect spec = connectCommon spec Nothing

-- | Connect to a remote host using the given socket specification, a local
-- address to bind to and a remote address to connect to. Returns a connected
-- socket or throws an exception.
--
-- /Pre-release/
--
{-# INLINE connectFrom #-}
connectFrom :: SockSpec -> SockAddr -> SockAddr -> IO Socket
connectFrom spec local = connectCommon spec (Just local)

-------------------------------------------------------------------------------
-- Listen (Streams)
-------------------------------------------------------------------------------

-- | Stream of the @(socket, address)@ pairs returned by 'Net.accept' on a
-- listening socket created from the given backlog length, socket
-- specification and address. The listening socket is created when the first
-- element of the stream is demanded. If accepting a connection throws an
-- exception the listening socket is closed before the exception propagates.
{-# INLINE recvConnectionTuplesWith #-}
recvConnectionTuplesWith :: MonadAsync m
    => Int -> SockSpec -> SockAddr -> SerialT m (Socket, SockAddr)
recvConnectionTuplesWith tcpListenQ spec addr = S.unfoldrM step Nothing

    where

    -- Accept one connection and keep the listener as the state for the next
    -- step.
    acceptOn listener = do
        r <- liftIO $ Net.accept listener `onException` Net.close listener
        return $ Just (r, Just listener)

    -- 'Nothing' means that the listening socket has not been created yet.
    step Nothing = liftIO (initListener tcpListenQ spec addr) >>= acceptOn
    step (Just listener) = acceptOn listener

-- | Start a TCP stream server that listens for connections on the supplied
-- server address specification (address family, local interface IP address and
-- port). The server generates a stream of connected sockets.  The first
-- argument is the maximum number of pending connections in the backlog.
--
-- /Pre-release/
{-# INLINE connections #-}
connections :: MonadAsync m => Int -> SockSpec -> SockAddr -> SerialT m Socket
connections tcpListenQ spec addr =
    fst <$> recvConnectionTuplesWith tcpListenQ spec addr

-------------------------------------------------------------------------------
-- Array IO (Input)
-------------------------------------------------------------------------------

-- | @readArrayUptoWith f size h@ allocates a buffer of @size@ bytes and calls
-- @f h buffer size@ to fill it. @f@ returns the number of bytes it stored in
-- the buffer; the resulting array is built with its end pointer at that
-- offset and its bound pointer at @size@, so it may be shorter than @size@.
{-# INLINABLE readArrayUptoWith #-}
readArrayUptoWith
    :: (h -> Ptr Word8 -> Int -> IO Int)
    -> Int
    -> h
    -> IO (Array Word8)
readArrayUptoWith f size h = do
    ptr <- mallocPlainForeignPtrBytes size
    -- ptr <- mallocPlainForeignPtrAlignedBytes size (alignment (undefined :: Word8))
    unsafeWithForeignPtr ptr $ \p -> do
        n <- f h p size
        let end = p `plusPtr` n
            bound = p `plusPtr` size

        -- XXX shrink only if the diff is significant
        -- A.shrinkToFit v
        return $ A.unsafeFreeze $ fromForeignPtrUnsafe ptr end bound

-- | Read a byte array from a socket up to a maximum of the requested size. If
-- no data is available on the socket it blocks until some data becomes
-- available. If data is available then it immediately returns that data
-- without blocking.
--
-- @since 0.8.0
{-# INLINABLE readChunk #-}
readChunk :: Int -> Socket -> IO (Array Word8)
readChunk = readArrayUptoWith recvBuf

-------------------------------------------------------------------------------
-- Array IO (output)
-------------------------------------------------------------------------------

-- | @waitWhen0 n sock@ is used after a send that reported @n@ bytes written.
-- When @n@ is 0 and the runtime supports bound threads, it blocks the calling
-- thread until the socket's file descriptor is reported writable by
-- 'threadWaitWrite'. In all other cases it does nothing.
waitWhen0 :: Int -> Socket -> IO ()
waitWhen0 0 s = when rtsSupportsBoundThreads $
#if MIN_VERSION_network(3,1,0)
    withFdSocket s $ \fd -> threadWaitWrite $ fromIntegral fd
#elif MIN_VERSION_network(3,0,0)
    fdSocket s >>= threadWaitWrite . fromIntegral
#else
    let fd = fdSocket s in threadWaitWrite $ fromIntegral fd
#endif
waitWhen0 _ _ = return ()

-- | @sendAll sock p len@ sends the @len@ bytes starting at @p@ on the socket.
-- 'sendBuf' may send only a part of the buffer, so the remaining part is sent
-- again until nothing is left. Nothing is sent when @len@ is not positive.
sendAll :: Socket -> Ptr Word8 -> Int -> IO ()
sendAll _ _ len | len <= 0 = return ()
sendAll s p len = do
    sent <- sendBuf s p len
    -- If nothing could be sent, wait for the socket to become writable
    -- before retrying.
    waitWhen0 sent s
    -- assert (sent <= len)
    when (sent >= 0) $ sendAll s (p `plusPtr` sent) (len - sent)

-- | @writeArrayWith f h arr@ passes the memory of the array to the write
-- action @f@ as a byte pointer along with its length in bytes (the distance
-- between the start and end pointers of the array). Nothing is written for an
-- empty array.
{-# INLINABLE writeArrayWith #-}
writeArrayWith :: Storable a
    => (h -> Ptr Word8 -> Int -> IO ())
    -> h
    -> Array a
    -> IO ()
writeArrayWith _ _ arr | A.length arr == 0 = return ()
writeArrayWith f h Array{..} = do
    f h (castPtr arrStart) aLen
    -- NOTE(review): presumably keeps the array memory alive until the write
    -- above has completed - confirm against the definition of 'touch'.
    touch arrContents

    where

    aLen :: Int
    aLen = aEnd `minusPtr` arrStart

-- | Write an Array to a socket. The whole array is sent, using as many send
-- requests as required.
--
-- @since 0.8.0
{-# INLINABLE writeChunk #-}
writeChunk :: Storable a => Socket -> Array a -> IO ()
writeChunk = writeArrayWith sendAll

-------------------------------------------------------------------------------
-- Stream of Arrays IO
-------------------------------------------------------------------------------

-- | @_readChunksUptoWith f size h@ builds a stream by repeatedly running
-- @f size h@ and yielding the array it returns. The stream stops as soon as
-- an empty array is returned.
{-# INLINABLE _readChunksUptoWith #-}
_readChunksUptoWith :: (IsStream t, MonadIO m)
    => (Int -> h -> IO (Array Word8))
    -> Int -> h -> t m (Array Word8)
_readChunksUptoWith f size h = go
  where
    -- XXX use cons/nil instead
    go = mkStream $ \_ yld _ stp -> do
        arr <- liftIO $ f size h
        if A.length arr == 0
        then stp
        else yld arr go

-- | @toChunksWithBufferOf size h@ reads a stream of arrays from socket @h@.
-- The maximum size of a single array is limited to @size@. The stream ends
-- when a read returns an empty array.
{-# INLINE_NORMAL toChunksWithBufferOf #-}
toChunksWithBufferOf :: (IsStream t, MonadIO m)
    => Int -> Socket -> t m (Array Word8)
-- toChunksWithBufferOf = _readChunksUptoWith readChunk
toChunksWithBufferOf size h = fromStreamD (D.Stream step ())
    where
    {-# INLINE_LATE step #-}
    step _ _ = do
        arr <- liftIO $ readChunk size h
        return $
            case A.length arr of
                0 -> D.Stop
                _ -> D.Yield arr ()

-- XXX read 'Array a' instead of Word8
--
-- | @toChunks h@ reads a stream of arrays from socket @h@.
-- The maximum size of a single array is limited to 'defaultChunkSize'.
--
-- @since 0.7.0
{-# INLINE toChunks #-}
toChunks :: (IsStream t, MonadIO m) => Socket -> t m (Array Word8)
toChunks = toChunksWithBufferOf defaultChunkSize

-- | Unfold the tuple @(bufsize, socket)@ into a stream of 'Word8' arrays.
-- Read requests to the socket are performed using a buffer of size @bufsize@.
-- The size of an array in the resulting stream is always less than or equal to
-- @bufsize@. The stream ends when a read returns an empty array.
--
-- @since 0.7.0
{-# INLINE_NORMAL readChunksWithBufferOf #-}
readChunksWithBufferOf :: MonadIO m => Unfold m (Int, Socket) (Array Word8)
readChunksWithBufferOf = Unfold step return
    where
    {-# INLINE_LATE step #-}
    step (size, h) = do
        arr <- liftIO $ readChunk size h
        return $
            case A.length arr of
                0 -> D.Stop
                _ -> D.Yield arr (size, h)

-- | Unfolds a socket into a stream of 'Word8' arrays. Requests to the socket
-- are performed using a buffer of size 'defaultChunkSize'. The size of arrays
-- in the resulting stream are therefore less than or equal to
-- 'defaultChunkSize'.
--
-- @since 0.7.0
{-# INLINE readChunks #-}
readChunks :: MonadIO m => Unfold m Socket (Array Word8)
readChunks = UF.supplyFirst defaultChunkSize readChunksWithBufferOf

-------------------------------------------------------------------------------
-- Read Socket to Stream
-------------------------------------------------------------------------------

-- TODO for concurrent streams implement readahead IO. We can send multiple
-- read requests at the same time. For serial case we can use async IO. We can
-- also control the read throughput in mbps or IOPS.

{-
-- | @readWithBufferOf bufsize handle@ reads a byte stream from a file
-- handle, reads are performed in chunks of up to @bufsize@.  The stream ends
-- as soon as EOF is encountered.
--
{-# INLINE readWithBufferOf #-}
readWithBufferOf :: (IsStream t, MonadIO m) => Int -> Handle -> t m Word8
readWithBufferOf chunkSize h = A.flattenArrays $ readChunksUpto chunkSize h
-}

-- TODO
-- read :: (IsStream t, MonadIO m, Storable a) => Handle -> t m a
--
-- > read = 'readByChunks' defaultChunkSize
-- | Generate a byte stream from a socket. The socket is read in chunks of up
-- to 'defaultChunkSize' bytes which are then flattened into a stream of
-- 'Word8'. The stream ends when a read returns no data.
--
-- @since 0.7.0
{-# INLINE toBytes #-}
toBytes :: (IsStream t, MonadIO m) => Socket -> t m Word8
toBytes = AS.concat . toChunks

-- | Unfolds the tuple @(bufsize, socket)@ into a byte stream, read requests
-- to the socket are performed using buffers of @bufsize@.
--
-- @since 0.7.0
{-# INLINE readWithBufferOf #-}
readWithBufferOf :: MonadIO m => Unfold m (Int, Socket) Word8
readWithBufferOf = UF.many readChunksWithBufferOf A.read

-- | Unfolds a 'Socket' into a byte stream.  IO requests to the socket are
-- performed in sizes of
-- 'Streamly.Internal.Data.Array.Foreign.Type.defaultChunkSize'.
--
-- @
-- read = UF.supplyFirst defaultChunkSize readWithBufferOf
-- @
--
-- @since 0.7.0
{-# INLINE read #-}
read :: MonadIO m => Unfold m Socket Word8
read = UF.supplyFirst defaultChunkSize readWithBufferOf

-------------------------------------------------------------------------------
-- Writing
-------------------------------------------------------------------------------

-- | Write a stream of arrays to a socket.  Each array in the stream is passed
-- to 'writeChunk' in turn.
--
-- @since 0.7.0
{-# INLINE putChunks #-}
putChunks :: (MonadIO m, Storable a)
    => Socket -> SerialT m (Array a) -> m ()
putChunks h = S.mapM_ (liftIO . writeChunk h)

-- | Write a stream of arrays to a socket.  Each array in the stream is written
-- to the socket as a separate IO request.
--
-- This is the 'Fold' counterpart of 'putChunks': every input array is handed
-- to 'writeChunk' and the fold result is @()@.
--
-- @since 0.7.0
{-# INLINE writeChunks #-}
writeChunks :: (MonadIO m, Storable a) => Socket -> Fold m (Array a) ()
writeChunks h = FL.drainBy (liftIO . writeChunk h)

-- | @writeChunksWithBufferOf bufsize socket@ writes a stream of arrays to
-- @socket@ after coalescing the adjacent arrays in chunks of @bufsize@.
-- Multiple arrays are coalesced as long as the total size remains below the
-- specified size.  It never splits an array; if a single array is bigger than
-- the specified size it is emitted as it is.
--
-- The coalescing is done by 'lpackArraysChunksOf', the coalesced arrays are
-- then written using 'writeChunks'.
--
-- @since 0.8.0
{-# INLINE writeChunksWithBufferOf #-}
writeChunksWithBufferOf :: (MonadIO m, Storable a)
    => Int -> Socket -> Fold m (Array a) ()
writeChunksWithBufferOf n h = lpackArraysChunksOf n (writeChunks h)

-- GHC buffer size dEFAULT_FD_BUFFER_SIZE=8192 bytes.
--
-- XXX test this
-- Note that if you use a chunk size less than 8K (GHC's default buffer
-- size) then you are advised to use 'NoBuffering' mode on the 'Handle' in case
-- you do not want buffering to occur at GHC level as well. The same applies to
-- writes.

-- | Like 'putBytes' but provides control over the write buffer. Output will
-- be written to the socket as soon as we collect the specified number of
-- input elements.
--
-- The byte stream is grouped into arrays of @n@ elements using
-- 'AS.arraysOf' and the arrays are written using 'putChunks'.
--
-- @since 0.7.0
{-# INLINE putBytesWithBufferOf #-}
putBytesWithBufferOf :: MonadIO m => Int -> Socket -> SerialT m Word8 -> m ()
putBytesWithBufferOf n h m = putChunks h $ AS.arraysOf n m

-- | Write a byte stream to a socket. Accumulates the input in chunks of
-- specified number of bytes before writing.
--
-- Each group of @n@ input bytes is collected into an array which is then
-- written to the socket using 'writeChunks'.
--
-- @since 0.7.0
{-# INLINE writeWithBufferOf #-}
writeWithBufferOf :: MonadIO m => Int -> Socket -> Fold m Word8 ()
-- NOTE(review): the unchecked 'A.writeNUnsafe' is given the same bound @n@
-- that is passed to 'FL.chunksOf'; presumably this is what keeps it from
-- receiving more than @n@ elements -- confirm against 'FL.chunksOf'.
writeWithBufferOf n h = FL.chunksOf n (A.writeNUnsafe n) (writeChunks h)

-- | Write a stream of 'Maybe' values. Keep buffering the 'Just' values in an
-- array. Write the array to the 'Socket' as soon as a 'Nothing' is encountered
-- or the buffer size exceeds the specified limit.
--
-- /Pre-release/
{-# INLINE writeMaybesWithBufferOf #-}
writeMaybesWithBufferOf :: MonadIO m
    => Int -> Socket -> Fold m (Maybe Word8) ()
writeMaybesWithBufferOf n h =
    let -- Collects up to n bytes into an array.
        -- NOTE(review): 'fromJust' is partial; this relies on
        -- 'FL.takeEndBy_' below never forwarding the 'Nothing' element to
        -- this fold -- confirm against the 'FL.takeEndBy_' documentation.
        writeNJusts = FL.lmap fromJust $ A.writeN n
        -- Ends the current array when a 'Nothing' is seen.
        writeOnNothing = FL.takeEndBy_ isNothing writeNJusts
    -- Repeat the array-building fold, writing each resulting array to the
    -- socket.
    in FL.many writeOnNothing (writeChunks h)

-- > write = 'writeWithBufferOf' defaultChunkSize
--
-- | Write a byte stream to a socket. Combines the bytes in chunks of size
-- up to 'defaultChunkSize' before writing.
--
-- @
-- putBytes = 'putBytesWithBufferOf' 'defaultChunkSize'
-- @
--
-- @since 0.7.0
{-# INLINE putBytes #-}
putBytes :: MonadIO m => Socket -> SerialT m Word8 -> m ()
putBytes = putBytesWithBufferOf defaultChunkSize

-- | Write a byte stream to a socket. Accumulates the input in chunks of
-- up to 'defaultChunkSize' bytes before writing.
--
-- @
-- write = 'writeWithBufferOf' 'defaultChunkSize'
-- @
--
-- @since 0.7.0
{-# INLINE write #-}
write :: MonadIO m => Socket -> Fold m Word8 ()
write = writeWithBufferOf defaultChunkSize

{-
{-# INLINE write #-}
write :: (MonadIO m, Storable a) => Handle -> SerialT m a -> m ()
write = toHandleWith defaultChunkSize
-}

-------------------------------------------------------------------------------
-- IO with encoding/decoding Unicode characters
-------------------------------------------------------------------------------

{-
-- |
-- > readUtf8 = decodeUtf8 . read
--
-- Read a UTF8 encoded stream of unicode characters from a file handle.
--
-- @since 0.7.0
{-# INLINE readUtf8 #-}
readUtf8 :: (IsStream t, MonadIO m) => Handle -> t m Char
readUtf8 = decodeUtf8 . read

-- |
-- > writeUtf8 h s = write h $ encodeUtf8 s
--
-- Encode a stream of unicode characters to UTF8 and write it to the given file
-- handle. Default block buffering applies to the writes.
--
-- @since 0.7.0
{-# INLINE writeUtf8 #-}
writeUtf8 :: MonadIO m => Handle -> SerialT m Char -> m ()
writeUtf8 h s = write h $ encodeUtf8 s

-- | Write a stream of unicode characters after encoding to UTF-8 in chunks
-- separated by a linefeed character @'\n'@. If the size of the buffer exceeds
-- @defaultChunkSize@ and a linefeed is not yet found, the buffer is written
-- anyway.  This is similar to writing to a 'Handle' with the 'LineBuffering'
-- option.
--
-- @since 0.7.0
{-# INLINE writeUtf8ByLines #-}
writeUtf8ByLines :: (IsStream t, MonadIO m) => Handle -> t m Char -> m ()
writeUtf8ByLines = undefined

-- | Read UTF-8 lines from a file handle and apply the specified fold to each
-- line. This is similar to reading a 'Handle' with the 'LineBuffering' option.
--
-- @since 0.7.0
{-# INLINE readLines #-}
readLines :: (IsStream t, MonadIO m) => Handle -> Fold m Char b -> t m b
readLines h f = foldLines (readUtf8 h) f

-------------------------------------------------------------------------------
-- Framing on a sequence
-------------------------------------------------------------------------------

-- | Read a stream from a file handle and split it into frames delimited by
-- the specified sequence of elements. The supplied fold is applied on each
-- frame.
--
-- @since 0.7.0
{-# INLINE readFrames #-}
readFrames :: (IsStream t, MonadIO m, Storable a)
    => Array a -> Handle -> Fold m a b -> t m b
readFrames = undefined -- foldFrames . read

-- | Write a stream to the given file handle buffering up to frames separated
-- by the given sequence or up to a maximum of @defaultChunkSize@.
--
-- @since 0.7.0
{-# INLINE writeByFrames #-}
writeByFrames :: (IsStream t, MonadIO m, Storable a)
    => Array a -> Handle -> t m a -> m ()
writeByFrames = undefined
-}