The Atlas AnyLegal OSS — documentation bound to its code
20 documents

The DOCX tracked-change pipeline

How a plain-text edit from the model becomes surgical Word tracked-change markup that preserves formatting — from the skill the model reads to the OOXML engine and LibreOffice finalization.

backend/anylegal_oss/workspace/tools/document_tools.py — 1395 lines · edit_document L649–935
Outline 18 symbols
1"""
2Document Management Tool Implementations
3
4Workspace file CRUD with dual-mode support for text files and DOCX blobs:
5- list_documents
6- read_document (branching: DOCX text/xml view vs HTML as-is)
7- create_document (workspace text files only — anylegal.md, Playbook/*.md)
8- edit_document (branching: text→tracked-change for DOCX vs str.replace for HTML)
9- clone_document (versioned clone for editing)
10- create_folder, delete_document, delete_folder
11
12DOCX creation uses ``run_code`` (python-docx / docx-js), not this file.
13
14DOCX editing approach (edit_document):
15 LLM sends plain text old_text / new_text. Backend:
16 1. Finds old_text inside <w:t> elements (quote/case normalization)
17 2. Generates OOXML w:del/w:ins tracked-change markup
18 3. Preserves original formatting (w:rPr)
19 4. Validates, repacks blob, saves session
20
21For structural OOXML edits, see the docx-xml skill (run_code + lxml + zipfile).
22"""
23
24import io
25import os
26import re
27import logging
28from pathlib import Path
29from typing import Dict, Any, Optional, List
30
31from ..session import WorkspaceSession
32
33logger = logging.getLogger(__name__)
34
def extract_xlsx_text(blob: bytes, filename: str = "", max_rows: int = 500) -> str:
    """Extract spreadsheet content as markdown tables. Used at upload and on legacy re-read.

    Every non-empty sheet becomes a ``## Sheet: <name>`` heading followed by a
    markdown table whose header is the sheet's first row. Data rows are padded
    or cut to the header width so each table row has the same column count.

    Args:
        blob: Raw bytes of the .xlsx file.
        filename: Only used in the warning logged when extraction fails.
        max_rows: Row budget shared by ALL sheets combined.

    Returns:
        The markdown text, ``"[Empty spreadsheet]"`` when nothing was
        extracted, or ``"[XLSX extraction failed: ...]"`` on any error
        (this function never raises).
    """
    try:
        import openpyxl

        # data_only=False: formula cells are read as written, not as cached values.
        wb = openpyxl.load_workbook(io.BytesIO(blob), data_only=False, read_only=True)
        try:
            lines = []
            total_rows = 0
            truncated = False
            for sheet in wb.sheetnames:
                ws = wb[sheet]
                rows = []
                for row in ws.iter_rows(values_only=True):
                    if total_rows >= max_rows:
                        # A row exists beyond the budget, so data really was
                        # left out (also true when this is the last sheet).
                        truncated = True
                        break
                    rows.append(row)
                    total_rows += 1

                if rows:
                    lines.append(f"## Sheet: {sheet}")

                    header = [str(c) if c is not None else "" for c in rows[0]]
                    lines.append("| " + " | ".join(header) + " |")
                    lines.append("| " + " | ".join(["---"] * len(header)) + " |")
                    for row in rows[1:]:
                        cells = [str(c) if c is not None else "" for c in row]
                        # Force every data row to the header's width.
                        while len(cells) < len(header):
                            cells.append("")
                        cells = cells[:len(header)]
                        lines.append("| " + " | ".join(cells) + " |")

                if truncated:
                    lines.append(f"\n*[Truncated at {max_rows} rows — use run_python with openpyxl for full data]*")
                    break
        finally:
            # Read-only workbooks keep the source open until closed explicitly;
            # release it even when extraction fails part-way through.
            wb.close()
        return "\n".join(lines) if lines else "[Empty spreadsheet]"
    except Exception as e:
        logger.warning(f"XLSX text extraction failed for {filename}: {e}")
        return f"[XLSX extraction failed: {e}]"
72
def extract_pptx_text(blob: bytes, filename: str = "") -> str:
    """Extract presentation content as structured markdown. Used at upload and on legacy re-read.

    Per slide, in order: a ``## Slide N[: title]`` heading, the stripped
    non-empty paragraphs of every text-bearing shape (paragraphs equal to the
    title are skipped), one pipe-delimited line per table row, an optional
    ``> Notes:`` line, then a blank separator line.

    Returns the markdown, ``"[Empty presentation]"`` when there are no slides,
    or ``"[PPTX extraction failed: ...]"`` on any error (never raises).
    """
    try:
        from pptx import Presentation

        deck = Presentation(io.BytesIO(blob))
        out = []
        for number, slide in enumerate(prs_slides := deck.slides, 1) if False else enumerate(deck.slides, 1):
            title = ""
            if slide.shapes.title:
                title = slide.shapes.title.text
            heading = f"## Slide {number}"
            if title:
                heading += f": {title}"
            out.append(heading)

            for shape in slide.shapes:
                if shape.has_text_frame:
                    stripped = (p.text.strip() for p in shape.text_frame.paragraphs)
                    # Drop blanks and the title itself (already in the heading).
                    out.extend(t for t in stripped if t and t != title)
                if shape.has_table:
                    for table_row in shape.table.rows:
                        cell_texts = [cell.text.strip() for cell in table_row.cells]
                        out.append("| " + " | ".join(cell_texts) + " |")

            if slide.has_notes_slide and slide.notes_slide.notes_text_frame:
                speaker_notes = slide.notes_slide.notes_text_frame.text.strip()
                if speaker_notes:
                    out.append(f"\n> Notes: {speaker_notes}")
            out.append("")

        if not out:
            return "[Empty presentation]"
        return "\n".join(out)
    except Exception as e:
        logger.warning(f"PPTX text extraction failed for {filename}: {e}")
        return f"[PPTX extraction failed: {e}]"
104
# MIME types treated as "may be a legacy Word binary" for on-demand conversion.
_DOC_MIME_TYPES = {"application/msword", "application/x-ole-storage", "application/octet-stream"}


