import asyncio
import sys

# NOTE(review): presumably works around an event-loop issue on Linux hosts —
# confirm. Kept ahead of the remaining imports, as in the original ordering.
if sys.platform.startswith("linux"):
    asyncio.set_event_loop_policy(asyncio.DefaultEventLoopPolicy())

import os

import gradio as gr
import numpy as np
from PIL import Image
from huggingface_hub import InferenceClient
from moviepy.video.io.ImageSequenceClip import ImageSequenceClip

from settings import EVENTS
from player import Player
from utils import PROMPT, INPUT_PROMPT, extract_after_final_answer

# Chat model requested from the Hugging Face Hub on every turn. The
# InferenceClient itself is created per turn inside play_turn(), using the
# token chosen for that request.
LLM_MODEL = "Qwen/Qwen2.5-7B-Instruct"

# (width, height) every recap-video frame is resized to by default.
target_size = (1024, 1024)


def compile_game_video(player, output_path="game_summary.mp4", fps=0.5, size=None):
    """Compile the player's accumulated images into a video file.

    Args:
        player: Object whose ``get_images()`` returns the image paths to use.
        output_path: Where the encoded video is written.
        fps: Frames per second; 0.5 means each image is shown for 2 seconds.
        size: ``(width, height)`` each frame is resized to. ``None`` (the
            default) uses the module-level ``target_size``.

    Returns:
        ``output_path`` on success; ``None`` when there are no images, none of
        the paths is usable, or compilation fails (the error is printed).
    """
    image_paths = player.get_images()
    if not image_paths:
        return None

    frame_size = target_size if size is None else size
    frames = []

    # Best-effort: any failure while loading or encoding is reported and
    # turned into a None result instead of breaking the game screen.
    try:
        # Newer Pillow releases expose the filters under Image.Resampling;
        # fall back to the module-level constant otherwise.
        resample = Image.Resampling.LANCZOS if hasattr(Image, 'Resampling') else Image.LANCZOS

        for img_path in image_paths:
            if not (isinstance(img_path, str) and os.path.exists(img_path)):
                print(f"āš ļø Warning: Skipping missing path: {img_path}")
                continue

            with Image.open(img_path) as img:
                # Force RGB (drops a PNG alpha channel) so all frames share
                # the same channel layout.
                img = img.convert("RGB")
                # Uniform frame size prevents a MoviePy size-mismatch crash.
                if img.size != frame_size:
                    img = img.resize(frame_size, resample)
                # Materialize the pixels while the file is still open.
                frames.append(np.array(img))

        if not frames:
            print("āŒ Error: No valid image arrays collected.")
            return None

        print(f"šŸŽ¬ Compiling {len(frames)} arrays directly from RAM...")
        # ImageSequenceClip accepts a list of NumPy arrays directly.
        clip = ImageSequenceClip(frames, fps=fps)
        clip.write_videofile(output_path, codec="libx264", logger=None)
        return output_path
    except Exception as e:
        print(f"Video compilation failed: {e}")
        return None


def start_game():
    """Create a fresh game state and the UI values for the first event.

    Returns:
        A 10-tuple in the order of the ``outputs`` wired to the start button:
        story markdown, HP, MP, coin, gallery images, input-panel update,
        video update (hidden and cleared), location, player, current event.
    """
    initial_loc = 0
    initial_player = Player('player')
    initial_event = EVENTS[initial_loc]
    initial_player.append_file(initial_event.filelist)

    initial_story = f"### {initial_event.get_title()}\n\n{initial_event.get_description()}"
    status = initial_player.get_attribute()

    return (
        initial_story,                         # game_screen_ui
        status['hp'],                          # hp_ui
        status['mp'],                          # mp_ui
        status['coin'],                        # coin_ui
        initial_event.get_images(),            # game_gallery_ui
        gr.update(visible=True),               # input_panel: reveal the action row
        gr.update(visible=False, value=None),  # video_summary_ui: hide and clear
        initial_loc,                           # state_loc
        initial_player,                        # state_player
        initial_event,                         # state_cur
    )


def _turn_outputs(story, stats, images, input_update, video_update, loc, player, cur):
    """Pack one UI update in the order of the ``outputs`` wired to play_turn."""
    return (
        story,
        stats['hp'],
        stats['mp'],
        stats['coin'],
        images,
        input_update,
        video_update,
        loc,
        player,
        cur,
    )


