-- | Channel-based helpers for handling XMPP stanzas from several threads.
--
-- 'runThreaded' starts a session whose worker action runs in 'XmppThreadT';
-- workers exchange stanzas through 'readChanS' and 'writeChanS', may fork
-- further workers with 'withNewThread', and can use 'loop' and 'waitFor' as
-- small control-flow helpers.
module Network.XMPP.Concurrent
( Thread
, XmppThreadT
, runThreaded
, readChanS
, writeChanS
, withNewThread
, loop
, waitFor
) where
import Network.XMPP.Stream
import Network.XMPP.Stanza
import Network.XMPP.Types
import Control.Concurrent
import Control.Concurrent.STM
import Control.Monad.State
import Control.Monad.Reader
import Network.XMPP.Utils
import System.IO
-- | The pair of stanza channels a worker thread operates on.
data Thread = Thread { inCh :: TChan Stanza
                       -- ^ Channel stanzas are read from (see 'readChanS');
                       -- 'runThreaded' writes every parsed stanza into it.
                     , outCh :: TChan Stanza
                       -- ^ Channel stanzas are written to (see 'writeChanS');
                       -- 'runThreaded' takes stanzas off it and passes them
                       -- to @outStanza@.
                     }
-- | Monad of worker-thread actions: 'IO' with read-only access to the
-- thread's 'Thread' channel pair.
type XmppThreadT a = ReaderT Thread IO a
-- | Run an XMPP session around the worker action @a@.
--
-- Two fresh channels are created (inbound and outbound) and three threads
-- are forked, in this order:
--
-- 1. @a@ itself, running against the new channel pair;
-- 2. a writer that takes stanzas off the outbound channel and sends each
--    one with @outStanza@, inside a new stream whose state is set to the
--    current one;
-- 3. 'connPersist' on the current stream's handle.
--
-- The calling thread then repeatedly parses a stanza with @parseM@ and
-- writes it to the inbound channel.  That loop has no exit condition, so
-- this action does not return normally.
runThreaded :: XmppThreadT () -> XmppStateT ()
runThreaded a = do
    inbound  <- liftIO (atomically newTChan)
    outbound <- liftIO (atomically newTChan)
    _ <- liftIO . forkIO $ runReaderT a (Thread inbound outbound)
    -- Capture the stream state once; both helper threads are derived from it.
    stream <- get
    _ <- liftIO . forkIO $ pumpOut stream outbound
    _ <- liftIO . forkIO $ connPersist (handle stream)
    pumpIn inbound
  where
    -- Parse stanzas forever, publishing each one on the inbound channel.
    pumpIn sink = spin (parseM >>= liftIO . atomically . writeTChan sink)

    -- Send every stanza queued on the outbound channel, forever.
    pumpOut stream source = do
        _ <- withNewStream $ do
            put stream
            spin (liftIO (atomically (readTChan source)) >>= outStanza)
        return ()

    -- Repeat an action without end.
    spin act = sequence_ (repeat act)
-- | Take the next stanza from this thread's inbound channel ('inCh'),
-- blocking until one is available.
readChanS :: XmppThreadT Stanza
readChanS = asks inCh >>= liftIO . atomically . readTChan
-- | Put a stanza on this thread's outbound channel ('outCh').
writeChanS :: Stanza -> XmppThreadT ()
writeChanS a =
    asks outCh >>= \sink -> liftIO (atomically (writeTChan sink a))
-- | Fork a new thread running @a@ and return its 'ThreadId'.
--
-- The new thread receives a duplicate (made with 'dupTChan') of the
-- caller's inbound channel and the caller's own outbound channel.
withNewThread :: XmppThreadT () -> XmppThreadT ThreadId
withNewThread a = do
    env <- ask
    childIn <- liftIO . atomically $ dupTChan (inCh env)
    liftIO . forkIO $ runReaderT a (Thread childIn (outCh env))
-- | Run the given action over and over.  There is no terminating case, so
-- the result is never produced by normal completion.
loop :: XmppThreadT () -> XmppThreadT ()
loop a = go
  where
    go = a >> go
-- | Read stanzas with 'readChanS' until one satisfies the predicate, and
-- return that stanza.
--
-- Stanzas that fail the predicate are consumed from the channel and are
-- not put back.
waitFor :: (Stanza -> Bool) -> XmppThreadT Stanza
waitFor f = readChanS >>= decide
  where
    decide s
      | f s       = return s
      | otherwise = waitFor f
-- | Keep-alive loop: write a single space to the handle, log it with
-- @debugIO@, wait 30 seconds, and repeat without end.
--
-- The handle is flushed after every write.  Without the flush, a block- or
-- line-buffered handle may hold the space in its buffer instead of sending
-- it, which defeats the purpose of a keep-alive.
connPersist :: Handle -> IO ()
connPersist h = do
    hPutStr h " "
    hFlush h
    debugIO "<space added>"
    threadDelay keepAliveMicros
    connPersist h
  where
    -- 'threadDelay' takes microseconds; this is 30 seconds.
    keepAliveMicros = 30000000