{-# LANGUAGE CPP #-}
{-# LANGUAGE DeriveDataTypeable #-}
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE RecordWildCards #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TypeFamilies #-}
module Katip.Scribes.ElasticSearch.Internal where
import Control.Applicative as A
import Control.Concurrent
import Control.Concurrent.Async
import Control.Concurrent.STM.TBMQueue
import Control.Exception.Base
import Control.Exception.Enclosed
import Control.Monad
import Control.Monad.Catch
import Control.Monad.IO.Class
import Control.Monad.STM
import Control.Retry (RetryPolicy,
exponentialBackoff,
limitRetries,
recovering)
import Data.Aeson
import Data.ByteString.Lazy (ByteString)
import Data.List.NonEmpty (NonEmpty(..))
import Data.Monoid ((<>))
import Data.Text (Text)
import qualified Data.Text as T
import qualified Data.Text.Encoding as T
import Data.Time
import Data.Time.Calendar.WeekDate
import Data.Typeable as Typeable
import Data.UUID
import qualified Data.UUID.V4 as UUID4
import qualified Database.V1.Bloodhound as V1
import qualified Database.V5.Bloodhound as V5
import Network.HTTP.Client
import Network.HTTP.Types.Status
import Text.Printf (printf)
import Katip.Core
import Katip.Scribes.ElasticSearch.Annotations
data EsScribeCfg v = EsScribeCfg {
essRetryPolicy :: RetryPolicy
, essQueueSize :: EsQueueSize
, essPoolSize :: EsPoolSize
, essAnnotateTypes :: Bool
, essIndexSettings :: IndexSettings v
, essIndexSharding :: IndexShardingPolicy
} deriving (Typeable)
defaultEsScribeCfg' :: ESVersion v => proxy v -> EsScribeCfg v
defaultEsScribeCfg' prx = EsScribeCfg {
essRetryPolicy = exponentialBackoff 25 <> limitRetries 5
, essQueueSize = EsQueueSize 1000
, essPoolSize = EsPoolSize 2
, essAnnotateTypes = False
, essIndexSettings = defaultIndexSettings prx
, essIndexSharding = DailyIndexSharding
}
defaultEsScribeCfg :: EsScribeCfg ESV1
defaultEsScribeCfg = defaultEsScribeCfgV1
defaultEsScribeCfgV1 :: EsScribeCfg ESV1
defaultEsScribeCfgV1 = defaultEsScribeCfg' (Typeable.Proxy :: Typeable.Proxy ESV1)
defaultEsScribeCfgV5 :: EsScribeCfg ESV5
defaultEsScribeCfgV5 = defaultEsScribeCfg' (Typeable.Proxy :: Typeable.Proxy ESV5)
data IndexShardingPolicy = NoIndexSharding
| MonthlyIndexSharding
| WeeklyIndexSharding
| DailyIndexSharding
| HourlyIndexSharding
| EveryMinuteIndexSharding
| CustomIndexSharding (forall a. Item a -> [IndexNameSegment])
instance Show IndexShardingPolicy where
show NoIndexSharding = "NoIndexSharding"
show MonthlyIndexSharding = "MonthlyIndexSharding"
show WeeklyIndexSharding = "WeeklyIndexSharding"
show DailyIndexSharding = "DailyIndexSharding"
show HourlyIndexSharding = "HourlyIndexSharding"
show EveryMinuteIndexSharding = "EveryMinuteIndexSharding"
show (CustomIndexSharding _) = "CustomIndexSharding λ"
newtype IndexNameSegment = IndexNameSegment {
indexNameSegment :: Text
} deriving (Show, Eq, Ord)
shardPolicySegs :: IndexShardingPolicy -> Item a -> [IndexNameSegment]
shardPolicySegs NoIndexSharding _ = []
shardPolicySegs MonthlyIndexSharding Item {..} = [sis y, sis m]
where
(y, m, _) = toGregorian (utctDay _itemTime)
shardPolicySegs WeeklyIndexSharding Item {..} = [sis y, sis m, sis d]
where
(y, m, d) = toGregorian (roundToSunday (utctDay _itemTime))
shardPolicySegs DailyIndexSharding Item {..} = [sis y, sis m, sis d]
where
(y, m, d) = toGregorian (utctDay _itemTime)
shardPolicySegs HourlyIndexSharding Item {..} = [sis y, sis m, sis d, sis h]
where
(y, m, d) = toGregorian (utctDay _itemTime)
(h, _) = splitTime (utctDayTime _itemTime)
shardPolicySegs EveryMinuteIndexSharding Item {..} = [sis y, sis m, sis d, sis h, sis mn]
where
(y, m, d) = toGregorian (utctDay _itemTime)
(h, mn) = splitTime (utctDayTime _itemTime)
shardPolicySegs (CustomIndexSharding f) i = f i
roundToSunday :: Day -> Day
roundToSunday d
| dow == 7 = d
| w > 1 = fromWeekDate y (w - 1) 7
| otherwise = fromWeekDate (y - 1) 53 7
where
(y, w, dow) = toWeekDate d
chooseIxn :: ESVersion v => proxy v -> IndexName v -> IndexShardingPolicy -> Item a -> IndexName v
chooseIxn prx ixn p i =
toIndexName prx (T.intercalate "-" ((fromIndexName prx ixn):segs))
where
segs = indexNameSegment A.<$> shardPolicySegs p i
sis :: Integral a => a -> IndexNameSegment
sis = IndexNameSegment . T.pack . fmt
where
fmt = printf "%02d" . toInteger
splitTime :: DiffTime -> (Int, Int)
splitTime t = asMins `divMod` 60
where
asMins = floor t `div` 60
data EsScribeSetupError = CouldNotCreateIndex !(Response ByteString)
| CouldNotUpdateIndexSettings !(Response ByteString)
| CouldNotCreateMapping !(Response ByteString)
| CouldNotPutTemplate !(Response ByteString)
deriving (Typeable, Show)
instance Exception EsScribeSetupError
mkEsScribe
:: forall v. ( ESVersion v
, MonadIO (BH v IO)
#if defined(__GLASGOW_HASKELL__) && __GLASGOW_HASKELL__ < 800
, Functor (BH v IO)
#endif
)
=> EsScribeCfg v
-> BHEnv v
-> IndexName v
-> MappingName v
-> Severity
-> Verbosity
-> IO Scribe
mkEsScribe cfg@EsScribeCfg {..} env ix mapping sev verb = do
q <- newTBMQueueIO $ unEsQueueSize essQueueSize
endSig <- newEmptyMVar
runBH prx env $ do
if shardingEnabled
then do
res <- putTemplate prx tpl tplName
unless (statusIsSuccessful (responseStatus res)) $
liftIO $ throwIO (CouldNotPutTemplate res)
else do
ixExists <- indexExists prx ix
if ixExists
then do
res <- updateIndexSettings prx (toUpdatabaleIndexSettings prx essIndexSettings) ix
unless (statusIsSuccessful (responseStatus res)) $
liftIO $ throwIO (CouldNotUpdateIndexSettings res)
else do
r1 <- createIndex prx essIndexSettings ix
unless (statusIsSuccessful (responseStatus r1)) $
liftIO $ throwIO (CouldNotCreateIndex r1)
r2 <- putMapping prx ix mapping base
unless (statusIsSuccessful (responseStatus r2)) $
liftIO $ throwIO (CouldNotCreateMapping r2)
workers <- replicateM (unEsPoolSize essPoolSize) $ async $
startWorker cfg env mapping q
_ <- async $ do
takeMVar endSig
atomically $ closeTBMQueue q
mapM_ waitCatch workers
putMVar endSig ()
let finalizer = putMVar endSig () >> takeMVar endSig
return (Scribe (logger q) finalizer)
where
logger :: forall a. LogItem a => TBMQueue (IndexName v, Value) -> Item a -> IO ()
logger q i = when (_itemSeverity i >= sev) $
void $ atomically $ tryWriteTBMQueue q (chooseIxn prx ix essIndexSharding i, itemJson' i)
prx :: Typeable.Proxy v
prx = Typeable.Proxy
tplName = toTemplateName prx ixn
shardingEnabled = case essIndexSharding of
NoIndexSharding -> False
_ -> True
tpl = toIndexTemplate prx (toTemplatePattern prx (ixn <> "-*")) (Just essIndexSettings) [toJSON base]
base = baseMapping prx mapping
ixn = fromIndexName prx ix
itemJson' :: LogItem a => Item a -> Value
itemJson' i
| essAnnotateTypes = itemJson verb (TypeAnnotated <$> i)
| otherwise = itemJson verb i
baseMapping :: ESVersion v => proxy v -> MappingName v -> Value
baseMapping prx mn =
object [ fromMappingName prx mn .= object ["properties" .= object prs] ]
where prs = [ unanalyzedString "thread"
, unanalyzedString "sev"
, unanalyzedString "pid"
, analyzedString "ns"
, analyzedString "msg"
, "loc" .= locType
, unanalyzedString "host"
, unanalyzedString "env"
, "at" .= dateType
, unanalyzedString "app"
]
unanalyzedString k = k .= unanalyzedStringSpec prx
analyzedString k = k .= analyzedStringSpec prx
locType = object ["properties" .= object locPairs]
locPairs = [ unanalyzedString "loc_pkg"
, unanalyzedString "loc_mod"
, unanalyzedString "loc_ln"
, unanalyzedString "loc_fn"
, unanalyzedString "loc_col"
]
dateType = object [ "format" .= esDateFormat
, "type" .= String "date"
]
esDateFormat :: Text
esDateFormat = "yyyy-MM-dd'T'HH:mm:ssZ||yyyy-MM-dd'T'HH:mm:ss.SSSZ||yyyy-MM-dd'T'HH:mm:ss.SSSSSSSSSSSSZ"
mkDocId :: ESVersion v => proxy v -> IO (DocId v)
mkDocId prx = (toDocId prx . T.decodeUtf8 . toASCIIBytes) `fmap` UUID4.nextRandom
newtype EsQueueSize = EsQueueSize {
unEsQueueSize :: Int
} deriving (Show, Eq, Ord)
instance Bounded EsQueueSize where
minBound = EsQueueSize 1
maxBound = EsQueueSize maxBound
mkEsQueueSize :: Int -> Maybe EsQueueSize
mkEsQueueSize = mkNonZero EsQueueSize
newtype EsPoolSize = EsPoolSize {
unEsPoolSize :: Int
} deriving (Show, Eq, Ord)
instance Bounded EsPoolSize where
minBound = EsPoolSize 1
maxBound = EsPoolSize maxBound
mkEsPoolSize :: Int -> Maybe EsPoolSize
mkEsPoolSize = mkNonZero EsPoolSize
mkNonZero :: (Int -> a) -> Int -> Maybe a
mkNonZero ctor n
| n > 0 = Just $ ctor n
| otherwise = Nothing
startWorker
:: forall v. (ESVersion v)
=> EsScribeCfg v
-> BHEnv v
-> MappingName v
-> TBMQueue (IndexName v, Value)
-> IO ()
startWorker EsScribeCfg {..} env mapping q = go
where
go = do
popped <- atomically $ readTBMQueue q
case popped of
Just (ixn, v) -> do
sendLog ixn v `catchAny` eat
go
Nothing -> return ()
prx :: Typeable.Proxy v
prx = Typeable.Proxy
sendLog :: IndexName v -> Value -> IO ()
sendLog ixn v = void $ recovering essRetryPolicy [handler] $ const $ do
did <- mkDocId prx
res <- runBH prx env $ indexDocument prx ixn mapping (defaultIndexDocumentSettings prx) v did
return res
eat _ = return ()
handler _ = Handler $ \e ->
case fromException e of
Just (_ :: AsyncException) -> return False
_ -> return True
class ESVersion v where
type BHEnv v
type IndexSettings v
defaultIndexSettings :: proxy v -> IndexSettings v
type UpdatableIndexSetting v
type IndexName v
toIndexName :: proxy v -> Text -> IndexName v
fromIndexName :: proxy v -> IndexName v -> Text
type MappingName v
fromMappingName :: proxy v -> MappingName v -> Text
type DocId v
toDocId :: proxy v -> Text -> DocId v
type BH v :: (* -> *) -> * -> *
runBH :: proxy v -> BHEnv v -> BH v m a -> m a
type TemplateName v
toTemplateName :: proxy v -> Text -> TemplateName v
type TemplatePattern v
toTemplatePattern :: proxy v -> Text -> TemplatePattern v
type IndexTemplate v
toIndexTemplate :: proxy v -> TemplatePattern v -> Maybe (IndexSettings v) -> [Value] -> IndexTemplate v
type IndexDocumentSettings v
defaultIndexDocumentSettings :: proxy v -> IndexDocumentSettings v
toUpdatabaleIndexSettings :: proxy v -> IndexSettings v -> NonEmpty (UpdatableIndexSetting v)
indexExists :: proxy v -> IndexName v -> BH v IO Bool
indexDocument :: ToJSON doc => proxy v -> IndexName v -> MappingName v -> IndexDocumentSettings v -> doc -> DocId v -> BH v IO (Response ByteString)
createIndex :: proxy v -> IndexSettings v -> IndexName v -> BH v IO (Response ByteString)
updateIndexSettings :: proxy v -> NonEmpty (UpdatableIndexSetting v) -> IndexName v -> BH v IO (Response ByteString)
putTemplate :: proxy v -> IndexTemplate v -> TemplateName v -> BH v IO (Response ByteString)
putMapping :: (ToJSON a) => proxy v -> IndexName v -> MappingName v -> a -> BH v IO (Response ByteString)
unanalyzedStringSpec :: proxy v -> Value
analyzedStringSpec :: proxy v -> Value
data ESV1 = ESV1
instance ESVersion ESV1 where
type BHEnv ESV1 = V1.BHEnv
type IndexSettings ESV1 = V1.IndexSettings
defaultIndexSettings _ = V1.defaultIndexSettings
type IndexName ESV1 = V1.IndexName
type UpdatableIndexSetting ESV1 = V1.UpdatableIndexSetting
toIndexName _ = V1.IndexName
fromIndexName _ (V1.IndexName x) = x
type MappingName ESV1 = V1.MappingName
fromMappingName _ (V1.MappingName x) = x
type DocId ESV1 = V1.DocId
toDocId _ = V1.DocId
type BH ESV1 = V1.BH
runBH _ = V1.runBH
type TemplateName ESV1 = V1.TemplateName
toTemplateName _ = V1.TemplateName
type TemplatePattern ESV1 = V1.TemplatePattern
toTemplatePattern _ = V1.TemplatePattern
type IndexTemplate ESV1 = V1.IndexTemplate
toIndexTemplate _ = V1.IndexTemplate
type IndexDocumentSettings ESV1 = V1.IndexDocumentSettings
toUpdatabaleIndexSettings _ s =
(V1.NumberOfReplicas (V1.indexReplicas s)) :| []
defaultIndexDocumentSettings _ = V1.defaultIndexDocumentSettings
indexExists _ = V1.indexExists
indexDocument _ = V1.indexDocument
createIndex _ = V1.createIndex
updateIndexSettings _ = V1.updateIndexSettings
putTemplate _ = V1.putTemplate
putMapping _ = V1.putMapping
unanalyzedStringSpec _ = object
[ "type" .= String "string"
, "index" .= String "not_analyzed"
]
analyzedStringSpec _ = object
[ "type" .= String "string"
, "index" .= String "analyzed"
]
data ESV5 = ESV5
instance ESVersion ESV5 where
type BHEnv ESV5 = V5.BHEnv
type IndexSettings ESV5 = V5.IndexSettings
type UpdatableIndexSetting ESV5 = V5.UpdatableIndexSetting
defaultIndexSettings _ = V5.defaultIndexSettings
type IndexName ESV5 = V5.IndexName
toIndexName _ = V5.IndexName
fromIndexName _ (V5.IndexName x) = x
type MappingName ESV5 = V5.MappingName
fromMappingName _ (V5.MappingName x) = x
type DocId ESV5 = V5.DocId
toDocId _ = V5.DocId
type BH ESV5 = V5.BH
runBH _ = V5.runBH
type TemplateName ESV5 = V5.TemplateName
toTemplateName _ = V5.TemplateName
type TemplatePattern ESV5 = V5.TemplatePattern
toTemplatePattern _ = V5.TemplatePattern
type IndexTemplate ESV5 = V5.IndexTemplate
toIndexTemplate _ = V5.IndexTemplate
type IndexDocumentSettings ESV5 = V5.IndexDocumentSettings
toUpdatabaleIndexSettings _ s =
(V5.NumberOfReplicas (V5.indexReplicas s)) :| []
defaultIndexDocumentSettings _ = V5.defaultIndexDocumentSettings
indexExists _ = V5.indexExists
indexDocument _ = V5.indexDocument
createIndex _ = V5.createIndex
updateIndexSettings _ = V5.updateIndexSettings
putTemplate _ = V5.putTemplate
putMapping _ = V5.putMapping
unanalyzedStringSpec _ = object
[ "type" .= String "keyword"
]
analyzedStringSpec _ = object
[ "type" .= String "text"
]