本文最后更新于61 天前,其中的信息可能已经过时,如有错误请5Yqg5b6u5L+ha2Fud2s2NjY=
import re
from typing import List, Dict, Any
_HEX_ID_RE = re.compile(r"^[0-9a-f]{24,32}$", re.I)
def parse_seat_bytes_to_json(data: bytes) -> List[Dict[str, Any]]:
def sanitize(s: str) -> str:
return "".join(ch for ch in s if ch >= " " or ch in "\t\n\r").strip()
def clean_id(s: str) -> str:
s = sanitize(s)
return s[:-1] if s.endswith("j") else s
def read_varint(buf: bytes, i: int):
shift = 0
val = 0
while True:
if i >= len(buf):
raise EOFError
b = buf[i]
i += 1
val |= (b & 0x7F) << shift
if not (b & 0x80):
return val, i
shift += 7
if shift > 70:
raise ValueError("varint too long")
def try_utf8(bs: bytes):
try:
return bs.decode("utf-8")
except Exception:
return None
def extract_payload_and_records(buf: bytes):
i = 0
payload = None
records: List[bytes] = []
while i < len(buf):
try:
key, i = read_varint(buf, i)
except Exception:
break
field = key >> 3
wtype = key & 0x7
if wtype == 0:
try:
_, i = read_varint(buf, i)
except Exception:
break
elif wtype == 2:
try:
ln, i = read_varint(buf, i)
except Exception:
break
if i + ln > len(buf):
break
chunk = buf[i:i + ln]
i += ln
if field == 4:
payload = chunk
elif field == 1:
s = try_utf8(chunk)
is_header = (s is not None and len(s) < 60 and all(32 <= ord(c) <= 126 for c in s))
if not is_header:
records.append(chunk)
elif wtype == 5:
i += 4
elif wtype == 1:
i += 8
else:
i += 1
return payload, records
def recursive_collect_strings(buf: bytes, depth: int = 0, max_depth: int = 6) -> List[str]:
if depth > max_depth:
return []
i = 0
out: List[str] = []
while i < len(buf):
try:
key, i = read_varint(buf, i)
except Exception:
break
wtype = key & 0x7
if wtype == 0:
try:
_, i = read_varint(buf, i)
except Exception:
break
elif wtype == 2:
try:
ln, i = read_varint(buf, i)
except Exception:
break
if i + ln > len(buf):
break
chunk = buf[i:i + ln]
i += ln
s = try_utf8(chunk)
if s is not None:
s2 = sanitize(s)
if s2 and s2 != "feature-0j":
out.append(s2)
else:
if ln >= 6:
out.extend(recursive_collect_strings(chunk, depth + 1, max_depth))
elif wtype == 5:
i += 4
elif wtype == 1:
i += 8
else:
i += 1
return out
def parse_row_seat(seat_name: str):
row = ""
seat_no = ""
m = re.search(r"(\d+)\s*排", seat_name)
if m:
row = m.group(1)
m = re.search(r"(\d+)\s*座", seat_name)
if m:
seat_no = m.group(1)
return row, seat_no
payload, _ = extract_payload_and_records(data)
if not payload:
raise ValueError("未找到外层 field=4 payload,数据结构可能不同。")
_, records = extract_payload_and_records(payload)
if not records:
raise ValueError("未在 payload 中找到 repeated record bytes(field=1)。")
result: List[Dict[str, Any]] = []
for rec in records:
svals = recursive_collect_strings(rec)
ids: List[str] = []
for s in svals:
cs = clean_id(s)
if _HEX_ID_RE.match(cs) and cs not in ids:
ids.append(cs)
seat_concrete_id = ids[0] if len(ids) > 0 else ""
zone_id = ids[1] if len(ids) > 1 else ""
seat_name = next((s for s in svals if ("排" in s and "Row" in s and "座" in s)), "")
row, seat_no = parse_row_seat(seat_name)
zone_name = next((s for s in svals if ("看台" in s and "号" in s)), "")
sector_name = next((s for s in svals if s == "看台"), "")
result.append({
"seatConcreteId": seat_concrete_id,
"zoneId": zone_id,
"seatName": seat_name,
"row": row,
"zoneName": zone_name,
"sectorName": sector_name,
"seatNo": seat_no,
})
return result
if __name__ == "__main__":
import requests
import json
headers = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/140.0.0.0 Safari/537.36",
}
url = "https://jstkt-static.jussyun.com/prod/seat/20260109/6960aea1e5a2d80001f35f30/6960af39efdd4c00012a5d40_1_1767943996277.proto?versionNo=2"
response = requests.get(url, headers=headers)
seats = parse_seat_bytes_to_json(response.content)
print(json.dumps(seats, ensure_ascii=False, indent=2))