Spaces:
Running
Running
| """ | |
| Trace Commons ingestion server. | |
| Receives anonymous donations from the donate-trace skill, re-runs the same | |
| deterministic scrubber as a backstop, and opens a pull request to the dataset | |
| under a single project-owned token. Contributors need no Hugging Face account. | |
| Designed to run as a Hugging Face Space (Docker SDK) or any host that can keep | |
| a secret. Set these as Space secrets / environment variables: | |
| HF_TOKEN write-scoped token for the project bot account (required) | |
| DATASET_REPO e.g. "trace-commons/agent-traces" (required) | |
| MAX_BYTES max accepted payload size (optional, default 5_000_000) | |
| RATE_PER_HOUR donations allowed per IP per hour (optional, default 20) | |
| This is intentionally small. The skill already scrubbed and the user already | |
| reviewed; the server's job is to never trust the client, re-scrub as a | |
| backstop, refuse anything that still trips the scrubber, and submit. | |
| """ | |
| import io | |
| import os | |
| import re | |
| import time | |
| import json | |
| import uuid | |
| import shutil | |
| import pathlib | |
| import tempfile | |
| import subprocess | |
| from collections import defaultdict, deque | |
| from fastapi import FastAPI, Request | |
| from fastapi.responses import JSONResponse, FileResponse, HTMLResponse | |
| from fastapi.middleware.cors import CORSMiddleware | |
| from scrub import scrub_text # the exact same scrubber the skill runs | |
# Deployment settings, read once from the environment at import time.
# HF_TOKEN and DATASET_REPO stay None when the variable is not set.
HF_TOKEN = os.getenv("HF_TOKEN")
DATASET_REPO = os.getenv("DATASET_REPO")
MAX_BYTES = int(os.getenv("MAX_BYTES", "5000000"))
RATE_PER_HOUR = int(os.getenv("RATE_PER_HOUR", "20"))

# Harness identifiers a donation is allowed to declare.
VALID_HARNESS = {"claude_code", "codex", "pi", "opencode", "cursor"}

# Client-supplied filenames: 1-200 characters drawn from letters, digits,
# '.', '_' and '-'.
SAFE_FILENAME = re.compile(r"^[A-Za-z0-9._\-]{1,200}$")
# The ASGI application. CORS is left open to every origin and request header,
# for GET and POST only.
app = FastAPI(title="Trace Commons ingestion")
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_methods=["POST", "GET"],
    allow_headers=["*"],
)
# --- simple in-memory rate limiting (per IP, sliding hour) ------------------
# For a single-process Space this is enough. Behind multiple replicas, move
# this to a shared store.
_hits = defaultdict(deque)   # ip -> timestamps of accepted hits, oldest first
_WINDOW_SECONDS = 3600
_last_sweep = time.monotonic()


def _rate_ok(ip):
    """Record a hit for *ip* and report whether it is within the hourly quota.

    Returns True (and records the hit) when fewer than RATE_PER_HOUR hits
    were recorded for *ip* during the last hour, False otherwise. Rejected
    calls are not recorded.

    At most once per window, entries for addresses with no hit in the last
    hour are dropped, so the table cannot grow without bound as new
    addresses come and go.
    """
    global _last_sweep
    # Monotonic clock: a wall-clock adjustment must not stretch or shrink
    # the rate window.
    now = time.monotonic()

    if now - _last_sweep > _WINDOW_SECONDS:
        idle = [
            key for key, seen in _hits.items()
            if not seen or now - seen[-1] > _WINDOW_SECONDS
        ]
        for key in idle:
            del _hits[key]
        _last_sweep = now

    window = _hits[ip]
    # Discard this address's hits that have slid out of the window.
    while window and now - window[0] > _WINDOW_SECONDS:
        window.popleft()
    if len(window) >= RATE_PER_HOUR:
        return False
    window.append(now)
    return True
# Static assets that sit in the same directory as this module.
SITE_FILE = pathlib.Path(__file__).with_name("index.html")
OG_FILE = pathlib.Path(__file__).with_name("og.png")
# Absolute path of the TruffleHog binary, or None when it is not on PATH.
TRUFFLEHOG = shutil.which("trufflehog")


