commit 9b53aaff9eee3ff7b0bbf1b62b2197751a8ea7ac Author: danny Date: Wed Dec 24 07:57:00 2025 +0000 Initial import: Emo-Social Insta DM agent diff --git a/.dockerignore b/.dockerignore new file mode 100644 index 0000000..a20ea03 --- /dev/null +++ b/.dockerignore @@ -0,0 +1,11 @@ +__pycache__/ +*.pyc +*.pyo +*.pyd +.pytest_cache/ +.mypy_cache/ +.ruff_cache/ +.venv/ +venv/ +.git/ + diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..2e7a1fa --- /dev/null +++ b/.gitignore @@ -0,0 +1,10 @@ +__pycache__/ +*.pyc +.venv/ +venv/ +.DS_Store +.idea/ +.vscode/ +/dist/ +/build/ + diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 0000000..8512b52 --- /dev/null +++ b/Dockerfile @@ -0,0 +1,11 @@ +FROM python:3.13-slim + +WORKDIR /app + +COPY sergio_instagram_messaging/ /app/sergio_instagram_messaging/ +COPY README.md /app/README.md + +ENV PYTHONUNBUFFERED=1 + +CMD ["python", "-m", "sergio_instagram_messaging.export_meta_ig_history", "--help"] + diff --git a/README.md b/README.md new file mode 100644 index 0000000..c6af9d5 --- /dev/null +++ b/README.md @@ -0,0 +1,161 @@ +# Emo-Social Insta DM Agent + +Project for the **Emo-Social Instagram DM agent** for `@socialmediatorr` (distinct from the existing Sergio RAG DB agent). + +Includes: +- Meta Graph API exporter (full DM history) +- Instagram “Download your information” importer +- DM analysis pipeline (bot-vs-human, conversions, objections, rescue logic, product eras) +- Helpers for device login + page subscription + sending replies + +## Where this runs + +Build/run the Docker image in `pct 250` (ai-dev) and keep production (`pct 220`) clean. + +## Credentials + +Source of truth creds file (host): + +- `/root/tmp/emo-social-meta-app-creds.txt` + +Inside `pct 250`, place the same file at the same path: + +- `/root/tmp/emo-social-meta-app-creds.txt` + +Required for export: + +- `META_PAGE_ID` +- `META_PAGE_ACCESS_TOKEN` + +## Build (pct 250) + +- `docker build -t emo-social-insta-dm-agent:dev /root/ai-workspace/emo-social-insta-dm-agent` + +Note: in this LXC, `docker run` / `podman run` can fail due to AppArmor confinement. Use the **direct Python** commands below unless you change the LXC config to be AppArmor-unconfined. + +## Obtain tokens (pct 250) + +`meta_device_login` requires `META_CLIENT_TOKEN` (from Meta app dashboard → Settings → Advanced → Client token). +Alternatively, set `META_CLIENT_ACCESS_TOKEN` as `APP_ID|CLIENT_TOKEN` to override `META_APP_ID` for device-login only. 
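+
+For reference, the creds file is plain `KEY=VALUE` text; the exporter's loader skips blank lines and `#` comments. A minimal sketch of `/root/tmp/emo-social-meta-app-creds.txt` with placeholder values (only `META_PAGE_ID` + `META_PAGE_ACCESS_TOKEN` are required for the export itself; keep the real file at mode `600`):
+
+```
+# Emo-Social Meta app creds — placeholder values only
+META_PAGE_ID=000000000000000
+META_PAGE_ACCESS_TOKEN=EAAG.placeholder.page.token
+# Optional: pin the Graph API version (exporter default: v20.0)
+META_GRAPH_API_VERSION=v20.0
+# Device-login inputs, if you keep them in the same file:
+META_APP_ID=000000000000000
+META_CLIENT_TOKEN=placeholder-client-token
+```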
+ +If `meta_device_login start` returns OAuth error `(#3) enable "Login from Devices"`, enable it in the *same app id* the script prints: + +- `https://developers.facebook.com/apps//fb-login/settings/` → set **Login from Devices** = Yes + +Start device login (prints a URL + code to enter; no tokens are printed): + +- `cd /root/ai-workspace/emo-social-insta-dm-agent && python3 -m sergio_instagram_messaging.meta_device_login start` + +After authorizing in the browser, poll and write `META_PAGE_ID` + `META_PAGE_ACCESS_TOKEN` into `/root/tmp/emo-social-meta-app-creds.txt`: + +- `cd /root/ai-workspace/emo-social-insta-dm-agent && python3 -m sergio_instagram_messaging.meta_device_login poll --write-page-token --target-ig-user-id 17841466913731557` + +### Fallback: Graph API Explorer → user token → page token + +If device-login is blocked, get a **Facebook User access token** via Graph API Explorer and derive the Page token: + +- Open: `https://developers.facebook.com/tools/explorer/` +- Select the correct app, then generate a user token with: + - `pages_show_list,pages_read_engagement,instagram_manage_messages` +- Save the user token to (keep mode `600`): + - `/root/tmp/meta-user-access-token.txt` +- Then write `META_PAGE_ID` + `META_PAGE_ACCESS_TOKEN` into `/root/tmp/emo-social-meta-app-creds.txt` (no token values printed): +- `cd /root/ai-workspace/emo-social-insta-dm-agent && python3 -m sergio_instagram_messaging.meta_page_token_from_user_token --target-ig-user-id 17841466913731557` + +## Export history (pct 250) + +Quick token sanity check (does not print token values): + +- `cd /root/ai-workspace/emo-social-insta-dm-agent && python3 -m sergio_instagram_messaging.meta_token_doctor` + +Export to a mounted directory so results persist on disk: + +- `docker run --rm -v /root/tmp:/root/tmp emo-social-insta-dm-agent:dev python -m sergio_instagram_messaging.export_meta_ig_history --out /root/tmp/emo-social-insta-dm-history` + - (Direct) `cd /root/ai-workspace/emo-social-insta-dm-agent && python3 -m sergio_instagram_messaging.export_meta_ig_history --out /root/tmp/emo-social-insta-dm-history` + +### Workaround: fetch a single thread by `user_id` + +If `/conversations` listing times out, you can still fetch a single thread if you know the sender’s `user_id` (from webhook `sender.id`): + +- `cd /root/ai-workspace/emo-social-insta-dm-agent && python3 -m sergio_instagram_messaging.fetch_thread_messages --user-id --max-pages 2 --out /root/tmp/thread.jsonl` + +Small test run: + +- `docker run --rm -v /root/tmp:/root/tmp emo-social-insta-dm-agent:dev python -m sergio_instagram_messaging.export_meta_ig_history --out /root/tmp/emo-social-insta-dm-history --max-conversations 3 --max-pages 2` + - (Direct) `cd /root/ai-workspace/emo-social-insta-dm-agent && python3 -m sergio_instagram_messaging.export_meta_ig_history --out /root/tmp/emo-social-insta-dm-history --max-conversations 3 --max-pages 2` + +## Full history fallback (Instagram export) + +If Meta app review blocks Graph API DM access, export Sergio’s IG data via Instagram “Download your information” and import it: + +- `cd /root/ai-workspace/emo-social-insta-dm-agent && python3 -m sergio_instagram_messaging.import_instagram_export --input /path/to/instagram-export.zip --out /root/tmp/emo-social-insta-dm-export-history` + +## Analyze DM history (Behavioral Cloning / “Biographical Sales”) + +This produces the “Sergio persona” artifacts needed for the DM agent: + +- Separates frequent `[BOT]` templates vs rare `[MANUAL]` replies (plus `[HYBRID]`). 
+- Builds **bot fatigue** + **script editorial timeline** charts. +- Extracts **training pairs** (user → Sergio manual reply) from converted threads. +- Generates **rescue playbook** (human saves after silence/negative sentiment). +- Generates **objection handlers** (price/time/trust/stop → best replies). +- Builds a quarterly **eras** CSV (offers/pricing + vocabulary drift). + +Outputs are written with mode `600` and may contain sensitive DM content. Keep them out of git. + +### Analyze a raw Instagram export folder (recommended) + +Optional: index first (lets you filter recency without scanning every thread): + +- `python3 -m sergio_instagram_messaging.index_instagram_export --input /path/to/your_instagram_activity --out /root/tmp/socialmediatorr-ig-index.jsonl` + +Then analyze: + +- `python3 -m sergio_instagram_messaging.analyze_instagram_export --input /path/to/your_instagram_activity --out /root/tmp/socialmediatorr-agent-analysis --owner-name "Sergio de Vocht" --index /root/tmp/socialmediatorr-ig-index.jsonl --since-days 180` + +### Analyze an imported history dir (messages/*.jsonl) + +If you already ran `import_instagram_export` (or have a partial import output), point the analyzer at that directory: + +- `python3 -m sergio_instagram_messaging.analyze_instagram_export --input /root/tmp/socialmediatorr-ig-export-history --out /root/tmp/socialmediatorr-agent-analysis --owner-name "Sergio de Vocht"` + +### Two-stage workflow (verify templates before full run) + +Pass 1 generates `top_outgoing_templates.json` + `template_counts.jsonl`: + +- `python3 -m sergio_instagram_messaging.analyze_instagram_export --input /path/to/your_instagram_activity --out /root/tmp/socialmediatorr-agent-analysis --stage pass1` + +Pass 2 reuses the cache and writes the full deliverables: + +- `python3 -m sergio_instagram_messaging.analyze_instagram_export --input /path/to/your_instagram_activity --out /root/tmp/socialmediatorr-agent-analysis --stage pass2 --templates-cache /root/tmp/socialmediatorr-agent-analysis/top_outgoing_templates.json` + +### Human-readable report (English) + +After analysis, generate a single Markdown report: + +- `python3 -m sergio_instagram_messaging.generate_dm_report --analysis-dir /root/tmp/socialmediatorr-agent-analysis` + +## Webhooks (new messages → auto-reply) + +Meta webhooks are two steps: + +1) App subscription (`/{app_id}/subscriptions`) — sets callback URL + verify token + fields. +2) Page subscription (`/{page_id}/subscribed_apps`) — attaches the Page/IG inbox to the app. + +The production webhook endpoint exists at: + +- `https://emo-social.infrafabric.io/meta/webhook` + +### Subscribe the Page to the app (required for message delivery) + +This requires a Page access token that includes `pages_manage_metadata`. 
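+
+Under the hood, the `meta_subscribe_page` helper shown below should boil down to the standard Graph API page-subscription call — roughly this, with placeholder `PAGE_ID`/`PAGE_ACCESS_TOKEN` values:
+
+```
+curl -X POST "https://graph.facebook.com/v20.0/PAGE_ID/subscribed_apps" \
+  -d "subscribed_fields=messages" \
+  -d "access_token=PAGE_ACCESS_TOKEN"
+```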
+ +Device login with that scope: + +- `cd /root/ai-workspace/emo-social-insta-dm-agent && python3 -m sergio_instagram_messaging.meta_device_login start --scope pages_show_list,pages_read_engagement,instagram_manage_messages,pages_manage_metadata` +- After authorizing in the browser, poll and write tokens: + - `cd /root/ai-workspace/emo-social-insta-dm-agent && python3 -m sergio_instagram_messaging.meta_device_login poll --write-page-token --target-ig-user-id 17841466913731557` + +Then subscribe the Page: + +- `cd /root/ai-workspace/emo-social-insta-dm-agent && python3 -m sergio_instagram_messaging.meta_subscribe_page subscribe --fields messages` diff --git a/compose.yaml b/compose.yaml new file mode 100644 index 0000000..4d292d0 --- /dev/null +++ b/compose.yaml @@ -0,0 +1,14 @@ +services: + exporter: + build: . + image: emo-social-insta-dm-agent:dev + volumes: + - /root/tmp:/root/tmp + command: + [ + "python", + "-m", + "sergio_instagram_messaging.export_meta_ig_history", + "--out", + "/root/tmp/emo-social-insta-dm-history", + ] diff --git a/sergio_instagram_messaging/__init__.py b/sergio_instagram_messaging/__init__.py new file mode 100644 index 0000000..22cc122 --- /dev/null +++ b/sergio_instagram_messaging/__init__.py @@ -0,0 +1 @@ +"""Sergio Instagram messaging utilities.""" diff --git a/sergio_instagram_messaging/analyze_instagram_export.py b/sergio_instagram_messaging/analyze_instagram_export.py new file mode 100644 index 0000000..203c5d4 --- /dev/null +++ b/sergio_instagram_messaging/analyze_instagram_export.py @@ -0,0 +1,1595 @@ +from __future__ import annotations + +import argparse +import csv +import difflib +import json +import os +import re +import statistics +import sys +import time +from collections import Counter, defaultdict +from dataclasses import dataclass +from datetime import datetime, timedelta, timezone +from pathlib import Path +from typing import Any, Iterable, Literal + +from .simple_png import Canvas + + +def _now_utc_iso() -> str: + return datetime.now(timezone.utc).replace(microsecond=0).isoformat() + + +def _dt_from_ts_ms(ts_ms: int) -> datetime: + return datetime.fromtimestamp(ts_ms / 1000.0, tz=timezone.utc) + + +def _iso_from_ts_ms(ts_ms: int | None) -> str | None: + if not ts_ms: + return None + return _dt_from_ts_ms(ts_ms).replace(microsecond=0).isoformat() + + +def _week_key(ts_ms: int) -> str: + dt = _dt_from_ts_ms(ts_ms) + y, w, _ = dt.isocalendar() + return f"{y:04d}-W{w:02d}" + + +def _quarter_key(ts_ms: int) -> str: + dt = _dt_from_ts_ms(ts_ms) + q = (dt.month - 1) // 3 + 1 + return f"{dt.year:04d}-Q{q}" + + +_RE_URL = re.compile(r"(?i)\b(?:https?://|www\.)\S+") +_RE_EMAIL = re.compile(r"(?i)\b[\w.+-]+@[\w.-]+\.[a-z]{2,}\b") +_RE_HANDLE = re.compile(r"(? ]+", flags=re.UNICODE) +_RE_WS = re.compile(r"\s+") +_RE_EXPORT_TS_BYTES = re.compile(rb'"timestamp_ms"\s*:\s*(\d{10,})') + + +def canonicalize_text(text: str) -> str: + s = (text or "").strip().lower() + if not s: + return "" + s = _RE_EMAIL.sub("", s) + s = _RE_URL.sub("", s) + s = _RE_HANDLE.sub("", s) + s = _RE_MONEY.sub("", s) + s = _RE_NUM.sub("", s) + s = _RE_NONWORD.sub(" ", s) + s = _RE_WS.sub(" ", s).strip() + return s + + +_RE_PHONE_LOOSE = re.compile(r"(? 
str: + s = (text or "").strip() + if not s: + return "" + s = _RE_EMAIL.sub("", s) + s = _RE_URL.sub("", s) + s = _RE_HANDLE.sub("", s) + + def repl_phone(m: re.Match[str]) -> str: + raw = m.group(0) + digits = sum(1 for ch in raw if ch.isdigit()) + return "" if digits >= 8 else raw + + s = _RE_PHONE_LOOSE.sub(repl_phone, s) + return s + + +EN_STOPWORDS = { + "a", + "an", + "and", + "are", + "as", + "at", + "be", + "but", + "by", + "for", + "from", + "has", + "have", + "he", + "her", + "his", + "i", + "if", + "in", + "is", + "it", + "its", + "me", + "my", + "no", + "not", + "of", + "on", + "or", + "our", + "she", + "so", + "that", + "the", + "their", + "them", + "there", + "they", + "this", + "to", + "u", + "us", + "we", + "what", + "with", + "you", + "your", +} + +ES_STOPWORDS = { + "a", + "al", + "algo", + "como", + "con", + "de", + "del", + "el", + "ella", + "en", + "era", + "eres", + "es", + "esa", + "ese", + "esto", + "gracias", + "hola", + "la", + "las", + "lo", + "los", + "me", + "mi", + "mis", + "mucho", + "muy", + "no", + "para", + "por", + "que", + "se", + "si", + "sin", + "soy", + "su", + "sus", + "te", + "tu", + "tus", + "un", + "una", + "y", + "yo", +} + + +def tokenize(text: str) -> list[str]: + return re.findall(r"\b[\w']+\b", (text or "").lower(), flags=re.UNICODE) + + +def guess_lang(text: str) -> Literal["en", "es", "unknown"]: + s = (text or "").lower() + if not s: + return "unknown" + if any(ch in s for ch in "áéíóúñü¿¡"): + return "es" + tokens = set(tokenize(s)) + es_hits = len(tokens & {"que", "para", "pero", "porque", "gracias", "hola", "vale", "listo", "precio"}) + en_hits = len(tokens & {"the", "and", "but", "because", "thanks", "hello", "price", "ready"}) + if es_hits > en_hits: + return "es" + if en_hits > es_hits: + return "en" + return "unknown" + + +def message_has_emoji(text: str) -> bool: + # Basic heuristic: emojis are typically outside ASCII range. 
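+    # Anything above U+1FFF counts here, so CJK text or other non-Latin symbols
+    # also register as "emoji"; fine for coarse style rates, not exact counts.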
+ return any(ord(ch) > 0x1FFF for ch in (text or "")) + + +_POS_WORDS = {"great", "awesome", "perfect", "thanks", "thank", "amazing", "love", "genial", "perfecto", "gracias"} +_NEG_WORDS = { + "no", + "not", + "dont", + "don't", + "stop", + "leave", + "expensive", + "later", + "busy", + "scam", + "caro", + "dinero", + "luego", + "ocupado", + "estafa", +} + + +def sentiment_score(text: str) -> int: + toks = tokenize(text) + pos = sum(1 for t in toks if t in _POS_WORDS) + neg = sum(1 for t in toks if t in _NEG_WORDS) + return pos - neg + + +def extract_offer_signals(text: str) -> dict[str, Any] | None: + raw = (text or "").strip() + if not raw: + return None + + lower = raw.lower() + price = None + m = _RE_MONEY.search(raw) + if m: + price = m.group(0) + + offer_terms = [] + for term in ( + "book", + "ebook", + "pdf", + "call", + "strategy call", + "audit", + "mentorship", + "mentoring", + "coaching", + "consulting", + "service", + "done for you", + "course", + "workshop", + "calendly", + "stripe", + "paypal", + ): + if term in lower: + offer_terms.append(term) + + if not price and not offer_terms: + return None + + tokens = raw.split() + around = None + if price: + try: + idx = next(i for i, t in enumerate(tokens) if price in t) + except StopIteration: + idx = None + if idx is not None: + start = max(0, idx - 6) + end = min(len(tokens), idx + 7) + around = " ".join(tokens[start:end]) + + caps = re.findall(r"\b[A-Z][a-z]+(?:\s+[A-Z][a-z]+){0,4}\b", raw) + caps = [c.strip() for c in caps if c.strip()] + + return { + "price": price, + "offer_terms": sorted(set(offer_terms)), + "around_price": around, + "caps_phrases": caps[:5], + } + + +def _extract_text_from_export_msg(msg: dict[str, Any]) -> str | None: + for key in ("content", "text", "message"): + v = msg.get(key) + if isinstance(v, str) and v.strip(): + return v + + share = msg.get("share") + if isinstance(share, dict): + parts: list[str] = [] + for k in ("link", "share_text", "original_content_owner"): + v = share.get(k) + if isinstance(v, str) and v.strip(): + parts.append(v.strip()) + if parts: + return " | ".join(parts) + + return None + + +def _participants_list(obj: Any) -> list[str]: + out: list[str] = [] + if not isinstance(obj, list): + return out + for item in obj: + if isinstance(item, dict): + for k in ("name", "username"): + v = item.get(k) + if isinstance(v, str) and v.strip(): + out.append(v.strip()) + break + elif isinstance(item, str) and item.strip(): + out.append(item.strip()) + seen: set[str] = set() + unique: list[str] = [] + for name in out: + if name in seen: + continue + unique.append(name) + seen.add(name) + return unique + + +@dataclass(frozen=True) +class MessageEvent: + timestamp_ms: int + sender: str + text: str | None + is_owner: bool + + +@dataclass(frozen=True) +class Conversation: + conversation_id: str + title: str | None + participants: list[str] + messages: list[MessageEvent] + + +SourceKind = Literal["raw_export", "jsonl_history"] + + +@dataclass(frozen=True) +class ConversationSource: + kind: SourceKind + path: Path + message_files: tuple[Path, ...] 
= () + bucket: str = "inbox" + + +def _resolve_export_roots(input_path: Path) -> list[tuple[str, Path]]: + p = input_path + if p.is_dir() and p.name.lower() in {"inbox", "message_requests"}: + return [(p.name.lower(), p)] + + messages_dir = p / "messages" + if messages_dir.is_dir(): + inbox = messages_dir / "inbox" + req = messages_dir / "message_requests" + roots: list[tuple[str, Path]] = [] + if inbox.is_dir(): + roots.append(("inbox", inbox)) + if req.is_dir(): + roots.append(("message_requests", req)) + if roots: + return roots + + raise FileNotFoundError(f"Could not find messages/(inbox|message_requests) under: {p}") + + +def discover_sources( + *, + input_path: Path, + include_message_requests: bool, + index_path: Path | None, + min_ts_ms: int | None, + max_conversations: int | None, +) -> tuple[list[ConversationSource], dict[str, Any]]: + if (input_path / "messages").is_dir() and list((input_path / "messages").glob("*.jsonl")): + msg_dir = input_path / "messages" + sources = [ + ConversationSource(kind="jsonl_history", path=p, bucket="messages") + for p in sorted(msg_dir.glob("*.jsonl"), key=lambda x: x.name) + ] + if max_conversations is not None: + sources = sources[:max_conversations] + return sources, {"mode": "jsonl_history", "messages_dir": str(msg_dir), "sources": len(sources)} + + roots = _resolve_export_roots(input_path) + filtered_roots = [] + for name, root in roots: + if name == "message_requests" and not include_message_requests: + continue + filtered_roots.append((name, root)) + + index_latest: dict[str, int] = {} + if index_path: + try: + with index_path.open("r", encoding="utf-8") as f: + for line in f: + try: + row = json.loads(line) + except Exception: + continue + conv_dir = row.get("conv_dir") + ts = row.get("latest_timestamp_ms") + if isinstance(conv_dir, str) and isinstance(ts, int): + index_latest[conv_dir] = ts + except FileNotFoundError: + pass + + sources: list[ConversationSource] = [] + + def _fast_latest_ts_ms(conv_dir: Path) -> int | None: + msg1 = conv_dir / "message_1.json" + if not msg1.exists(): + candidates = sorted(conv_dir.glob("message*.json"), key=lambda p: p.name) + msg1 = candidates[0] if candidates else msg1 + if not msg1.exists(): + return None + try: + head = msg1.read_bytes()[: 64 * 1024] + except Exception: + return None + m = _RE_EXPORT_TS_BYTES.search(head) + if not m: + return None + try: + return int(m.group(1)) + except Exception: + return None + + for bucket, root in filtered_roots: + if index_latest and bucket == "inbox": + conv_names = sorted(index_latest.keys()) + if min_ts_ms is not None: + conv_names = [c for c in conv_names if index_latest.get(c, 0) >= min_ts_ms] + for name in conv_names: + conv_dir = root / name + if conv_dir.is_dir(): + parts = tuple(sorted(conv_dir.glob("message*.json"), key=lambda p: p.name)) + if parts: + sources.append(ConversationSource(kind="raw_export", path=conv_dir, message_files=parts, bucket=bucket)) + if max_conversations is not None and len(sources) >= max_conversations: + break + continue + + for conv_dir in sorted(root.iterdir(), key=lambda p: p.name): + if not conv_dir.is_dir(): + continue + if min_ts_ms is not None: + ts = index_latest.get(conv_dir.name) + if ts is None: + ts = _fast_latest_ts_ms(conv_dir) + if ts is not None and ts < min_ts_ms: + continue + parts = tuple(sorted(conv_dir.glob("message*.json"), key=lambda p: p.name)) + if not parts: + continue + sources.append(ConversationSource(kind="raw_export", path=conv_dir, message_files=parts, bucket=bucket)) + if max_conversations is 
not None and len(sources) >= max_conversations: + break + + meta = { + "mode": "raw_export", + "roots": [{"bucket": b, "path": str(p)} for b, p in filtered_roots], + "sources": len(sources), + "index_used": bool(index_latest), + "index_path": str(index_path) if index_path else None, + } + return sources, meta + + +def load_conversation(source: ConversationSource, *, owner_name: str | None) -> Conversation | None: + if source.kind == "jsonl_history": + msgs: list[MessageEvent] = [] + conv_id: str | None = None + title: str | None = None + participants: list[str] = [] + try: + with source.path.open("r", encoding="utf-8") as f: + for line in f: + try: + row = json.loads(line) + except Exception: + continue + if conv_id is None and isinstance(row.get("conversation_id"), str): + conv_id = row.get("conversation_id") + if title is None and isinstance(row.get("conversation_title"), str): + title = row.get("conversation_title") + if not participants and isinstance(row.get("participants"), list): + participants = [str(x) for x in row.get("participants") if isinstance(x, str)] + ts_ms = row.get("timestamp_ms") + sender = row.get("sender") + text = row.get("text") + if not isinstance(ts_ms, int) or not isinstance(sender, str): + continue + is_owner = bool(owner_name and sender == owner_name) + msgs.append( + MessageEvent( + timestamp_ms=ts_ms, + sender=sender, + text=(text if isinstance(text, str) else None), + is_owner=is_owner, + ) + ) + except FileNotFoundError: + return None + + if not msgs: + return None + msgs.sort(key=lambda m: m.timestamp_ms) + conv_id = conv_id or source.path.stem + return Conversation(conversation_id=conv_id, title=title, participants=participants, messages=msgs) + + parts: list[dict[str, Any]] = [] + for p in source.message_files: + try: + parts.append(json.loads(p.read_text(encoding="utf-8", errors="replace"))) + except Exception: + continue + if not parts: + return None + + title = parts[0].get("title") if isinstance(parts[0].get("title"), str) else None + conv_id = None + for key in ("thread_path", "threadPath", "thread_id", "threadId"): + v = parts[0].get(key) + if isinstance(v, str) and v.strip(): + conv_id = v.strip() + break + if not conv_id: + conv_id = f"{source.bucket}/{source.path.name}" + + participants = _participants_list(parts[0].get("participants")) + messages: list[dict[str, Any]] = [] + for part in parts: + msgs = part.get("messages") + if isinstance(msgs, list): + for m in msgs: + if isinstance(m, dict): + messages.append(m) + + def _key(m: dict[str, Any]) -> int: + ts = m.get("timestamp_ms") + return int(ts) if isinstance(ts, (int, float)) else 0 + + messages.sort(key=_key) + out_msgs: list[MessageEvent] = [] + for m in messages: + ts = m.get("timestamp_ms") + sender = m.get("sender_name") + if not isinstance(ts, int) or not isinstance(sender, str) or not sender.strip(): + continue + text = _extract_text_from_export_msg(m) + sender = sender.strip() + is_owner = bool(owner_name and sender == owner_name) + out_msgs.append(MessageEvent(timestamp_ms=ts, sender=sender, text=text, is_owner=is_owner)) + + if not out_msgs: + return None + + return Conversation(conversation_id=conv_id, title=title, participants=participants, messages=out_msgs) + + +@dataclass +class TemplateStats: + count: int = 0 + sample: str | None = None + first_ts_ms: int | None = None + last_ts_ms: int | None = None + + def add(self, ts_ms: int, raw: str) -> None: + self.count += 1 + if self.first_ts_ms is None or ts_ms < self.first_ts_ms: + self.first_ts_ms = ts_ms + if self.last_ts_ms is 
None or ts_ms > self.last_ts_ms: + self.last_ts_ms = ts_ms + if self.sample is None and raw.strip(): + self.sample = raw.strip()[:400] + + +@dataclass(frozen=True) +class AnalysisConfig: + owner_name: str | None + min_ts_ms: int | None + max_ts_ms: int | None + bot_min_count: int + manual_max_count: int + hybrid_similarity: float + ghost_hours: float + reply_window_hours: float + redact: bool + include_hybrid_in_pairs: bool + max_training_pairs: int + max_rescue_events: int + top_bot_templates: int + progress_every: int + + +def infer_owner_name(sources: list[ConversationSource], *, progress_every: int) -> str | None: + sender_msg_counts: Counter[str] = Counter() + sender_conv_counts: Counter[str] = Counter() + started = time.time() + for idx, src in enumerate(sources, 1): + conv = load_conversation(src, owner_name=None) + if not conv: + continue + seen = set() + for m in conv.messages: + sender_msg_counts[m.sender] += 1 + seen.add(m.sender) + for s in seen: + sender_conv_counts[s] += 1 + if progress_every and idx % progress_every == 0: + elapsed = int(time.time() - started) + print(f"infer_owner scanned={idx}/{len(sources)} elapsed_s={elapsed}", file=sys.stderr, flush=True) + if not sender_conv_counts: + return None + best = max(sender_conv_counts.items(), key=lambda kv: (kv[1], sender_msg_counts.get(kv[0], 0)))[0] + return best + + +def _label_outgoing( + canonical: str, + *, + bot_set: set[str], + template_counts: dict[str, int], + manual_max_count: int, + bot_prefixes: set[str], + top_bot_canons: list[str], + hybrid_similarity: float, +) -> Literal["bot", "manual", "hybrid", "other"]: + if canonical in bot_set: + return "bot" + count = template_counts.get(canonical) + if isinstance(count, int) and count <= manual_max_count: + return "manual" + toks = canonical.split() + prefix = " ".join(toks[:6]) if toks else canonical[:40] + if prefix in bot_prefixes: + return "hybrid" + for b in top_bot_canons: + if abs(len(b) - len(canonical)) > 120: + continue + r = difflib.SequenceMatcher(a=b, b=canonical).ratio() + if r >= hybrid_similarity: + return "hybrid" + return "other" + + +def _is_objection(text: str) -> str | None: + s = (text or "").lower() + if not s: + return None + if any(k in s for k in ("expensive", "too much", "price", "how much", "cost", "caro", "precio", "cuanto", "cuánto")): + return "price" + if any(k in s for k in ("later", "not now", "busy", "time", "tomorrow", "luego", "mañana", "ocupado", "ahora no")): + return "time" + if any(k in s for k in ("scam", "real?", "is this real", "legit", "trust", "estafa", "real", "verdad")): + return "trust" + if any(k in s for k in ("stop", "unsubscribe", "leave me", "no thanks", "no gracias")): + return "stop" + return None + + +def _conversion_signal(text: str, *, is_owner: bool) -> Literal["none", "intent", "confirmed"]: + s = (text or "").lower() + if not s: + return "none" + + if is_owner: + if any(k in s for k in ("stripe.com", "calendly.com", "paypal.me", "checkout", "invoice")): + return "confirmed" + if any(k in s for k in ("stripe", "calendly", "paypal", "payment link", "pay link")): + return "intent" + return "none" + + if any(k in s for k in ("i paid", "paid", "payed", "i bought", "bought", "purchased", "done", "sent it", "ya pag", "pagado", "compr")): + return "confirmed" + if any(k in s for k in ("link", "price", "how much", "send", "ready", "book", "calendly", "stripe", "paypal", "precio", "cuanto", "envia", "enviar", "listo")): + return "intent" + return "none" + + +def _safe_chmod_600(path: Path) -> None: + try: + 
os.chmod(path, 0o600) + except Exception: + return + + +def _write_json(path: Path, obj: Any) -> None: + path.parent.mkdir(parents=True, exist_ok=True) + path.write_text(json.dumps(obj, indent=2, ensure_ascii=False) + "\n", encoding="utf-8") + _safe_chmod_600(path) + + +def _write_text(path: Path, text: str) -> None: + path.parent.mkdir(parents=True, exist_ok=True) + path.write_text(text, encoding="utf-8") + _safe_chmod_600(path) + + +def _chart_line( + *, + path: Path, + title: str, + series: list[tuple[str, float]], + width: int = 1000, + height: int = 420, +) -> None: + c = Canvas.new(width, height, bg=(255, 255, 255)) + pad_l, pad_r, pad_t, pad_b = 60, 20, 20, 50 + plot_w = width - pad_l - pad_r + plot_h = height - pad_t - pad_b + c.draw_rect(pad_l, pad_t, pad_l + plot_w, pad_t + plot_h, (250, 250, 250)) + c.draw_line(pad_l, pad_t, pad_l, pad_t + plot_h, (20, 20, 20)) + c.draw_line(pad_l, pad_t + plot_h, pad_l + plot_w, pad_t + plot_h, (20, 20, 20)) + + if not series: + c.save(str(path)) + _safe_chmod_600(path) + return + + values = [v for _, v in series] + vmin = min(values) + vmax = max(values) + if vmax <= vmin: + vmax = vmin + 1e-9 + + pts: list[tuple[int, int]] = [] + for i, (_k, v) in enumerate(series): + x = pad_l + int(i * (plot_w / max(1, len(series) - 1))) + norm = (v - vmin) / (vmax - vmin) + y = pad_t + plot_h - int(norm * plot_h) + pts.append((x, y)) + + for n in range(1, 5): + y = pad_t + int(n * plot_h / 5) + c.draw_line(pad_l, y, pad_l + plot_w, y, (230, 230, 230)) + + c.draw_polyline(pts, (24, 90, 200)) + for x, y in pts: + c.draw_rect(x - 1, y - 1, x + 1, y + 1, (24, 90, 200)) + + c.save(str(path)) + _safe_chmod_600(path) + + +def _chart_multiline_with_bars( + *, + path: Path, + keys: list[str], + lines: dict[str, list[int]], + bars: list[int], + width: int = 1200, + height: int = 520, +) -> None: + c = Canvas.new(width, height, bg=(255, 255, 255)) + pad_l, pad_r, pad_t, pad_b = 70, 20, 20, 60 + plot_w = width - pad_l - pad_r + plot_h = height - pad_t - pad_b + c.draw_rect(pad_l, pad_t, pad_l + plot_w, pad_t + plot_h, (250, 250, 250)) + c.draw_line(pad_l, pad_t, pad_l, pad_t + plot_h, (20, 20, 20)) + c.draw_line(pad_l, pad_t + plot_h, pad_l + plot_w, pad_t + plot_h, (20, 20, 20)) + + if not keys: + c.save(str(path)) + _safe_chmod_600(path) + return + + max_line = 0 + for values in lines.values(): + if values: + max_line = max(max_line, max(values)) + max_bar = max(bars) if bars else 0 + ymax = max(1, max(max_line, max_bar)) + + n = len(keys) + dx = plot_w / max(1, n) + for i in range(n): + x0 = int(pad_l + i * dx) + x1 = int(pad_l + (i + 1) * dx) - 1 + h = int((bars[i] / ymax) * plot_h) if i < len(bars) else 0 + if h > 0: + y0 = pad_t + plot_h - h + y1 = pad_t + plot_h - 1 + for x in range(x0, x1 + 1): + for y in range(y0, y1 + 1): + c.blend_pixel(x, y, (200, 80, 60), alpha=0.18) + + palette = [ + (24, 90, 200), + (30, 160, 90), + (160, 90, 200), + (200, 150, 40), + (80, 170, 180), + ] + for idx, (name, values) in enumerate(sorted(lines.items(), key=lambda kv: kv[0])): + if not values: + continue + color = palette[idx % len(palette)] + pts: list[tuple[int, int]] = [] + for i, v in enumerate(values): + x = pad_l + int((i + 0.5) * dx) + y = pad_t + plot_h - int((v / ymax) * plot_h) + pts.append((x, y)) + c.draw_polyline(pts, color) + + c.save(str(path)) + _safe_chmod_600(path) + + +def analyze( + *, + sources: list[ConversationSource], + out_dir: Path, + cfg: AnalysisConfig, + sources_meta: dict[str, Any], + templates_cache: Path | None, + stage: Literal["pass1", 
"pass2", "all"], +) -> dict[str, Any]: + out_dir.mkdir(parents=True, exist_ok=True) + + templates_path = templates_cache or (out_dir / "top_outgoing_templates.json") + counts_path = out_dir / "template_counts.jsonl" + + template_stats: dict[str, TemplateStats] = {} + sender_counts: Counter[str] = Counter() + sender_conv_counts: Counter[str] = Counter() + + def iter_messages_for_pass(owner_name: str | None) -> Iterable[tuple[str, MessageEvent]]: + for src in sources: + conv = load_conversation(src, owner_name=owner_name) + if not conv: + continue + for m in conv.messages: + yield conv.conversation_id, m + + owner_name = cfg.owner_name + template_counts: dict[str, int] | None = None + top_templates: list[dict[str, Any]] = [] + top_bot_canons: list[str] = [] + + if stage in {"pass1", "all"}: + started = time.time() + if not owner_name: + owner_name = infer_owner_name(sources, progress_every=cfg.progress_every) + if not owner_name: + raise RuntimeError("Could not infer owner_name; pass --owner-name") + + for idx, src in enumerate(sources, 1): + conv = load_conversation(src, owner_name=owner_name) + if not conv: + continue + seen_senders = set() + for m in conv.messages: + if cfg.min_ts_ms is not None and m.timestamp_ms < cfg.min_ts_ms: + continue + if cfg.max_ts_ms is not None and m.timestamp_ms > cfg.max_ts_ms: + continue + seen_senders.add(m.sender) + sender_counts[m.sender] += 1 + if m.is_owner and isinstance(m.text, str) and m.text.strip(): + canon = canonicalize_text(m.text) + if not canon: + continue + st = template_stats.get(canon) + if not st: + st = TemplateStats() + template_stats[canon] = st + st.add(m.timestamp_ms, m.text) + for s in seen_senders: + sender_conv_counts[s] += 1 + + if cfg.progress_every and idx % cfg.progress_every == 0: + elapsed = int(time.time() - started) + print(f"pass1 scanned={idx}/{len(sources)} templates={len(template_stats)} elapsed_s={elapsed}", file=sys.stderr, flush=True) + + sorted_templates = sorted(template_stats.items(), key=lambda kv: kv[1].count, reverse=True) + bot_set = {k for k, st in template_stats.items() if st.count >= cfg.bot_min_count} + top = [] + for canon, st in sorted_templates[: max(50, cfg.top_bot_templates)]: + sample = st.sample + if cfg.redact and sample: + sample = redact_text(sample) + top.append( + { + "canonical": canon, + "count": st.count, + "first_seen": _iso_from_ts_ms(st.first_ts_ms), + "last_seen": _iso_from_ts_ms(st.last_ts_ms), + "sample": sample, + "label_hint": ("bot" if canon in bot_set else "manual" if st.count <= cfg.manual_max_count else "other"), + } + ) + + template_counts = {canon: st.count for canon, st in template_stats.items()} + with counts_path.open("w", encoding="utf-8") as f: + for canon, st in sorted_templates: + f.write( + json.dumps( + { + "canonical": canon, + "count": st.count, + "first_seen": _iso_from_ts_ms(st.first_ts_ms), + "last_seen": _iso_from_ts_ms(st.last_ts_ms), + }, + ensure_ascii=False, + ) + + "\n" + ) + _safe_chmod_600(counts_path) + + templates_payload = { + "generated_at": _now_utc_iso(), + "owner_name": owner_name, + "config": { + "bot_min_count": cfg.bot_min_count, + "manual_max_count": cfg.manual_max_count, + }, + "counts_path": str(counts_path), + "templates_total": len(template_stats), + "bot_templates": len(bot_set), + "manual_templates": sum(1 for _, st in template_stats.items() if st.count <= cfg.manual_max_count), + "top_templates": top, + } + _write_json(templates_path, templates_payload) + + if stage == "pass1": + return { + "stage": "pass1", + "out_dir": str(out_dir), + 
"templates_path": str(templates_path), + "owner_name": owner_name, + "sources": sources_meta, + "templates_total": len(template_stats), + } + + if template_counts is None and not templates_path.exists(): + raise FileNotFoundError(f"Missing templates cache: {templates_path} (run with --stage pass1 or all)") + + templates_payload = {} + if templates_path.exists(): + templates_payload = json.loads(templates_path.read_text(encoding="utf-8", errors="replace")) + if isinstance(templates_payload, dict): + for entry in templates_payload.get("top_templates") or []: + if isinstance(entry, dict): + top_templates.append(entry) + + if stage in {"pass2", "all"}: + if stage == "pass2" and cfg.progress_every: + print("pass2 starting (templates loaded from cache)", file=sys.stderr, flush=True) + + owner_name = owner_name or (templates_payload.get("owner_name") if isinstance(templates_payload, dict) else None) + if not isinstance(owner_name, str) or not owner_name.strip(): + raise RuntimeError("owner_name missing (use --owner-name or re-run pass1)") + owner_name = owner_name.strip() + + bot_min_count = cfg.bot_min_count + manual_max_count = cfg.manual_max_count + counts_path_str = None + if isinstance(templates_payload.get("config"), dict): + bot_min_count = int(templates_payload["config"].get("bot_min_count", bot_min_count)) + manual_max_count = int(templates_payload["config"].get("manual_max_count", manual_max_count)) + if isinstance(templates_payload.get("counts_path"), str): + counts_path_str = templates_payload.get("counts_path") + + if template_counts is None: + if not counts_path_str: + raise RuntimeError("templates cache missing counts_path; re-run pass1") + cp = Path(counts_path_str) + template_counts = {} + with cp.open("r", encoding="utf-8") as f: + for line in f: + try: + row = json.loads(line) + except Exception: + continue + canon = row.get("canonical") + cnt = row.get("count") + if isinstance(canon, str) and isinstance(cnt, int): + template_counts[canon] = cnt + + bot_set = {canon for canon, cnt in template_counts.items() if cnt >= bot_min_count} + + top_bot_canons = [ + e.get("canonical") + for e in top_templates + if e.get("label_hint") == "bot" and isinstance(e.get("canonical"), str) + ] + if not top_bot_canons and template_counts: + top_bot_canons = [canon for canon, _cnt in sorted(template_counts.items(), key=lambda kv: kv[1], reverse=True)[: max(20, cfg.top_bot_templates)]] + top_bot_canons = [c for c in top_bot_canons if isinstance(c, str)] + top_bot_canons = top_bot_canons[: max(20, cfg.top_bot_templates)] + main_bot = top_bot_canons[0] if top_bot_canons else None + + bot_prefixes: set[str] = set() + for canon in top_bot_canons: + toks = canon.split() + bot_prefixes.add(" ".join(toks[:6]) if toks else canon[:40]) + + conv_total = 0 + conv_converted_intent = 0 + conv_converted_confirmed = 0 + conv_bot_only = 0 + conv_human_intervened = 0 + outgoing_stats: dict[str, dict[str, int]] = defaultdict(lambda: {"sent": 0, "replied": 0, "converted_intent": 0, "converted_confirmed": 0}) + + fatigue: dict[str, dict[str, int]] = defaultdict(lambda: {"sent": 0, "replied": 0}) + editorial: dict[str, dict[str, int]] = defaultdict(lambda: defaultdict(int)) + conversions_by_week: Counter[str] = Counter() + + objection_reply_counts: dict[str, Counter[str]] = defaultdict(Counter) + rescue_events: list[dict[str, Any]] = [] + + vocab_by_quarter: dict[str, Counter[str]] = defaultdict(Counter) + offers_by_quarter: dict[str, Counter[str]] = defaultdict(Counter) + user_sentiment_by_quarter: dict[str, 
list[int]] = defaultdict(list) + + manual_len_sample: list[int] = [] + manual_len_sample_cap = 200_000 + manual_question = 0 + manual_exclaim = 0 + manual_emoji = 0 + manual_total = 0 + manual_lang: Counter[str] = Counter() + + training_pairs_path = out_dir / "training_pairs.jsonl" + rescue_path = out_dir / "rescue_playbook.json" + bot_audit_path = out_dir / "bot_performance_audit.csv" + sergio_eras_path = out_dir / "sergio_eras.csv" + objections_path = out_dir / "objection_handlers.json" + summary_path = out_dir / "summary.json" + exec_path = out_dir / "executive_summary.md" + fatigue_png = out_dir / "bot_fatigue_chart.png" + editorial_png = out_dir / "editorial_timeline.png" + + tp_count = 0 + + training_pairs_path.parent.mkdir(parents=True, exist_ok=True) + with training_pairs_path.open("w", encoding="utf-8") as tp_f: + started = time.time() + for idx, src in enumerate(sources, 1): + conv = load_conversation(src, owner_name=owner_name) + if not conv: + continue + + events = [ + m + for m in conv.messages + if (cfg.min_ts_ms is None or m.timestamp_ms >= cfg.min_ts_ms) + and (cfg.max_ts_ms is None or m.timestamp_ms <= cfg.max_ts_ms) + ] + if not events: + continue + + conv_total += 1 + + conversion_intent_ts: int | None = None + conversion_confirmed_ts: int | None = None + for m in events: + if not isinstance(m.text, str) or not m.text.strip(): + continue + sig = _conversion_signal(m.text, is_owner=m.is_owner) + if sig != "none": + conversion_intent_ts = m.timestamp_ms if conversion_intent_ts is None else min(conversion_intent_ts, m.timestamp_ms) + if sig == "confirmed": + conversion_confirmed_ts = m.timestamp_ms if conversion_confirmed_ts is None else min(conversion_confirmed_ts, m.timestamp_ms) + + if conversion_intent_ts is not None: + conv_converted_intent += 1 + conversions_by_week[_week_key(conversion_intent_ts)] += 1 + if conversion_confirmed_ts is not None: + conv_converted_confirmed += 1 + + has_bot = False + has_manual_like = False + + # Precompute outgoing labels + canon. + labeled: list[tuple[MessageEvent, str, str]] = [] + for m in events: + if not m.is_owner or not isinstance(m.text, str) or not m.text.strip(): + labeled.append((m, "", "")) + continue + canon = canonicalize_text(m.text) + label = _label_outgoing( + canon, + bot_set=bot_set, + template_counts=template_counts, + manual_max_count=manual_max_count, + bot_prefixes=bot_prefixes, + top_bot_canons=top_bot_canons, + hybrid_similarity=cfg.hybrid_similarity, + ) + if label == "bot": + has_bot = True + if label in {"manual", "hybrid"}: + has_manual_like = True + labeled.append((m, canon, label)) + + if has_bot and not has_manual_like: + conv_bot_only += 1 + if has_manual_like: + conv_human_intervened += 1 + + # Pair objections to next manual reply; produce training pairs. 
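+            # Two extractions per inbound (non-owner) message:
+            #  1) if it matches an objection class (price/time/trust/stop), credit the
+            #     next MANUAL (or HYBRID, if enabled) owner reply to objection_handlers;
+            #  2) in threads with a conversion-intent signal, pair it with the next
+            #     manual owner reply and append to training_pairs.jsonl (capped).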
+ for i, m in enumerate(events): + if m.is_owner or not isinstance(m.text, str) or not m.text.strip(): + continue + obj = _is_objection(m.text) + if obj: + for j in range(i + 1, min(len(events), i + 8)): + nxt = events[j] + if not nxt.is_owner or not isinstance(nxt.text, str) or not nxt.text.strip(): + continue + canon = canonicalize_text(nxt.text) + label = _label_outgoing( + canon, + bot_set=bot_set, + template_counts=template_counts, + manual_max_count=manual_max_count, + bot_prefixes=bot_prefixes, + top_bot_canons=top_bot_canons, + hybrid_similarity=cfg.hybrid_similarity, + ) + if label == "manual" or (cfg.include_hybrid_in_pairs and label == "hybrid"): + rep = nxt.text.strip() + rep = redact_text(rep) if cfg.redact else rep + objection_reply_counts[obj][rep] += 1 + break + + # Training pairs: user message + next owner manual reply, only in converted threads. + if conversion_intent_ts is None: + continue + for j in range(i + 1, min(len(events), i + 6)): + nxt = events[j] + if not nxt.is_owner or not isinstance(nxt.text, str) or not nxt.text.strip(): + continue + canon = canonicalize_text(nxt.text) + label = _label_outgoing( + canon, + bot_set=bot_set, + template_counts=template_counts, + manual_max_count=manual_max_count, + bot_prefixes=bot_prefixes, + top_bot_canons=top_bot_canons, + hybrid_similarity=cfg.hybrid_similarity, + ) + if label == "manual" or (cfg.include_hybrid_in_pairs and label == "hybrid"): + prompt = m.text.strip() + completion = nxt.text.strip() + if cfg.redact: + prompt = redact_text(prompt) + completion = redact_text(completion) + if prompt and completion and tp_count < cfg.max_training_pairs: + tp_f.write( + json.dumps( + { + "prompt": prompt, + "completion": completion, + "conversation_id": conv.conversation_id, + "user_sender": m.sender, + "timestamp_ms": nxt.timestamp_ms, + }, + ensure_ascii=False, + ) + + "\n" + ) + tp_count += 1 + break + + # Per-message stats (reply rates, fatigue, eras). 
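+            # Reply attribution: an inbound message arriving within reply_window_hours
+            # of the last owner message credits that owner's bot template with a reply;
+            # the single most-used bot template also feeds the weekly fatigue series.
+            # Owner messages add to per-template sent/conversion counts, quarterly
+            # offer/vocab tallies, and manual-style stats (length, questions, emoji, language).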
+ last_owner_idx: int | None = None + last_owner_ts: int | None = None + for i, (m, canon, label) in enumerate(labeled): + if not m.is_owner or not isinstance(m.text, str) or not m.text.strip(): + if not m.is_owner and last_owner_idx is not None: + dt_h = (m.timestamp_ms - (last_owner_ts or m.timestamp_ms)) / 1000.0 / 3600.0 + if dt_h <= cfg.reply_window_hours: + prev_m, prev_canon, prev_label = labeled[last_owner_idx] + if prev_label in {"bot", "manual", "hybrid", "other"} and prev_canon: + if prev_canon in bot_set: + outgoing_stats[prev_canon]["replied"] += 1 + if prev_canon == main_bot: + fatigue[_week_key(prev_m.timestamp_ms)]["replied"] += 1 + last_owner_idx = None + last_owner_ts = None + if not m.is_owner and isinstance(m.text, str) and m.text.strip(): + user_sentiment_by_quarter[_quarter_key(m.timestamp_ms)].append(sentiment_score(m.text)) + continue + + if canon: + qk = _quarter_key(m.timestamp_ms) + if canon in bot_set: + outgoing_stats[canon]["sent"] += 1 + if conversion_intent_ts is not None and m.timestamp_ms <= conversion_intent_ts: + outgoing_stats[canon]["converted_intent"] += 1 + if conversion_confirmed_ts is not None and m.timestamp_ms <= conversion_confirmed_ts: + outgoing_stats[canon]["converted_confirmed"] += 1 + + if canon == main_bot: + wk = _week_key(m.timestamp_ms) + fatigue[wk]["sent"] += 1 + + if canon in top_bot_canons: + editorial[_week_key(m.timestamp_ms)][canon] += 1 + + offer = extract_offer_signals(m.text or "") + if offer: + if offer.get("price"): + offers_by_quarter[qk][str(offer["price"])] += 1 + for term in offer.get("offer_terms") or []: + offers_by_quarter[qk][term] += 1 + if offer.get("around_price"): + offers_by_quarter[qk][str(offer["around_price"])[:120]] += 1 + + if label == "manual" or label == "hybrid": + manual_total += 1 + manual_question += 1 if "?" in (m.text or "") else 0 + manual_exclaim += 1 if "!" in (m.text or "") else 0 + manual_emoji += 1 if message_has_emoji(m.text or "") else 0 + manual_lang[guess_lang(m.text or "")] += 1 + if len(manual_len_sample) < manual_len_sample_cap: + manual_len_sample.append(len(m.text or "")) + + qk = qk if canon else _quarter_key(m.timestamp_ms) + for t in tokenize(m.text or ""): + if t in EN_STOPWORDS or t in ES_STOPWORDS: + continue + if t.startswith("<") and t.endswith(">"): + continue + if len(t) < 2: + continue + vocab_by_quarter[qk][t] += 1 + + last_owner_idx = i + last_owner_ts = m.timestamp_ms + + # Rescue logic: manual message after ghost or negative, following bot, then conversion. 
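+            # A "rescue" is only recorded in threads that later confirm a conversion:
+            # a MANUAL/HYBRID owner message following either (a) a bot message left
+            # unanswered for >= ghost_hours or (b) a negative-sentiment previous message,
+            # with at least one earlier bot message in the thread and the rescue landing
+            # before the confirmed conversion. At most one event per conversation, capped globally.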
+ if conversion_confirmed_ts is not None and len(rescue_events) < cfg.max_rescue_events: + for i in range(1, len(events)): + cur = events[i] + prev = events[i - 1] + if not cur.is_owner or not isinstance(cur.text, str) or not cur.text.strip(): + continue + canon = canonicalize_text(cur.text) + label = _label_outgoing( + canon, + bot_set=bot_set, + template_counts=template_counts, + manual_max_count=manual_max_count, + bot_prefixes=bot_prefixes, + top_bot_canons=top_bot_canons, + hybrid_similarity=cfg.hybrid_similarity, + ) + if label not in {"manual", "hybrid"}: + continue + + gap_h = (cur.timestamp_ms - prev.timestamp_ms) / 1000.0 / 3600.0 + prev_sig = _conversion_signal(prev.text or "", is_owner=prev.is_owner) + if prev_sig == "confirmed": + continue + prev_negative = sentiment_score(prev.text or "") < 0 + prev_label = labeled[i - 1][2] if i - 1 < len(labeled) else "" + ghost_gap = gap_h >= cfg.ghost_hours and prev.is_owner and prev_label == "bot" + if not (ghost_gap or prev_negative): + continue + + had_bot_before = any(lbl == "bot" for _m, _canon, lbl in labeled[:i] if lbl) + if not had_bot_before: + continue + if cur.timestamp_ms > conversion_confirmed_ts: + continue + + next_owner_reply = None + for j in range(i + 1, min(len(events), i + 6)): + e = events[j] + if e.is_owner and isinstance(e.text, str) and e.text.strip(): + next_owner_reply = e.text.strip() + break + + user_trigger = prev.text.strip() if isinstance(prev.text, str) else "" + rescue_msg = cur.text.strip() + if cfg.redact: + user_trigger = redact_text(user_trigger) + rescue_msg = redact_text(rescue_msg) + + rescue_events.append( + { + "conversation_id": conv.conversation_id, + "timestamp_ms": cur.timestamp_ms, + "gap_hours": round(gap_h, 2), + "trigger_user_message": user_trigger[:500], + "rescue_message": rescue_msg[:500], + "followup_owner_message": (redact_text(next_owner_reply) if (cfg.redact and next_owner_reply) else next_owner_reply), + "conversion_time": _iso_from_ts_ms(conversion_confirmed_ts), + } + ) + break + + if cfg.progress_every and idx % cfg.progress_every == 0: + elapsed = int(time.time() - started) + print( + f"pass2 scanned={idx}/{len(sources)} conv={conv_total} tp={tp_count} rescue={len(rescue_events)} elapsed_s={elapsed}", + file=sys.stderr, + flush=True, + ) + + _safe_chmod_600(training_pairs_path) + + # Build bot fatigue series (weekly reply rate). + fatigue_series: list[tuple[str, float]] = [] + for wk in sorted(fatigue.keys()): + sent = fatigue[wk]["sent"] + replied = fatigue[wk]["replied"] + fatigue_series.append((wk, (replied / sent) if sent else 0.0)) + _chart_line(path=fatigue_png, title="Bot fatigue", series=fatigue_series) + + # Editorial timeline: top bot templates weekly counts + conversions. + weekly_keys = sorted(set(editorial.keys()) | set(conversions_by_week.keys())) + top_lines: dict[str, list[int]] = {} + for canon in top_bot_canons[: cfg.top_bot_templates]: + top_lines[canon] = [int(editorial.get(wk, {}).get(canon, 0)) for wk in weekly_keys] + bars = [int(conversions_by_week.get(wk, 0)) for wk in weekly_keys] + _chart_multiline_with_bars(path=editorial_png, keys=weekly_keys, lines=top_lines, bars=bars) + + # Objection handlers. + objections_out: dict[str, list[dict[str, Any]]] = {} + for k, counter in objection_reply_counts.items(): + top_replies = [{"reply": r, "count": c} for r, c in counter.most_common(25)] + objections_out[k] = top_replies + _write_json(objections_path, objections_out) + + # Rescue playbook. 
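+    # Each entry records conversation_id, timestamp_ms, gap_hours, the (redacted by
+    # default) trigger and rescue messages, the owner's follow-up, and the conversion time.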
+ _write_json(rescue_path, rescue_events) + + # Bot performance audit. + with bot_audit_path.open("w", encoding="utf-8", newline="") as f: + w = csv.writer(f) + w.writerow( + [ + "canonical_template", + "sent", + "reply_rate", + "conversion_intent_rate", + "conversion_confirmed_rate", + ] + ) + for canon, st in sorted(outgoing_stats.items(), key=lambda kv: kv[1]["sent"], reverse=True): + if canon not in bot_set: + continue + sent = st["sent"] + if sent <= 0: + continue + w.writerow( + [ + canon, + sent, + round(st["replied"] / sent, 4), + round(st["converted_intent"] / sent, 4), + round(st["converted_confirmed"] / sent, 4), + ] + ) + _safe_chmod_600(bot_audit_path) + + # Sergio eras CSV (quarterly). + quarters = sorted(set(vocab_by_quarter.keys()) | set(offers_by_quarter.keys()) | set(user_sentiment_by_quarter.keys())) + with sergio_eras_path.open("w", encoding="utf-8", newline="") as f: + w = csv.writer(f) + w.writerow(["quarter", "top_offers", "top_terms_manual", "avg_user_sentiment"]) + for q in quarters: + offers = "; ".join([f"{k}({c})" for k, c in offers_by_quarter[q].most_common(10)]) + terms = "; ".join([f"{k}({c})" for k, c in vocab_by_quarter[q].most_common(15)]) + sents = user_sentiment_by_quarter.get(q) or [] + avg_sent = round(sum(sents) / len(sents), 3) if sents else "" + w.writerow([q, offers, terms, avg_sent]) + _safe_chmod_600(sergio_eras_path) + + # Summary stats. + manual_len_median = statistics.median(manual_len_sample) if manual_len_sample else None + manual_len_p90 = ( + statistics.quantiles(manual_len_sample, n=10)[-1] if len(manual_len_sample) >= 20 else None + ) + bot_only_rate = (conv_bot_only / conv_total) if conv_total else 0.0 + human_rate = (conv_human_intervened / conv_total) if conv_total else 0.0 + conv_intent_rate = (conv_converted_intent / conv_total) if conv_total else 0.0 + conv_confirmed_rate = (conv_converted_confirmed / conv_total) if conv_total else 0.0 + + summary = { + "generated_at": _now_utc_iso(), + "owner_name": owner_name, + "sources": sources_meta, + "config": { + "min_ts_ms": cfg.min_ts_ms, + "max_ts_ms": cfg.max_ts_ms, + "bot_min_count": cfg.bot_min_count, + "manual_max_count": cfg.manual_max_count, + "ghost_hours": cfg.ghost_hours, + "reply_window_hours": cfg.reply_window_hours, + "redact": cfg.redact, + }, + "conversations": { + "total": conv_total, + "bot_only": conv_bot_only, + "human_intervened": conv_human_intervened, + "bot_only_rate": round(bot_only_rate, 4), + "human_intervened_rate": round(human_rate, 4), + }, + "conversions": { + "intent": conv_converted_intent, + "confirmed": conv_converted_confirmed, + "intent_rate": round(conv_intent_rate, 4), + "confirmed_rate": round(conv_confirmed_rate, 4), + }, + "manual_style": { + "messages_sampled": len(manual_len_sample), + "median_len_chars": manual_len_median, + "p90_len_chars": manual_len_p90, + "question_rate": round(manual_question / manual_total, 4) if manual_total else 0.0, + "exclaim_rate": round(manual_exclaim / manual_total, 4) if manual_total else 0.0, + "emoji_rate": round(manual_emoji / manual_total, 4) if manual_total else 0.0, + "lang_guess": manual_lang, + }, + "artifacts": { + "templates": str(templates_path), + "training_pairs": str(training_pairs_path), + "rescue_playbook": str(rescue_path), + "objection_handlers": str(objections_path), + "bot_performance_audit": str(bot_audit_path), + "sergio_eras": str(sergio_eras_path), + "bot_fatigue_chart": str(fatigue_png), + "editorial_timeline": str(editorial_png), + "executive_summary": str(exec_path), + }, + } + 
_write_json(summary_path, summary) + + exec_md = [ + "# Executive Summary — Socialmediatorr Instagram DMs", + "", + f"- Generated: `{summary['generated_at']}`", + f"- Owner inferred/used: `{owner_name}`", + f"- Conversations analyzed: `{conv_total}`", + f"- Conversion (intent): `{conv_converted_intent}` (`{round(conv_intent_rate*100,1)}%`)", + f"- Conversion (confirmed): `{conv_converted_confirmed}` (`{round(conv_confirmed_rate*100,1)}%`)", + f"- Bot-only conversations: `{conv_bot_only}` (`{round(bot_only_rate*100,1)}%`)", + f"- Human-intervened conversations: `{conv_human_intervened}` (`{round(human_rate*100,1)}%`)", + "", + "## Persona (Manual Replies)", + "", + f"- Median reply length (chars): `{manual_len_median}` (p90 `{manual_len_p90}`)", + f"- Questions: `{round((manual_question/manual_total)*100,1) if manual_total else 0}%` | Exclamations: `{round((manual_exclaim/manual_total)*100,1) if manual_total else 0}%` | Emoji: `{round((manual_emoji/manual_total)*100,1) if manual_total else 0}%`", + f"- Language guess: `{dict(manual_lang)}`", + "", + "## Outputs", + "", + f"- Templates + bot scripts: `{templates_path}`", + f"- Training pairs (manual-only, converted threads): `{training_pairs_path}`", + f"- Rescue playbook (human saves): `{rescue_path}`", + f"- Objection handlers: `{objections_path}`", + f"- Bot template audit: `{bot_audit_path}`", + f"- Product eras / vocab drift: `{sergio_eras_path}`", + f"- Bot fatigue chart: `{fatigue_png}`", + f"- Editorial timeline: `{editorial_png}`", + "", + "## Recommendations (Next Agent Iteration)", + "", + "- Use bot templates for first contact; switch to human-style replies when user shows objections or goes silent.", + "- Drive the agent with retrieval over `training_pairs.jsonl` + `rescue_playbook.json` for re-engagement patterns.", + "- Refresh the dominant bot script if the fatigue chart shows reply-rate decay over time.", + "", + ] + _write_text(exec_path, "\n".join(exec_md)) + + return summary + + raise RuntimeError(f"Invalid stage: {stage}") + + +def _parse_ts_arg(value: str) -> int: + v = value.strip() + if v.isdigit(): + return int(v) + dt = datetime.fromisoformat(v.replace("Z", "+00:00")) + if dt.tzinfo is None: + dt = dt.replace(tzinfo=timezone.utc) + return int(dt.timestamp() * 1000) + + +def main(argv: list[str] | None = None) -> int: + ap = argparse.ArgumentParser(description="Analyze Instagram export DMs for bot-vs-human patterns and sales flow.") + ap.add_argument("--input", required=True, help="Instagram export root (contains messages/inbox) or history dir with messages/*.jsonl") + ap.add_argument("--out", required=True, help="output directory") + ap.add_argument("--owner-name", default=None, help="owner sender_name (default: infer)") + ap.add_argument("--include-message-requests", action="store_true", help="include messages/message_requests in addition to inbox") + ap.add_argument("--index", default=None, help="optional index JSONL (from index_instagram_export) to speed filtering") + ap.add_argument("--min-ts", default=None, help="min timestamp (ms or ISO8601)") + ap.add_argument("--max-ts", default=None, help="max timestamp (ms or ISO8601)") + ap.add_argument("--since-days", type=int, default=None, help="analyze only last N days") + ap.add_argument("--max-conversations", type=int, default=None, help="cap number of conversations") + ap.add_argument("--bot-min-count", type=int, default=50, help="label template as [BOT] if sent >= this many times") + ap.add_argument("--manual-max-count", type=int, default=10, help="label template 
as [MANUAL] if sent <= this many times") + ap.add_argument("--hybrid-similarity", type=float, default=0.9, help="SequenceMatcher similarity for [HYBRID]") + ap.add_argument("--ghost-hours", type=float, default=24.0, help="silence threshold for ghost/rescue detection") + ap.add_argument("--reply-window-hours", type=float, default=48.0, help="counts as 'replied' if within this window") + ap.add_argument("--no-redact", action="store_true", help="do not redact emails/phones/urls/handles in outputs") + ap.add_argument("--include-hybrid-in-pairs", action="store_true", help="allow HYBRID replies in training_pairs.jsonl") + ap.add_argument("--max-training-pairs", type=int, default=25000) + ap.add_argument("--max-rescue-events", type=int, default=5000) + ap.add_argument("--top-bot-templates", type=int, default=5, help="top bot templates to chart in editorial timeline") + ap.add_argument("--progress-every", type=int, default=250, help="progress log frequency (stderr)") + ap.add_argument("--templates-cache", default=None, help="path to templates cache JSON (reuses pass1 results)") + ap.add_argument("--stage", choices=["pass1", "pass2", "all"], default="all") + args = ap.parse_args(argv) + + out_dir = Path(args.out) + input_path = Path(args.input) + index_path = Path(args.index) if args.index else None + + min_ts_ms = _parse_ts_arg(args.min_ts) if args.min_ts else None + max_ts_ms = _parse_ts_arg(args.max_ts) if args.max_ts else None + if args.since_days: + since = datetime.now(timezone.utc) - timedelta(days=int(args.since_days)) + min_ts_ms = int(since.timestamp() * 1000) + + sources, sources_meta = discover_sources( + input_path=input_path, + include_message_requests=bool(args.include_message_requests), + index_path=index_path, + min_ts_ms=min_ts_ms, + max_conversations=(int(args.max_conversations) if args.max_conversations else None), + ) + if not sources: + print("No conversations found under input.", file=sys.stderr) + return 2 + + cfg = AnalysisConfig( + owner_name=(args.owner_name.strip() if args.owner_name else None), + min_ts_ms=min_ts_ms, + max_ts_ms=max_ts_ms, + bot_min_count=int(args.bot_min_count), + manual_max_count=int(args.manual_max_count), + hybrid_similarity=float(args.hybrid_similarity), + ghost_hours=float(args.ghost_hours), + reply_window_hours=float(args.reply_window_hours), + redact=not bool(args.no_redact), + include_hybrid_in_pairs=bool(args.include_hybrid_in_pairs), + max_training_pairs=int(args.max_training_pairs), + max_rescue_events=int(args.max_rescue_events), + top_bot_templates=int(args.top_bot_templates), + progress_every=int(args.progress_every), + ) + + templates_cache = Path(args.templates_cache) if args.templates_cache else None + try: + result = analyze( + sources=sources, + out_dir=out_dir, + cfg=cfg, + sources_meta=sources_meta, + templates_cache=templates_cache, + stage=args.stage, # type: ignore[arg-type] + ) + print(json.dumps({"ok": True, "out": str(out_dir), "summary": str(out_dir / "summary.json"), "stage": args.stage})) + return 0 + except KeyboardInterrupt: + return 130 + except Exception as e: + print(f"Analysis failed: {e}", file=sys.stderr) + return 1 + + +if __name__ == "__main__": + raise SystemExit(main()) diff --git a/sergio_instagram_messaging/export_meta_ig_history.py b/sergio_instagram_messaging/export_meta_ig_history.py new file mode 100644 index 0000000..cfc7159 --- /dev/null +++ b/sergio_instagram_messaging/export_meta_ig_history.py @@ -0,0 +1,340 @@ +from __future__ import annotations + +import argparse +import json +import os +import re 
+import sys +import time +import urllib.error +import urllib.parse +import urllib.request +from dataclasses import dataclass +from datetime import datetime, timezone +from pathlib import Path +from typing import Any, Iterable + + +DEFAULT_CREDS_PATH = "/root/tmp/emo-social-meta-app-creds.txt" + + +def _now_utc_iso() -> str: + return datetime.now(timezone.utc).replace(microsecond=0).isoformat() + + +def load_dotenv(path: str) -> dict[str, str]: + env: dict[str, str] = {} + p = Path(path) + if not p.exists(): + raise FileNotFoundError(path) + for raw in p.read_text(errors="replace").splitlines(): + line = raw.strip() + if not line or line.startswith("#"): + continue + if "=" not in line: + continue + key, value = line.split("=", 1) + key = key.strip() + value = value.strip() + if not key: + continue + env[key] = value + return env + + +def _require(env: dict[str, str], key: str) -> str: + value = (env.get(key) or "").strip() + if not value: + raise KeyError(f"Missing required config: {key}") + return value + + +def _optional(env: dict[str, str], key: str, default: str) -> str: + value = (env.get(key) or "").strip() + return value if value else default + + +@dataclass(frozen=True) +class GraphApiConfig: + access_token: str + version: str = "v20.0" + + @property + def base(self) -> str: + return f"https://graph.facebook.com/{self.version}" + + +class GraphApiError(RuntimeError): + def __init__(self, payload: dict[str, Any]): + self.payload = payload + super().__init__(payload) + + +class GraphApiClient: + def __init__(self, cfg: GraphApiConfig): + self._cfg = cfg + + def get_json(self, path: str, params: dict[str, str] | None = None) -> dict[str, Any]: + params = dict(params or {}) + params["access_token"] = self._cfg.access_token + url = f"{self._cfg.base}{path}?{urllib.parse.urlencode(params)}" + try: + with urllib.request.urlopen(url, timeout=60) as r: + return json.loads(r.read().decode("utf-8")) + except urllib.error.HTTPError as e: + body = e.read().decode("utf-8", errors="replace") + try: + payload = json.loads(body) + except Exception: + payload = {"raw": body[:1000]} + raise GraphApiError({"status": e.code, "error": payload}) + + def iter_paged( + self, + path: str, + params: dict[str, str] | None = None, + *, + max_pages: int | None = None, + sleep_s: float = 0.0, + ) -> Iterable[dict[str, Any]]: + version_prefix = re.compile(r"^/v\d+\.\d+") + page = 0 + next_path = path + next_params = dict(params or {}) + while True: + page += 1 + if max_pages is not None and page > max_pages: + return + + payload = self.get_json(next_path, next_params) + data = payload.get("data") or [] + if isinstance(data, list): + for item in data: + if isinstance(item, dict): + yield item + + paging = payload.get("paging") or {} + next_url = paging.get("next") + if not next_url: + return + + parsed = urllib.parse.urlparse(next_url) + next_path = parsed.path + # Meta's paging URLs include the version prefix (e.g. /v24.0/...). + # Our client already pins a version in cfg.base, so strip it here. + m = version_prefix.match(next_path) + if m: + next_path = next_path[m.end() :] or "/" + next_params = dict(urllib.parse.parse_qsl(parsed.query)) + # Ensure we don't leak a token from the paging URL if Meta includes it. 
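+            # Illustrative example (hypothetical IDs): a "next" paging URL such as
+            #   https://graph.facebook.com/v24.0/1234567890/conversations?after=CURSOR&limit=25
+            # is split into its path "/v24.0/1234567890/conversations" and its query params;
+            # the version prefix is stripped so get_json() can re-apply the pinned cfg.base,
+            # and the cursor/limit params are carried over while the token is overwritten below.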
+ next_params["access_token"] = self._cfg.access_token + + if sleep_s: + time.sleep(sleep_s) + + +def export_history( + *, + creds_path: str, + out_dir: str, + platform: str, + conversations_fields: str, + messages_fields: str, + conversations_limit: int | None, + max_conversations: int | None, + max_pages: int | None, + sleep_s: float, +) -> None: + env = load_dotenv(creds_path) + token = _require(env, "META_PAGE_ACCESS_TOKEN") + version = _optional(env, "META_GRAPH_API_VERSION", "v20.0") + + client = GraphApiClient(GraphApiConfig(access_token=token, version=version)) + page_id = (env.get("META_PAGE_ID") or "").strip() + page_name: str | None = None + if not page_id: + me = client.get_json("/me", {"fields": "id,name"}) + page_id = str(me.get("id") or "").strip() + page_name = str(me.get("name") or "").strip() or None + if not page_id: + raise KeyError("Missing required config: META_PAGE_ID (and token did not resolve /me.id)") + + out = Path(out_dir) + out.mkdir(parents=True, exist_ok=True) + (out / "messages").mkdir(parents=True, exist_ok=True) + + meta_path = out / "export_metadata.json" + meta_path.write_text( + json.dumps( + { + "exported_at": _now_utc_iso(), + "graph_api_version": version, + "platform": platform, + "page_id": page_id, + "page_name": page_name, + "conversations_fields": conversations_fields, + "messages_fields": messages_fields, + }, + indent=2, + ) + + "\n" + ) + + conversations: list[dict[str, Any]] = [] + conv_params = {"platform": platform, "fields": conversations_fields} + if conversations_limit is not None: + conv_params["limit"] = str(conversations_limit) + + def _iter_conversations() -> Iterable[dict[str, Any]]: + return client.iter_paged( + f"/{page_id}/conversations", + params=conv_params, + max_pages=max_pages, + sleep_s=sleep_s, + ) + + def _is_large_inbox_timeout(err: GraphApiError) -> bool: + if not isinstance(err.payload, dict): + return False + meta = (err.payload.get("error") or {}).get("error") + if not isinstance(meta, dict): + return False + return meta.get("code") == -2 and meta.get("error_subcode") == 2534084 + + def _collect_conversations() -> None: + nonlocal conversations + try: + for conv in _iter_conversations(): + conversations.append(conv) + if max_conversations is not None and len(conversations) >= max_conversations: + break + except GraphApiError as e: + if _is_large_inbox_timeout(e) and conversations: + print( + f"Warning: conversation listing timed out after {len(conversations)} conversation(s); " + "continuing with partial export.", + file=sys.stderr, + ) + return + raise + + try: + _collect_conversations() + except GraphApiError as e: + if _is_large_inbox_timeout(e) and conv_params.get("limit") != "1": + print( + "Conversation listing timed out; retrying with limit=1 (workaround for large inboxes).", + file=sys.stderr, + ) + conv_params["limit"] = "1" + _collect_conversations() + else: + raise + + (out / "conversations.json").write_text(json.dumps({"data": conversations}, indent=2) + "\n") + + exported_messages = 0 + for idx, conv in enumerate(conversations, 1): + conv_id = str(conv.get("id") or "").strip() + if not conv_id: + continue + + dst = out / "messages" / f"{conv_id}.jsonl" + tmp_dst = dst.with_suffix(dst.suffix + ".tmp") + + def _is_too_much_data_error(err: GraphApiError) -> bool: + if not isinstance(err.payload, dict): + return False + if err.payload.get("status") != 500: + return False + meta = (err.payload.get("error") or {}).get("error") + if not isinstance(meta, dict): + return False + msg = str(meta.get("message") or 
"").lower() + return meta.get("code") == 1 and "reduce the amount of data" in msg + + def _export_messages(*, fields: str) -> int: + count = 0 + with tmp_dst.open("w", encoding="utf-8") as f: + for msg in client.iter_paged( + f"/{conv_id}/messages", + params={"fields": fields}, + max_pages=max_pages, + sleep_s=sleep_s, + ): + f.write(json.dumps(msg, ensure_ascii=False) + "\n") + count += 1 + tmp_dst.replace(dst) + return count + + try: + exported_messages += _export_messages(fields=messages_fields) + except GraphApiError as e: + if tmp_dst.exists(): + tmp_dst.unlink(missing_ok=True) # type: ignore[arg-type] + if _is_too_much_data_error(e): + fallback_fields = "message,created_time,from,to" + print( + f"Warning: message export for {conv_id} was too large; retrying without attachments.", + file=sys.stderr, + ) + exported_messages += _export_messages(fields=fallback_fields) + else: + raise + + print(f"[{idx}/{len(conversations)}] exported conversation {conv_id}: {dst}") + + print(f"Done. conversations={len(conversations)} messages={exported_messages} out={out}") + + +def main(argv: list[str] | None = None) -> int: + ap = argparse.ArgumentParser(description="Export Instagram DM history via Meta Graph API.") + ap.add_argument("--creds", default=DEFAULT_CREDS_PATH, help="dotenv-style creds file path") + ap.add_argument("--out", required=True, help="output directory") + ap.add_argument("--platform", default=os.getenv("META_EXPORT_PLATFORM", "instagram")) + ap.add_argument( + "--conversations-fields", + default=os.getenv("META_CONVERSATIONS_FIELDS", "participants,updated_time,message_count,unread_count"), + ) + ap.add_argument( + "--messages-fields", + default=os.getenv("META_MESSAGES_FIELDS", "message,created_time,from,to,attachments"), + ) + ap.add_argument( + "--conversations-limit", + type=int, + default=int(os.getenv("META_CONVERSATIONS_LIMIT", "25")), + help="Graph API limit for conversation listing (use 1 if listing times out).", + ) + ap.add_argument("--max-conversations", type=int, default=None) + ap.add_argument("--max-pages", type=int, default=None, help="max pages per paginated endpoint") + ap.add_argument("--sleep-s", type=float, default=0.2, help="sleep between page fetches") + args = ap.parse_args(argv) + + try: + export_history( + creds_path=args.creds, + out_dir=args.out, + platform=args.platform, + conversations_fields=args.conversations_fields, + messages_fields=args.messages_fields, + conversations_limit=args.conversations_limit, + max_conversations=args.max_conversations, + max_pages=args.max_pages, + sleep_s=args.sleep_s, + ) + except KeyError as e: + print(str(e), file=sys.stderr) + return 2 + except FileNotFoundError as e: + print(f"Creds file not found: {e}", file=sys.stderr) + return 2 + except GraphApiError as e: + print(f"Graph API error: {e}", file=sys.stderr) + return 3 + + return 0 + + +if __name__ == "__main__": + raise SystemExit(main()) diff --git a/sergio_instagram_messaging/fetch_thread_messages.py b/sergio_instagram_messaging/fetch_thread_messages.py new file mode 100644 index 0000000..d1b4505 --- /dev/null +++ b/sergio_instagram_messaging/fetch_thread_messages.py @@ -0,0 +1,123 @@ +from __future__ import annotations + +import argparse +import json +import os +import sys +from pathlib import Path +from typing import Any + +from .export_meta_ig_history import ( + DEFAULT_CREDS_PATH, + GraphApiClient, + GraphApiConfig, + GraphApiError, + load_dotenv, +) + + +def _redact(obj: Any) -> Any: + if isinstance(obj, dict): + return {k: ("" if k == "access_token" else 
_redact(v)) for k, v in obj.items()} + if isinstance(obj, list): + return [_redact(v) for v in obj] + return obj + + +def _require(env: dict[str, str], key: str) -> str: + value = (env.get(key) or "").strip() + if not value: + raise KeyError(f"Missing required config: {key}") + return value + + +def main(argv: list[str] | None = None) -> int: + ap = argparse.ArgumentParser( + description=( + "Fetch messages for a single Instagram thread by participant user id (sender.id from webhook). " + "Useful when /conversations listing times out." + ) + ) + ap.add_argument("--creds", default=DEFAULT_CREDS_PATH) + ap.add_argument("--user-id", required=True, help="Instagram-scoped user id (from webhook sender.id)") + ap.add_argument("--platform", default=os.getenv("META_EXPORT_PLATFORM", "instagram")) + ap.add_argument("--version", default=None, help="override graph api version (e.g., v20.0)") + ap.add_argument( + "--messages-fields", + default=os.getenv("META_MESSAGES_FIELDS", "message,created_time,from,to,attachments"), + ) + ap.add_argument("--max-pages", type=int, default=1, help="pages to fetch from /messages pagination") + ap.add_argument("--sleep-s", type=float, default=0.2, help="sleep between page fetches") + ap.add_argument("--out", default=None, help="write JSONL to this path (default: stdout)") + args = ap.parse_args(argv) + + env = load_dotenv(args.creds) + token = _require(env, "META_PAGE_ACCESS_TOKEN") + page_id = (env.get("META_PAGE_ID") or "").strip() + version = (args.version or env.get("META_GRAPH_API_VERSION") or "v20.0").strip() or "v20.0" + + client = GraphApiClient(GraphApiConfig(access_token=token, version=version)) + + try: + if not page_id: + me = client.get_json("/me", {"fields": "id"}) + page_id = str(me.get("id") or "").strip() + if not page_id: + raise KeyError("Missing required config: META_PAGE_ID (and token did not resolve /me.id)") + + conv = client.get_json( + f"/{page_id}/conversations", + { + "platform": args.platform, + "user_id": args.user_id, + "fields": "id,updated_time,message_count,unread_count,participants", + "limit": "1", + }, + ) + data = conv.get("data") or [] + if not isinstance(data, list) or not data or not isinstance(data[0], dict): + print("No conversation found for this user_id.", file=sys.stderr) + return 2 + conv_id = str(data[0].get("id") or "").strip() + if not conv_id: + print("Conversation result missing id.", file=sys.stderr) + return 2 + + out_f = None + if args.out: + out_path = Path(args.out) + out_path.parent.mkdir(parents=True, exist_ok=True) + out_f = out_path.open("w", encoding="utf-8") + else: + out_f = sys.stdout + + written = 0 + for msg in client.iter_paged( + f"/{conv_id}/messages", + params={"fields": args.messages_fields}, + max_pages=args.max_pages, + sleep_s=args.sleep_s, + ): + out_f.write(json.dumps(msg, ensure_ascii=False) + "\n") + written += 1 + + if args.out: + out_f.close() + + print(f"conversation_id: {conv_id} messages_written: {written}", file=sys.stderr) + return 0 + except GraphApiError as e: + payload = e.args[0] if e.args else {"error": str(e)} + print(json.dumps(_redact(payload), ensure_ascii=False), file=sys.stderr) + return 3 + except KeyError as e: + print(str(e), file=sys.stderr) + return 2 + except Exception as e: + print(f"Error: {e}", file=sys.stderr) + return 1 + + +if __name__ == "__main__": + raise SystemExit(main()) + diff --git a/sergio_instagram_messaging/generate_dm_report.py b/sergio_instagram_messaging/generate_dm_report.py new file mode 100644 index 0000000..d24fba1 --- /dev/null +++ 
b/sergio_instagram_messaging/generate_dm_report.py @@ -0,0 +1,347 @@ +from __future__ import annotations + +import argparse +import csv +import json +import os +import statistics +from dataclasses import dataclass +from pathlib import Path +from typing import Any + + +def _safe_chmod_600(path: Path) -> None: + try: + os.chmod(path, 0o600) + except Exception: + return + + +def _load_json(path: Path) -> dict[str, Any]: + return json.loads(path.read_text(encoding="utf-8", errors="replace")) + + +def _read_csv(path: Path) -> list[dict[str, str]]: + with path.open("r", encoding="utf-8", newline="") as f: + return list(csv.DictReader(f)) + + +def _count_jsonl(path: Path, *, max_lines: int = 5_000_000) -> int: + n = 0 + with path.open("r", encoding="utf-8", errors="replace") as f: + for _ in f: + n += 1 + if n >= max_lines: + break + return n + + +def _pct(x: float) -> str: + return f"{x*100:.1f}%" + + +@dataclass(frozen=True) +class ReportInputs: + summary: Path + templates: Path + bot_audit: Path + objections: Path + rescue: Path + eras: Path + training_pairs: Path + fatigue_png: Path + editorial_png: Path + + +def _resolve_inputs(analysis_dir: Path) -> ReportInputs: + return ReportInputs( + summary=analysis_dir / "summary.json", + templates=analysis_dir / "top_outgoing_templates.json", + bot_audit=analysis_dir / "bot_performance_audit.csv", + objections=analysis_dir / "objection_handlers.json", + rescue=analysis_dir / "rescue_playbook.json", + eras=analysis_dir / "sergio_eras.csv", + training_pairs=analysis_dir / "training_pairs.jsonl", + fatigue_png=analysis_dir / "bot_fatigue_chart.png", + editorial_png=analysis_dir / "editorial_timeline.png", + ) + + +def generate_report(*, analysis_dir: Path, out_path: Path) -> Path: + inp = _resolve_inputs(analysis_dir) + for p in inp.__dict__.values(): + if not Path(p).exists(): + raise FileNotFoundError(str(p)) + + summary = _load_json(inp.summary) + templates = _load_json(inp.templates) + objections = _load_json(inp.objections) + rescues = _load_json(inp.rescue) + bot_audit = _read_csv(inp.bot_audit) + + owner = summary.get("owner_name") or "Unknown" + conv = summary.get("conversations") or {} + conv_total = int(conv.get("total") or 0) + bot_only = int(conv.get("bot_only") or 0) + human = int(conv.get("human_intervened") or 0) + conversions = summary.get("conversions") or {} + conv_intent = int(conversions.get("intent") or 0) + conv_confirmed = int(conversions.get("confirmed") or 0) + + bot_only_rate = (bot_only / conv_total) if conv_total else 0.0 + human_rate = (human / conv_total) if conv_total else 0.0 + intent_rate = (conv_intent / conv_total) if conv_total else 0.0 + confirmed_rate = (conv_confirmed / conv_total) if conv_total else 0.0 + + manual_style = summary.get("manual_style") or {} + median_len = manual_style.get("median_len_chars") + p90_len = manual_style.get("p90_len_chars") + question_rate = float(manual_style.get("question_rate") or 0.0) + exclaim_rate = float(manual_style.get("exclaim_rate") or 0.0) + emoji_rate = float(manual_style.get("emoji_rate") or 0.0) + lang_guess = manual_style.get("lang_guess") or {} + + # Templates: prefer canonical strings (safe-ish) and avoid raw samples. + top_templates = templates.get("top_templates") or [] + top_bot = [t for t in top_templates if isinstance(t, dict) and t.get("label_hint") == "bot"] + top_manual = [t for t in top_templates if isinstance(t, dict) and t.get("label_hint") == "manual"] + + # Bot audit: best/worst by reply_rate. 
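+    # The audit columns consumed below ("sent", "reply_rate", "conversion_intent_rate",
+    # "conversion_confirmed_rate", "canonical_template") come from the analysis stage's
+    # bot_performance_audit.csv; fnum() treats blank or malformed cells as 0.0 so the
+    # sorts never raise on a partially filled row.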
+ def fnum(v: str | None) -> float: + try: + return float(v or 0) + except Exception: + return 0.0 + + bot_audit_sorted = sorted(bot_audit, key=lambda r: fnum(r.get("sent")), reverse=True) + top_audit = bot_audit_sorted[:10] + best_reply = sorted(bot_audit, key=lambda r: fnum(r.get("reply_rate")), reverse=True)[:10] + worst_reply = sorted(bot_audit, key=lambda r: fnum(r.get("reply_rate")))[:10] + + # Objections: most common replies per category. + objection_blocks: list[str] = [] + if isinstance(objections, dict): + for cat in ("price", "time", "trust", "stop"): + replies = objections.get(cat) or [] + if not isinstance(replies, list) or not replies: + continue + top3 = [] + for r in replies[:3]: + if not isinstance(r, dict): + continue + top3.append(f"- ({r.get('count')}) {r.get('reply')}") + if top3: + objection_blocks.append(f"### {cat}\n" + "\n".join(top3)) + + rescue_count = len(rescues) if isinstance(rescues, list) else 0 + pairs_count = _count_jsonl(inp.training_pairs, max_lines=2_000_000) + + # Era summary: simple high-level notes. + eras_rows = _read_csv(inp.eras) + era_recent = eras_rows[-6:] if len(eras_rows) > 6 else eras_rows + era_offer_terms: list[str] = [] + for row in era_recent: + offers = (row.get("top_offers") or "").strip() + if offers: + era_offer_terms.append(offers) + + # A few derived notes. + lang_line = ", ".join(f"{k}={v}" for k, v in lang_guess.items()) + + # Summarize bot fatigue trend from image existence only (analysis already made it). + report = [] + report.append("# Socialmediatorr Instagram DM History — Human Readable Report (English)") + report.append("") + report.append(f"- Generated: `{summary.get('generated_at')}`") + report.append(f"- Owner name used: `{owner}`") + report.append("") + + report.append("## 1) What This Dataset Represents") + report.append("") + report.append( + "This is an all-time audit of Instagram DM conversations for `@socialmediatorr`, focused on extracting repeatable sales + support behavior so an AI agent can reply in Sergio’s style." + ) + report.append( + "The analysis treats the account as a hybrid system: frequent repeated templates (likely automation/scripts) plus lower-frequency custom replies (human Sergio)." + ) + report.append("") + + report.append("## 2) High-Level Metrics (All-Time)") + report.append("") + report.append(f"- Conversations analyzed: **{conv_total:,}**") + report.append(f"- Bot-only conversations: **{bot_only:,}** ({_pct(bot_only_rate)})") + report.append(f"- Human-intervened conversations: **{human:,}** ({_pct(human_rate)})") + report.append(f"- Conversion (intent signals): **{conv_intent:,}** ({_pct(intent_rate)})") + report.append(f"- Conversion (confirmed signals): **{conv_confirmed:,}** ({_pct(confirmed_rate)})") + report.append("") + report.append( + "Notes on conversion: this uses heuristics (keywords + payment/link mentions). It is directionally useful for ranking scripts, but it is not a ground-truth revenue ledger." 
+ ) + report.append("") + + report.append("## 3) Sergio Persona (From Manual/Hybrid Replies)") + report.append("") + report.append(f"- Typical reply length: median **{median_len}** chars (p90 **{p90_len}**)") + report.append(f"- Questions: **{_pct(question_rate)}** | Exclamations: **{_pct(exclaim_rate)}** | Emoji: **{_pct(emoji_rate)}**") + report.append(f"- Language guess (manual replies): {lang_line or 'n/a'}") + report.append("") + report.append("Practical implication for an agent: short, direct replies; minimal punctuation; bilingual capability; low/no emoji usage.") + report.append("") + + report.append("## 4) Bot vs Human Segmentation (What It Means)") + report.append("") + report.append( + "- **[BOT]** = outgoing message template repeated frequently (>= configured threshold).\n" + "- **[MANUAL]** = outgoing message that is rare/unique (<= configured threshold).\n" + "- **[HYBRID]** = messages that look like a bot template but with manual edits (prefix match/similarity)." + ) + report.append("") + report.append( + "This separation is the foundation for: (1) extracting safe reusable scripts, and (2) extracting human-only replies as training data for a RAG or fine-tune." + ) + report.append("") + + report.append("## 5) Top Detected Script Templates (Canonicalized)") + report.append("") + if top_bot: + for i, t in enumerate(top_bot[:10], 1): + canon = (t.get("canonical") or "").strip() + count = int(t.get("count") or 0) + report.append(f"- BOT #{i}: sent **{count}**× — `{canon[:160]}`") + else: + report.append("- (No high-frequency bot templates detected with current thresholds.)") + report.append("") + + report.append("## 6) Human Reply Library (Rare/Manual Examples, Canonicalized)") + report.append("") + if top_manual: + for i, t in enumerate(top_manual[:10], 1): + canon = (t.get("canonical") or "").strip() + count = int(t.get("count") or 0) + report.append(f"- MANUAL-ish #{i}: seen **{count}**× — `{canon[:160]}`") + else: + report.append("- (No low-frequency manual templates included in the cached top list.)") + report.append("") + + report.append("## 7) Bot Template Performance (Reply/Conversion Heuristics)") + report.append("") + report.append("These come from `bot_performance_audit.csv` and are computed per canonical bot template.") + report.append("") + if top_audit: + report.append("### Most-used bot templates (by volume)") + for r in top_audit[:8]: + report.append( + f"- sent={r.get('sent')} reply_rate={r.get('reply_rate')} intent_rate={r.get('conversion_intent_rate')} confirmed_rate={r.get('conversion_confirmed_rate')} — `{(r.get('canonical_template') or '')[:140]}`" + ) + report.append("") + if best_reply: + report.append("### Best reply-rate bot templates") + for r in best_reply[:8]: + report.append(f"- reply_rate={r.get('reply_rate')} sent={r.get('sent')} — `{(r.get('canonical_template') or '')[:140]}`") + report.append("") + if worst_reply: + report.append("### Worst reply-rate bot templates") + for r in worst_reply[:8]: + report.append(f"- reply_rate={r.get('reply_rate')} sent={r.get('sent')} — `{(r.get('canonical_template') or '')[:140]}`") + report.append("") + + report.append("## 8) Objections → Best Sergio Replies (Playbook)") + report.append("") + if objection_blocks: + report.extend(objection_blocks) + else: + report.append("- No objection handlers detected with current keyword rules.") + report.append("") + + report.append("## 9) Rescue / Save Logic (Human Intervention After Silence/Negativity)") + report.append("") + report.append(f"- Rescue events detected 
(heuristic): **{rescue_count:,}**") + report.append( + "A “rescue” is when a manual/hybrid owner message follows either (a) a user negative signal, or (b) >24h silence after a bot message, and the thread later shows a confirmed conversion signal." + ) + report.append("") + + report.append("## 10) Product / Offer Evolution (Eras)") + report.append("") + report.append( + "This is inferred from mentions of pricing/currency + offer terms (e.g., call/audit/coaching) and summarized quarterly." + ) + report.append("") + if era_offer_terms: + report.append("Recent quarters (top extracted offer signals):") + for line in era_offer_terms: + report.append(f"- {line}") + else: + report.append("- No offer signals detected in the most recent quarters with current extraction rules.") + report.append("") + + report.append("## 11) Charts") + report.append("") + report.append(f"- Bot fatigue (weekly reply rate to the dominant bot script): `{inp.fatigue_png}`") + report.append(f"- Editorial timeline (top bot scripts vs conversions): `{inp.editorial_png}`") + report.append("") + + report.append("## 12) What To Build From This (Agent Requirements)") + report.append("") + report.append("### Core behavior") + report.append("- Start with top bot templates for predictable openers and FAQ-style flows.") + report.append("- Switch to Sergio-style manual patterns on objections, negotiation, or when conversation stalls.") + report.append("- Use a rescue cadence (time-based triggers) after silence.") + report.append("") + report.append("### Data products to drive the agent") + report.append(f"- Training pairs (manual-only, converted threads): `{inp.training_pairs}` (rows: ~{pairs_count:,})") + report.append(f"- Objection handlers: `{inp.objections}`") + report.append(f"- Rescue playbook: `{inp.rescue}`") + report.append(f"- Script templates + editorial drift: `{inp.templates}`") + report.append("") + report.append("### Safety boundaries (recommended)") + report.append("- Never request or store passwords/2FA codes.") + report.append("- Avoid medical/legal/financial advice; redirect to a call or a human.") + report.append("- If user asks to move off-platform, follow Sergio’s historical policy and business rules.") + report.append("") + + report.append("## 13) What We Do NOT Need To Know (Ignore / Do Not Store)") + report.append("") + report.append("- Exact client identities (names, handles, phone numbers, emails) unless required for operational routing.") + report.append("- Media attachments (photos/videos/audio) for persona cloning; they add storage cost and privacy risk.") + report.append("- Full verbatim message dumps for every thread; for RAG you only need high-quality pairs and playbook snippets.") + report.append("- Individual one-off edge cases that never repeat (unless they represent a safety boundary).") + report.append("- Internal Meta export folder structure details beyond `messages/inbox/**/message*.json`.") + report.append("") + + report.append("## 14) Caveats / Gaps") + report.append("") + report.append("- The export does not reliably label ManyChat vs Human; bot/human is inferred by repetition and similarity.") + report.append("- Conversion is heuristic; integrate Stripe/Calendly/CRM events if you want ground-truth attribution.") + report.append("- Language detection is heuristic; improve it if you need precise bilingual routing.") + report.append("") + + out_path.parent.mkdir(parents=True, exist_ok=True) + out_path.write_text("\n".join(report) + "\n", encoding="utf-8") + _safe_chmod_600(out_path) + return 
out_path + + +def main(argv: list[str] | None = None) -> int: + ap = argparse.ArgumentParser(description="Generate a human-readable English report from analyze_instagram_export outputs.") + ap.add_argument("--analysis-dir", required=True, help="directory produced by analyze_instagram_export (contains summary.json)") + ap.add_argument("--out", default=None, help="output markdown path (default: /dm_history_report_en.md)") + args = ap.parse_args(argv) + + analysis_dir = Path(args.analysis_dir) + out_path = Path(args.out) if args.out else (analysis_dir / "dm_history_report_en.md") + try: + p = generate_report(analysis_dir=analysis_dir, out_path=out_path) + print(json.dumps({"ok": True, "out": str(p)}, ensure_ascii=False)) + return 0 + except FileNotFoundError as e: + print(f"Missing required input: {e}", file=os.sys.stderr) + return 2 + except Exception as e: + print(f"Report generation failed: {e}", file=os.sys.stderr) + return 1 + + +if __name__ == "__main__": + raise SystemExit(main()) + diff --git a/sergio_instagram_messaging/import_instagram_export.py b/sergio_instagram_messaging/import_instagram_export.py new file mode 100644 index 0000000..a386994 --- /dev/null +++ b/sergio_instagram_messaging/import_instagram_export.py @@ -0,0 +1,347 @@ +from __future__ import annotations + +import argparse +import hashlib +import json +import os +import re +import sys +from dataclasses import dataclass +from datetime import datetime, timezone +from pathlib import Path +from typing import Any, Iterable +from zipfile import ZipFile + + +def _now_utc_iso() -> str: + return datetime.now(timezone.utc).replace(microsecond=0).isoformat() + + +def _ts_ms_to_iso(ts_ms: int | None) -> str | None: + if not ts_ms: + return None + try: + return datetime.fromtimestamp(ts_ms / 1000.0, tz=timezone.utc).replace(microsecond=0).isoformat() + except Exception: + return None + + +def _safe_slug(value: str, *, max_len: int = 120) -> str: + raw = value.strip() + base = re.sub(r"[^A-Za-z0-9._-]+", "_", raw).strip("_.-") + if not base: + base = "conversation" + if len(base) > max_len: + base = base[:max_len] + digest = hashlib.sha1(raw.encode("utf-8", errors="ignore")).hexdigest()[:10] + return f"{base}__{digest}" + + +def _extract_text(msg: dict[str, Any]) -> str | None: + for key in ("content", "text", "message"): + value = msg.get(key) + if isinstance(value, str) and value.strip(): + return value + + share = msg.get("share") + if isinstance(share, dict): + parts: list[str] = [] + for k in ("link", "share_text", "original_content_owner", "share_text"): + v = share.get(k) + if isinstance(v, str) and v.strip(): + parts.append(v.strip()) + if parts: + return " | ".join(parts) + + return None + + +def _extract_attachments(msg: dict[str, Any]) -> list[dict[str, Any]]: + out: list[dict[str, Any]] = [] + + def add(kind: str, items: Any) -> None: + if not isinstance(items, list): + return + for it in items: + if not isinstance(it, dict): + continue + uri = it.get("uri") or it.get("url") + entry = {"type": kind} + if isinstance(uri, str) and uri: + entry["uri"] = uri + for k in ("creation_timestamp", "timestamp_ms", "duration"): + if k in it: + entry[k] = it.get(k) + out.append(entry) + + add("photo", msg.get("photos")) + add("video", msg.get("videos")) + add("audio", msg.get("audio_files")) + add("file", msg.get("files")) + add("gif", msg.get("gifs")) + add("sticker", msg.get("sticker")) + + return out + + +@dataclass(frozen=True) +class ConversationExport: + conv_id: str + title: str | None + participants: list[str] + messages: 
list[dict[str, Any]] + + +def _participants_list(obj: Any) -> list[str]: + out: list[str] = [] + if not isinstance(obj, list): + return out + for item in obj: + if isinstance(item, dict): + for k in ("name", "username"): + v = item.get(k) + if isinstance(v, str) and v.strip(): + out.append(v.strip()) + break + elif isinstance(item, str) and item.strip(): + out.append(item.strip()) + # stable, de-duped + seen: set[str] = set() + unique: list[str] = [] + for name in out: + if name in seen: + continue + unique.append(name) + seen.add(name) + return unique + + +def _message_sort_key(msg: dict[str, Any]) -> int: + ts = msg.get("timestamp_ms") + if isinstance(ts, int): + return ts + if isinstance(ts, float): + return int(ts) + return 0 + + +def _load_conversation_parts(parts: Iterable[tuple[str, Any]]) -> ConversationExport: + title: str | None = None + conv_id: str | None = None + participants: list[str] = [] + messages: list[dict[str, Any]] = [] + + for _name, data in parts: + if not isinstance(data, dict): + continue + if title is None and isinstance(data.get("title"), str): + title = data.get("title") + if conv_id is None: + for key in ("thread_path", "threadPath", "thread_id", "threadId"): + v = data.get(key) + if isinstance(v, str) and v.strip(): + conv_id = v.strip() + break + if not participants: + participants = _participants_list(data.get("participants")) + msgs = data.get("messages") + if isinstance(msgs, list): + for m in msgs: + if isinstance(m, dict): + messages.append(m) + + if conv_id is None: + conv_id = "unknown" + + messages.sort(key=_message_sort_key) + return ConversationExport(conv_id=conv_id, title=title, participants=participants, messages=messages) + + +def _dir_conversation_groups(root: Path) -> dict[Path, list[Path]]: + """ + Group Instagram export message part files by conversation folder. + + The export can contain huge attachment subtrees (photos/videos). To avoid + walking those, prefer `messages/inbox/*` and only glob `message*.json` in the + top level of each conversation folder. + """ + inbox_root = root / "messages" / "inbox" + if inbox_root.is_dir(): + groups: dict[Path, list[Path]] = {} + for conv_dir in inbox_root.iterdir(): + if not conv_dir.is_dir(): + continue + parts = sorted(conv_dir.glob("message*.json"), key=lambda p: p.name) + if parts: + groups[conv_dir] = parts + return groups + + # Fallback: legacy/unknown layout; wider scan but still filter to messages/inbox. 
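+    # Layout this fallback still assumes (folder names are illustrative):
+    #   <root>/**/messages/inbox/<conversation>/message_1.json
+    #   <root>/**/messages/inbox/<conversation>/message_2.json
+    # Attachment files are never selected because the glob only matches message*.json,
+    # but the recursive walk still visits media folders, which is why this path is slower.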
+ groups: dict[Path, list[Path]] = {} + for path in root.rglob("message*.json"): + try: + rel = path.relative_to(root) + except Exception: + continue + rel_lower = str(rel).lower() + if "/messages/inbox/" not in rel_lower.replace("\\", "/"): + continue + if path.is_dir(): + continue + groups.setdefault(path.parent, []).append(path) + for folder, files in groups.items(): + files.sort(key=lambda p: p.name) + return groups + + +def _zip_conversation_groups(z: ZipFile) -> dict[str, list[str]]: + groups: dict[str, list[str]] = {} + for name in z.namelist(): + lower = name.lower() + if not lower.endswith(".json"): + continue + if "/messages/inbox/" not in lower: + continue + base = os.path.basename(lower) + if not base.startswith("message"): + continue + folder = os.path.dirname(name) + groups.setdefault(folder, []).append(name) + for folder, files in groups.items(): + files.sort(key=lambda n: os.path.basename(n)) + return groups + + +def import_instagram_export(*, input_path: str, out_dir: str) -> None: + src = Path(input_path) + out = Path(out_dir) + out.mkdir(parents=True, exist_ok=True) + (out / "messages").mkdir(parents=True, exist_ok=True) + + conversations_out: list[dict[str, Any]] = [] + total_messages = 0 + + if src.is_file() and src.suffix.lower() == ".zip": + with ZipFile(src) as z: + groups = _zip_conversation_groups(z) + for folder, files in sorted(groups.items(), key=lambda kv: kv[0]): + parts: list[tuple[str, Any]] = [] + for name in files: + with z.open(name) as f: + parts.append((name, json.load(f))) + conv = _load_conversation_parts(parts) + conv_key = conv.conv_id if conv.conv_id != "unknown" else folder + safe = _safe_slug(conv_key) + dst = out / "messages" / f"{safe}.jsonl" + latest_ts_ms: int | None = None + with dst.open("w", encoding="utf-8") as f: + for m in conv.messages: + ts_ms = m.get("timestamp_ms") if isinstance(m.get("timestamp_ms"), int) else None + latest_ts_ms = ts_ms if (ts_ms and (latest_ts_ms is None or ts_ms > latest_ts_ms)) else latest_ts_ms + row = { + "source": "instagram_export", + "conversation_id": conv_key, + "conversation_title": conv.title, + "participants": conv.participants, + "timestamp_ms": ts_ms, + "created_time": _ts_ms_to_iso(ts_ms), + "sender": m.get("sender_name"), + "text": _extract_text(m), + "type": m.get("type"), + "attachments": _extract_attachments(m), + } + f.write(json.dumps(row, ensure_ascii=False) + "\n") + total_messages += 1 + conversations_out.append( + { + "id": conv_key, + "title": conv.title, + "participants": conv.participants, + "message_count": len(conv.messages), + "updated_time": _ts_ms_to_iso(latest_ts_ms), + "messages_file": f"messages/{safe}.jsonl", + } + ) + elif src.is_dir(): + groups = _dir_conversation_groups(src) + for folder, files in sorted(groups.items(), key=lambda kv: str(kv[0])): + parts: list[tuple[str, Any]] = [] + for p in files: + try: + parts.append((str(p), json.loads(p.read_text(encoding="utf-8", errors="replace")))) + except Exception: + continue + conv = _load_conversation_parts(parts) + conv_key = conv.conv_id if conv.conv_id != "unknown" else folder.name + safe = _safe_slug(conv_key) + dst = out / "messages" / f"{safe}.jsonl" + latest_ts_ms: int | None = None + with dst.open("w", encoding="utf-8") as f: + for m in conv.messages: + ts_ms = m.get("timestamp_ms") if isinstance(m.get("timestamp_ms"), int) else None + latest_ts_ms = ts_ms if (ts_ms and (latest_ts_ms is None or ts_ms > latest_ts_ms)) else latest_ts_ms + row = { + "source": "instagram_export", + "conversation_id": conv_key, + 
"conversation_title": conv.title, + "participants": conv.participants, + "timestamp_ms": ts_ms, + "created_time": _ts_ms_to_iso(ts_ms), + "sender": m.get("sender_name"), + "text": _extract_text(m), + "type": m.get("type"), + "attachments": _extract_attachments(m), + } + f.write(json.dumps(row, ensure_ascii=False) + "\n") + total_messages += 1 + conversations_out.append( + { + "id": conv_key, + "title": conv.title, + "participants": conv.participants, + "message_count": len(conv.messages), + "updated_time": _ts_ms_to_iso(latest_ts_ms), + "messages_file": f"messages/{safe}.jsonl", + } + ) + else: + raise SystemExit("Input must be an Instagram data export .zip file or an extracted export directory.") + + (out / "conversations.json").write_text(json.dumps({"data": conversations_out}, indent=2) + "\n") + (out / "export_metadata.json").write_text( + json.dumps( + { + "exported_at": _now_utc_iso(), + "source": "instagram_export", + "input": str(src), + "conversations": len(conversations_out), + "messages": total_messages, + }, + indent=2, + ) + + "\n" + ) + + print(f"Done. conversations={len(conversations_out)} messages={total_messages} out={out}") + + +def main(argv: list[str] | None = None) -> int: + ap = argparse.ArgumentParser(description="Import Instagram 'Download your information' message export (zip or folder).") + ap.add_argument("--input", required=True, help="path to export .zip or extracted export directory") + ap.add_argument("--out", required=True, help="output directory") + args = ap.parse_args(argv) + + try: + import_instagram_export(input_path=args.input, out_dir=args.out) + except KeyboardInterrupt: + return 130 + except SystemExit: + raise + except Exception as e: + print(f"Import failed: {e}", file=sys.stderr) + return 1 + return 0 + + +if __name__ == "__main__": + raise SystemExit(main()) diff --git a/sergio_instagram_messaging/index_instagram_export.py b/sergio_instagram_messaging/index_instagram_export.py new file mode 100644 index 0000000..c866c72 --- /dev/null +++ b/sergio_instagram_messaging/index_instagram_export.py @@ -0,0 +1,214 @@ +from __future__ import annotations + +import argparse +import json +import re +import sys +import time +from collections import Counter +from datetime import datetime, timezone +from pathlib import Path + + +def _now_utc_iso() -> str: + return datetime.now(timezone.utc).replace(microsecond=0).isoformat() + + +def _decode_json_str(raw: bytes) -> str: + """ + Decode a JSON string fragment (no surrounding quotes) into a Python string. + Handles escape sequences. 
+ """ + try: + return json.loads(b'"' + raw + b'"') + except Exception: + return raw.decode("utf-8", errors="replace") + + +def _resolve_inbox_root(input_path: Path) -> Path: + """ + Accept either: + - export root directory (contains messages/inbox) + - the inbox directory itself + """ + p = input_path + if p.is_dir() and p.name.lower() == "inbox": + return p + candidate = p / "messages" / "inbox" + if candidate.is_dir(): + return candidate + raise FileNotFoundError(f"Could not find messages/inbox under: {p}") + + +def index_export( + *, + input_path: Path, + out_path: Path, + chunk_kb: int, + fallback_chunk_kb: int, + max_conversations: int | None, + progress_every: int, +) -> dict: + inbox_root = _resolve_inbox_root(input_path) + out_path.parent.mkdir(parents=True, exist_ok=True) + + re_ts = re.compile(rb'"timestamp_ms"\s*:\s*(\d{10,})') + re_sender = re.compile(rb'"sender_name"\s*:\s*"((?:[^"\\]|\\.)*)"') + re_title = re.compile(rb'"title"\s*:\s*"((?:[^"\\]|\\.)*)"') + re_thread = re.compile(rb'"thread_path"\s*:\s*"((?:[^"\\]|\\.)*)"') + + scanned = 0 + indexed = 0 + missing = 0 + errors = 0 + + min_ts: int | None = None + max_ts: int | None = None + sender_counts: Counter[str] = Counter() + + started = time.time() + + with out_path.open("w", encoding="utf-8") as out: + for conv_dir in inbox_root.iterdir(): + if not conv_dir.is_dir(): + continue + scanned += 1 + if max_conversations is not None and scanned > max_conversations: + break + + msg1 = conv_dir / "message_1.json" + if not msg1.exists(): + candidates = sorted(conv_dir.glob("message*.json"), key=lambda p: p.name) + msg1 = candidates[0] if candidates else msg1 + + if not msg1.exists(): + missing += 1 + continue + + head = b"" + try: + with msg1.open("rb") as f: + head = f.read(max(1, chunk_kb) * 1024) + except Exception: + errors += 1 + continue + + m_ts = re_ts.search(head) + if not m_ts and fallback_chunk_kb > chunk_kb: + try: + with msg1.open("rb") as f: + head = f.read(max(1, fallback_chunk_kb) * 1024) + except Exception: + errors += 1 + continue + m_ts = re_ts.search(head) + + if not m_ts: + errors += 1 + continue + + try: + ts_ms = int(m_ts.group(1)) + except Exception: + errors += 1 + continue + + sender: str | None = None + m_sender = re_sender.search(head) + if m_sender: + sender = _decode_json_str(m_sender.group(1)) + + title: str | None = None + m_title = re_title.search(head) + if m_title: + title = _decode_json_str(m_title.group(1)) + + thread_path: str | None = None + m_thread = re_thread.search(head) + if m_thread: + thread_path = _decode_json_str(m_thread.group(1)) + + indexed += 1 + if sender: + sender_counts[sender] += 1 + + min_ts = ts_ms if min_ts is None or ts_ms < min_ts else min_ts + max_ts = ts_ms if max_ts is None or ts_ms > max_ts else max_ts + + out.write( + json.dumps( + { + "conv_dir": conv_dir.name, + "message_file": msg1.name, + "thread_path": thread_path, + "title": title, + "latest_timestamp_ms": ts_ms, + "latest_sender": sender, + }, + ensure_ascii=False, + ) + + "\n" + ) + + if progress_every and scanned % progress_every == 0: + elapsed = int(time.time() - started) + print( + f"scanned={scanned} indexed={indexed} missing={missing} errors={errors} elapsed_s={elapsed}", + file=sys.stderr, + flush=True, + ) + + def _iso(ts: int | None) -> str | None: + if ts is None: + return None + return datetime.fromtimestamp(ts / 1000, tz=timezone.utc).replace(microsecond=0).isoformat() + + return { + "exported_at": _now_utc_iso(), + "input": str(input_path), + "inbox_root": str(inbox_root), + "out": 
str(out_path), + "scanned_dirs": scanned, + "indexed_conversations": indexed, + "missing_message_json": missing, + "errors": errors, + "latest_time_utc": _iso(max_ts), + "oldest_time_utc": _iso(min_ts), + "top_latest_senders": sender_counts.most_common(20), + "elapsed_s": int(time.time() - started), + } + + +def main(argv: list[str] | None = None) -> int: + ap = argparse.ArgumentParser(description="Index Instagram 'Download your information' export (fast scan).") + ap.add_argument("--input", required=True, help="export root directory (contains messages/inbox) or inbox dir") + ap.add_argument("--out", required=True, help="output index JSONL path") + ap.add_argument("--chunk-kb", type=int, default=64, help="bytes to read from each message file (KB)") + ap.add_argument("--fallback-chunk-kb", type=int, default=256, help="retry read size if timestamp not found (KB)") + ap.add_argument("--max-conversations", type=int, default=None, help="stop after this many conversation folders") + ap.add_argument("--progress-every", type=int, default=1000, help="progress log frequency (stderr)") + args = ap.parse_args(argv) + + try: + meta = index_export( + input_path=Path(args.input), + out_path=Path(args.out), + chunk_kb=int(args.chunk_kb), + fallback_chunk_kb=int(args.fallback_chunk_kb), + max_conversations=(int(args.max_conversations) if args.max_conversations else None), + progress_every=int(args.progress_every), + ) + meta_path = Path(args.out).with_suffix(".metadata.json") + meta_path.write_text(json.dumps(meta, indent=2, ensure_ascii=False) + "\n") + print(json.dumps(meta, indent=2, ensure_ascii=False)) + return 0 + except FileNotFoundError as e: + print(str(e), file=sys.stderr) + return 2 + except KeyboardInterrupt: + return 130 + + +if __name__ == "__main__": + raise SystemExit(main()) + diff --git a/sergio_instagram_messaging/meta_device_login.py b/sergio_instagram_messaging/meta_device_login.py new file mode 100644 index 0000000..5013724 --- /dev/null +++ b/sergio_instagram_messaging/meta_device_login.py @@ -0,0 +1,366 @@ +from __future__ import annotations + +import argparse +import json +import sys +import time +import urllib.error +import urllib.parse +import urllib.request +from dataclasses import dataclass +from datetime import datetime, timezone +from pathlib import Path +from typing import Any + +from .export_meta_ig_history import DEFAULT_CREDS_PATH, load_dotenv + + +DEFAULT_STATE_PATH = "/root/tmp/meta-device-login-state.json" + + +def _now_ts() -> int: + return int(datetime.now(timezone.utc).timestamp()) + + +def _post_form(url: str, data: dict[str, str]) -> dict[str, Any]: + body = urllib.parse.urlencode(data).encode("utf-8") + req = urllib.request.Request(url, data=body, method="POST") + req.add_header("Content-Type", "application/x-www-form-urlencoded") + try: + with urllib.request.urlopen(req, timeout=60) as r: + return json.loads(r.read().decode("utf-8")) + except urllib.error.HTTPError as e: + raise SystemExit(_format_meta_http_error(e)) + + +def _get_json(url: str, params: dict[str, str]) -> dict[str, Any]: + full = f"{url}?{urllib.parse.urlencode(params)}" + try: + with urllib.request.urlopen(full, timeout=60) as r: + return json.loads(r.read().decode("utf-8")) + except urllib.error.HTTPError as e: + raise SystemExit(_format_meta_http_error(e)) + + +def _format_meta_http_error(e: urllib.error.HTTPError) -> str: + body = e.read().decode("utf-8", errors="replace") + try: + payload = json.loads(body) + except Exception: + payload = {"raw": body[:1000]} + + def redact(obj: Any) -> 
Any: + if isinstance(obj, dict): + return {k: ("" if k == "access_token" else redact(v)) for k, v in obj.items()} + if isinstance(obj, list): + return [redact(v) for v in obj] + return obj + + err = payload.get("error", payload) + err = redact(err) + return f"Meta API request failed (HTTP {e.code}): {json.dumps(err, ensure_ascii=False)}" + + +def _write_dotenv_kv(path: str, updates: dict[str, str]) -> None: + p = Path(path) + lines: list[str] = [] + if p.exists(): + lines = p.read_text(errors="replace").splitlines() + + seen: set[str] = set() + out: list[str] = [] + for raw in lines: + line = raw.rstrip("\n") + stripped = line.strip() + if not stripped or stripped.startswith("#") or "=" not in stripped: + out.append(line) + continue + key, _value = stripped.split("=", 1) + key = key.strip() + if key in updates: + out.append(f"{key}={updates[key]}") + seen.add(key) + else: + out.append(line) + + for key, value in updates.items(): + if key in seen: + continue + out.append(f"{key}={value}") + + tmp = p.with_name(p.name + ".tmp") + tmp.write_text("\n".join(out).rstrip("\n") + "\n") + tmp.chmod(0o600) + tmp.replace(p) + + +@dataclass(frozen=True) +class AppCreds: + app_id: str + app_secret: str | None + client_token: str | None + graph_version: str + + @property + def app_access_token(self) -> str: + if not self.app_secret: + raise SystemExit("Missing META_APP_SECRET in creds file.") + return f"{self.app_id}|{self.app_secret}" + + @property + def client_access_token(self) -> str: + if not self.client_token: + raise SystemExit( + "Missing META_CLIENT_TOKEN in creds file. " + "For Facebook Login for Devices, /device/login requires a client access token: APP_ID|CLIENT_TOKEN." + ) + return f"{self.app_id}|{self.client_token}" + + @property + def base(self) -> str: + return f"https://graph.facebook.com/{self.graph_version}" + + +def _load_app_creds(creds_path: str) -> AppCreds: + env = load_dotenv(creds_path) + app_id = (env.get("META_APP_ID") or "").strip() + app_secret = (env.get("META_APP_SECRET") or "").strip() or None + client_token = (env.get("META_CLIENT_TOKEN") or "").strip() or None + client_access_token = (env.get("META_CLIENT_ACCESS_TOKEN") or "").strip() + if client_access_token and "|" in client_access_token: + parsed_app_id, parsed_client_token = client_access_token.split("|", 1) + if parsed_app_id.strip(): + app_id = parsed_app_id.strip() + if parsed_client_token.strip(): + client_token = parsed_client_token.strip() + graph_version = (env.get("META_GRAPH_API_VERSION") or "v20.0").strip() or "v20.0" + if not app_id: + raise SystemExit("Missing META_APP_ID in creds file.") + return AppCreds(app_id=app_id, app_secret=app_secret, client_token=client_token, graph_version=graph_version) + + +def cmd_start(*, creds_path: str, state_path: str, scope: str) -> int: + creds = _load_app_creds(creds_path) + print(f"using_app_id: {creds.app_id} graph_version: {creds.graph_version}") + resp = _post_form( + f"{creds.base}/device/login", + {"access_token": creds.client_access_token, "scope": scope}, + ) + + required = ["code", "user_code", "verification_uri", "expires_in", "interval"] + missing = [k for k in required if k not in resp] + if missing: + raise SystemExit(f"Unexpected response from device/login; missing {missing}: {resp}") + + expires_at = _now_ts() + int(resp["expires_in"]) + state = { + "created_at": _now_ts(), + "expires_at": expires_at, + "interval": int(resp["interval"]), + "code": resp["code"], + "user_code": resp["user_code"], + "verification_uri": resp["verification_uri"], + 
"scope": scope, + "app_id": creds.app_id, + "graph_version": creds.graph_version, + } + + sp = Path(state_path) + sp.parent.mkdir(parents=True, exist_ok=True) + sp.write_text(json.dumps(state, indent=2) + "\n") + sp.chmod(0o600) + + print("Device login started.") + print(f"1) Open: {state['verification_uri']}") + print(f"2) Enter code: {state['user_code']}") + print(f"3) Then run: python3 -m sergio_instagram_messaging.meta_device_login poll --state {state_path}") + print(f"(expires at unix ts: {expires_at})") + return 0 + + +def _read_state(state_path: str) -> dict[str, Any]: + sp = Path(state_path) + if not sp.exists(): + raise SystemExit(f"State file not found: {state_path}") + return json.loads(sp.read_text()) + + +def _exchange_long_lived(*, creds: AppCreds, short_token: str) -> str: + if not creds.app_secret: + raise SystemExit("Missing META_APP_SECRET in creds file; cannot exchange a long-lived user token.") + resp = _get_json( + f"{creds.base}/oauth/access_token", + { + "grant_type": "fb_exchange_token", + "client_id": creds.app_id, + "client_secret": creds.app_secret, + "fb_exchange_token": short_token, + }, + ) + token = (resp.get("access_token") or "").strip() + if not token: + raise SystemExit(f"Token exchange failed: {resp}") + return token + + +def _discover_page_and_token(*, creds: AppCreds, user_token: str, target_ig_user_id: str | None) -> dict[str, str]: + # Request page access tokens; DO NOT print them. + payload = _get_json( + f"{creds.base}/me/accounts", + { + "fields": "id,name,access_token,instagram_business_account", + "access_token": user_token, + }, + ) + data = payload.get("data") or [] + if not isinstance(data, list) or not data: + raise SystemExit("No pages found for this user token (me/accounts returned empty).") + + chosen: dict[str, Any] | None = None + if target_ig_user_id: + for p in data: + if not isinstance(p, dict): + continue + iba = p.get("instagram_business_account") or {} + if isinstance(iba, dict) and str(iba.get("id") or "") == str(target_ig_user_id): + chosen = p + break + + if chosen is None: + # Fall back: first page with an Instagram business account attached. + for p in data: + if not isinstance(p, dict): + continue + if p.get("instagram_business_account"): + chosen = p + break + + if chosen is None: + raise SystemExit( + "Could not find a Page with an attached instagram_business_account. " + "Connect the Instagram professional account to a Facebook Page first." 
+ ) + + page_id = str(chosen.get("id") or "").strip() + page_token = str(chosen.get("access_token") or "").strip() + if not page_id or not page_token: + raise SystemExit("Selected page is missing id/access_token in response.") + + iba = chosen.get("instagram_business_account") or {} + ig_id = str(iba.get("id") or "").strip() if isinstance(iba, dict) else "" + + return { + "META_PAGE_ID": page_id, + "META_PAGE_ACCESS_TOKEN": page_token, + **({"META_IG_USER_ID": ig_id} if ig_id else {}), + } + + +def cmd_poll( + *, + creds_path: str, + state_path: str, + write_page_token: bool, + target_ig_user_id: str | None, + max_wait_s: int, +) -> int: + state = _read_state(state_path) + creds = _load_app_creds(creds_path) + print(f"using_app_id: {creds.app_id} graph_version: {creds.graph_version}") + + expires_at = int(state.get("expires_at") or 0) + interval = int(state.get("interval") or 5) + code = str(state.get("code") or "").strip() + if not code: + raise SystemExit("Invalid state file: missing code.") + + deadline = min(expires_at, _now_ts() + max_wait_s) + while _now_ts() < deadline: + resp = _post_form( + f"{creds.base}/device/login_status", + {"access_token": creds.client_access_token, "code": code}, + ) + + token = (resp.get("access_token") or "").strip() + if token: + # Store short-lived and long-lived user tokens. + updates = {"META_USER_ACCESS_TOKEN": token} + try: + updates["META_USER_ACCESS_TOKEN_LONG"] = _exchange_long_lived(creds=creds, short_token=token) + except BaseException: + # Not fatal; keep short-lived token. + pass + + user_token = updates.get("META_USER_ACCESS_TOKEN_LONG") or updates["META_USER_ACCESS_TOKEN"] + + if write_page_token: + try: + updates.update( + _discover_page_and_token( + creds=creds, + user_token=user_token, + target_ig_user_id=target_ig_user_id, + ) + ) + except SystemExit as e: + print( + "Warning: could not discover META_PAGE_ID/META_PAGE_ACCESS_TOKEN from this user token.\n" + f"Reason: {e}\n" + "Saved user token(s) only; connect the Instagram professional account to a Facebook Page, " + "then run meta_page_token_from_user_token.", + file=sys.stderr, + ) + + _write_dotenv_kv(creds_path, updates) + print(f"Saved tokens to {creds_path} (no values printed).") + return 0 + + # Waiting; Meta returns error-ish objects until the user authorizes. 
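+        # Behavioral note: _post_form converts HTTP errors into SystemExit, so if Meta
+        # reports the "pending" state as an HTTP error (rather than a 200 body without
+        # access_token), this loop exits early; re-running `poll` with the same --state
+        # file resumes polling until the device code expires.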
+ time.sleep(max(1, interval)) + + raise SystemExit("Timed out waiting for device authorization.") + + +def main(argv: list[str] | None = None) -> int: + ap = argparse.ArgumentParser(description="Meta device login helper (Facebook Login for Devices).") + ap.set_defaults(cmd=None) + sub = ap.add_subparsers(required=True) + + start = sub.add_parser("start", help="Start device login and print user_code + URL") + start.set_defaults(cmd="start") + start.add_argument("--creds", default=DEFAULT_CREDS_PATH) + start.add_argument("--state", default=DEFAULT_STATE_PATH) + start.add_argument( + "--scope", + default="pages_show_list,pages_read_engagement,instagram_manage_messages", + help="Comma-separated permissions to request", + ) + + poll = sub.add_parser("poll", help="Poll for completion and save tokens into creds file") + poll.set_defaults(cmd="poll") + poll.add_argument("--creds", default=DEFAULT_CREDS_PATH) + poll.add_argument("--state", default=DEFAULT_STATE_PATH) + poll.add_argument("--write-page-token", action="store_true", help="Also store META_PAGE_ID and META_PAGE_ACCESS_TOKEN") + poll.add_argument( + "--target-ig-user-id", + default=None, + help="Prefer selecting the Page connected to this instagram_business_account id", + ) + poll.add_argument("--max-wait-s", type=int, default=900) + + args = ap.parse_args(argv) + if args.cmd == "start": + return cmd_start(creds_path=args.creds, state_path=args.state, scope=args.scope) + if args.cmd == "poll": + return cmd_poll( + creds_path=args.creds, + state_path=args.state, + write_page_token=bool(args.write_page_token), + target_ig_user_id=(args.target_ig_user_id or None), + max_wait_s=int(args.max_wait_s), + ) + raise SystemExit(2) + + +if __name__ == "__main__": + raise SystemExit(main()) diff --git a/sergio_instagram_messaging/meta_page_token_from_user_token.py b/sergio_instagram_messaging/meta_page_token_from_user_token.py new file mode 100644 index 0000000..726697c --- /dev/null +++ b/sergio_instagram_messaging/meta_page_token_from_user_token.py @@ -0,0 +1,231 @@ +from __future__ import annotations + +import argparse +import json +import sys +from pathlib import Path +from typing import Any + +from .export_meta_ig_history import DEFAULT_CREDS_PATH, GraphApiClient, GraphApiConfig, GraphApiError, load_dotenv + + +DEFAULT_USER_TOKEN_PATH = "/root/tmp/meta-user-access-token.txt" + + +def _redact(obj: Any) -> Any: + if isinstance(obj, dict): + return {k: ("" if k == "access_token" else _redact(v)) for k, v in obj.items()} + if isinstance(obj, list): + return [_redact(v) for v in obj] + return obj + + +def _write_dotenv_kv(path: str, updates: dict[str, str]) -> None: + p = Path(path) + lines: list[str] = [] + if p.exists(): + lines = p.read_text(errors="replace").splitlines() + + seen: set[str] = set() + out: list[str] = [] + for raw in lines: + line = raw.rstrip("\n") + stripped = line.strip() + if not stripped or stripped.startswith("#") or "=" not in stripped: + out.append(line) + continue + key, _value = stripped.split("=", 1) + key = key.strip() + if key in updates: + out.append(f"{key}={updates[key]}") + seen.add(key) + else: + out.append(line) + + for key, value in updates.items(): + if key in seen: + continue + out.append(f"{key}={value}") + + tmp = p.with_name(p.name + ".tmp") + tmp.write_text("\n".join(out).rstrip("\n") + "\n") + tmp.chmod(0o600) + tmp.replace(p) + + +def _read_user_token(*, user_token: str | None, user_token_file: str) -> str: + if user_token is not None: + token = user_token.strip() + if not token: + raise 
SystemExit("Empty --user-token provided.") + return token + + p = Path(user_token_file) + if not p.exists(): + raise SystemExit( + f"User token file not found: {user_token_file}\n" + "Create it with mode 600 and put a Facebook User access token inside (single line)." + ) + raw = p.read_text(errors="replace") + lines = [ln.strip() for ln in raw.splitlines()] + lines = [ln for ln in lines if ln and not ln.startswith("#")] + if not lines: + raise SystemExit(f"User token file is empty: {user_token_file}") + + # Accept either: + # - a single raw token line + # - an env-style line: META_USER_ACCESS_TOKEN=... + # - a file with multiple lines (we'll pick the most likely user-token line) + candidates: list[str] = [] + for ln in lines: + if "=" in ln: + _k, v = ln.split("=", 1) + v = v.strip() + if v: + candidates.append(v) + else: + candidates.append(ln) + + # Heuristic: Facebook user tokens typically start with "EA" and are long. + user_like = [c for c in candidates if c.startswith("EA") and len(c) > 50] + if user_like: + return max(user_like, key=len) + + # Fallback: pick the longest non-empty candidate. + return max(candidates, key=len) + + +def main(argv: list[str] | None = None) -> int: + ap = argparse.ArgumentParser( + description=( + "Derive a Page access token from a Facebook User access token (no token values printed). " + "Useful fallback when device-login is blocked." + ) + ) + ap.add_argument("--creds", default=DEFAULT_CREDS_PATH, help="dotenv-style creds file to update") + ap.add_argument("--user-token-file", default=DEFAULT_USER_TOKEN_PATH) + ap.add_argument( + "--user-token", + default=None, + help="Provide the user token inline (not recommended; may leak in shell history). Prefer --user-token-file.", + ) + ap.add_argument("--target-ig-user-id", default=None, help="Prefer a Page connected to this instagram_business_account id") + ap.add_argument("--page-id", default=None, help="Force selecting a specific Page id from /me/accounts") + ap.add_argument("--platform", default="instagram") + ap.add_argument("--version", default=None, help="override graph api version (e.g., v20.0)") + ap.add_argument("--list-pages", action="store_true", help="List pages (id/name/has_ig) without saving tokens") + ap.add_argument("--dry-run", action="store_true", help="Do not write into creds file") + args = ap.parse_args(argv) + + env = load_dotenv(args.creds) + version = (args.version or env.get("META_GRAPH_API_VERSION") or "v20.0").strip() or "v20.0" + user_token = _read_user_token(user_token=args.user_token, user_token_file=args.user_token_file) + client = GraphApiClient(GraphApiConfig(access_token=user_token, version=version)) + + try: + me = client.get_json("/me", {"fields": "id,name"}) + me_id = str(me.get("id") or "").strip() + me_name = str(me.get("name") or "").strip() + print(f"/me: id={me_id} name={me_name}") + + payload = client.get_json( + "/me/accounts", + { + "fields": "id,name,access_token,instagram_business_account", + "limit": "200", + }, + ) + data = payload.get("data") or [] + if not isinstance(data, list) or not data: + print("No pages found for this user token (me/accounts returned empty).", file=sys.stderr) + return 2 + + if args.list_pages: + for p in data: + if not isinstance(p, dict): + continue + pid = str(p.get("id") or "").strip() + name = str(p.get("name") or "").strip() + iba = p.get("instagram_business_account") + ig_id = "" + if isinstance(iba, dict): + ig_id = str(iba.get("id") or "").strip() + has_ig = bool(ig_id) + print(f"page: id={pid} name={name} has_ig={has_ig} 
ig_id={ig_id}") + return 0 + + chosen: dict[str, Any] | None = None + if args.page_id: + for p in data: + if not isinstance(p, dict): + continue + if str(p.get("id") or "").strip() == str(args.page_id).strip(): + chosen = p + break + if chosen is None: + print(f"--page-id not found in /me/accounts: {args.page_id}", file=sys.stderr) + return 2 + + if chosen is None and args.target_ig_user_id: + for p in data: + if not isinstance(p, dict): + continue + iba = p.get("instagram_business_account") or {} + if isinstance(iba, dict) and str(iba.get("id") or "") == str(args.target_ig_user_id): + chosen = p + break + + if chosen is None: + for p in data: + if not isinstance(p, dict): + continue + if p.get("instagram_business_account"): + chosen = p + break + + if chosen is None: + print( + "Could not find a Page with an attached instagram_business_account.\n" + "Connect the Instagram professional account to a Facebook Page first.", + file=sys.stderr, + ) + return 2 + + page_id = str(chosen.get("id") or "").strip() + page_name = str(chosen.get("name") or "").strip() + page_token = str(chosen.get("access_token") or "").strip() + iba = chosen.get("instagram_business_account") or {} + ig_id = str(iba.get("id") or "").strip() if isinstance(iba, dict) else "" + if not page_id or not page_token: + print("Selected page is missing id/access_token in response.", file=sys.stderr) + return 2 + + print(f"selected_page: id={page_id} name={page_name}") + if ig_id: + print(f"selected_instagram_business_account_id: {ig_id}") + + if args.dry_run: + print("dry_run: not writing creds") + return 0 + + updates = { + "META_PAGE_ID": page_id, + "META_PAGE_ACCESS_TOKEN": page_token, + } + if ig_id: + updates["META_IG_USER_ID"] = ig_id + + _write_dotenv_kv(args.creds, updates) + print(f"Saved META_PAGE_ID + META_PAGE_ACCESS_TOKEN to {args.creds} (no values printed).") + return 0 + except GraphApiError as e: + payload = e.args[0] if e.args else {"error": str(e)} + print(json.dumps(_redact(payload), ensure_ascii=False), file=sys.stderr) + return 3 + except Exception as e: + print(f"Error: {e}", file=sys.stderr) + return 1 + + +if __name__ == "__main__": + raise SystemExit(main()) diff --git a/sergio_instagram_messaging/meta_subscribe_page.py b/sergio_instagram_messaging/meta_subscribe_page.py new file mode 100644 index 0000000..b388a56 --- /dev/null +++ b/sergio_instagram_messaging/meta_subscribe_page.py @@ -0,0 +1,106 @@ +from __future__ import annotations + +import argparse +import json +import sys +import urllib.parse +import urllib.request +from typing import Any + +from .export_meta_ig_history import DEFAULT_CREDS_PATH, GraphApiClient, GraphApiConfig, GraphApiError, load_dotenv + + +def _require(env: dict[str, str], key: str) -> str: + value = (env.get(key) or "").strip() + if not value: + raise KeyError(f"Missing required config: {key}") + return value + + +def _post_form(url: str, data: dict[str, str]) -> dict[str, Any]: + body = urllib.parse.urlencode(data).encode("utf-8") + req = urllib.request.Request(url, data=body, method="POST") + req.add_header("Content-Type", "application/x-www-form-urlencoded") + with urllib.request.urlopen(req, timeout=60) as r: + return json.loads(r.read().decode("utf-8")) + + +def subscribe_page(*, creds_path: str, fields: str) -> dict[str, Any]: + env = load_dotenv(creds_path) + page_id = _require(env, "META_PAGE_ID") + page_token = _require(env, "META_PAGE_ACCESS_TOKEN") + version = (env.get("META_GRAPH_API_VERSION") or "v24.0").strip() or "v24.0" + + url = 
f"https://graph.facebook.com/{version}/{page_id}/subscribed_apps" + try: + return _post_form( + url, + { + "subscribed_fields": fields, + "access_token": page_token, + }, + ) + except urllib.error.HTTPError as e: + body = e.read().decode("utf-8", errors="replace") + try: + payload = json.loads(body) + except Exception: + payload = {"raw": body[:1000]} + raise GraphApiError({"status": e.code, "error": payload}) + + +def list_subscribed_apps(*, creds_path: str) -> dict[str, Any]: + env = load_dotenv(creds_path) + page_id = _require(env, "META_PAGE_ID") + page_token = _require(env, "META_PAGE_ACCESS_TOKEN") + version = (env.get("META_GRAPH_API_VERSION") or "v24.0").strip() or "v24.0" + client = GraphApiClient(GraphApiConfig(access_token=page_token, version=version)) + return client.get_json(f"/{page_id}/subscribed_apps", {"fields": "id,name"}) + + +def main(argv: list[str] | None = None) -> int: + ap = argparse.ArgumentParser( + description=( + "Subscribe the Facebook Page to this Meta app's webhooks.\n\n" + "Note: this requires a Page access token that includes the pages_manage_metadata permission." + ) + ) + ap.set_defaults(cmd=None) + sub = ap.add_subparsers(required=True) + + p_list = sub.add_parser("list", help="List subscribed apps for the Page") + p_list.set_defaults(cmd="list") + p_list.add_argument("--creds", default=DEFAULT_CREDS_PATH) + + p_sub = sub.add_parser("subscribe", help="Subscribe the Page to this app's webhook fields") + p_sub.set_defaults(cmd="subscribe") + p_sub.add_argument("--creds", default=DEFAULT_CREDS_PATH) + p_sub.add_argument( + "--fields", + default="messages", + help="Comma-separated subscribed_fields (default: messages)", + ) + + args = ap.parse_args(argv) + + try: + if args.cmd == "list": + out = list_subscribed_apps(creds_path=args.creds) + print(json.dumps(out, indent=2, ensure_ascii=False)) + return 0 + if args.cmd == "subscribe": + out = subscribe_page(creds_path=args.creds, fields=args.fields) + print(json.dumps(out, indent=2, ensure_ascii=False)) + return 0 + raise SystemExit(2) + except KeyError as e: + print(str(e), file=sys.stderr) + return 2 + except GraphApiError as e: + print(f"Graph API error: {e}", file=sys.stderr) + return 3 + + +if __name__ == "__main__": + raise SystemExit(main()) + diff --git a/sergio_instagram_messaging/meta_token_doctor.py b/sergio_instagram_messaging/meta_token_doctor.py new file mode 100644 index 0000000..ac9b249 --- /dev/null +++ b/sergio_instagram_messaging/meta_token_doctor.py @@ -0,0 +1,63 @@ +from __future__ import annotations + +import argparse +import json +import sys +from typing import Any + +from .export_meta_ig_history import DEFAULT_CREDS_PATH, GraphApiClient, GraphApiConfig, GraphApiError, load_dotenv + + +def _redact(obj: Any) -> Any: + if isinstance(obj, dict): + return {k: ("" if k == "access_token" else _redact(v)) for k, v in obj.items()} + if isinstance(obj, list): + return [_redact(v) for v in obj] + return obj + + +def main(argv: list[str] | None = None) -> int: + ap = argparse.ArgumentParser(description="Quick sanity checks for Meta Graph API tokens (no token values printed).") + ap.add_argument("--creds", default=DEFAULT_CREDS_PATH) + ap.add_argument("--platform", default="instagram") + ap.add_argument("--version", default=None, help="override graph api version (e.g., v20.0)") + args = ap.parse_args(argv) + + env = load_dotenv(args.creds) + token = (env.get("META_PAGE_ACCESS_TOKEN") or "").strip() + if not token: + print("Missing META_PAGE_ACCESS_TOKEN in creds file.", file=sys.stderr) + return 2 
+ + version = (args.version or env.get("META_GRAPH_API_VERSION") or "v20.0").strip() or "v20.0" + client = GraphApiClient(GraphApiConfig(access_token=token, version=version)) + + try: + me = client.get_json("/me", {"fields": "id,name"}) + me_id = str(me.get("id") or "").strip() + me_name = str(me.get("name") or "").strip() + print(f"/me: id={me_id} name={me_name}") + + page_id = (env.get("META_PAGE_ID") or "").strip() or me_id + print(f"page_id_used: {page_id}") + + test = client.get_json( + f"/{page_id}/conversations", + {"platform": args.platform, "limit": "1", "fields": "id,updated_time,message_count,participants"}, + ) + data = test.get("data") or [] + print(f"conversations_first_page_count: {len(data) if isinstance(data, list) else 0}") + if isinstance(data, list) and data: + print(f"sample_conversation_id: {data[0].get('id')}") + return 0 + except GraphApiError as e: + print(json.dumps(_redact({"graph_api_error": str(e)}), ensure_ascii=False), file=sys.stderr) + return 3 + except Exception as e: + print(f"Error: {e}", file=sys.stderr) + return 1 + + +if __name__ == "__main__": + raise SystemExit(main()) + diff --git a/sergio_instagram_messaging/send_ig_message.py b/sergio_instagram_messaging/send_ig_message.py new file mode 100644 index 0000000..baad82d --- /dev/null +++ b/sergio_instagram_messaging/send_ig_message.py @@ -0,0 +1,81 @@ +from __future__ import annotations + +import argparse +import json +import sys +import urllib.parse +import urllib.request +from typing import Any + +from .export_meta_ig_history import DEFAULT_CREDS_PATH, GraphApiError, load_dotenv + + +def _require(env: dict[str, str], key: str) -> str: + value = (env.get(key) or "").strip() + if not value: + raise KeyError(f"Missing required config: {key}") + return value + + +def _post_json(url: str, payload: dict[str, Any]) -> dict[str, Any]: + body = json.dumps(payload, ensure_ascii=False).encode("utf-8") + req = urllib.request.Request(url, data=body, method="POST") + req.add_header("Content-Type", "application/json") + try: + with urllib.request.urlopen(req, timeout=30) as r: + return json.loads(r.read().decode("utf-8")) + except urllib.error.HTTPError as e: + body = e.read().decode("utf-8", errors="replace") + try: + payload = json.loads(body) + except Exception: + payload = {"raw": body[:1000]} + raise GraphApiError({"status": e.code, "error": payload}) + + +def send_text(*, creds_path: str, recipient_id: str, text: str, version: str | None) -> dict[str, Any]: + env = load_dotenv(creds_path) + token = _require(env, "META_PAGE_ACCESS_TOKEN") + graph_version = (version or env.get("META_GRAPH_API_VERSION") or "v24.0").strip() or "v24.0" + + url = ( + f"https://graph.facebook.com/{graph_version}/me/messages?" 
+ + urllib.parse.urlencode({"access_token": token}) + ) + return _post_json( + url, + { + "recipient": {"id": str(recipient_id)}, + "message": {"text": text}, + }, + ) + + +def main(argv: list[str] | None = None) -> int: + ap = argparse.ArgumentParser(description="Send an Instagram DM via Meta Graph API (Messenger API for Instagram).") + ap.add_argument("--creds", default=DEFAULT_CREDS_PATH) + ap.add_argument("--recipient-id", required=True) + ap.add_argument("--text", required=True) + ap.add_argument("--version", default=None, help="override graph api version (e.g., v24.0)") + args = ap.parse_args(argv) + + try: + out = send_text( + creds_path=args.creds, + recipient_id=str(args.recipient_id), + text=str(args.text), + version=(args.version or None), + ) + print(json.dumps(out, indent=2, ensure_ascii=False)) + return 0 + except KeyError as e: + print(str(e), file=sys.stderr) + return 2 + except GraphApiError as e: + print(f"Graph API error: {e}", file=sys.stderr) + return 3 + + +if __name__ == "__main__": + raise SystemExit(main()) + diff --git a/sergio_instagram_messaging/simple_png.py b/sergio_instagram_messaging/simple_png.py new file mode 100644 index 0000000..9a5ca7a --- /dev/null +++ b/sergio_instagram_messaging/simple_png.py @@ -0,0 +1,145 @@ +from __future__ import annotations + +import struct +import zlib +from dataclasses import dataclass + + +def _png_chunk(kind: bytes, data: bytes) -> bytes: + length = struct.pack(">I", len(data)) + crc = zlib.crc32(kind) + crc = zlib.crc32(data, crc) + crc_bytes = struct.pack(">I", crc & 0xFFFFFFFF) + return length + kind + data + crc_bytes + + +def write_png_rgb(path: str, *, width: int, height: int, rgb: bytes) -> None: + if width <= 0 or height <= 0: + raise ValueError("width and height must be positive") + if len(rgb) != width * height * 3: + raise ValueError("rgb buffer must be width*height*3 bytes") + + signature = b"\x89PNG\r\n\x1a\n" + ihdr = struct.pack(">IIBBBBB", width, height, 8, 2, 0, 0, 0) # 8-bit, RGB + + stride = width * 3 + raw = bytearray() + for y in range(height): + raw.append(0) # filter type 0 + start = y * stride + raw.extend(rgb[start : start + stride]) + + compressed = zlib.compress(bytes(raw), level=6) + + png = bytearray() + png.extend(signature) + png.extend(_png_chunk(b"IHDR", ihdr)) + png.extend(_png_chunk(b"IDAT", compressed)) + png.extend(_png_chunk(b"IEND", b"")) + + with open(path, "wb") as f: + f.write(png) + + +def _clamp(v: int) -> int: + return 0 if v < 0 else 255 if v > 255 else v + + +def _blend(bg: tuple[int, int, int], fg: tuple[int, int, int], alpha: float) -> tuple[int, int, int]: + a = 0.0 if alpha < 0 else 1.0 if alpha > 1 else alpha + return ( + _clamp(int(bg[0] * (1.0 - a) + fg[0] * a)), + _clamp(int(bg[1] * (1.0 - a) + fg[1] * a)), + _clamp(int(bg[2] * (1.0 - a) + fg[2] * a)), + ) + + +@dataclass +class Canvas: + width: int + height: int + _rgb: bytearray + + @classmethod + def new(cls, width: int, height: int, *, bg: tuple[int, int, int] = (255, 255, 255)) -> "Canvas": + if width <= 0 or height <= 0: + raise ValueError("width and height must be positive") + buf = bytearray(width * height * 3) + c = cls(width=width, height=height, _rgb=buf) + c.clear(bg) + return c + + def clear(self, color: tuple[int, int, int]) -> None: + r, g, b = (_clamp(color[0]), _clamp(color[1]), _clamp(color[2])) + for i in range(0, len(self._rgb), 3): + self._rgb[i] = r + self._rgb[i + 1] = g + self._rgb[i + 2] = b + + def to_bytes(self) -> bytes: + return bytes(self._rgb) + + def save(self, path: str) -> None: + 
write_png_rgb(path, width=self.width, height=self.height, rgb=self.to_bytes()) + + def _idx(self, x: int, y: int) -> int | None: + if x < 0 or y < 0 or x >= self.width or y >= self.height: + return None + return (y * self.width + x) * 3 + + def set_pixel(self, x: int, y: int, color: tuple[int, int, int]) -> None: + idx = self._idx(x, y) + if idx is None: + return + self._rgb[idx] = _clamp(color[0]) + self._rgb[idx + 1] = _clamp(color[1]) + self._rgb[idx + 2] = _clamp(color[2]) + + def blend_pixel(self, x: int, y: int, color: tuple[int, int, int], *, alpha: float) -> None: + idx = self._idx(x, y) + if idx is None: + return + bg = (self._rgb[idx], self._rgb[idx + 1], self._rgb[idx + 2]) + r, g, b = _blend((int(bg[0]), int(bg[1]), int(bg[2])), color, alpha) + self._rgb[idx] = r + self._rgb[idx + 1] = g + self._rgb[idx + 2] = b + + def draw_rect(self, x0: int, y0: int, x1: int, y1: int, color: tuple[int, int, int]) -> None: + if x0 > x1: + x0, x1 = x1, x0 + if y0 > y1: + y0, y1 = y1, y0 + x0 = max(0, x0) + y0 = max(0, y0) + x1 = min(self.width - 1, x1) + y1 = min(self.height - 1, y1) + for y in range(y0, y1 + 1): + for x in range(x0, x1 + 1): + self.set_pixel(x, y, color) + + def draw_line(self, x0: int, y0: int, x1: int, y1: int, color: tuple[int, int, int]) -> None: + dx = abs(x1 - x0) + dy = -abs(y1 - y0) + sx = 1 if x0 < x1 else -1 + sy = 1 if y0 < y1 else -1 + err = dx + dy + x, y = x0, y0 + while True: + self.set_pixel(x, y, color) + if x == x1 and y == y1: + break + e2 = 2 * err + if e2 >= dy: + err += dy + x += sx + if e2 <= dx: + err += dx + y += sy + + def draw_polyline(self, points: list[tuple[int, int]], color: tuple[int, int, int]) -> None: + if len(points) < 2: + return + for (x0, y0), (x1, y1) in zip(points, points[1:]): + self.draw_line(x0, y0, x1, y1, color) +
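
A minimal usage sketch for the `simple_png` canvas helper, assuming the package is importable as `sergio_instagram_messaging`; the data series and the output path under `/root/tmp` are illustrative only, not part of the pipeline:

```python
# Sketch: render a tiny line chart with the pure-stdlib Canvas helper.
# The values and output path are made-up placeholders for illustration.
from sergio_instagram_messaging.simple_png import Canvas

# Example series, e.g. DM counts per day (illustrative numbers).
values = [3, 7, 5, 12, 9, 15, 11]

width, height = 320, 160
canvas = Canvas.new(width, height, bg=(255, 255, 255))

# Axes along the left and bottom edges.
canvas.draw_line(10, 10, 10, height - 10, (0, 0, 0))
canvas.draw_line(10, height - 10, width - 10, height - 10, (0, 0, 0))

# Scale the series into the drawable area and plot it as a polyline.
max_v = max(values) or 1
points = []
for i, v in enumerate(values):
    x = 10 + int(i * (width - 20) / max(1, len(values) - 1))
    y = (height - 10) - int(v * (height - 20) / max_v)
    points.append((x, y))
canvas.draw_polyline(points, (200, 30, 30))

canvas.save("/root/tmp/dm-volume-example.png")
```

The same primitives (`draw_rect`, `draw_line`, `draw_polyline`) cover simple bar or line charts without pulling in any third-party imaging dependency.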