web / app.py
monsimas's picture
TruffleHog backstop: soft-warn (flag for review) instead of hard-reject; drop FP Twilio regex
30eb032 verified
Raw
History Blame Contribute Delete
10.3 kB
"""
Trace Commons ingestion server.
Receives anonymous donations from the donate-trace skill, re-runs the same
deterministic scrubber as a backstop, and opens a pull request to the dataset
under a single project-owned token. Contributors need no Hugging Face account.
Designed to run as a Hugging Face Space (Docker SDK) or any host that can keep
a secret. Set these as Space secrets / environment variables:
HF_TOKEN write-scoped token for the project bot account (required)
DATASET_REPO e.g. "trace-commons/agent-traces" (required)
MAX_BYTES max accepted payload size (optional, default 5_000_000)
RATE_PER_HOUR donations allowed per IP per hour (optional, default 20)
This is intentionally small. The skill already scrubbed and the user already
reviewed; the server's job is to never trust the client, re-scrub as a
backstop, refuse anything that still trips the scrubber, and submit.
"""
import io
import os
import re
import time
import json
import uuid
import shutil
import pathlib
import tempfile
import subprocess
from collections import defaultdict, deque
from fastapi import FastAPI, Request
from fastapi.responses import JSONResponse, FileResponse, HTMLResponse
from fastapi.middleware.cors import CORSMiddleware
from scrub import scrub_text # the exact same scrubber the skill runs
HF_TOKEN = os.environ.get("HF_TOKEN")
DATASET_REPO = os.environ.get("DATASET_REPO")
MAX_BYTES = int(os.environ.get("MAX_BYTES", "5000000"))
RATE_PER_HOUR = int(os.environ.get("RATE_PER_HOUR", "20"))
VALID_HARNESS = {"claude_code", "codex", "pi", "opencode", "cursor"}
SAFE_FILENAME = re.compile(r"^[A-Za-z0-9._\-]{1,200}$")
app = FastAPI(title="Trace Commons ingestion")
app.add_middleware(
CORSMiddleware, allow_origins=["*"], allow_methods=["POST", "GET"], allow_headers=["*"]
)
# --- simple in-memory rate limiting (per IP, sliding hour) ------------------
# For a single-process Space this is enough. Behind multiple replicas, move
# this to a shared store.
_hits = defaultdict(deque)
def _rate_ok(ip):
now = time.time()
window = _hits[ip]
while window and now - window[0] > 3600:
window.popleft()
if len(window) >= RATE_PER_HOUR:
return False
window.append(now)
return True
SITE_FILE = pathlib.Path(__file__).parent / "index.html"
OG_FILE = pathlib.Path(__file__).parent / "og.png"
TRUFFLEHOG = shutil.which("trufflehog")
def trufflehog_findings(text):
"""Authoritative secret-detection backstop.
Runs TruffleHog (hundreds of maintained detectors) over the already-scrubbed
trace and returns the set of detector names it flags. Detection only:
`--no-verification` means candidate secrets are NEVER sent to third parties
to validate them. No-ops (returns []) when the binary isn't installed, so
local/dev runs gracefully fall back to the regex pass in scrub.py.
"""
if not TRUFFLEHOG:
return []
findings = set()
tmp_path = None
try:
with tempfile.NamedTemporaryFile("w", suffix=".jsonl", delete=False) as tf:
tf.write(text)
tmp_path = tf.name
proc = subprocess.run(
[TRUFFLEHOG, "filesystem", tmp_path,
"--json", "--no-verification", "--no-update"],
capture_output=True, text=True, timeout=120,
)
for line in proc.stdout.splitlines():
line = line.strip()
if not line:
continue
try:
obj = json.loads(line)
except json.JSONDecodeError:
continue
name = obj.get("DetectorName") or obj.get("DetectorType")
if name:
findings.add(str(name))
except (subprocess.TimeoutExpired, OSError):
# A scanner failure must not silently pass a donation, but the regex
# backstop already ran; surface nothing here and let that stand.
return []
finally:
if tmp_path:
try:
os.unlink(tmp_path)
except OSError:
pass
return sorted(findings)
@app.get("/", response_class=HTMLResponse)
def home():
"""Serve the Trace Commons website."""
if SITE_FILE.exists():
return FileResponse(str(SITE_FILE))
return HTMLResponse("<h1>Trace Commons</h1><p>Site file not found.</p>", status_code=200)
@app.get("/og.png")
def og_image():
"""Serve the social-preview image referenced by the page's Open Graph tags."""
if OG_FILE.exists():
return FileResponse(str(OG_FILE), media_type="image/png")
return JSONResponse({"error": "not_found"}, status_code=404)
@app.get("/health")
def health():
configured = bool(HF_TOKEN and DATASET_REPO)
return {
"service": "trace-commons-ingestion",
"configured": configured,
"dataset": DATASET_REPO or "(unset)",
}
@app.post("/donate")
async def donate(request: Request):
ip = request.client.host if request.client else "unknown"
if not _rate_ok(ip):
return JSONResponse(
{"error": "rate_limited", "detail": "Too many donations from this address this hour."},
status_code=429,
)
body = await request.body()
if len(body) > MAX_BYTES:
return JSONResponse(
{"error": "too_large", "detail": f"Payload exceeds {MAX_BYTES} bytes."},
status_code=413,
)
try:
data = json.loads(body)
except json.JSONDecodeError:
return JSONResponse({"error": "bad_json"}, status_code=400)
harness = data.get("harness")
filename = data.get("filename")
consent = data.get("consent")
trace = data.get("trace")
# --- validation ---------------------------------------------------------
if harness not in VALID_HARNESS:
return JSONResponse({"error": "bad_harness", "detail": f"harness must be one of {sorted(VALID_HARNESS)}"}, status_code=400)
if not isinstance(trace, str) or not trace.strip():
return JSONResponse({"error": "empty_trace"}, status_code=400)
if consent is not True:
return JSONResponse({"error": "no_consent", "detail": "consent must be true; the contributor must agree to open publication."}, status_code=400)
if not filename or not SAFE_FILENAME.match(filename):
# generate a safe one rather than trusting client input
filename = f"{uuid.uuid4().hex}.jsonl"
# --- backstop scrub: never trust the client ----------------------------
cleaned, report = scrub_text(trace, harness)
# The skill should have already removed everything. If the backstop still
# finds high-confidence secrets, refuse: something slipped through.
secret_kinds = {k: v for k, v in report["redactions"].items()
if k not in ("home_path", "email", "private_ip")}
if secret_kinds:
return JSONResponse(
{
"error": "secrets_found",
"detail": "The server's backstop scrubber found secrets the client should have removed. Donation rejected.",
"found": secret_kinds,
},
status_code=422,
)
# --- TruffleHog soft-warn pass over the scrubbed trace ------------------
# Catches what the regex pass cannot (vendor tokens with no fixed prefix).
# Run WITHOUT verification so it never transmits candidate secrets — which
# means occasional false positives (e.g. a 32-char hash read as a "Box"
# token). It therefore does NOT auto-reject: findings are surfaced to the
# contributor in the response and recorded on the PR for the maintainer to
# review, on top of the human review every donation already gets. The regex
# pass above stays the hard block for crisp, high-confidence secret formats.
th_detectors = trufflehog_findings(cleaned)
if not HF_TOKEN or not DATASET_REPO:
# Not yet configured — accept-validate but don't pretend to publish.
return JSONResponse(
{
"status": "validated_not_published",
"detail": "Server is not yet configured with a dataset target. Trace passed all checks but was not published.",
"redactions": report["redactions"],
},
status_code=503,
)
# --- open the PR on the contributor's behalf ---------------------------
try:
pr_url = _open_pr(cleaned, harness, filename, warnings=th_detectors)
except Exception as e: # noqa: BLE001 — surface a clean message to the skill
return JSONResponse({"error": "publish_failed", "detail": str(e)}, status_code=502)
resp = {"status": "submitted", "pr_url": pr_url, "path": f"sessions/{harness}/{filename}"}
if th_detectors:
resp["warnings"] = {
"trufflehog_unverified": th_detectors,
"note": "TruffleHog flagged these without verification — often false positives "
"on high-entropy strings, so the donation was NOT blocked. Please confirm "
"none is a real secret; a maintainer will also review before merging.",
}
return resp
def _open_pr(cleaned_text, harness, filename, warnings=None):
"""Open a PR to the dataset with the cleaned trace, under the project token."""
from huggingface_hub import HfApi, CommitOperationAdd
api = HfApi(token=HF_TOKEN)
op = CommitOperationAdd(
path_in_repo=f"sessions/{harness}/{filename}",
path_or_fileobj=io.BytesIO(cleaned_text.encode("utf-8")),
)
description = "Anonymous donation via Trace Commons ingestion server."
if warnings:
description += (
"\n\n⚠️ **Maintainer review needed.** TruffleHog (unverified) flagged the "
"following detector(s). These are frequently false positives on high-entropy "
"strings (hashes, IDs, base64), but confirm none is a real secret before merging:\n- "
+ "\n- ".join(warnings)
)
commit = api.create_commit(
repo_id=DATASET_REPO,
repo_type="dataset",
operations=[op],
commit_message=f"Donate {harness} trace ({filename})",
commit_description=description,
create_pr=True,
)
# create_commit returns an object whose pr_url is set when create_pr=True
return getattr(commit, "pr_url", None) or str(commit)