-- |
-- Module:       Control.Concurrent.STM.Channelize
-- Copyright:    (c) Joseph Adams 2012
-- Maintainer:   joeyadams3.14159@gmail.com
-- Portability:  base >= 4.3
--
-- Wrap a network connection such that sending and receiving can be done via
-- STM transactions.
--
-- See 'connectHandle' for basic usage.  See the @examples@ directory of this
-- package for full examples.

{-# LANGUAGE CPP, DeriveDataTypeable #-}
module Control.Concurrent.STM.Channelize (
    -- * The channelize function
    channelize,

    -- * Using TDuplex for transactional I/O
    TDuplex,
    recv,
    send,
    sendE,

    -- * Configuring a connection
    ChannelizeConfig(..),
    connectStdio,
    connectHandle,
    hGetInterruptible,

    -- * Exceptions
    ChannelizeException(..),
) where

import Prelude hiding (catch)
import Control.Concurrent
import Control.Concurrent.STM
import Control.Exception
import Control.Monad
import Data.IORef
import Data.Typeable
import System.IO

-- | An abstract endpoint for exchanging messages with a connection from
-- inside STM.
--
-- Under the hood this is a pair of 'TChan's (one per direction) plus a few
-- status variables.  The status variables let 'recv' report I/O errors and
-- let 'send' avoid piling up messages that will never be transmitted.
data TDuplex msg_in msg_out = TDuplex
    { tdRecvStatus :: TVar WorkerStatus
      -- ^ State of the receiving worker; consulted by 'recv' once the
      --   receive queue has been drained.
    , tdRecvChan   :: TChan msg_in
      -- ^ Queue of messages that have been received but not yet consumed.
    , tdSendStatus :: TVar WorkerStatus
      -- ^ State of the sending worker; consulted by 'send' and 'sendE'
      --   before enqueueing.
    , tdSendChan   :: TChan msg_out
      -- ^ Queue of messages waiting to be transmitted.
    , tdStop       :: TVar Bool
      -- ^ Set to 'True' once 'channelize' starts shutting the connection
      --   down; messages sent after that point are discarded.
    }

-- | Take the next message off the receive queue, blocking (via 'retry')
-- while none is available.
--
-- Messages that are already queued are always delivered first.  Only when
-- the queue is empty does 'recv' look at the receiver's status: it throws
-- the receiver's exception if the reading thread failed, or
-- 'ChannelizeClosedRecv' if the connection has been closed.
--
-- Keep in mind that an STM transaction has no effect until it commits.
-- Sending a request and waiting for the reply therefore takes two separate
-- transactions:
--
-- @
-- 'atomically' $ 'send' duplex \"What is your name?\"
-- name <- 'atomically' $ 'recv' duplex
-- @
recv :: TDuplex msg_in msg_out -> STM msg_in
recv td = readTChan (tdRecvChan td) `orElse` queueDrained
  where
    -- Reached only when the queue is empty: decide between waiting for more
    -- input and reporting why no more input will arrive.
    queueDrained =
        readTVar (tdRecvStatus td) >>= \status ->
            case status of
                Running -> retry
                Stopped -> throwSTM ChannelizeClosedRecv
                Error e -> throwSTM e

-- | Append a message to the send queue.
--
-- When an earlier message failed to send, or the connection is closed, the
-- message is silently dropped and 'send' returns normally.  Rationale:
-- suppose you have threads for clients A and B, and A sends a message to B.
-- If 'send' threw on failure, you might inadvertently disconnect A because
-- of a failure that is B's fault.
send :: TDuplex msg_in msg_out -> msg_out -> STM ()
send td msg = void (sendReturnThrow td msg)

-- | Like 'send', but throw an exception if the message was discarded (either
-- because 'sendMsg' failed on a previous message, or because the connection
-- has been closed).
sendE :: TDuplex msg_in msg_out -> msg_out -> STM ()
sendE td msg = do
    -- 'sendReturnThrow' yields a follow-up transaction that either does
    -- nothing (message enqueued) or throws (message discarded); run it.
    outcome <- sendReturnThrow td msg
    outcome

-- | Common implementation behind 'send' and 'sendE'.
--
-- Enqueues the message if the sender is still 'Running' and shutdown has not
-- begun.  The returned transaction is a no-op when the message was enqueued;
-- otherwise it throws the reason the message was discarded.  Callers choose
-- whether to run it ('sendE') or ignore it ('send').
sendReturnThrow :: TDuplex msg_in msg_out -> msg_out -> STM (STM ())
sendReturnThrow td msg = do
    status <- readTVar (tdSendStatus td)
    case status of
        Stopped -> refuse ChannelizeClosedSend
        Error e -> refuse e
        Running -> do
            -- The stop flag is only consulted while the sender is running.
            closing <- readTVar (tdStop td)
            if closing
                then refuse ChannelizeClosedSend
                else do
                    writeTChan (tdSendChan td) msg
                    return (return ())
  where
    -- Discard the message; hand the caller a transaction that throws.
    refuse :: Exception e => e -> STM (STM ())
    refuse = return . throwSTM

-- | Exceptions reported to users of a 'TDuplex' whose connection has been
-- closed.
data ChannelizeException
    = ChannelizeClosedRecv
      -- ^ Thrown by 'recv' once the receiver has stopped.
    | ChannelizeClosedSend
      -- ^ Thrown by 'sendE' once sending is no longer possible.
    deriving Typeable

instance Show ChannelizeException where
    show e = case e of
        ChannelizeClosedRecv ->
            "channelize: receive callback used after connection was closed"
        ChannelizeClosedSend ->
            "channelize: send callback used after connection was closed"

instance Exception ChannelizeException

-- | Internal exception that 'channelize' throws at the receiving thread to
-- make it stop.
data ChannelizeKill = ChannelizeKill
    deriving (Show, Typeable)

instance Exception ChannelizeKill

-- | Life-cycle state of a worker thread.
data WorkerStatus
    = Running               -- ^ The worker is still active.
    | Stopped               -- ^ The worker shut down without an error.
    | Error SomeException   -- ^ The worker died with this exception.

-- | Callbacks telling 'channelize' how to use a duplex connection.
--
-- Every call of 'recvMsg' happens in one thread, and every call of 'sendMsg'
-- (and 'sendBye') happens in another.  It is therefore safe to use an
-- 'IORef' to carry state from one 'recvMsg' call to the next, or from one
-- 'sendMsg' call to the next.  Sharing state /between/ 'recvMsg' and
-- 'sendMsg' requires thread synchronization (e.g. 'MVar', 'STM').
data ChannelizeConfig msg_in msg_out = ChannelizeConfig
    { recvMsg   :: IO msg_in
      -- ^ Receive one message.  If it throws an exception, it will not be
      --   called again.
    , sendMsg   :: msg_out -> IO ()
      -- ^ Send one message.  If it throws an exception, it will not be
      --   called again, nor will 'sendBye' be called.
    , sendBye   :: IO ()
      -- ^ Action to call before closing the connection, but only if none of
      --   the send calls failed.  Runs in the same thread as 'sendMsg'.
    , connClose :: IO ()
      -- ^ Close the connection.  Called when 'channelize' completes.
    }

-- | Build a line-oriented configuration: each received message is one line
-- read from the first handle, each sent message is written as one line to
-- the second handle, and nothing is sent on shutdown.
lineConfig :: Handle -> Handle -> IO () -> ChannelizeConfig String String
lineConfig hIn hOut closeAction = ChannelizeConfig
    { recvMsg   = hGetInterruptible hGetLine hIn
    , sendMsg   = hPutStrLn hOut
    , sendBye   = return ()
    , connClose = closeAction
    }

-- | Treat 'stdin' and 'stdout' as a \"connection\", where each message
-- corresponds to a line.
--
-- This sets the buffering mode of 'stdin' and 'stdout' to 'LineBuffering'.
-- Closing the resulting connection leaves both handles open.
connectStdio :: IO (ChannelizeConfig String String)
connectStdio = do
    mapM_ (\h -> hSetBuffering h LineBuffering) [stdin, stdout]
    return (lineConfig stdin stdout (return ()))

-- | Wrap a duplex 'Handle' in a 'ChannelizeConfig'.  Each message corresponds
-- to a line.  The handle is switched to 'LineBuffering'; if that fails, the
-- handle is closed before the exception propagates.
--
-- Example (client):
--
-- >let connect = connectTo "localhost" (PortNumber 1234) >>= connectHandle
-- > in channelize connect $ \duplex -> do
-- >        ...
--
-- Example (Telnet server):
--
-- >(handle, host, port) <- accept sock
-- >putStrLn $ "Accepted connection from " ++ host ++ ":" ++ show port
-- >
-- >-- Swallow carriage returns sent by telnet clients
-- >hSetNewlineMode handle universalNewlineMode
-- >
-- >forkIO $ channelize (connectHandle handle) $ \duplex -> do
-- >    ...
connectHandle :: Handle -> IO (ChannelizeConfig String String)
connectHandle h = do
    hSetBuffering h LineBuffering `onException` hClose h
    return (lineConfig h h (hClose h))

-- | Perform a read action on a 'Handle'.  Try to ensure that it can be
-- interrupted by an asynchronous exception.
--
-- On Windows with -threaded, a thread reading a 'Handle' cannot be interrupted
-- by an asynchronous exception.  The exception will not be delivered until the
-- receive operation completes or fails.
--
-- 'hGetInterruptible' works around this problem (when present) by calling
-- 'hWaitForInput' over and over with a delay of one second.
hGetInterruptible :: (Handle -> IO a) -> Handle -> IO a
#ifdef mingw32_HOST_OS
hGetInterruptible inner h
    | rtsSupportsBoundThreads = poll
    | otherwise               = inner h
  where
    -- Wait for input in one-second slices (1000 ms), giving pending
    -- asynchronous exceptions a chance to be delivered between slices.
    poll = do
        ready <- hWaitForInput h 1000
        if ready
            then inner h
            else allowInterrupt >> poll
#if !MIN_VERSION_base(4,4,0)
    -- base versions before 4.4 take this local definition instead.
    allowInterrupt = unblock $ return ()
#endif
#else
hGetInterruptible inner h = inner h
#endif

-- | Open a connection and manage it so that it can be used as a 'TDuplex'.
--
-- Two worker threads are spawned: one repeatedly calls 'recvMsg' and queues
-- the results, the other takes queued messages and passes them to 'sendMsg'.
-- If the 'recvMsg' callback throws an exception, that exception is forwarded
-- to the next 'recv' call (once the receive queue is empty).
--
-- When the inner computation completes (or throws an exception), the send
-- queue is flushed and the connection is closed.
channelize :: IO (ChannelizeConfig msg_in msg_out)
                -- ^ Connect action.  It is run inside of an asynchronous
                --   exception 'mask'.
           -> (TDuplex msg_in msg_out -> IO a)
                -- ^ Inner computation.  When this completes or throws an
                --   exception, the connection will be closed.
           -> IO a
channelize connect inner = do
    recvQueue  <- newTChanIO
    sendQueue  <- newTChanIO
    stopFlag   <- newTVarIO False
    recvStatus <- newTVarIO Running
    sendStatus <- newTVarIO Running

    let duplex = TDuplex
            { tdRecvStatus = recvStatus
            , tdRecvChan   = recvQueue
            , tdSendStatus = sendStatus
            , tdSendChan   = sendQueue
            , tdStop       = stopFlag
            }

    mask $ \restore -> do
        config <- connect

        let -- Receive messages until shutdown is requested.  A message that
            -- arrives after the stop flag is set is dropped, and the
            -- receiver marks itself as stopped.
            receiver = do
                msg <- recvMsg config
                keepGoing <- atomically $ do
                    stopping <- readTVar stopFlag
                    if stopping
                        then writeTVar recvStatus Stopped >> return False
                        else writeTChan recvQueue msg >> return True
                when keepGoing receiver

            -- Transmit queued messages.  The queue is tried first, so every
            -- queued message is sent before the stop flag is honoured; once
            -- the queue is empty and shutdown was requested, say goodbye
            -- and mark the sender as stopped.
            sender = do
                next <- atomically $
                    fmap Just (readTChan sendQueue)
                    `orElse` (do
                        stopping <- readTVar stopFlag
                        if stopping then return Nothing else retry)
                case next of
                    Just msg -> sendMsg config msg >> sender
                    Nothing  -> do
                        sendBye config
                        atomically $ writeTVar sendStatus Stopped

            -- Record why a worker terminated abnormally.  Being killed by
            -- 'ChannelizeKill' counts as a normal stop; anything else is
            -- stored as an error.
            recordExit statusVar ex =
                atomically $ writeTVar statusVar $
                    case fromException ex of
                        Just ChannelizeKill -> Stopped
                        Nothing             -> Error ex

        recvThread  <- forkIO $ receiver `catch` recordExit recvStatus
        _sendThread <- forkIO $ sender   `catch` recordExit sendStatus

        let isRunning Running = True
            isRunning _       = False

            -- Block until neither worker is running any more.
            awaitWorkers = atomically $ do
                r <- readTVar recvStatus
                s <- readTVar sendStatus
                when (isRunning r || isRunning s) retry

            shutdown = do
                atomically $ writeTVar stopFlag True

                -- Wait for the sender and receiver threads to finish.  This
                -- ensures that all enqueued data is sent before 'channelize'
                -- completes.
                --
                -- If we receive any asynchronous exceptions after this point,
                -- too bad... the connection won't be properly closed.
                throwTo recvThread ChannelizeKill
                awaitWorkers
                connClose config

        result <- restore (inner duplex) `onException` shutdown
        shutdown
        return result