The agentic chat loop, end to end
Follow one chat request from the SSE endpoint through the reactive turn loop to tool dispatch — the heart of the system — with every documentation claim tied to the code that implements it.
backend/anylegal_oss/fastapi_app.py · 424 lines · agentic_chat L128–247
Outline · 9 symbols
- lifespan function
- _env_flag function
- _check_chat_killswitch function
- health function
- metrics_endpoint function
- agentic_chat function
- compact_conversation function
- switch_session function
- list_sessions function
1import os
2import json
3import asyncio
4import logging
5from datetime import datetime, timezone
6from typing import Dict, Any, AsyncGenerator, Optional
7from contextlib import asynccontextmanager
8
9from fastapi import FastAPI, Request, HTTPException, Depends
10from fastapi.responses import StreamingResponse, PlainTextResponse
11from fastapi.middleware.cors import CORSMiddleware
12import uuid
13
14from anylegal_oss.workspace.session import WorkspaceSession
15from anylegal_oss.workspace.workspace import Workspace
16from anylegal_oss.workspace.agentic_chat_async import create_agentic_chat_async
17from anylegal_oss.state.session_guard import AsyncSessionGuard
18from anylegal_oss.state.transcript import record_transcript, flush_session_storage
19
# Fixed user id for this service: request handlers in this module do not
# authenticate callers, they assign this constant as the acting user_id, and
# startup workspace seeding uses it as well.
OSS_USER_ID: int = 1
21
22from anylegal_oss.db.database import get_user_preferred_model
23
24from anylegal_oss.services import (
25 get_metrics,
26 validate_agentic_request,
27)
28
29from anylegal_oss.api.v1 import models as _models_module
30from anylegal_oss.api.v1 import threads as _threads_module
31from anylegal_oss.documents.api import router as _documents_router
32from anylegal_oss.workspace import workspace_router as _workspace_router
33
# Log level comes from the LOG_LEVEL environment variable (default "INFO").
_LOG_LEVEL = os.getenv("LOG_LEVEL", "INFO").upper()

# Install a basic root handler only if the root logger has none yet, so an
# existing logging setup is left untouched.
_root_already_configured = bool(logging.getLogger().handlers)
if not _root_already_configured:
    logging.basicConfig(
        format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
        level=_LOG_LEVEL,
    )

# The package logger always follows LOG_LEVEL, even when the root logger was
# configured elsewhere.
logging.getLogger("anylegal_oss").setLevel(_LOG_LEVEL)

