nishu08's picture
Deploy CodeBERT training Space
9b2cded verified
Raw
History Blame Contribute Delete
10.8 kB
"""SQL error classifiers: TF-IDF baseline and MiniLM embedding model."""
from __future__ import annotations
import json
from pathlib import Path
from typing import List, Literal, Optional, Protocol, Union
import joblib
import numpy as np
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.linear_model import SGDClassifier
from sklearn.pipeline import FeatureUnion, Pipeline
# Directory two levels above this file's resolved location.
PROJECT_ROOT = Path(__file__).resolve().parent.parent
# Default artifact location used by save_model() and load_model().
DEFAULT_MODEL_PATH = PROJECT_ROOT / "models" / "sql_error_classifier.joblib"
# Default sentence-transformers model name used by EmbeddingClassifier.
DEFAULT_ENCODER = "sentence-transformers/all-MiniLM-L6-v2"
# Model identifiers accepted by build_classifier() and save_model().
ModelType = Literal["cross_encoder", "cross_encoder_ft", "multi_tower", "minilm", "tfidf"]
class TextClassifier(Protocol):
    """Structural interface for classifiers that operate on raw text lists."""

    # Class labels; EmbeddingClassifier populates this in fit().
    classes_: np.ndarray

    def fit(self, texts: List[str], y: np.ndarray) -> "TextClassifier":
        """Train on ``texts`` with labels ``y`` and return the classifier."""

    def predict(self, texts: List[str]) -> np.ndarray:
        """Return one predicted label per input text."""

    def predict_proba(self, texts: List[str]) -> np.ndarray:
        """Return class probabilities for each input text."""
def combine_features(
    queries: List[str],
    error_messages: Optional[List[str]] = None,
    schemas: Optional[List[str]] = None,
    questions: Optional[List[str]] = None,
) -> List[str]:
    """Fuse question, schema, query, and optional error message.

    One string is produced per query. Its parts are joined with a single
    space in the fixed order ``QUESTION:``, ``SCHEMA:``, ``QUERY:``,
    ``ERROR:``. The ``QUERY:`` part is always present; the others are
    emitted only when the corresponding list is supplied and its entry for
    that sample is truthy.

    Args:
        queries: SQL query text, one entry per sample.
        error_messages: Optional per-sample error messages.
        schemas: Optional per-sample schema descriptions.
        questions: Optional per-sample natural-language questions.

    Returns:
        A list of fused strings with the same length as ``queries``.

    Raises:
        ValueError: If a non-empty optional list does not have the same
            length as ``queries``.
    """
    expected = len(queries)
    # Validate the parallel lists up front: a shorter list would otherwise
    # fail with a bare IndexError mid-loop, and a longer one would be
    # silently truncated. None and empty lists mean "not supplied".
    for name, values in (
        ("error_messages", error_messages),
        ("schemas", schemas),
        ("questions", questions),
    ):
        if values is not None and len(values) not in (0, expected):
            raise ValueError(
                f"{name} has {len(values)} entries but queries has {expected}"
            )

    texts: List[str] = []
    for i, query in enumerate(queries):
        parts: List[str] = []
        if questions and questions[i]:
            parts.append(f"QUESTION: {questions[i]}")
        if schemas and schemas[i]:
            parts.append(f"SCHEMA: {schemas[i]}")
        parts.append(f"QUERY: {query}")
        if error_messages and error_messages[i]:
            parts.append(f"ERROR: {error_messages[i]}")
        texts.append(" ".join(parts))
    return texts
def _build_text_features() -> FeatureUnion:
    """Return a FeatureUnion of a word-level and a character-level TF-IDF."""
    # Word unigrams/bigrams; the token pattern additionally captures any
    # non-whitespace run that directly follows one of = < > !.
    word_tfidf = TfidfVectorizer(
        analyzer="word",
        ngram_range=(1, 2),
        max_features=30_000,
        sublinear_tf=True,
        strip_accents="unicode",
        token_pattern=r"(?u)\b\w+\b|(?<=[=<>!])\S+",
    )
    # Character n-grams of length 2 to 5 using the "char_wb" analyzer.
    char_tfidf = TfidfVectorizer(
        analyzer="char_wb",
        ngram_range=(2, 5),
        max_features=20_000,
        sublinear_tf=True,
    )
    return FeatureUnion([("word", word_tfidf), ("char", char_tfidf)])
