某体育购票proto文件解析
本文最后更新于 61 天前,其中的信息可能已经过时,如有错误请 5Yqg5b6u5L+ha2Fud2s2NjY=
import re
from typing import Any, Dict, Iterator, List, Optional, Tuple

# A seat / zone identifier: 24 to 32 hexadecimal characters, case-insensitive.
_HEX_ID_RE = re.compile(r"^[0-9a-f]{24,32}$", re.I)
# Row / seat numbers are the digits directly in front of 排 / 座 in a seat label.
_ROW_RE = re.compile(r"(\d+)\s*排")
_SEAT_NO_RE = re.compile(r"(\d+)\s*座")

# Protobuf wire types the walker handles explicitly.
_WT_VARINT = 0
_WT_FIXED64 = 1
_WT_LEN = 2
_WT_FIXED32 = 5


def _sanitize(text: str) -> str:
    """Drop control characters (except tab, LF, CR) and strip outer whitespace."""
    return "".join(ch for ch in text if ch >= " " or ch in "\t\n\r").strip()


def _clean_id(text: str) -> str:
    """Sanitize *text* and remove one trailing ``j`` if present.

    NOTE(review): the trailing ``j`` is presumably the tag byte of a following
    field that was decoded together with the id -- confirm against real data.
    """
    text = _sanitize(text)
    return text[:-1] if text.endswith("j") else text


def _read_varint(buf: bytes, pos: int) -> Tuple[int, int]:
    """Decode one base-128 varint from *buf* starting at *pos*.

    Returns:
        ``(value, next_pos)`` where ``next_pos`` is the index just past the
        varint.

    Raises:
        EOFError: the buffer ends before the varint terminates.
        ValueError: the varint keeps its continuation bit set for too long.
    """
    shift = 0
    value = 0
    while True:
        if pos >= len(buf):
            raise EOFError
        byte = buf[pos]
        pos += 1
        value |= (byte & 0x7F) << shift
        if not (byte & 0x80):
            return value, pos
        shift += 7
        # Bound the loop so garbage input cannot build an unbounded integer.
        if shift > 70:
            raise ValueError("varint too long")


def _try_utf8(raw: bytes) -> Optional[str]:
    """Return *raw* decoded as UTF-8, or ``None`` when it is not valid UTF-8."""
    try:
        return raw.decode("utf-8")
    except UnicodeDecodeError:
        return None


def _iter_len_fields(buf: bytes) -> Iterator[Tuple[int, bytes]]:
    """Yield ``(field_number, payload)`` for each length-delimited field.

    The walk is best-effort: it stops silently at the first truncated varint
    or at a length prefix that runs past the end of *buf*. Varint and fixed
    width fields are skipped; for any other wire type one byte is skipped and
    scanning continues.
    """
    pos = 0
    end = len(buf)
    while pos < end:
        try:
            key, pos = _read_varint(buf, pos)
            wire_type = key & 0x7
            if wire_type in (_WT_VARINT, _WT_LEN):
                # For a varint field this is the value (discarded); for a
                # length-delimited field it is the payload length.
                value, pos = _read_varint(buf, pos)
        except (EOFError, ValueError):
            return

        if wire_type == _WT_LEN:
            if pos + value > end:
                return
            yield key >> 3, buf[pos:pos + value]
            pos += value
        elif wire_type == _WT_FIXED32:
            pos += 4
        elif wire_type == _WT_FIXED64:
            pos += 8
        elif wire_type != _WT_VARINT:
            pos += 1


def _looks_like_header(chunk: bytes) -> bool:
    """Return True for a short (< 60 chars) printable-ASCII string chunk."""
    text = _try_utf8(chunk)
    return (
        text is not None
        and len(text) < 60
        and all(32 <= ord(ch) <= 126 for ch in text)
    )


def _split_payload_and_records(buf: bytes) -> Tuple[Optional[bytes], List[bytes]]:
    """Scan one message level for the payload (field 4) and records (field 1).

    Returns:
        ``(payload, records)``: *payload* is the last length-delimited field 4
        seen (``None`` if absent); *records* holds every length-delimited
        field 1 that does not look like a short ASCII header string.
    """
    payload: Optional[bytes] = None
    records: List[bytes] = []
    for field, chunk in _iter_len_fields(buf):
        if field == 4:
            payload = chunk
        elif field == 1 and not _looks_like_header(chunk):
            records.append(chunk)
    return payload, records


