module Network.Xmpp.Concurrent
( module Network.Xmpp.Concurrent.Monad
, module Network.Xmpp.Concurrent.Threads
, module Network.Xmpp.Concurrent.Basic
, module Network.Xmpp.Concurrent.Types
, module Network.Xmpp.Concurrent.Message
, module Network.Xmpp.Concurrent.Presence
, module Network.Xmpp.Concurrent.IQ
, toChans
, newSession
, writeWorker
, session
) where
import Network.Xmpp.Concurrent.Monad
import Network.Xmpp.Concurrent.Threads
import Control.Applicative((<$>),(<*>))
import Control.Concurrent
import Control.Concurrent.STM
import Control.Monad
import qualified Data.ByteString as BS
import Data.IORef
import qualified Data.Map as Map
import Data.Maybe (fromMaybe)
import Data.XML.Types
import Network.Xmpp.Concurrent.Basic
import Network.Xmpp.Concurrent.IQ
import Network.Xmpp.Concurrent.Message
import Network.Xmpp.Concurrent.Presence
import Network.Xmpp.Concurrent.Types
import Network.Xmpp.Concurrent.Threads
import Network.Xmpp.Marshal
import Network.Xmpp.Types
import Network
import Data.Text as Text
import Network.Xmpp.Tls
import qualified Network.TLS as TLS
import Network.Xmpp.Sasl
import Network.Xmpp.Sasl.Mechanisms
import Network.Xmpp.Sasl.Types
import Data.Maybe
import Network.Xmpp.Stream
import Network.Xmpp.Utilities
import Control.Monad.Error
import Data.Default
import System.Log.Logger
import Control.Monad.State.Strict
toChans :: TChan Stanza
-> TChan Stanza
-> TVar IQHandlers
-> Stanza
-> IO ()
toChans stanzaC outC iqHands sta = atomically $ do
writeTChan stanzaC sta
case sta of
IQRequestS i -> handleIQRequest iqHands i
IQResultS i -> handleIQResponse iqHands (Right i)
IQErrorS i -> handleIQResponse iqHands (Left i)
_ -> return ()
where
handleIQRequest :: TVar IQHandlers -> IQRequest -> STM ()
handleIQRequest handlers iq = do
(byNS, _) <- readTVar handlers
let iqNS = fromMaybe "" (nameNamespace . elementName $ iqRequestPayload iq)
case Map.lookup (iqRequestType iq, iqNS) byNS of
Nothing -> writeTChan outC $ serviceUnavailable iq
Just ch -> do
sent <- newTVar False
writeTChan ch $ IQRequestTicket sent iq
serviceUnavailable (IQRequest iqid from _to lang _tp bd) =
IQErrorS $ IQError iqid Nothing from lang err (Just bd)
err = StanzaError Cancel ServiceUnavailable Nothing Nothing
handleIQResponse :: TVar IQHandlers -> Either IQError IQResult -> STM ()
handleIQResponse handlers iq = do
(byNS, byID) <- readTVar handlers
case Map.updateLookupWithKey (\_ _ -> Nothing) (iqID iq) byID of
(Nothing, _) -> return ()
(Just tmvar, byID') -> do
let answer = either IQResponseError IQResponseResult iq
_ <- tryPutTMVar tmvar answer
writeTVar handlers (byNS, byID')
where
iqID (Left err) = iqErrorID err
iqID (Right iq') = iqResultID iq'
newSession :: Stream -> SessionConfiguration -> IO (Either XmppFailure Session)
newSession stream config = runErrorT $ do
outC <- lift newTChanIO
stanzaChan <- lift newTChanIO
iqHandlers <- lift $ newTVarIO (Map.empty, Map.empty)
eh <- lift $ newTVarIO $ EventHandlers { connectionClosedHandler = sessionClosedHandler config }
let stanzaHandler = toChans stanzaChan outC iqHandlers
(kill, wLock, streamState, readerThread) <- ErrorT $ startThreadsWith stanzaHandler eh stream
writer <- lift $ forkIO $ writeWorker outC wLock
return $ Session { stanzaCh = stanzaChan
, outCh = outC
, iqHandlers = iqHandlers
, writeRef = wLock
, readerThread = readerThread
, idGenerator = sessionStanzaIDs config
, streamRef = streamState
, eventHandlers = eh
, stopThreads = kill >> killThread writer
, conf = config
}
writeWorker :: TChan Stanza -> TMVar (BS.ByteString -> IO Bool) -> IO ()
writeWorker stCh writeR = forever $ do
(write, next) <- atomically $ (,) <$>
takeTMVar writeR <*>
readTChan stCh
r <- write $ renderElement (pickleElem xpStanza next)
atomically $ putTMVar writeR write
unless r $ do
atomically $ unGetTChan stCh next
threadDelay 250000
session :: HostName
-> SessionConfiguration
-> Maybe ([SaslHandler], Maybe Text)
-> IO (Either XmppFailure Session)
session realm config mbSasl = runErrorT $ do
stream <- ErrorT $ openStream realm (sessionStreamConfiguration config)
ErrorT $ tls stream
mbAuthError <- case mbSasl of
Nothing -> return Nothing
Just (handlers, resource) -> ErrorT $ auth handlers resource stream
case mbAuthError of
Nothing -> return ()
Just _ -> throwError XmppAuthFailure
ses <- ErrorT $ newSession stream config
return ses