| import os |
| import time |
| import json |
| import requests |
| import asyncio |
| import random |
| from datetime import datetime |
| from typing import Dict, List, Optional |
| from fastapi import FastAPI, HTTPException, BackgroundTasks |
| from fastapi.responses import StreamingResponse |
| import uvicorn |
| from pydantic import BaseModel |
| from shared.models import ChatRequest, ChatResponse, ChatMessage, WorkerStatus, NodeType |
| from shared.node_types import NodeRegistrationRequest, NodeRegistrationResponse, NodeListResponse, NodeStatus, ServiceOffering, ServiceRequest |
| from shared.approval_system import smilyai_approval_system, ApprovalType |
| from shared.credits_system import credits_system, CreditReason, TransactionType |
| from shared.fault_tolerance import fault_tolerance_manager, FailureType, RecoveryStrategy |
| from shared.load_balancer import load_balancer, Task, TaskPriority |
| from shared.chat_history import save_detailed_chat_log, initialize_chat_file |
|
|
# Metadata published in the gateway's generated OpenAPI schema / docs page.
_APP_METADATA = dict(
    title="Multi-Node Hugging Face API Gateway",
    description="API Gateway that routes requests to specialized worker nodes",
    version="1.0.0",
)
app = FastAPI(**_APP_METADATA)

# Runs at import time, before any request is served. Presumably this prepares the
# chat-history store that save_detailed_chat_log writes to — TODO confirm in
# shared.chat_history.
initialize_chat_file()
|
|
| |
# (model name, environment variable, default URL) for every worker node.
# The environment variable, when set, overrides the default URL.
_WORKER_URL_SETTINGS = (
    ("sam-x-nano", "NANO_WORKER_URL", "http://nano-worker:8000"),
    ("sam-x-mini", "MINI_WORKER_URL", "http://mini-worker:8000"),
    ("sam-x-fast", "FAST_WORKER_URL", "http://fast-worker:8000"),
    ("sam-x-large", "LARGE_WORKER_URL", "http://large-worker:8000"),
    ("sam-large-2", "SAM2_WORKER_URL", "http://sam2-worker:8000"),
    ("universal", "UNIVERSAL_WORKER_URL", "http://universal-worker:8000"),
)

# Model name -> base URL of the worker serving it. Insertion order matters: the
# routing helpers pick the first fuzzy match when the name is not an exact key.
WORKER_NODES = {
    model: os.getenv(env_var, default_url)
    for model, env_var, default_url in _WORKER_URL_SETTINGS
}

# Model name -> {"active": bool, "last_check": time.time() value, "load": float}.
# Populated by the startup hook and updated by the request-routing helpers.
worker_status = {}
|
|
@app.on_event('startup')
def startup_event():
    """Seed ``worker_status`` with an "active" entry for every configured worker.

    No health probe is made here; each worker is optimistically assumed to be
    reachable until a routed request fails.
    """
    print("Starting Multi-Node Hugging Face API Gateway...")
    worker_status.update(
        {
            model_name: {"active": True, "last_check": time.time(), "load": 0.0}
            for model_name in WORKER_NODES
        }
    )
