-- | Elasticsearch logging back-end.
module Log.Backend.ElasticSearch
  ( ElasticSearchConfig
  , esServer
  , esIndex
  , esShardCount
  , esReplicaCount
  , esMapping
  , esLogin
  , esLoginInsecure
  , checkElasticSearchLogin
  , checkElasticSearchConnection
  , defaultElasticSearchConfig
  , withElasticSearchLogger
  ) where

import Control.Applicative
import Control.Concurrent
import Control.Exception
import Control.Monad
import Control.Monad.IO.Unlift
import Data.Aeson
import Data.Aeson.Encode.Pretty
import Data.IORef
import Data.Maybe
import Data.Time
import Log
import Log.Internal.Logger
import Network.HTTP.Client
import qualified Data.Text as T
import qualified Data.Text.IO as T
import qualified Data.Text.Lazy as TL
import qualified Data.Text.Lazy.Builder as T
import qualified Data.Traversable as F
import qualified Data.Vector as V

import Log.Backend.ElasticSearch.Internal
import qualified Log.Internal.Aeson.Compat as AC

----------------------------------------
-- | Create an 'elasticSearchLogger' for the duration of the given
-- action, and shut it down afterwards, making sure that all buffered
-- messages are actually written to the Elasticsearch store.
withElasticSearchLogger :: MonadUnliftIO m => ElasticSearchConfig -> (Logger -> m r) -> m r
withElasticSearchLogger :: forall (m :: * -> *) r.
MonadUnliftIO m =>
ElasticSearchConfig -> (Logger -> m r) -> m r
withElasticSearchLogger ElasticSearchConfig
conf Logger -> m r
act = forall (m :: * -> *) b.
MonadUnliftIO m =>
((forall a. m a -> IO a) -> IO b) -> m b
withRunInIO forall a b. (a -> b) -> a -> b
$ \forall a. m a -> IO a
unlift -> do
  Logger
logger <- ElasticSearchConfig -> IO Logger
elasticSearchLogger ElasticSearchConfig
conf
  forall r. Logger -> (Logger -> IO r) -> IO r
withLogger Logger
logger (forall a. m a -> IO a
unlift forall b c a. (b -> c) -> (a -> b) -> a -> c
. Logger -> m r
act)

-- | Start an asynchronous logger thread that stores messages using
-- Elasticsearch.
--
-- Please use 'withElasticSearchLogger' instead, which is more
-- exception-safe (see the note attached to 'mkBulkLogger').
elasticSearchLogger
  :: ElasticSearchConfig -- ^ Configuration.
  -> IO Logger
elasticSearchLogger :: ElasticSearchConfig -> IO Logger
elasticSearchLogger esConf :: ElasticSearchConfig
esConf@ElasticSearchConfig{Bool
Int
Maybe (Text, Text)
Text
esLoginInsecure :: Bool
esLogin :: Maybe (Text, Text)
esMapping :: Text
esReplicaCount :: Int
esShardCount :: Int
esIndex :: Text
esServer :: Text
esLoginInsecure :: ElasticSearchConfig -> Bool
esLogin :: ElasticSearchConfig -> Maybe (Text, Text)
esMapping :: ElasticSearchConfig -> Text
esReplicaCount :: ElasticSearchConfig -> Int
esShardCount :: ElasticSearchConfig -> Int
esIndex :: ElasticSearchConfig -> Text
esServer :: ElasticSearchConfig -> Text
..} = do
  forall (m :: * -> *). MonadIO m => ElasticSearchConfig -> m ()
checkElasticSearchLogin ElasticSearchConfig
esConf
  EsEnv
env <- ElasticSearchConfig -> IO EsEnv
mkEsEnv ElasticSearchConfig
esConf
  IORef (Maybe EsVersion)
versionRef <- forall a. a -> IO (IORef a)
newIORef forall a. Maybe a
Nothing
  IORef Text
