{-# OPTIONS_HADDOCK hide #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE ScopedTypeVariables #-}
module Network.Xmpp.Concurrent.Threads where
import Control.Applicative((<$>))
import Control.Concurrent
import Control.Concurrent.STM
import qualified Control.Exception.Lifted as Ex
import Control.Monad
import Control.Monad.Except
import qualified Data.ByteString as BS
import GHC.IO (unsafeUnmask)
import Network.Xmpp.Concurrent.Types
import Network.Xmpp.Stream
import Network.Xmpp.Types
import System.Log.Logger
-- | Worker loop that repeatedly pulls one element from the stream held in
-- @stateRef@ and passes it to @onElement@.
--
-- Each iteration runs under 'Ex.mask_'. It first waits (via STM 'retry')
-- until the stream's connection state is neither 'Closed' nor 'Finished',
-- then pulls a single element. When pulling fails, the failure is logged,
-- the stream is closed and @onCClosed@ is invoked with the failure.
--
-- An 'Interrupt' exception caught in either phase makes the worker wait on
-- the 'TMVar' carried by the interrupt and then start the next iteration.
-- The function never returns normally.
readWorker :: (XmppElement -> IO ())
           -> (XmppFailure -> IO ())
           -> TMVar Stream
           -> IO a
readWorker onElement onCClosed stateRef = forever . Ex.mask_ $ do
    s' <- Ex.catches waitForOpenStream [interruptHandler]
    case s' of
        Nothing -> return ()
        Just s -> do
            res <- Ex.catches (pullOne s) [interruptHandler]
            -- 'Nothing' means a read error or an interrupt was already
            -- dealt with; there is nothing to dispatch in that case.
            maybe (return ()) onElement res
  where
    -- Block until the stream's connection state is not closed, then yield
    -- the stream.
    waitForOpenStream :: IO (Maybe Stream)
    waitForOpenStream = atomically $ do
        s@(Stream con) <- readTMVar stateRef
        scs <- streamConnectionState <$> readTMVar con
        when (stateIsClosed scs) retry
        return $ Just s

    -- Pull a single element from the stream. On failure: log, close the
    -- stream and notify @onCClosed@.
    pullOne :: Stream -> IO (Maybe XmppElement)
    pullOne s = do
        allowInterrupt
        pulled <- pullXmppElement s
        case pulled of
            Left e -> do
                errorM "Pontarius.Xmpp" $ "Read error: " ++ show e
                _ <- closeStreams s
                onCClosed e
                return Nothing
            Right r -> return $ Just r

    -- Shared handler for both phases: wait for the interrupt's semaphore
    -- (and any that arrive meanwhile), then report that nothing was
    -- produced.
    interruptHandler :: Ex.Handler IO (Maybe b)
    interruptHandler = Ex.Handler $ \(Interrupt t) -> do
        void $ handleInterrupts [t]
        return Nothing

    -- Runs a no-op under 'unsafeUnmask'; since the loop body is wrapped in
    -- 'Ex.mask_', this is the point at which pending asynchronous
    -- exceptions can be delivered before the read starts.
    allowInterrupt :: IO ()
    allowInterrupt = unsafeUnmask $ return ()

    -- Take every semaphore in the list in one transaction. If another
    -- 'Interrupt' arrives while waiting, its semaphore is added to the
    -- list and the wait is restarted.
    handleInterrupts :: [TMVar ()] -> IO [()]
    handleInterrupts ts =
        Ex.catch (atomically $ forM ts takeTMVar)
                 (\(Interrupt t) -> handleInterrupts (t : ts))

    -- Connection states in which no read should be attempted.
    stateIsClosed :: ConnectionState -> Bool
    stateIsClosed Closed   = True
    stateIsClosed Finished = True
    stateIsClosed _        = False
-- | Fork the keep-alive thread ('connPersist') and the reader thread
-- ('readWorker') for the given stream.
--
-- Arguments, in order: the write-lock 'TMVar' holding the current send
-- function, the handler invoked for every element read, the event
-- handlers, the stream itself, and the optional keep-alive interval that
-- is passed on to 'connPersist'.
--
-- Always returns 'Right' with a triple of: an action that disables writes
-- and kills both forked threads, the 'TMVar' wrapping the stream, and the
-- reader thread's 'ThreadId'.
startThreadsWith :: TMVar (BS.ByteString -> IO (Either XmppFailure ()))
                 -> (XmppElement -> IO ())
                 -> TMVar EventHandlers
                 -> Stream
                 -> Maybe Int
                 -> IO (Either XmppFailure (IO (),
                                            TMVar Stream,
                                            ThreadId))
startThreadsWith writeSem stanzaHandler eh con keepAlive = do
    conS <- newTMVarIO con
    cp <- forkIO $ connPersist keepAlive writeSem
    -- Invoked by the reader when pulling from the stream fails.
    let onConClosed :: XmppFailure -> IO ()
        onConClosed failure = do
            stopWrites
            noCon eh failure
    rdw <- forkIO $ readWorker stanzaHandler onConClosed conS
    return $ Right ( killConnection [rdw, cp]
                   , conS
                   , rdw
                   )
  where
    -- Replace the send function with one that always reports
    -- 'XmppNoStream', so later writes fail instead of touching the stream.
    stopWrites :: IO ()
    stopWrites = atomically $ do
        _ <- takeTMVar writeSem
        putTMVar writeSem $ \_ -> return $ Left XmppNoStream

    -- Disable writes first, then kill the given threads in order.
    killConnection :: [ThreadId] -> IO ()
    killConnection threads = do
        debugM "Pontarius.Xmpp" "killing connection"
        stopWrites
        debugM "Pontarius.Xmpp" "killing threads"
        mapM_ killThread threads

    -- Run the connection-closed handler in a thread of its own, so the
    -- caller is not blocked by whatever the handler does.
    noCon :: TMVar EventHandlers -> XmppFailure -> IO ()
    noCon h e = do
        hands <- atomically $ readTMVar h
        void . forkIO $ connectionClosedHandler hands e
-- | Keep-alive loop. With @Just delay@ it runs forever: take the send
-- function out of the write-lock 'TMVar', push a single space through it,
-- put the function back, then sleep for @delay@ seconds ('threadDelay'
-- takes microseconds, hence the multiplication). With 'Nothing' it returns
-- immediately.
--
-- The take/put pair is wrapped in 'Ex.bracket' so that the send function
-- is put back even when the push throws or the thread is killed while it
-- holds the lock. Otherwise the 'TMVar' would stay empty and every later
-- 'takeTMVar' on it would block.
connPersist :: Maybe Int -> TMVar (BS.ByteString -> IO a) -> IO ()
connPersist (Just delay) sem = forever $ do
    Ex.bracket (atomically $ takeTMVar sem)
               (atomically . putTMVar sem)
               (\pushBS -> void $ pushBS " ")
    threadDelay (delay * 1000000)
connPersist Nothing _ = return ()