Entropic 2.3.8
Local-first agentic inference engine
Loading...
Searching...
No Matches
external_bridge.cpp
Go to the documentation of this file.
1// SPDX-License-Identifier: Apache-2.0
13#include <entropic/entropic.h>
15
16#include "engine_handle.h"
17
18#include <nlohmann/json.hpp>
19
20#include <atomic>
21#include <cerrno>
22#include <chrono>
23#include <cstring>
24#include <filesystem>
25#include <fstream>
26#include <functional>
27#include <mutex>
28#include <sstream>
29#include <thread>
30#include <unordered_map>
31#include <unordered_set>
32#include <unistd.h>
33#include <sys/socket.h>
34#include <sys/stat.h>
35#include <sys/un.h>
36#include <poll.h>
37
38using json = nlohmann::json;
39
40namespace entropic {
41
42static auto logger = entropic::log::get("mcp.external_bridge");
43
44// gh#58: process-wide set of socket paths currently bound by an
45// ExternalBridge in this process. Without this, a second handle whose
46// project_dir hashes to the same socket path as a first handle would
47// unlink the live socket out from under handle #1 (see the
48// std::filesystem::remove call in create_listen_socket). With this
49// guard, the second handle declines to start its bridge instead.
50static std::mutex s_bound_sockets_mu;
51static std::unordered_set<std::string> s_bound_sockets;
52
53// Forward declaration
54std::filesystem::path compute_socket_path(
55 const std::filesystem::path& project_dir);
56
57// ── JSON-RPC helpers ─────────────────────────────────────
58
67static std::string rpc_ok(const json& id, const json& result) {
68 json r = {{"jsonrpc", "2.0"}, {"id", id}, {"result", result}};
69 return r.dump();
70}
71
81static std::string rpc_err(const json& id, int code,
82 const std::string& msg) {
83 json r = {{"jsonrpc", "2.0"}, {"id", id},
84 {"error", {{"code", code}, {"message", msg}}}};
85 return r.dump();
86}
87
95static json tool_text(const std::string& text) {
96 return {{"content", json::array({{{"type", "text"}, {"text", text}}})}};
97}
98
99// ── Tool definitions ─────────────────────────────────────
100
107static json tool_definitions() {
108 return json::array({
109 {{"name", "entropic.ask"},
110 {"description",
111 "Submit a prompt to the running entropic engine. "
112 "Set async=true to return immediately with a task_id; "
113 "the engine pushes a notification when done."},
114 {"inputSchema", {
115 {"type", "object"},
116 {"properties", {
117 {"prompt", {{"type", "string"},
118 {"description", "User message"}}},
119 {"async", {{"type", "boolean"},
120 {"description", "Run asynchronously"},
121 {"default", false}}}
122 }},
123 {"required", json::array({"prompt"})}
124 }}},
125 {{"name", "entropic.ask_status"},
126 {"description",
127 "Check status of an async entropic.ask task."},
128 {"inputSchema", {
129 {"type", "object"},
130 {"properties", {{"task_id", {
131 {"type", "string"},
132 {"description", "Task ID from async ask"}
133 }}}},
134 {"required", json::array({"task_id"})}
135 }}},
136 {{"name", "entropic.status"},
137 {"description", "Engine version and message count."},
138 {"inputSchema", {{"type", "object"},
139 {"properties", json::object()}}}},
140 {{"name", "entropic.context_clear"},
141 {"description", "Clear conversation history."},
142 {"inputSchema", {{"type", "object"},
143 {"properties", json::object()}}}},
144 {{"name", "entropic.context_count"},
145 {"description", "Return the message count."},
146 {"inputSchema", {{"type", "object"},
147 {"properties", json::object()}}}},
148 });
149}
150
151// ── Tool handlers ────────────────────────────────────────
152
165static std::string extract_final_text(const char* result_json) {
166 auto arr = json::parse(result_json, nullptr, false);
167 if (!arr.is_array()) { return {}; }
168 for (auto it = arr.rbegin(); it != arr.rend(); ++it) {
169 if (it->value("role", "") == "assistant") {
170 return it->value("content", "");
171 }
172 }
173 return {};
174}
175
183static void write_json_line(int fd, const json& msg) {
184 auto s = msg.dump() + "\n";
185 ::write(fd, s.c_str(), s.size());
186}
187
196static void send_progress(int fd, const std::string& token_text,
197 const std::string& progress_token) {
198 json notif = {
199 {"jsonrpc", "2.0"},
200 {"method", "notifications/progress"},
201 {"params", {
202 {"progressToken", progress_token},
203 {"progress", token_text}
204 }}
205 };
206 write_json_line(fd, notif);
207}
208
224static json handle_ask(entropic_handle_t handle, const json& args,
225 int client_fd, const std::string& call_id) {
226 auto it = args.find("prompt");
227 if (it == args.end() || !it->is_string()) {
228 return tool_text("error: missing 'prompt' argument");
229 }
230 std::string prompt = it->get<std::string>();
231
232 // Stream tokens as progress notifications
233 struct StreamCtx { int fd; std::string token_id; };
234 StreamCtx sctx{client_fd, call_id};
235 auto on_token = [](const char* tok, size_t len, void* ud) {
236 auto* ctx = static_cast<StreamCtx*>(ud);
237 send_progress(ctx->fd, std::string(tok, len), ctx->token_id);
238 };
239 auto err = entropic_run_streaming(
240 handle, prompt.c_str(), on_token, &sctx, nullptr);
241 if (err != ENTROPIC_OK) {
242 const char* msg = entropic_last_error(handle);
243 return tool_text(std::string("error: ") + (msg ? msg : "unknown"));
244 }
245
246 // Extract clean final text from conversation
247 char* msgs_json = nullptr;
248 entropic_context_get(handle, &msgs_json);
249 auto text = (msgs_json != nullptr)
250 ? extract_final_text(msgs_json) : std::string{};
251 entropic_free(msgs_json);
252 return tool_text(text.empty() ? "(no response)" : text);
253}
254
262static json handle_status(entropic_handle_t handle) {
263 size_t count = 0;
264 entropic_context_count(handle, &count);
265 std::ostringstream os;
266 os << "entropic " << entropic_version()
267 << "\nmessages: " << count;
268 // Metrics + per-tier breakdown (P2-15 follow-up, 2.0.6-rc16.2)
269 char* mjson = nullptr;
270 if (entropic_metrics_json(handle, &mjson) == ENTROPIC_OK
271 && mjson != nullptr) {
272 os << "\nmetrics: " << mjson;
273 entropic_free(mjson);
274 }
275 return tool_text(os.str());
276}
277
308 std::string status;
309 std::string phase;
310 std::string text;
311};
312
328 entropic_handle_t handle,
330 char* result_json) {
332 if (err == ENTROPIC_ERROR_CANCELLED
333 || err == ENTROPIC_ERROR_INTERRUPTED) {
334 const char* msg = entropic_last_error(handle);
335 s.text = msg ? msg : "cancelled";
336 s.status = "cancelled";
337 s.phase = "cancelled";
338 } else if (err != ENTROPIC_OK) {
339 const char* msg = entropic_last_error(handle);
340 s.text = msg ? msg : "unknown error";
341 s.status = "error";
342 s.phase = "failed";
343 } else {
344 s.text = extract_final_text(result_json);
345 entropic_free(result_json);
346 s.status = "done";
347 s.phase = "done";
348 }
349 return s;
350}
351
360 std::lock_guard<std::mutex> lock(bridge->tasks_mutex_);
361 bool any = false;
362 for (auto& [_, task] : bridge->tasks_for_cancel()) {
363 if (task.status == "queued" || task.status == "running") {
364 task.status = "cancelled";
365 task.phase = "cancelling";
366 any = true;
367 }
368 }
369 return any;
370}
371
379 std::lock_guard<std::mutex> lock(bridge->tasks_mutex_);
380 for (auto& [_, task] : bridge->tasks_for_cancel()) {
381 if (task.phase == "cancelling") { return true; }
382 }
383 return false;
384}
385
402 entropic_handle_t handle, ExternalBridge* bridge) {
403 if (bridge == nullptr) { return; }
404 entropic_interrupt(handle); // idempotent, cheap
405 if (!mark_tasks_cancelling(bridge)) { return; }
406 for (int i = 0; i < 20 && any_cancelling_left(bridge); ++i) {
407 std::this_thread::sleep_for(std::chrono::milliseconds(50));
408 }
409 bridge->detach_phase_observer(); // bump gen; silences any stale observer
410}
411
425 ExternalBridge* bridge) {
426 cancel_inflight_async_tasks(handle, bridge);
427 auto err = entropic_context_clear(handle);
428 if (err != ENTROPIC_OK) {
429 return tool_text("error: clear failed");
430 }
431 return tool_text("conversation cleared");
432}
433
441static json handle_count(entropic_handle_t handle) {
442 size_t count = 0;
443 entropic_context_count(handle, &count);
444 return tool_text(std::to_string(count));
445}
446
447// ── Async ask status ─────────────────────────────────────
448
460json ExternalBridge::handle_ask_status(const json& args) {
461 auto tid = args.value("task_id", std::string{});
462 std::lock_guard<std::mutex> lock(tasks_mutex_);
463 auto it = tasks_.find(tid);
464 if (it == tasks_.end()) {
465 return tool_text("error: unknown task_id");
466 }
467 json status = {{"status", it->second.status},
468 {"phase", it->second.phase}};
469 if (!it->second.result.empty()) {
470 auto key = (it->second.status == "error") ? "error" : "result";
471 status[key] = it->second.result;
472 }
473 return tool_text(status.dump());
474}
475
476// ── UUID generation ──────────────────────────────────────
477
/**
 * @brief Produce an identifier for a new async task.
 *
 * The id is "task-" followed by lowercase hex digits. The digits come
 * from the steady-clock tick count XOR-ed with a per-process call
 * counter multiplied by 2654435761.
 *
 * @return The task id string.
 */
