| import requests, json, os, asyncio, time
|
| from datetime import datetime
|
| from fastapi import FastAPI, UploadFile, File, Form
|
| from fastapi.responses import HTMLResponse, JSONResponse
|
| from typing import List
|
| import uvicorn
|
|
|
| TRANSCRIBE_URL = "https://likhith0715--memory-keeper-transcribe-audio.modal.run"
|
| PHOTO_URL = "https://likhith0715--memory-keeper-describe-photo.modal.run"
|
| BOOK_URL = "https://likhith0715--memory-keeper-build-memory-book.modal.run"
|
|
|
|
|
| STORAGE_DIR = "./data/memories"
|
| os.makedirs(STORAGE_DIR, exist_ok=True)
|
|
|
| def profile_path(name: str) -> str:
|
| safe = "".join(c if c.isalnum() or c in "-_ " else "_" for c in name).strip().lower()
|
| return f"{STORAGE_DIR}/{safe}.json"
|
|
|
| def load_profile(name: str) -> dict:
|
| path = profile_path(name)
|
| if os.path.exists(path):
|
| with open(path) as f:
|
| return json.load(f)
|
| return {"name": name, "transcripts": [], "photo_descriptions": [], "sessions": 0, "history": []}
|
|
|
| def save_profile(profile: dict):
|
| with open(profile_path(profile["name"]), "w") as f:
|
| json.dump(profile, f, indent=2)
|
|
|
| def list_profiles() -> list:
|
| profiles = []
|
| for fname in os.listdir(STORAGE_DIR):
|
| if fname.endswith(".json"):
|
| with open(f"{STORAGE_DIR}/{fname}") as f:
|
| try:
|
| p = json.load(f)
|
| profiles.append({
|
| "name": p["name"],
|
| "sessions": p.get("sessions", 0),
|
| "memories": len(p.get("transcripts", [])),
|
| "photos": len(p.get("photo_descriptions", []))
|
| })
|
| except: pass
|
| import gradio as gr
|
|
|
| fast_app = gr.Server()
|
|
|
| async def cleanup_old_memories():
|
| """Background task to delete profile JSONs older than 48 hours."""
|
| while True:
|
| now = time.time()
|
| for fname in os.listdir(STORAGE_DIR):
|
| if fname.endswith(".json"):
|
| path = os.path.join(STORAGE_DIR, fname)
|
| try:
|
| if now - os.path.getmtime(path) > 48 * 3600:
|
| os.remove(path)
|
| except Exception:
|
| pass
|
| await asyncio.sleep(3600)
|
|
|
| @fast_app.on_event("startup")
|
| async def startup_event():
|
| asyncio.create_task(cleanup_old_memories())
|
|
|
| @fast_app.get("/", response_class=HTMLResponse)
|
| def index():
|
| return open("index.html").read()
|
|
|
| @fast_app.get("/profiles")
|
| def get_profiles():
|
| return JSONResponse(list_profiles())
|
|
|
| @fast_app.get("/profile/{name}")
|
| def get_profile(name: str):
|
| return JSONResponse(load_profile(name))
|
|
|
| @fast_app.delete("/profile/{name}")
|
| def delete_profile(name: str):
|
| path = profile_path(name)
|
| if os.path.exists(path):
|
| os.remove(path)
|
| return JSONResponse({"ok": True})
|
|
|
| @fast_app.post("/run")
|
| async def run(
|
| name: str = Form(...),
|
| text: str = Form(""),
|
| audios: List[UploadFile] = File(default=[]),
|
| photos: List[UploadFile] = File(default=[])
|
| ):
|
|
|
| profile = load_profile(name)
|
| new_transcripts = []
|
| new_photos = []
|
|
|
| if text.strip():
|
| new_transcripts.append(text)
|
|
|
| for audio in audios:
|
| if audio and audio.filename:
|
| audio_bytes = await audio.read()
|
| if audio_bytes:
|
| resp = requests.post(
|
| TRANSCRIBE_URL, data=audio_bytes,
|
| headers={"Content-Type": "application/octet-stream"},
|
| timeout=300
|
| )
|
| if resp.status_code == 200:
|
| new_transcripts.append(f"[Voice]: {resp.json()}")
|
|
|
| for photo in photos:
|
| if photo and photo.filename:
|
| photo_bytes = await photo.read()
|
| if photo_bytes:
|
| resp = requests.post(
|
| PHOTO_URL, data=photo_bytes,
|
| headers={"Content-Type": "application/octet-stream"},
|
| timeout=300
|
| )
|
| if resp.status_code == 200:
|
| new_photos.append(resp.json())
|
|
|
| if not new_transcripts and not new_photos:
|
| return JSONResponse({"error": "No memories provided"}, status_code=400)
|
|
|
|
|
| profile["transcripts"].extend(new_transcripts)
|
| profile["photo_descriptions"].extend(new_photos)
|
| profile["sessions"] = profile.get("sessions", 0) + 1
|
| save_profile(profile)
|
|
|
|
|
| if not profile["transcripts"]:
|
| profile["transcripts"].append(f"Photos were shared of {name}.")
|
|
|
| resp = requests.post(BOOK_URL, json={
|
| "name": name,
|
| "transcripts": profile["transcripts"],
|
| "photo_descriptions": profile["photo_descriptions"]
|
| }, timeout=600)
|
|
|
| result = resp.json()
|
|
|
|
|
| history_entry = {
|
| "session": profile["sessions"],
|
| "timestamp": datetime.now().strftime("%b %d, %Y - %H:%M"),
|
| "timeline": result.get("timeline", ""),
|
| "chapter": result.get("chapter", ""),
|
| "letter": result.get("letter", ""),
|
| "people": result.get("people", "")
|
| }
|
| profile.setdefault("history", []).append(history_entry)
|
| save_profile(profile)
|
|
|
| result["sessions"] = profile["sessions"]
|
| result["total_memories"] = len(profile["transcripts"])
|
| result["total_photos"] = len(profile["photo_descriptions"])
|
| result["history"] = profile["history"]
|
| return JSONResponse(result)
|
|
|
| if __name__ == "__main__":
|
| fast_app.launch(server_name="0.0.0.0", server_port=7860)
|
|
|