243 lines
7.9 KiB
Python
243 lines
7.9 KiB
Python
#!/usr/bin/env python3
|
||
import requests
|
||
import time
|
||
import random
|
||
import json
|
||
import sys
|
||
from typing import Dict, List, Optional
|
||
|
||
class RandomAgentClient:
|
||
def __init__(self, server_url: str = "http://localhost:8001", player_name: str = "RandomBot"):
|
||
self.server_url = server_url.rstrip('/')
|
||
self.player_name = player_name
|
||
self.player_id: Optional[int] = None
|
||
self.game_active = False
|
||
|
||
def join_game(self) -> bool:
|
||
response = requests.post(
|
||
f"{self.server_url}/join",
|
||
json={"name": self.player_name},
|
||
timeout=5
|
||
)
|
||
|
||
if response.status_code == 200:
|
||
data = response.json()
|
||
self.player_id = data["player_id"]
|
||
print(f" {self.player_name} 成功加入游戏,玩家ID: {self.player_id}")
|
||
return True
|
||
else:
|
||
print(f" 加入游戏失败: {response.status_code} - {response.text}")
|
||
return False
|
||
|
||
|
||
def get_game_info(self) -> Optional[Dict]:
|
||
if self.player_id is None:
|
||
return None
|
||
|
||
try:
|
||
response = requests.get(
|
||
f"{self.server_url}/info/{self.player_id}",
|
||
timeout=5
|
||
)
|
||
|
||
if response.status_code == 200:
|
||
return response.json()
|
||
else:
|
||
print(f" 获取游戏信息失败: {response.status_code}")
|
||
return None
|
||
|
||
except requests.RequestException as e:
|
||
print(f" 获取游戏信息时连接失败: {e}")
|
||
return None
|
||
|
||
def get_valid_actions(self) -> List[str]:
|
||
if self.player_id is None:
|
||
return []
|
||
|
||
try:
|
||
response = requests.get(
|
||
f"{self.server_url}/valid_actions/{self.player_id}",
|
||
timeout=5
|
||
)
|
||
|
||
if response.status_code == 200:
|
||
data = response.json()
|
||
return data.get("valid_actions", [])
|
||
else:
|
||
return []
|
||
|
||
except requests.RequestException:
|
||
return []
|
||
|
||
def make_random_action(self, valid_actions, game_info) -> bool:
|
||
if not valid_actions:
|
||
print(" 没有可用动作")
|
||
return False
|
||
|
||
# 随机选择一个动作, 弃牌率高, todo
|
||
action = random.choice(valid_actions)
|
||
amount = 10
|
||
|
||
if action in ["raise", "bet"]:
|
||
player_stack = game_info.get("stacks", [1000])[self.player_id] if self.player_id < len(game_info.get("stacks", [])) else 1000
|
||
min_bet = max(game_info.get("current_pot", [0])) if game_info.get("current_pot") else 10
|
||
max_bet = min(player_stack, player_stack // 2) # 最多下注一半筹码
|
||
|
||
if max_bet > min_bet:
|
||
amount = random.randint(min_bet, max_bet)
|
||
else:
|
||
amount = min_bet
|
||
|
||
|
||
payload = {
|
||
"player_id": self.player_id,
|
||
"action": action,
|
||
"amount": amount
|
||
}
|
||
|
||
response = requests.post(
|
||
f"{self.server_url}/apply_action",
|
||
json=payload,
|
||
timeout=5
|
||
)
|
||
|
||
if response.status_code == 200:
|
||
action_str = f"{action}" + (f" {amount}" if amount else "")
|
||
print(f" {self.player_name} 执行动作: {action_str}")
|
||
return True
|
||
else:
|
||
print(f" 动作执行失败: {response.status_code} - {response.text}")
|
||
return False
|
||
|
||
|
||
|
||
def is_my_turn(self, game_info: Dict) -> bool:
|
||
if not game_info or self.player_id is None:
|
||
return False
|
||
|
||
current_turn = game_info.get("current_turn", -1)
|
||
|
||
return current_turn == self.player_id
|
||
|
||
def display_game_state(self, game_info: Dict):
|
||
if not game_info:
|
||
return
|
||
|
||
print(f"\n游戏状态:")
|
||
print(f" 阶段: {game_info.get('stage', 'unknown')}")
|
||
print(f" 总奖池: {game_info.get('total_pot', 0)}")
|
||
print(f" 当前轮到: 玩家 {game_info.get('current_turn', -1)}")
|
||
print(f" 我的ID: {self.player_id}")
|
||
print(f" 我的手牌: {game_info.get('player_cards', [])}")
|
||
print(f" 公共牌: {game_info.get('board_cards', [])}")
|
||
|
||
stacks = game_info.get('stacks', [])
|
||
if stacks and self.player_id is not None and self.player_id < len(stacks):
|
||
print(f" 我的筹码: {stacks[self.player_id]}")
|
||
|
||
def wait_for_players(self) -> bool:
|
||
print(" 等待其他玩家加入游戏...")
|
||
|
||
for _ in range(60): #60S
|
||
game_info = self.get_game_info()
|
||
if game_info:
|
||
players = game_info.get("players", [])
|
||
if len(players) >= 2:
|
||
print(f" 游戏开始!玩家列表: {players}")
|
||
return True
|
||
|
||
time.sleep(1)
|
||
|
||
print(" 等待超时,未能开始游戏......")
|
||
return False
|
||
|
||
def game_loop(self):
|
||
print(f" {self.player_name} 开始游戏循环......")
|
||
|
||
consecutive_errors = 0
|
||
|
||
while self.game_active and consecutive_errors < 5:
|
||
try:
|
||
game_info = self.get_game_info()
|
||
if not game_info:
|
||
consecutive_errors += 1
|
||
time.sleep(2)
|
||
continue
|
||
|
||
consecutive_errors = 0
|
||
|
||
self.display_game_state(game_info)
|
||
|
||
if self.is_my_turn(game_info):
|
||
print(f" 轮到 {self.player_name} 行动...")
|
||
|
||
valid_actions = self.get_valid_actions()
|
||
if valid_actions:
|
||
print(f"可用动作: {valid_actions}")
|
||
self.make_random_action(valid_actions, game_info)
|
||
else:
|
||
print(" 没有可用动作")
|
||
|
||
stage = game_info.get("stage", "")
|
||
if stage in ["game_over", "showdown", "finished"]:
|
||
print(" 游戏结束")
|
||
self.show_final_results(game_info)
|
||
time.sleep(3)
|
||
print(" 准备下一轮游戏...")
|
||
continue
|
||
|
||
time.sleep(2)
|
||
except Exception as e:
|
||
print(f" 游戏循环出错: {e}")
|
||
consecutive_errors += 1
|
||
time.sleep(2)
|
||
|
||
if consecutive_errors >= 5:
|
||
print(" 连续错误过多,退出游戏")
|
||
|
||
def show_final_results(self, game_info):
|
||
print("\n 游戏结果:")
|
||
# todo
|
||
response = requests.get(f"{self.server_url}/showdown", timeout=5)
|
||
if response.status_code == 200:
|
||
showdown_data = response.json()
|
||
print(json.dumps(showdown_data, indent=2))
|
||
|
||
|
||
def run(self):
|
||
print(f" 启动 RandomAgent 客户端: {self.player_name}")
|
||
print(f" 连接服务器: {self.server_url}")
|
||
|
||
if not self.join_game():
|
||
return False
|
||
|
||
if not self.wait_for_players():
|
||
return False
|
||
|
||
self.game_active = True
|
||
self.game_loop()
|
||
|
||
return True
|
||
|
||
|
||
def main():
|
||
import argparse
|
||
|
||
parser = argparse.ArgumentParser(description="RandomAgent")
|
||
parser.add_argument("--server", default="http://localhost:8001")
|
||
parser.add_argument("--name", default="RandomBot")
|
||
parser.add_argument("--seed", type=int)
|
||
|
||
args = parser.parse_args()
|
||
|
||
if args.seed:
|
||
random.seed(args.seed)
|
||
print(f" 设置随机种子: {args.seed}")
|
||
|
||
client = RandomAgentClient(args.server, args.name)
|
||
|
||
success = client.run()
|
||
sys.exit(0 if success else 1)
|
||
|
||
if __name__ == "__main__":
|
||
main() |