Spaces:
Sleeping
Sleeping
| """SQL error classifiers: TF-IDF baseline and MiniLM embedding model.""" | |
| from __future__ import annotations | |
| import json | |
| from pathlib import Path | |
| from typing import List, Literal, Optional, Protocol, Union | |
| import joblib | |
| import numpy as np | |
| from sklearn.feature_extraction.text import TfidfVectorizer | |
| from sklearn.linear_model import SGDClassifier | |
| from sklearn.pipeline import FeatureUnion, Pipeline | |
# Parent of the directory that contains this module (resolved to an absolute path).
PROJECT_ROOT = Path(__file__).resolve().parents[1]
# Default on-disk location of the serialized classifier artifact.
DEFAULT_MODEL_PATH = PROJECT_ROOT.joinpath("models", "sql_error_classifier.joblib")
# Sentence-transformers checkpoint used when no encoder name is supplied.
DEFAULT_ENCODER = "sentence-transformers/all-MiniLM-L6-v2"
# Identifiers accepted by the model factory and the persistence helpers.
ModelType = Literal[
    "cross_encoder",
    "cross_encoder_ft",
    "multi_tower",
    "minilm",
    "tfidf",
]
class TextClassifier(Protocol):
    """Structural interface for classifiers that operate on raw text.

    Any object exposing ``classes_`` together with ``fit``, ``predict`` and
    ``predict_proba`` over lists of strings satisfies this protocol.
    """

    # Array of class labels known to the classifier.
    classes_: np.ndarray

    def fit(self, texts: List[str], y: np.ndarray) -> "TextClassifier":
        """Train on ``texts`` with targets ``y``; annotated to return a classifier."""
        ...

    def predict(self, texts: List[str]) -> np.ndarray:
        """Return an array of predictions for ``texts``."""
        ...

    def predict_proba(self, texts: List[str]) -> np.ndarray:
        """Return an array of probability estimates for ``texts``."""
        ...
def combine_features(
    queries: List[str],
    error_messages: Optional[List[str]] = None,
    schemas: Optional[List[str]] = None,
    questions: Optional[List[str]] = None,
) -> List[str]:
    """Build one labelled text per query from the available fields.

    Each output string joins, in this order and separated by single spaces,
    ``QUESTION: ...``, ``SCHEMA: ...``, ``QUERY: ...`` and ``ERROR: ...``.
    The query part is always present; the other parts are emitted only when
    the corresponding list is given and its entry at that position is truthy.

    Args:
        queries: SQL query strings; one output text is produced per entry.
        error_messages: Optional per-query error messages.
        schemas: Optional per-query schema descriptions.
        questions: Optional per-query natural-language questions.

    Returns:
        A list of fused strings, the same length as ``queries``.
    """

    def _entry(column: Optional[List[str]], row: int) -> Optional[str]:
        # A missing or empty column contributes nothing for any row.
        return column[row] if column else None

    fused: List[str] = []
    for row, sql in enumerate(queries):
        # Lookup order (question, schema, error) mirrors the output order.
        labelled = (
            ("QUESTION", _entry(questions, row)),
            ("SCHEMA", _entry(schemas, row)),
            ("QUERY", sql),
            ("ERROR", _entry(error_messages, row)),
        )
        fused.append(
            " ".join(
                f"{label}: {value}"
                for label, value in labelled
                # The query is unconditional; every other field must be truthy.
                if label == "QUERY" or value
            )
        )
    return fused
def _build_text_features() -> FeatureUnion:
    """Assemble the combined word-level and character-level TF-IDF featurizer.

    Returns:
        A ``FeatureUnion`` with a ``"word"`` vectorizer (1-2 grams, at most
        30,000 features) and a ``"char"`` vectorizer (``char_wb`` 2-5 grams,
        at most 20,000 features), both using sublinear term frequency.
    """
    word_tfidf = TfidfVectorizer(
        analyzer="word",
        ngram_range=(1, 2),
        max_features=30_000,
        sublinear_tf=True,
        strip_accents="unicode",
        # Plain word tokens, plus any non-space run directly preceded by one
        # of the characters = < > !
        token_pattern=r"(?u)\b\w+\b|(?<=[=<>!])\S+",
    )
    char_tfidf = TfidfVectorizer(
        analyzer="char_wb",
        ngram_range=(2, 5),
        max_features=20_000,
        sublinear_tf=True,
    )
    return FeatureUnion(
        transformer_list=[("word", word_tfidf), ("char", char_tfidf)]
    )
