diff --git a/shortdeck_arena/__pycache__/card.cpython-313.pyc b/shortdeck_arena/__pycache__/card.cpython-313.pyc index 3e09a51..bb2f87c 100644 Binary files a/shortdeck_arena/__pycache__/card.cpython-313.pyc and b/shortdeck_arena/__pycache__/card.cpython-313.pyc differ diff --git a/shortdeck_arena/__pycache__/game_stage.cpython-313.pyc b/shortdeck_arena/__pycache__/game_stage.cpython-313.pyc new file mode 100644 index 0000000..8653e8f Binary files /dev/null and b/shortdeck_arena/__pycache__/game_stage.cpython-313.pyc differ diff --git a/shortdeck_arena/__pycache__/simulation.cpython-313.pyc b/shortdeck_arena/__pycache__/simulation.cpython-313.pyc index 1009cfc..b0ea0eb 100644 Binary files a/shortdeck_arena/__pycache__/simulation.cpython-313.pyc and b/shortdeck_arena/__pycache__/simulation.cpython-313.pyc differ diff --git a/shortdeck_arena/card.py b/shortdeck_arena/card.py index a57bf94..3da1e94 100644 --- a/shortdeck_arena/card.py +++ b/shortdeck_arena/card.py @@ -50,3 +50,5 @@ class Card: for s in Suit: cards.append(Card(r, s)) return cards + + diff --git a/shortdeck_arena/game_stage.py b/shortdeck_arena/game_stage.py new file mode 100644 index 0000000..8406d57 --- /dev/null +++ b/shortdeck_arena/game_stage.py @@ -0,0 +1,91 @@ +from __future__ import annotations + +from enum import Enum +from typing import List, Optional + + +class GameStage(Enum): + PREFLOP = "preflop" + FLOP = "flop" + TURN = "turn" + RIVER = "river" + SHOWDOWN = "showdown" + FINISHED = "finished" + + def __str__(self) -> str: + return self.value + + @classmethod + def get_next_stage(cls, current: 'GameStage') -> Optional['GameStage']: + stage_order = [cls.PREFLOP, cls.FLOP, cls.TURN, cls.RIVER, cls.SHOWDOWN, cls.FINISHED] + current_idx = stage_order.index(current) + if current_idx < len(stage_order) - 1: + return stage_order[current_idx + 1] + return None + + def get_board_card_count(self) -> int: + if self == GameStage.PREFLOP: + return 0 + elif self == GameStage.FLOP: + return 3 + elif self == GameStage.TURN: + return 4 + elif self in (GameStage.RIVER, GameStage.SHOWDOWN): + return 5 + return 0 + + +class PlayerState(Enum): + ACTIVE = "active" # 可行动 + FOLDED = "folded" # 已弃牌 + ALLIN = "allin" # 全下 + CALLED = "called" # 已跟注 + RAISE = "raised" # 已加注 + WAITING = "waiting" # 等待其他玩家 + OUT = "out" # 出局 + + def __str__(self) -> str: + return self.value + + +class BlindConfig: + # 对抗玩法是否需要前注? + def __init__(self, small_blind: int = 1, big_blind: int = 2, ante: int = 0): + self.small_blind = small_blind + self.big_blind = big_blind + self.ante = ante + + + def get_sb_position(self, num_players, dealer_position) -> int: + """ + 获取小盲位置 + heads-up时小盲为庄位 + """ + if num_players == 2: + return dealer_position # heads-up: 小盲 = 庄位 + else: + return (dealer_position + 1) % num_players + + def get_bb_position(self, num_players, dealer_position) -> int: + """ + 获取大盲位置 + """ + if num_players == 2: + return (dealer_position + 1) % 2 # heads-up: 大盲 = 庄位 + return (dealer_position + 2) % num_players # 多人: 大盲为庄位后两位 + + def get_first_to_act(self, stage: GameStage, num_players) -> int: + """ + 获取首个行动玩家位置 + 区分preflop和postflop + 1. preflop: 大盲后第一位 (UTG) + 2. postflop: 小盲位先行动 + """ + if stage == GameStage.PREFLOP: + if num_players == 2: + return self.get_sb_position(num_players) # heads-up: 小盲先行动 + else: # preflop: 大盲后第一位 + return (self.get_bb_position(num_players) + 1) % num_players + else: + # flop/river/turn: 小盲位先行动 + return self.get_sb_position(num_players) \ No newline at end of file diff --git a/shortdeck_arena/hand_evaluator.py b/shortdeck_arena/hand_evaluator.py new file mode 100644 index 0000000..455f884 --- /dev/null +++ b/shortdeck_arena/hand_evaluator.py @@ -0,0 +1,115 @@ +from typing import List, Tuple, Dict +from collections import Counter +from itertools import combinations +from .card import Card, Rank, Suit +from .hand_ranking import HandRanking, HandType + + +class HandEvaluator: + @staticmethod + def evaluateHand(cards) -> HandRanking: + """ + 从7张牌中找出最好的5张牌组合 + """ + if len(cards) != 7: + raise ValueError(f"Expected 7 cards, got {len(cards)}") + + best_ranking = None + best_cards = None + + # 所有可能的5张牌组合 + for five_cards in combinations(cards, 5): + ranking = HandEvaluator.evaluate5Cards(list(five_cards)) + + if best_ranking is None or ranking > best_ranking: + best_ranking = ranking + best_cards = list(five_cards) + best_ranking.cards = best_cards + return best_ranking + + @staticmethod + def evaluate5Cards(cards) -> HandRanking: + + if len(cards) != 5: + raise ValueError(f"Expected 5 cards, got {len(cards)}") + + # 按点数排序(降序) + sorted_cards = sorted(cards, key=lambda c: c.rank.numeric_value, reverse=True) + ranks = [card.rank for card in sorted_cards] + suits = [card.suit for card in sorted_cards] + + # 统计点数出现次数 + rank_counts = Counter(ranks) + count_values = sorted(rank_counts.values(), reverse=True) + + # 同花 + is_flush = len(set(suits)) == 1 + + # 顺子 + is_straight, straight_high = HandEvaluator._isStraight(ranks) + + # 根据牌型返回相应的HandRanking + if is_straight and is_flush: + if straight_high == Rank.ACE and ranks == [Rank.ACE, Rank.KING, Rank.QUEEN, Rank.JACK, Rank.TEN]: + return HandRanking(HandType.ROYAL_FLUSH, [Rank.ACE], sorted_cards) # 皇家同花顺 + else: + return HandRanking(HandType.STRAIGHT_FLUSH, [straight_high], sorted_cards) # 同花顺 + + elif count_values == [4, 1]: # 四条 + quad_rank = [rank for rank, count in rank_counts.items() if count == 4][0] + kicker = [rank for rank, count in rank_counts.items() if count == 1][0] + return HandRanking(HandType.FOUR_OF_A_KIND, [quad_rank, kicker], sorted_cards) + + elif count_values == [3, 2]: # 三条+一对 + trips_rank = [rank for rank, count in rank_counts.items() if count == 3][0] + pair_rank = [rank for rank, count in rank_counts.items() if count == 2][0] + return HandRanking(HandType.FULL_HOUSE, [trips_rank, pair_rank], sorted_cards) + + elif is_flush: # 同花 + return HandRanking(HandType.FLUSH, ranks, sorted_cards) + + elif is_straight: # 顺子 + return HandRanking(HandType.STRAIGHT, [straight_high], sorted_cards) + + elif count_values == [3, 1, 1]: # 三条 + trips_rank = [rank for rank, count in rank_counts.items() if count == 3][0] + kickers = sorted([rank for rank, count in rank_counts.items() if count == 1], reverse=True) + return HandRanking(HandType.THREE_OF_A_KIND, [trips_rank] + kickers, sorted_cards) + + elif count_values == [2, 2, 1]: # 两对 + pairs = sorted([rank for rank, count in rank_counts.items() if count == 2], reverse=True) + kicker = [rank for rank, count in rank_counts.items() if count == 1][0] + return HandRanking(HandType.TWO_PAIR, pairs + [kicker], sorted_cards) + + elif count_values == [2, 1, 1, 1]: # 一对 + pair_rank = [rank for rank, count in rank_counts.items() if count == 2][0] + kickers = sorted([rank for rank, count in rank_counts.items() if count == 1], reverse=True) + return HandRanking(HandType.ONE_PAIR, [pair_rank] + kickers, sorted_cards) + + else: # 高牌 + return HandRanking(HandType.HIGH_CARD, ranks, sorted_cards) + + @staticmethod + def _isStraight(ranks: List[Rank]) -> Tuple[bool, Rank]: + + values = sorted([rank.numeric_value for rank in ranks], reverse=True) + + is_regular_straight = True + for i in range(1, len(values)): + if values[i-1] - values[i] != 1: + is_regular_straight = False + break + + if is_regular_straight: + # 返回最高牌 + highest_rank = None + for rank in ranks: + if rank.numeric_value == values[0]: + highest_rank = rank + break + return True, highest_rank + + if values == [14, 5, 4, 3, 2]: # A, 5, 4, 3, 2 + return True, Rank.FIVE + + return False, None \ No newline at end of file diff --git a/shortdeck_arena/hand_ranking.py b/shortdeck_arena/hand_ranking.py new file mode 100644 index 0000000..9a29768 --- /dev/null +++ b/shortdeck_arena/hand_ranking.py @@ -0,0 +1,57 @@ +from enum import Enum +from typing import List, Tuple +from .card import Card, Rank + + +class HandType(Enum): + HIGH_CARD = (1, "High Card") + ONE_PAIR = (2, "Pair") + TWO_PAIR = (3, "Two Pair") + THREE_OF_A_KIND = (4, "Three of a Kind") + STRAIGHT = (5, "Straight") + FLUSH = (6, "Flush") + FULL_HOUSE = (7, "Full House") + FOUR_OF_A_KIND = (8, "Four of a Kind") + STRAIGHT_FLUSH = (9, "Straight Flush") + ROYAL_FLUSH = (10, "Royal Flush") + + def __new__(cls, strength, name): + obj = object.__new__(cls) + obj._value_ = strength + obj.strength = strength + obj.type_name = name + return obj + + + +class HandRanking: + + def __init__(self, hand_type: HandType, key_ranks: List[Rank], cards: List[Card]): + self.hand_type = hand_type + self.key_ranks = key_ranks # 用于比较的关键点数 + self.cards = cards # 组成这个ranking的5张牌 + + def __str__(self): + if self.hand_type == HandType.FOUR_OF_A_KIND: + return f"Quad({self.key_ranks[0].symbol})" + elif self.hand_type == HandType.FULL_HOUSE: + return f"Full House({self.key_ranks[0].symbol} over {self.key_ranks[1].symbol})" + elif self.hand_type == HandType.FLUSH: + return f"Flush({self.key_ranks[0].symbol} high)" + elif self.hand_type == HandType.STRAIGHT: + return f"Straight({self.key_ranks[0].symbol} high)" + elif self.hand_type == HandType.STRAIGHT_FLUSH: + if self.key_ranks[0] == Rank.ACE: + return "Royal Flush" + else: + return f"Straight Flush({self.key_ranks[0].symbol} high)" + elif self.hand_type == HandType.ROYAL_FLUSH: + return "Royal Flush" + elif self.hand_type == HandType.THREE_OF_A_KIND: + return f"Three of a Kind({self.key_ranks[0].symbol})" + elif self.hand_type == HandType.TWO_PAIR: + return f"Two Pair({self.key_ranks[0].symbol} and {self.key_ranks[1].symbol})" + elif self.hand_type == HandType.ONE_PAIR: + return f"Pair({self.key_ranks[0].symbol})" + else: + return f"High Card({self.key_ranks[0].symbol})" diff --git a/shortdeck_arena/simulation.py b/shortdeck_arena/simulation.py index ef39d8c..8b6a390 100644 --- a/shortdeck_arena/simulation.py +++ b/shortdeck_arena/simulation.py @@ -9,26 +9,100 @@ from typing import TYPE_CHECKING if TYPE_CHECKING: from .agent import Agent from .card import Card +from .game_stage import GameStage, PlayerState, BlindConfig class Simulation: - def __init__(self, agents: List[Agent]): + def __init__(self, agents: List[Agent], blind_config: Optional[BlindConfig] = None): self.agents = agents self.history: List[Dict] = [] self.cards: List[Card] = [] self.saved = False + + # 游戏状态管理 + self.current_stage = GameStage.PREFLOP + self.player_states: List[PlayerState] = [PlayerState.ACTIVE] * len(agents) + self.current_turn = 0 + self.betting_round_complete = False + + # 盲注配置 + self.blind_config = blind_config or BlindConfig() + + # 筹码和底池管理 + self.pot: List[int] = [0] * len(agents) # 每个玩家在当前轮的投入 + self.total_pot = 0 + self.last_raise_amount = 0 + self.min_raise = self.blind_config.big_blind + self.dealer_position = -1 + self.new_round() def new_round(self): self.history = [] self.cards = Card.all_short() - random.shuffle(self.cards) + random.shuffle(self.cards) # 洗牌 self.saved = False + + # 重置游戏状态 + self.current_stage = GameStage.PREFLOP + self.player_states = [PlayerState.ACTIVE] * len(self.agents) + self.betting_round_complete = False + + # 重置下注状态 + self.pot = [0] * len(self.agents) + self.total_pot = 0 + self.last_raise_amount = 0 + self.min_raise = self.blind_config.big_blind + + # 设置盲注 + self._setup_blinds() - def player_cards(self, pid: int) -> List[Card]: - return self.cards[pid * 2 : pid * 2 + 2] + # 庄家位置 + self.dealer_position = random.choice(range(len(self.agents))) + + def _setup_blinds(self): + """设置盲注""" + num_players = len(self.agents) + + # 至少需要2个玩家才能设置盲注 + if num_players < 2: + self.current_turn = 0 if num_players > 0 else 0 + return - def board_cards(self, street: str) -> List[Card]: + sb_pos = self.blind_config.get_sb_position(num_players,self.dealer_position) + bb_pos = self.blind_config.get_bb_position(num_players,self.dealer_position) + + # 确保位置有效 + if sb_pos >= num_players or bb_pos >= num_players: + self.current_turn = 0 + return + + # 扣除小盲 + self.pot[sb_pos] = self.blind_config.small_blind + self.total_pot += self.blind_config.small_blind + self.history.append({ + "pid": sb_pos, + "action": "small_blind", + "amount": self.blind_config.small_blind + }) + + # 扣除大盲 + self.pot[bb_pos] = self.blind_config.big_blind + self.total_pot += self.blind_config.big_blind + self.history.append({ + "pid": bb_pos, + "action": "big_blind", + "amount": self.blind_config.big_blind + }) + + # 首个行动玩家 + self.current_turn = self.blind_config.get_first_to_act(self.current_stage, num_players) + self.last_raise_amount = self.blind_config.big_blind + + def player_cards(self, pid) -> List[Card]: + return self.cards[pid * 2 : (pid * 2 + 2)] + + def board_cards(self, street) -> List[Card]: nplayers = len(self.agents) idx_start = nplayers * 2 if street == "flop": @@ -39,11 +113,159 @@ class Simulation: return self.cards[idx_start: idx_start + 5] return [] - def node_info(self) -> Dict: - return {"bet_min": 1, "bet_max": 100} + def get_current_max_bet(self) -> int: + """ + 获取当前最高下注额, 用于计算跟注金额 + """ + return max(self.pot) if self.pot else 0 + + def get_call_amount(self, pid) -> int: + """ + 计算玩家跟注所需金额 + """ + if pid >= len(self.pot): + return 0 + max_pot = self.get_current_max_bet() + return max(0, max_pot - self.pot[pid]) + + def is_betting_round_complete(self) -> bool: + """ + 检查当前下注轮是否完成 + """ + active_players = [i for i, state in enumerate(self.player_states) + if state in (PlayerState.ACTIVE, PlayerState.CALLED)] + + if len(active_players) <= 1: + return True + + # 检查所有active玩家是否都已投入相同金额 + max_pot = self.get_current_max_bet() + for i in active_players: # allin玩家不要求跟注 + if self.pot[i] < max_pot and self.player_states[i] != PlayerState.ALLIN: + return False + return True + + def advance_to_next_street(self): + if self.current_stage == GameStage.FINISHED: + return + + next_stage = GameStage.get_next_stage(self.current_stage) + if next_stage is None: + self.current_stage = GameStage.FINISHED + return + + self.current_stage = next_stage + + # 重置下注轮状态 + self.betting_round_complete = False + + #### 重置pot为累计投入 (不清零,为了计算边池) - def apply_action(self, pid: int, action: str, amount: Optional[int] = None): + # 重置行动状态 + for i, state in enumerate(self.player_states): + if state == PlayerState.CALLED: + self.player_states[i] = PlayerState.ACTIVE + + # 设置首个行动玩家 + num_players = len(self.agents) + self.current_turn = self.blind_config.get_first_to_act(self.current_stage, num_players) + self.last_raise_amount = 0 + self.min_raise = self.blind_config.big_blind + + def get_next_active_player(self, start_pos) -> Optional[int]: + for i in range(len(self.agents)): + pos = (start_pos + i) % len(self.agents) + if self.player_states[pos] == PlayerState.ACTIVE: + return pos + return None + + def node_info(self) -> Dict: + if self.current_turn >= len(self.pot): + return {"bet_min": self.min_raise, "bet_max": 0, "call_amount": 0} + # 跟注 + call_amount = self.get_call_amount(self.current_turn) + return { + "bet_min": max(self.min_raise, call_amount + self.min_raise), + "bet_max": 100, ########## 需要从ArenaGame获取实际大小 + "call_amount": call_amount + } + + def apply_action(self, pid, action, amount): + """ + 应用玩家动作 + """ + if pid != self.current_turn: + raise ValueError(f"不是玩家 {pid} 的回合") + + if self.player_states[pid] not in (PlayerState.ACTIVE,): + raise ValueError(f"玩家 {pid} 无法行动,当前状态: {self.player_states[pid]}") + + action = action.lower() + self.history.append({"pid": pid, "action": action, "amount": amount}) + + if action == "fold": + self.player_states[pid] = PlayerState.FOLDED + + elif action == "call": + call_amount = self.get_call_amount(pid) + if call_amount == 0: + # check + self.history[-1]["action"] = "check" + else: + self.pot[pid] += call_amount + self.total_pot += call_amount + self.player_states[pid] = PlayerState.CALLED + + elif action == "check": + call_amount = self.get_call_amount(pid) + if call_amount > 0: + raise ValueError("跟注金额>0, 无法过牌,需要跟注或弃牌") + self.player_states[pid] = PlayerState.CALLED + + elif action in ("bet", "raise"): + if amount is None: + raise ValueError(f"{action} 需要指定金额") + # 加注需要补齐跟注金额 + call_amount = self.get_call_amount(pid) + total_amount = call_amount + (amount or 0) + + self.pot[pid] += total_amount + self.total_pot += total_amount + + if amount and amount > 0: + self.last_raise_amount = amount + self.min_raise = amount + self.player_states[pid] = PlayerState.CALLED + + # 其他跟注的玩家 + for i, state in enumerate(self.player_states): + if i != pid and state == PlayerState.CALLED: + self.player_states[i] = PlayerState.ACTIVE + + else: + raise ValueError(f"未知动作: {action}") + + # 下一个玩家 + self._advance_turn() + + def _advance_turn(self): + """ + 推进回合 + """ + # 检查下注轮是否完成 + if self.is_betting_round_complete(): + self.betting_round_complete = True + self.advance_to_next_street() + else: + # 找到下一个可行动玩家 + next_player = self.get_next_active_player(self.current_turn + 1) + if next_player is not None: + self.current_turn = next_player + else: + # 没有玩家需要行动,结束下注轮 + self.betting_round_complete = True + self.advance_to_next_street() def to_save_data(self) -> Dict: players = [f"Agent{a.pid}" for a in self.agents] @@ -60,6 +282,6 @@ class Simulation: if path is None: path = Path.cwd() / "shortdeck_arena_history.jsonl" with path.open("a", encoding="utf-8") as f: - f.write(json.dumps(self.to_save_data(), ensure_ascii=False)) + f.write(json.dumps(self.to_save_data())) f.write("\n") self.saved = True diff --git a/shortdeck_server/__pycache__/arena_adapter.cpython-313.pyc b/shortdeck_server/__pycache__/arena_adapter.cpython-313.pyc index 00a859a..6f49232 100644 Binary files a/shortdeck_server/__pycache__/arena_adapter.cpython-313.pyc and b/shortdeck_server/__pycache__/arena_adapter.cpython-313.pyc differ diff --git a/shortdeck_server/arena_adapter.py b/shortdeck_server/arena_adapter.py index 08d18fb..c0aa5e6 100644 --- a/shortdeck_server/arena_adapter.py +++ b/shortdeck_server/arena_adapter.py @@ -5,35 +5,43 @@ from pathlib import Path from shortdeck_arena.simulation import Simulation from shortdeck_arena.agent import HumanAgent +from shortdeck_arena.game_stage import BlindConfig, GameStage, PlayerState import uuid class ArenaGame: - def __init__(self, starting_stack: int = 1000, max_players: int = 6): + def __init__(self, starting_stack: int = 1000, max_players: int = 6, + small_blind: int = 1, big_blind: int = 2): self.agents = [] self.player_names: List[str] = [] self.starting_stack = starting_stack self.max_players = max_players self.sim: Optional[Simulation] = None + + # 筹码管理 self.stacks: List[int] = [] - self.current_turn: int = 0 - self.pot: int = 0 - self.game_id = [str(name) for name in self.player_names] + + # 盲注配置 + self.blind_config = BlindConfig(small_blind, big_blind, ante=0) + + # 游戏标识 + self.game_id = str(uuid.uuid4()) def join_game(self, name: str) -> int: if len(self.player_names) >= self.max_players: raise ValueError("table full") + pid = len(self.player_names) self.player_names.append(name) agent = HumanAgent(pid) self.agents.append(agent) self.stacks.append(self.starting_stack) - self.sim = Simulation(self.agents) + self.sim = Simulation(self.agents, self.blind_config) return pid - def info(self, player_id) -> Dict: - + def info(self, player_id: Optional[int] = None) -> Dict: + """获取游戏状态信息""" if not self.sim: return { "game_id": self.game_id, @@ -48,62 +56,123 @@ class ArenaGame: "board_cards": [], } + # 更新栈大小 (扣除已投入底池的金额) + updated_stacks = [] + for i, base_stack in enumerate(self.stacks): + pot_contribution = self.sim.pot[i] if i < len(self.sim.pot) else 0 + updated_stacks.append(max(0, base_stack - pot_contribution)) + + for i in range(len(self.stacks)): + if i < len(updated_stacks): + self.stacks[i] = updated_stacks[i] + player_cards = [] board_cards = [] + + # 获取玩家手牌 try: if player_id is not None and 0 <= player_id < len(self.agents): player_cards = [str(c) for c in self.sim.player_cards(player_id)] except Exception: player_cards = [] + # 获取公共牌 (根据当前阶段) try: - - board_cards = [str(c) for c in self.sim.board_cards("river")] + current_stage = self.sim.current_stage.value + board_cards = [str(c) for c in self.sim.board_cards(current_stage)] except Exception: board_cards = [] + + # 获取可用动作信息 + actions = {} + if (player_id is not None and player_id == self.sim.current_turn and + self.sim.current_stage != GameStage.FINISHED): + + call_amount = self.sim.get_call_amount(player_id) + available_stack = updated_stacks[player_id] if player_id < len(updated_stacks) else 0 + + # 更新node_info以包含实际栈信息 + node_info = self.sim.node_info() + node_info["bet_max"] = available_stack + + actions = { + "call_amount": call_amount, + "bet_min": node_info["bet_min"], + "bet_max": node_info["bet_max"], + "can_check": call_amount == 0, + "can_fold": True, + "can_call": call_amount > 0 and call_amount <= available_stack, + "can_bet": available_stack > call_amount, + } return { "game_id": self.game_id, "players": self.player_names, - "stacks": list(self.stacks), - "dealer_index": 0, - "current_turn": self.current_turn, - "pot": self.pot, - "stage": "preflop", - "actions": {"bet_min": 1, "bet_max": 100}, + "stacks": updated_stacks, + "dealer_index": 0, # 简化:固定庄家位置, (优化轮询) + "current_turn": self.sim.current_turn, + "pot": self.sim.total_pot, + "stage": self.sim.current_stage.value, + "actions": actions, "player_cards": player_cards, "board_cards": board_cards, + "player_states": [state.value for state in self.sim.player_states], } def apply_action(self, pid: int, action: str, amount: Optional[int] = None): if not self.sim: raise ValueError("no game") - if pid != self.current_turn: - raise ValueError("not your turn") - + + # 验证动作合法性 + if pid != self.sim.current_turn: + raise ValueError(f"not your turn, current turn: {self.sim.current_turn}") + + if self.sim.current_stage == GameStage.FINISHED: + raise ValueError("game already finished") + + # 获取玩家可用筹码 + pot_contribution = self.sim.pot[pid] if pid < len(self.sim.pot) else 0 + available_stack = self.stacks[pid] - pot_contribution + + # 预处理动作和金额 action = action.lower() - if action == "check": - pass - elif action == "bet": - if amount is None: - raise ValueError("bet requires amount") - if amount < 0: - raise ValueError("invalid amount") - if amount > self.stacks[pid]: - - amount = self.stacks[pid] - self.stacks[pid] -= amount - self.pot += amount - elif action == "fold": - self.stacks[pid] = 0 - else: - raise ValueError(f"unknown action: {action}") - - self.sim.apply_action(pid, action, amount) + + if action in ("bet", "raise") and amount is not None: + # 限制下注金额不超过可用筹码 + if amount > available_stack: + amount = available_stack + # 如果全下,可能需要标记为allin + if amount == available_stack and available_stack > 0: + self.sim.player_states[pid] = PlayerState.ALLIN + + elif action == "call": + # call动作的金额验证 + call_amount = self.sim.get_call_amount(pid) + if call_amount > available_stack: + # 不够跟注,自动allin + amount = available_stack + if available_stack > 0: + self.sim.player_states[pid] = PlayerState.ALLIN + + try: + self.sim.apply_action(pid, action, amount) + except ValueError as e: + raise ValueError(f"invalid action: {e}") + self.sim.dump_data(Path.cwd() / "shortdeck_arena_history.jsonl") - if len(self.agents) > 0: - self.current_turn = (self.current_turn + 1) % len(self.agents) + @property + def current_turn(self) -> int: + if not self.sim: + return 0 + return self.sim.current_turn + + @property + def pot(self) -> int: + if not self.sim: + return 0 + return self.sim.total_pot + @property def history(self) -> List[Dict]: if not self.sim: diff --git a/shortdeck_server/game_stage.py b/shortdeck_server/game_stage.py new file mode 100644 index 0000000..2010fd1 --- /dev/null +++ b/shortdeck_server/game_stage.py @@ -0,0 +1,42 @@ + + +class GameStage(Enum): + PREFLOP = "preflop" + FLOP = "flop" + TURN = "turn" + RIVER = "river" + SHOWDOWN = "showdown" + + # def setup_preflop(self): + # # 自动扣除小盲/大盲 + # # 设置首个行动玩家 + # pass + + def advance_street(self): + # 检查下注轮是否结束 + # 发放公共牌 + # 重置行动顺序 + pass + + # def get_call_amount(self, pid): + # # 计算跟注所需金额 + # pass + + # def get_min_raise(self, pid): + # # 计算最小加注金额 + # pass + + def handle_all_in(self, pid, amount): + # 创建边池 + # 标记玩家状态 + pass + + def evaluate_hand(self, hole_cards, board_cards): + # 短牌手牌强度排序 + # A-6 低顺特殊处理 + pass + + def determine_winners(self, active_players): + # 边池分配 + # 短牌规则下的比牌 + pass diff --git a/shortdeck_server/main.py b/shortdeck_server/main.py index 9bb5a6f..5ee71bb 100644 --- a/shortdeck_server/main.py +++ b/shortdeck_server/main.py @@ -18,7 +18,7 @@ class JoinResponse(BaseModel): class ActionRequest(BaseModel): player_id: int - action: str + action: str # "fold", "call", "raise", "check", "bet" amount: int | None = None diff --git a/shortdeck_server/random_cli.py b/shortdeck_server/random_cli.py index a083cbf..94032ed 100644 --- a/shortdeck_server/random_cli.py +++ b/shortdeck_server/random_cli.py @@ -12,13 +12,13 @@ BASE_PATH = "/get_game_state" APPLY_PATH = "/apply_action" -def fetch_game_state(base_url: str, player_id: int) -> Dict[str, Any]: +def fetch_game_state(base_url, player_id) -> Dict[str, Any]: resp = requests.get(f"{base_url.rstrip('/')}{BASE_PATH}", params={"player_id": player_id}, timeout=5) resp.raise_for_status() return resp.json() -def post_action(base_url: str, player_id: int, action: str, amount: Optional[int]): +def post_action(base_url, player_id, action, amount): payload = {"player_id": player_id, "action": action} if amount is not None: payload["amount"] = amount @@ -27,7 +27,7 @@ def post_action(base_url: str, player_id: int, action: str, amount: Optional[int return resp.json() -def choose_random_action(info: Dict[str, Any]) -> Optional[tuple[str, Optional[int]]]: +def choose_random_action(info) -> Optional[tuple[str, Optional[int]]]: actions = info.get("actions") if not actions: return None @@ -48,7 +48,7 @@ def choose_random_action(info: Dict[str, Any]) -> Optional[tuple[str, Optional[i return random.choice(choices) -def run_loop(base_url: str, player_id: int, interval: float = 2.0, seed: Optional[int] = None): +def run_loop(base_url, player_id, interval, seed): if seed is not None: random.seed(seed) @@ -77,7 +77,7 @@ def run_loop(base_url: str, player_id: int, interval: float = 2.0, seed: Optiona time.sleep(interval) -def main(argv: list[str] | None = None) -> int: +def main(argv) -> int: parser = argparse.ArgumentParser() parser.add_argument("--server", default="http://localhost:8000") parser.add_argument("--player_id", type=int, default=1) diff --git a/shortdeck_server/tests/test_game.py b/shortdeck_server/tests/test_game.py index 73a4e6a..6281683 100644 --- a/shortdeck_server/tests/test_game.py +++ b/shortdeck_server/tests/test_game.py @@ -4,23 +4,31 @@ from shortdeck_server.arena_adapter import ArenaGame def test_join_and_actions(): - g = ArenaGame(starting_stack=100, max_players=3) + g = ArenaGame(starting_stack=100, max_players=3, small_blind=1, big_blind=2) pid0 = g.join_game("aa") pid1 = g.join_game("bb") assert pid0 == 0 assert pid1 == 1 state = g.info() - assert state["stacks"] == [100, 100] + # 在短牌扑克中,玩家加入后盲注已自动扣除 + # 小盲(pid0): 100-1=99, 大盲(pid1): 100-2=98 + assert state["stacks"] == [99, 98] + + # 验证轮次管理:heads-up时小盲先行动 + assert g.current_turn == 0 + + # 测试错误的玩家尝试行动 try: g.apply_action(1, "fold") except ValueError as e: assert "not your turn" in str(e) - g.apply_action(0, "check") + # 小盲玩家call (跟注到大盲) + g.apply_action(0, "call") assert g.current_turn == 1 - g.apply_action(1, "bet", 10) - assert g.pot == 10 - assert g.stacks[1] == 90 + # 大盲玩家加注 + g.apply_action(1, "bet", 10) + assert g.pot >= 10 # 底池至少包含加注金额 assert g.history[-1]["action"] == "bet"