import time import uuid import gradio as gr from dotenv import load_dotenv from agent import Agent, FETCH_WEBPAGE_TOOL, SHELL_TOOL, READ_TOOL, FINAL_MESSAGE_TOOL import os from pathlib import Path load_dotenv() gr.set_static_paths("static/") _SYSTEM_PROMPT = """\ You are OpenMythos, a powerful AI agent specialized in cybersecurity-related tasks. You have access to tools that you can use to accomplish your goals. You are a multi-level vulnerability analysis, a visual dependency risk path, a declared threat level then generates an instant, verifiable hotfix patch before threat actors can exploit it. When finding exploits list it in multi step use tools to search for something specific if needed. After finding one bulnerability, you will generate a patch for it and provide a detailed explanation of the vulnerability, including its potential impact and how the patch mitigates the risk. Than THinks again and search for new vulnerabilities and repeat the process until all vulnerabilities are found. Don't go much looped if you find yourself in a loop just call the `final_message` tool to end the conversation. === IMPORTANT: How to end the conversation === You MUST call the `final_message` tool when you have completed your response and want to end. If you do NOT call `final_message`, you will be stuck in a loop: - You respond → system waits for final_message → you did not call it - → system sends your response back to you → you must respond again - → this repeats until you call `final_message` To break out of the loop, simply call `final_message` with no arguments. Only call `final_message` when you are done or already responded or stuck in a loop. """ agent = Agent( base_url=os.getenv("OPENAI_BASE_URL"), api_key=os.getenv("OPENAI_API_KEY"), model=os.getenv("OPENAI_MODEL"), system_prompt=_SYSTEM_PROMPT, ) agent.register_tool(FETCH_WEBPAGE_TOOL, SHELL_TOOL, READ_TOOL, FINAL_MESSAGE_TOOL) agent.register_all_mcp() agent.set_final_message_tool() # Load JS from external files _js_dir = Path(__file__).parent / "static" / "js" JS_LOAD_STATE = (_js_dir / "storage.load.js").read_text() JS_SAVE_STATE = (_js_dir / "storage.save.js").read_text() LANDING_PAGE_SCRIPT = (_js_dir / "landing.js").read_text() def _conv_choices(state_value): convs = sorted(state_value["conversations"], key=lambda c: c.get("last_updated", 0), reverse=True) return gr.update( choices=[(c["label"], c["key"]) for c in convs], value=state_value.get("conversation_id") or None, ) class GradioEvents: """Event handlers for the chatbot UI.""" @staticmethod def stream_response(message, state_value): """Stream a chat completion into the active conversation.""" if not message or not message.strip(): yield gr.skip() return if not state_value.get("conversation_id"): conv_id = str(uuid.uuid4()) state_value["conversation_id"] = conv_id state_value["conversations"].append( {"label": message[:30], "key": conv_id, "last_updated": int(time.time() * 1000)}) state_value["conversation_contexts"][conv_id] = { "history": [] } else: conv_id = state_value["conversation_id"] state_value["conversation_contexts"].setdefault( conv_id, {"history": []}) # Update last_updated for existing conversation for c in state_value["conversations"]: if c["key"] == conv_id: c["last_updated"] = int(time.time() * 1000) break ctx = state_value["conversation_contexts"][conv_id] for c in state_value["conversations"]: if c["key"] == conv_id and not c.get("label"): c["label"] = message[:30] break ctx["history"].append({"role": "user", "content": message}) yield { msg: gr.update(value=""), chatbot: gr.update(value=ctx["history"]), state: gr.update(value=state_value), conv_choice: _conv_choices(state_value), send_btn: gr.update(visible=False), stop_btn: gr.update(visible=True)} # Build display as separate titled messages (smolagents style) display_messages: list[dict] = list(ctx["history"]) text_msg_idx: int | None = None thinking_msg_idx: int | None = None tool_call_idx: int | None = None spinner_frames = ["⠋", "⠙", "⠹", "⠸", "⠼", "⠴", "⠦", "⠧", "⠇", "⠏"] spinner_idx = 0 try: for ev in agent.stream(ctx["history"]): t = ev["type"] if t == "reasoning": spinner_idx += 1 content = f"{spinner_frames[spinner_idx % len(spinner_frames)]} Thinking..." if thinking_msg_idx is not None: display_messages[thinking_msg_idx]["content"] = content else: display_messages.append({"role": "assistant", "content": content, "metadata": {}}) thinking_msg_idx = len(display_messages) - 1 elif t == "text": # Remove spinner message if showing — it was a separate bubble if thinking_msg_idx is not None: display_messages.pop(thinking_msg_idx) thinking_msg_idx = None # Re-index since we popped if text_msg_idx is not None and thinking_msg_idx is not None: if text_msg_idx > thinking_msg_idx: text_msg_idx -= 1 if text_msg_idx is not None: display_messages[text_msg_idx]["content"] += ev["content"] else: display_messages.append({"role": "assistant", "content": ev["content"], "metadata": {}}) text_msg_idx = len(display_messages) - 1 elif t == "tool_call": # Remove spinner if present if thinking_msg_idx is not None: display_messages.pop(thinking_msg_idx) if text_msg_idx is not None and text_msg_idx > thinking_msg_idx: text_msg_idx -= 1 thinking_msg_idx = None # Finalize any in-flight text message (keep if it has real content) if text_msg_idx is not None: c = display_messages[text_msg_idx].get("content", "").strip() if not c: display_messages.pop(text_msg_idx) text_msg_idx = None tool_call_idx = None tool_name = ev["name"] display_messages.append({ "role": "assistant", "content": f"```\n{tool_name}({ev['arguments']})\n```\n⏳ Running...", "metadata": {"title": f"🛠️ Used tool {tool_name}"}, }) tool_call_idx = len(display_messages) - 1 elif t == "tool_output": if tool_call_idx is not None: snippet = ev["content"][:500] cc = snippet if len(ev["content"]) <= 500 else snippet + "\n..." # Grab the tool name from the existing message tool_name = display_messages[tool_call_idx]["metadata"]["title"].split("Used tool ")[-1] display_messages[tool_call_idx]["content"] = ( f"```\n{tool_name}({ev['arguments']})\n```\n\n" f"**Output:**\n```\n{cc}\n```" ) display_messages[tool_call_idx]["metadata"] = { "title": f"🛠️ {tool_name} — {len(ev['content'])} chars", } # Only clear tool_call_idx if NOT partial (final output) if not ev.get("partial"): tool_call_idx = None elif t == "error": display_messages.append({ "role": "assistant", "content": f'{ev["content"]}', "metadata": {"title": "💥 Error"}, }) yield { chatbot: gr.update(value=display_messages), state: gr.update(value=state_value), send_btn: gr.update(visible=True), stop_btn: gr.update(visible=False), } return elif t == "done": break # Update history in real-time so state always has latest messages ctx["history"] = display_messages yield { chatbot: gr.update(value=display_messages), state: gr.update(value=state_value), } # Final sync (ensures tool output messages are saved) ctx["history"] = display_messages yield { chatbot: gr.update(value=ctx["history"]), state: gr.update(value=state_value), send_btn: gr.update(visible=True), stop_btn: gr.update(visible=False), } except Exception as exc: display_messages.append({ "role": "assistant", "content": f'{exc}', "metadata": {"title": "💥 Error"}, }) ctx["history"] = display_messages yield { chatbot: gr.update(value=display_messages), state: gr.update(value=state_value), send_btn: gr.update(visible=True), stop_btn: gr.update(visible=False), } @staticmethod def new_chat(state_value): state_value["conversation_id"] = "" return ( gr.update(value=None), gr.update(value=None), gr.update(value=state_value), ) @staticmethod def select_conversation(choice, state_value): if not choice: return gr.skip() if choice == state_value.get("conversation_id"): return gr.skip() state_value["conversation_id"] = choice ctx = state_value["conversation_contexts"].get(choice, {}) return ( gr.update(value=ctx.get("history", [])), gr.update(value=state_value), ) @staticmethod def delete_selected_conversation(choice, state_value): if not choice: return gr.skip() state_value["conversation_contexts"].pop(choice, None) state_value["conversations"] = [ c for c in state_value["conversations"] if c["key"] != choice ] was_active = state_value.get("conversation_id") == choice if was_active: state_value["conversation_id"] = "" return ( _conv_choices(state_value), gr.update(value=None), gr.update(value=state_value), ) return ( _conv_choices(state_value), gr.skip(), gr.update(value=state_value), ) @staticmethod def cancel_stream(state_value): """Mark the current assistant message as cancelled.""" if not state_value.get("conversation_id"): return gr.skip() ctx = state_value["conversation_contexts"][ state_value["conversation_id"]] if ctx.get("history") and ctx["history"][-1].get("role") == "assistant": ctx["history"][-1]["metadata"] = ctx["history"][-1].get( "metadata", {}) ctx["history"][-1]["metadata"]["footer"] = "Chat completion paused" return (gr.update(value=ctx.get("history", [])), gr.update(value=state_value), gr.update(visible=True), gr.update(visible=False)) @staticmethod def load_from_js(serialised_json, state_value): """Receive the JSON string that JS_LOAD_STATE returned, merge into state.""" import json if not serialised_json: return gr.skip(), gr.skip() try: loaded = json.loads(serialised_json) except Exception: return gr.skip(), gr.skip() state_value["conversations"] = loaded.get("conversations", []) state_value["conversation_contexts"] = loaded.get("conversation_contexts", {}) return _conv_choices(state_value), gr.update(value=state_value) @staticmethod def prepare_save(state_value): """Serialise state to JSON so JS_SAVE_STATE can write it to localStorage.""" import json return json.dumps({ "conversations": state_value.get("conversations", []), "conversation_contexts": state_value.get("conversation_contexts", {}), }) with gr.Blocks(fill_width=True, title="OpenMythos Demo") as demo: state = gr.State({ "conversation_contexts": {}, "conversations": [], "conversation_id": "", }) js_load_output = gr.Textbox(visible=False, elem_id="js-load-output") js_save_input = gr.Textbox(visible=False, elem_id="js-save-input") with gr.Row(elem_id="main-row"): with gr.Sidebar(open=False): new_chat_btn = gr.Button( value="New Conversation", variant="primary", ) conv_choice = gr.Radio( choices=[], label=None, interactive=True, elem_id="conversations-radio", ) delete_btn = gr.Button( value="Delete Selected", variant="stop", visible=False ) with gr.Column(elem_id="chat-column"): # Landing page shown when chat is empty with added hyperlinks landing_page = gr.HTML( value="""

