| import os |
| import re |
| import json |
| import requests |
| import asyncio |
| import zipfile |
| import shutil |
| import cv2 |
| import time |
| from pathlib import Path |
| from fastapi import FastAPI, Request, BackgroundTasks, Form |
| from fastapi.responses import HTMLResponse, StreamingResponse |
| from fastapi.templating import Jinja2Templates |
| from huggingface_hub import HfApi, list_repo_files, hf_hub_download |
|
|
# --- Web application --------------------------------------------------------
app = FastAPI()
templates = Jinja2Templates(directory="templates")

# --- Configuration (overridable through environment variables) --------------
HF_TOKEN = os.getenv("HF_TOKEN", "")
TARGET_REPO_ID = os.getenv("TARGET_REPO_ID", "factorstudios/movzip")

# --- Working directories and the persisted-state file -----------------------
DOWNLOAD_DIR = "downloads"
FRAMES_DIR = "frames"
ZIPS_DIR = "zips"
STATE_FILE = "processing_state.json"

# Make sure every working directory exists before any request is served.
for d in [DOWNLOAD_DIR, FRAMES_DIR, ZIPS_DIR]:
    os.makedirs(d, exist_ok=True)

# Pass None (not an empty string) when HF_TOKEN is unset, so the client is not
# handed an explicit empty token and can fall back to its own token lookup.
api = HfApi(token=HF_TOKEN or None)

# Shared, mutable status record: updated by the processing task and returned
# by the "/" and "/stats" routes.
processing_status = {
    "is_running": False,
    "last_processed_url": None,
    "total_urls_to_process": 0,
    "processed_count": 0,
    "current_action": "Idle",
    "logs": [],
}
|
|
def add_log(msg):
    """Timestamp ``msg``, append it to the status log and print it.

    The log lives in ``processing_status["logs"]``; once it holds more than
    50 entries the oldest one is dropped.
    """
    entry = "[{}] {}".format(time.strftime("%H:%M:%S"), msg)
    logs = processing_status["logs"]
    logs.append(entry)
    if len(logs) > 50:
        del logs[0]
    print(entry)
|
|
def load_state(path=None):
    """Load the persisted processing state.

    Args:
        path: Optional path of the JSON state file; defaults to ``STATE_FILE``.

    Returns:
        A dict that always contains a list under ``"processed_urls"``. A
        missing file, unparsable JSON, or a non-dict top-level value yields a
        fresh ``{"processed_urls": []}``.
    """
    state_path = STATE_FILE if path is None else path
    try:
        with open(state_path, "r", encoding="utf-8") as f:
            state = json.load(f)
    except FileNotFoundError:
        # First run: nothing has been persisted yet.
        return {"processed_urls": []}
    except ValueError:
        # Corrupt or non-JSON content (JSONDecodeError is a ValueError).
        return {"processed_urls": []}

    if not isinstance(state, dict):
        return {"processed_urls": []}
    # Callers index state["processed_urls"] directly, so guarantee its shape.
    if not isinstance(state.get("processed_urls"), list):
        state["processed_urls"] = []
    return state
|
|
def save_state(state, path=None):
    """Persist ``state`` as indented JSON.

    The data is first written to a sibling ``.tmp`` file and then moved over
    the target with ``os.replace``, so a failure while serialising never
    truncates the previously saved state.

    Args:
        state: JSON-serialisable object to store.
        path: Optional destination path; defaults to ``STATE_FILE``.

    Raises:
        TypeError: If ``state`` is not JSON-serialisable.
        OSError: If the file cannot be written or replaced.
    """
    state_path = STATE_FILE if path is None else path
    tmp_path = f"{state_path}.tmp"
    try:
        with open(tmp_path, "w", encoding="utf-8") as f:
            json.dump(state, f, indent=2)
        os.replace(tmp_path, state_path)
    finally:
        # After a successful replace the temp file is gone; this only fires
        # when writing or replacing failed part-way.
        if os.path.exists(tmp_path):
            os.remove(tmp_path)
