Spaces:
Running
Running
| from flask import Flask, request, Response | |
| from flask_apscheduler import APScheduler | |
| from litemapy import Schematic | |
| import os | |
| import json | |
| import zlib | |
| import base64 | |
| app = Flask(__name__) | |
| scheduler = APScheduler() | |
| scheduler.init_app(app) | |
| scheduler.start() | |
| LITEMATIC_DIR = os.path.join(os.path.dirname(__file__), "litematics") | |
| default_chunk_size = 20000 | |
| default_chunk_block_size = 3000 | |
| def check_litematics(): | |
| directory = LITEMATIC_DIR | |
| litematics = [ | |
| item for item in os.listdir(directory) | |
| if os.path.isfile(os.path.join(directory, item)) and item.endswith('.litematic') | |
| ] | |
| for i in litematics: | |
| itxt = i.replace(".litematic", ".txt") | |
| txt_path = os.path.join(LITEMATIC_DIR, itxt) | |
| litematic_path = os.path.join(LITEMATIC_DIR, i) | |
| if (not os.path.exists(txt_path) or not open(txt_path, 'r', encoding='utf-8').read().strip()) \ | |
| and os.path.exists(litematic_path): | |
| dict_blocks = convert_litematic(litematic_path, default_chunk_block_size) | |
| chunks = chunking(dict_blocks) | |
| comp_chunks = compress(chunks) | |
| with open(txt_path, 'w', encoding='utf-8') as f: | |
| f.write("\n".join(str(x) for x in comp_chunks)) | |
| scheduler.add_job(id='check_litematics', func=check_litematics, trigger='interval', seconds=5) | |
| def chunking(orig_dict, chunk_size=default_chunk_size): | |
| blocks_dict = orig_dict.copy() | |
| result = [] | |
| while blocks_dict: | |
| keys_to_delete = [] | |
| json_dict = {} | |
| for k, v in blocks_dict.items(): | |
| json_dict[k] = [] | |
| while v: | |
| if k not in json_dict: | |
| json_dict[k] = [] | |
| sublist_list = v[-1] | |
| if not sublist_list: | |
| v.pop() | |
| continue | |
| sublists = json_dict[k] | |
| while sublist_list: | |
| sublist = sublist_list[-1] | |
| json_dict[k] = sublists+[sublist] | |
| json_text = json.dumps(json_dict, separators=(",", ":")) | |
| if len(json_text) > chunk_size: | |
| if sublists: | |
| json_dict[k] = sublists | |
| else: | |
| json_dict.pop(k) | |
| json_text = json.dumps(json_dict, separators=(",", ":")) | |
| result.append(json_text) | |
| json_dict = {} | |
| break | |
| else: | |
| sublist_list.pop() | |
| sublists.append(sublist) | |
| blocks_dict[k] = v | |
| if not v: | |
| keys_to_delete.append(k) | |
| for k in keys_to_delete: | |
| blocks_dict.pop(k) | |
| if json_dict: | |
| json_text = json.dumps(json_dict, separators=(",", ":")) | |
| result.append(json_text) | |
| return result | |
| def convert_litematic(file, chunk_block_size=default_chunk_block_size): | |
| try: | |
| schem = Schematic.load(file) | |
| dict_blocks = process_regions(schem, chunk_block_size) | |
| if type(dict_blocks).__name__ == "dict": | |
| if ("error", "status") in dict_blocks.keys(): | |
| raise Exception({"error": str(dict_blocks["error"]), "status": int(dict_blocks["status"])}) | |
| dict_blocks.pop("minecraft:air", None) | |
| dict_blocks.pop("minecraft:cave_air", None) | |
| dict_blocks.pop("minecraft:void_air", None) | |
| return dict_blocks | |
| else: | |
| return False | |
| except Exception as e: | |
| return {"error": str(e), "status": 400} | |
| def process_regions(schem, chunk_block_size=default_chunk_block_size): | |
| try: | |
| result = {} | |
| for region in schem.regions.values(): | |
| offset = getattr(region, "position", None) or getattr(region, "origin", None) or [0, 0, 0] | |
| for x, y, z in region.block_positions(): | |
| block = region[x, y, z] | |
| nbt = block.to_nbt().get("Properties", {}) | |
| props = [f"{k}={v}" for k, v in nbt.items() if v != "none"] | |
| key = block.id + (f"[{','.join(props)}]" if props else "") | |
| coords = [x + offset[0], y + offset[1], z + offset[2]] | |
| result.setdefault(key, [[]]) | |
| if len(result[key][-1]) >= chunk_block_size: | |
| result[key].append([]) | |
| result[key][-1].append(coords) | |
| return result | |
| except Exception as e: | |
| return {"error": str(e), "status": 400} | |
| def compress(chunks): | |
| result = [] | |
| for i in chunks: | |
| to_bytes = str(i).encode("utf-8") | |
| to_zlib = zlib.compress(to_bytes, 9) | |
| to_base64 = base64.b64encode(to_zlib).decode("utf-8") | |
| result.append(to_base64) | |
| return result | |
| def litematic(): | |
| try: | |
| data = request.get_json() | |
| litematic = data.get("litematic") | |
| player = data.get("player") | |
| stream = data.get("stream", 0) | |
| start_chunk = data.get("start_chunk", 0) | |
| txt = LITEMATIC_DIR + f'/{litematic}.txt' | |
| litematic_file = LITEMATIC_DIR + f'/{litematic}.litematic' | |
| if os.path.exists(litematic_file): | |
| if os.path.exists(txt): | |
| with open(txt, 'r', encoding="utf-8") as f: | |
| content = f.readlines() | |
| else: | |
| return Response(json.dumps({"error": "Txt not exist. Try Later."}), status=400, mimetype="application/json") | |
| else: | |
| return Response(json.dumps({"error": "Litematic not exist."}), status=400, mimetype="application/json") | |
| return Response(json.dumps({"count_chunks": len(content), "litematic": litematic, "player": player, "start_chunk": start_chunk, "stream": stream}), status=200, mimetype="application/json") | |
| except Exception as e: | |
| return Response(json.dumps({"error": str(e)}), status=400, mimetype="application/json") | |
| def status(): | |
| try: | |
| litematics = [item for item in os.listdir(LITEMATIC_DIR) | |
| if os.path.isfile(os.path.join(LITEMATIC_DIR, item)) and item.endswith('.litematic')] | |
| txts = [item for item in os.listdir(LITEMATIC_DIR) | |
| if os.path.isfile(os.path.join(LITEMATIC_DIR, item)) and item.endswith('.txt')] | |
| ready_litematics = {} | |
| for i, v in enumerate(litematics): | |
| stat = 0 | |
| if v.replace(".litematic", ".txt") in txts: | |
| stat = 1 | |
| try: | |
| with open(f"{LITEMATIC_DIR}/{v.replace(".litematic", ".txt")}", "r", encoding="utf-8") as f: | |
| ready_litematics[v.replace(".litematic", "")] = [stat, len(f.readlines())] | |
| except Exception as e: | |
| continue | |
| return Response(json.dumps({"l": ready_litematics})) | |
| except Exception as e: | |
| return Response(json.dumps({"error": str(e)}), status=400, mimetype="application/json") | |
| def chunk(): | |
| try: | |
| data = request.get_json() | |
| index_chunk = data.get("index_chunk") | |
| litematic = data.get("litematic") | |
| player = data.get("player") | |
| stream = data.get("stream") | |
| # {"index_chunk": 0, "litematic": "gradient_cube", "player": "%player%"} | |
| txt = LITEMATIC_DIR + f'/{litematic}.txt' | |
| litematic_file = LITEMATIC_DIR + f'/{litematic}.litematic' | |
| if os.path.exists(litematic_file): | |
| if os.path.exists(txt): | |
| with open(txt, 'r', encoding="utf-8") as f: | |
| content = f.readlines() | |
| else: | |
| return Response(json.dumps({"error": "Txt not exist. Try Later."}), status=400, mimetype="application/json") | |
| else: | |
| return Response(json.dumps({"error": "Litematic not exist."}), status=400, mimetype="application/json") | |
| return Response(json.dumps({"chunk": content[index_chunk], "litematic": litematic, "index": index_chunk, "player": player, "stream": stream}, separators=(",", ":")), status=200, mimetype="application/json") | |
| except Exception as e: | |
| return Response(json.dumps({"error": str(e)}), status=400, mimetype="application/json") | |
| if __name__ == "__main__": | |
| app.run(host="0.0.0.0", port=7860) | |