| """Evaluate CORP-ENV policies through the real OpenEnv environment. |
| |
| Examples: |
| uv run python eval.py --policy scripted_weak --label baseline |
| uv run python eval.py --policy oracle --label oracle |
| uv run python eval.py --policy openai --model Qwen/Qwen2.5-7B-Instruct |
| uv run python eval.py --policy hf --model outputs/sft_adapter --adapter outputs/grpo_adapter |
| """ |
|
|
| from __future__ import annotations |
|
|
| import argparse |
| import json |
| import os |
| import re |
| import sys |
| from pathlib import Path |
| from typing import Any, Dict, List, Optional |
|
|
| from dotenv import load_dotenv |
| from openai import OpenAI |
|
|
# Absolute directory holding this script (symlinks resolved). It is placed at
# the front of sys.path, at most once, so the local packages imported below
# (corp_env, scripts, server) resolve when the file is executed directly.
ROOT = Path(__file__).resolve().parent
if not any(entry == str(ROOT) for entry in sys.path):
    sys.path.insert(0, str(ROOT))
|
|
| from corp_env.models import CorpAction |
| from scripts._trajectory_utils import ( |
| DEFAULT_TASKS, |
| extract_json_object, |
| normalize_action_obj, |
| observation_message, |
| oracle_actions, |
| write_jsonl, |
| ) |
| from server.agents.master_prompts import build_system_prompt |
| from server.environment import CorpEnvironment |
| from server.llm_env import openai_client_kwargs_master |
|
|
|
|
def weak_actions(task_id: str) -> List[Dict[str, Any]]:
    """Return a fixed, intentionally poor two-step action script for ``task_id``.

    The script is used as a deterministic, no-GPU baseline. Every script
    delegates one vague question to a single agent and then finalizes with a
    shallow answer. Unknown task ids get the acquisition-offer script.
    """
    if task_id == "e1_launch_readiness":
        delegate_to = "qa_engineer"
        question = "Give launch status."
        final_payload = "GO"
    elif task_id == "m1_budget_reallocation":
        delegate_to = "dev_lead"
        question = "Say whether we need GPUs."
        final_payload = json.dumps({"phase_1": "Buy GPUs now."})
    else:
        delegate_to = "cto"
        question = "Assess acquisition offer."
        final_payload = json.dumps(
            {
                "counter_offer": "Ask for more.",
                "deadline": "Soon.",
                "retention_plan": "Keep key staff.",
            }
        )

    delegate_step = {"action_type": "delegate", "agent_id": delegate_to, "payload": question}
    finalize_step = {"action_type": "finalize", "payload": final_payload}
    return [delegate_step, finalize_step]
