"""Small FastAPI service that downloads media with yt-dlp.

Clients POST a URL to ``/api/download``, poll ``/api/status/{job_id}`` for
progress, and fetch the finished file from ``/api/file/{job_id}``.  Job state
is kept in a process-local dictionary, so it is lost on restart and is not
shared between worker processes.
"""

import asyncio
import json
import os
import re
import uuid
from pathlib import Path
from typing import Optional

import yt_dlp
from fastapi import FastAPI, HTTPException
from fastapi.responses import FileResponse, StreamingResponse
from fastapi.staticfiles import StaticFiles
from pydantic import BaseModel

DOWNLOAD_DIR = Path(os.environ.get("DOWNLOAD_DIR", "/downloads"))
DOWNLOAD_DIR.mkdir(parents=True, exist_ok=True)

app = FastAPI(title="yt-dlp Web Downloader")

# In-memory job store: job id -> mutable status dict (same keys as JobStatus).
jobs: dict[str, dict] = {}

# yt-dlp's pre-formatted progress strings may carry ANSI colour sequences;
# they must be removed before the text is parsed or shown to a client.
_ANSI_ESCAPE_RE = re.compile(r"\x1b\[[0-9;]*[A-Za-z]")

# Options for metadata-only extraction (no download, no console output).
_INFO_OPTS = {"quiet": True, "no_warnings": True}


class DownloadRequest(BaseModel):
    """Body of ``POST /api/download``."""

    url: str
    format: str = "video"  # "video" | "audio"
    quality: str = "best"  # "best" | "1080" | "720" | "480" | "360"


class JobStatus(BaseModel):
    """Snapshot of a download job as returned to API clients."""

    id: str
    # "pending" | "running" | "processing" | "done" | "error"
    status: str
    progress: float = 0.0
    speed: str = ""
    eta: str = ""
    filename: str = ""
    error: str = ""
    title: str = ""


def _clean_progress_text(value) -> str:
    """Return *value* as text with ANSI escapes and outer whitespace removed."""
    return _ANSI_ESCAPE_RE.sub("", str(value or "")).strip()


def _progress_percent(d: dict) -> float:
    """Return the download percentage described by a yt-dlp progress dict.

    The pre-formatted ``_percent_str`` is tried first.  If it cannot be parsed
    (for example because the total size is unknown), the byte counters are
    used instead; 0.0 is returned when neither source is usable.
    """
    raw = _clean_progress_text(d.get("_percent_str", "0%")).replace("%", "")
    try:
        return float(raw)
    except ValueError:
        pass

    downloaded = d.get("downloaded_bytes")
    total = d.get("total_bytes") or d.get("total_bytes_estimate")
    if downloaded is not None and total:
        return max(0.0, min(100.0, 100.0 * downloaded / total))
    return 0.0


def make_progress_hook(job_id: str):
    """Build a yt-dlp progress hook that mirrors progress into ``jobs[job_id]``.

    The returned callable ignores events for job ids that are not present in
    the job store.
    """

    def hook(d):
        job = jobs.get(job_id)
        if not job:
            return

        if d["status"] == "downloading":
            job["progress"] = _progress_percent(d)
            job["speed"] = _clean_progress_text(d.get("_speed_str", ""))
            job["eta"] = _clean_progress_text(d.get("_eta_str", ""))
            job["status"] = "running"
        elif d["status"] == "finished":
            # The transfer is complete; run_download() sets "done" once
            # ydl.download() has returned.
            job["progress"] = 100.0
            job["status"] = "processing"

    return hook


def build_ydl_opts(req: DownloadRequest, job_id: str, out_template: str) -> dict:
    """Translate a download request into a yt-dlp options dictionary.

    Args:
        req: The client's request (URL, format and quality).
        job_id: Id of the job whose progress the hook should update.
        out_template: yt-dlp output template for the destination file.

    Returns:
        Options suitable for ``yt_dlp.YoutubeDL(...)``.  Unknown quality
        values fall back to the "best" selector.
    """
    opts = {
        "outtmpl": out_template,
        "progress_hooks": [make_progress_hook(job_id)],
        "quiet": True,
        "no_warnings": True,
        "noplaylist": True,
    }

    if req.format == "audio":
        opts["format"] = "bestaudio/best"
        opts["postprocessors"] = [
            {
                "key": "FFmpegExtractAudio",
                "preferredcodec": "mp3",
                "preferredquality": "192",
            }
        ]
    else:
        # Video quality mapping: prefer mp4 video + m4a audio, then any
        # separate video/audio pair, then a single combined stream.
        quality_map = {
            "best": "bestvideo[ext=mp4]+bestaudio[ext=m4a]/bestvideo+bestaudio/best",
            "1080": "bestvideo[height<=1080][ext=mp4]+bestaudio[ext=m4a]/bestvideo[height<=1080]+bestaudio/best[height<=1080]",
            "720": "bestvideo[height<=720][ext=mp4]+bestaudio[ext=m4a]/bestvideo[height<=720]+bestaudio/best[height<=720]",
            "480": "bestvideo[height<=480][ext=mp4]+bestaudio[ext=m4a]/bestvideo[height<=480]+bestaudio/best[height<=480]",
            "360": "bestvideo[height<=360][ext=mp4]+bestaudio[ext=m4a]/bestvideo[height<=360]+bestaudio/best[height<=360]",
        }
        opts["format"] = quality_map.get(req.quality, quality_map["best"])
        opts["merge_output_format"] = "mp4"

    return opts


