cxxmcp 1.1.6
C++ MCP SDK
Loading...
Searching...
No Matches
service.hpp
Go to the documentation of this file.
1// Copyright (c) 2025 [caomengxuan666]
2
3#pragma once
4
27
28#include <condition_variable>
29#include <memory>
30#include <mutex>
31#include <optional>
32#include <thread>
33#include <utility>
34
36#include "cxxmcp/peer.hpp"
37
38namespace mcp {
39
40namespace detail {
41
43 mutable std::mutex mutex;
44 std::mutex join_mutex;
45 std::condition_variable cv;
46 CancellationSource cancellation;
47 bool running = true;
48 bool closing = false;
49 std::optional<core::Error> failure;
50};
51
52inline bool service_running(
53 const std::shared_ptr<ServiceLifecycleState>& state) noexcept {
54 if (!state) {
55 return false;
56 }
57 std::lock_guard lock(state->mutex);
58 return state->running;
59}
60
61inline CancellationToken service_cancellation_token(
62 const std::shared_ptr<ServiceLifecycleState>& state) noexcept {
63 return state ? state->cancellation.token() : CancellationToken{};
64}
65
66template <class Stop>
67inline core::Result<core::Unit> stop_service(
68 const std::shared_ptr<ServiceLifecycleState>& state, Stop stop) noexcept {
69 if (!state) {
70 return core::Unit{};
71 }
72
73 {
74 std::unique_lock lock(state->mutex);
75 if (!state->running) {
76 return core::Unit{};
77 }
78 if (state->closing) {
79 state->cv.wait(lock, [&] { return !state->running; });
80 return core::Unit{};
81 }
82 state->closing = true;
83 state->cancellation.cancel();
84 }
85
86 stop();
87
88 {
89 std::lock_guard lock(state->mutex);
90 state->running = false;
91 state->closing = false;
92 }
93 state->cv.notify_all();
94 return core::Unit{};
95}
96
97inline core::Result<core::Unit> wait_service(
98 const std::shared_ptr<ServiceLifecycleState>& state) noexcept {
99 if (!state) {
100 return core::Unit{};
101 }
102 std::unique_lock lock(state->mutex);
103 state->cv.wait(lock, [&] { return !state->running; });
104 return core::Unit{};
105}
106
107inline void finish_service(const std::shared_ptr<ServiceLifecycleState>& state,
108 std::optional<core::Error> failure = {}) noexcept {
109 if (!state) {
110 return;
111 }
112 {
113 std::lock_guard lock(state->mutex);
114 state->running = false;
115 state->closing = false;
116 state->failure = std::move(failure);
117 }
118 state->cv.notify_all();
119}
120
121} // namespace detail
122
124template <class Role>
126
128template <class Role>
130
132template <>
134 public:
135 explicit RunningService(ClientPeer peer)
136 : peer_(std::make_shared<ClientPeer>(std::move(peer))) {
137 start();
138 }
139
140 RunningService(const RunningService&) = delete;
141 RunningService& operator=(const RunningService&) = delete;
142 RunningService(RunningService&& other) noexcept
143 : peer_(std::move(other.peer_)),
144 state_(std::move(other.state_)),
145 loop_(std::move(other.loop_)) {}
146 RunningService& operator=(RunningService&& other) noexcept {
147 if (this != &other) {
148 (void)stop();
149 peer_ = std::move(other.peer_);
150 state_ = std::move(other.state_);
151 loop_ = std::move(other.loop_);
152 }
153 return *this;
154 }
155
156 ~RunningService() { (void)stop(); }
157
158 ClientPeer& peer() noexcept { return *peer_; }
159
160 const ClientPeer& peer() const noexcept { return *peer_; }
161
162 bool running() const noexcept { return detail::service_running(state_); }
163
166 return detail::service_cancellation_token(state_);
167 }
168
170 core::Result<core::Unit> close() noexcept { return stop(); }
171
176 const auto waited = detail::wait_service(state_);
177 join_loop();
178 if (!waited) {
179 return mcp::core::unexpected(waited.error());
180 }
181 if (state_) {
182 std::lock_guard lock(state_->mutex);
183 if (state_->failure.has_value()) {
184 return mcp::core::unexpected(*state_->failure);
185 }
186 }
187 return core::Unit{};
188 }
189
190 core::Result<core::Unit> stop() noexcept {
191 if (!state_) {
192 return core::Unit{};
193 }
194 {
195 std::unique_lock lock(state_->mutex);
196 if (!state_->running) {
197 lock.unlock();
198 return wait();
199 }
200 if (state_->closing) {
201 lock.unlock();
202 return wait();
203 }
204 state_->closing = true;
205 state_->cancellation.cancel();
206 }
207 if (peer_) {
208 peer_->stop();
209 }
210 if (!peer_ || !peer_->uses_native_transport()) {
211 detail::finish_service(state_);
212 }
213 return wait();
214 }
215
216 private:
217 void start() {
218 if (!peer_) {
219 detail::finish_service(state_);
220 return;
221 }
222 if (!peer_->uses_native_transport()) {
223 const auto started = peer_->start(state_->cancellation.token());
224 if (!started) {
225 detail::finish_service(state_, started.error());
226 }
227 return;
228 }
229
230 auto state = state_;
231 auto peer = peer_;
232 auto cancellation = state_->cancellation.token();
233 loop_ = std::thread([state, peer, cancellation]() noexcept {
234 const auto started = peer->start(cancellation);
235 if (!started) {
236 detail::finish_service(state, started.error());
237 } else {
238 detail::finish_service(state);
239 }
240 });
241 }
242
243 void join_loop() noexcept {
244 if (!state_) {
245 return;
246 }
247 std::lock_guard lock(state_->join_mutex);
248 if (loop_.joinable() && loop_.get_id() != std::this_thread::get_id()) {
249 loop_.join();
250 }
251 }
252
253 std::shared_ptr<ClientPeer> peer_;
254 std::shared_ptr<detail::ServiceLifecycleState> state_ =
255 std::make_shared<detail::ServiceLifecycleState>();
256 std::thread loop_;
257};
258
260template <>
262 public:
263 explicit RunningService(ServerPeer peer)
264 : peer_(std::make_shared<ServerPeer>(std::move(peer))) {
265 start_loop();
266 }
267
269 std::unique_ptr<transport::ServerTransport> transport,
270 server::SessionContext context = {})
271 : peer_(std::make_shared<ServerPeer>(std::move(peer))),
272 transport_(std::move(transport)),
273 context_(std::move(context)) {
274 start_loop();
275 }
276
277 RunningService(const RunningService&) = delete;
278 RunningService& operator=(const RunningService&) = delete;
279 RunningService(RunningService&& other) noexcept
280 : peer_(std::move(other.peer_)),
281 transport_(std::move(other.transport_)),
282 context_(std::move(other.context_)),
283 state_(std::move(other.state_)),
284 loop_(std::move(other.loop_)) {}
285 RunningService& operator=(RunningService&& other) noexcept {
286 if (this != &other) {
287 (void)stop();
288 peer_ = std::move(other.peer_);
289 transport_ = std::move(other.transport_);
290 context_ = std::move(other.context_);
291 state_ = std::move(other.state_);
292 loop_ = std::move(other.loop_);
293 }
294 return *this;
295 }
296
297 ~RunningService() { (void)stop(); }
298
299 ServerPeer& peer() noexcept { return *peer_; }
300
301 const ServerPeer& peer() const noexcept { return *peer_; }
302
308 if (peer_) {
309 peer_->wait_until_ready();
310 }
311 }
312
313 bool running() const noexcept { return detail::service_running(state_); }
314
317 return detail::service_cancellation_token(state_);
318 }
319
321 core::Result<core::Unit> close() noexcept { return stop(); }
322
327 const auto waited = detail::wait_service(state_);
328 join_loop();
329 if (!waited) {
330 return mcp::core::unexpected(waited.error());
331 }
332 if (state_) {
333 std::lock_guard lock(state_->mutex);
334 if (state_->failure.has_value()) {
335 return mcp::core::unexpected(*state_->failure);
336 }
337 }
338 return core::Unit{};
339 }
340
341 core::Result<core::Unit> stop() noexcept {
342 if (!state_) {
343 return core::Unit{};
344 }
345 {
346 std::unique_lock lock(state_->mutex);
347 if (!state_->running) {
348 lock.unlock();
349 return wait();
350 }
351 if (state_->closing) {
352 lock.unlock();
353 return wait();
354 }
355 state_->closing = true;
356 state_->cancellation.cancel();
357 }
358 if (transport_) {
359 (void)transport_->close();
360 }
361 if (peer_) {
362 peer_->stop();
363 }
364 return wait();
365 }
366
367 private:
368 void start_loop() {
369 auto state = state_;
370 auto peer = peer_;
371 auto transport = transport_;
372 auto context = context_;
373 auto cancellation = state_->cancellation.token();
374 loop_ =
375 std::thread([state, peer, transport, context, cancellation]() noexcept {
376 const auto started =
377 transport
378 ? peer->serve_transport(*transport, context, cancellation)
379 : peer->start(cancellation);
380 if (!started) {
381 detail::finish_service(state, started.error());
382 } else {
383 detail::finish_service(state);
384 }
385 });
386 }
387
388 void join_loop() noexcept {
389 if (!state_) {
390 return;
391 }
392 std::lock_guard lock(state_->join_mutex);
393 if (loop_.joinable() && loop_.get_id() != std::this_thread::get_id()) {
394 loop_.join();
395 }
396 }
397
398 std::shared_ptr<ServerPeer> peer_;
399 std::shared_ptr<transport::ServerTransport> transport_;
400 server::SessionContext context_;
401 std::shared_ptr<detail::ServiceLifecycleState> state_ =
402 std::make_shared<detail::ServiceLifecycleState>();
403 std::thread loop_;
404};
405
407template <>
409 public:
410 explicit Service(ClientPeer peer) : peer_(std::move(peer)) {}
411
412 ClientPeer& peer() noexcept { return peer_; }
413
414 const ClientPeer& peer() const noexcept { return peer_; }
415
417 return RunningService<RoleClient>(std::move(peer_));
418 }
419
420 private:
421 ClientPeer peer_;
422};
423
425template <>
427 public:
428 explicit Service(ServerPeer peer) : peer_(std::move(peer)) {}
429
430 Service(ServerPeer peer,
431 std::unique_ptr<transport::ServerTransport> transport,
432 server::SessionContext context = {})
433 : peer_(std::move(peer)),
434 transport_(std::move(transport)),
435 context_(std::move(context)) {}
436
437 ServerPeer& peer() noexcept { return peer_; }
438
439 const ServerPeer& peer() const noexcept { return peer_; }
440
442 if (transport_) {
443 return RunningService<RoleServer>(std::move(peer_), std::move(transport_),
444 std::move(context_));
445 }
446 return RunningService<RoleServer>(std::move(peer_));
447 }
448
449 private:
450 ServerPeer peer_;
451 std::unique_ptr<transport::ServerTransport> transport_;
452 server::SessionContext context_;
453};
454
457 return Service<RoleClient>(std::move(peer));
458}
459
462 return Service<RoleServer>(std::move(peer));
463}
464
467 ServerPeer peer, std::unique_ptr<transport::ServerTransport> transport,
468 server::SessionContext context = {}) {
469 return Service<RoleServer>(std::move(peer), std::move(transport),
470 std::move(context));
471}
472
475 return make_service(std::move(peer)).serve();
476}
477
480 return make_service(std::move(peer)).serve();
481}
482
485 ServerPeer peer, std::unique_ptr<transport::ServerTransport> transport,
486 server::SessionContext context = {}) {
487 return make_service(std::move(peer), std::move(transport), std::move(context))
488 .serve();
489}
490
492 auto peer = build();
493 if (!peer) {
494 return 1;
495 }
496 auto running = serve(std::move(*peer));
497 if (!running) {
498 return 1;
499 }
500 return running->wait().has_value() ? 0 : 1;
501}
502
503template <class Fn>
505 auto peer = build();
506 if (!peer) {
507 return 1;
508 }
509 auto running = mcp::serve(std::move(*peer));
510 if (!running) {
511 return 1;
512 }
513 fn(*running);
514 (void)running->stop();
515 return 0;
516}
517
518} // namespace mcp
Cooperative cancellation primitives shared by SDK lifecycle APIs.
Owner side of a cooperative cancellation token.
Definition cancellation.hpp:212
Copyable token observed by cancellation-aware SDK operations.
Definition cancellation.hpp:104
Client-side peer boundary for talking to an MCP server.
Definition peer.hpp:319
Server-side peer boundary for exposing MCP capabilities.
Definition peer.hpp:2995
core::Result< core::Unit > serve_transport(transport::ServerTransport &transport, const server::SessionContext &context={}, CancellationToken cancellation=CancellationToken::none())
Runs a sequential receive loop over a role-generic server transport.
Definition peer.hpp:3898
Role-specialized MCP peer boundary.
Definition peer.hpp:310
core::Result< core::Unit > wait() noexcept
Waits for service shutdown.
Definition service.hpp:175
CancellationToken cancellation_token() const noexcept
Returns the service cancellation token.
Definition service.hpp:165
core::Result< core::Unit > close() noexcept
Explicitly closes the running service.
Definition service.hpp:170
void wait_until_ready()
Blocks until the server is ready to accept connections.
Definition service.hpp:307
core::Result< core::Unit > wait() noexcept
Waits for service shutdown.
Definition service.hpp:326
CancellationToken cancellation_token() const noexcept
Returns the service cancellation token.
Definition service.hpp:316
core::Result< core::Unit > close() noexcept
Explicitly closes the running service.
Definition service.hpp:321
Role-specialized running MCP service.
Definition service.hpp:129
Role-specialized MCP service before it is running.
Definition service.hpp:125
Role-aware peer execution boundaries for MCP client and server SDK users.
std::monostate Unit
Success value for operations that only need to report failure.
Definition result.hpp:55
tl::expected< T, Error > Result
Alias for the SDK result type.
Definition result.hpp:64
core::Result< RunningService< RoleClient > > serve(ClientPeer peer)
Starts a client service and returns its running handle.
Definition service.hpp:474
Service< RoleClient > make_service(ClientPeer peer)
Creates a client service from a role-aware peer.
Definition service.hpp:456
Marker for a client-side MCP role.
Definition roles.hpp:11
Marker for a server-side MCP role.
Definition roles.hpp:14
Definition service.hpp:42
Per-message connection metadata supplied to server handlers.
Definition transport.hpp:42