314 lines
11 KiB
Python
314 lines
11 KiB
Python
#!/usr/bin/env python3
|
|
import json
|
|
import os
|
|
import sys
|
|
from pathlib import Path
|
|
from typing import Optional, List, Dict, Any, Tuple
|
|
|
|
sys.path.insert(0, str(Path(__file__).parent.parent))
|
|
import json
|
|
from pathlib import Path
|
|
from gametree import Game, PlayerId, Action, ActionType, Street
|
|
from gametree.model import act_fold, act_call, act_check, act_bet, act_raise, act_all_in
|
|
|
|
|
|
# Path of the JSON file the current session is saved to. The `set` command
# replaces it with a numbered file under data/ and `reset` restores this value.
GAME_FILE = "pg.json"

# Session state shared by the console commands. The handlers compare these
# against None to detect "no game yet", so they must exist before `set` runs.
GAME = None    # the active Game, assigned by the `set` command
P_NAME = None  # dict mapping seat index -> player name
P_IDS = None   # list of PlayerId objects, ordered by seat index
|
|
|
|
def create_file_idx():
    """Return the path for the next numbered game file under ``data/``.

    Creates the ``data`` directory if it does not exist, then picks one more
    than the highest index already used by a ``pg_<number>.json`` file there.
    Using the highest index (rather than the number of files) keeps the new
    name unique even after earlier game files have been deleted.

    Returns:
        A relative path string of the form ``data/pg_NNN.json``, with the
        index zero-padded to at least three digits.
    """
    data_dir = Path("data")
    data_dir.mkdir(exist_ok=True)

    prefix = "pg_"
    used = []
    for candidate in data_dir.glob("pg_*.json"):
        suffix = candidate.stem[len(prefix):]
        # Skip files that match the glob but carry no numeric index.
        if suffix.isdecimal():
            used.append(int(suffix))

    game_id = max(used, default=0) + 1
    return f"data/pg_{game_id:03d}.json"
|
|
|
|
def create_simple_game(player_names: List[str],
                       small_blind: int = 5,
                       big_blind: int = 10,
                       stack: int = 500,
                       dealer_idx: int = 0):
    """Build a new ``Game`` in which every player starts with the same stack.

    Args:
        player_names: Player names; a player's position in this list is used
            as their seat index and as the first argument to ``PlayerId``.
        small_blind: Small blind passed through to ``Game``.
        big_blind: Big blind passed through to ``Game``.
        stack: Starting stack given to every player.
        dealer_idx: Dealer seat index passed through to ``Game``.

    Returns:
        A tuple ``(game, player_map, player_ids)`` where ``player_map`` maps
        seat index to name and ``player_ids`` lists the ``PlayerId`` objects
        in seat order.
    """
    seat_to_name: Dict[int, str] = dict(enumerate(player_names))

    seats: List[PlayerId] = []
    for seat, name in enumerate(player_names):
        seats.append(PlayerId(seat, name))

    # Every seat is paired with the same starting stack.
    starting_stacks = list(zip(seats, [stack] * len(seats)))

    table = Game(
        players_init=starting_stacks,
        dealer_idx=dealer_idx,
        small_blind=small_blind,
        big_blind=big_blind,
    )
    return table, seat_to_name, seats
|
|
|
|
def save_simple_game(path: str, game: Game, players) -> None:
    """Write the game set-up and its action history to *path* as JSON.

    The document has two top-level keys: ``player_names`` (the *players*
    argument as given) and ``game_state`` (initial stacks, dealer index,
    blinds, current street name and every recorded action). Attributes that
    are missing from *game* are stored as ``None``, or as an empty list for
    ``players_init``.

    Args:
        path: Destination file; it is overwritten, encoded as UTF-8.
        game: Game whose state is serialised.
        players: Value stored under ``player_names``.
    """
    target = Path(path)

    state = {}

    init_rows = []
    for pid, stack in getattr(game, "players_init", []):
        init_rows.append([pid.id, stack])
    state["players_init"] = init_rows

    for field in ("dealer_idx", "small_blind", "big_blind"):
        state[field] = getattr(game, field, None)

    # The street is stored by name; a missing or falsy street becomes None.
    street = getattr(game, "current_street", None)
    state["current_street"] = street.name if street else None

    history = []
    for act in game.get_all_actions():
        history.append({
            "type": act.type.name,
            "actor": int(act.actor.id),
            "amount": act.amount,
        })
    state["all_actions"] = history

    payload = {"player_names": players, "game_state": state}
    target.write_text(json.dumps(payload, indent=2), encoding="utf-8")
