{-# LANGUAGE CPP, TemplateHaskell, ScopedTypeVariables #-}
{-# LANGUAGE OverloadedStrings, BangPatterns #-}

-----------------------------------------------------------------------------
-- |
-- Module      :  NgxExport.Tools.Aggregate
-- Copyright   :  (c) Alexey Radkov 2019-2021
-- License     :  BSD-style
--
-- Maintainer  :  alexey.radkov@gmail.com
-- Stability   :  stable
-- Portability :  non-portable (requires Template Haskell)
--
-- An aggregate service from the more extra tools collection for
-- <http://github.com/lyokha/nginx-haskell-module nginx-haskell-module>.
--
-----------------------------------------------------------------------------


module NgxExport.Tools.Aggregate (
    -- * The typed service exporter
    -- $aggregateServiceExporter
#ifdef SNAP_AGGREGATE_SERVER
                                  AggregateServerConf,
#endif
                                  ngxExportAggregateService
    -- * Nginx-based aggregate service
    -- $nginxBasedAggregateService

    -- * The worker-side reporter
                                 ,reportAggregate
    -- * Re-exported data constructors from /Foreign.C/
    -- | Re-exports are needed by the exporter for marshalling in foreign calls.
                                 ,Foreign.C.Types.CInt (..)
                                 ,Foreign.C.Types.CUInt (..)
                                 ) where

import           NgxExport
import           NgxExport.Tools

import           Language.Haskell.TH
import           Network.HTTP.Client
import           Foreign.C.Types
import qualified Data.ByteString as B
import qualified Data.ByteString.Char8 as C8
import           Data.ByteString (ByteString)
import qualified Data.ByteString.Lazy as L
import           Data.Map.Strict (Map)
import qualified Data.Map.Strict as M
import           Data.IORef
import           Data.Int
#if MIN_VERSION_time(1,9,1)
import           Data.Fixed
#endif
import           Data.Time.Clock
import           Data.Time.Calendar
import           Data.Aeson
import           Data.Maybe
import           Control.Monad
import           Control.Exception
import           System.IO.Unsafe
import           Safe

#ifdef SNAP_AGGREGATE_SERVER
import qualified Data.Text as T
import qualified Data.Text.Encoding as T
import           Control.Monad.IO.Class
import           Control.Exception.Enclosed (handleAny)
import           Snap.Http.Server
import           Snap.Core
#endif


-- | Collected state of an aggregate service: the time of the latest purge
-- paired with a map keyed by worker PID. Each map value holds the time of the
-- last report from that worker and the data it reported, if any.
type AggregateValue a = (UTCTime, Map Int32 (UTCTime, Maybe a))
-- | Mutable storage cell holding an 'AggregateValue'.
type Aggregate a = IORef (AggregateValue a)

-- | A single report from a worker process: the worker PID paired with the
-- optional reported data. 'updateAggregate' requires the outer value to be
-- 'Just'.
type ReportValue a = Maybe (Int32, Maybe a)

-- $aggregateServiceExporter
--
-- An aggregate service collects custom typed data reported by worker processes
-- and sends this via HTTP when requested. This is an 'ignitionService' in terms
-- of module "NgxExport.Tools", which means that it starts upon the startup of
-- the worker process and runs until termination of the worker. Internally, an
-- aggregate service starts an HTTP server implemented via the [Snap
-- framework](http://snapframework.com/), which serves incoming requests from
-- worker processes (collecting data) as well as from the Nginx server's
-- clients (reporting collected data for administration purpose).
--
-- Below is a simple example.
--
-- ==== File /test_tools_extra_aggregate.hs/
-- @
-- {-\# LANGUAGE TemplateHaskell, DeriveGeneric, TypeApplications \#-}
-- {-\# LANGUAGE OverloadedStrings, BangPatterns \#-}
--
-- module TestToolsExtraAggregate where
--
-- import           NgxExport
-- import           NgxExport.Tools
-- import           NgxExport.Tools.Aggregate
--
-- import           Data.ByteString (ByteString)
-- import qualified Data.ByteString.Lazy.Char8 as C8L
-- import           Data.Aeson
-- import           Data.Maybe
-- import           Data.IORef
-- import           System.IO.Unsafe
-- import           GHC.Generics
--
-- data Stats = Stats { bytesSent :: Int
--                    , requests :: Int
--                    , meanBytesSent :: Int
--                    } deriving Generic
-- instance FromJSON Stats
-- instance ToJSON Stats
--
-- stats :: IORef Stats
-- stats = unsafePerformIO $ newIORef $ Stats 0 0 0
-- {-\# NOINLINE stats \#-}
--
-- updateStats :: ByteString -> IO C8L.ByteString
-- __/updateStats/__ s = do
--     let cbs = 'readFromByteString' \@Int s
--     modifyIORef\' stats $ \\(Stats bs rs _) ->
--         let !nbs = bs + fromMaybe 0 cbs
--             !nrs = rs + 1
--             !nmbs = nbs \`div\` nrs
--         in Stats nbs nrs nmbs
--     return \"\"
-- 'NgxExport.ngxExportIOYY' \'updateStats
--
-- reportStats :: Int -> Bool -> IO C8L.ByteString
-- __/reportStats/__ = 'deferredService' $ \\port -> do
--     s <- readIORef stats
--     'reportAggregate' port (Just s) \"__/stats/__\"
--     return \"\"
-- 'ngxExportSimpleServiceTyped' \'reportStats \'\'Int $
--     'PersistentService' $ Just $ Sec 5
--
-- 'ngxExportAggregateService' \"__/stats/__\" \'\'Stats
-- @
--
-- Here, on the bottom line, aggregate service /stats/ is declared. It expects
-- from worker processes reports in JSON format with data of type /Stats/ which
-- includes the number of bytes sent so far, the number of client requests, and
-- the mean value of bytes sent per a single request. Its own configuration
-- (a TCP port and the /purge interval/) shall be defined in the Nginx
-- configuration file. The reports from worker processes are sent from a
-- 'deferredService' /reportStats/ every 5 seconds: it merely reads data
-- collected in a global IORef /stats/ and then sends this to the aggregate
-- service using 'reportAggregate'. Handler /updateStats/ updates the /stats/
-- on every run. It accepts a /ByteString/ from Nginx, then converts it to an
-- /Int/ value and interprets this as the number of bytes sent in the current
-- request. It also increments the number of requests and calculates the mean
-- value of bytes sent in all requests to this worker so far. Notice that all
-- the parts of /stats/ are evaluated /strictly/, it is important!
--
-- ==== File /nginx.conf/
-- @
-- user                    nobody;
-- worker_processes        2;
--
-- events {
--     worker_connections  1024;
-- }
--
-- http {
--     default_type        application\/octet-stream;
--     sendfile            on;
--
--     haskell load \/var\/lib\/nginx\/test_tools_extra_aggregate.so;
--
--     haskell_run_service __/simpleService_aggregate_stats/__ $hs_stats
--             \'__/AggregateServerConf/__ { __/asPort/__ = 8100, __/asPurgeInterval/__ = Min 5 }\';
--
--     haskell_service_var_in_shm stats 32k \/tmp $hs_stats;
--
--     haskell_run_service __/simpleService_reportStats/__ $hs_reportStats 8100;
--
--     server {
--         listen       8010;
--         server_name  main;
--         error_log    \/tmp\/nginx-test-haskell-error.log;
--         access_log   \/tmp\/nginx-test-haskell-access.log;
--
--         haskell_run __/updateStats/__ !$hs_updateStats $bytes_sent;
--
--         location \/ {
--             echo Ok;
--         }
--     }
--
--     server {
--         listen       8020;
--         server_name  stat;
--
--         location \/ {
--             allow 127.0.0.1;
--             deny all;
--             proxy_pass http:\/\/127.0.0.1:8100\/get\/__/stats/__;
--         }
--     }
-- }
-- @
--
-- The aggregate service /stats/ must be referred from the Nginx configuration
-- file with prefix __/simpleService_aggregate_/__. Its configuration is typed,
-- the type is 'AggregateServerConf'. Though its only constructor
-- /AggregateServerConf/ is not exported from this module, the service is still
-- configurable from an Nginx configuration. Here, the aggregate service listens
-- on TCP port /8100/, and its /purge interval/ is 5 minutes. Notice that an
-- aggregate service must be /shared/ (here, variable /$hs_stats/ is declared as
-- shared with Nginx directive /haskell_service_var_in_shm/), otherwise it won't
-- even start because the internal HTTP servers on each worker process won't be
-- able to bind to the same TCP port. Inside the upper /server/ clause, handler
-- /updateStats/ runs on every client request. This handler always returns an
-- empty string in variable /$hs_updateStats/ because it is only needed for the
-- side effect of updating the /stats/. However, since Nginx variable
-- handlers are /lazy/, evaluation of /$hs_updateStats/ must be forced somehow.
-- To achieve this, we used the /strict annotation/ (the /bang/ symbol) in
-- directive /haskell_run/ that enforces strict evaluation in a late request
-- processing phase, when the value of variable /$bytes_sent/ has been already
-- calculated.
--
-- Data collected by the aggregate service can be obtained in a request to the
-- virtual server listening on TCP port /8020/. It simply proxies requests to
-- the internal aggregate server with URL /\/get\/__stats__/ where __/stats/__
-- corresponds to the /name/ of the aggregate service.
--
-- ==== A simple test
-- Since /reportStats/ is a deferred service, we won't get useful data during
-- the first 5 seconds after Nginx starts.
--
-- > $ curl -s 'http://127.0.0.1:8020/' | jq
-- > [
-- >   "1858-11-17T00:00:00Z",
-- >   {}
-- > ]
--
-- However, later we should get some useful data.
--
-- > $ curl -s 'http://127.0.0.1:8020/' | jq
-- > [
-- >   "2021-12-08T09:56:18.118132083Z",
-- >   {
-- >     "21651": [
-- >       "2021-12-08T09:56:18.12155413Z",
-- >       {
-- >         "meanBytesSent": 0,
-- >         "requests": 0,
-- >         "bytesSent": 0
-- >       }
-- >     ],
-- >     "21652": [
-- >       "2021-12-08T09:56:18.118132083Z",
-- >       {
-- >         "meanBytesSent": 0,
-- >         "requests": 0,
-- >         "bytesSent": 0
-- >       }
-- >     ]
-- >   }
-- > ]
--
-- Here we have collected stats from the two Nginx worker processes with /PIDs/
-- /21651/ and /21652/. The timestamps show when the stats were last updated.
-- The topmost timestamp shows the time of the latest /purge/ event. The data
-- itself contains only zeros because we have made no requests to the main
-- server so far. Let's run 100 simultaneous requests and look at the stats (it
-- should update at worst in 5 seconds after running them).
--
-- > $ for i in {1..100} ; do curl 'http://127.0.0.1:8010/' & done
--
-- Wait 5 seconds...
--
-- > $ curl -s 'http://127.0.0.1:8020/' | jq
-- > [
-- >   "2021-12-08T09:56:18.118132083Z",
-- >   {
-- >     "21651": [
-- >       "2021-12-08T09:56:48.159263993Z",
-- >       {
-- >         "meanBytesSent": 183,
-- >         "requests": 84,
-- >         "bytesSent": 15372
-- >       }
-- >     ],
-- >     "21652": [
-- >       "2021-12-08T09:56:48.136934713Z",
-- >       {
-- >         "meanBytesSent": 183,
-- >         "requests": 16,
-- >         "bytesSent": 2928
-- >       }
-- >     ]
-- >   }
-- > ]

-- | Throws an 'IOError' built by 'userError' from the given message.
throwUserError :: String -> IO a
throwUserError = ioError . userError

#if MIN_VERSION_time(1,9,1)
-- | Converts a whole number into a 'Fixed' value with that integer part by
-- scaling it with the resolution of the target type.
--
-- The explicit @forall@ brings @a@ into scope (via /ScopedTypeVariables/) so
-- that the proxy argument of 'resolution' can be annotated with it; the
-- @undefined@ value only carries the type.
asIntegerPart :: forall a. HasResolution a => Integer -> Fixed a
asIntegerPart = MkFixed . (resolution (undefined :: Fixed a) *)
{-# SPECIALIZE INLINE asIntegerPart :: Integer -> Pico #-}
#endif

-- | Converts a 'TimeInterval' into a 'NominalDiffTime' with whole-second
-- precision (the interval is first reduced to seconds with 'toSec').
--
-- With /time/ 1.9.1 or newer the conversion goes through
-- 'secondsToNominalDiffTime'; older versions go through a 'Rational'.
toNominalDiffTime :: TimeInterval -> NominalDiffTime
toNominalDiffTime =
#if MIN_VERSION_time(1,9,1)
    secondsToNominalDiffTime . asIntegerPart
#else
    fromRational . toRational . secondsToDiffTime
#endif
    . fromIntegral . toSec

-- | Merges a single worker report into the aggregate storage.
--
-- The report must be @Just (pid, value)@. An empty report is rejected with a
-- user error /before/ the storage is touched, so the failure is raised here
-- and not from a lazily evaluated value inside the atomic update.
--
-- The entry for @pid@ is stamped with the current time. When the reported
-- value is 'Nothing' and the worker already has an entry, the previously
-- stored value is kept; otherwise the reported value replaces it.
--
-- If at least @int@ has passed since the last purge, the purge timestamp is
-- advanced to the current time and all entries whose timestamps are not newer
-- than @int@ are removed.
updateAggregate :: Aggregate a -> ReportValue a -> NominalDiffTime -> IO ()
updateAggregate _ Nothing _ =
    throwUserError "Empty aggregate report!"
updateAggregate a (Just (pid, v)) int = do
    !t <- getCurrentTime
    atomicModifyIORef' a $ \(t', v') ->
        let purgeDue = diffUTCTime t t' >= int
            !tn = if purgeDue then t else t'
            -- drop entries of workers that have not reported within int
            purge = if purgeDue
                        then M.filter $ \(t'', _) -> diffUTCTime t t'' < int
                        else id
            -- an absent value in the report must not erase known data
            merge old =
                let !new' = case (old, v) of
                                (Just (_, prev), Nothing) -> prev
                                _ -> v
                in Just (t, new')
            !vn = purge $ M.alter merge pid v'
        in ((tn, vn), ())

-- | Decodes a JSON-encoded report and merges it into the aggregate storage.
--
-- The second argument is the report body. The third argument is the textual
-- representation of the purge interval ('TimeInterval'); if it cannot be
-- read, @Min 5@ is used.
--
-- Throws a user error if the report cannot be decoded. Returns @\"done\"@ on
-- success.
receiveAggregate :: FromJSON a =>
    Aggregate a -> L.ByteString -> ByteString -> IO L.ByteString
receiveAggregate a v sint = do
    let !int = toNominalDiffTime $ readDef (Min 5) $ C8.unpack sint
    case decode' v of
        Nothing -> throwUserError "Unreadable aggregate!"
        Just s -> updateAggregate a s int
    return "done"

-- | Content handler that reports the current contents of the aggregate
-- storage encoded as JSON with HTTP status /200/. The 'ByteString' argument
-- is ignored.
--
-- NOTE(review): the payload is JSON but the content type is @text/plain@,
-- whereas the Snap-based handler uses @application/json@. Confirm whether
-- clients rely on this before unifying the two.
sendAggregate :: ToJSON a =>
    Aggregate a -> ByteString -> IO ContentHandlerResult
sendAggregate a _ = do
    s <- readIORef a
    return (encode s, "text/plain", 200, [])


#ifdef SNAP_AGGREGATE_SERVER

-- | Configuration of an aggregate service.
--
-- This type is exported because Template Haskell requires that. Though its
-- only constructor /AggregateServerConf/ is not exported, it is still reachable
-- from Nginx configuration files. Below is the definition of the constructor.
--
-- @
--     AggregateServerConf { asPort          :: Int
--                         , asPurgeInterval :: 'TimeInterval'
--                         }
-- @
--
-- The value of /asPort/ corresponds to the TCP port of the internal aggregate
-- server (the IP address of the internal server is always /127.0.0.1/). The
-- /asPurgeInterval/ is the /purge/ interval. An aggregate service should
-- sometimes purge data from worker processes which have not reported for a
-- long time. For example, it makes no sense to keep data from workers that
-- have already been terminated. The inactive PIDs get checked every
-- /asPurgeInterval/, and data which correspond to PIDs with timestamps older
-- than /asPurgeInterval/ get removed.
--
-- Be aware that due to limitations of Template Haskell, this name must be
-- imported unqualified!
data AggregateServerConf =
    AggregateServerConf { asPort          :: Int
                          -- ^ TCP port of the internal aggregate server
                        , asPurgeInterval :: TimeInterval
                          -- ^ Interval between purges of stale worker data
                        } deriving Read

-- | The aggregate service itself: an 'ignitionService' that runs the internal
-- Snap HTTP server on the configured port.
--
-- The first argument is the aggregate storage, the second is the name of the
-- service as it appears in the URLs served by 'asHandler'. The purge interval
-- is taken from the configuration. Returns an empty string once the HTTP
-- server returns.
aggregateServer :: (FromJSON a, ToJSON a) =>
    Aggregate a -> ByteString -> AggregateServerConf -> Bool -> IO L.ByteString
aggregateServer a u = ignitionService $ \conf -> do
    let !int = toNominalDiffTime $ asPurgeInterval conf
    simpleHttpServe (asConfig $ asPort conf) $ asHandler a u int
    return ""

-- | Configuration of the internal Snap server: listen on the given port of
-- address /127.0.0.1/, with access and error logs disabled and verbose mode
-- switched off.
asConfig :: Int -> Config Snap a
asConfig p = setPort p
           $ setBind "127.0.0.1"
           $ setAccessLog ConfigNoLog
           $ setErrorLog ConfigNoLog
           $ setVerbose False mempty

-- | Routes of the internal Snap server for the service named @u@:
--
-- * @POST put\/\<u\>@ receives a report from a worker process,
-- * @GET get\/\<u\>@ sends the collected data.
asHandler :: (FromJSON a, ToJSON a) =>
    Aggregate a -> ByteString -> NominalDiffTime -> Snap ()
asHandler a u int =
    route [(B.append "put/" u
           ,Snap.Core.method POST $ receiveAggregateSnap a int
           )
          ,(B.append "get/" u
           ,Snap.Core.method GET $ sendAggregateSnap a
           )
          ]

-- | Snap handler that reads a JSON-encoded report from the request body (at
-- most 65536 bytes), merges it into the aggregate storage, and finishes with
-- an empty response.
--
-- A body that cannot be decoded raises a user error; exceptions are turned
-- into error responses by 'handleAggregateExceptions'.
receiveAggregateSnap :: FromJSON a => Aggregate a -> NominalDiffTime -> Snap ()
receiveAggregateSnap a int =
    handleAggregateExceptions "Exception while receiving aggregate" $ do
        !s <- decode' <$> readRequestBody 65536
        liftIO $ maybe (throwUserError "Unreadable aggregate!")
                       (\r -> updateAggregate a r int) s
        finishWith emptyResponse

-- | Snap handler that writes the current contents of the aggregate storage
-- encoded as JSON, with content type @application/json@.
--
-- Exceptions are turned into error responses by 'handleAggregateExceptions'.
sendAggregateSnap :: ToJSON a => Aggregate a -> Snap ()
sendAggregateSnap a =
    handleAggregateExceptions "Exception while sending aggregate" $ do
        s <- liftIO $ readIORef a
        modifyResponse $ setContentType "application/json"
        writeLBS $ encode s

-- | Runs a Snap handler and converts any exception caught by 'handleAny' into
-- a response with status /500/. The first argument is used as the status
-- reason; the shown exception is written to the response body.
handleAggregateExceptions :: String -> Snap () -> Snap ()
handleAggregateExceptions cmsg = handleAny $ \e ->
    writeErrorResponse 500 $ show (e :: SomeException)
    where writeErrorResponse c msg = do
              modifyResponse $ setResponseStatus c $ T.encodeUtf8 $ T.pack cmsg
              writeBS $ T.encodeUtf8 $ T.pack msg

#endif


-- | Exports a simple aggregate service with specified name and the aggregate
--   type.
--
-- The name of the service can be chosen arbitrarily, however it must be
-- exactly referred from 'reportAggregate' and client requests to the service
-- because the URL of the internal HTTP server contains this.
--
-- The aggregate type must have instances of 'FromJSON' and 'ToJSON' as its
-- objects will be transferred via HTTP in JSON format.
--
-- The service is implemented via 'ngxExportSimpleServiceTyped' with
-- 'AggregateServerConf' as the name of its custom type. This is an
-- 'ignitionService' with an HTTP server based on the [Snap
-- framework](http://snapframework.com/) running inside. The internal HTTP
-- server collects data from worker processes on URL
-- /\/put\/__\<name_of_the_service\>__/ and reports data on URL
-- /\/get\/__\<name_of_the_service\>__/. This part is only generated when the
-- module is built with CPP macro /SNAP_AGGREGATE_SERVER/ defined.
--
-- Regardless of the macro, the exporter also generates two handlers which
-- share the same storage: /receiveAggregate_\<name_of_the_service\>/
-- (exported with 'ngxExportAsyncOnReqBody') and
-- /sendAggregate_\<name_of_the_service\>/ (exported with
-- 'ngxExportAsyncHandler').
ngxExportAggregateService :: String       -- ^ Name of the service
                          -> Name         -- ^ Name of the aggregate type
                          -> Q [Dec]
ngxExportAggregateService f a = do
    let sName = mkName $ "aggregate_storage_" ++ f
        storage = varE sName
        -- declaration of a binding without arguments: name = body
        nullaryD name body = funD name [clause [] (normalB body) []]
#ifdef SNAP_AGGREGATE_SERVER
        nameF = 'aggregateServer
        fName = mkName $ "aggregate_" ++ f
        uName = mkName $ "aggregate_url_" ++ f
#endif
        nameRecv = 'receiveAggregate
        recvName = mkName $ "receiveAggregate_" ++ f
        nameSend = 'sendAggregate
        sendName = mkName $ "sendAggregate_" ++ f
    concat <$> sequence
        [sequence
            -- the storage is a top-level IORef created via unsafePerformIO,
            -- hence the accompanying NOINLINE pragma generated below
            [sigD sName [t|Aggregate $(conT a)|]
            ,nullaryD sName
                [|unsafePerformIO $
                    newIORef (UTCTime (ModifiedJulianDay 0) 0
                             ,M.empty
                             )
                |]
            ,pragInlD sName NoInline FunLike AllPhases
#ifdef SNAP_AGGREGATE_SERVER
            ,sigD uName [t|ByteString|]
            ,nullaryD uName [|C8.pack f|]
            ,sigD fName [t|AggregateServerConf -> Bool -> IO L.ByteString|]
            ,nullaryD fName [|$(varE nameF) $(storage) $(varE uName)|]
#endif
            ,sigD recvName [t|L.ByteString -> ByteString -> IO L.ByteString|]
            ,nullaryD recvName [|$(varE nameRecv) $(storage)|]
            ,sigD sendName [t|ByteString -> IO ContentHandlerResult|]
            ,nullaryD sendName [|$(varE nameSend) $(storage)|]
            ]
#ifdef SNAP_AGGREGATE_SERVER
        -- FIXME: name AggregateServerConf must be imported from the user's
        -- module unqualified (see details in NgxExport/Tools.hs, function
        -- ngxExportSimpleService')!
        ,ngxExportSimpleServiceTyped
            fName ''AggregateServerConf SingleShotService
#endif
        ,ngxExportAsyncOnReqBody recvName
        ,ngxExportAsyncHandler sendName
        ]

-- $nginxBasedAggregateService
--
-- Service /simpleService_aggregate_stats/ was implemented using
-- /Snap framework/. Basically, a native Nginx implementation is not easy
-- because the service must listen on a single (not duplicated) file descriptor
-- which is not the case when Nginx spawns more than one worker process.
-- Running /simpleService_aggregate_stats/ as a shared service is an elegant
-- solution as shared services guarantee that they occupy only one worker at a
-- time. However, /nginx-haskell-module/ provides directive /single_listener/
-- which can be used to apply the required restriction in a custom Nginx virtual
-- server. This directive requires that the virtual server listens with option
-- /reuseport/ and is only available on Linux with socket option
-- /SO_ATTACH_REUSEPORT_CBPF/.
--
-- Exporter 'ngxExportAggregateService' exports additional handlers to build a
-- native Nginx-based aggregate service. Let's replace service
-- /simpleService_aggregate_stats/ from the previous example with such a native
-- Nginx-based aggregate service using /single_listener/ and listening on port
-- /8100/.
--
-- ==== File /nginx.conf/
-- @
-- user                    nobody;
-- worker_processes        2;
--
-- events {
--     worker_connections  1024;
-- }
--
-- http {
--     default_type        application\/octet-stream;
--     sendfile            on;
--
--     haskell load \/var\/lib\/nginx\/test_tools_extra_aggregate.so;
--
--     haskell_run_service simpleService_reportStats $hs_reportStats 8100;
--
--     haskell_var_empty_on_error $hs_stats;
--
--     server {
--         listen       8010;
--         server_name  main;
--         error_log    \/tmp\/nginx-test-haskell-error.log;
--         access_log   \/tmp\/nginx-test-haskell-access.log;
--
--         haskell_run updateStats !$hs_updateStats $bytes_sent;
--
--         location \/ {
--             echo Ok;
--         }
--     }
--
--     server {
--         listen       8020;
--         server_name  stat;
--
--         location \/ {
--             allow 127.0.0.1;
--             deny all;
--             proxy_pass http:\/\/127.0.0.1:8100\/get\/stats;
--         }
--     }
--
--     server {
--         listen          8100 __/reuseport/__;
--         server_name     stats;
--
--         __/single_listener on/__;
--
--         location __/\/put\/stats/__ {
--             haskell_run_async_on_request_body __/receiveAggregate_stats/__
--                     $hs_stats \"Min 1\";
--
--             if ($hs_stats = \'\') {
--                 return 400;
--             }
--
--             return 200;
--         }
--
--         location __/\/get\/stats/__ {
--             haskell_async_content __/sendAggregate_stats/__ noarg;
--         }
--     }
-- }
-- @
--
-- Handler /receiveAggregate_stats/ accepts a time interval corresponding to the
-- value of /asPurgeInterval/ from service /simpleService_aggregate_stats/. If
-- the value is not readable (say, /noarg/) then it is defaulted to /Min 5/.
--
-- Notice that the stats server must listen on IP address /127.0.0.1/ because
-- 'reportAggregate' (being the base of service /simpleService_reportStats/)
-- reports stats to this address.

-- | Reports data to an aggregate service.
--
-- The data is sent as a JSON-encoded pair /(PID of the worker, data)/ in the
-- body of a /POST/ request to /http:\/\/127.0.0.1:\<port\>\/put\/\<name\>/.
--
-- If reported data is 'Nothing' then data collected on the aggregate service
-- is left unchanged except that the timestamp associated with the PID of the
-- sending worker process will be updated.
--
-- Reporting is best-effort: any exception raised while building or sending
-- the request is silently discarded.
reportAggregate :: ToJSON a => Int          -- ^ Port of the aggregate server
                            -> Maybe a      -- ^ Reported data
                            -> ByteString   -- ^ Name of the aggregate service
                            -> IO ()
reportAggregate p v u =
    -- deliberately swallow every exception: a failed report must not break
    -- the caller
    handle (const $ return () :: SomeException -> IO ()) $ do
        req <- parseRequest "POST http://127.0.0.1"
        pid <- fromIntegral <$> ngxPid :: IO Int32
        let !req' = req { requestBody = RequestBodyLBS $ encode (pid, v)
                        , port = p
                        , Network.HTTP.Client.path = B.append "put/" u
                        }
        -- the response carries no payload of interest
        void $ httpNoBody req' httpManager

-- Connection manager with default settings used by 'reportAggregate'.
--
-- It is created lazily via 'unsafePerformIO' as a top-level value; the
-- NOINLINE pragma keeps GHC from inlining the definition, which could
-- otherwise duplicate the 'newManager' call at use sites.
httpManager :: Manager
httpManager = unsafePerformIO $ newManager defaultManagerSettings
{-# NOINLINE httpManager #-}