# test / api.py
# Last commit: "Update api.py" by Aviral Jain (d2c1dba, verified); file size 10.6 kB.
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
import pandas as pd
import numpy as np
import tensorflow as tf
from yahoo_fin.stock_info import get_data
from sklearn.preprocessing import MinMaxScaler
from transformers import AutoTokenizer, AutoModelForSequenceClassification
from pytorch_forecasting import TemporalFusionTransformer
from bs4 import BeautifulSoup
import requests
from dotenv import load_dotenv
import os
from fastapi.middleware.cors import CORSMiddleware
# Hide CUDA devices from the process. This is set before `import torch` below;
# tensorflow has already been imported further up in this file.
# NOTE(review): with no visible GPU, `torch.cuda.is_available()` presumably
# returns False, so `device` would always be "cpu" — confirm this is intended.
os.environ["CUDA_VISIBLE_DEVICES"] = "-1"
import torch
# Device the torch models below are moved to and on which tokenizer outputs are placed.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
# All three models are loaded once, at import time, and shared by every request.

# Keras model used by the /predict-prices endpoint.
# NOTE(review): the "20" in the file name presumably matches the 20-row
# windows built by create_sequence — confirm.
MODEL_PATH = "lib/20_lstm_model.h5"
model = tf.keras.models.load_model(MODEL_PATH)
# Hugging Face model id shared by the tokenizer and the sequence classifier
# that news_sentiment uses.
model_name_news = "mrm8488/distilroberta-finetuned-financial-news-sentiment-analysis"
tokenizer = AutoTokenizer.from_pretrained(model_name_news)
sentiment_model = AutoModelForSequenceClassification.from_pretrained(
model_name_news).to(device)
# Temporal Fusion Transformer checkpoint used by get_tft_predictions.
best_model_path = 'lib/tft_pred.ckpt'
best_tft = TemporalFusionTransformer.load_from_checkpoint(best_model_path).to(device)
# FastAPI application object; the route decorators further down register on it.
app = FastAPI()
# CORS: any origin and any request header are accepted, for the four listed methods.
# NOTE(review): wildcard origins are combined with allow_credentials=True here;
# check the CORSMiddleware documentation on whether this combination is
# supported/safe and consider listing the allowed origins explicitly.
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["GET", "POST", "PUT", "DELETE"],
allow_headers=["*"],
)
class TickerRequest(BaseModel):
    """Schema describing a ticker price query.

    NOTE(review): none of the endpoints visible in this file accept this
    model (they take path parameters instead) — confirm whether it is still
    used elsewhere.
    """

    ticker: str  # ticker symbol
    start_date: str  # kept as a plain string; no date format is validated here
    end_date: str  # kept as a plain string; no date format is validated here
    interval: str = "1d"  # sampling interval; defaults to "1d"
def fetch_and_process_ticker_data(ticker, start_date, end_date, interval="1d"):
    """Download price history for one ticker and add derived columns.

    The frame returned by ``get_data`` (date-indexed, because
    ``index_as_date=True``) has its ``close`` column removed and gains:

    * ``revenue``      -- ``adjclose * volume``
    * ``daily_profit`` -- ``adjclose - open``

    Args:
        ticker: Ticker symbol forwarded to ``get_data``.
        start_date: Start of the requested range, forwarded unchanged.
        end_date: End of the requested range, forwarded unchanged.
        interval: Sampling interval forwarded to ``get_data``.

    Returns:
        The processed ``pandas.DataFrame``.

    Raises:
        HTTPException: Status 500 if downloading or processing fails for any
            reason; the original exception is attached as ``__cause__``.
    """
    try:
        prices = get_data(ticker, start_date=start_date,
                          end_date=end_date, index_as_date=True, interval=interval)
        prices = prices.drop(columns="close")
        prices["revenue"] = prices["adjclose"] * prices["volume"]
        prices["daily_profit"] = prices["adjclose"] - prices["open"]
    except Exception as error:
        raise HTTPException(
            status_code=500,
            detail=f"Error processing ticker {ticker}: {error}") from error
    # The frame is returned directly: concatenating it onto an empty
    # DataFrame added nothing and relies on pandas' deprecated handling of
    # empty entries in ``concat``.
    return prices