logger = logging.getLogger(__name__)
43
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Application lifespan: prepare storage, seed data, install the session guard.

    Startup runs, in order:
      1. every DB / workspace schema step (mandatory — any exception propagates
         out of the lifespan, so startup fails instead of serving a
         half-migrated schema);
      2. default-workspace seeding (best-effort — failures are logged only);
      3. creation of ``app.state.session_guard``, used by the request handlers.
    Shutdown only logs.
    """
    logger.info("FastAPI agentic service starting")

    from anylegal_oss.db.database import init_db, ensure_schema
    from anylegal_oss.workspace.db import (
        init_document_editor_tables,
        migrate_document_sessions_table,
        migrate_create_workspaces_table,
        migrate_sessions_to_workspaces,
    )
    from anylegal_oss.lexwiki_compiler.db import migrate_create_workspace_wikis_table

    # Schema steps run strictly in this order and are deliberately NOT wrapped
    # in try/except: a failure here must stop the service with a traceback the
    # operator can act on.
    mandatory_schema_steps = (
        init_db,
        ensure_schema,
        init_document_editor_tables,
        migrate_document_sessions_table,
        migrate_create_workspaces_table,
        migrate_sessions_to_workspaces,
        migrate_create_workspace_wikis_table,
    )
    for schema_step in mandatory_schema_steps:
        schema_step()

    # An empty workspace is still usable, so a seeding error must not block
    # startup; it is logged with its traceback and otherwise ignored.
    try:
        from anylegal_oss.workspace.bootstrap import seed_default_workspace_if_empty
        seed_default_workspace_if_empty(user_id=OSS_USER_ID)
    except Exception as exc:
        logger.warning(f"Workspace seeding skipped: {exc}", exc_info=True)

    app.state.session_guard = AsyncSessionGuard()

    yield

    logger.info("FastAPI agentic service shutting down")
81
app = FastAPI(
    title="AnyLegal Agentic API",
    description="Async agentic chat endpoints",
    version="2.0.0",
    lifespan=lifespan,
)

# Browser origins permitted to call this API with credentials
# (presumably the local development frontends — confirm for other deployments).
_CORS_ALLOWED_ORIGINS = [
    "http://localhost:3000",
    "http://localhost:3001",
]

app.add_middleware(
    CORSMiddleware,
    allow_origins=_CORS_ALLOWED_ORIGINS,
    allow_credentials=True,
    allow_methods=["GET", "POST", "PUT", "DELETE", "OPTIONS"],
    allow_headers=["Content-Type", "Authorization", "Accept", "Accept-Language"],
)

# Feature routers, mounted in this order; only the documents router receives
# an explicit "/api/v1" prefix at mount time.
app.include_router(_models_module.router)
app.include_router(_threads_module.router)
app.include_router(_documents_router, prefix="/api/v1")
app.include_router(_workspace_router)
104
def _env_flag(name: str, default: str = "true") -> bool:
    """Interpret environment variable *name* as a boolean flag.

    The raw value (or *default* when the variable is unset) is stripped and
    lower-cased; only "1", "true", "yes" and "on" count as True. Any other
    value, including an empty string, is False.
    """
    normalized = os.getenv(name, default).strip().lower()
    return normalized in {"1", "true", "yes", "on"}
107
def _check_chat_killswitch():
    """Raise HTTP 503 unless the CHAT_ENABLED flag is on (it defaults to on).

    Chat-related endpoints call this first so operators can turn chat off
    via the environment without redeploying.
    """
    if _env_flag("CHAT_ENABLED", "true"):
        return
    raise HTTPException(status_code=503, detail="chat temporarily disabled")
112
@app.get("/health")
async def health():
    """Health-check endpoint.

    Returns a static status and service name, the current UTC time in ISO 8601
    form, and whether the CHAT_ENABLED kill switch currently allows chat.
    """
    checked_at = datetime.now(timezone.utc)
    chat_enabled = _env_flag("CHAT_ENABLED", "true")
    return {
        "status": "ok",
        "service": "fastapi-agentic",
        "timestamp": checked_at.isoformat(),
        "chat_enabled": chat_enabled,
    }
121
@app.get("/metrics")
async def metrics_endpoint():
    """Serve the metrics registry's ``export_prometheus()`` output as text/plain."""
    return PlainTextResponse(
        content=get_metrics().export_prometheus(),
        media_type="text/plain",
    )
