github-actions[bot]
Deploy backend with correct structure
099df87
Raw
History Blame Contribute Delete
6.3 kB
import os
import uuid
import asyncio
import json
import requests
from typing import AsyncGenerator
from dotenv import load_dotenv
# Load .env so env vars are available when starting Uvicorn directly
load_dotenv()
# Groq configuration
GROQ_API_KEY = os.getenv("GROQ_API_KEY", "").strip()
GROQ_MODEL = os.getenv("GROQ_MODEL", "llama-3.1-8b-instant").strip()
GROQ_URL = "https://api.groq.com/openai/v1/chat/completions"
# Optional switch to simulate local behavior (no external calls)
GROQ_DISABLED = os.getenv("GROQ_DISABLED", "").lower() in {"1", "true", "yes"}
# Reasonable connect/read timeouts for generation/streaming
DEFAULT_TIMEOUT = (10, 120)
# Base headers for Groq API
HEADERS = {
"Authorization": f"Bearer {GROQ_API_KEY}" if GROQ_API_KEY else "",
"Content-Type": "application/json",
}
def generate_report_id() -> str:
"""Create a unique ID for each report."""
return str(uuid.uuid4())
def stream_event(kind: str, data):
"""
Serialize events as proper JSON for SSE.
The FastAPI route will send lines like: `data: <json>\n\n`
Frontend can safely parse with json.loads(payload).
"""
return json.dumps({"kind": kind, "data": data}, ensure_ascii=False)
def _chunk(text: str, n: int):
"""Split text into small pieces to render a smoother streaming experience."""
for i in range(0, len(text), n):
yield text[i : i + n]
async def run_researcher_async(topic: str) -> str:
"""
Researcher step: produce compact factual bullets.
Fallback text is returned if GROQ is disabled or unavailable.
"""
if GROQ_DISABLED or not GROQ_API_KEY:
return (
f"- What is '{topic}'?\n"
f"- 3–5 key facts\n"
f"- Common use cases\n"
f"- Simple examples\n"
)
payload = {
"model": GROQ_MODEL,
"messages": [
{"role": "system", "content": "You are a concise researcher."},
{
"role": "user",
"content": f"Provide compact, factual bullet points about '{topic}'. "
f"Max 8 bullets. Avoid filler text.",
},
],
"temperature": 0.5,
}
try:
r = requests.post(GROQ_URL, headers=HEADERS, json=payload, timeout=DEFAULT_TIMEOUT)
r.raise_for_status()
return r.json()["choices"][0]["message"]["content"]
except Exception as e:
# Fallback on any network/API error
return f"[fallback researcher due to error: {e}]\n- Background\n- Key points\n- Examples"
async def run_analyst_async(researcher_notes: str) -> str:
"""
Analyst step: extract key insights and implications from researcher notes.
Fallback text is returned if GROQ is disabled or unavailable.
"""
if GROQ_DISABLED or not GROQ_API_KEY:
return "- 3 key insights\n- 2 implications\n- 1 trade-off\n"
payload = {
"model": GROQ_MODEL,
"messages": [
{"role": "system", "content": "You extract insights cleanly."},
{
"role": "user",
"content": f"From these notes, produce exactly 3 insights and 2 implications:\n{researcher_notes}",
},
],
"temperature": 0.5,
}
try:
r = requests.post(GROQ_URL, headers=HEADERS, json=payload, timeout=DEFAULT_TIMEOUT)
r.raise_for_status()
return r.json()["choices"][0]["message"]["content"]
except Exception as e:
return f"[fallback analyst due to error: {e}]\n- Insight 1\n- Insight 2\n- Insight 3\n- Implication A\n- Implication B"
async def run_writer_token_stream(
topic: str,
researcher_notes: str,
analyst_notes: str,
) -> AsyncGenerator[str, None]:
"""
Writer step: stream the final report as small token-like chunks for smooth UI updates.
Yields strings (small chunks). Caller accumulates or forwards as SSE tokens.
"""
writer_prompt = (
"Write a clear, beginner-friendly report with markdown headings:\n"
"Sections: Introduction, Key Concepts, Insights, Practical Tips, Conclusion.\n"
"Use concise language and bullets where helpful.\n\n"
f"Topic: {topic}\n\n"
f"Researcher Notes:\n{researcher_notes}\n\n"
f"Analyst Notes:\n{analyst_notes}\n"
)
# Local simulated streaming if GROQ is disabled or key missing
if GROQ_DISABLED or not GROQ_API_KEY:
simulated = [
f"## {topic}\n\n",
"### Introduction\n",
"This response is streaming locally to simulate real-time typing.\n\n",
"### Key Concepts\n",
"- Concept A\n- Concept B\n\n",
"### Insights\n",
"- Insight 1\n- Insight 2\n\n",
"### Practical Tips\n",
"- Tip 1\n- Tip 2\n\n",
"### Conclusion\n",
"Short summary.\n",
]
for piece in simulated:
for small in _chunk(piece, 20):
yield small
await asyncio.sleep(0.015)
return
# Real streaming via Groq's OpenAI-compatible API
payload = {
"model": GROQ_MODEL,
"messages": [
{"role": "system", "content": "You are a clear, helpful technical writer."},
{"role": "user", "content": writer_prompt},
],
"temperature": 0.6,
"stream": True,
}
# Using requests stream; iterate server-sent "data: ..." lines
with requests.post(
GROQ_URL, headers=HEADERS, json=payload, stream=True, timeout=DEFAULT_TIMEOUT
) as resp:
resp.raise_for_status()
for line in resp.iter_lines(decode_unicode=True):
if not line:
continue
if not line.startswith("data: "):
continue
data = line[6:].strip()
if data == "[DONE]":
break
try:
obj = json.loads(data)
delta = obj["choices"][0]["delta"].get("content", "")
if not delta:
continue
# Yield tiny chunks to update UI frequently
for small in _chunk(delta, 20):
yield small
except Exception:
# Skip malformed lines gracefully
continue