streamly-0.10.1: Streaming, dataflow programming and declarative concurrency
Copyright(c) 2018 Composewell Technologies
LicenseBSD-3-Clause
Maintainerstreamly@composewell.com
Stabilityreleased
PortabilityGHC
Safe HaskellSafe-Inferred
LanguageHaskell2010

Streamly.Network.Socket

Description

This module provides socket based streaming APIs to to receive connections from remote hosts, and to read and write from and to network sockets.

For basic socket types and non-streaming operations please consult the Network.Socket module of the network package.

Examples

To write a server, use the accept stream to start listening for connections from clients. accept generates a stream of connected sockets. We can map an effectful action on this socket stream to handle the connections. The action would typically use socket reading and writing operations to communicate with the remote host. We can read/write a stream of bytes or a stream of chunks of bytes (Array).

Following is a short example of a concurrent echo server. Please note that this example can be written even more succinctly by using higher level operations from Streamly.Network.Inet.TCP module.

>>> :set -XFlexibleContexts
>>> 
>>> import Data.Function ((&))
>>> import Network.Socket
>>> import Streamly.Network.Socket (SockSpec(..))
>>> 
>>> import qualified Streamly.Data.Fold as Fold
>>> import qualified Streamly.Data.Stream.Prelude as Stream
>>> import qualified Streamly.Network.Socket as Socket
>>> 
>>> :{
 main :: IO ()
 main = do
      let spec = SockSpec
                 { sockFamily = AF_INET
                 , sockType   = Stream
                 , sockProto  = defaultProtocol
                 , sockOpts   = []
                 }
          addr = SockAddrInet 8090 (tupleToHostAddress (0,0,0,0))
       in server spec addr
      where
      server spec addr =
            Socket.accept maxListenQueue spec addr
          & Stream.parMapM (Stream.eager True) (Socket.forSocketM echo)
          & Stream.fold Fold.drain
      echo sk =
            Socket.readChunks sk -- Stream IO (Array Word8)
          & Stream.fold (Socket.writeChunks sk) -- IO ()
:}

Programmer Notes

Read IO requests to connected stream sockets are performed in chunks of defaultChunkSize. Unless specified otherwise in the API, writes are collected into chunks of defaultChunkSize before they are written to the socket.

>>> import qualified Streamly.Network.Socket as Socket

See Also

Synopsis

Socket Specification

data SockSpec Source #

Specify the socket protocol details.

Accept Connections

accept :: MonadIO m => Int -> SockSpec -> SockAddr -> Stream m Socket Source #

Start a TCP stream server that listens for connections on the supplied server address specification (address family, local interface IP address and port). The server generates a stream of connected sockets. The first argument is the maximum number of pending connections in the backlog.

Pre-release

acceptor :: MonadIO m => Unfold m (Int, SockSpec, SockAddr) Socket Source #

Unfold a three tuple (listenQLen, spec, addr) into a stream of connected protocol sockets corresponding to incoming connections. listenQLen is the maximum number of pending connections in the backlog. spec is the socket protocol and options specification and addr is the protocol address where the server listens for incoming connections.

Reads

Singleton

getChunk :: Int -> Socket -> IO (Array Word8) Source #

Read a byte array from a file handle up to a maximum of the requested size. If no data is available on the handle it blocks until some data becomes available. If data is available then it immediately returns that data without blocking.

Streams

read :: MonadIO m => Socket -> Stream m Word8 Source #

Generate a byte stream from a socket.

>>> read = Socket.readWith defaultChunkSize

Pre-release

readWith :: MonadIO m => Int -> Socket -> Stream m Word8 Source #

Generate a byte stream from a socket using a buffer of the given size.

Pre-release

readChunks :: MonadIO m => Socket -> Stream m (Array Word8) Source #

Read a stream of byte arrays from a socket. The maximum size of a single array is limited to defaultChunkSize.

>>> readChunks = Socket.readChunksWith defaultChunkSize

Pre-release

readChunksWith :: MonadIO m => Int -> Socket -> Stream m (Array Word8) Source #

readChunksWith bufsize socket reads a stream of arrays from socket. The maximum size of a single array is limited to bufsize.

Pre-release

Unfolds

reader :: MonadIO m => Unfold m Socket Word8 Source #

Unfolds a Socket into a byte stream. IO requests to the socket are performed in sizes of defaultChunkSize.

readerWith :: MonadIO m => Unfold m (Int, Socket) Word8 Source #

Unfolds the tuple (bufsize, socket) into a byte stream, read requests to the socket are performed using buffers of bufsize.

chunkReader :: MonadIO m => Unfold m Socket (Array Word8) Source #

Unfolds a socket into a stream of Word8 arrays. Requests to the socket are performed using a buffer of size defaultChunkSize. The size of arrays in the resulting stream are therefore less than or equal to defaultChunkSize.

chunkReaderWith :: MonadIO m => Unfold m (Int, Socket) (Array Word8) Source #

Unfold the tuple (bufsize, socket) into a stream of Word8 arrays. Read requests to the socket are performed using a buffer of size bufsize. The size of an array in the resulting stream is always less than or equal to bufsize.

Writes

Singleton

putChunk :: Unbox a => Socket -> Array a -> IO () Source #

Write an Array to a socket.

Folds

write :: MonadIO m => Socket -> Fold m Word8 () Source #

Write a byte stream to a socket. Accumulates the input in chunks of up to defaultChunkSize bytes before writing.

>>> write = Socket.writeWith defaultChunkSize

writeWith :: MonadIO m => Int -> Socket -> Fold m Word8 () Source #

Write a byte stream to a socket. Accumulates the input in chunks of specified number of bytes before writing.

writeChunks :: (MonadIO m, Unbox a) => Socket -> Fold m (Array a) () Source #

Write a stream of arrays to a socket. Each array in the stream is written to the socket as a separate IO request.

writeChunksWith :: (MonadIO m, Unbox a) => Int -> Socket -> Fold m (Array a) () Source #

writeChunksWith bufsize socket writes a stream of arrays to socket after coalescing the adjacent arrays in chunks of bufsize. Multiple arrays are coalesed as long as the total size remains below the specified size. It never splits an array, if a single array is bigger than the specified size it emitted as it is.

Exceptions

forSocketM :: (MonadMask m, MonadIO m) => (Socket -> m ()) -> Socket -> m () Source #

forSocketM action socket runs the monadic computation action passing the socket handle to it. The handle will be closed on exit from forSocketM, whether by normal termination or by raising an exception. If closing the handle raises an exception, then this exception will be raised by forSocketM rather than any exception raised by action.

Deprecated

readChunk :: Int -> Socket -> IO (Array Word8) Source #

Deprecated: Please use getChunk instead

writeChunk :: Unbox a => Socket -> Array a -> IO () Source #

Deprecated: Please use putChunk instead

readWithBufferOf :: MonadIO m => Unfold m (Int, Socket) Word8 Source #

Deprecated: Please use readerWith instead

Same as readWith

readChunksWithBufferOf :: MonadIO m => Unfold m (Int, Socket) (Array Word8) Source #

Deprecated: Please use chunkReaderWith instead

Same as chunkReaderWith

writeWithBufferOf :: MonadIO m => Int -> Socket -> Fold m Word8 () Source #

Deprecated: Please use writeWith instead

Same as writeWith

writeChunksWithBufferOf :: (MonadIO m, Unbox a) => Int -> Socket -> Fold m (Array a) () Source #

Deprecated: Please use writeChunksWith instead

Same as writeChunksWith