gametree:1.0

This commit is contained in:
2025-11-05 17:10:39 +08:00
commit 699b79d0da
14 changed files with 1637 additions and 0 deletions

10
.gitignore vendored Normal file
View File

@@ -0,0 +1,10 @@
# Python-generated files
__pycache__/
*.py[oc]
build/
dist/
wheels/
*.egg-info
# Virtual environments
.venv

1
.python-version Normal file
View File

@@ -0,0 +1 @@
3.13

0
README.md Normal file
View File

37
gametree/__init__.py Normal file
View File

@@ -0,0 +1,37 @@
from .model import (
PlayerId,
Player,
Action,
ActionType,
Street,
)
from .deck import DeckManager
from .card import Card, Rank, Suit
from .game import Game
from .hand_ranking import HandRanking, HandType
from .hand_evaluator import HandEvaluator
__all__ = [
"PlayerId",
"Player",
"Action",
"ActionType",
"Street",
"Game",
"Card",
"Rank",
"Suit",
"HandRanking",
"HandType",
"HandEvaluator",
"DeckManager",
]

86
gametree/card.py Normal file
View File

@@ -0,0 +1,86 @@
from __future__ import annotations
from enum import IntEnum
from typing import List
class Suit(IntEnum):
S = 0
H = 1
D = 2
C = 3
def __str__(self) -> str:
return "shdc"[self.value]
class Rank(IntEnum):
R6 = 6
R7 = 7
R8 = 8
R9 = 9
RT = 10
RJ = 11
RQ = 12
RK = 13
RA = 14
def __str__(self):
if self.value <= 9:
return str(self.value)
return {10: "T", 11: "J", 12: "Q", 13: "K", 14: "A"}[self.value]
@property
def numeric_value(self):
return self.value
@property
def symbol(self):
return str(self)
class Card:
def __init__(self, rank: Rank, suit: Suit):
self.rank = rank
self.suit = suit
def __repr__(self) -> str:
return f"{str(self.rank)}{str(self.suit)}"
def __str__(self) -> str:
return self.__repr__()
@classmethod
def all_short(cls) -> List["Card"]:
cards: List[Card] = []
for r in Rank:
for s in Suit:
cards.append(Card(r, s))
return cards
def __eq__(self, other):
if not isinstance(other, Card):
return False
return self.rank == other.rank and self.suit == other.suit
def __hash__(self):
return hash((self.rank, self.suit))
@classmethod
def from_str(cls, card_str: str) -> "Card":
if len(card_str) != 2:
raise ValueError(f"Card must be 2 char: {card_str}")
rank_char = card_str[0].upper()
suit_char = card_str[1].lower()
rank = next((r for r in Rank if str(r).upper() == rank_char), None)
if rank is None:
raise ValueError(f"Invalid rank: {rank_char}")
suit = next((s for s in Suit if str(s).lower() == suit_char), None)
if suit is None:
raise ValueError(f"Invalid suit: {suit_char}")
return cls(rank, suit)

54
gametree/deck.py Normal file
View File

@@ -0,0 +1,54 @@
# 新的 dealing.py
import random
from typing import List, Tuple, Dict
from .model import PlayerId
from .card import Card
class DeckManager:
def __init__(self, shuffle: bool = True):
self.cards = Card.all_short()
if shuffle:
random.shuffle(self.cards)
self.position = 0 # 当前发牌位置
self.burned_cards: List[Card] = []
def remaining_cards(self) -> int:
return len(self.cards) - self.position
def deal_card(self) -> Card:
if self.position >= len(self.cards):
raise ValueError("Deck has no more cards")
card = self.cards[self.position]
self.position += 1
return card
def burn_card(self) -> Card:
card = self.deal_card()
self.burned_cards.append(card)
return card
def deal_hand_cards(self, players: List[PlayerId]) -> Dict[PlayerId, List[Card]]:
hand_cards = {}
first_cards = [self.deal_card() for _ in players]
second_cards = [self.deal_card() for _ in players]
for i, player_id in enumerate(players):
hand_cards[player_id] = [first_cards[i], second_cards[i]]
return hand_cards
def deal_flop(self) -> Tuple[Card, List[Card]]:
burn = self.burn_card()
flop = [self.deal_card() for _ in range(3)]
return burn, flop
def deal_turn(self) -> Tuple[Card, Card]:
burn = self.burn_card()
turn = self.deal_card()
return burn, turn
def deal_river(self) -> Tuple[Card, Card]:
burn = self.burn_card()
river = self.deal_card()
return burn, river

683
gametree/game.py Normal file
View File

