Copyright | (c) Alexey Radkov 2019-2020 |
---|---|
License | BSD-style |
Maintainer | alexey.radkov@gmail.com |
Stability | experimental |
Portability | non-portable (requires Template Haskell) |
Safe Haskell | None |
Language | Haskell98 |
An aggregate service from the more extra tools collection for nginx-haskell-module.
Synopsis
- data AggregateServerConf
- ngxExportAggregateService :: String -> Name -> Q [Dec]
- reportAggregate :: ToJSON a => Int -> Maybe a -> ByteString -> IO ()
- newtype CInt = CInt Int32
- newtype CUInt = CUInt Word32
The typed service exporter
An aggregate service collects custom typed data reported by worker processes
and sends this via HTTP when requested. This is an ignitionService
in terms
of module NgxExport.Tools, which means that it starts upon the startup of
the worker process and runs until termination of the worker. Internally, an
aggregate service starts an HTTP server implemented via the Snap
framework, which serves incoming requests from
worker processes (collecting data) as well as from the Nginx server's
clients (reporting collected data for administration purpose).
Below is a simple example.
File test_tools_extra_aggregate.hs
{-# LANGUAGE TemplateHaskell, DeriveGeneric, TypeApplications #-} {-# LANGUAGE OverloadedStrings, BangPatterns #-} module TestToolsExtraAggregate where import NgxExport import NgxExport.Tools import NgxExport.Tools.Aggregate import Data.ByteString (ByteString) import qualified Data.ByteString.Lazy.Char8 as C8L import Data.Aeson import Data.Maybe import Data.IORef import System.IO.Unsafe import GHC.Generics data Stats = Stats { bytesSent :: Int , requests :: Int , meanBytesSent :: Int } deriving Generic instance FromJSON Stats instance ToJSON Stats stats :: IORef Stats stats = unsafePerformIO $ newIORef $ Stats 0 0 0 {-# NOINLINE stats #-} updateStats :: ByteString -> IO C8L.ByteString updateStats s = do let cbs =readFromByteString
@Int s modifyIORef' stats $ \(Stats bs rs _) -> let !nbs = bs + fromMaybe 0 cbs !nrs = rs + 1 !nmbs = nbs `div` nrs in Stats nbs nrs nmbs return ""ngxExportIOYY
'updateStats reportStats :: Int -> Bool -> IO C8L.ByteString reportStats =deferredService
$ \port -> do s <- readIORef statsreportAggregate
port (Just s) "stats" return ""ngxExportSimpleServiceTyped
'reportStats ''Int $PersistentService
$ Just $ Sec 5ngxExportAggregateService
"stats" ''Stats
Here, on the bottom line, aggregate service stats is declared. It expects
from worker processes reports in JSON format with data of type Stats which
includes the number of bytes sent so far, the number of client requests, and
the mean value of bytes sent per a single request. Its own configuration
(a TCP port and the purge interval) shall be defined in the Nginx
configuration file. The reports from worker processes are sent from a
deferredService
reportStats every 5 seconds: it merely reads data
collected in a global IORef stats and then sends this to the aggregate
service using reportAggregate
. Handler updateStats updates the stats
on every run. It accepts a ByteString from Nginx, then converts it to an
Int value and interprets this as the number of bytes sent in the current
request. It also increments the number or requests and calculates the mean
value of bytes sent in all requests to this worker so far. Notice that all
the parts of stats are evaluated strictly, it is important!
File nginx.conf
user nobody; worker_processes 2; events { worker_connections 1024; } http { default_type application/octet-stream; sendfile on; haskell load /var/lib/nginx/test_tools_extra_aggregate.so; haskell_run_service simpleService_aggregate_stats $hs_stats 'AggregateServerConf { asPort = 8100, asPurgeInterval = Min 5 }'; haskell_service_var_in_shm stats 32k /tmp $hs_stats; haskell_run_service simpleService_reportStats $hs_reportStats 8100; server { listen 8010; server_name main; error_log /tmp/nginx-test-haskell-error.log; access_log /tmp/nginx-test-haskell-access.log; haskell_run updateStats !$hs_updateStats $bytes_sent; location / { echo Ok; } } server { listen 8020; server_name stat; location / { allow 127.0.0.1; deny all; proxy_pass http://127.0.0.1:8100/get/stats; } } }
The aggregate service stats must be referred from the Nginx configuration
file with prefix simpleService_aggregate_. Its configuration is typed,
the type is AggregateServerConf
. Though its only constructor
AggregateServerConf is not exported from this module, the service is still
configurable from an Nginx configuration. Here, the aggregate service listens
on TCP port 8100, and its purge interval is 5 minutes. Notice that an
aggregate service must be shared (here, variable $hs_stats is declared as
shared with Nginx directive haskell_service_var_in_shm), otherwise it won't
even start because the internal HTTP servers on each worker process won't be
able to bind to the same TCP port. Inside the upper server clause, handler
updateStats runs on every client request. This handler always returns an
empty string in variable $hs_updateStats because it is only needed for the
side effect of updating the stats. However, as soon as Nginx variable
handlers are lazy, evaluation of $hs_updateStats must be forced somehow.
To achieve this, we used the strict annotation (the bang symbol) in
directive haskell_run that enforces strict evaluation in a late request
processing phase, when the value of variable $bytes_sent has been already
calculated.
Data collected by the aggregate service can be obtained in a request to the virtual server listening on TCP port 8020. It simply proxies requests to the internal aggregate server with URL /get/stats where stats corresponds to the name of the aggregate service.
A simple test
As far as reportStats is a deferred service, we won't get useful data in 5 seconds after Nginx start.
$ curl 'http://127.0.0.1:8020/' | jq [ "1970-01-01T00:00:00Z", {} ]
However, later we should get some useful data.
$ curl 'http://127.0.0.1:8020/' | jq [ "2019-04-22T14:19:04Z", { "5910": [ "2019-04-22T14:19:19Z", { "bytesSent": 0, "requests": 0, "meanBytesSent": 0 } ], "5911": [ "2019-04-22T14:19:14Z", { "bytesSent": 0, "requests": 0, "meanBytesSent": 0 } ] } ]
Here we have collected stats from the two Nginx worker processes with PIDs 5910 and 5911. The timestamps show when the stats was updated the last time. The topmost timestamp shows the time of the latest purge event. The data itself have only zeros as soon we have made no request to the main server so far. Let's run 100 simultaneous requests and look at the stats (it should update at worst in 5 seconds after running them).
$ for i in {1..100} ; do curl 'http://127.0.0.1:8010/' & done
Wait 5 seconds...
$ curl 'http://127.0.0.1:8020/' | jq [ "2019-04-22T14:29:04Z", { "5910": [ "2019-04-22T14:31:34Z", { "bytesSent": 17751, "requests": 97, "meanBytesSent": 183 } ], "5911": [ "2019-04-22T14:31:31Z", { "bytesSent": 549, "requests": 3, "meanBytesSent": 183 } ] } ]
data AggregateServerConf Source #
Configuration of an aggregate service.
This type is exported because Template Haskell requires that. Though its only constructor AggregateServerConf is not exported, it is still reachable from Nginx configuration files. Below is definition of the constructor.
AggregateServerConf { asPort :: Int
, asPurgeInterval :: TimeInterval
}
The value of asPort corresponds to the TCP port of the internal aggregate server. The asPurgeInterval is the purge interval. An aggregate service should sometimes purge data from worker processes which did not report for a long time. For example, it makes no sense to keep data from workers that have already been terminated. The inactive PIDs get checked every asPurgeInterval, and data which correspond to PIDs with timestamps older than asPurgeInterval get removed.
Be aware that due to limitations of Template Haskell, this name must be imported unqualified!
Instances
ngxExportAggregateService Source #
Exports a simple aggregate service with specified name and the aggregate type.
The name of the service can be chosen arbitrarily, however it must be
exactly referred from reportAggregate
and client requests to the service
because the URL of the internal HTTP server contains this.
The aggregate type must have instances of FromJSON
and ToJSON
as its
objects will be transferred via HTTP in JSON format.
The service is implemented via ngxExportSimpleServiceTyped
with
AggregateServerConf
as the name of its custom type. This is an
ignitionService
with an HTTP server based on the Snap
framework running inside. The internal HTTP
server collects data from worker processes on URL
/put/<name_of_the_service> and reports data on URL
/get/<name_of_the_service>.
The worker-side reporter
:: ToJSON a | |
=> Int | Port of the aggregate server |
-> Maybe a | Reported data |
-> ByteString | Name of the aggregate service |
-> IO () |
Reports data to an aggregate service.
If reported data is Nothing
then data collected on the aggregate service
won't alter except that the timestamp associated with the PID of the sending
worker process will be updated.
Re-exported data constructors from Foreign.C
Re-exports are needed by the exporter for marshalling in foreign calls.
Haskell type representing the C int
type.
Instances
Haskell type representing the C unsigned int
type.