"""
MicroForge End-to-End Test Suite
Validates all modules work correctly on CPU.
"""

import math
import os
import sys
import time

import torch

# Make the local microforge package importable regardless of the working directory.
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
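
# Seed torch's global RNG so the random test tensors (and the printed losses)
# are reproducible across runs. This is a convenience added for testing, not a
# requirement of the MicroForge modules themselves.
torch.manual_seed(0)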


def test_vae():
    """Test all VAE configurations."""
    from microforge.vae import MicroForgeVAE

    print("=" * 60)
    print("TEST: MicroForge VAE")
    print("=" * 60)

    for config in ['tiny', 'small', 'base']:
        vae = MicroForgeVAE(config=config)
        params = sum(p.numel() for p in vae.parameters())

        # Full forward pass: reconstruction plus posterior statistics.
        x = torch.randn(1, 3, 256, 256)
        x_recon, mu, logvar = vae(x)

        assert x_recon.shape == x.shape, f"Recon shape mismatch: {x_recon.shape} vs {x.shape}"
        assert not torch.isnan(mu).any(), "NaN in mu"
        assert not torch.isnan(logvar).any(), "NaN in logvar"

        # Encode/decode round trip through the latent space.
        z = vae.get_latent(x)
        x_dec = vae.decode(z)
        assert x_dec.shape == x.shape, f"Decode shape mismatch: {x_dec.shape} vs {x.shape}"

        # The KL divergence term must be finite.
        kl = MicroForgeVAE.kl_loss(mu, logvar)
        assert not torch.isnan(kl), "NaN in KL loss"

        print(f"  [{config:>5}] PASS | params={params:,} | latent={mu.shape} | KL={kl.item():.2f}")

    print()


def test_backbone():
    """Test all backbone configurations."""
    from microforge.backbone import MicroForgeBackbone

    print("=" * 60)
    print("TEST: MicroForge Backbone")
    print("=" * 60)

    for config in ['tiny', 'small', 'base']:
        lc = 16 if config == 'tiny' else 32
        backbone = MicroForgeBackbone(latent_channels=lc, config=config)
        params = sum(p.numel() for p in backbone.parameters())

        # Latent, timestep, and text-conditioning inputs.
        z = torch.randn(1, lc, 8, 8)
        t = torch.rand(1)
        text_emb = torch.randn(1, 10, 768)
        text_pooled = torch.randn(1, 768)

        # Single-pass wall-clock timing; treat it as a rough indicator only,
        # since there is no warmup or averaging.
        start = time.time()
        v = backbone(z, t, text_emb, text_pooled)
        elapsed = (time.time() - start) * 1000

        assert v.shape == z.shape, f"Output shape mismatch: {v.shape} vs {z.shape}"
        assert not torch.isnan(v).any(), "NaN in velocity prediction"

        print(f"  [{config:>5}] PASS | params={params:,} | latency={elapsed:.0f}ms")

    print()


def test_planner():
    """Test Recurrent Latent Planner."""
    from microforge.planner import RecurrentLatentPlanner

    print("=" * 60)
    print("TEST: Recurrent Latent Planner")
    print("=" * 60)

    planner = RecurrentLatentPlanner(
        num_plan_tokens=32, dim=384, text_dim=768, latent_channels=32
    )
    params = sum(p.numel() for p in planner.parameters())

    # Plan initialization from pooled text embeddings.
    text_pooled = torch.randn(2, 768)
    plan = planner.initialize_plan(text_pooled, batch_size=2)
    assert plan.shape == (2, 32, 384), f"Plan shape: {plan.shape}"

    # One planner step over image tokens and a timestep embedding.
    img_tokens = torch.randn(2, 64, 32)
    t_emb = torch.randn(2, 384)
    plan_out, output = planner(img_tokens, plan, t_emb)

    assert plan_out.shape == (2, 32, 384), f"Plan output shape: {plan_out.shape}"
    assert output.shape == (2, 32, 768), f"Output shape: {output.shape}"
    assert not torch.isnan(plan_out).any(), "NaN in plan state"
    assert not torch.isnan(output).any(), "NaN in planner output"

    # Recurrence: re-initialize from the previous plan state.
    plan_next = planner.initialize_plan(text_pooled, 2, prev_plan=plan_out)
    assert plan_next.shape == plan.shape, f"Re-init shape mismatch: {plan_next.shape} vs {plan.shape}"

    print(f"  PASS | params={params:,} | plan_state={planner.get_plan_size_bytes()} bytes")
    print()
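

# Supplementary sketch (an addition, not one of the original MicroForge tests):
# run the planner for a few steps, feeding each step's plan state straight back
# in. This assumes the returned plan state is a valid input to the next call,
# which its shape (checked in test_planner above) suggests but the source here
# does not explicitly confirm.
def test_planner_rollout():
    """Hedged extra check: plan state stays finite over a short rollout."""
    from microforge.planner import RecurrentLatentPlanner

    print("=" * 60)
    print("TEST: Planner Rollout (supplementary)")
    print("=" * 60)

    planner = RecurrentLatentPlanner(
        num_plan_tokens=32, dim=384, text_dim=768, latent_channels=32
    )
    text_pooled = torch.randn(2, 768)
    plan = planner.initialize_plan(text_pooled, batch_size=2)

    for step in range(4):
        img_tokens = torch.randn(2, 64, 32)
        t_emb = torch.randn(2, 384)
        plan, _ = planner(img_tokens, plan, t_emb)
        assert not torch.isnan(plan).any(), f"NaN in plan state at step {step}"

    print("  PASS | 4-step rollout kept plan state finite")
    print()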


def test_training():
    """Test training loop."""
    from microforge.vae import MicroForgeVAE
    from microforge.backbone import MicroForgeBackbone
    from microforge.planner import RecurrentLatentPlanner
    from microforge.training import MicroForgeTrainer, FlowMatchingScheduler

    print("=" * 60)
    print("TEST: Training Pipeline")
    print("=" * 60)

    vae = MicroForgeVAE(config='tiny').eval()
    backbone = MicroForgeBackbone(latent_channels=16, config='tiny')
    planner = RecurrentLatentPlanner(num_plan_tokens=16, dim=256, text_dim=768, latent_channels=16)

    trainer = MicroForgeTrainer(vae, backbone, planner, lr=1e-4, use_ema=True)

    # Flow-matching scheduler sanity checks.
    scheduler = FlowMatchingScheduler()
    t = scheduler.sample_timesteps(4, torch.device('cpu'))
    assert t.min() >= 0 and t.max() <= 1, f"Timesteps out of range: {t}"

    z_0 = torch.randn(4, 16, 4, 4)
    noise = torch.randn_like(z_0)
    z_t, v_target = scheduler.add_noise(z_0, noise, t)
    assert z_t.shape == z_0.shape, f"Noised latent shape: {z_t.shape}"
    assert v_target.shape == z_0.shape, f"Velocity target shape: {v_target.shape}"

    # A few optimization steps on random data; we only check that the losses
    # stay finite, since five steps are too few to expect a monotonic decrease.
    images = torch.randn(2, 3, 128, 128)
    text_emb = torch.randn(2, 10, 768)
    text_pooled = torch.randn(2, 768)

    losses = []
    for _ in range(5):
        step_losses = trainer.train_step(images, text_emb, text_pooled)
        losses.append(step_losses['flow'])
        assert not any(math.isnan(float(v)) for v in step_losses.values()), \
            f"NaN in losses: {step_losses}"

    print(f"  5 training steps: loss {losses[0]:.2f} -> {losses[-1]:.2f}")
    print("  PASS")
    print()


def test_pipeline():
    """Test end-to-end inference pipeline."""
    from microforge.vae import MicroForgeVAE
    from microforge.backbone import MicroForgeBackbone
    from microforge.planner import RecurrentLatentPlanner
    from microforge.pipeline import MicroForgePipeline, SimpleTextEncoder

    print("=" * 60)
    print("TEST: End-to-End Pipeline")
    print("=" * 60)

    vae = MicroForgeVAE(config='tiny')
    backbone = MicroForgeBackbone(latent_channels=16, config='tiny')
    planner = RecurrentLatentPlanner(num_plan_tokens=16, dim=256, text_dim=768, latent_channels=16)
    text_enc = SimpleTextEncoder(embed_dim=768, num_layers=2)

    pipeline = MicroForgePipeline(vae, backbone, text_enc, planner, device='cpu')

    # Text-to-image generation from random token IDs.
    tokens = torch.randint(0, 8192, (1, 10))
    start = time.time()
    images = pipeline.text2img(tokens, height=128, width=128, num_steps=2, cfg_scale=1.0, seed=42)
    t2i_time = time.time() - start

    assert images.shape == (1, 3, 128, 128), f"Wrong output shape: {images.shape}"
    assert images.min() >= -1 and images.max() <= 1, f"Range error: [{images.min()}, {images.max()}]"

    print(f"  text2img: {images.shape} in {t2i_time:.2f}s | PASS")

    # Parameter count across all components.
    params = pipeline.count_parameters()
    print(f"  Total params: {params['total']:,}")

    # Rough memory estimate for 512x512 inference.
    mem = pipeline.get_memory_estimate(512, 512)
    print(f"  Est. memory @512px: {mem['estimated_inference_mb']:.0f} MB")

    print("  PASS")
    print()
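

# Supplementary sketch (an addition, not one of the original MicroForge tests):
# a minimal determinism check. It assumes text2img derives all of its sampling
# randomness from the `seed` argument, so two CPU runs with identical inputs
# should produce identical images.
def test_determinism():
    """Hedged extra check: the same seed should yield the same image on CPU."""
    from microforge.vae import MicroForgeVAE
    from microforge.backbone import MicroForgeBackbone
    from microforge.planner import RecurrentLatentPlanner
    from microforge.pipeline import MicroForgePipeline, SimpleTextEncoder

    print("=" * 60)
    print("TEST: Seeded Determinism (supplementary)")
    print("=" * 60)

    vae = MicroForgeVAE(config='tiny')
    backbone = MicroForgeBackbone(latent_channels=16, config='tiny')
    planner = RecurrentLatentPlanner(num_plan_tokens=16, dim=256, text_dim=768, latent_channels=16)
    text_enc = SimpleTextEncoder(embed_dim=768, num_layers=2)
    pipeline = MicroForgePipeline(vae, backbone, text_enc, planner, device='cpu')

    tokens = torch.randint(0, 8192, (1, 10))
    img_a = pipeline.text2img(tokens, height=128, width=128, num_steps=2, cfg_scale=1.0, seed=42)
    img_b = pipeline.text2img(tokens, height=128, width=128, num_steps=2, cfg_scale=1.0, seed=42)
    assert torch.allclose(img_a, img_b), "Same seed produced different images"

    print("  PASS | two seeded runs matched")
    print()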


def test_editing_pathway():
    """Test that editing pathway works (spatial concat)."""
    from microforge.backbone import MicroForgeBackbone

    print("=" * 60)
    print("TEST: Editing Pathway (Spatial Concat)")
    print("=" * 60)

    backbone = MicroForgeBackbone(latent_channels=16, config='tiny')

    # Plain generation: a square latent grid.
    z_gen = torch.randn(1, 16, 8, 8)
    t = torch.rand(1)
    text_emb = torch.randn(1, 5, 768)
    text_pooled = torch.randn(1, 768)

    v_gen = backbone(z_gen, t, text_emb, text_pooled)
    assert v_gen.shape == z_gen.shape, f"Gen output shape: {v_gen.shape}"

    # Editing: two latents concatenated along the width dimension.
    z_edit = torch.randn(1, 16, 8, 16)
    v_edit = backbone(z_edit, t, text_emb, text_pooled)
    assert v_edit.shape == z_edit.shape, f"Edit output shape: {v_edit.shape}"

    # Crop one half back out; its shape must match the single-image latent.
    v_target = v_edit[..., :8]
    assert v_target.shape == z_gen.shape, f"Target crop shape: {v_target.shape}"

    print(f"  Generation: {z_gen.shape} -> {v_gen.shape} | PASS")
    print(f"  Editing: {z_edit.shape} -> {v_edit.shape} | PASS")
    print()


def main():
    print()
    print("🔨 MicroForge Architecture Test Suite")
    print("=" * 60)
    print()

    test_vae()
    test_backbone()
    test_planner()
    test_training()
    test_pipeline()
    test_editing_pathway()
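    # Supplementary sketches defined above (additions, not original tests).
    test_planner_rollout()
    test_determinism()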

    print("=" * 60)
    print("✅ ALL TESTS PASSED")
    print("=" * 60)


if __name__ == "__main__":
    main()