更新 tp/scripts/beauty.py

This commit is contained in:
lj091715 2026-05-19 13:44:01 +08:00
parent 32ae60ad9a
commit 74266e8cce

View File

@ -1,116 +1,149 @@
#!/usr/bin/env python3 #!/usr/bin/env python3
from __future__ import annotations from __future__ import annotations
import io import io
import json import json
import os import os
import sys import sys
import traceback import traceback
import urllib.error import urllib.error
import urllib.request import urllib.request
# 将 stdout 编码设置为 UTF-8确保 emoji 等字符能正常输出 # 将 stdout 编码设置为 UTF-8
if hasattr(sys.stdout, "reconfigure"): if hasattr(sys.stdout, "reconfigure"):
sys.stdout.reconfigure(encoding="utf-8") sys.stdout.reconfigure(encoding="utf-8")
else: else:
sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding="utf-8") sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding="utf-8")
sys.stderr = sys.stdout sys.stderr = sys.stdout
# 三个图片接口 # 三个图片接口
API_LIST = [ API_LIST = [
{"name": "美女图片", "url": "https://api.ust1.cc/api/meinvpic?return=302"}, {"name": "meinvpic", "url": "https://api.ust1.cc/api/meinvpic?return=302"},
{"name": "白色丝袜", "url": "https://api.ust1.cc/api/baisi?return=302"}, {"name": "baisi", "url": "https://api.ust1.cc/api/baisi?return=302"},
{"name": "黑色丝袜", "url": "https://api.ust1.cc/api/heisi?return=302"}, {"name": "heisi", "url": "https://api.ust1.cc/api/heisi?return=302"},
] ]
DEFAULT_KEY = "14e655df72c1b429" DEFAULT_KEY = "14e655df72c1b429"
FALLBACK_TEXT = "今天的美女图片暂时没拿到,等我再找找。" FALLBACK_TEXT = "今天的美女图片暂时没拿到,等我再找找。"
def get_key() -> str: def get_key() -> str:
return os.environ.get("BEAUTY_KEY", "").strip() or DEFAULT_KEY return os.environ.get("BEAUTY_KEY", "").strip() or DEFAULT_KEY
class NoRedirect(urllib.request.HTTPRedirectHandler): class NoRedirectHandler(urllib.request.HTTPRedirectHandler):
"""不跟随重定向,直接返回响应(用于捕获 302 Location""" """不跟随重定向,让 open() 直接返回 302 响应"""
def http_error_302(self, req, fp, code, msg, headers): def http_error_302(self, req, fp, code, msg, headers):
return fp return fp
http_error_301 = http_error_302
http_error_303 = http_error_302
def fetch_single_image(api: dict) -> str | None: http_error_307 = http_error_302
"""从单个接口获取图片地址通过302重定向的Location头"""
key = get_key()
opener = urllib.request.build_opener(NoRedirect) def fetch_single_image(api: dict) -> str | None:
req = urllib.request.Request(api["url"], headers={"key": key}) """从单个接口获取图片地址通过302重定向的Location头"""
key = get_key()
try:
resp = opener.open(req, timeout=10) # 方式1通过 NoRedirectHandler 捕获 302 的 Location
location = resp.headers.get("Location") try:
if location: opener = urllib.request.build_opener(NoRedirectHandler())
return location.strip() req = urllib.request.Request(api["url"], headers={"key": key})
except (urllib.error.URLError, TimeoutError, OSError): resp = opener.open(req, timeout=10)
pass location = resp.headers.get("Location")
if location:
return None return location.strip()
# 方式2如果没Location头可能被跟随了重定向尝试读响应body中的URL
body = resp.read().decode("utf-8", errors="replace")
def fetch_all_images() -> list[str]: sys.stdout.write(f"[debug] {api['name']}: 无Location, body={body[:100]}\n")
"""从全部3个接口获取图片返回图片地址列表""" except urllib.error.HTTPError as e:
urls = [] # 某些Python版本 NoRedirectHandler 仍会抛异常
for api in API_LIST: if e.code in (301, 302, 303, 307):
url = fetch_single_image(api) location = e.headers.get("Location")
if url: if location:
urls.append(url) return location.strip()
return urls sys.stdout.write(f"[debug] {api['name']}: HTTP {e.code}\n")
except urllib.error.URLError as e:
sys.stdout.write(f"[debug] {api['name']}: 网络错误 {e.reason}\n")
def send_images(image_urls: list[str]) -> bool: except TimeoutError:
robot_port = os.environ.get("ROBOT_WECHAT_CLIENT_PORT", "").strip() sys.stdout.write(f"[debug] {api['name']}: 请求超时\n")
to_wxid = os.environ.get("ROBOT_FROM_WX_ID", "").strip() except OSError as e:
if not robot_port or not to_wxid: sys.stdout.write(f"[debug] {api['name']}: 系统错误 {e}\n")
return False
return None
api_url = (
f"http://127.0.0.1:{robot_port}/api/v1/robot/message/send/image/url"
) def fetch_all_images() -> list[str]:
body = json.dumps( """从全部3个接口获取图片返回图片地址列表"""
{ urls = []
"to_wxid": to_wxid, for api in API_LIST:
"image_urls": image_urls, url = fetch_single_image(api)
} if url:
).encode("utf-8") urls.append(url)
return urls
request = urllib.request.Request(
api_url,
data=body, def send_images(image_urls: list[str]) -> bool:
headers={"Content-Type": "application/json"}, robot_port = os.environ.get("ROBOT_WECHAT_CLIENT_PORT", "").strip()
method="POST", to_wxid = os.environ.get("ROBOT_FROM_WX_ID", "").strip()
) if not robot_port or not to_wxid:
sys.stdout.write(f"[debug] 环境变量缺失: PORT={robot_port!r} WXID={to_wxid!r}\n")
try: return False
with urllib.request.urlopen(request, timeout=10) as response:
if 200 <= response.status < 300: api_url = (
return True f"http://127.0.0.1:{robot_port}/api/v1/robot/message/send/image/url"
payload = json.load(response) )
except (urllib.error.URLError, TimeoutError, json.JSONDecodeError): body = json.dumps(
return False {
"to_wxid": to_wxid,
code = payload.get("code") "image_urls": image_urls,
return code == 200 or code == 0 }
).encode("utf-8")
def main() -> int: request = urllib.request.Request(
urls = fetch_all_images() api_url,
if len(urls) >= 2 and send_images(urls): data=body,
return 0 headers={"Content-Type": "application/json"},
sys.stdout.write(FALLBACK_TEXT) method="POST",
sys.stdout.write("\n") )
return 0
try:
with urllib.request.urlopen(request, timeout=10) as response:
if __name__ == "__main__": if 200 <= response.status < 300:
try: return True
raise SystemExit(main()) payload = json.load(response)
except SystemExit: except urllib.error.URLError as e:
raise sys.stdout.write(f"[debug] 发图失败: 无法连接机器人 {e.reason}\n")
except Exception: return False
traceback.print_exc(file=sys.stdout) except TimeoutError:
raise SystemExit(1) sys.stdout.write("[debug] 发图失败: 超时\n")
return False
except json.JSONDecodeError:
sys.stdout.write("[debug] 发图失败: 返回不是JSON\n")
return False
except OSError:
return False
code = payload.get("code")
ok = code == 200 or code == 0
if not ok:
sys.stdout.write(f"[debug] 发图失败: code={code}\n")
return ok
def main() -> int:
urls = fetch_all_images()
if urls:
sys.stdout.write(f"[debug] 成功获取 {len(urls)} 张图片\n")
if len(urls) >= 2 and send_images(urls):
return 0
sys.stdout.write(FALLBACK_TEXT)
sys.stdout.write("\n")
return 0
if __name__ == "__main__":
try:
raise SystemExit(main())
except SystemExit:
raise
except Exception:
traceback.print_exc(file=sys.stdout)
raise SystemExit(1)