//===----------------------------------------------------------------------===// // DuckDB // // duckdb/parallel/task_scheduler.hpp // // //===----------------------------------------------------------------------===// #pragma once #include "duckdb/common/common.hpp" #include "duckdb/common/mutex.hpp" #include "duckdb/common/vector.hpp" #include "duckdb/parallel/task.hpp" #include "duckdb/common/atomic.hpp" namespace duckdb { struct ConcurrentQueue; struct QueueProducerToken; class ClientContext; class DatabaseInstance; class TaskScheduler; struct SchedulerThread; struct ProducerToken { ProducerToken(TaskScheduler &scheduler, unique_ptr token); ~ProducerToken(); TaskScheduler &scheduler; unique_ptr token; mutex producer_lock; }; //! The TaskScheduler is responsible for managing tasks and threads class TaskScheduler { // timeout for semaphore wait, default 5ms constexpr static int64_t TASK_TIMEOUT_USECS = 5000; public: TaskScheduler(DatabaseInstance &db); ~TaskScheduler(); DUCKDB_API static TaskScheduler &GetScheduler(ClientContext &context); DUCKDB_API static TaskScheduler &GetScheduler(DatabaseInstance &db); unique_ptr CreateProducer(); //! Schedule a task to be executed by the task scheduler void ScheduleTask(ProducerToken &producer, shared_ptr task); //! Fetches a task from a specific producer, returns true if successful or false if no tasks were available bool GetTaskFromProducer(ProducerToken &token, shared_ptr &task); //! Run tasks forever until "marker" is set to false, "marker" must remain valid until the thread is joined void ExecuteForever(atomic *marker); //! Run tasks until `marker` is set to false, `max_tasks` have been completed, or until there are no more tasks //! available. Returns the number of tasks that were completed. idx_t ExecuteTasks(atomic *marker, idx_t max_tasks); //! Run tasks until `max_tasks` have been completed, or until there are no more tasks available void ExecuteTasks(idx_t max_tasks); //! Sets the amount of active threads executing tasks for the system; n-1 background threads will be launched. //! The main thread will also be used for execution void SetThreads(int32_t n); //! Returns the number of threads DUCKDB_API int32_t NumberOfThreads(); //! Send signals to n threads, signalling for them to wake up and attempt to execute a task void Signal(idx_t n); private: void SetThreadsInternal(int32_t n); private: DatabaseInstance &db; //! The task queue unique_ptr queue; //! Lock for modifying the thread count mutex thread_lock; //! The active background threads of the task scheduler vector> threads; //! Markers used by the various threads, if the markers are set to "false" the thread execution is stopped vector>> markers; }; } // namespace duckdb