"""FastAPI service that turns a downloadwella.com video into a zip of frames.

For a submitted page URL the service resolves the direct download link,
downloads the video, extracts JPEG frames with OpenCV, zips them and uploads
the archive to the Hugging Face dataset repository named by TARGET_REPO_ID.
Progress is exposed through the module-level ``processing_status`` dict.
"""
import asyncio
import json
import os
import re
import shutil
import time
import zipfile
from pathlib import Path

import cv2
import requests
from fastapi import FastAPI, Request, BackgroundTasks, Form
from fastapi.responses import HTMLResponse, StreamingResponse
from fastapi.templating import Jinja2Templates
from huggingface_hub import HfApi, list_repo_files, hf_hub_download

app = FastAPI()
templates = Jinja2Templates(directory="templates")

# Configuration from environment variables
HF_TOKEN = os.getenv("HF_TOKEN", "")
TARGET_REPO_ID = os.getenv("TARGET_REPO_ID", "factorstudios/movzip")
DOWNLOAD_DIR = "downloads"
FRAMES_DIR = "frames"
ZIPS_DIR = "zips"
STATE_FILE = "processing_state.json"

# Maximum number of log lines kept in processing_status["logs"].
MAX_LOG_ENTRIES = 50
# (connect, read) timeouts in seconds so a stalled remote cannot hang a job.
HTTP_TIMEOUT = (10, 60)

for d in [DOWNLOAD_DIR, FRAMES_DIR, ZIPS_DIR]:
    os.makedirs(d, exist_ok=True)

# An empty string would be sent as a (bogus) bearer token; None lets the
# client fall back to its own token discovery instead.
api = HfApi(token=HF_TOKEN or None)

# Global status for tracking
processing_status = {
    "is_running": False,
    "current_url": None,
    "last_processed_url": None,
    "total_urls_to_process": 0,
    "processed_count": 0,
    "current_action": "Idle",
    "logs": []
}


def add_log(msg):
    """Append a timestamped message to the status log and print it.

    Only the most recent MAX_LOG_ENTRIES messages are retained.
    """
    timestamp = time.strftime("%H:%M:%S")
    log_msg = f"[{timestamp}] {msg}"
    logs = processing_status["logs"]
    logs.append(log_msg)
    if len(logs) > MAX_LOG_ENTRIES:
        del logs[:-MAX_LOG_ENTRIES]
    print(log_msg)


def load_state():
    """Return the persisted state dict, or a fresh one if it is unusable.

    The result always contains a ``"processed_urls"`` list. A missing state
    file yields the default silently; an unreadable or malformed one is
    logged and replaced by the default.
    """
    try:
        with open(STATE_FILE, "r", encoding="utf-8") as f:
            state = json.load(f)
    except FileNotFoundError:
        return {"processed_urls": []}
    except (OSError, ValueError) as exc:
        # ValueError covers json.JSONDecodeError and decoding errors.
        add_log(f"⚠️ Could not read {STATE_FILE}: {exc}. Starting with empty state.")
        return {"processed_urls": []}

    if not isinstance(state, dict):
        return {"processed_urls": []}
    if not isinstance(state.get("processed_urls"), list):
        state["processed_urls"] = []
    return state


def save_state(state):
    """Persist *state* as JSON to STATE_FILE.

    The data is written to a temporary sibling file and then moved into
    place, so an interrupted write cannot leave a truncated state file.
    """
    tmp_path = f"{STATE_FILE}.tmp"
    with open(tmp_path, "w", encoding="utf-8") as f:
        json.dump(state, f, indent=2)
    os.replace(tmp_path, STATE_FILE)


