The agentic chat loop, end to end
Follow one chat request from the SSE endpoint through the reactive turn loop to tool dispatch — the system's beating heart, doc claims bound to the code.
backend/anylegal_oss/workspace/agentic_chat_async.py · 2189 lines · AgenticWorkspaceChatAsync._build_initial_messages L841–1050
Outline 30 symbols
- _get_tokenizer function
- _default_system_prompt function
- AgenticEvent class
- to_dict method
- AgenticWorkspaceChatAsync class
- __init__ method
- run_async method
- _build_initial_messages method
- _default_system_prompt method
- _count_tokens method
- _estimate_tokens method
- _context_usage_pct method
- _handle_empty_truncated_response method
- _emit_plan_state method
- _session_loaded_research_skill method
- _detect_planner_template method
- _rehydrate_plan method
- _plan_and_execute method
- _scoped_reactive_loop method
- _reactive_loop method
- _auto_compact method
- _call_llm method
- _call_llm_streaming method
- _extract_content method
- _extract_tool_calls method
- _parse_tool_arguments method
- _calculate_cost method
- _execute_tool method
- cancel method
- create_agentic_chat_async function
1"""
2Agentic Workspace Chat — Async
3
4Async implementation of the agentic loop for FastAPI with async/await throughout.
5Parallel to the sync ``agentic_chat.py`` used by the Flask app.
6"""
7
8import asyncio
9import json
10import logging
11import os
12import re
13import uuid
14from dataclasses import dataclass, field
15from datetime import datetime, timezone
16from pathlib import Path
17from typing import Any, AsyncGenerator, Dict, List, Optional
18
19import httpx
20
# Read timeout in seconds, taken from the LLM_READ_TIMEOUT_SECONDS environment
# variable ("300" when unset). float() raises ValueError at import time if the
# variable holds a non-numeric value.
_LLM_READ_TIMEOUT = float(os.getenv("LLM_READ_TIMEOUT_SECONDS", "300"))
# httpx timeout profile: 30 s each for connect, write and pool acquisition; the
# read phase uses the configurable value above.
# NOTE(review): presumably passed to the httpx client used for LLM requests —
# the consumer is not visible here, confirm.
LLM_HTTP_TIMEOUT = httpx.Timeout(connect=30.0, read=_LLM_READ_TIMEOUT, write=30.0, pool=30.0)
23
# Process-wide tokenizer cache; populated on first use by _get_tokenizer().
_TOKENIZER = None
# tiktoken encoding name requested when the cache is filled.
_TOKENIZER_NAME = "o200k_base"


def _get_tokenizer():
    """Return the shared tiktoken encoding, creating it on first call.

    ``tiktoken`` is imported lazily, so importing this module does not
    require the package until a tokenizer is actually requested. The
    encoding object is stored in the module-level ``_TOKENIZER`` and
    reused by every later call.
    """
    global _TOKENIZER
    if _TOKENIZER is not None:
        return _TOKENIZER

    import tiktoken

    _TOKENIZER = tiktoken.get_encoding(_TOKENIZER_NAME)
    return _TOKENIZER
33
34from.session import WorkspaceSession
35from.tools.tool_executor import AsyncToolExecutor, ToolResult
36from.tools.workspace_tools import WORKSPACE_TOOLS, get_workspace_tools
37
38from anylegal_oss.db import async_db
39from anylegal_oss.services.compaction.compactor import perform_compaction
40from anylegal_oss.state.transcript import flush_session_storage, record_transcript
41
logger = logging.getLogger(__name__)


def _default_system_prompt(self) -> str:
    """Return the default system prompt, extended with the discovered skills.

    The base text is read from ``prompts/system_prompt.md`` next to this
    module. When the skill loader reports any skills, an
    ``## Available Skills`` section (one bullet per skill plus usage
    guidance for the ``Skill`` tool) is appended to it.

    Args:
        self: Unused; kept so the function has a method-compatible signature.

    Returns:
        The prompt text. If the prompt file does not exist, a short built-in
        fallback prompt is returned instead. A failure while discovering
        skills is logged and the base prompt is returned unchanged.
    """
    module_dir = os.path.dirname(__file__)
    prompt_path = os.path.join(module_dir, "prompts", "system_prompt.md")

    try:
        with open(prompt_path, "r", encoding="utf-8") as handle:
            prompt = handle.read()
    except FileNotFoundError:
        logger.warning(f"System prompt file not found at {prompt_path}, using minimal fallback")
        return (
            "You are an expert legal AI assistant in AnyLegal's agentic workspace. "
            "Help users with contract review, drafting, research, and document editing."
        )

    # Skill discovery is best-effort: any failure leaves the base prompt as is.
    try:
        from .skills.skill_loader import create_skill_loader

        skills = create_skill_loader().discover_skills()
        if skills:
            bullets = "".join(
                f"- **{skill.name}** — {skill.description or skill.name}\n"
                for skill in skills
            )
            usage_note = (
                "\n"
                "Invoke any of these via the `Skill` tool: "
                "`Skill(skill=\"<name>\")`. The full procedure is "
                "returned as the tool result — follow it on the next "
                "turn. **Do NOT read the SKILL.md file** via "
                "`read_document` — use the `Skill` tool instead so "
                "the tool pool is correctly scoped.\n"
            )
            prompt = prompt.rstrip() + "\n## Available Skills\n\n" + bullets + usage_note
    except Exception as e:
        logger.warning(f"Failed to load skills for system prompt: {e}")

    return prompt
86
@dataclass
class AgenticEvent:
    """A single event produced while the agentic loop runs.

    Instances are yielded by the chat loop and serialized with
    :meth:`to_dict`.
    """
    # Event kind, e.g. "start", "text_chunk", "tool_call", "end".
    type: str
    # Event payload; its keys depend on ``type``.
    data: Dict[str, Any]
    # ISO-8601 creation time in UTC, filled in automatically when omitted.
    timestamp: str = field(
        default_factory=lambda: datetime.now(tz=timezone.utc).isoformat()
    )

    def to_dict(self) -> Dict[str, Any]:
        """Return the event as a plain dict with type, data and timestamp.

        ``data`` is returned by reference, not copied.
        """
        return {
            key: getattr(self, key)
            for key in ("type", "data", "timestamp")
        }
101
class AgenticWorkspaceChatAsync:
    """
    Async version of AgenticWorkspaceChat with external session guard and state tracking.
    Designed for FastAPI with true async/await.
    """

    # NOTE(review): run_async takes its own ``max_turns`` argument (default 50)
    # and does not read this constant in the code visible here — confirm
    # where it is used.
    MAX_ITERATIONS = 50
    # NOTE(review): not referenced in the visible part of the file — confirm
    # where the per-iteration tool-call cap is enforced.
    MAX_TOOL_CALLS_PER_ITERATION = 20
    # Context-usage percentages compared against ``_context_usage_pct`` in
    # run_async: at or above WARN the loop logs "Context HIGH"; at or above
    # CRITICAL it logs "Context CRITICAL" and, after the second iteration,
    # triggers auto-compaction.
    CONTEXT_WARN_THRESHOLD = 70
    CONTEXT_CRITICAL_THRESHOLD = 85
