8 #include <condition_variable>
22 namespace mcp::transport {
24 inline core::Error transport_adapter_error(protocol::ErrorCode code,
25 std::string message) {
27 static_cast<int>(code),
37 using TxMessage =
typename Transport<Role>::TxMessage;
38 using RxMessage =
typename Transport<Role>::RxMessage;
41 std::string
name =
"function";
43 std::function<core::Result<core::Unit>(TxMessage)>
send;
45 std::function<core::Result<std::optional<RxMessage>>()>
receive;
47 std::function<core::Result<core::Unit>()>
close;
57 using TxMessage =
typename Transport<Role>::TxMessage;
58 using RxMessage =
typename Transport<Role>::RxMessage;
61 : options_(std::move(options)) {}
63 std::string_view
name() const noexcept
override {
return options_.name; }
66 if (options_.diagnostics) {
67 return options_.diagnostics();
74 return mcp::core::unexpected(transport_adapter_error(
75 protocol::ErrorCode::InvalidRequest,
76 "function transport send callback is not configured"));
78 return options_.send(std::move(message));
82 if (!options_.receive) {
83 return mcp::core::unexpected(transport_adapter_error(
84 protocol::ErrorCode::InvalidRequest,
85 "function transport receive callback is not configured"));
87 return options_.receive();
91 if (!options_.close) {
94 return options_.close();
105 return std::make_unique<FunctionTransport<Role>>(std::move(options));
114 std::string
name =
"json-line";
116 std::function<core::Result<core::Unit>(std::string)>
write_line;
120 std::function<core::Result<core::Unit>()>
close;
129 using TxMessage =
typename Transport<Role>::TxMessage;
130 using RxMessage =
typename Transport<Role>::RxMessage;
133 : options_(std::move(options)) {}
135 std::string_view
name() const noexcept
override {
return options_.name; }
138 if (options_.diagnostics) {
139 return options_.diagnostics();
145 if (!options_.write_line) {
146 return mcp::core::unexpected(transport_adapter_error(
147 protocol::ErrorCode::InvalidRequest,
148 "json-line transport write callback is not configured"));
150 const auto serialized = protocol::serialize_message(message);
152 return mcp::core::unexpected(serialized.error());
154 return options_.write_line(*serialized);
158 if (!options_.read_line) {
159 return mcp::core::unexpected(transport_adapter_error(
160 protocol::ErrorCode::InvalidRequest,
161 "json-line transport read callback is not configured"));
163 auto line = options_.read_line();
165 return mcp::core::unexpected(line.error());
167 if (!line->has_value()) {
170 if (line->value().empty()) {
171 return mcp::core::unexpected(transport_adapter_error(
172 protocol::ErrorCode::ParseError,
"empty transport message"));
174 const auto parsed = protocol::parse_message(line->value());
176 return mcp::core::unexpected(parsed.error());
178 return RxMessage{*parsed};
182 if (!options_.close) {
185 return options_.close();
196 return std::make_unique<JsonLineTransport<Role>>(std::move(options));
214 using TxMessage =
typename Transport<Role>::TxMessage;
215 using RxMessage =
typename Transport<Role>::RxMessage;
218 : options_(std::move(options)) {}
220 std::string_view
name() const noexcept
override {
return options_.name; }
223 std::lock_guard<std::mutex> lock(mutex_);
225 {
"name", options_.name},
227 {
"inbound", inbound_.size()},
228 {
"outbound", outbound_.size()},
233 std::lock_guard<std::mutex> lock(mutex_);
235 return mcp::core::unexpected(transport_adapter_error(
236 protocol::ErrorCode::InvalidRequest,
"transport is closed"));
238 if (options_.max_outbound != 0 &&
239 outbound_.size() >= options_.max_outbound) {
240 return mcp::core::unexpected(
241 transport_adapter_error(protocol::ErrorCode::InternalError,
242 "transport outbound queue is full"));
244 outbound_.push_back(std::move(message));
245 outbound_cv_.notify_one();
250 std::unique_lock<std::mutex> lock(mutex_);
251 inbound_cv_.wait(lock, [&] {
return closed_ || !inbound_.empty(); });
252 if (inbound_.empty()) {
255 auto message = std::move(inbound_.front());
256 inbound_.pop_front();
262 std::lock_guard<std::mutex> lock(mutex_);
265 inbound_cv_.notify_all();
266 outbound_cv_.notify_all();
272 std::lock_guard<std::mutex> lock(mutex_);
274 return mcp::core::unexpected(transport_adapter_error(
275 protocol::ErrorCode::InvalidRequest,
"transport is closed"));
277 if (options_.max_inbound != 0 && inbound_.size() >= options_.max_inbound) {
278 return mcp::core::unexpected(
279 transport_adapter_error(protocol::ErrorCode::InternalError,
280 "transport inbound queue is full"));
282 inbound_.push_back(std::move(message));
283 inbound_cv_.notify_one();
289 std::lock_guard<std::mutex> lock(mutex_);
290 if (outbound_.empty()) {
293 auto message = std::move(outbound_.front());
294 outbound_.pop_front();
300 mutable std::mutex mutex_;
301 std::condition_variable inbound_cv_;
302 std::condition_variable outbound_cv_;
303 std::deque<RxMessage> inbound_;
304 std::deque<TxMessage> outbound_;
305 bool closed_ =
false;
308 using ClientFunctionTransport = FunctionTransport<RoleClient>;
309 using ServerFunctionTransport = FunctionTransport<RoleServer>;
310 using ClientJsonLineTransport = JsonLineTransport<RoleClient>;
311 using ServerJsonLineTransport = JsonLineTransport<RoleServer>;
312 using ClientQueueTransport = QueueTransport<RoleClient>;
313 using ServerQueueTransport = QueueTransport<RoleServer>;
std::unique_ptr< Transport< Role > > make_function_transport(FunctionTransportOptions< Role > options)
Creates a function-backed role-generic transport.
Definition adapters.hpp:103
std::unique_ptr< Transport< Role > > make_json_line_transport(JsonLineTransportOptions< Role > options)
Creates a line-oriented JSON-RPC transport.
Definition adapters.hpp:194
Transport adapter for applications that already have send/receive callables.
Definition adapters.hpp:55
core::Result< core::Unit > send(TxMessage message) override
Sends one JSON-RPC message to the peer.
Definition adapters.hpp:72
protocol::Json diagnostics() const override
Structured implementation diagnostics.
Definition adapters.hpp:65
core::Result< core::Unit > close() override
Closes the transport and unblocks receive() where possible.
Definition adapters.hpp:90
core::Result< std::optional< RxMessage > > receive() override
Receives the next JSON-RPC message from the peer.
Definition adapters.hpp:81
std::string_view name() const noexcept override
Human-readable transport name for diagnostics.
Definition adapters.hpp:63
Adapter for transports that expose newline-delimited JSON strings.
Definition adapters.hpp:127
core::Result< core::Unit > send(TxMessage message) override
Sends one JSON-RPC message to the peer.
Definition adapters.hpp:144
core::Result< core::Unit > close() override
Closes the transport and unblocks receive() where possible.
Definition adapters.hpp:181
protocol::Json diagnostics() const override
Structured implementation diagnostics.
Definition adapters.hpp:137
core::Result< std::optional< RxMessage > > receive() override
Receives the next JSON-RPC message from the peer.
Definition adapters.hpp:157
std::string_view name() const noexcept override
Human-readable transport name for diagnostics.
Definition adapters.hpp:135
Thread-safe queue transport for worker/queue integrations and tests.
Definition adapters.hpp:212
std::string_view name() const noexcept override
Human-readable transport name for diagnostics.
Definition adapters.hpp:220
protocol::Json diagnostics() const override
Structured implementation diagnostics.
Definition adapters.hpp:222
core::Result< core::Unit > send(TxMessage message) override
Sends one JSON-RPC message to the peer.
Definition adapters.hpp:232
core::Result< std::optional< RxMessage > > receive() override
Receives the next JSON-RPC message from the peer.
Definition adapters.hpp:249
core::Result< core::Unit > close() override
Closes the transport and unblocks receive() where possible.
Definition adapters.hpp:260
core::Result< core::Unit > push_inbound(RxMessage message)
Queues a message that receive() will return.
Definition adapters.hpp:271
std::optional< TxMessage > pop_outbound()
Returns one message captured by send(), if available.
Definition adapters.hpp:288
Minimal message-level transport contract shared by MCP roles.
Definition transport.hpp:38
nlohmann::json Json
JSON value type used by all protocol DTOs.
Definition types.hpp:28
std::monostate Unit
Success value for operations that only need to report failure.
Definition result.hpp:55
tl::expected< T, Error > Result
Alias for the SDK result type.
Definition result.hpp:64
JSON-RPC method names and message construction/parsing helpers.
Function-backed implementation of Transport<Role>.
Definition adapters.hpp:36
std::function< core::Result< core::Unit >(TxMessage)> send
Sends one message. Required.
Definition adapters.hpp:43
std::string name
Human-readable transport name.
Definition adapters.hpp:41
std::function< core::Result< std::optional< RxMessage > >()> receive
Receives the next message or end-of-stream. Required.
Definition adapters.hpp:45
std::function< protocol::Json()> diagnostics
Returns structured diagnostics. Optional.
Definition adapters.hpp:49
std::function< core::Result< core::Unit >()> close
Closes the underlying transport. Optional.
Definition adapters.hpp:47
Line-oriented JSON-RPC source/sink adapter options.
Definition adapters.hpp:110
std::function< core::Result< core::Unit >()> close
Closes the underlying source/sink. Optional.
Definition adapters.hpp:120
std::string name
Human-readable transport name.
Definition adapters.hpp:114
std::function< core::Result< core::Unit >(std::string)> write_line
Writes one serialized JSON-RPC document without a trailing newline.
Definition adapters.hpp:116
std::function< RxLine()> read_line
Reads one serialized JSON-RPC document or end-of-stream.
Definition adapters.hpp:118
std::function< protocol::Json()> diagnostics
Returns structured diagnostics. Optional.
Definition adapters.hpp:122
Options for an in-memory queue-backed transport.
Definition adapters.hpp:201
std::size_t max_outbound
Maximum outbound queue size. Zero means unbounded.
Definition adapters.hpp:207
std::size_t max_inbound
Maximum inbound queue size. Zero means unbounded.
Definition adapters.hpp:205
std::string name
Human-readable transport name.
Definition adapters.hpp:203
Role-generic MCP transport contract for SDK peer/service layers.