@@ -0,0 +1,683 @@
from __future__ import annotations
from dataclasses import dataclass, field
from typing import List, Optional, Dict, Any, Tuple
import random
from .model import PlayerId, Action, ActionType, Street
from .card import Card
# from .dealing import deal_hand_cards, deal_flop, deal_turn, deal_river
from .hand_evaluator import HandEvaluator
from .deck import DeckManager
@dataclass
class PlayerState:
pid: PlayerId
stack: int # 玩家当前筹码量
committed: int = 0 # 本轮已投入筹码
in_hand: bool = True # 玩家是否仍在本局中
folded: bool = False # 玩家是否已弃牌
all_in: bool = False # 玩家是否已全押
def to_dict(self) -> Dict[str, Any]:
return {
"pid": self.pid.to_dict(),
"stack": self.stack,
"committed": self.committed,
"in_hand": self.in_hand,
"folded": self.folded,
"all_in": self.all_in,
}
@dataclass
class Pot:
amount: int = 0
eligible: List[PlayerId] = field(default_factory=list)
def to_dict(self) -> Dict[str, Any]:
return {
"amount": self.amount,
"eligible": [pid.to_dict() for pid in self.eligible],
}
@staticmethod
def compute_side_pots(players: List[PlayerState]) -> List[Pot]:
pots: List[Pot] = []
remaining = sorted([p for p in players if p.committed > 0],
key=lambda p: p.committed)
consumed = 0
while remaining:
smallest = remaining[0].committed
player_n = len(remaining)
take = smallest - consumed
if take <= 0:
break
pot_amount = take * player_n
pot_eligible = [p.pid for p in remaining]
pots.append(Pot(amount=pot_amount, eligible=pot_eligible))
consumed += take
# 移除已贡献完的玩家
remaining = [p for p in remaining if p.committed > consumed]
return pots
@dataclass
class Game:
players_init: List[Tuple[PlayerId, int]] = field(default_factory=list)
dealer_idx: int = 0
bb_idx: int = 0
sb_idx: int = 0
small_blind: int = 1
big_blind: int = 2
current_street: Street = Street.PREFLOP
hand_cards: Dict[PlayerId, List[Card]] = field(default_factory=dict)
board: List[Card] = field(default_factory=list)
players: List[PlayerState] = field(default_factory=list)
current_bet: int = 0 # 当前轮次最高下注额
last_raise: int = 0 # 上次的加注量
original_bet: int = 0 # 本轮次的起始下注额bb
street_actions: Dict[Street, List[Action]] = field(default_factory=dict)
_all_actions: List[Action] = field(default_factory=list)
next_to_act_idx: Optional[int] = None
terminal: bool = False
winner: Optional[List[PlayerId]] = None
pot: List[Pot] = field(default_factory=list)
hand_rankings: Dict[PlayerId, Any] = field(default_factory=dict)
deck: List[Card] = field(default_factory=list) # 剩余牌
burned_cards: List[Card] = field(default_factory=list)
invalid_raise: bool = False
deck_manager: Optional[DeckManager] = None
def __post_init__(self):
if self.players_init and not self.players:
self._init_deck_manager()
self._init_players()
self._init_hand_cards()
self._init_street_actions()
self._init_bet()
def _init_players(self):
self.players = [
PlayerState(pid=pid, stack=stack)
for pid, stack in self.players_init
]
def _init_deck_manager(self):
if self.deck_manager is None:
self.deck_manager = DeckManager(shuffle=True)
self.burned_cards = []
def _init_hand_cards(self):
player_ids = [pid for pid, _ in self.players_init]
self.hand_cards = self.deck_manager.deal_hand_cards(player_ids)
def _init_street_actions(self):
self.street_actions = {
Street.PREFLOP: [],
Street.FLOP: [],
Street.TURN: [],
Street.RIVER: []
}
def _init_deck(self):
self.deck = Card.all_short().copy()
random.shuffle(self.deck)
# 移除已发的底牌
used_cards = [card for hand in self.hand_cards.values() for card in hand]
self.deck = [card for card in self.deck if card not in used_cards]
def _init_bet(self):
n_players = len(self.players)
if n_players < 2:
return
if n_players == 2:
self.sb_idx = self.dealer_idx # 两人游戏:庄家是小盲
self.bb_idx = (self.dealer_idx + 1) % n_players
else:
self.sb_idx = (self.dealer_idx + 1) % n_players # 小盲位置 = 庄家下一位
self.bb_idx = (self.dealer_idx + 2) % n_players # 大盲位置 = 小盲下一位
# 小盲注
self.players[self.sb_idx].stack -= self.small_blind
self.players[self.sb_idx].committed += self.small_blind
self.players[self.bb_idx].stack -= self.big_blind
self.players[self.bb_idx].committed += self.big_blind
self.current_bet = self.big_blind
self.original_bet = self.big_blind
self.last_raise = self.big_blind
self._get_first_act_idx()
def _get_first_act_idx(self):
"""
首个行动玩家位置
1. preflop: 大盲后第一位(players>=3); 小盲(players=2)
2. postflop: 小盲先行动
"""
n_players = len(self.players)
if n_players < 2:
self.next_to_act_idx = None
return None
if self.current_street == Street.PREFLOP:
first_idx = (self.bb_idx + 1) % n_players if n_players >= 3 else self.sb_idx
self.next_to_act_idx = first_idx
return first_idx
else:
for i in range(0, n_players):
idx = (self.sb_idx + i) % n_players
player = self.players[idx]
if player.in_hand and not player.folded and not player.all_in:
self.next_to_act_idx = idx
return idx
# 没有找到可行动玩家
self.next_to_act_idx = None
return None
def add_action(self, action: Action):
if self.terminal:
raise ValueError("Game is over")
if not action.actor:
raise ValueError("Action requires player")
self.street_actions[self.current_street].append(action)
self._all_actions.append(action)
self._apply_action_to_state(action)
self._update_next_to_act()
if not self.terminal:
if self.next_to_act_idx is None or self._is_round_over():
if self.current_street == Street.RIVER:
self.finalize_game()
else:
self.advance_to_next_street()
# all allin
while (not self.terminal and self.current_street != Street.RIVER and
self._all_active_all_in()):
self.advance_to_next_street()
if (not self.terminal and self.current_street == Street.RIVER and
(self.next_to_act_idx is None or self._all_active_all_in())):
self.finalize_game()
def _all_active_all_in(self) -> bool:
active = [p for p in self.players if p.in_hand and not p.folded]
if not active:
return True
if len(active) <= 1:
return True
return all(p.all_in for p in active)
def finalize_game(self):
print("+++++++++++++++++++++++++++++++++++++++++++++++++++++++++")
print("DEBUG: finalize_game")
print(f"DEBUG: pot before finalize: {[(p.amount, len(p.eligible)) for p in self.pot]}")
print(f"DEBUG: active players: {[(p.pid, p.stack, p.committed, p.folded, p.all_in)
for p in self.players if p.in_hand]}")
if not self.pot:
self.finalize_pots()
active = [p for p in self.players if p.in_hand and not p.folded]
if len(active) == 1:
win_pid = active[0].pid
total = sum(pot.amount for pot in self.pot)
active[0].stack += total
for p in self.players:
p.committed = 0
self.pot = []
self.winner = [win_pid]
self.terminal = True
return
winners_all = self.showdown()
if winners_all:
self.winner = winners_all
for pot in self.pot:
elig = [pid for pid in pot.eligible if pid in self.hand_rankings]
if not elig:
continue
best_strength = max(self.hand_rankings[pid].get_strength() for pid in elig)
pot_winners = [pid for pid in elig if self.hand_rankings[pid].get_strength() == best_strength]
share = pot.amount // len(pot_winners)
rem = pot.amount % len(pot_winners)
for i, pid in enumerate(pot_winners):
ps = self._get_player(pid)
if ps is None:
continue
ps.stack += share + (rem if i == 0 else 0)
for p in self.players:
p.committed = 0
p.all_in = False
self.pot = []
self.terminal = True
return
def _is_round_over(self) -> bool:
active = [p for p in self.players if p.in_hand and not p.folded]
if len(active) <= 1:
return True
if self.current_bet == 0 and self.next_to_act_idx is None:
return True
for p in active:
if not p.all_in and p.committed < self.current_bet:
return False
if not p.all_in and p.committed!= self.current_bet:
return False
return True
def _player_fold_act(self, player: PlayerState):
player.folded = True
player.in_hand = False
# 检查是否只剩一个玩家,如果是的话,游戏结束,进行结算 todo
active_players = [p for p in self.players if p.in_hand and not p.folded]
if len(active_players) == 1:
self.terminal = True
self.winner = [active_players[0].pid]
def _player_call_act(self, player: PlayerState, amount: Optional[int] = None):
if not player.in_hand or player.folded:
raise ValueError("Player cannot act")
to_call = max(0, self.current_bet - player.committed)
amount = amount if amount else to_call
pay = min(amount, player.stack)
player.stack -= pay
player.committed += pay
if player.stack == 0:
player.all_in = True
def _player_check_act(self, player: PlayerState):
if not player.in_hand or player.folded:
raise ValueError("Player cannot act")
to_call = max(0, self.current_bet - player.committed)
if to_call > 0:
raise ValueError(f"Cannot CHECK,need {to_call} to call")
if player.stack == 0:
player.all_in = True
def _player_bet_act(self, player: PlayerState, amount: int):
if amount is None:
raise ValueError("BET requires amount")
if self.current_bet > 0:
raise ValueError(f"Cannot BET when current bet is {self.current_bet}")
# BET的最小金额通常是bb
min_bet = self.big_blind
if amount < min_bet:
raise ValueError(f"BET amount {amount} < minimum bet {min_bet}")
required = max(0, amount - player.committed)
pay = min(required, player.stack)
player.stack -= pay
player.committed += pay
if player.stack == 0:
player.all_in = True
self.current_bet = player.committed
self.original_bet = player.committed
self.last_raise = player.committed
def _player_raise_act(self, player: PlayerState, amount: int):
if not player.in_hand or player.folded:
raise ValueError("Player cannot act")
if amount is None:
raise ValueError("RAISE requires amount")
if self.current_bet == 0:
raise ValueError("Cannot RAISE when no one has bet")
min_raise_to = self.current_bet + self.last_raise
if amount < min_raise_to:
raise ValueError(
f"RAISE amount {amount} < minimum raise to {min_raise_to} "
f"(current bet {self.current_bet} + last raise {self.last_raise})"
)
required = max(0, amount - player.committed)
pay = min(required, player.stack)
player.stack -= pay
player.committed += pay
if player.stack == 0:
player.all_in = True
# 有效加注
if player.committed > self.current_bet:
self.last_raise = player.committed - self.current_bet
self.current_bet = player.committed
def _player_all_in_act(self, player: PlayerState):
if not player.in_hand or player.folded:
raise ValueError("Player cannot act")
if player.stack <= 0:
raise ValueError("Player no chips to all-in")
if player.all_in:
raise ValueError("Player already all-in")
pay = player.stack
player.stack = 0
player.committed += pay
player.all_in = True
if self.current_bet == 0: # 下注即allin
self.current_bet = player.committed
self.original_bet = player.committed
self.last_raise = player.committed
elif player.committed > self.current_bet:
if player.committed < self.current_bet + self.last_raise:
self.invalid_raise = True
self.last_raise = player.committed - self.current_bet
self.current_bet = player.committed
else:
self.invalid_raise = True
def _apply_action_to_state(self, action: Action):
if action.amount and action.amount < 0:
raise ValueError("Action amount cannot be negative")
player_idx = self._get_player_index(action.actor)
player = self.players[player_idx]
match action.type:
case ActionType.FOLD:
self._player_fold_act(player)
case ActionType.CALL:
self._player_call_act(player, action.amount)
case ActionType.CHECK:
self._player_check_act(player)
case ActionType.BET:
self._player_bet_act(player, action.amount)
case ActionType.RAISE:
self._player_raise_act(player, action.amount)
case ActionType.ALL_IN:
self._player_all_in_act(player)
def _update_next_to_act(self):
if self.terminal:
self.next_to_act_idx = None
return
current_idx = self.next_to_act_idx
if current_idx is None:
return
n_players = len(self.players)
for i in range(1, n_players + 1):
next_idx = (current_idx + i) % n_players
player = self.players[next_idx]
if player.in_hand and not player.folded and not player.all_in and player.stack > 0:
self.next_to_act_idx = next_idx
return
# 没有找到可行动的玩家,进入下一轮 todo
self.next_to_act_idx = None
def advance_to_next_street(self):
if self.current_street == Street.RIVER:
# 已经是最后一街道,准备摊牌 todo
return
round_pot = sum(player.committed for player in self.players)
if round_pot > 0:
eligible_players = [p.pid for p in self.players if p.in_hand and not p.folded]
if not self.pot:
self.pot = []
self.pot.append(Pot(amount=round_pot, eligible=eligible_players))
for player in self.players:
player.committed = 0
self.current_bet = 0
self.original_bet = 0
self.last_raise = 0
self.invalid_raise = False
self._deal_cards_for_street()
self._get_first_act_idx()
def _deal_cards_for_street(self):
if self.current_street == Street.PREFLOP:
burn_card, flop_cards = self.deck_manager.deal_flop()
self.burned_cards.append(burn_card)
self.board = flop_cards
self.current_street = Street.FLOP
elif self.current_street == Street.FLOP:
burn_card, turn_card = self.deck_manager.deal_turn()
self.burned_cards.append(burn_card)
self.board.append(turn_card)
self.current_street = Street.TURN
elif self.current_street == Street.TURN:
burn_card, river_card = self.deck_manager.deal_river()
self.burned_cards.append(burn_card)
self.board.append(river_card)
self.current_street = Street.RIVER
def get_call_amt(self, player_id: PlayerId) -> int:
player = self._get_player(player_id)
if not player:
return 0
return max(0, self.current_bet - player.committed)
def get_allin_amt(self, player_id: PlayerId) -> int:
player = self._get_player(player_id)
if not player:
return 0
return player.stack
def get_bet_bound(self, player_id: PlayerId) -> Tuple[Optional[int], Optional[int]]:
player = self._get_player(player_id)
if not player:
return (None, None)
to_call = self.get_call_amt(player_id)
if to_call != 0 or player.stack <= 0:
return (None, None)
min_bet = max(1, self.big_blind)
max_bet = player.stack
return (min_bet, max_bet)
def get_raise_bounds(self, player_id: PlayerId) -> Tuple[Optional[int], Optional[int]]:
# raise total
player = self._get_player(player_id)
if not player:
return (None, None)
to_call = self.get_call_amt(player_id)
if to_call == 0 or player.stack <= 0:
return (None, None)
base_raise = self.last_raise if (self.last_raise and self.last_raise > 0) else self.big_blind
min_raise_total = self.current_bet + base_raise
max_raise_total = player.committed + player.stack # alin
# min>max, invalide-raise, todo:legal_action处理
return (min_raise_total, max_raise_total)
def get_hand_cards(self, player_id: PlayerId) -> List[Card]:
return self.hand_cards.get(player_id, [])
def get_current_board(self) -> List[Card]:
return self.board.copy()
def get_actions_in_street(self, street: Street) -> List[Action]:
return self.street_actions.get(street, [])
def get_all_actions(self) -> List[Action]:
return self._all_actions.copy()
def get_remaining_deck(self) -> int:
return self.deck_manager.remaining_cards() if self.deck_manager else 0
def get_burned_cards(self) -> List[Card]:
return self.burned_cards.copy()
@property
def total_pot(self) -> int:
committed_total = sum(p.committed for p in self.players)
pot_total = sum(pot.amount for pot in self.pot)
return committed_total + pot_total
def get_player_stack(self, player_id: PlayerId) -> int:
player = self._get_player(player_id)
return player.stack if player else 0
def legal_actions(self, player_id: PlayerId) -> List[ActionType]:
player = self._get_player(player_id)
if not player or self.terminal or player.folded or not player.in_hand:
return []
to_call = max(0, self.current_bet - player.committed)
actions = []
# 前面行动的玩家无效加注行为,后面行动的玩家只能选择跟注、弃牌、全押
# TODO: 这里的逻辑是否合理还需要研究
# deepseek:“跟注或弃牌”是针对“加注”这一行为的回应,而“全下”是超越于这个循环之外的、每个玩家的终极权利。
# eg:50/100的牌桌第一个行动的玩家bet100第二个玩家raise400第三个玩家allin250
# 第四个玩家allin180,现在又回到第一个玩家行动,行动有哪些
# deepseek认为第一个玩家面对的是一个400的加注他可以选择弃牌、跟注400或者加注全押
# 1.弃牌或跟注
# 2.根据最后一个有效加注400的有效加注
# 3.全压的无效加注
# 那么玩家2对应玩家1的行为为
# 1.翻牌
# 2.跟注、加注、弃牌
# 3.跟注、弃牌
# 疑问为什么玩家1可以开启加注轮
if self.invalid_raise:
if to_call == 0:
if player.stack > 0:
actions.append(ActionType.ALL_IN)
else:
actions.append(ActionType.FOLD)
if player.stack > 0:
if player.stack >= to_call: # 暂时这么处理吧
actions.append(ActionType.CALL)
else:
actions.append(ActionType.ALL_IN)
return actions
if to_call == 0:
actions.append(ActionType.CHECK)
if player.stack > 0:
actions.append(ActionType.BET)
actions.append(ActionType.ALL_IN)
else:
actions.append(ActionType.FOLD)
if player.stack > 0:
base_raise = self.last_raise if (self.last_raise and self.last_raise > 0) else self.big_blind
min_raise_to = self.current_bet + base_raise
max_raise_to = player.committed + player.stack
if max_raise_to < min_raise_to:
if player.stack >= to_call:
actions.append(ActionType.CALL)
else:
actions.append(ActionType.ALL_IN)
self.invalid_raise = True
else:
actions.append(ActionType.CALL)
actions.append(ActionType.RAISE)
actions.append(ActionType.ALL_IN)
return actions
def evaluate_hand(self, player_id: PlayerId) -> Optional[Any]:
"""评估玩家手牌"""
if player_id not in self.hand_cards or len(self.board) < 5:
return None
all_cards = self.hand_cards[player_id] + self.board
if len(all_cards) != 7:
return None
return HandEvaluator.evaluateHand(all_cards)
def showdown(self) -> List[PlayerId]:
if len(self.board) != 5:
active_players = [p for p in self.players if p.in_hand and not p.folded]
if len(active_players) == 1:
self.winner = [active_players[0].pid]
self.terminal = True
return self.winner
return []
player_rankings = {}
for player in self.players:
if player.in_hand and not player.folded:
ranking = self.evaluate_hand(player.pid)
if ranking:
player_rankings[player.pid] = ranking
if not player_rankings:
return []
best_strength = max(ranking.get_strength() for ranking in player_rankings.values())
winners = [pid for pid, ranking in player_rankings.items()
if ranking.get_strength() == best_strength]
self.hand_rankings = player_rankings
self.winner = winners
self.terminal = True
return winners
def finalize_pots(self):
self.pot = Pot.compute_side_pots(self.players)
def _get_player(self, player_id: PlayerId) -> Optional[PlayerState]:
for player in self.players:
if player.pid == player_id:
return player
return None
def _get_player_index(self, player_id: PlayerId) -> int:
for i, player in enumerate(self.players):
if player.pid == player_id:
return i
raise ValueError(f"Player {player_id} not found")
def get_player_by_idx(self, index)-> PlayerId:
# print("----------------------------------------------------")
if index < 0 or index >= len(self.players):
print(f"Invalid player index: {index}")
raise ValueError(f"Player index {index} out of range")
# print(f"debug-pid: {self.players[index].pid}")
# print("----------------------------------------------------")
return self.players[index].pid
@property
def actions(self) -> List[Action]:
return self._all_actions
def to_dict(self) -> Dict[str, Any]:
return {
"players": [p.to_dict() for p in self.players],
"dealer_idx": self.dealer_idx,
"small_blind": self.small_blind,
"big_blind": self.big_blind,
"street": self.current_street.name,
"board": [str(card) for card in self.board],
"current_bet": self.current_bet,
"pot": [pot.to_dict() for pot in self.pot],
"hand_cards": {str(pid): [str(card) for card in cards]
for pid, cards in self.hand_cards.items()},
"next_to_act_idx": self.next_to_act_idx,
"terminal": self.terminal,
"winner": [w.to_dict() for w in self.winner] if self.winner else None,
"total_actions": len(self._all_actions),
}

