-----------------------------------------------------------------------------
-- |
-- Module      :  Network.XMPP.Concurrent
-- Copyright   :  (c) pierre, 2007
-- License     :  BSD-style (see the file libraries/base/LICENSE)
-- 
-- Maintainer  :  k.pierre.k@gmail.com
-- Stability   :  experimental
-- Portability :  portable
--
-- Concurrent actions over single IO channel
--
-----------------------------------------------------------------------------
module Network.XMPP.Concurrent
  ( Thread
  , XmppThreadT
  , runThreaded
  , readChanS
  , writeChanS
  , withNewThread
  , loop
  , waitFor
  ) where

import Network.XMPP.Stream
import Network.XMPP.Stanza
import Network.XMPP.Types
import Control.Concurrent
import Control.Concurrent.STM
import Control.Monad.State 
import Control.Monad.Reader 
import Network.XMPP.Utils

import System.IO
    
data Thread = Thread { inCh :: TChan Stanza
                     , outCh :: TChan Stanza
                     }

type XmppThreadT a = ReaderT Thread IO a
       
-- Two streams: input and output. Threads read from input stream and write to output stream.
-- | Runs thread in XmppState monad
runThreaded :: XmppThreadT () -> XmppStateT ()
runThreaded a = do
  in' <- liftIO $ atomically $ newTChan
  out' <- liftIO $ atomically $ newTChan
  liftIO $ forkIO $ runReaderT a (Thread in' out')
  s <- get
  liftIO $ forkIO $ loopWrite s out'
  liftIO $ forkIO $ connPersist (handle s)
  loopRead in' 
    where 
      loopRead in' = loop $ 
          do              
            st <- parseM
            liftIO $ atomically $ writeTChan in' st
      loopWrite s out' =
          do
            withNewStream $ do
              put s
              loop $ do
                st <- liftIO $ atomically $ readTChan out'
                outStanza st
            return ()                   
      loop = sequence_ . repeat
       
readChanS :: XmppThreadT Stanza
readChanS = do
  c <- asks inCh 
  st <- liftIO $ atomically $ readTChan c
  return st  

writeChanS :: Stanza -> XmppThreadT ()
writeChanS a = do
  c <- asks outCh
  st <- liftIO $ atomically $ writeTChan c a 
  return ()

-- | Runs specified action in parallel
withNewThread :: XmppThreadT () -> XmppThreadT ThreadId
withNewThread a = do
  in' <- asks inCh
  out' <- asks outCh
  newin <- liftIO $ atomically $ dupTChan in'
  liftIO $ forkIO $ runReaderT a (Thread newin out')

-- | Turns action into infinite loop
loop :: XmppThreadT () -> XmppThreadT ()
loop a = do
  a
  loop a

waitFor :: (Stanza -> Bool) -> XmppThreadT Stanza
waitFor f = do
  s <- readChanS
  if (f s) then 
    return s
    else do
      waitFor f

connPersist :: Handle -> IO ()
connPersist h = do
  hPutStr h " "
  debugIO "<space added>"
  threadDelay 30000000
  connPersist h