module Katip.Scribes.ElasticSearch
(
mkEsScribe
, EsScribeSetupError(..)
, EsQueueSize
, mkEsQueueSize
, EsPoolSize
, mkEsPoolSize
, EsScribeCfg
, essRetryPolicy
, essQueueSize
, essPoolSize
, essAnnotateTypes
, essIndexSettings
, essIndexSharding
, IndexShardingPolicy(..)
, IndexNameSegment(..)
, defaultEsScribeCfg
, mkDocId
, module Katip.Scribes.ElasticSearch.Annotations
, roundToSunday
) where
import Control.Applicative as A
import Control.Concurrent
import Control.Concurrent.Async
import Control.Concurrent.STM.TBMQueue
import Control.Exception.Base
import Control.Exception.Enclosed
import Control.Monad
import Control.Monad.Catch
import Control.Monad.IO.Class
import Control.Monad.STM
import Control.Retry (RetryPolicy,
exponentialBackoff,
limitRetries,
recovering)
import Data.Aeson
import Data.Monoid ((<>))
import Data.Text (Text)
import qualified Data.Text as T
import qualified Data.Text.Encoding as T
import Data.Time
import Data.Time.Calendar.WeekDate
import Data.Typeable
import Data.UUID
import qualified Data.UUID.V4 as UUID4
import Database.Bloodhound
import Network.HTTP.Client
import Network.HTTP.Types.Status
import Text.Printf (printf)
import Katip.Core
import Katip.Scribes.ElasticSearch.Annotations
-- | Tunable settings for the ElasticSearch scribe. Start from
-- 'defaultEsScribeCfg' and override individual fields as needed.
data EsScribeCfg = EsScribeCfg
  { essRetryPolicy   :: RetryPolicy
    -- ^ Retry policy applied by each worker when indexing a document.
  , essQueueSize     :: EsQueueSize
    -- ^ Capacity of the bounded queue sitting between the scribe and
    -- its workers.
  , essPoolSize      :: EsPoolSize
    -- ^ Number of worker threads draining the queue.
  , essAnnotateTypes :: Bool
    -- ^ When 'True', items are wrapped in 'TypeAnnotated' before being
    -- rendered to JSON.
  , essIndexSettings :: IndexSettings
    -- ^ Settings used when creating the index and, with sharding
    -- enabled, embedded in the index template.
  , essIndexSharding :: IndexShardingPolicy
    -- ^ How log items are spread across (time-based) indices.
  }
  deriving (Typeable)
-- | Default configuration: exponential backoff capped at 5 retries,
-- a queue of 1000 items, 2 workers, no type annotation, the default
-- Bloodhound index settings and one index per day.
--
-- NOTE(review): @retry@'s 'exponentialBackoff' is believed to take its
-- base delay in microseconds, which would make this a 25µs base delay.
-- Confirm whether 25ms (@25000@) was intended.
defaultEsScribeCfg :: EsScribeCfg
defaultEsScribeCfg =
  EsScribeCfg
    { essRetryPolicy   = exponentialBackoff 25 <> limitRetries 5
    , essQueueSize     = EsQueueSize 1000
    , essPoolSize      = EsPoolSize 2
    , essAnnotateTypes = False
    , essIndexSettings = defaultIndexSettings
    , essIndexSharding = DailyIndexSharding
    }
-- | Strategy for splitting logs over several indices. Every policy
-- yields a list of name segments (see 'IndexNameSegment') that get
-- appended, dash-separated, to the base index name.
data IndexShardingPolicy
  = NoIndexSharding
    -- ^ Everything goes to the base index; no segments are appended.
  | MonthlyIndexSharding
    -- ^ Segments: year, month.
  | WeeklyIndexSharding
    -- ^ Segments: year, month, day of the Sunday the item's day is
    -- rounded back to.
  | DailyIndexSharding
    -- ^ Segments: year, month, day.
  | HourlyIndexSharding
    -- ^ Segments: year, month, day, hour.
  | EveryMinuteIndexSharding
    -- ^ Segments: year, month, day, hour, minute.
  | CustomIndexSharding (forall a. Item a -> [IndexNameSegment])
    -- ^ Caller-supplied function computing the segments from the item.
