#!/usr/bin/env python3 import requests import time import random import json import sys from typing import Dict, List, Optional class RandomAgentClient: def __init__(self, server_url: str = "http://localhost:8001", player_name: str = "RandomBot"): self.server_url = server_url.rstrip('/') self.player_name = player_name self.player_id: Optional[int] = None self.game_active = False def join_game(self) -> bool: response = requests.post( f"{self.server_url}/join", json={"name": self.player_name}, timeout=5 ) if response.status_code == 200: data = response.json() self.player_id = data["player_id"] print(f" {self.player_name} 成功加入游戏,玩家ID: {self.player_id}") return True else: print(f" 加入游戏失败: {response.status_code} - {response.text}") return False def get_game_info(self) -> Optional[Dict]: if self.player_id is None: return None try: response = requests.get( f"{self.server_url}/info/{self.player_id}", timeout=5 ) if response.status_code == 200: return response.json() else: print(f" 获取游戏信息失败: {response.status_code}") return None except requests.RequestException as e: print(f" 获取游戏信息时连接失败: {e}") return None def get_valid_actions(self) -> List[str]: if self.player_id is None: return [] try: response = requests.get( f"{self.server_url}/valid_actions/{self.player_id}", timeout=5 ) if response.status_code == 200: data = response.json() return data.get("valid_actions", []) else: return [] except requests.RequestException: return [] def make_random_action(self, valid_actions, game_info) -> bool: if not valid_actions: print(" 没有可用动作") return False # 随机选择一个动作, 弃牌率高, todo action = random.choice(valid_actions) amount = 10 if action in ["raise", "bet"]: player_stack = game_info.get("stacks", [1000])[self.player_id] if self.player_id < len(game_info.get("stacks", [])) else 1000 min_bet = max(game_info.get("current_pot", [0])) if game_info.get("current_pot") else 10 max_bet = min(player_stack, player_stack // 2) # 最多下注一半筹码 if max_bet > min_bet: amount = random.randint(min_bet, max_bet) else: amount = min_bet payload = { "player_id": self.player_id, "action": action, "amount": amount } response = requests.post( f"{self.server_url}/apply_action", json=payload, timeout=5 ) if response.status_code == 200: action_str = f"{action}" + (f" {amount}" if amount else "") print(f" {self.player_name} 执行动作: {action_str}") return True else: print(f" 动作执行失败: {response.status_code} - {response.text}") return False def is_my_turn(self, game_info: Dict) -> bool: if not game_info or self.player_id is None: return False current_turn = game_info.get("current_turn", -1) return current_turn == self.player_id def display_game_state(self, game_info: Dict): if not game_info: return print(f"\n游戏状态:") print(f" 阶段: {game_info.get('stage', 'unknown')}") print(f" 总奖池: {game_info.get('total_pot', 0)}") print(f" 当前轮到: 玩家 {game_info.get('current_turn', -1)}") print(f" 我的ID: {self.player_id}") print(f" 我的手牌: {game_info.get('player_cards', [])}") print(f" 公共牌: {game_info.get('board_cards', [])}") stacks = game_info.get('stacks', []) if stacks and self.player_id is not None and self.player_id < len(stacks): print(f" 我的筹码: {stacks[self.player_id]}") def wait_for_players(self) -> bool: print(" 等待其他玩家加入游戏...") for _ in range(60): #60S game_info = self.get_game_info() if game_info: players = game_info.get("players", []) if len(players) >= 2: print(f" 游戏开始!玩家列表: {players}") return True time.sleep(1) print(" 等待超时,未能开始游戏......") return False def game_loop(self): print(f" {self.player_name} 开始游戏循环......") consecutive_errors = 0 while self.game_active and consecutive_errors < 5: try: game_info = self.get_game_info() if not game_info: consecutive_errors += 1 time.sleep(2) continue consecutive_errors = 0 self.display_game_state(game_info) if self.is_my_turn(game_info): print(f" 轮到 {self.player_name} 行动...") valid_actions = self.get_valid_actions() if valid_actions: print(f"可用动作: {valid_actions}") self.make_random_action(valid_actions, game_info) else: print(" 没有可用动作") stage = game_info.get("stage", "") if stage in ["game_over", "showdown", "finished"]: print(" 游戏结束") self.show_final_results(game_info) time.sleep(3) print(" 准备下一轮游戏...") continue time.sleep(2) except Exception as e: print(f" 游戏循环出错: {e}") consecutive_errors += 1 time.sleep(2) if consecutive_errors >= 5: print(" 连续错误过多,退出游戏") def show_final_results(self, game_info): print("\n 游戏结果:") # todo response = requests.get(f"{self.server_url}/showdown", timeout=5) if response.status_code == 200: showdown_data = response.json() print(json.dumps(showdown_data, indent=2)) def run(self): print(f" 启动 RandomAgent 客户端: {self.player_name}") print(f" 连接服务器: {self.server_url}") if not self.join_game(): return False if not self.wait_for_players(): return False self.game_active = True self.game_loop() return True def main(): import argparse parser = argparse.ArgumentParser(description="RandomAgent") parser.add_argument("--server", default="http://localhost:8001") parser.add_argument("--name", default="RandomBot") parser.add_argument("--seed", type=int) args = parser.parse_args() if args.seed: random.seed(args.seed) print(f" 设置随机种子: {args.seed}") client = RandomAgentClient(args.server, args.name) success = client.run() sys.exit(0 if success else 1) if __name__ == "__main__": main()