{-# LANGUAGE DeriveGeneric #-} module Log.Backend.ElasticSearch.Internal ( ElasticSearchConfig(..) , defaultElasticSearchConfig -- * ES version , EsVersion(..) , parseEsVersion , esV5, esV7 -- * ES commands , serverInfo , indexExists , createIndexWithMapping , bulkIndex , refreshIndex -- * ES communication details , EsEnv(..) , mkEsEnv , dispatch , decodeReply , isSuccess ) where import Control.Applicative import Control.Exception import Control.Monad import Data.Aeson import Data.Ix (inRange) import Data.Maybe import Data.Semigroup import GHC.Generics (Generic) import Network.HTTP.Client import Network.HTTP.Types import Network.HTTP.Client.TLS (tlsManagerSettings) import Prelude import qualified Data.ByteString.Builder as BSB import qualified Data.ByteString.Lazy.Char8 as BSL import qualified Data.HashMap.Strict as H import qualified Data.Text as T import qualified Data.Text.Encoding as T import qualified Data.Vector as V -- | Configuration for the Elasticsearch 'Logger'. See -- -- for the explanation of terms. data ElasticSearchConfig = ElasticSearchConfig { esServer :: !T.Text -- ^ Elasticsearch server address. , esIndex :: !T.Text -- ^ Elasticsearch index name. , esShardCount :: !Int -- ^ Elasticsearch shard count for the named index. -- -- @since 0.10.0.0 , esReplicaCount :: !Int -- ^ Elasticsearch replica count for the named index. -- -- @since 0.10.0.0 , esMapping :: !T.Text -- ^ Elasticsearch mapping name (unused with ES >= 7.0.0) , esLogin :: Maybe (T.Text, T.Text) -- ^ Elasticsearch basic authentication username and password. , esLoginInsecure :: !Bool -- ^ Allow basic authentication over non-TLS connections. } deriving (Eq, Show, Generic) -- | Sensible defaults for 'ElasticSearchConfig'. defaultElasticSearchConfig :: ElasticSearchConfig defaultElasticSearchConfig = ElasticSearchConfig { esServer = "http://localhost:9200" , esIndex = "logs" , esShardCount = 4 , esReplicaCount = 1 , esMapping = "log" , esLogin = Nothing , esLoginInsecure = False } ---------------------------------------- -- ES communication -- Most of the below code is taken from the bloodhound library -- (https://github.com/bitemyapp/bloodhound). data EsVersion = EsVersion !Int !Int !Int deriving (Eq, Ord) parseEsVersion :: Value -> Maybe EsVersion parseEsVersion js = do Object props <- pure js Object version <- "version" `H.lookup` props String number <- "number" `H.lookup` version [v1, v2, v3] <- mapM (maybeRead . T.unpack) $ T.splitOn "." number pure $ EsVersion v1 v2 v3 where maybeRead s = do [(v, "")] <- pure $ reads s pure v -- | Minimum version with split 'string' type. esV5 :: EsVersion esV5 = EsVersion 5 0 0 -- | Minimum version without mapping types. esV7 :: EsVersion esV7 = EsVersion 7 0 0 ---------------------------------------- -- | Check the ElasticSearch server for info. Result can be fed to -- 'parseEsVersion' to determine version of the server. serverInfo :: EsEnv -> IO (Either HttpException (Response Value)) serverInfo env = try $ dispatch env methodGet [] Nothing -- | Check that given index exists. indexExists :: EsEnv -> T.Text -> IO Bool indexExists env index = isSuccess <$> dispatch env methodHead [index] Nothing -- | Create an index with given mapping. createIndexWithMapping :: EsVersion -> EsEnv -> ElasticSearchConfig -> T.Text -> IO (Response Value) createIndexWithMapping version env ElasticSearchConfig{..} index = do dispatch env methodPut [index] . Just . encode $ object [ "settings" .= object [ "number_of_shards" .= esShardCount , "number_of_replicas" .= esReplicaCount ] , "mappings" .= if version >= esV7 then logsMapping else object [ esMapping .= logsMapping ] ] where logsMapping = object [ "properties" .= object [ "insertion_order" .= object [ "type" .= ("integer"::T.Text) ] , "insertion_time" .= object [ "type" .= ("date"::T.Text) , "format" .= ("date_time"::T.Text) ] , "time" .= object [ "type" .= ("date"::T.Text) , "format" .= ("date_time"::T.Text) ] , "domain" .= object [ "type" .= textTy ] , "level" .= object [ "type" .= textTy ] , "component" .= object [ "type" .= textTy ] , "message" .= object [ "type" .= textTy ] ] ] where textTy :: T.Text textTy = if version >= esV5 then "text" else "string" -- Index multiple log messages. bulkIndex :: EsVersion -> EsEnv -> ElasticSearchConfig -> T.Text -> V.Vector (H.HashMap T.Text Value) -> IO (Response Value) bulkIndex version env conf index objs = do dispatch env methodPost route . Just . BSB.toLazyByteString $ foldMap ixOp objs where route = if version >= esV7 then [index, "_bulk"] else [index, esMapping conf, "_bulk"] ixOp obj = ixCmd <> BSB.char8 '\n' <> BSB.lazyByteString (encode $ Object obj) <> BSB.char8 '\n' where ixCmd = BSB.lazyByteString . encode $ object [ "index" .= object [] ] -- Refresh given index. refreshIndex :: EsEnv -> T.Text -> IO () refreshIndex env index = void $ dispatch env methodPost [index, "_refresh"] Nothing ---------------------------------------- data EsEnv = EsEnv { envServer :: !T.Text , envManager :: !Manager , envRequestHook :: !(Request -> Request) } mkEsEnv :: ElasticSearchConfig -> IO EsEnv mkEsEnv ElasticSearchConfig{..} = do envManager <- newManager tlsManagerSettings let envServer = esServer envRequestHook = maybe id mkAuthHook esLogin pure EsEnv{..} where mkAuthHook (u, p) = applyBasicAuth (T.encodeUtf8 u) (T.encodeUtf8 p) ---------------------------------------- dispatch :: EsEnv -> Method -> [T.Text] -> Maybe BSL.ByteString -> IO (Response Value) dispatch EsEnv{..} dMethod url body = do initReq <- parseRequest $ T.unpack $ T.intercalate "/" $ envServer : url let req = envRequestHook . setRequestIgnoreStatus $ initReq { method = dMethod , requestBody = RequestBodyLBS $ fromMaybe BSL.empty body , requestHeaders = ("Content-Type", "application/json") : requestHeaders initReq } fmap decodeReply <$> httpLbs req envManager decodeReply :: BSL.ByteString -> Value decodeReply bs = case eitherDecode' bs of Right js -> js Left err -> object ["decoding_error" .= err] isSuccess :: Response a -> Bool isSuccess = statusCheck (inRange (200, 299)) where statusCheck :: (Int -> Bool) -> Response a -> Bool statusCheck p = p . statusCode . responseStatus