AMDRisk / app.py
Hou
update app
2b61dd5
Raw
History Blame Contribute Delete
18.8 kB
import argparse
import json
import pickle
import sys
from pathlib import Path
import gradio as gr
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import torch.nn.functional as F
# Allow imports from ./deepseenet without requiring it to be a package.
PROJECT_ROOT = Path(__file__).resolve().parent
DEEPSEENET_DIR = PROJECT_ROOT / "deepseenet"
sys.path.insert(0, str(DEEPSEENET_DIR))
from augmentations import get_val_transforms # noqa: E402
from model import DeepSeeNet # noqa: E402
N_CLASSES = {
"DRUS": 3,
"PIG": 2,
}
LABELS = {
"DRUS": ["small_none", "medium", "large"],
"PIG": ["no_pigment", "pigment"],
}
APP_CSS = """
#terms-overlay {
position: fixed;
inset: 0;
z-index: 9999;
display: flex;
align-items: center;
justify-content: center;
padding: 32px;
background: rgba(255, 255, 255, 0.72);
backdrop-filter: blur(3px);
-webkit-backdrop-filter: blur(3px);
}
#terms-card {
width: min(800px, calc(100vw - 64px));
max-height: 86vh;
overflow-y: auto;
padding: 24px 28px;
border: 1px solid #d9d9df;
border-radius: 8px;
background: #f3f3f5;
box-shadow: 0 12px 36px rgba(0, 0, 0, 0.12);
color: #1f2933;
}
#terms-card h1 {
margin-top: 0;
font-size: 1.55rem;
}
#terms-card h2 {
margin-top: 1.35rem;
font-size: 1.15rem;
}
#terms-card p,
#terms-card ul {
font-size: 0.95rem;
line-height: 1.55;
margin-left: 1.1rem;
}
#terms-card li {
margin-bottom: 0.35rem;
}
#acknowledge-btn {
width: 100%;
margin-top: 16px;
padding: 10px 14px;
border: none;
border-radius: 6px;
background: #2563eb;
color: white;
font-weight: 600;
cursor: pointer;
}
#acknowledge-btn:hover {
background: #1d4ed8;
}
"""
ACKNOWLEDGEMENT_TEXT = "I acknowledge and continue"
TERMS_OVERLAY_HTML = f"""
<div id="terms-overlay">
<div id="terms-card">
<h1>AMD Risk Prediction Demo — Research Use Only</h1>
<p>This demo is a modernized research implementation based on the original AMD risk prediction work. It is provided solely for research, educational, and reproducibility purposes.</p>
<h2>Not for Clinical Use</h2>
<p>This Gradio demo and its associated model outputs are provided for research, educational, and reproducibility purposes only, and are not intended for clinical diagnosis, treatment, screening, triage, patient management, or any other medical decision-making purpose.</p>
<p>This demo implementation has not been clinically validated for routine patient care and has not been reviewed, cleared, or approved by any regulatory authority, including the U.S. Food and Drug Administration.</p>
<h2>Data Privacy</h2>
<p>Do not upload protected health information, personally identifiable information, or patient data unless you have the necessary rights, permissions, and safeguards to do so.</p>
<h2>User Responsibility</h2>
<p>By using this demo, you acknowledge that:</p>
<ul>
<li>You understand this demo is for research and educational purposes only.</li>
<li>You will not use the outputs for diagnosis, treatment, screening, triage, or clinical decision-making.</li>
<li>You are responsible for how you interpret and use any results generated by the demo.</li>
<li>You will not upload protected health information or identifiable patient data unless you are authorized to do so.</li>
</ul>
<h2>Attribution</h2>
<p>This project is a modernized reimplementation of the DeepSeeNet AMD risk prediction work. Please cite the original publication when using this project:</p>
<p>Peng Y, Keenan TD, Chen Q, Agrón E, Allot A, Wong WT, Chew EY, Lu Z. Predicting risk of late age-related macular degeneration using deep learning. NPJ Digital Medicine. 2020 Aug 27;3(1):111.</p>
<h2>License and Warranty Disclaimer</h2>
<p>Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the “Software”), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions:</p>
<p>The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.</p>
<p>THE SOFTWARE IS PROVIDED “AS IS”, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.</p>
<button id="acknowledge-btn" onclick="document.getElementById('terms-overlay').remove();">{ACKNOWLEDGEMENT_TEXT}</button>
</div>
</div>
"""
class AlbumentationsTransform:
def __init__(self, transform):
self.transform = transform
def __call__(self, image):
return self.transform(image=np.asarray(image))["image"]
class FeatureModel(nn.Module):
"""
Wraps DeepSeeNet and captures the input to the final Linear layer.
This should match the feature extraction used in deepseenet/extract_features.py.
"""
def __init__(self, model: nn.Module):
super().__init__()
self.model = model
self.features = None
final_linear = self._find_last_linear(model)
final_linear.register_forward_pre_hook(self._capture_penultimate)
@staticmethod
def _find_last_linear(model: nn.Module) -> nn.Linear:
last_linear = None
for module in model.modules():
if isinstance(module, nn.Linear):
last_linear = module
if last_linear is None:
raise RuntimeError("Could not find final nn.Linear layer in DeepSeeNet.")
return last_linear
def _capture_penultimate(self, module, inputs):
x = inputs[0]
if isinstance(x, (tuple, list)):
x = x[0]
if x.ndim > 2:
x = torch.flatten(x, start_dim=1)
self.features = x.detach()
def forward(self, x):
self.features = None
out = self.model(x)
if isinstance(out, (tuple, list)):
logits = out[0]
else:
logits = out
if self.features is None:
raise RuntimeError("Feature hook did not capture penultimate features.")
return logits, self.features
def parse_args():
parser = argparse.ArgumentParser()
parser.add_argument("--checkpoint-folder", default="deepseenet/weights")
parser.add_argument("--drusen-weights", default=None)
parser.add_argument("--pigment-weights", default=None)
parser.add_argument("--backbone", default="inception_v3")
parser.add_argument("--image-size", type=int, default=1024)
parser.add_argument(
"--late-amd-model-dir",
default="runs/stage2_late_amd_deep_clinical_block4_p001",
)
parser.add_argument(
"--anyga-model-dir",
default="runs/stage2_anyga_deep_clinical_block2_p001",
)
parser.add_argument(
"--nv-model-dir",
default="runs/stage2_nv_deep_clinical_block2_p001",
)
parser.add_argument("--max-years", type=int, default=12)
parser.add_argument("--server-name", default="127.0.0.1")
parser.add_argument("--server-port", type=int, default=7860)
parser.add_argument("--share", action="store_true")
return parser.parse_args()
def resolve_path(path):
path = Path(path)
if path.is_absolute():
return path
return PROJECT_ROOT / path
def load_deepseenet_model(path, task, backbone, device):
path = resolve_path(path)
checkpoint = torch.load(path, map_location=device)
checkpoint_args = checkpoint.get("args", {}) if isinstance(checkpoint, dict) else {}
model = DeepSeeNet(
n_classes=N_CLASSES[task],
backbone=checkpoint_args.get("backbone", backbone),
pretrained=False,
).to(device)
state_dict = checkpoint["model"] if isinstance(checkpoint, dict) and "model" in checkpoint else checkpoint
model.load_state_dict(state_dict)
model.eval()
return FeatureModel(model).to(device).eval()
def load_survival_bundle(model_dir):
model_dir = resolve_path(model_dir)
with open(model_dir / "cox_model.pkl", "rb") as f:
cox_model = pickle.load(f)
with open(model_dir / "scaler.pkl", "rb") as f:
scaler = pickle.load(f)
with open(model_dir / "config.json", "r") as f:
config = json.load(f)
return {
"model_dir": str(model_dir),
"cox_model": cox_model,
"scaler": scaler,
"config": config,
"feature_cols": config["feature_cols"],
}
def load_image(image, transform, device):
if image is None:
raise ValueError("Please upload both left and right images.")
image = image.convert("RGB")
return transform(image).unsqueeze(0).to(device)
@torch.no_grad()
def run_feature_model(model, image_tensor, task):
logits, features = model(image_tensor)
logits_1d = logits[0].detach().cpu()
probs = F.softmax(logits_1d, dim=0).numpy()
pred = int(np.argmax(probs))
features_1d = features[0].detach().cpu().numpy().astype(np.float32)
return {
"prediction": pred,
"label": LABELS[task][pred],
"confidence": float(probs[pred]),
"probabilities": {
LABELS[task][i]: float(probs[i])
for i in range(len(LABELS[task]))
},
"features": features_1d,
}
def make_feature_dict(left_drus, right_drus, left_pig, right_pig, age, smkever):
feature_dict = {}
# Structured grading-style features.
# These allow the app to also run a stage1 grading_clinical Cox model
# if you point the endpoint directory to one.
feature_dict["LE_DRUS"] = float(left_drus["prediction"])
feature_dict["RE_DRUS"] = float(right_drus["prediction"])
feature_dict["LE_PIG"] = float(left_pig["prediction"])
feature_dict["RE_PIG"] = float(right_pig["prediction"])
# DeepSeeNet hidden features. Paper-faithful order:
# LE_DRUS, RE_DRUS, LE_PIG, RE_PIG.
for i, value in enumerate(left_drus["features"]):
feature_dict[f"LE_DRUS_{i:03d}"] = float(value)
for i, value in enumerate(right_drus["features"]):
feature_dict[f"RE_DRUS_{i:03d}"] = float(value)
for i, value in enumerate(left_pig["features"]):
feature_dict[f"LE_PIG_{i:03d}"] = float(value)
for i, value in enumerate(right_pig["features"]):
feature_dict[f"RE_PIG_{i:03d}"] = float(value)
# Clinical variables used by the Cox models.
feature_dict["age"] = float(age)
feature_dict["smkever"] = float(smkever)
return feature_dict
def predict_survival(bundle, feature_dict, max_years):
feature_cols = bundle["feature_cols"]
missing = [c for c in feature_cols if c not in feature_dict]
if missing:
raise ValueError(
"The selected Cox model requires features that the app could not build:\n"
+ "\n".join(missing[:20])
)
x = np.array([[feature_dict[c] for c in feature_cols]], dtype=np.float32)
x_scaled = bundle["scaler"].transform(x)
x_df = pd.DataFrame(x_scaled, columns=feature_cols)
risk_score = float(bundle["cox_model"].predict_partial_hazard(x_df).values.reshape(-1)[0])
years = list(range(1, max_years + 1))
surv = bundle["cox_model"].predict_survival_function(x_df, times=years)
rows = []
for year in years:
survival_prob = float(surv.loc[year].values[0])
risk = 1.0 - survival_prob
rows.append(
{
"Year": year,
"Survival probability": survival_prob,
"Risk probability": risk,
"Risk (%)": 100.0 * risk,
}
)
return risk_score, pd.DataFrame(rows)
def format_probs(probabilities):
return " | ".join(
f"{label}: {prob:.3f}"
for label, prob in probabilities.items()
)
def make_app(args):
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
checkpoint_folder = resolve_path(args.checkpoint_folder)
drusen_weights = args.drusen_weights or str(checkpoint_folder / "drus.pt")
pigment_weights = args.pigment_weights or str(checkpoint_folder / "pig.pt")
transform = AlbumentationsTransform(get_val_transforms(args.image_size))
drus_model = load_deepseenet_model(
drusen_weights,
task="DRUS",
backbone=args.backbone,
device=device,
)
pig_model = load_deepseenet_model(
pigment_weights,
task="PIG",
backbone=args.backbone,
device=device,
)
survival_bundles = {
"Late AMD": load_survival_bundle(args.late_amd_model_dir),
"Any GA": load_survival_bundle(args.anyga_model_dir),
"NV": load_survival_bundle(args.nv_model_dir),
}
endpoint_to_label = {
"Late AMD": "late AMD",
"Any GA": "geographic atrophy",
"NV": "neovascular AMD",
}
def run(left_image, right_image, endpoint, age, smkever, prediction_year):
left_tensor = load_image(left_image, transform, device)
right_tensor = load_image(right_image, transform, device)
left_drus = run_feature_model(drus_model, left_tensor, "DRUS")
right_drus = run_feature_model(drus_model, right_tensor, "DRUS")
left_pig = run_feature_model(pig_model, left_tensor, "PIG")
right_pig = run_feature_model(pig_model, right_tensor, "PIG")
feature_dict = make_feature_dict(
left_drus=left_drus,
right_drus=right_drus,
left_pig=left_pig,
right_pig=right_pig,
age=age,
smkever=smkever,
)
bundle = survival_bundles[endpoint]
risk_score, risk_df = predict_survival(
bundle=bundle,
feature_dict=feature_dict,
max_years=args.max_years,
)
year_row = risk_df[risk_df["Year"] == int(prediction_year)].iloc[0]
selected_risk = float(year_row["Risk probability"])
selected_survival = float(year_row["Survival probability"])
summary_rows = [
["Endpoint", endpoint_to_label[endpoint]],
["Prediction year", f"{int(prediction_year)} years"],
["Risk", f"{100.0 * selected_risk:.2f}%"],
["Survival probability", f"{100.0 * selected_survival:.2f}%"],
["Cox partial hazard", f"{risk_score:.4f}"],
# ["Cox model directory", bundle["model_dir"]],
["Number of Cox features", len(bundle["feature_cols"])],
]
grading_rows = [
[
"Left",
left_drus["label"],
f"{left_drus['confidence']:.3f}",
format_probs(left_drus["probabilities"]),
left_pig["label"],
f"{left_pig['confidence']:.3f}",
format_probs(left_pig["probabilities"]),
],
[
"Right",
right_drus["label"],
f"{right_drus['confidence']:.3f}",
format_probs(right_drus["probabilities"]),
right_pig["label"],
f"{right_pig['confidence']:.3f}",
format_probs(right_pig["probabilities"]),
],
]
# Clean display formatting.
display_risk_df = risk_df.copy()
display_risk_df["Survival probability"] = display_risk_df["Survival probability"].map(lambda x: f"{x:.4f}")
display_risk_df["Risk probability"] = display_risk_df["Risk probability"].map(lambda x: f"{x:.4f}")
display_risk_df["Risk (%)"] = display_risk_df["Risk (%)"].map(lambda x: f"{x:.2f}")
return summary_rows, grading_rows, display_risk_df
model_info = (
"# AMD Risk Prediction\n\n"
f"DeepSeeNet backbone: `{args.backbone}` | "
f"Input size: `{args.image_size} × {args.image_size}` | "
f"Device: `{device.type}` "
# f"Drusen weights: `{drusen_weights}` | "
# f"Pigment weights: `{pigment_weights}`"
)
with gr.Blocks(title="AMD Risk Prediction", css=APP_CSS) as demo:
gr.HTML(TERMS_OVERLAY_HTML)
gr.Markdown(model_info)
with gr.Row():
left_image = gr.Image(type="pil", label="Left eye CFP")
right_image = gr.Image(type="pil", label="Right eye CFP")
with gr.Row():
endpoint = gr.Dropdown(
choices=["Late AMD", "Any GA", "NV"],
value="Late AMD",
label="Endpoint",
)
prediction_year = gr.Slider(
minimum=1,
maximum=args.max_years,
value=5,
step=1,
label="Prediction horizon, years",
)
with gr.Row():
age = gr.Number(value=70, label="Age")
smkever = gr.Dropdown(
choices=[
("Never smoker / smkever=0", 0),
("Ever smoker / smkever=1", 1),
],
value=1,
label="Smoking history",
)
button = gr.Button("Run AMD risk prediction")
summary = gr.Dataframe(
headers=["Item", "Result"],
label="Risk summary",
)
grading = gr.Dataframe(
headers=[
"Eye",
"Drusen prediction",
"Drusen confidence",
"Drusen probabilities",
"Pigment prediction",
"Pigment confidence",
"Pigment probabilities",
],
label="DeepSeeNet image outputs",
)
risk_table = gr.Dataframe(
label="Risk curve",
)
button.click(
run,
inputs=[
left_image,
right_image,
endpoint,
age,
smkever,
prediction_year,
],
outputs=[
summary,
grading,
risk_table,
],
)
return demo
def main():
args = parse_args()
demo = make_app(args)
demo.launch(
# server_name=args.server_name,
# server_port=args.server_port,
# share=args.share,
)
if __name__ == "__main__":
main()