127
@app.post("/api/v1/agentic/chat")
async def agentic_chat(
    request: Request,
    # NOTE(review): annotated as dict, but every field below is read as an
    # attribute (validation.session_id, ...), so the dependency evidently
    # returns an object — confirm the real type against validate_agentic_request.
    validation: dict = Depends(validate_agentic_request)
):
    """Run one agentic chat request and stream the agent's events as SSE.

    Steps:
      1. Refuse with 503 when the CHAT_ENABLED kill switch is off.
      2. Read the validated request fields. The model falls back to the
         user's preferred model, and ``deep_research_toggle`` forces planner
         mode on.
      3. Refuse with 503 when planner mode is requested but
         ANYLEGAL_PLANNER_MODE is not "enabled".
      4. Acquire the per-session guard; 409 if ``acquire`` reports failure.
      5. Load the user's workspace, optionally select the requested active
         document, build the agent, and return a StreamingResponse. Each
         agent event becomes one ``event: <type>`` / ``data: <json>`` frame.

    Guard ownership: once the StreamingResponse is returned, the stream
    generator's ``finally`` persists the workspace and releases the guard.
    Any failure before that point releases the guard in the handlers at the
    bottom of this function.

    Raises:
        HTTPException: 503 (kill switch or planner mode disabled), 409 (the
            session guard could not be acquired within ``timeout=300``), or
            500 (unexpected error while preparing the stream; the details are
            logged, not returned).
    """
    _check_chat_killswitch()
    user_id = OSS_USER_ID
    session_id = validation.session_id
    message = validation.message
    thread_id = validation.thread_id
    max_turns = validation.max_turns
    max_budget_usd = validation.max_budget_usd

    model = validation.model or get_user_preferred_model(user_id)

    # Optional request fields are read defensively with getattr defaults.
    planner_mode = getattr(validation, "planner_mode", False)
    approved_plan = getattr(validation, "approved_plan", None)
    approved_mode_change = getattr(validation, "approved_mode_change", None)
    if getattr(validation, "deep_research_toggle", False):
        planner_mode = True

    request.state.user_id = user_id

    # Defense-in-depth: with ANYLEGAL_PLANNER_MODE off (the OSS default), the
    # enter_plan_mode/exit_plan_mode tools are not registered for the LLM, so
    # the model can't request planner_mode itself. This guard catches the case
    # where a non-default frontend ships `planner_mode: true` in the request
    # body anyway.
    if planner_mode and os.getenv("ANYLEGAL_PLANNER_MODE", "disabled").lower() != "enabled":
        raise HTTPException(
            status_code=503,
            detail=(
                "Planner mode is disabled in this deployment. "
                "Set ANYLEGAL_PLANNER_MODE=enabled in .env and recreate the "
                "backend container to enable it."
            ),
        )

    guard = request.app.state.session_guard
    acquired = await guard.acquire(session_id, timeout=300)
    if not acquired:
        raise HTTPException(
            status_code=409,
            detail="Another request is already running for this session"
        )

    try:
        workspace = await asyncio.to_thread(Workspace.get_or_create, user_id)

        active_doc = getattr(validation, "active_document", None)
        if active_doc and active_doc in workspace.documents:
            workspace.set_active_document(active_doc)
            logger.info(
                f"[AGENTIC_ASYNC] active_document set from request: {active_doc!r}"
            )
        elif active_doc:
            logger.info(
                f"[AGENTIC_ASYNC] active_document {active_doc!r} not in "
                f"workspace ({len(workspace.documents)} docs) — ignoring"
            )

        agent = create_agentic_chat_async(
            model=model,
            session_guard=None,
            planner_mode=planner_mode,
            approved_plan=approved_plan,
            approved_mode_change=approved_mode_change,
        )

        # Read the flag once per request rather than once per streamed event.
        eager_flush = os.getenv("EAGER_FLUSH", "false").lower() == "true"

        async def event_stream():
            """Drive the agent, yield SSE frames, then persist and unlock."""
            # NOTE(review): if the response never starts iterating this
            # generator, the `finally` below never runs and the guard is not
            # released — confirm AsyncSessionGuard has its own expiry.
            try:
                async for event in agent.run_async(
                    session=workspace,
                    message=message,
                    user_id=user_id,
                    thread_id=thread_id,
                    max_turns=max_turns,
                    max_budget_usd=max_budget_usd,
                ):
                    data_json = json.dumps({**event.data, "timestamp": event.timestamp})
                    yield f"event: {event.type}\ndata: {data_json}\n\n"

                    if eager_flush:
                        await asyncio.to_thread(flush_session_storage)

            except Exception as e:
                # Errors after streaming has begun can no longer change the
                # HTTP status, so they are reported in-band as an SSE event.
                logger.error(f"Error in agentic stream: {e}", exc_info=True)
                error_data = json.dumps({
                    "error": str(e),
                    "timestamp": datetime.now(timezone.utc).isoformat()
                })
                yield f"event: error\ndata: {error_data}\n\n"
            finally:
                # The release lives in its own inner `finally` so it still
                # runs when the save await raises something that is not an
                # Exception, e.g. asyncio.CancelledError if the response task
                # is cancelled (such as on client disconnect). Previously that
                # skipped the release and could leave the session locked, so
                # later requests for it would fail with 409.
                try:
                    try:
                        await asyncio.to_thread(workspace.save)
                    except Exception as save_err:
                        logger.error(f"Failed to persist workspace at end of stream: {save_err}")
                finally:
                    await guard.release(session_id)

        return StreamingResponse(
            event_stream(),
            media_type="text/event-stream",
            headers={
                # Disable client and reverse-proxy buffering so events arrive
                # as they are produced.
                "Cache-Control": "no-cache",
                "X-Accel-Buffering": "no",
                "Connection": "keep-alive"
            }
        )

    except HTTPException:
        await guard.release(session_id)
        raise
    except Exception as e:
        logger.error(f"Unexpected error: {e}", exc_info=True)
        await guard.release(session_id)
        raise HTTPException(status_code=500, detail="Internal server error")