def ticker_encoded(df):
    """Add an integer ``ticker_encoded`` column derived from ``df['ticker']``.

    Only the tickers in the fixed mapping below are supported.

    Args:
        df: DataFrame with a ``ticker`` column. It is modified in place.

    Returns:
        The same DataFrame object, with the ``ticker_encoded`` column added.

    Raises:
        KeyError: If ``df['ticker']`` contains a value that has no code.
    """
    label_map = {'ATOM': 0, 'HBIO': 1, 'IBEX': 2, 'MYFW': 3, 'NATH': 4}
    # Fail loudly on unknown symbols; a plain ``map`` would silently
    # produce NaN for them.
    unknown = [name for name in df['ticker'].unique() if name not in label_map]
    if unknown:
        raise KeyError(
            f"Unsupported ticker(s) {unknown}; expected one of {sorted(label_map)}")
    df['ticker_encoded'] = df['ticker'].map(label_map).astype(int)
    return df
def normalize(df):
    """Min-max scale the numeric feature columns of ``df`` in place.

    Three independent ``MinMaxScaler`` instances are fitted: one on the
    price columns, one on ``volume``/``revenue`` and one on
    ``daily_profit``.

    Args:
        df: DataFrame holding the columns listed above; it is overwritten
            with the scaled values.

    Returns:
        A ``(df, price_scaler)`` tuple. ``price_scaler`` is the scaler fitted
        on ``["open", "high", "low", "adjclose"]`` (in that order); the other
        two scalers are discarded.
    """
    price_columns = ["open", "high", "low", "adjclose"]
    price_scaler = MinMaxScaler()
    df[price_columns] = price_scaler.fit_transform(df[price_columns])

    # These groups are scaled with throw-away scalers: nothing downstream
    # needs to invert them.
    for column_group in (["volume", "revenue"], ["daily_profit"]):
        df[column_group] = MinMaxScaler().fit_transform(df[column_group])

    return df, price_scaler
def create_sequence(dataset, window=20):
    """Cut ``dataset`` into sliding windows with next-row targets.

    For every row at position ``window`` or later, the ``window`` preceding
    rows form one input sequence and that row supplies the target. A window
    is kept only when all of its rows and the target row share the same
    ``ticker_encoded`` value.

    Args:
        dataset: DataFrame with ``date`` (datetime-like), ``ticker_encoded``,
            ``open`` and ``adjclose`` columns plus any other feature columns.
        window: Number of consecutive rows per sequence. Defaults to 20.

    Returns:
        A 4-tuple ``(sequences, labels, dates, stock)``:

        * ``sequences`` -- array of the windows' values, ``date`` excluded.
        * ``labels``    -- array of ``[open, adjclose]`` of each target row.
        * ``dates``     -- target-row dates formatted as ``YYYY-MM-DD``.
        * ``stock``     -- ``str()`` of each target row's ``ticker_encoded``.

    Raises:
        ValueError: If ``window`` is smaller than 1.
    """
    if window < 1:
        raise ValueError("window must be at least 1")

    sequences = []
    labels = []
    dates = []
    stock = []
    df_copy = dataset.drop(columns=["date"])
    # Looked up once here instead of re-slicing the frame on every iteration.
    codes = dataset["ticker_encoded"].to_numpy()

    for stop_idx in range(window, len(dataset)):
        start_idx = stop_idx - window
        target_code = codes[stop_idx]
        # Skip windows that straddle two tickers or whose target row
        # belongs to a different ticker.
        if not (codes[start_idx:stop_idx] == target_code).all():
            continue
        sequences.append(df_copy.iloc[start_idx:stop_idx].values)
        labels.append(df_copy.iloc[stop_idx][["open", "adjclose"]])
        dates.append(dataset["date"].iloc[stop_idx].strftime('%Y-%m-%d'))
        stock.append(str(target_code))

    return np.array(sequences), np.array(labels), dates, stock
def scaling_predictions(price_scaler, combined_dataset_prediction):
    """Map scaled ``[open, adjclose]`` predictions back to price units.

    ``price_scaler`` is expected to be the scaler returned by ``normalize``,
    fitted on ``["open", "high", "low", "adjclose"]``; positions 0 and 3 of
    its parameters therefore belong to ``open`` and ``adjclose``.

    The inverse of min-max scaling (``x_scaled = x * scale_ + min_``) is
    applied directly, so the scaler passed in is left untouched. Overwriting
    its ``min_``/``scale_`` would corrupt it for the caller and make a second
    call fail with ``IndexError``.

    Args:
        price_scaler: Fitted scaler exposing ``min_`` and ``scale_`` with at
            least four entries each.
        combined_dataset_prediction: Array-like of shape ``(n, 2)`` holding
            scaled ``open`` and ``adjclose`` predictions.

    Returns:
        ``numpy.ndarray`` of shape ``(n, 2)`` in the original price scale.
    """
    target_positions = [0, 3]  # open, adjclose
    offset = np.asarray(price_scaler.min_)[target_positions]
    scale = np.asarray(price_scaler.scale_)[target_positions]
    predictions = np.asarray(combined_dataset_prediction, dtype=float)
    return (predictions - offset) / scale