109
gametree/hand_evaluator.py Normal file
View File

@@ -0,0 +1,109 @@
from typing import List, Tuple, Dict
from collections import Counter
from itertools import combinations
from .card import Card, Rank, Suit
from .hand_ranking import HandRanking, HandType
class HandEvaluator:
@staticmethod
def evaluateHand(cards) -> HandRanking:
if len(cards) != 7:
raise ValueError(f"Expected 7 cards, got {len(cards)}")
best_ranking = None
best_cards = None
# 所有可能的5张牌组合
for five_cards in combinations(cards, 5):
ranking = HandEvaluator.evaluate5Cards(list(five_cards))
if best_ranking is None or ranking > best_ranking:
best_ranking = ranking
best_cards = list(five_cards)
best_ranking.cards = best_cards
return best_ranking
@staticmethod
def evaluate5Cards(cards) -> HandRanking:
if len(cards) != 5:
raise ValueError(f"Expected 5 cards, got {len(cards)}")
# 按点数排序(降序)
sorted_cards = sorted(cards, key=lambda c: c.rank.numeric_value, reverse=True)
ranks = [card.rank for card in sorted_cards]
suits = [card.suit for card in sorted_cards]
# 统计点数出现次数
rank_counts = Counter(ranks)
count_values = sorted(rank_counts.values(), reverse=True)
# 同花
is_flush = len(set(suits)) == 1
# 顺子
is_straight, straight_high = HandEvaluator._isStraight(ranks)
# 根据牌型返回相应的HandRanking
if is_straight and is_flush:
if straight_high == Rank.RA and ranks == [Rank.RA, Rank.RK, Rank.RQ, Rank.RJ, Rank.RT]:
return HandRanking(HandType.ROYAL_FLUSH, [Rank.RA], sorted_cards) # 皇家同花顺
else:
return HandRanking(HandType.STRAIGHT_FLUSH, [straight_high], sorted_cards) # 同花顺
elif count_values == [4, 1]: # 四条
quad_rank = [rank for rank, count in rank_counts.items() if count == 4][0]
kicker = [rank for rank, count in rank_counts.items() if count == 1][0]
return HandRanking(HandType.FOUR_OF_A_KIND, [quad_rank, kicker], sorted_cards)
elif count_values == [3, 2]: # 三条+一对
trips_rank = [rank for rank, count in rank_counts.items() if count == 3][0]
pair_rank = [rank for rank, count in rank_counts.items() if count == 2][0]
return HandRanking(HandType.FULL_HOUSE, [trips_rank, pair_rank], sorted_cards)
elif is_flush: # 同花
return HandRanking(HandType.FLUSH, ranks, sorted_cards)
elif is_straight: # 顺子
return HandRanking(HandType.STRAIGHT, [straight_high], sorted_cards)
elif count_values == [3, 1, 1]: # 三条
trips_rank = [rank for rank, count in rank_counts.items() if count == 3][0]
kickers = sorted([rank for rank, count in rank_counts.items() if count == 1], reverse=True)
return HandRanking(HandType.THREE_OF_A_KIND, [trips_rank] + kickers, sorted_cards)
elif count_values == [2, 2, 1]: # 两对
pairs = sorted([rank for rank, count in rank_counts.items() if count == 2], reverse=True)
kicker = [rank for rank, count in rank_counts.items() if count == 1][0]
return HandRanking(HandType.TWO_PAIR, pairs + [kicker], sorted_cards)
elif count_values == [2, 1, 1, 1]: # 一对
pair_rank = [rank for rank, count in rank_counts.items() if count == 2][0]
kickers = sorted([rank for rank, count in rank_counts.items() if count == 1], reverse=True)
return HandRanking(HandType.ONE_PAIR, [pair_rank] + kickers, sorted_cards)
else: # 高牌
return HandRanking(HandType.HIGH_CARD, ranks, sorted_cards)
@staticmethod
def _isStraight(ranks: List[Rank]) -> Tuple[bool, Rank]:
values = sorted([rank.numeric_value for rank in ranks], reverse=True)
is_regular_straight = True
for i in range(1, len(values)):
if values[i-1] - values[i] != 1:
is_regular_straight = False
break
if is_regular_straight:
highest_rank = None
for rank in ranks:
if rank.numeric_value == values[0]:
highest_rank = rank
break
return True, highest_rank
if values == {14, 9, 8, 7, 6}: # A, 9, 8, 7, 6
return True, Rank.R9
return False, None