def trufflehog_findings(text):
    """Scan *text* with TruffleHog and return the detector names it flags.

    The text is written to a temporary ``.jsonl`` file and scanned with
    ``trufflehog filesystem ... --json --no-verification --no-update``.
    Detection only: ``--no-verification`` means candidate secrets are NEVER
    sent to third parties to validate them.

    Args:
        text: The (already scrubbed) trace as a string.

    Returns:
        A sorted list of unique detector names (as strings). The list is
        empty when nothing is flagged, when the binary is not installed (so
        local/dev runs fall back to the regex pass in scrub.py), or when the
        scan times out or cannot be started.
    """
    if not TRUFFLEHOG:
        return []
    findings = set()
    tmp_path = None
    try:
        with tempfile.NamedTemporaryFile("wb", suffix=".jsonl", delete=False) as tf:
            # Remember the path before writing so a failed write is still
            # cleaned up in the finally block.
            tmp_path = tf.name
            # Encode explicitly: the scan must not depend on the host locale,
            # and an unpaired surrogate (reachable through JSON escapes in
            # untrusted input) must not raise UnicodeEncodeError.
            tf.write(text.encode("utf-8", errors="replace"))
        proc = subprocess.run(
            [TRUFFLEHOG, "filesystem", tmp_path,
             "--json", "--no-verification", "--no-update"],
            capture_output=True, text=True, timeout=120,
            encoding="utf-8", errors="replace",
        )
        for line in proc.stdout.splitlines():
            line = line.strip()
            if not line:
                continue
            try:
                obj = json.loads(line)
            except json.JSONDecodeError:
                continue
            # Only JSON objects carry detector fields; skip stray scalars
            # or arrays instead of crashing on .get().
            if not isinstance(obj, dict):
                continue
            name = obj.get("DetectorName") or obj.get("DetectorType")
            if name:
                findings.add(str(name))
    except (subprocess.TimeoutExpired, OSError):
        # A scanner failure must not silently pass a donation, but the regex
        # backstop already ran; surface nothing here and let that stand.
        return []
    finally:
        if tmp_path:
            try:
                os.unlink(tmp_path)
            except OSError:
                pass
    return sorted(findings)
def home():
    """Return the site's index page, or a small placeholder page if it is absent.

    NOTE(review): no route decorator is visible on this handler in this view;
    confirm how it is registered with ``app``.
    """
    if not SITE_FILE.exists():
        # The placeholder is still answered with 200.
        return HTMLResponse("<h1>Trace Commons</h1><p>Site file not found.</p>", status_code=200)
    return FileResponse(str(SITE_FILE))
def og_image():
    """Return the social-preview PNG used by the page's Open Graph tags.

    Answers 404 with ``{"error": "not_found"}`` when the image file is absent.

    NOTE(review): no route decorator is visible on this handler in this view;
    confirm how it is registered with ``app``.
    """
    if not OG_FILE.exists():
        return JSONResponse({"error": "not_found"}, status_code=404)
    return FileResponse(str(OG_FILE), media_type="image/png")
def health():
    """Report the service name and whether a publish target is configured.

    ``configured`` is true only when both HF_TOKEN and DATASET_REPO are set;
    ``dataset`` echoes DATASET_REPO, or "(unset)" when it is empty or missing.
    The token itself is never included.

    NOTE(review): no route decorator is visible on this handler in this view;
    confirm how it is registered with ``app``.
    """
    return {
        "service": "trace-commons-ingestion",
        "configured": bool(HF_TOKEN) and bool(DATASET_REPO),
        "dataset": DATASET_REPO if DATASET_REPO else "(unset)",
    }