def get_direct_link(page_url):
    """Resolve a downloadwella.com page URL to its direct download link.

    Returns a ``(link, error)`` tuple: ``(url, None)`` on success, otherwise
    ``(None, message)`` describing why the link could not be obtained.
    """
    match = re.search(r'downloadwella\.com/([^/]+)', page_url)
    if not match:
        return None, "Error: Could not extract file ID from URL."

    file_id = match.group(1).split('.')[0]
    data = {
        "op": "download2",
        "id": file_id,
        "rand": "",
        "referer": "",
        "method_free": "",
        "method_premium": ""
    }
    headers = {
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
        "Referer": page_url
    }

    try:
        response = requests.post(
            page_url, data=data, headers=headers, timeout=HTTP_TIMEOUT
        )
        response.raise_for_status()
    except requests.RequestException as e:
        return None, f"Error: {str(e)}"

    download_match = re.search(
        r'href="(https://[^"]+\.downloadwella\.com/d/[^"]+)"', response.text
    )
    if download_match:
        return download_match.group(1), None
    if "captcha" in response.text.lower():
        return None, "Error: CAPTCHA required by the site. Cannot automate."
    return None, "Error: Could not find direct download link."


def extract_frames(video_path, output_dir, fps=10):
    """Sample frames from a video into numbered JPEG files.

    Frames are written to *output_dir* as ``000001.jpg``, ``000002.jpg``, ...
    at roughly *fps* frames per second of video.

    Returns the number of frames actually written (0 if the video cannot be
    opened). Raises ValueError if *fps* is not positive.
    """
    if fps <= 0:
        raise ValueError("fps must be a positive number")

    os.makedirs(output_dir, exist_ok=True)
    cap = cv2.VideoCapture(str(video_path))
    try:
        if not cap.isOpened():
            return 0

        video_fps = cap.get(cv2.CAP_PROP_FPS)
        # Fall back to 30 when the reported rate is 0, negative or NaN.
        if not (video_fps > 0):
            video_fps = 30
        frame_interval = max(1, int(round(video_fps / fps)))

        frame_idx = 0
        saved_count = 0
        while True:
            ret, frame = cap.read()
            if not ret:
                break
            if frame_idx % frame_interval == 0:
                target = os.path.join(output_dir, f"{saved_count + 1:06d}.jpg")
                # Count a frame only if it was really written to disk.
                if cv2.imwrite(target, frame, [int(cv2.IMWRITE_JPEG_QUALITY), 90]):
                    saved_count += 1
            frame_idx += 1
        return saved_count
    finally:
        # Release the capture even if reading or writing raised.
        cap.release()


def zip_folder(folder_path, zip_path):
    """Create a deflate-compressed zip at *zip_path* from *folder_path*.

    Entries are stored under their path relative to *folder_path*, so files
    with the same name in different sub-directories do not collide. For a
    flat folder the entry names are just the file names.
    """
    with zipfile.ZipFile(zip_path, 'w', zipfile.ZIP_DEFLATED) as zipf:
        for root, dirs, files in os.walk(folder_path):
            dirs.sort()  # deterministic traversal order
            for file in sorted(files):
                full_path = os.path.join(root, file)
                zipf.write(full_path, arcname=os.path.relpath(full_path, folder_path))


def _filename_from_link(direct_link):
    """Return a safe local file name taken from the last URL path segment."""
    path_part = direct_link.split('?', 1)[0].split('#', 1)[0]
    name = path_part.rstrip('/').split('/')[-1]
    # Never let the name resolve to a directory or its parent.
    if name in ("", ".", ".."):
        name = "video"
    return name


def _remove_artifacts(video_frames_dir, local_video_path, zip_path):
    """Best-effort removal of the temporary files produced by one job."""
    shutil.rmtree(video_frames_dir, ignore_errors=True)
    for path in (local_video_path, zip_path):
        try:
            os.remove(path)
        except FileNotFoundError:
            pass
        except OSError as exc:
            add_log(f"⚠️ Could not remove {path}: {exc}")


