ups / main.py
samfred2's picture
Update main.py
e37f165 verified
Raw
History Blame Contribute Delete
7.7 kB
import os
import re
import json
import requests
import asyncio
import zipfile
import shutil
import cv2
import time
from pathlib import Path
from fastapi import FastAPI, Request, BackgroundTasks, Form
from fastapi.responses import HTMLResponse, StreamingResponse
from fastapi.templating import Jinja2Templates
from huggingface_hub import HfApi, list_repo_files, hf_hub_download
# Web application and the template engine used to render pages from ./templates
app = FastAPI()
templates = Jinja2Templates(directory="templates")

# Configuration from environment variables
HF_TOKEN = os.environ.get("HF_TOKEN", "")
TARGET_REPO_ID = os.environ.get("TARGET_REPO_ID", "factorstudios/movzip")

# Local working directories and the JSON file that remembers processed URLs
DOWNLOAD_DIR = "downloads"
FRAMES_DIR = "frames"
ZIPS_DIR = "zips"
STATE_FILE = "processing_state.json"

# Make sure every working directory exists before any request is served
for work_dir in (DOWNLOAD_DIR, FRAMES_DIR, ZIPS_DIR):
    os.makedirs(work_dir, exist_ok=True)

# Hugging Face Hub client used for uploads
api = HfApi(token=HF_TOKEN)
# Global status for tracking; shared by the request handlers and the
# background processing task.
processing_status = {
    # True while a video is being processed
    "is_running": False,
    # URL of the most recently completed video, if any
    "last_processed_url": None,
    # Initialised to 0; not updated anywhere else in this file
    "total_urls_to_process": 0,
    # Number of URLs recorded as processed in the persisted state
    "processed_count": 0,
    # Human-readable description of the current pipeline step
    "current_action": "Idle",
    # Timestamped log lines appended by add_log
    "logs": [],
}
def add_log(msg):
    """Record *msg* in the shared status log and echo it to stdout.

    The message is prefixed with the current local time as ``[HH:MM:SS]``.
    When the log holds more than 50 entries after appending, the oldest
    entry is dropped.
    """
    entry = f"[{time.strftime('%H:%M:%S')}] {msg}"
    history = processing_status["logs"]
    history.append(entry)
    if len(history) > 50:
        del history[0]
    print(entry)
def load_state():
    """Load the persisted processing state from ``STATE_FILE``.

    Returns:
        A dict that always contains a ``"processed_urls"`` list. A fresh
        ``{"processed_urls": []}`` is returned when the file does not exist,
        does not contain valid JSON, or does not hold a JSON object.

    Raises:
        OSError: if the file exists but cannot be opened (other than
            ``FileNotFoundError``, which is treated as "no state yet").
    """
    try:
        # EAFP: open directly instead of exists()+open(), which can race.
        with open(STATE_FILE, "r") as f:
            state = json.load(f)
    except FileNotFoundError:
        return {"processed_urls": []}
    except ValueError as exc:
        # json.JSONDecodeError and UnicodeDecodeError are both ValueErrors.
        add_log(f"State file {STATE_FILE} is unreadable ({exc}); starting fresh.")
        return {"processed_urls": []}

    if not isinstance(state, dict):
        add_log(f"State file {STATE_FILE} has an unexpected shape; starting fresh.")
        return {"processed_urls": []}
    # Callers index state["processed_urls"] and append to it, so guarantee a list.
    if not isinstance(state.get("processed_urls"), list):
        state["processed_urls"] = []
    return state
def save_state(state):
    """Persist *state* to ``STATE_FILE`` as indented JSON.

    The JSON is first written to a temporary sibling file and then moved over
    ``STATE_FILE`` with ``os.replace``, so a failed or interrupted write never
    leaves a truncated state file behind.

    Args:
        state: JSON-serialisable object to store.

    Raises:
        TypeError: if *state* is not JSON-serialisable.
        OSError: if the file cannot be written or replaced.
    """
    tmp_path = f"{STATE_FILE}.tmp"
    try:
        with open(tmp_path, "w") as f:
            json.dump(state, f, indent=2)
        os.replace(tmp_path, STATE_FILE)
    finally:
        # After a successful replace the temp file is gone; this only fires
        # when serialisation or the move failed part-way.
        if os.path.exists(tmp_path):
            os.remove(tmp_path)