indexRef <- forall a. a -> IO (IORef a)
newIORef Text
T.empty
  Text -> ([LogMessage] -> IO ()) -> IO () -> IO Logger
mkBulkLogger Text
"ElasticSearch" (\[LogMessage]
msgs -> do
    UTCTime
now <- IO UTCTime
getCurrentTime
    Text
oldIndex <- forall a. IORef a -> IO a
readIORef IORef Text
indexRef
    forall r. IORef (Maybe EsVersion) -> IO r -> IO r
retryOnException IORef (Maybe EsVersion)
versionRef forall a b. (a -> b) -> a -> b
$ do
      -- We need to consider version of ES because ES >= 5.0.0 and ES >= 7.0.0
      -- have slight differences in parts of API used for logging.
      EsVersion
version <- forall a. IORef a -> IO a
readIORef IORef (Maybe EsVersion)
versionRef forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \case
        Just EsVersion
version -> forall (f :: * -> *) a. Applicative f => a -> f a
pure EsVersion
version
        Maybe EsVersion
Nothing -> EsEnv -> IO (Either HttpException (Response Value))
serverInfo EsEnv
env forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \case
          Left (HttpException
ex :: HttpException) -> forall a. HasCallStack => [Char] -> a
error
            forall a b. (a -> b) -> a -> b
$  [Char]
"elasticSearchLogger: unexpected error: "
            forall a. Semigroup a => a -> a -> a
<> forall a. Show a => a -> [Char]
show HttpException
ex
            forall a. Semigroup a => a -> a -> a
<> [Char]
" (is ElasticSearch server running?)"
          Right Response Value
reply -> case Value -> Maybe EsVersion
parseEsVersion forall a b. (a -> b) -> a -> b
$ forall body. Response body -> body
responseBody Response Value
reply of
            Maybe EsVersion
Nothing -> forall a. HasCallStack => [Char] -> a
error
              forall a b. (a -> b) -> a -> b
$  [Char]
"elasticSearchLogger: invalid response when parsing version number: "
              forall a. Semigroup a => a -> a -> a
<> forall a. Show a => a -> [Char]
show Response Value
reply
            Just EsVersion
version -> forall (f :: * -> *) a. Applicative f => a -> f a
pure EsVersion
version
      -- Elasticsearch index names are additionally indexed by date so that each
      -- day is logged to a separate index to make log management easier.
      let index :: Text
index = [Text] -> Text
T.concat
            [ Text
esIndex
            , Text
"-"
            , [Char] -> Text
T.pack forall a b. (a -> b) -> a -> b
$ forall t. FormatTime t => TimeLocale -> [Char] -> t -> [Char]
formatTime TimeLocale
defaultTimeLocale [Char]
"%F" UTCTime
now
            ]
      forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (Text
oldIndex forall a. Eq a => a -> a -> Bool
/= Text
index) forall a b. (a -> b) -> a -> b
$ do
        -- There is an obvious race condition in presence of more than one
        -- logger instance running, but it's irrelevant as attempting to create
        -- index that already exists is harmless.
        Bool
ixExists <- EsEnv -> Text -> IO Bool
indexExists EsEnv
env Text
index
        forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
unless Bool
ixExists forall a b. (a -> b) -> a -> b
$ do
          Response Value
reply <- EsVersion
-> EsEnv -> ElasticSearchConfig -> Text -> IO (Response Value)
createIndexWithMapping EsVersion
version EsEnv
env ElasticSearchConfig
esConf Text
index
          forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
unless (forall a. Response a -> Bool
isSuccess Response Value
reply) forall a b. (a -> b) -> a -> b
$ do
            Text -> Value -> IO ()
printEsError Text
"error while creating index" forall a b. (a -> b) -> a -> b
$ forall body. Response body -> body
responseBody Response Value
reply
        forall a. IORef a -> a -> IO ()
writeIORef IORef Text
indexRef Text
index
      let jsonMsgs :: Vector Object
jsonMsgs = forall a. [a] -> Vector a
V.fromList forall a b. (a -> b) -> a -> b
$ forall a b. (a -> b) -> [a] -> [b]
map LogMessage -> Object
toJsonMsg [LogMessage]
msgs
      Value
reply <- forall body. Response body -> body
responseBody forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> EsVersion
-> EsEnv
-> ElasticSearchConfig
-> Text
-> Vector Object
-> IO (Response Value)
bulkIndex EsVersion
version EsEnv
env ElasticSearchConfig
esConf Text
index Vector Object
jsonMsgs
      -- Try to parse parts of reply to get information about log messages that
      -- failed to be inserted for some reason.
      case forall a. Vector a -> Value -> Maybe (Bool, Vector Object)
checkForBulkErrors Vector Object
jsonMsgs Value
reply of
        Maybe (Bool, Vector Object)
Nothing -> Text -> Value -> IO ()
printEsError Text
"unexpected response" Value
reply
        Just (Bool
hasErrors, Vector Object
responses) -> forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when Bool
hasErrors forall a b. (a -> b) -> a -> b
$ do
          -- If any message failed to be inserted because of type mismatch, go
          -- back to them, log the insertion failure and add type suffix to each
          -- of the keys in their "data" fields to work around type errors.
          let newMsgs :: Vector Object
newMsgs =
                let modifyData :: Maybe Value -> Value -> Value
                    modifyData :: Maybe Value -> Value -> Value
modifyData Maybe Value
merr (Object Object
hm) = Object -> Value
Object forall a b. (a -> b) -> a -> b
$
                      let newData :: Object
newData = forall v a. (Key -> v -> a -> a) -> a -> KeyMap v -> a
AC.foldrWithKey Key -> Value -> Object -> Object
keyAddValueTypeSuffix forall v. KeyMap v
AC.empty Object
hm
                      in case Maybe Value
merr of
                        -- We have the error message, i.e. we're at the top
                        -- level object, so add it to the data.
                        Just Value
err -> Object
newData forall v. KeyMap v -> KeyMap v -> KeyMap v
`AC.union` forall v. Key -> v -> KeyMap v
AC.singleton Key
"__es_error" Value
err
                        Maybe Value
Nothing  -> Object
newData
                    modifyData Maybe Value
_ Value
v = Value
v

                    keyAddValueTypeSuffix :: Key -> Value -> Object -> Object
keyAddValueTypeSuffix Key
k Value
v Object
acc = forall v. Key -> v -> KeyMap v -> KeyMap v
AC.insert
                      (case Value
v of
                          Object{} -> Key
k forall a. Semigroup a => a -> a -> a
<> Key
"_object"
                          Array{}  -> Key
k forall a. Semigroup a => a -> a -> a
<> Key
"_array"
                          String{} -> Key
k forall a. Semigroup a => a -> a -> a
<> Key
"_string"
                          Number{} -> Key
k forall a. Semigroup a => a -> a -> a
<> Key
"_number"
                          Bool{}   -> Key
k forall a. Semigroup a => a -> a -> a
<> Key
"_bool"
                          Null{}   -> Key
k forall a. Semigroup a => a -> a -> a
<> Key
"_null"
                      ) (Maybe Value -> Value -> Value
modifyData forall a. Maybe a
Nothing Value
v) Object
acc
                in forall err obj.
(Maybe err -> obj -> obj)
-> Vector (KeyMap obj)
-> Vector (KeyMap err)
-> Vector (KeyMap obj)
adjustFailedMessagesWith Maybe Value -> Value -> Value
modifyData Vector Object
jsonMsgs Vector Object
responses
          -- Attempt to put modified messages.
          Value
newReply <- forall body. Response body -> body
responseBody forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> EsVersion
-> EsEnv
-> ElasticSearchConfig
-> Text
-> Vector Object
-> IO (Response Value)
bulkIndex EsVersion
version EsEnv
env ElasticSearchConfig
esConf Text
index Vector Object
newMsgs
          case forall a. Vector a -> Value -> Maybe (Bool, Vector Object)
checkForBulkErrors Vector Object
newMsgs Value
newReply of
            Maybe (Bool, Vector Object)
Nothing -> Text -> Value -> IO ()
printEsError Text
"unexpected response" Value
newReply
            Just (Bool
newHasErrors, Vector Object
newResponses) -> forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when Bool
newHasErrors forall a b. (a -> b) -> a -> b
$ do
              -- If some of the messages failed again (it might happen e.g. if
              -- data contains an array with elements of different types), drop
              -- their data field.
              let newerMsgs :: Vector Object
newerMsgs =
                    let modifyData :: Maybe Value -> Value -> Value
                        modifyData :: Maybe Value -> Value -> Value
modifyData (Just Value
err) Object{} = [Pair] -> Value
object [ Key
"__es_error" forall kv v. (KeyValue kv, ToJSON v) => Key -> v -> kv
.= Value
err ]
                        modifyData Maybe Value
_ Value
v = Value
v
                    in forall err obj.
(Maybe err -> obj -> obj)
-> Vector (KeyMap obj)
-> Vector (KeyMap err)
-> Vector (KeyMap obj)
adjustFailedMessagesWith Maybe Value -> Value -> Value
modifyData Vector Object
newMsgs Vector Object
newResponses
              -- Ignore any further errors.
              forall (f :: * -> *) a. Functor f => f a -> f ()
void forall a b. (a -> b) -> a -> b
$ EsVersion
-> EsEnv
-> ElasticSearchConfig
-> Text
-> Vector Object
-> IO (Response Value)
bulkIndex EsVersion
version EsEnv
env ElasticSearchConfig
esConf Text
index Vector Object
newerMsgs)
    (EsEnv -> Text -> IO ()
refreshIndex EsEnv
env forall (m :: * -> *) a b. Monad m => (a -> m b) -> m a -> m b
=<< forall a. IORef a -> IO a
readIORef IORef Text
indexRef)
  where
    -- Process reply of bulk indexing to get responses for each index operation
    -- and check whether any insertion failed.
    checkForBulkErrors
      :: V.Vector a
      -> Value
      -> Maybe (Bool, V.Vector Object)
    checkForBulkErrors :: forall a. Vector a -> Value -> Maybe (Bool, Vector Object)