-- | Hand-written because 'CustomIndexSharding' carries a function,
-- which has no 'Show' instance; it is rendered with a placeholder.
instance Show IndexShardingPolicy where
  show policy = case policy of
    NoIndexSharding          -> "NoIndexSharding"
    MonthlyIndexSharding     -> "MonthlyIndexSharding"
    WeeklyIndexSharding      -> "WeeklyIndexSharding"
    DailyIndexSharding       -> "DailyIndexSharding"
    HourlyIndexSharding      -> "HourlyIndexSharding"
    EveryMinuteIndexSharding -> "EveryMinuteIndexSharding"
    CustomIndexSharding _    -> "CustomIndexSharding λ"
-- | One component of a sharded index name. Segments are joined to the
-- base index name with dashes.
newtype IndexNameSegment = IndexNameSegment { indexNameSegment :: Text }
  deriving (Show, Eq, Ord)
-- | Compute the index-name segments a policy assigns to an item,
-- based on the item's timestamp (or on the user function for
-- 'CustomIndexSharding').
shardPolicySegs :: IndexShardingPolicy -> Item a -> [IndexNameSegment]
shardPolicySegs policy item = case policy of
    NoIndexSharding          -> []
    MonthlyIndexSharding     -> [sis year, sis month]
    WeeklyIndexSharding      -> [sis wkYear, sis wkMonth, sis wkDay]
    DailyIndexSharding       -> dateSegs
    HourlyIndexSharding      -> dateSegs ++ [sis hour]
    EveryMinuteIndexSharding -> dateSegs ++ [sis hour, sis minute]
    CustomIndexSharding f    -> f item
  where
    stamp = _itemTime item
    day   = utctDay stamp
    -- Calendar date of the item itself.
    (year, month, dayOfMonth) = toGregorian day
    -- Calendar date of the Sunday the item's day is rounded back to.
    (wkYear, wkMonth, wkDay) = toGregorian (roundToSunday day)
    -- Hour and minute within the item's day.
    (hour, minute) = splitTime (utctDayTime stamp)
    dateSegs = [sis year, sis month, sis dayOfMonth]
-- | Round a day back to the most recent Sunday; a Sunday maps to
-- itself. Used to give every day of a week the same weekly index name.
roundToSunday :: Day -> Day
roundToSunday d
    | dow == 7  = d
    -- ISO weeks run Monday..Sunday, so the preceding Sunday is day 7
    -- of the previous ISO week.
    | w > 1     = fromWeekDate y (w - 1) 7
    -- First ISO week of the year: the preceding Sunday closes the last
    -- week of the previous year. 'fromWeekDate' clips week 53 down to
    -- 52 for years that only have 52 weeks.
    | otherwise = fromWeekDate (y - 1) 53 7
  where
    (y, w, dow) = toWeekDate d
-- | Name of the index an item should be written to: the base index
-- name followed by the policy's segments, all joined with dashes.
-- With no segments the base name is returned unchanged.
chooseIxn :: IndexName -> IndexShardingPolicy -> Item a -> IndexName
chooseIxn (IndexName base) policy item = IndexName joined
  where
    suffixes = map indexNameSegment (shardPolicySegs policy item)
    joined   = T.intercalate "-" (base : suffixes)
-- | Render an integral value as an index-name segment, zero-padded to
-- at least two digits (wider values are left as they are).
sis :: Integral a => a -> IndexNameSegment
sis n = IndexNameSegment (T.pack padded)
  where
    padded :: String
    padded = printf "%02d" (toInteger n)
-- | Split a time of day into whole @(hours, minutes)@; seconds and
-- fractions of a second are discarded.
splitTime :: DiffTime -> (Int, Int)
splitTime t = (totalMinutes `div` 60, totalMinutes `mod` 60)
  where
    totalMinutes :: Int
    totalMinutes = floor t `div` 60
-- | Failures raised (via 'throwIO') by 'mkEsScribe' while preparing
-- ElasticSearch. Each constructor carries the unsuccessful reply.
data EsScribeSetupError
  = CouldNotCreateIndex !Reply
    -- ^ The index-creation request did not return a successful status.
  | CouldNotCreateMapping !Reply
    -- ^ Installing the mapping (or, with sharding enabled, the index
    -- template) did not return a successful status.
  deriving (Typeable, Show)

