Files
shortdeck/shortdeck_arena/simulation.py
2025-10-11 18:24:24 +08:00

627 lines
22 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
from __future__ import annotations
import json
import random
from pathlib import Path
from typing import List, Dict, Optional
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from .agent import Agent
from .card import Card
from .game_stage import GameStage, PlayerState, BlindConfig
from .side_pot import SidePotManager
from .hand_evaluator import HandEvaluator
from .hand_ranking import HandRanking
class Simulation:
def __init__(self, agents: List[Agent], blind_config: Optional[BlindConfig] = None):
    """Create a table for ``agents`` and start the first hand.

    Args:
        agents: Seated agents; the list index is used as the player id.
        blind_config: Blind settings. A default ``BlindConfig()`` is built
            when omitted (or when a falsy value is passed).
    """
    seat_count = len(agents)

    self.agents = agents
    self.history: List[Dict] = []
    self.cards: List[Card] = []
    self.saved = False

    # Game-state tracking.
    self.current_stage = GameStage.PREFLOP
    self.player_states: List[PlayerState] = [PlayerState.ACTIVE] * seat_count
    self.current_turn = 0
    self.betting_round_complete = False

    # Blind configuration.
    self.blind_config = blind_config or BlindConfig()

    # Chip and pot bookkeeping; ``pot`` holds the chips each seat has put in.
    self.pot: List[int] = [0] * seat_count
    self.total_pot = 0
    self.last_raise_amount = 0
    self.min_raise = self.blind_config.big_blind
    self.dealer_position = -1

    # Side-pot manager and stacks (every seat starts with 1000 chips).
    self.side_pot_manager = SidePotManager()
    self.stacks: List[int] = [1000] * seat_count

    # Set once the hand has been settled.
    self.hand_completed = False

    self.new_round()
def new_round(self):
    """Reset all per-hand state, choose the dealer and post the blinds.

    Stacks are intentionally left untouched so chips carry over between
    hands. The dealer seat is chosen *before* the blinds are posted: the
    blind seats and the first player to act are derived from
    ``dealer_position``, and later checks (for example the big-blind option)
    recompute those seats from the same attribute, so it must not change
    after the blinds are in.
    """
    num_players = len(self.agents)

    self.history = []
    self.cards = Card.all_short()
    random.shuffle(self.cards)
    self.saved = False

    # Reset game state.
    self.current_stage = GameStage.PREFLOP
    self.player_states = [PlayerState.ACTIVE] * num_players
    self.betting_round_complete = False
    self.hand_completed = False

    # Reset betting state.
    self.pot = [0] * num_players
    self.total_pot = 0
    self.last_raise_amount = 0
    self.min_raise = self.blind_config.big_blind

    self.side_pot_manager.reset()

    # Random dealer seat; skipped for an empty table, where random.choice
    # would raise on an empty range.
    if num_players:
        self.dealer_position = random.choice(range(num_players))

    self._setup_blinds()
def _setup_blinds(self):
    """Post the small and big blind and set the first player to act.

    With fewer than two players, or when the configured blind seats fall
    outside the table, no blinds are posted and the turn is set to seat 0.
    A blind is capped at the poster's remaining stack.
    """
    num_players = len(self.agents)
    if num_players < 2:
        # At least two players are needed to post blinds.
        self.current_turn = 0
        return

    config = self.blind_config
    sb_pos = config.get_sb_position(num_players, self.dealer_position)
    bb_pos = config.get_bb_position(num_players, self.dealer_position)
    if not (sb_pos < num_players and bb_pos < num_players):
        self.current_turn = 0
        return

    def post(seat, blind, label):
        # Move up to ``blind`` chips from the seat's stack into the pot.
        posted = min(blind, self.stacks[seat])
        self.pot[seat] = posted
        self.stacks[seat] -= posted
        self.total_pot += posted
        self.side_pot_manager.add_investment(seat, posted)
        self.history.append({"pid": seat, "action": label, "amount": posted})
        # NOTE(review): a seat whose whole stack goes into the blind keeps
        # its current player state here - confirm that is intended.

    post(sb_pos, config.small_blind, "small_blind")
    post(bb_pos, config.big_blind, "big_blind")

    self.current_turn = config.get_first_to_act(
        self.current_stage, num_players, self.dealer_position
    )
    self.last_raise_amount = config.big_blind