|
|
|
|
def display_game_status(game: Game, player_names: Dict[int,str], show_cards_for: Optional[str] = None) -> None:
    """Print the street, board, pot and one status line per player.

    Each player line shows the seat index, name, stack, committed bet and a
    bracketed set of markers: ``D`` (dealer seat), ``->`` (next to act),
    ``F`` (folded) and ``A`` (all-in). When the game reports itself as
    terminal, the winners summary is printed afterwards.

    Args:
        game: Game to describe.
        player_names: Seat index -> name; unknown seats print as
            ``Player_<index>``.
        show_cards_for: ``"all"`` to append every player's hand, or a single
            player name to append only that player's hand.
    """
    print(f"Street: {game.current_street.name}")

    community = game.get_current_board()
    if community:
        print("Board:", " ".join(map(str, community)))

    print("Pot:", getattr(game, "total_pot", 0))

    for seat, pstate in enumerate(game.players):
        label = player_names.get(seat, f"Player_{seat}")

        flag_checks = (
            (seat == getattr(game, "dealer_idx", None), "D"),
            (seat == getattr(game, "next_to_act_idx", None), "->"),
            (pstate.folded, "F"),
            (pstate.allin, "A"),
        )
        marks = [mark for active, mark in flag_checks if active]

        line = f"{seat}:{label} stack={pstate.stack} bet={pstate.committed} [{' '.join(marks)}]"

        if show_cards_for in ("all", label):
            hole_cards = game.get_hand_cards(PlayerId(seat)) or []
            line = line + " hand=" + " ".join(map(str, hole_cards))

        print(line)

    # NOTE(review): a game object without a `terminal` attribute is treated
    # as finished here (default True) - confirm that this is intended.
    if getattr(game, "terminal", True):
        display_winners(game, player_names)
|
|
|
|
def display_winners(game: Game, player_names: Dict[int,str]) -> None:
    """Print the end-of-game banner, the winner names and the hand rankings.

    Winners are read from ``game.winner`` and rankings from
    ``game.hand_rankings``; either section is skipped when the attribute is
    missing or empty. Ids that have no entry in *player_names* are printed
    using ``str(pid)``.

    Args:
        game: Finished game to summarise.
        player_names: Seat index -> player name.
    """

    def display_name(pid):
        # Prefer the configured name for this seat; fall back to the id text.
        seat = getattr(pid, "id", None)
        if seat is None:
            return str(pid)
        return player_names.get(seat, str(pid))

    print("=" * 50)
    print("GAME OVER")

    winners = getattr(game, "winner", [])
    if winners:
        winner_names = [display_name(pid) for pid in winners]
        print(f"Winner(s): {', '.join(winner_names)}")

    hand_rankings = getattr(game, "hand_rankings", {})
    if hand_rankings:
        print("\nHand Rankings:")
        for pid, ranking in hand_rankings.items():
            # Only the hole cards are printed. The board is deliberately not
            # combined with them: the combined list was never used, and the
            # concatenation fails when the board is None or not a list.
            cards = game.get_hand_cards(pid) or []
            print(f" {display_name(pid)}: {' '.join(str(c) for c in cards)} | {ranking}")

    print("=" * 50)