def build_tfidf_classifier() -> Pipeline:
    """Create the TF-IDF baseline: text features followed by a linear model.

    Fast bag-of-words approach without deep semantic understanding.

    Returns:
        An unfitted ``Pipeline`` with steps ``"tfidf"`` (the feature union
        from ``_build_text_features``) and ``"clf"`` (a logistic-loss
        ``SGDClassifier`` with balanced class weights).
    """
    sgd_settings = {
        "loss": "log_loss",
        "penalty": "l2",
        "alpha": 1e-5,
        "max_iter": 1000,
        "tol": 1e-3,
        "class_weight": "balanced",
        "random_state": 42,  # fixed seed for reproducible training
    }
    linear_model = SGDClassifier(**sgd_settings)
    steps = [("tfidf", _build_text_features()), ("clf", linear_model)]
    return Pipeline(steps=steps)
class EmbeddingClassifier:
    """Sentence-embedding encoder followed by a linear classifier.

    Texts are embedded with a sentence-transformers model (MiniLM by default)
    and the embeddings are classified by a logistic-loss ``SGDClassifier``.
    The encoder models full sentence context rather than isolated word
    counts, which helps with question intent (e.g. 'average' vs. a wrong
    aggregate).
    """

    def __init__(
        self,
        encoder_name: str = DEFAULT_ENCODER,
        batch_size: int = 256,
    ):
        """Store the configuration and create the unfitted linear classifier.

        Args:
            encoder_name: Model name passed to ``SentenceTransformer``.
            batch_size: Batch size forwarded to the encoder's ``encode``.
        """
        self.encoder_name = encoder_name
        self.batch_size = batch_size
        # The encoder is created lazily by ``_load_encoder`` on first use.
        self.encoder = None
        # Populated from the fitted classifier in ``fit``.
        self.classes_: Optional[np.ndarray] = None
        sgd_settings = {
            "loss": "log_loss",
            "penalty": "l2",
            "alpha": 1e-4,
            "max_iter": 1000,
            "tol": 1e-3,
            "class_weight": "balanced",
            "random_state": 42,
        }
        self.clf = SGDClassifier(**sgd_settings)

    def _load_encoder(self):
        """Instantiate the sentence encoder unless one is already attached."""
        if self.encoder is not None:
            return
        # Imported here so the dependency is only needed once encoding happens.
        from sentence_transformers import SentenceTransformer

        self.encoder = SentenceTransformer(self.encoder_name)

    def encode(self, texts: List[str], show_progress: bool = False) -> np.ndarray:
        """Embed ``texts`` with the encoder and return the result as NumPy.

        Args:
            texts: Strings to embed.
            show_progress: Whether the encoder should display a progress bar.
        """
        self._load_encoder()
        encode_options = {
            "batch_size": self.batch_size,
            "show_progress_bar": show_progress,
            "convert_to_numpy": True,
        }
        return self.encoder.encode(texts, **encode_options)

    def fit(self, texts: List[str], y: np.ndarray) -> "EmbeddingClassifier":
        """Embed ``texts`` (with a progress bar), fit the classifier, return self."""
        embeddings = self.encode(texts, show_progress=True)
        self.clf.fit(embeddings, y)
        self.classes_ = self.clf.classes_
        return self

    def predict(self, texts: List[str]) -> np.ndarray:
        """Return the classifier's predictions for the embedded ``texts``."""
        embeddings = self.encode(texts)
        return self.clf.predict(embeddings)

    def predict_proba(self, texts: List[str]) -> np.ndarray:
        """Return the classifier's probability estimates for the embedded ``texts``."""
        embeddings = self.encode(texts)
        return self.clf.predict_proba(embeddings)
def build_classifier(
    model_type: ModelType = "cross_encoder",
) -> Union[
    Pipeline,
    EmbeddingClassifier,
    "MultiTowerClassifier",
    "CrossEncoderClassifier",
    "FineTunedCrossEncoderClassifier",
]:
    """Create a fresh, unfitted classifier for the requested model type.

    Args:
        model_type: One of ``"tfidf"``, ``"minilm"``, ``"multi_tower"``,
            ``"cross_encoder"`` or ``"cross_encoder_ft"``.

    Returns:
        A newly constructed classifier of the matching kind.

    Raises:
        ValueError: If ``model_type`` is not one of the supported names.
    """
    # The multi-tower and cross-encoder modules are imported only in the
    # branch that needs them.
    if model_type == "tfidf":
        model = build_tfidf_classifier()
    elif model_type == "minilm":
        model = EmbeddingClassifier()
    elif model_type == "multi_tower":
        from src.multi_tower_model import MultiTowerClassifier

        model = MultiTowerClassifier()
    elif model_type == "cross_encoder":
        from src.cross_encoder_model import CrossEncoderClassifier

        model = CrossEncoderClassifier()
    elif model_type == "cross_encoder_ft":
        from src.cross_encoder_model import FineTunedCrossEncoderClassifier

        model = FineTunedCrossEncoderClassifier()
    else:
        raise ValueError(f"Unknown model_type: {model_type}")
    return model
