cxxmcp 1.1.6
C++ MCP SDK
Loading...
Searching...
No Matches
executor.hpp
1// Copyright (c) 2025 [caomengxuan666]
2
3#pragma once
4
5#include <algorithm>
6#include <atomic>
7#include <chrono>
8#include <condition_variable>
9#include <cstddef>
10#include <deque>
11#include <exception>
12#include <functional>
13#include <memory>
14#include <mutex>
15#include <queue>
16#include <string>
17#include <thread>
18#include <utility>
19#include <vector>
20
22#include "cxxmcp/core/timer_handle.hpp"
23
24namespace mcp::core {
25
/// Scheduling class of a task submitted to the Executor.
///
/// The numeric value is used directly as the index of the executor's task
/// queue, and queues are scanned from index 0 upwards, so a lower value is
/// dequeued before a higher one.
enum class TaskPriority {
  IO_BOUND = 0,    ///< Served first.
  DEFAULT = 1,     ///< Used when the caller does not specify a priority.
  BACKGROUND = 2,  ///< Served only when the other two queues are empty.
};

class Executor;
34
37 std::size_t worker_count = 4;
38 std::size_t max_queue_size = 256;
39 std::function<void(std::exception_ptr)> exception_handler;
40};
41
55class Executor {
56 public:
57 using ExceptionHandler = std::function<void(std::exception_ptr)>;
59
60 explicit Executor(Options options = {}) : options_(std::move(options)) {
61 if (options_.worker_count == 0) {
62 options_.worker_count = 1;
63 }
64 for (std::size_t i = 0; i < options_.worker_count; ++i) {
65 workers_.emplace_back([this]() { worker_loop(); });
66 }
67 timer_thread_ = std::thread([this]() { timer_loop(); });
68 }
69
72 Executor(std::size_t worker_count, std::size_t max_queue_size)
73 : Executor(worker_count, max_queue_size, ExceptionHandler{}) {}
74
77 Executor(std::size_t worker_count, std::size_t max_queue_size,
78 ExceptionHandler exception_handler) {
79 options_.worker_count = worker_count;
80 options_.max_queue_size = max_queue_size;
81 options_.exception_handler = std::move(exception_handler);
82 if (options_.worker_count == 0) {
83 options_.worker_count = 1;
84 }
85 for (std::size_t i = 0; i < options_.worker_count; ++i) {
86 workers_.emplace_back([this]() { worker_loop(); });
87 }
88 timer_thread_ = std::thread([this]() { timer_loop(); });
89 }
90
91 Executor(const Executor&) = delete;
92 Executor& operator=(const Executor&) = delete;
93
94 ~Executor() { shutdown(); }
95
98 core::Result<Unit> enqueue(std::function<void()> task) {
99 return post(std::move(task), TaskPriority::DEFAULT);
100 }
101
103 core::Result<Unit> post(std::function<void()> task,
104 TaskPriority prio = TaskPriority::DEFAULT) {
105 if (!task) {
106 return core::unexpected(
107 Error{1, "executor task must be callable", {}, "executor"});
108 }
109 {
110 std::lock_guard<std::mutex> lock(mutex_);
111 if (stopping_) {
112 return core::unexpected(
113 Error{1, "executor is stopped", {}, "executor"});
114 }
115 const auto idx = static_cast<int>(prio);
116 if (queues_[idx].size() >= options_.max_queue_size) {
117 return core::unexpected(
118 Error{1, "executor queue is full", {}, "executor"});
119 }
120 queues_[idx].push_back(std::move(task));
121 }
122 work_cv_.notify_one();
123 return Unit{};
124 }
125
127 TimerHandle post_after(std::chrono::milliseconds delay,
128 std::function<void()> task,
129 TaskPriority prio = TaskPriority::DEFAULT) {
130 if (!task || delay.count() <= 0) {
131 // Fire immediately if no delay
132 if (task) {
133 (void)post(std::move(task), prio);
134 }
135 return TimerHandle{};
136 }
137 return post_at(std::chrono::steady_clock::now() + delay, std::move(task),
138 prio);
139 }
140
142 TimerHandle post_at(std::chrono::steady_clock::time_point when,
143 std::function<void()> task,
144 TaskPriority prio = TaskPriority::DEFAULT) {
145 if (!task) {
146 return TimerHandle{};
147 }
148 auto entry = std::make_shared<TimerEntry>();
149 entry->when = when;
150 entry->task = std::move(task);
151 entry->priority = static_cast<int>(prio);
152 TimerHandle handle(entry);
153 {
154 std::lock_guard<std::mutex> lock(timer_mutex_);
155 entry->id = next_timer_id_++;
156 timers_.push(entry);
157 }
158 timer_cv_.notify_one();
159 return handle;
160 }
161
163 void shutdown() noexcept {
164 {
165 std::lock_guard<std::mutex> lock(mutex_);
166 if (stopping_) {
167 return;
168 }
169 stopping_ = true;
170 drain_mode_ = true;
171 }
172 // Wake workers to drain remaining tasks
173 work_cv_.notify_all();
174 {
175 std::lock_guard<std::mutex> lock(timer_mutex_);
176 // Clear pending timers so timer thread can exit without dispatching
177 // stale tasks into a shutting-down executor.
178 decltype(timers_) empty;
179 timers_.swap(empty);
180 }
181 timer_cv_.notify_all();
182
183 for (auto& w : workers_) {
184 if (w.joinable()) {
185 w.join();
186 }
187 }
188 workers_.clear();
189 if (timer_thread_.joinable()) {
190 timer_thread_.join();
191 }
192 }
193
195 void stop() { shutdown(); }
196
199 {
200 std::lock_guard<std::mutex> lock(mutex_);
201 if (stopping_) {
202 return;
203 }
204 stopping_ = true;
205 for (auto& q : queues_) {
206 q.clear();
207 }
208 }
209 work_cv_.notify_all();
210 {
211 std::lock_guard<std::mutex> lock(timer_mutex_);
212 // Pop all pending timers so the timer thread wakes and exits.
213 while (!timers_.empty()) {
214 timers_.pop();
215 }
216 }
217 timer_cv_.notify_all();
218
219 for (auto& w : workers_) {
220 if (w.joinable()) {
221 w.join();
222 }
223 }
224 workers_.clear();
225 if (timer_thread_.joinable()) {
226 timer_thread_.join();
227 }
228 }
229
230 private:
231 void worker_loop() {
232 while (true) {
233 std::function<void()> task;
234 {
235 std::unique_lock<std::mutex> lock(mutex_);
236 work_cv_.wait(lock, [this]() {
237 return stopping_ || drain_mode_ || has_any_task();
238 });
239 if (stopping_ && !has_any_task()) {
240 return;
241 }
242 if (drain_mode_ && !has_any_task()) {
243 return;
244 }
245 task = dequeue_highest_priority();
246 }
247 try {
248 task();
249 } catch (...) {
250 if (options_.exception_handler) {
251 options_.exception_handler(std::current_exception());
252 }
253 }
254 }
255 }
256
257 void timer_loop() {
258 while (true) {
259 std::shared_ptr<TimerEntry> next;
260 {
261 std::unique_lock<std::mutex> lock(timer_mutex_);
262 // Purge cancelled entries from the top
263 while (!timers_.empty() &&
264 timers_.top()->cancelled.load(std::memory_order_acquire)) {
265 timers_.pop();
266 }
267 if (timers_.empty()) {
268 timer_cv_.wait(lock,
269 [this]() { return stopping_ || !timers_.empty(); });
270 if (stopping_) {
271 return;
272 }
273 } else {
274 auto deadline = timers_.top()->when;
275 timer_cv_.wait_until(lock, deadline, [this, deadline]() {
276 return stopping_ ||
277 (timers_.empty() ? true : timers_.top()->when != deadline);
278 });
279 if (stopping_) {
280 return;
281 }
282 }
283 // Purge cancelled entries again after waking
284 while (!timers_.empty() &&
285 timers_.top()->cancelled.load(std::memory_order_acquire)) {
286 timers_.pop();
287 }
288 if (timers_.empty()) {
289 continue;
290 }
291 auto top = timers_.top();
292 if (top->when > std::chrono::steady_clock::now()) {
293 continue; // Not yet due, loop back to wait_until
294 }
295 timers_.pop();
296 next = std::move(top);
297 }
298 // Dispatch the timer task into the appropriate priority queue
299 if (next && next->task) {
300 (void)post(std::move(next->task),
301 static_cast<TaskPriority>(next->priority));
302 }
303 }
304 }
305
306 bool has_any_task() const {
307 for (const auto& q : queues_) {
308 if (!q.empty()) return true;
309 }
310 return false;
311 }
312
313 std::function<void()> dequeue_highest_priority() {
314 for (auto& q : queues_) {
315 if (!q.empty()) {
316 auto task = std::move(q.front());
317 q.pop_front();
318 return task;
319 }
320 }
321 return nullptr;
322 }
323
324 struct TimerEntryCompare {
325 bool operator()(const std::shared_ptr<TimerEntry>& a,
326 const std::shared_ptr<TimerEntry>& b) const {
327 return a->when > b->when; // min-heap: earliest first
328 }
329 };
330
331 Options options_;
332 std::mutex mutex_;
333 std::condition_variable work_cv_;
334 std::deque<std::function<void()>> queues_[3];
335 std::vector<std::thread> workers_;
336 std::atomic<bool> stopping_ = false;
337 std::atomic<bool> drain_mode_ = false;
338
339 std::mutex timer_mutex_;
340 std::condition_variable timer_cv_;
341 std::priority_queue<std::shared_ptr<TimerEntry>,
342 std::vector<std::shared_ptr<TimerEntry>>,
343 TimerEntryCompare>
344 timers_;
345 std::thread timer_thread_;
346 std::uint64_t next_timer_id_ = 1;
347};
348
350using BoundedExecutor = Executor;
351
352} // namespace mcp::core
Thread pool executor with priority queues and a min-heap timer queue.
Definition executor.hpp:55
void shutdown() noexcept
Graceful shutdown: complete all queued tasks, discard pending timers, then join threads.
Definition executor.hpp:163
Executor(std::size_t worker_count, std::size_t max_queue_size, ExceptionHandler exception_handler)
Backward-compatible constructor matching BoundedExecutor's signature.
Definition executor.hpp:77
TimerHandle post_after(std::chrono::milliseconds delay, std::function< void()> task, TaskPriority prio=TaskPriority::DEFAULT)
Schedule a task to run after a delay.
Definition executor.hpp:127
core::Result< Unit > post(std::function< void()> task, TaskPriority prio=TaskPriority::DEFAULT)
Post a task for execution at the given priority.
Definition executor.hpp:103
void cancel_and_stop()
Cancel pending tasks and stop immediately.
Definition executor.hpp:198
TimerHandle post_at(std::chrono::steady_clock::time_point when, std::function< void()> task, TaskPriority prio=TaskPriority::DEFAULT)
Schedule a task to run at an absolute time point.
Definition executor.hpp:142
Executor(std::size_t worker_count, std::size_t max_queue_size)
Backward-compatible constructor matching BoundedExecutor's signature.
Definition executor.hpp:72
void stop()
Backward-compatible stop (equivalent to shutdown).
Definition executor.hpp:195
core::Result< Unit > enqueue(std::function< void()> task)
Backward-compatible enqueue (equivalent to post with DEFAULT priority).
Definition executor.hpp:98
Handle to a scheduled timer that can be cancelled before it fires.
Definition timer_handle.hpp:27
Shared result and error primitives used by the public cxxmcp SDK.
std::monostate Unit
Success value for operations that only need to report failure.
Definition result.hpp:55
tl::expected< T, Error > Result
Alias for the SDK result type.
Definition result.hpp:64
Structured error returned by fallible SDK operations.
Definition result.hpp:35
Options for the Executor class.
Definition executor.hpp:36