def get_direct_link(page_url, timeout=30):
    """Resolve a downloadwella.com page URL to its direct download link.

    The file ID is taken from the first path segment after the host (anything
    after the first ``.`` in that segment is dropped), the ``download2`` form
    is POSTed back to the page, and the response HTML is searched for a
    ``https://*.downloadwella.com/d/...`` href.

    Args:
        page_url: URL of the download page.
        timeout: Seconds to wait for the server before giving up; passed to
            ``requests.post``. Without it a stalled server would block the
            caller indefinitely.

    Returns:
        ``(direct_link, None)`` on success, or ``(None, error_message)`` when
        the URL cannot be parsed, the request fails, or no link is found.
    """
    match = re.search(r'downloadwella\.com/([^/]+)', page_url)
    if not match:
        return None, "Error: Could not extract file ID from URL."

    file_id = match.group(1).split('.')[0]
    data = {
        "op": "download2",
        "id": file_id,
        "rand": "",
        "referer": "",
        "method_free": "",
        "method_premium": "",
    }
    headers = {
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
        "Referer": page_url,
    }

    # Only the network call can fail here, so keep the try body minimal.
    try:
        response = requests.post(page_url, data=data, headers=headers, timeout=timeout)
        response.raise_for_status()
    except requests.RequestException as e:
        return None, f"Error: {str(e)}"

    page_html = response.text
    download_match = re.search(r'href="(https://[^"]+\.downloadwella\.com/d/[^"]+)"', page_html)
    if download_match:
        return download_match.group(1), None
    if "captcha" in page_html.lower():
        return None, "Error: CAPTCHA required by the site. Cannot automate."
    return None, "Error: Could not find direct download link."
def extract_frames(video_path, output_dir, fps=10):
    """Sample frames from a video and save them as numbered JPEG files.

    Every ``round(video_fps / fps)``-th frame (at least every frame) is written
    to *output_dir* as ``000001.jpg``, ``000002.jpg``, ... at JPEG quality 90.

    Args:
        video_path: Path of the video file to read.
        output_dir: Directory for the JPEGs; created if missing.
        fps: Target number of saved frames per second of video.

    Returns:
        The number of frames actually written; 0 if the video cannot be opened.

    Raises:
        ValueError: if *fps* is not positive.
    """
    if fps <= 0:
        raise ValueError(f"fps must be positive, got {fps!r}")

    os.makedirs(output_dir, exist_ok=True)
    cap = cv2.VideoCapture(str(video_path))
    try:
        if not cap.isOpened():
            return 0

        video_fps = cap.get(cv2.CAP_PROP_FPS)
        # Fall back to 30 when the container reports 0, a negative value or NaN
        # (NaN fails every comparison, so "not > 0" covers it too).
        if not video_fps > 0:
            video_fps = 30
        frame_interval = max(1, int(round(video_fps / fps)))
        jpeg_params = [int(cv2.IMWRITE_JPEG_QUALITY), 90]

        frame_idx = 0
        saved_count = 0
        while True:
            ret, frame = cap.read()
            if not ret:
                break
            if frame_idx % frame_interval == 0:
                target = os.path.join(output_dir, f"{saved_count + 1:06d}.jpg")
                # Count the frame only when the write succeeded, so the return
                # value matches the files on disk and numbering stays gapless.
                if cv2.imwrite(target, frame, jpeg_params):
                    saved_count += 1
            frame_idx += 1
        return saved_count
    finally:
        # Release the capture even if reading or writing raised.
        cap.release()
def zip_folder(folder_path, zip_path):
    """Compress every file under *folder_path* into a deflated zip at *zip_path*.

    Entries are stored under their path relative to *folder_path*, so files
    with the same name in different sub-directories do not collide. For a flat
    folder the entry names are just the file names. Files are added in sorted
    order so the archive layout is deterministic, and the archive itself is
    skipped if it happens to live inside *folder_path*.

    Args:
        folder_path: Directory whose contents are archived.
        zip_path: Destination path of the zip file (overwritten if present).
    """
    archive_abs = os.path.abspath(zip_path)
    with zipfile.ZipFile(zip_path, 'w', zipfile.ZIP_DEFLATED) as zipf:
        for root, dirs, files in os.walk(folder_path):
            # Sorting in place also fixes the order os.walk descends in.
            dirs.sort()
            for file in sorted(files):
                full_path = os.path.join(root, file)
                if os.path.abspath(full_path) == archive_abs:
                    continue
                zipf.write(full_path, arcname=os.path.relpath(full_path, folder_path))
def _remove_artifacts(video_frames_dir, local_video_path, zip_path):
    """Delete the temporary frames directory, video file and zip, if present."""
    if video_frames_dir and os.path.exists(video_frames_dir):
        shutil.rmtree(video_frames_dir)
    for leftover in (local_video_path, zip_path):
        if leftover and os.path.exists(leftover):
            os.remove(leftover)


