8#include <condition_variable>
22#include "cxxmcp/core/timer_handle.hpp"
// Priority level attached to a posted task. post() converts the enumerator to
// int and uses it as the index into the executor's three per-priority queues,
// so the numeric values must stay within 0..2.
31enum class TaskPriority { IO_BOUND = 0, DEFAULT = 1, BACKGROUND = 2 };
37 std::size_t worker_count = 4;
38 std::size_t max_queue_size = 256;
39 std::function<void(std::exception_ptr)> exception_handler;
57 using ExceptionHandler = std::function<void(std::exception_ptr)>;
60 explicit Executor(
Options options = {}) : options_(std::move(options)) {
61 if (options_.worker_count == 0) {
62 options_.worker_count = 1;
64 for (std::size_t i = 0; i < options_.worker_count; ++i) {
65 workers_.emplace_back([
this]() { worker_loop(); });
67 timer_thread_ = std::thread([
this]() { timer_loop(); });
72 Executor(std::size_t worker_count, std::size_t max_queue_size)
73 :
Executor(worker_count, max_queue_size, ExceptionHandler{}) {}
77 Executor(std::size_t worker_count, std::size_t max_queue_size,
78 ExceptionHandler exception_handler) {
79 options_.worker_count = worker_count;
80 options_.max_queue_size = max_queue_size;
81 options_.exception_handler = std::move(exception_handler);
82 if (options_.worker_count == 0) {
83 options_.worker_count = 1;
85 for (std::size_t i = 0; i < options_.worker_count; ++i) {
86 workers_.emplace_back([
this]() { worker_loop(); });
88 timer_thread_ = std::thread([
this]() { timer_loop(); });
99 return post(std::move(task), TaskPriority::DEFAULT);
104 TaskPriority prio = TaskPriority::DEFAULT) {
106 return core::unexpected(
107 Error{1,
"executor task must be callable", {},
"executor"});
110 std::lock_guard<std::mutex> lock(mutex_);
112 return core::unexpected(
113 Error{1,
"executor is stopped", {},
"executor"});
115 const auto idx =
static_cast<int>(prio);
116 if (queues_[idx].size() >= options_.max_queue_size) {
117 return core::unexpected(
118 Error{1,
"executor queue is full", {},
"executor"});
120 queues_[idx].push_back(std::move(task));
122 work_cv_.notify_one();
128 std::function<
void()> task,
129 TaskPriority prio = TaskPriority::DEFAULT) {
130 if (!task || delay.count() <= 0) {
133 (void)
post(std::move(task), prio);
137 return post_at(std::chrono::steady_clock::now() + delay, std::move(task),
143 std::function<
void()> task,
144 TaskPriority prio = TaskPriority::DEFAULT) {
148 auto entry = std::make_shared<TimerEntry>();
150 entry->task = std::move(task);
151 entry->priority =
static_cast<int>(prio);
154 std::lock_guard<std::mutex> lock(timer_mutex_);
155 entry->id = next_timer_id_++;
158 timer_cv_.notify_one();
165 std::lock_guard<std::mutex> lock(mutex_);
173 work_cv_.notify_all();
175 std::lock_guard<std::mutex> lock(timer_mutex_);
178 decltype(timers_) empty;
181 timer_cv_.notify_all();
183 for (
auto& w : workers_) {
189 if (timer_thread_.joinable()) {
190 timer_thread_.join();
200 std::lock_guard<std::mutex> lock(mutex_);
205 for (
auto& q : queues_) {
209 work_cv_.notify_all();
211 std::lock_guard<std::mutex> lock(timer_mutex_);
213 while (!timers_.empty()) {
217 timer_cv_.notify_all();
219 for (
auto& w : workers_) {
225 if (timer_thread_.joinable()) {
226 timer_thread_.join();
233 std::function<void()> task;
235 std::unique_lock<std::mutex> lock(mutex_);
236 work_cv_.wait(lock, [
this]() {
237 return stopping_ || drain_mode_ || has_any_task();
239 if (stopping_ && !has_any_task()) {
242 if (drain_mode_ && !has_any_task()) {
245 task = dequeue_highest_priority();
250 if (options_.exception_handler) {
251 options_.exception_handler(std::current_exception());
259 std::shared_ptr<TimerEntry> next;
261 std::unique_lock<std::mutex> lock(timer_mutex_);
263 while (!timers_.empty() &&
264 timers_.top()->cancelled.load(std::memory_order_acquire)) {
267 if (timers_.empty()) {
269 [
this]() {
return stopping_ || !timers_.empty(); });
274 auto deadline = timers_.top()->when;
275 timer_cv_.wait_until(lock, deadline, [
this, deadline]() {
277 (timers_.empty() ?
true : timers_.top()->when != deadline);
284 while (!timers_.empty() &&
285 timers_.top()->cancelled.load(std::memory_order_acquire)) {
288 if (timers_.empty()) {
291 auto top = timers_.top();
292 if (top->when > std::chrono::steady_clock::now()) {
296 next = std::move(top);
299 if (next && next->task) {
300 (void)
post(std::move(next->task),
301 static_cast<TaskPriority
>(next->priority));
306 bool has_any_task()
const {
307 for (
const auto& q : queues_) {
308 if (!q.empty())
return true;
313 std::function<void()> dequeue_highest_priority() {
314 for (
auto& q : queues_) {
316 auto task = std::move(q.front());
324 struct TimerEntryCompare {
325 bool operator()(
const std::shared_ptr<TimerEntry>& a,
326 const std::shared_ptr<TimerEntry>& b)
const {
327 return a->when > b->when;
333 std::condition_variable work_cv_;
334 std::deque<std::function<void()>> queues_[3];
335 std::vector<std::thread> workers_;
336 std::atomic<bool> stopping_ =
false;
337 std::atomic<bool> drain_mode_ =
false;
339 std::mutex timer_mutex_;
340 std::condition_variable timer_cv_;
341 std::priority_queue<std::shared_ptr<TimerEntry>,
342 std::vector<std::shared_ptr<TimerEntry>>,
345 std::thread timer_thread_;
346 std::uint64_t next_timer_id_ = 1;
// Alias for Executor; presumably kept so code written against the older
// BoundedExecutor name (and its constructor signature) keeps compiling.
350using BoundedExecutor = Executor;
Thread pool executor with per-priority task queues and a deadline-ordered timer queue (a std::priority_queue keyed on the scheduled time point).
Definition executor.hpp:55
void shutdown() noexcept
Graceful shutdown: complete all queued tasks, discard timers that have not fired yet, then join the worker and timer threads.
Definition executor.hpp:163
Executor(std::size_t worker_count, std::size_t max_queue_size, ExceptionHandler exception_handler)
Backward-compatible constructor matching BoundedExecutor's signature.
Definition executor.hpp:77
TimerHandle post_after(std::chrono::milliseconds delay, std::function< void()> task, TaskPriority prio=TaskPriority::DEFAULT)
Schedule a task to run after a delay. A zero or negative delay posts the task for execution immediately instead of creating a timer.
Definition executor.hpp:127
core::Result< Unit > post(std::function< void()> task, TaskPriority prio=TaskPriority::DEFAULT)
Post a task for execution at the given priority.
Definition executor.hpp:103
void cancel_and_stop()
Cancel pending tasks and timers and stop: queued work is discarded rather than run, then the worker and timer threads are joined.
Definition executor.hpp:198
TimerHandle post_at(std::chrono::steady_clock::time_point when, std::function< void()> task, TaskPriority prio=TaskPriority::DEFAULT)
Schedule a task to run at an absolute time point.
Definition executor.hpp:142
Executor(std::size_t worker_count, std::size_t max_queue_size)
Backward-compatible constructor matching BoundedExecutor's signature.
Definition executor.hpp:72
void stop()
Backward-compatible stop (equivalent to shutdown).
Definition executor.hpp:195
core::Result< Unit > enqueue(std::function< void()> task)
Backward-compatible enqueue (equivalent to post with DEFAULT priority).
Definition executor.hpp:98
Handle to a scheduled timer that can be cancelled before it fires.
Definition timer_handle.hpp:27
Shared result and error primitives used by the public cxxmcp SDK.
std::monostate Unit
Success value for operations that only need to report failure.
Definition result.hpp:55
tl::expected< T, Error > Result
Alias for the SDK result type.
Definition result.hpp:64
Structured error returned by fallible SDK operations.
Definition result.hpp:35
Options for the Executor class.
Definition executor.hpp:36