|
|
|
|
class HFPolicy:
    """Local Hugging Face causal-LM policy, optionally wrapped with a PEFT adapter.

    Generation is greedy (``do_sample=False``). ``complete`` returns only the
    newly generated text, decoded without special tokens and stripped.
    """

    def __init__(self, model: str, adapter: Optional[str], max_new_tokens: int) -> None:
        """Load the tokenizer, the (ideally 4-bit) base model and the optional adapter.

        Raises:
            SystemExit: if torch, transformers or peft cannot be imported.
        """
        try:
            import torch
            from transformers import AutoModelForCausalLM, AutoTokenizer
            from peft import PeftModel
        except ImportError as exc:
            raise SystemExit(
                "HF evaluation requires torch, transformers, and peft. "
                "Install eval dependencies on the GPU machine."
            ) from exc

        self.torch = torch
        self.tokenizer = AutoTokenizer.from_pretrained(model, trust_remote_code=True)
        # Some tokenizers ship without a pad token; reuse EOS so padding is defined.
        if getattr(self.tokenizer, "pad_token", None) is None and getattr(self.tokenizer, "eos_token", None) is not None:
            self.tokenizer.pad_token = self.tokenizer.eos_token

        dtype = torch.bfloat16 if torch.cuda.is_available() else torch.float32
        model_kwargs: Dict[str, Any] = {
            "trust_remote_code": True,
            "device_map": "auto",
        }
        # Best effort: request 4-bit NF4 quantization, falling back to a plain
        # dtype if the bitsandbytes config cannot be built.
        # NOTE(review): bitsandbytes 4-bit loading generally needs CUDA. On a
        # CPU-only machine from_pretrained may still fail below — confirm.
        try:
            from transformers import BitsAndBytesConfig

            model_kwargs["quantization_config"] = BitsAndBytesConfig(
                load_in_4bit=True,
                bnb_4bit_use_double_quant=True,
                bnb_4bit_quant_type="nf4",
                bnb_4bit_compute_dtype=dtype,
            )
        except Exception:
            model_kwargs["torch_dtype"] = dtype

        self.model = AutoModelForCausalLM.from_pretrained(model, **model_kwargs)
        if adapter:
            self.model = PeftModel.from_pretrained(self.model, adapter)

        self.model.eval()
        self.max_new_tokens = max_new_tokens

    @staticmethod
    def _render_prompt(tokenizer: Any, messages: List[Dict[str, str]]) -> tuple[str, bool]:
        """Render chat ``messages`` into a prompt string.

        Returns ``(prompt, add_special_tokens)``. When the tokenizer's chat
        template is used, the rendered text already contains the model's
        BOS/special tokens, so they must not be added again at tokenization.
        Otherwise a plain ``role: content`` transcript ending in
        ``assistant:`` is produced and special tokens are added normally.
        """
        if hasattr(tokenizer, "apply_chat_template"):
            try:
                prompt = tokenizer.apply_chat_template(
                    messages,
                    tokenize=False,
                    add_generation_prompt=True,
                )
            except ValueError:
                # Recent transformers raise ValueError when the tokenizer has
                # no chat_template (common for base models). Use the plain format.
                pass
            else:
                return prompt, False
        plain = "\n".join(f"{m['role']}: {m['content']}" for m in messages) + "\nassistant:"
        return plain, True

    def complete(self, messages: List[Dict[str, str]]) -> str:
        """Greedily generate the assistant reply to ``messages``."""
        prompt, add_special_tokens = self._render_prompt(self.tokenizer, messages)
        inputs = self.tokenizer(
            prompt,
            return_tensors="pt",
            add_special_tokens=add_special_tokens,
        ).to(self.model.device)
        with self.torch.no_grad():
            out = self.model.generate(
                **inputs,
                max_new_tokens=self.max_new_tokens,
                do_sample=False,
                pad_token_id=self.tokenizer.eos_token_id,
            )
        # generate() returns prompt + continuation; keep only the continuation.
        prompt_len = inputs["input_ids"].shape[1]
        return self.tokenizer.decode(out[0][prompt_len:], skip_special_tokens=True).strip()
|
|
|
|
class OpenAIPolicy:
    """Policy backed by an OpenAI-compatible chat-completions endpoint.

    Client settings come from ``openai_client_kwargs_master()`` after ``.env``
    has been loaded. The model id is ``model`` if given, else the
    ``CORP_MASTER_MODEL`` or ``MODEL_NAME`` environment variable. Decoding
    uses temperature 0.
    """

    def __init__(self, model: str, max_new_tokens: int) -> None:
        """Build the client and resolve the model id.

        Raises:
            SystemExit: if no API key is configured or no model id can be resolved.
        """
        load_dotenv()
        client_kwargs = openai_client_kwargs_master()
        if client_kwargs.get("api_key"):
            self.client = OpenAI(**client_kwargs)
        else:
            raise SystemExit("Set CORP_MASTER_API_KEY, HF_TOKEN, or OPENAI_API_KEY for --policy openai.")

        resolved_model = model or os.getenv("CORP_MASTER_MODEL") or os.getenv("MODEL_NAME")
        self.model = resolved_model
        if not resolved_model:
            raise SystemExit("Pass --model or set CORP_MASTER_MODEL/MODEL_NAME.")
        self.max_new_tokens = max_new_tokens

    def complete(self, messages: List[Dict[str, str]]) -> str:
        """Send ``messages`` and return the first choice's text, stripped ('' if empty)."""
        response = self.client.chat.completions.create(
            model=self.model,
            messages=messages,
            max_tokens=self.max_new_tokens,
            temperature=0.0,
        )
        text = response.choices[0].message.content
        return (text or "").strip()
