diff --git a/tp/scripts/beauty.py b/tp/scripts/beauty.py new file mode 100644 index 0000000..63b94fc --- /dev/null +++ b/tp/scripts/beauty.py @@ -0,0 +1,116 @@ +#!/usr/bin/env python3 +from __future__ import annotations +import io +import json +import os +import sys +import traceback +import urllib.error +import urllib.request + +# 将 stdout 编码设置为 UTF-8,确保 emoji 等字符能正常输出 +if hasattr(sys.stdout, "reconfigure"): + sys.stdout.reconfigure(encoding="utf-8") +else: + sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding="utf-8") +sys.stderr = sys.stdout + +# 三个图片接口 +API_LIST = [ + {"name": "美女图片", "url": "https://api.ust1.cc/api/meinvpic?return=302"}, + {"name": "白色丝袜", "url": "https://api.ust1.cc/api/baisi?return=302"}, + {"name": "黑色丝袜", "url": "https://api.ust1.cc/api/heisi?return=302"}, +] +DEFAULT_KEY = "14e655df72c1b429" +FALLBACK_TEXT = "今天的美女图片暂时没拿到,等我再找找。" + + +def get_key() -> str: + return os.environ.get("BEAUTY_KEY", "").strip() or DEFAULT_KEY + + +class NoRedirect(urllib.request.HTTPRedirectHandler): + """不跟随重定向,直接返回响应(用于捕获 302 Location)""" + def http_error_302(self, req, fp, code, msg, headers): + return fp + + +def fetch_single_image(api: dict) -> str | None: + """从单个接口获取图片地址(通过302重定向的Location头)""" + key = get_key() + opener = urllib.request.build_opener(NoRedirect) + req = urllib.request.Request(api["url"], headers={"key": key}) + + try: + resp = opener.open(req, timeout=10) + location = resp.headers.get("Location") + if location: + return location.strip() + except (urllib.error.URLError, TimeoutError, OSError): + pass + + return None + + +def fetch_all_images() -> list[str]: + """从全部3个接口获取图片,返回图片地址列表""" + urls = [] + for api in API_LIST: + url = fetch_single_image(api) + if url: + urls.append(url) + return urls + + +def send_images(image_urls: list[str]) -> bool: + robot_port = os.environ.get("ROBOT_WECHAT_CLIENT_PORT", "").strip() + to_wxid = os.environ.get("ROBOT_FROM_WX_ID", "").strip() + if not robot_port or not to_wxid: + return False + + api_url = ( + f"http://127.0.0.1:{robot_port}/api/v1/robot/message/send/image/url" + ) + body = json.dumps( + { + "to_wxid": to_wxid, + "image_urls": image_urls, + } + ).encode("utf-8") + + request = urllib.request.Request( + api_url, + data=body, + headers={"Content-Type": "application/json"}, + method="POST", + ) + + try: + with urllib.request.urlopen(request, timeout=10) as response: + if 200 <= response.status < 300: + return True + payload = json.load(response) + except (urllib.error.URLError, TimeoutError, json.JSONDecodeError): + return False + + code = payload.get("code") + return code == 200 or code == 0 + + +def main() -> int: + urls = fetch_all_images() + if len(urls) >= 2 and send_images(urls): + return 0 + sys.stdout.write(FALLBACK_TEXT) + sys.stdout.write("\n") + return 0 + + +if __name__ == "__main__": + try: + raise SystemExit(main()) + except SystemExit: + raise + except Exception: + traceback.print_exc(file=sys.stdout) + raise SystemExit(1)