def player_cards(self, pid) -> List[Card]:
    """Return the two hole cards of seat ``pid``.

    Hole cards sit at the front of the shuffled deck, two per seat.
    """
    first = pid * 2
    return self.cards[first:first + 2]
def board_cards(self, street) -> List[Card]:
    """Return the community cards visible on ``street``.

    Board cards follow the hole cards in the deck. ``"flop"``, ``"turn"``
    and ``"river"`` yield 3, 4 and 5 cards; any other value yields ``[]``.
    """
    board_start = len(self.agents) * 2
    for name, visible in (("flop", 3), ("turn", 4), ("river", 5)):
        if street == name:
            return self.cards[board_start:board_start + visible]
    return []
def get_current_max_bet(self) -> int:
    """Return the largest amount any seat has put into ``pot`` (0 if empty)."""
    if not self.pot:
        return 0
    return max(self.pot)
def get_call_amount(self, pid) -> int:
    """Return how many chips seat ``pid`` must add to match the highest bet.

    Returns 0 when ``pid`` is past the end of ``pot`` or nothing is owed.
    """
    if pid >= len(self.pot):
        return 0
    shortfall = self.get_current_max_bet() - self.pot[pid]
    return shortfall if shortfall > 0 else 0
def get_min_raise_amount(self, pid) -> int:
    """Return the smallest legal raise for seat ``pid``, in chips to add.

    This is the call amount plus the larger of the last raise and the
    big blind.
    """
    to_call = self.get_call_amount(pid)
    increment = self.last_raise_amount
    big_blind = self.blind_config.big_blind
    if big_blind > increment:
        increment = big_blind
    return to_call + increment
def get_max_bet_amount(self, pid) -> int:
    """Return the most seat ``pid`` can wager: its stack, or 0 if out of range."""
    stacks = self.stacks
    return stacks[pid] if pid < len(stacks) else 0
def is_all_in_amount(self, pid, amount) -> bool:
    """Return True when ``amount`` would use up seat ``pid``'s whole stack."""
    remaining = self.stacks[pid]
    return remaining <= amount
def validate_bet_amount(self, pid, action, amount) -> tuple[bool, str, int]:
    """Check whether ``action`` with ``amount`` is legal for seat ``pid``.

    Returns:
        ``(is_valid, error_message, adjusted_amount)``. For valid actions
        the adjusted amount is the number of chips to move: 0 for
        fold/check, the call amount capped at the stack for a call, the
        whole stack for an all-in, and the requested amount capped at the
        stack for a bet or raise.
    """
    if pid >= len(self.stacks):
        return False, "无效玩家", amount

    available_stack = self.stacks[pid]
    call_amount = self.get_call_amount(pid)

    if action == "fold":
        return True, "", 0

    if action == "check":
        if call_amount > 0:
            return False, "不能过牌,需跟注或弃牌", 0
        return True, "", 0

    if action == "call":
        if call_amount == 0:
            return False, "不需要跟注", 0
        # A call larger than the stack becomes an all-in for the stack.
        return True, "", min(call_amount, available_stack)

    if action == "allin":
        if available_stack <= 0:
            return False, "没有筹码进行 all-in", 0
        return True, "", available_stack

    if action not in ("bet", "raise"):
        return False, "无效行为", amount

    if amount <= 0:
        return False, "无效下注金额", amount

    # Wagering the stack or more is always accepted as an all-in.
    if amount >= available_stack:
        return True, "", available_stack

    if action == "raise":
        min_raise = self.get_min_raise_amount(pid)
        if amount < min_raise:
            return False, f"最小加注金额为 {min_raise}", amount

    # An opening bet (nobody has chips in) must be at least one big blind.
    opening_bet = action == "bet" and max(self.pot) == 0
    if opening_bet and amount < self.blind_config.big_blind:
        return False, f"最小下注金额为 {self.blind_config.big_blind}", amount

    return True, "", amount
