Entropic 2.3.8
Local-first agentic inference engine
Loading...
Searching...
No Matches
transport_sse.cpp
Go to the documentation of this file.
1// SPDX-License-Identifier: Apache-2.0
10
11#include <httplib.h>
12#include <nlohmann/json.hpp>
13
14#include <sstream>
15
16static auto logger = entropic::log::get("mcp.transport.sse");
17
18namespace entropic {
19
28 std::string url,
29 uint32_t default_timeout_ms)
30 : url_(std::move(url)),
31 default_timeout_ms_(default_timeout_ms) {}
32
41
48bool SSETransport::parse_url() {
49 // Expected: http[s]://host[:port]/path
50 auto scheme_end = url_.find("://");
51 if (scheme_end == std::string::npos) {
52 return false;
53 }
54
55 auto host_start = scheme_end + 3;
56 auto path_start = url_.find('/', host_start);
57 if (path_start == std::string::npos) {
58 host_ = url_.substr(0, host_start) +
59 url_.substr(host_start);
60 sse_path_ = "/";
61 } else {
62 host_ = url_.substr(0, path_start);
63 sse_path_ = url_.substr(path_start);
64 }
65 return true;
66}
67
73void SSETransport::warn_if_cleartext() const {
74 bool is_http = url_.substr(0, 5) == "http:";
75 bool is_localhost = host_.find("localhost") != std::string::npos ||
76 host_.find("127.0.0.1") != std::string::npos ||
77 host_.find("[::1]") != std::string::npos;
78 if (is_http && !is_localhost) {
79 logger->warn("SSE connection to {} uses HTTP (cleartext). "
80 "Tool call data will be unencrypted.", host_);
81 }
82}
83
91 if (connected_) {
92 return true;
93 }
94
95 if (!parse_url()) {
96 logger->error("Invalid SSE URL: {}", url_);
97 return false;
98 }
99
100 warn_if_cleartext();
101
102 client_ = std::make_unique<httplib::Client>(host_);
103 client_->set_read_timeout(std::chrono::seconds(0));
104 client_->set_connection_timeout(std::chrono::seconds(10));
105
106 running_ = true;
107 sse_reader_thread_ = std::thread(
108 &SSETransport::sse_reader_loop, this);
109
110 std::this_thread::sleep_for(std::chrono::milliseconds(500));
111
112 bool ok = connected_.load();
113 if (!ok) {
114 logger->error("SSE connection to {} failed", url_);
115 close();
116 } else {
117 logger->info("SSE transport connected to {}", url_);
118 }
119 return ok;
120}
121
128 running_ = false;
129 connected_ = false;
130
131 if (client_) {
132 client_->stop();
133 }
134
135 // Wake any waiting send_request callers
136 pending_cv_.notify_all();
137
138 if (sse_reader_thread_.joinable()) {
139 sse_reader_thread_.join();
140 }
141
142 client_.reset();
143 logger->info("SSE transport closed for {}", url_);
144}
145
154bool SSETransport::parse_request_id(
155 const std::string& request_json, int& request_id) {
156
157 try {
158 auto req = nlohmann::json::parse(request_json);
159 request_id = req.value("id", 0);
160 return true;
161 } catch (...) {
162 return false;
163 }
164}
165
173bool SSETransport::post_request(const std::string& request_json) {
174 auto result = client_->Post(
175 message_endpoint_, request_json, "application/json");
176
177 if (!result || result->status != 200) {
178 logger->error("POST to {} failed: {}",
179 message_endpoint_,
180 result ? std::to_string(result->status)
181 : "no response");
182 return false;
183 }
184 return true;
185}
186
195std::string SSETransport::await_response(
196 int request_id, uint32_t timeout_ms) {
197
198 std::unique_lock<std::mutex> lock(pending_mutex_);
199 auto deadline = std::chrono::steady_clock::now() +
200 std::chrono::milliseconds(timeout_ms);
201
202 while (pending_responses_.find(request_id) ==
203 pending_responses_.end()) {
204 if (pending_cv_.wait_until(lock, deadline) ==
205 std::cv_status::timeout) {
206 logger->warn("SSE response timeout for id={}",
207 request_id);
208 return "";
209 }
210 if (!connected_) {
211 return "";
212 }
213 }
214
215 std::string response = std::move(
216 pending_responses_[request_id]);
217 pending_responses_.erase(request_id);
218 return response;
219}
220
230 const std::string& request_json,
231 uint32_t timeout_ms) {
232
233 logger->info("SSE request: {} bytes", request_json.size());
234 int request_id = 0;
235 bool ready = connected_ && !message_endpoint_.empty() &&
236 parse_request_id(request_json, request_id) &&
237 post_request(request_json);
238
239 if (!ready) {
240 return "";
241 }
242
243 uint32_t actual_timeout = timeout_ms > 0
244 ? timeout_ms : default_timeout_ms_;
245 return await_response(request_id, actual_timeout);
246}
247
255 return connected_;
256}
257
263void SSETransport::sse_reader_loop() {
264 auto content_receiver = [this](
265 const char* data, size_t len) -> bool {
266 std::string chunk(data, len);
267 // Parse SSE events from chunk
268 std::istringstream stream(chunk);
269 std::string line;
270 while (std::getline(stream, line)) {
271 if (line.empty() || line == "\r") {
272 continue;
273 }
274 process_sse_line(line);
275 }
276 return running_.load();
277 };
278
279 auto res = client_->Get(
280 sse_path_,
281 httplib::Headers{{"Accept", "text/event-stream"}},
282 content_receiver);
283
284 connected_ = false;
285 if (running_) {
286 logger->warn("SSE stream disconnected from {}", url_);
287 }
288}
289
296void SSETransport::process_sse_line(const std::string& line) {
297 // Strip trailing \r
298 std::string clean = line;
299 if (!clean.empty() && clean.back() == '\r') {
300 clean.pop_back();
301 }
302
303 if (clean.substr(0, 6) == "data: ") {
304 handle_sse_data(clean.substr(6));
305 } else if (clean.substr(0, 7) == "event: ") {
306 current_event_type_ = clean.substr(7);
307 }
308}
309
316void SSETransport::handle_sse_data(const std::string& data) {
317 if (current_event_type_ == "endpoint") {
318 handle_endpoint_event(data);
319 current_event_type_.clear();
320 return;
321 }
322 current_event_type_.clear();
323
324 try {
325 auto j = nlohmann::json::parse(data);
326 int id = j.value("id", 0);
327 if (id > 0) {
328 std::lock_guard<std::mutex> lock(pending_mutex_);
329 pending_responses_[id] = data;
330 pending_cv_.notify_all();
331 }
332 } catch (const nlohmann::json::exception& e) {
333 logger->warn("Failed to parse SSE data: {}", e.what());
334 }
335}
336
343void SSETransport::handle_endpoint_event(const std::string& data) {
344 if (data.empty()) {
345 return;
346 }
347
348 // Could be absolute URL or relative path
349 if (data[0] == '/') {
350 message_endpoint_ = data;
351 } else if (data.find("://") != std::string::npos) {
352 // Extract path from absolute URL
353 auto path_start = data.find('/', data.find("://") + 3);
354 message_endpoint_ = (path_start != std::string::npos)
355 ? data.substr(path_start) : "/message";
356 } else {
357 message_endpoint_ = "/" + data;
358 }
359
360 connected_ = true;
361 logger->info("SSE message endpoint: {}", message_endpoint_);
362}
363
364} // namespace entropic
SSETransport(std::string url, uint32_t default_timeout_ms=30000)
Construct with SSE endpoint URL.
void close() override
Stop SSE reader and close HTTP client.
bool is_connected() const override
Check if SSE stream is connected.
std::string send_request(const std::string &request_json, uint32_t timeout_ms=0) override
POST a JSON-RPC request and wait for matching response.
bool open() override
Connect to SSE endpoint and start reader thread.
~SSETransport() override
Destructor — ensures reader thread is stopped.
spdlog initialization and logger access.
ENTROPIC_EXPORT std::shared_ptr< spdlog::logger > get(const std::string &name)
Get or create a named logger.
Definition logging.cpp:211
Activate model on GPU (WARM → ACTIVE).
@ ok
Tool dispatched, returned non-empty content.
SSE transport for external MCP servers.