Files
shortdeck/shortdeck_arena/simulation.py
2025-10-09 15:28:34 +08:00

570 lines
20 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
from __future__ import annotations
import json
import random
from pathlib import Path
from typing import List, Dict, Optional
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from .agent import Agent
from .card import Card
from .game_stage import GameStage, PlayerState, BlindConfig
from .side_pot import SidePotManager
from .hand_evaluator import HandEvaluator
from .hand_ranking import HandRanking
class Simulation:
    """Run one short-deck poker hand at a time for a fixed list of agents.

    Seats are the indices of ``agents``.  The object owns the shuffled deck,
    every seat's stack and chip contribution, the current betting stage and
    the seat whose turn it is.

    Seat states, as this class uses them:

    * ``PlayerState.ACTIVE`` - the seat still owes an action in the current
      betting round (set at the start of a hand or street, and again for
      seats that already acted when somebody raises).
    * ``PlayerState.CALLED`` - the seat has acted (check, call, bet or raise)
      and did not commit its whole stack.
    * ``PlayerState.ALLIN`` / ``FOLDED`` / ``OUT`` - the seat cannot act.
    """

    def __init__(
        self,
        agents: List[Agent],
        blind_config: Optional[BlindConfig] = None,
        starting_stack: int = 1000,
    ):
        """Create the table and immediately deal the first hand.

        Args:
            agents: One agent per seat.
            blind_config: Blind sizes and seat rules; a default
                ``BlindConfig()`` is used when omitted.
            starting_stack: Chips every seat starts with.
        """
        self.agents = agents
        self.history: List[Dict] = []
        self.cards: List[Card] = []
        self.saved = False
        # Game state.
        self.current_stage = GameStage.PREFLOP
        self.player_states: List[PlayerState] = [PlayerState.ACTIVE] * len(agents)
        self.current_turn = 0
        self.betting_round_complete = False
        # Blind configuration.
        self.blind_config = blind_config or BlindConfig()
        # Chips committed by each seat during the current hand.  The list is
        # only zeroed by new_round(), not between streets.
        self.pot: List[int] = [0] * len(agents)
        self.total_pot = 0
        self.last_raise_amount = 0
        self.min_raise = self.blind_config.big_blind
        self.dealer_position = -1
        # Side pots and stacks.
        self.side_pot_manager = SidePotManager()
        self.stacks: List[int] = [starting_stack] * len(agents)
        # Cached settlement of the current hand (see complete_hand()).
        self._hand_result: Optional[Dict] = None
        self.new_round()

    def new_round(self):
        """Shuffle a fresh deck, reset per-hand state and post the blinds."""
        self.history = []
        self.cards = Card.all_short()
        random.shuffle(self.cards)
        self.saved = False
        self._hand_result = None

        seat_count = len(self.agents)
        self.current_stage = GameStage.PREFLOP
        self.player_states = [PlayerState.ACTIVE] * seat_count
        self.betting_round_complete = False

        self.pot = [0] * seat_count
        self.total_pot = 0
        self.last_raise_amount = 0
        self.min_raise = self.blind_config.big_blind
        self.side_pot_manager.reset()

        # The dealer must be known BEFORE the blinds are posted: the blind
        # seats and the first seat to act are all derived from it.
        if seat_count:
            self.dealer_position = random.randrange(seat_count)
        self._setup_blinds()

    def _move_chips_to_pot(self, pid: int, chips: int) -> None:
        """Transfer ``chips`` from a seat's stack into the pot bookkeeping."""
        self.pot[pid] += chips
        self.stacks[pid] -= chips
        self.total_pot += chips
        self.side_pot_manager.add_investment(pid, chips)

    def _post_blind(self, pid: int, blind: int, label: str) -> None:
        """Post a forced bet (capped at the seat's stack) and log it."""
        amount = min(blind, self.stacks[pid])
        self._move_chips_to_pot(pid, amount)
        self.history.append({"pid": pid, "action": label, "amount": amount})

    def _setup_blinds(self):
        """Post small and big blind and select the first seat to act.

        With fewer than two seats, or when the blind configuration returns a
        seat index outside the table, no blinds are posted and the turn is
        given to seat 0.
        """
        num_players = len(self.agents)
        if num_players < 2:
            self.current_turn = 0
            return
        sb_pos = self.blind_config.get_sb_position(num_players, self.dealer_position)
        bb_pos = self.blind_config.get_bb_position(num_players, self.dealer_position)
        if sb_pos >= num_players or bb_pos >= num_players:
            self.current_turn = 0
            return
        self._post_blind(sb_pos, self.blind_config.small_blind, "small_blind")
        self._post_blind(bb_pos, self.blind_config.big_blind, "big_blind")
        self.current_turn = self.blind_config.get_first_to_act(
            self.current_stage, num_players, self.dealer_position
        )
        self.last_raise_amount = self.blind_config.big_blind

    def player_cards(self, pid) -> List[Card]:
        """Return the two hole cards of seat ``pid`` (deck slice 2*pid..)."""
        first = pid * 2
        return self.cards[first:first + 2]

    def board_cards(self, street) -> List[Card]:
        """Return the community cards visible on ``street``.

        ``"flop"``, ``"turn"`` and ``"river"`` give 3, 4 and 5 cards taken
        from the deck right after all hole cards; anything else gives ``[]``.
        """
        visible = {"flop": 3, "turn": 4, "river": 5}
        if street not in visible:
            return []
        idx_start = len(self.agents) * 2
        return self.cards[idx_start:idx_start + visible[street]]

    def get_current_max_bet(self) -> int:
        """Return the largest contribution of any seat (0 for no seats)."""
        return max(self.pot) if self.pot else 0

    def get_call_amount(self, pid) -> int:
        """Return the chips seat ``pid`` must add to match the largest bet."""
        if pid >= len(self.pot):
            return 0
        return max(0, self.get_current_max_bet() - self.pot[pid])

    def get_min_raise_amount(self, pid) -> int:
        """Return the minimum chips for a raise: call + last raise (>= BB)."""
        increment = max(self.last_raise_amount, self.blind_config.big_blind)
        return self.get_call_amount(pid) + increment

    def get_max_bet_amount(self, pid) -> int:
        """Return the most seat ``pid`` can bet, i.e. its remaining stack."""
        if pid >= len(self.stacks):
            return 0
        return self.stacks[pid]

    def is_all_in_amount(self, pid, amount) -> bool:
        """Return True when ``amount`` would commit the seat's whole stack."""
        return amount >= self.stacks[pid]

    def validate_bet_amount(self, pid, action, amount) -> tuple[bool, str, int]:
        """Check an action and normalise its chip amount.

        Returns:
            ``(ok, error_message, amount)``.  On success ``amount`` is the
            number of chips to move: 0 for fold/check, the call amount (or
            the whole stack when short) for call, and the requested amount
            capped at the stack for bet/raise.
        """
        if pid >= len(self.stacks):
            return False, "无效玩家", amount
        available_stack = self.stacks[pid]
        call_amount = self.get_call_amount(pid)

        if action == "fold":
            return True, "", 0
        if action == "check":
            if call_amount > 0:
                return False, "不能过牌,需跟注或弃牌", 0
            return True, "", 0
        if action == "call":
            if call_amount == 0:
                return False, "不需要跟注", 0
            # A call the stack cannot cover becomes an all-in call.
            return True, "", min(call_amount, available_stack)
        if action in ("bet", "raise"):
            if amount <= 0:
                return False, "无效下注金额", amount
            # Committing the whole stack is always allowed.
            if amount >= available_stack:
                return True, "", available_stack
            if action == "raise":
                min_raise = self.get_min_raise_amount(pid)
                if amount < min_raise:
                    return False, f"最小加注金额为 {min_raise}", amount
            if action == "bet" and self.get_current_max_bet() == 0:
                if amount < self.blind_config.big_blind:
                    return False, f"最小下注金额为 {self.blind_config.big_blind}", amount
            return True, "", amount
        return False, "无效行为", amount

    def get_available_actions(self, pid: int) -> dict:
        """Describe what seat ``pid`` may do right now.

        Returns a dict with ``can_act``; when it is False a ``reason`` is
        included, otherwise the ``can_*`` flags and the relevant chip limits.
        """
        if pid != self.current_turn:
            return {"can_act": False, "reason": "不是你的回合"}
        if pid >= len(self.player_states):
            return {"can_act": False, "reason": "无效玩家"}
        state = self.player_states[pid]
        if state in (PlayerState.FOLDED, PlayerState.ALLIN, PlayerState.OUT):
            return {"can_act": False, "reason": f"Player state: {state}"}

        call_amount = self.get_call_amount(pid)
        available_stack = self.stacks[pid]
        nothing_bet = self.get_current_max_bet() == 0
        facing_bet = call_amount > 0
        return {
            "can_act": True,
            "can_fold": True,
            "can_check": not facing_bet,
            "can_call": facing_bet and call_amount < available_stack,
            "can_bet": nothing_bet and available_stack > 0,
            "can_raise": facing_bet and available_stack > call_amount,
            "can_allin": available_stack > 0,
            "call_amount": call_amount,
            "min_bet": self.blind_config.big_blind if nothing_bet else 0,
            "min_raise": self.get_min_raise_amount(pid) if facing_bet else 0,
            "max_bet": available_stack,
            "stack": available_stack,
        }

    def is_betting_round_complete(self) -> bool:
        """Return True when nobody still owes an action this round.

        The round is over when at most one seat is left in the hand, or when
        every seat that can still act (ACTIVE or CALLED) has matched the
        largest bet and no ACTIVE seat has an opponent it could bet against.
        All-in seats never block the round, but a seat facing an all-in still
        gets to call or fold.
        """
        if len(self.get_active_players()) <= 1:
            return True
        able_to_act = [
            seat for seat, state in enumerate(self.player_states)
            if state in (PlayerState.ACTIVE, PlayerState.CALLED)
        ]
        target = self.get_current_max_bet()
        for seat in able_to_act:
            if self.pot[seat] < target:
                return False
            # ACTIVE means "has not acted since the round opened or was
            # re-opened"; that seat keeps its option (e.g. the big blind
            # pre-flop, or anyone after the flop) unless it is the only seat
            # left that could put chips in.
            if self.player_states[seat] == PlayerState.ACTIVE and len(able_to_act) > 1:
                return False
        return True

    def advance_to_next_street(self):
        """Move to the next stage, settling the hand when it is over.

        When a new betting round opens, seats that acted are reset to ACTIVE
        and the turn goes to the first seat at or after the configured
        first-to-act position that can still act.  If no betting is possible
        (e.g. everybody is all-in) the remaining streets are run out
        automatically.
        """
        if self.current_stage == GameStage.FINISHED:
            return
        next_stage = GameStage.get_next_stage(self.current_stage)
        if next_stage is None or next_stage == GameStage.FINISHED:
            self.current_stage = GameStage.FINISHED
            self.complete_hand()
            return
        self.current_stage = next_stage
        if len(self.get_active_players()) <= 1:
            self.current_stage = GameStage.FINISHED
            self.complete_hand()
            return

        self.betting_round_complete = False
        for seat, state in enumerate(self.player_states):
            if state == PlayerState.CALLED:
                self.player_states[seat] = PlayerState.ACTIVE

        first_seat = self.blind_config.get_first_to_act(
            self.current_stage, len(self.agents), self.dealer_position
        )
        # The nominal first seat may have folded or be all-in.
        actor = self.get_next_active_player(first_seat)
        self.current_turn = first_seat if actor is None else actor
        self.last_raise_amount = 0
        self.min_raise = self.blind_config.big_blind

        if self.is_betting_round_complete():
            self.betting_round_complete = True
            self.advance_to_next_street()

    def get_next_active_player(self, start_pos) -> Optional[int]:
        """Return the first ACTIVE seat at or after ``start_pos`` (wrapping)."""
        seat_count = len(self.agents)
        for offset in range(seat_count):
            seat = (start_pos + offset) % seat_count
            if self.player_states[seat] == PlayerState.ACTIVE:
                return seat
        return None

    def get_side_pots(self) -> List:
        """Ask the side-pot manager for the pots among non-folded seats."""
        return self.side_pot_manager.create_side_pots(self.get_active_players())

    def node_info(self) -> Dict:
        """Return bet limits and call amount for the seat whose turn it is."""
        if self.current_turn >= len(self.pot):
            return {"bet_min": self.min_raise, "bet_max": 0, "call_amount": 0}
        actions = self.get_available_actions(self.current_turn)
        return {
            "bet_min": actions.get("min_bet", self.min_raise),
            "bet_max": actions.get("max_bet", 100),
            "call_amount": actions.get("call_amount", 0),
        }

    def apply_action(self, pid, action, amount):
        """Apply one action for the seat whose turn it is and move the turn.

        Args:
            pid: Acting seat; must equal ``current_turn`` and be ACTIVE.
            action: ``fold``, ``check``, ``call``, ``bet`` or ``raise``
                (case-insensitive).
            amount: Chips for bet/raise; ignored otherwise, ``None`` allowed.

        Raises:
            ValueError: Wrong seat, seat unable to act, or an action rejected
                by ``validate_bet_amount``.
        """
        if pid != self.current_turn:
            raise ValueError(f"不是玩家 {pid} 的回合")
        if self.player_states[pid] != PlayerState.ACTIVE:
            raise ValueError(f"玩家 {pid} 无法行动,当前状态: {self.player_states[pid]}")
        action = action.lower()
        is_valid, error_msg, adjusted_amount = self.validate_bet_amount(pid, action, amount or 0)
        if not is_valid:
            raise ValueError(error_msg)
        amount = adjusted_amount

        # Measured before any chips move; afterwards the seat's own bet
        # would already be part of the maximum.
        to_call = self.get_call_amount(pid)
        self.history.append({"pid": pid, "action": action, "amount": amount})

        if action == "fold":
            self.player_states[pid] = PlayerState.FOLDED
        elif action == "check":
            self.player_states[pid] = PlayerState.CALLED
        elif action == "call":
            if to_call == 0:
                # Nothing to match: record the action as a check.
                self.history[-1]["action"] = "check"
                self.player_states[pid] = PlayerState.CALLED
            else:
                self._commit_bet(pid, min(to_call, self.stacks[pid]))
        elif action in ("bet", "raise"):
            chips = min(amount, self.stacks[pid])
            self._commit_bet(pid, chips)
            raise_amount = chips - to_call
            if raise_amount > 0:
                self.last_raise_amount = raise_amount
                self.min_raise = raise_amount
                # A real raise re-opens the action for seats that had acted.
                for seat, state in enumerate(self.player_states):
                    if seat != pid and state == PlayerState.CALLED:
                        self.player_states[seat] = PlayerState.ACTIVE
        else:
            raise ValueError(f"未知动作: {action}")
        self._advance_turn()

    def _commit_bet(self, pid: int, chips: int) -> None:
        """Mark the seat ALLIN or CALLED for ``chips`` and move the chips."""
        went_all_in = chips >= self.stacks[pid]
        self.player_states[pid] = PlayerState.ALLIN if went_all_in else PlayerState.CALLED
        self._move_chips_to_pot(pid, chips)

    def _advance_turn(self):
        """Pass the turn to the next ACTIVE seat or close the betting round."""
        if not self.is_betting_round_complete():
            next_player = self.get_next_active_player(self.current_turn + 1)
            if next_player is not None:
                self.current_turn = next_player
                return
        # Either the round is complete or nobody is left who can act.
        self.betting_round_complete = True
        self.advance_to_next_street()

    def to_save_data(self) -> Dict:
        """Build the JSON-serialisable record of the current hand."""
        seats = range(len(self.agents))
        return {
            "history": self.history,
            "players": [f"Agent{a.pid}" for a in self.agents],
            "player_cards": ["".join(str(c) for c in self.player_cards(i)) for i in seats],
            "board": "".join(str(c) for c in self.board_cards("river")),
        }

    def dump_data(self, path: Path | None = None):
        """Append the hand record as one JSON line; at most once per hand.

        Args:
            path: Target file; defaults to ``shortdeck_arena_history.jsonl``
                in the current working directory.
        """
        if self.saved:
            return
        if path is None:
            path = Path.cwd() / "shortdeck_arena_history.jsonl"
        with path.open("a", encoding="utf-8") as f:
            f.write(json.dumps(self.to_save_data()))
            f.write("\n")
        self.saved = True

    def evaluate_player_hand(self, pid: int) -> Optional[HandRanking]:
        """Evaluate seat ``pid``'s best hand on the current board.

        Returns ``None`` for an unknown or folded seat, when fewer than five
        cards are available, or when the evaluator raises (the error is
        printed).
        """
        if pid >= len(self.agents):
            return None
        if self.player_states[pid] == PlayerState.FOLDED:
            return None
        try:
            # board_cards() is keyed by street name; the original passed the
            # stage's ``value``.  Once the hand is FINISHED that value is not
            # a street, so fall back to the full five-card board.
            if self.current_stage == GameStage.FINISHED:
                street = "river"
            else:
                street = self.current_stage.value
            all_cards = self.player_cards(pid) + self.board_cards(street)
            if len(all_cards) < 5:
                return None
            if len(all_cards) == 5:
                return HandEvaluator.evaluate5Cards(all_cards)
            return HandEvaluator.evaluateHand(all_cards)
        except Exception as e:
            print(f"评估玩家 {pid} 手牌时出错: {e}")
            return None

    def get_active_players(self) -> List[int]:
        """Return the seats still in the hand (neither FOLDED nor OUT)."""
        return [
            seat for seat, state in enumerate(self.player_states)
            if state not in (PlayerState.FOLDED, PlayerState.OUT)
        ]

    def is_hand_complete(self) -> bool:
        """Return True when the hand can be settled.

        That is: at most one seat is left, the stage is FINISHED, or the
        river betting round has been marked complete.
        """
        if len(self.get_active_players()) <= 1:
            return True
        if self.current_stage == GameStage.FINISHED:
            return True
        return self.current_stage == GameStage.RIVER and self.betting_round_complete

    def determine_winners(self) -> Dict[int, Optional[HandRanking]]:
        """Return the seats contesting the pot mapped to their hand ranking.

        A single remaining seat maps to ``None`` (no showdown needed).  With
        several seats, only those whose hand could be evaluated are included;
        the mapping lists all of them, not just the best hand.
        """
        active_players = self.get_active_players()
        if not active_players:
            return {}
        if len(active_players) == 1:
            return {active_players[0]: None}
        rankings = {}
        for pid in active_players:
            ranking = self.evaluate_player_hand(pid)
            if ranking is not None:
                rankings[pid] = ranking
        return rankings

    def distribute_pot(self) -> Dict[int, int]:
        """Compute the payout per seat without touching the stacks.

        An uncontested hand gives the whole pot to the remaining seat; a
        showdown delegates to the side-pot manager using hand strengths.
        """
        winners = self.determine_winners()
        if not winners:
            return {}
        if len(winners) == 1:
            (winner_id, ranking), = winners.items()
            # Everybody else folded: the whole pot goes to the survivor.
            return {winner_id: self.total_pot} if ranking is None else {}
        hand_strengths = {
            pid: ranking.get_strength() if ranking is not None else 0
            for pid, ranking in winners.items()
        }
        return self.side_pot_manager.distribute_winnings(hand_strengths)

    def complete_hand(self) -> Dict:
        """Settle the hand: pay out, mark it FINISHED and report the result.

        The settlement is performed once per hand.  Later calls (for example
        by a caller that wants the result after the hand was settled
        automatically) return the stored result without paying again.

        Returns:
            ``{"complete": False, "message": ...}`` while the hand is still
            running, otherwise a dict with ``complete``, ``winners`` (the
            seats returned by ``determine_winners``), ``winnings``,
            ``final_stacks`` and ``showdown_hands``.
        """
        if not self.is_hand_complete():
            return {"complete": False, "message": "牌局未结束"}
        if self._hand_result is not None:
            return self._hand_result

        winners = self.determine_winners()
        winnings = self.distribute_pot()
        for pid, amount in winnings.items():
            if pid < len(self.stacks):
                self.stacks[pid] += amount
        self.current_stage = GameStage.FINISHED

        showdown_hands = {
            pid: {
                "cards": [str(card) for card in self.player_cards(pid)],
                "hand_type": ranking.hand_type.type_name,
                "description": str(ranking),
            }
            for pid, ranking in winners.items()
            if ranking is not None
        }
        self._hand_result = {
            "complete": True,
            "winners": list(winners.keys()),
            "winnings": winnings,
            "final_stacks": self.stacks.copy(),
            "showdown_hands": showdown_hands,
        }
        return self._hand_result