#!/usr/bin/env python3
|
|
from __future__ import annotations
|
|
|
|
import argparse
|
|
import json
|
|
import re
|
|
from dataclasses import asdict, dataclass
|
|
from datetime import date, datetime, timezone
|
|
from pathlib import Path
|
|
from typing import Any
|
|
|
|
from playwright.sync_api import Error as PlaywrightError
|
|
from playwright.sync_api import sync_playwright
|
|
|
|
|
|
# Entry pages of the two booking flows scraped by this script.
# Le Flâneur is served by the RoomRaccoon booking engine.
FLANEUR_URL = "https://booking.roomraccoon.fr/le-fl-neur-guesthouse-8346/fr/"
# Ho36 Lyon home page; the capture code drives the Mews widget found on it.
HO36_URL = "https://ho36lyon.com/"
|
|
|
|
|
|
def iso_now() -> str:
    """Return the current local time as an ISO-8601 string with UTC offset.

    The timestamp is timezone-aware and truncated to whole seconds.
    """
    utc_now = datetime.now(timezone.utc)
    local_now = utc_now.astimezone()
    return local_now.isoformat(timespec="seconds")
|
|
|
|
|
|
def parse_iso_date(value: str) -> date:
    """Turn a command-line date string into a ``date``.

    Args:
        value: Text accepted by ``date.fromisoformat`` (e.g. ``2026-01-03``).

    Returns:
        The parsed date.

    Raises:
        SystemExit: With an explanatory message when ``value`` cannot be
            parsed; the original ``ValueError`` is chained as the cause.
    """
    try:
        parsed = date.fromisoformat(value)
    except ValueError as exc:
        raise SystemExit(f"Invalid ISO date: {value!r} (expected YYYY-MM-DD)") from exc
    return parsed
|
|
|
|
|
|
def fmt_dd_mm_yyyy(d: date, sep: str = "-") -> str:
    """Format ``d`` as zero-padded day, month and year joined by ``sep``.

    Example: ``date(2026, 1, 3)`` becomes ``"03-01-2026"``.
    """
    pieces = (f"{d.day:02d}", f"{d.month:02d}", f"{d.year:04d}")
    return sep.join(pieces)
|
|
|
|
|
|
def fmt_mm_dd_yyyy(d: date) -> str:
    """Format ``d`` as ``MM/DD/YYYY`` with zero padding.

    Example: ``date(2026, 1, 3)`` becomes ``"01/03/2026"``.
    """
    return "{:02d}/{:02d}/{:04d}".format(d.month, d.day, d.year)
|
|
|
|
|
|
def ensure_parent(path: Path) -> None:
    """Create the directory that will contain ``path``, including ancestors.

    Does nothing when the directory already exists; ``path`` itself is not
    created.
    """
    target_dir = path.parent
    target_dir.mkdir(parents=True, exist_ok=True)
|
|
|
|
|
|
def parse_eur_amount(text: str) -> float | None:
|
|
cleaned = text.replace("\xa0", " ").strip()
|
|
m = re.search(r"([0-9]+(?:[.,][0-9]{1,2})?)", cleaned)
|
|
if not m:
|
|
return None
|
|
return float(m.group(1).replace(",", "."))
|
|
|
|
|
|
def parse_int(text: str) -> int | None:
|
|
m = re.search(r"(\d+)", text)
|
|
return int(m.group(1)) if m else None
|
|
|
|
|
|
@dataclass(frozen=True)
|
|
class RoomOffer:
|
|
name: str
|
|
available_units: int | None
|
|
min_price_eur: float | None
|
|
unit: str | None
|
|
refund_policy_hint: str | None
|
|
|
|
|
|
@dataclass(frozen=True)
class Capture:
    """Immutable result of scraping one booking site for one stay window.

    Attributes:
        url: Entry URL that was opened.
        final_url: URL reported by the page once the capture finished.
        captured_at: ISO-8601 timestamp of the capture.
        checkin: Requested check-in date in ISO format.
        checkout: Requested check-out date in ISO format.
        screenshot_path: Where the full-page screenshot was written.
        html_path: Where the raw page dump was written.
        rooms: Offers extracted from the page.
    """

    url: str
    final_url: str | None
    captured_at: str
    checkin: str
    checkout: str
    screenshot_path: str
    html_path: str
    rooms: list[RoomOffer]
|
|
|
|
|
|
def _first_inner_text(scope, selector: str) -> str:
    """Return the stripped inner text of the first ``selector`` match in ``scope``, or ``""``."""
    matches = scope.locator(selector)
    return matches.first.inner_text().strip() if matches.count() else ""


