ctx / src /ctx_monitor.py
Stevesolun's picture
Sync ctx 333622a
547daee verified
# mypy: disable-error-code=attr-defined
"""ctx_monitor.py -- Local HTTP dashboard for ctx runtime and catalog activity.
``ctx-monitor serve [--port 8765]`` starts a zero-dependency threaded HTTP server
(stdlib http.server) that renders the audit log + skill-events.jsonl +
sidecars into a browser UI at http://localhost:8765/.
Routes:
/ Home — summary stats + session list + links
/loaded Live manifest view + load/unload actions
/sessions List of sessions (skills/agents/MCP activity)
/session/<id> Skills + agents seen in that session
/skills Sidecar card grid with grade + score filters
/skill/<slug> Sidecar breakdown + timeline of audit events
/wiki Wiki entity index — all pages with search
/wiki/<slug>?type=<entity> One wiki entity page (frontmatter + body)
/graph Built-in graph explorer + popular seeds
/graph?slug=<slug>&type=... Focus graph view on a specific entity
/manage Search/edit/delete/import catalog entities
/harness Manual harness setup for user-owned LLMs
/docs Local docs index + public docs handoff
/config Editable ctx config with defaults fallback
/status Durable queue + graph/wiki artifact state
/kpi Grade / lifecycle / category KPIs
/runtime Generic harness validation/escalation ledger
/logs Filterable tail of ctx-audit.jsonl
/events Live SSE stream of new audit-log lines
/api/sessions.json JSON index for scripting
/api/manifest.json Raw ~/.claude/skill-manifest.json
/api/status.json Queue counts + artifact promotion metadata
/api/runtime.json Generic harness validation/escalation summary
/api/skill/<slug>.json Sidecar passthrough
/api/graph/<slug>.json Dashboard-shaped neighborhood; accepts type
/api/entities/search.json Search wiki entities across supported types
/api/entity/<slug>.json Wiki entity frontmatter + Markdown body
/api/config.json Effective/default/user config
/api/kpi.json DashboardSummary passthrough
Design notes:
- No Flask / Starlette / FastAPI dependency. Request handling is threaded
so one open SSE client cannot monopolize the local dashboard. Repo-doc
rendering uses the package's Markdown dependencies for MkDocs-like output.
- GET views read append-only files. POST mutation endpoints require
loopback access, a per-process token, and same-origin headers.
- SSE endpoint tails ``~/.claude/ctx-audit.jsonl`` and pushes each new
line as a server-sent event. Clients auto-reconnect.
- Security: binds to 127.0.0.1 by default. ``--host`` override requires
an explicit flag to emphasize the local-dev-only intent.
This is a minimal dashboard. Power users should pipe the audit log
into Grafana / Loki / whatever; ``ctx-monitor`` is the zero-config
starting point.
"""
from __future__ import annotations
import argparse
import hashlib
import html
import ipaddress
import json
import math
import os
import re
import secrets
import sqlite3
import socket
import sys
import tarfile
import threading
import time
import zlib
from collections import defaultdict, deque
from http.cookies import CookieError, SimpleCookie
from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer
from importlib.resources import files
from pathlib import Path
from typing import Any
from urllib.parse import quote, unquote, urlsplit
from ctx import dashboard_docs, dashboard_entities, dashboard_graph
from ctx.core import entity_types as core_entity_types
from ctx.core.wiki import wiki_queue
from ctx.core.wiki.wiki_utils import parse_frontmatter_and_body
from ctx.utils._file_lock import file_lock
from ctx.utils._fs_utils import atomic_write_text as _atomic_write_text
from ctx.utils._fs_utils import safe_atomic_write_text as _safe_atomic_write_text
from ctx.utils._safe_name import is_safe_source_name
# Per-process dashboard token; empty at import time.
# NOTE(review): presumably populated when the server starts -- confirm where it is assigned.
_MONITOR_TOKEN = ""
# NOTE(review): presumably gates the POST mutation endpoints -- confirm at the use site.
_MONITOR_MUTATIONS_ENABLED = True
# Module-level caches, each stored as a key (or signature) plus the value
# computed for it. The graph pair is maintained by _load_dashboard_graph();
# the consumers of the remaining caches are outside this part of the file.
_GRAPH_CACHE_KEY: tuple[Any, ...] | None = None
_GRAPH_CACHE_VALUE: Any | None = None
_PACKAGED_GRAPH_EXPORT_ID_CACHE: str | None | bool = None
_OVERLAY_INDEX_COVERAGE_CACHE_KEY: tuple[Any, ...] | None = None
_OVERLAY_INDEX_COVERAGE_CACHE_VALUE: bool | None = None
_SIDECAR_INDEX_CACHE_KEY: tuple[tuple[Path, float, int], ...] | None = None
_SIDECAR_INDEX_CACHE_VALUE: dict[tuple[str, str], dict] | None = None
_SIDECAR_FILTER_CACHE_SIGNATURE: tuple[Any, ...] | None = None
_SIDECAR_FILTER_CACHE_VALUE: dict[tuple[Any, ...], list[dict[str, Any]]] = {}
_KPI_SUMMARY_CACHE_KEY: tuple[Any, ...] | None = None
_KPI_SUMMARY_CACHE_VALUE: Any | None = None
_KPI_SUMMARY_CACHE_AT = 0.0
_WIKI_RENDER_CACHE_KEY: tuple[Any, ...] | None = None
_WIKI_RENDER_CACHE_VALUE: str | None = None
# Paging limits and cache lifetime.
# NOTE(review): units/semantics inferred from the names -- confirm at the use sites.
_WIKI_INDEX_LIMIT_PER_TYPE = 500
_SKILLS_PAGE_DEFAULT_LIMIT = 100
_SKILLS_PAGE_MAX_LIMIT = 500
_KPI_SUMMARY_CACHE_SECONDS = 30
# Captures the (possibly comma-grouped) node and edge counts from a
# "Nodes: <n> | Edges: <m>" line.
_GRAPH_REPORT_RE = re.compile(r"Nodes:\s*([\d,]+)\s*\|\s*Edges:\s*([\d,]+)")
# 64 KiB.
_MAX_POST_BODY_BYTES = 64 * 1024
_DASHBOARD_INDEX_MEMBER = "graphify-out/dashboard-neighborhoods.sqlite3"
# Cookie name looked up by _read_token_cookie().
_READ_TOKEN_COOKIE = "ctx_monitor_read_token"
# ─── Data sources ────────────────────────────────────────────────────────────
def _host_allows_mutations(host: str) -> bool:
normalized = (host or "").strip().strip("[]").rstrip(".").lower()
if normalized == "localhost":
return True
try:
return ipaddress.ip_address(normalized).is_loopback
except ValueError:
return False
def _request_host_name(host_header: str) -> str:
value = (host_header or "").strip()
if not value:
return ""
if value.startswith("["):
end = value.find("]")
return value[1:end].rstrip(".").lower() if end != -1 else ""
return value.rsplit(":", 1)[0].rstrip(".").lower()
def _origin_host_name(origin: str) -> str:
try:
parsed = urlsplit(origin)
except ValueError:
return ""
if parsed.scheme not in {"http", "https"}:
return ""
return (parsed.hostname or "").rstrip(".").lower()
def _read_token_cookie(cookie_header: str) -> str:
    """Return the ``_READ_TOKEN_COOKIE`` value from a ``Cookie`` header.

    An empty header, a header that cannot be parsed, or a header without
    that cookie all yield ``""``.
    """
    if not cookie_header:
        return ""
    jar = SimpleCookie()
    try:
        jar.load(cookie_header)
    except CookieError:
        return ""
    token = jar.get(_READ_TOKEN_COOKIE)
    if token is None:
        return ""
    return token.value
def _claude_dir() -> Path:
return Path(os.path.expanduser("~/.claude"))
def _audit_log_path() -> Path:
    """Return the path of ``ctx-audit.jsonl`` inside the Claude directory."""
    # The file name is spelled out here rather than imported from
    # ctx_audit_log so the monitor still runs if that module is absent.
    return _claude_dir().joinpath("ctx-audit.jsonl")
def _events_jsonl_path() -> Path:
    """Return the path of ``skill-events.jsonl`` inside the Claude directory."""
    return _claude_dir().joinpath("skill-events.jsonl")
def _runtime_lifecycle_path() -> Path:
    """Return ``events_path`` of a default-constructed ``RuntimeLifecycleStore``."""
    # The import happens at call time, not at module import time.
    from ctx.adapters.generic.runtime_lifecycle import RuntimeLifecycleStore

    store = RuntimeLifecycleStore()
    return store.events_path
def _manifest_path() -> Path:
    """Return the path of ``skill-manifest.json`` inside the Claude directory."""
    return _claude_dir().joinpath("skill-manifest.json")
def _sidecar_dir() -> Path:
    """Return the ``skill-quality`` directory inside the Claude directory."""
    return _claude_dir().joinpath("skill-quality")
def _wiki_dir() -> Path:
    """Return the ``skill-wiki`` directory inside the Claude directory."""
    return _claude_dir().joinpath("skill-wiki")
def _user_config_path() -> Path:
    """Return the path of ``skill-system-config.json`` inside the Claude directory."""
    return _claude_dir().joinpath("skill-system-config.json")
def _load_dashboard_graph() -> Any:
    """Return the wiki graph, reloading it only when its backing files change.

    The cache key combines the resolved ``graph.json`` path, its mtime and
    size, the identity of the loader function, and the mtime/size of
    ``entity-overlays.jsonl`` when that file exists. When ``graph.json`` is
    missing the cache is cleared and the loader is called directly.
    """
    global _GRAPH_CACHE_KEY, _GRAPH_CACHE_VALUE
    graph_file = _wiki_dir() / "graphify-out" / "graph.json"
    overlay_file = graph_file.with_name("entity-overlays.jsonl")
    from ctx.core.graph.resolve_graph import load_graph as _lg  # type: ignore

    if not graph_file.exists():
        _GRAPH_CACHE_KEY = None
        _GRAPH_CACHE_VALUE = None
        return _lg(graph_file)

    graph_stat = graph_file.stat()
    if overlay_file.exists():
        overlay_stat = overlay_file.stat()
        overlay_signature = (overlay_stat.st_mtime, overlay_stat.st_size)
    else:
        overlay_signature = None
    signature = (
        graph_file.resolve(),
        graph_stat.st_mtime,
        graph_stat.st_size,
        id(_lg),
        overlay_signature,
    )

    cached = _GRAPH_CACHE_VALUE
    if cached is not None and _GRAPH_CACHE_KEY == signature:
        return cached

    try:
        loaded = _lg(graph_file, apply_runtime_filter=False)
    except TypeError:
        # Retry without the keyword.
        # NOTE(review): presumably for loaders that do not accept
        # ``apply_runtime_filter``; a TypeError raised inside the loader
        # would trigger the same retry -- confirm this is intended.
        loaded = _lg(graph_file)
    _GRAPH_CACHE_KEY = signature
    _GRAPH_CACHE_VALUE = loaded
    return loaded
def _mcp_shard(slug: str) -> str:
    """Return the shard for ``slug`` as computed by ``core_entity_types.mcp_shard``."""
    shard = core_entity_types.mcp_shard(slug)
    return shard
# Entity source specs from core_entity_types. Code in this module unpacks each
# entry as (directory name under ``entities/``, entity type, recursive flag).
_DASHBOARD_ENTITY_SOURCES: tuple[tuple[str, str, bool], ...] = core_entity_types.entity_source_specs()
# The entity types alone, in the same order as the specs above.
_DASHBOARD_ENTITY_TYPES: tuple[str, ...] = tuple(
    entity_type for _, entity_type, _ in _DASHBOARD_ENTITY_SOURCES
)
# NOTE(review): presumably the slug the graph view focuses on when none is
# requested -- confirm at the use site.
_DEFAULT_GRAPH_FOCUS_SLUG = "github"
def _normalize_dashboard_entity_type(raw: object) -> str | None:
    """Normalize ``raw`` against the dashboard's supported entity types.

    Delegates to ``core_entity_types.normalize_entity_type`` restricted to
    ``_DASHBOARD_ENTITY_TYPES``. Callers in this module treat a ``None``
    result as "unsupported".
    """
    allowed_types = _DASHBOARD_ENTITY_TYPES
    return core_entity_types.normalize_entity_type(raw, allowed=allowed_types)
def _audit_entity_type(row: dict) -> str | None:
    """Work out the entity type an audit-log row refers to.

    Candidates are tried in order: ``meta.entity_type``, then the row's
    ``entity_type``, ``subject_type`` and ``type`` fields. If none of them
    normalizes, the part of ``event`` before the first ``.`` is tried.
    """
    meta: dict[str, Any] = {}
    if isinstance(row.get("meta"), dict):
        meta = row["meta"]
    candidates = (
        meta.get("entity_type"),
        row.get("entity_type"),
        row.get("subject_type"),
        row.get("type"),
    )
    resolved = next(
        (
            entity_type
            for entity_type in map(_normalize_dashboard_entity_type, candidates)
            if entity_type
        ),
        None,
    )
    if resolved:
        return resolved
    event_name = str(row.get("event") or "")
    return _normalize_dashboard_entity_type(event_name.partition(".")[0])
def _wiki_entity_path(slug: str, entity_type: str | None = None) -> Path | None:
    """Find the existing wiki page for ``slug``.

    Page locations come from ``core_entity_types.entity_page_path``. The
    entity sources are scanned in order and the first page that exists on
    disk wins; pass ``entity_type`` to restrict the scan to one type when
    the same slug exists under several. Returns ``None`` for an unsafe slug
    or when no page exists.
    """
    # Reject unsafe slugs up front so a crafted request cannot leave the wiki tree.
    if not _is_safe_slug(slug):
        return None
    for _sub, kind, _recursive in _DASHBOARD_ENTITY_SOURCES:
        if entity_type is None or entity_type == kind:
            candidate = core_entity_types.entity_page_path(_wiki_dir(), kind, slug)
            if candidate is not None and candidate.exists():
                return candidate
    return None
def _wiki_entity_target_path(slug: str, entity_type: str) -> Path:
    """Return the canonical page path for creating or updating an entity.

    Raises:
        ValueError: if the slug is unsafe, or if the entity type does not
            normalize or has no page path.
    """
    if not _is_safe_slug(slug):
        raise ValueError(f"invalid slug: {slug!r}")
    kind = _normalize_dashboard_entity_type(entity_type)
    target = (
        None
        if kind is None
        else core_entity_types.entity_page_path(_wiki_dir(), kind, slug)
    )
    if target is None:
        raise ValueError(f"unsupported entity_type: {entity_type!r}")
    return target
def _iter_wiki_entity_paths(
    entity_type: str | None = None,
) -> list[tuple[str, str, Path]]:
    """List ``(slug, entity type, path)`` for every wiki entity page.

    Only ``*.md`` files whose stem is a safe slug are included. Pass
    ``entity_type`` to restrict the listing to one type. The result is
    sorted by type, then case-insensitive slug, then POSIX path.

    Raises:
        ValueError: if ``entity_type`` is given but is not a supported type.
    """
    wanted = _normalize_dashboard_entity_type(entity_type) if entity_type else None
    if entity_type is not None and wanted is None:
        raise ValueError(f"unsupported entity_type: {entity_type!r}")
    entities_root = _wiki_dir() / "entities"
    if not entities_root.is_dir():
        return []
    found: list[tuple[str, str, Path]] = []
    for sub, kind, recursive in _DASHBOARD_ENTITY_SOURCES:
        if wanted is not None and wanted != kind:
            continue
        type_root = entities_root / sub
        if not type_root.is_dir():
            continue
        pages = type_root.rglob("*.md") if recursive else type_root.glob("*.md")
        found.extend(
            (page.stem, kind, page) for page in pages if _is_safe_slug(page.stem)
        )
    found.sort(key=lambda entry: (entry[1], entry[0].lower(), entry[2].as_posix()))
    return found
def _wiki_entity_detail(slug: str, entity_type: str | None = None) -> dict[str, Any] | None:
    """Load one wiki entity page as a dict, or ``None`` when no page exists.

    The result has the keys ``slug``, ``type``, ``path``, ``frontmatter``
    and ``body``. ``type`` is the requested type when one was given,
    otherwise the frontmatter ``type`` if it normalizes, otherwise
    ``"skill"``.

    Raises:
        ValueError: if ``entity_type`` is given but is not a supported type.
    """
    requested = _normalize_dashboard_entity_type(entity_type) if entity_type else None
    if entity_type is not None and requested is None:
        raise ValueError(f"unsupported entity_type: {entity_type!r}")
    page = _wiki_entity_path(slug, entity_type=requested)
    if page is None:
        return None
    raw_text = page.read_text(encoding="utf-8", errors="replace")
    frontmatter, body = _parse_frontmatter(raw_text)
    resolved_type = (
        requested
        or _normalize_dashboard_entity_type(frontmatter.get("type"))
        or "skill"
    )
    detail: dict[str, Any] = {
        "slug": slug,
        "type": resolved_type,
        "path": str(page),
        "frontmatter": frontmatter,
        "body": body,
    }
    return detail
def _search_wiki_entities(
    query: str = "",
    entity_type: str | None = None,
    *,
    limit: int = 80,
) -> list[dict[str, Any]]:
    """Search wiki entities via ``dashboard_entities.search_wiki_entities``.

    This wrapper only supplies the module's CRUD dependency bundle.
    """
    crud_deps = _entity_crud_deps()
    return dashboard_entities.search_wiki_entities(
        query, entity_type, limit=limit, deps=crud_deps
    )
def _queue_entity_refresh(
    *,
    entity_type: str,
    slug: str,
    entity_path: Path,
    content: str,
    action: str,
) -> None:
    """Enqueue wiki-queue work after an entity was changed from the dashboard.

    An entity-upsert job is always enqueued. A graph-export maintenance job
    follows for every action except ``"delete"``.
    """
    wiki_root = _wiki_dir()
    wiki_queue.enqueue_entity_upsert(
        wiki_root,
        entity_type=entity_type,
        slug=slug,
        entity_path=entity_path,
        content=content,
        action=action,
        source="ctx-monitor",
    )
    if action != "delete":
        export_payload = {
            "reason": f"entity-{action}",
            "entity_type": entity_type,
            "slug": slug,
        }
        wiki_queue.enqueue_maintenance_job(
            wiki_root,
            kind=wiki_queue.GRAPH_EXPORT_JOB,
            payload=export_payload,
            source="ctx-monitor",
        )
def _write_entity_text(path: Path, content: str) -> None:
    """Write ``content`` to ``path`` as UTF-8 via ``_safe_atomic_write_text``."""
    _safe_atomic_write_text(path, content, encoding="utf-8")
    return None
def _entity_crud_deps() -> dashboard_entities.EntityCrudDeps:
    """Bundle this module's helpers into an ``EntityCrudDeps`` for ``dashboard_entities``.

    Three entries are thin adapters instead of direct references: the
    refresh hook forwards its arguments as keywords, the tag parser is
    called without a limit, and the label helper is called with the value
    only.
    """

    def queue_refresh(entity_type, slug, entity_path, content, action):
        return _queue_entity_refresh(
            entity_type=entity_type,
            slug=slug,
            entity_path=entity_path,
            content=content,
            action=action,
        )

    def unlimited_tags(value):
        return _frontmatter_tags(value, limit=None)

    def label_only(value):
        return _display_label(value)

    return dashboard_entities.EntityCrudDeps(
        is_safe_slug=_is_safe_slug,
        normalize_entity_type=_normalize_dashboard_entity_type,
        wiki_entity_detail=_wiki_entity_detail,
        wiki_entity_target_path=_wiki_entity_target_path,
        wiki_entity_path=_wiki_entity_path,
        iter_wiki_entity_paths=_iter_wiki_entity_paths,
        read_manifest=_read_manifest,
        perform_unload=_perform_unload,
        queue_entity_refresh=queue_refresh,
        file_lock=file_lock,
        write_entity_text=_write_entity_text,
        parse_frontmatter=_parse_frontmatter,
        frontmatter_tags=unlimited_tags,
        frontmatter_text=_frontmatter_text,
        display_slug=_display_slug,
        display_label=label_only,
        entity_wiki_href=_entity_wiki_href,
    )
def _entity_runtime_deps() -> dashboard_entities.EntityRuntimeDeps:
    """Bundle this module's helpers into an ``EntityRuntimeDeps`` for ``dashboard_entities``."""
    runtime_deps = dashboard_entities.EntityRuntimeDeps(
        wiki_dir=_wiki_dir,
        claude_dir=_claude_dir,
        is_safe_slug=_is_safe_slug,
        normalize_entity_type=_normalize_dashboard_entity_type,
        log_dashboard_entity_event=_log_dashboard_entity_event,
        remove_loaded_manifest_entry=_remove_loaded_manifest_entry,
    )
    return runtime_deps
def _upsert_wiki_entity(payload: dict[str, Any]) -> tuple[bool, str]:
    """Forward ``payload`` to ``dashboard_entities.upsert_wiki_entity``."""
    crud_deps = _entity_crud_deps()
    return dashboard_entities.upsert_wiki_entity(payload, deps=crud_deps)
def _delete_wiki_entity(slug: str, entity_type: str) -> tuple[bool, str]:
    """Forward the request to ``dashboard_entities.delete_wiki_entity``."""
    crud_deps = _entity_crud_deps()
    return dashboard_entities.delete_wiki_entity(slug, entity_type, deps=crud_deps)
def _parse_frontmatter(text: str) -> tuple[dict[str, Any], str]:
    """Return ``(frontmatter, body)`` for ``text`` using the canonical wiki parser."""
    parsed = parse_frontmatter_and_body(text)
    return parsed
def _frontmatter_text(value: Any) -> str:
if isinstance(value, list):
return ", ".join(str(v) for v in value)
if isinstance(value, dict):
return json.dumps(value, ensure_ascii=False, default=str)
if value is None:
return ""
return str(value)
def _truncate_text(value: str, limit: int) -> tuple[str, bool]:
if limit <= 0 or len(value) <= limit:
return value, False
if limit <= 3:
return value[:limit], True
return value[: limit - 3].rstrip() + "...", True
def _json_for_script(value: Any) -> str:
return json.dumps(value, ensure_ascii=False, default=str).replace("</", "<\\/")
def _frontmatter_tags(value: Any, *, limit: int | None = 6) -> list[str]:
if isinstance(value, list):
raw_items = value
else:
raw = _frontmatter_text(value)
raw_items = raw.replace("[", "").replace("]", "").split(",")
out: list[str] = []
for item in raw_items:
tok = str(item).strip().strip("'\"")
if tok:
out.append(tok)
if limit is not None and len(out) >= limit:
break
return out
# Inline tokens recognised by the wiki renderer: a `code` span, an
# Obsidian-style [[wikilink]], or a Markdown [label](target "title") link.
# The negative lookbehind skips links preceded by "!" (image syntax).
_WIKI_INLINE_RE = re.compile(
    r"(`[^`\n]+`|\[\[[^\]\n]+\]\]|(?<!!)\[[^\]\n]+\]\([^\s()\n]+(?:\s+\"[^\"]*\")?\))",
)
# An embedded quality section delimited by <!-- quality:begin --> and
# <!-- quality:end --> comments; group 1 is the text between the markers.
_WIKI_QUALITY_BLOCK_RE = re.compile(
    r"<!--\s*quality:begin\s*-->\s*(.*?)\s*<!--\s*quality:end\s*-->",
    re.IGNORECASE | re.DOTALL,
)
def _wiki_link_href(target: str) -> tuple[str, str]:
    """Map an Obsidian-style wikilink target to ``(dashboard href, label)``.

    Backslashes are treated as path separators and a trailing ``.md`` is
    dropped. The last path segment is the slug. A target shaped like
    ``entities/<folder>/.../<slug>`` also contributes a ``?type=`` query
    when the folder is a known entity folder. Unsafe slugs resolve to
    ``"#"``.
    """
    folder_types = {
        "skills": "skill",
        "agents": "agent",
        "mcp-servers": "mcp-server",
        "harnesses": "harness",
    }
    cleaned = target.strip().replace("\\", "/").removesuffix(".md")
    segments = [segment for segment in cleaned.split("/") if segment]
    kind = ""
    if len(segments) >= 3 and segments[0] == "entities":
        kind = folder_types.get(segments[1], "")
    slug = segments[-1] if segments else cleaned
    if not _is_safe_slug(slug):
        return "#", slug or target
    query = f"?type={quote(kind)}" if kind else ""
    return f"/wiki/{quote(slug)}{query}", _display_slug(slug)
def _markdown_link_href(target: str) -> str | None:
"""Return a safe href for normal Markdown links, or None to suppress it."""
cleaned = target.strip()
if not cleaned:
return None
if cleaned.startswith("//"):
return None
if cleaned.startswith(("/", "#")):
return cleaned
if re.match(r"^https?://", cleaned, re.IGNORECASE):
return cleaned
if re.match(r"^mailto:[^@\s]+@[^@\s]+$", cleaned, re.IGNORECASE):
return cleaned
return None
def _render_wiki_inline(text: str) -> str:
    """Render one line of wiki text to HTML using a small, escaped inline subset.

    Code spans, ``[[wikilinks]]`` and Markdown links are converted. All
    other text, and any link whose target is rejected, is HTML-escaped.
    """

    def render_token(token: str) -> str:
        # Inline code span.
        if token.startswith("`"):
            return f"<code>{html.escape(token[1:-1])}</code>"
        # [[target|label]] wikilink.
        if token.startswith("[["):
            target, _, label = token[2:-2].partition("|")
            href, fallback_label = _wiki_link_href(target)
            shown = label.strip() or fallback_label
            return f"<a href='{html.escape(href)}'>{html.escape(shown)}</a>"
        # [label](target "title") Markdown link.
        parsed = re.fullmatch(
            r"\[([^\]\n]+)\]\(([^\s()\n]+)(?:\s+\"[^\"]*\")?\)",
            token,
        )
        if parsed is None:
            return html.escape(token)
        label, target = parsed.groups()
        href = _markdown_link_href(target)
        if href is None:
            # Unsafe target: keep the label, drop the link.
            return html.escape(label)
        return f"<a href='{html.escape(href)}'>{html.escape(label)}</a>"

    pieces: list[str] = []
    cursor = 0
    for found in _WIKI_INLINE_RE.finditer(text):
        pieces.append(html.escape(text[cursor:found.start()]))
        pieces.append(render_token(found.group(0)))
        cursor = found.end()
    pieces.append(html.escape(text[cursor:]))
    return "".join(pieces)
def _render_wiki_markdown(markdown_text: str) -> str:
"""Render a conservative Markdown subset without adding dependencies."""
lines = markdown_text.splitlines()
out: list[str] = []
paragraph: list[str] = []
list_items: list[str] = []
code_lines: list[str] = []
in_code = False
def flush_paragraph() -> None:
if paragraph:
out.append(f"<p>{_render_wiki_inline(' '.join(paragraph))}</p>")
paragraph.clear()
def flush_list() -> None:
if list_items:
out.append("<ul>" + "".join(f"<li>{item}</li>" for item in list_items) + "</ul>")
list_items.clear()
def flush_code() -> None:
if code_lines:
out.append("<pre><code>" + html.escape("\n".join(code_lines)) + "</code></pre>")
code_lines.clear()
for line in lines:
stripped = line.strip()
if stripped.startswith("```"):
if in_code:
flush_code()
in_code = False
else:
flush_paragraph()
flush_list()
in_code = True
continue
if in_code:
code_lines.append(line)
continue
if not stripped:
flush_paragraph()
flush_list()
continue
heading = re.match(r"^(#{1,4})\s+(.+)$", stripped)
if heading:
flush_paragraph()
flush_list()
level = min(len(heading.group(1)), 4)
out.append(f"<h{level}>{_render_wiki_inline(heading.group(2))}</h{level}>")
continue
bullet = re.match(r"^\s*[-*]\s+(.+)$", line)
if bullet:
flush_paragraph()
list_items.append(_render_wiki_inline(bullet.group(1).strip()))
continue
flush_list()
paragraph.append(stripped)
flush_code()
flush_paragraph()
flush_list()
return "".join(out) if out else "<p class='muted'>No body.</p>"
def _extract_embedded_quality_block(markdown_text: str) -> tuple[str, str | None]:
    """Split embedded quality sections out of a wiki body.

    Returns ``(body, quality_markdown)``. With no quality markers the text
    is returned untouched with ``None``. Otherwise every marked section is
    removed from the body (runs of blank lines collapsed, result stripped)
    and the non-empty sections are joined with blank lines, or ``None`` if
    all were empty.
    """
    # With a single capture group, findall yields the captured text per match.
    captured = _WIKI_QUALITY_BLOCK_RE.findall(markdown_text)
    if not captured:
        return markdown_text, None
    sections = [section for section in (text.strip() for text in captured) if section]
    remaining = _WIKI_QUALITY_BLOCK_RE.sub("\n\n", markdown_text)
    remaining = re.sub(r"\n{3,}", "\n\n", remaining).strip()
    joined = "\n\n".join(sections).strip()
    return remaining, (joined or None)
def _slugish(value: str) -> str:
return re.sub(r"[^a-z0-9]+", "-", value.lower()).strip("-")
def _display_slug(slug: str) -> str:
"""Return the user-facing slug while preserving raw IDs for links/actions."""
text = str(slug or "")
return text.removeprefix("skills-sh-")
def _display_label(value: Any, *, fallback_slug: str = "") -> str:
    """Return a display label: ``value`` if truthy, else ``fallback_slug``.

    The chosen text is passed through ``_display_slug``.
    """
    chosen = value or fallback_slug or ""
    return _display_slug(str(chosen))
def _strip_duplicate_wiki_heading(markdown_text: str, slug: str) -> str:
"""Drop the first H1 if it only repeats the page slug."""
lines = markdown_text.splitlines()
for idx, line in enumerate(lines):
if not line.strip():
continue
match = re.match(r"^#\s+(.+?)\s*$", line.strip())
if match and _slugish(match.group(1)) == _slugish(slug):
del lines[idx]
while idx < len(lines) and not lines[idx].strip():
del lines[idx]
break
return "\n".join(lines)
def _entity_wiki_href(slug: str, entity_type: str | None = None) -> str:
    """Build the ``/wiki/<slug>`` href for an entity.

    A ``?type=`` query is appended only when ``entity_type`` is one of the
    dashboard's supported entity types.
    """
    base = f"/wiki/{quote(slug)}"
    if entity_type in _DASHBOARD_ENTITY_TYPES:
        return f"{base}?type={quote(entity_type)}"
    return base
def _graph_type_from_node_id(node_id: str, fallback: str = "skill") -> str:
prefix = node_id.split(":", 1)[0] if ":" in node_id else ""
return {
"skill": "skill",
"agent": "agent",
"mcp-server": "mcp-server",
"harness": "harness",
}.get(prefix, fallback)
def _subgraph_sidecar(slug: str, entity_type: str) -> dict[str, Any] | None:
    """Return the sidecar loaded for the entity, or ``None`` if it is not a dict."""
    loaded = _load_sidecar(slug, entity_type=entity_type)
    if isinstance(loaded, dict):
        return loaded
    return None
def _subgraph_quality_cell(sidecar: dict[str, Any] | None) -> str:
if sidecar is None:
return "<span class='muted'>no sidecar</span>"
grade = html.escape(str(sidecar.get("grade", "F")))
score = float(sidecar.get("raw_score", sidecar.get("score", 0.0)) or 0.0)
floor = str(sidecar.get("hard_floor") or "").strip()
floor_html = (
f" <span class='muted'>floor {html.escape(floor)}</span>"
if floor
else ""
)
return (
f"<span class='pill grade-{grade}'>{grade}</span> "
f"<code>{score:.3f}</code>{floor_html}"
)
def _subgraph_node_title(
label: str,
entity_type: str,
sidecar: dict[str, Any] | None,
) -> str:
if sidecar is None:
return f"{label} ({entity_type}) Β· no sidecar"
grade = str(sidecar.get("grade", "F"))
score = float(sidecar.get("raw_score", sidecar.get("score", 0.0)) or 0.0)
floor = str(sidecar.get("hard_floor") or "").strip()
floor_text = f" Β· floor {floor}" if floor else ""
return f"{label} ({entity_type}) Β· grade {grade} Β· score {score:.3f}{floor_text}"
def _subgraph_node_fill(entity_type: str) -> str:
return {
"agent": "#f59e0b",
"mcp-server": "#ef4444",
"harness": "#22c55e",
"skill": "#6366f1",
}.get(entity_type, "#64748b")
def _subgraph_grade_stroke(sidecar: dict[str, Any] | None) -> str:
grade = str((sidecar or {}).get("grade") or "")
return {
"A": "#059669",
"B": "#2563eb",
"C": "#d97706",
"D": "#ea580c",
"F": "#dc2626",
}.get(grade, "#ffffff")
def _render_entity_subgraph_svg(
    node_by_id: dict[str, dict[str, Any]],
    edges: list[dict],
    center: str,
    sidecar_by_id: dict[str, dict[str, Any] | None],
) -> str:
    """Render an embedded, interactive 3D graph for wiki entity pages.

    Args:
        node_by_id: Node data dicts keyed by graph node id.
        edges: Edge dicts; each is read through its ``data`` mapping
            (``source``, ``target``, ``weight``, ``shared_tags``).
        center: Node id of the focused entity; it is emitted first and
            flagged ``is_center``.
        sidecar_by_id: Quality sidecar (or ``None``) per node id, used for
            the hover title and the outline colour.

    Returns:
        An HTML fragment: a toolbar, an empty ``<svg>``, two hover-detail
        panes and an inline script that lays the nodes out around the
        centre and redraws on drag, wheel and zoom-button input. Edges
        whose endpoints are not both in ``node_by_id`` are dropped.
    """
    width = 980
    height = 380
    node_payload: list[dict[str, Any]] = []
    # Centre node first, then the rest ordered by label (or id).
    for node_id, node in sorted(
        node_by_id.items(),
        key=lambda item: (
            0 if item[0] == center else 1,
            str(item[1].get("label") or item[0]),
        ),
    ):
        node_type = _graph_type_from_node_id(
            node_id, str(node.get("type") or "skill"),
        )
        node_slug = _graph_slug_from_node_id(node_id)
        label = _display_label(node.get("label"), fallback_slug=node_slug)
        sidecar = sidecar_by_id.get(node_id)
        node_payload.append({
            "id": node_id,
            "slug": node_slug,
            "label": label,
            "type": node_type,
            "href": _entity_wiki_href(node_slug, node_type),
            "title": _subgraph_node_title(label, node_type, sidecar),
            "fill": _subgraph_node_fill(node_type),
            "stroke": _subgraph_grade_stroke(sidecar),
            "is_center": node_id == center,
        })
    edge_payload: list[dict[str, Any]] = []
    for edge in edges:
        data = edge.get("data", {})
        source = str(data.get("source", ""))
        target = str(data.get("target", ""))
        if source not in node_by_id or target not in node_by_id:
            continue
        # ``or []`` tolerates an explicit null ``shared_tags``.
        shared_tags = data.get("shared_tags") or []
        shared = ", ".join(str(tag) for tag in shared_tags[:6]) or "none"
        weight = float(data.get("weight", 0.0) or 0.0)
        # The middle dots are written as \u00b7 escapes so they cannot be
        # mis-encoded into "Β·".
        edge_payload.append({
            "source": source,
            "target": target,
            "weight": weight,
            "title": (
                f"{_display_slug(_graph_slug_from_node_id(source))} ↔ "
                f"{_display_slug(_graph_slug_from_node_id(target))} \u00b7 weight {weight:.3f} "
                f"\u00b7 shared {shared}"
            ),
        })
    # _json_for_script serializes the payloads for inlining in the <script> below.
    nodes_json = _json_for_script(node_payload)
    edges_json = _json_for_script(edge_payload)
    return (
        "<div data-testid='entity-subgraph-graph' "
        "style='border:1px solid #e5e7eb; border-radius:8px; "
        "background:#f8fafc; margin:1rem 0; overflow:hidden;'>"
        "<div style='display:flex; align-items:center; gap:0.5rem; "
        "padding:0.45rem 0.6rem; border-bottom:1px solid #e5e7eb; background:#fff;'>"
        "<button id='entity-subgraph-zoom-in' type='button'>zoom in</button>"
        "<button id='entity-subgraph-zoom-out' type='button'>zoom out</button>"
        "<span class='muted'>drag to rotate \u00b7 wheel to zoom \u00b7 hover nodes or edges</span>"
        "</div>"
        f"<svg data-testid='entity-subgraph-3d' viewBox='0 0 {width} {height}' "
        f"width='100%' height='{height}' role='img' aria-label='Embedded 3D entity subgraph' "
        "style='display:block; background:#f8fafc; touch-action:none;'></svg>"
        "<div style='display:grid; grid-template-columns:1fr 1fr; gap:0.5rem; "
        "padding:0.45rem 0.6rem; border-top:1px solid #e5e7eb; background:#fff;'>"
        "<div data-testid='entity-subgraph-node-detail' class='muted'>"
        "Hover a node for sidecar grade/score/floor.</div>"
        "<div data-testid='entity-subgraph-edge-detail' class='muted'>"
        "Hover an edge for weight and shared signals.</div></div>"
        "<script>\n"
        "(function () {\n"
        f" const nodes = {nodes_json};\n"
        f" const edges = {edges_json};\n"
        f" const width = {width};\n"
        f" const height = {height};\n"
        " const svg = document.querySelector('[data-testid=\"entity-subgraph-3d\"]');\n"
        " const nodeDetail = document.querySelector('[data-testid=\"entity-subgraph-node-detail\"]');\n"
        " const edgeDetail = document.querySelector('[data-testid=\"entity-subgraph-edge-detail\"]');\n"
        " if (!svg) return;\n"
        " const points = new Map();\n"
        " const center = nodes.find(n => n.is_center) || nodes[0];\n"
        " if (!center) return;\n"
        " points.set(center.id, {x: 0, y: 0, z: 0});\n"
        " nodes.filter(n => n.id !== center.id).forEach((n, idx) => {\n"
        " const i = idx + 1;\n"
        " const phi = Math.acos(1 - 2 * i / Math.max(2, nodes.length));\n"
        " const theta = Math.PI * (3 - Math.sqrt(5)) * i;\n"
        " const radius = 250;\n"
        " points.set(n.id, {x: radius * Math.cos(theta) * Math.sin(phi), y: radius * Math.sin(theta) * Math.sin(phi), z: radius * Math.cos(phi)});\n"
        " });\n"
        " let yaw = -0.4;\n"
        " let pitch = 0.55;\n"
        " let zoom = 1;\n"
        " function escapeHtml(s) { return String(s).replace(/[&<>\"']/g, ch => ({'&':'&amp;','<':'&lt;','>':'&gt;','\"':'&quot;',\"'\":'&#39;'}[ch])); }\n"
        " function project(p) {\n"
        " const cyaw = Math.cos(yaw), syaw = Math.sin(yaw);\n"
        " const cp = Math.cos(pitch), sp = Math.sin(pitch);\n"
        " const x1 = p.x * cyaw - p.z * syaw;\n"
        " const z1 = p.x * syaw + p.z * cyaw;\n"
        " const y1 = p.y * cp - z1 * sp;\n"
        " const z2 = p.y * sp + z1 * cp;\n"
        " const scale = zoom * 620 / (760 + z2);\n"
        " return {x: width / 2 + x1 * scale, y: height / 2 + y1 * scale, z: z2, scale};\n"
        " }\n"
        " function attachHover() {\n"
        " svg.querySelectorAll('[data-node-detail]').forEach(n => n.addEventListener('mouseenter', () => { nodeDetail.textContent = n.dataset.nodeDetail || ''; }));\n"
        " svg.querySelectorAll('[data-edge-detail]').forEach(e => e.addEventListener('mouseenter', () => { edgeDetail.textContent = e.dataset.edgeDetail || ''; }));\n"
        " }\n"
        " function draw() {\n"
        " const projected = new Map();\n"
        " points.forEach((p, id) => projected.set(id, project(p)));\n"
        " const edgeLines = edges.map(e => {\n"
        " const s = projected.get(e.source);\n"
        " const t = projected.get(e.target);\n"
        " if (!s || !t) return '';\n"
        " const w = Math.max(1, Math.min(4, 1 + Math.sqrt(Math.max(0, Number(e.weight || 1)))));\n"
        " return '<line x1=\"' + s.x.toFixed(1) + '\" y1=\"' + s.y.toFixed(1) + '\" x2=\"' + t.x.toFixed(1) + '\" y2=\"' + t.y.toFixed(1) + '\" stroke=\"#64748b\" stroke-opacity=\"0.55\" stroke-width=\"' + w.toFixed(2) + '\" />';\n"
        " }).join('');\n"
        " const nodeEls = nodes.slice().sort((a, b) => (projected.get(a.id)?.z || 0) - (projected.get(b.id)?.z || 0)).map(n => {\n"
        " const p = projected.get(n.id) || {x: width / 2, y: height / 2, z: 0, scale: 1};\n"
        " const r = Math.max(7, (n.is_center ? 18 : 12) * Math.max(0.7, p.scale));\n"
        " return '<a href=\"' + escapeHtml(n.href) + '\"><g data-testid=\"entity-subgraph-node\" data-node-detail=\"' + escapeHtml(n.title) + '\"><title>' + escapeHtml(n.title) + '</title><circle cx=\"' + p.x.toFixed(1) + '\" cy=\"' + p.y.toFixed(1) + '\" r=\"' + r + '\" fill=\"' + escapeHtml(n.fill) + '\" stroke=\"' + escapeHtml(n.stroke) + '\" stroke-width=\"3\" /><text x=\"' + p.x.toFixed(1) + '\" y=\"' + (p.y + r + 14).toFixed(1) + '\" text-anchor=\"middle\" font-size=\"11\" fill=\"#111827\" style=\"pointer-events:none;\">' + escapeHtml(String(n.label).slice(0, 28)) + '</text></g></a>';\n"
        " }).join('');\n"
        " const edgeHits = edges.map(e => {\n"
        " const s = projected.get(e.source);\n"
        " const t = projected.get(e.target);\n"
        " if (!s || !t) return '';\n"
        " const hx1 = s.x + (t.x - s.x) * 0.18, hy1 = s.y + (t.y - s.y) * 0.18;\n"
        " const hx2 = s.x + (t.x - s.x) * 0.82, hy2 = s.y + (t.y - s.y) * 0.82;\n"
        " return '<line data-testid=\"entity-subgraph-edge\" data-edge-detail=\"' + escapeHtml(e.title) + '\" x1=\"' + hx1.toFixed(1) + '\" y1=\"' + hy1.toFixed(1) + '\" x2=\"' + hx2.toFixed(1) + '\" y2=\"' + hy2.toFixed(1) + '\" stroke=\"transparent\" stroke-width=\"12\" style=\"pointer-events:stroke;\"><title>' + escapeHtml(e.title) + '</title></line>';\n"
        " }).join('');\n"
        " svg.innerHTML = '<rect width=\"100%\" height=\"100%\" fill=\"#f8fafc\" />' + edgeLines + nodeEls + edgeHits;\n"
        " attachHover();\n"
        " }\n"
        " document.getElementById('entity-subgraph-zoom-in')?.addEventListener('click', () => { zoom = Math.min(2.5, zoom * 1.18); draw(); });\n"
        " document.getElementById('entity-subgraph-zoom-out')?.addEventListener('click', () => { zoom = Math.max(0.35, zoom / 1.18); draw(); });\n"
        " let dragging = false, lastX = 0, lastY = 0;\n"
        " svg.addEventListener('pointerdown', ev => { dragging = true; lastX = ev.clientX; lastY = ev.clientY; svg.setPointerCapture(ev.pointerId); });\n"
        " svg.addEventListener('pointerup', ev => { dragging = false; try { svg.releasePointerCapture(ev.pointerId); } catch (_) {} });\n"
        " svg.addEventListener('pointermove', ev => { if (!dragging) return; yaw += (ev.clientX - lastX) * 0.01; pitch += (ev.clientY - lastY) * 0.01; pitch = Math.max(-1.35, Math.min(1.35, pitch)); lastX = ev.clientX; lastY = ev.clientY; draw(); });\n"
        " svg.addEventListener('wheel', ev => { ev.preventDefault(); zoom = Math.max(0.35, Math.min(2.5, zoom * (ev.deltaY < 0 ? 1.08 : 0.92))); draw(); }, {passive:false});\n"
        " draw();\n"
        "})();\n"
        "</script>"
        "</div>"
    )
def _render_entity_subgraph(slug: str, entity_type: str | None = None) -> str:
    """Render the 1-hop subgraph card for a wiki entity page.

    The card holds a heading, a node/edge count, the interactive graph from
    ``_render_entity_subgraph_svg`` and a table with one row per edge that
    touches the centre node (neighbor, type, quality sidecar, weight, up to
    six shared tags). When the neighborhood has no centre node a short
    "not found" card is returned.
    """
    graph = _graph_neighborhood(slug, hops=1, limit=32, entity_type=entity_type)
    center = graph.get("center")
    nodes = graph.get("nodes") or []
    edges = graph.get("edges") or []
    if not center:
        return (
            "<div class='card'>"
            "<p class='muted'>No graph node was found for this entity.</p>"
            "</div>"
        )
    node_by_id = {
        str(node.get("data", {}).get("id", "")): node.get("data", {})
        for node in nodes
    }
    sidecar_by_id = {
        node_id: _subgraph_sidecar(
            _graph_slug_from_node_id(node_id),
            _graph_type_from_node_id(node_id, str(node.get("type") or "skill")),
        )
        for node_id, node in node_by_id.items()
    }
    rows: list[str] = []
    for edge in edges:
        data = edge.get("data", {})
        source = str(data.get("source", ""))
        target = str(data.get("target", ""))
        # The neighbor is whichever endpoint is not the centre.
        other_id = target if source == center else source
        if other_id == center or other_id not in node_by_id:
            continue
        other = node_by_id[other_id]
        other_type = _graph_type_from_node_id(other_id, str(other.get("type", "skill")))
        # NOTE(review): the sidecar lookup above and the SVG renderer derive
        # slugs with _graph_slug_from_node_id(); confirm that helper is
        # equivalent to this split and consider using it here too.
        other_slug = other_id.split(":", 1)[-1]
        # ``or`` fallbacks tolerate explicit nulls in the edge data, matching
        # how the SVG renderer reads ``weight``.
        shared_tags = data.get("shared_tags") or []
        weight = float(data.get("weight", 0.0) or 0.0)
        shared = ", ".join(str(tag) for tag in shared_tags[:6])
        shared_html = html.escape(shared) if shared else "<span class='muted'>none</span>"
        quality_html = _subgraph_quality_cell(sidecar_by_id.get(other_id))
        rows.append(
            "<tr>"
            f"<td><a href='{html.escape(_entity_wiki_href(other_slug, other_type))}'>"
            f"{html.escape(str(other.get('label') or other_slug))}</a></td>"
            f"<td><span class='pill entity-type-{html.escape(other_type)}'>"
            f"{html.escape(other_type)}</span></td>"
            f"<td>{quality_html}</td>"
            f"<td><code>{weight:.3f}</code></td>"
            f"<td>{shared_html}</td>"
            "</tr>"
        )
    table = (
        "<table><tr><th>Entity</th><th>Type</th><th>Quality sidecar</th>"
        "<th>Weight</th><th>Shared signals</th></tr>"
        + ("".join(rows) if rows else "<tr><td colspan='5' class='muted'>No neighbors under the current limit.</td></tr>")
        + "</table>"
    )
    return (
        "<div class='card'>"
        "<h2>Subgraph</h2>"
        f"<p class='muted'>{len(nodes)} nodes and {len(edges)} edges in the 1-hop neighborhood.</p>"
        + _render_entity_subgraph_svg(node_by_id, edges, center, sidecar_by_id)
        + table
        + "</div>"
    )
def _entity_tab_script() -> str:
    """Return the inline ``<script>`` that drives the entity page tabs.

    The script toggles the ``active`` class and ``aria-selected`` on
    ``[data-entity-tab]`` buttons, hides every ``[data-entity-tab-panel]``
    except the selected one, mirrors the selection into the URL hash via
    ``history.replaceState``, and on load selects the tab named by the
    current hash (falling back to ``overview``).
    """
    return """
<script>
(function () {
function showEntityTab(name) {
document.querySelectorAll('[data-entity-tab]').forEach(function (button) {
var active = button.getAttribute('data-entity-tab') === name;
button.classList.toggle('active', active);
button.setAttribute('aria-selected', active ? 'true' : 'false');
});
document.querySelectorAll('[data-entity-tab-panel]').forEach(function (panel) {
panel.hidden = panel.getAttribute('data-entity-tab-panel') !== name;
});
}
document.querySelectorAll('[data-entity-tab]').forEach(function (button) {
button.addEventListener('click', function () {
var name = button.getAttribute('data-entity-tab');
showEntityTab(name);
if (history.replaceState) {
history.replaceState(null, '', '#' + name);
}
});
});
document.querySelectorAll('[data-open-entity-tab]').forEach(function (link) {
link.addEventListener('click', function (event) {
event.preventDefault();
var name = link.getAttribute('data-open-entity-tab');
showEntityTab(name);
if (history.replaceState) {
history.replaceState(null, '', '#' + name);
}
});
});
var initial = (location.hash || '#overview').replace('#', '');
if (!document.querySelector('[data-entity-tab="' + initial + '"]')) {
initial = 'overview';
}
showEntityTab(initial);
})();
</script>
"""
def _render_entity_tabs(
    *,
    overview_html: str,
    subgraph_html: str,
    quality_html: str,
) -> str:
    """Render the Overview / Subgraph / Quality tab strip and its panels.

    The first tab (Overview) is emitted as the active one; the other two
    panels start with the ``hidden`` attribute. The tab-switching script from
    ``_entity_tab_script`` is appended after the panels.
    """
    tabs = (
        ("overview", "Overview", overview_html),
        ("subgraph", "Subgraph", subgraph_html),
        ("quality", "Quality", quality_html),
    )
    buttons: list[str] = []
    panels: list[str] = []
    for position, (key, label, panel_html) in enumerate(tabs):
        is_default = position == 0
        css_class = "entity-tab-button active" if is_default else "entity-tab-button"
        selected = "true" if is_default else "false"
        hidden_attr = "" if is_default else " hidden"
        buttons.append(
            f"<button type='button' class='{css_class}' role='tab' "
            f"aria-selected='{selected}' data-entity-tab='{key}'>{label}</button>"
        )
        panels.append(
            f"<section id='{key}' class='entity-tab-panel' "
            f"data-entity-tab-panel='{key}'{hidden_attr}>{panel_html}</section>"
        )
    return (
        "<div class='entity-tabs' role='tablist' aria-label='Entity sections'>"
        + "".join(buttons)
        + "</div>"
        + "".join(panels)
        + _entity_tab_script()
    )
def _quality_float(value: Any) -> float:
    """Coerce a sidecar numeric field to ``float``; junk or empty values become 0.0."""
    try:
        return float(value or 0.0)
    except (TypeError, ValueError):
        return 0.0


def _render_quality_drilldown(
    sidecar: dict | None,
    embedded_quality_markdown: str | None = None,
) -> str:
    """Explain quality score signals for a wiki entity.

    Args:
        sidecar: Parsed quality sidecar, or ``None`` when the entity has none.
        embedded_quality_markdown: Markdown used as a fallback when no sidecar
            exists; a ``## Quality`` heading is prepended if it lacks one.

    Returns:
        An HTML card. With a sidecar it shows the grade pill, the score, an
        optional hard floor, one table row per recorded signal (score, weight,
        contribution, truncated evidence) and the raw sidecar JSON.
    """
    if sidecar is None:
        if embedded_quality_markdown:
            quality_markdown = embedded_quality_markdown.strip()
            if not re.search(r"^#{1,6}\s+Quality\b", quality_markdown, re.IGNORECASE | re.MULTILINE):
                quality_markdown = "## Quality\n\n" + quality_markdown
            return (
                "<div class='card wiki-body'>"
                + _render_wiki_markdown(quality_markdown)
                + "</div>"
            )
        return (
            "<div class='card'>"
            "<h2>Quality</h2>"
            "<p class='muted'>No quality sidecar exists for this entity yet.</p>"
            "</div>"
        )
    # Same fallbacks as the /skills cards: a null grade reads as "F" and an
    # empty raw_score falls through to score, so both views agree.
    grade = str(sidecar.get("grade") or "F")
    score = _quality_float(sidecar.get("raw_score") or sidecar.get("score"))
    weights_raw = sidecar.get("weights")
    signals_raw = sidecar.get("signals")
    weights: dict[str, Any] = weights_raw if isinstance(weights_raw, dict) else {}
    signals: dict[str, Any] = signals_raw if isinstance(signals_raw, dict) else {}
    signal_rows: list[str] = []
    for name, signal in sorted(signals.items()):
        signal_data = signal if isinstance(signal, dict) else {}
        # Malformed numbers must not take the whole page down.
        signal_score = _quality_float(signal_data.get("score"))
        weight = _quality_float(weights.get(name))
        contribution = signal_score * weight
        evidence = signal_data.get("evidence", {})
        evidence_text = json.dumps(evidence, ensure_ascii=False, sort_keys=True, default=str)
        evidence_preview, evidence_truncated = _truncate_text(evidence_text, 420)
        truncated_marker = " <span class='muted'>(truncated)</span>" if evidence_truncated else ""
        signal_rows.append(
            "<tr>"
            f"<td><code>{html.escape(str(name))}</code></td>"
            f"<td><code>{signal_score:.3f}</code></td>"
            f"<td><code>{weight:.3f}</code></td>"
            f"<td><code>{contribution:.3f}</code></td>"
            f"<td><code>{html.escape(evidence_preview)}</code>{truncated_marker}</td>"
            "</tr>"
        )
    if not signal_rows:
        signal_rows.append("<tr><td colspan='5' class='muted'>No signal breakdown was recorded.</td></tr>")
    hard_floor = sidecar.get("hard_floor")
    floor_html = f" <span class='muted'>floor {html.escape(str(hard_floor))}</span>" if hard_floor else ""
    return (
        "<div class='card'>"
        "<h2>Quality</h2>"
        f"<p><span class='pill grade-{html.escape(grade)}'>{html.escape(grade)}</span> "
        f"score <strong>{score:.3f}</strong>"
        f"{floor_html}</p>"
        "<p class='muted'>Score is the weighted sum of recorded quality signals. "
        "A hard floor can cap the final grade even when individual signals pass.</p>"
        "<table class='quality-signal-table'>"
        "<tr><th>Signal</th><th>Signal score</th><th>Weight</th><th>Contribution</th><th>Evidence</th></tr>"
        + "".join(signal_rows)
        + "</table>"
        "<details><summary>Raw sidecar JSON</summary>"
        # Cap the dump before escaping so the page stays bounded.
        f"<pre>{html.escape(json.dumps(sidecar, indent=2, ensure_ascii=False, default=str)[:6000])}</pre>"
        "</details>"
        "</div>"
    )
def _save_manifest(manifest: dict) -> None:
    """Write *manifest* to the manifest path as 2-space-indented JSON plus a newline.

    The write is delegated to ``_atomic_write_text``.
    """
    target = _manifest_path()
    serialized = json.dumps(manifest, indent=2)
    _atomic_write_text(target, f"{serialized}\n")
def _read_skill_manifest_only() -> dict:
    """Read the mutable skill manifest without synthetic harness rows.

    A missing, unreadable, malformed or non-object manifest yields a fresh
    ``{"load": [], "unload": [], "warnings": []}``. Otherwise the parsed dict
    is returned with each of those three fields reset to ``[]`` when it is
    missing or not a list.
    """
    list_fields = ("load", "unload", "warnings")
    manifest_file = _manifest_path()
    loaded: object = None
    if manifest_file.exists():
        try:
            loaded = json.loads(manifest_file.read_text(encoding="utf-8"))
        except (OSError, json.JSONDecodeError):
            loaded = None
    if not isinstance(loaded, dict):
        return {field: [] for field in list_fields}
    for field in list_fields:
        if not isinstance(loaded.get(field), list):
            loaded[field] = []
    return loaded
def _remove_loaded_manifest_entry(slug: str, entity_type: str) -> list[dict]:
    """Remove loaded rows for one entity tuple and return removed rows.

    While holding ``file_lock`` on the manifest path, every ``load`` row whose
    ``skill`` equals *slug* and whose ``entity_type`` (default ``"skill"``)
    equals *entity_type* is moved out of ``load``. A matching ``unload`` row
    is appended when none exists; otherwise the first existing one gains any
    of ``command`` / ``json_config`` / ``priority`` / ``reason`` it is
    missing. Rows that are not dicts are never matched and are left in place
    rather than raising.

    Returns:
        The removed rows, or ``[]`` (without writing) when nothing matched.
    """

    def _matches(row: object) -> bool:
        # Hand-edited manifests may contain non-dict rows; they never match.
        return (
            isinstance(row, dict)
            and row.get("skill") == slug
            and str(row.get("entity_type") or "skill") == entity_type
        )

    path = _manifest_path()
    with file_lock(path):
        manifest = _read_skill_manifest_only()
        load_rows = manifest.get("load", [])
        removed: list[dict] = [row for row in load_rows if _matches(row)]
        if not removed:
            return []
        manifest["load"] = [row for row in load_rows if not _matches(row)]

        # Carry launch details from the first removed row onto the unload row.
        preserved: dict[str, object] = {}
        for field in ("command", "json_config", "priority", "reason"):
            value = removed[0].get(field)
            if value not in (None, ""):
                preserved[field] = value

        unload_rows = manifest.setdefault("unload", [])
        existing = next((row for row in unload_rows if _matches(row)), None)
        if existing is None:
            entry = {
                "skill": slug,
                "entity_type": entity_type,
                "source": removed[0].get("source") or "ctx-monitor",
            }
            entry.update(preserved)
            unload_rows.append(entry)
        else:
            # Only fill gaps; never overwrite what the unload row already says.
            for field, value in preserved.items():
                existing.setdefault(field, value)
        _save_manifest(manifest)
        return removed
def _log_dashboard_entity_event(
    entity_type: str,
    action: str,
    slug: str,
) -> None:
    """Append a dashboard-visible audit row for a load/unload action.

    Skills and agents are logged as ``<entity_type>.<action>``. MCP servers
    are logged as ``toolbox.triggered`` with the entity type and action in
    ``meta``. Any other entity type is ignored. Logging is best-effort: every
    exception, including a failed import of the audit logger, is swallowed.
    """
    try:
        from ctx_audit_log import log

        details: dict[str, str] = {"via": "ctx-monitor"}
        if entity_type in ("skill", "agent"):
            event_name = f"{entity_type}.{action}"
            subject_kind = entity_type
        elif entity_type == "mcp-server":
            event_name = "toolbox.triggered"
            subject_kind = "toolbox"
            details.update(entity_type="mcp-server", action=action)
        else:
            return
        log(
            event_name,
            subject_type=subject_kind,
            subject=slug,
            actor="user",
            meta=details,
            path=_audit_log_path(),
        )
    except Exception:  # noqa: BLE001
        pass
def _read_manifest() -> dict:
    """Return current loaded entities from the skill manifest plus harness installs.

    The on-disk manifest is read through ``_read_skill_manifest_only`` so that
    ``load``, ``unload`` and ``warnings`` are always lists. Rows from
    ``_read_harness_install_rows`` are then appended to ``load`` unless a
    ``("harness", slug)`` row is already present. The result is built fresh on
    every call; nothing is written back to disk here.
    """
    manifest: dict[str, Any] = _read_skill_manifest_only()
    load_rows = manifest["load"]
    # Key by (entity_type, slug); non-dict rows cannot collide with a harness.
    existing = {
        (str(row.get("entity_type") or "skill"), str(row.get("skill") or ""))
        for row in load_rows
        if isinstance(row, dict)
    }
    for row in _read_harness_install_rows():
        key = ("harness", str(row.get("skill") or ""))
        if key not in existing:
            load_rows.append(row)
            existing.add(key)
    return manifest
def _read_harness_install_rows() -> list[dict]:
    """Return installed harness records as manifest-compatible load rows.

    Scans ``harness-installs/*.json`` under the Claude directory in sorted
    order. Files that cannot be read or parsed, records whose ``status`` is
    not ``"installed"``, and records with an empty or unsafe slug are skipped.
    """
    install_root = _claude_dir() / "harness-installs"
    if not install_root.is_dir():
        return []
    harness_rows: list[dict] = []
    for record_path in sorted(install_root.glob("*.json")):
        try:
            record = json.loads(record_path.read_text(encoding="utf-8"))
        except (OSError, json.JSONDecodeError):
            continue
        installed = isinstance(record, dict) and record.get("status") == "installed"
        if not installed:
            continue
        # Fall back to the file name when the record carries no slug.
        harness_slug = str(record.get("slug") or record_path.stem).strip()
        if not (harness_slug and _is_safe_slug(harness_slug)):
            continue
        harness_rows.append(
            {
                "skill": harness_slug,
                "entity_type": "harness",
                "source": "ctx-harness-install",
                "command": record.get("target") or record.get("repo_url") or "",
                "installed_at": record.get("installed_at", ""),
                "status": record.get("status", "installed"),
            }
        )
    return harness_rows
def _queue_job_summary(job: wiki_queue.QueueJob) -> dict[str, Any]:
    """Flatten a queue job into a JSON-friendly dict.

    Scalar attributes are copied as-is. From the payload only its ``source``
    entry and the sorted, stringified key names are exposed.
    """
    scalar_fields = (
        "id",
        "kind",
        "status",
        "attempts",
        "max_attempts",
        "worker_id",
        "leased_until",
        "available_at",
        "last_error",
        "created_at",
        "updated_at",
    )
    summary: dict[str, Any] = {name: getattr(job, name) for name in scalar_fields}
    payload = job.payload
    summary["source"] = payload.get("source")
    summary["payload_keys"] = sorted(str(key) for key in payload)
    return summary
def _queue_status() -> dict[str, Any]:
    """Return durable wiki/graph queue state without creating the DB.

    When the queue database file does not exist, or querying it raises, the
    result has ``available`` set to ``False`` with zeroed counts (plus an
    ``error`` string in the failure case). Otherwise it carries per-status
    counts, their total, and summaries of the 20 most recent jobs.
    """
    db_path = wiki_queue.queue_db_path(_wiki_dir())
    counts = dict.fromkeys(
        (
            wiki_queue.STATUS_PENDING,
            wiki_queue.STATUS_RUNNING,
            wiki_queue.STATUS_SUCCEEDED,
            wiki_queue.STATUS_FAILED,
            wiki_queue.STATUS_CANCELLED,
        ),
        0,
    )

    def _unavailable(**extra: Any) -> dict[str, Any]:
        return {
            "available": False,
            "db_path": str(db_path),
            "total": 0,
            "counts": counts,
            "recent_jobs": [],
            **extra,
        }

    if not db_path.exists():
        return _unavailable()
    try:
        raw_counts = wiki_queue.count_jobs_by_status(db_path)
        recent = wiki_queue.list_recent_jobs(db_path, limit=20)
    except Exception as exc:  # noqa: BLE001
        return _unavailable(error=str(exc))
    # Overlay real counts on the zeroed defaults so every status is present.
    counts.update(raw_counts)
    return {
        "available": True,
        "db_path": str(db_path),
        "total": sum(raw_counts.values()),
        "counts": counts,
        "recent_jobs": [_queue_job_summary(job) for job in recent],
    }
def _file_status(path: Path) -> dict[str, Any]:
    """Describe *path* as ``{"path", "exists", "size", "mtime"}``.

    ``exists`` is true only for regular files. A path that is missing, or
    whose ``stat()`` fails, reports size 0 and ``mtime`` ``None``; the
    ``stat()`` failure case also carries an ``error`` string.
    """
    location = str(path)
    absent: dict[str, Any] = {
        "path": location,
        "exists": False,
        "size": 0,
        "mtime": None,
    }
    if not path.exists():
        return absent
    try:
        info = path.stat()
    except OSError as exc:
        absent["error"] = str(exc)
        return absent
    return {
        "path": location,
        "exists": path.is_file(),
        "size": info.st_size,
        "mtime": info.st_mtime,
    }
def _repo_graph_dir() -> Path:
    """Return ``graph`` under the directory that contains this module's own directory."""
    module_file = Path(__file__).resolve()
    return module_file.parents[1] / "graph"
def _first_existing_file_status(*paths: Path) -> dict[str, Any]:
    """Return ``_file_status`` for the first path that exists, else for the first path."""
    chosen = next((candidate for candidate in paths if candidate.exists()), None)
    if chosen is None:
        chosen = paths[0]
    return _file_status(chosen)
def _promotion_status(path: Path) -> dict[str, Any] | None:
    """Summarize one ``*.promotion.json`` file.

    Returns ``None`` when the file cannot be read, is not valid JSON, or does
    not hold a JSON object. Otherwise returns the top-level status fields
    plus the ``sha256`` and ``size`` of the ``previous``, ``candidate`` and
    ``current`` sections (``None`` where absent).
    """
    try:
        record = json.loads(path.read_text(encoding="utf-8"))
    except (OSError, json.JSONDecodeError):
        return None
    if not isinstance(record, dict):
        return None
    summary: dict[str, Any] = {"path": str(path)}
    for key in ("status", "target", "started_at", "promoted_at"):
        summary[key] = record.get(key)
    for stage in ("previous", "candidate", "current"):
        snapshot = _dict_or_empty(record.get(stage))
        summary[f"{stage}_sha256"] = snapshot.get("sha256")
        summary[f"{stage}_size"] = snapshot.get("size")
    return summary
def _dict_or_empty(value: Any) -> dict[str, Any]:
    """Return *value* itself when it is a dict, otherwise a new empty dict."""
    if isinstance(value, dict):
        return value
    return {}
def _artifact_status() -> dict[str, Any]:
    """Return shipped graph/wiki artifact file state and promotion metadata.

    Promotion files (``*.promotion.json``) are collected from the wiki's
    ``graphify-out`` directory, the wiki root and the Claude graph directory,
    de-duplicated, and processed in path-string order. Unreadable ones are
    dropped.
    """
    wiki_root = _wiki_dir()
    graphify_dir = wiki_root / "graphify-out"
    claude_graph = _claude_dir() / "graph"
    repo_graph = _repo_graph_dir()

    unique_paths: set[Path] = set()
    for directory in (graphify_dir, wiki_root, claude_graph):
        unique_paths.update(directory.glob("*.promotion.json"))

    promotions: list[dict[str, Any]] = []
    for promotion_path in sorted(unique_paths, key=str):
        status = _promotion_status(promotion_path)
        if status is not None:
            promotions.append(status)

    return {
        "graph_json": _file_status(graphify_dir / "graph.json"),
        "graph_delta_json": _file_status(graphify_dir / "graph-delta.json"),
        "communities_json": _file_status(graphify_dir / "communities.json"),
        "wiki_graph_tar": _first_existing_file_status(
            claude_graph / "wiki-graph.tar.gz",
            repo_graph / "wiki-graph.tar.gz",
        ),
        "skills_sh_catalog": _first_existing_file_status(
            wiki_root / "external-catalogs" / "skills-sh" / "catalog.json",
            claude_graph / "skills-sh-catalog.json.gz",
            repo_graph / "skills-sh-catalog.json.gz",
        ),
        "promotion_count": len(promotions),
        "promotions": promotions,
    }
def _status_payload() -> dict[str, Any]:
    """Combine queue state and artifact state into one status dict."""
    queue_state = _queue_status()
    artifact_state = _artifact_status()
    return {"queue": queue_state, "artifacts": artifact_state}
def _read_jsonl(path: Path, limit: int | None = None) -> list[dict]:
    """Parse a JSON Lines file into a list of dict records.

    Blank lines, lines that are not valid JSON, and JSON values that are not
    objects are skipped. With a positive *limit* only the last *limit*
    records are kept; a *limit* of zero or less, or a missing file, yields
    ``[]``.
    """
    if not path.exists():
        return []
    if limit is not None and limit <= 0:
        return []
    # deque(maxlen=None) is unbounded, so one container serves both modes.
    kept: deque[dict] = deque(maxlen=limit)
    with path.open(encoding="utf-8") as handle:
        for raw_line in handle:
            text = raw_line.strip()
            if not text:
                continue
            try:
                parsed = json.loads(text)
            except json.JSONDecodeError:
                continue
            if isinstance(parsed, dict):
                kept.append(parsed)
    return list(kept)
def _runtime_lifecycle_events(limit: int | None = 200) -> list[dict[str, Any]]:
    """Return runtime lifecycle rows whose ``action`` is validation or escalation.

    *limit* is passed to ``_read_jsonl``, so it bounds the rows read from the
    file before this filter is applied.
    """
    wanted_actions = {"validation", "escalation"}
    selected: list[dict[str, Any]] = []
    for record in _read_jsonl(_runtime_lifecycle_path(), limit=limit):
        if record.get("action") in wanted_actions:
            selected.append(record)
    return selected
def _runtime_escalation_key(event: dict[str, Any]) -> str:
    """Return a string key identifying an escalation event.

    The first truthy value among ``escalation_id``, ``event_id`` and ``id``
    wins. Without one, the key is the NUL-joined ``session_id``, ``trigger``,
    ``reason`` and ``severity`` values, with missing ones as empty strings.
    """
    explicit_ids = (event.get(name) for name in ("escalation_id", "event_id", "id"))
    chosen = next((value for value in explicit_ids if value), None)
    if chosen is not None:
        return str(chosen)
    parts = [
        str(event.get(name) or "")
        for name in ("session_id", "trigger", "reason", "severity")
    ]
    return "\0".join(parts)
def _runtime_lifecycle_summary(limit: int = 200) -> dict[str, Any]:
    """Aggregate validation and escalation events into dashboard totals.

    Events are always read with ``limit=None``; the *limit* parameter is
    accepted but not consulted by this body. An escalation counts as open
    when the latest event seen for its key has status ``open`` (the default
    when the status is missing); any other status closes that key.
    """
    events = _runtime_lifecycle_events(limit=None)
    validations: list[dict[str, Any]] = []
    escalations: list[dict[str, Any]] = []
    for event in events:
        action = event.get("action")
        if action == "validation":
            validations.append(event)
        elif action == "escalation":
            escalations.append(event)

    still_open: dict[str, dict[str, Any]] = {}
    for event in escalations:
        key = _runtime_escalation_key(event)
        if str(event.get("status") or "open").lower() == "open":
            still_open[key] = event
        else:
            still_open.pop(key, None)
    open_escalations = list(still_open.values())

    failure_count = sum(
        1
        for event in validations
        if str(event.get("status") or "").lower() in {"failed", "error"}
    )
    session_ids = {
        str(event.get("session_id") or "")
        for event in events
        if event.get("session_id")
    }
    return {
        "path": str(_runtime_lifecycle_path()),
        "events_total": len(events),
        "validations_total": len(validations),
        "validation_failures": failure_count,
        "escalations_total": len(escalations),
        "open_escalations_total": len(open_escalations),
        "latest_validation": validations[-1] if validations else None,
        "recent_validations": validations[-20:],
        "open_escalations": open_escalations[-20:],
        "sessions": sorted(session_ids),
    }
def _sidecar_entity_type(sidecar: dict, fallback: str = "skill") -> str:
    """Return the canonical entity type recorded in a sidecar.

    The first truthy value among ``entity_type``, ``subject_type`` and
    ``type`` is used, else *fallback*. Known plural and short spellings are
    mapped to their canonical form; anything else is returned unchanged.
    """
    declared = (
        sidecar.get("entity_type")
        or sidecar.get("subject_type")
        or sidecar.get("type")
        or fallback
    )
    label = str(declared)
    # Canonical names map to themselves, so only the aliases need listing.
    aliases = {
        "skills": "skill",
        "agents": "agent",
        "mcp": "mcp-server",
        "mcp-servers": "mcp-server",
        "harnesses": "harness",
    }
    return aliases.get(label, label)
def _sidecar_fallback_type(path: Path) -> str:
    """Return ``"mcp-server"`` for files directly inside an ``mcp`` directory, else ``"skill"``."""
    if path.parent.name == "mcp":
        return "mcp-server"
    return "skill"
def _read_sidecar_file(path: Path) -> dict | None:
    """Load one sidecar JSON file and normalize its identity fields.

    Returns ``None`` when the file cannot be read, is not valid JSON, or is
    not a JSON object. Otherwise ``slug`` defaults to the file stem and
    ``subject_type`` is overwritten with the canonical entity type.
    """
    try:
        raw_text = path.read_text(encoding="utf-8")
        payload = json.loads(raw_text)
    except (OSError, json.JSONDecodeError):
        return None
    if not isinstance(payload, dict):
        return None
    # Resolve the type before subject_type is overwritten below.
    canonical_type = _sidecar_entity_type(payload, _sidecar_fallback_type(path))
    payload.setdefault("slug", path.stem)
    payload["subject_type"] = canonical_type
    return payload
def _load_sidecar(slug: str, entity_type: str | None = None) -> dict | None:
    """Find the sidecar for *slug*, optionally restricted to *entity_type*.

    Candidate files are tried in order: ``<slug>.json``, ``mcp/<slug>.json``,
    then (when a type is given) ``<slug>-<type>.json`` and, for MCP servers,
    ``<slug>-mcp.json``. The first readable sidecar whose type matches is
    returned. If none matches and a type was given, the sidecar index is
    consulted, but only when it has already been built.
    """
    if not _is_safe_slug(slug):
        return None
    base = _sidecar_dir()
    candidates = [base / f"{slug}.json", base / "mcp" / f"{slug}.json"]
    if entity_type is not None:
        suffixes = [entity_type]
        if entity_type == "mcp-server":
            suffixes.append("mcp")
        candidates.extend(base / f"{slug}-{suffix}.json" for suffix in suffixes)
    for candidate in candidates:
        if not candidate.exists():
            continue
        sidecar = _read_sidecar_file(candidate)
        if sidecar is not None and (
            entity_type is None or _sidecar_entity_type(sidecar) == entity_type
        ):
            return sidecar
    if entity_type is not None and _SIDECAR_INDEX_CACHE_VALUE is not None:
        return _sidecar_index().get((slug, entity_type))
    return None
def _sidecar_files() -> list[Path]:
    """List sidecar JSON files from the sidecar directory and its ``mcp`` subdirectory.

    Each directory is listed in sorted order. Dotfiles and ``*.lifecycle.json``
    files are excluded, and directories that do not exist are skipped.
    """
    base = _sidecar_dir()
    collected: list[Path] = []
    for directory in (base, base / "mcp"):
        if not directory.is_dir():
            continue
        for candidate in sorted(directory.glob("*.json")):
            name = candidate.name
            if name.startswith(".") or name.endswith(".lifecycle.json"):
                continue
            collected.append(candidate)
    return collected
def _sidecar_index_cache_key() -> tuple[tuple[Path, float, int], ...]:
    """Return a fingerprint of the sidecar files for cache invalidation.

    The key is one ``(resolved path, mtime, size)`` tuple per sidecar file.
    When there are no usable files, the sidecar directories themselves are
    fingerprinted instead. Entries whose ``stat()`` fails (for example a file
    deleted after it was listed) are skipped instead of raising, matching
    ``_sidecar_filter_signature``.
    """

    def _stat_key(target: Path) -> tuple[Path, float, int] | None:
        try:
            info = target.stat()
            return (target.resolve(), info.st_mtime, info.st_size)
        except OSError:
            return None

    keys: list[tuple[Path, float, int]] = []
    for path in _sidecar_files():
        key = _stat_key(path)
        if key is not None:
            keys.append(key)
    if keys:
        return tuple(keys)
    for root in (_sidecar_dir(), _sidecar_dir() / "mcp"):
        if not root.is_dir():
            continue
        key = _stat_key(root)
        if key is not None:
            keys.append(key)
    return tuple(keys)
def _sidecar_index() -> dict[tuple[str, str], dict]:
    """Return sidecars keyed by ``(slug, entity_type)``, rebuilt when files change.

    The cached index is reused while the cache key from
    ``_sidecar_index_cache_key`` is unchanged. When two files resolve to the
    same key, the one listed first is kept.
    """
    global _SIDECAR_INDEX_CACHE_KEY, _SIDECAR_INDEX_CACHE_VALUE
    current_key = _sidecar_index_cache_key()
    cached = _SIDECAR_INDEX_CACHE_VALUE
    if cached is not None and _SIDECAR_INDEX_CACHE_KEY == current_key:
        return cached
    rebuilt: dict[tuple[str, str], dict] = {}
    for sidecar_path in _sidecar_files():
        record = _read_sidecar_file(sidecar_path)
        if record is not None:
            identity = (
                str(record.get("slug") or sidecar_path.stem),
                _sidecar_entity_type(record),
            )
            rebuilt.setdefault(identity, record)
    _SIDECAR_INDEX_CACHE_KEY = current_key
    _SIDECAR_INDEX_CACHE_VALUE = rebuilt
    return rebuilt
def _all_sidecars() -> list[dict]:
    """Return every indexed sidecar as a new list."""
    index = _sidecar_index()
    return [*index.values()]
def _skills_page_int(
    value: str | None,
    *,
    default: int,
    minimum: int = 1,
    maximum: int | None = None,
) -> int:
    """Parse a paging query value into a clamped integer.

    A missing or non-integer *value* becomes *default*. The result is then
    raised to at least *minimum* and, when *maximum* is given, lowered to at
    most *maximum*.
    """
    text = str(value or "").strip()
    try:
        number = int(text)
    except ValueError:
        number = default
    if number < minimum:
        number = minimum
    if maximum is not None and number > maximum:
        number = maximum
    return number
def _skills_query_values(raw: str | None, allowed: set[str]) -> set[str]:
    """Split a comma-separated query value and keep only the allowed tokens.

    Tokens are stripped of surrounding whitespace; empty tokens and tokens
    not in *allowed* are dropped.
    """
    tokens = (piece.strip() for piece in str(raw or "").split(","))
    return {token for token in tokens if token and token in allowed}
def _sidecar_sort_key(sidecar: dict) -> tuple[str, float, str]:
    """Return a sort key ordering by grade, then descending score, then slug.

    A missing grade sorts as ``"F"``. The score is ``raw_score``, falling
    back to ``score`` and then 0.0, and is negated so higher scores come
    first.
    """
    grade = str(sidecar.get("grade") or "F")
    score = float(sidecar.get("raw_score") or sidecar.get("score") or 0.0)
    slug = str(sidecar.get("slug") or "")
    return grade, -score, slug
def _sidecar_card_payload(sidecar: dict) -> dict[str, Any]:
    """Build the card dict shown on /skills for one sidecar.

    It carries slug, grade, type, hard floor and raw score, plus
    percent-encoded links to the sidecar, wiki and graph pages.
    """
    slug = str(sidecar.get("slug") or "")
    entity_type = _sidecar_entity_type(sidecar)
    slug_q = quote(slug)
    type_q = quote(entity_type)
    return {
        "slug": slug,
        "grade": str(sidecar.get("grade") or "F"),
        "type": entity_type,
        "hard_floor": str(sidecar.get("hard_floor") or ""),
        "raw_score": float(sidecar.get("raw_score") or sidecar.get("score") or 0.0),
        "sidecar_href": f"/skill/{slug_q}?type={type_q}",
        "wiki_href": f"/wiki/{slug_q}?type={type_q}",
        "graph_href": f"/graph?slug={slug_q}&type={type_q}",
    }
def _sidecar_filter_signature(files: list[Path]) -> tuple[Any, ...]:
    """Fingerprint *files* so the filter cache can detect sidecar changes.

    Each file that can be stat'ed contributes ``(resolved path, mtime in ns,
    size)``. When none can, the two sidecar directories are fingerprinted
    instead, with zeros for a directory that does not exist.
    """

    def _mtime_ns(info: Any) -> int:
        return int(getattr(info, "st_mtime_ns", int(info.st_mtime * 1_000_000_000)))

    entries: list[tuple[str, int, int]] = []
    for sidecar_path in files:
        try:
            info = sidecar_path.stat()
        except OSError:
            continue
        entries.append(
            (str(sidecar_path.resolve()), _mtime_ns(info), int(info.st_size))
        )
    if entries:
        return tuple(entries)
    for root in (_sidecar_dir(), _sidecar_dir() / "mcp"):
        if root.is_dir():
            root_info = root.stat()
            entries.append((str(root.resolve()), _mtime_ns(root_info), 0))
        else:
            entries.append((str(root.resolve()), 0, 0))
    return tuple(entries)
def _sidecar_candidate_files(
    files: list[Path],
    *,
    q: str,
    types: set[str],
) -> list[Path]:
    """Narrow *files* by file name and directory before any file is opened.

    *q* is matched case-insensitively against each file stem. When *types*
    is exactly ``{"mcp-server"}`` only files under an ``mcp`` directory are
    kept; when it excludes ``mcp-server`` those files are dropped; otherwise
    the directory is not used as a filter.
    """
    needle = q.lower()
    if needle:
        matched = [path for path in files if needle in path.stem.lower()]
    else:
        matched = list(files)
    if not types:
        return matched
    wants_mcp = "mcp-server" in types
    if wants_mcp and len(types) == 1:
        return [path for path in matched if path.parent.name == "mcp"]
    if not wants_mcp:
        return [path for path in matched if path.parent.name != "mcp"]
    return matched
def _filtered_sidecar_records(
    files: list[Path],
    *,
    q: str,
    types: set[str],
    grades: set[str],
    hide_floor: bool,
) -> list[dict[str, Any]]:
    """Return cached filtered sidecar card records for /skills search.

    Results are memoized per filter combination. The whole memo is dropped
    when the sidecar file signature changes, and emptied when it already
    holds 32 entries before a new one is stored.
    """
    global _SIDECAR_FILTER_CACHE_SIGNATURE, _SIDECAR_FILTER_CACHE_VALUE
    current_signature = _sidecar_filter_signature(files)
    if current_signature != _SIDECAR_FILTER_CACHE_SIGNATURE:
        _SIDECAR_FILTER_CACHE_SIGNATURE = current_signature
        _SIDECAR_FILTER_CACHE_VALUE = {}
    memo_key = (q.lower(), tuple(sorted(types)), tuple(sorted(grades)), hide_floor)
    hit = _SIDECAR_FILTER_CACHE_VALUE.get(memo_key)
    if hit is not None:
        return hit
    cards: list[dict[str, Any]] = [
        _sidecar_card_payload(sidecar)
        for path in _sidecar_candidate_files(files, q=q, types=types)
        if (sidecar := _read_sidecar_file(path)) is not None
        and _sidecar_matches_filters(
            sidecar,
            q=q,
            types=types,
            grades=grades,
            hide_floor=hide_floor,
        )
    ]
    cards.sort(key=_sidecar_sort_key)
    if len(_SIDECAR_FILTER_CACHE_VALUE) >= 32:
        _SIDECAR_FILTER_CACHE_VALUE.clear()
    _SIDECAR_FILTER_CACHE_VALUE[memo_key] = cards
    return cards
def _sidecar_matches_filters(
    sidecar: dict,
    *,
    q: str,
    types: set[str],
    grades: set[str],
    hide_floor: bool,
) -> bool:
    """Return whether *sidecar* passes every active /skills filter.

    An empty *types* or *grades* set disables that filter. *hide_floor*
    rejects sidecars with a non-empty ``hard_floor``. *q* is a
    case-insensitive substring match against the sidecar slug.
    """
    if types and _sidecar_entity_type(sidecar) not in types:
        return False
    if grades and str(sidecar.get("grade") or "F") not in grades:
        return False
    if hide_floor and str(sidecar.get("hard_floor") or ""):
        return False
    if not q:
        return True
    return q.lower() in str(sidecar.get("slug") or "").lower()
def _sidecar_page_payload(qs: dict[str, str] | None = None) -> dict[str, Any]:
    """Return a paginated sidecar payload for /skills and its JSON API.

    Recognized query keys are ``page``, ``limit``, ``q``, ``type``, ``grade``
    and ``hide_floor``. With any filter active, the filtered records are
    paged. Without filters only the files of the requested page are read, and
    they are sorted only when the whole catalog fits on one page. A page
    number beyond the last page is re-resolved as the last page.
    """
    params = qs or {}
    page = _skills_page_int(params.get("page"), default=1)
    limit = _skills_page_int(
        params.get("limit"),
        default=_SKILLS_PAGE_DEFAULT_LIMIT,
        maximum=_SKILLS_PAGE_MAX_LIMIT,
    )
    q = str(params.get("q") or "").strip()
    types = _skills_query_values(params.get("type"), set(_DASHBOARD_ENTITY_TYPES))
    grades = _skills_query_values(params.get("grade"), {"A", "B", "C", "D", "F"})
    truthy_flags = {"1", "true", "yes", "on"}
    hide_floor = str(params.get("hide_floor") or "").strip().lower() in truthy_flags

    files = _sidecar_files()
    catalog_total = len(files)
    has_filters = bool(q or types or grades or hide_floor)
    offset = (page - 1) * limit
    window = slice(offset, offset + limit)
    if has_filters:
        matches = _filtered_sidecar_records(
            files,
            q=q,
            types=types,
            grades=grades,
            hide_floor=hide_floor,
        )
        total = len(matches)
        page_sidecars = matches[window]
    else:
        total = catalog_total
        page_sidecars = []
        for path in files[window]:
            sidecar = _read_sidecar_file(path)
            if sidecar is not None:
                page_sidecars.append(sidecar)
        if catalog_total <= limit:
            page_sidecars.sort(key=_sidecar_sort_key)

    pages = max(1, math.ceil(total / limit)) if total else 1
    if page > pages:
        # Out-of-range page: answer as if the last page had been requested.
        return _sidecar_page_payload({
            **params,
            "page": str(pages),
            "limit": str(limit),
        })
    return {
        "items": [_sidecar_card_payload(sidecar) for sidecar in page_sidecars],
        "total": total,
        "catalog_total": catalog_total,
        "page": page,
        "limit": limit,
        "pages": pages,
        "has_next": page < pages,
        "has_prev": page > 1,
        "filtered": has_filters,
        "q": q,
        "types": sorted(types),
        "grades": sorted(grades),
        "hide_floor": hide_floor,
    }
# ─── Aggregations ────────────────────────────────────────────────────────────
def _summarize_sessions() -> list[dict]:
    """Join audit-log session events with skill-events.jsonl load/unloads.

    Rows from both logs are grouped by ``session_id`` (``"unknown"`` when
    absent). Each session tracks its first and last timestamp, the sets of
    loaded and unloaded skills, agents and MCP servers, and counters for score
    updates and lifecycle transitions. Event names and subjects are coerced
    to strings so a malformed row (null or non-string value) cannot break the
    summary.

    Returns:
        One dict per session with the sets rendered as sorted lists, ordered
        by ``last_seen`` descending.
    """
    set_fields = (
        "skills_loaded",
        "skills_unloaded",
        "agents_loaded",
        "agents_unloaded",
        "mcps_loaded",
        "mcps_unloaded",
    )
    audit_buckets = {
        "skill.loaded": "skills_loaded",
        "skill.unloaded": "skills_unloaded",
        "agent.loaded": "agents_loaded",
        "agent.unloaded": "agents_unloaded",
    }
    lifecycle_events = frozenset({
        "skill.archived", "skill.demoted", "skill.restored", "skill.deleted",
        "agent.archived", "agent.demoted", "agent.restored", "agent.deleted",
    })
    bucket_prefix = {"agent": "agents", "mcp-server": "mcps"}

    audit = _read_jsonl(_audit_log_path())
    events = _read_jsonl(_events_jsonl_path())

    def _new_row() -> dict[str, Any]:
        return {
            "session_id": "",
            "first_seen": None,
            "last_seen": None,
            "skills_loaded": set(),
            "skills_unloaded": set(),
            "agents_loaded": set(),
            "agents_unloaded": set(),
            "mcps_loaded": set(),
            "mcps_unloaded": set(),
            "score_updates": 0,
            "lifecycle_transitions": 0,
        }

    by_session: dict[str, dict[str, Any]] = defaultdict(_new_row)

    def _session_row(record: dict, ts_field: str) -> dict[str, Any]:
        """Fetch the session's row and widen its first/last-seen window."""
        sid = record.get("session_id") or "unknown"
        row = by_session[sid]
        row["session_id"] = sid
        ts = record.get(ts_field)
        if ts and (row["first_seen"] is None or ts < row["first_seen"]):
            row["first_seen"] = ts
        if ts and (row["last_seen"] is None or ts > row["last_seen"]):
            row["last_seen"] = ts
        return row

    for line in audit:
        row = _session_row(line, "ts")
        event = str(line.get("event") or "")
        subject = str(line.get("subject") or "")
        bucket = audit_buckets.get(event)
        if bucket is not None:
            row[bucket].add(subject)
        elif event == "toolbox.triggered":
            # MCP load/unload is logged as a toolbox event with the action in meta.
            raw_meta = line.get("meta")
            meta: dict[str, Any] = raw_meta if isinstance(raw_meta, dict) else {}
            if meta.get("entity_type") == "mcp-server":
                action = meta.get("action")
                if action == "loaded":
                    row["mcps_loaded"].add(subject)
                elif action == "unloaded":
                    row["mcps_unloaded"].add(subject)
        elif event.endswith(".score_updated"):
            row["score_updates"] += 1
        elif event in lifecycle_events:
            row["lifecycle_transitions"] += 1

    for line in events:
        row = _session_row(line, "timestamp")
        action = line.get("event")
        entity_type = (
            _audit_entity_type(line)
            or ("agent" if line.get("agent") else None)
            or ("mcp-server" if line.get("mcp") or line.get("mcp_server") else None)
            or ("skill" if line.get("skill") else None)
        )
        if entity_type == "agent":
            subject_value = line.get("agent")
        elif entity_type == "mcp-server":
            subject_value = line.get("mcp") or line.get("mcp_server")
        else:
            subject_value = line.get("skill")
        if not subject_value or action not in ("load", "unload"):
            continue
        prefix = bucket_prefix.get(entity_type, "skills")
        suffix = "loaded" if action == "load" else "unloaded"
        row[f"{prefix}_{suffix}"].add(str(subject_value))

    summaries: list[dict] = []
    for row in by_session.values():
        summary = dict(row)
        for field in set_fields:
            summary[field] = sorted(row[field])
        summaries.append(summary)
    summaries.sort(key=lambda r: r.get("last_seen") or "", reverse=True)
    return summaries
def _grade_distribution() -> dict[str, int]:
    """Count indexed sidecars per grade A-F; other grade values are ignored."""
    tally = dict.fromkeys("ABCDF", 0)
    for sidecar in _all_sidecars():
        grade = sidecar.get("grade")
        if grade in tally:
            tally[grade] += 1
    return tally
def _grade_distribution_payload() -> dict[str, Any]:
    """Return the grade counts together with their sum."""
    per_grade = _grade_distribution()
    graded_total = sum(per_grade.values())
    return {"grades": per_grade, "total": graded_total}
def _session_detail(session_id: str) -> dict:
    """Return the audit rows and load/unload events recorded for one session."""

    def _for_session(rows: list[dict]) -> list[dict]:
        return [row for row in rows if row.get("session_id") == session_id]

    audit_rows = _read_jsonl(_audit_log_path())
    event_rows = _read_jsonl(_events_jsonl_path())
    return {
        "session_id": session_id,
        "audit_entries": _for_session(audit_rows),
        "load_events": _for_session(event_rows),
    }
# ─── HTML rendering ──────────────────────────────────────────────────────────
def _monitor_asset_text(name: str) -> str:
    """Return the UTF-8 text of ``assets/<name>`` from the ``ctx`` package resources."""
    asset = files("ctx").joinpath("assets", name)
    return asset.read_text(encoding="utf-8")
def _monitor_inline_script(name: str) -> str:
    """Wrap a packaged script asset, with trailing whitespace stripped, in a ``<script>`` tag."""
    source = _monitor_asset_text(name).rstrip()
    return "<script>\n" + source + "\n</script>"
# Dashboard stylesheet, read from the packaged assets once at import time and
# inlined into every page's <style> tag by _layout().
_CSS = _monitor_asset_text("monitor.css")
def _layout(title: str, body: str) -> str:
    """Wrap body HTML in the standard page chrome.

    Args:
        title: Page title; HTML-escaped and suffixed with an em dash and
            ``ctx monitor``.
        body: Already-rendered HTML placed after the navigation bar. It is
            inserted verbatim, so callers are responsible for escaping.

    Returns:
        A complete HTML document with the inline stylesheet, the navigation
        bar (whose links carry ``draggable='true'`` for reordering), a reset
        button, and the ``monitor-nav.js`` script.
    """
    nav_items = (
        ("home", "Home", "/"),
        ("loaded", "Loaded", "/loaded"),
        ("skills", "Skills", "/skills"),
        ("wiki", "Wiki", "/wiki"),
        ("graph", "Graph", "/graph"),
        ("manage", "Manage", "/manage"),
        ("harness", "Harness Setup", "/harness"),
        ("docs", "Docs", "/docs"),
        ("config", "Config", "/config"),
        ("status", "Status", "/status"),
        ("kpi", "KPIs", "/kpi"),
        ("runtime", "Runtime", "/runtime"),
        ("sessions", "Sessions", "/sessions"),
        ("logs", "Logs", "/logs"),
        ("events", "Live", "/events"),
    )
    nav_html = "".join(
        f"<a href='{html.escape(href)}' data-nav-key='{html.escape(key)}' "
        "draggable='true' title='Drag to reorder dashboard tabs'>"
        f"{html.escape(label)}</a>"
        for key, label, href in nav_items
    )
    # The default key order is embedded as escaped JSON in a data attribute.
    nav_default_keys = html.escape(
        json.dumps([key for key, _label, _href in nav_items]),
        quote=True,
    )
    return (
        "<!doctype html><html><head><meta charset='utf-8'>"
        "<meta name='viewport' content='width=device-width, initial-scale=1'>"
        # Escape sequence keeps the em dash intact regardless of file encoding.
        f"<title>{html.escape(title)} \u2014 ctx monitor</title>"
        f"<style>{_CSS}</style></head><body>"
        "<div class='nav' id='dashboard-nav' "
        "data-nav-storage-key='ctx-monitor-nav-order' "
        f"data-nav-default-keys='{nav_default_keys}' "
        "aria-label='Dashboard navigation'>"
        + nav_html
        + "<button type='button' id='nav-reset' class='nav-reset' "
        "title='Reset dashboard tab order'>reset</button>"
        "</div>"
        + _monitor_inline_script("monitor-nav.js")
        + body
        + "</body></html>"
    )
# ─── Graph neighborhood (for /graph) ────────────────────────────────────────
def _graph_slug_from_node_id(node_id: str) -> str:
    """Return the part of *node_id* after its first ``:``, or all of it when there is none."""
    _prefix, separator, remainder = node_id.partition(":")
    return remainder if separator else node_id
def _resolve_graph_center(
    G: Any,
    slug: str,
    entity_type: str | None,
) -> tuple[str | None, dict[str, str] | None, list[str]]:
    """Map a focus query onto one node id of graph ``G``.

    Returns ``(center, resolved, suggestions)``:

    * exact ``<type>:<slug>`` hit: ``(node_id, None, [slug])``;
    * fuzzy hit: the best-ranked node id, a ``{"query", "slug", "id"}``
      mapping describing the substitution, and up to eight display slugs;
    * rejected or unmatched query: ``(None, None, [])``.

    Fuzzy candidates are ranked by match kind (equal, prefix, substring,
    all-tokens), then shorter slug, then higher degree.
    """
    raw_query = str(slug or "").strip()
    if not raw_query:
        return None, None, []
    # Anything that looks like a path is refused before normalization.
    if any(marker in raw_query for marker in ("/", "\\", "..")):
        return None, None, []
    normalized_query = _slugish(raw_query)
    if not (normalized_query and _is_safe_slug(normalized_query)):
        return None, None, []

    if entity_type is None:
        entity_types = _DASHBOARD_ENTITY_TYPES
    else:
        entity_types = (entity_type,)

    # Pass 1: exact node-id lookup, raw spelling first, then normalized.
    for current_type in entity_types:
        for candidate_slug in (raw_query, normalized_query):
            candidate = f"{current_type}:{candidate_slug}"
            if candidate in G:
                return candidate, None, [candidate_slug]

    # Pass 2: fuzzy scan over every node of an accepted type.
    query_tokens = set(normalized_query.split("-"))
    ranked: list[tuple[tuple[int, int, int], str, str]] = []
    for node_id in G.nodes:
        node_key = str(node_id)
        if _graph_type_from_node_id(node_key) not in entity_types:
            continue
        data = G.nodes.get(node_id, {})
        node_slug = _graph_slug_from_node_id(node_key)
        label = _display_label(data.get("label"), fallback_slug=node_slug)
        haystacks = {
            _slugish(node_slug),
            _slugish(_display_slug(node_slug)),
            _slugish(label),
        }
        tags = data.get("tags", [])
        if isinstance(tags, list):
            # Only the first dozen tags take part in matching.
            haystacks.update(_slugish(str(tag)) for tag in tags[:12])

        if normalized_query in haystacks:
            rank = 0
        elif any(h.startswith(normalized_query) for h in haystacks):
            rank = 1
        elif any(normalized_query in h for h in haystacks):
            rank = 2
        elif query_tokens and all(
            any(token in h for h in haystacks) for token in query_tokens
        ):
            rank = 3
        else:
            continue

        try:
            degree = int(G.degree[node_id])
        except Exception:  # noqa: BLE001
            degree = 0
        ranked.append(((rank, len(node_slug), -degree), node_key, node_slug))

    if not ranked:
        return None, None, []

    ranked.sort(key=lambda entry: entry[0])
    suggestions: list[str] = []
    for _sort_key, _matched_id, matched_slug in ranked[:8]:
        shown = _display_slug(matched_slug)
        if shown not in suggestions:
            suggestions.append(shown)

    center = ranked[0][1]
    resolved = {
        "query": raw_query,
        "slug": _graph_slug_from_node_id(center),
        "id": center,
    }
    return center, resolved, suggestions
def _unit_score(value: Any) -> float | None:
    """Coerce ``value`` to a float clamped into ``[0.0, 1.0]``.

    Returns ``None`` when ``value`` cannot be converted to a float, when the
    result is not finite (NaN or an infinity), or when it is an integer too
    large to be represented as a float.
    """
    try:
        score = float(value)
    except (TypeError, ValueError, OverflowError):
        # OverflowError: float() of an int beyond float range (e.g. 10**400
        # parsed from JSON); treated like any other unusable score.
        return None
    if not math.isfinite(score):
        return None
    return max(0.0, min(1.0, score))
def _format_count(value: Any) -> str:
    """Format ``value`` as an integer with thousands separators.

    Returns ``"0"`` when ``value`` cannot be converted to an int, including
    NaN and infinite floats.
    """
    try:
        return f"{int(value):,}"
    except (TypeError, ValueError, OverflowError):
        # int(float("inf")) raises OverflowError; int(float("nan")) ValueError.
        return "0"
def _load_direct_sidecar(slug: str, entity_type: str | None = None) -> dict | None:
    """Load the sidecar JSON for ``slug`` from the sidecar directory.

    Looks at ``<sidecars>/<slug>.json`` first, then ``<sidecars>/mcp/<slug>.json``.
    The first readable sidecar whose entity type equals ``entity_type`` (or
    any readable sidecar when ``entity_type`` is ``None``) is returned;
    otherwise ``None``. Unsafe slugs are rejected without touching the disk.
    """
    if not _is_safe_slug(slug):
        return None
    filename = f"{slug}.json"
    candidates = (
        _sidecar_dir() / filename,
        _sidecar_dir() / "mcp" / filename,
    )
    for candidate in candidates:
        sidecar = _read_sidecar_file(candidate) if candidate.exists() else None
        if sidecar is None:
            continue
        type_matches = (
            entity_type is None or _sidecar_entity_type(sidecar) == entity_type
        )
        if type_matches:
            return sidecar
    return None
def _sidecar_score_inputs(slug: str, entity_type: str) -> tuple[float | None, float | None]:
    """Return ``(quality, usage)`` unit scores read from an entity's sidecar.

    ``quality`` comes from the sidecar's ``score`` key (``raw_score`` when
    ``score`` is absent); ``usage`` from ``signals.telemetry.score``. Either
    element is ``None`` when the value is missing or unusable, and both are
    ``None`` when no matching sidecar is found.
    """
    sidecar = _load_direct_sidecar(slug, entity_type=entity_type)
    if not isinstance(sidecar, dict):
        return None, None

    raw_quality = sidecar.get("score", sidecar.get("raw_score"))
    quality = _unit_score(raw_quality)

    signals = sidecar.get("signals")
    telemetry = signals.get("telemetry") if isinstance(signals, dict) else None
    usage = _unit_score(telemetry.get("score")) if isinstance(telemetry, dict) else None
    return quality, usage
def _graph_node_size(
    nid: str,
    data: dict[str, Any],
    *,
    entity_type: str,
    degree: int,
    max_degree: int,
) -> dict[str, Any]:
    """Compute bounded visual-size metadata for one graph node.

    Quality and usage are taken from the node attributes and, when either is
    unusable, from the entity's sidecar. Missing quality defaults to 0.35 and
    missing usage to 0.0. Popularity is the log-scaled ratio of ``degree`` to
    ``max_degree``. The blended signal (45% quality, 35% usage, 20%
    popularity) is clamped to ``[0, 1]`` and mapped to a size of 8 to 24.
    """
    quality = _unit_score(data.get("quality_score"))
    usage = _unit_score(data.get("usage_score"))
    if quality is None or usage is None:
        fallback_quality, fallback_usage = _sidecar_score_inputs(
            _graph_slug_from_node_id(nid),
            entity_type,
        )
        if quality is None:
            quality = fallback_quality
        if usage is None:
            usage = fallback_usage

    quality_value = quality if quality is not None else 0.35
    usage_value = usage if usage is not None else 0.0

    if max_degree > 0:
        popularity = math.log1p(max(0, degree)) / math.log1p(max(1, max_degree))
    else:
        popularity = 0.0

    blended = 0.45 * quality_value + 0.35 * usage_value + 0.20 * popularity
    signal = max(0.0, min(1.0, blended))
    reason = (
        f"quality {quality_value:.3f}; usage {usage_value:.3f}; "
        f"popularity {popularity:.3f}"
    )
    return {
        "node_size": round(8.0 + signal * 16.0, 2),
        "size_signal": round(signal, 4),
        "size_reason": reason,
    }
def _dashboard_graph_index_path() -> Path:
    """Return the expected location of the dashboard neighborhood SQLite index."""
    graph_out = _wiki_dir() / "graphify-out"
    return graph_out / "dashboard-neighborhoods.sqlite3"
def _dashboard_graph_manifest_export_id() -> str | None:
    """Return the ``export_id`` recorded in the wiki's graph export manifest.

    Reads ``graphify-out/graph-export-manifest.json`` under the wiki
    directory. Returns the stripped id, or ``None`` when the manifest is
    missing, unreadable, not valid UTF-8 JSON, not a JSON object, or has no
    non-blank string ``export_id``.
    """
    manifest_path = _wiki_dir() / "graphify-out" / "graph-export-manifest.json"
    try:
        data = json.loads(manifest_path.read_text(encoding="utf-8"))
    except (OSError, ValueError):
        # ValueError covers json.JSONDecodeError and the UnicodeDecodeError
        # raised by read_text() on a manifest that is not valid UTF-8.
        return None
    export_id = data.get("export_id") if isinstance(data, dict) else None
    if not isinstance(export_id, str) or not export_id.strip():
        return None
    return export_id.strip()
def _dashboard_index_meta(index_path: Path) -> dict[str, Any] | None:
    """Read the ``meta`` table of the SQLite index at ``index_path``.

    The database is opened read-only. Returns ``{key: decoded JSON value}``,
    or ``None`` when the database cannot be opened or queried, or when a
    stored value is not valid JSON.
    """
    uri = f"file:{index_path.as_posix()}?mode=ro"
    try:
        connection = sqlite3.connect(uri, uri=True)
    except sqlite3.Error:
        return None
    try:
        pairs = connection.execute("SELECT key,value FROM meta").fetchall()
    except sqlite3.Error:
        return None
    finally:
        connection.close()

    meta: dict[str, Any] = {}
    try:
        for key, value in pairs:
            meta[str(key)] = json.loads(str(value))
    except (TypeError, ValueError, json.JSONDecodeError):
        return None
    return meta
def _dashboard_index_matches_manifest(index_path: Path) -> bool:
    """Tell whether the index's ``export_id`` equals the manifest's.

    ``False`` when the manifest has no export id or the index metadata
    cannot be read.
    """
    expected = _dashboard_graph_manifest_export_id()
    if expected is None:
        return False
    meta = _dashboard_index_meta(index_path)
    return meta is not None and meta.get("export_id") == expected
def _dashboard_graph_has_runtime_overlays() -> bool:
    """Return ``True`` when ``entity-overlays.jsonl`` exists and is non-empty.

    Filesystem errors while probing the file count as "no overlays".
    """
    overlay = _wiki_dir() / "graphify-out" / "entity-overlays.jsonl"
    try:
        if not overlay.is_file():
            return False
        return overlay.stat().st_size > 0
    except OSError:
        return False
def _overlay_index_coverage_key(index_path: Path, overlay: Path) -> tuple[Any, ...] | None:
    """Build a cache key identifying the current index/overlay/manifest state.

    The key combines resolved path, mtime and size of both files with the
    manifest export id. Returns ``None`` when either file cannot be stat'ed.
    """
    try:
        index_stat = index_path.stat()
        overlay_stat = overlay.stat()
    except OSError:
        return None
    index_part = (index_path.resolve(), index_stat.st_mtime, index_stat.st_size)
    overlay_part = (overlay.resolve(), overlay_stat.st_mtime, overlay_stat.st_size)
    return (*index_part, *overlay_part, _dashboard_graph_manifest_export_id())
def _active_dashboard_overlay_records(overlay: Path) -> list[dict[str, Any]] | None:
    """Parse the overlay JSONL file and return its active records.

    Blank lines are skipped. Returns ``None`` when the file cannot be read,
    a line is not valid JSON, or a line decodes to something other than an
    object. Surviving rows are filtered through ``active_overlay_records``
    and copied into plain dicts.
    """
    try:
        from ctx.core.graph.entity_overlays import active_overlay_records

        payloads = []
        for raw_line in overlay.read_text(encoding="utf-8").splitlines():
            text = raw_line.strip()
            if text:
                payload = json.loads(text)
                if not isinstance(payload, dict):
                    return None
                payloads.append(payload)
        return [dict(record) for record in active_overlay_records(payloads)]
    except (OSError, json.JSONDecodeError, TypeError, ValueError):
        return None
def _dashboard_overlay_matches_known_release(overlay: Path) -> bool:
    """Check the overlay file against the SHA-256 pinned in ``ctx_init``.

    CRLF line endings are normalized to LF before hashing. Returns ``False``
    when the pinned digest is unavailable or empty, or the file cannot be
    read.
    """
    try:
        from ctx_init import _GRAPH_ENTITY_OVERLAY_SHA256 as expected_digest
    except (ImportError, AttributeError):
        return False
    if not (isinstance(expected_digest, str) and expected_digest):
        return False
    try:
        normalized = overlay.read_bytes().replace(b"\r\n", b"\n")
    except OSError:
        return False
    return hashlib.sha256(normalized).hexdigest() == expected_digest
def _dashboard_index_uncovered_overlay_nodes(
    conn: sqlite3.Connection,
    records: list[dict[str, Any]],
    *,
    require_edges: bool,
) -> set[str] | None:
    """Find overlay node ids that the SQLite index does not fully cover.

    A node is uncovered when it has no row in ``nodes``. With
    ``require_edges`` set, both endpoints of an overlay edge are also
    uncovered when either endpoint is missing or when neither endpoint lists
    the other in its ``neighbors`` payload.

    Returns the set of uncovered ids (possibly empty), or ``None`` when a
    record is malformed. ``conn`` is expected to yield rows addressable by
    column name (the caller in this file sets ``sqlite3.Row``).
    """
    missing: set[str] = set()
    neighbor_cache: dict[str, set[str]] = {}

    def is_indexed(node_id: str) -> bool:
        found = conn.execute(
            "SELECT 1 FROM nodes WHERE id=? LIMIT 1",
            (node_id,),
        ).fetchone()
        return bool(found)

    def neighbors_of(node_id: str) -> set[str]:
        # Each node's neighbor payload is decoded at most once per call.
        if node_id in neighbor_cache:
            return neighbor_cache[node_id]
        row = conn.execute(
            "SELECT payload FROM neighbors WHERE source=?",
            (node_id,),
        ).fetchone()
        targets: set[str] = set()
        if row is not None:
            try:
                payload = json.loads(zlib.decompress(row["payload"]).decode("utf-8"))
            except (TypeError, json.JSONDecodeError, zlib.error):
                payload = []
            if isinstance(payload, list):
                targets = {
                    str(entry.get("target"))
                    for entry in payload
                    if isinstance(entry, dict) and isinstance(entry.get("target"), str)
                }
        neighbor_cache[node_id] = targets
        return targets

    for record in records:
        nodes = record.get("nodes", [])
        edges = record.get("edges", [])
        if not (isinstance(nodes, list) and isinstance(edges, list)):
            return None
        # Edges without any declared nodes make the record malformed.
        if edges and not nodes:
            return None

        for node in nodes:
            if not isinstance(node, dict):
                return None
            node_id = node.get("id")
            if not isinstance(node_id, str):
                return None
            if not is_indexed(node_id):
                missing.add(node_id)

        if not require_edges:
            continue

        for edge in edges:
            if not isinstance(edge, dict):
                return None
            source, target = edge.get("source"), edge.get("target")
            if not (isinstance(source, str) and isinstance(target, str)):
                return None
            source_ok = is_indexed(source)
            target_ok = is_indexed(target)
            if not (source_ok and target_ok):
                missing.update((source, target))
                continue
            linked = target in neighbors_of(source) or source in neighbors_of(target)
            if not linked:
                missing.update((source, target))
    return missing
def _dashboard_uncovered_runtime_overlay_nodes(index_path: Path) -> set[str] | None:
    """Return overlay node ids that the index at ``index_path`` does not cover.

    * empty set: there is no overlay file, it is empty, or it cannot be probed;
    * ``None``: coverage cannot be determined (index missing or not matching
      the manifest, overlay unparsable, or a database error);
    * otherwise: the ids reported by the index coverage scan.

    Edge coverage is only required when the overlay is not the known release
    overlay.
    """
    overlay = _wiki_dir() / "graphify-out" / "entity-overlays.jsonl"
    try:
        overlay_present = overlay.is_file() and overlay.stat().st_size != 0
    except OSError:
        overlay_present = False
    if not overlay_present:
        return set()

    if not (index_path.is_file() and _dashboard_index_matches_manifest(index_path)):
        return None
    records = _active_dashboard_overlay_records(overlay)
    if records is None:
        return None

    require_edges = not _dashboard_overlay_matches_known_release(overlay)
    try:
        conn = sqlite3.connect(f"file:{index_path.as_posix()}?mode=ro", uri=True)
        conn.row_factory = sqlite3.Row
        try:
            return _dashboard_index_uncovered_overlay_nodes(
                conn,
                records,
                require_edges=require_edges,
            )
        finally:
            conn.close()
    except (OSError, sqlite3.Error, KeyError, TypeError):
        return None
def _dashboard_index_covers_runtime_overlays(index_path: Path) -> bool:
    """Return True when the shipped SQLite index already includes overlays.

    ``ctx-init`` may install ``entity-overlays.jsonl`` next to a graph
    export. Earlier dashboard code treated every overlay file as
    runtime-only and bypassed the SQLite fast path. Current release
    artifacts can already carry those overlay nodes and edges inside
    ``dashboard-neighborhoods.sqlite3``; loading the full graph merely to
    merge the same small known-release overlay would then be wasted
    cold-start work. Local/user overlays still fall back to the full graph
    merge so that newly attached edges stay visible.

    A missing, empty or unprobeable overlay counts as covered. The computed
    answer is cached per (index, overlay, manifest) state.
    """
    global _OVERLAY_INDEX_COVERAGE_CACHE_KEY, _OVERLAY_INDEX_COVERAGE_CACHE_VALUE

    overlay = _wiki_dir() / "graphify-out" / "entity-overlays.jsonl"
    try:
        if not overlay.is_file() or overlay.stat().st_size == 0:
            return True
    except OSError:
        return True
    if not (index_path.is_file() and _dashboard_index_matches_manifest(index_path)):
        return False

    cache_key = _overlay_index_coverage_key(index_path, overlay)
    cache_hit = (
        cache_key is not None
        and _OVERLAY_INDEX_COVERAGE_CACHE_KEY == cache_key
        and _OVERLAY_INDEX_COVERAGE_CACHE_VALUE is not None
    )
    if cache_hit:
        return _OVERLAY_INDEX_COVERAGE_CACHE_VALUE

    # Covered only when the scan succeeded and found nothing missing.
    uncovered = _dashboard_uncovered_runtime_overlay_nodes(index_path)
    coverage = uncovered == set()
    if cache_key is not None:
        _OVERLAY_INDEX_COVERAGE_CACHE_KEY = cache_key
        _OVERLAY_INDEX_COVERAGE_CACHE_VALUE = coverage
    return coverage
def _dashboard_graph_index_archives() -> list[Path]:
    """List the packaged graph archives that exist on disk.

    Candidates are ``graph/wiki-graph-runtime.tar.gz`` and
    ``graph/wiki-graph.tar.gz`` under the directory two levels above this
    module, in that order, de-duplicated by resolved path.
    """
    module_root = Path(__file__).resolve().parent.parent
    roots = (module_root,)
    names = ("wiki-graph-runtime.tar.gz", "wiki-graph.tar.gz")
    # dict.fromkeys keeps first-seen order while dropping duplicate paths.
    candidates = dict.fromkeys(
        (root / "graph" / name).resolve()
        for root in roots
        for name in names
    )
    return [candidate for candidate in candidates if candidate.is_file()]
def _packaged_graph_export_id() -> str | None:
    """Return the ``export_id`` from the packaged ``graph/communities.json``.

    The result is memoized in ``_PACKAGED_GRAPH_EXPORT_ID_CACHE``: a string
    holds the id, a bool marks a previous failed lookup (answered with
    ``None``), and any other value means "not looked up yet". A missing,
    unreadable, non-UTF-8 or non-JSON file, or one without a non-blank
    string ``export_id``, yields ``None``.
    """
    global _PACKAGED_GRAPH_EXPORT_ID_CACHE
    if isinstance(_PACKAGED_GRAPH_EXPORT_ID_CACHE, bool):
        return None
    if isinstance(_PACKAGED_GRAPH_EXPORT_ID_CACHE, str):
        return _PACKAGED_GRAPH_EXPORT_ID_CACHE
    module_root = Path(__file__).resolve().parent.parent
    try:
        data = json.loads(
            (module_root / "graph" / "communities.json").read_text(
                encoding="utf-8",
            )
        )
    except (OSError, ValueError):
        # ValueError covers json.JSONDecodeError and the UnicodeDecodeError
        # raised by read_text() when the file is not valid UTF-8.
        _PACKAGED_GRAPH_EXPORT_ID_CACHE = False
        return None
    export_id = data.get("export_id") if isinstance(data, dict) else None
    if isinstance(export_id, str) and export_id.strip():
        _PACKAGED_GRAPH_EXPORT_ID_CACHE = export_id.strip()
        return _PACKAGED_GRAPH_EXPORT_ID_CACHE
    _PACKAGED_GRAPH_EXPORT_ID_CACHE = False
    return None
def _archive_graph_export_id(archive: Path) -> str | None:
    """Read ``export_id`` from the manifest stored inside a ``.tar.gz`` archive.

    The manifest member is looked up as
    ``./graphify-out/graph-export-manifest.json`` and then without the
    leading ``./``. Returns the stripped id, or ``None`` when the archive or
    member cannot be read, the JSON is invalid, or no non-blank string id is
    present.
    """
    member_names = (
        "./graphify-out/graph-export-manifest.json",
        "graphify-out/graph-export-manifest.json",
    )
    try:
        with tarfile.open(archive, "r:gz") as tar:
            member = None
            for member_name in member_names:
                try:
                    member = tar.getmember(member_name)
                except KeyError:
                    continue
                break
            if member is None:
                return None
            source = tar.extractfile(member)
            if source is None:
                return None
            with source:
                raw = source.read()
            data = json.loads(raw.decode("utf-8", errors="replace"))
    except (KeyError, OSError, tarfile.TarError, json.JSONDecodeError):
        return None

    export_id = data.get("export_id") if isinstance(data, dict) else None
    if isinstance(export_id, str) and export_id.strip():
        return export_id.strip()
    return None
def _ensure_dashboard_graph_index() -> Path | None:
    """Return a dashboard SQLite index that matches the wiki manifest.

    An existing index whose ``export_id`` matches the manifest is returned
    as-is; a mismatching one is deleted. Otherwise the index member is
    extracted from the first suitable packaged archive into a temporary
    file, validated against the manifest, and moved into place.

    Returns ``None`` when no matching index can be provided: the manifest
    has no export id, it disagrees with the packaged export id, no archive
    exists, a stale index cannot be removed, or acquiring the lock raises
    ``TimeoutError``.
    """
    target = _dashboard_graph_index_path()
    if target.is_file():
        if _dashboard_index_matches_manifest(target):
            return target
        # Stale index: remove it so it can be re-extracted below.
        try:
            target.unlink()
        except OSError:
            return None
    manifest_export_id = _dashboard_graph_manifest_export_id()
    packaged_export_id = _packaged_graph_export_id()
    # The packaged archives belong to a different export than the installed
    # wiki; nothing extracted from them could match.
    if (
        manifest_export_id is not None
        and packaged_export_id is not None
        and manifest_export_id != packaged_export_id
    ):
        return None
    archives = _dashboard_graph_index_archives()
    if not archives:
        return None
    if manifest_export_id is None:
        return None
    target.parent.mkdir(parents=True, exist_ok=True)
    try:
        # NOTE(review): file_lock is defined elsewhere; presumably an
        # inter-process lock keyed on ``target`` -- confirm.
        with file_lock(target):
            # Re-check under the lock: the index may have appeared since the
            # first check above.
            if target.is_file():
                if _dashboard_index_matches_manifest(target):
                    return target
                try:
                    target.unlink()
                except OSError:
                    return None
            for archive in archives:
                archive_export_id = packaged_export_id or _archive_graph_export_id(archive)
                # Skip archives known to belong to another export.
                if manifest_export_id and archive_export_id and archive_export_id != manifest_export_id:
                    continue
                try:
                    with tarfile.open(archive, "r:gz") as tar:
                        # Member may be stored with or without a leading "./".
                        try:
                            member = tar.getmember(f"./{_DASHBOARD_INDEX_MEMBER}")
                        except KeyError:
                            member = tar.getmember(_DASHBOARD_INDEX_MEMBER)
                        if not member.isfile():
                            continue
                        source = tar.extractfile(member)
                        if source is None:
                            continue
                        # PID-suffixed temp name next to the target, so the
                        # final os.replace stays within one directory.
                        tmp = target.with_name(f".{target.name}.{os.getpid()}.tmp")
                        try:
                            # Copy in 1 MiB chunks until read() returns b"".
                            with tmp.open("wb") as out:
                                for chunk in iter(lambda: source.read(1024 * 1024), b""):
                                    out.write(chunk)
                            # Validate the extracted copy before publishing it.
                            if not _dashboard_index_matches_manifest(tmp):
                                continue
                            os.replace(tmp, target)
                            return target
                        finally:
                            source.close()
                            # Still present only when it was not moved into place.
                            if tmp.exists():
                                tmp.unlink()
                except (KeyError, OSError, tarfile.TarError):
                    # Unusable archive: try the next one.
                    continue
    except TimeoutError:
        return None
    return target if target.is_file() else None
def _index_node_size(
    *,
    slug: str,
    entity_type: str,
    quality: Any,
    usage: Any,
    degree: int,
    max_degree: int,
) -> dict[str, Any]:
    """Compute bounded visual-size metadata for a node read from the index.

    ``quality`` and ``usage`` are coerced to unit scores; when either is
    unusable the entity's sidecar is consulted. Missing quality defaults to
    0.35 and missing usage to 0.0. The blended signal (45% quality, 35%
    usage, 20% log-scaled popularity) is clamped to ``[0, 1]`` and mapped to
    a size of 8 to 24.
    """
    quality_score = _unit_score(quality)
    usage_score = _unit_score(usage)
    if quality_score is None or usage_score is None:
        fallback_quality, fallback_usage = _sidecar_score_inputs(slug, entity_type)
        if quality_score is None:
            quality_score = fallback_quality
        if usage_score is None:
            usage_score = fallback_usage

    q = quality_score if quality_score is not None else 0.35
    u = usage_score if usage_score is not None else 0.0

    if max_degree > 0:
        popularity = math.log1p(max(0, degree)) / math.log1p(max(1, max_degree))
    else:
        popularity = 0.0

    blended = 0.45 * q + 0.35 * u + 0.20 * popularity
    signal = max(0.0, min(1.0, blended))
    return {
        "node_size": round(8.0 + signal * 16.0, 2),
        "size_signal": round(signal, 4),
        "size_reason": f"quality {q:.3f}; usage {u:.3f}; popularity {popularity:.3f}",
    }
def _resolve_index_center(
    conn: sqlite3.Connection,
    raw_query: str,
    entity_type: str | None,
) -> tuple[str | None, dict[str, str] | None, list[str]]:
    """Map a focus query onto one node id using the SQLite index.

    Returns ``(center, resolved, suggestions)``:

    * exact ``slug_index`` hit: ``(node_id, None, [slug])``;
    * fuzzy hit: the best-ranked node id, a ``{"query", "slug", "id"}``
      mapping describing the substitution, and up to eight display slugs;
    * rejected or unmatched query: ``(None, None, [])``.

    Rows are read by column name, so ``conn`` must use ``sqlite3.Row``.
    """
    raw_query = str(raw_query or "").strip()
    if not raw_query:
        return None, None, []
    # Anything that looks like a path is refused before normalization.
    if any(marker in raw_query for marker in ("/", "\\", "..")):
        return None, None, []
    normalized_query = _slugish(raw_query)
    if not (normalized_query and _is_safe_slug(normalized_query)):
        return None, None, []

    entity_types = _DASHBOARD_ENTITY_TYPES if entity_type is None else (entity_type,)

    # Raw spelling first, then the normalized form when it differs.
    if raw_query == normalized_query:
        candidates = [raw_query]
    else:
        candidates = [raw_query, normalized_query]

    for current_type in entity_types:
        for candidate_slug in candidates:
            hit = conn.execute(
                "SELECT node_id FROM slug_index WHERE slug=? AND type=? LIMIT 1",
                (candidate_slug, current_type),
            ).fetchone()
            if hit is not None:
                return str(hit["node_id"]), None, [candidate_slug]

    # Fuzzy scan, restricted to one type only when the caller named one.
    sql = (
        "SELECT s.slug,s.type,s.node_id,n.label,n.tags,n.degree "
        "FROM slug_index s JOIN nodes n ON n.id=s.node_id "
    )
    params: list[Any] = []
    if entity_type is not None:
        sql += "WHERE s.type=?"
        params.append(entity_type)

    query_tokens = set(normalized_query.split("-"))
    ranked: list[tuple[tuple[int, int, int], str, str]] = []
    for row in conn.execute(sql, params):
        node_slug = str(row["slug"] or "")
        label = _display_label(row["label"], fallback_slug=node_slug)
        haystacks = {
            _slugish(node_slug),
            _slugish(_display_slug(node_slug)),
            _slugish(label),
        }
        try:
            tags = json.loads(row["tags"] or "[]")
        except (TypeError, json.JSONDecodeError):
            tags = []
        if isinstance(tags, list):
            # Only the first dozen tags take part in matching.
            haystacks.update(_slugish(str(tag)) for tag in tags[:12])

        if normalized_query in haystacks:
            rank = 0
        elif any(h.startswith(normalized_query) for h in haystacks):
            rank = 1
        elif any(normalized_query in h for h in haystacks):
            rank = 2
        elif query_tokens and all(
            any(token in h for h in haystacks) for token in query_tokens
        ):
            rank = 3
        else:
            continue

        try:
            degree = int(row["degree"] or 0)
        except (TypeError, ValueError):
            degree = 0
        ranked.append(((rank, len(node_slug), -degree), str(row["node_id"]), node_slug))

    if not ranked:
        return None, None, []

    ranked.sort(key=lambda entry: entry[0])
    suggestions: list[str] = []
    for _sort_key, _matched_id, matched_slug in ranked[:8]:
        shown = _display_slug(matched_slug)
        if shown not in suggestions:
            suggestions.append(shown)

    center = ranked[0][1]
    resolved = {
        "query": raw_query,
        "slug": _graph_slug_from_node_id(center),
        "id": center,
    }
    return center, resolved, suggestions
def _graph_neighborhood_from_index(
    slug: str,
    *,
    hops: int,
    limit: int,
    entity_type: str | None,
) -> dict | None:
    """Build a neighborhood payload from the dashboard SQLite index.

    Returns:
        * ``None`` when the index cannot answer the request: no usable index
          file, ``hops > 1``, ``limit`` above the index's ``top_k``, or any
          read/decoding failure. ``_graph_neighborhood`` in this file treats
          ``None`` as "fall through to the next strategy".
        * an empty neighborhood carrying ``suggestions`` when the query
          resolves to no node;
        * otherwise the result of ``dashboard_graph.enrich_neighborhood``.
    """
    index_path = _ensure_dashboard_graph_index()
    if index_path is None or not index_path.is_file():
        return None
    try:
        conn = sqlite3.connect(f"file:{index_path.as_posix()}?mode=ro", uri=True)
    except sqlite3.Error:
        return None
    conn.row_factory = sqlite3.Row
    try:
        meta = {
            row["key"]: json.loads(row["value"])
            for row in conn.execute("SELECT key,value FROM meta")
        }
        max_degree = int(meta.get("max_degree") or 1)
        top_k = int(meta.get("top_k") or 0)
        # The index stores a bounded neighbor list per node, so it cannot
        # serve multi-hop requests or more neighbors than it holds.
        if hops > 1 or (top_k > 0 and limit > top_k):
            return None
        center, resolved, suggestions = _resolve_index_center(conn, slug, entity_type)
        if center is None:
            return {"nodes": [], "edges": [], "center": None, "suggestions": suggestions}
        nodes_out: dict[str, dict[str, Any]] = {}
        edges_out: list[dict[str, Any]] = []
        emitted_edges: set[tuple[str, str]] = set()
        frontier = [center]
        seen = {center}

        def add_node(node_id: str, depth: int) -> None:
            """Add ``node_id`` to ``nodes_out`` once; ids not in the index are skipped."""
            if node_id in nodes_out:
                return
            row = conn.execute("SELECT * FROM nodes WHERE id=?", (node_id,)).fetchone()
            if row is None:
                return
            tags = json.loads(row["tags"] or "[]")
            degree = int(row["degree"] or 0)
            node_type = str(row["type"] or _graph_type_from_node_id(node_id))
            node_slug = _graph_slug_from_node_id(node_id)
            size_data = _index_node_size(
                slug=node_slug,
                entity_type=node_type,
                quality=row["quality_score"],
                usage=row["usage_score"],
                degree=degree,
                max_degree=max_degree,
            )
            label = _display_label(row["label"], fallback_slug=node_slug)
            nodes_out[node_id] = {
                "data": {
                    "id": node_id,
                    "label": label,
                    "type": node_type,
                    "depth": depth,
                    "degree": degree,
                    "tags": tags[:6],
                    "description": row["description"] or "",
                    "quality_score": row["quality_score"],
                    "usage_score": row["usage_score"],
                    "filter_tokens": [
                        node_id,
                        row["label"],
                        node_slug,
                        _display_slug(node_slug),
                        label,
                        *tags,
                    ],
                    **size_data,
                },
            }

        add_node(center, 0)
        for depth in range(1, hops + 1):
            next_frontier: list[str] = []
            for node_id in frontier:
                row = conn.execute(
                    "SELECT payload FROM neighbors WHERE source=?",
                    (node_id,),
                ).fetchone()
                if row is None:
                    continue
                # Neighbor payload: zlib-compressed UTF-8 JSON list of edges.
                neighbors = json.loads(zlib.decompress(row["payload"]).decode("utf-8"))
                for edge in neighbors:
                    if len(nodes_out) >= limit:
                        break
                    other = str(edge.get("target") or "")
                    if not other:
                        continue
                    add_node(other, depth)
                    # Undirected de-duplication: order the endpoint pair.
                    edge_a, edge_b = sorted((node_id, other))
                    edge_key = (edge_a, edge_b)
                    if edge_key not in emitted_edges and other in nodes_out:
                        emitted_edges.add(edge_key)
                        shared_tags = list(edge.get("shared_tags") or [])[:4]
                        for current in (node_id, other):
                            tokens = nodes_out[current]["data"].setdefault(
                                "filter_tokens", []
                            )
                            tokens.extend(shared_tags)
                        edges_out.append({
                            "data": {
                                "id": f"{edge_key[0]}__{edge_key[1]}",
                                "source": node_id,
                                "target": other,
                                "weight": edge.get("weight", 1),
                                "shared_tags": shared_tags,
                                "reasons": edge.get("reasons", []),
                                "semantic": edge.get("semantic"),
                                "tag_sim": edge.get("tag_sim"),
                                "slug_token_sim": edge.get("slug_token_sim"),
                                "source_overlap": edge.get("source_overlap"),
                            },
                        })
                    if other not in seen:
                        seen.add(other)
                        next_frontier.append(other)
                if len(nodes_out) >= limit:
                    break
            frontier = next_frontier
            if len(nodes_out) >= limit:
                break
        return dashboard_graph.enrich_neighborhood({
            "nodes": list(nodes_out.values()),
            "edges": edges_out,
            "center": center,
            "resolved": resolved or {"source": "dashboard-index"},
            "suggestions": [],
        }, source="dashboard-index")
    except (
        OSError,
        sqlite3.Error,
        json.JSONDecodeError,
        zlib.error,
        KeyError,
        # sqlite3.Row raises IndexError (not KeyError) for an unknown column
        # name, e.g. an index built with an older schema.
        IndexError,
        TypeError,
        # Non-numeric meta/degree values and non-UTF-8 payload bytes.
        ValueError,
    ):
        return None
    finally:
        conn.close()
def _graph_neighborhood(
    slug: str,
    hops: int = 1,
    limit: int = 40,
    entity_type: str | None = None,
) -> dict:
    """Return dashboard-shaped {nodes, edges} for the N-hop neighborhood.

    Strategy, in order:

    1. the SQLite dashboard index, when it covers any runtime overlays;
    2. for one-hop requests, the index anyway, provided the resolved center
       is not one of the overlay nodes the index fails to cover;
    3. the full graph from ``_load_dashboard_graph``.

    Returns an empty shape (``center`` is ``None``) when ``slug`` looks like
    a path, ``entity_type`` is given but not recognized, the graph cannot be
    loaded or is empty, or the slug resolves to no node.
    """
    if "/" in slug or "\\" in slug or ".." in slug:
        return {"nodes": [], "edges": [], "center": None}
    normalized_entity_type = _normalize_dashboard_entity_type(entity_type)
    # Reject an unrecognized type before any lookup. Otherwise the index
    # fast path would receive ``None`` and search every type, disagreeing
    # with the full-graph path, which returns an empty result.
    if entity_type is not None and normalized_entity_type is None:
        return {"nodes": [], "edges": [], "center": None}
    index_path = _dashboard_graph_index_path()
    has_runtime_overlays = _dashboard_graph_has_runtime_overlays()
    index_covers_overlays = (
        not has_runtime_overlays
        or _dashboard_index_covers_runtime_overlays(index_path)
    )
    if index_covers_overlays:
        indexed = _graph_neighborhood_from_index(
            slug,
            hops=hops,
            limit=limit,
            entity_type=normalized_entity_type,
        )
        if indexed is not None:
            return indexed
    elif hops == 1 and index_path.is_file() and _dashboard_index_matches_manifest(index_path):
        indexed = _graph_neighborhood_from_index(
            slug,
            hops=hops,
            limit=limit,
            entity_type=normalized_entity_type,
        )
        center = indexed.get("center") if isinstance(indexed, dict) else None
        uncovered = _dashboard_uncovered_runtime_overlay_nodes(index_path)
        # The indexed answer is only trusted when the center itself is not
        # one of the overlay nodes missing from the index.
        if indexed is not None and isinstance(center, str) and uncovered is not None:
            if center not in uncovered:
                return indexed
    try:
        G = _load_dashboard_graph()
    except Exception:  # noqa: BLE001 -- graph is advisory; blank on error
        return {"nodes": [], "edges": [], "center": None}
    if G.number_of_nodes() == 0:
        return {"nodes": [], "edges": [], "center": None}
    center, resolved, suggestions = _resolve_graph_center(
        G, slug, normalized_entity_type,
    )
    if center is None:
        return {"nodes": [], "edges": [], "center": None}
    nodes_out: dict[str, dict] = {}
    edges_out: list[dict] = []
    emitted_edges: set[tuple[str, str]] = set()
    frontier = [center]
    seen: set[str] = {center}
    try:
        max_degree = max((int(degree) for _node, degree in G.degree()), default=1)
    except Exception:  # noqa: BLE001
        max_degree = 1

    def _add_node(nid: str, depth: int) -> None:
        """Add ``nid`` to ``nodes_out`` once, with display and size metadata."""
        if nid in nodes_out:
            return
        data = dict(G.nodes.get(nid, {}))
        node_slug = nid.split(":", 1)[-1]
        label = _display_label(data.get("label"), fallback_slug=node_slug)
        tags = list(data.get("tags", []))
        # Fall back to the id prefix when the node carries no explicit type.
        default_type = (
            "mcp-server" if nid.startswith("mcp-server:")
            else "harness" if nid.startswith("harness:")
            else "agent" if nid.startswith("agent:")
            else "skill"
        )
        ntype = data.get("type") or default_type
        try:
            degree = int(G.degree[nid])
        except Exception:  # noqa: BLE001
            degree = 0
        size_data = _graph_node_size(
            nid,
            data,
            entity_type=str(ntype),
            degree=degree,
            max_degree=max_degree,
        )
        nodes_out[nid] = {
            "data": {
                "id": nid,
                "label": label,
                "type": ntype,
                "depth": depth,
                "degree": degree,
                "tags": tags[:6],
                "description": data.get("description", ""),
                "quality_score": data.get("quality_score"),
                "usage_score": data.get("usage_score"),
                "filter_tokens": [nid, label, node_slug, _display_slug(node_slug), *tags],
                **size_data,
            },
        }

    _add_node(center, 0)
    for depth in range(1, hops + 1):
        next_frontier: list[str] = []
        for nid in frontier:
            # Sort neighbors by edge weight so we pick the strongest
            # connections first under the ``limit`` cap.
            neighbors = sorted(
                G[nid].items(),
                key=lambda kv: -kv[1].get("weight", 1),
            )
            for other, edata in neighbors:
                if len(nodes_out) >= limit:
                    break
                _add_node(other, depth)
                # Undirected de-duplication: order the endpoint pair.
                edge_key = tuple(sorted((nid, other)))
                if edge_key not in emitted_edges:
                    emitted_edges.add(edge_key)
                    shared_tags = edata.get("shared_tags", [])[:4]
                    for node_id in (nid, other):
                        tokens = nodes_out[node_id]["data"].setdefault(
                            "filter_tokens", []
                        )
                        tokens.extend(shared_tags)
                    edges_out.append({
                        "data": {
                            "id": f"{edge_key[0]}__{edge_key[1]}",
                            "source": nid,
                            "target": other,
                            "weight": edata.get("weight", 1),
                            "shared_tags": shared_tags,
                            "reasons": edata.get("reasons", []),
                            "semantic": edata.get("semantic"),
                            "tag_sim": edata.get("tag_sim"),
                            "slug_token_sim": edata.get("slug_token_sim"),
                            "source_overlap": edata.get("source_overlap"),
                        },
                    })
                if other not in seen:
                    seen.add(other)
                    next_frontier.append(other)
            if len(nodes_out) >= limit:
                break
        frontier = next_frontier
        if len(nodes_out) >= limit:
            break
    return dashboard_graph.enrich_neighborhood({
        "nodes": list(nodes_out.values()),
        "edges": edges_out,
        "center": center,
        "resolved": resolved,
        "suggestions": suggestions,
    }, source="networkx")
def _graph_stats() -> dict:
    """Return ``{"nodes", "edges", "available"}`` for the home page.

    Sources are tried in order: the counts matched by ``_GRAPH_REPORT_RE`` in
    ``graph-report.md``, the ``meta`` table of the dashboard SQLite index,
    and finally the fully loaded graph. All zeros with ``available=False``
    are returned when the graph cannot be loaded.
    """
    report = _wiki_dir() / "graphify-out" / "graph-report.md"
    try:
        report_text = report.read_text(encoding="utf-8", errors="replace")
    except OSError:
        report_text = None
    if report_text is not None:
        match = _GRAPH_REPORT_RE.search(report_text)
        if match:
            node_text, edge_text = match.group(1), match.group(2)
            return {
                "nodes": int(node_text.replace(",", "")),
                "edges": int(edge_text.replace(",", "")),
                "available": True,
            }

    index_path = _ensure_dashboard_graph_index()
    if index_path is not None and index_path.is_file():
        try:
            conn = sqlite3.connect(f"file:{index_path.as_posix()}?mode=ro", uri=True)
            try:
                meta = {}
                for row in conn.execute("SELECT key,value FROM meta"):
                    meta[row[0]] = json.loads(row[1])
                node_total = int(meta.get("nodes_count") or 0)
                edge_total = int(meta.get("edges_count") or 0)
                return {
                    "nodes": node_total,
                    "edges": edge_total,
                    "available": node_total > 0,
                }
            finally:
                conn.close()
        except (OSError, sqlite3.Error, ValueError, TypeError, json.JSONDecodeError):
            pass

    try:
        G = _load_dashboard_graph()
    except Exception:  # noqa: BLE001
        return {"nodes": 0, "edges": 0, "available": False}
    node_total = G.number_of_nodes()
    return {
        "nodes": node_total,
        "edges": G.number_of_edges(),
        "available": node_total > 0,
    }
def _wiki_stats_from_dashboard_index() -> dict[str, int] | None:
    """Count entities per type from the dashboard SQLite index.

    Returns ``None`` when the index is missing, does not match the manifest,
    or cannot be queried. Otherwise returns per-type counts plus ``total``
    and ``split_known=True``.
    """
    index_path = _dashboard_graph_index_path()
    if not (index_path.is_file() and _dashboard_index_matches_manifest(index_path)):
        return None
    try:
        conn = sqlite3.connect(f"file:{index_path.as_posix()}?mode=ro", uri=True)
        try:
            per_type = {}
            for row in conn.execute("SELECT type,COUNT(*) FROM nodes GROUP BY type"):
                per_type[str(row[0])] = int(row[1])
        finally:
            conn.close()
    except (OSError, sqlite3.Error, ValueError, TypeError):
        return None

    # Output key -> node type stored in the index.
    type_for_key = (
        ("skills", "skill"),
        ("agents", "agent"),
        ("mcps", "mcp-server"),
        ("harnesses", "harness"),
    )
    stats = {key: per_type.get(node_type, 0) for key, node_type in type_for_key}
    stats["total"] = sum(stats.values())
    stats["split_known"] = True
    return stats
def _wiki_stats() -> dict:
    """Entity counts across all dashboard-supported entity types.

    Preference order:

    1. per-type counts from the dashboard SQLite index;
    2. when ``graphify-out/graph-report.md`` exists, only the graph's node
       total, with ``split_known=False`` and zeroed per-type counts;
    3. counting ``*.md`` pages under ``entities/``. MCP pages are counted
       recursively under ``entities/mcp-servers/``; the other types use a
       flat glob.

    The home page uses ``total`` for the headline number and the individual
    counts for the entity-type detail line.
    """
    indexed = _wiki_stats_from_dashboard_index()
    if indexed is not None:
        return indexed

    base = _wiki_dir() / "entities"
    graph_out = _wiki_dir() / "graphify-out"
    if graph_out.is_dir() and (graph_out / "graph-report.md").is_file():
        graph_stats = _graph_stats()
        return {
            "skills": 0,
            "agents": 0,
            "mcps": 0,
            "harnesses": 0,
            "total": int(graph_stats.get("nodes") or 0),
            "split_known": False,
        }

    def count_pages(folder, *, recursive=False):
        # Number of Markdown pages in ``folder``; 0 when it does not exist.
        if not folder.is_dir():
            return 0
        pages = folder.rglob("*.md") if recursive else folder.glob("*.md")
        return len(list(pages))

    skills = count_pages(base / "skills")
    agents = count_pages(base / "agents")
    mcps = count_pages(base / "mcp-servers", recursive=True)
    harnesses = count_pages(base / "harnesses")
    return {
        "skills": skills,
        "agents": agents,
        "mcps": mcps,
        "harnesses": harnesses,
        "total": skills + agents + mcps + harnesses,
        "split_known": True,
    }
def _render_home() -> str:
    """Render the dashboard home page.

    The page consists of a stat-card grid (loaded manifest entries,
    sidecars, wiki entities, knowledge graph, runtime checks, audit events,
    sessions), a grade-distribution strip that is filled in client-side
    from ``/api/grades.json``, and a two-column section with the ten most
    recent sessions and the ten latest audit events.

    Returns:
        The full HTML document produced by ``_layout``.
    """
    sessions = _summarize_sessions()
    recent = sessions[:10]
    gstats = _graph_stats()
    wstats = _wiki_stats()
    runtime = _runtime_lifecycle_summary()

    # Count audit lines under a context manager so the file handle is closed
    # deterministically rather than left for the garbage collector.
    audit_path = _audit_log_path()
    audit_lines = 0
    if audit_path.exists():
        with audit_path.open(encoding="utf-8") as audit_file:
            audit_lines = sum(1 for _ in audit_file)

    manifest = _read_manifest()
    recent_audit = _read_jsonl(audit_path, limit=10)

    if wstats.get("split_known", True):
        wiki_detail = (
            f"{wstats['skills']:,} skills Β· {wstats['agents']:,} agents Β· "
            f"{wstats['mcps']:,} MCPs Β· {wstats['harnesses']:,} harnesses"
        )
    else:
        wiki_detail = "entity split unavailable; install the current graph index"

    rows = []
    for s in recent:
        sid = s["session_id"]
        rows.append(
            f"<tr>"
            f"<td><a href='/session/{html.escape(sid)}'>{html.escape(sid[:20])}</a></td>"
            f"<td class='muted'>{html.escape(s['last_seen'] or 'β€”')}</td>"
            f"<td>{len(s['skills_loaded'])}</td>"
            f"<td>{len(s['skills_unloaded'])}</td>"
            f"<td>{len(s['agents_loaded'])}</td>"
            f"<td>{s['score_updates']}</td>"
            f"</tr>"
        )

    def _text(value: object) -> str:
        # Audit records are free-form JSON: a null or numeric field must not
        # crash html.escape(), which only accepts strings.
        return "" if value is None else str(value)

    audit_row_html = []
    for record in reversed(recent_audit):
        stamp = html.escape(_text(record.get('ts'))[-8:])
        event = html.escape(_text(record.get('event')))
        subject = html.escape(_text(record.get('subject')))
        audit_row_html.append(
            f"<tr><td class='muted'>{stamp}</td>"
            f"<td><span class='pill'>{event}</span></td>"
            f"<td><a href='/wiki/{subject}'><code>{subject}</code></a></td>"
            f"</tr>"
        )
    audit_rows = "".join(audit_row_html)

    # Stat grid: one card per headline number.
    stat_grid = (
        "<div style='display:grid; grid-template-columns:repeat(auto-fit, minmax(180px,1fr));"
        " gap:0.8rem; margin-bottom:1.25rem;'>"
        f"<div class='card'><div class='muted' style='font-size:0.8rem;'>Currently loaded</div>"
        f"<div style='font-size:1.6rem; font-weight:600;'>{_format_count(len(manifest.get('load', [])))}</div>"
        f"<a href='/loaded'>manage β†’</a></div>"
        "<div class='card'><div class='muted' style='font-size:0.8rem;'>Sidecars</div>"
        "<div id='home-sidecar-count' style='font-size:1.6rem; font-weight:600;'>...</div>"
        "<a href='/skills'>browse β†’</a></div>"
        f"<div class='card'><div class='muted' style='font-size:0.8rem;'>Wiki entities</div>"
        f"<div style='font-size:1.6rem; font-weight:600;'>{_format_count(wstats['total'])}</div>"
        f"<span class='muted' style='font-size:0.75rem;'>"
        f"{html.escape(wiki_detail)}</span></div>"
        f"<div class='card'><div class='muted' style='font-size:0.8rem;'>Knowledge graph</div>"
        f"<div style='font-size:1.6rem; font-weight:600;'>{_format_count(gstats['nodes'])}</div>"
        f"<span class='muted' style='font-size:0.75rem;'>{_format_count(gstats['edges'])} edges</span>"
        f" Β· <a href='/graph'>explore β†’</a></div>"
        f"<div class='card'><div class='muted' style='font-size:0.8rem;'>Runtime checks</div>"
        f"<div style='font-size:1.6rem; font-weight:600;'>{_format_count(runtime['validations_total'])}</div>"
        f"<span class='muted' style='font-size:0.75rem;'>"
        f"{_format_count(runtime['validation_failures'])} failed / "
        f"{_format_count(runtime['open_escalations_total'])} open escalations</span>"
        f" / <a href='/runtime'>view -></a></div>"
        f"<div class='card'><div class='muted' style='font-size:0.8rem;'>Audit events</div>"
        f"<div style='font-size:1.6rem; font-weight:600;'>{_format_count(audit_lines)}</div>"
        f"<a href='/logs'>view β†’</a> Β· <a href='/events'>live β†’</a></div>"
        f"<div class='card'><div class='muted' style='font-size:0.8rem;'>Sessions</div>"
        f"<div style='font-size:1.6rem; font-weight:600;'>{_format_count(len(sessions))}</div>"
        f"<a href='/sessions'>browse β†’</a></div>"
        "</div>"
    )

    # Grade distribution: placeholders that the inline script fills from
    # /api/grades.json after the page has loaded.
    grade_pills = "".join(
        f"<span class='pill grade-{g}' data-home-grade='{g}'>{g}: ...</span> "
        for g in ("A", "B", "C", "D", "F")
    )
    grade_card = (
        "<div class='card'><strong>Skill quality grades:</strong> "
        + grade_pills
        + "<span id='home-grade-total' class='muted'> Β· total loading</span>"
        "</div>"
        "<script>"
        "(() => {"
        "const fmt = n => Number(n || 0).toLocaleString();"
        "const countEl = document.getElementById('home-sidecar-count');"
        "const totalEl = document.getElementById('home-grade-total');"
        "fetch('/api/grades.json').then(r => r.ok ? r.json() : Promise.reject())"
        ".then(data => {"
        "const grades = data.grades || {};"
        "['A','B','C','D','F'].forEach(g => {"
        "const el = document.querySelector(`[data-home-grade=\"${g}\"]`);"
        "if (el) el.textContent = `${g}: ${fmt(grades[g] || 0)}`;"
        "});"
        "if (countEl) countEl.textContent = fmt(data.total || 0);"
        "if (totalEl) totalEl.textContent = ` Β· total ${fmt(data.total || 0)}`;"
        "})"
        ".catch(() => {"
        "if (countEl) countEl.textContent = 'open';"
        "if (totalEl) totalEl.textContent = ' Β· open Skills for counts';"
        "});"
        "})();"
        "</script>"
    )

    # Two-column section: recent sessions on the left, latest audit on the right.
    if recent:
        sessions_html = (
            "<table>"
            "<tr><th>Session</th><th>Last seen</th><th>Load</th>"
            "<th>Unload</th><th>Agents</th><th>Scores</th></tr>"
            + "".join(rows)
            + "</table>"
        )
    else:
        sessions_html = (
            "<p class='muted'>No sessions recorded yet. Hooks start logging "
            "once you run a Claude Code session with ctx installed.</p>"
        )
    if recent_audit:
        audit_html = (
            "<table>"
            "<tr><th>Time</th><th>Event</th><th>Subject</th></tr>"
            + audit_rows
            + "</table>"
        )
    else:
        audit_html = "<p class='muted'>No audit events yet.</p>"

    columns = (
        "<div style='display:grid; grid-template-columns:2fr 1fr; gap:1rem;'>"
        f"<div class='card'><strong>Recent sessions</strong> ({_format_count(len(sessions))} total)"
        + sessions_html
        + "</div>"
        "<div class='card'><strong>Latest audit events</strong>"
        + audit_html
        + "</div>"
        "</div>"
    )

    body = "<h1>ctx monitor</h1>" + stat_grid + grade_card + columns
    return _layout("Home", body)
def _render_sessions_index() -> str:
    """Render the sessions index: one table row per observed session.

    Each row links to the session detail page and shows first/last seen
    timestamps, the number of loaded/unloaded skills, agents and MCPs, and
    the lifecycle transition count reported by ``_summarize_sessions``.
    """
    all_sessions = _summarize_sessions()
    # Columns rendered as len() of the corresponding session field, in order.
    counted_fields = (
        "skills_loaded",
        "skills_unloaded",
        "agents_loaded",
        "agents_unloaded",
        "mcps_loaded",
        "mcps_unloaded",
    )

    def as_row(entry) -> str:
        session_id = entry["session_id"]
        link_cell = (
            f"<td><a href='/session/{html.escape(session_id)}'>"
            f"<code>{html.escape(session_id[:32])}</code></a></td>"
        )
        seen_cells = "".join(
            f"<td class='muted'>{html.escape(entry[field] or 'β€”')}</td>"
            for field in ("first_seen", "last_seen")
        )
        count_cells = "".join(
            f"<td>{len(entry[field])}</td>" for field in counted_fields
        )
        lifecycle_cell = f"<td>{entry['lifecycle_transitions']}</td>"
        return "<tr>" + link_cell + seen_cells + count_cells + lifecycle_cell + "</tr>"

    table_rows = "".join(as_row(entry) for entry in all_sessions)
    header = (
        "<tr><th>Session</th><th>First seen</th><th>Last seen</th>"
        "<th>Skills↑</th><th>Skills↓</th>"
        "<th>Agents↑</th><th>Agents↓</th>"
        "<th>MCPs↑</th><th>MCPs↓</th><th>Lifecycle</th></tr>"
    )
    page = (
        "<h1>Sessions</h1>"
        f"<p class='muted'>{len(all_sessions)} unique sessions observed.</p>"
        "<table>"
        + header
        + table_rows
        + "</table>"
    )
    return _layout("Sessions", page)
def _render_session_detail(session_id: str) -> str:
    """Render the detail page for one session.

    Shows two tables built from ``_session_detail(session_id)``: the audit
    timeline (timestamp, event, subject, first 80 characters of the JSON
    ``meta``) and the load/unload events (timestamp, event, skill or agent).
    """
    detail = _session_detail(session_id)
    audit_entries = detail["audit_entries"]
    load_events = detail["load_events"]

    audit_html_rows = []
    for entry in audit_entries:
        # Only a short preview of the meta payload fits in the table.
        meta_preview = json.dumps(entry.get('meta', {}))[:80]
        audit_html_rows.append(
            f"<tr><td class='muted'>{html.escape(entry.get('ts', ''))}</td>"
            f"<td><span class='pill'>{html.escape(entry.get('event', ''))}</span></td>"
            f"<td><code>{html.escape(entry.get('subject', ''))}</code></td>"
            f"<td class='muted'>{html.escape(meta_preview)}</td></tr>"
        )

    event_html_rows = []
    for entry in load_events:
        subject = entry.get('skill') or entry.get('agent') or ''
        event_html_rows.append(
            f"<tr><td class='muted'>{html.escape(entry.get('timestamp', ''))}</td>"
            f"<td>{html.escape(entry.get('event', ''))}</td>"
            f"<td><code>{html.escape(subject)}</code></td></tr>"
        )

    summary_card = (
        f"<div class='card'><strong>{len(audit_entries)}</strong> audit entries Β· "
        f"<strong>{len(load_events)}</strong> load/unload events</div>"
    )
    audit_table = (
        "<table><tr><th>ts</th><th>event</th><th>subject</th><th>meta</th></tr>"
        + "".join(audit_html_rows)
        + "</table>"
    )
    event_table = (
        "<table><tr><th>ts</th><th>event</th><th>subject</th></tr>"
        + "".join(event_html_rows)
        + "</table>"
    )
    page = (
        f"<h1>Session {html.escape(session_id)}</h1>"
        + summary_card
        + "<h2>Audit timeline</h2>"
        + audit_table
        + "<h2>Load/unload events</h2>"
        + event_table
    )
    return _layout(f"Session {session_id}", page)
def _render_skills(qs: dict[str, str] | None = None) -> str:
    """Render the quality-sidecar page: a paginated, filterable card grid.

    ``qs`` is the parsed query string. It is handed to
    ``_sidecar_page_payload`` for filtering and paging, and every key except
    ``page`` is carried over into the previous/next links.
    """
    payload = _sidecar_page_payload(qs)

    def card_html(sidecar) -> str:
        # Escape each value once and reuse it across attributes and links.
        slug = html.escape(sidecar.get('slug', ''))
        grade = html.escape(sidecar.get('grade', 'F'))
        entity = html.escape(_sidecar_entity_type(sidecar))
        floor = sidecar.get('hard_floor')
        floor_attr = html.escape(floor or '')
        floor_note = ' Β· ' + html.escape(floor) if floor else ''
        kind_label = html.escape(sidecar.get('type', sidecar.get('subject_type', 'skill')))
        score = sidecar.get('raw_score', 0.0)
        return (
            f"<div class='skill-card' data-slug='{slug}' "
            f"data-grade='{grade}' "
            f"data-type='{entity}' "
            f"data-floor='{floor_attr}' "
            "style='border:1px solid #e5e7eb; border-radius:6px; padding:0.7rem 0.9rem; "
            "display:flex; flex-direction:column; gap:0.3rem;'>"
            "<div style='display:flex; justify-content:space-between; align-items:center;'>"
            f"<code style='font-size:0.85rem;'>{slug}</code>"
            f"<span class='pill grade-{grade}'>{grade}</span>"
            "</div>"
            "<div class='muted' style='font-size:0.78rem;'>"
            f"score {score:.3f} Β· {kind_label}"
            f"{floor_note}"
            "</div>"
            "<div style='display:flex; gap:0.4rem; margin-top:0.2rem;'>"
            f"<a href='/skill/{slug}?type={entity}' style='font-size:0.78rem;'>sidecar</a>"
            f"<a href='/wiki/{slug}?type={entity}' style='font-size:0.78rem;'>wiki</a>"
            f"<a href='/graph?slug={slug}&amp;type={entity}' style='font-size:0.78rem;'>graph</a>"
            "</div>"
            "</div>"
        )

    cards = "".join(card_html(item) for item in payload["items"])

    # 1-based range of the items shown on this page (0-0 when nothing matches).
    current_page = payload["page"]
    page_size = payload["limit"]
    match_total = payload["total"]
    first_shown = (current_page - 1) * page_size + 1 if match_total else 0
    last_shown = min(current_page * page_size, match_total)
    if payload["filtered"]:
        summary = f"Showing {first_shown}-{last_shown} of {match_total} matching sidecars"
    else:
        summary = f"Showing {first_shown}-{last_shown} of {payload['catalog_total']} sidecars"

    carried_query = {}
    for name, val in (qs or {}).items():
        if name != "page":
            carried_query[name] = val

    def page_href(page: int) -> str:
        # Keep the caller's filters, then pin page and limit explicitly.
        params = dict(carried_query)
        params["page"] = str(max(1, page))
        params["limit"] = str(page_size)
        pairs = [
            f"{quote(str(name))}={quote(str(val))}"
            for name, val in params.items()
            if str(val).strip()
        ]
        query = "&".join(pairs)
        return "/skills" + (f"?{query}" if query else "")

    if payload["has_prev"]:
        prev_link = f"<a href='{html.escape(page_href(current_page - 1))}'>previous</a>"
    else:
        prev_link = "<span class='muted'>previous</span>"
    if payload["has_next"]:
        next_link = f"<a href='{html.escape(page_href(current_page + 1))}'>next</a>"
    else:
        next_link = "<span class='muted'>next</span>"

    pagination = (
        "<div class='card' style='display:flex; justify-content:space-between; "
        "align-items:center; gap:1rem;'>"
        f"<span id='match-count' class='muted'>{html.escape(summary)} Β· page "
        f"{current_page} of {payload['pages']}</span>"
        f"<span>{prev_link} Β· {next_link}</span>"
        "</div>"
    )

    # Filter form controls, pre-selected from the current payload.
    selected_type = ",".join(payload["types"])
    selected_grade = ",".join(payload["grades"])
    type_choices = [
        f"<option value='{html.escape(t)}'"
        f"{' selected' if selected_type == t else ''}>{html.escape(t)}</option>"
        for t in _DASHBOARD_ENTITY_TYPES
    ]
    type_options = "<option value=''>all types</option>" + "".join(type_choices)
    grade_choices = [
        f"<option value='{g}'{' selected' if selected_grade == g else ''}>{g}</option>"
        for g in ("A", "B", "C", "D", "F")
    ]
    grade_options = "<option value=''>all grades</option>" + "".join(grade_choices)
    limit_options = "".join(
        f"<option value='{n}'{' selected' if page_size == n else ''}>{n}</option>"
        for n in (50, 100, 200, 500)
    )
    hide_checked = " checked" if payload["hide_floor"] else ""

    filter_sidebar = (
        "<aside style='position:sticky; top:1rem;'>"
        "<form class='card' id='skills-filter-form' method='get' action='/skills'>"
        "<strong>Search</strong>"
        f"<input type='text' id='skill-search' name='q' value='{html.escape(payload['q'])}' placeholder='filter by slug...' "
        "style='width:100%; margin-top:0.4rem; padding:0.35rem 0.5rem; "
        "border:1px solid #ccc; border-radius:4px;'>"
        "<label style='display:block; margin-top:0.6rem; font-size:0.82rem;'>Type"
        f"<select class='type-filter' name='type' style='width:100%; margin-top:0.25rem;'>{type_options}</select>"
        "</label>"
        "<label style='display:block; margin-top:0.6rem; font-size:0.82rem;'>Grade"
        f"<select class='grade-filter' name='grade' style='width:100%; margin-top:0.25rem;'>{grade_options}</select>"
        "</label>"
        "<label style='display:block; margin-top:0.6rem; font-size:0.82rem;'>Limit"
        f"<select name='limit' style='width:100%; margin-top:0.25rem;'>{limit_options}</select>"
        "</label>"
        "<label style='display:block; padding:0.55rem 0 0.35rem;'>"
        f"<input type='checkbox' id='hide-floor' name='hide_floor' value='1'{hide_checked}> hide floored</label>"
        "<button type='submit' style='width:100%;'>apply</button>"
        "</form>"
        "</aside>"
    )
    card_grid = (
        "<div id='card-grid' style='display:grid; "
        "grid-template-columns:repeat(auto-fill, minmax(280px, 1fr)); gap:0.7rem;'>"
        + cards
        + "</div>"
    )
    # Any change to a <select> resubmits the form immediately.
    auto_submit_script = (
        "<script>\n"
        "document.querySelectorAll('#skills-filter-form select').forEach(el => {\n"
        "  el.addEventListener('change', () => el.form.submit());\n"
        "});\n"
        "</script>"
    )
    page_html = (
        "<h1>Quality sidecars</h1>"
        f"<p class='muted'>{payload['catalog_total']} sidecars Β· click any card to drill in.</p>"
        + pagination
        + "<div style='display:grid; grid-template-columns:220px 1fr; gap:1.25rem; align-items:start;'>"
        + filter_sidebar
        + card_grid
        + "</div>"
        + auto_submit_script
    )
    return _layout("Skills", page_html)
def _render_skill_detail(slug: str, entity_type: str | None = None) -> str:
    """Render the sidecar detail page for one entity.

    Shows the grade/score header, the sidecar JSON (truncated to 4000
    characters) and the last 100 audit entries whose subject equals ``slug``
    and whose entity type matches the requested type. When ``entity_type``
    does not normalize to a value, the sidecar's own entity type is used.
    A missing sidecar yields a short "No sidecar." page.
    """
    record = _load_sidecar(slug, entity_type=entity_type)
    if record is None:
        return _layout(slug, f"<h1>{html.escape(slug)}</h1><p>No sidecar.</p>")

    wanted_type = _normalize_dashboard_entity_type(entity_type) or _sidecar_entity_type(record)

    timeline = []
    for entry in _read_jsonl(_audit_log_path()):
        if entry.get("subject") != slug:
            continue
        if _audit_entity_type(entry) != wanted_type:
            continue
        timeline.append(entry)

    timeline_rows = []
    for entry in timeline[-100:]:
        timeline_rows.append(
            f"<tr><td class='muted'>{html.escape(entry.get('ts', ''))}</td>"
            f"<td><span class='pill'>{html.escape(entry.get('event', ''))}</span></td>"
            f"<td class='muted'>{html.escape(entry.get('actor', ''))}</td></tr>"
        )

    floor = record.get("hard_floor")
    if floor:
        floor_html = f" Β· floor {html.escape(str(floor))}"
    else:
        floor_html = ""

    grade = html.escape(record.get('grade', 'F'))
    header_card = (
        f"<div class='card'>"
        f"<span class='pill grade-{grade}'>grade {grade}</span> "
        f"score <strong>{record.get('raw_score', 0.0):.3f}</strong> "
        f"<span class='muted'>Β· type {html.escape(record.get('subject_type', ''))}"
        f"{floor_html}</span>"
        "</div>"
    )
    sidecar_dump = html.escape(json.dumps(record, indent=2)[:4000])
    page = (
        f"<h1>{html.escape(slug)}</h1>"
        + header_card
        + "<h2>Sidecar</h2>"
        + f"<pre>{sidecar_dump}</pre>"
        + f"<h2>Audit timeline ({len(timeline)} entries)</h2>"
        + "<table><tr><th>ts</th><th>event</th><th>actor</th></tr>"
        + "".join(timeline_rows)
        + "</table>"
    )
    return _layout(slug, page)
def _top_degree_seeds_from_index(limit: int = 18) -> list[dict]:
    """Return the highest-degree nodes straight from the dashboard SQLite index.

    An empty list is returned when runtime overlays exist that the index
    does not cover, when the index cannot be ensured, or when the query
    fails. At least one row is requested even if ``limit`` is below 1.
    """
    existing_index = _dashboard_graph_index_path()
    overlays_uncovered = (
        _dashboard_graph_has_runtime_overlays()
        and not _dashboard_index_covers_runtime_overlays(existing_index)
    )
    if overlays_uncovered:
        return []

    db_file = _ensure_dashboard_graph_index()
    if db_file is None or not db_file.is_file():
        return []

    query_errors = (OSError, sqlite3.Error, TimeoutError)
    try:
        connection = sqlite3.connect(f"file:{db_file.as_posix()}?mode=ro", uri=True)
    except query_errors:
        return []
    try:
        connection.row_factory = sqlite3.Row
        cursor = connection.execute(
            "SELECT id,label,type,degree FROM nodes ORDER BY degree DESC,id LIMIT ?",
            (max(1, limit),),
        )
        records = cursor.fetchall()
    except query_errors:
        return []
    finally:
        connection.close()

    seeds: list[dict] = []
    for record in records:
        node_id = str(record["id"])
        slug = _graph_slug_from_node_id(node_id)
        seeds.append({
            "slug": slug,
            "type": _graph_type_from_node_id(node_id, str(record["type"] or "skill")),
            "degree": int(record["degree"] or 0),
            # Fall back to the slug when the index row has no label.
            "label": record["label"] or slug,
        })
    return seeds
def _top_degree_seeds(limit: int = 18, *, allow_load: bool = True) -> list[dict]:
    """Pick high-degree nodes from the graph as seed suggestions.

    Used by the ``/graph`` landing page so a first-time visitor has
    something to click.

    Args:
        limit: Maximum number of seeds to return.
        allow_load: When true the graph is loaded via
            ``_load_dashboard_graph``; when false only the already cached
            graph value is consulted.

    Returns:
        A list of ``{"slug", "type", "degree", "label"}`` dicts ordered by
        descending degree. Empty on any graph-load failure or for an empty
        graph. When no graph object is available the SQLite index is
        queried instead.
    """
    try:
        G = _load_dashboard_graph() if allow_load else _GRAPH_CACHE_VALUE
    except Exception:  # noqa: BLE001
        return []
    if G is None:
        return _top_degree_seeds_from_index(limit)
    if G.number_of_nodes() == 0:
        return []

    known_prefixes = ("mcp-server", "harness", "agent")
    ranked = sorted(G.degree, key=lambda kv: -kv[1])[:limit]
    out: list[dict] = []
    for node_id, degree in ranked:
        # Coerce to str so a non-string node id cannot crash .partition().
        prefix, sep, rest = str(node_id).partition(":")
        # An id without a "type:" prefix used to yield an empty slug (and an
        # empty label), which rendered a dead "/graph?slug=" chip. Use the
        # whole id as the slug in that case.
        slug = rest if sep else prefix
        seed_type = prefix if prefix in known_prefixes else "skill"
        out.append({
            "slug": slug,
            "type": seed_type,
            "degree": int(degree),
            "label": G.nodes[node_id].get("label", slug),
        })
    return out
def _read_default_config_raw() -> dict[str, Any]:
try:
from ctx_config import _read_default_config # type: ignore
raw = _read_default_config()
return raw if isinstance(raw, dict) else {}
except Exception: # noqa: BLE001
path = Path(__file__).with_name("config.json")
if not path.exists():
return {}
try:
raw = json.loads(path.read_text(encoding="utf-8"))
return raw if isinstance(raw, dict) else {}
except Exception: # noqa: BLE001
return {}
def _read_user_config_raw() -> dict[str, Any]:
    """Return the user's config overrides as a dict.

    Yields ``{}`` when the user config file does not exist, cannot be read
    or parsed, or does not contain a JSON object at the top level.
    """
    user_file = _user_config_path()
    if not user_file.exists():
        return {}
    try:
        parsed = json.loads(user_file.read_text(encoding="utf-8"))
    except Exception:  # noqa: BLE001
        return {}
    if isinstance(parsed, dict):
        return parsed
    return {}
def _deep_merge_config(base: dict[str, Any], override: dict[str, Any]) -> None:
for key, value in override.items():
if isinstance(base.get(key), dict) and isinstance(value, dict):
_deep_merge_config(base[key], value)
else:
base[key] = value
def _config_value(raw: dict[str, Any], path: str, default: Any = None) -> Any:
current: Any = raw
for part in path.split("."):
if not isinstance(current, dict) or part not in current:
return default
current = current[part]
return current
def _set_config_value(raw: dict[str, Any], path: str, value: Any) -> None:
current = raw
parts = path.split(".")
for part in parts[:-1]:
child = current.get(part)
if not isinstance(child, dict):
child = {}
current[part] = child
current = child
current[parts[-1]] = value
def _delete_config_value(raw: dict[str, Any], path: str) -> None:
current = raw
parts = path.split(".")
parents: list[tuple[dict[str, Any], str]] = []
for part in parts[:-1]:
child = current.get(part)
if not isinstance(child, dict):
return
parents.append((current, part))
current = child
current.pop(parts[-1], None)
for parent, key in reversed(parents):
child = parent.get(key)
if isinstance(child, dict) and not child:
parent.pop(key, None)
def _config_field_specs() -> tuple[dict[str, Any], ...]:
return (
{"group": "Knowledge", "path": "knowledge.mode", "type": "choice", "choices": ("shipped", "local", "enriched"), "required": True, "label": "Knowledge source mode", "help": "shipped uses ctx's packaged graph/wiki, local stays private, enriched starts from shipped knowledge and adds your own.", "example": "enriched"},
{"group": "Recommendation", "path": "resolver.recommendation_top_k", "type": "int", "min": 1, "max": 5, "required": True, "label": "Max mixed recommendations", "help": "Hard cap for the combined skills/agents/MCP recommendation bundle.", "example": 5},
{"group": "Recommendation", "path": "resolver.recommendation_min_normalized_score", "type": "float", "min": 0.0, "max": 1.0, "step": 0.01, "required": True, "label": "Minimum recommendation score", "help": "Drops weak skill/agent/MCP matches instead of recommending at all cost.", "example": 0.30},
{"group": "Recommendation", "path": "resolver.max_skills", "type": "int", "min": 1, "max": 50, "label": "Resolver hard skill ceiling", "help": "Maximum load candidates considered by a resolver call.", "example": 15},
{"group": "Harness", "path": "harness.recommendation_min_fit_score", "type": "float", "min": 0.0, "max": 1.0, "step": 0.01, "required": True, "label": "Minimum harness fit score", "help": "Custom/API/local model users only see harnesses at or above this fit floor.", "example": 0.85},
{"group": "Harness", "path": "harness.recommendation_min_normalized_score", "type": "float", "min": 0.0, "max": 1.0, "step": 0.01, "label": "Harness normalized score floor", "help": "Compatibility display floor for older configs.", "example": 0.85},
{"group": "Micro-skills", "path": "skill_transformer.line_threshold", "type": "int", "min": 1, "max": 2000, "required": True, "label": "Micro-skill line threshold", "help": "Any SKILL.md above this many lines triggers the micro-skills conversion gate.", "example": 180},
{"group": "Micro-skills", "path": "skill_transformer.max_stage_lines", "type": "int", "min": 1, "max": 300, "label": "Max staged reference lines", "help": "Target maximum lines for each generated reference stage.", "example": 40},
{"group": "Micro-skills", "path": "skill_transformer.stage_count", "type": "int", "min": 1, "max": 20, "label": "Stage count", "help": "Target number of staged references for long skills.", "example": 5},
{"group": "Graph", "path": "graph.min_edge_weight", "type": "float", "min": 0.0, "max": 1.0, "step": 0.01, "required": True, "label": "Minimum final edge weight", "help": "Edges below this blended score are dropped from graph.json during rebuild.", "example": 0.03},
{"group": "Graph", "path": "graph.edge_weights.semantic", "type": "float", "min": 0.0, "max": 1.0, "step": 0.01, "required": True, "label": "Semantic edge weight", "help": "Semantic portion of the blended edge score. Semantic/tags/slug tokens should sum to 1.", "example": 0.70},
{"group": "Graph", "path": "graph.edge_weights.tags", "type": "float", "min": 0.0, "max": 1.0, "step": 0.01, "required": True, "label": "Tag edge weight", "help": "Tag-overlap portion of the blended edge score.", "example": 0.15},
{"group": "Graph", "path": "graph.edge_weights.slug_tokens", "type": "float", "min": 0.0, "max": 1.0, "step": 0.01, "required": True, "label": "Slug-token edge weight", "help": "Slug-token overlap portion of the blended edge score.", "example": 0.15},
{"group": "Graph", "path": "graph.semantic.top_k", "type": "int", "min": 1, "max": 200, "label": "Semantic neighbors per entity", "help": "Maximum nearest semantic neighbors retained per entity during graph build.", "example": 20},
{"group": "Graph", "path": "graph.semantic.build_floor", "type": "float", "min": 0.0, "max": 1.0, "step": 0.01, "label": "Semantic build floor", "help": "Low inclusion bar used when graph embeddings are rebuilt.", "example": 0.50},
{"group": "Graph", "path": "graph.semantic.min_cosine", "type": "float", "min": 0.0, "max": 1.0, "step": 0.01, "required": True, "label": "Semantic display floor", "help": "Read-time semantic filter. Raising this is stricter without forcing a rebuild.", "example": 0.80},
{"group": "Graph", "path": "graph.tag_edges.dense_tag_threshold", "type": "int", "min": 1, "max": 10000, "label": "Dense tag cutoff", "help": "Tags shared by more than this many entities do not create broad noisy cliques.", "example": 500},
{"group": "Graph", "path": "graph.token_edges.dense_token_threshold", "type": "int", "min": 1, "max": 10000, "label": "Dense slug-token cutoff", "help": "Slug words shared by too many entities are ignored as edge creators.", "example": 30},
{"group": "Intake", "path": "intake.enabled", "type": "bool", "required": True, "label": "Intake quality gate", "help": "Runs duplicate/near-duplicate and body-quality checks when entities are added or updated.", "example": True},
{"group": "Intake", "path": "intake.dup_threshold", "type": "float", "min": 0.0, "max": 1.0, "step": 0.01, "label": "Duplicate threshold", "help": "Similarity at or above this is treated as a duplicate.", "example": 0.93},
{"group": "Intake", "path": "intake.near_dup_threshold", "type": "float", "min": 0.0, "max": 1.0, "step": 0.01, "label": "Near-duplicate threshold", "help": "Similarity at or above this asks the user to update/merge instead of blindly adding.", "example": 0.80},
{"group": "Paths", "path": "paths.wiki_dir", "type": "str", "required": True, "label": "Wiki directory", "help": "Runtime llm-wiki directory used by dashboard, graph, and recommendation flows.", "example": "~/.claude/skill-wiki"},
{"group": "Paths", "path": "paths.skills_dir", "type": "str", "required": True, "label": "Skills directory", "help": "Installed local skills directory.", "example": "~/.claude/skills"},
{"group": "Paths", "path": "paths.agents_dir", "type": "str", "required": True, "label": "Agents directory", "help": "Installed local agents directory.", "example": "~/.claude/agents"},
)
_CONFIG_REMOVE = object()
def _coerce_config_value(spec: dict[str, Any], raw_value: Any) -> Any:
if raw_value is None or (isinstance(raw_value, str) and raw_value.strip() == ""):
return _CONFIG_REMOVE
kind = spec.get("type", "str")
if kind == "bool":
if isinstance(raw_value, bool):
return raw_value
text = str(raw_value).strip().lower()
if text in {"true", "1", "yes", "on"}:
return True
if text in {"false", "0", "no", "off"}:
return False
raise ValueError(f"{spec['path']} must be true or false")
if kind == "int":
if isinstance(raw_value, bool):
raise ValueError(f"{spec['path']} must be an integer")
value: int | float = 0
value = int(raw_value)
elif kind == "float":
if isinstance(raw_value, bool):
raise ValueError(f"{spec['path']} must be a number")
value = float(raw_value)
elif kind == "choice":
choice_value = str(raw_value).strip()
if choice_value not in spec.get("choices", ()):
raise ValueError(f"{spec['path']} must be one of {spec.get('choices')}")
return choice_value
else:
text_value = str(raw_value).strip()
return text_value if text_value else _CONFIG_REMOVE
if "min" in spec and value < spec["min"]:
raise ValueError(f"{spec['path']} must be >= {spec['min']}")
if "max" in spec and value > spec["max"]:
raise ValueError(f"{spec['path']} must be <= {spec['max']}")
return value
def _effective_config_payload() -> dict[str, Any]:
    """Bundle default, user and effective config for the config page and API.

    ``effective`` is a copy of the defaults (made through a JSON round trip
    so the defaults dict itself is left untouched) with the user overrides
    deep-merged on top. ``path`` is the user config file location as a
    string.
    """
    shipped = _read_default_config_raw()
    overrides = _read_user_config_raw()
    merged = json.loads(json.dumps(shipped))
    _deep_merge_config(merged, overrides)
    payload: dict[str, Any] = {}
    payload["defaults"] = shipped
    payload["user"] = overrides
    payload["effective"] = merged
    payload["path"] = str(_user_config_path())
    return payload
def _save_config_updates(updates: dict[str, Any]) -> dict[str, Any]:
    """Validate ``updates`` and persist them into the user config file.

    Args:
        updates: Mapping of dotted config path to submitted value. A value
            that coerces to ``_CONFIG_REMOVE`` (``None`` or blank) deletes
            the user override for that path.

    Returns:
        ``{"ok": True, "detail": ...}`` on success, or ``{"ok": False,
        "detail": ...}`` when ``updates`` is not a mapping, contains unknown
        keys, or a value fails validation. Nothing is written on failure.
    """
    if not isinstance(updates, dict):
        return {"ok": False, "detail": "updates must be an object"}

    specs = {spec["path"]: spec for spec in _config_field_specs()}
    unknown = sorted(set(updates) - set(specs))
    if unknown:
        return {"ok": False, "detail": f"unknown config keys: {', '.join(unknown)}"}

    # Validate everything before taking the lock so invalid input never
    # blocks other writers and never produces a partial write.
    try:
        coerced = {
            path: _coerce_config_value(specs[path], raw_value)
            for path, raw_value in updates.items()
        }
    except (TypeError, ValueError) as exc:
        return {"ok": False, "detail": str(exc)}

    config_path = _user_config_path()
    with file_lock(config_path):
        # Read inside the lock: requests are served on multiple threads, and
        # reading before locking would let two concurrent saves overwrite
        # each other's changes (read-modify-write race).
        user_config = _read_user_config_raw()
        for path, value in coerced.items():
            if value is _CONFIG_REMOVE:
                _delete_config_value(user_config, path)
            else:
                _set_config_value(user_config, path, value)
        _atomic_write_text(
            config_path,
            json.dumps(user_config, indent=2, sort_keys=True) + "\n",
        )
    return {"ok": True, "detail": f"saved {len(updates)} config keys"}
def _render_config() -> str:
    """Render the editable config page.

    One card is produced per field spec, grouped by ``spec["group"]``. Each
    control shows the effective value and carries ``data-original-value``;
    the inline script compares that attribute with the control's current
    value and POSTs only the changed fields (or a blank value for fields
    whose "remove user override" box is ticked) to ``/api/config``.

    Returns:
        The full HTML document produced by ``_layout``.
    """
    payload = _effective_config_payload()
    effective = payload["effective"]
    user = payload["user"]
    rows_by_group: dict[str, list[str]] = defaultdict(list)

    for spec in _config_field_specs():
        path = spec["path"]
        kind = spec.get("type")
        value = _config_value(effective, path, "")
        default = _config_value(payload["defaults"], path, "")
        user_value = _config_value(user, path, _CONFIG_REMOVE)
        is_override = user_value is not _CONFIG_REMOVE
        required = bool(spec.get("required"))
        req_html = " <span class='pill grade-A'>Required</span>" if required else ""
        help_text = html.escape(str(spec.get("help", "")))
        default_html = html.escape(json.dumps(default) if not isinstance(default, str) else default)
        example_value = spec.get("example")
        example_html = html.escape(json.dumps(example_value) if not isinstance(example_value, str) else str(example_value))

        # The script decides "changed" by string-comparing el.value with
        # data-original-value. For bool fields the <select> values are
        # "true"/"false", so the original must use the same spelling;
        # str(True) == "True" never matched and made every save write an
        # unintended override for the field.
        if kind == "bool":
            original_value = "true" if bool(value) else "false"
        else:
            original_value = str(value)

        common_attrs = (
            f"name='{html.escape(path)}' data-config-path='{html.escape(path)}' "
            f"data-original-value='{html.escape(original_value)}' "
            f"data-default='{default_html}' {'required' if required else ''}"
        )

        if kind == "choice":
            options = "".join(
                f"<option value='{html.escape(str(choice))}' {'selected' if str(value) == str(choice) else ''}>"
                f"{html.escape(str(choice))}</option>"
                for choice in spec.get("choices", ())
            )
            control = f"<select {common_attrs}>{options}</select>"
        elif kind == "bool":
            control = (
                f"<select {common_attrs}>"
                f"<option value='true' {'selected' if bool(value) else ''}>true</option>"
                f"<option value='false' {'selected' if not bool(value) else ''}>false</option>"
                "</select>"
            )
        elif kind in {"int", "float"}:
            step = spec.get("step", 1 if kind == "int" else 0.01)
            control = (
                f"<input type='number' {common_attrs} "
                f"min='{html.escape(str(spec.get('min', '')))}' "
                f"max='{html.escape(str(spec.get('max', '')))}' "
                f"step='{html.escape(str(step))}' "
                f"value='{html.escape(str(value))}' placeholder='{default_html}'>"
            )
        else:
            control = (
                f"<input type='text' {common_attrs} "
                f"value='{html.escape(str(value))}' placeholder='{default_html}'>"
            )

        override_html = (
            "<span class='pill grade-B'>override</span>"
            if is_override
            else "<span class='muted'>default</span>"
        )
        # The "remove override" checkbox is only offered when one exists.
        clear_html = (
            f"<label style='display:inline-flex; align-items:center; gap:0.35rem; "
            f"margin-top:0.45rem;'>"
            f"<input type='checkbox' data-config-clear='{html.escape(path)}'>"
            "remove user override on save</label>"
            if is_override
            else ""
        )
        rows_by_group[str(spec["group"])].append(
            "<div class='card' style='margin:0 0 0.75rem 0;'>"
            f"<label><strong>{html.escape(str(spec['label']))}</strong>{req_html}<br>"
            f"<code>{html.escape(path)}</code></label>"
            f"<div style='margin-top:0.45rem;'>{control}</div>"
            f"{clear_html}"
            f"<p class='muted' style='margin-bottom:0;'>{help_text}<br>"
            f"Default: <code>{default_html}</code> Β· Example: <code>{example_html}</code> Β· "
            f"{override_html}</p>"
            "</div>"
        )

    group_html = "".join(
        "<section style='margin-bottom:1rem;'>"
        f"<h2>{html.escape(group)}</h2>"
        + "".join(rows)
        + "</section>"
        for group, rows in rows_by_group.items()
    )
    token = _MONITOR_TOKEN or ""
    body = (
        "<h1>Config</h1>"
        "<p class='muted'>Edit ctx runtime defaults from the dashboard. Saves only changed fields. For existing overrides, use remove user override to fall back to the shipped default. Important fields are marked Required.</p>"
        f"<p class='muted'>User config: <code>{html.escape(payload['path'])}</code></p>"
        "<form id='config-form'>"
        + group_html
        + "<div class='card' style='position:sticky; bottom:0; background:rgba(255,255,255,0.96);'>"
        "<button type='submit'>save config</button> "
        "<button type='button' id='config-reset'>reset form to effective values</button> "
        "<span id='config-msg' class='muted'></span>"
        "</div></form>"
        "<script>\n"
        f"const CTX_MONITOR_TOKEN = {json.dumps(token)};\n"
        "const form = document.getElementById('config-form');\n"
        "const msg = document.getElementById('config-msg');\n"
        "form.addEventListener('submit', async (ev) => {\n"
        "  ev.preventDefault();\n"
        "  const updates = {};\n"
        "  const clears = new Set(Array.from(form.querySelectorAll('[data-config-clear]:checked')).map(el => el.dataset.configClear));\n"
        "  form.querySelectorAll('[data-config-path]').forEach(el => {\n"
        "    const path = el.dataset.configPath;\n"
        "    if (clears.has(path)) updates[path] = '';\n"
        "    else if (String(el.value) !== String(el.dataset.originalValue || '')) updates[path] = el.value;\n"
        "  });\n"
        "  if (Object.keys(updates).length === 0) { msg.textContent = 'no config changes to save'; return; }\n"
        "  msg.textContent = 'saving...';\n"
        "  const r = await fetch('/api/config', {method:'POST', headers:{'Content-Type':'application/json', 'X-CTX-Monitor-Token':CTX_MONITOR_TOKEN}, body: JSON.stringify({updates})});\n"
        "  let body = {}; try { body = await r.json(); } catch (_) {}\n"
        "  msg.textContent = r.ok && body.ok ? body.detail : ('failed: ' + (body.detail || r.statusText));\n"
        "});\n"
        "document.getElementById('config-reset').addEventListener('click', () => location.reload());\n"
        "</script>"
    )
    return _layout("Config", body)
def _render_graph(focus: str | None = None, focus_type: str | None = None) -> str:
    """Render the interactive graph explorer page.

    The page consists of an optional "popular seed" chip row, a sidebar with
    focus/type/tag controls, and an inline script. The script fetches
    ``/api/graph/<slug>.json`` and draws the neighborhood as a rotatable,
    zoomable SVG projection with a linked node list underneath; when drawing
    raises, it falls back to the plain list view.

    Args:
        focus: Slug to focus on. When empty, the first popular seed (or
            ``_DEFAULT_GRAPH_FOCUS_SLUG`` if there are no seeds) is used,
            provided the graph stats report the graph as available.
        focus_type: Optional entity type pre-selected in the type dropdown
            and passed along with the initial load.

    Returns:
        The full page HTML produced by ``_layout``.
    """
    focus_slug = focus or ""
    gstats = _graph_stats()
    # Seeds are only looked up for the landing view (no explicit focus).
    seeds = (
        _top_degree_seeds(allow_load=False)
        if not focus_slug and gstats.get("available")
        else []
    )
    initial_slug = focus_slug
    initial_type = focus_type or ""
    if not initial_slug and seeds:
        initial_slug = str(seeds[0].get("slug") or "")
        initial_type = str(seeds[0].get("type") or "")
    elif not initial_slug and gstats.get("available"):
        initial_slug = _DEFAULT_GRAPH_FOCUS_SLUG
    focus_js = _json_for_script(initial_slug)
    focus_type_js = _json_for_script(initial_type)
    seed_html = ""
    if seeds:
        chips = "".join(
            f"<a href='/graph?slug={html.escape(s['slug'])}&amp;type={html.escape(s['type'])}' "
            f"style='display:inline-block; margin:0.2rem 0.25rem; padding:0.25rem 0.6rem; "
            f"border-radius:999px; background:{'#fef3c7' if s['type']=='agent' else '#fee2e2' if s['type']=='mcp-server' else '#dcfce7' if s['type']=='harness' else '#e0e7ff'}; "
            f"color:#111; font-size:0.82rem; text-decoration:none;'>"
            f"<code style='background:transparent;'>{html.escape(s['slug'])}</code> "
            f"<span class='muted' style='font-size:0.72rem;'>\u00b7 deg {_format_count(s['degree'])}</span>"
            f"</a>"
            for s in seeds
        )
        seed_html = (
            "<div class='card'><strong>Popular seed slugs</strong> "
            "<span class='muted' style='font-size:0.8rem;'>"
            "(click to explore 1-hop neighborhood)</span>"
            f"<div style='margin-top:0.4rem;'>{chips}</div></div>"
        )
    stats_html = (
        f"<span class='muted'>{gstats.get('nodes', 0):,} nodes \u00b7 "
        f"{gstats.get('edges', 0):,} edges</span>"
    )
    # NOTE: non-ASCII punctuation in the template below is written with
    # \u escapes (\u2014 em dash, \u00b7 middle dot, \u2194 left-right arrow,
    # \u2026 ellipsis) so the source stays ASCII and cannot be mis-decoded.
    body = (
        "<h1>Knowledge graph</h1>"
        f"<p class='muted'>Enter an entity slug to explore its 1-hop "
        f"neighborhood. Edges blend semantic + tag + slug-token "
        f"signals (weight = final_weight). {stats_html}</p>"
        + seed_html
        # Two-column layout - filter sidebar on the left (mirrors /wiki),
        # graph list on the right. Client-side JS hides nodes by
        # type + tag without hitting the server so a user can carve out
        # a subgraph without rebuilding anything.
        + "<div style='display:grid; grid-template-columns:240px 1fr; "
        "gap:1rem; align-items:start; margin-top:1rem;'>"
        # Left sidebar
        "<aside style='position:sticky; top:1rem;'>"
        "<div class='card'><strong>Focus</strong>"
        "<input type='text' id='focus' "
        "placeholder='skill / agent / mcp / harness slug' "
        f"value='{html.escape(initial_slug)}' "
        "style='width:100%; margin-top:0.4rem; padding:0.35rem 0.5rem; "
        "border:1px solid #ccc; border-radius:4px;'>"
        "<select id='focus-type' "
        "style='width:100%; margin-top:0.4rem; padding:0.35rem 0.5rem; "
        "border:1px solid #ccc; border-radius:4px;'>"
        "<option value=''>auto type</option>"
        f"<option value='skill' {'selected' if initial_type == 'skill' else ''}>skill</option>"
        f"<option value='agent' {'selected' if initial_type == 'agent' else ''}>agent</option>"
        f"<option value='mcp-server' {'selected' if initial_type == 'mcp-server' else ''}>mcp-server</option>"
        f"<option value='harness' {'selected' if initial_type == 'harness' else ''}>harness</option>"
        "</select>"
        "<button id='go' style='margin-top:0.4rem; width:100%;'>"
        "explore</button></div>"
        "<div class='card'><strong>Type</strong>"
        "<label style='display:flex; justify-content:space-between; padding:0.25rem 0;'>"
        "<span><input type='checkbox' class='graph-type-filter' value='skill' checked> skill</span>"
        "<span class='muted' id='graph-count-skill' style='font-size:0.78rem;'>\u2014</span></label>"
        "<label style='display:flex; justify-content:space-between; padding:0.25rem 0;'>"
        "<span><input type='checkbox' class='graph-type-filter' value='agent' checked> agent</span>"
        "<span class='muted' id='graph-count-agent' style='font-size:0.78rem;'>\u2014</span></label>"
        "<label style='display:flex; justify-content:space-between; padding:0.25rem 0;'>"
        "<span><input type='checkbox' class='graph-type-filter' value='mcp-server' checked> mcp-server</span>"
        "<span class='muted' id='graph-count-mcp-server' style='font-size:0.78rem;'>\u2014</span></label>"
        "<label style='display:flex; justify-content:space-between; padding:0.25rem 0;'>"
        "<span><input type='checkbox' class='graph-type-filter' value='harness' checked> harness</span>"
        "<span class='muted' id='graph-count-harness' style='font-size:0.78rem;'>-</span></label>"
        "</div>"
        "<div class='card'><strong>Tag filter</strong>"
        "<input type='text' id='tag-filter' "
        "placeholder='shared_tag or slug_token' "
        "style='width:100%; margin-top:0.4rem; padding:0.3rem 0.5rem; "
        "border:1px solid #ccc; border-radius:4px;'>"
        "<p class='muted' style='font-size:0.72rem; margin:0.4rem 0 0 0;'>"
        "Filters nodes by tag substring (client-side).</p></div>"
        "<div class='card'>"
        "<span id='graph-match-count' class='muted'>\u2014</span>"
        "</div>"
        "<div class='card'><span id='msg' class='muted'></span></div>"
        "<div class='card'><strong>Why this view?</strong>"
        "<p id='graph-explanation' class='muted' "
        "style='font-size:0.78rem; margin:0.45rem 0 0 0;'>"
        "Search a slug to see why ctx picked this neighborhood and how to read it."
        "</p></div>"
        "</aside>"
        # Right: graph list panel
        "<div id='cy' style='width:100%; height:75vh; border:1px solid #ddd; "
        "border-radius:6px; background:#fafafa;'></div>"
        "</div>"
        "<script>\n"
        f"const initial = {focus_js};\n"
        f"const initialType = {focus_type_js};\n"
        "const cyMount = document.getElementById('cy');\n"
        "function nodeColor(type) {\n"
        " if (type === 'agent') return '#f59e0b';\n"
        " if (type === 'mcp-server') return '#ef4444';\n"
        " if (type === 'harness') return '#22c55e';\n"
        " return '#6366f1';\n"
        "}\n"
        "function escapeHtml(s) { return String(s).replace(/[&<>\"']/g, ch => ({'&':'&amp;','<':'&lt;','>':'&gt;','\"':'&quot;',\"'\":'&#39;'}[ch])); }\n"
        "function fmtCount(n) { return Number(n || 0).toLocaleString(); }\n"
        "function rawNodeSlug(id) { return String(id || '').replace(/^(skill|agent|mcp-server|harness):/, ''); }\n"
        "function displaySlug(slug) { return String(slug || '').replace(/^skills-sh-/, ''); }\n"
        "function nodeSlug(id) { return displaySlug(rawNodeSlug(id)); }\n"
        "function nodeDomId(id) { return 'graph-node-' + String(id || '').replace(/[^a-zA-Z0-9_-]/g, '_'); }\n"
        "function wikiHref(data) {\n"
        " const nodeType = data.type || '';\n"
        " const suffix = nodeType ? '?type=' + encodeURIComponent(nodeType) : '';\n"
        " return '/wiki/' + encodeURIComponent(rawNodeSlug(data.id)) + suffix;\n"
        "}\n"
        "function renderFallback(g) {\n"
        " const nodes = g.nodes || [];\n"
        " const rows = nodes.map(n => {\n"
        " const d = n.data || {};\n"
        " const tags = Array.from(d.filter_tokens || d.tags || []).join(' ');\n"
        " const typeKey = ['skill', 'agent', 'mcp-server', 'harness'].includes(d.type) ? d.type : 'entity';\n"
        " return '<a data-testid=\"graph-fallback-node\" class=\"graph-fallback-node\" '\n"
        " + 'data-node-id=\"' + escapeHtml(d.id || '') + '\" '\n"
        " + 'data-slug=\"' + escapeHtml(nodeSlug(d.id)) + '\" '\n"
        " + 'data-type=\"' + escapeHtml(d.type || '') + '\" '\n"
        " + 'data-depth=\"' + escapeHtml(d.depth || 0) + '\" '\n"
        " + 'data-tags=\"' + escapeHtml(tags.toLowerCase()) + '\" '\n"
        " + 'href=\"' + escapeHtml(wikiHref(d)) + '\" '\n"
        " + 'style=\"display:flex; justify-content:space-between; gap:0.75rem; padding:0.45rem 0.6rem; border-bottom:1px solid #e5e7eb; color:inherit; text-decoration:none;\">'\n"
        " + '<code>' + escapeHtml(d.label || nodeSlug(d.id)) + '</code>'\n"
        " + '<span class=\"pill entity-type-' + escapeHtml(typeKey) + '\">' + escapeHtml(d.type || 'entity') + '</span></a>';\n"
        " }).join('');\n"
        " cyMount.innerHTML = '<div data-testid=\"graph-fallback\" style=\"padding:0.75rem; height:100%; overflow:auto;\">'\n"
        " + '<div class=\"muted\" style=\"margin-bottom:0.5rem;\">Showing list view.</div>'\n"
        " + rows + '</div>';\n"
        "}\n"
        "function nodeDetail(d) {\n"
        " const tags = Array.from(d.tags || []).join(', ') || 'none';\n"
        " const quality = d.quality_score == null ? 'unknown' : Number(d.quality_score).toFixed(3);\n"
        " const usage = d.usage_score == null ? 'unknown' : Number(d.usage_score).toFixed(3);\n"
        " const size = d.node_size == null ? 'auto' : Number(d.node_size).toFixed(1);\n"
        " const sizeReason = d.size_reason ? ' \u00b7 size: ' + size + ' (' + d.size_reason + ')' : ' \u00b7 size: ' + size;\n"
        " const desc = d.description ? ' \u00b7 ' + d.description : '';\n"
        " return (d.label || nodeSlug(d.id)) + ' \u00b7 ' + (d.type || 'entity') + desc + ' \u00b7 tags: ' + tags + ' \u00b7 quality: ' + quality + ' \u00b7 usage: ' + usage + sizeReason;\n"
        "}\n"
        "function nodeRadius(d, isCenter, scale) {\n"
        " const base = Math.max(isCenter ? 14 : 8, Math.min(24, Number(d.node_size || (isCenter ? 16 : 11))));\n"
        " const perspective = Math.max(0.8, Math.min(1.2, Number(scale || 1)));\n"
        " return Math.max(8, Math.min(28, base * perspective));\n"
        "}\n"
        "function edgeDetail(d) {\n"
        " const tags = Array.from(d.shared_tags || []).join(', ') || 'none';\n"
        " const reasons = Array.from(d.reasons || []).join(', ') || 'graph score';\n"
        " const weight = Number(d.weight || 0).toFixed(3);\n"
        " return nodeSlug(d.source) + ' \u2194 ' + nodeSlug(d.target) + ' \u00b7 weight ' + weight + ' \u00b7 shared: ' + tags + ' \u00b7 reasons: ' + reasons;\n"
        "}\n"
        "function graphExplanation(g) {\n"
        " const e = g.explanations || {};\n"
        " return [e.focus, e.source, e.search, e.layout, e.edges].filter(Boolean).join(' ');\n"
        "}\n"
        "function renderGraph3d(g) {\n"
        " const nodes = g.nodes || [];\n"
        " const edges = g.edges || [];\n"
        " if (!nodes.length) { renderFallback(g); return; }\n"
        " const width = Math.max(640, Math.floor(cyMount.clientWidth || 800));\n"
        " const height = Math.max(420, Math.floor(cyMount.clientHeight || 520));\n"
        " const center = nodes.find(n => (n.data || {}).depth === 0) || nodes[0];\n"
        " const centerId = (center.data || {}).id;\n"
        " const points = new Map([[centerId, {x: 0, y: 0, z: 0}]]);\n"
        " const others = nodes.filter(n => (n.data || {}).id !== centerId);\n"
        " others.forEach((n, idx) => {\n"
        " const d = n.data || {};\n"
        " const i = idx + 1;\n"
        " const phi = Math.acos(1 - 2 * i / Math.max(2, others.length + 1));\n"
        " const theta = Math.PI * (3 - Math.sqrt(5)) * i;\n"
        " const depth = Math.max(1, Number(d.depth || 1));\n"
        " const radius = 180 + depth * 90;\n"
        " points.set(d.id, {x: radius * Math.cos(theta) * Math.sin(phi), y: radius * Math.sin(theta) * Math.sin(phi), z: radius * Math.cos(phi)});\n"
        " });\n"
        " renderFallback(g);\n"
        " const list = cyMount.querySelector('[data-testid=\"graph-fallback\"]');\n"
        " const heading = list ? list.querySelector('.muted') : null;\n"
        " if (heading) heading.remove();\n"
        " const rows = list ? list.innerHTML : '';\n"
        " cyMount.innerHTML = '<div data-testid=\"graph-renderer\" style=\"height:100%; display:grid; grid-template-rows:auto minmax(0,1fr) auto 30%;\">'\n"
        " + '<div style=\"display:flex; align-items:center; gap:0.5rem; padding:0.45rem 0.6rem; border-bottom:1px solid #e5e7eb; background:#fff;\">'\n"
        " + '<button id=\"graph-zoom-in\" type=\"button\">zoom in</button><button id=\"graph-zoom-out\" type=\"button\">zoom out</button>'\n"
        " + '<span class=\"muted\">drag to rotate \u00b7 wheel to zoom \u00b7 hover nodes or edges</span></div>'\n"
        " + '<svg data-testid=\"graph-3d\" viewBox=\"0 0 ' + width + ' ' + height + '\" style=\"display:block; width:100%; height:100%; min-height:0; background:#f8fafc; touch-action:none;\"></svg>'\n"
        " + '<div style=\"display:grid; grid-template-columns:1fr 1fr; gap:0.5rem; padding:0.45rem 0.6rem; border-top:1px solid #e5e7eb; background:#fff;\">'\n"
        " + '<div data-testid=\"graph-node-detail\" class=\"muted\">Hover a node for entity highlights.</div>'\n"
        " + '<div data-testid=\"graph-edge-detail\" class=\"muted\">Hover an edge for relationship signals.</div></div>'\n"
        " + '<div data-testid=\"graph-list\" style=\"overflow:auto; border-top:1px solid #e5e7eb;\">' + rows + '</div></div>';\n"
        " const svg = cyMount.querySelector('[data-testid=\"graph-3d\"]');\n"
        " const nodeDetailBox = cyMount.querySelector('[data-testid=\"graph-node-detail\"]');\n"
        " const edgeDetailBox = cyMount.querySelector('[data-testid=\"graph-edge-detail\"]');\n"
        " let yaw = -0.4;\n"
        " let pitch = 0.55;\n"
        " let zoom = 1;\n"
        " function project(p) {\n"
        " const cyaw = Math.cos(yaw), syaw = Math.sin(yaw);\n"
        " const cp = Math.cos(pitch), sp = Math.sin(pitch);\n"
        " const x1 = p.x * cyaw - p.z * syaw;\n"
        " const z1 = p.x * syaw + p.z * cyaw;\n"
        " const y1 = p.y * cp - z1 * sp;\n"
        " const z2 = p.y * sp + z1 * cp;\n"
        " const scale = zoom * 600 / (720 + z2);\n"
        " return {x: width / 2 + x1 * scale, y: height / 2 + y1 * scale, z: z2, scale};\n"
        " }\n"
        " function attach3dHoverHandlers() {\n"
        " svg.querySelectorAll('[data-node-detail]').forEach(n => n.addEventListener('mouseenter', () => { nodeDetailBox.textContent = n.dataset.nodeDetail || ''; }));\n"
        " svg.querySelectorAll('[data-edge-detail]').forEach(e => e.addEventListener('mouseenter', () => { edgeDetailBox.textContent = e.dataset.edgeDetail || ''; }));\n"
        " }\n"
        " function drawGraph3d() {\n"
        " const projected = new Map();\n"
        " points.forEach((p, id) => projected.set(id, project(p)));\n"
        " const edgeHtml = edges.map(e => {\n"
        " const d = e.data || {};\n"
        " const s = projected.get(d.source);\n"
        " const t = projected.get(d.target);\n"
        " if (!s || !t) return '';\n"
        " const w = Math.max(1, Math.min(4, 1 + Math.sqrt(Math.max(0, Number(d.weight || 1)))));\n"
        " return '<line data-testid=\"graph-svg-edge\" data-3d-edge-source=\"' + escapeHtml(d.source || '') + '\" data-3d-edge-target=\"' + escapeHtml(d.target || '') + '\" data-svg-edge-source=\"' + escapeHtml(d.source || '') + '\" data-svg-edge-target=\"' + escapeHtml(d.target || '') + '\" x1=\"' + s.x.toFixed(1) + '\" y1=\"' + s.y.toFixed(1) + '\" x2=\"' + t.x.toFixed(1) + '\" y2=\"' + t.y.toFixed(1) + '\" stroke=\"#64748b\" stroke-opacity=\"' + (0.35 + Math.min(0.4, Number(d.weight || 0) / 3)).toFixed(2) + '\" stroke-width=\"' + w.toFixed(2) + '\" />';\n"
        " }).join('');\n"
        " const nodeHtml = nodes.slice().sort((a, b) => (projected.get((a.data || {}).id)?.z || 0) - (projected.get((b.data || {}).id)?.z || 0)).map(n => {\n"
        " const d = n.data || {};\n"
        " const p = projected.get(d.id) || {x: width / 2, y: height / 2, z: 0, scale: 1};\n"
        " const tags = Array.from(d.filter_tokens || d.tags || []).join(' ');\n"
        " const label = d.label || nodeSlug(d.id);\n"
        " const isCenter = d.id === centerId;\n"
        " const r = nodeRadius(d, isCenter, p.scale);\n"
        # Truncate the raw label BEFORE escaping: slicing the escaped text
        # could cut an entity such as "&amp;" in half.
        " return '<a href=\"' + escapeHtml(wikiHref(d)) + '\"><g data-testid=\"graph-3d-node\" id=\"' + escapeHtml(nodeDomId(d.id)) + '\" data-3d-node-id=\"' + escapeHtml(d.id || '') + '\" data-node-detail=\"' + escapeHtml(nodeDetail(d)) + '\" data-type=\"' + escapeHtml(d.type || '') + '\" data-depth=\"' + escapeHtml(d.depth || 0) + '\" data-tags=\"' + escapeHtml(tags.toLowerCase()) + '\"><title>' + escapeHtml(nodeDetail(d)) + '</title><circle data-testid=\"graph-svg-node\" cx=\"' + p.x.toFixed(1) + '\" cy=\"' + p.y.toFixed(1) + '\" r=\"' + r + '\" fill=\"' + nodeColor(d.type) + '\" stroke=\"#fff\" stroke-width=\"2\" /><text x=\"' + p.x.toFixed(1) + '\" y=\"' + (p.y + r + 14).toFixed(1) + '\" text-anchor=\"middle\" font-size=\"11\" fill=\"#111827\" style=\"pointer-events:none;\">' + escapeHtml(String(label).slice(0, 28)) + '</text></g></a>';\n"
        " }).join('');\n"
        " const edgeHitHtml = edges.map(e => {\n"
        " const d = e.data || {};\n"
        " const s = projected.get(d.source);\n"
        " const t = projected.get(d.target);\n"
        " if (!s || !t) return '';\n"
        " const hx1 = s.x + (t.x - s.x) * 0.18, hy1 = s.y + (t.y - s.y) * 0.18;\n"
        " const hx2 = s.x + (t.x - s.x) * 0.82, hy2 = s.y + (t.y - s.y) * 0.82;\n"
        " return '<line data-testid=\"graph-3d-edge\" data-3d-edge-source=\"' + escapeHtml(d.source || '') + '\" data-3d-edge-target=\"' + escapeHtml(d.target || '') + '\" data-svg-edge-source=\"' + escapeHtml(d.source || '') + '\" data-svg-edge-target=\"' + escapeHtml(d.target || '') + '\" data-edge-detail=\"' + escapeHtml(edgeDetail(d)) + '\" x1=\"' + hx1.toFixed(1) + '\" y1=\"' + hy1.toFixed(1) + '\" x2=\"' + hx2.toFixed(1) + '\" y2=\"' + hy2.toFixed(1) + '\" stroke=\"transparent\" stroke-width=\"12\" style=\"pointer-events:stroke;\"><title>' + escapeHtml(edgeDetail(d)) + '</title></line>';\n"
        " }).join('');\n"
        " svg.innerHTML = '<rect width=\"100%\" height=\"100%\" fill=\"#f8fafc\" />' + edgeHtml + nodeHtml + edgeHitHtml;\n"
        " attach3dHoverHandlers();\n"
        " applyFilters();\n"
        " }\n"
        " document.getElementById('graph-zoom-in').addEventListener('click', () => { zoom = Math.min(2.5, zoom * 1.18); drawGraph3d(); });\n"
        " document.getElementById('graph-zoom-out').addEventListener('click', () => { zoom = Math.max(0.35, zoom / 1.18); drawGraph3d(); });\n"
        " let dragging = false, lastX = 0, lastY = 0;\n"
        " svg.addEventListener('pointerdown', ev => { dragging = true; lastX = ev.clientX; lastY = ev.clientY; svg.setPointerCapture(ev.pointerId); });\n"
        " svg.addEventListener('pointerup', ev => { dragging = false; try { svg.releasePointerCapture(ev.pointerId); } catch (_) {} });\n"
        " svg.addEventListener('pointermove', ev => { if (!dragging) return; yaw += (ev.clientX - lastX) * 0.01; pitch += (ev.clientY - lastY) * 0.01; pitch = Math.max(-1.35, Math.min(1.35, pitch)); lastX = ev.clientX; lastY = ev.clientY; drawGraph3d(); });\n"
        " svg.addEventListener('wheel', ev => { ev.preventDefault(); zoom = Math.max(0.35, Math.min(2.5, zoom * (ev.deltaY < 0 ? 1.08 : 0.92))); drawGraph3d(); }, {passive:false});\n"
        " drawGraph3d();\n"
        "}\n"
        "cyMount.innerHTML = '<div data-testid=\"graph-empty\" class=\"muted\" style=\"padding:0.75rem;\">Enter a slug to render the graph.</div>';\n"
        # -- Client-side filtering (type + tag substring) --
        # The list rows are the source of truth for visibility; SVG nodes and
        # edges are then hidden to match the hidden row ids.
        "function applyFilters() {\n"
        " const allowedTypes = new Set(\n"
        " Array.from(document.querySelectorAll('.graph-type-filter'))\n"
        " .filter(cb => cb.checked).map(cb => cb.value));\n"
        " const tagQ = (document.getElementById('tag-filter').value || '')\n"
        " .trim().toLowerCase();\n"
        " const counts = {skill: 0, agent: 0, 'mcp-server': 0, harness: 0};\n"
        " let visible = 0;\n"
        " const hiddenIds = new Set();\n"
        " document.querySelectorAll('[data-testid=\"graph-fallback-node\"]').forEach(n => {\n"
        " const t = n.dataset.type;\n"
        " const isFocus = n.dataset.depth === '0';\n"
        " const tags = (n.dataset.tags || '').split(/\\s+/).map(x => x.toLowerCase());\n"
        " const typeOk = isFocus || allowedTypes.has(t);\n"
        " const tagOk = isFocus || !tagQ || tags.some(tag => tag.includes(tagQ));\n"
        " const hidden = !(typeOk && tagOk);\n"
        " n.style.display = hidden ? 'none' : 'flex';\n"
        " if (hidden) hiddenIds.add(n.dataset.nodeId || '');\n"
        " if (!hidden) {\n"
        " visible++;\n"
        " if (t in counts) counts[t]++;\n"
        " }\n"
        " });\n"
        " document.querySelectorAll('[data-3d-node-id]').forEach(n => {\n"
        " n.style.display = hiddenIds.has(n.dataset['3dNodeId'] || '') ? 'none' : '';\n"
        " });\n"
        " const edgeEls = new Set([...document.querySelectorAll('[data-3d-edge-source]'), ...document.querySelectorAll('[data-svg-edge-source]')]);\n"
        " edgeEls.forEach(e => {\n"
        " const source = e.dataset['3dEdgeSource'] || e.dataset.svgEdgeSource || '';\n"
        " const target = e.dataset['3dEdgeTarget'] || e.dataset.svgEdgeTarget || '';\n"
        " const hidden = hiddenIds.has(source) || hiddenIds.has(target);\n"
        " e.style.display = hidden ? 'none' : '';\n"
        " });\n"
        " document.getElementById('graph-count-skill').textContent = fmtCount(counts.skill);\n"
        " document.getElementById('graph-count-agent').textContent = fmtCount(counts.agent);\n"
        " document.getElementById('graph-count-mcp-server').textContent = fmtCount(counts['mcp-server']);\n"
        " document.getElementById('graph-count-harness').textContent = fmtCount(counts.harness);\n"
        " document.getElementById('graph-match-count').textContent = fmtCount(visible) + ' visible';\n"
        "}\n"
        "async function load(slug, entityType = '') {\n"
        " if (!slug) return;\n"
        " document.getElementById('msg').textContent = 'loading\u2026';\n"
        " const suffix = entityType ? '?type=' + encodeURIComponent(entityType) : '';\n"
        " const r = await fetch('/api/graph/' + encodeURIComponent(slug) + '.json' + suffix);\n"
        " if (!r.ok) { document.getElementById('msg').textContent = 'not found'; return; }\n"
        " const g = await r.json();\n"
        " if (!g.center) { document.getElementById('msg').textContent = 'slug not in graph'; return; }\n"
        " try { renderGraph3d(g); } catch (err) { renderFallback(g); }\n"
        " const resolved = g.resolved && g.resolved.query !== g.resolved.slug ? ' \u00b7 showing ' + g.resolved.slug + ' for ' + g.resolved.query : '';\n"
        " document.getElementById('msg').textContent = fmtCount(g.nodes.length) + ' nodes \u00b7 ' + fmtCount(g.edges.length) + ' edges' + resolved;\n"
        " document.getElementById('graph-explanation').textContent = graphExplanation(g);\n"
        " applyFilters();\n"
        "}\n"
        "function selectedFocusType() { return document.getElementById('focus-type').value || ''; }\n"
        "document.getElementById('go').addEventListener('click', () => load(document.getElementById('focus').value.trim(), selectedFocusType()));\n"
        "document.getElementById('focus').addEventListener('keydown', (ev) => { if (ev.key === 'Enter') load(ev.target.value.trim(), selectedFocusType()); });\n"
        "document.querySelectorAll('.graph-type-filter').forEach(cb => cb.addEventListener('change', applyFilters));\n"
        "document.getElementById('tag-filter').addEventListener('input', applyFilters);\n"
        "if (initial) load(initial, initialType);\n"
        "</script>"
    )
    return _layout("Graph", body)
def _runtime_graph_center_data(graph: dict) -> dict[str, Any] | None:
center = str(graph.get("center") or "")
if not center:
return None
for node in graph.get("nodes", []):
if not isinstance(node, dict):
continue
data = node.get("data", {})
if isinstance(data, dict) and str(data.get("id") or "") == center:
return data
return None
def _runtime_graph_metric_row(label: str, value: object) -> str:
if value is None or value == "":
value_html = "<span class='muted'>unknown</span>"
elif isinstance(value, float):
value_html = f"<code>{value:.3f}</code>"
else:
value_html = f"<code>{html.escape(str(value))}</code>"
return f"<tr><td class='muted'>{html.escape(label)}</td><td>{value_html}</td></tr>"
def _render_runtime_entity_action(
slug: str,
entity_type: str,
*,
mutations_enabled: bool,
) -> str:
escaped_slug = html.escape(slug)
escaped_type = html.escape(entity_type)
if entity_type == "harness":
return (
"<div class='card'>"
"<h2>Install harness</h2>"
"<p class='muted'>Harnesses are installed through the harness CLI so ctx can "
"collect the model, goal, and verification details before wiring recommendations.</p>"
f"<pre><code>ctx-harness-install {escaped_slug} --dry-run\n"
f"ctx-harness-install {escaped_slug}</code></pre>"
"</div>"
)
disabled = " disabled" if not mutations_enabled else ""
disabled_note = (
"<p class='muted'>Load/install actions are disabled because this dashboard is not "
"bound to loopback.</p>"
if not mutations_enabled
else ""
)
return (
"<div class='card'>"
"<h2>Load or install</h2>"
"<p class='muted'>Use this when the backing wiki contains the installable entity. "
"If runtime mode only installed graph metadata, install the full wiki first.</p>"
f"<button type='button' class='action-btn' data-testid='runtime-entity-load' "
f"data-runtime-slug='{escaped_slug}' data-runtime-type='{escaped_type}'{disabled}>"
"Load / install from current wiki</button>"
f"{disabled_note}"
"<p id='runtime-entity-load-result' class='muted'></p>"
"</div>"
)
def _render_runtime_entity_load_script(
slug: str,
entity_type: str,
*,
mutations_enabled: bool,
) -> str:
return (
"<script>\n"
f"const CTX_RUNTIME_ENTITY_MUTATIONS_ENABLED = {json.dumps(mutations_enabled)};\n"
f"const CTX_RUNTIME_ENTITY_TOKEN = {json.dumps(_MONITOR_TOKEN if mutations_enabled else '')};\n"
f"const CTX_RUNTIME_ENTITY_SLUG = {json.dumps(slug)};\n"
f"const CTX_RUNTIME_ENTITY_TYPE = {json.dumps(entity_type)};\n"
"document.querySelectorAll('[data-testid=\"runtime-entity-load\"]').forEach(function (button) {\n"
" button.addEventListener('click', async function () {\n"
" const result = document.getElementById('runtime-entity-load-result');\n"
" if (!CTX_RUNTIME_ENTITY_MUTATIONS_ENABLED) {\n"
" if (result) result.textContent = 'mutations disabled on non-loopback bind';\n"
" return;\n"
" }\n"
" button.disabled = true;\n"
" if (result) result.textContent = 'loading...';\n"
" try {\n"
" const response = await fetch('/api/load', {\n"
" method: 'POST',\n"
" headers: {'Content-Type': 'application/json', 'X-CTX-Monitor-Token': CTX_RUNTIME_ENTITY_TOKEN},\n"
" body: JSON.stringify({slug: CTX_RUNTIME_ENTITY_SLUG, entity_type: CTX_RUNTIME_ENTITY_TYPE})\n"
" });\n"
" const payload = await response.json();\n"
" const message = payload.detail || payload.msg || response.status;\n"
" if (result) result.textContent = (payload.ok ? 'loaded: ' : 'not loaded: ') + message;\n"
" } catch (error) {\n"
" if (result) result.textContent = 'load failed: ' + error;\n"
" } finally {\n"
" button.disabled = false;\n"
" }\n"
" });\n"
"});\n"
"</script>"
)
def _render_runtime_graph_entity(
    slug: str,
    entity_type: str | None = None,
    *,
    mutations_enabled: bool | None = None,
) -> str | None:
    """Build an entity page from runtime-graph metadata alone.

    Looks up the 1-hop neighborhood of ``slug`` and renders its center node
    (description, tags, scores, degree) together with a load/install action,
    the subgraph tab and a quality tab.

    Returns ``None`` when ``entity_type`` is supplied but does not normalize
    to a dashboard entity type, or when the neighborhood has no center node
    data. ``mutations_enabled=None`` defers to ``_MONITOR_MUTATIONS_ENABLED``.
    """
    normalized_type = None
    if entity_type:
        normalized_type = _normalize_dashboard_entity_type(entity_type)
    if entity_type is not None and normalized_type is None:
        return None

    graph = _graph_neighborhood(slug, hops=1, limit=32, entity_type=normalized_type)
    data = _runtime_graph_center_data(graph)
    if data is None:
        return None

    # Resolve identity from the node id, falling back to the request values.
    node_id = str(data.get("id") or graph.get("center") or "")
    resolved_slug = _graph_slug_from_node_id(node_id) or slug
    fallback_type = str(data.get("type") or normalized_type or "skill")
    resolved_type = _graph_type_from_node_id(node_id, fallback_type)
    label = _display_label(data.get("label"), fallback_slug=resolved_slug)
    display_slug = _display_slug(resolved_slug)
    description = str(data.get("description") or "").strip()

    # Keep non-blank tags only, capped at the first twelve.
    tags = []
    for raw_tag in data.get("tags", []):
        tag_text = str(raw_tag)
        if tag_text.strip():
            tags.append(tag_text)
    tags = tags[:12]

    sidecar = _load_sidecar(resolved_slug, entity_type=resolved_type)
    quality_score = data.get("quality_score")
    usage_score = data.get("usage_score")
    degree = data.get("degree")
    if mutations_enabled is None:
        mutations = _MONITOR_MUTATIONS_ENABLED
    else:
        mutations = mutations_enabled

    if tags:
        tag_html = "".join(f"<span class='pill'>{html.escape(tag)}</span> " for tag in tags)
    else:
        tag_html = "<span class='muted'>no tags in runtime graph</span>"

    if description:
        description_html = f"<p>{html.escape(description)}</p>"
    else:
        description_html = (
            "<p class='muted'>No description is present in the runtime graph metadata.</p>"
        )

    type_markup = html.escape(resolved_type)
    quality_summary = "".join(
        [
            "<div class='card'>",
            "<strong>Runtime graph entity</strong> ",
            f"<span class='pill entity-type-{type_markup}'>{type_markup}</span> ",
            f"<span class='muted'>node <code>{html.escape(node_id)}</code></span>",
            "<div style='margin-top:0.4rem;'>",
            "<a href='#subgraph' data-open-entity-tab='subgraph'>graph neighborhood &rarr;</a> &middot; ",
            "<a href='#quality' data-open-entity-tab='quality'>quality drilldown &rarr;</a>",
            "</div></div>",
        ]
    )

    identity_rows = "".join(
        [
            _runtime_graph_metric_row("slug", display_slug),
            _runtime_graph_metric_row("type", resolved_type),
            _runtime_graph_metric_row("node_id", node_id),
        ]
    )
    # The same three ranking signals appear in the overview table and in the
    # fallback quality table.
    signal_rows = "".join(
        [
            _runtime_graph_metric_row("quality_score", quality_score),
            _runtime_graph_metric_row("usage_score", usage_score),
            _runtime_graph_metric_row("degree", degree),
        ]
    )
    action_html = _render_runtime_entity_action(
        resolved_slug,
        resolved_type,
        mutations_enabled=mutations,
    )
    overview_html = "".join(
        [
            "<div class='wiki-entity-grid'>",
            "<div class='card wiki-body'>",
            "<h2>Runtime graph entity</h2>",
            description_html,
            "<h3>Tags</h3>",
            f"<p>{tag_html}</p>",
            "<h3>Full wiki page</h3>",
            "<p class='muted'>This entity exists in the installed runtime graph, but its full ",
            "Markdown wiki page is not expanded locally. The graph and recommendation paths still ",
            "work. Install the full wiki when you want the complete body/docs in this dashboard.</p>",
            "<pre><code>ctx-init --graph --graph-install-mode full</code></pre>",
            "</div>",
            "<div class='card'><strong>Runtime metadata</strong>",
            "<table class='frontmatter-table'><tr><th>Field</th><th>Value</th></tr>",
            identity_rows,
            signal_rows,
            "</table></div>",
            "</div>",
            action_html,
        ]
    )

    if isinstance(sidecar, dict):
        quality_html = _render_quality_drilldown(sidecar)
    else:
        quality_html = "".join(
            [
                "<div class='card'>",
                "<h2>Runtime graph quality</h2>",
                "<p class='muted'>No full quality sidecar is installed for this entity. ",
                "The runtime graph still exposes the ranking signals available at graph build time.</p>",
                "<table class='frontmatter-table'><tr><th>Signal</th><th>Value</th></tr>",
                signal_rows,
                "</table></div>",
            ]
        )

    heading_html = f"<h1>{html.escape(label)}</h1>"
    subgraph_html = _render_entity_subgraph(resolved_slug, entity_type=resolved_type)
    tabs_html = _render_entity_tabs(
        overview_html=overview_html,
        subgraph_html=subgraph_html,
        quality_html=quality_html,
    )
    script_html = _render_runtime_entity_load_script(
        resolved_slug,
        resolved_type,
        mutations_enabled=mutations,
    )
    return _layout(label, heading_html + quality_summary + tabs_html + script_html)
def _render_wiki_entity(
    slug: str,
    entity_type: str | None = None,
    *,
    mutations_enabled: bool | None = None,
) -> str:
    """Render the page for a single wiki entity.

    When no Markdown file resolves for ``slug`` the runtime-graph view is
    tried, and a "no wiki page found" page is the last resort. Otherwise the
    page shows the rendered body (preview capped at 12,000 characters), a
    frontmatter table, and the subgraph and quality tabs. A quality summary
    card is added when a sidecar is available.
    """
    path = _wiki_entity_path(slug, entity_type=entity_type)
    if path is None:
        runtime_html = _render_runtime_graph_entity(
            slug,
            entity_type=entity_type,
            mutations_enabled=mutations_enabled,
        )
        if runtime_html is not None:
            return runtime_html
        missing_html = (
            f"<h1>{html.escape(slug)}</h1>"
            f"<p class='muted'>No wiki page found for <code>{html.escape(slug)}</code>. "
            f"Try <a href='/skills'>the skills index</a>.</p>"
        )
        return _layout(slug, missing_html)

    try:
        raw = path.read_text(encoding="utf-8", errors="replace")
    except OSError as exc:
        error_html = (
            f"<h1>{html.escape(slug)}</h1><p class='muted'>read error: {html.escape(str(exc))}</p>"
        )
        return _layout(slug, error_html)

    meta, md_body = _parse_frontmatter(raw)
    sidecar = _load_sidecar(slug, entity_type=entity_type)
    display_slug = _display_slug(slug)
    # Only forward the type to /graph when it is a known dashboard type.
    if entity_type in _DASHBOARD_ENTITY_TYPES:
        type_suffix = f"&amp;type={html.escape(entity_type)}"
    else:
        type_suffix = ""

    fm_row_parts = []
    for key, raw_value in sorted(meta.items()):
        shown, was_cut = _truncate_text(_frontmatter_text(raw_value), 120)
        cut_marker = " <span class='muted'>(truncated)</span>" if was_cut else ""
        fm_row_parts.append(
            f"<tr><td class='muted'>{html.escape(key)}</td>"
            f"<td><code>{html.escape(shown)}</code>{cut_marker}</td></tr>"
        )
    fm_rows = "".join(fm_row_parts)

    quality_summary_html = ""
    if sidecar is not None:
        grade = html.escape(sidecar.get('grade', 'F'))
        score_text = f"{sidecar.get('raw_score', 0.0):.3f}"
        if sidecar.get('hard_floor'):
            floor_html = ' &middot; floor ' + html.escape(sidecar.get('hard_floor', ''))
        else:
            floor_html = ''
        quality_summary_html = "".join(
            [
                "<div class='card'>",
                f"<strong>Quality</strong> <span class='pill grade-{grade}'>",
                f"{grade}</span> ",
                f"score <strong>{score_text}</strong>",
                floor_html,
                "<div style='margin-top:0.4rem;'>",
                "<a href='#quality' data-open-entity-tab='quality'>quality drilldown &rarr;</a> &middot; ",
                f"<a href='/skill/{html.escape(slug)}?type={html.escape(entity_type or '')}'>sidecar detail &rarr;</a> &middot; ",
                f"<a href='/graph?slug={html.escape(slug)}{type_suffix}'>graph neighborhood &rarr;</a>",
                "</div></div>",
            ]
        )

    # The embedded quality block is split off and handed to the quality tab.
    md_body_without_quality, embedded_quality_markdown = _extract_embedded_quality_block(md_body)
    display_body = _strip_duplicate_wiki_heading(md_body_without_quality, slug)
    body_preview, body_truncated = _truncate_text(display_body, 12000)
    body_html = _render_wiki_markdown(body_preview)
    if body_truncated:
        body_truncated_html = "<p class='muted'>Body preview truncated at 12,000 characters.</p>"
    else:
        body_truncated_html = ""

    overview_html = "".join(
        [
            "<div class='wiki-entity-grid'>",
            f"<div class='card wiki-body'>{body_html}",
            f"{body_truncated_html}</div>",
            "<div class='card'><strong>Frontmatter</strong>",
            "<table class='frontmatter-table'>",
            "<tr><th>Field</th><th>Value</th></tr>",
            fm_rows or "<tr><td class='muted' colspan='2'>none</td></tr>",
            "</table></div>",
            "</div>",
        ]
    )
    subgraph_html = _render_entity_subgraph(slug, entity_type=entity_type)
    quality_html = _render_quality_drilldown(sidecar, embedded_quality_markdown)
    tabs_html = _render_entity_tabs(
        overview_html=overview_html,
        subgraph_html=subgraph_html,
        quality_html=quality_html,
    )
    page_body = f"<h1>{html.escape(display_slug)}</h1>" + quality_summary_html + tabs_html
    return _layout(display_slug, page_body)
def _wiki_index_entries(
    limit_per_type: int | None = _WIKI_INDEX_LIMIT_PER_TYPE,
) -> list[dict]:
    """List wiki entity pages under ~/.claude/skill-wiki/entities/.

    The result of ``_wiki_index_entries_from_dashboard_index`` is returned
    as-is whenever it is not ``None``; otherwise the entity directories are
    scanned on disk.

    The full skill inventory is too large to render as one HTML page, so the
    dashboard samples a bounded number of pages per entity type.

    Args:
        limit_per_type: Maximum number of rows collected per entity type, or
            ``None`` for no cap.

    Returns:
        Rows shaped ``{slug, display_slug, type, tags, search_tags,
        description}`` for the directory scan. ``tags`` holds at most six
        entries while ``search_tags`` keeps all of them. An empty list is
        returned when the entities directory does not exist.
    """
    indexed = _wiki_index_entries_from_dashboard_index(limit_per_type)
    if indexed is not None:
        return indexed
    base = _wiki_dir() / "entities"
    if not base.is_dir():
        return []
    # MCPs are sharded (one dir per first-char) so we glob recursively;
    # all other dashboard entity types are flat.
    sources = _DASHBOARD_ENTITY_SOURCES
    out: list[dict] = []
    for sub, entity_type, recursive in sources:
        d = base / sub
        if not d.is_dir():
            continue
        paths = sorted(
            d.rglob("*.md") if recursive else d.glob("*.md"),
            key=lambda path: (path.stem.lower(), path.relative_to(d).as_posix().lower()),
        )
        seen_for_type = 0
        for path in paths:
            if limit_per_type is not None and seen_for_type >= limit_per_type:
                break
            slug = path.stem
            if not _is_safe_slug(slug):
                continue
            try:
                # Frontmatter sits at the top of the page, so stop after the
                # first 2048 characters instead of loading the whole file.
                with path.open(encoding="utf-8", errors="replace") as handle:
                    head = handle.read(2048)
            except OSError:
                continue
            meta, _ = _parse_frontmatter(head)
            all_tags = _frontmatter_tags(meta.get("tags", ""), limit=None)
            description, _truncated = _truncate_text(
                _frontmatter_text(meta.get("description", "")),
                200,
            )
            out.append({
                "slug": slug,
                "display_slug": _display_slug(slug),
                "type": entity_type,
                "tags": all_tags[:6],
                "search_tags": all_tags,
                "description": description,
            })
            seen_for_type += 1
    return out
def _wiki_index_entries_from_dashboard_index(
    limit_per_type: int | None,
) -> list[dict] | None:
    """Build wiki index rows from the dashboard graph SQLite index.

    Args:
        limit_per_type: Maximum number of rows selected per entity type
            (negative values are clamped to 0), or ``None`` for no cap.

    Returns:
        Rows shaped ``{slug, display_slug, type, tags, search_tags,
        description, grade}``, or ``None`` when the index file is missing,
        does not match the manifest, or cannot be read.
    """
    index_path = _dashboard_graph_index_path()
    if not index_path.is_file() or not _dashboard_index_matches_manifest(index_path):
        return None
    out: list[dict] = []
    try:
        # mode=ro opens the index read-only.
        conn = sqlite3.connect(f"file:{index_path.as_posix()}?mode=ro", uri=True)
        try:
            for _sub, entity_type, _recursive in _DASHBOARD_ENTITY_SOURCES:
                params: list[Any] = [entity_type]
                limit_sql = ""
                if limit_per_type is not None:
                    limit_sql = " LIMIT ?"
                    params.append(max(0, int(limit_per_type)))
                rows = conn.execute(
                    "SELECT id,label,type,tags,description,quality_score FROM nodes "
                    "WHERE type=? ORDER BY lower(label), id" + limit_sql,
                    params,
                )
                for (
                    node_id,
                    label,
                    row_type,
                    tags_raw,
                    description_raw,
                    quality_score,
                ) in rows:
                    node_id_text = str(node_id)
                    # Ids containing ":" carry the slug after the first colon;
                    # otherwise the label is used as the slug.
                    slug = (
                        node_id_text.split(":", 1)[1]
                        if ":" in node_id_text
                        else str(label)
                    )
                    if not _is_safe_slug(slug):
                        continue
                    try:
                        parsed_tags = json.loads(str(tags_raw or "[]"))
                    except json.JSONDecodeError:
                        parsed_tags = []
                    # Valid JSON is not necessarily a list: a JSON string
                    # would be iterated character by character and a number
                    # would raise TypeError, discarding the entire index.
                    if not isinstance(parsed_tags, list):
                        parsed_tags = []
                    all_tags = [
                        str(tag) for tag in parsed_tags
                        if isinstance(tag, str)
                    ]
                    description, _truncated = _truncate_text(
                        _frontmatter_text(description_raw),
                        200,
                    )
                    out.append({
                        "slug": slug,
                        "display_slug": _display_slug(str(label or slug)),
                        "type": str(row_type or entity_type),
                        "tags": all_tags[:6],
                        "search_tags": all_tags,
                        "description": description,
                        "grade": _grade_from_quality_score(quality_score),
                    })
        finally:
            conn.close()
    except (OSError, sqlite3.Error, ValueError, TypeError):
        return None
    return out
def _grade_from_quality_score(value: Any) -> str:
    """Translate a numeric quality score into a letter grade.

    Args:
        value: Anything ``float()`` accepts.

    Returns:
        ``"A"`` for scores >= 0.80, ``"B"`` for >= 0.60, ``"C"`` for >= 0.40,
        ``"D"`` for >= 0.0, and ``""`` for negative, NaN, or non-numeric input.
    """
    try:
        numeric = float(value)
    except (TypeError, ValueError):
        return ""
    # Cutoffs are ordered from highest to lowest; the first one reached wins.
    for cutoff, letter in ((0.80, "A"), (0.60, "B"), (0.40, "C"), (0.0, "D")):
        if numeric >= cutoff:
            return letter
    return ""
def _wiki_render_cache_key(
    selected_type: str | None,
    query: str,
) -> tuple[Any, ...] | None:
    """Compute the cache key for a rendered wiki index page.

    The key combines the requested type and query with the graph index path,
    its mtime and size, the manifest export id, this source file's mtime and
    size, and a SHA-256 of ``monitor.css``.

    Returns:
        The key tuple, or ``None`` when the graph index is missing, does not
        match the manifest, or either file cannot be stat'ed.
    """
    graph_index = _dashboard_graph_index_path()
    if not graph_index.is_file():
        return None
    if not _dashboard_index_matches_manifest(graph_index):
        return None
    try:
        index_info = graph_index.stat()
        module_info = Path(__file__).stat()
    except OSError:
        return None
    try:
        stylesheet = _monitor_asset_text("monitor.css")
        css_digest = hashlib.sha256(stylesheet.encode("utf-8")).hexdigest()
    except Exception:
        # A stylesheet that cannot be loaded just contributes an empty digest.
        css_digest = ""
    type_part = selected_type or ""
    resolved_index = str(graph_index.resolve())
    export_id = _dashboard_graph_manifest_export_id() or ""
    return (
        "wiki-index-v1",
        type_part,
        query,
        resolved_index,
        index_info.st_mtime_ns,
        index_info.st_size,
        export_id,
        module_info.st_mtime_ns,
        module_info.st_size,
        css_digest,
    )
def _disk_cache_token(cache_key: tuple[Any, ...]) -> str:
    """Serialize a cache key tuple into a compact JSON string token."""
    compact_separators = (",", ":")
    token = json.dumps(cache_key, sort_keys=True, separators=compact_separators)
    return token
def _read_disk_cache_payload(path: Path, cache_token: str) -> dict[str, Any] | None:
try:
data = json.loads(path.read_text(encoding="utf-8"))
except (OSError, json.JSONDecodeError):
return None
if not isinstance(data, dict):
return None
if data.get("schema_version") != 1 or data.get("cache_token") != cache_token:
return None
return data
def _write_disk_cache_payload(
    path: Path,
    cache_token: str,
    payload: dict[str, Any],
    *,
    sort_keys: bool = False,
) -> None:
    """Write a cache envelope to disk; failures are ignored.

    The envelope is ``payload`` preceded by ``schema_version`` and
    ``cache_token`` entries, serialized as one JSON line and handed to
    ``_atomic_write_text``.

    Args:
        path: Destination cache file.
        cache_token: Token stored with the payload.
        payload: Extra keys merged into the envelope.
        sort_keys: Forwarded to ``json.dumps``.
    """
    header = {"schema_version": 1, "cache_token": cache_token}
    try:
        envelope = {**header, **payload}
        serialized = json.dumps(envelope, ensure_ascii=False, sort_keys=sort_keys)
        _atomic_write_text(path, serialized + "\n", encoding="utf-8")
    except (OSError, TypeError, ValueError):
        # Write and serialization errors are swallowed here on purpose.
        return
def _read_html_disk_cache(path: Path, cache_token: str) -> str | None:
    """Return the cached ``html`` string for ``cache_token``, or ``None``."""
    envelope = _read_disk_cache_payload(path, cache_token)
    if envelope is not None:
        cached_html = envelope.get("html")
        if isinstance(cached_html, str):
            return cached_html
    return None
def _write_html_disk_cache(path: Path, cache_token: str, html_text: str) -> None:
    """Store rendered HTML under the ``html`` key of a disk cache envelope."""
    html_payload = {"html": html_text}
    _write_disk_cache_payload(path, cache_token, html_payload)
def _wiki_render_disk_cache_path() -> Path:
    """Return the wiki index HTML cache file path inside ``_claude_dir()``."""
    cache_filename = ".ctx-monitor-wiki-cache.json"
    return _claude_dir() / cache_filename
def _render_wiki_index(entity_type: str | None = None, query: str = "") -> str:
    """Card grid of wiki entities -- search + type filter + sidecar grades.

    When ``_wiki_render_cache_key`` yields a key, the page is served from the
    in-process cache or the on-disk cache if either holds that key, and a
    freshly rendered page is stored in both. Without a key nothing is cached.

    Args:
        entity_type: Optional entity type to pre-select in the type filter.
            A value that ``_normalize_dashboard_entity_type`` rejects renders
            an error page instead.
        query: Initial text for the search box; surrounding whitespace is
            stripped.

    Returns:
        The full page HTML produced by ``_layout``.
    """
    selected_type = _normalize_dashboard_entity_type(entity_type) if entity_type else None
    if entity_type is not None and selected_type is None:
        return _layout(
            "Wiki",
            f"<div class='error'>Unsupported entity type: {html.escape(entity_type)}</div>",
        )
    initial_query = query.strip()
    cache_key = _wiki_render_cache_key(selected_type, initial_query)
    global _WIKI_RENDER_CACHE_KEY, _WIKI_RENDER_CACHE_VALUE
    if cache_key is not None:
        if _WIKI_RENDER_CACHE_KEY == cache_key and _WIKI_RENDER_CACHE_VALUE is not None:
            return _WIKI_RENDER_CACHE_VALUE
        cache_token = _disk_cache_token(cache_key)
        cached = _read_html_disk_cache(_wiki_render_disk_cache_path(), cache_token)
        if cached is not None:
            _WIKI_RENDER_CACHE_KEY = cache_key
            _WIKI_RENDER_CACHE_VALUE = cached
            return cached
    else:
        cache_token = ""
    entries = _wiki_index_entries()
    wstats = _wiki_stats()
    total_available = int(wstats.get("total") or len(entries))
    # Join with grade pills where a sidecar exists.
    grade_by_key: dict[tuple[str, str], str] = {}
    for entry in entries:
        slug = str(entry["slug"])
        row_type = str(entry["type"])
        grade = str(entry.get("grade") or "")
        if grade:
            # The entry already carries a grade, so no sidecar lookup is needed.
            grade_by_key[(slug, row_type)] = grade
            continue
        sidecar = _load_sidecar(slug, entity_type=row_type)
        if sidecar:
            grade_by_key[(slug, row_type)] = str(sidecar.get("grade") or "")
    type_counts = {
        "skill": int(wstats.get("skills") or 0),
        "agent": int(wstats.get("agents") or 0),
        "mcp-server": int(wstats.get("mcps") or 0),
        "harness": int(wstats.get("harnesses") or 0),
    }
    suggestions = "".join(
        f"<option value='{html.escape(e['slug'])}' "
        f"label='{html.escape(e.get('display_slug') or e['slug'])}'>"
        for e in entries[:1000]
    )
    cards = "".join(
        "<a class='wiki-card' "
        f"data-slug='{html.escape(e['slug'])}' "
        f"data-display-slug='{html.escape(e.get('display_slug') or e['slug'])}' "
        f"data-type='{html.escape(e['type'])}' "
        f"data-tags='{html.escape(' '.join(e.get('search_tags', e['tags'])).lower())}' "
        f"href='/wiki/{html.escape(e['slug'])}?type={html.escape(e['type'])}' "
        "style='border:1px solid #e5e7eb; border-radius:6px; "
        "padding:0.6rem 0.8rem; text-decoration:none; color:inherit; "
        "display:flex; flex-direction:column; gap:0.25rem;'>"
        "<div style='display:flex; justify-content:space-between; align-items:center; gap:0.4rem;'>"
        f"<code style='font-size:0.84rem;'>{html.escape(e.get('display_slug') or e['slug'])}</code>"
        + (f"<span class='pill grade-{html.escape(grade_by_key[(e['slug'], e['type'])])}'>"
           f"{html.escape(grade_by_key[(e['slug'], e['type'])])}</span>"
           if grade_by_key.get((e['slug'], e['type'])) else
           f"<span class='pill'>{html.escape(e['type'])}</span>")
        + "</div>"
        f"<div class='muted' style='font-size:0.78rem; line-height:1.3;'>"
        f"{html.escape(e['description'] or '(no description)')}"
        "</div>"
        + (f"<div class='muted' style='font-size:0.72rem;'>"
           f"{' &middot; '.join(html.escape(t) for t in e['tags'][:5])}</div>"
           if e["tags"] else "")
        + "</a>"
        for e in entries
    )
    type_checkboxes = "".join(
        f"<label style='display:flex; justify-content:space-between; padding:0.25rem 0;'>"
        f"<span><input type='checkbox' class='wiki-type-filter' value='{t}' "
        f"{'checked' if selected_type is None or selected_type == t else ''}> {t}</span>"
        f"<span class='muted' style='font-size:0.78rem;'>{type_counts.get(t, 0):,}</span>"
        f"</label>"
        for t in _DASHBOARD_ENTITY_TYPES
    )
    badge_links = "".join(
        f"<a class='pill entity-type-{html.escape(t)}' href='/wiki?type={quote(t)}'>"
        f"{html.escape(t)}</a>"
        for t in _DASHBOARD_ENTITY_TYPES
    )
    # json.dumps leaves "<", ">" and "&" untouched, so a query containing
    # "</script>" would end the inline script block early and let the rest of
    # the query be parsed as HTML. Escaping them as \uXXXX keeps the literal
    # inside the script while the browser still decodes the same string.
    query_literal = (
        json.dumps(initial_query)
        .replace("<", "\\u003c")
        .replace(">", "\\u003e")
        .replace("&", "\\u0026")
    )
    body = (
        "<h1>Wiki</h1>"
        f"<p class='muted'>{len(entries):,} shown of {total_available:,} entity pages under "
        f"<code>~/.claude/skill-wiki/entities/</code> &middot; "
        "search by slug / description / tag, pick a suggestion, "
        "or click a tile to read the full page.</p>"
        "<div class='card' style='display:flex; gap:0.45rem; flex-wrap:wrap; align-items:center;'>"
        f"<strong>Catalog shortcuts</strong>{badge_links}</div>"
        "<div style='display:grid; grid-template-columns:220px 1fr; gap:1.25rem; align-items:start;'>"
        # Left sidebar
        "<aside style='position:sticky; top:1rem;'>"
        "<div class='card'><strong>Search</strong>"
        f"<datalist id='wiki-entity-suggestions'>{suggestions}</datalist>"
        "<input type='text' id='wiki-search' placeholder='slug / tag / text…' "
        "style='width:100%; margin-top:0.4rem; padding:0.35rem 0.5rem; "
        "border:1px solid #ccc; border-radius:4px;'></div>"
        "<div class='card'><strong>Type</strong>" + type_checkboxes + "</div>"
        "<div class='card'><span id='wiki-match-count' class='muted'>&mdash;</span></div>"
        "</aside>"
        # Card grid
        "<div id='wiki-grid' style='display:grid; "
        "grid-template-columns:repeat(auto-fill, minmax(280px, 1fr)); gap:0.6rem;'>"
        + (cards or "<p class='muted'>No wiki entities found. "
           "Extract <code>graph/wiki-graph.tar.gz</code> into "
           "<code>~/.claude/skill-wiki/</code> to populate.</p>")
        + "</div>"
        "</div>"
        "<script>\n"
        "const wcards = document.querySelectorAll('.wiki-card');\n"
        "const wsearch = document.getElementById('wiki-search');\n"
        "wsearch.setAttribute('list', 'wiki-entity-suggestions');\n"
        f"wsearch.value = {query_literal};\n"
        "function wActiveTypes() { return Array.from(document.querySelectorAll('.wiki-type-filter:checked')).map(x => x.value); }\n"
        "function wApply() {\n"
        " const q = wsearch.value.trim().toLowerCase();\n"
        " const types = new Set(wActiveTypes());\n"
        " let shown = 0;\n"
        " wcards.forEach(c => {\n"
        " const hay = (c.dataset.slug + ' ' + c.dataset.displaySlug + ' ' + (c.textContent||'') + ' ' + c.dataset.tags).toLowerCase();\n"
        " const ok = types.has(c.dataset.type) && (!q || hay.includes(q));\n"
        " c.style.display = ok ? '' : 'none';\n"
        " if (ok) shown++;\n"
        " });\n"
        " document.getElementById('wiki-match-count').textContent = shown + ' of ' + wcards.length + ' match';\n"
        "}\n"
        "wsearch.addEventListener('input', wApply);\n"
        "document.querySelectorAll('.wiki-type-filter').forEach(el => el.addEventListener('change', wApply));\n"
        "wApply();\n"
        "</script>"
    )
    html_out = _layout("Wiki", body)
    if cache_key is not None:
        _write_html_disk_cache(_wiki_render_disk_cache_path(), cache_token, html_out)
        _WIKI_RENDER_CACHE_KEY = cache_key
        _WIKI_RENDER_CACHE_VALUE = html_out
    return html_out
def _docs_roots() -> list[Path]:
    """Return docs roots derived from the working directory and this file's grandparent directory."""
    working_dir = Path.cwd()
    source_parent = Path(__file__).resolve().parent.parent
    return dashboard_docs.docs_roots(working_dir, source_parent)
def _docs_render_disk_cache_path() -> Path:
    """Return the docs render cache path that ``dashboard_docs`` derives from ``_claude_dir()``."""
    claude_home = _claude_dir()
    return dashboard_docs.docs_render_disk_cache_path(claude_home)
def _docs_index_entries() -> list[dict[str, Any]]:
    """Return the docs index entries for the current docs roots."""
    roots = _docs_roots()
    return dashboard_docs.docs_index_entries(roots)
def _docs_tabs(entries: list[dict[str, Any]]) -> list[dict[str, Any]]:
    """Return the docs tabs that ``dashboard_docs`` builds for ``entries``."""
    roots = _docs_roots()
    return dashboard_docs.docs_tabs(entries, roots)
def _render_docs_markdown(markdown_text: str, page_anchor: str) -> str:
    """Render docs Markdown, passing the wiki renderer as the fallback renderer."""
    fallback = _render_wiki_markdown
    rendered = dashboard_docs.render_docs_markdown(
        markdown_text,
        page_anchor,
        fallback_renderer=fallback,
    )
    return rendered
def _render_docs() -> str:
    """Render the docs page by delegating to ``dashboard_docs.render_docs``.

    This module supplies its own layout, asset, cache-path and Markdown
    helpers as the keyword arguments.
    """
    render_options = {
        "roots": _docs_roots(),
        "layout": _layout,
        "asset_text": _monitor_asset_text,
        "inline_script": _monitor_inline_script,
        "cache_path": _docs_render_disk_cache_path(),
        "fallback_markdown": _render_wiki_markdown,
        "cache_salt_paths": (Path(__file__),),
        "cache_extra": (id(_docs_index_entries), id(_docs_tabs)),
        "index_entries": _docs_index_entries,
        "tabs_for_entries": _docs_tabs,
        "render_markdown_func": _render_docs_markdown,
    }
    return dashboard_docs.render_docs(**render_options)
def _render_manage(mutations_enabled: bool | None = None) -> str:
    """Render catalog management for wiki entities and graph refresh queueing.

    Args:
        mutations_enabled: Whether the save and delete buttons are enabled.
            ``None`` falls back to the module-level
            ``_MONITOR_MUTATIONS_ENABLED``.

    Returns:
        The full page HTML produced by ``_layout``.
    """
    if mutations_enabled is None:
        mutations_enabled = _MONITOR_MUTATIONS_ENABLED
    # The monitor token is embedded in the page only when mutations are allowed.
    token = _MONITOR_TOKEN if mutations_enabled else ""
    # Results used to seed the page before the user types a query.
    initial_results = _search_wiki_entities(limit=40)
    type_options = "".join(
        f"<option value='{html.escape(entity_type)}'>{html.escape(entity_type)}</option>"
        for entity_type in _DASHBOARD_ENTITY_TYPES
    )
    initial_json = _json_for_script(initial_results)
    # Banner shown only when mutations are disabled.
    read_only = (
        ""
        if mutations_enabled
        else (
            "<div class='card'><strong>Read-only mode.</strong> Catalog edits are "
            "disabled because ctx-monitor is not bound to a loopback address.</div>"
        )
    )
    # HTML attribute appended to the save and delete buttons.
    disabled = "" if mutations_enabled else " disabled"
    body = (
        "<h1>Manage catalog</h1>"
        "<p class='muted'>Search skills, agents, MCP servers, and harnesses. "
        "Edit the wiki page, delete stale entries, or add a new entity. Saves "
        "write into <code>~/.claude/skill-wiki/entities/</code> and queue graph "
        "refresh work for the knowledge graph.</p>"
        + read_only
        +
        "<div class='wizard-layout'>"
        "<section class='card'>"
        "<h2>Search catalog</h2>"
        "<div class='wizard-grid'>"
        "<label>Query"
        "<input id='manage-search' type='search' placeholder='slug, tag, description' "
        "autocomplete='off'></label>"
        "<label>Type"
        f"<select id='manage-type'><option value=''>all types</option>{type_options}</select>"
        "</label>"
        "</div>"
        "<p id='manage-search-status' class='muted'>Loading...</p>"
        "<div id='manage-results' class='manage-results'></div>"
        "</section>"
        "<section class='card'>"
        "<h2>Add or update entity</h2>"
        "<form id='entity-editor-form'>"
        "<div class='wizard-grid'>"
        "<label>Slug <span class='pill grade-A'>Required</span>"
        "<input name='slug' required pattern='[a-z0-9][a-z0-9_.+-]*' "
        "placeholder='custom-reviewer'></label>"
        "<label>Type <span class='pill grade-A'>Required</span>"
        f"<select name='entity_type' required>{type_options}</select></label>"
        "<label>Title <span class='pill grade-A'>Required</span>"
        "<input name='title' required placeholder='Custom Reviewer'></label>"
        "<label>Tags"
        "<input name='tags' placeholder='python, review, policy'></label>"
        "<label class='wide'>Description"
        "<input name='description' placeholder='What this entity does and when to use it'></label>"
        "<label class='wide'>Source URL"
        "<input name='source_url' placeholder='https://github.com/org/repo'></label>"
        "<label class='wide'>Markdown body <span class='pill grade-A'>Required</span>"
        "<textarea name='body' required rows='16' "
        "placeholder='# Custom Reviewer\n\nInstall and usage notes...'></textarea></label>"
        "</div>"
        "<div style='display:flex; gap:0.5rem; flex-wrap:wrap; margin-top:0.8rem;'>"
        f"<button type='submit'{disabled}>Save to wiki + graph queue</button>"
        "<button type='button' id='entity-new-button'>New</button>"
        f"<button type='button' id='entity-delete-button' data-testid='entity-delete-button'{disabled}>Delete selected</button>"
        "</div>"
        "<p id='entity-editor-status' class='muted'></p>"
        "</form>"
        "</section>"
        "</div>"
        # Page configuration exposed to the inline monitor-manage.js script.
        "<script>\n"
        "window.CTX_MONITOR_MANAGE = {\n"
        f" mutationsEnabled: {json.dumps(mutations_enabled)},\n"
        f" token: {json.dumps(token)},\n"
        f" initialResults: {initial_json}\n"
        "};\n"
        "</script>"
        + _monitor_inline_script("monitor-manage.js")
    )
    return _layout("Manage catalog", body)
def _harness_wizard_entries(limit: int = 24) -> list[dict[str, Any]]:
    """Return catalog harness pages for the manual dashboard wizard.

    Pages are visited in case-insensitive slug order until ``limit`` rows
    have been collected; the collected rows are then ordered by descending
    score, with the slug as tie-breaker.

    Args:
        limit: Maximum number of harness pages to collect.

    Returns:
        Rows shaped ``{slug, title, description, tags, score, grade,
        repo_url}``; an empty list when the harness directory is missing.
    """
    harness_dir = _wiki_dir() / "entities" / "harnesses"
    if not harness_dir.is_dir():
        return []
    rows: list[dict[str, Any]] = []
    for path in sorted(harness_dir.glob("*.md"), key=lambda p: p.stem.lower()):
        if len(rows) >= limit:
            break
        slug = path.stem
        if not _is_safe_slug(slug):
            continue
        try:
            # Only the leading 4096 characters are parsed, so stop reading
            # there instead of loading the whole page.
            with path.open(encoding="utf-8", errors="replace") as handle:
                head = handle.read(4096)
        except OSError:
            continue
        meta, _body = _parse_frontmatter(head)
        sidecar = _harness_wizard_sidecar(slug) or {}
        try:
            score = float(sidecar.get("raw_score", sidecar.get("score", 0.0)) or 0.0)
        except (TypeError, ValueError):
            # A sidecar with a non-numeric score must not break the page.
            score = 0.0
        tags = _frontmatter_tags(meta.get("tags", ""), limit=None)
        description, _truncated = _truncate_text(
            _frontmatter_text(meta.get("description", "")),
            260,
        )
        # First non-empty URL field wins.
        repo_url = _frontmatter_text(
            meta.get("repo_url")
            or meta.get("github_url")
            or meta.get("homepage_url")
            or ""
        )
        rows.append({
            "slug": slug,
            "title": _frontmatter_text(meta.get("title") or meta.get("name") or slug),
            "description": description,
            "tags": tags[:12],
            "score": score,
            "grade": str(sidecar.get("grade") or ""),
            "repo_url": repo_url,
        })
    return sorted(rows, key=lambda row: (-float(row["score"]), str(row["slug"])))
def _harness_wizard_sidecar(slug: str) -> dict[str, Any] | None:
    """Look up the harness sidecar for ``slug`` from two candidate files.

    Only ``<slug>.json`` and ``<slug>-harness.json`` in the sidecar directory
    are tried, so the full sidecar directory is never scanned.

    Returns:
        The first readable sidecar whose ``slug`` matches and whose entity
        type is ``"harness"``; ``None`` for an unsafe slug or when neither
        candidate qualifies.
    """
    if not _is_safe_slug(slug):
        return None
    candidates = [
        _sidecar_dir() / filename
        for filename in (f"{slug}.json", f"{slug}-harness.json")
    ]
    for candidate in candidates:
        if candidate.exists():
            payload = _read_sidecar_file(candidate)
            if payload is None:
                continue
            is_harness_match = (
                payload.get("slug") == slug
                and _sidecar_entity_type(payload) == "harness"
            )
            if is_harness_match:
                return payload
    return None
def _render_harness_wizard() -> str:
    """Manual harness interview for users who bring their own LLM.

    Renders the interview form, a command-plan panel, one card per catalog
    harness, and an inline script. The script rebuilds the
    ``ctx-harness-install --recommend`` command from the form values and
    re-scores and re-orders the harness cards on every form input.

    Returns:
        The full page HTML produced by ``_layout``.
    """
    harnesses = _harness_wizard_entries()
    provider_options = (
        "openai", "anthropic", "google", "openrouter", "ollama",
        "lm-studio", "local", "other",
    )
    provider_html = "".join(
        f"<option value='{html.escape(provider)}'>{html.escape(provider)}</option>"
        for provider in provider_options
    )
    # (form value, visible label) pairs for the "Allowed tools" checkboxes.
    tool_options = (
        ("files", "Files"),
        ("git", "Git"),
        ("shell", "Shell"),
        ("browser", "Browser"),
        ("http", "HTTP/network"),
        ("package-manager", "Package manager"),
        ("database", "Database"),
    )
    # files, git and shell start out checked.
    tools_html = "".join(
        "<label style='display:flex; align-items:center; gap:0.35rem;'>"
        f"<input type='checkbox' name='tools' value='{html.escape(value)}' "
        f"{'checked' if value in {'files', 'git', 'shell'} else ''}>"
        f"{html.escape(label)}</label>"
        for value, label in tool_options
    )
    # One card per harness. data-harness-text is the lower-cased slug, title,
    # description and tags that the inline script matches interview terms
    # against; the repo link is emitted only for http(s) URLs.
    harness_cards = "".join(
        "<div class='harness-card' "
        f"data-harness-slug='{html.escape(row['slug'])}' "
        f"data-harness-text='{html.escape(' '.join([row['slug'], row['title'], row['description'], *row['tags']]).lower())}' "
        f"data-harness-score='{float(row['score']):.3f}'>"
        "<div style='display:flex; justify-content:space-between; gap:0.5rem; align-items:start;'>"
        f"<strong>{html.escape(row['title'])}</strong>"
        + (
            f"<span class='pill grade-{html.escape(row['grade'])}'>{html.escape(row['grade'])}</span>"
            if row["grade"]
            else "<span class='pill entity-type-harness'>harness</span>"
        )
        + "</div>"
        f"<p class='muted' style='margin:0;'>{html.escape(row['description'] or 'No description available.')}</p>"
        + (
            "<div class='muted' style='font-size:0.78rem;'>"
            + " ".join(f"<code>{html.escape(tag)}</code>" for tag in row["tags"][:8])
            + "</div>"
            if row["tags"]
            else ""
        )
        + (
            f"<a class='muted' href='{html.escape(row['repo_url'])}'>{html.escape(row['repo_url'])}</a>"
            if row["repo_url"].startswith(("http://", "https://"))
            else ""
        )
        + f"<code>ctx-harness-install {html.escape(row['slug'])} --dry-run</code>"
        + f"<button type='button' class='secondary' data-select-harness='{html.escape(row['slug'])}'>select</button>"
        + "</div>"
        for row in harnesses
    )
    # Empty-state message when no harness cards were produced.
    if not harness_cards:
        harness_cards = (
            "<p class='muted'>No harness pages were found under "
            "<code>~/.claude/skill-wiki/entities/harnesses/</code>. "
            "Use the no-fit PRD output below to build an attachable harness.</p>"
        )
    body = (
        "<div class='setup-header'>"
        "<div><div class='setup-kicker'>Model -> intent -> install -> attach ctx</div>"
        "<h1>Harness Setup</h1>"
        "<p class='muted'>For users running their own API or local model instead of Claude Code. "
        "Interview the model/runtime choice, generate a real ctx harness recommendation command, "
        "then install a harness or produce a no-fit PRD for a custom harness.</p></div>"
        "<span class='pill entity-type-harness'>local/API model path</span>"
        "</div>"
        "<div class='setup-flow'>"
        "<div class='setup-flow-step'><strong>1. Model</strong><span class='muted'>Provider, model slug, endpoint.</span></div>"
        "<div class='setup-flow-step'><strong>2. Intent</strong><span class='muted'>Goal, OS, access, privacy.</span></div>"
        "<div class='setup-flow-step'><strong>3. Install</strong><span class='muted'>Recommend, dry-run, install.</span></div>"
        "<div class='setup-flow-step'><strong>4. Attach ctx</strong><span class='muted'>Graph/wiki recommendations flow into the harness.</span></div>"
        "</div>"
        "<div class='wizard-layout'>"
        "<form id='harness-wizard-form' class='card'>"
        "<div class='wizard-step'><strong>1. Model</strong>"
        "<div class='wizard-grid' style='margin-top:0.65rem;'>"
        "<label>Model provider <span class='pill grade-A'>Required</span>"
        f"<select name='model_provider' required>{provider_html}</select></label>"
        "<label>Model slug <span class='pill grade-A'>Required</span>"
        "<input name='model' required placeholder='openai/gpt-5.5 or ollama/qwen3-coder'></label>"
        "<label class='wide'>API base URL or local endpoint"
        "<input name='endpoint' placeholder='https://api.openai.com/v1 or http://localhost:11434'></label>"
        "</div></div>"
        "<div class='wizard-step'><strong>2. Goal and access</strong>"
        "<div class='wizard-grid' style='margin-top:0.65rem;'>"
        "<label class='wide'>Development goal <span class='pill grade-A'>Required</span>"
        "<textarea name='goal' rows='4' required placeholder='What should the agent build, fix, research, or operate?'></textarea></label>"
        "<label>Runtime / OS"
        "<select name='runtime'><option>windows</option><option>macos</option><option>linux</option>"
        "<option selected>cross-platform</option></select></label>"
        "<label>Autonomy"
        "<select name='autonomy'><option>read-only</option><option selected>repo-write</option>"
        "<option>deploy-capable</option></select></label>"
        "<label class='wide'>Allowed tools"
        f"<div style='display:grid; grid-template-columns:repeat(auto-fit,minmax(140px,1fr)); gap:0.25rem;'>{tools_html}</div></label>"
        "<label>Verification gates"
        "<input name='verify' placeholder='pytest, ruff, mypy, build, smoke'></label>"
        "<label>Privacy / network"
        "<select name='privacy'><option selected>local repo only</option><option>network allowed</option>"
        "<option>secrets allowed by env only</option><option>offline only</option></select></label>"
        "<label>ctx attachment"
        "<select name='attach_mode'><option selected>mcp</option><option>python</option><option>cli</option></select></label>"
        "</div></div>"
        "<div class='wizard-step'><strong>3. Recommend and install</strong>"
        "<p class='muted'>The dashboard previews catalog matches. The command below calls the real harness recommender and keeps the no-fit path available.</p>"
        "<button type='submit'>build recommendation command</button> "
        "<button type='button' id='harness-reset' class='secondary'>reset</button>"
        "</div>"
        "</form>"
        "<aside class='card'>"
        "<h2 style='margin-top:0;'>Command plan</h2>"
        "<pre class='command-box' data-testid='harness-command-output'>ctx-harness-install --recommend --goal \"...\" --model-provider openai --model openai/gpt-5.5 --top-k 5 --plan-on-no-fit</pre>"
        "<p class='muted'>Run the dry-run first. The installer writes attach files under the harness target so the selected harness can connect to ctx graph/wiki recommendations.</p>"
        "<div id='selected-harness-command' class='muted'>Select a harness card to see install, update, and validation commands.</div>"
        "</aside>"
        "</div>"
        "<section class='card'>"
        "<div style='display:flex; justify-content:space-between; gap:0.75rem; align-items:center; flex-wrap:wrap;'>"
        "<div><h2 style='margin:0;'>Catalog harnesses</h2>"
        "<p class='muted' style='margin:0.2rem 0 0;'>Cards are filtered by the interview text. If none fit, use the no-fit PRD path.</p></div>"
        "<span id='harness-match-count' class='pill entity-type-harness'>0 matches</span>"
        "</div>"
        "<div id='harness-cards' style='display:grid; grid-template-columns:repeat(auto-fill,minmax(280px,1fr)); gap:0.7rem; margin-top:0.8rem;'>"
        + harness_cards
        + "</div>"
        "</section>"
        "<section class='card'>"
        "<h2>No-fit custom harness PRD</h2>"
        "<p class='muted'>When no catalog harness clears the configured match score, generate a PRD for the user's strong model or engineering team. It must include orchestration, durable state, permissions, verification gates, and ctx recommendation hooks.</p>"
        "<pre class='command-box' id='no-fit-command'>ctx-harness-install --recommend --goal \"...\" --model-provider openai --model openai/gpt-5.5 --plan-on-no-fit --plan-output custom-harness-prd.md</pre>"
        "</section>"
        # Inline script: recommendCommand() assembles the CLI command from
        # the form, fitCards() scores each card as its base score plus 0.08
        # per matching interview term and marks cards below 0.12 as hidden
        # once any term exists, and refresh() runs both.
        "<script>\n"
        "(function () {\n"
        " const form = document.getElementById('harness-wizard-form');\n"
        " const output = document.querySelector('[data-testid=\"harness-command-output\"]');\n"
        " const noFit = document.getElementById('no-fit-command');\n"
        " const selected = document.getElementById('selected-harness-command');\n"
        " const count = document.getElementById('harness-match-count');\n"
        " const cards = Array.from(document.querySelectorAll('.harness-card'));\n"
        " function value(name) { const el = form.elements[name]; return el ? String(el.value || '').trim() : ''; }\n"
        " function shellQuote(value) { return '\"' + String(value || '').replace(/\\\\/g, '\\\\\\\\').replace(/\"/g, '\\\\\"') + '\"'; }\n"
        " function checkedTools() { return Array.from(form.querySelectorAll('input[name=\"tools\"]:checked')).map(el => el.value).join(','); }\n"
        " function arg(flag, val) { return val ? ' ' + flag + ' ' + shellQuote(val) : ''; }\n"
        " function recommendCommand() {\n"
        " const tools = checkedTools();\n"
        " let cmd = 'ctx-harness-install --recommend';\n"
        " cmd += arg('--goal', value('goal'));\n"
        " cmd += arg('--model-provider', value('model_provider'));\n"
        " cmd += arg('--model', value('model'));\n"
        " cmd += arg('--harness-runtime', value('runtime'));\n"
        " cmd += arg('--harness-autonomy', value('autonomy'));\n"
        " cmd += arg('--harness-tools', tools);\n"
        " cmd += arg('--harness-verify', value('verify'));\n"
        " cmd += arg('--harness-privacy', value('privacy'));\n"
        " cmd += arg('--harness-attach-mode', value('attach_mode'));\n"
        " return cmd + ' --top-k 5 --plan-on-no-fit';\n"
        " }\n"
        " function fitCards() {\n"
        " const intent = [value('goal'), value('model_provider'), value('model'), value('runtime'), value('autonomy'), checkedTools(), value('verify'), value('privacy'), value('attach_mode')].join(' ').toLowerCase();\n"
        " const terms = intent.split(/[^a-z0-9_.-]+/).filter(Boolean);\n"
        " const host = document.getElementById('harness-cards');\n"
        " let visible = 0;\n"
        " cards.forEach(card => {\n"
        " const text = card.dataset.harnessText || '';\n"
        " const base = Number(card.dataset.harnessScore || 0);\n"
        " const hits = terms.filter(term => text.includes(term)).length;\n"
        " const fit = base + hits * 0.08;\n"
        " card.dataset.fit = fit.toFixed(3);\n"
        " const hide = terms.length > 0 && fit < 0.12;\n"
        " card.dataset.fitHidden = hide ? 'true' : 'false';\n"
        " if (!hide) visible++;\n"
        " });\n"
        " cards.sort((a, b) => Number(b.dataset.fit || 0) - Number(a.dataset.fit || 0)).forEach(card => host.appendChild(card));\n"
        " count.textContent = visible + ' matches';\n"
        " }\n"
        " function refresh() {\n"
        " const cmd = recommendCommand();\n"
        " output.textContent = cmd;\n"
        " noFit.textContent = cmd + ' --plan-output custom-harness-prd.md';\n"
        " fitCards();\n"
        " }\n"
        " form.addEventListener('submit', ev => { ev.preventDefault(); refresh(); });\n"
        " form.addEventListener('input', refresh);\n"
        " document.getElementById('harness-reset').addEventListener('click', () => { form.reset(); refresh(); });\n"
        " document.querySelectorAll('[data-select-harness]').forEach(btn => btn.addEventListener('click', () => {\n"
        " const slug = btn.dataset.selectHarness || '';\n"
        " cards.forEach(card => card.classList.toggle('selected', card.dataset.harnessSlug === slug));\n"
        " selected.innerHTML = '<pre class=\"command-box\">ctx-harness-install ' + slug + ' --dry-run\\nctx-harness-install ' + slug + '\\nctx-harness-install ' + slug + ' --update --dry-run\\nctx-scan-repo --repo . --recommend\\nctx-monitor serve</pre>';\n"
        " }));\n"
        " refresh();\n"
        "})();\n"
        "</script>"
    )
    return _layout("Harness Setup", body)
def _kpi_summary_cache_key(sidecar_dir: Path) -> tuple[Any, ...]:
    """Fingerprint the sidecar JSON files that feed the KPI summary.

    ``sidecar_dir`` and its ``mcp`` subdirectory are each scanned without
    recursion. Regular, non-hidden ``*.json`` files are split into
    ``lifecycle`` (``*.lifecycle.json``) and ``quality`` (everything else).

    Returns:
        A tuple with one ``(root, bucket, file_count, total_size,
        newest_mtime_ns, mtime_ns_sum)`` row per root and bucket, where the
        mtime sum is wrapped to 63 bits. Unreadable roots yield zero rows.
    """
    wrap_mask = (1 << 63) - 1
    key_rows: list[tuple[str, str, int, int, int, int]] = []
    for scan_root in (sidecar_dir, sidecar_dir / "mcp"):
        try:
            root_label = str(scan_root.resolve())
        except OSError:
            root_label = str(scan_root)
        # Per bucket: [file count, total bytes, newest mtime, wrapped mtime sum]
        tallies = {
            "quality": [0, 0, 0, 0],
            "lifecycle": [0, 0, 0, 0],
        }
        try:
            with os.scandir(scan_root) as listing:
                for item in listing:
                    filename = item.name
                    if filename.startswith(".") or not filename.endswith(".json"):
                        continue
                    if not item.is_file(follow_symlinks=False):
                        continue
                    kind = "quality"
                    if filename.endswith(".lifecycle.json"):
                        kind = "lifecycle"
                    try:
                        info = item.stat(follow_symlinks=False)
                    except OSError:
                        continue
                    modified_ns = int(info.st_mtime_ns)
                    tally = tallies[kind]
                    tally[0] += 1
                    tally[1] += int(info.st_size)
                    tally[2] = max(tally[2], modified_ns)
                    tally[3] = (tally[3] + modified_ns) & wrap_mask
        except OSError:
            # A missing or unreadable root keeps whatever was tallied so far.
            pass
        key_rows.extend(
            (root_label, kind, tally[0], tally[1], tally[2], tally[3])
            for kind, tally in tallies.items()
        )
    return tuple(key_rows)
def _kpi_summary_disk_cache_path(sidecar_dir: Path) -> Path:
    """Return the KPI summary cache file path inside ``sidecar_dir``."""
    cache_filename = ".dashboard-kpi-summary.json"
    return sidecar_dir / cache_filename
def _dashboard_summary_from_dict(summary_cls: Any, data: Any) -> Any | None:
if not isinstance(data, dict):
return None
def dict_field(name: str) -> dict[str, Any]:
value = data.get(name)
return dict(value) if isinstance(value, dict) else {}
def list_field(name: str) -> list[dict[str, Any]]:
value = data.get(name)
if not isinstance(value, list):
return []
return [dict(item) for item in value if isinstance(item, dict)]
try:
return summary_cls(
generated_at=str(data.get("generated_at") or ""),
total=int(data.get("total") or 0),
by_subject=dict_field("by_subject"),
grade_counts=dict_field("grade_counts"),
lifecycle_counts=dict_field("lifecycle_counts"),
category_breakdown=list_field("category_breakdown"),
hard_floor_counts=dict_field("hard_floor_counts"),
low_quality_candidates=list_field("low_quality_candidates"),
archived=list_field("archived"),
)
except (TypeError, ValueError):
return None
def _read_kpi_summary_disk_cache(
    sidecar_dir: Path,
    cache_token: str,
    summary_cls: Any,
) -> Any | None:
    """Load the cached KPI summary for ``cache_token``, or ``None`` if absent or stale."""
    cache_file = _kpi_summary_disk_cache_path(sidecar_dir)
    envelope = _read_disk_cache_payload(cache_file, cache_token)
    if envelope is not None:
        return _dashboard_summary_from_dict(summary_cls, envelope.get("summary"))
    return None
def _write_kpi_summary_disk_cache(
    sidecar_dir: Path,
    cache_token: str,
    summary: Any,
) -> None:
    """Store ``summary.to_dict()`` under the ``summary`` key of the KPI disk cache."""
    cache_file = _kpi_summary_disk_cache_path(sidecar_dir)
    summary_payload = {"summary": summary.to_dict()}
    _write_disk_cache_payload(
        cache_file,
        cache_token,
        summary_payload,
        sort_keys=True,
    )
def _kpi_summary():
    """Compute the KPI DashboardSummary using the default source layout.

    Returns ``None`` if the kpi_dashboard module can't be imported, the
    sidecar directory doesn't exist, or summary generation raises -- the
    caller renders an explanatory empty state instead of failing.

    Lookup order: the in-process cache (same cache key and younger than
    ``_KPI_SUMMARY_CACHE_SECONDS``), then the on-disk cache for the current
    cache token, then a fresh ``generate`` run whose result is stored in both.
    """
    try:
        from kpi_dashboard import DashboardSummary  # type: ignore
        from kpi_dashboard import generate  # type: ignore
        from ctx_lifecycle import LifecycleSources  # type: ignore
    except Exception:  # noqa: BLE001 -- KPIs are advisory
        return None
    sidecar_dir = _sidecar_dir()
    if not sidecar_dir.is_dir():
        return None
    cache_key = _kpi_summary_cache_key(sidecar_dir)
    global _KPI_SUMMARY_CACHE_AT, _KPI_SUMMARY_CACHE_KEY, _KPI_SUMMARY_CACHE_VALUE
    # In-process hit: same key and the cached value has not yet expired.
    if (
        _KPI_SUMMARY_CACHE_KEY == cache_key
        and _KPI_SUMMARY_CACHE_VALUE is not None
        and time.monotonic() - _KPI_SUMMARY_CACHE_AT < _KPI_SUMMARY_CACHE_SECONDS
    ):
        return _KPI_SUMMARY_CACHE_VALUE
    cache_token = _disk_cache_token(cache_key)
    summary = _read_kpi_summary_disk_cache(sidecar_dir, cache_token, DashboardSummary)
    if summary is not None:
        # Disk hit: promote it into the in-process cache.
        _KPI_SUMMARY_CACHE_KEY = cache_key
        _KPI_SUMMARY_CACHE_VALUE = summary
        _KPI_SUMMARY_CACHE_AT = time.monotonic()
        return summary
    try:
        from ctx_config import cfg  # type: ignore
        sources = LifecycleSources(
            skills_dir=cfg.skills_dir,
            agents_dir=cfg.agents_dir,
            sidecar_dir=sidecar_dir,
        )
    except Exception:  # noqa: BLE001 -- fallback: sidecar-only
        sources = LifecycleSources(
            skills_dir=sidecar_dir,
            agents_dir=sidecar_dir,
            sidecar_dir=sidecar_dir,
        )
    try:
        summary = generate(sources=sources, top_n=25)
    except Exception:  # noqa: BLE001
        return None
    # Persist the fresh summary to disk, then to the in-process cache.
    _write_kpi_summary_disk_cache(sidecar_dir, cache_token, summary)
    _KPI_SUMMARY_CACHE_KEY = cache_key
    _KPI_SUMMARY_CACHE_VALUE = summary
    _KPI_SUMMARY_CACHE_AT = time.monotonic()
    return summary
def _render_kpi() -> str:
    """Render the KPI dashboard page: grades, lifecycle tiers, categories,
    hard floors, top demotion candidates and archived entities.

    Mirrors the structure of ``kpi_dashboard.render_markdown`` so the
    commit-friendly Markdown digest and the browser view show the
    same numbers.

    Returns:
        The HTML page produced by ``_layout``. An explanatory empty state is
        rendered when no summary is available or it reports zero entities.
    """
    # Typographic characters are spelled as escapes so the rendered text does
    # not depend on how this source file is decoded.
    dash = "\u2014"
    middot = "\u00b7"
    arrow = "\u2192"
    grades = ("A", "B", "C", "D", "F")

    summary = _kpi_summary()
    if summary is None or summary.total == 0:
        empty = (
            "<h1>KPIs</h1>"
            "<div class='card'><strong>No KPI data yet.</strong>"
            "<p class='muted' style='margin-top:0.4rem;'>"
            "The KPI dashboard reads from "
            "<code>~/.claude/skill-quality/*.json</code> and "
            "<code>*.lifecycle.json</code>. Run "
            "<code>ctx-skill-quality recompute --all</code> to populate "
            "sidecars, then reload this page.</p>"
            "<p class='muted'>CLI equivalent: "
            "<code>python -m kpi_dashboard render</code></p></div>"
        )
        return _layout("KPIs", empty)

    total = summary.total

    # Grade distribution pills + detail table
    grade_pills = "".join(
        f"<span class='pill grade-{g}'>{g}: {summary.grade_counts.get(g, 0)}</span> "
        for g in grades
    )

    def pct(n: int) -> str:
        return f"{(100.0 * n / total):.1f}%" if total else dash

    grade_rows = "".join(
        f"<tr><td><span class='pill grade-{g}'>{g}</span></td>"
        f"<td>{summary.grade_counts.get(g, 0)}</td>"
        f"<td class='muted'>{pct(summary.grade_counts.get(g, 0))}</td></tr>"
        for g in grades
    )

    lifecycle_rows = "".join(
        f"<tr><td><code>{html.escape(state)}</code></td>"
        f"<td>{summary.lifecycle_counts.get(state, 0)}</td></tr>"
        for state in ("active", "watch", "demote", "archive")
    )

    # Most frequent hard floor first; ties broken alphabetically.
    floor_rows = "".join(
        f"<tr><td><code>{html.escape(reason)}</code></td><td>{count}</td></tr>"
        for reason, count in sorted(
            summary.hard_floor_counts.items(), key=lambda kv: (-kv[1], kv[0]),
        )
    ) or "<tr><td colspan='2' class='muted'>No hard floors active.</td></tr>"

    def grade_mix_cells(mix: dict) -> str:
        # One pill cell per grade, in A..F order.
        return "".join(
            f"<td><span class='pill grade-{g}'>{mix.get(g, 0)}</span></td>"
            for g in grades
        )

    category_rows = "".join(
        "<tr>"
        f"<td>{html.escape(c['category'])}</td>"
        f"<td>{c['count']}</td>"
        f"<td class='muted'>{c['avg_score']:.3f}</td>"
        f"{grade_mix_cells(c['grade_mix'])}"
        "</tr>"
        for c in summary.category_breakdown
    ) or "<tr><td colspan='8' class='muted'>No categorized entities.</td></tr>"

    def detail_href(slug: str, entity_type: str) -> str:
        # Link to the sidecar detail page, carrying the type when recognised.
        normalized = _normalize_dashboard_entity_type(entity_type)
        suffix = f"?type={html.escape(normalized)}" if normalized else ""
        return f"/skill/{html.escape(slug)}{suffix}"

    demotion_rows = "".join(
        "<tr>"
        f"<td><a href='{detail_href(c['slug'], c['subject_type'])}'><code>{html.escape(c['slug'])}</code></a></td>"
        f"<td class='muted'>{html.escape(c['subject_type'])}</td>"
        f"<td class='muted'>{html.escape(c['category'])}</td>"
        f"<td><span class='pill grade-{html.escape(c['grade'])}'>{html.escape(c['grade'])}</span></td>"
        f"<td class='muted'>{c['score']:.3f}</td>"
        f"<td class='muted'>{html.escape(c['lifecycle_state'])}</td>"
        f"<td>{c['consecutive_d_count']}</td>"
        f"<td class='muted'>{html.escape(c.get('hard_floor') or dash)}</td>"
        "</tr>"
        for c in summary.low_quality_candidates
    ) or (
        "<tr><td colspan='8' class='muted'>No active D/F grade entries "
        f"{dash} corpus is healthy.</td></tr>"
    )

    archived_rows = "".join(
        "<tr>"
        f"<td><a href='{detail_href(a['slug'], a['subject_type'])}'><code>{html.escape(a['slug'])}</code></a></td>"
        f"<td class='muted'>{html.escape(a['subject_type'])}</td>"
        f"<td class='muted'>{html.escape(a['category'])}</td>"
        f"<td class='muted'>{html.escape(a.get('last_grade') or dash)}</td>"
        f"<td class='muted'>{html.escape(a.get('computed_at') or dash)}</td>"
        "</tr>"
        for a in summary.archived
    ) or "<tr><td colspan='5' class='muted'>None.</td></tr>"

    by_subject = summary.by_subject
    subject_blurb = f" {middot} ".join(
        f"{html.escape(s)}: {n}" for s, n in sorted(by_subject.items())
    ) or dash

    body = (
        "<h1>KPIs</h1>"
        "<p class='muted'>Aggregated from "
        "<code>~/.claude/skill-quality/*.json</code> (quality sidecars) "
        "and <code>*.lifecycle.json</code> (tier sidecars). "
        f"Generated {html.escape(summary.generated_at)}.</p>"
        "<div class='card'>"
        f"<strong>Total entities:</strong> {total} "
        f"<span class='muted'>{middot} {subject_blurb}</span>"
        f"<div style='margin-top:0.5rem;'>{grade_pills}</div>"
        "<div style='margin-top:0.4rem;'>"
        f"<a href='/api/kpi.json'>JSON</a> {middot} "
        f"<a href='/skills'>skill cards {arrow}</a></div>"
        "</div>"
        "<div style='display:grid; grid-template-columns:1fr 1fr; gap:1rem;'>"
        "<div class='card'><strong>Grade distribution</strong>"
        "<table><tr><th>Grade</th><th>Count</th><th>Share</th></tr>"
        + grade_rows + "</table></div>"
        "<div class='card'><strong>Lifecycle tiers</strong>"
        "<table><tr><th>State</th><th>Count</th></tr>"
        + lifecycle_rows + "</table></div>"
        "</div>"
        "<div class='card'><strong>Hard floors active</strong>"
        "<table><tr><th>Reason</th><th>Count</th></tr>"
        + floor_rows + "</table></div>"
        "<div class='card'><strong>By category</strong>"
        "<table><tr><th>Category</th><th>Count</th><th>Avg score</th>"
        "<th>A</th><th>B</th><th>C</th><th>D</th><th>F</th></tr>"
        + category_rows + "</table></div>"
        "<div class='card'><strong>Top demotion candidates</strong> "
        f"<span class='muted'>(active or watch {middot} grade D/F {middot} sorted by D-streak desc, score asc)</span>"
        "<table><tr><th>Slug</th><th>Type</th><th>Category</th><th>Grade</th>"
        "<th>Score</th><th>State</th><th>D-streak</th><th>Hard floor</th></tr>"
        + demotion_rows + "</table></div>"
        "<div class='card'><strong>Archived</strong>"
        "<table><tr><th>Slug</th><th>Type</th><th>Category</th>"
        "<th>Last grade</th><th>Computed at</th></tr>"
        + archived_rows + "</table></div>"
    )
    return _layout("KPIs", body)
def _render_status() -> str:
    """Render queue and graph/wiki artifact state for operator checks.

    Reads ``_status_payload()`` and lays out four cards: queue counters,
    recent queue jobs, artifact files and artifact promotions.

    Returns:
        The HTML page produced by ``_layout``.
    """
    status = _status_payload()
    queue = status["queue"]
    artifacts = status["artifacts"]
    counts = queue.get("counts", {})

    def text(value: Any, limit: int | None = None) -> str:
        # Truncate BEFORE escaping: slicing escaped text can cut an entity
        # such as ``&amp;`` in half and emit broken markup.
        raw = str(value or "")
        if limit is not None:
            raw = raw[:limit]
        return html.escape(raw)

    count_pills = " ".join(
        f"<span class='pill'>{html.escape(name)}: {int(counts.get(name, 0))}</span>"
        for name in (
            wiki_queue.STATUS_PENDING,
            wiki_queue.STATUS_RUNNING,
            wiki_queue.STATUS_SUCCEEDED,
            wiki_queue.STATUS_FAILED,
            wiki_queue.STATUS_CANCELLED,
        )
    )

    job_rows = "".join(
        "<tr>"
        f"<td>{html.escape(str(job.get('id')))}</td>"
        f"<td><code>{text(job.get('kind'))}</code></td>"
        f"<td><span class='pill'>{text(job.get('status'))}</span></td>"
        f"<td>{html.escape(str(job.get('attempts')))}/{html.escape(str(job.get('max_attempts')))}</td>"
        f"<td class='muted'>{text(job.get('source'))}</td>"
        f"<td class='muted'>{text(job.get('worker_id'))}</td>"
        f"<td class='muted'>{text(job.get('last_error'), 120)}</td>"
        "</tr>"
        for job in queue.get("recent_jobs", [])
    ) or "<tr><td colspan='7' class='muted'>No queue jobs recorded.</td></tr>"

    artifact_keys = (
        ("graph_json", "graph.json"),
        ("graph_delta_json", "graph-delta.json"),
        ("communities_json", "communities.json"),
        ("wiki_graph_tar", "wiki-graph.tar.gz"),
        ("skills_sh_catalog", "skill-index.json.gz"),
    )
    artifact_rows = "".join(
        "<tr>"
        f"<td><code>{label}</code></td>"
        f"<td>{'yes' if artifacts[key].get('exists') else 'no'}</td>"
        f"<td>{int(artifacts[key].get('size') or 0):,}</td>"
        f"<td class='muted'>{text(artifacts[key].get('path'))}</td>"
        "</tr>"
        for key, label in artifact_keys
    )

    promotion_rows = "".join(
        "<tr>"
        f"<td><span class='pill'>{text(row.get('status'))}</span></td>"
        f"<td class='muted'>{text(row.get('promoted_at') or row.get('started_at'))}</td>"
        # Only the first 16 characters of the digest are shown.
        f"<td class='muted'><code>{text(row.get('current_sha256') or row.get('candidate_sha256'), 16)}</code></td>"
        f"<td class='muted'>{text(row.get('target'))}</td>"
        "</tr>"
        for row in artifacts.get("promotions", [])
    ) or "<tr><td colspan='4' class='muted'>No promotion metadata recorded.</td></tr>"

    queue_error = queue.get("error")
    if queue_error:
        availability = f"error ({text(queue.get('db_path'))})"
    elif queue.get("available"):
        availability = "available"
    else:
        availability = f"not initialized ({text(queue.get('db_path'))})"
    queue_error_html = (
        "<p class='error'>Queue DB error: "
        f"{html.escape(str(queue_error))}</p>"
        if queue_error
        else ""
    )

    body = (
        "<h1>Status</h1>"
        "<div class='card'>"
        "<strong>Queue state</strong>"
        f"<p class='muted'>Durable worker DB: {availability}. "
        f"Total jobs: {int(queue.get('total') or 0)}. "
        "<a href='/api/status.json'>JSON</a></p>"
        f"{queue_error_html}"
        f"<div>{count_pills}</div>"
        "</div>"
        "<div class='card'><strong>Recent queue jobs</strong>"
        "<table><tr><th>ID</th><th>Kind</th><th>Status</th><th>Attempts</th>"
        "<th>Source</th><th>Worker</th><th>Last error</th></tr>"
        + job_rows
        + "</table></div>"
        "<div class='card'><strong>Artifact versions</strong>"
        "<table><tr><th>Artifact</th><th>Exists</th><th>Bytes</th><th>Path</th></tr>"
        + artifact_rows
        + "</table></div>"
        f"<div class='card'><strong>Artifact promotions ({html.escape(str(artifacts.get('promotion_count', 0)))})</strong>"
        "<table><tr><th>Status</th><th>Time</th><th>Hash</th><th>Target</th></tr>"
        + promotion_rows
        + "</table></div>"
    )
    return _layout("Status", body)
def _render_events() -> str:
    """Render the live-events page.

    The page is pre-filled with the most recent audit entries (up to 200)
    and then subscribes to the SSE stream at ``/api/events.stream``.
    """
    entries = _read_jsonl(_audit_log_path(), limit=200)
    backlog = "\n".join(
        json.dumps(entry, ensure_ascii=False, default=str) for entry in entries
    ) or "-- no audit events recorded yet; waiting for new events --"

    intro = (
        "<h1>Live events</h1>"
        "<p class='muted'>Tails <code>~/.claude/ctx-audit.jsonl</code> "
        "via server-sent events.</p>"
    )
    status_card = (
        "<div class='card'>"
        f"<strong>Showing last {len(entries)} audit events</strong>; "
        "new writes append below. "
        "<span id='stream-status' class='muted'>connecting...</span>"
        "</div>"
    )
    stream_box = (
        "<pre id='stream' style='min-height:20rem; max-height:70vh; "
        "overflow-y:scroll; font-size:0.78rem;'>"
        f"{html.escape(backlog)}"
        "</pre>"
    )
    # Client side: append each SSE message to the <pre> and keep it scrolled
    # to the bottom.
    script = (
        "<script>\n"
        "const src = new EventSource('/api/events.stream');\n"
        "const pre = document.getElementById('stream');\n"
        "const status = document.getElementById('stream-status');\n"
        "const appendLine = (line) => {\n"
        " if (pre.textContent && !pre.textContent.endsWith('\\n')) pre.textContent += '\\n';\n"
        " pre.textContent += line + '\\n';\n"
        " pre.scrollTop = pre.scrollHeight;\n"
        "};\n"
        "pre.scrollTop = pre.scrollHeight;\n"
        "src.onopen = () => { status.textContent = 'connected; waiting for new events'; };\n"
        "src.onmessage = (e) => { appendLine(e.data); status.textContent = 'live'; };\n"
        "src.onerror = () => { status.textContent = 'stream error; reconnecting'; };\n"
        "</script>"
    )
    return _layout("Live events", intro + status_card + stream_box + script)
def _render_loaded(mutations_enabled: bool | None = None) -> str:
    """Live view of ~/.claude/skill-manifest.json with load/unload actions.

    Groups manifest entries by ``entity_type`` (skill / agent / mcp-server / harness)
    with a per-section count. Unload button posts both the slug and
    entity_type so the server routes correctly — MCPs need
    ``claude mcp remove``, skills + agents take the file-copy path.

    Legacy entries without entity_type default to ``skill`` (what the
    pre-install_utils manifest implicitly assumed).

    Args:
        mutations_enabled: Whether load/unload controls are active. ``None``
            falls back to the module-level ``_MONITOR_MUTATIONS_ENABLED``.

    Returns:
        The HTML page produced by ``_layout``.
    """
    if mutations_enabled is None:
        mutations_enabled = _MONITOR_MUTATIONS_ENABLED
    manifest = _read_manifest()
    load_rows = manifest.get("load", [])
    unload_rows = manifest.get("unload", [])
    dash = "\u2014"

    def _etype(entry: dict) -> str:
        # Missing entity_type => legacy skill entry.
        return str(entry.get("entity_type") or "skill")

    def _text(value: Any) -> str:
        # Manifest fields may be null; html.escape() only accepts str.
        return "" if value is None else str(value)

    def _data_attr(name: str, value: Any) -> str:
        # Emit ``data-<name>='...'`` using HTML escaping only. repr() must not
        # be used for quoting: it doubles backslashes and rewrites control
        # characters, corrupting the value the button posts back.
        if not value:
            return ""
        return f" data-{name}='{html.escape(str(value))}'"

    # Split loaded by entity_type for the sectioned layout.
    by_type: dict[str, list[dict]] = {
        "skill": [],
        "agent": [],
        "mcp-server": [],
        "harness": [],
    }
    for e in load_rows:
        by_type.setdefault(_etype(e), []).append(e)

    disabled_attr = "" if mutations_enabled else " disabled"
    mutation_token = _MONITOR_TOKEN if mutations_enabled else ""
    mutation_notice = (
        ""
        if mutations_enabled
        else (
            "<div class='card'><strong>Read-only mode.</strong> "
            "Load/unload actions are disabled because ctx-monitor is not "
            "bound to a loopback address.</div>"
        )
    )

    def _row(e: dict) -> str:
        slug = _text(e.get("skill", ""))
        etype = _etype(e)
        link = (
            f"<a href='/wiki/{html.escape(slug)}?type={html.escape(etype)}'>"
            f"<code>{html.escape(slug)}</code></a>"
        )
        # Harness rows show the CLI command instead of an unload button.
        action = (
            f"<td class='muted'><code>ctx-harness-install {html.escape(slug)} "
            f"--uninstall --dry-run</code></td>"
            if etype == "harness" else
            f"<td><button class='btn-unload' data-slug='{html.escape(slug)}' "
            f"data-etype='{html.escape(etype)}'{disabled_attr}>unload</button></td>"
        )
        # Truncate before escaping so an entity is never cut in half.
        detail = str(e.get("command", "") or e.get("priority", dash))[:60]
        return (
            f"<tr>"
            f"<td>{link}</td>"
            f"<td class='muted'>{html.escape(_text(e.get('source', '')))}</td>"
            f"<td class='muted'>{html.escape(detail)}</td>"
            f"{action}"
            f"</tr>"
        )

    def _section(title: str, etype: str) -> str:
        rows = by_type.get(etype, [])
        if not rows:
            return (
                f"<h3 style='margin-top:1.2rem;'>{title} "
                f"<span class='muted' style='font-size:0.85rem;'>(0)</span></h3>"
                f"<p class='muted' style='margin-left:0.4rem;'>"
                f"None loaded.</p>"
            )
        return (
            f"<h3 style='margin-top:1.2rem;'>{title} "
            f"<span class='muted' style='font-size:0.85rem;'>({len(rows)})</span></h3>"
            f"<table>"
            f"<tr><th>Slug</th><th>Source</th><th>Cmd / priority</th><th></th></tr>"
            + "".join(_row(e) for e in rows)
            + "</table>"
        )

    def _unload_row(e: dict) -> str:
        slug = html.escape(_text(e.get("skill", "")))
        etype = html.escape(_etype(e))
        why = html.escape(str(e.get("source", "") or e.get("reason", ""))[:80])
        return (
            f"<tr>"
            f"<td><code>{slug}</code></td>"
            f"<td class='muted'>{etype}</td>"
            f"<td class='muted'>{why}</td>"
            f"<td><button class='btn-load' data-slug='{slug}' "
            f"data-etype='{etype}'"
            f"{_data_attr('command', e.get('command'))}"
            f"{_data_attr('json-config', e.get('json_config'))}"
            f"{disabled_attr}>load</button></td>"
            f"</tr>"
        )

    unload_html = "".join(_unload_row(e) for e in unload_rows)

    body = (
        "<h1>Loaded entities \u2014 skills, agents, MCPs &amp; harnesses</h1>"
        f"<div class='card'>"
        f"<strong>{len(load_rows)}</strong> currently loaded "
        f"(<span class='muted'>"
        f"{len(by_type.get('skill', []))} skills \u00b7 "
        f"{len(by_type.get('agent', []))} agents \u00b7 "
        f"{len(by_type.get('mcp-server', []))} MCPs \u00b7 "
        f"{len(by_type.get('harness', []))} harnesses</span>) \u00b7 "
        f"<strong>{len(unload_rows)}</strong> known-unloaded \u00b7 "
        f"<span class='muted'>source: <code>~/.claude/skill-manifest.json</code> "
        f"+ <code>~/.claude/harness-installs/*.json</code></span>"
        "</div>"
        f"{mutation_notice}"
        "<h2>Load an entity</h2>"
        "<div class='card'>"
        "<form id='load-form'>"
        "<input type='text' id='load-input' placeholder='slug (e.g. fastapi-pro)' "
        "style='padding:0.35rem 0.6rem; width:18rem; border:1px solid #ccc; "
        "border-radius:4px;'>"
        "<select id='load-type' style='margin-left:0.5rem; padding:0.35rem 0.6rem; "
        "border:1px solid #ccc; border-radius:4px;'>"
        "<option value='skill'>skill</option>"
        "<option value='agent'>agent</option>"
        "<option value='mcp-server'>mcp-server</option>"
        "</select>"
        f"<button type='submit' style='margin-left:0.5rem;'{disabled_attr}>load</button>"
        "<span id='load-msg' class='muted' style='margin-left:0.75rem;'></span>"
        "</form></div>"
        f"<h2>Currently loaded ({len(load_rows)})</h2>"
        + _section("Skills", "skill")
        + _section("Agents", "agent")
        + _section("MCP servers", "mcp-server")
        + _section("Harnesses", "harness")
        + f"<h2>Recently unloaded ({len(unload_rows)})</h2>"
        "<table><tr><th>Slug</th><th>Type</th><th>Source / reason</th><th></th></tr>"
        + unload_html + "</table>"
        "<script>\n"
        f"const CTX_MONITOR_MUTATIONS_ENABLED = {json.dumps(mutations_enabled)};\n"
        f"const CTX_MONITOR_TOKEN = {json.dumps(mutation_token)};\n"
        "async function post(url, body) {\n"
        " if (!CTX_MONITOR_MUTATIONS_ENABLED) return {ok:false, msg:'mutations disabled on non-loopback bind'};\n"
        " const r = await fetch(url, {method:'POST', headers:{'Content-Type':'application/json', 'X-CTX-Monitor-Token':CTX_MONITOR_TOKEN}, body: JSON.stringify(body || {})});\n"
        " const ok = r.status >= 200 && r.status < 300;\n"
        " let msg = ''; try { msg = (await r.json()).detail || r.statusText; } catch(_) { msg = r.statusText; }\n"
        " return {ok, msg};\n"
        "}\n"
        "document.querySelectorAll('.btn-unload').forEach(b => b.addEventListener('click', async () => {\n"
        " b.disabled = true; const slug = b.dataset.slug; const entity_type = b.dataset.etype || 'skill';\n"
        " const r = await post('/api/unload', {slug, entity_type});\n"
        " if (r.ok) location.reload(); else { b.disabled = false; alert('unload failed: ' + r.msg); }\n"
        "}));\n"
        "document.querySelectorAll('.btn-load').forEach(b => b.addEventListener('click', async () => {\n"
        " b.disabled = true; const slug = b.dataset.slug; const entity_type = b.dataset.etype || 'skill';\n"
        " const payload = {slug, entity_type};\n"
        " if (b.dataset.command) payload.command = b.dataset.command;\n"
        " if (b.dataset.jsonConfig) payload.json_config = b.dataset.jsonConfig;\n"
        " const r = await post('/api/load', payload);\n"
        " if (r.ok) location.reload(); else { b.disabled = false; alert('load failed: ' + r.msg); }\n"
        "}));\n"
        "document.getElementById('load-form').addEventListener('submit', async (ev) => {\n"
        " ev.preventDefault();\n"
        " const slug = document.getElementById('load-input').value.trim();\n"
        " const entity_type = document.getElementById('load-type').value;\n"
        " if (!slug) return;\n"
        " document.getElementById('load-msg').textContent = 'loading…';\n"
        " const r = await post('/api/load', {slug, entity_type});\n"
        " document.getElementById('load-msg').textContent = r.ok ? 'ok \u2014 reloading' : ('failed: ' + r.msg);\n"
        " if (r.ok) setTimeout(() => location.reload(), 400);\n"
        "});\n"
        "</script>"
    )
    return _layout("Loaded", body)
def _render_runtime_lifecycle() -> str:
    """Render the runtime lifecycle page from ``_runtime_lifecycle_summary()``.

    Shows the validation / failure / open-escalation totals, then the recent
    validations and open escalations in reverse list order.

    Returns:
        The HTML page produced by ``_layout``.
    """
    summary = _runtime_lifecycle_summary()

    def _event_cell(event: dict[str, Any], key: str, limit: int = 120) -> str:
        # Truncate the raw text first, then escape it. Slicing the escaped
        # string could cut an entity such as ``&lt;`` in half.
        return html.escape(str(event.get(key) or "")[:limit])

    validation_rows = "".join(
        "<tr>"
        f"<td class='muted'>{_event_cell(event, 'created_at')}</td>"
        f"<td><code>{_event_cell(event, 'check_name')}</code></td>"
        f"<td><span class='pill'>{_event_cell(event, 'status')}</span></td>"
        f"<td class='muted'>{_event_cell(event, 'session_id')}</td>"
        f"<td class='muted'>{_event_cell(event, 'summary')}</td>"
        "</tr>"
        for event in reversed(summary["recent_validations"])
    )
    escalation_rows = "".join(
        "<tr>"
        f"<td class='muted'>{_event_cell(event, 'created_at')}</td>"
        f"<td><code>{_event_cell(event, 'trigger')}</code></td>"
        f"<td><span class='pill'>{_event_cell(event, 'severity')}</span></td>"
        f"<td class='muted'>{_event_cell(event, 'session_id')}</td>"
        f"<td class='muted'>{_event_cell(event, 'reason')}</td>"
        "</tr>"
        for event in reversed(summary["open_escalations"])
    )
    body = (
        "<h1>Runtime lifecycle</h1>"
        "<div class='card'>"
        f"<strong>{summary['validations_total']}</strong> validations / "
        f"<strong>{summary['validation_failures']}</strong> failed / "
        f"<strong>{summary['open_escalations_total']}</strong> open escalations"
        f"<br><span class='muted'>source: <code>{html.escape(str(summary['path']))}</code></span>"
        " / <a href='/api/runtime.json'>JSON</a>"
        "</div>"
        "<div class='card'><strong>Recent validations</strong>"
        + (
            "<table><tr><th>Created</th><th>Check</th><th>Status</th>"
            "<th>Session</th><th>Summary</th></tr>"
            + validation_rows
            + "</table>"
            if validation_rows else
            "<p class='muted'>No validation checks recorded yet.</p>"
        )
        + "</div>"
        "<div class='card'><strong>Open escalations</strong>"
        + (
            "<table><tr><th>Created</th><th>Trigger</th><th>Severity</th>"
            "<th>Session</th><th>Reason</th></tr>"
            + escalation_rows
            + "</table>"
            if escalation_rows else
            "<p class='muted'>No open escalations.</p>"
        )
        + "</div>"
    )
    return _layout("Runtime lifecycle", body)
def _render_logs() -> str:
    """Filterable audit-log viewer — reads the last 500 lines of the log.

    Rows are rendered newest first. Filtering happens client side against the
    ``data-event`` / ``data-subject`` / ``data-session`` attributes.

    Returns:
        The HTML page produced by ``_layout``.
    """
    entries = _read_jsonl(_audit_log_path(), limit=500)

    def cell(entry: dict, key: str, limit: int | None = None) -> str:
        # A null or non-string field in one audit line must not take down the
        # whole page: coerce to text, truncate, then escape.
        value = entry.get(key)
        text = "" if value is None else str(value)
        if limit is not None:
            text = text[:limit]
        return html.escape(text)

    rows = "".join(
        f"<tr data-event='{cell(e, 'event')}' "
        f"data-subject='{cell(e, 'subject')}' "
        f"data-session='{cell(e, 'session_id')}'>"
        f"<td class='muted'>{cell(e, 'ts')}</td>"
        f"<td><span class='pill'>{cell(e, 'event')}</span></td>"
        f"<td><code>{cell(e, 'subject')}</code></td>"
        f"<td class='muted'>{cell(e, 'actor')}</td>"
        f"<td class='muted'>{cell(e, 'session_id', 24)}</td>"
        f"<td class='muted'>{html.escape(json.dumps(e.get('meta', {}))[:100])}</td>"
        f"</tr>"
        for e in reversed(entries)
    )
    body = (
        "<h1>Audit log</h1>"
        f"<div class='card'>Showing last {len(entries)} of "
        f"<code>~/.claude/ctx-audit.jsonl</code>. "
        "<a href='/events'>Live stream \u2192</a>"
        "</div>"
        "<div class='card'>"
        "<input type='text' id='filter' placeholder='filter: event/subject/session…' "
        "style='padding:0.35rem 0.6rem; width:20rem; border:1px solid #ccc; border-radius:4px;'>"
        "<span class='muted' style='margin-left:0.75rem;'>"
        "e.g. <code>skill.loaded</code>, <code>kubernetes-deployment</code>, or a session id</span>"
        "</div>"
        "<table id='logs'><tr><th>ts</th><th>event</th><th>subject</th>"
        "<th>actor</th><th>session</th><th>meta</th></tr>" + rows + "</table>"
        "<script>\n"
        "const input = document.getElementById('filter');\n"
        "const rows = document.querySelectorAll('#logs tr[data-event]');\n"
        "input.addEventListener('input', () => {\n"
        " const q = input.value.toLowerCase();\n"
        " rows.forEach(r => {\n"
        " const hay = [r.dataset.event, r.dataset.subject, r.dataset.session].join(' ').toLowerCase();\n"
        " r.style.display = !q || hay.includes(q) ? '' : 'none';\n"
        " });\n"
        "});\n"
        "</script>"
    )
    return _layout("Audit log", body)
# ─── Mutation endpoints ──────────────────────────────────────────────────────
def _is_safe_slug(slug: str) -> bool:
    """Report whether *slug* passes the shared ``is_safe_source_name`` check."""
    verdict = is_safe_source_name(slug)
    return verdict
def _perform_load(
    slug: str,
    entity_type: str = "skill",
    *,
    command: str | None = None,
    json_config: str | None = None,
) -> tuple[bool, str]:
    """Delegate a load request to ``dashboard_entities.perform_load``.

    The monitor's runtime dependencies (``_entity_runtime_deps()``) are passed
    along; the delegate's ``(ok, message)`` result is returned unchanged.
    """
    loader = dashboard_entities.perform_load
    return loader(
        slug,
        entity_type,
        command=command,
        json_config=json_config,
        deps=_entity_runtime_deps(),
    )
def _perform_unload(slug: str, entity_type: str = "skill") -> tuple[bool, str]:
    """Delegate an unload request to ``dashboard_entities.perform_unload``.

    The monitor's runtime dependencies (``_entity_runtime_deps()``) are passed
    along; the delegate's ``(ok, message)`` result is returned unchanged.
    """
    unloader = dashboard_entities.perform_unload
    return unloader(slug, entity_type, deps=_entity_runtime_deps())
# ─── HTTP handler ────────────────────────────────────────────────────────────
def _server_shutdown_requested(server: Any) -> bool:
event = getattr(server, "_ctx_shutdown", None)
return bool(event is not None and event.is_set())
class _MonitorHandler(BaseHTTPRequestHandler):
# Silence the per-request access log spam. Users running
# ctx-monitor get a clean stdout; errors still surface via
# log_error() below.
def log_message(self, fmt: str, *args: Any) -> None:
    """Discard per-request access-log lines instead of emitting them."""
    return None
# CSRF defense. Dashboard mutation endpoints (/api/load, /api/unload)
# require same-origin POSTs plus a per-process token injected into the
# served dashboard page.
def _same_origin(self) -> bool:
    """Check that the request targets an allowed host from the same origin.

    The ``Host`` header must name a host for which mutations are allowed.
    When an ``Origin`` header is present its host must equal the request
    host.
    """
    host = _request_host_name(self.headers.get("Host", ""))
    if not _host_allows_mutations(host):
        return False
    origin = self.headers.get("Origin") or ""
    if not origin:
        # No Origin header (curl, direct tool calls) is acceptable only
        # when the mutation token is also present; that check is done
        # separately by the caller.
        return True
    return _origin_host_name(origin) == host
def _mutations_enabled(self) -> bool:
    """Return the server's ``_ctx_mutations_enabled`` flag as a bool.

    Falls back to the module-level ``_MONITOR_MUTATIONS_ENABLED`` when the
    server object does not carry the attribute.
    """
    flag = getattr(
        self.server, "_ctx_mutations_enabled", _MONITOR_MUTATIONS_ENABLED,
    )
    return bool(flag)
def _mutation_authorized(self) -> bool:
    """Check the ``X-CTX-Monitor-Token`` header against the process token.

    Returns:
        True only when mutations are enabled, a process token exists and the
        supplied header matches it.
    """
    supplied = self.headers.get("X-CTX-Monitor-Token") or ""
    if not self._mutations_enabled() or not _MONITOR_TOKEN:
        return False
    # Compare as bytes: compare_digest() raises TypeError for str arguments
    # containing non-ASCII characters, which would turn a bad token into an
    # unhandled error instead of a plain denial.
    return secrets.compare_digest(
        supplied.encode("utf-8", "surrogatepass"),
        _MONITOR_TOKEN.encode("utf-8", "surrogatepass"),
    )
def _api_reads_enabled(self) -> bool:
return self._mutations_enabled()
def _read_authorized(self, qs: dict[str, str]) -> bool:
    """Decide whether this GET request may read dashboard data.

    When mutations are enabled, the decision is made from the ``Host`` header
    alone. Otherwise a token must match the process token; it is taken from
    the ``X-CTX-Monitor-Token`` header, the ``token`` query parameter or the
    read-token cookie, in that order.

    Args:
        qs: Parsed query-string parameters of the request.
    """
    request_host = _request_host_name(self.headers.get("Host", ""))
    if self._mutations_enabled():
        return _host_allows_mutations(request_host)
    token = (
        self.headers.get("X-CTX-Monitor-Token")
        or qs.get("token", "")
        or _read_token_cookie(self.headers.get("Cookie", ""))
    )
    if not _MONITOR_TOKEN:
        return False
    # Compare as bytes: compare_digest() raises TypeError for str arguments
    # containing non-ASCII characters (easy to send via ``?token=``), which
    # would surface as an unhandled error instead of a denial.
    return secrets.compare_digest(
        str(token or "").encode("utf-8", "surrogatepass"),
        _MONITOR_TOKEN.encode("utf-8", "surrogatepass"),
    )
def _send_security_headers(self, *, html_response: bool = False) -> None:
self.send_header("X-Content-Type-Options", "nosniff")
self.send_header("Referrer-Policy", "no-referrer")
self.send_header("X-Frame-Options", "DENY")
if getattr(self, "_ctx_set_read_cookie", False):
self.send_header(
"Set-Cookie",
f"{_READ_TOKEN_COOKIE}={_MONITOR_TOKEN}; Path=/; "
"HttpOnly; SameSite=Strict",
)
if html_response:
self.send_header(
"Content-Security-Policy",
"default-src 'self'; script-src 'self' 'unsafe-inline'; "
"style-src 'self' 'unsafe-inline'; connect-src 'self'; "
"object-src 'none'; base-uri 'none'; frame-ancestors 'none'",
)
def _content_length(self) -> int | None:
raw = self.headers.get("Content-Length")
if raw is None:
return 0
try:
length = int(raw)
except ValueError:
self._send_json_status(400, {"detail": "invalid Content-Length"})
return None
if length < 0:
self._send_json_status(400, {"detail": "invalid Content-Length"})
return None
if length > _MAX_POST_BODY_BYTES:
self._send_json_status(413, {"detail": "JSON body too large"})
return None
return length
def _read_json_body(self) -> dict[str, Any] | None:
content_type = self.headers.get("Content-Type", "").split(";", 1)[0]
if content_type.lower() != "application/json":
self._send_json_status(415, {"detail": "JSON body required"})
return None
length = self._content_length()
if length is None:
return None
raw = self.rfile.read(length) if length else b""
try:
body = json.loads(raw.decode("utf-8")) if raw else {}
except (UnicodeDecodeError, json.JSONDecodeError):
self._send_json_status(400, {"detail": "invalid JSON body"})
return None
if not isinstance(body, dict):
self._send_json_status(400, {"detail": "JSON object body required"})
return None
return body
def _discard_small_body(self) -> None:
raw = self.headers.get("Content-Length")
if raw is None:
return
try:
length = int(raw)
except ValueError:
return
if 0 < length <= _MAX_POST_BODY_BYTES:
self.rfile.read(length)
def do_GET(self) -> None:  # noqa: N802 — stdlib signature
    """Route a GET request to the matching HTML page, JSON endpoint or SSE stream.

    The request is first checked by ``_read_authorized``; a rejected request
    gets a 403 (JSON for ``/api/`` paths, HTML otherwise). Unknown paths get
    a 404, client disconnects are ignored, and any other exception is
    reported through ``_send_500``.
    """
    # Parse once so we can reuse the query string for /graph?slug=…
    raw_path, _, raw_query = self.path.partition("?")
    path = raw_path
    qs = {}
    if raw_query:
        from urllib.parse import parse_qs
        # Only the first value of each repeated parameter is kept.
        qs = {k: v[0] for k, v in parse_qs(raw_query).items()}
    try:
        self._ctx_set_read_cookie = False
        # Falls back to "always allowed" when the handler has no
        # _read_authorized attribute.
        read_authorized = getattr(self, "_read_authorized", lambda _qs: True)
        if not read_authorized(qs):
            if path.startswith("/api/"):
                self._send_json_status(
                    403,
                    {"detail": "monitor read token required on non-loopback bind"},
                )
            else:
                self._send_html_status(
                    403,
                    "<h1>403</h1>"
                    "<p>monitor read token required on non-loopback bind</p>",
                )
            return
        # When mutations are disabled and a valid token arrived in the query
        # string, flag the response so _send_security_headers sets the
        # read-token cookie.
        query_token = qs.get("token", "")
        self._ctx_set_read_cookie = (
            not self._mutations_enabled()
            and bool(query_token)
            and bool(_MONITOR_TOKEN)
            and secrets.compare_digest(query_token, _MONITOR_TOKEN)
        )
        # --- HTML pages ---
        if path == "/":
            self._send_html(_render_home())
        elif path == "/sessions":
            self._send_html(_render_sessions_index())
        elif path.startswith("/session/"):
            self._send_html(_render_session_detail(path.split("/session/", 1)[1]))
        elif path == "/skills":
            self._send_html(_render_skills(qs))
        elif path.startswith("/skill/"):
            self._send_html(_render_skill_detail(
                path.split("/skill/", 1)[1],
                qs.get("type"),
            ))
        elif path == "/loaded":
            self._send_html(_render_loaded(self._mutations_enabled()))
        elif path == "/logs":
            self._send_html(_render_logs())
        elif path == "/graph":
            self._send_html(_render_graph(qs.get("slug"), qs.get("type")))
        elif path == "/manage":
            self._send_html(_render_manage(self._mutations_enabled()))
        elif path == "/harness":
            self._send_html(_render_harness_wizard())
        elif path == "/docs":
            self._send_html(_render_docs())
        elif path == "/config":
            self._send_html(_render_config())
        elif path == "/status":
            self._send_html(_render_status())
        elif path == "/wiki":
            self._send_html(_render_wiki_index(qs.get("type"), qs.get("q", "")))
        elif path.startswith("/wiki/"):
            slug = path.split("/wiki/", 1)[1]
            self._send_html(
                _render_wiki_entity(
                    slug,
                    qs.get("type"),
                    mutations_enabled=self._mutations_enabled(),
                ),
            )
        elif path == "/kpi":
            self._send_html(_render_kpi())
        elif path == "/runtime":
            self._send_html(_render_runtime_lifecycle())
        elif path in {"/events", "/live"}:
            self._send_html(_render_events())
        # --- JSON API ---
        elif path == "/api/sessions.json":
            self._send_json(_summarize_sessions())
        elif path == "/api/manifest.json":
            self._send_json(_read_manifest())
        elif path == "/api/status.json":
            self._send_json(_status_payload())
        elif path == "/api/kpi.json":
            summary = _kpi_summary()
            self._send_json(summary.to_dict() if summary is not None else {
                "total": 0, "detail": "no sidecars yet",
            })
        elif path == "/api/grades.json":
            self._send_json(_grade_distribution_payload())
        elif path == "/api/sidecars.json":
            self._send_json(_sidecar_page_payload(qs))
        elif path == "/api/runtime.json":
            self._send_json(_runtime_lifecycle_summary())
        elif path == "/api/config.json":
            self._send_json(_effective_config_payload())
        elif path == "/api/entities/search.json":
            try:
                # limit is clamped to 1..200 (default 80).
                limit = max(1, min(int(qs.get("limit", 80)), 200))
                results = _search_wiki_entities(
                    qs.get("q", ""),
                    qs.get("type") or None,
                    limit=limit,
                )
            except ValueError as exc:
                self._send_json_status(400, {"detail": str(exc)})
                return
            self._send_json({"results": results, "total": len(results)})
        elif path.startswith("/api/entity/") and path.endswith(".json"):
            slug = unquote(path[len("/api/entity/"): -len(".json")])
            try:
                detail = _wiki_entity_detail(slug, qs.get("type"))
            except ValueError as exc:
                self._send_json_status(400, {"detail": str(exc)})
                return
            if detail is None:
                self._send_json_status(404, {"detail": f"no wiki entity for {slug}"})
            else:
                self._send_json(detail)
        elif path.startswith("/api/skill/") and path.endswith(".json"):
            slug = unquote(path[len("/api/skill/"): -len(".json")])
            sidecar = _load_sidecar(slug, entity_type=qs.get("type"))
            if sidecar is None:
                self._send_404(f"no sidecar for {slug}")
            else:
                self._send_json(sidecar)
        elif path.startswith("/api/graph/") and path.endswith(".json"):
            slug = unquote(path[len("/api/graph/"): -len(".json")])
            requested_type = qs.get("type")
            graph_entity_type = _normalize_dashboard_entity_type(requested_type)
            # An explicit type that does not normalize is a client error.
            if requested_type is not None and graph_entity_type is None:
                self._send_json_status(
                    400,
                    {"detail": f"unsupported entity_type: {requested_type!r}"},
                )
                return
            try:
                # hops is clamped to 1..3 (default 1), limit to 5..150
                # (default 40).
                hops = max(1, min(int(qs.get("hops", 1)), 3))
                limit = max(5, min(int(qs.get("limit", 40)), 150))
            except ValueError:
                self._send_json_status(
                    400,
                    {"detail": "hops and limit must be integers"},
                )
                return
            self._send_json(_graph_neighborhood(
                slug, hops=hops, limit=limit, entity_type=graph_entity_type,
            ))
        elif path == "/api/events.stream":
            self._stream_audit_log()
        else:
            self._send_404(path)
    except (BrokenPipeError, ConnectionAbortedError):
        # Browser disconnected mid-response — benign for a local
        # dashboard; nothing to do.
        return
    except Exception as exc:  # noqa: BLE001 — last-resort handler
        self._send_500(exc)
def do_POST(self) -> None:  # noqa: N802 — stdlib signature
    """Handle the mutation endpoints of the monitor API.

    A request is refused with a 403 JSON response (after its body has been
    drained via ``_discard_small_body``) when mutations are disabled, when
    it is not same-origin, or when it is not authorized; the checks run in
    that order and stop at the first failure. Otherwise the JSON body is
    read and dispatched on the request path (query string ignored):

    * ``/api/load`` / ``/api/unload`` -- load or unload ``slug``.
    * ``/api/config`` -- persist the ``updates`` object.
    * ``/api/entity/upsert`` / ``/api/entity/delete`` -- edit wiki entities.

    Unknown paths get a 404. Client disconnects are ignored; any other
    exception is reported through ``_send_500``.
    """
    route = self.path.split("?", 1)[0]
    try:
        # Evaluate the gates lazily: a later check never runs once an
        # earlier one has already failed.
        if not self._mutations_enabled():
            refusal = "monitor mutations disabled on non-loopback bind"
        elif not self._same_origin():
            refusal = "cross-origin POST denied"
        elif not self._mutation_authorized():
            refusal = "monitor token required"
        else:
            refusal = None
        if refusal is not None:
            self._discard_small_body()
            self._send_json_status(403, {"detail": refusal})
            return

        payload = self._read_json_body()
        if payload is None:
            # Nothing is sent from here; presumably the reader has already
            # answered the client -- confirm against _read_json_body.
            return

        def target() -> tuple[str, str]:
            # entity_type falls back to "skill" so older clients that only
            # send {slug} keep working.
            slug = str(payload.get("slug", "")).strip()
            kind = str(payload.get("entity_type", "skill")).strip() or "skill"
            return slug, kind

        def report(ok: Any, detail: Any) -> None:
            # Shared reply shape of the load/unload/upsert/delete routes.
            self._send_json_status(
                200 if ok else 400, {"ok": ok, "detail": detail},
            )

        if route == "/api/load":
            slug, kind = target()
            # Only non-empty string values are forwarded as options.
            extras: dict[str, str] = {}
            for key in ("command", "json_config"):
                value = payload.get(key)
                if isinstance(value, str) and value:
                    extras[key] = value
            ok, msg = _perform_load(slug, entity_type=kind, **extras)
            report(ok, msg)
        elif route == "/api/unload":
            slug, kind = target()
            ok, msg = _perform_unload(slug, entity_type=kind)
            report(ok, msg)
        elif route == "/api/config":
            updates = payload.get("updates", {})
            if isinstance(updates, dict):
                outcome = _save_config_updates(updates)
                self._send_json_status(
                    200 if outcome.get("ok") else 400, outcome,
                )
            else:
                self._send_json_status(
                    400, {"ok": False, "detail": "updates must be an object"},
                )
        elif route == "/api/entity/upsert":
            ok, msg = _upsert_wiki_entity(payload)
            report(ok, msg)
        elif route == "/api/entity/delete":
            slug, kind = target()
            ok, msg = _delete_wiki_entity(slug, kind)
            report(ok, msg)
        else:
            self._send_404(route)
    except (BrokenPipeError, ConnectionAbortedError):
        # The client went away mid-response; there is nobody to answer.
        return
    except Exception as exc:  # noqa: BLE001
        self._send_500(exc)
def _send_json_status(self, status: int, obj: Any) -> None:
    """Send ``obj`` as a JSON response with HTTP status ``status``.

    Values ``json`` cannot serialize natively are rendered with ``str``.
    The body is UTF-8 encoded and ``Content-Length`` is its byte length.
    """
    payload = json.dumps(obj, default=str).encode("utf-8")
    self.send_response(status)
    for header, value in (
        ("Content-Type", "application/json"),
        ("Content-Length", str(len(payload))),
    ):
        self.send_header(header, value)
    self._send_security_headers()
    self.end_headers()
    self.wfile.write(payload)
def _send_html(self, body: str) -> None:
    """Send ``body`` as a ``200 OK`` HTML page.

    Delegates to ``_send_html_status`` so the header sequence (content
    type, length, security headers) is defined in exactly one place and
    the success path cannot drift from the error paths.
    """
    self._send_html_status(200, body)
def _send_html_status(self, status: int, body: str) -> None:
    """Send ``body`` as a UTF-8 HTML response with HTTP status ``status``.

    ``Content-Length`` is the encoded byte length, and the security
    headers are requested in their HTML-response variant.
    """
    encoded = body.encode("utf-8")
    self.send_response(status)
    for header, value in (
        ("Content-Type", "text/html; charset=utf-8"),
        ("Content-Length", str(len(encoded))),
    ):
        self.send_header(header, value)
    self._send_security_headers(html_response=True)
    self.end_headers()
    self.wfile.write(encoded)
def _send_json(self, obj: Any) -> None:
    """Send ``obj`` as a ``200 OK`` JSON response.

    Delegates to ``_send_json_status`` instead of repeating its body, so
    serialization and headers are defined in a single place.
    """
    self._send_json_status(200, obj)
def _send_404(self, detail: str) -> None:
    """Send a minimal ``404`` HTML page mentioning ``detail``.

    ``detail`` is HTML-escaped before being embedded because callers pass
    request-derived text (paths, slugs). The response itself is produced
    by ``_send_html_status`` rather than a private copy of its header
    sequence.
    """
    self._send_html_status(404, f"<h1>404</h1><p>{html.escape(detail)}</p>")
def _send_500(self, exc: BaseException) -> None:
    """Log ``exc`` and send a minimal ``500`` HTML page showing its repr.

    The repr is HTML-escaped before being embedded. The response itself is
    produced by ``_send_html_status`` rather than a private copy of its
    header sequence.
    """
    self.log_error("render error: %s", exc)
    self._send_html_status(
        500, f"<h1>500</h1><pre>{html.escape(repr(exc))}</pre>",
    )
def _stream_audit_log(self) -> None:
    """Server-sent events: tail the audit log line-by-line.

    Sends the SSE response headers, then polls the audit log every 0.5 s
    until the server is asked to shut down or the client disconnects.
    Only data appended after the stream was opened is sent; each
    non-blank line becomes one ``data:`` event. A ``: heartbeat`` comment
    is written after 25 s without any output.

    The file is read in binary mode so that the tracked position is a
    plain byte offset comparable with ``st_size`` (text-mode ``tell()``
    values are opaque cookies). Further guarantees:

    * a file that shrank (truncated or rotated) is re-read from its start
      instead of stalling until it outgrows the old offset;
    * a trailing line without its newline is held back until complete, so
      a half-written record is never emitted as an event;
    * a file that is missing or unreadable at poll time is treated as
      empty rather than raising after the headers have been sent;
    * undecodable bytes are replaced instead of aborting the stream.
    """
    self.send_response(200)
    self.send_header("Content-Type", "text/event-stream")
    self.send_header("Cache-Control", "no-cache")
    self.send_header("Connection", "keep-alive")
    self._send_security_headers()
    self.end_headers()

    log_path = _audit_log_path()

    def current_size() -> int:
        # One stat() call, no exists()/stat() race: any OS error (most
        # commonly "file not found") simply counts as an empty log.
        try:
            return log_path.stat().st_size
        except OSError:
            return 0

    position = current_size()
    last_output = time.monotonic()
    try:
        while not _server_shutdown_requested(self.server):
            size = current_size()
            if size < position:
                # Truncated or replaced: start over from the beginning.
                position = 0

            events: list[bytes] = []
            if size > position:
                try:
                    with log_path.open("rb") as stream:
                        stream.seek(position)
                        chunk = stream.read()
                except OSError:
                    chunk = b""
                # Consume only up to the last newline; whatever follows
                # is an incomplete line and is re-read on a later poll.
                complete, newline, _pending = chunk.rpartition(b"\n")
                if newline:
                    position += len(complete) + len(newline)
                    for raw_line in complete.split(b"\n"):
                        text = raw_line.decode("utf-8", errors="replace")
                        if text.strip():
                            events.append(
                                f"data: {text.rstrip()}\n\n".encode()
                            )

            if events:
                self.wfile.write(b"".join(events))
                self.wfile.flush()
                last_output = time.monotonic()
            elif time.monotonic() - last_output > 25:
                # SSE heartbeat comment — keeps proxies from timing out
                # on idle streams. Also detects dead clients (write
                # will raise BrokenPipeError).
                self.wfile.write(b": heartbeat\n\n")
                self.wfile.flush()
                last_output = time.monotonic()
            time.sleep(0.5)
    except (BrokenPipeError, ConnectionAbortedError, ConnectionResetError):
        return
# ─── CLI ─────────────────────────────────────────────────────────────────────
class _MonitorServer(ThreadingHTTPServer):
    """Threaded HTTP server that records when it has been asked to stop.

    ``_ctx_shutdown`` is an event set by both ``shutdown()`` and
    ``server_close()``; presumably long-lived handlers (such as the SSE
    stream) poll it to end their loops -- confirm against
    ``_server_shutdown_requested``.
    """

    # Request threads are daemon threads and are not joined on close.
    daemon_threads = True
    block_on_close = False

    def __init__(self, *args: Any, **kwargs: Any) -> None:
        # Create the event before the base initializer runs, so it already
        # exists if that initializer ends up calling server_close().
        self._ctx_shutdown = threading.Event()
        super().__init__(*args, **kwargs)

    def shutdown(self) -> None:
        """Flag the shutdown, then stop the serve loop."""
        self._ctx_shutdown.set()
        super().shutdown()

    def server_close(self) -> None:
        """Flag the shutdown, then close the server."""
        self._ctx_shutdown.set()
        super().server_close()

    def handle_error(self, request: Any, client_address: Any) -> None:
        """Report request errors, except those caused by a client disconnect."""
        raised = sys.exc_info()[0]
        disconnects = (
            BrokenPipeError,
            ConnectionAbortedError,
            ConnectionResetError,
        )
        if raised is None or not issubclass(raised, disconnects):
            super().handle_error(request, client_address)
def _make_monitor_server(host: str, port: int) -> _MonitorServer:
    """Create the monitor server for ``host``/``port``.

    Whether mutations are allowed is decided by
    ``_host_allows_mutations(host)``; the result is stored in the module
    global ``_MONITOR_MUTATIONS_ENABLED`` and on the returned server as
    ``_ctx_mutations_enabled``.
    """
    global _MONITOR_MUTATIONS_ENABLED
    allow_mutations = _host_allows_mutations(host)
    _MONITOR_MUTATIONS_ENABLED = allow_mutations
    monitor = _MonitorServer((host, port), _MonitorHandler)
    monitor._ctx_mutations_enabled = allow_mutations
    return monitor
def serve(host: str = "127.0.0.1", port: int = 8765) -> None:
    """Run the monitor. Blocks until Ctrl+C.

    Creates the server, stores a fresh random token in the module global
    ``_MONITOR_TOKEN`` and prints the dashboard URL. When mutations are
    disabled for this bind, the token is appended to the printed URL and
    a notice is printed as well. The server is always closed on the way
    out.

    Args:
        host: Interface to bind.
        port: TCP port to bind. ``0`` asks the OS for a free port; the
            printed URL shows the port that was actually bound.
    """
    global _MONITOR_TOKEN
    server = _make_monitor_server(host, port)
    _MONITOR_TOKEN = secrets.token_urlsafe(32)
    mutations_enabled = bool(getattr(server, "_ctx_mutations_enabled", False))
    # Advertise the bound port, not the requested one: with port=0 the
    # requested value would yield an unusable ":0" URL.
    bound_port = server.server_address[1]
    url = f"http://{_monitor_display_host(host)}:{bound_port}/"
    if not mutations_enabled:
        url = f"{url}?token={_MONITOR_TOKEN}"
    print(f"ctx-monitor serving at {url}  (Ctrl+C to stop)", flush=True)
    if not mutations_enabled:
        print(
            "ctx-monitor: non-loopback bind; read token required and "
            "load/unload mutations disabled",
            flush=True,
        )
    try:
        server.serve_forever()
    except KeyboardInterrupt:
        print("ctx-monitor: shutdown", flush=True)
    finally:
        server.server_close()
def _monitor_display_host(host: str) -> str:
    """Return a URL host users can paste into a browser.

    A wildcard bind (``0.0.0.0`` or ``::``) is shown as the address this
    machine's hostname resolves to, unless that lookup fails or yields a
    ``127.*`` address, in which case ``localhost`` is used. Any other
    host containing ``:`` is wrapped in square brackets unless it already
    starts with one; everything else is returned unchanged.
    """
    if host not in {"0.0.0.0", "::"}:
        needs_brackets = ":" in host and not host.startswith("[")
        return f"[{host}]" if needs_brackets else host
    try:
        resolved = socket.gethostbyname(socket.gethostname())
    except OSError:
        return "localhost"
    if resolved and not resolved.startswith("127."):
        return resolved
    return "localhost"
def main(argv: list[str] | None = None) -> int:
parser = argparse.ArgumentParser(
prog="ctx-monitor",
description="Local HTTP dashboard for ctx skill/agent activity.",
)
sub = parser.add_subparsers(dest="cmd", required=True)
sp = sub.add_parser("serve", help="Start the monitor web server")
sp.add_argument("--port", type=int, default=8765)
sp.add_argument(
"--host", default="127.0.0.1",
help="Host to bind (default: 127.0.0.1; use 0.0.0.0 to expose β€” be careful)",
)
args = parser.parse_args(argv)
if args.cmd == "serve":
serve(host=args.host, port=args.port)
return 0
# Script entry point: main()'s return value becomes the process exit status.
if __name__ == "__main__":
    raise SystemExit(main())