st/gpdm/stock.py
2026-05-19 10:58:06 +08:00

330 lines
12 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
from __future__ import annotations
import io
import json
import os
import re
import sys
import traceback
import urllib.error
import urllib.request
# 将 stdout 编码设置为 UTF-8确保 emoji 等字符能正常输出
if hasattr(sys.stdout, "reconfigure"):
sys.stdout.reconfigure(encoding="utf-8")
else:
sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding="utf-8")
sys.stderr = sys.stdout
QUOTE_API_URL = "https://api.mairuiapi.com/hsrl/ssjy/{}/{}"
DEFAULT_LICENCE = "3E81CB37-4BCE-4DAF-B0AC-AEA069A58973"
FALLBACK_TEXT = "股票查询失败,请稍后再试。"
# 防重复标记5秒内防止同一股票代码被重复查询
_LOCK_FILE = os.path.join(os.path.dirname(__file__), ".stock_lock")
_LOCK_TTL = 5 # 秒
def _acquire_lock(stock_code: str) -> bool:
"""尝试获取锁成功返回True失败被锁定返回False"""
import time
try:
if os.path.exists(_LOCK_FILE):
with open(_LOCK_FILE, "r") as f:
locked_code, lock_time = f.read().strip().split("|")
lock_time = float(lock_time)
if locked_code == stock_code and time.time() - lock_time < _LOCK_TTL:
return False # 还在锁定期内,跳过
with open(_LOCK_FILE, "w") as f:
f.write(f"{stock_code}|{time.time()}")
except Exception:
pass
return True
# 股票名称 → 代码映射(常用 A 股)
STOCK_NAME_MAP = {
# 指数
"上证指数": "000001", "深证成指": "399001", "创业板指": "399006",
"科创50": "000688", "沪深300": "000300",
# 白酒
"贵州茅台": "600519", "五粮液": "000858", "泸州老窖": "000568",
"洋河股份": "002304", "山西汾酒": "600809", "古井贡酒": "000596",
# 银行
"工商银行": "601398", "建设银行": "601939", "农业银行": "601288",
"中国银行": "601988", "招商银行": "600036", "兴业银行": "601166",
"平安银行": "000001", "交通银行": "601328", "浦发银行": "600000",
"中信银行": "601998", "民生银行": "600016",
# 保险
"中国平安": "601318", "中国人寿": "601628", "中国太保": "601601",
"中国人保": "601319", "新华保险": "601336",
# 证券
"中信证券": "600030", "东方财富": "300059", "华泰证券": "601688",
"国泰君安": "601211", "海通证券": "600837", "广发证券": "000776",
"招商证券": "600999", "申万宏源": "000166",
# 新能源
"宁德时代": "300750", "比亚迪": "002594", "隆基绿能": "601012",
"阳光电源": "300274", "通威股份": "600438", "天齐锂业": "002466",
"赣锋锂业": "002460", "亿纬锂能": "300014", "华友钴业": "600516",
# 科技
"中兴通讯": "000063", "中芯国际": "688981", "海康威视": "002415",
"浪潮信息": "000977", "韦尔股份": "603501", "北方华创": "002371",
"紫光国微": "002049", "长电科技": "600584", "中科曙光": "603019",
# 医药
"恒瑞医药": "600276", "药明康德": "603259", "迈瑞医疗": "300760",
"复星医药": "600196", "智飞生物": "300122", "长春高新": "000661",
"云南白药": "000538", "片仔癀": "600436",
# 消费
"美的集团": "000333", "格力电器": "000651", "海尔智家": "600690",
"伊利股份": "600887", "海天味业": "603288", "牧原股份": "002714",
"金龙鱼": "300999", "双汇发展": "000895",
# 地产
"万科A": "000002", "保利发展": "600048", "招商蛇口": "001979",
"华润置地": "01109", "碧桂园": "02007",
# 能源
"中国石油": "601857", "中国石化": "600028", "中国海油": "600938",
"中煤能源": "601898", "陕西煤业": "601225", "长江电力": "600900",
# 通信
"中国移动": "600941", "中国电信": "601728", "中国联通": "600941",
# 汽车
"上汽集团": "600104", "长城汽车": "601633", "长安汽车": "000625",
"赛力斯": "601127", "江淮汽车": "600418",
# 军工
"中航沈飞": "600760", "航发动力": "600893", "中国重工": "601989",
"中航西飞": "000768", "航天电器": "002025",
# 互联网
"腾讯控股": "00700", "阿里巴巴": "09988", "美团": "03690",
"京东": "09618", "百度": "09888", "网易": "09999",
"小米集团": "01810",
# 其他热门
"京东方A": "000725", "TCL科技": "000100", "顺丰控股": "002352",
"科大讯飞": "002230", "三一重工": "600031", "万华化学": "600309",
"中免集团": "601888", "中国中免": "601888", "中国神华": "601088",
# 体育
"金陵体育": "300651",
# 消费电子
"立讯精密": "002475",
# 农牧
"新希望": "000876",
}
def resolve_stock_name(name: str) -> tuple[str, str] | None:
"""通过股票名称查找股票代码,返回 (代码, 名称)
支持全称匹配和模糊匹配(输入"茅台"也能找到"贵州茅台""""
name = name.strip()
if not name:
return None
# 1. 精确匹配
if name in STOCK_NAME_MAP:
return STOCK_NAME_MAP[name], name
# 2. 模糊匹配:遍历映射,看输入是否是某个股票名称的子串或反之
candidates = []
for stock_name, code in STOCK_NAME_MAP.items():
# 输入是股票名的子串(如"茅台"→"贵州茅台"
if name in stock_name:
candidates.append((stock_name, code))
# 股票名是输入的子串
elif stock_name in name:
candidates.append((stock_name, code))
if len(candidates) == 1:
return candidates[0][1], candidates[0][0]
elif len(candidates) > 1:
# 多个匹配时选最短的(最精确)
candidates.sort(key=lambda x: len(x[0]))
return candidates[0][1], candidates[0][0]
return None
def normalize_stock_code(code: str) -> str:
"""验证并标准化股票代码仅保留6位纯数字"""
code = code.strip()
# 去掉可能的前缀sh/SH/sz/SZ/bj/BJ
code = re.sub(r"^(sh|sz|bj)", "", code, flags=re.IGNORECASE)
if re.match(r"^\d{6}$", code):
return code
# 如果传入了完整代码但包含其他字符,尝试提取数字
digits = re.findall(r"\d", code)
if len(digits) >= 6:
return "".join(digits[:6])
return ""
def get_licence() -> str:
"""获取 licence优先从环境变量读取"""
return os.environ.get("STOCK_LICENCE", "").strip() or DEFAULT_LICENCE
def fetch_stock_quote(stock_code: str) -> dict | None:
"""获取股票行情数据并解析为字典"""
licence = get_licence()
url = QUOTE_API_URL.format(stock_code, licence)
try:
req = urllib.request.Request(url, headers={"User-Agent": "Mozilla/5.0"})
with urllib.request.urlopen(req, timeout=10) as response:
payload = json.load(response)
except (urllib.error.URLError, TimeoutError, json.JSONDecodeError, OSError):
return None
# 处理 JSON 数组返回格式 [{}, ...]
if isinstance(payload, list) and len(payload) > 0:
payload = payload[0]
if not isinstance(payload, dict):
return None
# 检查接口返回是否包含有效数据
if "p" not in payload:
return None
return payload
def format_stock_message(data: dict, stock_code: str = "", stock_name: str = "") -> str:
"""格式化股票信息为可读文本"""
p = data.get("p", 0) or 0
pc = data.get("pc", 0) or 0
ud = data.get("ud", 0) or 0
up_icon = "📈" if float(pc) >= 0 else "📉"
if stock_name:
title = f"{stock_name}({stock_code})】"
elif stock_code:
title = f"{stock_code}"
else:
title = "【查询结果】"
lines = [
title,
f"{'=' * 20}",
f"{up_icon} 当前价:{_fmt(p)}",
f"📊 涨跌幅:{_fmt(pc)}%",
f"📊 涨跌额:{_fmt(ud)}",
f"⬆️ 最高:{_fmt(data.get('h'))}",
f"⬇️ 最低:{_fmt(data.get('l'))}",
f"🔓 今开:{_fmt(data.get('o'))}",
f"🔒 昨收:{_fmt(data.get('yc'))}",
f"📈 成交量:{_fmt(data.get('v'))}",
f"💰 成交额:{_fmt(data.get('cje'))}",
f"🔄 换手率:{_fmt(data.get('hs'))}%",
f"📐 振幅:{_fmt(data.get('zf'))}%",
f"📊 市盈率:{_fmt(data.get('pe'))}",
f"📊 市净率:{_fmt(data.get('sjl'))}",
f"📊 量比:{_fmt(data.get('lb'))}",
f"⚡ 涨速:{_fmt(data.get('zs'))}%",
f"⏱ 五分钟涨跌:{_fmt(data.get('fm'))}%",
f"📅 60日涨跌{_fmt(data.get('zdf60'))}%",
f"📅 年初至今:{_fmt(data.get('zdfnc'))}%",
f"📊 总市值:{_fmt(data.get('sz'))}",
f"📊 流通市值:{_fmt(data.get('lt'))}",
f"{'=' * 20}",
f"更新时间:{data.get('t', '未知')}",
]
return "\n".join(lines)
def _fmt(val) -> str:
"""统一格式化数值金融数据保留2位小数"""
if val is None:
return "0"
if isinstance(val, (int, float)):
return f"{val:.2f}"
return str(val)
def send_text(text: str) -> bool:
"""发送文本消息到微信机器人"""
robot_port = os.environ.get("ROBOT_WECHAT_CLIENT_PORT", "").strip()
to_wxid = os.environ.get("ROBOT_FROM_WX_ID", "").strip()
if not robot_port or not to_wxid:
return False
api_url = (
f"http://127.0.0.1:{robot_port}/api/v1/robot/message/send/text"
)
body = json.dumps(
{
"to_wxid": to_wxid,
"content": text,
}
).encode("utf-8")
request = urllib.request.Request(
api_url,
data=body,
headers={"Content-Type": "application/json"},
method="POST",
)
try:
with urllib.request.urlopen(request, timeout=10) as response:
if 200 <= response.status < 300:
return True
payload = json.load(response)
except (urllib.error.URLError, TimeoutError, json.JSONDecodeError):
return False
code = payload.get("code")
return code == 200 or code == 0
def main() -> int:
args = sys.argv[1:]
if not args:
sys.stdout.write("请提供股票代码或名称例如python3 stock.py 600519 或 python3 stock.py 贵州茅台\n")
return 0
raw = args[0]
stock_code = ""
stock_name = ""
# 第一步:尝试名称映射
result = resolve_stock_name(raw)
if result:
stock_code, stock_name = result
else:
# 第二步:尝试数字代码解析
stock_code = normalize_stock_code(raw)
if not stock_code and re.search(r"[\u4e00-\u9fff]", raw):
# 输入包含中文 → 明显是想用名称查,但映射表里没有
sys.stdout.write(f"未找到「{raw}」对应的股票,请使用股票代码查询,例如 600519\n")
sys.stdout.write("提示:如需添加新股票名称映射,请联系管理员。\n")
return 0
if not stock_code:
sys.stdout.write(FALLBACK_TEXT)
sys.stdout.write("\n")
return 0
# 防重复5秒内相同股票不重复查询
if not _acquire_lock(stock_code):
return 0
data = fetch_stock_quote(stock_code)
if not data:
sys.stdout.write(FALLBACK_TEXT)
sys.stdout.write("\n")
return 0
message = format_stock_message(data, stock_code, stock_name)
if send_text(message):
return 0
# 发送失败输出到stdout让宿主机器人捕获
sys.stdout.write(message)
sys.stdout.write("\n")
return 0
if __name__ == "__main__":
try:
raise SystemExit(main())
except SystemExit:
raise
except Exception:
traceback.print_exc(file=sys.stdout)
raise SystemExit(1)