stm-channelize-0.1.1: Transactional I/O for duplex streams

Portabilitybase >= 4.3




Wrap a network connection such that sending and receiving can be done via STM transactions.

See connectHandle for basic usage. See the examples directory of this package for full examples.


The channelize function



:: IO (ChannelizeConfig msg_in msg_out)

Connect action. It is run inside of an asynchronous exception mask.

-> (TDuplex msg_in msg_out -> IO a)

Inner computation. When this completes or throws an exception, the connection will be closed.

-> IO a 

Open a connection, and manage it so it can be used as a TDuplex.

This works by spawning two threads, one which receives messages and another which sends messages. If the recvMsg callback throws an exception, it will be forwarded to the next recv call (once the receive queue is empty).

When the inner computation completes (or throws an exception), the send queue is flushed and the connection is closed.

Using TDuplex for transactional I/O

data TDuplex msg_in msg_out Source

An abstract object that supports sending and receiving messages in STM.

Internally, it is a pair of TChans, with additional flags to check for I/O errors on recv, and to avoid filling the send queue with messages that will never be sent.

recv :: TDuplex msg_in msg_out -> STM msg_inSource

Read a message from the receive queue. retry if no message is available yet.

This will throw an exception if the reading thread encountered an error, or if the connection is closed.

Remember that STM transactions have no effect until they commit. Thus, to send a message and wait for a response, you will need to use two separate transactions:

atomically $ send duplex "What is your name?"
name <- atomically $ recv duplex

send :: TDuplex msg_in msg_out -> msg_out -> STM ()Source

Write a message to the send queue.

If an error occurred while sending a previous message, or if the connection is closed, send silently ignores the message and returns. Rationale: suppose you have threads for clients A and B. A sends a message to B. If send were to throw an exception on failure, you might inadvertently disconnect A because of a failure that is B's fault.

sendE :: TDuplex msg_in msg_out -> msg_out -> STM ()Source

Like send, but throw an exception if the message was discarded (either because sendMsg failed on a previous message, or because the connection has been closed).

Configuring a connection

data ChannelizeConfig msg_in msg_out Source

Callbacks telling channelize how to use a duplex connection.




recvMsg :: IO msg_in
sendMsg :: msg_out -> IO ()

Callbacks for sending and receiving messages. All calls of recvMsg will be in one thread, and all calls of sendMsg will be in another thread. If recvMsg throws an exception, it will not be called again. If sendMsg throws an exception, it will not be called again, nor will sendBye be called.

This means it is safe to use an IORef to pass state from one recvMsg or sendMsg call to the next. However, to share state between recvMsg and sendMsg, you will need to use thread synchronization (e.g. MVar, STM).

sendBye :: IO ()

Action to call before closing the connection, but only if none of the send calls failed. This is called from the same thread as sendMsg.

connClose :: IO ()

Callback for closing the connection. Called when channelize completes.

connectStdio :: IO (ChannelizeConfig String String)Source

Treat stdin and stdout as a "connection", where each message corresponds to a line.

This sets the buffering mode of stdin and stdout to LineBuffering.

connectHandle :: Handle -> IO (ChannelizeConfig String String)Source

Wrap a duplex Handle in a ChannelizeConfig. Each message corresponds to a line.

Example (client):

let connect = connectTo "localhost" (PortNumber 1234) >>= connectHandle
 in channelize connect $ \duplex -> do

Example (Telnet server):

(handle, host, port) <- accept sock
putStrLn $ "Accepted connection from " ++ host ++ ":" ++ show port

-- Swallow carriage returns sent by telnet clients
hSetNewlineMode handle universalNewlineMode

forkIO $ channelize (connectHandle handle) $ \duplex -> do

hGetInterruptible :: (Handle -> IO a) -> Handle -> IO aSource

Perform a read action on a Handle. Try to ensure that it can be interrupted by an asynchronous exception.

On Windows with -threaded, a thread reading a Handle cannot be interrupted by an asynchronous exception. The exception will not be delivered until the receive operation completes or fails.

hGetInterruptible works around this problem (when present) by calling hWaitForInput over and over with a delay of one second.