def _ensure_docx_blob(doc, session: "WorkspaceSession") -> bool:
    """
    Ensure a document has a valid docx_blob.

    If the document only has a binary_blob (legacy .doc upload), attempts
    to convert it to .docx via the LibreOffice service. On success, stores
    the converted bytes as docx_blob and saves the session.

    Conversion is attempted only when the MIME type is in ``_DOC_MIME_TYPES``
    or the description ends in ``.doc`` / ``.dot``. The service URL comes from
    ``LIBREOFFICE_SERVICE_URL`` (default ``http://localhost:8002``).

    Best-effort: any failure (service down, bad status, missing ``requests``)
    is logged as a warning and reported as ``False``; nothing is raised.

    Returns True if docx_blob is available (existing or newly converted).
    """
    if doc.docx_blob is not None:
        return True

    blob = getattr(doc, "binary_blob", None)
    if not blob:
        return False

    mime = getattr(doc, "mime_type", None) or ""
    # description may exist but be None, so default after the lookup, not in it.
    description = getattr(doc, "description", None) or ""
    is_doc_path = description.lower().endswith((".doc", ".dot"))
    if mime not in _DOC_MIME_TYPES and not is_doc_path:
        return False

    libreoffice_url = os.environ.get("LIBREOFFICE_SERVICE_URL", "http://localhost:8002")
    filename = description or "document.doc"
    try:
        # Imported inside the try so a missing dependency degrades to
        # "conversion unavailable" instead of crashing the calling tool.
        import requests as http_requests

        resp = http_requests.post(
            f"{libreoffice_url}/convert",
            files={"file": (filename, blob, "application/msword")},
            params={"format": "docx"},
            timeout=120,
        )
        if resp.status_code == 200 and resp.headers.get("content-type", "").startswith("application/"):
            docx_bytes = resp.content

            # Refresh the HTML view from the converted DOCX when possible;
            # otherwise keep the content the document already had.
            html_content = doc.content
            try:
                from ..docx_service import DocxService
                html_content, _ = DocxService.docx_to_html(docx_bytes)
            except Exception as html_err:
                logger.warning(f".docx → HTML refresh failed for {filename}, keeping existing content: {html_err}")
            doc.update_docx(docx_bytes, html_content)
            doc.content = html_content
            session.save()
            logger.info(f".doc → .docx on-demand conversion succeeded for {filename}")
            return True
        else:
            logger.warning(f".doc → .docx conversion failed (status {resp.status_code})")
    except Exception as e:
        logger.warning(f".doc → .docx conversion unavailable: {e}")

    return False
162
# Short reminder attached to read_document's view="xml" response (key
# "xml_editing_reference"): points the model back at plain-text edit_document
# for ordinary edits.
_XML_EDITING_REFERENCE = """
## XML Editing Quick Reference (Advanced)

NOTE: For normal editing, just use edit_document with plain text.
The system generates tracked changes automatically.

This reference is only needed for direct XML manipulation (advanced).
""".strip()
171
def list_documents(session: WorkspaceSession, folder: str = None, **kwargs) -> Dict[str, Any]:
    """Return an inventory of the workspace, optionally restricted to one folder.

    The result lists documents (with metadata), workspace text files, skill
    files, template files, the active document and the sorted folder set.
    With ``folder`` set, documents must live under ``<folder>/``; workspace
    files may also match the folder name exactly. Skills, templates and
    folders are never filtered.
    """
    filtering = bool(folder)
    folder_root = folder.rstrip('/') if filtering else None
    folder_prefix = folder_root + '/' if filtering else None

    doc_entries = []
    for doc_path, document in session.documents.items():
        if filtering and not doc_path.startswith(folder_prefix):
            continue

        # Containing folder with a trailing slash, or "" for root-level docs.
        parent, slash, _leaf = doc_path.replace("\\", "/").rpartition("/")
        containing_folder = parent + "/" if slash else ""

        doc_entries.append({
            "path": doc_path,
            "folder": containing_folder,
            "description": document.description,
            "created_at": document.created_at.isoformat(),
            "modified_at": document.modified_at.isoformat(),
            "size": len(document.content),
            "is_active": doc_path == session.active_document,
            "format": document.format,
            "has_docx": document.docx_blob is not None,
            "has_binary": document.binary_blob is not None,
            "mime_type": document.mime_type,
            "is_synced": document.is_synced
        })

    file_entries = [
        {
            "path": wf_path,
            "type": "workspace_file",
            "size": len(wf_content),
            "editable": True,
        }
        for wf_path, wf_content in session.workspace_files.items()
        if not filtering
        or wf_path.startswith(folder_prefix)
        or wf_path == folder_root
    ]

    skill_files = session.get_skill_files()
    template_files = session.get_template_files()

    return {
        "success": True,
        "documents": doc_entries,
        "workspace_files": file_entries,
        "skills": skill_files,
        "templates": template_files,
        "count": len(doc_entries),
        "workspace_file_count": len(file_entries),
        "active_document": session.active_document,
        "folders": sorted(session.folders),
    }
228
def _apply_range(
    content: str,
    around_text: Optional[str] = None,
    context_chars: int = 2000,
    start_text: Optional[str] = None,
    end_text: Optional[str] = None,
    paragraph_range: Optional[List[int]] = None,
) -> Dict[str, Any]:
    """Slice ``content`` according to the requested range parameters.

    Returns ``{content, range_info}``. If a requested range can't be
    satisfied (anchor not found, indices out of bounds, end before start),
    returns ``{error: str}`` instead. Caller should wrap in the standard
    ``{success: False, error}`` shape.

    Modes (mutually exclusive — caller must validate that at most one
    is set; if multiple are set, ``around_text`` wins, then range, then
    paragraphs):
      - around_text: window of ``context_chars`` total around the first
        occurrence of ``around_text``.
      - start_text / end_text: content between two anchors (inclusive). If
        ``end_text`` is missing, return from ``start_text`` to EOF.
      - paragraph_range: ``[start_idx, end_idx]`` slice of paragraphs split
        on single ``\\n``. Inclusive range; an end past the last paragraph
        is clamped, an end before the start is an error.

    With no mode set, the full content is returned with ``range_info=None``.
    """
    total = len(content)

    if around_text:
        idx = content.find(around_text)
        if idx < 0:
            return {"error": f"around_text not found in document: {around_text!r}"}
        match_end = idx + len(around_text)
        # Split whatever budget remains after the anchor evenly on both sides.
        half = max(0, context_chars - len(around_text)) // 2
        start = max(0, idx - half)
        end = min(total, match_end + half)
        return {
            "content": content[start:end],
            "range_info": {
                "mode": "around_text",
                "anchor": around_text,
                "anchor_offset": idx,
                "start_offset": start,
                "end_offset": end,
                "total_size": total,
            },
        }

    if start_text is not None:
        s_idx = content.find(start_text)
        if s_idx < 0:
            return {"error": f"start_text not found in document: {start_text!r}"}
        if end_text:
            # Only look for the end anchor after the start anchor.
            e_idx = content.find(end_text, s_idx + len(start_text))
            if e_idx < 0:
                # End anchor absent: degrade to "start anchor to EOF" and say so.
                end = total
                end_anchor_found = False
            else:
                end = e_idx + len(end_text)
                end_anchor_found = True
        else:
            end = total
            end_anchor_found = None  # no end anchor was requested
        return {
            "content": content[s_idx:end],
            "range_info": {
                "mode": "start_end_text",
                "start_anchor": start_text,
                "end_anchor": end_text,
                "end_anchor_found": end_anchor_found,
                "start_offset": s_idx,
                "end_offset": end,
                "total_size": total,
            },
        }

    if paragraph_range is not None:
        if len(paragraph_range) != 2:
            return {"error": "paragraph_range must be [start_idx, end_idx]"}
        p_start, p_end = paragraph_range
        paragraphs = content.split("\n")
        n = len(paragraphs)
        if p_start < 0 or p_start >= n:
            return {"error": f"paragraph_range start {p_start} out of bounds (0..{n-1})"}
        if p_end < p_start:
            # Previously produced an empty slice reported as success, which
            # looks like an empty document to the caller.
            return {"error": f"paragraph_range end {p_end} is before start {p_start}"}

        p_end_clamped = min(p_end, n - 1)
        sliced = paragraphs[p_start : p_end_clamped + 1]
        return {
            "content": "\n".join(sliced),
            "range_info": {
                "mode": "paragraph_range",
                "requested": [p_start, p_end],
                "applied": [p_start, p_end_clamped],
                "paragraph_count_total": n,
                "paragraph_count_returned": len(sliced),
            },
        }

    return {"content": content, "range_info": None}