248
def _extract_summary_text(summary_messages) -> str:
    """Return the first non-empty text found in compaction summary messages.

    Each message's ``content`` may be a string (used as-is) or a list of
    blocks (the ``text`` fields of dict blocks, joined with newlines).
    Messages with any other content type are skipped. Returns "" when no
    message yields text; a None/empty input also yields "".
    """
    for summary_message in summary_messages or []:
        content = summary_message.get("content")
        if isinstance(content, str):
            text = content
        elif isinstance(content, list):
            text = "\n".join(
                (block.get("text") or "")
                for block in content
                if isinstance(block, dict)
            )
        else:
            continue
        if text:
            return text
    return ""


@app.post("/api/v1/agentic/compact")
async def compact_conversation(
    request: Request,
):
    """
    Manual compaction endpoint.

    Body: {"session_id": "...", "thread_id": "..." (optional),
           "custom_instructions": ... (optional)}

    While holding the per-session guard, this loads the session transcript,
    keeps only user/assistant entries, and runs ``perform_compaction`` with
    ``is_auto=False``. On success, when a ``thread_id`` is supplied, it also
    saves a ``compaction_boundary`` row with the summary text and token
    counts. That save is best-effort: failures are only logged.

    Returns:
        dict with success flag, session/thread ids, pre/post token counts and
        the compaction summary.

    Raises:
        HTTPException: 503 when chat is disabled; 400 for a malformed body, a
            missing session_id, or nothing to compact; 409 when the session
            guard is not acquired; 500 when compaction reports failure.
    """
    _check_chat_killswitch()
    user_id = OSS_USER_ID

    try:
        body = await request.json()
    except Exception:
        raise HTTPException(status_code=400, detail="Invalid JSON") from None

    # Valid JSON that is not an object (list, string, number) has no .get();
    # reject it as a client error instead of crashing with a 500.
    if not isinstance(body, dict):
        raise HTTPException(status_code=400, detail="JSON body must be an object")

    session_id = body.get("session_id")
    thread_id = body.get("thread_id")
    custom_instructions = body.get("custom_instructions")

    if not session_id:
        raise HTTPException(status_code=400, detail="session_id required")

    guard = request.app.state.session_guard
    acquired = await guard.acquire(session_id, timeout=300)
    if not acquired:
        raise HTTPException(status_code=409, detail="Session locked")

    try:
        # NOTE(review): the original bound this instance to an unused local.
        # The call is kept in case the constructor has side effects — confirm,
        # and delete it if it does not.
        WorkspaceSession(user_id=user_id, session_id=session_id)

        from anylegal_oss.state import load_session_transcript
        transcript = await asyncio.to_thread(load_session_transcript, session_id)

        # Only conversational turns are compacted; entries with any other
        # role are dropped.
        messages = [
            {"role": entry.get("role"), "content": entry.get("content", "")}
            for entry in transcript
            if entry.get("role") in ("user", "assistant")
        ]
        if not messages:
            raise HTTPException(status_code=400, detail="No messages to compact")

        from anylegal_oss.services.compaction.compactor import perform_compaction
        from anylegal_oss.state.session_state import get_session_state
        session_state = get_session_state(session_id)
        result = await perform_compaction(
            messages=messages,
            session_state=session_state,
            custom_instructions=custom_instructions,
            is_auto=False,
        )

        if not result["success"]:
            raise HTTPException(status_code=500, detail=result.get("error", "Compaction failed"))

        if thread_id:
            # Best-effort: a failed boundary write must not fail a compaction
            # that already succeeded.
            try:
                from anylegal_oss.db import async_db as _async_db
                summary_str = _extract_summary_text(result.get("summary_messages"))
                boundary_metadata = json.dumps({
                    "pre_tokens": result.get("pre_tokens", 0),
                    "post_tokens": result.get("post_tokens", 0),
                    "is_auto": False,
                })
                await _async_db.save_agentic_message(
                    session_id=session_id,
                    thread_id=thread_id,
                    user_id=user_id,
                    message_type='compaction_boundary',
                    content=summary_str,
                    tool_arguments=boundary_metadata,
                )
            except Exception as e:
                logger.warning(f"[COMPACT] Failed to persist boundary row: {e}")

        return {
            "success": True,
            "session_id": session_id,
            "thread_id": thread_id,
            "pre_tokens": result["pre_tokens"],
            "post_tokens": result["post_tokens"],
            "summary": result["summary"],
        }

    finally:
        await guard.release(session_id)
