Compare commits

No commits in common. "5dbae039d46773b666c834e6b29342ab62c4b88b" and "951096a0f28885e01d8ae128f4d257a1b5a40205" have entirely different histories.

16 changed files with 40 additions and 1049 deletions

.vscode/launch.json (vendored, 4 changed lines)

@@ -9,11 +9,9 @@
             "console": "integratedTerminal",
             "justMyCode": true,
             "args": [
-                "--prompt=马云在直播间卖红薯",
-                "--model=gpt-image-2"
+                "{\"prompt\":\"一只站在雨夜街头的白猫\",\"model\":\"jimeng-5.0\",\"negative_prompt\":\"模糊, 低清\",\"ratio\":\"16:9\",\"resolution\":\"2k\"}"
             ],
             "env": {
-                "ROBOT_WECHAT_CLIENT_PORT": "9001",
                 "ROBOT_FROM_WX_ID": "57004904192@chatroom",
                 "ROBOT_CODE": "houhouipad",
                 "MYSQL_HOST": "127.0.0.1",

@@ -55,8 +55,6 @@
 **You can also call the Agent API when sending images**
-1. Send a remote image URL
 ```
 [POST] http://127.0.0.1:{ROBOT_WECHAT_CLIENT_PORT}/api/v1//robot/message/send/image/url
@@ -69,20 +67,6 @@
 ```
-2. Send a local image path
-```
-[POST] http://127.0.0.1:{ROBOT_WECHAT_CLIENT_PORT}/api/v1//robot/message/send/image/local
-Request body:
-{
-  "to_wxid": "{{ROBOT_FROM_WX_ID}}",
-  "file_path": "{{file_path}}"
-}
-```
 **You can also call the Agent API when sending videos**
 ```

@@ -20,7 +20,7 @@ argument-hint: "No arguments needed; just call it"
 ## API information
-- Image lookup endpoint: `https://api.pearapi.ai/api/today_wife`
+- Image lookup endpoint: `https://api.pearktrue.cn/api/today_wife`
 - Method: `GET`
 - Image send endpoint: `http://127.0.0.1:{ROBOT_WECHAT_CLIENT_PORT}/api/v1/robot/message/send/image/url`
 - Method: `POST`
@@ -32,12 +32,12 @@ argument-hint: "No arguments needed; just call it"
   "code": 200,
   "msg": "获取成功",
   "data": {
-    "image_url": "https://api.pearapi.ai/api_assets/wife/9a6a9c38-7d6e-464f-8930-eb9dac41cde9.webp",
+    "image_url": "https://api.pearktrue.cn/api_assets/wife/9a6a9c38-7d6e-464f-8930-eb9dac41cde9.webp",
     "role_name": "初音未来、巡音流歌",
     "width": 2480,
     "height": 3508
   },
-  "api_source": "官方API网:https://api.pearapi.ai/"
+  "api_source": "官方API网:https://api.pearktrue.cn/"
 }
 ```
@@ -52,7 +52,7 @@ argument-hint: "No arguments needed; just call it"
 1. This skill is triggered when the user sends `999`.
 2. Run the local script from the repository root: `python3 scripts/beauty.py`.
-3. The script sends a `GET` request to `https://api.pearapi.ai/api/today_wife`.
+3. The script sends a `GET` request to `https://api.pearktrue.cn/api/today_wife`.
 4. The script parses the returned JSON and extracts `data.image_url`.
 5. The script reads `ROBOT_WECHAT_CLIENT_PORT` and `ROBOT_FROM_WX_ID` from the environment.
 6. The script sends a `POST` request to `http://127.0.0.1:{ROBOT_WECHAT_CLIENT_PORT}/api/v1/robot/message/send/image/url` with the request body:

@@ -13,7 +13,7 @@ import urllib.request
 sys.stderr = sys.stdout
-FETCH_API_URL = "https://api.pearapi.ai/api/today_wife"
+FETCH_API_URL = "https://api.pearktrue.cn/api/today_wife"
 FALLBACK_TEXT = "今天的美女图片暂时没拿到,等我再找找。"
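
Reviewer sketch: the `today_wife` response documented in this comparison could be handled defensively along these lines. This is a minimal sketch, not the contents of `scripts/beauty.py`. The helper name `extract_image_url` and the `code == 200` success check are assumptions based only on the sample response.

```python
import json
from typing import Optional


def extract_image_url(raw_body: str) -> Optional[str]:
    """Return data.image_url from a today_wife-style response, or None."""
    try:
        payload = json.loads(raw_body)
    except json.JSONDecodeError:
        return None
    # Assumption: a successful response carries "code": 200, as in the sample.
    if not isinstance(payload, dict) or payload.get("code") != 200:
        return None
    data = payload.get("data")
    if not isinstance(data, dict):
        return None
    image_url = data.get("image_url")
    # Treat missing, empty, or non-HTTP values as "no image available".
    if isinstance(image_url, str) and image_url.startswith("http"):
        return image_url
    return None


sample = '{"code": 200, "data": {"image_url": "https://example.com/a.webp"}}'
print(extract_image_url(sample))
```

A caller would fall back to the fixed fallback text whenever the helper returns `None`.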

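@@ -1,53 +0,0 @@
----
-name: douyin-video-parse
-description: "Triggered when the user sends a message containing a Douyin short link (https://v.douyin.com/xxx). Automatically parses the Douyin video or images and sends them to the current user."
-argument-hint: "Triggers automatically when the message contains a Douyin short link"
----
-
-# Douyin Video Parse Skill
-
-## Description
-
-This skill parses Douyin short videos and image posts.
-When a user's message contains an `https://v.douyin.com/` link, the skill parses the video or images behind that link and sends them to the current user through the local WeChat robot API.
-This repository also ships an executable script, `scripts/douyin_video_parse.py`, so the host robot can call it directly.
-
-## Trigger
-
-- The user's message contains an `https://v.douyin.com/` link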
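-
-## How parsing works
-
-1. Request the Douyin short link and follow the 302 redirect to get the real page URL
-2. Fetch the real page's HTML and extract the `window._ROUTER_DATA` JSON from it
-3. Parse the video playback URL or the image list out of that JSON
-4. Send the video or images through the local WeChat robot API
-
-## Environment variables
-
-- `ROBOT_WECHAT_CLIENT_PORT`: port of the local WeChat robot service.
-- `ROBOT_FROM_WX_ID`: wxid of the user the current message came from.
-- `ROBOT_MESSAGE_CONTENT`: the raw message the user sent (used to extract the Douyin link).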
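-
-## Steps
-
-1. The skill is triggered when the user's message contains an `https://v.douyin.com/` link.
-2. Run the local script from the repository root: `python3 scripts/douyin_video_parse.py`.
-3. The script extracts the Douyin short link from the `ROBOT_MESSAGE_CONTENT` environment variable.
-4. The script requests the short link and follows the redirect to get the real page URL.
-5. The script fetches the real page and parses the video or image information in `window._ROUTER_DATA`.
-6. For a video:
-   - First send the share-card link
-   - Then call `POST http://127.0.0.1:{ROBOT_WECHAT_CLIENT_PORT}/api/v1/robot/message/send/video/url` to send the video
-7. For images:
-   - Send a text notice (author, title, number of images)
-   - Call `POST http://127.0.0.1:{ROBOT_WECHAT_CLIENT_PORT}/api/v1/robot/message/send/image/url` to send the images one by one
-8. If parsing fails, reply with the fallback text: `抖音解析失败,可能是链接已失效或格式不正确。`
-
-## Reply requirements
-
-- Video: send the video file along with the author and title.
-- Images: send every image along with the author and title.
-- On failure, reply with the fixed fallback text.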
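
Reviewer sketch: the extraction step described in this removed skill file (pull the `window._ROUTER_DATA` JSON out of the page HTML, then take the first item) can be illustrated without any network access. The regular expression and the `loaderData` / `videoInfoRes` / `item_list` keys are the ones used by the removed `scripts/douyin_video_parse.py`. The function names and the sample HTML are made up for illustration, and real Douyin pages may differ.

```python
import json
import re
from typing import Optional

# Capture the JSON object assigned to window._ROUTER_DATA, up to </script>.
ROUTER_DATA_RE = re.compile(r"(?s)window\._ROUTER_DATA\s*=\s*(\{.*?\})\s*</script>")


def extract_router_data(page_html: str) -> Optional[dict]:
    """Return the parsed _ROUTER_DATA object, or None if absent or invalid."""
    match = ROUTER_DATA_RE.search(page_html)
    if not match:
        return None
    try:
        return json.loads(match.group(1))
    except json.JSONDecodeError:
        return None


def first_item(router_data: dict) -> Optional[dict]:
    """Return the first entry of any loaderData page's videoInfoRes.item_list."""
    for page_data in router_data.get("loaderData", {}).values():
        if not isinstance(page_data, dict):
            continue
        items = page_data.get("videoInfoRes", {}).get("item_list", [])
        if items:
            return items[0]
    return None


# Hypothetical page fragment, shaped only to exercise the helpers.
sample_html = (
    '<script>window._ROUTER_DATA = {"loaderData": {"page": {"videoInfoRes": '
    '{"item_list": [{"desc": "demo"}]}}}}</script>'
)
router_data = extract_router_data(sample_html)
item = first_item(router_data) if router_data else None
print(item)
```

The pattern is non-greedy, so it relies on the object's closing brace sitting directly before `</script>`. Any failure returns `None`, which maps to the fallback reply.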

@@ -1,345 +0,0 @@
-#!/usr/bin/env python3
-from __future__ import annotations
-
-import html
-import json
-import os
-import re
-import sys
-import traceback
-import urllib.error
-import urllib.parse
-import urllib.request
-
-sys.stderr = sys.stdout
-
-DOUYIN_USER_AGENT = (
-    "Mozilla/5.0 (iPhone; CPU iPhone OS 14_0 like Mac OS X) "
-    "AppleWebKit/605.1.15 (KHTML, like Gecko) "
-    "Version/14.0 Mobile/15E148 Safari/604.1"
-)
-DOUYIN_REFERER = "https://www.douyin.com/"
-FALLBACK_TEXT = "抖音解析失败,可能是链接已失效或格式不正确。"
-ROUTER_DATA_RE = re.compile(r"(?s)window\._ROUTER_DATA\s*=\s*(\{.*?\})\s*</script>")
-DOUYIN_URL_RE = re.compile(r"https://[^\s]+")
-
-def build_request(url: str) -> urllib.request.Request:
-    return urllib.request.Request(
-        url,
-        headers={
-            "User-Agent": DOUYIN_USER_AGENT,
-            "Referer": DOUYIN_REFERER,
-        },
-    )
-
-def resolve_redirect(short_url: str) -> str | None:
-    """Follow the 302 redirect to get the real page URL."""
-    class NoRedirectHandler(urllib.request.HTTPRedirectHandler):
-        def redirect_request(self, req, fp, code, msg, headers, newurl):
-            return None
-
-    opener = urllib.request.build_opener(NoRedirectHandler)
-    req = build_request(short_url)
-    try:
-        response = opener.open(req, timeout=15)
-        return response.url
-    except urllib.error.HTTPError as e:
-        location = e.headers.get("Location")
-        if location:
-            return location
-        return None
-    except (urllib.error.URLError, TimeoutError):
-        return None
-
-def fetch_page_html(page_url: str) -> str | None:
-    """Fetch the Douyin page HTML content."""
-    req = build_request(page_url)
-    try:
-        with urllib.request.urlopen(req, timeout=15) as response:
-            if response.status != 200:
-                return None
-            return response.read().decode("utf-8", errors="replace")
-    except (urllib.error.URLError, TimeoutError):
-        return None
-
-def decode_escaped_value(value: str) -> str:
-    """Decode HTML entities and JSON escape sequences."""
-    decoded = html.unescape(value)
-    if "\\" in decoded:
-        try:
-            unquoted = json.loads('"' + decoded.replace('"', '\\"') + '"')
-            decoded = unquoted
-        except (json.JSONDecodeError, ValueError):
-            pass
-    return html.unescape(decoded)
-
-def pick_preferred_url(urls: list[str]) -> str:
-    """Pick the best URL from a list, preferring p26 CDN."""
-    first_url = ""
-    for raw_url in urls:
-        if not raw_url:
-            continue
-        decoded_url = decode_escaped_value(raw_url)
-        if not decoded_url:
-            continue
-        if decoded_url.startswith("https://p26"):
-            return decoded_url
-        if not first_url:
-            first_url = decoded_url
-    return first_url
-
-def pick_video_url(urls: list[str]) -> str:
-    """Pick the best video URL, preferring aweme.snssdk.com."""
-    decoded_urls = []
-    for raw_url in urls:
-        if not raw_url:
-            continue
-        decoded_url = decode_escaped_value(raw_url).replace("playwm", "play")
-        decoded_urls.append(decoded_url)
-    for url in decoded_urls:
-        if "aweme.snssdk.com" in url:
-            return url
-    return decoded_urls[0] if decoded_urls else ""
-
-def extract_aweme_item(html_content: str) -> dict | None:
-    """Extract the first aweme item from _ROUTER_DATA."""
-    match = ROUTER_DATA_RE.search(html_content)
-    if not match:
-        return None
-    try:
-        router_data = json.loads(match.group(1))
-    except json.JSONDecodeError:
-        return None
-    loader_data = router_data.get("loaderData", {})
-    for page_data in loader_data.values():
-        if not isinstance(page_data, dict):
-            continue
-        video_info_res = page_data.get("videoInfoRes", {})
-        item_list = video_info_res.get("item_list", [])
-        if item_list:
-            return item_list[0]
-    return None
-
-def parse_note_item(item: dict) -> dict | None:
-    """Parse image/note type content."""
-    images = item.get("images") or item.get("image_infos") or []
-    if not images:
-        return None
-    image_urls = []
-    seen = set()
-    for img_info in images:
-        url_list = img_info.get("url_list", [])
-        for url in url_list:
-            if url and url.startswith("http"):
-                decoded = html.unescape(url)
-                if decoded not in seen:
-                    image_urls.append(decoded)
-                    seen.add(decoded)
-                break
-    if not image_urls:
-        return None
-    author = item.get("author", {})
-    music = item.get("music", {})
-    music_url = pick_preferred_url(music.get("play_url", {}).get("url_list", []))
-    # Fallback music URL from video play_addr
-    if not music_url:
-        video = item.get("video", {})
-        play_addr = video.get("play_addr", {})
-        uri = play_addr.get("uri", "")
-        if uri.startswith("http"):
-            music_url = decode_escaped_value(uri)
-        else:
-            music_url = pick_preferred_url(play_addr.get("url_list", []))
-    return {
-        "type": "note",
-        "author": html.unescape(author.get("nickname", "")),
-        "title": html.unescape(item.get("desc", "")),
-        "images": image_urls,
-        "music_url": music_url,
-    }
-
-def parse_video_item(item: dict) -> dict | None:
-    """Parse video type content."""
-    video = item.get("video", {})
-    duration = video.get("duration")
-    if duration is not None and duration == 0:
-        return None
-    play_addr = video.get("play_addr", {})
-    video_url = pick_video_url(play_addr.get("url_list", []))
-    if not video_url:
-        return None
-    author = item.get("author", {})
-    return {
-        "type": "video",
-        "author": html.unescape(author.get("nickname", "")),
-        "title": html.unescape(item.get("desc", "")),
-        "url": video_url,
-        "cover": pick_preferred_url(video.get("cover", {}).get("url_list", [])),
-    }
-
-def parse_douyin(short_url: str) -> dict | None:
-    """Main parsing logic: resolve redirect -> fetch HTML -> extract data."""
-    resolved_url = resolve_redirect(short_url)
-    if not resolved_url:
-        return None
-    html_content = fetch_page_html(resolved_url)
-    if not html_content:
-        return None
-    item = extract_aweme_item(html_content)
-    if not item:
-        return None
-    # Try note (images) first, then video
-    result = parse_note_item(item)
-    if result:
-        return result
-    result = parse_video_item(item)
-    if result:
-        return result
-    return None
-
-def send_video(video_url: str, robot_port: str, to_wxid: str) -> bool:
-    """Send video via local robot API."""
-    api_url = f"http://127.0.0.1:{robot_port}/api/v1/robot/message/send/video/url"
-    body = json.dumps({
-        "to_wxid": to_wxid,
-        "video_urls": [video_url],
-    }).encode("utf-8")
-    request = urllib.request.Request(
-        api_url,
-        data=body,
-        headers={"Content-Type": "application/json"},
-        method="POST",
-    )
-    try:
-        with urllib.request.urlopen(request, timeout=60) as response:
-            return 200 <= response.status < 300
-    except (urllib.error.URLError, TimeoutError):
-        return False
-
-def send_images(image_urls: list[str], robot_port: str, to_wxid: str) -> bool:
-    """Send images via local robot API."""
-    api_url = f"http://127.0.0.1:{robot_port}/api/v1/robot/message/send/image/url"
-    body = json.dumps({
-        "to_wxid": to_wxid,
-        "image_urls": image_urls,
-    }).encode("utf-8")
-    request = urllib.request.Request(
-        api_url,
-        data=body,
-        headers={"Content-Type": "application/json"},
-        method="POST",
-    )
-    try:
-        with urllib.request.urlopen(request, timeout=60) as response:
-            return 200 <= response.status < 300
-    except (urllib.error.URLError, TimeoutError):
-        return False
-
-def send_text(text: str, robot_port: str, to_wxid: str) -> bool:
-    """Send text message via local robot API."""
-    api_url = f"http://127.0.0.1:{robot_port}/api/v1/robot/message/send/text"
-    body = json.dumps({
-        "to_wxid": to_wxid,
-        "content": text,
-    }).encode("utf-8")
-    request = urllib.request.Request(
-        api_url,
-        data=body,
-        headers={"Content-Type": "application/json"},
-        method="POST",
-    )
-    try:
-        with urllib.request.urlopen(request, timeout=10) as response:
-            return 200 <= response.status < 300
-    except (urllib.error.URLError, TimeoutError):
-        return False
-
-def main() -> int:
-    robot_port = os.environ.get("ROBOT_WECHAT_CLIENT_PORT", "").strip()
-    to_wxid = os.environ.get("ROBOT_FROM_WX_ID", "").strip()
-    message_content = os.environ.get("ROBOT_MESSAGE_CONTENT", "").strip()
-    if not robot_port or not to_wxid or not message_content:
-        sys.stdout.write(FALLBACK_TEXT + "\n")
-        return 0
-    # Extract douyin URL from message
-    matches = DOUYIN_URL_RE.findall(message_content)
-    douyin_urls = [u for u in matches if "v.douyin.com" in u]
-    if not douyin_urls:
-        sys.stdout.write(FALLBACK_TEXT + "\n")
-        return 0
-    douyin_url = douyin_urls[0]
-    result = parse_douyin(douyin_url)
-    if not result:
-        sys.stdout.write(FALLBACK_TEXT + "\n")
-        return 0
-    if result["type"] == "video":
-        # Send info text
-        info_text = f"抖音视频解析成功\n作者: {result['author']}\n标题: {result['title']}"
-        send_text(info_text, robot_port, to_wxid)
-        # Send video
-        if not send_video(result["url"], robot_port, to_wxid):
-            sys.stdout.write("发送抖音视频失败,请稍后重试。\n")
-            return 0
-    elif result["type"] == "note":
-        # Send info text
-        info_text = (
-            f"抖音图片解析成功\n"
-            f"作者: {result['author']}\n"
-            f"标题: {result['title']}\n\n"
-            f"{len(result['images'])}张图片正在发送中..."
-        )
-        send_text(info_text, robot_port, to_wxid)
-        # Send images
-        if not send_images(result["images"], robot_port, to_wxid):
-            sys.stdout.write("发送抖音图片失败,请稍后重试。\n")
-            return 0
-    return 0
-
-if __name__ == "__main__":
-    try:
-        raise SystemExit(main())
-    except SystemExit:
-        raise
-    except Exception:
-        traceback.print_exc(file=sys.stdout)
-        raise SystemExit(1)
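
Reviewer sketch: the link-extraction step at the top of `main()` in the removed script can be tried in isolation. The pattern is the script's own `DOUYIN_URL_RE`. The helper name `find_douyin_short_link` and the sample share text are hypothetical.

```python
import re
from typing import Optional

# Same pattern as DOUYIN_URL_RE: an https URL runs until the next whitespace.
URL_RE = re.compile(r"https://[^\s]+")


def find_douyin_short_link(message: str) -> Optional[str]:
    """Return the first https URL in the message that points at v.douyin.com."""
    for candidate in URL_RE.findall(message):
        if "v.douyin.com" in candidate:
            return candidate
    return None


# Made-up share text: the link is separated from the prose by spaces.
message = "7.43 复制打开抖音 https://v.douyin.com/AbCdEf/ 看看这个视频"
print(find_douyin_short_link(message))
```

Because the pattern stops only at whitespace, text glued directly onto the link with no space is captured as part of the URL. A stricter character class would be needed if share messages ever omit the separator.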

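@@ -10,7 +10,7 @@ argument-hint: "Requires prompt (the text prompt) and images (a list of image links
 This is an AI image-to-image skill: it generates a new image from one or more input images combined with a text prompt. It supports creation modes such as image blending, style transfer, and content compositing.
-Supported drawing models: JiMeng (即梦), DouBao (豆包), Z-Image (造相), and OpenAI GPT Image.
+Supported drawing models: JiMeng (即梦), DouBao (豆包), and Z-Image (造相).
 It reads the drawing configuration (API key, Base URL, etc.) from the database, calls the drawing API that matches the model the user selected, and returns the generated image URLs.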
@@ -37,18 +37,16 @@ argument-hint: "Requires prompt (the text prompt) and images (a list of image links
     },
     "model": {
       "type": "string",
-      "description": "Drawing model. Options: JiMeng 4.5 (jimeng-4.5) / JiMeng 4.6 (jimeng-4.6) / JiMeng 4.7 (jimeng-4.7) / JiMeng 5.0 (jimeng-5.0) / DouBao image-to-image (doubao-seededit-3.0-i2i) / Z-Image base (Z-Image) / Z-Image distilled (Z-Image-Turbo) / Z-Image image editing (Qwen-Image-Edit-2511) / OpenAI GPT Image (gpt-image-2). Default: empty (none).",
+      "description": "Drawing model. Options: JiMeng 4.5 (jimeng-4.5) / JiMeng 4.6 (jimeng-4.6) / JiMeng 5.0 (jimeng-5.0) / DouBao image-to-image (doubao-seededit-3.0-i2i) / Z-Image base (Z-Image) / Z-Image distilled (Z-Image-Turbo) / Z-Image image editing (Qwen-Image-Edit-2511). Default: empty (none).",
       "enum": [
         "none",
         "jimeng-4.5",
         "jimeng-4.6",
-        "jimeng-4.7",
         "jimeng-5.0",
         "doubao-seededit-3.0-i2i",
         "Z-Image",
         "Z-Image-Turbo",
-        "Qwen-Image-Edit-2511",
-        "gpt-image-2"
+        "Qwen-Image-Edit-2511"
       ],
       "default": "none"
     },

@@ -3,17 +3,13 @@
 from __future__ import annotations
 import argparse
-import base64
 import json
-import mimetypes
 import os
 import re
 import subprocess
 import sys
-import tempfile
 import time
 import traceback
-import urllib.parse
 import urllib.request
 from pathlib import Path
@@ -71,7 +67,6 @@ _ensure_skill_venv_python()
 try:
     import pymysql  # type: ignore  # noqa: E402
-    from openai import OpenAI  # type: ignore  # noqa: E402
 except ModuleNotFoundError:
     _run_bootstrap()
     _py = _get_python_executable()
@@ -172,240 +167,6 @@ def _http_get_json(url: str, headers: dict, timeout: int = 30) -> dict:
         return json.loads(resp.read().decode("utf-8"))
-
-def _coerce_int(value, default: int, minimum: int, maximum: int) -> int:
-    try:
-        parsed = int(value)
-    except (TypeError, ValueError):
-        parsed = default
-    return min(max(parsed, minimum), maximum)
-
-def _openai_output_format(config: dict) -> str:
-    output_format = str(config.get("output_format", "png") or "png").lower()
-    if output_format not in {"png", "jpeg", "webp"}:
-        return "png"
-    return output_format
-
-def _openai_size(config: dict, ratio: str, resolution: str) -> str:
-    configured = str(config.get("size", "") or "").strip()
-    if configured:
-        return configured
-    normalized_ratio = (ratio or "").replace(" ", "").lower()
-    normalized_resolution = (resolution or "").replace(" ", "").lower()
-    if normalized_resolution in {"4k", "2160p", "3840x2160"}:
-        sizes = {
-            "16:9": "3840x2160",
-            "9:16": "2160x3840",
-            "1:1": "2048x2048",
-            "3:2": "3072x2048",
-            "2:3": "2048x3072",
-        }
-    elif normalized_resolution in {"2k", "1440p", "2048"}:
-        sizes = {
-            "16:9": "2048x1152",
-            "9:16": "1152x2048",
-            "1:1": "2048x2048",
-            "3:2": "2048x1360",
-            "2:3": "1360x2048",
-        }
-    elif normalized_resolution in {"1k", "1024", "1024p"}:
-        sizes = {
-            "16:9": "1536x864",
-            "9:16": "864x1536",
-            "1:1": "1024x1024",
-            "3:2": "1536x1024",
-            "2:3": "1024x1536",
-        }
-    else:
-        return "auto"
-    return sizes.get(normalized_ratio, "auto")
-
-def _openai_prompt(prompt: str, negative_prompt: str) -> str:
-    if not negative_prompt:
-        return prompt
-    return f"{prompt}\n\n不要包含: {negative_prompt}"
-
-def _openai_client(config: dict) -> OpenAI:
-    api_key = str(config.get("api_key", "")).strip()
-    if not api_key:
-        raise RuntimeError("OpenAI 绘图配置缺少 api_key")
-    base_url = str(config.get("base_url", "") or "").strip()
-    organization = str(config.get("organization", "") or "").strip()
-    project = str(config.get("project", "") or "").strip()
-    timeout: float | None = None
-    timeout_value = config.get("timeout")
-    if timeout_value not in (None, ""):
-        timeout = float(timeout_value)
-    return OpenAI(
-        api_key=api_key,
-        base_url=base_url or None,
-        organization=organization or None,
-        project=project or None,
-        timeout=timeout,
-    )
-
-def _truncate_debug_payload(value):
-    if isinstance(value, dict):
-        return {
-            key: (
-                f"{item[:50]}..." if key == "b64_json" and isinstance(item, str) and len(item) > 50 else _truncate_debug_payload(item)
-            )
-            for key, item in value.items()
-        }
-    if isinstance(value, list):
-        return [_truncate_debug_payload(item) for item in value]
-    return value
-
-def _debug_response(label: str, payload) -> None:
-    if hasattr(payload, "model_dump"):
-        payload = payload.model_dump()
-    payload = _truncate_debug_payload(payload)
-    sys.stdout.write(f"[debug] {label}: {json.dumps(payload, ensure_ascii=False)}\n")
-
-def _rewrite_openai_image_url(url: str) -> str:
-    internal_host = "http://chatgpt2api:80"
-    external_host = "https://chatgpt2api.houhoukang.com"
-    if url.startswith(internal_host):
-        return f"{external_host}{url[len(internal_host):]}"
-    return url
-
-def _extension_from_output_format(output_format: str) -> str:
-    if output_format == "jpeg":
-        return ".jpg"
-    if output_format == "webp":
-        return ".webp"
-    return ".png"
-
-def _openai_response_value(item, key: str):
-    if isinstance(item, dict):
-        return item.get(key)
-    return getattr(item, key, None)
-
-def _write_openai_b64_image(b64_json: str, output_format: str) -> str:
-    encoded = b64_json.strip()
-    suffix = _extension_from_output_format(output_format)
-    if encoded.startswith("data:"):
-        header, encoded = encoded.split(",", 1)
-        mime_type = header[5:].split(";", 1)[0].strip().lower()
-        if mime_type:
-            suffix = _extension_from_mime(mime_type)
-    encoded = "".join(encoded.split())
-    padding = len(encoded) % 4
-    if padding:
-        encoded = f"{encoded}{'=' * (4 - padding)}"
-    image_bytes = base64.b64decode(encoded)
-    with tempfile.NamedTemporaryFile(prefix="wechat-openai-image-", suffix=suffix, delete=False) as temp_file:
-        temp_file.write(image_bytes)
-        return temp_file.name
-
-def _openai_images_from_response(response, output_format: str) -> list[str]:
-    outputs: list[str] = []
-    try:
-        for item in getattr(response, "data", []) or []:
-            b64_json = _openai_response_value(item, "b64_json")
-            if b64_json:
-                outputs.append(_write_openai_b64_image(str(b64_json), output_format))
-                continue
-            url = _openai_response_value(item, "url")
-            if url:
-                outputs.append(_rewrite_openai_image_url(str(url)))
-    except Exception:
-        _cleanup_openai_temp_files(outputs)
-        raise
-    return outputs
-
-def _is_remote_image_url(value: str) -> bool:
-    return urllib.parse.urlparse(value).scheme in {"http", "https"}
-
-def _send_image_outputs(client_port: str, from_wx_id: str, image_outputs: list[str]) -> None:
-    remote_urls = [value for value in image_outputs if value and _is_remote_image_url(value)]
-    local_paths = [value for value in image_outputs if value and not _is_remote_image_url(value)]
-    if remote_urls:
-        send_url = f"http://127.0.0.1:{client_port}/api/v1/robot/message/send/image/url"
-        send_body = {
-            "to_wxid": from_wx_id,
-            "image_urls": remote_urls,
-        }
-        response = _http_post_json(send_url, send_body, {"Content-Type": "application/json"}, timeout=300)
-        _debug_response("send image url response", response)
-    for file_path in local_paths:
-        send_url = f"http://127.0.0.1:{client_port}/api/v1/robot/message/send/image/local"
-        send_body = {
-            "to_wxid": from_wx_id,
-            "file_path": file_path,
-        }
-        response = _http_post_json(send_url, send_body, {"Content-Type": "application/json"}, timeout=300)
-        _debug_response("send image local response", response)
-
-def _cleanup_openai_temp_files(image_outputs: list[str]) -> None:
-    for value in image_outputs:
-        path = Path(value)
-        if path.name.startswith("wechat-openai-image-") and path.is_file():
-            try:
-                path.unlink()
-            except OSError:
-                pass
-
-def _extension_from_mime(mime_type: str) -> str:
-    if mime_type == "image/jpeg":
-        return ".jpg"
-    guessed = mimetypes.guess_extension(mime_type)
-    if guessed in {".png", ".jpg", ".jpeg", ".webp"}:
-        return guessed
-    return ".png"
-
-def _download_openai_input_image(image: str, directory: str, index: int) -> Path:
-    stripped = image.strip()
-    if stripped.startswith("data:"):
-        header, encoded = stripped.split(",", 1)
-        mime_type = header[5:].split(";", 1)[0] or "image/png"
-        path = Path(directory) / f"input-{index}{_extension_from_mime(mime_type)}"
-        path.write_bytes(base64.b64decode(encoded))
-        return path
-    parsed = urllib.parse.urlparse(stripped)
-    if parsed.scheme in {"http", "https"}:
-        request = urllib.request.Request(stripped, headers={"User-Agent": "wechat-robot-skills/1.0"})
-        with urllib.request.urlopen(request, timeout=60) as response:
-            content_type = response.headers.get("Content-Type", "image/png").split(";", 1)[0].strip()
-            suffix = Path(parsed.path).suffix.lower()
-            if suffix not in {".png", ".jpg", ".jpeg", ".webp"}:
-                suffix = _extension_from_mime(content_type)
-            path = Path(directory) / f"input-{index}{suffix}"
-            path.write_bytes(response.read())
-            return path
-    path = Path(stripped).expanduser()
-    if path.is_file():
-        return path
-    raise RuntimeError(f"无法读取图片: {image}")
-
 def call_jimeng(config: dict, prompt: str, model: str, images: list[str],
                 negative_prompt: str, ratio: str, resolution: str) -> list[str]:
     """Call JiMeng (即梦) image compositions API (图生图)."""
@@ -548,53 +309,13 @@ def call_zimage(config: dict, prompt: str, model: str, images: list[str]) -> lis
     raise RuntimeError("造相绘图任务超时")
-
-def call_openai(config: dict, prompt: str, model: str, images: list[str],
-                negative_prompt: str, ratio: str, resolution: str) -> list[str]:
-    """Call OpenAI GPT Image API for image editing."""
-    client = _openai_client(config)
-    output_format = _openai_output_format(config)
-    quality = str(config.get("quality", "auto") or "auto")
-    background = str(config.get("background", "auto") or "auto")
-    if background == "transparent":
-        background = "auto"
-    with tempfile.TemporaryDirectory() as temp_dir:
-        input_paths = [
-            _download_openai_input_image(image, temp_dir, index)
-            for index, image in enumerate(images[:16], start=1)
-        ]
-        input_files = [path.open("rb") for path in input_paths]
-        try:
-            kwargs = {
-                "model": model or "gpt-image-2",
-                "prompt": _openai_prompt(prompt, negative_prompt),
-                "image": input_files,
-                "n": _coerce_int(config.get("n"), 1, 1, 10),
-                "size": _openai_size(config, ratio, resolution),
-                "quality": quality,
-                "background": background,
-                "output_format": output_format,
-            }
-            if output_format in {"jpeg", "webp"} and config.get("output_compression") is not None:
-                kwargs["output_compression"] = _coerce_int(config.get("output_compression"), 100, 0, 100)
-            response = client.images.edit(**kwargs)
-        finally:
-            for input_file in input_files:
-                input_file.close()
-    _debug_response("openai images.edit response", response)
-    return _openai_images_from_response(response, output_format)
-
 # ---------------------------------------------------------------------------
 # Main
 # ---------------------------------------------------------------------------
-JIMENG_MODELS = {"jimeng-4.5", "jimeng-4.6", "jimeng-4.7", "jimeng-5.0"}
+JIMENG_MODELS = {"jimeng-4.5", "jimeng-4.6", "jimeng-5.0"}
 DOUBAO_MODELS = {"doubao-seededit-3.0-i2i"}
 ZIMAGE_MODELS = {"Z-Image", "Z-Image-Turbo", "Qwen-Image-Edit-2511"}
-OPENAI_MODELS = {"gpt-image-2"}
 def _parse_cli_params(argv: list[str]) -> dict:
@@ -703,13 +424,6 @@ def main() -> int:
             return 0
         image_urls = call_zimage(zimage_config, prompt, model, images)
-    elif model in OPENAI_MODELS:
-        openai_config = settings_json.get("OpenAI", {})
-        if not openai_config.get("enabled", False):
-            sys.stdout.write("OpenAI 绘图未开启\n")
-            return 0
-        image_urls = call_openai(openai_config, prompt, model, images, negative_prompt, ratio, resolution)
     else:
         sys.stdout.write("不支持的 AI 图像模型\n")
         return 1
@@ -725,18 +439,20 @@ def main() -> int:
     # Send the images through the client API
     client_port = os.environ.get("ROBOT_WECHAT_CLIENT_PORT", "").strip()
     if not client_port:
-        _cleanup_openai_temp_files(image_urls)
         sys.stdout.write("环境变量 ROBOT_WECHAT_CLIENT_PORT 未配置\n")
         return 1
+    send_url = f"http://127.0.0.1:{client_port}/api/v1/robot/message/send/image/url"
+    send_body = {
+        "to_wxid": from_wx_id,
+        "image_urls": [u for u in image_urls if u],
+    }
     try:
-        _send_image_outputs(client_port, from_wx_id, image_urls)
+        _http_post_json(send_url, send_body, {"Content-Type": "application/json"}, timeout=60)
         sys.stdout.write("图片发送成功\n")
     except Exception as exc:
         sys.stdout.write(f"发送图片失败: {exc}\n")
         return 1
-    finally:
-        _cleanup_openai_temp_files(image_urls)
     return 0
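
Reviewer sketch: the request that the added side of this hunk sends can be assembled and inspected without contacting the robot service. The endpoint path and body keys are taken from the diff. The helper name `build_send_image_request` and the example wxid are hypothetical, and the repository's own `_http_post_json` helper is not reproduced here.

```python
import json
import urllib.request


def build_send_image_request(client_port: str, to_wxid: str, image_urls: list) -> urllib.request.Request:
    """Build (but do not send) the POST request for the image/url endpoint."""
    send_url = f"http://127.0.0.1:{client_port}/api/v1/robot/message/send/image/url"
    body = {
        "to_wxid": to_wxid,
        "image_urls": [u for u in image_urls if u],  # drop empty entries
    }
    return urllib.request.Request(
        send_url,
        data=json.dumps(body).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


request = build_send_image_request("9001", "example@chatroom", ["https://example.com/a.png", ""])
print(request.full_url)
print(request.get_method())
print(json.loads(request.data))
```

Keeping request construction separate from the network call makes the payload easy to check in a unit test. Sending it would then be one `urllib.request.urlopen(request, timeout=...)` call.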

@@ -1,3 +1,2 @@
 cryptography
-openai>=2.34.0
 pymysql>=1.1,<2

@@ -23,7 +23,7 @@ argument-hint: "No arguments needed; just call it"
 ## API information
-- Request URL: `https://api.pearapi.ai/api/kfc?type=json`
+- Request URL: `https://api.pearktrue.cn/api/kfc?type=json`
 - Method: `GET`
 - Local script: `scripts/kfc.py`
 - Sample response:
@@ -33,7 +33,7 @@ argument-hint: "No arguments needed; just call it"
   "code": 200,
   "msg": "获取成功",
   "text": "14看着不香果然还是13更香iPhone14真是更新了个寂寞......今天肯德基疯狂星期四,谁请我吃?",
-  "api_source": "官方API网:https://api.pearapi.ai/"
+  "api_source": "官方API网:https://api.pearktrue.cn/"
 }
 ```
@@ -43,7 +43,7 @@ argument-hint: "No arguments needed; just call it"
 1. This skill is triggered when the user enters `kfc`, `KFC`, `肯德基`, or `肯德基文案`.
 2. Run the local script from the repository root: `python3 scripts/kfc.py`.
-3. The script sends a `GET` request to `https://api.pearapi.ai/api/kfc?type=json`.
+3. The script sends a `GET` request to `https://api.pearktrue.cn/api/kfc?type=json`.
 4. The script parses the returned JSON and prints the `text` field.
 5. If the request fails, the response format is unexpected, or no `text` is returned, the script prints: `今天的肯德基文案暂时没拿到,等我再去问问。`
 6. If the script cannot run (no usable Python environment), reply directly with the fallback text: `今天的肯德基文案暂时没拿到,等我再去问问。`

@@ -12,7 +12,7 @@ import urllib.request
 sys.stderr = sys.stdout
-API_URL = "https://api.pearapi.ai/api/kfc?type=json"
+API_URL = "https://api.pearktrue.cn/api/kfc?type=json"
 FALLBACK_TEXT = "今天的肯德基文案暂时没拿到,等我再去问问。"

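@@ -8,7 +8,7 @@ argument-hint: "Requires the prompt parameter (the drawing prompt); optional model
 ## Description
-This is an AI text-to-image skill, triggered when the user wants to generate an image from a text description. Supported drawing models: JiMeng (即梦), DouBao (豆包), Z-Image (造相), and OpenAI GPT Image.
+This is an AI text-to-image skill, triggered when the user wants to generate an image from a text description. Supported drawing models: JiMeng (即梦), DouBao (豆包), and Z-Image (造相).
 It reads the drawing configuration (API key, Base URL, etc.) from the database, calls the drawing API that matches the model the user selected, and returns the generated image URLs.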
@@ -35,12 +35,11 @@ argument-hint: "Requires the prompt parameter (the drawing prompt); optional model
     },
     "model": {
       "type": "string",
-      "description": "Drawing model. Options: JiMeng 4.5 (jimeng-4.5) / JiMeng 4.6 (jimeng-4.6) / JiMeng 4.7 (jimeng-4.7) / JiMeng 5.0 (jimeng-5.0) / DouBao 4.5 (doubao-seedream-4.5) / DouBao 4.0 (doubao-seedream-4.0) / DouBao text-to-image (doubao-seedream-3.0-t2i) / DouBao image-to-image (doubao-seededit-3.0-i2i) / Z-Image base (Z-Image) / Z-Image distilled (Z-Image-Turbo) / Z-Image image editing (Qwen-Image-Edit-2511) / OpenAI GPT Image (gpt-image-2). Default: empty (none).",
+      "description": "Drawing model. Options: JiMeng 4.5 (jimeng-4.5) / JiMeng 4.6 (jimeng-4.6) / JiMeng 5.0 (jimeng-5.0) / DouBao 4.5 (doubao-seedream-4.5) / DouBao 4.0 (doubao-seedream-4.0) / DouBao text-to-image (doubao-seedream-3.0-t2i) / DouBao image-to-image (doubao-seededit-3.0-i2i) / Z-Image base (Z-Image) / Z-Image distilled (Z-Image-Turbo) / Z-Image image editing (Qwen-Image-Edit-2511). Default: empty (none).",
       "enum": [
         "none",
         "jimeng-4.5",
         "jimeng-4.6",
-        "jimeng-4.7",
         "jimeng-5.0",
         "doubao-seedream-4.5",
         "doubao-seedream-4.0",
@@ -48,8 +47,7 @@ argument-hint: "Requires the prompt parameter (the drawing prompt); optional model
         "doubao-seededit-3.0-i2i",
         "Z-Image",
         "Z-Image-Turbo",
-        "Qwen-Image-Edit-2511",
-        "gpt-image-2"
+        "Qwen-Image-Edit-2511"
       ],
       "default": "none"
     },

@@ -1,3 +1,2 @@
 cryptography
-openai>=2.34.0
 pymysql>=1.1,<2

@@ -3,17 +3,13 @@
 from __future__ import annotations
 import argparse
-import base64
 import json
-import mimetypes
 import os
 import re
 import subprocess
 import sys
-import tempfile
 import time
 import traceback
-import urllib.parse
 import urllib.request
 from pathlib import Path
@@ -71,7 +67,6 @@ _ensure_skill_venv_python()
 try:
     import pymysql  # type: ignore  # noqa: E402
-    from openai import OpenAI  # type: ignore  # noqa: E402
 except ModuleNotFoundError:
     _run_bootstrap()
     _py = _get_python_executable()
@@ -174,213 +169,6 @@ def _http_get_json(url: str, headers: dict, timeout: int = 30) -> dict:
         return json.loads(resp.read().decode("utf-8"))
-
-def _coerce_int(value, default: int, minimum: int, maximum: int) -> int:
-    try:
-        parsed = int(value)
-    except (TypeError, ValueError):
-        parsed = default
-    return min(max(parsed, minimum), maximum)
-
-def _openai_output_format(config: dict) -> str:
-    output_format = str(config.get("output_format", "png") or "png").lower()
-    if output_format not in {"png", "jpeg", "webp"}:
-        return "png"
-    return output_format
-
-def _openai_size(config: dict, ratio: str, resolution: str) -> str:
-    configured = str(config.get("size", "") or "").strip()
-    if configured:
-        return configured
-    normalized_ratio = (ratio or "").replace(" ", "").lower()
-    normalized_resolution = (resolution or "").replace(" ", "").lower()
-    if normalized_resolution in {"4k", "2160p", "3840x2160"}:
-        sizes = {
-            "16:9": "3840x2160",
-            "9:16": "2160x3840",
-            "1:1": "2048x2048",
-            "3:2": "3072x2048",
-            "2:3": "2048x3072",
-        }
-    elif normalized_resolution in {"2k", "1440p", "2048"}:
-        sizes = {
-            "16:9": "2048x1152",
-            "9:16": "1152x2048",
-            "1:1": "2048x2048",
-            "3:2": "2048x1360",
-            "2:3": "1360x2048",
-        }
-    elif normalized_resolution in {"1k", "1024", "1024p"}:
-        sizes = {
-            "16:9": "1536x864",
-            "9:16": "864x1536",
-            "1:1": "1024x1024",
-            "3:2": "1536x1024",
-            "2:3": "1024x1536",
-        }
-    else:
-        return "auto"
-    return sizes.get(normalized_ratio, "auto")
def _openai_prompt(prompt: str, negative_prompt: str) -> str:
if not negative_prompt:
return prompt
return f"{prompt}\n\n不要包含: {negative_prompt}"
-
-def _openai_client(config: dict) -> OpenAI:
-    api_key = str(config.get("api_key", "")).strip()
-    if not api_key:
-        raise RuntimeError("OpenAI 绘图配置缺少 api_key")
-    base_url = str(config.get("base_url", "") or "").strip()
-    organization = str(config.get("organization", "") or "").strip()
-    project = str(config.get("project", "") or "").strip()
-    timeout: float | None = None
-    timeout_value = config.get("timeout")
-    if timeout_value not in (None, ""):
-        timeout = float(timeout_value)
-    return OpenAI(
-        api_key=api_key,
-        base_url=base_url or None,
-        organization=organization or None,
-        project=project or None,
-        timeout=timeout,
-    )
-
-def _truncate_debug_payload(value):
-    if isinstance(value, dict):
-        return {
-            key: (
-                f"{item[:50]}..." if key == "b64_json" and isinstance(item, str) and len(item) > 50 else _truncate_debug_payload(item)
-            )
-            for key, item in value.items()
-        }
-    if isinstance(value, list):
-        return [_truncate_debug_payload(item) for item in value]
-    return value
-
-def _debug_response(label: str, payload) -> None:
-    if hasattr(payload, "model_dump"):
-        payload = payload.model_dump()
-    payload = _truncate_debug_payload(payload)
-    sys.stdout.write(f"[debug] {label}: {json.dumps(payload, ensure_ascii=False)}\n")
-
-def _rewrite_openai_image_url(url: str) -> str:
-    internal_host = "http://chatgpt2api:80"
-    external_host = "https://chatgpt2api.houhoukang.com"
-    if url.startswith(internal_host):
-        return f"{external_host}{url[len(internal_host):]}"
-    return url
-
-def _extension_from_mime(mime_type: str) -> str:
-    if mime_type == "image/jpeg":
-        return ".jpg"
-    guessed = mimetypes.guess_extension(mime_type)
-    if guessed in {".png", ".jpg", ".jpeg", ".webp"}:
-        return guessed
-    return ".png"
-
-def _extension_from_output_format(output_format: str) -> str:
-    if output_format == "jpeg":
-        return ".jpg"
-    if output_format == "webp":
-        return ".webp"
-    return ".png"
-
-def _openai_response_value(item, key: str):
-    if isinstance(item, dict):
-        return item.get(key)
-    return getattr(item, key, None)
-
-def _write_openai_b64_image(b64_json: str, output_format: str) -> str:
-    encoded = b64_json.strip()
-    suffix = _extension_from_output_format(output_format)
-    if encoded.startswith("data:"):
-        header, encoded = encoded.split(",", 1)
-        mime_type = header[5:].split(";", 1)[0].strip().lower()
-        if mime_type:
-            suffix = _extension_from_mime(mime_type)
-    encoded = "".join(encoded.split())
-    padding = len(encoded) % 4
-    if padding:
-        encoded = f"{encoded}{'=' * (4 - padding)}"
-    image_bytes = base64.b64decode(encoded)
-    with tempfile.NamedTemporaryFile(prefix="wechat-openai-image-", suffix=suffix, delete=False) as temp_file:
-        temp_file.write(image_bytes)
-        return temp_file.name
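Review note: the lenient base64 handling in the removed `_write_openai_b64_image` (strip an optional `data:` header, drop whitespace, restore missing `=` padding) can be exercised on its own. The following is a standalone sketch, not part of either commit; the helper name `decode_lenient_b64` is hypothetical.

```python
import base64


def decode_lenient_b64(payload: str) -> bytes:
    """Decode base64 that may carry a data: URI header, whitespace, or missing padding."""
    encoded = payload.strip()
    if encoded.startswith("data:"):
        # Drop the "data:<mime>;base64," header and keep only the payload.
        _, encoded = encoded.split(",", 1)
    # Remove embedded whitespace and newlines.
    encoded = "".join(encoded.split())
    remainder = len(encoded) % 4
    if remainder:
        # Restore the '=' padding that some gateways strip.
        encoded += "=" * (4 - remainder)
    return base64.b64decode(encoded)


print(decode_lenient_b64("aGVsbG8"))
print(decode_lenient_b64("data:image/png;base64,aGVs\nbG8="))
```

Both calls decode to `b"hello"`. A payload whose length is 1 modulo 4 after cleanup is not valid base64, so padding it does not make it decodable.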
-
-def _openai_images_from_response(response, output_format: str) -> list[str]:
-    outputs: list[str] = []
-    try:
-        for item in getattr(response, "data", []) or []:
-            b64_json = _openai_response_value(item, "b64_json")
-            if b64_json:
-                outputs.append(_write_openai_b64_image(str(b64_json), output_format))
-                continue
-            url = _openai_response_value(item, "url")
-            if url:
-                outputs.append(_rewrite_openai_image_url(str(url)))
-    except Exception:
-        _cleanup_openai_temp_files(outputs)
-        raise
-    return outputs
-
-def _is_remote_image_url(value: str) -> bool:
-    return urllib.parse.urlparse(value).scheme in {"http", "https"}
-
-def _send_image_outputs(client_port: str, from_wx_id: str, image_outputs: list[str]) -> None:
-    remote_urls = [value for value in image_outputs if value and _is_remote_image_url(value)]
-    local_paths = [value for value in image_outputs if value and not _is_remote_image_url(value)]
-    if remote_urls:
-        send_url = f"http://127.0.0.1:{client_port}/api/v1/robot/message/send/image/url"
-        send_body = {
-            "to_wxid": from_wx_id,
-            "image_urls": remote_urls,
-        }
-        response = _http_post_json(send_url, send_body, {"Content-Type": "application/json"}, timeout=300)
-        _debug_response("send image url response", response)
-    for file_path in local_paths:
-        send_url = f"http://127.0.0.1:{client_port}/api/v1/robot/message/send/image/local"
-        send_body = {
-            "to_wxid": from_wx_id,
-            "file_path": file_path,
-        }
-        response = _http_post_json(send_url, send_body, {"Content-Type": "application/json"}, timeout=300)
-        _debug_response("send image local response", response)
-
-def _cleanup_openai_temp_files(image_outputs: list[str]) -> None:
-    for value in image_outputs:
-        path = Path(value)
-        if path.name.startswith("wechat-openai-image-") and path.is_file():
-            try:
-                path.unlink()
-            except OSError:
-                pass
 def call_jimeng(config: dict, prompt: str, model: str,
                 negative_prompt: str, ratio: str, resolution: str) -> list[str]:
     """Call JiMeng (即梦) image generation API."""
@@ -527,43 +315,13 @@ def call_zimage(config: dict, prompt: str, model: str) -> list[str]:
     raise RuntimeError("造相绘图任务超时")
-
-def call_openai(config: dict, prompt: str, model: str,
-                negative_prompt: str, ratio: str, resolution: str) -> list[str]:
-    """Call OpenAI GPT Image API for text-to-image generation."""
-    client = _openai_client(config)
-    output_format = _openai_output_format(config)
-    quality = str(config.get("quality", "auto") or "auto")
-    moderation = str(config.get("moderation", "auto") or "auto")
-    background = str(config.get("background", "auto") or "auto")
-    if background == "transparent":
-        background = "auto"
-    kwargs = {
-        "model": model or "gpt-image-2",
-        "prompt": _openai_prompt(prompt, negative_prompt),
-        "n": _coerce_int(config.get("n"), 1, 1, 10),
-        "size": _openai_size(config, ratio, resolution),
-        "quality": quality,
-        "background": background,
-        "moderation": moderation,
-        "output_format": output_format,
-    }
-    if output_format in {"jpeg", "webp"} and config.get("output_compression") is not None:
-        kwargs["output_compression"] = _coerce_int(config.get("output_compression"), 100, 0, 100)
-    response = client.images.generate(**kwargs)
-    _debug_response("openai images.generate response", response)
-    return _openai_images_from_response(response, output_format)
 # ---------------------------------------------------------------------------
 # Main
 # ---------------------------------------------------------------------------
-JIMENG_MODELS = {"jimeng-4.5", "jimeng-4.6", "jimeng-4.7", "jimeng-5.0"}
+JIMENG_MODELS = {"jimeng-4.5", "jimeng-4.6", "jimeng-5.0"}
 DOUBAO_MODELS = {"doubao-seedream-4.5", "doubao-seedream-4.0", "doubao-seedream-3.0-t2i", "doubao-seededit-3.0-i2i"}
 ZIMAGE_MODELS = {"Z-Image", "Z-Image-Turbo", "Qwen-Image-Edit-2511"}
-OPENAI_MODELS = {"gpt-image-2"}
 def _parse_cli_params(argv: list[str]) -> dict[str, str]:
@@ -665,13 +423,6 @@ def main() -> int:
             return 0
         image_urls = call_zimage(zimage_config, prompt, model)
-    elif model in OPENAI_MODELS:
-        openai_config = settings_json.get("OpenAI", {})
-        if not openai_config.get("enabled", False):
-            sys.stdout.write("OpenAI 绘图未开启\n")
-            return 0
-        image_urls = call_openai(openai_config, prompt, model, negative_prompt, ratio, resolution)
     else:
         sys.stdout.write("不支持的 AI 图像模型\n")
         return 1
@@ -687,18 +438,20 @@ def main() -> int:
     # Send the images through the client API
     client_port = os.environ.get("ROBOT_WECHAT_CLIENT_PORT", "").strip()
     if not client_port:
-        _cleanup_openai_temp_files(image_urls)
         sys.stdout.write("环境变量 ROBOT_WECHAT_CLIENT_PORT 未配置\n")
         return 1
+    send_url = f"http://127.0.0.1:{client_port}/api/v1/robot/message/send/image/url"
+    send_body = {
+        "to_wxid": from_wx_id,
+        "image_urls": [u for u in image_urls if u],
+    }
     try:
-        _send_image_outputs(client_port, from_wx_id, image_urls)
+        _http_post_json(send_url, send_body, {"Content-Type": "application/json"}, timeout=60)
         sys.stdout.write("图片发送成功\n")
     except Exception as exc:
         sys.stdout.write(f"发送图片失败: {exc}\n")
         return 1
-    finally:
-        _cleanup_openai_temp_files(image_urls)
     return 0


@@ -111,70 +111,6 @@ argument-hint: "需要 content可选 emotion、voice、style_prompt、voice_p
 9. Do not pass voice-cloning audio parameters. If the current message quotes a voice message, the script detects this automatically via `ROBOT_REF_MESSAGE_ID` and downloads the quoted voice message as the cloning sample.
 10. Do not invoke this skill when `content` exceeds 260 characters.
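-
-## Audio tag control
-
-Embed style tags and audio tags in the text to control the speech directly and precisely. An overall style tag goes at the start, and fine-grained control tags can be inserted in the middle.
-
-Add a `(风格)` (style) tag at the start of the target text to set the speaking style. Several styles can be set at once by placing the style names inside the same pair of brackets; any separator is accepted.
-
-Supported bracket formats: half-width `()`, full-width `（）`, or `[]`.
-
-### Format examples
-
-```
-Style type          Style examples
-Basic emotion       开心/悲伤/愤怒/恐惧/惊讶/兴奋/委屈/平静/冷漠
-Compound emotion    怅然/欣慰/无奈/愧疚/释然/嫉妒/厌倦/忐忑/动情
-Overall tone        温柔/高冷/活泼/严肃/慵懒/俏皮/深沉/干练/凌厉
-Timbre              磁性/醇厚/清亮/空灵/稚嫩/苍老/甜美/沙哑/醇雅
-Persona voice       夹子音/御姐音/正太音/大叔音/台湾腔
-Dialect             东北话/四川话/河南话/粤语
-Role play           孙悟空/林黛玉
-Singing             唱歌
-```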
-
-Examples:
-
-- (怅然)这么多年过去了,再走过那条街,心里一下子空了一块。
-- (慵懒)再让我睡五分钟……就五分钟,真的,最后一次。
-- (磁性)夜已经深了,城市还在呼吸。我是今晚陪你的人,欢迎收听《午夜电台》。
-- (东北话)哎呀妈呀,这天儿也忒冷了吧!你说这风,嗖嗖的,跟刀子似的,割脸啊!
-- (粤语)呢个真係好正啊!食过一次就唔会忘记!
-- (唱歌)原谅我这一生不羁放纵爱自由也会怕有一天会跌倒Oh no。背弃了理想谁人都可以哪会怕有一天只你共我。
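-
-On top of this, [音频标签] (audio tags) can be inserted anywhere in the text. They give fine-grained control over the voice, adjusting tone, emotion, and delivery: a low whisper, a loud laugh, or a mildly irritated aside, as well as inserted breaths, pauses, and coughs. Speaking rate can be adjusted the same way, so each sentence gets the rhythm it needs.
-
-```
-Style type          Style examples
-Pace and rhythm     吸气/深呼吸/叹气/长叹一口气/喘息/屏息
-Emotional state     紧张/害怕/激动/疲惫/委屈/撒娇/心虚/震惊/不耐烦
-Voice features      颤抖/声音颤抖/变调/破音/鼻音/气声/沙哑
-Crying and laughing 笑/轻笑/大笑/冷笑/抽泣/呜咽/哽咽/嚎啕大哭
-```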
-
-Examples:
-
-- (紧张,深呼吸)呼……冷静,冷静。不就是一个面试吗……(语速加快,碎碎念)自我介绍已经背了五十遍了,应该没问题的。加油,你可以的……(小声)哎呀,领带歪没歪?
-- (极其疲惫,有气无力)师傅……到地方了叫我一声……(长叹一口气)我先眯一会儿,这班加得我魂儿都要散了。
-- 如果我当时……(沉默片刻)哪怕再坚持一秒钟,结果是不是就不一样了?(苦笑)呵,没如果了。
-- (寒冷导致的急促呼吸)呼——呼——这、这大兴安岭的雪……(咳嗽)简直能把人骨头冻透了……别、别停下,走,快走。
-- (提高音量喊话)大姐!这鱼新鲜着呢!早上刚捞上来的!哎!那个谁,别乱翻,压坏了你赔啊?!
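-
-### Important notes
-
-- Only the `mimo-v2.5-tts` model supports singing mode.
-- For better singing output, the `(唱歌)` tag must be placed at the very start of the target text, in the form `(唱歌)歌词` (the tag followed by the lyrics). Chinese lyrics are recommended for better synthesis quality. The tag accepts the following equivalent values: `唱歌`, `sing`, `singing`.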
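Review note: the leading style tag described in the removed section is plain string composition. The sketch below shows one way a caller might build it; `build_tagged_text` is a hypothetical helper, not something the skill or the TTS service provides, and the comma separator is an arbitrary choice since the text says any separator is accepted.

```python
from typing import Optional, Sequence


def build_tagged_text(text: str, styles: Optional[Sequence[str]] = None) -> str:
    """Prefix text with one (style) tag; several styles share the same brackets."""
    names = [name.strip() for name in (styles or []) if name and name.strip()]
    if not names:
        # No style requested: send the text unchanged.
        return text
    return f"({','.join(names)}){text}"


print(build_tagged_text("再让我睡五分钟……", ["慵懒"]))
print(build_tagged_text("夜已经深了。", ["磁性", "温柔"]))
```

Inline audio tags such as `(长叹一口气)` would be written directly into `text` at the point where they should take effect.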
 ## Execution steps
 1. Determine whether the user explicitly wants a voice message.


@@ -740,6 +740,14 @@ def synthesize_audio_mimo(config: dict, params: dict) -> tuple[bytes, str]:
     url = f"{base_url}/chat/completions"
     payload, audio_format, stream = _build_mimo_payload(config, params)
+    sys.stdout.write(
+        f"[mimo debug] config={json.dumps(config, ensure_ascii=False)}\n"
+        f"[mimo debug] url={url}\n"
+        f"[mimo debug] api_key={api_key}\n"
+        f"[mimo debug] model={payload.get('model')}\n"
+    )
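Review note: the added debug output writes the full `config` and the raw `api_key` to stdout, so both may end up in logs or in the chat transcript. A minimal sketch of masking secrets before logging follows. `redact`, `debug_line`, and the `SENSITIVE_KEYS` set are hypothetical names and assumed key names, not part of this commit.

```python
import json

# Assumed names of secret-bearing keys; adjust to the real config schema.
SENSITIVE_KEYS = {"api_key", "token", "secret", "password"}


def redact(value):
    """Return a copy of value with sensitive dict entries masked."""
    if isinstance(value, dict):
        return {
            key: ("***" if str(key).lower() in SENSITIVE_KEYS else redact(item))
            for key, item in value.items()
        }
    if isinstance(value, list):
        return [redact(item) for item in value]
    return value


def debug_line(label: str, payload) -> str:
    """Format one debug line with secrets masked."""
    return f"[mimo debug] {label}={json.dumps(redact(payload), ensure_ascii=False)}\n"


print(debug_line("config", {"api_key": "sk-example", "model": "mimo-v2.5-tts"}), end="")
```

With this approach the separate `api_key=` line would be dropped rather than masked, since the key adds nothing to the debug output once the URL and model are logged.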
     request_data = json.dumps(payload, ensure_ascii=False).encode("utf-8")
     req = urllib.request.Request(
@@ -747,7 +755,7 @@ def synthesize_audio_mimo(config: dict, params: dict) -> tuple[bytes, str]:
         data=request_data,
         headers={
             "Content-Type": "application/json",
-            "Authorization": f"Bearer {api_key}",
+            "api-key": api_key,
             "Accept": "application/json, text/event-stream",
             "Accept-Encoding": "identity",
         },