#!/usr/bin/env python3
"""
Build an IF.emotion "evidence bundle" tarball for a trace ID.

This is an operator tool that runs on the Proxmox host and pulls artifacts from:
- pct 220 (emo-social / if.emotion backend)
- pct 240 (IF.TTT registry)

Outputs:
  /root/tmp/emo-trace-package-<trace_id>/
    payload/...
    emo_trace_payload_<trace_id>.tar.gz
    payload_tar_sha256.txt
    ttt_tarball_audit_entry.json
    ttt_tarball_chain_record.json
    ttt_tarball_chain_ref.json
"""

from __future__ import annotations

import argparse
import hashlib
import json
import subprocess
import tarfile
import textwrap
import uuid
from datetime import datetime, timezone
from pathlib import Path
from typing import Any
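# Usage (run as root on the Proxmox host; the script name below is
# illustrative, the flags match the argparse setup in main()):
#   ./emo_trace_package.py <trace_id> [--email OWNER_EMAIL] \
#       [--headers FILE] [--response FILE] [--api-payload FILE] [--out-dir DIR]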
def utc_now_iso() -> str:
    return datetime.now(timezone.utc).isoformat()


def sha256_bytes(data: bytes) -> str:
    return hashlib.sha256(data or b"").hexdigest()


def sha256_file(path: Path) -> str:
    h = hashlib.sha256()
    with path.open("rb") as f:
        for chunk in iter(lambda: f.read(1024 * 1024), b""):
            h.update(chunk)
    return h.hexdigest()


def canonical_json_bytes(obj: Any) -> bytes:
    return json.dumps(obj, ensure_ascii=False, sort_keys=True, separators=(",", ":")).encode("utf-8")


def merkle_root_hex(leaves_hex: list[str]) -> str:
    if not leaves_hex:
        return sha256_bytes(b"")
    level: list[bytes] = [bytes.fromhex(h) for h in leaves_hex if isinstance(h, str) and len(h) == 64]
    if not level:
        return sha256_bytes(b"")
    while len(level) > 1:
        if len(level) % 2 == 1:
            level.append(level[-1])
        nxt: list[bytes] = []
        for i in range(0, len(level), 2):
            nxt.append(hashlib.sha256(level[i] + level[i + 1]).digest())
        level = nxt
    return level[0].hex()
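# Example (doctest-style, illustrative): with exactly two leaves the root is
# just sha256(leaf0 || leaf1), so merkle_root_hex can be spot-checked by hand:
#   >>> a, b = sha256_bytes(b"a"), sha256_bytes(b"b")
#   >>> merkle_root_hex([a, b]) == hashlib.sha256(bytes.fromhex(a) + bytes.fromhex(b)).hexdigest()
#   True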
def merkle_inclusion_proof(leaves_hex: list[str], index: int) -> dict:
    if index < 0 or index >= len(leaves_hex):
        raise ValueError("index out of range")
    level: list[bytes] = [bytes.fromhex(h) for h in leaves_hex]
    proof: list[dict] = []
    idx = index
    while len(level) > 1:
        if len(level) % 2 == 1:
            level.append(level[-1])
        sibling_idx = idx ^ 1
        sibling = level[sibling_idx]
        side = "left" if sibling_idx < idx else "right"
        proof.append({"sibling": sibling.hex(), "side": side})
        nxt: list[bytes] = []
        for i in range(0, len(level), 2):
            nxt.append(hashlib.sha256(level[i] + level[i + 1]).digest())
        level = nxt
        idx //= 2
    root = level[0].hex()
    return {"index": index, "root": root, "path": proof}
def run(cmd: list[str], *, stdin: bytes | None = None, timeout_s: int = 120) -> bytes:
    p = subprocess.run(
        cmd,
        input=stdin,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        timeout=timeout_s,
        check=False,
    )
    if p.returncode != 0:
        raise RuntimeError(f"cmd failed ({p.returncode}): {' '.join(cmd)}\n{p.stderr.decode('utf-8', errors='ignore')}")
    return p.stdout


def pct_exec(pct: int, bash_cmd: str, *, stdin: bytes | None = None, timeout_s: int = 120) -> bytes:
    return run(["pct", "exec", str(pct), "--", "bash", "-lc", bash_cmd], stdin=stdin, timeout_s=timeout_s)


def pct_cat(pct: int, path: str, *, timeout_s: int = 120) -> bytes:
    return pct_exec(pct, f"cat {shlex_quote(path)}", timeout_s=timeout_s)


def shlex_quote(s: str) -> str:
    # Minimal, safe shell quoting for paths: wrap in single quotes and escape
    # embedded single quotes as '"'"'.
    return "'" + (s or "").replace("'", "'\"'\"'") + "'"


def write_json(path: Path, obj: Any) -> None:
    path.write_text(json.dumps(obj, ensure_ascii=False, sort_keys=True, indent=2) + "\n", encoding="utf-8")


def write_text(path: Path, text: str) -> None:
    path.write_text(text, encoding="utf-8")
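# Example (illustrative): read a file out of the backend container via Proxmox:
#   raw = pct_cat(220, "/opt/if-emotion/data/trace_ed25519.pub", timeout_s=30)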
def fetch_api_json(*, trace_id: str, endpoint: str, email: str) -> Any:
    # Quote the URL as well: endpoints may carry query strings ("?limit=...")
    # that bash would otherwise treat as glob patterns.
    raw = pct_exec(
        220,
        f"curl -fsSL -H {shlex_quote('X-Auth-Request-Email: ' + email)} {shlex_quote('http://127.0.0.1:5000' + endpoint)}",
        timeout_s=60,
    )
    return json.loads(raw.decode("utf-8", errors="ignore") or "{}")
def extract_ttt_signed_record(*, trace_id: str) -> dict:
    script = textwrap.dedent(
        f"""
        python3 - <<'PY'
        import json
        tid = {trace_id!r}
        path = "/opt/if-emotion/data/ttt_signed_log.jsonl"
        out = None
        try:
            with open(path, "r", encoding="utf-8", errors="ignore") as f:
                for line in f:
                    line = line.strip()
                    if not line:
                        continue
                    try:
                        rec = json.loads(line)
                    except Exception:
                        continue
                    ev = rec.get("event") or {{}}
                    if isinstance(ev, dict) and str(ev.get("trace_id") or "").strip() == tid:
                        out = rec
        except Exception:
            out = None
        print(json.dumps(out or {{}}, ensure_ascii=False, sort_keys=True))
        PY
        """
    ).strip()
    raw = pct_exec(220, script, timeout_s=60)
    return json.loads(raw.decode("utf-8", errors="ignore") or "{}")
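# Note: the scan keeps the *last* matching record in the append-only log, so a
# re-signed entry for the same trace_id supersedes earlier ones.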
def resolve_ttt_records_by_id(record_ids: list[str]) -> list[dict]:
    payload = json.dumps({"ids": record_ids}, ensure_ascii=False).encode("utf-8")
    py = """
import json
import sys
import importlib.util
import contextlib

req = json.loads(sys.stdin.read() or "{}")
ids = req.get("ids") or []

spec = importlib.util.spec_from_file_location("ttt_registry_mod", "/opt/ttt-registry/ttt_registry.py")
mod = importlib.util.module_from_spec(spec)

# Route the registry's import-time banner prints to stderr so stdout stays
# machine-parseable JSON.
with contextlib.redirect_stdout(sys.stderr):
    spec.loader.exec_module(mod)  # type: ignore
    reg = mod.TTTRegistry()

out = []
for rid in ids:
    rid = str(rid or "").strip()
    if not rid:
        continue
    h = reg.redis.get(f"ttt:index:id:{rid}")
    if not h:
        continue
    try:
        out.append(reg.get(h))
    except Exception:
        continue

print(json.dumps(out, ensure_ascii=False, sort_keys=True))
""".strip()
    raw = pct_exec(
        240,
        f"OQS_INSTALL_PATH=/opt/ttt-registry/_oqs /opt/ttt-registry/venv/bin/python -c {shlex_quote(py)}",
        stdin=payload,
        timeout_s=180,
    )
    try:
        data = json.loads(raw.decode("utf-8", errors="ignore") or "[]")
    except Exception:
        return []
    return data if isinstance(data, list) else [data]
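# I/O contract (as used below): stdin carries {"ids": [...]} and stdout is a
# JSON list of signed registry records; main() reads their "id",
# "content_hash", and "pq_status" fields.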
def write_audit_entries(entries: list[dict]) -> None:
    payload = json.dumps({"entries": entries}, ensure_ascii=False).encode("utf-8")
    py = """
import json
import sys
import re
import uuid
import redis
from pathlib import Path

req = json.loads(sys.stdin.read() or "{}")
entries = req.get("entries") or []

cfg = Path("/etc/redis/ttt.conf").read_text(encoding="utf-8", errors="ignore")
m = re.search(r"^requirepass\\s+(\\S+)", cfg, flags=re.M)
password = m.group(1) if m else None

r = redis.Redis(host="localhost", port=6380, password=password, decode_responses=True)
written = 0
for e in entries:
    # citation_id format: if://citation/<uuid>/v1 -> the UUID is segment 3.
    cid = str(e.get("citation_id") or "").strip()
    parts = cid.split("/")
    uid = (parts[3] if len(parts) > 3 else "").strip()
    try:
        uuid.UUID(uid)
    except Exception:
        continue
    r.set(f"audit:entry:{uid}", json.dumps(e, ensure_ascii=False, sort_keys=True))
    written += 1

print(json.dumps({"ok": True, "written": written}, ensure_ascii=False))
""".strip()
    _ = pct_exec(240, f"/opt/ttt-registry/venv/bin/python -c {shlex_quote(py)}", stdin=payload, timeout_s=60)
def ttt_import_audit() -> dict:
    raw = pct_exec(
        240,
        "OQS_INSTALL_PATH=/opt/ttt-registry/_oqs /opt/ttt-registry/venv/bin/python /opt/ttt-registry/ttt_registry.py import-audit",
        timeout_s=300,
    )
    txt = raw.decode("utf-8", errors="ignore") or ""
    # The registry prints capability banners before JSON; best-effort parse the last JSON object.
    for chunk in reversed([c.strip() for c in txt.splitlines() if c.strip()]):
        if chunk.startswith("{") and chunk.endswith("}"):
            try:
                return json.loads(chunk)
            except Exception:
                break
    return {"raw": txt.strip()}


def build_story(trace_id: str, events: list[dict]) -> str:
    lines = [
        "# IF.story — contextual narrative log",
        "",
        f"Trace: `{trace_id}`",
        "",
        "Deterministic narrative projection of `trace_events.jsonl`. Each line includes the `event_hash` anchor.",
        "",
    ]
    for ev in sorted(events, key=lambda e: int(e.get("idx") or 0)):
        ts = str(ev.get("ts_utc") or "")
        et = str(ev.get("type") or "")
        mono_ms = int(ev.get("mono_ms") or 0)
        data = ev.get("data") if isinstance(ev.get("data"), dict) else {}
        h = str(ev.get("event_hash") or "")
        summary = ""
        if et == "request_commit":
            summary = f"Request body commitment; commit_ok={bool(data.get('commit_ok'))} client_trace_id={data.get('client_trace_id') or ''}".strip()
        elif et == "req_seen":
            summary = f"REQ_SEEN witnessed; hour={data.get('hour_utc')} count={data.get('count')} merkle_root={data.get('merkle_root')}"
        elif et == "request_received":
            summary = f"Auth+quota succeeded; provider={data.get('provider')} model={data.get('requested_model')} stream={data.get('stream')} user_len={data.get('user_len')} auth_ms={data.get('auth_ms')}"
        elif et == "guard_short_circuit":
            summary = f"IF.GUARD short-circuit; reasons={data.get('reasons')}"
        elif et == "trace_finalizing":
            summary = f"Trace finalizing; ok={data.get('ok')} provider={data.get('provider')}"
        else:
            # Generic fallback: surface the first few data keys.
            keys = list(data.keys())[:6] if isinstance(data, dict) else []
            summary = f"Event data keys={keys}"
        lines.append(f"- {ts} (+{mono_ms}ms) | `{et}` | {summary} | event_hash={h}")
    lines += [
        "",
        "Notes:",
        "- Ground truth remains `trace_events.jsonl` + `ttt_signed_record.json`.",
        "- REQ_SEEN ledger+head are included; public key is `trace_ed25519.pub`.",
        "",
    ]
    return "\n".join(lines)
def build_manifest(payload_dir: Path) -> tuple[dict, dict[str, str]]:
    sha_map: dict[str, str] = {}
    files = []
    for p in sorted(payload_dir.iterdir(), key=lambda x: x.name):
        if not p.is_file():
            continue
        data = p.read_bytes()
        sha = sha256_bytes(data)
        sha_map[p.name] = sha
        files.append({"path": p.name, "bytes": len(data), "sha256": sha})
    manifest = {"files": files}
    return manifest, sha_map


def write_sha256s(payload_dir: Path, sha_map: dict[str, str]) -> None:
    lines = []
    for name in sorted(sha_map.keys()):
        # Two spaces between hash and name keep the file `sha256sum -c` friendly.
        lines.append(f"{sha_map[name]}  {name}")
    (payload_dir / "sha256s.txt").write_text("\n".join(lines) + "\n", encoding="utf-8")


def tar_payload(workdir: Path, trace_id: str) -> Path:
    tar_path = workdir / f"emo_trace_payload_{trace_id}.tar.gz"
    with tarfile.open(tar_path, "w:gz") as tf:
        tf.add(workdir / "payload", arcname="payload")
    return tar_path
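# Note: "w:gz" stamps the current time into the gzip header, so the tarball's
# own sha256 differs run to run; the stable identities are the per-file hashes
# in payload/sha256s.txt and manifest.json.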
def main() -> int:
    ap = argparse.ArgumentParser()
    ap.add_argument("trace_id", help="Trace ID to package")
    ap.add_argument("--email", default="ds@infrafabric.io", help="Trusted email for owner-gated endpoints")
    ap.add_argument("--headers", default="", help="Path to captured HTTP response headers (optional)")
    ap.add_argument("--response", default="", help="Path to captured HTTP response body (optional)")
    ap.add_argument("--api-payload", default="", help="Path to captured request JSON (optional)")
    ap.add_argument("--out-dir", default="", help="Output directory (default: /root/tmp/emo-trace-package-<trace_id>)")
    args = ap.parse_args()

    trace_id = str(args.trace_id).strip()
    if not trace_id:
        raise SystemExit("trace_id required")

    out_dir = Path(args.out_dir or f"/root/tmp/emo-trace-package-{trace_id}").resolve()
    payload_dir = out_dir / "payload"
    payload_dir.mkdir(parents=True, exist_ok=True)

    # Captured request/response artifacts (optional).
    if args.headers:
        write_text(payload_dir / "headers.txt", Path(args.headers).read_text(encoding="utf-8", errors="ignore"))
    if args.response:
        # Re-serialize JSON so the stored form is stable; fall back to raw text.
        raw = Path(args.response).read_text(encoding="utf-8", errors="ignore")
        try:
            obj = json.loads(raw)
            write_json(payload_dir / "response.json", obj)
        except Exception:
            write_text(payload_dir / "response.json", raw)
    if args.api_payload:
        raw = Path(args.api_payload).read_text(encoding="utf-8", errors="ignore")
        try:
            obj = json.loads(raw)
            write_json(payload_dir / "api_payload.json", obj)
        except Exception:
            write_text(payload_dir / "api_payload.json", raw)

    # API snapshots (owner-gated).
    api_trace = fetch_api_json(trace_id=trace_id, endpoint=f"/api/trace/{trace_id}", email=args.email)
    write_json(payload_dir / "api_trace.json", api_trace)
    api_events = fetch_api_json(trace_id=trace_id, endpoint=f"/api/trace/events/{trace_id}?limit=10000", email=args.email)
    write_json(payload_dir / "api_events.json", api_events)

    # Signed record from append-only log (ground truth).
    ttt_rec = extract_ttt_signed_record(trace_id=trace_id)
    if not ttt_rec:
        raise SystemExit("ttt_signed_record not found for trace_id")
    write_json(payload_dir / "ttt_signed_record.json", ttt_rec)

    # Raw trace payload (ground truth).
    payload_path = f"/opt/if-emotion/data/trace_payloads/{trace_id}.json"
    trace_payload_raw = pct_cat(220, payload_path, timeout_s=60)
    payload_dir.joinpath("trace_payload.json").write_bytes(trace_payload_raw)

    # Trace events (canonical JSONL form).
    events = api_events.get("events") if isinstance(api_events, dict) else None
    if not isinstance(events, list):
        raise SystemExit("api_events missing events[]")
    trace_events_lines = []
    for ev in sorted((e for e in events if isinstance(e, dict)), key=lambda e: int(e.get("idx") or 0)):
        trace_events_lines.append(json.dumps({"event": ev}, ensure_ascii=False, sort_keys=True))
    write_text(payload_dir / "trace_events.jsonl", "\n".join(trace_events_lines) + "\n")

    # Story projection.
    write_text(payload_dir / "if_story.md", build_story(trace_id, [e for e in events if isinstance(e, dict)]))

    # Trace public key (Ed25519).
    pub = pct_cat(220, "/opt/if-emotion/data/trace_ed25519.pub", timeout_s=30)
    payload_dir.joinpath("trace_ed25519.pub").write_bytes(pub)

    # REQ_SEEN hour ledger+head, derived from the trace events.
    req_seen_ev = next((e for e in events if isinstance(e, dict) and e.get("type") == "req_seen"), None)
    if not isinstance(req_seen_ev, dict):
        raise SystemExit("trace has no req_seen event; cannot build completeness proof")
    hour = str((req_seen_ev.get("data") or {}).get("hour_utc") or "").strip()
    if not hour:
        raise SystemExit("req_seen event missing hour_utc")
    ledger_path = f"/opt/if-emotion/data/req_seen/{hour}.jsonl"
    head_path = f"/opt/if-emotion/data/req_seen/heads/{hour}.json"
    ledger_bytes = pct_cat(220, ledger_path, timeout_s=30)
    head_bytes = pct_cat(220, head_path, timeout_s=30)
    payload_dir.joinpath(f"req_seen_{hour}.jsonl").write_bytes(ledger_bytes)
    payload_dir.joinpath(f"req_seen_head_{hour}.json").write_bytes(head_bytes)

    # Inclusion proof for this trace_id in the hour ledger.
    leaves: list[str] = []
    idx_for_trace: int | None = None
    leaf_for_trace: str = ""
    for raw_line in ledger_bytes.splitlines():
        if not raw_line.strip():
            continue
        try:
            entry = json.loads(raw_line.decode("utf-8", errors="ignore"))
        except Exception:
            continue
        lh = str(entry.get("leaf_hash") or "").strip()
        if len(lh) != 64:
            continue
        leaves.append(lh)
        if idx_for_trace is None and str(entry.get("trace_id") or "").strip() == trace_id:
            idx_for_trace = len(leaves) - 1
            leaf_for_trace = lh
    if idx_for_trace is None:
        raise SystemExit("trace_id not found in REQ_SEEN hour ledger")
    proof = merkle_inclusion_proof(leaves, idx_for_trace)
    proof["leaf_hash"] = leaf_for_trace
    proof["hour_utc"] = hour
    # Sanity: root must match head's merkle_root.
    head_obj = json.loads(head_bytes.decode("utf-8", errors="ignore") or "{}")
    if str(head_obj.get("merkle_root") or "") and proof["root"] != str(head_obj.get("merkle_root") or ""):
        raise SystemExit("Merkle root mismatch (ledger != head)")
    write_json(payload_dir / "req_seen_inclusion_proof.json", proof)
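    # A third party can re-check this proof offline: recompute the leaf order
    # from req_seen_<hour>.jsonl and run verify_inclusion_proof(leaf_hash, proof).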
    # Manifest + sha list.
    manifest, sha_map = build_manifest(payload_dir)
    write_json(payload_dir / "manifest.json", manifest)
    write_sha256s(payload_dir, sha_map)

    # Register child artifacts in IF.TTT (audit:entry -> import-audit -> signed records).
    child_paths = [
        "headers.txt",
        "response.json",
        "trace_payload.json",
        "trace_events.jsonl",
        "ttt_signed_record.json",
        "api_trace.json",
        "api_events.json",
        "api_payload.json",
        "if_story.md",
        "trace_ed25519.pub",
        f"req_seen_{hour}.jsonl",
        f"req_seen_head_{hour}.json",
        "req_seen_inclusion_proof.json",
    ]
    children_pre = []
    audit_entries = []
    created_utc = utc_now_iso()
    for name in child_paths:
        p = payload_dir / name
        if not p.exists():
            continue
        cid_uuid = str(uuid.uuid4())
        citation_id = f"if://citation/{cid_uuid}/v1"
        sha = sha256_file(p)
        rel_path = f"payload/{name}"
        children_pre.append({"citation_id": citation_id, "rel_path": rel_path, "sha256": sha})
        audit_entries.append(
            {
                "citation_id": citation_id,
                "claim": f"emo-social trace artifact {name} for trace_id={trace_id}",
                "source_filename": rel_path,
                "source_sha256": sha,
                "verification_status": "source-sha256",
                "ingested_at": created_utc,
            }
        )
    write_json(payload_dir / "ttt_children_pre.json", {"trace_id": trace_id, "created_utc": created_utc, "children": children_pre})
    write_audit_entries(audit_entries)
    _ = ttt_import_audit()

    # Resolve signed IF.TTT records for the children.
    child_ids = [c["citation_id"] for c in children_pre]
    chain_records = resolve_ttt_records_by_id(child_ids)
    write_json(payload_dir / "ttt_children_chain_records.json", chain_records)

    # Minimal index for the bundle.
    rec_by_id = {r.get("id"): r for r in chain_records if isinstance(r, dict) and r.get("id")}
    children = []
    for c in children_pre:
        rid = c["citation_id"]
        rec = rec_by_id.get(rid) or {}
        children.append(
            {
                "citation_id": rid,
                "rel_path": c["rel_path"],
                "sha256": c["sha256"],
                "content_hash": rec.get("content_hash"),
                "pq_status": rec.get("pq_status"),
            }
        )
    write_json(payload_dir / "ttt_children.json", {"trace_id": trace_id, "children": children})

    # Build tarball and register it in IF.TTT.
    tar_path = tar_payload(out_dir, trace_id)
    tar_sha = sha256_file(tar_path)
    write_text(out_dir / "payload_tar_sha256.txt", f"{tar_sha}  {tar_path}\n")

    tar_uuid = str(uuid.uuid4())
    tar_citation_id = f"if://citation/{tar_uuid}/v1"
    tar_audit_entry = {
        "citation_id": tar_citation_id,
        "claim": f"emo-social trace payload tarball (bundle) for trace_id={trace_id}",
        "source_filename": tar_path.name,
        "source_sha256": tar_sha,
        "verification_status": "source-sha256",
        "ingested_at": utc_now_iso(),
        "source_path": str(tar_path),
    }
    write_json(out_dir / "ttt_tarball_audit_entry.json", tar_audit_entry)
    write_audit_entries([tar_audit_entry])
    _ = ttt_import_audit()

    tar_chain = resolve_ttt_records_by_id([tar_citation_id])
    if not tar_chain:
        raise SystemExit("Failed to resolve tarball chain record from IF.TTT")
    tar_rec = tar_chain[0]
    write_json(out_dir / "ttt_tarball_chain_record.json", tar_rec)
    write_json(out_dir / "ttt_tarball_chain_ref.json", {"citation_id": tar_citation_id, "content_hash": tar_rec.get("content_hash")})

    print(str(out_dir))
    return 0


if __name__ == "__main__":
    raise SystemExit(main())
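# Independent verification checklist (illustrative; uses only bundle contents):
#   1. sha256 of emo_trace_payload_<trace_id>.tar.gz matches payload_tar_sha256.txt.
#   2. Per-file hashes match payload/sha256s.txt and payload/manifest.json.
#   3. req_seen_inclusion_proof.json verifies against req_seen_head_<hour>.json
#      (see verify_inclusion_proof above).
#   4. ttt_tarball_chain_ref.json's content_hash matches the signed IF.TTT record.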
|