def _write_meta_json(meta_path: Path, meta: dict) -> None:
    """Write the small JSON sidecar that describes a saved model."""
    with open(meta_path, "w", encoding="utf-8") as f:
        json.dump(meta, f, indent=2)


def save_model(
    model: Union[
        Pipeline,
        EmbeddingClassifier,
        "MultiTowerClassifier",
        "CrossEncoderClassifier",
        "FineTunedCrossEncoderClassifier",
    ],
    path: Path = DEFAULT_MODEL_PATH,
    model_type: ModelType = "cross_encoder",
) -> Path:
    """Persist ``model`` to disk in the layout expected by ``load_model``.

    The storage format is chosen from the model's class:

    * ``FineTunedCrossEncoderClassifier``: delegated to ``model.save`` at a
      ``.ce`` path (or at ``path`` itself when it is a directory), plus a JSON
      sidecar recording ``"cross_encoder_ft"`` and the save location.
    * ``CrossEncoderClassifier``, ``MultiTowerClassifier`` and
      ``EmbeddingClassifier``: a joblib payload at ``path`` plus a
      ``.meta.json`` sidecar next to it.
    * anything else: stored as a ``"tfidf"`` pipeline payload, no sidecar.

    Args:
        model: The classifier to save.
        path: Target file (or directory for fine-tuned cross-encoders).
            Strings are accepted and converted to ``Path``.
        model_type: Retained for backward compatibility. The stored type tag
            is derived from the model's class so that it always matches the
            payload layout that ``load_model`` dispatches on.

    Returns:
        The path the model was written to (the ``.ce`` location for
        fine-tuned cross-encoders, otherwise ``path``).
    """
    from src.cross_encoder_model import (
        CrossEncoderClassifier,
        FineTunedCrossEncoderClassifier,
    )
    from src.multi_tower_model import MultiTowerClassifier

    # Path() drops trailing slashes, so capture the "treat as directory" hint
    # from the raw argument before normalising it.
    wants_directory = str(path).endswith("/")
    path = Path(path)
    path.parent.mkdir(parents=True, exist_ok=True)

    if isinstance(model, FineTunedCrossEncoderClassifier):
        ft_path = path if path.is_dir() or wants_directory else path.with_suffix(".ce")
        if ft_path.suffix == ".joblib":
            ft_path = ft_path.with_suffix(".ce")
        model.save(ft_path)
        # Keep the sidecar inside the model directory when there is one,
        # otherwise next to the requested path.
        if ft_path.is_dir():
            meta_path = ft_path / "meta.json"
        else:
            meta_path = path.with_suffix(".meta.json")
        _write_meta_json(
            meta_path, {"model_type": "cross_encoder_ft", "path": str(ft_path)}
        )
        return ft_path

    # Checked after the fine-tuned class on purpose: the original order is
    # preserved in case the fine-tuned class derives from this one.
    if isinstance(model, CrossEncoderClassifier):
        payload = {
            "model_type": "cross_encoder",
            "cross_encoder_name": model.cross_encoder_name,
            "batch_size": model.batch_size,
            "max_length": model.max_length,
            "scaler": model.scaler,
            "classifier": model.clf,
            "classes_": model.classes_,
        }
        meta = {
            "model_type": "cross_encoder",
            "cross_encoder_name": model.cross_encoder_name,
        }
    elif isinstance(model, MultiTowerClassifier):
        payload = {
            "model_type": "multi_tower",
            "encoder_name": model.encoder_name,
            "batch_size": model.batch_size,
            "scaler": model.scaler,
            "classifier": model.clf,
            "classes_": model.classes_,
        }
        meta = {"model_type": "multi_tower", "encoder_name": model.encoder_name}
    elif isinstance(model, EmbeddingClassifier):
        # Always tag as "minilm": load_model rebuilds an EmbeddingClassifier
        # only for that tag. Storing the caller-supplied model_type (default
        # "cross_encoder") produced files that load_model could not read.
        payload = {
            "model_type": "minilm",
            "encoder_name": model.encoder_name,
            "batch_size": model.batch_size,
            "classifier": model.clf,
            "classes_": model.classes_,
        }
        meta = {"model_type": "minilm", "encoder_name": model.encoder_name}
    else:
        joblib.dump({"model_type": "tfidf", "pipeline": model}, path)
        return path

    joblib.dump(payload, path)
    _write_meta_json(path.with_suffix(".meta.json"), meta)
    return path
