#!/usr/bin/env python3 """ Build an IF.emotion "evidence bundle" tarball for a trace ID. This is an operator tool that runs on the Proxmox host and pulls artifacts from: - pct 220 (emo-social / if.emotion backend) - pct 240 (IF.TTT registry) Outputs: /root/tmp/emo-trace-package-/ payload/... emo_trace_payload_.tar.gz payload_tar_sha256.txt ttt_tarball_audit_entry.json ttt_tarball_chain_record.json ttt_tarball_chain_ref.json """ from __future__ import annotations import argparse import hashlib import json import os import subprocess import tarfile import textwrap import uuid from datetime import datetime, timezone from pathlib import Path from typing import Any def utc_now_iso() -> str: return datetime.now(timezone.utc).isoformat() def sha256_bytes(data: bytes) -> str: return hashlib.sha256(data or b"").hexdigest() def sha256_file(path: Path) -> str: h = hashlib.sha256() with path.open("rb") as f: for chunk in iter(lambda: f.read(1024 * 1024), b""): h.update(chunk) return h.hexdigest() def canonical_json_bytes(obj: Any) -> bytes: return json.dumps(obj, ensure_ascii=False, sort_keys=True, separators=(",", ":")).encode("utf-8") def merkle_root_hex(leaves_hex: list[str]) -> str: if not leaves_hex: return sha256_bytes(b"") level: list[bytes] = [bytes.fromhex(h) for h in leaves_hex if isinstance(h, str) and len(h) == 64] if not level: return sha256_bytes(b"") while len(level) > 1: if len(level) % 2 == 1: level.append(level[-1]) nxt: list[bytes] = [] for i in range(0, len(level), 2): nxt.append(hashlib.sha256(level[i] + level[i + 1]).digest()) level = nxt return level[0].hex() def merkle_inclusion_proof(leaves_hex: list[str], index: int) -> dict: if index < 0 or index >= len(leaves_hex): raise ValueError("index out of range") level: list[bytes] = [bytes.fromhex(h) for h in leaves_hex] proof: list[dict] = [] idx = index while len(level) > 1: if len(level) % 2 == 1: level.append(level[-1]) sibling_idx = idx ^ 1 sibling = level[sibling_idx] side = "left" if sibling_idx < idx else "right" proof.append({"sibling": sibling.hex(), "side": side}) nxt: list[bytes] = [] for i in range(0, len(level), 2): nxt.append(hashlib.sha256(level[i] + level[i + 1]).digest()) level = nxt idx //= 2 root = level[0].hex() return {"index": index, "root": root, "path": proof} def run(cmd: list[str], *, stdin: bytes | None = None, timeout_s: int = 120) -> bytes: p = subprocess.run( cmd, input=stdin, stdout=subprocess.PIPE, stderr=subprocess.PIPE, timeout=timeout_s, check=False, ) if p.returncode != 0: raise RuntimeError(f"cmd failed ({p.returncode}): {' '.join(cmd)}\n{p.stderr.decode('utf-8',errors='ignore')}") return p.stdout def pct_exec(pct: int, bash_cmd: str, *, stdin: bytes | None = None, timeout_s: int = 120) -> bytes: return run(["pct", "exec", str(pct), "--", "bash", "-lc", bash_cmd], stdin=stdin, timeout_s=timeout_s) def pct_cat(pct: int, path: str, *, timeout_s: int = 120) -> bytes: return pct_exec(pct, f"cat {shlex_quote(path)}", timeout_s=timeout_s) def shlex_quote(s: str) -> str: # Minimal, safe shell quoting for paths. return "'" + (s or "").replace("'", "'\"'\"'") + "'" def write_json(path: Path, obj: Any) -> None: path.write_text(json.dumps(obj, ensure_ascii=False, sort_keys=True, indent=2) + "\n", encoding="utf-8") def write_text(path: Path, text: str) -> None: path.write_text(text, encoding="utf-8") def fetch_api_json(*, trace_id: str, endpoint: str, email: str) -> Any: raw = pct_exec( 220, f"curl -fsSL -H {shlex_quote('X-Auth-Request-Email: ' + email)} http://127.0.0.1:5000{endpoint}", timeout_s=60, ) return json.loads(raw.decode("utf-8", errors="ignore") or "{}") def extract_ttt_signed_record(*, trace_id: str) -> dict: script = textwrap.dedent( f""" python3 - <<'PY' import json tid = {trace_id!r} path = "/opt/if-emotion/data/ttt_signed_log.jsonl" out = None try: with open(path, "r", encoding="utf-8", errors="ignore") as f: for line in f: line = line.strip() if not line: continue try: rec = json.loads(line) except Exception: continue ev = rec.get("event") or {{}} if isinstance(ev, dict) and str(ev.get("trace_id") or "").strip() == tid: out = rec except Exception: out = None print(json.dumps(out or {{}}, ensure_ascii=False, sort_keys=True)) PY """ ).strip() raw = pct_exec(220, script, timeout_s=60) return json.loads(raw.decode("utf-8", errors="ignore") or "{}") def resolve_ttt_records_by_id(record_ids: list[str]) -> list[dict]: payload = json.dumps({"ids": record_ids}, ensure_ascii=False).encode("utf-8") py = """ import json import sys import importlib.util import contextlib req = json.loads(sys.stdin.read() or "{}") ids = req.get("ids") or [] spec = importlib.util.spec_from_file_location("ttt_registry_mod", "/opt/ttt-registry/ttt_registry.py") mod = importlib.util.module_from_spec(spec) with contextlib.redirect_stdout(sys.stderr): spec.loader.exec_module(mod) # type: ignore reg = mod.TTTRegistry() out = [] for rid in ids: rid = str(rid or "").strip() if not rid: continue h = reg.redis.get(f"ttt:index:id:{rid}") if not h: continue try: out.append(reg.get(h)) except Exception: continue print(json.dumps(out, ensure_ascii=False, sort_keys=True)) """.strip() raw = pct_exec( 240, f"OQS_INSTALL_PATH=/opt/ttt-registry/_oqs /opt/ttt-registry/venv/bin/python -c {shlex_quote(py)}", stdin=payload, timeout_s=180, ) try: data = json.loads(raw.decode("utf-8", errors="ignore") or "[]") except Exception: return [] return data if isinstance(data, list) else [data] def write_audit_entries(entries: list[dict]) -> None: payload = json.dumps({"entries": entries}, ensure_ascii=False).encode("utf-8") py = """ import json import sys import re import uuid import redis from pathlib import Path req = json.loads(sys.stdin.read() or "{}") entries = req.get("entries") or [] cfg = Path("/etc/redis/ttt.conf").read_text(encoding="utf-8", errors="ignore") m = re.search(r"^requirepass\\s+(\\S+)", cfg, flags=re.M) password = m.group(1) if m else None r = redis.Redis(host="localhost", port=6380, password=password, decode_responses=True) written = 0 for e in entries: cid = str(e.get("citation_id") or "").strip() parts = cid.split("/") uid = (parts[3] if len(parts) > 3 else "").strip() try: uuid.UUID(uid) except Exception: continue r.set(f"audit:entry:{uid}", json.dumps(e, ensure_ascii=False, sort_keys=True)) written += 1 print(json.dumps({"ok": True, "written": written}, ensure_ascii=False)) """.strip() _ = pct_exec(240, f"/opt/ttt-registry/venv/bin/python -c {shlex_quote(py)}", stdin=payload, timeout_s=60) def ttt_import_audit() -> dict: raw = pct_exec( 240, "OQS_INSTALL_PATH=/opt/ttt-registry/_oqs /opt/ttt-registry/venv/bin/python /opt/ttt-registry/ttt_registry.py import-audit", timeout_s=300, ) txt = raw.decode("utf-8", errors="ignore") or "" # The registry prints capability banners before JSON; best-effort parse the last JSON object. for chunk in reversed([c.strip() for c in txt.splitlines() if c.strip()]): if chunk.startswith("{") and chunk.endswith("}"): try: return json.loads(chunk) except Exception: break return {"raw": txt.strip()} def build_story(trace_id: str, events: list[dict]) -> str: lines = [ "# IF.story — contextual narrative log", "", f"Trace: `{trace_id}`", "", "Deterministic narrative projection of `trace_events.jsonl`. Each line includes the `event_hash` anchor.", "", ] for ev in sorted(events, key=lambda e: int(e.get("idx") or 0)): ts = str(ev.get("ts_utc") or "") et = str(ev.get("type") or "") mono_ms = int(ev.get("mono_ms") or 0) data = ev.get("data") if isinstance(ev.get("data"), dict) else {} h = str(ev.get("event_hash") or "") summary = "" if et == "request_commit": summary = f"Request body commitment; commit_ok={bool(data.get('commit_ok'))} client_trace_id={data.get('client_trace_id') or ''}".strip() elif et == "req_seen": summary = f"REQ_SEEN witnessed; hour={data.get('hour_utc')} count={data.get('count')} merkle_root={data.get('merkle_root')}" elif et == "request_received": summary = f"Auth+quota succeeded; provider={data.get('provider')} model={data.get('requested_model')} stream={data.get('stream')} user_len={data.get('user_len')} auth_ms={data.get('auth_ms')}" elif et == "guard_short_circuit": summary = f"IF.GUARD short-circuit; reasons={data.get('reasons')}" elif et == "trace_finalizing": summary = f"Trace finalizing; ok={data.get('ok')} provider={data.get('provider')}" else: # generic keys = list(data.keys())[:6] if isinstance(data, dict) else [] summary = f"Event data keys={keys}" lines.append(f"- {ts} (+{mono_ms}ms) | `{et}` | {summary} | event_hash={h}") lines += [ "", "Notes:", "- Ground truth remains `trace_events.jsonl` + `ttt_signed_record.json`.", "- REQ_SEEN ledger+head are included; public key is `trace_ed25519.pub`.", "", ] return "\n".join(lines) def build_manifest(payload_dir: Path) -> tuple[dict, dict[str, str]]: sha_map: dict[str, str] = {} files = [] for p in sorted(payload_dir.iterdir(), key=lambda x: x.name): if not p.is_file(): continue data = p.read_bytes() sha = sha256_bytes(data) sha_map[p.name] = sha files.append({"path": p.name, "bytes": len(data), "sha256": sha}) manifest = {"files": files} return manifest, sha_map def write_sha256s(payload_dir: Path, sha_map: dict[str, str]) -> None: lines = [] for name in sorted(sha_map.keys()): lines.append(f"{sha_map[name]} {name}") (payload_dir / "sha256s.txt").write_text("\n".join(lines) + "\n", encoding="utf-8") def tar_payload(workdir: Path, trace_id: str) -> Path: tar_path = workdir / f"emo_trace_payload_{trace_id}.tar.gz" with tarfile.open(tar_path, "w:gz") as tf: tf.add(workdir / "payload", arcname="payload") return tar_path def main() -> int: ap = argparse.ArgumentParser() ap.add_argument("trace_id", help="Trace ID to package") ap.add_argument("--email", default="ds@infrafabric.io", help="Trusted email for owner-gated endpoints") ap.add_argument("--headers", default="", help="Path to captured HTTP response headers (optional)") ap.add_argument("--response", default="", help="Path to captured HTTP response body (optional)") ap.add_argument("--api-payload", default="", help="Path to captured request JSON (optional)") ap.add_argument("--out-dir", default="", help="Output directory (default: /root/tmp/emo-trace-package-)") args = ap.parse_args() trace_id = str(args.trace_id).strip() if not trace_id: raise SystemExit("trace_id required") out_dir = Path(args.out_dir or f"/root/tmp/emo-trace-package-{trace_id}").resolve() payload_dir = out_dir / "payload" payload_dir.mkdir(parents=True, exist_ok=True) # Captured request/response artifacts (optional). if args.headers: write_text(payload_dir / "headers.txt", Path(args.headers).read_text(encoding="utf-8", errors="ignore")) if args.response: # Ensure JSON is stable. raw = Path(args.response).read_text(encoding="utf-8", errors="ignore") try: obj = json.loads(raw) write_json(payload_dir / "response.json", obj) except Exception: write_text(payload_dir / "response.json", raw) if args.api_payload: raw = Path(args.api_payload).read_text(encoding="utf-8", errors="ignore") try: obj = json.loads(raw) write_json(payload_dir / "api_payload.json", obj) except Exception: write_text(payload_dir / "api_payload.json", raw) # API snapshots (owner-gated). api_trace = fetch_api_json(trace_id=trace_id, endpoint=f"/api/trace/{trace_id}", email=args.email) write_json(payload_dir / "api_trace.json", api_trace) api_events = fetch_api_json(trace_id=trace_id, endpoint=f"/api/trace/events/{trace_id}?limit=10000", email=args.email) write_json(payload_dir / "api_events.json", api_events) # Signed record from append-only log (ground truth). ttt_rec = extract_ttt_signed_record(trace_id=trace_id) if not ttt_rec: raise SystemExit("ttt_signed_record not found for trace_id") write_json(payload_dir / "ttt_signed_record.json", ttt_rec) # Raw trace payload (ground truth). payload_path = f"/opt/if-emotion/data/trace_payloads/{trace_id}.json" trace_payload_raw = pct_exec(220, f"cat {shlex_quote(payload_path)}", timeout_s=60) payload_dir.joinpath("trace_payload.json").write_bytes(trace_payload_raw) # Trace events (canonical JSONL form). events = api_events.get("events") if isinstance(api_events, dict) else None if not isinstance(events, list): raise SystemExit("api_events missing events[]") trace_events_lines = [] for ev in sorted((e for e in events if isinstance(e, dict)), key=lambda e: int(e.get("idx") or 0)): trace_events_lines.append(json.dumps({"event": ev}, ensure_ascii=False, sort_keys=True)) write_text(payload_dir / "trace_events.jsonl", "\n".join(trace_events_lines) + "\n") # Story projection. write_text(payload_dir / "if_story.md", build_story(trace_id, [e for e in events if isinstance(e, dict)])) # Trace public key (Ed25519). pub = pct_exec(220, "cat /opt/if-emotion/data/trace_ed25519.pub", timeout_s=30) payload_dir.joinpath("trace_ed25519.pub").write_bytes(pub) # REQ_SEEN hour ledger+head, derived from the trace events. req_seen_ev = next((e for e in events if isinstance(e, dict) and e.get("type") == "req_seen"), None) if not isinstance(req_seen_ev, dict): raise SystemExit("trace has no req_seen event; cannot build completeness proof") hour = str((req_seen_ev.get("data") or {}).get("hour_utc") or "").strip() if not hour: raise SystemExit("req_seen event missing hour_utc") ledger_path = f"/opt/if-emotion/data/req_seen/{hour}.jsonl" head_path = f"/opt/if-emotion/data/req_seen/heads/{hour}.json" ledger_bytes = pct_exec(220, f"cat {shlex_quote(ledger_path)}", timeout_s=30) head_bytes = pct_exec(220, f"cat {shlex_quote(head_path)}", timeout_s=30) payload_dir.joinpath(f"req_seen_{hour}.jsonl").write_bytes(ledger_bytes) payload_dir.joinpath(f"req_seen_head_{hour}.json").write_bytes(head_bytes) # Inclusion proof for this trace_id in the hour ledger. leaves: list[str] = [] idx_for_trace: int | None = None leaf_for_trace: str = "" for raw_line in ledger_bytes.splitlines(): if not raw_line.strip(): continue try: entry = json.loads(raw_line.decode("utf-8", errors="ignore")) except Exception: continue lh = str(entry.get("leaf_hash") or "").strip() if len(lh) != 64: continue leaves.append(lh) if idx_for_trace is None and str(entry.get("trace_id") or "").strip() == trace_id: idx_for_trace = len(leaves) - 1 leaf_for_trace = lh if idx_for_trace is None: raise SystemExit("trace_id not found in REQ_SEEN hour ledger") proof = merkle_inclusion_proof(leaves, idx_for_trace) proof["leaf_hash"] = leaf_for_trace proof["hour_utc"] = hour # Sanity: root must match head's merkle_root. head_obj = json.loads(head_bytes.decode("utf-8", errors="ignore") or "{}") if str(head_obj.get("merkle_root") or "") and proof["root"] != str(head_obj.get("merkle_root") or ""): raise SystemExit("Merkle root mismatch (ledger != head)") write_json(payload_dir / "req_seen_inclusion_proof.json", proof) # Manifest + sha list. manifest, sha_map = build_manifest(payload_dir) write_json(payload_dir / "manifest.json", manifest) write_sha256s(payload_dir, sha_map) # Register child artifacts in IF.TTT (audit:entry -> import-audit -> signed records). child_paths = [ "headers.txt", "response.json", "trace_payload.json", "trace_events.jsonl", "ttt_signed_record.json", "api_trace.json", "api_events.json", "api_payload.json", "if_story.md", "trace_ed25519.pub", f"req_seen_{hour}.jsonl", f"req_seen_head_{hour}.json", "req_seen_inclusion_proof.json", ] children_pre = [] audit_entries = [] created_utc = utc_now_iso() for name in child_paths: p = payload_dir / name if not p.exists(): continue cid_uuid = str(uuid.uuid4()) citation_id = f"if://citation/{cid_uuid}/v1" sha = sha256_file(p) rel_path = f"payload/{name}" children_pre.append({"citation_id": citation_id, "rel_path": rel_path, "sha256": sha}) audit_entries.append( { "citation_id": citation_id, "claim": f"emo-social trace artifact {name} for trace_id={trace_id}", "source_filename": rel_path, "source_sha256": sha, "verification_status": "source-sha256", "ingested_at": created_utc, } ) write_json(payload_dir / "ttt_children_pre.json", {"trace_id": trace_id, "created_utc": created_utc, "children": children_pre}) write_audit_entries(audit_entries) _ = ttt_import_audit() # Resolve signed IF.TTT records for the children. child_ids = [c["citation_id"] for c in children_pre] chain_records = resolve_ttt_records_by_id(child_ids) write_json(payload_dir / "ttt_children_chain_records.json", chain_records) # Minimal index for the bundle. rec_by_id = {r.get("id"): r for r in chain_records if isinstance(r, dict) and r.get("id")} children = [] for c in children_pre: rid = c["citation_id"] rec = rec_by_id.get(rid) or {} children.append( { "citation_id": rid, "rel_path": c["rel_path"], "sha256": c["sha256"], "content_hash": rec.get("content_hash"), "pq_status": rec.get("pq_status"), } ) write_json(payload_dir / "ttt_children.json", {"trace_id": trace_id, "children": children}) # Build tarball and register it in IF.TTT. tar_path = tar_payload(out_dir, trace_id) tar_sha = sha256_file(tar_path) write_text(out_dir / "payload_tar_sha256.txt", f"{tar_sha} {tar_path}\n") tar_uuid = str(uuid.uuid4()) tar_citation_id = f"if://citation/{tar_uuid}/v1" tar_audit_entry = { "citation_id": tar_citation_id, "claim": f"emo-social trace payload tarball (bundle) for trace_id={trace_id}", "source_filename": tar_path.name, "source_sha256": tar_sha, "verification_status": "source-sha256", "ingested_at": utc_now_iso(), "source_path": str(tar_path), } write_json(out_dir / "ttt_tarball_audit_entry.json", tar_audit_entry) write_audit_entries([tar_audit_entry]) _ = ttt_import_audit() tar_chain = resolve_ttt_records_by_id([tar_citation_id]) if not tar_chain: raise SystemExit("Failed to resolve tarball chain record from IF.TTT") tar_rec = tar_chain[0] write_json(out_dir / "ttt_tarball_chain_record.json", tar_rec) write_json(out_dir / "ttt_tarball_chain_ref.json", {"citation_id": tar_citation_id, "content_hash": tar_rec.get("content_hash")}) print(str(out_dir)) return 0 if __name__ == "__main__": raise SystemExit(main())