Files
shortdeck/shortdeck_server/arena_adapter.py
2025-10-11 18:24:24 +08:00

302 lines
11 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import uuid
import random
from typing import List, Optional, Dict
from shortdeck_arena import Simulation, Agent, HumanAgent, BlindConfig
class ArenaGame:
    """Adapter that seats human players around a single ``Simulation``.

    The class owns the list of agents and player names, creates the
    underlying ``Simulation`` when the first player joins, and exposes
    dictionary-shaped views of the game state for a server layer.

    All access to the simulation goes through ``self.sim``, which is ``None``
    until the first player joins and after :meth:`full_reset`.
    """

    # Seconds apply_action() waits after a hand finishes before it deals the
    # next one (previously a hard-coded 10 inside the method).
    HAND_RESTART_DELAY = 10

    # Number of info() calls that report a completed hand before info()
    # itself starts the next round.
    RESULT_POLLS_BEFORE_RESTART = 3

    # (flag in the simulation's action dict, action name reported to callers),
    # in the order the names are returned.
    _ACTION_FLAGS = (
        ("can_fold", "fold"),
        ("can_check", "check"),
        ("can_call", "call"),
        ("can_bet", "bet"),
        ("can_raise", "raise"),
    )

    # Class-level default so instances always have the counter, even if they
    # were created without running __init__.
    _result_shown_count = 0

    def __init__(self, starting_stack: int = 1000, max_players: int = 6,
                 small_blind: int = 1, big_blind: int = 2):
        """Create an empty table.

        Args:
            starting_stack: Chips given to every player when a game starts.
            max_players: Maximum number of seats.
            small_blind: Small blind passed to ``BlindConfig``.
            big_blind: Big blind passed to ``BlindConfig``.
        """
        self.agents = []
        self.player_names: List[str] = []
        self.starting_stack = starting_stack
        self.max_players = max_players
        self.sim: Optional[Simulation] = None
        self.blind_config = BlindConfig(small_blind, big_blind, ante=0)
        self.game_id = str(uuid.uuid4())
        self._result_shown_count = 0

    @staticmethod
    def _stage_of(sim, default: str) -> str:
        """Return ``sim.current_stage.value``, or *default* when unavailable."""
        if hasattr(sim, 'current_stage'):
            return getattr(sim.current_stage, 'value', default)
        return default

    @classmethod
    def _hand_is_over(cls, sim) -> bool:
        """Return True when *sim* is in the 'finished' stage with ``hand_completed`` set."""
        return (cls._stage_of(sim, '') == 'finished'
                and bool(getattr(sim, 'hand_completed', False)))

    def join_game(self, name: str) -> int:
        """Seat a new human player and return the assigned player id.

        The first player creates the ``Simulation``; later players are
        appended to the existing one. As soon as two or more players are
        seated, every stack is set to ``starting_stack`` and a new round is
        started.

        Raises:
            ValueError: If the table already holds ``max_players`` players.
        """
        if len(self.agents) >= self.max_players:
            raise ValueError("Game is full")

        player_id = len(self.agents)
        self.agents.append(HumanAgent(player_id))
        self.player_names.append(name)

        if len(self.agents) == 1:
            self.sim = Simulation(self.agents, blind_config=self.blind_config)
            # Every seat starts with the configured stack (1000 by default).
            self.sim.stacks = [self.starting_stack] * len(self.agents)
        elif self.sim:
            self.sim.agents = self.agents
            # Reuse the enum class of an existing state to mark the new seat ACTIVE.
            self.sim.player_states.append(self.sim.player_states[0].__class__.ACTIVE)
            self.sim.pot.append(0)
            self.sim.stacks.append(self.starting_stack)

        # Start a new round once at least two players are seated.
        # NOTE(review): this also resets the stacks of players who are already
        # seated, so joining mid-game discards their chip counts - confirm
        # that this is intended.
        if len(self.agents) >= 2 and self.sim:
            self.sim.stacks = [self.starting_stack] * len(self.agents)
            self.sim.new_round()

        return player_id

    def get_valid_actions(self, player_id: int) -> List[str]:
        """Return the action names currently available to *player_id*.

        The answer normally comes from ``Simulation.get_available_actions``.
        If that call raises, a best-effort list is derived from the current
        bet and the player's stack instead; "fold" is always part of that
        fallback list.

        Returns:
            A list drawn from "fold", "check", "call", "bet", "raise"; empty
            when there is no game or the player cannot act.
        """
        if not self.sim:
            return []

        try:
            actions_info = self.sim.get_available_actions(player_id)
            if isinstance(actions_info, dict) and actions_info.get("can_act", False):
                return [action for flag, action in self._ACTION_FLAGS
                        if actions_info.get(flag, False)]
            return []
        # TODO: decide whether this should go through real logging.
        except Exception as e:
            print(f"Error getting valid actions: {e}")

        valid_actions = ["fold"]
        try:
            current_bet = getattr(self.sim, 'current_bet', 0)
            valid_actions.append("call" if current_bet > 0 else "check")

            stacks = getattr(self.sim, 'stacks', None)
            # The lower bound keeps a negative id from indexing from the end.
            if stacks is not None and 0 <= player_id < len(stacks):
                if stacks[player_id] > current_bet:
                    valid_actions.append("raise" if current_bet > 0 else "bet")
        except Exception:
            # Deliberately best-effort: return whatever was collected so far.
            pass
        return valid_actions

    def apply_action(self, player_id, action, amount: Optional[int] = None) -> dict:
        """Apply *action* for *player_id* and report success as a dict.

        When the action leaves the hand finished, the call waits
        ``HAND_RESTART_DELAY`` seconds and then deals the next hand via
        :meth:`reset_hand_keep_chips`. The finished state is re-checked after
        the wait because :meth:`info` can start a new round on its own while
        this call is sleeping; resetting in that case would throw away the
        hand that is already in progress.

        Returns:
            ``{"success": bool, "message": str}``. Any exception raised while
            applying the action is reported through ``message`` rather than
            propagated.
        """
        if not self.sim:
            return {"success": False, "message": "游戏未开始"}

        try:
            acting_sim = self.sim
            acting_sim.apply_action(player_id, action, amount)

            if self._hand_is_over(acting_sim):
                import time
                # NOTE(review): this blocks the calling thread for the whole
                # delay; consider scheduling the restart instead.
                time.sleep(self.HAND_RESTART_DELAY)
                if self.sim is acting_sim and self._hand_is_over(acting_sim):
                    print("游戏结束,自动开始新一轮...")
                    self.reset_hand_keep_chips()

            # self.sim can be None here if full_reset() ran during the wait.
            if self.sim:
                self.sim.dump_data()
            return {"success": True, "message": f"Applied {action}"}
        except Exception as e:
            return {"success": False, "message": str(e)}

    def info(self, player_id: Optional[int] = None) -> Dict:
        """Build a snapshot of the game state, optionally from one player's seat.

        Without a simulation a fixed "waiting" snapshot is returned. Otherwise
        each section (side pots, hole cards, actions, board, stacks, states,
        pots) is filled independently, and a section whose lookup raises falls
        back to an empty value.

        Side effect: when the hand is finished and complete, the result is
        attached to the snapshot, and after ``RESULT_POLLS_BEFORE_RESTART``
        such calls a new round is started.

        Args:
            player_id: Seat whose hole cards and available actions are
                included; ``None`` or an out-of-range id yields empty cards
                and an "Invalid player" action entry.
        """
        if not self.sim:
            seats = len(self.agents)
            return {
                "game_id": self.game_id,
                "players": self.player_names,
                "dealer_index": 0,
                "current_turn": 0,
                "stage": "waiting",
                "total_pot": 0,
                "side_pots": [],
                "player_cards": [],
                "board_cards": [],
                "stacks": [self.starting_stack] * seats,
                "player_states": ["WAITING"] * seats,
                "current_pot": [0] * seats,
                "actions": {"can_act": False, "reason": "游戏未开始"}
            }

        sim = self.sim
        stage_value = self._stage_of(sim, 'pre_flop')
        info_data = {
            "game_id": self.game_id,
            "players": self.player_names,
            "dealer_index": getattr(sim, 'dealer_position', 0),
            "current_turn": getattr(sim, 'current_turn', 0),
            "stage": stage_value,
            "total_pot": getattr(sim, 'total_pot', 0),
            "side_pots": [],
        }

        try:
            info_data["side_pots"] = [
                {"amount": pot.amount, "eligible_players": list(pot.eligible_players)}
                for pot in sim.get_side_pots()
            ]
        except Exception:
            info_data["side_pots"] = []

        if player_id is not None and 0 <= player_id < len(sim.stacks):
            try:
                player_cards = sim.player_cards(player_id)
                info_data["player_cards"] = [str(card) for card in player_cards]
                print(f"DEBUG - Player {player_id} cards: {info_data['player_cards']} (raw: {player_cards})")
            except Exception as e:
                print(f"DEBUG - Error getting player {player_id} cards: {e}")
                info_data["player_cards"] = []

            try:
                info_data["actions"] = sim.get_available_actions(player_id)
            except Exception as e:
                info_data["actions"] = {"can_act": False, "reason": f"Error getting actions: {str(e)}"}
        else:
            info_data["player_cards"] = []
            info_data["actions"] = {"can_act": False, "reason": "Invalid player"}

        try:
            info_data["board_cards"] = [str(card) for card in sim.board_cards(stage_value)]
        except Exception:
            info_data["board_cards"] = []

        try:
            info_data["stacks"] = sim.stacks.copy() if hasattr(sim, 'stacks') else []
        except Exception:
            info_data["stacks"] = []

        try:
            info_data["player_states"] = (
                [state.value for state in sim.player_states]
                if hasattr(sim, 'player_states') else []
            )
        except Exception:
            info_data["player_states"] = []

        try:
            info_data["current_pot"] = sim.pot.copy() if hasattr(sim, 'pot') else []
        except Exception:
            info_data["current_pot"] = []

        if self._stage_of(sim, '') == 'finished' and sim.is_hand_complete():
            result = sim.complete_hand()
            if result.get("complete", False):
                info_data["showdown_hands"] = result.get("showdown_hands", {})
                info_data["winnings"] = result.get("winnings", {})
                info_data["winners"] = result.get("winners", [])

                # Count how often the result has been served; the counter is
                # shared by all callers, not tracked per player.
                self._result_shown_count += 1
                if self._result_shown_count >= self.RESULT_POLLS_BEFORE_RESTART:
                    print(" 自动开始新一轮游戏...")
                    sim.new_round()
                    self._result_shown_count = 0
                    info_data["stage"] = "preflop"
        # TODO: left open by the original author.
        return info_data

    def get_hand_strength(self, player_id: int) -> Dict:
        """Describe the evaluated hand of *player_id*.

        Returns:
            A dict with ``hand_type``, ``description``, ``strength`` and
            ``cards``; or a dict with a single ``error`` key when there is no
            game yet or the hand cannot be evaluated.
        """
        if not self.sim:
            # Same error shape as get_winners()/showdown() instead of an
            # AttributeError on None.
            return {"error": "游戏未初始化"}

        ranking = self.sim.evaluate_player_hand(player_id)
        if ranking is None:
            return {"error": "无法评估手牌"}
        return {
            "hand_type": ranking.hand_type.type_name,
            "description": str(ranking),
            "strength": ranking.get_strength(),
            "cards": [str(card) for card in ranking.cards]
        }

    def check_hand_complete(self) -> bool:
        """Return whether the simulation reports the hand as complete (False without a game)."""
        return self.sim.is_hand_complete() if self.sim else False

    def get_winners(self) -> Dict:
        """Return ``Simulation.complete_hand()`` for a completed hand, else an ``error`` dict."""
        if not self.sim:
            return {"error": "游戏未初始化"}
        if not self.sim.is_hand_complete():
            return {"error": "手牌未完成"}
        return self.sim.complete_hand()

    def showdown(self) -> Dict:
        """Return each winner's cards and hand description plus the pot distribution.

        Winners whose ranking is ``None`` are reported as winning by default
        (the other players folded).
        """
        if not self.sim:
            return {"error": "游戏未初始化"}

        winners = self.sim.determine_winners()
        showdown_info = {}
        for pid, ranking in winners.items():
            cards = [str(card) for card in self.sim.player_cards(pid)]
            if ranking is not None:
                showdown_info[pid] = {
                    "cards": cards,
                    "hand_type": ranking.hand_type.type_name,
                    "description": str(ranking),
                    "strength": ranking.get_strength()
                }
            else:
                showdown_info[pid] = {
                    "cards": cards,
                    "hand_type": "Winner by default",
                    "description": "Other players folded",
                    # NOTE(review): infinity is not representable in strict
                    # JSON; confirm the transport layer can serialize it.
                    "strength": float('inf')
                }
        return {
            "showdown": showdown_info,
            "pot_distribution": self.sim.distribute_pot()
        }

    @property
    def pot(self) -> int:
        """Total pot of the current simulation, or 0 without a game."""
        return self.sim.total_pot if self.sim else 0

    @property
    def stacks(self) -> List[int]:
        """The simulation's stacks list (not a copy), or ``[]`` without a game."""
        return self.sim.stacks if self.sim else []

    @property
    def current_turn(self) -> int:
        """The simulation's ``current_turn``, or -1 without a game."""
        return self.sim.current_turn if self.sim else -1

    @property
    def history(self) -> List[Dict]:
        """The simulation's ``history``, or ``[]`` without a game."""
        return self.sim.history if self.sim else []

    def reset_hand_keep_chips(self):
        """Start a fresh hand with a new ``Simulation`` while keeping current stacks.

        Does nothing without a game or with fewer than two players. If the
        current hand is complete but has not been marked ``hand_completed``,
        it is completed first, before the stacks are copied over.
        """
        if not self.sim or len(self.agents) < 2:
            return

        if hasattr(self.sim, 'hand_completed') and not self.sim.hand_completed:
            if self.sim.is_hand_complete():
                self.sim.complete_hand()

        current_stacks = self.sim.stacks.copy()
        self.sim = Simulation(self.agents, blind_config=self.blind_config)
        self.sim.stacks = current_stacks
        self.sim.new_round()
        # A new hand has no results shown yet.
        self._result_shown_count = 0

    def full_reset(self):
        """Drop all players and the simulation and assign a new game id."""
        self.agents = []
        self.player_names = []
        self.sim = None
        self.game_id = str(uuid.uuid4())
        self._result_shown_count = 0