The Atlas AnyLegal OSS — documentation bound to its code
20 documents

The agentic chat loop, end to end

Follow one chat request from the SSE endpoint through the reactive turn loop to tool dispatch — the system's beating heart, doc claims bound to the code.

backend/anylegal_oss/workspace/tools/tool_executor.py — 391 lines · AsyncToolExecutor.execute_async L295–390
Outline 12 symbols
1"""
2Tool Executor
3
4Central dispatcher for executing workspace tools.
5Handles tool routing, argument validation, error handling, and result formatting.
6"""
7
8import logging
9import traceback
10from dataclasses import dataclass, field
11from datetime import datetime, timezone
12from typing import Dict, Any, Optional, Callable, List
13
14from ..session import WorkspaceSession
15from .document_tools import DOCUMENT_TOOLS
16from .web_tools import WEB_TOOLS
17from .legal_tools import LEGAL_TOOLS
18from .docx_tools import DOCX_TOOLS
19from .python_tools import PYTHON_TOOLS
20from .todo_tool import TODO_TOOLS
21from .mode_tools import MODE_TOOLS
22from .skill_tool import SKILL_TOOLS
23from .comment_tools import COMMENT_TOOLS
24from .wiki_tools import WIKI_TOOLS
25
26logger = logging.getLogger(__name__)
27
@dataclass
class ToolResult:
    """Outcome of one tool invocation, as handed back to the agentic loop."""

    # Whether the call is considered successful.
    success: bool
    # Name of the tool that was (or was meant to be) invoked.
    tool_name: str
    # Payload dict for the call; the executors pass {} when the call failed
    # before a handler produced anything.
    result: Dict[str, Any]
    # Error message, or None when no error was reported.
    error: Optional[str] = None
    # Duration of the call in milliseconds.
    execution_time_ms: float = 0.0
    # ISO-8601 UTC timestamp taken when this object is created.
    timestamp: str = field(
        default_factory=lambda: datetime.now(timezone.utc).isoformat()
    )

    def to_dict(self) -> Dict[str, Any]:
        """Return the fields as a plain dict suitable for JSON serialization.

        The values are the same objects held by the instance (no copying).
        """
        exported = (
            "success",
            "tool_name",
            "result",
            "error",
            "execution_time_ms",
            "timestamp",
        )
        return {name: getattr(self, name) for name in exported}
