{-| This module exports "Conduit" interfaces to ElasticSearch. It is totally experimental. -} module Data.Conduit.ElasticSearch (esSink) where import Prelude hiding (catch) import Control.Exception import Data.Conduit import qualified Data.Conduit.List as CL import Network.HTTP.Conduit import Data.Aeson import qualified Data.ByteString as BS import qualified Data.ByteString.Lazy as BSL import Data.Time import qualified Data.Text.Lazy.Encoding as E import Data.Text.Format (format,left) import Logstash.Message import Control.Monad.IO.Class import Control.Concurrent (threadDelay) import qualified Data.HashMap.Strict as HM safeQuery :: Request (ResourceT IO) -> IO (Response BSL.ByteString) safeQuery req = catch (withManager $ httpLbs req) (\e -> print (e :: SomeException) >> threadDelay 500000 >> safeQuery req) -- | Takes JSONifiable values, and returns the result of the ES request -- along with the value in case of errors, or ES's values in case of -- success esSink :: (MonadResource m) => Maybe (Request m) -- ^ Defaults parameters for the http request to ElasticSearch. Use "Nothing" for defaults. -> BS.ByteString -- ^ Hostname of the ElasticSearch server -> Int -- ^ Port of the HTTP interface (usually 9200) -> Conduit LogstashMessage m (Either (LogstashMessage, Value) Value) esSink r h p = CL.mapM doIndexA where defR1 = case r of Just x -> x Nothing -> def defR2 = defR1 { host = h , port = p , method = "POST" , checkStatus = (\_ _ -> Nothing) } doIndexA :: (MonadResource m) => LogstashMessage -> m (Either (LogstashMessage, Value) Value) doIndexA input = case logstashTime input of Nothing -> return $! Left (input, object [ "error" .= String "Time was not supplied" ]) Just (UTCTime day _) -> do let (y,m,d) = toGregorian day req = defR2 { path = BSL.toStrict (E.encodeUtf8 (format "/logstash-{}-{}-{}/{}/" (y, left 2 '0' m, left 2 '0' d, logstashType input))) , requestBody = RequestBodyLBS (encode input) } res <- liftIO $ safeQuery req case decode (responseBody res) of Just (Object hh) -> case HM.lookup "ok" hh of Just (Bool True) -> return $! Right (Object hh) _ -> return $! Left (input, Object hh) Just j -> return $! Left (input, j) Nothing -> return $! Left (input, object [ "error" .= String "Could not decode", "content" .= responseBody res ])