cxxmcp 1.1.6
C++ MCP SDK
Loading...
Searching...
No Matches
async_result.hpp
1// Copyright (c) 2025 [caomengxuan666]
2
3#pragma once
4
5#include <chrono>
6#include <condition_variable>
7#include <exception>
8#include <functional>
9#include <memory>
10#include <mutex>
11#include <optional>
12#include <string>
13#include <type_traits>
14#include <utility>
15#include <vector>
16
18
19namespace mcp::core {
20
28template <class T>
30 public:
31 using ResultType = Result<T>;
32
33 AsyncResult() = default;
34
35 AsyncResult(const AsyncResult&) = delete;
36 AsyncResult& operator=(const AsyncResult&) = delete;
37
40 void set_value(T value) {
41 std::vector<std::function<void(ResultType)>> conts;
42 {
43 std::lock_guard<std::mutex> lock(mutex_);
44 if (value_.has_value()) return; // already set
45 value_.emplace(std::move(value));
46 conts = std::move(continuations_);
47 }
48 cv_.notify_all();
49 fire_continuations(conts, *value_);
50 }
51
53 void set_error(Error error) {
54 std::vector<std::function<void(ResultType)>> conts;
55 {
56 std::lock_guard<std::mutex> lock(mutex_);
57 if (value_.has_value()) return;
58 value_.emplace(unexpected(std::move(error)));
59 conts = std::move(continuations_);
60 }
61 cv_.notify_all();
62 fire_continuations(conts, *value_);
63 }
64
66 void cancel(std::string reason = {}) {
67 std::vector<std::function<void(ResultType)>> conts;
68 {
69 std::lock_guard<std::mutex> lock(mutex_);
70 if (value_.has_value()) return;
71 value_.emplace(
73 reason.empty() ? "cancelled" : std::move(reason),
74 {},
75 "cancellation"}));
76 conts = std::move(continuations_);
77 }
78 cv_.notify_all();
79 fire_continuations(conts, *value_);
80 }
81
83 void set_exception(std::exception_ptr ex) {
84 try {
85 std::rethrow_exception(ex);
86 } catch (const std::exception& e) {
87 set_error(Error{1, e.what(), {}, "exception"});
88 } catch (...) {
89 set_error(Error{1, "unknown exception", {}, "exception"});
90 }
91 }
92
94 ResultType wait() const {
95 std::unique_lock<std::mutex> lock(mutex_);
96 cv_.wait(lock, [this]() { return value_.has_value(); });
97 return *value_;
98 }
99
101 ResultType wait_for(std::chrono::milliseconds timeout) const {
102 std::unique_lock<std::mutex> lock(mutex_);
103 if (!cv_.wait_for(lock, timeout, // NOLINT(whitespace/newline)
104 [this]() { return value_.has_value(); })) {
105 return unexpected(Error{1, "AsyncResult wait timed out", {}, "timeout"});
106 }
107 return *value_;
108 }
109
111 ResultType wait_until(std::chrono::steady_clock::time_point deadline) const {
112 std::unique_lock<std::mutex> lock(mutex_);
113 if (!cv_.wait_until(lock, deadline,
114 [this]() { return value_.has_value(); })) {
115 return unexpected(Error{1, "AsyncResult wait timed out", {}, "timeout"});
116 }
117 return *value_;
118 }
119
121 bool ready() const {
122 std::lock_guard<std::mutex> lock(mutex_);
123 return value_.has_value();
124 }
125
134 template <class F>
135 auto then(F&& callback)
136 -> std::shared_ptr<AsyncResult<std::invoke_result_t<F, ResultType>>> {
137 using R = std::invoke_result_t<F, ResultType>;
138 auto next = std::make_shared<AsyncResult<R>>();
139 std::optional<ResultType> ready_value;
140 {
141 std::lock_guard<std::mutex> lock(mutex_);
142 if (value_.has_value()) {
143 ready_value = *value_;
144 } else {
145 continuations_.emplace_back(
146 [next, cb = std::forward<F>(callback)](ResultType result) mutable {
147 try {
148 next->set_value(cb(std::move(result)));
149 } catch (...) {
150 next->set_exception(std::current_exception());
151 }
152 });
153 }
154 }
155 if (ready_value.has_value()) {
156 try {
157 next->set_value(callback(std::move(*ready_value)));
158 } catch (...) {
159 next->set_exception(std::current_exception());
160 }
161 }
162 return next;
163 }
164
165 private:
166 static void fire_continuations(
167 const std::vector<std::function<void(ResultType)>>& conts,
168 const ResultType& value) {
169 for (auto& cont : conts) {
170 try {
171 cont(value);
172 } catch (...) {
173 // continuation exceptions are swallowed; the result is already set
174 }
175 }
176 }
177
178 mutable std::mutex mutex_;
179 mutable std::condition_variable cv_;
180 std::optional<ResultType> value_;
181 std::vector<std::function<void(ResultType)>> continuations_;
182};
183
185template <>
186class AsyncResult<void> {
187 public:
188 using ResultType = Result<Unit>;
189
190 AsyncResult() = default;
191
192 AsyncResult(const AsyncResult&) = delete;
193 AsyncResult& operator=(const AsyncResult&) = delete;
194
195 void set_value() {
196 std::vector<std::function<void(ResultType)>> conts;
197 {
198 std::lock_guard<std::mutex> lock(mutex_);
199 if (value_.has_value()) return;
200 value_.emplace(Unit{});
201 conts = std::move(continuations_);
202 }
203 cv_.notify_all();
204 for (auto& cont : conts) {
205 try {
206 cont(*value_);
207 } catch (...) {
208 }
209 }
210 }
211
212 void set_error(Error error) {
213 std::vector<std::function<void(ResultType)>> conts;
214 {
215 std::lock_guard<std::mutex> lock(mutex_);
216 if (value_.has_value()) return;
217 value_.emplace(unexpected(std::move(error)));
218 conts = std::move(continuations_);
219 }
220 cv_.notify_all();
221 for (auto& cont : conts) {
222 try {
223 cont(*value_);
224 } catch (...) {
225 }
226 }
227 }
228
229 void cancel(std::string reason = {}) {
230 std::vector<std::function<void(ResultType)>> conts;
231 {
232 std::lock_guard<std::mutex> lock(mutex_);
233 if (value_.has_value()) return;
234 value_.emplace(
236 reason.empty() ? "cancelled" : std::move(reason),
237 {},
238 "cancellation"}));
239 conts = std::move(continuations_);
240 }
241 cv_.notify_all();
242 for (auto& cont : conts) {
243 try {
244 cont(*value_);
245 } catch (...) {
246 }
247 }
248 }
249
250 ResultType wait() const {
251 std::unique_lock<std::mutex> lock(mutex_);
252 cv_.wait(lock, [this]() { return value_.has_value(); });
253 return *value_;
254 }
255
256 ResultType wait_for(std::chrono::milliseconds timeout) const {
257 std::unique_lock<std::mutex> lock(mutex_);
258 if (!cv_.wait_for(lock, timeout, // NOLINT(whitespace/newline)
259 [this]() { return value_.has_value(); })) {
260 return unexpected(Error{1, "AsyncResult wait timed out", {}, "timeout"});
261 }
262 return *value_;
263 }
264
265 ResultType wait_until(std::chrono::steady_clock::time_point deadline) const {
266 std::unique_lock<std::mutex> lock(mutex_);
267 if (!cv_.wait_until(lock, deadline,
268 [this]() { return value_.has_value(); })) {
269 return unexpected(Error{1, "AsyncResult wait timed out", {}, "timeout"});
270 }
271 return *value_;
272 }
273
274 bool ready() const {
275 std::lock_guard<std::mutex> lock(mutex_);
276 return value_.has_value();
277 }
278
279 template <class F>
280 auto then(F&& callback)
281 -> std::shared_ptr<AsyncResult<std::invoke_result_t<F, ResultType>>> {
282 using R = std::invoke_result_t<F, ResultType>;
283 auto next = std::make_shared<AsyncResult<R>>();
284 std::optional<ResultType> ready_value;
285 {
286 std::lock_guard<std::mutex> lock(mutex_);
287 if (value_.has_value()) {
288 ready_value = *value_;
289 } else {
290 continuations_.emplace_back(
291 [next, cb = std::forward<F>(callback)](ResultType result) mutable {
292 try {
293 next->set_value(cb(std::move(result)));
294 } catch (...) {
295 next->set_exception(std::current_exception());
296 }
297 });
298 }
299 }
300 if (ready_value.has_value()) {
301 try {
302 next->set_value(callback(std::move(*ready_value)));
303 } catch (...) {
304 next->set_exception(std::current_exception());
305 }
306 }
307 return next;
308 }
309
310 private:
311 mutable std::mutex mutex_;
312 mutable std::condition_variable cv_;
313 std::optional<ResultType> value_;
314 std::vector<std::function<void(ResultType)>> continuations_;
315};
316
317} // namespace mcp::core
A thread-safe, CV-based future that replaces std::promise/shared_future.
Definition async_result.hpp:29
ResultType wait_until(std::chrono::steady_clock::time_point deadline) const
Block until the result is available or the deadline is reached.
Definition async_result.hpp:111
void cancel(std::string reason={})
Cancel the result with a cancellation error. Notifies all waiters.
Definition async_result.hpp:66
ResultType wait_for(std::chrono::milliseconds timeout) const
Block until the result is available or the timeout expires.
Definition async_result.hpp:101
auto then(F &&callback) -> std::shared_ptr< AsyncResult< std::invoke_result_t< F, ResultType > > >
Chain a continuation that fires when the result is set.
Definition async_result.hpp:135
void set_exception(std::exception_ptr ex)
Set the result from an exception_ptr.
Definition async_result.hpp:83
bool ready() const
Returns true if the result has been set.
Definition async_result.hpp:121
void set_value(T value)
Set the result value.
Definition async_result.hpp:40
void set_error(Error error)
Set an error result. Notifies all waiters and fires continuations.
Definition async_result.hpp:53
ResultType wait() const
Block until the result is available. CV-based, no polling.
Definition async_result.hpp:94
Shared result and error primitives used by the public cxxmcp SDK.
std::monostate Unit
Success value for operations that only need to report failure.
Definition result.hpp:55
tl::expected< T, Error > Result
Alias for the SDK result type.
Definition result.hpp:64
constexpr auto unexpected(E &&value)
Creates an unexpected result value for the active expected backend.
Definition result.hpp:24
Structured error returned by fallible SDK operations.
Definition result.hpp:35