{-# OPTIONS_GHC -fglasgow-exts #-} -- | Asynchronous message broadcasting. module System.Miniplex.Sink ( -- * Caveat -- -- | This module performs writes on sockets that may be closed at any time, -- triggering @SIGPIPE@ signals. It is therefore recommended that you call -- @'System.Posix.Signals.installHandler' 'System.Posix.Signals.openEndedPipe' -- 'System.Posix.Signals.Ignore' 'Nothing'@ at the beginning of your -- program. Sink, create, write, destroy, withSink ) where import System.Miniplex.Sekrit import Prelude hiding (catch) import Data.Typeable import Data.List import Control.Concurrent import Control.Concurrent.STM import Control.Exception import Control.Monad import Control.Monad.Fix import System.Directory import System.IO.Lock import System.Posix.IO import qualified Network as N import Network.Socket data Rend a = EOF | Empty | Some a data Sink = Sink { name :: !String, sock :: !Socket, ctrl :: !(TVar [TVar (Rend String)]), acct :: !ThreadId, zmsg :: !(TVar (Maybe String)) } deriving (Typeable) loop :: (Monad m) => m Bool -> m () loop m = fix $ \self -> m >>= flip when self xsend :: Socket -> String -> IO () xsend s str = {- bracket (installHandler openEndedPipe Ignore Nothing) (\h -> installHandler openEndedPipe h Nothing) (\_ -> -} reallySend s (bytesFromInt (length str) ++ str) {- ) -} finallyClose :: Socket -> IO a -> IO a finallyClose s x = x `finally` sClose s forkClose :: Socket -> IO () -> IO ThreadId forkClose s x = forkIO $ finallyClose s $ handleJust ioErrors (const $ return ()) x barghest :: Socket -> TVar [TVar (Rend String)] -> TVar (Maybe String) -> IO () barghest s cp zp = loop conn where conn = block $ do (t, _) <- accept s forkClose t $ unblock $ do v <- atomically (newTVar Empty) spawn t v return True spawn t v = do shutdown t ShutdownReceive z <- atomically $ readTVar zp maybe (return ()) (xsend t) z block $ do atomically $ do vs <- readTVar cp writeTVar cp (v : vs) unblock (loop body) `finally` atomically (readTVar cp >>= writeTVar cp . delete v) where body = join . atomically $ do x <- readTVar v case x of EOF -> return (return False) Empty -> retry Some str -> do writeTVar v Empty return $ xsend t str >> return True -- | @'create' tag@ creates a message sink. @tag@ is used to uniquely -- identify this sink. The set of allowed characters for @tag@ includes -- letters, digits, @_@, @-@ and @+@. -- -- (Implementation detail: This function actually creates a named socket -- in your @~\/.miniplex\/@.) create :: String -> IO Sink create what = do (com, lck, ret) <- pathFromTag "System.Miniplex.Sink.create" what bracketOnError (N.listenOn (N.UnixSocket com)) sClose $ \s -> do closeOnExec s bracket (openFd lck WriteOnly (Just mode644) defaultFileFlags) closeFd $ \lf -> do bracket (setLockAll lf LockWrite) unLock $ \_ -> do brush ret vs <- atomically $ newTVar [] z <- atomically $ newTVar Nothing t <- forkClose s (barghest s vs z) return $ Sink { name = com , sock = s , ctrl = vs , acct = t , zmsg = z } brush :: String -> IO () brush file = do flip finally (del file) $ do r <- tryJust enoents $ openFd file WriteOnly Nothing defaultFileFlags{ nonBlock = True } case r of Left _ -> return () Right fd -> do flip finally (closeFd fd) $ do fdWrite fd "\0" return () where del x = handleJust ioErrors (const $ return ()) $ removeFile x -- | @'write' si msg@ asynchronously writes @msg@ to @si@, where it will be -- received by all currently connected readers. write :: Sink -> String -> IO () write si str = atomically $ do writeTVar (zmsg si) (Just str) vs <- readTVar (ctrl si) mapM_ (flip writeTVar msg) vs where msg = Some str -- | Deallocates a sink. The destroyed sink must not be used again. -- -- (Implementation detail: This function closes and removes the socket from -- the file system. If you forget to call it, you'll leave a stale entry in -- @~\/.miniplex\/@, which will cause calls to 'create' with the same tag to -- fail.) destroy :: Sink -> IO () destroy si = do removeFile (name si) killThread (acct si) atomically $ do vs <- readTVar c mapM_ (flip writeTVar EOF) vs writeTVar c [] where c = ctrl si -- | Helper function to simplify resource handling. @'withSink' tag body@ -- creates a sink, calls @body@, then destroys the sink, even if @body@ -- throws an exception. withSink :: String -> (Sink -> IO a) -> IO a withSink t f = block $ do si <- create t unblock (f si) `finally` destroy si