|
|
|
|
|
|
def display_player_turn():
    """Print who acts next and the actions currently available to them.

    Reads the module-level session state (``GAME``, ``P_IDS``, ``P_NAME``).
    Prints a short notice and returns when no game is loaded, or when the
    game still reports no next-to-act seat after being asked for one.
    """
    if GAME is None or P_IDS is None:
        print("No game loaded/created")
        return

    idx = getattr(GAME, "next_to_act_idx", None)
    if idx is None:
        # NOTE(review): presumably this call sets GAME.next_to_act_idx as a
        # side effect; its return value is ignored - confirm.
        GAME._get_first_act_idx()
        idx = getattr(GAME, "next_to_act_idx", None)
    if idx is None:
        print("No next-to-act player (index is None)")
        return

    pid = GAME.get_player_by_idx(idx)
    # TODO (original author's note, translated): "interface". The call,
    # all-in and last-raise amounts that used to be computed here by hand
    # were never used; the amounts printed below come from Game accessors.
    actions_display = GAME.legal_actions(pid)

    player_name = P_NAME.get(idx, f"Player_{idx}") if isinstance(P_NAME, dict) else f"Player_{idx}"
    pid_name = getattr(pid, "name", None)
    # Only mention the id's own name when it has one (avoids "(None)").
    id_suffix = f" ({pid_name})" if pid_name else ""

    print("\n--- Next to act ---")
    print(f"player index: {idx}")
    print(f"player name: {player_name}")
    print(f"player id: {pid.id}{id_suffix}")
    print("legal actions:")
    for a in actions_display:
        if a == ActionType.FOLD:
            print(" - fold")
        elif a == ActionType.CALL:
            print(f" - call (amount: {GAME.get_call_amt(pid)})")
        elif a == ActionType.CHECK:
            print(" - check")
        elif a == ActionType.BET:
            # NOTE(review): `get_bet_bound` is singular while
            # `get_raise_bounds` is plural - confirm both exist on Game.
            min_bet, max_bet = GAME.get_bet_bound(pid)
            print(f" - bet (min: {min_bet}, max: {max_bet})")
        elif a == ActionType.RAISE:
            min_raise, max_raise = GAME.get_raise_bounds(pid)
            print(f" - raise (min: {min_raise}, max: {max_raise})")
        elif a == ActionType.ALLIN:
            print(f" - allin (amount: {GAME.get_allin_amt(pid)})")

    print("----------------------------------------------------")