async def _read_capped_body(request, limit):
    """Read the request body, giving up as soon as it exceeds *limit* bytes.

    Returns the body as bytes, or None when the payload is larger than
    *limit*. The body is consumed chunk by chunk so an oversized upload is
    refused without first being buffered in full.
    """
    try:
        if int(request.headers.get("content-length", "0")) > limit:
            return None
    except ValueError:
        # Malformed header: ignore it and rely on the streamed byte count.
        pass
    chunks = []
    received = 0
    async for chunk in request.stream():
        received += len(chunk)
        if received > limit:
            return None
        chunks.append(chunk)
    return b"".join(chunks)


async def donate(request: Request):
    """Validate, re-scrub and publish one donated trace.

    The body must be a JSON object with the keys ``harness`` (one of
    VALID_HARNESS), ``trace`` (non-empty string), ``consent`` (must be the
    JSON value ``true``) and optionally ``filename``. A filename that is
    missing or not a plain safe name is replaced by a generated one.

    Responses:
        429 rate_limited            too many donations from this address
        413 too_large               payload exceeds MAX_BYTES
        400 bad_json                body is not a JSON object
        400 bad_harness / empty_trace / no_consent / bad_encoding
        422 secrets_found           the backstop scrubber still found secrets
        503 validated_not_published HF_TOKEN / DATASET_REPO not configured
        502 publish_failed          opening the pull request raised
        200 {"status": "submitted", "pr_url", "path"[, "warnings"]}

    NOTE(review): no route decorator is visible on this handler in this view;
    confirm how it is registered with ``app``.
    """
    # Re-exported by FastAPI; used to keep blocking work off the event loop.
    from fastapi.concurrency import run_in_threadpool

    # NOTE(review): behind a reverse proxy this may be the proxy's address,
    # which would make all contributors share one quota — confirm.
    ip = request.client.host if request.client else "unknown"
    if not _rate_ok(ip):
        return JSONResponse(
            {"error": "rate_limited", "detail": "Too many donations from this address this hour."},
            status_code=429,
        )

    body = await _read_capped_body(request, MAX_BYTES)
    if body is None:
        return JSONResponse(
            {"error": "too_large", "detail": f"Payload exceeds {MAX_BYTES} bytes."},
            status_code=413,
        )

    try:
        data = json.loads(body)
    except (ValueError, RecursionError):
        # ValueError covers both JSONDecodeError and the UnicodeDecodeError
        # raised for bytes that are not valid UTF-8/16/32; RecursionError
        # covers pathologically deep nesting.
        return JSONResponse({"error": "bad_json"}, status_code=400)
    if not isinstance(data, dict):
        return JSONResponse(
            {"error": "bad_json", "detail": "Body must be a JSON object."},
            status_code=400,
        )

    harness = data.get("harness")
    filename = data.get("filename")
    consent = data.get("consent")
    trace = data.get("trace")

    # --- validation ---------------------------------------------------------
    # The isinstance check also keeps unhashable values (lists, objects) from
    # raising TypeError in the set membership test.
    if not isinstance(harness, str) or harness not in VALID_HARNESS:
        return JSONResponse({"error": "bad_harness", "detail": f"harness must be one of {sorted(VALID_HARNESS)}"}, status_code=400)
    if not isinstance(trace, str) or not trace.strip():
        return JSONResponse({"error": "empty_trace"}, status_code=400)
    if consent is not True:
        return JSONResponse({"error": "no_consent", "detail": "consent must be true; the contributor must agree to open publication."}, status_code=400)
    try:
        # JSON escapes can smuggle in unpaired surrogates, which cannot be
        # stored as UTF-8; reject them here instead of failing later.
        trace.encode("utf-8")
    except UnicodeEncodeError:
        return JSONResponse(
            {"error": "bad_encoding", "detail": "trace must be valid Unicode text (no unpaired surrogates)."},
            status_code=400,
        )
    if (
        not isinstance(filename, str)
        # fullmatch: with match(), the pattern's `$` would also accept a
        # trailing newline.
        or not SAFE_FILENAME.fullmatch(filename)
        # Names made only of dots ("." / "..") are path components, not files.
        or not filename.strip(".")
    ):
        # generate a safe one rather than trusting client input
        filename = f"{uuid.uuid4().hex}.jsonl"

    # --- backstop scrub: never trust the client ----------------------------
    cleaned, report = scrub_text(trace, harness)
    # The skill should have already removed everything. If the backstop still
    # finds high-confidence secrets, refuse: something slipped through.
    secret_kinds = {k: v for k, v in report["redactions"].items()
                    if k not in ("home_path", "email", "private_ip")}
    if secret_kinds:
        return JSONResponse(
            {
                "error": "secrets_found",
                "detail": "The server's backstop scrubber found secrets the client should have removed. Donation rejected.",
                "found": secret_kinds,
            },
            status_code=422,
        )

    # --- TruffleHog soft-warn pass over the scrubbed trace ------------------
    # Catches what the regex pass cannot (vendor tokens with no fixed prefix).
    # Run WITHOUT verification so it never transmits candidate secrets — which
    # means occasional false positives (e.g. a 32-char hash read as a "Box"
    # token). It therefore does NOT auto-reject: findings are surfaced to the
    # contributor in the response and recorded on the PR for the maintainer to
    # review, on top of the human review every donation already gets. The regex
    # pass above stays the hard block for crisp, high-confidence secret formats.
    # The scan is a subprocess with a 120 s timeout, so it runs in the thread
    # pool rather than blocking every other request.
    th_detectors = await run_in_threadpool(trufflehog_findings, cleaned)

    if not HF_TOKEN or not DATASET_REPO:
        # Not yet configured — accept-validate but don't pretend to publish.
        return JSONResponse(
            {
                "status": "validated_not_published",
                "detail": "Server is not yet configured with a dataset target. Trace passed all checks but was not published.",
                "redactions": report["redactions"],
            },
            status_code=503,
        )

    # --- open the PR on the contributor's behalf ---------------------------
    try:
        # Network upload: also kept off the event loop.
        pr_url = await run_in_threadpool(
            _open_pr, cleaned, harness, filename, warnings=th_detectors
        )
    except Exception as e:  # noqa: BLE001 — surface a clean message to the skill
        return JSONResponse({"error": "publish_failed", "detail": str(e)}, status_code=502)

    resp = {"status": "submitted", "pr_url": pr_url, "path": f"sessions/{harness}/{filename}"}
    if th_detectors:
        resp["warnings"] = {
            "trufflehog_unverified": th_detectors,
            "note": "TruffleHog flagged these without verification — often false positives "
                    "on high-entropy strings, so the donation was NOT blocked. Please confirm "
                    "none is a real secret; a maintainer will also review before merging.",
        }
    return resp
