{-# LANGUAGE TemplateHaskell, OverloadedStrings, BangPatterns #-}

-----------------------------------------------------------------------------
-- |
-- Module      :  NgxExport.Tools.Aggregate
-- Copyright   :  (c) Alexey Radkov 2019
-- License     :  BSD-style
--
-- Maintainer  :  alexey.radkov@gmail.com
-- Stability   :  experimental
-- Portability :  non-portable (requires Template Haskell)
--
-- An aggregate service from the more extra tools collection for
-- <http://github.com/lyokha/nginx-haskell-module nginx-haskell-module>.
--
-----------------------------------------------------------------------------


module NgxExport.Tools.Aggregate (
    -- * The typed service exporter
    -- $aggregateServiceExporter
                                  AggregateServerConf
                                 ,ngxExportAggregateService
    -- * The worker-side reporter
                                 ,reportAggregate
    -- * Re-exported data constructors from /Foreign.C/
    -- | Re-exports are needed by the exporter for marshalling in foreign calls.
                                 ,Foreign.C.Types.CInt (..)
                                 ,Foreign.C.Types.CUInt (..)
                                 ) where

import           NgxExport.Tools

import           Language.Haskell.TH
import           Network.HTTP.Client
import           Foreign.C.Types
import qualified Data.ByteString as B
import qualified Data.ByteString.Char8 as C8
import           Data.ByteString (ByteString)
import qualified Data.ByteString.Lazy as L
import           Data.Map.Strict (Map)
import qualified Data.Map.Strict as M
import           Data.IORef
import           Data.Int
import qualified Data.Text as T
import qualified Data.Text.Encoding as T
import           Data.Time.Clock.POSIX
import           Data.Aeson
import           Data.Maybe
import           Control.Monad
import           Control.Monad.IO.Class
import           Control.Arrow
import           Control.Exception
import           Control.Exception.Enclosed (handleAny)
import           System.IO.Unsafe
import           Snap.Http.Server
import           Snap.Core

-- Storage of an aggregate service: the time of the latest purge paired with a
-- map from PIDs of worker processes to the time of the latest report from the
-- worker and the data it reported (if any).
type Aggregate a = IORef (CTime, Map Int32 (CTime, Maybe a))

-- $aggregateServiceExporter
--
-- An aggregate service collects custom typed data reported by worker processes
-- and sends this via HTTP when requested. This is an 'ignitionService' in terms
-- of module "NgxExport.Tools", which means that it starts upon the startup of
-- the worker process and runs until termination of the worker. Internally, an
-- aggregate service starts an HTTP server implemented via the [Snap
-- framework](http://snapframework.com/), which serves incoming requests from
-- worker processes (collecting data) as well as from the Nginx server's
-- clients (reporting collected data for administration purpose).
--
-- Below is a simple example.
--
-- ==== File /test_tools_extra.hs/
-- @
-- {-\# LANGUAGE TemplateHaskell, DeriveGeneric, TypeApplications \#-}
-- {-\# LANGUAGE OverloadedStrings, BangPatterns \#-}
--
-- module TestToolsExtra where
--
-- import           NgxExport
-- import           NgxExport.Tools
-- import           NgxExport.Tools.Aggregate
--
-- import           Data.ByteString (ByteString)
-- import qualified Data.ByteString.Lazy.Char8 as C8L
-- import           Data.Aeson
-- import           Data.Maybe
-- import           Data.IORef
-- import           System.IO.Unsafe
-- import           GHC.Generics
--
-- data Stats = Stats { bytesSent :: Int
--                    , requests :: Int
--                    , meanBytesSent :: Int
--                    } deriving Generic
-- instance FromJSON Stats
-- instance ToJSON Stats
--
-- stats :: IORef Stats
-- stats = unsafePerformIO $ newIORef $ Stats 0 0 0
-- {-\# NOINLINE stats \#-}
--
-- updateStats :: ByteString -> IO C8L.ByteString
-- __/updateStats/__ s = do
--     let cbs = 'readFromByteString' \@Int s
--     modifyIORef\' stats $ \\(Stats bs rs _) ->
--         let !nbs = bs + fromMaybe 0 cbs
--             !nrs = rs + 1
--             !nmbs = nbs \`div\` nrs
--         in Stats nbs nrs nmbs
--     return \"\"
-- 'NgxExport.ngxExportIOYY' \'updateStats
--
-- reportStats :: Int -> Bool -> IO C8L.ByteString
-- __/reportStats/__ = 'deferredService' $ \\port -> do
--     s <- readIORef stats
--     'reportAggregate' port (Just s) \"__/stats/__\"
--     return \"\"
-- 'ngxExportSimpleServiceTyped' \'reportStats \'\'Int $
--     'PersistentService' $ Just $ Sec 5
--
-- 'ngxExportAggregateService' \"__/stats/__\" \'\'Stats
-- @
--
-- Here, on the bottom line, aggregate service /stats/ is declared. It expects
-- from worker processes reports in JSON format with data of type /Stats/ which
-- includes the number of bytes sent so far, the number of client requests, and
-- the mean number of bytes sent per request. Its own configuration
-- (a TCP port and the /purge interval/) shall be defined in the Nginx
-- configuration file. The reports from worker processes are sent from a
-- 'deferredService' /reportStats/ every 5 seconds: it merely reads data
-- collected in a global IORef /stats/ and then sends this to the aggregate
-- service using 'reportAggregate'. Handler /updateStats/ updates the /stats/
-- on every run. It accepts a /ByteString/ from Nginx, then converts it to an
-- /Int/ value and interprets this as the number of bytes sent in the current
-- request. It also increments the number of requests and calculates the mean
-- value of bytes sent in all requests to this worker so far. Notice that all
-- the parts of /stats/ are evaluated /strictly/, it is important!
--
-- ==== File /nginx.conf/
-- @
-- user                    nobody;
-- worker_processes        2;
--
-- events {
--     worker_connections  1024;
-- }
--
-- http {
--     default_type        application\/octet-stream;
--     sendfile            on;
--
--     haskell load \/var\/lib\/nginx\/test_tools_extra.so;
--
--     haskell_run_service __/simpleService_aggregate_stats/__ $hs_stats
--             \'__/AggregateServerConf/__ { __/asPort/__ = 8100, __/asPurgeInterval/__ = Min 5 }\';
--
--     haskell_service_var_in_shm stats 32k \/tmp $hs_stats;
--
--     haskell_run_service __/simpleService_reportStats/__ $hs_reportStats 8100;
--
--     server {
--         listen       8010;
--         server_name  main;
--         error_log    \/tmp\/nginx-test-haskell-error.log;
--         access_log   \/tmp\/nginx-test-haskell-access.log;
--
--         haskell_run __/updateStats/__ !$hs_updateStats $bytes_sent;
--
--         location \/ {
--             echo Ok;
--         }
--     }
--
--     server {
--         listen       8020;
--         server_name  stat;
--
--         location \/ {
--             allow 127.0.0.1;
--             deny all;
--             proxy_pass http:\/\/127.0.0.1:8100\/get\/__/stats/__;
--         }
--     }
-- }
-- @
--
-- The aggregate service /stats/ must be referred from the Nginx configuration
-- file with prefix __/simpleService_aggregate_/__. Its configuration is typed,
-- the type is 'AggregateServerConf'. Though its only constructor
-- /AggregateServerConf/ is not exported from this module, the service is still
-- configurable from an Nginx configuration. Here, the aggregate service listens
-- on TCP port /8100/, and its /purge interval/ is 5 minutes. Notice that an
-- aggregate service must be /shared/ (here, variable /$hs_stats/ is declared as
-- shared with Nginx directive /haskell_service_var_in_shm/), otherwise it won't
-- even start because the internal HTTP servers on each worker process won't be
-- able to bind to the same TCP port. Inside the upper /server/ clause, handler
-- /updateStats/ runs on every client request. This handler always returns an
-- empty string in variable /$hs_updateStats/ because it is only needed for the
-- side effect of updating the /stats/. However, since Nginx variable handlers
-- are /lazy/, evaluation of /$hs_updateStats/ must be forced somehow.
-- To achieve this, we used the /strict annotation/ (the /bang/ symbol) in
-- directive /haskell_run/ that enforces strict evaluation in a late request
-- processing phase, when the value of variable /$bytes_sent/ has been already
-- calculated.
--
-- Data collected by the aggregate service can be obtained in a request to the
-- virtual server listening on TCP port /8020/. It simply proxies requests to
-- the internal aggregate server with URL /\/get\/__stats__/ where __/stats/__
-- corresponds to the /name/ of the aggregate service.
--
-- ==== A simple test
-- Because /reportStats/ is a deferred service, we won't get useful data during
-- the first 5 seconds after Nginx starts.
--
-- > $ curl 'http://127.0.0.1:8020/' | jq
-- > [
-- >   "1970-01-01T00:00:00Z",
-- >   {}
-- > ]
--
-- However, later we should get some useful data.
--
-- > $ curl 'http://127.0.0.1:8020/' | jq
-- > [
-- >   "2019-04-22T14:19:04Z",
-- >   {
-- >     "5910": [
-- >       "2019-04-22T14:19:19Z",
-- >       {
-- >         "bytesSent": 0,
-- >         "requests": 0,
-- >         "meanBytesSent": 0
-- >       }
-- >     ],
-- >     "5911": [
-- >       "2019-04-22T14:19:14Z",
-- >       {
-- >         "bytesSent": 0,
-- >         "requests": 0,
-- >         "meanBytesSent": 0
-- >       }
-- >     ]
-- >   }
-- > ]
--
-- Here we have collected stats from the two Nginx worker processes with /PIDs/
-- /5910/ and /5911/. The timestamps show when the stats were last updated. The
-- topmost timestamp shows the time of the latest /purge/ event. The data
-- itself contains only zeros because we have made no requests to the main
-- server so far. Let's run 100 simultaneous requests and look at the stats
-- (they should update at worst within 5 seconds after running them).
--
-- > $ for i in {1..100} ; do curl 'http://127.0.0.1:8010/' & done
--
-- Wait 5 seconds...
--
-- > $ curl 'http://127.0.0.1:8020/' | jq
-- > [
-- >   "2019-04-22T14:29:04Z",
-- >   {
-- >     "5910": [
-- >       "2019-04-22T14:31:34Z",
-- >       {
-- >         "bytesSent": 17751,
-- >         "requests": 97,
-- >         "meanBytesSent": 183
-- >       }
-- >     ],
-- >     "5911": [
-- >       "2019-04-22T14:31:31Z",
-- >       {
-- >         "bytesSent": 549,
-- >         "requests": 3,
-- >         "meanBytesSent": 183
-- >       }
-- >     ]
-- >   }
-- > ]

-- | Configuration of an aggregate service.
--
-- This type is exported because Template Haskell requires that. Though its
-- only constructor /AggregateServerConf/ is not exported, it is still reachable
-- from Nginx configuration files. Below is the definition of the constructor.
--
-- @
--     AggregateServerConf { asPort          :: Int
--                         , asPurgeInterval :: 'TimeInterval'
--                         }
-- @
--
-- The value of /asPort/ corresponds to the TCP port of the internal aggregate
-- server. The /asPurgeInterval/ is the /purge/ interval. An aggregate service
-- should sometimes purge data from worker processes which did not report for a
-- long time. For example, it makes no sense to keep data from workers that
-- have already been terminated. The inactive PIDs get checked every
-- /asPurgeInterval/, and data which correspond to PIDs with timestamps older
-- than /asPurgeInterval/ get removed.
--
-- Be aware that due to limitations of Template Haskell, this name must be
-- imported unqualified!
data AggregateServerConf =
    AggregateServerConf { asPort          :: Int
                          -- TCP port the internal aggregate server listens on
                        , asPurgeInterval :: TimeInterval
                          -- how often stale worker data is checked, and also
                          -- the age after which it is removed
                        } deriving Read

-- Body of an aggregate service wrapped in 'ignitionService'.
--
-- Given the storage of the aggregate and the name of the service, it runs a
-- Snap HTTP server configured by 'asConfig' on the port taken from the service
-- configuration, with the routes built by 'asHandler'. An empty string is
-- returned once the server returns.
aggregateServer :: (FromJSON a, ToJSON a) =>
    Aggregate a -> ByteString -> AggregateServerConf -> Bool -> IO L.ByteString
aggregateServer storage name = ignitionService $ \conf -> do
    simpleHttpServe (asConfig (asPort conf)) (asHandler storage name conf)
    return ""

-- Builds the Snap configuration of the internal aggregate server: it listens
-- on the given port of address 127.0.0.1 with the access log, the error log,
-- and the verbose output all turned off.
asConfig :: Int -> Config Snap a
asConfig p = configure mempty
    where configure = setPort p
                    . setBind "127.0.0.1"
                    . setAccessLog ConfigNoLog
                    . setErrorLog ConfigNoLog
                    . setVerbose False

-- Routes of the internal aggregate server for the service named by the second
-- argument: POST requests to put/<name> are handled by 'receiveAggregate' and
-- GET requests to get/<name> are handled by 'sendAggregate'.
asHandler :: (FromJSON a, ToJSON a) =>
    Aggregate a -> ByteString -> AggregateServerConf -> Snap ()
asHandler a u conf = route [(putPath, putHandler), (getPath, getHandler)]
    where putPath    = "put/" `B.append` u
          getPath    = "get/" `B.append` u
          -- method is qualified because Network.HTTP.Client is imported
          -- unqualified in this module as well
          putHandler = Snap.Core.method POST $ receiveAggregate a conf
          getHandler = Snap.Core.method GET $ sendAggregate a

-- Handles a report sent by a worker process.
--
-- The request body (at most 65536 bytes are read) must contain a JSON-encoded
-- pair of the PID of the worker and the optional reported data. An unreadable
-- body raises a user error which, like any other exception, is turned into an
-- error response by 'handleAggregateExceptions'. On success, the storage gets
-- updated atomically and an empty response is sent.
receiveAggregate :: FromJSON a =>
    Aggregate a -> AggregateServerConf -> Snap ()
receiveAggregate a conf =
    handleAggregateExceptions "Exception while receiving aggregate" $ do
        !s <- decode' <$> readRequestBody 65536
        liftIO $ case s of
            Nothing -> throwUserError "Unreadable aggregate!"
            Just (pid, v) -> do
                !t <- ngxNow
                atomicModifyIORef' a $ \cur -> (store t pid v cur, ())
        finishWith emptyResponse
    where int = fromIntegral . toSec . asPurgeInterval $ conf :: CTime
          -- Pure update of the storage at time t with a report (pid, v).
          store t pid v (t', v') =
              let -- when the purge interval has elapsed since the latest
                  -- purge, move the purge time to now and drop the entries
                  -- which were not updated during the interval
                  (!tn, f) =
                      if t - t' >= int
                          then (t, M.filter $ \(t'', _) -> t - t'' < int)
                          else (t', id)
                  -- the timestamp of the PID is always refreshed; a report
                  -- without data keeps the data stored earlier for this PID
                  update old =
                      let !new' = case (old, v) of
                                      (Just (_, prev), Nothing) -> prev
                                      _                         -> v
                      in Just (t, new')
                  !vn = f $ M.alter update pid v'
              in (tn, vn)

-- Sends the current contents of the storage in JSON format.
--
-- The response is a pair of the time of the latest purge and the map of the
-- collected reports, with all the timestamps converted to UTC time. Exceptions
-- are turned into error responses by 'handleAggregateExceptions'.
sendAggregate :: ToJSON a => Aggregate a -> Snap ()
sendAggregate a =
    handleAggregateExceptions "Exception while sending aggregate" $ do
        (purged, reports) <- liftIO $ readIORef a
        let stamped = M.map (first toUTCTime) reports
        modifyResponse $ setContentType "application/json"
        writeLBS $ encode (toUTCTime purged, stamped)
    where toUTCTime (CTime secs) = posixSecondsToUTCTime $ fromIntegral secs

-- Runs a Snap handler, turning any exception caught by 'handleAny' into a
-- response with status 500. The first argument is used as the reason phrase of
-- the response status, the shown exception is written into the response body.
handleAggregateExceptions :: String -> Snap () -> Snap ()
handleAggregateExceptions cmsg = handleAny report
    where report :: SomeException -> Snap ()
          report e = do
              modifyResponse $ setResponseStatus 500 $ packUtf8 cmsg
              writeBS $ packUtf8 $ show e
          packUtf8 = T.encodeUtf8 . T.pack

-- Throws an 'IOError' built by 'userError' from the given message.
throwUserError :: String -> IO a
throwUserError msg = ioError $ userError msg

-- | Exports a simple aggregate service with the given name and aggregate type.
--
-- The name of the service can be chosen arbitrarily. However, exactly the same
-- name must be passed to 'reportAggregate' and used in client requests to the
-- service, because it is a part of the URL of the internal HTTP server.
--
-- The aggregate type must have instances of 'FromJSON' and 'ToJSON' because
-- its values are transferred via HTTP in JSON format.
--
-- The service is implemented via 'ngxExportSimpleServiceTyped' with
-- 'AggregateServerConf' as the name of its custom type. This is an
-- 'ignitionService' with an HTTP server based on the [Snap
-- framework](http://snapframework.com/) running inside. The internal HTTP
-- server collects data from worker processes on URL
-- /\/put\/__\<name_of_the_service\>__/ and reports data on URL
-- /\/get\/__\<name_of_the_service\>__/.
ngxExportAggregateService :: String       -- ^ Name of the service
                          -> Name         -- ^ Name of the aggregate type
                          -> Q [Dec]
ngxExportAggregateService f a = do
    let prefixed p = mkName $ p ++ f
        serviceName = prefixed "aggregate_"
        storageName = prefixed "aggregate_storage_"
        urlName     = prefixed "aggregate_url_"
        -- a declaration of a value without arguments
        constant name body = funD name [clause [] (normalB body) []]
    -- the name of the service as a ByteString, the global storage of the
    -- aggregate (not inlined because it is created with unsafePerformIO), and
    -- the service itself
    decls <- sequence
        [sigD urlName [t|ByteString|]
        ,constant urlName [|C8.pack f|]
        ,sigD storageName [t|Aggregate $(conT a)|]
        ,constant storageName [|unsafePerformIO $ newIORef (0, M.empty)|]
        ,pragInlD storageName NoInline FunLike AllPhases
        ,sigD serviceName [t|AggregateServerConf -> Bool -> IO L.ByteString|]
        ,constant serviceName
            [|$(varE 'aggregateServer) $(varE storageName) $(varE urlName)|]
        ]
    -- FIXME: name AggregateServerConf must be imported from the user's
    -- module unqualified (see details in NgxExport/Tools.hs, function
    -- ngxExportSimpleService')!
    exports <- ngxExportSimpleServiceTyped
        serviceName ''AggregateServerConf SingleShotService
    return $ decls ++ exports
-- | Reports data to an aggregate service.
--
-- The data is sent together with the PID of the worker process in a POST
-- request to the aggregate server at address /127.0.0.1/ and the given port.
-- If the reported data is 'Nothing' then the data collected on the aggregate
-- service won't alter except that the timestamp associated with the PID of the
-- sending worker process will be updated.
--
-- Any exception thrown while building or sending the request is discarded.
reportAggregate :: ToJSON a => Int          -- ^ Port of the aggregate server
                            -> Maybe a      -- ^ Reported data
                            -> ByteString   -- ^ Name of the aggregate service
                            -> IO ()
reportAggregate p v u =
    handle ignore $ do
        req <- parseRequest "POST http://127.0.0.1"
        pid <- ngxPid
        let payload = encode (fromIntegral pid :: Int32, v)
            !req' = req { requestBody = RequestBodyLBS payload
                        , port = p
                        , Network.HTTP.Client.path = "put/" `B.append` u
                        }
        void $ httpNoBody req' httpManager
    where -- NOTE(review): this discards every exception, presumably as a
          -- deliberate best effort; confirm this is intended for asynchronous
          -- exceptions too
          ignore :: SomeException -> IO ()
          ignore = const $ return ()

-- The HTTP manager used by 'reportAggregate' for all requests. It is created
-- with unsafePerformIO from the default manager settings; the NOINLINE pragma
-- prevents the compiler from inlining the definition, which could otherwise
-- result in creating more than one manager.
httpManager :: Manager
httpManager = unsafePerformIO $ newManager defaultManagerSettings
{-# NOINLINE httpManager #-}