Add v2.1 trace payload + styled paper + verifier

parent 868cb3fa7e
commit 8fa6acb798
7 changed files with 616 additions and 0 deletions

234 IF_EMOTION_DEBUGGING_TRACE_WHITEPAPER_v2.1_STYLED.md (new file)
@@ -0,0 +1,234 @@
# IF.EMOTION TRACE PROTOCOL v2.1: AUDITABLE DEBUGGING (WITHOUT WISHFUL THINKING)

**Subject:** End-to-End Traceability, Completeness Witnessing, and PQ-Anchored Evidence Binding
**Protocol:** IF.TTT (Traceable, Transparent, Trustworthy)
**Version:** 2.1 (Methodology Hardening)
**Date:** 2025-12-21
**Status:** AUDIT REQUIRED
**Citation:** `if://whitepaper/emotion/trace-protocol/v2.1`

---

## 1) What This Protocol Actually Guarantees

This system does not try to “prove the model is true.” That is not a meaningful claim for probabilistic generation.

This system proves something narrower and more valuable:

1) what the system received (as a commitment),
2) what the system did (trace event chain),
3) what the system returned (output hash),
4) what evidence it claims to have used (retrieval IDs + citation handles),
5) that the resulting artifacts are tamper-evident and portable for external review.

If a claim cannot be bound to an artifact, it does not exist.

---
## 2) The Trace ID Contract (Non-Negotiable)

Every request to `/api/chat/completions` receives a Trace ID. This includes denials.

**Surfaces:**

- **Header:** `X-IF-Emotion-Trace: <uuid>`
- **Header:** `X-IF-Emotion-Trace-Sig: <sig>` (app-level integrity)
- **User output:** final line `Trace: <uuid>`

The Trace ID is the support ticket, the incident handle, and the audit join key.
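A minimal client-side sketch of the contract (the host and request body are placeholders; only the endpoint path and header names come from this section):

```python
# Sketch: read the Trace ID surfaces from a completion response.
# "emotion.example" and the payload are hypothetical; headers are per the contract.
import requests

resp = requests.post(
    "https://emotion.example/api/chat/completions",
    json={"messages": [{"role": "user", "content": "hello"}]},
    timeout=30,
)
trace_id = resp.headers.get("X-IF-Emotion-Trace")
trace_sig = resp.headers.get("X-IF-Emotion-Trace-Sig")
print(f"Trace: {trace_id} (sig present: {bool(trace_sig)})")
```

Even a denial should carry these headers, so the join key survives failure paths.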
---

## 3) Completeness: REQ_SEEN Witness Ledger (And Its Real Boundary)

Integrity alone is easy. Completeness is where systems lie.

### What REQ_SEEN does (v2.1)

REQ_SEEN records every request attempt that reaches the backend witness boundary as a privacy-preserving commitment:

- `user_text_sha256`, `user_len`, decision/reason, and `leaf_hash`

It writes:

- Hour ledger: `/opt/if-emotion/data/req_seen/<YYYYMMDDTHH>.jsonl`
- Signed Merkle head: `/opt/if-emotion/data/req_seen/heads/<YYYYMMDDTHH>.json`
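To make the commitment concrete, here is a sketch of forming one witness entry the way the bundled verifier recomputes it (`leaf_hash` over the canonical JSON of the entry without the `leaf_hash` field); the exact field set beyond those listed above is an assumption:

```python
import hashlib
from canonicaljson import encode_canonical_json

user_text = "example request text"  # never stored; only committed to
entry = {
    "trace_id": "00000000-0000-0000-0000-000000000000",  # placeholder UUID
    "user_text_sha256": hashlib.sha256(user_text.encode("utf-8")).hexdigest(),
    "user_len": len(user_text),
    "decision": "allow",  # illustrative decision/reason pair
    "reason": "policy_ok",
}
entry["leaf_hash"] = hashlib.sha256(encode_canonical_json(entry)).hexdigest()
```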
### The boundary (explicit)

REQ_SEEN completeness is only valid for requests that reach the backend process. Requests blocked before the backend are out of scope until the witness is moved to the edge proxy.

This is not a weakness in wording. It is a hard boundary condition.

---
## 4) Merkle Proofs: Roots Are Not Enough

A signed Merkle root helps, but roots alone do not give efficient proofs.

v2.1 adds inclusion proofs for REQ_SEEN:

- A specific trace can be proven to exist in an hourly ledger with an O(log n) Merkle path.
- The proof is generated from the ledger and verified against the signed head.

Verification tooling is provided via `iftrace.py` (see Section 10).
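Verifying such a path is a fold over sibling digests up to the signed root; this mirrors the proof format `iftrace.py` emits (`sibling`/`side` steps):

```python
import hashlib

def fold_inclusion_path(leaf_hex: str, path: list[dict], root_hex: str) -> bool:
    # Hash upward: at each level the sibling sits on the stated side.
    cur = bytes.fromhex(leaf_hex)
    for step in path:
        sib = bytes.fromhex(step["sibling"])
        cur = hashlib.sha256(sib + cur if step["side"] == "left" else cur + sib).digest()
    return cur.hex() == root_hex
```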
---

## 5) Trace Events: Hash Chain + Immediate Head Attestation

Trace events are stored as a hash chain in:

- `/opt/if-emotion/data/trace_events.jsonl`

Each event includes:

- `prev_hash` pointer
- `event_hash` computed as `sha256(prev_hash || canonical_json(event_without_event_hash))`

This detects deletion or modification of interior events. It does not prevent a malicious deployment from not emitting events. That is addressed as a limitation and a roadmap item (Section 11).

The trace head is also attested in the signed completion record with an app-level Ed25519 signature so integrity can be verified immediately.
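Concretely, with the genesis pointer of 64 zero characters used by the bundled verifier:

```python
import hashlib
from canonicaljson import encode_canonical_json

GENESIS = "0" * 64  # prev_hash of the first event

def event_hash(prev_hash: str, event_without_event_hash: dict) -> str:
    # sha256(prev_hash || canonical_json(event_without_event_hash))
    return hashlib.sha256(
        prev_hash.encode("utf-8") + encode_canonical_json(event_without_event_hash)
    ).hexdigest()
```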
---

## 6) Canonicalization: What We Hash Must Be Stable

Cryptographic systems die by “almost the same bytes.”

v2.1 mandates canonical JSON bytes for hashing/signing:

- Primary: `canonicaljson.encode_canonical_json(obj)`
- Fallback: stable JSON serialization (`sort_keys`, fixed separators, UTF-8)

If two environments hash different bytes for “the same object,” you have no protocol.
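A sketch of the primary/fallback rule (the fallback's exact separators are an assumption consistent with the description above):

```python
import json
from typing import Any

def canonical_bytes(obj: Any) -> bytes:
    try:
        from canonicaljson import encode_canonical_json
        return encode_canonical_json(obj)  # primary
    except ImportError:
        # Fallback: stable serialization (sorted keys, fixed separators, UTF-8).
        return json.dumps(
            obj, sort_keys=True, separators=(",", ":"), ensure_ascii=False
        ).encode("utf-8")
```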
---

## 7) Key Management (POC-Grade Today, Audit-Grade Tomorrow)

### Current state

- App Ed25519 signing key:
  - Private: `/opt/if-emotion/data/trace_ed25519.key` (0600)
  - Public: `/opt/if-emotion/data/trace_ed25519.pub` (shipped in bundles)
- Key ID: `ed25519-app-v1`

The key is generated by libsodium-backed primitives and stored on disk with file permissions. This is acceptable for a POC, not for external certification.

### Rotation and compromise (required discipline)

- If the key is compromised: rotate immediately, bump `key_id`, and mark all traces from that time window as `trust_tier=degraded` unless independently anchored.
- Old signatures remain verifiable with the historic public keys; key history must be preserved.

### Certification path

- Move keys to HSM/TPM or threshold signing.
- Bind deploy attestations (image digest + config hash) to IF.TTT.
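A POC-grade provisioning sketch (paths and `key_id` are the current-state values above; the generation and encoding steps are illustrative):

```python
import os
from nacl.signing import SigningKey  # libsodium-backed Ed25519

sk = SigningKey.generate()
with open("/opt/if-emotion/data/trace_ed25519.key", "w") as f:
    f.write(sk.encode().hex())
os.chmod("/opt/if-emotion/data/trace_ed25519.key", 0o600)  # private: service-only
with open("/opt/if-emotion/data/trace_ed25519.pub", "w") as f:
    f.write(sk.verify_key.encode().hex())  # public: shipped in bundles
key_id = "ed25519-app-v1"  # bump on rotation; preserve old public keys
```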
---

## 8) Post-Quantum: What Is PQ Today (And What Isn’t)

### What is PQ-anchored today

Evidence bundles are PQ-hybrid signed when registered into IF.TTT. The IF.TTT registry record includes:

- `pq_status: hybrid-fips204`
- `pq_algo: ML-DSA-87`

This is the PQ anchoring layer.

### What is not PQ today

Hot-path app signatures (Ed25519) are not post-quantum. That is a conscious trade:

- Ed25519 provides immediate integrity at low latency.
- PQ signing occurs at registration time in IF.TTT.

The correct claim is “PQ-anchored at registry time,” not “PQ everywhere.”

---

## 9) IF.story: Readability Without Evidence Drift

IF.story is a deterministic narrative projection of `trace_events.jsonl`.

It is not evidence. It is an index.

Each IF.story line includes the `event_hash` anchor, and auditors should verify those anchors against the raw JSONL.
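An auditor's spot check is a few lines against the bundled files (`payload/if_story.md`, `payload/trace_events.jsonl`):

```python
import json
from pathlib import Path

events = Path("payload/trace_events.jsonl").read_text(encoding="utf-8").splitlines()
known = {(json.loads(l).get("event") or {}).get("event_hash") for l in events if l.strip()}
for line in Path("payload/if_story.md").read_text(encoding="utf-8").splitlines():
    if "event_hash=" in line:
        anchor = line.split("event_hash=", 1)[1].split()[0]
        assert anchor in known, f"IF.story drift: unknown anchor {anchor}"
```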
---

## 10) Verifier Tooling (Independent Checks, Not Operator Vibes)

The bundle is designed to be verified with a single command and then deep-audited selectively.

Verifier:

- `iftrace.py verify <tar.gz> --expected-sha256 <sha>`

Merkle inclusion proof (REQ_SEEN):

- `iftrace.py prove-inclusion --ledger <req_seen_hour.jsonl> --head <req_seen_head.json> --trace-id <uuid>`
- `iftrace.py verify-inclusion <proof.json>`

Checksum rules (important):

- `sha256s.txt` intentionally excludes itself and `manifest.json` to avoid self-referential checksum traps.

---
## 11) Threat Model and Limitations (Explicit)

### A) Truncation and external anchoring

Hash chains detect edits. They do not prevent truncation unless head hashes are anchored externally or independently cached.

Current mitigation:

- IF.TTT registration anchors the tarball hash into a separate chain.

Remaining requirement for certification:

- Scheduled external anchoring of IF.TTT head hashes to a public append-only log.

### B) Clock integrity

Timestamps are derived from system clocks and are not trusted for cryptographic time.

Ordering is guaranteed by hash chain indices and hash pointers, not by wall-clock truth.

Certification path:

- Introduce time witnesses or external timestamping for head hashes.

### C) Code integrity

Hash chains detect post-hoc tampering. They do not prevent a modified binary from choosing not to record.

Certification path:

- Signed deploy attestations (image digest + config hash) bound into IF.TTT.
- Optional remote attestation.

---
## 12) Reference Proof Run (v2.1)

Trace ID:

- `016cca78-6f9d-4ffe-aec0-99792d383ca1`

Hosted tarball URL:

- `https://git.infrafabric.io/danny/hosted/raw/branch/main/emo_trace_payload_016cca78-6f9d-4ffe-aec0-99792d383ca1.tar.gz`

Tarball SHA256:

- `7101ff9c38fc759a66157f6a6ab9c0936af547d0ec77a51b5d05db07069966c8`

IF.TTT citation handle for the tarball (PQ hybrid signed):

- `if://citation/c24fe95e-226c-4efc-ba22-5ddcc37ff7d2/v1`
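Replaying the end-to-end check with this run's values (same command shape as Section 10):

- `python iftrace.py verify emo_trace_payload_016cca78-6f9d-4ffe-aec0-99792d383ca1.tar.gz --expected-sha256 7101ff9c38fc759a66157f6a6ab9c0936af547d0ec77a51b5d05db07069966c8`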
1 IF_EMOTION_DEBUGGING_TRACE_WHITEPAPER_v2.1_STYLED.md.sha256 (new file)
@@ -0,0 +1 @@
e6a2c04eb550f980f1e03dc220961c55fc9b8da356d115cf6085cc7b85f7815b /root/tmp/hosted_repo_update/IF_EMOTION_DEBUGGING_TRACE_WHITEPAPER_v2.1_STYLED.md
17 README.md
@@ -21,3 +21,20 @@ Static hosted artifacts used in InfraFabric reviews.

- File: `IF_EMOTION_DEBUGGING_TRACE_WHITEPAPER_v2.0_STYLED.md`
- Notes: methodology v2.0, references trace tarball `emo_trace_payload_09aad3e1-f420-451e-a189-e86f68073dc0.tar.gz`.

## emo-social trace payload (v2.1, inclusion proof + pubkey)

- File: `emo_trace_payload_016cca78-6f9d-4ffe-aec0-99792d383ca1.tar.gz`
- SHA256: `7101ff9c38fc759a66157f6a6ab9c0936af547d0ec77a51b5d05db07069966c8`
- IF.TTT citation (PQ hybrid signed): `if://citation/c24fe95e-226c-4efc-ba22-5ddcc37ff7d2/v1`
- Notes: includes `payload/trace_ed25519.pub` + `payload/req_seen_inclusion_proof.json` + nested priors (`payload/ttt_children*.json`).

## IF.emotion trace whitepaper (styled v2.1)

- File: `IF_EMOTION_DEBUGGING_TRACE_WHITEPAPER_v2.1_STYLED.md`

## Verifier tool

- File: `iftrace.py` (run with a Python venv that has `canonicaljson` + `pynacl`)
- Verify tarball: `python iftrace.py verify emo_trace_payload_<trace>.tar.gz --expected-sha256 <sha>`
- Prove inclusion: `python iftrace.py prove-inclusion --ledger req_seen_<hour>.jsonl --head req_seen_head_<hour>.json --trace-id <uuid>`
BIN emo_trace_payload_016cca78-6f9d-4ffe-aec0-99792d383ca1.tar.gz (new file)
Binary file not shown.
1 emo_trace_payload_016cca78-6f9d-4ffe-aec0-99792d383ca1.tar.gz.sha256 (new file)
@@ -0,0 +1 @@
7101ff9c38fc759a66157f6a6ab9c0936af547d0ec77a51b5d05db07069966c8 /root/tmp/hosted_repo_update/emo_trace_payload_016cca78-6f9d-4ffe-aec0-99792d383ca1.tar.gz
362 iftrace.py (new file)
@@ -0,0 +1,362 @@
#!/usr/bin/env python3
"""
IF.emotion trace bundle verifier + Merkle inclusion proof tool.

Run with the venv:
    /root/tmp/iftrace_venv/bin/python /root/tmp/iftrace.py <command> ...
"""

from __future__ import annotations

import argparse
import hashlib
import io
import json
import os
import tarfile
import tempfile
from dataclasses import dataclass
from pathlib import Path
from typing import Any, Iterable

from canonicaljson import encode_canonical_json
from nacl.signing import VerifyKey
from nacl.encoding import HexEncoder


def sha256_bytes(data: bytes) -> str:
    return hashlib.sha256(data or b"").hexdigest()


def sha256_file(path: Path) -> str:
    h = hashlib.sha256()
    with path.open("rb") as f:
        for chunk in iter(lambda: f.read(1024 * 1024), b""):
            h.update(chunk)
    return h.hexdigest()


def canonical_json_bytes(obj: Any) -> bytes:
    return encode_canonical_json(obj)


def merkle_root_hex(leaves_hex: list[str]) -> str:
    if not leaves_hex:
        return sha256_bytes(b"")
    level: list[bytes] = [bytes.fromhex(h) for h in leaves_hex if isinstance(h, str) and len(h) == 64]
    if not level:
        return sha256_bytes(b"")
    while len(level) > 1:
        if len(level) % 2 == 1:
            # Odd-width level: duplicate the last node before pairing.
            level.append(level[-1])
        nxt: list[bytes] = []
        for i in range(0, len(level), 2):
            nxt.append(hashlib.sha256(level[i] + level[i + 1]).digest())
        level = nxt
    return level[0].hex()


def merkle_inclusion_proof(leaves_hex: list[str], index: int) -> dict:
    if index < 0 or index >= len(leaves_hex):
        raise ValueError("index out of range")
    level: list[bytes] = [bytes.fromhex(h) for h in leaves_hex]
    proof: list[dict] = []
    idx = index
    while len(level) > 1:
        if len(level) % 2 == 1:
            level.append(level[-1])
        sibling_idx = idx ^ 1
        sibling = level[sibling_idx]
        side = "left" if sibling_idx < idx else "right"
        proof.append({"sibling": sibling.hex(), "side": side})
        nxt: list[bytes] = []
        for i in range(0, len(level), 2):
            nxt.append(hashlib.sha256(level[i] + level[i + 1]).digest())
        level = nxt
        idx //= 2
    root = level[0].hex()
    return {"index": index, "root": root, "path": proof}


def merkle_verify_proof(leaf_hex: str, proof: dict) -> bool:
    try:
        cur = bytes.fromhex(leaf_hex)
        for step in proof.get("path", []):
            sib = bytes.fromhex(step["sibling"])
            if step["side"] == "left":
                cur = hashlib.sha256(sib + cur).digest()
            else:
                cur = hashlib.sha256(cur + sib).digest()
        return cur.hex() == proof.get("root")
    except Exception:
        return False


def read_json(path: Path) -> Any:
    return json.loads(path.read_text(encoding="utf-8", errors="strict"))


def verify_ed25519_hex(*, pub_hex: str, msg: bytes, sig_hex: str) -> None:
    vk = VerifyKey(pub_hex, encoder=HexEncoder)
    vk.verify(msg, bytes.fromhex(sig_hex))


@dataclass(frozen=True)
class VerifyResult:
    ok: bool
    notes: list[str]


def verify_trace_events(events_path: Path) -> VerifyResult:
    notes: list[str] = []
    prev_hash = "0" * 64  # genesis pointer for the first event
    expected_idx = 0
    lines = events_path.read_text(encoding="utf-8", errors="ignore").splitlines()
    for line in lines:
        if not line.strip():
            continue
        obj = json.loads(line)
        ev = obj.get("event") or {}
        idx = int(ev.get("idx", -1))
        if idx != expected_idx:
            return VerifyResult(False, notes + [f"trace_events: idx mismatch (got {idx}, expected {expected_idx})"])
        if str(ev.get("prev_hash") or "") != prev_hash:
            return VerifyResult(False, notes + ["trace_events: prev_hash mismatch"])
        stored_hash = str(ev.get("event_hash") or "")
        payload = dict(ev)
        payload.pop("event_hash", None)
        recomputed = sha256_bytes(prev_hash.encode("utf-8") + canonical_json_bytes(payload))
        if recomputed != stored_hash:
            return VerifyResult(False, notes + ["trace_events: event_hash mismatch (recomputed != stored)"])
        prev_hash = stored_hash
        expected_idx += 1
    notes.append(f"trace_events: ok (events={expected_idx}, head_hash={prev_hash[:16]}…)")
    return VerifyResult(True, notes)


def verify_req_seen(ledger_path: Path, head_path: Path) -> VerifyResult:
    notes: list[str] = []
    head = read_json(head_path)
    pub_hex = str(head.get("signer_ed25519") or "").strip()
    sig_hex = str(head.get("sig_ed25519") or "").strip()
    if not pub_hex or not sig_hex:
        return VerifyResult(False, ["req_seen: missing signer_ed25519 or sig_ed25519 in head"])
    # Recreate the message that was signed (the head core before adding sig/key_id/signer).
    head_core = {
        "schema": head.get("schema"),
        "hour_utc": head.get("hour_utc"),
        "updated_utc": head.get("updated_utc"),
        "count": head.get("count"),
        "merkle_root": head.get("merkle_root"),
        "last_leaf_hash": head.get("last_leaf_hash"),
    }
    verify_ed25519_hex(pub_hex=pub_hex, msg=canonical_json_bytes(head_core), sig_hex=sig_hex)
    notes.append("req_seen_head: Ed25519 signature OK")

    leaves: list[str] = []
    lines = ledger_path.read_text(encoding="utf-8", errors="ignore").splitlines()
    for line in lines:
        if not line.strip():
            continue
        entry = json.loads(line)
        leaf = str(entry.get("leaf_hash") or "").strip()
        entry_core = dict(entry)
        entry_core.pop("leaf_hash", None)
        recomputed_leaf = sha256_bytes(canonical_json_bytes(entry_core))
        if recomputed_leaf != leaf:
            return VerifyResult(False, notes + ["req_seen: leaf_hash mismatch"])
        leaves.append(leaf)

    root = merkle_root_hex(leaves)
    if root != str(head.get("merkle_root") or ""):
        return VerifyResult(False, notes + ["req_seen: merkle_root mismatch"])
    if int(head.get("count") or 0) != len(leaves):
        return VerifyResult(False, notes + ["req_seen: count mismatch"])
    notes.append(f"req_seen: ok (count={len(leaves)}, merkle_root={root[:16]}…)")
    return VerifyResult(True, notes)


def verify_story(story_path: Path, events_path: Path) -> VerifyResult:
    notes: list[str] = []
    # Collect all event hashes from ground truth.
    hashes: set[str] = set()
    for line in events_path.read_text(encoding="utf-8", errors="ignore").splitlines():
        if not line.strip():
            continue
        ev = (json.loads(line).get("event") or {})
        h = str(ev.get("event_hash") or "").strip()
        if len(h) == 64:
            hashes.add(h)
    # Ensure every story line that mentions event_hash=... points to a real event.
    for line in story_path.read_text(encoding="utf-8", errors="ignore").splitlines():
        if "event_hash=" not in line:
            continue
        h = line.split("event_hash=", 1)[1].strip().split()[0]
        if h and h not in hashes:
            return VerifyResult(False, [f"if_story: unknown event_hash referenced: {h}"])
    notes.append("if_story: ok (all referenced event_hash values exist)")
    return VerifyResult(True, notes)


def verify_manifest(payload_dir: Path) -> VerifyResult:
    notes: list[str] = []
    manifest_path = payload_dir / "manifest.json"
    sha_list_path = payload_dir / "sha256s.txt"
    if not manifest_path.exists() or not sha_list_path.exists():
        return VerifyResult(False, ["manifest: missing manifest.json or sha256s.txt"])

    manifest = read_json(manifest_path)
    files = manifest.get("files") or []
    manifest_map = {f["path"]: f["sha256"] for f in files if isinstance(f, dict) and "path" in f and "sha256" in f}

    sha_map: dict[str, str] = {}
    for line in sha_list_path.read_text(encoding="utf-8", errors="ignore").splitlines():
        parts = line.strip().split()
        if len(parts) >= 2:
            sha_map[parts[1]] = parts[0]

    # sha256s.txt is a checksum file; it must not be self-referential.
    sha_map.pop("sha256s.txt", None)
    # manifest.json is the root index; do not make it self-referential in sha256s.
    sha_map.pop("manifest.json", None)

    for name, sha in sha_map.items():
        p = payload_dir / name
        if not p.exists():
            return VerifyResult(False, [f"manifest: sha256s references missing file: {name}"])
        got = sha256_file(p)
        if got != sha:
            return VerifyResult(False, [f"manifest: sha256 mismatch for {name}"])
        if name != "manifest.json":
            if manifest_map.get(name) != sha:
                return VerifyResult(False, [f"manifest: manifest.json mismatch for {name}"])

    notes.append(f"manifest: ok (files={len(sha_map)})")
    return VerifyResult(True, notes)


def extract_tarball(tar_path: Path) -> Path:
    tmp = Path(tempfile.mkdtemp(prefix="iftrace_"))
    with tarfile.open(tar_path, "r:gz") as tf:
        tf.extractall(tmp)  # trusted local artifact
    return tmp


def cmd_verify(args: argparse.Namespace) -> int:
    tar_path = Path(args.tar).resolve()
    expected_sha = (args.expected_sha256 or "").strip().lower()
    got_sha = sha256_file(tar_path)
    if expected_sha and got_sha != expected_sha:
        print(f"FAIL tar_sha256 expected={expected_sha} got={got_sha}")
        return 2
    print(f"OK tar_sha256 {got_sha}")

    root = extract_tarball(tar_path)
    payload_dir = root / "payload"
    if not payload_dir.exists():
        print("FAIL: tarball missing payload/ directory")
        return 2

    checks: list[VerifyResult] = []
    checks.append(verify_manifest(payload_dir))

    events_path = payload_dir / "trace_events.jsonl"
    if events_path.exists():
        checks.append(verify_trace_events(events_path))

    story_path = payload_dir / "if_story.md"
    if story_path.exists() and events_path.exists():
        checks.append(verify_story(story_path, events_path))

    # REQ_SEEN verification if present
    head_files = sorted(payload_dir.glob("req_seen_head_*.json"))
    ledger_files = sorted(payload_dir.glob("req_seen_*.jsonl"))
    if head_files and ledger_files:
        checks.append(verify_req_seen(ledger_files[0], head_files[0]))

    ok = True
    for res in checks:
        for n in res.notes:
            print(n)
        ok = ok and res.ok

    if not ok:
        print("FAIL verify")
        return 2
    print("OK verify")
    return 0


def cmd_prove_inclusion(args: argparse.Namespace) -> int:
    ledger = Path(args.ledger).resolve()
    head = Path(args.head).resolve()
    trace_id = (args.trace_id or "").strip()
    leaf_hash = (args.leaf_hash or "").strip().lower()

    leaves: list[str] = []
    idx_by_trace: dict[str, int] = {}
    lines = ledger.read_text(encoding="utf-8", errors="ignore").splitlines()
    for i, line in enumerate(lines):
        if not line.strip():
            continue
        entry = json.loads(line)
        lh = str(entry.get("leaf_hash") or "").strip()
        leaves.append(lh)
        tid = str(entry.get("trace_id") or "").strip()
        if tid and tid not in idx_by_trace:
            idx_by_trace[tid] = len(leaves) - 1

    if trace_id:
        if trace_id not in idx_by_trace:
            raise SystemExit("trace_id not found in ledger")
        index = idx_by_trace[trace_id]
        leaf_hash = leaves[index]
    else:
        if not leaf_hash:
            raise SystemExit("provide --trace-id or --leaf-hash")
        if leaf_hash not in leaves:
            raise SystemExit("leaf_hash not found in ledger")
        index = leaves.index(leaf_hash)

    proof = merkle_inclusion_proof(leaves, index)
    proof["leaf_hash"] = leaf_hash
    proof["hour_utc"] = read_json(head).get("hour_utc")
    print(json.dumps(proof, indent=2, sort_keys=True))
    return 0


def cmd_verify_inclusion(args: argparse.Namespace) -> int:
    proof = read_json(Path(args.proof).resolve())
    leaf = str(proof.get("leaf_hash") or "").strip()
    ok = merkle_verify_proof(leaf, proof)
    print("OK" if ok else "FAIL")
    return 0 if ok else 2


def main() -> int:
    ap = argparse.ArgumentParser(prog="iftrace")
    sub = ap.add_subparsers(dest="cmd", required=True)

    v = sub.add_parser("verify", help="Verify a trace payload tarball (manifest, hashes, chains, signatures)")
    v.add_argument("tar", help="Path to emo_trace_payload_<trace_id>.tar.gz")
    v.add_argument("--expected-sha256", default="", help="Expected tarball SHA256 (optional)")
    v.set_defaults(func=cmd_verify)

    p = sub.add_parser("prove-inclusion", help="Generate a Merkle inclusion proof for a REQ_SEEN ledger leaf")
    p.add_argument("--ledger", required=True, help="Path to req_seen_<hour>.jsonl")
    p.add_argument("--head", required=True, help="Path to req_seen_head_<hour>.json")
    g = p.add_mutually_exclusive_group(required=True)
    g.add_argument("--trace-id", default="", help="Trace ID to prove inclusion for")
    g.add_argument("--leaf-hash", default="", help="Leaf hash to prove inclusion for")
    p.set_defaults(func=cmd_prove_inclusion)

    pv = sub.add_parser("verify-inclusion", help="Verify a Merkle inclusion proof JSON")
    pv.add_argument("proof", help="Path to proof JSON")
    pv.set_defaults(func=cmd_verify_inclusion)

    args = ap.parse_args()
    return int(args.func(args))


if __name__ == "__main__":
    raise SystemExit(main())
1 iftrace.py.sha256 (new file)
@@ -0,0 +1 @@
7b4587a12218b37abaafe56adb3a7071e84bbc6bfe9ee9442236e476c6533f9c /root/tmp/hosted_repo_update/iftrace.py