def _open_pr(cleaned_text, harness, filename, warnings=None):
    """Commit the cleaned trace to the dataset repo as a pull request.

    The file is uploaded to ``sessions/<harness>/<filename>`` using the
    project token. When *warnings* is non-empty, the detector names are
    listed in the PR description for the maintainer.

    Returns the PR URL reported by the commit result, falling back to the
    string form of that result when no URL is present.
    """
    from huggingface_hub import HfApi, CommitOperationAdd

    target_path = f"sessions/{harness}/{filename}"
    payload = io.BytesIO(cleaned_text.encode("utf-8"))
    upload = CommitOperationAdd(path_in_repo=target_path, path_or_fileobj=payload)

    pieces = ["Anonymous donation via Trace Commons ingestion server."]
    if warnings:
        pieces.append(
            "\n\n⚠️ **Maintainer review needed.** TruffleHog (unverified) flagged the "
            "following detector(s). These are frequently false positives on high-entropy "
            "strings (hashes, IDs, base64), but confirm none is a real secret before merging:"
        )
        # One bullet per detector name.
        pieces.append("".join("\n- " + detector for detector in warnings))
    description = "".join(pieces)

    client = HfApi(token=HF_TOKEN)
    result = client.create_commit(
        repo_id=DATASET_REPO,
        repo_type="dataset",
        operations=[upload],
        commit_message=f"Donate {harness} trace ({filename})",
        commit_description=description,
        create_pr=True,
    )
    # Prefer the result's pr_url attribute when it is present and non-empty.
    pr_url = getattr(result, "pr_url", None)
    return pr_url if pr_url else str(result)