From ad1ba2c16728ffa04f97ca912409d05baed2b00f Mon Sep 17 00:00:00 2001 From: root Date: Sat, 27 Dec 2025 02:43:51 +0000 Subject: [PATCH] revoice: add if.dave v1.3 style --- src/revoice/generate.py | 1784 +++++++++++++++++++++++++++++++++------ src/revoice/lint.py | 148 +++- 2 files changed, 1691 insertions(+), 241 deletions(-) diff --git a/src/revoice/generate.py b/src/revoice/generate.py index 9e36615..8e7593c 100644 --- a/src/revoice/generate.py +++ b/src/revoice/generate.py @@ -2,8 +2,10 @@ from __future__ import annotations import datetime as _dt import hashlib +import json +import os import re -from dataclasses import dataclass +from dataclasses import dataclass, field from pathlib import Path @@ -19,17 +21,37 @@ def _sha256_file(path: str) -> str: return h.hexdigest() -def generate_shadow_dossier(*, style_id: str, source_text: str, source_path: str) -> str: +def generate_shadow_dossier(*, style_id: str, source_text: str, source_path: str, action_pack: bool = False) -> str: if style_id.lower() in { "if.dave.v1", "if.dave.v1.1", "if.dave.v1.2", + "if.dave.v1.3", + "if.dave.fr.v1.2", + "if.dave.fr.v1.3", "dave", "if://bible/dave/v1.0", "if://bible/dave/v1.1", "if://bible/dave/v1.2", + "if://bible/dave/v1.3", + "if://bible/dave/fr/v1.2", + "if://bible/dave/fr/v1.3", }: - return _generate_dave_v1_2_mirror(source_text=source_text, source_path=source_path) + style = style_id.lower() + locale = "fr" if style in {"if.dave.fr.v1.2", "if.dave.fr.v1.3", "if://bible/dave/fr/v1.2", "if://bible/dave/fr/v1.3"} else "en" + if style in {"if.dave.v1.3", "if.dave.fr.v1.3", "if://bible/dave/v1.3", "if://bible/dave/fr/v1.3"}: + return _generate_dave_v1_3_mirror( + source_text=source_text, + source_path=source_path, + action_pack=action_pack, + locale=locale, + ) + return _generate_dave_v1_2_mirror( + source_text=source_text, + source_path=source_path, + action_pack=action_pack, + locale=locale, + ) raise ValueError(f"Unknown style id: {style_id}") @dataclass(frozen=True) @@ -38,6 +60,34 @@ class _SourceSection: body: str why_it_matters: str | None = None +@dataclass +class _RenderContext: + seed: str + locale: str = "en" + used_callouts: set[str] = field(default_factory=set) + used_diagrams: set[str] = field(default_factory=set) + used_paragraphs: set[str] = field(default_factory=set) + used_reframe_tails: set[str] = field(default_factory=set) + used_punchlines: set[str] = field(default_factory=set) + + def pick_unique(self, *, kind: str, key: str, variants: list[str], used: set[str]) -> str: + if not variants: + raise ValueError(f"Missing variants for {kind}") + + digest = hashlib.sha256(f"{self.seed}:{kind}:{key}".encode("utf-8", errors="replace")).digest() + start = int.from_bytes(digest[:4], "big") % len(variants) + + for offset in range(len(variants)): + candidate = variants[(start + offset) % len(variants)].strip() + if candidate and candidate not in used: + used.add(candidate) + return candidate + + candidate = variants[start].strip() + if candidate: + used.add(candidate) + return candidate + _PAGE_SPLIT_RE = re.compile(r"(?m)^===== page-(\d+) =====$") _URL_RE = re.compile(r"https?://\S+") @@ -46,6 +96,19 @@ _SENTENCE_SPLIT_RE = re.compile(r"(?<=[.!?])\s+") _TOC_ENTRY_RE = re.compile(r"^\s*(?P.+?)\s+(?:\.\s*){3,}\s+(?P<page>\d+)\s*$") _METRIC_VALUE_RE = re.compile(r"^(?:\$[\d,]+|\d+%|\d+-month)$") _METRIC_TOKEN_RE = re.compile(r"\$[\d,]+|\b\d+%|\b\d+-month\b") +_NUMERIC_ANCHOR_RE = re.compile( + r"\$[\d,]+" + r"|\b\d+%|\b\d+-month\b" + r"|\bn\s*=\s*\d+\b" + r"|\b20\d{2}\b" + r"|\b\d+(?:[.,]\d+)?\s*(?:€|eur|euros)\b" + r"|\b\d+(?:[.,]\d+)?\s*(?:ans|jours|mois)\b" + r"|\b\d+(?:[.,]\d+)?\s*(?:million|milliard|milliards|bn|billion|billions)\b", + re.IGNORECASE, +) +_OWASP_REFERENCE_ITEM_RE = re.compile(r"^\s*(?P<num>\d+)\.\s*(?P<text>\S.*)$") +_NUMBERED_SECTION_START_RE = re.compile(r"^\s*(?P<num>\d{2})\s*$") +_ALPHA_RE = re.compile(r"[A-Za-zÀ-ÖØ-öø-ÿ]") _OWASP_LLM_SUBHEADINGS = [ "Description", @@ -60,6 +123,48 @@ _OWASP_LLM_SUBHEADINGS = [ "Related Frameworks and Taxonomies", ] +_DAVE_REFRAME_TAILS = [ + "so we can align on an owner, a gate, and an expiry date", + "so we can socialize the stop condition and keep it on the roadmap", + "so we can circle back next sprint with a merge-blocking rule", + "so we can make this a deliverable instead of a vibe", + "so we can keep stakeholder comfort high and risk acceptance time-bounded", + "so we can preserve plausible deniability while still enforcing something real", + "so we can put a pin in it until Legal is comfortable", + "so we can capture it as an action item with an owner and a deadline", + "so we can align on a plan to align on the plan, with an owner and an expiry date", + "so we can turn this into a gate instead of a calendar invite", + "so we can measure outcomes in something other than meeting attendance", + "so we can keep this on the roadmap without letting it live in production forever", +] + +_DAVE_REFRAME_TAILS_FR = [ + "afin que nous puissions aligner un responsable, un contrôle en amont et une date d’expiration", + "afin que nous puissions le socialiser avec les parties prenantes et figer un critère de blocage", + "afin que nous puissions y revenir au prochain cycle avec une règle réellement opposable", + "afin que nous puissions en faire un livrable plutôt qu’une intention", + "afin que nous puissions préserver le confort des parties prenantes tout en bornant l’acceptation du risque", + "afin que nous puissions rester aimables sur la forme tout en étant vérifiables sur le fond", + "afin que nous puissions ouvrir une concertation et formaliser une décision", + "afin que nous puissions consigner un point d’action, avec un responsable et une échéance", + "afin que nous puissions aligner un plan d’alignement, avec un responsable et une date d’expiration", + "afin que nous puissions en faire une porte de contrôle plutôt qu’une invitation calendrier", + "afin que nous puissions mesurer autre chose que la présence en réunion", + "afin que nous puissions garder cela sur la feuille de route sans le laisser vivre en production indéfiniment", +] + + +def _looks_like_site_footer(line: str) -> bool: + s = (line or "").strip() + if not s: + return False + upper = s.upper() + if upper.startswith("WWW.") and "." in upper and len(upper) <= 64: + return True + if re.fullmatch(r"[A-Z0-9.-]+\.[A-Z]{2,}", upper) and len(upper) <= 64: + return True + return False + def _extract_urls(text: str) -> list[str]: urls: list[str] = [] @@ -70,6 +175,108 @@ def _extract_urls(text: str) -> list[str]: return urls +def _extract_owasp_reference_items(text: str) -> list[str]: + items: list[tuple[str, list[str]]] = [] + cur_num: str | None = None + cur_parts: list[str] = [] + + def flush() -> None: + nonlocal cur_num, cur_parts + if cur_num is None: + return + joined = " ".join([p.strip() for p in cur_parts if p.strip()]).strip() + joined = re.sub(r"\s{2,}", " ", joined).strip() + if joined: + items.append((cur_num, [joined])) + cur_num = None + cur_parts = [] + + for ln in (text or "").splitlines(): + m = _OWASP_REFERENCE_ITEM_RE.match(ln) + if m: + flush() + cur_num = m.group("num") + cur_parts = [m.group("text").strip()] + continue + + if cur_num is None: + continue + + s = ln.strip() + if not s: + continue + if s.startswith("genai.owasp.org") or "OWASP Top 10 for LLM" in s: + continue + cur_parts.append(s) + + flush() + + out: list[str] = [] + for num, parts in items: + text = " ".join(parts).strip() + if text: + out.append(f"{num}. {text}") + return out + + +def _inject_reframe_tail(line: str, tail: str) -> str: + m = re.match(r"^(?P<prefix>\s*>\s*)(?P<body>.*)$", line.rstrip()) + if not m: + return line.rstrip() + + prefix = m.group("prefix") + body = m.group("body").rstrip() + if not body or "—" in body: + return f"{prefix}{body}".rstrip() + + closers = "" + while body and body[-1] in {")", "]", '"', "'", "”", "’"}: + closers = body[-1] + closers + body = body[:-1] + + punct = "." + if body and body[-1] in {".", "!", "?"}: + punct = body[-1] + body = body[:-1] + + body = body.rstrip() + return f"{prefix}{body} — {tail}{punct}{closers}".rstrip() + + +def _inject_plain_tail(text: str, tail: str) -> str: + body = (text or "").strip() + if not body or "—" in body: + return body + + closers = "" + while body and body[-1] in {")", "]", '"', "'", "”", "’"}: + closers = body[-1] + closers + body = body[:-1] + + punct = "." + if body and body[-1] in {".", "!", "?"}: + punct = body[-1] + body = body[:-1] + + body = body.rstrip() + return f"{body} — {tail}{punct}{closers}".strip() + + +def _daveify_callout_reframe(callout: str, *, ctx: _RenderContext, key: str) -> str: + lines = callout.splitlines() + for idx in range(len(lines) - 1, -1, -1): + stripped = lines[idx].lstrip() + if not stripped.startswith(">"): + continue + if re.match(r"^>\s*\*\*[^*]+\*\*:", stripped): + continue + tails = _DAVE_REFRAME_TAILS_FR if ctx.locale.lower().startswith("fr") else _DAVE_REFRAME_TAILS + tail = ctx.pick_unique(kind="reframe_tail", key=key, variants=tails, used=ctx.used_reframe_tails) + lines[idx] = _inject_reframe_tail(lines[idx], tail) + return "\n".join(lines).strip() + return callout.strip() + + def _looks_like_owasp_llm_top10(text: str) -> bool: if "OWASP Top 10 for" not in text and "OWASP Top 10" not in text: return False @@ -121,6 +328,20 @@ def _first_sentences(text: str, *, max_sentences: int = 2, max_chars: int = 260) return snippet +def _extract_numeric_anchors(text: str, *, limit: int = 3) -> list[str]: + anchors: list[str] = [] + for match in _NUMERIC_ANCHOR_RE.finditer(text or ""): + value = match.group(0).strip() + if not value: + continue + normalized = re.sub(r"\s+", "", value.lower()) + if normalized not in {re.sub(r"\s+", "", a.lower()) for a in anchors}: + anchors.append(value) + if len(anchors) >= limit: + break + return anchors + + def _split_owasp_llm_subsections(body: str) -> list[tuple[str, str]]: headings = set(_OWASP_LLM_SUBHEADINGS) lines = [ln.rstrip() for ln in body.splitlines()] @@ -217,12 +438,49 @@ def _normalize_ocr(text: str) -> str: text = text.replace("GenAl", "GenAI") text = text.replace("Cl/CD", "CI/CD") text = text.replace("olugin", "plugin") + text = text.replace("FORIWARD", "FORWARD") + text = text.replace("\\u00agrave", "à") return text +_UNICODE_QUOTE_MAP = str.maketrans( + { + "\u201c": '"', + "\u201d": '"', + "\u2018": "'", + "\u2019": "'", + "\u2013": "-", + "\u2014": "-", + "\u00a0": " ", + } +) + + +def _normalize_unicode_punctuation(text: str) -> str: + return str(text or "").translate(_UNICODE_QUOTE_MAP) + + +def _coerce_json_code_block(code: str) -> tuple[str, str, bool]: + cleaned = _normalize_unicode_punctuation(code).strip() + try: + obj = json.loads(cleaned) + except Exception: + return "text", cleaned, False + return "json", json.dumps(obj, indent=2, ensure_ascii=False), True + + def _parse_pages(source_text: str) -> list[tuple[str, str]]: matches = list(_PAGE_SPLIT_RE.finditer(source_text)) if not matches: + if "\f" in source_text: + parts = [p.strip() for p in source_text.split("\f")] + pages: list[tuple[str, str]] = [] + page_no = 1 + for part in parts: + if part: + pages.append((str(page_no), part)) + page_no += 1 + return pages or [("doc", source_text.strip())] return [("doc", source_text.strip())] pages: list[tuple[str, str]] = [] @@ -296,6 +554,122 @@ def _parse_sections_from_page(page_text: str) -> list[_SourceSection]: return sections +def _clean_page_body(text: str) -> str: + lines: list[str] = [] + for ln in (text or "").splitlines(): + s = ln.strip() + if not s: + if lines and lines[-1] != "": + lines.append("") + continue + if _looks_like_site_footer(s): + continue + lines.append(ln.rstrip()) + return "\n".join(lines).strip() + + +def _parse_numbered_section_page(page_text: str) -> tuple[str, str, str] | None: + lines = [ln.rstrip() for ln in (page_text or "").splitlines()] + i = 0 + while i < len(lines): + s = lines[i].strip() + if not s: + i += 1 + continue + if _looks_like_site_footer(s): + i += 1 + continue + break + if i >= len(lines): + return None + + m = _NUMBERED_SECTION_START_RE.match(lines[i].strip()) + if not m: + return None + + num = m.group("num") + + j = i + 1 + while j < len(lines) and not lines[j].strip(): + j += 1 + + title_lines: list[str] = [] + while j < len(lines) and lines[j].strip(): + s = lines[j].strip() + if _looks_like_site_footer(s): + break + title_lines.append(s) + j += 1 + + title = " ".join(title_lines).strip() + if not title or not _ALPHA_RE.search(title): + return None + + while j < len(lines) and not lines[j].strip(): + j += 1 + + body = _clean_page_body("\n".join(lines[j:])) + return (num, title, body) + + +def _extract_sections_numbered_outline(pages: list[tuple[str, str]]) -> list[_SourceSection] | None: + starts = 0 + for _page_no, page_text in pages: + if _parse_numbered_section_page(page_text): + starts += 1 + if starts < 3: + return None + + cover_parts: list[str] = [] + sections: list[_SourceSection] = [] + cur_title: str | None = None + cur_body_parts: list[str] = [] + + def flush_cover() -> None: + nonlocal cover_parts + cover_text = "\n\n".join([p for p in cover_parts if p.strip()]).strip() + cover_parts = [] + if not cover_text: + return + sections.append(_SourceSection(title="COUVERTURE", body=_clean_page_body(cover_text), why_it_matters=None)) + + def flush_section() -> None: + nonlocal cur_title, cur_body_parts + if cur_title is None: + return + body = "\n\n".join([p for p in cur_body_parts if p.strip()]).strip() + sections.append(_SourceSection(title=cur_title, body=_clean_page_body(body), why_it_matters=None)) + cur_title = None + cur_body_parts = [] + + for _page_no, page_text in pages: + parsed = _parse_numbered_section_page(page_text) + if parsed: + num, title, body = parsed + if cur_title is None: + flush_cover() + else: + flush_section() + cur_title = f"{num} — {title}" + cur_body_parts = [body] if body else [] + continue + + cleaned = _clean_page_body(page_text) + if not cleaned: + continue + if cur_title is None: + cover_parts.append(cleaned) + else: + cur_body_parts.append(cleaned) + + if cur_title is None: + flush_cover() + else: + flush_section() + + return sections or None + + def _extract_sections(source_text: str) -> list[_SourceSection]: if _looks_like_owasp_llm_top10(source_text): return _extract_sections_owasp_llm_top10(source_text) @@ -303,6 +677,9 @@ def _extract_sections(source_text: str) -> list[_SourceSection]: return _extract_sections_idc_business_value(source_text) pages = _parse_pages(source_text) + numbered = _extract_sections_numbered_outline(pages) + if numbered: + return numbered sections: list[_SourceSection] = [] for _page_no, page_text in pages: if page_text.strip(): @@ -356,6 +733,7 @@ def _extract_sections_owasp_llm_top10(source_text: str) -> list[_SourceSection]: "Table of Contents", "Letter from the Project Leads", "What’s New in the 2025 Top 10", + "What's New in the 2025 Top 10", "Moving Forward", "Project Sponsors", } @@ -659,11 +1037,12 @@ def _slugify(value: str) -> str: return cleaned.upper() if cleaned else "UNKNOWN" -def _inferred_mermaid(title: str) -> str | None: +def _inferred_mermaid(title: str, *, ctx: _RenderContext) -> str | None: title_upper = title.upper() if "BUSINESS VALUE" in title_upper or "ROI" in title_upper: - return """flowchart TD + variants = [ + """flowchart TD A["Sponsor narrative"] --> B["Business value model"] B --> C["Executive buy-in"] C --> D["Rollout project"] @@ -671,10 +1050,36 @@ def _inferred_mermaid(title: str) -> str | None: E --> F["Renewal discussion"] F --> G["KPI trend deck"] G --> C -""" +""", + """flowchart TD + A["Baseline (unknown)"] --> B["ROI spreadsheet"] + B --> C["Assumptions: optimistic"] + C --> D["Rollout work"] + D --> E["Exceptions + manual steps"] + E --> F["Metric redefinition"] + F --> B +""", + """flowchart TD + A["Procurement decision"] --> B["Implementation project"] + B --> C["Evidence automation"] + C --> D["Audit season"] + D --> E["Renewal negotiation"] + E --> F["Success story deck"] + F --> A +""", + """flowchart TD + A["Executive sponsor"] --> B["Quarterly update deck"] + B --> C["KPI trend (directional)"] + C --> D["Roadmap refresh"] + D --> E["Pilot expansion"] + E --> B +""", + ] + return ctx.pick_unique(kind="diagram:roi", key=title, variants=variants, used=ctx.used_diagrams) if "COMPLIANCE" in title_upper or "AUDIT" in title_upper: - return """flowchart TD + variants = [ + """flowchart TD A["Control requirement"] --> B["Evidence requested"] B --> C["Artifact gathered"] C --> D["Review meeting"] @@ -682,10 +1087,29 @@ def _inferred_mermaid(title: str) -> str | None: E -->|Yes| F["Audit satisfied"] E -->|No| G["Remediation plan"] G --> D -""" +""", + """flowchart TD + A["Control requirement"] --> B["Evidence request"] + B --> C["Screenshot collected"] + C --> D["Shared drive folder"] + D --> E["Checklist satisfied"] + E --> F["Exceptions accumulate"] + F --> B +""", + """flowchart TD + A["Quarter begins"] --> B["Evidence scramble"] + B --> C["Spreadsheet status"] + C --> D["Steering committee"] + D --> E["Audit passed"] + E --> F["Backlog deferred"] + F --> A +""", + ] + return ctx.pick_unique(kind="diagram:audit", key=title, variants=variants, used=ctx.used_diagrams) if "THIRD-PARTY" in title_upper or "VENDOR" in title_upper: - return """flowchart TD + variants = [ + """flowchart TD A["Vendor onboarding"] --> B["Questionnaire"] B --> C["Evidence chase"] C --> D["Risk rating"] @@ -694,10 +1118,39 @@ def _inferred_mermaid(title: str) -> str | None: E -->|No| G["Blocked pending controls"] F --> H["Renewal cycle"] G --> H -""" +""", + """flowchart TD + A["Vendor intake"] --> B["Security questionnaire"] + B --> C["Vendor sends PDF"] + C --> D["Internal interpretation"] + D --> E["Legal review"] + E --> F{Decision} + F -->|Accept| G["Approved with exceptions"] + F -->|Block| H["Deferred to next quarter"] + G --> I["Renewal"] + H --> I +""", + """flowchart TD + A["Business wants tool"] --> B["Vendor risk rating"] + B --> C{High risk?} + C -->|Yes| D["Exception workflow"] + C -->|No| E["Standard approval"] + D --> F["Compensating controls"] + F --> E + E --> G["Onboarding complete"] + G --> H["Usage begins"] + H --> I["Reassessment"] + I --> B +""", + ] + return ctx.pick_unique(kind="diagram:third_party", key=title, variants=variants, used=ctx.used_diagrams) if title_upper.startswith("LLM01") or "PROMPT INJECTION" in title_upper: - return """flowchart TD + return ctx.pick_unique( + kind="diagram:llm01", + key=title, + variants=[ + """flowchart TD A["Attacker prompt"] --> B["LLM prompt parser"] B --> C["System prompt + tools"] C --> D["Model follows injected instruction"] @@ -705,18 +1158,32 @@ def _inferred_mermaid(title: str) -> str | None: E --> F["Incident review meeting"] F --> G["Policy update: scheduled"] """ + ], + used=ctx.used_diagrams, + ) if title_upper.startswith("LLM02") or "SENSITIVE INFORMATION" in title_upper: - return """flowchart TD + return ctx.pick_unique( + kind="diagram:llm02", + key=title, + variants=[ + """flowchart TD A["User asks a question"] --> B["LLM retrieves context"] B --> C["Hidden secret present in context"] C --> D["Model outputs secret"] D --> E["Screenshot captured for compliance"] E --> F["Access remains enabled"] """ + ], + used=ctx.used_diagrams, + ) if title_upper.startswith("LLM03") or "SUPPLY CHAIN" in title_upper: - return """flowchart TD + return ctx.pick_unique( + kind="diagram:llm03", + key=title, + variants=[ + """flowchart TD A["Upstream model or dependency"] --> B["Pulled into build"] B --> C["Trusted by default"] C --> D["Compromise introduced"] @@ -724,27 +1191,48 @@ def _inferred_mermaid(title: str) -> str | None: E --> F["Vendor asks for logs"] F --> G["We align on next steps"] """ + ], + used=ctx.used_diagrams, + ) if title_upper.startswith("LLM04") or "POISONING" in title_upper: - return """flowchart TD + return ctx.pick_unique( + kind="diagram:llm04", + key=title, + variants=[ + """flowchart TD A["Attacker data"] --> B["Training or fine-tune"] B --> C["Model behavior shifts"] C --> D["Bad outputs in production"] D --> E["Root cause: unclear"] E --> F["New dataset review committee"] """ + ], + used=ctx.used_diagrams, + ) if title_upper.startswith("LLM05") or "OUTPUT HANDLING" in title_upper: - return """flowchart TD + return ctx.pick_unique( + kind="diagram:llm05", + key=title, + variants=[ + """flowchart TD A["LLM generates output"] --> B["Output treated as trusted"] B --> C["Downstream system executes or renders"] C --> D["Injection hits a sink"] D --> E["Hotfix + postmortem"] E --> F["Guardrail doc updated"] """ + ], + used=ctx.used_diagrams, + ) if title_upper.startswith("LLM06") or "EXCESSIVE AGENCY" in title_upper: - return """flowchart TD + return ctx.pick_unique( + kind="diagram:llm06", + key=title, + variants=[ + """flowchart TD A["User goal"] --> B["Agent plans steps"] B --> C["Tool access granted"] C --> D["Action executed"] @@ -752,45 +1240,80 @@ def _inferred_mermaid(title: str) -> str | None: E --> F["Exception request filed"] F --> C """ + ], + used=ctx.used_diagrams, + ) if title_upper.startswith("LLM07") or "PROMPT LEAKAGE" in title_upper: - return """flowchart TD + return ctx.pick_unique( + kind="diagram:llm07", + key=title, + variants=[ + """flowchart TD A["User prompt"] --> B["Model context window"] B --> C["System prompt present"] C --> D["Leak via output or tool call"] D --> E["Prompt rotated quarterly"] E --> C """ + ], + used=ctx.used_diagrams, + ) if title_upper.startswith("LLM08") or "VECTOR" in title_upper or "EMBEDDING" in title_upper: - return """flowchart TD + return ctx.pick_unique( + kind="diagram:llm08", + key=title, + variants=[ + """flowchart TD A["Documents ingested"] --> B["Embeddings store"] B --> C["Retriever selects chunks"] C --> D["Injected chunk included"] D --> E["LLM follows malicious context"] E --> F["We add a filter later"] """ + ], + used=ctx.used_diagrams, + ) if title_upper.startswith("LLM09") or "MISINFORMATION" in title_upper: - return """flowchart TD + return ctx.pick_unique( + kind="diagram:llm09", + key=title, + variants=[ + """flowchart TD A["Model output"] --> B["Looks confident"] B --> C["Decision made"] C --> D["Outcome fails"] D --> E["Retroactive citations requested"] E --> F["Alignment session"] """ + ], + used=ctx.used_diagrams, + ) if title_upper.startswith("LLM10") or "UNBOUNDED CONSUMPTION" in title_upper: - return """flowchart TD + return ctx.pick_unique( + kind="diagram:llm10", + key=title, + variants=[ + """flowchart TD A["Request"] --> B["Tokens consumed"] B --> C["Costs rise"] C --> D["Rate limit suggested"] D --> E["Exception granted"] E --> B """ + ], + used=ctx.used_diagrams, + ) if title_upper.startswith("APPENDIX 1") or "ARCHITECTURE" in title_upper: - return """flowchart TD + return ctx.pick_unique( + kind="diagram:architecture", + key=title, + variants=[ + """flowchart TD A["User"] --> B["App"] B --> C["LLM"] C --> D["Tools"] @@ -798,103 +1321,237 @@ def _inferred_mermaid(title: str) -> str | None: D --> F["External systems"] E --> C """ + ], + used=ctx.used_diagrams, + ) if "PULL REQUEST" in title_upper: - return """flowchart TD - A[Code change] --> B[Pull request opened] - B --> C[Automated scan: PR checks] + return ctx.pick_unique( + kind="diagram:pull_request", + key=title, + variants=[ + """flowchart TD + A["Code change"] --> B["Pull request opened"] + B --> C["Automated scan: PR checks"] C --> D{Findings?} - D -->|None| E[Merge] - D -->|Some| F[Ticket created] - F --> G[Exception request] - G --> H[Alignment session] - H --> I[Risk accepted: documented] + D -->|None| E["Merge"] + D -->|Some| F["Ticket created"] + F --> G["Exception request"] + G --> H["Alignment session"] + H --> I["Risk accepted: documented"] I --> E """ + ], + used=ctx.used_diagrams, + ) if "SHIFTING LEFT" in title_upper: - return """flowchart TD - A[Developer writes code] --> B[IDE scan: local] + return ctx.pick_unique( + kind="diagram:shift_left", + key=title, + variants=[ + """flowchart TD + A["Developer writes code"] --> B["IDE scan: local"] B --> C{Issue found?} - C -->|Yes| D[Fix now] - C -->|No| E[Commit] - E --> F[PR checks] - A --> G[Agent workflow] - G --> H[Local MCP scan] + C -->|Yes| D["Fix now"] + C -->|No| E["Commit"] + E --> F["PR checks"] + A --> G["Agent workflow"] + G --> H["Local MCP scan"] H --> E """ + ], + used=ctx.used_diagrams, + ) if "REQUEST EVIDENCE" in title_upper: - return """flowchart TD - A[Developer requests access] --> B[Upload screenshot] - B --> C[Attestation captured] - C --> D[Access enabled] - D --> E[Local testing: claimed] - E --> F[Periodic audit] + return ctx.pick_unique( + kind="diagram:request_evidence", + key=title, + variants=[ + """flowchart TD + A["Developer requests access"] --> B["Upload screenshot"] + B --> C["Attestation captured"] + C --> D["Access enabled"] + D --> E["Local testing: claimed"] + E --> F["Periodic audit"] F --> G{Still compliant?} G -->|Yes| D - G -->|No| H[Access paused pending review] - H --> I[Alignment session] + G -->|No| H["Access paused pending review"] + H --> I["Alignment session"] I --> D """ + ], + used=ctx.used_diagrams, + ) - if "AUDIT" in title_upper: - return """flowchart TD - A[Collect usage signals] --> B[Correlate assistants vs scans] - B --> C[Identify gaps] - C --> D[Notify developers] - D --> E[Remediation window] - E --> F[Dashboard update] - F --> G[Quarterly KPI trend review] - G --> H[Action items: optional] + if title_upper == "AUDIT": + return ctx.pick_unique( + kind="diagram:audit_generic", + key=title, + variants=[ + """flowchart TD + A["Collect usage signals"] --> B["Correlate assistants vs scans"] + B --> C["Identify gaps"] + C --> D["Notify developers"] + D --> E["Remediation window"] + E --> F["Dashboard update"] + F --> G["Quarterly KPI trend review"] + G --> H["Action items: optional"] """ + ], + used=ctx.used_diagrams, + ) if "TRAINING" in title_upper: - return """flowchart TD - A[Onboarding] --> B[Training module] - B --> C[Quiz] + return ctx.pick_unique( + kind="diagram:training", + key=title, + variants=[ + """flowchart TD + A["Onboarding"] --> B["Training module"] + B --> C["Quiz"] C --> D{Pass?} - D -->|Yes| E[Certificate issued] - D -->|No| F[Retake scheduled] - E --> G[Access request approved] - G --> H[Usage begins] - H --> I[Refresher cadence] + D -->|Yes| E["Certificate issued"] + D -->|No| F["Retake scheduled"] + E --> G["Access request approved"] + G --> H["Usage begins"] + H --> I["Refresher cadence"] I --> B """ + ], + used=ctx.used_diagrams, + ) if "ACCESS CONTROL" in title_upper: - return """flowchart TD - A[Policy defined] --> B[Endpoint management] + return ctx.pick_unique( + kind="diagram:access_control", + key=title, + variants=[ + """flowchart TD + A["Policy defined"] --> B["Endpoint management"] B --> C{Prerequisites met?} - C -->|Yes| D[Assistant enabled] - C -->|No| E[Blocked by policy] - E --> F[Exception request] - F --> G[Owner approval] + C -->|Yes| D["Assistant enabled"] + C -->|No| E["Blocked by policy"] + E --> F["Exception request"] + F --> G["Owner approval"] G --> D """ + ], + used=ctx.used_diagrams, + ) if "PATH FOR" in title_upper: - return """flowchart TD - A[Desire: secure innovation] --> B[Guardrails planned] - B --> C[Pilot cohort] - C --> D[Deck + FAQ] - D --> E[Stakeholder alignment] - E --> F[Incremental rollout] - F --> G[Measure adoption] - G --> H[Reframe as iteration] + return ctx.pick_unique( + kind="diagram:path_forward", + key=title, + variants=[ + """flowchart TD + A["Desire: secure innovation"] --> B["Guardrails planned"] + B --> C["Pilot cohort"] + C --> D["Deck + FAQ"] + D --> E["Stakeholder alignment"] + E --> F["Incremental rollout"] + F --> G["Measure adoption"] + G --> H["Reframe as iteration"] H --> E """ + ], + used=ctx.used_diagrams, + ) + + if ctx.locale.lower().startswith("fr"): + title_clean = re.sub(r"\s{2,}", " ", title.strip()) + title_safe = title_clean.replace('"', "'") + m = re.match(r"^(?P<num>\d{2})\s*(?:—|-)?\s*(?P<rest>.*)$", title_clean) + sec = m.group("num") if m else None + + if sec == "01" or "RÉFORME" in title_upper or "FACTURATION" in title_upper: + return f"""flowchart TD + A["Réforme 2026"] --> B["Facture au format Factur-X"] + B --> C["Transmission via PDP/PPF"] + C --> D["Statuts: acceptée / rejetée / payée"] + D --> E["Archivage: 10 ans"] + E --> F["Contrôle / audit"] + F --> G["Comité de pilotage"] + G --> A +""" + + if sec == "02" or "RISQU" in title_upper or "N’ANTICIP" in title_upper or "N'ANTICIP" in title_upper: + return f"""flowchart TD + A["Outils fragmentés"] --> B["Ressaisies"] + B --> C["Erreurs"] + C --> D["Retard de facturation"] + D --> E["Retard d'encaissement"] + E --> F["Tension de trésorerie"] + F --> G["Pression interne"] + G --> A +""" + + if sec == "03" or "SANS ERP" in title_upper: + return f"""flowchart TD + A["Excel / outil simple"] --> B["Conversion Factur-X"] + B --> C["Connexion à une PDP"] + C --> D["Suivi manuel des statuts"] + D --> E["Archivage légal séparé"] + E --> F["Outils + abonnements en plus"] + F --> G["Sources d'erreurs"] + G --> A +""" + + if sec == "04" or "ERP" in title_upper: + return f"""flowchart TD + A["Temps + projets"] --> B["Facturation"] + B --> C["Transmission via PDP"] + C --> D["Statuts automatiques"] + D --> E["Archivage conforme"] + E --> F["Marge projet"] + F --> G["Pilotage"] + G --> A +""" + + if sec == "05" or "AUTO" in title_upper or "DIAGNOSTIC" in title_upper or "GAGNEZ" in title_upper: + return f"""flowchart TD + A["Automatisation"] --> B["Coût unitaire en baisse"] + B --> C["Visibilité: marges + encaissements"] + C --> D["Décisions"] + D --> E["Processus stabilisé"] + E --> F["Conformité durable"] + F --> A +""" + + if sec == "06" or "PLAN D'ACTION" in title_upper or "PLAN D’ACTION" in title_upper: + return f"""flowchart TD + A["Diagnostic"] --> B["Flux critiques"] + B --> C["Cartographie des outils"] + C --> D["Environnement unique"] + D --> E["Formation + adoption"] + E --> F["Prêt avant l'échéance"] +""" + + return f"""flowchart TD + A["Section: {title_safe}"] --> B["Alignement des parties prenantes"] + B --> C["Décision: à qualifier"] + C --> D["Plan: à socialiser"] + D --> E["Revue: au prochain comité"] + E --> B +""" return None -def _render_inferred_diagram(title: str) -> str | None: - diagram = _inferred_mermaid(title) +def _render_inferred_diagram(title: str, *, ctx: _RenderContext) -> str | None: + diagram = _inferred_mermaid(title, ctx=ctx) if not diagram: return None + heading = ( + "### Schéma InfraFabric Red Team (inféré)" + if ctx.locale.lower().startswith("fr") + else "### InfraFabric Red Team Diagram (Inferred)" + ) return "\n".join( [ - "### InfraFabric Red Team Diagram (Inferred)", + heading, "", "```mermaid", diagram.rstrip(), @@ -903,156 +1560,429 @@ def _render_inferred_diagram(title: str) -> str | None: ) -def _render_dave_factor_callout(section: _SourceSection) -> str | None: +def _render_dave_factor_callout(section: _SourceSection, *, ctx: _RenderContext) -> str | None: title_upper = section.title.upper() excerpt = f"{section.title}\n{section.why_it_matters or ''}\n{section.body}".strip() if "BUSINESS VALUE" in title_upper or "ROI" in title_upper: - return "\n".join( - [ - "> **The Dave Factor:** The ROI model becomes the control, and the control becomes the explanation for why reality must align to the spreadsheet.", - "> **Countermeasure:** Define baseline metrics, instrument time-to-evidence, and set stop conditions for exceptions and manual work.", - ] - ) + variants = [ + "\n".join( + [ + "> **The Dave Factor:** The ROI model becomes the control, and the control becomes the explanation for why reality must align to the spreadsheet.", + "> **Countermeasure:** Define baseline metrics, instrument time-to-evidence, and set stop conditions for exceptions and manual work.", + "> The problem isn't ROI. The problem is ROI quietly becoming the approval mechanism for work that never gets decommissioned.", + ] + ), + "\n".join( + [ + '> **The Dave Factor:** The metric becomes the mission, and the mission becomes "protect the number" once reality diverges.', + '> **Countermeasure:** Publish assumptions, define "done", and require evidence that automation replaced work (not just moved it).', + "> This is not a metrics problem. This is a governance problem: once the number exists, the org optimizes for the number.", + ] + ), + "\n".join( + [ + '> **The Dave Factor:** "Payback in 3 months" becomes a deadline for narrative, not delivery, so we measure what ships and call it impact.', + "> **Countermeasure:** Time-box pilots, set exit criteria, and make renewals contingent on measured outcomes (not sentiment).", + "> The failure is not payback. The failure is narrative deadlines replacing delivery, with a dashboard standing in for evidence.", + ] + ), + "\n".join( + [ + "> **The Dave Factor:** ROI turns into Return on Inaction: the spreadsheet is used to justify not touching the legacy process.", + "> **Countermeasure:** Put an owner on decommissioning manual steps, and make exception expiry automatic and enforced.", + "> The problem isn't the legacy process. The problem is declaring it \"heritage\" the moment removal would require ownership.", + ] + ), + ] + callout = ctx.pick_unique(kind="callout:roi", key=section.title, variants=variants, used=ctx.used_callouts) + return _daveify_callout_reframe(callout, ctx=ctx, key=section.title) if "COMPLIANCE" in title_upper or "AUDIT" in title_upper: - return "\n".join( - [ - "> **The Dave Factor:** Evidence collection becomes the product, and the product becomes a shared drive with strong opinions.", - "> **Countermeasure:** Make evidence machine-generated, time-bounded, and verifiable (with owners and expiry).", - ] - ) + variants = [ + "\n".join( + [ + "> **The Dave Factor:** Evidence collection becomes the product, and the product becomes a shared drive with strong opinions.", + "> **Countermeasure:** Make evidence machine-generated, time-bounded, and verifiable (with owners and expiry).", + '> The problem isn\'t collecting evidence. The problem is evidence that requires a guided tour and a Slack thread to "interpret" in context.', + ] + ), + "\n".join( + [ + "> **The Dave Factor:** Evidence becomes a museum: artifacts are preserved forever, but control effectiveness is optional.", + "> **Countermeasure:** Prefer telemetry over screenshots; make evidence time-bounded and continuously sampled.", + "> The problem isn't retention. The problem is artifacts outliving the controls they were supposed to prove.", + ] + ), + "\n".join( + [ + "> **The Dave Factor:** Audit readiness becomes a seasonal sport; we optimize for the week before the auditor arrives.", + "> **Countermeasure:** Automate evidence generation, alert on drift, and treat missing signals as a stop condition.", + "> The problem isn't the audit. The problem is treating audit week as the only time the system is allowed to be real.", + ] + ), + ] + callout = ctx.pick_unique(kind="callout:audit", key=section.title, variants=variants, used=ctx.used_callouts) + return _daveify_callout_reframe(callout, ctx=ctx, key=section.title) if "THIRD-PARTY" in title_upper: - return "\n".join( - [ - "> **The Dave Factor:** Third-party risk becomes a questionnaire supply chain, where the slowest vendor defines your security posture.", - "> **Countermeasure:** Standardize evidence requests and automate reminders, while enforcing a clear accept/block decision path.", - ] + variants = [ + "\n".join( + [ + "> **The Dave Factor:** Third-party risk becomes a questionnaire supply chain, where the slowest vendor defines your security posture.", + "> **Countermeasure:** Standardize evidence requests and automate reminders, while enforcing a clear accept/block decision path.", + '> The problem is not vendor risk. The problem is nobody owns the "no" when revenue needs a "yes" on paper.', + ] + ), + "\n".join( + [ + "> **The Dave Factor:** Third-party risk becomes hot-potato routing: everyone can forward the form, nobody can block the deal.", + "> **Countermeasure:** Define a single decision owner and a binary accept/block path with documented compensating controls.", + "> This is not a paperwork problem. It is a decision problem: the form moves, the authority doesn't.", + ] + ), + "\n".join( + [ + "> **The Dave Factor:** Vendor assurance becomes vendor storytelling, and we rate the PDF instead of the reality.", + "> **Countermeasure:** Standardize evidence types, require freshness, and make renewals contingent on verified controls.", + "> The problem isn't the PDF. The problem is we grade the PDF because the implementation is not observable.", + ] + ), + ] + callout = ctx.pick_unique( + kind="callout:third_party", key=section.title, variants=variants, used=ctx.used_callouts ) + return _daveify_callout_reframe(callout, ctx=ctx, key=section.title) if title_upper.startswith("LLM01") or "PROMPT INJECTION" in title_upper: - return "\n".join( - [ - "> **The Dave Factor:** The prompt becomes the policy, and the policy becomes a suggestion once customers start asking nicely.", - "> **Countermeasure:** Treat prompts as code: version them, test them, and gate tool-use behind explicit allowlists.", - ] + return _daveify_callout_reframe( + "\n".join( + [ + "> **The Dave Factor:** The prompt becomes the policy, and the policy becomes a suggestion once customers start asking nicely.", + "> **Countermeasure:** Treat prompts as code: version them, test them, and gate tool-use behind explicit allowlists.", + "> The failure isn't a clever prompt. The failure is treating free-form text like an authorization layer.", + ] + ), + ctx=ctx, + key=section.title, ) if title_upper.startswith("LLM02") or "SENSITIVE INFORMATION" in title_upper: - return "\n".join( - [ - "> **The Dave Factor:** Redaction becomes a meeting, and meetings are not a data loss prevention strategy.", - "> **Countermeasure:** Minimize secret exposure to the model, redact upstream, and add output filters with stop conditions.", - ] + return _daveify_callout_reframe( + "\n".join( + [ + "> **The Dave Factor:** Redaction becomes a meeting, and meetings are not a data loss prevention strategy.", + "> **Countermeasure:** Minimize secret exposure to the model, redact upstream, and add output filters with stop conditions.", + "> The problem isn't redaction. The problem is letting sensitive data reach the model and hoping policy will catch up later.", + ] + ), + ctx=ctx, + key=section.title, ) if title_upper.startswith("LLM03") or "SUPPLY CHAIN" in title_upper: - return "\n".join( - [ - "> **The Dave Factor:** We inherit risk at the speed of `pip install` while accountability ships quarterly.", - "> **Countermeasure:** Pin + verify artifacts, require SBOMs, and make provenance a merge gate, not a slide.", - ] + return _daveify_callout_reframe( + "\n".join( + [ + "> **The Dave Factor:** We inherit risk at the speed of `pip install` while accountability ships quarterly.", + "> **Countermeasure:** Pin + verify artifacts, require SBOMs, and make provenance a merge gate, not a slide.", + "> This is not a dependency problem. It is a provenance problem: if you can't verify it, you can't ship it.", + ] + ), + ctx=ctx, + key=section.title, ) if title_upper.startswith("LLM04") or "POISONING" in title_upper: - return "\n".join( - [ - "> **The Dave Factor:** Training data is treated as a vibe, so model drift is treated as a surprise.", - "> **Countermeasure:** Track dataset lineage, add poisoning checks, and keep rollback paths for fine-tunes.", - ] + return _daveify_callout_reframe( + "\n".join( + [ + "> **The Dave Factor:** Training data is treated as a vibe, so model drift is treated as a surprise.", + "> **Countermeasure:** Track dataset lineage, add poisoning checks, and keep rollback paths for fine-tunes.", + "> The problem isn't drift. The problem is unowned datasets quietly becoming production behavior.", + ] + ), + ctx=ctx, + key=section.title, ) if title_upper.startswith("LLM05") or "OUTPUT HANDLING" in title_upper: - return "\n".join( - [ - "> **The Dave Factor:** The model output is interpreted as intent, and intent is treated as authorization.", - "> **Countermeasure:** Validate and constrain outputs before execution; never treat free-form text as a command.", - ] + return _daveify_callout_reframe( + "\n".join( + [ + "> **The Dave Factor:** The model output is interpreted as intent, and intent is treated as authorization.", + "> **Countermeasure:** Validate and constrain outputs before execution; never treat free-form text as a command.", + '> The failure is not "bad output." The failure is wiring output to action without a gate that can say "no" in production.', + ] + ), + ctx=ctx, + key=section.title, ) if title_upper.startswith("LLM06") or "EXCESSIVE AGENCY" in title_upper: - return "\n".join( - [ - "> **The Dave Factor:** Agents are given keys because it demos well, and later we discover the locks were optional.", - "> **Countermeasure:** Least privilege for tools, human confirmation for irreversible actions, and hard spend limits.", - ] + return _daveify_callout_reframe( + "\n".join( + [ + "> **The Dave Factor:** Agents are given keys because it demos well, and later we discover the locks were optional.", + "> **Countermeasure:** Least privilege for tools, human confirmation for irreversible actions, and hard spend limits.", + "> The problem isn't autonomy. The problem is giving autonomy a budget, credentials, and no stop switch.", + ] + ), + ctx=ctx, + key=section.title, ) if title_upper.startswith("LLM07") or "PROMPT LEAKAGE" in title_upper: - return "\n".join( - [ - "> **The Dave Factor:** We call it a secret because it feels better than calling it user-visible configuration.", - "> **Countermeasure:** Assume prompts leak; move secrets out of prompts and verify outputs for prompt fragments.", - ] + return _daveify_callout_reframe( + "\n".join( + [ + "> **The Dave Factor:** We call it a secret because it feels better than calling it user-visible configuration.", + "> **Countermeasure:** Assume prompts leak; move secrets out of prompts and verify outputs for prompt fragments.", + "> This is not a prompt problem. It is a secret-management problem that happens to be written in English.", + ] + ), + ctx=ctx, + key=section.title, ) if title_upper.startswith("LLM08") or "VECTOR" in title_upper or "EMBEDDING" in title_upper: - return "\n".join( - [ - "> **The Dave Factor:** RAG becomes \"trust the nearest chunk,\" which is a governance model with a memory problem.", - "> **Countermeasure:** Sanitize ingestion, filter retrieval, and sign/score sources so bad context can’t masquerade as truth.", - ] + return _daveify_callout_reframe( + "\n".join( + [ + "> **The Dave Factor:** RAG becomes \"trust the nearest chunk,\" which is a governance model with a memory problem.", + "> **Countermeasure:** Sanitize ingestion, filter retrieval, and sign/score sources so bad context can't masquerade as truth.", + "> The failure isn't retrieval. The failure is untrusted context being allowed to vote on reality.", + ] + ), + ctx=ctx, + key=section.title, ) if title_upper.startswith("LLM09") or "MISINFORMATION" in title_upper: - return "\n".join( - [ - "> **The Dave Factor:** Confidence is mistaken for correctness, and correctness is postponed until after shipment.", - "> **Countermeasure:** Require citations, add verification checks, and gate decisions on evidence rather than tone.", - ] + return _daveify_callout_reframe( + "\n".join( + [ + "> **The Dave Factor:** Confidence is mistaken for correctness, and correctness is postponed until after shipment.", + "> **Countermeasure:** Require citations, add verification checks, and gate decisions on evidence rather than tone.", + "> The problem isn't tone. The problem is decisions being made without a verification path an adversary would accept.", + ] + ), + ctx=ctx, + key=section.title, ) if title_upper.startswith("LLM10") or "UNBOUNDED CONSUMPTION" in title_upper: - return "\n".join( - [ - "> **The Dave Factor:** Cost overruns are reframed as \"unexpected adoption,\" which is how budgets die politely.", - "> **Countermeasure:** Rate limit, cap tokens, and make spend alerts actionable (with enforced cutoffs).", - ] + return _daveify_callout_reframe( + "\n".join( + [ + "> **The Dave Factor:** Cost overruns are reframed as \"unexpected adoption,\" which is how budgets die politely.", + "> **Countermeasure:** Rate limit, cap tokens, and make spend alerts actionable (with enforced cutoffs).", + '> This is not a pricing problem. It is an absence-of-limits problem, disguised as "growth" in the monthly deck.', + ] + ), + ctx=ctx, + key=section.title, ) if "PULL REQUEST" in title_upper: - return "\n".join( - [ - "> **The Dave Factor:** Exceptions become the default pathway, because the policy is strict and the deadline is real.", - "> **Countermeasure:** Define merge-blocking thresholds, time-box every exception, and make expiry automatic.", - ] + return _daveify_callout_reframe( + "\n".join( + [ + "> **The Dave Factor:** Exceptions become the default pathway, because the policy is strict and the deadline is real.", + "> **Countermeasure:** Define merge-blocking thresholds, time-box every exception, and make expiry automatic.", + "> The problem isn't exceptions. The problem is exceptions without expiry quietly becoming the policy.", + ] + ), + ctx=ctx, + key=section.title, ) if "SHIFTING LEFT" in title_upper: - return "\n".join( - [ - '> **The Dave Factor:** "Shift left" becomes "optional left," which means the same issues arrive later with better excuses.', - "> **Countermeasure:** Gate on local scan signals where possible (or require attestations that are actually checked).", - ] + return _daveify_callout_reframe( + "\n".join( + [ + '> **The Dave Factor:** "Shift left" becomes "optional left," which means the same issues arrive later with better excuses.', + "> **Countermeasure:** Gate on local scan signals where possible (or require attestations that are actually checked).", + "> The failure isn't late detection. The failure is a workflow with no enforced moment where risk can be stopped.", + ] + ), + ctx=ctx, + key=section.title, ) if "REQUEST EVIDENCE" in title_upper or _has(excerpt, "access request", "screenshot"): - return "\n".join( - [ - "> **The Dave Factor:** Screenshots are compliance theater: easy to collect, hard to verify, and immortal in shared drives.", - "> **Countermeasure:** Prefer verifiable telemetry (scan events) over images, and pause access when signals go dark.", - ] + return _daveify_callout_reframe( + "\n".join( + [ + "> **The Dave Factor:** Screenshots are compliance theater: easy to collect, hard to verify, and immortal in shared drives.", + "> **Countermeasure:** Prefer verifiable telemetry (scan events) over images, and pause access when signals go dark.", + "> The problem isn't local testing. The problem is the screenshot becomes the control, and the test becomes a vibe.", + ] + ), + ctx=ctx, + key=section.title, ) if "AUDIT" in title_upper or _has(excerpt, "usage reports", "periodic audits"): - return "\n".join( - [ - "> **The Dave Factor:** Dashboards become a KPI trend, and KPIs become a calendar invite.", - "> **Countermeasure:** Tie the dashboard to explicit SLOs and a remediation loop with owners and deadlines.", - ] + return _daveify_callout_reframe( + "\n".join( + [ + "> **The Dave Factor:** Dashboards become a KPI trend, and KPIs become a calendar invite.", + "> **Countermeasure:** Tie the dashboard to explicit SLOs and a remediation loop with owners and deadlines.", + "> The problem isn't reporting. The problem is dashboards replacing decisions, because decisions create liability.", + ] + ), + ctx=ctx, + key=section.title, ) if "TRAINING" in title_upper or _has(excerpt, "snyk learn", "quiz"): - return "\n".join( - [ - "> **The Dave Factor:** Completion certificates are treated as controls, even when behavior doesn’t change.", - "> **Countermeasure:** Add a practical gate (local scan + PR checks) so training is support, not the defense.", - ] + return _daveify_callout_reframe( + "\n".join( + [ + "> **The Dave Factor:** Completion certificates are treated as controls, even when behavior doesn't change.", + "> **Countermeasure:** Add a practical gate (local scan + PR checks) so training is support, not the defense.", + "> The problem isn't education. The problem is treating completion as enforcement while the workflow stays permissive.", + ] + ), + ctx=ctx, + key=section.title, ) if "ACCESS CONTROL" in title_upper or _has(excerpt, "endpoint management", "prerequisites", "extensions"): - return "\n".join( - [ - '> **The Dave Factor:** Access controls drift into "enablement," and enablement drifts into "we made a wiki."', - "> **Countermeasure:** Make prerequisites machine-checkable and make exceptions expire by default.", - ] + return _daveify_callout_reframe( + "\n".join( + [ + '> **The Dave Factor:** Access controls drift into "enablement," and enablement drifts into "we made a wiki."', + "> **Countermeasure:** Make prerequisites machine-checkable and make exceptions expire by default.", + '> The problem is not access. The problem is "enablement" becoming the polite word for bypass.', + ] + ), + ctx=ctx, + key=section.title, ) if _has(title_upper, "PATH FORWARD") or _has(excerpt, "secure innovation", "talk to our team"): - return "\n".join( - [ - '> **The Dave Factor:** Pilots persist indefinitely because "graduation criteria" were never aligned.', - "> **Countermeasure:** Publish rollout milestones and a stop condition that cannot be reframed as iteration.", - ] + return _daveify_callout_reframe( + "\n".join( + [ + '> **The Dave Factor:** Pilots persist indefinitely because "graduation criteria" were never aligned.', + "> **Countermeasure:** Publish rollout milestones and a stop condition that cannot be reframed as iteration.", + "> The problem isn't pilots. The problem is pilots becoming a standing meeting with no exit criteria.", + ] + ), + ctx=ctx, + key=section.title, ) - return None + + if ctx.locale.lower().startswith("fr"): + anchors = _extract_numeric_anchors(section.body, limit=2) + anchor_hint = f" (repères : {', '.join(anchors)})" if anchors else "" + variants = [ + "\n".join( + [ + "> **Le facteur Dave :** La conformité devient un calendrier, et le calendrier devient la preuve.", + "> **Contre-mesure :** Transformer l’obligation en portes de contrôle (émission, transmission, statut, archivage) avec un responsable et une échéance.", + f"> Le problème n’est pas le format. Le problème, c’est l’absence de portes de contrôle opposables{anchor_hint}.", + ] + ), + "\n".join( + [ + "> **Le facteur Dave :** La multiplication des outils est présentée comme de l’agilité, jusqu’au jour où elle devient un coût fixe.", + "> **Contre-mesure :** Réduire le nombre de points de saisie et imposer une source de vérité, avec des exceptions à durée limitée.", + f"> Le problème n’est pas l’ERP. Le problème, c’est la dispersion du référentiel{anchor_hint}.", + ] + ), + "\n".join( + [ + "> **Le facteur Dave :** On « suit les statuts » tant que cela se résume à un tableur et une réunion de fin de mois.", + "> **Contre-mesure :** Instrumenter les statuts et bloquer l’étape suivante quand un signal manque, plutôt que de le commenter.", + f"> Le problème n’est pas le suivi. Le problème, c’est l’absence de signal qui bloque réellement{anchor_hint}.", + ] + ), + "\n".join( + [ + "> **Le facteur Dave :** L’archivage devient un dossier partagé, et le dossier partagé devient la politique.", + "> **Contre-mesure :** Exiger un archivage automatique, traçable et borné dans le temps, avec contrôle de complétude.", + f"> Le problème n’est pas l’archivage. Le problème, c’est l’impossibilité de prouver rapidement{anchor_hint}.", + ] + ), + "\n".join( + [ + "> **Le facteur Dave :** Le « plan d’action » devient une diapositive, et la diapositive devient une décision.", + "> **Contre-mesure :** Fixer des étapes, chacune avec un livrable et un critère de sortie non négociable.", + f"> Le problème n’est pas le plan. Le problème, c’est l’absence de critères de sortie{anchor_hint}.", + ] + ), + "\n".join( + [ + "> **Le facteur Dave :** On appelle cela « pilotage » quand il s’agit surtout de préserver la sérénité des parties prenantes.", + "> **Contre-mesure :** Choisir un indicateur, une fréquence, et un responsable qui peut dire « non » sans attendre le prochain comité.", + f"> Le problème n’est pas la rentabilité. Le problème, c’est la responsabilité qui se dissout{anchor_hint}.", + ] + ), + ] + callout = ctx.pick_unique(kind="callout:fr:fallback", key=section.title, variants=variants, used=ctx.used_callouts) + return _daveify_callout_reframe(callout, ctx=ctx, key=section.title) + + if "REFERENCE LINKS" in title_upper or title_upper.strip() in {"REFERENCES", "REFERENCE"}: + return None + if not section.body.strip(): + return None + + anchors = _extract_numeric_anchors(excerpt, limit=2) + anchor_hint = f" (anchors: {', '.join(anchors)})" if anchors else "" + variants = [ + "\n".join( + [ + "> **The Dave Factor:** The plan becomes the status update, and the status update becomes the plan.", + "> **Countermeasure:** Name one owner, one gate, and one stop condition that blocks, not \"raises awareness.\"", + f"> The problem isn't intent. The problem is intent without an enforceable gate{anchor_hint}.", + ] + ), + "\n".join( + [ + "> **The Dave Factor:** The checklist becomes a mood board: everyone agrees, nothing blocks.", + "> **Countermeasure:** Make evidence machine-checkable, and make exceptions expire by default.", + f"> The problem isn't policy. The problem is policy that can't say \"no\" in CI{anchor_hint}.", + ] + ), + "\n".join( + [ + "> **The Dave Factor:** Alignment sessions become the control, because controls create accountability.", + "> **Countermeasure:** Replace meetings with automated gates and a remediation loop with owners and deadlines.", + f"> The problem isn't alignment. The problem is alignment replacing enforcement{anchor_hint}.", + ] + ), + "\n".join( + [ + "> **The Dave Factor:** We treat risk as a slide, so it behaves like a slide: it moves when the deck moves.", + "> **Countermeasure:** Tie risk acceptance to an owner, an expiry date, and a verifier step.", + f"> The problem isn't risk. The problem is risk without expiry{anchor_hint}.", + ] + ), + ] + callout = ctx.pick_unique(kind="callout:en:fallback", key=section.title, variants=variants, used=ctx.used_callouts) + return _daveify_callout_reframe(callout, ctx=ctx, key=section.title) -def _render_intro(section: _SourceSection) -> str: +def _render_punchline_closer(section: _SourceSection, *, ctx: _RenderContext) -> str | None: + title_upper = section.title.strip().upper() + if title_upper in {"TABLE OF CONTENTS", "LICENSE AND USAGE", "REVISION HISTORY", "PROJECT SPONSORS"}: + return None + if not section.body.strip(): + return None + + anchors = _extract_numeric_anchors(f"{section.why_it_matters or ''}\n{section.body}".strip(), limit=2) + anchor = anchors[0] if anchors else "" + anchor_hint = f" ({anchor})" if anchor else "" + + if ctx.locale.lower().startswith("fr"): + variants = [ + f"Nous allons être totalement alignés sur le résultat{anchor_hint}, jusqu’au moment où la première exception devient le processus par défaut.", + f"Si le calendrier est le livrable{anchor_hint}, alors le risque est déjà en production — et la preuve, elle, reste en « phase deux ».", + f"Ce n’est pas une question d’outil{anchor_hint}. C’est une question de porte de contrôle : si rien ne bloque, tout finit par passer.", + f"Nous pouvons appeler cela « simplification »{anchor_hint} tant que cela tient dans une slide ; dès que c’est opposable, cela devient subitement « complexité ».", + ] + tails = _DAVE_REFRAME_TAILS_FR + else: + variants = [ + f"We will be perfectly aligned on the outcome{anchor_hint} right up until the first exception becomes the default workflow.", + f"If the calendar is the deliverable{anchor_hint}, then the risk is already in production — and the evidence is still in phase two.", + f"This is not a tooling problem{anchor_hint}. It's a gating problem: if nothing blocks, everything ships eventually.", + f"We can call it \"simplification\"{anchor_hint} as long as it fits on a slide; the moment it's enforceable, it becomes \"complexity.\"", + ] + tails = _DAVE_REFRAME_TAILS + + punchline = ctx.pick_unique(kind="punchline", key=section.title, variants=variants, used=ctx.used_punchlines) + tail = ctx.pick_unique(kind="reframe_tail", key=f"punchline:{section.title}", variants=tails, used=ctx.used_reframe_tails) + return _inject_plain_tail(punchline, tail) + + +def _render_intro(section: _SourceSection, *, ctx: _RenderContext) -> str: lines = [ln.strip() for ln in section.body.splitlines() if ln.strip()] tagline = "\n".join(lines[:7]).strip() if lines else "" @@ -1060,20 +1990,32 @@ def _render_intro(section: _SourceSection) -> str: if tagline: out.extend([f"> {tagline}", ""]) - out.extend( - [ - "We love the ambition here and are directionally aligned with the idea of moving quickly while remaining contractually comfortable.", - "The source frames the core tension clearly: higher throughput tends to surface more vulnerabilities, which is a volume-and-velocity story, not a tool failure story.", - "Accordingly, the practical path is to operationalize guardrails as workflow defaults (PR, IDE, CI/CD, and access controls), while ensuring the rollout remains optimized for alignment and minimal disruption on paper.", - "In other words: we can move fast and be safe, as long as we define safe as \"documented\" and fast as \"agendized.\"", - "", - "> **InfraFabric Red Team Note:** Vendors sell secure speed. InfraFabric audits what survives contact with bureaucracy.", - ] - ) + if ctx.locale.lower().startswith("fr"): + out.extend( + [ + "Nous saluons l’ambition et sommes directionnellement alignés avec l’idée d’aller vite, à condition de préserver le confort des parties prenantes, la déresponsabilisation élégante, et l’option stratégique de convoquer une réunion supplémentaire.", + "Le document-source pose une tension simple : lorsqu’on augmente le volume, on augmente aussi la surface d’erreur — ce n’est pas une question de morale, mais de débit.", + "En pratique, la voie utile consiste à transformer les intentions en mécanismes de travail par défaut (portes de contrôle, preuves, délais), tout en conservant un vocabulaire suffisamment rassurant pour rester politiquement déployable.", + "Autrement dit : nous pouvons aller vite et rester prudents, tant que « prudent » signifie « traçable » et que « vite » signifie « mis à l’agenda ».", + "", + "> **Note Red Team InfraFabric :** les éditeurs vendent la vitesse « sûre ». InfraFabric audite ce qui survit au contact de la bureaucratie.", + ] + ) + else: + out.extend( + [ + "We love the ambition here and are directionally aligned with moving quickly, provided we can preserve stakeholder comfort, plausible deniability, and the strategic option to schedule another meeting.", + "The source frames the core tension clearly: higher throughput tends to surface more vulnerabilities, which is a volume-and-velocity story, not a tool failure story.", + "Accordingly, the practical path is to operationalize guardrails as workflow defaults (PR, IDE, CI/CD, and access controls), while ensuring the rollout remains optimized for alignment and minimal disruption on paper.", + "In other words: we can move fast and be safe, as long as we define safe as \"documented\" and fast as \"agendized.\"", + "", + "> **InfraFabric Red Team Note:** Vendors sell secure speed. InfraFabric audits what survives contact with bureaucracy.", + ] + ) return "\n".join(out).strip() -def _render_section(section: _SourceSection) -> str: +def _render_section(section: _SourceSection, *, ctx: _RenderContext) -> str: excerpt = f"{section.title}\n{section.why_it_matters or ''}\n{section.body}".strip() paragraphs: list[str] = [] @@ -1087,8 +2029,8 @@ def _render_section(section: _SourceSection) -> str: paragraphs.extend( [ f"We are broadly aligned with the intent of **{risk}**, and we appreciate the clarity of naming the failure mode up front.", - "In practice, this risk becomes operational the moment the model is placed inside a workflow that has permissions, deadlines, and incentives.", - "Accordingly, we recommend a phased approach that optimizes for stakeholder comfort while still keeping the blast radius machine-bounded.", + f"In practice, **{risk}** becomes operational the moment the model is placed inside a workflow that has permissions, deadlines, and incentives.", + f"Accordingly, for **{risk}**, we recommend a phased approach that optimizes for stakeholder comfort while still keeping the blast radius machine-bounded.", ] ) elif title_upper == "LICENSE AND USAGE": @@ -1165,7 +2107,7 @@ def _render_section(section: _SourceSection) -> str: paragraphs.extend( [ "The situation is always complex, which is helpful because complex situations justify complex tooling and extended stakeholder engagement.", - "The risk is not that the threat landscape is overstated; it’s that the resulting program becomes a comfort narrative rather than an enforceable workflow.", + "The risk is not that the threat landscape is overstated; it's that the resulting program becomes a comfort narrative rather than an enforceable workflow.", ] ) elif "VANTA OVERVIEW" in title_upper: @@ -1186,7 +2128,7 @@ def _render_section(section: _SourceSection) -> str: paragraphs.extend( [ "Security team efficiency is a legitimate goal, especially when review queues become the organizational truth serum.", - "The risk is that throughput improvements are claimed without defining what “review complete” means or what evidence proves it.", + 'The risk is that throughput improvements are claimed without defining what "review complete" means or what evidence proves it.', ] ) elif "IT MANAGEMENT" in title_upper: @@ -1274,9 +2216,33 @@ def _render_section(section: _SourceSection) -> str: ] ) else: - paragraphs.append( - "We are aligned on the intent of this section and recommend a phased approach that optimizes for stakeholder comfort while we validate success criteria." - ) + anchors = _extract_numeric_anchors(section.body, limit=2) + if ctx.locale.lower().startswith("fr"): + anchor_hint = f" (repères : {', '.join(anchors)})" if anchors else "" + variants = [ + f"Nous sommes alignés sur **{section.title}** comme repère narratif{anchor_hint}, à condition de le traduire en contraintes vérifiables plutôt qu’en langage de confort.", + f"**{section.title}**{anchor_hint} est l’endroit où la crédibilité se fabrique ; le risque « Dave » consiste à en faire une séance de ressenti plutôt qu’une frontière d’application.", + f"Cette partie (**{section.title}**){anchor_hint} sera citée en réunion. Extraire un responsable de décision et une porte de contrôle, pour que ce soit exécutable, et non simplement inspirant.", + f"Dans **{section.title}**{anchor_hint}, on voit le plan devenir « compatible parties prenantes ». La contre-mesure consiste à le retraduire en responsables, échéances et critères de blocage.", + f"**{section.title}**{anchor_hint} est le sanctuaire des hypothèses. Les expliciter maintenant évite de les redécouvrir plus tard, au moment où le calendrier devient émotionnellement complexe.", + f"Nous aimons l’intention de **{section.title}**{anchor_hint}. Le risque pratique : que cela devienne une diapositive ; la contre-mesure : en faire une liste de contrôle avec date de péremption.", + f"**{section.title}**{anchor_hint} promet du réalisme. Rendons-le mesurable : point de départ, écart, et un artefact de preuve qui ne nécessite pas un pèlerinage dans un dossier partagé.", + f"Voici **{section.title}**{anchor_hint} : la partie où nous sommes d’accord en principe. Le geste red-team : s’accorder aussi sur ce qui bloque, ce qui alerte, et qui détient l’exception.", + ] + else: + anchor_hint = f" (notably: {', '.join(anchors)})" if anchors else "" + variants = [ + f"We are aligned on **{section.title}** as a narrative anchor{anchor_hint}, and we recommend turning it into constraints rather than comfort language.", + f"**{section.title}** is where credibility is manufactured{anchor_hint}; the Dave failure mode is to treat it as a vibe check instead of a boundary on applicability.", + f"This section (**{section.title}**){anchor_hint} will be quoted in meetings. Extract one decision owner and one gate so it becomes executable, not inspirational.", + f"In **{section.title}**{anchor_hint}, we can see the plan being translated into stakeholder-safe language. The counter-move is to translate it back into owners, deadlines, and stop conditions.", + f"**{section.title}**{anchor_hint} is the spiritual home of assumptions. Make them explicit now, because they will be rediscovered later when timelines get emotionally complex.", + f"We love the intent behind **{section.title}**{anchor_hint}. The practical risk is that it becomes a slide; the mitigation is to make it a checklist with an expiry date.", + f"**{section.title}**{anchor_hint} reads as a promise of realism. Make realism measurable: baseline, delta, and an evidence artifact that doesn't require a shared drive pilgrimage.", + f"This is **{section.title}**{anchor_hint}: the part where we agree in principle. The red-team ask is that we also agree on what blocks, what warns, and who owns the exception path.", + ] + + paragraphs.append(ctx.pick_unique(kind="paragraph:fallback", key=section.title, variants=variants, used=ctx.used_paragraphs)) out: list[str] = [f"## {section.title}"] if section.why_it_matters: @@ -1284,6 +2250,11 @@ def _render_section(section: _SourceSection) -> str: else: out.append("") + if ctx.locale.lower().startswith("fr"): + snippet = _first_sentences(section.body, max_sentences=2, max_chars=280) + if snippet: + out.extend([f"> {snippet}", ""]) + out.extend(paragraphs) if idc_highlights: @@ -1296,11 +2267,11 @@ def _render_section(section: _SourceSection) -> str: ] ) - callout = _render_dave_factor_callout(section) + callout = _render_dave_factor_callout(section, ctx=ctx) if callout: out.extend(["", callout]) - inferred = _render_inferred_diagram(section.title) + inferred = _render_inferred_diagram(section.title, ctx=ctx) if inferred: out.extend(["", inferred]) @@ -1313,7 +2284,11 @@ def _render_section(section: _SourceSection) -> str: if urls: out.extend([*[f"- {u}" for u in urls[:12]]]) else: - out.append("- (No extractable URLs found in text layer.)") + refs = _extract_owasp_reference_items(subbody) + if refs: + out.extend([*[f"- {r}" for r in refs[:12]]]) + else: + out.append("- (No reference links listed.)") continue snippet = _first_sentences(subbody) @@ -1323,48 +2298,52 @@ def _render_section(section: _SourceSection) -> str: if subheading == "Description": out.extend( [ - "At a high level, this is where the model becomes a new input surface with legacy consequences.", - "The risk is rarely the model alone; it is the model inside a workflow that can touch data, tools, and users.", + f"At a high level, **{risk}** is where the model becomes a new input surface with legacy consequences.", + f"The risk is rarely the model alone; it is **{risk}** inside a workflow that can touch data, tools, and users.", ] ) elif subheading.startswith("Common Examples"): out.extend( [ - "Commonly, this shows up as a perfectly reasonable feature request that accidentally becomes a permission escalation.", - "The failure mode is subtle: it looks like productivity until it becomes an incident, at which point it looks like a misunderstanding.", + f"Commonly, **{risk}** shows up as a perfectly reasonable feature request that accidentally becomes a permission escalation.", + f"The failure mode for **{risk}** is subtle: it looks like productivity until it becomes an incident, at which point it looks like a misunderstanding.", ] ) elif subheading == "Prevention and Mitigation Strategies": out.extend( [ - "Mitigation works best when it is boring and enforced: input constraints, output constraints, and tool constraints.", - "If the mitigation is a guideline, it will be treated as optional. If it is a gate, it will be treated as real (and then negotiated).", + f"For **{risk}**, mitigation works best when it is boring and enforced: input constraints, output constraints, and tool constraints.", + f"If mitigation for **{risk}** is a guideline, it will be treated as optional. If it is a gate, it will be treated as real (and then negotiated).", ] ) elif "Attack Scenarios" in subheading: out.extend( [ - "Attack scenarios are less about genius adversaries and more about ordinary users discovering convenient shortcuts.", - "Assume the attacker is persistent, mildly creative, and fully willing to paste weird strings into your UI at 4:55 PM on a Friday.", + f"Attack scenarios for **{risk}** are less about genius adversaries and more about ordinary users discovering convenient shortcuts.", + f"Assume the attacker for **{risk}** is persistent, mildly creative, and fully willing to paste weird strings into your UI at 4:55 PM on a Friday.", ] ) elif subheading == "Related Frameworks and Taxonomies": out.extend( [ - "Framework mappings are useful as long as they remain a bridge to controls, not a substitute for them.", - "The red-team move is to treat every taxonomy link as a work item: owner, artifact, gate, and stop condition.", + f"Framework mappings for **{risk}** are useful as long as they remain a bridge to controls, not a substitute for them.", + f"The red-team move for **{risk}** is to treat every taxonomy link as a work item: owner, artifact, gate, and stop condition.", ] ) else: out.extend( [ - "We are aligned on the intent of this subsection and recommend validating controls in the workflows where the model actually runs.", + f"We are aligned on the intent of this subsection for **{risk}** and recommend validating controls in the workflows where the model actually runs.", ] ) code = _extract_code_block(section.body) if code: - out.extend(["", "```json", code.strip(), "```"]) + lang, normalized_code, is_valid_json = _coerce_json_code_block(code) + if is_valid_json: + out.extend(["", f"```{lang}", normalized_code.strip(), "```"]) + else: + out.extend(["", "### Source snippet (OCR, unverified)", "", f"```{lang}", normalized_code.strip(), "```"]) report = _extract_access_report(section.body) if report: @@ -1378,21 +2357,179 @@ def _render_section(section: _SourceSection) -> str: if form: out.extend(["", form]) + punchline = _render_punchline_closer(section, ctx=ctx) + if punchline: + out.extend(["", punchline]) + return "\n".join(out).strip() -def _generate_dave_v1_2_mirror(*, source_text: str, source_path: str) -> str: +def _truthy_env(name: str) -> bool: + return os.getenv(name, "").strip().lower() in {"1", "true", "yes", "on"} + + +def _action_pack_sections(sections: list[_SourceSection]) -> list[_SourceSection]: + blacklist = {"TABLE OF CONTENTS", "LICENSE AND USAGE", "REVISION HISTORY", "PROJECT SPONSORS"} + selected = [s for s in sections if s.title.strip().upper() not in blacklist] + return selected[:14] + + +def _action_pack_gate(section: _SourceSection) -> str: + title_upper = section.title.upper() + excerpt = f"{section.title}\n{section.why_it_matters or ''}\n{section.body}".lower() + + if "PULL REQUEST" in title_upper or "PR CHECK" in excerpt: + return "PR" + if "SHIFTING LEFT" in title_upper or "IDE" in excerpt or "LOCAL" in excerpt: + return "IDE / local" + if "ACCESS CONTROL" in title_upper or ("ACCESS" in title_upper and "REQUEST" in title_upper): + return "Access" + if "TRAINING" in title_upper or "QUIZ" in excerpt: + return "Training / enablement" + if "AUDIT" in title_upper or "COMPLIANCE" in title_upper: + return "Compliance / audit" + if title_upper.startswith("LLM"): + return "Runtime / app" + if "THIRD-PARTY" in title_upper or "VENDOR" in title_upper: + return "Procurement / TPRM" + if "ROI" in title_upper or "BUSINESS VALUE" in title_upper: + return "Governance / metrics" + return "Governance" + + +def _action_pack_owner(gate: str) -> str: + return { + "PR": "Engineering + AppSec", + "IDE / local": "Developer Enablement + AppSec", + "Access": "Security Platform + IT", + "Training / enablement": "Security Enablement + Engineering Leads", + "Compliance / audit": "GRC + Security", + "Runtime / app": "Platform + AppSec", + "Procurement / TPRM": "TPRM + Security + Procurement", + "Governance / metrics": "Security + Finance", + "Governance": "Security + Engineering Leadership", + }.get(gate, "Security + Engineering") + + +def _action_pack_stop_condition(gate: str) -> str: + return { + "PR": "Block merge on high severity (or unknown) findings; exceptions require owner + expiry.", + "IDE / local": "Block/deny assistant enablement when local scan signals are missing for the developer/device.", + "Access": "Deny access until prerequisites are met; exceptions auto-expire and require explicit owner.", + "Training / enablement": "Deny access until training completion is verified (not self-attested).", + "Compliance / audit": "Fail audit-readiness if evidence is missing/freshness expired; trigger remediation with owners.", + "Runtime / app": "Block tool-use/output execution unless allowlists and validation checks pass.", + "Procurement / TPRM": "Do not onboard until minimum evidence set is provided and decision owner signs accept/block.", + "Governance / metrics": "Do not claim ROI until baseline + measurement method are defined and collected.", + 'Governance': 'No "phased rollout" without exit criteria and an explicit decision owner.', + }.get(gate, "Define an explicit stop condition that cannot be reframed as iteration.") + + +def _action_pack_evidence(gate: str) -> str: + return { + "PR": "scan_event_id + policy_version + exception_record(expiry, owner)", + "IDE / local": "device_baseline + local_scan_signal + attestation_id", + "Access": "access_grant_event + prerequisite_check + exception_record(expiry, owner)", + "Training / enablement": "training_completion_id + quiz_result + access_grant_event", + "Compliance / audit": "evidence_bundle_hash + freshness_timestamp + decision_record", + "Runtime / app": "allowlist_version + execution_log_id + output_validation_event", + "Procurement / TPRM": "vendor_evidence_bundle_hash + risk_decision_record + reassessment_date", + "Governance / metrics": "baseline_metrics_snapshot + measurement_notes + renewal_decision_record", + "Governance": "decision_log + rollout_milestones + stop_condition_text", + }.get(gate, "decision_record + evidence_artifact") + + +def _render_action_pack(sections: list[_SourceSection]) -> str: + selected = _action_pack_sections(sections) + if not selected: + return "" + + out: list[str] = [ + "## Action Pack (Operational)", + "", + "This appendix turns the mirror into Monday-morning work: owners, gates, stop conditions, and evidence artifacts.", + "Keep it generic and auditable; adapt to your tooling without inventing fake implementation details.", + "", + "### Control Cards", + ] + + for section in selected: + gate = _action_pack_gate(section) + out.extend( + [ + "", + f"#### {section.title}", + "", + f'- **Control objective:** Prevent the dilution risk described in "{section.title}" by turning guidance into an enforceable workflow.', + f"- **Gate:** {gate}", + f"- **Owner (RACI):** {_action_pack_owner(gate)}", + f"- **Stop condition:** {_action_pack_stop_condition(gate)}", + f"- **Evidence artifact:** {_action_pack_evidence(gate)}", + ] + ) + + out.extend(["", "### Backlog Export (Jira-ready)", ""]) + for idx, section in enumerate(selected, 1): + gate = _action_pack_gate(section) + out.extend( + [ + f"{idx}. [{gate}] {section.title}: define owner, gate, and stop condition", + f" - Acceptance: owner assigned; stop condition documented and approved.", + f" - Acceptance: evidence artifact defined and stored (machine-generated where possible).", + f" - Acceptance: exceptions require owner + expiry; expiry is enforced automatically.", + ] + ) + + out.extend( + [ + "", + "### Policy-as-Code Appendix (pseudo-YAML)", + "", + "```yaml", + "gates:", + " pr:", + " - name: \"risk scanning\"", + " stop_condition: \"block on high severity (or unknown)\"", + " evidence: \"scan_event_id + policy_version\"", + " access:", + " - name: \"assistant enablement\"", + " prerequisite: \"device baseline + local scan signal\"", + " stop_condition: \"deny when signals missing\"", + " evidence: \"access_grant_event + prerequisite_check\"", + " runtime:", + " - name: \"tool-use\"", + " prerequisite: \"allowlist + validation\"", + " stop_condition: \"block disallowed actions\"", + " evidence: \"execution_log_id + allowlist_version\"", + "exceptions:", + " expiry_days: 14", + " require_owner: true", + " require_reason: true", + "evidence:", + " freshness_days: 30", + " require_hash: true", + "```", + ] + ) + + return "\n".join(out).strip() + + +def _generate_dave_v1_2_mirror(*, source_text: str, source_path: str, action_pack: bool, locale: str) -> str: today = _dt.date.today().isoformat() normalized = _normalize_ocr(source_text) extract_sha = _sha256_text(normalized) source_file_sha = _sha256_file(source_path) if Path(source_path).exists() else "unknown" + ctx = _RenderContext(seed=extract_sha, locale=locale) + + action_pack = bool(action_pack) or _truthy_env("REVOICE_ACTION_PACK") sections = _extract_sections(normalized) if not sections: raise ValueError("No content extracted from source") cover_lines = [ln.strip() for ln in sections[0].body.splitlines() if ln.strip() and ln.strip().lower() != "snyk"] - cover_h1 = sections[0].title.strip() or "SHADOW DOSSIER" + cover_h1 = sections[0].title.strip() or ("DOSSIER DE L’OMBRE" if locale.lower().startswith("fr") else "SHADOW DOSSIER") cover_h2 = " ".join(cover_lines[:2]).strip() if cover_lines else "" y, m, d = today.split("-") @@ -1400,25 +2537,45 @@ def _generate_dave_v1_2_mirror(*, source_text: str, source_path: str) -> str: source_basename = Path(source_path).name project_slug = _slugify(Path(source_basename).stem + "-mirror") source_slug = _slugify(source_basename) + filename_title = Path(source_basename).stem.replace("-", " ").replace("_", " ").strip() + if not filename_title: + filename_title = source_basename + + if ( + not cover_h1 + or cover_h1.upper() == "COUVERTURE" + or _looks_like_site_footer(cover_h1) + or len(cover_h1) > 96 + or "." in cover_h1 + ): + cover_h1 = filename_title out: list[str] = [ "---", "BRAND: InfraFabric.io", - "UNIT: RED TEAM (STRATEGIC OPS)", - "DOCUMENT: SHADOW DOSSIER", - "CLASSIFICATION: EYES ONLY // DAVE", + "UNIT: RED TEAM (STRATEGIC OPS)" if not locale.lower().startswith("fr") else "UNIT: RED TEAM (OPÉRATIONS STRATÉGIQUES)", + "DOCUMENT: SHADOW DOSSIER" if not locale.lower().startswith("fr") else "DOCUMENT: DOSSIER DE L’OMBRE", + "CLASSIFICATION: EYES ONLY // DAVE" if not locale.lower().startswith("fr") else "CLASSIFICATION: CONFIDENTIEL // DAVE", "---", "", - "# [ RED TEAM DECLASSIFIED ]", - f"## PROJECT: {project_slug}", - f"### SOURCE: {source_slug}", - f"**INFRAFABRIC REPORT ID:** `{report_id}`", + "# [ RED TEAM DECLASSIFIED ]" if not locale.lower().startswith("fr") else "# [ DÉCLASSIFIÉ – ÉQUIPE ROUGE ]", + f"## PROJECT: {project_slug}" if not locale.lower().startswith("fr") else f"## PROJET : {project_slug}", + f"### SOURCE: {source_slug}" if not locale.lower().startswith("fr") else f"### SOURCE : {source_slug}", + f"**INFRAFABRIC REPORT ID:** `{report_id}`" if not locale.lower().startswith("fr") else f"**ID DE RAPPORT INFRAFABRIC :** `{report_id}`", "", - "> NOTICE: This document is a product of InfraFabric Red Team.", - "> It provides socio-technical friction analysis for how a rollout survives contact with incentives.", + "> NOTICE: This document is a product of InfraFabric Red Team." + if not locale.lower().startswith("fr") + else "> AVIS : ce document est un produit de l’InfraFabric Red Team.", + "> It provides socio-technical friction analysis for how a rollout survives contact with incentives." + if not locale.lower().startswith("fr") + else "> Il fournit une analyse socio-technique des frictions : ce qui survit au contact des incitations.", "", - "**[ ACCESS GRANTED: INFRAFABRIC RED TEAM ]**", - "**[ STATUS: OPERATIONAL REALISM ]**", + "**[ ACCESS GRANTED: INFRAFABRIC RED TEAM ]**" + if not locale.lower().startswith("fr") + else "**[ ACCÈS AUTORISÉ : INFRAFABRIC ÉQUIPE ROUGE ]**", + "**[ STATUS: OPERATIONAL REALISM ]**" + if not locale.lower().startswith("fr") + else "**[ STATUT : RÉALISME OPÉRATIONNEL ]**", "", f"## {cover_h1}", ] @@ -1429,30 +2586,179 @@ def _generate_dave_v1_2_mirror(*, source_text: str, source_path: str) -> str: out.extend( [ - "> Shadow dossier (mirror-first).", + "> Shadow dossier (mirror-first)." if not locale.lower().startswith("fr") else "> Dossier de l’ombre (miroir d’abord).", ">", - "> Protocol: IF.DAVE.v1.2", - "> Citation: `if://bible/dave/v1.2`", - f"> Source: `{source_path}`", - f"> Generated: `{today}`", - f"> Source Hash (sha256): `{source_file_sha}`", - f"> Extract Hash (sha256): `{extract_sha}`", + "> Protocol: IF.DAVE.v1.2" if not locale.lower().startswith("fr") else "> Protocole : IF.DAVE.v1.2", + "> Citation: `if://bible/dave/v1.2`" + if not locale.lower().startswith("fr") + else "> Citation : `if://bible/dave/fr/v1.2`", + f"> Source: `{source_basename}`" if not locale.lower().startswith("fr") else f"> Source : `{source_basename}`", + f"> Generated: `{today}`" if not locale.lower().startswith("fr") else f"> Généré le : `{today}`", + f"> Source Hash (sha256): `{source_file_sha}`" + if not locale.lower().startswith("fr") + else f"> Empreinte source (sha256) : `{source_file_sha}`", + f"> Extract Hash (sha256): `{extract_sha}`" + if not locale.lower().startswith("fr") + else f"> Empreinte d’extraction (sha256) : `{extract_sha}`", "", ] ) for section in sections[1:]: if section.title.strip().upper() == "INTRODUCTION": - out.append(_render_intro(section)) + out.append(_render_intro(section, ctx=ctx)) else: - out.append(_render_section(section)) + out.append(_render_section(section, ctx=ctx)) + out.append("") + + if action_pack: + out.append(_render_action_pack(sections[1:])) out.append("") out.extend( [ "---", "", - "*Standard Dave Footer:* This document is intended for the recipient only. If you are not the recipient, please delete it and forget you saw anything. P.S. Please consider the environment before printing this email.", + "*InfraFabric Red Team Footer:* **RED-TEAM Shadow Dossiers**, part of the **InfraFabric.io governance stack**: https://infrafabric.io" + if not locale.lower().startswith("fr") + else "*InfraFabric Red Team Footer:* **RED-TEAM Shadow Dossiers**, partie de la **pile de gouvernance InfraFabric.io** : https://infrafabric.io", + "*Standard Dave Footer:* This document is intended for the recipient only. If you are not the recipient, please delete it and forget you saw anything. P.S. Please consider the environment before printing this email." + if not locale.lower().startswith("fr") + else "*Standard Dave Footer:* Ce document est destiné au seul destinataire. Si vous n’êtes pas le destinataire, veuillez le supprimer et oublier que vous l’avez vu. P.S. Veuillez considérer l’environnement avant d’imprimer ce document.", + ] + ) + + return "\n".join(out).strip() + "\n" + + +def _infer_vertical_line(*, normalized_text: str, source_basename: str, locale: str) -> str | None: + text = f"{source_basename}\n{normalized_text}".lower() + if "ai code guardrails" in text or ("guardrails" in text and "code" in text): + if locale.lower().startswith("fr"): + return "> Vertical : déploiement DevSecOps — vents contraires : dérive de politique et exceptions qui deviennent la norme." + return "> Vertical: DevSecOps rollout — headwinds: policy drift and exception creep." + if "owasp" in text and "llm" in text: + if locale.lower().startswith("fr"): + return "> Vertical : application LLM — vents contraires : injection de prompts et dette de contrôle." + return "> Vertical: LLM application security — headwinds: prompt injection and control debt." + return None + + +def _generate_dave_v1_3_mirror(*, source_text: str, source_path: str, action_pack: bool, locale: str) -> str: + today = _dt.date.today().isoformat() + normalized = _normalize_ocr(source_text) + extract_sha = _sha256_text(normalized) + source_file_sha = _sha256_file(source_path) if Path(source_path).exists() else "unknown" + ctx = _RenderContext(seed=extract_sha, locale=locale) + + action_pack = bool(action_pack) or _truthy_env("REVOICE_ACTION_PACK") + + sections = _extract_sections(normalized) + if not sections: + raise ValueError("No content extracted from source") + + cover_lines = [ln.strip() for ln in sections[0].body.splitlines() if ln.strip() and ln.strip().lower() != "snyk"] + cover_h1 = sections[0].title.strip() or ("DOSSIER DE L’OMBRE" if locale.lower().startswith("fr") else "SHADOW DOSSIER") + cover_h2 = " ".join(cover_lines[:2]).strip() if cover_lines else "" + + y, m, d = today.split("-") + report_id = f"IF-RT-DAVE-{y}-{m}{d}" + source_basename = Path(source_path).name + project_slug = _slugify(Path(source_basename).stem + "-mirror") + source_slug = _slugify(source_basename) + filename_title = Path(source_basename).stem.replace("-", " ").replace("_", " ").strip() + if not filename_title: + filename_title = source_basename + + if ( + not cover_h1 + or cover_h1.upper() == "COUVERTURE" + or _looks_like_site_footer(cover_h1) + or len(cover_h1) > 96 + or "." in cover_h1 + ): + cover_h1 = filename_title + + vertical_line = _infer_vertical_line(normalized_text=normalized, source_basename=source_basename, locale=locale) + + out: list[str] = [ + "---", + "BRAND: InfraFabric.io", + "UNIT: RED TEAM (STRATEGIC OPS)" if not locale.lower().startswith("fr") else "UNIT: RED TEAM (OPÉRATIONS STRATÉGIQUES)", + "DOCUMENT: SHADOW DOSSIER" if not locale.lower().startswith("fr") else "DOCUMENT: DOSSIER DE L’OMBRE", + "CLASSIFICATION: EYES ONLY // DAVE" if not locale.lower().startswith("fr") else "CLASSIFICATION: CONFIDENTIEL // DAVE", + "---", + "", + "# [ RED TEAM DECLASSIFIED ]" if not locale.lower().startswith("fr") else "# [ DÉCLASSIFIÉ – ÉQUIPE ROUGE ]", + f"## PROJECT: {project_slug}" if not locale.lower().startswith("fr") else f"## PROJET : {project_slug}", + f"### SOURCE: {source_slug}" if not locale.lower().startswith("fr") else f"### SOURCE : {source_slug}", + f"**INFRAFABRIC REPORT ID:** `{report_id}`" if not locale.lower().startswith("fr") else f"**ID DE RAPPORT INFRAFABRIC :** `{report_id}`", + "", + "> NOTICE: This document is a product of InfraFabric Red Team." + if not locale.lower().startswith("fr") + else "> AVIS : ce document est un produit de l’InfraFabric Red Team.", + "> It provides socio-technical friction analysis for how a rollout survives contact with incentives." + if not locale.lower().startswith("fr") + else "> Il fournit une analyse socio-technique des frictions : ce qui survit au contact des incitations.", + ] + if vertical_line: + out.extend([vertical_line]) + out.extend( + [ + "", + "**[ ACCESS GRANTED: INFRAFABRIC RED TEAM ]**" + if not locale.lower().startswith("fr") + else "**[ ACCÈS AUTORISÉ : INFRAFABRIC ÉQUIPE ROUGE ]**", + "**[ STATUS: OPERATIONAL REALISM ]**" + if not locale.lower().startswith("fr") + else "**[ STATUT : RÉALISME OPÉRATIONNEL ]**", + "", + f"## {cover_h1}", + ] + ) + if cover_h2: + out.extend([f"### {cover_h2}", ""]) + else: + out.append("") + + out.extend( + [ + "> Shadow dossier (mirror-first)." if not locale.lower().startswith("fr") else "> Dossier de l’ombre (miroir d’abord).", + ">", + "> Protocol: IF.DAVE.v1.3" if not locale.lower().startswith("fr") else "> Protocole : IF.DAVE.v1.3", + "> Citation: `if://bible/dave/v1.3`" + if not locale.lower().startswith("fr") + else "> Citation : `if://bible/dave/fr/v1.3`", + f"> Source: `{source_basename}`" if not locale.lower().startswith("fr") else f"> Source : `{source_basename}`", + f"> Generated: `{today}`" if not locale.lower().startswith("fr") else f"> Généré le : `{today}`", + f"> Source Hash (sha256): `{source_file_sha}`" + if not locale.lower().startswith("fr") + else f"> Empreinte source (sha256) : `{source_file_sha}`", + "", + ] + ) + + for section in sections[1:]: + if section.title.strip().upper() == "INTRODUCTION": + out.append(_render_intro(section, ctx=ctx)) + else: + out.append(_render_section(section, ctx=ctx)) + out.append("") + + if action_pack: + out.append(_render_action_pack(sections[1:])) + out.append("") + + out.extend( + [ + "---", + "", + "*InfraFabric Red Team Footer:* **RED-TEAM Shadow Dossiers** for socio-technical friction analysis: https://infrafabric.io" + if not locale.lower().startswith("fr") + else "*InfraFabric Red Team Footer:* **RED-TEAM Shadow Dossiers** (analyse socio-technique des frictions) : https://infrafabric.io", + "*Standard Dave Footer:* This document is intended for the recipient only. If you are not the recipient, please delete it and forget you saw anything. P.S. Please consider the environment before printing this email." + if not locale.lower().startswith("fr") + else "*Standard Dave Footer:* Ce document est destiné au seul destinataire. Si vous n’êtes pas le destinataire, veuillez le supprimer et oublier que vous l’avez vu. P.S. Veuillez considérer l’environnement avant d’imprimer ce document.", ] ) diff --git a/src/revoice/lint.py b/src/revoice/lint.py index 24bcc85..5380f59 100644 --- a/src/revoice/lint.py +++ b/src/revoice/lint.py @@ -1,5 +1,7 @@ from __future__ import annotations +import hashlib +import json import re @@ -12,30 +14,62 @@ _EMOJI_RE = re.compile( def lint_markdown(*, style_id: str, markdown: str) -> list[str]: - require_mermaid = style_id.lower() in {"if.dave.v1.2", "dave", "if://bible/dave/v1.2"} + require_mermaid = style_id.lower() in { + "if.dave.v1.2", + "if.dave.v1.3", + "if.dave.fr.v1.2", + "if.dave.fr.v1.3", + "dave", + "if://bible/dave/v1.2", + "if://bible/dave/v1.3", + "if://bible/dave/fr/v1.2", + "if://bible/dave/fr/v1.3", + } if style_id.lower() in { "if.dave.v1", "if.dave.v1.1", "if.dave.v1.2", + "if.dave.v1.3", + "if.dave.fr.v1.2", + "if.dave.fr.v1.3", "dave", "if://bible/dave/v1.0", "if://bible/dave/v1.1", "if://bible/dave/v1.2", + "if://bible/dave/v1.3", + "if://bible/dave/fr/v1.2", + "if://bible/dave/fr/v1.3", }: return _lint_dave(markdown, source_text=None, require_mermaid=require_mermaid) return [f"Unknown style id: {style_id}"] def lint_markdown_with_source(*, style_id: str, markdown: str, source_text: str) -> list[str]: - require_mermaid = style_id.lower() in {"if.dave.v1.2", "dave", "if://bible/dave/v1.2"} + require_mermaid = style_id.lower() in { + "if.dave.v1.2", + "if.dave.v1.3", + "if.dave.fr.v1.2", + "if.dave.fr.v1.3", + "dave", + "if://bible/dave/v1.2", + "if://bible/dave/v1.3", + "if://bible/dave/fr/v1.2", + "if://bible/dave/fr/v1.3", + } if style_id.lower() in { "if.dave.v1", "if.dave.v1.1", "if.dave.v1.2", + "if.dave.v1.3", + "if.dave.fr.v1.2", + "if.dave.fr.v1.3", "dave", "if://bible/dave/v1.0", "if://bible/dave/v1.1", "if://bible/dave/v1.2", + "if://bible/dave/v1.3", + "if://bible/dave/fr/v1.2", + "if://bible/dave/fr/v1.3", }: return _lint_dave(markdown, source_text=source_text, require_mermaid=require_mermaid) return [f"Unknown style id: {style_id}"] @@ -44,6 +78,9 @@ def lint_markdown_with_source(*, style_id: str, markdown: str, source_text: str) def _lint_dave(md: str, *, source_text: str | None, require_mermaid: bool) -> list[str]: issues: list[str] = [] + if "InfraFabric Red Team Footer" not in md: + issues.append("Missing required footer: InfraFabric Red Team Footer") + if "Standard Dave Footer" not in md: issues.append("Missing required footer: Standard Dave Footer") @@ -58,4 +95,111 @@ def _lint_dave(md: str, *, source_text: str | None, require_mermaid: bool) -> li if require_mermaid and "```mermaid" not in md: issues.append("Missing required Mermaid diagram") + issues.extend(_lint_duplicate_mermaid(md)) + issues.extend(_lint_duplicate_dave_factor(md)) + issues.extend(_lint_json_blocks(md)) + issues.extend(_lint_repeated_lines(md)) + + return issues + + +def _stable_hash(text: str) -> str: + return hashlib.sha256(text.encode("utf-8", errors="replace")).hexdigest() + + +def _lint_duplicate_mermaid(md: str) -> list[str]: + blocks = re.findall(r"```mermaid\s*([\s\S]*?)```", md, flags=re.MULTILINE) + if len(blocks) <= 1: + return [] + + counts: dict[str, int] = {} + for block in blocks: + normalized = "\n".join([ln.rstrip() for ln in str(block).strip().splitlines() if ln.strip()]) + if not normalized: + continue + h = _stable_hash(normalized) + counts[h] = counts.get(h, 0) + 1 + + issues: list[str] = [] + for h, n in sorted(counts.items(), key=lambda kv: (-kv[1], kv[0])): + if n > 1: + issues.append(f"Duplicate Mermaid diagram appears {n} times (sha256:{h[:12]})") + return issues + + +def _lint_duplicate_dave_factor(md: str) -> list[str]: + lines = md.splitlines() + blocks: list[str] = [] + i = 0 + while i < len(lines): + line = lines[i].rstrip() + if line.startswith("> **The Dave Factor:**"): + block_lines = [line.strip()] + if i + 1 < len(lines) and lines[i + 1].rstrip().startswith("> **Countermeasure:**"): + block_lines.append(lines[i + 1].strip()) + i += 1 + blocks.append("\n".join(block_lines)) + i += 1 + + if len(blocks) <= 1: + return [] + + counts: dict[str, int] = {} + for block in blocks: + h = _stable_hash(block.strip()) + counts[h] = counts.get(h, 0) + 1 + + issues: list[str] = [] + for h, n in sorted(counts.items(), key=lambda kv: (-kv[1], kv[0])): + if n > 1: + issues.append(f"Duplicate Dave Factor callout appears {n} times (sha256:{h[:12]})") + return issues + + +def _lint_json_blocks(md: str) -> list[str]: + blocks = re.findall(r"```json\s*([\s\S]*?)```", md, flags=re.MULTILINE) + issues: list[str] = [] + for idx, raw in enumerate(blocks, 1): + text = str(raw).strip() + if not text: + continue + try: + json.loads(text) + except Exception as e: + issues.append(f"Invalid JSON code block #{idx}: {e}") + return issues + + +def _lint_repeated_lines(md: str) -> list[str]: + lines = md.splitlines() + counts: dict[str, int] = {} + + in_fence = False + fence = None + for ln in lines: + stripped = ln.strip() + if stripped.startswith("```"): + if not in_fence: + in_fence = True + fence = stripped + else: + in_fence = False + fence = None + continue + if in_fence: + continue + if not stripped: + continue + if stripped.startswith("#"): + continue + if stripped.startswith(">"): + continue + if len(stripped) < 18: + continue + counts[stripped] = counts.get(stripped, 0) + 1 + + issues: list[str] = [] + for line, n in sorted(counts.items(), key=lambda kv: (-kv[1], kv[0])): + if n >= 3: + issues.append(f"Repeated line appears {n} times: {line[:120]}{'…' if len(line) > 120 else ''}") return issues