|
|
def get_direct_link(page_url, timeout=30):
    """Resolve a downloadwella.com page URL to its direct download link.

    The file ID is taken from the first path segment after the host (anything
    from the first ``.`` onward is dropped) and posted back to the page as a
    ``download2`` form; the response HTML is then searched for a
    ``*.downloadwella.com/d/...`` href.

    Args:
        page_url: URL of the download page.
        timeout: Seconds passed to ``requests.post`` so an unresponsive host
            cannot block the caller indefinitely.

    Returns:
        ``(direct_link, None)`` on success, otherwise ``(None, message)``
        where ``message`` starts with ``"Error: "``.
    """
    # Stop at '?' and '#' as well as '/', so a query string or fragment is
    # never mistaken for part of the file ID.
    match = re.search(r'downloadwella\.com/([^/?#]+)', page_url)
    if not match:
        return None, "Error: Could not extract file ID from URL."

    file_id = match.group(1).split('.')[0]
    if not file_id:
        # e.g. ".../.html" leaves nothing in front of the first dot.
        return None, "Error: Could not extract file ID from URL."

    data = {
        "op": "download2",
        "id": file_id,
        "rand": "",
        "referer": "",
        "method_free": "",
        "method_premium": "",
    }
    headers = {
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
        "Referer": page_url,
    }

    try:
        response = requests.post(page_url, data=data, headers=headers, timeout=timeout)
        response.raise_for_status()
    except requests.RequestException as e:
        return None, f"Error: {str(e)}"

    download_match = re.search(
        r'href="(https://[^"]+\.downloadwella\.com/d/[^"]+)"', response.text
    )
    if download_match:
        return download_match.group(1), None
    if "captcha" in response.text.lower():
        return None, "Error: CAPTCHA required by the site. Cannot automate."
    return None, "Error: Could not find direct download link."
|
|
def extract_frames(video_path, output_dir, fps=10):
    """Sample frames from a video and write them as numbered JPEG files.

    Frames are taken every ``round(source_fps / fps)`` source frames (at
    least every frame) and saved as ``000001.jpg``, ``000002.jpg``, ... in
    ``output_dir`` at JPEG quality 90.

    Args:
        video_path: Path of the video file to read.
        output_dir: Directory for the JPEGs; created if missing.
        fps: Target sampling rate in frames per second; must be positive.

    Returns:
        Number of frames actually written; 0 when the video cannot be opened.

    Raises:
        ValueError: If ``fps`` is not positive.
    """
    if fps <= 0:
        raise ValueError("fps must be a positive number")

    os.makedirs(output_dir, exist_ok=True)
    cap = cv2.VideoCapture(str(video_path))
    try:
        if not cap.isOpened():
            return 0

        video_fps = cap.get(cv2.CAP_PROP_FPS)
        # Also rejects NaN and negative values, not just 0.
        if not (video_fps > 0):
            video_fps = 30
        frame_interval = max(1, int(round(video_fps / fps)))
        jpeg_params = [int(cv2.IMWRITE_JPEG_QUALITY), 90]

        frame_idx = 0
        saved_count = 0
        while True:
            ret, frame = cap.read()
            if not ret:
                break
            if frame_idx % frame_interval == 0:
                out_path = os.path.join(output_dir, f"{saved_count + 1:06d}.jpg")
                # Count a frame only when it was really written, so the
                # returned total matches the files on disk.
                if cv2.imwrite(out_path, frame, jpeg_params):
                    saved_count += 1
            frame_idx += 1
        return saved_count
    finally:
        # Release the capture on every exit path, including exceptions.
        cap.release()
|
|
def zip_folder(folder_path, zip_path):
    """Write every file under ``folder_path`` into a deflated zip archive.

    Entries are stored under their path relative to ``folder_path``, so files
    with the same name in different subdirectories do not collide. Files are
    added in sorted order, which keeps the archive layout reproducible.

    Args:
        folder_path: Directory whose contents are archived.
        zip_path: Destination path of the zip file (overwritten if present).
    """
    archive_abs = os.path.abspath(zip_path)
    with zipfile.ZipFile(zip_path, 'w', zipfile.ZIP_DEFLATED) as zipf:
        for root, dirs, files in os.walk(folder_path):
            dirs.sort()  # in-place sort steers os.walk's traversal order
            for file in sorted(files):
                full_path = os.path.join(root, file)
                if os.path.abspath(full_path) == archive_abs:
                    # The archive itself lives inside the folder; skip it.
                    continue
                zipf.write(full_path, arcname=os.path.relpath(full_path, folder_path))
|
|
def _download_to_file(url, dest_path, timeout=(10, 60)):
    """Stream ``url`` into ``dest_path`` in 8 KiB chunks; raises on HTTP errors."""
    with requests.get(url, stream=True, timeout=timeout) as r:
        r.raise_for_status()
        with open(dest_path, 'wb') as f:
            for chunk in r.iter_content(chunk_size=8192):
                f.write(chunk)


def _remove_artifacts(frames_dir, *file_paths):
    """Best-effort removal of one frames directory and any leftover files."""
    shutil.rmtree(frames_dir, ignore_errors=True)
    for path in file_paths:
        if not os.path.exists(path):
            continue
        try:
            os.remove(path)
        except OSError as e:
            add_log(f"Could not remove {path}: {e}")


