#!/usr/bin/env python3 from __future__ import annotations import io import json import os import re import sys import traceback import urllib.error import urllib.request # 将 stdout 编码设置为 UTF-8,确保 emoji 等字符能正常输出 if hasattr(sys.stdout, "reconfigure"): sys.stdout.reconfigure(encoding="utf-8") else: sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding="utf-8") sys.stderr = sys.stdout QUOTE_API_URL = "https://api.mairuiapi.com/hsrl/ssjy/{}/{}" DEFAULT_LICENCE = "3E81CB37-4BCE-4DAF-B0AC-AEA069A58973" FALLBACK_TEXT = "股票查询失败,请稍后再试。" # 防重复标记:5秒内防止同一股票代码被重复查询 _LOCK_FILE = os.path.join(os.path.dirname(__file__), ".stock_lock") _LOCK_TTL = 5 # 秒 def _acquire_lock(stock_code: str) -> bool: """尝试获取锁,成功返回True,失败(被锁定)返回False""" import time try: if os.path.exists(_LOCK_FILE): with open(_LOCK_FILE, "r") as f: locked_code, lock_time = f.read().strip().split("|") lock_time = float(lock_time) if locked_code == stock_code and time.time() - lock_time < _LOCK_TTL: return False # 还在锁定期内,跳过 with open(_LOCK_FILE, "w") as f: f.write(f"{stock_code}|{time.time()}") except Exception: pass return True # 股票名称 → 代码映射(常用 A 股) STOCK_NAME_MAP = { # 指数 "上证指数": "000001", "深证成指": "399001", "创业板指": "399006", "科创50": "000688", "沪深300": "000300", # 白酒 "贵州茅台": "600519", "五粮液": "000858", "泸州老窖": "000568", "洋河股份": "002304", "山西汾酒": "600809", "古井贡酒": "000596", # 银行 "工商银行": "601398", "建设银行": "601939", "农业银行": "601288", "中国银行": "601988", "招商银行": "600036", "兴业银行": "601166", "平安银行": "000001", "交通银行": "601328", "浦发银行": "600000", "中信银行": "601998", "民生银行": "600016", # 保险 "中国平安": "601318", "中国人寿": "601628", "中国太保": "601601", "中国人保": "601319", "新华保险": "601336", # 证券 "中信证券": "600030", "东方财富": "300059", "华泰证券": "601688", "国泰君安": "601211", "海通证券": "600837", "广发证券": "000776", "招商证券": "600999", "申万宏源": "000166", # 新能源 "宁德时代": "300750", "比亚迪": "002594", "隆基绿能": "601012", "阳光电源": "300274", "通威股份": "600438", "天齐锂业": "002466", "赣锋锂业": "002460", "亿纬锂能": "300014", "华友钴业": "600516", # 科技 "中兴通讯": "000063", "中芯国际": "688981", "海康威视": "002415", "浪潮信息": "000977", "韦尔股份": "603501", "北方华创": "002371", "紫光国微": "002049", "长电科技": "600584", "中科曙光": "603019", # 医药 "恒瑞医药": "600276", "药明康德": "603259", "迈瑞医疗": "300760", "复星医药": "600196", "智飞生物": "300122", "长春高新": "000661", "云南白药": "000538", "片仔癀": "600436", # 消费 "美的集团": "000333", "格力电器": "000651", "海尔智家": "600690", "伊利股份": "600887", "海天味业": "603288", "牧原股份": "002714", "金龙鱼": "300999", "双汇发展": "000895", # 地产 "万科A": "000002", "保利发展": "600048", "招商蛇口": "001979", "华润置地": "01109", "碧桂园": "02007", # 能源 "中国石油": "601857", "中国石化": "600028", "中国海油": "600938", "中煤能源": "601898", "陕西煤业": "601225", "长江电力": "600900", # 通信 "中国移动": "600941", "中国电信": "601728", "中国联通": "600941", # 汽车 "上汽集团": "600104", "长城汽车": "601633", "长安汽车": "000625", "赛力斯": "601127", "江淮汽车": "600418", # 军工 "中航沈飞": "600760", "航发动力": "600893", "中国重工": "601989", "中航西飞": "000768", "航天电器": "002025", # 互联网 "腾讯控股": "00700", "阿里巴巴": "09988", "美团": "03690", "京东": "09618", "百度": "09888", "网易": "09999", "小米集团": "01810", # 其他热门 "京东方A": "000725", "TCL科技": "000100", "顺丰控股": "002352", "科大讯飞": "002230", "三一重工": "600031", "万华化学": "600309", "中免集团": "601888", "中国中免": "601888", "中国神华": "601088", # 体育 "金陵体育": "300651", # 消费电子 "立讯精密": "002475", # 农牧 "新希望": "000876", } def resolve_stock_name(name: str) -> tuple[str, str] | None: """通过股票名称查找股票代码,返回 (代码, 名称) 支持全称匹配和模糊匹配(输入"茅台"也能找到"贵州茅台")""" name = name.strip() if not name: return None # 1. 精确匹配 if name in STOCK_NAME_MAP: return STOCK_NAME_MAP[name], name # 2. 模糊匹配:遍历映射,看输入是否是某个股票名称的子串或反之 candidates = [] for stock_name, code in STOCK_NAME_MAP.items(): # 输入是股票名的子串(如"茅台"→"贵州茅台") if name in stock_name: candidates.append((stock_name, code)) # 股票名是输入的子串 elif stock_name in name: candidates.append((stock_name, code)) if len(candidates) == 1: return candidates[0][1], candidates[0][0] elif len(candidates) > 1: # 多个匹配时选最短的(最精确) candidates.sort(key=lambda x: len(x[0])) return candidates[0][1], candidates[0][0] return None def normalize_stock_code(code: str) -> str: """验证并标准化股票代码(仅保留6位纯数字)""" code = code.strip() # 去掉可能的前缀(sh/SH/sz/SZ/bj/BJ) code = re.sub(r"^(sh|sz|bj)", "", code, flags=re.IGNORECASE) if re.match(r"^\d{6}$", code): return code # 如果传入了完整代码但包含其他字符,尝试提取数字 digits = re.findall(r"\d", code) if len(digits) >= 6: return "".join(digits[:6]) return "" def get_licence() -> str: """获取 licence,优先从环境变量读取""" return os.environ.get("STOCK_LICENCE", "").strip() or DEFAULT_LICENCE def fetch_stock_quote(stock_code: str) -> dict | None: """获取股票行情数据并解析为字典""" licence = get_licence() url = QUOTE_API_URL.format(stock_code, licence) try: req = urllib.request.Request(url, headers={"User-Agent": "Mozilla/5.0"}) with urllib.request.urlopen(req, timeout=10) as response: payload = json.load(response) except (urllib.error.URLError, TimeoutError, json.JSONDecodeError, OSError): return None # 处理 JSON 数组返回格式 [{}, ...] if isinstance(payload, list) and len(payload) > 0: payload = payload[0] if not isinstance(payload, dict): return None # 检查接口返回是否包含有效数据 if "p" not in payload: return None return payload def format_stock_message(data: dict, stock_code: str = "", stock_name: str = "") -> str: """格式化股票信息为可读文本""" p = data.get("p", 0) or 0 pc = data.get("pc", 0) or 0 ud = data.get("ud", 0) or 0 up_icon = "📈" if float(pc) >= 0 else "📉" if stock_name: title = f"【{stock_name}({stock_code})】" elif stock_code: title = f"【{stock_code}】" else: title = "【查询结果】" lines = [ title, f"{'=' * 20}", f"{up_icon} 当前价:{_fmt(p)}", f"📊 涨跌幅:{_fmt(pc)}%", f"📊 涨跌额:{_fmt(ud)}", f"⬆️ 最高:{_fmt(data.get('h'))}", f"⬇️ 最低:{_fmt(data.get('l'))}", f"🔓 今开:{_fmt(data.get('o'))}", f"🔒 昨收:{_fmt(data.get('yc'))}", f"📈 成交量:{_fmt(data.get('v'))}手", f"💰 成交额:{_fmt(data.get('cje'))}", f"🔄 换手率:{_fmt(data.get('hs'))}%", f"📐 振幅:{_fmt(data.get('zf'))}%", f"📊 市盈率:{_fmt(data.get('pe'))}", f"📊 市净率:{_fmt(data.get('sjl'))}", f"📊 量比:{_fmt(data.get('lb'))}", f"⚡ 涨速:{_fmt(data.get('zs'))}%", f"⏱ 五分钟涨跌:{_fmt(data.get('fm'))}%", f"📅 60日涨跌:{_fmt(data.get('zdf60'))}%", f"📅 年初至今:{_fmt(data.get('zdfnc'))}%", f"📊 总市值:{_fmt(data.get('sz'))}", f"📊 流通市值:{_fmt(data.get('lt'))}", f"{'=' * 20}", f"更新时间:{data.get('t', '未知')}", ] return "\n".join(lines) def _fmt(val) -> str: """统一格式化数值,金融数据保留2位小数""" if val is None: return "0" if isinstance(val, (int, float)): return f"{val:.2f}" return str(val) def send_text(text: str) -> bool: """发送文本消息到微信机器人""" robot_port = os.environ.get("ROBOT_WECHAT_CLIENT_PORT", "").strip() to_wxid = os.environ.get("ROBOT_FROM_WX_ID", "").strip() if not robot_port or not to_wxid: return False api_url = ( f"http://127.0.0.1:{robot_port}/api/v1/robot/message/send/text" ) body = json.dumps( { "to_wxid": to_wxid, "content": text, } ).encode("utf-8") request = urllib.request.Request( api_url, data=body, headers={"Content-Type": "application/json"}, method="POST", ) try: with urllib.request.urlopen(request, timeout=10) as response: if 200 <= response.status < 300: return True payload = json.load(response) except (urllib.error.URLError, TimeoutError, json.JSONDecodeError): return False code = payload.get("code") return code == 200 or code == 0 def main() -> int: args = sys.argv[1:] if not args: sys.stdout.write("请提供股票代码或名称,例如:python3 stock.py 600519 或 python3 stock.py 贵州茅台\n") return 0 raw = args[0] stock_code = "" stock_name = "" # 第一步:尝试名称映射 result = resolve_stock_name(raw) if result: stock_code, stock_name = result else: # 第二步:尝试数字代码解析 stock_code = normalize_stock_code(raw) if not stock_code and re.search(r"[\u4e00-\u9fff]", raw): # 输入包含中文 → 明显是想用名称查,但映射表里没有 sys.stdout.write(f"未找到「{raw}」对应的股票,请使用股票代码查询,例如 600519\n") sys.stdout.write("提示:如需添加新股票名称映射,请联系管理员。\n") return 0 if not stock_code: sys.stdout.write(FALLBACK_TEXT) sys.stdout.write("\n") return 0 # 防重复:5秒内相同股票不重复查询 if not _acquire_lock(stock_code): return 0 data = fetch_stock_quote(stock_code) if not data: sys.stdout.write(FALLBACK_TEXT) sys.stdout.write("\n") return 0 message = format_stock_message(data, stock_code, stock_name) if send_text(message): return 0 # 发送失败,输出到stdout让宿主机器人捕获 sys.stdout.write(message) sys.stdout.write("\n") return 0 if __name__ == "__main__": try: raise SystemExit(main()) except SystemExit: raise except Exception: traceback.print_exc(file=sys.stdout) raise SystemExit(1)