12#include <nlohmann/json.hpp>
29 uint32_t default_timeout_ms)
30 : url_(std::move(url)),
31 default_timeout_ms_(default_timeout_ms) {}
// Splits url_ into host_ (scheme plus authority) and sse_path_ (path onward).
// NOTE(review): this listing omits several original lines (the body of the
// first if, the else between the two host_ assignments, and the return), so
// the comments below describe only the statements that are visible.
48bool SSETransport::parse_url() {
// Locate the scheme separator in url_.
50 auto scheme_end = url_.find(
"://");
// Taken when url_ has no scheme separator at all.
51 if (scheme_end == std::string::npos) {
// The host begins right after the three separator characters.
55 auto host_start = scheme_end + 3;
// The first slash after the host marks where the path begins.
56 auto path_start = url_.find(
'/', host_start);
57 if (path_start == std::string::npos) {
// No path component: the two substrings concatenate back to the whole url_.
58 host_ = url_.substr(0, host_start) +
59 url_.substr(host_start);
// Path present: host_ keeps everything before it (scheme included) and
// sse_path_ receives the remainder starting at the slash.
62 host_ = url_.substr(0, path_start);
63 sse_path_ = url_.substr(path_start);
// Logs a warning when url_ uses plain HTTP and host_ does not look like a
// loopback address. Only logs; no member is modified (the method is const).
73void SSETransport::warn_if_cleartext()
const {
// Compares the first five characters of url_, so an https URL does not match.
74 bool is_http = url_.substr(0, 5) ==
"http:";
// Substring search anywhere in host_ (which also carries the scheme), so any
// host string merely containing one of these markers counts as local.
75 bool is_localhost = host_.find(
"localhost") != std::string::npos ||
76 host_.find(
"127.0.0.1") != std::string::npos ||
77 host_.find(
"[::1]") != std::string::npos;
// Warn only for cleartext traffic to a non-local host.
78 if (is_http && !is_localhost) {
79 logger->warn(
"SSE connection to {} uses HTTP (cleartext). "
80 "Tool call data will be unencrypted.", host_);
96 logger->error(
"Invalid SSE URL: {}", url_);
102 client_ = std::make_unique<httplib::Client>(host_);
103 client_->set_read_timeout(std::chrono::seconds(0));
104 client_->set_connection_timeout(std::chrono::seconds(10));
107 sse_reader_thread_ = std::thread(
108 &SSETransport::sse_reader_loop,
this);
110 std::this_thread::sleep_for(std::chrono::milliseconds(500));
112 bool ok = connected_.load();
114 logger->error(
"SSE connection to {} failed", url_);
117 logger->info(
"SSE transport connected to {}", url_);
136 pending_cv_.notify_all();
138 if (sse_reader_thread_.joinable()) {
139 sse_reader_thread_.join();
143 logger->info(
"SSE transport closed for {}", url_);
// Parses request_json and writes its "id" member into request_id.
// NOTE(review): the lines around the parse call and the return statement are
// not visible in this listing (presumably a try/catch that yields false on
// malformed JSON - confirm against the full file).
154bool SSETransport::parse_request_id(
155 const std::string& request_json,
int& request_id) {
158 auto req = nlohmann::json::parse(request_json);
// value() supplies the default 0 when the "id" key is absent.
159 request_id = req.value(
"id", 0);
// Sends request_json through client_ as an HTTP POST to message_endpoint_
// with content type application/json.
// NOTE(review): the listing stops in the middle of the error log call; the
// return statements are not visible here.
173bool SSETransport::post_request(
const std::string& request_json) {
174 auto result = client_->Post(
175 message_endpoint_, request_json,
"application/json");
// A falsy result or any status other than exactly 200 is reported as a
// failure (other 2xx codes are therefore also treated as failures).
177 if (!result || result->status != 200) {
178 logger->error(
"POST to {} failed: {}",
180 result ? std::to_string(result->status)
// Waits until pending_responses_ holds an entry for request_id or until
// timeout_ms milliseconds have elapsed, then hands the stored payload over.
// NOTE(review): the remainder of the timeout branch and the final return are
// not visible in this listing.
195std::string SSETransport::await_response(
196 int request_id, uint32_t timeout_ms) {
// pending_mutex_ is held for every access to pending_responses_ below and is
// the lock that pending_cv_ releases while waiting.
198 std::unique_lock<std::mutex> lock(pending_mutex_);
// The deadline is computed once, so repeated wake-ups do not extend the wait.
199 auto deadline = std::chrono::steady_clock::now() +
200 std::chrono::milliseconds(timeout_ms);
// Re-check the map after each wake-up; a wake-up alone does not end the wait.
202 while (pending_responses_.find(request_id) ==
203 pending_responses_.end()) {
204 if (pending_cv_.wait_until(lock, deadline) ==
205 std::cv_status::timeout) {
206 logger->warn(
"SSE response timeout for id={}",
// Move the payload out of the map, then drop the now moved-from entry.
215 std::string response = std::move(
216 pending_responses_[request_id]);
217 pending_responses_.erase(request_id);
230 const std::string& request_json,
231 uint32_t timeout_ms) {
233 logger->info(
"SSE request: {} bytes", request_json.size());
235 bool ready = connected_ && !message_endpoint_.empty() &&
236 parse_request_id(request_json, request_id) &&
237 post_request(request_json);
243 uint32_t actual_timeout = timeout_ms > 0
244 ? timeout_ms : default_timeout_ms_;
245 return await_response(request_id, actual_timeout);
// Issues the event-stream GET through client_ and feeds each received chunk,
// line by line, to process_sse_line.
// NOTE(review): parts of the lambda body and of the Get argument list are not
// visible in this listing.
263void SSETransport::sse_reader_loop() {
// Callback invoked with each chunk of the response body.
264 auto content_receiver = [
this](
265 const char* data,
size_t len) ->
bool {
266 std::string chunk(data, len);
// NOTE(review): each chunk is split on its own and no carry-over buffer is
// visible, so a line spanning two chunks appears to be processed as two
// separate fragments - confirm against the full file.
268 std::istringstream stream(chunk);
270 while (std::getline(stream, line)) {
// Blank lines (bare or carriage-return only) are singled out here; the branch
// body is not visible (presumably they are skipped - confirm).
271 if (line.empty() || line ==
"\r") {
274 process_sse_line(line);
// The callback result mirrors running_; presumably a false result makes the
// HTTP client stop the transfer - confirm against the cpp-httplib docs.
276 return running_.load();
279 auto res = client_->Get(
// Ask the server for an event stream.
281 httplib::Headers{{
"Accept",
"text/event-stream"}},
286 logger->warn(
"SSE stream disconnected from {}", url_);
// Dispatches one line of the event stream: a line starting with "data: " has
// its remainder passed to handle_sse_data, and a line starting with "event: "
// has its remainder stored in current_event_type_. Both prefixes are matched
// including the space after the colon.
296void SSETransport::process_sse_line(
const std::string& line) {
// Work on a copy so a trailing carriage return can be handled locally.
298 std::string clean = line;
// NOTE(review): the body of this check is not visible (presumably it removes
// the trailing carriage return - confirm).
299 if (!clean.empty() && clean.back() ==
'\r') {
303 if (clean.substr(0, 6) ==
"data: ") {
304 handle_sse_data(clean.substr(6));
305 }
else if (clean.substr(0, 7) ==
"event: ") {
306 current_event_type_ = clean.substr(7);
// Handles the payload of a data line. When the preceding event type was
// "endpoint" the payload goes to handle_endpoint_event; otherwise it is parsed
// as JSON and stored in pending_responses_ under its "id" for a waiting caller.
// NOTE(review): the try keyword and any return after the endpoint branch are
// not visible in this listing.
316void SSETransport::handle_sse_data(
const std::string& data) {
317 if (current_event_type_ ==
"endpoint") {
318 handle_endpoint_event(data);
// The event type applies to one data line only, so reset it.
319 current_event_type_.clear();
322 current_event_type_.clear();
325 auto j = nlohmann::json::parse(data);
// value() supplies the default 0 when the "id" key is absent, so payloads
// without an id are all stored under key 0.
326 int id = j.value(
"id", 0);
// Store the raw text (not the parsed object) under the lock, then wake every
// thread waiting on pending_cv_.
328 std::lock_guard<std::mutex> lock(pending_mutex_);
329 pending_responses_[id] = data;
330 pending_cv_.notify_all();
332 }
catch (
const nlohmann::json::exception& e) {
// JSON errors are logged and otherwise ignored.
333 logger->warn(
"Failed to parse SSE data: {}", e.what());
// Derives message_endpoint_ (the path used for POSTs) from the payload of an
// endpoint event and logs the result.
// NOTE(review): several lines between the signature and the first check are
// not visible; data[0] requires a non-empty payload, so confirm that a guard
// for empty data exists in the full file.
343void SSETransport::handle_endpoint_event(
const std::string& data) {
// Case 1: the payload already starts with a slash and is used unchanged.
349 if (data[0] ==
'/') {
350 message_endpoint_ = data;
351 }
else if (data.find(
"://") != std::string::npos) {
// Case 2: the payload contains a scheme separator. Keep only the part from
// the first slash after the host, or fall back to "/message" when no such
// slash exists.
353 auto path_start = data.find(
'/', data.find(
"://") + 3);
354 message_endpoint_ = (path_start != std::string::npos)
355 ? data.substr(path_start) :
"/message";
// Case 3: anything else gets a leading slash prepended.
357 message_endpoint_ =
"/" + data;
361 logger->info(
"SSE message endpoint: {}", message_endpoint_);
SSETransport(std::string url, uint32_t default_timeout_ms=30000)
Construct with SSE endpoint URL.
void close() override
Stop SSE reader and close HTTP client.
bool is_connected() const override
Check if SSE stream is connected.
std::string send_request(const std::string &request_json, uint32_t timeout_ms=0) override
POST a JSON-RPC request and wait for matching response.
bool open() override
Connect to SSE endpoint and start reader thread.
~SSETransport() override
Destructor — ensures reader thread is stopped.
spdlog initialization and logger access.
ENTROPIC_EXPORT std::shared_ptr< spdlog::logger > get(const std::string &name)
Get or create a named logger.
Activate model on GPU (WARM → ACTIVE).
@ ok
Tool dispatched, returned non-empty content.
SSE transport for external MCP servers.