|
|
|
|
def route_to_worker(chat_request: ChatRequest) -> Dict:
    """Forward a chat request to the worker node that serves its model.

    The model name is lower-cased and looked up in WORKER_NODES. If there is no
    exact match, the first worker whose name contains (or is contained in) the
    requested name is used instead.

    Args:
        chat_request: The incoming request; forwarded to the worker as JSON via
            ``chat_request.dict()``.

    Returns:
        The worker's decoded JSON body. When ``chat_request.stream`` is set, it
        returns ``{"streaming": True, "response": resp}`` instead; the caller
        then owns ``resp`` and must consume or close it.

    Raises:
        HTTPException:
            - 400 if no worker matches the model.
            - The worker's own 4xx status if it rejects the request.
            - 503 if the worker is unreachable or fails; the worker is then
              marked inactive.
            - 500 on unexpected errors.

    Side effects:
        Updates ``worker_status[model]``. It is marked inactive on failure and
        active again after a successful round trip.
    """
    model = chat_request.model.lower()

    if model not in WORKER_NODES:
        # Fuzzy fallback, e.g. "nano" -> "sam-x-nano"; first match in dict order wins.
        candidates = [name for name in WORKER_NODES if model in name or name in model]
        if not candidates:
            raise HTTPException(status_code=400, detail=f"Model {chat_request.model} not available")
        model = candidates[0]

    worker_url = WORKER_NODES[model]

    try:
        response = requests.post(
            f"{worker_url}/chat/completions",
            json=chat_request.dict(),
            timeout=300,
            stream=chat_request.stream,
        )
        if 400 <= response.status_code < 500:
            # The worker answered, it just rejected this request (e.g. invalid
            # input). Relay the status instead of marking a live worker as down.
            rejected_status = response.status_code
            response.close()
            raise HTTPException(
                status_code=rejected_status,
                detail=f"Worker for model {model} rejected the request",
            )
        response.raise_for_status()

        if chat_request.stream:
            result = {"streaming": True, "response": response}
        else:
            result = response.json()
    except HTTPException:
        raise
    except requests.exceptions.RequestException as e:
        print(f"Error contacting worker {worker_url}: {str(e)}")
        worker_status[model] = {"active": False, "last_check": time.time(), "load": 0.0}
        raise HTTPException(status_code=503, detail=f"Worker for model {model} is not available") from e
    except Exception as e:
        print(f"Unexpected error contacting worker {worker_url}: {str(e)}")
        raise HTTPException(status_code=500, detail="Internal server error") from e

    # A successful round trip proves the worker is reachable again. Without this,
    # a single transient failure would hide the model from /models and /health
    # forever, since nothing else flips "active" back to True.
    previous = worker_status.get(model, {})
    worker_status[model] = {
        "active": True,
        "last_check": time.time(),
        "load": previous.get("load", 0.0),
    }
    return result
|
|
|
|
def route_streaming_request(chat_request: ChatRequest):
    """Forward a streaming chat request to its worker and relay the stream.

    Model resolution uses the same rules as route_to_worker: the exact
    lower-cased name first, then the first WORKER_NODES entry whose name
    contains (or is contained in) it.

    The upstream body is relayed byte-for-byte, so event framing reaches the
    client intact, including the blank lines that terminate server-sent events.
    The upstream connection is closed when the stream ends or when the client
    goes away.

    Raises:
        HTTPException:
            - 400 if no worker matches the model.
            - The worker's own 4xx status if it rejects the request.
            - 503 if the worker is unreachable or answers with a 5xx; the worker
              is then marked inactive.
    """
    model = chat_request.model.lower()

    if model not in WORKER_NODES:
        candidates = [name for name in WORKER_NODES if model in name or name in model]
        if not candidates:
            raise HTTPException(status_code=400, detail=f"Model {chat_request.model} not available")
        model = candidates[0]

    worker_url = WORKER_NODES[model]

    try:
        worker_response = requests.post(
            f"{worker_url}/chat/completions",
            json=chat_request.dict(),
            timeout=300,
            stream=True,
        )
    except requests.exceptions.RequestException as e:
        print(f"Error contacting worker {worker_url}: {str(e)}")
        worker_status[model] = {"active": False, "last_check": time.time(), "load": 0.0}
        raise HTTPException(status_code=503, detail=f"Worker for model {model} is not available") from e

    status_code = worker_response.status_code
    if status_code >= 400:
        # StreamingResponse would otherwise relay the worker's error body under HTTP 200.
        worker_response.close()
        if status_code >= 500:
            worker_status[model] = {"active": False, "last_check": time.time(), "load": 0.0}
            raise HTTPException(status_code=503, detail=f"Worker for model {model} is not available")
        raise HTTPException(status_code=status_code, detail=f"Worker for model {model} rejected the request")

    previous = worker_status.get(model, {})
    worker_status[model] = {
        "active": True,
        "last_check": time.time(),
        "load": previous.get("load", 0.0),
    }

    def generate():
        try:
            # chunk_size=None yields data as it arrives without re-splitting lines.
            for chunk in worker_response.iter_content(chunk_size=None):
                if chunk:
                    yield chunk
        finally:
            # Runs on normal completion and when the generator is closed early
            # (client disconnect), so the upstream connection is always released.
            worker_response.close()

    return StreamingResponse(generate(), media_type="text/event-stream")
