hosted/emo_trace_pack.py
#!/usr/bin/env python3
"""
Build an IF.emotion "evidence bundle" tarball for a trace ID.

This is an operator tool that runs on the Proxmox host and pulls artifacts from:
  - pct 220 (emo-social / if.emotion backend)
  - pct 240 (IF.TTT registry)

Outputs:
  /root/tmp/emo-trace-package-<trace_id>/
    payload/...
    emo_trace_payload_<trace_id>.tar.gz
    payload_tar_sha256.txt
    ttt_tarball_audit_entry.json
    ttt_tarball_chain_record.json
    ttt_tarball_chain_ref.json
"""
from __future__ import annotations
import argparse
import hashlib
import json
import os
import subprocess
import tarfile
import textwrap
import uuid
from datetime import datetime, timezone
from pathlib import Path
from typing import Any
def utc_now_iso() -> str:
    return datetime.now(timezone.utc).isoformat()


def sha256_bytes(data: bytes) -> str:
    return hashlib.sha256(data or b"").hexdigest()


def sha256_file(path: Path) -> str:
    h = hashlib.sha256()
    with path.open("rb") as f:
        for chunk in iter(lambda: f.read(1024 * 1024), b""):
            h.update(chunk)
    return h.hexdigest()


def canonical_json_bytes(obj: Any) -> bytes:
    return json.dumps(obj, ensure_ascii=False, sort_keys=True, separators=(",", ":")).encode("utf-8")
def merkle_root_hex(leaves_hex: list[str]) -> str:
    # Binary Merkle tree over 32-byte leaves; an odd level duplicates its last
    # node. An empty tree hashes to sha256 of the empty string.
    if not leaves_hex:
        return sha256_bytes(b"")
    level: list[bytes] = [bytes.fromhex(h) for h in leaves_hex if isinstance(h, str) and len(h) == 64]
    if not level:
        return sha256_bytes(b"")
    while len(level) > 1:
        if len(level) % 2 == 1:
            level.append(level[-1])
        nxt: list[bytes] = []
        for i in range(0, len(level), 2):
            nxt.append(hashlib.sha256(level[i] + level[i + 1]).digest())
        level = nxt
    return level[0].hex()
def merkle_inclusion_proof(leaves_hex: list[str], index: int) -> dict:
    # Produce the sibling path from leaf `index` to the root; `side` records
    # which side of the concatenation the sibling sits on.
    if index < 0 or index >= len(leaves_hex):
        raise ValueError("index out of range")
    level: list[bytes] = [bytes.fromhex(h) for h in leaves_hex]
    proof: list[dict] = []
    idx = index
    while len(level) > 1:
        if len(level) % 2 == 1:
            level.append(level[-1])
        sibling_idx = idx ^ 1
        sibling = level[sibling_idx]
        side = "left" if sibling_idx < idx else "right"
        proof.append({"sibling": sibling.hex(), "side": side})
        nxt: list[bytes] = []
        for i in range(0, len(level), 2):
            nxt.append(hashlib.sha256(level[i] + level[i + 1]).digest())
        level = nxt
        idx //= 2
    root = level[0].hex()
    return {"index": index, "root": root, "path": proof}
def run(cmd: list[str], *, stdin: bytes | None = None, timeout_s: int = 120) -> bytes:
    p = subprocess.run(
        cmd,
        input=stdin,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        timeout=timeout_s,
        check=False,
    )
    if p.returncode != 0:
        raise RuntimeError(f"cmd failed ({p.returncode}): {' '.join(cmd)}\n{p.stderr.decode('utf-8', errors='ignore')}")
    return p.stdout


def pct_exec(pct: int, bash_cmd: str, *, stdin: bytes | None = None, timeout_s: int = 120) -> bytes:
    return run(["pct", "exec", str(pct), "--", "bash", "-lc", bash_cmd], stdin=stdin, timeout_s=timeout_s)


def shlex_quote(s: str) -> str:
    # Minimal, safe shell quoting for paths.
    return "'" + (s or "").replace("'", "'\"'\"'") + "'"


def pct_cat(pct: int, path: str, *, timeout_s: int = 120) -> bytes:
    return pct_exec(pct, f"cat {shlex_quote(path)}", timeout_s=timeout_s)
def write_json(path: Path, obj: Any) -> None:
    path.write_text(json.dumps(obj, ensure_ascii=False, sort_keys=True, indent=2) + "\n", encoding="utf-8")


def write_text(path: Path, text: str) -> None:
    path.write_text(text, encoding="utf-8")


def fetch_api_json(*, trace_id: str, endpoint: str, email: str) -> Any:
    # Owner-gated API snapshot fetched from inside pct 220. The URL is quoted
    # so that query strings (e.g. ?limit=...) are not glob-expanded by bash.
    raw = pct_exec(
        220,
        f"curl -fsSL -H {shlex_quote('X-Auth-Request-Email: ' + email)} {shlex_quote('http://127.0.0.1:5000' + endpoint)}",
        timeout_s=60,
    )
    return json.loads(raw.decode("utf-8", errors="ignore") or "{}")
def extract_ttt_signed_record(*, trace_id: str) -> dict:
    # Scan the append-only signed log inside pct 220 for the record matching
    # this trace_id; later lines win, so the newest record is returned.
    script = textwrap.dedent(
        f"""
        python3 - <<'PY'
        import json
        tid = {trace_id!r}
        path = "/opt/if-emotion/data/ttt_signed_log.jsonl"
        out = None
        try:
            with open(path, "r", encoding="utf-8", errors="ignore") as f:
                for line in f:
                    line = line.strip()
                    if not line:
                        continue
                    try:
                        rec = json.loads(line)
                    except Exception:
                        continue
                    ev = rec.get("event") or {{}}
                    if isinstance(ev, dict) and str(ev.get("trace_id") or "").strip() == tid:
                        out = rec
        except Exception:
            out = None
        print(json.dumps(out or {{}}, ensure_ascii=False, sort_keys=True))
        PY
        """
    ).strip()
    raw = pct_exec(220, script, timeout_s=60)
    return json.loads(raw.decode("utf-8", errors="ignore") or "{}")
def resolve_ttt_records_by_id(record_ids: list[str]) -> list[dict]:
    # Resolve signed IF.TTT chain records inside pct 240 by citation id,
    # importing the registry module directly from its install path. The
    # embedded script stays flush-left so it is valid at module level.
    payload = json.dumps({"ids": record_ids}, ensure_ascii=False).encode("utf-8")
    py = """
import json
import sys
import importlib.util
import contextlib
req = json.loads(sys.stdin.read() or "{}")
ids = req.get("ids") or []
spec = importlib.util.spec_from_file_location("ttt_registry_mod", "/opt/ttt-registry/ttt_registry.py")
mod = importlib.util.module_from_spec(spec)
with contextlib.redirect_stdout(sys.stderr):
    spec.loader.exec_module(mod)  # type: ignore
reg = mod.TTTRegistry()
out = []
for rid in ids:
    rid = str(rid or "").strip()
    if not rid:
        continue
    h = reg.redis.get(f"ttt:index:id:{rid}")
    if not h:
        continue
    try:
        out.append(reg.get(h))
    except Exception:
        continue
print(json.dumps(out, ensure_ascii=False, sort_keys=True))
""".strip()
    raw = pct_exec(
        240,
        f"OQS_INSTALL_PATH=/opt/ttt-registry/_oqs /opt/ttt-registry/venv/bin/python -c {shlex_quote(py)}",
        stdin=payload,
        timeout_s=180,
    )
    try:
        data = json.loads(raw.decode("utf-8", errors="ignore") or "[]")
    except Exception:
        return []
    return data if isinstance(data, list) else [data]
def write_audit_entries(entries: list[dict]) -> None:
    # Stage audit:entry keys in the registry's Redis (pct 240). Entries whose
    # citation_id does not carry a valid UUID in the fourth path segment
    # (if://citation/<uuid>/v1) are skipped.
    payload = json.dumps({"entries": entries}, ensure_ascii=False).encode("utf-8")
    py = """
import json
import sys
import re
import uuid
import redis
from pathlib import Path
req = json.loads(sys.stdin.read() or "{}")
entries = req.get("entries") or []
cfg = Path("/etc/redis/ttt.conf").read_text(encoding="utf-8", errors="ignore")
m = re.search(r"^requirepass\\s+(\\S+)", cfg, flags=re.M)
password = m.group(1) if m else None
r = redis.Redis(host="localhost", port=6380, password=password, decode_responses=True)
written = 0
for e in entries:
    cid = str(e.get("citation_id") or "").strip()
    parts = cid.split("/")
    uid = (parts[3] if len(parts) > 3 else "").strip()
    try:
        uuid.UUID(uid)
    except Exception:
        continue
    r.set(f"audit:entry:{uid}", json.dumps(e, ensure_ascii=False, sort_keys=True))
    written += 1
print(json.dumps({"ok": True, "written": written}, ensure_ascii=False))
""".strip()
    _ = pct_exec(240, f"/opt/ttt-registry/venv/bin/python -c {shlex_quote(py)}", stdin=payload, timeout_s=60)
def ttt_import_audit() -> dict:
    raw = pct_exec(
        240,
        "OQS_INSTALL_PATH=/opt/ttt-registry/_oqs /opt/ttt-registry/venv/bin/python /opt/ttt-registry/ttt_registry.py import-audit",
        timeout_s=300,
    )
    txt = raw.decode("utf-8", errors="ignore") or ""
    # The registry prints capability banners before JSON; best-effort parse the last JSON object.
    for chunk in reversed([c.strip() for c in txt.splitlines() if c.strip()]):
        if chunk.startswith("{") and chunk.endswith("}"):
            try:
                return json.loads(chunk)
            except Exception:
                break
    return {"raw": txt.strip()}
def build_story(trace_id: str, events: list[dict]) -> str:
    lines = [
        "# IF.story — contextual narrative log",
        "",
        f"Trace: `{trace_id}`",
        "",
        "Deterministic narrative projection of `trace_events.jsonl`. Each line includes the `event_hash` anchor.",
        "",
    ]
    for ev in sorted(events, key=lambda e: int(e.get("idx") or 0)):
        ts = str(ev.get("ts_utc") or "")
        et = str(ev.get("type") or "")
        mono_ms = int(ev.get("mono_ms") or 0)
        data = ev.get("data") if isinstance(ev.get("data"), dict) else {}
        h = str(ev.get("event_hash") or "")
        summary = ""
        if et == "request_commit":
            summary = f"Request body commitment; commit_ok={bool(data.get('commit_ok'))} client_trace_id={data.get('client_trace_id') or ''}".strip()
        elif et == "req_seen":
            summary = f"REQ_SEEN witnessed; hour={data.get('hour_utc')} count={data.get('count')} merkle_root={data.get('merkle_root')}"
        elif et == "request_received":
            summary = f"Auth+quota succeeded; provider={data.get('provider')} model={data.get('requested_model')} stream={data.get('stream')} user_len={data.get('user_len')} auth_ms={data.get('auth_ms')}"
        elif et == "guard_short_circuit":
            summary = f"IF.GUARD short-circuit; reasons={data.get('reasons')}"
        elif et == "trace_finalizing":
            summary = f"Trace finalizing; ok={data.get('ok')} provider={data.get('provider')}"
        else:
            # Generic fallback: show the first few data keys.
            keys = list(data.keys())[:6] if isinstance(data, dict) else []
            summary = f"Event data keys={keys}"
        lines.append(f"- {ts} (+{mono_ms}ms) | `{et}` | {summary} | event_hash={h}")
    lines += [
        "",
        "Notes:",
        "- Ground truth remains `trace_events.jsonl` + `ttt_signed_record.json`.",
        "- REQ_SEEN ledger+head are included; public key is `trace_ed25519.pub`.",
        "",
    ]
    return "\n".join(lines)
def build_manifest(payload_dir: Path) -> tuple[dict, dict[str, str]]:
    sha_map: dict[str, str] = {}
    files = []
    for p in sorted(payload_dir.iterdir(), key=lambda x: x.name):
        if not p.is_file():
            continue
        data = p.read_bytes()
        sha = sha256_bytes(data)
        sha_map[p.name] = sha
        files.append({"path": p.name, "bytes": len(data), "sha256": sha})
    manifest = {"files": files}
    return manifest, sha_map


def write_sha256s(payload_dir: Path, sha_map: dict[str, str]) -> None:
    lines = []
    for name in sorted(sha_map.keys()):
        lines.append(f"{sha_map[name]} {name}")
    (payload_dir / "sha256s.txt").write_text("\n".join(lines) + "\n", encoding="utf-8")
def tar_payload(workdir: Path, trace_id: str) -> Path:
    tar_path = workdir / f"emo_trace_payload_{trace_id}.tar.gz"
    with tarfile.open(tar_path, "w:gz") as tf:
        tf.add(workdir / "payload", arcname="payload")
    return tar_path
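# Offline verification sketch (not called by main): after extracting the
# tarball, recompute each file's SHA-256 and compare against the bundled
# sha256s.txt. Assumes filenames without spaces, matching write_sha256s().
def verify_payload_dir(payload_dir: Path) -> bool:
    ok = True
    for line in (payload_dir / "sha256s.txt").read_text(encoding="utf-8").splitlines():
        if not line.strip():
            continue
        sha, name = line.split(None, 1)
        if sha256_file(payload_dir / name.strip()) != sha:
            ok = False
    return ok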
def main() -> int:
    ap = argparse.ArgumentParser()
    ap.add_argument("trace_id", help="Trace ID to package")
    ap.add_argument("--email", default="ds@infrafabric.io", help="Trusted email for owner-gated endpoints")
    ap.add_argument("--headers", default="", help="Path to captured HTTP response headers (optional)")
    ap.add_argument("--response", default="", help="Path to captured HTTP response body (optional)")
    ap.add_argument("--api-payload", default="", help="Path to captured request JSON (optional)")
    ap.add_argument("--out-dir", default="", help="Output directory (default: /root/tmp/emo-trace-package-<trace_id>)")
    args = ap.parse_args()

    trace_id = str(args.trace_id).strip()
    if not trace_id:
        raise SystemExit("trace_id required")
    out_dir = Path(args.out_dir or f"/root/tmp/emo-trace-package-{trace_id}").resolve()
    payload_dir = out_dir / "payload"
    payload_dir.mkdir(parents=True, exist_ok=True)

    # Captured request/response artifacts (optional).
    if args.headers:
        write_text(payload_dir / "headers.txt", Path(args.headers).read_text(encoding="utf-8", errors="ignore"))
    if args.response:
        # Ensure JSON is stable.
        raw = Path(args.response).read_text(encoding="utf-8", errors="ignore")
        try:
            obj = json.loads(raw)
            write_json(payload_dir / "response.json", obj)
        except Exception:
            write_text(payload_dir / "response.json", raw)
    if args.api_payload:
        raw = Path(args.api_payload).read_text(encoding="utf-8", errors="ignore")
        try:
            obj = json.loads(raw)
            write_json(payload_dir / "api_payload.json", obj)
        except Exception:
            write_text(payload_dir / "api_payload.json", raw)

    # API snapshots (owner-gated).
    api_trace = fetch_api_json(trace_id=trace_id, endpoint=f"/api/trace/{trace_id}", email=args.email)
    write_json(payload_dir / "api_trace.json", api_trace)
    api_events = fetch_api_json(trace_id=trace_id, endpoint=f"/api/trace/events/{trace_id}?limit=10000", email=args.email)
    write_json(payload_dir / "api_events.json", api_events)

    # Signed record from append-only log (ground truth).
    ttt_rec = extract_ttt_signed_record(trace_id=trace_id)
    if not ttt_rec:
        raise SystemExit("ttt_signed_record not found for trace_id")
    write_json(payload_dir / "ttt_signed_record.json", ttt_rec)

    # Raw trace payload (ground truth).
    payload_path = f"/opt/if-emotion/data/trace_payloads/{trace_id}.json"
    trace_payload_raw = pct_cat(220, payload_path, timeout_s=60)
    payload_dir.joinpath("trace_payload.json").write_bytes(trace_payload_raw)

    # Trace events (canonical JSONL form).
    events = api_events.get("events") if isinstance(api_events, dict) else None
    if not isinstance(events, list):
        raise SystemExit("api_events missing events[]")
    trace_events_lines = []
    for ev in sorted((e for e in events if isinstance(e, dict)), key=lambda e: int(e.get("idx") or 0)):
        trace_events_lines.append(json.dumps({"event": ev}, ensure_ascii=False, sort_keys=True))
    write_text(payload_dir / "trace_events.jsonl", "\n".join(trace_events_lines) + "\n")

    # Story projection.
    write_text(payload_dir / "if_story.md", build_story(trace_id, [e for e in events if isinstance(e, dict)]))

    # Trace public key (Ed25519).
    pub = pct_cat(220, "/opt/if-emotion/data/trace_ed25519.pub", timeout_s=30)
    payload_dir.joinpath("trace_ed25519.pub").write_bytes(pub)

    # REQ_SEEN hour ledger+head, derived from the trace events.
    req_seen_ev = next((e for e in events if isinstance(e, dict) and e.get("type") == "req_seen"), None)
    if not isinstance(req_seen_ev, dict):
        raise SystemExit("trace has no req_seen event; cannot build completeness proof")
    hour = str((req_seen_ev.get("data") or {}).get("hour_utc") or "").strip()
    if not hour:
        raise SystemExit("req_seen event missing hour_utc")
    ledger_path = f"/opt/if-emotion/data/req_seen/{hour}.jsonl"
    head_path = f"/opt/if-emotion/data/req_seen/heads/{hour}.json"
    ledger_bytes = pct_cat(220, ledger_path, timeout_s=30)
    head_bytes = pct_cat(220, head_path, timeout_s=30)
    payload_dir.joinpath(f"req_seen_{hour}.jsonl").write_bytes(ledger_bytes)
    payload_dir.joinpath(f"req_seen_head_{hour}.json").write_bytes(head_bytes)

    # Inclusion proof for this trace_id in the hour ledger.
    leaves: list[str] = []
    idx_for_trace: int | None = None
    leaf_for_trace: str = ""
    for raw_line in ledger_bytes.splitlines():
        if not raw_line.strip():
            continue
        try:
            entry = json.loads(raw_line.decode("utf-8", errors="ignore"))
        except Exception:
            continue
        lh = str(entry.get("leaf_hash") or "").strip()
        if len(lh) != 64:
            continue
        leaves.append(lh)
        if idx_for_trace is None and str(entry.get("trace_id") or "").strip() == trace_id:
            idx_for_trace = len(leaves) - 1
            leaf_for_trace = lh
    if idx_for_trace is None:
        raise SystemExit("trace_id not found in REQ_SEEN hour ledger")
    proof = merkle_inclusion_proof(leaves, idx_for_trace)
    proof["leaf_hash"] = leaf_for_trace
    proof["hour_utc"] = hour
    # Sanity: root must match head's merkle_root.
    head_obj = json.loads(head_bytes.decode("utf-8", errors="ignore") or "{}")
    if str(head_obj.get("merkle_root") or "") and proof["root"] != str(head_obj.get("merkle_root") or ""):
        raise SystemExit("Merkle root mismatch (ledger != head)")
    write_json(payload_dir / "req_seen_inclusion_proof.json", proof)

    # Manifest + sha list.
    manifest, sha_map = build_manifest(payload_dir)
    write_json(payload_dir / "manifest.json", manifest)
    write_sha256s(payload_dir, sha_map)

    # Register child artifacts in IF.TTT (audit:entry -> import-audit -> signed records).
    child_paths = [
        "headers.txt",
        "response.json",
        "trace_payload.json",
        "trace_events.jsonl",
        "ttt_signed_record.json",
        "api_trace.json",
        "api_events.json",
        "api_payload.json",
        "if_story.md",
        "trace_ed25519.pub",
        f"req_seen_{hour}.jsonl",
        f"req_seen_head_{hour}.json",
        "req_seen_inclusion_proof.json",
    ]
    children_pre = []
    audit_entries = []
    created_utc = utc_now_iso()
    for name in child_paths:
        p = payload_dir / name
        if not p.exists():
            continue
        cid_uuid = str(uuid.uuid4())
        citation_id = f"if://citation/{cid_uuid}/v1"
        sha = sha256_file(p)
        rel_path = f"payload/{name}"
        children_pre.append({"citation_id": citation_id, "rel_path": rel_path, "sha256": sha})
        audit_entries.append(
            {
                "citation_id": citation_id,
                "claim": f"emo-social trace artifact {name} for trace_id={trace_id}",
                "source_filename": rel_path,
                "source_sha256": sha,
                "verification_status": "source-sha256",
                "ingested_at": created_utc,
            }
        )
    write_json(payload_dir / "ttt_children_pre.json", {"trace_id": trace_id, "created_utc": created_utc, "children": children_pre})
    write_audit_entries(audit_entries)
    _ = ttt_import_audit()

    # Resolve signed IF.TTT records for the children.
    child_ids = [c["citation_id"] for c in children_pre]
    chain_records = resolve_ttt_records_by_id(child_ids)
    write_json(payload_dir / "ttt_children_chain_records.json", chain_records)

    # Minimal index for the bundle.
    rec_by_id = {r.get("id"): r for r in chain_records if isinstance(r, dict) and r.get("id")}
    children = []
    for c in children_pre:
        rid = c["citation_id"]
        rec = rec_by_id.get(rid) or {}
        children.append(
            {
                "citation_id": rid,
                "rel_path": c["rel_path"],
                "sha256": c["sha256"],
                "content_hash": rec.get("content_hash"),
                "pq_status": rec.get("pq_status"),
            }
        )
    write_json(payload_dir / "ttt_children.json", {"trace_id": trace_id, "children": children})

    # Build tarball and register it in IF.TTT.
    tar_path = tar_payload(out_dir, trace_id)
    tar_sha = sha256_file(tar_path)
    write_text(out_dir / "payload_tar_sha256.txt", f"{tar_sha} {tar_path}\n")
    tar_uuid = str(uuid.uuid4())
    tar_citation_id = f"if://citation/{tar_uuid}/v1"
    tar_audit_entry = {
        "citation_id": tar_citation_id,
        "claim": f"emo-social trace payload tarball (bundle) for trace_id={trace_id}",
        "source_filename": tar_path.name,
        "source_sha256": tar_sha,
        "verification_status": "source-sha256",
        "ingested_at": utc_now_iso(),
        "source_path": str(tar_path),
    }
    write_json(out_dir / "ttt_tarball_audit_entry.json", tar_audit_entry)
    write_audit_entries([tar_audit_entry])
    _ = ttt_import_audit()
    tar_chain = resolve_ttt_records_by_id([tar_citation_id])
    if not tar_chain:
        raise SystemExit("Failed to resolve tarball chain record from IF.TTT")
    tar_rec = tar_chain[0]
    write_json(out_dir / "ttt_tarball_chain_record.json", tar_rec)
    write_json(out_dir / "ttt_tarball_chain_ref.json", {"citation_id": tar_citation_id, "content_hash": tar_rec.get("content_hash")})
    print(str(out_dir))
    return 0


if __name__ == "__main__":
    raise SystemExit(main())