checkForBulkErrors Vector a
jsonMsgs Value
replyBody = do
      Object Object
response <- forall (f :: * -> *) a. Applicative f => a -> f a
pure Value
replyBody
      Bool Bool
hasErrors  <- Key
"errors" forall v. Key -> KeyMap v -> Maybe v
`AC.lookup` Object
response
      Array Array
jsonItems <- Key
"items"  forall v. Key -> KeyMap v -> Maybe v
`AC.lookup` Object
response
      Vector Object
items <- forall (t :: * -> *) (m :: * -> *) a b.
(Traversable t, Monad m) =>
t a -> (a -> m b) -> m (t b)
F.forM Array
jsonItems forall a b. (a -> b) -> a -> b
$ \Value
v -> do
        Object Object
item   <- forall (f :: * -> *) a. Applicative f => a -> f a
pure Value
v
        Object Object
index_ <- Key
"index" forall v. Key -> KeyMap v -> Maybe v
`AC.lookup` Object
item
          -- ES <= 2.x returns 'create' for some reason, so consider both.
          forall (f :: * -> *) a. Alternative f => f a -> f a -> f a
<|> Key
"create" forall v. Key -> KeyMap v -> Maybe v
`AC.lookup` Object
item
        forall (f :: * -> *) a. Applicative f => a -> f a
