117 lines
3.4 KiB
Python
117 lines
3.4 KiB
Python
import json
|
|
import subprocess
|
|
import sys
|
|
from typing import Any
|
|
from urllib.parse import parse_qs, urlparse
|
|
|
|
|
|
class YouTubeServiceError(RuntimeError):
    """Error raised by this module for invalid URLs and failed yt-dlp calls.

    The message is written to be shown to the end user as-is.
    """
|
|
|
|
|
|
# Lower-case hostnames that validate_youtube_url accepts.
YOUTUBE_HOSTS = {
    # Main site plus its mobile and music front-ends.
    "youtube.com", "www.youtube.com", "m.youtube.com", "music.youtube.com",
    # Short-link domain.
    "youtu.be", "www.youtu.be",
    # Privacy-enhanced embed domain.
    "youtube-nocookie.com", "www.youtube-nocookie.com",
}
|
|
|
|
|
|
def validate_youtube_url(url: str) -> str:
    """Validate that *url* looks like a single YouTube video and return it.

    The URL is stripped of surrounding whitespace and must:

    * use the ``http`` or ``https`` scheme,
    * have a network location listed in ``YOUTUBE_HOSTS`` (case-insensitive,
      one trailing dot tolerated), and
    * match one of the recognised video shapes: ``youtu.be/<id>``,
      ``/watch?v=<id>``, or ``/shorts/<id>``, ``/live/<id>``, ``/embed/<id>``.

    Args:
        url: User-supplied URL; ``None`` or empty is treated as ``""``.

    Returns:
        The whitespace-stripped URL, otherwise unchanged.

    Raises:
        YouTubeServiceError: If the URL is malformed or fails any check above.
    """
    candidate = (url or "").strip()

    # urlparse raises ValueError on some malformed input (for example an
    # unbalanced "[" in the host); report that as a normal validation failure
    # instead of leaking a raw ValueError to callers.
    try:
        parsed = urlparse(candidate)
    except ValueError as exc:
        raise YouTubeServiceError(
            "That does not look like a supported YouTube video URL."
        ) from exc

    if parsed.scheme not in {"http", "https"}:
        raise YouTubeServiceError("Enter a full YouTube URL starting with http:// or https://.")

    # Compare case-insensitively and ignore a trailing root dot.
    host = (parsed.netloc or "").lower().removesuffix(".")
    if host not in YOUTUBE_HOSTS:
        raise YouTubeServiceError("Only YouTube video URLs are supported.")

    path = parsed.path or ""
    query = parse_qs(parsed.query)

    # Short links carry the video id as the whole path.
    if host in {"youtu.be", "www.youtu.be"} and path.strip("/"):
        return candidate

    if host.endswith("youtube.com") or host.endswith("youtube-nocookie.com"):
        # Classic watch page: the id lives in the "v" query parameter.
        if path == "/watch" and query.get("v", [""])[0].strip():
            return candidate

        # Path-based forms: /shorts/<id>, /live/<id>, /embed/<id>.
        parts = [part for part in path.split("/") if part]
        if len(parts) >= 2 and parts[0] in {"shorts", "live", "embed"} and parts[1].strip():
            return candidate

    raise YouTubeServiceError("That does not look like a supported YouTube video URL.")
|
|
|
|
|
|
def fetch_video_info(url: str) -> dict[str, Any]:
    """Fetch basic metadata for a YouTube video by running yt-dlp.

    The URL is validated with ``validate_youtube_url`` first. yt-dlp is then
    run as a module under the current interpreter (``sys.executable -m
    yt_dlp``) with ``--dump-single-json --no-playlist --skip-download
    --no-warnings``, and its JSON output is reduced to a small dict.

    Args:
        url: User-supplied YouTube video URL.

    Returns:
        A dict with the keys ``title``, ``channel``, ``duration`` (formatted
        string), ``duration_seconds`` (raw value from yt-dlp), ``thumbnail``
        and ``webpage_url``. Missing title/channel/webpage_url fall back to
        placeholder text or the validated URL.

    Raises:
        YouTubeServiceError: If the URL is invalid, yt-dlp takes longer than
            90 seconds, exits with a non-zero status, or prints output that
            is not a JSON object.
    """
    valid_url = validate_youtube_url(url)
    command = [
        sys.executable,
        "-m",
        "yt_dlp",
        "--dump-single-json",
        "--no-playlist",
        "--skip-download",
        "--no-warnings",
        valid_url,
    ]

    try:
        # check=False: the exit status is inspected below so that stderr can
        # be turned into a friendlier message.
        result = subprocess.run(
            command,
            check=False,
            capture_output=True,
            text=True,
            timeout=90,
        )
    except subprocess.TimeoutExpired as exc:
        raise YouTubeServiceError("Fetching video info timed out.") from exc

    if result.returncode != 0:
        raise YouTubeServiceError(_friendly_process_error(result.stderr, "Could not fetch video info."))

    try:
        data = json.loads(result.stdout)
    except json.JSONDecodeError as exc:
        raise YouTubeServiceError("yt-dlp returned video info in an unreadable format.") from exc

    # Valid JSON is not necessarily an object (e.g. "null"); without this
    # guard the .get() calls below would fail with AttributeError.
    if not isinstance(data, dict):
        raise YouTubeServiceError("yt-dlp returned video info in an unreadable format.")

    return {
        "title": data.get("title") or "Untitled YouTube video",
        "channel": data.get("channel") or data.get("uploader") or "Unknown channel",
        "duration": _format_duration(data.get("duration")),
        "duration_seconds": data.get("duration"),
        "thumbnail": data.get("thumbnail"),
        "webpage_url": data.get("webpage_url") or valid_url,
    }
|
|
|
|
|
|
def _format_duration(seconds: Any) -> str:
|
|
if not isinstance(seconds, (int, float)) or seconds < 0:
|
|
return "Unknown"
|
|
|
|
total = int(seconds)
|
|
hours, remainder = divmod(total, 3600)
|
|
minutes, secs = divmod(remainder, 60)
|
|
|
|
if hours:
|
|
return f"{hours}:{minutes:02d}:{secs:02d}"
|
|
return f"{minutes}:{secs:02d}"
|
|
|
|
|
|
def _friendly_process_error(stderr: str, fallback: str) -> str:
|
|
lines = [line.strip() for line in (stderr or "").splitlines() if line.strip()]
|
|
if not lines:
|
|
return fallback
|
|
|
|
message = lines[-1]
|
|
for prefix in ("ERROR:", "WARNING:"):
|
|
if message.startswith(prefix):
|
|
message = message.removeprefix(prefix).strip()
|
|
|
|
return message or fallback
|