182 lines
5.5 KiB
Python
182 lines
5.5 KiB
Python
#!/usr/bin/env python3
|
||
from __future__ import annotations
|
||
import io
|
||
import json
|
||
import os
|
||
import re
|
||
import sys
|
||
import traceback
|
||
import urllib.error
|
||
import urllib.request
|
||
|
||
# 将 stdout 编码设置为 UTF-8,确保 emoji 等字符能正常输出
|
||
if hasattr(sys.stdout, "reconfigure"):
|
||
sys.stdout.reconfigure(encoding="utf-8")
|
||
else:
|
||
sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding="utf-8")
|
||
sys.stderr = sys.stdout
|
||
|
||
QUOTE_API_URL = "https://qt.gtimg.cn/q={}"
|
||
FALLBACK_TEXT = "股票查询失败,请稍后再试。"
|
||
|
||
|
||
def normalize_stock_code(code: str) -> str:
|
||
"""自动补全股票代码市场前缀"""
|
||
code = code.strip().upper()
|
||
# 如果已经带前缀,直接返回
|
||
if code.startswith("SH") or code.startswith("SZ"):
|
||
return code
|
||
# 如果已经是带 sh/sz 前缀
|
||
if re.match(r"^(sh|sz)\d{6}$", code, re.IGNORECASE):
|
||
return code.upper()
|
||
# 纯数字处理
|
||
if re.match(r"^\d{6}$", code):
|
||
# 沪市:6开头
|
||
if code.startswith("6"):
|
||
return f"SH{code}"
|
||
# 深市:0或3开头
|
||
elif code.startswith(("0", "3")):
|
||
return f"SZ{code}"
|
||
# 北交所:8开头
|
||
elif code.startswith(("8", "4")):
|
||
return f"BJ{code}"
|
||
# 无法识别
|
||
return code
|
||
|
||
|
||
def parse_market_prefix(code: str) -> str:
|
||
"""将 SH/SZ 转换为 qq 接口使用的 sh/sz 前缀"""
|
||
code = code.upper()
|
||
if code.startswith("SH"):
|
||
return f"sh{code[2:]}"
|
||
elif code.startswith("SZ"):
|
||
return f"sz{code[2:]}"
|
||
elif code.startswith("BJ"):
|
||
return f"bj{code[2:]}"
|
||
return code
|
||
|
||
|
||
def fetch_stock_quote(stock_code: str) -> dict | None:
|
||
"""获取股票行情数据并解析为字典"""
|
||
try:
|
||
url = QUOTE_API_URL.format(stock_code)
|
||
with urllib.request.urlopen(url, timeout=10) as response:
|
||
raw = response.read().decode("gbk")
|
||
except (urllib.error.URLError, TimeoutError, OSError):
|
||
return None
|
||
|
||
# 解析返回数据,格式如:v_sh600519="...";
|
||
match = re.search(r'="([^"]+)"', raw)
|
||
if not match:
|
||
return None
|
||
|
||
fields = match.group(1).split("~")
|
||
if len(fields) < 45:
|
||
return None
|
||
|
||
return {
|
||
"name": fields[1], # 股票名称
|
||
"code": fields[2], # 股票代码
|
||
"price": fields[3], # 当前价格
|
||
"yest_close": fields[4], # 昨收价
|
||
"open": fields[5], # 今开价
|
||
"volume": fields[6], # 成交量(手)
|
||
"amount": fields[37], # 成交额(万)
|
||
"high": fields[33], # 最高价
|
||
"low": fields[34], # 最低价
|
||
"change": fields[31], # 涨跌额
|
||
"change_pct": fields[32], # 涨跌幅(%)
|
||
}
|
||
|
||
|
||
def format_stock_message(data: dict) -> str:
|
||
"""格式化股票信息为可读文本"""
|
||
up_icon = "📈" if float(data.get("change", 0)) >= 0 else "📉"
|
||
lines = [
|
||
f"【{data['name']}({data['code']})】",
|
||
f"{'=' * 20}",
|
||
f"{up_icon} 当前价:{data['price']}",
|
||
f"📊 涨跌幅:{data['change_pct']}%",
|
||
f"📊 涨跌额:{data['change']}",
|
||
f"⬆️ 最高:{data['high']}",
|
||
f"⬇️ 最低:{data['low']}",
|
||
f"🔓 今开:{data['open']}",
|
||
f"🔒 昨收:{data['yest_close']}",
|
||
f"📈 成交量:{data['volume']}",
|
||
f"💰 成交额:{data['amount']}万",
|
||
]
|
||
return "\n".join(lines)
|
||
|
||
|
||
def send_text(text: str) -> bool:
|
||
"""发送文本消息到微信机器人"""
|
||
robot_port = os.environ.get("ROBOT_WECHAT_CLIENT_PORT", "").strip()
|
||
to_wxid = os.environ.get("ROBOT_FROM_WX_ID", "").strip()
|
||
if not robot_port or not to_wxid:
|
||
return False
|
||
|
||
api_url = (
|
||
f"http://127.0.0.1:{robot_port}/api/v1/robot/message/send/text"
|
||
)
|
||
body = json.dumps(
|
||
{
|
||
"to_wxid": to_wxid,
|
||
"content": text,
|
||
}
|
||
).encode("utf-8")
|
||
|
||
request = urllib.request.Request(
|
||
api_url,
|
||
data=body,
|
||
headers={"Content-Type": "application/json"},
|
||
method="POST",
|
||
)
|
||
|
||
try:
|
||
with urllib.request.urlopen(request, timeout=10) as response:
|
||
if 200 <= response.status < 300:
|
||
return True
|
||
payload = json.load(response)
|
||
except (urllib.error.URLError, TimeoutError, json.JSONDecodeError):
|
||
return False
|
||
|
||
code = payload.get("code")
|
||
return code == 200 or code == 0
|
||
|
||
|
||
def main() -> int:
|
||
args = sys.argv[1:]
|
||
if not args:
|
||
sys.stdout.write("请提供股票代码,例如:python3 stock.py 600519\n")
|
||
return 0
|
||
|
||
raw_code = args[0]
|
||
normalized = normalize_stock_code(raw_code)
|
||
quoted_code = parse_market_prefix(normalized)
|
||
|
||
# 在QQ接口中,多个股票用逗号分隔
|
||
data = fetch_stock_quote(quoted_code)
|
||
if not data:
|
||
sys.stdout.write(FALLBACK_TEXT)
|
||
sys.stdout.write("\n")
|
||
return 0
|
||
|
||
message = format_stock_message(data)
|
||
if send_text(message):
|
||
return 0
|
||
|
||
# 发送失败,输出到stdout让宿主机器人捕获
|
||
sys.stdout.write(message)
|
||
sys.stdout.write("\n")
|
||
return 0
|
||
|
||
|
||
if __name__ == "__main__":
|
||
try:
|
||
raise SystemExit(main())
|
||
except SystemExit:
|
||
raise
|
||
except Exception:
|
||
traceback.print_exc(file=sys.stdout)
|
||
raise SystemExit(1)
|