pure Object
index_
      forall (f :: * -> *). Alternative f => Bool -> f ()
guard forall a b. (a -> b) -> a -> b
$ forall a. Vector a -> Int
V.length Vector Object
items forall a. Eq a => a -> a -> Bool
== forall a. Vector a -> Int
V.length Vector a
jsonMsgs
      forall (f :: * -> *) a. Applicative f => a -> f a
pure (Bool
hasErrors, Vector Object
items)

    adjustFailedMessagesWith
      :: (Maybe err -> obj -> obj)
      -> V.Vector (AC.KeyMap obj)
      -> V.Vector (AC.KeyMap err)
      -> V.Vector (AC.KeyMap obj)
    adjustFailedMessagesWith :: forall err obj.
(Maybe err -> obj -> obj)
-> Vector (KeyMap obj)
-> Vector (KeyMap err)
-> Vector (KeyMap obj)
adjustFailedMessagesWith Maybe err -> obj -> obj
f Vector (KeyMap obj)
jsonMsgs Vector (KeyMap err)
responses =
      let failed :: Vector (Int, err)
failed = forall a b. (Int -> a -> Maybe b) -> Vector a -> Vector b
V.imapMaybe (\Int
n KeyMap err
item -> (Int
n, ) forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Key
"error" forall v. Key -> KeyMap v -> Maybe v
`AC.lookup` KeyMap err
item) Vector (KeyMap err)
responses
          adjust :: (t -> t) -> Key -> KeyMap t -> KeyMap t