def _run_video_pipeline(video_url, state):
    """Blocking pipeline: resolve link, download, extract frames, zip, upload.

    Runs in a worker thread. *state* is the loaded state dict; on success the
    URL is appended to ``state["processed_urls"]`` and the state is saved.
    Temporary files are removed whether or not the pipeline succeeds.
    """
    # Initialised up front so cleanup never touches an unbound name
    # (zip_path only gets a value when frames were extracted).
    local_video_path = None
    video_frames_dir = None
    zip_path = None
    try:
        add_log(f"Processing URL: {video_url}")
        processing_status["current_action"] = f"Getting direct link for {video_url}"
        direct_link, error = get_direct_link(video_url)
        if error:
            add_log(f"❌ Error getting direct link for {video_url}: {error}")
            processing_status["current_action"] = "Error"
            return

        filename = direct_link.split('/')[-1]
        local_video_path = os.path.join(DOWNLOAD_DIR, filename)
        add_log(f"Direct link found. Downloading {filename}...")
        processing_status["current_action"] = f"Downloading {filename}"
        # The timeout bounds the connect and each read, so a stalled server
        # cannot hang the pipeline forever.
        with requests.get(direct_link, stream=True, timeout=60) as r:
            r.raise_for_status()
            with open(local_video_path, 'wb') as f:
                for chunk in r.iter_content(chunk_size=8192):
                    f.write(chunk)

        video_name = Path(filename).stem
        video_frames_dir = os.path.join(FRAMES_DIR, video_name)
        add_log(f"Extracting frames for {video_name}...")
        processing_status["current_action"] = f"Extracting frames for {video_name}"
        frame_count = extract_frames(local_video_path, video_frames_dir)

        if frame_count > 0:
            zip_filename = f"{video_name}_frames.zip"
            zip_path = os.path.join(ZIPS_DIR, zip_filename)
            add_log(f"Zipping {frame_count} frames...")
            processing_status["current_action"] = f"Zipping frames for {video_name}"
            zip_folder(video_frames_dir, zip_path)

            add_log(f"Uploading {zip_filename} to {TARGET_REPO_ID}...")
            processing_status["current_action"] = f"Uploading {zip_filename}"
            api.upload_file(
                path_or_fileobj=zip_path,
                path_in_repo=zip_filename,
                repo_id=TARGET_REPO_ID,
                repo_type="dataset"
            )

            # Only mark the URL as processed once the upload has succeeded.
            state["processed_urls"].append(video_url)
            save_state(state)
            processing_status["processed_count"] = len(state["processed_urls"])
            processing_status["last_processed_url"] = video_url
            add_log(f"✅ Finished processing {video_url}")
        else:
            add_log(f"⚠️ No frames extracted for {video_name}. Skipping upload.")

        processing_status["current_action"] = "Completed"
        add_log("🎉 Video processing completed!")
    finally:
        # Cleanup
        _remove_artifacts(video_frames_dir, local_video_path, zip_path)


async def process_video_url(video_url: str):
    """Process one video URL end to end, unless a run is already in progress.

    Skips URLs already listed in the persisted state. The blocking work
    (HTTP download, frame extraction, zipping, upload) is executed in the
    default thread-pool executor so the event loop keeps serving requests
    while a video is processed. Any failure is logged and reflected in
    ``processing_status["current_action"]``; ``is_running`` is always reset.

    Args:
        video_url: Download-page URL of the video to process.
    """
    if processing_status["is_running"]:
        return
    processing_status["is_running"] = True
    try:
        # Inside the try so a failing load cannot leave is_running stuck True.
        state = load_state()
        if video_url in state["processed_urls"]:
            add_log(f"URL {video_url} already processed. Skipping.")
            return
        loop = asyncio.get_running_loop()
        await loop.run_in_executor(None, _run_video_pipeline, video_url, state)
    except Exception as e:
        add_log(f"❌ Error during processing {video_url}: {str(e)}")
        processing_status["current_action"] = "Error"
    finally:
        processing_status["is_running"] = False
@app.get("/", response_class=HTMLResponse)
async def index(request: Request):
    """Render ``index.html`` with the current processing status as ``status``."""
    page_context = {"status": processing_status}
    return templates.TemplateResponse(
        request=request,
        name="index.html",
        context=page_context,
    )
@app.get("/stats")
async def get_stats():
    """Return the shared ``processing_status`` dict as the response body."""
    current_status = processing_status
    return current_status
@app.post("/start")
async def start_processor(background_tasks: BackgroundTasks, video_url: str = Form(...)):
    """Schedule processing of *video_url* unless a run is already in progress.

    When idle, ``process_video_url`` is queued as a background task and a
    "Processor started" message is returned. Otherwise nothing is scheduled
    and an "already running" message is returned.
    """
    if processing_status["is_running"]:
        # NOTE(review): "current_url" is filled from last_processed_url, i.e.
        # the last URL that finished, not necessarily the one in progress.
        return {
            "message": "Processor already running",
            "current_url": processing_status["last_processed_url"],
        }

    background_tasks.add_task(process_video_url, video_url)
    return {"message": "Processor started", "url": video_url}
if __name__ == "__main__":
    # Serve the app directly when this file is executed as a script.
    import uvicorn

    server_options = {"host": "0.0.0.0", "port": 7860}
    uvicorn.run(app, **server_options)