Made with ❤️ by KingNish and Himanshu

""", elem_id="landing-page-container", ) chatbot = gr.Chatbot( elem_id="chatbot", show_label=False, buttons=[], layout="bubble", autoscroll=True ) with gr.Row(elem_id="input-row"): msg = gr.Textbox( placeholder="Enter your message here...", show_label=False, scale=4, container=False, max_lines=10, ) send_btn = gr.Button( "Send", variant="primary", scale=0, min_width=40, elem_id="send-btn", ) stop_btn = gr.Button( "Stop", variant="stop", scale=0, min_width=40, visible=False, elem_id="stop-btn", ) new_chat_btn.click( fn=GradioEvents.new_chat, inputs=[state], outputs=[conv_choice, chatbot, state], ) conv_choice.change( fn=GradioEvents.select_conversation, inputs=[conv_choice, state], outputs=[chatbot, state], ) delete_btn.click( fn=GradioEvents.delete_selected_conversation, inputs=[conv_choice, state], outputs=[conv_choice, chatbot, state], ) submit_event = send_btn.click( fn=GradioEvents.stream_response, inputs=[msg, state], outputs=[msg, chatbot, state, conv_choice, send_btn, stop_btn], ) msg.submit( fn=GradioEvents.stream_response, inputs=[msg, state], outputs=[msg, chatbot, state, conv_choice, send_btn, stop_btn], ) stop_btn.click( fn=GradioEvents.cancel_stream, inputs=[state], outputs=[chatbot, state, send_btn, stop_btn], cancels=[submit_event], ) state.change( fn=GradioEvents.prepare_save, inputs=[state], outputs=[js_save_input], ).then( fn=lambda x: x, inputs=[js_save_input], outputs=[js_save_input], js=JS_SAVE_STATE, ) demo.load( fn=lambda: None, inputs=None, outputs=None, js=LANDING_PAGE_SCRIPT, ).then( fn=lambda x: x, inputs=[js_load_output], outputs=[js_load_output], js=JS_LOAD_STATE, ).then( fn=GradioEvents.load_from_js, inputs=[js_load_output, state], outputs=[conv_choice, state], ) theme = gr.themes.Base(radius_size="none") if __name__ == "__main__": demo.queue(default_concurrency_limit=100, max_size=100).launch(ssr_mode=False, max_threads=100, css_paths="app.css", theme=theme)