"""SQL error classifiers: TF-IDF baseline and MiniLM embedding model."""

from __future__ import annotations

import json
from pathlib import Path
from typing import List, Literal, Optional, Protocol, Union

import joblib
import numpy as np
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.linear_model import SGDClassifier
from sklearn.pipeline import FeatureUnion, Pipeline

PROJECT_ROOT = Path(__file__).resolve().parent.parent
DEFAULT_MODEL_PATH = PROJECT_ROOT / "models" / "sql_error_classifier.joblib"
DEFAULT_ENCODER = "sentence-transformers/all-MiniLM-L6-v2"

ModelType = Literal[
    "cross_encoder",
    "cross_encoder_ft",
    "multi_tower",
    "minilm",
    "tfidf",
]


class TextClassifier(Protocol):
    """Structural interface shared by the classifiers built in this module."""

    classes_: np.ndarray

    def fit(self, texts: List[str], y: np.ndarray) -> "TextClassifier": ...

    def predict(self, texts: List[str]) -> np.ndarray: ...

    def predict_proba(self, texts: List[str]) -> np.ndarray: ...


def combine_features(
    queries: List[str],
    error_messages: Optional[List[str]] = None,
    schemas: Optional[List[str]] = None,
    questions: Optional[List[str]] = None,
) -> List[str]:
    """Fuse question, schema, query, and optional error message.

    For every query one string is built from the tagged, space-joined parts
    ``QUESTION: …``, ``SCHEMA: …``, ``QUERY: …`` and ``ERROR: …`` (in that
    order). The query part is always present; the others are skipped when the
    corresponding list is missing/empty or its entry at that index is falsy.

    Args:
        queries: SQL queries; defines the number of output rows.
        error_messages: Optional per-query error messages.
        schemas: Optional per-query schema descriptions.
        questions: Optional per-query natural-language questions.

    Returns:
        One fused text per query, in input order.

    Raises:
        IndexError: If a supplied optional list is shorter than ``queries``.
    """
    texts: List[str] = []
    for i, query in enumerate(queries):
        parts: List[str] = []
        if questions and questions[i]:
            parts.append(f"QUESTION: {questions[i]}")
        if schemas and schemas[i]:
            parts.append(f"SCHEMA: {schemas[i]}")
        parts.append(f"QUERY: {query}")
        if error_messages and error_messages[i]:
            parts.append(f"ERROR: {error_messages[i]}")
        texts.append(" ".join(parts))
    return texts


def _build_text_features() -> FeatureUnion:
    """Return the word-level + char-level TF-IDF feature union."""
    return FeatureUnion(
        [
            (
                "word",
                TfidfVectorizer(
                    analyzer="word",
                    ngram_range=(1, 2),
                    max_features=30_000,
                    sublinear_tf=True,
                    strip_accents="unicode",
                    # Second alternative also tokenises whatever directly
                    # follows one of the characters = < > !
                    token_pattern=r"(?u)\b\w+\b|(?<=[=<>!])\S+",
                ),
            ),
            (
                "char",
                TfidfVectorizer(
                    analyzer="char_wb",
                    ngram_range=(2, 5),
                    max_features=20_000,
                    sublinear_tf=True,
                ),
            ),
        ]
    )


def build_tfidf_classifier() -> Pipeline:
    """Bag-of-words baseline. Fast but no deep semantic understanding."""
    clf = SGDClassifier(
        loss="log_loss",
        penalty="l2",
        alpha=1e-5,
        max_iter=1000,
        tol=1e-3,
        class_weight="balanced",
        random_state=42,
    )
    return Pipeline([("tfidf", _build_text_features()), ("clf", clf)])


class EmbeddingClassifier:
    """
    MiniLM sentence embeddings + linear classifier.

    Understands question intent (e.g. 'average' vs wrong aggregate) because
    the encoder models full sentence context, not isolated word counts.

    The sentence encoder is loaded lazily on the first call to ``encode`` so
    that constructing the object does not import ``sentence_transformers``.
    """

    def __init__(
        self,
        encoder_name: str = DEFAULT_ENCODER,
        batch_size: int = 256,
    ):
        self.encoder_name = encoder_name
        self.batch_size = batch_size
        # Populated by _load_encoder() on first use.
        self.encoder = None
        self.clf = SGDClassifier(
            loss="log_loss",
            penalty="l2",
            alpha=1e-4,
            max_iter=1000,
            tol=1e-3,
            class_weight="balanced",
            random_state=42,
        )
        # Mirrors self.clf.classes_ once fit() has run.
        self.classes_: Optional[np.ndarray] = None

    def _load_encoder(self):
        """Instantiate the SentenceTransformer once and cache it on self."""
        if self.encoder is None:
            from sentence_transformers import SentenceTransformer

            self.encoder = SentenceTransformer(self.encoder_name)

    def encode(self, texts: List[str], show_progress: bool = False) -> np.ndarray:
        """Embed ``texts`` with the sentence encoder and return a numpy array."""
        self._load_encoder()
        return self.encoder.encode(
            texts,
            batch_size=self.batch_size,
            show_progress_bar=show_progress,
            convert_to_numpy=True,
        )

    def fit(self, texts: List[str], y: np.ndarray) -> "EmbeddingClassifier":
        """Embed ``texts``, fit the linear classifier on them, return self."""
        X = self.encode(texts, show_progress=True)
        self.clf.fit(X, y)
        self.classes_ = self.clf.classes_
        return self

    def predict(self, texts: List[str]) -> np.ndarray:
        """Return the predicted label for each text."""
        return self.clf.predict(self.encode(texts))

    def predict_proba(self, texts: List[str]) -> np.ndarray:
        """Return per-class probabilities for each text."""
        return self.clf.predict_proba(self.encode(texts))


