{-# OPTIONS_GHC -fglasgow-exts #-}

-- | Synchronous message receiving.
module System.Miniplex.Source (
    Source,
    attach,
    read,
    getMsgs,
    detach,
    withSource
) where

import System.Miniplex.Sekrit

import Prelude hiding (read)

import Control.Exception
import Control.Monad
import Control.Monad.Reader ()  -- instances only
import Data.Typeable
import Network.Socket
import System.IO.Unsafe

-- | A message source: a wrapper around the socket that is connected to a
-- message sink. Values are obtained with 'attach' (or 'withSource') and
-- released with 'detach'.
newtype Source = Source { sock :: Socket }
    deriving (Typeable)

-- | @'attach' tag@ returns a message source connected to the sink created by
-- a call to @'System.Miniplex.Sink.create' tag@. If no such sink exists, an
-- exception is thrown.
attach :: String -> IO Source
attach what = do
    -- The first argument is this function's qualified name; presumably
    -- 'pathFromTag' uses it to label its own error messages (defined in
    -- "System.Miniplex.Sekrit" -- confirm there).
    n <- pathFromTag "System.Miniplex.Source.attach" what
    -- 'bracketOnError' closes the freshly created socket if any of the
    -- setup steps below throws, so a failed attach does not leak it.
    bracketOnError (socket AF_UNIX Stream 0) sClose $ \s -> do
        closeOnExec s
        connect s (SockAddrUnix n)
        -- A source only ever receives, so the sending half is shut down.
        shutdown s ShutdownSend
        return $ Source {sock = s}

-- | Synchronously reads a message from a source (i.e. it blocks if there is
-- currently no message available).
--
-- A message is read in two steps: first 4 bytes, which 'intFromBytes' turns
-- into the payload length, then that many bytes of payload.
read :: Source -> IO String
read so = do
    -- NOTE(review): 'reallyRecv' and 'intFromBytes' come from
    -- "System.Miniplex.Sekrit"; presumably 'reallyRecv' keeps receiving until
    -- exactly the requested number of bytes has arrived -- confirm there.
    n <- liftM intFromBytes $ reallyRecv s 4
    reallyRecv s n
  where
    s = sock so

-- | Returns a lazy list of all messages arriving at a source (like
-- @'System.IO.hGetContents'@).
--
-- Each cell of the list is produced on demand by a call to 'read'. When a
-- 'read' throws, the source is detached and the list ends at that point, so
-- the source must not be used directly once the list has been fully
-- consumed.
getMsgs :: Source -> IO [String]
getMsgs so =
    -- The handler deliberately catches every exception raised by 'read' and
    -- treats it as end of stream. The recursive call returns immediately
    -- (it is itself deferred by 'unsafeInterleaveIO'), so failures further
    -- down the list are handled by the handler installed at that level.
    unsafeInterleaveIO . handle (\_ -> detach so >> return []) $
        liftM2 (:) (read so) (getMsgs so)

-- | Disconnects from a message sink. The detached source becomes invalid
-- and must not be used again.
detach :: Source -> IO ()
detach so = sClose (sock so)

-- | Helper function to simplify resource handling. @'withSource' tag body@
-- creates a source, calls @body@, then disconnects the source, even if
-- @body@ throws an exception.
withSource :: String -> (Source -> IO a) -> IO a
withSource tag f = bracket (attach tag) detach f