{-# LANGUAGE BangPatterns #-}
{-# LANGUAGE CPP #-}
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE DeriveDataTypeable #-}
{-# LANGUAGE DoAndIfThenElse #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TupleSections #-}

{-|
DMCC session handling.
-}

module DMCC.Session
  ( Session (..)
  , startSession
  , stopSession
  , defaultSessionOptions

  , DMCCError (..)

  , DMCCHandle (..)
  , sendRequestSync
  , sendRequestAsync
  )

where

import           DMCC.Prelude

import           Control.Arrow ()

import qualified Data.Map.Strict as Map
import qualified Data.Set as Set
import qualified Data.IntMap.Strict as IntMap
import           Data.Text as T (empty)
import           Data.Typeable ()

import           System.IO.Streams ( InputStream
                                   , OutputStream
                                   , ReadTooShortException
                                   , write
                                   )
import qualified System.IO.Streams.SSL as SSLStreams

import qualified Network.HTTP.Client as HTTP
import           Network.Socket hiding (connect)
import           OpenSSL
import qualified OpenSSL.Session as SSL

import           DMCC.Types
import           DMCC.XML.Request (Request)
import qualified DMCC.XML.Request as Rq
import           DMCC.XML.Response (Response)
import qualified DMCC.XML.Response as Rs
import qualified DMCC.XML.Raw as Raw

import {-# SOURCE #-} DMCC.Agent


-- | Everything needed to talk to the server over one connection: an
-- input stream, an output stream and (third element) a connection
-- close action.
type ConnectionData = (InputStream ByteString, OutputStream ByteString, IO ())


-- | Low-level DMCC API plumbing.
--
-- Note that the constructor is applied positionally in
-- 'startSession', so the field order matters.
data DMCCHandle = DMCCHandle
  { connection :: TMVar ConnectionData
  -- ^ AVAYA server socket streams and connection cleanup action.
  -- Empty while I/O streams to the server are not ready; request
  -- senders also take it for the duration of a write.
  , dmccSession :: TMVar (Text, Int)
  -- ^ DMCC session ID and duration. Empty while the DMCC session is
  -- not ready or is being recovered.
  , reconnect :: forall m
               . (MonadUnliftIO m, MonadLoggerIO m, MonadBaseControl IO m, MonadCatch m)
              => m ()
  -- ^ Reconnect to AVAYA server, changing socket streams, cleanup
  -- action and session.
  , pingThread :: ThreadId
  -- ^ Keep-alive thread which periodically resets the application
  -- session timer.
  , readThread :: ThreadId
  -- ^ DMCC response reader thread.
  , procThread :: ThreadId
  -- ^ Response handler/synchronous requests worker thread.
  , invokeId :: TVar Int
  -- ^ Request/response counter.
  , syncResponses :: TVar (IntMap.IntMap (TMVar (Maybe Response)))
  -- ^ Synchronous response storage, keyed by the invoke id of the
  -- request. A slot is filled with 'Nothing' when the request fails
  -- (write error or reconnection).
  , agentRequests :: TVar (IntMap.IntMap AgentId)
  -- ^ Keeps track of which request has been issued by what agent
  -- (if there's one) until its response arrives.
  , sessionOptions :: SessionOptions
  -- ^ Options the session was started with.
  }


-- | Library API session.
--
-- Note that the constructor is applied positionally in
-- 'startSession', so the field order matters.
data Session = Session
  { protocolVersion :: Text
  -- ^ Protocol version reported by the server in its positive
  -- response to the application session start request.
  , dmccHandle :: DMCCHandle
  -- ^ Low-level connection/session plumbing.
  , webHook :: Maybe (HTTP.Request, HTTP.Manager)
  -- ^ Web hook handler URL and manager.
  , agents :: TVar (Map.Map AgentId Agent)
  -- ^ Agents known to this session. The response processing thread
  -- uses this map to route events and request errors to agents.
  , agentLocks :: TVar (Set.Set AgentId)
  -- ^ Set of agent ids, initially empty. NOTE(review): presumably
  -- guards concurrent agent control/release; it is not used in this
  -- module -- confirm against DMCC.Agent.
  }


-- | Only the protocol version is shown; the remaining fields are
-- handles and mutable variables.
instance Show Session where
  show session = "Session{protocolVersion=" <> version <> "}"
    where
      version = unpack $ protocolVersion session


-- | Messages passed from the response reader thread to the response
-- processing thread.
data LoopEvent
  = DMCCRsp Response
  -- ^ A parsed DMCC response.
  | Timeout
  -- ^ Not produced or handled anywhere in this module (the
  -- processing thread ignores it).
  | ReadError
  -- ^ Not produced or handled anywhere in this module (the
  -- processing thread ignores it).
  deriving Show


-- | Errors thrown by this module.
--
-- 'ApplicationSessionFailed' is thrown when the server does not
-- answer an application session start request with a positive
-- response.
data DMCCError = ApplicationSessionFailed
                 deriving (Show, Typeable)


-- Allows 'DMCCError' to be thrown and caught as an exception.
instance Exception DMCCError


-- | Default session options.
--
-- The values are given positionally; their meaning is determined by
-- the field order of 'SessionOptions', which is declared outside of
-- this module (presumably in "DMCC.Types" -- check the declaration
-- before changing any of them).
defaultSessionOptions :: SessionOptions
defaultSessionOptions = SessionOptions 1 120 24 5


-- | Connect to an AES server over TLS, start a DMCC application
-- session and fork the worker threads (response reader, response
-- processor and session keep-alive).
--
-- Throws 'ApplicationSessionFailed' if the server refuses to start
-- an application session. Connection errors are retried according to
-- the session options and rethrown once the attempts are exhausted.
startSession :: (MonadUnliftIO m, MonadLoggerIO m, MonadBaseControl IO m, MonadCatch m)
             => (String, PortNumber)
             -- ^ Host and port of AES server.
             -> Maybe FilePath
             -- ^ AES CA certificates directory for TLS.
             -> Text
             -- ^ DMCC API user.
             -> Text
             -- ^ DMCC API password.
             -> Maybe String
             -- ^ Web hook URL.
             -> SessionOptions
             -> m Session
startSession (host, port) caDir user pass whUrl sopts = do
  syncResponses <- newTVarIO IntMap.empty
  agentRequests <- newTVarIO IntMap.empty
  invoke <- newTVarIO 0

  -- When this is empty, I/O streams to the server are not ready.
  conn <- newEmptyTMVarIO
  -- When this is empty, DMCC session is not ready or is being
  -- recovered.
  sess <- newEmptyTMVarIO

  let
    -- Connect to the server, produce I/O streams and a cleanup action
    connect :: (MonadUnliftIO m, MonadBase IO m, MonadLoggerIO m, MonadCatch m) => m ConnectionData
    connect = connect1 (connectionRetryAttempts sopts)
      where
        -- Log the failure, then either retry after a delay or give
        -- up and rethrow once no attempts are left.
        connectExHandler
          :: (Exception e, Show e, MonadUnliftIO m, MonadLoggerIO m, MonadBase IO m, MonadCatch m)
          => Int -> e -> m ConnectionData
        connectExHandler attempts e = do
          logErrorN $ "Connection failed: " <> tshow e
          if attempts > 0
             then threadDelay (connectionRetryDelay sopts * 1000000) >> connect1 (attempts - 1)
             else throwIO e
        connect1 attempts =
          handleNetwork (connectExHandler attempts) $
          liftIO $ withOpenSSL $ do
              sslCtx <- SSL.context
              SSL.contextSetDefaultCiphers sslCtx
              SSL.contextSetVerificationMode sslCtx $
                SSL.VerifyPeer True True Nothing
              maybe (pure ()) (SSL.contextSetCADirectory sslCtx) caDir
              (is, os, ssl) <- SSLStreams.connect sslCtx host port
              let cl = do
                    SSL.shutdown ssl SSL.Unidirectional
                    maybe (pure ()) close $ SSL.sslSocket ssl
              pure (is, os, cl)

    -- Start new DMCC session
    startDMCCSession :: (MonadUnliftIO m, MonadLoggerIO m, MonadBaseControl IO m, MonadCatch m)
                     => Maybe Text
                     -- ^ Previous session ID (we attempt to recover
                     -- when this is given).
                     -> m ((Text, Int), Text)
    startDMCCSession old = do
      let
        -- Session setup requests bypass the "session is ready" check
        -- of 'sendRequestSync' and are not tied to any agent.
        sendReq =
          sendRequestSyncRaw
          conn
          reconnect
          invoke
          syncResponses
          Nothing
        startReq sid =
          sendReq
          Rq.StartApplicationSession
          { applicationId = ""
          , requestedProtocolVersion = Rq.DMCC_6_2
          , userName = user
          , password = pass
          , sessionCleanupDelay = sessionDuration sopts
          , sessionID = fromMaybe T.empty sid
          , requestedSessionDuration = sessionDuration sopts
          }
        -- Start a session monitor to enable TransferMonitorObjects
        -- feature
        sessionMonitorReq proto =
          sendReq
          Rq.MonitorStart
          { acceptedProtocol = proto
          , monitorRq = Rq.Session
          }
      startRsp <- startReq old
      case (startRsp, old) of
        (Just Rs.StartApplicationSessionPosResponse{..}, _) -> do
          _ <- sessionMonitorReq actualProtocolVersion
          pure ((sessionID, actualSessionDuration), actualProtocolVersion)
        (Just Rs.StartApplicationSessionNegResponse, Just oldID) -> do
          -- The old session has expired, start from scratch
          startRsp' <- startReq Nothing
          case startRsp' of
            Just Rs.StartApplicationSessionPosResponse{..} -> do
              _ <- sessionMonitorReq actualProtocolVersion
              -- Transfer MonitorObjects from old session
              sendRequestAsyncRaw
                conn
                reconnect
                invoke
                Nothing
                Rq.TransferMonitorObjects
                { fromSessionID = oldID
                , toSessionID = sessionID
                , acceptedProtocol = actualProtocolVersion
                }
              pure ((sessionID, actualSessionDuration), actualProtocolVersion)
            _ -> throwIO ApplicationSessionFailed
        _ -> throwIO ApplicationSessionFailed

    -- Restart I/O and DMCC session. This routine returns when new I/O
    -- streams become available (starting DMCC session requires
    -- response reader thread to be functional).
    reconnect :: (MonadUnliftIO m, MonadBaseControl IO m, MonadLoggerIO m, MonadCatch m) => m ()
    reconnect = do
      logWarnN "Attempting reconnection"
      -- Only one reconnection at a time
      (oldId, cl) <- atomically $ do
        (oldId, _) <- takeTMVar sess
        (_, _, cl) <- takeTMVar conn
        pure (oldId, cl)
      -- Fail all pending synchronous requests
      atomically $ do
        srs <- readTVar syncResponses
        mapM_ (`putTMVar` Nothing) $ IntMap.elems srs
        writeTVar syncResponses IntMap.empty
      -- Closing a broken TLS connection may fail with SSL-level
      -- exceptions as well as with IOException. None of them may
      -- abort the reconnection: at this point both 'conn' and 'sess'
      -- are empty, so bailing out here would block every request
      -- sender forever.
      handleNetwork
        (\e -> logErrorN $ "Failed to close old connection: " <> tshow e) $
        liftIO cl
      -- We do not change the protocol version during session recovery
      connect >>= atomically . putTMVar conn
      logWarnN "Connection re-established"
      let
        shdl (Right ()) = pure ()
        shdl (Left e) = throwIO e
      -- Fork new thread for DMCC session initialization. This is
      -- because 'reconnect' needs to return for response reader
      -- thread to start working again (which is required for DMCC
      -- session start).
      void $ flip forkFinally shdl $ do
        (newSession, _) <- startDMCCSession (Just oldId)
        atomically $ putTMVar sess newSession

  -- Read DMCC responses from socket
  msgChan <- newTChanIO
  let readExHandler e = do
        logErrorN $ "Read error: " <> tshow e
        reconnect
  readThread <-
    forkIO $ forever $ do
      -- Blocks while the connection is being re-established
      (istream, _, _) <- atomically $ readTMVar conn
      handleNetwork readExHandler $
        Raw.readResponse istream >>=
        atomically . writeTChan msgChan . first DMCCRsp

  agents <- newTVarIO Map.empty

  -- Process parsed messages
  procThread <- forkIO $ forever $ do
    (msg, invokeId) <- atomically $ readTChan msgChan
    -- TODO Check agent locks?
    ags <- readTVarIO agents
    case msg of
      DMCCRsp rsp -> do
        -- Return response if the request was synchronous
        sync' <- atomically $ do
          srs <- readTVar syncResponses
          modifyTVar' syncResponses (IntMap.delete invokeId)
          pure $ IntMap.lookup invokeId srs
        case sync' of
          Just sync -> void $ atomically $ tryPutTMVar sync $ Just rsp
          Nothing -> pure ()
        let
          -- Look up the agent which issued the request this response
          -- belongs to and forget the request
          takeAgentRequest = atomically $ do
            ars <- readTVar agentRequests
            modifyTVar' agentRequests (IntMap.delete invokeId)
            pure $ IntMap.lookup invokeId ars
        -- Redirect events and request errors to matching agent
        ag' <- case rsp of
                 Rs.EventResponse monId _ ->
                   pure $ find (\a -> monId == monitorId a) $ Map.elems ags
                 Rs.CSTAErrorCodeResponse _ -> do
                   aid <- takeAgentRequest
                   pure $ (`Map.lookup` ags) =<< aid
                 _ -> do
                   -- The request has been answered, so its agent
                   -- mapping is no longer needed. Invoke ids wrap
                   -- around and get reused: a stale entry would
                   -- misdirect an error response to a later,
                   -- unrelated request with the same id.
                   void takeAgentRequest
                   pure Nothing
        case ag' of
          Just ag -> atomically $ writeTChan (rspChan ag) rsp
          -- Error/event received for an unknown agent?
          Nothing -> pure ()
      _ -> pure ()

  -- Keep the session alive
  pingThread <- forkIO $ forever $ do
    -- Do not send a keep-alive message if the session is not ready
    (sid, duration) <- atomically $ readTMVar sess
    sendRequestAsyncRaw conn reconnect invoke Nothing
      Rq.ResetApplicationSessionTimer
      { sessionId = sid
      , requestedSessionDuration = duration
      }
    -- Ping twice per session duration
    threadDelay $ duration * 500 * 1000

  let h = DMCCHandle
          conn
          sess
          reconnect
          pingThread
          readThread
          procThread
          invoke
          syncResponses
          agentRequests
          sopts

  -- Start the session
  connect >>= atomically . putTMVar conn
  (newSession, actualProtocolVersion) <- startDMCCSession Nothing
  atomically $ putTMVar sess newSession

  wh <- case whUrl of
          Just url -> do
            mgr <- liftIO $ HTTP.newManager HTTP.defaultManagerSettings
            req <- HTTP.parseUrlThrow url
            pure $ Just (req, mgr)
          Nothing -> pure Nothing

  agLocks <- newTVarIO Set.empty

  pure $
    Session
    actualProtocolVersion
    h
    wh
    agents
    agLocks


-- | Stop a session: release all of its agents, ask the server to
-- stop the DMCC application session, kill the worker threads and
-- close the connection.
--
-- TODO Agent releasing notice
stopSession :: (MonadUnliftIO m, MonadLoggerIO m, MonadBaseControl IO m, MonadCatch m)
            => Session -> m ()
stopSession as@Session{..} = do
  knownAgents <- readTVarIO agents
  (sid, _) <- atomically . readTMVar $ dmccSession dmccHandle
  -- Release all agents
  forM_ (keys knownAgents) $ \aid ->
    releaseAgent $ AgentHandle (aid, as)

  sendRequestAsync dmccHandle Nothing Rq.StopApplicationSession{sessionID = sid}

  -- TODO Use async/throwTo instead
  forM_ [pingThread, procThread, readThread] $ \workerOf ->
    killThread $ workerOf dmccHandle
  (_, ostream, cleanup) <- atomically . readTMVar $ connection dmccHandle
  -- Signal end-of-stream to the output stream, then close the
  -- connection
  liftIO $ write Nothing ostream >> cleanup


-- | Send a request and wait for its response.
--
-- Nothing is sent until both a connection and an application session
-- are available. If writing the request fails, a reconnection and a
-- session restart are triggered and 'Nothing' is returned.
--
-- This must not be used for session setup, as it requires an active
-- DMCC application session!
--
-- Write errors are handled here (rather than by callers) because
-- 'sendRequestSync' is called from multiple locations, making it
-- tedious to install the reconnection handler everywhere.
sendRequestSync :: (MonadUnliftIO m, MonadLoggerIO m, MonadBaseControl IO m, MonadCatch m)
                => DMCCHandle
                -> Maybe AgentId
                -- ^ Push erroneous responses to this agent's event
                -- processor.
                --
                -- TODO Disallow unknown agents on type level (use
                -- AgentHandle).
                -> Request
                -> m (Maybe Response)
sendRequestSync DMCCHandle{..} aid rq = do
  -- Block until the DMCC session is ready
  _ <- atomically (readTMVar dmccSession)
  sendRequestSyncRaw connection reconnect invokeId syncResponses agentTag rq
  where
    -- Pair the target agent (if any) with the agent requests map
    agentTag = fmap ((,) agentRequests) aid


-- | Send a request and block until its response arrives, without
-- requiring an active DMCC application session (so this may be used
-- for session setup).
--
-- The connection is taken for the duration of the write, which
-- serializes writers. If the write fails with a known network
-- exception, the pending response slot is failed, the reconnection
-- action is run and 'Nothing' is returned.
sendRequestSyncRaw :: (MonadUnliftIO m, MonadLoggerIO m, MonadBaseControl IO m, MonadCatch m)
                   => TMVar ConnectionData
                   -- ^ Block until this connection becomes available.
                   -> m ()
                   -- ^ Reconnection action.
                   -> TVar Int
                   -> TVar (IntMap.IntMap (TMVar (Maybe Response)))
                   -- ^ Synchronous response storage.
                   -> Maybe (TVar (IntMap.IntMap AgentId), AgentId)
                   -- ^ Agent requests map and target agent id. When
                   -- provided, CSTAErrorCode response will be
                   -- directed to that agent's event processor.
                   -> Request
                   -> m (Maybe Response)
sendRequestSyncRaw connection re invoke srs ar !rq = do
  -- Allocate an invoke id, register the response slot and grab the
  -- connection in a single transaction
  (ix, var, c@(_, ostream, _)) <- atomically $ do
    modifyTVar' invoke $ (`mod` 9999) . (+1)
    ix <- readTVar invoke
    var <- newEmptyTMVar
    modifyTVar' srs (IntMap.insert ix var)
    case ar of
      Just (ars, a) -> modifyTVar' ars (IntMap.insert ix a)
      Nothing -> pure ()
    c <- takeTMVar connection
    pure (ix, var, c)
  let
    srHandler e = do
      logErrorN $ "Write error: " <> tshow e
      atomically $ do
        -- The reconnection action expects to find the (broken)
        -- connection in place
        putTMVar connection c
        -- Fail this request and forget about it
        putTMVar var Nothing
        modifyTVar' srs $ IntMap.delete ix
        case ar of
          Just (ars, _) -> modifyTVar' ars (IntMap.delete ix)
          Nothing -> pure ()
      re
  handleNetwork srHandler $ do
    Raw.sendRequest ostream ix rq
    -- Release the connection at once and wait for response in a
    -- separate transaction.
    --
    -- This must happen on the success path only: after a write error
    -- the handler has already returned the connection and the
    -- reconnection action has replaced it, so putting it back once
    -- more would block on the full TMVar (or later re-install the
    -- stale, closed connection).
    atomically $ putTMVar connection c
  atomically $ takeTMVar var


-- | Like 'sendRequestSync', but do not wait for a result.
--
-- Still blocks until both a connection and an application session
-- are available.
sendRequestAsync :: (MonadUnliftIO m, MonadLoggerIO m, MonadBaseControl IO m, MonadCatch m)
                 => DMCCHandle
                 -> Maybe AgentId
                 -- ^ Push erroneous responses to this agent's event
                 -- processor.
                 -> Request
                 -> m ()
sendRequestAsync DMCCHandle{..} aid rq = do
  -- Block until the DMCC session is ready
  void $ atomically $ readTMVar dmccSession
  sendRequestAsyncRaw connection reconnect invokeId agentTag rq
  where
    -- Pair the target agent (if any) with the agent requests map
    agentTag = fmap ((,) agentRequests) aid


-- | Send a request without waiting for its response and without
-- requiring an active DMCC application session.
--
-- The connection is taken for the duration of the write, which
-- serializes writers. If the write fails with a known network
-- exception, the reconnection action is run.
sendRequestAsyncRaw :: (MonadUnliftIO m, MonadLoggerIO m, MonadCatch m)
                    => TMVar ConnectionData
                    -- ^ Block until this connection becomes available.
                    -> m ()
                    -- ^ Reconnection action.
                    -> TVar Int
                    -> Maybe (TVar (IntMap.IntMap AgentId), AgentId)
                    -- ^ Agent requests map and target agent id. When
                    -- provided, the request is recorded as issued by
                    -- that agent.
                    -> Request
                    -> m ()
sendRequestAsyncRaw connection re invoke ar !rq = do
  -- Allocate an invoke id and grab the connection in a single
  -- transaction
  (ix, c@(_, ostream, _)) <- atomically $ do
    modifyTVar' invoke $ (`mod` 9999) . (+1)
    ix <- readTVar invoke
    case ar of
      Just (ars, a) -> modifyTVar' ars (IntMap.insert ix a)
      Nothing -> pure ()
    c <- takeTMVar connection
    pure (ix, c)
  let
    srHandler e = do
      logErrorN $ "Write error: " <> tshow e
      atomically $ do
        -- The reconnection action expects to find the (broken)
        -- connection in place
        putTMVar connection c
        case ar of
          Just (ars, _) -> modifyTVar' ars (IntMap.delete ix)
          Nothing -> pure ()
      re
  handleNetwork srHandler $ do
    Raw.sendRequest ostream ix rq
    -- Release the connection on the success path only: after a write
    -- error the handler has already returned the connection and the
    -- reconnection action has replaced it, so putting it back once
    -- more would block on the full TMVar (or later re-install the
    -- stale, closed connection).
    atomically $ putTMVar connection c


-- | Run an action, passing any of the network-related exceptions we
-- know of (short reads, I/O errors, abrupt TLS termination and TLS
-- protocol errors) to the supplied handler. Other exceptions
-- propagate untouched.
handleNetwork :: forall a m. (MonadUnliftIO m, MonadLoggerIO m, MonadCatch m)
              => (forall e. (Exception e, Show e) => (e -> m a))
              -- ^ Exception handler.
              -> m a
              -> m a
handleNetwork handler action = catches action knownFailures
  where
    -- The same polymorphic handler, instantiated once per exception
    -- type
    knownFailures =
      [ Handler (handler :: ReadTooShortException -> m a)
      , Handler (handler :: IOException -> m a)
      , Handler (handler :: SSL.ConnectionAbruptlyTerminated -> m a)
      , Handler (handler :: SSL.ProtocolError -> m a)
      ]