Entropic 2.3.8
Local-first agentic inference engine
Loading...
Searching...
No Matches
transport_stdio.cpp
Go to the documentation of this file.
1// SPDX-License-Identifier: Apache-2.0
10
11#include <cerrno>
12#include <cstring>
13#include <chrono>
14
15#include <fcntl.h>
16#include <poll.h>
17#include <signal.h>
18#include <spawn.h>
19#include <sys/wait.h>
20#include <unistd.h>
21
22namespace {
28struct IgnoreSigpipe {
34 IgnoreSigpipe() { signal(SIGPIPE, SIG_IGN); }
35};
36static IgnoreSigpipe ignore_sigpipe;
37} // namespace
38
39extern char** environ;
40
41static auto logger = entropic::log::get("mcp.transport.stdio");
42
43namespace entropic {
44
61static std::string sanitize_display_name(const std::string& raw) {
62 std::string out;
63 out.reserve(raw.size());
64 for (char c : raw) {
65 if (out.empty() && c == '/') { continue; }
66 if (c == '[' || c == ']') { continue; }
67 if (static_cast<unsigned char>(c) < 0x20 || c == 0x7f) { continue; }
68 out.push_back(c);
69 }
70 return out.empty() ? std::string{"server"} : out;
71}
72
83 std::string command,
84 std::vector<std::string> args,
85 std::map<std::string, std::string> env,
86 uint32_t default_timeout_ms)
87 : display_name_(sanitize_display_name(command)),
88 command_(std::move(command)),
89 args_(std::move(args)),
90 env_(std::move(env)),
91 default_timeout_ms_(default_timeout_ms) {}
92
109 std::string display_name,
110 std::string command,
111 std::vector<std::string> args,
112 std::map<std::string, std::string> env,
113 uint32_t default_timeout_ms)
114 : display_name_(sanitize_display_name(
115 display_name.empty() ? command : std::move(display_name))),
116 command_(std::move(command)),
117 args_(std::move(args)),
118 env_(std::move(env)),
119 default_timeout_ms_(default_timeout_ms) {}
120
129
137bool StdioTransport::create_all_pipes(int (&fds)[6]) {
138 bool ok = create_pipe(fds[0], fds[1]) &&
139 create_pipe(fds[2], fds[3]) &&
140 create_pipe(fds[4], fds[5]);
141 if (!ok) {
142 logger->error("Failed to create pipes: {}", strerror(errno));
143 for (auto& fd : fds) { close_fd(fd); }
144 }
145 return ok;
146}
147
155 if (connected_) {
156 return true;
157 }
158
159 if (!open_child_process()) {
160 return false;
161 }
162
163 connected_ = true;
164 stderr_thread_ = std::thread(
165 &StdioTransport::stderr_reader_loop, this);
166
167 logger->info("Spawned child process PID {} for '{}' (cmd: {})",
168 child_pid_, display_name_, command_);
169 return true;
170}
171
178bool StdioTransport::open_child_process() {
179 int fds[6] = {-1, -1, -1, -1, -1, -1};
180 if (!create_all_pipes(fds)) {
181 return false;
182 }
183
184 auto env_strs = build_env();
185 bool ok = spawn_child(fds[0], fds[3], fds[5], env_strs);
186
187 // Close child-side pipe ends in parent
188 close_fd(fds[0]);
189 close_fd(fds[3]);
190 close_fd(fds[5]);
191
192 if (!ok) {
193 close_fd(fds[1]);
194 close_fd(fds[2]);
195 close_fd(fds[4]);
196 return false;
197 }
198
199 stdin_fd_ = fds[1];
200 stdout_fd_ = fds[2];
201 stderr_fd_ = fds[4];
202 return true;
203}
204
211 connected_ = false;
212
213 terminate_child();
214 close_fd(stdin_fd_);
215 close_fd(stdout_fd_);
216 close_fd(stderr_fd_);
217
218 if (stderr_thread_.joinable()) {
219 stderr_thread_.join();
220 }
221
222 logger->info("Closed stdio transport for '{}'", display_name_);
223}
224
234 const std::string& request_json,
235 uint32_t timeout_ms) {
236
237 if (!connected_ || cancel_flag_.load(std::memory_order_acquire)) {
238 return "";
239 }
240
241 uint32_t actual_timeout = timeout_ms > 0
242 ? timeout_ms : default_timeout_ms_;
243
244 std::lock_guard<std::mutex> lock(io_mutex_);
245
246 std::string msg = request_json + "\n";
247 ssize_t written = ::write(stdin_fd_, msg.data(), msg.size());
248 if (written < 0 || static_cast<size_t>(written) != msg.size()) {
249 logger->error("Write to child stdin failed: {}",
250 strerror(errno));
251 connected_ = false;
252 return "";
253 }
254
255 return read_line(stdout_fd_, actual_timeout);
256}
257
265 if (!connected_) {
266 return false;
267 }
268 if (child_pid_ > 0 && kill(child_pid_, 0) == 0) {
269 return true;
270 }
271 return false;
272}
273
286 cancel_flag_.store(true, std::memory_order_release);
287}
288
297static std::vector<char*> build_argv(const std::string& command,
298 const std::vector<std::string>& args) {
299 std::vector<char*> argv;
300 argv.push_back(const_cast<char*>(command.c_str()));
301 for (const auto& arg : args) {
302 argv.push_back(const_cast<char*>(arg.c_str()));
303 }
304 argv.push_back(nullptr);
305 return argv;
306}
307
315static std::vector<char*> build_envp(
316 const std::vector<std::string>& env_strs) {
317 std::vector<char*> envp;
318 for (const auto& e : env_strs) {
319 envp.push_back(const_cast<char*>(e.c_str()));
320 }
321 envp.push_back(nullptr);
322 return envp;
323}
324
335bool StdioTransport::spawn_child(
336 int stdin_r, int stdout_w, int stderr_w,
337 const std::vector<std::string>& env_strs) {
338
339 posix_spawn_file_actions_t actions;
340 posix_spawn_file_actions_init(&actions);
341 posix_spawn_file_actions_adddup2(&actions, stdin_r, STDIN_FILENO);
342 posix_spawn_file_actions_adddup2(&actions, stdout_w, STDOUT_FILENO);
343 posix_spawn_file_actions_adddup2(&actions, stderr_w, STDERR_FILENO);
344
345 auto argv = build_argv(command_, args_);
346 auto envp = build_envp(env_strs);
347
348 int err = posix_spawnp(
349 &child_pid_, command_.c_str(), &actions,
350 nullptr, argv.data(), envp.data());
351
352 posix_spawn_file_actions_destroy(&actions);
353
354 if (err != 0) {
355 logger->error("posix_spawnp failed for '{}' (cmd: {}): {}",
356 display_name_, command_, strerror(err));
357 child_pid_ = -1;
358 return false;
359 }
360 return true;
361}
362
369std::vector<std::string> StdioTransport::build_env() const {
370 std::map<std::string, std::string> merged;
371
372 // Copy parent environment
373 for (char** ep = environ; ep && *ep; ++ep) {
374 std::string entry(*ep);
375 auto eq = entry.find('=');
376 if (eq != std::string::npos) {
377 merged[entry.substr(0, eq)] = entry.substr(eq + 1);
378 }
379 }
380
381 // Apply overrides
382 for (const auto& [key, val] : env_) {
383 merged[key] = val;
384 }
385
386 std::vector<std::string> result;
387 result.reserve(merged.size());
388 for (const auto& [key, val] : merged) {
389 result.push_back(key + "=" + val);
390 }
391 return result;
392}
393
402bool StdioTransport::create_pipe(int& read_fd, int& write_fd) {
403 int fds[2];
404 if (::pipe(fds) != 0) {
405 return false;
406 }
407 ::fcntl(fds[0], F_SETFD, FD_CLOEXEC);
408 ::fcntl(fds[1], F_SETFD, FD_CLOEXEC);
409 read_fd = fds[0];
410 write_fd = fds[1];
411 return true;
412}
413
422int StdioTransport::poll_until_ready(
423 int fd,
424 std::chrono::steady_clock::time_point deadline) {
425
426 auto remaining = std::chrono::duration_cast<
427 std::chrono::milliseconds>(
428 deadline - std::chrono::steady_clock::now());
429
430 if (remaining.count() <= 0) {
431 return 0;
432 }
433
434 // P1-10 (2.0.6-rc16): cap the single poll slice at 100ms so the
435 // caller's cancel_flag_ is re-checked at least ten times/second.
436 constexpr int kSliceMs = 100;
437 int slice = static_cast<int>(remaining.count());
438 if (slice > kSliceMs) { slice = kSliceMs; }
439
440 struct pollfd pfd{fd, POLLIN, 0};
441 int rc = ::poll(&pfd, 1, slice);
442 if (rc == 0) {
443 // Slice expired; report 0 only if the full deadline passed.
444 if (std::chrono::steady_clock::now() < deadline) {
445 return -2; // caller retries after checking cancel_flag
446 }
447 }
448 return rc;
449}
450
464std::string StdioTransport::read_line(int fd, uint32_t timeout_ms) {
465 std::string line;
466 auto deadline = std::chrono::steady_clock::now() +
467 std::chrono::milliseconds(timeout_ms);
468
469 while (true) {
470 // P1-10: short-circuit if the engine interrupted this request.
471 if (cancel_flag_.load(std::memory_order_acquire)) {
472 logger->info("Transport read cancelled by interrupt");
473 break;
474 }
475 int ready = poll_until_ready(fd, deadline);
476 if (ready == -2) { continue; } // slice expired, re-check cancel
477 if (ready <= 0) {
478 if (ready == 0) { logger->warn("Read timeout after {}ms", timeout_ms); }
479 break;
480 }
481 char ch = 0;
482 if (::read(fd, &ch, 1) <= 0) { break; }
483 if (ch == '\n') { return line; }
484 line += ch;
485 }
486 return "";
487}
488
494void StdioTransport::stderr_reader_loop() {
495 constexpr int poll_timeout_ms = 500;
496 while (connected_) {
497 struct pollfd pfd{stderr_fd_, POLLIN, 0};
498 int ret = ::poll(&pfd, 1, poll_timeout_ms);
499 if (ret <= 0) {
500 continue;
501 }
502
503 char buf[1024];
504 ssize_t n = ::read(stderr_fd_, buf, sizeof(buf) - 1);
505 if (n <= 0) {
506 break;
507 }
508 buf[n] = '\0';
509 logger->warn("[{}] {}", display_name_, buf);
510 }
511}
512
518void StdioTransport::terminate_child() {
519 if (child_pid_ <= 0) {
520 return;
521 }
522
523 ::kill(child_pid_, SIGTERM);
524
525 // Wait up to 3 seconds for graceful exit
526 constexpr int max_wait_ms = 3000;
527 constexpr int poll_interval_ms = 50;
528 int waited = 0;
529 while (waited < max_wait_ms) {
530 int status = 0;
531 pid_t result = ::waitpid(child_pid_, &status, WNOHANG);
532 if (result == child_pid_) {
533 child_pid_ = -1;
534 return;
535 }
536 std::this_thread::sleep_for(
537 std::chrono::milliseconds(poll_interval_ms));
538 waited += poll_interval_ms;
539 }
540
541 // Force kill
542 ::kill(child_pid_, SIGKILL);
543 ::waitpid(child_pid_, nullptr, 0);
544 logger->warn("Force-killed child PID {}", child_pid_);
545 child_pid_ = -1;
546}
547
554void StdioTransport::close_fd(int& fd) {
555 if (fd >= 0) {
556 ::close(fd);
557 fd = -1;
558 }
559}
560
561} // namespace entropic
bool open() override
Spawn child process and open pipes.
bool is_connected() const override
Check if child process is alive.
void interrupt() override
Abort any blocking read and mark transport uncancellable.
std::string send_request(const std::string &request_json, uint32_t timeout_ms=0) override
Send JSON-RPC request via stdin, read response from stdout.
void close() override
Send SIGTERM, reap child, close pipes.
StdioTransport(std::string command, std::vector< std::string > args, std::map< std::string, std::string > env={}, uint32_t default_timeout_ms=30000)
Construct with command, arguments, and environment.
~StdioTransport() override
Destructor — ensures child is cleaned up.
spdlog initialization and logger access.
ENTROPIC_EXPORT std::shared_ptr< spdlog::logger > get(const std::string &name)
Get or create a named logger.
Definition logging.cpp:211
Activate model on GPU (WARM → ACTIVE).
static std::vector< char * > build_envp(const std::vector< std::string > &env_strs)
Build a NULL-terminated envp from env strings.
static std::string sanitize_display_name(const std::string &raw)
Sanitize a display_name for use as a log-line bracket label.
static std::vector< char * > build_argv(const std::string &command, const std::vector< std::string > &args)
Build a NULL-terminated argv from command + args.
@ ok
Tool dispatched, returned non-empty content.
Stdio transport for external MCP servers.