328
def read_document(
    session: "WorkspaceSession",
    path: str,
    view: str = "text",
    around_text: Optional[str] = None,
    context_chars: int = 2000,
    start_text: Optional[str] = None,
    end_text: Optional[str] = None,
    paragraph_range: Optional[List[int]] = None,
    **kwargs
) -> Dict[str, Any]:
    """
    Read the content of a document.

    Lookup order: session documents, then workspace files, then skill files
    (``Skills/`` or ``skills/`` prefix). A missing or very short path falls
    back to the active document; with no path and no active document an
    error is returned.

    Dual-mode branching:
    - DOCX-native (has docx_blob):
        - view="text" (default): plain text for analysis
        - view="xml": pretty-printed document.xml for editing
    - HTML-native (no docx_blob): return HTML content as-is

    Range params (optional, mutually exclusive — at most one should be
    set; multiple set evaluates in this priority order: around_text,
    start_text/end_text, paragraph_range):
    - around_text + context_chars: slice ±context_chars/2 around the
      first occurrence of around_text. Use after edits to verify a
      specific region without re-reading the whole document.
    - start_text + end_text: slice between two anchors, inclusive. If
      end_text is missing in the doc, returns start_text to EOF.
    - paragraph_range: [start_idx, end_idx] slice of paragraphs (split
      on \\n), inclusive. Out-of-bounds end is clamped to last paragraph.

    Range params apply only to text view of DOCX, workspace_files, and
    HTML docs. They're ignored for view="xml" (where you want the full
    OOXML for structural work).

    Returns a dict with ``success``; on success also ``content``, ``size``,
    ``doc_type`` and ``view`` (plus ``range_info`` when a range was applied),
    on failure an ``error`` message.
    """
    # Paths shorter than 3 characters are treated as garbled model output.
    if not path or not path.strip() or len(path.strip()) < 3:
        if session.active_document:
            logger.info(f"[READ-DOC] No valid path provided, using active document: {session.active_document}")
            path = session.active_document
        elif not path or not path.strip():
            # Nothing to look up (path may even be None): fail cleanly, the
            # same way edit_document does, rather than crash further down.
            return {
                "success": False,
                "error": "No document path provided and no active document set.",
                "available_documents": list(session.documents.keys()),
                "available_workspace_files": list(session.workspace_files.keys()),
            }

    range_kwargs = dict(
        around_text=around_text,
        context_chars=context_chars,
        start_text=start_text,
        end_text=end_text,
        paragraph_range=paragraph_range,
    )
    range_active = bool(around_text or start_text or paragraph_range)

    def _maybe_slice(resp: Dict[str, Any]) -> Dict[str, Any]:
        """Apply range params to a successful text response, if requested."""
        if not range_active or not resp.get("success"):
            return resp
        sliced = _apply_range(resp.get("content", ""), **range_kwargs)
        if "error" in sliced:
            return {"success": False, "error": sliced["error"], "path": resp.get("path")}
        resp["content"] = sliced["content"]
        resp["size"] = len(sliced["content"])
        resp["range_info"] = sliced["range_info"]
        return resp

    doc = session.get_document(path)

    if not doc:
        # Not a session document: try workspace text files next.
        wf_content = session.get_workspace_file(path)
        if wf_content is not None:
            return _maybe_slice({
                "success": True,
                "path": path,
                "content": wf_content,
                "size": len(wf_content),
                "doc_type": "workspace_file",
                "view": "text",
                "editable": True,
            })

        # Skill files: accept the lowercase "skills/" prefix as an alias.
        _canonical_path = path if path.startswith("Skills/") else (
            "Skills/" + path[len("skills/"):] if path.startswith("skills/") else None
        )
        if _canonical_path:
            content = session.read_skill_file(_canonical_path)
            if content is not None:
                return _maybe_slice({
                    "success": True,
                    "path": path,
                    "content": content,
                    "size": len(content),
                    "doc_type": "skill",
                    "view": "text",
                    "editable": False,
                })

        return {
            "success": False,
            "error": f"Document not found: {path}",
            "available_documents": list(session.documents.keys()),
            "available_workspace_files": list(session.workspace_files.keys()),
        }

    # Legacy .doc uploads are converted on demand; failure simply leaves
    # docx_blob unset and the document is served from its stored content.
    _ensure_docx_blob(doc, session)

    if doc.docx_blob is not None:
        doc_type = "docx"

        if view == "xml":
            # Raw OOXML view: never range-sliced.
            xml_content = doc.document_xml
            if xml_content is None:
                return {
                    "success": False,
                    "error": "Failed to extract XML from DOCX blob",
                    "doc_type": "docx"
                }

            return {
                "success": True,
                "path": path,
                "content": xml_content,
                "xml_editing_reference": _XML_EDITING_REFERENCE,
                "description": doc.description,
                "modified_at": doc.modified_at.isoformat(),
                "size": len(xml_content),
                "doc_type": "docx",
                "view": "xml",
                "format": doc.format,
                "has_docx": True,
                "is_synced": doc.is_synced,
                "docx_size": len(doc.docx_blob) if doc.docx_blob else 0,
                "hint": (
                    "This is the raw XML of word/document.xml (advanced view). "
                    "For editing, use edit_document with plain text old_text and "
                    "new_text — the system generates tracked changes automatically."
                ),
            }

        try:
            from ..docx_xml_service import extract_plain_text
            content = extract_plain_text(doc.docx_blob)
        except Exception as e:
            # Fall back to the stored content rather than failing the read.
            logger.warning(f"DOCX text extraction failed for {path}: {e}")
            content = doc.content

        return _maybe_slice({
            "success": True,
            "path": path,
            "content": content,
            "description": doc.description,
            "modified_at": doc.modified_at.isoformat(),
            "size": len(content),
            "doc_type": doc_type,
            "view": "text",
            "format": doc.format,
            "has_docx": True,
            "is_synced": doc.is_synced,
            "docx_size": len(doc.docx_blob) if doc.docx_blob else 0,
            "hint": (
                "This is plain text extracted from the DOCX. "
                "To edit, use edit_document with old_text (exact text to change) "
                "and new_text (replacement). Tracked changes are generated automatically."
            ),
        })

    # Legacy re-read: content is still the "[Binary file:" placeholder, so
    # extract text from the blob now. The result is stored on the in-memory
    # document; the session is not saved here.
    if doc.binary_blob and doc.content.startswith("[Binary file:"):
        if doc.format == "xlsx" or (doc.mime_type and "spreadsheet" in doc.mime_type):
            doc.content = extract_xlsx_text(doc.binary_blob, doc.description or path)
        elif doc.format == "pptx" or (doc.mime_type and "presentation" in doc.mime_type):
            doc.content = extract_pptx_text(doc.binary_blob, doc.description or path)

    return _maybe_slice({
        "success": True,
        "path": path,
        "content": doc.content,
        "description": doc.description,
        "modified_at": doc.modified_at.isoformat(),
        "size": len(doc.content),
        "doc_type": "html",
        "view": "text",
        "format": doc.format,
        "has_docx": False,
        "is_synced": doc.is_synced,
    })
