Compare commits
No commits in common. "5dbae039d46773b666c834e6b29342ab62c4b88b" and "951096a0f28885e01d8ae128f4d257a1b5a40205" have entirely different histories.
5dbae039d4
...
951096a0f2
4
.vscode/launch.json
vendored
4
.vscode/launch.json
vendored
@ -9,11 +9,9 @@
|
||||
"console": "integratedTerminal",
|
||||
"justMyCode": true,
|
||||
"args": [
|
||||
"--prompt=马云在直播间卖红薯",
|
||||
"--model=gpt-image-2"
|
||||
"{\"prompt\":\"一只站在雨夜街头的白猫\",\"model\":\"jimeng-5.0\",\"negative_prompt\":\"模糊, 低清\",\"ratio\":\"16:9\",\"resolution\":\"2k\"}"
|
||||
],
|
||||
"env": {
|
||||
"ROBOT_WECHAT_CLIENT_PORT": "9001",
|
||||
"ROBOT_FROM_WX_ID": "57004904192@chatroom",
|
||||
"ROBOT_CODE": "houhouipad",
|
||||
"MYSQL_HOST": "127.0.0.1",
|
||||
|
||||
16
README.md
16
README.md
@ -55,8 +55,6 @@
|
||||
|
||||
**发送图片的时候也可以调用 Agent 接口**
|
||||
|
||||
1. 发送远程图片地址
|
||||
|
||||
```
|
||||
[POST] http://127.0.0.1:{ROBOT_WECHAT_CLIENT_PORT}/api/v1//robot/message/send/image/url
|
||||
|
||||
@ -69,20 +67,6 @@
|
||||
|
||||
```
|
||||
|
||||
2. 发送本地图片路径
|
||||
|
||||
```
|
||||
[POST] http://127.0.0.1:{ROBOT_WECHAT_CLIENT_PORT}/api/v1//robot/message/send/image/local
|
||||
|
||||
请求体 Body:
|
||||
|
||||
{
|
||||
"to_wxid": "{{ROBOT_FROM_WX_ID}}",
|
||||
"file_path": "{{file_path}}"
|
||||
}
|
||||
|
||||
```
|
||||
|
||||
**发送视频的时候也可以调用 Agent 接口**
|
||||
|
||||
```
|
||||
|
||||
@ -20,7 +20,7 @@ argument-hint: "无需参数,直接调用即可"
|
||||
|
||||
## 接口信息
|
||||
|
||||
- 获取图片地址:`https://api.pearapi.ai/api/today_wife`
|
||||
- 获取图片地址:`https://api.pearktrue.cn/api/today_wife`
|
||||
- 请求方式:`GET`
|
||||
- 发图接口:`http://127.0.0.1:{ROBOT_WECHAT_CLIENT_PORT}/api/v1/robot/message/send/image/url`
|
||||
- 请求方式:`POST`
|
||||
@ -32,12 +32,12 @@ argument-hint: "无需参数,直接调用即可"
|
||||
"code": 200,
|
||||
"msg": "获取成功",
|
||||
"data": {
|
||||
"image_url": "https://api.pearapi.ai/api_assets/wife/9a6a9c38-7d6e-464f-8930-eb9dac41cde9.webp",
|
||||
"image_url": "https://api.pearktrue.cn/api_assets/wife/9a6a9c38-7d6e-464f-8930-eb9dac41cde9.webp",
|
||||
"role_name": "初音未来、巡音流歌",
|
||||
"width": 2480,
|
||||
"height": 3508
|
||||
},
|
||||
"api_source": "官方API网:https://api.pearapi.ai/"
|
||||
"api_source": "官方API网:https://api.pearktrue.cn/"
|
||||
}
|
||||
```
|
||||
|
||||
@ -52,7 +52,7 @@ argument-hint: "无需参数,直接调用即可"
|
||||
|
||||
1. 当用户发送 `999` 时触发该技能。
|
||||
2. 在仓库根目录下执行本地脚本:`python3 scripts/beauty.py`。
|
||||
3. 脚本内部发送 `GET` 请求到 `https://api.pearapi.ai/api/today_wife`。
|
||||
3. 脚本内部发送 `GET` 请求到 `https://api.pearktrue.cn/api/today_wife`。
|
||||
4. 脚本解析返回的 JSON,并提取 `data.image_url`。
|
||||
5. 脚本从环境变量中读取 `ROBOT_WECHAT_CLIENT_PORT` 和 `ROBOT_FROM_WX_ID`。
|
||||
6. 脚本发送 `POST` 请求到 `http://127.0.0.1:{ROBOT_WECHAT_CLIENT_PORT}/api/v1/robot/message/send/image/url`,请求体为:
|
||||
|
||||
@ -13,7 +13,7 @@ import urllib.request
|
||||
sys.stderr = sys.stdout
|
||||
|
||||
|
||||
FETCH_API_URL = "https://api.pearapi.ai/api/today_wife"
|
||||
FETCH_API_URL = "https://api.pearktrue.cn/api/today_wife"
|
||||
FALLBACK_TEXT = "今天的美女图片暂时没拿到,等我再找找。"
|
||||
|
||||
|
||||
|
||||
@ -1,53 +0,0 @@
|
||||
---
|
||||
name: douyin-video-parse
|
||||
description: "当用户发送包含抖音短链接(https://v.douyin.com/xxx)的消息时触发。自动解析抖音视频/图片,并发送给当前用户。"
|
||||
argument-hint: "消息中包含抖音短链接即可自动触发"
|
||||
---
|
||||
|
||||
# Douyin Video Parse Skill
|
||||
|
||||
## 描述
|
||||
|
||||
这是一个用于解析抖音短视频/图片的技能。
|
||||
|
||||
当用户发送的消息中包含 `https://v.douyin.com/` 链接时,自动解析该链接对应的视频或图片,并通过本地微信机器人接口发送给当前用户。
|
||||
|
||||
这个仓库里额外提供了一个可执行脚本 `scripts/douyin_video_parse.py`,方便宿主机器人直接调用。
|
||||
|
||||
## 触发条件
|
||||
|
||||
- 用户消息中包含 `https://v.douyin.com/` 链接
|
||||
|
||||
## 解析原理
|
||||
|
||||
1. 访问抖音短链接,跟随 302 重定向获取真实页面 URL
|
||||
2. 请求真实页面 HTML,从中提取 `window._ROUTER_DATA` JSON 数据
|
||||
3. 从 JSON 中解析出视频播放地址或图片列表
|
||||
4. 通过本地微信机器人接口发送视频或图片
|
||||
|
||||
## 环境变量
|
||||
|
||||
- `ROBOT_WECHAT_CLIENT_PORT`:本地微信机器人服务端口。
|
||||
- `ROBOT_FROM_WX_ID`:当前消息来源用户的 wxid。
|
||||
- `ROBOT_MESSAGE_CONTENT`:用户发送的原始消息内容(用于提取抖音链接)。
|
||||
|
||||
## 执行步骤
|
||||
|
||||
1. 当用户消息中包含 `https://v.douyin.com/` 链接时触发该技能。
|
||||
2. 在仓库根目录下执行本地脚本:`python3 scripts/douyin_video_parse.py`。
|
||||
3. 脚本从环境变量 `ROBOT_MESSAGE_CONTENT` 中提取抖音短链接。
|
||||
4. 脚本访问短链接,跟随重定向获取真实页面 URL。
|
||||
5. 脚本请求真实页面,解析 `window._ROUTER_DATA` 中的视频/图片信息。
|
||||
6. 如果是视频:
|
||||
- 先发送分享卡片链接
|
||||
- 再调用 `POST http://127.0.0.1:{ROBOT_WECHAT_CLIENT_PORT}/api/v1/robot/message/send/video/url` 发送视频
|
||||
7. 如果是图片:
|
||||
- 发送文字提示(作者、标题、图片数量)
|
||||
- 调用 `POST http://127.0.0.1:{ROBOT_WECHAT_CLIENT_PORT}/api/v1/robot/message/send/image/url` 逐张发送图片
|
||||
8. 如果解析失败,回复兜底文案:`抖音解析失败,可能是链接已失效或格式不正确。`
|
||||
|
||||
## 回复要求
|
||||
|
||||
- 视频类型:发送视频文件,附带作者和标题信息。
|
||||
- 图片类型:发送所有图片,附带作者和标题信息。
|
||||
- 失败时,使用固定兜底文案回复。
|
||||
@ -1,345 +0,0 @@
|
||||
#!/usr/bin/env python3
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import html
|
||||
import json
|
||||
import os
|
||||
import re
|
||||
import sys
|
||||
import traceback
|
||||
import urllib.error
|
||||
import urllib.parse
|
||||
import urllib.request
|
||||
|
||||
|
||||
sys.stderr = sys.stdout
|
||||
|
||||
|
||||
DOUYIN_USER_AGENT = (
|
||||
"Mozilla/5.0 (iPhone; CPU iPhone OS 14_0 like Mac OS X) "
|
||||
"AppleWebKit/605.1.15 (KHTML, like Gecko) "
|
||||
"Version/14.0 Mobile/15E148 Safari/604.1"
|
||||
)
|
||||
DOUYIN_REFERER = "https://www.douyin.com/"
|
||||
FALLBACK_TEXT = "抖音解析失败,可能是链接已失效或格式不正确。"
|
||||
ROUTER_DATA_RE = re.compile(r"(?s)window\._ROUTER_DATA\s*=\s*(\{.*?\})\s*</script>")
|
||||
DOUYIN_URL_RE = re.compile(r"https://[^\s]+")
|
||||
|
||||
|
||||
def build_request(url: str) -> urllib.request.Request:
|
||||
return urllib.request.Request(
|
||||
url,
|
||||
headers={
|
||||
"User-Agent": DOUYIN_USER_AGENT,
|
||||
"Referer": DOUYIN_REFERER,
|
||||
},
|
||||
)
|
||||
|
||||
|
||||
def resolve_redirect(short_url: str) -> str | None:
|
||||
"""Follow the 302 redirect to get the real page URL."""
|
||||
|
||||
class NoRedirectHandler(urllib.request.HTTPRedirectHandler):
|
||||
def redirect_request(self, req, fp, code, msg, headers, newurl):
|
||||
return None
|
||||
|
||||
opener = urllib.request.build_opener(NoRedirectHandler)
|
||||
req = build_request(short_url)
|
||||
try:
|
||||
response = opener.open(req, timeout=15)
|
||||
return response.url
|
||||
except urllib.error.HTTPError as e:
|
||||
location = e.headers.get("Location")
|
||||
if location:
|
||||
return location
|
||||
return None
|
||||
except (urllib.error.URLError, TimeoutError):
|
||||
return None
|
||||
|
||||
|
||||
def fetch_page_html(page_url: str) -> str | None:
|
||||
"""Fetch the Douyin page HTML content."""
|
||||
req = build_request(page_url)
|
||||
try:
|
||||
with urllib.request.urlopen(req, timeout=15) as response:
|
||||
if response.status != 200:
|
||||
return None
|
||||
return response.read().decode("utf-8", errors="replace")
|
||||
except (urllib.error.URLError, TimeoutError):
|
||||
return None
|
||||
|
||||
|
||||
def decode_escaped_value(value: str) -> str:
|
||||
"""Decode HTML entities and JSON escape sequences."""
|
||||
decoded = html.unescape(value)
|
||||
if "\\" in decoded:
|
||||
try:
|
||||
unquoted = json.loads('"' + decoded.replace('"', '\\"') + '"')
|
||||
decoded = unquoted
|
||||
except (json.JSONDecodeError, ValueError):
|
||||
pass
|
||||
return html.unescape(decoded)
|
||||
|
||||
|
||||
def pick_preferred_url(urls: list[str]) -> str:
|
||||
"""Pick the best URL from a list, preferring p26 CDN."""
|
||||
first_url = ""
|
||||
for raw_url in urls:
|
||||
if not raw_url:
|
||||
continue
|
||||
decoded_url = decode_escaped_value(raw_url)
|
||||
if not decoded_url:
|
||||
continue
|
||||
if decoded_url.startswith("https://p26"):
|
||||
return decoded_url
|
||||
if not first_url:
|
||||
first_url = decoded_url
|
||||
return first_url
|
||||
|
||||
|
||||
def pick_video_url(urls: list[str]) -> str:
|
||||
"""Pick the best video URL, preferring aweme.snssdk.com."""
|
||||
decoded_urls = []
|
||||
for raw_url in urls:
|
||||
if not raw_url:
|
||||
continue
|
||||
decoded_url = decode_escaped_value(raw_url).replace("playwm", "play")
|
||||
decoded_urls.append(decoded_url)
|
||||
|
||||
for url in decoded_urls:
|
||||
if "aweme.snssdk.com" in url:
|
||||
return url
|
||||
return decoded_urls[0] if decoded_urls else ""
|
||||
|
||||
|
||||
def extract_aweme_item(html_content: str) -> dict | None:
|
||||
"""Extract the first aweme item from _ROUTER_DATA."""
|
||||
match = ROUTER_DATA_RE.search(html_content)
|
||||
if not match:
|
||||
return None
|
||||
|
||||
try:
|
||||
router_data = json.loads(match.group(1))
|
||||
except json.JSONDecodeError:
|
||||
return None
|
||||
|
||||
loader_data = router_data.get("loaderData", {})
|
||||
for page_data in loader_data.values():
|
||||
if not isinstance(page_data, dict):
|
||||
continue
|
||||
video_info_res = page_data.get("videoInfoRes", {})
|
||||
item_list = video_info_res.get("item_list", [])
|
||||
if item_list:
|
||||
return item_list[0]
|
||||
return None
|
||||
|
||||
|
||||
def parse_note_item(item: dict) -> dict | None:
|
||||
"""Parse image/note type content."""
|
||||
images = item.get("images") or item.get("image_infos") or []
|
||||
if not images:
|
||||
return None
|
||||
|
||||
image_urls = []
|
||||
seen = set()
|
||||
for img_info in images:
|
||||
url_list = img_info.get("url_list", [])
|
||||
for url in url_list:
|
||||
if url and url.startswith("http"):
|
||||
decoded = html.unescape(url)
|
||||
if decoded not in seen:
|
||||
image_urls.append(decoded)
|
||||
seen.add(decoded)
|
||||
break
|
||||
|
||||
if not image_urls:
|
||||
return None
|
||||
|
||||
author = item.get("author", {})
|
||||
music = item.get("music", {})
|
||||
music_url = pick_preferred_url(music.get("play_url", {}).get("url_list", []))
|
||||
|
||||
# Fallback music URL from video play_addr
|
||||
if not music_url:
|
||||
video = item.get("video", {})
|
||||
play_addr = video.get("play_addr", {})
|
||||
uri = play_addr.get("uri", "")
|
||||
if uri.startswith("http"):
|
||||
music_url = decode_escaped_value(uri)
|
||||
else:
|
||||
music_url = pick_preferred_url(play_addr.get("url_list", []))
|
||||
|
||||
return {
|
||||
"type": "note",
|
||||
"author": html.unescape(author.get("nickname", "")),
|
||||
"title": html.unescape(item.get("desc", "")),
|
||||
"images": image_urls,
|
||||
"music_url": music_url,
|
||||
}
|
||||
|
||||
|
||||
def parse_video_item(item: dict) -> dict | None:
|
||||
"""Parse video type content."""
|
||||
video = item.get("video", {})
|
||||
duration = video.get("duration")
|
||||
if duration is not None and duration == 0:
|
||||
return None
|
||||
|
||||
play_addr = video.get("play_addr", {})
|
||||
video_url = pick_video_url(play_addr.get("url_list", []))
|
||||
if not video_url:
|
||||
return None
|
||||
|
||||
author = item.get("author", {})
|
||||
return {
|
||||
"type": "video",
|
||||
"author": html.unescape(author.get("nickname", "")),
|
||||
"title": html.unescape(item.get("desc", "")),
|
||||
"url": video_url,
|
||||
"cover": pick_preferred_url(video.get("cover", {}).get("url_list", [])),
|
||||
}
|
||||
|
||||
|
||||
def parse_douyin(short_url: str) -> dict | None:
|
||||
"""Main parsing logic: resolve redirect -> fetch HTML -> extract data."""
|
||||
resolved_url = resolve_redirect(short_url)
|
||||
if not resolved_url:
|
||||
return None
|
||||
|
||||
html_content = fetch_page_html(resolved_url)
|
||||
if not html_content:
|
||||
return None
|
||||
|
||||
item = extract_aweme_item(html_content)
|
||||
if not item:
|
||||
return None
|
||||
|
||||
# Try note (images) first, then video
|
||||
result = parse_note_item(item)
|
||||
if result:
|
||||
return result
|
||||
|
||||
result = parse_video_item(item)
|
||||
if result:
|
||||
return result
|
||||
|
||||
return None
|
||||
|
||||
|
||||
def send_video(video_url: str, robot_port: str, to_wxid: str) -> bool:
|
||||
"""Send video via local robot API."""
|
||||
api_url = f"http://127.0.0.1:{robot_port}/api/v1/robot/message/send/video/url"
|
||||
body = json.dumps({
|
||||
"to_wxid": to_wxid,
|
||||
"video_urls": [video_url],
|
||||
}).encode("utf-8")
|
||||
request = urllib.request.Request(
|
||||
api_url,
|
||||
data=body,
|
||||
headers={"Content-Type": "application/json"},
|
||||
method="POST",
|
||||
)
|
||||
try:
|
||||
with urllib.request.urlopen(request, timeout=60) as response:
|
||||
return 200 <= response.status < 300
|
||||
except (urllib.error.URLError, TimeoutError):
|
||||
return False
|
||||
|
||||
|
||||
def send_images(image_urls: list[str], robot_port: str, to_wxid: str) -> bool:
|
||||
"""Send images via local robot API."""
|
||||
api_url = f"http://127.0.0.1:{robot_port}/api/v1/robot/message/send/image/url"
|
||||
body = json.dumps({
|
||||
"to_wxid": to_wxid,
|
||||
"image_urls": image_urls,
|
||||
}).encode("utf-8")
|
||||
request = urllib.request.Request(
|
||||
api_url,
|
||||
data=body,
|
||||
headers={"Content-Type": "application/json"},
|
||||
method="POST",
|
||||
)
|
||||
try:
|
||||
with urllib.request.urlopen(request, timeout=60) as response:
|
||||
return 200 <= response.status < 300
|
||||
except (urllib.error.URLError, TimeoutError):
|
||||
return False
|
||||
|
||||
|
||||
def send_text(text: str, robot_port: str, to_wxid: str) -> bool:
|
||||
"""Send text message via local robot API."""
|
||||
api_url = f"http://127.0.0.1:{robot_port}/api/v1/robot/message/send/text"
|
||||
body = json.dumps({
|
||||
"to_wxid": to_wxid,
|
||||
"content": text,
|
||||
}).encode("utf-8")
|
||||
request = urllib.request.Request(
|
||||
api_url,
|
||||
data=body,
|
||||
headers={"Content-Type": "application/json"},
|
||||
method="POST",
|
||||
)
|
||||
try:
|
||||
with urllib.request.urlopen(request, timeout=10) as response:
|
||||
return 200 <= response.status < 300
|
||||
except (urllib.error.URLError, TimeoutError):
|
||||
return False
|
||||
|
||||
|
||||
def main() -> int:
|
||||
robot_port = os.environ.get("ROBOT_WECHAT_CLIENT_PORT", "").strip()
|
||||
to_wxid = os.environ.get("ROBOT_FROM_WX_ID", "").strip()
|
||||
message_content = os.environ.get("ROBOT_MESSAGE_CONTENT", "").strip()
|
||||
|
||||
if not robot_port or not to_wxid or not message_content:
|
||||
sys.stdout.write(FALLBACK_TEXT + "\n")
|
||||
return 0
|
||||
|
||||
# Extract douyin URL from message
|
||||
matches = DOUYIN_URL_RE.findall(message_content)
|
||||
douyin_urls = [u for u in matches if "v.douyin.com" in u]
|
||||
if not douyin_urls:
|
||||
sys.stdout.write(FALLBACK_TEXT + "\n")
|
||||
return 0
|
||||
|
||||
douyin_url = douyin_urls[0]
|
||||
result = parse_douyin(douyin_url)
|
||||
if not result:
|
||||
sys.stdout.write(FALLBACK_TEXT + "\n")
|
||||
return 0
|
||||
|
||||
if result["type"] == "video":
|
||||
# Send info text
|
||||
info_text = f"抖音视频解析成功\n作者: {result['author']}\n标题: {result['title']}"
|
||||
send_text(info_text, robot_port, to_wxid)
|
||||
# Send video
|
||||
if not send_video(result["url"], robot_port, to_wxid):
|
||||
sys.stdout.write("发送抖音视频失败,请稍后重试。\n")
|
||||
return 0
|
||||
|
||||
elif result["type"] == "note":
|
||||
# Send info text
|
||||
info_text = (
|
||||
f"抖音图片解析成功\n"
|
||||
f"作者: {result['author']}\n"
|
||||
f"标题: {result['title']}\n\n"
|
||||
f"{len(result['images'])}张图片正在发送中..."
|
||||
)
|
||||
send_text(info_text, robot_port, to_wxid)
|
||||
# Send images
|
||||
if not send_images(result["images"], robot_port, to_wxid):
|
||||
sys.stdout.write("发送抖音图片失败,请稍后重试。\n")
|
||||
return 0
|
||||
|
||||
return 0
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
try:
|
||||
raise SystemExit(main())
|
||||
except SystemExit:
|
||||
raise
|
||||
except Exception:
|
||||
traceback.print_exc(file=sys.stdout)
|
||||
raise SystemExit(1)
|
||||
@ -10,7 +10,7 @@ argument-hint: "需要 prompt(提示词)和 images(图片链接列表)
|
||||
|
||||
这是一个 AI 图生图技能,基于输入的一张或多张图片,结合文本提示词生成新的图片。支持图片混合、风格转换、内容合成等多种创作模式。
|
||||
|
||||
支持多个绘图模型:即梦(JiMeng)、豆包(DouBao)、造相(Z-Image)、OpenAI GPT Image。
|
||||
支持多个绘图模型:即梦(JiMeng)、豆包(DouBao)、造相(Z-Image)。
|
||||
|
||||
从数据库中读取绘图配置(API 密钥、Base URL 等),根据用户选择的模型调用对应的绘图 API,返回生成的图片 URL。
|
||||
|
||||
@ -37,18 +37,16 @@ argument-hint: "需要 prompt(提示词)和 images(图片链接列表)
|
||||
},
|
||||
"model": {
|
||||
"type": "string",
|
||||
"description": "画图模型选择(可选):即梦4.5(jimeng-4.5) / 即梦4.6(jimeng-4.6) / 即梦4.7(jimeng-4.7) / 即梦5.0(jimeng-5.0) / 豆包图生图(doubao-seededit-3.0-i2i) / 造相基础版(Z-Image) / 造相蒸馏版(Z-Image-Turbo) / 造相图片编辑(Qwen-Image-Edit-2511) / OpenAI GPT Image(gpt-image-2),默认: 空(none)。",
|
||||
"description": "画图模型选择(可选):即梦4.5(jimeng-4.5) / 即梦4.6(jimeng-4.6) / 即梦5.0(jimeng-5.0) / 豆包图生图(doubao-seededit-3.0-i2i) / 造相基础版(Z-Image) / 造相蒸馏版(Z-Image-Turbo) / 造相图片编辑(Qwen-Image-Edit-2511),默认: 空(none)。",
|
||||
"enum": [
|
||||
"none",
|
||||
"jimeng-4.5",
|
||||
"jimeng-4.6",
|
||||
"jimeng-4.7",
|
||||
"jimeng-5.0",
|
||||
"doubao-seededit-3.0-i2i",
|
||||
"Z-Image",
|
||||
"Z-Image-Turbo",
|
||||
"Qwen-Image-Edit-2511",
|
||||
"gpt-image-2"
|
||||
"Qwen-Image-Edit-2511"
|
||||
],
|
||||
"default": "none"
|
||||
},
|
||||
|
||||
@ -3,17 +3,13 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import argparse
|
||||
import base64
|
||||
import json
|
||||
import mimetypes
|
||||
import os
|
||||
import re
|
||||
import subprocess
|
||||
import sys
|
||||
import tempfile
|
||||
import time
|
||||
import traceback
|
||||
import urllib.parse
|
||||
import urllib.request
|
||||
from pathlib import Path
|
||||
|
||||
@ -71,7 +67,6 @@ _ensure_skill_venv_python()
|
||||
|
||||
try:
|
||||
import pymysql # type: ignore # noqa: E402
|
||||
from openai import OpenAI # type: ignore # noqa: E402
|
||||
except ModuleNotFoundError:
|
||||
_run_bootstrap()
|
||||
_py = _get_python_executable()
|
||||
@ -172,240 +167,6 @@ def _http_get_json(url: str, headers: dict, timeout: int = 30) -> dict:
|
||||
return json.loads(resp.read().decode("utf-8"))
|
||||
|
||||
|
||||
def _coerce_int(value, default: int, minimum: int, maximum: int) -> int:
|
||||
try:
|
||||
parsed = int(value)
|
||||
except (TypeError, ValueError):
|
||||
parsed = default
|
||||
return min(max(parsed, minimum), maximum)
|
||||
|
||||
|
||||
def _openai_output_format(config: dict) -> str:
|
||||
output_format = str(config.get("output_format", "png") or "png").lower()
|
||||
if output_format not in {"png", "jpeg", "webp"}:
|
||||
return "png"
|
||||
return output_format
|
||||
|
||||
|
||||
def _openai_size(config: dict, ratio: str, resolution: str) -> str:
|
||||
configured = str(config.get("size", "") or "").strip()
|
||||
if configured:
|
||||
return configured
|
||||
|
||||
normalized_ratio = (ratio or "").replace(" ", "").lower()
|
||||
normalized_resolution = (resolution or "").replace(" ", "").lower()
|
||||
|
||||
if normalized_resolution in {"4k", "2160p", "3840x2160"}:
|
||||
sizes = {
|
||||
"16:9": "3840x2160",
|
||||
"9:16": "2160x3840",
|
||||
"1:1": "2048x2048",
|
||||
"3:2": "3072x2048",
|
||||
"2:3": "2048x3072",
|
||||
}
|
||||
elif normalized_resolution in {"2k", "1440p", "2048"}:
|
||||
sizes = {
|
||||
"16:9": "2048x1152",
|
||||
"9:16": "1152x2048",
|
||||
"1:1": "2048x2048",
|
||||
"3:2": "2048x1360",
|
||||
"2:3": "1360x2048",
|
||||
}
|
||||
elif normalized_resolution in {"1k", "1024", "1024p"}:
|
||||
sizes = {
|
||||
"16:9": "1536x864",
|
||||
"9:16": "864x1536",
|
||||
"1:1": "1024x1024",
|
||||
"3:2": "1536x1024",
|
||||
"2:3": "1024x1536",
|
||||
}
|
||||
else:
|
||||
return "auto"
|
||||
|
||||
return sizes.get(normalized_ratio, "auto")
|
||||
|
||||
|
||||
def _openai_prompt(prompt: str, negative_prompt: str) -> str:
|
||||
if not negative_prompt:
|
||||
return prompt
|
||||
return f"{prompt}\n\n不要包含: {negative_prompt}"
|
||||
|
||||
|
||||
def _openai_client(config: dict) -> OpenAI:
|
||||
api_key = str(config.get("api_key", "")).strip()
|
||||
if not api_key:
|
||||
raise RuntimeError("OpenAI 绘图配置缺少 api_key")
|
||||
|
||||
base_url = str(config.get("base_url", "") or "").strip()
|
||||
organization = str(config.get("organization", "") or "").strip()
|
||||
project = str(config.get("project", "") or "").strip()
|
||||
timeout: float | None = None
|
||||
timeout_value = config.get("timeout")
|
||||
if timeout_value not in (None, ""):
|
||||
timeout = float(timeout_value)
|
||||
|
||||
return OpenAI(
|
||||
api_key=api_key,
|
||||
base_url=base_url or None,
|
||||
organization=organization or None,
|
||||
project=project or None,
|
||||
timeout=timeout,
|
||||
)
|
||||
|
||||
|
||||
def _truncate_debug_payload(value):
|
||||
if isinstance(value, dict):
|
||||
return {
|
||||
key: (
|
||||
f"{item[:50]}..." if key == "b64_json" and isinstance(item, str) and len(item) > 50 else _truncate_debug_payload(item)
|
||||
)
|
||||
for key, item in value.items()
|
||||
}
|
||||
if isinstance(value, list):
|
||||
return [_truncate_debug_payload(item) for item in value]
|
||||
return value
|
||||
|
||||
|
||||
def _debug_response(label: str, payload) -> None:
|
||||
if hasattr(payload, "model_dump"):
|
||||
payload = payload.model_dump()
|
||||
payload = _truncate_debug_payload(payload)
|
||||
sys.stdout.write(f"[debug] {label}: {json.dumps(payload, ensure_ascii=False)}\n")
|
||||
|
||||
|
||||
def _rewrite_openai_image_url(url: str) -> str:
|
||||
internal_host = "http://chatgpt2api:80"
|
||||
external_host = "https://chatgpt2api.houhoukang.com"
|
||||
if url.startswith(internal_host):
|
||||
return f"{external_host}{url[len(internal_host):]}"
|
||||
return url
|
||||
|
||||
|
||||
def _extension_from_output_format(output_format: str) -> str:
|
||||
if output_format == "jpeg":
|
||||
return ".jpg"
|
||||
if output_format == "webp":
|
||||
return ".webp"
|
||||
return ".png"
|
||||
|
||||
|
||||
def _openai_response_value(item, key: str):
|
||||
if isinstance(item, dict):
|
||||
return item.get(key)
|
||||
return getattr(item, key, None)
|
||||
|
||||
|
||||
def _write_openai_b64_image(b64_json: str, output_format: str) -> str:
|
||||
encoded = b64_json.strip()
|
||||
suffix = _extension_from_output_format(output_format)
|
||||
if encoded.startswith("data:"):
|
||||
header, encoded = encoded.split(",", 1)
|
||||
mime_type = header[5:].split(";", 1)[0].strip().lower()
|
||||
if mime_type:
|
||||
suffix = _extension_from_mime(mime_type)
|
||||
|
||||
encoded = "".join(encoded.split())
|
||||
padding = len(encoded) % 4
|
||||
if padding:
|
||||
encoded = f"{encoded}{'=' * (4 - padding)}"
|
||||
|
||||
image_bytes = base64.b64decode(encoded)
|
||||
with tempfile.NamedTemporaryFile(prefix="wechat-openai-image-", suffix=suffix, delete=False) as temp_file:
|
||||
temp_file.write(image_bytes)
|
||||
return temp_file.name
|
||||
|
||||
|
||||
def _openai_images_from_response(response, output_format: str) -> list[str]:
|
||||
outputs: list[str] = []
|
||||
try:
|
||||
for item in getattr(response, "data", []) or []:
|
||||
b64_json = _openai_response_value(item, "b64_json")
|
||||
if b64_json:
|
||||
outputs.append(_write_openai_b64_image(str(b64_json), output_format))
|
||||
continue
|
||||
|
||||
url = _openai_response_value(item, "url")
|
||||
if url:
|
||||
outputs.append(_rewrite_openai_image_url(str(url)))
|
||||
except Exception:
|
||||
_cleanup_openai_temp_files(outputs)
|
||||
raise
|
||||
return outputs
|
||||
|
||||
|
||||
def _is_remote_image_url(value: str) -> bool:
|
||||
return urllib.parse.urlparse(value).scheme in {"http", "https"}
|
||||
|
||||
|
||||
def _send_image_outputs(client_port: str, from_wx_id: str, image_outputs: list[str]) -> None:
|
||||
remote_urls = [value for value in image_outputs if value and _is_remote_image_url(value)]
|
||||
local_paths = [value for value in image_outputs if value and not _is_remote_image_url(value)]
|
||||
|
||||
if remote_urls:
|
||||
send_url = f"http://127.0.0.1:{client_port}/api/v1/robot/message/send/image/url"
|
||||
send_body = {
|
||||
"to_wxid": from_wx_id,
|
||||
"image_urls": remote_urls,
|
||||
}
|
||||
response = _http_post_json(send_url, send_body, {"Content-Type": "application/json"}, timeout=300)
|
||||
_debug_response("send image url response", response)
|
||||
|
||||
for file_path in local_paths:
|
||||
send_url = f"http://127.0.0.1:{client_port}/api/v1/robot/message/send/image/local"
|
||||
send_body = {
|
||||
"to_wxid": from_wx_id,
|
||||
"file_path": file_path,
|
||||
}
|
||||
response = _http_post_json(send_url, send_body, {"Content-Type": "application/json"}, timeout=300)
|
||||
_debug_response("send image local response", response)
|
||||
|
||||
|
||||
def _cleanup_openai_temp_files(image_outputs: list[str]) -> None:
|
||||
for value in image_outputs:
|
||||
path = Path(value)
|
||||
if path.name.startswith("wechat-openai-image-") and path.is_file():
|
||||
try:
|
||||
path.unlink()
|
||||
except OSError:
|
||||
pass
|
||||
|
||||
|
||||
def _extension_from_mime(mime_type: str) -> str:
|
||||
if mime_type == "image/jpeg":
|
||||
return ".jpg"
|
||||
guessed = mimetypes.guess_extension(mime_type)
|
||||
if guessed in {".png", ".jpg", ".jpeg", ".webp"}:
|
||||
return guessed
|
||||
return ".png"
|
||||
|
||||
|
||||
def _download_openai_input_image(image: str, directory: str, index: int) -> Path:
|
||||
stripped = image.strip()
|
||||
if stripped.startswith("data:"):
|
||||
header, encoded = stripped.split(",", 1)
|
||||
mime_type = header[5:].split(";", 1)[0] or "image/png"
|
||||
path = Path(directory) / f"input-{index}{_extension_from_mime(mime_type)}"
|
||||
path.write_bytes(base64.b64decode(encoded))
|
||||
return path
|
||||
|
||||
parsed = urllib.parse.urlparse(stripped)
|
||||
if parsed.scheme in {"http", "https"}:
|
||||
request = urllib.request.Request(stripped, headers={"User-Agent": "wechat-robot-skills/1.0"})
|
||||
with urllib.request.urlopen(request, timeout=60) as response:
|
||||
content_type = response.headers.get("Content-Type", "image/png").split(";", 1)[0].strip()
|
||||
suffix = Path(parsed.path).suffix.lower()
|
||||
if suffix not in {".png", ".jpg", ".jpeg", ".webp"}:
|
||||
suffix = _extension_from_mime(content_type)
|
||||
path = Path(directory) / f"input-{index}{suffix}"
|
||||
path.write_bytes(response.read())
|
||||
return path
|
||||
|
||||
path = Path(stripped).expanduser()
|
||||
if path.is_file():
|
||||
return path
|
||||
raise RuntimeError(f"无法读取图片: {image}")
|
||||
|
||||
|
||||
def call_jimeng(config: dict, prompt: str, model: str, images: list[str],
|
||||
negative_prompt: str, ratio: str, resolution: str) -> list[str]:
|
||||
"""Call JiMeng (即梦) image compositions API (图生图)."""
|
||||
@ -548,53 +309,13 @@ def call_zimage(config: dict, prompt: str, model: str, images: list[str]) -> lis
|
||||
raise RuntimeError("造相绘图任务超时")
|
||||
|
||||
|
||||
def call_openai(config: dict, prompt: str, model: str, images: list[str],
|
||||
negative_prompt: str, ratio: str, resolution: str) -> list[str]:
|
||||
"""Call OpenAI GPT Image API for image editing."""
|
||||
client = _openai_client(config)
|
||||
output_format = _openai_output_format(config)
|
||||
quality = str(config.get("quality", "auto") or "auto")
|
||||
background = str(config.get("background", "auto") or "auto")
|
||||
if background == "transparent":
|
||||
background = "auto"
|
||||
|
||||
with tempfile.TemporaryDirectory() as temp_dir:
|
||||
input_paths = [
|
||||
_download_openai_input_image(image, temp_dir, index)
|
||||
for index, image in enumerate(images[:16], start=1)
|
||||
]
|
||||
input_files = [path.open("rb") for path in input_paths]
|
||||
try:
|
||||
kwargs = {
|
||||
"model": model or "gpt-image-2",
|
||||
"prompt": _openai_prompt(prompt, negative_prompt),
|
||||
"image": input_files,
|
||||
"n": _coerce_int(config.get("n"), 1, 1, 10),
|
||||
"size": _openai_size(config, ratio, resolution),
|
||||
"quality": quality,
|
||||
"background": background,
|
||||
"output_format": output_format,
|
||||
}
|
||||
if output_format in {"jpeg", "webp"} and config.get("output_compression") is not None:
|
||||
kwargs["output_compression"] = _coerce_int(config.get("output_compression"), 100, 0, 100)
|
||||
|
||||
response = client.images.edit(**kwargs)
|
||||
finally:
|
||||
for input_file in input_files:
|
||||
input_file.close()
|
||||
|
||||
_debug_response("openai images.edit response", response)
|
||||
return _openai_images_from_response(response, output_format)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Main
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
JIMENG_MODELS = {"jimeng-4.5", "jimeng-4.6", "jimeng-4.7", "jimeng-5.0"}
|
||||
JIMENG_MODELS = {"jimeng-4.5", "jimeng-4.6", "jimeng-5.0"}
|
||||
DOUBAO_MODELS = {"doubao-seededit-3.0-i2i"}
|
||||
ZIMAGE_MODELS = {"Z-Image", "Z-Image-Turbo", "Qwen-Image-Edit-2511"}
|
||||
OPENAI_MODELS = {"gpt-image-2"}
|
||||
|
||||
|
||||
def _parse_cli_params(argv: list[str]) -> dict:
|
||||
@ -703,13 +424,6 @@ def main() -> int:
|
||||
return 0
|
||||
image_urls = call_zimage(zimage_config, prompt, model, images)
|
||||
|
||||
elif model in OPENAI_MODELS:
|
||||
openai_config = settings_json.get("OpenAI", {})
|
||||
if not openai_config.get("enabled", False):
|
||||
sys.stdout.write("OpenAI 绘图未开启\n")
|
||||
return 0
|
||||
image_urls = call_openai(openai_config, prompt, model, images, negative_prompt, ratio, resolution)
|
||||
|
||||
else:
|
||||
sys.stdout.write("不支持的 AI 图像模型\n")
|
||||
return 1
|
||||
@ -725,18 +439,20 @@ def main() -> int:
|
||||
# 通过客户端接口发送图片
|
||||
client_port = os.environ.get("ROBOT_WECHAT_CLIENT_PORT", "").strip()
|
||||
if not client_port:
|
||||
_cleanup_openai_temp_files(image_urls)
|
||||
sys.stdout.write("环境变量 ROBOT_WECHAT_CLIENT_PORT 未配置\n")
|
||||
return 1
|
||||
|
||||
send_url = f"http://127.0.0.1:{client_port}/api/v1/robot/message/send/image/url"
|
||||
send_body = {
|
||||
"to_wxid": from_wx_id,
|
||||
"image_urls": [u for u in image_urls if u],
|
||||
}
|
||||
try:
|
||||
_send_image_outputs(client_port, from_wx_id, image_urls)
|
||||
_http_post_json(send_url, send_body, {"Content-Type": "application/json"}, timeout=60)
|
||||
sys.stdout.write("图片发送成功\n")
|
||||
except Exception as exc:
|
||||
sys.stdout.write(f"发送图片失败: {exc}\n")
|
||||
return 1
|
||||
finally:
|
||||
_cleanup_openai_temp_files(image_urls)
|
||||
|
||||
return 0
|
||||
|
||||
|
||||
@ -1,3 +1,2 @@
|
||||
cryptography
|
||||
openai>=2.34.0
|
||||
pymysql>=1.1,<2
|
||||
|
||||
@ -23,7 +23,7 @@ argument-hint: "无需参数,直接调用即可"
|
||||
|
||||
## 接口信息
|
||||
|
||||
- 请求地址:`https://api.pearapi.ai/api/kfc?type=json`
|
||||
- 请求地址:`https://api.pearktrue.cn/api/kfc?type=json`
|
||||
- 请求方式:`GET`
|
||||
- 本地脚本:`scripts/kfc.py`
|
||||
- 返回示例:
|
||||
@ -33,7 +33,7 @@ argument-hint: "无需参数,直接调用即可"
|
||||
"code": 200,
|
||||
"msg": "获取成功",
|
||||
"text": "14看着不香,果然还是13更香,iPhone14真是更新了个寂寞!......今天肯德基疯狂星期四,谁请我吃?",
|
||||
"api_source": "官方API网:https://api.pearapi.ai/"
|
||||
"api_source": "官方API网:https://api.pearktrue.cn/"
|
||||
}
|
||||
```
|
||||
|
||||
@ -43,7 +43,7 @@ argument-hint: "无需参数,直接调用即可"
|
||||
|
||||
1. 当用户输入 `kfc`、`KFC`、`肯德基` 或 `肯德基文案` 时触发该技能。
|
||||
2. 在仓库根目录下执行本地脚本:`python3 scripts/kfc.py`。
|
||||
3. 脚本内部发送 `GET` 请求到 `https://api.pearapi.ai/api/kfc?type=json`。
|
||||
3. 脚本内部发送 `GET` 请求到 `https://api.pearktrue.cn/api/kfc?type=json`。
|
||||
4. 脚本解析返回的 JSON,并输出 `text` 字段。
|
||||
5. 如果接口请求失败、返回格式异常,或没有拿到 `text`,脚本输出:`今天的肯德基文案暂时没拿到,等我再去问问。`
|
||||
6. 如果脚本无法执行(Python 环境不可用),直接回复兜底文案:`今天的肯德基文案暂时没拿到,等我再去问问。`
|
||||
|
||||
@ -12,7 +12,7 @@ import urllib.request
|
||||
sys.stderr = sys.stdout
|
||||
|
||||
|
||||
API_URL = "https://api.pearapi.ai/api/kfc?type=json"
|
||||
API_URL = "https://api.pearktrue.cn/api/kfc?type=json"
|
||||
FALLBACK_TEXT = "今天的肯德基文案暂时没拿到,等我再去问问。"
|
||||
|
||||
|
||||
|
||||
@ -8,7 +8,7 @@ argument-hint: "需要 prompt 参数(画图提示词),可选 model(模
|
||||
|
||||
## 描述
|
||||
|
||||
这是一个 AI 文生图技能,当用户想通过文本描述生成图像时触发。支持多个绘图模型:即梦(JiMeng)、豆包(DouBao)、造相(Z-Image)、OpenAI GPT Image。
|
||||
这是一个 AI 文生图技能,当用户想通过文本描述生成图像时触发。支持多个绘图模型:即梦(JiMeng)、豆包(DouBao)、造相(Z-Image)。
|
||||
|
||||
从数据库中读取绘图配置(API 密钥、Base URL 等),根据用户选择的模型调用对应的绘图 API,返回生成的图片 URL。
|
||||
|
||||
@ -35,12 +35,11 @@ argument-hint: "需要 prompt 参数(画图提示词),可选 model(模
|
||||
},
|
||||
"model": {
|
||||
"type": "string",
|
||||
"description": "画图模型选择(可选):即梦4.5(jimeng-4.5) / 即梦4.6(jimeng-4.6) / 即梦4.7(jimeng-4.7) / 即梦5.0(jimeng-5.0) / 豆包4.5(doubao-seedream-4.5) / 豆包4.0(doubao-seedream-4.0) / 豆包文生图(doubao-seedream-3.0-t2i) / 豆包图生图(doubao-seededit-3.0-i2i) / 造相基础版(Z-Image) / 造相蒸馏版(Z-Image-Turbo) / 造相图片编辑(Qwen-Image-Edit-2511) / OpenAI GPT Image(gpt-image-2),默认: 空(none)。",
|
||||
"description": "画图模型选择(可选):即梦4.5(jimeng-4.5) / 即梦4.6(jimeng-4.6) / 即梦5.0(jimeng-5.0) / 豆包4.5(doubao-seedream-4.5) / 豆包4.0(doubao-seedream-4.0) / 豆包文生图(doubao-seedream-3.0-t2i) / 豆包图生图(doubao-seededit-3.0-i2i) / 造相基础版(Z-Image) / 造相蒸馏版(Z-Image-Turbo) / 造相图片编辑(Qwen-Image-Edit-2511),默认: 空(none)。",
|
||||
"enum": [
|
||||
"none",
|
||||
"jimeng-4.5",
|
||||
"jimeng-4.6",
|
||||
"jimeng-4.7",
|
||||
"jimeng-5.0",
|
||||
"doubao-seedream-4.5",
|
||||
"doubao-seedream-4.0",
|
||||
@ -48,8 +47,7 @@ argument-hint: "需要 prompt 参数(画图提示词),可选 model(模
|
||||
"doubao-seededit-3.0-i2i",
|
||||
"Z-Image",
|
||||
"Z-Image-Turbo",
|
||||
"Qwen-Image-Edit-2511",
|
||||
"gpt-image-2"
|
||||
"Qwen-Image-Edit-2511"
|
||||
],
|
||||
"default": "none"
|
||||
},
|
||||
|
||||
@ -1,3 +1,2 @@
|
||||
cryptography
|
||||
openai>=2.34.0
|
||||
pymysql>=1.1,<2
|
||||
@ -3,17 +3,13 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import argparse
|
||||
import base64
|
||||
import json
|
||||
import mimetypes
|
||||
import os
|
||||
import re
|
||||
import subprocess
|
||||
import sys
|
||||
import tempfile
|
||||
import time
|
||||
import traceback
|
||||
import urllib.parse
|
||||
import urllib.request
|
||||
from pathlib import Path
|
||||
|
||||
@ -71,7 +67,6 @@ _ensure_skill_venv_python()
|
||||
|
||||
try:
|
||||
import pymysql # type: ignore # noqa: E402
|
||||
from openai import OpenAI # type: ignore # noqa: E402
|
||||
except ModuleNotFoundError:
|
||||
_run_bootstrap()
|
||||
_py = _get_python_executable()
|
||||
@ -174,213 +169,6 @@ def _http_get_json(url: str, headers: dict, timeout: int = 30) -> dict:
|
||||
return json.loads(resp.read().decode("utf-8"))
|
||||
|
||||
|
||||
def _coerce_int(value, default: int, minimum: int, maximum: int) -> int:
|
||||
try:
|
||||
parsed = int(value)
|
||||
except (TypeError, ValueError):
|
||||
parsed = default
|
||||
return min(max(parsed, minimum), maximum)
|
||||
|
||||
|
||||
def _openai_output_format(config: dict) -> str:
|
||||
output_format = str(config.get("output_format", "png") or "png").lower()
|
||||
if output_format not in {"png", "jpeg", "webp"}:
|
||||
return "png"
|
||||
return output_format
|
||||
|
||||
|
||||
def _openai_size(config: dict, ratio: str, resolution: str) -> str:
|
||||
configured = str(config.get("size", "") or "").strip()
|
||||
if configured:
|
||||
return configured
|
||||
|
||||
normalized_ratio = (ratio or "").replace(" ", "").lower()
|
||||
normalized_resolution = (resolution or "").replace(" ", "").lower()
|
||||
|
||||
if normalized_resolution in {"4k", "2160p", "3840x2160"}:
|
||||
sizes = {
|
||||
"16:9": "3840x2160",
|
||||
"9:16": "2160x3840",
|
||||
"1:1": "2048x2048",
|
||||
"3:2": "3072x2048",
|
||||
"2:3": "2048x3072",
|
||||
}
|
||||
elif normalized_resolution in {"2k", "1440p", "2048"}:
|
||||
sizes = {
|
||||
"16:9": "2048x1152",
|
||||
"9:16": "1152x2048",
|
||||
"1:1": "2048x2048",
|
||||
"3:2": "2048x1360",
|
||||
"2:3": "1360x2048",
|
||||
}
|
||||
elif normalized_resolution in {"1k", "1024", "1024p"}:
|
||||
sizes = {
|
||||
"16:9": "1536x864",
|
||||
"9:16": "864x1536",
|
||||
"1:1": "1024x1024",
|
||||
"3:2": "1536x1024",
|
||||
"2:3": "1024x1536",
|
||||
}
|
||||
else:
|
||||
return "auto"
|
||||
|
||||
return sizes.get(normalized_ratio, "auto")
|
||||
|
||||
|
||||
def _openai_prompt(prompt: str, negative_prompt: str) -> str:
|
||||
if not negative_prompt:
|
||||
return prompt
|
||||
return f"{prompt}\n\n不要包含: {negative_prompt}"
|
||||
|
||||
|
||||
def _openai_client(config: dict) -> OpenAI:
|
||||
api_key = str(config.get("api_key", "")).strip()
|
||||
if not api_key:
|
||||
raise RuntimeError("OpenAI 绘图配置缺少 api_key")
|
||||
|
||||
base_url = str(config.get("base_url", "") or "").strip()
|
||||
organization = str(config.get("organization", "") or "").strip()
|
||||
project = str(config.get("project", "") or "").strip()
|
||||
timeout: float | None = None
|
||||
timeout_value = config.get("timeout")
|
||||
if timeout_value not in (None, ""):
|
||||
timeout = float(timeout_value)
|
||||
|
||||
return OpenAI(
|
||||
api_key=api_key,
|
||||
base_url=base_url or None,
|
||||
organization=organization or None,
|
||||
project=project or None,
|
||||
timeout=timeout,
|
||||
)
|
||||
|
||||
|
||||
def _truncate_debug_payload(value):
|
||||
if isinstance(value, dict):
|
||||
return {
|
||||
key: (
|
||||
f"{item[:50]}..." if key == "b64_json" and isinstance(item, str) and len(item) > 50 else _truncate_debug_payload(item)
|
||||
)
|
||||
for key, item in value.items()
|
||||
}
|
||||
if isinstance(value, list):
|
||||
return [_truncate_debug_payload(item) for item in value]
|
||||
return value
|
||||
|
||||
|
||||
def _debug_response(label: str, payload) -> None:
|
||||
if hasattr(payload, "model_dump"):
|
||||
payload = payload.model_dump()
|
||||
payload = _truncate_debug_payload(payload)
|
||||
sys.stdout.write(f"[debug] {label}: {json.dumps(payload, ensure_ascii=False)}\n")
|
||||
|
||||
|
||||
def _rewrite_openai_image_url(url: str) -> str:
|
||||
internal_host = "http://chatgpt2api:80"
|
||||
external_host = "https://chatgpt2api.houhoukang.com"
|
||||
if url.startswith(internal_host):
|
||||
return f"{external_host}{url[len(internal_host):]}"
|
||||
return url
|
||||
|
||||
|
||||
def _extension_from_mime(mime_type: str) -> str:
|
||||
if mime_type == "image/jpeg":
|
||||
return ".jpg"
|
||||
guessed = mimetypes.guess_extension(mime_type)
|
||||
if guessed in {".png", ".jpg", ".jpeg", ".webp"}:
|
||||
return guessed
|
||||
return ".png"
|
||||
|
||||
|
||||
def _extension_from_output_format(output_format: str) -> str:
|
||||
if output_format == "jpeg":
|
||||
return ".jpg"
|
||||
if output_format == "webp":
|
||||
return ".webp"
|
||||
return ".png"
|
||||
|
||||
|
||||
def _openai_response_value(item, key: str):
|
||||
if isinstance(item, dict):
|
||||
return item.get(key)
|
||||
return getattr(item, key, None)
|
||||
|
||||
|
||||
def _write_openai_b64_image(b64_json: str, output_format: str) -> str:
|
||||
encoded = b64_json.strip()
|
||||
suffix = _extension_from_output_format(output_format)
|
||||
if encoded.startswith("data:"):
|
||||
header, encoded = encoded.split(",", 1)
|
||||
mime_type = header[5:].split(";", 1)[0].strip().lower()
|
||||
if mime_type:
|
||||
suffix = _extension_from_mime(mime_type)
|
||||
|
||||
encoded = "".join(encoded.split())
|
||||
padding = len(encoded) % 4
|
||||
if padding:
|
||||
encoded = f"{encoded}{'=' * (4 - padding)}"
|
||||
|
||||
image_bytes = base64.b64decode(encoded)
|
||||
with tempfile.NamedTemporaryFile(prefix="wechat-openai-image-", suffix=suffix, delete=False) as temp_file:
|
||||
temp_file.write(image_bytes)
|
||||
return temp_file.name
|
||||
|
||||
|
||||
def _openai_images_from_response(response, output_format: str) -> list[str]:
|
||||
outputs: list[str] = []
|
||||
try:
|
||||
for item in getattr(response, "data", []) or []:
|
||||
b64_json = _openai_response_value(item, "b64_json")
|
||||
if b64_json:
|
||||
outputs.append(_write_openai_b64_image(str(b64_json), output_format))
|
||||
continue
|
||||
|
||||
url = _openai_response_value(item, "url")
|
||||
if url:
|
||||
outputs.append(_rewrite_openai_image_url(str(url)))
|
||||
except Exception:
|
||||
_cleanup_openai_temp_files(outputs)
|
||||
raise
|
||||
return outputs
|
||||
|
||||
|
||||
def _is_remote_image_url(value: str) -> bool:
|
||||
return urllib.parse.urlparse(value).scheme in {"http", "https"}
|
||||
|
||||
|
||||
def _send_image_outputs(client_port: str, from_wx_id: str, image_outputs: list[str]) -> None:
|
||||
remote_urls = [value for value in image_outputs if value and _is_remote_image_url(value)]
|
||||
local_paths = [value for value in image_outputs if value and not _is_remote_image_url(value)]
|
||||
|
||||
if remote_urls:
|
||||
send_url = f"http://127.0.0.1:{client_port}/api/v1/robot/message/send/image/url"
|
||||
send_body = {
|
||||
"to_wxid": from_wx_id,
|
||||
"image_urls": remote_urls,
|
||||
}
|
||||
response = _http_post_json(send_url, send_body, {"Content-Type": "application/json"}, timeout=300)
|
||||
_debug_response("send image url response", response)
|
||||
|
||||
for file_path in local_paths:
|
||||
send_url = f"http://127.0.0.1:{client_port}/api/v1/robot/message/send/image/local"
|
||||
send_body = {
|
||||
"to_wxid": from_wx_id,
|
||||
"file_path": file_path,
|
||||
}
|
||||
response = _http_post_json(send_url, send_body, {"Content-Type": "application/json"}, timeout=300)
|
||||
_debug_response("send image local response", response)
|
||||
|
||||
|
||||
def _cleanup_openai_temp_files(image_outputs: list[str]) -> None:
|
||||
for value in image_outputs:
|
||||
path = Path(value)
|
||||
if path.name.startswith("wechat-openai-image-") and path.is_file():
|
||||
try:
|
||||
path.unlink()
|
||||
except OSError:
|
||||
pass
|
||||
|
||||
|
||||
def call_jimeng(config: dict, prompt: str, model: str,
|
||||
negative_prompt: str, ratio: str, resolution: str) -> list[str]:
|
||||
"""Call JiMeng (即梦) image generation API."""
|
||||
@ -527,43 +315,13 @@ def call_zimage(config: dict, prompt: str, model: str) -> list[str]:
|
||||
raise RuntimeError("造相绘图任务超时")
|
||||
|
||||
|
||||
def call_openai(config: dict, prompt: str, model: str,
|
||||
negative_prompt: str, ratio: str, resolution: str) -> list[str]:
|
||||
"""Call OpenAI GPT Image API for text-to-image generation."""
|
||||
client = _openai_client(config)
|
||||
output_format = _openai_output_format(config)
|
||||
quality = str(config.get("quality", "auto") or "auto")
|
||||
moderation = str(config.get("moderation", "auto") or "auto")
|
||||
background = str(config.get("background", "auto") or "auto")
|
||||
if background == "transparent":
|
||||
background = "auto"
|
||||
|
||||
kwargs = {
|
||||
"model": model or "gpt-image-2",
|
||||
"prompt": _openai_prompt(prompt, negative_prompt),
|
||||
"n": _coerce_int(config.get("n"), 1, 1, 10),
|
||||
"size": _openai_size(config, ratio, resolution),
|
||||
"quality": quality,
|
||||
"background": background,
|
||||
"moderation": moderation,
|
||||
"output_format": output_format,
|
||||
}
|
||||
if output_format in {"jpeg", "webp"} and config.get("output_compression") is not None:
|
||||
kwargs["output_compression"] = _coerce_int(config.get("output_compression"), 100, 0, 100)
|
||||
|
||||
response = client.images.generate(**kwargs)
|
||||
_debug_response("openai images.generate response", response)
|
||||
return _openai_images_from_response(response, output_format)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Main
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
JIMENG_MODELS = {"jimeng-4.5", "jimeng-4.6", "jimeng-4.7", "jimeng-5.0"}
|
||||
JIMENG_MODELS = {"jimeng-4.5", "jimeng-4.6", "jimeng-5.0"}
|
||||
DOUBAO_MODELS = {"doubao-seedream-4.5", "doubao-seedream-4.0", "doubao-seedream-3.0-t2i", "doubao-seededit-3.0-i2i"}
|
||||
ZIMAGE_MODELS = {"Z-Image", "Z-Image-Turbo", "Qwen-Image-Edit-2511"}
|
||||
OPENAI_MODELS = {"gpt-image-2"}
|
||||
|
||||
|
||||
def _parse_cli_params(argv: list[str]) -> dict[str, str]:
|
||||
@ -665,13 +423,6 @@ def main() -> int:
|
||||
return 0
|
||||
image_urls = call_zimage(zimage_config, prompt, model)
|
||||
|
||||
elif model in OPENAI_MODELS:
|
||||
openai_config = settings_json.get("OpenAI", {})
|
||||
if not openai_config.get("enabled", False):
|
||||
sys.stdout.write("OpenAI 绘图未开启\n")
|
||||
return 0
|
||||
image_urls = call_openai(openai_config, prompt, model, negative_prompt, ratio, resolution)
|
||||
|
||||
else:
|
||||
sys.stdout.write("不支持的 AI 图像模型\n")
|
||||
return 1
|
||||
@ -687,18 +438,20 @@ def main() -> int:
|
||||
# 通过客户端接口发送图片
|
||||
client_port = os.environ.get("ROBOT_WECHAT_CLIENT_PORT", "").strip()
|
||||
if not client_port:
|
||||
_cleanup_openai_temp_files(image_urls)
|
||||
sys.stdout.write("环境变量 ROBOT_WECHAT_CLIENT_PORT 未配置\n")
|
||||
return 1
|
||||
|
||||
send_url = f"http://127.0.0.1:{client_port}/api/v1/robot/message/send/image/url"
|
||||
send_body = {
|
||||
"to_wxid": from_wx_id,
|
||||
"image_urls": [u for u in image_urls if u],
|
||||
}
|
||||
try:
|
||||
_send_image_outputs(client_port, from_wx_id, image_urls)
|
||||
_http_post_json(send_url, send_body, {"Content-Type": "application/json"}, timeout=60)
|
||||
sys.stdout.write("图片发送成功\n")
|
||||
except Exception as exc:
|
||||
sys.stdout.write(f"发送图片失败: {exc}\n")
|
||||
return 1
|
||||
finally:
|
||||
_cleanup_openai_temp_files(image_urls)
|
||||
|
||||
return 0
|
||||
|
||||
|
||||
@ -111,70 +111,6 @@ argument-hint: "需要 content;可选 emotion、voice、style_prompt、voice_p
|
||||
9. 不要传递音色复刻音频参数。若当前消息引用了一条语音消息,脚本会通过 `ROBOT_REF_MESSAGE_ID` 自动判断并下载引用语音作为复刻样本。
|
||||
10. `content` 超过 260 个字符时,不应该调用本技能。
|
||||
|
||||
## 音频标签控制
|
||||
|
||||
通过在文本中嵌入风格标签与音频标签,直接对语音进行精细控制。开头是整体风格标签,中间可以插入细粒度控制标签。
|
||||
|
||||
在目标文本开头添加 `(风格)` 标签,即可指定语音的发音风格。支持同时设置多种风格,将多个风格名称置于同一对括号内,分隔符不限。
|
||||
|
||||
支持的括号格式: 可使用半角 `()`、全角 `()` 或 `[]`。
|
||||
|
||||
### 格式示例
|
||||
|
||||
```
|
||||
风格类型 风格示例
|
||||
基础情绪 开心/悲伤/愤怒/恐惧/惊讶/兴奋/委屈/平静/冷漠
|
||||
复合情绪 怅然/欣慰/无奈/愧疚/释然/嫉妒/厌倦/忐忑/动情
|
||||
整体语调 温柔/高冷/活泼/严肃/慵懒/俏皮/深沉/干练/凌厉
|
||||
音色定位 磁性/醇厚/清亮/空灵/稚嫩/苍老/甜美/沙哑/醇雅
|
||||
人设腔调 夹子音/御姐音/正太音/大叔音/台湾腔
|
||||
方言 东北话/四川话/河南话/粤语
|
||||
角色扮演 孙悟空/林黛玉
|
||||
唱歌 唱歌
|
||||
```
|
||||
|
||||
样例:
|
||||
|
||||
- (怅然)这么多年过去了,再走过那条街,心里一下子空了一块。
|
||||
|
||||
- (慵懒)再让我睡五分钟……就五分钟,真的,最后一次。
|
||||
|
||||
- (磁性)夜已经深了,城市还在呼吸。我是今晚陪你的人,欢迎收听《午夜电台》。
|
||||
|
||||
- (东北话)哎呀妈呀,这天儿也忒冷了吧!你说这风,嗖嗖的,跟刀子似的,割脸啊!
|
||||
|
||||
- (粤语)呢个真係好正啊!食过一次就唔会忘记!
|
||||
|
||||
- (唱歌)原谅我这一生不羁放纵爱自由,也会怕有一天会跌倒,Oh no。背弃了理想,谁人都可以,哪会怕有一天只你共我。
|
||||
|
||||
在此基础上,我们还支持在文本中任意位置插入 [音频标签]。通过 [音频标签] ,你可以对声音进行细粒度控制,精准调节语气、情绪和表达风格——无论是低声耳语、放声大笑,还是带点小情绪的小吐槽,也可以灵活插入呼吸声,停顿,咳嗽等,都能轻松实现。语速同样可以灵活调整,让每句话都有它该有的节奏。
|
||||
|
||||
```
|
||||
风格类型 风格示例
|
||||
语速与节奏 吸气/深呼吸/叹气/长叹一口气/喘息/屏息
|
||||
情绪状态 紧张/害怕/激动/疲惫/委屈/撒娇/心虚/震惊/不耐烦
|
||||
语音特征 颤抖/声音颤抖/变调/破音/鼻音/气声/沙哑
|
||||
哭笑表达 笑/轻笑/大笑/冷笑/抽泣/呜咽/哽咽/嚎啕大哭
|
||||
```
|
||||
|
||||
样例:
|
||||
|
||||
- (紧张,深呼吸)呼……冷静,冷静。不就是一个面试吗……(语速加快,碎碎念)自我介绍已经背了五十遍了,应该没问题的。加油,你可以的……(小声)哎呀,领带歪没歪?
|
||||
|
||||
- (极其疲惫,有气无力)师傅……到地方了叫我一声……(长叹一口气)我先眯一会儿,这班加得我魂儿都要散了。
|
||||
|
||||
- 如果我当时……(沉默片刻)哪怕再坚持一秒钟,结果是不是就不一样了?(苦笑)呵,没如果了。
|
||||
|
||||
- (寒冷导致的急促呼吸)呼——呼——这、这大兴安岭的雪……(咳嗽)简直能把人骨头冻透了……别、别停下,走,快走。
|
||||
|
||||
- (提高音量喊话)大姐!这鱼新鲜着呢!早上刚捞上来的!哎!那个谁,别乱翻,压坏了你赔啊?!
|
||||
|
||||
### 特别注意
|
||||
|
||||
- 只有`mimo-v2.5-tts`模型支持唱歌模式
|
||||
|
||||
- 如需体验更佳的唱歌风格,必须在目标文本最开头添加 `(唱歌)` 标签,格式为:`(唱歌)歌词`。歌词 建议采用中文,可获得更优合成效果。标签内标识支持以下取值,效果等效:`唱歌`、`sing`、`singing`
|
||||
|
||||
## 执行步骤
|
||||
|
||||
1. 识别用户是否明确需要语音消息。
|
||||
|
||||
@ -740,6 +740,14 @@ def synthesize_audio_mimo(config: dict, params: dict) -> tuple[bytes, str]:
|
||||
|
||||
url = f"{base_url}/chat/completions"
|
||||
payload, audio_format, stream = _build_mimo_payload(config, params)
|
||||
|
||||
sys.stdout.write(
|
||||
f"[mimo debug] config={json.dumps(config, ensure_ascii=False)}\n"
|
||||
f"[mimo debug] url={url}\n"
|
||||
f"[mimo debug] api_key={api_key}\n"
|
||||
f"[mimo debug] model={payload.get('model')}\n"
|
||||
)
|
||||
|
||||
request_data = json.dumps(payload, ensure_ascii=False).encode("utf-8")
|
||||
|
||||
req = urllib.request.Request(
|
||||
@ -747,7 +755,7 @@ def synthesize_audio_mimo(config: dict, params: dict) -> tuple[bytes, str]:
|
||||
data=request_data,
|
||||
headers={
|
||||
"Content-Type": "application/json",
|
||||
"Authorization": f"Bearer {api_key}",
|
||||
"api-key": api_key,
|
||||
"Accept": "application/json, text/event-stream",
|
||||
"Accept-Encoding": "identity",
|
||||
},
|
||||
|
||||
Loading…
Reference in New Issue
Block a user