static std::string generate_task_id() {
    static std::atomic<uint64_t> sequence{0};

    const auto seq = sequence.fetch_add(1);
    const auto ticks =
        std::chrono::steady_clock::now().time_since_epoch().count();
    const auto mixed = ticks ^ (seq * 2654435761ULL);

    std::ostringstream digits;
    digits << std::hex << mixed;

    std::string id = "task-";
    id += digits.str();
    return id;
}
492
493// ── Dispatch ─────────────────────────────────────────────
494
507 ExternalBridge* bridge,
508 const json& args,
509 int client_fd,
510 const std::string& call_id) {
511 if (args.value("async", false)) {
512 auto task_id = generate_task_id();
513 bridge->run_async_ask(
514 args.value("prompt", ""), task_id, client_fd);
515 return tool_text("async task started: " + task_id);
516 }
517 return handle_ask(handle, args, client_fd, call_id);
518}
519
537 ExternalBridge* bridge,
538 const json& params,
539 int client_fd,
540 const std::string& call_id) {
541 std::string name = params.value("name", std::string{});
542 json args = params.value("arguments", json::object());
543 if (name == "entropic.ask") {
544 return dispatch_ask(handle, bridge, args, client_fd, call_id);
545 }
546 json result;
547 if (name == "entropic.ask_status") { result = bridge->handle_ask_status(args); }
548 else if (name == "entropic.status") { result = handle_status(handle); }
549 else if (name == "entropic.context_clear") { result = handle_clear(handle, bridge); }
550 else if (name == "entropic.context_count") { result = handle_count(handle); }
551 else { result = tool_text("error: unknown tool '" + name + "'"); }
552 return result;
553}
554
555// ── ExternalBridge ───────────────────────────────────────
556
565 entropic_handle_t handle,
566 const ExternalMCPConfig& config,
567 const std::filesystem::path& project_dir)
568 : handle_(handle), config_(config) {
569 socket_path_ = config.socket_path.has_value()
570 ? config.socket_path.value()
571 : compute_socket_path(project_dir);
572}
573
581
593static void prepare_socket_dir(const std::filesystem::path& parent) {
594 std::error_code ec;
595 std::filesystem::create_directories(parent, ec);
596 ::chmod(parent.c_str(), S_IRWXU); // 0700
597}
598
612static bool socket_path_safe(const std::filesystem::path& path) {
613 struct stat st{};
614 if (::lstat(path.c_str(), &st) != 0) {
615 return errno == ENOENT; // absent is fine
616 }
617 bool is_symlink = S_ISLNK(st.st_mode);
618 bool is_socket = S_ISSOCK(st.st_mode);
619 if (!is_socket || is_symlink) {
620 logger->error(
621 "Refusing to bind: {} is a {} (expected unix socket)",
622 path.string(),
623 is_symlink ? "symlink" : "non-socket file");
624 }
625 return is_socket && !is_symlink;
626}
627
/**
 * @brief Bind a unix-domain socket to a path and start listening.
 *
 * After a successful bind the socket file is chmod-ed to 0600 (owner
 * only); that step is best-effort and its result is not checked.
 *
 * @param fd   Unbound AF_UNIX stream socket.
 * @param path Filesystem path to bind to. It must fit in
 *             sockaddr_un::sun_path including the terminating NUL.
 * @return true when bind and listen both succeed. On failure errno
 *         describes the cause; an over-long path sets ENAMETOOLONG so
 *         callers that log strerror(errno) report the real reason
 *         instead of a stale value.
 */