def build_tfidf_classifier() -> Pipeline:
    """Build the TF-IDF baseline: text features followed by a linear model.

    This is the bag-of-words baseline: fast, but without deep semantic
    understanding.
    """
    steps = [
        ("tfidf", _build_text_features()),
        (
            "clf",
            SGDClassifier(
                loss="log_loss",
                penalty="l2",
                alpha=1e-5,
                max_iter=1000,
                tol=1e-3,
                class_weight="balanced",
                random_state=42,
            ),
        ),
    ]
    return Pipeline(steps)
class EmbeddingClassifier:
    """
    Linear classifier trained on MiniLM sentence embeddings.

    Texts are embedded with a SentenceTransformer model, which is loaded
    lazily on first use, and the vectors are fed to an SGDClassifier with
    logistic loss. The intent is to capture question meaning (e.g.
    'average' vs a wrong aggregate) from full-sentence context rather than
    from isolated word counts.
    """

    def __init__(
        self,
        encoder_name: str = DEFAULT_ENCODER,
        batch_size: int = 256,
    ):
        self.encoder_name = encoder_name
        self.batch_size = batch_size
        # Populated by fit(); None until then.
        self.classes_: Optional[np.ndarray] = None
        # Created on demand by _load_encoder().
        self.encoder = None
        self.clf = SGDClassifier(
            loss="log_loss",
            penalty="l2",
            alpha=1e-4,
            max_iter=1000,
            tol=1e-3,
            class_weight="balanced",
            random_state=42,
        )

    def _load_encoder(self):
        """Instantiate the SentenceTransformer once; later calls do nothing."""
        if self.encoder is not None:
            return
        # Imported here so the dependency is only needed when encoding.
        from sentence_transformers import SentenceTransformer

        self.encoder = SentenceTransformer(self.encoder_name)

    def encode(self, texts: List[str], show_progress: bool = False) -> np.ndarray:
        """Embed ``texts`` with the encoder and return the result as numpy."""
        self._load_encoder()
        embeddings = self.encoder.encode(
            texts,
            batch_size=self.batch_size,
            show_progress_bar=show_progress,
            convert_to_numpy=True,
        )
        return embeddings

    def fit(self, texts: List[str], y: np.ndarray) -> "EmbeddingClassifier":
        """Embed ``texts``, fit the linear classifier on them, return self."""
        embeddings = self.encode(texts, show_progress=True)
        self.clf.fit(embeddings, y)
        self.classes_ = self.clf.classes_
        return self

    def predict(self, texts: List[str]) -> np.ndarray:
        """Return the classifier's predicted label for each text."""
        embeddings = self.encode(texts)
        return self.clf.predict(embeddings)

    def predict_proba(self, texts: List[str]) -> np.ndarray:
        """Return the classifier's class probabilities for each text."""
        embeddings = self.encode(texts)
        return self.clf.predict_proba(embeddings)
def build_classifier(
    model_type: ModelType = "cross_encoder",
) -> Union[
    Pipeline,
    EmbeddingClassifier,
    "MultiTowerClassifier",
    "CrossEncoderClassifier",
    "FineTunedCrossEncoderClassifier",
]:
    """Instantiate an untrained classifier for the requested model type.

    The multi-tower and cross-encoder implementations live in separate
    modules and are imported only when that model type is requested.

    Raises:
        ValueError: If ``model_type`` is not one of the known identifiers.
    """
    if model_type == "tfidf":
        classifier = build_tfidf_classifier()
    elif model_type == "minilm":
        classifier = EmbeddingClassifier()
    elif model_type == "multi_tower":
        from src.multi_tower_model import MultiTowerClassifier

        classifier = MultiTowerClassifier()
    elif model_type == "cross_encoder":
        from src.cross_encoder_model import CrossEncoderClassifier

        classifier = CrossEncoderClassifier()
    elif model_type == "cross_encoder_ft":
        from src.cross_encoder_model import FineTunedCrossEncoderClassifier

        classifier = FineTunedCrossEncoderClassifier()
    else:
        raise ValueError(f"Unknown model_type: {model_type}")
    return classifier
