更新 gpdm/stock.py

This commit is contained in:
lj091715 2026-05-19 10:50:59 +08:00
parent 9ee7919825
commit d986198103

View File

@ -1,187 +1,304 @@
#!/usr/bin/env python3 #!/usr/bin/env python3
from __future__ import annotations from __future__ import annotations
import io import io
import json import json
import os import os
import re import re
import sys import sys
import traceback import traceback
import urllib.error import urllib.error
import urllib.request import urllib.request
# 将 stdout 编码设置为 UTF-8确保 emoji 等字符能正常输出 # 将 stdout 编码设置为 UTF-8确保 emoji 等字符能正常输出
if hasattr(sys.stdout, "reconfigure"): if hasattr(sys.stdout, "reconfigure"):
sys.stdout.reconfigure(encoding="utf-8") sys.stdout.reconfigure(encoding="utf-8")
else: else:
sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding="utf-8") sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding="utf-8")
sys.stderr = sys.stdout sys.stderr = sys.stdout
QUOTE_API_URL = "https://api.mairuiapi.com/hsrl/ssjy/{}/{}" QUOTE_API_URL = "https://api.mairuiapi.com/hsrl/ssjy/{}/{}"
DEFAULT_LICENCE = "3E81CB37-4BCE-4DAF-B0AC-AEA069A58973" DEFAULT_LICENCE = "3E81CB37-4BCE-4DAF-B0AC-AEA069A58973"
FALLBACK_TEXT = "股票查询失败,请稍后再试。" FALLBACK_TEXT = "股票查询失败,请稍后再试。"
# 股票名称 → 代码映射(常用 A 股)
def normalize_stock_code(code: str) -> str: STOCK_NAME_MAP = {
"""验证并标准化股票代码仅保留6位纯数字""" # 指数
code = code.strip() "上证指数": "000001", "深证成指": "399001", "创业板指": "399006",
# 去掉可能的前缀sh/SH/sz/SZ/bj/BJ "科创50": "000688", "沪深300": "000300",
code = re.sub(r"^(sh|sz|bj)", "", code, flags=re.IGNORECASE) # 白酒
if re.match(r"^\d{6}$", code): "贵州茅台": "600519", "五粮液": "000858", "泸州老窖": "000568",
return code "洋河股份": "002304", "山西汾酒": "600809", "古井贡酒": "000596",
# 如果传入了完整代码但包含其他字符,尝试提取数字 # 银行
digits = re.findall(r"\d", code) "工商银行": "601398", "建设银行": "601939", "农业银行": "601288",
if len(digits) >= 6: "中国银行": "601988", "招商银行": "600036", "兴业银行": "601166",
return "".join(digits[:6]) "平安银行": "000001", "交通银行": "601328", "浦发银行": "600000",
return "" "中信银行": "601998", "民生银行": "600016",
# 保险
"中国平安": "601318", "中国人寿": "601628", "中国太保": "601601",
def get_licence() -> str: "中国人保": "601319", "新华保险": "601336",
"""获取 licence优先从环境变量读取""" # 证券
return os.environ.get("STOCK_LICENCE", "").strip() or DEFAULT_LICENCE "中信证券": "600030", "东方财富": "300059", "华泰证券": "601688",
"国泰君安": "601211", "海通证券": "600837", "广发证券": "000776",
"招商证券": "600999", "申万宏源": "000166",
def fetch_stock_quote(stock_code: str) -> dict | None: # 新能源
"""获取股票行情数据并解析为字典""" "宁德时代": "300750", "比亚迪": "002594", "隆基绿能": "601012",
licence = get_licence() "阳光电源": "300274", "通威股份": "600438", "天齐锂业": "002466",
url = QUOTE_API_URL.format(stock_code, licence) "赣锋锂业": "002460", "亿纬锂能": "300014", "华友钴业": "600516",
try: # 科技
req = urllib.request.Request(url, headers={"User-Agent": "Mozilla/5.0"}) "中兴通讯": "000063", "中芯国际": "688981", "海康威视": "002415",
with urllib.request.urlopen(req, timeout=10) as response: "浪潮信息": "000977", "韦尔股份": "603501", "北方华创": "002371",
payload = json.load(response) "紫光国微": "002049", "长电科技": "600584", "中科曙光": "603019",
except (urllib.error.URLError, TimeoutError, json.JSONDecodeError, OSError): # 医药
return None "恒瑞医药": "600276", "药明康德": "603259", "迈瑞医疗": "300760",
"复星医药": "600196", "智飞生物": "300122", "长春高新": "000661",
# 处理 JSON 数组返回格式 [{}, ...] "云南白药": "000538", "片仔癀": "600436",
if isinstance(payload, list) and len(payload) > 0: # 消费
payload = payload[0] "美的集团": "000333", "格力电器": "000651", "海尔智家": "600690",
"伊利股份": "600887", "海天味业": "603288", "牧原股份": "002714",
if not isinstance(payload, dict): "金龙鱼": "300999", "双汇发展": "000895",
return None # 地产
"万科A": "000002", "保利发展": "600048", "招商蛇口": "001979",
# 检查接口返回是否包含有效数据 "华润置地": "01109", "碧桂园": "02007",
if "p" not in payload: # 能源
return None "中国石油": "601857", "中国石化": "600028", "中国海油": "600938",
"中煤能源": "601898", "陕西煤业": "601225", "长江电力": "600900",
return payload # 通信
"中国移动": "600941", "中国电信": "601728", "中国联通": "600941",
# 汽车
def format_stock_message(data: dict, stock_code: str = "") -> str: "上汽集团": "600104", "长城汽车": "601633", "长安汽车": "000625",
"""格式化股票信息为可读文本""" "赛力斯": "601127", "江淮汽车": "600418",
p = data.get("p", 0) or 0 # 军工
pc = data.get("pc", 0) or 0 "中航沈飞": "600760", "航发动力": "600893", "中国重工": "601989",
ud = data.get("ud", 0) or 0 "中航西飞": "000768", "航天电器": "002025",
# 互联网
up_icon = "📈" if float(pc) >= 0 else "📉" "腾讯控股": "00700", "阿里巴巴": "09988", "美团": "03690",
title = f"{stock_code}" if stock_code else "【查询结果】" "京东": "09618", "百度": "09888", "网易": "09999",
"小米集团": "01810",
lines = [ # 其他热门
title, "京东方A": "000725", "TCL科技": "000100", "顺丰控股": "002352",
f"{'=' * 20}", "科大讯飞": "002230", "三一重工": "600031", "万华化学": "600309",
f"{up_icon} 当前价:{_fmt(p)}", "中免集团": "601888", "中国中免": "601888", "中国神华": "601088",
f"📊 涨跌幅:{_fmt(pc)}%", # 体育
f"📊 涨跌额:{_fmt(ud)}", "金陵体育": "300651",
f"⬆️ 最高:{_fmt(data.get('h'))}", # 消费电子
f"⬇️ 最低:{_fmt(data.get('l'))}", "立讯精密": "002475",
f"🔓 今开:{_fmt(data.get('o'))}", # 农牧
f"🔒 昨收:{_fmt(data.get('yc'))}", "新希望": "000876",
f"📈 成交量:{_fmt(data.get('v'))}", }
f"💰 成交额:{_fmt(data.get('cje'))}",
f"🔄 换手率:{_fmt(data.get('hs'))}%",
f"📐 振幅:{_fmt(data.get('zf'))}%", def resolve_stock_name(name: str) -> tuple[str, str] | None:
f"📊 市盈率:{_fmt(data.get('pe'))}", """通过股票名称查找股票代码,返回 (代码, 名称)
f"📊 市净率:{_fmt(data.get('sjl'))}", 支持全称匹配和模糊匹配输入"茅台"也能找到"贵州茅台""""
f"📊 量比:{_fmt(data.get('lb'))}", name = name.strip()
f"⚡ 涨速:{_fmt(data.get('zs'))}%", if not name:
f"⏱ 五分钟涨跌:{_fmt(data.get('fm'))}%", return None
f"📅 60日涨跌{_fmt(data.get('zdf60'))}%",
f"📅 年初至今:{_fmt(data.get('zdfnc'))}%", # 1. 精确匹配
f"📊 总市值:{_fmt(data.get('sz'))}", if name in STOCK_NAME_MAP:
f"📊 流通市值:{_fmt(data.get('lt'))}", return STOCK_NAME_MAP[name], name
f"{'=' * 20}",
f"更新时间:{data.get('t', '未知')}", # 2. 模糊匹配:遍历映射,看输入是否是某个股票名称的子串或反之
] candidates = []
return "\n".join(lines) for stock_name, code in STOCK_NAME_MAP.items():
# 输入是股票名的子串(如"茅台"→"贵州茅台"
if name in stock_name:
def _fmt(val) -> str: candidates.append((stock_name, code))
"""统一格式化数值金融数据保留2位小数""" # 股票名是输入的子串
if val is None: elif stock_name in name:
return "0" candidates.append((stock_name, code))
if isinstance(val, (int, float)):
return f"{val:.2f}" if len(candidates) == 1:
return str(val) return candidates[0][1], candidates[0][0]
elif len(candidates) > 1:
# 多个匹配时选最短的(最精确)
def send_text(text: str) -> bool: candidates.sort(key=lambda x: len(x[0]))
"""发送文本消息到微信机器人""" return candidates[0][1], candidates[0][0]
robot_port = os.environ.get("ROBOT_WECHAT_CLIENT_PORT", "").strip()
to_wxid = os.environ.get("ROBOT_FROM_WX_ID", "").strip() return None
if not robot_port or not to_wxid:
return False
def normalize_stock_code(code: str) -> str:
api_url = ( """验证并标准化股票代码仅保留6位纯数字"""
f"http://127.0.0.1:{robot_port}/api/v1/robot/message/send/text" code = code.strip()
) # 去掉可能的前缀sh/SH/sz/SZ/bj/BJ
body = json.dumps( code = re.sub(r"^(sh|sz|bj)", "", code, flags=re.IGNORECASE)
{ if re.match(r"^\d{6}$", code):
"to_wxid": to_wxid, return code
"content": text, # 如果传入了完整代码但包含其他字符,尝试提取数字
} digits = re.findall(r"\d", code)
).encode("utf-8") if len(digits) >= 6:
return "".join(digits[:6])
request = urllib.request.Request( return ""
api_url,
data=body,
headers={"Content-Type": "application/json"}, def get_licence() -> str:
method="POST", """获取 licence优先从环境变量读取"""
) return os.environ.get("STOCK_LICENCE", "").strip() or DEFAULT_LICENCE
try:
with urllib.request.urlopen(request, timeout=10) as response: def fetch_stock_quote(stock_code: str) -> dict | None:
if 200 <= response.status < 300: """获取股票行情数据并解析为字典"""
return True licence = get_licence()
payload = json.load(response) url = QUOTE_API_URL.format(stock_code, licence)
except (urllib.error.URLError, TimeoutError, json.JSONDecodeError): try:
return False req = urllib.request.Request(url, headers={"User-Agent": "Mozilla/5.0"})
with urllib.request.urlopen(req, timeout=10) as response:
code = payload.get("code") payload = json.load(response)
return code == 200 or code == 0 except (urllib.error.URLError, TimeoutError, json.JSONDecodeError, OSError):
return None
def main() -> int: # 处理 JSON 数组返回格式 [{}, ...]
args = sys.argv[1:] if isinstance(payload, list) and len(payload) > 0:
if not args: payload = payload[0]
sys.stdout.write("请提供股票代码例如python3 stock.py 600519\n")
return 0 if not isinstance(payload, dict):
return None
raw_code = args[0]
stock_code = normalize_stock_code(raw_code) # 检查接口返回是否包含有效数据
if not stock_code: if "p" not in payload:
sys.stdout.write(FALLBACK_TEXT) return None
sys.stdout.write("\n")
return 0 return payload
data = fetch_stock_quote(stock_code)
if not data: def format_stock_message(data: dict, stock_code: str = "", stock_name: str = "") -> str:
sys.stdout.write(FALLBACK_TEXT) """格式化股票信息为可读文本"""
sys.stdout.write("\n") p = data.get("p", 0) or 0
return 0 pc = data.get("pc", 0) or 0
ud = data.get("ud", 0) or 0
message = format_stock_message(data, stock_code)
if send_text(message): up_icon = "📈" if float(pc) >= 0 else "📉"
return 0 if stock_name:
title = f"{stock_name}({stock_code})】"
# 发送失败输出到stdout让宿主机器人捕获 elif stock_code:
sys.stdout.write(message) title = f"{stock_code}"
sys.stdout.write("\n") else:
return 0 title = "【查询结果】"
lines = [
if __name__ == "__main__": title,
try: f"{'=' * 20}",
raise SystemExit(main()) f"{up_icon} 当前价:{_fmt(p)}",
except SystemExit: f"📊 涨跌幅:{_fmt(pc)}%",
raise f"📊 涨跌额:{_fmt(ud)}",
except Exception: f"⬆️ 最高:{_fmt(data.get('h'))}",
traceback.print_exc(file=sys.stdout) f"⬇️ 最低:{_fmt(data.get('l'))}",
raise SystemExit(1) f"🔓 今开:{_fmt(data.get('o'))}",
f"🔒 昨收:{_fmt(data.get('yc'))}",
f"📈 成交量:{_fmt(data.get('v'))}",
f"💰 成交额:{_fmt(data.get('cje'))}",
f"🔄 换手率:{_fmt(data.get('hs'))}%",
f"📐 振幅:{_fmt(data.get('zf'))}%",
f"📊 市盈率:{_fmt(data.get('pe'))}",
f"📊 市净率:{_fmt(data.get('sjl'))}",
f"📊 量比:{_fmt(data.get('lb'))}",
f"⚡ 涨速:{_fmt(data.get('zs'))}%",
f"⏱ 五分钟涨跌:{_fmt(data.get('fm'))}%",
f"📅 60日涨跌{_fmt(data.get('zdf60'))}%",
f"📅 年初至今:{_fmt(data.get('zdfnc'))}%",
f"📊 总市值:{_fmt(data.get('sz'))}",
f"📊 流通市值:{_fmt(data.get('lt'))}",
f"{'=' * 20}",
f"更新时间:{data.get('t', '未知')}",
]
return "\n".join(lines)
def _fmt(val) -> str:
"""统一格式化数值金融数据保留2位小数"""
if val is None:
return "0"
if isinstance(val, (int, float)):
return f"{val:.2f}"
return str(val)
def send_text(text: str) -> bool:
"""发送文本消息到微信机器人"""
robot_port = os.environ.get("ROBOT_WECHAT_CLIENT_PORT", "").strip()
to_wxid = os.environ.get("ROBOT_FROM_WX_ID", "").strip()
if not robot_port or not to_wxid:
return False
api_url = (
f"http://127.0.0.1:{robot_port}/api/v1/robot/message/send/text"
)
body = json.dumps(
{
"to_wxid": to_wxid,
"content": text,
}
).encode("utf-8")
request = urllib.request.Request(
api_url,
data=body,
headers={"Content-Type": "application/json"},
method="POST",
)
try:
with urllib.request.urlopen(request, timeout=10) as response:
if 200 <= response.status < 300:
return True
payload = json.load(response)
except (urllib.error.URLError, TimeoutError, json.JSONDecodeError):
return False
code = payload.get("code")
return code == 200 or code == 0
def main() -> int:
args = sys.argv[1:]
if not args:
sys.stdout.write("请提供股票代码或名称例如python3 stock.py 600519 或 python3 stock.py 贵州茅台\n")
return 0
raw = args[0]
stock_code = ""
stock_name = ""
# 第一步:尝试名称映射
result = resolve_stock_name(raw)
if result:
stock_code, stock_name = result
else:
# 第二步:尝试数字代码解析
stock_code = normalize_stock_code(raw)
if not stock_code and re.search(r"[\u4e00-\u9fff]", raw):
# 输入包含中文 → 明显是想用名称查,但映射表里没有
sys.stdout.write(f"未找到「{raw}」对应的股票,请使用股票代码查询,例如 600519\n")
sys.stdout.write("提示:如需添加新股票名称映射,请联系管理员。\n")
return 0
if not stock_code:
sys.stdout.write(FALLBACK_TEXT)
sys.stdout.write("\n")
return 0
data = fetch_stock_quote(stock_code)
if not data:
sys.stdout.write(FALLBACK_TEXT)
sys.stdout.write("\n")
return 0
message = format_stock_message(data, stock_code, stock_name)
if send_text(message):
return 0
# 发送失败输出到stdout让宿主机器人捕获
sys.stdout.write(message)
sys.stdout.write("\n")
return 0
if __name__ == "__main__":
try:
raise SystemExit(main())
except SystemExit:
raise
except Exception:
traceback.print_exc(file=sys.stdout)
raise SystemExit(1)