{-# OPTIONS_GHC -fglasgow-exts #-}

-- | Asynchronous message broadcasting.
module System.Miniplex.Sink (
    -- * Caveat
    --
    -- | This module performs writes on sockets that may be closed at any time,
    -- triggering @SIGPIPE@ signals. It is therefore recommended that you call
    -- @'System.Posix.Signals.installHandler' 'System.Posix.Signals.openEndedPipe'
    -- 'System.Posix.Signals.Ignore' 'Nothing'@ at the beginning of your
    -- program.
    Sink,
    create,
    write,
    destroy,
    withSink
) where

import System.Miniplex.Sekrit

import Prelude hiding (catch)
import Data.Typeable
import Data.List

import Control.Concurrent
import Control.Concurrent.STM
import Control.Exception
import Control.Monad
import Control.Monad.Fix

import System.Directory
import qualified Network as N
import Network.Socket

-- State of a single reader's one-slot mailbox.
--
--   * 'EOF':   the sink was destroyed; the reader thread should stop.
--   * 'Empty': nothing pending; the reader thread blocks (STM 'retry').
--   * 'Some':  one message waiting to be sent to the reader.
data Rend a = EOF | Empty | Some a

-- Handle to a message sink.
data Sink = Sink {
    -- File system path of the named socket (removed again by 'destroy').
    name :: !String,
    -- The listening socket.
    sock :: !Socket,
    -- Mailboxes of all currently registered readers.
    ctrl :: !(TVar [TVar (Rend String)]),
    -- Thread that accepts incoming connections.
    acct :: !ThreadId,
    -- Most recently written message, replayed to readers when they connect.
    zmsg :: !(TVar (Maybe String))
} deriving (Typeable)

-- Run an action over and over for as long as it returns 'True'.
loop :: (Monad m) => m Bool -> m ()
loop m = fix $ \self -> m >>= flip when self

-- Send one message over a socket, prefixed with @bytesFromInt (length str)@.
--
-- The commented-out 'bracket' would have ignored SIGPIPE for the duration of
-- each write; it is disabled, and the module header instead asks the caller
-- to ignore SIGPIPE once at program start.
xsend :: Socket -> String -> IO ()
xsend s str =
    {- bracket
        (installHandler openEndedPipe Ignore Nothing)
        (\h -> installHandler openEndedPipe h Nothing)
        (\_ -> -} reallySend s (bytesFromInt (length str) ++ str) {- ) -}

-- Run an action and close the socket afterwards, whether or not it throws.
finallyClose :: Socket -> IO a -> IO a
finallyClose s x = x `finally` sClose s

-- Fork a thread that runs the action, silently discards IO errors raised by
-- it, and closes the socket once the action is done.
forkClose :: Socket -> IO () -> IO ThreadId
forkClose s x = forkIO $ finallyClose s $ handleJust ioErrors (const $ return ()) x

-- Acceptor loop: accept connections on the listening socket forever and fork
-- one sender thread per connected reader.
--
-- Arguments: listening socket, list of registered reader mailboxes, and the
-- most recently written message.
barghest :: Socket -> TVar [TVar (Rend String)] -> TVar (Maybe String) -> IO ()
barghest s cp zp = loop conn
  where
    -- Accept with asynchronous exceptions blocked so that an accepted socket
    -- is always handed over to 'forkClose' (which closes it eventually).
    conn = block $ do
        (t, _) <- accept s
        forkClose t $ unblock $ do
            v <- atomically (newTVar Empty)
            spawn t v
        return True

    -- Per-reader thread body: register the mailbox, replay the latest
    -- message (if any), then forward every message placed in the mailbox
    -- until EOF is seen.
    spawn t v = do
        shutdown t ShutdownReceive
        block $ do
            -- Register the mailbox and snapshot the latest message in ONE
            -- transaction. Doing these as two separate steps (replay first,
            -- register afterwards) lets a concurrent 'write' slip in between
            -- and be lost for this reader.
            z <- atomically $ do
                vs <- readTVar cp
                writeTVar cp (v : vs)
                readTVar zp
            -- The replay runs inside the guarded section so the mailbox is
            -- deregistered even if sending the replayed message fails.
            -- NOTE(review): a reader that registers after 'destroy' has
            -- already cleared the list never receives EOF -- confirm whether
            -- that case needs handling.
            unblock (maybe (return ()) (xsend t) z >> loop body)
                `finally` atomically (readTVar cp >>= writeTVar cp . delete v)
      where
        -- One step of the forwarding loop: the transaction picks the IO
        -- action to run next, 'join' then runs it. The result says whether
        -- to keep looping.
        body = join . atomically $ do
            x <- readTVar v
            case x of
                EOF -> return (return False)
                Empty -> retry
                Some str -> do
                    writeTVar v Empty
                    return $ xsend t str >> return True

-- | @'create' tag@ creates a message sink. @tag@ is used to uniquely
-- identify this sink. The set of allowed characters for @tag@ includes
-- letters, digits, @_@, @-@ and @+@.
--
-- (Implementation detail: This function actually creates a named socket
-- in your @~\/.miniplex\/@.)
create :: String -> IO Sink
create what = do
    n <- pathFromTag "System.Miniplex.Sink.create" what
    s <- N.listenOn (N.UnixSocket n)
    closeOnExec s
    vs <- atomically $ newTVar []
    z <- atomically $ newTVar Nothing
    t <- forkClose s (barghest s vs z)
    return $ Sink
        { name = n
        , sock = s
        , ctrl = vs
        , acct = t
        , zmsg = z
        }

-- | @'write' si msg@ asynchronously writes @msg@ to @si@, where it will be
-- received by all currently connected readers.
write :: Sink -> String -> IO ()
write si str = atomically $ do
    -- Remember the message so that readers connecting later get it replayed.
    writeTVar (zmsg si) (Just str)
    -- Each reader has a single slot: a message it has not yet picked up is
    -- overwritten by this one.
    vs <- readTVar (ctrl si)
    mapM_ (flip writeTVar msg) vs
  where
    msg = Some str

-- | Deallocates a sink. The destroyed sink must not be used again.
--
-- (Implementation detail: This function closes and removes the socket from
-- the file system. If you forget to call it, you'll leave a stale entry in
-- @~\/.miniplex\/@, which will cause calls to 'create' with the same tag to
-- fail.)
destroy :: Sink -> IO ()
destroy si = removeFile (name si) `finally` shutdownSink
  where
    c = ctrl si
    -- Runs even if removing the socket file throws (e.g. because it is
    -- already gone); otherwise the acceptor thread and every reader thread
    -- would be left running. The exception itself still propagates.
    shutdownSink = do
        killThread (acct si)
        atomically $ do
            vs <- readTVar c
            mapM_ (flip writeTVar EOF) vs
            writeTVar c []

-- | Helper function to simplify resource handling. @'withSink' tag body@
-- creates a sink, calls @body@, then destroys the sink, even if @body@
-- throws an exception.
withSink :: String -> (Sink -> IO a) -> IO a
withSink t f = block $ do
    si <- create t
    unblock (f si) `finally` destroy si