82
gametree/hand_ranking.py Normal file
View File

@@ -0,0 +1,82 @@
from enum import Enum
from typing import List, Tuple
from .card import Card, Rank
class HandType(Enum):
# 短牌规则:同花 > 葫芦
HIGH_CARD = (1, "High Card")
ONE_PAIR = (2, "Pair")
TWO_PAIR = (3, "Two Pair")
THREE_OF_A_KIND = (4, "Three of a Kind")
STRAIGHT = (5, "Straight")
FULL_HOUSE = (6, "Full House")
FLUSH = (7, "Flush")
FOUR_OF_A_KIND = (8, "Four of a Kind")
STRAIGHT_FLUSH = (9, "Straight Flush")
ROYAL_FLUSH = (10, "Royal Flush")
def __new__(cls, strength, name):
obj = object.__new__(cls)
obj._value_ = strength
obj.strength = strength
obj.type_name = name
return obj
class HandRanking:
def __init__(self, hand_type: HandType, key_ranks: List[Rank], cards: List[Card]):
self.hand_type = hand_type
self.key_ranks = key_ranks # 用于比较的关键点数
self.cards = cards # 组成这个ranking的5张牌
def __str__(self):
if self.hand_type == HandType.FOUR_OF_A_KIND:
return f"Quad({self.key_ranks[0].symbol})"
elif self.hand_type == HandType.FULL_HOUSE:
return f"Full House({self.key_ranks[0].symbol} over {self.key_ranks[1].symbol})"
elif self.hand_type == HandType.FLUSH:
return f"Flush({self.key_ranks[0].symbol} high)"
elif self.hand_type == HandType.STRAIGHT:
return f"Straight({self.key_ranks[0].symbol} high)"
elif self.hand_type == HandType.STRAIGHT_FLUSH:
if self.key_ranks[0] == Rank.ACE:
return "Royal Flush"
else:
return f"Straight Flush({self.key_ranks[0].symbol} high)"
elif self.hand_type == HandType.ROYAL_FLUSH:
return "Royal Flush"
elif self.hand_type == HandType.THREE_OF_A_KIND:
return f"Three of a Kind({self.key_ranks[0].symbol})"
elif self.hand_type == HandType.TWO_PAIR:
return f"Two Pair({self.key_ranks[0].symbol} and {self.key_ranks[1].symbol})"
elif self.hand_type == HandType.ONE_PAIR:
return f"Pair({self.key_ranks[0].symbol})"
else:
return f"High Card({self.key_ranks[0].symbol})"
def __lt__(self, other):
"""比较牌力,用于排序"""
if not isinstance(other, HandRanking):
return NotImplemented
if self.hand_type.strength != other.hand_type.strength:
return self.hand_type.strength < other.hand_type.strength
for my_rank, other_rank in zip(self.key_ranks, other.key_ranks):
if my_rank.numeric_value != other_rank.numeric_value:
return my_rank.numeric_value < other_rank.numeric_value
return False
def get_strength(self) -> int:
# 返回牌力 还是 牌型+点数
# 基础强度 = 牌型强度 * 1000000
# todo
strength = self.hand_type.strength * 1000000
for i, rank in enumerate(self.key_ranks):
strength += rank.numeric_value * (100 ** (4 - i))
return strength

