77 lines
2.4 KiB
Python
77 lines
2.4 KiB
Python
from __future__ import annotations
|
|
|
|
import os
|
|
import shutil
|
|
import subprocess
|
|
import tempfile
|
|
from pathlib import Path
|
|
|
|
|
|
class ExtractionError(RuntimeError):
    """Signal that text could not be extracted from an input document.

    Raised by the extraction helpers in this module for a missing input
    path, an unsupported file extension, or an external tool that cannot
    be found on ``PATH``.
    """
|
|
|
|
|
|
def _run(cmd: list[str], *, cwd: str | None = None) -> subprocess.CompletedProcess[str]:
|
|
return subprocess.run(cmd, cwd=cwd, check=True, capture_output=True, text=True)
|
|
|
|
|
|
def _looks_empty(text: str) -> bool:
    """Report whether *text* holds too little content to count as real text.

    Every form-feed character is removed, surrounding whitespace is trimmed,
    and the result is considered empty when fewer than 50 characters remain.
    """
    without_form_feeds = "".join(text.split("\f"))
    remaining = len(without_form_feeds.strip())
    return remaining < 50
|
|
|
|
|
|
def extract_text(path: str) -> str:
    """Return the textual content of the document at *path*.

    The file type is chosen from the lower-cased extension:

    * ``.pdf`` is delegated to ``extract_text_from_pdf``.
    * ``.txt`` and ``.md`` are read as UTF-8, with undecodable bytes
      replaced rather than raising.

    Args:
        path: Filesystem path of the input document.

    Returns:
        The extracted text.

    Raises:
        ExtractionError: If *path* does not exist or its extension is not
            one of the supported types.
    """
    source = Path(path)
    if not source.exists():
        raise ExtractionError(f"Input not found: {source}")

    suffix = source.suffix.lower()

    # The branches are mutually exclusive, so their order does not matter.
    if suffix == ".pdf":
        return extract_text_from_pdf(str(source))

    if suffix in (".txt", ".md"):
        return source.read_text(encoding="utf-8", errors="replace")

    raise ExtractionError(f"Unsupported file type: {suffix}")
|
|
|
|
|
|
def extract_text_from_pdf(path: str) -> str:
    """Extract text from a PDF, falling back to OCR when little text is found.

    ``pdftotext -layout`` writes its output to a file in a throwaway
    directory. When that output passes the ``_looks_empty`` check it is
    returned as-is; otherwise the PDF is handed to ``ocr_pdf``.

    Args:
        path: Filesystem path of the PDF.

    Returns:
        The text produced by ``pdftotext``, or the OCR result.

    Raises:
        ExtractionError: If ``pdftotext`` is not available on ``PATH``.
        subprocess.CalledProcessError: If ``pdftotext`` exits non-zero.
    """
    tool = shutil.which("pdftotext")
    if not tool:
        raise ExtractionError("Missing dependency: pdftotext (poppler-utils)")

    with tempfile.TemporaryDirectory(prefix="revoice-pdf-") as workdir:
        destination = os.path.join(workdir, "out.txt")
        _run([tool, "-layout", path, destination])
        # Read before the context manager removes the directory.
        extracted = Path(destination).read_text(encoding="utf-8", errors="replace")

    return ocr_pdf(path) if _looks_empty(extracted) else extracted
|
|
|
|
|
|
def ocr_pdf(path: str, *, dpi: int = 200, lang: str = "eng", psm: int = 3) -> str:
    """OCR a PDF by rasterising it with pdftoppm and reading pages with tesseract.

    Each page is rendered to a PNG in a temporary directory, recognised
    individually, and emitted under a ``===== <page stem> =====`` header.

    Args:
        path: Filesystem path of the PDF.
        dpi: Rendering resolution passed to ``pdftoppm -r``.
        lang: Language code passed to ``tesseract -l``.
        psm: Page segmentation mode passed to ``tesseract --psm``.

    Returns:
        The per-page sections joined by blank lines, terminated by exactly
        one newline. When no page images are produced the result is ``"\\n"``.

    Raises:
        ExtractionError: If ``pdftoppm`` or ``tesseract`` is not on ``PATH``.
        subprocess.CalledProcessError: If either tool exits non-zero.
    """
    pdftoppm = shutil.which("pdftoppm")
    tesseract = shutil.which("tesseract")
    if not pdftoppm:
        raise ExtractionError("Missing dependency: pdftoppm (poppler-utils)")
    if not tesseract:
        raise ExtractionError("Missing dependency: tesseract (tesseract-ocr)")

    with tempfile.TemporaryDirectory(prefix="revoice-ocr-") as tmpdir:
        prefix = os.path.join(tmpdir, "page")
        _run([pdftoppm, "-png", "-r", str(dpi), path, prefix])

        parts: list[str] = []
        # NOTE(review): the lexicographic sort assumes pdftoppm zero-pads
        # page numbers so that page order is preserved; confirm.
        for page_path in sorted(Path(tmpdir).glob("page-*.png")):
            # Decode tesseract's output explicitly as UTF-8 instead of relying
            # on the locale encoding (what ``text=True`` alone would use), so
            # non-ASCII OCR text is not garbled and cannot raise
            # UnicodeDecodeError on non-UTF-8 locales. ``errors="replace"``
            # matches how the other readers in this module treat bad bytes.
            proc = subprocess.run(
                [tesseract, str(page_path), "stdout", "-l", lang, "--psm", str(psm)],
                check=True,
                capture_output=True,
                encoding="utf-8",
                errors="replace",
            )
            header = f"===== {page_path.stem} ====="
            parts.append(f"{header}\n{proc.stdout.strip()}\n")

    return "\n\n".join(parts).strip() + "\n"
|