The Atlas AnyLegal OSS — documentation bound to its code
20 documents

Run it locally: services, sandbox, knowledge

From `docker compose up` to the hardened code sandbox and the optional knowledge sidecar — what boots, what to lock down, and what is opt-in.

backend/anylegal_oss/lexwiki_compiler/compiler_job.py — 449 lines · compile_workspace L243–448
Outline 8 symbols
1"""Per-workspace compile cycle.
2
3Reads encrypted workspace blob from sqlite -> writes decrypted source docs
4to a scratch dir -> runs LexWiki compile + lint -> stuffs the result back
5into workspace_wikis (encrypted). Wipes scratch on the way out.
6"""
7
8from __future__ import annotations
9
10import hashlib
11import json
12import logging
13import re
14import shutil
15import time
16from datetime import datetime, timezone
17from pathlib import Path
18from typing import Any, Dict, List, Optional, Tuple
19
20from .config import Config
21from . import lexwiki_runner
22
23logger = logging.getLogger(__name__)
24
# Workspace paths that _is_compilable always rejects. Entries ending in "/"
# match everything under that folder (and the bare folder name itself);
# "anylegal.md" matches any path that starts with that file name.
EXCLUDED_PREFIXES = ("Skills/", "Templates/", "Playbook/", "Wiki/", ".compiled/", "anylegal.md")

# Document format names treated as plain text.
TEXT_FORMATS = {"txt", "md", "markdown"}
35
def _is_compilable(path: str, doc: Dict[str, Any], has_blob: bool = False) -> bool:
    """Decide whether a workspace document should be fed to the compiler.

    Paths under any EXCLUDED_PREFIXES entry (or equal to an entry with its
    trailing slash removed) are never compilable. Otherwise the doc
    qualifies when it carries non-blank string `content`, or when the
    caller reports that a binary blob exists for it (`has_blob`).
    """
    # "Skills/" should also reject the bare path "Skills".
    bare_names = {prefix.rstrip('/') for prefix in EXCLUDED_PREFIXES}
    if path.startswith(tuple(EXCLUDED_PREFIXES)) or path in bare_names:
        return False

    text = doc.get("content")
    has_text = isinstance(text, str) and bool(text.strip())
    return has_text or bool(has_blob)
49
def _safe_filename(path: str) -> str:
    """Flatten a workspace path into a filesystem-safe `.md` filename.

    'Clients/Acme/MSA.docx' becomes 'clients__acme__msa.md':

    - the last extension is removed when the final path segment has a dot;
    - '/' and '\\' each become a double underscore;
    - every run of characters outside [a-zA-Z0-9_-] collapses to one '_';
    - leading/trailing underscores are stripped and the result lowercased;
    - the name is capped at 100 characters, and an empty result becomes
      'doc'.

    The suffix is always `.md`.
    """
    basename = path.rpartition('/')[2]
    # Only strip an extension that belongs to the last segment; a dot in a
    # parent folder name is not an extension.
    without_ext = path.rpartition('.')[0] if '.' in basename else path
    flattened = without_ext.replace('\\', '__').replace('/', '__')
    cleaned = re.sub(r'[^a-zA-Z0-9_\-]+', '_', flattened).strip('_').lower()
    if not cleaned:
        cleaned = "doc"
    return cleaned[:100] + ".md"
60
def _doc_signature(
    path: str,
    doc: Dict[str, Any],
    blob: Optional[bytes] = None,
) -> str:
    """Return the per-doc change signature used by the per-doc skip path.

    The signature is the SHA-256 hex digest of
    `repr((path, modified_at, size))`, the same tuple shape that
    _compute_source_hash folds into the workspace-level hash.

    Args:
        path: Workspace path of the document.
        doc: Document record; `modified_at` (falling back to `created_at`)
            and `content` are read from it.
        blob: Raw binary for the document, if any.

    Returns:
        A 64-character hex digest.
    """
    modified_at = str(doc.get("modified_at") or doc.get("created_at") or "")
    content = doc.get("content")
    # Use the same text-vs-blob test as _materialize_raw_docs: only non-blank
    # string content is compiled as text. Whitespace-only (or non-string)
    # content means the blob is what gets compiled, so the blob length must
    # drive the signature; otherwise a replaced blob would look unchanged.
    if isinstance(content, str) and content.strip():
        size = len(content)
    else:
        size = len(blob or b"")
    return hashlib.sha256(repr((path, modified_at, size)).encode('utf-8')).hexdigest()
