{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE RecordWildCards #-}
{-# LANGUAGE ScopedTypeVariables #-}

module Database.Bloodhound.ElasticSearch7.Client
  ( module Reexport,
    pitSearch,
    openPointInTime,
    closePointInTime,
  )
where

import Control.Monad
import Data.Aeson
import Data.Monoid
import Database.Bloodhound.Client.Cluster
import Database.Bloodhound.Common.Client as Reexport
import qualified Database.Bloodhound.ElasticSearch7.Requests as Requests
import Database.Bloodhound.ElasticSearch7.Types
import Prelude hiding (filter, head)

-- | 'pitSearch' uses the point in time (PIT) API of elastic, for a given
-- 'IndexName'. Requires Elasticsearch >=7.10. Note that this will consume the
-- entire search result set and hold it in memory, so it may not be suitable
-- for large result sets. In that case, the point in time API should be used
-- directly with 'openPointInTime' and 'closePointInTime'.
--
-- Note that 'pitSearch' utilizes the 'search_after' parameter under the hood,
-- which requires a non-empty 'sortBody' field in the provided 'Search' value.
-- Otherwise, 'pitSearch' will fail to return all matching documents.
--
-- Failure handling:
--
-- * If opening the PIT, any search request, or closing the PIT yields an
--   error response, that error is rethrown with 'throwEsError' instead of
--   being reported as an empty result.
-- * A failed search request does not skip the close request: the PIT is
--   closed first and the search error is rethrown afterwards.
-- * A close response reporting failure, or a non-empty search page without a
--   PIT id, still aborts via 'error'.
--
-- For more information see
-- <https://www.elastic.co/guide/en/elasticsearch/reference/current/point-in-time-api.html>.
pitSearch ::
  forall a m.
  (FromJSON a, MonadBH m) =>
  IndexName ->
  Search ->
  m [Hit a]
pitSearch indexName search = do
  openResp <- openPointInTime indexName
  case openResp of
    Left e -> throwEsError e
    Right OpenPointInTimeResponse {..} -> do
      let searchPIT = search {pointInTime = Just (PointInTime oPitId "1m")}
      -- The paging loop reports request failures as a value rather than
      -- throwing, so that the close request below is still issued.
      outcome <- collectPages searchPIT []
      -- NOTE(review): the PIT is closed with the id returned by the open
      -- call, even though later search responses may carry a different id
      -- (see 'pitId'' below) -- confirm this is sufficient.
      closeResp <- closePointInTime (ClosePointInTime oPitId)
      case (outcome, closeResp) of
        -- A search failure takes precedence over whatever closing returned.
        (Left e, _) -> throwEsError e
        (Right _, Left e) -> throwEsError e
        (Right _, Right (ClosePointInTimeResponse False _)) ->
          error "failed to close point in time (PIT)"
        (Right pages, Right (ClosePointInTimeResponse True _)) ->
          -- Pages were prepended while paging; restore request order and
          -- flatten once instead of appending to a growing list per page.
          return (concat (reverse pages))
  where
    -- Fetch pages until an empty page is returned or the last hit carries no
    -- sort value. The accumulator holds the pages fetched so far, most
    -- recent first. Returns 'Left' as soon as a search request fails.
    collectPages :: Search -> [[Hit a]] -> m (ParsedEsResponse [[Hit a]])
    collectPages search' pages = do
      resp <- tryPerformBHRequest $ Requests.searchAll search'
      case resp of
        Left e -> return (Left e)
        Right searchResult ->
          case hits (searchHits searchResult) of
            [] -> return (Right pages)
            -- 'last' is safe here: the empty list is matched above.
            newHits ->
              case (hitSort $ last newHits, pitId searchResult) of
                (Nothing, Nothing) ->
                  error "no point in time (PIT) ID or last sort value"
                (Just _, Nothing) ->
                  error "no point in time (PIT) ID"
                -- Without a sort value there is nothing to pass as
                -- 'searchAfterKey', so paging stops after this page.
                (Nothing, _) -> return (Right (newHits : pages))
                (Just lastSort, Just pitId') -> do
                  let newSearch =
                        search'
                          { pointInTime = Just (PointInTime pitId' "1m"),
                            searchAfterKey = Just lastSort
                          }
                  collectPages newSearch (newHits : pages)

-- | 'openPointInTime' opens a point in time for an index given an 'IndexName'.
-- Note that the point in time should be closed with 'closePointInTime' as soon
-- as it is no longer needed.
--
-- For more information see
-- <https://www.elastic.co/guide/en/elasticsearch/reference/current/point-in-time-api.html>.
openPointInTime ::
  (MonadBH m) =>
  IndexName ->
  m (ParsedEsResponse OpenPointInTimeResponse)
openPointInTime indexName = performBHRequest $ Requests.openPointInTime indexName

-- | 'closePointInTime' closes a point in time given a 'ClosePointInTime'.
--
-- For more information see
-- <https://www.elastic.co/guide/en/elasticsearch/reference/current/point-in-time-api.html>.
closePointInTime ::
  (MonadBH m) =>
  ClosePointInTime ->
  m (ParsedEsResponse ClosePointInTimeResponse)
closePointInTime q = performBHRequest $ Requests.closePointInTime q