lob-pattern-net / data_processor.py
kangkangchen's picture
Upload folder using huggingface_hub
d730c93 verified
"""
Data processing pipeline for LOBPatternNet v2.
Fixed: proper normalization, balanced labeling, oversampling.
"""
import numpy as np
import pandas as pd
from datasets import load_dataset
from sklearn.model_selection import train_test_split
import torch
from torch.utils.data import Dataset, DataLoader, WeightedRandomSampler
import os
def load_lob_data():
"""Load TRADES-LOB dataset from HF Hub."""
ds = load_dataset("LeonardoBerti/TRADES-LOB", split="train")
df = ds.to_pandas()
print(f"Loaded dataset: {len(df)} rows")
return df
def extract_and_normalize_features(df):
"""
Extract and normalize LOB features properly.
Approach:
1. Separate price and size features
2. Prices: normalize relative to mid-price (basis points)
3. Sizes: log-transform then z-score
4. Replace invalid values with 0
5. Final z-score normalization per feature
Returns: (N, 40) normalized features
"""
N = len(df)
# Collect raw features
ask_prices = np.zeros((N, 10), dtype=np.float64)
ask_sizes = np.zeros((N, 10), dtype=np.float64)
bid_prices = np.zeros((N, 10), dtype=np.float64)
bid_sizes = np.zeros((N, 10), dtype=np.float64)
for i in range(10):
ask_prices[:, i] = df[f'ask_price_{i+1}'].values.astype(np.float64)
ask_sizes[:, i] = df[f'ask_size_{i+1}'].values.astype(np.float64)
bid_prices[:, i] = df[f'bid_price_{i+1}'].values.astype(np.float64)
bid_sizes[:, i] = df[f'bid_size_{i+1}'].values.astype(np.float64)
# Mark sentinel/invalid values
SENTINEL = 1e9
ask_p_valid = np.abs(ask_prices) < SENTINEL
ask_s_valid = np.abs(ask_sizes) < SENTINEL
bid_p_valid = np.abs(bid_prices) < SENTINEL
bid_s_valid = np.abs(bid_sizes) < SENTINEL
n_invalid = (~ask_p_valid).sum() + (~bid_p_valid).sum() + (~ask_s_valid).sum() + (~bid_s_valid).sum()
print(f"Found {n_invalid} invalid/sentinel values")
# Compute mid-price from valid best bid/ask
best_ask = ask_prices[:, 0].copy()
best_bid = bid_prices[:, 0].copy()
both_valid = ask_p_valid[:, 0] & bid_p_valid[:, 0]
mid_price = np.where(both_valid, (best_ask + best_bid) / 2.0, 0.0)
# Forward-fill mid_price where it's 0
for i in range(1, N):
if mid_price[i] == 0 and mid_price[i-1] != 0:
mid_price[i] = mid_price[i-1]
# Normalize prices: (price - mid) / mid * 10000 = basis points
norm_ask_p = np.zeros_like(ask_prices)
norm_bid_p = np.zeros_like(bid_prices)
for i in range(10):
valid_a = ask_p_valid[:, i] & (mid_price > 0)
valid_b = bid_p_valid[:, i] & (mid_price > 0)
norm_ask_p[valid_a, i] = (ask_prices[valid_a, i] - mid_price[valid_a]) / mid_price[valid_a] * 10000
norm_bid_p[valid_b, i] = (bid_prices[valid_b, i] - mid_price[valid_b]) / mid_price[valid_b] * 10000
# Normalize sizes: log1p then z-score
norm_ask_s = np.zeros_like(ask_sizes)
norm_bid_s = np.zeros_like(bid_sizes)
for i in range(10):
valid_a = ask_s_valid[:, i] & (ask_sizes[:, i] > 0)
valid_b = bid_s_valid[:, i] & (bid_sizes[:, i] > 0)
norm_ask_s[valid_a, i] = np.log1p(ask_sizes[valid_a, i])
norm_bid_s[valid_b, i] = np.log1p(bid_sizes[valid_b, i])
# Assemble into (N, 40) array: [ask_p_1, ask_s_1, bid_p_1, bid_s_1, ...]
features = np.zeros((N, 40), dtype=np.float32)
for i in range(10):
features[:, i*4] = norm_ask_p[:, i]
features[:, i*4+1] = norm_ask_s[:, i]
features[:, i*4+2] = norm_bid_p[:, i]
features[:, i*4+3] = norm_bid_s[:, i]
# Final z-score normalization per feature (critical for model convergence)
means = features.mean(axis=0)
stds = features.std(axis=0)
stds[stds < 1e-8] = 1.0 # avoid division by 0
features = (features - means) / stds
# Replace any remaining NaN/inf
features = np.nan_to_num(features, nan=0.0, posinf=0.0, neginf=0.0)
print(f"Feature shape: {features.shape}")
print(f"Feature range: [{features.min():.4f}, {features.max():.4f}]")
print(f"Feature mean: {features.mean():.6f}, std: {features.std():.4f}")
return features, means, stds
def rolling_sum(arr, window):
"""Fully vectorized rolling sum using cumsum trick."""
cum = np.cumsum(arr)
result = np.zeros_like(cum)
result[window:] = cum[window:] - cum[:-window]
return result
def construct_labels_vectorized(df, window=50, ofi_threshold=0.15, percentile=85):
"""
Fully vectorized label construction for institutional trading detection.
Uses rolling windows and relaxed thresholds for better class balance.
"""
N = len(df)
buy_sell = df['BUY_SELL_FLAG'].values.astype(np.float32) # 1=buy, 0=sell
sizes = df['SIZE'].values.astype(np.float32)
types = df['TYPE'].values
print(f"Constructing labels from {N} events, window={window}...")
# Signed volume
signed_vol = sizes * (2 * buy_sell - 1)
# Rolling sums (vectorized)
roll_signed = rolling_sum(signed_vol, window)
roll_total = rolling_sum(sizes, window)
norm_ofi = roll_signed / (roll_total + 1e-8)
# Large orders
is_large = (sizes > np.percentile(sizes, percentile)).astype(np.float32)
roll_large_buy = rolling_sum(is_large * buy_sell, window)
roll_large_sell = rolling_sum(is_large * (1 - buy_sell), window)
# Cancellation rate
is_cancel = (types == 'ORDER_CANCELLED').astype(np.float32)
roll_cancel = rolling_sum(is_cancel, window) / window
# Combined scores
large_diff = (roll_large_buy - roll_large_sell) / (window * 0.1 + 1e-8)
buy_score = norm_ofi + 0.3 * large_diff + 0.2 * roll_cancel
sell_score = -norm_ofi - 0.3 * large_diff + 0.2 * roll_cancel
# Use percentile thresholds for ~15-20% per class
valid = np.arange(window, N)
buy_threshold = np.percentile(buy_score[valid], 80)
sell_threshold = np.percentile(sell_score[valid], 80)
print(f"Buy threshold (p80): {buy_threshold:.4f}, Sell threshold (p80): {sell_threshold:.4f}")
labels = np.ones(N, dtype=np.int64)
labels[(buy_score > buy_threshold) & (norm_ofi > ofi_threshold)] = 0
labels[(sell_score > sell_threshold) & (norm_ofi < -ofi_threshold)] = 2
unique, counts = np.unique(labels, return_counts=True)
label_names = {0: '主力买入', 1: '中性', 2: '主力卖出'}
print("Label distribution:")
for u, c in zip(unique, counts):
print(f" {u} ({label_names.get(u, '?')}): {c} ({c/N*100:.1f}%)")
return labels
def create_sequences(features, labels, seq_len=100, stride=20):
"""Create sliding window sequences using stride_tricks for efficiency."""
N = len(features)
F = features.shape[1]
n_sequences = (N - seq_len) // stride
# Use list comprehension (more memory efficient than pre-allocating huge array)
starts = np.arange(0, N - seq_len, stride)
n_sequences = len(starts)
print(f"Creating {n_sequences} sequences of length {seq_len}, stride {stride}...")
X = np.zeros((n_sequences, seq_len, F), dtype=np.float32)
y = np.zeros(n_sequences, dtype=np.int64)
for idx, start in enumerate(starts):
X[idx] = features[start:start + seq_len]
y[idx] = labels[start + seq_len - 1]
print(f"Created {n_sequences} sequences, memory: {X.nbytes / 1e6:.1f} MB")
return X, y
class LOBDataset(Dataset):
def __init__(self, X, y):
self.X = torch.from_numpy(X)
self.y = torch.from_numpy(y)
def __len__(self):
return len(self.X)
def __getitem__(self, idx):
return self.X[idx], self.y[idx]
def get_weighted_sampler(y_train):
"""Create WeightedRandomSampler to oversample minority classes."""
class_counts = np.bincount(y_train)
class_weights = 1.0 / class_counts
sample_weights = class_weights[y_train]
sampler = WeightedRandomSampler(
weights=torch.from_numpy(sample_weights).double(),
num_samples=len(y_train),
replacement=True
)
return sampler
def prepare_data(seq_len=100, stride=5, window=50, ofi_threshold=0.2,
percentile=90, test_size=0.15, val_size=0.15,
random_state=42, batch_size=64):
"""
Full data preparation pipeline.
Returns train, val, test DataLoaders with balanced sampling.
"""
cache_path = f"/app/data_v2_{seq_len}_{stride}_{window}.npz"
if os.path.exists(cache_path):
print(f"Loading cached data from {cache_path}")
data = np.load(cache_path, allow_pickle=True)
X_train, y_train = data['X_train'], data['y_train']
X_val, y_val = data['X_val'], data['y_val']
X_test, y_test = data['X_test'], data['y_test']
else:
# Load raw data
df = load_lob_data()
# Extract and normalize features
features, means, stds = extract_and_normalize_features(df)
# Construct labels
labels = construct_labels_vectorized(df, window=window,
ofi_threshold=ofi_threshold,
percentile=percentile)
# Create sequences
X, y = create_sequences(features, labels, seq_len=seq_len, stride=stride)
# Split (stratified)
X_train, X_temp, y_train, y_temp = train_test_split(
X, y, test_size=test_size + val_size, random_state=random_state, stratify=y)
X_val, X_test, y_val, y_test = train_test_split(
X_temp, y_temp, test_size=test_size / (test_size + val_size),
random_state=random_state, stratify=y_temp)
# Save cache
np.savez_compressed(cache_path,
X_train=X_train, y_train=y_train,
X_val=X_val, y_val=y_val,
X_test=X_test, y_test=y_test,
means=means, stds=stds)
print(f"Cached to {cache_path}")
print(f"Train: {len(X_train)}, Val: {len(X_val)}, Test: {len(X_test)}")
# Print label distributions
for name, ys in [("Train", y_train), ("Val", y_val), ("Test", y_test)]:
unique, counts = np.unique(ys, return_counts=True)
dist = {u: c for u, c in zip(unique, counts)}
print(f" {name}: {dist}")
# Create datasets
train_dataset = LOBDataset(X_train, y_train)
val_dataset = LOBDataset(X_val, y_val)
test_dataset = LOBDataset(X_test, y_test)
# Weighted sampler for training (oversamples minority classes)
train_sampler = get_weighted_sampler(y_train)
train_loader = DataLoader(train_dataset, batch_size=batch_size, sampler=train_sampler, num_workers=0)
val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False, num_workers=0)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False, num_workers=0)
return train_loader, val_loader, test_loader, y_train
if __name__ == "__main__":
train_loader, val_loader, test_loader, y_train = prepare_data()
# Check a batch
for X_batch, y_batch in train_loader:
print(f"Batch X: {X_batch.shape}, y: {y_batch.shape}")
print(f"Batch labels: {y_batch[:20]}")
print(f"Batch X range: [{X_batch.min():.4f}, {X_batch.max():.4f}]")
break