-- |The Inter-Process Communication (IPC) library makes the task of setting -- up datagram sockets between processes trivial. The simplest method is: -- -- > outputQueue <- channelConnectSimple "channelName" -- -- > send outputQueue someInstanceOfBinary -- -- and -- -- > inputQueue <- channelAcceptSimple "channelName" -- -- > val <- recv inputQueue -- -- The Queues are TVars from Data.Queue, which can be imported for fine-tuned -- control. -- -- If you find this useful then e-mail me and I'll be more likely to maintain -- it and add features. Potential new features include: -- -- * Remove the static maximum message size ('maxBytes') limitation. -- -- * Fix connection capability (ex: if the sender closes, receiver rebinds and continues) -- -- * Testing on other platforms -- -- * Better exception handling -- -- * Input buffer size limitations (for memory concious apps) module System.IPC ( channelAccept , channelConnect , channelAcceptSimple , channelConnectSimple , D.send , D.recv , maxBytes , waitTillEmpty ) where import qualified Data.Queue as D import Network.Socket hiding (recv, send) import Network.Socket.ByteString import Data.Binary import qualified Data.ByteString as S import qualified Data.ByteString.Lazy as L import Control.Concurrent.STM import Control.Concurrent (forkIO, threadDelay) import Control.Monad (forever) import Control.Exception as X -- |The maximum bytes per datagram. Any data structure with an encoding greater than -- this size stands to be truncated (and probably non-decodable). maxBytes = 4096 -- |A wrapper for 'channelAccept'. Creates a new queue and sets conversion == return. channelAcceptSimple :: (Binary i) => String -> IO (TVar (D.Queue D.Input i)) channelAcceptSimple tag = do q <- newTVarIO D.empty forkIO $ channelAccept tag return q return q -- |A wrapper for 'channelConnect'. Creates a new queue and sets conversion == return. channelConnectSimple :: (Binary o) => String -> IO (TVar (D.Queue D.Output o)) channelConnectSimple tag = withSocketsDo $ do q <- newTVarIO D.empty forkIO $ channelConnect tag return q return q -- |Sets up an input channel to receive data. -- Uses the provided queue. Converts all data from the deserialized format before enqueuing. channelAccept :: (Binary a) => String -> (a -> IO input) -> TVar (D.Queue D.Input input) -> IO () channelAccept tag convIn q = withSocketsDo $ do sock <- socket AF_UNIX Datagram defaultProtocol bindSocket sock (SockAddrUnix ('\0' : tag)) receiver sock convIn q -- |Connects to an open channel to send data. -- Uses the provided queue. Converts all data from the queue formate before serializing / sending. channelConnect :: (Binary b) => String -> (output -> IO b) -> TVar (D.Queue D.Output output) -> IO () channelConnect tag convOut q = withSocketsDo $ do sock <- socket AF_UNIX Datagram defaultProtocol makeConnection sock 0 sender sock convOut q where makeConnection s t = X.catchJust ioErrors (doConnect s) (\_ -> threadDelay t >> makeConnection s (newDelay t)) doConnect sock = connect sock (SockAddrUnix ('\0' : tag)) newDelay 0 = 1000 newDelay x = min (x * 2) (10^6) -- |Receives data from the socket, deserializes, converts, enqueues. receiver :: (Binary b) => Socket -> -- ^ input socket (b -> IO a) -> -- ^ conversion function TVar (D.Queue D.Input a) -> -- ^ buffer IO () receiver sock conv q = forever $ do bs <- recv sock maxBytes val <- conv (decode (toLazy bs)) atomically ( readTVar q >>= writeTVar q . (flip D.snoc) val) -- |Removes data from the buffer, converts, serializes and sends. sender :: (Binary b) => Socket -> -- ^ output socket (a -> IO b) -> -- ^ conversion function TVar (D.Queue D.Output a) -> -- ^ buffer IO () sender sock conv tv = forever $ do v <- atomically ( do q <- readTVar tv case D.pop q of (Just e,rest) -> writeTVar tv rest >> return e (Nothing, _) -> retry ) val <- conv v send sock (fromLazy (encode val)) -- |Will block until the queue is empty. This is useful for delaying program -- shutdown till _most_ of the data has been sent. Items are removed just -- before sending, so an immediate shutdown could fail to send the final item. waitTillEmpty :: TVar (D.Queue d a) -> IO () waitTillEmpty tv = atomically (do q <- readTVar tv if D.isEmpty q then return () else retry ) toLazy :: S.ByteString -> L.ByteString toLazy bs = L.fromChunks [bs] fromLazy :: L.ByteString -> S.ByteString fromLazy bs = S.concat $ L.toChunks bs