import itertools import numpy as np import random from collections import defaultdict from collections.abc import Iterable from pathlib import Path from poker import Suit, Card from shortdeck import ShortDeckHandEvaluator as HE from shortdeck import ShortDeckRank as SDR data_path = Path(".") / "ehs-data" np_river = np.load(data_path / "river_ehs_sd.npy") np_turn = np.load(data_path / "turn_hist_sd.npy") np_flop = np.load(data_path / "flop_hist_sd.npy") cards = [Card(r, s) for r in SDR for s in Suit] CARD_BITS = 6 class SuitMapping: def __init__(self): self.mapping = {} self.suits = list(reversed(Suit)) def map_suit(self, s: Suit) -> Suit: if s not in self.mapping: self.mapping[s] = self.suits.pop() return self.mapping[s] def get_rank_idx(rank: SDR) -> int: rank_order = [SDR.SIX, SDR.SEVEN, SDR.EIGHT, SDR.NINE, SDR.TEN, SDR.JACK,SDR.QUEEN, SDR.KING, SDR.ACE] return rank_order.index(rank) def get_suit_idx(suit: Suit) -> int: suit_order = [Suit.SPADES, Suit.HEARTS, Suit.DIAMONDS, Suit.CLUBS] return suit_order.index(suit) def card_index(card: Card) -> int: return (get_rank_idx(card.rank) + 4) * 4 + get_suit_idx(card.suit) Card.__eq__ = lambda a, b: (a.rank == b.rank) and (a.suit == b.suit) Card.__hash__ = lambda a: hash((get_rank_idx(a.rank), get_suit_idx(a.suit))) def cards_to_u32(cards: list[Card]) -> int: res = 0 for i, card in enumerate(cards): bits = card_index(card) & 0x3F res |= bits << (i * CARD_BITS) return res def to_iso(cards: list[Card], mapping: SuitMapping) -> list[Card]: def count_suit(card: Card) -> int: return sum(1 for other in cards if other.suit == card.suit) sorted_cards = sorted( cards, key=lambda c: (count_suit(c), get_rank_idx(c.rank), get_suit_idx(c.suit)) ) res = [] for card in sorted_cards: mapped_suit = mapping.map_suit(card.suit) res.append(Card(card.rank, mapped_suit)) return sorted(res, key=lambda c: (get_rank_idx(c.rank), get_suit_idx(c.suit))) def cards_to_u16(cards: list[Card]) -> int: res = 0 for i, card in enumerate(cards): bits = card_index(card) & 0x3F res |= bits << (i * CARD_BITS) return res def calc_river_ehs(board: list[Card], player: list[Card]) -> float: player_hand = [*board, *player] player_ranking = HE.evaluate_hand(player_hand) acc = 0 sum = 0 for other in itertools.combinations(cards, 2): if set(other) & set(player_hand): continue if set(other) & set(board): continue other_ranking = HE.evaluate_hand([*board, *other]) if player_ranking == other_ranking: acc += 1 elif player_ranking > other_ranking: acc += 2 sum += 2 return acc / sum def get_data(board: list[Card], player: list[Card]): def _get_data(data, board: list[Card], player: list[Card]): suit_map = SuitMapping() iso_board = to_iso(board, suit_map) iso_player = to_iso(player, suit_map) mask_board = data["board"] == cards_to_u32(iso_board) mask_player = data["player"] == cards_to_u16(iso_player) return data[mask_board & mask_player][0][2] match len(board): case 3: return _get_data(np_flop, board, player) case 4: return _get_data(np_turn, board, player) case 5: return _get_data(np_river, board, player) case _: raise NotImplementedError def euclidean_dist(left, right): if isinstance(left, Iterable): v1 = np.sort(np.array(left, dtype=np.float32)) v2 = np.sort(np.array(right, dtype=np.float32)) return np.linalg.norm(v2 - v1) else: return np.abs(left - right) ** 2 def compare_data(sampled, board, player): err_count = 0 d = euclidean_dist(get_data(board, player), sampled) if not np.isclose(d, 0.0): print(f"[{''.join(map(str, board))} {''.join(map(str, player))}]: {d}") err_count += 1 return err_count def calc_turn_hist(board, player) -> list[float]: flop = board[:3] turn = board[3] used_cards = set(board + player) ehs_values = [] for river in cards: if river in used_cards: continue board = [*flop, turn, river] ehs = calc_river_ehs(board, player) ehs_values.append(ehs) return ehs_values def analysis(flop, player, sampled): print(f"sampled flop: {''.join(map(str, flop))}") print(f"sampled player cards: {''.join(map(str, player))}") compare_data( [sampled[t][r] for t in sampled for r in sampled[t] if t > r], flop, player, ) for turn in sampled: compare_data(list(sampled[turn].values()), [*flop, turn], player) for river in sampled[turn]: if turn > river: continue compare_data(sampled[turn][river], [*flop, turn, river], player) def validate_river(n = 100): all_combos = list(itertools.combinations(cards, 5)) sampled_combos = random.sample(all_combos, min(n, len(all_combos))) error_count = 0 for i, river_combo in enumerate(sampled_combos): board = list(river_combo) unused_cards = [c for c in cards if c not in board] player = list(random.sample(unused_cards, 2)) ehs = calc_river_ehs(board, player) error_count = compare_data(ehs, board, player) # print(f"river validate count: {i}") print(f"Validated river : {n}, Errors: {error_count}") def validate_turn(n = 50): error_count = 0 for i in range(n): turn_combo = random.sample(cards, 4) board = list(turn_combo) unused_cards = [c for c in cards if c not in board] player = list(random.sample(unused_cards, 2)) turn_hist = calc_turn_hist(board, player) error_count += compare_data(turn_hist, board, player) # print(f"turn validate count: {i}") print(f"Validated turn : {n}, Errors: {error_count}") CACHE = {} def calc_river_ehs_cached(board, player) -> float: suit_map = SuitMapping() iso_board = to_iso(board, suit_map) iso_player = to_iso(player, suit_map) hand = "".join(map(str, [*iso_board, *iso_player])) if hand not in CACHE: CACHE[hand] = calc_river_ehs(board, player) return CACHE[hand] def validate_flop(): sample = random.sample(cards, 5) flop = sample[2:] player = sample[:2] sampled_ehs = defaultdict(dict) for turn in cards: if turn in flop or turn in player: continue for river in cards: if river in flop or river in player or river == turn: continue board = [*flop, turn, river] sampled_ehs[turn][river] = calc_river_ehs_cached(board, player) print(".", end="", flush=True) print("") analysis(flop, player, sampled_ehs) def cross_validate_main(): validate_river(1000) validate_turn(500) validate_flop() if __name__ == "__main__": cross_validate_main()