74
def _compute_source_hash(
    documents: Dict[str, Any],
    docx_blobs: Optional[Dict[str, bytes]] = None,
) -> str:
    """Return the workspace-level change hash over all compilable docs.

    For each compilable document, in sorted path order,
    `repr((path, modified_at, size))` is fed into one SHA-256. The caller
    compares the digest against the stored one to skip a recompile when
    nothing material changed.

    Args:
        documents: Mapping of workspace path -> document record.
        docx_blobs: Optional mapping of workspace path -> raw binary.

    Returns:
        A 64-character hex digest (the digest of empty input when there
        are no compilable docs).
    """
    blobs = docx_blobs or {}
    digest = hashlib.sha256()
    for path, doc in sorted(documents.items()):
        blob = blobs.get(path)
        if not _is_compilable(path, doc, has_blob=bool(blob)):
            continue
        modified_at = str(doc.get("modified_at") or doc.get("created_at") or "")
        content = doc.get("content")
        # Same text-vs-blob test as _materialize_raw_docs: only non-blank
        # string content is compiled as text. Otherwise the blob is the
        # compiled source, so its length must be what the hash tracks.
        if isinstance(content, str) and content.strip():
            size = len(content)
        else:
            size = len(blob or b"")
        digest.update(repr((path, modified_at, size)).encode('utf-8'))
    return digest.hexdigest()
95
def _serialize_page_to_markdown(page: Dict[str, Any]) -> str:
    """Render a stored wiki page back into frontmatter + body markdown.

    The page's `frontmatter` mapping is dumped as YAML between `---` fences
    and followed by its `compiled_body`. Annotations are not written. If
    the YAML dump raises, a single quoted `title:` line is emitted instead
    (with any double quotes in the title swapped for single quotes).
    """
    import yaml  # noqa: WPS433

    frontmatter = page.get("frontmatter") or {}
    body = page.get("compiled_body") or ""
    try:
        dumped = yaml.safe_dump(frontmatter, sort_keys=False, default_flow_style=False)
        fm_yaml = dumped.strip()
    except Exception:
        # Best-effort fallback: keep at least the title in the header.
        title = str(frontmatter.get("title", "")).replace('"', "'")
        fm_yaml = "title: \"" + title + "\""
    return f"---\n{fm_yaml}\n---\n\n{body}"
113
def _rehydrate_unchanged_pages(
    skip_set: Dict[str, Dict[str, Any]],
    raw_dir: Path,
    wiki_dir: Path,
) -> int:
    """Restore previously compiled pages for docs that did not change.

    `skip_set` maps a source path to a dict with `stem`, `slug` and `page`.
    For every entry that has all three:

    1. the prior page is serialized to `wiki_dir/<slug>.md`;
    2. a marker holding the current UTC timestamp is written to
       `raw_dir/.compiled/<stem>.marker`.

    The marker is written after the page, and the caller invokes this after
    the raw docs have been materialized, so the marker is never older than
    the raw file.

    A failure on one entry is logged as a warning and does not stop the
    rest. Returns the number of entries restored without error.
    """
    marker_root = raw_dir / ".compiled"
    marker_root.mkdir(parents=True, exist_ok=True)

    restored = 0
    for source_path, entry in skip_set.items():
        slug = entry.get("slug")
        stem = entry.get("stem")
        prior_page = entry.get("page")
        if not slug or not stem or not prior_page:
            continue
        try:
            destination = wiki_dir / f"{slug}.md"
            destination.parent.mkdir(parents=True, exist_ok=True)
            destination.write_text(
                _serialize_page_to_markdown(prior_page), encoding='utf-8'
            )
            marker = marker_root / f"{stem}.marker"
            marker.write_text(
                datetime.now(timezone.utc).isoformat(),
                encoding='utf-8',
            )
        except Exception as e:
            logger.warning(
                f"Failed to rehydrate page for {source_path} "
                f"(slug={slug}, stem={stem}): {e}"
            )
        else:
            restored += 1
    return restored
