-- | Elasticsearch logging back-end.
module Log.Backend.ElasticSearch (
    ElasticSearchConfig
  , esServer
  , esIndex
  , esShardCount
  , esReplicaCount
  , esMapping
  , esLogin
  , esLoginInsecure
  , checkElasticSearchLogin
  , checkElasticSearchConnection
  , defaultElasticSearchConfig
  , withElasticSearchLogger
  , elasticSearchLogger
  ) where

import Control.Applicative
import Control.Concurrent
import Control.Exception
import Control.Monad
import Data.Aeson
import Data.Aeson.Encode.Pretty
import Data.IORef
import Data.Maybe
import Data.Semigroup
import Data.Time
import Data.Word
import Log
import Log.Internal.Logger
import Network.HTTP.Client
import Prelude
import qualified Data.HashMap.Strict as H
import qualified Data.Text as T
import qualified Data.Text.IO as T
import qualified Data.Text.Lazy as TL
import qualified Data.Text.Lazy.Builder as T
import qualified Data.Traversable as F
import qualified Data.Vector as V

import Log.Backend.ElasticSearch.Internal

----------------------------------------

-- | Create an 'elasticSearchLogger' for the duration of the given
-- action, and shut it down afterwards, making sure that all buffered
-- messages are actually written to the Elasticsearch store.
withElasticSearchLogger :: ElasticSearchConfig -> (Logger -> IO r) -> IO r
withElasticSearchLogger conf act = do
  logger <- elasticSearchLogger conf
  withLogger logger act