112
113 def __init__(
114 self,
115 model: Optional[str] = None,
116 system_prompt: Optional[str] = None,
117 session_guard = None,
118 planner_mode: bool = False,
119 approved_plan: Optional[Dict[str, Any]] = None,
120 approved_mode_change: Optional[Dict[str, Any]] = None,
121 ):
122 """
123 Initialize async agentic chat.
124
125 Args:
126 model: LLM model to use (defaults to env CHAT_MODEL)
127 system_prompt: Optional system prompt override
128 session_guard: External async session guard (injected)
129 planner_mode: Run the agent in plan-and-execute mode (Planner service).
130 The planner template is selected automatically from the session's
131 loaded skills — e.g. a prior ``read_document("Skills/research/
132 SKILL.md")`` in the thread routes to the ``legal_research``
133 template. See ``_detect_planner_template``.
134 approved_plan: Plan JSON previously emitted by the planner and
135 approved by the user, echoed back to skip re-planning and
136 execute directly. Implements ``exit_plan_mode``
137 approval flow where the plan re-enters conversation context
138 on approval. When None (default) and ``planner_mode=True``,
139 the agent emits a plan and returns without executing.
140 """
141 from anylegal_oss.core.pricing import get_model_registry
142 self.model = model or os.getenv("CHAT_MODEL") or get_model_registry().get_default_model()
143 self.planner_mode = planner_mode
144 self._approved_plan = approved_plan
145 self._approved_mode_change = approved_mode_change
146 logger.info(
147 f"[AGENTIC_ASYNC] Initialized with model: {self.model}, "
148 f"planner_mode={self.planner_mode}"
149 )
150 self.system_prompt = system_prompt
151 self._cancelled = False
152 self._session_guard = session_guard
153 self._enable_streaming = os.getenv("AGENTIC_STREAMING", "true").lower() == "true"
154
155 self._session_id: Optional[str] = None
156 self._session_state = None
157 self._workspace: Optional[WorkspaceSession] = None
158
159 self._turn_count = 0
160 self._total_prompt_tokens = 0
161 self._total_completion_tokens = 0
162 self._total_cost_usd = 0.0
163
164 self._truncation_retries_used = 0
165 self.MAX_TRUNCATION_RETRIES = 4
166
167 async def run_async(
168 self,
169 session: WorkspaceSession,
170 message: str,
171 user_id: Optional[int] = None,
172 thread_id: Optional[str] = None,
173 max_turns: int = 50,
174 max_budget_usd: Optional[float] = None,
175 image_attachments: Optional[List[Dict[str, str]]] = None,
176 ) -> AsyncGenerator[AgenticEvent, None]:
177 """
178 Run the agentic loop asynchronously.
179
180 Args:
181 session: Workspace session
182 message: User's message
183 user_id: User ID for billing
184 thread_id: Thread ID for persistence
185 max_turns: Maximum iterations (default 50)
186 max_budget_usd: Optional budget limit
187 image_attachments: Optional images
188
189 Yields:
190 AgenticEvent objects for SSE streaming
191 """
192 self._session_id = session.session_id
193 self._workspace = session
194
195 from anylegal_oss.state.session_state import get_session_state
196 self._session_state = get_session_state(self._session_id)
197
198 mode_change_tool_result = None
199 if self._approved_mode_change:
200 change = self._approved_mode_change
201 approved = bool(change.get("approved"))
202 target_mode = change.get("mode") or "plan"
203 if approved and target_mode == "plan":
204 self._session_state.mode = "plan"
205
206 from anylegal_oss.workspace.tools.mode_tools import PLAN_MODE_ENTRY_RESULT
207 mode_change_tool_result = (
208 change.get("tool_call_id"),
209 PLAN_MODE_ENTRY_RESULT,
210 )
211 else:
212
213 mode_change_tool_result = (
214 change.get("tool_call_id"),
215 "User declined plan mode. Continue reactively — answer "
216 "directly or use tools as needed.",
217 )
218
219 approved_plan_tool_result = None
220 if self._approved_plan:
221 plan_payload = self._approved_plan
222 plan_text = plan_payload.get("plan_text") if isinstance(plan_payload, dict) else None
223 if not plan_text and isinstance(plan_payload, dict) and plan_payload.get("steps"):
224
225 lines = []
226 if plan_payload.get("goal"):
227 lines.append(f"**Goal:** {plan_payload['goal']}")
228 for i, step in enumerate(plan_payload.get("steps") or [], start=1):
229 if isinstance(step, dict) and step.get("description"):
230 lines.append(f"{i}. {step['description']}")
231 plan_text = "\n".join(lines) if lines else None
232 if plan_text:
233 self._session_state.mode = "default"
234
235 self._session_state.plan_already_approved = True
236 tool_call_id = plan_payload.get("tool_call_id") if isinstance(plan_payload, dict) else None
237
238 approval_body = (
239 "User has approved your plan. Begin executing it now with the full tool pool.\n\n"
240 "- Use the `todo_write` tool to track progress through the plan steps as you go, if applicable.\n"
241 "- Start with the first step. Use tool calls to gather information; produce a final answer at the end.\n\n"
242 "**Output rules for this turn (MANDATORY):**\n"
243 "- Cite every factual claim inline using [[N]](URL). Start numbering at [1] for this response.\n"
244 "- End the response with a ## Sources section listing each URL in order.\n"
245 "- If a tool errored during exploration, do not invent a source — say so and use only sources you actually fetched.\n"
246 "- Do NOT use markdown horizontal rules (--- or ***) as section dividers.\n\n"
247 f"## Approved Plan\n\n{plan_text}"
248 )
249 approved_plan_tool_result = (tool_call_id, approval_body)
250
251 if self._session_guard:
252 acquired = await self._session_guard.acquire(self._session_id, timeout=300)
253 if not acquired:
254 yield AgenticEvent(
255 type="error",
256 data={"error": "Session is already running"}
257 )
258 return
259
260 try:
261
262 self._cancelled = False
263 self._turn_count = 0
264 self._total_prompt_tokens = 0
265 self._total_completion_tokens = 0
266 self._total_cost_usd = 0.0
267
268 from anylegal_oss.workspace.tools.skill_tool import reset_skill_scope
269 reset_skill_scope(self._session_state)
270
271 yield AgenticEvent(
272 type="start",
273 data={
274 "session_id": self._session_id,
275 "workspace_id": session.session_id,
276 "thread_id": thread_id,
277 "message": message[:100] + "..." if len(message) > 100 else message,
278 "model": self.model,
279 "streaming": self._enable_streaming,
280 }
281 )
282
283 user_msg = {
284 "role": "user",
285 "content": message,
286 "session_id": self._session_id,
287 "thread_id": thread_id,
288 "timestamp": datetime.now(timezone.utc).isoformat(),
289 }
290 record_transcript([user_msg], session_id=self._session_id)
291
292 try:
293 await async_db.save_agentic_message(
294 session_id=self._session_id,
295 thread_id=thread_id,
296 user_id=user_id or 0,
297 message_type='user',
298 content=message,
299 model_used=self.model,
300 )
301 except Exception as e:
302 logger.warning(f"[AGENTIC] Failed to persist user message: {e}")
303
304 messages = await self._build_initial_messages(
305 session,
306 message,
307 thread_id=thread_id,
308 user_id=user_id or 0,
309 )
310
311 if mode_change_tool_result is not None:
312 tool_call_id, content = mode_change_tool_result
313 resolved_tcid = tool_call_id or "enter_plan_mode_approval"
314 messages.insert(-1, {
315 "role": "tool",
316 "tool_call_id": resolved_tcid,
317 "content": content,
318 })
319 try:
320 await async_db.save_agentic_message(
321 session_id=self._session_id,
322 thread_id=thread_id,
323 user_id=user_id or 0,
324 message_type='tool_result',
325 tool_name='enter_plan_mode',
326 tool_call_id=resolved_tcid,
327 content=content,
328 model_used=self.model,
329 )
330 except Exception as e:
331 logger.warning(f"[AGENTIC] Failed to persist enter_plan_mode tool_result: {e}")
332 if approved_plan_tool_result is not None:
333 tool_call_id, content = approved_plan_tool_result
334 resolved_tcid = tool_call_id or "exit_plan_mode_approval"
335 messages.insert(-1, {
336 "role": "tool",
337 "tool_call_id": resolved_tcid,
338 "content": content,
339 })
340 try:
341 await async_db.save_agentic_message(
342 session_id=self._session_id,
343 thread_id=thread_id,
344 user_id=user_id or 0,
345 message_type='tool_result',
346 tool_name='exit_plan_mode',
347 tool_call_id=resolved_tcid,
348 content=content,
349 model_used=self.model,
350 )
351 except Exception as e:
352 logger.warning(f"[AGENTIC] Failed to persist exit_plan_mode tool_result: {e}")
353
354 # TODO checklist), then walk the steps one at a time with a
355
356 plan = None
357 if self.planner_mode:
358 async for evt in self._plan_and_execute(
359 session=session, user_id=user_id or 0, thread_id=thread_id,
360 user_message=message, messages=messages,
361 max_turns=max_turns, max_budget_usd=max_budget_usd,
362 ):
363 yield evt
364
365 yield AgenticEvent(
366 type="end",
367 data={
368 "session_id": self._session_id,
369 "workspace_id": session.session_id,
370 "thread_id": thread_id,
371 "total_cost_usd": self._total_cost_usd,
372 "total_prompt_tokens": self._total_prompt_tokens,
373 "total_completion_tokens": self._total_completion_tokens,
374 "iterations": self._turn_count,
375 "planner_mode": True,
376 "workers_spawned": 0,
377 },
378 )
379 return
380
381 iteration = 0
382 while iteration < max_turns:
383 iteration += 1
384 self._turn_count = iteration
385
386 if self._cancelled:
387 yield AgenticEvent(
388 type="text_chunk",
389 data={"content": "[Cancelled by user]"}
390 )
391 break
392
393 if max_budget_usd and self._total_cost_usd >= max_budget_usd:
394 yield AgenticEvent(
395 type="error",
396 data={"error": f"Budget limit ${max_budget_usd} exceeded"}
397 )
398 break
399
400 ctx_tokens = self._estimate_tokens(messages)
401 ctx_pct = self._context_usage_pct(ctx_tokens, self.model)
402
403 if ctx_pct >= self.CONTEXT_CRITICAL_THRESHOLD:
404 logger.warning(f"[AGENTIC] Context CRITICAL: {ctx_tokens:,} tokens ({ctx_pct:.0f}%)")
405 elif ctx_pct >= self.CONTEXT_WARN_THRESHOLD:
406 logger.warning(f"[AGENTIC] Context HIGH: {ctx_tokens:,} tokens ({ctx_pct:.0f}%)")
407 else:
408 logger.info(f"[AGENTIC] Context: {ctx_tokens:,} tokens ({ctx_pct:.0f}%), iteration {iteration}")
409
410 if iteration > 2 and ctx_pct >= self.CONTEXT_CRITICAL_THRESHOLD:
411 yield AgenticEvent(
412 type="system_message",
413 data={"content": f"Context at {ctx_pct:.0f}% — compacting prior conversation..."}
414 )
415 pre_len = len(messages)
416 messages = await self._auto_compact(messages, thread_id=thread_id, user_id=user_id)
417 yield AgenticEvent(
418 type="system_message",
419 data={"content": f"Compacted {pre_len} → {len(messages)} messages."}
420 )
421
422 accumulated_content = ""
423 tool_calls: List[Dict[str, Any]] = []
424 usage: Optional[Dict[str, Any]] = None
425 finish_reason: Optional[str] = None
426 async for evt in self._call_llm_streaming(messages):
427 evt_type = evt.get("type")
428 if evt_type == "content":
429 delta_text = evt.get("delta") or ""
430 if delta_text:
431
432 plan_mode_active = (
433 self._session_state is not None
434 and getattr(self._session_state, "mode", "default") == "plan"
435 )
436 if plan_mode_active:
437 yield AgenticEvent(
438 type="thinking",
439 data={"content": delta_text}
440 )
441 else:
442 yield AgenticEvent(
443 type="text_chunk",
444 data={"content": delta_text}
445 )
446 elif evt_type == "reasoning":
447
448 accumulated = evt.get("accumulated") or evt.get("delta") or ""
449 if accumulated:
450 yield AgenticEvent(
451 type="thinking",
452 data={"content": accumulated}
453 )
454 elif evt_type == "done":
455 accumulated_content = evt.get("content", "") or ""
456 tool_calls = evt.get("tool_calls") or []
457 usage = evt.get("usage")
458 finish_reason = evt.get("finish_reason")
459
460 if usage:
461 self._total_prompt_tokens += usage.get("prompt_tokens", 0)
462 self._total_completion_tokens += usage.get("completion_tokens", 0)
463 self._total_cost_usd += usage.get("cost", 0.0)
464
465 from anylegal_oss.state.session_state import ModelUsage
466 model_usage = ModelUsage(
467 input_tokens=usage.get("prompt_tokens", 0),
468 output_tokens=usage.get("completion_tokens", 0),
469 cost=usage.get("cost", 0.0),
470 )
471 self._session_state.accumulate_cost(self.model, model_usage)
472
473 if tool_calls:
474
475 parse_failed_calls = [tc for tc in tool_calls if tc.get("parse_failed")]
476 if parse_failed_calls:
477
478 openai_all = [
479 {
480 "id": tc.get("id") or f"call_{iteration}_{tc.get('index', 0)}",
481 "type": "function",
482 "function": {
483 "name": tc["name"],
484 "arguments": json.dumps(tc.get("arguments", {}))
485 if isinstance(tc.get("arguments"), dict)
486 else (tc.get("arguments") or "{}"),
487 },
488 }
489 for tc in tool_calls
490 ]
491 messages.append({
492 "role": "assistant",
493 "content": accumulated_content or None,
494 "tool_calls": openai_all,
495 })
496 for tc in parse_failed_calls:
497 tool_call_id = tc.get("id") or f"call_{iteration}_{tc.get('index', 0)}"
498 raw_preview = (tc.get("raw_arguments") or "")[:120]
499 error_msg = (
500 f"Tool call arguments for '{tc['name']}' could "
501 f"not be parsed as JSON — the provider likely "
502 f"truncated the response mid-stream. Received: "
503 f"{raw_preview!r}. Retry the call with valid "
504 f"JSON arguments; keep them minimal."
505 )
506 yield AgenticEvent(
507 type="tool_call",
508 data={
509 "tool_name": tc["name"],
510 "arguments": tc["arguments"],
511 "tool_call_id": tool_call_id,
512 "streaming": False,
513 }
514 )
515 yield AgenticEvent(
516 type="tool_result",
517 data={
518 "tool_name": tc["name"],
519 "tool_call_id": tool_call_id,
520 "success": False,
521 "result": {},
522 "error": error_msg,
523 "execution_time_ms": 0.0,
524 },
525 )
526 messages.append({
527 "role": "tool",
528 "tool_call_id": tool_call_id,
529 "content": error_msg,
530 })
531
532 tool_calls = [tc for tc in tool_calls if not tc.get("parse_failed")]
533 assistant_msg_already_appended = True
534 if not tool_calls:
535
536 continue
537 else:
538 assistant_msg_already_appended = False
539
540 current_mode_for_validation = (
541 getattr(self._session_state, "mode", "default")
542 if self._session_state else "default"
543 )
544 invalid_exit_calls = [
545 tc for tc in tool_calls
546 if tc["name"] == "exit_plan_mode" and current_mode_for_validation != "plan"
547 ]
548 if invalid_exit_calls:
549 from anylegal_oss.workspace.tools.mode_tools import EXIT_PLAN_MODE_NOT_IN_PLAN_MODE_ERROR
550 for tc in invalid_exit_calls:
551 tool_call_id = f"call_{iteration}_{tc.get('index', 0)}"
552 yield AgenticEvent(
553 type="tool_call",
554 data={
555 "tool_name": tc["name"],
556 "arguments": tc["arguments"],
557 "tool_call_id": tool_call_id,
558 "streaming": False,
559 }
560 )
561 yield AgenticEvent(
562 type="tool_result",
563 data={
564 "tool_name": tc["name"],
565 "tool_call_id": tool_call_id,
566 "success": False,
567 "result": {},
568 "error": EXIT_PLAN_MODE_NOT_IN_PLAN_MODE_ERROR,
569 "execution_time_ms": 0.0,
570 },
571 )
572 messages.append({
573 "role": "tool",
574 "tool_call_id": tool_call_id,
575 "content": EXIT_PLAN_MODE_NOT_IN_PLAN_MODE_ERROR,
576 })
577
578 tool_calls = [tc for tc in tool_calls if tc not in invalid_exit_calls]
579
580 for tc in tool_calls:
581 if tc["name"] in ("enter_plan_mode", "exit_plan_mode"):
582 tool_call_id = f"call_{iteration}_{tc.get('index', 0)}"
583 yield AgenticEvent(
584 type="tool_call",
585 data={
586 "tool_name": tc["name"],
587 "arguments": tc["arguments"],
588 "tool_call_id": tool_call_id,
589 "streaming": False,
590 "awaiting_approval": True,
591 }
592 )
593
594 try:
595 args_json = json.dumps(tc["arguments"]) if isinstance(tc["arguments"], dict) else str(tc.get("arguments", ""))
596 await async_db.save_agentic_message(
597 session_id=self._session_id,
598 thread_id=thread_id,
599 user_id=user_id or 0,
600 message_type='tool_call',
601 tool_name=tc["name"],
602 tool_call_id=tool_call_id,
603 tool_arguments=args_json,
604 model_used=self.model,
605 )
606 except Exception as e:
607 logger.warning(f"[AGENTIC] Failed to persist mode-transition tool_call: {e}")
608
609 yield AgenticEvent(
610 type="end",
611 data={
612 "session_id": self._session_id,
613 "workspace_id": session.session_id,
614 "thread_id": thread_id,
615 "total_cost_usd": self._total_cost_usd,
616 "total_prompt_tokens": self._total_prompt_tokens,
617 "total_completion_tokens": self._total_completion_tokens,
618 "iterations": iteration,
619 "awaiting_approval": True,
620 "approval_kind": tc["name"],
621 }
622 )
623 return
624
625 for tc in tool_calls:
626 tool_call_id = f"call_{iteration}_{tc.get('index', 0)}"
627 yield AgenticEvent(
628 type="tool_call",
629 data={
630 "tool_name": tc["name"],
631 "arguments": tc["arguments"],
632 "tool_call_id": tool_call_id,
633 "streaming": False,
634 }
635 )
636
637 try:
638 args_json = (
639 json.dumps(tc["arguments"])
640 if isinstance(tc["arguments"], dict)
641 else str(tc.get("arguments", ""))
642 )
643 await async_db.save_agentic_message(
644 session_id=self._session_id,
645 thread_id=thread_id,
646 user_id=user_id or 0,
647 message_type='tool_call',
648 tool_name=tc["name"],
649 tool_call_id=tool_call_id,
650 tool_arguments=args_json,
651 model_used=self.model,
652 )
653 except Exception as e:
654 logger.warning(f"[AGENTIC] Failed to persist tool_call: {e}")
655
656 result = await self._execute_tool(tc)
657 yield AgenticEvent(
658 type="tool_result",
659 data={
660 "tool_name": tc["name"],
661 "tool_call_id": tool_call_id,
662 "success": result.success,
663 "result": result.result,
664 "error": result.error,
665 "execution_time_ms": result.execution_time_ms,
666 }
667 )
668
669 if result.success and isinstance(result.result, dict):
670 created: list[dict] = []
671 if tc["name"] == "create_document":
672 doc_path = result.result.get("document_created") or result.result.get("path")
673 if doc_path:
674 created.append({
675 "path": doc_path,
676 "description": result.result.get("description", ""),
677 "format": result.result.get("format", "md"),
678 "has_docx": bool(result.result.get("has_docx")),
679 })
680 elif tc["name"] == "run_code":
681 for entry in result.result.get("files_created") or []:
682 if not (entry.get("added_to_workspace") and entry.get("path")):
683 continue
684 entry_type = entry.get("type") or ""
685 fmt = Path(entry["path"]).suffix.lstrip(".").lower() or entry_type or "md"
686 created.append({
687 "path": entry["path"],
688 "description": "Generated by run_code",
689 "format": fmt,
690 "has_docx": entry_type == "docx" or fmt == "docx",
691 })
692
693 for doc in created:
694 yield AgenticEvent(
695 type="document_created",
696 data={
697 "path": doc["path"],
698 "description": doc["description"],
699 "workspace_id": session.session_id,
700 "format": doc["format"],
701 "has_docx": doc["has_docx"],
702 },
703 )
704
705 try:
706 result_content = result.result if result.success else (result.error or "Error")
707 if result_content and len(str(result_content)) > 10000:
708 result_content = str(result_content)[:10000] + "... [truncated]"
709 await async_db.save_agentic_message(
710 session_id=self._session_id,
711 thread_id=thread_id,
712 user_id=user_id or 0,
713 message_type='tool_result',
714 content=str(result_content),
715 tool_name=tc["name"],
716 tool_call_id=tool_call_id,
717 )
718 except Exception as e:
719 logger.warning(f"[AGENTIC] Failed to persist tool_result: {e}")
720
721 if result.success:
722 tool_content = json.dumps(result.result)
723 elif isinstance(result.result, dict):
724 payload = dict(result.result)
725 if result.error and not payload.get("error"):
726 payload["error"] = result.error
727 tool_content = json.dumps(payload)
728 else:
729 tool_content = result.error or ""
730 messages.append({
731 "role": "tool",
732 "tool_call_id": tool_call_id,
733 "content": tool_content,
734 })
735
736 if not assistant_msg_already_appended:
737 openai_tool_calls = [
738 {
739 "id": tc.get("id") or f"call_{iteration}_{tc.get('index', 0)}",
740 "type": "function",
741 "function": {
742 "name": tc["name"],
743 "arguments": json.dumps(tc.get("arguments", {}))
744 if isinstance(tc.get("arguments"), dict)
745 else (tc.get("arguments") or "{}"),
746 },
747 }
748 for tc in tool_calls
749 ]
750 messages.append({
751 "role": "assistant",
752 "content": accumulated_content or None,
753 "tool_calls": openai_tool_calls,
754 })
755 else:
756
757 is_empty = not (accumulated_content and accumulated_content.strip())
758 if is_empty and finish_reason in ("length", "stop"):
759
760 async for ev in self._handle_empty_truncated_response(
761 usage=usage,
762 thread_id=thread_id,
763 user_id=user_id,
764 finish_reason=finish_reason,
765 ):
766 yield ev
767 if self._truncation_retries_used <= self.MAX_TRUNCATION_RETRIES:
768
769 continue
770
771 break
772
773 if accumulated_content and accumulated_content.strip():
774 try:
775 await async_db.save_agentic_message(
776 session_id=self._session_id,
777 thread_id=thread_id,
778 user_id=user_id or 0,
779 message_type='assistant',
780 content=accumulated_content,
781 model_used=self.model,
782 tokens_used=self._total_prompt_tokens + self._total_completion_tokens,
783 cost=self._total_cost_usd,
784 )
785 except Exception as e:
786 logger.warning(f"[AGENTIC] Failed to persist final assistant message: {e}")
787 break
788
789 assistant_msg = {
790 "role": "assistant",
791 "content": accumulated_content,
792 "tool_calls": tool_calls if tool_calls else None,
793 "timestamp": datetime.now(timezone.utc).isoformat(),
794 }
795 record_transcript([assistant_msg], session_id=self._session_id)
796
797 if tool_calls and accumulated_content and accumulated_content.strip():
798 try:
799 await async_db.save_agentic_message(
800 session_id=self._session_id,
801 thread_id=thread_id,
802 user_id=user_id or 0,
803 message_type='assistant',
804 content=accumulated_content,
805 model_used=self.model,
806 )
807 except Exception as e:
808 logger.warning(f"[AGENTIC] Failed to persist assistant message: {e}")
809
810 if os.getenv("EAGER_FLUSH", "false").lower() == "true":
811 flush_session_storage()
812
813 yield AgenticEvent(
814 type="end",
815 data={
816 "session_id": self._session_id,
817 "workspace_id": session.session_id,
818 "thread_id": thread_id,
819 "total_cost_usd": self._total_cost_usd,
820 "total_prompt_tokens": self._total_prompt_tokens,
821 "total_completion_tokens": self._total_completion_tokens,
822 "iterations": iteration,
823 }
824 )
825
826 except Exception as e:
827 logger.error(f"Agentic loop error: {e}", exc_info=True)
828 yield AgenticEvent(
829 type="error",
830 data={"error": str(e)}
831 )
832 yield AgenticEvent(
833 type="end",
834 data={"session_id": self._session_id, "workspace_id": session.session_id, "error": True}
835 )
836 finally:
837
838 if self._session_guard:
839 await self._session_guard.release(self._session_id)
840
841 async def _build_initial_messages(
842 self,
843 session: WorkspaceSession,
844 user_message: str,
845 thread_id: Optional[str] = None,
846 user_id: int = 0,
847 ) -> List[Dict]:
848 """Build initial message list for LLM, loading prior thread history if resuming."""
849 messages: List[Dict] = []
850
851 system_prompt = self.system_prompt or self._default_system_prompt()
852
853 try:
854 cascade = session.get_anylegal_cascade(
855 document_path=session.active_document
856 )
857 if cascade:
858 parts = [
859 f"### Instructions ({label}):\n{content}"
860 for label, content in cascade
861 ]
862 system_prompt += (
863 "\n\n## User Instructions (anylegal.md cascade):\n"
864 + "\n\n".join(parts)
865 )
866 except Exception as e:
867 logger.warning(f"[AGENTIC_ASYNC] anylegal cascade load failed: {e}")
868
869 try:
870 from.memory_layer import build_memory_layer
871 memory_block = build_memory_layer(
872 session=session,
873 workspace_id=getattr(session, 'session_id', '') or '',
874 active_doc_path=session.active_document,
875 user_id=user_id,
876 )
877 if memory_block:
878 system_prompt += "\n\n" + memory_block
879 except Exception as e:
880 logger.warning(f"[AGENTIC_ASYNC] memory layer build failed: {e}")
881
882 try:
883 playbook_manifest = session.build_playbook_manifest()
884 if playbook_manifest:
885 system_prompt += f"\n\n{playbook_manifest}"
886 except Exception as e:
887 logger.warning(f"[AGENTIC_ASYNC] playbook manifest load failed: {e}")
888
889 try:
890 template_files = session.get_template_files()
891 if template_files:
892 tmpl_list = "\n".join(
893 f"- `{t['path']}` (read-only)" for t in template_files
894 )
895 system_prompt += (
896 f"\n\n## Available Templates (user-uploaded, read-only for agent):\n"
897 f"{tmpl_list}\n"
898 f"Use `read_document` to read a template. Do NOT write to Templates/."
899 )
900 except Exception as e:
901 logger.warning(f"[AGENTIC_ASYNC] templates list failed: {e}")
902
903 try:
904 visible_wf = {
905 path: content for path, content in session.workspace_files.items()
906 if not path.endswith("anylegal.md") and content and content.strip()
907 }
908 if visible_wf:
909 wf_list = "\n".join(f"- {path}" for path in visible_wf)
910 system_prompt += f"\n\n## Workspace Files (readable/editable via tools):\n{wf_list}"
911 except Exception as e:
912 logger.warning(f"[AGENTIC_ASYNC] workspace files list failed: {e}")
913
914 system_prompt += f"\n\n## Current Date\nToday is {datetime.now().strftime('%d %B %Y')}."
915
916 try:
917 if session.documents:
918 def _doc_format_label(doc, path):
919 if doc.mime_type and doc.mime_type.startswith('image/'):
920 return 'IMAGE — visible in attached messages'
921 if doc.docx_blob:
922 return 'DOCX'
923 ext = path.rsplit('.', 1)[-1].lower() if '.' in path else ''
924 if ext in ('pdf', 'xlsx', 'xls', 'pptx', 'ppt'):
925 return ext.upper()
926 return 'HTML'
927
928 doc_list = "\n".join([
929 f"- {path}: {doc.description or 'No description'} "
930 f"({_doc_format_label(doc, path)})"
931 for path, doc in session.documents.items()
932 ])
933 system_prompt += f"\n\n## Current Workspace Documents:\n{doc_list}"
934
935 if session.active_document:
936 is_reviewable = session.active_document in session.documents
937 system_prompt += f"\n\nActive document: {session.active_document}"
938 if is_reviewable:
939 system_prompt += (
940 "\nWhen the user says 'this', 'the document', 'this agreement', or similar "
941 "references without specifying a name, they mean the active document. "
942 "Read and work with it directly — do NOT ask which document."
943 )
944 else:
945 system_prompt += (
946 f"\n\n**Note:** The active document '{session.active_document}' is NOT a contract or "
947 f"document to review — it is an instructions/playbook/template/reference file. "
948 f"If the user asks to review, draft, compare, or edit a document, you MUST ask which "
949 f"document from the workspace they want to work with. Do NOT review or analyze the "
950 f"active file itself. List the available documents above and ask the user to pick one."
951 )
952
953 if session.context:
954 ctx_str = ", ".join(f"{k}: {v}" for k, v in session.context.items())
955 system_prompt += f"\n\n## Session Context:\n{ctx_str}"
956 except Exception as e:
957 logger.warning(f"[AGENTIC_ASYNC] workspace context assembly failed: {e}")
958
959 from anylegal_oss.workspace.tools.todo_tool import TODO_WRITE_GUIDANCE
960 system_prompt = f"{system_prompt}\n\n## Progress Tracking\n\n{TODO_WRITE_GUIDANCE}"
961
962 from anylegal_oss.workspace.tools.mode_tools import (
963 PLAN_MODE_GUIDANCE, LEGAL_RESEARCH_PLAN_HINT,
964 )
965 current_mode = getattr(self._session_state, "mode", "default") if self._session_state else "default"
966 if current_mode == "plan":
967 system_prompt = f"{system_prompt}\n\n## Plan Mode\n\n{PLAN_MODE_GUIDANCE}"
968
969 system_prompt = f"{system_prompt}\n\n### Legal research\n\n{LEGAL_RESEARCH_PLAN_HINT}"
970
971 messages.append({
972 "role": "system",
973 "content": system_prompt,
974 })
975
976 if thread_id:
977 try:
978 prior = await async_db.get_agentic_thread_messages(
979 thread_id=thread_id,
980 limit=200,
981 )
982
983 system_prefix_len = len(messages)
984 id_map: Dict[str, str] = {}
985 replayed_tool_events = 0
986 boundaries_seen = 0
987 for row in prior:
988 mtype = row.get("message_type")
989 content = row.get("content")
990 if mtype == "compaction_boundary":
991
992 del messages[system_prefix_len:]
993 id_map = {}
994 if content:
995 messages.append({"role": "user", "content": content})
996 boundaries_seen += 1
997 elif mtype == "user":
998 if content:
999 messages.append({"role": "user", "content": content})
1000 elif mtype == "assistant":
1001 if content:
1002 messages.append({"role": "assistant", "content": content})
1003 elif mtype == "tool_call":
1004 old_id = row.get("tool_call_id") or ""
1005 new_id = f"replay_{uuid.uuid4().hex[:12]}"
1006 if old_id:
1007 id_map[old_id] = new_id
1008 tool_name = row.get("tool_name") or ""
1009 tool_args = row.get("tool_arguments") or "{}"
1010 if not tool_name:
1011 continue
1012 messages.append({
1013 "role": "assistant",
1014 "content": None,
1015 "tool_calls": [{
1016 "id": new_id,
1017 "type": "function",
1018 "function": {
1019 "name": tool_name,
1020 "arguments": tool_args,
1021 },
1022 }],
1023 })
1024 replayed_tool_events += 1
1025 elif mtype == "tool_result":
1026 old_id = row.get("tool_call_id") or ""
1027 new_id = id_map.get(old_id)
1028 if not new_id:
1029
1030 continue
1031 messages.append({
1032 "role": "tool",
1033 "tool_call_id": new_id,
1034 "content": content or "",
1035 })
1036 replayed_tool_events += 1
1037 logger.info(
1038 f"[AGENTIC] Resumed thread {thread_id}: loaded {len(prior)} "
1039 f"rows ({replayed_tool_events} tool events replayed, "
1040 f"{boundaries_seen} compaction boundaries)"
1041 )
1042 except Exception as e:
1043 logger.warning(f"[AGENTIC] Failed to load thread {thread_id} history: {e}")
1044
1045 messages.append({
1046 "role": "user",
1047 "content": user_message,
1048 })
1049
1050 return messages
1051
1052 def _default_system_prompt(self) -> str:
1053 """Load system prompt from prompts/system_prompt.md file."""
1054 prompt_path = os.path.join(
1055 os.path.dirname(__file__), "prompts", "system_prompt.md"
1056 )
1057 try:
1058 with open(prompt_path, "r", encoding="utf-8") as f:
1059 return f.read()
1060 except FileNotFoundError:
1061 logger.warning(f"System prompt file not found at {prompt_path}, using minimal fallback")
1062 return "You are an expert legal AI assistant in AnyLegal's agentic workspace. Help users with contract review, drafting, research, and document editing."
1063
1064 def _count_tokens(self, text: str) -> int:
1065 """
1066 Count tokens in a string using a real BPE tokenizer.
1067
1068 Why not len(text)//4: that heuristic averages 0.25 tok/char for
1069 ASCII English and falls apart on high-entropy bytes. Binary content
1070 accidentally injected as text (e.g. a PDF body that wasn't decoded)
1071 can tokenize at ~1.87 tok/char at the provider — under-reporting
1072 local token counts by an order of magnitude. Compaction's threshold
1073 won't fire when the local view is wrong by that much.
1074
1075 We use tiktoken's o200k_base (the GPT-4o family encoder). It's not
1076 the exact tokenizer for every model — Kimi K2 uses a DeepSeek-derived
1077 tokenizer, Claude uses its own — but cross-model error is ~10–20%,
1078 which is fine for context-pressure decisions. Critically, o200k_base
1079 tokenizes high-entropy bytes the same way other modern BPEs do, so
1080 the binary-garbage failure mode is correctly caught.
1081 """
1082 try:
1083 return len(_get_tokenizer().encode(text or "", disallowed_special=()))
1084 except Exception:
1085
1086 return max(0, len(text or "") // 3)
1087
1088 def _estimate_tokens(self, messages: List[Dict]) -> int:
1089 """
1090 Estimate total tokens for a chat-completion message list.
1091
1092 Counts content (str or Anthropic-style content blocks) AND tool_calls
1093 (function name + arguments JSON) AND tool_call_id strings on tool
1094 messages. The previous implementation counted only `content`, which
1095 meant tool-heavy iterations underreported by however many JSON-arg
1096 bytes the model emitted.
1097
1098 Adds a small per-message overhead (4 tokens) to approximate the
1099 chat-completions message-framing overhead — matches OpenAI's published
1100 guidance for cl100k/o200k.
1101 """
1102 total = 0
1103 for msg in messages:
1104 total += 4
1105
1106 content = msg.get("content")
1107 if isinstance(content, str):
1108 total += self._count_tokens(content)
1109 elif isinstance(content, list):
1110 for block in content:
1111 if isinstance(block, dict):
1112 text = block.get("text", "") or ""
1113 total += self._count_tokens(text)
1114
1115 tool_calls = msg.get("tool_calls") or []
1116 for tc in tool_calls:
1117 if isinstance(tc, dict):
1118 fn = tc.get("function") or {}
1119 total += self._count_tokens(fn.get("name", "") or "")
1120 args = fn.get("arguments")
1121 if isinstance(args, str):
1122 total += self._count_tokens(args)
1123 elif args is not None:
1124 total += self._count_tokens(json.dumps(args))
1125
1126 tcid = msg.get("tool_call_id")
1127 if tcid:
1128 total += self._count_tokens(tcid)
1129
1130 return total
1131
1132 def _context_usage_pct(self, tokens: int, model: str) -> float:
1133 """
1134 Compute context-window usage percentage against the model's real
1135 window from the model registry. Falls back to 200K for unknown
1136 models (most modern models exceed this; the underestimate is safer
1137 than overestimating headroom).
1138
1139 Previously hardcoded to 200K, which under-reports context pressure
1140 for any model with a larger window — Kimi K2.6 (262K), Gemini 1.5/2.5
1141 (1M-2M), Claude Sonnet 4 (200K-1M depending on tier). Combine that
1142 with a broken token estimator (above) and the local view can lag
1143 actual context-used by tens of percentage points.
1144 """
1145 try:
1146 from anylegal_oss.core.pricing import get_model_registry
1147 info = get_model_registry().get_model(model)
1148 context_size = info.context_window if info and info.context_window else 200_000
1149 except Exception:
1150 context_size = 200_000
1151 if context_size <= 0:
1152 context_size = 200_000
1153 return (tokens / context_size) * 100
1154
async def _handle_empty_truncated_response(
    self,
    usage: Optional[Dict[str, Any]],
    thread_id: Optional[str],
    user_id: Optional[int],
    finish_reason: Optional[str] = None,
) -> AsyncGenerator[AgenticEvent, None]:
    """
    Handle an LLM turn that produced neither content nor tool calls.

    Every call increments ``self._truncation_retries_used``.

    While the counter is within ``self.MAX_TRUNCATION_RETRIES`` the method
    only logs a warning and finishes without yielding anything; the caller
    is expected to re-issue the request.

    Once the retry budget is spent it:
      1. yields a ``text_chunk`` event carrying a fixed failure message, and
      2. persists that message as an ``assistant`` row via
         ``async_db.save_agentic_message`` (persistence errors are logged,
         not raised).

    Args:
        usage: Usage dict of the empty turn; only ``prompt_tokens`` and
            ``completion_tokens`` are read, for logging.
        thread_id: Thread the failure message is saved under.
        user_id: Owner of the message; ``0`` is stored when falsy.
        finish_reason: Provider finish reason, used for logging only.
    """
    usage_info = usage or {}
    prompt_tokens = usage_info.get("prompt_tokens", 0) or 0
    completion_tokens = usage_info.get("completion_tokens", 0) or 0

    self._truncation_retries_used += 1
    retry_budget_left = self._truncation_retries_used <= self.MAX_TRUNCATION_RETRIES
    if retry_budget_left:
        logger.warning(
            f"[AGENTIC] empty stream finish_reason={finish_reason} "
            f"(prompt={prompt_tokens} completion={completion_tokens}); "
            f"retry {self._truncation_retries_used}/{self.MAX_TRUNCATION_RETRIES}"
        )
        # Nothing is emitted on a retry attempt.
        return

    user_message = "No response from the model provider. Try again."

    logger.warning(
        f"[AGENTIC] empty stream finish_reason={finish_reason} — retry budget spent "
        f"(prompt={prompt_tokens} completion={completion_tokens}); "
        f"surfacing user-visible failure"
    )

    failure_event = AgenticEvent(
        type="text_chunk",
        data={"content": user_message},
    )
    yield failure_event

    total_tokens = self._total_prompt_tokens + self._total_completion_tokens
    try:
        await async_db.save_agentic_message(
            session_id=self._session_id,
            thread_id=thread_id,
            user_id=user_id or 0,
            message_type='assistant',
            content=user_message,
            model_used=self.model,
            tokens_used=total_tokens,
            cost=self._total_cost_usd,
        )
    except Exception as e:
        logger.warning(f"[AGENTIC] Failed to persist truncation-failure message: {e}")
1216
# Fixed identifiers for the synthetic plan events produced by
# ``_emit_plan_state``: the same tool_call_id and tool name are used on every
# emitted tool_call / tool_result pair.
_PLAN_TOOL_CALL_ID = "plan-tool-call"
_PLAN_TOOL_NAME = "update_plan"
1219
async def _emit_plan_state(self, plan) -> AsyncGenerator[AgenticEvent, None]:
    """Yield a synthetic ``tool_call`` / ``tool_result`` pair describing ``plan``.

    Both events carry ``plan.to_dict()`` (as ``arguments`` and ``result``
    respectively) under the fixed ``_PLAN_TOOL_NAME`` / ``_PLAN_TOOL_CALL_ID``
    identifiers. The result is always reported as successful with a zero
    execution time.
    """
    snapshot = plan.to_dict()
    call_data = {
        "tool_name": self._PLAN_TOOL_NAME,
        "arguments": snapshot,
        "tool_call_id": self._PLAN_TOOL_CALL_ID,
        "streaming": False,
    }
    result_data = {
        "tool_name": self._PLAN_TOOL_NAME,
        "tool_call_id": self._PLAN_TOOL_CALL_ID,
        "success": True,
        "result": snapshot,
        "error": None,
        "execution_time_ms": 0.0,
    }
    for event_type, event_data in (("tool_call", call_data), ("tool_result", result_data)):
        yield AgenticEvent(type=event_type, data=event_data)
1243
# Path that ``_session_loaded_research_skill`` looks for in ``read_document``
# tool calls when deciding whether the research skill was loaded.
_RESEARCH_SKILL_PATH = "Skills/research/SKILL.md"
1245
1246 def _session_loaded_research_skill(self, messages: List[Dict]) -> bool:
1247 """
1248 Return True if the research skill was invoked anywhere in this thread.
1249
1250 Detects both invocation paths (transition compatibility):
1251 - ``Skill(skill="research")`` — current Anthropic/convention path.
1252 - ``read_document("Skills/research/SKILL.md")`` — legacy path,
1253 honored until the legacy path is removed.
1254
1255 Looks at assistant messages' ``tool_calls`` (live-run shape) and at
1256 DB-rehydrated ``message_type="tool_call"`` rows.
1257 """
1258 for msg in messages or []:
1259 tool_calls = msg.get("tool_calls") or []
1260 for tc in tool_calls:
1261 name = tc.get("name") or ""
1262 args = tc.get("arguments") or {}
1263 if name == "Skill":
1264 if isinstance(args, dict) and args.get("skill") == "research":
1265 return True
1266 elif name == "read_document":
1267 path = args.get("path") if isinstance(args, dict) else None
1268 if isinstance(path, str) and path == self._RESEARCH_SKILL_PATH:
1269 return True
1270 if msg.get("message_type") == "tool_call":
1271 tool_name = msg.get("tool_name") or ""
1272 raw = msg.get("tool_arguments") or msg.get("content") or ""
1273 if tool_name == "Skill" and '"research"' in str(raw):
1274 return True
1275 if tool_name == "read_document" and self._RESEARCH_SKILL_PATH in str(raw):
1276 return True
1277 return False
1278
1279 def _detect_planner_template(self, messages: List[Dict]) -> str:
1280 return "legal_research" if self._session_loaded_research_skill(messages) else "default"
1281
@staticmethod
def _rehydrate_plan(payload: Dict[str, Any]):
    """Rebuild an ``ExecutionPlan`` from a plan payload dict.

    Only ``description`` and ``step_number`` are taken from each step entry;
    status, result, error and tool calls are reset so every step starts as
    ``"pending"``. Entries that are not dicts or have a blank description
    are dropped. A missing/falsy ``step_number`` falls back to the entry's
    1-based position in the incoming list.

    Raises:
        ValueError: if ``payload["steps"]`` is missing, empty, or not a
            list, if no usable step remains after filtering, or if a
            ``step_number`` cannot be converted by ``int()``.
    """
    from anylegal_oss.services.planning.planner import ExecutionPlan, PlanStep
    import uuid

    step_payloads = payload.get("steps") or []
    if not (isinstance(step_payloads, list) and step_payloads):
        raise ValueError("approved_plan.steps is empty or invalid")

    rebuilt: List[PlanStep] = []
    for position, entry in enumerate(step_payloads, start=1):
        if not isinstance(entry, dict):
            continue
        description = (entry.get("description") or "").strip()
        if not description:
            continue
        number = int(entry.get("step_number") or position)
        rebuilt.append(
            PlanStep(
                step_number=number,
                description=description,
                tool_calls=[],
                status="pending",
                result=None,
                error=None,
            )
        )

    if not rebuilt:
        raise ValueError("approved_plan has no usable steps")

    # Keep the incoming plan id when present, otherwise mint a fresh one.
    plan_id = payload.get("plan_id") or uuid.uuid4()
    goal = payload.get("goal") or ""
    return ExecutionPlan(
        plan_id=str(plan_id),
        goal=str(goal),
        steps=rebuilt,
        reasoning=payload.get("reasoning"),
        status="pending",
    )
1320
async def _plan_and_execute(
    self,
    *,
    session: "WorkspaceSession",
    user_id: int,
    thread_id: Optional[str],
    user_message: str,
    messages: List[Dict],
    max_turns: int,
    max_budget_usd: Optional[float],
) -> AsyncGenerator[AgenticEvent, None]:
    """
    Plan-and-execute loop.

    Two phases, selected by ``self._approved_plan``:

    * No approved plan (or rehydrating it failed): ask the planner to
      decompose ``user_message``, emit the resulting plan as an
      ``update_plan`` snapshot and return WITHOUT executing it
      (presumably so the client can approve it — confirm against caller).
      If the planner raises, the events of ``_reactive_loop`` are forwarded
      instead.
    * Approved plan present: rebuild it with ``_rehydrate_plan`` and run
      each step through ``_scoped_reactive_loop``, emitting a plan snapshot
      on every status transition. Only the last step streams its text to
      the client; its text (or the latest non-empty step text) is persisted
      as the final assistant message.

    Execution stops early when the run is cancelled or the budget is
    exhausted. A step that raises is marked failed and execution continues
    with the next step.

    Args:
        session: Workspace session (not read by this method's body).
        user_id: Owner recorded on the persisted final message.
        thread_id: Thread the final message is saved under.
        user_message: Goal handed to the planner.
        messages: Conversation so far; mutated in place (step prompts and
            tool traffic are appended).
        max_turns: Overall turn budget, from which a per-step cap is derived.
        max_budget_usd: Optional spend ceiling checked before each step.
    """
    from anylegal_oss.services.planning.planner import get_planner

    planner = get_planner()

    # Per-step turn cap: max_turns divided by (MAX_ITERATIONS // 8), never < 1.
    per_step_cap = max(1, max_turns // max(1, self.MAX_ITERATIONS // 8))

    if self._approved_plan is not None:
        try:
            plan = self._rehydrate_plan(self._approved_plan)
            logger.info(
                f"[PLANNER] approved plan rehydrated: {len(plan.steps)} steps"
            )
        except Exception as e:
            logger.warning(f"[PLANNER] rehydrate failed; re-planning: {e}")
            # Clearing the approved plan routes us into the planning branch.
            self._approved_plan = None

    if self._approved_plan is None:
        plan_template = self._detect_planner_template(messages)
        logger.info(f"[PLANNER] selected template: {plan_template}")

        try:
            plan = await planner.create_plan(
                goal=user_message,
                context="",
                available_tools=[t["name"] for t in get_workspace_tools()],
                model=self.model,
                plan_template=plan_template,
            )
        except Exception as e:
            logger.warning(f"[PLANNER] create_plan failed; falling back to reactive: {e}")

            async for evt in self._reactive_loop(messages, max_turns, max_budget_usd, thread_id, user_id):
                yield evt
            return

        # Show the freshly created plan and stop; execution happens on a
        # later call that carries an approved plan.
        async for evt in self._emit_plan_state(plan):
            yield evt
        return

    plan.status = "in_progress"
    async for evt in self._emit_plan_state(plan):
        yield evt

    final_text = ""

    for position, step in enumerate(plan.steps, start=1):
        if self._cancelled:
            break
        if max_budget_usd and self._total_cost_usd >= max_budget_usd:
            yield AgenticEvent(
                type="error",
                data={"error": f"Budget limit ${max_budget_usd} exceeded"},
            )
            plan.mark_step_failed(step.step_number, "budget exceeded")
            async for e in self._emit_plan_state(plan):
                yield e
            break

        plan.mark_step_in_progress(step.step_number)
        plan.current_step = step.step_number
        async for e in self._emit_plan_state(plan):
            yield e

        # Decide "final step" by position, not by step_number: rehydrated
        # plans keep client-supplied numbers and may have gaps, in which
        # case no step_number equals len(plan.steps) and the synthesis
        # step would never stream its answer.
        is_last = position == len(plan.steps)
        if is_last:
            nudge_tail = (
                "This is the FINAL step — produce the user's complete answer by "
                "synthesizing the findings from prior steps. No more tool calls: "
                "write the full structured response per the skill's output format, "
                "with inline citations. When you stop emitting tool calls and reply "
                "with text, the plan completes."
            )
        else:
            # NOTE(review): the tool list below names web_search twice —
            # looks like a second tool name was intended; confirm.
            nudge_tail = (
                "Focus on THIS step only. Use tools as needed (web_search, "
                "web_search, etc.). When the research for this step is complete, "
                "stop calling tools and reply with a concise summary of findings "
                "(2-5 sentences with inline citations). That summary becomes the "
                "step's result and the plan advances. Do NOT plan ahead or answer "
                "the user's full question yet — later steps will."
            )
        messages.append({
            "role": "user",
            "content": (
                f"[Plan step {step.step_number} of {len(plan.steps)}] "
                f"{step.description}\n\n{nudge_tail}"
            ),
        })

        step_result_text = ""
        try:
            self._last_step_text = ""
            async for evt in self._scoped_reactive_loop(
                messages=messages,
                max_turns=per_step_cap,
                max_budget_usd=max_budget_usd,
                thread_id=thread_id,
                user_id=user_id,
                emit_text_to_stream=is_last,
            ):
                yield evt
            step_result_text = self._last_step_text
        except Exception as e:
            logger.error(f"[PLANNER] step {step.step_number} raised: {e}", exc_info=True)
            plan.mark_step_failed(step.step_number, str(e))
            async for e2 in self._emit_plan_state(plan):
                yield e2
            continue

        plan.mark_step_complete(step.step_number, step_result_text)
        async for e in self._emit_plan_state(plan):
            yield e
        # Keep the most recent non-empty step text as the final answer.
        final_text = step_result_text or final_text

    plan.status = "completed" if not any(s.status == "failed" for s in plan.steps) else "failed"
    async for e in self._emit_plan_state(plan):
        yield e

    if final_text:
        try:
            await async_db.save_agentic_message(
                session_id=self._session_id,
                thread_id=thread_id,
                user_id=user_id,
                message_type="assistant",
                content=final_text,
                model_used=self.model,
                tokens_used=self._total_prompt_tokens + self._total_completion_tokens,
                cost=self._total_cost_usd,
            )
        except Exception as e:
            logger.warning(f"[PLANNER] final assistant persist failed: {e}")
1475
async def _scoped_reactive_loop(
    self,
    *,
    messages: List[Dict],
    max_turns: int,
    max_budget_usd: Optional[float],
    thread_id: Optional[str],
    user_id: int,
    emit_text_to_stream: bool = True,
) -> AsyncGenerator[AgenticEvent, None]:
    """
    Bounded LLM/tool loop for a single plan step.

    Each turn calls ``_call_llm`` (non-streaming) and accumulates usage.
    A reply with text and no tool calls ends the step: the text is stored on
    ``self._last_step_text`` and, when ``emit_text_to_stream`` is True, also
    yielded as one ``text_chunk`` event. A reply with tool calls executes
    each call via ``_execute_tool``, yielding ``tool_call`` / ``tool_result``
    events, and appends the exchange to ``messages``. A reply with neither
    ends the loop.

    The loop also stops on cancellation, when the budget is reached, or
    after ``max_turns`` turns.

    History is appended in the order the chat-completions protocol requires:
    the assistant message carrying ``tool_calls`` first, then one ``tool``
    message per call, with matching ids. The exchange is appended only after
    every tool of the turn has run, so an exception from a tool leaves
    ``messages`` without a half-recorded turn.

    Args:
        messages: Conversation list, mutated in place.
        max_turns: Maximum LLM turns for this step.
        max_budget_usd: Optional spend ceiling.
        thread_id: Not used by this method's body.
        user_id: Not used by this method's body.
        emit_text_to_stream: Whether the step's final text is yielded.
    """
    iteration = 0
    self._last_step_text = ""
    while iteration < max_turns:
        iteration += 1
        self._turn_count += 1
        if self._cancelled:
            break
        if max_budget_usd and self._total_cost_usd >= max_budget_usd:
            break

        accumulated_content, tool_calls, usage = await self._call_llm(messages)
        if usage:
            self._total_prompt_tokens += usage.get("prompt_tokens", 0)
            self._total_completion_tokens += usage.get("completion_tokens", 0)
            self._total_cost_usd += usage.get("cost", 0.0)

        if accumulated_content and not tool_calls:
            self._last_step_text = accumulated_content
            if emit_text_to_stream:
                yield AgenticEvent(
                    type="text_chunk", data={"content": accumulated_content}
                )
            return

        if not tool_calls:
            break

        openai_tool_calls = []
        tool_messages = []
        for position, tc in enumerate(tool_calls):
            # Id shown to the client. Falls back to the call's position so
            # several calls without an "index" do not share one id.
            tool_call_id = f"call_{iteration}_{tc.get('index', position)}"
            # Id used in the LLM history: the provider's id when present.
            # The assistant entry and its tool reply must use the same one.
            history_id = tc.get("id") or tool_call_id

            raw_args = tc.get("arguments")
            openai_tool_calls.append({
                "id": history_id,
                "type": "function",
                "function": {
                    "name": tc["name"],
                    "arguments": json.dumps(tc.get("arguments", {}))
                    if isinstance(raw_args, dict)
                    else (raw_args or "{}"),
                },
            })

            yield AgenticEvent(
                type="tool_call",
                data={
                    "tool_name": tc["name"], "arguments": tc["arguments"],
                    "tool_call_id": tool_call_id, "streaming": False,
                },
            )
            result = await self._execute_tool(tc)
            yield AgenticEvent(
                type="tool_result",
                data={
                    "tool_name": tc["name"], "tool_call_id": tool_call_id,
                    "success": result.success, "result": result.result,
                    "error": result.error,
                    "execution_time_ms": result.execution_time_ms,
                },
            )
            tool_messages.append({
                "role": "tool", "tool_call_id": history_id,
                "content": json.dumps(result.result) if result.success else (result.error or ""),
            })

        # Assistant tool_calls message must precede its tool replies.
        messages.append({
            "role": "assistant", "content": accumulated_content or None,
            "tool_calls": openai_tool_calls,
        })
        messages.extend(tool_messages)
1564
async def _reactive_loop(
    self,
    messages: List[Dict],
    max_turns: int,
    max_budget_usd: Optional[float],
    thread_id: Optional[str],
    user_id: Optional[int],
) -> AsyncGenerator[AgenticEvent, None]:
    """Placeholder fallback used when the planner cannot produce a plan.

    Does not run a reactive loop: it yields a single ``error`` event telling
    the user to retry without planner mode. None of the parameters are
    read; they exist so the call site in ``_plan_and_execute`` can pass the
    usual loop arguments.
    """
    planner_unavailable = AgenticEvent(
        type="error",
        data={"error": "Planner unavailable; please retry without planner_mode."},
    )
    yield planner_unavailable
1584
async def _auto_compact(
    self,
    messages: List[Dict],
    thread_id: Optional[str] = None,
    user_id: Optional[int] = None,
) -> List[Dict]:
    """Compact the conversation and return the replacement message list.

    Runs ``perform_compaction`` in auto mode. On success the new list is the
    boundary marker, then the summary messages, then the attachments (each
    only when present). The boundary marker is passed to
    ``record_transcript`` and, when ``thread_id`` is given, a
    ``compaction_boundary`` row is saved whose content is the first
    non-empty summary text and whose ``tool_arguments`` hold the pre/post
    token counts as JSON. Failing to save that row is logged and does not
    undo the compaction.

    The original ``messages`` list is returned unchanged when there is no
    session state, when compaction reports failure, or when anything in the
    compaction path raises.
    """
    if not self._session_state:
        logger.warning("No session state for compaction")
        return messages

    try:
        outcome = await perform_compaction(
            messages=messages,
            session_state=self._session_state,
            custom_instructions=None,
            is_auto=True,
        )

        if not outcome["success"]:
            logger.error(f"[AGENTIC] Auto-compaction failed: {outcome.get('error')}")
            return messages

        compacted = []
        marker = outcome["boundary_marker"]
        if marker:
            compacted.append(marker)
        if outcome["summary_messages"]:
            compacted.extend(outcome["summary_messages"])
        if outcome["attachments"]:
            compacted.extend(outcome["attachments"])

        logger.info(f"[AGENTIC] Auto-compaction: {len(messages)} -> {len(compacted)} messages")

        record_transcript([marker], session_id=self._session_id)

        if thread_id:
            try:
                # Use the first summary message that yields non-empty text.
                summary_text = ""
                for summary_msg in (outcome.get("summary_messages") or []):
                    piece = summary_msg.get("content")
                    if isinstance(piece, str):
                        summary_text = piece
                    elif isinstance(piece, list):
                        summary_text = "\n".join(
                            (part.get("text") or "")
                            for part in piece
                            if isinstance(part, dict)
                        )
                    if summary_text:
                        break

                token_stats = {
                    "pre_tokens": outcome.get("pre_tokens", 0),
                    "post_tokens": outcome.get("post_tokens", 0),
                    "is_auto": True,
                }
                await async_db.save_agentic_message(
                    session_id=self._session_id,
                    thread_id=thread_id,
                    user_id=user_id or 0,
                    message_type='compaction_boundary',
                    content=summary_text,
                    tool_arguments=json.dumps(token_stats),
                    model_used=self.model,
                )
            except Exception as e:
                logger.warning(f"[AGENTIC] Failed to persist compaction boundary: {e}")

        return compacted

    except Exception as e:
        logger.error(f"[AGENTIC] Auto-compaction error: {e}")
        return messages
1666
async def _call_llm(self, messages: List[Dict]) -> tuple:
    """
    Call the chat LLM once (non-streaming) and return its parsed reply.

    The tool list offered to the model is ``get_workspace_tools()`` filtered
    by session mode (plan mode keeps only ``PLAN_MODE_TOOL_NAMES``; otherwise
    ``exit_plan_mode`` is hidden, plus ``enter_plan_mode`` once a plan was
    already approved) and then by ``apply_skill_scope``.

    When the response has text but no structured tool calls,
    ``rescue_tool_calls_from_content`` is given a chance to recover calls
    embedded in the text. Only tools that were actually offered on this
    call may be rescued, so the mode/skill filtering cannot be bypassed.

    Args:
        messages: Chat-completions message list sent as-is.

    Returns:
        ``(content, tool_calls, usage)`` where ``usage`` is a dict with
        ``prompt_tokens``, ``completion_tokens``, ``cost`` and
        ``finish_reason``.

    Raises:
        ValueError: if no "chat" provider is configured.
        Exception: any error from the provider call is logged and re-raised.
    """
    from openai import AsyncOpenAI
    from anylegal_oss.core.llm_provider import llm_provider
    from anylegal_oss.workspace.services import get_provider_extra_body

    provider_config = llm_provider.get_provider_config("chat")
    if not provider_config:
        raise ValueError("LLM provider not configured")

    tool_defs = get_workspace_tools()

    current_mode = getattr(self._session_state, "mode", "default") if self._session_state else "default"
    plan_done = bool(getattr(self._session_state, "plan_already_approved", False)) if self._session_state else False
    if current_mode == "plan":
        from anylegal_oss.workspace.tools.workspace_tools import PLAN_MODE_TOOL_NAMES
        tool_defs = [t for t in tool_defs if t["name"] in PLAN_MODE_TOOL_NAMES]
    else:
        blocked = {"exit_plan_mode"}
        if plan_done:
            blocked.add("enter_plan_mode")
        tool_defs = [t for t in tool_defs if t["name"] not in blocked]

    from anylegal_oss.workspace.tools.skill_tool import apply_skill_scope
    tool_defs = apply_skill_scope(tool_defs, self._session_state)

    tools = []
    for tool_def in tool_defs:
        params = tool_def.get("input_schema") or tool_def.get("parameters") or {}
        tools.append({
            "type": "function",
            "function": {
                "name": tool_def["name"],
                "description": tool_def.get("description", ""),
                "parameters": params,
            }
        })

    # Copy before adding keys so a dict owned by the helper is not mutated.
    extra = dict(get_provider_extra_body(self.model) or {})
    reasoning_effort = os.getenv("REASONING_EFFORT", "medium")
    if reasoning_effort != "none":
        extra["reasoning"] = {"effort": reasoning_effort, "exclude": False}

    async with AsyncOpenAI(
        api_key=provider_config["api_key"],
        base_url=provider_config["base_url"],
        default_headers=provider_config.get("default_headers", {}),
        timeout=LLM_HTTP_TIMEOUT,
    ) as client:
        try:
            response = await client.chat.completions.create(
                model=self.model,
                messages=messages,
                tools=tools if tools else None,
                tool_choice="auto",
                max_tokens=64000,
                extra_body=extra if extra else None,
            )

            content = self._extract_content(response)

            tool_calls = self._extract_tool_calls(response)

            if not tool_calls and content:
                from .tool_call_rescue import rescue_tool_calls_from_content
                # Restrict rescue to the tools offered on this call.
                allowed = [t["name"] for t in tool_defs]
                rescued, content = rescue_tool_calls_from_content(content, allowed)
                if rescued:
                    tool_calls = rescued

            finish_reason = None
            try:
                choices = response.choices or []
                if choices:
                    finish_reason = getattr(choices[0], "finish_reason", None)
            except Exception:
                # finish_reason is diagnostic only; never fail the call on it.
                pass

            usage = response.usage
            if usage:
                # Providers may report null counts; normalise to 0.
                prompt_tokens = getattr(usage, "prompt_tokens", 0) or 0
                completion_tokens = getattr(usage, "completion_tokens", 0) or 0

                cost = self._calculate_cost(self.model, prompt_tokens, completion_tokens)
            else:
                prompt_tokens = completion_tokens = 0
                cost = 0.0

            logger.info(
                f"[AGENTIC_ASYNC] non-stream summary: finish_reason={finish_reason} "
                f"completion_tokens={completion_tokens} content_chars={len(content or '')} "
                f"tool_calls={len(tool_calls)}"
            )
            if finish_reason == "length":
                logger.warning(
                    f"[AGENTIC_ASYNC] OUTPUT TRUNCATED (non-stream): "
                    f"completion_tokens={completion_tokens} — provider capped the response."
                )

            return content, tool_calls, {
                "prompt_tokens": prompt_tokens,
                "completion_tokens": completion_tokens,
                "cost": cost,
                "finish_reason": finish_reason,
            }

        except Exception as e:
            logger.error(f"LLM call failed: {e}")
            raise
1780
1781 async def _call_llm_streaming(self, messages: List[Dict]) -> AsyncGenerator[Dict[str, Any], None]:
1782 """
1783 Streaming variant of ``_call_llm``. Yields deltas as they arrive:
1784
1785 {"type": "content", "delta": str} — per content token
1786 {"type": "done", "content": str,
1787 "tool_calls": [...], "usage": {...}} — terminal event with
1788 full accumulated state
1789
1790 The outer reactive loop emits ``AgenticEvent(type="text_chunk", …)``
1791 for each content delta so the frontend renders text as it arrives.
1792 Tool-call deltas are accumulated internally and flushed on ``done``.
1793
1794 Tests mock this method directly (see test_enter_exit_plan_mode.py).
1795 The non-streaming ``_call_llm`` is retained for the planner-service
1796 path and any code that needs a single awaited tuple.
1797 """
1798 from openai import AsyncOpenAI
1799 from anylegal_oss.core.llm_provider import llm_provider
1800 from anylegal_oss.workspace.services import get_provider_extra_body
1801
1802 provider_config = llm_provider.get_provider_config("chat")
1803 if not provider_config:
1804 raise ValueError("LLM provider not configured")
1805
1806 tool_defs = get_workspace_tools()
1807
1808 current_mode = getattr(self._session_state, "mode", "default") if self._session_state else "default"
1809 plan_done = bool(getattr(self._session_state, "plan_already_approved", False)) if self._session_state else False
1810 if current_mode == "plan":
1811 from anylegal_oss.workspace.tools.workspace_tools import PLAN_MODE_TOOL_NAMES
1812 tool_defs = [t for t in tool_defs if t["name"] in PLAN_MODE_TOOL_NAMES]
1813 else:
1814 blocked = {"exit_plan_mode"}
1815 if plan_done:
1816 blocked.add("enter_plan_mode")
1817 tool_defs = [t for t in tool_defs if t["name"] not in blocked]
1818
1819 tools = []
1820 for tool_def in tool_defs:
1821 params = tool_def.get("input_schema") or tool_def.get("parameters") or {}
1822 tools.append({
1823 "type": "function",
1824 "function": {
1825 "name": tool_def["name"],
1826 "description": tool_def.get("description", ""),
1827 "parameters": params,
1828 }
1829 })
1830
1831 extra = get_provider_extra_body(self.model) or {}
1832 reasoning_effort = os.getenv("REASONING_EFFORT", "medium")
1833 if reasoning_effort != "none":
1834 extra["reasoning"] = {"effort": reasoning_effort, "exclude": False}
1835
1836 accumulated_content = ""
1837 accumulated_reasoning = ""
1838 accumulated_tc: Dict[int, Dict[str, Any]] = {}
1839 prompt_tokens = 0
1840 completion_tokens = 0
1841 finish_reason: Optional[str] = None
1842
1843 debug_reasoning = os.getenv("DEBUG_REASONING_STREAM", "").lower() in ("1", "true", "yes")
1844 first_delta_logged = False
1845 saw_reasoning = False
1846
1847 # Retry transient upstream drops (e.g. OpenRouter / akashml dropping the
1848 # SSE mid-turn) only when nothing has been yielded to the caller yet.
1849 # Once the model has emitted reasoning or content, we surface the error
1850 # rather than re-issue the request — retrying mid-stream would either
1851 # double-render or diverge from the partial output the frontend already
1852 # has.
1853 from openai import APIConnectionError, APITimeoutError, RateLimitError
1854 _transient_llm_errors = (APIConnectionError, APITimeoutError, RateLimitError)
1855 _llm_retry_backoffs = (1.0, 2.0) # 2 retries on top of the initial attempt
1856
1857 async with AsyncOpenAI(
1858 api_key=provider_config["api_key"],
1859 base_url=provider_config["base_url"],
1860 default_headers=provider_config.get("default_headers", {}),
1861 timeout=LLM_HTTP_TIMEOUT,
1862 ) as client:
1863 for _attempt_idx in range(len(_llm_retry_backoffs) + 1):
1864 try:
1865 stream = await client.chat.completions.create(
1866 model=self.model,
1867 messages=messages,
1868 tools=tools if tools else None,
1869 tool_choice="auto",
1870 max_tokens=64000,
1871 stream=True,
1872 stream_options={"include_usage": True},
1873 extra_body=extra if extra else None,
1874 )
1875
1876 async for chunk in stream:
1877
1878 ch_usage = getattr(chunk, "usage", None)
1879 if ch_usage:
1880 prompt_tokens = getattr(ch_usage, "prompt_tokens", 0) or 0
1881 completion_tokens = getattr(ch_usage, "completion_tokens", 0) or 0
1882
1883 choices = getattr(chunk, "choices", None) or []
1884 if not choices:
1885 continue
1886
1887 fr = getattr(choices[0], "finish_reason", None)
1888 if fr:
1889 finish_reason = fr
1890 delta = getattr(choices[0], "delta", None)
1891 if not delta:
1892 continue
1893
1894 if debug_reasoning and not first_delta_logged:
1895 first_delta_logged = True
1896 try:
1897
1898 dumped = delta.model_dump() if hasattr(delta, "model_dump") else (
1899 delta.dict() if hasattr(delta, "dict") else vars(delta)
1900 )
1901 logger.info(f"[AGENTIC_ASYNC] first delta keys={list(dumped.keys())} payload={dumped}")
1902 except Exception as e:
1903 logger.info(f"[AGENTIC_ASYNC] first delta dump failed: {e}; type={type(delta).__name__}")
1904
1905 reasoning_delta = getattr(delta, "reasoning", None) or getattr(delta, "reasoning_content", None)
1906 if reasoning_delta:
1907 if debug_reasoning and not saw_reasoning:
1908 saw_reasoning = True
1909 logger.info(f"[AGENTIC_ASYNC] reasoning tokens DETECTED (first chunk len={len(reasoning_delta)})")
1910 accumulated_reasoning += reasoning_delta
1911 yield {"type": "reasoning", "delta": reasoning_delta, "accumulated": accumulated_reasoning}
1912
1913 content_delta = getattr(delta, "content", None)
1914 if content_delta:
1915 accumulated_content += content_delta
1916 yield {"type": "content", "delta": content_delta}
1917
1918 tc_deltas = getattr(delta, "tool_calls", None) or []
1919 for tc_delta in tc_deltas:
1920 idx = getattr(tc_delta, "index", 0) or 0
1921 slot = accumulated_tc.setdefault(idx, {
1922 "id": "", "name": "", "arguments": "", "index": idx,
1923 })
1924 if getattr(tc_delta, "id", None):
1925 slot["id"] = tc_delta.id
1926 fn = getattr(tc_delta, "function", None)
1927 if fn:
1928 if getattr(fn, "name", None):
1929 slot["name"] = fn.name
1930 if getattr(fn, "arguments", None):
1931 slot["arguments"] += fn.arguments
1932
1933 except _transient_llm_errors as e:
1934 already_yielded = bool(accumulated_content) or bool(accumulated_reasoning)
1935 if already_yielded or _attempt_idx >= len(_llm_retry_backoffs):
1936 logger.error(
1937 f"LLM streaming call failed "
1938 f"(attempt {_attempt_idx + 1}, "
1939 f"yielded_content={bool(accumulated_content)}, "
1940 f"yielded_reasoning={bool(accumulated_reasoning)}): "
1941 f"{type(e).__name__}: {e}"
1942 )
1943 raise
1944 backoff = _llm_retry_backoffs[_attempt_idx]
1945 logger.warning(
1946 f"LLM streaming transient error on attempt {_attempt_idx + 1} "
1947 f"before any output emitted; retrying in {backoff}s: "
1948 f"{type(e).__name__}: {e}"
1949 )
1950 await asyncio.sleep(backoff)
1951 first_delta_logged = False
1952 saw_reasoning = False
1953 continue
1954 except Exception as e:
1955 logger.error(f"LLM streaming call failed: {e}")
1956 raise
1957 else:
1958 break
1959
1960 tool_calls: List[Dict[str, Any]] = []
1961 tc_parse_failures = 0
1962 for idx in sorted(accumulated_tc.keys()):
1963 slot = accumulated_tc[idx]
1964 name = (slot["name"] or "").strip()
1965 raw_args = slot["arguments"] or "{}"
1966 args = self._parse_tool_arguments(raw_args, name)
1967
1968 parse_failed = not args and raw_args.strip() not in ("", "{}")
1969 if parse_failed:
1970 tc_parse_failures += 1
1971 if not name:
1972 continue
1973
1974 tool_calls.append({
1975 "id": slot["id"],
1976 "name": name,
1977 "arguments": args,
1978 "index": idx,
1979 "parse_failed": parse_failed,
1980 "raw_arguments": raw_args if parse_failed else None,
1981 })
1982
1983 rescued_count = 0
1984 if not tool_calls and accumulated_content:
1985 from.tool_call_rescue import rescue_tool_calls_from_content
1986 allowed = [t["name"] for t in get_workspace_tools()]
1987 rescued, accumulated_content = rescue_tool_calls_from_content(
1988 accumulated_content, allowed,
1989 )
1990 if rescued:
1991 tool_calls = rescued
1992 rescued_count = len(rescued)
1993
1994 cost = self._calculate_cost(self.model, prompt_tokens, completion_tokens) if (prompt_tokens or completion_tokens) else 0.0
1995
1996 logger.info(
1997 f"[AGENTIC_ASYNC] stream summary: finish_reason={finish_reason} "
1998 f"completion_tokens={completion_tokens} content_chars={len(accumulated_content)} "
1999 f"reasoning_chars={len(accumulated_reasoning)} tool_calls={len(tool_calls)} "
2000 f"tc_parse_failures={tc_parse_failures} rescued={rescued_count}"
2001 )
2002
2003 if finish_reason == "length" or tc_parse_failures > 0:
2004 truncated_tc_names = [
2005 (accumulated_tc[i].get("name") or "?")
2006 for i in sorted(accumulated_tc.keys())
2007 ]
2008 logger.warning(
2009 f"[AGENTIC_ASYNC] OUTPUT TRUNCATED: finish_reason={finish_reason} "
2010 f"completion_tokens={completion_tokens} tc_parse_failures={tc_parse_failures} "
2011 f"tool_names={truncated_tc_names}. Provider capped the response — "
2012 f"the tool call arguments were cut mid-JSON. "
2013 f"Consider: (a) raising max_tokens if the provider allows, "
2014 f"(b) switching to a provider without this cap, or "
2015 f"(c) teaching the skill to split drafts via input_files."
2016 )
2017
2018 yield {
2019 "type": "done",
2020 "content": accumulated_content,
2021 "tool_calls": tool_calls,
2022 "finish_reason": finish_reason,
2023 "usage": {
2024 "prompt_tokens": prompt_tokens,
2025 "completion_tokens": completion_tokens,
2026 "cost": cost,
2027 },
2028 }
2029
2030 def _extract_content(self, response: Any) -> str:
2031 """Extract text content from an LLM response (dict or SDK object)."""
2032 try:
2033 choices = response.get("choices", []) if isinstance(response, dict) else response.choices
2034 if not choices:
2035 return ""
2036 first = choices[0]
2037 if isinstance(first, dict):
2038 message = first.get("message", {})
2039 content = message.get("content", "") or ""
2040 else:
2041 message = first.message
2042 content = getattr(message, "content", "") or ""
2043 return content
2044 except Exception as e:
2045 logger.error(f"Error extracting content: {e}")
2046 return ""
2047
2048 def _extract_tool_calls(self, response: Any) -> List[Dict[str, Any]]:
2049 """Extract tool calls from an LLM response (dict or SDK object)."""
2050 tool_calls: List[Dict[str, Any]] = []
2051 try:
2052 choices = response.get("choices", []) if isinstance(response, dict) else response.choices
2053 if not choices:
2054 return []
2055 first = choices[0]
2056 if isinstance(first, dict):
2057 message = first.get("message", {})
2058 raw_calls = message.get("tool_calls") or []
2059 else:
2060 message = first.message
2061 raw_calls = getattr(message, "tool_calls", None) or []
2062
2063 for call in raw_calls:
2064 if isinstance(call, dict):
2065 call_type = call.get("type")
2066 func = call.get("function", {}) or {}
2067 call_id = call.get("id", "")
2068 call_index = call.get("index", 0)
2069 else:
2070 call_type = getattr(call, "type", "function")
2071 func_obj = getattr(call, "function", None)
2072 func = {
2073 "name": getattr(func_obj, "name", "") if func_obj else "",
2074 "arguments": getattr(func_obj, "arguments", "{}") if func_obj else "{}",
2075 }
2076 call_id = getattr(call, "id", "")
2077 call_index = getattr(call, "index", 0)
2078
2079 if call_type == "function":
2080 tool_name = (func.get("name", "") or "").strip()
2081 arguments = self._parse_tool_arguments(func.get("arguments", "{}"), tool_name)
2082 tool_calls.append({
2083 "id": call_id,
2084 "name": tool_name,
2085 "arguments": arguments,
2086 "index": call_index,
2087 })
2088 except Exception as e:
2089 logger.error(f"Error extracting tool calls: {e}")
2090 return tool_calls
2091
2092 def _parse_tool_arguments(self, args_str: str, tool_name: str) -> Dict[str, Any]:
2093 """Parse tool arguments JSON with basic repair."""
2094 if not args_str:
2095 return {}
2096 try:
2097 return json.loads(args_str)
2098 except json.JSONDecodeError:
2099
2100 if tool_name == "create_document" and '"content"' in args_str:
2101 try:
2102 repaired = args_str.rstrip()
2103 if not repaired.endswith('}'):
2104 if repaired.endswith('\\'):
2105 repaired = repaired[:-1]
2106 if not repaired.endswith('"'):
2107 repaired += '"'
2108 repaired += '}'
2109 parsed = json.loads(repaired)
2110 if 'path' in parsed and parsed['path']:
2111 return parsed
2112 except:
2113 pass
2114
2115 logger.warning(f"Failed to parse tool arguments for {tool_name}: {args_str[:100]}")
2116 return {}
2117
2118 def _calculate_cost(self, model: str, prompt_tokens: int, completion_tokens: int) -> float:
2119 """Calculate USD cost for this turn using the model registry.
2120
2121 Historical bug: this used to import ``get_cost_for_model`` which
2122 doesn't exist, so every call fell through to a $10/M fallback
2123 (~5-10× the real OpenRouter price for the models we use). Now uses
2124 the real ``calculate_action_cost`` and only falls back if the model
2125 is entirely unknown to the registry.
2126 """
2127 try:
2128 from anylegal_oss.core.pricing import calculate_action_cost
2129 result = calculate_action_cost(model, prompt_tokens, completion_tokens)
2130
2131 if isinstance(result, dict):
2132 return float(result.get("total_cost", 0.0))
2133 return float(result)
2134 except Exception as e:
2135 logger.warning(f"Failed to calculate cost for model {model}: {e}")
2136
2137 total_tokens = prompt_tokens + completion_tokens
2138 return (total_tokens / 1_000_000) * 1.00
2139
2140 async def _execute_tool(self, tool_call: Dict) -> ToolResult:
2141 """Execute a tool call using AsyncToolExecutor."""
2142 tool_name = tool_call["name"]
2143 arguments = tool_call["arguments"]
2144
2145 try:
2146
2147 executor = AsyncToolExecutor(
2148 session=self._workspace,
2149 user_id=getattr(self._workspace, 'user_id', None),
2150 model=self.model,
2151 session_state=self._session_state,
2152 agent_id=getattr(self, "_agent_id", None),
2153 )
2154 result = await executor.execute_async(tool_name, arguments)
2155 return result
2156 except Exception as e:
2157 logger.error(f"Tool execution error for {tool_name}: {e}")
2158 return ToolResult(
2159 success=False,
2160 tool_name=tool_name,
2161 result={},
2162 error=str(e),
2163 execution_time_ms=0,
2164 )
2165
2166 def cancel(self) -> None:
2167 """Cancel the running agentic loop"""
2168 self._cancelled = True
2169 logger.info("[AGENTIC_ASYNC] Cancellation requested")
2170
def create_agentic_chat_async(
    model: Optional[str] = None,
    system_prompt: Optional[str] = None,
    session_guard = None,
    planner_mode: bool = False,
    approved_plan: Optional[Dict[str, Any]] = None,
    approved_mode_change: Optional[Dict[str, Any]] = None,
) -> AgenticWorkspaceChatAsync:
    """Factory for ``AgenticWorkspaceChatAsync``.

    Every argument is forwarded unchanged, by keyword, to the class
    constructor; the new instance is returned.
    """
    options = {
        "model": model,
        "system_prompt": system_prompt,
        "session_guard": session_guard,
        "planner_mode": planner_mode,
        "approved_plan": approved_plan,
        "approved_mode_change": approved_mode_change,
    }
    return AgenticWorkspaceChatAsync(**options)
2188
2189