def build_classifier(
    model_type: ModelType = "cross_encoder",
) -> Union[
    Pipeline,
    EmbeddingClassifier,
    "MultiTowerClassifier",
    "CrossEncoderClassifier",
    "FineTunedCrossEncoderClassifier",
]:
    """Construct an untrained classifier for ``model_type``.

    The project-local model classes are imported inside their branch so they
    are only imported for the model type that is actually requested.

    Raises:
        ValueError: If ``model_type`` is not one of the known identifiers.
    """
    if model_type == "tfidf":
        return build_tfidf_classifier()
    if model_type == "minilm":
        return EmbeddingClassifier()
    if model_type == "multi_tower":
        from src.multi_tower_model import MultiTowerClassifier

        return MultiTowerClassifier()
    if model_type == "cross_encoder":
        from src.cross_encoder_model import CrossEncoderClassifier

        return CrossEncoderClassifier()
    if model_type == "cross_encoder_ft":
        from src.cross_encoder_model import FineTunedCrossEncoderClassifier

        return FineTunedCrossEncoderClassifier()
    raise ValueError(f"Unknown model_type: {model_type}")


def _write_meta(meta_path: Path, meta: dict) -> None:
    """Write the small JSON sidecar that describes a saved model."""
    with open(meta_path, "w", encoding="utf-8") as f:
        json.dump(meta, f, indent=2)


def _stored_classes(payload: dict):
    """Return the class labels recorded in a joblib payload.

    Prefers the explicit ``classes_`` entry and falls back to the classifier's
    own ``classes_`` attribute. The fallback is read lazily so that a payload
    written from an unfitted model (no ``classes_`` on the estimator) still
    loads and simply yields ``None``.
    """
    classes = payload.get("classes_")
    if classes is None:
        classes = getattr(payload["classifier"], "classes_", None)
    return classes


def save_model(
    model: Union[
        Pipeline,
        EmbeddingClassifier,
        "MultiTowerClassifier",
        "CrossEncoderClassifier",
        "FineTunedCrossEncoderClassifier",
    ],
    path: Path = DEFAULT_MODEL_PATH,
    model_type: ModelType = "cross_encoder",
) -> Path:
    """Persist ``model`` in the layout that ``load_model`` reads back.

    The storage format is selected from the runtime type of ``model``:

    * ``FineTunedCrossEncoderClassifier`` delegates to ``model.save`` with a
      ``.ce`` path (or ``path`` itself when it is a directory) and writes a
      meta JSON next to / inside it.
    * ``CrossEncoderClassifier``, ``MultiTowerClassifier`` and
      ``EmbeddingClassifier`` are stored as a joblib dict payload plus a
      ``<path>.meta.json`` sidecar.
    * Anything else is stored as ``{"model_type": "tfidf", "pipeline": model}``.

    Args:
        model: The classifier to save.
        path: Target file (or directory for the fine-tuned cross-encoder).
            Strings are accepted and converted to ``Path``.
        model_type: Retained for backward compatibility. The tag written to
            disk is derived from the type of ``model``, because ``load_model``
            dispatches on that tag and a mismatching value (such as the
            default ``"cross_encoder"`` for an ``EmbeddingClassifier``) would
            make the file unloadable.

    Returns:
        The path the model was written to.
    """
    from src.cross_encoder_model import (
        CrossEncoderClassifier,
        FineTunedCrossEncoderClassifier,
    )
    from src.multi_tower_model import MultiTowerClassifier

    # Remember the raw spelling: Path() drops a trailing "/" that a caller may
    # have used to signal "this is a directory".
    raw_path = str(path)
    path = Path(path)
    path.parent.mkdir(parents=True, exist_ok=True)

    if isinstance(model, FineTunedCrossEncoderClassifier):
        use_path_as_dir = path.is_dir() or raw_path.endswith("/")
        ft_path = path if use_path_as_dir else path.with_suffix(".ce")
        if ft_path.suffix == ".joblib":
            ft_path = ft_path.with_suffix(".ce")
        model.save(ft_path)
        if ft_path.is_dir():
            meta_path = ft_path / "meta.json"
        else:
            meta_path = path.with_suffix(".meta.json")
        _write_meta(
            meta_path,
            {"model_type": "cross_encoder_ft", "path": str(ft_path)},
        )
        return ft_path

    if isinstance(model, CrossEncoderClassifier):
        payload = {
            "model_type": "cross_encoder",
            "cross_encoder_name": model.cross_encoder_name,
            "batch_size": model.batch_size,
            "max_length": model.max_length,
            "scaler": model.scaler,
            "classifier": model.clf,
            "classes_": model.classes_,
        }
        joblib.dump(payload, path)
        _write_meta(
            path.with_suffix(".meta.json"),
            {
                "model_type": "cross_encoder",
                "cross_encoder_name": model.cross_encoder_name,
            },
        )
    elif isinstance(model, MultiTowerClassifier):
        payload = {
            "model_type": "multi_tower",
            "encoder_name": model.encoder_name,
            "batch_size": model.batch_size,
            "scaler": model.scaler,
            "classifier": model.clf,
            "classes_": model.classes_,
        }
        joblib.dump(payload, path)
        _write_meta(
            path.with_suffix(".meta.json"),
            {"model_type": "multi_tower", "encoder_name": model.encoder_name},
        )
    elif isinstance(model, EmbeddingClassifier):
        # Always tag as "minilm": it is the only tag load_model maps back to
        # EmbeddingClassifier.
        payload = {
            "model_type": "minilm",
            "encoder_name": model.encoder_name,
            "batch_size": model.batch_size,
            "classifier": model.clf,
            "classes_": model.classes_,
        }
        joblib.dump(payload, path)
        _write_meta(
            path.with_suffix(".meta.json"),
            {"model_type": "minilm", "encoder_name": model.encoder_name},
        )
    else:
        joblib.dump({"model_type": "tfidf", "pipeline": model}, path)
    return path