def play_turn(human_response, loc, player, cur, custom_token, request: gr.Request):
    """Run one turn of the game loop and stream the resulting UI updates.

    This function is a generator: every UI update is yielded. (A value passed
    to ``return`` inside a generator is never delivered as an output, which is
    why the validation messages are yielded as well.)

    Args:
        human_response: Text the user typed for this turn.
        loc: Current location index.
        player: Current player object.
        cur: Current event object.
        custom_token: Token pasted by the user; may be empty.
        request: Request object supplied by Gradio; its ``token`` attribute,
            when present, is used as a fallback credential.

    Yields:
        10-tuples matching the ``outputs`` of the turn events: story markdown,
        HP, MP, coin, gallery images, action-textbox update, video update,
        location, player, current event.
    """
    status = player.get_attribute()

    if not human_response or not human_response.strip():
        yield _turn_outputs(
            "Please type an action to proceed!",
            status, cur.get_images(), gr.update(), gr.update(), loc, player, cur,
        )
        return

    # Prefer the pasted token. NOTE(review): the request object may not expose
    # a ``token`` attribute at all, so read it defensively — confirm how the
    # OAuth token is meant to be provided.
    chosen_token = (custom_token or "").strip() or getattr(request, "token", None)
    if not chosen_token:
        yield _turn_outputs(
            "āŒ **Authentication Required:** Please enter a token above or use OAuth.",
            status, cur.get_images(), gr.update(), gr.update(), loc, player, cur,
        )
        return

    # 1. Ask the model to interpret the player's action.
    full_msg = PROMPT + INPUT_PROMPT.format(human_response)
    messages = [{"role": "user", "content": full_msg}]
    try:
        client = InferenceClient(api_key=chosen_token)
        response = client.chat_completion(
            model=LLM_MODEL,
            messages=messages,
            max_tokens=500,
            temperature=0.7,
        )
    except Exception as e:
        yield _turn_outputs(
            f"Error calling API: {str(e)}",
            status, cur.get_images(), gr.update(), gr.update(), loc, player, cur,
        )
        return

    processed_response = extract_after_final_answer(response.choices[0].message.content)

    # 2. Let the current event decide the next location (it receives the
    #    player as well), then re-read the attributes.
    loc = cur._next(processed_response, player)
    attribute = player.get_attribute()

    # 3. Running out of HP forces the event stored at index -1.
    if attribute['hp'] <= 0:
        loc = -1

    # Resolve the event for the final location exactly once, so both the
    # ending screens and the normal path display the event `loc` points to.
    cur = EVENTS[loc]
    title = cur.get_title()
    description = cur.get_description()
    next_image = cur.get_images()
    player.append_file(cur.filelist)

    # 4. Normal path: show the next event and clear the action textbox.
    if loc != -1 and loc < 10:
        yield _turn_outputs(
            f"### {title}\n\n{description}",
            attribute, next_image, gr.update(value=""), gr.update(), loc, player, cur,
        )
        return

    # 5. Ending path (loc == -1 or loc >= 10): show the closing text right
    #    away, then compile the recap video and reveal it once it exists.
    final_story = f"### šŸ {title}\n\n{description}\n\n**The Journey Ends Here.**"
    yield _turn_outputs(
        final_story,
        attribute, next_image, gr.update(visible=False), gr.update(), loc, player, cur,
    )

    video_file = compile_game_video(player)
    if video_file:
        final_story += "\n\nšŸŽžļø **Your adventure video is ready below!**"
        yield _turn_outputs(
            final_story,
            attribute, next_image, gr.update(visible=False),
            gr.update(visible=True, value=video_file), loc, player, cur,
        )


def _restore_action_box():
    """Re-show and clear the action textbox, which a finished game hides."""
    return gr.update(visible=True, value="")


# --------------------------------------------------------------------------
# LAYOUT DESIGN WITH INTEGRATED GALLERY BUILDER
# --------------------------------------------------------------------------
with gr.Blocks() as demo:
    gr.Markdown("# āš”ļø RPG Sandbox")
    gr.Markdown("## You can find your adventure recap below.")

    with gr.Accordion("šŸ”‘ Authentication Options", open=False):
        with gr.Row():
            user_token_input = gr.Textbox(label="Paste your HF Token (hf_...)", type="password")

    # Per-session game state, filled in by start_game().
    state_loc = gr.State()
    state_player = gr.State()
    state_cur = gr.State()

    with gr.Row():
        with gr.Column(scale=2):
            game_screen_ui = gr.Markdown(value="### Setup authentication, then click below to wake up.")

            gr.Markdown("### šŸ“œ Status Panel")
            hp_ui = gr.Textbox(label="HP", interactive=False, value="0")
            mp_ui = gr.Textbox(label="Mana Points (MP)", interactive=False, value="0")
            coin_ui = gr.Textbox(label="Coin", interactive=False, value="0")

            # Hidden until start_game() reveals it.
            with gr.Row(visible=False) as input_panel:
                user_input = gr.Textbox(label="Your Turn Action", placeholder="Type action and hit Enter...", scale=4)
                submit_btn = gr.Button("Execute Action", variant="primary", scale=1)

            start_btn = gr.Button("šŸš€ Start / Reset Game", variant="secondary")

        with gr.Column(scale=3):
            gr.Markdown("### šŸ–¼ļø Story Scene")
            # A gallery (rather than a single image) so an event can show
            # several pictures at once.
            game_gallery_ui = gr.Gallery(
                label="Story Plot",
                columns=[2],
                rows=[2],
                object_fit="contain",
                height=800,
                interactive=False
            )
            # Recap video; hidden until play_turn() reveals it at game end.
            video_summary_ui = gr.Video(label="šŸŽ¬ Adventure Recap", visible=False, autoplay=True)

    # start_game() only toggles the surrounding row, while a finished game
    # hides the textbox itself; the chained step restores the textbox so a
    # reset game is playable again.
    start_btn.click(
        fn=start_game,
        outputs=[game_screen_ui, hp_ui, mp_ui, coin_ui, game_gallery_ui, input_panel, video_summary_ui, state_loc, state_player, state_cur]
    ).then(
        fn=_restore_action_box,
        outputs=[user_input]
    )

    # Same handler for the button and for pressing Enter in the textbox.
    turn_config = {
        "fn": play_turn,
        "inputs": [user_input, state_loc, state_player, state_cur, user_token_input],
        "outputs": [game_screen_ui, hp_ui, mp_ui, coin_ui, game_gallery_ui, user_input, video_summary_ui, state_loc, state_player, state_cur]
    }
    submit_btn.click(**turn_config)
    user_input.submit(**turn_config)

if __name__ == "__main__":
    demo.launch(ssr_mode=False)