512
def create_document(
    session: WorkspaceSession,
    path: str,
    content: str,
    description: str = "",
    **kwargs
) -> Dict[str, Any]:
    """
    Create or overwrite a text document in the workspace.

    Routing, in order:
    1. Paths ending in .docx/.xlsx/.pptx/.pdf are rejected — use ``run_code``
       or ``clone_document`` for binary documents.
    2. ``anylegal.md``, ``*/anylegal.md`` and ``Playbook/*`` are stored as
       workspace files (``Playbook/positions.md`` also updates the session
       playbook). The session is saved and the result returned immediately.
    3. ``Skills/`` and ``Templates/`` are refused as not writable here.
    4. Anything else becomes a session document and the active document:
       ``.md`` paths are kept as markdown; other paths are additionally
       converted with ``DocxService.markdown_to_docx``. A conversion failure
       is logged and reported in ``docx_error`` without failing the call.

    (Historically named ``write_document``; renamed Feb 2026. The alias was
    removed Apr 2026 once all callers and model calls migrated.)
    """
    BINARY_DOC_EXTS = (".docx", ".xlsx", ".pptx", ".pdf")
    lowered = path.lower()
    if lowered.endswith(BINARY_DOC_EXTS):
        ext = lowered.rsplit(".", 1)[-1]
        return {
            "success": False,
            "error": (
                f"create_document does not produce binary documents. "
                f"For .{ext} files, use run_code with python-docx / openpyxl / "
                f"python-pptx, or call clone_document to copy a template. "
                f"For text/markdown content, save to a .md path instead."
            ),
        }

    def _remember_parent_folder() -> None:
        """Register the path's containing folder (if any) on the session."""
        parent, slash, _leaf = path.replace("\\", "/").rpartition("/")
        if slash:
            session.folders.add(parent + "/")

    if path == "anylegal.md" or path.endswith("/anylegal.md") or path.startswith("Playbook/"):
        existed_before = path in session.workspace_files
        session.set_workspace_file(path, content)
        if path == "Playbook/positions.md":
            session.set_playbook(content)

        _remember_parent_folder()
        session.save()
        outcome = {
            "success": True,
            "path": path,
            "action": "overwritten" if existed_before else "created",
            "size": len(content),
            "doc_type": "workspace_file",
        }

        # Folder-level instructions written while the root file is still
        # blank: the model may have meant the root file.
        if path.endswith("/anylegal.md"):
            root_instructions = session.workspace_files.get("anylegal.md", "")
            if not (root_instructions and root_instructions.strip()):
                outcome["warning"] = (
                    f"You created folder-level instructions at '{path}', but the root "
                    f"anylegal.md (shown as 'Instructions' in the sidebar) is still empty. "
                    f"If this was meant to be the user's main instructions, use path "
                    f"'anylegal.md' instead. Folder-level instructions only apply to "
                    f"documents inside that specific folder."
                )
        return outcome

    if path.startswith("Skills/"):
        return {
            "success": False,
            "error": f"'{path}' is read-only. Skills/ contains system skill files.",
        }

    if path.startswith("Templates/"):
        filename = path.replace("\\", "/").split("/")[-1]
        return {
            "success": False,
            "error": (
                f"Cannot write to Templates/ — only users can manage templates. "
                f"Save the document to a regular path instead, e.g. '{filename}' or "
                f"'Client Projects/{filename}'."
            ),
        }

    wants_markdown = lowered.endswith('.md')
    newly_created = path not in session.documents

    session.add_document(
        path=path,
        content=content,
        description=description,
        set_active=True
    )
    _remember_parent_folder()

    outcome = {
        "success": True,
        "path": path,
        "action": "created" if newly_created else "overwritten",
        "size": len(content),
        "is_active": True,
    }

    if wants_markdown:
        # Keep the markdown as-is; the raw bytes are stored as the blob.
        stored = session.get_document(path)
        if stored:
            stored.format = "markdown"
            stored.mime_type = "text/markdown"
            stored.binary_blob = content.encode('utf-8')
        outcome["format"] = "markdown"
        outcome["has_docx"] = False
    else:
        # Best-effort DOCX rendering of the markdown content.
        try:
            from ..docx_service import DocxService

            stored = session.get_document(path)
            if stored:
                docx_bytes = DocxService.markdown_to_docx(content)
                stored.update_docx(docx_bytes, content)
                outcome["format"] = "docx"
                outcome["docx_size"] = len(docx_bytes)
                outcome["has_docx"] = True
        except Exception as e:
            logger.error(f"Markdown→DOCX conversion failed for {path}: {e}", exc_info=True)
            outcome["format"] = "markdown"
            outcome["has_docx"] = False
            outcome["docx_error"] = str(e)

    session.save()
    return outcome