def _flaneur_offer_from_card(card) -> RoomOffer | None:
    """Build a ``RoomOffer`` from one ``.be-room`` card; ``None`` when it has no heading."""
    name = _first_inner_text(card, "h2,h3")
    if not name:
        return None

    available_units = parse_int(_first_inner_text(card, ".be-room-availability"))

    # A card may list several rate types; keep the cheapest parsable price.
    price_texts = [t.strip() for t in card.locator(".be-room-ratetype-price").all_text_contents() if t.strip()]
    prices = [amount for amount in map(parse_eur_amount, price_texts) if amount is not None]

    # First card line mentioning refundability ("Remboursable" / "Non remboursable").
    card_lines = (raw.strip() for raw in card.inner_text().strip().splitlines())
    refund_hint = next(
        (line for line in card_lines if "Remboursable" in line or "Non remboursable" in line),
        None,
    )

    return RoomOffer(
        name=name,
        available_units=available_units,
        min_price_eur=min(prices) if prices else None,
        # Categories whose name starts with "chambre" are priced per room, the rest per bed.
        unit="room" if name.lower().startswith("chambre") else "bed",
        refund_policy_hint=refund_hint,
    )


def capture_flaneur(*, checkin: date, checkout: date, page, screenshot_path: Path, html_path: Path) -> Capture:
    """Capture Le Flâneur's RoomRaccoon availability page for one stay window.

    Opens the booking engine, tries to set the stay dates, triggers the
    availability search, stores a full-page screenshot and the page HTML, then
    extracts one ``RoomOffer`` per ``.be-room`` card that has a heading.

    The date inputs are read-only, so the requested dates are injected through
    the DOM on a best-effort basis. When the inputs did not already show the
    requested window, a warning with the observed values is printed on stderr,
    because the booking app may ignore the injected values and return results
    for its own default dates.

    Args:
        checkin: Requested arrival date.
        checkout: Requested departure date.
        page: Playwright sync page used for navigation and scraping.
        screenshot_path: Destination of the full-page PNG screenshot.
        html_path: Destination of the page HTML dump.

    Returns:
        A ``Capture`` holding the requested window, artefact paths and offers.

    Raises:
        OSError: If an output directory or the HTML file cannot be written.
        Playwright errors raised by navigation, selectors or timeouts propagate
        unchanged.
    """
    import sys  # local import: only needed for the mismatch warning

    page.goto(FLANEUR_URL, wait_until="domcontentloaded", timeout=60_000)
    page.wait_for_timeout(1_500)

    wanted = (fmt_dd_mm_yyyy(checkin, sep="-"), fmt_dd_mm_yyyy(checkout, sep="-"))
    initial = (page.input_value("#reservationStart"), page.input_value("#reservationEnd"))

    if initial != wanted:
        # Best-effort adjust using DOM injection + event dispatch (may be ignored by the app).
        page.evaluate(
            """([start, end]) => {
                const s = document.querySelector('#reservationStart');
                const e = document.querySelector('#reservationEnd');
                if (s) { s.value = start; s.dispatchEvent(new Event('change', { bubbles: true })); }
                if (e) { e.value = end; e.dispatchEvent(new Event('change', { bubbles: true })); }
            }""",
            [wanted[0], wanted[1]],
        )
        page.wait_for_timeout(300)
        shown = (page.input_value("#reservationStart"), page.input_value("#reservationEnd"))
        # Surface the mismatch instead of dropping it: the capture still records
        # the requested window, which may differ from what the app searched.
        print(
            "warning: flaneur date inputs were "
            f"{initial[0]!r}/{initial[1]!r}, requested {wanted[0]!r}/{wanted[1]!r}; "
            f"after injection they read {shown[0]!r}/{shown[1]!r} "
            "(the booking app may ignore injected values)",
            file=sys.stderr,
        )

    page.locator('div:has-text("V\u00c9RIFIER LA DISPONIBILIT\u00c9")').first.click()
    page.wait_for_selector(".be-room", timeout=45_000)
    page.wait_for_timeout(800)

    # Persist the artefacts before parsing so they exist even if parsing fails.
    ensure_parent(screenshot_path)
    ensure_parent(html_path)
    page.screenshot(path=str(screenshot_path), full_page=True)
    html_path.write_text(page.content(), encoding="utf-8")

    rooms: list[RoomOffer] = []
    cards = page.locator(".be-room")
    for index in range(cards.count()):
        offer = _flaneur_offer_from_card(cards.nth(index))
        if offer is not None:
            rooms.append(offer)

    return Capture(
        url=FLANEUR_URL,
        final_url=page.url,
        captured_at=iso_now(),
        checkin=checkin.isoformat(),
        checkout=checkout.isoformat(),
        screenshot_path=str(screenshot_path),
        html_path=str(html_path),
        rooms=rooms,
    )
