module Log.Backend.ElasticSearch
( ElasticSearchConfig
, esServer
, esIndex
, esShardCount
, esReplicaCount
, esMapping
, esLogin
, esLoginInsecure
, checkElasticSearchLogin
, checkElasticSearchConnection
, defaultElasticSearchConfig
, withElasticSearchLogger
) where
import Control.Applicative
import Control.Concurrent
import Control.Exception
import Control.Monad
import Control.Monad.IO.Unlift
import Data.Aeson
import Data.Aeson.Encode.Pretty
import Data.IORef
import Data.Maybe
import Data.Time
import Log
import Log.Internal.Logger
import Network.HTTP.Client
import qualified Data.Text as T
import qualified Data.Text.IO as T
import qualified Data.Text.Lazy as TL
import qualified Data.Text.Lazy.Builder as T
import qualified Data.Traversable as F
import qualified Data.Vector as V
import Log.Backend.ElasticSearch.Internal
import qualified Log.Internal.Aeson.Compat as AC
-- | Run an action with a 'Logger' that stores messages in ElasticSearch.
--
-- The logger is created with 'elasticSearchLogger' and handed to the action
-- through 'withLogger', so its lifetime is tied to that of the action.
withElasticSearchLogger :: MonadUnliftIO m => ElasticSearchConfig -> (Logger -> m r) -> m r
withElasticSearchLogger conf act = withRunInIO $ \runInIO ->
  elasticSearchLogger conf >>= \logger ->
    withLogger logger (runInIO . act)
-- | Create a 'Logger' that stores messages in ElasticSearch.
--
-- The configuration is first validated with 'checkElasticSearchLogin'. The
-- logger is built with 'mkBulkLogger'; every batch of messages is sent with
-- 'bulkIndex' to an index named @\<esIndex\>-\<YYYY-MM-DD\>@, where the date is
-- the current UTC day. The index is created with 'createIndexWithMapping' the
-- first time a batch targets it and it does not exist yet.
--
-- Messages rejected by the server are resubmitted up to two more times:
--
-- 1. with every key of their @data@ object suffixed by the JSON type of its
--    value (recursively) and the server's error stored under @__es_error@;
--
-- 2. if they fail again, with @data@ replaced by an object holding only
--    @__es_error@.
--
-- Any synchronous exception raised while writing a batch is printed to stdout
-- and the batch is retried after 10 seconds. Asynchronous exceptions are
-- propagated so that the logger thread can still be cancelled.
elasticSearchLogger
  :: ElasticSearchConfig -- ^ Configuration.
  -> IO Logger
