-- Distributed Haskell: ports library -- -- Author : Manuel M. T. Chakravarty -- Created: 20 March 2000 -- -- Version $Revision: 1.15 $ from $Date: 2003/06/23 13:48:13 $ -- -- Copyright (c) [2000..2003] Manuel M. T. Chakravarty -- -- This file is free software; you can redistribute it and/or modify -- it under the terms of the GNU General Public License as published by -- the Free Software Foundation; either version 2 of the License, or -- (at your option) any later version. -- -- This file is distributed in the hope that it will be useful, -- but WITHOUT ANY WARRANTY; without even the implied warranty of -- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -- GNU General Public License for more details. -- --- DESCRIPTION --------------------------------------------------------------- -- -- Provides ports in the spirit of -- -- Distributed Haskell: Goffin on the Internet. Manuel M. T. Chakravarty, -- Yike Guo, and Martin Köhler. In M. Sato and Y. Toyama, editors, -- Proceedings of the Third Fuji International Symposium on Functional and -- Logic Programming, World Scientific Publishers, pp 80-97, 1998. -- -- plus, by now, a considerable amount of extras. -- --- DOCU ---------------------------------------------------------------------- -- -- language: Haskell 98 + Concurrency + Weak pointers -- -- Structure of Ports -- ~~~~~~~~~~~~~~~~~~ -- A port consists of two components: -- (1) the port data and -- (2) the port filter. -- -- The port filter is a function that censors any new port value (and may -- alter the value or discard it altogether). The port data consists of the -- current port value plus the hole in the port stream that has to be -- instantiated next; furthermore, contains a port notifier, which is invoked -- on every new value put into the out stream of the port. The port data is -- stored in an `MVar' to allow port updates and perform these updates -- atomically. The later is achieved by taking the port data out of the -- `MVar' when a critical section in the update code is reached and putting -- it back at the end of the critical section. -- -- As soon as a port is no dead (and this is spotted by the storage manager), -- it is automagically closed. This is sometimes essential and sometimes -- just convenient, but care has to be taken, as space leaks can lead to -- deadlock. -- -- Port Streams -- ~~~~~~~~~~~~ -- Port streams are represented as lists with a hole, ie, lists terminated by -- a single assignment variable, which is updated to extended the stream. -- This is a standard technique from logic programming. -- -- Proxy Ports -- ~~~~~~~~~~~ -- A port may serve as a proxy for a stateful object, such as a GUI widget. -- In this case, the proxy mirrors the (externally visible) state of the -- object and takes care to avoid feedback loops between the port and the -- object. Moreover, port notifiers can be useful in other cases where a -- synchronous notification of changes of a port value are required (eg, when -- port values are dumped to stdout and interleaving with other output should -- be minimised). -- -- Internal port notifier -- ~~~~~~~~~~~~~~~~~~~~~~ -- Each port has an internal port notifier that is run on the filtered input -- messages. The notifier can be modified (extended) over the lifetime of a -- port. As notifications are only send out on filtered messages, the -- message will effectively be filtered twice for each linked port (and it -- won't arrive at any linked port at all if it is already discarded by the -- filter of the port receiving the original message). Internally, messages -- are tagged as either coming from the outside, from a linked port, or from -- an object for which the current port is a proxy. These tags are used to -- prevent feedback loops. -- -- Note that a port is proxy to at most one object; thus, there is also at -- most one notifier for such an object. -- -- Concurrency Control -- ~~~~~~~~~~~~~~~~~~~ -- `Concurrent.yield' is important to ensure that listeners get a chance soon -- after a port is updated (even if the routine that did the update goes into -- non-preemptive non-Haskell land). -- -- All port primitives avoid to block indefinitely. They use blocking -- operations only to realise mutual exclusion to critical sections in which -- no operation remains for longer periods of time. However, they may block -- of course when any of the supplied arguments that needs to be touched is -- not yet available. -- -- Port primitives do not spawn asynchronous operations to complete their -- task - ie, when a port primtive returns it's operation has been fully -- executed. In particular, the primitive will not access the port anymore -- after it returned, and so, will not be affected if the port is closed. -- --- TODO ---------------------------------------------------------------------- -- -- * BUG: Port linking doesn't work properly for more than two ports. When a -- port is linked to a port that is already linked to a third port, -- the newly linked port has to be registered in the notifiers of -- *both* ports. -- -- * Problem with finalisers: There can be a deadlock if all threads wait for -- finalisers to run (so that unused ports get closed); forcing a GC in -- this situation helps, but IMO this should be done automagically by the -- RTS. [Should be ok from GHC 5.02 onwards.] -- -- * All critical sections should probably be protected against exceptions. -- The port internal MVar has to be put back even if an exception occurs -- during the critical section (eg, an async exc); otherwise, the system -- may deadlock. -- module Control.Concurrent.Ports ( PortFilter, PortNotifier, Port, withPorts, newPort, listenToNewPort, newFilteringPort, newProxyPort, openPort, closePort, isClosedPort, waitUntilClosedPort, keepAlivePort, setThreadInfo, waitForThread, waitForPort, waitForPortThread, closeAndWaitForPort, listenToPort, (<--), (<==), (<-$), chainPorts, linkPorts, (<->), peekIntoPort, -- thread routines -- keepAliveForkIO, ) where import Prelude hiding (catch) import Maybe import Monad (unless, when) import System (getArgs) import Control.Concurrent (ThreadId, myThreadId, MVar, newMVar, newEmptyMVar, takeMVar, putMVar, readMVar, swapMVar, isEmptyMVar, forkIO, yield, threadDelay) import Control.Exception (throw, throwTo, catch, finally) import System.Mem (performGC) -- only temporary import System.IO.Unsafe (unsafePerformIO) import Foreign.StablePtr (newStablePtr, freeStablePtr) -- only temporary import System.Mem.Weak (addFinalizer) import Control.Concurrent.PortsConfig (debug) import Control.Concurrent.SVars (V, newV, valV, (<<)) infix 3 <--, <==, <-$ -- ports data structures -- --------------------- -- port filter are used to screen messages before they are allowed into the -- port (EXPORTED) -- -- * messages may be rejected (by returning `Nothing') -- -- * and may be altered (by returning the altered value) -- -- * the first argument is the current value of the port and the second is the -- value of the current message -- type PortFilter a = a -> a -> Maybe a -- port notifier (EXPORTED) -- -- * an action either notifying a port of a new message or notifying some -- other stateful object (eg, a widget) of a new port value -- type PortNotifier a = a -> IO () -- we have three different kinds of messages -- data MsgKind = ExternalMsg -- from an external source (normal send) | BuddyMsg -- from a buddy, ie, linked port | ProxyMsg -- from an object for which we are a proxy deriving (Eq) -- data specifying the current state of a port -- type PortData a = (a, -- current port value V [a], -- current hole in the stream MsgKind -> PortNotifier a, -- internal update notification Maybe ThreadInfo, -- associated thread info Int) -- number of open input edges -- it's Maybe because we currently still allow ports that -- are not associated with threads type ThreadInfo = (ThreadId, MVar ()) -- ports structure (EXPORTED ABSTRACTLY) -- -- * the `MVar' is empty while a port operation is in a critical section -- -- * the `MVar' will contain `Nothing' after the port was closed -- data Port a = Port (MVar (Maybe (PortData a))) -- updatable port data (PortFilter a) -- screens incoming messages -- list of synchronisation points that have to be honoured before terminating -- terminationSyncPoints :: MVar [MVar ()] {-# NOINLINE terminationSyncPoints #-} terminationSyncPoints = unsafePerformIO $ newMVar [] -- ports initilisation -- ------------------- -- initialise ports environment -- -- * lists remotely callable entry points -- -- * this function will not terminate unless all `keep alive' ports are closed -- withPorts :: [(String, IO ())] -> ([String] -> IO ()) -> IO () withPorts rep main = do unless (null rep) $ putStrLn "Warning: Distribution not supported yet!" args <- getArgs main args takeMVar terminationSyncPoints >>= mapM_ takeMVar -- wait for sync points -- FIXME: This is subtly wrong! -- If after the `terminationSyncPoints' have been taken, any other -- thread calls `keepAlivePort', that thread will block on the -- empty `terminationSyncPoints'. -- ports core functionality -- ------------------------ -- create a new port (EXPORTED) -- -- * the argument is the intial value used in case an application of a -- state transformer is performed before the first message is sent; it, -- however, never appears in any output stream -- -- * the port is automagically closed when it is garbage collected (and not -- closed yet) -- newPort :: a -> IO (Port a) newPort x = newFilteringPort x (const Just) -- trivial filter -- create a new port and obtain its message stream (EXPORTED) -- listenToNewPort :: a -> IO (Port a, [a]) listenToNewPort x = do p <- newPort x ms <- listenToPort p return (p, ms) -- create a new port that has a port filter (EXPORTED) -- -- * the argument is the intial value used in case an application of a -- state transformer is performed before the first message is sent; it, -- however, never appears in any output stream -- -- * the port is automagically closed when it is garbage collected (and not -- closed yet) -- -- * CONSTRAINT: The port filter must not include blocking operations. It is -- evaluated in a critical section that manipulates the internal -- representation of ports. If it blocks, deadlocks may occur. -- newFilteringPort :: a -> PortFilter a -> IO (Port a) newFilteringPort x f = setupPort x f (const nop) -- create a new proxy port (EXPORTED) -- -- * the port notifier passed as an argument is used to update the `real' -- object about a change of the port value -- -- * in contrast, the port notifier returned as a result must be used to -- notify the proxy of a change of the state of the `real' object -- -- * it is ensured that no feedback loop is generated -- -- * the port is automagically closed when it is garbage collected (and not -- closed yet) -- newProxyPort :: a -> PortNotifier a -> IO (Port a, PortNotifier a) newProxyPort x pn = do let pn' msgType = if (msgType /= ProxyMsg) -- don't feedback proxy msgs then pn else nop p <- setupPort x (const Just) pn' return $ (p, insertMessage p ProxyMsg) -- |Open given port another time to allow for repeated closing. -- --FIXME: Should this better pass out a new port value? Then, closing that new -- port handle while not closing the whole port should invalidate that new -- handle. (Would probably, at least, be nicer for debugging.) openPort :: Port a -> IO () openPort (Port mv _) = do onDebugLogEmpty mv "openPort blocks" portData <- takeMVar mv case portData of Nothing -> do putStr "Warning: Ports.openPort: Attempted \ \to open a closed port!\n" putMVar mv Nothing Just (x, hole, pn, t, refs) -> putMVar mv $ Just (x, hole, pn, t, refs + 1) -- |Close a port. -- -- * If the port has been opened multiple times, it can be closed the -- corresponding number of times before it will be irrevocably shut, whereby -- creation of a port counts as one opening. -- -- * Closing an already closed port is a no-op. -- closePort :: Port a -> IO () closePort (Port mv _) = do onDebugLogEmpty mv "closePort blocks" portData <- takeMVar mv case portData of Nothing -> do logDebug $ "Ports.closePort: port already closed" putMVar mv Nothing Just (_, hole, _ , _, 1 ) -> do logDebug $ "Ports.closePort: port now closed" putMVar mv Nothing hole << [] -- close the stream yield Just (x, hole, pn, t, refs) -> do logDebug $ "Ports.closePort: port ref count at " ++ (show (refs - 1)) putMVar mv $ Just (x, hole, pn, t, refs - 1) -- |Check whether a given port is closed. -- -- * ATTENTION: This routine is a potential source of race conditions. Use -- with care. If this routine returns `False', the state of the port may -- have changed by the time the return value is inspected. -- isClosedPort :: Port a -> IO Bool isClosedPort (Port mv _) = do portData <- readMVar mv return (isNothing portData) -- suspend until the given port is closed (EXPORTED) -- -- FIXME: remove this in favour of `waitForPort' below waitUntilClosedPort :: Port a -> IO () waitUntilClosedPort = waitForPort -- make sure the program does not terminate before the given port is closed -- (EXPORTED) -- keepAlivePort :: Port a -> IO () keepAlivePort p = do sp <- newEmptyMVar sps <- takeMVar terminationSyncPoints putMVar terminationSyncPoints (sp:sps) addFinalizer p $ putMVar sp () -- Attaches a stream to a port that collects its messages (EXPORTED) -- -- * the stream starts to collect the messages from the moment it is attached -- - ie, it does not see old messages -- listenToPort :: Port a -> IO [a] listenToPort (Port mv _) = do portData <- readMVar mv case portData of Just (_, hole, _, _, _) -> return (valV hole) Nothing -> do putStr "Warning: Ports.listenToPort: Attempted \ \to listen to a closed port!\n" return [] -- send a message to a port (EXPORTED) -- (<--) :: Port a -> a -> IO () p@(Port mv f) <-- m = insertMessage p ExternalMsg m -- send a list of messages to a port (EXPORTED) -- -- * the current thread *will* block if the evaluation of the second -- argument blocks (eg, if it is a stream) -- -- * there is no guarantee that the messages are not interleaved with -- concurrently sent messages -- (<==) :: Port a -> [a] -> IO () p <== ms = do mapM_ (p <--) ms return () -- send a state transformer to a port (EXPORTED) -- -- * it will be applied to the message most recently seen on the port; the -- result is send as a message to the port -- -- * CONSTRAINT: The state transformer must not include blocking operations. -- It is evaluated in a critical section that manipulates the -- internal representation of ports. If it blocks, deadlocks -- may occur. -- (<-$) :: Port a -> (a -> a) -> IO () -- -- Note that the seemingly equivalent peeking into the port, applying the -- state transformer, and then, sending the result to the port is a much -- weaker operation. The following code ensures that if the transformed state -- makes it into the port, it will be the value right after the argument -- passed to the state transformer - the variant using `peekIntoPort' cannot -- give this guarantee. -- (Port mv f) <-$ g = do onDebugLogEmpty mv "(<-$) blocks" portData <- takeMVar mv case portData of Nothing -> do putMVar mv Nothing putStr "Warning: (Ports.<-$): Attempted \ \to transform a closed port!\n" Just state@(x, hole, pn, t, refs) -> -- current end of stream case f x (g x) of Nothing -> putMVar mv $ Just state -- no change Just m' -> do newHole <- newV -- new end of stream hole << m' : (valV newHole) -- extend stream putMVar mv $ Just (m', newHole, pn, t, refs) -- remember new end pn ExternalMsg m' -- invoke notifier yield -- any runnable threads? -- chain two ports (EXPORTED) -- -- * the output of one port is sent to the second after being filtered through -- a translation function -- chainPorts :: (a -> b) -> Port a -> Port b -> IO () chainPorts aToB pa pb = do stream <- listenToPort pa pb <== map aToB stream -- link two ports with translation functions (EXPORTED) -- -- * any message to one port will - after translation - also be send to the -- other one -- linkPorts :: (a -> b) -> (b -> a) -> Port a -> Port b -> IO () linkPorts aToB bToA pa pb = do addNotifier pa (insertMessage pb BuddyMsg . aToB) addNotifier pb (insertMessage pa BuddyMsg . bToA) -- link two ports (EXPORTED) -- -- * any message to one port will also be send to the other one -- (<->) :: Port a -> Port a -> IO () (<->) = linkPorts id id -- observe the momentary current value of the port (EXPORTED) -- -- * returns `Nothing' if the port is closed -- peekIntoPort :: Port a -> IO (Maybe a) peekIntoPort (Port mv f) = do portData <- readMVar mv case portData of Nothing -> return Nothing Just (x, _, _, _, _) -> return $ Just x -- thread routines -- --------------- -- make sure the program does not terminate before the current thread -- terminates (EXPORTED) -- keepAliveForkIO :: IO () -> IO ThreadId keepAliveForkIO m = do sp <- newEmptyMVar sps <- takeMVar terminationSyncPoints putMVar terminationSyncPoints (sp:sps) forkIO $ m `finally` putMVar sp () -- |Associate a thread with a port -- -- * Sets the thread id and a synchronisation variable that will be set by the -- thread when it terminates -- setThreadInfo :: Port a -> ThreadId -> MVar () -> IO () setThreadInfo (Port mv _) t sv = do onDebugLogEmpty mv "setThreadInfo blocks" portData <- takeMVar mv case portData of Just state@(x, hole, pn, _, refs) -> -- current end of stream putMVar mv $ Just (x, hole, pn, Just (t, sv), refs) Nothing -> do putMVar mv Nothing putStr "Warning: Ports.setThreadPort: Attempted to associate\ \a thread with a closed port!\n" -- |Synchronise on the synchronisation variable in a port's thread info, which -- is set as soon as the thread terminates. -- -- * If a thread is associate with more than one port, waiting for any or all -- ports will have the same effect -- --FIXME: should we also wait until the port is closed? (if the thread --traverses the whole port stream stream, it won't terminate until the port is --closed anyway) waitForThread :: Port a -> IO () waitForThread (Port mv _) = do portData <- readMVar mv case portData of Nothing -> return () Just (_, _, _, ti, _) -> case ti of Nothing -> return () Just (_, sv) -> swapMVar sv () -- |Synchronise on a port being closed. -- --FIXME: It would be more efficient if a port would keep a list of threads to --notify when a port is being closed. waitForPort :: Port a -> IO () waitForPort (Port mv _) = do portData <- readMVar mv case portData of Nothing -> return () Just (_, hole, _, _, _) -> walk (valV hole) where walk [] = return () walk (_:s) = walk s -- |Wait until both the port is closed and the corresponding thread has -- terminated. -- waitForPortThread :: Port a -> IO () waitForPortThread p = waitForPort p >> waitForThread p -- |Ensure that the port is closed before waiting for the associated thread to -- terminate. -- closeAndWaitForPort :: Port a -> IO () closeAndWaitForPort p = closePort p >> waitForThread p -- auxiliary routines -- ------------------ -- no notifier -- nop :: PortNotifier a nop _ = return () -- Creates a port structure -- -- * the argument is the intial value used in case an application of a -- state transformer is performed before the first message is sent; it, -- however, never appears in any output stream -- -- * the port is automagically closed when it is garbage collected -- -- * CONSTRAINT: The port filter must not include blocking operations. It is -- evaluated in a critical section that manipulates the internal -- representation of ports. If it blocks, deadlocks may occur. -- setupPort :: a -> PortFilter a -> (MsgKind -> PortNotifier a) -> IO (Port a) setupPort x f pn = do hole <- newV mv <- newMVar $ Just (x, hole, pn, Nothing, 1) let p = Port mv f -- FIXME: Kludge until RTS is fixed! -- (makes it explicit to GC that mv can still be written) stableMV <- newStablePtr mv -- End of Kludge addFinalizer p (ensurePortIsClosed p stableMV) -- FIXME: Evil Kludge until RTS is fixed! ensureRegularGC -- End of Evil Kludge return p where ensurePortIsClosed p stableMV = do closePort p -- FIXME: Kludge until RTS is fixed! freeStablePtr stableMV -- End of Kludge -- FIXME: Evil Kludge until RTS is fixed! sentinel :: MVar Bool {-# NOINLINE sentinel #-} sentinel = unsafePerformIO $ newMVar False -- GC thread started? ensureRegularGC = do started <- swapMVar sentinel True unless started $ do forkIODebug "GC trigger thread" $ let gc = threadDelay 100000 >> performGC >> gc in gc return () -- End of Evil Kludge -- pass a message to a port ignoring the buddy list when the first argument is -- `false' -- insertMessage :: Port a -> MsgKind -> a -> IO () insertMessage (Port mv f) msgKind m = do onDebugLogEmpty mv "insertMessage blocks" portData <- takeMVar mv case portData of Nothing -> do putMVar mv Nothing putStr warnMsg Just state@(x, hole, pn, t, refs) -> -- current end of stream do case f x m of Nothing -> putMVar mv $ Just state -- no change Just m' -> do newHole <- newV -- create new end of stream hole << m' : (valV newHole) -- extend stream putMVar mv $ Just (m', newHole, pn, t, refs) -- store new end of stream pn msgKind m' -- invoke notifier yield -- any runnable threads? where warnMsg = "Warning: (Ports.insertMessage): Attempted \ \to write to a closed port!\n" -- add a notifier to the buddy list -- addNotifier :: Port a -> PortNotifier a -> IO () addNotifier (Port mv f) pn2 = do onDebugLogEmpty mv "addNotifier blocks" portData <- takeMVar mv case portData of Nothing -> do putMVar mv Nothing putStr warnMsg Just (x, hole, pn1, t, refs) -> do let pn' msgType x = do pn1 msgType x when (msgType /= BuddyMsg) $ pn2 x putMVar mv $ Just (x, hole, pn', t, refs) where warnMsg = "Warning: (Ports.addNotifier): Attempted \ \to modify a closed port!\n" -- auxilliary routines for debugging -- --------------------------------- -- FIXME: these routines are duplicated in `Processes'; maybe introduce a -- `PortsDebug' module -- a variant of `forkIO' that installs an exception sentry if compiled with -- debugging enabled -- forkIODebug :: String -> IO () -> IO ThreadId forkIODebug desc com = forkIO $ excSentry desc com -- report exceptions caught by the given computation -- -- * only in effect when compiled with debugging enabled -- excSentry :: String -- thread descriptor -> IO () -- computation to watch -> IO () {-# INLINE excSentry #-} excSentry desc com | not debug = com | otherwise = com `catch` logExc where logExc exc = do putStrLn $ "Processes: Thread \"" ++ desc ++ "\": " ++ show exc throw exc -- execute only if debugging on -- onDebug :: IO () -> IO () {-# INLINE onDebug #-} onDebug com | debug = com | otherwise = return () -- emit debugging message -- logDebug :: String -> IO () {-# INLINE logDebug #-} logDebug msg = onDebug $ putStrLn msg -- if debugging switched on, write a log message if the `MVar' is empty -- onDebugLogEmpty :: MVar a -> String -> IO () onDebugLogEmpty mv msg = onDebug $ do isEmpty <- isEmptyMVar mv when isEmpty $ putStrLn $ msg ++ ": empty `MVar'"