instance Exception EsScribeSetupError
-- | Build an ElasticSearch 'Scribe' and its finalizer.
--
-- Setup: if the base index does not exist yet it is created with
-- 'essIndexSettings', then either an index template (sharding enabled)
-- or a plain mapping (no sharding) is installed. Nothing is touched
-- when the base index already exists. An unsuccessful reply is thrown
-- as an 'EsScribeSetupError'.
--
-- Logging: items at or above the given 'Severity' are rendered to JSON
-- and offered to a bounded queue drained by 'essPoolSize' workers. The
-- write is non-blocking and its result is discarded, so items that
-- cannot be enqueued are silently dropped rather than blocking the
-- caller.
--
-- Finalizer: closes the queue and blocks until every worker has
-- drained it and exited. It may be called more than once.
mkEsScribe
  :: EsScribeCfg
  -> BHEnv
  -> IndexName
  -- ^ Base index name; sharding policies append segments to it.
  -> MappingName
  -> Severity
  -- ^ Minimum severity an item needs in order to be shipped.
  -> Verbosity
  -- ^ Verbosity passed to 'itemJson' when rendering the payload.
  -> IO (Scribe, IO ())
mkEsScribe cfg@EsScribeCfg {..} env ix mapping sev verb = do
  q <- newTBMQueueIO $ unEsQueueSize essQueueSize
  -- Two separate signals. With a single MVar used for both directions
  -- the finalizer could take back its own token when it ran before the
  -- watcher thread had blocked, returning without flushing anything.
  stopRequested <- newEmptyMVar
  stopped <- newEmptyMVar

  runBH env $ do
    chk <- indexExists ix
    unless chk $ void $ do
      r1 <- createIndex essIndexSettings ix
      unless (statusIsSuccessful (responseStatus r1)) $
        liftIO $ throwIO (CouldNotCreateIndex r1)
      r2 <- if shardingEnabled
              then putTemplate tpl tplName
              else putMapping ix mapping (baseMapping mapping)
      unless (statusIsSuccessful (responseStatus r2)) $
        liftIO $ throwIO (CouldNotCreateMapping r2)

  workers <- replicateM (unEsPoolSize essPoolSize) $ async $
    startWorker cfg env mapping q

  -- Watcher: on request, close the queue, wait for the workers to
  -- drain it, then publish completion.
  _ <- async $ do
    takeMVar stopRequested
    atomically $ closeTBMQueue q
    mapM_ waitCatch workers
    putMVar stopped ()

  let scribe = Scribe $ \ i ->
        when (_itemSeverity i >= sev) $
          void $ atomically $
            tryWriteTBMQueue q (chooseIxn ix essIndexSharding i, itemJson' i)
  -- tryPutMVar/readMVar keep repeated finalizer calls from blocking:
  -- the request is idempotent and the completion token is never taken.
  let finalizer = do
        _ <- tryPutMVar stopRequested ()
        readMVar stopped
  return (scribe, finalizer)
  where
    tplName = TemplateName ixn
    shardingEnabled = case essIndexSharding of
      NoIndexSharding -> False
      _               -> True
    -- Template matching every sharded index derived from the base name.
    tpl = IndexTemplate (TemplatePattern (ixn <> "-*")) (Just essIndexSettings) [toJSON (baseMapping mapping)]
    IndexName ixn = ix
    itemJson' i
      | essAnnotateTypes = itemJson verb (TypeAnnotated <$> i)
      | otherwise        = itemJson verb i
-- | Mapping document installed for the log type: every standard field
-- is declared as a string, except @"at"@ (a date using 'esDateFormat')
-- and @"loc"@ (a nested object whose fields are strings).
baseMapping :: MappingName -> Value
baseMapping (MappingName mn) =
    object [ mn .= object ["properties" .= object topLevelFields] ]
  where
    ofType t = object ["type" .= String t]
    stringField name = name .= ofType "string"

    topLevelFields =
      [ stringField "thread"
      , stringField "sev"
      , stringField "pid"
      , stringField "ns"
      , stringField "msg"
      , "loc" .= locationMapping
      , stringField "host"
      , stringField "env"
      , "at" .= timestampMapping
      , stringField "app"
      ]

    locationMapping = object ["properties" .= object locationFields]
    locationFields =
      [ stringField "loc_pkg"
      , stringField "loc_mod"
      , stringField "loc_ln"
      , stringField "loc_fn"
      , stringField "loc_col"
      ]

    timestampMapping =
      object
        [ "format" .= esDateFormat
        , "type" .= String "date"
        ]
-- | Date format attached to the @"at"@ field in 'baseMapping'. It lists
-- three alternatives separated by @||@: no fractional seconds, 3
-- fractional digits, and 12 fractional digits, each followed by a zone.
esDateFormat :: Text
esDateFormat = "yyyy-MM-dd'T'HH:mm:ssZ||yyyy-MM-dd'T'HH:mm:ss.SSSZ||yyyy-MM-dd'T'HH:mm:ss.SSSSSSSSSSSSZ"
-- | Generate a fresh document id from a random (version 4) UUID,
-- rendered in its ASCII form.
mkDocId :: IO DocId
mkDocId = do
  uuid <- UUID4.nextRandom
  return (DocId (T.decodeUtf8 (toASCIIBytes uuid)))
-- | Capacity of the scribe's bounded queue. Build values with
-- 'mkEsQueueSize', which rejects non-positive sizes.
newtype EsQueueSize = EsQueueSize { unEsQueueSize :: Int }
  deriving (Show, Eq, Ord)

-- | Sizes range from 1 up to the largest 'Int'.
instance Bounded EsQueueSize where
  minBound = EsQueueSize 1
  maxBound = EsQueueSize (maxBound :: Int)
-- | Smart constructor: 'Just' a queue size for positive input,
-- 'Nothing' for zero or negative input.
mkEsQueueSize :: Int -> Maybe EsQueueSize
mkEsQueueSize n = mkNonZero EsQueueSize n
-- | Number of worker threads shipping logs. Build values with
-- 'mkEsPoolSize', which rejects non-positive sizes.
newtype EsPoolSize = EsPoolSize { unEsPoolSize :: Int }
  deriving (Show, Eq, Ord)

-- | Sizes range from 1 up to the largest 'Int'.
instance Bounded EsPoolSize where
  minBound = EsPoolSize 1
  maxBound = EsPoolSize (maxBound :: Int)
-- | Smart constructor: 'Just' a pool size for positive input,
-- 'Nothing' for zero or negative input.
mkEsPoolSize :: Int -> Maybe EsPoolSize
mkEsPoolSize n = mkNonZero EsPoolSize n
-- | Apply a constructor only to strictly positive numbers. Despite the
-- name, negative numbers are rejected as well as zero.
mkNonZero :: (Int -> a) -> Int -> Maybe a
mkNonZero ctor n =
  if n > 0
    then Just (ctor n)
    else Nothing
-- | Worker loop: repeatedly pops an @(index, document)@ pair off the
-- queue and indexes it, returning once the queue has been closed and
-- drained. Failures to ship a document never terminate the worker.
startWorker
  :: EsScribeCfg
  -> BHEnv
  -> MappingName
  -> TBMQueue (IndexName, Value)
  -> IO ()
startWorker EsScribeCfg {..} env mapping q = go
  where
    go = do
      popped <- atomically $ readTBMQueue q
      case popped of
        Just (ixn, v) -> do
          -- Best effort: once the retries are exhausted the document
          -- is dropped so that later log items still get shipped.
          sendLog ixn v `catchAny` eat
          go
        -- Queue closed and empty: nothing more will arrive.
        Nothing -> return ()

    -- Index one document, retrying per 'essRetryPolicy'.
    --
    -- NOTE(review): the reply is discarded, so only thrown exceptions
    -- trigger a retry. If 'indexDocument' reports server-side failures
    -- through the reply status instead of throwing, those are neither
    -- retried nor surfaced - confirm.
    sendLog :: IndexName -> Value -> IO ()
    sendLog ixn v = do
      -- The id is picked once, outside the retry loop, so every attempt
      -- targets the same document. A retry after a failure where the
      -- write had actually been stored then overwrites it instead of
      -- adding a duplicate.
      did <- mkDocId
      void $ recovering essRetryPolicy [handler] $ const $
        runBH env $ indexDocument ixn mapping defaultIndexDocumentSettings v did

    eat _ = return ()

    -- Retry on every exception except asynchronous ones, which must be
    -- allowed to propagate.
    handler _ = Handler $ \e ->
      case fromException e of
        Just (_ :: AsyncException) -> return False
        _                          -> return True