Entropic 2.3.8
Local-first agentic inference engine
Loading...
Searching...
No Matches
health_monitor.cpp
Go to the documentation of this file.
1// SPDX-License-Identifier: Apache-2.0
10
11static auto logger = entropic::log::get("mcp.health_monitor");
12
13namespace entropic {
14
23 ReconnectPolicy policy,
24 uint32_t health_check_interval_ms)
25 : policy_(std::move(policy)),
26 health_check_interval_ms_(health_check_interval_ms) {}
27
36
45 const std::string& name,
46 ExternalMCPClient* client) {
47
48 std::lock_guard<std::mutex> lock(watched_mutex_);
49 WatchEntry entry;
50 entry.client = client;
51 entry.status = client->is_connected() ? "connected" : "disconnected";
52 entry.reconnect_attempt = 0;
53 entry.next_action = std::chrono::steady_clock::now();
54 watched_[name] = entry;
55
56 logger->info("Watching server '{}'", name);
57 wake_cv_.notify_one();
58}
59
66void HealthMonitor::unwatch(const std::string& name) {
67 std::lock_guard<std::mutex> lock(watched_mutex_);
68 watched_.erase(name);
69 logger->info("Unwatched server '{}'", name);
70}
71
79 status_callback_ = std::move(cb);
80}
81
88 if (running_) {
89 return;
90 }
91 running_ = true;
92 monitor_thread_ = std::thread(
93 &HealthMonitor::monitor_loop, this);
94 logger->info("Health monitor started");
95}
96
103 if (!running_.exchange(false)) {
104 return;
105 }
106 wake_cv_.notify_all();
107 if (monitor_thread_.joinable()) {
108 monitor_thread_.join();
109 }
110 logger->info("Health monitor stopped");
111}
112
119 std::vector<HealthEvent> events;
120 {
121 std::lock_guard<std::mutex> lock(event_mutex_);
122 events.swap(event_queue_);
123 }
124
125 for (const auto& event : events) {
126 if (status_callback_) {
127 status_callback_(event);
128 }
129 }
130}
131
137void HealthMonitor::monitor_loop() {
138 constexpr auto poll_interval = std::chrono::milliseconds(500);
139
140 while (running_) {
141 {
142 std::lock_guard<std::mutex> lock(watched_mutex_);
143 auto now = std::chrono::steady_clock::now();
144 for (auto& [name, entry] : watched_) {
145 if (now >= entry.next_action) {
146 check_server(name, entry);
147 }
148 }
149 }
150
151 std::unique_lock<std::mutex> lock(wake_mutex_);
152 wake_cv_.wait_for(lock, poll_interval,
153 [this] { return !running_.load(); });
154 }
155}
156
164void HealthMonitor::check_server(
165 const std::string& name,
166 WatchEntry& entry) {
167
168 bool alive = entry.client->is_connected();
169
170 if (entry.status == "connected" && !alive) {
171 // Connection lost — start reconnecting
172 auto old = entry.status;
173 entry.status = "disconnected";
174 entry.reconnect_attempt = 0;
175 post_event(name, old, entry.status);
176 logger->warn("Server '{}' disconnected", name);
177 attempt_reconnect(name, entry);
178 return;
179 }
180
181 if (entry.status == "disconnected" ||
182 entry.status == "reconnecting") {
183 attempt_reconnect(name, entry);
184 return;
185 }
186
187 // Connected + healthy: schedule next check
188 if (health_check_interval_ms_ > 0) {
189 entry.next_action = std::chrono::steady_clock::now() +
190 std::chrono::milliseconds(health_check_interval_ms_);
191 } else {
192 entry.next_action = std::chrono::steady_clock::now() +
193 std::chrono::seconds(5);
194 }
195}
196
204void HealthMonitor::attempt_reconnect(
205 const std::string& name,
206 WatchEntry& entry) {
207
208 if (policy_.exhausted(entry.reconnect_attempt)) {
209 if (entry.status != "error") {
210 auto old = entry.status;
211 entry.status = "error";
212 post_event(name, old, "error");
213 logger->error("Server '{}' reconnection exhausted "
214 "after {} attempts",
215 name, entry.reconnect_attempt);
216 }
217 entry.next_action = std::chrono::steady_clock::time_point::max();
218 return;
219 }
220
221 if (entry.status != "reconnecting") {
222 auto old = entry.status;
223 entry.status = "reconnecting";
224 post_event(name, old, "reconnecting");
225 }
226
227 logger->info("Reconnecting to '{}' (attempt {})",
228 name, entry.reconnect_attempt + 1);
229
230 if (entry.client->connect()) {
231 on_reconnect_success(name, entry);
232 return;
233 }
234
235 // Schedule next attempt with backoff
236 auto delay = policy_.delay_ms(entry.reconnect_attempt);
237 entry.reconnect_attempt++;
238 entry.next_action = std::chrono::steady_clock::now() +
239 std::chrono::milliseconds(delay);
240
241 logger->info("Server '{}' reconnect failed, "
242 "retry in {}ms", name, delay);
243}
244
252void HealthMonitor::on_reconnect_success(const std::string& name,
253 WatchEntry& entry) {
254 auto [added, removed] = entry.client->refresh_tools();
255 entry.status = "connected";
256 entry.reconnect_attempt = 0;
257
258 HealthEvent evt;
259 evt.server_name = name;
260 evt.old_status = "reconnecting";
261 evt.new_status = "connected";
262 evt.added_tools = std::move(added);
263 evt.removed_tools = std::move(removed);
264 {
265 std::lock_guard<std::mutex> lock(event_mutex_);
266 event_queue_.push_back(std::move(evt));
267 }
268
269 logger->info("Server '{}' reconnected", name);
270 entry.next_action = std::chrono::steady_clock::now();
271}
272
281void HealthMonitor::post_event(
282 const std::string& name,
283 const std::string& old_status,
284 const std::string& new_status) {
285
286 HealthEvent evt;
287 evt.server_name = name;
288 evt.old_status = old_status;
289 evt.new_status = new_status;
290 {
291 std::lock_guard<std::mutex> lock(event_mutex_);
292 event_queue_.push_back(std::move(evt));
293 }
294}
295
296} // namespace entropic
Client for an external MCP server (stdio or SSE).
bool is_connected() const
Check connection state.
void start()
Start the monitoring thread.
void watch(const std::string &name, ExternalMCPClient *client)
Start monitoring a server.
void stop()
Stop monitoring and all reconnection attempts.
HealthMonitor(ReconnectPolicy policy, uint32_t health_check_interval_ms=0)
Construct with reconnection policy.
std::function< void(const HealthEvent &)> StatusCallback
Callback invoked on engine thread when processing events.
void process_events()
Drain event queue, invoke callbacks (call on engine thread).
~HealthMonitor()
Destructor — stops monitor if running.
void set_status_callback(StatusCallback cb)
Set callback for status change events.
void unwatch(const std::string &name)
Stop monitoring a server.
Exponential backoff with jitter for reconnection attempts.
bool exhausted(uint32_t attempt) const
Check if retries are exhausted.
uint32_t delay_ms(uint32_t attempt) const
Compute delay for the given attempt number.
Monitors external MCP server health and triggers reconnection.
spdlog initialization and logger access.
ENTROPIC_EXPORT std::shared_ptr< spdlog::logger > get(const std::string &name)
Get or create a named logger.
Definition logging.cpp:211
Activate model on GPU (WARM → ACTIVE).