34 IgnoreSigpipe() { signal(SIGPIPE, SIG_IGN); }
36static IgnoreSigpipe ignore_sigpipe;
63 out.reserve(raw.size());
65 if (out.empty() && c ==
'/') {
continue; }
66 if (c ==
'[' || c ==
']') {
continue; }
67 if (
static_cast<unsigned char>(c) < 0x20 || c == 0x7f) {
continue; }
70 return out.empty() ? std::string{
"server"} : out;
84 std::vector<std::string> args,
85 std::map<std::string, std::string> env,
86 uint32_t default_timeout_ms)
88 command_(std::move(command)),
89 args_(std::move(args)),
91 default_timeout_ms_(default_timeout_ms) {}
109 std::string display_name,
111 std::vector<std::string> args,
112 std::map<std::string, std::string> env,
113 uint32_t default_timeout_ms)
115 display_name.empty() ? command : std::move(display_name))),
116 command_(std::move(command)),
117 args_(std::move(args)),
118 env_(std::move(env)),
119 default_timeout_ms_(default_timeout_ms) {}
137bool StdioTransport::create_all_pipes(
int (&fds)[6]) {
138 bool ok = create_pipe(fds[0], fds[1]) &&
139 create_pipe(fds[2], fds[3]) &&
140 create_pipe(fds[4], fds[5]);
142 logger->error(
"Failed to create pipes: {}", strerror(errno));
143 for (
auto& fd : fds) { close_fd(fd); }
159 if (!open_child_process()) {
164 stderr_thread_ = std::thread(
165 &StdioTransport::stderr_reader_loop,
this);
167 logger->info(
"Spawned child process PID {} for '{}' (cmd: {})",
168 child_pid_, display_name_, command_);
178bool StdioTransport::open_child_process() {
179 int fds[6] = {-1, -1, -1, -1, -1, -1};
180 if (!create_all_pipes(fds)) {
184 auto env_strs = build_env();
185 bool ok = spawn_child(fds[0], fds[3], fds[5], env_strs);
215 close_fd(stdout_fd_);
216 close_fd(stderr_fd_);
218 if (stderr_thread_.joinable()) {
219 stderr_thread_.join();
222 logger->info(
"Closed stdio transport for '{}'", display_name_);
234 const std::string& request_json,
235 uint32_t timeout_ms) {
237 if (!connected_ || cancel_flag_.load(std::memory_order_acquire)) {
241 uint32_t actual_timeout = timeout_ms > 0
242 ? timeout_ms : default_timeout_ms_;
244 std::lock_guard<std::mutex> lock(io_mutex_);
246 std::string msg = request_json +
"\n";
247 ssize_t written = ::write(stdin_fd_, msg.data(), msg.size());
248 if (written < 0 ||
static_cast<size_t>(written) != msg.size()) {
249 logger->error(
"Write to child stdin failed: {}",
255 return read_line(stdout_fd_, actual_timeout);
268 if (child_pid_ > 0 && kill(child_pid_, 0) == 0) {
286 cancel_flag_.store(
true, std::memory_order_release);
297static std::vector<char*>
build_argv(
const std::string& command,
298 const std::vector<std::string>& args) {
299 std::vector<char*> argv;
300 argv.push_back(
const_cast<char*
>(command.c_str()));
301 for (
const auto& arg : args) {
302 argv.push_back(
const_cast<char*
>(arg.c_str()));
304 argv.push_back(
nullptr);
316 const std::vector<std::string>& env_strs) {
317 std::vector<char*> envp;
318 for (
const auto& e : env_strs) {
319 envp.push_back(
const_cast<char*
>(e.c_str()));
321 envp.push_back(
nullptr);
335bool StdioTransport::spawn_child(
336 int stdin_r,
int stdout_w,
int stderr_w,
337 const std::vector<std::string>& env_strs) {
339 posix_spawn_file_actions_t actions;
340 posix_spawn_file_actions_init(&actions);
341 posix_spawn_file_actions_adddup2(&actions, stdin_r, STDIN_FILENO);
342 posix_spawn_file_actions_adddup2(&actions, stdout_w, STDOUT_FILENO);
343 posix_spawn_file_actions_adddup2(&actions, stderr_w, STDERR_FILENO);
348 int err = posix_spawnp(
349 &child_pid_, command_.c_str(), &actions,
350 nullptr, argv.data(), envp.data());
352 posix_spawn_file_actions_destroy(&actions);
355 logger->error(
"posix_spawnp failed for '{}' (cmd: {}): {}",
356 display_name_, command_, strerror(err));
369std::vector<std::string> StdioTransport::build_env()
const {
370 std::map<std::string, std::string> merged;
373 for (
char** ep = environ; ep && *ep; ++ep) {
374 std::string entry(*ep);
375 auto eq = entry.find(
'=');
376 if (eq != std::string::npos) {
377 merged[entry.substr(0, eq)] = entry.substr(eq + 1);
382 for (
const auto& [key, val] : env_) {
386 std::vector<std::string> result;
387 result.reserve(merged.size());
388 for (
const auto& [key, val] : merged) {
389 result.push_back(key +
"=" + val);
402bool StdioTransport::create_pipe(
int& read_fd,
int& write_fd) {
404 if (::pipe(fds) != 0) {
407 ::fcntl(fds[0], F_SETFD, FD_CLOEXEC);
408 ::fcntl(fds[1], F_SETFD, FD_CLOEXEC);
422int StdioTransport::poll_until_ready(
424 std::chrono::steady_clock::time_point deadline) {
426 auto remaining = std::chrono::duration_cast<
427 std::chrono::milliseconds>(
428 deadline - std::chrono::steady_clock::now());
430 if (remaining.count() <= 0) {
436 constexpr int kSliceMs = 100;
437 int slice =
static_cast<int>(remaining.count());
438 if (slice > kSliceMs) { slice = kSliceMs; }
440 struct pollfd pfd{fd, POLLIN, 0};
441 int rc = ::poll(&pfd, 1, slice);
444 if (std::chrono::steady_clock::now() < deadline) {
464std::string StdioTransport::read_line(
int fd, uint32_t timeout_ms) {
466 auto deadline = std::chrono::steady_clock::now() +
467 std::chrono::milliseconds(timeout_ms);
471 if (cancel_flag_.load(std::memory_order_acquire)) {
472 logger->info(
"Transport read cancelled by interrupt");
475 int ready = poll_until_ready(fd, deadline);
476 if (ready == -2) {
continue; }
478 if (ready == 0) { logger->warn(
"Read timeout after {}ms", timeout_ms); }
482 if (::read(fd, &ch, 1) <= 0) {
break; }
483 if (ch ==
'\n') {
return line; }
494void StdioTransport::stderr_reader_loop() {
495 constexpr int poll_timeout_ms = 500;
497 struct pollfd pfd{stderr_fd_, POLLIN, 0};
498 int ret = ::poll(&pfd, 1, poll_timeout_ms);
504 ssize_t n = ::read(stderr_fd_, buf,
sizeof(buf) - 1);
509 logger->warn(
"[{}] {}", display_name_, buf);
518void StdioTransport::terminate_child() {
519 if (child_pid_ <= 0) {
523 ::kill(child_pid_, SIGTERM);
526 constexpr int max_wait_ms = 3000;
527 constexpr int poll_interval_ms = 50;
529 while (waited < max_wait_ms) {
531 pid_t result = ::waitpid(child_pid_, &status, WNOHANG);
532 if (result == child_pid_) {
536 std::this_thread::sleep_for(
537 std::chrono::milliseconds(poll_interval_ms));
538 waited += poll_interval_ms;
542 ::kill(child_pid_, SIGKILL);
543 ::waitpid(child_pid_,
nullptr, 0);
544 logger->warn(
"Force-killed child PID {}", child_pid_);
554void StdioTransport::close_fd(
int& fd) {
bool open() override
Spawn child process and open pipes.
bool is_connected() const override
Check if child process is alive.
void interrupt() override
Abort any blocking read and mark transport uncancellable.
std::string send_request(const std::string &request_json, uint32_t timeout_ms=0) override
Send JSON-RPC request via stdin, read response from stdout.
void close() override
Send SIGTERM, reap child, close pipes.
StdioTransport(std::string command, std::vector< std::string > args, std::map< std::string, std::string > env={}, uint32_t default_timeout_ms=30000)
Construct with command, arguments, and environment.
~StdioTransport() override
Destructor — ensures child is cleaned up.
spdlog initialization and logger access.
ENTROPIC_EXPORT std::shared_ptr< spdlog::logger > get(const std::string &name)
Get or create a named logger.
Activate model on GPU (WARM → ACTIVE).
static std::vector< char * > build_envp(const std::vector< std::string > &env_strs)
Build a NULL-terminated envp from env strings.
static std::string sanitize_display_name(const std::string &raw)
Sanitize a display_name for use as a log-line bracket label.
static std::vector< char * > build_argv(const std::string &command, const std::vector< std::string > &args)
Build a NULL-terminated argv from command + args.
@ ok
Tool dispatched, returned non-empty content.
Stdio transport for external MCP servers.