删除 stock.py

This commit is contained in:
lj091715 2026-05-19 09:42:44 +08:00
parent 91e71dcfe2
commit b68542ae34

181
stock.py
View File

@ -1,181 +0,0 @@
#!/usr/bin/env python3
from __future__ import annotations
import io
import json
import os
import re
import sys
import traceback
import urllib.error
import urllib.request
# 将 stdout 编码设置为 UTF-8确保 emoji 等字符能正常输出
if hasattr(sys.stdout, "reconfigure"):
sys.stdout.reconfigure(encoding="utf-8")
else:
sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding="utf-8")
sys.stderr = sys.stdout
QUOTE_API_URL = "https://qt.gtimg.cn/q={}"
FALLBACK_TEXT = "股票查询失败,请稍后再试。"
def normalize_stock_code(code: str) -> str:
"""自动补全股票代码市场前缀"""
code = code.strip().upper()
# 如果已经带前缀,直接返回
if code.startswith("SH") or code.startswith("SZ"):
return code
# 如果已经是带 sh/sz 前缀
if re.match(r"^(sh|sz)\d{6}$", code, re.IGNORECASE):
return code.upper()
# 纯数字处理
if re.match(r"^\d{6}$", code):
# 沪市6开头
if code.startswith("6"):
return f"SH{code}"
# 深市0或3开头
elif code.startswith(("0", "3")):
return f"SZ{code}"
# 北交所8开头
elif code.startswith(("8", "4")):
return f"BJ{code}"
# 无法识别
return code
def parse_market_prefix(code: str) -> str:
"""将 SH/SZ 转换为 qq 接口使用的 sh/sz 前缀"""
code = code.upper()
if code.startswith("SH"):
return f"sh{code[2:]}"
elif code.startswith("SZ"):
return f"sz{code[2:]}"
elif code.startswith("BJ"):
return f"bj{code[2:]}"
return code
def fetch_stock_quote(stock_code: str) -> dict | None:
"""获取股票行情数据并解析为字典"""
try:
url = QUOTE_API_URL.format(stock_code)
with urllib.request.urlopen(url, timeout=10) as response:
raw = response.read().decode("gbk")
except (urllib.error.URLError, TimeoutError, OSError):
return None
# 解析返回数据格式如v_sh600519="...";
match = re.search(r'="([^"]+)"', raw)
if not match:
return None
fields = match.group(1).split("~")
if len(fields) < 45:
return None
return {
"name": fields[1], # 股票名称
"code": fields[2], # 股票代码
"price": fields[3], # 当前价格
"yest_close": fields[4], # 昨收价
"open": fields[5], # 今开价
"volume": fields[6], # 成交量(手)
"amount": fields[37], # 成交额(万)
"high": fields[33], # 最高价
"low": fields[34], # 最低价
"change": fields[31], # 涨跌额
"change_pct": fields[32], # 涨跌幅(%)
}
def format_stock_message(data: dict) -> str:
"""格式化股票信息为可读文本"""
up_icon = "📈" if float(data.get("change", 0)) >= 0 else "📉"
lines = [
f"{data['name']}({data['code']})】",
f"{'=' * 20}",
f"{up_icon} 当前价:{data['price']}",
f"📊 涨跌幅:{data['change_pct']}%",
f"📊 涨跌额:{data['change']}",
f"⬆️ 最高:{data['high']}",
f"⬇️ 最低:{data['low']}",
f"🔓 今开:{data['open']}",
f"🔒 昨收:{data['yest_close']}",
f"📈 成交量:{data['volume']}",
f"💰 成交额:{data['amount']}",
]
return "\n".join(lines)
def send_text(text: str) -> bool:
"""发送文本消息到微信机器人"""
robot_port = os.environ.get("ROBOT_WECHAT_CLIENT_PORT", "").strip()
to_wxid = os.environ.get("ROBOT_FROM_WX_ID", "").strip()
if not robot_port or not to_wxid:
return False
api_url = (
f"http://127.0.0.1:{robot_port}/api/v1/robot/message/send/text"
)
body = json.dumps(
{
"to_wxid": to_wxid,
"content": text,
}
).encode("utf-8")
request = urllib.request.Request(
api_url,
data=body,
headers={"Content-Type": "application/json"},
method="POST",
)
try:
with urllib.request.urlopen(request, timeout=10) as response:
if 200 <= response.status < 300:
return True
payload = json.load(response)
except (urllib.error.URLError, TimeoutError, json.JSONDecodeError):
return False
code = payload.get("code")
return code == 200 or code == 0
def main() -> int:
args = sys.argv[1:]
if not args:
sys.stdout.write("请提供股票代码例如python3 stock.py 600519\n")
return 0
raw_code = args[0]
normalized = normalize_stock_code(raw_code)
quoted_code = parse_market_prefix(normalized)
# 在QQ接口中多个股票用逗号分隔
data = fetch_stock_quote(quoted_code)
if not data:
sys.stdout.write(FALLBACK_TEXT)
sys.stdout.write("\n")
return 0
message = format_stock_message(data)
if send_text(message):
return 0
# 发送失败输出到stdout让宿主机器人捕获
sys.stdout.write(message)
sys.stdout.write("\n")
return 0
if __name__ == "__main__":
try:
raise SystemExit(main())
except SystemExit:
raise
except Exception:
traceback.print_exc(file=sys.stdout)
raise SystemExit(1)