#!/usr/bin/env python3
"""
IF.emotion trace bundle verifier + Merkle inclusion proof tool.

Run with the venv:

    /root/tmp/iftrace_venv/bin/python /root/tmp/iftrace.py <command> ...
"""
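# Example invocations (subcommands are defined in main() below; the tarball
# and ledger paths are illustrative placeholders):
#
#   iftrace.py verify emo_trace_payload_<trace_id>.tar.gz --expected-sha256 <hex>
#   iftrace.py prove-inclusion --ledger req_seen_<hour>.jsonl \
#       --head req_seen_head_<hour>.json --trace-id <trace_id>
#   iftrace.py verify-inclusion proof.json
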
from __future__ import annotations

import argparse
import hashlib
import json
import tarfile
import tempfile
from dataclasses import dataclass
from pathlib import Path
from typing import Any

from canonicaljson import encode_canonical_json
from nacl.encoding import HexEncoder
from nacl.signing import VerifyKey


def sha256_bytes(data: bytes) -> str:
    return hashlib.sha256(data or b"").hexdigest()


def sha256_file(path: Path) -> str:
    h = hashlib.sha256()
    with path.open("rb") as f:
        for chunk in iter(lambda: f.read(1024 * 1024), b""):
            h.update(chunk)
    return h.hexdigest()


def canonical_json_bytes(obj: Any) -> bytes:
    return encode_canonical_json(obj)


def merkle_root_hex(leaves_hex: list[str]) -> str:
    if not leaves_hex:
        return sha256_bytes(b"")
    # Keep only well-formed 64-char hex digests.
    level: list[bytes] = [bytes.fromhex(h) for h in leaves_hex if isinstance(h, str) and len(h) == 64]
    if not level:
        return sha256_bytes(b"")
    while len(level) > 1:
        # Odd level: duplicate the last node so every node has a sibling.
        if len(level) % 2 == 1:
            level.append(level[-1])
        nxt: list[bytes] = []
        for i in range(0, len(level), 2):
            nxt.append(hashlib.sha256(level[i] + level[i + 1]).digest())
        level = nxt
    return level[0].hex()


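# Worked example of the odd-leaf rule above: with three leaves [a, b, c], c is
# paired with a copy of itself, so root = H(H(a + b) + H(c + c)). A quick
# self-check (hypothetical leaf values):
#
#   a, b, c = (hashlib.sha256(x).hexdigest() for x in (b"a", b"b", b"c"))
#   ab = hashlib.sha256(bytes.fromhex(a) + bytes.fromhex(b)).digest()
#   cc = hashlib.sha256(bytes.fromhex(c) + bytes.fromhex(c)).digest()
#   assert merkle_root_hex([a, b, c]) == hashlib.sha256(ab + cc).hexdigest()
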
def merkle_inclusion_proof(leaves_hex: list[str], index: int) -> dict:
    if index < 0 or index >= len(leaves_hex):
        raise ValueError("index out of range")
    level: list[bytes] = [bytes.fromhex(h) for h in leaves_hex]
    proof: list[dict] = []
    idx = index
    while len(level) > 1:
        if len(level) % 2 == 1:
            level.append(level[-1])
        # The sibling of node idx is its neighbour in the pair (idx XOR 1).
        sibling_idx = idx ^ 1
        sibling = level[sibling_idx]
        side = "left" if sibling_idx < idx else "right"
        proof.append({"sibling": sibling.hex(), "side": side})
        nxt: list[bytes] = []
        for i in range(0, len(level), 2):
            nxt.append(hashlib.sha256(level[i] + level[i + 1]).digest())
        level = nxt
        idx //= 2
    root = level[0].hex()
    return {"index": index, "root": root, "path": proof}


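# Proof shape for leaves [a, b, c] and index 0 (values hypothetical): the path
# walks bottom-up, one sibling per level:
#
#   {"index": 0, "root": "<root hex>",
#    "path": [{"sibling": "<b hex>", "side": "right"},
#             {"sibling": "<H(c + c) hex>", "side": "right"}]}
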
def merkle_verify_proof(leaf_hex: str, proof: dict) -> bool:
    try:
        cur = bytes.fromhex(leaf_hex)
        for step in proof.get("path", []):
            sib = bytes.fromhex(step["sibling"])
            if step["side"] == "left":
                cur = hashlib.sha256(sib + cur).digest()
            else:
                cur = hashlib.sha256(cur + sib).digest()
        return cur.hex() == proof.get("root")
    except Exception:
        return False


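# Round-trip check tying the two functions together (leaves hypothetical):
#
#   leaves = [hashlib.sha256(x).hexdigest() for x in (b"a", b"b", b"c")]
#   proof = merkle_inclusion_proof(leaves, 0)
#   assert merkle_verify_proof(leaves[0], proof)
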
def read_json(path: Path) -> Any:
    return json.loads(path.read_text(encoding="utf-8", errors="strict"))


def verify_ed25519_hex(*, pub_hex: str, msg: bytes, sig_hex: str) -> None:
    # Raises nacl.exceptions.BadSignatureError if the signature is invalid.
    vk = VerifyKey(pub_hex, encoder=HexEncoder)
    vk.verify(msg, bytes.fromhex(sig_hex))


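# Example round-trip with a throwaway key (PyNaCl API; message hypothetical):
#
#   from nacl.signing import SigningKey
#   sk = SigningKey.generate()
#   pub_hex = sk.verify_key.encode(encoder=HexEncoder).decode()
#   sig_hex = sk.sign(b"msg").signature.hex()
#   verify_ed25519_hex(pub_hex=pub_hex, msg=b"msg", sig_hex=sig_hex)  # no raise
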
@dataclass(frozen=True)
class VerifyResult:
    ok: bool
    notes: list[str]


def verify_trace_events(events_path: Path) -> VerifyResult:
    notes: list[str] = []
    prev_hash = "0" * 64
    expected_idx = 0
    lines = events_path.read_text(encoding="utf-8", errors="ignore").splitlines()
    for line in lines:
        if not line.strip():
            continue
        obj = json.loads(line)
        ev = obj.get("event") or {}
        idx = int(ev.get("idx", -1))
        if idx != expected_idx:
            return VerifyResult(False, notes + [f"trace_events: idx mismatch (got {idx}, expected {expected_idx})"])
        if str(ev.get("prev_hash") or "") != prev_hash:
            return VerifyResult(False, notes + ["trace_events: prev_hash mismatch"])
        stored_hash = str(ev.get("event_hash") or "")
        # Recompute the chained hash: sha256(prev_hash || canonical_json(event
        # minus its own event_hash field)).
        payload = dict(ev)
        payload.pop("event_hash", None)
        recomputed = sha256_bytes(prev_hash.encode("utf-8") + canonical_json_bytes(payload))
        if recomputed != stored_hash:
            return VerifyResult(False, notes + ["trace_events: event_hash mismatch (recomputed != stored)"])
        prev_hash = stored_hash
        expected_idx += 1
    notes.append(f"trace_events: ok (events={expected_idx}, head_hash={prev_hash[:16]}…)")
    return VerifyResult(True, notes)


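# Illustrative trace_events.jsonl record, inferred from the checks above
# (hashes hypothetical; the genesis prev_hash is 64 zeros):
#
#   {"event": {"idx": 0, "prev_hash": "000...000",
#              "event_hash": "9f2c...", ...payload fields...}}
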
def verify_req_seen(ledger_path: Path, head_path: Path) -> VerifyResult:
    notes: list[str] = []
    head = read_json(head_path)
    pub_hex = str(head.get("signer_ed25519") or "").strip()
    sig_hex = str(head.get("sig_ed25519") or "").strip()
    if not pub_hex or not sig_hex:
        return VerifyResult(False, ["req_seen: missing signer_ed25519 or sig_ed25519 in head"])
    # Recreate the message that was signed (the head core before adding sig/key_id/signer).
    head_core = {
        "schema": head.get("schema"),
        "hour_utc": head.get("hour_utc"),
        "updated_utc": head.get("updated_utc"),
        "count": head.get("count"),
        "merkle_root": head.get("merkle_root"),
        "last_leaf_hash": head.get("last_leaf_hash"),
    }
    # Report a bad signature as a failed check rather than an uncaught
    # BadSignatureError traceback.
    try:
        verify_ed25519_hex(pub_hex=pub_hex, msg=canonical_json_bytes(head_core), sig_hex=sig_hex)
    except Exception:
        return VerifyResult(False, ["req_seen_head: Ed25519 signature verification failed"])
    notes.append("req_seen_head: Ed25519 signature OK")

    leaves: list[str] = []
    lines = ledger_path.read_text(encoding="utf-8", errors="ignore").splitlines()
    for line in lines:
        if not line.strip():
            continue
        entry = json.loads(line)
        leaf = str(entry.get("leaf_hash") or "").strip()
        # Each leaf commits to the entry body (entry minus its own leaf_hash).
        entry_core = dict(entry)
        entry_core.pop("leaf_hash", None)
        recomputed_leaf = sha256_bytes(canonical_json_bytes(entry_core))
        if recomputed_leaf != leaf:
            return VerifyResult(False, notes + ["req_seen: leaf_hash mismatch"])
        leaves.append(leaf)

    root = merkle_root_hex(leaves)
    if root != str(head.get("merkle_root") or ""):
        return VerifyResult(False, notes + ["req_seen: merkle_root mismatch"])
    if int(head.get("count") or 0) != len(leaves):
        return VerifyResult(False, notes + ["req_seen: count mismatch"])
    notes.append(f"req_seen: ok (count={len(leaves)}, merkle_root={root[:16]}…)")
    return VerifyResult(True, notes)


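# Illustrative req_seen_head_<hour>.json shape, inferred from head_core and
# the signer fields read above (values hypothetical):
#
#   {"schema": "...", "hour_utc": "...", "updated_utc": "...", "count": 2,
#    "merkle_root": "<hex>", "last_leaf_hash": "<hex>",
#    "signer_ed25519": "<pubkey hex>", "sig_ed25519": "<signature hex>"}
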
def verify_story(story_path: Path, events_path: Path) -> VerifyResult:
    notes: list[str] = []
    # Collect all event hashes from ground truth.
    hashes: set[str] = set()
    for line in events_path.read_text(encoding="utf-8", errors="ignore").splitlines():
        if not line.strip():
            continue
        ev = json.loads(line).get("event") or {}
        h = str(ev.get("event_hash") or "").strip()
        if len(h) == 64:
            hashes.add(h)
    # Ensure every story line that mentions event_hash=... points to a real event.
    for line in story_path.read_text(encoding="utf-8", errors="ignore").splitlines():
        if "event_hash=" not in line:
            continue
        # Guard against a bare "event_hash=" with nothing after it.
        tail = line.split("event_hash=", 1)[1].strip()
        h = tail.split()[0] if tail else ""
        if h and h not in hashes:
            return VerifyResult(False, [f"if_story: unknown event_hash referenced: {h}"])
    notes.append("if_story: ok (all referenced event_hash values exist)")
    return VerifyResult(True, notes)


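# Illustrative if_story.md line that the scan above accepts (hash
# hypothetical; the token after "event_hash=" must be a known 64-hex digest):
#
#   The spike was recorded at event_hash=9f2c... in the trace chain.
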
def verify_manifest(payload_dir: Path) -> VerifyResult:
    notes: list[str] = []
    manifest_path = payload_dir / "manifest.json"
    sha_list_path = payload_dir / "sha256s.txt"
    if not manifest_path.exists() or not sha_list_path.exists():
        return VerifyResult(False, ["manifest: missing manifest.json or sha256s.txt"])

    manifest = read_json(manifest_path)
    files = manifest.get("files") or []
    manifest_map = {f["path"]: f["sha256"] for f in files if isinstance(f, dict) and "path" in f and "sha256" in f}

    # sha256s.txt uses the standard checksum-list layout: "<sha256>  <path>".
    sha_map: dict[str, str] = {}
    for line in sha_list_path.read_text(encoding="utf-8", errors="ignore").splitlines():
        parts = line.strip().split()
        if len(parts) >= 2:
            sha_map[parts[1]] = parts[0]

    # sha256s.txt is a checksum file; it must not be self-referential.
    sha_map.pop("sha256s.txt", None)
    # manifest.json is the root index; do not make it self-referential in sha256s.
    sha_map.pop("manifest.json", None)

    for name, sha in sha_map.items():
        p = payload_dir / name
        if not p.exists():
            return VerifyResult(False, [f"manifest: sha256s references missing file: {name}"])
        got = sha256_file(p)
        if got != sha:
            return VerifyResult(False, [f"manifest: sha256 mismatch for {name}"])
        # manifest.json was popped above, so every remaining name must match
        # the manifest's own entry.
        if manifest_map.get(name) != sha:
            return VerifyResult(False, [f"manifest: manifest.json mismatch for {name}"])

    notes.append(f"manifest: ok (files={len(sha_map)})")
    return VerifyResult(True, notes)


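# Illustrative payload index shapes (values hypothetical). sha256s.txt:
#
#   3e1f...  trace_events.jsonl
#   77ab...  if_story.md
#
# manifest.json:
#
#   {"files": [{"path": "trace_events.jsonl", "sha256": "3e1f..."}, ...]}
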
def extract_tarball(tar_path: Path) -> Path:
    tmp = Path(tempfile.mkdtemp(prefix="iftrace_"))
    with tarfile.open(tar_path, "r:gz") as tf:
        # Trusted local artifact; on Python 3.12+, extractall(tmp, filter="data")
        # would additionally guard against path traversal.
        tf.extractall(tmp)
    return tmp


def cmd_verify(args: argparse.Namespace) -> int:
    tar_path = Path(args.tar).resolve()
    expected_sha = (args.expected_sha256 or "").strip().lower()
    got_sha = sha256_file(tar_path)
    if expected_sha and got_sha != expected_sha:
        print(f"FAIL tar_sha256 expected={expected_sha} got={got_sha}")
        return 2
    print(f"OK tar_sha256 {got_sha}")

    root = extract_tarball(tar_path)
    payload_dir = root / "payload"
    if not payload_dir.exists():
        print("FAIL: tarball missing payload/ directory")
        return 2

    checks: list[VerifyResult] = []
    checks.append(verify_manifest(payload_dir))

    events_path = payload_dir / "trace_events.jsonl"
    if events_path.exists():
        checks.append(verify_trace_events(events_path))

    story_path = payload_dir / "if_story.md"
    if story_path.exists() and events_path.exists():
        checks.append(verify_story(story_path, events_path))

    # REQ_SEEN verification if present
    head_files = sorted(payload_dir.glob("req_seen_head_*.json"))
    ledger_files = sorted(payload_dir.glob("req_seen_*.jsonl"))
    if head_files and ledger_files:
        checks.append(verify_req_seen(ledger_files[0], head_files[0]))

    ok = True
    for res in checks:
        for n in res.notes:
            print(n)
        ok = ok and res.ok

    if not ok:
        print("FAIL verify")
        return 2
    print("OK verify")
    return 0


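# Expected payload/ layout, inferred from the lookups in cmd_verify (the
# req_seen pair is optional):
#
#   payload/manifest.json              root index of file hashes
#   payload/sha256s.txt                checksum list
#   payload/trace_events.jsonl         hash-chained event log
#   payload/if_story.md                narrative referencing event_hash=...
#   payload/req_seen_<hour>.jsonl      REQ_SEEN ledger (optional)
#   payload/req_seen_head_<hour>.json  signed Merkle head (optional)
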
def cmd_prove_inclusion(args: argparse.Namespace) -> int:
    ledger = Path(args.ledger).resolve()
    head = Path(args.head).resolve()
    trace_id = (args.trace_id or "").strip()
    leaf_hash = (args.leaf_hash or "").strip().lower()

    leaves: list[str] = []
    idx_by_trace: dict[str, int] = {}
    lines = ledger.read_text(encoding="utf-8", errors="ignore").splitlines()
    for line in lines:
        if not line.strip():
            continue
        entry = json.loads(line)
        lh = str(entry.get("leaf_hash") or "").strip()
        leaves.append(lh)
        tid = str(entry.get("trace_id") or "").strip()
        if tid and tid not in idx_by_trace:
            idx_by_trace[tid] = len(leaves) - 1

    if trace_id:
        if trace_id not in idx_by_trace:
            raise SystemExit("trace_id not found in ledger")
        index = idx_by_trace[trace_id]
        leaf_hash = leaves[index]
    else:
        if not leaf_hash:
            raise SystemExit("provide --trace-id or --leaf-hash")
        if leaf_hash not in leaves:
            raise SystemExit("leaf_hash not found in ledger")
        index = leaves.index(leaf_hash)

    proof = merkle_inclusion_proof(leaves, index)
    proof["leaf_hash"] = leaf_hash
    proof["hour_utc"] = read_json(head).get("hour_utc")
    print(json.dumps(proof, indent=2, sort_keys=True))
    return 0


def cmd_verify_inclusion(args: argparse.Namespace) -> int:
    proof = read_json(Path(args.proof).resolve())
    leaf = str(proof.get("leaf_hash") or "").strip()
    ok = merkle_verify_proof(leaf, proof)
    print("OK" if ok else "FAIL")
    return 0 if ok else 2


def main() -> int:
    ap = argparse.ArgumentParser(prog="iftrace")
    sub = ap.add_subparsers(dest="cmd", required=True)

    v = sub.add_parser("verify", help="Verify a trace payload tarball (manifest, hashes, chains, signatures)")
    v.add_argument("tar", help="Path to emo_trace_payload_<trace_id>.tar.gz")
    v.add_argument("--expected-sha256", default="", help="Expected tarball SHA256 (optional)")
    v.set_defaults(func=cmd_verify)

    p = sub.add_parser("prove-inclusion", help="Generate a Merkle inclusion proof for a REQ_SEEN ledger leaf")
    p.add_argument("--ledger", required=True, help="Path to req_seen_<hour>.jsonl")
    p.add_argument("--head", required=True, help="Path to req_seen_head_<hour>.json")
    g = p.add_mutually_exclusive_group(required=True)
    g.add_argument("--trace-id", default="", help="Trace ID to prove inclusion for")
    g.add_argument("--leaf-hash", default="", help="Leaf hash to prove inclusion for")
    p.set_defaults(func=cmd_prove_inclusion)

    pv = sub.add_parser("verify-inclusion", help="Verify a Merkle inclusion proof JSON")
    pv.add_argument("proof", help="Path to proof JSON")
    pv.set_defaults(func=cmd_verify_inclusion)

    args = ap.parse_args()
    return int(args.func(args))


if __name__ == "__main__":
    raise SystemExit(main())