def get_available_actions(self, pid: int) -> dict:
    """Describe what seat ``pid`` may do right now.

    Returns ``{"can_act": False, "reason": ...}`` when the seat cannot act
    (hand over, showdown, not its turn, unknown seat, folded or all-in).
    Otherwise returns a dict of ``can_*`` flags plus the relevant amounts.
    """
    stage = self.current_stage
    if stage == GameStage.FINISHED:
        return {"can_act": False, "reason": "游戏已结束"}
    if stage == GameStage.SHOWDOWN:
        return {"can_act": False, "reason": "摊牌阶段"}

    if pid != self.current_turn:
        return {"can_act": False, "reason": "不是你的回合"}
    if pid >= len(self.player_states):
        return {"can_act": False, "reason": "无效玩家"}

    state = self.player_states[pid]
    if state == PlayerState.FOLDED or state == PlayerState.ALLIN:
        return {"can_act": False, "reason": f"Player state: {state}"}

    call_amount = self.get_call_amount(pid)
    available_stack = self.stacks[pid]
    facing_bet = call_amount > 0
    has_chips = available_stack > 0

    actions = {"can_act": True, "can_fold": True}
    actions["can_check"] = call_amount == 0
    # A call that would take the whole stack is offered as all-in instead.
    actions["can_call"] = facing_bet and call_amount < available_stack
    actions["can_bet"] = call_amount == 0 and has_chips
    actions["can_raise"] = facing_bet and available_stack > call_amount
    actions["can_allin"] = has_chips
    actions["call_amount"] = call_amount
    actions["min_bet"] = self.blind_config.big_blind if max(self.pot) == 0 else 0
    actions["min_raise"] = self.get_min_raise_amount(pid) if facing_bet else 0
    actions["max_bet"] = available_stack
    actions["stack"] = available_stack
    return actions
def is_betting_round_complete(self) -> bool:
    """Return True when nobody is left to act in the current betting round.

    The round is over when at most one player has not folded, when every
    remaining player is all-in or folded, or when no player still owes an
    action. A player owes an action while its state is ``ACTIVE``: that is
    the state a seat holds at the start of a street and the state it is put
    back into when someone else bets or raises. It also owes one if it has
    acted (``CALLED``) but has fewer chips in than the current maximum.

    Treating every ``ACTIVE`` seat as pending lets each player act on
    post-flop streets, where all contributions are already level when the
    street begins. It also covers the pre-flop big-blind option, because
    the big blind stays ``ACTIVE`` until it acts.
    """
    states = self.player_states

    # Everyone but (at most) one player has folded.
    remaining = sum(1 for state in states if state != PlayerState.FOLDED)
    if remaining <= 1:
        return True

    # Nobody can put more chips in.
    if all(state in (PlayerState.ALLIN, PlayerState.FOLDED) for state in states):
        return True

    max_pot = self.get_current_max_bet()
    for seat, state in enumerate(states):
        if state == PlayerState.ACTIVE:
            # Has not acted since the street began or since the last raise.
            return False
        if state == PlayerState.CALLED and self.pot[seat] < max_pot:
            # Acted earlier but has not matched the current bet.
            return False
    return True
def advance_to_next_street(self):
    """Move the hand to the next street, or settle it when betting is over.

    The hand is finished (and ``complete_hand`` is called) when there is no
    next stage, when the next stage is the showdown, or when at most one
    player has not folded.

    Otherwise the betting state is reset for the new street and the turn
    goes to the first seat, starting from the configured first-to-act
    position, that can actually act. When fewer than two players are able
    to bet (the others are all-in), no betting is possible, so the
    remaining streets are skipped and the hand is settled.
    """
    num_players = len(self.agents)

    while self.current_stage != GameStage.FINISHED:
        next_stage = GameStage.get_next_stage(self.current_stage)
        hand_over = (
            next_stage is None
            or next_stage == GameStage.SHOWDOWN
            or len(self.get_active_players()) <= 1
        )
        if hand_over:
            self.current_stage = GameStage.FINISHED
            self.complete_hand()
            return

        self.current_stage = next_stage

        # Reset the betting round; players who had acted must act again.
        self.betting_round_complete = False
        for seat, state in enumerate(self.player_states):
            if state == PlayerState.CALLED:
                self.player_states[seat] = PlayerState.ACTIVE
        self.last_raise_amount = 0
        self.min_raise = self.blind_config.big_blind

        able_to_bet = sum(
            1 for state in self.player_states if state == PlayerState.ACTIVE
        )
        if able_to_bet >= 2:
            first = self.blind_config.get_first_to_act(
                self.current_stage, num_players, self.dealer_position
            )
            # The configured seat may have folded or be all-in; skip ahead
            # to the next seat that can act.
            self.current_turn = self.get_next_active_player(first)
            return

        # Nobody to bet against: this street has no action, keep dealing.
        self.betting_round_complete = True