107
gametree/model.py Normal file
View File

@@ -0,0 +1,107 @@
from dataclasses import dataclass, field
from typing import List, Optional,Tuple,Dict,Any
from enum import Enum, auto
from .card import Card
import random
@dataclass(frozen=True)
class PlayerId:
id: int
name: str
def __post_init__(self):
if not isinstance(self.id, int):
raise TypeError("PlayerId.id must be int")
if self.id < 0:
raise ValueError("PlayerId.id must be >= 0")
def __str__(self) -> str:
return f"PlayerId{self.id}" + f"({self.name})"
def __hash__(self):
return hash(self.id)
def to_dict(self) -> Dict[str, Any]:
return {"id": self.id, "name": self.name}
@staticmethod
def from_dict(cls, d: Dict[str, Any]) -> "PlayerId":
return cls(int(d["id"]), d.get("name"))
@dataclass
class Player:
pid: PlayerId
name: str
# seat: int
# address: str
class Street(Enum):
PREFLOP = 0
FLOP = 1
TURN = 2
RIVER = 3
class ActionType(Enum):
FOLD = "fold"
CALL = "call"
CHECK = "check"
RAISE = "raise"
BET = "bet"
ALL_IN = "all_in"
@dataclass
class Action:
actor: PlayerId # 现在所有动作都必须有玩家
type: ActionType
amount: Optional[int] = None
cards:Optional[List[Card]] = None
def __post_init__(self):
if self.type in (ActionType.BET, ActionType.RAISE):
if self.amount is None:
raise ValueError(f"Action{self.type.name} need amount")
if self.amount <= 0:
raise ValueError(f"Action{self.type.name} need amount > 0")
def action_to_dict(self) -> Dict[str, Any]:
return {
"actor": self.actor.to_dict(),
"type": self.type.name,
"amount": self.amount,
"cards": [str(c) for c in self.cards] if self.cards else None,
}
@staticmethod
def action_from_dict(d: Dict[str, Any]) -> "Action":
actor = PlayerId.from_dict(d["actor"])
type = ActionType[d["type"]]
# 反序列
cards = [Card.from_str(s) for s in d["cards"]] if d.get("cards") else None
return Action(
actor=actor,
type=type,
amount=d.get("amount"),
cards=cards
)
def act_fold(pid: PlayerId) -> Action:
return Action(actor=pid, type=ActionType.FOLD)
def act_check(pid: PlayerId) -> Action:
return Action(actor=pid, type=ActionType.CHECK)
def act_call(pid: PlayerId, amount: int) -> Action:
return Action(actor=pid, type=ActionType.CALL, amount=amount)
def act_bet(pid: PlayerId, amount: int) -> Action:
return Action(actor=pid, type=ActionType.BET, amount=amount)
def act_raise(pid: PlayerId, to_amount: int) -> Action:
return Action(actor=pid, type=ActionType.RAISE, amount=to_amount)
def act_all_in(pid: PlayerId, amount: int) -> Action:
return Action(actor=pid, type=ActionType.ALL_IN, amount=amount)

