module Network.Xmpp.Concurrent.Threads where
import Network.Xmpp.Types
import Control.Applicative((<$>))
import Control.Concurrent
import Control.Concurrent.STM
import qualified Control.Exception.Lifted as Ex
import Control.Monad
import Control.Monad.IO.Class
import Control.Monad.State.Strict
import qualified Data.ByteString as BS
import Network.Xmpp.Concurrent.Types
import Network.Xmpp.Stream
import Control.Concurrent.STM.TMVar
import GHC.IO (unsafeUnmask)
import Control.Monad.Error
import System.Log.Logger
-- | Reader loop. Runs forever: waits until the stream stored in @stateRef@
-- is not in the 'Closed' state, pulls one stanza from it and hands the stanza
-- to @onStanza@.
--
-- The whole loop runs with asynchronous exceptions masked ('Ex.mask_');
-- 'allowInterrupt' opens a brief window right before each read in which
-- pending asynchronous exceptions can be delivered.
--
-- * An 'Interrupt' exception pauses the loop until the 'TMVar' it carries
--   (and those of any further 'Interrupt's received while waiting) can be
--   taken.
-- * An 'XmppFailure' thrown as an exception is passed to
--   @onConnectionClosed@ and logged.
-- * An 'XmppFailure' /returned/ by 'pullStanza' (a @Left@ result) is logged;
--   the loop then simply continues.
readWorker :: (Stanza -> IO ())        -- ^ called for every stanza read
           -> (XmppFailure -> IO ())   -- ^ called when a read throws 'XmppFailure'
           -> TMVar Stream             -- ^ holds the stream to read from
           -> IO a
readWorker onStanza onConnectionClosed stateRef =
    Ex.mask_ . forever $ do
        res <- Ex.catches
                   ( do
                       -- Block (STM 'retry') for as long as the connection
                       -- state is 'Closed'.
                       s <- atomically $ do
                           stream@(Stream con) <- readTMVar stateRef
                           state <- streamConnectionState <$> readTMVar con
                           when (state == Closed)
                                retry
                           return stream
                       -- Give pending asynchronous exceptions a chance to be
                       -- delivered before starting the read.
                       allowInterrupt
                       Just <$> pullStanza s
                   )
                   [ Ex.Handler $ \(Interrupt t) -> do
                         void $ handleInterrupts [t]
                         return Nothing
                   , Ex.Handler $ \(e :: XmppFailure) -> do
                         onConnectionClosed e
                         errorM "Pontarius.Xmpp" $ "Read error: " ++ show e
                         return Nothing
                   ]
        case res of
            -- An exception was handled above; nothing left to do this round.
            Nothing -> return ()
            -- The read reported a failure as a value. Do not drop it
            -- silently: log it so the problem is visible.
            -- NOTE(review): @onConnectionClosed@ is deliberately not invoked
            -- here because it is unclear whether every returned failure means
            -- the connection is gone -- confirm against 'pullStanza'.
            Just (Left e) ->
                errorM "Pontarius.Xmpp" $ "Read error: " ++ show e
            Just (Right sta) -> onStanza sta
  where
    -- Momentarily unmask asynchronous exceptions so that a pending one can be
    -- raised at this point.
    allowInterrupt :: IO ()
    allowInterrupt = unsafeUnmask $ return ()

    -- Wait until every collected interrupt 'TMVar' can be taken. If another
    -- 'Interrupt' arrives while waiting, add its 'TMVar' to the set and start
    -- waiting again.
    handleInterrupts :: [TMVar ()] -> IO [()]
    handleInterrupts ts =
        Ex.catch (atomically $ forM ts takeTMVar)
                 (\(Interrupt t) -> handleInterrupts (t:ts))
-- | Start the worker threads for a stream.
--
-- First obtains the stream's raw send function through 'withStream''. If that
-- fails, the failure is returned as @Left@. Otherwise two threads are forked
-- ('connPersist' and 'readWorker') and the result is @Right@ of:
--
-- 1. an action that takes the write lock and then kills both threads,
-- 2. the write lock, a 'TMVar' holding the send function,
-- 3. a 'TMVar' holding the stream,
-- 4. the 'ThreadId' of the reader thread.
startThreadsWith :: (Stanza -> IO ())
                 -> TVar EventHandlers
                 -> Stream
                 -> IO (Either XmppFailure (IO (),
                                            TMVar (BS.ByteString -> IO Bool),
                                            TMVar Stream,
                                            ThreadId))
startThreadsWith stanzaHandler eh con = do
    sender <- withStream' (gets (Right . streamSend . streamHandle)) con
    either (return . Left) launch sender
  where
    -- Set up the shared variables and fork the two worker threads.
    launch pushBS = do
        writeLock <- newTMVarIO pushBS
        conS <- newTMVarIO con
        cp <- forkIO (connPersist writeLock)
        rd <- forkIO (readWorker stanzaHandler (noCon eh) conS)
        return (Right (killConnection writeLock [rd, cp], writeLock, conS, rd))

    -- Take the write lock (it is not put back) and then kill the given
    -- threads.
    killConnection writeLock threads = do
        _ <- atomically (takeTMVar writeLock)
        mapM_ killThread threads

    -- Run the currently registered connection-closed handler in a fresh
    -- thread.
    noCon :: TVar EventHandlers -> XmppFailure -> IO ()
    noCon h e =
        atomically (readTVar h) >>= \hands ->
            void (forkIO (connectionClosedHandler hands e))
-- | Periodically write a single space to the connection (presumably as a
-- keep-alive -- confirm against the protocol requirements).
--
-- Each round takes the send function out of @lock@, uses it to send @" "@,
-- puts it back and then sleeps for 'keepAliveInterval' microseconds. The
-- take/put pair is wrapped in 'Ex.bracket' so that the send function is
-- returned to @lock@ even when the send throws; otherwise the lock would
-- stay empty and every other user of it would block forever. The exception
-- itself is still propagated and ends this loop.
connPersist :: TMVar (BS.ByteString -> IO Bool) -> IO ()
connPersist lock = forever $ do
    _ <- Ex.bracket (atomically $ takeTMVar lock)
                    (atomically . putTMVar lock)
                    (\pushBS -> pushBS " ")
    threadDelay keepAliveInterval
  where
    -- Pause between two writes, in microseconds (30 seconds).
    keepAliveInterval :: Int
    keepAliveInterval = 30000000