cxxmcp 1.1.6
C++ MCP SDK
Loading...
Searching...
No Matches
request.hpp
Go to the documentation of this file.
1// Copyright (c) 2025 [caomengxuan666]
2
3#pragma once
4
7
8#include <chrono>
9#include <cstddef>
10#include <exception>
11#include <functional>
12#include <memory>
13#include <mutex>
14#include <optional>
15#include <string>
16#include <unordered_map>
17#include <utility>
18
20#include "cxxmcp/core/async_result.hpp"
21#include "cxxmcp/core/executor.hpp"
23#include "cxxmcp/error.hpp"
25
26namespace mcp {
27
34 std::size_t worker_count = 4;
35 std::size_t max_queue_size = 256;
36};
37
38namespace detail {
39
41 std::mutex mutex;
43 std::unique_ptr<core::Executor> executor;
44};
45
46inline RequestExecutorState& request_executor_state() {
47 static RequestExecutorState state;
48 return state;
49}
50
51inline core::Error request_executor_error(std::string message,
52 std::string detail = {}) {
53 return core::Error{static_cast<int>(protocol::ErrorCode::InternalError),
54 std::move(message), std::move(detail), "request"};
55}
56
57inline core::Executor& request_executor() {
58 auto& state = request_executor_state();
59 std::lock_guard lock(state.mutex);
60 if (!state.executor) {
61 state.executor = std::make_unique<core::Executor>(core::Executor::Options{
62 state.options.worker_count, state.options.max_queue_size});
63 }
64 return *state.executor;
65}
66
68 mutable std::mutex mutex;
69 std::optional<core::Error> terminal_error;
70 std::shared_ptr<detail::CancellationRegistration> cancellation_registration;
71 bool cancel_sent = false;
72};
73
74} // namespace detail
75
82 RequestExecutorOptions options) {
83 if (options.worker_count == 0) {
84 return mcp::core::unexpected(detail::request_executor_error(
85 "request executor worker_count must be greater than zero"));
86 }
87 if (options.max_queue_size == 0) {
88 return mcp::core::unexpected(detail::request_executor_error(
89 "request executor max_queue_size must be greater than zero"));
90 }
91
92 auto& state = detail::request_executor_state();
93 std::lock_guard lock(state.mutex);
94 if (state.executor) {
95 return mcp::core::unexpected(detail::request_executor_error(
96 "request executor is already initialized"));
97 }
98 state.options = options;
99 return core::Unit{};
100}
101
105 std::optional<std::chrono::milliseconds> timeout;
106
108 std::optional<protocol::Json> meta;
109
112 std::optional<CancellationToken> cancellation_token;
113
115 std::unordered_map<std::string, std::string> headers;
116
119 std::optional<std::string> protocol_version;
120};
121
122template <class T>
124 public:
125 using ResultType = core::Result<T>;
126 using CancelCallback =
127 std::function<core::Result<core::Unit>(std::string reason)>;
128
129 RequestHandle() = default;
130
133 ResultType result) {
134 auto async_result = std::make_shared<core::AsyncResult<ResultType>>();
135 async_result->set_value(std::move(result));
136 return RequestHandle(std::move(request_id), std::nullopt, std::nullopt, {},
137 std::move(async_result));
138 }
139
140 static RequestHandle spawn(protocol::RequestId request_id,
141 std::optional<std::chrono::milliseconds> timeout,
142 std::optional<CancellationToken> cancellation,
143 CancelCallback cancel,
144 std::function<ResultType()> task) {
145 auto async_result = std::make_shared<core::AsyncResult<ResultType>>();
146 if (!task) {
147 async_result->set_value(
148 mcp::core::unexpected(errors::request_task_missing()));
149 return RequestHandle(std::move(request_id), std::move(timeout),
150 std::move(cancellation), std::move(cancel),
151 std::move(async_result));
152 }
153
154 const auto queued = detail::request_executor().post(
155 [async_result, task = std::move(task)]() mutable {
156 try {
157 async_result->set_value(task());
158 } catch (const std::exception& ex) {
159 async_result->set_value(mcp::core::unexpected(
160 errors::request_worker_exception(ex.what())));
161 } catch (...) {
162 async_result->set_value(mcp::core::unexpected(
163 errors::request_worker_unknown_exception()));
164 }
165 },
166 core::TaskPriority::IO_BOUND);
167 if (!queued) {
168 async_result->set_value(mcp::core::unexpected(queued.error()));
169 }
170
171 auto handle = RequestHandle(std::move(request_id), std::move(timeout),
172 std::move(cancellation), std::move(cancel),
173 std::move(async_result));
174 // Register cancellation observer if a real cancellation source is
175 // configured.
176 if (handle.cancellation_.has_value() &&
177 handle.cancellation_->cancellable()) {
178 handle.start_cancellation_observer();
179 }
180 return handle;
181 }
182
183 const protocol::RequestId& request_id() const noexcept { return request_id_; }
184
185 const std::optional<std::chrono::milliseconds>& timeout() const noexcept {
186 return timeout_;
187 }
188
189 const std::optional<CancellationToken>& cancellation_token() const noexcept {
190 return cancellation_;
191 }
192
201 if (!async_result_) {
202 return mcp::core::unexpected(errors::request_state_missing());
203 }
204
205 if (const auto terminal = terminal_error(); terminal.has_value()) {
206 return mcp::core::unexpected(*terminal);
207 }
208
209 // Fast path: no timeout, no cancellation
210 if (!timeout_.has_value() && !cancellation_.has_value()) {
211 return read_from_async_result(async_result_->wait());
212 }
213
214 // Timeout-only path
215 if (timeout_.has_value() && !cancellation_.has_value()) {
216 auto result = async_result_->wait_for(*timeout_);
217 if (const auto terminal = terminal_error(); terminal.has_value()) {
218 return mcp::core::unexpected(*terminal);
219 }
220 if (result) {
221 return read_from_async_result(*result);
222 }
223 return fail_terminal("request timeout",
224 errors::request_timed_out(*timeout_));
225 }
226
227 // Cancellation path (with optional timeout). The token observer will set a
228 // terminal error and cancel the async_result_ when the token fires.
229 if (timeout_.has_value()) {
230 auto result = async_result_->wait_for(*timeout_);
231 if (const auto terminal = terminal_error(); terminal.has_value()) {
232 return mcp::core::unexpected(*terminal);
233 }
234 if (result) {
235 return read_from_async_result(*result);
236 }
237 // Check if cancellation fired
238 if (cancellation_.has_value() && cancellation_->cancelled()) {
239 return fail_terminal("request cancelled", errors::request_cancelled());
240 }
241 return fail_terminal("request timeout",
242 errors::request_timed_out(*timeout_));
243 }
244
245 // No timeout, just cancellation -- watcher delivers via async_result_
246 auto result = async_result_->wait();
247 if (const auto terminal = terminal_error(); terminal.has_value()) {
248 return mcp::core::unexpected(*terminal);
249 }
250 return read_from_async_result(result);
251 }
252
253 core::Result<core::Unit> cancel(std::string reason = {}) const {
254 if (!cancel_) {
255 return core::Unit{};
256 }
257 return cancel_(std::move(reason));
258 }
259
260 private:
261 RequestHandle(protocol::RequestId request_id,
262 std::optional<std::chrono::milliseconds> timeout,
263 std::optional<CancellationToken> cancellation,
264 CancelCallback cancel,
265 std::shared_ptr<core::AsyncResult<ResultType>> async_result)
266 : request_id_(std::move(request_id)),
267 timeout_(std::move(timeout)),
268 cancellation_(std::move(cancellation)),
269 cancel_(std::move(cancel)),
270 async_result_(std::move(async_result)),
271 control_(std::make_shared<detail::RequestHandleControl>()) {}
272
275 void start_cancellation_observer() const {
276 if (!cancellation_.has_value() || !async_result_) return;
277
278 std::weak_ptr<detail::RequestHandleControl> weak_control = control_;
279 auto async_result = async_result_;
280 auto cancel_token = *cancellation_;
281 auto cancel_cb = cancel_;
282
283 auto registration = std::make_shared<detail::CancellationRegistration>(
284 detail::register_cancellation_callback(
285 cancel_token, [weak_control, async_result, cancel_cb]() {
286 bool send_cancel = false;
287 if (auto control = weak_control.lock()) {
288 std::shared_ptr<detail::CancellationRegistration> registration;
289 {
290 std::lock_guard lock(control->mutex);
291 if (!control->terminal_error.has_value()) {
292 control->terminal_error = errors::request_cancelled();
293 }
294 if (!control->cancel_sent) {
295 control->cancel_sent = true;
296 send_cancel = true;
297 }
298 registration = std::move(control->cancellation_registration);
299 }
300 }
301 async_result->cancel("request cancelled");
302 if (send_cancel && cancel_cb) {
303 (void)cancel_cb("request cancelled");
304 }
305 }));
306
307 if (registration->active() && control_) {
308 {
309 std::lock_guard lock(control_->mutex);
310 control_->cancellation_registration = registration;
311 }
312 async_result_->then([weak_control](const auto&) {
313 if (auto control = weak_control.lock()) {
314 std::shared_ptr<detail::CancellationRegistration> registration;
315 {
316 std::lock_guard lock(control->mutex);
317 registration = std::move(control->cancellation_registration);
318 }
319 }
320 return core::Unit{};
321 });
322 }
323 }
324
325 std::optional<core::Error> terminal_error() const {
326 if (!control_) {
327 return std::nullopt;
328 }
329 std::lock_guard lock(control_->mutex);
330 return control_->terminal_error;
331 }
332
333 core::Result<T> read_from_async_result(
334 const core::Result<ResultType>& outer) const noexcept {
335 if (!outer) {
336 return mcp::core::unexpected(outer.error());
337 }
338 return *outer;
339 }
340
341 core::Result<T> read_from_async_result(
342 const ResultType& inner) const noexcept {
343 return inner;
344 }
345
346 core::Result<T> fail_terminal(std::string reason, core::Error error) const {
347 bool send_cancel = false;
348 core::Error terminal = error;
349 std::shared_ptr<detail::CancellationRegistration> registration;
350 if (control_) {
351 std::lock_guard lock(control_->mutex);
352 if (!control_->terminal_error.has_value()) {
353 control_->terminal_error = std::move(error);
354 }
355 terminal = *control_->terminal_error;
356 if (!control_->cancel_sent) {
357 control_->cancel_sent = true;
358 send_cancel = true;
359 }
360 registration = std::move(control_->cancellation_registration);
361 } else {
362 send_cancel = true;
363 }
364
365 if (send_cancel) {
366 (void)cancel(std::move(reason));
367 }
368 return mcp::core::unexpected(std::move(terminal));
369 }
370
371 protocol::RequestId request_id_;
372 std::optional<std::chrono::milliseconds> timeout_;
373 std::optional<CancellationToken> cancellation_;
374 CancelCallback cancel_;
375 std::shared_ptr<core::AsyncResult<ResultType>> async_result_;
376 std::shared_ptr<detail::RequestHandleControl> control_;
377};
378
379} // namespace mcp
Cooperative cancellation primitives shared by SDK lifecycle APIs.
Definition request.hpp:123
static RequestHandle ready(protocol::RequestId request_id, ResultType result)
Creates a handle whose result is already available.
Definition request.hpp:132
core::Result< T > await_response() const
Waits for the request result.
Definition request.hpp:200
Stable public error helpers for SDK request and dispatch paths.
Shared JSON, JSON-RPC, error, cancellation, and progress model types.
std::variant< std::int64_t, std::string > RequestId
JSON-RPC request or response identifier.
Definition types.hpp:56
core::Result< core::Unit > configure_request_executor(RequestExecutorOptions options)
Configures the process-wide background request executor.
Definition request.hpp:81
Shared result and error primitives used by the public cxxmcp SDK.
std::monostate Unit
Success value for operations that only need to report failure.
Definition result.hpp:55
tl::expected< T, Error > Result
Alias for the SDK result type.
Definition result.hpp:64
constexpr auto unexpected(E &&value)
Creates an unexpected result value for the active expected backend.
Definition result.hpp:24
Configuration for the background request executor.
Definition request.hpp:33
Options for an outbound SDK request.
Definition request.hpp:103
std::optional< CancellationToken > cancellation_token
Optional cooperative cancellation token observed while awaiting the response.
Definition request.hpp:112
std::optional< protocol::Json > meta
Optional protocol metadata attached to the request envelope.
Definition request.hpp:108
std::optional< std::chrono::milliseconds > timeout
Optional timeout applied when awaiting the response.
Definition request.hpp:105
std::optional< std::string > protocol_version
Optional override for the MCP-Protocol-Version header sent on this request.
Definition request.hpp:119
std::unordered_map< std::string, std::string > headers
Per-request HTTP headers merged into the transport layer.
Definition request.hpp:115
Structured error returned by fallible SDK operations.
Definition result.hpp:35
Definition request.hpp:40
Definition request.hpp:67