from __future__ import annotations import json import random from pathlib import Path from typing import List, Dict, Optional from typing import TYPE_CHECKING if TYPE_CHECKING: from .agent import Agent from .card import Card from .game_stage import GameStage, PlayerState, BlindConfig from .side_pot import SidePotManager from .hand_evaluator import HandEvaluator from .hand_ranking import HandRanking class Simulation: def __init__(self, agents: List[Agent], blind_config: Optional[BlindConfig] = None): self.agents = agents self.history: List[Dict] = [] self.cards: List[Card] = [] self.saved = False # 游戏状态管理 self.current_stage = GameStage.PREFLOP self.player_states: List[PlayerState] = [PlayerState.ACTIVE] * len(agents) self.current_turn = 0 self.betting_round_complete = False # 盲注配置 self.blind_config = blind_config or BlindConfig() # 筹码和底池管理 self.pot: List[int] = [0] * len(agents) # 每个玩家在当前轮的投入 self.total_pot = 0 self.last_raise_amount = 0 self.min_raise = self.blind_config.big_blind self.dealer_position = -1 # 边池管理和筹码 self.side_pot_manager = SidePotManager() self.stacks: List[int] = [1000] * len(agents) # 默认筹码 # 用于结算 self.hand_completed = False self.new_round() def new_round(self): self.history = [] self.cards = Card.all_short() random.shuffle(self.cards) # 洗牌 self.saved = False # 重置游戏状态 self.current_stage = GameStage.PREFLOP self.player_states = [PlayerState.ACTIVE] * len(self.agents) self.betting_round_complete = False self.hand_completed = False # 重置完成标志 # 重置下注状态 self.pot = [0] * len(self.agents) self.total_pot = 0 self.last_raise_amount = 0 self.min_raise = self.blind_config.big_blind # 重置边池管理器 self.side_pot_manager.reset() # 设置盲注 self._setup_blinds() # 庄家位置 self.dealer_position = random.choice(range(len(self.agents))) def _setup_blinds(self): num_players = len(self.agents) # 至少需要2个玩家才能设置盲注 if num_players < 2: self.current_turn = 0 if num_players > 0 else 0 return sb_pos = self.blind_config.get_sb_position(num_players,self.dealer_position) bb_pos = self.blind_config.get_bb_position(num_players,self.dealer_position) # 确保位置有效 if sb_pos >= num_players or bb_pos >= num_players: self.current_turn = 0 return # 扣除小盲 sb_amount = min(self.blind_config.small_blind, self.stacks[sb_pos]) self.pot[sb_pos] = sb_amount self.stacks[sb_pos] -= sb_amount self.total_pot += sb_amount self.side_pot_manager.add_investment(sb_pos, sb_amount) self.history.append({ "pid": sb_pos, "action": "small_blind", "amount": sb_amount }) # 扣除大盲 bb_amount = min(self.blind_config.big_blind, self.stacks[bb_pos]) self.pot[bb_pos] = bb_amount self.stacks[bb_pos] -= bb_amount self.total_pot += bb_amount self.side_pot_manager.add_investment(bb_pos, bb_amount) self.history.append({ "pid": bb_pos, "action": "big_blind", "amount": bb_amount }) # 首个行动玩家 self.current_turn = self.blind_config.get_first_to_act(self.current_stage, num_players, self.dealer_position) self.last_raise_amount = self.blind_config.big_blind def player_cards(self, pid) -> List[Card]: return self.cards[pid * 2 : (pid * 2 + 2)] def board_cards(self, street) -> List[Card]: nplayers = len(self.agents) idx_start = nplayers * 2 if street == "flop": return self.cards[idx_start: idx_start + 3] if street == "turn": return self.cards[idx_start: idx_start + 4] if street == "river": return self.cards[idx_start: idx_start + 5] return [] def get_current_max_bet(self) -> int: return max(self.pot) if self.pot else 0 def get_call_amount(self, pid) -> int: """ 跟注金额 """ if pid >= len(self.pot): return 0 max_pot = self.get_current_max_bet() return max(0, max_pot - self.pot[pid]) def get_min_raise_amount(self, pid) -> int: call_amount = self.get_call_amount(pid) min_raise = call_amount + max(self.last_raise_amount, self.blind_config.big_blind) return min_raise def get_max_bet_amount(self, pid) -> int: if pid >= len(self.stacks): return 0 return self.stacks[pid] def is_all_in_amount(self, pid, amount) -> bool: return amount >= self.stacks[pid] def validate_bet_amount(self, pid, action, amount) -> tuple[bool, str, int]: if pid >= len(self.stacks): return False, "无效玩家", amount available_stack = self.stacks[pid] call_amount = self.get_call_amount(pid) if action == "fold": return True, "", 0 elif action == "check": if call_amount > 0: return False, "不能过牌,需跟注或弃牌", 0 return True, "", 0 elif action == "call": if call_amount == 0: return False, "不需要跟注", 0 if call_amount >= available_stack: return True, "", available_stack return True, "", call_amount elif action == "allin": if available_stack <= 0: return False, "没有筹码进行 all-in", 0 return True, "", available_stack elif action in ["bet", "raise"]: if amount <= 0: return False, "无效下注金额", amount # allin if amount >= available_stack: return True, "", available_stack if action == "raise": min_raise = self.get_min_raise_amount(pid) if amount < min_raise: return False, f"最小加注金额为 {min_raise}", amount if action == "bet" and max(self.pot) == 0: if amount < self.blind_config.big_blind: return False, f"最小下注金额为 {self.blind_config.big_blind}", amount return True, "", amount return False, "无效行为", amount def get_available_actions(self, pid: int) -> dict: if self.current_stage in [GameStage.FINISHED, GameStage.SHOWDOWN]: return {"can_act": False, "reason": "游戏已结束" if self.current_stage == GameStage.FINISHED else "摊牌阶段"} if pid != self.current_turn: return {"can_act": False, "reason": "不是你的回合"} if pid >= len(self.player_states): return {"can_act": False, "reason": "无效玩家"} state = self.player_states[pid] if state in [PlayerState.FOLDED, PlayerState.ALLIN]: return {"can_act": False, "reason": f"Player state: {state}"} call_amount = self.get_call_amount(pid) available_stack = self.stacks[pid] actions = { "can_act": True, "can_fold": True, "can_check": call_amount == 0, "can_call": call_amount > 0 and call_amount < available_stack, "can_bet": call_amount == 0 and available_stack > 0, "can_raise": call_amount > 0 and available_stack > call_amount, "can_allin": available_stack > 0, "call_amount": call_amount, "min_bet": self.blind_config.big_blind if max(self.pot) == 0 else 0, "min_raise": self.get_min_raise_amount(pid) if call_amount > 0 else 0, "max_bet": available_stack, "stack": available_stack } return actions def is_betting_round_complete(self) -> bool: """ 检查当前下注轮是否完成 """ # 首先检查是否只剩一个未弃牌的玩家 non_folded_players = [i for i, state in enumerate(self.player_states) if state != PlayerState.FOLDED] if len(non_folded_players) <= 1: return True active_or_allin_players = [i for i, state in enumerate(self.player_states) if state in (PlayerState.ACTIVE, PlayerState.CALLED, PlayerState.ALLIN)] all_allin_or_folded = all(state in (PlayerState.ALLIN, PlayerState.FOLDED) for state in self.player_states) if all_allin_or_folded: return True max_pot = self.get_current_max_bet() # 统计还需要行动的玩家 players_need_action = [] for i in active_or_allin_players: # allin if self.player_states[i] == PlayerState.ALLIN: continue if self.pot[i] < max_pot: players_need_action.append(i) elif self.player_states[i] == PlayerState.ACTIVE: if (self.current_stage == GameStage.PREFLOP and i == self.blind_config.get_bb_position(len(self.agents), self.dealer_position)): # 检查大盲是否已经行动过(除了盲注) bb_actions = [h for h in self.history if h.get('pid') == i and h.get('action') not in ['big_blind']] if not bb_actions: players_need_action.append(i) return len(players_need_action) == 0 def advance_to_next_street(self): if self.current_stage == GameStage.FINISHED: return next_stage = GameStage.get_next_stage(self.current_stage) if next_stage is None: self.current_stage = GameStage.FINISHED self.complete_hand() return self.current_stage = next_stage active_players = self.get_active_players() if len(active_players) <= 1: self.current_stage = GameStage.FINISHED self.complete_hand() return if self.current_stage == GameStage.SHOWDOWN: self.current_stage = GameStage.FINISHED self.complete_hand() return # 重置下注轮状态 self.betting_round_complete = False # 重置行动状态 for i, state in enumerate(self.player_states): if state == PlayerState.CALLED: self.player_states[i] = PlayerState.ACTIVE # 首个行动玩家 num_players = len(self.agents) self.current_turn = self.blind_config.get_first_to_act(self.current_stage, num_players, self.dealer_position) self.last_raise_amount = 0 self.min_raise = self.blind_config.big_blind def get_next_active_player(self, start_pos) -> Optional[int]: for i in range(len(self.agents)): pos = (start_pos + i) % len(self.agents) # 只有ACTIVE状态的玩家可以行动,ALLIN和FOLDED的玩家不能行动 if self.player_states[pos] == PlayerState.ACTIVE: return pos return None def get_side_pots(self) -> List: active_players = [ i for i, state in enumerate(self.player_states) if state not in [PlayerState.FOLDED] ] return self.side_pot_manager.create_side_pots(active_players) def node_info(self) -> Dict: if self.current_turn >= len(self.pot): return {"bet_min": self.min_raise, "bet_max": 0, "call_amount": 0} actions = self.get_available_actions(self.current_turn) return { "bet_min": actions.get("min_bet", self.min_raise), "bet_max": actions.get("max_bet", 100), "call_amount": actions.get("call_amount", 0) } def apply_action(self, pid, action, amount): if pid != self.current_turn: raise ValueError(f"不是玩家 {pid} 的回合") if self.player_states[pid] not in (PlayerState.ACTIVE,): raise ValueError(f"玩家 {pid} 无法行动,当前状态: {self.player_states[pid]}") action = action.lower() # 验证动作合法性 is_valid, error_msg, adjusted_amount = self.validate_bet_amount(pid, action, amount or 0) if not is_valid: raise ValueError(error_msg) # 使用调整后的金额 amount = adjusted_amount self.history.append({"pid": pid, "action": action, "amount": amount}) if action == "fold": self.player_states[pid] = PlayerState.FOLDED elif action == "call": call_amount = self.get_call_amount(pid) if call_amount == 0: # check self.history[-1]["action"] = "check" self.player_states[pid] = PlayerState.CALLED else: # 检查是否all-in actual_amount = min(call_amount, self.stacks[pid]) if actual_amount >= self.stacks[pid]: self.player_states[pid] = PlayerState.ALLIN else: self.player_states[pid] = PlayerState.CALLED self.pot[pid] += actual_amount self.stacks[pid] -= actual_amount self.total_pot += actual_amount self.side_pot_manager.add_investment(pid, actual_amount) elif action == "check": call_amount = self.get_call_amount(pid) if call_amount > 0: raise ValueError("跟注金额>0, 无法过牌,需要跟注或弃牌") self.player_states[pid] = PlayerState.CALLED elif action == "allin": # all-in actual_amount = self.stacks[pid] if actual_amount <= 0: raise ValueError("没有可用筹码进行 all-in") self.player_states[pid] = PlayerState.ALLIN self.pot[pid] += actual_amount self.stacks[pid] = 0 self.total_pot += actual_amount self.side_pot_manager.add_investment(pid, actual_amount) # 更新最后加注金额(如果 all-in 金额超过跟注金额) call_amount = self.get_call_amount(pid) raise_amount = actual_amount - call_amount if raise_amount > 0: self.last_raise_amount = raise_amount self.min_raise = raise_amount for i, state in enumerate(self.player_states): if i != pid and state == PlayerState.CALLED: self.player_states[i] = PlayerState.ACTIVE elif action in ("bet", "raise"): if amount is None: raise ValueError(f"{action} 需要指定金额") # 检查是否all-in actual_amount = min(amount, self.stacks[pid]) if actual_amount >= self.stacks[pid]: self.player_states[pid] = PlayerState.ALLIN else: self.player_states[pid] = PlayerState.CALLED self.pot[pid] += actual_amount self.stacks[pid] -= actual_amount self.total_pot += actual_amount self.side_pot_manager.add_investment(pid, actual_amount) # 更新最后加注金额 call_amount = self.get_call_amount(pid) raise_amount = actual_amount - call_amount if raise_amount > 0: self.last_raise_amount = raise_amount self.min_raise = raise_amount for i, state in enumerate(self.player_states): if i != pid and state == PlayerState.CALLED: self.player_states[i] = PlayerState.ACTIVE else: raise ValueError(f"未知动作: {action}") # 下一个玩家 self._advance_turn() def _advance_turn(self): """ 推进回合 """ # 检查下注轮是否完成 if self.is_betting_round_complete(): self.betting_round_complete = True self.advance_to_next_street() else: # 找到下一个可行动玩家 next_player = self.get_next_active_player(self.current_turn + 1) if next_player is not None: self.current_turn = next_player else: # 没有玩家需要行动,结束下注轮 self.betting_round_complete = True self.advance_to_next_street() def to_save_data(self) -> Dict: players = [f"Agent{a.pid}" for a in self.agents] return { "history": self.history, "players": players, "player_cards": ["".join(str(c) for c in self.player_cards(i)) for i in range(len(self.agents))], "board": "".join(str(c) for c in self.board_cards("river")), } def dump_data(self, path: Path | None = None): if self.saved: return if path is None: path = Path.cwd() / "shortdeck_arena_history.jsonl" with path.open("a", encoding="utf-8") as f: f.write(json.dumps(self.to_save_data())) f.write("\n") self.saved = True def evaluate_player_hand(self, pid: int) -> Optional[HandRanking]: if pid >= len(self.agents): return None if self.player_states[pid] == PlayerState.FOLDED: return None try: player_cards = self.player_cards(pid) board_cards = self.board_cards(self.current_stage.value) all_cards = player_cards + board_cards if len(all_cards) < 5: return None if len(all_cards) == 5: return HandEvaluator.evaluate5Cards(all_cards) return HandEvaluator.evaluateHand(all_cards) except Exception as e: print(f"评估玩家 {pid} 手牌时出错: {e}") return None def get_active_players(self) -> List[int]: return [i for i, state in enumerate(self.player_states) if state not in [PlayerState.FOLDED]] def is_hand_complete(self) -> bool: active_players = self.get_active_players() if len(active_players) <= 1: return True # 到达河牌且所有下注完成 if (self.current_stage == GameStage.FINISHED or (self.current_stage == GameStage.RIVER and self.betting_round_complete)): return True return False def determine_winners(self) -> Dict[int, HandRanking]: active_players = self.get_active_players() if not active_players: return {} if len(active_players) == 1: return {active_players[0]: None} # 不需要摊牌 # 多人摊牌 hand_rankings = {} for pid in active_players: ranking = self.evaluate_player_hand(pid) if ranking is not None: hand_rankings[pid] = ranking return hand_rankings def distribute_pot(self) -> Dict[int, int]: winners = self.determine_winners() if not winners: return {} # 只有一人获胜(其他人弃牌) if len(winners) == 1 and list(winners.values())[0] is None: winner_id = list(winners.keys())[0] return {winner_id: self.total_pot} # 多人摊牌 if len(winners) > 1: hand_strengths = {} for pid, ranking in winners.items(): if ranking is not None: hand_strengths[pid] = ranking.get_strength() else: hand_strengths[pid] = 0 # 弃牌玩家 return self.side_pot_manager.distribute_winnings(hand_strengths) return {} def complete_hand(self) -> Dict: if not self.is_hand_complete(): return {"complete": False, "message": "牌局未结束"} if self.hand_completed: return { "complete": True, "winners": [], "winnings": {}, "final_stacks": self.stacks.copy(), "showdown_hands": {}, "message": "手牌已完成" } winners = self.determine_winners() winnings = self.distribute_pot() for pid, amount in winnings.items(): if pid < len(self.stacks): self.stacks[pid] += amount self.total_pot = 0 self.pot = [0] * len(self.agents) self.side_pot_manager.reset() self.current_stage = GameStage.FINISHED self.hand_completed = True result = { "complete": True, "winners": list(winners.keys()), "winnings": winnings, "final_stacks": self.stacks.copy(), "showdown_hands": {} } active_players = [i for i, state in enumerate(self.player_states) if state != PlayerState.FOLDED] for pid in active_players: player_hand = self.player_cards(pid) if len(player_hand) >= 2: evaluator = HandEvaluator() board_cards = self.board_cards() ranking = evaluator.evaluate(player_hand, board_cards) result["showdown_hands"][pid] = { "cards": [str(card) for card in player_hand], "hand_type": ranking.hand_type.type_name if ranking else "无牌型", "description": str(ranking) if ranking else "无效手牌", "is_winner": pid in winners } return result