7
pyproject.toml Normal file
View File

@@ -0,0 +1,7 @@
[project]
name = "gametree"
version = "0.1.0"
description = "Add your description here"
readme = "README.md"
requires-python = ">=3.13"
dependencies = []

0
test/__init__.py Normal file
View File

109
test/pg.json Normal file
View File

@@ -0,0 +1,109 @@
{
"player_names": {
"0": "a",
"1": "b",
"2": "c",
"3": "d"
},
"game_state": {
"players_init": [
[
0,
500
],
[
1,
500
],
[
2,
500
],
[
3,
500
]
],
"dealer_idx": 0,
"small_blind": 5,
"big_blind": 10,
"current_street": "RIVER",
"all_actions": [
{
"type": "CALL",
"actor": 3,
"amount": null
},
{
"type": "CALL",
"actor": 0,
"amount": null
},
{
"type": "CALL",
"actor": 1,
"amount": null
},
{
"type": "BET",
"actor": 1,
"amount": 40
},
{
"type": "CALL",
"actor": 2,
"amount": null
},
{
"type": "CALL",
"actor": 3,
"amount": null
},
{
"type": "CALL",
"actor": 0,
"amount": null
},
{
"type": "BET",
"actor": 1,
"amount": 40
},
{
"type": "CALL",
"actor": 2,
"amount": null
},
{
"type": "CALL",
"actor": 3,
"amount": null
},
{
"type": "CALL",
"actor": 0,
"amount": null
},
{
"type": "BET",
"actor": 1,
"amount": 40
},
{
"type": "CALL",
"actor": 2,
"amount": null
},
{
"type": "CALL",
"actor": 3,
"amount": null
},
{
"type": "CALL",
"actor": 0,
"amount": null
}
]
}
}