adjust t -> t
act Key
key KeyMap t
m = case forall v. Key -> KeyMap v -> Maybe v
AC.lookup Key
key KeyMap t
m of
            Maybe t
Nothing -> KeyMap t
m
            Just t
v -> forall v. Key -> v -> KeyMap v -> KeyMap v
AC.insert Key
key (t -> t
act t
v) KeyMap t
m
      in (forall a b. (a -> b) -> Vector a -> Vector b
`V.map` Vector (Int, err)
failed) forall a b. (a -> b) -> a -> b
$ \(Int
n, err
err) -> forall {t}. (t -> t) -> Key -> KeyMap t -> KeyMap t
adjust (Maybe err -> obj -> obj
f forall a b. (a -> b) -> a -> b
$ forall a. a -> Maybe a
Just err
err) Key
"data" forall a b. (a -> b) -> a -> b
$ Vector (KeyMap obj)
jsonMsgs forall a. Vector a -> Int -> a
V.! Int
n

    printEsError :: Text -> Value -> IO ()
printEsError Text
msg Value
body =
      Text -> IO ()
T.putStrLn forall a b. (a -> b) -> a -> b
$ Text
"elasticSearchLogger: " forall a. Semigroup a => a -> a -> a
<> Text
msg forall a. Semigroup a => a -> a -> a
<> Text
" " forall a. Semigroup a => a -> a -> a
<> Value -> Text
prettyJson Value
body

    retryOnException :: forall r. IORef (Maybe EsVersion) -> IO r -> IO r
    retryOnException :: forall r. IORef (Maybe EsVersion) -> IO r -> IO r
retryOnException IORef (Maybe EsVersion)
versionRef IO r
m = forall e a. Exception e => IO a -> IO (Either e a)
try IO r
m forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \case
      Left (SomeException
ex::SomeException) -> do
        [Char] -> IO ()
putStrLn forall a b. (a -> b) -> a -> b
$ [Char]
"ElasticSearch: unexpected error: "
          forall a. Semigroup a => a -> a -> a
<> forall a. Show a => a -> [Char]
show SomeException
ex forall a. Semigroup a => a -> a -> a
<> [Char]
", retrying in 10 seconds"
        -- If there was an exception, ElasticSearch version might've changed, so
        -- reset it.
        forall a. IORef a -> a -> IO ()
writeIORef IORef (Maybe EsVersion)
versionRef forall a. Maybe a
Nothing
        Int -> IO ()