def load_model(
    path: Path = DEFAULT_MODEL_PATH,
) -> Union[
    Pipeline,
    EmbeddingClassifier,
    "MultiTowerClassifier",
    "CrossEncoderClassifier",
    "FineTunedCrossEncoderClassifier",
]:
    """Load a classifier previously written by ``save_model``.

    Resolution order:

    1. A fine-tuned cross-encoder directory (``<path>.ce`` when ``path`` ends
       in ``.joblib``, otherwise ``path``) that contains ``config.json``.
    2. A ``.meta.json`` sidecar declaring ``"cross_encoder_ft"``.
    3. The joblib payload at ``path``, dispatched on its ``"model_type"``.

    Args:
        path: Location given to ``save_model``; strings are accepted.

    Returns:
        The reconstructed classifier. A joblib file that does not contain a
        dict is returned as loaded.

    Raises:
        KeyError: If the payload is a dict of an unrecognised layout or is
            missing a required entry.
    """
    from src.cross_encoder_model import (
        CrossEncoderClassifier,
        FineTunedCrossEncoderClassifier,
    )
    from src.multi_tower_model import MultiTowerClassifier

    path = Path(path)

    # Fine-tuned cross-encoder saved as a directory.
    ce_path = path.with_suffix(".ce") if path.suffix == ".joblib" else path
    if ce_path.exists() and (ce_path / "config.json").exists():
        return FineTunedCrossEncoderClassifier.load(ce_path)

    meta_path = path.with_suffix(".meta.json")
    if meta_path.exists():
        with open(meta_path, encoding="utf-8") as f:
            meta = json.load(f)
        if meta.get("model_type") == "cross_encoder_ft":
            ft_path = Path(meta.get("path", str(ce_path)))
            return FineTunedCrossEncoderClassifier.load(ft_path)

    # SECURITY: joblib.load unpickles the file and can execute arbitrary code.
    # Only call this on model files from a trusted source.
    obj = joblib.load(path)
    if not isinstance(obj, dict):
        return obj

    model_type = obj.get("model_type")

    if model_type == "cross_encoder" and "cross_encoder_name" in obj:
        model = CrossEncoderClassifier(
            cross_encoder_name=obj["cross_encoder_name"],
            batch_size=obj.get("batch_size", 32),
            max_length=obj.get("max_length", 512),
        )
        model.scaler = obj["scaler"]
        model.clf = obj["classifier"]
        model.classes_ = obj.get("classes_", obj["classifier"].classes_)
        return model

    if model_type == "multi_tower" and "scaler" in obj:
        model = MultiTowerClassifier(
            encoder_name=obj["encoder_name"],
            batch_size=obj.get("batch_size", 256),
        )
        model.scaler = obj["scaler"]
        model.clf = obj["classifier"]
        model.classes_ = obj.get("classes_", obj["classifier"].classes_)
        return model

    # An EmbeddingClassifier payload has an encoder and a classifier but no
    # scaler or pipeline. Accept that layout under any tag, because such
    # payloads may have been written with whatever model_type the caller
    # passed (e.g. the "cross_encoder" default) instead of "minilm".
    is_embedding_layout = (
        obj.keys() >= {"encoder_name", "classifier"}
        and "scaler" not in obj
        and "pipeline" not in obj
    )
    if model_type == "minilm" or is_embedding_layout:
        model = EmbeddingClassifier(
            encoder_name=obj["encoder_name"],
            batch_size=obj.get("batch_size", 256),
        )
        model.clf = obj["classifier"]
        model.classes_ = obj.get("classes_", obj["classifier"].classes_)
        return model

    if "pipeline" not in obj:
        # Same exception type as the plain dict lookup, with a usable message.
        raise KeyError(
            f"Unrecognised model payload in {path}: model_type={model_type!r}, "
            f"keys={sorted(map(str, obj))}"
        )
    return obj["pipeline"]