# mypy: disable-error-code=attr-defined
"""ctx_monitor.py -- Local HTTP dashboard for ctx runtime and catalog activity.

``ctx-monitor serve [--port 8765]`` starts a zero-dependency threaded HTTP
server (stdlib http.server) that renders the audit log + skill-events.jsonl +
sidecars into a browser UI at http://localhost:8765/.

Routes:
  /                             Home — summary stats + session list + links
  /loaded                       Live manifest view + load/unload actions
  /sessions                     List of sessions (skills/agents/MCP activity)
  /session/<id>                 Skills + agents seen in that session
  /skills                       Sidecar card grid with grade + score filters
  /skill/<slug>                 Sidecar breakdown + timeline of audit events
  /wiki                         Wiki entity index — all pages with search
  /wiki/<slug>?type=<type>      One wiki entity page (frontmatter + body)
  /graph                        Built-in graph explorer + popular seeds
  /graph?slug=<slug>&type=...   Focus graph view on a specific entity
  /manage                       Search/edit/delete/import catalog entities
  /harness                      Manual harness setup for user-owned LLMs
  /docs                         Local docs index + public docs handoff
  /config                       Editable ctx config with defaults fallback
  /status                       Durable queue + graph/wiki artifact state
  /kpi                          Grade / lifecycle / category KPIs
  /runtime                      Generic harness validation/escalation ledger
  /logs                         Filterable tail of ctx-audit.jsonl
  /events                       Live SSE stream of new audit-log lines
  /api/sessions.json            JSON index for scripting
  /api/manifest.json            Raw ~/.claude/skill-manifest.json
  /api/status.json              Queue counts + artifact promotion metadata
  /api/runtime.json             Generic harness validation/escalation summary
  /api/skill/<slug>.json        Sidecar passthrough
  /api/graph/<slug>.json        Dashboard-shaped neighborhood; accepts type
  /api/entities/search.json     Search wiki entities across supported types
  /api/entity/<slug>.json       Wiki entity frontmatter + Markdown body
  /api/config.json              Effective/default/user config
  /api/kpi.json                 DashboardSummary passthrough

Design notes:
  - No Flask / Starlette / FastAPI dependency. Request handling is threaded
    so one open SSE client cannot monopolize the local dashboard.
Repo-doc rendering uses the package's Markdown dependencies for MkDocs-like output. - GET views read append-only files. POST mutation endpoints require loopback access, a per-process token, and same-origin headers. - SSE endpoint tails ``~/.claude/ctx-audit.jsonl`` and pushes each new line as a server-sent event. Clients auto-reconnect. - Security: binds to 127.0.0.1 by default. ``--host`` override requires an explicit flag to emphasize the local-dev-only intent. This is a minimal dashboard. Power users should pipe the audit log into Grafana / Loki / whatever; ``ctx-monitor`` is the zero-config starting point. """ from __future__ import annotations import argparse import hashlib import html import ipaddress import json import math import os import re import secrets import sqlite3 import socket import sys import tarfile import threading import time import zlib from collections import defaultdict, deque from http.cookies import CookieError, SimpleCookie from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer from importlib.resources import files from pathlib import Path from typing import Any from urllib.parse import quote, unquote, urlsplit from ctx import dashboard_docs, dashboard_entities, dashboard_graph from ctx.core import entity_types as core_entity_types from ctx.core.wiki import wiki_queue from ctx.core.wiki.wiki_utils import parse_frontmatter_and_body from ctx.utils._file_lock import file_lock from ctx.utils._fs_utils import atomic_write_text as _atomic_write_text from ctx.utils._fs_utils import safe_atomic_write_text as _safe_atomic_write_text from ctx.utils._safe_name import is_safe_source_name _MONITOR_TOKEN = "" _MONITOR_MUTATIONS_ENABLED = True _GRAPH_CACHE_KEY: tuple[Any, ...] | None = None _GRAPH_CACHE_VALUE: Any | None = None _PACKAGED_GRAPH_EXPORT_ID_CACHE: str | None | bool = None _OVERLAY_INDEX_COVERAGE_CACHE_KEY: tuple[Any, ...] 
| None = None _OVERLAY_INDEX_COVERAGE_CACHE_VALUE: bool | None = None _SIDECAR_INDEX_CACHE_KEY: tuple[tuple[Path, float, int], ...] | None = None _SIDECAR_INDEX_CACHE_VALUE: dict[tuple[str, str], dict] | None = None _SIDECAR_FILTER_CACHE_SIGNATURE: tuple[Any, ...] | None = None _SIDECAR_FILTER_CACHE_VALUE: dict[tuple[Any, ...], list[dict[str, Any]]] = {} _KPI_SUMMARY_CACHE_KEY: tuple[Any, ...] | None = None _KPI_SUMMARY_CACHE_VALUE: Any | None = None _KPI_SUMMARY_CACHE_AT = 0.0 _WIKI_RENDER_CACHE_KEY: tuple[Any, ...] | None = None _WIKI_RENDER_CACHE_VALUE: str | None = None _WIKI_INDEX_LIMIT_PER_TYPE = 500 _SKILLS_PAGE_DEFAULT_LIMIT = 100 _SKILLS_PAGE_MAX_LIMIT = 500 _KPI_SUMMARY_CACHE_SECONDS = 30 _GRAPH_REPORT_RE = re.compile(r"Nodes:\s*([\d,]+)\s*\|\s*Edges:\s*([\d,]+)") _MAX_POST_BODY_BYTES = 64 * 1024 _DASHBOARD_INDEX_MEMBER = "graphify-out/dashboard-neighborhoods.sqlite3" _READ_TOKEN_COOKIE = "ctx_monitor_read_token" # ─── Data sources ──────────────────────────────────────────────────────────── def _host_allows_mutations(host: str) -> bool: normalized = (host or "").strip().strip("[]").rstrip(".").lower() if normalized == "localhost": return True try: return ipaddress.ip_address(normalized).is_loopback except ValueError: return False def _request_host_name(host_header: str) -> str: value = (host_header or "").strip() if not value: return "" if value.startswith("["): end = value.find("]") return value[1:end].rstrip(".").lower() if end != -1 else "" return value.rsplit(":", 1)[0].rstrip(".").lower() def _origin_host_name(origin: str) -> str: try: parsed = urlsplit(origin) except ValueError: return "" if parsed.scheme not in {"http", "https"}: return "" return (parsed.hostname or "").rstrip(".").lower() def _read_token_cookie(cookie_header: str) -> str: if not cookie_header: return "" try: cookie = SimpleCookie() cookie.load(cookie_header) except CookieError: return "" morsel = cookie.get(_READ_TOKEN_COOKIE) return morsel.value if morsel is not None else "" def 
def _claude_dir() -> Path:
    """Return ``~/.claude`` with the home directory expanded."""
    return Path(os.path.expanduser("~/.claude"))


def _audit_log_path() -> Path:
    """Return the path of ``ctx-audit.jsonl`` inside the Claude directory."""
    # Avoid importing ctx_audit_log here so the monitor can run even if
    # ctx_audit_log is absent for some reason.
    return _claude_dir().joinpath("ctx-audit.jsonl")


def _events_jsonl_path() -> Path:
    """Return the path of ``skill-events.jsonl`` inside the Claude directory."""
    return _claude_dir().joinpath("skill-events.jsonl")


def _runtime_lifecycle_path() -> Path:
    """Return the events file of a default-constructed ``RuntimeLifecycleStore``."""
    # Imported lazily, so the adapter module is only loaded when this is called.
    from ctx.adapters.generic.runtime_lifecycle import RuntimeLifecycleStore

    store = RuntimeLifecycleStore()
    return store.events_path


def _manifest_path() -> Path:
    """Return the path of ``skill-manifest.json`` inside the Claude directory."""
    return _claude_dir().joinpath("skill-manifest.json")


def _sidecar_dir() -> Path:
    """Return the ``skill-quality`` directory inside the Claude directory."""
    return _claude_dir().joinpath("skill-quality")


def _wiki_dir() -> Path:
    """Return the ``skill-wiki`` directory inside the Claude directory."""
    return _claude_dir().joinpath("skill-wiki")


def _user_config_path() -> Path:
    """Return the path of ``skill-system-config.json`` inside the Claude directory."""
    return _claude_dir().joinpath("skill-system-config.json")


def _load_dashboard_graph() -> Any:
    """Load the wiki graph, reusing the cached object while its inputs are unchanged.

    The cache signature is built from the resolved ``graph.json`` path, its
    mtime and size, the identity of the loader function, and the mtime/size of
    the sibling ``entity-overlays.jsonl`` (``None`` when that file is absent).
    When ``graph.json`` does not exist the cache is cleared and the loader is
    called directly on the missing path.
    """
    global _GRAPH_CACHE_KEY, _GRAPH_CACHE_VALUE
    graph_file = _wiki_dir() / "graphify-out" / "graph.json"
    overlay_file = graph_file.with_name("entity-overlays.jsonl")
    from ctx.core.graph.resolve_graph import load_graph as _lg  # type: ignore

    if not graph_file.exists():
        _GRAPH_CACHE_KEY = None
        _GRAPH_CACHE_VALUE = None
        return _lg(graph_file)

    graph_stat = graph_file.stat()
    if overlay_file.exists():
        overlay_stat = overlay_file.stat()
        overlay_signature = (overlay_stat.st_mtime, overlay_stat.st_size)
    else:
        overlay_signature = None
    signature = (
        graph_file.resolve(),
        graph_stat.st_mtime,
        graph_stat.st_size,
        id(_lg),
        overlay_signature,
    )
    cached = _GRAPH_CACHE_VALUE
    if cached is not None and _GRAPH_CACHE_KEY == signature:
        return cached

    try:
        loaded = _lg(graph_file, apply_runtime_filter=False)
    except TypeError:
        # Retry without the keyword for loaders that do not accept it.
        loaded = _lg(graph_file)
    _GRAPH_CACHE_KEY = signature
    _GRAPH_CACHE_VALUE = loaded
    return loaded


def _mcp_shard(slug: str) -> str:
    """Delegate to ``core_entity_types.mcp_shard`` for *slug*."""
    return core_entity_types.mcp_shard(slug)


_DASHBOARD_ENTITY_SOURCES: tuple[tuple[str, str, bool], ...] = (
    core_entity_types.entity_source_specs()
)
_DASHBOARD_ENTITY_TYPES: tuple[str, ...] = tuple(
    spec[1] for spec in _DASHBOARD_ENTITY_SOURCES
)
_DEFAULT_GRAPH_FOCUS_SLUG = "github"


def _normalize_dashboard_entity_type(raw: object) -> str | None:
    """Normalize *raw* to one of the dashboard entity types, or ``None``."""
    return core_entity_types.normalize_entity_type(
        raw,
        allowed=_DASHBOARD_ENTITY_TYPES,
    )


def _audit_entity_type(row: dict) -> str | None:
    """Infer the entity type of an audit-log row.

    Checks, in order, ``meta.entity_type``, ``entity_type``, ``subject_type``
    and ``type``; the first value that normalizes wins. Otherwise the text
    before the first ``.`` of the row's ``event`` is normalized and returned.
    """
    meta_value = row.get("meta")
    meta: dict[str, Any] = meta_value if isinstance(meta_value, dict) else {}
    candidates = (
        meta.get("entity_type"),
        row.get("entity_type"),
        row.get("subject_type"),
        row.get("type"),
    )
    for candidate in candidates:
        resolved = _normalize_dashboard_entity_type(candidate)
        if resolved:
            return resolved
    event_name = str(row.get("event") or "")
    return _normalize_dashboard_entity_type(event_name.partition(".")[0])


def _wiki_entity_path(slug: str, entity_type: str | None = None) -> Path | None:
    """Resolve a slug to its existing wiki entity page.

    Wiki layout: ``entities/skills/<slug>.md``, ``entities/agents/<slug>.md``,
    ``entities/harnesses/<slug>.md``, or sharded
    ``entities/mcp-servers/<shard>/<slug>.md``. Returns the first existing
    match in source order; passing ``entity_type`` restricts the search to
    that type so duplicate slugs can be disambiguated. Returns ``None`` for
    unsafe slugs or when no page exists.
    """
    # Validate slug so a crafted request can't escape the wiki tree.
    if not _is_safe_slug(slug):
        return None
    for _sub, source_type, _recursive in _DASHBOARD_ENTITY_SOURCES:
        if entity_type is not None and source_type != entity_type:
            continue
        candidate = core_entity_types.entity_page_path(_wiki_dir(), source_type, slug)
        if candidate is not None and candidate.exists():
            return candidate
    return None


def _wiki_entity_target_path(slug: str, entity_type: str) -> Path:
    """Return the canonical wiki entity path for a new/updated entity.

    Raises:
        ValueError: if *slug* is unsafe, or *entity_type* is unsupported or
            has no page path.
    """
    if not _is_safe_slug(slug):
        raise ValueError(f"invalid slug: {slug!r}")
    resolved_type = _normalize_dashboard_entity_type(entity_type)
    if resolved_type is None:
        raise ValueError(f"unsupported entity_type: {entity_type!r}")
    target = core_entity_types.entity_page_path(_wiki_dir(), resolved_type, slug)
    if target is None:
        raise ValueError(f"unsupported entity_type: {entity_type!r}")
    return target


def _iter_wiki_entity_paths(
    entity_type: str | None = None,
) -> list[tuple[str, str, Path]]:
    """List ``(slug, entity_type, path)`` for every wiki entity page on disk.

    Pages whose file stem is not a safe slug are skipped. The result is sorted
    by entity type, then case-insensitive slug, then POSIX path.

    Raises:
        ValueError: if *entity_type* is given (not ``None``) but unsupported.
    """
    wanted = _normalize_dashboard_entity_type(entity_type) if entity_type else None
    if entity_type is not None and wanted is None:
        raise ValueError(f"unsupported entity_type: {entity_type!r}")
    entities_root = _wiki_dir() / "entities"
    if not entities_root.is_dir():
        return []
    found: list[tuple[str, str, Path]] = []
    for sub, source_type, recursive in _DASHBOARD_ENTITY_SOURCES:
        if wanted is not None and wanted != source_type:
            continue
        type_root = entities_root / sub
        if not type_root.is_dir():
            continue
        pages = type_root.rglob("*.md") if recursive else type_root.glob("*.md")
        found.extend(
            (page.stem, source_type, page)
            for page in pages
            if _is_safe_slug(page.stem)
        )
    found.sort(key=lambda row: (row[1], row[0].lower(), row[2].as_posix()))
    return found


def _wiki_entity_detail(slug: str, entity_type: str | None = None) -> dict[str, Any] | None:
    """Read one wiki entity page and return its slug, type, path, frontmatter and body.

    The reported type is the requested one when given, else the frontmatter
    ``type`` when it normalizes, else ``"skill"``. Returns ``None`` when no
    page is found.

    Raises:
        ValueError: if *entity_type* is given (not ``None``) but unsupported.
    """
    wanted = _normalize_dashboard_entity_type(entity_type) if entity_type else None
    if entity_type is not None and wanted is None:
        raise ValueError(f"unsupported entity_type: {entity_type!r}")
    page = _wiki_entity_path(slug, entity_type=wanted)
    if page is None:
        return None
    raw_text = page.read_text(encoding="utf-8", errors="replace")
    frontmatter, body = _parse_frontmatter(raw_text)
    resolved_type = (
        wanted
        or _normalize_dashboard_entity_type(frontmatter.get("type"))
        or "skill"
    )
    return {
        "slug": slug,
        "type": resolved_type,
        "path": str(page),
        "frontmatter": frontmatter,
        "body": body,
    }


def _search_wiki_entities(
    query: str = "",
    entity_type: str | None = None,
    *,
    limit: int = 80,
) -> list[dict[str, Any]]:
    """Delegate to ``dashboard_entities.search_wiki_entities`` with this module's deps."""
    deps = _entity_crud_deps()
    return dashboard_entities.search_wiki_entities(
        query,
        entity_type,
        limit=limit,
        deps=deps,
    )


def _queue_entity_refresh(
    *,
    entity_type: str,
    slug: str,
    entity_path: Path,
    content: str,
    action: str,
) -> None:
    """Enqueue an entity upsert job and, unless deleting, a graph-export job."""
    wiki_root = _wiki_dir()
    wiki_queue.enqueue_entity_upsert(
        wiki_root,
        entity_type=entity_type,
        slug=slug,
        entity_path=entity_path,
        content=content,
        action=action,
        source="ctx-monitor",
    )
    if action != "delete":
        # Deletes stop after the upsert job; no graph export is queued for them.
        wiki_queue.enqueue_maintenance_job(
            wiki_root,
            kind=wiki_queue.GRAPH_EXPORT_JOB,
            payload={"reason": f"entity-{action}", "entity_type": entity_type, "slug": slug},
            source="ctx-monitor",
        )


def _write_entity_text(path: Path, content: str) -> None:
    """Write *content* to *path* as UTF-8 via the safe atomic-write helper."""
    _safe_atomic_write_text(path, content, encoding="utf-8")


def _entity_crud_deps() -> dashboard_entities.EntityCrudDeps:
    """Bundle this module's helpers into the ``EntityCrudDeps`` container."""
    return dashboard_entities.EntityCrudDeps(
        is_safe_slug=_is_safe_slug,
        normalize_entity_type=_normalize_dashboard_entity_type,
        wiki_entity_detail=_wiki_entity_detail,
        wiki_entity_target_path=_wiki_entity_target_path,
        wiki_entity_path=_wiki_entity_path,
        iter_wiki_entity_paths=_iter_wiki_entity_paths,
        read_manifest=_read_manifest,
        perform_unload=_perform_unload,
        # Adapts the positional callback to the keyword-only helper.
        queue_entity_refresh=lambda entity_type, slug, entity_path, content, action: (
            _queue_entity_refresh(
                entity_type=entity_type,
                slug=slug,
                entity_path=entity_path,
                content=content,
                action=action,
            )
        ),
        file_lock=file_lock,
        write_entity_text=_write_entity_text,
        parse_frontmatter=_parse_frontmatter,
        frontmatter_tags=lambda value: _frontmatter_tags(value, limit=None),
        frontmatter_text=_frontmatter_text,
        display_slug=_display_slug,
        display_label=lambda value: _display_label(value),
        entity_wiki_href=_entity_wiki_href,
    )