|
|
|
|
@app.post("/chat/completions", response_model=ChatResponse)
async def chat_completions(request: ChatRequest, background_tasks: BackgroundTasks):
    """Main chat completions endpoint; routes the request to the appropriate worker.

    Streaming requests are relayed as a StreamingResponse. Non-streaming
    requests return the worker's JSON and schedule save_detailed_chat_log as a
    background task.

    The routing helpers make blocking ``requests`` calls with a 300 s timeout.
    They therefore run in the default thread-pool executor; running them
    directly in this coroutine would stall the event loop, and every other
    request with it.

    Raises:
        HTTPException: Errors from the routing helpers are propagated unchanged.
            Any other error becomes a 500.
    """
    start_time = time.time()
    loop = asyncio.get_running_loop()

    try:
        if request.stream:
            return await loop.run_in_executor(None, route_streaming_request, request)

        worker_response = await loop.run_in_executor(None, route_to_worker, request)
        processing_time = time.time() - start_time

        # Pull the assistant text for the chat log, tolerating a missing or null
        # "message" instead of failing the whole (successful) request.
        response_content = ""
        choices = worker_response.get("choices") if isinstance(worker_response, dict) else None
        if choices and isinstance(choices[0], dict):
            message = choices[0].get("message") or {}
            response_content = message.get("content", "")

        background_tasks.add_task(
            save_detailed_chat_log,
            request.dict(),
            response_content,
            request.model,
            processing_time,
        )
        return worker_response

    except HTTPException:
        raise
    except Exception as e:
        print(f"Error in chat_completions: {str(e)}")
        raise HTTPException(status_code=500, detail="Internal server error")
|
|
|
|
@app.get("/models")
async def list_models():
    """Return an OpenAI-style model list, omitting workers currently marked inactive.

    A model with no entry in ``worker_status`` is treated as available.
    """
    entries = []
    for model_id in WORKER_NODES:
        if not worker_status.get(model_id, {}).get("active", True):
            continue
        entries.append(
            {
                "id": model_id,
                "object": "model",
                "created": int(time.time()),
                "owned_by": "multinode-hf-api",
            }
        )
    return {"object": "list", "data": entries}
|
|
|
|
@app.get("/health")
async def health_check():
    """Summarise gateway health from the in-memory ``worker_status`` table.

    The gateway reports "healthy" when at least one worker is marked active.
    Entries without an "active" flag count as inactive.
    """
    active_names = [
        name for name, status in worker_status.items() if status.get("active", False)
    ]
    return {
        "status": "healthy" if active_names else "no_active_workers",
        "active_workers": active_names,
        "total_workers": len(WORKER_NODES),
    }
|
|
|
|
@app.get("/worker-status")
async def get_worker_status():
    """Expose the raw per-worker status table (active flag, last check time, load)."""
    status_table = worker_status
    return status_table
|
|
|
|
@app.post("/chat")
async def simple_chat(message: str, model: str = "sam-x-nano", max_tokens: int = 512):
    """Simplified chat endpoint: send one user message and return the reply text.

    The blocking worker call runs in the default thread-pool executor so it does
    not stall the event loop.

    Raises:
        HTTPException:
            - Propagated from route_to_worker.
            - 500 "No response from worker" when the reply has no
              ``choices[0].message.content``.
    """
    chat_request = ChatRequest(
        messages=[ChatMessage(role="user", content=message)],
        model=model,
        max_tokens=max_tokens,
    )

    loop = asyncio.get_running_loop()
    worker_response = await loop.run_in_executor(None, route_to_worker, chat_request)

    # Validate the reply shape explicitly. A bare KeyError here would otherwise
    # surface as an unstructured 500 instead of the intended error.
    choices = worker_response.get("choices") if isinstance(worker_response, dict) else None
    reply = choices[0].get("message") if choices and isinstance(choices[0], dict) else None
    if isinstance(reply, dict) and "content" in reply:
        return {"response": reply["content"]}
    raise HTTPException(status_code=500, detail="No response from worker")