648
def edit_document(
    session: "WorkspaceSession",
    path: str,
    old_text: str = "",
    new_text: str = "",
    explanation: str = "",
    start_text: str = "",
    end_text: str = "",
    near_text: str = "",
    **kwargs
) -> Dict[str, Any]:
    """
    Find and replace content in a document.

    Branching:
    - Workspace file (path is not a session document): unique-match
      ``str.replace``; Skills/ and Templates/ paths are refused.
    - DOCX-native (has docx_blob), tried in this order:
        1. Range delete: when both start_text and end_text are given.
        2. Text-level edit: find old_text in <w:t> content, generate tracked
           changes (w:del/w:ins) automatically. The LLM sends plain text.
        3. Raw XML fallback: if old_text is an XML fragment found in the raw
           document.xml, do direct string replacement (advanced).
      The write target comes from ``resolve_or_clone_to_v2`` and may be a
      working copy rather than ``path`` (reported as ``cloned_from``).
    - HTML-native (no docx_blob): delegated to ``session.edit_document``.

    A missing or very short path falls back to the active document.

    Returns a dict with ``success``; failures carry ``error`` and, where
    available, ``suggestion`` / ``nearby_text``. This function does not raise
    for DOCX edit errors — they are logged and returned as a failure.
    """
    # Paths shorter than 3 characters are treated as garbled model output.
    if not path or not path.strip() or len(path.strip()) < 3:
        if session.active_document:
            logger.info(f"[DOCX-EDIT] No valid path provided, using active document: {session.active_document}")
            path = session.active_document
        else:
            return {"success": False, "error": "No document path provided and no active document set."}

    doc = session.get_document(path)
    if not doc:
        # Not a session document: maybe a workspace text file.
        wf_content = session.get_workspace_file(path)
        if wf_content is not None:
            if old_text not in wf_content:
                return {
                    "success": False,
                    "error": f"Text not found in workspace file: {path}",
                    "doc_type": "workspace_file",
                }
            count = wf_content.count(old_text)
            if count > 1:
                return {
                    "success": False,
                    "error": f"Text appears {count} times. Include more context to make it unique.",
                    "doc_type": "workspace_file",
                }
            new_content = wf_content.replace(old_text, new_text, 1)
            session.set_workspace_file(path, new_content)

            # Keep the session's playbook copy in step with the file.
            if path == "Playbook/positions.md":
                session.playbook = new_content
            session.save()
            return {
                "success": True,
                "path": path,
                "doc_type": "workspace_file",
                "explanation": explanation or "",
            }

        if path.startswith("Skills/"):
            return {
                "success": False,
                "error": f"'{path}' is read-only and cannot be edited.",
            }

        if path.startswith("Templates/"):
            return {
                "success": False,
                "error": f"'{path}' is a template and cannot be edited by the agent. Use read_document to read it.",
            }

        return {
            "success": False,
            "error": f"Document not found: {path}",
            "available_documents": list(session.documents.keys()),
            "available_workspace_files": list(session.workspace_files.keys()),
        }

    # Convert a legacy .doc on demand so it can take the DOCX path below.
    _ensure_docx_blob(doc, session)

    if doc.docx_blob is not None:
        # Redirect the write to the path chosen by resolve_or_clone_to_v2.
        resolved_path, cloned_from = resolve_or_clone_to_v2(session, path)
        if resolved_path != path:
            path = resolved_path
            doc = session.get_document(path)
            if doc is None or doc.docx_blob is None:
                return {
                    "success": False,
                    "error": (
                        f"auto-clone-to-v2 produced path {resolved_path!r} "
                        f"but the document is missing or has no DOCX blob"
                    ),
                }
        try:
            from ..docx_xml_service import (
                apply_text_edit,
                validate_document_xml,
            )

            xml_content = doc.document_xml
            if xml_content is None:
                return {
                    "success": False,
                    "error": "Failed to extract XML from DOCX blob",
                    "doc_type": "docx"
                }

            new_xml = None
            edit_info = {}
            info = {}

            # Strategy 1: range delete between two anchors.
            if start_text and end_text:
                from ..docx_xml_service import apply_range_delete
                result_xml, info = apply_range_delete(
                    xml_content, start_text, end_text
                )
                if result_xml is not None:
                    new_xml = result_xml
                    edit_info = info
                    logger.info(
                        f"[DOCX-EDIT] Range delete on '{path}': "
                        f"{info.get('paragraphs_deleted', 0)} paragraphs"
                    )
                else:
                    return {
                        "success": False,
                        "error": info.get("error", "Range delete failed."),
                        "suggestion": info.get("suggestion", ""),
                        "doc_type": "docx",
                    }

            # Strategy 2: plain-text edit rendered as tracked changes.
            if new_xml is None and old_text:
                result_xml, info = apply_text_edit(
                    xml_content, old_text, new_text, near_text=near_text
                )
                if result_xml is not None:
                    new_xml = result_xml
                    edit_info = info
                    logger.info(
                        f"[DOCX-EDIT] Text-level edit on '{path}': "
                        f"matched '{info.get('matched_text', '')[:60]}'"
                    )

            # Strategy 3: raw XML replacement. Requires a non-empty old_text:
            # "" is a substring of every string, so without this guard an
            # empty old_text matched at every offset and was reported as
            # "appears N times" instead of the missing-arguments error below.
            if new_xml is None and old_text and old_text in xml_content:
                from ..docx_xml_service import (
                    _toc_paragraph_ranges,
                    _in_tracked_change,
                )

                toc_ranges = _toc_paragraph_ranges(xml_content)

                # Count occurrences for which _in_tracked_change(pos,
                # toc_ranges) is false — presumably those outside TOC
                # paragraphs; the helpers' exact semantics are not visible
                # here. The first such occurrence is the edit target.
                body_count = 0
                body_pos = None
                search_start = 0
                while True:
                    pos = xml_content.find(old_text, search_start)
                    if pos == -1:
                        break
                    if not _in_tracked_change(pos, toc_ranges):
                        body_count += 1
                        if body_pos is None:
                            body_pos = pos
                    search_start = pos + 1

                if body_count > 1:
                    return {
                        "success": False,
                        "error": (
                            f"XML fragment appears {body_count} times in document body. "
                            "Include more context to make it unique."
                        ),
                        "doc_type": "docx",
                    }
                elif body_count == 1:
                    new_xml = (
                        xml_content[:body_pos]
                        + new_text
                        + xml_content[body_pos + len(old_text) :]
                    )
                    edit_info = {"mode": "raw_xml"}
                    logger.info(f"[DOCX-EDIT] Raw XML edit on '{path}' (TOC-aware)")

            if new_xml is None:
                if not old_text and not (start_text and end_text):
                    return {
                        "success": False,
                        "error": "Provide old_text/new_text for editing, or start_text/end_text for range deletion.",
                        "doc_type": "docx",
                    }
                # `info` holds the diagnostics of the last strategy that ran.
                error_msg = info.get("error", "Text not found in document.")
                return {
                    "success": False,
                    "error": error_msg,
                    "suggestion": info.get("suggestion", ""),
                    "nearby_text": info.get("nearby_text", ""),
                    "doc_type": "docx",
                }

            # Only XML parse errors block the edit; other validation findings
            # are returned to the caller as warnings.
            errors = validate_document_xml(new_xml)
            if errors:
                logger.warning(f"[DOCX-EDIT] Validation warnings: {errors}")
                parse_errors = [
                    e for e in errors if e.startswith("XML parse error")
                ]
                if parse_errors:
                    return {
                        "success": False,
                        "error": (
                            "Edit produced invalid XML. "
                            f"Errors: {'; '.join(parse_errors)}"
                        ),
                        "doc_type": "docx",
                        "validation_errors": errors,
                    }

            doc.update_document_xml(new_xml)

            # A failed save is logged but does not fail the edit: the
            # in-memory document has already been updated.
            try:
                session.save()
            except Exception as save_err:
                logger.warning(f"[DOCX-EDIT] Session save failed: {save_err}")

            result = {
                "success": True,
                "path": path,
                "explanation": explanation or "",
                "doc_type": "docx",
                "has_docx": True,
                "docx_updated": True,
            }

            if cloned_from:
                result["cloned_from"] = cloned_from
                result["message"] = (
                    f"First edit on '{cloned_from}' — created working copy "
                    f"'{path}'. Original preserved; further edits in this "
                    f"session will target the working copy."
                )
            if edit_info.get("matched_text"):
                result["matched_text"] = edit_info["matched_text"]
            if edit_info.get("revision_ids"):
                result["revision_ids"] = edit_info["revision_ids"]
            if new_text:
                result["replacement_text"] = new_text
            if errors:
                result["validation_warnings"] = errors

            # Best-effort context snippet; never fail a completed edit over it.
            try:
                from ..docx_xml_service import _context_around_revision
                ctx = _context_around_revision(
                    new_xml, edit_info.get("revision_ids", [])
                )
                if ctx:
                    result["context_around_edit"] = ctx
            except Exception as ctx_err:
                logger.debug(f"[DOCX-EDIT] Context snippet unavailable: {ctx_err}")

            return result

        except Exception as e:
            logger.error(f"DOCX edit failed for {path}: {e}", exc_info=True)
            return {
                "success": False,
                "error": f"DOCX edit failed: {str(e)}",
                "doc_type": "docx"
            }

    # HTML-native document: the session performs the replacement.
    result = session.edit_document(path, old_text, new_text)

    if result["success"]:
        if explanation:
            result["explanation"] = explanation
        result["doc_type"] = "html"

        updated_doc = session.get_document(path)
        if updated_doc:
            result["content"] = updated_doc.content

        if doc and doc.docx_blob:
            result["has_docx"] = True
            result["is_synced"] = doc.is_synced

    return result