|
|
|
|
def run_scripted_episode(task_id: str, actions: List[Dict[str, Any]]) -> Dict[str, Any]:
    """Replay a fixed list of action dicts against a fresh environment and score it.

    Each dict is validated into a ``CorpAction`` (validation errors propagate)
    and stepped in order. Replay stops early once the environment reports
    ``done``. Per-step rewards (``None`` treated as 0.0) and any environment
    error strings are collected and passed to ``episode_record``. That function
    runs the task verifier, so it is not called here.
    """
    env = CorpEnvironment()
    obs = env.reset(task_id=task_id)
    rewards: List[float] = []
    errors: List[str] = []
    for action_obj in actions:
        obs = env.step(CorpAction.model_validate(action_obj))
        rewards.append(float(obs.reward or 0.0))
        if obs.error:
            errors.append(obs.error)
        if obs.done:
            break
    return episode_record(task_id, "scripted", env, rewards, errors, obs.swd)
|
|
|
|
def run_model_episode(
    *,
    task_id: str,
    policy: Any,
    max_steps: int,
) -> Dict[str, Any]:
    """Roll out ``policy`` on ``task_id`` for at most ``max_steps`` turns.

    ``policy`` must provide ``complete(messages) -> str``. Each turn, the
    current observation is appended as a user message and the policy's reply
    as an assistant message. The reply is then parsed into a ``CorpAction``.
    Replies that fail to parse or validate are counted as invalid actions,
    logged as ``invalid_action: ...`` in ``errors``, and replaced by a
    ``query_swd`` on ``$.phase`` so the episode still advances.

    ``env_error_count`` in the returned record counts only errors reported by
    the environment, matching scripted episodes. Invalid replies are reported
    separately through ``invalid_action_count``.
    """
    env = CorpEnvironment()
    obs = env.reset(task_id=task_id)
    messages: List[Dict[str, str]] = [
        {"role": "system", "content": build_system_prompt(obs.master_tier, obs.role)}
    ]
    rewards: List[float] = []
    errors: List[str] = []
    invalid_actions = 0
    env_errors = 0

    for step in range(max_steps):
        messages.append({"role": "user", "content": observation_message(step, obs)})
        raw = policy.complete(messages)
        messages.append({"role": "assistant", "content": raw})
        try:
            action_obj = normalize_action_obj(extract_json_object(raw))
            action = CorpAction.model_validate(action_obj)
        except Exception as exc:  # any extraction/validation failure counts as invalid
            invalid_actions += 1
            action = CorpAction(action_type="query_swd", payload="$.phase")
            errors.append(f"invalid_action: {exc}")
        obs = env.step(action)
        rewards.append(float(obs.reward or 0.0))
        if obs.error:
            env_errors += 1
            errors.append(obs.error)
        if obs.done:
            break

    rec = episode_record(task_id, "model", env, rewards, errors, obs.swd)
    rec["invalid_action_count"] += invalid_actions
    # episode_record counts every entry in `errors`; exclude the invalid-action
    # entries so they are not double-counted as environment errors.
    rec["env_error_count"] = env_errors
    return rec
|
|
|
|
def episode_record(
    task_id: str,
    policy_kind: str,
    env: CorpEnvironment,
    rewards: List[float],
    errors: List[str],
    swd: Dict[str, Any],
) -> Dict[str, Any]:
    """Build the per-episode result row written to the eval JSONL.

    ``env.task.verifier(swd)`` is called once, and each truthy check counts as
    passed. Milestones are read from ``swd["milestones"]`` (missing or falsy
    means none) and tallied by their ``status`` field. ``success`` requires a
    verifier pass rate of at least 0.99 and no missed milestones.
    ``invalid_action_count`` is always 0 here; callers that track invalid
    actions add to it.
    """
    checks = env.task.verifier(swd)
    passed = [name for name, ok in checks.items() if ok]
    failed = [name for name, ok in checks.items() if not ok]
    pass_rate = len(passed) / max(len(checks), 1)

    milestones = swd.get("milestones", []) or []
    n_complete = sum(1 for m in milestones if m.get("status") == "complete")
    n_missed = sum(1 for m in milestones if m.get("status") == "missed")

    final_reward = rewards[-1] if rewards else 0.0

    return {
        "task_id": task_id,
        "policy_kind": policy_kind,
        "steps": env.turn,
        "total_reward": round(sum(rewards), 6),
        "terminal_reward": round(final_reward, 6),
        "reward_trace": rewards,
        "verifier_pass_rate": round(pass_rate, 6),
        "passed_checks": passed,
        "failed_checks": failed,
        "milestones_total": len(milestones),
        "milestones_complete": n_complete,
        "milestones_missed": n_missed,
        "invalid_action_count": 0,
        "env_error_count": len(errors),
        "errors": errors,
        "final_swd_version": int(swd.get("swd_version", 0)),
        "success": bool(pass_rate >= 0.99 and n_missed == 0),
    }