|
|
|
|
|
|
def main():
    """Run the interactive ``pg>`` console for setting up and playing a game.

    Commands are read from standard input until ``q``/``e`` is entered or
    input ends. Supported commands: ``set``, ``act``, ``status``, ``next``,
    ``save``, ``reset`` and ``?``. Session state lives in the module-level
    ``GAME``, ``P_NAME``, ``P_IDS`` and ``GAME_FILE`` globals. Malformed
    command lines are reported and the prompt is shown again instead of
    ending the session with a traceback.
    """
    import shlex

    # The handlers compare these globals against None before any game
    # exists, so make sure they are defined; existing values are left alone.
    for state_name in ("GAME", "P_NAME", "P_IDS"):
        globals().setdefault(state_name, None)

    def get_pid_by_name(name):
        """Return ``(pid, seat_index)`` for *name*, or ``(None, None)``."""
        if P_NAME is None or P_IDS is None:
            return None, None
        for idx, pid in enumerate(P_IDS):
            if P_NAME.get(idx) == name:
                return pid, idx
        return None, None

    def cmd_set(args):
        """Create a game: ``set <small>/<big> <p1> <p2> ... [--stack N]``."""
        global GAME, P_NAME, P_IDS, GAME_FILE
        usage = "usage: set <small>/<big> [player ...] [--stack N]"
        tokens = args.split()
        if len(tokens) < 3:
            print(usage)
            return

        stack = 500
        names = []
        try:
            small_blind, big_blind = (int(part) for part in tokens[0].split('/'))
            remaining = iter(tokens[1:])
            for tok in remaining:
                if tok == '--stack':
                    # The value is the following token, wherever the option
                    # appears on the line.
                    value = next(remaining, None)
                    if value is None:
                        raise ValueError("--stack needs a value")
                    stack = int(value)
                elif tok.startswith('--stack='):
                    stack = int(tok.split('=', 1)[1])
                elif tok.startswith('--'):
                    raise ValueError(f"unknown option {tok}")
                else:
                    names.append(tok)
        except ValueError:
            # Covers non-numeric blinds/stack, a missing '/', a missing
            # --stack value and unknown options.
            print(usage)
            return

        if len(names) < 2:
            print("need at least two players")
            print(usage)
            return

        game, pname_map, pids = create_simple_game(names,
                                                   small_blind=small_blind,
                                                   big_blind=big_blind,
                                                   stack=stack)
        GAME_FILE = create_file_idx()
        GAME = game
        P_NAME = pname_map
        P_IDS = pids
        save_simple_game(GAME_FILE, GAME, P_NAME)
        GAME._get_first_act_idx()
        display_game_status(GAME, P_NAME)
        display_player_turn()

    def cmd_act(args):
        """Apply an action: ``act <player> <action> [amount]``."""
        if GAME is None or P_IDS is None:
            print("no game to play!!!")
            return

        usage = "act <player_name> <fold|call|check|bet|raise|allin> [amount]"
        try:
            tokens = shlex.split(args)
        except ValueError:
            # shlex rejects unbalanced quotes.
            print(usage)
            return
        if len(tokens) < 2:
            print(usage)
            return

        pname, action = tokens[0], tokens[1]
        amt = None
        if len(tokens) >= 3:
            try:
                amt = int(tokens[2])
            except ValueError:
                print(f"invalid amount:{tokens[2]}")
                return

        pid, _ = get_pid_by_name(pname)
        if pid is None:
            print(f"unknown player:{pname}")
            return

        legal = GAME.legal_actions(pid)
        action_key = action.lower()
        if action_key == 'allin':
            atype = ActionType.ALLIN
        elif action_key in ('fold', 'call', 'check', 'bet', 'raise'):
            atype = ActionType[action_key.upper()]
        else:
            print(f"invalid action:{action}")
            return

        if atype in (ActionType.BET, ActionType.RAISE) and amt is None:
            print(f"action {action} need amount")
            return
        if atype not in legal:
            print(f"invalid action:{action} for player :{pname}")
            return

        GAME.add_action(Action(type=atype, actor=pid, amount=amt))
        save_simple_game(GAME_FILE, GAME, P_NAME)
        print(f" {pname} {action}" + (f" {amt}" if amt is not None else ""))
        display_game_status(GAME, P_NAME)
        if not GAME.terminal:
            display_player_turn()

    def cmd_status(args):
        """Show the table; an optional argument names whose cards to reveal."""
        if GAME is None or P_NAME is None:
            print("no game to display")
            return
        display_game_status(GAME, P_NAME, show_cards_for=args.strip() if args else None)

    def cmd_next(args):
        """Advance the game to the next street and show the new state."""
        if GAME is None:
            print("no game to play!!!")
            return
        GAME.advance_to_next_street()
        save_simple_game(GAME_FILE, GAME, P_NAME)
        print(" ----------- 推进到street----------")
        display_game_status(GAME, P_NAME)
        # Same rule as `act`: no turn prompt once the game is over.
        if not GAME.terminal:
            display_player_turn()

    def cmd_save(args):
        """Write the current game to the session file."""
        if GAME is None:
            print("no game to save")
            return
        save_simple_game(GAME_FILE, GAME, P_NAME)
        print(" saved", GAME_FILE)

    def cmd_reset(args):
        """Delete the session file and clear all in-memory game state."""
        global GAME, P_NAME, P_IDS, GAME_FILE
        if os.path.exists(GAME_FILE):
            os.remove(GAME_FILE)
        GAME = None
        P_NAME = None
        P_IDS = None
        GAME_FILE = "pg.json"
        print(" reset game file")

    def cmd_help(args):
        """List the available commands."""
        print("可用命令:")
        print(" set <small>/<big> <p1> [p2 ...] [--stack N] ")
        print(" act <player> <fold|call|check|bet|raise|allin> [amount]")
        print(" status [player|all] ")
        print(" next ")
        print(" save ")
        print(" reset ")
        print(" ? ")
        print(" q|e ")

    commands = {
        'set': cmd_set,
        'act': cmd_act,
        'status': cmd_status,
        'next': cmd_next,
        'save': cmd_save,
        'reset': cmd_reset,
        '?': cmd_help,
    }

    while True:
        try:
            raw = input("pg> ").strip()
        except (EOFError, KeyboardInterrupt):
            print()
            break
        if not raw:
            continue

        try:
            input_cmd = shlex.split(raw)
        except ValueError as exc:
            # For example an unbalanced quote; keep the session alive.
            print("invalid input:", exc)
            continue
        if not input_cmd:
            continue

        cmd = input_cmd[0].lower()
        # Everything after the first whitespace-separated word is handed to
        # the handler unparsed.
        pieces = raw.split(None, 1)
        rest = pieces[1].strip() if len(pieces) > 1 else ""

        if cmd in ('q', 'e'):
            break

        fn = commands.get(cmd)
        if fn:
            fn(rest)
        else:
            print("unknown command:", cmd)
|
|
|
|
# Start the interactive console only when this file is executed as a script.
if __name__ == "__main__":
    main()