Spaces:
Sleeping
Sleeping
| import asyncio | |
| import sys | |
| if sys.platform.startswith("linux"): | |
| asyncio.set_event_loop_policy(asyncio.DefaultEventLoopPolicy()) | |
| import os | |
| import gradio as gr | |
| import numpy as np | |
| from PIL import Image | |
| from huggingface_hub import InferenceClient | |
| from moviepy.video.io.ImageSequenceClip import ImageSequenceClip | |
| from settings import EVENTS | |
| from player import Player | |
| from utils import PROMPT, INPUT_PROMPT, extract_after_final_answer | |
| # Initialize the HF Inference Client | |
| # It will automatically pick up the HF_TOKEN environment variable if it's set | |
| # Define the models we want to use from the Hugging Face Hub | |
| LLM_MODEL = "Qwen/Qwen2.5-7B-Instruct" | |
| target_size=(1024, 1024) | |
| def compile_game_video(player, output_path="game_summary.mp4", fps=0.5): | |
| """Compiles accumulated images into a video. fps=0.5 means 2 seconds per image.""" | |
| image_paths = player.get_images() | |
| if not image_paths: | |
| return None | |
| image_arrays = [] | |
| try: | |
| for idx, img_path in enumerate(image_paths): | |
| if isinstance(img_path, str) and os.path.exists(img_path): | |
| with Image.open(img_path) as img: | |
| # 1. Force convert to RGB (strips alpha channel if PNG) | |
| img = img.convert("RGB") | |
| # 2. Resize to prevent MoviePy size mismatch crash | |
| if img.size != target_size: | |
| img = img.resize(target_size, Image.Resampling.LANCZOS if hasattr(Image, 'Resampling') else Image.LANCZOS) | |
| # 3. Convert PIL Image directly into a NumPy array and store in RAM | |
| img_array = np.array(img) | |
| image_arrays.append(img_array) | |
| else: | |
| print(f"β οΈ Warning: Skipping missing path: {img_path}") | |
| if not image_arrays: | |
| print("β Error: No valid image arrays collected.") | |
| return None | |
| print(f"π¬ Compiling {len(image_arrays)} arrays directly from RAM...") | |
| # MoviePy natively accepts a list of NumPy arrays! | |
| clip = ImageSequenceClip(image_arrays, fps=fps) | |
| clip.write_videofile(output_path, codec="libx264", logger=None) | |
| return output_path | |
| except Exception as e: | |
| print(f"Video compilation failed: {e}") | |
| return None | |
| def start_game(): | |
| """Initializes or resets the game state when the app loads or restarts.""" | |
| initial_loc = 0 | |
| initial_player = Player('player') # Assuming your Player class initializes fresh attributes | |
| initial_event = EVENTS[initial_loc] | |
| initial_player.append_file(initial_event.filelist) | |
| # Format the very first event presentation | |
| initial_story = f"### {initial_event.get_title()}\n\n{initial_event.get_description()}" | |
| status = initial_player.get_attribute() | |
| # Return initial UI values + initial state values | |
| return ( | |
| initial_story, # updates game_screen_ui | |
| status['hp'], # updates status_ui | |
| status['mp'], | |
| status['coin'], | |
| initial_event.get_images(), | |
| gr.update(visible=True), # ensures user input layout is visible | |
| gr.update(visible=False, value=None), | |
| initial_loc, # sets state_loc | |
| initial_player, # sets state_player | |
| initial_event # sets state_cur | |
| ) | |
| def play_turn(human_response, loc, player, cur, custom_token, request: gr.Request): | |
| """Executes a single turn of the game loop based on user input.""" | |
| status = player.get_attribute() | |
| if not human_response.strip(): | |
| return ( | |
| "Please type an action to proceed!", | |
| status['hp'], status['mp'], status['coin'], | |
| cur.get_images(), | |
| gr.update(), gr.update(), | |
| loc, player, cur | |
| ) | |
| chosen_token = custom_token.strip() if custom_token and custom_token.strip() else (request.token if request else None) | |
| if not chosen_token: | |
| yield ( | |
| "β **Authentication Required:** Please enter a token above or use OAuth.", | |
| status['hp'], status['mp'], status['coin'], | |
| cur.get_images(), | |
| gr.update(), gr.update(), | |
| loc, player, cur | |
| ) | |
| return | |
| # 1. Process AI Model Response | |
| full_msg = PROMPT + INPUT_PROMPT.format(human_response) | |
| messages = [ | |
| { | |
| "role": "user", | |
| "content": full_msg | |
| } | |
| ] | |
| try: | |
| client = InferenceClient( | |
| api_key=chosen_token | |
| ) | |
| response = client.chat_completion( | |
| model=LLM_MODEL, | |
| messages=messages, | |
| max_tokens=500, | |
| temperature=0.7 | |
| ) | |
| except Exception as e: | |
| yield ( | |
| f"Error calling API: {str(e)}", | |
| status['hp'], status['mp'], status['coin'], | |
| cur.get_images(), | |
| gr.update(), gr.update(), | |
| loc, player, cur | |
| ) | |
| return | |
| processed_response = extract_after_final_answer(response.choices[0].message.content) | |
| # 2. Update Game State / Location | |
| loc = cur._next(processed_response, player) | |
| attribute = player.get_attribute() | |
| # 3. Check for Game Over conditions | |
| if attribute['hp'] <= 0: | |
| loc = -1 | |
| cur = EVENTS[loc] | |
| if loc == -1 or loc >= 10: | |
| # Final summary / Game Over screens | |
| end_title = cur.get_title() | |
| end_desc = cur.get_description() | |
| next_image = cur.get_images() | |
| player.append_file(cur.filelist) | |
| final_story = f"### π {end_title}\n\n{end_desc}\n\n**The Journey Ends Here.**" | |
| yield ( | |
| final_story, | |
| attribute['hp'], attribute['mp'], attribute['coin'], | |
| next_image, | |
| gr.update(visible=False), | |
| gr.update(), | |
| loc, player, cur | |
| ) | |
| # Compile video now that game loop is terminating | |
| video_file = compile_game_video(player) | |
| if video_file: | |
| final_story += "\n\nποΈ **Your adventure video is ready below!**" | |
| yield ( | |
| final_story, | |
| attribute['hp'], attribute['mp'], attribute['coin'], | |
| next_image, | |
| gr.update(visible=False), | |
| gr.update(visible=True, value=video_file), | |
| loc, player, cur | |
| ) | |
| return | |
| # 4. Advance to the next event normally | |
| cur = EVENTS[loc] | |
| next_story = f"### {cur.get_title()}\n\n{cur.get_description()}" | |
| next_image = cur.get_images() | |
| player.append_file(cur.filelist) | |
| # Clear out the text box for the next turn while returning updated states | |
| yield ( | |
| next_story, | |
| attribute['hp'], attribute['mp'], attribute['coin'], | |
| next_image, | |
| gr.update(value=""), | |
| gr.update(), | |
| loc, player, cur | |
| ) | |
| # -------------------------------------------------------------------------- | |
| # LAYOUT DESIGN WITH INTEGRATED GALLERY BUILDER | |
| # -------------------------------------------------------------------------- | |
| with gr.Blocks() as demo: | |
| gr.Markdown("# βοΈ RPG Sandbox") | |
| gr.Markdown("## You can find your adventure recap below.") | |
| with gr.Accordion("π Authentication Options", open=False): | |
| with gr.Row(): | |
| user_token_input = gr.Textbox(label="Paste your HF Token (hf_...)", type="password") | |
| state_loc = gr.State() | |
| state_player = gr.State() | |
| state_cur = gr.State() | |
| with gr.Row(): | |
| with gr.Column(scale=2): | |
| game_screen_ui = gr.Markdown(value="### Setup authentication, then click below to wake up.") | |
| gr.Markdown("### π Status Panel") | |
| hp_ui = gr.Textbox(label="HP", interactive=False, value="0") | |
| mp_ui = gr.Textbox(label="Mana Points (MP)", interactive=False, value="0") | |
| coin_ui = gr.Textbox(label="Coin", interactive=False, value="0") | |
| with gr.Row(visible=False) as input_panel: | |
| user_input = gr.Textbox(label="Your Turn Action", placeholder="Type action and hit Enter...", scale=4) | |
| submit_btn = gr.Button("Execute Action", variant="primary", scale=1) | |
| start_btn = gr.Button("π Start / Reset Game", variant="secondary") | |
| with gr.Column(scale=3): | |
| gr.Markdown("### πΌοΈ Story Scene") | |
| # UPGRADED: Gr.Image swapped to gr.Gallery to swallow flexible size limits | |
| game_gallery_ui = gr.Gallery( | |
| label="Story Plot", | |
| columns=[2], # Display up to 2 side-by-side tiles automatically | |
| rows=[2], | |
| object_fit="contain", | |
| height=800, | |
| interactive=False | |
| ) | |
| # Added: Video output component, hidden by default | |
| video_summary_ui = gr.Video(label="π¬ Adventure Recap", visible=False, autoplay=True) | |
| start_btn.click( | |
| fn=start_game, | |
| outputs=[game_screen_ui, hp_ui, mp_ui, coin_ui, game_gallery_ui, input_panel, video_summary_ui, state_loc, state_player, state_cur] | |
| ) | |
| turn_config = { | |
| "fn": play_turn, | |
| "inputs": [user_input, state_loc, state_player, state_cur, user_token_input], | |
| "outputs": [game_screen_ui, hp_ui, mp_ui, coin_ui, game_gallery_ui, user_input, video_summary_ui, state_loc, state_player, state_cur] | |
| } | |
| submit_btn.click(**turn_config) | |
| user_input.submit(**turn_config) | |
| if __name__ == "__main__": | |
| demo.launch(ssr_mode=False) |