diff --git a/tp/scripts/beauty.py b/tp/scripts/beauty.py index 63b94fc..f8c7eb5 100644 --- a/tp/scripts/beauty.py +++ b/tp/scripts/beauty.py @@ -1,116 +1,149 @@ -#!/usr/bin/env python3 -from __future__ import annotations -import io -import json -import os -import sys -import traceback -import urllib.error -import urllib.request - -# 将 stdout 编码设置为 UTF-8,确保 emoji 等字符能正常输出 -if hasattr(sys.stdout, "reconfigure"): - sys.stdout.reconfigure(encoding="utf-8") -else: - sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding="utf-8") -sys.stderr = sys.stdout - -# 三个图片接口 -API_LIST = [ - {"name": "美女图片", "url": "https://api.ust1.cc/api/meinvpic?return=302"}, - {"name": "白色丝袜", "url": "https://api.ust1.cc/api/baisi?return=302"}, - {"name": "黑色丝袜", "url": "https://api.ust1.cc/api/heisi?return=302"}, -] -DEFAULT_KEY = "14e655df72c1b429" -FALLBACK_TEXT = "今天的美女图片暂时没拿到,等我再找找。" - - -def get_key() -> str: - return os.environ.get("BEAUTY_KEY", "").strip() or DEFAULT_KEY - - -class NoRedirect(urllib.request.HTTPRedirectHandler): - """不跟随重定向,直接返回响应(用于捕获 302 Location)""" - def http_error_302(self, req, fp, code, msg, headers): - return fp - - -def fetch_single_image(api: dict) -> str | None: - """从单个接口获取图片地址(通过302重定向的Location头)""" - key = get_key() - opener = urllib.request.build_opener(NoRedirect) - req = urllib.request.Request(api["url"], headers={"key": key}) - - try: - resp = opener.open(req, timeout=10) - location = resp.headers.get("Location") - if location: - return location.strip() - except (urllib.error.URLError, TimeoutError, OSError): - pass - - return None - - -def fetch_all_images() -> list[str]: - """从全部3个接口获取图片,返回图片地址列表""" - urls = [] - for api in API_LIST: - url = fetch_single_image(api) - if url: - urls.append(url) - return urls - - -def send_images(image_urls: list[str]) -> bool: - robot_port = os.environ.get("ROBOT_WECHAT_CLIENT_PORT", "").strip() - to_wxid = os.environ.get("ROBOT_FROM_WX_ID", "").strip() - if not robot_port or not to_wxid: - return False - - api_url = ( - f"http://127.0.0.1:{robot_port}/api/v1/robot/message/send/image/url" - ) - body = json.dumps( - { - "to_wxid": to_wxid, - "image_urls": image_urls, - } - ).encode("utf-8") - - request = urllib.request.Request( - api_url, - data=body, - headers={"Content-Type": "application/json"}, - method="POST", - ) - - try: - with urllib.request.urlopen(request, timeout=10) as response: - if 200 <= response.status < 300: - return True - payload = json.load(response) - except (urllib.error.URLError, TimeoutError, json.JSONDecodeError): - return False - - code = payload.get("code") - return code == 200 or code == 0 - - -def main() -> int: - urls = fetch_all_images() - if len(urls) >= 2 and send_images(urls): - return 0 - sys.stdout.write(FALLBACK_TEXT) - sys.stdout.write("\n") - return 0 - - -if __name__ == "__main__": - try: - raise SystemExit(main()) - except SystemExit: - raise - except Exception: - traceback.print_exc(file=sys.stdout) - raise SystemExit(1) +#!/usr/bin/env python3 +from __future__ import annotations +import io +import json +import os +import sys +import traceback +import urllib.error +import urllib.request + +# 将 stdout 编码设置为 UTF-8 +if hasattr(sys.stdout, "reconfigure"): + sys.stdout.reconfigure(encoding="utf-8") +else: + sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding="utf-8") +sys.stderr = sys.stdout + +# 三个图片接口 +API_LIST = [ + {"name": "meinvpic", "url": "https://api.ust1.cc/api/meinvpic?return=302"}, + {"name": "baisi", "url": "https://api.ust1.cc/api/baisi?return=302"}, + {"name": "heisi", "url": "https://api.ust1.cc/api/heisi?return=302"}, +] +DEFAULT_KEY = "14e655df72c1b429" +FALLBACK_TEXT = "今天的美女图片暂时没拿到,等我再找找。" + + +def get_key() -> str: + return os.environ.get("BEAUTY_KEY", "").strip() or DEFAULT_KEY + + +class NoRedirectHandler(urllib.request.HTTPRedirectHandler): + """不跟随重定向,让 open() 直接返回 302 响应""" + def http_error_302(self, req, fp, code, msg, headers): + return fp + http_error_301 = http_error_302 + http_error_303 = http_error_302 + http_error_307 = http_error_302 + + +def fetch_single_image(api: dict) -> str | None: + """从单个接口获取图片地址(通过302重定向的Location头)""" + key = get_key() + + # 方式1:通过 NoRedirectHandler 捕获 302 的 Location + try: + opener = urllib.request.build_opener(NoRedirectHandler()) + req = urllib.request.Request(api["url"], headers={"key": key}) + resp = opener.open(req, timeout=10) + location = resp.headers.get("Location") + if location: + return location.strip() + # 方式2:如果没Location头(可能被跟随了重定向),尝试读响应body中的URL + body = resp.read().decode("utf-8", errors="replace") + sys.stdout.write(f"[debug] {api['name']}: 无Location, body={body[:100]}\n") + except urllib.error.HTTPError as e: + # 某些Python版本 NoRedirectHandler 仍会抛异常 + if e.code in (301, 302, 303, 307): + location = e.headers.get("Location") + if location: + return location.strip() + sys.stdout.write(f"[debug] {api['name']}: HTTP {e.code}\n") + except urllib.error.URLError as e: + sys.stdout.write(f"[debug] {api['name']}: 网络错误 {e.reason}\n") + except TimeoutError: + sys.stdout.write(f"[debug] {api['name']}: 请求超时\n") + except OSError as e: + sys.stdout.write(f"[debug] {api['name']}: 系统错误 {e}\n") + + return None + + +def fetch_all_images() -> list[str]: + """从全部3个接口获取图片,返回图片地址列表""" + urls = [] + for api in API_LIST: + url = fetch_single_image(api) + if url: + urls.append(url) + return urls + + +def send_images(image_urls: list[str]) -> bool: + robot_port = os.environ.get("ROBOT_WECHAT_CLIENT_PORT", "").strip() + to_wxid = os.environ.get("ROBOT_FROM_WX_ID", "").strip() + if not robot_port or not to_wxid: + sys.stdout.write(f"[debug] 环境变量缺失: PORT={robot_port!r} WXID={to_wxid!r}\n") + return False + + api_url = ( + f"http://127.0.0.1:{robot_port}/api/v1/robot/message/send/image/url" + ) + body = json.dumps( + { + "to_wxid": to_wxid, + "image_urls": image_urls, + } + ).encode("utf-8") + + request = urllib.request.Request( + api_url, + data=body, + headers={"Content-Type": "application/json"}, + method="POST", + ) + + try: + with urllib.request.urlopen(request, timeout=10) as response: + if 200 <= response.status < 300: + return True + payload = json.load(response) + except urllib.error.URLError as e: + sys.stdout.write(f"[debug] 发图失败: 无法连接机器人 {e.reason}\n") + return False + except TimeoutError: + sys.stdout.write("[debug] 发图失败: 超时\n") + return False + except json.JSONDecodeError: + sys.stdout.write("[debug] 发图失败: 返回不是JSON\n") + return False + except OSError: + return False + + code = payload.get("code") + ok = code == 200 or code == 0 + if not ok: + sys.stdout.write(f"[debug] 发图失败: code={code}\n") + return ok + + +def main() -> int: + urls = fetch_all_images() + if urls: + sys.stdout.write(f"[debug] 成功获取 {len(urls)} 张图片\n") + if len(urls) >= 2 and send_images(urls): + return 0 + sys.stdout.write(FALLBACK_TEXT) + sys.stdout.write("\n") + return 0 + + +if __name__ == "__main__": + try: + raise SystemExit(main()) + except SystemExit: + raise + except Exception: + traceback.print_exc(file=sys.stdout) + raise SystemExit(1)