def storing_predictions(df, dates, stock, combined_dataset_prediction_inverse):
    """Attach predicted prices to the rows they were predicted for.

    A row of ``df`` matches prediction ``i`` when its ``date`` (formatted as
    ``YYYY-MM-DD``) equals ``dates[i]`` and ``str()`` of its
    ``ticker_encoded`` equals ``stock[i]``. If several predictions match a
    row, the first one wins.

    Args:
        df: DataFrame with ``date`` (datetime-like) and ``ticker_encoded``
            columns. ``pred_open`` and ``pred_closing`` columns are added to
            it in place.
        dates: Date string per prediction.
        stock: Ticker code string per prediction, aligned with ``dates``.
        combined_dataset_prediction_inverse: Indexable of ``[open, close]``
            pairs aligned with ``dates``.

    Returns:
        A new DataFrame with only the rows that received a prediction,
        re-indexed from 0.
    """
    # One pass over the predictions instead of re-scanning them for every
    # row; ``setdefault`` keeps the first prediction for a duplicated key.
    first_prediction = {}
    for position, key in enumerate(zip(dates, stock)):
        first_prediction.setdefault(key, position)

    df['pred_open'] = np.nan
    df['pred_closing'] = np.nan
    for idx, row in df.iterrows():
        key = (row['date'].strftime('%Y-%m-%d'), str(row['ticker_encoded']))
        position = first_prediction.get(key)
        if position is None:
            continue
        prediction = combined_dataset_prediction_inverse[position]
        df.loc[idx, 'pred_open'] = prediction[0]
        df.loc[idx, 'pred_closing'] = prediction[1]

    return df.dropna(subset=['pred_open', 'pred_closing']).reset_index(drop=True)
def _parse_news_article(article):
    """Return one news row for an article tag, or None if a field is missing."""
    time_tag = article.find('time', class_='latest-news__date')
    link_tag = article.find('a', class_='news-link')
    source_tag = article.find('span', class_='latest-news__source')
    if time_tag is None or link_tag is None or source_tag is None:
        return None
    # The last two entries are placeholders for top_sentiment / sentiment_score.
    return [time_tag.get('datetime'), link_tag.text, source_tag.text,
            link_tag.get('href'), '', 0]


def scrape_news(ticker_name, pages=2, timeout=10):
    """Scrape news headlines for a ticker from markets.businessinsider.com.

    Args:
        ticker_name: Ticker symbol inserted into the news URL.
        pages: Number of result pages to fetch, starting at page 1.
            Defaults to 2.
        timeout: Seconds to wait for each HTTP response before
            ``requests`` raises, so a stalled server cannot hang the caller.

    Returns:
        DataFrame with columns ``datatime``, ``title``, ``source``, ``link``,
        ``top_sentiment`` (always ``''``) and ``sentiment_score`` (always
        ``0``). Rows are in reverse scraping order: the last article scraped
        comes first. The index is a fresh ``0..n-1`` range.

    Raises:
        requests.RequestException: If a page cannot be fetched.
    """
    columns = ['datatime', 'title', 'source',
               'link', 'top_sentiment', 'sentiment_score']
    rows = []
    for page in range(1, pages + 1):
        url = f'https://markets.businessinsider.com/news/{ticker_name}-stock?p={page}'
        response = requests.get(url, timeout=timeout)
        soup = BeautifulSoup(response.text, 'lxml')
        for article in soup.find_all('div', class_='latest-news__story'):
            row = _parse_news_article(article)
            # Articles lacking an expected tag are skipped instead of
            # aborting the whole scrape with an AttributeError.
            if row is not None:
                rows.append(row)
    # Same row order as prepending each article to the frame, but built in
    # one step instead of one ``concat`` per article.
    rows.reverse()
    return pd.DataFrame(rows, columns=columns)