|
|
|
|
def summarize(rows: List[Dict[str, Any]], label: str) -> None:
    """Print a per-task summary of ``rows`` to stdout.

    A header line gives the label and episode count. Each task then gets one
    line, in first-seen order, with the mean ``terminal_reward``, the mean
    ``verifier_pass_rate`` and the fraction of successful episodes, all to
    three decimals.
    """
    task_ids = list(dict.fromkeys(row["task_id"] for row in rows))
    print(f"\n[{label}] {len(rows)} episodes")
    for task_id in task_ids:
        group = [row for row in rows if row["task_id"] == task_id]
        count = len(group)
        mean_reward = sum(row["terminal_reward"] for row in group) / count
        mean_pass = sum(row["verifier_pass_rate"] for row in group) / count
        success_rate = sum(1 for row in group if row["success"]) / count
        line = (
            f"- {task_id}: terminal_reward={mean_reward:.3f} "
            f"pass_rate={mean_pass:.3f} success={success_rate:.3f}"
        )
        print(line)
|
|
|
|
def slugify(value: str, fallback: str = "unknown") -> str:
    """Turn a model id, path or label into a filesystem-safe name component.

    Only the last path segment is kept. Backslashes are treated as ``/`` and
    trailing separators are ignored. Runs of characters outside
    ``[A-Za-z0-9._-]`` become a single ``-``, and leading/trailing ``-``, ``.``
    and ``_`` are trimmed. ``fallback`` is returned when nothing usable
    remains, including for ``None`` or blank input.
    """
    text = (value or "").strip()
    if not text:
        return fallback
    tail = text.replace("\\", "/").rstrip("/").rsplit("/", 1)[-1]
    cleaned = re.sub(r"[^A-Za-z0-9._-]+", "-", tail).strip("-._")
    return cleaned if cleaned else fallback
|
|
|
|
def default_output_path(args: argparse.Namespace) -> Path:
    """Compute the auto-organized JSONL path for a run.

    The layout is ``<results_root>/runs/<run_slug>/<stage>_eval.jsonl``.
    ``run_slug`` joins the model slug, the adapter slug (only when
    ``--adapter`` is set) and the stage slug with ``__``. The stage is
    ``--label``, falling back to ``--policy``.
    """
    stage = slugify(args.label or args.policy, "eval")
    slug_parts = [slugify(args.model, args.policy)]
    if args.adapter:
        slug_parts.append(slugify(args.adapter, "no-adapter"))
    slug_parts.append(stage)
    return Path(args.results_root).joinpath("runs", "__".join(slug_parts), f"{stage}_eval.jsonl")
|
|
|
|
def write_run_metadata(path: Path, args: argparse.Namespace, rows: List[Dict[str, Any]]) -> None:
    """Write a ``metadata.json`` run summary next to the eval JSONL at ``path``.

    The summary records the CLI configuration and, per task (in first-seen
    order), the mean terminal reward, the mean verifier pass rate and the
    success rate, each rounded to 6 decimals. The target directory is created
    if needed, so this does not depend on the JSONL having been written first.
    """
    by_task: Dict[str, List[Dict[str, Any]]] = {}
    for row in rows:
        by_task.setdefault(row["task_id"], []).append(row)

    metrics_by_task: Dict[str, Dict[str, float]] = {}
    for task_id, task_rows in by_task.items():
        n = len(task_rows)  # >= 1: every group holds at least one row
        metrics_by_task[task_id] = {
            "avg_terminal_reward": round(sum(r["terminal_reward"] for r in task_rows) / n, 6),
            "avg_verifier_pass_rate": round(sum(r["verifier_pass_rate"] for r in task_rows) / n, 6),
            "success_rate": round(sum(1 for r in task_rows if r["success"]) / n, 6),
        }

    summary = {
        "model_stage": args.label or args.policy,
        "policy": args.policy,
        "model": args.model,
        "adapter": args.adapter,
        "tasks": [t.strip() for t in args.tasks.split(",") if t.strip()],
        "episodes": args.episodes,
        "max_steps": args.max_steps,
        "max_new_tokens": args.max_new_tokens,
        "eval_file": str(path),
        "metrics_by_task": metrics_by_task,
    }
    metadata_path = path.with_name("metadata.json")
    metadata_path.parent.mkdir(parents=True, exist_ok=True)
    metadata_path.write_text(json.dumps(summary, indent=2, ensure_ascii=False), encoding="utf-8")
