#include "inline.hs"

-- |
-- Module      : Streamly.Internal.Network.Inet.TCP
-- Copyright   : (c) 2019 Composewell Technologies
--
-- License     : BSD3
-- Maintainer  : streamly@composewell.com
-- Stability   : experimental
-- Portability : GHC
--
-- Combinators to build Inet/TCP clients and servers.

module Streamly.Internal.Network.Inet.TCP
    (
    -- * TCP Servers
    -- ** Unfolds
      acceptOnAddr
    , acceptOnAddrWith
    , acceptOnPort
    , acceptOnPortWith
    , acceptOnPortLocal

    -- ** Streams
    , connectionsOnAddr
    , connectionsOnAddrWith
    , connectionsOnPort
    , connectionsOnLocalHost

    -- * TCP Clients
    -- | IP Address based operations.
    , connect
    , withConnectionM

    -- ** Unfolds
    , usingConnection
    , read

    -- ** Streams
    , withConnection
    -- *** Source
    , toBytes
    -- , readUtf8
    -- , readLines
    -- , readFrames
    -- , readByChunks

    -- -- * Array Read
    -- , readArrayUpto
    -- , readArrayOf

    -- , readChunksUpto
    -- , readChunksOf
    -- , readChunks

    -- *** Sink
    , write
    -- , writeUtf8
    -- , writeUtf8ByLines
    -- , writeByFrames
    , writeWithBufferOf
    , putBytes
    , putBytesWithBufferOf

    -- -- * Array Write
    -- , writeArray
    , writeChunks
    , putChunks

    -- ** Transformation
    , processBytes
    {-
    -- ** Sink Servers

    -- These abstractions can be applied to any setting where we need to do a
    -- sink processing of multiple streams e.g. output from multiple processes
    -- or data coming from multiple files.

    -- handle connections concurrently using a specified fold
    -- , handleConnections

    -- handle frames concurrently using a specified fold
    , handleFrames

    -- merge frames from all connection into a single stream. Frames can be
    -- created by a specified fold.
    , mergeFrames

    -- * UDP Servers
    , datagrams
    , datagramsOn
    -}
    )
where

import Control.Exception (onException)
import Control.Monad.Catch (MonadCatch, MonadMask, bracket)
import Control.Monad.IO.Class (MonadIO(..))
import Data.Word (Word8)
import Network.Socket
       (Socket, PortNumber, SocketOption(..), Family(..), SockAddr(..),
        SocketType(..), defaultProtocol, maxListenQueue, tupleToHostAddress,
        socket)
import Prelude hiding (read)