def _write_model_meta(meta_path: Path, meta: dict) -> None:
    """Write ``meta`` to ``meta_path`` as indented JSON."""
    with open(meta_path, "w", encoding="utf-8") as f:
        json.dump(meta, f, indent=2)


def save_model(
    model: Union[
        Pipeline,
        EmbeddingClassifier,
        "MultiTowerClassifier",
        "CrossEncoderClassifier",
        "FineTunedCrossEncoderClassifier",
    ],
    path: Path = DEFAULT_MODEL_PATH,
    model_type: ModelType = "cross_encoder",
) -> Path:
    """Persist ``model`` to disk in the layout that ``load_model`` reads.

    The storage format is chosen from the model's class:

    * ``FineTunedCrossEncoderClassifier``: delegated to ``model.save`` on a
      ``.ce`` path (or on ``path`` itself when it is an existing
      directory), plus a JSON meta file.
    * ``CrossEncoderClassifier``, ``MultiTowerClassifier``,
      ``EmbeddingClassifier``: a joblib payload dict at ``path`` plus a
      ``.meta.json`` sidecar.
    * anything else: treated as the TF-IDF pipeline and dumped with joblib
      (no sidecar is written).

    Args:
        model: The trained model to save.
        path: Target path. ``str`` values are accepted and coerced.
        model_type: Kept for backward compatibility. The stored model type
            is derived from the class of ``model``, so this value no longer
            affects what is written.

    Returns:
        The path written: the fine-tuned directory path for
        ``FineTunedCrossEncoderClassifier``, otherwise ``path``.
    """
    from src.cross_encoder_model import (
        CrossEncoderClassifier,
        FineTunedCrossEncoderClassifier,
    )
    from src.multi_tower_model import MultiTowerClassifier

    # Accept str paths as load_model does; a plain str has no .parent.
    path = Path(path)
    path.parent.mkdir(parents=True, exist_ok=True)

    # Checked before CrossEncoderClassifier (presumably the fine-tuned class
    # is a subclass of it and would otherwise match below; verify).
    if isinstance(model, FineTunedCrossEncoderClassifier):
        ft_path = path if path.is_dir() or str(path).endswith("/") else path.with_suffix(".ce")
        # An existing directory literally named *.joblib is redirected to *.ce.
        if ft_path.suffix == ".joblib":
            ft_path = ft_path.with_suffix(".ce")
        model.save(ft_path)
        # If ft_path is a directory after saving, the meta goes inside it;
        # otherwise it is written next to the requested path.
        meta_path = ft_path / "meta.json" if ft_path.is_dir() else path.with_suffix(".meta.json")
        _write_model_meta(
            meta_path, {"model_type": "cross_encoder_ft", "path": str(ft_path)}
        )
        return ft_path

    if isinstance(model, CrossEncoderClassifier):
        payload = {
            "model_type": "cross_encoder",
            "cross_encoder_name": model.cross_encoder_name,
            "batch_size": model.batch_size,
            "max_length": model.max_length,
            "scaler": model.scaler,
            "classifier": model.clf,
            "classes_": model.classes_,
        }
        joblib.dump(payload, path)
        _write_model_meta(
            path.with_suffix(".meta.json"),
            {
                "model_type": "cross_encoder",
                "cross_encoder_name": model.cross_encoder_name,
            },
        )
    elif isinstance(model, MultiTowerClassifier):
        payload = {
            "model_type": "multi_tower",
            "encoder_name": model.encoder_name,
            "batch_size": model.batch_size,
            "scaler": model.scaler,
            "classifier": model.clf,
            "classes_": model.classes_,
        }
        joblib.dump(payload, path)
        _write_model_meta(
            path.with_suffix(".meta.json"),
            {"model_type": "multi_tower", "encoder_name": model.encoder_name},
        )
    elif isinstance(model, EmbeddingClassifier):
        # load_model only rebuilds an EmbeddingClassifier from payloads
        # tagged "minilm". Storing the caller-supplied model_type (default
        # "cross_encoder") produced files that load_model could not read.
        payload = {
            "model_type": "minilm",
            "encoder_name": model.encoder_name,
            "batch_size": model.batch_size,
            "classifier": model.clf,
            "classes_": model.classes_,
        }
        joblib.dump(payload, path)
        _write_model_meta(
            path.with_suffix(".meta.json"),
            {"model_type": "minilm", "encoder_name": model.encoder_name},
        )
    else:
        joblib.dump({"model_type": "tfidf", "pipeline": model}, path)
    return path