def _collect_strings(buf: bytes, depth: int = 0, max_depth: int = 6) -> List[str]:
    """Collect the sanitized UTF-8 strings found in *buf*, in encounter order.

    A length-delimited chunk that decodes as UTF-8 is taken as a string.
    A chunk that does not decode and is at least 6 bytes long is walked
    recursively as a nested message, up to *max_depth* levels.
    """
    if depth > max_depth:
        return []
    found: List[str] = []
    for _, chunk in _iter_len_fields(buf):
        text = _try_utf8(chunk)
        if text is None:
            if len(chunk) >= 6:
                found.extend(_collect_strings(chunk, depth + 1, max_depth))
            continue
        text = _sanitize(text)
        # "feature-0j" is a value the format carries that is not seat data;
        # it is filtered out here exactly as before.
        if text and text != "feature-0j":
            found.append(text)
    return found


def _parse_row_seat(seat_name: str) -> Tuple[str, str]:
    """Extract ``(row, seat_no)`` digit strings from a seat label.

    Either element is ``""`` when the corresponding marker is not found.
    """
    row_match = _ROW_RE.search(seat_name)
    seat_match = _SEAT_NO_RE.search(seat_name)
    return (
        row_match.group(1) if row_match else "",
        seat_match.group(1) if seat_match else "",
    )


def parse_seat_bytes_to_json(data: bytes) -> List[Dict[str, Any]]:
    """Heuristically decode a seat-map protobuf blob into a list of seat dicts.

    No schema is used. The outer message is scanned for a length-delimited
    field 4 (the payload); the payload is scanned for repeated field 1 chunks
    (one per seat record); the strings inside each record are then classified
    by shape (hex id, seat label, zone label).

    Args:
        data: Raw bytes of the ``.proto`` seat file. ``bytearray`` and
            ``memoryview`` are accepted and converted to ``bytes``.

    Returns:
        One dict per record with keys ``seatConcreteId``, ``zoneId``,
        ``seatName``, ``row``, ``zoneName``, ``sectorName`` and ``seatNo``.
        A value is ``""`` when nothing matching was found in the record.

    Raises:
        ValueError: no field-4 payload was found, or the payload contains no
            field-1 records.
        TypeError: *data* is not bytes-like.
    """
    if not isinstance(data, bytes):
        # Slices of a memoryview have no .decode(); normalise once up front.
        data = bytes(data)

    payload, _ = _split_payload_and_records(data)
    if not payload:
        raise ValueError("未找到外层 field=4 payload,数据结构可能不同。")

    _, records = _split_payload_and_records(payload)
    if not records:
        raise ValueError("未在 payload 中找到 repeated record bytes(field=1)。")

    result: List[Dict[str, Any]] = []
    for record in records:
        strings = _collect_strings(record)

        # Unique hex ids in first-seen order: the first is taken as the seat
        # id and the second as the zone id.
        ids = list(dict.fromkeys(
            cid for cid in map(_clean_id, strings) if _HEX_ID_RE.match(cid)
        ))
        seat_concrete_id = ids[0] if ids else ""
        zone_id = ids[1] if len(ids) > 1 else ""

        seat_name = next(
            (s for s in strings if "排" in s and "Row" in s and "座" in s), ""
        )
        row, seat_no = _parse_row_seat(seat_name)

        zone_name = next((s for s in strings if "看台" in s and "号" in s), "")
        sector_name = next((s for s in strings if s == "看台"), "")

        result.append({
            "seatConcreteId": seat_concrete_id,
            "zoneId": zone_id,
            "seatName": seat_name,
            "row": row,
            "zoneName": zone_name,
            "sectorName": sector_name,
            "seatNo": seat_no,
        })

    return result


if __name__ == "__main__":
    # Demo: download one published seat file and print the decoded seats.
    import json

    import requests

    headers = {
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/140.0.0.0 Safari/537.36",
    }
    url = "https://jstkt-static.jussyun.com/prod/seat/20260109/6960aea1e5a2d80001f35f30/6960af39efdd4c00012a5d40_1_1767943996277.proto?versionNo=2"
    # requests has no default timeout; without one a stalled connection would
    # block forever.
    response = requests.get(url, headers=headers, timeout=30)
    # Fail on HTTP errors instead of feeding an error page to the parser.
    response.raise_for_status()
    seats = parse_seat_bytes_to_json(response.content)
    print(json.dumps(seats, ensure_ascii=False, indent=2))
文末附加内容
上一篇