|
|
|
|
|
|
def capture_ho36(*, checkin: date, checkout: date, page, screenshot_path: Path, html_path: Path) -> Capture:
    """Capture Ho36's Mews booking widget for one stay window.

    Fills the check-in/check-out inputs on the home page, opens the Mews
    distributor iframe, switches the currency to EUR when CAD is displayed,
    then parses the iframe's visible text line by line into ``RoomOffer``
    records. A full-page screenshot and the iframe's body text are saved.

    Args:
        checkin: Requested arrival date.
        checkout: Requested departure date.
        page: Playwright sync page used for navigation and scraping.
        screenshot_path: Destination of the full-page PNG screenshot.
        html_path: Destination of the text dump (the iframe's inner text, not HTML).

    Returns:
        A ``Capture`` holding the requested window, artefact paths and offers.
    """
    category_prompt = "text=/S\u00e9lectionnez une cat\u00e9gorie/i"
    # Each offer block in the text dump starts at one of these lines.
    block_start = "Image pr\u00e9c\u00e9dente"
    # Parsing stops at the heading that begins with this text.
    sold_out_prefix = "cat\u00e9gories sans disponibilit\u00e9"

    page.goto(HO36_URL, wait_until="domcontentloaded", timeout=60_000)
    page.wait_for_timeout(2_500)

    checkin_input = page.locator('input[id^="mews-checkin-"]').first
    checkin_input.fill(fmt_mm_dd_yyyy(checkin))
    checkout_input = page.locator('input[id^="mews-checkout-"]').first
    checkout_input.fill(fmt_mm_dd_yyyy(checkout))
    page.wait_for_timeout(300)
    page.locator(".mews-button").first.click()
    page.wait_for_selector("iframe.mews-distributor", timeout=30_000)

    frame = page.frame_locator("iframe.mews-distributor")
    frame.locator(category_prompt).wait_for(timeout=45_000)

    # Currency is sometimes CAD by default. Switch to EUR if needed.
    cad_label = frame.locator("text=CAD")
    if cad_label.count():
        cad_label.first.click()
        frame.locator('h2:has-text("S\u00e9lectionnez votre devise")').wait_for(timeout=30_000)
        frame.locator("text=\u20ac\xa0EUR").first.click()
        frame.locator("text=EUR").first.wait_for(timeout=30_000)
        page.wait_for_timeout(800)

    # Currency switching can trigger a brief reload; wait for room names to appear.
    frame.locator(category_prompt).wait_for(timeout=45_000)
    frame.locator("text=/^(Chambre|Lit|Dortoir|Suite)\\b/i").first.wait_for(timeout=45_000)

    body_text = frame.locator("body").inner_text(timeout=30_000)
    lines = [stripped for stripped in map(str.strip, body_text.splitlines()) if stripped]
    total = len(lines)

    name_re = re.compile(r"^(Chambre|Lit|Dortoir|Suite)\b", re.IGNORECASE)
    rooms: list[RoomOffer] = []

    cursor = 0
    while cursor < total:
        current = lines[cursor]
        if current.lower().startswith(sold_out_prefix):
            break
        if current != block_start:
            cursor += 1
            continue

        # Skip the purely numeric lines between the marker and the category name.
        name_at = cursor + 1
        while name_at < total and lines[name_at].isdigit():
            name_at += 1
        if name_at >= total or name_re.search(lines[name_at]) is None:
            cursor += 1
            continue

        name = lines[name_at]
        available_units = None
        price_eur = None
        unit = None
        refund_hint = None

        # Scan this block's lines until the next block or the sold-out heading;
        # for every field the first usable occurrence wins.
        scan = name_at + 1
        while scan < total:
            entry = lines[scan]
            if entry == block_start or entry.lower().startswith(sold_out_prefix):
                break
            if refund_hint is None and entry == "Non remboursable":
                refund_hint = entry
            if available_units is None and entry.startswith("Disponible"):
                available_units = parse_int(entry)
            if price_eur is None and "\u20ac" in entry:
                price_eur = parse_eur_amount(entry)
            if unit is None and entry.startswith("par "):
                unit = entry
            scan += 1

        rooms.append(
            RoomOffer(
                name=name,
                available_units=available_units,
                min_price_eur=price_eur,
                unit=unit,
                refund_policy_hint=refund_hint,
            )
        )
        cursor = scan

    ensure_parent(screenshot_path)
    ensure_parent(html_path)
    page.screenshot(path=str(screenshot_path), full_page=True)
    html_path.write_text(body_text, encoding="utf-8")

    return Capture(
        url=HO36_URL,
        final_url=page.url,
        captured_at=iso_now(),
        checkin=checkin.isoformat(),
        checkout=checkout.isoformat(),
        screenshot_path=str(screenshot_path),
        html_path=str(html_path),
        rooms=rooms,
    )