48
class ToolExecutor:
    """
    Executes tools on behalf of the agentic loop.

    Manages tool dispatch, session context injection, and change tracking.

    Handlers from every tool registry are merged into a single
    name -> callable table at construction time. ``execute`` looks a tool up
    by name, injects the context its registry requires, runs it, and wraps
    the outcome in a ``ToolResult``. Every call whose handler returned
    without raising is appended to ``tool_calls``.
    """

    def __init__(
        self,
        session: WorkspaceSession,
        user_id: Optional[int] = None,
        model: Optional[str] = None,
        session_state: Optional[Any] = None,
        agent_id: Optional[str] = None,
    ):
        """
        Initialize the tool executor.

        Args:
            session: Workspace session for document operations
            user_id: User ID for billing and user-specific data
            model: Model override for LLM-based tools
            session_state: SessionState for tools that need per-session storage
                (notably todo_write, which keys todos by agent_id or session_id).
            agent_id: Current agent ID when running as a coordinator worker.
                Used as the todo_write storage key so workers don't pollute
                the parent session's todo list.
        """
        self.session = session
        self.user_id = user_id
        self.model = model
        self.session_state = session_state
        self.agent_id = agent_id

        # Merge order matters: a later registry overrides an earlier one if
        # two registries ever define the same tool name.
        self._handlers: Dict[str, Callable] = {}
        for registry in (
            DOCUMENT_TOOLS,
            WEB_TOOLS,
            LEGAL_TOOLS,
            DOCX_TOOLS,
            PYTHON_TOOLS,
            TODO_TOOLS,
            MODE_TOOLS,
            SKILL_TOOLS,
            COMMENT_TOOLS,
            WIKI_TOOLS,
        ):
            self._handlers.update(registry)

        # History of dispatched calls; see get_tool_call_history().
        self.tool_calls: List[Dict[str, Any]] = []

    def execute(self, tool_name: str, arguments: Dict[str, Any]) -> ToolResult:
        """
        Execute a tool by name with given arguments.

        Never raises for handler failures: any exception raised while
        enriching arguments or running the handler is logged and converted
        into a failed ``ToolResult``.

        Args:
            tool_name: Name of the tool to execute (surrounding whitespace
                is stripped before lookup)
            arguments: Tool arguments; ``None`` is treated as "no arguments"

        Returns:
            ToolResult with execution result or error
        """
        start_time = datetime.now(timezone.utc)

        tool_name = (tool_name or "").strip()

        handler = self._handlers.get(tool_name)
        if not handler:
            return ToolResult(
                success=False,
                tool_name=tool_name,
                result={},
                error=f"Unknown tool: {tool_name}. Available tools: {list(self._handlers.keys())}"
            )

        try:
            enriched_args = self._enrich_arguments(tool_name, arguments)

            result = handler(**enriched_args)

            # Hints are advisory. The handler has already run (and may have
            # had side effects), so a failure while building a hint must not
            # turn the call into an error.
            try:
                self._inject_post_execution_hints(tool_name, result)
            except Exception:
                logger.debug(
                    "Post-execution hint injection failed for %s",
                    tool_name,
                    exc_info=True,
                )

            execution_time = (datetime.now(timezone.utc) - start_time).total_seconds() * 1000

            self._emit_metrics(tool_name, result, execution_time)

            self.tool_calls.append({
                "tool_name": tool_name,
                "arguments": arguments,
                "success": result.get("success", True),
                "timestamp": datetime.now(timezone.utc).isoformat()
            })

            return ToolResult(
                success=result.get("success", True),
                tool_name=tool_name,
                result=result,
                error=result.get("error"),
                execution_time_ms=execution_time
            )

        except Exception as e:
            logger.error(f"Tool execution error for {tool_name}: {e}")
            logger.debug(traceback.format_exc())

            execution_time = (datetime.now(timezone.utc) - start_time).total_seconds() * 1000

            return ToolResult(
                success=False,
                tool_name=tool_name,
                result={},
                error=f"Execution error: {str(e)}",
                execution_time_ms=execution_time
            )

    def _emit_metrics(
        self,
        tool_name: str,
        result: Dict[str, Any],
        execution_time: float,
    ) -> None:
        """Emit tool (and, when present, validation) metrics; best-effort.

        Metrics must never affect the tool call, so every failure here,
        including a failing import of the metrics module, is logged at
        debug level and otherwise ignored.
        """
        try:
            from ..metrics import emit_tool_metric, emit_validation_metric
            sid = self.session.session_id if self.session else None
            emit_tool_metric(
                tool_name=tool_name,
                outcome="success" if result.get("success", True) else "failure",
                duration_ms=execution_time,
                session_id=sid,
            )

            # Only results carrying a "validation" dict emit the second metric.
            validation = result.get("validation")
            if isinstance(validation, dict):
                emit_validation_metric(
                    tool_name=tool_name,
                    level=validation.get("level", "light"),
                    valid=bool(validation.get("valid", True)),
                    errors_count=len(validation.get("errors", [])),
                    warnings_count=len(validation.get("warnings", [])),
                    repairs_made=int(validation.get("repairs_made", 0) or 0),
                    session_id=sid,
                )
        except Exception:
            logger.debug("Metric emission failed for %s", tool_name, exc_info=True)

    def _enrich_arguments(
        self,
        tool_name: str,
        arguments: Dict[str, Any]
    ) -> Dict[str, Any]:
        """
        Enrich tool arguments with context from session.

        Different tools need different context injected; which keys are
        added depends on the registry (or registries) the tool belongs to.
        The caller's dict is never mutated. ``None`` or empty ``arguments``
        is treated as an empty dict so tools that take no arguments can
        still be dispatched.

        Args:
            tool_name: Name of the tool about to be executed
            arguments: Arguments supplied by the caller

        Returns:
            A new dict with the caller's arguments plus injected context
        """
        enriched = dict(arguments) if arguments else {}

        if tool_name in DOCUMENT_TOOLS:
            enriched["session"] = self.session

        if tool_name in LEGAL_TOOLS:
            enriched["session"] = self.session
            enriched["user_id"] = self.user_id
            enriched["model"] = self.model

        if tool_name in DOCX_TOOLS:
            enriched["session"] = self.session

        if tool_name in PYTHON_TOOLS:
            enriched["session"] = self.session

        if tool_name in TODO_TOOLS:
            enriched["session_state"] = self.session_state
            # Workers key their todos by agent_id; otherwise fall back to the
            # session id (or "" when there is no session).
            enriched["todo_key"] = self.agent_id or (self.session.session_id if self.session else "")

        if tool_name in SKILL_TOOLS:
            enriched["session"] = self.session
            enriched["session_state"] = self.session_state
            enriched["agent_id"] = self.agent_id

        if tool_name in COMMENT_TOOLS:
            enriched["session"] = self.session

        if tool_name in WIKI_TOOLS:
            enriched["session"] = self.session

        return enriched

    def _inject_post_execution_hints(
        self, tool_name: str, result: Dict[str, Any]
    ) -> None:
        """Append context hints to a successful tool result (in-place).

        Currently only nudges on run_code(language=node) when it produced
        a DOCX and the session's todo list has pending items. Other tools
        get no hint. Idempotent; safe to call on every tool result.

        Malformed input is tolerated: a non-dict ``result`` is ignored,
        non-dict todo entries are skipped, and non-string status/content
        values are stringified rather than raising.
        """
        if tool_name != "run_code" or not isinstance(result, dict):
            return
        if not result.get("success"):
            return
        if not self.session_state:
            return

        # Only nudge when a DOCX was produced and added to the workspace.
        files = result.get("files_created") or []
        if not any(
            isinstance(f, dict) and f.get("type") == "docx"
            and f.get("added_to_workspace")
            for f in files
        ):
            return

        # Same key that _enrich_arguments hands to the todo tools.
        todo_key = self.agent_id or (
            self.session.session_id if self.session else ""
        )
        try:
            todos = self.session_state.get_todos(todo_key) or []
        except Exception:
            return

        # Skip malformed entries so one bad todo cannot make the hint raise.
        todos = [t for t in todos if isinstance(t, dict)]
        if not todos:
            return
        pending = [
            t for t in todos
            if str(t.get("status") or "").lower() != "completed"
        ]
        if not pending:
            return

        completed = len(todos) - len(pending)
        # Name at most three outstanding items to keep the reminder short.
        sample = [
            str(t.get("content") or t.get("activeForm") or "(unnamed)")
            for t in pending[:3]
        ]
        result["todo_reminder"] = (
            f"{completed}/{len(todos)} todo items complete. Remaining: "
            + "; ".join(sample)
            + (". Do NOT end your turn until every item is complete — "
               "continue with the next run_code now.")
        )

    def get_available_tools(self) -> List[str]:
        """Get list of available tool names."""
        return list(self._handlers.keys())

    def get_tool_call_history(self) -> List[Dict[str, Any]]:
        """Get history of tool calls for this executor (a shallow copy)."""
        return self.tool_calls.copy()