def load_model(
    path: Path = DEFAULT_MODEL_PATH,
) -> Union[
    Pipeline,
    EmbeddingClassifier,
    "MultiTowerClassifier",
    "CrossEncoderClassifier",
    "FineTunedCrossEncoderClassifier",
]:
    """Load a model previously written to ``path``.

    Resolution order:

    1. A fine-tuned cross-encoder directory containing ``config.json``
       (``path`` with a ``.ce`` suffix when ``path`` ends in ``.joblib``,
       otherwise ``path`` itself).
    2. A ``.meta.json`` sidecar whose ``model_type`` is
       ``"cross_encoder_ft"``, which points at the fine-tuned location.
    3. The joblib file at ``path``: a payload dict is rebuilt into the
       matching classifier, and any other object is returned unchanged.

    NOTE: joblib files are pickle-based, so only load files you trust.
    """
    from src.cross_encoder_model import (
        CrossEncoderClassifier,
        FineTunedCrossEncoderClassifier,
    )
    from src.multi_tower_model import MultiTowerClassifier

    path = Path(path)

    # Fine-tuned cross-encoder saved as directory
    if path.suffix == ".joblib":
        ce_path = path.with_suffix(".ce")
    else:
        ce_path = path
    if ce_path.exists() and (ce_path / "config.json").exists():
        return FineTunedCrossEncoderClassifier.load(ce_path)

    meta_path = path.with_suffix(".meta.json")
    if meta_path.exists():
        with open(meta_path) as fh:
            meta = json.load(fh)
        if meta.get("model_type") == "cross_encoder_ft":
            # Fall back to the conventional .ce location if no path is recorded.
            return FineTunedCrossEncoderClassifier.load(
                Path(meta.get("path", str(ce_path)))
            )

    payload = joblib.load(path)
    if not isinstance(payload, dict):
        # Not a payload dict: hand the loaded object back as-is.
        return payload

    def _restore(model, with_scaler):
        """Copy the fitted parts of the payload onto a fresh model."""
        if with_scaler:
            model.scaler = payload["scaler"]
        model.clf = payload["classifier"]
        model.classes_ = payload.get("classes_", payload["classifier"].classes_)
        return model

    kind = payload.get("model_type")
    if kind == "cross_encoder":
        return _restore(
            CrossEncoderClassifier(
                cross_encoder_name=payload["cross_encoder_name"],
                batch_size=payload.get("batch_size", 32),
                max_length=payload.get("max_length", 512),
            ),
            True,
        )
    if kind == "multi_tower":
        return _restore(
            MultiTowerClassifier(
                encoder_name=payload["encoder_name"],
                batch_size=payload.get("batch_size", 256),
            ),
            True,
        )
    if kind == "minilm":
        return _restore(
            EmbeddingClassifier(
                encoder_name=payload["encoder_name"],
                batch_size=payload.get("batch_size", 256),
            ),
            False,
        )
    # Any other payload dict is expected to wrap the TF-IDF pipeline.
    return payload["pipeline"]