----------------------------------------------------------------------------- -- | -- Module : Network.XMPP.Concurrent -- Copyright : (c) pierre, 2007 -- License : BSD-style (see the file libraries/base/LICENSE) -- -- Maintainer : k.pierre.k@gmail.com -- Stability : experimental -- Portability : portable -- -- Concurrent actions over single IO channel -- ----------------------------------------------------------------------------- module Network.XMPP.Concurrent ( Thread , XmppThreadT , runThreaded , readChanS , writeChanS , withNewThread , loop , waitFor ) where import Network.XMPP.Stream import Network.XMPP.Stanza import Network.XMPP.Types import Control.Concurrent import Control.Concurrent.STM import Control.Monad.State import Control.Monad.Reader import Network.XMPP.Utils import System.IO data Thread = Thread { inCh :: TChan Stanza , outCh :: TChan Stanza } type XmppThreadT a = ReaderT Thread IO a -- Two streams: input and output. Threads read from input stream and write to output stream. -- | Runs thread in XmppState monad runThreaded :: XmppThreadT () -> XmppStateT () runThreaded a = do in' <- liftIO $ atomically $ newTChan out' <- liftIO $ atomically $ newTChan liftIO $ forkIO $ runReaderT a (Thread in' out') s <- get liftIO $ forkIO $ loopWrite s out' liftIO $ forkIO $ connPersist (handle s) loopRead in' where loopRead in' = loop $ do st <- parseM liftIO $ atomically $ writeTChan in' st loopWrite s out' = do withNewStream $ do put s loop $ do st <- liftIO $ atomically $ readTChan out' outStanza st return () loop = sequence_ . repeat readChanS :: XmppThreadT Stanza readChanS = do c <- asks inCh st <- liftIO $ atomically $ readTChan c return st writeChanS :: Stanza -> XmppThreadT () writeChanS a = do c <- asks outCh st <- liftIO $ atomically $ writeTChan c a return () -- | Runs specified action in parallel withNewThread :: XmppThreadT () -> XmppThreadT ThreadId withNewThread a = do in' <- asks inCh out' <- asks outCh newin <- liftIO $ atomically $ dupTChan in' liftIO $ forkIO $ runReaderT a (Thread newin out') -- | Turns action into infinite loop loop :: XmppThreadT () -> XmppThreadT () loop a = do a loop a waitFor :: (Stanza -> Bool) -> XmppThreadT Stanza waitFor f = do s <- readChanS if (f s) then return s else do waitFor f connPersist :: Handle -> IO () connPersist h = do hPutStr h " " debugIO "" threadDelay 30000000 connPersist h