module Network.Xmpp.Concurrent.Monad where
import Network.Xmpp.Types
import Control.Applicative((<$>))
import Control.Concurrent
import Control.Concurrent.STM
import Control.Concurrent.STM.TVar (TVar, readTVar, writeTVar)
import qualified Control.Exception.Lifted as Ex
import Control.Monad.IO.Class
import Control.Monad.Reader
import Control.Monad.State.Strict
import Data.IORef
import qualified Data.Map as Map
import Data.Text(Text)
import Network.Xmpp.Concurrent.Types
import Network.Xmpp.Monad
-- | Obtain the channel on which IQ request tickets for the given
-- request type and key text are delivered.
--
-- If no handler is registered for the @(tp, ns)@ pair yet, a fresh
-- channel is created, stored in the session's IQ handler table and
-- returned as 'Right'. If a handler already exists, the table is left
-- untouched and the existing channel is returned as 'Left'.
--
-- The lookup and the insertion happen in a single STM transaction, so
-- two concurrent callers can never both receive a 'Right' for the same
-- key.
listenIQChan :: IQRequestType -- ^ Request type half of the handler key
             -> Text          -- ^ Text half of the handler key (@ns@,
                              --   presumably the payload namespace)
             -> Session       -- ^ Session whose handler table is used
             -> IO (Either (TChan IQRequestTicket) (TChan IQRequestTicket))
listenIQChan tp ns session = atomically $ do
    (byNS, byID) <- readTVar handlers
    case Map.lookup key byNS of
        -- Already handled: hand back the registered channel unchanged.
        Just existing -> return (Left existing)
        -- Not handled yet: only now allocate and register a channel.
        Nothing -> do
            iqCh <- newTChan
            writeTVar handlers (Map.insert key iqCh byNS, byID)
            return (Right iqCh)
  where
    handlers = iqHandlers session
    key      = (tp, ns)
-- | Duplicate the session's stanza shadow channel ('sShadow') and
-- return the duplicate. Every call yields a new, independent channel.
getStanzaChan :: Session -> IO (TChan Stanza)
getStanzaChan = atomically . dupTChan . sShadow
-- | Return this session's message channel.
--
-- The channel is cached in 'messagesRef'. When the reference is empty,
-- a duplicate of the message shadow channel ('mShadow') is created,
-- stored in the reference and returned; otherwise the cached channel is
-- returned as is.
getMessageChan :: Session -> IO (TChan (Either MessageError Message))
getMessageChan session = readIORef cache >>= maybe subscribe return
  where
    cache = messagesRef session

    -- Create the duplicate and remember it for subsequent calls.
    subscribe = do
        fresh <- atomically $ dupTChan (mShadow session)
        writeIORef cache (Just fresh)
        return fresh
-- | Return this session's presence channel.
--
-- The channel is cached in 'presenceRef'. When the reference is empty,
-- a duplicate of the presence shadow channel ('pShadow') is created,
-- stored in the reference and returned; otherwise the cached channel is
-- returned as is.
getPresenceChan :: Session -> IO (TChan (Either PresenceError Presence))
getPresenceChan session = readIORef cache >>= maybe subscribe return
  where
    cache = presenceRef session

    -- Create the duplicate and remember it for subsequent calls.
    subscribe = do
        fresh <- atomically $ dupTChan (pShadow session)
        writeIORef cache (Just fresh)
        return fresh
