#!/usr/bin/env python3
|
||
from __future__ import annotations
|
||
import io
|
||
import json
|
||
import os
|
||
import sys
|
||
import traceback
|
||
import urllib.error
|
||
import urllib.request
|
||
|
||
# Set the encoding of stdout to UTF-8.
if hasattr(sys.stdout, "reconfigure"):
    sys.stdout.reconfigure(encoding="utf-8")
else:
    # No reconfigure() on this stream object: wrap its underlying binary
    # buffer in a new UTF-8 text wrapper instead.
    sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding="utf-8")
# Point stderr at the same stream object as stdout.
# NOTE(review): indentation was lost in this copy of the file; this line is
# placed at module level (not inside the ``else``) -- confirm against the
# original.
sys.stderr = sys.stdout
|
||
|
||
# The three image endpoints. Each entry is a dict with a short ``name`` (used
# in debug output) and the ``url`` to request; every URL carries
# ``return=302``.
API_LIST = [
    {"name": name, "url": url}
    for name, url in (
        ("meinvpic", "https://api.ust1.cc/api/meinvpic?return=302"),
        ("baisi", "https://api.ust1.cc/api/baisi?return=302"),
        ("heisi", "https://api.ust1.cc/api/heisi?return=302"),
    )
]

# Key used when the BEAUTY_KEY environment variable is unset or blank.
# NOTE(review): this is a credential committed in source -- consider
# supplying it only through the environment.
DEFAULT_KEY = "14e655df72c1b429"

# Text printed on stdout when no images could be delivered.
FALLBACK_TEXT = "今天的美女图片暂时没拿到,等我再找找。"
|
||
|
||
|
||
def get_key() -> str:
    """Return the API key to use for image requests.

    The ``BEAUTY_KEY`` environment variable wins when it is set to a
    non-blank value (surrounding whitespace is stripped); otherwise
    ``DEFAULT_KEY`` is returned.
    """
    env_key = os.environ.get("BEAUTY_KEY", "").strip()
    if env_key:
        return env_key
    return DEFAULT_KEY
|
||
|
||
|
||
class NoRedirectHandler(urllib.request.HTTPRedirectHandler):
    """Redirect handler that does not follow redirects.

    Instead of issuing a second request, every ``http_error_3xx`` hook hands
    the redirect response back, so ``opener.open()`` returns the 3xx response
    itself and the caller can read its ``Location`` header.
    """

    def http_error_302(self, req, fp, code, msg, headers):
        """Return the redirect response ``fp`` unchanged."""
        return fp

    http_error_301 = http_error_302
    http_error_303 = http_error_302
    http_error_307 = http_error_302
    # Python 3.11+ gives the base class an http_error_308 hook that follows
    # the redirect; override it too so 308 is treated like the other codes.
    http_error_308 = http_error_302
|
||
|
||
|
||
def fetch_single_image(api: dict) -> str | None:
    """Fetch one image address from a single endpoint.

    The endpoint is requested with the API key in a ``key`` header and with
    redirects disabled; the image address is taken from the ``Location``
    header of the redirect response.

    Args:
        api: One entry of ``API_LIST``; ``api["url"]`` is requested and
            ``api["name"]`` labels the debug output.

    Returns:
        The stripped ``Location`` value, or ``None`` when the response has no
        ``Location`` header or the request fails. Each failure writes one
        ``[debug]`` line to stdout.
    """
    request = urllib.request.Request(api["url"], headers={"key": get_key()})
    opener = urllib.request.build_opener(NoRedirectHandler())

    try:
        # The context manager closes the connection on every path, including
        # the early return below.
        with opener.open(request, timeout=10) as resp:
            location = resp.headers.get("Location")
            if location:
                return location.strip()
            # No Location header (e.g. the redirect was followed after all).
            # The body is only logged for diagnosis, never parsed for a URL.
            # 100 characters of UTF-8 need at most 400 bytes, so read just
            # that much instead of downloading a whole image.
            body = resp.read(400).decode("utf-8", errors="replace")
        sys.stdout.write(f"[debug] {api['name']}: 无Location, body={body[:100]}\n")
    except urllib.error.HTTPError as e:
        # The redirect may still surface as an HTTPError; the Location header
        # is available on the exception in that case.
        if e.code in (301, 302, 303, 307, 308):
            location = e.headers.get("Location")
            if location:
                return location.strip()
        sys.stdout.write(f"[debug] {api['name']}: HTTP {e.code}\n")
    except urllib.error.URLError as e:
        sys.stdout.write(f"[debug] {api['name']}: 网络错误 {e.reason}\n")
    except TimeoutError:
        sys.stdout.write(f"[debug] {api['name']}: 请求超时\n")
    except OSError as e:
        sys.stdout.write(f"[debug] {api['name']}: 系统错误 {e}\n")

    return None