158
def _materialize_raw_docs(
    documents: Dict[str, Any],
    docx_blobs: Dict[str, bytes],
    raw_dir: Path,
    source_dir: Path,
    *,
    max_docs: int,
    max_chars: int,
) -> Tuple[int, Dict[str, str]]:
    """Get every compilable workspace doc into `raw_dir` as a `.md` file.

    Documents are visited in sorted path order. Two paths:
      - Doc has non-blank text content -> write `<stem>.md` directly with a
        frontmatter header (content longer than `max_chars` is cut and a
        truncation notice appended).
      - Doc has no usable text but a blob -> dump the blob to `source_dir`
        as `<stem>.docx` and run `lexwiki.extract.router.ingest_file` on it
        with `raw_dir` as the output directory.

    Different workspace paths can flatten to the same stem (for example
    'A/MSA.docx' and 'A/MSA.md'). The later doc then gets a short hash of
    its path appended to the stem, so it neither overwrites the earlier raw
    file nor is counted as a second written doc under one name.

    Per-document failures are logged as warnings and skipped.

    Args:
        documents: Mapping of workspace path -> document record.
        docx_blobs: Mapping of workspace path -> raw binary.
        raw_dir: Directory that receives the `.md` files.
        source_dir: Directory that receives temporary `.docx` dumps.
        max_docs: Stop once this many docs have been written.
        max_chars: Per-document character budget for text content.

    Returns:
        (written_count, raw_stem_to_source) — the second value maps each
        raw filename's stem (without `.md`) back to its workspace path, so
        the per-doc compiler can attribute compiled pages to source paths.
    """
    raw_dir.mkdir(parents=True, exist_ok=True)
    source_dir.mkdir(parents=True, exist_ok=True)
    written = 0
    raw_stem_to_source: Dict[str, str] = {}

    for path, doc in sorted(documents.items()):
        if written >= max_docs:
            logger.info(f"Hit max_docs={max_docs}, skipping remainder")
            break
        blob = docx_blobs.get(path)
        if not _is_compilable(path, doc, has_blob=bool(blob)):
            continue

        stem = Path(_safe_filename(path)).stem
        if stem in raw_stem_to_source:
            # Another path already owns this stem. Derive a deterministic
            # alternative instead of clobbering its raw file and mapping.
            suffix = hashlib.sha256(repr(path).encode('utf-8')).hexdigest()[:8]
            unique_stem = f"{stem}_{suffix}"
            if unique_stem in raw_stem_to_source:
                logger.warning(
                    f"Raw filename for {path} collides with "
                    f"{raw_stem_to_source[unique_stem]} even after "
                    f"disambiguation; skipping"
                )
                continue
            logger.warning(
                f"Raw filename for {path} collides with "
                f"{raw_stem_to_source[stem]}; using {unique_stem}.md instead"
            )
            stem = unique_stem

        content = doc.get("content") or ""
        if isinstance(content, str) and content.strip():
            # Fast path: text is already extracted.
            if len(content) > max_chars:
                content = content[:max_chars] + "\n\n[... truncated for compilation budget ...]\n"
            fmt = doc.get("format") or "md"
            modified_at = doc.get("modified_at") or doc.get("created_at") or ""
            title = path.rsplit('/', 1)[-1].rsplit('.', 1)[0]
            # NOTE(review): values are interpolated unquoted, so a path or
            # title containing ": " or "#" may not parse as YAML. Confirm
            # how lexwiki reads this header before quoting.
            frontmatter = (
                "---\n"
                f"source: {path}\n"
                f"title: {title}\n"
                f"format: {fmt}\n"
                f"modified_at: {modified_at}\n"
                f"word_count: {len(content.split())}\n"
                "---\n"
            )
            target = raw_dir / f"{stem}.md"
            try:
                target.write_text(frontmatter + content, encoding='utf-8')
            except Exception as e:
                logger.warning(f"Failed to materialize {path} -> {target}: {e}")
            else:
                written += 1
                raw_stem_to_source[stem] = path
            continue

        # Slow path: no usable text, so the blob has to be extracted.
        if not blob:
            continue
        try:
            from lexwiki.extract.router import ingest_file  # type: ignore[import-not-found]
        except ImportError:
            logger.warning(f"lexwiki not available — skipping blob extraction for {path}")
            continue
        try:
            tmp_docx = source_dir / (stem + ".docx")
            tmp_docx.write_bytes(blob)
            ingest_file(tmp_docx, raw_dir)
        except Exception as e:
            logger.warning(f"Failed to extract docx blob for {path}: {e}")
        else:
            written += 1
            raw_stem_to_source[stem] = path

    return written, raw_stem_to_source
242
def _empty_wiki_data(existing: Optional[Dict[str, Any]]) -> Dict[str, Any]:
    """Build the wiki payload stored when there is nothing to compile.

    Pages and indexes are reset, but workspace-level notes from the prior
    wiki row are kept, matching what the normal compile path does.
    """
    prior_notes = None
    if existing and isinstance(existing, dict):
        prior_notes = (existing.get("wiki_data") or {}).get("workspace_notes")
    return {
        "pages": {},
        "indexes": {},
        "workspace_notes": prior_notes or {"annotations": []},
    }


