Compare commits
10 Commits
951096a0f2
...
5dbae039d4
| Author | SHA1 | Date | |
|---|---|---|---|
| 5dbae039d4 | |||
| 4c4a3c5c95 | |||
| c5831daa55 | |||
| 708e56666a | |||
| 0a14269187 | |||
| 569533e9d9 | |||
| f812638e44 | |||
| 20ea36a340 | |||
| 66c4cb51e7 | |||
| 352a1dab1b |
4
.vscode/launch.json
vendored
4
.vscode/launch.json
vendored
@ -9,9 +9,11 @@
|
|||||||
"console": "integratedTerminal",
|
"console": "integratedTerminal",
|
||||||
"justMyCode": true,
|
"justMyCode": true,
|
||||||
"args": [
|
"args": [
|
||||||
"{\"prompt\":\"一只站在雨夜街头的白猫\",\"model\":\"jimeng-5.0\",\"negative_prompt\":\"模糊, 低清\",\"ratio\":\"16:9\",\"resolution\":\"2k\"}"
|
"--prompt=马云在直播间卖红薯",
|
||||||
|
"--model=gpt-image-2"
|
||||||
],
|
],
|
||||||
"env": {
|
"env": {
|
||||||
|
"ROBOT_WECHAT_CLIENT_PORT": "9001",
|
||||||
"ROBOT_FROM_WX_ID": "57004904192@chatroom",
|
"ROBOT_FROM_WX_ID": "57004904192@chatroom",
|
||||||
"ROBOT_CODE": "houhouipad",
|
"ROBOT_CODE": "houhouipad",
|
||||||
"MYSQL_HOST": "127.0.0.1",
|
"MYSQL_HOST": "127.0.0.1",
|
||||||
|
|||||||
16
README.md
16
README.md
@ -55,6 +55,8 @@
|
|||||||
|
|
||||||
**发送图片的时候也可以调用 Agent 接口**
|
**发送图片的时候也可以调用 Agent 接口**
|
||||||
|
|
||||||
|
1. 发送远程图片地址
|
||||||
|
|
||||||
```
|
```
|
||||||
[POST] http://127.0.0.1:{ROBOT_WECHAT_CLIENT_PORT}/api/v1//robot/message/send/image/url
|
[POST] http://127.0.0.1:{ROBOT_WECHAT_CLIENT_PORT}/api/v1//robot/message/send/image/url
|
||||||
|
|
||||||
@ -67,6 +69,20 @@
|
|||||||
|
|
||||||
```
|
```
|
||||||
|
|
||||||
|
2. 发送本地图片路径
|
||||||
|
|
||||||
|
```
|
||||||
|
[POST] http://127.0.0.1:{ROBOT_WECHAT_CLIENT_PORT}/api/v1//robot/message/send/image/local
|
||||||
|
|
||||||
|
请求体 Body:
|
||||||
|
|
||||||
|
{
|
||||||
|
"to_wxid": "{{ROBOT_FROM_WX_ID}}",
|
||||||
|
"file_path": "{{file_path}}"
|
||||||
|
}
|
||||||
|
|
||||||
|
```
|
||||||
|
|
||||||
**发送视频的时候也可以调用 Agent 接口**
|
**发送视频的时候也可以调用 Agent 接口**
|
||||||
|
|
||||||
```
|
```
|
||||||
|
|||||||
@ -20,7 +20,7 @@ argument-hint: "无需参数,直接调用即可"
|
|||||||
|
|
||||||
## 接口信息
|
## 接口信息
|
||||||
|
|
||||||
- 获取图片地址:`https://api.pearktrue.cn/api/today_wife`
|
- 获取图片地址:`https://api.pearapi.ai/api/today_wife`
|
||||||
- 请求方式:`GET`
|
- 请求方式:`GET`
|
||||||
- 发图接口:`http://127.0.0.1:{ROBOT_WECHAT_CLIENT_PORT}/api/v1/robot/message/send/image/url`
|
- 发图接口:`http://127.0.0.1:{ROBOT_WECHAT_CLIENT_PORT}/api/v1/robot/message/send/image/url`
|
||||||
- 请求方式:`POST`
|
- 请求方式:`POST`
|
||||||
@ -32,12 +32,12 @@ argument-hint: "无需参数,直接调用即可"
|
|||||||
"code": 200,
|
"code": 200,
|
||||||
"msg": "获取成功",
|
"msg": "获取成功",
|
||||||
"data": {
|
"data": {
|
||||||
"image_url": "https://api.pearktrue.cn/api_assets/wife/9a6a9c38-7d6e-464f-8930-eb9dac41cde9.webp",
|
"image_url": "https://api.pearapi.ai/api_assets/wife/9a6a9c38-7d6e-464f-8930-eb9dac41cde9.webp",
|
||||||
"role_name": "初音未来、巡音流歌",
|
"role_name": "初音未来、巡音流歌",
|
||||||
"width": 2480,
|
"width": 2480,
|
||||||
"height": 3508
|
"height": 3508
|
||||||
},
|
},
|
||||||
"api_source": "官方API网:https://api.pearktrue.cn/"
|
"api_source": "官方API网:https://api.pearapi.ai/"
|
||||||
}
|
}
|
||||||
```
|
```
|
||||||
|
|
||||||
@ -52,7 +52,7 @@ argument-hint: "无需参数,直接调用即可"
|
|||||||
|
|
||||||
1. 当用户发送 `999` 时触发该技能。
|
1. 当用户发送 `999` 时触发该技能。
|
||||||
2. 在仓库根目录下执行本地脚本:`python3 scripts/beauty.py`。
|
2. 在仓库根目录下执行本地脚本:`python3 scripts/beauty.py`。
|
||||||
3. 脚本内部发送 `GET` 请求到 `https://api.pearktrue.cn/api/today_wife`。
|
3. 脚本内部发送 `GET` 请求到 `https://api.pearapi.ai/api/today_wife`。
|
||||||
4. 脚本解析返回的 JSON,并提取 `data.image_url`。
|
4. 脚本解析返回的 JSON,并提取 `data.image_url`。
|
||||||
5. 脚本从环境变量中读取 `ROBOT_WECHAT_CLIENT_PORT` 和 `ROBOT_FROM_WX_ID`。
|
5. 脚本从环境变量中读取 `ROBOT_WECHAT_CLIENT_PORT` 和 `ROBOT_FROM_WX_ID`。
|
||||||
6. 脚本发送 `POST` 请求到 `http://127.0.0.1:{ROBOT_WECHAT_CLIENT_PORT}/api/v1/robot/message/send/image/url`,请求体为:
|
6. 脚本发送 `POST` 请求到 `http://127.0.0.1:{ROBOT_WECHAT_CLIENT_PORT}/api/v1/robot/message/send/image/url`,请求体为:
|
||||||
|
|||||||
@ -13,7 +13,7 @@ import urllib.request
|
|||||||
sys.stderr = sys.stdout
|
sys.stderr = sys.stdout
|
||||||
|
|
||||||
|
|
||||||
FETCH_API_URL = "https://api.pearktrue.cn/api/today_wife"
|
FETCH_API_URL = "https://api.pearapi.ai/api/today_wife"
|
||||||
FALLBACK_TEXT = "今天的美女图片暂时没拿到,等我再找找。"
|
FALLBACK_TEXT = "今天的美女图片暂时没拿到,等我再找找。"
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
53
skills/douyin-video-parse/SKILL.md
Normal file
53
skills/douyin-video-parse/SKILL.md
Normal file
@ -0,0 +1,53 @@
|
|||||||
|
---
|
||||||
|
name: douyin-video-parse
|
||||||
|
description: "当用户发送包含抖音短链接(https://v.douyin.com/xxx)的消息时触发。自动解析抖音视频/图片,并发送给当前用户。"
|
||||||
|
argument-hint: "消息中包含抖音短链接即可自动触发"
|
||||||
|
---
|
||||||
|
|
||||||
|
# Douyin Video Parse Skill
|
||||||
|
|
||||||
|
## 描述
|
||||||
|
|
||||||
|
这是一个用于解析抖音短视频/图片的技能。
|
||||||
|
|
||||||
|
当用户发送的消息中包含 `https://v.douyin.com/` 链接时,自动解析该链接对应的视频或图片,并通过本地微信机器人接口发送给当前用户。
|
||||||
|
|
||||||
|
这个仓库里额外提供了一个可执行脚本 `scripts/douyin_video_parse.py`,方便宿主机器人直接调用。
|
||||||
|
|
||||||
|
## 触发条件
|
||||||
|
|
||||||
|
- 用户消息中包含 `https://v.douyin.com/` 链接
|
||||||
|
|
||||||
|
## 解析原理
|
||||||
|
|
||||||
|
1. 访问抖音短链接,跟随 302 重定向获取真实页面 URL
|
||||||
|
2. 请求真实页面 HTML,从中提取 `window._ROUTER_DATA` JSON 数据
|
||||||
|
3. 从 JSON 中解析出视频播放地址或图片列表
|
||||||
|
4. 通过本地微信机器人接口发送视频或图片
|
||||||
|
|
||||||
|
## 环境变量
|
||||||
|
|
||||||
|
- `ROBOT_WECHAT_CLIENT_PORT`:本地微信机器人服务端口。
|
||||||
|
- `ROBOT_FROM_WX_ID`:当前消息来源用户的 wxid。
|
||||||
|
- `ROBOT_MESSAGE_CONTENT`:用户发送的原始消息内容(用于提取抖音链接)。
|
||||||
|
|
||||||
|
## 执行步骤
|
||||||
|
|
||||||
|
1. 当用户消息中包含 `https://v.douyin.com/` 链接时触发该技能。
|
||||||
|
2. 在仓库根目录下执行本地脚本:`python3 scripts/douyin_video_parse.py`。
|
||||||
|
3. 脚本从环境变量 `ROBOT_MESSAGE_CONTENT` 中提取抖音短链接。
|
||||||
|
4. 脚本访问短链接,跟随重定向获取真实页面 URL。
|
||||||
|
5. 脚本请求真实页面,解析 `window._ROUTER_DATA` 中的视频/图片信息。
|
||||||
|
6. 如果是视频:
|
||||||
|
- 先发送分享卡片链接
|
||||||
|
- 再调用 `POST http://127.0.0.1:{ROBOT_WECHAT_CLIENT_PORT}/api/v1/robot/message/send/video/url` 发送视频
|
||||||
|
7. 如果是图片:
|
||||||
|
- 发送文字提示(作者、标题、图片数量)
|
||||||
|
- 调用 `POST http://127.0.0.1:{ROBOT_WECHAT_CLIENT_PORT}/api/v1/robot/message/send/image/url` 逐张发送图片
|
||||||
|
8. 如果解析失败,回复兜底文案:`抖音解析失败,可能是链接已失效或格式不正确。`
|
||||||
|
|
||||||
|
## 回复要求
|
||||||
|
|
||||||
|
- 视频类型:发送视频文件,附带作者和标题信息。
|
||||||
|
- 图片类型:发送所有图片,附带作者和标题信息。
|
||||||
|
- 失败时,使用固定兜底文案回复。
|
||||||
345
skills/douyin-video-parse/scripts/douyin_video_parse.py
Normal file
345
skills/douyin-video-parse/scripts/douyin_video_parse.py
Normal file
@ -0,0 +1,345 @@
|
|||||||
|
#!/usr/bin/env python3
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import html
|
||||||
|
import json
|
||||||
|
import os
|
||||||
|
import re
|
||||||
|
import sys
|
||||||
|
import traceback
|
||||||
|
import urllib.error
|
||||||
|
import urllib.parse
|
||||||
|
import urllib.request
|
||||||
|
|
||||||
|
|
||||||
|
sys.stderr = sys.stdout
|
||||||
|
|
||||||
|
|
||||||
|
DOUYIN_USER_AGENT = (
|
||||||
|
"Mozilla/5.0 (iPhone; CPU iPhone OS 14_0 like Mac OS X) "
|
||||||
|
"AppleWebKit/605.1.15 (KHTML, like Gecko) "
|
||||||
|
"Version/14.0 Mobile/15E148 Safari/604.1"
|
||||||
|
)
|
||||||
|
DOUYIN_REFERER = "https://www.douyin.com/"
|
||||||
|
FALLBACK_TEXT = "抖音解析失败,可能是链接已失效或格式不正确。"
|
||||||
|
ROUTER_DATA_RE = re.compile(r"(?s)window\._ROUTER_DATA\s*=\s*(\{.*?\})\s*</script>")
|
||||||
|
DOUYIN_URL_RE = re.compile(r"https://[^\s]+")
|
||||||
|
|
||||||
|
|
||||||
|
def build_request(url: str) -> urllib.request.Request:
|
||||||
|
return urllib.request.Request(
|
||||||
|
url,
|
||||||
|
headers={
|
||||||
|
"User-Agent": DOUYIN_USER_AGENT,
|
||||||
|
"Referer": DOUYIN_REFERER,
|
||||||
|
},
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def resolve_redirect(short_url: str) -> str | None:
|
||||||
|
"""Follow the 302 redirect to get the real page URL."""
|
||||||
|
|
||||||
|
class NoRedirectHandler(urllib.request.HTTPRedirectHandler):
|
||||||
|
def redirect_request(self, req, fp, code, msg, headers, newurl):
|
||||||
|
return None
|
||||||
|
|
||||||
|
opener = urllib.request.build_opener(NoRedirectHandler)
|
||||||
|
req = build_request(short_url)
|
||||||
|
try:
|
||||||
|
response = opener.open(req, timeout=15)
|
||||||
|
return response.url
|
||||||
|
except urllib.error.HTTPError as e:
|
||||||
|
location = e.headers.get("Location")
|
||||||
|
if location:
|
||||||
|
return location
|
||||||
|
return None
|
||||||
|
except (urllib.error.URLError, TimeoutError):
|
||||||
|
return None
|
||||||
|
|
||||||
|
|
||||||
|
def fetch_page_html(page_url: str) -> str | None:
|
||||||
|
"""Fetch the Douyin page HTML content."""
|
||||||
|
req = build_request(page_url)
|
||||||
|
try:
|
||||||
|
with urllib.request.urlopen(req, timeout=15) as response:
|
||||||
|
if response.status != 200:
|
||||||
|
return None
|
||||||
|
return response.read().decode("utf-8", errors="replace")
|
||||||
|
except (urllib.error.URLError, TimeoutError):
|
||||||
|
return None
|
||||||
|
|
||||||
|
|
||||||
|
def decode_escaped_value(value: str) -> str:
|
||||||
|
"""Decode HTML entities and JSON escape sequences."""
|
||||||
|
decoded = html.unescape(value)
|
||||||
|
if "\\" in decoded:
|
||||||
|
try:
|
||||||
|
unquoted = json.loads('"' + decoded.replace('"', '\\"') + '"')
|
||||||
|
decoded = unquoted
|
||||||
|
except (json.JSONDecodeError, ValueError):
|
||||||
|
pass
|
||||||
|
return html.unescape(decoded)
|
||||||
|
|
||||||
|
|
||||||
|
def pick_preferred_url(urls: list[str]) -> str:
|
||||||
|
"""Pick the best URL from a list, preferring p26 CDN."""
|
||||||
|
first_url = ""
|
||||||
|
for raw_url in urls:
|
||||||
|
if not raw_url:
|
||||||
|
continue
|
||||||
|
decoded_url = decode_escaped_value(raw_url)
|
||||||
|
if not decoded_url:
|
||||||
|
continue
|
||||||
|
if decoded_url.startswith("https://p26"):
|
||||||
|
return decoded_url
|
||||||
|
if not first_url:
|
||||||
|
first_url = decoded_url
|
||||||
|
return first_url
|
||||||
|
|
||||||
|
|
||||||
|
def pick_video_url(urls: list[str]) -> str:
|
||||||
|
"""Pick the best video URL, preferring aweme.snssdk.com."""
|
||||||
|
decoded_urls = []
|
||||||
|
for raw_url in urls:
|
||||||
|
if not raw_url:
|
||||||
|
continue
|
||||||
|
decoded_url = decode_escaped_value(raw_url).replace("playwm", "play")
|
||||||
|
decoded_urls.append(decoded_url)
|
||||||
|
|
||||||
|
for url in decoded_urls:
|
||||||
|
if "aweme.snssdk.com" in url:
|
||||||
|
return url
|
||||||
|
return decoded_urls[0] if decoded_urls else ""
|
||||||
|
|
||||||
|
|
||||||
|
def extract_aweme_item(html_content: str) -> dict | None:
|
||||||
|
"""Extract the first aweme item from _ROUTER_DATA."""
|
||||||
|
match = ROUTER_DATA_RE.search(html_content)
|
||||||
|
if not match:
|
||||||
|
return None
|
||||||
|
|
||||||
|
try:
|
||||||
|
router_data = json.loads(match.group(1))
|
||||||
|
except json.JSONDecodeError:
|
||||||
|
return None
|
||||||
|
|
||||||
|
loader_data = router_data.get("loaderData", {})
|
||||||
|
for page_data in loader_data.values():
|
||||||
|
if not isinstance(page_data, dict):
|
||||||
|
continue
|
||||||
|
video_info_res = page_data.get("videoInfoRes", {})
|
||||||
|
item_list = video_info_res.get("item_list", [])
|
||||||
|
if item_list:
|
||||||
|
return item_list[0]
|
||||||
|
return None
|
||||||
|
|
||||||
|
|
||||||
|
def parse_note_item(item: dict) -> dict | None:
|
||||||
|
"""Parse image/note type content."""
|
||||||
|
images = item.get("images") or item.get("image_infos") or []
|
||||||
|
if not images:
|
||||||
|
return None
|
||||||
|
|
||||||
|
image_urls = []
|
||||||
|
seen = set()
|
||||||
|
for img_info in images:
|
||||||
|
url_list = img_info.get("url_list", [])
|
||||||
|
for url in url_list:
|
||||||
|
if url and url.startswith("http"):
|
||||||
|
decoded = html.unescape(url)
|
||||||
|
if decoded not in seen:
|
||||||
|
image_urls.append(decoded)
|
||||||
|
seen.add(decoded)
|
||||||
|
break
|
||||||
|
|
||||||
|
if not image_urls:
|
||||||
|
return None
|
||||||
|
|
||||||
|
author = item.get("author", {})
|
||||||
|
music = item.get("music", {})
|
||||||
|
music_url = pick_preferred_url(music.get("play_url", {}).get("url_list", []))
|
||||||
|
|
||||||
|
# Fallback music URL from video play_addr
|
||||||
|
if not music_url:
|
||||||
|
video = item.get("video", {})
|
||||||
|
play_addr = video.get("play_addr", {})
|
||||||
|
uri = play_addr.get("uri", "")
|
||||||
|
if uri.startswith("http"):
|
||||||
|
music_url = decode_escaped_value(uri)
|
||||||
|
else:
|
||||||
|
music_url = pick_preferred_url(play_addr.get("url_list", []))
|
||||||
|
|
||||||
|
return {
|
||||||
|
"type": "note",
|
||||||
|
"author": html.unescape(author.get("nickname", "")),
|
||||||
|
"title": html.unescape(item.get("desc", "")),
|
||||||
|
"images": image_urls,
|
||||||
|
"music_url": music_url,
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
def parse_video_item(item: dict) -> dict | None:
|
||||||
|
"""Parse video type content."""
|
||||||
|
video = item.get("video", {})
|
||||||
|
duration = video.get("duration")
|
||||||
|
if duration is not None and duration == 0:
|
||||||
|
return None
|
||||||
|
|
||||||
|
play_addr = video.get("play_addr", {})
|
||||||
|
video_url = pick_video_url(play_addr.get("url_list", []))
|
||||||
|
if not video_url:
|
||||||
|
return None
|
||||||
|
|
||||||
|
author = item.get("author", {})
|
||||||
|
return {
|
||||||
|
"type": "video",
|
||||||
|
"author": html.unescape(author.get("nickname", "")),
|
||||||
|
"title": html.unescape(item.get("desc", "")),
|
||||||
|
"url": video_url,
|
||||||
|
"cover": pick_preferred_url(video.get("cover", {}).get("url_list", [])),
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
def parse_douyin(short_url: str) -> dict | None:
|
||||||
|
"""Main parsing logic: resolve redirect -> fetch HTML -> extract data."""
|
||||||
|
resolved_url = resolve_redirect(short_url)
|
||||||
|
if not resolved_url:
|
||||||
|
return None
|
||||||
|
|
||||||
|
html_content = fetch_page_html(resolved_url)
|
||||||
|
if not html_content:
|
||||||
|
return None
|
||||||
|
|
||||||
|
item = extract_aweme_item(html_content)
|
||||||
|
if not item:
|
||||||
|
return None
|
||||||
|
|
||||||
|
# Try note (images) first, then video
|
||||||
|
result = parse_note_item(item)
|
||||||
|
if result:
|
||||||
|
return result
|
||||||
|
|
||||||
|
result = parse_video_item(item)
|
||||||
|
if result:
|
||||||
|
return result
|
||||||
|
|
||||||
|
return None
|
||||||
|
|
||||||
|
|
||||||
|
def send_video(video_url: str, robot_port: str, to_wxid: str) -> bool:
|
||||||
|
"""Send video via local robot API."""
|
||||||
|
api_url = f"http://127.0.0.1:{robot_port}/api/v1/robot/message/send/video/url"
|
||||||
|
body = json.dumps({
|
||||||
|
"to_wxid": to_wxid,
|
||||||
|
"video_urls": [video_url],
|
||||||
|
}).encode("utf-8")
|
||||||
|
request = urllib.request.Request(
|
||||||
|
api_url,
|
||||||
|
data=body,
|
||||||
|
headers={"Content-Type": "application/json"},
|
||||||
|
method="POST",
|
||||||
|
)
|
||||||
|
try:
|
||||||
|
with urllib.request.urlopen(request, timeout=60) as response:
|
||||||
|
return 200 <= response.status < 300
|
||||||
|
except (urllib.error.URLError, TimeoutError):
|
||||||
|
return False
|
||||||
|
|
||||||
|
|
||||||
|
def send_images(image_urls: list[str], robot_port: str, to_wxid: str) -> bool:
|
||||||
|
"""Send images via local robot API."""
|
||||||
|
api_url = f"http://127.0.0.1:{robot_port}/api/v1/robot/message/send/image/url"
|
||||||
|
body = json.dumps({
|
||||||
|
"to_wxid": to_wxid,
|
||||||
|
"image_urls": image_urls,
|
||||||
|
}).encode("utf-8")
|
||||||
|
request = urllib.request.Request(
|
||||||
|
api_url,
|
||||||
|
data=body,
|
||||||
|
headers={"Content-Type": "application/json"},
|
||||||
|
method="POST",
|
||||||
|
)
|
||||||
|
try:
|
||||||
|
with urllib.request.urlopen(request, timeout=60) as response:
|
||||||
|
return 200 <= response.status < 300
|
||||||
|
except (urllib.error.URLError, TimeoutError):
|
||||||
|
return False
|
||||||
|
|
||||||
|
|
||||||
|
def send_text(text: str, robot_port: str, to_wxid: str) -> bool:
|
||||||
|
"""Send text message via local robot API."""
|
||||||
|
api_url = f"http://127.0.0.1:{robot_port}/api/v1/robot/message/send/text"
|
||||||
|
body = json.dumps({
|
||||||
|
"to_wxid": to_wxid,
|
||||||
|
"content": text,
|
||||||
|
}).encode("utf-8")
|
||||||
|
request = urllib.request.Request(
|
||||||
|
api_url,
|
||||||
|
data=body,
|
||||||
|
headers={"Content-Type": "application/json"},
|
||||||
|
method="POST",
|
||||||
|
)
|
||||||
|
try:
|
||||||
|
with urllib.request.urlopen(request, timeout=10) as response:
|
||||||
|
return 200 <= response.status < 300
|
||||||
|
except (urllib.error.URLError, TimeoutError):
|
||||||
|
return False
|
||||||
|
|
||||||
|
|
||||||
|
def main() -> int:
|
||||||
|
robot_port = os.environ.get("ROBOT_WECHAT_CLIENT_PORT", "").strip()
|
||||||
|
to_wxid = os.environ.get("ROBOT_FROM_WX_ID", "").strip()
|
||||||
|
message_content = os.environ.get("ROBOT_MESSAGE_CONTENT", "").strip()
|
||||||
|
|
||||||
|
if not robot_port or not to_wxid or not message_content:
|
||||||
|
sys.stdout.write(FALLBACK_TEXT + "\n")
|
||||||
|
return 0
|
||||||
|
|
||||||
|
# Extract douyin URL from message
|
||||||
|
matches = DOUYIN_URL_RE.findall(message_content)
|
||||||
|
douyin_urls = [u for u in matches if "v.douyin.com" in u]
|
||||||
|
if not douyin_urls:
|
||||||
|
sys.stdout.write(FALLBACK_TEXT + "\n")
|
||||||
|
return 0
|
||||||
|
|
||||||
|
douyin_url = douyin_urls[0]
|
||||||
|
result = parse_douyin(douyin_url)
|
||||||
|
if not result:
|
||||||
|
sys.stdout.write(FALLBACK_TEXT + "\n")
|
||||||
|
return 0
|
||||||
|
|
||||||
|
if result["type"] == "video":
|
||||||
|
# Send info text
|
||||||
|
info_text = f"抖音视频解析成功\n作者: {result['author']}\n标题: {result['title']}"
|
||||||
|
send_text(info_text, robot_port, to_wxid)
|
||||||
|
# Send video
|
||||||
|
if not send_video(result["url"], robot_port, to_wxid):
|
||||||
|
sys.stdout.write("发送抖音视频失败,请稍后重试。\n")
|
||||||
|
return 0
|
||||||
|
|
||||||
|
elif result["type"] == "note":
|
||||||
|
# Send info text
|
||||||
|
info_text = (
|
||||||
|
f"抖音图片解析成功\n"
|
||||||
|
f"作者: {result['author']}\n"
|
||||||
|
f"标题: {result['title']}\n\n"
|
||||||
|
f"{len(result['images'])}张图片正在发送中..."
|
||||||
|
)
|
||||||
|
send_text(info_text, robot_port, to_wxid)
|
||||||
|
# Send images
|
||||||
|
if not send_images(result["images"], robot_port, to_wxid):
|
||||||
|
sys.stdout.write("发送抖音图片失败,请稍后重试。\n")
|
||||||
|
return 0
|
||||||
|
|
||||||
|
return 0
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
try:
|
||||||
|
raise SystemExit(main())
|
||||||
|
except SystemExit:
|
||||||
|
raise
|
||||||
|
except Exception:
|
||||||
|
traceback.print_exc(file=sys.stdout)
|
||||||
|
raise SystemExit(1)
|
||||||
@ -10,7 +10,7 @@ argument-hint: "需要 prompt(提示词)和 images(图片链接列表)
|
|||||||
|
|
||||||
这是一个 AI 图生图技能,基于输入的一张或多张图片,结合文本提示词生成新的图片。支持图片混合、风格转换、内容合成等多种创作模式。
|
这是一个 AI 图生图技能,基于输入的一张或多张图片,结合文本提示词生成新的图片。支持图片混合、风格转换、内容合成等多种创作模式。
|
||||||
|
|
||||||
支持多个绘图模型:即梦(JiMeng)、豆包(DouBao)、造相(Z-Image)。
|
支持多个绘图模型:即梦(JiMeng)、豆包(DouBao)、造相(Z-Image)、OpenAI GPT Image。
|
||||||
|
|
||||||
从数据库中读取绘图配置(API 密钥、Base URL 等),根据用户选择的模型调用对应的绘图 API,返回生成的图片 URL。
|
从数据库中读取绘图配置(API 密钥、Base URL 等),根据用户选择的模型调用对应的绘图 API,返回生成的图片 URL。
|
||||||
|
|
||||||
@ -37,16 +37,18 @@ argument-hint: "需要 prompt(提示词)和 images(图片链接列表)
|
|||||||
},
|
},
|
||||||
"model": {
|
"model": {
|
||||||
"type": "string",
|
"type": "string",
|
||||||
"description": "画图模型选择(可选):即梦4.5(jimeng-4.5) / 即梦4.6(jimeng-4.6) / 即梦5.0(jimeng-5.0) / 豆包图生图(doubao-seededit-3.0-i2i) / 造相基础版(Z-Image) / 造相蒸馏版(Z-Image-Turbo) / 造相图片编辑(Qwen-Image-Edit-2511),默认: 空(none)。",
|
"description": "画图模型选择(可选):即梦4.5(jimeng-4.5) / 即梦4.6(jimeng-4.6) / 即梦4.7(jimeng-4.7) / 即梦5.0(jimeng-5.0) / 豆包图生图(doubao-seededit-3.0-i2i) / 造相基础版(Z-Image) / 造相蒸馏版(Z-Image-Turbo) / 造相图片编辑(Qwen-Image-Edit-2511) / OpenAI GPT Image(gpt-image-2),默认: 空(none)。",
|
||||||
"enum": [
|
"enum": [
|
||||||
"none",
|
"none",
|
||||||
"jimeng-4.5",
|
"jimeng-4.5",
|
||||||
"jimeng-4.6",
|
"jimeng-4.6",
|
||||||
|
"jimeng-4.7",
|
||||||
"jimeng-5.0",
|
"jimeng-5.0",
|
||||||
"doubao-seededit-3.0-i2i",
|
"doubao-seededit-3.0-i2i",
|
||||||
"Z-Image",
|
"Z-Image",
|
||||||
"Z-Image-Turbo",
|
"Z-Image-Turbo",
|
||||||
"Qwen-Image-Edit-2511"
|
"Qwen-Image-Edit-2511",
|
||||||
|
"gpt-image-2"
|
||||||
],
|
],
|
||||||
"default": "none"
|
"default": "none"
|
||||||
},
|
},
|
||||||
|
|||||||
@ -3,13 +3,17 @@
|
|||||||
from __future__ import annotations
|
from __future__ import annotations
|
||||||
|
|
||||||
import argparse
|
import argparse
|
||||||
|
import base64
|
||||||
import json
|
import json
|
||||||
|
import mimetypes
|
||||||
import os
|
import os
|
||||||
import re
|
import re
|
||||||
import subprocess
|
import subprocess
|
||||||
import sys
|
import sys
|
||||||
|
import tempfile
|
||||||
import time
|
import time
|
||||||
import traceback
|
import traceback
|
||||||
|
import urllib.parse
|
||||||
import urllib.request
|
import urllib.request
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
|
|
||||||
@ -67,6 +71,7 @@ _ensure_skill_venv_python()
|
|||||||
|
|
||||||
try:
|
try:
|
||||||
import pymysql # type: ignore # noqa: E402
|
import pymysql # type: ignore # noqa: E402
|
||||||
|
from openai import OpenAI # type: ignore # noqa: E402
|
||||||
except ModuleNotFoundError:
|
except ModuleNotFoundError:
|
||||||
_run_bootstrap()
|
_run_bootstrap()
|
||||||
_py = _get_python_executable()
|
_py = _get_python_executable()
|
||||||
@ -167,6 +172,240 @@ def _http_get_json(url: str, headers: dict, timeout: int = 30) -> dict:
|
|||||||
return json.loads(resp.read().decode("utf-8"))
|
return json.loads(resp.read().decode("utf-8"))
|
||||||
|
|
||||||
|
|
||||||
|
def _coerce_int(value, default: int, minimum: int, maximum: int) -> int:
|
||||||
|
try:
|
||||||
|
parsed = int(value)
|
||||||
|
except (TypeError, ValueError):
|
||||||
|
parsed = default
|
||||||
|
return min(max(parsed, minimum), maximum)
|
||||||
|
|
||||||
|
|
||||||
|
def _openai_output_format(config: dict) -> str:
|
||||||
|
output_format = str(config.get("output_format", "png") or "png").lower()
|
||||||
|
if output_format not in {"png", "jpeg", "webp"}:
|
||||||
|
return "png"
|
||||||
|
return output_format
|
||||||
|
|
||||||
|
|
||||||
|
def _openai_size(config: dict, ratio: str, resolution: str) -> str:
|
||||||
|
configured = str(config.get("size", "") or "").strip()
|
||||||
|
if configured:
|
||||||
|
return configured
|
||||||
|
|
||||||
|
normalized_ratio = (ratio or "").replace(" ", "").lower()
|
||||||
|
normalized_resolution = (resolution or "").replace(" ", "").lower()
|
||||||
|
|
||||||
|
if normalized_resolution in {"4k", "2160p", "3840x2160"}:
|
||||||
|
sizes = {
|
||||||
|
"16:9": "3840x2160",
|
||||||
|
"9:16": "2160x3840",
|
||||||
|
"1:1": "2048x2048",
|
||||||
|
"3:2": "3072x2048",
|
||||||
|
"2:3": "2048x3072",
|
||||||
|
}
|
||||||
|
elif normalized_resolution in {"2k", "1440p", "2048"}:
|
||||||
|
sizes = {
|
||||||
|
"16:9": "2048x1152",
|
||||||
|
"9:16": "1152x2048",
|
||||||
|
"1:1": "2048x2048",
|
||||||
|
"3:2": "2048x1360",
|
||||||
|
"2:3": "1360x2048",
|
||||||
|
}
|
||||||
|
elif normalized_resolution in {"1k", "1024", "1024p"}:
|
||||||
|
sizes = {
|
||||||
|
"16:9": "1536x864",
|
||||||
|
"9:16": "864x1536",
|
||||||
|
"1:1": "1024x1024",
|
||||||
|
"3:2": "1536x1024",
|
||||||
|
"2:3": "1024x1536",
|
||||||
|
}
|
||||||
|
else:
|
||||||
|
return "auto"
|
||||||
|
|
||||||
|
return sizes.get(normalized_ratio, "auto")
|
||||||
|
|
||||||
|
|
||||||
|
def _openai_prompt(prompt: str, negative_prompt: str) -> str:
|
||||||
|
if not negative_prompt:
|
||||||
|
return prompt
|
||||||
|
return f"{prompt}\n\n不要包含: {negative_prompt}"
|
||||||
|
|
||||||
|
|
||||||
|
def _openai_client(config: dict) -> OpenAI:
|
||||||
|
api_key = str(config.get("api_key", "")).strip()
|
||||||
|
if not api_key:
|
||||||
|
raise RuntimeError("OpenAI 绘图配置缺少 api_key")
|
||||||
|
|
||||||
|
base_url = str(config.get("base_url", "") or "").strip()
|
||||||
|
organization = str(config.get("organization", "") or "").strip()
|
||||||
|
project = str(config.get("project", "") or "").strip()
|
||||||
|
timeout: float | None = None
|
||||||
|
timeout_value = config.get("timeout")
|
||||||
|
if timeout_value not in (None, ""):
|
||||||
|
timeout = float(timeout_value)
|
||||||
|
|
||||||
|
return OpenAI(
|
||||||
|
api_key=api_key,
|
||||||
|
base_url=base_url or None,
|
||||||
|
organization=organization or None,
|
||||||
|
project=project or None,
|
||||||
|
timeout=timeout,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def _truncate_debug_payload(value):
|
||||||
|
if isinstance(value, dict):
|
||||||
|
return {
|
||||||
|
key: (
|
||||||
|
f"{item[:50]}..." if key == "b64_json" and isinstance(item, str) and len(item) > 50 else _truncate_debug_payload(item)
|
||||||
|
)
|
||||||
|
for key, item in value.items()
|
||||||
|
}
|
||||||
|
if isinstance(value, list):
|
||||||
|
return [_truncate_debug_payload(item) for item in value]
|
||||||
|
return value
|
||||||
|
|
||||||
|
|
||||||
|
def _debug_response(label: str, payload) -> None:
|
||||||
|
if hasattr(payload, "model_dump"):
|
||||||
|
payload = payload.model_dump()
|
||||||
|
payload = _truncate_debug_payload(payload)
|
||||||
|
sys.stdout.write(f"[debug] {label}: {json.dumps(payload, ensure_ascii=False)}\n")
|
||||||
|
|
||||||
|
|
||||||
|
def _rewrite_openai_image_url(url: str) -> str:
|
||||||
|
internal_host = "http://chatgpt2api:80"
|
||||||
|
external_host = "https://chatgpt2api.houhoukang.com"
|
||||||
|
if url.startswith(internal_host):
|
||||||
|
return f"{external_host}{url[len(internal_host):]}"
|
||||||
|
return url
|
||||||
|
|
||||||
|
|
||||||
|
def _extension_from_output_format(output_format: str) -> str:
|
||||||
|
if output_format == "jpeg":
|
||||||
|
return ".jpg"
|
||||||
|
if output_format == "webp":
|
||||||
|
return ".webp"
|
||||||
|
return ".png"
|
||||||
|
|
||||||
|
|
||||||
|
def _openai_response_value(item, key: str):
|
||||||
|
if isinstance(item, dict):
|
||||||
|
return item.get(key)
|
||||||
|
return getattr(item, key, None)
|
||||||
|
|
||||||
|
|
||||||
|
def _write_openai_b64_image(b64_json: str, output_format: str) -> str:
|
||||||
|
encoded = b64_json.strip()
|
||||||
|
suffix = _extension_from_output_format(output_format)
|
||||||
|
if encoded.startswith("data:"):
|
||||||
|
header, encoded = encoded.split(",", 1)
|
||||||
|
mime_type = header[5:].split(";", 1)[0].strip().lower()
|
||||||
|
if mime_type:
|
||||||
|
suffix = _extension_from_mime(mime_type)
|
||||||
|
|
||||||
|
encoded = "".join(encoded.split())
|
||||||
|
padding = len(encoded) % 4
|
||||||
|
if padding:
|
||||||
|
encoded = f"{encoded}{'=' * (4 - padding)}"
|
||||||
|
|
||||||
|
image_bytes = base64.b64decode(encoded)
|
||||||
|
with tempfile.NamedTemporaryFile(prefix="wechat-openai-image-", suffix=suffix, delete=False) as temp_file:
|
||||||
|
temp_file.write(image_bytes)
|
||||||
|
return temp_file.name
|
||||||
|
|
||||||
|
|
||||||
|
def _openai_images_from_response(response, output_format: str) -> list[str]:
|
||||||
|
outputs: list[str] = []
|
||||||
|
try:
|
||||||
|
for item in getattr(response, "data", []) or []:
|
||||||
|
b64_json = _openai_response_value(item, "b64_json")
|
||||||
|
if b64_json:
|
||||||
|
outputs.append(_write_openai_b64_image(str(b64_json), output_format))
|
||||||
|
continue
|
||||||
|
|
||||||
|
url = _openai_response_value(item, "url")
|
||||||
|
if url:
|
||||||
|
outputs.append(_rewrite_openai_image_url(str(url)))
|
||||||
|
except Exception:
|
||||||
|
_cleanup_openai_temp_files(outputs)
|
||||||
|
raise
|
||||||
|
return outputs
|
||||||
|
|
||||||
|
|
||||||
|
def _is_remote_image_url(value: str) -> bool:
|
||||||
|
return urllib.parse.urlparse(value).scheme in {"http", "https"}
|
||||||
|
|
||||||
|
|
||||||
|
def _send_image_outputs(client_port: str, from_wx_id: str, image_outputs: list[str]) -> None:
|
||||||
|
remote_urls = [value for value in image_outputs if value and _is_remote_image_url(value)]
|
||||||
|
local_paths = [value for value in image_outputs if value and not _is_remote_image_url(value)]
|
||||||
|
|
||||||
|
if remote_urls:
|
||||||
|
send_url = f"http://127.0.0.1:{client_port}/api/v1/robot/message/send/image/url"
|
||||||
|
send_body = {
|
||||||
|
"to_wxid": from_wx_id,
|
||||||
|
"image_urls": remote_urls,
|
||||||
|
}
|
||||||
|
response = _http_post_json(send_url, send_body, {"Content-Type": "application/json"}, timeout=300)
|
||||||
|
_debug_response("send image url response", response)
|
||||||
|
|
||||||
|
for file_path in local_paths:
|
||||||
|
send_url = f"http://127.0.0.1:{client_port}/api/v1/robot/message/send/image/local"
|
||||||
|
send_body = {
|
||||||
|
"to_wxid": from_wx_id,
|
||||||
|
"file_path": file_path,
|
||||||
|
}
|
||||||
|
response = _http_post_json(send_url, send_body, {"Content-Type": "application/json"}, timeout=300)
|
||||||
|
_debug_response("send image local response", response)
|
||||||
|
|
||||||
|
|
||||||
|
def _cleanup_openai_temp_files(image_outputs: list[str]) -> None:
|
||||||
|
for value in image_outputs:
|
||||||
|
path = Path(value)
|
||||||
|
if path.name.startswith("wechat-openai-image-") and path.is_file():
|
||||||
|
try:
|
||||||
|
path.unlink()
|
||||||
|
except OSError:
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
def _extension_from_mime(mime_type: str) -> str:
|
||||||
|
if mime_type == "image/jpeg":
|
||||||
|
return ".jpg"
|
||||||
|
guessed = mimetypes.guess_extension(mime_type)
|
||||||
|
if guessed in {".png", ".jpg", ".jpeg", ".webp"}:
|
||||||
|
return guessed
|
||||||
|
return ".png"
|
||||||
|
|
||||||
|
|
||||||
|
def _download_openai_input_image(image: str, directory: str, index: int) -> Path:
|
||||||
|
stripped = image.strip()
|
||||||
|
if stripped.startswith("data:"):
|
||||||
|
header, encoded = stripped.split(",", 1)
|
||||||
|
mime_type = header[5:].split(";", 1)[0] or "image/png"
|
||||||
|
path = Path(directory) / f"input-{index}{_extension_from_mime(mime_type)}"
|
||||||
|
path.write_bytes(base64.b64decode(encoded))
|
||||||
|
return path
|
||||||
|
|
||||||
|
parsed = urllib.parse.urlparse(stripped)
|
||||||
|
if parsed.scheme in {"http", "https"}:
|
||||||
|
request = urllib.request.Request(stripped, headers={"User-Agent": "wechat-robot-skills/1.0"})
|
||||||
|
with urllib.request.urlopen(request, timeout=60) as response:
|
||||||
|
content_type = response.headers.get("Content-Type", "image/png").split(";", 1)[0].strip()
|
||||||
|
suffix = Path(parsed.path).suffix.lower()
|
||||||
|
if suffix not in {".png", ".jpg", ".jpeg", ".webp"}:
|
||||||
|
suffix = _extension_from_mime(content_type)
|
||||||
|
path = Path(directory) / f"input-{index}{suffix}"
|
||||||
|
path.write_bytes(response.read())
|
||||||
|
return path
|
||||||
|
|
||||||
|
path = Path(stripped).expanduser()
|
||||||
|
if path.is_file():
|
||||||
|
return path
|
||||||
|
raise RuntimeError(f"无法读取图片: {image}")
|
||||||
|
|
||||||
|
|
||||||
def call_jimeng(config: dict, prompt: str, model: str, images: list[str],
|
def call_jimeng(config: dict, prompt: str, model: str, images: list[str],
|
||||||
negative_prompt: str, ratio: str, resolution: str) -> list[str]:
|
negative_prompt: str, ratio: str, resolution: str) -> list[str]:
|
||||||
"""Call JiMeng (即梦) image compositions API (图生图)."""
|
"""Call JiMeng (即梦) image compositions API (图生图)."""
|
||||||
@ -309,13 +548,53 @@ def call_zimage(config: dict, prompt: str, model: str, images: list[str]) -> lis
|
|||||||
raise RuntimeError("造相绘图任务超时")
|
raise RuntimeError("造相绘图任务超时")
|
||||||
|
|
||||||
|
|
||||||
|
def call_openai(config: dict, prompt: str, model: str, images: list[str],
|
||||||
|
negative_prompt: str, ratio: str, resolution: str) -> list[str]:
|
||||||
|
"""Call OpenAI GPT Image API for image editing."""
|
||||||
|
client = _openai_client(config)
|
||||||
|
output_format = _openai_output_format(config)
|
||||||
|
quality = str(config.get("quality", "auto") or "auto")
|
||||||
|
background = str(config.get("background", "auto") or "auto")
|
||||||
|
if background == "transparent":
|
||||||
|
background = "auto"
|
||||||
|
|
||||||
|
with tempfile.TemporaryDirectory() as temp_dir:
|
||||||
|
input_paths = [
|
||||||
|
_download_openai_input_image(image, temp_dir, index)
|
||||||
|
for index, image in enumerate(images[:16], start=1)
|
||||||
|
]
|
||||||
|
input_files = [path.open("rb") for path in input_paths]
|
||||||
|
try:
|
||||||
|
kwargs = {
|
||||||
|
"model": model or "gpt-image-2",
|
||||||
|
"prompt": _openai_prompt(prompt, negative_prompt),
|
||||||
|
"image": input_files,
|
||||||
|
"n": _coerce_int(config.get("n"), 1, 1, 10),
|
||||||
|
"size": _openai_size(config, ratio, resolution),
|
||||||
|
"quality": quality,
|
||||||
|
"background": background,
|
||||||
|
"output_format": output_format,
|
||||||
|
}
|
||||||
|
if output_format in {"jpeg", "webp"} and config.get("output_compression") is not None:
|
||||||
|
kwargs["output_compression"] = _coerce_int(config.get("output_compression"), 100, 0, 100)
|
||||||
|
|
||||||
|
response = client.images.edit(**kwargs)
|
||||||
|
finally:
|
||||||
|
for input_file in input_files:
|
||||||
|
input_file.close()
|
||||||
|
|
||||||
|
_debug_response("openai images.edit response", response)
|
||||||
|
return _openai_images_from_response(response, output_format)
|
||||||
|
|
||||||
|
|
||||||
# ---------------------------------------------------------------------------
|
# ---------------------------------------------------------------------------
|
||||||
# Main
|
# Main
|
||||||
# ---------------------------------------------------------------------------
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
JIMENG_MODELS = {"jimeng-4.5", "jimeng-4.6", "jimeng-5.0"}
|
JIMENG_MODELS = {"jimeng-4.5", "jimeng-4.6", "jimeng-4.7", "jimeng-5.0"}
|
||||||
DOUBAO_MODELS = {"doubao-seededit-3.0-i2i"}
|
DOUBAO_MODELS = {"doubao-seededit-3.0-i2i"}
|
||||||
ZIMAGE_MODELS = {"Z-Image", "Z-Image-Turbo", "Qwen-Image-Edit-2511"}
|
ZIMAGE_MODELS = {"Z-Image", "Z-Image-Turbo", "Qwen-Image-Edit-2511"}
|
||||||
|
OPENAI_MODELS = {"gpt-image-2"}
|
||||||
|
|
||||||
|
|
||||||
def _parse_cli_params(argv: list[str]) -> dict:
|
def _parse_cli_params(argv: list[str]) -> dict:
|
||||||
@ -424,6 +703,13 @@ def main() -> int:
|
|||||||
return 0
|
return 0
|
||||||
image_urls = call_zimage(zimage_config, prompt, model, images)
|
image_urls = call_zimage(zimage_config, prompt, model, images)
|
||||||
|
|
||||||
|
elif model in OPENAI_MODELS:
|
||||||
|
openai_config = settings_json.get("OpenAI", {})
|
||||||
|
if not openai_config.get("enabled", False):
|
||||||
|
sys.stdout.write("OpenAI 绘图未开启\n")
|
||||||
|
return 0
|
||||||
|
image_urls = call_openai(openai_config, prompt, model, images, negative_prompt, ratio, resolution)
|
||||||
|
|
||||||
else:
|
else:
|
||||||
sys.stdout.write("不支持的 AI 图像模型\n")
|
sys.stdout.write("不支持的 AI 图像模型\n")
|
||||||
return 1
|
return 1
|
||||||
@ -439,20 +725,18 @@ def main() -> int:
|
|||||||
# 通过客户端接口发送图片
|
# 通过客户端接口发送图片
|
||||||
client_port = os.environ.get("ROBOT_WECHAT_CLIENT_PORT", "").strip()
|
client_port = os.environ.get("ROBOT_WECHAT_CLIENT_PORT", "").strip()
|
||||||
if not client_port:
|
if not client_port:
|
||||||
|
_cleanup_openai_temp_files(image_urls)
|
||||||
sys.stdout.write("环境变量 ROBOT_WECHAT_CLIENT_PORT 未配置\n")
|
sys.stdout.write("环境变量 ROBOT_WECHAT_CLIENT_PORT 未配置\n")
|
||||||
return 1
|
return 1
|
||||||
|
|
||||||
send_url = f"http://127.0.0.1:{client_port}/api/v1/robot/message/send/image/url"
|
|
||||||
send_body = {
|
|
||||||
"to_wxid": from_wx_id,
|
|
||||||
"image_urls": [u for u in image_urls if u],
|
|
||||||
}
|
|
||||||
try:
|
try:
|
||||||
_http_post_json(send_url, send_body, {"Content-Type": "application/json"}, timeout=60)
|
_send_image_outputs(client_port, from_wx_id, image_urls)
|
||||||
sys.stdout.write("图片发送成功\n")
|
sys.stdout.write("图片发送成功\n")
|
||||||
except Exception as exc:
|
except Exception as exc:
|
||||||
sys.stdout.write(f"发送图片失败: {exc}\n")
|
sys.stdout.write(f"发送图片失败: {exc}\n")
|
||||||
return 1
|
return 1
|
||||||
|
finally:
|
||||||
|
_cleanup_openai_temp_files(image_urls)
|
||||||
|
|
||||||
return 0
|
return 0
|
||||||
|
|
||||||
|
|||||||
@ -1,2 +1,3 @@
|
|||||||
cryptography
|
cryptography
|
||||||
|
openai>=2.34.0
|
||||||
pymysql>=1.1,<2
|
pymysql>=1.1,<2
|
||||||
|
|||||||
@ -23,7 +23,7 @@ argument-hint: "无需参数,直接调用即可"
|
|||||||
|
|
||||||
## 接口信息
|
## 接口信息
|
||||||
|
|
||||||
- 请求地址:`https://api.pearktrue.cn/api/kfc?type=json`
|
- 请求地址:`https://api.pearapi.ai/api/kfc?type=json`
|
||||||
- 请求方式:`GET`
|
- 请求方式:`GET`
|
||||||
- 本地脚本:`scripts/kfc.py`
|
- 本地脚本:`scripts/kfc.py`
|
||||||
- 返回示例:
|
- 返回示例:
|
||||||
@ -33,7 +33,7 @@ argument-hint: "无需参数,直接调用即可"
|
|||||||
"code": 200,
|
"code": 200,
|
||||||
"msg": "获取成功",
|
"msg": "获取成功",
|
||||||
"text": "14看着不香,果然还是13更香,iPhone14真是更新了个寂寞!......今天肯德基疯狂星期四,谁请我吃?",
|
"text": "14看着不香,果然还是13更香,iPhone14真是更新了个寂寞!......今天肯德基疯狂星期四,谁请我吃?",
|
||||||
"api_source": "官方API网:https://api.pearktrue.cn/"
|
"api_source": "官方API网:https://api.pearapi.ai/"
|
||||||
}
|
}
|
||||||
```
|
```
|
||||||
|
|
||||||
@ -43,7 +43,7 @@ argument-hint: "无需参数,直接调用即可"
|
|||||||
|
|
||||||
1. 当用户输入 `kfc`、`KFC`、`肯德基` 或 `肯德基文案` 时触发该技能。
|
1. 当用户输入 `kfc`、`KFC`、`肯德基` 或 `肯德基文案` 时触发该技能。
|
||||||
2. 在仓库根目录下执行本地脚本:`python3 scripts/kfc.py`。
|
2. 在仓库根目录下执行本地脚本:`python3 scripts/kfc.py`。
|
||||||
3. 脚本内部发送 `GET` 请求到 `https://api.pearktrue.cn/api/kfc?type=json`。
|
3. 脚本内部发送 `GET` 请求到 `https://api.pearapi.ai/api/kfc?type=json`。
|
||||||
4. 脚本解析返回的 JSON,并输出 `text` 字段。
|
4. 脚本解析返回的 JSON,并输出 `text` 字段。
|
||||||
5. 如果接口请求失败、返回格式异常,或没有拿到 `text`,脚本输出:`今天的肯德基文案暂时没拿到,等我再去问问。`
|
5. 如果接口请求失败、返回格式异常,或没有拿到 `text`,脚本输出:`今天的肯德基文案暂时没拿到,等我再去问问。`
|
||||||
6. 如果脚本无法执行(Python 环境不可用),直接回复兜底文案:`今天的肯德基文案暂时没拿到,等我再去问问。`
|
6. 如果脚本无法执行(Python 环境不可用),直接回复兜底文案:`今天的肯德基文案暂时没拿到,等我再去问问。`
|
||||||
|
|||||||
@ -12,7 +12,7 @@ import urllib.request
|
|||||||
sys.stderr = sys.stdout
|
sys.stderr = sys.stdout
|
||||||
|
|
||||||
|
|
||||||
API_URL = "https://api.pearktrue.cn/api/kfc?type=json"
|
API_URL = "https://api.pearapi.ai/api/kfc?type=json"
|
||||||
FALLBACK_TEXT = "今天的肯德基文案暂时没拿到,等我再去问问。"
|
FALLBACK_TEXT = "今天的肯德基文案暂时没拿到,等我再去问问。"
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
@ -8,7 +8,7 @@ argument-hint: "需要 prompt 参数(画图提示词),可选 model(模
|
|||||||
|
|
||||||
## 描述
|
## 描述
|
||||||
|
|
||||||
这是一个 AI 文生图技能,当用户想通过文本描述生成图像时触发。支持多个绘图模型:即梦(JiMeng)、豆包(DouBao)、造相(Z-Image)。
|
这是一个 AI 文生图技能,当用户想通过文本描述生成图像时触发。支持多个绘图模型:即梦(JiMeng)、豆包(DouBao)、造相(Z-Image)、OpenAI GPT Image。
|
||||||
|
|
||||||
从数据库中读取绘图配置(API 密钥、Base URL 等),根据用户选择的模型调用对应的绘图 API,返回生成的图片 URL。
|
从数据库中读取绘图配置(API 密钥、Base URL 等),根据用户选择的模型调用对应的绘图 API,返回生成的图片 URL。
|
||||||
|
|
||||||
@ -35,11 +35,12 @@ argument-hint: "需要 prompt 参数(画图提示词),可选 model(模
|
|||||||
},
|
},
|
||||||
"model": {
|
"model": {
|
||||||
"type": "string",
|
"type": "string",
|
||||||
"description": "画图模型选择(可选):即梦4.5(jimeng-4.5) / 即梦4.6(jimeng-4.6) / 即梦5.0(jimeng-5.0) / 豆包4.5(doubao-seedream-4.5) / 豆包4.0(doubao-seedream-4.0) / 豆包文生图(doubao-seedream-3.0-t2i) / 豆包图生图(doubao-seededit-3.0-i2i) / 造相基础版(Z-Image) / 造相蒸馏版(Z-Image-Turbo) / 造相图片编辑(Qwen-Image-Edit-2511),默认: 空(none)。",
|
"description": "画图模型选择(可选):即梦4.5(jimeng-4.5) / 即梦4.6(jimeng-4.6) / 即梦4.7(jimeng-4.7) / 即梦5.0(jimeng-5.0) / 豆包4.5(doubao-seedream-4.5) / 豆包4.0(doubao-seedream-4.0) / 豆包文生图(doubao-seedream-3.0-t2i) / 豆包图生图(doubao-seededit-3.0-i2i) / 造相基础版(Z-Image) / 造相蒸馏版(Z-Image-Turbo) / 造相图片编辑(Qwen-Image-Edit-2511) / OpenAI GPT Image(gpt-image-2),默认: 空(none)。",
|
||||||
"enum": [
|
"enum": [
|
||||||
"none",
|
"none",
|
||||||
"jimeng-4.5",
|
"jimeng-4.5",
|
||||||
"jimeng-4.6",
|
"jimeng-4.6",
|
||||||
|
"jimeng-4.7",
|
||||||
"jimeng-5.0",
|
"jimeng-5.0",
|
||||||
"doubao-seedream-4.5",
|
"doubao-seedream-4.5",
|
||||||
"doubao-seedream-4.0",
|
"doubao-seedream-4.0",
|
||||||
@ -47,7 +48,8 @@ argument-hint: "需要 prompt 参数(画图提示词),可选 model(模
|
|||||||
"doubao-seededit-3.0-i2i",
|
"doubao-seededit-3.0-i2i",
|
||||||
"Z-Image",
|
"Z-Image",
|
||||||
"Z-Image-Turbo",
|
"Z-Image-Turbo",
|
||||||
"Qwen-Image-Edit-2511"
|
"Qwen-Image-Edit-2511",
|
||||||
|
"gpt-image-2"
|
||||||
],
|
],
|
||||||
"default": "none"
|
"default": "none"
|
||||||
},
|
},
|
||||||
|
|||||||
@ -1,2 +1,3 @@
|
|||||||
cryptography
|
cryptography
|
||||||
|
openai>=2.34.0
|
||||||
pymysql>=1.1,<2
|
pymysql>=1.1,<2
|
||||||
@ -3,13 +3,17 @@
|
|||||||
from __future__ import annotations
|
from __future__ import annotations
|
||||||
|
|
||||||
import argparse
|
import argparse
|
||||||
|
import base64
|
||||||
import json
|
import json
|
||||||
|
import mimetypes
|
||||||
import os
|
import os
|
||||||
import re
|
import re
|
||||||
import subprocess
|
import subprocess
|
||||||
import sys
|
import sys
|
||||||
|
import tempfile
|
||||||
import time
|
import time
|
||||||
import traceback
|
import traceback
|
||||||
|
import urllib.parse
|
||||||
import urllib.request
|
import urllib.request
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
|
|
||||||
@ -67,6 +71,7 @@ _ensure_skill_venv_python()
|
|||||||
|
|
||||||
try:
|
try:
|
||||||
import pymysql # type: ignore # noqa: E402
|
import pymysql # type: ignore # noqa: E402
|
||||||
|
from openai import OpenAI # type: ignore # noqa: E402
|
||||||
except ModuleNotFoundError:
|
except ModuleNotFoundError:
|
||||||
_run_bootstrap()
|
_run_bootstrap()
|
||||||
_py = _get_python_executable()
|
_py = _get_python_executable()
|
||||||
@ -169,6 +174,213 @@ def _http_get_json(url: str, headers: dict, timeout: int = 30) -> dict:
|
|||||||
return json.loads(resp.read().decode("utf-8"))
|
return json.loads(resp.read().decode("utf-8"))
|
||||||
|
|
||||||
|
|
||||||
|
def _coerce_int(value, default: int, minimum: int, maximum: int) -> int:
|
||||||
|
try:
|
||||||
|
parsed = int(value)
|
||||||
|
except (TypeError, ValueError):
|
||||||
|
parsed = default
|
||||||
|
return min(max(parsed, minimum), maximum)
|
||||||
|
|
||||||
|
|
||||||
|
def _openai_output_format(config: dict) -> str:
|
||||||
|
output_format = str(config.get("output_format", "png") or "png").lower()
|
||||||
|
if output_format not in {"png", "jpeg", "webp"}:
|
||||||
|
return "png"
|
||||||
|
return output_format
|
||||||
|
|
||||||
|
|
||||||
|
def _openai_size(config: dict, ratio: str, resolution: str) -> str:
|
||||||
|
configured = str(config.get("size", "") or "").strip()
|
||||||
|
if configured:
|
||||||
|
return configured
|
||||||
|
|
||||||
|
normalized_ratio = (ratio or "").replace(" ", "").lower()
|
||||||
|
normalized_resolution = (resolution or "").replace(" ", "").lower()
|
||||||
|
|
||||||
|
if normalized_resolution in {"4k", "2160p", "3840x2160"}:
|
||||||
|
sizes = {
|
||||||
|
"16:9": "3840x2160",
|
||||||
|
"9:16": "2160x3840",
|
||||||
|
"1:1": "2048x2048",
|
||||||
|
"3:2": "3072x2048",
|
||||||
|
"2:3": "2048x3072",
|
||||||
|
}
|
||||||
|
elif normalized_resolution in {"2k", "1440p", "2048"}:
|
||||||
|
sizes = {
|
||||||
|
"16:9": "2048x1152",
|
||||||
|
"9:16": "1152x2048",
|
||||||
|
"1:1": "2048x2048",
|
||||||
|
"3:2": "2048x1360",
|
||||||
|
"2:3": "1360x2048",
|
||||||
|
}
|
||||||
|
elif normalized_resolution in {"1k", "1024", "1024p"}:
|
||||||
|
sizes = {
|
||||||
|
"16:9": "1536x864",
|
||||||
|
"9:16": "864x1536",
|
||||||
|
"1:1": "1024x1024",
|
||||||
|
"3:2": "1536x1024",
|
||||||
|
"2:3": "1024x1536",
|
||||||
|
}
|
||||||
|
else:
|
||||||
|
return "auto"
|
||||||
|
|
||||||
|
return sizes.get(normalized_ratio, "auto")
|
||||||
|
|
||||||
|
|
||||||
|
def _openai_prompt(prompt: str, negative_prompt: str) -> str:
|
||||||
|
if not negative_prompt:
|
||||||
|
return prompt
|
||||||
|
return f"{prompt}\n\n不要包含: {negative_prompt}"
|
||||||
|
|
||||||
|
|
||||||
|
def _openai_client(config: dict) -> OpenAI:
|
||||||
|
api_key = str(config.get("api_key", "")).strip()
|
||||||
|
if not api_key:
|
||||||
|
raise RuntimeError("OpenAI 绘图配置缺少 api_key")
|
||||||
|
|
||||||
|
base_url = str(config.get("base_url", "") or "").strip()
|
||||||
|
organization = str(config.get("organization", "") or "").strip()
|
||||||
|
project = str(config.get("project", "") or "").strip()
|
||||||
|
timeout: float | None = None
|
||||||
|
timeout_value = config.get("timeout")
|
||||||
|
if timeout_value not in (None, ""):
|
||||||
|
timeout = float(timeout_value)
|
||||||
|
|
||||||
|
return OpenAI(
|
||||||
|
api_key=api_key,
|
||||||
|
base_url=base_url or None,
|
||||||
|
organization=organization or None,
|
||||||
|
project=project or None,
|
||||||
|
timeout=timeout,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def _truncate_debug_payload(value):
|
||||||
|
if isinstance(value, dict):
|
||||||
|
return {
|
||||||
|
key: (
|
||||||
|
f"{item[:50]}..." if key == "b64_json" and isinstance(item, str) and len(item) > 50 else _truncate_debug_payload(item)
|
||||||
|
)
|
||||||
|
for key, item in value.items()
|
||||||
|
}
|
||||||
|
if isinstance(value, list):
|
||||||
|
return [_truncate_debug_payload(item) for item in value]
|
||||||
|
return value
|
||||||
|
|
||||||
|
|
||||||
|
def _debug_response(label: str, payload) -> None:
|
||||||
|
if hasattr(payload, "model_dump"):
|
||||||
|
payload = payload.model_dump()
|
||||||
|
payload = _truncate_debug_payload(payload)
|
||||||
|
sys.stdout.write(f"[debug] {label}: {json.dumps(payload, ensure_ascii=False)}\n")
|
||||||
|
|
||||||
|
|
||||||
|
def _rewrite_openai_image_url(url: str) -> str:
|
||||||
|
internal_host = "http://chatgpt2api:80"
|
||||||
|
external_host = "https://chatgpt2api.houhoukang.com"
|
||||||
|
if url.startswith(internal_host):
|
||||||
|
return f"{external_host}{url[len(internal_host):]}"
|
||||||
|
return url
|
||||||
|
|
||||||
|
|
||||||
|
def _extension_from_mime(mime_type: str) -> str:
|
||||||
|
if mime_type == "image/jpeg":
|
||||||
|
return ".jpg"
|
||||||
|
guessed = mimetypes.guess_extension(mime_type)
|
||||||
|
if guessed in {".png", ".jpg", ".jpeg", ".webp"}:
|
||||||
|
return guessed
|
||||||
|
return ".png"
|
||||||
|
|
||||||
|
|
||||||
|
def _extension_from_output_format(output_format: str) -> str:
|
||||||
|
if output_format == "jpeg":
|
||||||
|
return ".jpg"
|
||||||
|
if output_format == "webp":
|
||||||
|
return ".webp"
|
||||||
|
return ".png"
|
||||||
|
|
||||||
|
|
||||||
|
def _openai_response_value(item, key: str):
|
||||||
|
if isinstance(item, dict):
|
||||||
|
return item.get(key)
|
||||||
|
return getattr(item, key, None)
|
||||||
|
|
||||||
|
|
||||||
|
def _write_openai_b64_image(b64_json: str, output_format: str) -> str:
|
||||||
|
encoded = b64_json.strip()
|
||||||
|
suffix = _extension_from_output_format(output_format)
|
||||||
|
if encoded.startswith("data:"):
|
||||||
|
header, encoded = encoded.split(",", 1)
|
||||||
|
mime_type = header[5:].split(";", 1)[0].strip().lower()
|
||||||
|
if mime_type:
|
||||||
|
suffix = _extension_from_mime(mime_type)
|
||||||
|
|
||||||
|
encoded = "".join(encoded.split())
|
||||||
|
padding = len(encoded) % 4
|
||||||
|
if padding:
|
||||||
|
encoded = f"{encoded}{'=' * (4 - padding)}"
|
||||||
|
|
||||||
|
image_bytes = base64.b64decode(encoded)
|
||||||
|
with tempfile.NamedTemporaryFile(prefix="wechat-openai-image-", suffix=suffix, delete=False) as temp_file:
|
||||||
|
temp_file.write(image_bytes)
|
||||||
|
return temp_file.name
|
||||||
|
|
||||||
|
|
||||||
|
def _openai_images_from_response(response, output_format: str) -> list[str]:
|
||||||
|
outputs: list[str] = []
|
||||||
|
try:
|
||||||
|
for item in getattr(response, "data", []) or []:
|
||||||
|
b64_json = _openai_response_value(item, "b64_json")
|
||||||
|
if b64_json:
|
||||||
|
outputs.append(_write_openai_b64_image(str(b64_json), output_format))
|
||||||
|
continue
|
||||||
|
|
||||||
|
url = _openai_response_value(item, "url")
|
||||||
|
if url:
|
||||||
|
outputs.append(_rewrite_openai_image_url(str(url)))
|
||||||
|
except Exception:
|
||||||
|
_cleanup_openai_temp_files(outputs)
|
||||||
|
raise
|
||||||
|
return outputs
|
||||||
|
|
||||||
|
|
||||||
|
def _is_remote_image_url(value: str) -> bool:
|
||||||
|
return urllib.parse.urlparse(value).scheme in {"http", "https"}
|
||||||
|
|
||||||
|
|
||||||
|
def _send_image_outputs(client_port: str, from_wx_id: str, image_outputs: list[str]) -> None:
|
||||||
|
remote_urls = [value for value in image_outputs if value and _is_remote_image_url(value)]
|
||||||
|
local_paths = [value for value in image_outputs if value and not _is_remote_image_url(value)]
|
||||||
|
|
||||||
|
if remote_urls:
|
||||||
|
send_url = f"http://127.0.0.1:{client_port}/api/v1/robot/message/send/image/url"
|
||||||
|
send_body = {
|
||||||
|
"to_wxid": from_wx_id,
|
||||||
|
"image_urls": remote_urls,
|
||||||
|
}
|
||||||
|
response = _http_post_json(send_url, send_body, {"Content-Type": "application/json"}, timeout=300)
|
||||||
|
_debug_response("send image url response", response)
|
||||||
|
|
||||||
|
for file_path in local_paths:
|
||||||
|
send_url = f"http://127.0.0.1:{client_port}/api/v1/robot/message/send/image/local"
|
||||||
|
send_body = {
|
||||||
|
"to_wxid": from_wx_id,
|
||||||
|
"file_path": file_path,
|
||||||
|
}
|
||||||
|
response = _http_post_json(send_url, send_body, {"Content-Type": "application/json"}, timeout=300)
|
||||||
|
_debug_response("send image local response", response)
|
||||||
|
|
||||||
|
|
||||||
|
def _cleanup_openai_temp_files(image_outputs: list[str]) -> None:
|
||||||
|
for value in image_outputs:
|
||||||
|
path = Path(value)
|
||||||
|
if path.name.startswith("wechat-openai-image-") and path.is_file():
|
||||||
|
try:
|
||||||
|
path.unlink()
|
||||||
|
except OSError:
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
def call_jimeng(config: dict, prompt: str, model: str,
|
def call_jimeng(config: dict, prompt: str, model: str,
|
||||||
negative_prompt: str, ratio: str, resolution: str) -> list[str]:
|
negative_prompt: str, ratio: str, resolution: str) -> list[str]:
|
||||||
"""Call JiMeng (即梦) image generation API."""
|
"""Call JiMeng (即梦) image generation API."""
|
||||||
@ -315,13 +527,43 @@ def call_zimage(config: dict, prompt: str, model: str) -> list[str]:
|
|||||||
raise RuntimeError("造相绘图任务超时")
|
raise RuntimeError("造相绘图任务超时")
|
||||||
|
|
||||||
|
|
||||||
|
def call_openai(config: dict, prompt: str, model: str,
|
||||||
|
negative_prompt: str, ratio: str, resolution: str) -> list[str]:
|
||||||
|
"""Call OpenAI GPT Image API for text-to-image generation."""
|
||||||
|
client = _openai_client(config)
|
||||||
|
output_format = _openai_output_format(config)
|
||||||
|
quality = str(config.get("quality", "auto") or "auto")
|
||||||
|
moderation = str(config.get("moderation", "auto") or "auto")
|
||||||
|
background = str(config.get("background", "auto") or "auto")
|
||||||
|
if background == "transparent":
|
||||||
|
background = "auto"
|
||||||
|
|
||||||
|
kwargs = {
|
||||||
|
"model": model or "gpt-image-2",
|
||||||
|
"prompt": _openai_prompt(prompt, negative_prompt),
|
||||||
|
"n": _coerce_int(config.get("n"), 1, 1, 10),
|
||||||
|
"size": _openai_size(config, ratio, resolution),
|
||||||
|
"quality": quality,
|
||||||
|
"background": background,
|
||||||
|
"moderation": moderation,
|
||||||
|
"output_format": output_format,
|
||||||
|
}
|
||||||
|
if output_format in {"jpeg", "webp"} and config.get("output_compression") is not None:
|
||||||
|
kwargs["output_compression"] = _coerce_int(config.get("output_compression"), 100, 0, 100)
|
||||||
|
|
||||||
|
response = client.images.generate(**kwargs)
|
||||||
|
_debug_response("openai images.generate response", response)
|
||||||
|
return _openai_images_from_response(response, output_format)
|
||||||
|
|
||||||
|
|
||||||
# ---------------------------------------------------------------------------
|
# ---------------------------------------------------------------------------
|
||||||
# Main
|
# Main
|
||||||
# ---------------------------------------------------------------------------
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
JIMENG_MODELS = {"jimeng-4.5", "jimeng-4.6", "jimeng-5.0"}
|
JIMENG_MODELS = {"jimeng-4.5", "jimeng-4.6", "jimeng-4.7", "jimeng-5.0"}
|
||||||
DOUBAO_MODELS = {"doubao-seedream-4.5", "doubao-seedream-4.0", "doubao-seedream-3.0-t2i", "doubao-seededit-3.0-i2i"}
|
DOUBAO_MODELS = {"doubao-seedream-4.5", "doubao-seedream-4.0", "doubao-seedream-3.0-t2i", "doubao-seededit-3.0-i2i"}
|
||||||
ZIMAGE_MODELS = {"Z-Image", "Z-Image-Turbo", "Qwen-Image-Edit-2511"}
|
ZIMAGE_MODELS = {"Z-Image", "Z-Image-Turbo", "Qwen-Image-Edit-2511"}
|
||||||
|
OPENAI_MODELS = {"gpt-image-2"}
|
||||||
|
|
||||||
|
|
||||||
def _parse_cli_params(argv: list[str]) -> dict[str, str]:
|
def _parse_cli_params(argv: list[str]) -> dict[str, str]:
|
||||||
@ -423,6 +665,13 @@ def main() -> int:
|
|||||||
return 0
|
return 0
|
||||||
image_urls = call_zimage(zimage_config, prompt, model)
|
image_urls = call_zimage(zimage_config, prompt, model)
|
||||||
|
|
||||||
|
elif model in OPENAI_MODELS:
|
||||||
|
openai_config = settings_json.get("OpenAI", {})
|
||||||
|
if not openai_config.get("enabled", False):
|
||||||
|
sys.stdout.write("OpenAI 绘图未开启\n")
|
||||||
|
return 0
|
||||||
|
image_urls = call_openai(openai_config, prompt, model, negative_prompt, ratio, resolution)
|
||||||
|
|
||||||
else:
|
else:
|
||||||
sys.stdout.write("不支持的 AI 图像模型\n")
|
sys.stdout.write("不支持的 AI 图像模型\n")
|
||||||
return 1
|
return 1
|
||||||
@ -438,20 +687,18 @@ def main() -> int:
|
|||||||
# 通过客户端接口发送图片
|
# 通过客户端接口发送图片
|
||||||
client_port = os.environ.get("ROBOT_WECHAT_CLIENT_PORT", "").strip()
|
client_port = os.environ.get("ROBOT_WECHAT_CLIENT_PORT", "").strip()
|
||||||
if not client_port:
|
if not client_port:
|
||||||
|
_cleanup_openai_temp_files(image_urls)
|
||||||
sys.stdout.write("环境变量 ROBOT_WECHAT_CLIENT_PORT 未配置\n")
|
sys.stdout.write("环境变量 ROBOT_WECHAT_CLIENT_PORT 未配置\n")
|
||||||
return 1
|
return 1
|
||||||
|
|
||||||
send_url = f"http://127.0.0.1:{client_port}/api/v1/robot/message/send/image/url"
|
|
||||||
send_body = {
|
|
||||||
"to_wxid": from_wx_id,
|
|
||||||
"image_urls": [u for u in image_urls if u],
|
|
||||||
}
|
|
||||||
try:
|
try:
|
||||||
_http_post_json(send_url, send_body, {"Content-Type": "application/json"}, timeout=60)
|
_send_image_outputs(client_port, from_wx_id, image_urls)
|
||||||
sys.stdout.write("图片发送成功\n")
|
sys.stdout.write("图片发送成功\n")
|
||||||
except Exception as exc:
|
except Exception as exc:
|
||||||
sys.stdout.write(f"发送图片失败: {exc}\n")
|
sys.stdout.write(f"发送图片失败: {exc}\n")
|
||||||
return 1
|
return 1
|
||||||
|
finally:
|
||||||
|
_cleanup_openai_temp_files(image_urls)
|
||||||
|
|
||||||
return 0
|
return 0
|
||||||
|
|
||||||
|
|||||||
@ -111,6 +111,70 @@ argument-hint: "需要 content;可选 emotion、voice、style_prompt、voice_p
|
|||||||
9. 不要传递音色复刻音频参数。若当前消息引用了一条语音消息,脚本会通过 `ROBOT_REF_MESSAGE_ID` 自动判断并下载引用语音作为复刻样本。
|
9. 不要传递音色复刻音频参数。若当前消息引用了一条语音消息,脚本会通过 `ROBOT_REF_MESSAGE_ID` 自动判断并下载引用语音作为复刻样本。
|
||||||
10. `content` 超过 260 个字符时,不应该调用本技能。
|
10. `content` 超过 260 个字符时,不应该调用本技能。
|
||||||
|
|
||||||
|
## 音频标签控制
|
||||||
|
|
||||||
|
通过在文本中嵌入风格标签与音频标签,直接对语音进行精细控制。开头是整体风格标签,中间可以插入细粒度控制标签。
|
||||||
|
|
||||||
|
在目标文本开头添加 `(风格)` 标签,即可指定语音的发音风格。支持同时设置多种风格,将多个风格名称置于同一对括号内,分隔符不限。
|
||||||
|
|
||||||
|
支持的括号格式: 可使用半角 `()`、全角 `()` 或 `[]`。
|
||||||
|
|
||||||
|
### 格式示例
|
||||||
|
|
||||||
|
```
|
||||||
|
风格类型 风格示例
|
||||||
|
基础情绪 开心/悲伤/愤怒/恐惧/惊讶/兴奋/委屈/平静/冷漠
|
||||||
|
复合情绪 怅然/欣慰/无奈/愧疚/释然/嫉妒/厌倦/忐忑/动情
|
||||||
|
整体语调 温柔/高冷/活泼/严肃/慵懒/俏皮/深沉/干练/凌厉
|
||||||
|
音色定位 磁性/醇厚/清亮/空灵/稚嫩/苍老/甜美/沙哑/醇雅
|
||||||
|
人设腔调 夹子音/御姐音/正太音/大叔音/台湾腔
|
||||||
|
方言 东北话/四川话/河南话/粤语
|
||||||
|
角色扮演 孙悟空/林黛玉
|
||||||
|
唱歌 唱歌
|
||||||
|
```
|
||||||
|
|
||||||
|
样例:
|
||||||
|
|
||||||
|
- (怅然)这么多年过去了,再走过那条街,心里一下子空了一块。
|
||||||
|
|
||||||
|
- (慵懒)再让我睡五分钟……就五分钟,真的,最后一次。
|
||||||
|
|
||||||
|
- (磁性)夜已经深了,城市还在呼吸。我是今晚陪你的人,欢迎收听《午夜电台》。
|
||||||
|
|
||||||
|
- (东北话)哎呀妈呀,这天儿也忒冷了吧!你说这风,嗖嗖的,跟刀子似的,割脸啊!
|
||||||
|
|
||||||
|
- (粤语)呢个真係好正啊!食过一次就唔会忘记!
|
||||||
|
|
||||||
|
- (唱歌)原谅我这一生不羁放纵爱自由,也会怕有一天会跌倒,Oh no。背弃了理想,谁人都可以,哪会怕有一天只你共我。
|
||||||
|
|
||||||
|
在此基础上,我们还支持在文本中任意位置插入 [音频标签]。通过 [音频标签] ,你可以对声音进行细粒度控制,精准调节语气、情绪和表达风格——无论是低声耳语、放声大笑,还是带点小情绪的小吐槽,也可以灵活插入呼吸声,停顿,咳嗽等,都能轻松实现。语速同样可以灵活调整,让每句话都有它该有的节奏。
|
||||||
|
|
||||||
|
```
|
||||||
|
风格类型 风格示例
|
||||||
|
语速与节奏 吸气/深呼吸/叹气/长叹一口气/喘息/屏息
|
||||||
|
情绪状态 紧张/害怕/激动/疲惫/委屈/撒娇/心虚/震惊/不耐烦
|
||||||
|
语音特征 颤抖/声音颤抖/变调/破音/鼻音/气声/沙哑
|
||||||
|
哭笑表达 笑/轻笑/大笑/冷笑/抽泣/呜咽/哽咽/嚎啕大哭
|
||||||
|
```
|
||||||
|
|
||||||
|
样例:
|
||||||
|
|
||||||
|
- (紧张,深呼吸)呼……冷静,冷静。不就是一个面试吗……(语速加快,碎碎念)自我介绍已经背了五十遍了,应该没问题的。加油,你可以的……(小声)哎呀,领带歪没歪?
|
||||||
|
|
||||||
|
- (极其疲惫,有气无力)师傅……到地方了叫我一声……(长叹一口气)我先眯一会儿,这班加得我魂儿都要散了。
|
||||||
|
|
||||||
|
- 如果我当时……(沉默片刻)哪怕再坚持一秒钟,结果是不是就不一样了?(苦笑)呵,没如果了。
|
||||||
|
|
||||||
|
- (寒冷导致的急促呼吸)呼——呼——这、这大兴安岭的雪……(咳嗽)简直能把人骨头冻透了……别、别停下,走,快走。
|
||||||
|
|
||||||
|
- (提高音量喊话)大姐!这鱼新鲜着呢!早上刚捞上来的!哎!那个谁,别乱翻,压坏了你赔啊?!
|
||||||
|
|
||||||
|
### 特别注意
|
||||||
|
|
||||||
|
- 只有`mimo-v2.5-tts`模型支持唱歌模式
|
||||||
|
|
||||||
|
- 如需体验更佳的唱歌风格,必须在目标文本最开头添加 `(唱歌)` 标签,格式为:`(唱歌)歌词`。歌词 建议采用中文,可获得更优合成效果。标签内标识支持以下取值,效果等效:`唱歌`、`sing`、`singing`
|
||||||
|
|
||||||
## 执行步骤
|
## 执行步骤
|
||||||
|
|
||||||
1. 识别用户是否明确需要语音消息。
|
1. 识别用户是否明确需要语音消息。
|
||||||
|
|||||||
@ -740,14 +740,6 @@ def synthesize_audio_mimo(config: dict, params: dict) -> tuple[bytes, str]:
|
|||||||
|
|
||||||
url = f"{base_url}/chat/completions"
|
url = f"{base_url}/chat/completions"
|
||||||
payload, audio_format, stream = _build_mimo_payload(config, params)
|
payload, audio_format, stream = _build_mimo_payload(config, params)
|
||||||
|
|
||||||
sys.stdout.write(
|
|
||||||
f"[mimo debug] config={json.dumps(config, ensure_ascii=False)}\n"
|
|
||||||
f"[mimo debug] url={url}\n"
|
|
||||||
f"[mimo debug] api_key={api_key}\n"
|
|
||||||
f"[mimo debug] model={payload.get('model')}\n"
|
|
||||||
)
|
|
||||||
|
|
||||||
request_data = json.dumps(payload, ensure_ascii=False).encode("utf-8")
|
request_data = json.dumps(payload, ensure_ascii=False).encode("utf-8")
|
||||||
|
|
||||||
req = urllib.request.Request(
|
req = urllib.request.Request(
|
||||||
@ -755,7 +747,7 @@ def synthesize_audio_mimo(config: dict, params: dict) -> tuple[bytes, str]:
|
|||||||
data=request_data,
|
data=request_data,
|
||||||
headers={
|
headers={
|
||||||
"Content-Type": "application/json",
|
"Content-Type": "application/json",
|
||||||
"api-key": api_key,
|
"Authorization": f"Bearer {api_key}",
|
||||||
"Accept": "application/json, text/event-stream",
|
"Accept": "application/json, text/event-stream",
|
||||||
"Accept-Encoding": "identity",
|
"Accept-Encoding": "identity",
|
||||||
},
|
},
|
||||||
|
|||||||
Loading…
Reference in New Issue
Block a user