18#include <nlohmann/json.hpp>
30#include <unordered_map>
31#include <unordered_set>
33#include <sys/socket.h>
38using json = nlohmann::json;
// Process-wide registry of canonical socket paths that already have a bridge
// bound in this process. Bridge start-up inserts its path and declines to
// start when the path is already present; the entry is erased again when the
// listen socket could not be created and during teardown.
// Every visible access to the set holds s_bound_sockets_mu.
50static std::mutex s_bound_sockets_mu;
51static std::unordered_set<std::string> s_bound_sockets;
55 const std::filesystem::path& project_dir);
67static std::string
rpc_ok(
const json&
id,
const json& result) {
68 json r = {{
"jsonrpc",
"2.0"}, {
"id",
id}, {
"result", result}};
81static std::string
rpc_err(
const json&
id,
int code,
82 const std::string& msg) {
83 json r = {{
"jsonrpc",
"2.0"}, {
"id",
id},
84 {
"error", {{
"code", code}, {
"message", msg}}}};
96 return {{
"content", json::array({{{
"type",
"text"}, {
"text", text}}})}};
109 {{
"name",
"entropic.ask"},
111 "Submit a prompt to the running entropic engine. "
112 "Set async=true to return immediately with a task_id; "
113 "the engine pushes a notification when done."},
117 {
"prompt", {{
"type",
"string"},
118 {
"description",
"User message"}}},
119 {
"async", {{
"type",
"boolean"},
120 {
"description",
"Run asynchronously"},
123 {
"required", json::array({
"prompt"})}
125 {{
"name",
"entropic.ask_status"},
127 "Check status of an async entropic.ask task."},
130 {
"properties", {{
"task_id", {
132 {
"description",
"Task ID from async ask"}
134 {
"required", json::array({
"task_id"})}
136 {{
"name",
"entropic.status"},
137 {
"description",
"Engine version and message count."},
138 {
"inputSchema", {{
"type",
"object"},
139 {
"properties", json::object()}}}},
140 {{
"name",
"entropic.context_clear"},
141 {
"description",
"Clear conversation history."},
142 {
"inputSchema", {{
"type",
"object"},
143 {
"properties", json::object()}}}},
144 {{
"name",
"entropic.context_count"},
145 {
"description",
"Return the message count."},
146 {
"inputSchema", {{
"type",
"object"},
147 {
"properties", json::object()}}}},
166 auto arr = json::parse(result_json,
nullptr,
false);
167 if (!arr.is_array()) {
return {}; }
168 for (
auto it = arr.rbegin(); it != arr.rend(); ++it) {
169 if (it->value(
"role",
"") ==
"assistant") {
170 return it->value(
"content",
"");
184 auto s = msg.dump() +
"\n";
185 ::write(fd, s.c_str(), s.size());
197 const std::string& progress_token) {
200 {
"method",
"notifications/progress"},
202 {
"progressToken", progress_token},
203 {
"progress", token_text}
225 int client_fd,
const std::string& call_id) {
226 auto it = args.find(
"prompt");
227 if (it == args.end() || !it->is_string()) {
228 return tool_text(
"error: missing 'prompt' argument");
230 std::string prompt = it->get<std::string>();
233 struct StreamCtx {
int fd; std::string token_id; };
234 StreamCtx sctx{client_fd, call_id};
235 auto on_token = [](
const char* tok,
size_t len,
void* ud) {
236 auto* ctx =
static_cast<StreamCtx*
>(ud);
237 send_progress(ctx->fd, std::string(tok, len), ctx->token_id);
240 handle, prompt.c_str(), on_token, &sctx,
nullptr);
243 return tool_text(std::string(
"error: ") + (msg ? msg :
"unknown"));
247 char* msgs_json =
nullptr;
249 auto text = (msgs_json !=
nullptr)
252 return tool_text(text.empty() ?
"(no response)" : text);
265 std::ostringstream os;
267 <<
"\nmessages: " << count;
269 char* mjson =
nullptr;
271 && mjson !=
nullptr) {
272 os <<
"\nmetrics: " << mjson;
335 s.
text = msg ? msg :
"cancelled";
337 s.
phase =
"cancelled";
340 s.
text = msg ? msg :
"unknown error";
363 if (task.status ==
"queued" || task.status ==
"running") {
364 task.status =
"cancelled";
365 task.phase =
"cancelling";
381 if (task.phase ==
"cancelling") {
return true; }
403 if (bridge ==
nullptr) {
return; }
407 std::this_thread::sleep_for(std::chrono::milliseconds(50));
431 return tool_text(
"conversation cleared");
461 auto tid = args.value(
"task_id", std::string{});
463 auto it = tasks_.find(tid);
464 if (it == tasks_.end()) {
465 return tool_text(
"error: unknown task_id");
467 json status = {{
"status", it->second.status},
468 {
"phase", it->second.phase}};
469 if (!it->second.result.empty()) {
470 auto key = (it->second.status ==
"error") ?
"error" :
"result";
471 status[key] = it->second.result;
// Task IDs have the form "task-<hex>". The hex part is the steady_clock tick
// count XORed with a process-wide atomic counter multiplied by 2654435761.
// The multiplier is odd, so distinct counter values give distinct products
// modulo 2^64 and two IDs generated in the same clock tick still differ.
485 static std::atomic<uint64_t> counter{0};
// fetch_add returns the previous value, so the first ID uses n == 0.
486 auto n = counter.fetch_add(1);
487 auto t = std::chrono::steady_clock::now().time_since_epoch().count();
488 std::ostringstream ss;
489 ss << std::hex << (t ^ (n * 2654435761ULL));
490 return "task-" + ss.str();
510 const std::string& call_id) {
511 if (args.value(
"async",
false)) {
514 args.value(
"prompt",
""), task_id, client_fd);
515 return tool_text(
"async task started: " + task_id);
517 return handle_ask(handle, args, client_fd, call_id);
540 const std::string& call_id) {
541 std::string name = params.value(
"name", std::string{});
542 json args = params.value(
"arguments", json::object());
543 if (name ==
"entropic.ask") {
544 return dispatch_ask(handle, bridge, args, client_fd, call_id);
548 else if (name ==
"entropic.status") { result =
handle_status(handle); }
549 else if (name ==
"entropic.context_clear") { result =
handle_clear(handle, bridge); }
550 else if (name ==
"entropic.context_count") { result =
handle_count(handle); }
551 else { result =
tool_text(
"error: unknown tool '" + name +
"'"); }
567 const std::filesystem::path& project_dir)
568 : handle_(handle), config_(config) {
595 std::filesystem::create_directories(parent, ec);
596 ::chmod(parent.c_str(), S_IRWXU);
// lstat() does not follow symlinks, so a symlink at `path` is reported as a
// symlink instead of as whatever it points to.
614 if (::lstat(path.c_str(), &st) != 0) {
// A path that does not exist (ENOENT) is safe to bind; any other lstat
// failure is reported as unsafe.
615 return errno == ENOENT;
617 bool is_symlink = S_ISLNK(st.st_mode);
618 bool is_socket = S_ISSOCK(st.st_mode);
// Anything other than a plain unix socket is refused and logged.
619 if (!is_socket || is_symlink) {
621 "Refusing to bind: {} is a {} (expected unix socket)",
623 is_symlink ?
"symlink" :
"non-socket file");
625 return is_socket && !is_symlink;
637 auto s = path.string();
// Reject paths that would not fit in sun_path together with a terminating NUL.
638 if (s.size() >=
sizeof(sockaddr_un::sun_path)) {
return false; }
// addr is value-initialized (all zero), so copying at most size-1 bytes
// below leaves sun_path NUL-terminated.
639 struct sockaddr_un addr{};
640 addr.sun_family = AF_UNIX;
641 std::strncpy(addr.sun_path, s.c_str(),
642 sizeof(addr.sun_path) - 1);
643 if (bind(fd,
reinterpret_cast<sockaddr*
>(&addr),
sizeof(addr)) != 0) {
// Restrict the socket file to owner read/write (0600). The chmod result is
// not checked here.
646 ::chmod(s.c_str(), S_IRUSR | S_IWUSR);
// Listen with a backlog of 1; success of listen() is the function's result.
647 return listen(fd, 1) == 0;
668 std::filesystem::remove(path);
670 int fd = socket(AF_UNIX, SOCK_STREAM, 0);
673 logger->error(
"Socket setup failed for {}: {}",
674 path.string(), std::strerror(errno));
675 if (fd >= 0) { ::close(fd); }
679 "External MCP bridge ready: project_dir(canonical)={} socket={}",
680 path.parent_path().parent_path().string(), path.string());
700 socklen_t len =
sizeof(cred);
701 if (getsockopt(client_fd, SOL_SOCKET, SO_PEERCRED,
703 logger->warn(
"SO_PEERCRED failed on fd={}: {}",
704 client_fd, std::strerror(errno));
707 if (cred.uid != ::geteuid()) {
709 "Rejecting MCP client on fd={}: peer uid {} != engine uid {}",
710 client_fd, cred.uid, ::geteuid());
733 std::filesystem::weakly_canonical(socket_path_, ec).string();
734 if (ec) { canonical = socket_path_.string(); }
736 std::lock_guard lk(s_bound_sockets_mu);
737 if (!s_bound_sockets.insert(canonical).second) {
739 "External MCP bridge: socket {} already bound by another "
740 "handle in this process; declining to start. Set "
741 "external.socket_path to a distinct path per handle "
742 "if you need per-handle bridges.",
749 if (listen_fd_ < 0) {
750 std::lock_guard lk(s_bound_sockets_mu);
751 s_bound_sockets.erase(canonical);
754 bound_canonical_ = canonical;
756 running_.store(
true);
761 int log_id = handle_ ? handle_->
log_id : 0;
762 accept_thread_ = std::thread([
this, log_id]() {
767 logger->info(
"External MCP bridge listening on {}",
768 socket_path_.string());
793 running_.store(
false);
794 if (listen_fd_ >= 0) {
798 if (accept_thread_.joinable()) {
799 accept_thread_.join();
806 std::vector<std::unique_ptr<ClientThread>> drained;
808 std::lock_guard<std::mutex> lock(client_threads_mutex_);
809 drained = std::move(client_threads_);
810 client_threads_.clear();
812 for (
auto& ct : drained) {
813 if (ct->fd >= 0) { ::shutdown(ct->fd, SHUT_RDWR); }
815 for (
auto& ct : drained) {
816 if (ct->thread.joinable()) { ct->thread.join(); }
821 std::filesystem::remove(socket_path_, ec);
827 if (!bound_canonical_.empty()) {
828 std::lock_guard lk(s_bound_sockets_mu);
829 s_bound_sockets.erase(bound_canonical_);
830 bound_canonical_.clear();
// Join and erase every entry of client_threads_ whose `finished` flag is set.
// The accept loop calls this while holding client_threads_mutex_.
846void ExternalBridge::reap_finished_clients_locked() {
847 auto it = client_threads_.begin();
848 while (it != client_threads_.end()) {
849 if ((*it)->finished.load()) {
// Join before erasing: destroying a still-joinable std::thread would call
// std::terminate.
850 if ((*it)->thread.joinable()) { (*it)->thread.join(); }
// erase() returns the iterator following the removed element.
851 it = client_threads_.erase(it);
// Accept loop: while running_ is set, wait for an incoming connection, accept
// it, and serve it on its own thread, which is tracked in client_threads_.
883void ExternalBridge::accept_loop() {
884 while (running_.load()) {
// 500 ms timeout: a timeout or poll error (rc <= 0) simply goes back to
// re-check running_.
889 int rc = poll(&pfd, 1, 500);
890 if (rc <= 0) {
continue; }
892 int client_fd = accept(listen_fd_,
nullptr,
nullptr);
893 if (client_fd < 0) {
continue; }
900 logger->info(
"External MCP client connected (fd={})", client_fd);
902 auto ct = std::make_unique<ClientThread>();
// The thread lambda captures the raw pointer because ownership of `ct` is
// moved into client_threads_ further down; moving the unique_ptr does not
// change the address of the ClientThread it owns.
906 ClientThread* raw = ct.get();
910 int log_id = handle_ ? handle_->
log_id : 0;
911 ct->thread = std::thread([
this, raw, log_id]() {
913 serve_client(raw->fd);
916 logger->info(
"External MCP client disconnected");
// Tells reap_finished_clients_locked() that this thread can be joined and
// erased.
917 raw->finished.store(
true);
// Register the new client and opportunistically reap finished ones under
// the same lock.
920 std::lock_guard<std::mutex> lock(client_threads_mutex_);
921 client_threads_.push_back(std::move(ct));
922 reap_finished_clients_locked();
// Reads one byte per read() call until a newline is seen.
937 ssize_t n = read(fd, &c, 1);
// EOF (0) or a read error (<0) returns an empty string, discarding any
// bytes of a partial line read so far.
938 if (n <= 0) {
return {}; }
// The newline itself is not part of the returned line.
// NOTE(review): a blank line (a lone '\n') presumably also returns an empty
// string, which the serve loop treats the same as a disconnect — confirm
// this is intended.
939 if (c ==
'\n') {
return line; }
954void ExternalBridge::serve_client(
int client_fd) {
957 ExternalBridge* self;
959 ~Unsub() { self->unsubscribe(fd); }
960 } guard{
this, client_fd};
962 while (running_.load()) {
964 if (line.empty()) {
break; }
966 auto response = dispatch(line, client_fd);
967 if (response.empty()) {
continue; }
970 ssize_t written = write(client_fd, response.c_str(),
972 if (written < 0) {
break; }
984 {
"protocolVersion",
"2025-06-18"},
985 {
"serverInfo", {{
"name",
"entropic"},
987 {
"capabilities", {{
"tools", json::object()}}}
1004std::string ExternalBridge::dispatch(
1005 const std::string& request,
int client_fd) {
1006 auto req = json::parse(request,
nullptr,
false);
1008 if (req.is_discarded() || !req.contains(
"id")) {
1009 return req.is_discarded()
1010 ?
rpc_err(
nullptr, -32700,
"Parse error")
1014 json
id = req[
"id"];
1015 std::string method = req.value(
"method", std::string{});
1016 json params = req.value(
"params", json::object());
1020 else if (method ==
"tools/list") { result = {{
"tools",
tool_definitions()}}; }
1021 else if (method ==
"tools/call") {
1022 auto id_str =
id.is_string() ?
id.get<std::string>()
1024 result =
dispatch_tool(handle_,
this, params, client_fd, id_str);
1026 else if (method ==
"shutdown" || method ==
"exit") { result = json::object(); }
1027 else {
return rpc_err(
id, -32601,
"Unknown method: " + method); }
1028 return rpc_ok(
id, result);
1041 std::lock_guard<std::mutex> lock(self->tasks_mutex_);
1044 if (self->observer_call_is_stale()) {
return; }
1045 auto it = self->tasks_for_cancel().find(self->active_task_id_for_observer());
1046 if (it == self->tasks_for_cancel().end()) {
return; }
1049 it->second.phase = (it->second.phase ==
"validating"
1050 || it->second.phase ==
"revising")
1051 ?
"revising" :
"validating";
1068 active_task_id_ = task_id;
1069 attached_gen_ = ++observer_gen_;
1090 active_task_id_.clear();
1109 const std::string& prompt,
1110 const std::string& task_id,
1117 t.
created = std::chrono::steady_clock::now();
1118 tasks_[task_id] = std::move(t);
1123 int log_id = handle_ ? handle_->
log_id : 0;
1124 std::thread([
this, prompt, task_id, client_fd, log_id]() {
1129 char* result_json =
nullptr;
1130 auto err =
entropic_run(handle_, prompt.c_str(), &result_json);
1135 handle_, err, result_json);
1139 auto it = tasks_.find(task_id);
1140 if (it != tasks_.end()) {
1141 it->second.status = final_state.status;
1142 it->second.phase = final_state.phase;
1143 it->second.result = final_state.text;
1151 auto status = final_state.status;
1176 {
"method",
"notifications/progress"},
1178 {
"progressToken", task_id},
1187 logger->info(
"Async task {} completed: {}", task_id, status);
1203 std::lock_guard<std::mutex> lock(subscribers_mutex_);
1204 subscribers_.insert(fd);
1214 std::lock_guard<std::mutex> lock(subscribers_mutex_);
1215 subscribers_.erase(fd);
// Serialize once; each notification is a single newline-terminated JSON line.
1252 auto payload = notif.dump() +
"\n";
// Copy the subscriber fds while holding subscribers_mutex_; the sends below
// iterate over this snapshot rather than over subscribers_ itself.
1254 std::vector<int> snapshot;
1256 std::lock_guard<std::mutex> lock(subscribers_mutex_);
1257 snapshot.assign(subscribers_.begin(), subscribers_.end());
// Fds to remove from subscribers_ once the send pass is finished.
1260 std::vector<int> dead;
1261 for (
int fd : snapshot) {
// MSG_DONTWAIT: do not block on a slow subscriber. MSG_NOSIGNAL: a closed
// peer yields an error return instead of raising SIGPIPE.
1262 ssize_t rc = ::send(fd, payload.c_str(), payload.size(),
1263 MSG_DONTWAIT | MSG_NOSIGNAL);
1269 logger->warn(
"Subscriber fd {} send failed (errno={}) — dropping",
1272 }
else if (
static_cast<size_t>(rc) < payload.size()) {
// A short send is also logged as "dropping" rather than retried.
1273 logger->warn(
"Subscriber fd {} partial send ({}/{}) — dropping",
1274 fd, rc, payload.size());
// Re-acquire the lock only when there is something to remove.
1279 if (!dead.empty()) {
1280 std::lock_guard<std::mutex> lock(subscribers_mutex_);
1281 for (
int fd : dead) { subscribers_.erase(fd); }
1294 const std::string& status,
1295 const std::string& phase) {
1297 auto it = tasks_.find(task_id);
1298 if (it == tasks_.end()) {
return; }
1299 it->second.status = status;
1300 it->second.phase = phase;
// Tasks created more than 15 minutes ago are considered expired.
1316 auto cutoff = std::chrono::steady_clock::now()
1317 - std::chrono::minutes(15);
// No increment in the for-header: the iterator is advanced by erase() when
// an entry is removed.
1320 for (
auto it = tasks_.begin(); it != tasks_.end(); ) {
1321 if (it->second.created < cutoff) {
// Sentinel files are only cleaned up when a sentinel directory is known.
1322 if (!sentinel_dir.empty()) {
// Try every possible sentinel suffix for this task id; the error_code
// overload of remove() reports failures through `ec` instead of throwing.
1323 for (
const char* suffix :
1324 {
".done",
".failed",
".cancelled"}) {
1326 std::filesystem::remove(
1327 sentinel_dir / (it->first + suffix), ec);
// erase() returns the iterator following the removed entry.
1330 it = tasks_.erase(it);
1348 std::filesystem::path root = async_sentinel_root_override_;
1349 if (root.empty() && handle_ !=
nullptr) {
1352 return root.empty() ? std::filesystem::path{} : (root /
"async");
1361 const std::filesystem::path& root) {
1362 async_sentinel_root_override_ = root;
1371 const std::string& status) {
1372 const char* suffix =
".done";
1373 if (status ==
"error") {
1375 }
else if (status ==
"cancelled") {
1376 suffix =
".cancelled";
1397 const std::string& status) {
1399 if (dir.empty()) {
return; }
1401 std::filesystem::create_directories(dir, ec);
1403 logger->warn(
"write_sentinel: mkdir {} failed: {}",
1404 dir.string(), ec.message());
1408 std::ofstream out(path);
1409 if (!out.is_open()) {
1410 logger->warn(
"write_sentinel: open {} failed", path.string());
1413 out << status <<
'\n';
Unix socket MCP bridge for external client access.
bool start()
Start the background accept loop.
void cleanup_expired_tasks()
Remove tasks older than TTL from the registry.
void stop()
Stop the accept loop and close the socket.
void unsubscribe(int fd)
Remove an fd from the subscriber set.
std::mutex tasks_mutex_
Async task mutex (public for dispatch_tool access).
~ExternalBridge()
Destructor — stop if running.
void detach_phase_observer()
Clear the phase observer installed by attach_phase_observer.
nlohmann::json handle_ask_status(const nlohmann::json &args)
Handle entropic.ask_status — check async task state.
std::filesystem::path async_sentinel_dir() const
Sentinel directory (lazy: returns empty path until the engine's log_dir is configured).
ExternalBridge(entropic_handle_t handle, const ExternalMCPConfig &config, const std::filesystem::path &project_dir)
Construct with engine handle and config.
void run_async_ask(const std::string &prompt, const std::string &task_id, int client_fd)
Run an async entropic.ask in a detached background thread.
void broadcast_notification(const nlohmann::json &notif)
Write a JSON-RPC notification to every subscribed fd.
void set_async_sentinel_root(const std::filesystem::path &root)
Override the async sentinel root directory.
void attach_phase_observer(const std::string &task_id)
Install the phase observer for the async task identified by task_id (cleared again by detach_phase_observer).
void write_sentinel(const std::string &task_id, const std::string &status)
Write the sentinel file for an async task completion.
void subscribe(int fd)
Add a connected fd to the subscriber set.
void update_task_phase(const std::string &task_id, const std::string &status, const std::string &phase)
Update status/phase for a tracked task atomically.
std::unordered_map< std::string, AsyncTask > & tasks_for_cancel()
Mutable accessor to the task registry.
gh#59 (v2.3.1): RAII guard — sets thread's current handle_id.
Private definition of the entropic_engine struct.
Public C API for the Entropic inference engine.
ENTROPIC_EXPORT entropic_error_t entropic_set_state_observer(entropic_handle_t handle, void(*observer)(int state, void *user_data), void *user_data)
Register an engine state-change observer.
ENTROPIC_EXPORT entropic_error_t entropic_context_count(entropic_handle_t handle, size_t *count)
Get the number of messages in the conversation.
ENTROPIC_EXPORT entropic_error_t entropic_context_clear(entropic_handle_t handle)
Clear conversation history, starting a new session.
ENTROPIC_EXPORT entropic_error_t entropic_run(entropic_handle_t handle, const char *input, char **result_json)
Synchronous agentic loop.
ENTROPIC_EXPORT entropic_error_t entropic_metrics_json(entropic_handle_t handle, char **out)
Get loop metrics from the most recent run as JSON.
ENTROPIC_EXPORT const char * entropic_version(void)
Get the library version string.
ENTROPIC_EXPORT entropic_error_t entropic_interrupt(entropic_handle_t handle)
Interrupt a running generation.
ENTROPIC_EXPORT void entropic_free(void *ptr)
Free memory allocated by the engine or entropic_alloc().
ENTROPIC_EXPORT entropic_error_t entropic_context_get(entropic_handle_t handle, char **messages_json)
Get the current conversation history as a JSON array.
ENTROPIC_EXPORT entropic_error_t entropic_run_streaming(entropic_handle_t handle, const char *input, void(*on_token)(const char *token, size_t len, void *user_data), void *user_data, int *cancel_flag)
Streaming agentic loop with token callback.
@ ENTROPIC_AGENT_STATE_VERIFYING
Post-generation verification.
entropic_error_t
Error codes returned by all C API functions.
@ ENTROPIC_ERROR_CANCELLED
Operation cancelled via cancel token.
@ ENTROPIC_ERROR_INTERRUPTED
Operation interrupted via entropic_interrupt (v1.8.9)
ENTROPIC_EXPORT const char * entropic_last_error(entropic_handle_t handle)
Get the last error message for a handle.
Unix socket MCP bridge — exposes a running engine to external clients.
spdlog initialization and logger access.
ENTROPIC_EXPORT std::shared_ptr< spdlog::logger > get(const std::string &name)
Get or create a named logger.
Activate model on GPU (WARM → ACTIVE).
static std::string generate_task_id()
Generate a simple UUID-like task ID.
static bool any_cancelling_left(ExternalBridge *bridge)
True while any task is still phase=cancelling.
static json handle_ask(entropic_handle_t handle, const json &args, int client_fd, const std::string &call_id)
Handle entropic.ask — stream tokens then return final text.
std::filesystem::path compute_socket_path(const std::filesystem::path &project_dir)
Compute project-unique Unix socket path for self-detection.
static const char * sentinel_suffix_for_status(const std::string &status)
Map a terminal status string to a sentinel filename suffix.
static int create_listen_socket(const std::filesystem::path &path)
Create, bind, and listen on a unix domain socket.
@ ok
Tool dispatched, returned non-empty content.
static void phase_observer_cb(int state, void *ud)
State observer that projects VERIFYING onto task phase.
static bool socket_path_safe(const std::filesystem::path &path)
Reject a pre-existing path that is a symlink or non-socket.
static json handle_count(entropic_handle_t handle)
Handle entropic.context_count.
static bool peer_uid_matches(int client_fd)
Validate that the connecting peer shares the engine's UID.
static json tool_text(const std::string &text)
Wrap text in MCP tool result shape.
static bool mark_tasks_cancelling(ExternalBridge *bridge)
Mark every queued/running task as cancelling.
static void send_progress(int fd, const std::string &token_text, const std::string &progress_token)
Send an MCP progress notification with a text token.
static bool bind_and_listen(int fd, const std::filesystem::path &path)
Bind+listen on an AF_UNIX socket, applying 0600 perms.
static json handle_clear(entropic_handle_t handle, ExternalBridge *bridge)
entropic.context_clear MCP tool handler.
static void cancel_inflight_async_tasks(entropic_handle_t handle, ExternalBridge *bridge)
Cancel any async tasks currently running on the bridge.
static AsyncFinalState derive_async_final_state(entropic_handle_t handle, entropic_error_t err, char *result_json)
Translate entropic_run's return code into a final task state.
static json tool_definitions()
MCP tool definitions exposed by the bridge.
static void write_json_line(int fd, const json &msg)
Write a JSON-RPC line to a socket fd.
static std::string extract_final_text(const char *result_json)
Extract the last assistant message from a run result JSON.
static std::string rpc_err(const json &id, int code, const std::string &msg)
Build a JSON-RPC error response.
static json initialize_result()
Build the MCP initialize response payload.
static json dispatch_tool(entropic_handle_t handle, ExternalBridge *bridge, const json &params, int client_fd, const std::string &call_id)
Dispatch a tools/call to the appropriate handler.
static std::string read_line(int fd)
Read one newline-delimited line from a socket fd.
static json handle_status(entropic_handle_t handle)
Handle entropic.status.
static json dispatch_ask(entropic_handle_t handle, ExternalBridge *bridge, const json &args, int client_fd, const std::string &call_id)
Route entropic.ask — sync (streaming) or async.
static std::string rpc_ok(const json &id, const json &result)
Build a JSON-RPC success response.
static void prepare_socket_dir(const std::filesystem::path &parent)
Prepare the socket containing directory with 0700 perms.
Handle entropic.context_clear.
std::string text
result or error message
std::string status
done | error | cancelled
std::string phase
done | failed | cancelled
Async task state for background entropic.ask runs.
std::string phase
queued, running, running:<tier>, done, failed, cancelled (P1-5)
std::chrono::steady_clock::time_point created
For TTL cleanup.
std::string status
queued | running | done | error | cancelled (2.0.6-rc16)
External MCP server configuration (Entropic-as-server).
std::optional< std::filesystem::path > socket_path
Socket path (nullopt = derived)
std::filesystem::path log_dir
Session log directory (session.log + session_model.log).
Engine handle struct — owns all subsystems.
int log_id
gh#59 (v2.3.1): unique handle id for per-handle log routing via entropic::log::HandleAwareSink.
entropic::ParsedConfig config
Parsed config.