-- | Forget the cached message channel by resetting 'messagesRef' to
-- 'Nothing'. The next 'getMessageChan' call will create a new one.
dropMessageChan :: Session -> IO ()
dropMessageChan session = messagesRef session `writeIORef` Nothing
-- | Forget the cached presence channel by resetting 'presenceRef' to
-- 'Nothing'. The next 'getPresenceChan' call will create a new one.
dropPresenceChan :: Session -> IO ()
dropPresenceChan session = presenceRef session `writeIORef` Nothing
-- | Read the next item from the session's message channel (obtained
-- via 'getMessageChan'), blocking until one is available.
pullMessage :: Session -> IO (Either MessageError Message)
pullMessage session = getMessageChan session >>= atomically . readTChan
-- | Read the next item from the session's presence channel (obtained
-- via 'getPresenceChan'), blocking until one is available.
pullPresence :: Session -> IO (Either PresenceError Presence)
pullPresence session = getPresenceChan session >>= atomically . readTChan
-- | Queue a stanza for sending by writing it to the session's outgoing
-- channel ('outCh').
sendStanza :: Stanza -> Session -> IO ()
sendStanza stanza session = atomically (writeTChan outgoing stanza)
  where
    outgoing = outCh session
-- | Produce a copy of the session that has its own, initially empty,
-- message and presence channel caches. All other fields are shared with
-- the original session.
forkSession :: Session -> IO Session
forkSession session = do
    freshMessages <- newIORef Nothing
    freshPresence <- newIORef Nothing
    let forked = session { messagesRef = freshMessages
                         , presenceRef = freshPresence
                         }
    return forked
-- | Pull items from the message channel until one is accepted, and
-- return it. Message errors are tested with the first predicate,
-- regular messages with the second. Rejected items are consumed and
-- discarded.
filterMessages :: (MessageError -> Bool) -- ^ Accept this message error?
               -> (Message -> Bool)      -- ^ Accept this message?
               -> Session
               -> IO (Either MessageError Message)
filterMessages f g session = loop
  where
    accepted = either f g

    loop = do
        received <- pullMessage session
        if accepted received
            then return received
            else loop
-- | Pull items from the message channel until a regular message
-- satisfying the predicate arrives, and return it. Message errors and
-- non-matching messages are consumed and discarded.
waitForMessage :: (Message -> Bool) -> Session -> IO Message
waitForMessage f session = go
  where
    go = do
        next <- pullMessage session
        case next of
            Right msg | f msg -> return msg
            _                 -> go
-- | Pull items from the message channel until a message error
-- satisfying the predicate arrives, and return it. Regular messages and
-- non-matching errors are consumed and discarded.
waitForMessageError :: (MessageError -> Bool) -> Session -> IO MessageError
waitForMessageError f session = go
  where
    go = do
        next <- pullMessage session
        case next of
            Left err | f err -> return err
            _                -> go
-- | Pull items from the presence channel until a presence satisfying
-- the predicate arrives, and return it. Presence errors and
-- non-matching presences are consumed and discarded.
waitForPresence :: (Presence -> Bool) -> Session -> IO Presence
waitForPresence f session = go
  where
    go = do
        next <- pullPresence session
        case next of
            Right pres | f pres -> return pres
            _                   -> go