def get_next_active_player(self, start_pos) -> Optional[int]:
    """Return the first ACTIVE seat at or after ``start_pos``, wrapping around.

    Only ACTIVE players can act; all-in and folded players are skipped.
    Returns None when no seat is ACTIVE.
    """
    seat_count = len(self.agents)
    seats_in_order = ((start_pos + step) % seat_count for step in range(seat_count))
    return next(
        (seat for seat in seats_in_order
         if self.player_states[seat] == PlayerState.ACTIVE),
        None,
    )
def get_side_pots(self) -> List:
    """Ask the side-pot manager to build side pots for all non-folded seats."""
    contenders = []
    for seat, state in enumerate(self.player_states):
        if state not in [PlayerState.FOLDED]:
            contenders.append(seat)
    return self.side_pot_manager.create_side_pots(contenders)
def node_info(self) -> Dict:
    """Return the betting limits for the player whose turn it is.

    The result has ``bet_min``, ``bet_max`` and ``call_amount``. When the
    current turn is out of range the maximum bet and call amount are 0.
    Keys missing from the available-actions dict (for example when the
    player cannot act) fall back to ``min_raise``, 100 and 0 respectively.
    """
    turn = self.current_turn
    if turn >= len(self.pot):
        return {"bet_min": self.min_raise, "bet_max": 0, "call_amount": 0}

    actions = self.get_available_actions(turn)
    bet_min = actions.get("min_bet", self.min_raise)
    bet_max = actions.get("max_bet", 100)
    to_call = actions.get("call_amount", 0)
    return {"bet_min": bet_min, "bet_max": bet_max, "call_amount": to_call}
def apply_action(self, pid, action, amount):
    """Apply one player action and advance the turn.

    Args:
        pid: Seat performing the action; must be the seat whose turn it is.
        action: One of ``fold``, ``check``, ``call``, ``allin``, ``bet`` or
            ``raise`` (case-insensitive).
        amount: Chips to add for ``bet``/``raise``; ignored otherwise.
            ``None`` is treated as 0.

    Raises:
        ValueError: If it is not ``pid``'s turn, the player is not ACTIVE,
            or ``validate_bet_amount`` rejects the action.

    The amount owed is captured *before* any chips move, so the raise
    increment is the wager minus the call amount. Only a wager that exceeds
    the call updates ``last_raise_amount``/``min_raise`` and makes players
    who had already acted act again.
    """
    if pid != self.current_turn:
        raise ValueError(f"不是玩家 {pid} 的回合")
    if self.player_states[pid] != PlayerState.ACTIVE:
        raise ValueError(f"玩家 {pid} 无法行动,当前状态: {self.player_states[pid]}")

    action = action.lower()

    # Validation also normalises the amount (caps at the stack, fills in
    # the call / all-in size), so ``amount`` below is the chips to move.
    is_valid, error_msg, amount = self.validate_bet_amount(pid, action, amount or 0)
    if not is_valid:
        raise ValueError(error_msg)

    # Snapshot before chips move; both values change once the wager is in.
    call_amount = self.get_call_amount(pid)
    stack = self.stacks[pid]

    self.history.append({"pid": pid, "action": action, "amount": amount})

    if action == "fold":
        self.player_states[pid] = PlayerState.FOLDED
    elif action == "check":
        self.player_states[pid] = PlayerState.CALLED
    elif action in ("call", "allin", "bet", "raise"):
        chips = min(amount, stack)
        # Putting in the whole stack makes the player all-in.
        if chips >= stack:
            self.player_states[pid] = PlayerState.ALLIN
        else:
            self.player_states[pid] = PlayerState.CALLED

        self.pot[pid] += chips
        self.stacks[pid] -= chips
        self.total_pot += chips
        self.side_pot_manager.add_investment(pid, chips)

        # Anything beyond the call is a raise and re-opens the action.
        raise_amount = chips - call_amount
        if raise_amount > 0:
            self.last_raise_amount = raise_amount
            self.min_raise = raise_amount
            for seat, state in enumerate(self.player_states):
                if seat != pid and state == PlayerState.CALLED:
                    self.player_states[seat] = PlayerState.ACTIVE
    else:
        raise ValueError(f"未知动作: {action}")

    self._advance_turn()