static bool bind_and_listen(int fd, const std::filesystem::path& path) {
    const std::string s = path.string();
    if (s.size() >= sizeof(sockaddr_un::sun_path)) {
        errno = ENAMETOOLONG;
        return false;
    }

    struct sockaddr_un addr{};
    addr.sun_family = AF_UNIX;
    std::strncpy(addr.sun_path, s.c_str(),
                 sizeof(addr.sun_path) - 1);
    if (::bind(fd, reinterpret_cast<sockaddr*>(&addr),
               sizeof(addr)) != 0) {
        return false;
    }

    ::chmod(s.c_str(), S_IRUSR | S_IWUSR);  // 0600: owner only
    return ::listen(fd, 1) == 0;
}
649
665static int create_listen_socket(const std::filesystem::path& path) {
666 prepare_socket_dir(path.parent_path());
667 if (!socket_path_safe(path)) { return -1; }
668 std::filesystem::remove(path);
669
670 int fd = socket(AF_UNIX, SOCK_STREAM, 0);
671 bool ok = (fd >= 0) && bind_and_listen(fd, path);
672 if (!ok) {
673 logger->error("Socket setup failed for {}: {}",
674 path.string(), std::strerror(errno));
675 if (fd >= 0) { ::close(fd); }
676 return -1;
677 }
678 logger->info(
679 "External MCP bridge ready: project_dir(canonical)={} socket={}",
680 path.parent_path().parent_path().string(), path.string());
681 return fd;
682}
683
698static bool peer_uid_matches(int client_fd) {
699 struct ucred cred{};
700 socklen_t len = sizeof(cred);
701 if (getsockopt(client_fd, SOL_SOCKET, SO_PEERCRED,
702 &cred, &len) != 0) {
703 logger->warn("SO_PEERCRED failed on fd={}: {}",
704 client_fd, std::strerror(errno));
705 return false;
706 }
707 if (cred.uid != ::geteuid()) {
708 logger->warn(
709 "Rejecting MCP client on fd={}: peer uid {} != engine uid {}",
710 client_fd, cred.uid, ::geteuid());
711 return false;
712 }
713 return true;
714}
715
723 // Refuse to start if another handle in this process already owns
724 // this socket path. The pre-fix create_listen_socket call unlinks
725 // any existing socket file before binding — that's correct for
726 // crashed-prior-process recovery, but if a live in-process bridge
727 // owned that file, the unlink stole its binding silently. The
728 // claim must happen *before* create_listen_socket so a concurrent
729 // start() on a colliding path cannot pass the check and then race
730 // into the unlink.
731 std::error_code ec;
732 auto canonical =
733 std::filesystem::weakly_canonical(socket_path_, ec).string();
734 if (ec) { canonical = socket_path_.string(); }
735 {
736 std::lock_guard lk(s_bound_sockets_mu);
737 if (!s_bound_sockets.insert(canonical).second) {
738 logger->warn(
739 "External MCP bridge: socket {} already bound by another "
740 "handle in this process; declining to start. Set "
741 "external.socket_path to a distinct path per handle "
742 "if you need per-handle bridges.",
743 canonical);
744 return false;
745 }
746 }
747
748 listen_fd_ = create_listen_socket(socket_path_);
749 if (listen_fd_ < 0) {
750 std::lock_guard lk(s_bound_sockets_mu);
751 s_bound_sockets.erase(canonical);
752 return false;
753 }
754 bound_canonical_ = canonical;
755
756 running_.store(true);
757 // gh#59 (v2.3.1): the accept thread (and the per-client serve
758 // threads it spawns) need to log on this handle's behalf — install
759 // a HandleLogScope at the top of each thread body so spdlog lines
760 // route through HandleAwareSink to this handle's session.log.
761 int log_id = handle_ ? handle_->log_id : 0;
762 accept_thread_ = std::thread([this, log_id]() {
763 entropic::log::HandleLogScope scope(log_id);
764 accept_loop();
765 });
766
767 logger->info("External MCP bridge listening on {}",
768 socket_path_.string());
769 return true;
770}
771
793 running_.store(false);
794 if (listen_fd_ >= 0) {
795 ::close(listen_fd_);
796 listen_fd_ = -1;
797 }
798 if (accept_thread_.joinable()) {
799 accept_thread_.join();
800 }
801
802 // Wake every connected client's blocking read so the per-client
803 // thread can observe the disconnect and exit. shutdown(SHUT_RDWR)
804 // on a connected socket returns EOF to the peer; the read in
805 // serve_client returns 0 and read_line returns "" → exits.
806 std::vector<std::unique_ptr<ClientThread>> drained;
807 {
808 std::lock_guard<std::mutex> lock(client_threads_mutex_);
809 drained = std::move(client_threads_);
810 client_threads_.clear();
811 }
812 for (auto& ct : drained) {
813 if (ct->fd >= 0) { ::shutdown(ct->fd, SHUT_RDWR); }
814 }
815 for (auto& ct : drained) {
816 if (ct->thread.joinable()) { ct->thread.join(); }
817 }
818
819 // Clean up socket file
820 std::error_code ec;
821 std::filesystem::remove(socket_path_, ec);
822
823 // Release our claim on the canonical path so a later handle in
824 // this process can re-bind. Use the exact string we inserted in
825 // start(); recomputing weakly_canonical here could differ if the
826 // path was deleted mid-run.
827 if (!bound_canonical_.empty()) {
828 std::lock_guard lk(s_bound_sockets_mu);
829 s_bound_sockets.erase(bound_canonical_);
830 bound_canonical_.clear();
831 }
832}
833
846void ExternalBridge::reap_finished_clients_locked() {
847 auto it = client_threads_.begin();
848 while (it != client_threads_.end()) {
849 if ((*it)->finished.load()) {
850 if ((*it)->thread.joinable()) { (*it)->thread.join(); }
851 it = client_threads_.erase(it);
852 } else {
853 ++it;
854 }
855 }
856}
857
883void ExternalBridge::accept_loop() {
884 while (running_.load()) {
885 struct pollfd pfd{};
886 pfd.fd = listen_fd_;
887 pfd.events = POLLIN;
888
889 int rc = poll(&pfd, 1, 500); // 500ms timeout for shutdown check
890 if (rc <= 0) { continue; }
891
892 int client_fd = accept(listen_fd_, nullptr, nullptr);
893 if (client_fd < 0) { continue; }
894
895 if (!peer_uid_matches(client_fd)) {
896 ::close(client_fd); // v2.1.7 (gh#34): cross-uid attempt
897 continue;
898 }
899
900 logger->info("External MCP client connected (fd={})", client_fd);
901
902 auto ct = std::make_unique<ClientThread>();
903 ct->fd = client_fd;
904 // Capture a raw pointer for the thread body so the unique_ptr
905 // can stay in client_threads_ without aliasing.
906 ClientThread* raw = ct.get();
907 // gh#59 (v2.3.1): propagate the handle log scope into the
908 // per-client thread so its `serve_client` logs route to the
909 // owning handle's session.log.
910 int log_id = handle_ ? handle_->log_id : 0;
911 ct->thread = std::thread([this, raw, log_id]() {
912 entropic::log::HandleLogScope scope(log_id);
913 serve_client(raw->fd);
914 ::close(raw->fd);
915 raw->fd = -1;
916 logger->info("External MCP client disconnected");
917 raw->finished.store(true);
918 });
919
920 std::lock_guard<std::mutex> lock(client_threads_mutex_);
921 client_threads_.push_back(std::move(ct));
922 reap_finished_clients_locked();
923 }
924}
925
/**
 * @brief Read one newline-terminated, non-empty line from a descriptor.
 *
 * Bytes are read one at a time up to the next '\n', which is not
 * included in the result.
 *
 * An empty return value is reserved for "connection finished" (EOF or a
 * read error), which is how the caller interprets it. Blank lines are
 * therefore skipped rather than returned, so a stray "\n" from a client
 * no longer looks like a disconnect. A read interrupted by a signal
 * (EINTR) is retried instead of being treated as an error.
 *
 * @param fd Descriptor to read from.
 * @return The next non-empty line without its newline, or an empty
 *         string on EOF or error. A partial line cut off by EOF is
 *         discarded.
 */