936
def clone_document(
    session: "WorkspaceSession",
    source_path: str,
    target_path: str = "",
    **kwargs,
) -> Dict[str, Any]:
    """
    Clone a document to create the next version in a version chain.

    Law firm versioning: original → v2 → v3 → v4. All versions preserved.
    The model always passes the ORIGINAL document path. The backend automatically:
    1. Finds the latest existing version (v2, v3, etc.)
    2. Clones FROM the latest version TO the next version number
    3. Sets the new version as the active document

    Example chain:
        clone_document("Contract.docx") → clones original → Contract_v2.docx
        clone_document("Contract.docx") → clones v2 → Contract_v3.docx
        clone_document("Contract.docx") → clones v3 → Contract_v4.docx

    Source resolution is forgiving: the version-stripped path is tried first,
    then the path as given, then any existing later version, and finally the
    active document. If nothing resolves, an error listing the available
    documents is returned.

    An explicit ``target_path`` is used only if it is free; if a document
    already exists there, the next free versioned path is used instead.

    Returns ``{success, path, source_path, version, format, has_docx}`` on
    success, or ``{success: False, error}``.
    """
    logger.info(f"clone_document: source_path={source_path!r}, target_path={target_path!r}")

    base_path = _strip_version_suffix(source_path)

    base_doc = session.get_document(base_path)
    if not base_doc:
        # The stripped path may not exist (e.g. the "original" really does
        # carry a _vN suffix): try the path exactly as given.
        base_doc = session.get_document(source_path)
        if base_doc:
            base_path = source_path
        else:
            latest_path, next_version = _find_latest_version(session, base_path)
            if latest_path != base_path and session.get_document(latest_path):
                # The base is gone but a later version exists; it is picked
                # up by the latest-version lookup below.
                logger.info(f"clone_document: base {base_path!r} not found, but found {latest_path}")
            elif session.active_document:
                # Last resort for garbled model paths: clone the active doc.
                logger.warning(f"clone_document: garbled path {source_path!r}, falling back to active document: {session.active_document}")
                source_path = session.active_document
                base_path = _strip_version_suffix(source_path)
                base_doc = session.get_document(base_path)
                if not base_doc:
                    base_doc = session.get_document(source_path)
                    if base_doc:
                        base_path = source_path
            else:
                available = list(session.documents.keys())
                logger.warning(f"clone_document: source not found: {source_path}. Available: {available}")
                return {
                    "success": False,
                    "error": f"Source document not found: {source_path}. Available documents: {available}",
                }

    latest_path, next_version = _find_latest_version(session, base_path)
    latest_doc = session.get_document(latest_path)
    if not latest_doc:
        # No later version stored: clone from the base document itself.
        latest_doc = base_doc
        latest_path = base_path

    if latest_doc is None:
        # Reachable when the active-document fallback points at a path that
        # no longer resolves; previously this crashed in _ensure_docx_blob.
        available = list(session.documents.keys())
        logger.warning(f"clone_document: source not found: {source_path}. Available: {available}")
        return {
            "success": False,
            "error": f"Source document not found: {source_path}. Available documents: {available}",
        }

    _ensure_docx_blob(latest_doc, session)

    if not latest_doc.docx_blob and not latest_doc.content and not latest_doc.binary_blob:
        return {
            "success": False,
            "error": f"Source document '{latest_path}' has no content to clone.",
        }

    if not target_path or not target_path.strip():
        target_path = _version_path(base_path, next_version)

    # Never overwrite an existing document: bump the version until free.
    while target_path in session.documents:
        next_version += 1
        target_path = _version_path(base_path, next_version)

    session.add_document(
        path=target_path,
        content=latest_doc.content or "",
        description=f"v{next_version} of {Path(base_path).name}",
        set_active=True,
    )

    doc = session.get_document(target_path)
    if doc:
        # Carry over the binary payload: DOCX if present, else the raw blob.
        if latest_doc.docx_blob:
            doc.update_docx(latest_doc.docx_blob, latest_doc.content or "")
            doc.format = "docx"
        elif latest_doc.binary_blob:
            doc.binary_blob = latest_doc.binary_blob
            doc.mime_type = getattr(latest_doc, "mime_type", None) or "application/octet-stream"
            doc.format = getattr(latest_doc, "format", "other")

    norm = target_path.replace("\\", "/")
    if "/" in norm:
        folder = norm.rsplit("/", 1)[0] + "/"
        session.folders.add(folder)

    session.save()

    cloned_from = latest_path if latest_path != base_path else "original"
    # Separator added: source and target used to be printed run together.
    logger.info(f"clone_document: {cloned_from} → {target_path!r} (v{next_version}, docx={bool(latest_doc.docx_blob)})")
    return {
        "success": True,
        "path": target_path,
        "source_path": latest_path,
        "version": next_version,
        "format": "docx" if latest_doc.docx_blob else "other",
        "has_docx": bool(latest_doc.docx_blob),
    }
