#!/usr/bin/env python3
"""Audit listing pages using a Googlebot user-agent.

Extracts ratings, review counts and badges from Booking.com / Tripadvisor
pages, follower counts and last-post dates from Instagram / Facebook / TikTok
meta tags, and policy-related text snippets from other pages. Results are
emitted as one JSON object per line.
"""
from __future__ import annotations

import argparse
import json
import os
import re
from dataclasses import asdict, dataclass
from datetime import datetime, timezone
from typing import Any, Iterable, Literal
from urllib.parse import urlparse

import requests

try:
    from bs4 import BeautifulSoup
except ImportError as exc:  # pragma: no cover
    raise SystemExit(
        "Missing dependency 'beautifulsoup4'. Install with:\n"
        " python3 -m pip install beautifulsoup4\n"
        "or (recommended) inside a venv:\n"
        " python3 -m venv .venv && . .venv/bin/activate && pip install beautifulsoup4\n"
    ) from exc


Platform = Literal[
    "auto",
    "booking",
    "tripadvisor",
    "instagram",
    "facebook",
    "tiktok",
    "policy",
]


# Googlebot smartphone user-agent, used as the default request UA
# (overridable via --user-agent).
GOOGLEBOT_UA = (
    "Mozilla/5.0 (Linux; Android 6.0.1; Nexus 5X Build/MMB29P) "
    "AppleWebKit/537.36 (KHTML, like Gecko) "
    "Chrome/41.0.2272.96 Mobile Safari/537.36 "
    "(compatible; Googlebot/2.1; +http://www.google.com/bot.html)"
)


@dataclass(frozen=True)
class AuditResult:
    """Outcome of auditing a single URL."""

    url: str
    platform: str
    fetched_at: str
    status_code: int | None
    final_url: str | None
    ok: bool
    error: str | None
    data: dict[str, Any]


def iso_now() -> str:
    return datetime.now(timezone.utc).astimezone().isoformat(timespec="seconds")


def detect_platform(url: str) -> Platform:
    host = (urlparse(url).netloc or "").lower()
    if "booking.com" in host:
        return "booking"
    if "tripadvisor." in host:
        return "tripadvisor"
    if "instagram.com" in host:
        return "instagram"
    if "facebook.com" in host or host.endswith("fb.com"):
        return "facebook"
    if "tiktok.com" in host:
        return "tiktok"
    return "policy"


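# Illustrative mapping for detect_platform (hypothetical hosts, worked from the
# checks above): "www.booking.com" -> "booking", "www.tripadvisor.fr" ->
# "tripadvisor", and any host that matches no check falls back to "policy".

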
def normalize_platform(platform: str) -> Platform | None:
    # The regex strips every non-alphanumeric character, so variants such as
    # "tik_tok" or "Booking.com" already normalize to plain keys.
    key = re.sub(r"[^a-z0-9]+", "", (platform or "").strip().lower())
    if key in ("", "auto", "detect"):
        return "auto"
    if key in ("booking", "bookingcom"):
        return "booking"
    if key in ("tripadvisor", "tripadvisorcom", "tripadvisorfr", "tripadvisoruk"):
        return "tripadvisor"
    if key in ("instagram", "insta", "ig"):
        return "instagram"
    if key in ("facebook", "fb"):
        return "facebook"
    if key in ("tiktok", "tik", "ticktok"):
        return "tiktok"
    if key in ("policy", "site", "general"):
        return "policy"
    return None


def normalize_whitespace(text: str) -> str:
    return re.sub(r"\s+", " ", text).strip()


def parse_human_number(raw: str) -> int | None:
    """Parse a human-formatted count such as "1,234", "12,5K" or "3.4M"."""
    s = raw.strip().replace("\u202f", "").replace("\xa0", "").replace(" ", "")
    match = re.match(r"^(?P<num>\d+(?:[.,]\d+)?)(?P<suffix>[KkMmBb])?$", s)
    if not match:
        return None

    num_part = match.group("num")
    suffix = (match.group("suffix") or "").upper()

    if "," in num_part and "." in num_part:
        num_part = num_part.replace(",", "")
    elif "," in num_part and "." not in num_part:
        parts = num_part.split(",")
        if len(parts) > 1 and len(parts[-1]) == 3:
            # "1,234"-style thousands separator
            num_part = "".join(parts)
        else:
            # "12,5"-style decimal comma
            num_part = num_part.replace(",", ".")

    try:
        value = float(num_part)
    except ValueError:
        return None

    multiplier = {"": 1, "K": 1_000, "M": 1_000_000, "B": 1_000_000_000}.get(suffix)
    if multiplier is None:
        return None
    return int(round(value * multiplier))


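# Illustrative conversions for parse_human_number, worked by hand from the
# rules above: "1.2K" -> 1200, "1,234" -> 1234, "12,5K" -> 12500,
# "3.4M" -> 3400000, and anything non-numeric -> None.

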
def fetch_url(
    session: requests.Session,
    url: str,
    *,
    timeout_s: float,
    user_agent: str,
) -> tuple[requests.Response | None, str | None]:
    try:
        resp = session.get(
            url,
            headers={
                "User-Agent": user_agent,
                "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
            },
            timeout=timeout_s,
            allow_redirects=True,
        )
        return resp, None
    except requests.RequestException as exc:
        return None, str(exc)


def collect_meta(soup: BeautifulSoup) -> dict[str, list[str]]:
    out: dict[str, list[str]] = {}
    for tag in soup.find_all("meta"):
        key = tag.get("property") or tag.get("name")
        if not key:
            continue
        content = tag.get("content")
        if not content:
            continue
        out.setdefault(key.strip().lower(), []).append(content.strip())
    return out


def first_meta(meta: dict[str, list[str]], keys: Iterable[str]) -> str | None:
    for key in keys:
        values = meta.get(key.lower())
        if values:
            return values[0]
    return None


def clean_jsonld_text(raw: str) -> str:
    """Strip HTML-comment and CDATA wrappers sometimes found around JSON-LD."""
    s = raw.strip()
    s = re.sub(r"^\s*<!--", "", s)
    s = re.sub(r"-->\s*$", "", s)
    s = re.sub(r"^\s*/\*+\s*<!\[CDATA\[\s*\*/\s*", "", s)
    s = re.sub(r"\s*/\*+\s*\]\]>\s*\*/\s*$", "", s)
    return s.strip()


def extract_jsonld_objects(soup: BeautifulSoup) -> tuple[list[Any], list[dict[str, str]]]:
    objects: list[Any] = []
    errors: list[dict[str, str]] = []
    for script in soup.find_all("script", attrs={"type": re.compile(r"^application/ld\+json$", re.I)}):
        raw = script.string or script.get_text() or ""
        raw = clean_jsonld_text(raw)
        if not raw:
            continue
        try:
            objects.append(json.loads(raw))
            continue
        except json.JSONDecodeError:
            pass

        # Some pages emit several JSON objects back to back in one script tag;
        # retry after wrapping them into a JSON array.
        try:
            patched = "[" + re.sub(r"}\s*{", "},{", raw) + "]"
            objects.append(json.loads(patched))
        except json.JSONDecodeError as exc:
            errors.append({"error": str(exc), "snippet": raw[:400]})
    return objects, errors


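# Illustrative repair performed by extract_jsonld_objects: a script body such
# as '{"a": 1}{"b": 2}' fails json.loads as-is, but the wrapped form
# '[{"a": 1},{"b": 2}]' parses into a list of two objects.

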
def iter_dicts(obj: Any) -> Iterable[dict[str, Any]]:
    """Yield every dict nested anywhere inside a JSON-LD structure."""
    if isinstance(obj, dict):
        yield obj
        # Recursing into obj.values() already covers "@graph" and every other
        # nested container, so no special-casing is needed (and dicts are not
        # yielded twice).
        for value in obj.values():
            yield from iter_dicts(value)
    elif isinstance(obj, list):
        for item in obj:
            yield from iter_dicts(item)


def coerce_float(value: Any) -> float | None:
    if value is None:
        return None
    if isinstance(value, (int, float)):
        return float(value)
    if isinstance(value, str):
        v = value.strip().replace(",", ".")
        try:
            return float(v)
        except ValueError:
            return None
    return None


def coerce_int(value: Any) -> int | None:
    if value is None:
        return None
    if isinstance(value, int):
        return value
    if isinstance(value, float):
        return int(value)
    if isinstance(value, str):
        v = re.sub(r"[^\d]", "", value)
        return int(v) if v else None
    return None


def pick_best_aggregate_rating(jsonld: list[Any]) -> dict[str, Any] | None:
    """Return the most credible aggregateRating found in the JSON-LD objects."""
    candidates: list[tuple[int, float, dict[str, Any]]] = []
    for obj in jsonld:
        for d in iter_dicts(obj):
            agg = d.get("aggregateRating")
            if not isinstance(agg, dict):
                continue
            rating_value = coerce_float(agg.get("ratingValue"))
            if rating_value is None:
                continue
            review_count = coerce_int(agg.get("reviewCount") or agg.get("ratingCount"))
            # Prefer ratings backed by more reviews; the rating value breaks ties.
            score = (review_count or 0) * 10 + int(round(rating_value * 100))
            candidates.append(
                (
                    score,
                    rating_value,
                    {
                        "rating_value": rating_value,
                        "review_count": review_count,
                        "best_rating": coerce_float(agg.get("bestRating")),
                        "worst_rating": coerce_float(agg.get("worstRating")),
                        "source": "jsonld",
                        "aggregate_rating": agg,
                        "parent_types": d.get("@type"),
                    },
                )
            )
    # Sort on (score, rating_value) only; letting sort() fall through to the
    # dict payloads on a full tie would raise TypeError.
    candidates.sort(key=lambda c: (c[0], c[1]), reverse=True)
    return candidates[0][2] if candidates else None


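# Illustrative scoring for pick_best_aggregate_rating: a 4.5 rating with
# 1200 reviews scores 1200 * 10 + round(4.5 * 100) = 12450, so it outranks a
# 5.0 rating backed by only 3 reviews (3 * 10 + 500 = 530).

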
def extract_awards(jsonld: list[Any]) -> list[str]:
    awards: list[str] = []
    seen: set[str] = set()
    for obj in jsonld:
        for d in iter_dicts(obj):
            raw = d.get("award") or d.get("awards")
            if isinstance(raw, str):
                items = [raw]
            elif isinstance(raw, list):
                items = [x for x in raw if isinstance(x, str)]
            else:
                items = []
            for item in items:
                cleaned = normalize_whitespace(item)
                if not cleaned or cleaned.lower() in seen:
                    continue
                seen.add(cleaned.lower())
                awards.append(cleaned)
    return awards


BADGE_KEYWORDS = [
    "travellers' choice",
    "travelers' choice",
    "traveller review award",
    "traveler review award",
    "greenleaders",
    "green leader",
    "travel sustainable",
    "preferred partner",
    "genius",
    "key collection",
]


def extract_badges_from_html(soup: BeautifulSoup) -> list[str]:
    text = normalize_whitespace(soup.get_text(" ", strip=True))
    # Normalize curly apostrophes so keywords like "travellers' choice" match
    # either quote style.
    lowered = text.lower().replace("\u2019", "'")
    hits: list[str] = []
    for keyword in BADGE_KEYWORDS:
        if keyword in lowered:
            hits.append(keyword)
    return sorted(set(hits))


def extract_followers_from_description(description: str) -> dict[str, Any] | None:
    patterns = [
        r"(?P<count>\d[\d.,\s\u202f\xa0]*[KkMmBb]?)\s+followers?\b",
        r"(?P<count>\d[\d.,\s\u202f\xa0]*[KkMmBb]?)\s+abonn[eé]s?\b",
        r"(?P<count>\d[\d.,\s\u202f\xa0]*[KkMmBb]?)\s+people\s+like\s+this\b",
        r"(?P<count>\d[\d.,\s\u202f\xa0]*[KkMmBb]?)\s+likes\b",
    ]
    lowered = description.lower()
    for pattern in patterns:
        match = re.search(pattern, lowered, flags=re.IGNORECASE)
        if not match:
            continue
        raw = match.group("count")
        return {"raw": raw, "value": parse_human_number(raw)}
    return None


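# Illustrative match for extract_followers_from_description: an og:description
# such as "1,234 Followers, 56 Following, 78 Posts" (hypothetical values)
# yields {"raw": "1,234", "value": 1234} via the first pattern above.

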
DATE_META_KEYS = [
    "article:published_time",
    "article:modified_time",
    "og:published_time",
    "og:updated_time",
    "og:video:release_date",
    "last-modified",
]


def parse_first_iso_datetime(values: Iterable[str]) -> str | None:
    for value in values:
        # fromisoformat() on older Pythons rejects a trailing "Z"; map it to UTC.
        v = value.strip().replace("Z", "+00:00")
        try:
            return datetime.fromisoformat(v).isoformat()
        except ValueError:
            continue
    return None


def audit_booking_or_tripadvisor(
    soup: BeautifulSoup,
    *,
    include_jsonld: bool,
) -> dict[str, Any]:
    jsonld, jsonld_errors = extract_jsonld_objects(soup)
    rating = pick_best_aggregate_rating(jsonld)
    awards = extract_awards(jsonld)
    badges = sorted(set(awards + extract_badges_from_html(soup)))

    data: dict[str, Any] = {
        "rating": rating,
        "review_count": rating.get("review_count") if rating else None,
        "badges": badges,
        "jsonld_count": len(jsonld),
        "jsonld_parse_errors": jsonld_errors,
    }
    if include_jsonld:
        data["jsonld"] = jsonld
    return data


def audit_social(soup: BeautifulSoup) -> dict[str, Any]:
    meta = collect_meta(soup)
    description = first_meta(meta, ["description", "og:description", "twitter:description"]) or ""
    followers = extract_followers_from_description(description) if description else None

    raw_date = first_meta(meta, DATE_META_KEYS)
    last_post_date = parse_first_iso_datetime([raw_date]) if raw_date else None

    return {
        "meta_description": description or None,
        "follower_count": followers,
        "last_post_date": {"raw": raw_date, "value": last_post_date} if (raw_date or last_post_date) else None,
        "og_title": first_meta(meta, ["og:title"]),
        "og_url": first_meta(meta, ["og:url"]),
    }


POLICY_TERMS = [
    "New Year's Eve",
    "minimum stay",
    "sold out",
]


def find_term_snippets(text: str, term: str, *, max_hits: int = 3, context: int = 60) -> list[str]:
    pattern = re.escape(term)
    if term.lower() == "new year's eve":
        pattern = r"new\s+year(?:'|\u2019)?s?\s+eve"
    regex = re.compile(pattern, flags=re.IGNORECASE)

    snippets: list[str] = []
    for match in regex.finditer(text):
        start = max(0, match.start() - context)
        end = min(len(text), match.end() + context)
        snippets.append(normalize_whitespace(text[start:end]))
        if len(snippets) >= max_hits:
            break
    return snippets


def audit_policy(soup: BeautifulSoup) -> dict[str, Any]:
    text = normalize_whitespace(soup.get_text(" ", strip=True))
    checks: list[dict[str, Any]] = []
    for term in POLICY_TERMS:
        snippets = find_term_snippets(text, term)
        checks.append({"term": term, "found": bool(snippets), "snippets": snippets})
    return {"policy_checks": checks}


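# Illustrative result shape from audit_policy (snippet text is hypothetical):
#     {"policy_checks": [{"term": "minimum stay", "found": True,
#                         "snippets": ["... a two-night minimum stay applies ..."]},
#                        ...]}
# with one entry per item in POLICY_TERMS.

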
def audit_listing(
    url: str,
    platform: str,
    *,
    timeout_s: float = 25.0,
    user_agent: str = GOOGLEBOT_UA,
    include_jsonld: bool = False,
) -> AuditResult:
    """Fetch one URL and run the platform-appropriate extraction."""
    normalized = normalize_platform(platform)
    resolved_platform: Platform
    if normalized is None or normalized == "auto":
        resolved_platform = detect_platform(url)
    else:
        resolved_platform = normalized
    fetched_at = iso_now()

    session = requests.Session()
    resp, error = fetch_url(session, url, timeout_s=timeout_s, user_agent=user_agent)
    if error or resp is None:
        return AuditResult(
            url=url,
            platform=resolved_platform,
            fetched_at=fetched_at,
            status_code=None,
            final_url=None,
            ok=False,
            error=error or "unknown error",
            data={},
        )

    soup = BeautifulSoup(resp.text, "html.parser")

    data: dict[str, Any] = {
        "content_type": resp.headers.get("Content-Type"),
        "content_length": resp.headers.get("Content-Length"),
    }

    if resolved_platform in ("booking", "tripadvisor"):
        data.update(audit_booking_or_tripadvisor(soup, include_jsonld=include_jsonld))
    elif resolved_platform in ("instagram", "facebook", "tiktok"):
        data.update(audit_social(soup))
    else:
        # "policy" and any unrecognized platform get the generic text checks.
        data.update(audit_policy(soup))

    return AuditResult(
        url=url,
        platform=resolved_platform,
        fetched_at=fetched_at,
        status_code=resp.status_code,
        final_url=str(resp.url) if resp.url else None,
        ok=True,
        error=None,
        data=data,
    )


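# Illustrative library-style use of audit_listing (hypothetical URL, requires
# network access):
#     result = audit_listing("https://www.booking.com/hotel/example.html", "auto")
#     print(result.ok, result.data.get("review_count"))

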
def parse_args() -> argparse.Namespace:
    parser = argparse.ArgumentParser(
        description="Audit listing pages using a Googlebot user-agent (ratings/reviews/badges, followers, policy text)."
    )
    parser.add_argument(
        "--platform",
        default="auto",
        help="Platform hint (auto/booking/tripadvisor/instagram/facebook/tiktok/policy).",
    )
    parser.add_argument("--timeout", type=float, default=25.0, help="HTTP request timeout in seconds.")
    parser.add_argument("--user-agent", default=GOOGLEBOT_UA, help="User-Agent header to send with requests.")
    parser.add_argument("--include-jsonld", action="store_true", help="Include parsed JSON-LD blobs in output.")
    parser.add_argument("urls", nargs="+", help="One or more URLs to audit.")
    return parser.parse_args()


def main() -> int:
    args = parse_args()
    exit_code = 0
    for url in args.urls:
        result = audit_listing(
            url,
            args.platform,
            timeout_s=args.timeout,
            user_agent=args.user_agent,
            include_jsonld=bool(args.include_jsonld),
        )
        # One UTF-8 JSON line per URL, written straight to stdout's file descriptor.
        os.write(1, (json.dumps(asdict(result), ensure_ascii=False) + "\n").encode("utf-8"))
        if not result.ok:
            exit_code = 2
    return exit_code


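# Illustrative invocation (hypothetical script name and URLs); each audited URL
# produces one JSON line on stdout, and the exit code is 2 if any fetch failed:
#     python3 audit_listings.py --platform auto \
#         "https://www.booking.com/hotel/example.html" \
#         "https://www.instagram.com/example_hotel/"

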
if __name__ == "__main__":
|
|
raise SystemExit(main())
|