|
|
|
|
def main() -> None:
    """CLI entry point: evaluate the selected policy and write the results.

    For every episode index and every task in ``--tasks``, one result row is
    produced and tagged with run metadata. All rows are written as JSONL
    (``--output`` or the auto-organized path), a ``metadata.json`` summary is
    written next to it, and a per-task summary is printed.
    """
    parser = argparse.ArgumentParser(description="Evaluate CORP-ENV model stages.")
    parser.add_argument("--policy", choices=["scripted_weak", "oracle", "openai", "hf"], default="scripted_weak")
    parser.add_argument("--label", default="", help="Model stage label written to each row.")
    parser.add_argument("--model", default="", help="OpenAI/HF model id or local model path.")
    parser.add_argument("--adapter", default="", help="Optional PEFT adapter path for --policy hf.")
    parser.add_argument("--tasks", default=",".join(DEFAULT_TASKS))
    parser.add_argument("--episodes", type=int, default=1)
    parser.add_argument("--max-steps", type=int, default=30)
    parser.add_argument("--max-new-tokens", type=int, default=1536)
    parser.add_argument("--results-root", default="results", help="Root for auto-organized eval output.")
    parser.add_argument(
        "--output",
        default="",
        help="Explicit JSONL path. If omitted, writes under results/runs/<model-adapter-label>/.",
    )
    args = parser.parse_args()

    # Default to stub workers and no LLM judge unless already set in the
    # process environment.
    # NOTE(review): .env is only loaded later (inside OpenAIPolicy), so values
    # for these two variables in .env are shadowed by these defaults — confirm
    # that this is intended.
    for env_var in ("CORP_STUB_WORKERS", "CORP_DISABLE_LLM_JUDGE"):
        os.environ.setdefault(env_var, "1")

    task_ids = [name.strip() for name in args.tasks.split(",") if name.strip()]
    stage = args.label or args.policy

    # Only model-backed policies need an object. Scripted ones replay fixed actions.
    policy: Any = None
    if args.policy == "openai":
        policy = OpenAIPolicy(args.model, args.max_new_tokens)
    elif args.policy == "hf":
        if not args.model:
            raise SystemExit("--policy hf requires --model")
        policy = HFPolicy(args.model, args.adapter or None, args.max_new_tokens)

    rows: List[Dict[str, Any]] = []
    for ep in range(args.episodes):
        for task_id in task_ids:
            if args.policy == "scripted_weak":
                row = run_scripted_episode(task_id, weak_actions(task_id))
            elif args.policy == "oracle":
                row = run_scripted_episode(task_id, oracle_actions(task_id, ep))
            else:
                # The acquisition-defence task gets twice the step budget.
                step_budget = args.max_steps * (2 if task_id == "h1_acquisition_defence" else 1)
                row = run_model_episode(task_id=task_id, policy=policy, max_steps=step_budget)
            row.update(
                episode_index=ep,
                model_stage=stage,
                policy=args.policy,
                model=args.model,
                adapter=args.adapter,
            )
            rows.append(row)

    out_path = Path(args.output) if args.output else default_output_path(args)
    write_jsonl(out_path, rows)
    write_run_metadata(out_path, args, rows)
    summarize(rows, stage)
    print(f"\nWrote {out_path}")
    print(f"Wrote {out_path.with_name('metadata.json')}")


if __name__ == "__main__":
    main()
|
|