elasticSearchLogger esConf@ElasticSearchConfig{..} = do
  checkElasticSearchLogin esConf
  env <- mkEsEnv esConf
  -- Cached server version. Filled in after the first successful lookup and
  -- reset to 'Nothing' whenever a batch fails, so it is queried again then.
  versionRef <- newIORef Nothing
  -- Name of the index the previous batch went to ('T.empty' before the first).
  indexRef <- newIORef T.empty
  mkBulkLogger "ElasticSearch" (\msgs -> do
    now <- getCurrentTime
    oldIndex <- readIORef indexRef
    retryOnException versionRef $ do
      version <- readIORef versionRef >>= \case
        Just cached -> pure cached
        Nothing -> serverInfo env >>= \case
          Left (ex :: HttpException) -> error
            $  "elasticSearchLogger: unexpected error: "
            <> show ex
            <> " (is ElasticSearch server running?)"
          Right reply -> case parseEsVersion $ responseBody reply of
            Nothing -> error
              $  "elasticSearchLogger: invalid response when parsing version number: "
              <> show reply
            Just fetched -> do
              -- Remember the version so that the server is not asked for it
              -- again on every single batch.
              writeIORef versionRef (Just fetched)
              pure fetched
      -- A reply that cannot be interpreted may mean the cached version no
      -- longer matches the server, so forget it in addition to reporting.
      let reportUnexpected body = do
            printEsError "unexpected response" body
            writeIORef versionRef Nothing
      -- One index per day: the configured name followed by the current date.
      let index = T.concat
            [ esIndex
            , "-"
            , T.pack $ formatTime defaultTimeLocale "%F" now
            ]
      when (oldIndex /= index) $ do
        ixExists <- indexExists env index
        unless ixExists $ do
          reply <- createIndexWithMapping version env esConf index
          unless (isSuccess reply) $
            printEsError "error while creating index" $ responseBody reply
        writeIORef indexRef index
      let jsonMsgs = V.fromList $ map toJsonMsg msgs
      reply <- responseBody <$> bulkIndex version env esConf index jsonMsgs
      case checkForBulkErrors jsonMsgs reply of
        Nothing -> reportUnexpected reply
        Just (hasErrors, responses) -> when hasErrors $ do
          -- First fallback: resubmit only the failed messages, with type
          -- suffixes on the keys of "data" and the error attached.
          let newMsgs = adjustFailedMessagesWith suffixDataKeys jsonMsgs responses
          newReply <- responseBody <$> bulkIndex version env esConf index newMsgs
          case checkForBulkErrors newMsgs newReply of
            Nothing -> reportUnexpected newReply
            Just (newHasErrors, newResponses) -> when newHasErrors $ do
              -- Second fallback: drop "data" altogether, keep only the error.
              let newerMsgs =
                    adjustFailedMessagesWith replaceDataWithError newMsgs newResponses
              void $ bulkIndex version env esConf index newerMsgs)
    (refreshIndex env =<< readIORef indexRef)
  where
    -- Rewrite an object so that each key carries a suffix naming the JSON type
    -- of its value, recursing into nested objects. When an error is supplied
    -- (top-level call) it is added under "__es_error". Non-objects are
    -- returned unchanged.
    suffixDataKeys :: Maybe Value -> Value -> Value
    suffixDataKeys merr (Object hm) = Object $
        case merr of
          Just err -> suffixed `AC.union` AC.singleton "__es_error" err
          Nothing  -> suffixed
      where
        suffixed = AC.foldrWithKey addSuffixed AC.empty hm

        addSuffixed k v = AC.insert (k <> suffix) (suffixDataKeys Nothing v)
          where
            suffix = case v of
              Object{} -> "_object"
              Array{}  -> "_array"
              String{} -> "_string"
              Number{} -> "_number"
              Bool{}   -> "_bool"
              Null{}   -> "_null"
    suffixDataKeys _ v = v

    -- Replace an object by one that contains only the supplied error. Values
    -- that are not objects, or calls without an error, are left unchanged.
    replaceDataWithError :: Maybe Value -> Value -> Value
    replaceDataWithError (Just err) Object{} = object [ "__es_error" .= err ]
    replaceDataWithError _ v = v

    -- Extract the "errors" flag and the per-message results (the "index" or
    -- "create" object of each element of "items") from a bulk reply. Yields
    -- 'Nothing' if the reply does not have that shape or if the number of
    -- results differs from the number of messages sent.
    checkForBulkErrors
      :: V.Vector a
      -> Value
      -> Maybe (Bool, V.Vector Object)
    checkForBulkErrors jsonMsgs replyBody = do
      Object response <- pure replyBody
      Bool hasErrors  <- "errors" `AC.lookup` response
      Array jsonItems <- "items" `AC.lookup` response
      items <- F.forM jsonItems $ \v -> do
        Object item   <- pure v
        Object index_ <- "index" `AC.lookup` item <|> "create" `AC.lookup` item
        pure index_
      guard $ V.length items == V.length jsonMsgs
      pure (hasErrors, items)

    -- Return only the messages whose response contains an "error" key, with
    -- the given function applied to their "data" field (if present) together
    -- with that error. Responses are matched to messages by position, so both
    -- vectors must have the same length (ensured by 'checkForBulkErrors').
    adjustFailedMessagesWith
      :: (Maybe err -> obj -> obj)
      -> V.Vector (AC.KeyMap obj)
      -> V.Vector (AC.KeyMap err)
      -> V.Vector (AC.KeyMap obj)
    adjustFailedMessagesWith f jsonMsgs responses =
      let failed = V.imapMaybe
            (\n item -> (n, ) <$> "error" `AC.lookup` item)
            responses
          adjust act key m = case AC.lookup key m of
            Nothing -> m
            Just v  -> AC.insert key (act v) m
      in (`V.map` failed) $ \(n, err) ->
           adjust (f $ Just err) "data" $ jsonMsgs V.! n

    -- Print a diagnostic followed by the pretty-printed JSON body to stdout.
    printEsError :: T.Text -> Value -> IO ()
    printEsError msg body =
      T.putStrLn $ "elasticSearchLogger: " <> msg <> " " <> prettyJson body

    -- Run the action until it completes without a synchronous exception.
    -- After each failure the error is printed, the cached server version is
    -- cleared and the action is retried after a 10 second pause.
    retryOnException :: forall r. IORef (Maybe EsVersion) -> IO r -> IO r
    retryOnException versionRef m = try m >>= \case
      Left (ex :: SomeException)
        -- Asynchronous exceptions (e.g. 'killThread') are requests to stop,
        -- not ElasticSearch failures; swallowing them would make the thread
        -- impossible to cancel while a request is in flight.
        | Just (_ :: SomeAsyncException) <- fromException ex -> throwIO ex
        | otherwise -> do
            putStrLn $ "ElasticSearch: unexpected error: "
              <> show ex <> ", retrying in 10 seconds"
            writeIORef versionRef Nothing
            threadDelay $ 10 * 1000000
            retryOnException versionRef m
      Right result -> return result
