-- GSoC 2013 - Communicating with mobile devices.

{-# LANGUAGE OverloadedStrings , ScopedTypeVariables #-}

-- | This Module define the main function to send Push Notifications through Cloud Connection Server (GCM).
module Network.PushNotify.Ccs.Send
    ( startCCS
    , closeCCS
    , sendCCS
    , withCCS
    , withCCS'
    ) where

import Network.PushNotify.Ccs.Constants
import Network.PushNotify.Ccs.Types
import Network.PushNotify.Gcm.Types

import Control.Applicative
import Control.Concurrent
import Control.Concurrent.Async
import Control.Concurrent.Chan
import Control.Concurrent.STM.TChan
import Control.Monad.Error
import Control.Monad.STM
import Control.Monad
import Control.Retry
import Data.Aeson
import Data.Aeson.Types
import Data.Aeson.Parser
import Data.Default
import Data.Functor
import Data.IORef
import Data.Int
import Data.List
import Data.Text
import Data.Text.Encoding
import Data.Monoid                          ((<>))
import Data.XML.Types
import qualified Data.Attoparsec.ByteString as AB
import qualified Control.Exception          as CE
import qualified Data.HashMap.Strict        as HM
import qualified Data.HashSet               as HS
import qualified Data.ByteString.Lazy       as BL
import qualified Data.ByteString            as BS
import qualified Data.Text.Encoding         as E
import Network
import Network.Xmpp
import Network.Xmpp.Internal
import Network.TLS
import Network.TLS.Extra
import Crypto.Random.API
import qualified Crypto.Random.AESCtr
import GHC.IO.Handle
import System.Log.Logger

-- 'connectCCS' starts a secure connection with CCS servers.
connectCCS :: GCMCcsConfig -> IO Session
connectCCS config = do
    let getStreamHandle = lift $ do
            hdl <- connectTo (unpack cCCS_URL) (PortNumber (fromIntegral cCCS_PORT))
            let bck = Backend { backendFlush = hFlush hdl
                              , backendClose = hClose hdl
                              , backendSend  = BS.hPut hdl
                              , backendRecv  = BS.hGet hdl
            gen <- Crypto.Random.AESCtr.makeSystem
            ctx <- contextNew
                       (defaultParamsClient { pCiphers = ciphersuite_medium })
            handshake ctx
            return StreamHandle { streamSend    = \bs -> CE.catch
                                                    (sendData ctx (BL.fromChunks [bs]) >> return (Right ()))
                                                    (\(_e :: CE.SomeException) -> return $ Left XmppOtherFailure)
                                , streamReceive = \bs -> CE.catch
                                                    (recvData ctx >>= return . Right )
                                                    (\(_e :: CE.SomeException) -> return $ Left XmppOtherFailure)
                                , streamFlush   = contextFlush ctx
                                , streamClose   = bye ctx }
    result <- session
                (unpack cCCS_URL)
                (Just ( \_ -> [plain (senderID config <> "@" <> cCCS_URL) Nothing (aPiKey config) ] , Nothing))
                def{ sessionStreamConfiguration = def{
                         connectionDetails = UseConnection getStreamHandle }
    case result of
        Right s -> return s
        Left e  -> fail $ "XmppFailure: " ++ (show e)

-- | 'startCCS' starts the CCS service, which means starting a worker thread which maintains a connection with CCS servers.
startCCS :: GCMCcsConfig                 -- ^ The main configuration for the GCM service.
         -> (RegId -> Value -> IO ())    -- ^ A callback function to be called each time a message arrives from a device.
         -> IO CCSManager
startCCS config newMessageCallbackFunction = do
        c      <- newTChanIO
        ref    <- newIORef $ Just ()
        tID    <- forkIO $ CE.catch (ccsWorker config c newMessageCallbackFunction)
                                    (\(e :: CE.SomeException) -> atomicModifyIORef ref (\_ -> (Nothing,())) )
        return $ CCSManager ref c tID

-- Main worker thread.
ccsWorker :: GCMCcsConfig -> TChan (Chan GCMresult , MVar (Chan ()), GCMmessage) -> (RegId -> Value -> IO ()) -> IO ()
ccsWorker config requestChan callBackF = do
        sess      <- recoverAll (ccsRetrySettings config) $ connectCCS config
        cont      <- newIORef 1000
        hmap      <- newIORef HM.empty
        lock      <- newEmptyMVar
        locki     <- newMVar ()
        errorChan <- newChan -- new Error Channel.
        s         <- async (sender 1 cont lock locki hmap requestChan errorChan sess)
        r         <- async (receiver cont lock hmap sess)
        res       <- waitEither s r
        case res of
            Left  _ -> do
                         cancel r
                         writeChan errorChan ()
            Right v -> do
                         takeMVar locki
                         cancel s
                         writeChan errorChan ()
        ccsWorker config requestChan callBackF -- restarts.

            buildMessage :: Value -> Message
            buildMessage value = Message{
                                    messageID      = Nothing
                                  , messageFrom    = Nothing
                                  , messageTo      = Nothing
                                  , messageLangTag = Nothing
                                  , messageType    = Normal
                                  , messagePayload = [ Element (Name "gcm" (Just "google:mobile:data") Nothing) []
                                                       [(NodeContent $ ContentText $ E.decodeUtf8 $
                                                                       BS.concat . BL.toChunks $ encode value)]

            buildAck :: Text -> Text -> Message
            buildAck regId id = buildMessage $ object [cTo .= regId , cMessageId .= id , cMessageType .= cAck]

            sender  :: Int32
                    -> IORef Int
                    -> MVar ()
                    -> MVar ()
                    -> IORef (HM.HashMap Text (Chan GCMresult,RegId))
                    -> TChan (Chan GCMresult , MVar (Chan ()), GCMmessage)
                    -> Chan ()
                    -> Session
                    -> IO Int
            sender n cont lock locki hmap requestChan errorChan sess = do -- this function reads the channel and sends the messages.

                    atomically $ peekTChan requestChan
                    -- Now there is at least one element in the channel, so the next readTChan won't block.
                    takeMVar locki

                    (chanRes,varErr,msg) <- atomically $ readTChan requestChan
                    echan <- dupChan errorChan
                    putMVar varErr echan -- Here, notifies that it is attending this request,
                                         -- and provides a duplicated error channel.
                    putMVar locki ()

                    m <- loopSend (HS.toList $ registration_ids msg) msg chanRes sess hmap cont lock n

                    sender m cont lock locki hmap requestChan errorChan sess

            loopSend []     _   _       _    _    _    _    n = return n
            loopSend (x:xs) msg chanRes sess hmap cont lock n = do
                    checkCounter cont lock

                    let id     = pack $ show n
                        value  = fromGCMtoCCS x id msg

                    atomicModifyIORef hmap (\hashMap -> (HM.insert id (chanRes,x) hashMap,()))

                    sendMessage (buildMessage value) sess

                    loopSend xs msg chanRes sess hmap cont lock (n+1)

            checkCounter :: IORef Int -> MVar () -> IO ()
            checkCounter cont lock = do
                    newC <- atomicModifyIORef cont (\c -> (c-1,c-1))
                    if newC == 0
                      then do
                             race (threadDelay 5000000 >> fail "Timeout") (takeMVar lock) -- blocks
                             return ()
                      else return ()

            controlPars :: Value -> Parser (Text,Text,Text,Maybe Text)
            controlPars (Object v) = (,,,) <$>
                         v .: cMessageId   <*>
                         v .: cFrom        <*>
                         v .: cMessageType <*>
                         v .:? cError
            controlPars _          = mzero

            msgPars :: Value -> Parser (Value,Text,Text)
            msgPars (Object v) = (,,) <$>
                      v .: cData      <*>
                      v .: cMessageId <*>
                      v .: cFrom
            msgPars _          = mzero

            receiver :: IORef Int
                     -> MVar ()
                     -> IORef (HM.HashMap Text (Chan GCMresult,RegId))
                     -> Session
                     -> IO Int
            receiver cont lock hmap sess = do
                    msg <- getMessage sess
                    let [Element _ _ ([NodeContent (ContentText p)])] = messagePayload msg
                        value = case AB.maybeResult $ AB.parse json $ encodeUtf8 p of
                                    Nothing -> object []
                                    Just v  -> v

                    case parseMaybe controlPars value of
                        Nothing         -> case parseMaybe msgPars value of
                                             Nothing       -> return () -- No expected msg.
                                             Just (v,id,f) -> do -- This is a message from device so I send the Ack response to CCS server
                                                                 -- and start the callback function.
                                                                sendMessage (buildAck f id) sess
                                                                forkIO $ callBackF f v
                                                                return ()
                        Just (id,f,t,e) -> do -- This is an ACK/NACK message so I look for the entry of this message
                                              -- in the hashmap and I put the response in the proper channel.

                                             oldC <- atomicModifyIORef cont (\c -> (c+1,c))

                                             if oldC == 0
                                               then putMVar lock () -- unblock the sender thread
                                               else return ()

                                             hashMap <- readIORef hmap

                                             case HM.lookup id hashMap of
                                               Just (chanRes,regId) -> do
                                                 let result = getRes t e regId
                                                 writeChan chanRes result
                                               Nothing -> return ()

                                             atomicModifyIORef hmap (\hashMap -> (HM.delete id hashMap,()))

                    receiver cont lock hmap sess
                      getRes t e regId
                       | t == cAck                      = def{success = Just 1}
                       | e == Just cBadRegistration     = def{failure = Just 1 , errorRest         = HM.singleton regId cBadRegistration}
                       | e == Just cDeviceUnregistered  = def{failure = Just 1 , errorUnRegistered = HS.singleton regId }
                       | e == Just cInternalServerError = def{failure = Just 1 , errorRest         = HM.singleton regId cInternalServerError }
                       | e == Just cServiceUnAvailable  = def{failure = Just 1 , errorToReSend     = HS.singleton regId }
                       | otherwise                      = def{failure = Just 1 , errorToReSend     = HS.singleton regId } -- no expected msg

-- | 'closeCCS' stops the CCS service.
-- This means stopping the worker thread which maintains a connection with CCS servers.
closeCCS :: CCSManager -> IO ()
closeCCS m = do
               atomicModifyIORef (mState m) (\_ -> (Nothing,()))
               killThread $ mWorkerID m

-- | 'sendCCS' sends messages to a CCS Server.
-- Every time you call this function, it will put the notification in a channel waiting to be proceesed by the worker thread.
-- It will block until the worker thread receives a response from CCS server.
sendCCS :: CCSManager -> GCMmessage -> IO GCMresult
sendCCS man msg = do
    s <- readIORef $ mState man
    case s of
      Nothing -> fail "CCS Service closed."
      Just () -> do
          let requestChan = mCcsChannel man
          chanRes   <- newChan
          varErr    <- newEmptyMVar
          atomically $ writeTChan requestChan (chanRes,varErr,msg)
          errorChan <- takeMVar varErr
          v         <- race (readChan errorChan) (loopResponse chanRes)
          case v of
              Left _  -> return def{ failure = Just (HS.size $ registration_ids msg)
                                   , errorToReSend = (registration_ids msg)} -- Error while sending.
              Right r -> return r -- Successful.
             loopResponse chan = Data.List.foldr (\_ m -> do
                                                           r   <- readChan chan
                                                           res <- m
                                                           return $ r <> res)
                                                (return def)
                                                (HS.toList $ registration_ids msg)

-- | 'withCCS' creates a new manager, uses it in the provided function, and then releases it.
-- (The second argument is a callback function to be called each time a message arrives from a device).
withCCS :: GCMCcsConfig -> (RegId -> Value -> IO ()) -> (CCSManager -> IO a) -> IO a
withCCS confg callback fun = CE.bracket (startCCS confg callback) closeCCS fun

-- | 'withCCS'' creates a new manager, uses it in the provided function, and then releases it 
-- (ignores messages that arrive from a device).
withCCS' :: GCMCcsConfig -> (CCSManager -> IO a) -> IO a
withCCS' confg fun = withCCS confg (\_ _ -> return ()) fun