Entropic 2.3.8
Local-first agentic inference engine
Loading...
Searching...
No Matches
response_generator.cpp
Go to the documentation of this file.
1// SPDX-License-Identifier: Apache-2.0
12
13#include <cstring>
14#include <functional>
15#include <unordered_map>
16
17static auto logger = entropic::log::get("core.response_generator");
18
19namespace entropic {
20
22static std::unordered_map<std::string, size_t> s_tier_system_hash;
23
35static void log_prompt(const std::vector<Message>& messages,
36 const std::string& tier) {
37 logger->info("─── Prompt ({} messages, tier={}) ───",
38 messages.size(), tier);
39 for (size_t i = 0; i < messages.size(); ++i) {
40 if (messages[i].role == "system") {
41 size_t h = std::hash<std::string>{}(messages[i].content);
42 size_t prev = s_tier_system_hash[tier];
43 s_tier_system_hash[tier] = h;
44 if (h != prev || prev == 0) {
45 logger->info("[{}] role=system hash={:016x} "
46 "prev={:016x}\n{}",
47 i, h, prev, messages[i].content);
48 } else {
49 logger->info("[{}] role=system [unchanged, {} chars, "
50 "hash={:016x}]",
51 i, messages[i].content.size(), h);
52 }
53 } else {
54 logger->info("[{}] role={}\n{}", i, messages[i].role,
55 messages[i].content);
56 }
57 }
58 logger->info("─── End prompt ───");
59}
60
71 const InferenceInterface& inference,
72 const LoopConfig& loop_config,
73 EngineCallbacks& callbacks,
74 GenerationEvents events)
75 : inference_(inference),
76 loop_config_(loop_config),
77 callbacks_(callbacks),
78 events_(events) {}
79
88 lock_tier_if_needed(ctx);
89
90 if (loop_config_.stream_output) {
91 return generate_streaming(ctx);
92 }
93 return generate_batch(ctx);
94}
95
105 const std::string& content,
106 const std::string& tool_calls_json) {
107 if (inference_.is_response_complete == nullptr) {
108 return !content.empty();
109 }
110 return inference_.is_response_complete(
111 content.c_str(), tool_calls_json.c_str(),
112 inference_.adapter_data) != 0;
113}
114
121void ResponseGenerator::lock_tier_if_needed(LoopContext& ctx) {
122 if (!ctx.locked_tier.empty()) {
123 if (callbacks_.on_tier_selected != nullptr) {
124 callbacks_.on_tier_selected(ctx.locked_tier.c_str(),
125 callbacks_.user_data);
126 }
127 return;
128 }
129
130 if (inference_.route == nullptr) {
131 ctx.locked_tier = "default";
132 return;
133 }
134
135 auto msgs_json = serialize_messages(ctx.messages);
136 char* result_json = nullptr;
137 int rc = inference_.route(msgs_json.c_str(), &result_json,
138 inference_.orchestrator_data);
139 if (rc == 0 && result_json != nullptr) {
140 ctx.locked_tier = result_json;
141 if (inference_.free_fn != nullptr) {
142 inference_.free_fn(result_json);
143 }
144 } else {
145 ctx.locked_tier = "default";
146 logger->warn("Routing failed (rc={}), using default tier", rc);
147 }
148
149 logger->info("Locked tier: {}", ctx.locked_tier);
150 if (callbacks_.on_tier_selected != nullptr) {
151 callbacks_.on_tier_selected(ctx.locked_tier.c_str(),
152 callbacks_.user_data);
153 }
154}
155
156// ── Streaming token accumulator context ──────────────────
157
164 std::string content;
167 const HookInterface* hooks;
168 int token_index = 0;
169 bool interrupted = false;
175 int* cancel_flag = nullptr;
178 void (*observer)(const char*, size_t, void*) = nullptr;
179 void* observer_data = nullptr;
180};
181
191 const char* token,
192 size_t len,
193 void* user_data) {
194 auto* acc = static_cast<StreamAccumulator*>(user_data);
195
196 // gh#20 (v2.1.5): two coupled bugs lived here.
197 //
198 // (A) The previous implementation set `acc->interrupted = true`
199 // and returned early WITHOUT propagating the interrupt to the
200 // backend. The backend's cancel_flag stayed 0, so llama_cpp
201 // ran to natural EOS — up to 60s of wasted decode after the
202 // user pressed Ctrl-C.
203 //
204 // (B) The early return also dropped every post-interrupt token
205 // from `acc->content`. When the backend finally finished
206 // cleanly, the response_generator built the iter result from
207 // this truncated buffer (e.g. 7 chars instead of the 107
208 // decoded tokens forming a valid tool call), throwing away
209 // fully-formed output.
210 //
211 // The fix raises the cancel flag for the backend AND keeps
212 // appending the token so the content buffer is complete up to
213 // the cancel point. The backend stops on its next loop iteration
214 // (<= 1 token wall-time); whatever made it through is preserved.
215 bool just_interrupted = acc->events->interrupt != nullptr
216 && acc->events->interrupt->load()
217 && !acc->interrupted;
218 if (just_interrupted) {
219 acc->interrupted = true;
220 // gh#49 (v2.1.12): log the cancel-flag raise so a session
221 // log can confirm the per-token interrupt poll observed the
222 // engine-level flag. Pre-v2.1.12 the bissell-llm-studio
223 // repro saw the "Engine interrupted" line on 0->1 transition
224 // but no evidence the per-token poll ever fired — this log
225 // is the first observable receipt of the propagation.
226 logger->info("Stream interrupt observed at token {}; "
227 "raising backend cancel_flag",
228 acc->token_index);
229 if (acc->cancel_flag != nullptr) {
230 *acc->cancel_flag = 1;
231 }
232 }
233
234 acc->content.append(token, len);
235 if (acc->callbacks->on_stream_chunk != nullptr) {
236 acc->callbacks->on_stream_chunk(token, len,
237 acc->callbacks->user_data);
238 }
239
240 // Global observer — fires on every token regardless of whether
241 // the caller registered on_stream_chunk. (2.0.6-rc16)
242 if (acc->observer != nullptr) {
243 acc->observer(token, len, acc->observer_data);
244 }
245
246 // Hook: ON_STREAM_TOKEN (v1.9.1)
247 if (acc->hooks != nullptr && acc->hooks->fire_info != nullptr) {
248 std::string json = "{\"token_index\":"
249 + std::to_string(acc->token_index++) + "}";
250 acc->hooks->fire_info(acc->hooks->registry,
251 ENTROPIC_HOOK_ON_STREAM_TOKEN, json.c_str());
252 }
253}
254
268static std::string resolve_stream_finish_reason(int rc,
269 size_t content_size) {
270 std::string reason;
271 if (rc == ENTROPIC_ERROR_CANCELLED) {
272 logger->info("Stream cancelled by interrupt after {} chars",
273 content_size);
274 reason = "interrupted";
275 } else if (rc != 0 && content_size > 0) {
276 logger->warn("Stream failed (rc={}) after {} chars — "
277 "preserving partial", rc, content_size);
278 reason = "partial";
279 } else if (rc != 0) {
280 logger->error("Stream failed (rc={}) with no partial content", rc);
281 reason = "error";
282 } else {
283 reason = "stop";
284 }
285 return reason;
286}
287
296std::pair<std::string, std::string> ResponseGenerator::prepare_prompts(
297 LoopContext& ctx, const char* mode) {
298 auto messages = inject_tool_prompt(ctx.messages, ctx.locked_tier);
299 messages = inject_engine_state_reminder(messages, ctx);
300 logger->info("Generate ({}): tier={}, {} messages",
301 mode, ctx.locked_tier, messages.size());
302 log_prompt(messages, ctx.locked_tier);
303 return {serialize_messages(messages),
304 build_params_json(ctx.locked_tier)};
305}
306
314GenerateResult ResponseGenerator::generate_streaming(LoopContext& ctx) {
315 if (inference_.generate_stream == nullptr) {
316 logger->warn("No streaming function, falling back to batch");
317 return generate_batch(ctx);
318 }
319
320 auto [msgs_json, params_json] = prepare_prompts(ctx, "stream");
321
322 int cancel_flag = 0;
323 StreamAccumulator acc;
324 acc.callbacks = &callbacks_;
325 acc.events = &events_;
326 acc.hooks = &hooks_;
327 // gh#20 (v2.1.5): give the token callback a path to raise the
328 // backend cancel flag when an interrupt is observed. Without
329 // this, the previous implementation would early-return out of
330 // the token callback without ever telling the backend to stop.
331 acc.cancel_flag = &cancel_flag;
332 // Wire the persistent stream observer so every token — including
333 // batch entropic_run and delegate child-loop generations — reaches
334 // any registered observer. (P0-1, 2.0.6-rc16)
335 acc.observer = stream_observer_;
336 acc.observer_data = stream_observer_data_;
337
338 int rc = inference_.generate_stream(
339 msgs_json.c_str(), params_json.c_str(),
341 &cancel_flag, inference_.backend_data);
342
343 GenerateResult result;
344 result.finish_reason = resolve_stream_finish_reason(rc,
345 acc.content.size());
346 // Issue #3 (v2.1.1): inbound boundary from llama_cpp. Models can emit
347 // malformed UTF-8 mid-stream (partial multi-byte runs under XML-tool-call
348 // pressure, decoder desyncs). Sanitize ONCE at message-finalization,
349 // never per-token — a multi-byte codepoint may split across token
350 // boundaries and per-token sanitize would corrupt valid output.
351 // See include/entropic/mcp/utf8_sanitize.h for the boundary policy.
352 result.content = mcp::sanitize_utf8(acc.content);
353 result.tool_calls_json = "[]";
354 logger->info("Generate complete (stream): finish={}, {} chars",
355 result.finish_reason, result.content.size());
356 return result;
357}
358
366GenerateResult ResponseGenerator::generate_batch(LoopContext& ctx) {
367 if (inference_.generate == nullptr) {
368 logger->error("No generate function available");
369 return {"", "[]", "error"};
370 }
371
372 auto [msgs_json, params_json] = prepare_prompts(ctx, "batch");
373 char* result_json = nullptr;
374
375 int rc = inference_.generate(
376 msgs_json.c_str(), params_json.c_str(),
377 &result_json, inference_.backend_data);
378
379 GenerateResult result;
380 if (rc == 0 && result_json != nullptr) {
381 // Issue #3 (v2.1.1): inbound boundary, batch path. See the
382 // streaming branch above for rationale; same policy applies.
383 result.content = mcp::sanitize_utf8(result_json);
384 result.finish_reason = "stop";
385 result.tool_calls_json = "[]";
386 if (inference_.free_fn != nullptr) {
387 inference_.free_fn(result_json);
388 }
389 // Fire observer once with full content so the non-streaming
390 // fallback still reaches registered observers. (2.0.6-rc16)
391 if (stream_observer_ != nullptr && !result.content.empty()) {
392 stream_observer_(result.content.data(),
393 result.content.size(),
394 stream_observer_data_);
395 }
396 } else {
397 result.finish_reason = "error";
398 logger->error("Generate failed (rc={})", rc);
399 }
400 logger->info("Generate complete (batch): finish={}, {} chars",
401 result.finish_reason, result.content.size());
402 return result;
403}
404
419std::string ResponseGenerator::handle_pause(
420 LoopContext& ctx,
421 const std::string& partial) {
422 ctx.state = AgentState::PAUSED;
423 if (callbacks_.on_state_change != nullptr) {
424 callbacks_.on_state_change(
425 static_cast<int>(AgentState::PAUSED),
426 callbacks_.user_data);
427 }
428 // gh#40 fallout (v2.1.10): persistent slot fires alongside the
429 // legacy on_state_change so consumers see PAUSED during
430 // streaming runs where the legacy callbacks_ struct has been
431 // overwritten by run_streaming's set_callbacks() shuffle.
432 if (state_observer_ != nullptr) {
433 state_observer_(static_cast<int>(AgentState::PAUSED),
434 state_observer_data_);
435 }
436
437 char* injection = nullptr;
438 if (callbacks_.on_pause_prompt != nullptr) {
439 callbacks_.on_pause_prompt(partial.c_str(), &injection,
440 callbacks_.user_data);
441 }
442
443 if (injection == nullptr) {
444 if (events_.interrupt != nullptr) {
445 events_.interrupt->store(true);
446 }
447 return partial;
448 }
449
450 std::string inj(injection);
451 if (inj.empty()) {
452 ctx.state = AgentState::EXECUTING;
453 return partial;
454 }
455
456 // Injection provided: append partial + injection to messages
457 if (!partial.empty()) {
458 Message partial_msg;
459 partial_msg.role = "assistant";
460 partial_msg.content = partial + "\n\n[Generation paused by user]";
461 ctx.messages.push_back(std::move(partial_msg));
462 }
463 Message inject_msg;
464 inject_msg.role = "user";
465 inject_msg.content = "[User interjection]: " + inj
466 + "\n\nPlease continue with this in mind.";
467 ctx.messages.push_back(std::move(inject_msg));
468
469 ctx.state = AgentState::EXECUTING;
470 return "";
471}
472
/**
 * @brief Append @p s to @p out, escaped for use inside a JSON string literal.
 *
 * Escapes the double quote and backslash, uses the short forms for
 * backspace, form feed, newline, carriage return and tab, and writes every
 * other byte below 0x20 as a \u00XX escape, since JSON does not allow raw
 * control characters inside strings. Bytes >= 0x20 (including multi-byte
 * UTF-8 sequences) are copied through unchanged. The surrounding quotes
 * are not added.
 *
 * @param s Text to escape.
 * @param out Destination buffer; escaped text is appended to it.
 */