def _process_video_url_sync(video_url):
    """Run the blocking download / extract / zip / upload pipeline for one URL.

    Intended to run in a worker thread. Exceptions propagate to the caller;
    temporary files are removed whether or not the job succeeds.
    """
    state = load_state()
    if video_url in state["processed_urls"]:
        add_log(f"URL {video_url} already processed. Skipping.")
        return

    add_log(f"Processing URL: {video_url}")
    processing_status["current_action"] = f"Getting direct link for {video_url}"
    direct_link, error = get_direct_link(video_url)
    if error or not direct_link:
        add_log(f"❌ Error getting direct link for {video_url}: {error}")
        return

    # Compute every artifact path up front so cleanup can always refer to them.
    filename = _filename_from_link(direct_link)
    local_video_path = os.path.join(DOWNLOAD_DIR, filename)
    video_name = Path(filename).stem
    video_frames_dir = os.path.join(FRAMES_DIR, video_name)
    zip_filename = f"{video_name}_frames.zip"
    zip_path = os.path.join(ZIPS_DIR, zip_filename)

    try:
        add_log(f"Direct link found. Downloading {filename}...")
        processing_status["current_action"] = f"Downloading {filename}"
        with requests.get(direct_link, stream=True, timeout=HTTP_TIMEOUT) as r:
            r.raise_for_status()
            with open(local_video_path, 'wb') as f:
                for chunk in r.iter_content(chunk_size=8192):
                    if chunk:
                        f.write(chunk)

        add_log(f"Extracting frames for {video_name}...")
        processing_status["current_action"] = f"Extracting frames for {video_name}"
        frame_count = extract_frames(local_video_path, video_frames_dir)

        if frame_count > 0:
            add_log(f"Zipping {frame_count} frames...")
            processing_status["current_action"] = f"Zipping frames for {video_name}"
            zip_folder(video_frames_dir, zip_path)

            add_log(f"Uploading {zip_filename} to {TARGET_REPO_ID}...")
            processing_status["current_action"] = f"Uploading {zip_filename}"
            api.upload_file(
                path_or_fileobj=zip_path,
                path_in_repo=zip_filename,
                repo_id=TARGET_REPO_ID,
                repo_type="dataset"
            )

            state["processed_urls"].append(video_url)
            save_state(state)
            processing_status["processed_count"] = len(state["processed_urls"])
            processing_status["last_processed_url"] = video_url
            add_log(f"✅ Finished processing {video_url}")
        else:
            add_log(f"⚠️ No frames extracted for {video_name}. Skipping upload.")
    finally:
        # Cleanup
        _remove_artifacts(video_frames_dir, local_video_path, zip_path)

    processing_status["current_action"] = "Completed"
    add_log("🎉 Video processing completed!")


async def process_video_url(video_url: str):
    """Process one video URL unless a job is already running.

    The blocking pipeline is executed in the default executor so the event
    loop stays free to serve status requests. Any failure is logged and
    reflected in ``processing_status["current_action"]`` rather than raised.
    """
    # Check-and-set happens without an await in between, so two tasks on the
    # event loop cannot both pass this guard.
    if processing_status["is_running"]:
        return
    processing_status["is_running"] = True
    processing_status["current_url"] = video_url
    try:
        loop = asyncio.get_running_loop()
        await loop.run_in_executor(None, _process_video_url_sync, video_url)
    except Exception as e:
        add_log(f"❌ Error during processing {video_url}: {str(e)}")
        processing_status["current_action"] = "Error"
    finally:
        processing_status["current_url"] = None
        processing_status["is_running"] = False


@app.get("/", response_class=HTMLResponse)
async def index(request: Request):
    """Render index.html with the current processing status."""
    return templates.TemplateResponse(
        request=request, name="index.html", context={"status": processing_status}
    )


@app.get("/stats")
async def get_stats():
    """Return the current processing status dict."""
    return processing_status


@app.post("/start")
async def start_processor(background_tasks: BackgroundTasks, video_url: str = Form(...)):
    """Schedule processing of *video_url* unless a job is already running."""
    if not processing_status["is_running"]:
        background_tasks.add_task(process_video_url, video_url)
        return {"message": "Processor started", "url": video_url}
    # Report the URL that is actually in progress, not the last finished one.
    return {"message": "Processor already running", "current_url": processing_status["current_url"]}


if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=7860)