Files
shortdeck/shortdeck_arena/simulation.py
2025-10-09 15:28:34 +08:00

288 lines
9.9 KiB
Python

from __future__ import annotations
import json
import random
from pathlib import Path
from typing import List, Dict, Optional
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from .agent import Agent
from .card import Card
from .game_stage import GameStage, PlayerState, BlindConfig
class Simulation:
def __init__(self, agents: List[Agent], blind_config: Optional[BlindConfig] = None):
self.agents = agents
self.history: List[Dict] = []
self.cards: List[Card] = []
self.saved = False
# 游戏状态管理
self.current_stage = GameStage.PREFLOP
self.player_states: List[PlayerState] = [PlayerState.ACTIVE] * len(agents)
self.current_turn = 0
self.betting_round_complete = False
# 盲注配置
self.blind_config = blind_config or BlindConfig()
# 筹码和底池管理
self.pot: List[int] = [0] * len(agents) # 每个玩家在当前轮的投入
self.total_pot = 0
self.last_raise_amount = 0
self.min_raise = self.blind_config.big_blind
self.dealer_position = -1
self.new_round()
def new_round(self):
self.history = []
self.cards = Card.all_short()
random.shuffle(self.cards) # 洗牌
self.saved = False
# 重置游戏状态
self.current_stage = GameStage.PREFLOP
self.player_states = [PlayerState.ACTIVE] * len(self.agents)
self.betting_round_complete = False
# 重置下注状态
self.pot = [0] * len(self.agents)
self.total_pot = 0
self.last_raise_amount = 0
self.min_raise = self.blind_config.big_blind
# 设置盲注
self._setup_blinds()
# 庄家位置
self.dealer_position = random.choice(range(len(self.agents)))
def _setup_blinds(self):
"""设置盲注"""
num_players = len(self.agents)
# 至少需要2个玩家才能设置盲注
if num_players < 2:
self.current_turn = 0 if num_players > 0 else 0
return
sb_pos = self.blind_config.get_sb_position(num_players,self.dealer_position)
bb_pos = self.blind_config.get_bb_position(num_players,self.dealer_position)
# 确保位置有效
if sb_pos >= num_players or bb_pos >= num_players:
self.current_turn = 0
return
# 扣除小盲
self.pot[sb_pos] = self.blind_config.small_blind
self.total_pot += self.blind_config.small_blind
self.history.append({
"pid": sb_pos,
"action": "small_blind",
"amount": self.blind_config.small_blind
})
# 扣除大盲
self.pot[bb_pos] = self.blind_config.big_blind
self.total_pot += self.blind_config.big_blind
self.history.append({
"pid": bb_pos,
"action": "big_blind",
"amount": self.blind_config.big_blind
})
# 首个行动玩家
self.current_turn = self.blind_config.get_first_to_act(self.current_stage, num_players)
self.last_raise_amount = self.blind_config.big_blind
def player_cards(self, pid) -> List[Card]:
return self.cards[pid * 2 : (pid * 2 + 2)]
def board_cards(self, street) -> List[Card]:
nplayers = len(self.agents)
idx_start = nplayers * 2
if street == "flop":
return self.cards[idx_start: idx_start + 3]
if street == "turn":
return self.cards[idx_start: idx_start + 4]
if street == "river":
return self.cards[idx_start: idx_start + 5]
return []
def get_current_max_bet(self) -> int:
"""
获取当前最高下注额, 用于计算跟注金额
"""
return max(self.pot) if self.pot else 0
def get_call_amount(self, pid) -> int:
"""
计算玩家跟注所需金额
"""
if pid >= len(self.pot):
return 0
max_pot = self.get_current_max_bet()
return max(0, max_pot - self.pot[pid])
def is_betting_round_complete(self) -> bool:
"""
检查当前下注轮是否完成
"""
active_players = [i for i, state in enumerate(self.player_states)
if state in (PlayerState.ACTIVE, PlayerState.CALLED)]
if len(active_players) <= 1:
return True
# 检查所有active玩家是否都已投入相同金额
max_pot = self.get_current_max_bet()
for i in active_players: # allin玩家不要求跟注
if self.pot[i] < max_pot and self.player_states[i] != PlayerState.ALLIN:
return False
return True
def advance_to_next_street(self):
if self.current_stage == GameStage.FINISHED:
return
next_stage = GameStage.get_next_stage(self.current_stage)
if next_stage is None:
self.current_stage = GameStage.FINISHED
return
self.current_stage = next_stage
# 重置下注轮状态
self.betting_round_complete = False
#### 重置pot为累计投入 (不清零,为了计算边池)
# 重置行动状态
for i, state in enumerate(self.player_states):
if state == PlayerState.CALLED:
self.player_states[i] = PlayerState.ACTIVE
# 设置首个行动玩家
num_players = len(self.agents)
self.current_turn = self.blind_config.get_first_to_act(self.current_stage, num_players)
self.last_raise_amount = 0
self.min_raise = self.blind_config.big_blind
def get_next_active_player(self, start_pos) -> Optional[int]:
for i in range(len(self.agents)):
pos = (start_pos + i) % len(self.agents)
if self.player_states[pos] == PlayerState.ACTIVE:
return pos
return None
def node_info(self) -> Dict:
if self.current_turn >= len(self.pot):
return {"bet_min": self.min_raise, "bet_max": 0, "call_amount": 0}
# 跟注
call_amount = self.get_call_amount(self.current_turn)
return {
"bet_min": max(self.min_raise, call_amount + self.min_raise),
"bet_max": 100, ########## 需要从ArenaGame获取实际大小
"call_amount": call_amount
}
def apply_action(self, pid, action, amount):
"""
应用玩家动作
"""
if pid != self.current_turn:
raise ValueError(f"不是玩家 {pid} 的回合")
if self.player_states[pid] not in (PlayerState.ACTIVE,):
raise ValueError(f"玩家 {pid} 无法行动,当前状态: {self.player_states[pid]}")
action = action.lower()
self.history.append({"pid": pid, "action": action, "amount": amount})
if action == "fold":
self.player_states[pid] = PlayerState.FOLDED
elif action == "call":
call_amount = self.get_call_amount(pid)
if call_amount == 0:
# check
self.history[-1]["action"] = "check"
else:
self.pot[pid] += call_amount
self.total_pot += call_amount
self.player_states[pid] = PlayerState.CALLED
elif action == "check":
call_amount = self.get_call_amount(pid)
if call_amount > 0:
raise ValueError("跟注金额>0, 无法过牌,需要跟注或弃牌")
self.player_states[pid] = PlayerState.CALLED
elif action in ("bet", "raise"):
if amount is None:
raise ValueError(f"{action} 需要指定金额")
# 加注需要补齐跟注金额
call_amount = self.get_call_amount(pid)
total_amount = call_amount + (amount or 0)
self.pot[pid] += total_amount
self.total_pot += total_amount
if amount and amount > 0:
self.last_raise_amount = amount
self.min_raise = amount
self.player_states[pid] = PlayerState.CALLED
# 其他跟注的玩家
for i, state in enumerate(self.player_states):
if i != pid and state == PlayerState.CALLED:
self.player_states[i] = PlayerState.ACTIVE
else:
raise ValueError(f"未知动作: {action}")
# 下一个玩家
self._advance_turn()
def _advance_turn(self):
"""
推进回合
"""
# 检查下注轮是否完成
if self.is_betting_round_complete():
self.betting_round_complete = True
self.advance_to_next_street()
else:
# 找到下一个可行动玩家
next_player = self.get_next_active_player(self.current_turn + 1)
if next_player is not None:
self.current_turn = next_player
else:
# 没有玩家需要行动,结束下注轮
self.betting_round_complete = True
self.advance_to_next_street()
def to_save_data(self) -> Dict:
players = [f"Agent{a.pid}" for a in self.agents]
return {
"history": self.history,
"players": players,
"player_cards": ["".join(str(c) for c in self.player_cards(i)) for i in range(len(self.agents))],
"board": "".join(str(c) for c in self.board_cards("river")),
}
def dump_data(self, path: Path | None = None):
if self.saved:
return
if path is None:
path = Path.cwd() / "shortdeck_arena_history.jsonl"
with path.open("a", encoding="utf-8") as f:
f.write(json.dumps(self.to_save_data()))
f.write("\n")
self.saved = True