|
|
|
|
| |
# Services that nodes can buy with credits through /marketplace/purchase.
# Row layout: (service_id, service_name, description, price_per_unit, unit_type).
marketplace_services = [
    ServiceOffering(
        service_id=service_id,
        service_name=service_name,
        description=description,
        price_per_unit=price,
        unit_type=unit,
    )
    for service_id, service_name, description, price, unit in (
        ("storage_1", "SACCP Cloud Storage", "Distributed storage on SACCP network", 0.01, "gb_month"),
        ("compute_1", "SACCP Compute Power", "Distributed computing on SACCP network", 0.10, "compute_hour"),
        ("ai_model_hosting_1", "AI Model Hosting", "Host and serve AI models on SACCP network", 0.05, "model_hour"),
    )
]

# IDs of HEAD nodes that passed SmilyAI approval in /saccp/register-worker.
approved_head_nodes = set()
|
|
@app.post("/saccp/register-worker", response_model=NodeRegistrationResponse)
async def register_worker(registration_request: NodeRegistrationRequest):
    """Register a worker node with the SACCP network.

    HEAD nodes must first be approved through the SmilyAI approval system. If
    approval is pending or denied, that outcome is returned and the node is not
    registered. Every admitted node, HEAD or not, is registered with both the
    fault-tolerance manager and the load balancer.

    Previously, a HEAD node approved during this call returned early and was
    never registered with the load balancer. Pre-approved HEAD nodes and
    non-HEAD nodes were.
    """
    node_id = registration_request.node_id
    node_type = registration_request.capabilities.node_type

    if node_type == NodeType.HEAD:
        denial = _head_node_approval_denial(registration_request)
        if denial is not None:
            return denial
        approved_head_nodes.add(node_id)

    # Each registry gets its own freshly built capabilities dict, as before.
    fault_tolerance_manager.register_node(node_id, node_type, registration_request.capabilities.dict())
    load_balancer.register_node(node_id, node_type, registration_request.capabilities.dict())

    return NodeRegistrationResponse(
        success=True,
        node_id=node_id,
        message=f"Successfully registered {node_type} node",
        approval_status="approved",
    )


def _head_node_approval_denial(
    registration_request: NodeRegistrationRequest,
) -> Optional[NodeRegistrationResponse]:
    """Check, and if necessary request, SmilyAI approval for a HEAD node.

    Returns None when the node is approved and may be registered. Otherwise it
    returns the "pending" or "rejected" NodeRegistrationResponse to send back.
    """
    node_id = registration_request.node_id
    if smilyai_approval_system.is_approved(node_id, ApprovalType.HEAD_NODE):
        return None

    request_id = smilyai_approval_system.request_approval(
        node_id=node_id,
        endpoint=registration_request.endpoint,
        request_type=ApprovalType.HEAD_NODE,
        request_data=registration_request.dict(),
        reason="HEAD node registration",
        requested_by="system",
    )

    pending_requests = smilyai_approval_system.get_pending_requests()
    if any(req.request_id == request_id for req in pending_requests):
        return NodeRegistrationResponse(
            success=False,
            node_id=node_id,
            message="HEAD node registration requires approval, submitted for review",
            approval_status="pending",
        )

    # No longer pending means the request has already been decided; re-check the outcome.
    if smilyai_approval_system.is_approved(node_id, ApprovalType.HEAD_NODE):
        return None

    return NodeRegistrationResponse(
        success=False,
        node_id=node_id,
        message="HEAD node registration denied",
        approval_status="rejected",
    )
|
|
|
|
@app.post("/saccp/heartbeat")
async def heartbeat(worker_id: str):
    """Forward a worker heartbeat to both node registries.

    The reply is "alive" if either registry call returned a truthy result,
    otherwise "unknown_node".
    """
    # Two separate statements (not a short-circuit `or`) so both registries always see it.
    known_to_ft = fault_tolerance_manager.heartbeat(worker_id)
    known_to_lb = load_balancer.heartbeat_node(worker_id)

    status = "alive" if (known_to_ft or known_to_lb) else "unknown_node"
    return {"status": status, "timestamp": int(time.time())}
