cxxmcp 1.1.6
C++ MCP SDK
Loading...
Searching...
No Matches
adapters.hpp
Go to the documentation of this file.
1// Copyright (c) 2025 [caomengxuan666]
2
3#pragma once
4
7
8#include <condition_variable>
9#include <cstddef>
10#include <deque>
11#include <functional>
12#include <memory>
13#include <mutex>
14#include <optional>
15#include <string>
16#include <string_view>
17#include <utility>
18
21
22namespace mcp::transport {
23
24inline core::Error transport_adapter_error(protocol::ErrorCode code,
25 std::string message) {
26 return core::Error{
27 static_cast<int>(code),
28 std::move(message),
29 {},
30 "transport",
31 };
32}
33
35template <class Role>
37 using TxMessage = typename Transport<Role>::TxMessage;
38 using RxMessage = typename Transport<Role>::RxMessage;
39
41 std::string name = "function";
43 std::function<core::Result<core::Unit>(TxMessage)> send;
45 std::function<core::Result<std::optional<RxMessage>>()> receive;
47 std::function<core::Result<core::Unit>()> close;
49 std::function<protocol::Json()> diagnostics;
50};
51
54template <class Role>
55class FunctionTransport final : public Transport<Role> {
56 public:
57 using TxMessage = typename Transport<Role>::TxMessage;
58 using RxMessage = typename Transport<Role>::RxMessage;
59
61 : options_(std::move(options)) {}
62
63 std::string_view name() const noexcept override { return options_.name; }
64
65 protocol::Json diagnostics() const override {
66 if (options_.diagnostics) {
67 return options_.diagnostics();
68 }
69 return protocol::Json{{"name", options_.name}};
70 }
71
72 core::Result<core::Unit> send(TxMessage message) override {
73 if (!options_.send) {
74 return mcp::core::unexpected(transport_adapter_error(
75 protocol::ErrorCode::InvalidRequest,
76 "function transport send callback is not configured"));
77 }
78 return options_.send(std::move(message));
79 }
80
82 if (!options_.receive) {
83 return mcp::core::unexpected(transport_adapter_error(
84 protocol::ErrorCode::InvalidRequest,
85 "function transport receive callback is not configured"));
86 }
87 return options_.receive();
88 }
89
91 if (!options_.close) {
92 return core::Unit{};
93 }
94 return options_.close();
95 }
96
97 private:
99};
100
102template <class Role>
103std::unique_ptr<Transport<Role>> make_function_transport(
105 return std::make_unique<FunctionTransport<Role>>(std::move(options));
106}
107
109template <class Role>
112
114 std::string name = "json-line";
116 std::function<core::Result<core::Unit>(std::string)> write_line;
118 std::function<RxLine()> read_line;
120 std::function<core::Result<core::Unit>()> close;
122 std::function<protocol::Json()> diagnostics;
123};
124
126template <class Role>
127class JsonLineTransport final : public Transport<Role> {
128 public:
129 using TxMessage = typename Transport<Role>::TxMessage;
130 using RxMessage = typename Transport<Role>::RxMessage;
131
133 : options_(std::move(options)) {}
134
135 std::string_view name() const noexcept override { return options_.name; }
136
137 protocol::Json diagnostics() const override {
138 if (options_.diagnostics) {
139 return options_.diagnostics();
140 }
141 return protocol::Json{{"name", options_.name}};
142 }
143
144 core::Result<core::Unit> send(TxMessage message) override {
145 if (!options_.write_line) {
146 return mcp::core::unexpected(transport_adapter_error(
147 protocol::ErrorCode::InvalidRequest,
148 "json-line transport write callback is not configured"));
149 }
150 const auto serialized = protocol::serialize_message(message);
151 if (!serialized) {
152 return mcp::core::unexpected(serialized.error());
153 }
154 return options_.write_line(*serialized);
155 }
156
158 if (!options_.read_line) {
159 return mcp::core::unexpected(transport_adapter_error(
160 protocol::ErrorCode::InvalidRequest,
161 "json-line transport read callback is not configured"));
162 }
163 auto line = options_.read_line();
164 if (!line) {
165 return mcp::core::unexpected(line.error());
166 }
167 if (!line->has_value()) {
168 return std::nullopt;
169 }
170 if (line->value().empty()) {
171 return mcp::core::unexpected(transport_adapter_error(
172 protocol::ErrorCode::ParseError, "empty transport message"));
173 }
174 const auto parsed = protocol::parse_message(line->value());
175 if (!parsed) {
176 return mcp::core::unexpected(parsed.error());
177 }
178 return RxMessage{*parsed};
179 }
180
182 if (!options_.close) {
183 return core::Unit{};
184 }
185 return options_.close();
186 }
187
188 private:
190};
191
193template <class Role>
194std::unique_ptr<Transport<Role>> make_json_line_transport(
196 return std::make_unique<JsonLineTransport<Role>>(std::move(options));
197}
198
200template <class Role>
203 std::string name = "queue";
205 std::size_t max_inbound = 0;
207 std::size_t max_outbound = 0;
208};
209
211template <class Role>
212class QueueTransport final : public Transport<Role> {
213 public:
214 using TxMessage = typename Transport<Role>::TxMessage;
215 using RxMessage = typename Transport<Role>::RxMessage;
216
217 explicit QueueTransport(QueueTransportOptions<Role> options = {})
218 : options_(std::move(options)) {}
219
220 std::string_view name() const noexcept override { return options_.name; }
221
222 protocol::Json diagnostics() const override {
223 std::lock_guard<std::mutex> lock(mutex_);
224 return protocol::Json{
225 {"name", options_.name},
226 {"closed", closed_},
227 {"inbound", inbound_.size()},
228 {"outbound", outbound_.size()},
229 };
230 }
231
232 core::Result<core::Unit> send(TxMessage message) override {
233 std::lock_guard<std::mutex> lock(mutex_);
234 if (closed_) {
235 return mcp::core::unexpected(transport_adapter_error(
236 protocol::ErrorCode::InvalidRequest, "transport is closed"));
237 }
238 if (options_.max_outbound != 0 &&
239 outbound_.size() >= options_.max_outbound) {
240 return mcp::core::unexpected(
241 transport_adapter_error(protocol::ErrorCode::InternalError,
242 "transport outbound queue is full"));
243 }
244 outbound_.push_back(std::move(message));
245 outbound_cv_.notify_one();
246 return core::Unit{};
247 }
248
250 std::unique_lock<std::mutex> lock(mutex_);
251 inbound_cv_.wait(lock, [&] { return closed_ || !inbound_.empty(); });
252 if (inbound_.empty()) {
253 return std::nullopt;
254 }
255 auto message = std::move(inbound_.front());
256 inbound_.pop_front();
257 return message;
258 }
259
261 {
262 std::lock_guard<std::mutex> lock(mutex_);
263 closed_ = true;
264 }
265 inbound_cv_.notify_all();
266 outbound_cv_.notify_all();
267 return core::Unit{};
268 }
269
272 std::lock_guard<std::mutex> lock(mutex_);
273 if (closed_) {
274 return mcp::core::unexpected(transport_adapter_error(
275 protocol::ErrorCode::InvalidRequest, "transport is closed"));
276 }
277 if (options_.max_inbound != 0 && inbound_.size() >= options_.max_inbound) {
278 return mcp::core::unexpected(
279 transport_adapter_error(protocol::ErrorCode::InternalError,
280 "transport inbound queue is full"));
281 }
282 inbound_.push_back(std::move(message));
283 inbound_cv_.notify_one();
284 return core::Unit{};
285 }
286
288 std::optional<TxMessage> pop_outbound() {
289 std::lock_guard<std::mutex> lock(mutex_);
290 if (outbound_.empty()) {
291 return std::nullopt;
292 }
293 auto message = std::move(outbound_.front());
294 outbound_.pop_front();
295 return message;
296 }
297
298 private:
300 mutable std::mutex mutex_;
301 std::condition_variable inbound_cv_;
302 std::condition_variable outbound_cv_;
303 std::deque<RxMessage> inbound_;
304 std::deque<TxMessage> outbound_;
305 bool closed_ = false;
306};
307
308using ClientFunctionTransport = FunctionTransport<RoleClient>;
309using ServerFunctionTransport = FunctionTransport<RoleServer>;
310using ClientJsonLineTransport = JsonLineTransport<RoleClient>;
311using ServerJsonLineTransport = JsonLineTransport<RoleServer>;
312using ClientQueueTransport = QueueTransport<RoleClient>;
313using ServerQueueTransport = QueueTransport<RoleServer>;
314
315} // namespace mcp::transport
std::unique_ptr< Transport< Role > > make_function_transport(FunctionTransportOptions< Role > options)
Creates a function-backed role-generic transport.
Definition adapters.hpp:103
std::unique_ptr< Transport< Role > > make_json_line_transport(JsonLineTransportOptions< Role > options)
Creates a line-oriented JSON-RPC transport.
Definition adapters.hpp:194
Transport adapter for applications that already have send/receive callables.
Definition adapters.hpp:55
core::Result< core::Unit > send(TxMessage message) override
Sends one JSON-RPC message to the peer.
Definition adapters.hpp:72
protocol::Json diagnostics() const override
Structured implementation diagnostics.
Definition adapters.hpp:65
core::Result< core::Unit > close() override
Closes the transport and unblocks receive() where possible.
Definition adapters.hpp:90
core::Result< std::optional< RxMessage > > receive() override
Receives the next JSON-RPC message from the peer.
Definition adapters.hpp:81
std::string_view name() const noexcept override
Human-readable transport name for diagnostics.
Definition adapters.hpp:63
Adapter for transports that expose newline-delimited JSON strings.
Definition adapters.hpp:127
core::Result< core::Unit > send(TxMessage message) override
Sends one JSON-RPC message to the peer.
Definition adapters.hpp:144
core::Result< core::Unit > close() override
Closes the transport and unblocks receive() where possible.
Definition adapters.hpp:181
protocol::Json diagnostics() const override
Structured implementation diagnostics.
Definition adapters.hpp:137
core::Result< std::optional< RxMessage > > receive() override
Receives the next JSON-RPC message from the peer.
Definition adapters.hpp:157
std::string_view name() const noexcept override
Human-readable transport name for diagnostics.
Definition adapters.hpp:135
Thread-safe queue transport for worker/queue integrations and tests.
Definition adapters.hpp:212
std::string_view name() const noexcept override
Human-readable transport name for diagnostics.
Definition adapters.hpp:220
protocol::Json diagnostics() const override
Structured implementation diagnostics.
Definition adapters.hpp:222
core::Result< core::Unit > send(TxMessage message) override
Sends one JSON-RPC message to the peer.
Definition adapters.hpp:232
core::Result< std::optional< RxMessage > > receive() override
Receives the next JSON-RPC message from the peer.
Definition adapters.hpp:249
core::Result< core::Unit > close() override
Closes the transport and unblocks receive() where possible.
Definition adapters.hpp:260
core::Result< core::Unit > push_inbound(RxMessage message)
Queues a message that receive() will return.
Definition adapters.hpp:271
std::optional< TxMessage > pop_outbound()
Returns one message captured by send(), if available.
Definition adapters.hpp:288
Minimal message-level transport contract shared by MCP roles.
Definition transport.hpp:38
nlohmann::json Json
JSON value type used by all protocol DTOs.
Definition types.hpp:28
std::monostate Unit
Success value for operations that only need to report failure.
Definition result.hpp:55
tl::expected< T, Error > Result
Alias for the SDK result type.
Definition result.hpp:64
JSON-RPC method names and message construction/parsing helpers.
Function-backed implementation of Transport<Role>.
Definition adapters.hpp:36
std::function< core::Result< core::Unit >(TxMessage)> send
Sends one message. Required.
Definition adapters.hpp:43
std::string name
Human-readable transport name.
Definition adapters.hpp:41
std::function< core::Result< std::optional< RxMessage > >()> receive
Receives the next message or end-of-stream. Required.
Definition adapters.hpp:45
std::function< protocol::Json()> diagnostics
Returns structured diagnostics. Optional.
Definition adapters.hpp:49
std::function< core::Result< core::Unit >()> close
Closes the underlying transport. Optional.
Definition adapters.hpp:47
Line-oriented JSON-RPC source/sink adapter options.
Definition adapters.hpp:110
std::function< core::Result< core::Unit >()> close
Closes the underlying source/sink. Optional.
Definition adapters.hpp:120
std::string name
Human-readable transport name.
Definition adapters.hpp:114
std::function< core::Result< core::Unit >(std::string)> write_line
Writes one serialized JSON-RPC document without a trailing newline.
Definition adapters.hpp:116
std::function< RxLine()> read_line
Reads one serialized JSON-RPC document or end-of-stream.
Definition adapters.hpp:118
std::function< protocol::Json()> diagnostics
Returns structured diagnostics. Optional.
Definition adapters.hpp:122
Options for an in-memory queue-backed transport.
Definition adapters.hpp:201
std::size_t max_outbound
Maximum outbound queue size. Zero means unbounded.
Definition adapters.hpp:207
std::size_t max_inbound
Maximum inbound queue size. Zero means unbounded.
Definition adapters.hpp:205
std::string name
Human-readable transport name.
Definition adapters.hpp:203
Role-generic MCP transport contract for SDK peer/service layers.