1047
1048def _has_version_suffix(path: str) -> bool:
1049 """True if path ends in ``_v{N}.{ext}`` — model is working on an explicit version."""
1050 p = Path(path)
1051 return bool(re.search(r'_v\d+$', p.stem))
1052
def resolve_or_clone_to_v2(
    session: "WorkspaceSession",
    path: str,
) -> tuple:
    """
    Resolve the path a mutating edit should actually write to, auto-cloning
    an original to a new ``_v{N}`` working copy when needed so the original
    stays available as the pristine reference.

    Rules, in the order they are checked:
      * ``AUTO_CLONE_ON_FIRST_EDIT=false`` (case-insensitive) disables the
        whole mechanism → ``(path, None)``. Useful for tests that want
        deterministic in-place edits.
      * ``path`` already carries a ``_v{N}`` suffix → ``(path, None)``. The
        caller explicitly picked that version; edits keep chaining onto it.
      * ``path`` is not in the session, or the document has no DOCX blob
        (e.g. a plain-text workspace file) → ``(path, None)``. The
        downstream tool reports any missing-blob error.
      * ``path`` is an original and a ``_v{N}`` already exists:
          - if the latest ``_v{N}`` has a non-``None`` ``finalized_at``,
            clone to ``_v{N+1}`` and return the new path ("round-bump":
            a finalize closes a round, the next edit opens a new version);
            if that clone fails, fall back to the existing ``_v{N}``;
          - otherwise redirect to the existing ``_v{N}`` so edits within a
            round keep piling onto the same working copy.
      * ``path`` is an original and no ``_v{N}`` exists → clone (to
        ``_v2`` by ``clone_document``'s default naming) and return the new
        path; if the clone fails, edit in place.

    Args:
        session: Workspace session holding the documents.
        path: Path the edit was addressed to.

    Returns:
        ``(target_path, cloned_from)``. ``cloned_from`` is the source path
        when a new version was just created and ``None`` otherwise
        (including when the caller was merely redirected to an existing
        ``_v{N}``), so callers can distinguish "new version created" from
        "still editing the working copy".
    """
    # ``os`` is imported at module level; no function-local import needed.
    if os.getenv("AUTO_CLONE_ON_FIRST_EDIT", "true").lower() == "false":
        return path, None

    if _has_version_suffix(path):
        return path, None

    base_path = path
    doc = session.get_document(base_path)
    if doc is None or not getattr(doc, "docx_blob", None):
        return path, None

    latest_path, next_version = _find_latest_version(session, base_path)

    if latest_path != base_path:
        # A working copy already exists; only a finalized one triggers a bump.
        latest_doc = session.get_document(latest_path)
        finalized = (
            latest_doc is not None
            and getattr(latest_doc, "finalized_at", None) is not None
        )
        if not finalized:
            return latest_path, None

        clone_result = clone_document(
            session=session,
            source_path=base_path,
            target_path=_version_path(base_path, next_version),
        )
        if clone_result.get("success"):
            target = clone_result["path"]
            # Fix: the two literals used to be concatenated with no
            # separator, gluing the base path to the latest path in the log.
            logger.info(
                f"resolve_or_clone_to_v2: round-bump on {base_path!r}: "
                f"{latest_path} was finalized, cloned to {target!r} for new round"
            )
            return target, latest_path

        logger.warning(
            f"resolve_or_clone_to_v2: round-bump clone failed "
            f"({clone_result.get('error')}); piling onto {latest_path}"
        )
        return latest_path, None

    # First edit on an original with no versions yet.
    result = clone_document(session=session, source_path=base_path)
    if not result.get("success"):
        logger.warning(
            f"resolve_or_clone_to_v2: clone of {base_path!r} failed "
            f"({result.get('error')}); editing in place"
        )
        return path, None

    target = result["path"]
    logger.info(
        f"resolve_or_clone_to_v2: first edit on {base_path!r} → cloned to "
        f"{target!r} (original preserved)"
    )
    return target, base_path
1147
1148def _strip_version_suffix(path: str) -> str:
1149 """Remove _v2, _v3 etc. from a path: Contract_v3.docx → Contract.docx."""
1150 p = Path(path)
1151 stem = re.sub(r'_v\d+$', '', p.stem)
1152 parent = str(p.parent) if str(p.parent) != "." else ""
1153 name = f"{stem}{p.suffix}"
1154 return f"{parent}/{name}" if parent else name
1155
1156def _version_path(base_path: str, version: int) -> str:
1157 """Generate a versioned path: Contract.docx + version=3 → Contract_v3.docx."""
1158 p = Path(base_path)
1159 stem = re.sub(r'_v\d+$', '', p.stem)
1160 suffix = p.suffix or ".docx"
1161 parent = str(p.parent) if str(p.parent) != "." else ""
1162 name = f"{stem}_v{version}{suffix}"
1163 return f"{parent}/{name}" if parent else name
1164
1165def _find_latest_version(session: "WorkspaceSession", base_path: str) -> tuple:
1166 """
1167 Find the highest version of a document in the workspace.
1168
1169 Returns (latest_path, next_version_number).
1170 If no versions exist, returns (base_path, 2).
1171 """
1172 p = Path(base_path)
1173 stem = re.sub(r'_v\d+$', '', p.stem)
1174 suffix = p.suffix or ".docx"
1175 parent = str(p.parent) if str(p.parent) != "." else ""
1176
1177 highest_version = 1
1178 latest_path = base_path
1179
1180 for doc_path in session.documents:
1181 doc_p = Path(doc_path)
1182 doc_parent = str(doc_p.parent) if str(doc_p.parent) != "." else ""
1183 if doc_parent != parent or doc_p.suffix.lower() != suffix.lower():
1184 continue
1185 match = re.match(rf'^{re.escape(stem)}_v(\d+)$', doc_p.stem)
1186 if match:
1187 v = int(match.group(1))
1188 if v > highest_version:
1189 highest_version = v
1190 latest_path = doc_path
1191
1192 return latest_path, highest_version + 1
1193
def create_folder(session: WorkspaceSession, folder_path: str, **kwargs) -> Dict[str, Any]:
    """Create a folder in the workspace. Used by the /setup skill to scaffold folder structure.

    The path is normalised to forward slashes with no leading slash and
    exactly one trailing slash before being handed to the session.

    Args:
        session: Workspace session that owns the folder set.
        folder_path: Requested folder path; backslashes are accepted.
        **kwargs: Ignored; accepted so extra tool-call arguments do not fail.

    Returns:
        ``{success: True, folder_path, message}`` on success, otherwise
        ``{success: False, error}`` when the path is empty, consists only
        of separators/whitespace, or targets the ``Skills/`` system folder
        (matched case-insensitively on the top-level component).
    """
    if not folder_path or not folder_path.strip():
        return {"success": False, "error": "folder_path is required"}

    trimmed = folder_path.replace("\\", "/").strip("/")
    if not trimmed.strip():
        # Input such as "/" or "\\" has no folder name left after
        # normalisation; without this guard a folder named "/" was created.
        return {
            "success": False,
            "error": "folder_path must contain a folder name, not only path separators",
        }
    clean = trimmed + "/"

    top = clean.split("/")[0].lower()
    if top == "skills":
        return {"success": False, "error": "Cannot create folders inside Skills/ — it is a system folder"}

    session.create_folder(clean)
    session.save()
    return {
        "success": True,
        "folder_path": clean,
        "message": f"Folder '{clean}' created successfully",
    }
1212
def delete_document(session: WorkspaceSession, path: str, **kwargs) -> Dict[str, Any]:
    """Delete a single document or workspace file from the workspace.

    The path is normalised to forward slashes with surrounding slashes
    removed. Documents are removed through ``session.remove_document``; if
    that reports nothing removed, ``session.workspace_files`` is checked
    instead, and deleting ``anylegal.md`` / ``agents.md`` there also clears
    ``session.agents_md``.

    Args:
        session: Workspace session to delete from.
        path: Path of the document or workspace file.
        **kwargs: Ignored; accepted so extra tool-call arguments do not fail.

    Returns:
        ``{success: True, path, message}`` when something was deleted,
        otherwise ``{success: False, error}`` (empty path, a path under the
        ``Skills/`` system folder, or nothing found at that path).
    """
    if not path or not path.strip():
        return {"success": False, "error": "path is required"}
    clean = path.replace("\\", "/").strip("/")

    if clean.split("/")[0] == "Skills":
        return {"success": False, "error": "Cannot delete files from the Skills/ system folder"}

    removed = session.remove_document(clean)
    if not removed and clean in session.workspace_files:
        # Not a tracked document: fall back to plain workspace files.
        del session.workspace_files[clean]
        if clean in ("anylegal.md", "agents.md"):
            session.agents_md = ""
        removed = True

    if not removed:
        return {"success": False, "error": f"'{clean}' not found in workspace"}

    # Persist the deletion, as create_folder does for its mutation;
    # previously the workspace-file branch changed session state in memory
    # without ever saving it.
    session.save()
    return {"success": True, "path": clean, "message": f"'{clean}' deleted"}
