Portability | base >= 4.3 |
---|---|
Maintainer | joeyadams3.14159@gmail.com |
Control.Concurrent.STM.Channelize
Description
Wrap a network connection such that sending and receiving can be done via STM transactions.
See connectHandle for basic usage. See the examples directory of this package for full examples.
- channelize :: IO (ChannelizeConfig msg_in msg_out) -> (TDuplex msg_in msg_out -> IO a) -> IO a
- data TDuplex msg_in msg_out
- recv :: TDuplex msg_in msg_out -> STM msg_in
- send :: TDuplex msg_in msg_out -> msg_out -> STM ()
- sendE :: TDuplex msg_in msg_out -> msg_out -> STM ()
- data ChannelizeConfig msg_in msg_out = ChannelizeConfig {}
- connectStdio :: IO (ChannelizeConfig String String)
- connectHandle :: Handle -> IO (ChannelizeConfig String String)
- hGetInterruptible :: (Handle -> IO a) -> Handle -> IO a
- data ChannelizeException
The channelize function
Arguments
:: IO (ChannelizeConfig msg_in msg_out) | Connect action. It is run inside of an asynchronous exception mask. |
-> (TDuplex msg_in msg_out -> IO a) | Inner computation. When this completes or throws an exception, the connection will be closed. |
-> IO a |
Open a connection, and manage it so it can be used as a TDuplex.
This works by spawning two threads, one which receives messages and another which sends messages. If the recvMsg callback throws an exception, it will be forwarded to the next recv call (once the receive queue is empty).
When the inner computation completes (or throws an exception), the send queue is flushed and the connection is closed.
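The two-thread design described above can be sketched with plain `stm` primitives. This is only an illustration under assumptions: `MiniDuplex`, `miniRecv`, `miniSend`, and `withMiniDuplex` are hypothetical names, not the library's definitions, and the sketch simply kills the writer thread on exit rather than flushing the send queue first.

```haskell
import Control.Concurrent (forkIO, killThread)
import Control.Concurrent.STM
import Control.Exception (SomeException, bracket, try)
import Control.Monad (forever)

-- Hypothetical stand-in for TDuplex; not the library's real definition.
data MiniDuplex i o = MiniDuplex
  { recvQueue :: TQueue i                    -- messages received so far
  , recvError :: TVar (Maybe SomeException)  -- set when the reader thread fails
  , sendQueue :: TQueue o                    -- messages waiting to be written
  }

-- Take the next message. A reader failure is only reported once the
-- receive queue has been drained; otherwise the transaction retries.
miniRecv :: MiniDuplex i o -> STM i
miniRecv d = readTQueue (recvQueue d) `orElse` do
  err <- readTVar (recvError d)
  maybe retry throwSTM err

miniSend :: MiniDuplex i o -> o -> STM ()
miniSend d = writeTQueue (sendQueue d)

-- Run the inner computation with a reader thread and a writer thread.
-- Simplification: both threads are killed when the inner computation ends.
withMiniDuplex :: IO i -> (o -> IO ()) -> (MiniDuplex i o -> IO a) -> IO a
withMiniDuplex readMsg writeMsg inner = do
  d <- MiniDuplex <$> newTQueueIO <*> newTVarIO Nothing <*> newTQueueIO
  let reader = do
        r <- try (forever (readMsg >>= atomically . writeTQueue (recvQueue d)))
        case r of
          Left e   -> atomically (writeTVar (recvError d) (Just e))
          Right () -> return ()
      writer = forever (atomically (readTQueue (sendQueue d)) >>= writeMsg)
  bracket ((,) <$> forkIO reader <*> forkIO writer)
          (\(r, w) -> killThread r >> killThread w)
          (\_ -> inner d)
```

Here `readMsg` and `writeMsg` play the role of the receive and send callbacks. Messages that arrived before the reader failed stay readable, and the stored exception surfaces only on the first `miniRecv` that finds the queue empty.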
Using TDuplex for transactional I/O
recv :: TDuplex msg_in msg_out -> STM msg_in
Read a message from the receive queue. This calls retry if no message is available yet.
This will throw an exception if the reading thread encountered an error, or if the connection is closed.
Remember that STM transactions have no effect until they commit. Thus, to send a message and wait for a response, you will need to use two separate transactions:
    atomically $ send duplex "What is your name?"
    name <- atomically $ recv duplex
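To see why a single transaction cannot work, here is a small sketch that uses two plain `TQueue`s in place of a `TDuplex`. The names `startPeer`, `askName`, and `askNameBroken` are hypothetical and exist only for this illustration.

```haskell
import Control.Concurrent (forkIO)
import Control.Concurrent.STM

-- Hypothetical peer: waits for one question, then answers with a fixed name.
startPeer :: TQueue String -> TQueue String -> IO ()
startPeer questions answers = do
  _ <- forkIO $ do
    _ <- atomically (readTQueue questions)
    atomically (writeTQueue answers "Alice")
  return ()

-- Correct: the question is committed before we start waiting for the answer.
askName :: TQueue String -> TQueue String -> IO String
askName questions answers = do
  atomically $ writeTQueue questions "What is your name?"
  atomically $ readTQueue answers

-- Broken: the write only becomes visible at commit, but the transaction
-- cannot commit until an answer arrives, so the peer never sees the question.
-- orElse is used here only so the demo returns Nothing instead of blocking.
askNameBroken :: TQueue String -> TQueue String -> IO (Maybe String)
askNameBroken questions answers = atomically $
  (do writeTQueue questions "What is your name?"
      Just <$> readTQueue answers)
    `orElse` return Nothing
```

In the broken version the first branch retries, its write is rolled back, and the peer is left waiting. Without the `orElse` fallback the transaction would block indefinitely.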
send :: TDuplex msg_in msg_out -> msg_out -> STM ()
Write a message to the send queue.
If an error occurred while sending a previous message, or if the connection is closed, send silently ignores the message and returns. Rationale: suppose you have threads for clients A and B. A sends a message to B. If send were to throw an exception on failure, you might inadvertently disconnect A because of a failure that is B's fault.
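The rationale can be illustrated with a minimal sketch, assuming a hypothetical `Client` record with a failure flag and an outbox. None of these names come from the library; they only model the "drop silently" behavior described above.

```haskell
import Control.Concurrent.STM

-- Hypothetical per-client state; the field names are illustrative only.
data Client = Client
  { clientFailed :: TVar Bool      -- True once the connection has failed or closed
  , clientOutbox :: TQueue String  -- messages waiting to be written
  }

newClient :: IO Client
newClient = Client <$> newTVarIO False <*> newTQueueIO

-- Queue a message, or drop it without complaint if the client is gone.
quietSend :: Client -> String -> STM ()
quietSend c msg = do
  failed <- readTVar (clientFailed c)
  if failed then return () else writeTQueue (clientOutbox c) msg

-- Runs in A's thread: deliver to B and acknowledge to A in one transaction.
relay :: Client -> Client -> String -> STM ()
relay a b msg = do
  quietSend b msg
  quietSend a ("sent: " ++ msg)
```

If `quietSend` threw when B had failed, the exception would abort the whole `relay` transaction in A's thread, and A's handler would likely tear down A's connection. Because the message is dropped instead, A still receives its acknowledgement.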
Configuring a connection
data ChannelizeConfig msg_in msg_out
Callbacks telling channelize how to use a duplex connection.
Constructors
ChannelizeConfig | |
Fields
|
connectStdio :: IO (ChannelizeConfig String String)
Treat stdin and stdout as a "connection", where each message corresponds to a line.
This sets the buffering mode of stdin and stdout to LineBuffering.
connectHandle :: Handle -> IO (ChannelizeConfig String String)
Wrap a duplex Handle in a ChannelizeConfig. Each message corresponds to a line.
Example (client):
    let connect = connectTo "localhost" (PortNumber 1234) >>= connectHandle
     in channelize connect $ \duplex -> do
            ...
Example (Telnet server):
    (handle, host, port) <- accept sock
    putStrLn $ "Accepted connection from " ++ host ++ ":" ++ show port

    -- Swallow carriage returns sent by telnet clients
    hSetNewlineMode handle universalNewlineMode

    forkIO $ channelize (connectHandle handle) $ \duplex -> do
        ...
hGetInterruptible :: (Handle -> IO a) -> Handle -> IO a
Perform a read action on a Handle. Try to ensure that it can be interrupted by an asynchronous exception.
On Windows with -threaded, a thread reading a Handle cannot be interrupted by an asynchronous exception. The exception will not be delivered until the receive operation completes or fails.
hGetInterruptible works around this problem (when present) by calling hWaitForInput over and over with a delay of one second.
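The polling idea can be sketched as follows. This is a hypothetical re-creation, not the library's source: `readInterruptibly` is an invented name, the one-second delay is assumed to be the `hWaitForInput` timeout (1000 ms), and unlike the real function the sketch polls on every platform rather than only where the problem is present.

```haskell
import System.IO

-- Hypothetical re-creation of the polling workaround.
-- Each hWaitForInput call blocks for at most 1000 ms, so a pending
-- asynchronous exception can be delivered between calls. The read
-- action itself only runs once input is known to be available.
-- Note: hWaitForInput raises an EOF error if the end of input is reached.
readInterruptibly :: (Handle -> IO a) -> Handle -> IO a
readInterruptibly action h = loop
  where
    loop = do
      ready <- hWaitForInput h 1000
      if ready then action h else loop
```

A call such as `readInterruptibly hGetLine handle` then behaves like `hGetLine handle`, except that the thread wakes up about once per second while it waits.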