| """ |
| Data processing pipeline for LOBPatternNet v2. |
| Fixed: proper normalization, balanced labeling, oversampling. |
| """ |
|
|
| import numpy as np |
| import pandas as pd |
| from datasets import load_dataset |
| from sklearn.model_selection import train_test_split |
| import torch |
| from torch.utils.data import Dataset, DataLoader, WeightedRandomSampler |
| import os |
|
|
|
|
| def load_lob_data(): |
| """Load TRADES-LOB dataset from HF Hub.""" |
| ds = load_dataset("LeonardoBerti/TRADES-LOB", split="train") |
| df = ds.to_pandas() |
| print(f"Loaded dataset: {len(df)} rows") |
| return df |
|
|
|
|
| def extract_and_normalize_features(df): |
| """ |
| Extract and normalize LOB features properly. |
| |
| Approach: |
| 1. Separate price and size features |
| 2. Prices: normalize relative to mid-price (basis points) |
| 3. Sizes: log-transform then z-score |
| 4. Replace invalid values with 0 |
| 5. Final z-score normalization per feature |
| |
    Returns: (features, means, stds), where features has shape (N, 40)
| """ |
| N = len(df) |
| |
| |
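    # Per-level price/size arrays: 10 book levels per side.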
| ask_prices = np.zeros((N, 10), dtype=np.float64) |
| ask_sizes = np.zeros((N, 10), dtype=np.float64) |
| bid_prices = np.zeros((N, 10), dtype=np.float64) |
| bid_sizes = np.zeros((N, 10), dtype=np.float64) |
| |
| for i in range(10): |
| ask_prices[:, i] = df[f'ask_price_{i+1}'].values.astype(np.float64) |
| ask_sizes[:, i] = df[f'ask_size_{i+1}'].values.astype(np.float64) |
| bid_prices[:, i] = df[f'bid_price_{i+1}'].values.astype(np.float64) |
| bid_sizes[:, i] = df[f'bid_size_{i+1}'].values.astype(np.float64) |
| |
| |
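    # Treat entries with magnitude >= 1e9 as sentinel-coded missing values.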
| SENTINEL = 1e9 |
| ask_p_valid = np.abs(ask_prices) < SENTINEL |
| ask_s_valid = np.abs(ask_sizes) < SENTINEL |
| bid_p_valid = np.abs(bid_prices) < SENTINEL |
| bid_s_valid = np.abs(bid_sizes) < SENTINEL |
| |
| n_invalid = (~ask_p_valid).sum() + (~bid_p_valid).sum() + (~ask_s_valid).sum() + (~bid_s_valid).sum() |
| print(f"Found {n_invalid} invalid/sentinel values") |
| |
| |
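    # Mid-price from best bid/ask; left at 0 where either side is missing.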
| best_ask = ask_prices[:, 0].copy() |
| best_bid = bid_prices[:, 0].copy() |
| both_valid = ask_p_valid[:, 0] & bid_p_valid[:, 0] |
| mid_price = np.where(both_valid, (best_ask + best_bid) / 2.0, 0.0) |
| |
| |
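    # Forward-fill zero (missing) mid-prices from the last valid tick.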
    valid_idx = np.where(mid_price != 0, np.arange(N), 0)
    np.maximum.accumulate(valid_idx, out=valid_idx)
    mid_price = mid_price[valid_idx]
| |
| |
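    # Prices: signed distance from the mid-price in basis points.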
| norm_ask_p = np.zeros_like(ask_prices) |
| norm_bid_p = np.zeros_like(bid_prices) |
| |
| for i in range(10): |
| valid_a = ask_p_valid[:, i] & (mid_price > 0) |
| valid_b = bid_p_valid[:, i] & (mid_price > 0) |
| norm_ask_p[valid_a, i] = (ask_prices[valid_a, i] - mid_price[valid_a]) / mid_price[valid_a] * 10000 |
| norm_bid_p[valid_b, i] = (bid_prices[valid_b, i] - mid_price[valid_b]) / mid_price[valid_b] * 10000 |
| |
| |
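    # Sizes: log1p compresses the heavy-tailed depth distribution; invalid or zero sizes stay 0.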
| norm_ask_s = np.zeros_like(ask_sizes) |
| norm_bid_s = np.zeros_like(bid_sizes) |
| |
| for i in range(10): |
| valid_a = ask_s_valid[:, i] & (ask_sizes[:, i] > 0) |
| valid_b = bid_s_valid[:, i] & (bid_sizes[:, i] > 0) |
| norm_ask_s[valid_a, i] = np.log1p(ask_sizes[valid_a, i]) |
| norm_bid_s[valid_b, i] = np.log1p(bid_sizes[valid_b, i]) |
| |
| |
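    # Interleave into (N, 40) with per-level layout [ask_price, ask_size, bid_price, bid_size].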
| features = np.zeros((N, 40), dtype=np.float32) |
| for i in range(10): |
| features[:, i*4] = norm_ask_p[:, i] |
| features[:, i*4+1] = norm_ask_s[:, i] |
| features[:, i*4+2] = norm_bid_p[:, i] |
| features[:, i*4+3] = norm_bid_s[:, i] |
| |
| |
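    # Final per-feature z-score; near-constant features get std clamped to 1.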
| means = features.mean(axis=0) |
| stds = features.std(axis=0) |
| stds[stds < 1e-8] = 1.0 |
| features = (features - means) / stds |
| |
| |
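    # Defensive cleanup of any residual NaN/inf.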
| features = np.nan_to_num(features, nan=0.0, posinf=0.0, neginf=0.0) |
| |
| print(f"Feature shape: {features.shape}") |
| print(f"Feature range: [{features.min():.4f}, {features.max():.4f}]") |
| print(f"Feature mean: {features.mean():.6f}, std: {features.std():.4f}") |
| |
| return features, means, stds |
|
|
|
|
def rolling_sum(arr, window):
    """Trailing rolling sum via the cumsum trick, fully vectorized.

    result[i] = sum(arr[i-window+1 : i+1]); the first window-1 positions
    have no full window and are left at 0. Accumulates in float64 to
    limit rounding drift over long event streams.
    """
    cum = np.cumsum(arr, dtype=np.float64)
    result = np.zeros_like(cum)
    result[window - 1] = cum[window - 1]
    result[window:] = cum[window:] - cum[:-window]
    return result
|
|
|
|
| def construct_labels_vectorized(df, window=50, ofi_threshold=0.15, percentile=85): |
| """ |
| Fully vectorized label construction for institutional trading detection. |
| Uses rolling windows and relaxed thresholds for better class balance. |
| """ |
| N = len(df) |
| buy_sell = df['BUY_SELL_FLAG'].values.astype(np.float32) |
| sizes = df['SIZE'].values.astype(np.float32) |
| types = df['TYPE'].values |
| |
| print(f"Constructing labels from {N} events, window={window}...") |
| |
| |
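    # Signed volume: 2*flag - 1 maps BUY_SELL_FLAG {1, 0} to {+1, -1}.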
| signed_vol = sizes * (2 * buy_sell - 1) |
| |
| |
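    # Rolling order-flow imbalance (OFI), normalized into [-1, 1] by rolling total volume.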
| roll_signed = rolling_sum(signed_vol, window) |
| roll_total = rolling_sum(sizes, window) |
| norm_ofi = roll_signed / (roll_total + 1e-8) |
| |
| |
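    # Count of large orders (above the size percentile) per side within the window.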
| is_large = (sizes > np.percentile(sizes, percentile)).astype(np.float32) |
| roll_large_buy = rolling_sum(is_large * buy_sell, window) |
| roll_large_sell = rolling_sum(is_large * (1 - buy_sell), window) |
| |
| |
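    # Rolling cancellation rate; feeds both side scores as an activity-intensity signal.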
| is_cancel = (types == 'ORDER_CANCELLED').astype(np.float32) |
| roll_cancel = rolling_sum(is_cancel, window) / window |
| |
| |
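    # Composite side scores: OFI, plus large-order imbalance scaled by roughly
    # the expected number of large orders per window, plus cancellation intensity.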
| large_diff = (roll_large_buy - roll_large_sell) / (window * 0.1 + 1e-8) |
| buy_score = norm_ofi + 0.3 * large_diff + 0.2 * roll_cancel |
| sell_score = -norm_ofi - 0.3 * large_diff + 0.2 * roll_cancel |
| |
| |
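    # Adaptive 80th-percentile thresholds, computed only over fully-formed windows.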
| valid = np.arange(window, N) |
| buy_threshold = np.percentile(buy_score[valid], 80) |
| sell_threshold = np.percentile(sell_score[valid], 80) |
| |
| print(f"Buy threshold (p80): {buy_threshold:.4f}, Sell threshold (p80): {sell_threshold:.4f}") |
| |
| labels = np.ones(N, dtype=np.int64) |
| labels[(buy_score > buy_threshold) & (norm_ofi > ofi_threshold)] = 0 |
| labels[(sell_score > sell_threshold) & (norm_ofi < -ofi_threshold)] = 2 |
| |
| unique, counts = np.unique(labels, return_counts=True) |
    label_names = {0: 'institutional buy', 1: 'neutral', 2: 'institutional sell'}
| print("Label distribution:") |
| for u, c in zip(unique, counts): |
| print(f" {u} ({label_names.get(u, '?')}): {c} ({c/N*100:.1f}%)") |
| |
| return labels |
|
|
|
|
def create_sequences(features, labels, seq_len=100, stride=20):
    """Create sliding-window sequences by copying into a preallocated array."""
    N = len(features)
    F = features.shape[1]

    # arange's stop is exclusive; +1 keeps the final full window.
    starts = np.arange(0, N - seq_len + 1, stride)
    n_sequences = len(starts)
| |
| print(f"Creating {n_sequences} sequences of length {seq_len}, stride {stride}...") |
| |
| X = np.zeros((n_sequences, seq_len, F), dtype=np.float32) |
| y = np.zeros(n_sequences, dtype=np.int64) |
| |
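    # Each sequence is labeled by the regime at its final event (no lookahead).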
| for idx, start in enumerate(starts): |
| X[idx] = features[start:start + seq_len] |
| y[idx] = labels[start + seq_len - 1] |
| |
| print(f"Created {n_sequences} sequences, memory: {X.nbytes / 1e6:.1f} MB") |
| return X, y |
|
|
|
|
| class LOBDataset(Dataset): |
| def __init__(self, X, y): |
| self.X = torch.from_numpy(X) |
| self.y = torch.from_numpy(y) |
| |
| def __len__(self): |
| return len(self.X) |
| |
| def __getitem__(self, idx): |
| return self.X[idx], self.y[idx] |
|
|
|
|
| def get_weighted_sampler(y_train): |
| """Create WeightedRandomSampler to oversample minority classes.""" |
    class_counts = np.bincount(y_train)
    # Inverse-frequency weights; the maximum() guards against an absent class.
    class_weights = 1.0 / np.maximum(class_counts, 1)
    sample_weights = class_weights[y_train]
| sampler = WeightedRandomSampler( |
| weights=torch.from_numpy(sample_weights).double(), |
| num_samples=len(y_train), |
| replacement=True |
| ) |
| return sampler |
|
|
|
|
| def prepare_data(seq_len=100, stride=5, window=50, ofi_threshold=0.2, |
| percentile=90, test_size=0.15, val_size=0.15, |
| random_state=42, batch_size=64): |
| """ |
| Full data preparation pipeline. |
    Returns (train_loader, val_loader, test_loader, y_train); the train loader
    oversamples minority classes via a weighted sampler, and y_train is returned
    for downstream use (e.g. class-weighted losses).
| """ |
    # Include the labeling parameters in the cache key so changing them
    # does not silently reuse stale arrays.
    cache_path = f"/app/data_v2_{seq_len}_{stride}_{window}_{ofi_threshold}_{percentile}.npz"
| |
| if os.path.exists(cache_path): |
| print(f"Loading cached data from {cache_path}") |
| data = np.load(cache_path, allow_pickle=True) |
| X_train, y_train = data['X_train'], data['y_train'] |
| X_val, y_val = data['X_val'], data['y_val'] |
| X_test, y_test = data['X_test'], data['y_test'] |
| else: |
| |
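        # 1. Load raw event data from the HF Hub.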
| df = load_lob_data() |
| |
| |
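        # 2. Extract and normalize the 40 LOB features.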
| features, means, stds = extract_and_normalize_features(df) |
| |
| |
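        # 3. Derive heuristic labels from trade-flow statistics.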
| labels = construct_labels_vectorized(df, window=window, |
| ofi_threshold=ofi_threshold, |
| percentile=percentile) |
| |
| |
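        # 4. Window features/labels into model-ready sequences.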
| X, y = create_sequences(features, labels, seq_len=seq_len, stride=stride) |
| |
| |
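        # NOTE: with stride < seq_len the windows overlap heavily, so a shuffled
        # stratified split leaks near-duplicate sequences across train/val/test;
        # a chronological split would be the stricter choice for evaluation.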
| X_train, X_temp, y_train, y_temp = train_test_split( |
| X, y, test_size=test_size + val_size, random_state=random_state, stratify=y) |
| X_val, X_test, y_val, y_test = train_test_split( |
| X_temp, y_temp, test_size=test_size / (test_size + val_size), |
| random_state=random_state, stratify=y_temp) |
| |
| |
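        # 5. Cache the splits plus normalization stats for later runs.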
| np.savez_compressed(cache_path, |
| X_train=X_train, y_train=y_train, |
| X_val=X_val, y_val=y_val, |
| X_test=X_test, y_test=y_test, |
| means=means, stds=stds) |
| print(f"Cached to {cache_path}") |
| |
| print(f"Train: {len(X_train)}, Val: {len(X_val)}, Test: {len(X_test)}") |
| |
| |
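    # Per-split class balance as a sanity check.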
| for name, ys in [("Train", y_train), ("Val", y_val), ("Test", y_test)]: |
| unique, counts = np.unique(ys, return_counts=True) |
| dist = {u: c for u, c in zip(unique, counts)} |
| print(f" {name}: {dist}") |
| |
| |
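    # Wrap the splits in PyTorch datasets.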
| train_dataset = LOBDataset(X_train, y_train) |
| val_dataset = LOBDataset(X_val, y_val) |
| test_dataset = LOBDataset(X_test, y_test) |
| |
| |
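    # Oversample minority classes in the training loader only; val/test stay untouched.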
| train_sampler = get_weighted_sampler(y_train) |
| |
| train_loader = DataLoader(train_dataset, batch_size=batch_size, sampler=train_sampler, num_workers=0) |
| val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False, num_workers=0) |
| test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False, num_workers=0) |
| |
| return train_loader, val_loader, test_loader, y_train |
|
|
|
|
| if __name__ == "__main__": |
| train_loader, val_loader, test_loader, y_train = prepare_data() |
| |
| |
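    # Smoke test: pull one batch and sanity-check shapes, labels, and value range.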
| for X_batch, y_batch in train_loader: |
| print(f"Batch X: {X_batch.shape}, y: {y_batch.shape}") |
| print(f"Batch labels: {y_batch[:20]}") |
| print(f"Batch X range: [{X_batch.min():.4f}, {X_batch.max():.4f}]") |
| break |
|
|