1232
def delete_folder(session: WorkspaceSession, folder_path: str, **kwargs) -> Dict[str, Any]:
    """Delete a user folder and all its contents. System folders (Skills/, Templates/) are protected.

    The deletion itself is delegated to ``session.delete_folder``; a
    ``ValueError`` raised there is turned into a failure result rather than
    propagated. (Presumably that is how protected folders are rejected —
    the check is not visible in this function.)

    Returns:
        ``{success: True, folder_path, documents_deleted, message}`` with
        the normalised folder path, or ``{success: False, error}``.
    """
    if not (folder_path and folder_path.strip()):
        return {"success": False, "error": "folder_path is required"}

    try:
        deleted_count = session.delete_folder(folder_path)
    except ValueError as exc:
        return {"success": False, "error": str(exc)}

    # Normalised form is used for reporting only; the session received the raw path.
    normalized = folder_path.replace("\\", "/").strip("/") + "/"
    return {
        "success": True,
        "folder_path": normalized,
        "documents_deleted": deleted_count,
        "message": f"Folder '{normalized}' and {deleted_count} file(s) deleted",
    }
1248
def instantiate_template(
    session: WorkspaceSession,
    template_path: str,
    output_path: str,
    replacements: Dict[str, str],
    **kwargs,
) -> Dict[str, Any]:
    """
    Create a new DOCX from a template by filling placeholders, with NO tracked
    changes in the output. The original template is untouched.

    Implementation: take the template's DOCX blob, run ``apply_text_edit``
    once per replacement (per its description it emits tracked-change OOXML
    that preserves run properties, ``<w:rPr>``), then accept exactly the
    revision IDs those edits reported via ``accept_specific_changes`` so the
    result carries no pending revisions. The edited ``document.xml`` is
    repacked into a copy of the template blob and stored at ``output_path``
    (replacing the blob of an existing document there, or adding a new
    active document).

    Reusing the ``edit_document`` matcher avoids duplicating its
    match-finding logic here.

    Args:
        session: Workspace session
        template_path: Path to the source template (e.g. "Templates/Board_Resolution.docx")
        output_path: Path for the new document (e.g. "Acme Board Res 2026-04-25.docx").
            Must not resolve to the template itself.
        replacements: Dict of placeholder text → replacement text. Empty
            keys are skipped; keys that cannot be matched are reported in
            ``not_found`` and do not fail the call.
        **kwargs: Ignored; accepted so extra tool-call arguments do not fail.

    Returns:
        ``{success, output_path, applied: [...], not_found: [...], doc_type: "docx",
        template_path}`` on success; ``{success: False, error, ...}`` on
        failure. Unexpected exceptions are logged and returned as an error
        result rather than raised.
    """
    from ..docx_xml_service import (
        apply_text_edit,
        accept_specific_changes,
        extract_document_xml,
        repack_docx,
        validate_document_xml,
    )

    try:
        if not template_path:
            return {"success": False, "error": "template_path is required"}
        if not output_path:
            return {"success": False, "error": "output_path is required"}
        if not replacements:
            return {"success": False, "error": "replacements must be a non-empty dict"}

        template_doc = session.get_document(template_path)
        if not template_doc:
            return {
                "success": False,
                "error": f"Template not found: {template_path}",
                "available_documents": list(session.documents.keys()),
            }

        # Guard the "template is untouched" contract: writing the filled
        # document onto the template would overwrite its blob.
        existing_target = (
            session.get_document(output_path)
            if output_path in session.documents
            else None
        )
        if output_path == template_path or existing_target is template_doc:
            return {
                "success": False,
                "error": (
                    f"output_path {output_path!r} refers to the template itself — "
                    "choose a different output_path so the template stays untouched."
                ),
            }

        _ensure_docx_blob(template_doc, session)
        if not template_doc.docx_blob:
            return {
                "success": False,
                "error": f"Template {template_path!r} has no DOCX blob — cannot instantiate.",
            }

        original_blob = template_doc.docx_blob
        try:
            xml_content = extract_document_xml(original_blob)
        except Exception:
            # Best-effort fallback to the XML stored on the document.
            xml_content = template_doc.document_xml
        if xml_content is None:
            return {
                "success": False,
                "error": "Failed to extract document.xml from template",
            }

        applied: List[str] = []
        not_found: List[str] = []
        all_revision_ids: List[int] = []

        for old_text, new_text in replacements.items():
            if not old_text:
                continue
            new_xml, info = apply_text_edit(xml_content, old_text, str(new_text))
            if new_xml is None:
                not_found.append(old_text)
                continue
            xml_content = new_xml
            applied.append(old_text)
            for rid in info.get("revision_ids", []):
                try:
                    all_revision_ids.append(int(rid))
                except (TypeError, ValueError):
                    # Non-numeric revision id: nothing to accept for it.
                    pass

        # Accept only the revisions created above so the output is clean.
        if all_revision_ids:
            xml_content, _ = accept_specific_changes(xml_content, all_revision_ids)

        # Only XML parse errors abort; other validation findings are tolerated.
        errors = validate_document_xml(xml_content)
        parse_errors = [e for e in errors if e.startswith("XML parse error")]
        if parse_errors:
            return {
                "success": False,
                "error": f"XML invalid after instantiation: {parse_errors[0]}",
                "validation_errors": errors,
                "applied": applied,
                "not_found": not_found,
            }

        new_blob = repack_docx(original_blob, xml_content)

        if output_path in session.documents:
            existing = session.get_document(output_path)
            existing.update_docx(new_blob)
        else:
            session.add_document(
                path=output_path,
                content="",
                description=f"Instantiated from {template_path}",
                set_active=True,
            )
            session.get_document(output_path).update_docx(new_blob)

        session.save()

        return {
            "success": True,
            "output_path": output_path,
            "doc_type": "docx",
            "applied": applied,
            "not_found": not_found,
            "template_path": template_path,
        }

    except Exception as e:
        logger.error(f"instantiate_template failed: {e}", exc_info=True)
        return {"success": False, "error": str(e)}
1383
# Registry of the document tools implemented in this module, keyed by tool
# name; each key is identical to the name of the function it maps to.
# NOTE(review): presumably consumed by the tool dispatcher — confirm at the call site.
DOCUMENT_TOOLS = {
    "list_documents": list_documents,
    "read_document": read_document,
    "create_document": create_document,
    "edit_document": edit_document,
    "clone_document": clone_document,
    "create_folder": create_folder,
    "delete_document": delete_document,
    "delete_folder": delete_folder,
    "instantiate_template": instantiate_template,
}
1395