frontmatter_text=_frontmatter_text, display_slug=_display_slug, display_label=lambda value: _display_label(value), entity_wiki_href=_entity_wiki_href, ) def _entity_runtime_deps() -> dashboard_entities.EntityRuntimeDeps: return dashboard_entities.EntityRuntimeDeps( is_safe_slug=_is_safe_slug, normalize_entity_type=_normalize_dashboard_entity_type, wiki_dir=_wiki_dir, claude_dir=_claude_dir, log_dashboard_entity_event=_log_dashboard_entity_event, remove_loaded_manifest_entry=_remove_loaded_manifest_entry, ) def _upsert_wiki_entity(payload: dict[str, Any]) -> tuple[bool, str]: return dashboard_entities.upsert_wiki_entity(payload, deps=_entity_crud_deps()) def _delete_wiki_entity(slug: str, entity_type: str) -> tuple[bool, str]: return dashboard_entities.delete_wiki_entity( slug, entity_type, deps=_entity_crud_deps(), ) def _parse_frontmatter(text: str) -> tuple[dict[str, Any], str]: """Split frontmatter from body using the canonical wiki parser.""" return parse_frontmatter_and_body(text) def _frontmatter_text(value: Any) -> str: if isinstance(value, list): return ", ".join(str(v) for v in value) if isinstance(value, dict): return json.dumps(value, ensure_ascii=False, default=str) if value is None: return "" return str(value) def _truncate_text(value: str, limit: int) -> tuple[str, bool]: if limit <= 0 or len(value) <= limit: return value, False if limit <= 3: return value[:limit], True return value[: limit - 3].rstrip() + "...", True def _json_for_script(value: Any) -> str: return json.dumps(value, ensure_ascii=False, default=str).replace(" list[str]: if isinstance(value, list): raw_items = value else: raw = _frontmatter_text(value) raw_items = raw.replace("[", "").replace("]", "").split(",") out: list[str] = [] for item in raw_items: tok = str(item).strip().strip("'\"") if tok: out.append(tok) if limit is not None and len(out) >= limit: break return out _WIKI_INLINE_RE = re.compile( r"(`[^`\n]+`|\[\[[^\]\n]+\]\]|(?\s*(.*?)\s*", re.IGNORECASE | re.DOTALL, ) def 
_wiki_link_href(target: str) -> tuple[str, str]: """Return dashboard href + display label for an Obsidian-style wikilink.""" normalized = target.strip().replace("\\", "/").removesuffix(".md") parts = [part for part in normalized.split("/") if part] entity_type = "" if len(parts) >= 3 and parts[0] == "entities": entity_type = { "skills": "skill", "agents": "agent", "mcp-servers": "mcp-server", "harnesses": "harness", }.get(parts[1], "") slug = parts[-1] if parts else normalized if not _is_safe_slug(slug): return "#", slug or target suffix = f"?type={quote(entity_type)}" if entity_type else "" return f"/wiki/{quote(slug)}{suffix}", _display_slug(slug) def _markdown_link_href(target: str) -> str | None: """Return a safe href for normal Markdown links, or None to suppress it.""" cleaned = target.strip() if not cleaned: return None if cleaned.startswith("//"): return None if cleaned.startswith(("/", "#")): return cleaned if re.match(r"^https?://", cleaned, re.IGNORECASE): return cleaned if re.match(r"^mailto:[^@\s]+@[^@\s]+$", cleaned, re.IGNORECASE): return cleaned return None def _render_wiki_inline(text: str) -> str: """Render a small safe inline Markdown subset used by wiki pages.""" out: list[str] = [] last = 0 for match in _WIKI_INLINE_RE.finditer(text): out.append(html.escape(text[last:match.start()])) token = match.group(0) if token.startswith("`"): out.append(f"{html.escape(token[1:-1])}") elif token.startswith("[["): inner = token[2:-2] target, _, label = inner.partition("|") href, fallback_label = _wiki_link_href(target) link_text = label.strip() or fallback_label out.append( f"{html.escape(link_text)}", ) else: link_match = re.fullmatch( r"\[([^\]\n]+)\]\(([^\s()\n]+)(?:\s+\"[^\"]*\")?\)", token, ) if not link_match: out.append(html.escape(token)) else: label, target = link_match.groups() safe_href = _markdown_link_href(target) if safe_href is None: out.append(html.escape(label)) else: out.append( f"{html.escape(label)}", ) last = match.end() 
out.append(html.escape(text[last:])) return "".join(out) def _render_wiki_markdown(markdown_text: str) -> str: """Render a conservative Markdown subset without adding dependencies.""" lines = markdown_text.splitlines() out: list[str] = [] paragraph: list[str] = [] list_items: list[str] = [] code_lines: list[str] = [] in_code = False def flush_paragraph() -> None: if paragraph: out.append(f"

{_render_wiki_inline(' '.join(paragraph))}

") paragraph.clear() def flush_list() -> None: if list_items: out.append("
    " + "".join(f"
  • {item}
  • " for item in list_items) + "
") list_items.clear() def flush_code() -> None: if code_lines: out.append("
" + html.escape("\n".join(code_lines)) + "
") code_lines.clear() for line in lines: stripped = line.strip() if stripped.startswith("```"): if in_code: flush_code() in_code = False else: flush_paragraph() flush_list() in_code = True continue if in_code: code_lines.append(line) continue if not stripped: flush_paragraph() flush_list() continue heading = re.match(r"^(#{1,4})\s+(.+)$", stripped) if heading: flush_paragraph() flush_list() level = min(len(heading.group(1)), 4) out.append(f"{_render_wiki_inline(heading.group(2))}") continue bullet = re.match(r"^\s*[-*]\s+(.+)$", line) if bullet: flush_paragraph() list_items.append(_render_wiki_inline(bullet.group(1).strip())) continue flush_list() paragraph.append(stripped) flush_code() flush_paragraph() flush_list() return "".join(out) if out else "

No body.

" def _extract_embedded_quality_block(markdown_text: str) -> tuple[str, str | None]: matches = list(_WIKI_QUALITY_BLOCK_RE.finditer(markdown_text)) if not matches: return markdown_text, None quality_blocks = [ match.group(1).strip() for match in matches if match.group(1).strip() ] body = _WIKI_QUALITY_BLOCK_RE.sub("\n\n", markdown_text) body = re.sub(r"\n{3,}", "\n\n", body).strip() quality_markdown = "\n\n".join(quality_blocks).strip() or None return body, quality_markdown def _slugish(value: str) -> str: return re.sub(r"[^a-z0-9]+", "-", value.lower()).strip("-") def _display_slug(slug: str) -> str: """Return the user-facing slug while preserving raw IDs for links/actions.""" text = str(slug or "") return text.removeprefix("skills-sh-") def _display_label(value: Any, *, fallback_slug: str = "") -> str: text = str(value or fallback_slug or "") return _display_slug(text) def _strip_duplicate_wiki_heading(markdown_text: str, slug: str) -> str: """Drop the first H1 if it only repeats the page slug.""" lines = markdown_text.splitlines() for idx, line in enumerate(lines): if not line.strip(): continue match = re.match(r"^#\s+(.+?)\s*$", line.strip()) if match and _slugish(match.group(1)) == _slugish(slug): del lines[idx] while idx < len(lines) and not lines[idx].strip(): del lines[idx] break return "\n".join(lines) def _entity_wiki_href(slug: str, entity_type: str | None = None) -> str: suffix = f"?type={quote(entity_type)}" if entity_type in _DASHBOARD_ENTITY_TYPES else "" return f"/wiki/{quote(slug)}{suffix}" def _graph_type_from_node_id(node_id: str, fallback: str = "skill") -> str: prefix = node_id.split(":", 1)[0] if ":" in node_id else "" return { "skill": "skill", "agent": "agent", "mcp-server": "mcp-server", "harness": "harness", }.get(prefix, fallback) def _subgraph_sidecar(slug: str, entity_type: str) -> dict[str, Any] | None: sidecar = _load_sidecar(slug, entity_type=entity_type) return sidecar if isinstance(sidecar, dict) else None def 
_subgraph_quality_cell(sidecar: dict[str, Any] | None) -> str: if sidecar is None: return "no sidecar" grade = html.escape(str(sidecar.get("grade", "F"))) score = float(sidecar.get("raw_score", sidecar.get("score", 0.0)) or 0.0) floor = str(sidecar.get("hard_floor") or "").strip() floor_html = ( f" floor {html.escape(floor)}" if floor else "" ) return ( f"{grade} " f"{score:.3f}{floor_html}" ) def _subgraph_node_title( label: str, entity_type: str, sidecar: dict[str, Any] | None, ) -> str: if sidecar is None: return f"{label} ({entity_type}) · no sidecar" grade = str(sidecar.get("grade", "F")) score = float(sidecar.get("raw_score", sidecar.get("score", 0.0)) or 0.0) floor = str(sidecar.get("hard_floor") or "").strip() floor_text = f" · floor {floor}" if floor else "" return f"{label} ({entity_type}) · grade {grade} · score {score:.3f}{floor_text}" def _subgraph_node_fill(entity_type: str) -> str: return { "agent": "#f59e0b", "mcp-server": "#ef4444", "harness": "#22c55e", "skill": "#6366f1", }.get(entity_type, "#64748b") def _subgraph_grade_stroke(sidecar: dict[str, Any] | None) -> str: grade = str((sidecar or {}).get("grade") or "") return { "A": "#059669", "B": "#2563eb", "C": "#d97706", "D": "#ea580c", "F": "#dc2626", }.get(grade, "#ffffff") def _render_entity_subgraph_svg( node_by_id: dict[str, dict[str, Any]], edges: list[dict], center: str, sidecar_by_id: dict[str, dict[str, Any] | None], ) -> str: """Render an embedded, interactive 3D graph for wiki entity pages.""" width = 980 height = 380 node_payload: list[dict[str, Any]] = [] for node_id, node in sorted( node_by_id.items(), key=lambda item: ( 0 if item[0] == center else 1, str(item[1].get("label") or item[0]), ), ): node = node_by_id[node_id] node_type = _graph_type_from_node_id( node_id, str(node.get("type") or "skill"), ) node_slug = _graph_slug_from_node_id(node_id) label = _display_label(node.get("label"), fallback_slug=node_slug) sidecar = sidecar_by_id.get(node_id) node_payload.append({ "id": 
node_id, "slug": node_slug, "label": label, "type": node_type, "href": _entity_wiki_href(node_slug, node_type), "title": _subgraph_node_title(label, node_type, sidecar), "fill": _subgraph_node_fill(node_type), "stroke": _subgraph_grade_stroke(sidecar), "is_center": node_id == center, }) edge_payload: list[dict[str, Any]] = [] for edge in edges: data = edge.get("data", {}) source = str(data.get("source", "")) target = str(data.get("target", "")) if source not in node_by_id or target not in node_by_id: continue shared = ", ".join(str(tag) for tag in data.get("shared_tags", [])[:6]) or "none" weight = float(data.get("weight", 0.0) or 0.0) edge_payload.append({ "source": source, "target": target, "weight": weight, "title": ( f"{_display_slug(_graph_slug_from_node_id(source))} ↔ " f"{_display_slug(_graph_slug_from_node_id(target))} · weight {weight:.3f} " f"· shared {shared}" ), }) nodes_json = _json_for_script(node_payload) edges_json = _json_for_script(edge_payload) return ( "
" "
" "" "" "drag to rotate · wheel to zoom · hover nodes or edges" "
" f"" "
" "
" "Hover a node for sidecar grade/score/floor.
" "
" "Hover an edge for weight and shared signals.
" "" "
" ) def _render_entity_subgraph(slug: str, entity_type: str | None = None) -> str: """Render a compact 1-hop subgraph table for wiki entity pages.""" graph = _graph_neighborhood(slug, hops=1, limit=32, entity_type=entity_type) center = graph.get("center") nodes = graph.get("nodes") or [] edges = graph.get("edges") or [] if not center: return ( "
" "

No graph node was found for this entity.

" "
" ) node_by_id = { str(node.get("data", {}).get("id", "")): node.get("data", {}) for node in nodes } sidecar_by_id = { node_id: _subgraph_sidecar( _graph_slug_from_node_id(node_id), _graph_type_from_node_id(node_id, str(node.get("type") or "skill")), ) for node_id, node in node_by_id.items() } rows: list[str] = [] for edge in edges: data = edge.get("data", {}) source = str(data.get("source", "")) target = str(data.get("target", "")) other_id = target if source == center else source if other_id == center or other_id not in node_by_id: continue other = node_by_id[other_id] other_type = _graph_type_from_node_id(other_id, str(other.get("type", "skill"))) other_slug = other_id.split(":", 1)[-1] shared = ", ".join(str(tag) for tag in data.get("shared_tags", [])[:6]) shared_html = html.escape(shared) if shared else "none" quality_html = _subgraph_quality_cell(sidecar_by_id.get(other_id)) rows.append( "" f"" f"{html.escape(str(other.get('label') or other_slug))}" f"" f"{html.escape(other_type)}" f"{quality_html}" f"{float(data.get('weight', 0.0)):.3f}" f"{shared_html}" "" ) table = ( "" "" + ("".join(rows) if rows else "") + "
EntityTypeQuality sidecarWeightShared signals
No neighbors under the current limit.
" ) return ( "
" "

Subgraph

" f"

{len(nodes)} nodes and {len(edges)} edges in the 1-hop neighborhood.

" + _render_entity_subgraph_svg(node_by_id, edges, center, sidecar_by_id) + table + "
" ) def _entity_tab_script() -> str: return """ """ def _render_entity_tabs( *, overview_html: str, subgraph_html: str, quality_html: str, ) -> str: return ( "
" "" "" "" "
" f"
{overview_html}
" f"" f"" + _entity_tab_script() ) def _render_quality_drilldown( sidecar: dict | None, embedded_quality_markdown: str | None = None, ) -> str: """Explain quality score signals for a wiki entity.""" if sidecar is None: if embedded_quality_markdown: quality_markdown = embedded_quality_markdown.strip() if not re.search(r"^#{1,6}\s+Quality\b", quality_markdown, re.IGNORECASE | re.MULTILINE): quality_markdown = "## Quality\n\n" + quality_markdown return ( "
" + _render_wiki_markdown(quality_markdown) + "
" ) return ( "
" "

Quality

" "

No quality sidecar exists for this entity yet.

" "
" ) grade = str(sidecar.get("grade", "F")) score = float(sidecar.get("raw_score", sidecar.get("score", 0.0)) or 0.0) weights_raw = sidecar.get("weights") signals_raw = sidecar.get("signals") weights: dict[str, Any] = weights_raw if isinstance(weights_raw, dict) else {} signals: dict[str, Any] = signals_raw if isinstance(signals_raw, dict) else {} signal_rows: list[str] = [] for name, signal in sorted(signals.items()): signal_data = signal if isinstance(signal, dict) else {} signal_score = float(signal_data.get("score", 0.0) or 0.0) weight = float(weights.get(name, 0.0) or 0.0) contribution = signal_score * weight evidence = signal_data.get("evidence", {}) evidence_text = json.dumps(evidence, ensure_ascii=False, sort_keys=True, default=str) evidence_preview, evidence_truncated = _truncate_text(evidence_text, 420) truncated_marker = " (truncated)" if evidence_truncated else "" signal_rows.append( "" f"{html.escape(str(name))}" f"{signal_score:.3f}" f"{weight:.3f}" f"{contribution:.3f}" f"{html.escape(evidence_preview)}{truncated_marker}" "" ) if not signal_rows: signal_rows.append("No signal breakdown was recorded.") hard_floor = sidecar.get("hard_floor") floor_html = f" floor {html.escape(str(hard_floor))}" if hard_floor else "" return ( "
" "

Quality

" f"

{html.escape(grade)} " f"score {score:.3f}" f"{floor_html}

" "

Score is the weighted sum of recorded quality signals. " "A hard floor can cap the final grade even when individual signals pass.

" "" "" + "".join(signal_rows) + "
SignalSignal scoreWeightContributionEvidence
" "
Raw sidecar JSON" f"
{html.escape(json.dumps(sidecar, indent=2, ensure_ascii=False, default=str)[:6000])}
" "
" "
def _save_manifest(manifest: dict) -> None:
    """Write *manifest* to the manifest path as 2-space-indented JSON plus a newline."""
    _atomic_write_text(_manifest_path(), json.dumps(manifest, indent=2) + "\n")


def _read_skill_manifest_only() -> dict:
    """Read the mutable skill manifest without synthetic harness rows.

    A missing, unreadable, undecodable or non-object manifest yields a fresh
    empty manifest. ``load``, ``unload`` and ``warnings`` are always lists in
    the returned dict.
    """
    path = _manifest_path()
    try:
        manifest = json.loads(path.read_text(encoding="utf-8"))
    except (OSError, ValueError):
        # ValueError covers both JSONDecodeError and UnicodeDecodeError, so a
        # manifest with invalid bytes degrades like invalid JSON instead of
        # raising out of a request handler.
        return {"load": [], "unload": [], "warnings": []}
    if not isinstance(manifest, dict):
        return {"load": [], "unload": [], "warnings": []}
    for key in ("load", "unload", "warnings"):
        if not isinstance(manifest.get(key), list):
            manifest[key] = []
    return manifest


def _remove_loaded_manifest_entry(slug: str, entity_type: str) -> list[dict]:
    """Remove loaded rows for one entity tuple and return removed rows.

    Runs under the manifest file lock. When at least one row is removed, a
    matching ``unload`` row is ensured: a new one is appended (carrying over
    ``command``, ``json_config``, ``priority`` and ``reason`` from the first
    removed row), or an existing one gets those fields filled in where it
    lacks them. The manifest is saved only when something was removed.
    """
    path = _manifest_path()
    with file_lock(path):
        manifest = _read_skill_manifest_only()

        def _matches(row: Any) -> bool:
            # Non-dict rows can appear in a hand-edited manifest; they never
            # match and are carried over untouched instead of raising.
            return (
                isinstance(row, dict)
                and row.get("skill") == slug
                and str(row.get("entity_type") or "skill") == entity_type
            )

        removed: list[dict] = []
        remaining: list[Any] = []
        for entry in manifest.get("load", []):
            (removed if _matches(entry) else remaining).append(entry)
        if not removed:
            return []
        manifest["load"] = remaining

        preserved: dict[str, object] = {}
        for field in ("command", "json_config", "priority", "reason"):
            value = removed[0].get(field)
            if value not in (None, ""):
                preserved[field] = value

        unload_rows = manifest.setdefault("unload", [])
        existing = next((row for row in unload_rows if _matches(row)), None)
        if existing is None:
            new_row = {
                "skill": slug,
                "entity_type": entity_type,
                "source": removed[0].get("source") or "ctx-monitor",
            }
            new_row.update(preserved)
            unload_rows.append(new_row)
        else:
            # Only fill gaps; values already recorded on the unload row win.
            for field, value in preserved.items():
                existing.setdefault(field, value)

        _save_manifest(manifest)
        return removed


def _log_dashboard_entity_event(
    entity_type: str,
    action: str,
    slug: str,
) -> None:
    """Append a dashboard-visible audit row for a load/unload action.

    Skills and agents are logged as ``<entity_type>.<action>``; MCP servers as
    ``toolbox.triggered`` with the action in ``meta``. Any other entity type
    is not logged. Logging is best-effort: every failure, including a missing
    ``ctx_audit_log`` module, is swallowed so the action itself never fails.
    """
    try:
        from ctx_audit_log import log

        if entity_type in ("skill", "agent"):
            log(
                f"{entity_type}.{action}",
                subject_type=entity_type,
                subject=slug,
                actor="user",
                meta={"via": "ctx-monitor"},
                path=_audit_log_path(),
            )
        elif entity_type == "mcp-server":
            log(
                "toolbox.triggered",
                subject_type="toolbox",
                subject=slug,
                actor="user",
                meta={
                    "via": "ctx-monitor",
                    "entity_type": "mcp-server",
                    "action": action,
                },
                path=_audit_log_path(),
            )
        # NOTE(review): harness actions produce no audit row here; confirm
        # that is intentional.
    except Exception:  # noqa: BLE001
        pass


def _read_manifest() -> dict:
    """Return current loaded entities from the skill manifest plus harness installs.

    Starts from the normalized skill manifest and appends one ``load`` row per
    installed harness that is not already present as a
    ``("harness", <slug>)`` entry.
    """
    # Reuse the shared parser so both readers normalize the file the same way.
    manifest = _read_skill_manifest_only()
    load_rows = manifest["load"]
    existing = {
        (str(row.get("entity_type") or "skill"), str(row.get("skill") or ""))
        for row in load_rows
        if isinstance(row, dict)
    }
    for row in _read_harness_install_rows():
        key = ("harness", str(row.get("skill") or ""))
        if key not in existing:
            load_rows.append(row)
            existing.add(key)
    return manifest
rows.""" root = _claude_dir() / "harness-installs" if not root.is_dir(): return [] rows: list[dict] = [] for path in sorted(root.glob("*.json")): try: data = json.loads(path.read_text(encoding="utf-8")) except (OSError, json.JSONDecodeError): continue if not isinstance(data, dict) or data.get("status") != "installed": continue slug = str(data.get("slug") or path.stem).strip() if not slug or not _is_safe_slug(slug): continue rows.append({ "skill": slug, "entity_type": "harness", "source": "ctx-harness-install", "command": data.get("target") or data.get("repo_url") or "", "installed_at": data.get("installed_at", ""), "status": data.get("status", "installed"), }) return rows def _queue_job_summary(job: wiki_queue.QueueJob) -> dict[str, Any]: return { "id": job.id, "kind": job.kind, "status": job.status, "attempts": job.attempts, "max_attempts": job.max_attempts, "worker_id": job.worker_id, "leased_until": job.leased_until, "available_at": job.available_at, "last_error": job.last_error, "created_at": job.created_at, "updated_at": job.updated_at, "source": job.payload.get("source"), "payload_keys": sorted(str(key) for key in job.payload), } def _queue_status() -> dict[str, Any]: """Return durable wiki/graph queue state without creating the DB.""" db_path = wiki_queue.queue_db_path(_wiki_dir()) counts = { wiki_queue.STATUS_PENDING: 0, wiki_queue.STATUS_RUNNING: 0, wiki_queue.STATUS_SUCCEEDED: 0, wiki_queue.STATUS_FAILED: 0, wiki_queue.STATUS_CANCELLED: 0, } if not db_path.exists(): return { "available": False, "db_path": str(db_path), "total": 0, "counts": counts, "recent_jobs": [], } try: raw_counts = wiki_queue.count_jobs_by_status(db_path) recent = wiki_queue.list_recent_jobs(db_path, limit=20) except Exception as exc: # noqa: BLE001 return { "available": False, "db_path": str(db_path), "total": 0, "counts": counts, "recent_jobs": [], "error": str(exc), } for status, count in raw_counts.items(): counts[status] = count return { "available": True, "db_path": 
def _file_status(path: Path) -> dict[str, Any]:
    """Return existence, size and mtime metadata for ``path``.

    The path is stat'ed exactly once. A separate ``exists()`` probe followed
    by ``stat()`` can race with a concurrent delete, and ``Path.exists()``
    may itself raise (for example on a permission error) outside any handler.

    Returns:
        A dict with ``path``, ``exists``, ``size`` and ``mtime``. ``exists``
        is true only for regular files; a directory still reports its own
        size and mtime. A missing path yields ``exists=False``, ``size=0``
        and ``mtime=None``. Any other ``OSError`` yields the same shape plus
        an ``error`` message.
    """
    from stat import S_ISREG

    missing: dict[str, Any] = {
        "path": str(path),
        "exists": False,
        "size": 0,
        "mtime": None,
    }
    try:
        info = path.stat()
    except (FileNotFoundError, NotADirectoryError, ValueError):
        # ValueError covers paths the OS cannot represent (embedded NUL).
        return missing
    except OSError as exc:
        return {**missing, "error": str(exc)}
    return {
        "path": str(path),
        "exists": S_ISREG(info.st_mode),
        "size": info.st_size,
        "mtime": info.st_mtime,
    }
def _read_jsonl(path: Path, limit: int | None = None) -> list[dict]:
    """Read JSON-object rows from a JSON Lines file, tolerating bad input.

    Args:
        path: File to read. A missing or unreadable path (including a
            directory) yields an empty list instead of raising.
        limit: When given, keep only the last ``limit`` object rows; a value
            of zero or less returns an empty list without opening the file.

    Returns:
        Parsed rows in file order. Blank lines, lines that are not valid
        JSON, and JSON values that are not objects are skipped.
    """
    if limit is not None and limit <= 0:
        return []
    rows: deque[dict] | list[dict]
    rows = deque(maxlen=limit) if limit is not None else []
    try:
        # errors="replace": a log that is being appended to can end in a
        # half-written multi-byte character; strict decoding would abort the
        # whole read instead of just dropping that one unparsable line.
        with path.open(encoding="utf-8", errors="replace") as handle:
            for raw_line in handle:
                text = raw_line.strip()
                if not text:
                    continue
                try:
                    event = json.loads(text)
                except json.JSONDecodeError:
                    continue
                if isinstance(event, dict):
                    rows.append(event)
    except OSError:
        # Missing file, directory, permission problem: keep whatever was
        # parsed before the failure (nothing, when the open itself failed).
        pass
    return list(rows)
def _sidecar_entity_type(sidecar: dict, fallback: str = "skill") -> str:
    """Return the canonical entity type declared by a sidecar.

    The first truthy value among ``entity_type``, ``subject_type`` and
    ``type`` is used, falling back to ``fallback``. Known plural and short
    spellings are folded onto their canonical name; anything else is
    returned as its string form unchanged.
    """
    declared: Any = fallback
    for field in ("entity_type", "subject_type", "type"):
        candidate = sidecar.get(field)
        if candidate:
            declared = candidate
            break
    raw = str(declared)

    spellings_by_type = (
        ("skill", ("skills", "skill")),
        ("agent", ("agents", "agent")),
        ("mcp-server", ("mcp", "mcp-server", "mcp-servers")),
        ("harness", ("harness", "harnesses")),
    )
    for canonical, spellings in spellings_by_type:
        if raw in spellings:
            return canonical
    return raw
def _sidecar_index_cache_key() -> tuple[tuple[Path, float, int], ...]:
    """Return a fingerprint of the sidecar files for index-cache validation.

    Each listed sidecar contributes ``(resolved path, mtime, size)``. When no
    sidecar can be fingerprinted, the sidecar directories themselves are
    fingerprinted instead, so the key can still change once files appear.

    Entries that cannot be stat'ed are skipped rather than raised: a sidecar
    can be deleted between the directory listing and the ``stat`` call.
    ``_sidecar_filter_signature`` guards its stat calls the same way.
    """
    keys: list[tuple[Path, float, int]] = []
    for path in _sidecar_files():
        try:
            stat = path.stat()
            keys.append((path.resolve(), stat.st_mtime, stat.st_size))
        except OSError:
            continue
    if keys:
        return tuple(keys)
    for root in (_sidecar_dir(), _sidecar_dir() / "mcp"):
        try:
            if not root.is_dir():
                continue
            stat = root.stat()
            keys.append((root.resolve(), stat.st_mtime, stat.st_size))
        except OSError:
            continue
    return tuple(keys)
def _sidecar_sort_key(sidecar: dict) -> tuple[str, float, str]:
    """Return the ordering key for a sidecar: grade, then score, then slug.

    The score is negated so that higher scores sort first within a grade.
    ``raw_score`` is preferred, then ``score``; a missing grade counts as
    ``"F"``.

    Sidecars are parsed from JSON files, so the score may be a non-numeric
    string, a container, or NaN. Such values are treated as ``0.0`` instead
    of raising inside ``sort`` (or, for NaN, silently breaking the ordering).
    """
    raw = sidecar.get("raw_score") or sidecar.get("score") or 0.0
    try:
        score = float(raw)
    except (TypeError, ValueError, OverflowError):
        score = 0.0
    if not math.isfinite(score):
        score = 0.0
    return (
        str(sidecar.get("grade") or "F"),
        -score,
        str(sidecar.get("slug") or ""),
    )
def _sidecar_candidate_files(
    files: list[Path],
    *,
    q: str,
    types: set[str],
) -> list[Path]:
    """Narrow sidecar files by name and directory before they are parsed.

    ``q`` is matched case-insensitively against each file stem. ``types``
    only decides whether files under an ``mcp`` directory are wanted:
    exactly ``{"mcp-server"}`` keeps only those, a set without
    ``"mcp-server"`` drops them, and an empty or mixed set keeps everything.
    """
    needle = q.lower()
    if needle:
        matched = [path for path in files if needle in path.stem.lower()]
    else:
        matched = list(files)

    wants_mcp = "mcp-server" in types
    if not types or (wants_mcp and types != {"mcp-server"}):
        return matched
    # From here the request is either MCP-only or explicitly without MCP.
    return [
        path for path in matched
        if (path.parent.name == "mcp") == wants_mcp
    ]
def _sidecar_page_payload(qs: dict[str, str] | None = None) -> dict[str, Any]:
    """Build one page of sidecar cards for /skills and its JSON API.

    Query keys read from ``qs``: ``page``, ``limit``, ``q``, ``type``,
    ``grade`` and ``hide_floor``. With any filter active the cached filtered
    record list is paged; otherwise the raw file list is paged and only the
    files on the requested page are parsed. A page number past the end is
    clamped by re-running the query for the last page.
    """
    params = qs or {}
    page = _skills_page_int(params.get("page"), default=1)
    limit = _skills_page_int(
        params.get("limit"),
        default=_SKILLS_PAGE_DEFAULT_LIMIT,
        maximum=_SKILLS_PAGE_MAX_LIMIT,
    )
    q = str(params.get("q") or "").strip()
    types = _skills_query_values(params.get("type"), set(_DASHBOARD_ENTITY_TYPES))
    grades = _skills_query_values(params.get("grade"), {"A", "B", "C", "D", "F"})
    hide_floor_flag = str(params.get("hide_floor") or "").strip().lower()
    hide_floor = hide_floor_flag in {"1", "true", "yes", "on"}

    files = _sidecar_files()
    catalog_total = len(files)
    has_filters = bool(q or types or grades or hide_floor)
    offset = (page - 1) * limit
    window = slice(offset, offset + limit)

    if has_filters:
        matches = _filtered_sidecar_records(
            files,
            q=q,
            types=types,
            grades=grades,
            hide_floor=hide_floor,
        )
        total = len(matches)
        page_sidecars = matches[window]
    else:
        total = catalog_total
        page_sidecars = []
        for path in files[window]:
            loaded = _read_sidecar_file(path)
            if loaded is not None:
                page_sidecars.append(loaded)
        # Ranking is only applied when the whole catalog fits on one page.
        if catalog_total <= limit:
            page_sidecars.sort(key=_sidecar_sort_key)

    pages = max(1, math.ceil(total / limit)) if total else 1
    if page > pages:
        return _sidecar_page_payload({
            **params,
            "page": str(pages),
            "limit": str(limit),
        })

    items = [_sidecar_card_payload(sidecar) for sidecar in page_sidecars]
    return {
        "items": items,
        "total": total,
        "catalog_total": catalog_total,
        "page": page,
        "limit": limit,
        "pages": pages,
        "has_next": page < pages,
        "has_prev": page > 1,
        "filtered": has_filters,
        "q": q,
        "types": sorted(types),
        "grades": sorted(grades),
        "hide_floor": hide_floor,
    }
_read_jsonl(_audit_log_path()) events = _read_jsonl(_events_jsonl_path()) by_session: dict[str, dict[str, Any]] = defaultdict( lambda: { "session_id": "", "first_seen": None, "last_seen": None, "skills_loaded": set(), "skills_unloaded": set(), "agents_loaded": set(), "agents_unloaded": set(), "mcps_loaded": set(), "mcps_unloaded": set(), "score_updates": 0, "lifecycle_transitions": 0, } ) for line in audit: sid = line.get("session_id") or "unknown" row = by_session[sid] row["session_id"] = sid ts = line.get("ts") if ts and (row["first_seen"] is None or ts < row["first_seen"]): row["first_seen"] = ts if ts and (row["last_seen"] is None or ts > row["last_seen"]): row["last_seen"] = ts event = line.get("event", "") if event == "skill.loaded": row["skills_loaded"].add(line.get("subject", "")) elif event == "skill.unloaded": row["skills_unloaded"].add(line.get("subject", "")) elif event == "agent.loaded": row["agents_loaded"].add(line.get("subject", "")) elif event == "agent.unloaded": row["agents_unloaded"].add(line.get("subject", "")) elif event == "toolbox.triggered": raw_meta = line.get("meta") meta: dict[str, Any] = raw_meta if isinstance(raw_meta, dict) else {} if meta.get("entity_type") == "mcp-server": action = meta.get("action") if action == "loaded": row["mcps_loaded"].add(line.get("subject", "")) elif action == "unloaded": row["mcps_unloaded"].add(line.get("subject", "")) elif event.endswith(".score_updated"): row["score_updates"] += 1 elif event in ("skill.archived", "skill.demoted", "skill.restored", "skill.deleted", "agent.archived", "agent.demoted", "agent.restored", "agent.deleted"): row["lifecycle_transitions"] += 1 for line in events: sid = line.get("session_id") or "unknown" row = by_session[sid] row["session_id"] = sid ts = line.get("timestamp") if ts and (row["first_seen"] is None or ts < row["first_seen"]): row["first_seen"] = ts if ts and (row["last_seen"] is None or ts > row["last_seen"]): row["last_seen"] = ts action = line.get("event") 
def _grade_distribution() -> dict[str, int]:
    """Count indexed sidecars per letter grade.

    Returns:
        A dict with the keys ``A``, ``B``, ``C``, ``D`` and ``F``. Sidecars
        whose ``grade`` is missing or is not one of those letters are not
        counted.
    """
    dist = {"A": 0, "B": 0, "C": 0, "D": 0, "F": 0}
    for sidecar in _all_sidecars():
        grade = sidecar.get("grade")
        # Sidecars come from JSON files, so ``grade`` can be a list or dict;
        # testing an unhashable value against the dict keys raises TypeError.
        if isinstance(grade, str) and grade in dist:
            dist[grade] += 1
    return dist
def _graph_slug_from_node_id(node_id: str) -> str:
    """Return the part of a node id after its first ``:``.

    Ids without a colon are returned unchanged.
    """
    _prefix, separator, remainder = node_id.partition(":")
    return remainder if separator else node_id
def _unit_score(value: Any) -> float | None:
    """Coerce ``value`` to a float clamped to the range ``[0.0, 1.0]``.

    Returns:
        The clamped score, or ``None`` when ``value`` cannot be converted to
        a finite float. That includes ``None``, non-numeric strings, NaN,
        infinities, and integers too large for a float, for which ``float()``
        raises ``OverflowError``.
    """
    try:
        score = float(value)
    except (TypeError, ValueError, OverflowError):
        return None
    if not math.isfinite(score):
        return None
    return max(0.0, min(1.0, score))
def _dashboard_index_meta(index_path: Path) -> dict[str, Any] | None:
    """Read the ``meta`` key/value table from the dashboard SQLite index.

    The database is opened read-only through a ``file:`` URI. The path is
    percent-encoded first: in a SQLite URI ``?`` starts the query string and
    ``#`` starts a fragment, so an unescaped path containing either would
    open a different file and drop the ``mode=ro`` flag.

    Returns:
        A dict mapping each ``meta`` key to its JSON-decoded value, or
        ``None`` when the database cannot be opened or queried, or when a
        value is not valid JSON.
    """
    uri = f"file:{quote(index_path.as_posix(), safe='/:')}?mode=ro"
    try:
        conn = sqlite3.connect(uri, uri=True)
    except sqlite3.Error:
        return None
    try:
        rows = conn.execute("SELECT key,value FROM meta").fetchall()
    except sqlite3.Error:
        return None
    finally:
        conn.close()
    try:
        return {str(key): json.loads(str(value)) for key, value in rows}
    except (TypeError, ValueError):
        # json.JSONDecodeError is a ValueError subclass.
        return None
def _dashboard_index_uncovered_overlay_nodes(
    conn: sqlite3.Connection,
    records: list[dict[str, Any]],
    *,
    require_edges: bool,
) -> set[str] | None:
    """Return overlay node ids that the SQLite dashboard index does not cover.

    Args:
        conn: Open connection to the index. ``neighbors.payload`` is read by
            column name, so the connection should use ``sqlite3.Row`` as its
            row factory; without it every neighbor list reads as empty.
        records: Overlay records, each with optional ``nodes`` and ``edges``
            lists.
        require_edges: When true, every overlay edge must also be present in
            the ``neighbors`` payload of at least one of its endpoints;
            otherwise only node presence is checked.

    Returns:
        The set of uncovered node ids (empty when everything is covered), or
        ``None`` when a record is malformed: ``nodes``/``edges`` that are not
        lists, edges without any nodes, or non-string ids.
    """
    uncovered: set[str] = set()
    known_nodes: dict[str, bool] = {}
    neighbor_targets: dict[str, set[str]] = {}

    def node_exists(node_id: str) -> bool:
        """Return whether ``node_id`` has a row in ``nodes`` (memoized)."""
        present = known_nodes.get(node_id)
        if present is None:
            present = bool(conn.execute(
                "SELECT 1 FROM nodes WHERE id=? LIMIT 1",
                (node_id,),
            ).fetchone())
            known_nodes[node_id] = present
        return present

    def indexed_neighbors(node_id: str) -> set[str]:
        """Return the edge targets stored for ``node_id`` (memoized)."""
        cached = neighbor_targets.get(node_id)
        if cached is not None:
            return cached
        row = conn.execute(
            "SELECT payload FROM neighbors WHERE source=?",
            (node_id,),
        ).fetchone()
        targets: set[str] = set()
        if row is not None:
            try:
                payload = json.loads(
                    zlib.decompress(row["payload"]).decode("utf-8")
                )
            except (TypeError, ValueError, zlib.error):
                # ValueError covers both malformed JSON and a payload that
                # decompresses to bytes that are not valid UTF-8.
                payload = []
            if isinstance(payload, list):
                targets = {
                    str(edge.get("target"))
                    for edge in payload
                    if isinstance(edge, dict)
                    and isinstance(edge.get("target"), str)
                }
        neighbor_targets[node_id] = targets
        return targets

    for record in records:
        nodes = record.get("nodes", [])
        edges = record.get("edges", [])
        if not isinstance(nodes, list) or not isinstance(edges, list):
            return None
        if not nodes and edges:
            return None
        for node in nodes:
            if not isinstance(node, dict):
                return None
            node_id = node.get("id")
            if not isinstance(node_id, str):
                return None
            if not node_exists(node_id):
                uncovered.add(node_id)
        if not require_edges:
            continue
        for edge in edges:
            if not isinstance(edge, dict):
                return None
            source = edge.get("source")
            target = edge.get("target")
            if not isinstance(source, str) or not isinstance(target, str):
                return None
            if not (node_exists(source) and node_exists(target)):
                # An edge with a missing endpoint leaves both ends uncovered.
                uncovered.update((source, target))
                continue
            if (
                target not in indexed_neighbors(source)
                and source not in indexed_neighbors(target)
            ):
                uncovered.update((source, target))
    return uncovered
def _dashboard_index_covers_runtime_overlays(index_path: Path) -> bool:
    """Report whether the SQLite index already contains every overlay entry.

    ``ctx-init`` may place ``entity-overlays.jsonl`` next to a graph export.
    Release artifacts can ship a ``dashboard-neighborhoods.sqlite3`` that
    already includes those overlay nodes and edges, in which case loading the
    full graph only to merge the same overlay again is wasted cold-start
    work. Local or user overlays that the index lacks must still take the
    full graph merge so newly attached edges stay visible.

    A missing, empty or unreadable overlay file counts as covered. An index
    that is absent or does not match the export manifest counts as not
    covered. The verdict is cached under a key built from both files.
    """
    global _OVERLAY_INDEX_COVERAGE_CACHE_KEY, _OVERLAY_INDEX_COVERAGE_CACHE_VALUE

    overlay = _wiki_dir() / "graphify-out" / "entity-overlays.jsonl"
    try:
        overlay_has_content = overlay.is_file() and overlay.stat().st_size != 0
    except OSError:
        return True
    if not overlay_has_content:
        return True

    index_usable = (
        index_path.is_file() and _dashboard_index_matches_manifest(index_path)
    )
    if not index_usable:
        return False

    cache_key = _overlay_index_coverage_key(index_path, overlay)
    if cache_key is not None and cache_key == _OVERLAY_INDEX_COVERAGE_CACHE_KEY:
        cached = _OVERLAY_INDEX_COVERAGE_CACHE_VALUE
        if cached is not None:
            return cached

    uncovered = _dashboard_uncovered_runtime_overlay_nodes(index_path)
    # ``None`` means coverage could not be determined; treat it as uncovered.
    coverage = uncovered is not None and len(uncovered) == 0
    if cache_key is not None:
        _OVERLAY_INDEX_COVERAGE_CACHE_KEY = cache_key
        _OVERLAY_INDEX_COVERAGE_CACHE_VALUE = coverage
    return coverage
def _archive_graph_export_id(archive: Path) -> str | None:
    """Read ``export_id`` from the graph export manifest inside *archive*.

    The manifest member is looked up as
    ``./graphify-out/graph-export-manifest.json`` first and without the
    leading ``./`` second.

    Returns:
        The stripped export id, or ``None`` when the archive cannot be read,
        the member is missing, the JSON is invalid, or the value is not a
        non-blank string.
    """
    try:
        with tarfile.open(archive, "r:gz") as tar:
            try:
                member = tar.getmember("./graphify-out/graph-export-manifest.json")
            except KeyError:
                member = tar.getmember("graphify-out/graph-export-manifest.json")
            source = tar.extractfile(member)
            if source is None:
                return None
            # The extracted stream is a context manager; ``with`` closes it
            # on every exit path.
            with source:
                data = json.loads(source.read().decode("utf-8", errors="replace"))
    except (KeyError, OSError, tarfile.TarError, json.JSONDecodeError):
        return None
    export_id = data.get("export_id") if isinstance(data, dict) else None
    if isinstance(export_id, str) and export_id.strip():
        return export_id.strip()
    return None


def _ensure_dashboard_graph_index() -> Path | None:
    """Return a usable dashboard graph index, extracting it if necessary.

    An index already on disk is returned when
    ``_dashboard_index_matches_manifest`` accepts it; otherwise it is deleted
    and re-extracted from the packaged archives under a file lock. Extraction
    writes to a per-process temp file that replaces the target only after the
    same manifest check passes.

    Returns:
        The index path, or ``None`` when no manifest export id is known, the
        packaged export id disagrees with the manifest, no archive is
        available, the lock times out, or a filesystem step fails.
    """
    target = _dashboard_graph_index_path()
    if target.is_file():
        if _dashboard_index_matches_manifest(target):
            return target
        try:
            target.unlink()
        except OSError:
            return None

    manifest_export_id = _dashboard_graph_manifest_export_id()
    packaged_export_id = _packaged_graph_export_id()
    if (
        manifest_export_id is not None
        and packaged_export_id is not None
        and manifest_export_id != packaged_export_id
    ):
        return None
    archives = _dashboard_graph_index_archives()
    if not archives:
        return None
    if manifest_export_id is None:
        return None

    # Creating the parent directory can fail (read-only or unwritable
    # location). Report "no index" like every other filesystem failure in
    # this function instead of letting OSError escape to the request handler.
    try:
        target.parent.mkdir(parents=True, exist_ok=True)
    except OSError:
        return None

    try:
        with file_lock(target):
            # Re-check under the lock: another worker may have finished the
            # extraction while this one was waiting.
            if target.is_file():
                if _dashboard_index_matches_manifest(target):
                    return target
                try:
                    target.unlink()
                except OSError:
                    return None
            for archive in archives:
                archive_export_id = packaged_export_id or _archive_graph_export_id(archive)
                if (
                    manifest_export_id
                    and archive_export_id
                    and archive_export_id != manifest_export_id
                ):
                    continue
                try:
                    with tarfile.open(archive, "r:gz") as tar:
                        try:
                            member = tar.getmember(f"./{_DASHBOARD_INDEX_MEMBER}")
                        except KeyError:
                            member = tar.getmember(_DASHBOARD_INDEX_MEMBER)
                        if not member.isfile():
                            continue
                        source = tar.extractfile(member)
                        if source is None:
                            continue
                        tmp = target.with_name(f".{target.name}.{os.getpid()}.tmp")
                        try:
                            with tmp.open("wb") as out:
                                # Stream in 1 MiB chunks so the index is never
                                # held in memory as a whole.
                                for chunk in iter(lambda: source.read(1024 * 1024), b""):
                                    out.write(chunk)
                            if not _dashboard_index_matches_manifest(tmp):
                                continue
                            os.replace(tmp, target)
                            return target
                        finally:
                            source.close()
                            # After a successful replace the temp file is gone;
                            # otherwise remove the rejected/partial copy.
                            if tmp.exists():
                                tmp.unlink()
                except (KeyError, OSError, tarfile.TarError):
                    continue
    except TimeoutError:
        return None
    return target if target.is_file() else None
def _index_node_size(
    *,
    slug: str,
    entity_type: str,
    quality: Any,
    usage: Any,
    degree: int,
    max_degree: int,
) -> dict[str, Any]:
    """Compute the display size of one graph node.

    Quality and usage are normalised with ``_unit_score``; whichever is
    missing is filled from ``_sidecar_score_inputs(slug, entity_type)``.
    Values that are still missing default to 0.35 (quality) and 0.0 (usage).
    Popularity is ``log1p(degree) / log1p(max_degree)``, or 0.0 when
    ``max_degree`` is not positive.

    Returns:
        A dict with ``node_size`` (8.0 plus up to 16.0, rounded to 2 places),
        ``size_signal`` (the blended score clamped to [0, 1], rounded to 4
        places) and ``size_reason`` (a human-readable breakdown).
    """
    quality_score = _unit_score(quality)
    usage_score = _unit_score(usage)
    if quality_score is None or usage_score is None:
        fallback_quality, fallback_usage = _sidecar_score_inputs(slug, entity_type)
        if quality_score is None:
            quality_score = fallback_quality
        if usage_score is None:
            usage_score = fallback_usage

    q = quality_score if quality_score is not None else 0.35
    u = usage_score if usage_score is not None else 0.0
    if max_degree > 0:
        popularity = math.log1p(max(0, degree)) / math.log1p(max(1, max_degree))
    else:
        popularity = 0.0

    blended = 0.45 * q + 0.35 * u + 0.20 * popularity
    signal = max(0.0, min(1.0, blended))
    return {
        "node_size": round(8.0 + signal * 16.0, 2),
        "size_signal": round(signal, 4),
        "size_reason": f"quality {q:.3f}; usage {u:.3f}; popularity {popularity:.3f}",
    }
def _resolve_index_center(
    conn: sqlite3.Connection,
    raw_query: str,
    entity_type: str | None,
) -> tuple[str | None, dict[str, str] | None, list[str]]:
    """Resolve a user query to a node id in the dashboard index.

    Two passes are made. First an exact ``slug_index`` lookup for the raw and
    the slugified query, per entity type. If that misses, every candidate row
    is scanned and ranked: 0 = exact haystack match, 1 = prefix, 2 =
    substring, 3 = every query token appears somewhere. Ties break on shorter
    slug, then higher degree.

    Args:
        conn: Open connection whose rows support access by column name.
        raw_query: Text typed by the user; rejected when it contains ``/``,
            ``\\`` or ``..``.
        entity_type: Restrict to one type, or ``None`` for all dashboard types.

    Returns:
        ``(node_id, resolved, suggestions)``. ``resolved`` is ``None`` for an
        exact hit and a ``{"query", "slug", "id"}`` dict for a fuzzy hit.
        ``node_id`` is ``None`` when nothing matched.
    """
    raw_query = str(raw_query or "").strip()
    if not raw_query or any(marker in raw_query for marker in ("/", "\\", "..")):
        return None, None, []
    normalized_query = _slugish(raw_query)
    if not (normalized_query and _is_safe_slug(normalized_query)):
        return None, None, []

    if entity_type is None:
        entity_types = _DASHBOARD_ENTITY_TYPES
    else:
        entity_types = (entity_type,)

    # Pass 1: exact slug hit, trying the raw spelling before the slugified one.
    exact_candidates = list(dict.fromkeys((raw_query, normalized_query)))
    for wanted_type in entity_types:
        for wanted_slug in exact_candidates:
            hit = conn.execute(
                "SELECT node_id FROM slug_index WHERE slug=? AND type=? LIMIT 1",
                (wanted_slug, wanted_type),
            ).fetchone()
            if hit is not None:
                return str(hit["node_id"]), None, [wanted_slug]

    # Pass 2: ranked scan over all candidate rows.
    sql = (
        "SELECT s.slug,s.type,s.node_id,n.label,n.tags,n.degree "
        "FROM slug_index s JOIN nodes n ON n.id=s.node_id "
    )
    params: list[Any] = []
    if entity_type is not None:
        sql += "WHERE s.type=?"
        params.append(entity_type)

    query_tokens = set(normalized_query.split("-"))

    def rank_for(haystacks: set[str]) -> int | None:
        """Return the match rank for one row, or None when it does not match."""
        if normalized_query in haystacks:
            return 0
        if any(h.startswith(normalized_query) for h in haystacks):
            return 1
        if any(normalized_query in h for h in haystacks):
            return 2
        if query_tokens and all(
            any(token in h for h in haystacks) for token in query_tokens
        ):
            return 3
        return None

    matches: list[tuple[tuple[int, int, int], str, str]] = []
    for row in conn.execute(sql, params):
        node_slug = str(row["slug"] or "")
        label = _display_label(row["label"], fallback_slug=node_slug)
        haystacks = {
            _slugish(node_slug),
            _slugish(_display_slug(node_slug)),
            _slugish(label),
        }
        try:
            tags = json.loads(row["tags"] or "[]")
        except (TypeError, json.JSONDecodeError):
            tags = []
        if isinstance(tags, list):
            # Only the first dozen tags take part in matching.
            haystacks.update(_slugish(str(tag)) for tag in tags[:12])
        rank = rank_for(haystacks)
        if rank is None:
            continue
        try:
            degree = int(row["degree"] or 0)
        except (TypeError, ValueError):
            degree = 0
        matches.append(((rank, len(node_slug), -degree), str(row["node_id"]), node_slug))

    matches.sort(key=lambda item: item[0])

    suggestions: list[str] = []
    for _sort_key, _node_id, matched_slug in matches[:8]:
        shown = _display_slug(matched_slug)
        if shown not in suggestions:
            suggestions.append(shown)

    if not matches:
        return None, None, suggestions
    center = matches[0][1]
    resolved = {
        "query": raw_query,
        "slug": _graph_slug_from_node_id(center),
        "id": center,
    }
    return center, resolved, suggestions
def _graph_neighborhood_from_index(
    slug: str,
    *,
    hops: int,
    limit: int,
    entity_type: str | None,
) -> dict | None:
    """Build a neighborhood from the prebuilt SQLite dashboard index.

    Args:
        slug: Query resolved via ``_resolve_index_center``.
        hops: Requested depth; the index only serves ``hops <= 1``.
        limit: Maximum number of nodes to emit.
        entity_type: Optional entity-type restriction for center resolution.

    Returns:
        * ``None`` when the index cannot answer: it is missing or unreadable,
          ``hops > 1``, ``limit`` exceeds the index's stored ``top_k``, or
          the stored data is malformed. Callers fall back to another source.
        * ``{"nodes": [], "edges": [], "center": None, "suggestions": [...]}``
          when the query resolves to no node.
        * Otherwise the result of ``dashboard_graph.enrich_neighborhood``.
    """
    index_path = _ensure_dashboard_graph_index()
    if index_path is None or not index_path.is_file():
        return None
    try:
        conn = sqlite3.connect(f"file:{index_path.as_posix()}?mode=ro", uri=True)
    except sqlite3.Error:
        return None
    conn.row_factory = sqlite3.Row
    try:
        meta = {
            row["key"]: json.loads(row["value"])
            for row in conn.execute("SELECT key,value FROM meta")
        }
        max_degree = int(meta.get("max_degree") or 1)
        top_k = int(meta.get("top_k") or 0)
        if hops > 1 or (top_k > 0 and limit > top_k):
            return None
        center, resolved, suggestions = _resolve_index_center(conn, slug, entity_type)
        if center is None:
            return {"nodes": [], "edges": [], "center": None, "suggestions": suggestions}

        nodes_out: dict[str, dict[str, Any]] = {}
        edges_out: list[dict[str, Any]] = []
        emitted_edges: set[tuple[str, str]] = set()
        frontier = [center]
        seen = {center}

        def add_node(node_id: str, depth: int) -> None:
            """Add *node_id* to ``nodes_out`` once; ids absent from the index are skipped."""
            if node_id in nodes_out:
                return
            row = conn.execute("SELECT * FROM nodes WHERE id=?", (node_id,)).fetchone()
            if row is None:
                return
            tags = json.loads(row["tags"] or "[]")
            degree = int(row["degree"] or 0)
            node_type = str(row["type"] or _graph_type_from_node_id(node_id))
            node_slug = _graph_slug_from_node_id(node_id)
            size_data = _index_node_size(
                slug=node_slug,
                entity_type=node_type,
                quality=row["quality_score"],
                usage=row["usage_score"],
                degree=degree,
                max_degree=max_degree,
            )
            label = _display_label(row["label"], fallback_slug=node_slug)
            nodes_out[node_id] = {
                "data": {
                    "id": node_id,
                    "label": label,
                    "type": node_type,
                    "depth": depth,
                    "degree": degree,
                    "tags": tags[:6],
                    "description": row["description"] or "",
                    "quality_score": row["quality_score"],
                    "usage_score": row["usage_score"],
                    "filter_tokens": [
                        node_id,
                        row["label"],
                        node_slug,
                        _display_slug(node_slug),
                        label,
                        *tags,
                    ],
                    **size_data,
                },
            }

        add_node(center, 0)
        for depth in range(1, hops + 1):
            next_frontier: list[str] = []
            for node_id in frontier:
                row = conn.execute(
                    "SELECT payload FROM neighbors WHERE source=?",
                    (node_id,),
                ).fetchone()
                if row is None:
                    continue
                # Neighbor lists are stored as zlib-compressed JSON.
                neighbors = json.loads(zlib.decompress(row["payload"]).decode("utf-8"))
                for edge in neighbors:
                    if len(nodes_out) >= limit:
                        break
                    other = str(edge.get("target") or "")
                    if not other:
                        continue
                    add_node(other, depth)
                    edge_a, edge_b = sorted((node_id, other))
                    edge_key = (edge_a, edge_b)
                    # ``other in nodes_out`` skips edges whose far end was not
                    # present in the nodes table.
                    if edge_key not in emitted_edges and other in nodes_out:
                        emitted_edges.add(edge_key)
                        shared_tags = list(edge.get("shared_tags") or [])[:4]
                        for current in (node_id, other):
                            tokens = nodes_out[current]["data"].setdefault(
                                "filter_tokens", []
                            )
                            tokens.extend(shared_tags)
                        edges_out.append({
                            "data": {
                                "id": f"{edge_key[0]}__{edge_key[1]}",
                                "source": node_id,
                                "target": other,
                                "weight": edge.get("weight", 1),
                                "shared_tags": shared_tags,
                                "reasons": edge.get("reasons", []),
                                "semantic": edge.get("semantic"),
                                "tag_sim": edge.get("tag_sim"),
                                "slug_token_sim": edge.get("slug_token_sim"),
                                "source_overlap": edge.get("source_overlap"),
                            },
                        })
                    if other not in seen:
                        seen.add(other)
                        next_frontier.append(other)
                if len(nodes_out) >= limit:
                    break
            frontier = next_frontier
            if len(nodes_out) >= limit:
                break
        return dashboard_graph.enrich_neighborhood({
            "nodes": list(nodes_out.values()),
            "edges": edges_out,
            "center": center,
            "resolved": resolved or {"source": "dashboard-index"},
            "suggestions": [],
        }, source="dashboard-index")
    except (OSError, sqlite3.Error, ValueError, zlib.error, KeyError, TypeError):
        # ValueError covers json.JSONDecodeError, UnicodeDecodeError from a
        # corrupt payload, and int() on non-numeric meta/degree values. All of
        # them mean "index unusable", not a failed request.
        return None
    finally:
        conn.close()
def _graph_neighborhood(
    slug: str,
    hops: int = 1,
    limit: int = 40,
    entity_type: str | None = None,
) -> dict:
    """Return dashboard-shaped {nodes, edges} for the N-hop neighborhood.

    The prebuilt dashboard index is tried first: unconditionally when it
    covers all runtime overlays, and for single-hop queries otherwise,
    provided the resolved center is not one of the uncovered overlay nodes.
    If the index cannot answer, the full graph from ``_load_dashboard_graph``
    is walked breadth-first, strongest edges first, until ``limit`` nodes
    have been collected.

    Args:
        slug: Entity slug or free-text query; rejected when it contains
            ``/``, ``\\`` or ``..``.
        hops: Neighborhood depth.
        limit: Maximum number of nodes to emit.
        entity_type: Optional entity type. A value that
            ``_normalize_dashboard_entity_type`` does not recognise yields an
            empty result.

    Returns:
        An enriched neighborhood dict, or
        ``{"nodes": [], "edges": [], "center": None}`` when the query is
        rejected, the graph is unavailable or empty, or no center resolves.
    """
    if "/" in slug or "\\" in slug or ".." in slug:
        return {"nodes": [], "edges": [], "center": None}
    normalized_entity_type = _normalize_dashboard_entity_type(entity_type)
    # Reject unsupported types before any backend is consulted. Otherwise the
    # index path would treat the type as "any" while the graph path returns
    # nothing, so the answer would depend on which backend served the request.
    if entity_type is not None and normalized_entity_type is None:
        return {"nodes": [], "edges": [], "center": None}

    index_path = _dashboard_graph_index_path()
    has_runtime_overlays = _dashboard_graph_has_runtime_overlays()
    index_covers_overlays = (
        not has_runtime_overlays
        or _dashboard_index_covers_runtime_overlays(index_path)
    )
    if index_covers_overlays:
        indexed = _graph_neighborhood_from_index(
            slug,
            hops=hops,
            limit=limit,
            entity_type=normalized_entity_type,
        )
        if indexed is not None:
            return indexed
    elif hops == 1 and index_path.is_file() and _dashboard_index_matches_manifest(index_path):
        indexed = _graph_neighborhood_from_index(
            slug,
            hops=hops,
            limit=limit,
            entity_type=normalized_entity_type,
        )
        center = indexed.get("center") if isinstance(indexed, dict) else None
        uncovered = _dashboard_uncovered_runtime_overlay_nodes(index_path)
        if indexed is not None and isinstance(center, str) and uncovered is not None:
            # The index is only trusted when the center itself is covered.
            if center not in uncovered:
                return indexed

    try:
        G = _load_dashboard_graph()
    except Exception:  # noqa: BLE001 — graph is advisory; blank on error
        return {"nodes": [], "edges": [], "center": None}
    if G.number_of_nodes() == 0:
        return {"nodes": [], "edges": [], "center": None}
    center, resolved, suggestions = _resolve_graph_center(
        G,
        slug,
        normalized_entity_type,
    )
    if center is None:
        return {"nodes": [], "edges": [], "center": None}

    nodes_out: dict[str, dict] = {}
    edges_out: list[dict] = []
    emitted_edges: set[tuple[str, str]] = set()
    frontier = [center]
    seen: set[str] = {center}
    try:
        max_degree = max((int(degree) for _node, degree in G.degree()), default=1)
    except Exception:  # noqa: BLE001
        max_degree = 1

    def _add_node(nid: str, depth: int) -> None:
        """Add *nid* to ``nodes_out`` once, deriving its type from the id prefix if unset."""
        if nid in nodes_out:
            return
        data = dict(G.nodes.get(nid, {}))
        node_slug = nid.split(":", 1)[-1]
        label = _display_label(data.get("label"), fallback_slug=node_slug)
        # ``or []`` tolerates an explicit ``tags: null`` on the node.
        tags = list(data.get("tags") or [])
        default_type = (
            "mcp-server" if nid.startswith("mcp-server:")
            else "harness" if nid.startswith("harness:")
            else "agent" if nid.startswith("agent:")
            else "skill"
        )
        ntype = data.get("type") or default_type
        try:
            degree = int(G.degree[nid])
        except Exception:  # noqa: BLE001
            degree = 0
        size_data = _graph_node_size(
            nid,
            data,
            entity_type=str(ntype),
            degree=degree,
            max_degree=max_degree,
        )
        nodes_out[nid] = {
            "data": {
                "id": nid,
                "label": label,
                "type": ntype,
                "depth": depth,
                "degree": degree,
                "tags": tags[:6],
                "description": data.get("description", ""),
                "quality_score": data.get("quality_score"),
                "usage_score": data.get("usage_score"),
                "filter_tokens": [nid, label, node_slug, _display_slug(node_slug), *tags],
                **size_data,
            },
        }

    _add_node(center, 0)
    for depth in range(1, hops + 1):
        next_frontier: list[str] = []
        for nid in frontier:
            # Sort neighbors by edge weight so we pick the strongest
            # connections first under the ``limit`` cap.
            neighbors = sorted(
                G[nid].items(),
                key=lambda kv: -kv[1].get("weight", 1),
            )
            for other, edata in neighbors:
                if len(nodes_out) >= limit:
                    break
                _add_node(other, depth)
                edge_key = tuple(sorted((nid, other)))
                if edge_key not in emitted_edges:
                    emitted_edges.add(edge_key)
                    # Same None-tolerant form the index-backed path uses.
                    shared_tags = list(edata.get("shared_tags") or [])[:4]
                    for node_id in (nid, other):
                        tokens = nodes_out[node_id]["data"].setdefault(
                            "filter_tokens", []
                        )
                        tokens.extend(shared_tags)
                    edges_out.append({
                        "data": {
                            "id": f"{edge_key[0]}__{edge_key[1]}",
                            "source": nid,
                            "target": other,
                            "weight": edata.get("weight", 1),
                            "shared_tags": shared_tags,
                            "reasons": edata.get("reasons", []),
                            "semantic": edata.get("semantic"),
                            "tag_sim": edata.get("tag_sim"),
                            "slug_token_sim": edata.get("slug_token_sim"),
                            "source_overlap": edata.get("source_overlap"),
                        },
                    })
                if other not in seen:
                    seen.add(other)
                    next_frontier.append(other)
            if len(nodes_out) >= limit:
                break
        frontier = next_frontier
        if len(nodes_out) >= limit:
            break
    return dashboard_graph.enrich_neighborhood({
        "nodes": list(nodes_out.values()),
        "edges": edges_out,
        "center": center,
        "resolved": resolved,
        "suggestions": suggestions,
    }, source="networkx")
def _graph_stats() -> dict:
    """Top-line graph stats for the home page.

    Sources are tried in order: the node/edge counts matched by
    ``_GRAPH_REPORT_RE`` in ``graphify-out/graph-report.md``, then the
    ``nodes_count`` / ``edges_count`` entries of the dashboard index ``meta``
    table, and finally the loaded graph itself.

    Returns:
        ``{"nodes": int, "edges": int, "available": bool}``.
    """
    report_path = _wiki_dir() / "graphify-out" / "graph-report.md"
    try:
        report_text = report_path.read_text(encoding="utf-8", errors="replace")
    except OSError:
        report_text = None
    if report_text is not None:
        found = _GRAPH_REPORT_RE.search(report_text)
        if found:
            return {
                "nodes": int(found.group(1).replace(",", "")),
                "edges": int(found.group(2).replace(",", "")),
                "available": True,
            }

    index_path = _ensure_dashboard_graph_index()
    if index_path is not None and index_path.is_file():
        try:
            conn = sqlite3.connect(f"file:{index_path.as_posix()}?mode=ro", uri=True)
            try:
                meta = {}
                for key, value in conn.execute("SELECT key,value FROM meta"):
                    meta[key] = json.loads(value)
                node_total = int(meta.get("nodes_count") or 0)
                edge_total = int(meta.get("edges_count") or 0)
                return {
                    "nodes": node_total,
                    "edges": edge_total,
                    "available": node_total > 0,
                }
            finally:
                conn.close()
        except (OSError, sqlite3.Error, ValueError, TypeError, json.JSONDecodeError):
            # Unusable index: fall through to the in-memory graph.
            pass

    try:
        G = _load_dashboard_graph()
    except Exception:  # noqa: BLE001
        return {"nodes": 0, "edges": 0, "available": False}
    node_total = G.number_of_nodes()
    return {
        "nodes": node_total,
        "edges": G.number_of_edges(),
        "available": node_total > 0,
    }


def _wiki_stats_from_dashboard_index() -> dict[str, int] | None:
    """Count entities per type straight from the dashboard index.

    Returns:
        A dict with ``skills``, ``agents``, ``mcps``, ``harnesses``, ``total``
        and ``split_known`` (always ``True`` here), or ``None`` when the index
        file is absent, fails ``_dashboard_index_matches_manifest``, or cannot
        be queried.
    """
    index_path = _dashboard_graph_index_path()
    if not (index_path.is_file() and _dashboard_index_matches_manifest(index_path)):
        return None

    per_type: dict[str, int] = {}
    try:
        conn = sqlite3.connect(f"file:{index_path.as_posix()}?mode=ro", uri=True)
        try:
            for node_type, count in conn.execute(
                "SELECT type,COUNT(*) FROM nodes GROUP BY type"
            ):
                per_type[str(node_type)] = int(count)
        finally:
            conn.close()
    except (OSError, sqlite3.Error, ValueError, TypeError):
        return None

    stats = {
        "skills": per_type.get("skill", 0),
        "agents": per_type.get("agent", 0),
        "mcps": per_type.get("mcp-server", 0),
        "harnesses": per_type.get("harness", 0),
    }
    stats["total"] = sum(stats.values())
    stats["split_known"] = True
    return stats


def _wiki_stats() -> dict:
    """Entity counts across all dashboard-supported entity types.

    Preference order: per-type counts from the dashboard index; then, when a
    graph report exists, only the graph's node total (``split_known`` is
    ``False`` and the per-type counts are zero); finally a count of ``*.md``
    files under ``entities/``. MCP pages are counted recursively under
    ``entities/mcp-servers``; the other types use a flat glob.
    """
    indexed = _wiki_stats_from_dashboard_index()
    if indexed is not None:
        return indexed

    base = _wiki_dir() / "entities"
    graph_out = _wiki_dir() / "graphify-out"
    if graph_out.is_dir() and (graph_out / "graph-report.md").is_file():
        graph_stats = _graph_stats()
        return {
            "skills": 0,
            "agents": 0,
            "mcps": 0,
            "harnesses": 0,
            "total": int(graph_stats.get("nodes") or 0),
            "split_known": False,
        }

    def count_pages(directory: Path, *, recursive: bool = False) -> int:
        """Count Markdown pages in *directory*; a missing directory counts as zero."""
        if not directory.is_dir():
            return 0
        pages = directory.rglob("*.md") if recursive else directory.glob("*.md")
        return len(list(pages))

    skills = count_pages(base / "skills")
    agents = count_pages(base / "agents")
    mcps = count_pages(base / "mcp-servers", recursive=True)
    harnesses = count_pages(base / "harnesses")
    return {
        "skills": skills,
        "agents": agents,
        "mcps": mcps,
        "harnesses": harnesses,
        "total": skills + agents + mcps + harnesses,
        "split_known": True,
    }

ctx monitor

" # ── Stat grid ──────────────────────────────────────────────── "
" + f"
Currently loaded
" f"
{_format_count(len(manifest.get('load', [])))}
" f"manage →
" + "
Sidecars
" "
...
" "browse →
" + f"
Wiki entities
" f"
{_format_count(wstats['total'])}
" f"" f"{html.escape(wiki_detail)}
" + f"
Knowledge graph
" f"
{_format_count(gstats['nodes'])}
" f"{_format_count(gstats['edges'])} edges" f" · explore →
" + f"
Runtime checks
" f"
{_format_count(runtime['validations_total'])}
" f"" f"{_format_count(runtime['validation_failures'])} failed / " f"{_format_count(runtime['open_escalations_total'])} open escalations" f" / view ->
" + f"
Audit events
" f"
{_format_count(audit_lines)}
" f"view → · live →
" + f"
Sessions
" f"
{_format_count(len(sessions))}
" f"browse →
" + "
" # ── Grade distribution ──────────────────────────────────────── "
Skill quality grades: " + "".join( f"{g}: ... " for g in ("A", "B", "C", "D", "F") ) + " · total loading" "
" "" # ── Two-column: recent sessions + recent audit ──────────────── "
" f"
Recent sessions ({_format_count(len(sessions))} total)" + ("" "" "" + "".join(rows) + "
SessionLast seenLoadUnloadAgentsScores
" if recent else "

No sessions recorded yet. Hooks start logging " "once you run a Claude Code session with ctx installed.

") + "
" "
Latest audit events" + ("" "" + audit_rows + "
TimeEventSubject
" if recent_audit else "

No audit events yet.

") + "
" "
" ) return _layout("Home", body) def _render_sessions_index() -> str: sessions = _summarize_sessions() rows = [] for s in sessions: sid = s["session_id"] rows.append( f"" f"{html.escape(sid[:32])}" f"{html.escape(s['first_seen'] or '—')}" f"{html.escape(s['last_seen'] or '—')}" f"{len(s['skills_loaded'])}" f"{len(s['skills_unloaded'])}" f"{len(s['agents_loaded'])}" f"{len(s['agents_unloaded'])}" f"{len(s['mcps_loaded'])}" f"{len(s['mcps_unloaded'])}" f"{s['lifecycle_transitions']}" f"" ) body = ( "

Sessions

" f"

{len(sessions)} unique sessions observed.

" "" "" "" "" "" + "".join(rows) + "
SessionFirst seenLast seenSkills↑Skills↓Agents↑Agents↓MCPs↑MCPs↓Lifecycle
" ) return _layout("Sessions", body) def _render_session_detail(session_id: str) -> str: detail = _session_detail(session_id) audit = detail["audit_entries"] events = detail["load_events"] audit_rows = "".join( f"{html.escape(r.get('ts', ''))}" f"{html.escape(r.get('event', ''))}" f"{html.escape(r.get('subject', ''))}" f"{html.escape(json.dumps(r.get('meta', {}))[:80])}" for r in audit ) event_rows = "".join( f"{html.escape(r.get('timestamp', ''))}" f"{html.escape(r.get('event', ''))}" f"{html.escape(r.get('skill') or r.get('agent') or '')}" for r in events ) body = ( f"

Session {html.escape(session_id)}

" f"
{len(audit)} audit entries · " f"{len(events)} load/unload events
" "

Audit timeline

" "" + audit_rows + "
tseventsubjectmeta
" "

Load/unload events

" "" + event_rows + "
tseventsubject
" ) return _layout(f"Session {session_id}", body) def _render_skills(qs: dict[str, str] | None = None) -> str: payload = _sidecar_page_payload(qs) sidecars = payload["items"] cards = "".join( f"
" f"
" f"{html.escape(s.get('slug', ''))}" f"{html.escape(s.get('grade', 'F'))}" f"
" f"
" f"score {s.get('raw_score', 0.0):.3f} · {html.escape(s.get('type', s.get('subject_type', 'skill')))}" f"{' · ' + html.escape(s.get('hard_floor','')) if s.get('hard_floor') else ''}" f"
" f"
" f"sidecar" f"wiki" f"graph" f"
" f"
" for s in sidecars ) start_index = ((payload["page"] - 1) * payload["limit"]) + 1 if payload["total"] else 0 end_index = min(payload["page"] * payload["limit"], payload["total"]) summary = ( f"Showing {start_index}-{end_index} of {payload['total']} matching sidecars" if payload["filtered"] else f"Showing {start_index}-{end_index} of {payload['catalog_total']} sidecars" ) query_base = { key: value for key, value in (qs or {}).items() if key not in {"page"} } def page_href(page: int) -> str: params = { **query_base, "page": str(max(1, page)), "limit": str(payload["limit"]), } query = "&".join( f"{quote(str(key))}={quote(str(value))}" for key, value in params.items() if str(value).strip() ) return "/skills" + (f"?{query}" if query else "") prev_link = ( f"previous" if payload["has_prev"] else "previous" ) next_link = ( f"next" if payload["has_next"] else "next" ) pagination = ( "
" f"{html.escape(summary)} · page " f"{payload['page']} of {payload['pages']}" f"{prev_link} · {next_link}" "
" ) selected_type = ",".join(payload["types"]) selected_grade = ",".join(payload["grades"]) type_options = "" + "".join( f"" for t in _DASHBOARD_ENTITY_TYPES ) grade_options = "" + "".join( f"" for g in ("A", "B", "C", "D", "F") ) limit_options = "".join( f"" for n in (50, 100, 200, 500) ) hide_checked = " checked" if payload["hide_floor"] else "" body = ( "

Quality sidecars

" f"

{payload['catalog_total']} sidecars · click any card to drill in.

" + pagination + "
" # ── Left filter sidebar ────────────────────────────────────── "" # ── Card grid ──────────────────────────────────────────────── "
" + cards + "
" "
" "" ) return _layout("Skills", body) def _render_skill_detail(slug: str, entity_type: str | None = None) -> str: sidecar = _load_sidecar(slug, entity_type=entity_type) if sidecar is None: return _layout(slug, f"

{html.escape(slug)}

No sidecar.

") requested_type = ( _normalize_dashboard_entity_type(entity_type) or _sidecar_entity_type(sidecar) ) audit = [r for r in _read_jsonl(_audit_log_path()) if r.get("subject") == slug and _audit_entity_type(r) == requested_type] audit_rows = "".join( f"{html.escape(r.get('ts', ''))}" f"{html.escape(r.get('event', ''))}" f"{html.escape(r.get('actor', ''))}" for r in audit[-100:] ) hard_floor = sidecar.get("hard_floor") hard_floor_html = ( f" · floor {html.escape(str(hard_floor))}" if hard_floor else "" ) body = ( f"

{html.escape(slug)}

" f"
" f"grade {html.escape(sidecar.get('grade', 'F'))} " f"score {sidecar.get('raw_score', 0.0):.3f} " f"· type {html.escape(sidecar.get('subject_type', ''))}" f"{hard_floor_html}" "
" "

Sidecar

" f"
{html.escape(json.dumps(sidecar, indent=2)[:4000])}
" f"

Audit timeline ({len(audit)} entries)

" "" + audit_rows + "
tseventactor
def _top_degree_seeds_from_index(limit: int = 18) -> list[dict]:
    """Return the highest-degree nodes recorded in the dashboard index.

    Yields nothing when runtime overlays exist that the index does not cover,
    when no index can be ensured, or when the query fails.

    Args:
        limit: Maximum number of seeds; values below 1 are treated as 1.

    Returns:
        A list of ``{"slug", "type", "degree", "label"}`` dicts ordered by
        degree descending, then node id.
    """
    index_path = _dashboard_graph_index_path()
    overlays_uncovered = (
        _dashboard_graph_has_runtime_overlays()
        and not _dashboard_index_covers_runtime_overlays(index_path)
    )
    if overlays_uncovered:
        return []

    ready_index = _ensure_dashboard_graph_index()
    if ready_index is None or not ready_index.is_file():
        return []

    conn: sqlite3.Connection | None = None
    try:
        conn = sqlite3.connect(f"file:{ready_index.as_posix()}?mode=ro", uri=True)
        conn.row_factory = sqlite3.Row
        rows = conn.execute(
            "SELECT id,label,type,degree FROM nodes ORDER BY degree DESC,id LIMIT ?",
            (max(1, limit),),
        ).fetchall()
    except (OSError, sqlite3.Error, TimeoutError):
        return []
    finally:
        if conn is not None:
            conn.close()

    seeds: list[dict] = []
    for row in rows:
        node_id = str(row["id"])
        seeds.append({
            "slug": _graph_slug_from_node_id(node_id),
            "type": _graph_type_from_node_id(node_id, str(row["type"] or "skill")),
            "degree": int(row["degree"] or 0),
            "label": row["label"] or _graph_slug_from_node_id(node_id),
        })
    return seeds
def _top_degree_seeds(limit: int = 18, *, allow_load: bool = True) -> list[dict]:
    """Pick high-degree nodes from the graph as seed suggestions.

    Used by the ``/graph`` landing page so a first-time visitor has something
    to click.

    Args:
        limit: Maximum number of seeds to return.
        allow_load: When false, only the already-cached graph
            (``_GRAPH_CACHE_VALUE``) is consulted; if nothing is cached the
            dashboard index is used instead.

    Returns:
        ``{"slug", "type", "degree", "label"}`` dicts, highest degree first.
        Empty on any graph-load failure or for an empty graph.
    """
    try:
        G = _load_dashboard_graph() if allow_load else _GRAPH_CACHE_VALUE
    except Exception:  # noqa: BLE001
        return []
    if G is None:
        return _top_degree_seeds_from_index(limit)
    if G.number_of_nodes() == 0:
        return []
    ranked = sorted(G.degree, key=lambda kv: -kv[1])[:limit]
    out: list[dict] = []
    for node_id, degree in ranked:
        prefix, sep, slug = node_id.partition(":")
        if not sep:
            # Un-prefixed node id: the whole id is the slug. Without this the
            # seed would carry an empty slug and produce a dead link.
            slug = node_id
        seed_type = (
            prefix
            if sep and prefix in ("mcp-server", "harness", "agent")
            else "skill"
        )
        out.append({
            "slug": slug,
            "type": seed_type,
            "degree": int(degree),
            # ``or slug`` also covers an explicit ``label: None``.
            "label": G.nodes[node_id].get("label") or slug,
        })
    return out


def _read_default_config_raw() -> dict[str, Any]:
    """Return the default config as a plain dict.

    Prefers ``ctx_config._read_default_config()``; if that import or call
    fails for any reason, falls back to a ``config.json`` file next to this
    module. Anything unreadable or not a JSON object yields ``{}``.
    """
    try:
        from ctx_config import _read_default_config  # type: ignore

        raw = _read_default_config()
        return raw if isinstance(raw, dict) else {}
    except Exception:  # noqa: BLE001 — best effort; fall back to the bundled file
        path = Path(__file__).with_name("config.json")
        if not path.exists():
            return {}
        try:
            raw = json.loads(path.read_text(encoding="utf-8"))
            return raw if isinstance(raw, dict) else {}
        except Exception:  # noqa: BLE001
            return {}


def _read_user_config_raw() -> dict[str, Any]:
    """Return the user's config overrides, or ``{}`` when absent or unreadable."""
    path = _user_config_path()
    if not path.exists():
        return {}
    try:
        raw = json.loads(path.read_text(encoding="utf-8"))
        return raw if isinstance(raw, dict) else {}
    except Exception:  # noqa: BLE001 — a broken user file must not break the dashboard
        return {}


def _deep_merge_config(base: dict[str, Any], override: dict[str, Any]) -> None:
    """Merge *override* into *base* in place.

    Keys whose values are dicts on both sides are merged recursively; every
    other key in *override* replaces the value in *base*.
    """
    for key, value in override.items():
        if isinstance(base.get(key), dict) and isinstance(value, dict):
            _deep_merge_config(base[key], value)
        else:
            base[key] = value


def _config_value(raw: dict[str, Any], path: str, default: Any = None) -> Any:
    """Look up a dotted *path* in *raw*; return *default* if any segment is missing."""
    current: Any = raw
    for part in path.split("."):
        if not isinstance(current, dict) or part not in current:
            return default
        current = current[part]
    return current
def _set_config_value(raw: dict[str, Any], path: str, value: Any) -> None:
    """Assign *value* at the dotted *path* inside *raw*, in place.

    Intermediate segments that are missing, or that hold something other
    than a dict, are replaced by fresh dicts on the way down.
    """
    *ancestors, leaf = path.split(".")
    node = raw
    for key in ancestors:
        nested = node.get(key)
        if not isinstance(nested, dict):
            nested = {}
            node[key] = nested
        node = nested
    node[leaf] = value


def _delete_config_value(raw: dict[str, Any], path: str) -> None:
    """Remove the dotted *path* from *raw*, in place.

    Does nothing when an intermediate segment is missing or not a dict.
    After the leaf is removed, ancestor dicts left empty are pruned from the
    innermost outwards.
    """
    *ancestors, leaf = path.split(".")
    trail: list[tuple[dict[str, Any], str]] = []
    node = raw
    for key in ancestors:
        nested = node.get(key)
        if not isinstance(nested, dict):
            return
        trail.append((node, key))
        node = nested
    node.pop(leaf, None)

    # Walk back up, dropping containers the removal left empty.
    while trail:
        holder, key = trail.pop()
        entry = holder.get(key)
        if isinstance(entry, dict) and not entry:
            holder.pop(key, None)
def _config_field_specs() -> tuple[dict[str, Any], ...]:
    """Return the table of config fields that can be edited.

    Each entry is a dict with these keys:

    * ``group``: section heading the field is listed under.
    * ``path``: dotted location of the value inside the config dict.
    * ``type``: one of ``"choice"``, ``"int"``, ``"float"``, ``"bool"`` or
      ``"str"``; ``_coerce_config_value`` branches on it.
    * ``choices``: allowed values (``"choice"`` fields only).
    * ``min`` / ``max``: inclusive bounds checked by ``_coerce_config_value``
      (numeric fields only).
    * ``step``: increment hint (``"float"`` fields only).
    * ``required``: present and ``True`` on some fields.
    * ``label`` / ``help`` / ``example``: descriptive text and a sample value.

    ``_save_config_updates`` keys this table by ``path`` and rejects any
    update whose key is not listed here.
    """
    return (
        # ── Knowledge ──────────────────────────────────────────────
        {"group": "Knowledge", "path": "knowledge.mode", "type": "choice",
         "choices": ("shipped", "local", "enriched"), "required": True,
         "label": "Knowledge source mode",
         "help": "shipped uses ctx's packaged graph/wiki, local stays private, enriched starts from shipped knowledge and adds your own.",
         "example": "enriched"},
        # ── Recommendation ─────────────────────────────────────────
        {"group": "Recommendation", "path": "resolver.recommendation_top_k", "type": "int",
         "min": 1, "max": 5, "required": True,
         "label": "Max mixed recommendations",
         "help": "Hard cap for the combined skills/agents/MCP recommendation bundle.",
         "example": 5},
        {"group": "Recommendation", "path": "resolver.recommendation_min_normalized_score", "type": "float",
         "min": 0.0, "max": 1.0, "step": 0.01, "required": True,
         "label": "Minimum recommendation score",
         "help": "Drops weak skill/agent/MCP matches instead of recommending at all cost.",
         "example": 0.30},
        {"group": "Recommendation", "path": "resolver.max_skills", "type": "int",
         "min": 1, "max": 50,
         "label": "Resolver hard skill ceiling",
         "help": "Maximum load candidates considered by a resolver call.",
         "example": 15},
        # ── Harness ────────────────────────────────────────────────
        {"group": "Harness", "path": "harness.recommendation_min_fit_score", "type": "float",
         "min": 0.0, "max": 1.0, "step": 0.01, "required": True,
         "label": "Minimum harness fit score",
         "help": "Custom/API/local model users only see harnesses at or above this fit floor.",
         "example": 0.85},
        {"group": "Harness", "path": "harness.recommendation_min_normalized_score", "type": "float",
         "min": 0.0, "max": 1.0, "step": 0.01,
         "label": "Harness normalized score floor",
         "help": "Compatibility display floor for older configs.",
         "example": 0.85},
        # ── Micro-skills ───────────────────────────────────────────
        {"group": "Micro-skills", "path": "skill_transformer.line_threshold", "type": "int",
         "min": 1, "max": 2000, "required": True,
         "label": "Micro-skill line threshold",
         "help": "Any SKILL.md above this many lines triggers the micro-skills conversion gate.",
         "example": 180},
        {"group": "Micro-skills", "path": "skill_transformer.max_stage_lines", "type": "int",
         "min": 1, "max": 300,
         "label": "Max staged reference lines",
         "help": "Target maximum lines for each generated reference stage.",
         "example": 40},
        {"group": "Micro-skills", "path": "skill_transformer.stage_count", "type": "int",
         "min": 1, "max": 20,
         "label": "Stage count",
         "help": "Target number of staged references for long skills.",
         "example": 5},
        # ── Graph ──────────────────────────────────────────────────
        {"group": "Graph", "path": "graph.min_edge_weight", "type": "float",
         "min": 0.0, "max": 1.0, "step": 0.01, "required": True,
         "label": "Minimum final edge weight",
         "help": "Edges below this blended score are dropped from graph.json during rebuild.",
         "example": 0.03},
        {"group": "Graph", "path": "graph.edge_weights.semantic", "type": "float",
         "min": 0.0, "max": 1.0, "step": 0.01, "required": True,
         "label": "Semantic edge weight",
         "help": "Semantic portion of the blended edge score. Semantic/tags/slug tokens should sum to 1.",
         "example": 0.70},
        {"group": "Graph", "path": "graph.edge_weights.tags", "type": "float",
         "min": 0.0, "max": 1.0, "step": 0.01, "required": True,
         "label": "Tag edge weight",
         "help": "Tag-overlap portion of the blended edge score.",
         "example": 0.15},
        {"group": "Graph", "path": "graph.edge_weights.slug_tokens", "type": "float",
         "min": 0.0, "max": 1.0, "step": 0.01, "required": True,
         "label": "Slug-token edge weight",
         "help": "Slug-token overlap portion of the blended edge score.",
         "example": 0.15},
        {"group": "Graph", "path": "graph.semantic.top_k", "type": "int",
         "min": 1, "max": 200,
         "label": "Semantic neighbors per entity",
         "help": "Maximum nearest semantic neighbors retained per entity during graph build.",
         "example": 20},
        {"group": "Graph", "path": "graph.semantic.build_floor", "type": "float",
         "min": 0.0, "max": 1.0, "step": 0.01,
         "label": "Semantic build floor",
         "help": "Low inclusion bar used when graph embeddings are rebuilt.",
         "example": 0.50},
        {"group": "Graph", "path": "graph.semantic.min_cosine", "type": "float",
         "min": 0.0, "max": 1.0, "step": 0.01, "required": True,
         "label": "Semantic display floor",
         "help": "Read-time semantic filter. Raising this is stricter without forcing a rebuild.",
         "example": 0.80},
        {"group": "Graph", "path": "graph.tag_edges.dense_tag_threshold", "type": "int",
         "min": 1, "max": 10000,
         "label": "Dense tag cutoff",
         "help": "Tags shared by more than this many entities do not create broad noisy cliques.",
         "example": 500},
        {"group": "Graph", "path": "graph.token_edges.dense_token_threshold", "type": "int",
         "min": 1, "max": 10000,
         "label": "Dense slug-token cutoff",
         "help": "Slug words shared by too many entities are ignored as edge creators.",
         "example": 30},
        # ── Intake ─────────────────────────────────────────────────
        {"group": "Intake", "path": "intake.enabled", "type": "bool", "required": True,
         "label": "Intake quality gate",
         "help": "Runs duplicate/near-duplicate and body-quality checks when entities are added or updated.",
         "example": True},
        {"group": "Intake", "path": "intake.dup_threshold", "type": "float",
         "min": 0.0, "max": 1.0, "step": 0.01,
         "label": "Duplicate threshold",
         "help": "Similarity at or above this is treated as a duplicate.",
         "example": 0.93},
        {"group": "Intake", "path": "intake.near_dup_threshold", "type": "float",
         "min": 0.0, "max": 1.0, "step": 0.01,
         "label": "Near-duplicate threshold",
         "help": "Similarity at or above this asks the user to update/merge instead of blindly adding.",
         "example": 0.80},
        # ── Paths ──────────────────────────────────────────────────
        {"group": "Paths", "path": "paths.wiki_dir", "type": "str", "required": True,
         "label": "Wiki directory",
         "help": "Runtime llm-wiki directory used by dashboard, graph, and recommendation flows.",
         "example": "~/.claude/skill-wiki"},
        {"group": "Paths", "path": "paths.skills_dir", "type": "str", "required": True,
         "label": "Skills directory",
         "help": "Installed local skills directory.",
         "example": "~/.claude/skills"},
        {"group": "Paths", "path": "paths.agents_dir", "type": "str", "required": True,
         "label": "Agents directory",
         "help": "Installed local agents directory.",
         "example": "~/.claude/agents"},
    )
Raising this is stricter without forcing a rebuild.", "example": 0.80}, {"group": "Graph", "path": "graph.tag_edges.dense_tag_threshold", "type": "int", "min": 1, "max": 10000, "label": "Dense tag cutoff", "help": "Tags shared by more than this many entities do not create broad noisy cliques.", "example": 500}, {"group": "Graph", "path": "graph.token_edges.dense_token_threshold", "type": "int", "min": 1, "max": 10000, "label": "Dense slug-token cutoff", "help": "Slug words shared by too many entities are ignored as edge creators.", "example": 30}, {"group": "Intake", "path": "intake.enabled", "type": "bool", "required": True, "label": "Intake quality gate", "help": "Runs duplicate/near-duplicate and body-quality checks when entities are added or updated.", "example": True}, {"group": "Intake", "path": "intake.dup_threshold", "type": "float", "min": 0.0, "max": 1.0, "step": 0.01, "label": "Duplicate threshold", "help": "Similarity at or above this is treated as a duplicate.", "example": 0.93}, {"group": "Intake", "path": "intake.near_dup_threshold", "type": "float", "min": 0.0, "max": 1.0, "step": 0.01, "label": "Near-duplicate threshold", "help": "Similarity at or above this asks the user to update/merge instead of blindly adding.", "example": 0.80}, {"group": "Paths", "path": "paths.wiki_dir", "type": "str", "required": True, "label": "Wiki directory", "help": "Runtime llm-wiki directory used by dashboard, graph, and recommendation flows.", "example": "~/.claude/skill-wiki"}, {"group": "Paths", "path": "paths.skills_dir", "type": "str", "required": True, "label": "Skills directory", "help": "Installed local skills directory.", "example": "~/.claude/skills"}, {"group": "Paths", "path": "paths.agents_dir", "type": "str", "required": True, "label": "Agents directory", "help": "Installed local agents directory.", "example": "~/.claude/agents"}, ) _CONFIG_REMOVE = object() def _coerce_config_value(spec: dict[str, Any], raw_value: Any) -> Any: if raw_value is None or 
def _coerce_config_value(spec: dict[str, Any], raw_value: Any) -> Any:
    """Coerce one submitted config value to the type its field spec declares.

    Args:
        spec: Field spec mapping. ``type`` selects the coercion (``bool``,
            ``int``, ``float``, ``choice``; anything else is handled as a
            string), ``choices`` lists the allowed ``choice`` values and the
            optional ``min``/``max`` bound numeric values. ``path`` is only
            read to build error messages.
        raw_value: The value as submitted.

    Returns:
        The coerced value, or ``_CONFIG_REMOVE`` when the submission is
        ``None`` or blank.

    Raises:
        ValueError: The value cannot be read as the declared type, is not a
            whole number for an ``int`` field, is NaN or infinite, is not an
            allowed choice, or lies outside ``min``/``max``.
    """
    if raw_value is None or (isinstance(raw_value, str) and not raw_value.strip()):
        return _CONFIG_REMOVE

    kind = spec.get("type", "str")

    if kind == "bool":
        if isinstance(raw_value, bool):
            return raw_value
        text = str(raw_value).strip().lower()
        if text in {"true", "1", "yes", "on"}:
            return True
        if text in {"false", "0", "no", "off"}:
            return False
        raise ValueError(f"{spec['path']} must be true or false")

    if kind == "choice":
        choice_value = str(raw_value).strip()
        if choice_value not in spec.get("choices", ()):
            raise ValueError(f"{spec['path']} must be one of {spec.get('choices')}")
        return choice_value

    if kind not in ("int", "float"):
        text_value = str(raw_value).strip()
        return text_value if text_value else _CONFIG_REMOVE

    expected = "an integer" if kind == "int" else "a number"
    # bool is a subclass of int, so True/False would otherwise coerce to 1/0.
    if isinstance(raw_value, bool):
        raise ValueError(f"{spec['path']} must be {expected}")

    value: int | float
    if kind == "int":
        # int() truncates 1.9 to 1 without complaint; NaN and the infinities
        # also fail is_integer(), so they are rejected here as well.
        if isinstance(raw_value, float) and not raw_value.is_integer():
            raise ValueError(f"{spec['path']} must be {expected}")
        try:
            value = int(raw_value)
        except (ValueError, OverflowError):
            raise ValueError(f"{spec['path']} must be {expected}") from None
    else:
        try:
            value = float(raw_value)
        except (ValueError, OverflowError):
            raise ValueError(f"{spec['path']} must be {expected}") from None
        # NaN compares False against every bound, so it would slip through
        # the min/max checks below and end up in the saved config.
        if not math.isfinite(value):
            raise ValueError(f"{spec['path']} must be a finite number")

    if "min" in spec and value < spec["min"]:
        raise ValueError(f"{spec['path']} must be >= {spec['min']}")
    if "max" in spec and value > spec["max"]:
        raise ValueError(f"{spec['path']} must be <= {spec['max']}")
    return value


def _effective_config_payload() -> dict[str, Any]:
    """Return the shipped defaults, the user overrides and their merge.

    The result has four keys: ``defaults``, ``user``, ``effective`` (defaults
    with the user overrides merged on top) and ``path`` (the user config file
    location as a string).
    """
    defaults = _read_default_config_raw()
    user = _read_user_config_raw()
    # JSON round-trip = deep copy, so the merge below works on a separate
    # object from the ``defaults`` value that is returned alongside it.
    effective = json.loads(json.dumps(defaults))
    _deep_merge_config(effective, user)
    return {
        "defaults": defaults,
        "user": user,
        "effective": effective,
        "path": str(_user_config_path()),
    }


def _save_config_updates(updates: dict[str, Any]) -> dict[str, Any]:
    """Validate ``updates`` and write them into the user config file.

    Args:
        updates: Mapping of dotted config path to the submitted raw value.
            A value that coerces to ``_CONFIG_REMOVE`` deletes that override.

    Returns:
        ``{"ok": bool, "detail": str}``. Unknown paths and values that fail
        validation are reported with ``ok`` set to ``False`` and nothing is
        written.
    """
    specs = {spec["path"]: spec for spec in _config_field_specs()}
    unknown = sorted(set(updates) - set(specs))
    if unknown:
        return {"ok": False, "detail": f"unknown config keys: {', '.join(unknown)}"}

    # NOTE(review): the config is read before file_lock is taken, so two
    # concurrent saves can overwrite each other's keys. Moving the read under
    # the lock needs a check that _read_user_config_raw does not lock itself.
    user_config = _read_user_config_raw()
    try:
        for path, raw_value in updates.items():
            value = _coerce_config_value(specs[path], raw_value)
            if value is _CONFIG_REMOVE:
                _delete_config_value(user_config, path)
            else:
                _set_config_value(user_config, path, value)
    except (TypeError, ValueError) as exc:
        return {"ok": False, "detail": str(exc)}

    config_path = _user_config_path()
    with file_lock(config_path):
        _atomic_write_text(
            config_path,
            json.dumps(user_config, indent=2, sort_keys=True) + "\n",
        )
    return {"ok": True, "detail": f"saved {len(updates)} config keys"}
value) except (TypeError, ValueError) as exc: return {"ok": False, "detail": str(exc)} config_path = _user_config_path() with file_lock(config_path): _atomic_write_text( config_path, json.dumps(user_config, indent=2, sort_keys=True) + "\n", ) return {"ok": True, "detail": f"saved {len(updates)} config keys"} def _render_config() -> str: payload = _effective_config_payload() effective = payload["effective"] user = payload["user"] rows_by_group: dict[str, list[str]] = defaultdict(list) for spec in _config_field_specs(): path = spec["path"] value = _config_value(effective, path, "") default = _config_value(payload["defaults"], path, "") user_value = _config_value(user, path, _CONFIG_REMOVE) is_override = user_value is not _CONFIG_REMOVE required = bool(spec.get("required")) req_html = " Required" if required else "" help_text = html.escape(str(spec.get("help", ""))) default_html = html.escape(json.dumps(default) if not isinstance(default, str) else default) example_value = spec.get("example") example_html = html.escape(json.dumps(example_value) if not isinstance(example_value, str) else str(example_value)) common_attrs = ( f"name='{html.escape(path)}' data-config-path='{html.escape(path)}' " f"data-original-value='{html.escape(str(value))}' " f"data-default='{default_html}' {'required' if required else ''}" ) if spec.get("type") == "choice": options = "".join( f"" for choice in spec.get("choices", ()) ) control = f"" elif spec.get("type") == "bool": control = ( f"" ) elif spec.get("type") in {"int", "float"}: step = spec.get("step", 1 if spec.get("type") == "int" else 0.01) control = ( f"" ) else: control = ( f"" ) override_html = ( "override" if is_override else "default" ) clear_html = ( f"" if is_override else "" ) rows_by_group[str(spec["group"])].append( "
" f"" f"
{control}
" f"{clear_html}" f"

{help_text}
" f"Default: {default_html} · Example: {example_html} · " f"{override_html}

" "
" ) group_html = "".join( "
" f"

{html.escape(group)}

" + "".join(rows) + "
" for group, rows in rows_by_group.items() ) token = _MONITOR_TOKEN or "" body = ( "

Config

" "

Edit ctx runtime defaults from the dashboard. Saves only changed fields. For existing overrides, use remove user override to fall back to the shipped default. Important fields are marked Required.

" f"

User config: {html.escape(payload['path'])}

" "
" + group_html + "
" " " " " "" "
" "" ) return _layout("Config", body) def _render_graph(focus: str | None = None, focus_type: str | None = None) -> str: """Interactive graph view backed by a dependency-free SVG renderer.""" focus_slug = focus or "" gstats = _graph_stats() seeds = ( _top_degree_seeds(allow_load=False) if not focus_slug and gstats.get("available") else [] ) initial_slug = focus_slug initial_type = focus_type or "" if not initial_slug and seeds: initial_slug = str(seeds[0].get("slug") or "") initial_type = str(seeds[0].get("type") or "") elif not initial_slug and gstats.get("available"): initial_slug = _DEFAULT_GRAPH_FOCUS_SLUG focus_js = _json_for_script(initial_slug) focus_type_js = _json_for_script(initial_type) seed_html = "" if seeds: chips = "".join( f"" f"{html.escape(s['slug'])} " f"· deg {_format_count(s['degree'])}" f"" for s in seeds ) seed_html = ( "
Popular seed slugs " "" "(click to explore 1-hop neighborhood)" f"
{chips}
" ) stats_html = ( f"{gstats.get('nodes', 0):,} nodes · " f"{gstats.get('edges', 0):,} edges" ) body = ( "

Knowledge graph

" f"

Enter an entity slug to explore its 1-hop " f"neighborhood. Edges blend semantic + tag + slug-token " f"signals (weight = final_weight). {stats_html}

" + seed_html # Two-column layout — filter sidebar on the left (mirrors /wiki), # graph list on the right. Client-side JS hides nodes by # type + tag without hitting the server so a user can carve out # a subgraph without rebuilding anything. + "
" # Left sidebar "" # Right: graph list panel "
" "
" "" ) return _layout("Graph", body) def _runtime_graph_center_data(graph: dict) -> dict[str, Any] | None: center = str(graph.get("center") or "") if not center: return None for node in graph.get("nodes", []): if not isinstance(node, dict): continue data = node.get("data", {}) if isinstance(data, dict) and str(data.get("id") or "") == center: return data return None def _runtime_graph_metric_row(label: str, value: object) -> str: if value is None or value == "": value_html = "unknown" elif isinstance(value, float): value_html = f"{value:.3f}" else: value_html = f"{html.escape(str(value))}" return f"{html.escape(label)}{value_html}" def _render_runtime_entity_action( slug: str, entity_type: str, *, mutations_enabled: bool, ) -> str: escaped_slug = html.escape(slug) escaped_type = html.escape(entity_type) if entity_type == "harness": return ( "
" "

Install harness

" "

Harnesses are installed through the harness CLI so ctx can " "collect the model, goal, and verification details before wiring recommendations.

" f"
ctx-harness-install {escaped_slug} --dry-run\n"
            f"ctx-harness-install {escaped_slug}
" "
" ) disabled = " disabled" if not mutations_enabled else "" disabled_note = ( "

Load/install actions are disabled because this dashboard is not " "bound to loopback.

" if not mutations_enabled else "" ) return ( "
" "

Load or install

" "

Use this when the backing wiki contains the installable entity. " "If runtime mode only installed graph metadata, install the full wiki first.

" f"" f"{disabled_note}" "

" "
" ) def _render_runtime_entity_load_script( slug: str, entity_type: str, *, mutations_enabled: bool, ) -> str: return ( "" ) def _render_runtime_graph_entity( slug: str, entity_type: str | None = None, *, mutations_enabled: bool | None = None, ) -> str | None: """Render graph metadata when the fast runtime graph lacks a full wiki page.""" normalized_type = _normalize_dashboard_entity_type(entity_type) if entity_type else None if entity_type is not None and normalized_type is None: return None graph = _graph_neighborhood(slug, hops=1, limit=32, entity_type=normalized_type) data = _runtime_graph_center_data(graph) if data is None: return None node_id = str(data.get("id") or graph.get("center") or "") resolved_slug = _graph_slug_from_node_id(node_id) or slug resolved_type = _graph_type_from_node_id( node_id, str(data.get("type") or normalized_type or "skill"), ) label = _display_label(data.get("label"), fallback_slug=resolved_slug) display_slug = _display_slug(resolved_slug) description = str(data.get("description") or "").strip() tags = [str(tag) for tag in data.get("tags", []) if str(tag).strip()][:12] sidecar = _load_sidecar(resolved_slug, entity_type=resolved_type) quality_score = data.get("quality_score") usage_score = data.get("usage_score") degree = data.get("degree") mutations = _MONITOR_MUTATIONS_ENABLED if mutations_enabled is None else mutations_enabled tag_html = ( "".join(f"{html.escape(tag)} " for tag in tags) if tags else "no tags in runtime graph" ) description_html = ( f"

{html.escape(description)}

" if description else "

No description is present in the runtime graph metadata.

" ) quality_summary = ( "
" "Runtime graph entity " f"{html.escape(resolved_type)} " f"node {html.escape(node_id)}" "
" ) overview_html = ( "
" "
" "

Runtime graph entity

" + description_html + "

Tags

" + f"

{tag_html}

" + "

Full wiki page

" + "

This entity exists in the installed runtime graph, but its full " "Markdown wiki page is not expanded locally. The graph and recommendation paths still " "work. Install the full wiki when you want the complete body/docs in this dashboard.

" + "
ctx-init --graph --graph-install-mode full
" + "
" "
Runtime metadata" "" + _runtime_graph_metric_row("slug", display_slug) + _runtime_graph_metric_row("type", resolved_type) + _runtime_graph_metric_row("node_id", node_id) + _runtime_graph_metric_row("quality_score", quality_score) + _runtime_graph_metric_row("usage_score", usage_score) + _runtime_graph_metric_row("degree", degree) + "
FieldValue
" "
" + _render_runtime_entity_action( resolved_slug, resolved_type, mutations_enabled=mutations, ) ) quality_html = ( _render_quality_drilldown(sidecar) if isinstance(sidecar, dict) else ( "
" "

Runtime graph quality

" "

No full quality sidecar is installed for this entity. " "The runtime graph still exposes the ranking signals available at graph build time.

" "" + _runtime_graph_metric_row("quality_score", quality_score) + _runtime_graph_metric_row("usage_score", usage_score) + _runtime_graph_metric_row("degree", degree) + "
SignalValue
" ) ) body = ( f"

{html.escape(label)}

" + quality_summary + _render_entity_tabs( overview_html=overview_html, subgraph_html=_render_entity_subgraph(resolved_slug, entity_type=resolved_type), quality_html=quality_html, ) + _render_runtime_entity_load_script( resolved_slug, resolved_type, mutations_enabled=mutations, ) ) return _layout(label, body) def _render_wiki_entity( slug: str, entity_type: str | None = None, *, mutations_enabled: bool | None = None, ) -> str: """Render one wiki entity page (frontmatter + body).""" path = _wiki_entity_path(slug, entity_type=entity_type) if path is None: runtime_html = _render_runtime_graph_entity( slug, entity_type=entity_type, mutations_enabled=mutations_enabled, ) if runtime_html is not None: return runtime_html return _layout( slug, f"

{html.escape(slug)}

" f"

No wiki page found for {html.escape(slug)}. " f"Try the skills index.

", ) try: raw = path.read_text(encoding="utf-8", errors="replace") except OSError as exc: return _layout( slug, f"

{html.escape(slug)}

read error: {html.escape(str(exc))}

", ) meta, md_body = _parse_frontmatter(raw) sidecar = _load_sidecar(slug, entity_type=entity_type) display_slug = _display_slug(slug) type_suffix = ( f"&type={html.escape(entity_type)}" if entity_type in _DASHBOARD_ENTITY_TYPES else "" ) fm_row_parts = [] for k, v in sorted(meta.items()): value, truncated = _truncate_text(_frontmatter_text(v), 120) marker = " (truncated)" if truncated else "" fm_row_parts.append( f"{html.escape(k)}" f"{html.escape(value)}{marker}" ) fm_rows = "".join(fm_row_parts) quality_summary_html = "" if sidecar is not None: quality_summary_html = ( "
" f"Quality " f"{html.escape(sidecar.get('grade', 'F'))} " f"score {sidecar.get('raw_score', 0.0):.3f}" f"{' · floor ' + html.escape(sidecar.get('hard_floor','')) if sidecar.get('hard_floor') else ''}" f"
" ) md_body_without_quality, embedded_quality_markdown = _extract_embedded_quality_block(md_body) display_body = _strip_duplicate_wiki_heading(md_body_without_quality, slug) body_preview, body_truncated = _truncate_text(display_body, 12000) body_html = _render_wiki_markdown(body_preview) body_truncated_html = ( "

Body preview truncated at 12,000 characters.

" if body_truncated else "" ) overview_html = ( "
" f"
{body_html}" f"{body_truncated_html}
" f"
Frontmatter" "" "" + (fm_rows or "") + "
FieldValue
none
" "
" ) subgraph_html = _render_entity_subgraph(slug, entity_type=entity_type) quality_html = _render_quality_drilldown(sidecar, embedded_quality_markdown) body = ( f"

{html.escape(display_slug)}

" + quality_summary_html + _render_entity_tabs( overview_html=overview_html, subgraph_html=subgraph_html, quality_html=quality_html, ) ) return _layout(display_slug, body) def _wiki_index_entries( limit_per_type: int | None = _WIKI_INDEX_LIMIT_PER_TYPE, ) -> list[dict]: """List every wiki entity page under ~/.claude/skill-wiki/entities/. Returns ``{slug, type, tags, description}`` rows. The full skill inventory is too large to render as one HTML page, so the dashboard samples a bounded number of pages per entity type. """ indexed = _wiki_index_entries_from_dashboard_index(limit_per_type) if indexed is not None: return indexed base = _wiki_dir() / "entities" if not base.is_dir(): return [] # MCPs are sharded (one dir per first-char) so we glob recursively; # all other dashboard entity types are flat. sources = _DASHBOARD_ENTITY_SOURCES out: list[dict] = [] for sub, entity_type, recursive in sources: d = base / sub if not d.is_dir(): continue paths = sorted( d.rglob("*.md") if recursive else d.glob("*.md"), key=lambda path: (path.stem.lower(), path.relative_to(d).as_posix().lower()), ) seen_for_type = 0 for path in paths: if limit_per_type is not None and seen_for_type >= limit_per_type: break slug = path.stem if not _is_safe_slug(slug): continue try: # Read only the first ~2 KB — enough for frontmatter. 
def _wiki_index_entries(
    limit_per_type: int | None = _WIKI_INDEX_LIMIT_PER_TYPE,
) -> list[dict]:
    """List wiki entity pages found under ``<wiki dir>/entities/``.

    The dashboard index is used when it yields a result; otherwise the entity
    folders are scanned on disk. The full inventory is too large to render as
    one HTML page, so at most ``limit_per_type`` pages are returned per entity
    type (``None`` disables the cap).

    Returns:
        Rows with ``slug``, ``display_slug``, ``type``, ``tags`` (first six),
        ``search_tags`` (all tags) and ``description`` keys. Rows served from
        the dashboard index are returned exactly as that lookup produced them.
    """
    indexed = _wiki_index_entries_from_dashboard_index(limit_per_type)
    if indexed is not None:
        return indexed

    base = _wiki_dir() / "entities"
    if not base.is_dir():
        return []

    out: list[dict] = []
    # MCPs are sharded (one dir per first-char) so we glob recursively;
    # all other dashboard entity types are flat.
    for sub, entity_type, recursive in _DASHBOARD_ENTITY_SOURCES:
        folder = base / sub
        if not folder.is_dir():
            continue
        pages = sorted(
            folder.rglob("*.md") if recursive else folder.glob("*.md"),
            key=lambda page: (
                page.stem.lower(),
                page.relative_to(folder).as_posix().lower(),
            ),
        )
        seen_for_type = 0
        for page in pages:
            if limit_per_type is not None and seen_for_type >= limit_per_type:
                break
            slug = page.stem
            if not _is_safe_slug(slug):
                continue
            try:
                # Frontmatter sits at the top of the page: read at most 2048
                # characters rather than loading the whole file and slicing.
                with page.open(encoding="utf-8", errors="replace") as handle:
                    head = handle.read(2048)
            except OSError:
                continue
            meta, _ = _parse_frontmatter(head)
            all_tags = _frontmatter_tags(meta.get("tags", ""), limit=None)
            description, _truncated = _truncate_text(
                _frontmatter_text(meta.get("description", "")),
                200,
            )
            out.append({
                "slug": slug,
                "display_slug": _display_slug(slug),
                "type": entity_type,
                "tags": all_tags[:6],
                "search_tags": all_tags,
                "description": description,
            })
            seen_for_type += 1
    return out
ORDER BY lower(label), id" + limit_sql, params, ) for ( node_id, label, row_type, tags_raw, description_raw, quality_score, ) in rows: node_id_text = str(node_id) slug = ( node_id_text.split(":", 1)[1] if ":" in node_id_text else str(label) ) if not _is_safe_slug(slug): continue try: parsed_tags = json.loads(str(tags_raw or "[]")) except json.JSONDecodeError: parsed_tags = [] all_tags = [ str(tag) for tag in parsed_tags if isinstance(tag, str) ] description, _truncated = _truncate_text( _frontmatter_text(description_raw), 200, ) out.append({ "slug": slug, "display_slug": _display_slug(str(label or slug)), "type": str(row_type or entity_type), "tags": all_tags[:6], "search_tags": all_tags, "description": description, "grade": _grade_from_quality_score(quality_score), }) finally: conn.close() except (OSError, sqlite3.Error, ValueError, TypeError): return None return out def _grade_from_quality_score(value: Any) -> str: try: score = float(value) except (TypeError, ValueError): return "" if score >= 0.80: return "A" if score >= 0.60: return "B" if score >= 0.40: return "C" if score >= 0.0: return "D" return "" def _wiki_render_cache_key( selected_type: str | None, query: str, ) -> tuple[Any, ...] 
| None: index_path = _dashboard_graph_index_path() if not index_path.is_file() or not _dashboard_index_matches_manifest(index_path): return None try: index_stat = index_path.stat() source_stat = Path(__file__).stat() except OSError: return None try: css_hash = hashlib.sha256( _monitor_asset_text("monitor.css").encode("utf-8") ).hexdigest() except Exception: css_hash = "" return ( "wiki-index-v1", selected_type or "", query, str(index_path.resolve()), index_stat.st_mtime_ns, index_stat.st_size, _dashboard_graph_manifest_export_id() or "", source_stat.st_mtime_ns, source_stat.st_size, css_hash, ) def _disk_cache_token(cache_key: tuple[Any, ...]) -> str: return json.dumps(cache_key, separators=(",", ":"), sort_keys=True) def _read_disk_cache_payload(path: Path, cache_token: str) -> dict[str, Any] | None: try: data = json.loads(path.read_text(encoding="utf-8")) except (OSError, json.JSONDecodeError): return None if not isinstance(data, dict): return None if data.get("schema_version") != 1 or data.get("cache_token") != cache_token: return None return data def _write_disk_cache_payload( path: Path, cache_token: str, payload: dict[str, Any], *, sort_keys: bool = False, ) -> None: try: _atomic_write_text( path, json.dumps( { "schema_version": 1, "cache_token": cache_token, **payload, }, ensure_ascii=False, sort_keys=sort_keys, ) + "\n", encoding="utf-8", ) except (OSError, TypeError, ValueError): return def _read_html_disk_cache(path: Path, cache_token: str) -> str | None: data = _read_disk_cache_payload(path, cache_token) if data is None: return None html_text = data.get("html") return html_text if isinstance(html_text, str) else None def _write_html_disk_cache(path: Path, cache_token: str, html_text: str) -> None: _write_disk_cache_payload(path, cache_token, {"html": html_text}) def _wiki_render_disk_cache_path() -> Path: return _claude_dir() / ".ctx-monitor-wiki-cache.json" def _render_wiki_index(entity_type: str | None = None, query: str = "") -> str: """Card grid of 
every wiki entity — search + type filter + sidecar grades.""" selected_type = _normalize_dashboard_entity_type(entity_type) if entity_type else None if entity_type is not None and selected_type is None: return _layout( "Wiki", f"
Unsupported entity type: {html.escape(entity_type)}
", ) initial_query = query.strip() cache_key = _wiki_render_cache_key(selected_type, initial_query) global _WIKI_RENDER_CACHE_KEY, _WIKI_RENDER_CACHE_VALUE if cache_key is not None: if _WIKI_RENDER_CACHE_KEY == cache_key and _WIKI_RENDER_CACHE_VALUE is not None: return _WIKI_RENDER_CACHE_VALUE cache_token = _disk_cache_token(cache_key) cached = _read_html_disk_cache(_wiki_render_disk_cache_path(), cache_token) if cached is not None: _WIKI_RENDER_CACHE_KEY = cache_key _WIKI_RENDER_CACHE_VALUE = cached return cached else: cache_token = "" entries = _wiki_index_entries() wstats = _wiki_stats() total_available = int(wstats.get("total") or len(entries)) # Join with grade pills where a sidecar exists. grade_by_key: dict[tuple[str, str], str] = {} for entry in entries: slug = str(entry["slug"]) row_type = str(entry["type"]) grade = str(entry.get("grade") or "") if grade: grade_by_key[(slug, row_type)] = grade continue sidecar = _load_sidecar(slug, entity_type=row_type) if sidecar: grade_by_key[(slug, row_type)] = str(sidecar.get("grade") or "") type_counts = { "skill": int(wstats.get("skills") or 0), "agent": int(wstats.get("agents") or 0), "mcp-server": int(wstats.get("mcps") or 0), "harness": int(wstats.get("harnesses") or 0), } suggestions = "".join( f"" for entity_type in _DASHBOARD_ENTITY_TYPES ) initial_json = _json_for_script(initial_results) read_only = ( "" if mutations_enabled else ( "
Read-only mode. Catalog edits are " "disabled because ctx-monitor is not bound to a loopback address.
" ) ) disabled = "" if mutations_enabled else " disabled" body = ( "

Manage catalog

" "

Search skills, agents, MCP servers, and harnesses. " "Edit the wiki page, delete stale entries, or add a new entity. Saves " "write into ~/.claude/skill-wiki/entities/ and queue graph " "refresh work for the knowledge graph.

" + read_only + "
" "
" "

Search catalog

" "
" "" "" "
" "

Loading...

" "
" "
" "
" "

Add or update entity

" "
" "
" "" "" "" "" "" "" "" "
" "
" f"" "" f"" "
" "

" "
" "
" "
" "" + _monitor_inline_script("monitor-manage.js") ) return _layout("Manage catalog", body) def _harness_wizard_entries(limit: int = 24) -> list[dict[str, Any]]: """Return catalog harness pages for the manual dashboard wizard.""" harness_dir = _wiki_dir() / "entities" / "harnesses" if not harness_dir.is_dir(): return [] rows: list[dict[str, Any]] = [] for path in sorted(harness_dir.glob("*.md"), key=lambda p: p.stem.lower()): if len(rows) >= limit: break slug = path.stem if not _is_safe_slug(slug): continue try: head = path.read_text(encoding="utf-8", errors="replace")[:4096] except OSError: continue meta, _body = _parse_frontmatter(head) sidecar = _harness_wizard_sidecar(slug) or {} score = float(sidecar.get("raw_score", sidecar.get("score", 0.0)) or 0.0) tags = _frontmatter_tags(meta.get("tags", ""), limit=None) description, _truncated = _truncate_text( _frontmatter_text(meta.get("description", "")), 260, ) repo_url = _frontmatter_text( meta.get("repo_url") or meta.get("github_url") or meta.get("homepage_url") or "" ) rows.append({ "slug": slug, "title": _frontmatter_text(meta.get("title") or meta.get("name") or slug), "description": description, "tags": tags[:12], "score": score, "grade": str(sidecar.get("grade") or ""), "repo_url": repo_url, }) return sorted(rows, key=lambda row: (-float(row["score"]), str(row["slug"]))) def _harness_wizard_sidecar(slug: str) -> dict[str, Any] | None: """Load harness sidecar candidates without scanning every sidecar file.""" if not _is_safe_slug(slug): return None for path in ( _sidecar_dir() / f"{slug}.json", _sidecar_dir() / f"{slug}-harness.json", ): if not path.exists(): continue sidecar = _read_sidecar_file(path) if sidecar is None: continue if sidecar.get("slug") == slug and _sidecar_entity_type(sidecar) == "harness": return sidecar return None def _render_harness_wizard() -> str: """Manual harness interview for users who bring their own LLM.""" harnesses = _harness_wizard_entries() provider_options = ( "openai", 
"anthropic", "google", "openrouter", "ollama", "lm-studio", "local", "other", ) provider_html = "".join( f"" for provider in provider_options ) tool_options = ( ("files", "Files"), ("git", "Git"), ("shell", "Shell"), ("browser", "Browser"), ("http", "HTTP/network"), ("package-manager", "Package manager"), ("database", "Database"), ) tools_html = "".join( "" for value, label in tool_options ) harness_cards = "".join( "
" "
" f"{html.escape(row['title'])}" + ( f"{html.escape(row['grade'])}" if row["grade"] else "harness" ) + "
" f"

{html.escape(row['description'] or 'No description available.')}

" + ( "
" + " ".join(f"{html.escape(tag)}" for tag in row["tags"][:8]) + "
" if row["tags"] else "" ) + ( f"{html.escape(row['repo_url'])}" if row["repo_url"].startswith(("http://", "https://")) else "" ) + f"ctx-harness-install {html.escape(row['slug'])} --dry-run" + f"" + "
" for row in harnesses ) if not harness_cards: harness_cards = ( "

No harness pages were found under " "~/.claude/skill-wiki/entities/harnesses/. " "Use the no-fit PRD output below to build an attachable harness.

" ) body = ( "
" "
Model -> intent -> install -> attach ctx
" "

Harness Setup

" "

For users running their own API or local model instead of Claude Code. " "Interview the model/runtime choice, generate a real ctx harness recommendation command, " "then install a harness or produce a no-fit PRD for a custom harness.

" "local/API model path" "
" "
" "
1. ModelProvider, model slug, endpoint.
" "
2. IntentGoal, OS, access, privacy.
" "
3. InstallRecommend, dry-run, install.
" "
4. Attach ctxGraph/wiki recommendations flow into the harness.
" "
" "
" "
" "
1. Model" "
" "" "" "" "
" "
2. Goal and access" "
" "" "" "" "" "" "" "" "
" "
3. Recommend and install" "

The dashboard previews catalog matches. The command below calls the real harness recommender and keeps the no-fit path available.

" " " "" "
" "
" "" "
" "
" "
" "

Catalog harnesses

" "

Cards are filtered by the interview text. If none fit, use the no-fit PRD path.

" "0 matches" "
" "
" + harness_cards + "
" "
" "
" "

No-fit custom harness PRD

" "

When no catalog harness clears the configured match score, generate a PRD for the user's strong model or engineering team. It must include orchestration, durable state, permissions, verification gates, and ctx recommendation hooks.

" "
ctx-harness-install --recommend --goal \"...\" --model-provider openai --model openai/gpt-5.5 --plan-on-no-fit --plan-output custom-harness-prd.md
" "
" "" ) return _layout("Harness Setup", body) def _kpi_summary_cache_key(sidecar_dir: Path) -> tuple[Any, ...]: parts: list[tuple[str, str, int, int, int, int]] = [] for root in (sidecar_dir, sidecar_dir / "mcp"): try: root_name = str(root.resolve()) except OSError: root_name = str(root) buckets = { "quality": [0, 0, 0, 0], "lifecycle": [0, 0, 0, 0], } try: with os.scandir(root) as entries: for entry in entries: name = entry.name if ( name.startswith(".") or not name.endswith(".json") or not entry.is_file(follow_symlinks=False) ): continue bucket = ( "lifecycle" if name.endswith(".lifecycle.json") else "quality" ) try: stat = entry.stat(follow_symlinks=False) except OSError: continue data = buckets[bucket] data[0] += 1 data[1] += int(stat.st_size) data[2] = max(data[2], int(stat.st_mtime_ns)) data[3] = (data[3] + int(stat.st_mtime_ns)) & ((1 << 63) - 1) except OSError: pass for bucket, values in buckets.items(): parts.append((root_name, bucket, values[0], values[1], values[2], values[3])) return tuple(parts) def _kpi_summary_disk_cache_path(sidecar_dir: Path) -> Path: return sidecar_dir / ".dashboard-kpi-summary.json" def _dashboard_summary_from_dict(summary_cls: Any, data: Any) -> Any | None: if not isinstance(data, dict): return None def dict_field(name: str) -> dict[str, Any]: value = data.get(name) return dict(value) if isinstance(value, dict) else {} def list_field(name: str) -> list[dict[str, Any]]: value = data.get(name) if not isinstance(value, list): return [] return [dict(item) for item in value if isinstance(item, dict)] try: return summary_cls( generated_at=str(data.get("generated_at") or ""), total=int(data.get("total") or 0), by_subject=dict_field("by_subject"), grade_counts=dict_field("grade_counts"), lifecycle_counts=dict_field("lifecycle_counts"), category_breakdown=list_field("category_breakdown"), hard_floor_counts=dict_field("hard_floor_counts"), low_quality_candidates=list_field("low_quality_candidates"), archived=list_field("archived"), ) 
except (TypeError, ValueError): return None def _read_kpi_summary_disk_cache( sidecar_dir: Path, cache_token: str, summary_cls: Any, ) -> Any | None: data = _read_disk_cache_payload(_kpi_summary_disk_cache_path(sidecar_dir), cache_token) if data is None: return None return _dashboard_summary_from_dict(summary_cls, data.get("summary")) def _write_kpi_summary_disk_cache( sidecar_dir: Path, cache_token: str, summary: Any, ) -> None: _write_disk_cache_payload( _kpi_summary_disk_cache_path(sidecar_dir), cache_token, {"summary": summary.to_dict()}, sort_keys=True, ) def _kpi_summary(): """Compute the KPI DashboardSummary using the default source layout. Returns ``None`` if the kpi_dashboard module can't be imported or the required directories don't exist — the caller renders an explanatory empty state instead of failing. """ try: from kpi_dashboard import DashboardSummary # type: ignore from kpi_dashboard import generate # type: ignore from ctx_lifecycle import LifecycleSources # type: ignore except Exception: # noqa: BLE001 — KPIs are advisory return None sidecar_dir = _sidecar_dir() if not sidecar_dir.is_dir(): return None cache_key = _kpi_summary_cache_key(sidecar_dir) global _KPI_SUMMARY_CACHE_AT, _KPI_SUMMARY_CACHE_KEY, _KPI_SUMMARY_CACHE_VALUE if ( _KPI_SUMMARY_CACHE_KEY == cache_key and _KPI_SUMMARY_CACHE_VALUE is not None and time.monotonic() - _KPI_SUMMARY_CACHE_AT < _KPI_SUMMARY_CACHE_SECONDS ): return _KPI_SUMMARY_CACHE_VALUE cache_token = _disk_cache_token(cache_key) summary = _read_kpi_summary_disk_cache(sidecar_dir, cache_token, DashboardSummary) if summary is not None: _KPI_SUMMARY_CACHE_KEY = cache_key _KPI_SUMMARY_CACHE_VALUE = summary _KPI_SUMMARY_CACHE_AT = time.monotonic() return summary try: from ctx_config import cfg # type: ignore sources = LifecycleSources( skills_dir=cfg.skills_dir, agents_dir=cfg.agents_dir, sidecar_dir=sidecar_dir, ) except Exception: # noqa: BLE001 — fallback: sidecar-only sources = LifecycleSources( skills_dir=sidecar_dir, 
agents_dir=sidecar_dir, sidecar_dir=sidecar_dir, ) try: summary = generate(sources=sources, top_n=25) except Exception: # noqa: BLE001 return None _write_kpi_summary_disk_cache(sidecar_dir, cache_token, summary) _KPI_SUMMARY_CACHE_KEY = cache_key _KPI_SUMMARY_CACHE_VALUE = summary _KPI_SUMMARY_CACHE_AT = time.monotonic() return summary def _render_kpi() -> str: """HTML-rendered KPI dashboard — grades, lifecycle, categories, hard floors, top demotion candidates, archived entities. Mirrors the structure of ``kpi_dashboard.render_markdown`` so the commit-friendly Markdown digest and the browser view show the same numbers. """ summary = _kpi_summary() if summary is None or summary.total == 0: empty = ( "

KPIs

" "
No KPI data yet." "

" "The KPI dashboard reads from " "~/.claude/skill-quality/*.json and " "*.lifecycle.json. Run " "ctx-skill-quality recompute --all to populate " "sidecars, then reload this page.

" "

CLI equivalent: " "python -m kpi_dashboard render

" ) return _layout("KPIs", empty) total = summary.total # Grade distribution pills + detail table grade_pills = "".join( f"{g}: {summary.grade_counts.get(g, 0)} " for g in ("A", "B", "C", "D", "F") ) def pct(n: int) -> str: return f"{(100.0 * n / total):.1f}%" if total else "—" grade_rows = "".join( f"{g}" f"{summary.grade_counts.get(g, 0)}" f"{pct(summary.grade_counts.get(g, 0))}" for g in ("A", "B", "C", "D", "F") ) lifecycle_rows = "".join( f"{html.escape(state)}" f"{summary.lifecycle_counts.get(state, 0)}" for state in ("active", "watch", "demote", "archive") ) floor_rows = "".join( f"{html.escape(reason)}{count}" for reason, count in sorted( summary.hard_floor_counts.items(), key=lambda kv: (-kv[1], kv[0]), ) ) or "No hard floors active." category_rows = "".join( "" f"{html.escape(c['category'])}" f"{c['count']}" f"{c['avg_score']:.3f}" f"{c['grade_mix'].get('A', 0)}" f"{c['grade_mix'].get('B', 0)}" f"{c['grade_mix'].get('C', 0)}" f"{c['grade_mix'].get('D', 0)}" f"{c['grade_mix'].get('F', 0)}" "" for c in summary.category_breakdown ) or "No categorized entities." def detail_href(slug: str, entity_type: str) -> str: normalized = _normalize_dashboard_entity_type(entity_type) suffix = f"?type={html.escape(normalized)}" if normalized else "" return f"/skill/{html.escape(slug)}{suffix}" demotion_rows = "".join( "" f"{html.escape(c['slug'])}" f"{html.escape(c['subject_type'])}" f"{html.escape(c['category'])}" f"{html.escape(c['grade'])}" f"{c['score']:.3f}" f"{html.escape(c['lifecycle_state'])}" f"{c['consecutive_d_count']}" f"{html.escape(c.get('hard_floor') or '—')}" "" for c in summary.low_quality_candidates ) or "No active D/F grade entries — corpus is healthy." archived_rows = "".join( "" f"{html.escape(a['slug'])}" f"{html.escape(a['subject_type'])}" f"{html.escape(a['category'])}" f"{html.escape(a.get('last_grade') or '—')}" f"{html.escape(a.get('computed_at') or '—')}" "" for a in summary.archived ) or "None." 
by_subject = summary.by_subject subject_blurb = " · ".join( f"{html.escape(s)}: {n}" for s, n in sorted(by_subject.items()) ) or "—" body = ( "

KPIs

" "

Aggregated from " "~/.claude/skill-quality/*.json (quality sidecars) " "and *.lifecycle.json (tier sidecars). " f"Generated {html.escape(summary.generated_at)}.

" "
" f"Total entities: {total} " f"· {subject_blurb}" f"
{grade_pills}
" "" "
" "
" "
Grade distribution" "" + grade_rows + "
GradeCountShare
" "
Lifecycle tiers" "" + lifecycle_rows + "
StateCount
" "
" "
Hard floors active" "" + floor_rows + "
ReasonCount
" "
By category" "" "" + category_rows + "
CategoryCountAvg scoreABCDF
" "
Top demotion candidates " "(active or watch · grade D/F · sorted by D-streak desc, score asc)" "" "" + demotion_rows + "
SlugTypeCategoryGradeScoreStateD-streakHard floor
" "
Archived" "" "" + archived_rows + "
SlugTypeCategoryLast gradeComputed at
" ) return _layout("KPIs", body) def _render_status() -> str: """Render queue and graph/wiki artifact state for operator checks.""" status = _status_payload() queue = status["queue"] artifacts = status["artifacts"] counts = queue.get("counts", {}) count_pills = " ".join( f"{html.escape(name)}: {int(counts.get(name, 0))}" for name in ( wiki_queue.STATUS_PENDING, wiki_queue.STATUS_RUNNING, wiki_queue.STATUS_SUCCEEDED, wiki_queue.STATUS_FAILED, wiki_queue.STATUS_CANCELLED, ) ) job_rows = "".join( "" f"{job.get('id')}" f"{html.escape(str(job.get('kind') or ''))}" f"{html.escape(str(job.get('status') or ''))}" f"{job.get('attempts')}/{job.get('max_attempts')}" f"{html.escape(str(job.get('source') or ''))}" f"{html.escape(str(job.get('worker_id') or ''))}" f"{html.escape(str(job.get('last_error') or ''))[:120]}" "" for job in queue.get("recent_jobs", []) ) or "No queue jobs recorded." artifact_keys = ( ("graph_json", "graph.json"), ("graph_delta_json", "graph-delta.json"), ("communities_json", "communities.json"), ("wiki_graph_tar", "wiki-graph.tar.gz"), ("skills_sh_catalog", "skill-index.json.gz"), ) artifact_rows = "".join( "" f"{label}" f"{'yes' if artifacts[key].get('exists') else 'no'}" f"{int(artifacts[key].get('size') or 0):,}" f"{html.escape(str(artifacts[key].get('path') or ''))}" "" for key, label in artifact_keys ) promotion_rows = "".join( "" f"{html.escape(str(row.get('status') or ''))}" f"{html.escape(str(row.get('promoted_at') or row.get('started_at') or ''))}" f"{html.escape(str(row.get('current_sha256') or row.get('candidate_sha256') or ''))[:16]}" f"{html.escape(str(row.get('target') or ''))}" "" for row in artifacts.get("promotions", []) ) or "No promotion metadata recorded." 
queue_error = queue.get("error") if queue_error: availability = f"error ({html.escape(str(queue.get('db_path') or ''))})" elif queue.get("available"): availability = "available" else: availability = f"not initialized ({html.escape(str(queue.get('db_path') or ''))})" queue_error_html = ( "

Queue DB error: " f"{html.escape(str(queue_error))}

" if queue_error else "" ) body = ( "

Status

" "
" "Queue state" f"

Durable worker DB: {availability}. " f"Total jobs: {int(queue.get('total') or 0)}. " "JSON

" f"{queue_error_html}" f"
{count_pills}
" "
" "
Recent queue jobs" "" "" + job_rows + "
IDKindStatusAttemptsSourceWorkerLast error
" "
Artifact versions" "" + artifact_rows + "
ArtifactExistsBytesPath
" f"
Artifact promotions ({artifacts.get('promotion_count', 0)})" "" + promotion_rows + "
StatusTimeHashTarget
" ) return _layout("Status", body) def _render_events() -> str: """SSE endpoint page. The server emits events at /api/events.stream.""" entries = _read_jsonl(_audit_log_path(), limit=200) event_lines = [ json.dumps(entry, ensure_ascii=False, default=str) for entry in entries ] initial_stream = "\n".join(event_lines) if not initial_stream: initial_stream = "-- no audit events recorded yet; waiting for new events --" return _layout( "Live events", "

Live events

" "

Tails ~/.claude/ctx-audit.jsonl " "via server-sent events.

" "
" f"Showing last {len(entries)} audit events; " "new writes append below. " "connecting..." "
" "
"
        f"{html.escape(initial_stream)}"
        "
" "", ) def _render_loaded(mutations_enabled: bool | None = None) -> str: """Live view of ~/.claude/skill-manifest.json with load/unload actions. Groups manifest entries by ``entity_type`` (skill / agent / mcp-server / harness) with a per-section count. Unload button posts both the slug and entity_type so the server routes correctly — MCPs need ``claude mcp remove``, skills + agents take the file-copy path. Legacy entries without entity_type default to ``skill`` (what the pre-install_utils manifest implicitly assumed). """ if mutations_enabled is None: mutations_enabled = _MONITOR_MUTATIONS_ENABLED manifest = _read_manifest() load_rows = manifest.get("load", []) unload_rows = manifest.get("unload", []) def _etype(entry: dict) -> str: # Missing entity_type => legacy skill entry. return str(entry.get("entity_type") or "skill") # Split loaded by entity_type for the sectioned layout. by_type: dict[str, list[dict]] = { "skill": [], "agent": [], "mcp-server": [], "harness": [], } for e in load_rows: by_type.setdefault(_etype(e), []).append(e) disabled_attr = "" if mutations_enabled else " disabled" mutation_token = _MONITOR_TOKEN if mutations_enabled else "" mutation_notice = ( "" if mutations_enabled else ( "
Read-only mode. " "Load/unload actions are disabled because ctx-monitor is not " "bound to a loopback address.
" ) ) def _row(e: dict) -> str: slug = e.get("skill", "") etype = _etype(e) link = ( f"" f"{html.escape(slug)}" ) action = ( f"ctx-harness-install {html.escape(slug)} " f"--uninstall --dry-run" if etype == "harness" else f"" ) return ( f"" f"{link}" f"{html.escape(e.get('source', ''))}" f"{html.escape(str(e.get('command', '') or e.get('priority', '—')))[:60]}" f"{action}" f"" ) def _section(title: str, etype: str) -> str: rows = by_type.get(etype, []) if not rows: return ( f"

{title} " f"(0)

" f"

" f"None loaded.

" ) return ( f"

{title} " f"({len(rows)})

" f"" f"" + "".join(_row(e) for e in rows) + "
SlugSourceCmd / priority
" ) unload_html = "".join( f"" f"{html.escape(e.get('skill', ''))}" f"{html.escape(_etype(e))}" f"{html.escape(str(e.get('source', '') or e.get('reason', ''))[:80])}" f"" f"" for e in unload_rows ) body = ( "

Loaded entities — skills, agents, MCPs & harnesses

" f"
" f"{len(load_rows)} currently loaded " f"(" f"{len(by_type.get('skill', []))} skills · " f"{len(by_type.get('agent', []))} agents · " f"{len(by_type.get('mcp-server', []))} MCPs · " f"{len(by_type.get('harness', []))} harnesses) · " f"{len(unload_rows)} known-unloaded · " f"source: ~/.claude/skill-manifest.json " f"+ ~/.claude/harness-installs/*.json" "
" f"{mutation_notice}" "

Load an entity

" "
" "
" "" "" f"" "" "
" f"

Currently loaded ({len(load_rows)})

" + _section("Skills", "skill") + _section("Agents", "agent") + _section("MCP servers", "mcp-server") + _section("Harnesses", "harness") + f"

Recently unloaded ({len(unload_rows)})

" "" + unload_html + "
SlugTypeSource / reason
" "" ) return _layout("Loaded", body) def _render_runtime_lifecycle() -> str: summary = _runtime_lifecycle_summary() def _event_cell(event: dict[str, Any], key: str, limit: int = 120) -> str: return html.escape(str(event.get(key) or ""))[:limit] validation_rows = "".join( "" f"{_event_cell(event, 'created_at')}" f"{_event_cell(event, 'check_name')}" f"{_event_cell(event, 'status')}" f"{_event_cell(event, 'session_id')}" f"{_event_cell(event, 'summary')}" "" for event in reversed(summary["recent_validations"]) ) escalation_rows = "".join( "" f"{_event_cell(event, 'created_at')}" f"{_event_cell(event, 'trigger')}" f"{_event_cell(event, 'severity')}" f"{_event_cell(event, 'session_id')}" f"{_event_cell(event, 'reason')}" "" for event in reversed(summary["open_escalations"]) ) body = ( "

Runtime lifecycle

" "
" f"{summary['validations_total']} validations / " f"{summary['validation_failures']} failed / " f"{summary['open_escalations_total']} open escalations" f"
source: {html.escape(summary['path'])}" " / JSON" "
" "
Recent validations" + ( "" "" + validation_rows + "
CreatedCheckStatusSessionSummary
" if validation_rows else "

No validation checks recorded yet.

" ) + "
" "
Open escalations" + ( "" "" + escalation_rows + "
CreatedTriggerSeveritySessionReason
" if escalation_rows else "

No open escalations.

" ) + "
" ) return _layout("Runtime lifecycle", body) def _render_logs() -> str: """Filterable audit-log viewer — reads the last 500 lines of the log.""" entries = _read_jsonl(_audit_log_path(), limit=500) rows = "".join( f"" f"{html.escape(e.get('ts', ''))}" f"{html.escape(e.get('event', ''))}" f"{html.escape(e.get('subject', ''))}" f"{html.escape(e.get('actor', ''))}" f"{html.escape((e.get('session_id') or '')[:24])}" f"{html.escape(json.dumps(e.get('meta', {}))[:100])}" f"" for e in reversed(entries) ) body = ( "

Audit log

" f"
Showing last {len(entries)} of " f"~/.claude/ctx-audit.jsonl. " "Live stream →" "
" "
" "" "" "e.g. skill.loaded, kubernetes-deployment, or a session id" "
" "" "" + rows + "
tseventsubjectactorsessionmeta
" "" ) return _layout("Audit log", body) # ─── Mutation endpoints ────────────────────────────────────────────────────── def _is_safe_slug(slug: str) -> bool: return is_safe_source_name(slug) def _perform_load( slug: str, entity_type: str = "skill", *, command: str | None = None, json_config: str | None = None, ) -> tuple[bool, str]: return dashboard_entities.perform_load( slug, entity_type, command=command, json_config=json_config, deps=_entity_runtime_deps(), ) def _perform_unload(slug: str, entity_type: str = "skill") -> tuple[bool, str]: return dashboard_entities.perform_unload( slug, entity_type, deps=_entity_runtime_deps(), ) # ─── HTTP handler ──────────────────────────────────────────────────────────── def _server_shutdown_requested(server: Any) -> bool: event = getattr(server, "_ctx_shutdown", None) return bool(event is not None and event.is_set()) class _MonitorHandler(BaseHTTPRequestHandler): # Silence the per-request access log spam. Users running # ctx-monitor get a clean stdout; errors still surface via # log_error() below. def log_message(self, fmt: str, *args: Any) -> None: return # CSRF defense. Dashboard mutation endpoints (/api/load, /api/unload) # require same-origin POSTs plus a per-process token injected into the # served dashboard page. def _same_origin(self) -> bool: request_host = _request_host_name(self.headers.get("Host", "")) if not _host_allows_mutations(request_host): return False origin = self.headers.get("Origin") or "" if origin: return _origin_host_name(origin) == request_host # No Origin header (curl, direct tool calls) is acceptable only # when the mutation token below is also present. 
return True def _mutations_enabled(self) -> bool: return bool( getattr(self.server, "_ctx_mutations_enabled", _MONITOR_MUTATIONS_ENABLED), ) def _mutation_authorized(self) -> bool: token = self.headers.get("X-CTX-Monitor-Token") or "" return ( self._mutations_enabled() and bool(_MONITOR_TOKEN) and secrets.compare_digest(token, _MONITOR_TOKEN) ) def _api_reads_enabled(self) -> bool: return self._mutations_enabled() def _read_authorized(self, qs: dict[str, str]) -> bool: request_host = _request_host_name(self.headers.get("Host", "")) if self._mutations_enabled(): return _host_allows_mutations(request_host) token = ( self.headers.get("X-CTX-Monitor-Token") or qs.get("token", "") or _read_token_cookie(self.headers.get("Cookie", "")) ) return bool(_MONITOR_TOKEN) and secrets.compare_digest(token, _MONITOR_TOKEN) def _send_security_headers(self, *, html_response: bool = False) -> None: self.send_header("X-Content-Type-Options", "nosniff") self.send_header("Referrer-Policy", "no-referrer") self.send_header("X-Frame-Options", "DENY") if getattr(self, "_ctx_set_read_cookie", False): self.send_header( "Set-Cookie", f"{_READ_TOKEN_COOKIE}={_MONITOR_TOKEN}; Path=/; " "HttpOnly; SameSite=Strict", ) if html_response: self.send_header( "Content-Security-Policy", "default-src 'self'; script-src 'self' 'unsafe-inline'; " "style-src 'self' 'unsafe-inline'; connect-src 'self'; " "object-src 'none'; base-uri 'none'; frame-ancestors 'none'", ) def _content_length(self) -> int | None: raw = self.headers.get("Content-Length") if raw is None: return 0 try: length = int(raw) except ValueError: self._send_json_status(400, {"detail": "invalid Content-Length"}) return None if length < 0: self._send_json_status(400, {"detail": "invalid Content-Length"}) return None if length > _MAX_POST_BODY_BYTES: self._send_json_status(413, {"detail": "JSON body too large"}) return None return length def _read_json_body(self) -> dict[str, Any] | None: content_type = self.headers.get("Content-Type", 
"").split(";", 1)[0] if content_type.lower() != "application/json": self._send_json_status(415, {"detail": "JSON body required"}) return None length = self._content_length() if length is None: return None raw = self.rfile.read(length) if length else b"" try: body = json.loads(raw.decode("utf-8")) if raw else {} except (UnicodeDecodeError, json.JSONDecodeError): self._send_json_status(400, {"detail": "invalid JSON body"}) return None if not isinstance(body, dict): self._send_json_status(400, {"detail": "JSON object body required"}) return None return body def _discard_small_body(self) -> None: raw = self.headers.get("Content-Length") if raw is None: return try: length = int(raw) except ValueError: return if 0 < length <= _MAX_POST_BODY_BYTES: self.rfile.read(length) def do_GET(self) -> None: # noqa: N802 — stdlib signature # Parse once so we can reuse the query string for /graph?slug=… raw_path, _, raw_query = self.path.partition("?") path = raw_path qs = {} if raw_query: from urllib.parse import parse_qs qs = {k: v[0] for k, v in parse_qs(raw_query).items()} try: self._ctx_set_read_cookie = False read_authorized = getattr(self, "_read_authorized", lambda _qs: True) if not read_authorized(qs): if path.startswith("/api/"): self._send_json_status( 403, {"detail": "monitor read token required on non-loopback bind"}, ) else: self._send_html_status( 403, "

403

" "

monitor read token required on non-loopback bind

", ) return query_token = qs.get("token", "") self._ctx_set_read_cookie = ( not self._mutations_enabled() and bool(query_token) and bool(_MONITOR_TOKEN) and secrets.compare_digest(query_token, _MONITOR_TOKEN) ) if path == "/": self._send_html(_render_home()) elif path == "/sessions": self._send_html(_render_sessions_index()) elif path.startswith("/session/"): self._send_html(_render_session_detail(path.split("/session/", 1)[1])) elif path == "/skills": self._send_html(_render_skills(qs)) elif path.startswith("/skill/"): self._send_html(_render_skill_detail( path.split("/skill/", 1)[1], qs.get("type"), )) elif path == "/loaded": self._send_html(_render_loaded(self._mutations_enabled())) elif path == "/logs": self._send_html(_render_logs()) elif path == "/graph": self._send_html(_render_graph(qs.get("slug"), qs.get("type"))) elif path == "/manage": self._send_html(_render_manage(self._mutations_enabled())) elif path == "/harness": self._send_html(_render_harness_wizard()) elif path == "/docs": self._send_html(_render_docs()) elif path == "/config": self._send_html(_render_config()) elif path == "/status": self._send_html(_render_status()) elif path == "/wiki": self._send_html(_render_wiki_index(qs.get("type"), qs.get("q", ""))) elif path.startswith("/wiki/"): slug = path.split("/wiki/", 1)[1] self._send_html( _render_wiki_entity( slug, qs.get("type"), mutations_enabled=self._mutations_enabled(), ), ) elif path == "/kpi": self._send_html(_render_kpi()) elif path == "/runtime": self._send_html(_render_runtime_lifecycle()) elif path in {"/events", "/live"}: self._send_html(_render_events()) elif path == "/api/sessions.json": self._send_json(_summarize_sessions()) elif path == "/api/manifest.json": self._send_json(_read_manifest()) elif path == "/api/status.json": self._send_json(_status_payload()) elif path == "/api/kpi.json": summary = _kpi_summary() self._send_json(summary.to_dict() if summary is not None else { "total": 0, "detail": "no sidecars yet", }) elif path == 
"/api/grades.json": self._send_json(_grade_distribution_payload()) elif path == "/api/sidecars.json": self._send_json(_sidecar_page_payload(qs)) elif path == "/api/runtime.json": self._send_json(_runtime_lifecycle_summary()) elif path == "/api/config.json": self._send_json(_effective_config_payload()) elif path == "/api/entities/search.json": try: limit = max(1, min(int(qs.get("limit", 80)), 200)) results = _search_wiki_entities( qs.get("q", ""), qs.get("type") or None, limit=limit, ) except ValueError as exc: self._send_json_status(400, {"detail": str(exc)}) return self._send_json({"results": results, "total": len(results)}) elif path.startswith("/api/entity/") and path.endswith(".json"): slug = unquote(path[len("/api/entity/"): -len(".json")]) try: detail = _wiki_entity_detail(slug, qs.get("type")) except ValueError as exc: self._send_json_status(400, {"detail": str(exc)}) return if detail is None: self._send_json_status(404, {"detail": f"no wiki entity for {slug}"}) else: self._send_json(detail) elif path.startswith("/api/skill/") and path.endswith(".json"): slug = unquote(path[len("/api/skill/"): -len(".json")]) sidecar = _load_sidecar(slug, entity_type=qs.get("type")) if sidecar is None: self._send_404(f"no sidecar for {slug}") else: self._send_json(sidecar) elif path.startswith("/api/graph/") and path.endswith(".json"): slug = unquote(path[len("/api/graph/"): -len(".json")]) requested_type = qs.get("type") graph_entity_type = _normalize_dashboard_entity_type(requested_type) if requested_type is not None and graph_entity_type is None: self._send_json_status( 400, {"detail": f"unsupported entity_type: {requested_type!r}"}, ) return try: hops = max(1, min(int(qs.get("hops", 1)), 3)) limit = max(5, min(int(qs.get("limit", 40)), 150)) except ValueError: self._send_json_status( 400, {"detail": "hops and limit must be integers"}, ) return self._send_json(_graph_neighborhood( slug, hops=hops, limit=limit, entity_type=graph_entity_type, )) elif path == 
"/api/events.stream": self._stream_audit_log() else: self._send_404(path) except (BrokenPipeError, ConnectionAbortedError): # Browser disconnected mid-response — benign for a local # dashboard; nothing to do. return except Exception as exc: # noqa: BLE001 — last-resort handler self._send_500(exc) def do_POST(self) -> None: # noqa: N802 — stdlib signature """Mutation endpoints. Same-origin only; JSON body required.""" path = self.path.split("?", 1)[0] try: if not self._mutations_enabled(): self._discard_small_body() self._send_json_status( 403, {"detail": "monitor mutations disabled on non-loopback bind"}, ) return if not self._same_origin(): self._discard_small_body() self._send_json_status( 403, {"detail": "cross-origin POST denied"}, ) return if not self._mutation_authorized(): self._discard_small_body() self._send_json_status( 403, {"detail": "monitor token required"}, ) return body = self._read_json_body() if body is None: return if path == "/api/load": slug = str(body.get("slug", "")).strip() etype = str(body.get("entity_type", "skill")).strip() or "skill" command = body.get("command") json_config = body.get("json_config") kwargs: dict[str, str] = {} if isinstance(command, str) and command: kwargs["command"] = command if isinstance(json_config, str) and json_config: kwargs["json_config"] = json_config ok, msg = _perform_load(slug, entity_type=etype, **kwargs) self._send_json_status( 200 if ok else 400, {"ok": ok, "detail": msg}, ) elif path == "/api/unload": slug = str(body.get("slug", "")).strip() # entity_type defaults to "skill" for backward compat with # existing JS that only sends {slug}. New /loaded page # sends {slug, entity_type} so MCPs flow through the # subprocess unload path. 
etype = str(body.get("entity_type", "skill")).strip() or "skill" ok, msg = _perform_unload(slug, entity_type=etype) self._send_json_status( 200 if ok else 400, {"ok": ok, "detail": msg}, ) elif path == "/api/config": updates = body.get("updates", {}) if not isinstance(updates, dict): self._send_json_status( 400, {"ok": False, "detail": "updates must be an object"}, ) return result = _save_config_updates(updates) self._send_json_status(200 if result.get("ok") else 400, result) elif path == "/api/entity/upsert": ok, msg = _upsert_wiki_entity(body) self._send_json_status( 200 if ok else 400, {"ok": ok, "detail": msg}, ) elif path == "/api/entity/delete": slug = str(body.get("slug", "")).strip() etype = str(body.get("entity_type", "skill")).strip() or "skill" ok, msg = _delete_wiki_entity(slug, etype) self._send_json_status( 200 if ok else 400, {"ok": ok, "detail": msg}, ) else: self._send_404(path) except (BrokenPipeError, ConnectionAbortedError): return except Exception as exc: # noqa: BLE001 self._send_500(exc) def _send_json_status(self, status: int, obj: Any) -> None: raw = json.dumps(obj, default=str).encode("utf-8") self.send_response(status) self.send_header("Content-Type", "application/json") self.send_header("Content-Length", str(len(raw))) self._send_security_headers() self.end_headers() self.wfile.write(raw) def _send_html(self, body: str) -> None: raw = body.encode("utf-8") self.send_response(200) self.send_header("Content-Type", "text/html; charset=utf-8") self.send_header("Content-Length", str(len(raw))) self._send_security_headers(html_response=True) self.end_headers() self.wfile.write(raw) def _send_html_status(self, status: int, body: str) -> None: raw = body.encode("utf-8") self.send_response(status) self.send_header("Content-Type", "text/html; charset=utf-8") self.send_header("Content-Length", str(len(raw))) self._send_security_headers(html_response=True) self.end_headers() self.wfile.write(raw) def _send_json(self, obj: Any) -> None: raw = 
json.dumps(obj, default=str).encode("utf-8") self.send_response(200) self.send_header("Content-Type", "application/json") self.send_header("Content-Length", str(len(raw))) self._send_security_headers() self.end_headers() self.wfile.write(raw) def _send_404(self, detail: str) -> None: body = f"

404

{html.escape(detail)}

".encode() self.send_response(404) self.send_header("Content-Type", "text/html; charset=utf-8") self.send_header("Content-Length", str(len(body))) self._send_security_headers(html_response=True) self.end_headers() self.wfile.write(body) def _send_500(self, exc: BaseException) -> None: self.log_error("render error: %s", exc) body = f"

500

{html.escape(repr(exc))}
".encode() self.send_response(500) self.send_header("Content-Type", "text/html; charset=utf-8") self.send_header("Content-Length", str(len(body))) self._send_security_headers(html_response=True) self.end_headers() self.wfile.write(body) def _stream_audit_log(self) -> None: """Server-sent events: tail the audit log line-by-line.""" self.send_response(200) self.send_header("Content-Type", "text/event-stream") self.send_header("Cache-Control", "no-cache") self.send_header("Connection", "keep-alive") self._send_security_headers() self.end_headers() path = _audit_log_path() position = path.stat().st_size if path.exists() else 0 last_heartbeat = time.monotonic() try: while not _server_shutdown_requested(self.server): if path.exists() and path.stat().st_size > position: with path.open("r", encoding="utf-8") as f: f.seek(position) for line in f: if not line.strip(): continue self.wfile.write(f"data: {line.rstrip()}\n\n".encode()) self.wfile.flush() position = f.tell() last_heartbeat = time.monotonic() elif time.monotonic() - last_heartbeat > 25: # SSE heartbeat comment — keeps proxies from timing out # on idle streams. Also detects dead clients (write # will raise BrokenPipeError). 
self.wfile.write(b": heartbeat\n\n") self.wfile.flush() last_heartbeat = time.monotonic() time.sleep(0.5) except (BrokenPipeError, ConnectionAbortedError, ConnectionResetError): return # ─── CLI ───────────────────────────────────────────────────────────────────── class _MonitorServer(ThreadingHTTPServer): daemon_threads = True block_on_close = False def __init__(self, *args: Any, **kwargs: Any) -> None: self._ctx_shutdown = threading.Event() super().__init__(*args, **kwargs) def shutdown(self) -> None: self._ctx_shutdown.set() super().shutdown() def server_close(self) -> None: self._ctx_shutdown.set() super().server_close() def handle_error(self, request: Any, client_address: Any) -> None: exc_type, _, _ = sys.exc_info() if exc_type is not None and issubclass( exc_type, (BrokenPipeError, ConnectionAbortedError, ConnectionResetError), ): return super().handle_error(request, client_address) def _make_monitor_server(host: str, port: int) -> _MonitorServer: global _MONITOR_MUTATIONS_ENABLED _MONITOR_MUTATIONS_ENABLED = _host_allows_mutations(host) server = _MonitorServer((host, port), _MonitorHandler) server._ctx_mutations_enabled = _MONITOR_MUTATIONS_ENABLED return server def serve(host: str = "127.0.0.1", port: int = 8765) -> None: """Run the monitor. 
Blocks until Ctrl+C.""" global _MONITOR_TOKEN server = _make_monitor_server(host, port) _MONITOR_TOKEN = secrets.token_urlsafe(32) mutations_enabled = bool(getattr(server, "_ctx_mutations_enabled", False)) url = f"http://{_monitor_display_host(host)}:{port}/" if not mutations_enabled: url = f"{url}?token={_MONITOR_TOKEN}" print(f"ctx-monitor serving at {url} (Ctrl+C to stop)", flush=True) if not mutations_enabled: print( "ctx-monitor: non-loopback bind; read token required and " "load/unload mutations disabled", flush=True, ) try: server.serve_forever() except KeyboardInterrupt: print("ctx-monitor: shutdown", flush=True) finally: server.server_close() def _monitor_display_host(host: str) -> str: """Return a URL host users can paste into a browser.""" if host in {"0.0.0.0", "::"}: try: candidate = socket.gethostbyname(socket.gethostname()) except OSError: candidate = "" if candidate and not candidate.startswith("127."): return candidate return "localhost" if ":" in host and not host.startswith("["): return f"[{host}]" return host def main(argv: list[str] | None = None) -> int: parser = argparse.ArgumentParser( prog="ctx-monitor", description="Local HTTP dashboard for ctx skill/agent activity.", ) sub = parser.add_subparsers(dest="cmd", required=True) sp = sub.add_parser("serve", help="Start the monitor web server") sp.add_argument("--port", type=int, default=8765) sp.add_argument( "--host", default="127.0.0.1", help="Host to bind (default: 127.0.0.1; use 0.0.0.0 to expose — be careful)", ) args = parser.parse_args(argv) if args.cmd == "serve": serve(host=args.host, port=args.port) return 0 if __name__ == "__main__": sys.exit(main())