commit 699b79d0da4c0720ecb65cd3e0b5579f7a7d0e15 Author: jianghaiying Date: Wed Nov 5 17:10:39 2025 +0800 gametree:1.0 diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..505a3b1 --- /dev/null +++ b/.gitignore @@ -0,0 +1,10 @@ +# Python-generated files +__pycache__/ +*.py[oc] +build/ +dist/ +wheels/ +*.egg-info + +# Virtual environments +.venv diff --git a/.python-version b/.python-version new file mode 100644 index 0000000..24ee5b1 --- /dev/null +++ b/.python-version @@ -0,0 +1 @@ +3.13 diff --git a/README.md b/README.md new file mode 100644 index 0000000..e69de29 diff --git a/gametree/__init__.py b/gametree/__init__.py new file mode 100644 index 0000000..673eea4 --- /dev/null +++ b/gametree/__init__.py @@ -0,0 +1,37 @@ +from .model import ( + PlayerId, + Player, + Action, + ActionType, + Street, +) + +from .deck import DeckManager + +from .card import Card, Rank, Suit + +from .game import Game + +from .hand_ranking import HandRanking, HandType + +from .hand_evaluator import HandEvaluator + +__all__ = [ + "PlayerId", + "Player", + "Action", + "ActionType", + "Street", + "Game", + + "Card", + "Rank", + "Suit", + + "HandRanking", + "HandType", + "HandEvaluator", + + "DeckManager", + +] \ No newline at end of file diff --git a/gametree/card.py b/gametree/card.py new file mode 100644 index 0000000..05e73bd --- /dev/null +++ b/gametree/card.py @@ -0,0 +1,86 @@ +from __future__ import annotations + +from enum import IntEnum +from typing import List + + +class Suit(IntEnum): + S = 0 + H = 1 + D = 2 + C = 3 + + def __str__(self) -> str: + return "shdc"[self.value] + + +class Rank(IntEnum): + R6 = 6 + R7 = 7 + R8 = 8 + R9 = 9 + RT = 10 + RJ = 11 + RQ = 12 + RK = 13 + RA = 14 + + def __str__(self): + if self.value <= 9: + return str(self.value) + return {10: "T", 11: "J", 12: "Q", 13: "K", 14: "A"}[self.value] + + @property + def numeric_value(self): + return self.value + + @property + def symbol(self): + return str(self) + + +class Card: + def __init__(self, rank: Rank, suit: Suit): + self.rank = rank + self.suit = suit + + def __repr__(self) -> str: + return f"{str(self.rank)}{str(self.suit)}" + + def __str__(self) -> str: + return self.__repr__() + + @classmethod + def all_short(cls) -> List["Card"]: + cards: List[Card] = [] + for r in Rank: + for s in Suit: + cards.append(Card(r, s)) + return cards + + def __eq__(self, other): + if not isinstance(other, Card): + return False + return self.rank == other.rank and self.suit == other.suit + + def __hash__(self): + return hash((self.rank, self.suit)) + + @classmethod + def from_str(cls, card_str: str) -> "Card": + if len(card_str) != 2: + raise ValueError(f"Card must be 2 char: {card_str}") + rank_char = card_str[0].upper() + suit_char = card_str[1].lower() + + rank = next((r for r in Rank if str(r).upper() == rank_char), None) + if rank is None: + raise ValueError(f"Invalid rank: {rank_char}") + + suit = next((s for s in Suit if str(s).lower() == suit_char), None) + if suit is None: + raise ValueError(f"Invalid suit: {suit_char}") + + return cls(rank, suit) + + diff --git a/gametree/deck.py b/gametree/deck.py new file mode 100644 index 0000000..cb8da4a --- /dev/null +++ b/gametree/deck.py @@ -0,0 +1,54 @@ +# 新的 dealing.py +import random +from typing import List, Tuple, Dict +from .model import PlayerId +from .card import Card + +class DeckManager: + + def __init__(self, shuffle: bool = True): + self.cards = Card.all_short() + if shuffle: + random.shuffle(self.cards) + self.position = 0 # 当前发牌位置 + self.burned_cards: List[Card] = [] + + def remaining_cards(self) -> int: + return len(self.cards) - self.position + + def deal_card(self) -> Card: + if self.position >= len(self.cards): + raise ValueError("Deck has no more cards") + card = self.cards[self.position] + self.position += 1 + return card + + def burn_card(self) -> Card: + card = self.deal_card() + self.burned_cards.append(card) + return card + + def deal_hand_cards(self, players: List[PlayerId]) -> Dict[PlayerId, List[Card]]: + hand_cards = {} + first_cards = [self.deal_card() for _ in players] + second_cards = [self.deal_card() for _ in players] + + for i, player_id in enumerate(players): + hand_cards[player_id] = [first_cards[i], second_cards[i]] + + return hand_cards + + def deal_flop(self) -> Tuple[Card, List[Card]]: + burn = self.burn_card() + flop = [self.deal_card() for _ in range(3)] + return burn, flop + + def deal_turn(self) -> Tuple[Card, Card]: + burn = self.burn_card() + turn = self.deal_card() + return burn, turn + + def deal_river(self) -> Tuple[Card, Card]: + burn = self.burn_card() + river = self.deal_card() + return burn, river diff --git a/gametree/game.py b/gametree/game.py new file mode 100644 index 0000000..66fe570 --- /dev/null +++ b/gametree/game.py @@ -0,0 +1,683 @@ +from __future__ import annotations +from dataclasses import dataclass, field +from typing import List, Optional, Dict, Any, Tuple +import random + +from .model import PlayerId, Action, ActionType, Street +from .card import Card +# from .dealing import deal_hand_cards, deal_flop, deal_turn, deal_river +from .hand_evaluator import HandEvaluator +from .deck import DeckManager + + +@dataclass +class PlayerState: + pid: PlayerId + stack: int # 玩家当前筹码量 + committed: int = 0 # 本轮已投入筹码 + in_hand: bool = True # 玩家是否仍在本局中 + folded: bool = False # 玩家是否已弃牌 + all_in: bool = False # 玩家是否已全押 + + def to_dict(self) -> Dict[str, Any]: + return { + "pid": self.pid.to_dict(), + "stack": self.stack, + "committed": self.committed, + "in_hand": self.in_hand, + "folded": self.folded, + "all_in": self.all_in, + } + + +@dataclass +class Pot: + amount: int = 0 + eligible: List[PlayerId] = field(default_factory=list) + + def to_dict(self) -> Dict[str, Any]: + return { + "amount": self.amount, + "eligible": [pid.to_dict() for pid in self.eligible], + } + + @staticmethod + def compute_side_pots(players: List[PlayerState]) -> List[Pot]: + pots: List[Pot] = [] + remaining = sorted([p for p in players if p.committed > 0], + key=lambda p: p.committed) + consumed = 0 + while remaining: + smallest = remaining[0].committed + player_n = len(remaining) + take = smallest - consumed + if take <= 0: + break + pot_amount = take * player_n + pot_eligible = [p.pid for p in remaining] + pots.append(Pot(amount=pot_amount, eligible=pot_eligible)) + consumed += take + # 移除已贡献完的玩家 + remaining = [p for p in remaining if p.committed > consumed] + return pots + + +@dataclass +class Game: + players_init: List[Tuple[PlayerId, int]] = field(default_factory=list) + dealer_idx: int = 0 + bb_idx: int = 0 + sb_idx: int = 0 + small_blind: int = 1 + big_blind: int = 2 + + current_street: Street = Street.PREFLOP + hand_cards: Dict[PlayerId, List[Card]] = field(default_factory=dict) + board: List[Card] = field(default_factory=list) + + players: List[PlayerState] = field(default_factory=list) + current_bet: int = 0 # 当前轮次最高下注额 + + last_raise: int = 0 # 上次的加注量 + original_bet: int = 0 # 本轮次的起始下注额(bb) + + street_actions: Dict[Street, List[Action]] = field(default_factory=dict) + _all_actions: List[Action] = field(default_factory=list) + + next_to_act_idx: Optional[int] = None + terminal: bool = False + winner: Optional[List[PlayerId]] = None + + pot: List[Pot] = field(default_factory=list) + hand_rankings: Dict[PlayerId, Any] = field(default_factory=dict) + deck: List[Card] = field(default_factory=list) # 剩余牌 + burned_cards: List[Card] = field(default_factory=list) + + invalid_raise: bool = False + deck_manager: Optional[DeckManager] = None + + def __post_init__(self): + if self.players_init and not self.players: + self._init_deck_manager() + self._init_players() + self._init_hand_cards() + self._init_street_actions() + self._init_bet() + + def _init_players(self): + self.players = [ + PlayerState(pid=pid, stack=stack) + for pid, stack in self.players_init + ] + + def _init_deck_manager(self): + if self.deck_manager is None: + self.deck_manager = DeckManager(shuffle=True) + self.burned_cards = [] + + def _init_hand_cards(self): + player_ids = [pid for pid, _ in self.players_init] + self.hand_cards = self.deck_manager.deal_hand_cards(player_ids) + + def _init_street_actions(self): + self.street_actions = { + Street.PREFLOP: [], + Street.FLOP: [], + Street.TURN: [], + Street.RIVER: [] + } + + def _init_deck(self): + self.deck = Card.all_short().copy() + random.shuffle(self.deck) + # 移除已发的底牌 + used_cards = [card for hand in self.hand_cards.values() for card in hand] + self.deck = [card for card in self.deck if card not in used_cards] + + def _init_bet(self): + n_players = len(self.players) + if n_players < 2: + return + + if n_players == 2: + self.sb_idx = self.dealer_idx # 两人游戏:庄家是小盲 + self.bb_idx = (self.dealer_idx + 1) % n_players + else: + self.sb_idx = (self.dealer_idx + 1) % n_players # 小盲位置 = 庄家下一位 + self.bb_idx = (self.dealer_idx + 2) % n_players # 大盲位置 = 小盲下一位 + + # 小盲注 + self.players[self.sb_idx].stack -= self.small_blind + self.players[self.sb_idx].committed += self.small_blind + + self.players[self.bb_idx].stack -= self.big_blind + self.players[self.bb_idx].committed += self.big_blind + + self.current_bet = self.big_blind + self.original_bet = self.big_blind + self.last_raise = self.big_blind + + self._get_first_act_idx() + + def _get_first_act_idx(self): + """ + 首个行动玩家位置 + 1. preflop: 大盲后第一位(players>=3); 小盲(players=2) + 2. postflop: 小盲先行动 + """ + n_players = len(self.players) + if n_players < 2: + self.next_to_act_idx = None + return None + + if self.current_street == Street.PREFLOP: + first_idx = (self.bb_idx + 1) % n_players if n_players >= 3 else self.sb_idx + self.next_to_act_idx = first_idx + return first_idx + else: + for i in range(0, n_players): + idx = (self.sb_idx + i) % n_players + player = self.players[idx] + if player.in_hand and not player.folded and not player.all_in: + self.next_to_act_idx = idx + return idx + + # 没有找到可行动玩家 + self.next_to_act_idx = None + return None + + + def add_action(self, action: Action): + + if self.terminal: + raise ValueError("Game is over") + + if not action.actor: + raise ValueError("Action requires player") + + self.street_actions[self.current_street].append(action) + self._all_actions.append(action) + + self._apply_action_to_state(action) + self._update_next_to_act() + + if not self.terminal: + if self.next_to_act_idx is None or self._is_round_over(): + if self.current_street == Street.RIVER: + self.finalize_game() + else: + self.advance_to_next_street() + # all allin + while (not self.terminal and self.current_street != Street.RIVER and + self._all_active_all_in()): + self.advance_to_next_street() + + if (not self.terminal and self.current_street == Street.RIVER and + (self.next_to_act_idx is None or self._all_active_all_in())): + self.finalize_game() + + + def _all_active_all_in(self) -> bool: + active = [p for p in self.players if p.in_hand and not p.folded] + if not active: + return True + if len(active) <= 1: + return True + return all(p.all_in for p in active) + + def finalize_game(self): + print("+++++++++++++++++++++++++++++++++++++++++++++++++++++++++") + print("DEBUG: finalize_game") + print(f"DEBUG: pot before finalize: {[(p.amount, len(p.eligible)) for p in self.pot]}") + print(f"DEBUG: active players: {[(p.pid, p.stack, p.committed, p.folded, p.all_in) + for p in self.players if p.in_hand]}") + if not self.pot: + self.finalize_pots() + + active = [p for p in self.players if p.in_hand and not p.folded] + if len(active) == 1: + win_pid = active[0].pid + total = sum(pot.amount for pot in self.pot) + active[0].stack += total + for p in self.players: + p.committed = 0 + self.pot = [] + self.winner = [win_pid] + self.terminal = True + return + + winners_all = self.showdown() + if winners_all: + self.winner = winners_all + for pot in self.pot: + elig = [pid for pid in pot.eligible if pid in self.hand_rankings] + if not elig: + continue + best_strength = max(self.hand_rankings[pid].get_strength() for pid in elig) + pot_winners = [pid for pid in elig if self.hand_rankings[pid].get_strength() == best_strength] + share = pot.amount // len(pot_winners) + rem = pot.amount % len(pot_winners) + for i, pid in enumerate(pot_winners): + ps = self._get_player(pid) + if ps is None: + continue + ps.stack += share + (rem if i == 0 else 0) + + for p in self.players: + p.committed = 0 + p.all_in = False + self.pot = [] + self.terminal = True + return + + + def _is_round_over(self) -> bool: + active = [p for p in self.players if p.in_hand and not p.folded] + if len(active) <= 1: + return True + if self.current_bet == 0 and self.next_to_act_idx is None: + return True + for p in active: + if not p.all_in and p.committed < self.current_bet: + return False + if not p.all_in and p.committed!= self.current_bet: + return False + return True + + def _player_fold_act(self, player: PlayerState): + player.folded = True + player.in_hand = False + # 检查是否只剩一个玩家,如果是的话,游戏结束,进行结算 todo + active_players = [p for p in self.players if p.in_hand and not p.folded] + if len(active_players) == 1: + self.terminal = True + self.winner = [active_players[0].pid] + + def _player_call_act(self, player: PlayerState, amount: Optional[int] = None): + if not player.in_hand or player.folded: + raise ValueError("Player cannot act") + to_call = max(0, self.current_bet - player.committed) + amount = amount if amount else to_call + pay = min(amount, player.stack) + player.stack -= pay + player.committed += pay + if player.stack == 0: + player.all_in = True + + def _player_check_act(self, player: PlayerState): + if not player.in_hand or player.folded: + raise ValueError("Player cannot act") + to_call = max(0, self.current_bet - player.committed) + if to_call > 0: + raise ValueError(f"Cannot CHECK,need {to_call} to call") + if player.stack == 0: + player.all_in = True + + def _player_bet_act(self, player: PlayerState, amount: int): + if amount is None: + raise ValueError("BET requires amount") + if self.current_bet > 0: + raise ValueError(f"Cannot BET when current bet is {self.current_bet}") + + # BET的最小金额通常是bb + min_bet = self.big_blind + if amount < min_bet: + raise ValueError(f"BET amount {amount} < minimum bet {min_bet}") + + required = max(0, amount - player.committed) + pay = min(required, player.stack) + player.stack -= pay + player.committed += pay + + if player.stack == 0: + player.all_in = True + + self.current_bet = player.committed + self.original_bet = player.committed + self.last_raise = player.committed + + def _player_raise_act(self, player: PlayerState, amount: int): + if not player.in_hand or player.folded: + raise ValueError("Player cannot act") + if amount is None: + raise ValueError("RAISE requires amount") + if self.current_bet == 0: + raise ValueError("Cannot RAISE when no one has bet") + + min_raise_to = self.current_bet + self.last_raise + if amount < min_raise_to: + raise ValueError( + f"RAISE amount {amount} < minimum raise to {min_raise_to} " + f"(current bet {self.current_bet} + last raise {self.last_raise})" + ) + + required = max(0, amount - player.committed) + pay = min(required, player.stack) + player.stack -= pay + player.committed += pay + + if player.stack == 0: + player.all_in = True + + # 有效加注 + if player.committed > self.current_bet: + self.last_raise = player.committed - self.current_bet + self.current_bet = player.committed + + def _player_all_in_act(self, player: PlayerState): + if not player.in_hand or player.folded: + raise ValueError("Player cannot act") + if player.stack <= 0: + raise ValueError("Player no chips to all-in") + if player.all_in: + raise ValueError("Player already all-in") + + pay = player.stack + player.stack = 0 + player.committed += pay + player.all_in = True + + if self.current_bet == 0: # 下注即allin + self.current_bet = player.committed + self.original_bet = player.committed + self.last_raise = player.committed + elif player.committed > self.current_bet: + if player.committed < self.current_bet + self.last_raise: + self.invalid_raise = True + self.last_raise = player.committed - self.current_bet + self.current_bet = player.committed + else: + self.invalid_raise = True + + def _apply_action_to_state(self, action: Action): + if action.amount and action.amount < 0: + raise ValueError("Action amount cannot be negative") + + player_idx = self._get_player_index(action.actor) + player = self.players[player_idx] + match action.type: + case ActionType.FOLD: + self._player_fold_act(player) + case ActionType.CALL: + self._player_call_act(player, action.amount) + case ActionType.CHECK: + self._player_check_act(player) + case ActionType.BET: + self._player_bet_act(player, action.amount) + case ActionType.RAISE: + self._player_raise_act(player, action.amount) + case ActionType.ALL_IN: + self._player_all_in_act(player) + + def _update_next_to_act(self): + if self.terminal: + self.next_to_act_idx = None + return + + current_idx = self.next_to_act_idx + if current_idx is None: + return + + n_players = len(self.players) + for i in range(1, n_players + 1): + next_idx = (current_idx + i) % n_players + player = self.players[next_idx] + if player.in_hand and not player.folded and not player.all_in and player.stack > 0: + self.next_to_act_idx = next_idx + return + # 没有找到可行动的玩家,进入下一轮 todo + self.next_to_act_idx = None + + def advance_to_next_street(self): + if self.current_street == Street.RIVER: + # 已经是最后一街道,准备摊牌 todo + return + + round_pot = sum(player.committed for player in self.players) + if round_pot > 0: + eligible_players = [p.pid for p in self.players if p.in_hand and not p.folded] + if not self.pot: + self.pot = [] + self.pot.append(Pot(amount=round_pot, eligible=eligible_players)) + + for player in self.players: + player.committed = 0 + self.current_bet = 0 + + self.original_bet = 0 + self.last_raise = 0 + self.invalid_raise = False + + self._deal_cards_for_street() + + self._get_first_act_idx() + + def _deal_cards_for_street(self): + if self.current_street == Street.PREFLOP: + burn_card, flop_cards = self.deck_manager.deal_flop() + self.burned_cards.append(burn_card) + self.board = flop_cards + self.current_street = Street.FLOP + + elif self.current_street == Street.FLOP: + burn_card, turn_card = self.deck_manager.deal_turn() + self.burned_cards.append(burn_card) + self.board.append(turn_card) + self.current_street = Street.TURN + + elif self.current_street == Street.TURN: + burn_card, river_card = self.deck_manager.deal_river() + self.burned_cards.append(burn_card) + self.board.append(river_card) + self.current_street = Street.RIVER + + def get_call_amt(self, player_id: PlayerId) -> int: + player = self._get_player(player_id) + if not player: + return 0 + return max(0, self.current_bet - player.committed) + + def get_allin_amt(self, player_id: PlayerId) -> int: + player = self._get_player(player_id) + if not player: + return 0 + return player.stack + + def get_bet_bound(self, player_id: PlayerId) -> Tuple[Optional[int], Optional[int]]: + player = self._get_player(player_id) + if not player: + return (None, None) + to_call = self.get_call_amt(player_id) + if to_call != 0 or player.stack <= 0: + return (None, None) + min_bet = max(1, self.big_blind) + max_bet = player.stack + return (min_bet, max_bet) + + def get_raise_bounds(self, player_id: PlayerId) -> Tuple[Optional[int], Optional[int]]: + # raise total + player = self._get_player(player_id) + if not player: + return (None, None) + to_call = self.get_call_amt(player_id) + if to_call == 0 or player.stack <= 0: + return (None, None) + base_raise = self.last_raise if (self.last_raise and self.last_raise > 0) else self.big_blind + min_raise_total = self.current_bet + base_raise + max_raise_total = player.committed + player.stack # alin + # min>max, invalide-raise, todo:legal_action处理 + return (min_raise_total, max_raise_total) + + def get_hand_cards(self, player_id: PlayerId) -> List[Card]: + return self.hand_cards.get(player_id, []) + + def get_current_board(self) -> List[Card]: + return self.board.copy() + + def get_actions_in_street(self, street: Street) -> List[Action]: + return self.street_actions.get(street, []) + + def get_all_actions(self) -> List[Action]: + return self._all_actions.copy() + + def get_remaining_deck(self) -> int: + return self.deck_manager.remaining_cards() if self.deck_manager else 0 + + def get_burned_cards(self) -> List[Card]: + return self.burned_cards.copy() + + @property + def total_pot(self) -> int: + committed_total = sum(p.committed for p in self.players) + pot_total = sum(pot.amount for pot in self.pot) + return committed_total + pot_total + + def get_player_stack(self, player_id: PlayerId) -> int: + player = self._get_player(player_id) + return player.stack if player else 0 + + def legal_actions(self, player_id: PlayerId) -> List[ActionType]: + player = self._get_player(player_id) + if not player or self.terminal or player.folded or not player.in_hand: + return [] + + to_call = max(0, self.current_bet - player.committed) + actions = [] + + # 前面行动的玩家无效加注行为,后面行动的玩家只能选择跟注、弃牌、全押 + # TODO: 这里的逻辑是否合理还需要研究 + # deepseek:“跟注或弃牌”是针对“加注”这一行为的回应,而“全下”是超越于这个循环之外的、每个玩家的终极权利。 + # eg:50/100的牌桌,第一个行动的玩家bet100,第二个玩家raise400,第三个玩家allin250, + # 第四个玩家allin180,现在又回到第一个玩家行动,行动有哪些 + # deepseek认为:第一个玩家面对的是一个400的加注,他可以选择弃牌、跟注400或者加注全押 + # 1.弃牌或跟注 + # 2.根据最后一个有效加注400的有效加注 + # 3.全压的无效加注 + # 那么玩家2对应玩家1的行为为: + # 1.翻牌 + # 2.跟注、加注、弃牌 + # 3.跟注、弃牌 + # 疑问:为什么玩家1可以开启加注轮 + if self.invalid_raise: + if to_call == 0: + if player.stack > 0: + actions.append(ActionType.ALL_IN) + else: + actions.append(ActionType.FOLD) + if player.stack > 0: + if player.stack >= to_call: # 暂时这么处理吧 + actions.append(ActionType.CALL) + else: + actions.append(ActionType.ALL_IN) + return actions + + if to_call == 0: + actions.append(ActionType.CHECK) + if player.stack > 0: + actions.append(ActionType.BET) + actions.append(ActionType.ALL_IN) + else: + actions.append(ActionType.FOLD) + if player.stack > 0: + base_raise = self.last_raise if (self.last_raise and self.last_raise > 0) else self.big_blind + min_raise_to = self.current_bet + base_raise + max_raise_to = player.committed + player.stack + if max_raise_to < min_raise_to: + if player.stack >= to_call: + actions.append(ActionType.CALL) + else: + actions.append(ActionType.ALL_IN) + self.invalid_raise = True + else: + actions.append(ActionType.CALL) + actions.append(ActionType.RAISE) + actions.append(ActionType.ALL_IN) + + return actions + + def evaluate_hand(self, player_id: PlayerId) -> Optional[Any]: + """评估玩家手牌""" + if player_id not in self.hand_cards or len(self.board) < 5: + return None + + all_cards = self.hand_cards[player_id] + self.board + if len(all_cards) != 7: + return None + + return HandEvaluator.evaluateHand(all_cards) + + def showdown(self) -> List[PlayerId]: + if len(self.board) != 5: + active_players = [p for p in self.players if p.in_hand and not p.folded] + if len(active_players) == 1: + self.winner = [active_players[0].pid] + self.terminal = True + return self.winner + return [] + + player_rankings = {} + for player in self.players: + if player.in_hand and not player.folded: + ranking = self.evaluate_hand(player.pid) + if ranking: + player_rankings[player.pid] = ranking + + if not player_rankings: + return [] + + best_strength = max(ranking.get_strength() for ranking in player_rankings.values()) + winners = [pid for pid, ranking in player_rankings.items() + if ranking.get_strength() == best_strength] + + self.hand_rankings = player_rankings + self.winner = winners + self.terminal = True + return winners + + def finalize_pots(self): + self.pot = Pot.compute_side_pots(self.players) + + + def _get_player(self, player_id: PlayerId) -> Optional[PlayerState]: + for player in self.players: + if player.pid == player_id: + return player + return None + + def _get_player_index(self, player_id: PlayerId) -> int: + for i, player in enumerate(self.players): + if player.pid == player_id: + return i + raise ValueError(f"Player {player_id} not found") + + def get_player_by_idx(self, index)-> PlayerId: + # print("----------------------------------------------------") + if index < 0 or index >= len(self.players): + print(f"Invalid player index: {index}") + raise ValueError(f"Player index {index} out of range") + # print(f"debug-pid: {self.players[index].pid}") + # print("----------------------------------------------------") + return self.players[index].pid + + + @property + def actions(self) -> List[Action]: + return self._all_actions + + def to_dict(self) -> Dict[str, Any]: + return { + "players": [p.to_dict() for p in self.players], + "dealer_idx": self.dealer_idx, + "small_blind": self.small_blind, + "big_blind": self.big_blind, + "street": self.current_street.name, + "board": [str(card) for card in self.board], + "current_bet": self.current_bet, + "pot": [pot.to_dict() for pot in self.pot], + "hand_cards": {str(pid): [str(card) for card in cards] + for pid, cards in self.hand_cards.items()}, + "next_to_act_idx": self.next_to_act_idx, + "terminal": self.terminal, + "winner": [w.to_dict() for w in self.winner] if self.winner else None, + "total_actions": len(self._all_actions), + } \ No newline at end of file diff --git a/gametree/hand_evaluator.py b/gametree/hand_evaluator.py new file mode 100644 index 0000000..eafb751 --- /dev/null +++ b/gametree/hand_evaluator.py @@ -0,0 +1,109 @@ +from typing import List, Tuple, Dict +from collections import Counter +from itertools import combinations +from .card import Card, Rank, Suit +from .hand_ranking import HandRanking, HandType + + +class HandEvaluator: + @staticmethod + def evaluateHand(cards) -> HandRanking: + if len(cards) != 7: + raise ValueError(f"Expected 7 cards, got {len(cards)}") + + best_ranking = None + best_cards = None + + # 所有可能的5张牌组合 + for five_cards in combinations(cards, 5): + ranking = HandEvaluator.evaluate5Cards(list(five_cards)) + + if best_ranking is None or ranking > best_ranking: + best_ranking = ranking + best_cards = list(five_cards) + best_ranking.cards = best_cards + return best_ranking + + @staticmethod + def evaluate5Cards(cards) -> HandRanking: + + if len(cards) != 5: + raise ValueError(f"Expected 5 cards, got {len(cards)}") + + # 按点数排序(降序) + sorted_cards = sorted(cards, key=lambda c: c.rank.numeric_value, reverse=True) + ranks = [card.rank for card in sorted_cards] + suits = [card.suit for card in sorted_cards] + + # 统计点数出现次数 + rank_counts = Counter(ranks) + count_values = sorted(rank_counts.values(), reverse=True) + + # 同花 + is_flush = len(set(suits)) == 1 + + # 顺子 + is_straight, straight_high = HandEvaluator._isStraight(ranks) + + # 根据牌型返回相应的HandRanking + if is_straight and is_flush: + if straight_high == Rank.RA and ranks == [Rank.RA, Rank.RK, Rank.RQ, Rank.RJ, Rank.RT]: + return HandRanking(HandType.ROYAL_FLUSH, [Rank.RA], sorted_cards) # 皇家同花顺 + else: + return HandRanking(HandType.STRAIGHT_FLUSH, [straight_high], sorted_cards) # 同花顺 + + elif count_values == [4, 1]: # 四条 + quad_rank = [rank for rank, count in rank_counts.items() if count == 4][0] + kicker = [rank for rank, count in rank_counts.items() if count == 1][0] + return HandRanking(HandType.FOUR_OF_A_KIND, [quad_rank, kicker], sorted_cards) + + elif count_values == [3, 2]: # 三条+一对 + trips_rank = [rank for rank, count in rank_counts.items() if count == 3][0] + pair_rank = [rank for rank, count in rank_counts.items() if count == 2][0] + return HandRanking(HandType.FULL_HOUSE, [trips_rank, pair_rank], sorted_cards) + + elif is_flush: # 同花 + return HandRanking(HandType.FLUSH, ranks, sorted_cards) + + elif is_straight: # 顺子 + return HandRanking(HandType.STRAIGHT, [straight_high], sorted_cards) + + elif count_values == [3, 1, 1]: # 三条 + trips_rank = [rank for rank, count in rank_counts.items() if count == 3][0] + kickers = sorted([rank for rank, count in rank_counts.items() if count == 1], reverse=True) + return HandRanking(HandType.THREE_OF_A_KIND, [trips_rank] + kickers, sorted_cards) + + elif count_values == [2, 2, 1]: # 两对 + pairs = sorted([rank for rank, count in rank_counts.items() if count == 2], reverse=True) + kicker = [rank for rank, count in rank_counts.items() if count == 1][0] + return HandRanking(HandType.TWO_PAIR, pairs + [kicker], sorted_cards) + + elif count_values == [2, 1, 1, 1]: # 一对 + pair_rank = [rank for rank, count in rank_counts.items() if count == 2][0] + kickers = sorted([rank for rank, count in rank_counts.items() if count == 1], reverse=True) + return HandRanking(HandType.ONE_PAIR, [pair_rank] + kickers, sorted_cards) + + else: # 高牌 + return HandRanking(HandType.HIGH_CARD, ranks, sorted_cards) + + @staticmethod + def _isStraight(ranks: List[Rank]) -> Tuple[bool, Rank]: + values = sorted([rank.numeric_value for rank in ranks], reverse=True) + + is_regular_straight = True + for i in range(1, len(values)): + if values[i-1] - values[i] != 1: + is_regular_straight = False + break + + if is_regular_straight: + highest_rank = None + for rank in ranks: + if rank.numeric_value == values[0]: + highest_rank = rank + break + return True, highest_rank + + if values == {14, 9, 8, 7, 6}: # A, 9, 8, 7, 6 + return True, Rank.R9 + return False, None \ No newline at end of file diff --git a/gametree/hand_ranking.py b/gametree/hand_ranking.py new file mode 100644 index 0000000..093faea --- /dev/null +++ b/gametree/hand_ranking.py @@ -0,0 +1,82 @@ +from enum import Enum +from typing import List, Tuple +from .card import Card, Rank + + +class HandType(Enum): + # 短牌规则:同花 > 葫芦 + HIGH_CARD = (1, "High Card") + ONE_PAIR = (2, "Pair") + TWO_PAIR = (3, "Two Pair") + THREE_OF_A_KIND = (4, "Three of a Kind") + STRAIGHT = (5, "Straight") + FULL_HOUSE = (6, "Full House") + FLUSH = (7, "Flush") + FOUR_OF_A_KIND = (8, "Four of a Kind") + STRAIGHT_FLUSH = (9, "Straight Flush") + ROYAL_FLUSH = (10, "Royal Flush") + + def __new__(cls, strength, name): + obj = object.__new__(cls) + obj._value_ = strength + obj.strength = strength + obj.type_name = name + return obj + + + +class HandRanking: + + def __init__(self, hand_type: HandType, key_ranks: List[Rank], cards: List[Card]): + self.hand_type = hand_type + self.key_ranks = key_ranks # 用于比较的关键点数 + self.cards = cards # 组成这个ranking的5张牌 + + def __str__(self): + if self.hand_type == HandType.FOUR_OF_A_KIND: + return f"Quad({self.key_ranks[0].symbol})" + elif self.hand_type == HandType.FULL_HOUSE: + return f"Full House({self.key_ranks[0].symbol} over {self.key_ranks[1].symbol})" + elif self.hand_type == HandType.FLUSH: + return f"Flush({self.key_ranks[0].symbol} high)" + elif self.hand_type == HandType.STRAIGHT: + return f"Straight({self.key_ranks[0].symbol} high)" + elif self.hand_type == HandType.STRAIGHT_FLUSH: + if self.key_ranks[0] == Rank.ACE: + return "Royal Flush" + else: + return f"Straight Flush({self.key_ranks[0].symbol} high)" + elif self.hand_type == HandType.ROYAL_FLUSH: + return "Royal Flush" + elif self.hand_type == HandType.THREE_OF_A_KIND: + return f"Three of a Kind({self.key_ranks[0].symbol})" + elif self.hand_type == HandType.TWO_PAIR: + return f"Two Pair({self.key_ranks[0].symbol} and {self.key_ranks[1].symbol})" + elif self.hand_type == HandType.ONE_PAIR: + return f"Pair({self.key_ranks[0].symbol})" + else: + return f"High Card({self.key_ranks[0].symbol})" + + def __lt__(self, other): + """比较牌力,用于排序""" + if not isinstance(other, HandRanking): + return NotImplemented + + if self.hand_type.strength != other.hand_type.strength: + return self.hand_type.strength < other.hand_type.strength + + for my_rank, other_rank in zip(self.key_ranks, other.key_ranks): + if my_rank.numeric_value != other_rank.numeric_value: + return my_rank.numeric_value < other_rank.numeric_value + + return False + def get_strength(self) -> int: + # 返回牌力 还是 牌型+点数 + # 基础强度 = 牌型强度 * 1000000 + # todo + strength = self.hand_type.strength * 1000000 + + for i, rank in enumerate(self.key_ranks): + strength += rank.numeric_value * (100 ** (4 - i)) + + return strength diff --git a/gametree/model.py b/gametree/model.py new file mode 100644 index 0000000..a23684a --- /dev/null +++ b/gametree/model.py @@ -0,0 +1,107 @@ +from dataclasses import dataclass, field +from typing import List, Optional,Tuple,Dict,Any +from enum import Enum, auto +from .card import Card +import random + + +@dataclass(frozen=True) +class PlayerId: + id: int + name: str + + def __post_init__(self): + if not isinstance(self.id, int): + raise TypeError("PlayerId.id must be int") + if self.id < 0: + raise ValueError("PlayerId.id must be >= 0") + + def __str__(self) -> str: + return f"PlayerId{self.id}" + f"({self.name})" + + def __hash__(self): + return hash(self.id) + + def to_dict(self) -> Dict[str, Any]: + return {"id": self.id, "name": self.name} + + @staticmethod + def from_dict(cls, d: Dict[str, Any]) -> "PlayerId": + return cls(int(d["id"]), d.get("name")) + +@dataclass +class Player: + pid: PlayerId + name: str + # seat: int + # address: str + +class Street(Enum): + PREFLOP = 0 + FLOP = 1 + TURN = 2 + RIVER = 3 + +class ActionType(Enum): + FOLD = "fold" + CALL = "call" + CHECK = "check" + RAISE = "raise" + BET = "bet" + ALL_IN = "all_in" + + +@dataclass +class Action: + actor: PlayerId # 现在所有动作都必须有玩家 + type: ActionType + amount: Optional[int] = None + + cards:Optional[List[Card]] = None + + def __post_init__(self): + if self.type in (ActionType.BET, ActionType.RAISE): + if self.amount is None: + raise ValueError(f"Action{self.type.name} need amount") + if self.amount <= 0: + raise ValueError(f"Action{self.type.name} need amount > 0") + + def action_to_dict(self) -> Dict[str, Any]: + return { + "actor": self.actor.to_dict(), + "type": self.type.name, + "amount": self.amount, + "cards": [str(c) for c in self.cards] if self.cards else None, + } + + @staticmethod + def action_from_dict(d: Dict[str, Any]) -> "Action": + actor = PlayerId.from_dict(d["actor"]) + type = ActionType[d["type"]] + # 反序列 + cards = [Card.from_str(s) for s in d["cards"]] if d.get("cards") else None + return Action( + actor=actor, + type=type, + amount=d.get("amount"), + cards=cards + ) + +def act_fold(pid: PlayerId) -> Action: + return Action(actor=pid, type=ActionType.FOLD) + +def act_check(pid: PlayerId) -> Action: + return Action(actor=pid, type=ActionType.CHECK) + +def act_call(pid: PlayerId, amount: int) -> Action: + return Action(actor=pid, type=ActionType.CALL, amount=amount) + +def act_bet(pid: PlayerId, amount: int) -> Action: + return Action(actor=pid, type=ActionType.BET, amount=amount) + +def act_raise(pid: PlayerId, to_amount: int) -> Action: + return Action(actor=pid, type=ActionType.RAISE, amount=to_amount) + +def act_all_in(pid: PlayerId, amount: int) -> Action: + return Action(actor=pid, type=ActionType.ALL_IN, amount=amount) + diff --git a/pyproject.toml b/pyproject.toml new file mode 100644 index 0000000..6fd1796 --- /dev/null +++ b/pyproject.toml @@ -0,0 +1,7 @@ +[project] +name = "gametree" +version = "0.1.0" +description = "Add your description here" +readme = "README.md" +requires-python = ">=3.13" +dependencies = [] diff --git a/test/__init__.py b/test/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/test/pg.json b/test/pg.json new file mode 100644 index 0000000..c3e34ce --- /dev/null +++ b/test/pg.json @@ -0,0 +1,109 @@ +{ + "player_names": { + "0": "a", + "1": "b", + "2": "c", + "3": "d" + }, + "game_state": { + "players_init": [ + [ + 0, + 500 + ], + [ + 1, + 500 + ], + [ + 2, + 500 + ], + [ + 3, + 500 + ] + ], + "dealer_idx": 0, + "small_blind": 5, + "big_blind": 10, + "current_street": "RIVER", + "all_actions": [ + { + "type": "CALL", + "actor": 3, + "amount": null + }, + { + "type": "CALL", + "actor": 0, + "amount": null + }, + { + "type": "CALL", + "actor": 1, + "amount": null + }, + { + "type": "BET", + "actor": 1, + "amount": 40 + }, + { + "type": "CALL", + "actor": 2, + "amount": null + }, + { + "type": "CALL", + "actor": 3, + "amount": null + }, + { + "type": "CALL", + "actor": 0, + "amount": null + }, + { + "type": "BET", + "actor": 1, + "amount": 40 + }, + { + "type": "CALL", + "actor": 2, + "amount": null + }, + { + "type": "CALL", + "actor": 3, + "amount": null + }, + { + "type": "CALL", + "actor": 0, + "amount": null + }, + { + "type": "BET", + "actor": 1, + "amount": 40 + }, + { + "type": "CALL", + "actor": 2, + "amount": null + }, + { + "type": "CALL", + "actor": 3, + "amount": null + }, + { + "type": "CALL", + "actor": 0, + "amount": null + } + ] + } +} \ No newline at end of file diff --git a/test/pg.py b/test/pg.py new file mode 100644 index 0000000..31d2c14 --- /dev/null +++ b/test/pg.py @@ -0,0 +1,352 @@ +#!/usr/bin/env python3 +import json +import os +import sys +from pathlib import Path +from typing import Optional, List, Dict, Any, Tuple + +sys.path.insert(0, str(Path(__file__).parent.parent)) +import json +from pathlib import Path +from gametree import Game, PlayerId, Action, ActionType, Street +from gametree.model import act_fold, act_call, act_check, act_bet, act_raise, act_all_in + + +GAME_FILE = "pg.json" + +def create_simple_game(player_names: List[str], + small_blind: int = 5, + big_blind: int = 10, + stack: int = 500, + dealer_idx: int = 0): + player_map: Dict[int, str] = {i: name for i, name in enumerate(player_names)} + player_ids: List[PlayerId] = [PlayerId(i,name) for i,name in enumerate(player_names)] + players_init = [(pid, stack) for pid in player_ids] + game = Game(players_init=players_init, dealer_idx=dealer_idx, + small_blind=small_blind, big_blind=big_blind) + return game, player_map, player_ids + +def save_simple_game(path: str, game: Game, players) -> None: + p = Path(path) + data = { + "player_names": players, + "game_state": { + "players_init": [[pid.id, stack] for pid, stack in getattr(game, "players_init", [])], + "dealer_idx": getattr(game, "dealer_idx", None), + "small_blind": getattr(game, "small_blind", None), + "big_blind": getattr(game, "big_blind", None), + "current_street": getattr(game, "current_street").name if getattr(game, "current_street", None) else None, + "all_actions": [ + {"type": a.type.name, "actor": int(a.actor.id), "amount": a.amount} + for a in game.get_all_actions() + ] + } + } + p.write_text(json.dumps(data, indent=2), encoding="utf-8") + +# 单开不加载 +# def load_simple_game(path: str) -> Tuple[Game, Dict[int,str]]: +# p = Path(path) +# data = json.loads(p.read_text(encoding="utf-8")) +# player_names = {int(k): v for k, v in data["player_names"].items()} +# gs = data["game_state"] +# players_init = [(PlayerId(pid), stack) for pid, stack in gs.get("players_init", [])] +# game = Game(players_init=players_init, +# dealer_idx=gs.get("dealer_idx", 0), +# small_blind=gs.get("small_blind", 5), +# big_blind=gs.get("big_blind", 10)) +# for ad in gs.get("all_actions", []): +# try: +# act = Action(type=ActionType[ad["type"]], actor=PlayerId(ad["actor"]), amount=ad.get("amount")) +# game.add_action(act) +# except Exception: +# continue +# target = gs.get("current_street") +# if target: +# from .model import Street as _Street +# target_st = _Street[target] +# while game.current_street != target_st and not getattr(game, "terminal", False): +# game.advance_to_next_street() +# return game, player_names + +def display_game_status(game: Game, player_names: Dict[int,str], show_cards_for: Optional[str] = None) -> None: + print(f"Street: {game.current_street.name}") + board = game.get_current_board() + if board: + print("Board:", " ".join(str(c) for c in board)) + print("Pot:", getattr(game, "total_pot", 0)) + for i, pstate in enumerate(game.players): + name = player_names.get(i, f"Player_{i}") + marks = [] + if i == getattr(game, "dealer_idx", None): + marks.append("D") + if i == getattr(game, "next_to_act_idx", None): + marks.append("->") + if pstate.folded: + marks.append("F") + if pstate.all_in: + marks.append("A") + info = f"{i}:{name} stack={pstate.stack} bet={pstate.committed} [{' '.join(marks)}]" + if show_cards_for == "all" or show_cards_for == name: + cards = game.get_hand_cards(PlayerId(i)) or [] + info += " hand=" + " ".join(str(c) for c in cards) + print(info) + + if getattr(game, "terminal", True): + display_winners(game, player_names) + +def display_winners(game: Game, player_names: Dict[int,str]) -> None: + print("=" * 50) + print("GAME OVER") + + winners = getattr(game, "winner", []) + if winners: + winner_names = [] + for pid in winners: + winner_idx = getattr(pid, "id", None) + if winner_idx is not None and winner_idx in player_names: + winner_names.append(player_names[winner_idx]) + else: + winner_names.append(str(pid)) + print(f"Winner(s): {', '.join(winner_names)}") + hand_rankings = getattr(game, "hand_rankings", {}) + if hand_rankings: + print("\nHand Rankings:") + for pid, ranking in hand_rankings.items(): + player_idx = getattr(pid, "id", None) + player_name = player_names.get(player_idx, str(pid)) if player_idx is not None else str(pid) + cards = game.get_hand_cards(pid) or [] + board = game.get_current_board() + all_cards = cards + board + print(f" {player_name}: {' '.join(str(c) for c in cards)} | {ranking}") + + print("=" * 50) + + +def display_player_turn(): + if GAME is None or P_IDS is None: + print("No game loaded/created") + return + + idx = getattr(GAME, "next_to_act_idx", None) + if idx is None: + GAME._get_first_act_idx() + idx = getattr(GAME, "next_to_act_idx", None) + if idx is None: + print("No next-to-act player (index is None)") + return + pid = GAME.get_player_by_idx(idx) + pstate = GAME.players[idx] + # todo : 接口 + to_call = max(0, getattr(GAME, "current_bet", 0) - getattr(pstate, "committed", 0)) + allin_amount = getattr(pstate, "stack", 0) + last_raise = getattr(GAME, "last_raise", 0) or 0 + + actions_display = GAME.legal_actions(pid) + + player_name = P_NAME.get(idx, f"Player_{idx}") if isinstance(P_NAME, dict) else f"Player_{idx}" + pid_name = getattr(pid, "name", None) + print("\n--- Next to act ---") + print(f"player index: {idx}") + print(f"player name: {player_name}") + print(f"player id: {pid.id}" + (f" ({pid_name})")) + print("legal actions:") + for a in actions_display: + if a == ActionType.FOLD: + print(" - fold") + elif a == ActionType.CALL: + print(f" - call (amount: {GAME.get_call_amt(pid)})") + elif a == ActionType.CHECK: + print(" - check") + elif a == ActionType.BET: + min_bet, max_bet = GAME.get_bet_bound(pid) + print(f" - bet (min: {min_bet}, max: {max_bet})") + elif a == ActionType.RAISE: + min_raise, max_raise = GAME.get_raise_bounds(pid) + print(f" - raise (min: {min_raise}, max: {max_raise})") + elif a == ActionType.ALL_IN: + print(f" - all_in (amount: {GAME.get_allin_amt(pid)})") + + + print("----------------------------------------------------") + + +def main(): + global GAME, P_IDS, CUR_PID, P_NAME + import shlex + + def get_pid_by_name(name): + if P_NAME is None or P_IDS is None: + return None, None + for idx, pid in enumerate(P_IDS): + if P_NAME.get(idx) == name: + return pid, idx + return None, None + + def cmd_set(args): + global GAME, P_NAME, P_IDS + input_line = args.split() + if len(input_line) < 3: + print("usage: set / [player ...] [--stack N]") + return + blinds = input_line[0].split('/') + names = [] + stack = 500 + for p in input_line[1:]: + if not p.startswith('--stack'): + names.append(p) + + if len(input_line) >= 4 and input_line[-2] == '--stack': + stack = int(input_line[-1]) + names = input_line[1:-2] + + game, pname_map, pids = create_simple_game(names, + small_blind=int(blinds[0]), + big_blind=int(blinds[1]), + stack=stack) + GAME = game + P_NAME = pname_map + P_IDS = pids + save_simple_game(GAME_FILE, GAME, P_NAME) + GAME._get_first_act_idx() + display_game_status(GAME, P_NAME) + display_player_turn() + + + def cmd_act(args): + global GAME + if GAME is None or P_IDS is None: + print("no game to play!!!") + return + input = shlex.split(args) + if len(input) < 2: + print("act [amount]") + return + pname = input[0] + action = input[1] + amt = int(input[2]) if len(input) >= 3 else None + + pid, idx = get_pid_by_name(pname) + if pid is None: + print(f"unknown player:{pname}") + return + + legal = GAME.legal_actions(pid) + if action.lower() == 'allin': + atype = ActionType.ALL_IN + elif action.lower() in ('fold', 'call', 'check', 'bet', 'raise'): + atype = ActionType[action.upper()] + else: + print(f"invalid action:{action}") + return + if atype == ActionType.BET or atype == ActionType.RAISE: + if amt is None: + print(f"action {action} need amount") + return + if atype not in legal: + print(f"invalid action:{action} for player :{pname}") + return + + a = Action(type=atype, actor=pid, amount=amt) + GAME.add_action(a) + save_simple_game(GAME_FILE, GAME, P_NAME) + print(f" {pname} {action}" + (f" {amt}" if amt is not None else "")) + display_game_status(GAME, P_NAME) + if not GAME.terminal: + display_player_turn() + + + def cmd_status(args): + # 单开 + # if GAME is None: + # if os.path.exists(GAME_FILE): + # g, names = load_simple_game(GAME_FILE) + # display_game_status(g, names, show_cards_for=args.strip() if args else None) + # else: + # print("no game") + # else: + if GAME is None or P_NAME is None: + print("no game to display") + return + display_game_status(GAME, P_NAME, show_cards_for=args.strip() if args else None) + + def cmd_next(args): + global GAME + + GAME.advance_to_next_street() + save_simple_game(GAME_FILE, GAME, P_NAME) + print(" ----------- 推进到street----------") + display_game_status(GAME, P_NAME) + display_player_turn() + + + def cmd_save(args): + global GAME + if GAME is None: + print("no game to save") + return + save_simple_game(GAME_FILE, GAME, P_NAME) + print(" saved", GAME_FILE) + + # def cmd_load(args): + # global GAME, P_NAME, P_IDS + # if not os.path.exists(GAME_FILE): + # print("no saved game file.") + # return + # GAME, P_NAME = load_simple_game(GAME_FILE) + # P_IDS = [PlayerId(i, name) for i, name in P_NAME.items()] + # print(" Loaded", GAME_FILE) + # display_game_status(GAME, P_NAME) + # display_player_turn() + + def cmd_reset(args): + global GAME, P_NAME, P_IDS + if os.path.exists(GAME_FILE): + os.remove(GAME_FILE) + GAME = None + P_NAME = None + P_IDS = None + print(" reset game file") + + def cmd_help(args): + print("可用命令:") + print(" set / [p2 ...] [--stack N] ") + print(" act [amount]") + print(" status [player|all] ") + print(" save ") + # print(" load ") 单开不加载 + print(" reset ") + print(" ? ") + print(" q|e ") + + commands = { + 'set': cmd_set, + 'act': cmd_act, + 'status': cmd_status, + 'next': cmd_next, + 'save': cmd_save, + 'reset': cmd_reset, + '?': cmd_help, + } + + while True: + try: + raw = input("pg> ").strip() + except (EOFError, KeyboardInterrupt): + print() + break + if not raw: + continue + input_cmd = shlex.split(raw) + cmd = input_cmd[0].lower() + rest = raw[len(input_cmd[0]):].strip() + if cmd == 'q' or cmd == 'e': + break + fn = commands.get(cmd) + if fn: + fn(rest) + else: + print("unkown command:", cmd) + +if __name__ == "__main__": + main() \ No newline at end of file