|
|
|
|
|
|
def parse_args() -> argparse.Namespace:
|
|
parser = argparse.ArgumentParser(description="Capture a comparable pricing/availability window for both hostels.")
|
|
parser.add_argument("--checkin", default="2026-01-03", help="ISO date YYYY-MM-DD")
|
|
parser.add_argument("--checkout", default="2026-01-04", help="ISO date YYYY-MM-DD")
|
|
parser.add_argument("--repo-root", default=str(Path(__file__).resolve().parents[2]), help="Repo root (default: auto)")
|
|
parser.add_argument("--run-tag", default=None, help="Optional YYYYMMDD tag for filenames (default: today)")
|
|
return parser.parse_args()
|
|
|
|
|
|
def main() -> int:
    """Run both captures for the requested window and write the combined JSON report.

    The report is written under ``verify/results`` in the repo root and echoed
    on stdout; screenshots and raw dumps go under ``data/<site>/``.

    Returns:
        ``0`` on success.

    Raises:
        SystemExit: On an invalid date, a check-out that is not after the
            check-in, or when a capture fails with a Playwright, OS or value
            error.
    """
    args = parse_args()
    repo_root = Path(args.repo_root).resolve()
    checkin = parse_iso_date(args.checkin)
    checkout = parse_iso_date(args.checkout)

    if not checkout > checkin:
        raise SystemExit("--checkout must be after --checkin")

    if args.run_tag:
        run_tag = args.run_tag
    else:
        run_tag = datetime.now().strftime("%Y%m%d")
    window_tag = "_".join(day.strftime("%Y%m%d") for day in (checkin, checkout))

    # Output locations: one screenshot + one raw dump per site, plus the JSON report.
    flaneur_dir = repo_root / "data" / "flaneur"
    ho36_dir = repo_root / "data" / "ho36"
    flaneur_png = flaneur_dir / "screenshots" / f"flaneur__roomraccoon__pricing__{window_tag}__{run_tag}.png"
    flaneur_html = flaneur_dir / "raw" / f"flaneur__roomraccoon__pricing__{window_tag}__{run_tag}.html"
    ho36_png = ho36_dir / "screenshots" / f"ho36__mews__pricing__{window_tag}__{run_tag}.png"
    ho36_html = ho36_dir / "raw" / f"ho36__mews__pricing__{window_tag}__{run_tag}.txt"

    out_json = repo_root / "verify" / "results" / f"pricing_window__{window_tag}__{run_tag}.json"
    ensure_parent(out_json)

    try:
        with sync_playwright() as p:
            browser = p.chromium.launch(headless=True)
            context = browser.new_context(locale="fr-FR", timezone_id="Europe/Paris")
            page = context.new_page()

            # Both sites are visited with the same page, one after the other.
            flaneur = capture_flaneur(
                checkin=checkin,
                checkout=checkout,
                page=page,
                screenshot_path=flaneur_png,
                html_path=flaneur_html,
            )
            page.wait_for_timeout(1_250)

            ho36 = capture_ho36(
                checkin=checkin,
                checkout=checkout,
                page=page,
                screenshot_path=ho36_png,
                html_path=ho36_html,
            )
            browser.close()
    except (PlaywrightError, OSError, ValueError) as exc:
        raise SystemExit(f"capture failed: {exc}") from exc

    payload: dict[str, Any] = {
        "captured_at": iso_now(),
        "window": {"checkin": checkin.isoformat(), "checkout": checkout.isoformat()},
        "flaneur": asdict(flaneur),
        "ho36": asdict(ho36),
    }

    # Serialise once; the file gets a trailing newline, stdout relies on print's.
    rendered = json.dumps(payload, ensure_ascii=False, indent=2)
    out_json.write_text(rendered + "\n", encoding="utf-8")
    print(rendered)
    return 0
|
|
|
|
|
|
# Script entry point: main()'s return value becomes the process exit status.
if __name__ == "__main__":
    raise SystemExit(main())
|