|
||
|
||
|
||
def fetch_all_images() -> list[str]:
    """Query every endpoint in ``API_LIST`` and collect the image addresses.

    Endpoints are tried one after another in list order; endpoints that
    yield no address are left out of the result.
    """
    # filter(None, ...) drops the falsy results of failed fetches.
    return list(filter(None, map(fetch_single_image, API_LIST)))
|
||
|
||
|
||
def send_images(image_urls: list[str]) -> bool:
    """Ask the local robot client to send the given image URLs.

    The target is taken from the environment: ``ROBOT_WECHAT_CLIENT_PORT`` is
    the port of the robot's HTTP API on 127.0.0.1 and ``ROBOT_FROM_WX_ID`` is
    the recipient id. A JSON body ``{"to_wxid": ..., "image_urls": ...}`` is
    POSTed to ``/api/v1/robot/message/send/image/url``.

    Args:
        image_urls: Image addresses to forward.

    Returns:
        ``True`` when the robot answers with a 2xx status, ``False``
        otherwise. Every failure writes one ``[debug]`` line to stdout; no
        exception is raised for a bad environment or a network failure.
    """
    robot_port = os.environ.get("ROBOT_WECHAT_CLIENT_PORT", "").strip()
    to_wxid = os.environ.get("ROBOT_FROM_WX_ID", "").strip()
    if not robot_port or not to_wxid:
        sys.stdout.write(f"[debug] 环境变量缺失: PORT={robot_port!r} WXID={to_wxid!r}\n")
        return False

    # A non-numeric or out-of-range port would otherwise escape from urlopen
    # as http.client.InvalidURL / OverflowError, neither of which is an
    # OSError, and crash the script instead of falling back.
    port_ok = (
        len(robot_port) <= 5
        and robot_port.isascii()
        and robot_port.isdigit()
        and 1 <= int(robot_port) <= 65535
    )
    if not port_ok:
        sys.stdout.write(f"[debug] 环境变量无效: PORT={robot_port!r}\n")
        return False

    api_url = f"http://127.0.0.1:{robot_port}/api/v1/robot/message/send/image/url"
    body = json.dumps(
        {
            "to_wxid": to_wxid,
            "image_urls": image_urls,
        }
    ).encode("utf-8")
    request = urllib.request.Request(
        api_url,
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )

    try:
        with urllib.request.urlopen(request, timeout=30) as response:
            status = response.status
    except urllib.error.HTTPError as e:
        # HTTPError subclasses URLError: handle it first so a reachable robot
        # that answers with an error status is not reported as unreachable.
        sys.stdout.write(f"[debug] 发图失败: HTTP {e.code}\n")
        return False
    except urllib.error.URLError as e:
        sys.stdout.write(f"[debug] 发图失败: 无法连接机器人 {e.reason}\n")
        return False
    except TimeoutError:
        sys.stdout.write("[debug] 发图失败: 超时\n")
        return False
    except OSError as e:
        sys.stdout.write(f"[debug] 发图失败: 系统错误 {e}\n")
        return False

    # urlopen() only hands back 2xx responses (anything else is raised as
    # HTTPError above), so the status alone decides success.
    # NOTE(review): the response body is not inspected; if the robot reports
    # failures inside a 2xx JSON payload, that check belongs here.
    if 200 <= status < 300:
        return True
    sys.stdout.write(f"[debug] 发图失败: HTTP {status}\n")
    return False
|
||
|
||
|
||
def main() -> int:
    """Fetch images from all endpoints and forward them through the robot.

    When at least two image addresses were obtained and the robot accepted
    them, nothing further is printed. In every other case ``FALLBACK_TEXT``
    followed by a newline is written to stdout. The return value (used as the
    process exit code) is always 0.
    """
    image_urls = fetch_all_images()
    if image_urls:
        sys.stdout.write(f"[debug] 成功获取 {len(image_urls)} 张图片\n")

    # Short-circuit: the robot is only contacted when two or more images
    # are available.
    delivered = len(image_urls) >= 2 and send_images(image_urls)
    if not delivered:
        sys.stdout.write(FALLBACK_TEXT)
        sys.stdout.write("\n")
    return 0
|
||
|
||
|
||
if __name__ == "__main__":
    # Script entry point: exit with main()'s return code, or print the
    # traceback to stdout and exit with 1 on an unexpected exception.
    # SystemExit is not an Exception subclass, so one raised inside main()
    # passes through this handler untouched.
    try:
        exit_code = main()
    except Exception:
        traceback.print_exc(file=sys.stdout)
        exit_code = 1
    raise SystemExit(exit_code)
|