Spaces:
Sleeping
Sleeping
| import argparse | |
| import json | |
| import pickle | |
| import sys | |
| from pathlib import Path | |
| import gradio as gr | |
| import numpy as np | |
| import pandas as pd | |
| import torch | |
| import torch.nn as nn | |
| import torch.nn.functional as F | |
| # Allow imports from ./deepseenet without requiring it to be a package. | |
| PROJECT_ROOT = Path(__file__).resolve().parent | |
| DEEPSEENET_DIR = PROJECT_ROOT / "deepseenet" | |
| sys.path.insert(0, str(DEEPSEENET_DIR)) | |
| from augmentations import get_val_transforms # noqa: E402 | |
| from model import DeepSeeNet # noqa: E402 | |
| N_CLASSES = { | |
| "DRUS": 3, | |
| "PIG": 2, | |
| } | |
| LABELS = { | |
| "DRUS": ["small_none", "medium", "large"], | |
| "PIG": ["no_pigment", "pigment"], | |
| } | |
| APP_CSS = """ | |
| #terms-overlay { | |
| position: fixed; | |
| inset: 0; | |
| z-index: 9999; | |
| display: flex; | |
| align-items: center; | |
| justify-content: center; | |
| padding: 32px; | |
| background: rgba(255, 255, 255, 0.72); | |
| backdrop-filter: blur(3px); | |
| -webkit-backdrop-filter: blur(3px); | |
| } | |
| #terms-card { | |
| width: min(800px, calc(100vw - 64px)); | |
| max-height: 86vh; | |
| overflow-y: auto; | |
| padding: 24px 28px; | |
| border: 1px solid #d9d9df; | |
| border-radius: 8px; | |
| background: #f3f3f5; | |
| box-shadow: 0 12px 36px rgba(0, 0, 0, 0.12); | |
| color: #1f2933; | |
| } | |
| #terms-card h1 { | |
| margin-top: 0; | |
| font-size: 1.55rem; | |
| } | |
| #terms-card h2 { | |
| margin-top: 1.35rem; | |
| font-size: 1.15rem; | |
| } | |
| #terms-card p, | |
| #terms-card ul { | |
| font-size: 0.95rem; | |
| line-height: 1.55; | |
| margin-left: 1.1rem; | |
| } | |
| #terms-card li { | |
| margin-bottom: 0.35rem; | |
| } | |
| #acknowledge-btn { | |
| width: 100%; | |
| margin-top: 16px; | |
| padding: 10px 14px; | |
| border: none; | |
| border-radius: 6px; | |
| background: #2563eb; | |
| color: white; | |
| font-weight: 600; | |
| cursor: pointer; | |
| } | |
| #acknowledge-btn:hover { | |
| background: #1d4ed8; | |
| } | |
| """ | |
| ACKNOWLEDGEMENT_TEXT = "I acknowledge and continue" | |
| TERMS_OVERLAY_HTML = f""" | |
| <div id="terms-overlay"> | |
| <div id="terms-card"> | |
| <h1>AMD Risk Prediction Demo — Research Use Only</h1> | |
| <p>This demo is a modernized research implementation based on the original AMD risk prediction work. It is provided solely for research, educational, and reproducibility purposes.</p> | |
| <h2>Not for Clinical Use</h2> | |
| <p>This Gradio demo and its associated model outputs are provided for research, educational, and reproducibility purposes only, and are not intended for clinical diagnosis, treatment, screening, triage, patient management, or any other medical decision-making purpose.</p> | |
| <p>This demo implementation has not been clinically validated for routine patient care and has not been reviewed, cleared, or approved by any regulatory authority, including the U.S. Food and Drug Administration.</p> | |
| <h2>Data Privacy</h2> | |
| <p>Do not upload protected health information, personally identifiable information, or patient data unless you have the necessary rights, permissions, and safeguards to do so.</p> | |
| <h2>User Responsibility</h2> | |
| <p>By using this demo, you acknowledge that:</p> | |
| <ul> | |
| <li>You understand this demo is for research and educational purposes only.</li> | |
| <li>You will not use the outputs for diagnosis, treatment, screening, triage, or clinical decision-making.</li> | |
| <li>You are responsible for how you interpret and use any results generated by the demo.</li> | |
| <li>You will not upload protected health information or identifiable patient data unless you are authorized to do so.</li> | |
| </ul> | |
| <h2>Attribution</h2> | |
| <p>This project is a modernized reimplementation of the DeepSeeNet AMD risk prediction work. Please cite the original publication when using this project:</p> | |
| <p>Peng Y, Keenan TD, Chen Q, Agrón E, Allot A, Wong WT, Chew EY, Lu Z. Predicting risk of late age-related macular degeneration using deep learning. NPJ Digital Medicine. 2020 Aug 27;3(1):111.</p> | |
| <h2>License and Warranty Disclaimer</h2> | |
| <p>Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the “Software”), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions:</p> | |
| <p>The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.</p> | |
| <p>THE SOFTWARE IS PROVIDED “AS IS”, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.</p> | |
| <button id="acknowledge-btn" onclick="document.getElementById('terms-overlay').remove();">{ACKNOWLEDGEMENT_TEXT}</button> | |
| </div> | |
| </div> | |
| """ | |
| class AlbumentationsTransform: | |
| def __init__(self, transform): | |
| self.transform = transform | |
| def __call__(self, image): | |
| return self.transform(image=np.asarray(image))["image"] | |
| class FeatureModel(nn.Module): | |
| """ | |
| Wraps DeepSeeNet and captures the input to the final Linear layer. | |
| This should match the feature extraction used in deepseenet/extract_features.py. | |
| """ | |
| def __init__(self, model: nn.Module): | |
| super().__init__() | |
| self.model = model | |
| self.features = None | |
| final_linear = self._find_last_linear(model) | |
| final_linear.register_forward_pre_hook(self._capture_penultimate) | |
| def _find_last_linear(model: nn.Module) -> nn.Linear: | |
| last_linear = None | |
| for module in model.modules(): | |
| if isinstance(module, nn.Linear): | |
| last_linear = module | |
| if last_linear is None: | |
| raise RuntimeError("Could not find final nn.Linear layer in DeepSeeNet.") | |
| return last_linear | |
| def _capture_penultimate(self, module, inputs): | |
| x = inputs[0] | |
| if isinstance(x, (tuple, list)): | |
| x = x[0] | |
| if x.ndim > 2: | |
| x = torch.flatten(x, start_dim=1) | |
| self.features = x.detach() | |
| def forward(self, x): | |
| self.features = None | |
| out = self.model(x) | |
| if isinstance(out, (tuple, list)): | |
| logits = out[0] | |
| else: | |
| logits = out | |
| if self.features is None: | |
| raise RuntimeError("Feature hook did not capture penultimate features.") | |
| return logits, self.features | |
| def parse_args(): | |
| parser = argparse.ArgumentParser() | |
| parser.add_argument("--checkpoint-folder", default="deepseenet/weights") | |
| parser.add_argument("--drusen-weights", default=None) | |
| parser.add_argument("--pigment-weights", default=None) | |
| parser.add_argument("--backbone", default="inception_v3") | |
| parser.add_argument("--image-size", type=int, default=1024) | |
| parser.add_argument( | |
| "--late-amd-model-dir", | |
| default="runs/stage2_late_amd_deep_clinical_block4_p001", | |
| ) | |
| parser.add_argument( | |
| "--anyga-model-dir", | |
| default="runs/stage2_anyga_deep_clinical_block2_p001", | |
| ) | |
| parser.add_argument( | |
| "--nv-model-dir", | |
| default="runs/stage2_nv_deep_clinical_block2_p001", | |
| ) | |
| parser.add_argument("--max-years", type=int, default=12) | |
| parser.add_argument("--server-name", default="127.0.0.1") | |
| parser.add_argument("--server-port", type=int, default=7860) | |
| parser.add_argument("--share", action="store_true") | |
| return parser.parse_args() | |
| def resolve_path(path): | |
| path = Path(path) | |
| if path.is_absolute(): | |
| return path | |
| return PROJECT_ROOT / path | |
| def load_deepseenet_model(path, task, backbone, device): | |
| path = resolve_path(path) | |
| checkpoint = torch.load(path, map_location=device) | |
| checkpoint_args = checkpoint.get("args", {}) if isinstance(checkpoint, dict) else {} | |
| model = DeepSeeNet( | |
| n_classes=N_CLASSES[task], | |
| backbone=checkpoint_args.get("backbone", backbone), | |
| pretrained=False, | |
| ).to(device) | |
| state_dict = checkpoint["model"] if isinstance(checkpoint, dict) and "model" in checkpoint else checkpoint | |
| model.load_state_dict(state_dict) | |
| model.eval() | |
| return FeatureModel(model).to(device).eval() | |
| def load_survival_bundle(model_dir): | |
| model_dir = resolve_path(model_dir) | |
| with open(model_dir / "cox_model.pkl", "rb") as f: | |
| cox_model = pickle.load(f) | |
| with open(model_dir / "scaler.pkl", "rb") as f: | |
| scaler = pickle.load(f) | |
| with open(model_dir / "config.json", "r") as f: | |
| config = json.load(f) | |
| return { | |
| "model_dir": str(model_dir), | |
| "cox_model": cox_model, | |
| "scaler": scaler, | |
| "config": config, | |
| "feature_cols": config["feature_cols"], | |
| } | |
| def load_image(image, transform, device): | |
| if image is None: | |
| raise ValueError("Please upload both left and right images.") | |
| image = image.convert("RGB") | |
| return transform(image).unsqueeze(0).to(device) | |
| def run_feature_model(model, image_tensor, task): | |
| logits, features = model(image_tensor) | |
| logits_1d = logits[0].detach().cpu() | |
| probs = F.softmax(logits_1d, dim=0).numpy() | |
| pred = int(np.argmax(probs)) | |
| features_1d = features[0].detach().cpu().numpy().astype(np.float32) | |
| return { | |
| "prediction": pred, | |
| "label": LABELS[task][pred], | |
| "confidence": float(probs[pred]), | |
| "probabilities": { | |
| LABELS[task][i]: float(probs[i]) | |
| for i in range(len(LABELS[task])) | |
| }, | |
| "features": features_1d, | |
| } | |
| def make_feature_dict(left_drus, right_drus, left_pig, right_pig, age, smkever): | |
| feature_dict = {} | |
| # Structured grading-style features. | |
| # These allow the app to also run a stage1 grading_clinical Cox model | |
| # if you point the endpoint directory to one. | |
| feature_dict["LE_DRUS"] = float(left_drus["prediction"]) | |
| feature_dict["RE_DRUS"] = float(right_drus["prediction"]) | |
| feature_dict["LE_PIG"] = float(left_pig["prediction"]) | |
| feature_dict["RE_PIG"] = float(right_pig["prediction"]) | |
| # DeepSeeNet hidden features. Paper-faithful order: | |
| # LE_DRUS, RE_DRUS, LE_PIG, RE_PIG. | |
| for i, value in enumerate(left_drus["features"]): | |
| feature_dict[f"LE_DRUS_{i:03d}"] = float(value) | |
| for i, value in enumerate(right_drus["features"]): | |
| feature_dict[f"RE_DRUS_{i:03d}"] = float(value) | |
| for i, value in enumerate(left_pig["features"]): | |
| feature_dict[f"LE_PIG_{i:03d}"] = float(value) | |
| for i, value in enumerate(right_pig["features"]): | |
| feature_dict[f"RE_PIG_{i:03d}"] = float(value) | |
| # Clinical variables used by the Cox models. | |
| feature_dict["age"] = float(age) | |
| feature_dict["smkever"] = float(smkever) | |
| return feature_dict | |
| def predict_survival(bundle, feature_dict, max_years): | |
| feature_cols = bundle["feature_cols"] | |
| missing = [c for c in feature_cols if c not in feature_dict] | |
| if missing: | |
| raise ValueError( | |
| "The selected Cox model requires features that the app could not build:\n" | |
| + "\n".join(missing[:20]) | |
| ) | |
| x = np.array([[feature_dict[c] for c in feature_cols]], dtype=np.float32) | |
| x_scaled = bundle["scaler"].transform(x) | |
| x_df = pd.DataFrame(x_scaled, columns=feature_cols) | |
| risk_score = float(bundle["cox_model"].predict_partial_hazard(x_df).values.reshape(-1)[0]) | |
| years = list(range(1, max_years + 1)) | |
| surv = bundle["cox_model"].predict_survival_function(x_df, times=years) | |
| rows = [] | |
| for year in years: | |
| survival_prob = float(surv.loc[year].values[0]) | |
| risk = 1.0 - survival_prob | |
| rows.append( | |
| { | |
| "Year": year, | |
| "Survival probability": survival_prob, | |
| "Risk probability": risk, | |
| "Risk (%)": 100.0 * risk, | |
| } | |
| ) | |
| return risk_score, pd.DataFrame(rows) | |
| def format_probs(probabilities): | |
| return " | ".join( | |
| f"{label}: {prob:.3f}" | |
| for label, prob in probabilities.items() | |
| ) | |
| def make_app(args): | |
| device = torch.device("cuda" if torch.cuda.is_available() else "cpu") | |
| checkpoint_folder = resolve_path(args.checkpoint_folder) | |
| drusen_weights = args.drusen_weights or str(checkpoint_folder / "drus.pt") | |
| pigment_weights = args.pigment_weights or str(checkpoint_folder / "pig.pt") | |
| transform = AlbumentationsTransform(get_val_transforms(args.image_size)) | |
| drus_model = load_deepseenet_model( | |
| drusen_weights, | |
| task="DRUS", | |
| backbone=args.backbone, | |
| device=device, | |
| ) | |
| pig_model = load_deepseenet_model( | |
| pigment_weights, | |
| task="PIG", | |
| backbone=args.backbone, | |
| device=device, | |
| ) | |
| survival_bundles = { | |
| "Late AMD": load_survival_bundle(args.late_amd_model_dir), | |
| "Any GA": load_survival_bundle(args.anyga_model_dir), | |
| "NV": load_survival_bundle(args.nv_model_dir), | |
| } | |
| endpoint_to_label = { | |
| "Late AMD": "late AMD", | |
| "Any GA": "geographic atrophy", | |
| "NV": "neovascular AMD", | |
| } | |
| def run(left_image, right_image, endpoint, age, smkever, prediction_year): | |
| left_tensor = load_image(left_image, transform, device) | |
| right_tensor = load_image(right_image, transform, device) | |
| left_drus = run_feature_model(drus_model, left_tensor, "DRUS") | |
| right_drus = run_feature_model(drus_model, right_tensor, "DRUS") | |
| left_pig = run_feature_model(pig_model, left_tensor, "PIG") | |
| right_pig = run_feature_model(pig_model, right_tensor, "PIG") | |
| feature_dict = make_feature_dict( | |
| left_drus=left_drus, | |
| right_drus=right_drus, | |
| left_pig=left_pig, | |
| right_pig=right_pig, | |
| age=age, | |
| smkever=smkever, | |
| ) | |
| bundle = survival_bundles[endpoint] | |
| risk_score, risk_df = predict_survival( | |
| bundle=bundle, | |
| feature_dict=feature_dict, | |
| max_years=args.max_years, | |
| ) | |
| year_row = risk_df[risk_df["Year"] == int(prediction_year)].iloc[0] | |
| selected_risk = float(year_row["Risk probability"]) | |
| selected_survival = float(year_row["Survival probability"]) | |
| summary_rows = [ | |
| ["Endpoint", endpoint_to_label[endpoint]], | |
| ["Prediction year", f"{int(prediction_year)} years"], | |
| ["Risk", f"{100.0 * selected_risk:.2f}%"], | |
| ["Survival probability", f"{100.0 * selected_survival:.2f}%"], | |
| ["Cox partial hazard", f"{risk_score:.4f}"], | |
| # ["Cox model directory", bundle["model_dir"]], | |
| ["Number of Cox features", len(bundle["feature_cols"])], | |
| ] | |
| grading_rows = [ | |
| [ | |
| "Left", | |
| left_drus["label"], | |
| f"{left_drus['confidence']:.3f}", | |
| format_probs(left_drus["probabilities"]), | |
| left_pig["label"], | |
| f"{left_pig['confidence']:.3f}", | |
| format_probs(left_pig["probabilities"]), | |
| ], | |
| [ | |
| "Right", | |
| right_drus["label"], | |
| f"{right_drus['confidence']:.3f}", | |
| format_probs(right_drus["probabilities"]), | |
| right_pig["label"], | |
| f"{right_pig['confidence']:.3f}", | |
| format_probs(right_pig["probabilities"]), | |
| ], | |
| ] | |
| # Clean display formatting. | |
| display_risk_df = risk_df.copy() | |
| display_risk_df["Survival probability"] = display_risk_df["Survival probability"].map(lambda x: f"{x:.4f}") | |
| display_risk_df["Risk probability"] = display_risk_df["Risk probability"].map(lambda x: f"{x:.4f}") | |
| display_risk_df["Risk (%)"] = display_risk_df["Risk (%)"].map(lambda x: f"{x:.2f}") | |
| return summary_rows, grading_rows, display_risk_df | |
| model_info = ( | |
| "# AMD Risk Prediction\n\n" | |
| f"DeepSeeNet backbone: `{args.backbone}` | " | |
| f"Input size: `{args.image_size} × {args.image_size}` | " | |
| f"Device: `{device.type}` " | |
| # f"Drusen weights: `{drusen_weights}` | " | |
| # f"Pigment weights: `{pigment_weights}`" | |
| ) | |
| with gr.Blocks(title="AMD Risk Prediction", css=APP_CSS) as demo: | |
| gr.HTML(TERMS_OVERLAY_HTML) | |
| gr.Markdown(model_info) | |
| with gr.Row(): | |
| left_image = gr.Image(type="pil", label="Left eye CFP") | |
| right_image = gr.Image(type="pil", label="Right eye CFP") | |
| with gr.Row(): | |
| endpoint = gr.Dropdown( | |
| choices=["Late AMD", "Any GA", "NV"], | |
| value="Late AMD", | |
| label="Endpoint", | |
| ) | |
| prediction_year = gr.Slider( | |
| minimum=1, | |
| maximum=args.max_years, | |
| value=5, | |
| step=1, | |
| label="Prediction horizon, years", | |
| ) | |
| with gr.Row(): | |
| age = gr.Number(value=70, label="Age") | |
| smkever = gr.Dropdown( | |
| choices=[ | |
| ("Never smoker / smkever=0", 0), | |
| ("Ever smoker / smkever=1", 1), | |
| ], | |
| value=1, | |
| label="Smoking history", | |
| ) | |
| button = gr.Button("Run AMD risk prediction") | |
| summary = gr.Dataframe( | |
| headers=["Item", "Result"], | |
| label="Risk summary", | |
| ) | |
| grading = gr.Dataframe( | |
| headers=[ | |
| "Eye", | |
| "Drusen prediction", | |
| "Drusen confidence", | |
| "Drusen probabilities", | |
| "Pigment prediction", | |
| "Pigment confidence", | |
| "Pigment probabilities", | |
| ], | |
| label="DeepSeeNet image outputs", | |
| ) | |
| risk_table = gr.Dataframe( | |
| label="Risk curve", | |
| ) | |
| button.click( | |
| run, | |
| inputs=[ | |
| left_image, | |
| right_image, | |
| endpoint, | |
| age, | |
| smkever, | |
| prediction_year, | |
| ], | |
| outputs=[ | |
| summary, | |
| grading, | |
| risk_table, | |
| ], | |
| ) | |
| return demo | |
| def main(): | |
| args = parse_args() | |
| demo = make_app(args) | |
| demo.launch( | |
| # server_name=args.server_name, | |
| # server_port=args.server_port, | |
| # share=args.share, | |
| ) | |
| if __name__ == "__main__": | |
| main() |