| import asyncio |
| import json |
| import requests |
| import websockets |
| import time |
| import urllib.parse |
| from identity import MEPIdentity |
| import uuid |
| from typing import Optional |
|
|
| HUB_URL = "http://localhost:8000" |
| WS_URL = "ws://localhost:8000/ws" |
|
|
| def get_auth_url(identity: MEPIdentity): |
| ts = str(int(time.time())) |
| sig = identity.sign(identity.node_id, ts) |
| sig_safe = urllib.parse.quote(sig) |
| return f"{WS_URL}/{identity.node_id}?timestamp={ts}&signature={sig_safe}" |
|
|
| def submit_task(identity: MEPIdentity, payload: str, bounty: float, target: Optional[str] = None): |
| data = { |
| "consumer_id": identity.node_id, |
| "payload": payload, |
| "bounty": bounty |
| } |
| if target: |
| data["target_node"] = target |
| |
| payload_str = json.dumps(data) |
| headers = identity.get_auth_headers(payload_str) |
| headers["Content-Type"] = "application/json" |
| r = requests.post(f"{HUB_URL}/tasks/submit", data=payload_str, headers=headers) |
| return r.json() |
|
|
| def place_bid(identity: MEPIdentity, task_id: str): |
| data = { |
| "task_id": task_id, |
| "provider_id": identity.node_id |
| } |
| payload_str = json.dumps(data) |
| headers = identity.get_auth_headers(payload_str) |
| headers["Content-Type"] = "application/json" |
| r = requests.post(f"{HUB_URL}/tasks/bid", data=payload_str, headers=headers) |
| return r.json() |
|
|
| def complete_task(identity: MEPIdentity, task_id: str, result: str): |
| data = { |
| "task_id": task_id, |
| "provider_id": identity.node_id, |
| "result_payload": result |
| } |
| payload_str = json.dumps(data) |
| headers = identity.get_auth_headers(payload_str) |
| headers["Content-Type"] = "application/json" |
| r = requests.post(f"{HUB_URL}/tasks/complete", data=payload_str, headers=headers) |
| return r.json() |
|
|
| def get_balance(identity: MEPIdentity): |
| r = requests.get(f"{HUB_URL}/balance/{identity.node_id}") |
| return r.json().get("balance_seconds", 0.0) |
|
|
| async def test_three_markets(): |
| print("=" * 60) |
| print("Testing the 3 MEP Markets (+, 0, -)") |
| print("=" * 60) |
| |
| alice = MEPIdentity(f"alice_{uuid.uuid4().hex[:6]}.pem") |
| bob = MEPIdentity(f"bob_{uuid.uuid4().hex[:6]}.pem") |
| |
| requests.post(f"{HUB_URL}/register", json={"pubkey": alice.pub_pem}) |
| requests.post(f"{HUB_URL}/register", json={"pubkey": bob.pub_pem}) |
| |
| print(f"π© Alice (Consumer): {alice.node_id} | Starting Bal: {get_balance(alice)}") |
| print(f"π¦ Bob (Provider): {bob.node_id} | Starting Bal: {get_balance(bob)}\n") |
|
|
| async def bob_listener(): |
| async with websockets.connect(get_auth_url(bob)) as ws: |
| |
| msg = await ws.recv() |
| data = json.loads(msg) |
| if data["event"] == "rfc" and data["data"]["bounty"] > 0: |
| task_id = data["data"]["id"] |
| print(f"π¦ Bob: Received Compute RFC {task_id[:8]} for +{data['data']['bounty']} SECONDS") |
| bid_res = place_bid(bob, task_id) |
| if bid_res["status"] == "accepted": |
| print("π¦ Bob: Won Compute Bid! Completing task...") |
| complete_task(bob, task_id, "Here is the code you requested.") |
| print("π¦ Bob: Compute task done.\n") |
|
|
| |
| msg = await ws.recv() |
| data = json.loads(msg) |
| if data["event"] == "new_task" and data["data"]["bounty"] == 0.0: |
| task_id = data["data"]["id"] |
| print(f"π¦ Bob: Received Cyberspace DM {task_id[:8]} from Alice (0.0 SECONDS)") |
| print(f"π¦ Bob: Message = '{data['data']['payload']}'") |
| complete_task(bob, task_id, "Yes Alice, I am free.") |
| print("π¦ Bob: Sent free reply.\n") |
|
|
| |
| msg = await ws.recv() |
| data = json.loads(msg) |
| if data["event"] == "rfc" and data["data"]["bounty"] < 0: |
| task_id = data["data"]["id"] |
| cost = data["data"]["bounty"] |
| print(f"π¦ Bob: Received Data Market RFC {task_id[:8]} costing {cost} SECONDS") |
| |
| |
| max_purchase_price = -5.0 |
| if cost >= max_purchase_price: |
| print("π¦ Bob: Budget allows it! Bidding on premium data...") |
| bid_res = place_bid(bob, task_id) |
| if bid_res["status"] == "accepted": |
| print(f"π¦ Bob: Paid {abs(cost)} SECONDS to download premium data: '{bid_res['payload']}'") |
| complete_task(bob, task_id, "Data received successfully.") |
| print("π¦ Bob: Premium data acquisition complete.\n") |
| else: |
| print("π¦ Bob: Too expensive. Ignored.") |
| |
| await asyncio.sleep(0.5) |
|
|
| async def alice_sender(): |
| |
| await asyncio.sleep(0.5) |
| |
| async with websockets.connect(get_auth_url(alice)) as ws: |
| |
| print("π© Alice: Submitting Compute Task (+5.0 SECONDS)...") |
| submit_task(alice, "Write me a python script", 5.0) |
| await asyncio.wait_for(ws.recv(), timeout=6.0) |
| |
| |
| print("π© Alice: Sending Cyberspace DM to Bob (0.0 SECONDS)...") |
| submit_task(alice, "Are you free to chat?", 0.0, target=bob.node_id) |
| await asyncio.wait_for(ws.recv(), timeout=6.0) |
| |
| |
| print("π© Alice: Broadcasting Premium Dataset (-2.0 SECONDS)...") |
| submit_task(alice, "SECRET_TRADING_ALGO_V9", -2.0) |
| await asyncio.wait_for(ws.recv(), timeout=6.0) |
| |
| await asyncio.sleep(0.5) |
|
|
| await asyncio.gather(bob_listener(), alice_sender()) |
| |
| print("=" * 60) |
| print("Final Balances:") |
| print(f"π© Alice (Started 10.0): {get_balance(alice)} (Paid 5.0, Earned 2.0 = Expected 7.0)") |
| print(f"π¦ Bob (Started 10.0): {get_balance(bob)} (Earned 5.0, Paid 2.0 = Expected 13.0)") |
| print("=" * 60) |
|
|
| if __name__ == "__main__": |
| asyncio.run(test_three_markets()) |
|
|