-- | Render a JSON value as strict 'T.Text', pretty-printed with an
-- indentation of two spaces.
prettyJson :: Value -> T.Text
prettyJson value =
  TL.toStrict (T.toLazyText (encodePrettyToTextBuilder' twoSpaceIndent value))
  where
    twoSpaceIndent = defConfig { confIndent = Spaces 2 }
-- | Convert a log message to a JSON object via its 'ToJSON' instance.
--
-- Calls 'error' if the instance yields anything other than an object.
toJsonMsg :: LogMessage -> Object
toJsonMsg msg
  | Object fields <- encoded = fields
  | otherwise = error $ "unexpected non-object: " ++ show encoded
  where
    encoded = toJSON msg
-- | Refuse to send login credentials over a connection that is not HTTPS.
--
-- Calls 'error' when 'esLogin' is set, 'esLoginInsecure' is 'False' and
-- 'esServer' does not start with @https:@. Does nothing otherwise.
checkElasticSearchLogin :: MonadIO m => ElasticSearchConfig -> m ()
checkElasticSearchLogin conf =
  liftIO $ when credentialsExposed $ error insecureLoginMsg
  where
    usesHttps :: Bool
    usesHttps = "https:" `T.isPrefixOf` esServer conf

    -- Credentials are configured and neither the explicit opt-out nor an
    -- HTTPS server address makes sending them acceptable.
    credentialsExposed :: Bool
    credentialsExposed =
      isJust (esLogin conf) && not (esLoginInsecure conf || usesHttps)

    insecureLoginMsg :: String
    insecureLoginMsg =
         "ElasticSearch: insecure login: "
      <> "Attempting to send login credentials over an insecure connection. "
      <> "Set esLoginInsecure = True to disable this check."
-- | Check that the ElasticSearch server can be reached.
--
-- Builds an environment from the configuration with 'mkEsEnv', calls
-- 'serverInfo' and discards the reply, keeping only the 'HttpException' it
-- reports on failure.
checkElasticSearchConnection :: MonadIO m => ElasticSearchConfig -> m (Either HttpException ())
checkElasticSearchConnection esConf = liftIO $ do
  env <- mkEsEnv esConf
  outcome <- serverInfo env
  pure (() <$ outcome)