{-# LANGUAGE TemplateHaskell, OverloadedStrings, BangPatterns #-}

-----------------------------------------------------------------------------
-- |
-- Module      :  NgxExport.Tools.Aggregate
-- Copyright   :  (c) Alexey Radkov 2019-2020
-- License     :  BSD-style
--
-- Maintainer  :  alexey.radkov@gmail.com
-- Stability   :  experimental
-- Portability :  non-portable (requires Template Haskell)
--
-- An aggregate service from the more extra tools collection for
-- <http://github.com/lyokha/nginx-haskell-module nginx-haskell-module>.
--
-----------------------------------------------------------------------------


module NgxExport.Tools.Aggregate (
    -- * The typed service exporter
    -- $aggregateServiceExporter
                                  AggregateServerConf
                                 ,ngxExportAggregateService
    -- * The worker-side reporter
                                 ,reportAggregate
    -- * Re-exported data constructors from /Foreign.C/
    -- | Re-exports are needed by the exporter for marshalling in foreign calls.
                                 ,Foreign.C.Types.CInt (..)
                                 ,Foreign.C.Types.CUInt (..)
                                 ) where

import           NgxExport.Tools

import           Language.Haskell.TH
import           Network.HTTP.Client
import           Foreign.C.Types
import qualified Data.ByteString as B
import qualified Data.ByteString.Char8 as C8
import           Data.ByteString (ByteString)
import qualified Data.ByteString.Lazy as L
import           Data.Map.Strict (Map)
import qualified Data.Map.Strict as M
import           Data.IORef
import           Data.Int
import qualified Data.Text as T
import qualified Data.Text.Encoding as T
import           Data.Time.Clock.POSIX
import           Data.Aeson
import           Data.Maybe
import           Control.Monad
import           Control.Monad.IO.Class
import           Control.Arrow
import           Control.Exception
import           Control.Exception.Enclosed (handleAny)
import           System.IO.Unsafe
import           Snap.Http.Server
import           Snap.Core

type Aggregate a = IORef (CTime, Map Int32 (CTime, Maybe a))

-- $aggregateServiceExporter
--
-- An aggregate service collects custom typed data reported by worker processes
-- and sends this via HTTP when requested. This is an 'ignitionService' in terms
-- of module "NgxExport.Tools", which means that it starts upon the startup of
-- the worker process and runs until termination of the worker. Internally, an
-- aggregate service starts an HTTP server implemented via the [Snap
-- framework](http://snapframework.com/), which serves incoming requests from
-- worker processes (collecting data) as well as from the Nginx server's
-- clients (reporting collected data for administration purpose).
--
-- Below is a simple example.
--
-- ==== File /test_tools_extra_aggregate.hs/
-- @
-- {-\# LANGUAGE TemplateHaskell, DeriveGeneric, TypeApplications \#-}
-- {-\# LANGUAGE OverloadedStrings, BangPatterns \#-}
--
-- module TestToolsExtraAggregate where
--
-- import           NgxExport
-- import           NgxExport.Tools
-- import           NgxExport.Tools.Aggregate
--
-- import           Data.ByteString (ByteString)
-- import qualified Data.ByteString.Lazy.Char8 as C8L
-- import           Data.Aeson
-- import           Data.Maybe
-- import           Data.IORef
-- import           System.IO.Unsafe
-- import           GHC.Generics
--
-- data Stats = Stats { bytesSent :: Int
--                    , requests :: Int
--                    , meanBytesSent :: Int
--                    } deriving Generic
-- instance FromJSON Stats
-- instance ToJSON Stats
--
-- stats :: IORef Stats
-- stats = unsafePerformIO $ newIORef $ Stats 0 0 0
-- {-\# NOINLINE stats \#-}
--
-- updateStats :: ByteString -> IO C8L.ByteString
-- __/updateStats/__ s = do
--     let cbs = 'readFromByteString' \@Int s
--     modifyIORef\' stats $ \\(Stats bs rs _) ->
--         let !nbs = bs + fromMaybe 0 cbs
--             !nrs = rs + 1
--             !nmbs = nbs \`div\` nrs
--         in Stats nbs nrs nmbs
--     return \"\"
-- 'NgxExport.ngxExportIOYY' \'updateStats
--
-- reportStats :: Int -> Bool -> IO C8L.ByteString
-- __/reportStats/__ = 'deferredService' $ \\port -> do
--     s <- readIORef stats
--     'reportAggregate' port (Just s) \"__/stats/__\"
--     return \"\"
-- 'ngxExportSimpleServiceTyped' \'reportStats \'\'Int $
--     'PersistentService' $ Just $ Sec 5
--
-- 'ngxExportAggregateService' \"__/stats/__\" \'\'Stats
-- @
--
-- Here, on the bottom line, aggregate service /stats/ is declared. It expects
-- from worker processes reports in JSON format with data of type /Stats/ which
-- includes the number of bytes sent so far, the number of client requests, and
-- the mean value of bytes sent per a single request. Its own configuration
-- (a TCP port and the /purge interval/) shall be defined in the Nginx
-- configuration file. The reports from worker processes are sent from a
-- 'deferredService' /reportStats/ every 5 seconds: it merely reads data
-- collected in a global IORef /stats/ and then sends this to the aggregate
-- service using 'reportAggregate'. Handler /updateStats/ updates the /stats/
-- on every run. It accepts a /ByteString/ from Nginx, then converts it to an
-- /Int/ value and interprets this as the number of bytes sent in the current
-- request. It also increments the number or requests and calculates the mean
-- value of bytes sent in all requests to this worker so far. Notice that all
-- the parts of /stats/ are evaluated /strictly/, it is important!
--
-- ==== File /nginx.conf/
-- @
-- user                    nobody;
-- worker_processes        2;
--
-- events {
--     worker_connections  1024;
-- }
--
-- http {
--     default_type        application\/octet-stream;
--     sendfile            on;
--
--     haskell load \/var\/lib\/nginx\/test_tools_extra_aggregate.so;
--
--     haskell_run_service __/simpleService_aggregate_stats/__ $hs_stats
--             \'__/AggregateServerConf/__ { __/asPort/__ = 8100, __/asPurgeInterval/__ = Min 5 }\';
--
--     haskell_service_var_in_shm stats 32k \/tmp $hs_stats;
--
--     haskell_run_service __/simpleService_reportStats/__ $hs_reportStats 8100;
--
--     server {
--         listen       8010;
--         server_name  main;
--         error_log    \/tmp\/nginx-test-haskell-error.log;
--         access_log   \/tmp\/nginx-test-haskell-access.log;
--
--         haskell_run __/updateStats/__ !$hs_updateStats $bytes_sent;
--
--         location \/ {
--             echo Ok;
--         }
--     }
--
--     server {
--         listen       8020;
--         server_name  stat;
--
--         location \/ {
--             allow 127.0.0.1;
--             deny all;
--             proxy_pass http:\/\/127.0.0.1:8100\/get\/__/stats/__;
--         }
--     }
-- }
-- @
--
-- The aggregate service /stats/ must be referred from the Nginx configuration
-- file with prefix __/simpleService_aggregate_/__. Its configuration is typed,
-- the type is 'AggregateServerConf'. Though its only constructor
-- /AggregateServerConf/ is not exported from this module, the service is still
-- configurable from an Nginx configuration. Here, the aggregate service listens
-- on TCP port /8100/, and its /purge interval/ is 5 minutes. Notice that an
-- aggregate service must be /shared/ (here, variable /$hs_stats/ is declared as
-- shared with Nginx directive /haskell_service_var_in_shm/), otherwise it won't
-- even start because the internal HTTP servers on each worker process won't be
-- able to bind to the same TCP port. Inside the upper /server/ clause, handler
-- /updateStats/ runs on every client request. This handler always returns an
-- empty string in variable /$hs_updateStats/ because it is only needed for the
-- side effect of updating the /stats/. However, as soon as Nginx variable
-- handlers are /lazy/, evaluation of /$hs_updateStats/ must be forced somehow.
-- To achieve this, we used the /strict annotation/ (the /bang/ symbol) in
-- directive /haskell_run/ that enforces strict evaluation in a late request
-- processing phase, when the value of variable /$bytes_sent/ has been already
-- calculated.
--
-- Data collected by the aggregate service can be obtained in a request to the
-- virtual server listening on TCP port /8020/. It simply proxies requests to
-- the internal aggregate server with URL /\/get\/__stats__/ where __/stats/__
-- corresponds to the /name/ of the aggregate service.
--
-- ==== A simple test
-- As far as /reportStats/ is a deferred service, we won't get useful data in 5
-- seconds after Nginx start.
--
-- > $ curl 'http://127.0.0.1:8020/' | jq
-- > [
-- >   "1970-01-01T00:00:00Z",
-- >   {}
-- > ]
--
-- However, later we should get some useful data.
--
-- > $ curl 'http://127.0.0.1:8020/' | jq
-- > [
-- >   "2019-04-22T14:19:04Z",
-- >   {
-- >     "5910": [
-- >       "2019-04-22T14:19:19Z",
-- >       {
-- >         "bytesSent": 0,
-- >         "requests": 0,
-- >         "meanBytesSent": 0
-- >       }
-- >     ],
-- >     "5911": [
-- >       "2019-04-22T14:19:14Z",
-- >       {
-- >         "bytesSent": 0,
-- >         "requests": 0,
-- >         "meanBytesSent": 0
-- >       }
-- >     ]
-- >   }
-- > ]
--
-- Here we have collected stats from the two Nginx worker processes with /PIDs/
-- /5910/ and /5911/. The timestamps show when the stats was updated the last
-- time. The topmost timestamp shows the time of the latest /purge/ event. The
-- data itself have only zeros as soon we have made no request to the main
-- server so far. Let's run 100 simultaneous requests and look at the stats (it
-- should update at worst in 5 seconds after running them).
--
-- > $ for i in {1..100} ; do curl 'http://127.0.0.1:8010/' & done
--
-- Wait 5 seconds...
--
-- > $ curl 'http://127.0.0.1:8020/' | jq
-- > [
-- >   "2019-04-22T14:29:04Z",
-- >   {
-- >     "5910": [
-- >       "2019-04-22T14:31:34Z",
-- >       {
-- >         "bytesSent": 17751,
-- >         "requests": 97,
-- >         "meanBytesSent": 183
-- >       }
-- >     ],
-- >     "5911": [
-- >       "2019-04-22T14:31:31Z",
-- >       {
-- >         "bytesSent": 549,
-- >         "requests": 3,
-- >         "meanBytesSent": 183
-- >       }
-- >     ]
-- >   }
-- > ]

-- | Configuration of an aggregate service.
--
-- This type is exported because Template Haskell requires that. Though its
-- only constructor /AggregateServerConf/ is not exported, it is still reachable
-- from Nginx configuration files. Below is definition of the constructor.
--
-- @
--     AggregateServerConf { asPort          :: Int
--                         , asPurgeInterval :: 'TimeInterval'
--                         }
-- @
--
-- The value of /asPort/ corresponds to the TCP port of the internal aggregate
-- server. The /asPurgeInterval/ is the /purge/ interval. An aggregate service
-- should sometimes purge data from worker processes which did not report for a
-- long time. For example, it makes no sense to keep data from workers that
-- have already been terminated. The inactive PIDs get checked every
-- /asPurgeInterval/, and data which correspond to PIDs with timestamps older
-- than /asPurgeInterval/ get removed.
--
-- Be aware that due to limitations of Template Haskell, this name must be
-- imported unqualified!
data AggregateServerConf =
    AggregateServerConf { AggregateServerConf -> Int
asPort          :: Int
                        , AggregateServerConf -> TimeInterval
asPurgeInterval :: TimeInterval
                        } deriving ReadPrec [AggregateServerConf]
ReadPrec AggregateServerConf
Int -> ReadS AggregateServerConf
ReadS [AggregateServerConf]
(Int -> ReadS AggregateServerConf)
-> ReadS [AggregateServerConf]
-> ReadPrec AggregateServerConf
-> ReadPrec [AggregateServerConf]
-> Read AggregateServerConf
forall a.
(Int -> ReadS a)
-> ReadS [a] -> ReadPrec a -> ReadPrec [a] -> Read a
readListPrec :: ReadPrec [AggregateServerConf]
$creadListPrec :: ReadPrec [AggregateServerConf]
readPrec :: ReadPrec AggregateServerConf
$creadPrec :: ReadPrec AggregateServerConf
readList :: ReadS [AggregateServerConf]
$creadList :: ReadS [AggregateServerConf]
readsPrec :: Int -> ReadS AggregateServerConf
$creadsPrec :: Int -> ReadS AggregateServerConf
Read

aggregateServer :: (FromJSON a, ToJSON a) =>
    Aggregate a -> ByteString -> AggregateServerConf -> Bool -> IO L.ByteString
aggregateServer :: Aggregate a
-> ByteString -> AggregateServerConf -> Bool -> IO ByteString
aggregateServer a :: Aggregate a
a u :: ByteString
u = (AggregateServerConf -> IO ByteString)
-> AggregateServerConf -> Bool -> IO ByteString
forall a. (a -> IO ByteString) -> a -> Bool -> IO ByteString
ignitionService ((AggregateServerConf -> IO ByteString)
 -> AggregateServerConf -> Bool -> IO ByteString)
-> (AggregateServerConf -> IO ByteString)
-> AggregateServerConf
-> Bool
-> IO ByteString
forall a b. (a -> b) -> a -> b
$ \conf :: AggregateServerConf
conf ->
    Config Snap Any -> Snap () -> IO ()
forall (m :: * -> *) a.
MonadSnap m =>
Config m a -> Snap () -> IO ()
simpleHttpServe (Int -> Config Snap Any
forall a. Int -> Config Snap a
asConfig (Int -> Config Snap Any) -> Int -> Config Snap Any
forall a b. (a -> b) -> a -> b
$ AggregateServerConf -> Int
asPort AggregateServerConf
conf) (Aggregate a -> ByteString -> AggregateServerConf -> Snap ()
forall a.
(FromJSON a, ToJSON a) =>
Aggregate a -> ByteString -> AggregateServerConf -> Snap ()
asHandler Aggregate a
a ByteString
u AggregateServerConf
conf) IO () -> IO ByteString -> IO ByteString
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> ByteString -> IO ByteString
forall (m :: * -> *) a. Monad m => a -> m a
return ""

asConfig :: Int -> Config Snap a
asConfig :: Int -> Config Snap a
asConfig p :: Int
p = Int -> Config Snap a -> Config Snap a
forall (m :: * -> *) a. Int -> Config m a -> Config m a
setPort Int
p
           (Config Snap a -> Config Snap a) -> Config Snap a -> Config Snap a
forall a b. (a -> b) -> a -> b
$ ByteString -> Config Snap a -> Config Snap a
forall (m :: * -> *) a. ByteString -> Config m a -> Config m a
setBind "127.0.0.1"
           (Config Snap a -> Config Snap a) -> Config Snap a -> Config Snap a
forall a b. (a -> b) -> a -> b
$ ConfigLog -> Config Snap a -> Config Snap a
forall (m :: * -> *) a. ConfigLog -> Config m a -> Config m a
setAccessLog ConfigLog
ConfigNoLog
           (Config Snap a -> Config Snap a) -> Config Snap a -> Config Snap a
forall a b. (a -> b) -> a -> b
$ ConfigLog -> Config Snap a -> Config Snap a
forall (m :: * -> *) a. ConfigLog -> Config m a -> Config m a
setErrorLog ConfigLog
ConfigNoLog
           (Config Snap a -> Config Snap a) -> Config Snap a -> Config Snap a
forall a b. (a -> b) -> a -> b
$ Bool -> Config Snap a -> Config Snap a
forall (m :: * -> *) a. Bool -> Config m a -> Config m a
setVerbose Bool
False Config Snap a
forall a. Monoid a => a
mempty

asHandler :: (FromJSON a, ToJSON a) =>
    Aggregate a -> ByteString -> AggregateServerConf -> Snap ()
asHandler :: Aggregate a -> ByteString -> AggregateServerConf -> Snap ()
asHandler a :: Aggregate a
a u :: ByteString
u conf :: AggregateServerConf
conf =
    [(ByteString, Snap ())] -> Snap ()
forall (m :: * -> *) a. MonadSnap m => [(ByteString, m a)] -> m a
route [(ByteString -> ByteString -> ByteString
B.append "put/" ByteString
u, Method -> Snap () -> Snap ()
forall (m :: * -> *) a. MonadSnap m => Method -> m a -> m a
Snap.Core.method Method
POST (Snap () -> Snap ()) -> Snap () -> Snap ()
forall a b. (a -> b) -> a -> b
$ Aggregate a -> AggregateServerConf -> Snap ()
forall a.
FromJSON a =>
Aggregate a -> AggregateServerConf -> Snap ()
receiveAggregate Aggregate a
a AggregateServerConf
conf)
          ,(ByteString -> ByteString -> ByteString
B.append "get/" ByteString
u, Method -> Snap () -> Snap ()
forall (m :: * -> *) a. MonadSnap m => Method -> m a -> m a
Snap.Core.method Method
GET (Snap () -> Snap ()) -> Snap () -> Snap ()
forall a b. (a -> b) -> a -> b
$ Aggregate a -> Snap ()
forall a. ToJSON a => Aggregate a -> Snap ()
sendAggregate Aggregate a
a)
          ]

receiveAggregate :: FromJSON a =>
    Aggregate a -> AggregateServerConf -> Snap ()
receiveAggregate :: Aggregate a -> AggregateServerConf -> Snap ()
receiveAggregate a :: Aggregate a
a conf :: AggregateServerConf
conf =
    String -> Snap () -> Snap ()
handleAggregateExceptions "Exception while receiving aggregate" (Snap () -> Snap ()) -> Snap () -> Snap ()
forall a b. (a -> b) -> a -> b
$ do
        !Maybe (Int32, Maybe a)
s <- ByteString -> Maybe (Int32, Maybe a)
forall a. FromJSON a => ByteString -> Maybe a
decode' (ByteString -> Maybe (Int32, Maybe a))
-> Snap ByteString -> Snap (Maybe (Int32, Maybe a))
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Word64 -> Snap ByteString
forall (m :: * -> *). MonadSnap m => Word64 -> m ByteString
readRequestBody 65536
        Bool -> Snap () -> Snap ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (Maybe (Int32, Maybe a) -> Bool
forall a. Maybe a -> Bool
isNothing Maybe (Int32, Maybe a)
s) (Snap () -> Snap ()) -> Snap () -> Snap ()
forall a b. (a -> b) -> a -> b
$ IO () -> Snap ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> Snap ()) -> IO () -> Snap ()
forall a b. (a -> b) -> a -> b
$ String -> IO ()
forall a. String -> IO a
throwUserError "Unreadable aggregate!"
        IO () -> Snap ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> Snap ()) -> IO () -> Snap ()
forall a b. (a -> b) -> a -> b
$ do
            let (pid :: Int32
pid, v :: Maybe a
v) = Maybe (Int32, Maybe a) -> (Int32, Maybe a)
forall a. HasCallStack => Maybe a -> a
fromJust Maybe (Int32, Maybe a)
s
                int :: CTime
int = Int -> CTime
forall a b. (Integral a, Num b) => a -> b
fromIntegral (Int -> CTime)
-> (AggregateServerConf -> Int) -> AggregateServerConf -> CTime
forall b c a. (b -> c) -> (a -> b) -> a -> c
. TimeInterval -> Int
toSec (TimeInterval -> Int)
-> (AggregateServerConf -> TimeInterval)
-> AggregateServerConf
-> Int
forall b c a. (b -> c) -> (a -> b) -> a -> c
. AggregateServerConf -> TimeInterval
asPurgeInterval (AggregateServerConf -> CTime) -> AggregateServerConf -> CTime
forall a b. (a -> b) -> a -> b
$ AggregateServerConf
conf
            !CTime
t <- IO CTime
ngxNow
            Aggregate a
-> ((CTime, Map Int32 (CTime, Maybe a))
    -> ((CTime, Map Int32 (CTime, Maybe a)), ()))
-> IO ()
forall a b. IORef a -> (a -> (a, b)) -> IO b
atomicModifyIORef' Aggregate a
a (((CTime, Map Int32 (CTime, Maybe a))
  -> ((CTime, Map Int32 (CTime, Maybe a)), ()))
 -> IO ())
-> ((CTime, Map Int32 (CTime, Maybe a))
    -> ((CTime, Map Int32 (CTime, Maybe a)), ()))
-> IO ()
forall a b. (a -> b) -> a -> b
$
                \(t' :: CTime
t', v' :: Map Int32 (CTime, Maybe a)
v') ->
                    (let (!CTime
tn, f :: Map k (CTime, b) -> Map k (CTime, b)
f) =
                             if CTime
t CTime -> CTime -> CTime
forall a. Num a => a -> a -> a
- CTime
t' CTime -> CTime -> Bool
forall a. Ord a => a -> a -> Bool
>= CTime
int
                                 then (CTime
t, ((CTime, b) -> Bool) -> Map k (CTime, b) -> Map k (CTime, b)
forall a k. (a -> Bool) -> Map k a -> Map k a
M.filter (((CTime, b) -> Bool) -> Map k (CTime, b) -> Map k (CTime, b))
-> ((CTime, b) -> Bool) -> Map k (CTime, b) -> Map k (CTime, b)
forall a b. (a -> b) -> a -> b
$ \(t'' :: CTime
t'', _) -> CTime
t CTime -> CTime -> CTime
forall a. Num a => a -> a -> a
- CTime
t'' CTime -> CTime -> Bool
forall a. Ord a => a -> a -> Bool
< CTime
int)
                                 else (CTime
t', Map k (CTime, b) -> Map k (CTime, b)
forall a. a -> a
id)
                         !vn :: Map Int32 (CTime, Maybe a)
vn = Map Int32 (CTime, Maybe a) -> Map Int32 (CTime, Maybe a)
forall k b. Map k (CTime, b) -> Map k (CTime, b)
f (Map Int32 (CTime, Maybe a) -> Map Int32 (CTime, Maybe a))
-> Map Int32 (CTime, Maybe a) -> Map Int32 (CTime, Maybe a)
forall a b. (a -> b) -> a -> b
$ (Maybe (CTime, Maybe a) -> Maybe (CTime, Maybe a))
-> Int32
-> Map Int32 (CTime, Maybe a)
-> Map Int32 (CTime, Maybe a)
forall k a.
Ord k =>
(Maybe a -> Maybe a) -> k -> Map k a -> Map k a
M.alter
                                   (\old :: Maybe (CTime, Maybe a)
old ->
                                       let !new' :: Maybe a
new' =
                                               if Maybe (CTime, Maybe a) -> Bool
forall a. Maybe a -> Bool
isNothing Maybe (CTime, Maybe a)
old Bool -> Bool -> Bool
|| Maybe a -> Bool
forall a. Maybe a -> Bool
isJust Maybe a
v
                                                   then Maybe a
v
                                                   else (CTime, Maybe a) -> Maybe a
forall a b. (a, b) -> b
snd ((CTime, Maybe a) -> Maybe a) -> (CTime, Maybe a) -> Maybe a
forall a b. (a -> b) -> a -> b
$ Maybe (CTime, Maybe a) -> (CTime, Maybe a)
forall a. HasCallStack => Maybe a -> a
fromJust Maybe (CTime, Maybe a)
old
                                       in (CTime, Maybe a) -> Maybe (CTime, Maybe a)
forall a. a -> Maybe a
Just (CTime
t, Maybe a
new')
                                   ) Int32
pid Map Int32 (CTime, Maybe a)
v'
                     in (CTime
tn, Map Int32 (CTime, Maybe a)
vn)
                    ,()
                    )
        Response -> Snap ()
forall (m :: * -> *) a. MonadSnap m => Response -> m a
finishWith Response
emptyResponse

sendAggregate :: ToJSON a => Aggregate a -> Snap ()
sendAggregate :: Aggregate a -> Snap ()
sendAggregate a :: Aggregate a
a =
    String -> Snap () -> Snap ()
handleAggregateExceptions "Exception while sending aggregate" (Snap () -> Snap ()) -> Snap () -> Snap ()
forall a b. (a -> b) -> a -> b
$ do
        (CTime, Map Int32 (CTime, Maybe a))
s <- IO (CTime, Map Int32 (CTime, Maybe a))
-> Snap (CTime, Map Int32 (CTime, Maybe a))
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (CTime, Map Int32 (CTime, Maybe a))
 -> Snap (CTime, Map Int32 (CTime, Maybe a)))
-> IO (CTime, Map Int32 (CTime, Maybe a))
-> Snap (CTime, Map Int32 (CTime, Maybe a))
forall a b. (a -> b) -> a -> b
$ Aggregate a -> IO (CTime, Map Int32 (CTime, Maybe a))
forall a. IORef a -> IO a
readIORef Aggregate a
a
        (Response -> Response) -> Snap ()
forall (m :: * -> *). MonadSnap m => (Response -> Response) -> m ()
modifyResponse ((Response -> Response) -> Snap ())
-> (Response -> Response) -> Snap ()
forall a b. (a -> b) -> a -> b
$ ByteString -> Response -> Response
setContentType "application/json"
        ByteString -> Snap ()
forall (m :: * -> *). MonadSnap m => ByteString -> m ()
writeLBS (ByteString -> Snap ()) -> ByteString -> Snap ()
forall a b. (a -> b) -> a -> b
$ (UTCTime, Map Int32 (UTCTime, Maybe a)) -> ByteString
forall a. ToJSON a => a -> ByteString
encode ((UTCTime, Map Int32 (UTCTime, Maybe a)) -> ByteString)
-> (UTCTime, Map Int32 (UTCTime, Maybe a)) -> ByteString
forall a b. (a -> b) -> a -> b
$ (CTime -> UTCTime
toUTCTime (CTime -> UTCTime)
-> (Map Int32 (CTime, Maybe a) -> Map Int32 (UTCTime, Maybe a))
-> (CTime, Map Int32 (CTime, Maybe a))
-> (UTCTime, Map Int32 (UTCTime, Maybe a))
forall (a :: * -> * -> *) b c b' c'.
Arrow a =>
a b c -> a b' c' -> a (b, b') (c, c')
*** ((CTime, Maybe a) -> (UTCTime, Maybe a))
-> Map Int32 (CTime, Maybe a) -> Map Int32 (UTCTime, Maybe a)
forall a b k. (a -> b) -> Map k a -> Map k b
M.map ((CTime -> UTCTime) -> (CTime, Maybe a) -> (UTCTime, Maybe a)
forall (a :: * -> * -> *) b c d.
Arrow a =>
a b c -> a (b, d) (c, d)
first CTime -> UTCTime
toUTCTime)) (CTime, Map Int32 (CTime, Maybe a))
s
    where toUTCTime :: CTime -> UTCTime
toUTCTime (CTime t :: Int64
t) = POSIXTime -> UTCTime
posixSecondsToUTCTime (POSIXTime -> UTCTime) -> POSIXTime -> UTCTime
forall a b. (a -> b) -> a -> b
$ Int64 -> POSIXTime
forall a b. (Integral a, Num b) => a -> b
fromIntegral Int64
t

handleAggregateExceptions :: String -> Snap () -> Snap ()
handleAggregateExceptions :: String -> Snap () -> Snap ()
handleAggregateExceptions cmsg :: String
cmsg = (SomeException -> Snap ()) -> Snap () -> Snap ()
forall (m :: * -> *) a.
MonadBaseControl IO m =>
(SomeException -> m a) -> m a -> m a
handleAny ((SomeException -> Snap ()) -> Snap () -> Snap ())
-> (SomeException -> Snap ()) -> Snap () -> Snap ()
forall a b. (a -> b) -> a -> b
$ \e :: SomeException
e ->
    Int -> String -> Snap ()
forall (m :: * -> *). MonadSnap m => Int -> String -> m ()
writeErrorResponse 500 (String -> Snap ()) -> String -> Snap ()
forall a b. (a -> b) -> a -> b
$ SomeException -> String
forall a. Show a => a -> String
show (SomeException
e :: SomeException)
    where writeErrorResponse :: Int -> String -> m ()
writeErrorResponse c :: Int
c msg :: String
msg = do
              (Response -> Response) -> m ()
forall (m :: * -> *). MonadSnap m => (Response -> Response) -> m ()
modifyResponse ((Response -> Response) -> m ()) -> (Response -> Response) -> m ()
forall a b. (a -> b) -> a -> b
$ Int -> ByteString -> Response -> Response
setResponseStatus Int
c (ByteString -> Response -> Response)
-> ByteString -> Response -> Response
forall a b. (a -> b) -> a -> b
$ Text -> ByteString
T.encodeUtf8 (Text -> ByteString) -> Text -> ByteString
forall a b. (a -> b) -> a -> b
$ String -> Text
T.pack String
cmsg
              ByteString -> m ()
forall (m :: * -> *). MonadSnap m => ByteString -> m ()
writeBS (ByteString -> m ()) -> ByteString -> m ()
forall a b. (a -> b) -> a -> b
$ Text -> ByteString
T.encodeUtf8 (Text -> ByteString) -> Text -> ByteString
forall a b. (a -> b) -> a -> b
$ String -> Text
T.pack String
msg

throwUserError :: String -> IO a
throwUserError :: String -> IO a
throwUserError = IOError -> IO a
forall a. IOError -> IO a
ioError (IOError -> IO a) -> (String -> IOError) -> String -> IO a
forall b c a. (b -> c) -> (a -> b) -> a -> c
. String -> IOError
userError

-- | Exports a simple aggregate service with specified name and the aggregate
--   type.
--
-- The name of the service can be chosen arbitrarily, however it must be
-- exactly referred from 'reportAggregate' and client requests to the service
-- because the URL of the internal HTTP server contains this.
--
-- The aggregate type must have instances of 'FromJSON' and 'ToJSON' as its
-- objects will be transferred via HTTP in JSON format.
--
-- The service is implemented via 'ngxExportSimpleServiceTyped' with
-- 'AggregateServerConf' as the name of its custom type. This is an
-- 'ignitionService' with an HTTP server based on the [Snap
-- framework](http://snapframework.com/) running inside. The internal HTTP
-- server collects data from worker processes on URL
-- /\/put\/__\<name_of_the_service\>__/ and reports data on URL
-- /\/get\/__\<name_of_the_service\>__/.
ngxExportAggregateService :: String       -- ^ Name of the service
                          -> Name         -- ^ Name of the aggregate type
                          -> Q [Dec]
ngxExportAggregateService :: String -> Name -> Q [Dec]
ngxExportAggregateService f :: String
f a :: Name
a = do
    let nameF :: Name
nameF = 'aggregateServer
        fName :: Name
fName = String -> Name
mkName (String -> Name) -> String -> Name
forall a b. (a -> b) -> a -> b
$ "aggregate_" String -> String -> String
forall a. [a] -> [a] -> [a]
++ String
f
        sName :: Name
sName = String -> Name
mkName (String -> Name) -> String -> Name
forall a b. (a -> b) -> a -> b
$ "aggregate_storage_" String -> String -> String
forall a. [a] -> [a] -> [a]
++ String
f
        uName :: Name
uName = String -> Name
mkName (String -> Name) -> String -> Name
forall a b. (a -> b) -> a -> b
$ "aggregate_url_" String -> String -> String
forall a. [a] -> [a] -> [a]
++ String
f
    [[Dec]] -> [Dec]
forall (t :: * -> *) a. Foldable t => t [a] -> [a]
concat ([[Dec]] -> [Dec]) -> Q [[Dec]] -> Q [Dec]
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> [Q [Dec]] -> Q [[Dec]]
forall (t :: * -> *) (m :: * -> *) a.
(Traversable t, Monad m) =>
t (m a) -> m (t a)
sequence
        [[Q Dec] -> Q [Dec]
forall (t :: * -> *) (m :: * -> *) a.
(Traversable t, Monad m) =>
t (m a) -> m (t a)
sequence
            [Name -> TypeQ -> Q Dec
sigD Name
uName [t|ByteString|]
            ,Name -> [ClauseQ] -> Q Dec
funD Name
uName [[PatQ] -> BodyQ -> [Q Dec] -> ClauseQ
clause [] (ExpQ -> BodyQ
normalB [|C8.pack f|]) []]
            ,Name -> TypeQ -> Q Dec
sigD Name
sName [t|Aggregate $(conT a)|]
            ,Name -> [ClauseQ] -> Q Dec
funD Name
sName
                [[PatQ] -> BodyQ -> [Q Dec] -> ClauseQ
clause []
                    (ExpQ -> BodyQ
normalB [|unsafePerformIO $ newIORef (0, M.empty)|])
                    []
                ]
            ,Name -> Inline -> RuleMatch -> Phases -> Q Dec
pragInlD Name
sName Inline
NoInline RuleMatch
FunLike Phases
AllPhases
            ,Name -> TypeQ -> Q Dec
sigD Name
fName [t|AggregateServerConf -> Bool -> IO L.ByteString|]
            ,Name -> [ClauseQ] -> Q Dec
funD Name
fName
                [[PatQ] -> BodyQ -> [Q Dec] -> ClauseQ
clause []
                    (ExpQ -> BodyQ
normalB [|$(varE nameF) $(varE sName) $(varE uName)|])
                    []
                ]
            ]
        -- FIXME: name AggregateServerConf must be imported from the user's
        -- module unqualified (see details in NgxExport/Tools.hs, function
        -- ngxExportSimpleService')!
        ,Name -> Name -> ServiceMode -> Q [Dec]
ngxExportSimpleServiceTyped
            Name
fName ''AggregateServerConf ServiceMode
SingleShotService
        ]

-- | Reports data to an aggregate service.
--
-- If reported data is 'Nothing' then data collected on the aggregate service
-- won't alter except that the timestamp associated with the PID of the sending
-- worker process will be updated.
reportAggregate :: ToJSON a => Int          -- ^ Port of the aggregate server
                            -> Maybe a      -- ^ Reported data
                            -> ByteString   -- ^ Name of the aggregate service
                            -> IO ()
reportAggregate :: Int -> Maybe a -> ByteString -> IO ()
reportAggregate p :: Int
p v :: Maybe a
v u :: ByteString
u =
    (SomeException -> IO ()) -> IO () -> IO ()
forall e a. Exception e => (e -> IO a) -> IO a -> IO a
handle (IO () -> SomeException -> IO ()
forall a b. a -> b -> a
const (IO () -> SomeException -> IO ())
-> IO () -> SomeException -> IO ()
forall a b. (a -> b) -> a -> b
$ () -> IO ()
forall (m :: * -> *) a. Monad m => a -> m a
return () :: SomeException -> IO ()) (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ do
        Request
req <- String -> IO Request
forall (m :: * -> *). MonadThrow m => String -> m Request
parseRequest "POST http://127.0.0.1"
        Int32
pid <- CPid -> Int32
forall a b. (Integral a, Num b) => a -> b
fromIntegral (CPid -> Int32) -> IO CPid -> IO Int32
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> IO CPid
ngxPid :: IO Int32
        let !req' :: Request
req' = Request
req { requestBody :: RequestBody
requestBody = ByteString -> RequestBody
RequestBodyLBS (ByteString -> RequestBody) -> ByteString -> RequestBody
forall a b. (a -> b) -> a -> b
$ (Int32, Maybe a) -> ByteString
forall a. ToJSON a => a -> ByteString
encode (Int32
pid, Maybe a
v)
                        , port :: Int
port = Int
p
                        , path :: ByteString
Network.HTTP.Client.path = ByteString -> ByteString -> ByteString
B.append "put/" ByteString
u
                        }
        IO (Response ()) -> IO ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (IO (Response ()) -> IO ()) -> IO (Response ()) -> IO ()
forall a b. (a -> b) -> a -> b
$ Request -> Manager -> IO (Response ())
httpNoBody Request
req' Manager
httpManager

httpManager :: Manager
httpManager :: Manager
httpManager = IO Manager -> Manager
forall a. IO a -> a
unsafePerformIO (IO Manager -> Manager) -> IO Manager -> Manager
forall a b. (a -> b) -> a -> b
$ ManagerSettings -> IO Manager
newManager ManagerSettings
defaultManagerSettings
{-# NOINLINE httpManager #-}