static std::string read_line(int fd) {
    std::string line;
    char c = '\0';
    while (true) {
        const ssize_t n = ::read(fd, &c, 1);
        if (n < 0 && errno == EINTR) { continue; }  // interrupted, retry
        if (n <= 0) { return {}; }                  // EOF or hard error
        if (c != '\n') {
            line += c;
            continue;
        }
        if (!line.empty()) { return line; }
        // Blank line: keep reading so "" keeps meaning EOF.
    }
}
943
954void ExternalBridge::serve_client(int client_fd) {
955 subscribe(client_fd);
956 struct Unsub {
957 ExternalBridge* self;
958 int fd;
959 ~Unsub() { self->unsubscribe(fd); }
960 } guard{this, client_fd};
961
962 while (running_.load()) {
963 auto line = read_line(client_fd);
964 if (line.empty()) { break; }
965
966 auto response = dispatch(line, client_fd);
967 if (response.empty()) { continue; } // notification — no reply
968 response += '\n';
969
970 ssize_t written = write(client_fd, response.c_str(),
971 response.size());
972 if (written < 0) { break; }
973 }
974}
975
982static json initialize_result() {
983 return {
984 {"protocolVersion", "2025-06-18"},
985 {"serverInfo", {{"name", "entropic"},
986 {"version", entropic_version()}}},
987 {"capabilities", {{"tools", json::object()}}}
988 };
989}
990
1004std::string ExternalBridge::dispatch(
1005 const std::string& request, int client_fd) {
1006 auto req = json::parse(request, nullptr, false);
1007 // Parse error or notification (no id) → no dispatch
1008 if (req.is_discarded() || !req.contains("id")) {
1009 return req.is_discarded()
1010 ? rpc_err(nullptr, -32700, "Parse error")
1011 : std::string{};
1012 }
1013
1014 json id = req["id"];
1015 std::string method = req.value("method", std::string{});
1016 json params = req.value("params", json::object());
1017 json result;
1018
1019 if (method == "initialize") { result = initialize_result(); }
1020 else if (method == "tools/list") { result = {{"tools", tool_definitions()}}; }
1021 else if (method == "tools/call") {
1022 auto id_str = id.is_string() ? id.get<std::string>()
1023 : id.dump();
1024 result = dispatch_tool(handle_, this, params, client_fd, id_str);
1025 }
1026 else if (method == "shutdown" || method == "exit") { result = json::object(); }
1027 else { return rpc_err(id, -32601, "Unknown method: " + method); }
1028 return rpc_ok(id, result);
1029}
1030
1031// ── Async task support ───────────────────────────────────
1032
1038static void phase_observer_cb(int state, void* ud) {
1039 auto* self = static_cast<ExternalBridge*>(ud);
1040 if (state != ENTROPIC_AGENT_STATE_VERIFYING) { return; }
1041 std::lock_guard<std::mutex> lock(self->tasks_mutex_);
1042 // E5+E6 (2.1.0): discard stale callbacks fired after detach_phase_observer
1043 // incremented observer_gen_. attached_gen_ was captured at attach time.
1044 if (self->observer_call_is_stale()) { return; }
1045 auto it = self->tasks_for_cancel().find(self->active_task_id_for_observer());
1046 if (it == self->tasks_for_cancel().end()) { return; }
1047 // First VERIFYING = "validating"; subsequent VERIFYING transitions
1048 // on the same task indicate revision retries → "revising".
1049 it->second.phase = (it->second.phase == "validating"
1050 || it->second.phase == "revising")
1051 ? "revising" : "validating";
1052}
1053
1065void ExternalBridge::attach_phase_observer(const std::string& task_id) {
1066 {
1067 std::lock_guard<std::mutex> lock(tasks_mutex_);
1068 active_task_id_ = task_id;
1069 attached_gen_ = ++observer_gen_;
1070 }
1072}
1073
1087 {
1088 std::lock_guard<std::mutex> lock(tasks_mutex_);
1089 ++observer_gen_;
1090 active_task_id_.clear();
1091 }
1092 entropic_set_state_observer(handle_, nullptr, nullptr);
1093}
1094
1109 const std::string& prompt,
1110 const std::string& task_id,
1111 int client_fd) {
1112 {
1113 std::lock_guard<std::mutex> lock(tasks_mutex_);
1114 AsyncTask t;
1115 t.status = "queued";
1116 t.phase = "queued";
1117 t.created = std::chrono::steady_clock::now();
1118 tasks_[task_id] = std::move(t);
1119 }
1120
1121 // gh#59 (v2.3.1): the async-ask worker runs entropic_run on this
1122 // handle's behalf — scope its logs to the owning handle.
1123 int log_id = handle_ ? handle_->log_id : 0;
1124 std::thread([this, prompt, task_id, client_fd, log_id]() {
1125 entropic::log::HandleLogScope scope(log_id);
1126 update_task_phase(task_id, "running", "running");
1127 attach_phase_observer(task_id);
1128
1129 char* result_json = nullptr;
1130 auto err = entropic_run(handle_, prompt.c_str(), &result_json);
1131
1133
1134 auto final_state = derive_async_final_state(
1135 handle_, err, result_json);
1136
1137 {
1138 std::lock_guard<std::mutex> lock(tasks_mutex_);
1139 auto it = tasks_.find(task_id);
1140 if (it != tasks_.end()) {
1141 it->second.status = final_state.status;
1142 it->second.phase = final_state.phase;
1143 it->second.result = final_state.text;
1144 }
1145 // Issue #12 (v2.1.4): write sentinel UNDER tasks_mutex_,
1146 // before the MCP notification fires. Any external monitor
1147 // reacting to the sentinel can immediately call
1148 // entropic.ask_status and see the same terminal state.
1149 write_sentinel(task_id, final_state.status);
1150 }
1151 auto status = final_state.status;
1152
1153 // Issue #4 (v2.1.2, parts A+B): emit the spec-defined
1154 // ``notifications/progress`` method instead of the previous
1155 // non-spec ``notifications/ask_complete``. MCP-compliant
1156 // clients are obligated to drain ``notifications/progress``
1157 // (it's in the documented set); some clients silently
1158 // buffered or stalled on the unknown method name, which
1159 // combined with the bridge's blocking broadcast (fixed in
1160 // part C of this release) produced the deadlock observed
1161 // against entropic-explorer ↔ Claude Code in the field.
1162 //
1163 // ``progressToken`` is the ``task_id`` so the consumer can
1164 // correlate the notification back to the originating
1165 // ``entropic.ask`` response (which carried the same
1166 // ``task_id``). The result body is NO LONGER shipped inline
1167 // — consumers fetch via ``entropic.ask_status``. This caps
1168 // notification size at ~200 bytes regardless of generated
1169 // output, eliminating a real DoS surface (a 50KB result
1170 // would otherwise flood the broadcast write path on every
1171 // subscriber). ``status`` rides in ``message`` so consumers
1172 // can branch on done / error / cancelled without an extra
1173 // round-trip just to learn which result kind to fetch.
1174 json notif = {
1175 {"jsonrpc", "2.0"},
1176 {"method", "notifications/progress"},
1177 {"params", {
1178 {"progressToken", task_id},
1179 {"progress", 100},
1180 {"total", 100},
1181 {"message", status}
1182 }}
1183 };
1185
1187 logger->info("Async task {} completed: {}", task_id, status);
1188 }).detach();
1189}
1190
1203 std::lock_guard<std::mutex> lock(subscribers_mutex_);
1204 subscribers_.insert(fd);
1205}
1206
1214 std::lock_guard<std::mutex> lock(subscribers_mutex_);
1215 subscribers_.erase(fd);
1216}
1217
1252 auto payload = notif.dump() + "\n";
1253
1254 std::vector<int> snapshot;
1255 {
1256 std::lock_guard<std::mutex> lock(subscribers_mutex_);
1257 snapshot.assign(subscribers_.begin(), subscribers_.end());
1258 }
1259
1260 std::vector<int> dead;
1261 for (int fd : snapshot) {
1262 ssize_t rc = ::send(fd, payload.c_str(), payload.size(),
1263 MSG_DONTWAIT | MSG_NOSIGNAL);
1264 if (rc < 0) {
1265 // EAGAIN / EWOULDBLOCK: peer recv buffer full (slow consumer).
1266 // EBADF / EPIPE / ECONNRESET: peer closed or otherwise dead.
1267 // All collapse to "drop" — the long-term per-subscriber
1268 // queue lives in proposal P2-20260429-001.
1269 logger->warn("Subscriber fd {} send failed (errno={}) — dropping",
1270 fd, errno);
1271 dead.push_back(fd);
1272 } else if (static_cast<size_t>(rc) < payload.size()) {
1273 logger->warn("Subscriber fd {} partial send ({}/{}) — dropping",
1274 fd, rc, payload.size());
1275 dead.push_back(fd);
1276 }
1277 }
1278
1279 if (!dead.empty()) {
1280 std::lock_guard<std::mutex> lock(subscribers_mutex_);
1281 for (int fd : dead) { subscribers_.erase(fd); }
1282 }
1283}
1284
1293void ExternalBridge::update_task_phase(const std::string& task_id,
1294 const std::string& status,
1295 const std::string& phase) {
1296 std::lock_guard<std::mutex> lock(tasks_mutex_);
1297 auto it = tasks_.find(task_id);
1298 if (it == tasks_.end()) { return; }
1299 it->second.status = status;
1300 it->second.phase = phase;
1301}
1302
1316 auto cutoff = std::chrono::steady_clock::now()
1317 - std::chrono::minutes(15);
1318 auto sentinel_dir = async_sentinel_dir();
1319 std::lock_guard<std::mutex> lock(tasks_mutex_);
1320 for (auto it = tasks_.begin(); it != tasks_.end(); ) {
1321 if (it->second.created < cutoff) {
1322 if (!sentinel_dir.empty()) {
1323 for (const char* suffix :
1324 {".done", ".failed", ".cancelled"}) {
1325 std::error_code ec;
1326 std::filesystem::remove(
1327 sentinel_dir / (it->first + suffix), ec);
1328 }
1329 }
1330 it = tasks_.erase(it);
1331 } else {
1332 ++it;
1333 }
1334 }
1335}
1336
1347std::filesystem::path ExternalBridge::async_sentinel_dir() const {
1348 std::filesystem::path root = async_sentinel_root_override_;
1349 if (root.empty() && handle_ != nullptr) {
1350 root = handle_->config.log_dir;
1351 }
1352 return root.empty() ? std::filesystem::path{} : (root / "async");
1353}
1354
1361 const std::filesystem::path& root) {
1362 async_sentinel_root_override_ = root;
1363}
1364
1371 const std::string& status) {
1372 const char* suffix = ".done";
1373 if (status == "error") {
1374 suffix = ".failed";
1375 } else if (status == "cancelled") {
1376 suffix = ".cancelled";
1377 }
1378 return suffix;
1379}
1380
1396void ExternalBridge::write_sentinel(const std::string& task_id,
1397 const std::string& status) {
1398 auto dir = async_sentinel_dir();
1399 if (dir.empty()) { return; }
1400 std::error_code ec;
1401 std::filesystem::create_directories(dir, ec);
1402 if (ec) {
1403 logger->warn("write_sentinel: mkdir {} failed: {}",
1404 dir.string(), ec.message());
1405 return;
1406 }
1407 auto path = dir / (task_id + sentinel_suffix_for_status(status));
1408 std::ofstream out(path);
1409 if (!out.is_open()) {
1410 logger->warn("write_sentinel: open {} failed", path.string());
1411 return;
1412 }
1413 out << status << '\n';
1414}
1415
1416} // namespace entropic
Unix socket MCP bridge for external client access.
bool start()
Start the background accept loop.
void cleanup_expired_tasks()
Remove tasks older than TTL from the registry.
void stop()
Stop the accept loop and close the socket.
void unsubscribe(int fd)
Remove an fd from the subscriber set.
std::mutex tasks_mutex_
Async task mutex (public for dispatch_tool access).
~ExternalBridge()
Destructor — stop if running.
void detach_phase_observer()
Clear the phase observer installed by attach_phase_observer.
nlohmann::json handle_ask_status(const nlohmann::json &args)
Handle entropic.ask_status — check async task state.
std::filesystem::path async_sentinel_dir() const
Sentinel directory (lazy: returns empty path until the engine's log_dir is configured).
ExternalBridge(entropic_handle_t handle, const ExternalMCPConfig &config, const std::filesystem::path &project_dir)
Construct with engine handle and config.
void run_async_ask(const std::string &prompt, const std::string &task_id, int client_fd)
Run an async entropic.ask in a detached background thread.
void broadcast_notification(const nlohmann::json &notif)
Write a JSON-RPC notification to every subscribed fd.
void set_async_sentinel_root(const std::filesystem::path &root)
Override the async sentinel root directory.
void attach_phase_observer(const std::string &task_id)
Install the phase observer for the given async task (cleared again by detach_phase_observer).
void write_sentinel(const std::string &task_id, const std::string &status)
Write the sentinel file for an async task completion.
void subscribe(int fd)
Add a connected fd to the subscriber set.
void update_task_phase(const std::string &task_id, const std::string &status, const std::string &phase)
Update status/phase for a tracked task atomically.
std::unordered_map< std::string, AsyncTask > & tasks_for_cancel()
Mutable accessor to the task registry.
gh#59 (v2.3.1): RAII guard — sets thread's current handle_id.
Definition logging.h:154
Private definition of the entropic_engine struct.
Public C API for the Entropic inference engine.
ENTROPIC_EXPORT entropic_error_t entropic_set_state_observer(entropic_handle_t handle, void(*observer)(int state, void *user_data), void *user_data)
Register an engine state-change observer.
ENTROPIC_EXPORT entropic_error_t entropic_context_count(entropic_handle_t handle, size_t *count)
Get the number of messages in the conversation.
ENTROPIC_EXPORT entropic_error_t entropic_context_clear(entropic_handle_t handle)
Clear conversation history, starting a new session.
ENTROPIC_EXPORT entropic_error_t entropic_run(entropic_handle_t handle, const char *input, char **result_json)
Synchronous agentic loop.
ENTROPIC_EXPORT entropic_error_t entropic_metrics_json(entropic_handle_t handle, char **out)
Get loop metrics from the most recent run as JSON.
ENTROPIC_EXPORT const char * entropic_version(void)
Get the library version string.
ENTROPIC_EXPORT entropic_error_t entropic_interrupt(entropic_handle_t handle)
Interrupt a running generation.
ENTROPIC_EXPORT void entropic_free(void *ptr)
Free memory allocated by the engine or entropic_alloc().
ENTROPIC_EXPORT entropic_error_t entropic_context_get(entropic_handle_t handle, char **messages_json)
Get the current conversation history as a JSON array.
ENTROPIC_EXPORT entropic_error_t entropic_run_streaming(entropic_handle_t handle, const char *input, void(*on_token)(const char *token, size_t len, void *user_data), void *user_data, int *cancel_flag)
Streaming agentic loop with token callback.
@ ENTROPIC_AGENT_STATE_VERIFYING
Post-generation verification.
Definition enums.h:41
entropic_error_t
Error codes returned by all C API functions.
Definition error.h:35
@ ENTROPIC_OK
Success.
Definition error.h:36
@ ENTROPIC_ERROR_CANCELLED
Operation cancelled via cancel token.
Definition error.h:48
@ ENTROPIC_ERROR_INTERRUPTED
Operation interrupted via entropic_interrupt() (v1.8.9).
Definition error.h:62
ENTROPIC_EXPORT const char * entropic_last_error(entropic_handle_t handle)
Get the last error message for a handle.
Definition entropic.cpp:90
Unix socket MCP bridge — exposes a running engine to external clients.
spdlog initialization and logger access.
ENTROPIC_EXPORT std::shared_ptr< spdlog::logger > get(const std::string &name)
Get or create a named logger.
Definition logging.cpp:211
Activate model on GPU (WARM → ACTIVE).
static std::string generate_task_id()
Generate a simple UUID-like task ID.
static bool any_cancelling_left(ExternalBridge *bridge)
True while any task is still in the "cancelling" phase.
static json handle_ask(entropic_handle_t handle, const json &args, int client_fd, const std::string &call_id)
Handle entropic.ask — stream tokens then return final text.
std::filesystem::path compute_socket_path(const std::filesystem::path &project_dir)
Compute project-unique Unix socket path for self-detection.
static const char * sentinel_suffix_for_status(const std::string &status)
Map a terminal status string to a sentinel filename suffix.
static int create_listen_socket(const std::filesystem::path &path)
Create, bind, and listen on a unix domain socket.
@ ok
Tool dispatched, returned non-empty content.
static void phase_observer_cb(int state, void *ud)
State observer that projects VERIFYING onto task phase.
static bool socket_path_safe(const std::filesystem::path &path)
Reject a pre-existing path that is a symlink or non-socket.
static json handle_count(entropic_handle_t handle)
Handle entropic.context_count.
static bool peer_uid_matches(int client_fd)
Validate that the connecting peer shares the engine's UID.
static json tool_text(const std::string &text)
Wrap text in MCP tool result shape.
static bool mark_tasks_cancelling(ExternalBridge *bridge)
Mark every queued/running task as cancelling.
static void send_progress(int fd, const std::string &token_text, const std::string &progress_token)
Send an MCP progress notification with a text token.
static bool bind_and_listen(int fd, const std::filesystem::path &path)
Bind and listen on an AF_UNIX socket, applying 0600 permissions.
static json handle_clear(entropic_handle_t handle, ExternalBridge *bridge)
entropic.context_clear MCP tool handler.
static void cancel_inflight_async_tasks(entropic_handle_t handle, ExternalBridge *bridge)
Cancel any async tasks currently running on the bridge.
static AsyncFinalState derive_async_final_state(entropic_handle_t handle, entropic_error_t err, char *result_json)
Translate entropic_run's return code into a final task state.
static json tool_definitions()
MCP tool definitions exposed by the bridge.
static void write_json_line(int fd, const json &msg)
Write a JSON-RPC line to a socket fd.
static std::string extract_final_text(const char *result_json)
Extract the last assistant message from a run result JSON.
static std::string rpc_err(const json &id, int code, const std::string &msg)
Build a JSON-RPC error response.
static json initialize_result()
Build the MCP initialize response payload.
static json dispatch_tool(entropic_handle_t handle, ExternalBridge *bridge, const json &params, int client_fd, const std::string &call_id)
Dispatch a tools/call to the appropriate handler.
static std::string read_line(int fd)
Read one newline-delimited line from a socket fd.
static json handle_status(entropic_handle_t handle)
Handle entropic.status.
static json dispatch_ask(entropic_handle_t handle, ExternalBridge *bridge, const json &args, int client_fd, const std::string &call_id)
Route entropic.ask — sync (streaming) or async.
static std::string rpc_ok(const json &id, const json &result)
Build a JSON-RPC success response.
static void prepare_socket_dir(const std::filesystem::path &parent)
Prepare the socket's containing directory with 0700 permissions.
Handle entropic.context_clear.
std::string text
Result or error message.
std::string status
done | error | cancelled
std::string phase
done | failed | cancelled
Async task state for background entropic.ask runs.
std::string phase
queued, running, running:<tier>, done, failed, cancelled (P1-5)
std::chrono::steady_clock::time_point created
For TTL cleanup.
std::string status
queued | running | done | error | cancelled (2.0.6-rc16)
External MCP server configuration (Entropic-as-server).
Definition config.h:423
std::optional< std::filesystem::path > socket_path
Socket path (nullopt = derived)
Definition config.h:425
std::filesystem::path log_dir
Session log directory (session.log + session_model.log).
Definition config.h:742
Engine handle struct — owns all subsystems.
int log_id
gh#59 (v2.3.1): unique handle id for per-handle log routing via entropic::log::HandleAwareSink.
entropic::ParsedConfig config
Parsed config.