def add_recent_news(main_df, news_df, lookback_days=10):
    """Attach recent news headlines to every row of ``main_df``.

    For each row, the titles of all news items published strictly before the
    row's date and at most ``lookback_days`` whole days earlier are joined
    with spaces. If there are none, the first older item found when walking
    ``news_df`` from its last row backwards is used. If that also yields
    nothing, the text chosen for the previous row is reused (an empty string
    for the first row).

    Args:
        main_df: DataFrame with a ``date`` column. ``date`` is converted to
            datetime and a ``news`` column is added, both in place.
        news_df: DataFrame with ``datatime`` and ``title`` columns. It is
            only read, so the same frame can be passed again safely.
        lookback_days: Maximum age, in whole days, of news counted as recent.

    Returns:
        ``main_df`` itself, with the ``news`` column added.
    """
    main_df['date'] = pd.to_datetime(main_df['date'])
    # Parse the news timestamps once and keep them in plain lists so the
    # per-row scans below do not rebuild a Series for every news item.
    published_at = pd.to_datetime(news_df['datatime']).tolist()
    titles = news_df['title'].tolist()
    news_items = list(zip(published_at, titles))

    news_list = []
    last_available_news = ''
    for current_date in main_df['date']:
        recent_titles = [
            title for published, title in news_items
            if published < current_date
            and (current_date - published).days <= lookback_days
        ]
        news_articles = " ".join(recent_titles)

        if not news_articles.strip():
            # Nothing inside the lookback window: fall back to the first
            # older item, scanning from the end of ``news_df``.
            for published, title in reversed(news_items):
                if published < current_date:
                    news_articles = title
                    break

        last_available_news = news_articles.strip() or last_available_news
        news_list.append(last_available_news)

    main_df['news'] = news_list
    return main_df
def news_sentiment(df):
    """Classify the ``news`` text of each row and add sentiment columns.

    Uses the module-level ``tokenizer`` and ``sentiment_model`` on
    ``device``. All non-null texts are sent through the model as a single
    batch.

    Args:
        df: DataFrame with a ``news`` column. ``predicted_sentiment`` and
            ``sentiment_score`` columns are added to it in place.

    Returns:
        A new DataFrame equal to ``df`` without the ``news`` column.
        ``predicted_sentiment`` is ``"negative"``, ``"neutral"`` or
        ``"positive"``; ``sentiment_score`` is -1, 0 or 1 respectively. Rows
        whose ``news`` is null get NaN in both columns.
    """
    news_column_name = 'news'
    # Only rows that actually have text are classified, so the predictions
    # and the index they are aligned with always have the same length.
    has_news = df[news_column_name].notna()
    texts = df.loc[has_news, news_column_name].tolist()

    # NOTE(review): assumes class ids 0/1/2 of the model mean
    # negative/neutral/positive — confirm against the model's id2label.
    labels = ["negative", "neutral", "positive"]
    predicted = []
    if texts:  # the tokenizer is not called with an empty batch
        inputs = tokenizer(texts, padding=True,
                           truncation=True, return_tensors="pt")
        inputs = {key: val.to(device) for key, val in inputs.items()}
        with torch.no_grad():
            logits = sentiment_model(**inputs).logits
        # argmax over the logits selects the same class as argmax over
        # their softmax, so the probabilities are not materialised.
        predicted = [labels[class_id]
                     for class_id in torch.argmax(logits, dim=-1).tolist()]

    df['predicted_sentiment'] = pd.Series(
        predicted, index=df.index[has_news], dtype=object)
    sentiment_map = {
        'positive': 1,
        'neutral': 0,
        'negative': -1
    }
    df['sentiment_score'] = df['predicted_sentiment'].map(sentiment_map)
    return df.drop(columns=['news'])
def get_tft_predictions(df):
    """Add lag features to ``df`` and run the Temporal Fusion Transformer.

    For lags 1 to 20, ``open_lag_<i>`` and ``adjclose_lag_<i>`` columns are
    created by shifting ``open``/``adjclose`` within each ``ticker`` group.
    Rows without a complete lag history are then dropped.

    Args:
        df: DataFrame with ``ticker``, ``open`` and ``adjclose`` columns.
            NOTE(review): the checkpoint presumably needs further columns
            (e.g. ``time_idx``, which the caller adds) — confirm against the
            dataset parameters stored in the checkpoint. ``df`` is modified
            in place.

    Returns:
        Whatever ``best_tft.predict(..., mode="quantiles")`` returns.
    """
    for i in range(1, 21):
        df[f'open_lag_{i}'] = df.groupby('ticker')['open'].shift(i)
        df[f'adjclose_lag_{i}'] = df.groupby('ticker')['adjclose'].shift(i)
    lag_columns = [f'open_lag_{i}' for i in range(
        1, 21)] + [f'adjclose_lag_{i}' for i in range(1, 21)]
    df.dropna(subset=lag_columns, inplace=True)
    # The DataFrame is passed as-is: pandas frames have no ``.to()`` method
    # (the former ``df.to(device)`` always raised AttributeError), and the
    # model itself was already moved to ``device`` when it was loaded.
    predictions = best_tft.predict(df, mode="quantiles")
    return predictions