threadDelay forall a b. (a -> b) -> a -> b
$ Int
10 forall a. Num a => a -> a -> a
* Int
1000000
        forall r. IORef (Maybe EsVersion) -> IO r -> IO r
retryOnException IORef (Maybe EsVersion)
versionRef IO r
m
      Right r
result -> forall (m :: * -> *) a. Monad m => a -> m a
return r
result

    prettyJson :: Value -> T.Text
    prettyJson :: Value -> Text
prettyJson = Text -> Text
TL.toStrict
               forall b c a. (b -> c) -> (a -> b) -> a -> c
. Builder -> Text
T.toLazyText
               forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall a. ToJSON a => Config -> a -> Builder
encodePrettyToTextBuilder' Config
defConfig { confIndent :: Indent
confIndent = Int -> Indent
Spaces Int
2 }

    toJsonMsg :: LogMessage -> Object
    toJsonMsg :: LogMessage -> Object
toJsonMsg LogMessage
msg = case forall a. ToJSON a => a -> Value
toJSON LogMessage
msg of
      Object Object
jMsg -> Object
jMsg
      Value
value       -> forall a. HasCallStack => [Char] -> a
error forall a b. (a -> b) -> a -> b
$ [Char]
"unexpected non-object: " forall a. [a] -> [a] -> [a]
++ forall a. Show a => a -> [Char]
show Value
value

----------------------------------------

-- | Check that login credentials are specified properly.
--
-- @since 0.10.0.0
checkElasticSearchLogin :: MonadIO m => ElasticSearchConfig -> m ()
checkElasticSearchLogin :: forall (m :: * -> *). MonadIO m => ElasticSearchConfig -> m ()
checkElasticSearchLogin ElasticSearchConfig{Bool
Int
Maybe (Text, Text)
Text
esLoginInsecure :: Bool
esLogin :: Maybe (Text, Text)
esMapping :: Text
esReplicaCount :: Int
esShardCount :: Int
esIndex :: Text
esServer :: Text
esLoginInsecure :: ElasticSearchConfig -> Bool
esLogin :: ElasticSearchConfig -> Maybe (Text, Text)
esMapping :: ElasticSearchConfig -> Text
esReplicaCount :: ElasticSearchConfig -> Int
esShardCount :: ElasticSearchConfig -> Int
esIndex :: ElasticSearchConfig -> Text
esServer :: ElasticSearchConfig -> Text
..} = forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ do
  forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (forall a. Maybe a -> Bool
isJust Maybe (Text, Text)
esLogin
        Bool -> Bool -> Bool
&& Bool -> Bool
not Bool
esLoginInsecure
        Bool -> Bool -> Bool
&& Bool -> Bool
not (Text
"https:" Text -> Text -> Bool
`T.isPrefixOf` Text
esServer)) forall a b. (a -> b) -> a -> b
$
    forall a. HasCallStack => [Char] -> a
error forall a b. (a -> b) -> a -> b
$ [Char]
"ElasticSearch: insecure login: "
      forall a. Semigroup a => a -> a -> a
<> [Char]
"Attempting to send login credentials over an insecure connection. "
      forall a. Semigroup a => a -> a -> a
<> [Char]
"Set esLoginInsecure = True to disable this check."

-- | Check that we can connect to the ES server.
--
-- @since 0.10.0.0
checkElasticSearchConnection :: MonadIO m => ElasticSearchConfig -> m (Either HttpException ())
checkElasticSearchConnection :: forall (m :: * -> *).
MonadIO m =>
ElasticSearchConfig -> m (Either HttpException ())
checkElasticSearchConnection ElasticSearchConfig
esConf = forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ do
  forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap (forall a b. a -> b -> a
const ()) forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> (EsEnv -> IO (Either HttpException (Response Value))
serverInfo forall (m :: * -> *) a b. Monad m => (a -> m b) -> m a -> m b
=<< ElasticSearchConfig -> IO EsEnv
mkEsEnv ElasticSearchConfig
esConf)