|
|
|
|
@app.get("/saccp/next-task")
async def get_next_task(worker_id: str):
    """Return the next task for ``worker_id``.

    Placeholder: no dispatch logic is implemented, so this always returns an
    empty object and ``worker_id`` is ignored.
    """
    # TODO: hand out queued work here.
    return dict()
|
|
|
|
@app.post("/saccp/task-result")
async def report_task_result(worker_id: str, task_id: str, result: Dict):
    """Record a completed task and pay the reporting worker credits for it.

    The reward depends on ``result["task_type"]`` (default "compute"):
    "inference" pays 0.1, "training" pays 1.0, and anything else pays 0.5.

    NOTE(review): credits are granted even when record_task_completion returns a
    falsy value, and task_type is self-reported by the worker. Confirm whether
    payouts should be gated on ``task_completed``.
    """
    task_completed = fault_tolerance_manager.record_task_completion(task_id, worker_id)

    task_type = result.get('task_type', 'compute')
    if task_type == 'training':
        reward = 1.0
    elif task_type == 'inference':
        reward = 0.1
    else:
        reward = 0.5

    credits_system.add_credits(
        worker_id,
        reward,
        CreditReason.TASK_COMPLETION,
        metadata={"task_id": task_id, "task_type": task_type},
    )
    balance_after = credits_system.get_balance(worker_id).balance

    return {
        "status": "received",
        "credits_awarded": reward,
        "task_completed": task_completed,
        "new_balance": balance_after,
    }
|
|
|
|
@app.post("/saccp/task-error")
async def report_task_error(worker_id: str, task_id: str, error: str):
    """Record a worker-reported task failure and return the chosen recovery strategy.

    NOTE(review): every report is recorded as FailureType.TASK_TIMEOUT, whatever
    the ``error`` text says. Confirm whether other FailureType values should be
    mapped here.
    """
    strategy = fault_tolerance_manager.record_task_failure(
        task_id, worker_id, FailureType.TASK_TIMEOUT, error
    )
    strategy_name = "none" if not strategy else strategy.value
    return {"status": "error_received", "recovery_strategy": strategy_name}
|
|
|
|
@app.get("/saccp/stats")
async def get_network_stats():
    """Return the fault-tolerance manager's network health summary as-is."""
    return fault_tolerance_manager.get_network_health()
|
|
|
|
@app.get("/saccp/health-detailed")
async def get_detailed_health():
    """Combine the network health summary with the list of failed nodes and a timestamp."""
    summary = fault_tolerance_manager.get_network_health()
    failed = fault_tolerance_manager.get_failed_nodes()

    payload = {"network_health": summary, "failed_nodes": failed}
    payload["timestamp"] = int(time.time())
    return payload
|
|
|
|
@app.get("/saccp/nodes")
async def get_nodes():
    """List nodes known to the load balancer with total and online counts.

    A node counts as online when its ``"is_available"`` entry is truthy.
    """
    nodes = load_balancer.get_node_status()
    online = sum(1 for node in nodes if node["is_available"])
    return NodeListResponse(nodes=nodes, total_nodes=len(nodes), online_nodes=online)