@app.get("/fetch-ticker-data/{ticker_name}/{start_date}/{end_date}/{interval}")
async def fetch_ticker_data(ticker_name: str, start_date: str, end_date: str, interval: str):
    """Return the processed price history of a ticker as a list of records.

    Args:
        ticker_name: Ticker symbol taken from the URL path.
        start_date: Start of the requested range, from the URL path.
        end_date: End of the requested range, from the URL path.
        interval: Sampling interval, from the URL path.

    Returns:
        One dict per row of the frame built by
        ``fetch_and_process_ticker_data``.
        NOTE(review): ``to_dict(orient="records")`` omits the frame's index,
        which presumably holds the dates — confirm clients do not need them.

    Raises:
        HTTPException: Status 500 when fetching or converting the data fails.
    """
    try:
        result_df = fetch_and_process_ticker_data(
            ticker=ticker_name,
            start_date=start_date,
            end_date=end_date,
            interval=interval
        )
        return result_df.to_dict(orient="records")
    except HTTPException:
        # Already a well-formed HTTP error from the helper: pass it through
        # instead of wrapping its string form in a second HTTPException.
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e
@app.get("/predict-prices/{ticker_name}/{start_date}/{end_date}/{interval}")
async def predict_prices(ticker_name: str, start_date: str, end_date: str, interval: str):
    """Predict the next open and close price of a ticker.

    Pipeline:
        1. Fetch price history and keep the last 60 rows.
        2. Encode the ticker, normalise the features and cut them into
           sliding windows.
        3. Run the Keras model on the windows and convert its output back to
           price units.
        4. Scrape recent news, attach it to the rows with predictions and
           score its sentiment.
        5. Feed the enriched rows to the Temporal Fusion Transformer.

    Args:
        ticker_name: Ticker symbol taken from the URL path.
        start_date: Start of the requested range, from the URL path.
        end_date: End of the requested range, from the URL path.
        interval: Sampling interval, from the URL path.

    Returns:
        ``{"open": <float>, "close": <float>}`` taken from the first two
        entries of the transformer's prediction.

    Raises:
        HTTPException: Status 400 when the fetched history is too short to
            build a single window; status 500 for any other failure.
    """
    try:
        result_df = fetch_and_process_ticker_data(
            ticker=ticker_name,
            start_date=start_date,
            end_date=end_date,
            interval=interval
        )
        raw_data = result_df.tail(60)
        # Turn the date index into a regular "date" column.
        raw_data = raw_data.reset_index()
        raw_data.rename(columns={"index": "date"}, inplace=True)
        raw_data = ticker_encoded(raw_data)
        # Unscaled copy: ``normalize`` overwrites ``raw_data`` in place.
        temp_df = raw_data.copy()
        normalized_data, scaler = normalize(raw_data)
        normalized_data = normalized_data.drop(columns=['ticker'])
        sequences, _, dates, stock = create_sequence(normalized_data)
        if len(sequences) == 0:
            # Without this check the model call below fails with an opaque
            # shape error.
            raise HTTPException(
                status_code=400,
                detail=f"Not enough price history for {ticker_name} "
                       "to build a prediction window")
        combined_dataset_prediction = model.predict(sequences)
        combined_dataset_prediction_inverse = scaling_predictions(
            scaler, combined_dataset_prediction)
        lstm_pred_df = storing_predictions(
            temp_df, dates, stock, combined_dataset_prediction_inverse)
        news_df = scrape_news(ticker_name=ticker_name)
        combined_with_news_df = add_recent_news(lstm_pred_df, news_df)
        sentiment_df = news_sentiment(combined_with_news_df)
        sentiment_df['time_idx'] = range(1, len(sentiment_df) + 1)
        predicted_values = get_tft_predictions(sentiment_df)
        final_pred_open_price = predicted_values[0].item()
        final_pred_closing_price = predicted_values[1].item()
        return {"open": final_pred_open_price, 'close': final_pred_closing_price}
    except HTTPException:
        # Keep the status code and detail of errors that are already HTTP
        # errors instead of re-wrapping them as a generic 500.
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e