""" Data processing pipeline for LOBPatternNet v2. Fixed: proper normalization, balanced labeling, oversampling. """ import numpy as np import pandas as pd from datasets import load_dataset from sklearn.model_selection import train_test_split import torch from torch.utils.data import Dataset, DataLoader, WeightedRandomSampler import os def load_lob_data(): """Load TRADES-LOB dataset from HF Hub.""" ds = load_dataset("LeonardoBerti/TRADES-LOB", split="train") df = ds.to_pandas() print(f"Loaded dataset: {len(df)} rows") return df def extract_and_normalize_features(df): """ Extract and normalize LOB features properly. Approach: 1. Separate price and size features 2. Prices: normalize relative to mid-price (basis points) 3. Sizes: log-transform then z-score 4. Replace invalid values with 0 5. Final z-score normalization per feature Returns: (N, 40) normalized features """ N = len(df) # Collect raw features ask_prices = np.zeros((N, 10), dtype=np.float64) ask_sizes = np.zeros((N, 10), dtype=np.float64) bid_prices = np.zeros((N, 10), dtype=np.float64) bid_sizes = np.zeros((N, 10), dtype=np.float64) for i in range(10): ask_prices[:, i] = df[f'ask_price_{i+1}'].values.astype(np.float64) ask_sizes[:, i] = df[f'ask_size_{i+1}'].values.astype(np.float64) bid_prices[:, i] = df[f'bid_price_{i+1}'].values.astype(np.float64) bid_sizes[:, i] = df[f'bid_size_{i+1}'].values.astype(np.float64) # Mark sentinel/invalid values SENTINEL = 1e9 ask_p_valid = np.abs(ask_prices) < SENTINEL ask_s_valid = np.abs(ask_sizes) < SENTINEL bid_p_valid = np.abs(bid_prices) < SENTINEL bid_s_valid = np.abs(bid_sizes) < SENTINEL n_invalid = (~ask_p_valid).sum() + (~bid_p_valid).sum() + (~ask_s_valid).sum() + (~bid_s_valid).sum() print(f"Found {n_invalid} invalid/sentinel values") # Compute mid-price from valid best bid/ask best_ask = ask_prices[:, 0].copy() best_bid = bid_prices[:, 0].copy() both_valid = ask_p_valid[:, 0] & bid_p_valid[:, 0] mid_price = np.where(both_valid, (best_ask + best_bid) / 2.0, 0.0) # Forward-fill mid_price where it's 0 for i in range(1, N): if mid_price[i] == 0 and mid_price[i-1] != 0: mid_price[i] = mid_price[i-1] # Normalize prices: (price - mid) / mid * 10000 = basis points norm_ask_p = np.zeros_like(ask_prices) norm_bid_p = np.zeros_like(bid_prices) for i in range(10): valid_a = ask_p_valid[:, i] & (mid_price > 0) valid_b = bid_p_valid[:, i] & (mid_price > 0) norm_ask_p[valid_a, i] = (ask_prices[valid_a, i] - mid_price[valid_a]) / mid_price[valid_a] * 10000 norm_bid_p[valid_b, i] = (bid_prices[valid_b, i] - mid_price[valid_b]) / mid_price[valid_b] * 10000 # Normalize sizes: log1p then z-score norm_ask_s = np.zeros_like(ask_sizes) norm_bid_s = np.zeros_like(bid_sizes) for i in range(10): valid_a = ask_s_valid[:, i] & (ask_sizes[:, i] > 0) valid_b = bid_s_valid[:, i] & (bid_sizes[:, i] > 0) norm_ask_s[valid_a, i] = np.log1p(ask_sizes[valid_a, i]) norm_bid_s[valid_b, i] = np.log1p(bid_sizes[valid_b, i]) # Assemble into (N, 40) array: [ask_p_1, ask_s_1, bid_p_1, bid_s_1, ...] features = np.zeros((N, 40), dtype=np.float32) for i in range(10): features[:, i*4] = norm_ask_p[:, i] features[:, i*4+1] = norm_ask_s[:, i] features[:, i*4+2] = norm_bid_p[:, i] features[:, i*4+3] = norm_bid_s[:, i] # Final z-score normalization per feature (critical for model convergence) means = features.mean(axis=0) stds = features.std(axis=0) stds[stds < 1e-8] = 1.0 # avoid division by 0 features = (features - means) / stds # Replace any remaining NaN/inf features = np.nan_to_num(features, nan=0.0, posinf=0.0, neginf=0.0) print(f"Feature shape: {features.shape}") print(f"Feature range: [{features.min():.4f}, {features.max():.4f}]") print(f"Feature mean: {features.mean():.6f}, std: {features.std():.4f}") return features, means, stds def rolling_sum(arr, window): """Fully vectorized rolling sum using cumsum trick.""" cum = np.cumsum(arr) result = np.zeros_like(cum) result[window:] = cum[window:] - cum[:-window] return result def construct_labels_vectorized(df, window=50, ofi_threshold=0.15, percentile=85): """ Fully vectorized label construction for institutional trading detection. Uses rolling windows and relaxed thresholds for better class balance. """ N = len(df) buy_sell = df['BUY_SELL_FLAG'].values.astype(np.float32) # 1=buy, 0=sell sizes = df['SIZE'].values.astype(np.float32) types = df['TYPE'].values print(f"Constructing labels from {N} events, window={window}...") # Signed volume signed_vol = sizes * (2 * buy_sell - 1) # Rolling sums (vectorized) roll_signed = rolling_sum(signed_vol, window) roll_total = rolling_sum(sizes, window) norm_ofi = roll_signed / (roll_total + 1e-8) # Large orders is_large = (sizes > np.percentile(sizes, percentile)).astype(np.float32) roll_large_buy = rolling_sum(is_large * buy_sell, window) roll_large_sell = rolling_sum(is_large * (1 - buy_sell), window) # Cancellation rate is_cancel = (types == 'ORDER_CANCELLED').astype(np.float32) roll_cancel = rolling_sum(is_cancel, window) / window # Combined scores large_diff = (roll_large_buy - roll_large_sell) / (window * 0.1 + 1e-8) buy_score = norm_ofi + 0.3 * large_diff + 0.2 * roll_cancel sell_score = -norm_ofi - 0.3 * large_diff + 0.2 * roll_cancel # Use percentile thresholds for ~15-20% per class valid = np.arange(window, N) buy_threshold = np.percentile(buy_score[valid], 80) sell_threshold = np.percentile(sell_score[valid], 80) print(f"Buy threshold (p80): {buy_threshold:.4f}, Sell threshold (p80): {sell_threshold:.4f}") labels = np.ones(N, dtype=np.int64) labels[(buy_score > buy_threshold) & (norm_ofi > ofi_threshold)] = 0 labels[(sell_score > sell_threshold) & (norm_ofi < -ofi_threshold)] = 2 unique, counts = np.unique(labels, return_counts=True) label_names = {0: '主力买入', 1: '中性', 2: '主力卖出'} print("Label distribution:") for u, c in zip(unique, counts): print(f" {u} ({label_names.get(u, '?')}): {c} ({c/N*100:.1f}%)") return labels def create_sequences(features, labels, seq_len=100, stride=20): """Create sliding window sequences using stride_tricks for efficiency.""" N = len(features) F = features.shape[1] n_sequences = (N - seq_len) // stride # Use list comprehension (more memory efficient than pre-allocating huge array) starts = np.arange(0, N - seq_len, stride) n_sequences = len(starts) print(f"Creating {n_sequences} sequences of length {seq_len}, stride {stride}...") X = np.zeros((n_sequences, seq_len, F), dtype=np.float32) y = np.zeros(n_sequences, dtype=np.int64) for idx, start in enumerate(starts): X[idx] = features[start:start + seq_len] y[idx] = labels[start + seq_len - 1] print(f"Created {n_sequences} sequences, memory: {X.nbytes / 1e6:.1f} MB") return X, y class LOBDataset(Dataset): def __init__(self, X, y): self.X = torch.from_numpy(X) self.y = torch.from_numpy(y) def __len__(self): return len(self.X) def __getitem__(self, idx): return self.X[idx], self.y[idx] def get_weighted_sampler(y_train): """Create WeightedRandomSampler to oversample minority classes.""" class_counts = np.bincount(y_train) class_weights = 1.0 / class_counts sample_weights = class_weights[y_train] sampler = WeightedRandomSampler( weights=torch.from_numpy(sample_weights).double(), num_samples=len(y_train), replacement=True ) return sampler def prepare_data(seq_len=100, stride=5, window=50, ofi_threshold=0.2, percentile=90, test_size=0.15, val_size=0.15, random_state=42, batch_size=64): """ Full data preparation pipeline. Returns train, val, test DataLoaders with balanced sampling. """ cache_path = f"/app/data_v2_{seq_len}_{stride}_{window}.npz" if os.path.exists(cache_path): print(f"Loading cached data from {cache_path}") data = np.load(cache_path, allow_pickle=True) X_train, y_train = data['X_train'], data['y_train'] X_val, y_val = data['X_val'], data['y_val'] X_test, y_test = data['X_test'], data['y_test'] else: # Load raw data df = load_lob_data() # Extract and normalize features features, means, stds = extract_and_normalize_features(df) # Construct labels labels = construct_labels_vectorized(df, window=window, ofi_threshold=ofi_threshold, percentile=percentile) # Create sequences X, y = create_sequences(features, labels, seq_len=seq_len, stride=stride) # Split (stratified) X_train, X_temp, y_train, y_temp = train_test_split( X, y, test_size=test_size + val_size, random_state=random_state, stratify=y) X_val, X_test, y_val, y_test = train_test_split( X_temp, y_temp, test_size=test_size / (test_size + val_size), random_state=random_state, stratify=y_temp) # Save cache np.savez_compressed(cache_path, X_train=X_train, y_train=y_train, X_val=X_val, y_val=y_val, X_test=X_test, y_test=y_test, means=means, stds=stds) print(f"Cached to {cache_path}") print(f"Train: {len(X_train)}, Val: {len(X_val)}, Test: {len(X_test)}") # Print label distributions for name, ys in [("Train", y_train), ("Val", y_val), ("Test", y_test)]: unique, counts = np.unique(ys, return_counts=True) dist = {u: c for u, c in zip(unique, counts)} print(f" {name}: {dist}") # Create datasets train_dataset = LOBDataset(X_train, y_train) val_dataset = LOBDataset(X_val, y_val) test_dataset = LOBDataset(X_test, y_test) # Weighted sampler for training (oversamples minority classes) train_sampler = get_weighted_sampler(y_train) train_loader = DataLoader(train_dataset, batch_size=batch_size, sampler=train_sampler, num_workers=0) val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False, num_workers=0) test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False, num_workers=0) return train_loader, val_loader, test_loader, y_train if __name__ == "__main__": train_loader, val_loader, test_loader, y_train = prepare_data() # Check a batch for X_batch, y_batch in train_loader: print(f"Batch X: {X_batch.shape}, y: {y_batch.shape}") print(f"Batch labels: {y_batch[:20]}") print(f"Batch X range: [{X_batch.min():.4f}, {X_batch.max():.4f}]") break