-- | Run an 'XmppConMonad' action with exclusive access to the
-- connection state of the session.
--
-- The whole operation runs with asynchronous exceptions masked. It
--
--   1. throws an 'Interrupt' carrying a fresh, empty 'TMVar' to the
--      reader thread (presumably the reader pauses until that 'TMVar'
--      is filled - confirm against the reader implementation),
--   2. takes both 'writeRef' and 'conStateRef', then fills the 'TMVar',
--   3. runs the action on the taken connection state, and
--   4. puts the push function of the resulting state ('sConPushBS')
--      back into 'writeRef' and the resulting state into 'conStateRef'.
--
-- A 'StreamError' raised by the action is returned as 'Left'. Any other
-- exception causes 'xmppKillConnection' to be run on the state taken in
-- step 2 and is then rethrown. In both failure cases the two 'TMVar's
-- are not refilled by this function.
withConnection :: XmppConMonad a -> Session -> IO (Either StreamError a)
withConnection a session = do
    resume <- newEmptyTMVarIO
    Ex.mask_ $ do
        throwTo (readerThread session) (Interrupt resume)
        st <- acquireLocks resume `Ex.catch` releaseAndRethrow resume
        runLocked st `Ex.catches`
            [ Ex.Handler $ \e -> return $ Left (e :: StreamError)
            , Ex.Handler $ \e -> do
                  _ <- runStateT xmppKillConnection st
                  Ex.throwIO (e :: Ex.SomeException)
            ]
  where
    -- Take the write function and the connection state in one
    -- transaction, then fill the TMVar handed to the reader thread.
    acquireLocks resume = atomically $ do
        _ <- takeTMVar (writeRef session)
        st <- takeTMVar (conStateRef session)
        putTMVar resume ()
        return st

    -- Taking the locks failed: still fill the reader's TMVar, then
    -- propagate the exception.
    releaseAndRethrow resume e = do
        atomically (putTMVar resume ())
        Ex.throwIO (e :: Ex.SomeException)

    -- Run the action and publish the resulting state and push function.
    runLocked st = do
        (res, st') <- runStateT a st
        atomically $ do
            putTMVar (writeRef session) (sConPushBS st')
            putTMVar (conStateRef session) st'
            return $ Right res
-- | Wrap a presence in 'PresenceS' and queue it with 'sendStanza'.
sendPresence :: Presence -> Session -> IO ()
sendPresence = sendStanza . PresenceS
-- | Wrap a message in 'MessageS' and queue it with 'sendStanza'.
sendMessage :: Message -> Session -> IO ()
sendMessage = sendStanza . MessageS
-- | Atomically replace the session's event handlers with the result of
-- applying the given function to the current handlers.
--
-- The new value is forced to weak head normal form before it is
-- written, so repeated modifications do not pile up a chain of
-- unevaluated updates inside the 'TVar'.
modifyHandlers :: (EventHandlers -> EventHandlers) -> Session -> IO ()
modifyHandlers f session = atomically $ do
    current <- readTVar handlersVar
    writeTVar handlersVar $! f current
  where
    handlersVar = eventHandlers session
-- | Install a new 'connectionClosedHandler' in the session's event
-- handlers. When invoked with a 'StreamError', the installed handler
-- calls the supplied function with that error and this session.
setConnectionClosedHandler :: (StreamError -> Session -> IO ()) -> Session -> IO ()
setConnectionClosedHandler eh session = modifyHandlers install session
  where
    install handlers =
        handlers { connectionClosedHandler = flip eh session }
-- | Read the session's current event handlers and run the given action
-- on them.
runHandler :: (EventHandlers -> IO a) -> Session -> IO a
runHandler h session = atomically (readTVar handlersVar) >>= h
  where
    handlersVar = eventHandlers session
-- | End the session: run 'xmppKillConnection' through 'withConnection'
-- (its result is ignored), then call 'stopThreads'.
endSession :: Session -> IO ()
endSession session =
    withConnection xmppKillConnection session >> stopThreads session
-- | Close the stream: send the closing @\<\/stream:stream\>@ tag and
-- schedule the connection's close action.
--
-- The current write function is taken out of 'writeRef', used to send
-- the closing tag, and finally replaced by a function that ignores its
-- input and returns 'False'. The close action ('sCloseConnection' of
-- the current connection state) is run in a separate thread after a
-- three second delay; any exception it raises is discarded.
--
-- If anything fails between taking the write function and scheduling
-- the close action, the original write function is put back before the
-- exception propagates, so that other writers are not left blocked on
-- an empty 'writeRef'.
closeConnection :: Session -> IO ()
closeConnection session = Ex.mask_ $ do
    send <- atomically $ takeTMVar (writeRef session)
    let restoreWriter = atomically $ putTMVar (writeRef session) send
    flip Ex.onException restoreWriter $ do
        cc <- sCloseConnection <$> atomically (readTMVar (conStateRef session))
        _ <- send "</stream:stream>"
        void . forkIO $ do
            -- 3,000,000 microseconds: wait three seconds before closing.
            threadDelay 3000000
            -- Best effort: a failing close action is deliberately ignored.
            void (Ex.try cc :: IO (Either Ex.SomeException ()))
    atomically $ putTMVar (writeRef session) (\_ -> return False)