streamly-0.9.0: Streaming, dataflow programming and declarative concurrency
Copyright: (c) 2018 Composewell Technologies
License: BSD-3-Clause
Maintainer: streamly@composewell.com
Stability: released
Portability: GHC
Safe Haskell: Safe-Inferred
Language: Haskell2010

Streamly.Network.Socket

Description

This module provides socket-based streaming APIs to receive connections from remote hosts, and to read from and write to network sockets.

For basic socket types and operations please consult the Network.Socket module of the network package.

Examples

To write a server, use the acceptor unfold to start listening for connections from clients. acceptor generates a stream of connected sockets. We can map an effectful action on this socket stream to handle the connections. The action would typically use socket reading and writing operations to communicate with the remote host. We can read/write a stream of bytes or a stream of chunks of bytes (Array).

The following is a short example of a concurrent echo server. Note that this example can be written even more succinctly by using higher-level operations from the Streamly.Network.Inet.TCP module.

>>> :set -XFlexibleContexts
>>> 
>>> import Data.Function ((&))
>>> import Network.Socket
>>> import Streamly.Network.Socket (SockSpec(..))
>>> 
>>> import qualified Streamly.Data.Fold as Fold
>>> import qualified Streamly.Data.Stream as Stream
>>> import qualified Streamly.Data.Stream.Prelude as Stream
>>> import qualified Streamly.Network.Socket as Socket
>>> 
>>> :{
 main :: IO ()
 main = do
      let spec = SockSpec
                 { sockFamily = AF_INET
                 , sockType   = Stream
                 , sockProto  = defaultProtocol
                 , sockOpts   = []
                 }
          addr = SockAddrInet 8090 (tupleToHostAddress (0,0,0,0))
       in server spec addr
      where
      server spec addr =
            Stream.unfold Socket.acceptor (maxListenQueue, spec, addr)
          & Stream.parMapM (Stream.eager True) (Socket.forSocketM echo)
          & Stream.fold Fold.drain
      echo sk =
            Stream.unfold Socket.chunkReader sk -- Stream IO (Array Word8)
          & Stream.fold (Socket.writeChunks sk) -- IO ()
:}

Programmer Notes

Read IO requests to connected stream sockets are performed in chunks of defaultChunkSize. Unless specified otherwise in the API, writes are collected into chunks of defaultChunkSize before they are written to the socket.

Socket Specification

data SockSpec

Specify the socket protocol details.

Accept Connections

acceptor :: MonadIO m => Unfold m (Int, SockSpec, SockAddr) Socket

Unfold a three-tuple (listenQLen, spec, addr) into a stream of connected protocol sockets corresponding to incoming connections. listenQLen is the maximum number of pending connections in the backlog, spec is the socket protocol and options specification, and addr is the protocol address where the server listens for incoming connections.

Reads

Singleton

getChunk :: Int -> Socket -> IO (Array Word8)

Read a byte array from a socket up to a maximum of the requested size. If no data is available on the socket, it blocks until some data becomes available. If data is available, it returns that data immediately without blocking.
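A minimal sketch of a single chunk read, assuming a Unix-like system where socketPair from the network package is available to create a connected pair of sockets in-process. The names tx and rx are illustrative only. Five bytes are sent first so that the read has data to return and does not block.

```haskell
import Data.Word (Word8)
import Network.Socket
    (Family(AF_UNIX), SocketType(Stream), close, defaultProtocol, socketPair)

import qualified Streamly.Data.Array as Array
import qualified Streamly.Network.Socket as Socket

main :: IO ()
main = do
    -- Assumption: socketPair is supported on this platform (Unix-like only).
    (tx, rx) <- socketPair AF_UNIX Stream defaultProtocol
    -- Send five bytes so that the read below has data available.
    Socket.putChunk tx (Array.fromList [1, 2, 3, 4, 5 :: Word8])
    -- Request at most 3 bytes; the returned array is no larger than that.
    chunk <- Socket.getChunk 3 rx
    print (Array.toList chunk)
    close tx
    close rx
```

The remaining bytes stay queued on the socket and can be fetched with further getChunk calls.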

Unfolds

reader :: MonadIO m => Unfold m Socket Word8

Unfolds a Socket into a byte stream. IO requests to the socket are performed in sizes of defaultChunkSize.

readerWith :: MonadIO m => Unfold m (Int, Socket) Word8

Unfolds the tuple (bufsize, socket) into a byte stream. Read requests to the socket are performed using buffers of size bufsize.

chunkReader :: MonadIO m => Unfold m Socket (Array Word8)

Unfolds a socket into a stream of Word8 arrays. Requests to the socket are performed using a buffer of size defaultChunkSize. The size of each array in the resulting stream is therefore less than or equal to defaultChunkSize.

chunkReaderWith :: MonadIO m => Unfold m (Int, Socket) (Array Word8)

Unfold the tuple (bufsize, socket) into a stream of Word8 arrays. Read requests to the socket are performed using a buffer of size bufsize. The size of an array in the resulting stream is always less than or equal to bufsize.
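To illustrate the size bound, the following sketch reads ten bytes through chunkReaderWith with a bufsize of 4 and collects the length of each array. It assumes a Unix-like system where socketPair is available; tx and rx are illustrative names. The sending side is shut down so that the reader sees end of input and the stream terminates.

```haskell
import Data.Function ((&))
import Data.Word (Word8)
import Network.Socket
    ( Family(AF_UNIX), ShutdownCmd(ShutdownSend), SocketType(Stream)
    , close, defaultProtocol, shutdown, socketPair )

import qualified Streamly.Data.Array as Array
import qualified Streamly.Data.Fold as Fold
import qualified Streamly.Data.Stream as Stream
import qualified Streamly.Network.Socket as Socket

main :: IO ()
main = do
    -- Assumption: socketPair is supported on this platform (Unix-like only).
    (tx, rx) <- socketPair AF_UNIX Stream defaultProtocol
    Socket.putChunk tx (Array.fromList ([1 .. 10] :: [Word8]))
    -- Half-close the sending side so the reader reaches end of input.
    shutdown tx ShutdownSend
    sizes <-
          Stream.unfold Socket.chunkReaderWith (4, rx) -- Stream IO (Array Word8)
        & fmap Array.length                             -- Stream IO Int
        & Stream.fold Fold.toList                       -- IO [Int]
    print sizes
    -- Every array is expected to respect the requested buffer size.
    print (all (<= 4) sizes)
    close tx
    close rx
```

The exact split into arrays depends on how the operating system delivers the data, so only the upper bound should be relied upon.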

Writes

Singleton

putChunk :: Unbox a => Socket -> Array a -> IO ()

Write an Array to a socket.

Folds

write :: MonadIO m => Socket -> Fold m Word8 ()

Write a byte stream to a socket. Accumulates the input in chunks of up to defaultChunkSize bytes before writing.

>>> write = Socket.writeWith defaultChunkSize

writeWith :: MonadIO m => Int -> Socket -> Fold m Word8 ()

Write a byte stream to a socket. Accumulates the input in chunks of the specified number of bytes before writing.
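A small round-trip sketch pairing writeWith with readerWith, assuming a Unix-like system where socketPair is available. The buffer sizes 16 and 32 and the names tx, rx, and payload are arbitrary choices for illustration. The payload is kept small so that the write completes before anything is read from the other end.

```haskell
import Data.Function ((&))
import Data.Word (Word8)
import Network.Socket
    ( Family(AF_UNIX), ShutdownCmd(ShutdownSend), SocketType(Stream)
    , close, defaultProtocol, shutdown, socketPair )

import qualified Streamly.Data.Fold as Fold
import qualified Streamly.Data.Stream as Stream
import qualified Streamly.Network.Socket as Socket

main :: IO ()
main = do
    -- Assumption: socketPair is supported on this platform (Unix-like only).
    (tx, rx) <- socketPair AF_UNIX Stream defaultProtocol
    let payload = [0 .. 99] :: [Word8]
    -- Bytes are accumulated into chunks of up to 16 bytes before each write.
    Stream.fromList payload & Stream.fold (Socket.writeWith 16 tx)
    -- Half-close the sending side so the reader reaches end of input.
    shutdown tx ShutdownSend
    -- Read the bytes back using 32-byte read requests.
    received <-
          Stream.unfold Socket.readerWith (32, rx)
        & Stream.fold Fold.toList
    -- Compare what was received with what was sent.
    print (received == payload)
    close tx
    close rx
```

The read and write buffer sizes are independent of each other; a stream socket does not preserve chunk boundaries between the two ends.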

writeChunks :: (MonadIO m, Unbox a) => Socket -> Fold m (Array a) ()

Write a stream of arrays to a socket. Each array in the stream is written to the socket as a separate IO request.

writeChunksWith :: (MonadIO m, Unbox a) => Int -> Socket -> Fold m (Array a) ()

writeChunksWith bufsize socket writes a stream of arrays to socket after coalescing the adjacent arrays in chunks of bufsize. Multiple arrays are coalesced as long as the total size remains below the specified size. It never splits an array; if a single array is bigger than the specified size, it is emitted as it is.
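A usage sketch for writeChunksWith, assuming a Unix-like system where socketPair is available. Three small arrays are written with a bufsize of 1024 and the bytes are read back from the other end. The names tx, rx, and small are illustrative. The coalescing itself is not observable over a stream socket, so the sketch only shows that the bytes can be read back in one pass.

```haskell
import Data.Function ((&))
import Data.Word (Word8)
import Network.Socket
    ( Family(AF_UNIX), ShutdownCmd(ShutdownSend), SocketType(Stream)
    , close, defaultProtocol, shutdown, socketPair )

import qualified Streamly.Data.Array as Array
import qualified Streamly.Data.Fold as Fold
import qualified Streamly.Data.Stream as Stream
import qualified Streamly.Network.Socket as Socket

main :: IO ()
main = do
    -- Assumption: socketPair is supported on this platform (Unix-like only).
    (tx, rx) <- socketPair AF_UNIX Stream defaultProtocol
    let small :: [Array.Array Word8]
        small = map Array.fromList [[1, 2], [3], [4, 5, 6]]
    -- Adjacent small arrays may be combined before being written.
    Stream.fromList small & Stream.fold (Socket.writeChunksWith 1024 tx)
    -- Half-close the sending side so the reader reaches end of input.
    shutdown tx ShutdownSend
    received <-
          Stream.unfold Socket.reader rx
        & Stream.fold Fold.toList
    print received
    close tx
    close rx
```

Coalescing is mainly useful when the input consists of many small arrays, since it reduces the number of IO requests compared to writeChunks.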

Exceptions

forSocketM :: (MonadMask m, MonadIO m) => (Socket -> m ()) -> Socket -> m ()

forSocketM action socket runs the monadic computation action passing the socket handle to it. The handle will be closed on exit from forSocketM, whether by normal termination or by raising an exception. If closing the handle raises an exception, then this exception will be raised by forSocketM rather than any exception raised by action.
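The behavior described above resembles the familiar finally pattern. The following is a sketch of a hypothetical stand-in named closingAfter, written with finally from the exceptions package; it is meant to illustrate the described semantics and is not claimed to be the library's implementation. The demonstration in main assumes a Unix-like system where socketPair is available.

```haskell
import Control.Monad.Catch (MonadMask, finally)
import Control.Monad.IO.Class (MonadIO, liftIO)
import Network.Socket
    ( Family(AF_UNIX), Socket, SocketType(Stream)
    , close, defaultProtocol, socketPair )

-- Hypothetical helper (not part of streamly): run the action, then close
-- the socket whether the action returns normally or throws. If close
-- itself throws, that exception propagates instead of the action's.
closingAfter
    :: (MonadMask m, MonadIO m) => (Socket -> m ()) -> Socket -> m ()
closingAfter action sk = action sk `finally` liftIO (close sk)

main :: IO ()
main = do
    -- Assumption: socketPair is supported on this platform (Unix-like only).
    (a, b) <- socketPair AF_UNIX Stream defaultProtocol
    -- The socket 'a' is closed once the handler finishes.
    closingAfter (\_ -> putStrLn "handling connection") a
    close b
```

In a server, wrapping each connection handler this way ensures that a failing handler does not leak its socket.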

Deprecated

accept :: MonadIO m => Unfold m (Int, SockSpec, SockAddr) Socket

Deprecated: Please use acceptor instead

readChunk :: Int -> Socket -> IO (Array Word8)

Deprecated: Please use getChunk instead

writeChunk :: Unbox a => Socket -> Array a -> IO ()

Deprecated: Please use putChunk instead

read :: MonadIO m => Unfold m Socket Word8

Deprecated: Please use reader instead

readWithBufferOf :: MonadIO m => Unfold m (Int, Socket) Word8

Deprecated: Please use readerWith instead

Same as readerWith

readChunks :: MonadIO m => Unfold m Socket (Array Word8)

Deprecated: Please use chunkReader instead

readChunksWithBufferOf :: MonadIO m => Unfold m (Int, Socket) (Array Word8)

Deprecated: Please use chunkReaderWith instead

Same as chunkReaderWith

writeWithBufferOf :: MonadIO m => Int -> Socket -> Fold m Word8 ()

Deprecated: Please use writeWith instead

Same as writeWith

writeChunksWithBufferOf :: (MonadIO m, Unbox a) => Int -> Socket -> Fold m (Array a) ()

Deprecated: Please use writeChunksWith instead

Same as writeChunksWith