{-# DEPRECATED elasticSearchLogger "Use 'withElasticSearchLogger' instead!" #-}

-- | Start an asynchronous logger thread that stores messages using
-- Elasticsearch.
--
-- Please use 'withElasticSearchLogger' instead, which is more
-- exception-safe (see the note attached to 'mkBulkLogger').
--
-- Each batch of messages is written to the index
-- @\<esIndex\>-\<YYYY-MM-DD\>@, which is created on first use. Messages
-- rejected by the bulk request are resubmitted with type-suffixed keys in
-- their @data@ field, and, if rejected again, with @data@ replaced by the
-- error alone. Any exception thrown while writing a batch causes the whole
-- batch to be retried after 10 seconds.
elasticSearchLogger
  :: ElasticSearchConfig -- ^ Configuration.
  -> IO Logger
elasticSearchLogger esConf@ElasticSearchConfig{..} = do
  checkElasticSearchLogin esConf
  env <- mkEsEnv esConf
  -- Cached server version; cleared by 'retryOnException' on failure.
  versionRef <- newIORef Nothing
  -- Name of the index most recently written to.
  indexRef <- newIORef T.empty
  mkBulkLogger "ElasticSearch" (\msgs -> do
    now <- getCurrentTime
    oldIndex <- readIORef indexRef
    retryOnException versionRef $ do
      -- We need to consider version of ES because ES >= 5.0.0 and ES >= 7.0.0
      -- have slight differences in parts of API used for logging.
      version <- readIORef versionRef >>= \case
        Just version -> pure version
        Nothing -> serverInfo env >>= \case
          Left (ex :: HttpException) -> error
            $ "elasticSearchLogger: unexpected error: "
            <> show ex
            <> " (is ElasticSearch server running?)"
          Right reply -> case parseEsVersion $ responseBody reply of
            Nothing -> error
              $ "elasticSearchLogger: invalid response when parsing version number: "
              <> show reply
            Just version -> do
              -- Remember the detected version so that subsequent batches
              -- don't have to query the server again.
              writeIORef versionRef $ Just version
              pure version
      -- Elasticsearch index names are additionally indexed by date so that each
      -- day is logged to a separate index to make log management easier.
      let index = T.concat
            [ esIndex
            , "-"
            , T.pack $ formatTime defaultTimeLocale "%F" now
            ]
      when (oldIndex /= index) $ do
        -- There is an obvious race condition in presence of more than one
        -- logger instance running, but it's irrelevant as attempting to create
        -- index that already exists is harmless.
        ixExists <- indexExists env index
        unless ixExists $ do
          reply <- createIndexWithMapping version env esConf index
          unless (isSuccess reply) $ do
            printEsError "error while creating index" $ responseBody reply
        writeIORef indexRef index
      let jsonMsgs = V.fromList $ map (toJsonMsg now) $ zip [1..] msgs
      reply <- responseBody <$> bulkIndex version env esConf index jsonMsgs
      -- Try to parse parts of reply to get information about log messages that
      -- failed to be inserted for some reason.
      case checkForBulkErrors jsonMsgs reply of
        Nothing -> printEsError "unexpected response" reply
        Just (hasErrors, responses) -> when hasErrors $ do
          -- If any message failed to be inserted because of type mismatch, go
          -- back to them, log the insertion failure and add type suffix to each
          -- of the keys in their "data" fields to work around type errors.
          let newMsgs =
                let modifyData :: Maybe Value -> Value -> Value
                    modifyData merr (Object hm) = Object $
                      let newData = H.foldlWithKey' keyAddValueTypeSuffix H.empty hm
                      in case merr of
                        -- We have the error message, i.e. we're at the top
                        -- level object, so add it to the data.
                        Just err -> newData `H.union` H.singleton "__es_error" err
                        Nothing -> newData
                    modifyData _ v = v

                    keyAddValueTypeSuffix acc k v = H.insert
                      (case v of
                        Object{} -> k <> "_object"
                        Array{} -> k <> "_array"
                        String{} -> k <> "_string"
                        Number{} -> k <> "_number"
                        Bool{} -> k <> "_bool"
                        Null{} -> k <> "_null"
                      ) (modifyData Nothing v) acc
                in adjustFailedMessagesWith modifyData jsonMsgs responses
          -- Attempt to put modified messages.
          newReply <- responseBody <$> bulkIndex version env esConf index newMsgs
          case checkForBulkErrors newMsgs newReply of
            Nothing -> printEsError "unexpected response" newReply
            Just (newHasErrors, newResponses) -> when newHasErrors $ do
              -- If some of the messages failed again (it might happen e.g. if
              -- data contains an array with elements of different types), drop
              -- their data field.
              let newerMsgs =
                    let modifyData :: Maybe Value -> Value -> Value
                        modifyData (Just err) Object{} = object [ "__es_error" .= err ]
                        modifyData _ v = v
                    in adjustFailedMessagesWith modifyData newMsgs newResponses
              -- Ignore any further errors.
              void $ bulkIndex version env esConf index newerMsgs)
    (refreshIndex env =<< readIORef indexRef)
  where
    -- Process reply of bulk indexing to get responses for each index operation
    -- and check whether any insertion failed.
    --
    -- Returns 'Nothing' if the reply doesn't have the expected shape or the
    -- number of returned items differs from the number of submitted messages.
    checkForBulkErrors :: V.Vector a -> Value -> Maybe (Bool, V.Vector Object)
    checkForBulkErrors jsonMsgs replyBody = do
      Object response <- pure replyBody
      Bool hasErrors <- "errors" `H.lookup` response
      Array jsonItems <- "items" `H.lookup` response
      items <- F.forM jsonItems $ \v -> do
        Object item <- pure v
        Object index_ <- "index" `H.lookup` item
          -- ES <= 2.x returns 'create' for some reason, so consider both.
          <|> "create" `H.lookup` item
        pure index_
      guard $ V.length items == V.length jsonMsgs
      pure (hasErrors, items)

    -- Select the messages whose response item carries an "error" field and
    -- apply the given function (supplied with that error) to their "data"
    -- field. Only the failed messages are returned.
    --
    -- The indexing with 'V.!' relies on both vectors having the same length,
    -- which 'checkForBulkErrors' verifies for the call sites above.
    adjustFailedMessagesWith
      :: (Maybe err -> obj -> obj)
      -> V.Vector (H.HashMap T.Text obj)
      -> V.Vector (H.HashMap T.Text err)
      -> V.Vector (H.HashMap T.Text obj)
    adjustFailedMessagesWith f jsonMsgs responses =
      let failed = V.imapMaybe (\n item -> (n, ) <$> "error" `H.lookup` item) responses
      in (`V.map` failed) $ \(n, err) -> H.adjust (f $ Just err) "data" $ jsonMsgs V.! n

    -- Print a diagnostic message followed by the pretty-printed JSON body to
    -- standard output.
    printEsError msg body =
      T.putStrLn $ "elasticSearchLogger: " <> msg <> " " <> prettyJson body

    -- Run the action, retrying it every 10 seconds until it completes without
    -- throwing an exception.
    --
    -- NOTE(review): catching 'SomeException' also intercepts asynchronous
    -- exceptions (e.g. 'ThreadKilled'), which are then retried like any other
    -- failure -- confirm that this is intended.
    retryOnException :: forall r. IORef (Maybe EsVersion) -> IO r -> IO r
    retryOnException versionRef m = try m >>= \case
      Left (ex::SomeException) -> do
        putStrLn $ "ElasticSearch: unexpected error: "
          <> show ex <> ", retrying in 10 seconds"
        -- If there was an exception, ElasticSearch version might've changed, so
        -- reset it.
        writeIORef versionRef Nothing
        threadDelay $ 10 * 1000000
        retryOnException versionRef m
      Right result -> return result

    -- Render a JSON value as strict 'T.Text' using 2-space indentation.
    prettyJson :: Value -> T.Text
    prettyJson = TL.toStrict
      . T.toLazyText
      . encodePrettyToTextBuilder' defConfig { confIndent = Spaces 2 }

    -- Convert a log message to a JSON object, adding its position within the
    -- batch ("insertion_order") and the time of insertion ("insertion_time").
    -- Fields of the message itself take precedence over the added ones.
    toJsonMsg :: UTCTime -> (Word32, LogMessage) -> H.HashMap T.Text Value
    toJsonMsg now (n, msg) = H.union jMsg $ H.fromList
      [ ("insertion_order", toJSON n)
      , ("insertion_time", toJSON now)
      ]
      where
        -- Assumes that 'LogMessage' always serializes to a JSON object;
        -- the pattern fails at runtime otherwise.
        Object jMsg = toJSON msg

----------------------------------------

-- | Check that login credentials are specified properly.
--
-- Calls 'error' if login credentials are configured ('esLogin'), insecure
-- logins are not explicitly allowed ('esLoginInsecure') and the server URL
-- ('esServer') does not use the @https:@ scheme. The scheme is compared
-- case-insensitively, since URI schemes are case-insensitive.
--
-- @since 0.10.0.0
checkElasticSearchLogin :: ElasticSearchConfig -> IO ()
checkElasticSearchLogin ElasticSearchConfig{..} =
  when (isJust esLogin && not esLoginInsecure && not usesHttps) $
    error $ "ElasticSearch: insecure login: "
      <> "Attempting to send login credentials over an insecure connection. "
      <> "Set esLoginInsecure = True to disable this check."
  where
    -- Lower-case the URL before comparing so that e.g. "HTTPS://host" is
    -- not rejected as insecure.
    usesHttps = "https:" `T.isPrefixOf` T.toLower esServer

-- | Check that we can connect to the ES server.
--
-- Builds an environment from the configuration and requests the server
-- info, returning the 'HttpException' on failure and discarding the
-- response on success.
--
-- @since 0.10.0.0
checkElasticSearchConnection :: ElasticSearchConfig -> IO (Either HttpException ())
checkElasticSearchConnection esConf = do
  env <- mkEsEnv esConf
  void <$> serverInfo env