def _advance_turn(self):
    """Pass the turn to the next ACTIVE player, or close the betting round.

    When the round is complete, or no ACTIVE player can be found, the round
    is flagged complete and the hand advances to the next street.
    """
    if not self.is_betting_round_complete():
        upcoming = self.get_next_active_player(self.current_turn + 1)
        if upcoming is not None:
            self.current_turn = upcoming
            return
        # Nobody left who can act: fall through and end the round.

    self.betting_round_complete = True
    self.advance_to_next_street()
def to_save_data(self) -> Dict:
    """Build the JSON-serialisable record of the current hand.

    The record has the action history, agent labels (``Agent<pid>``), each
    seat's hole cards joined into one string, and the river board joined
    into one string.
    """
    players = ["Agent" + format(agent.pid) for agent in self.agents]

    hole_cards = []
    for seat in range(len(self.agents)):
        hole_cards.append("".join(map(str, self.player_cards(seat))))

    board = "".join(map(str, self.board_cards("river")))

    return {
        "history": self.history,
        "players": players,
        "player_cards": hole_cards,
        "board": board,
    }
def dump_data(self, path: Path | None = None):
    """Append the current hand as one JSON line to ``path``, at most once.

    Does nothing if this hand was already saved. Without ``path`` the
    record goes to ``shortdeck_arena_history.jsonl`` in the current working
    directory.
    """
    if self.saved:
        return

    target = path
    if target is None:
        target = Path.cwd() / "shortdeck_arena_history.jsonl"

    with target.open("a", encoding="utf-8") as out:
        out.write(json.dumps(self.to_save_data()) + "\n")

    self.saved = True
def evaluate_player_hand(self, pid: int) -> Optional[HandRanking]:
    """Rank seat ``pid``'s best hand using the board that is currently out.

    Returns None for an unknown seat, a folded player, fewer than five
    cards in total, or when the evaluator raises (the error is printed and
    swallowed so one bad hand does not abort settlement).

    Once the hand has reached SHOWDOWN or FINISHED the full river board is
    used. Settlement runs with the stage already set to FINISHED, and
    ``board_cards`` returns no cards for any street name other than
    flop/turn/river.
    """
    if pid >= len(self.agents):
        return None
    if self.player_states[pid] == PlayerState.FOLDED:
        return None

    try:
        if self.current_stage in (GameStage.SHOWDOWN, GameStage.FINISHED):
            street = "river"
        else:
            # Assumes the stage value is the street name that board_cards
            # expects ("flop"/"turn"/"river") - TODO confirm in GameStage.
            street = self.current_stage.value

        all_cards = self.player_cards(pid) + self.board_cards(street)
        if len(all_cards) < 5:
            return None
        if len(all_cards) == 5:
            return HandEvaluator.evaluate5Cards(all_cards)
        return HandEvaluator.evaluateHand(all_cards)
    except Exception as e:
        # Deliberate best-effort: report and treat the hand as unrankable.
        print(f"评估玩家 {pid} 手牌时出错: {e}")
        return None
def get_active_players(self) -> List[int]:
    """Return the seats that have not folded (all-in players included)."""
    still_in = []
    for seat, state in enumerate(self.player_states):
        if state not in [PlayerState.FOLDED]:
            still_in.append(seat)
    return still_in
def is_hand_complete(self) -> bool:
    """Return True when the hand can be settled.

    That is the case when at most one player has not folded, when the stage
    is FINISHED, or when the river betting round has completed.
    """
    if len(self.get_active_players()) <= 1:
        return True

    stage = self.current_stage
    if stage == GameStage.FINISHED:
        return True
    # River reached and all betting done.
    return bool(stage == GameStage.RIVER and self.betting_round_complete)