352
test/pg.py Normal file
View File

@@ -0,0 +1,352 @@
#!/usr/bin/env python3
import json
import os
import sys
from pathlib import Path
from typing import Optional, List, Dict, Any, Tuple
sys.path.insert(0, str(Path(__file__).parent.parent))
import json
from pathlib import Path
from gametree import Game, PlayerId, Action, ActionType, Street
from gametree.model import act_fold, act_call, act_check, act_bet, act_raise, act_all_in
GAME_FILE = "pg.json"
def create_simple_game(player_names: List[str],
small_blind: int = 5,
big_blind: int = 10,
stack: int = 500,
dealer_idx: int = 0):
player_map: Dict[int, str] = {i: name for i, name in enumerate(player_names)}
player_ids: List[PlayerId] = [PlayerId(i,name) for i,name in enumerate(player_names)]
players_init = [(pid, stack) for pid in player_ids]
game = Game(players_init=players_init, dealer_idx=dealer_idx,
small_blind=small_blind, big_blind=big_blind)
return game, player_map, player_ids
def save_simple_game(path: str, game: Game, players) -> None:
p = Path(path)
data = {
"player_names": players,
"game_state": {
"players_init": [[pid.id, stack] for pid, stack in getattr(game, "players_init", [])],
"dealer_idx": getattr(game, "dealer_idx", None),
"small_blind": getattr(game, "small_blind", None),
"big_blind": getattr(game, "big_blind", None),
"current_street": getattr(game, "current_street").name if getattr(game, "current_street", None) else None,
"all_actions": [
{"type": a.type.name, "actor": int(a.actor.id), "amount": a.amount}
for a in game.get_all_actions()
]
}
}
p.write_text(json.dumps(data, indent=2), encoding="utf-8")
# 单开不加载
# def load_simple_game(path: str) -> Tuple[Game, Dict[int,str]]:
# p = Path(path)
# data = json.loads(p.read_text(encoding="utf-8"))
# player_names = {int(k): v for k, v in data["player_names"].items()}
# gs = data["game_state"]
# players_init = [(PlayerId(pid), stack) for pid, stack in gs.get("players_init", [])]
# game = Game(players_init=players_init,
# dealer_idx=gs.get("dealer_idx", 0),
# small_blind=gs.get("small_blind", 5),
# big_blind=gs.get("big_blind", 10))
# for ad in gs.get("all_actions", []):
# try:
# act = Action(type=ActionType[ad["type"]], actor=PlayerId(ad["actor"]), amount=ad.get("amount"))
# game.add_action(act)
# except Exception:
# continue
# target = gs.get("current_street")
# if target:
# from .model import Street as _Street
# target_st = _Street[target]
# while game.current_street != target_st and not getattr(game, "terminal", False):
# game.advance_to_next_street()
# return game, player_names
def display_game_status(game: Game, player_names: Dict[int,str], show_cards_for: Optional[str] = None) -> None:
print(f"Street: {game.current_street.name}")
board = game.get_current_board()
if board:
print("Board:", " ".join(str(c) for c in board))
print("Pot:", getattr(game, "total_pot", 0))
for i, pstate in enumerate(game.players):
name = player_names.get(i, f"Player_{i}")
marks = []
if i == getattr(game, "dealer_idx", None):
marks.append("D")
if i == getattr(game, "next_to_act_idx", None):
marks.append("->")
if pstate.folded:
marks.append("F")
if pstate.all_in:
marks.append("A")
info = f"{i}:{name} stack={pstate.stack} bet={pstate.committed} [{' '.join(marks)}]"
if show_cards_for == "all" or show_cards_for == name:
cards = game.get_hand_cards(PlayerId(i)) or []
info += " hand=" + " ".join(str(c) for c in cards)
print(info)
if getattr(game, "terminal", True):
display_winners(game, player_names)
def display_winners(game: Game, player_names: Dict[int,str]) -> None:
print("=" * 50)
print("GAME OVER")
winners = getattr(game, "winner", [])
if winners:
winner_names = []
for pid in winners:
winner_idx = getattr(pid, "id", None)
if winner_idx is not None and winner_idx in player_names:
winner_names.append(player_names[winner_idx])
else:
winner_names.append(str(pid))
print(f"Winner(s): {', '.join(winner_names)}")
hand_rankings = getattr(game, "hand_rankings", {})
if hand_rankings:
print("\nHand Rankings:")
for pid, ranking in hand_rankings.items():
player_idx = getattr(pid, "id", None)
player_name = player_names.get(player_idx, str(pid)) if player_idx is not None else str(pid)
cards = game.get_hand_cards(pid) or []
board = game.get_current_board()
all_cards = cards + board
print(f" {player_name}: {' '.join(str(c) for c in cards)} | {ranking}")
print("=" * 50)
def display_player_turn():
if GAME is None or P_IDS is None:
print("No game loaded/created")
return
idx = getattr(GAME, "next_to_act_idx", None)
if idx is None:
GAME._get_first_act_idx()
idx = getattr(GAME, "next_to_act_idx", None)
if idx is None:
print("No next-to-act player (index is None)")
return
pid = GAME.get_player_by_idx(idx)
pstate = GAME.players[idx]
# todo : 接口
to_call = max(0, getattr(GAME, "current_bet", 0) - getattr(pstate, "committed", 0))
allin_amount = getattr(pstate, "stack", 0)
last_raise = getattr(GAME, "last_raise", 0) or 0
actions_display = GAME.legal_actions(pid)
player_name = P_NAME.get(idx, f"Player_{idx}") if isinstance(P_NAME, dict) else f"Player_{idx}"
pid_name = getattr(pid, "name", None)
print("\n--- Next to act ---")
print(f"player index: {idx}")
print(f"player name: {player_name}")
print(f"player id: {pid.id}" + (f" ({pid_name})"))
print("legal actions:")
for a in actions_display:
if a == ActionType.FOLD:
print(" - fold")
elif a == ActionType.CALL:
print(f" - call (amount: {GAME.get_call_amt(pid)})")
elif a == ActionType.CHECK:
print(" - check")
elif a == ActionType.BET:
min_bet, max_bet = GAME.get_bet_bound(pid)
print(f" - bet (min: {min_bet}, max: {max_bet})")
elif a == ActionType.RAISE:
min_raise, max_raise = GAME.get_raise_bounds(pid)
print(f" - raise (min: {min_raise}, max: {max_raise})")
elif a == ActionType.ALL_IN:
print(f" - all_in (amount: {GAME.get_allin_amt(pid)})")
print("----------------------------------------------------")
def main():
global GAME, P_IDS, CUR_PID, P_NAME
import shlex
def get_pid_by_name(name):
if P_NAME is None or P_IDS is None:
return None, None
for idx, pid in enumerate(P_IDS):
if P_NAME.get(idx) == name:
return pid, idx
return None, None
def cmd_set(args):
global GAME, P_NAME, P_IDS
input_line = args.split()
if len(input_line) < 3:
print("usage: set <small>/<big> [player ...] [--stack N]")
return
blinds = input_line[0].split('/')
names = []
stack = 500
for p in input_line[1:]:
if not p.startswith('--stack'):
names.append(p)
if len(input_line) >= 4 and input_line[-2] == '--stack':
stack = int(input_line[-1])
names = input_line[1:-2]
game, pname_map, pids = create_simple_game(names,
small_blind=int(blinds[0]),
big_blind=int(blinds[1]),
stack=stack)
GAME = game
P_NAME = pname_map
P_IDS = pids
save_simple_game(GAME_FILE, GAME, P_NAME)
GAME._get_first_act_idx()
display_game_status(GAME, P_NAME)
display_player_turn()
def cmd_act(args):
global GAME
if GAME is None or P_IDS is None:
print("no game to play!!!")
return
input = shlex.split(args)
if len(input) < 2:
print("act <player_name> <fold|call|check|bet|raise|all_in> [amount]")
return
pname = input[0]
action = input[1]
amt = int(input[2]) if len(input) >= 3 else None
pid, idx = get_pid_by_name(pname)
if pid is None:
print(f"unknown player:{pname}")
return
legal = GAME.legal_actions(pid)
if action.lower() == 'allin':
atype = ActionType.ALL_IN
elif action.lower() in ('fold', 'call', 'check', 'bet', 'raise'):
atype = ActionType[action.upper()]
else:
print(f"invalid action:{action}")
return
if atype == ActionType.BET or atype == ActionType.RAISE:
if amt is None:
print(f"action {action} need amount")
return
if atype not in legal:
print(f"invalid action:{action} for player :{pname}")
return
a = Action(type=atype, actor=pid, amount=amt)
GAME.add_action(a)
save_simple_game(GAME_FILE, GAME, P_NAME)
print(f" {pname} {action}" + (f" {amt}" if amt is not None else ""))
display_game_status(GAME, P_NAME)
if not GAME.terminal:
display_player_turn()
def cmd_status(args):
# 单开
# if GAME is None:
# if os.path.exists(GAME_FILE):
# g, names = load_simple_game(GAME_FILE)
# display_game_status(g, names, show_cards_for=args.strip() if args else None)
# else:
# print("no game")
# else:
if GAME is None or P_NAME is None:
print("no game to display")
return
display_game_status(GAME, P_NAME, show_cards_for=args.strip() if args else None)
def cmd_next(args):
global GAME
GAME.advance_to_next_street()
save_simple_game(GAME_FILE, GAME, P_NAME)
print(" ----------- 推进到street----------")
display_game_status(GAME, P_NAME)
display_player_turn()
def cmd_save(args):
global GAME
if GAME is None:
print("no game to save")
return
save_simple_game(GAME_FILE, GAME, P_NAME)
print(" saved", GAME_FILE)
# def cmd_load(args):
# global GAME, P_NAME, P_IDS
# if not os.path.exists(GAME_FILE):
# print("no saved game file.")
# return
# GAME, P_NAME = load_simple_game(GAME_FILE)
# P_IDS = [PlayerId(i, name) for i, name in P_NAME.items()]
# print(" Loaded", GAME_FILE)
# display_game_status(GAME, P_NAME)
# display_player_turn()
def cmd_reset(args):
global GAME, P_NAME, P_IDS
if os.path.exists(GAME_FILE):
os.remove(GAME_FILE)
GAME = None
P_NAME = None
P_IDS = None
print(" reset game file")
def cmd_help(args):
print("可用命令:")
print(" set <small>/<big> <p1> [p2 ...] [--stack N] ")
print(" act <player> <fold|call|check|bet|raise|all_in> [amount]")
print(" status [player|all] ")
print(" save ")
# print(" load ") 单开不加载
print(" reset ")
print(" ? ")
print(" q|e ")
commands = {
'set': cmd_set,
'act': cmd_act,
'status': cmd_status,
'next': cmd_next,
'save': cmd_save,
'reset': cmd_reset,
'?': cmd_help,
}
while True:
try:
raw = input("pg> ").strip()
except (EOFError, KeyboardInterrupt):
print()
break
if not raw:
continue
input_cmd = shlex.split(raw)
cmd = input_cmd[0].lower()
rest = raw[len(input_cmd[0]):].strip()
if cmd == 'q' or cmd == 'e':
break
fn = commands.get(cmd)
if fn:
fn(rest)
else:
print("unkown command:", cmd)
if __name__ == "__main__":
main()