def _run_video_pipeline(video_url):
    """Blocking pipeline: resolve link, download, extract, zip, upload, record."""
    state = load_state()
    if video_url in state["processed_urls"]:
        add_log(f"URL {video_url} already processed. Skipping.")
        return

    add_log(f"Processing URL: {video_url}")
    processing_status["current_action"] = f"Getting direct link for {video_url}"
    direct_link, error = get_direct_link(video_url)
    if error:
        add_log(f"❌ Error getting direct link for {video_url}: {error}")
        processing_status["current_action"] = "Error"
        return

    # Drop any query string / fragment before taking the last path segment.
    filename = direct_link.split('?', 1)[0].split('#', 1)[0].split('/')[-1]
    video_name = Path(filename).stem
    # An empty or dot-only name would make the per-video paths below resolve
    # to the shared working directories, which cleanup would then delete.
    if filename in ("", ".", "..") or video_name in ("", ".", ".."):
        raise ValueError(f"Cannot derive a file name from {direct_link}")

    local_video_path = os.path.join(DOWNLOAD_DIR, filename)
    video_frames_dir = os.path.join(FRAMES_DIR, video_name)
    zip_filename = f"{video_name}_frames.zip"
    # Computed up front so cleanup can always refer to it, even when no
    # frames are extracted and the archive is never created.
    zip_path = os.path.join(ZIPS_DIR, zip_filename)

    try:
        add_log(f"Direct link found. Downloading {filename}...")
        processing_status["current_action"] = f"Downloading {filename}"
        _download_to_file(direct_link, local_video_path)

        add_log(f"Extracting frames for {video_name}...")
        processing_status["current_action"] = f"Extracting frames for {video_name}"
        frame_count = extract_frames(local_video_path, video_frames_dir)

        if frame_count > 0:
            add_log(f"Zipping {frame_count} frames...")
            processing_status["current_action"] = f"Zipping frames for {video_name}"
            zip_folder(video_frames_dir, zip_path)

            add_log(f"Uploading {zip_filename} to {TARGET_REPO_ID}...")
            processing_status["current_action"] = f"Uploading {zip_filename}"
            api.upload_file(
                path_or_fileobj=zip_path,
                path_in_repo=zip_filename,
                repo_id=TARGET_REPO_ID,
                repo_type="dataset",
            )

            # Record the URL only after a successful upload.
            state["processed_urls"].append(video_url)
            save_state(state)
            processing_status["processed_count"] = len(state["processed_urls"])
            processing_status["last_processed_url"] = video_url
            add_log(f"✅ Finished processing {video_url}")
        else:
            add_log(f"⚠️ No frames extracted for {video_name}. Skipping upload.")
    finally:
        # Runs on success and on failure so temporary files never pile up.
        _remove_artifacts(video_frames_dir, local_video_path, zip_path)

    processing_status["current_action"] = "Completed"
    add_log("🎉 Video processing completed!")


async def process_video_url(video_url: str):
    """Download one video, extract its frames, zip them and upload the archive.

    Does nothing when a run is already in progress. The blocking network,
    OpenCV and upload work is handed to the event loop's default executor so
    other requests keep being served while a video is processed. Any failure
    is logged and reflected in ``processing_status["current_action"]``; the
    ``is_running`` flag is always cleared at the end.

    Args:
        video_url: downloadwella.com page URL of the video to process.
    """
    if processing_status["is_running"]:
        return

    processing_status["is_running"] = True
    try:
        loop = asyncio.get_running_loop()
        await loop.run_in_executor(None, _run_video_pipeline, video_url)
    except Exception as e:
        add_log(f"❌ Error during processing {video_url}: {str(e)}")
        processing_status["current_action"] = "Error"
    finally:
        processing_status["is_running"] = False
|
|
@app.get("/", response_class=HTMLResponse)
async def index(request: Request):
    """Render ``index.html`` with the current processing status as ``status``."""
    context = {"status": processing_status}
    return templates.TemplateResponse(
        request=request,
        name="index.html",
        context=context,
    )
|
|
@app.get("/stats")
async def get_stats():
    """Return the shared ``processing_status`` dict as the response body."""
    current_status = processing_status
    return current_status
|
|
@app.post("/start")
async def start_processor(background_tasks: BackgroundTasks, video_url: str = Form(...)):
    """Queue ``video_url`` for background processing unless a run is active.

    NOTE(review): ``is_running`` is only set once ``process_video_url``
    starts executing, so two requests arriving back-to-back can both be told
    "Processor started"; the second task then exits at its own guard.
    """
    if processing_status["is_running"]:
        return {
            "message": "Processor already running",
            "current_url": processing_status["last_processed_url"],
        }

    background_tasks.add_task(process_video_url, video_url)
    return {"message": "Processor started", "url": video_url}
|
|
if __name__ == "__main__":
    # Imported here so uvicorn is only required when run as a script.
    import uvicorn

    # Serve on all interfaces, port 7860.
    uvicorn.run(
        app,
        port=7860,
        host="0.0.0.0",
    )
|
|