def run_download(job_id: str, req: DownloadRequest):
    """Run one download job to completion, recording the outcome in ``jobs``.

    Called from a worker thread by ``start_download``.  Any exception is
    caught and stored on the job as ``status="error"`` instead of propagating,
    because nothing awaits the worker's result.
    """
    job = jobs[job_id]
    try:
        out_template = str(DOWNLOAD_DIR / f"{job_id}_%(title)s.%(ext)s")
        opts = build_ydl_opts(req, job_id, out_template)

        # First fetch info to get the title, so it is visible to status
        # polls while the download is still running.
        with yt_dlp.YoutubeDL(dict(_INFO_OPTS)) as ydl_info:
            info = ydl_info.extract_info(req.url, download=False)
            job["title"] = info.get("title", "Unknown")

        with yt_dlp.YoutubeDL(opts) as ydl:
            ydl.download([req.url])

        # Find the output file by its job-id prefix.  Sorting makes the
        # choice deterministic if more than one file matches.
        found = sorted(DOWNLOAD_DIR.glob(f"{job_id}_*"))
        if found:
            job["filename"] = found[0].name
            job["status"] = "done"
            job["progress"] = 100.0
        else:
            job["status"] = "error"
            job["error"] = "Output file not found after download."
    except Exception as e:
        job["status"] = "error"
        job["error"] = str(e)


@app.post("/api/download", response_model=JobStatus)
async def start_download(req: DownloadRequest):
    """Register a new job, start it in the default executor, return its status."""
    job_id = str(uuid.uuid4())
    jobs[job_id] = {
        "id": job_id,
        "status": "pending",
        "progress": 0.0,
        "speed": "",
        "eta": "",
        "filename": "",
        "error": "",
        "title": "",
    }

    # get_running_loop() is the supported way to reach the loop from inside a
    # coroutine; get_event_loop() is deprecated for this use.
    loop = asyncio.get_running_loop()
    loop.run_in_executor(None, run_download, job_id, req)

    return JobStatus(id=job_id, status="pending")


@app.get("/api/status/{job_id}", response_model=JobStatus)
async def get_status(job_id: str):
    """Return the current status of a job, or 404 if the id is unknown."""
    job = jobs.get(job_id)
    if not job:
        raise HTTPException(status_code=404, detail="Job not found")
    return JobStatus(**job)


@app.get("/api/file/{job_id}")
async def download_file(job_id: str):
    """Send the finished file for a job.

    Raises:
        HTTPException: 404 if the job is unknown or its file is missing on
            disk; 400 if the job has not reached the "done" state.
    """
    job = jobs.get(job_id)
    if not job:
        raise HTTPException(status_code=404, detail="Job not found")
    if job["status"] != "done":
        raise HTTPException(status_code=400, detail="File not ready")

    stored_name = job["filename"]
    file_path = DOWNLOAD_DIR / stored_name
    if not file_path.exists():
        raise HTTPException(status_code=404, detail="File missing on disk")

    # Drop the "<job_id>_" prefix so the client sees the media title only.
    # split() returns the whole name unchanged when there is no underscore.
    return FileResponse(
        path=str(file_path),
        filename=stored_name.split("_", 1)[-1],
        media_type="application/octet-stream",
    )


def _fetch_info(url: str):
    """Blocking metadata lookup for *url*; intended to run in a worker thread."""
    with yt_dlp.YoutubeDL(dict(_INFO_OPTS)) as ydl:
        return ydl.extract_info(url, download=False)


@app.get("/api/info")
async def get_info(url: str):
    """Return basic metadata for *url* without downloading it.

    Raises:
        HTTPException: 400 with the underlying error message if extraction
            fails for any reason.
    """
    try:
        # extract_info() performs blocking network I/O, so it is run off the
        # event loop to keep other requests responsive.
        info = await asyncio.to_thread(_fetch_info, url)
        return {
            "title": info.get("title", ""),
            "thumbnail": info.get("thumbnail", ""),
            "duration": info.get("duration", 0),
            "uploader": info.get("uploader", ""),
            "view_count": info.get("view_count", 0),
        }
    except Exception as e:
        raise HTTPException(status_code=400, detail=str(e)) from e


# Serve static frontend — resolve relative to this file so it works both
# locally and in Docker.  Mounted last so the /api routes take precedence.
STATIC_DIR = Path(__file__).parent / "static"
app.mount("/", StaticFiles(directory=str(STATIC_DIR), html=True), name="static")