|
|
|
|
@app.post("/saccp/submit-task")
async def submit_task_for_distribution(task_data: Dict):
    """Submit a task to the load balancer for distribution across the network.

    All ``task_data`` keys are optional:
        task_id: Caller-chosen ID; generated when the key is absent.
        task_type: Defaults to "compute".
        priority: "low", "normal", "high" or "critical"; any other value
            means normal.
        resource_requirements: Defaults to {}.
        estimated_duration: Defaults to 30.0.

    Returns the task ID, the value returned by ``load_balancer.submit_task``
    (``assigned_node``), and a timestamp.
    """
    import uuid

    if "task_id" in task_data:
        task_id = task_data["task_id"]
    else:
        # Keep the "task_<unix seconds>_" prefix, but use a UUID suffix. The old
        # randint(1000, 9999) suffix allowed only 9000 IDs per second, so bursts
        # of submissions could easily collide.
        task_id = f"task_{int(time.time())}_{uuid.uuid4().hex}"
    task_type = task_data.get("task_type", "compute")

    priority_map = {
        "low": TaskPriority.LOW,
        "normal": TaskPriority.NORMAL,
        "high": TaskPriority.HIGH,
        "critical": TaskPriority.CRITICAL,
    }
    priority = priority_map.get(task_data.get("priority", "normal"), TaskPriority.NORMAL)

    task = Task(
        task_id=task_id,
        task_type=task_type,
        priority=priority,
        resource_requirements=task_data.get("resource_requirements", {}),
        estimated_duration=task_data.get("estimated_duration", 30.0),
        created_at=time.time(),
    )

    assigned_node = load_balancer.submit_task(task)

    return {
        "task_id": task_id,
        "status": "submitted",
        "assigned_node": assigned_node,
        "timestamp": int(time.time()),
    }
|
|
|
|
@app.get("/saccp/load-balancer-status")
async def get_load_balancer_status():
    """Report the load balancer's per-node status and task-queue status with a timestamp."""
    nodes = load_balancer.get_node_status()
    queue = load_balancer.get_task_queue_status()
    return dict(node_status=nodes, task_queue=queue, timestamp=int(time.time()))
|
|
|
|
@app.get("/credits/balance/{node_id}")
async def get_credit_balance(node_id: str):
    """Return whatever balance object credits_system holds for ``node_id``."""
    return credits_system.get_balance(node_id)
|
|
|
|
@app.get("/credits/earn/{node_id}/{amount}")
async def earn_credits(node_id: str, amount: float, reason: str = "task_completion"):
    """Credit ``amount`` to ``node_id`` for contributing resources.

    ``reason`` may be a CreditReason value or a member name. Unknown reasons
    fall back to CreditReason.RESOURCE_CONTRIBUTION.

    Returns one of these status dicts:
        "success": includes the new balance.
        "failed": the amount is not a positive finite number, or
            credits_system refused it.
        "error": includes the exception message.

    NOTE(review): this is an unauthenticated GET that mints credits; consider
    restricting it.
    """
    import math

    try:
        # NaN/inf/non-positive amounts would corrupt or drain balances via an "earn" call.
        if not (math.isfinite(amount) and amount > 0):
            return {"status": "failed", "message": "Amount must be a positive, finite number"}

        # The old code tested membership in __members__ (member *names*) but then
        # looked the reason up by *value*. Whenever names differ from values
        # (e.g. TASK_COMPLETION = "task_completion"), the default reason silently
        # fell back, and passing a name raised. Try the value first, then the name.
        try:
            credit_reason = CreditReason(reason)
        except ValueError:
            credit_reason = CreditReason.__members__.get(reason, CreditReason.RESOURCE_CONTRIBUTION)

        success = credits_system.add_credits(node_id, amount, credit_reason)
        if success:
            balance = credits_system.get_balance(node_id)
            return {"status": "success", "new_balance": balance.balance}
        return {"status": "failed", "message": "Failed to add credits"}
    except Exception as e:
        return {"status": "error", "message": str(e)}
|
|
|
|
@app.get("/marketplace/services")
async def get_marketplace_services():
    """Return the static catalogue of services purchasable via /marketplace/purchase."""
    offerings = marketplace_services
    return offerings
