ngx-export-tools-extra-0.5.5.1: More extra tools for Nginx haskell module
Copyright(c) Alexey Radkov 2019-2020
LicenseBSD-style
Maintaineralexey.radkov@gmail.com
Stabilityexperimental
Portabilitynon-portable (requires Template Haskell)
Safe HaskellNone
LanguageHaskell2010

NgxExport.Tools.Aggregate

Description

An aggregate service from the more extra tools collection for nginx-haskell-module.

Synopsis

The typed service exporter

An aggregate service collects custom typed data reported by worker processes and sends this via HTTP when requested. This is an ignitionService in terms of module NgxExport.Tools, which means that it starts upon the startup of the worker process and runs until termination of the worker. Internally, an aggregate service starts an HTTP server implemented via the Snap framework, which serves incoming requests from worker processes (collecting data) as well as from the Nginx server's clients (reporting collected data for administration purpose).

Below is a simple example.

File test_tools_extra_aggregate.hs

{-# LANGUAGE TemplateHaskell, DeriveGeneric, TypeApplications #-}
{-# LANGUAGE OverloadedStrings, BangPatterns #-}

module TestToolsExtraAggregate where

import           NgxExport
import           NgxExport.Tools
import           NgxExport.Tools.Aggregate

import           Data.ByteString (ByteString)
import qualified Data.ByteString.Lazy.Char8 as C8L
import           Data.Aeson
import           Data.Maybe
import           Data.IORef
import           System.IO.Unsafe
import           GHC.Generics

data Stats = Stats { bytesSent :: Int
                   , requests :: Int
                   , meanBytesSent :: Int
                   } deriving Generic
instance FromJSON Stats
instance ToJSON Stats

stats :: IORef Stats
stats = unsafePerformIO $ newIORef $ Stats 0 0 0
{-# NOINLINE stats #-}

updateStats :: ByteString -> IO C8L.ByteString
updateStats s = do
    let cbs = readFromByteString @Int s
    modifyIORef' stats $ \(Stats bs rs _) ->
        let !nbs = bs + fromMaybe 0 cbs
            !nrs = rs + 1
            !nmbs = nbs `div` nrs
        in Stats nbs nrs nmbs
    return ""
ngxExportIOYY 'updateStats

reportStats :: Int -> Bool -> IO C8L.ByteString
reportStats = deferredService $ \port -> do
    s <- readIORef stats
    reportAggregate port (Just s) "stats"
    return ""
ngxExportSimpleServiceTyped 'reportStats ''Int $
    PersistentService $ Just $ Sec 5

ngxExportAggregateService "stats" ''Stats

Here, on the bottom line, aggregate service stats is declared. It expects from worker processes reports in JSON format with data of type Stats which includes the number of bytes sent so far, the number of client requests, and the mean value of bytes sent per a single request. Its own configuration (a TCP port and the purge interval) shall be defined in the Nginx configuration file. The reports from worker processes are sent from a deferredService reportStats every 5 seconds: it merely reads data collected in a global IORef stats and then sends this to the aggregate service using reportAggregate. Handler updateStats updates the stats on every run. It accepts a ByteString from Nginx, then converts it to an Int value and interprets this as the number of bytes sent in the current request. It also increments the number or requests and calculates the mean value of bytes sent in all requests to this worker so far. Notice that all the parts of stats are evaluated strictly, it is important!

File nginx.conf

user                    nobody;
worker_processes        2;

events {
    worker_connections  1024;
}

http {
    default_type        application/octet-stream;
    sendfile            on;

    haskell load /var/lib/nginx/test_tools_extra_aggregate.so;

    haskell_run_service simpleService_aggregate_stats $hs_stats
            'AggregateServerConf { asPort = 8100, asPurgeInterval = Min 5 }';

    haskell_service_var_in_shm stats 32k /tmp $hs_stats;

    haskell_run_service simpleService_reportStats $hs_reportStats 8100;

    server {
        listen       8010;
        server_name  main;
        error_log    /tmp/nginx-test-haskell-error.log;
        access_log   /tmp/nginx-test-haskell-access.log;

        haskell_run updateStats !$hs_updateStats $bytes_sent;

        location / {
            echo Ok;
        }
    }

    server {
        listen       8020;
        server_name  stat;

        location / {
            allow 127.0.0.1;
            deny all;
            proxy_pass http://127.0.0.1:8100/get/stats;
        }
    }
}

The aggregate service stats must be referred from the Nginx configuration file with prefix simpleService_aggregate_. Its configuration is typed, the type is AggregateServerConf. Though its only constructor AggregateServerConf is not exported from this module, the service is still configurable from an Nginx configuration. Here, the aggregate service listens on TCP port 8100, and its purge interval is 5 minutes. Notice that an aggregate service must be shared (here, variable $hs_stats is declared as shared with Nginx directive haskell_service_var_in_shm), otherwise it won't even start because the internal HTTP servers on each worker process won't be able to bind to the same TCP port. Inside the upper server clause, handler updateStats runs on every client request. This handler always returns an empty string in variable $hs_updateStats because it is only needed for the side effect of updating the stats. However, as soon as Nginx variable handlers are lazy, evaluation of $hs_updateStats must be forced somehow. To achieve this, we used the strict annotation (the bang symbol) in directive haskell_run that enforces strict evaluation in a late request processing phase, when the value of variable $bytes_sent has been already calculated.

Data collected by the aggregate service can be obtained in a request to the virtual server listening on TCP port 8020. It simply proxies requests to the internal aggregate server with URL /get/stats where stats corresponds to the name of the aggregate service.

A simple test

As far as reportStats is a deferred service, we won't get useful data in 5 seconds after Nginx start.

$ curl 'http://127.0.0.1:8020/' | jq
[
  "1970-01-01T00:00:00Z",
  {}
]

However, later we should get some useful data.

$ curl 'http://127.0.0.1:8020/' | jq
[
  "2019-04-22T14:19:04Z",
  {
    "5910": [
      "2019-04-22T14:19:19Z",
      {
        "bytesSent": 0,
        "requests": 0,
        "meanBytesSent": 0
      }
    ],
    "5911": [
      "2019-04-22T14:19:14Z",
      {
        "bytesSent": 0,
        "requests": 0,
        "meanBytesSent": 0
      }
    ]
  }
]

Here we have collected stats from the two Nginx worker processes with PIDs 5910 and 5911. The timestamps show when the stats was updated the last time. The topmost timestamp shows the time of the latest purge event. The data itself have only zeros as soon we have made no request to the main server so far. Let's run 100 simultaneous requests and look at the stats (it should update at worst in 5 seconds after running them).

$ for i in {1..100} ; do curl 'http://127.0.0.1:8010/' & done

Wait 5 seconds...

$ curl 'http://127.0.0.1:8020/' | jq
[
  "2019-04-22T14:29:04Z",
  {
    "5910": [
      "2019-04-22T14:31:34Z",
      {
        "bytesSent": 17751,
        "requests": 97,
        "meanBytesSent": 183
      }
    ],
    "5911": [
      "2019-04-22T14:31:31Z",
      {
        "bytesSent": 549,
        "requests": 3,
        "meanBytesSent": 183
      }
    ]
  }
]

data AggregateServerConf Source #

Configuration of an aggregate service.

This type is exported because Template Haskell requires that. Though its only constructor AggregateServerConf is not exported, it is still reachable from Nginx configuration files. Below is definition of the constructor.

    AggregateServerConf { asPort          :: Int
                        , asPurgeInterval :: TimeInterval
                        }

The value of asPort corresponds to the TCP port of the internal aggregate server. The asPurgeInterval is the purge interval. An aggregate service should sometimes purge data from worker processes which did not report for a long time. For example, it makes no sense to keep data from workers that have already been terminated. The inactive PIDs get checked every asPurgeInterval, and data which correspond to PIDs with timestamps older than asPurgeInterval get removed.

Be aware that due to limitations of Template Haskell, this name must be imported unqualified!

ngxExportAggregateService Source #

Arguments

:: String

Name of the service

-> Name

Name of the aggregate type

-> Q [Dec] 

Exports a simple aggregate service with specified name and the aggregate type.

The name of the service can be chosen arbitrarily, however it must be exactly referred from reportAggregate and client requests to the service because the URL of the internal HTTP server contains this.

The aggregate type must have instances of FromJSON and ToJSON as its objects will be transferred via HTTP in JSON format.

The service is implemented via ngxExportSimpleServiceTyped with AggregateServerConf as the name of its custom type. This is an ignitionService with an HTTP server based on the Snap framework running inside. The internal HTTP server collects data from worker processes on URL /put/<name_of_the_service> and reports data on URL /get/<name_of_the_service>.

The worker-side reporter

reportAggregate Source #

Arguments

:: ToJSON a 
=> Int

Port of the aggregate server

-> Maybe a

Reported data

-> ByteString

Name of the aggregate service

-> IO () 

Reports data to an aggregate service.

If reported data is Nothing then data collected on the aggregate service won't alter except that the timestamp associated with the PID of the sending worker process will be updated.

Re-exported data constructors from Foreign.C

Re-exports are needed by the exporter for marshalling in foreign calls.

newtype CInt #

Haskell type representing the C int type.

Constructors

CInt Int32 

Instances

Instances details
Bounded CInt 
Instance details

Defined in Foreign.C.Types

Enum CInt 
Instance details

Defined in Foreign.C.Types

Methods

succ :: CInt -> CInt #

pred :: CInt -> CInt #

toEnum :: Int -> CInt #

fromEnum :: CInt -> Int #

enumFrom :: CInt -> [CInt] #

enumFromThen :: CInt -> CInt -> [CInt] #

enumFromTo :: CInt -> CInt -> [CInt] #

enumFromThenTo :: CInt -> CInt -> CInt -> [CInt] #

Eq CInt 
Instance details

Defined in Foreign.C.Types

Methods

(==) :: CInt -> CInt -> Bool #

(/=) :: CInt -> CInt -> Bool #

Integral CInt 
Instance details

Defined in Foreign.C.Types

Methods

quot :: CInt -> CInt -> CInt #

rem :: CInt -> CInt -> CInt #

div :: CInt -> CInt -> CInt #

mod :: CInt -> CInt -> CInt #

quotRem :: CInt -> CInt -> (CInt, CInt) #

divMod :: CInt -> CInt -> (CInt, CInt) #

toInteger :: CInt -> Integer #

Num CInt 
Instance details

Defined in Foreign.C.Types

Methods

(+) :: CInt -> CInt -> CInt #

(-) :: CInt -> CInt -> CInt #

(*) :: CInt -> CInt -> CInt #

negate :: CInt -> CInt #

abs :: CInt -> CInt #

signum :: CInt -> CInt #

fromInteger :: Integer -> CInt #

Ord CInt 
Instance details

Defined in Foreign.C.Types

Methods

compare :: CInt -> CInt -> Ordering #

(<) :: CInt -> CInt -> Bool #

(<=) :: CInt -> CInt -> Bool #

(>) :: CInt -> CInt -> Bool #

(>=) :: CInt -> CInt -> Bool #

max :: CInt -> CInt -> CInt #

min :: CInt -> CInt -> CInt #

Read CInt 
Instance details

Defined in Foreign.C.Types

Real CInt 
Instance details

Defined in Foreign.C.Types

Methods

toRational :: CInt -> Rational #

Show CInt 
Instance details

Defined in Foreign.C.Types

Methods

showsPrec :: Int -> CInt -> ShowS #

show :: CInt -> String #

showList :: [CInt] -> ShowS #

Storable CInt 
Instance details

Defined in Foreign.C.Types

Methods

sizeOf :: CInt -> Int #

alignment :: CInt -> Int #

peekElemOff :: Ptr CInt -> Int -> IO CInt #

pokeElemOff :: Ptr CInt -> Int -> CInt -> IO () #

peekByteOff :: Ptr b -> Int -> IO CInt #

pokeByteOff :: Ptr b -> Int -> CInt -> IO () #

peek :: Ptr CInt -> IO CInt #

poke :: Ptr CInt -> CInt -> IO () #

Bits CInt 
Instance details

Defined in Foreign.C.Types

FiniteBits CInt 
Instance details

Defined in Foreign.C.Types

Subtractive CInt 
Instance details

Defined in Basement.Numerical.Subtractive

Associated Types

type Difference CInt #

Methods

(-) :: CInt -> CInt -> Difference CInt #

Prim CInt 
Instance details

Defined in Data.Primitive.Types

Wrapped CInt 
Instance details

Defined in Control.Lens.Wrapped

Associated Types

type Unwrapped CInt #

Rewrapped CInt t 
Instance details

Defined in Control.Lens.Wrapped

type Difference CInt 
Instance details

Defined in Basement.Numerical.Subtractive

type Unwrapped CInt 
Instance details

Defined in Control.Lens.Wrapped

newtype CUInt #

Haskell type representing the C unsigned int type.

Constructors

CUInt Word32 

Instances

Instances details
Bounded CUInt 
Instance details

Defined in Foreign.C.Types

Enum CUInt 
Instance details

Defined in Foreign.C.Types

Eq CUInt 
Instance details

Defined in Foreign.C.Types

Methods

(==) :: CUInt -> CUInt -> Bool #

(/=) :: CUInt -> CUInt -> Bool #

Integral CUInt 
Instance details

Defined in Foreign.C.Types

Num CUInt 
Instance details

Defined in Foreign.C.Types

Ord CUInt 
Instance details

Defined in Foreign.C.Types

Methods

compare :: CUInt -> CUInt -> Ordering #

(<) :: CUInt -> CUInt -> Bool #

(<=) :: CUInt -> CUInt -> Bool #

(>) :: CUInt -> CUInt -> Bool #

(>=) :: CUInt -> CUInt -> Bool #

max :: CUInt -> CUInt -> CUInt #

min :: CUInt -> CUInt -> CUInt #

Read CUInt 
Instance details

Defined in Foreign.C.Types

Real CUInt 
Instance details

Defined in Foreign.C.Types

Methods

toRational :: CUInt -> Rational #

Show CUInt 
Instance details

Defined in Foreign.C.Types

Methods

showsPrec :: Int -> CUInt -> ShowS #

show :: CUInt -> String #

showList :: [CUInt] -> ShowS #

Storable CUInt 
Instance details

Defined in Foreign.C.Types

Methods

sizeOf :: CUInt -> Int #

alignment :: CUInt -> Int #

peekElemOff :: Ptr CUInt -> Int -> IO CUInt #

pokeElemOff :: Ptr CUInt -> Int -> CUInt -> IO () #

peekByteOff :: Ptr b -> Int -> IO CUInt #

pokeByteOff :: Ptr b -> Int -> CUInt -> IO () #

peek :: Ptr CUInt -> IO CUInt #

poke :: Ptr CUInt -> CUInt -> IO () #

Bits CUInt 
Instance details

Defined in Foreign.C.Types

FiniteBits CUInt 
Instance details

Defined in Foreign.C.Types

Subtractive CUInt 
Instance details

Defined in Basement.Numerical.Subtractive

Associated Types

type Difference CUInt #

Methods

(-) :: CUInt -> CUInt -> Difference CUInt #

Prim CUInt 
Instance details

Defined in Data.Primitive.Types

Wrapped CUInt 
Instance details

Defined in Control.Lens.Wrapped

Associated Types

type Unwrapped CUInt #

Rewrapped CUInt t 
Instance details

Defined in Control.Lens.Wrapped

type Difference CUInt 
Instance details

Defined in Basement.Numerical.Subtractive

type Unwrapped CUInt 
Instance details

Defined in Control.Lens.Wrapped