280
class AsyncToolExecutor(ToolExecutor):
    """
    Async version of ToolExecutor for use with async agentic loop.

    Uses async versions of web tools: the handler table is a copy of the
    synchronous one with the ``WEB_TOOLS_ASYNC`` entries layered on top.
    Coroutine handlers are awaited directly; synchronous handlers run in the
    event loop's default executor so they do not block the loop.
    """

    def __init__(self, *args, **kwargs):
        """Build the sync handler table, then overlay the async web tools.

        Accepts the same arguments as ``ToolExecutor.__init__``.
        """
        super().__init__(*args, **kwargs)

        from .web_tools import WEB_TOOLS_ASYNC
        self._async_handlers = {**self._handlers}
        self._async_handlers.update(WEB_TOOLS_ASYNC)

    async def execute_async(
        self,
        tool_name: str,
        arguments: Dict[str, Any]
    ) -> ToolResult:
        """
        Execute a tool asynchronously.

        Never raises for handler failures: any exception raised while
        enriching arguments or running the handler is logged and converted
        into a failed ``ToolResult``.

        Args:
            tool_name: Name of the tool to execute (surrounding whitespace
                is stripped before lookup)
            arguments: Tool arguments

        Returns:
            ToolResult with execution result or error
        """
        import asyncio
        import inspect

        start_time = datetime.now(timezone.utc)

        tool_name = (tool_name or "").strip()

        handler = self._async_handlers.get(tool_name)
        if not handler:
            return ToolResult(
                success=False,
                tool_name=tool_name,
                result={},
                error=f"Unknown tool: {tool_name}"
            )

        try:
            enriched_args = self._enrich_arguments(tool_name, arguments)

            # inspect.iscoroutinefunction replaces the deprecated
            # asyncio.iscoroutinefunction.
            if inspect.iscoroutinefunction(handler):
                result = await handler(**enriched_args)
            else:
                # We are inside a coroutine, so a running loop always exists;
                # get_running_loop() is the supported way to obtain it here.
                loop = asyncio.get_running_loop()
                result = await loop.run_in_executor(
                    None,
                    lambda: handler(**enriched_args)
                )
                # A sync wrapper around an async function hands back an
                # awaitable rather than a result; finish it on the loop.
                if inspect.isawaitable(result):
                    result = await result

            # Mirror ToolExecutor.execute: attach post-execution hints.
            # Advisory only, so a failure here must not fail the call.
            try:
                self._inject_post_execution_hints(tool_name, result)
            except Exception:
                logger.debug(
                    "Post-execution hint injection failed for %s",
                    tool_name,
                    exc_info=True,
                )

            execution_time = (datetime.now(timezone.utc) - start_time).total_seconds() * 1000

            # Metrics are best-effort: failures (including a failing import)
            # are logged at debug level and never affect the tool call.
            try:
                from ..metrics import emit_tool_metric, emit_validation_metric
                sid = self.session.session_id if self.session else None
                emit_tool_metric(
                    tool_name=tool_name,
                    outcome="success" if result.get("success", True) else "failure",
                    duration_ms=execution_time,
                    session_id=sid,
                )
                if isinstance(result.get("validation"), dict):
                    v = result["validation"]
                    emit_validation_metric(
                        tool_name=tool_name,
                        level=v.get("level", "light"),
                        valid=bool(v.get("valid", True)),
                        errors_count=len(v.get("errors", [])),
                        warnings_count=len(v.get("warnings", [])),
                        repairs_made=int(v.get("repairs_made", 0) or 0),
                        session_id=sid,
                    )
            except Exception:
                logger.debug("Metric emission failed for %s", tool_name, exc_info=True)

            self.tool_calls.append({
                "tool_name": tool_name,
                "arguments": arguments,
                "success": result.get("success", True),
                "timestamp": datetime.now(timezone.utc).isoformat()
            })

            return ToolResult(
                success=result.get("success", True),
                tool_name=tool_name,
                result=result,
                error=result.get("error"),
                execution_time_ms=execution_time
            )

        except Exception as e:
            logger.error(f"Async tool execution error for {tool_name}: {e}")
            logger.debug(traceback.format_exc())

            execution_time = (datetime.now(timezone.utc) - start_time).total_seconds() * 1000

            return ToolResult(
                success=False,
                tool_name=tool_name,
                result={},
                error=f"Execution error: {str(e)}",
                execution_time_ms=execution_time
            )
391