From d986198103570a16f9b39aa1683a367530296e8d Mon Sep 17 00:00:00 2001 From: lj091715 <1091062319@qq.com> Date: Tue, 19 May 2026 10:50:59 +0800 Subject: [PATCH] =?UTF-8?q?=E6=9B=B4=E6=96=B0=20gpdm/stock.py?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- gpdm/stock.py | 491 +++++++++++++++++++++++++++++++------------------- 1 file changed, 304 insertions(+), 187 deletions(-) diff --git a/gpdm/stock.py b/gpdm/stock.py index 86d9b33..cf0808a 100644 --- a/gpdm/stock.py +++ b/gpdm/stock.py @@ -1,187 +1,304 @@ -#!/usr/bin/env python3 -from __future__ import annotations -import io -import json -import os -import re -import sys -import traceback -import urllib.error -import urllib.request - -# 将 stdout 编码设置为 UTF-8,确保 emoji 等字符能正常输出 -if hasattr(sys.stdout, "reconfigure"): - sys.stdout.reconfigure(encoding="utf-8") -else: - sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding="utf-8") -sys.stderr = sys.stdout - -QUOTE_API_URL = "https://api.mairuiapi.com/hsrl/ssjy/{}/{}" -DEFAULT_LICENCE = "3E81CB37-4BCE-4DAF-B0AC-AEA069A58973" -FALLBACK_TEXT = "股票查询失败,请稍后再试。" - - -def normalize_stock_code(code: str) -> str: - """验证并标准化股票代码(仅保留6位纯数字)""" - code = code.strip() - # 去掉可能的前缀(sh/SH/sz/SZ/bj/BJ) - code = re.sub(r"^(sh|sz|bj)", "", code, flags=re.IGNORECASE) - if re.match(r"^\d{6}$", code): - return code - # 如果传入了完整代码但包含其他字符,尝试提取数字 - digits = re.findall(r"\d", code) - if len(digits) >= 6: - return "".join(digits[:6]) - return "" - - -def get_licence() -> str: - """获取 licence,优先从环境变量读取""" - return os.environ.get("STOCK_LICENCE", "").strip() or DEFAULT_LICENCE - - -def fetch_stock_quote(stock_code: str) -> dict | None: - """获取股票行情数据并解析为字典""" - licence = get_licence() - url = QUOTE_API_URL.format(stock_code, licence) - try: - req = urllib.request.Request(url, headers={"User-Agent": "Mozilla/5.0"}) - with urllib.request.urlopen(req, timeout=10) as response: - payload = json.load(response) - except (urllib.error.URLError, TimeoutError, json.JSONDecodeError, OSError): - return None - - # 处理 JSON 数组返回格式 [{}, ...] - if isinstance(payload, list) and len(payload) > 0: - payload = payload[0] - - if not isinstance(payload, dict): - return None - - # 检查接口返回是否包含有效数据 - if "p" not in payload: - return None - - return payload - - -def format_stock_message(data: dict, stock_code: str = "") -> str: - """格式化股票信息为可读文本""" - p = data.get("p", 0) or 0 - pc = data.get("pc", 0) or 0 - ud = data.get("ud", 0) or 0 - - up_icon = "📈" if float(pc) >= 0 else "📉" - title = f"【{stock_code}】" if stock_code else "【查询结果】" - - lines = [ - title, - f"{'=' * 20}", - f"{up_icon} 当前价:{_fmt(p)}", - f"📊 涨跌幅:{_fmt(pc)}%", - f"📊 涨跌额:{_fmt(ud)}", - f"⬆️ 最高:{_fmt(data.get('h'))}", - f"⬇️ 最低:{_fmt(data.get('l'))}", - f"🔓 今开:{_fmt(data.get('o'))}", - f"🔒 昨收:{_fmt(data.get('yc'))}", - f"📈 成交量:{_fmt(data.get('v'))}手", - f"💰 成交额:{_fmt(data.get('cje'))}", - f"🔄 换手率:{_fmt(data.get('hs'))}%", - f"📐 振幅:{_fmt(data.get('zf'))}%", - f"📊 市盈率:{_fmt(data.get('pe'))}", - f"📊 市净率:{_fmt(data.get('sjl'))}", - f"📊 量比:{_fmt(data.get('lb'))}", - f"⚡ 涨速:{_fmt(data.get('zs'))}%", - f"⏱ 五分钟涨跌:{_fmt(data.get('fm'))}%", - f"📅 60日涨跌:{_fmt(data.get('zdf60'))}%", - f"📅 年初至今:{_fmt(data.get('zdfnc'))}%", - f"📊 总市值:{_fmt(data.get('sz'))}", - f"📊 流通市值:{_fmt(data.get('lt'))}", - f"{'=' * 20}", - f"更新时间:{data.get('t', '未知')}", - ] - return "\n".join(lines) - - -def _fmt(val) -> str: - """统一格式化数值,金融数据保留2位小数""" - if val is None: - return "0" - if isinstance(val, (int, float)): - return f"{val:.2f}" - return str(val) - - -def send_text(text: str) -> bool: - """发送文本消息到微信机器人""" - robot_port = os.environ.get("ROBOT_WECHAT_CLIENT_PORT", "").strip() - to_wxid = os.environ.get("ROBOT_FROM_WX_ID", "").strip() - if not robot_port or not to_wxid: - return False - - api_url = ( - f"http://127.0.0.1:{robot_port}/api/v1/robot/message/send/text" - ) - body = json.dumps( - { - "to_wxid": to_wxid, - "content": text, - } - ).encode("utf-8") - - request = urllib.request.Request( - api_url, - data=body, - headers={"Content-Type": "application/json"}, - method="POST", - ) - - try: - with urllib.request.urlopen(request, timeout=10) as response: - if 200 <= response.status < 300: - return True - payload = json.load(response) - except (urllib.error.URLError, TimeoutError, json.JSONDecodeError): - return False - - code = payload.get("code") - return code == 200 or code == 0 - - -def main() -> int: - args = sys.argv[1:] - if not args: - sys.stdout.write("请提供股票代码,例如:python3 stock.py 600519\n") - return 0 - - raw_code = args[0] - stock_code = normalize_stock_code(raw_code) - if not stock_code: - sys.stdout.write(FALLBACK_TEXT) - sys.stdout.write("\n") - return 0 - - data = fetch_stock_quote(stock_code) - if not data: - sys.stdout.write(FALLBACK_TEXT) - sys.stdout.write("\n") - return 0 - - message = format_stock_message(data, stock_code) - if send_text(message): - return 0 - - # 发送失败,输出到stdout让宿主机器人捕获 - sys.stdout.write(message) - sys.stdout.write("\n") - return 0 - - -if __name__ == "__main__": - try: - raise SystemExit(main()) - except SystemExit: - raise - except Exception: - traceback.print_exc(file=sys.stdout) - raise SystemExit(1) +#!/usr/bin/env python3 +from __future__ import annotations +import io +import json +import os +import re +import sys +import traceback +import urllib.error +import urllib.request + +# 将 stdout 编码设置为 UTF-8,确保 emoji 等字符能正常输出 +if hasattr(sys.stdout, "reconfigure"): + sys.stdout.reconfigure(encoding="utf-8") +else: + sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding="utf-8") +sys.stderr = sys.stdout + +QUOTE_API_URL = "https://api.mairuiapi.com/hsrl/ssjy/{}/{}" +DEFAULT_LICENCE = "3E81CB37-4BCE-4DAF-B0AC-AEA069A58973" +FALLBACK_TEXT = "股票查询失败,请稍后再试。" + +# 股票名称 → 代码映射(常用 A 股) +STOCK_NAME_MAP = { + # 指数 + "上证指数": "000001", "深证成指": "399001", "创业板指": "399006", + "科创50": "000688", "沪深300": "000300", + # 白酒 + "贵州茅台": "600519", "五粮液": "000858", "泸州老窖": "000568", + "洋河股份": "002304", "山西汾酒": "600809", "古井贡酒": "000596", + # 银行 + "工商银行": "601398", "建设银行": "601939", "农业银行": "601288", + "中国银行": "601988", "招商银行": "600036", "兴业银行": "601166", + "平安银行": "000001", "交通银行": "601328", "浦发银行": "600000", + "中信银行": "601998", "民生银行": "600016", + # 保险 + "中国平安": "601318", "中国人寿": "601628", "中国太保": "601601", + "中国人保": "601319", "新华保险": "601336", + # 证券 + "中信证券": "600030", "东方财富": "300059", "华泰证券": "601688", + "国泰君安": "601211", "海通证券": "600837", "广发证券": "000776", + "招商证券": "600999", "申万宏源": "000166", + # 新能源 + "宁德时代": "300750", "比亚迪": "002594", "隆基绿能": "601012", + "阳光电源": "300274", "通威股份": "600438", "天齐锂业": "002466", + "赣锋锂业": "002460", "亿纬锂能": "300014", "华友钴业": "600516", + # 科技 + "中兴通讯": "000063", "中芯国际": "688981", "海康威视": "002415", + "浪潮信息": "000977", "韦尔股份": "603501", "北方华创": "002371", + "紫光国微": "002049", "长电科技": "600584", "中科曙光": "603019", + # 医药 + "恒瑞医药": "600276", "药明康德": "603259", "迈瑞医疗": "300760", + "复星医药": "600196", "智飞生物": "300122", "长春高新": "000661", + "云南白药": "000538", "片仔癀": "600436", + # 消费 + "美的集团": "000333", "格力电器": "000651", "海尔智家": "600690", + "伊利股份": "600887", "海天味业": "603288", "牧原股份": "002714", + "金龙鱼": "300999", "双汇发展": "000895", + # 地产 + "万科A": "000002", "保利发展": "600048", "招商蛇口": "001979", + "华润置地": "01109", "碧桂园": "02007", + # 能源 + "中国石油": "601857", "中国石化": "600028", "中国海油": "600938", + "中煤能源": "601898", "陕西煤业": "601225", "长江电力": "600900", + # 通信 + "中国移动": "600941", "中国电信": "601728", "中国联通": "600941", + # 汽车 + "上汽集团": "600104", "长城汽车": "601633", "长安汽车": "000625", + "赛力斯": "601127", "江淮汽车": "600418", + # 军工 + "中航沈飞": "600760", "航发动力": "600893", "中国重工": "601989", + "中航西飞": "000768", "航天电器": "002025", + # 互联网 + "腾讯控股": "00700", "阿里巴巴": "09988", "美团": "03690", + "京东": "09618", "百度": "09888", "网易": "09999", + "小米集团": "01810", + # 其他热门 + "京东方A": "000725", "TCL科技": "000100", "顺丰控股": "002352", + "科大讯飞": "002230", "三一重工": "600031", "万华化学": "600309", + "中免集团": "601888", "中国中免": "601888", "中国神华": "601088", + # 体育 + "金陵体育": "300651", + # 消费电子 + "立讯精密": "002475", + # 农牧 + "新希望": "000876", +} + + +def resolve_stock_name(name: str) -> tuple[str, str] | None: + """通过股票名称查找股票代码,返回 (代码, 名称) + 支持全称匹配和模糊匹配(输入"茅台"也能找到"贵州茅台")""" + name = name.strip() + if not name: + return None + + # 1. 精确匹配 + if name in STOCK_NAME_MAP: + return STOCK_NAME_MAP[name], name + + # 2. 模糊匹配:遍历映射,看输入是否是某个股票名称的子串或反之 + candidates = [] + for stock_name, code in STOCK_NAME_MAP.items(): + # 输入是股票名的子串(如"茅台"→"贵州茅台") + if name in stock_name: + candidates.append((stock_name, code)) + # 股票名是输入的子串 + elif stock_name in name: + candidates.append((stock_name, code)) + + if len(candidates) == 1: + return candidates[0][1], candidates[0][0] + elif len(candidates) > 1: + # 多个匹配时选最短的(最精确) + candidates.sort(key=lambda x: len(x[0])) + return candidates[0][1], candidates[0][0] + + return None + + +def normalize_stock_code(code: str) -> str: + """验证并标准化股票代码(仅保留6位纯数字)""" + code = code.strip() + # 去掉可能的前缀(sh/SH/sz/SZ/bj/BJ) + code = re.sub(r"^(sh|sz|bj)", "", code, flags=re.IGNORECASE) + if re.match(r"^\d{6}$", code): + return code + # 如果传入了完整代码但包含其他字符,尝试提取数字 + digits = re.findall(r"\d", code) + if len(digits) >= 6: + return "".join(digits[:6]) + return "" + + +def get_licence() -> str: + """获取 licence,优先从环境变量读取""" + return os.environ.get("STOCK_LICENCE", "").strip() or DEFAULT_LICENCE + + +def fetch_stock_quote(stock_code: str) -> dict | None: + """获取股票行情数据并解析为字典""" + licence = get_licence() + url = QUOTE_API_URL.format(stock_code, licence) + try: + req = urllib.request.Request(url, headers={"User-Agent": "Mozilla/5.0"}) + with urllib.request.urlopen(req, timeout=10) as response: + payload = json.load(response) + except (urllib.error.URLError, TimeoutError, json.JSONDecodeError, OSError): + return None + + # 处理 JSON 数组返回格式 [{}, ...] + if isinstance(payload, list) and len(payload) > 0: + payload = payload[0] + + if not isinstance(payload, dict): + return None + + # 检查接口返回是否包含有效数据 + if "p" not in payload: + return None + + return payload + + +def format_stock_message(data: dict, stock_code: str = "", stock_name: str = "") -> str: + """格式化股票信息为可读文本""" + p = data.get("p", 0) or 0 + pc = data.get("pc", 0) or 0 + ud = data.get("ud", 0) or 0 + + up_icon = "📈" if float(pc) >= 0 else "📉" + if stock_name: + title = f"【{stock_name}({stock_code})】" + elif stock_code: + title = f"【{stock_code}】" + else: + title = "【查询结果】" + + lines = [ + title, + f"{'=' * 20}", + f"{up_icon} 当前价:{_fmt(p)}", + f"📊 涨跌幅:{_fmt(pc)}%", + f"📊 涨跌额:{_fmt(ud)}", + f"⬆️ 最高:{_fmt(data.get('h'))}", + f"⬇️ 最低:{_fmt(data.get('l'))}", + f"🔓 今开:{_fmt(data.get('o'))}", + f"🔒 昨收:{_fmt(data.get('yc'))}", + f"📈 成交量:{_fmt(data.get('v'))}手", + f"💰 成交额:{_fmt(data.get('cje'))}", + f"🔄 换手率:{_fmt(data.get('hs'))}%", + f"📐 振幅:{_fmt(data.get('zf'))}%", + f"📊 市盈率:{_fmt(data.get('pe'))}", + f"📊 市净率:{_fmt(data.get('sjl'))}", + f"📊 量比:{_fmt(data.get('lb'))}", + f"⚡ 涨速:{_fmt(data.get('zs'))}%", + f"⏱ 五分钟涨跌:{_fmt(data.get('fm'))}%", + f"📅 60日涨跌:{_fmt(data.get('zdf60'))}%", + f"📅 年初至今:{_fmt(data.get('zdfnc'))}%", + f"📊 总市值:{_fmt(data.get('sz'))}", + f"📊 流通市值:{_fmt(data.get('lt'))}", + f"{'=' * 20}", + f"更新时间:{data.get('t', '未知')}", + ] + return "\n".join(lines) + + +def _fmt(val) -> str: + """统一格式化数值,金融数据保留2位小数""" + if val is None: + return "0" + if isinstance(val, (int, float)): + return f"{val:.2f}" + return str(val) + + +def send_text(text: str) -> bool: + """发送文本消息到微信机器人""" + robot_port = os.environ.get("ROBOT_WECHAT_CLIENT_PORT", "").strip() + to_wxid = os.environ.get("ROBOT_FROM_WX_ID", "").strip() + if not robot_port or not to_wxid: + return False + + api_url = ( + f"http://127.0.0.1:{robot_port}/api/v1/robot/message/send/text" + ) + body = json.dumps( + { + "to_wxid": to_wxid, + "content": text, + } + ).encode("utf-8") + + request = urllib.request.Request( + api_url, + data=body, + headers={"Content-Type": "application/json"}, + method="POST", + ) + + try: + with urllib.request.urlopen(request, timeout=10) as response: + if 200 <= response.status < 300: + return True + payload = json.load(response) + except (urllib.error.URLError, TimeoutError, json.JSONDecodeError): + return False + + code = payload.get("code") + return code == 200 or code == 0 + + +def main() -> int: + args = sys.argv[1:] + if not args: + sys.stdout.write("请提供股票代码或名称,例如:python3 stock.py 600519 或 python3 stock.py 贵州茅台\n") + return 0 + + raw = args[0] + stock_code = "" + stock_name = "" + + # 第一步:尝试名称映射 + result = resolve_stock_name(raw) + if result: + stock_code, stock_name = result + else: + # 第二步:尝试数字代码解析 + stock_code = normalize_stock_code(raw) + if not stock_code and re.search(r"[\u4e00-\u9fff]", raw): + # 输入包含中文 → 明显是想用名称查,但映射表里没有 + sys.stdout.write(f"未找到「{raw}」对应的股票,请使用股票代码查询,例如 600519\n") + sys.stdout.write("提示:如需添加新股票名称映射,请联系管理员。\n") + return 0 + + if not stock_code: + sys.stdout.write(FALLBACK_TEXT) + sys.stdout.write("\n") + return 0 + + data = fetch_stock_quote(stock_code) + if not data: + sys.stdout.write(FALLBACK_TEXT) + sys.stdout.write("\n") + return 0 + + message = format_stock_message(data, stock_code, stock_name) + if send_text(message): + return 0 + + # 发送失败,输出到stdout让宿主机器人捕获 + sys.stdout.write(message) + sys.stdout.write("\n") + return 0 + + +if __name__ == "__main__": + try: + raise SystemExit(main()) + except SystemExit: + raise + except Exception: + traceback.print_exc(file=sys.stdout) + raise SystemExit(1)