|
|
|
|
@app.post("/marketplace/purchase")
async def purchase_service(service_request: ServiceRequest):
    """Buy ``quantity`` units of a marketplace service with the node's credits.

    Raises:
        HTTPException:
            - 404 if the service is unknown.
            - 400 if the service is unavailable, the quantity is not a positive
              finite number, or the node has insufficient credits.
    """
    import math

    service = next(
        (s for s in marketplace_services if s.service_id == service_request.service_id),
        None,
    )
    if service is None:
        raise HTTPException(status_code=404, detail="Service not found")

    if not service.availability:
        raise HTTPException(status_code=400, detail="Service not available")

    # A zero, negative or non-finite quantity would produce a zero, negative or
    # NaN cost. Reject it here rather than relying on spend_credits to refuse it.
    quantity = service_request.quantity
    if not (math.isfinite(quantity) and quantity > 0):
        raise HTTPException(status_code=400, detail="Quantity must be a positive number")

    total_cost = service.price_per_unit * quantity

    success = credits_system.spend_credits(
        service_request.node_id,
        total_cost,
        CreditReason.SERVICE_PURCHASE,
        service.service_name,
        metadata=service_request.parameters,
    )
    if not success:
        raise HTTPException(status_code=400, detail="Insufficient credits")

    balance = credits_system.get_balance(service_request.node_id)

    return {
        "status": "success",
        "service_id": service.service_id,
        "service_name": service.service_name,
        "cost": total_cost,
        "remaining_balance": balance.balance,
    }
|
|
|
|
| |
|
|
@app.post("/credits/earn-resource-contribution")
async def earn_credits_for_resource_contribution(node_id: str, node_type: NodeType, duration_hours: float,
                                                 resource_amount: float = 1.0):
    """Award credits for contributing resources to the network.

    Credits = per-type hourly rate * ``duration_hours`` * ``resource_amount``.
    Node types without a listed rate earn 0.4. Both multipliers must be
    positive finite numbers; otherwise a "failed" status is returned and no
    credits are added.
    """
    import math

    # Negative or non-finite inputs would otherwise "earn" negative or NaN credits.
    if not all(math.isfinite(v) and v > 0 for v in (duration_hours, resource_amount)):
        return {
            "status": "failed",
            "message": "duration_hours and resource_amount must be positive, finite numbers",
        }

    base_rates = {
        NodeType.RAM: 0.5,
        NodeType.DISK: 0.3,
        NodeType.COMPUTE: 0.4,
        NodeType.GPU: 1.0,
        NodeType.TPU: 1.5,
        NodeType.NPU: 1.2,
        NodeType.HEAD: 0.8,
    }
    rate = base_rates.get(node_type, 0.4)
    credits_to_earn = rate * duration_hours * resource_amount

    success = credits_system.add_credits(
        node_id,
        credits_to_earn,
        CreditReason.RESOURCE_CONTRIBUTION,
        # NOTE(review): node_type is stored as the NodeType member itself;
        # confirm credits_system can serialise it.
        metadata={
            "node_type": node_type,
            "duration_hours": duration_hours,
            "resource_amount": resource_amount,
        },
    )

    if success:
        balance = credits_system.get_balance(node_id)
        return {
            "status": "success",
            "credits_earned": credits_to_earn,
            "new_balance": balance.balance,
        }
    return {"status": "failed", "message": "Failed to award credits"}
|
|
|
|
@app.get("/credits/top-contributors")
async def get_top_contributors(limit: int = 10):
    """Return the nodes credits_system ranks highest by balance, capped at ``limit``.

    NOTE(review): "total_nodes_in_network" is the number of entries returned
    (at most ``limit``), not the size of the network.
    """
    leaders = credits_system.get_top_nodes_by_balance(limit)
    return {"top_contributors": leaders, "total_nodes_in_network": len(leaders)}
|
|
|
|
@app.get("/saccp/node-stats/{node_id}")
async def get_node_stats(node_id: str):
    """Return a node's credit balance and up to 10 entries of its transaction history.

    NOTE(review): "status" is always "active"; it is not derived from
    fault-tolerance or load-balancer state.
    """
    balance = credits_system.get_balance(node_id)
    recent = credits_system.get_transaction_history(node_id, limit=10)
    return {
        "node_id": node_id,
        "credit_balance": balance,
        "recent_transactions": recent,
        "status": "active",
    }
|
|
|
|
if __name__ == "__main__":
    # When run directly as a script, serve on every interface at port 7860.
    bind_host, bind_port = "0.0.0.0", 7860
    uvicorn.run(app, host=bind_host, port=bind_port)