static void json_escape_into(const std::string& s, std::string& out) {
    static constexpr char kHexDigits[] = "0123456789abcdef";
    for (const char c : s) {
        switch (c) {
        case '"':  out += "\\\""; break;
        case '\\': out += "\\\\"; break;
        case '\b': out += "\\b";  break;
        case '\f': out += "\\f";  break;
        case '\n': out += "\\n";  break;
        case '\r': out += "\\r";  break;
        case '\t': out += "\\t";  break;
        default: {
            // Go through unsigned char so bytes >= 0x80 are not seen as
            // negative values and mistaken for control characters.
            const auto byte = static_cast<unsigned char>(c);
            if (byte < 0x20) {
                out += "\\u00";
                out += kHexDigits[(byte >> 4) & 0x0F];
                out += kHexDigits[byte & 0x0F];
            } else {
                out += c;
            }
            break;
        }
        }
    }
}
497
508 const std::vector<ContentPart>& parts, std::string& out) {
509 out += '[';
510 for (size_t i = 0; i < parts.size(); ++i) {
511 if (i > 0) { out += ','; }
512 if (parts[i].type == ContentPartType::IMAGE) {
513 out += R"({"type":"image","path":")";
514 json_escape_into(parts[i].image_path, out);
515 out += R"(","url":")";
516 json_escape_into(parts[i].image_url, out);
517 out += R"("})";
518 } else {
519 out += R"({"type":"text","text":")";
520 json_escape_into(parts[i].text, out);
521 out += R"("})";
522 }
523 }
524 out += ']';
525}
526
541std::string ResponseGenerator::serialize_messages(
542 const std::vector<Message>& messages) {
543 std::string json = "[";
544 for (size_t i = 0; i < messages.size(); ++i) {
545 if (i > 0) { json += ','; }
546 json += "{\"role\":\"" + messages[i].role + "\",\"content\":";
547 if (messages[i].content_parts.empty()) {
548 json += '"';
549 json_escape_into(messages[i].content, json);
550 json += '"';
551 } else {
552 serialize_content_parts(messages[i].content_parts, json);
553 }
554 json += '}';
555 }
556 json += ']';
557 return json;
558}
559
566std::string ResponseGenerator::build_params_json(
567 const std::string& tier) {
568 if (tier.empty()) { return "{}"; }
569 return "{\"tier\":\"" + tier + "\"}";
570}
571
586std::vector<Message> ResponseGenerator::inject_tool_prompt(
587 const std::vector<Message>& messages,
588 const std::string& tier) {
589 if (inference_.get_tool_prompt == nullptr) { return messages; }
590
591 char* tool_prompt = nullptr;
592 int rc = inference_.get_tool_prompt(
593 tier.c_str(), &tool_prompt, inference_.tool_prompt_data);
594 if (rc != 0 || tool_prompt == nullptr) { return messages; }
595
596 std::string prompt_str(tool_prompt);
597 if (inference_.free_fn) { inference_.free_fn(tool_prompt); }
598
599 auto result = messages;
600 for (auto& msg : result) {
601 if (msg.role == "system") {
602 msg.content += "\n\n" + prompt_str;
603 break;
604 }
605 }
606 return result;
607}
608
628std::vector<Message> ResponseGenerator::inject_engine_state_reminder(
629 const std::vector<Message>& messages,
630 const LoopContext& ctx) {
631 int max_iter = ctx.effective_max_iterations >= 0
632 ? ctx.effective_max_iterations
633 : loop_config_.max_iterations;
634 std::string reminder = "[engine] iteration "
635 + std::to_string(ctx.metrics.iterations)
636 + "/" + std::to_string(max_iter)
637 + ", tool calls so far: "
638 + std::to_string(ctx.metrics.tool_calls) + ".";
639
640 // Demo ask #2 (v2.1.0): if the previous turn was validator-rejected,
641 // surface the reason so the model knows WHY it's being asked again.
642 // Engine clears pending_validation_feedback after this turn — the
643 // line is one-shot.
644 if (!ctx.pending_validation_feedback.empty()) {
645 reminder += "\n[engine] previous turn rejected: "
646 + ctx.pending_validation_feedback;
647 }
648 // Demo ask #5 (v2.1.0): anti-spiral primitive. ToolExecutor
649 // populated this when consecutive_same_tool_calls hit
650 // max_consecutive_same_tool. Same one-shot lifecycle as the
651 // validation feedback above; engine clears after this turn.
652 if (!ctx.pending_anti_spiral_warning.empty()) {
653 reminder += "\n[engine] anti-spiral: "
654 + ctx.pending_anti_spiral_warning;
655 }
656
657 auto result = messages;
658 Message reminder_msg;
659 reminder_msg.role = "user";
660 reminder_msg.content = std::move(reminder);
661 result.push_back(std::move(reminder_msg));
662 return result;
663}
664
665} // namespace entropic
GenerateResult generate_response(LoopContext &ctx)
Generate model response, routing tier first if needed.
ResponseGenerator(const InferenceInterface &inference, const LoopConfig &loop_config, EngineCallbacks &callbacks, GenerationEvents events)
Construct a response generator.
bool is_response_complete(const std::string &content, const std::string &tool_calls_json)
Check if the last response indicates completion.
Error types for cross-.so error reporting.
@ ENTROPIC_ERROR_CANCELLED
Operation cancelled via cancel token.
Definition error.h:48
@ ENTROPIC_HOOK_ON_STREAM_TOKEN
2: Each streaming token emitted
Definition hooks.h:38
spdlog initialization and logger access.
ENTROPIC_EXPORT std::shared_ptr< spdlog::logger > get(const std::string &name)
Get or create a named logger.
Definition logging.cpp:211
Activate model on GPU (WARM → ACTIVE).
@ IMAGE
Image content (local path or data URI)
static std::unordered_map< std::string, size_t > s_tier_system_hash
Per-tier system prompt hash for diff detection across delegations.
static void log_prompt(const std::vector< Message > &messages, const std::string &tier)
Log the full assembled prompt (all messages, no truncation).
static void serialize_content_parts(const std::vector< ContentPart > &parts, std::string &out)
Serialize a single multimodal content_parts array (gh#37, v2.1.8).
static void json_escape_into(const std::string &s, std::string &out)
Escape a string for embedding in a JSON string literal (helper used when serializing messages to JSON for the inference interface).
static std::string resolve_stream_finish_reason(int rc, size_t content_size)
Resolve a stream's finish_reason from rc + content size.
static void stream_token_callback(const char *token, size_t len, void *user_data)
Token callback for streaming generation.
Response generation subsystem for the agentic loop.
Callback function pointer types for engine events.
void(* on_tier_selected)(const char *tier, void *ud)
Tier routing result.
void * user_data
Opaque pointer passed to all callbacks.
void(* on_pause_prompt)(const char *partial, char **injection, void *ud)
Pause: get injection.
void(* on_state_change)(int state, void *ud)
AgentState as int.
Result of a generate_response call.
Atomic flags for interrupt/pause signaling.
std::atomic< bool > * interrupt
Hard interrupt flag.
Configuration for the agentic loop.
int max_iterations
Max loop iterations before forced stop.
bool stream_output
Stream vs batch generation.
Mutable state carried through the agentic loop.
std::vector< Message > messages
Conversation history.
std::string locked_tier
Tier locked for this loop ("" = none)
Context passed to the streaming token callback.
const HookInterface * hooks
Hook dispatch (v1.9.1)
void * observer_data
Observer user_data.
std::string content
Accumulated content.
void(* observer)(const char *, size_t, void *)
Global observer — fires on every token alongside callbacks->on_stream_chunk.
EngineCallbacks * callbacks
Callback reference.
bool interrupted
Set when interrupt detected.
int token_index
Token counter (v1.9.1)
GenerationEvents * events
Event flags.
int * cancel_flag
Pointer to the backend's cancel flag (gh#20, v2.1.5).
UTF-8 validation + replacement at every system boundary where bytes change ownership.