def determine_winners(self) -> Dict[int, HandRanking]:
    """Return the hand ranking of every player still contesting the pot.

    Despite the name, this maps each non-folded seat to its ranking and does
    not pick a winner. A lone remaining player maps to None (no showdown
    needed). Players whose hand cannot be evaluated are left out.
    """
    contenders = self.get_active_players()
    if not contenders:
        return {}
    if len(contenders) == 1:
        # Uncontested: no showdown required.
        return {contenders[0]: None}

    # Showdown between several players.
    evaluated = ((seat, self.evaluate_player_hand(seat)) for seat in contenders)
    return {seat: ranking for seat, ranking in evaluated if ranking is not None}
def distribute_pot(self) -> Dict[int, int]:
    """Compute each seat's winnings for the finished hand.

    Returns:
        A mapping of seat to chips won. It is empty when nobody is left to
        pay. An uncontested winner (everyone else folded) receives the
        whole ``total_pot``. Otherwise the side-pot manager splits the
        chips by hand strength.

    A single contender that does carry a real ranking (for example because
    the other hands could not be evaluated) is also paid through the
    side-pot manager, so the pot is never dropped.
    """
    winners = self.determine_winners()
    if not winners:
        return {}

    if len(winners) == 1:
        (winner_id, ranking), = winners.items()
        if ranking is None:
            # Everyone else folded: no showdown, the whole pot goes to them.
            return {winner_id: self.total_pot}

    # Showdown: a missing ranking counts as the weakest possible hand.
    hand_strengths = {
        pid: (ranking.get_strength() if ranking is not None else 0)
        for pid, ranking in winners.items()
    }
    return self.side_pot_manager.distribute_winnings(hand_strengths)
def complete_hand(self) -> Dict:
    """Settle the hand: pay out the pot, reset pot state and build a summary.

    Returns:
        ``{"complete": False, "message": ...}`` if the hand cannot be
        settled yet. Otherwise a dict with ``complete``, ``winners`` (seats
        that actually received chips), ``winnings``, ``final_stacks`` and
        ``showdown_hands`` (one entry per non-folded seat). Calling it again
        after settlement returns an empty summary with the current stacks
        and pays nothing a second time.
    """
    if not self.is_hand_complete():
        return {"complete": False, "message": "牌局未结束"}

    if self.hand_completed:
        return {
            "complete": True,
            "winners": [],
            "winnings": {},
            "final_stacks": self.stacks.copy(),
            "showdown_hands": {},
            "message": "手牌已完成"
        }

    winnings = self.distribute_pot()
    for pid, amount in winnings.items():
        if pid < len(self.stacks):
            self.stacks[pid] += amount

    # A winner is a seat that was actually paid, not merely a seat that
    # reached showdown.
    winners = [pid for pid, amount in winnings.items() if amount > 0]

    self.total_pot = 0
    self.pot = [0] * len(self.agents)
    self.side_pot_manager.reset()
    self.current_stage = GameStage.FINISHED
    self.hand_completed = True

    result = {
        "complete": True,
        "winners": winners,
        "winnings": winnings,
        "final_stacks": self.stacks.copy(),
        "showdown_hands": {}
    }

    # Hands are shown against the full five-card board.
    board_cards = self.board_cards("river")
    for pid, state in enumerate(self.player_states):
        if state == PlayerState.FOLDED:
            continue
        player_hand = self.player_cards(pid)
        if len(player_hand) >= 2:
            # NOTE(review): other methods call the static
            # HandEvaluator.evaluateHand(cards); confirm that the instance
            # method evaluate(hole, board) exists with this signature.
            evaluator = HandEvaluator()
            ranking = evaluator.evaluate(player_hand, board_cards)
            result["showdown_hands"][pid] = {
                "cards": [str(card) for card in player_hand],
                "hand_type": ranking.hand_type.type_name if ranking else "无牌型",
                "description": str(ranking) if ranking else "无效手牌",
                "is_winner": pid in winners
            }
    return result