Files
shortdeck/client2_random_agent.py
2025-10-11 18:24:24 +08:00

243 lines
7.9 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
import requests
import time
import random
import json
import sys
from typing import Dict, List, Optional
class RandomAgentClient:
    """HTTP client that plays a card game by submitting random valid actions.

    The client talks to a game server through five endpoints: ``/join``,
    ``/info/<player_id>``, ``/valid_actions/<player_id>``, ``/apply_action``
    and ``/showdown``.  It joins, waits for a second player, then polls the
    game state and submits a randomly chosen action whenever the server
    reports that it is this player's turn.

    Attributes:
        server_url: Base URL of the server, without a trailing slash.
        player_name: Name sent to the server when joining.
        player_id: Seat id assigned by the server, or ``None`` before joining.
        game_active: While true, ``game_loop`` keeps polling.
    """

    # Seconds to wait for any single HTTP request before giving up.
    REQUEST_TIMEOUT = 5
    # Amount used when the server gives no usable bet information.
    DEFAULT_BET = 10
    # Stack assumed when the server does not report this player's stack.
    DEFAULT_STACK = 1000
    # Seconds between polls of the game state in the main loop.
    POLL_INTERVAL = 2
    # Number of one-second polls to wait for a second player to join.
    JOIN_WAIT_ATTEMPTS = 60
    # Consecutive failures after which the game loop gives up.
    MAX_CONSECUTIVE_ERRORS = 5
    # Stage names the loop treats as "the hand is over".
    FINAL_STAGES = ("game_over", "showdown", "finished")

    def __init__(self, server_url: str = "http://localhost:8001", player_name: str = "RandomBot"):
        """Store connection settings; no network traffic happens here."""
        self.server_url = server_url.rstrip('/')
        self.player_name = player_name
        self.player_id: Optional[int] = None
        self.game_active = False

    def join_game(self) -> bool:
        """Register with the server and remember the assigned player id.

        Returns:
            True if the server answered 200 with a ``player_id``; False on a
            connection failure, a non-200 status, or a malformed response.
        """
        try:
            response = requests.post(
                f"{self.server_url}/join",
                json={"name": self.player_name},
                timeout=self.REQUEST_TIMEOUT,
            )
        except requests.RequestException as e:
            # Same policy as get_game_info: report and fail, do not crash.
            print(f" 加入游戏时连接失败: {e}")
            return False

        if response.status_code != 200:
            print(f" 加入游戏失败: {response.status_code} - {response.text}")
            return False

        try:
            self.player_id = response.json()["player_id"]
        except (ValueError, KeyError, TypeError) as e:
            # Body was not JSON, or was JSON without a "player_id" field.
            print(f" 加入游戏失败: 服务器响应无效 ({e})")
            return False

        print(f" {self.player_name} 成功加入游戏玩家ID: {self.player_id}")
        return True

    def get_game_info(self) -> Optional[Dict]:
        """Fetch the game state as seen by this player.

        Returns:
            The decoded JSON body, or None if the client has not joined yet,
            the request failed, or the server did not answer 200.
        """
        if self.player_id is None:
            return None
        try:
            response = requests.get(
                f"{self.server_url}/info/{self.player_id}",
                timeout=self.REQUEST_TIMEOUT,
            )
            if response.status_code == 200:
                return response.json()
            print(f" 获取游戏信息失败: {response.status_code}")
            return None
        except (requests.RequestException, ValueError) as e:
            # ValueError covers a 200 response whose body is not valid JSON.
            print(f" 获取游戏信息时连接失败: {e}")
            return None

    def get_valid_actions(self) -> List[str]:
        """Ask the server which actions this player may take right now.

        Returns:
            The ``valid_actions`` list from the response, or an empty list if
            the client has not joined or the request fails in any way.
        """
        if self.player_id is None:
            return []
        try:
            response = requests.get(
                f"{self.server_url}/valid_actions/{self.player_id}",
                timeout=self.REQUEST_TIMEOUT,
            )
            if response.status_code != 200:
                return []
            return response.json().get("valid_actions", [])
        except (requests.RequestException, ValueError):
            return []

    def _choose_bet_amount(self, game_info: Dict) -> int:
        """Pick a random bet/raise size that never exceeds the player's stack.

        The lower bound is the largest entry of ``current_pot`` (falling back
        to ``DEFAULT_BET`` when that is missing, empty or not positive) and
        the upper bound is half of the player's stack.  When the range is
        empty the lower bound is used, clamped to the stack so the client
        never offers chips it does not have.
        """
        stacks = game_info.get("stacks") or []
        if self.player_id is not None and 0 <= self.player_id < len(stacks):
            player_stack = int(stacks[self.player_id])
        else:
            player_stack = self.DEFAULT_STACK

        # NOTE(review): current_pot is presumably a per-player list of chips
        # committed this round; a bare number is tolerated as well.
        current_pot = game_info.get("current_pot")
        if isinstance(current_pot, (list, tuple)):
            highest = max(current_pot, default=0)
        else:
            highest = current_pot or 0
        min_bet = int(highest) if highest > 0 else self.DEFAULT_BET

        max_bet = player_stack // 2  # wager at most half of the stack
        if max_bet > min_bet:
            return random.randint(min_bet, max_bet)
        return max(0, min(min_bet, player_stack))

    def make_random_action(self, valid_actions, game_info) -> bool:
        """Choose one of ``valid_actions`` uniformly and submit it.

        Args:
            valid_actions: Action names accepted by the server for this turn.
            game_info: Latest game state; used only to size bets and raises.

        Returns:
            True if the server accepted the action (HTTP 200), else False.
        """
        if not valid_actions:
            print(" 没有可用动作")
            return False

        # TODO: a uniform choice folds too often; weight the actions instead.
        action = random.choice(valid_actions)
        # NOTE(review): DEFAULT_BET is sent with non-betting actions too, as
        # the original did; presumably the server ignores it - confirm.
        amount = self.DEFAULT_BET
        if action in ("raise", "bet"):
            amount = self._choose_bet_amount(game_info or {})

        payload = {
            "player_id": self.player_id,
            "action": action,
            "amount": amount,
        }
        try:
            response = requests.post(
                f"{self.server_url}/apply_action",
                json=payload,
                timeout=self.REQUEST_TIMEOUT,
            )
        except requests.RequestException as e:
            print(f" 执行动作时连接失败: {e}")
            return False

        if response.status_code == 200:
            action_str = f"{action}" + (f" {amount}" if amount else "")
            print(f" {self.player_name} 执行动作: {action_str}")
            return True
        print(f" 动作执行失败: {response.status_code} - {response.text}")
        return False

    def is_my_turn(self, game_info: Dict) -> bool:
        """Return True when ``game_info['current_turn']`` equals our id."""
        if not game_info or self.player_id is None:
            return False
        return game_info.get("current_turn", -1) == self.player_id

    def display_game_state(self, game_info: Dict):
        """Print a short summary of the game state; no-op for empty input."""
        if not game_info:
            return
        print("\n游戏状态:")
        print(f" 阶段: {game_info.get('stage', 'unknown')}")
        print(f" 总奖池: {game_info.get('total_pot', 0)}")
        print(f" 当前轮到: 玩家 {game_info.get('current_turn', -1)}")
        print(f" 我的ID: {self.player_id}")
        print(f" 我的手牌: {game_info.get('player_cards', [])}")
        print(f" 公共牌: {game_info.get('board_cards', [])}")
        stacks = game_info.get('stacks', [])
        if stacks and self.player_id is not None and self.player_id < len(stacks):
            print(f" 我的筹码: {stacks[self.player_id]}")

    def wait_for_players(self) -> bool:
        """Poll once per second until at least two players are listed.

        Returns:
            True as soon as the ``players`` list has two or more entries;
            False if that has not happened after ``JOIN_WAIT_ATTEMPTS`` polls.
        """
        print(" 等待其他玩家加入游戏...")
        for _ in range(self.JOIN_WAIT_ATTEMPTS):
            game_info = self.get_game_info()
            if game_info:
                players = game_info.get("players", [])
                if len(players) >= 2:
                    print(f" 游戏开始!玩家列表: {players}")
                    return True
            time.sleep(1)
        print(" 等待超时,未能开始游戏......")
        return False

    def game_loop(self) -> bool:
        """Poll the server and act on our turn until stopped or failing.

        The loop runs while ``game_active`` is true and fewer than
        ``MAX_CONSECUTIVE_ERRORS`` polls in a row have failed.

        Returns:
            False if the loop gave up because of repeated errors, True if it
            ended because ``game_active`` became false.
        """
        print(f" {self.player_name} 开始游戏循环......")
        consecutive_errors = 0
        while self.game_active and consecutive_errors < self.MAX_CONSECUTIVE_ERRORS:
            try:
                game_info = self.get_game_info()
                if not game_info:
                    consecutive_errors += 1
                    time.sleep(self.POLL_INTERVAL)
                    continue
                consecutive_errors = 0
                self.display_game_state(game_info)

                if self.is_my_turn(game_info):
                    print(f" 轮到 {self.player_name} 行动...")
                    valid_actions = self.get_valid_actions()
                    if valid_actions:
                        print(f"可用动作: {valid_actions}")
                        self.make_random_action(valid_actions, game_info)
                    else:
                        print(" 没有可用动作")

                if game_info.get("stage", "") in self.FINAL_STAGES:
                    print(" 游戏结束")
                    self.show_final_results(game_info)
                    time.sleep(3)
                    print(" 准备下一轮游戏...")
                    continue
                time.sleep(self.POLL_INTERVAL)
            except Exception as e:
                # Top-level boundary of the loop: report, count, and retry so
                # one bad response does not kill a long-running client.
                print(f" 游戏循环出错: {e}")
                consecutive_errors += 1
                time.sleep(self.POLL_INTERVAL)

        if consecutive_errors >= self.MAX_CONSECUTIVE_ERRORS:
            print(" 连续错误过多,退出游戏")
            return False
        return True

    def show_final_results(self, game_info):
        """Fetch ``/showdown`` and pretty-print its JSON body (best effort).

        ``game_info`` is accepted for interface compatibility but unused.
        Failures are reported and swallowed because this is display-only.
        """
        print("\n 游戏结果:")
        # TODO: the original left an unspecified todo here.
        try:
            response = requests.get(
                f"{self.server_url}/showdown",
                timeout=self.REQUEST_TIMEOUT,
            )
            if response.status_code == 200:
                # ensure_ascii=False keeps non-ASCII text readable in the dump.
                print(json.dumps(response.json(), indent=2, ensure_ascii=False))
        except (requests.RequestException, ValueError) as e:
            print(f" 获取摊牌结果失败: {e}")

    def run(self) -> bool:
        """Join the game, wait for an opponent, then run the game loop.

        Returns:
            False if joining or waiting failed, or if the game loop aborted
            after repeated errors; True otherwise.
        """
        print(f" 启动 RandomAgent 客户端: {self.player_name}")
        print(f" 连接服务器: {self.server_url}")
        if not self.join_game():
            return False
        if not self.wait_for_players():
            return False
        self.game_active = True
        try:
            return self.game_loop()
        finally:
            self.game_active = False
def main():
    """Parse command-line options, run one client, and exit with its status.

    Options:
        --server  Base URL of the game server (default http://localhost:8001).
        --name    Player name to register with (default RandomBot).
        --seed    Optional integer seed for the ``random`` module.

    Exits with status 0 when ``RandomAgentClient.run`` returns a true value
    and 1 otherwise.
    """
    import argparse

    parser = argparse.ArgumentParser(description="RandomAgent")
    parser.add_argument("--server", default="http://localhost:8001")
    parser.add_argument("--name", default="RandomBot")
    parser.add_argument("--seed", type=int)
    args = parser.parse_args()

    # Compare against None rather than truthiness: 0 is a valid seed, and
    # `if args.seed:` would silently ignore `--seed 0`.
    if args.seed is not None:
        random.seed(args.seed)
        print(f" 设置随机种子: {args.seed}")

    client = RandomAgentClient(args.server, args.name)
    success = client.run()
    sys.exit(0 if success else 1)
# Run the client only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()