def load_model(
    path: Path = DEFAULT_MODEL_PATH,
) -> Union[
    Pipeline,
    EmbeddingClassifier,
    "MultiTowerClassifier",
    "CrossEncoderClassifier",
    "FineTunedCrossEncoderClassifier",
]:
    """Load a classifier previously written by ``save_model``.

    Resolution order:

    1. A fine-tuned cross-encoder directory (``<path>.ce`` when ``path`` ends
       in ``.joblib``, otherwise ``path``) that contains ``config.json``.
    2. A ``<path>.meta.json`` sidecar whose ``model_type`` is
       ``"cross_encoder_ft"``; its ``path`` entry points at the directory.
    3. The joblib file at ``path``. Dict payloads are rebuilt according to
       their ``model_type``; any other dict is expected to carry a
       ``"pipeline"`` entry; a non-dict object is returned as-is.

    Args:
        path: Location passed to ``save_model`` (string or ``Path``).

    Returns:
        The restored classifier.

    Raises:
        KeyError: If a dict payload lacks the keys its ``model_type`` requires.
    """
    from src.cross_encoder_model import (
        CrossEncoderClassifier,
        FineTunedCrossEncoderClassifier,
    )
    from src.multi_tower_model import MultiTowerClassifier

    path = Path(path)

    # Fine-tuned cross-encoder saved as directory
    ce_path = path.with_suffix(".ce") if path.suffix == ".joblib" else path
    if ce_path.exists() and (ce_path / "config.json").exists():
        return FineTunedCrossEncoderClassifier.load(ce_path)

    meta_path = path.with_suffix(".meta.json")
    if meta_path.exists():
        with open(meta_path, encoding="utf-8") as f:
            meta = json.load(f)
        if meta.get("model_type") == "cross_encoder_ft":
            ft_path = Path(meta.get("path", str(ce_path)))
            return FineTunedCrossEncoderClassifier.load(ft_path)

    # SECURITY: joblib.load unpickles arbitrary objects and can execute code
    # embedded in the file. Only load model files from trusted sources.
    obj = joblib.load(path)
    if not isinstance(obj, dict):
        return obj

    kind = obj.get("model_type")
    if kind == "cross_encoder":
        model = CrossEncoderClassifier(
            cross_encoder_name=obj["cross_encoder_name"],
            batch_size=obj.get("batch_size", 32),
            max_length=obj.get("max_length", 512),
        )
        model.scaler = obj["scaler"]
        model.clf = obj["classifier"]
        model.classes_ = _stored_classes(obj)
        return model
    if kind == "multi_tower":
        model = MultiTowerClassifier(
            encoder_name=obj["encoder_name"],
            batch_size=obj.get("batch_size", 256),
        )
        model.scaler = obj["scaler"]
        model.clf = obj["classifier"]
        model.classes_ = _stored_classes(obj)
        return model
    if kind == "minilm":
        model = EmbeddingClassifier(
            encoder_name=obj["encoder_name"],
            batch_size=obj.get("batch_size", 256),
        )
        model.clf = obj["classifier"]
        model.classes_ = _stored_classes(obj)
        return model
    return obj["pipeline"]