st/tp/beauty.py
2026-05-19 13:14:57 +08:00

118 lines
3.2 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
from __future__ import annotations
import io
import json
import os
import sys
import traceback
import urllib.error
import urllib.request
# Force stdout to UTF-8 so that emoji and other non-ASCII characters can be
# written without encoding errors.
if not hasattr(sys.stdout, "reconfigure"):
    # No reconfigure() on this stream: wrap its underlying byte buffer instead.
    sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding="utf-8")
else:
    sys.stdout.reconfigure(encoding="utf-8")
# Send stderr to the same stream object as stdout.
sys.stderr = sys.stdout
# Image endpoints, one entry per picture category. Each entry carries a
# display name and the URL that is requested.
API_LIST = [
    {"name": label, "url": endpoint}
    for label, endpoint in (
        ("美女图片", "https://api.ust1.cc/api/meinvpic?return=302"),
        ("白色丝袜", "https://api.ust1.cc/api/baisi?return=302"),
        ("黑色丝袜", "https://api.ust1.cc/api/heisi?return=302"),
    )
]

# Key used when the BEAUTY_KEY environment variable is unset or blank.
# NOTE(review): this looks like an API credential committed to source — confirm
# whether it should live in the environment only.
DEFAULT_KEY = "14e655df72c1b429"

# Text written to stdout when the images could not be fetched or delivered.
FALLBACK_TEXT = "今天的美女图片暂时没拿到,等我再找找。"
def get_key() -> str:
    """Return the API key to send with image requests.

    The ``BEAUTY_KEY`` environment variable wins when it holds a non-blank
    value (surrounding whitespace is stripped); otherwise ``DEFAULT_KEY`` is
    returned.
    """
    override = os.environ.get("BEAUTY_KEY", "").strip()
    if override:
        return override
    return DEFAULT_KEY
class NoRedirect(urllib.request.HTTPRedirectHandler):
    """Redirect handler that does not follow redirects.

    The redirect response itself is returned to the caller so that its
    ``Location`` header can be read instead of fetching the redirect target.
    """

    def http_error_302(self, req, fp, code, msg, headers):
        """Return the redirect response ``fp`` unchanged instead of following it."""
        return fp

    # The base class handles the other redirect status codes through aliases
    # of *its own* http_error_302, so overriding http_error_302 alone would
    # leave them following the redirect. Rebind them to the override above so
    # every redirect status is handed back the same way.
    http_error_301 = http_error_303 = http_error_307 = http_error_308 = http_error_302
def fetch_single_image(api: dict) -> str | None:
    """Resolve one endpoint to an image URL via its redirect ``Location`` header.

    The endpoint is requested through an opener that uses ``NoRedirect``, so
    the redirect response is inspected rather than followed.

    Args:
        api: Endpoint description; its ``"url"`` entry is the address requested.

    Returns:
        The ``Location`` header value with surrounding whitespace removed, or
        ``None`` when the header is missing or blank, or the request fails.
    """
    opener = urllib.request.build_opener(NoRedirect)
    req = urllib.request.Request(api["url"], headers={"key": get_key()})
    try:
        # Only the headers are needed; the context manager makes sure the
        # response (and its connection) is closed in every case.
        with opener.open(req, timeout=10) as resp:
            location = resp.headers.get("Location")
    except OSError:
        # Deliberate best effort: a failing endpoint just yields no image.
        # URLError/HTTPError and TimeoutError are all OSError subclasses.
        return None
    if not location:
        return None
    # A whitespace-only header carries no usable address.
    return location.strip() or None
def fetch_all_images() -> list[str]:
    """Query every endpoint in ``API_LIST`` and collect the image URLs found.

    Endpoints are queried one after another in list order; endpoints that
    yield no URL are left out of the result.
    """
    # Lazy generator: each endpoint is still fetched sequentially, in order.
    results = (fetch_single_image(endpoint) for endpoint in API_LIST)
    return [found for found in results if found]
def send_images(image_urls: list[str]) -> bool:
    """Post image URLs to the local robot client so it sends them as images.

    The client port is read from ``ROBOT_WECHAT_CLIENT_PORT`` and the
    recipient id from ``ROBOT_FROM_WX_ID``.

    Args:
        image_urls: Image addresses forwarded in a single request.

    Returns:
        ``True`` when the client answers with a 2xx status, or otherwise with
        a JSON object whose ``code`` is 200 or 0. ``False`` when the
        configuration is missing or invalid, or when the request fails.
    """
    robot_port = os.environ.get("ROBOT_WECHAT_CLIENT_PORT", "").strip()
    to_wxid = os.environ.get("ROBOT_FROM_WX_ID", "").strip()
    if not robot_port or not to_wxid:
        return False

    # Validate the port before building the URL: a non-numeric or
    # out-of-range value would otherwise escape as an exception that is not
    # an OSError instead of producing a clean False.
    try:
        port = int(robot_port)
    except ValueError:
        return False
    if not 0 < port < 65536:
        return False

    api_url = f"http://127.0.0.1:{port}/api/v1/robot/message/send/image/url"
    body = json.dumps(
        {
            "to_wxid": to_wxid,
            "image_urls": image_urls,
        }
    ).encode("utf-8")
    request = urllib.request.Request(
        api_url,
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )
    try:
        with urllib.request.urlopen(request, timeout=10) as response:
            if 200 <= response.status < 300:
                return True
            # NOTE(review): with the default opener a non-2xx status is
            # raised as HTTPError, so this fallback is normally not reached.
            payload = json.load(response)
    except (OSError, ValueError):
        # OSError covers URLError/HTTPError, timeouts and connection resets;
        # ValueError covers JSONDecodeError and undecodable response bytes.
        return False
    if not isinstance(payload, dict):
        return False
    return payload.get("code") in (200, 0)
def main() -> int:
    """Fetch the images and deliver them, or print the fallback text.

    Delivery is attempted only when at least two image URLs were obtained.
    If it is skipped or fails, ``FALLBACK_TEXT`` followed by a newline is
    written to stdout.

    Returns:
        Always ``0``.
    """
    image_urls = fetch_all_images()
    delivered = len(image_urls) >= 2 and send_images(image_urls)
    if not delivered:
        print(FALLBACK_TEXT, file=sys.stdout)
    return 0
if __name__ == "__main__":
    # Script entry point: exit with main()'s return value. Any ordinary
    # exception is reported as a traceback on stdout and becomes exit status 1.
    # SystemExit is not an Exception subclass, so it passes through untouched.
    try:
        exit_status = main()
    except Exception:
        traceback.print_exc(file=sys.stdout)
        raise SystemExit(1)
    raise SystemExit(exit_status)