import Streamly.Internal.Control.Concurrent (MonadAsync)
import Streamly.Internal.Control.ForkLifted (fork)
import Streamly.Internal.Data.Array.Foreign.Type (Array(..), writeNUnsafe)
import Streamly.Internal.Data.Fold.Type (Fold(..))
import Streamly.Internal.Data.Stream.IsStream.Type (IsStream)
import Streamly.Internal.Data.Stream.Serial (SerialT)
import Streamly.Internal.Data.Tuple.Strict (Tuple'(..))
import Streamly.Internal.Data.Unfold.Type (Unfold(..))
import Streamly.Internal.Network.Socket (SockSpec(..), accept, connections)
import Streamly.Internal.System.IO (defaultChunkSize)

import qualified Control.Monad.Catch as MC
import qualified Network.Socket as Net

import qualified Streamly.Internal.Data.Unfold as UF
import qualified Streamly.Internal.Data.Array.Foreign as A
import qualified Streamly.Internal.Data.Array.Stream.Foreign as AS
import qualified Streamly.Internal.Data.Fold.Type as FL
import qualified Streamly.Internal.Data.Stream.IsStream as S
import qualified Streamly.Internal.Network.Socket as ISK

-------------------------------------------------------------------------------
-- Accept (unfolds)
-------------------------------------------------------------------------------

-- | Like 'acceptOnAddr' but additionally takes a list of socket options that
-- is stored in the 'sockOpts' field of the 'SockSpec' handed to 'accept'.
--
-- The input tuple @(ipAddr, port)@ is converted to the argument expected by
-- 'accept': 'maxListenQueue' (presumably used as the listen backlog), an
-- @AF_INET@ \/ 'Stream' socket specification using 'defaultProtocol', and the
-- IPv4 socket address built from @ipAddr@ and @port@.
{-# INLINE acceptOnAddrWith #-}
acceptOnAddrWith
    :: MonadIO m
    => [(SocketOption, Int)]
    -> Unfold m ((Word8, Word8, Word8, Word8), PortNumber) Socket
acceptOnAddrWith opts = UF.lmap f accept
    where
    -- Translate the user-facing (address, port) pair into the triple that
    -- 'accept' unfolds from.
    f (addr, port) =
        (maxListenQueue
        , SockSpec
            { sockFamily = AF_INET
            , sockType = Stream
            , sockProto = defaultProtocol -- TCP
            , sockOpts = opts
            }
        , SockAddrInet port (tupleToHostAddress addr)
        )

-- | Unfold a tuple @(ipAddr, port)@ into a stream of connected TCP sockets.
-- @ipAddr@ is the local IP address and @port@ is the local port on which
-- connections are accepted.
--
-- This is 'acceptOnAddrWith' with an empty list of socket options.
--
-- @since 0.7.0
{-# INLINE acceptOnAddr #-}
acceptOnAddr
    :: MonadIO m
    => Unfold m ((Word8, Word8, Word8, Word8), PortNumber) Socket
acceptOnAddr = acceptOnAddrWith []

-- | Like 'acceptOnPort' but additionally takes a list of socket options,
-- which is passed on to 'acceptOnAddrWith'. The address is fixed to the IPv4
-- address @0.0.0.0@.
--
-- > acceptOnPortWith opts = UF.supplyFirst (0,0,0,0) (acceptOnAddrWith opts)
{-# INLINE acceptOnPortWith #-}
acceptOnPortWith :: MonadIO m
    => [(SocketOption, Int)]
    -> Unfold m PortNumber Socket
acceptOnPortWith opts = UF.supplyFirst (0,0,0,0) (acceptOnAddrWith opts)

-- | Like 'acceptOnAddr' but binds on the IPv4 address @0.0.0.0@ i.e. on all
-- IPv4 addresses/interfaces of the machine and listens for TCP connections on
-- the specified port.
--
-- > acceptOnPort = UF.supplyFirst (0,0,0,0) acceptOnAddr
--
-- @since 0.7.0
{-# INLINE acceptOnPort #-}
acceptOnPort :: MonadIO m => Unfold m PortNumber Socket
acceptOnPort = UF.supplyFirst (0,0,0,0) acceptOnAddr

-- | Like 'acceptOnAddr' but binds on the localhost IPv4 address @127.0.0.1@.
-- The server can only be accessed from the local host, it cannot be accessed
-- from other hosts on the network.
--
-- > acceptOnPortLocal = UF.supplyFirst (127,0,0,1) acceptOnAddr
--
-- @since 0.7.0
{-# INLINE acceptOnPortLocal #-}
acceptOnPortLocal :: MonadIO m => Unfold m PortNumber Socket
acceptOnPortLocal = UF.supplyFirst (127,0,0,1) acceptOnAddr

-------------------------------------------------------------------------------
-- Accept (streams)
-------------------------------------------------------------------------------

-- | Like 'connectionsOnAddr' but additionally takes a list of socket options
-- that is stored in the 'sockOpts' field of the 'SockSpec' handed to
-- 'connections'.
--
-- The socket specification is @AF_INET@ \/ 'Stream' with 'defaultProtocol',
-- and 'maxListenQueue' is passed as the first argument of 'connections'
-- (presumably the listen backlog).
{-# INLINE connectionsOnAddrWith #-}
connectionsOnAddrWith
    :: MonadAsync m
    => [(SocketOption, Int)]
    -> (Word8, Word8, Word8, Word8)
    -> PortNumber
    -> SerialT m Socket
connectionsOnAddrWith opts addr port =
    connections maxListenQueue SockSpec
        { sockFamily = AF_INET
        , sockType = Stream
        , sockProto = defaultProtocol
        , sockOpts = opts
        }
        (SockAddrInet port (tupleToHostAddress addr))

-- | Like 'connections' but binds on the specified IPv4 address of the machine
-- and listens for TCP connections on the specified port.
--
-- This is 'connectionsOnAddrWith' with an empty list of socket options.
--
-- /Pre-release/
{-# INLINE connectionsOnAddr #-}
connectionsOnAddr
    :: MonadAsync m
    => (Word8, Word8, Word8, Word8)
    -> PortNumber
    -> SerialT m Socket
connectionsOnAddr = connectionsOnAddrWith []

-- | Like 'connections' but binds on the IPv4 address @0.0.0.0@ i.e. on all
-- IPv4 addresses/interfaces of the machine and listens for TCP connections on
-- the specified port.
--
-- > connectionsOnPort = connectionsOnAddr (0,0,0,0)
--
-- /Pre-release/
{-# INLINE connectionsOnPort #-}
connectionsOnPort :: MonadAsync m => PortNumber -> SerialT m Socket
connectionsOnPort = connectionsOnAddr (0,0,0,0)

-- | Like 'connections' but binds on the localhost IPv4 address @127.0.0.1@.
-- The server can only be accessed from the local host, it cannot be accessed
-- from other hosts on the network.
--
-- > connectionsOnLocalHost = connectionsOnAddr (127,0,0,1)
--
-- /Pre-release/
{-# INLINE connectionsOnLocalHost #-}
connectionsOnLocalHost :: MonadAsync m => PortNumber -> SerialT m Socket
connectionsOnLocalHost = connectionsOnAddr (127,0,0,1)

-------------------------------------------------------------------------------
-- TCP Clients
-------------------------------------------------------------------------------

-- | Connect to the specified IP address and port number. Returns a connected
-- socket or throws an exception.
--
-- A fresh @AF_INET@ \/ 'Stream' socket is created first; if the subsequent
-- connect call throws, the socket is closed before the exception propagates.
--
-- @since 0.7.0
connect :: (Word8, Word8, Word8, Word8) -> PortNumber -> IO Socket
connect addr port = do
    sock <- socket AF_INET Stream defaultProtocol
    -- Do not leak the descriptor when the connection attempt fails.
    Net.connect sock (SockAddrInet port (Net.tupleToHostAddress addr))
        `onException` Net.close sock
    return sock

-- | Connect to a remote host using IP address and port and run the supplied
-- action on the resulting socket.  'withConnectionM' makes sure that the
-- socket is closed on normal termination or in case of an exception.  If
-- closing the socket raises an exception, then this exception will be raised
-- by 'withConnectionM'.
--
-- Implemented as a 'bracket' that acquires the socket with 'connect' and
-- releases it with @Net.close@.
--
-- /Pre-release/
{-# INLINABLE withConnectionM #-}
withConnectionM :: (MonadMask m, MonadIO m)
    => (Word8, Word8, Word8, Word8) -> PortNumber -> (Socket -> m ()) -> m ()
withConnectionM addr port =
    bracket (liftIO $ connect addr port) (liftIO . Net.close)

-------------------------------------------------------------------------------
-- Connect (unfolds)
-------------------------------------------------------------------------------

-- | Transform an 'Unfold' from a 'Socket' to an unfold from a remote IP
-- address and port. The resulting unfold opens a socket, uses it using the
-- supplied unfold and then makes sure that the socket is closed on normal
-- termination or in case of an exception.  If closing the socket raises an
-- exception, then this exception will be raised by 'usingConnection'.
--
-- Implemented with @UF.bracket@: the socket is acquired with 'connect' from
-- the @(ipAddr, port)@ seed and released with @Net.close@.
--
-- /Pre-release/
{-# INLINE usingConnection #-}
usingConnection :: (MonadCatch m, MonadAsync m)
    => Unfold m Socket a
    -> Unfold m ((Word8, Word8, Word8, Word8), PortNumber) a
usingConnection =
    UF.bracket (\(addr, port) -> liftIO $ connect addr port)
               (liftIO . Net.close)

-------------------------------------------------------------------------------
-- Connect (streams)
-------------------------------------------------------------------------------

-- | @'withConnection' addr port act@ opens a connection to the specified IPv4
-- host address and port and passes the resulting socket handle to the
-- computation @act@.  The handle will be closed on exit from 'withConnection',
-- whether by normal termination or by raising an exception.  If closing the
-- handle raises an exception, then this exception will be raised by
-- 'withConnection' rather than any exception raised by @act@.
--
-- Implemented with @S.bracket@: the socket is acquired with 'connect' and
-- released with @Net.close@.
--
-- /Pre-release/
{-# INLINE withConnection #-}
withConnection :: (IsStream t, MonadCatch m, MonadAsync m)
    => (Word8, Word8, Word8, Word8) -> PortNumber -> (Socket -> t m a) -> t m a
withConnection addr port =
    S.bracket (liftIO $ connect addr port) (liftIO . Net.close)

-------------------------------------------------------------------------------
-- Read Addr to Stream
-------------------------------------------------------------------------------

-- | Unfold a tuple @(ipAddr, port)@ into the stream of bytes read from the
-- supplied IPv4 host address and port number.
--
-- The connection is managed by 'usingConnection'; data is read as arrays via
-- @ISK.readChunks@ and each array is then flattened into its elements with
-- @A.read@.
--
-- @since 0.7.0
{-# INLINE read #-}
read :: (MonadCatch m, MonadAsync m)
    => Unfold m ((Word8, Word8, Word8, Word8), PortNumber) Word8
read = UF.many (usingConnection ISK.readChunks) A.read

-- | Read a stream of bytes from the supplied IPv4 host address and port
-- number.
--
-- The connection is managed by 'withConnection'; data is read as a stream of
-- arrays via @ISK.toChunks@ and flattened into bytes with @AS.concat@.
--
-- @since 0.7.0
{-# INLINE toBytes #-}
toBytes :: (IsStream t, MonadCatch m, MonadAsync m)
    => (Word8, Word8, Word8, Word8) -> PortNumber -> t m Word8
toBytes addr port = AS.concat $ withConnection addr port ISK.toChunks

-------------------------------------------------------------------------------
-- Writing
-------------------------------------------------------------------------------

-- | Write a stream of arrays to the supplied IPv4 host address and port
-- number.
--
-- The connection is managed by 'withConnection'. Inside it, the whole input
-- stream is written by a single @ISK.putChunks@ effect, and the resulting
-- one-element stream is drained.
--
-- @since 0.7.0
{-# INLINE putChunks #-}
putChunks
    :: (MonadCatch m, MonadAsync m)
    => (Word8, Word8, Word8, Word8)
    -> PortNumber
    -> SerialT m (Array Word8)
    -> m ()
putChunks addr port xs =
    S.drain $ withConnection addr port (\sk -> S.fromEffect $ ISK.putChunks sk xs)

-- | A 'Fold' that writes a stream of arrays to the supplied IPv4 host address
-- and port number.
--
-- The fold state pairs the socket with the inner fold @ISK.writeChunks skt@:
--
-- * @initial@ connects with 'connect' and initializes the inner fold; if the
--   initialization throws, the socket is closed.
-- * @step@ pushes one array into the inner fold with @FL.snoc@; if that
--   throws, the socket is closed. It always returns @FL.Partial@, so this
--   fold never terminates on its own.
-- * @extract@ closes the socket and then obtains the inner fold's result.
--
-- @since 0.7.0
{-# INLINE writeChunks #-}
writeChunks
    :: (MonadAsync m, MonadCatch m)
    => (Word8, Word8, Word8, Word8)
    -> PortNumber
    -> Fold m (Array Word8) ()
writeChunks addr port = Fold step initial extract
    where
    initial = do
        skt <- liftIO (connect addr port)
        fld <- FL.initialize (ISK.writeChunks skt)
                    `MC.onException` liftIO (Net.close skt)
        return $ FL.Partial (Tuple' fld skt)
    step (Tuple' fld skt) x = do
        r <- FL.snoc fld x `MC.onException` liftIO (Net.close skt)
        return $ FL.Partial (Tuple' r skt)
    -- NOTE(review): the socket is closed before the inner fold is finalized;
    -- this looks safe only if the inner fold's extract performs no socket
    -- IO -- confirm against ISK.writeChunks.
    extract (Tuple' (Fold _ initial1 extract1) skt) = do
        liftIO $ Net.close skt
        res <- initial1
        case res of
            FL.Partial fs -> extract1 fs
            FL.Done fb -> return fb

-- | Like 'putBytes' but provides control over the write buffer. Output will
-- be written to the IO device as soon as we collect the specified number of
-- input elements.
--
-- The input is grouped into arrays of @n@ elements with @AS.arraysOf@ and the
-- arrays are written with 'putChunks'.
--
-- @since 0.7.0
{-# INLINE putBytesWithBufferOf #-}
putBytesWithBufferOf
    :: (MonadCatch m, MonadAsync m)
    => Int
    -> (Word8, Word8, Word8, Word8)
    -> PortNumber
    -> SerialT m Word8
    -> m ()
putBytesWithBufferOf n addr port m = putChunks addr port $ AS.arraysOf n m

-- | Like 'write' but provides control over the write buffer. Output will
-- be written to the IO device as soon as we collect the specified number of
-- input elements.
--
-- Input elements are collected @n@ at a time into arrays by 'writeNUnsafe'
-- (via @FL.chunksOf@) and the arrays are fed to 'writeChunks'.
--
-- @since 0.7.0
{-# INLINE writeWithBufferOf #-}
writeWithBufferOf
    :: (MonadAsync m, MonadCatch m)
    => Int
    -> (Word8, Word8, Word8, Word8)
    -> PortNumber
    -> Fold m Word8 ()
writeWithBufferOf n addr port =
    FL.chunksOf n (writeNUnsafe n) (writeChunks addr port)

-- | Write a stream of bytes to the supplied IPv4 host address and port
-- number.
--
-- This is 'putBytesWithBufferOf' with a buffer of 'defaultChunkSize'
-- elements.
--
-- @since 0.7.0
{-# INLINE putBytes #-}
putBytes :: (MonadCatch m, MonadAsync m)
    => (Word8, Word8, Word8, Word8) -> PortNumber -> SerialT m Word8 -> m ()
putBytes = putBytesWithBufferOf defaultChunkSize

-- | A 'Fold' that writes a stream of bytes to the supplied IPv4 host address
-- and port number.
--
-- This is 'writeWithBufferOf' with a buffer of 'defaultChunkSize' elements.
--
-- @since 0.7.0
{-# INLINE write #-}
write :: (MonadAsync m, MonadCatch m)
    => (Word8, Word8, Word8, Word8) -> PortNumber -> Fold m Word8 ()
write = writeWithBufferOf defaultChunkSize

-------------------------------------------------------------------------------
-- Transformations
-------------------------------------------------------------------------------

-- | @withInputConnect addr port input f@ connects to the given IPv4 address
-- and port, forks a thread that writes @input@ to the socket with
-- @ISK.putBytes@, and runs the stream @f socket@ under @S.bracket@ so that
-- the socket is closed by the release action.
--
-- The forked writer thread is not explicitly stopped by the release action;
-- only the socket is closed.
{-# INLINE withInputConnect #-}
withInputConnect
    :: (IsStream t, MonadCatch m, MonadAsync m)
    => (Word8, Word8, Word8, Word8)
    -> PortNumber
    -> SerialT m Word8
    -> (Socket -> t m a)
    -> t m a
withInputConnect addr port input f = S.bracket pre post handler

    where

    -- Acquire: open the connection and start sending the input concurrently.
    pre = do
        sk <- liftIO $ connect addr port
        tid <- fork (ISK.putBytes sk input)
        return (sk, tid)

    -- Use: only the socket is exposed to the consumer.
    handler (sk, _) = f sk

    -- Release: close the socket.
    -- XXX kill the thread immediately?
    post (sk, _) = liftIO $ Net.close sk

-- | Send an input stream to a remote host and produce the output stream from
-- the host. The server host just acts as a transformation function on the
-- input stream.  Both sending and receiving happen asynchronously.
--
-- The input is sent by the thread forked in 'withInputConnect', while the
-- output is read from the same socket with @ISK.toBytes@.
--
-- /Pre-release/
--
{-# INLINE processBytes #-}
processBytes
    :: (IsStream t, MonadAsync m, MonadCatch m)
    => (Word8, Word8, Word8, Word8)
    -> PortNumber
    -> SerialT m Word8
    -> t m Word8
processBytes addr port input = withInputConnect addr port input ISK.toBytes