{-# OPTIONS_HADDOCK hide #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE ScopedTypeVariables #-}

module Network.Xmpp.Concurrent.Threads where

import           Control.Applicative((<$>))
import           Control.Concurrent
import           Control.Concurrent.STM
import qualified Control.Exception.Lifted as Ex
import           Control.Monad
import           Control.Monad.Except
import qualified Data.ByteString as BS
import           GHC.IO (unsafeUnmask)
import           Network.Xmpp.Concurrent.Types
import           Network.Xmpp.Stream
import           Network.Xmpp.Types
import           System.Log.Logger

-- | Worker that reads elements from the stream and hands each one to the
-- supplied callback.
--
-- The worker loops forever. Every iteration runs with asynchronous
-- exceptions masked and performs these steps:
--
-- 1. Wait (via STM 'retry') until the stream stored in the state reference
--    is neither @Closed@ nor @Finished@.
--
-- 2. Pull one element from that stream. On a read failure the error is
--    logged, the stream is closed and the connection-closed callback is
--    invoked with the failure; on success the element is passed to the
--    element callback.
--
-- An 'Interrupt' exception received during either step is handled by waiting
-- until the 'TMVar' carried by the interrupt (and by any further interrupts
-- that arrive in the meantime) can be taken; the iteration then ends without
-- delivering an element.
readWorker :: (XmppElement -> IO ()) -- ^ called with every element read
           -> (XmppFailure -> IO ()) -- ^ called when reading fails
           -> TMVar Stream           -- ^ the stream to read from
           -> IO a
readWorker onElement onCClosed stateRef = forever . Ex.mask_ $ do
    s' <- Ex.catches
              ( atomically $ do
                    s@(Stream con) <- readTMVar stateRef
                    scs <- streamConnectionState <$> readTMVar con
                    -- Block until the connection state changes to something
                    -- we can read from.
                    when (stateIsClosed scs) retry
                    return $ Just s
              )
              [ Ex.Handler $ \(Interrupt t) -> do
                    handleInterrupts [t]
                    return Nothing
              ]
    case s' of
        Nothing -> return () -- Interrupted while waiting; start over.
        Just s -> do
            res <- Ex.catches
                       ( do
                             -- We don't know whether pull will necessarily
                             -- be interruptible, so give pending interrupts
                             -- a chance to be delivered first.
                             allowInterrupt
                             pulled <- pullXmppElement s
                             case pulled of
                                 Left e -> do
                                     errorM "Pontarius.Xmpp" $ "Read error: "
                                         ++ show e
                                     _ <- closeStreams s
                                     onCClosed e
                                     return Nothing
                                 Right r -> return $ Just r
                       )
                       [ Ex.Handler $ \(Interrupt t) -> do
                             handleInterrupts [t]
                             return Nothing
                       ]
            case res of
                Nothing -> return () -- Read failed or was interrupted;
                                     -- nothing to deliver.
                Just sta -> onElement sta
  where
    -- Defining a Control.Exception.allowInterrupt equivalent for GHC 7
    -- compatibility.
    allowInterrupt :: IO ()
    allowInterrupt = unsafeUnmask $ return ()
    -- While waiting for the first semaphore(s) to flip we might receive
    -- another interrupt. When that happens we add its semaphore to the list
    -- and retry waiting. The transaction takes all semaphores atomically, so
    -- an interrupted attempt leaves every one of them untouched.
    handleInterrupts :: [TMVar ()] -> IO ()
    handleInterrupts ts =
        Ex.catch (atomically $ mapM_ takeTMVar ts)
                 (\(Interrupt t) -> handleInterrupts (t : ts))
    -- A stream that is 'Closed' or 'Finished' cannot be read from.
    stateIsClosed :: ConnectionState -> Bool
    stateIsClosed Closed   = True
    stateIsClosed Finished = True
    stateIsClosed _        = False

-- | Starts the worker threads for a stream: a keep-alive thread (see
-- 'connPersist') and a reader thread (see 'readWorker').
--
-- The result, as far as this function is concerned, is always a 'Right'
-- holding
--
-- * an action that disables further writes and kills both threads,
--
-- * the 'TMVar' that now holds the stream, and
--
-- * the 'ThreadId' of the reader thread.
--
-- When the reader reports a failure, writes are disabled and the
-- connection-closed handler from the event handlers is run in a new thread.
startThreadsWith :: TMVar (BS.ByteString -> IO (Either XmppFailure ()))
                 -- ^ write lock holding the function used to send bytes
                 -> (XmppElement -> IO ())
                 -- ^ called by the reader for every element received
                 -> TMVar EventHandlers
                 -> Stream
                 -> Maybe Int
                 -- ^ keep-alive interval in seconds; 'Nothing' disables it
                 -> IO (Either XmppFailure (IO (),
                                            TMVar Stream,
                                            ThreadId))
startThreadsWith writeSem stanzaHandler eh con keepAlive = do
    -- read' <- withStream' (gets $ streamSend . streamHandle) con
    -- writeSem <- newTMVarIO read'
    conS <- newTMVarIO con
    cp <- forkIO $ connPersist keepAlive writeSem
    let onConClosed failure = do
            stopWrites
            noCon eh failure
    rdw <- forkIO $ readWorker stanzaHandler onConClosed conS
    return $ Right ( killConnection [rdw, cp]
                   , conS
                   , rdw
                   )
  where
    -- Replace the write function with one that always fails with
    -- 'XmppNoStream'. Blocks while another thread holds the write lock.
    stopWrites :: IO ()
    stopWrites = atomically $ do
        _ <- takeTMVar writeSem
        putTMVar writeSem $ \_ -> return $ Left XmppNoStream
    -- Disable writes first, then kill the given threads.
    killConnection threads = liftIO $ do
        debugM "Pontarius.Xmpp" "killing connection"
        stopWrites
        debugM "Pontarius.Xmpp" "killing threads"
        forM_ threads killThread
    -- Call the connection closed handlers in a separate thread, so the caller
    -- is not blocked by them.
    noCon :: TMVar EventHandlers -> XmppFailure -> IO ()
    noCon h e = do
        hands <- atomically $ readTMVar h
        void . forkIO $ connectionClosedHandler hands e

-- | Keep-alive loop: sends a single blank space, then sleeps for the given
-- number of seconds, forever. With 'Nothing' it returns immediately and no
-- keep-alive is sent.
--
-- For every send the write lock is acquired by taking the write function out
-- of the 'TMVar', and released by putting the same function back. The release
-- is guaranteed by 'Ex.bracket', so the lock is restored even when the write
-- function throws or this thread is killed while it is sending; otherwise the
-- 'TMVar' would stay empty and every other thread waiting on it would block
-- forever. An exception from the write function still propagates and ends
-- the loop.
connPersist :: Maybe Int -> TMVar (BS.ByteString -> IO a) -> IO ()
connPersist Nothing _ = return ()
connPersist (Just delay) sem = forever $ do
    Ex.bracket (atomically $ takeTMVar sem)
               (atomically . putTMVar sem)
               (\pushBS -> void $ pushBS " ")
    -- threadDelay takes microseconds.
    threadDelay (delay * 1000000)