# Original listing metadata: 302 lines, 11 KiB, Python
import logging
import random
import time
import uuid
from typing import List, Optional, Dict

from shortdeck_arena import Simulation, Agent, HumanAgent, BlindConfig


_logger = logging.getLogger(__name__)


class ArenaGame:
    """Game-session wrapper around a ``shortdeck_arena`` ``Simulation``.

    The object tracks the seated agents and their display names, owns the
    current ``Simulation`` (``None`` until the first player joins) and
    exposes helpers that translate simulation state into plain lists and
    dicts.
    """

    # Seconds apply_action() waits before auto-starting the next hand once
    # the current one is finished.  The wait blocks the calling thread;
    # set it to 0 (on the class, a subclass or an instance) to disable it.
    AUTO_RESTART_DELAY_SECONDS = 10

    # Number of info() calls that report a completed hand's results before
    # info() itself starts the next round.
    RESULT_POLLS_BEFORE_RESTART = 3

    # Stage name reported when the simulation exposes no usable stage value.
    _DEFAULT_STAGE = "pre_flop"
    # Stage value that marks a finished hand.
    _FINISHED_STAGE = "finished"

    # (flag in the simulation's action dict, action name), in output order.
    _ACTION_FLAGS = (
        ("can_fold", "fold"),
        ("can_check", "check"),
        ("can_call", "call"),
        ("can_bet", "bet"),
        ("can_raise", "raise"),
    )

    # Class-level default so the counter exists even on instances that were
    # created without running __init__.
    _result_shown_count = 0

    def __init__(self, starting_stack: int = 1000, max_players: int = 6,
                 small_blind: int = 1, big_blind: int = 2):
        """Create an empty table.

        Args:
            starting_stack: Chips handed to every player when a round is
                started from join_game().
            max_players: Maximum number of seats.
            small_blind: Small blind passed to ``BlindConfig``.
            big_blind: Big blind passed to ``BlindConfig``.
        """
        self.agents: List[Agent] = []
        self.player_names: List[str] = []
        self.starting_stack = starting_stack
        self.max_players = max_players
        self.sim: Optional[Simulation] = None

        self.blind_config = BlindConfig(small_blind, big_blind, ante=0)
        self.game_id = str(uuid.uuid4())
        self._result_shown_count = 0

    # ------------------------------------------------------------------
    # Private helpers
    # ------------------------------------------------------------------

    def _stage_value(self, default: str = "pre_flop") -> str:
        """Return ``self.sim.current_stage.value`` or *default* if missing."""
        return getattr(getattr(self.sim, "current_stage", None), "value", default)

    @staticmethod
    def _read_or_empty(reader) -> list:
        """Call *reader* and return its result, or ``[]`` if it raises.

        The broad ``except`` is deliberate: info() is a best-effort snapshot
        and must not fail because one optional piece of state is missing.
        """
        try:
            return reader()
        except Exception:
            return []

    def _waiting_info(self) -> Dict:
        """Build the info() payload used while no simulation exists."""
        seats = len(self.agents)
        return {
            "game_id": self.game_id,
            "players": self.player_names,
            "dealer_index": 0,
            "current_turn": 0,
            "stage": "waiting",
            "total_pot": 0,
            "side_pots": [],
            "player_cards": [],
            "board_cards": [],
            "stacks": [self.starting_stack] * seats,
            "player_states": ["WAITING"] * seats,
            "current_pot": [0] * seats,
            "actions": {"can_act": False, "reason": "游戏未开始"},
        }

    def _fallback_actions(self, player_id) -> List[str]:
        """Guess the legal actions from ``current_bet`` and ``stacks``.

        Used only when the simulation's own action query raised.  Folding is
        always offered; whatever could be determined before an error is kept.
        """
        valid_actions = ["fold"]
        try:
            current_bet = getattr(self.sim, "current_bet", 0)
            facing_bet = current_bet > 0
            valid_actions.append("call" if facing_bet else "check")

            stacks = getattr(self.sim, "stacks", None)
            # The lower bound stops a negative id from silently indexing
            # another player's stack from the end of the list.
            if (stacks is not None
                    and 0 <= player_id < len(stacks)
                    and stacks[player_id] > current_bet):
                valid_actions.append("raise" if facing_bet else "bet")
        except Exception as exc:
            _logger.debug("Fallback action detection failed: %s", exc)
        return valid_actions

    # ------------------------------------------------------------------
    # Public API
    # ------------------------------------------------------------------

    def join_game(self, name: str) -> int:
        """Seat a new human player and return the assigned player id.

        The first join creates the simulation.  Every join that leaves at
        least two players seated resets all stacks to ``starting_stack`` and
        starts a new round.

        Raises:
            ValueError: If the table already holds ``max_players`` players.
        """
        if len(self.agents) >= self.max_players:
            raise ValueError("Game is full")

        player_id = len(self.agents)
        self.agents.append(HumanAgent(player_id))
        self.player_names.append(name)

        if len(self.agents) == 1:
            self.sim = Simulation(self.agents, blind_config=self.blind_config)
            # Stacks default to starting_stack.
            self.sim.stacks = [self.starting_stack] * len(self.agents)
        elif self.sim:
            # Grow the existing simulation's per-player lists by one seat.
            self.sim.agents = self.agents
            state_enum = type(self.sim.player_states[0])
            self.sim.player_states.append(state_enum.ACTIVE)
            self.sim.pot.append(0)
            self.sim.stacks.append(self.starting_stack)

        # Once at least two players are seated, start a new round.
        # NOTE(review): this also resets the stacks of players who were
        # already seated - confirm that is intended for late joins.
        if len(self.agents) >= 2 and self.sim:
            self.sim.stacks = [self.starting_stack] * len(self.agents)
            self.sim.new_round()
            self._result_shown_count = 0

        return player_id

    def get_valid_actions(self, player_id) -> List[str]:
        """Return the action names currently available to *player_id*.

        Returns an empty list when there is no simulation or the simulation
        reports that the player cannot act.  If the simulation query raises,
        a guess based on ``current_bet`` and ``stacks`` is returned instead.
        """
        if not self.sim:
            return []

        try:
            actions_info = self.sim.get_available_actions(player_id)
        except Exception as exc:
            _logger.warning("Error getting valid actions: %s", exc)
            return self._fallback_actions(player_id)

        if not (isinstance(actions_info, dict) and actions_info.get("can_act", False)):
            return []
        return [action for flag, action in self._ACTION_FLAGS
                if actions_info.get(flag, False)]

    def apply_action(self, player_id, action, amount: Optional[int] = None) -> dict:
        """Apply a player action and report the outcome.

        After the action the simulation's data is dumped.  If the hand is
        finished the method then waits ``AUTO_RESTART_DELAY_SECONDS`` and
        starts the next hand via reset_hand_keep_chips().

        Returns:
            ``{"success": bool, "message": str}``; any exception raised along
            the way is reported as a failure message instead of propagating.
        """
        if not self.sim:
            return {"success": False, "message": "游戏未开始"}

        try:
            played_sim = self.sim
            played_sim.apply_action(player_id, action, amount)

            # Dump before a possible restart: reset_hand_keep_chips()
            # replaces self.sim, so dumping afterwards would write the fresh
            # simulation and lose the hand that was just played.
            played_sim.dump_data()

            hand_finished = (
                self._stage_value("") == self._FINISHED_STAGE
                and bool(getattr(played_sim, "hand_completed", False))
            )
            if hand_finished:
                delay = self.AUTO_RESTART_DELAY_SECONDS
                if delay and delay > 0:
                    # Blocks the calling thread for the whole delay.
                    time.sleep(delay)
                _logger.info("游戏结束,自动开始新一轮...")
                self.reset_hand_keep_chips()

            return {"success": True, "message": f"Applied {action}"}
        except Exception as e:
            return {"success": False, "message": str(e)}

    def info(self, player_id: Optional[int] = None) -> Dict:
        """Return a snapshot of the table, optionally from one player's view.

        Args:
            player_id: When it is a valid seat index, the snapshot includes
                that player's hole cards and available actions; otherwise
                those fields hold placeholder values.

        Returns:
            A dict of table state.  While a finished hand is being reported
            it also carries ``showdown_hands``, ``winnings`` and ``winners``.

        Side effects:
            Calls ``complete_hand()`` on the simulation on every call made
            while the hand is finished, and starts a new round after
            ``RESULT_POLLS_BEFORE_RESTART`` such calls.
        """
        if not self.sim:
            return self._waiting_info()

        sim = self.sim
        info_data = {
            "game_id": self.game_id,
            "players": self.player_names,
            "dealer_index": getattr(sim, "dealer_position", 0),
            "current_turn": getattr(sim, "current_turn", 0),
            "stage": self._stage_value(),
            "total_pot": getattr(sim, "total_pot", 0),
            "side_pots": self._read_or_empty(lambda: [
                {"amount": side.amount, "eligible_players": list(side.eligible_players)}
                for side in sim.get_side_pots()
            ]),
        }

        if player_id is not None and 0 <= player_id < len(sim.stacks):
            try:
                hole_cards = sim.player_cards(player_id)
                info_data["player_cards"] = [str(card) for card in hole_cards]
                _logger.debug("Player %s cards: %s (raw: %s)",
                              player_id, info_data["player_cards"], hole_cards)
            except Exception as e:
                _logger.debug("Error getting player %s cards: %s", player_id, e)
                info_data["player_cards"] = []

            try:
                info_data["actions"] = sim.get_available_actions(player_id)
            except Exception as e:
                info_data["actions"] = {"can_act": False, "reason": f"Error getting actions: {str(e)}"}
        else:
            info_data["player_cards"] = []
            info_data["actions"] = {"can_act": False, "reason": "Invalid player"}

        info_data["board_cards"] = self._read_or_empty(
            lambda: [str(card) for card in sim.board_cards(self._stage_value())])
        info_data["stacks"] = self._read_or_empty(lambda: sim.stacks.copy())
        info_data["player_states"] = self._read_or_empty(
            lambda: [state.value for state in sim.player_states])
        info_data["current_pot"] = self._read_or_empty(lambda: sim.pot.copy())

        if self._stage_value("") == self._FINISHED_STAGE and sim.is_hand_complete():
            # NOTE(review): complete_hand() runs on every poll while the hand
            # is finished - confirm it is safe to call repeatedly.
            result = sim.complete_hand()
            if result.get("complete", False):
                info_data["showdown_hands"] = result.get("showdown_hands", {})
                info_data["winnings"] = result.get("winnings", {})
                info_data["winners"] = result.get("winners", [])

                self._result_shown_count += 1
                if self._result_shown_count >= self.RESULT_POLLS_BEFORE_RESTART:
                    _logger.info(" 自动开始新一轮游戏...")
                    sim.new_round()
                    self._result_shown_count = 0
                    # Report the stage the simulation is actually in so the
                    # name matches what later calls return.
                    # TODO: the other fields still describe the finished hand.
                    info_data["stage"] = self._stage_value()

        return info_data

    def get_hand_strength(self, player_id: int) -> Dict:
        """Describe the evaluated hand of *player_id*.

        Returns an ``{"error": ...}`` dict when there is no simulation or the
        simulation cannot evaluate the hand.
        """
        if not self.sim:
            return {"error": "游戏未初始化"}

        ranking = self.sim.evaluate_player_hand(player_id)
        if ranking is None:
            return {"error": "无法评估手牌"}
        return {
            "hand_type": ranking.hand_type.type_name,
            "description": str(ranking),
            "strength": ranking.get_strength(),
            "cards": [str(card) for card in ranking.cards],
        }

    def check_hand_complete(self) -> bool:
        """Return the simulation's hand-complete flag (False without one)."""
        return self.sim.is_hand_complete() if self.sim else False

    def get_winners(self) -> Dict:
        """Return ``complete_hand()``'s result, or an error dict if the hand
        is not complete or no simulation exists."""
        if not self.sim:
            return {"error": "游戏未初始化"}
        if not self.sim.is_hand_complete():
            return {"error": "手牌未完成"}
        return self.sim.complete_hand()

    def showdown(self) -> Dict:
        """Return per-winner showdown details plus the pot distribution.

        Players returned by ``determine_winners()`` with a ``None`` ranking
        are reported as winning by default.  Calls ``distribute_pot()`` on
        the simulation.
        """
        if not self.sim:
            return {"error": "游戏未初始化"}

        sim = self.sim
        showdown_info = {}
        for pid, ranking in sim.determine_winners().items():
            entry = {"cards": [str(card) for card in sim.player_cards(pid)]}
            if ranking is not None:
                entry["hand_type"] = ranking.hand_type.type_name
                entry["description"] = str(ranking)
                entry["strength"] = ranking.get_strength()
            else:
                entry["hand_type"] = "Winner by default"
                entry["description"] = "Other players folded"
                # NOTE(review): json.dumps renders inf as the non-standard
                # token "Infinity"; strict JSON consumers may reject it.
                entry["strength"] = float("inf")
            showdown_info[pid] = entry

        return {
            "showdown": showdown_info,
            "pot_distribution": sim.distribute_pot(),
        }

    @property
    def pot(self) -> int:
        """The simulation's ``total_pot``, or 0 when there is no simulation."""
        return self.sim.total_pot if self.sim else 0

    @property
    def stacks(self) -> List[int]:
        """The simulation's stack list (not a copy), or ``[]`` without one."""
        return self.sim.stacks if self.sim else []

    @property
    def current_turn(self) -> int:
        """The simulation's ``current_turn``, or -1 without a simulation."""
        return self.sim.current_turn if self.sim else -1

    @property
    def history(self) -> List[Dict]:
        """The simulation's ``history``, or ``[]`` without a simulation."""
        return self.sim.history if self.sim else []

    def reset_hand_keep_chips(self) -> None:
        """Start a fresh hand while carrying the current stacks over.

        Does nothing without a simulation or with fewer than two players.
        A hand that is complete but not yet settled is settled first, then
        the simulation is replaced by a new one that reuses the old stacks.
        """
        if not self.sim or len(self.agents) < 2:
            return

        if hasattr(self.sim, "hand_completed") and not self.sim.hand_completed:
            if self.sim.is_hand_complete():
                self.sim.complete_hand()

        current_stacks = self.sim.stacks.copy()

        self.sim = Simulation(self.agents, blind_config=self.blind_config)
        self.sim.stacks = current_stacks
        self.sim.new_round()
        # A new hand has started, so no results have been shown for it yet.
        self._result_shown_count = 0

    def full_reset(self) -> None:
        """Drop all players and the simulation and issue a new game id."""
        self.agents = []
        self.player_names = []
        self.sim = None
        self.game_id = str(uuid.uuid4())
        self._result_shown_count = 0