更新 tp/beauty.py

This commit is contained in:
lj091715 2026-05-19 13:14:57 +08:00
parent e2eb13dbc3
commit 2b73653e4f

View File

@ -1,116 +1,117 @@
#!/usr/bin/env python3 #!/usr/bin/env python3
from __future__ import annotations from __future__ import annotations
import io import io
import json import json
import os import os
import sys import sys
import traceback import traceback
import urllib.error import urllib.error
import urllib.request import urllib.request
# 将 stdout 编码设置为 UTF-8确保 emoji 等字符能正常输出 # 将 stdout 编码设置为 UTF-8确保 emoji 等字符能正常输出
if hasattr(sys.stdout, "reconfigure"): if hasattr(sys.stdout, "reconfigure"):
sys.stdout.reconfigure(encoding="utf-8") sys.stdout.reconfigure(encoding="utf-8")
else: else:
sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding="utf-8") sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding="utf-8")
sys.stderr = sys.stdout sys.stderr = sys.stdout
# 三个图片接口 # 三个图片接口
API_LIST = [ API_LIST = [
{"name": "美女图片", "url": "https://api.ust1.cc/api/meinvpic?return=302"}, {"name": "美女图片", "url": "https://api.ust1.cc/api/meinvpic?return=302"},
{"name": "白色丝袜", "url": "https://api.ust1.cc/api/baisi?return=302"}, {"name": "白色丝袜", "url": "https://api.ust1.cc/api/baisi?return=302"},
{"name": "黑色丝袜", "url": "https://api.ust1.cc/api/heisi?return=302"}, {"name": "黑色丝袜", "url": "https://api.ust1.cc/api/heisi?return=302"},
] ]
DEFAULT_KEY = "14e655df72c1b429" DEFAULT_KEY = "14e655df72c1b429"
FALLBACK_TEXT = "今天的美女图片暂时没拿到,等我再找找。" FALLBACK_TEXT = "今天的美女图片暂时没拿到,等我再找找。"
def get_key() -> str: def get_key() -> str:
return os.environ.get("BEAUTY_KEY", "").strip() or DEFAULT_KEY return os.environ.get("BEAUTY_KEY", "").strip() or DEFAULT_KEY
class NoRedirect(urllib.request.HTTPRedirectHandler): class NoRedirect(urllib.request.HTTPRedirectHandler):
"""不跟随重定向,直接返回响应(用于捕获 302 Location""" """不跟随重定向,直接返回响应(用于捕获 302 Location"""
def http_error_302(self, req, fp, code, msg, headers): def http_error_302(self, req, fp, code, msg, headers):
return fp return fp
def fetch_single_image(api: dict) -> str | None: def fetch_single_image(api: dict) -> str | None:
"""从单个接口获取图片地址通过302重定向的Location头""" """从单个接口获取图片地址通过302重定向的Location头"""
key = get_key() key = get_key()
opener = urllib.request.build_opener(NoRedirect) opener = urllib.request.build_opener(NoRedirect)
req = urllib.request.Request(api["url"], headers={"key": key}) req = urllib.request.Request(api["url"], headers={"key": key})
try: try:
resp = opener.open(req, timeout=10) resp = opener.open(req, timeout=10)
location = resp.headers.get("Location") location = resp.headers.get("Location")
if location: if location:
return location.strip() return location.strip()
except (urllib.error.URLError, TimeoutError, OSError): except (urllib.error.URLError, TimeoutError, OSError):
pass pass
return None return None
def fetch_all_images() -> list[str]: def fetch_all_images() -> list[str]:
"""从全部3个接口获取图片返回图片地址列表""" """从全部3个接口获取图片返回图片地址列表"""
urls = [] urls = []
for api in API_LIST: for api in API_LIST:
url = fetch_single_image(api) url = fetch_single_image(api)
if url: if url:
urls.append(url) urls.append(url)
return urls return urls
def send_images(image_urls: list[str]) -> bool: def send_images(image_urls: list[str]) -> bool:
robot_port = os.environ.get("ROBOT_WECHAT_CLIENT_PORT", "").strip() robot_port = os.environ.get("ROBOT_WECHAT_CLIENT_PORT", "").strip()
to_wxid = os.environ.get("ROBOT_FROM_WX_ID", "").strip() to_wxid = os.environ.get("ROBOT_FROM_WX_ID", "").strip()
if not robot_port or not to_wxid: if not robot_port or not to_wxid:
return False return False
api_url = ( api_url = (
f"http://127.0.0.1:{robot_port}/api/v1/robot/message/send/image/url" f"http://127.0.0.1:{robot_port}/api/v1/robot/message/send/image/url"
) )
body = json.dumps( body = json.dumps(
{ {
"to_wxid": to_wxid, "to_wxid": to_wxid,
"image_urls": image_urls, "image_urls": image_urls,
} }
).encode("utf-8") ).encode("utf-8")
request = urllib.request.Request( request = urllib.request.Request(
api_url, api_url,
data=body, data=body,
headers={"Content-Type": "application/json"}, headers={"Content-Type": "application/json"},
method="POST", method="POST",
) )
try: try:
with urllib.request.urlopen(request, timeout=10) as response: with urllib.request.urlopen(request, timeout=10) as response:
if 200 <= response.status < 300: if 200 <= response.status < 300:
return True return True
payload = json.load(response) payload = json.load(response)
except (urllib.error.URLError, TimeoutError, json.JSONDecodeError): except (urllib.error.URLError, TimeoutError, json.JSONDecodeError):
return False return False
code = payload.get("code") code = payload.get("code")
return code == 200 or code == 0 return code == 200 or code == 0
def main() -> int: def main() -> int:
urls = fetch_all_images() urls = fetch_all_images()
if len(urls) >= 2 and send_images(urls): if len(urls) >= 2 and send_images(urls):
return 0 return 0
sys.stdout.write(FALLBACK_TEXT) sys.stdout.write(FALLBACK_TEXT)
sys.stdout.write("\n") sys.stdout.write("\n")
return 0 return 0
if __name__ == "__main__": if __name__ == "__main__":
try: try:
raise SystemExit(main()) raise SystemExit(main())
except SystemExit: except SystemExit:
raise raise
except Exception: except Exception:
traceback.print_exc(file=sys.stdout) traceback.print_exc(file=sys.stdout)
raise SystemExit(1) raise SystemExit(1)