Initial import: Emo-Social Insta DM agent

This commit is contained in:
danny 2025-12-24 07:57:00 +00:00
commit 9b53aaff9e
18 changed files with 4166 additions and 0 deletions

11
.dockerignore Normal file

@@ -0,0 +1,11 @@
__pycache__/
*.pyc
*.pyo
*.pyd
.pytest_cache/
.mypy_cache/
.ruff_cache/
.venv/
venv/
.git/

10
.gitignore vendored Normal file

@@ -0,0 +1,10 @@
__pycache__/
*.pyc
.venv/
venv/
.DS_Store
.idea/
.vscode/
/dist/
/build/

11
Dockerfile Normal file

@@ -0,0 +1,11 @@
FROM python:3.13-slim
WORKDIR /app
COPY sergio_instagram_messaging/ /app/sergio_instagram_messaging/
COPY README.md /app/README.md
ENV PYTHONUNBUFFERED=1
CMD ["python", "-m", "sergio_instagram_messaging.export_meta_ig_history", "--help"]

161
README.md Normal file

@@ -0,0 +1,161 @@
# Emo-Social Insta DM Agent
This repository holds the **Emo-Social Instagram DM agent** for `@socialmediatorr` (distinct from the existing Sergio RAG DB agent).
Includes:
- Meta Graph API exporter (full DM history)
- Instagram “Download your information” importer
- DM analysis pipeline (bot-vs-human, conversions, objections, rescue logic, product eras)
- Helpers for device login + page subscription + sending replies
## Where this runs
Build/run the Docker image in `pct 250` (ai-dev) and keep production (`pct 220`) clean.
## Credentials
Source of truth creds file (host):
- `/root/tmp/emo-social-meta-app-creds.txt`
Inside `pct 250`, place the same file at the same path:
- `/root/tmp/emo-social-meta-app-creds.txt`
Required for export:
- `META_PAGE_ID`
- `META_PAGE_ACCESS_TOKEN`
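A quick way to confirm the creds file parses and contains the required keys, using the repo's own `load_dotenv` helper (a minimal sketch; it never prints token values):

```python
# Sanity-check the creds file without printing any secrets.
from sergio_instagram_messaging.export_meta_ig_history import load_dotenv

REQUIRED = ("META_PAGE_ID", "META_PAGE_ACCESS_TOKEN")
env = load_dotenv("/root/tmp/emo-social-meta-app-creds.txt")
missing = [k for k in REQUIRED if not (env.get(k) or "").strip()]
if missing:
    raise SystemExit(f"missing keys: {', '.join(missing)}")
print("creds file OK")
```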
## Build (pct 250)
- `docker build -t emo-social-insta-dm-agent:dev /root/ai-workspace/emo-social-insta-dm-agent`
Note: in this LXC, `docker run` / `podman run` can fail due to AppArmor confinement. Use the **direct Python** commands below unless you change the LXC config to be AppArmor-unconfined.
## Obtain tokens (pct 250)
`meta_device_login` requires `META_CLIENT_TOKEN` (from Meta app dashboard → Settings → Advanced → Client token).
Alternatively, set `META_CLIENT_ACCESS_TOKEN` as `APP_ID|CLIENT_TOKEN` to override `META_APP_ID` for device-login only.
If `meta_device_login start` returns OAuth error `(#3) enable "Login from Devices"`, enable it for the *same app id* the script prints:
- `https://developers.facebook.com/apps/<APP_ID>/fb-login/settings/` → set **Login from Devices** = Yes
Start device login (prints a URL + code to enter; no tokens are printed):
- `cd /root/ai-workspace/emo-social-insta-dm-agent && python3 -m sergio_instagram_messaging.meta_device_login start`
After authorizing in the browser, poll and write `META_PAGE_ID` + `META_PAGE_ACCESS_TOKEN` into `/root/tmp/emo-social-meta-app-creds.txt`:
- `cd /root/ai-workspace/emo-social-insta-dm-agent && python3 -m sergio_instagram_messaging.meta_device_login poll --write-page-token --target-ig-user-id 17841466913731557`
### Fallback: Graph API Explorer → user token → page token
If device-login is blocked, get a **Facebook User access token** via Graph API Explorer and derive the Page token:
- Open: `https://developers.facebook.com/tools/explorer/`
- Select the correct app, then generate a user token with:
- `pages_show_list,pages_read_engagement,instagram_manage_messages`
- Save the user token to the path below (keep mode `600`):
- `/root/tmp/meta-user-access-token.txt`
- Then write `META_PAGE_ID` + `META_PAGE_ACCESS_TOKEN` into `/root/tmp/emo-social-meta-app-creds.txt` (no token values printed):
- `cd /root/ai-workspace/emo-social-insta-dm-agent && python3 -m sergio_instagram_messaging.meta_page_token_from_user_token --target-ig-user-id 17841466913731557`
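For reference, the derivation itself is a single Graph API call; this sketch shows roughly what `meta_page_token_from_user_token` does, using the same `/me/accounts` fields the repo requests (prints the Page id only, never the token):

```python
# Find the Page attached to the target IG professional account.
import json
import urllib.parse
import urllib.request

TARGET_IG_USER_ID = "17841466913731557"
user_token = open("/root/tmp/meta-user-access-token.txt").read().strip()
qs = urllib.parse.urlencode({
    "fields": "id,name,access_token,instagram_business_account",
    "access_token": user_token,
})
with urllib.request.urlopen(f"https://graph.facebook.com/v20.0/me/accounts?{qs}") as r:
    pages = json.load(r).get("data") or []
page = next(
    (p for p in pages
     if (p.get("instagram_business_account") or {}).get("id") == TARGET_IG_USER_ID),
    None,
)
if page is None:
    raise SystemExit("no Page with that instagram_business_account id")
print("META_PAGE_ID:", page["id"])  # never print page["access_token"]
```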
## Export history (pct 250)
Quick token sanity check (does not print token values):
- `cd /root/ai-workspace/emo-social-insta-dm-agent && python3 -m sergio_instagram_messaging.meta_token_doctor`
Export to a mounted directory so results persist on disk:
- `docker run --rm -v /root/tmp:/root/tmp emo-social-insta-dm-agent:dev python -m sergio_instagram_messaging.export_meta_ig_history --out /root/tmp/emo-social-insta-dm-history`
- (Direct) `cd /root/ai-workspace/emo-social-insta-dm-agent && python3 -m sergio_instagram_messaging.export_meta_ig_history --out /root/tmp/emo-social-insta-dm-history`
### Workaround: fetch a single thread by `user_id`
If `/conversations` listing times out, you can still fetch a single thread if you know the sender's `user_id` (from webhook `sender.id`):
- `cd /root/ai-workspace/emo-social-insta-dm-agent && python3 -m sergio_instagram_messaging.fetch_thread_messages --user-id <SENDER_ID> --max-pages 2 --out /root/tmp/thread.jsonl`
Small test run:
- `docker run --rm -v /root/tmp:/root/tmp emo-social-insta-dm-agent:dev python -m sergio_instagram_messaging.export_meta_ig_history --out /root/tmp/emo-social-insta-dm-history --max-conversations 3 --max-pages 2`
- (Direct) `cd /root/ai-workspace/emo-social-insta-dm-agent && python3 -m sergio_instagram_messaging.export_meta_ig_history --out /root/tmp/emo-social-insta-dm-history --max-conversations 3 --max-pages 2`
## Full history fallback (Instagram export)
If Meta app review blocks Graph API DM access, export Sergio's IG data via Instagram “Download your information” and import it:
- `cd /root/ai-workspace/emo-social-insta-dm-agent && python3 -m sergio_instagram_messaging.import_instagram_export --input /path/to/instagram-export.zip --out /root/tmp/emo-social-insta-dm-export-history`
## Analyze DM history (Behavioral Cloning / “Biographical Sales”)
This produces the “Sergio persona” artifacts needed for the DM agent:
- Separates frequent `[BOT]` templates vs rare `[MANUAL]` replies (plus `[HYBRID]`).
- Builds **bot fatigue** + **script editorial timeline** charts.
- Extracts **training pairs** (user → Sergio manual reply) from converted threads.
- Generates **rescue playbook** (human saves after silence/negative sentiment).
- Generates **objection handlers** (price/time/trust/stop → best replies).
- Builds a quarterly **eras** CSV (offers/pricing + vocabulary drift).
Outputs are written with mode `600` and may contain sensitive DM content. Keep them out of git.
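The deliverables are plain JSON/JSONL, so they are easy to spot-check after a run; for example (a sketch, deliberately schema-agnostic so it does not assume field names):

```python
# Count training pairs and list their field names without dumping any content.
import json
from pathlib import Path

p = Path("/root/tmp/socialmediatorr-agent-analysis/training_pairs.jsonl")
rows = [json.loads(line) for line in p.open(encoding="utf-8")]
print(f"{len(rows)} pairs; fields: {sorted(rows[0]) if rows else []}")
```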
### Analyze a raw Instagram export folder (recommended)
Optional: index first (lets you filter by recency without scanning every thread):
- `python3 -m sergio_instagram_messaging.index_instagram_export --input /path/to/your_instagram_activity --out /root/tmp/socialmediatorr-ig-index.jsonl`
Then analyze:
- `python3 -m sergio_instagram_messaging.analyze_instagram_export --input /path/to/your_instagram_activity --out /root/tmp/socialmediatorr-agent-analysis --owner-name "Sergio de Vocht" --index /root/tmp/socialmediatorr-ig-index.jsonl --since-days 180`
### Analyze an imported history dir (messages/*.jsonl)
If you already ran `import_instagram_export` (or have a partial import output), point the analyzer at that directory:
- `python3 -m sergio_instagram_messaging.analyze_instagram_export --input /root/tmp/socialmediatorr-ig-export-history --out /root/tmp/socialmediatorr-agent-analysis --owner-name "Sergio de Vocht"`
### Two-stage workflow (verify templates before full run)
Pass 1 generates `top_outgoing_templates.json` + `template_counts.jsonl`:
- `python3 -m sergio_instagram_messaging.analyze_instagram_export --input /path/to/your_instagram_activity --out /root/tmp/socialmediatorr-agent-analysis --stage pass1`
Pass 2 reuses the cache and writes the full deliverables:
- `python3 -m sergio_instagram_messaging.analyze_instagram_export --input /path/to/your_instagram_activity --out /root/tmp/socialmediatorr-agent-analysis --stage pass2 --templates-cache /root/tmp/socialmediatorr-agent-analysis/top_outgoing_templates.json`
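Between the passes you can review the cache directly; the key names below (`top_templates`, `canonical`, `count`) match what `generate_dm_report` reads:

```python
# Print the five most frequent canonical templates from the pass-1 cache.
import json

cache_path = "/root/tmp/socialmediatorr-agent-analysis/top_outgoing_templates.json"
cache = json.load(open(cache_path, encoding="utf-8"))
for t in (cache.get("top_templates") or [])[:5]:
    print(t.get("count"), "×", (t.get("canonical") or "")[:80])
```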
### Human-readable report (English)
After analysis, generate a single Markdown report:
- `python3 -m sergio_instagram_messaging.generate_dm_report --analysis-dir /root/tmp/socialmediatorr-agent-analysis`
## Webhooks (new messages → auto-reply)
Setting up Meta webhooks takes two steps:
1) App subscription (`/{app_id}/subscriptions`) — sets callback URL + verify token + fields.
2) Page subscription (`/{page_id}/subscribed_apps`) — attaches the Page/IG inbox to the app.
The production webhook endpoint exists at:
- `https://emo-social.infrafabric.io/meta/webhook`
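Before step 1 succeeds, Meta performs a GET verification handshake against the callback URL (`hub.mode` / `hub.verify_token` / `hub.challenge`). A minimal stdlib sketch of that handshake (the verify token here is a placeholder, not the production value):

```python
# Echo hub.challenge back when the verify token matches; reject otherwise.
from http.server import BaseHTTPRequestHandler, HTTPServer
from urllib.parse import parse_qs, urlparse

VERIFY_TOKEN = "replace-me"  # placeholder; use the token set on /{app_id}/subscriptions

class WebhookVerify(BaseHTTPRequestHandler):
    def do_GET(self):
        q = parse_qs(urlparse(self.path).query)
        if q.get("hub.mode") == ["subscribe"] and q.get("hub.verify_token") == [VERIFY_TOKEN]:
            self.send_response(200)
            self.end_headers()
            self.wfile.write((q.get("hub.challenge") or [""])[0].encode())
        else:
            self.send_response(403)
            self.end_headers()

if __name__ == "__main__":
    HTTPServer(("127.0.0.1", 8080), WebhookVerify).serve_forever()
```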
### Subscribe the Page to the app (required for message delivery)
This requires a Page access token that includes `pages_manage_metadata`.
Device login with that scope:
- `cd /root/ai-workspace/emo-social-insta-dm-agent && python3 -m sergio_instagram_messaging.meta_device_login start --scope pages_show_list,pages_read_engagement,instagram_manage_messages,pages_manage_metadata`
- After authorizing in the browser, poll and write tokens:
- `cd /root/ai-workspace/emo-social-insta-dm-agent && python3 -m sergio_instagram_messaging.meta_device_login poll --write-page-token --target-ig-user-id 17841466913731557`
Then subscribe the Page:
- `cd /root/ai-workspace/emo-social-insta-dm-agent && python3 -m sergio_instagram_messaging.meta_subscribe_page subscribe --fields messages`

14
compose.yaml Normal file

@@ -0,0 +1,14 @@
services:
exporter:
build: .
image: emo-social-insta-dm-agent:dev
volumes:
- /root/tmp:/root/tmp
command:
[
"python",
"-m",
"sergio_instagram_messaging.export_meta_ig_history",
"--out",
"/root/tmp/emo-social-insta-dm-history",
]

1
sergio_instagram_messaging/__init__.py Normal file

@@ -0,0 +1 @@
"""Sergio Instagram messaging utilities."""

sergio_instagram_messaging/analyze_instagram_export.py Normal file
File diff suppressed because it is too large

340
sergio_instagram_messaging/export_meta_ig_history.py Normal file

@@ -0,0 +1,340 @@
from __future__ import annotations
import argparse
import json
import os
import re
import sys
import time
import urllib.error
import urllib.parse
import urllib.request
from dataclasses import dataclass
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Iterable
DEFAULT_CREDS_PATH = "/root/tmp/emo-social-meta-app-creds.txt"
def _now_utc_iso() -> str:
return datetime.now(timezone.utc).replace(microsecond=0).isoformat()
def load_dotenv(path: str) -> dict[str, str]:
env: dict[str, str] = {}
p = Path(path)
if not p.exists():
raise FileNotFoundError(path)
for raw in p.read_text(errors="replace").splitlines():
line = raw.strip()
if not line or line.startswith("#"):
continue
if "=" not in line:
continue
key, value = line.split("=", 1)
key = key.strip()
value = value.strip()
if not key:
continue
env[key] = value
return env
def _require(env: dict[str, str], key: str) -> str:
value = (env.get(key) or "").strip()
if not value:
raise KeyError(f"Missing required config: {key}")
return value
def _optional(env: dict[str, str], key: str, default: str) -> str:
value = (env.get(key) or "").strip()
return value if value else default
@dataclass(frozen=True)
class GraphApiConfig:
access_token: str
version: str = "v20.0"
@property
def base(self) -> str:
return f"https://graph.facebook.com/{self.version}"
class GraphApiError(RuntimeError):
def __init__(self, payload: dict[str, Any]):
self.payload = payload
super().__init__(payload)
class GraphApiClient:
def __init__(self, cfg: GraphApiConfig):
self._cfg = cfg
def get_json(self, path: str, params: dict[str, str] | None = None) -> dict[str, Any]:
params = dict(params or {})
params["access_token"] = self._cfg.access_token
url = f"{self._cfg.base}{path}?{urllib.parse.urlencode(params)}"
try:
with urllib.request.urlopen(url, timeout=60) as r:
return json.loads(r.read().decode("utf-8"))
except urllib.error.HTTPError as e:
body = e.read().decode("utf-8", errors="replace")
try:
payload = json.loads(body)
except Exception:
payload = {"raw": body[:1000]}
raise GraphApiError({"status": e.code, "error": payload})
def iter_paged(
self,
path: str,
params: dict[str, str] | None = None,
*,
max_pages: int | None = None,
sleep_s: float = 0.0,
) -> Iterable[dict[str, Any]]:
version_prefix = re.compile(r"^/v\d+\.\d+")
page = 0
next_path = path
next_params = dict(params or {})
while True:
page += 1
if max_pages is not None and page > max_pages:
return
payload = self.get_json(next_path, next_params)
data = payload.get("data") or []
if isinstance(data, list):
for item in data:
if isinstance(item, dict):
yield item
paging = payload.get("paging") or {}
next_url = paging.get("next")
if not next_url:
return
parsed = urllib.parse.urlparse(next_url)
next_path = parsed.path
# Meta's paging URLs include the version prefix (e.g. /v24.0/...).
# Our client already pins a version in cfg.base, so strip it here.
m = version_prefix.match(next_path)
if m:
next_path = next_path[m.end() :] or "/"
next_params = dict(urllib.parse.parse_qsl(parsed.query))
# Ensure we don't leak a token from the paging URL if Meta includes it.
next_params["access_token"] = self._cfg.access_token
if sleep_s:
time.sleep(sleep_s)
def export_history(
*,
creds_path: str,
out_dir: str,
platform: str,
conversations_fields: str,
messages_fields: str,
conversations_limit: int | None,
max_conversations: int | None,
max_pages: int | None,
sleep_s: float,
) -> None:
env = load_dotenv(creds_path)
token = _require(env, "META_PAGE_ACCESS_TOKEN")
version = _optional(env, "META_GRAPH_API_VERSION", "v20.0")
client = GraphApiClient(GraphApiConfig(access_token=token, version=version))
page_id = (env.get("META_PAGE_ID") or "").strip()
page_name: str | None = None
if not page_id:
me = client.get_json("/me", {"fields": "id,name"})
page_id = str(me.get("id") or "").strip()
page_name = str(me.get("name") or "").strip() or None
if not page_id:
raise KeyError("Missing required config: META_PAGE_ID (and token did not resolve /me.id)")
out = Path(out_dir)
out.mkdir(parents=True, exist_ok=True)
(out / "messages").mkdir(parents=True, exist_ok=True)
meta_path = out / "export_metadata.json"
meta_path.write_text(
json.dumps(
{
"exported_at": _now_utc_iso(),
"graph_api_version": version,
"platform": platform,
"page_id": page_id,
"page_name": page_name,
"conversations_fields": conversations_fields,
"messages_fields": messages_fields,
},
indent=2,
)
+ "\n"
)
conversations: list[dict[str, Any]] = []
conv_params = {"platform": platform, "fields": conversations_fields}
if conversations_limit is not None:
conv_params["limit"] = str(conversations_limit)
def _iter_conversations() -> Iterable[dict[str, Any]]:
return client.iter_paged(
f"/{page_id}/conversations",
params=conv_params,
max_pages=max_pages,
sleep_s=sleep_s,
)
def _is_large_inbox_timeout(err: GraphApiError) -> bool:
if not isinstance(err.payload, dict):
return False
meta = (err.payload.get("error") or {}).get("error")
if not isinstance(meta, dict):
return False
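        # Meta reports the large-inbox listing timeout as code -2 with this specific error_subcode.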
return meta.get("code") == -2 and meta.get("error_subcode") == 2534084
def _collect_conversations() -> None:
nonlocal conversations
try:
for conv in _iter_conversations():
conversations.append(conv)
if max_conversations is not None and len(conversations) >= max_conversations:
break
except GraphApiError as e:
if _is_large_inbox_timeout(e) and conversations:
print(
f"Warning: conversation listing timed out after {len(conversations)} conversation(s); "
"continuing with partial export.",
file=sys.stderr,
)
return
raise
try:
_collect_conversations()
except GraphApiError as e:
if _is_large_inbox_timeout(e) and conv_params.get("limit") != "1":
print(
"Conversation listing timed out; retrying with limit=1 (workaround for large inboxes).",
file=sys.stderr,
)
conv_params["limit"] = "1"
_collect_conversations()
else:
raise
(out / "conversations.json").write_text(json.dumps({"data": conversations}, indent=2) + "\n")
exported_messages = 0
for idx, conv in enumerate(conversations, 1):
conv_id = str(conv.get("id") or "").strip()
if not conv_id:
continue
dst = out / "messages" / f"{conv_id}.jsonl"
tmp_dst = dst.with_suffix(dst.suffix + ".tmp")
def _is_too_much_data_error(err: GraphApiError) -> bool:
if not isinstance(err.payload, dict):
return False
if err.payload.get("status") != 500:
return False
meta = (err.payload.get("error") or {}).get("error")
if not isinstance(meta, dict):
return False
msg = str(meta.get("message") or "").lower()
return meta.get("code") == 1 and "reduce the amount of data" in msg
def _export_messages(*, fields: str) -> int:
count = 0
with tmp_dst.open("w", encoding="utf-8") as f:
for msg in client.iter_paged(
f"/{conv_id}/messages",
params={"fields": fields},
max_pages=max_pages,
sleep_s=sleep_s,
):
f.write(json.dumps(msg, ensure_ascii=False) + "\n")
count += 1
tmp_dst.replace(dst)
return count
try:
exported_messages += _export_messages(fields=messages_fields)
except GraphApiError as e:
if tmp_dst.exists():
                tmp_dst.unlink(missing_ok=True)
if _is_too_much_data_error(e):
fallback_fields = "message,created_time,from,to"
print(
f"Warning: message export for {conv_id} was too large; retrying without attachments.",
file=sys.stderr,
)
exported_messages += _export_messages(fields=fallback_fields)
else:
raise
print(f"[{idx}/{len(conversations)}] exported conversation {conv_id}: {dst}")
print(f"Done. conversations={len(conversations)} messages={exported_messages} out={out}")
def main(argv: list[str] | None = None) -> int:
ap = argparse.ArgumentParser(description="Export Instagram DM history via Meta Graph API.")
ap.add_argument("--creds", default=DEFAULT_CREDS_PATH, help="dotenv-style creds file path")
ap.add_argument("--out", required=True, help="output directory")
ap.add_argument("--platform", default=os.getenv("META_EXPORT_PLATFORM", "instagram"))
ap.add_argument(
"--conversations-fields",
default=os.getenv("META_CONVERSATIONS_FIELDS", "participants,updated_time,message_count,unread_count"),
)
ap.add_argument(
"--messages-fields",
default=os.getenv("META_MESSAGES_FIELDS", "message,created_time,from,to,attachments"),
)
ap.add_argument(
"--conversations-limit",
type=int,
default=int(os.getenv("META_CONVERSATIONS_LIMIT", "25")),
help="Graph API limit for conversation listing (use 1 if listing times out).",
)
ap.add_argument("--max-conversations", type=int, default=None)
ap.add_argument("--max-pages", type=int, default=None, help="max pages per paginated endpoint")
ap.add_argument("--sleep-s", type=float, default=0.2, help="sleep between page fetches")
args = ap.parse_args(argv)
try:
export_history(
creds_path=args.creds,
out_dir=args.out,
platform=args.platform,
conversations_fields=args.conversations_fields,
messages_fields=args.messages_fields,
conversations_limit=args.conversations_limit,
max_conversations=args.max_conversations,
max_pages=args.max_pages,
sleep_s=args.sleep_s,
)
except KeyError as e:
print(str(e), file=sys.stderr)
return 2
except FileNotFoundError as e:
print(f"Creds file not found: {e}", file=sys.stderr)
return 2
except GraphApiError as e:
print(f"Graph API error: {e}", file=sys.stderr)
return 3
return 0
if __name__ == "__main__":
raise SystemExit(main())

123
sergio_instagram_messaging/fetch_thread_messages.py Normal file

@@ -0,0 +1,123 @@
from __future__ import annotations
import argparse
import json
import os
import sys
from pathlib import Path
from typing import Any
from .export_meta_ig_history import (
DEFAULT_CREDS_PATH,
GraphApiClient,
GraphApiConfig,
GraphApiError,
load_dotenv,
)
def _redact(obj: Any) -> Any:
if isinstance(obj, dict):
return {k: ("<redacted>" if k == "access_token" else _redact(v)) for k, v in obj.items()}
if isinstance(obj, list):
return [_redact(v) for v in obj]
return obj
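# e.g. _redact({"access_token": "EAAB", "data": [1]}) -> {"access_token": "<redacted>", "data": [1]}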
def _require(env: dict[str, str], key: str) -> str:
value = (env.get(key) or "").strip()
if not value:
raise KeyError(f"Missing required config: {key}")
return value
def main(argv: list[str] | None = None) -> int:
ap = argparse.ArgumentParser(
description=(
"Fetch messages for a single Instagram thread by participant user id (sender.id from webhook). "
"Useful when /conversations listing times out."
)
)
ap.add_argument("--creds", default=DEFAULT_CREDS_PATH)
ap.add_argument("--user-id", required=True, help="Instagram-scoped user id (from webhook sender.id)")
ap.add_argument("--platform", default=os.getenv("META_EXPORT_PLATFORM", "instagram"))
ap.add_argument("--version", default=None, help="override graph api version (e.g., v20.0)")
ap.add_argument(
"--messages-fields",
default=os.getenv("META_MESSAGES_FIELDS", "message,created_time,from,to,attachments"),
)
ap.add_argument("--max-pages", type=int, default=1, help="pages to fetch from /messages pagination")
ap.add_argument("--sleep-s", type=float, default=0.2, help="sleep between page fetches")
ap.add_argument("--out", default=None, help="write JSONL to this path (default: stdout)")
args = ap.parse_args(argv)
env = load_dotenv(args.creds)
token = _require(env, "META_PAGE_ACCESS_TOKEN")
page_id = (env.get("META_PAGE_ID") or "").strip()
version = (args.version or env.get("META_GRAPH_API_VERSION") or "v20.0").strip() or "v20.0"
client = GraphApiClient(GraphApiConfig(access_token=token, version=version))
try:
if not page_id:
me = client.get_json("/me", {"fields": "id"})
page_id = str(me.get("id") or "").strip()
if not page_id:
raise KeyError("Missing required config: META_PAGE_ID (and token did not resolve /me.id)")
conv = client.get_json(
f"/{page_id}/conversations",
{
"platform": args.platform,
"user_id": args.user_id,
"fields": "id,updated_time,message_count,unread_count,participants",
"limit": "1",
},
)
data = conv.get("data") or []
if not isinstance(data, list) or not data or not isinstance(data[0], dict):
print("No conversation found for this user_id.", file=sys.stderr)
return 2
conv_id = str(data[0].get("id") or "").strip()
if not conv_id:
print("Conversation result missing id.", file=sys.stderr)
return 2
out_f = None
if args.out:
out_path = Path(args.out)
out_path.parent.mkdir(parents=True, exist_ok=True)
out_f = out_path.open("w", encoding="utf-8")
else:
out_f = sys.stdout
written = 0
for msg in client.iter_paged(
f"/{conv_id}/messages",
params={"fields": args.messages_fields},
max_pages=args.max_pages,
sleep_s=args.sleep_s,
):
out_f.write(json.dumps(msg, ensure_ascii=False) + "\n")
written += 1
if args.out:
out_f.close()
print(f"conversation_id: {conv_id} messages_written: {written}", file=sys.stderr)
return 0
except GraphApiError as e:
payload = e.args[0] if e.args else {"error": str(e)}
print(json.dumps(_redact(payload), ensure_ascii=False), file=sys.stderr)
return 3
except KeyError as e:
print(str(e), file=sys.stderr)
return 2
except Exception as e:
print(f"Error: {e}", file=sys.stderr)
return 1
if __name__ == "__main__":
raise SystemExit(main())

347
sergio_instagram_messaging/generate_dm_report.py Normal file

@@ -0,0 +1,347 @@
from __future__ import annotations
import argparse
import csv
import json
import os
import statistics
import sys
from dataclasses import dataclass
from pathlib import Path
from typing import Any
def _safe_chmod_600(path: Path) -> None:
try:
os.chmod(path, 0o600)
except Exception:
return
def _load_json(path: Path) -> dict[str, Any]:
return json.loads(path.read_text(encoding="utf-8", errors="replace"))
def _read_csv(path: Path) -> list[dict[str, str]]:
with path.open("r", encoding="utf-8", newline="") as f:
return list(csv.DictReader(f))
def _count_jsonl(path: Path, *, max_lines: int = 5_000_000) -> int:
n = 0
with path.open("r", encoding="utf-8", errors="replace") as f:
for _ in f:
n += 1
if n >= max_lines:
break
return n
def _pct(x: float) -> str:
return f"{x*100:.1f}%"
@dataclass(frozen=True)
class ReportInputs:
summary: Path
templates: Path
bot_audit: Path
objections: Path
rescue: Path
eras: Path
training_pairs: Path
fatigue_png: Path
editorial_png: Path
def _resolve_inputs(analysis_dir: Path) -> ReportInputs:
return ReportInputs(
summary=analysis_dir / "summary.json",
templates=analysis_dir / "top_outgoing_templates.json",
bot_audit=analysis_dir / "bot_performance_audit.csv",
objections=analysis_dir / "objection_handlers.json",
rescue=analysis_dir / "rescue_playbook.json",
eras=analysis_dir / "sergio_eras.csv",
training_pairs=analysis_dir / "training_pairs.jsonl",
fatigue_png=analysis_dir / "bot_fatigue_chart.png",
editorial_png=analysis_dir / "editorial_timeline.png",
)
def generate_report(*, analysis_dir: Path, out_path: Path) -> Path:
inp = _resolve_inputs(analysis_dir)
for p in inp.__dict__.values():
if not Path(p).exists():
raise FileNotFoundError(str(p))
summary = _load_json(inp.summary)
templates = _load_json(inp.templates)
objections = _load_json(inp.objections)
rescues = _load_json(inp.rescue)
bot_audit = _read_csv(inp.bot_audit)
owner = summary.get("owner_name") or "Unknown"
conv = summary.get("conversations") or {}
conv_total = int(conv.get("total") or 0)
bot_only = int(conv.get("bot_only") or 0)
human = int(conv.get("human_intervened") or 0)
conversions = summary.get("conversions") or {}
conv_intent = int(conversions.get("intent") or 0)
conv_confirmed = int(conversions.get("confirmed") or 0)
bot_only_rate = (bot_only / conv_total) if conv_total else 0.0
human_rate = (human / conv_total) if conv_total else 0.0
intent_rate = (conv_intent / conv_total) if conv_total else 0.0
confirmed_rate = (conv_confirmed / conv_total) if conv_total else 0.0
manual_style = summary.get("manual_style") or {}
median_len = manual_style.get("median_len_chars")
p90_len = manual_style.get("p90_len_chars")
question_rate = float(manual_style.get("question_rate") or 0.0)
exclaim_rate = float(manual_style.get("exclaim_rate") or 0.0)
emoji_rate = float(manual_style.get("emoji_rate") or 0.0)
lang_guess = manual_style.get("lang_guess") or {}
# Templates: prefer canonical strings (safe-ish) and avoid raw samples.
top_templates = templates.get("top_templates") or []
top_bot = [t for t in top_templates if isinstance(t, dict) and t.get("label_hint") == "bot"]
top_manual = [t for t in top_templates if isinstance(t, dict) and t.get("label_hint") == "manual"]
# Bot audit: best/worst by reply_rate.
def fnum(v: str | None) -> float:
try:
return float(v or 0)
except Exception:
return 0.0
bot_audit_sorted = sorted(bot_audit, key=lambda r: fnum(r.get("sent")), reverse=True)
top_audit = bot_audit_sorted[:10]
best_reply = sorted(bot_audit, key=lambda r: fnum(r.get("reply_rate")), reverse=True)[:10]
worst_reply = sorted(bot_audit, key=lambda r: fnum(r.get("reply_rate")))[:10]
# Objections: most common replies per category.
objection_blocks: list[str] = []
if isinstance(objections, dict):
for cat in ("price", "time", "trust", "stop"):
replies = objections.get(cat) or []
if not isinstance(replies, list) or not replies:
continue
top3 = []
for r in replies[:3]:
if not isinstance(r, dict):
continue
top3.append(f"- ({r.get('count')}) {r.get('reply')}")
if top3:
objection_blocks.append(f"### {cat}\n" + "\n".join(top3))
rescue_count = len(rescues) if isinstance(rescues, list) else 0
pairs_count = _count_jsonl(inp.training_pairs, max_lines=2_000_000)
# Era summary: simple high-level notes.
eras_rows = _read_csv(inp.eras)
era_recent = eras_rows[-6:] if len(eras_rows) > 6 else eras_rows
era_offer_terms: list[str] = []
for row in era_recent:
offers = (row.get("top_offers") or "").strip()
if offers:
era_offer_terms.append(offers)
# A few derived notes.
lang_line = ", ".join(f"{k}={v}" for k, v in lang_guess.items())
    # The bot-fatigue chart was already produced by the analysis step; the report only references its path.
report = []
report.append("# Socialmediatorr Instagram DM History — Human Readable Report (English)")
report.append("")
report.append(f"- Generated: `{summary.get('generated_at')}`")
report.append(f"- Owner name used: `{owner}`")
report.append("")
report.append("## 1) What This Dataset Represents")
report.append("")
report.append(
"This is an all-time audit of Instagram DM conversations for `@socialmediatorr`, focused on extracting repeatable sales + support behavior so an AI agent can reply in Sergios style."
)
report.append(
"The analysis treats the account as a hybrid system: frequent repeated templates (likely automation/scripts) plus lower-frequency custom replies (human Sergio)."
)
report.append("")
report.append("## 2) High-Level Metrics (All-Time)")
report.append("")
report.append(f"- Conversations analyzed: **{conv_total:,}**")
report.append(f"- Bot-only conversations: **{bot_only:,}** ({_pct(bot_only_rate)})")
report.append(f"- Human-intervened conversations: **{human:,}** ({_pct(human_rate)})")
report.append(f"- Conversion (intent signals): **{conv_intent:,}** ({_pct(intent_rate)})")
report.append(f"- Conversion (confirmed signals): **{conv_confirmed:,}** ({_pct(confirmed_rate)})")
report.append("")
report.append(
"Notes on conversion: this uses heuristics (keywords + payment/link mentions). It is directionally useful for ranking scripts, but it is not a ground-truth revenue ledger."
)
report.append("")
report.append("## 3) Sergio Persona (From Manual/Hybrid Replies)")
report.append("")
report.append(f"- Typical reply length: median **{median_len}** chars (p90 **{p90_len}**)")
report.append(f"- Questions: **{_pct(question_rate)}** | Exclamations: **{_pct(exclaim_rate)}** | Emoji: **{_pct(emoji_rate)}**")
report.append(f"- Language guess (manual replies): {lang_line or 'n/a'}")
report.append("")
report.append("Practical implication for an agent: short, direct replies; minimal punctuation; bilingual capability; low/no emoji usage.")
report.append("")
report.append("## 4) Bot vs Human Segmentation (What It Means)")
report.append("")
report.append(
"- **[BOT]** = outgoing message template repeated frequently (>= configured threshold).\n"
"- **[MANUAL]** = outgoing message that is rare/unique (<= configured threshold).\n"
"- **[HYBRID]** = messages that look like a bot template but with manual edits (prefix match/similarity)."
)
report.append("")
report.append(
"This separation is the foundation for: (1) extracting safe reusable scripts, and (2) extracting human-only replies as training data for a RAG or fine-tune."
)
report.append("")
report.append("## 5) Top Detected Script Templates (Canonicalized)")
report.append("")
if top_bot:
for i, t in enumerate(top_bot[:10], 1):
canon = (t.get("canonical") or "").strip()
count = int(t.get("count") or 0)
report.append(f"- BOT #{i}: sent **{count}**× — `{canon[:160]}`")
else:
report.append("- (No high-frequency bot templates detected with current thresholds.)")
report.append("")
report.append("## 6) Human Reply Library (Rare/Manual Examples, Canonicalized)")
report.append("")
if top_manual:
for i, t in enumerate(top_manual[:10], 1):
canon = (t.get("canonical") or "").strip()
count = int(t.get("count") or 0)
report.append(f"- MANUAL-ish #{i}: seen **{count}**× — `{canon[:160]}`")
else:
report.append("- (No low-frequency manual templates included in the cached top list.)")
report.append("")
report.append("## 7) Bot Template Performance (Reply/Conversion Heuristics)")
report.append("")
report.append("These come from `bot_performance_audit.csv` and are computed per canonical bot template.")
report.append("")
if top_audit:
report.append("### Most-used bot templates (by volume)")
for r in top_audit[:8]:
report.append(
f"- sent={r.get('sent')} reply_rate={r.get('reply_rate')} intent_rate={r.get('conversion_intent_rate')} confirmed_rate={r.get('conversion_confirmed_rate')} — `{(r.get('canonical_template') or '')[:140]}`"
)
report.append("")
if best_reply:
report.append("### Best reply-rate bot templates")
for r in best_reply[:8]:
report.append(f"- reply_rate={r.get('reply_rate')} sent={r.get('sent')} — `{(r.get('canonical_template') or '')[:140]}`")
report.append("")
if worst_reply:
report.append("### Worst reply-rate bot templates")
for r in worst_reply[:8]:
report.append(f"- reply_rate={r.get('reply_rate')} sent={r.get('sent')} — `{(r.get('canonical_template') or '')[:140]}`")
report.append("")
report.append("## 8) Objections → Best Sergio Replies (Playbook)")
report.append("")
if objection_blocks:
report.extend(objection_blocks)
else:
report.append("- No objection handlers detected with current keyword rules.")
report.append("")
report.append("## 9) Rescue / Save Logic (Human Intervention After Silence/Negativity)")
report.append("")
report.append(f"- Rescue events detected (heuristic): **{rescue_count:,}**")
report.append(
"A “rescue” is when a manual/hybrid owner message follows either (a) a user negative signal, or (b) >24h silence after a bot message, and the thread later shows a confirmed conversion signal."
)
report.append("")
report.append("## 10) Product / Offer Evolution (Eras)")
report.append("")
report.append(
"This is inferred from mentions of pricing/currency + offer terms (e.g., call/audit/coaching) and summarized quarterly."
)
report.append("")
if era_offer_terms:
report.append("Recent quarters (top extracted offer signals):")
for line in era_offer_terms:
report.append(f"- {line}")
else:
report.append("- No offer signals detected in the most recent quarters with current extraction rules.")
report.append("")
report.append("## 11) Charts")
report.append("")
report.append(f"- Bot fatigue (weekly reply rate to the dominant bot script): `{inp.fatigue_png}`")
report.append(f"- Editorial timeline (top bot scripts vs conversions): `{inp.editorial_png}`")
report.append("")
report.append("## 12) What To Build From This (Agent Requirements)")
report.append("")
report.append("### Core behavior")
report.append("- Start with top bot templates for predictable openers and FAQ-style flows.")
report.append("- Switch to Sergio-style manual patterns on objections, negotiation, or when conversation stalls.")
report.append("- Use a rescue cadence (time-based triggers) after silence.")
report.append("")
report.append("### Data products to drive the agent")
report.append(f"- Training pairs (manual-only, converted threads): `{inp.training_pairs}` (rows: ~{pairs_count:,})")
report.append(f"- Objection handlers: `{inp.objections}`")
report.append(f"- Rescue playbook: `{inp.rescue}`")
report.append(f"- Script templates + editorial drift: `{inp.templates}`")
report.append("")
report.append("### Safety boundaries (recommended)")
report.append("- Never request or store passwords/2FA codes.")
report.append("- Avoid medical/legal/financial advice; redirect to a call or a human.")
report.append("- If user asks to move off-platform, follow Sergios historical policy and business rules.")
report.append("")
report.append("## 13) What We Do NOT Need To Know (Ignore / Do Not Store)")
report.append("")
report.append("- Exact client identities (names, handles, phone numbers, emails) unless required for operational routing.")
report.append("- Media attachments (photos/videos/audio) for persona cloning; they add storage cost and privacy risk.")
report.append("- Full verbatim message dumps for every thread; for RAG you only need high-quality pairs and playbook snippets.")
report.append("- Individual one-off edge cases that never repeat (unless they represent a safety boundary).")
report.append("- Internal Meta export folder structure details beyond `messages/inbox/**/message*.json`.")
report.append("")
report.append("## 14) Caveats / Gaps")
report.append("")
report.append("- The export does not reliably label ManyChat vs Human; bot/human is inferred by repetition and similarity.")
report.append("- Conversion is heuristic; integrate Stripe/Calendly/CRM events if you want ground-truth attribution.")
report.append("- Language detection is heuristic; improve it if you need precise bilingual routing.")
report.append("")
out_path.parent.mkdir(parents=True, exist_ok=True)
out_path.write_text("\n".join(report) + "\n", encoding="utf-8")
_safe_chmod_600(out_path)
return out_path
def main(argv: list[str] | None = None) -> int:
ap = argparse.ArgumentParser(description="Generate a human-readable English report from analyze_instagram_export outputs.")
ap.add_argument("--analysis-dir", required=True, help="directory produced by analyze_instagram_export (contains summary.json)")
ap.add_argument("--out", default=None, help="output markdown path (default: <analysis-dir>/dm_history_report_en.md)")
args = ap.parse_args(argv)
analysis_dir = Path(args.analysis_dir)
out_path = Path(args.out) if args.out else (analysis_dir / "dm_history_report_en.md")
try:
p = generate_report(analysis_dir=analysis_dir, out_path=out_path)
print(json.dumps({"ok": True, "out": str(p)}, ensure_ascii=False))
return 0
except FileNotFoundError as e:
print(f"Missing required input: {e}", file=os.sys.stderr)
return 2
except Exception as e:
print(f"Report generation failed: {e}", file=os.sys.stderr)
return 1
if __name__ == "__main__":
raise SystemExit(main())

347
sergio_instagram_messaging/import_instagram_export.py Normal file

@@ -0,0 +1,347 @@
from __future__ import annotations
import argparse
import hashlib
import json
import os
import re
import sys
from dataclasses import dataclass
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Iterable
from zipfile import ZipFile
def _now_utc_iso() -> str:
return datetime.now(timezone.utc).replace(microsecond=0).isoformat()
def _ts_ms_to_iso(ts_ms: int | None) -> str | None:
if not ts_ms:
return None
try:
return datetime.fromtimestamp(ts_ms / 1000.0, tz=timezone.utc).replace(microsecond=0).isoformat()
except Exception:
return None
def _safe_slug(value: str, *, max_len: int = 120) -> str:
raw = value.strip()
base = re.sub(r"[^A-Za-z0-9._-]+", "_", raw).strip("_.-")
if not base:
base = "conversation"
if len(base) > max_len:
base = base[:max_len]
digest = hashlib.sha1(raw.encode("utf-8", errors="ignore")).hexdigest()[:10]
return f"{base}__{digest}"
def _extract_text(msg: dict[str, Any]) -> str | None:
for key in ("content", "text", "message"):
value = msg.get(key)
if isinstance(value, str) and value.strip():
return value
share = msg.get("share")
if isinstance(share, dict):
parts: list[str] = []
for k in ("link", "share_text", "original_content_owner", "share_text"):
v = share.get(k)
if isinstance(v, str) and v.strip():
parts.append(v.strip())
if parts:
return " | ".join(parts)
return None
def _extract_attachments(msg: dict[str, Any]) -> list[dict[str, Any]]:
out: list[dict[str, Any]] = []
def add(kind: str, items: Any) -> None:
if not isinstance(items, list):
return
for it in items:
if not isinstance(it, dict):
continue
uri = it.get("uri") or it.get("url")
entry = {"type": kind}
if isinstance(uri, str) and uri:
entry["uri"] = uri
for k in ("creation_timestamp", "timestamp_ms", "duration"):
if k in it:
entry[k] = it.get(k)
out.append(entry)
add("photo", msg.get("photos"))
add("video", msg.get("videos"))
add("audio", msg.get("audio_files"))
add("file", msg.get("files"))
add("gif", msg.get("gifs"))
add("sticker", msg.get("sticker"))
return out
@dataclass(frozen=True)
class ConversationExport:
conv_id: str
title: str | None
participants: list[str]
messages: list[dict[str, Any]]
def _participants_list(obj: Any) -> list[str]:
out: list[str] = []
if not isinstance(obj, list):
return out
for item in obj:
if isinstance(item, dict):
for k in ("name", "username"):
v = item.get(k)
if isinstance(v, str) and v.strip():
out.append(v.strip())
break
elif isinstance(item, str) and item.strip():
out.append(item.strip())
# stable, de-duped
seen: set[str] = set()
unique: list[str] = []
for name in out:
if name in seen:
continue
unique.append(name)
seen.add(name)
return unique
def _message_sort_key(msg: dict[str, Any]) -> int:
ts = msg.get("timestamp_ms")
if isinstance(ts, int):
return ts
if isinstance(ts, float):
return int(ts)
return 0
def _load_conversation_parts(parts: Iterable[tuple[str, Any]]) -> ConversationExport:
title: str | None = None
conv_id: str | None = None
participants: list[str] = []
messages: list[dict[str, Any]] = []
for _name, data in parts:
if not isinstance(data, dict):
continue
if title is None and isinstance(data.get("title"), str):
title = data.get("title")
if conv_id is None:
for key in ("thread_path", "threadPath", "thread_id", "threadId"):
v = data.get(key)
if isinstance(v, str) and v.strip():
conv_id = v.strip()
break
if not participants:
participants = _participants_list(data.get("participants"))
msgs = data.get("messages")
if isinstance(msgs, list):
for m in msgs:
if isinstance(m, dict):
messages.append(m)
if conv_id is None:
conv_id = "unknown"
messages.sort(key=_message_sort_key)
return ConversationExport(conv_id=conv_id, title=title, participants=participants, messages=messages)
def _dir_conversation_groups(root: Path) -> dict[Path, list[Path]]:
"""
Group Instagram export message part files by conversation folder.
The export can contain huge attachment subtrees (photos/videos). To avoid
walking those, prefer `messages/inbox/*` and only glob `message*.json` in the
top level of each conversation folder.
"""
inbox_root = root / "messages" / "inbox"
if inbox_root.is_dir():
groups: dict[Path, list[Path]] = {}
for conv_dir in inbox_root.iterdir():
if not conv_dir.is_dir():
continue
parts = sorted(conv_dir.glob("message*.json"), key=lambda p: p.name)
if parts:
groups[conv_dir] = parts
return groups
# Fallback: legacy/unknown layout; wider scan but still filter to messages/inbox.
groups: dict[Path, list[Path]] = {}
for path in root.rglob("message*.json"):
try:
rel = path.relative_to(root)
except Exception:
continue
rel_lower = str(rel).lower()
if "/messages/inbox/" not in rel_lower.replace("\\", "/"):
continue
if path.is_dir():
continue
groups.setdefault(path.parent, []).append(path)
for folder, files in groups.items():
files.sort(key=lambda p: p.name)
return groups
def _zip_conversation_groups(z: ZipFile) -> dict[str, list[str]]:
groups: dict[str, list[str]] = {}
for name in z.namelist():
lower = name.lower()
if not lower.endswith(".json"):
continue
if "/messages/inbox/" not in lower:
continue
base = os.path.basename(lower)
if not base.startswith("message"):
continue
folder = os.path.dirname(name)
groups.setdefault(folder, []).append(name)
for folder, files in groups.items():
files.sort(key=lambda n: os.path.basename(n))
return groups
def import_instagram_export(*, input_path: str, out_dir: str) -> None:
src = Path(input_path)
out = Path(out_dir)
out.mkdir(parents=True, exist_ok=True)
(out / "messages").mkdir(parents=True, exist_ok=True)
conversations_out: list[dict[str, Any]] = []
total_messages = 0
if src.is_file() and src.suffix.lower() == ".zip":
with ZipFile(src) as z:
groups = _zip_conversation_groups(z)
for folder, files in sorted(groups.items(), key=lambda kv: kv[0]):
parts: list[tuple[str, Any]] = []
for name in files:
with z.open(name) as f:
parts.append((name, json.load(f)))
conv = _load_conversation_parts(parts)
conv_key = conv.conv_id if conv.conv_id != "unknown" else folder
safe = _safe_slug(conv_key)
dst = out / "messages" / f"{safe}.jsonl"
latest_ts_ms: int | None = None
with dst.open("w", encoding="utf-8") as f:
for m in conv.messages:
ts_ms = m.get("timestamp_ms") if isinstance(m.get("timestamp_ms"), int) else None
latest_ts_ms = ts_ms if (ts_ms and (latest_ts_ms is None or ts_ms > latest_ts_ms)) else latest_ts_ms
row = {
"source": "instagram_export",
"conversation_id": conv_key,
"conversation_title": conv.title,
"participants": conv.participants,
"timestamp_ms": ts_ms,
"created_time": _ts_ms_to_iso(ts_ms),
"sender": m.get("sender_name"),
"text": _extract_text(m),
"type": m.get("type"),
"attachments": _extract_attachments(m),
}
f.write(json.dumps(row, ensure_ascii=False) + "\n")
total_messages += 1
conversations_out.append(
{
"id": conv_key,
"title": conv.title,
"participants": conv.participants,
"message_count": len(conv.messages),
"updated_time": _ts_ms_to_iso(latest_ts_ms),
"messages_file": f"messages/{safe}.jsonl",
}
)
elif src.is_dir():
groups = _dir_conversation_groups(src)
for folder, files in sorted(groups.items(), key=lambda kv: str(kv[0])):
parts: list[tuple[str, Any]] = []
for p in files:
try:
parts.append((str(p), json.loads(p.read_text(encoding="utf-8", errors="replace"))))
except Exception:
continue
conv = _load_conversation_parts(parts)
conv_key = conv.conv_id if conv.conv_id != "unknown" else folder.name
safe = _safe_slug(conv_key)
dst = out / "messages" / f"{safe}.jsonl"
latest_ts_ms: int | None = None
with dst.open("w", encoding="utf-8") as f:
for m in conv.messages:
ts_ms = m.get("timestamp_ms") if isinstance(m.get("timestamp_ms"), int) else None
latest_ts_ms = ts_ms if (ts_ms and (latest_ts_ms is None or ts_ms > latest_ts_ms)) else latest_ts_ms
row = {
"source": "instagram_export",
"conversation_id": conv_key,
"conversation_title": conv.title,
"participants": conv.participants,
"timestamp_ms": ts_ms,
"created_time": _ts_ms_to_iso(ts_ms),
"sender": m.get("sender_name"),
"text": _extract_text(m),
"type": m.get("type"),
"attachments": _extract_attachments(m),
}
f.write(json.dumps(row, ensure_ascii=False) + "\n")
total_messages += 1
conversations_out.append(
{
"id": conv_key,
"title": conv.title,
"participants": conv.participants,
"message_count": len(conv.messages),
"updated_time": _ts_ms_to_iso(latest_ts_ms),
"messages_file": f"messages/{safe}.jsonl",
}
)
else:
raise SystemExit("Input must be an Instagram data export .zip file or an extracted export directory.")
(out / "conversations.json").write_text(json.dumps({"data": conversations_out}, indent=2) + "\n")
(out / "export_metadata.json").write_text(
json.dumps(
{
"exported_at": _now_utc_iso(),
"source": "instagram_export",
"input": str(src),
"conversations": len(conversations_out),
"messages": total_messages,
},
indent=2,
)
+ "\n"
)
print(f"Done. conversations={len(conversations_out)} messages={total_messages} out={out}")
def main(argv: list[str] | None = None) -> int:
ap = argparse.ArgumentParser(description="Import Instagram 'Download your information' message export (zip or folder).")
ap.add_argument("--input", required=True, help="path to export .zip or extracted export directory")
ap.add_argument("--out", required=True, help="output directory")
args = ap.parse_args(argv)
try:
import_instagram_export(input_path=args.input, out_dir=args.out)
except KeyboardInterrupt:
return 130
except SystemExit:
raise
except Exception as e:
print(f"Import failed: {e}", file=sys.stderr)
return 1
return 0
if __name__ == "__main__":
raise SystemExit(main())

214
sergio_instagram_messaging/index_instagram_export.py Normal file

@@ -0,0 +1,214 @@
from __future__ import annotations
import argparse
import json
import re
import sys
import time
from collections import Counter
from datetime import datetime, timezone
from pathlib import Path
def _now_utc_iso() -> str:
return datetime.now(timezone.utc).replace(microsecond=0).isoformat()
def _decode_json_str(raw: bytes) -> str:
"""
Decode a JSON string fragment (no surrounding quotes) into a Python string.
Handles escape sequences.
"""
try:
return json.loads(b'"' + raw + b'"')
except Exception:
return raw.decode("utf-8", errors="replace")
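# e.g. _decode_json_str(rb"Caf\u00e9") -> "Café"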
def _resolve_inbox_root(input_path: Path) -> Path:
"""
Accept either:
- export root directory (contains messages/inbox)
- the inbox directory itself
"""
p = input_path
if p.is_dir() and p.name.lower() == "inbox":
return p
candidate = p / "messages" / "inbox"
if candidate.is_dir():
return candidate
raise FileNotFoundError(f"Could not find messages/inbox under: {p}")
def index_export(
*,
input_path: Path,
out_path: Path,
chunk_kb: int,
fallback_chunk_kb: int,
max_conversations: int | None,
progress_every: int,
) -> dict:
inbox_root = _resolve_inbox_root(input_path)
out_path.parent.mkdir(parents=True, exist_ok=True)
re_ts = re.compile(rb'"timestamp_ms"\s*:\s*(\d{10,})')
re_sender = re.compile(rb'"sender_name"\s*:\s*"((?:[^"\\]|\\.)*)"')
re_title = re.compile(rb'"title"\s*:\s*"((?:[^"\\]|\\.)*)"')
re_thread = re.compile(rb'"thread_path"\s*:\s*"((?:[^"\\]|\\.)*)"')
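    # Byte-level regexes let us scan only the head of each message file instead of parsing the full JSON.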
scanned = 0
indexed = 0
missing = 0
errors = 0
min_ts: int | None = None
max_ts: int | None = None
sender_counts: Counter[str] = Counter()
started = time.time()
with out_path.open("w", encoding="utf-8") as out:
for conv_dir in inbox_root.iterdir():
if not conv_dir.is_dir():
continue
scanned += 1
if max_conversations is not None and scanned > max_conversations:
break
msg1 = conv_dir / "message_1.json"
if not msg1.exists():
candidates = sorted(conv_dir.glob("message*.json"), key=lambda p: p.name)
msg1 = candidates[0] if candidates else msg1
if not msg1.exists():
missing += 1
continue
head = b""
try:
with msg1.open("rb") as f:
head = f.read(max(1, chunk_kb) * 1024)
except Exception:
errors += 1
continue
m_ts = re_ts.search(head)
if not m_ts and fallback_chunk_kb > chunk_kb:
try:
with msg1.open("rb") as f:
head = f.read(max(1, fallback_chunk_kb) * 1024)
except Exception:
errors += 1
continue
m_ts = re_ts.search(head)
if not m_ts:
errors += 1
continue
try:
ts_ms = int(m_ts.group(1))
except Exception:
errors += 1
continue
sender: str | None = None
m_sender = re_sender.search(head)
if m_sender:
sender = _decode_json_str(m_sender.group(1))
title: str | None = None
m_title = re_title.search(head)
if m_title:
title = _decode_json_str(m_title.group(1))
thread_path: str | None = None
m_thread = re_thread.search(head)
if m_thread:
thread_path = _decode_json_str(m_thread.group(1))
indexed += 1
if sender:
sender_counts[sender] += 1
min_ts = ts_ms if min_ts is None or ts_ms < min_ts else min_ts
max_ts = ts_ms if max_ts is None or ts_ms > max_ts else max_ts
out.write(
json.dumps(
{
"conv_dir": conv_dir.name,
"message_file": msg1.name,
"thread_path": thread_path,
"title": title,
"latest_timestamp_ms": ts_ms,
"latest_sender": sender,
},
ensure_ascii=False,
)
+ "\n"
)
if progress_every and scanned % progress_every == 0:
elapsed = int(time.time() - started)
print(
f"scanned={scanned} indexed={indexed} missing={missing} errors={errors} elapsed_s={elapsed}",
file=sys.stderr,
flush=True,
)
def _iso(ts: int | None) -> str | None:
if ts is None:
return None
return datetime.fromtimestamp(ts / 1000, tz=timezone.utc).replace(microsecond=0).isoformat()
return {
"exported_at": _now_utc_iso(),
"input": str(input_path),
"inbox_root": str(inbox_root),
"out": str(out_path),
"scanned_dirs": scanned,
"indexed_conversations": indexed,
"missing_message_json": missing,
"errors": errors,
"latest_time_utc": _iso(max_ts),
"oldest_time_utc": _iso(min_ts),
"top_latest_senders": sender_counts.most_common(20),
"elapsed_s": int(time.time() - started),
}
def main(argv: list[str] | None = None) -> int:
ap = argparse.ArgumentParser(description="Index Instagram 'Download your information' export (fast scan).")
ap.add_argument("--input", required=True, help="export root directory (contains messages/inbox) or inbox dir")
ap.add_argument("--out", required=True, help="output index JSONL path")
ap.add_argument("--chunk-kb", type=int, default=64, help="bytes to read from each message file (KB)")
ap.add_argument("--fallback-chunk-kb", type=int, default=256, help="retry read size if timestamp not found (KB)")
ap.add_argument("--max-conversations", type=int, default=None, help="stop after this many conversation folders")
ap.add_argument("--progress-every", type=int, default=1000, help="progress log frequency (stderr)")
args = ap.parse_args(argv)
try:
meta = index_export(
input_path=Path(args.input),
out_path=Path(args.out),
chunk_kb=int(args.chunk_kb),
fallback_chunk_kb=int(args.fallback_chunk_kb),
max_conversations=(int(args.max_conversations) if args.max_conversations else None),
progress_every=int(args.progress_every),
)
meta_path = Path(args.out).with_suffix(".metadata.json")
meta_path.write_text(json.dumps(meta, indent=2, ensure_ascii=False) + "\n")
print(json.dumps(meta, indent=2, ensure_ascii=False))
return 0
except FileNotFoundError as e:
print(str(e), file=sys.stderr)
return 2
except KeyboardInterrupt:
return 130
if __name__ == "__main__":
raise SystemExit(main())

366
sergio_instagram_messaging/meta_device_login.py Normal file

@@ -0,0 +1,366 @@
from __future__ import annotations
import argparse
import json
import sys
import time
import urllib.error
import urllib.parse
import urllib.request
from dataclasses import dataclass
from datetime import datetime, timezone
from pathlib import Path
from typing import Any
from .export_meta_ig_history import DEFAULT_CREDS_PATH, load_dotenv
DEFAULT_STATE_PATH = "/root/tmp/meta-device-login-state.json"
def _now_ts() -> int:
return int(datetime.now(timezone.utc).timestamp())
def _post_form(url: str, data: dict[str, str]) -> dict[str, Any]:
body = urllib.parse.urlencode(data).encode("utf-8")
req = urllib.request.Request(url, data=body, method="POST")
req.add_header("Content-Type", "application/x-www-form-urlencoded")
try:
with urllib.request.urlopen(req, timeout=60) as r:
return json.loads(r.read().decode("utf-8"))
except urllib.error.HTTPError as e:
raise SystemExit(_format_meta_http_error(e))
def _get_json(url: str, params: dict[str, str]) -> dict[str, Any]:
full = f"{url}?{urllib.parse.urlencode(params)}"
try:
with urllib.request.urlopen(full, timeout=60) as r:
return json.loads(r.read().decode("utf-8"))
except urllib.error.HTTPError as e:
raise SystemExit(_format_meta_http_error(e))
def _format_meta_http_error(e: urllib.error.HTTPError) -> str:
body = e.read().decode("utf-8", errors="replace")
try:
payload = json.loads(body)
except Exception:
payload = {"raw": body[:1000]}
def redact(obj: Any) -> Any:
if isinstance(obj, dict):
return {k: ("<redacted>" if k == "access_token" else redact(v)) for k, v in obj.items()}
if isinstance(obj, list):
return [redact(v) for v in obj]
return obj
err = payload.get("error", payload)
err = redact(err)
return f"Meta API request failed (HTTP {e.code}): {json.dumps(err, ensure_ascii=False)}"
def _write_dotenv_kv(path: str, updates: dict[str, str]) -> None:
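    # Updates existing KEY=VALUE lines in place, appends new keys, then atomically replaces the file with mode 600.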
p = Path(path)
lines: list[str] = []
if p.exists():
lines = p.read_text(errors="replace").splitlines()
seen: set[str] = set()
out: list[str] = []
for raw in lines:
line = raw.rstrip("\n")
stripped = line.strip()
if not stripped or stripped.startswith("#") or "=" not in stripped:
out.append(line)
continue
key, _value = stripped.split("=", 1)
key = key.strip()
if key in updates:
out.append(f"{key}={updates[key]}")
seen.add(key)
else:
out.append(line)
for key, value in updates.items():
if key in seen:
continue
out.append(f"{key}={value}")
tmp = p.with_name(p.name + ".tmp")
tmp.write_text("\n".join(out).rstrip("\n") + "\n")
tmp.chmod(0o600)
tmp.replace(p)
@dataclass(frozen=True)
class AppCreds:
app_id: str
app_secret: str | None
client_token: str | None
graph_version: str
@property
def app_access_token(self) -> str:
if not self.app_secret:
raise SystemExit("Missing META_APP_SECRET in creds file.")
return f"{self.app_id}|{self.app_secret}"
@property
def client_access_token(self) -> str:
if not self.client_token:
raise SystemExit(
"Missing META_CLIENT_TOKEN in creds file. "
"For Facebook Login for Devices, /device/login requires a client access token: APP_ID|CLIENT_TOKEN."
)
return f"{self.app_id}|{self.client_token}"
@property
def base(self) -> str:
return f"https://graph.facebook.com/{self.graph_version}"
def _load_app_creds(creds_path: str) -> AppCreds:
env = load_dotenv(creds_path)
app_id = (env.get("META_APP_ID") or "").strip()
app_secret = (env.get("META_APP_SECRET") or "").strip() or None
client_token = (env.get("META_CLIENT_TOKEN") or "").strip() or None
client_access_token = (env.get("META_CLIENT_ACCESS_TOKEN") or "").strip()
if client_access_token and "|" in client_access_token:
parsed_app_id, parsed_client_token = client_access_token.split("|", 1)
if parsed_app_id.strip():
app_id = parsed_app_id.strip()
if parsed_client_token.strip():
client_token = parsed_client_token.strip()
graph_version = (env.get("META_GRAPH_API_VERSION") or "v20.0").strip() or "v20.0"
if not app_id:
raise SystemExit("Missing META_APP_ID in creds file.")
return AppCreds(app_id=app_id, app_secret=app_secret, client_token=client_token, graph_version=graph_version)
def cmd_start(*, creds_path: str, state_path: str, scope: str) -> int:
creds = _load_app_creds(creds_path)
print(f"using_app_id: {creds.app_id} graph_version: {creds.graph_version}")
resp = _post_form(
f"{creds.base}/device/login",
{"access_token": creds.client_access_token, "scope": scope},
)
required = ["code", "user_code", "verification_uri", "expires_in", "interval"]
missing = [k for k in required if k not in resp]
if missing:
raise SystemExit(f"Unexpected response from device/login; missing {missing}: {resp}")
expires_at = _now_ts() + int(resp["expires_in"])
state = {
"created_at": _now_ts(),
"expires_at": expires_at,
"interval": int(resp["interval"]),
"code": resp["code"],
"user_code": resp["user_code"],
"verification_uri": resp["verification_uri"],
"scope": scope,
"app_id": creds.app_id,
"graph_version": creds.graph_version,
}
sp = Path(state_path)
sp.parent.mkdir(parents=True, exist_ok=True)
sp.write_text(json.dumps(state, indent=2) + "\n")
sp.chmod(0o600)
print("Device login started.")
print(f"1) Open: {state['verification_uri']}")
print(f"2) Enter code: {state['user_code']}")
print(f"3) Then run: python3 -m sergio_instagram_messaging.meta_device_login poll --state {state_path}")
print(f"(expires at unix ts: {expires_at})")
return 0
def _read_state(state_path: str) -> dict[str, Any]:
sp = Path(state_path)
if not sp.exists():
raise SystemExit(f"State file not found: {state_path}")
return json.loads(sp.read_text())
def _exchange_long_lived(*, creds: AppCreds, short_token: str) -> str:
if not creds.app_secret:
raise SystemExit("Missing META_APP_SECRET in creds file; cannot exchange a long-lived user token.")
resp = _get_json(
f"{creds.base}/oauth/access_token",
{
"grant_type": "fb_exchange_token",
"client_id": creds.app_id,
"client_secret": creds.app_secret,
"fb_exchange_token": short_token,
},
)
token = (resp.get("access_token") or "").strip()
if not token:
raise SystemExit(f"Token exchange failed: {resp}")
return token
def _discover_page_and_token(*, creds: AppCreds, user_token: str, target_ig_user_id: str | None) -> dict[str, str]:
# Request page access tokens; DO NOT print them.
payload = _get_json(
f"{creds.base}/me/accounts",
{
"fields": "id,name,access_token,instagram_business_account",
"access_token": user_token,
},
)
data = payload.get("data") or []
if not isinstance(data, list) or not data:
raise SystemExit("No pages found for this user token (me/accounts returned empty).")
chosen: dict[str, Any] | None = None
if target_ig_user_id:
for p in data:
if not isinstance(p, dict):
continue
iba = p.get("instagram_business_account") or {}
if isinstance(iba, dict) and str(iba.get("id") or "") == str(target_ig_user_id):
chosen = p
break
if chosen is None:
        # Fallback: first page with an Instagram business account attached.
for p in data:
if not isinstance(p, dict):
continue
if p.get("instagram_business_account"):
chosen = p
break
if chosen is None:
raise SystemExit(
"Could not find a Page with an attached instagram_business_account. "
"Connect the Instagram professional account to a Facebook Page first."
)
page_id = str(chosen.get("id") or "").strip()
page_token = str(chosen.get("access_token") or "").strip()
if not page_id or not page_token:
raise SystemExit("Selected page is missing id/access_token in response.")
iba = chosen.get("instagram_business_account") or {}
ig_id = str(iba.get("id") or "").strip() if isinstance(iba, dict) else ""
return {
"META_PAGE_ID": page_id,
"META_PAGE_ACCESS_TOKEN": page_token,
**({"META_IG_USER_ID": ig_id} if ig_id else {}),
}
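# For reference, a sketch of the /me/accounts payload shape this helper walks
# (illustrative; token values redacted, field set matches the request above):
#   {"data": [{"id": "<page-id>", "name": "<page-name>",
#              "access_token": "<redacted>",
#              "instagram_business_account": {"id": "<ig-user-id>"}}],
#    "paging": {...}}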
def cmd_poll(
*,
creds_path: str,
state_path: str,
write_page_token: bool,
target_ig_user_id: str | None,
max_wait_s: int,
) -> int:
state = _read_state(state_path)
creds = _load_app_creds(creds_path)
print(f"using_app_id: {creds.app_id} graph_version: {creds.graph_version}")
expires_at = int(state.get("expires_at") or 0)
interval = int(state.get("interval") or 5)
code = str(state.get("code") or "").strip()
if not code:
raise SystemExit("Invalid state file: missing code.")
deadline = min(expires_at, _now_ts() + max_wait_s)
while _now_ts() < deadline:
resp = _post_form(
f"{creds.base}/device/login_status",
{"access_token": creds.client_access_token, "code": code},
)
token = (resp.get("access_token") or "").strip()
if token:
# Store short-lived and long-lived user tokens.
updates = {"META_USER_ACCESS_TOKEN": token}
try:
updates["META_USER_ACCESS_TOKEN_LONG"] = _exchange_long_lived(creds=creds, short_token=token)
            except (SystemExit, Exception):
                # Not fatal (the exchange may fail, or META_APP_SECRET may be absent); keep the short-lived token.
pass
user_token = updates.get("META_USER_ACCESS_TOKEN_LONG") or updates["META_USER_ACCESS_TOKEN"]
if write_page_token:
try:
updates.update(
_discover_page_and_token(
creds=creds,
user_token=user_token,
target_ig_user_id=target_ig_user_id,
)
)
except SystemExit as e:
print(
"Warning: could not discover META_PAGE_ID/META_PAGE_ACCESS_TOKEN from this user token.\n"
f"Reason: {e}\n"
"Saved user token(s) only; connect the Instagram professional account to a Facebook Page, "
"then run meta_page_token_from_user_token.",
file=sys.stderr,
)
_write_dotenv_kv(creds_path, updates)
print(f"Saved tokens to {creds_path} (no values printed).")
return 0
        # Still pending: Meta returns an error payload (no access_token) until the user authorizes.
time.sleep(max(1, interval))
raise SystemExit("Timed out waiting for device authorization.")
def main(argv: list[str] | None = None) -> int:
ap = argparse.ArgumentParser(description="Meta device login helper (Facebook Login for Devices).")
ap.set_defaults(cmd=None)
sub = ap.add_subparsers(required=True)
start = sub.add_parser("start", help="Start device login and print user_code + URL")
start.set_defaults(cmd="start")
start.add_argument("--creds", default=DEFAULT_CREDS_PATH)
start.add_argument("--state", default=DEFAULT_STATE_PATH)
start.add_argument(
"--scope",
default="pages_show_list,pages_read_engagement,instagram_manage_messages",
help="Comma-separated permissions to request",
)
poll = sub.add_parser("poll", help="Poll for completion and save tokens into creds file")
poll.set_defaults(cmd="poll")
poll.add_argument("--creds", default=DEFAULT_CREDS_PATH)
poll.add_argument("--state", default=DEFAULT_STATE_PATH)
poll.add_argument("--write-page-token", action="store_true", help="Also store META_PAGE_ID and META_PAGE_ACCESS_TOKEN")
poll.add_argument(
"--target-ig-user-id",
default=None,
help="Prefer selecting the Page connected to this instagram_business_account id",
)
poll.add_argument("--max-wait-s", type=int, default=900)
args = ap.parse_args(argv)
if args.cmd == "start":
return cmd_start(creds_path=args.creds, state_path=args.state, scope=args.scope)
if args.cmd == "poll":
return cmd_poll(
creds_path=args.creds,
state_path=args.state,
write_page_token=bool(args.write_page_token),
target_ig_user_id=(args.target_ig_user_id or None),
max_wait_s=int(args.max_wait_s),
)
raise SystemExit(2)
if __name__ == "__main__":
raise SystemExit(main())
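For reference, the same flow can be driven in-process. A minimal sketch, assuming only the module's own entry points and defaults (`DEFAULT_CREDS_PATH` / `DEFAULT_STATE_PATH`); the IG user id is the one used throughout this repo:

```python
# Hedged sketch: drive the device-login flow in-process instead of via the CLI.
from sergio_instagram_messaging.meta_device_login import (
    DEFAULT_CREDS_PATH,
    DEFAULT_STATE_PATH,
    cmd_poll,
    cmd_start,
)

cmd_start(
    creds_path=DEFAULT_CREDS_PATH,
    state_path=DEFAULT_STATE_PATH,
    scope="pages_show_list,pages_read_engagement,instagram_manage_messages",
)
# ...authorize in the browser via the printed URL + code, then:
cmd_poll(
    creds_path=DEFAULT_CREDS_PATH,
    state_path=DEFAULT_STATE_PATH,
    write_page_token=True,
    target_ig_user_id="17841466913731557",
    max_wait_s=900,
)
```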


@ -0,0 +1,231 @@
from __future__ import annotations
import argparse
import json
import sys
from pathlib import Path
from typing import Any
from .export_meta_ig_history import DEFAULT_CREDS_PATH, GraphApiClient, GraphApiConfig, GraphApiError, load_dotenv
DEFAULT_USER_TOKEN_PATH = "/root/tmp/meta-user-access-token.txt"
def _redact(obj: Any) -> Any:
if isinstance(obj, dict):
return {k: ("<redacted>" if k == "access_token" else _redact(v)) for k, v in obj.items()}
if isinstance(obj, list):
return [_redact(v) for v in obj]
return obj
def _write_dotenv_kv(path: str, updates: dict[str, str]) -> None:
p = Path(path)
lines: list[str] = []
if p.exists():
lines = p.read_text(errors="replace").splitlines()
seen: set[str] = set()
out: list[str] = []
for raw in lines:
line = raw.rstrip("\n")
stripped = line.strip()
if not stripped or stripped.startswith("#") or "=" not in stripped:
out.append(line)
continue
key, _value = stripped.split("=", 1)
key = key.strip()
if key in updates:
out.append(f"{key}={updates[key]}")
seen.add(key)
else:
out.append(line)
for key, value in updates.items():
if key in seen:
continue
out.append(f"{key}={value}")
tmp = p.with_name(p.name + ".tmp")
tmp.write_text("\n".join(out).rstrip("\n") + "\n")
tmp.chmod(0o600)
tmp.replace(p)
def _read_user_token(*, user_token: str | None, user_token_file: str) -> str:
if user_token is not None:
token = user_token.strip()
if not token:
raise SystemExit("Empty --user-token provided.")
return token
p = Path(user_token_file)
if not p.exists():
raise SystemExit(
f"User token file not found: {user_token_file}\n"
"Create it with mode 600 and put a Facebook User access token inside (single line)."
)
raw = p.read_text(errors="replace")
lines = [ln.strip() for ln in raw.splitlines()]
lines = [ln for ln in lines if ln and not ln.startswith("#")]
if not lines:
raise SystemExit(f"User token file is empty: {user_token_file}")
# Accept either:
# - a single raw token line
# - an env-style line: META_USER_ACCESS_TOKEN=...
# - a file with multiple lines (we'll pick the most likely user-token line)
candidates: list[str] = []
for ln in lines:
if "=" in ln:
_k, v = ln.split("=", 1)
v = v.strip()
if v:
candidates.append(v)
else:
candidates.append(ln)
# Heuristic: Facebook user tokens typically start with "EA" and are long.
user_like = [c for c in candidates if c.startswith("EA") and len(c) > 50]
if user_like:
return max(user_like, key=len)
# Fallback: pick the longest non-empty candidate.
return max(candidates, key=len)
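# Illustrative inputs _read_user_token accepts (placeholder values):
#   EAAG...                          -> a single raw token line is returned as-is
#   META_USER_ACCESS_TOKEN=EAAG...   -> the value after "=" is used
#   several candidate lines          -> the longest "EA"-prefixed candidate wins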
def main(argv: list[str] | None = None) -> int:
ap = argparse.ArgumentParser(
description=(
"Derive a Page access token from a Facebook User access token (no token values printed). "
"Useful fallback when device-login is blocked."
)
)
ap.add_argument("--creds", default=DEFAULT_CREDS_PATH, help="dotenv-style creds file to update")
ap.add_argument("--user-token-file", default=DEFAULT_USER_TOKEN_PATH)
ap.add_argument(
"--user-token",
default=None,
help="Provide the user token inline (not recommended; may leak in shell history). Prefer --user-token-file.",
)
ap.add_argument("--target-ig-user-id", default=None, help="Prefer a Page connected to this instagram_business_account id")
ap.add_argument("--page-id", default=None, help="Force selecting a specific Page id from /me/accounts")
ap.add_argument("--platform", default="instagram")
ap.add_argument("--version", default=None, help="override graph api version (e.g., v20.0)")
ap.add_argument("--list-pages", action="store_true", help="List pages (id/name/has_ig) without saving tokens")
ap.add_argument("--dry-run", action="store_true", help="Do not write into creds file")
args = ap.parse_args(argv)
env = load_dotenv(args.creds)
version = (args.version or env.get("META_GRAPH_API_VERSION") or "v20.0").strip() or "v20.0"
user_token = _read_user_token(user_token=args.user_token, user_token_file=args.user_token_file)
client = GraphApiClient(GraphApiConfig(access_token=user_token, version=version))
try:
me = client.get_json("/me", {"fields": "id,name"})
me_id = str(me.get("id") or "").strip()
me_name = str(me.get("name") or "").strip()
print(f"/me: id={me_id} name={me_name}")
payload = client.get_json(
"/me/accounts",
{
"fields": "id,name,access_token,instagram_business_account",
"limit": "200",
},
)
data = payload.get("data") or []
if not isinstance(data, list) or not data:
print("No pages found for this user token (me/accounts returned empty).", file=sys.stderr)
return 2
if args.list_pages:
for p in data:
if not isinstance(p, dict):
continue
pid = str(p.get("id") or "").strip()
name = str(p.get("name") or "").strip()
iba = p.get("instagram_business_account")
ig_id = ""
if isinstance(iba, dict):
ig_id = str(iba.get("id") or "").strip()
has_ig = bool(ig_id)
print(f"page: id={pid} name={name} has_ig={has_ig} ig_id={ig_id}")
return 0
chosen: dict[str, Any] | None = None
if args.page_id:
for p in data:
if not isinstance(p, dict):
continue
if str(p.get("id") or "").strip() == str(args.page_id).strip():
chosen = p
break
if chosen is None:
print(f"--page-id not found in /me/accounts: {args.page_id}", file=sys.stderr)
return 2
if chosen is None and args.target_ig_user_id:
for p in data:
if not isinstance(p, dict):
continue
iba = p.get("instagram_business_account") or {}
if isinstance(iba, dict) and str(iba.get("id") or "") == str(args.target_ig_user_id):
chosen = p
break
if chosen is None:
for p in data:
if not isinstance(p, dict):
continue
if p.get("instagram_business_account"):
chosen = p
break
if chosen is None:
print(
"Could not find a Page with an attached instagram_business_account.\n"
"Connect the Instagram professional account to a Facebook Page first.",
file=sys.stderr,
)
return 2
page_id = str(chosen.get("id") or "").strip()
page_name = str(chosen.get("name") or "").strip()
page_token = str(chosen.get("access_token") or "").strip()
iba = chosen.get("instagram_business_account") or {}
ig_id = str(iba.get("id") or "").strip() if isinstance(iba, dict) else ""
if not page_id or not page_token:
print("Selected page is missing id/access_token in response.", file=sys.stderr)
return 2
print(f"selected_page: id={page_id} name={page_name}")
if ig_id:
print(f"selected_instagram_business_account_id: {ig_id}")
if args.dry_run:
print("dry_run: not writing creds")
return 0
updates = {
"META_PAGE_ID": page_id,
"META_PAGE_ACCESS_TOKEN": page_token,
}
if ig_id:
updates["META_IG_USER_ID"] = ig_id
_write_dotenv_kv(args.creds, updates)
print(f"Saved META_PAGE_ID + META_PAGE_ACCESS_TOKEN to {args.creds} (no values printed).")
return 0
except GraphApiError as e:
payload = e.args[0] if e.args else {"error": str(e)}
print(json.dumps(_redact(payload), ensure_ascii=False), file=sys.stderr)
return 3
except Exception as e:
print(f"Error: {e}", file=sys.stderr)
return 1
if __name__ == "__main__":
raise SystemExit(main())
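A quick, hedged demonstration of `_write_dotenv_kv`'s merge semantics, assuming this file is `sergio_instagram_messaging/meta_page_token_from_user_token.py` as the README suggests (throwaway path and illustrative keys only, never real tokens):

```python
from pathlib import Path

from sergio_instagram_messaging.meta_page_token_from_user_token import _write_dotenv_kv

p = Path("/tmp/example-creds.txt")  # throwaway demo path
p.write_text("META_APP_ID=123\n# comments survive\nMETA_PAGE_ID=old\n")
_write_dotenv_kv(str(p), {"META_PAGE_ID": "456", "META_IG_USER_ID": "789"})
print(p.read_text())
# META_APP_ID=123
# # comments survive
# META_PAGE_ID=456
# META_IG_USER_ID=789
```

Existing keys are rewritten in place, unknown lines and comments survive, and new keys are appended; the file is replaced atomically with mode `600`.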


@ -0,0 +1,106 @@
from __future__ import annotations
import argparse
import json
import sys
import urllib.error
import urllib.parse
import urllib.request
from typing import Any
from .export_meta_ig_history import DEFAULT_CREDS_PATH, GraphApiClient, GraphApiConfig, GraphApiError, load_dotenv
def _require(env: dict[str, str], key: str) -> str:
value = (env.get(key) or "").strip()
if not value:
raise KeyError(f"Missing required config: {key}")
return value
def _post_form(url: str, data: dict[str, str]) -> dict[str, Any]:
body = urllib.parse.urlencode(data).encode("utf-8")
req = urllib.request.Request(url, data=body, method="POST")
req.add_header("Content-Type", "application/x-www-form-urlencoded")
with urllib.request.urlopen(req, timeout=60) as r:
return json.loads(r.read().decode("utf-8"))
def subscribe_page(*, creds_path: str, fields: str) -> dict[str, Any]:
env = load_dotenv(creds_path)
page_id = _require(env, "META_PAGE_ID")
page_token = _require(env, "META_PAGE_ACCESS_TOKEN")
version = (env.get("META_GRAPH_API_VERSION") or "v24.0").strip() or "v24.0"
url = f"https://graph.facebook.com/{version}/{page_id}/subscribed_apps"
try:
return _post_form(
url,
{
"subscribed_fields": fields,
"access_token": page_token,
},
)
except urllib.error.HTTPError as e:
body = e.read().decode("utf-8", errors="replace")
try:
payload = json.loads(body)
except Exception:
payload = {"raw": body[:1000]}
raise GraphApiError({"status": e.code, "error": payload})
def list_subscribed_apps(*, creds_path: str) -> dict[str, Any]:
env = load_dotenv(creds_path)
page_id = _require(env, "META_PAGE_ID")
page_token = _require(env, "META_PAGE_ACCESS_TOKEN")
version = (env.get("META_GRAPH_API_VERSION") or "v24.0").strip() or "v24.0"
client = GraphApiClient(GraphApiConfig(access_token=page_token, version=version))
return client.get_json(f"/{page_id}/subscribed_apps", {"fields": "id,name"})
def main(argv: list[str] | None = None) -> int:
ap = argparse.ArgumentParser(
description=(
"Subscribe the Facebook Page to this Meta app's webhooks.\n\n"
"Note: this requires a Page access token that includes the pages_manage_metadata permission."
)
)
ap.set_defaults(cmd=None)
sub = ap.add_subparsers(required=True)
p_list = sub.add_parser("list", help="List subscribed apps for the Page")
p_list.set_defaults(cmd="list")
p_list.add_argument("--creds", default=DEFAULT_CREDS_PATH)
p_sub = sub.add_parser("subscribe", help="Subscribe the Page to this app's webhook fields")
p_sub.set_defaults(cmd="subscribe")
p_sub.add_argument("--creds", default=DEFAULT_CREDS_PATH)
p_sub.add_argument(
"--fields",
default="messages",
help="Comma-separated subscribed_fields (default: messages)",
)
args = ap.parse_args(argv)
try:
if args.cmd == "list":
out = list_subscribed_apps(creds_path=args.creds)
print(json.dumps(out, indent=2, ensure_ascii=False))
return 0
if args.cmd == "subscribe":
out = subscribe_page(creds_path=args.creds, fields=args.fields)
print(json.dumps(out, indent=2, ensure_ascii=False))
return 0
raise SystemExit(2)
except KeyError as e:
print(str(e), file=sys.stderr)
return 2
except GraphApiError as e:
print(f"Graph API error: {e}", file=sys.stderr)
return 3
if __name__ == "__main__":
raise SystemExit(main())
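A hedged sketch of checking, then creating, the subscription in-process; the on-disk module name is not shown in this diff, so `meta_page_subscribe` below is an assumed placeholder:

```python
# Assumed module name; adjust to the actual file in sergio_instagram_messaging/.
from sergio_instagram_messaging.meta_page_subscribe import list_subscribed_apps, subscribe_page

CREDS = "/root/tmp/emo-social-meta-app-creds.txt"
current = list_subscribed_apps(creds_path=CREDS)
if not current.get("data"):
    # Requires a Page token with pages_manage_metadata (see the note above).
    print(subscribe_page(creds_path=CREDS, fields="messages"))
```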


@ -0,0 +1,63 @@
from __future__ import annotations
import argparse
import json
import sys
from typing import Any
from .export_meta_ig_history import DEFAULT_CREDS_PATH, GraphApiClient, GraphApiConfig, GraphApiError, load_dotenv
def _redact(obj: Any) -> Any:
if isinstance(obj, dict):
return {k: ("<redacted>" if k == "access_token" else _redact(v)) for k, v in obj.items()}
if isinstance(obj, list):
return [_redact(v) for v in obj]
return obj
def main(argv: list[str] | None = None) -> int:
ap = argparse.ArgumentParser(description="Quick sanity checks for Meta Graph API tokens (no token values printed).")
ap.add_argument("--creds", default=DEFAULT_CREDS_PATH)
ap.add_argument("--platform", default="instagram")
ap.add_argument("--version", default=None, help="override graph api version (e.g., v20.0)")
args = ap.parse_args(argv)
env = load_dotenv(args.creds)
token = (env.get("META_PAGE_ACCESS_TOKEN") or "").strip()
if not token:
print("Missing META_PAGE_ACCESS_TOKEN in creds file.", file=sys.stderr)
return 2
version = (args.version or env.get("META_GRAPH_API_VERSION") or "v20.0").strip() or "v20.0"
client = GraphApiClient(GraphApiConfig(access_token=token, version=version))
try:
me = client.get_json("/me", {"fields": "id,name"})
me_id = str(me.get("id") or "").strip()
me_name = str(me.get("name") or "").strip()
print(f"/me: id={me_id} name={me_name}")
page_id = (env.get("META_PAGE_ID") or "").strip() or me_id
print(f"page_id_used: {page_id}")
test = client.get_json(
f"/{page_id}/conversations",
{"platform": args.platform, "limit": "1", "fields": "id,updated_time,message_count,participants"},
)
data = test.get("data") or []
print(f"conversations_first_page_count: {len(data) if isinstance(data, list) else 0}")
if isinstance(data, list) and data:
print(f"sample_conversation_id: {data[0].get('id')}")
return 0
except GraphApiError as e:
print(json.dumps(_redact({"graph_api_error": str(e)}), ensure_ascii=False), file=sys.stderr)
return 3
except Exception as e:
print(f"Error: {e}", file=sys.stderr)
return 1
if __name__ == "__main__":
raise SystemExit(main())
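In-process equivalent (again with an assumed module name, `meta_token_sanity`; exit codes follow the handlers above):

```python
# Assumed module name; adjust to the actual file in sergio_instagram_messaging/.
from sergio_instagram_messaging.meta_token_sanity import main

# 0 = ok, 2 = missing META_PAGE_ACCESS_TOKEN, 3 = Graph API error, 1 = other.
raise SystemExit(main(["--platform", "instagram"]))
```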


@ -0,0 +1,81 @@
from __future__ import annotations
import argparse
import json
import sys
import urllib.error
import urllib.parse
import urllib.request
from typing import Any
from .export_meta_ig_history import DEFAULT_CREDS_PATH, GraphApiError, load_dotenv
def _require(env: dict[str, str], key: str) -> str:
value = (env.get(key) or "").strip()
if not value:
raise KeyError(f"Missing required config: {key}")
return value
def _post_json(url: str, payload: dict[str, Any]) -> dict[str, Any]:
body = json.dumps(payload, ensure_ascii=False).encode("utf-8")
req = urllib.request.Request(url, data=body, method="POST")
req.add_header("Content-Type", "application/json")
try:
with urllib.request.urlopen(req, timeout=30) as r:
return json.loads(r.read().decode("utf-8"))
except urllib.error.HTTPError as e:
body = e.read().decode("utf-8", errors="replace")
try:
payload = json.loads(body)
except Exception:
payload = {"raw": body[:1000]}
raise GraphApiError({"status": e.code, "error": payload})
def send_text(*, creds_path: str, recipient_id: str, text: str, version: str | None) -> dict[str, Any]:
env = load_dotenv(creds_path)
token = _require(env, "META_PAGE_ACCESS_TOKEN")
graph_version = (version or env.get("META_GRAPH_API_VERSION") or "v24.0").strip() or "v24.0"
url = (
f"https://graph.facebook.com/{graph_version}/me/messages?"
+ urllib.parse.urlencode({"access_token": token})
)
return _post_json(
url,
{
"recipient": {"id": str(recipient_id)},
"message": {"text": text},
},
)
def main(argv: list[str] | None = None) -> int:
ap = argparse.ArgumentParser(description="Send an Instagram DM via Meta Graph API (Messenger API for Instagram).")
ap.add_argument("--creds", default=DEFAULT_CREDS_PATH)
ap.add_argument("--recipient-id", required=True)
ap.add_argument("--text", required=True)
ap.add_argument("--version", default=None, help="override graph api version (e.g., v24.0)")
args = ap.parse_args(argv)
try:
out = send_text(
creds_path=args.creds,
recipient_id=str(args.recipient_id),
text=str(args.text),
version=(args.version or None),
)
print(json.dumps(out, indent=2, ensure_ascii=False))
return 0
except KeyError as e:
print(str(e), file=sys.stderr)
return 2
except GraphApiError as e:
print(f"Graph API error: {e}", file=sys.stderr)
return 3
if __name__ == "__main__":
raise SystemExit(main())
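A hedged usage sketch; `meta_send_dm` is an assumed module name, and the recipient id is a placeholder IGSID (real ids come from webhook events or the conversation export):

```python
# Assumed module name; adjust to the actual file in sergio_instagram_messaging/.
from sergio_instagram_messaging.meta_send_dm import send_text

resp = send_text(
    creds_path="/root/tmp/emo-social-meta-app-creds.txt",
    recipient_id="1234567890",  # placeholder IGSID
    text="hello from the agent",
    version=None,  # falls back to META_GRAPH_API_VERSION, else v24.0
)
print(resp)  # Graph API typically returns recipient_id + message_id
```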


@ -0,0 +1,145 @@
from __future__ import annotations
import struct
import zlib
from dataclasses import dataclass
def _png_chunk(kind: bytes, data: bytes) -> bytes:
length = struct.pack(">I", len(data))
crc = zlib.crc32(kind)
crc = zlib.crc32(data, crc)
crc_bytes = struct.pack(">I", crc & 0xFFFFFFFF)
return length + kind + data + crc_bytes
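# Worked example: the fixed IEND chunk is
#   _png_chunk(b"IEND", b"") == b"\x00\x00\x00\x00IEND\xaeB`\x82"
# (zero-length data field, chunk type, then the CRC-32 of b"IEND").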
def write_png_rgb(path: str, *, width: int, height: int, rgb: bytes) -> None:
if width <= 0 or height <= 0:
raise ValueError("width and height must be positive")
if len(rgb) != width * height * 3:
raise ValueError("rgb buffer must be width*height*3 bytes")
signature = b"\x89PNG\r\n\x1a\n"
ihdr = struct.pack(">IIBBBBB", width, height, 8, 2, 0, 0, 0) # 8-bit, RGB
stride = width * 3
raw = bytearray()
for y in range(height):
raw.append(0) # filter type 0
start = y * stride
raw.extend(rgb[start : start + stride])
compressed = zlib.compress(bytes(raw), level=6)
png = bytearray()
png.extend(signature)
png.extend(_png_chunk(b"IHDR", ihdr))
png.extend(_png_chunk(b"IDAT", compressed))
png.extend(_png_chunk(b"IEND", b""))
with open(path, "wb") as f:
f.write(png)
def _clamp(v: int) -> int:
return 0 if v < 0 else 255 if v > 255 else v
def _blend(bg: tuple[int, int, int], fg: tuple[int, int, int], alpha: float) -> tuple[int, int, int]:
a = 0.0 if alpha < 0 else 1.0 if alpha > 1 else alpha
return (
_clamp(int(bg[0] * (1.0 - a) + fg[0] * a)),
_clamp(int(bg[1] * (1.0 - a) + fg[1] * a)),
_clamp(int(bg[2] * (1.0 - a) + fg[2] * a)),
)
@dataclass
class Canvas:
width: int
height: int
_rgb: bytearray
@classmethod
def new(cls, width: int, height: int, *, bg: tuple[int, int, int] = (255, 255, 255)) -> "Canvas":
if width <= 0 or height <= 0:
raise ValueError("width and height must be positive")
buf = bytearray(width * height * 3)
c = cls(width=width, height=height, _rgb=buf)
c.clear(bg)
return c
def clear(self, color: tuple[int, int, int]) -> None:
r, g, b = (_clamp(color[0]), _clamp(color[1]), _clamp(color[2]))
for i in range(0, len(self._rgb), 3):
self._rgb[i] = r
self._rgb[i + 1] = g
self._rgb[i + 2] = b
def to_bytes(self) -> bytes:
return bytes(self._rgb)
def save(self, path: str) -> None:
write_png_rgb(path, width=self.width, height=self.height, rgb=self.to_bytes())
def _idx(self, x: int, y: int) -> int | None:
if x < 0 or y < 0 or x >= self.width or y >= self.height:
return None
return (y * self.width + x) * 3
def set_pixel(self, x: int, y: int, color: tuple[int, int, int]) -> None:
idx = self._idx(x, y)
if idx is None:
return
self._rgb[idx] = _clamp(color[0])
self._rgb[idx + 1] = _clamp(color[1])
self._rgb[idx + 2] = _clamp(color[2])
def blend_pixel(self, x: int, y: int, color: tuple[int, int, int], *, alpha: float) -> None:
idx = self._idx(x, y)
if idx is None:
return
bg = (self._rgb[idx], self._rgb[idx + 1], self._rgb[idx + 2])
r, g, b = _blend((int(bg[0]), int(bg[1]), int(bg[2])), color, alpha)
self._rgb[idx] = r
self._rgb[idx + 1] = g
self._rgb[idx + 2] = b
def draw_rect(self, x0: int, y0: int, x1: int, y1: int, color: tuple[int, int, int]) -> None:
if x0 > x1:
x0, x1 = x1, x0
if y0 > y1:
y0, y1 = y1, y0
x0 = max(0, x0)
y0 = max(0, y0)
x1 = min(self.width - 1, x1)
y1 = min(self.height - 1, y1)
for y in range(y0, y1 + 1):
for x in range(x0, x1 + 1):
self.set_pixel(x, y, color)
def draw_line(self, x0: int, y0: int, x1: int, y1: int, color: tuple[int, int, int]) -> None:
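        # Bresenham's line algorithm: an integer error term steps the line one pixel at a time.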
dx = abs(x1 - x0)
dy = -abs(y1 - y0)
sx = 1 if x0 < x1 else -1
sy = 1 if y0 < y1 else -1
err = dx + dy
x, y = x0, y0
while True:
self.set_pixel(x, y, color)
if x == x1 and y == y1:
break
e2 = 2 * err
if e2 >= dy:
err += dy
x += sx
if e2 <= dx:
err += dx
y += sy
def draw_polyline(self, points: list[tuple[int, int]], color: tuple[int, int, int]) -> None:
if len(points) < 2:
return
for (x0, y0), (x1, y1) in zip(points, points[1:]):
self.draw_line(x0, y0, x1, y1, color)
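# Illustrative usage (a hedged sketch, not part of the module API): draw a few
# primitives and write the result through the pure-Python PNG writer above.
# The output path is a placeholder.
def _demo_canvas(path: str = "/tmp/canvas-demo.png") -> None:
    c = Canvas.new(120, 80, bg=(255, 255, 255))
    c.draw_rect(10, 10, 50, 30, (200, 30, 30))  # filled rectangle
    c.draw_line(0, 79, 119, 0, (20, 20, 200))  # single diagonal line
    c.draw_polyline([(10, 70), (40, 40), (70, 60), (110, 20)], (0, 120, 0))
    c.save(path)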