#!/usr/bin/env python3 from __future__ import annotations import io import json import os import re import sys import traceback import urllib.error import urllib.request # 将 stdout 编码设置为 UTF-8,确保 emoji 等字符能正常输出 if hasattr(sys.stdout, "reconfigure"): sys.stdout.reconfigure(encoding="utf-8") else: sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding="utf-8") sys.stderr = sys.stdout QUOTE_API_URL = "https://qt.gtimg.cn/q={}" FALLBACK_TEXT = "股票查询失败,请稍后再试。" def normalize_stock_code(code: str) -> str: """自动补全股票代码市场前缀""" code = code.strip().upper() # 如果已经带前缀,直接返回 if code.startswith("SH") or code.startswith("SZ"): return code # 如果已经是带 sh/sz 前缀 if re.match(r"^(sh|sz)\d{6}$", code, re.IGNORECASE): return code.upper() # 纯数字处理 if re.match(r"^\d{6}$", code): # 沪市:6开头 if code.startswith("6"): return f"SH{code}" # 深市:0或3开头 elif code.startswith(("0", "3")): return f"SZ{code}" # 北交所:8开头 elif code.startswith(("8", "4")): return f"BJ{code}" # 无法识别 return code def parse_market_prefix(code: str) -> str: """将 SH/SZ 转换为 qq 接口使用的 sh/sz 前缀""" code = code.upper() if code.startswith("SH"): return f"sh{code[2:]}" elif code.startswith("SZ"): return f"sz{code[2:]}" elif code.startswith("BJ"): return f"bj{code[2:]}" return code def fetch_stock_quote(stock_code: str) -> dict | None: """获取股票行情数据并解析为字典""" try: url = QUOTE_API_URL.format(stock_code) with urllib.request.urlopen(url, timeout=10) as response: raw = response.read().decode("gbk") except (urllib.error.URLError, TimeoutError, OSError): return None # 解析返回数据,格式如:v_sh600519="..."; match = re.search(r'="([^"]+)"', raw) if not match: return None fields = match.group(1).split("~") if len(fields) < 45: return None return { "name": fields[1], # 股票名称 "code": fields[2], # 股票代码 "price": fields[3], # 当前价格 "yest_close": fields[4], # 昨收价 "open": fields[5], # 今开价 "volume": fields[6], # 成交量(手) "amount": fields[37], # 成交额(万) "high": fields[33], # 最高价 "low": fields[34], # 最低价 "change": fields[31], # 涨跌额 "change_pct": fields[32], # 涨跌幅(%) } def format_stock_message(data: dict) -> str: """格式化股票信息为可读文本""" up_icon = "📈" if float(data.get("change", 0)) >= 0 else "📉" lines = [ f"【{data['name']}({data['code']})】", f"{'=' * 20}", f"{up_icon} 当前价:{data['price']}", f"📊 涨跌幅:{data['change_pct']}%", f"📊 涨跌额:{data['change']}", f"⬆️ 最高:{data['high']}", f"⬇️ 最低:{data['low']}", f"🔓 今开:{data['open']}", f"🔒 昨收:{data['yest_close']}", f"📈 成交量:{data['volume']}", f"💰 成交额:{data['amount']}万", ] return "\n".join(lines) def send_text(text: str) -> bool: """发送文本消息到微信机器人""" robot_port = os.environ.get("ROBOT_WECHAT_CLIENT_PORT", "").strip() to_wxid = os.environ.get("ROBOT_FROM_WX_ID", "").strip() if not robot_port or not to_wxid: return False api_url = ( f"http://127.0.0.1:{robot_port}/api/v1/robot/message/send/text" ) body = json.dumps( { "to_wxid": to_wxid, "content": text, } ).encode("utf-8") request = urllib.request.Request( api_url, data=body, headers={"Content-Type": "application/json"}, method="POST", ) try: with urllib.request.urlopen(request, timeout=10) as response: if 200 <= response.status < 300: return True payload = json.load(response) except (urllib.error.URLError, TimeoutError, json.JSONDecodeError): return False code = payload.get("code") return code == 200 or code == 0 def main() -> int: args = sys.argv[1:] if not args: sys.stdout.write("请提供股票代码,例如:python3 stock.py 600519\n") return 0 raw_code = args[0] normalized = normalize_stock_code(raw_code) quoted_code = parse_market_prefix(normalized) # 在QQ接口中,多个股票用逗号分隔 data = fetch_stock_quote(quoted_code) if not data: sys.stdout.write(FALLBACK_TEXT) sys.stdout.write("\n") return 0 message = format_stock_message(data) if send_text(message): return 0 # 发送失败,输出到stdout让宿主机器人捕获 sys.stdout.write(message) sys.stdout.write("\n") return 0 if __name__ == "__main__": try: raise SystemExit(main()) except SystemExit: raise except Exception: traceback.print_exc(file=sys.stdout) raise SystemExit(1)