module Log.Backend.ElasticSearch
( ElasticSearchConfig
, esServer
, esIndex
, esShardCount
, esReplicaCount
, esMapping
, esLogin
, esLoginInsecure
, checkElasticSearchLogin
, checkElasticSearchConnection
, defaultElasticSearchConfig
, withElasticSearchLogger
, elasticSearchLogger
) where
import Control.Applicative
import Control.Concurrent
import Control.Exception
import Control.Monad
import Data.Aeson
import Data.Aeson.Encode.Pretty
import Data.IORef
import Data.Maybe
import Data.Semigroup
import Data.Time
import Data.Word
import Log
import Log.Internal.Logger
import Network.HTTP.Client
import Prelude
import qualified Data.HashMap.Strict as H
import qualified Data.Text as T
import qualified Data.Text.IO as T
import qualified Data.Text.Lazy as TL
import qualified Data.Text.Lazy.Builder as T
import qualified Data.Traversable as F
import qualified Data.Vector as V
import Log.Backend.ElasticSearch.Internal
-- | Build an ElasticSearch-backed 'Logger' from the given configuration
-- (via 'elasticSearchLogger') and hand it to 'withLogger' together with the
-- supplied action. The result of the action is returned.
withElasticSearchLogger :: ElasticSearchConfig -> (Logger -> IO r) -> IO r
withElasticSearchLogger conf act =
  elasticSearchLogger conf >>= \esLogger -> withLogger esLogger act
{-# DEPRECATED elasticSearchLogger "Use 'withElasticSearchLogger' instead!" #-}

-- | Create a bulk 'Logger' (via 'mkBulkLogger') that stores log messages in
-- ElasticSearch.
--
-- The login settings are validated with 'checkElasticSearchLogin' before
-- anything else happens, so an insecure configuration fails immediately.
--
-- Each batch of messages handed over by the bulk logger is processed as
-- follows (the whole procedure is wrapped in 'retryOnException'):
--
-- 1. The server version is looked up. It is cached in an 'IORef' after the
--    first successful lookup; 'retryOnException' clears the cache whenever an
--    attempt fails, which forces a fresh lookup on the retry.
--
-- 2. The target index name is 'esIndex' followed by @-@ and the current date
--    formatted with @%F@. If it differs from the index used last time, the
--    index is created (with its mapping) unless it already exists.
--
-- 3. The messages are sent with a bulk request. Messages that were rejected
--    are resent with every key of their @data@ object suffixed by the JSON
--    type of its value and with the server error stored under @__es_error@.
--    Messages rejected again are resent once more with @data@ replaced by an
--    object holding only @__es_error@. The outcome of that last request is
--    not inspected.
--
-- The second action passed to 'mkBulkLogger' refreshes the most recently used
-- index.
elasticSearchLogger
  :: ElasticSearchConfig
  -> IO Logger
elasticSearchLogger esConf@ElasticSearchConfig{..} = do
  checkElasticSearchLogin esConf
  env <- mkEsEnv esConf
  -- Cached server version; Nothing until the first successful lookup.
  versionRef <- newIORef Nothing
  -- Name of the index used for the previous batch (empty before the first).
  indexRef <- newIORef T.empty
  mkBulkLogger "ElasticSearch" (\msgs -> do
    now <- getCurrentTime
    oldIndex <- readIORef indexRef
    retryOnException versionRef $ do
      version <- readIORef versionRef >>= \case
        Just version -> pure version
        Nothing -> serverInfo env >>= \case
          Left (ex :: HttpException) -> error
            $  "elasticSearchLogger: unexpected error: "
            <> show ex
            <> " (is ElasticSearch server running?)"
          Right reply -> case parseEsVersion $ responseBody reply of
            Nothing -> error
              $  "elasticSearchLogger: invalid response when parsing version number: "
              <> show reply
            Just version -> do
              -- Remember the version so that subsequent batches do not have
              -- to query the server again. Without this write the cache
              -- would stay empty forever.
              writeIORef versionRef $ Just version
              pure version
      -- One index per day: <esIndex>-<date>.
      let index = T.concat
            [ esIndex
            , "-"
            , T.pack $ formatTime defaultTimeLocale "%F" now
            ]
      when (oldIndex /= index) $ do
        ixExists <- indexExists env index
        unless ixExists $ do
          reply <- createIndexWithMapping version env esConf index
          -- A failed index creation is only reported, not treated as fatal.
          unless (isSuccess reply) $ do
            printEsError "error while creating index" $ responseBody reply
        writeIORef indexRef index
      let jsonMsgs = V.fromList $ map (toJsonMsg now) $ zip [1..] msgs
      reply <- responseBody <$> bulkIndex version env esConf index jsonMsgs
      case checkForBulkErrors jsonMsgs reply of
        Nothing -> printEsError "unexpected response" reply
        Just (hasErrors, responses) -> when hasErrors $ do
          -- First fallback: rename every key of the "data" object (at all
          -- nesting levels of objects) by appending the type of its value,
          -- and attach the server error to the top-level "data" object.
          let newMsgs =
                let modifyData :: Maybe Value -> Value -> Value
                    modifyData merr (Object hm) = Object $
                      let newData = H.foldlWithKey' keyAddValueTypeSuffix H.empty hm
                      in case merr of
                        Just err -> newData `H.union` H.singleton "__es_error" err
                        Nothing  -> newData
                    modifyData _ v = v

                    keyAddValueTypeSuffix acc k v = H.insert
                      (case v of
                        Object{} -> k <> "_object"
                        Array{}  -> k <> "_array"
                        String{} -> k <> "_string"
                        Number{} -> k <> "_number"
                        Bool{}   -> k <> "_bool"
                        Null{}   -> k <> "_null"
                      ) (modifyData Nothing v) acc
                in adjustFailedMessagesWith modifyData jsonMsgs responses
          newReply <- responseBody <$> bulkIndex version env esConf index newMsgs
          case checkForBulkErrors newMsgs newReply of
            Nothing -> printEsError "unexpected response" newReply
            Just (newHasErrors, newResponses) -> when newHasErrors $ do
              -- Second fallback: drop the data entirely and keep only the
              -- server error.
              let newerMsgs =
                    let modifyData :: Maybe Value -> Value -> Value
                        modifyData (Just err) Object{} = object [ "__es_error" .= err ]
                        modifyData _ v = v
                    in adjustFailedMessagesWith modifyData newMsgs newResponses
              void $ bulkIndex version env esConf index newerMsgs)
    (refreshIndex env =<< readIORef indexRef)
  where
    -- Extract the "errors" flag and the per-message result objects (found
    -- under the "index" or "create" key of each element of "items") from the
    -- body of a bulk reply. Yields Nothing if the body does not have the
    -- expected shape or if the number of items differs from the number of
    -- messages that were sent.
    checkForBulkErrors
      :: V.Vector a
      -> Value
      -> Maybe (Bool, V.Vector Object)
    checkForBulkErrors jsonMsgs replyBody = do
      Object response <- pure replyBody
      Bool hasErrors  <- "errors" `H.lookup` response
      Array jsonItems <- "items"  `H.lookup` response
      items <- F.forM jsonItems $ \v -> do
        Object item   <- pure v
        Object index_ <- "index"  `H.lookup` item
                     <|> "create" `H.lookup` item
        pure index_
      guard $ V.length items == V.length jsonMsgs
      pure (hasErrors, items)

    -- Select the messages whose response carries an "error" field and apply
    -- the given function (fed with that error) to their "data" field. Only
    -- the failed messages are returned. Indexing with 'V.!' relies on both
    -- vectors having the same length, which 'checkForBulkErrors' verifies.
    adjustFailedMessagesWith
      :: (Maybe err -> obj -> obj)
      -> V.Vector (H.HashMap T.Text obj)
      -> V.Vector (H.HashMap T.Text err)
      -> V.Vector (H.HashMap T.Text obj)
    adjustFailedMessagesWith f jsonMsgs responses =
      let failed = V.imapMaybe (\n item -> (n, ) <$> "error" `H.lookup` item) responses
      in (`V.map` failed) $ \(n, err) -> H.adjust (f $ Just err) "data" $ jsonMsgs V.! n

    -- Print a message followed by a pretty-printed JSON body to stdout.
    printEsError msg body =
      T.putStrLn $ "elasticSearchLogger: " <> msg <> " " <> prettyJson body
-- | Run the given action until it finishes without throwing.
--
-- Whenever the action throws (any exception, as 'SomeException' is caught),
-- the error is printed to stdout, the referenced version is reset to
-- 'Nothing', and the action is attempted again after a 10 second pause.
retryOnException :: forall r. IORef (Maybe EsVersion) -> IO r -> IO r
retryOnException versionRef m = attempt
  where
    attempt = do
      outcome <- try m
      case outcome of
        Right result -> return result
        Left (ex :: SomeException) -> do
          putStrLn $ "ElasticSearch: unexpected error: "
            <> show ex <> ", retrying in 10 seconds"
          writeIORef versionRef Nothing
          -- threadDelay takes microseconds.
          threadDelay $ 10 * 1000000
          attempt
-- | Render a JSON value as pretty-printed strict 'T.Text', indenting nested
-- structures by two spaces.
prettyJson :: Value -> T.Text
prettyJson value = TL.toStrict $ T.toLazyText builder
  where
    builder    = encodePrettyToTextBuilder' prettyConf value
    prettyConf = defConfig { confIndent = Spaces 2 }
-- | Convert a numbered log message to the JSON object that is sent to
-- ElasticSearch: the JSON encoding of the message extended with its
-- @insertion_order@ (the supplied number) and @insertion_time@ (the supplied
-- timestamp). The union is left-biased, so fields already present in the
-- encoded message take precedence over the two added ones.
toJsonMsg :: UTCTime -> (Word32, LogMessage) -> H.HashMap T.Text Value
toJsonMsg now (n, msg) = jMsg `H.union` insertionInfo
  where
    -- Relies on the message being encoded as a JSON object.
    Object jMsg = toJSON msg

    insertionInfo = H.fromList
      [ ("insertion_order", toJSON n)
      , ("insertion_time", toJSON now)
      ]
-- | Refuse configurations that would send login credentials over a
-- connection that is not HTTPS.
--
-- Calls 'error' when 'esLogin' is set, 'esLoginInsecure' is 'False' and
-- 'esServer' does not start with @https:@. Otherwise does nothing.
checkElasticSearchLogin :: ElasticSearchConfig -> IO ()
checkElasticSearchLogin ElasticSearchConfig{..} =
  when credentialsExposed $
    error $ "ElasticSearch: insecure login: "
      <> "Attempting to send login credentials over an insecure connection. "
      <> "Set esLoginInsecure = True to disable this check."
  where
    usesHttps          = "https:" `T.isPrefixOf` esServer
    credentialsExposed = isJust esLogin && not (esLoginInsecure || usesHttps)
-- | Query the server described by the configuration for its info and report
-- whether that succeeded: 'Right' @()@ on success, otherwise the
-- 'HttpException' returned by the request.
checkElasticSearchConnection :: ElasticSearchConfig -> IO (Either HttpException ())
checkElasticSearchConnection esConf = do
  env    <- mkEsEnv esConf
  result <- serverInfo env
  pure $ void result