//===----------------------------------------------------------------------===// // DuckDB // // duckdb/execution/reservoir_sample.hpp // // //===----------------------------------------------------------------------===// #pragma once #include "duckdb/common/common.hpp" #include "duckdb/common/random_engine.hpp" #include "duckdb/common/types/chunk_collection.hpp" #include "duckdb/common/queue.hpp" namespace duckdb { class BaseReservoirSampling { public: explicit BaseReservoirSampling(int64_t seed); BaseReservoirSampling(); void InitializeReservoir(idx_t cur_size, idx_t sample_size); void SetNextEntry(); void ReplaceElement(); //! The random generator RandomEngine random; //! Priority queue of [random element, index] for each of the elements in the sample std::priority_queue> reservoir_weights; //! The next element to sample idx_t next_index; //! The reservoir threshold of the current min entry double min_threshold; //! The reservoir index of the current min entry idx_t min_entry; //! The current count towards next index (i.e. we will replace an entry in next_index - current_count tuples) idx_t current_count; }; class BlockingSample { public: explicit BlockingSample(int64_t seed) : base_reservoir_sample(seed), random(base_reservoir_sample.random) { } virtual ~BlockingSample() { } //! Add a chunk of data to the sample virtual void AddToReservoir(DataChunk &input) = 0; //! Fetches a chunk from the sample. Note that this method is destructive and should only be used after the // sample is completely built. virtual unique_ptr GetChunk() = 0; protected: //! The reservoir sampling BaseReservoirSampling base_reservoir_sample; RandomEngine &random; }; //! The reservoir sample class maintains a streaming sample of fixed size "sample_count" class ReservoirSample : public BlockingSample { public: ReservoirSample(Allocator &allocator, idx_t sample_count, int64_t seed); //! Add a chunk of data to the sample void AddToReservoir(DataChunk &input) override; //! Fetches a chunk from the sample. Note that this method is destructive and should only be used after the //! sample is completely built. unique_ptr GetChunk() override; private: //! Replace a single element of the input void ReplaceElement(DataChunk &input, idx_t index_in_chunk); //! Fills the reservoir up until sample_count entries, returns how many entries are still required idx_t FillReservoir(DataChunk &input); private: //! The size of the reservoir sample idx_t sample_count; //! The current reservoir ChunkCollection reservoir; }; //! The reservoir sample sample_size class maintains a streaming sample of variable size class ReservoirSamplePercentage : public BlockingSample { constexpr static idx_t RESERVOIR_THRESHOLD = 100000; public: ReservoirSamplePercentage(Allocator &allocator, double percentage, int64_t seed); //! Add a chunk of data to the sample void AddToReservoir(DataChunk &input) override; //! Fetches a chunk from the sample. Note that this method is destructive and should only be used after the //! sample is completely built. unique_ptr GetChunk() override; private: void Finalize(); private: Allocator &allocator; //! The sample_size to sample double sample_percentage; //! The fixed sample size of the sub-reservoirs idx_t reservoir_sample_size; //! The current sample unique_ptr current_sample; //! The set of finished samples of the reservoir sample vector> finished_samples; //! The amount of tuples that have been processed so far idx_t current_count = 0; //! Whether or not the stream is finalized. The stream is automatically finalized on the first call to GetChunk(); bool is_finalized; }; } // namespace duckdb