From b4afdb06c4359b254e1417498df160232662dd00 Mon Sep 17 00:00:00 2001 From: jianghaiying Date: Thu, 6 Nov 2025 17:41:59 +0800 Subject: [PATCH] gametree: fix player --- gametree/__init__.py | 2 - gametree/deck.py | 3 + gametree/game.py | 247 +++++++++++++++++++++++++++---------------- gametree/model.py | 11 +- test/p_creat.py | 67 ++++++++++++ test/pg.json | 64 ----------- test/pg.py | 13 +-- 7 files changed, 236 insertions(+), 171 deletions(-) create mode 100644 test/p_creat.py delete mode 100644 test/pg.json diff --git a/gametree/__init__.py b/gametree/__init__.py index 673eea4..84c2ea3 100644 --- a/gametree/__init__.py +++ b/gametree/__init__.py @@ -1,6 +1,5 @@ from .model import ( PlayerId, - Player, Action, ActionType, Street, @@ -18,7 +17,6 @@ from .hand_evaluator import HandEvaluator __all__ = [ "PlayerId", - "Player", "Action", "ActionType", "Street", diff --git a/gametree/deck.py b/gametree/deck.py index cb8da4a..8c5935a 100644 --- a/gametree/deck.py +++ b/gametree/deck.py @@ -52,3 +52,6 @@ class DeckManager: burn = self.burn_card() river = self.deal_card() return burn, river + + # def get_remaining_cards(self) -> List[Card]: + # return self.cards[self.position:] diff --git a/gametree/game.py b/gametree/game.py index 9ae9c9c..ecf3afb 100644 --- a/gametree/game.py +++ b/gametree/game.py @@ -16,7 +16,9 @@ class PlayerState: committed: int = 0 # 本轮已投入筹码 in_hand: bool = True # 玩家是否仍在本局中 folded: bool = False # 玩家是否已弃牌 - all_in: bool = False # 玩家是否已全押 + allin: bool = False # 玩家是否已全押 + hand_cards: List[Card] = field(default_factory=list) + hand_rankings: Optional[Any] = None def to_dict(self) -> Dict[str, Any]: return { @@ -25,7 +27,8 @@ class PlayerState: "committed": self.committed, "in_hand": self.in_hand, "folded": self.folded, - "all_in": self.all_in, + "allin": self.allin, + "hand_cards": [str(c) for c in self.hand_cards], } @@ -65,13 +68,12 @@ class Pot: class Game: players_init: List[Tuple[PlayerId, int]] = field(default_factory=list) dealer_idx: int = 0 - bb_idx: int = 0 - sb_idx: int = 0 + _bb_idx: Optional[int] = field(default=None, init=False) + _sb_idx: Optional[int] = field(default=None, init=False) small_blind: int = 1 big_blind: int = 2 current_street: Street = Street.PREFLOP - hand_cards: Dict[PlayerId, List[Card]] = field(default_factory=dict) board: List[Card] = field(default_factory=list) players: List[PlayerState] = field(default_factory=list) @@ -85,10 +87,9 @@ class Game: next_to_act_idx: Optional[int] = None terminal: bool = False - winner: Optional[List[PlayerId]] = None pot: List[Pot] = field(default_factory=list) - hand_rankings: Dict[PlayerId, Any] = field(default_factory=dict) + # hand_rankings: Dict[PlayerId, Any] = field(default_factory=dict) deck: List[Card] = field(default_factory=list) # 剩余牌 burned_cards: List[Card] = field(default_factory=list) @@ -116,7 +117,9 @@ class Game: def _init_hand_cards(self): player_ids = [pid for pid, _ in self.players_init] - self.hand_cards = self.deck_manager.deal_hand_cards(player_ids) + deal_card = self.deck_manager.deal_hand_cards(player_ids) + for p in self.players: + p.hand_cards = deal_card.get(p.pid, []) def _init_street_actions(self): self.street_actions = { @@ -130,7 +133,7 @@ class Game: self.deck = Card.all_short().copy() random.shuffle(self.deck) # 移除已发的底牌 - used_cards = [card for hand in self.hand_cards.values() for card in hand] + used_cards = [card for hand in self.players for card in hand.hand_cards] self.deck = [card for card in self.deck if card not in used_cards] def _init_bet(self): @@ -139,18 +142,18 @@ class Game: return if n_players == 2: - self.sb_idx = self.dealer_idx # 两人游戏:庄家是小盲 - self.bb_idx = (self.dealer_idx + 1) % n_players + self._sb_idx = self.dealer_idx # 两人游戏:庄家是小盲 + self._bb_idx = (self.dealer_idx + 1) % n_players else: - self.sb_idx = (self.dealer_idx + 1) % n_players # 小盲位置 = 庄家下一位 - self.bb_idx = (self.dealer_idx + 2) % n_players # 大盲位置 = 小盲下一位 + self._sb_idx = (self.dealer_idx + 1) % n_players # 小盲位置 = 庄家下一位 + self._bb_idx = (self.dealer_idx + 2) % n_players # 大盲位置 = 小盲下一位 # 小盲注 - self.players[self.sb_idx].stack -= self.small_blind - self.players[self.sb_idx].committed += self.small_blind + self.players[self._sb_idx].stack -= self.small_blind + self.players[self._sb_idx].committed += self.small_blind - self.players[self.bb_idx].stack -= self.big_blind - self.players[self.bb_idx].committed += self.big_blind + self.players[self._bb_idx].stack -= self.big_blind + self.players[self._bb_idx].committed += self.big_blind self.current_bet = self.big_blind self.original_bet = self.big_blind @@ -170,14 +173,14 @@ class Game: return None if self.current_street == Street.PREFLOP: - first_idx = (self.bb_idx + 1) % n_players if n_players >= 3 else self.sb_idx + first_idx = (self._bb_idx + 1) % n_players if n_players >= 3 else self._sb_idx self.next_to_act_idx = first_idx return first_idx else: for i in range(0, n_players): - idx = (self.sb_idx + i) % n_players + idx = (self._sb_idx + i) % n_players player = self.players[idx] - if player.in_hand and not player.folded and not player.all_in: + if player.in_hand and not player.folded and not player.allin: self.next_to_act_idx = idx return idx @@ -186,7 +189,7 @@ class Game: return None - def add_action(self, action: Action): + def add_action(self, action: Action) -> Optional[List[PlayerId]]: if self.terminal: raise ValueError("Game is over") @@ -203,7 +206,8 @@ class Game: if not self.terminal: if self.next_to_act_idx is None or self._is_round_over(): if self.current_street == Street.RIVER: - self.finalize_game() + winners = self.finalize_game() + return winners else: self.advance_to_next_street() # all allin @@ -213,8 +217,9 @@ class Game: if (not self.terminal and self.current_street == Street.RIVER and (self.next_to_act_idx is None or self._all_active_all_in())): - self.finalize_game() - + winners = self.finalize_game() + return winners + return None def _all_active_all_in(self) -> bool: active = [p for p in self.players if p.in_hand and not p.folded] @@ -222,13 +227,13 @@ class Game: return True if len(active) <= 1: return True - return all(p.all_in for p in active) + return all(p.allin for p in active) - def finalize_game(self): + def finalize_game(self) -> Optional[List[PlayerId]]: print("+++++++++++++++++++++++++++++++++++++++++++++++++++++++++") print("DEBUG: finalize_game") print(f"DEBUG: pot before finalize: {[(p.amount, len(p.eligible)) for p in self.pot]}") - print(f"DEBUG: active players: {[(p.pid, p.stack, p.committed, p.folded, p.all_in) + print(f"DEBUG: active players: {[(p.pid, p.stack, p.committed, p.folded, p.allin) for p in self.players if p.in_hand]}") if not self.pot: self.finalize_pots() @@ -241,19 +246,28 @@ class Game: for p in self.players: p.committed = 0 self.pot = [] - self.winner = [win_pid] self.terminal = True - return - - winners_all = self.showdown() - if winners_all: - self.winner = winners_all + return [win_pid] + winners = self.cal_winners() + self._distribute_pot() + + for p in self.players: + p.committed = 0 + p.allin = False + self.pot = [] + self.terminal = True + return winners + + def _distribute_pot(self): for pot in self.pot: - elig = [pid for pid in pot.eligible if pid in self.hand_rankings] + elig = [self._get_player(pid) for pid in pot.eligible] + elig = [p for p in elig if p and p.in_hand and not p.folded + and p.hand_rankings is not None] if not elig: continue - best_strength = max(self.hand_rankings[pid].get_strength() for pid in elig) - pot_winners = [pid for pid in elig if self.hand_rankings[pid].get_strength() == best_strength] + best_strength = max(p.hand_rankings.get_strength() for p in elig) + pot_winners = [p.pid for p in elig if + p.hand_rankings.get_strength() == best_strength] share = pot.amount // len(pot_winners) rem = pot.amount % len(pot_winners) for i, pid in enumerate(pot_winners): @@ -262,13 +276,28 @@ class Game: continue ps.stack += share + (rem if i == 0 else 0) + def _compute_hand_rankings(self) -> Dict[PlayerId, Any]: + p_rank: Dict[PlayerId, Any] = {} for p in self.players: - p.committed = 0 - p.all_in = False - self.pot = [] - self.terminal = True - return - + if p.in_hand and not p.folded: + ranking = self.evaluate_hand(p.pid) + if ranking: + p.hand_rankings = ranking + p_rank[p.pid] = ranking + return p_rank + + def cal_winners(self) -> List[PlayerId]: + if len(self.board) != 5: + p_active = [p for p in self.players if p.in_hand and not p.folded] + if len(p_active) == 1: + return [p_active[0].pid] + return [] + p_rank = self._compute_hand_rankings() + if not p_rank: + return [] + best_r = max(r.get_strength() for r in p_rank.values()) + winners = [pid for pid, r in p_rank.items() if r.get_strength() == best_r] + return winners def _is_round_over(self) -> bool: active = [p for p in self.players if p.in_hand and not p.folded] @@ -277,9 +306,9 @@ class Game: if self.current_bet == 0 and self.next_to_act_idx is None: return True for p in active: - if not p.all_in and p.committed < self.current_bet: + if not p.allin and p.committed < self.current_bet: return False - if not p.all_in and p.committed!= self.current_bet: + if not p.allin and p.committed!= self.current_bet: return False return True @@ -287,10 +316,10 @@ class Game: player.folded = True player.in_hand = False # 检查是否只剩一个玩家,如果是的话,游戏结束,进行结算 todo - active_players = [p for p in self.players if p.in_hand and not p.folded] - if len(active_players) == 1: - self.terminal = True - self.winner = [active_players[0].pid] + # active_players = [p for p in self.players if p.in_hand and not p.folded] + # if len(active_players) == 1: + # self.terminal = True + # self.winner = [active_players[0].pid] def _player_call_act(self, player: PlayerState, amount: Optional[int] = None): if not player.in_hand or player.folded: @@ -301,7 +330,7 @@ class Game: player.stack -= pay player.committed += pay if player.stack == 0: - player.all_in = True + player.allin = True def _player_check_act(self, player: PlayerState): if not player.in_hand or player.folded: @@ -310,7 +339,7 @@ class Game: if to_call > 0: raise ValueError(f"Cannot CHECK,need {to_call} to call") if player.stack == 0: - player.all_in = True + player.allin = True def _player_bet_act(self, player: PlayerState, amount: int): if amount is None: @@ -329,7 +358,7 @@ class Game: player.committed += pay if player.stack == 0: - player.all_in = True + player.allin = True self.current_bet = player.committed self.original_bet = player.committed @@ -356,7 +385,7 @@ class Game: player.committed += pay if player.stack == 0: - player.all_in = True + player.allin = True # 有效加注 if player.committed > self.current_bet: @@ -368,13 +397,13 @@ class Game: raise ValueError("Player cannot act") if player.stack <= 0: raise ValueError("Player no chips to all-in") - if player.all_in: + if player.allin: raise ValueError("Player already all-in") pay = player.stack player.stack = 0 player.committed += pay - player.all_in = True + player.allin = True if self.current_bet == 0: # 下注即allin self.current_bet = player.committed @@ -405,7 +434,7 @@ class Game: self._player_bet_act(player, action.amount) case ActionType.RAISE: self._player_raise_act(player, action.amount) - case ActionType.ALL_IN: + case ActionType.ALLIN: self._player_all_in_act(player) def _update_next_to_act(self): @@ -421,17 +450,17 @@ class Game: for i in range(1, n_players + 1): next_idx = (current_idx + i) % n_players player = self.players[next_idx] - if player.in_hand and not player.folded and not player.all_in and player.stack > 0: + if player.in_hand and not player.folded and not player.allin and player.stack > 0: self.next_to_act_idx = next_idx return # 没有找到可行动的玩家,进入下一轮 todo self.next_to_act_idx = None - def advance_to_next_street(self): + def advance_to_next_street(self) -> Street: if self.current_street == Street.RIVER: # 已经是最后一街道,准备摊牌 todo - return - + return self.current_street + round_pot = sum(player.committed for player in self.players) if round_pot > 0: eligible_players = [p.pid for p in self.players if p.in_hand and not p.folded] @@ -448,9 +477,9 @@ class Game: self.invalid_raise = False self._deal_cards_for_street() - self._get_first_act_idx() - + return self.current_street + def _deal_cards_for_street(self): if self.current_street == Street.PREFLOP: burn_card, flop_cards = self.deck_manager.deal_flop() @@ -509,7 +538,8 @@ class Game: return (min_raise_total, max_raise_total) def get_hand_cards(self, player_id: PlayerId) -> List[Card]: - return self.hand_cards.get(player_id, []) + p = self._get_player(player_id) + return p.hand_cards if p else [] def get_current_board(self) -> List[Card]: return self.board.copy() @@ -561,21 +591,21 @@ class Game: if self.invalid_raise: if to_call == 0: if player.stack > 0: - actions.append(ActionType.ALL_IN) + actions.append(ActionType.ALLIN) else: actions.append(ActionType.FOLD) if player.stack > 0: if player.stack >= to_call: # 暂时这么处理吧 actions.append(ActionType.CALL) else: - actions.append(ActionType.ALL_IN) + actions.append(ActionType.ALLIN) return actions if to_call == 0: actions.append(ActionType.CHECK) if player.stack > 0: actions.append(ActionType.BET) - actions.append(ActionType.ALL_IN) + actions.append(ActionType.ALLIN) else: actions.append(ActionType.FOLD) if player.stack > 0: @@ -585,52 +615,47 @@ class Game: if max_raise_to < min_raise_to: if player.stack >= to_call: actions.append(ActionType.CALL) - actions.append(ActionType.ALL_IN) + actions.append(ActionType.ALLIN) else: - actions.append(ActionType.ALL_IN) + actions.append(ActionType.ALLIN) self.invalid_raise = True else: actions.append(ActionType.CALL) actions.append(ActionType.RAISE) - actions.append(ActionType.ALL_IN) + actions.append(ActionType.ALLIN) return actions def evaluate_hand(self, player_id: PlayerId) -> Optional[Any]: - if player_id not in self.hand_cards or len(self.board) < 5: + p = self._get_player(player_id) + if not p or len(self.board) < 5: return None - - all_cards = self.hand_cards[player_id] + self.board + all_cards = p.hand_cards + self.board if len(all_cards) != 7: return None - return HandEvaluator.evaluateHand(all_cards) def showdown(self) -> List[PlayerId]: if len(self.board) != 5: active_players = [p for p in self.players if p.in_hand and not p.folded] if len(active_players) == 1: - self.winner = [active_players[0].pid] self.terminal = True - return self.winner + return [active_players[0].pid] return [] - - player_rankings = {} + in_rank = False for player in self.players: if player.in_hand and not player.folded: ranking = self.evaluate_hand(player.pid) if ranking: - player_rankings[player.pid] = ranking - - if not player_rankings: + in_rank = True + player.hand_rankings = ranking + if not in_rank: return [] - - best_strength = max(ranking.get_strength() for ranking in player_rankings.values()) - winners = [pid for pid, ranking in player_rankings.items() - if ranking.get_strength() == best_strength] - - self.hand_rankings = player_rankings - self.winner = winners + best_rank = max(p.hand_rankings.get_strength() for p in self.players + if p.in_hand and not None) + winners = [p.pid for p in self.players + if p.in_hand and p.hand_rankings is not None and + p.hand_rankings.get_strength() == best_rank] self.terminal = True return winners @@ -674,10 +699,52 @@ class Game: "board": [str(card) for card in self.board], "current_bet": self.current_bet, "pot": [pot.to_dict() for pot in self.pot], - "hand_cards": {str(pid): [str(card) for card in cards] - for pid, cards in self.hand_cards.items()}, "next_to_act_idx": self.next_to_act_idx, "terminal": self.terminal, - "winner": [w.to_dict() for w in self.winner] if self.winner else None, + "winner": [w.to_dict() for w in (self.get_winners() or [])] if self.get_winners() else None, "total_actions": len(self._all_actions), - } \ No newline at end of file + } + + # 测试 + @property + def get_sb_idx(self) -> Optional[int]: + return self._sb_idx + + @property + def get_bb_idx(self) -> Optional[int]: + return self._bb_idx + + @property + def get_active_players(self) -> List[PlayerState]: + return [p for p in self.players if p.in_hand and not p.folded] + + @property + def get_sb_amt(self) -> int: + return self.small_blind if self._sb_idx is not None else 0 + + @property + def get_bb_amt(self) -> int: + return self.big_blind if self._bb_idx is not None else 0 + + @property + def get_dealer_idx(self) -> int: + return self.dealer_idx + + @property + def get_winners(self) -> Optional[List[PlayerId]]: + if not self.terminal: + return None + if self.hand_rankings: + best_strength = max(r.get_strength() for r in self.hand_rankings.values()) + winners = [pid for pid, r in self.hand_rankings.items() + if r.get_strength() == best_strength] + return winners + return None + + @property + def get_next_act_idx(self) -> Optional[int]: + return self.next_to_act_idx + + @property + def get_deck(self) -> Optional[DeckManager]: + return self.deck_manager \ No newline at end of file diff --git a/gametree/model.py b/gametree/model.py index a23684a..1da9aa4 100644 --- a/gametree/model.py +++ b/gametree/model.py @@ -28,13 +28,6 @@ class PlayerId: @staticmethod def from_dict(cls, d: Dict[str, Any]) -> "PlayerId": return cls(int(d["id"]), d.get("name")) - -@dataclass -class Player: - pid: PlayerId - name: str - # seat: int - # address: str class Street(Enum): PREFLOP = 0 @@ -48,7 +41,7 @@ class ActionType(Enum): CHECK = "check" RAISE = "raise" BET = "bet" - ALL_IN = "all_in" + ALLIN = "allin" @dataclass @@ -103,5 +96,5 @@ def act_raise(pid: PlayerId, to_amount: int) -> Action: return Action(actor=pid, type=ActionType.RAISE, amount=to_amount) def act_all_in(pid: PlayerId, amount: int) -> Action: - return Action(actor=pid, type=ActionType.ALL_IN, amount=amount) + return Action(actor=pid, type=ActionType.ALLIN, amount=amount) diff --git a/test/p_creat.py b/test/p_creat.py new file mode 100644 index 0000000..9f66655 --- /dev/null +++ b/test/p_creat.py @@ -0,0 +1,67 @@ +import sys +from pathlib import Path +import pytest +from gametree import Game, PlayerId, Street, ActionType + +def make_players(n, stack): + return [(PlayerId(i, f"p{i}"), stack) for i in range(n)] + +def test_player_states(): + sb = 5 + bb = 10 + d_idx = 1 + n_player = 4 + stack = 500 + players_init = make_players(n_player, stack) + g = Game(players_init=players_init, small_blind=sb, big_blind=bb, dealer_idx=d_idx) + + assert g.current_street == Street.PREFLOP + sb_idx = (d_idx + 1) % n_player + bb_idx = (d_idx + 2) % n_player + next_idx = (d_idx + 3) % n_player + # init_bet + assert g.get_sb_amt == sb + assert g.get_bb_amt == bb + assert g.get_sb_idx == sb_idx + assert g.get_bb_idx == bb_idx + assert g.get_next_act_idx == next_idx + + # init_players + players = g.get_active_players + assert len(players) == n_player + for idx, p in enumerate(players): + assert p.hand_cards is not None and len(p.hand_cards) == 2 + if idx == sb_idx: + assert p.stack == stack - sb + assert p.committed == sb + elif idx == bb_idx: + assert p.stack == stack - bb + assert p.committed == bb + else: + assert p.stack == stack + assert p.committed == 0 + + # init_deck + deck = getattr(g, 'deck_manager', None) + rem_cards = getattr(deck, 'remaining_cards', None) + assert rem_cards is not None + assert rem_cards() == 28 # 36-8 + + # first_player + legal_actions = g.legal_actions(g.players[next_idx].pid) + assert legal_actions is not None and len(legal_actions) == 4 + assert set(legal_actions) == {ActionType.FOLD, ActionType.CALL, + ActionType.RAISE, ActionType.ALLIN} + + # n_players = len(players_init) + # if n_players < 2: + # expected_sb = None + # expected_bb = None + # elif n_players == 2: + # expected_sb = d_idx + # expected_bb = (d_idx + 1) % n_players + # else: + # expected_sb = (d_idx + 1) % len(players_init) + # expected_bb = (d_idx + 2) % len(players_init) + # assert g.get_sb_idx == expected_sb + # assert g.get_bb_idx == expected_bb diff --git a/test/pg.json b/test/pg.json deleted file mode 100644 index 2171eb6..0000000 --- a/test/pg.json +++ /dev/null @@ -1,64 +0,0 @@ -{ - "player_names": { - "0": "a", - "1": "b", - "2": "c", - "3": "d", - "4": "e" - }, - "game_state": { - "players_init": [ - [ - 0, - 500 - ], - [ - 1, - 500 - ], - [ - 2, - 500 - ], - [ - 3, - 500 - ], - [ - 4, - 500 - ] - ], - "dealer_idx": 0, - "small_blind": 5, - "big_blind": 10, - "current_street": "PREFLOP", - "all_actions": [ - { - "type": "CALL", - "actor": 3, - "amount": null - }, - { - "type": "CALL", - "actor": 4, - "amount": null - }, - { - "type": "CALL", - "actor": 0, - "amount": null - }, - { - "type": "ALL_IN", - "actor": 1, - "amount": null - }, - { - "type": "ALL_IN", - "actor": 2, - "amount": null - } - ] - } -} \ No newline at end of file diff --git a/test/pg.py b/test/pg.py index 948b44b..704f6c2 100644 --- a/test/pg.py +++ b/test/pg.py @@ -15,6 +15,7 @@ from gametree.model import act_fold, act_call, act_check, act_bet, act_raise, ac GAME_FILE = "pg.json" def create_file_idx(): + Path("data").mkdir(exist_ok=True) files = list(Path("data").glob("pg_*.json")) game_id = len(files) + 1 return f"data/pg_{game_id:03d}.json" @@ -64,7 +65,7 @@ def display_game_status(game: Game, player_names: Dict[int,str], show_cards_for: marks.append("->") if pstate.folded: marks.append("F") - if pstate.all_in: + if pstate.allin: marks.append("A") info = f"{i}:{name} stack={pstate.stack} bet={pstate.committed} [{' '.join(marks)}]" if show_cards_for == "all" or show_cards_for == name: @@ -144,8 +145,8 @@ def display_player_turn(): elif a == ActionType.RAISE: min_raise, max_raise = GAME.get_raise_bounds(pid) print(f" - raise (min: {min_raise}, max: {max_raise})") - elif a == ActionType.ALL_IN: - print(f" - all_in (amount: {GAME.get_allin_amt(pid)})") + elif a == ActionType.ALLIN: + print(f" - allin (amount: {GAME.get_allin_amt(pid)})") print("----------------------------------------------------") @@ -200,7 +201,7 @@ def main(): return input = shlex.split(args) if len(input) < 2: - print("act [amount]") + print("act [amount]") return pname = input[0] action = input[1] @@ -213,7 +214,7 @@ def main(): legal = GAME.legal_actions(pid) if action.lower() == 'allin': - atype = ActionType.ALL_IN + atype = ActionType.ALLIN elif action.lower() in ('fold', 'call', 'check', 'bet', 'raise'): atype = ActionType[action.upper()] else: @@ -273,7 +274,7 @@ def main(): def cmd_help(args): print("可用命令:") print(" set / [p2 ...] [--stack N] ") - print(" act [amount]") + print(" act [amount]") print(" status [player|all] ") print(" save ") print(" reset ")