351
@app.post("/api/v1/agentic/sessions/switch")
async def switch_session(
    request: Request,
):
    """
    Load the stored agentic messages of a session so the frontend can
    display them when switching to it.

    Body: {"session_id": "...", "thread_id": "..." (optional)}

    When ``thread_id`` is given, messages are looked up by thread only; the
    ``session_id`` is still required and is echoed back. Otherwise they are
    looked up by session. The query is issued with ``limit=200``. This
    handler itself only reads messages.

    Raises:
        HTTPException: 503 when chat is disabled; 400 for a malformed body or
            missing session_id; 404 when no messages are found; 500 when the
            database read fails.
    """
    _check_chat_killswitch()

    try:
        body = await request.json()
    except Exception:
        raise HTTPException(status_code=400, detail="Invalid JSON") from None

    # Valid JSON that is not an object (list, string, number) has no .get();
    # reject it as a client error instead of crashing with a 500.
    if not isinstance(body, dict):
        raise HTTPException(status_code=400, detail="JSON body must be an object")

    session_id = body.get("session_id")
    thread_id = body.get("thread_id")
    if not session_id:
        raise HTTPException(status_code=400, detail="session_id required")

    from anylegal_oss.db import async_db

    # A thread id narrows the lookup to that thread; otherwise load by session.
    lookup = {"thread_id": thread_id} if thread_id else {"session_id": session_id}
    try:
        rows = await async_db.get_agentic_thread_messages(**lookup, limit=200)
    except Exception as e:
        logger.error(f"switch_session: DB load failed: {e}", exc_info=True)
        raise HTTPException(status_code=500, detail="Failed to load session") from None

    if not rows:
        raise HTTPException(status_code=404, detail="Session not found or empty")

    # Expose only the display-relevant columns; the stored message_type is
    # surfaced to the client as "role".
    public = [
        {
            "role": r.get("message_type"),
            "content": r.get("content"),
            "tool_name": r.get("tool_name"),
            "tool_call_id": r.get("tool_call_id"),
            "created_at": r.get("created_at"),
        }
        for r in rows
    ]

    return {
        "success": True,
        "session_id": session_id,
        "thread_id": thread_id,
        "message_count": len(public),
        "messages": public,
    }
405
@app.get("/api/v1/agentic/sessions")
async def list_sessions(
    request: Request,
):
    """
    List available session transcripts together with how many there are.

    ``request`` is unused but kept for a signature consistent with the other
    handlers. The blocking transcript listing runs in a worker thread.
    """
    _check_chat_killswitch()
    from anylegal_oss.state import list_session_transcripts

    transcripts = await asyncio.to_thread(list_session_transcripts)
    response = {"sessions": transcripts}
    response["count"] = len(transcripts)
    return response
421
# To run a development server, use `python -m uvicorn main:app` from backend/,
# or `docker compose up`. There is deliberately no `__main__` block here:
# uvicorn loads the `app` symbol directly.