def compile_workspace(workspace_id: str, cfg: Config) -> Dict[str, Any]:
    """Compile one workspace end-to-end.

    Steps: load the workspace, skip when nothing is compilable or the
    source hash is unchanged, materialize raw docs into a scratch dir,
    rehydrate pages of unchanged docs, run the per-doc compiler, carry
    annotations / metadata overrides / workspace notes over from the prior
    wiki, rebuild the source index, and persist the result.

    Returns a small status dict suitable for logging:
        {ok, status, source_doc_count, skipped_reason?, error?}
    A full compile additionally reports page_count, skipped_count,
    compiled_count and elapsed_seconds.

    Exceptions raised once the status has been set to 'compiling' are
    caught and translated into status='error' with the workspace_wikis row
    updated accordingly. The scratch dir is removed on the way out.
    """
    from .db import (
        get_workspace_for_compile,
        get_workspace_wiki,
        update_workspace_wiki,
        update_workspace_wiki_status,
    )

    # Monotonic clock: the elapsed time cannot be skewed by wall-clock changes.
    t0 = time.monotonic()
    workspace = get_workspace_for_compile(workspace_id)
    if not workspace:
        return {"ok": False, "error": "workspace not found"}

    compile_model = cfg.model

    documents = workspace.get("documents") or {}
    docx_blobs = workspace.get("docx_blobs") or {}
    compilable_count = sum(
        1
        for p, d in documents.items()
        if _is_compilable(p, d, has_blob=bool(docx_blobs.get(p)))
    )
    # Fetched before the empty check so prior workspace notes can be kept.
    existing = get_workspace_wiki(workspace_id)

    if compilable_count == 0:
        persisted = update_workspace_wiki(
            workspace_id,
            _empty_wiki_data(existing),
            source_doc_count=0,
            source_docs_hash="empty",
            cost_usd=0.0,
        )
        if not persisted:
            logger.error(f"[{workspace_id}] update_workspace_wiki returned False")
            return {"ok": False, "status": "error", "error": "update_workspace_wiki returned False"}
        return {"ok": True, "status": "ready", "source_doc_count": 0, "skipped_reason": "no compilable docs"}

    current_hash = _compute_source_hash(documents, docx_blobs)
    if (
        existing
        and existing.get("source_docs_hash") == current_hash
        and existing.get("compile_status") == "ready"
    ):
        return {"ok": True, "status": "ready", "source_doc_count": compilable_count, "skipped_reason": "unchanged"}

    update_workspace_wiki_status(workspace_id, "compiling", error=None)

    scratch_root = cfg.scratch_dir / workspace_id
    raw_dir = scratch_root / "raw"
    wiki_dir = scratch_root / "wiki"
    source_dir = scratch_root / "source"

    try:
        # Scratch setup lives inside the try so a filesystem failure is
        # recorded as status='error' instead of leaving 'compiling' behind.
        if scratch_root.exists():
            shutil.rmtree(scratch_root, ignore_errors=True)
        raw_dir.mkdir(parents=True, exist_ok=True)
        wiki_dir.mkdir(parents=True, exist_ok=True)
        source_dir.mkdir(parents=True, exist_ok=True)

        written, raw_stem_to_source = _materialize_raw_docs(
            documents,
            docx_blobs,
            raw_dir,
            source_dir,
            max_docs=cfg.max_docs_per_workspace,
            max_chars=cfg.max_doc_chars,
        )
        if written == 0:
            persisted = update_workspace_wiki(
                workspace_id,
                _empty_wiki_data(existing),
                source_doc_count=0,
                source_docs_hash=current_hash,
                cost_usd=0.0,
            )
            if not persisted:
                raise RuntimeError("update_workspace_wiki returned False")
            return {"ok": True, "status": "ready", "source_doc_count": 0, "skipped_reason": "materialize wrote 0"}

        prior_data: Dict[str, Any] = {}
        prior_pages: Dict[str, Any] = {}
        prior_source_index: Dict[str, Any] = {}
        if existing and isinstance(existing, dict):
            prior_data = existing.get("wiki_data") or {}
            prior_pages = prior_data.get("pages") or {}
            prior_source_index = prior_data.get("source_index") or {}

        # Reverse map built once (source path -> raw stem) so the loops
        # below do not rescan raw_stem_to_source for every document.
        source_to_stem: Dict[str, str] = {}
        for raw_stem, materialized_path in raw_stem_to_source.items():
            source_to_stem.setdefault(materialized_path, raw_stem)

        # Docs whose signature matches the prior compile and whose prior
        # page still exists are rehydrated instead of recompiled.
        skip_set: Dict[str, Dict[str, Any]] = {}
        for source_path, current_stem in source_to_stem.items():
            doc = documents.get(source_path)
            if not doc:
                continue
            prior_entry = prior_source_index.get(source_path)
            if not prior_entry:
                continue
            current_doc_hash = _doc_signature(
                source_path, doc, docx_blobs.get(source_path)
            )
            if prior_entry.get("source_hash") != current_doc_hash:
                continue
            slug = prior_entry.get("slug")
            stem = prior_entry.get("stem")
            if not slug or not stem:
                continue
            if stem != current_stem:
                # The stored stem no longer names the raw file written this
                # run, so a marker for it would not cover that file.
                continue
            prior_page = prior_pages.get(slug)
            if not prior_page:
                continue
            skip_set[source_path] = {
                "stem": stem,
                "slug": slug,
                "source_hash": current_doc_hash,
                "page": prior_page,
            }

        if skip_set:
            rehydrated = _rehydrate_unchanged_pages(skip_set, raw_dir, wiki_dir)
            logger.info(
                f"[{workspace_id}] per-doc skip: rehydrated {rehydrated}/"
                f"{len(skip_set)} unchanged pages, will recompile "
                f"{written - rehydrated} doc(s)"
            )

        compile_result = lexwiki_runner.compile_vault_per_doc(
            scratch_root,
            model=compile_model,
            base_url=cfg.base_url,
            raw_stem_to_source=raw_stem_to_source,
        )
        stats = compile_result["stats"]
        compiled_source_to_slug = compile_result["source_to_slug"]
        logger.info(
            f"[{workspace_id}] compile stats (model={compile_model}, "
            f"skipped={len(skip_set)}, compiled={len(compiled_source_to_slug)}): {stats}"
        )

        wiki_data = lexwiki_runner.read_compiled_vault(scratch_root)

        # Carry per-page annotations and metadata overrides over from the
        # prior wiki onto pages that kept their slug.
        for slug, fresh_page in wiki_data["pages"].items():
            prior = prior_pages.get(slug)
            if not prior:
                continue
            fresh_page["annotations"] = prior.get("annotations") or []
            overrides = prior.get("metadata_overrides") or {}
            if overrides:
                fresh_page["metadata_overrides"] = overrides
        prior_workspace_notes = prior_data.get("workspace_notes")
        if prior_workspace_notes:
            wiki_data["workspace_notes"] = prior_workspace_notes

        # Rebuild the source index: skipped docs first, then freshly
        # compiled ones (which win if a path appears in both).
        fresh_source_index: Dict[str, Any] = {}
        for source_path, info in skip_set.items():
            fresh_source_index[source_path] = {
                "stem": info["stem"],
                "slug": info["slug"],
                "source_hash": info["source_hash"],
            }

        for source_path, slug in compiled_source_to_slug.items():
            doc = documents.get(source_path)
            if not doc:
                continue
            stem = source_to_stem.get(source_path)
            if not stem:
                continue
            fresh_source_index[source_path] = {
                "stem": stem,
                "slug": slug,
                "source_hash": _doc_signature(
                    source_path, doc, docx_blobs.get(source_path)
                ),
            }
        wiki_data["source_index"] = fresh_source_index

        ok = update_workspace_wiki(
            workspace_id,
            wiki_data,
            source_doc_count=written,
            source_docs_hash=current_hash,
            cost_usd=None,
        )
        if not ok:
            raise RuntimeError("update_workspace_wiki returned False")

        elapsed = time.monotonic() - t0
        return {
            "ok": True,
            "status": "ready",
            "source_doc_count": written,
            "page_count": len(wiki_data.get("pages") or {}),
            "skipped_count": len(skip_set),
            "compiled_count": len(compiled_source_to_slug),
            "elapsed_seconds": round(elapsed, 2),
        }

    except Exception as e:
        logger.exception(f"[{workspace_id}] compile failed")
        try:
            update_workspace_wiki_status(workspace_id, "error", error=str(e)[:500])
        except Exception:
            # Still return the error dict if the status write itself fails.
            logger.exception(f"[{workspace_id}] could not record compile error status")
        return {"ok": False, "status": "error", "error": str(e)}

    finally:
        # Decrypted sources must not outlive the compile cycle.
        shutil.rmtree(scratch_root, ignore_errors=True)
449