// Type-erased callable used to cancel a request. It receives the cancellation
// reason by value and returns a core::Result<core::Unit>.
126 using CancelCallback =
127 std::function<core::Result<core::Unit>(std::string reason)>;
// Build a handle whose result is already known: a fresh AsyncResult is
// allocated and `result` is stored in it immediately.
134 auto async_result = std::make_shared<core::AsyncResult<ResultType>>();
135 async_result->set_value(std::move(result));
// The handle is created with no timeout, no cancellation token and an empty
// cancel callback.
136 return RequestHandle(std::move(request_id), std::nullopt, std::nullopt, {},
137 std::move(async_result));
// Remaining parameters: optional timeout, optional cancellation token, the
// cancel callback, and the task that produces the ResultType value.
141 std::optional<std::chrono::milliseconds> timeout,
142 std::optional<CancellationToken> cancellation,
143 CancelCallback cancel,
144 std::function<ResultType()> task) {
// Shared state that receives either the task's value or an error.
145 auto async_result = std::make_shared<core::AsyncResult<ResultType>>();
// Early-out path: the result is set to a request_task_missing error and a
// handle is returned right away.
// NOTE(review): presumably taken only when `task` is empty — confirm.
147 async_result->set_value(
148 mcp::core::unexpected(errors::request_task_missing()));
149 return RequestHandle(std::move(request_id), std::move(timeout),
150 std::move(cancellation), std::move(cancel),
151 std::move(async_result));
// Post the work to the request executor with IO_BOUND priority. The lambda
// shares ownership of async_result and owns the moved-in task.
154 const auto queued = detail::request_executor().post(
155 [async_result, task = std::move(task)]()
mutable {
// Publish whatever the task returns.
157 async_result->set_value(task());
158 }
// A std::exception from the task is converted into a
// request_worker_exception error carrying ex.what().
catch (
const std::exception& ex) {
159 async_result->set_value(mcp::core::unexpected(
160 errors::request_worker_exception(ex.what())));
// Reported as request_worker_unknown_exception.
// NOTE(review): presumably the catch-all path for non-std exceptions — confirm.
162 async_result->set_value(mcp::core::unexpected(
163 errors::request_worker_unknown_exception()));
166 core::TaskPriority::IO_BOUND);
// Store the executor's error in the result.
// NOTE(review): presumably only when posting failed — confirm.
168 async_result->set_value(mcp::core::unexpected(queued.error()));
171 auto handle = RequestHandle(std::move(request_id), std::move(timeout),
172 std::move(cancellation), std::move(cancel),
173 std::move(async_result));
// Only observe the token when one was supplied and it reports itself as
// cancellable.
176 if (handle.cancellation_.has_value() &&
177 handle.cancellation_->cancellable()) {
178 handle.start_cancellation_observer();
// Returns a reference to the request id stored in this handle.
183 const protocol::RequestId& request_id() const noexcept {
return request_id_; }
// Read-only access to the optional timeout (empty when none is configured).
// NOTE(review): presumably returns `timeout_` — confirm.
185 const std::optional<std::chrono::milliseconds>& timeout() const noexcept {
// Returns a reference to the optional cancellation token held by this handle.
189 const std::optional<CancellationToken>& cancellation_token() const noexcept {
190 return cancellation_;
// Without shared result state there is nothing to wait on.
201 if (!async_result_) {
202 return mcp::core::unexpected(errors::request_state_missing());
// A terminal error that is already recorded is returned without waiting.
205 if (
const auto terminal = terminal_error(); terminal.has_value()) {
206 return mcp::core::unexpected(*terminal);
// Case 1: neither timeout nor cancellation token — unbounded wait.
210 if (!timeout_.has_value() && !cancellation_.has_value()) {
211 return read_from_async_result(async_result_->wait());
// Case 2: timeout only — bounded wait.
215 if (timeout_.has_value() && !cancellation_.has_value()) {
216 auto result = async_result_->wait_for(*timeout_);
// Re-check the terminal error after the wait returns.
217 if (
const auto terminal = terminal_error(); terminal.has_value()) {
218 return mcp::core::unexpected(*terminal);
// NOTE(review): `result` is dereferenced here; presumably guarded by a
// has-value check — confirm.
221 return read_from_async_result(*result);
// Record a timeout as the terminal error via fail_terminal.
223 return fail_terminal(
"request timeout",
224 errors::request_timed_out(*timeout_));
// Case 3: timeout together with a cancellation token — bounded wait.
229 if (timeout_.has_value()) {
230 auto result = async_result_->wait_for(*timeout_);
231 if (
const auto terminal = terminal_error(); terminal.has_value()) {
232 return mcp::core::unexpected(*terminal);
235 return read_from_async_result(*result);
// Distinguish a cancelled token from a plain timeout.
238 if (cancellation_.has_value() && cancellation_->cancelled()) {
239 return fail_terminal(
"request cancelled", errors::request_cancelled());
241 return fail_terminal(
"request timeout",
242 errors::request_timed_out(*timeout_));
// Case 4: cancellation token without timeout — unbounded wait, then honour
// any recorded terminal error before reading the result.
246 auto result = async_result_->wait();
247 if (
const auto terminal = terminal_error(); terminal.has_value()) {
248 return mcp::core::unexpected(*terminal);
250 return read_from_async_result(result);
// Forward the reason to the stored cancel callback and return its result.
257 return cancel_(std::move(reason));
// Constructs a handle from its parts: request id, optional timeout, optional
// cancellation token, cancel callback and the shared AsyncResult. All
// arguments are moved into the members, and a fresh RequestHandleControl is
// allocated for this handle.
261 RequestHandle(protocol::RequestId request_id,
262 std::optional<std::chrono::milliseconds> timeout,
263 std::optional<CancellationToken> cancellation,
264 CancelCallback cancel,
265 std::shared_ptr<core::AsyncResult<ResultType>> async_result)
266 : request_id_(std::move(request_id)),
267 timeout_(std::move(timeout)),
268 cancellation_(std::move(cancellation)),
269 cancel_(std::move(cancel)),
270 async_result_(std::move(async_result)),
271 control_(std::make_shared<detail::RequestHandleControl>()) {}
// Registers a callback on the cancellation token. When the callback runs it
// records request_cancelled as the terminal error (unless one is already
// set), cancels the shared AsyncResult and invokes the cancel callback.
275 void start_cancellation_observer()
const {
// Nothing to observe without a token or without result state.
276 if (!cancellation_.has_value() || !async_result_)
return;
// The callback captures copies of what it needs, and only a weak reference
// to the control block, rather than capturing `this`.
278 std::weak_ptr<detail::RequestHandleControl> weak_control = control_;
279 auto async_result = async_result_;
280 auto cancel_token = *cancellation_;
281 auto cancel_cb = cancel_;
283 auto registration = std::make_shared<detail::CancellationRegistration>(
284 detail::register_cancellation_callback(
285 cancel_token, [weak_control, async_result, cancel_cb]() {
286 bool send_cancel =
false;
// Touch the control block only if it is still alive.
287 if (
auto control = weak_control.lock()) {
288 std::shared_ptr<detail::CancellationRegistration> registration;
290 std::lock_guard lock(control->mutex);
// Keep an already-recorded terminal error; otherwise record cancellation.
291 if (!control->terminal_error.has_value()) {
292 control->terminal_error = errors::request_cancelled();
// cancel_sent is only ever flipped from false to true.
// NOTE(review): presumably send_cancel is set in this branch — confirm.
294 if (!control->cancel_sent) {
295 control->cancel_sent = true;
// Take the stored registration out of the control block.
298 registration = std::move(control->cancellation_registration);
// Cancel the pending result with a fixed reason string.
301 async_result->cancel(
"request cancelled");
// Invoke the cancel callback if one is set; its result is discarded.
302 if (send_cancel && cancel_cb) {
303 (void)cancel_cb(
"request cancelled");
// While the registration is still active, store it in the control block.
307 if (registration->active() && control_) {
309 std::lock_guard lock(control_->mutex);
310 control_->cancellation_registration = registration;
// Continuation on the result that takes the stored registration back out of
// the control block.
// NOTE(review): presumably runs once the result completes — confirm.
312 async_result_->then([weak_control](
const auto&) {
313 if (
auto control = weak_control.lock()) {
314 std::shared_ptr<detail::CancellationRegistration> registration;
316 std::lock_guard lock(control->mutex);
317 registration = std::move(control->cancellation_registration);
// Returns a copy of the terminal error stored in the control block (an empty
// optional when none is recorded). The read happens under the control mutex.
325 std::optional<core::Error> terminal_error()
const {
329 std::lock_guard lock(control_->mutex);
330 return control_->terminal_error;
// Overload for the outer form: takes a core::Result wrapping ResultType and
// yields a core::Result<T>. Declared noexcept.
333 core::Result<T> read_from_async_result(
334 const core::Result<ResultType>& outer)
const noexcept {
// Overload for the inner form: takes a ResultType directly and yields a
// core::Result<T>. Declared noexcept.
341 core::Result<T> read_from_async_result(
342 const ResultType& inner)
const noexcept {
// Records `error` as the terminal error unless one is already stored, marks
// the cancel as sent, and issues a cancel carrying `reason`.
// NOTE(review): the cancel call is presumably guarded by send_cancel — confirm.
346 core::Result<T> fail_terminal(std::string reason, core::Error error)
const {
347 bool send_cancel =
false;
// Start from the caller's error; it is overwritten below with whatever the
// control block holds.
348 core::Error terminal = error;
349 std::shared_ptr<detail::CancellationRegistration> registration;
351 std::lock_guard lock(control_->mutex);
// An existing terminal error is kept; `error` is stored only when none is set.
352 if (!control_->terminal_error.has_value()) {
353 control_->terminal_error = std::move(error);
// Use the stored error, which may differ from the one passed in.
355 terminal = *control_->terminal_error;
356 if (!control_->cancel_sent) {
357 control_->cancel_sent =
true;
// Take the stored registration out of the control block.
360 registration = std::move(control_->cancellation_registration);
// The result of cancel() is deliberately discarded.
366 (void)cancel(std::move(reason));
// Identifier of the request this handle refers to.
371 protocol::RequestId request_id_;
// Optional bound used for the wait_for calls; empty means no timeout.
372 std::optional<std::chrono::milliseconds> timeout_;
// Optional cancellation token; empty means no token was supplied.
373 std::optional<CancellationToken> cancellation_;
// Callback invoked with a reason string to cancel the request; may be empty.
374 CancelCallback cancel_;
// Shared state holding the eventual result; may be null.
375 std::shared_ptr<core::AsyncResult<ResultType>> async_result_;
// Mutex-guarded control block: terminal error, cancel-sent flag and the
// cancellation registration.
376 std::shared_ptr<detail::RequestHandleControl> control_;