fix: 支持小米音色模型

This commit is contained in:
hp0912 2026-05-02 00:33:02 +08:00
parent 376e635fbe
commit 0638ed7010
2 changed files with 411 additions and 87 deletions

View File

@ -1,7 +1,7 @@
---
name: voice-message
description: "文本转语音与语音消息发送技能。当用户想让我说话、发语音、把一段话转成语音、用某种情绪读出来时使用。支持 content、emotion、context_texts 参数,并自动把合成结果作为语音消息发给当前会话。"
argument-hint: "需要 content可选 emotion、context_texts。context_texts 可重复传入。"
description: "文本转语音与语音消息发送技能。当用户想让我说话、发语音、把一段话转成语音、用某种情绪/音色/语速/方言读出来时使用。支持 content、emotion、voice、style_prompt、voice_prompt、audio_tags、context_texts 等通用参数,并自动把合成结果作为语音消息发给当前会话。"
argument-hint: "需要 content可选 emotion、voice、style_prompt、voice_prompt、audio_tags、context_texts、speaking_rate、pitch、volume、dialect。"
---
# Voice Message Skill
@ -10,12 +10,13 @@ argument-hint: "需要 content可选 emotion、context_texts。context_texts
这是一个将文本合成为语音并直接发送到当前微信会话的技能。
技能脚本位于 `cripts/voice_message.py`。
技能脚本位于 `scripts/voice_message.py`。
## 触发条件
- 用户想让你发语音、说一句话、用语音回复。
- 用户说「把这句话读出来」「帮我发个语音」「用开心一点的语气说」。
- 用户要求指定音色、语速、音量、方言、角色感、播报风格或音频标签。
- 用户明确要求文本转语音。
## 入参规范
@ -30,31 +31,52 @@ argument-hint: "需要 content可选 emotion、context_texts。context_texts
},
"emotion": {
"type": "string",
"description": "可选,输出语音的情绪类型。仅在用户明确要求语气、情绪或声线风格时传入。",
"enum": [
"happy",
"sad",
"angry",
"surprised",
"fear",
"hate",
"excited",
"lovey-dovey",
"shy",
"comfort",
"tension",
"tender",
"magnetic",
"vocal-fry",
"ASMR"
]
"description": "可选,用户明确要求的情绪或整体风格词,例如 happy、tender、开心、委屈、慵懒、磁性。不要为了适配供应商而改写。"
},
"voice": {
"type": "string",
"description": "可选用户明确指定的音色名、speaker 名或供应商配置中约定的 voice 名称,例如 Chloe、冰糖、mimo_default。不要把“女声”“低沉”这类描述放在这里应放到 voice_prompt。"
},
"voice_prompt": {
"type": "string",
"description": "可选,声线/音色描述,例如“年轻女性,声音清亮,语气温柔但带一点疲惫”。适合文本音色设计,也会作为其他供应商的辅助风格提示。"
},
"context_texts": {
"type": "array",
"items": {
"type": "string"
},
"description": "可选,语音合成辅助信息。仅在需要引导语速、情绪、音量、说话方式时使用,例如“你可以说慢一点吗?”“你用很委屈的语气说”。"
"description": "可选,语音合成辅助信息或对话上下文。仅在需要补充语境、人物状态、说话方式时使用。"
},
"style_prompt": {
"type": "array",
"items": {
"type": "string"
},
"description": "可选,自然语言风格/导演提示,例如“语速稍快,尾音上扬,像刚查到好成绩一样压不住开心”。可重复传入。"
},
"audio_tags": {
"type": "array",
"items": {
"type": "string"
},
"description": "可选,音频标签或整体标签,例如“粤语”“唱歌”“轻笑”“深呼吸”。仅当用户明确要求标签、方言、唱歌、笑声、停顿等细粒度控制时传入。"
},
"speaking_rate": {
"type": "string",
"description": "可选,语速要求,例如“偏慢”“稍快”“像连珠炮”。"
},
"pitch": {
"type": "string",
"description": "可选,音高要求,例如“更低沉”“明亮上扬”。"
},
"volume": {
"type": "string",
"description": "可选,音量或力度要求,例如“小声耳语”“提高音量喊话”。"
},
"dialect": {
"type": "string",
"description": "可选,方言或口音要求,例如“粤语”“四川话”“东北话”“轻微台湾腔”。"
}
},
"required": ["content"],
@ -65,28 +87,49 @@ argument-hint: "需要 content可选 emotion、context_texts。context_texts
对应命令行参数:
- `--content <文本>` 必填
- `--emotion <情绪>` 可选
- `--emotion <情绪/风格>` 可选
- `--voice <音色名或 speaker 名>` 可选
- `--voice_prompt <声线/音色描述>` 可选
- `--style_prompt <自然语言风格提示>` 可选,可重复传入多次
- `--audio_tags <音频标签>` 可选,可重复传入多次
- `--context_texts <辅助文本>` 可选,可重复传入多次
- `--speaking_rate <语速>` 可选
- `--pitch <音高>` 可选
- `--volume <音量>` 可选
- `--dialect <方言/口音>` 可选
## 参数抽取规则
1. `content` 必须来自用户明确想让你说出的内容,不要加入寒暄、解释或额外总结。
2. 如果用户只说“你用语音回复我”但没有提供具体要说的话,应先基于上下文生成一段简洁、自然、适合直接播报的回复,再把这段回复作为 `content`
3. 只有当用户明确要求情绪或语气时才传 `emotion`
4. `context_texts` 适合表达细粒度播报要求,优先用于语速、语调、音量、说话状态的补充说明。
5. `content` 超过 260 个字符时,不应该调用本技能。
3. 不要判断当前使用的是哪个语音供应商,也不要为了供应商改写参数;只按用户意图提取通用参数,脚本会自动映射。
4. 只有当用户明确要求情绪或语气时才传 `emotion`。`emotion` 可以是中文或英文短词,不必限制在某个供应商枚举内。
5. 用户指定明确音色名时用 `voice`;用户描述“女声、低沉、御姐音、年轻男性”等声线质感时用 `voice_prompt`
6. 语速、音高、音量、方言有明确要求时优先填 `speaking_rate`、`pitch`、`volume`、`dialect`;复杂演绎要求放入 `style_prompt`
7. `audio_tags` 仅用于用户明确要求唱歌、方言、笑声、停顿、深呼吸等标签化控制时;如果用户已把标签写在 `content` 中,不要重复添加。
8. `context_texts` 适合表达上下文、场景、人物状态和补充播报要求。
9. 不要传递音色复刻音频参数。若当前消息引用了一条语音消息,脚本会通过 `ROBOT_REF_MESSAGE_ID` 自动判断并下载引用语音作为复刻样本。
10. `content` 超过 260 个字符时,不应该调用本技能。
## 执行步骤
1. 识别用户是否明确需要语音消息。
2. 提取 `content`,可选提取 `emotion`、`context_texts`。
2. 提取 `content`,可选提取 `emotion`、`voice`、`voice_prompt`、`style_prompt`、`audio_tags`、`context_texts` 等通用控制参数
3. 在仓库根目录执行:
```bash
python3 scripts/voice_message.py --content '这是一条语音消息' --emotion happy --context_texts '请自然一点'
python3 skills/voice-message/scripts/voice_message.py --content '这是一条语音消息' --emotion happy --style_prompt '请自然一点'
```
4. 脚本会读取数据库中的 TTS 配置,调用语音合成接口并通过客户端接口 `POST http://127.0.0.1:{ROBOT_WECHAT_CLIENT_PORT}/api/v1/robot/message/send/voice` 直接发送语音。
4. 脚本会读取数据库中的 TTS 配置,按当前供应商能力映射通用参数,调用语音合成接口并通过客户端接口 `POST http://127.0.0.1:{ROBOT_WECHAT_CLIENT_PORT}/api/v1/robot/message/send/voice` 直接发送语音。
## 供应商映射说明
- Doubao`content` 写入文本字段;支持的 `emotion` 写入音频情绪参数;`voice` 可覆盖 speaker其他风格控制会合并到 `context_texts` 辅助信息。
- MiMo V2.5`content` 写入 `assistant` 消息;`style_prompt`、`voice_prompt`、`context_texts`、`emotion`、`speaking_rate`、`pitch`、`volume`、`dialect` 会合并为 `user` 风格/音色控制;`audio_tags` 会作为整体标签加到要合成的文本前。
- MiMo 会默认使用非流式 `wav` 输出;配置中 `stream: true` 时使用 `pcm16` 流式兼容模式并在脚本内封装为 `wav`
- MiMo 在 `auto_model` 未关闭时,会根据 `voice_prompt` 自动选择 `mimo-v2.5-tts-voicedesign`;如果 `ROBOT_REF_MESSAGE_ID` 指向数据库中 `messages.type = 34` 的语音消息,则脚本会调用客户端接口下载该语音 wav并自动选择 `mimo-v2.5-tts-voiceclone`
- 引用消息下载接口为 `GET http://127.0.0.1:{ROBOT_WECHAT_CLIENT_PORT}/api/v1/robot/chat/voice/download?message_id={ROBOT_REF_MESSAGE_ID}`,返回 wav 后由脚本封装为 MiMo 需要的 `data:audio/wav;base64,...`
## 依赖安装

View File

@ -11,6 +11,7 @@ import sys
import tempfile
import traceback
import urllib.error
import urllib.parse
import urllib.request
import uuid
from pathlib import Path
@ -43,6 +44,15 @@ EMOTION_ALIASES = {
DEFAULT_SPEAKER = "zh_female_vv_uranus_bigtts"
DEFAULT_AUDIO_FORMAT = "mp3"
DEFAULT_SAMPLE_RATE = 24000
DEFAULT_MIMO_BASE_URL = "https://api.xiaomimimo.com/v1"
DEFAULT_MIMO_MODEL = "mimo-v2.5-tts"
DEFAULT_MIMO_VOICE = "mimo_default"
DEFAULT_MIMO_AUDIO_FORMAT = "wav"
MIMO_STREAM_AUDIO_FORMAT = "pcm16"
MIMO_PCM_SAMPLE_RATE = 24000
MIMO_VOICE_DESIGN_MODEL = "mimo-v2.5-tts-voicedesign"
MIMO_VOICE_CLONE_MODEL = "mimo-v2.5-tts-voiceclone"
WECHAT_VOICE_MESSAGE_TYPE = 34
MAX_CONTENT_LENGTH = 260
STREAM_END_CODE = 20000000
@ -187,11 +197,82 @@ def load_tts_settings(conn, from_wx_id: str) -> tuple[bool, str, dict, str, str]
return enabled, tts_model, settings_json, fallback_base_url, fallback_api_key
def _clean_text(value: object) -> str:
return str(value or "").strip()
def _clean_text_list(values: object) -> list[str]:
if not isinstance(values, list):
return []
return [item for item in (_clean_text(value) for value in values) if item]
def _coerce_bool(value: object, default: bool = False) -> bool:
if value is None:
return default
if isinstance(value, bool):
return value
if isinstance(value, (int, float)):
return bool(value)
if isinstance(value, str):
normalized = value.strip().lower()
if normalized in {"1", "true", "yes", "y", "on"}:
return True
if normalized in {"0", "false", "no", "n", "off"}:
return False
return default
def _normalize_emotion(emotion: str) -> str:
normalized = EMOTION_ALIASES.get(emotion.strip(), emotion.strip())
if normalized not in VALID_EMOTIONS:
raise ValueError("emotion 不在支持范围内")
return normalized
return normalized if normalized in VALID_EMOTIONS else ""
def _download_referenced_voice_clone(message_id: str) -> str:
client_port = os.environ.get("ROBOT_WECHAT_CLIENT_PORT", "").strip()
if not client_port:
raise RuntimeError("环境变量 ROBOT_WECHAT_CLIENT_PORT 未配置")
encoded_message_id = urllib.parse.quote(message_id, safe="")
download_url = (
f"http://127.0.0.1:{client_port}/api/v1/robot/chat/voice/download"
f"?message_id={encoded_message_id}"
)
req = urllib.request.Request(download_url, method="GET")
try:
with urllib.request.urlopen(req, timeout=60) as response:
wav_data = response.read()
except urllib.error.HTTPError as exc:
error_body = exc.read().decode("utf-8", errors="replace")
raise RuntimeError(f"下载引用语音失败,状态码 {exc.code}: {error_body}") from exc
except urllib.error.URLError as exc:
raise RuntimeError(f"下载引用语音失败: {exc}") from exc
if not wav_data:
raise RuntimeError("下载引用语音失败: 响应为空")
audio_b64 = base64.b64encode(wav_data).decode("utf-8")
return f"data:audio/wav;base64,{audio_b64}"
def _load_referenced_voice_clone(conn) -> str:
ref_message_id = os.environ.get("ROBOT_REF_MESSAGE_ID", "").strip()
if not ref_message_id:
return ""
message = _query_one(conn, "SELECT * FROM messages WHERE msg_id = %s LIMIT 1", (ref_message_id,))
if not message:
return ""
try:
message_type = int(message.get("type") or 0)
except (TypeError, ValueError):
return ""
if message_type != WECHAT_VOICE_MESSAGE_TYPE:
return ""
return _download_referenced_voice_clone(ref_message_id)
def _parse_cli_params(argv: list[str]) -> dict:
@ -199,6 +280,14 @@ def _parse_cli_params(argv: list[str]) -> dict:
parser.add_argument("--content", default="")
parser.add_argument("--emotion", default="")
parser.add_argument("--context_texts", action="append", default=[])
parser.add_argument("--voice", default="")
parser.add_argument("--style_prompt", action="append", default=[])
parser.add_argument("--voice_prompt", default="")
parser.add_argument("--audio_tags", action="append", default=[])
parser.add_argument("--speaking_rate", default="")
parser.add_argument("--pitch", default="")
parser.add_argument("--volume", default="")
parser.add_argument("--dialect", default="")
namespace, unknown = parser.parse_known_args(argv)
if unknown:
@ -206,8 +295,16 @@ def _parse_cli_params(argv: list[str]) -> dict:
return {
"content": namespace.content,
"emotion": namespace.emotion,
"context_texts": [item for item in namespace.context_texts if item.strip()],
"emotion": _clean_text(namespace.emotion),
"context_texts": _clean_text_list(namespace.context_texts),
"voice": _clean_text(namespace.voice),
"style_prompt": _clean_text_list(namespace.style_prompt),
"voice_prompt": _clean_text(namespace.voice_prompt),
"audio_tags": _clean_text_list(namespace.audio_tags),
"speaking_rate": _clean_text(namespace.speaking_rate),
"pitch": _clean_text(namespace.pitch),
"volume": _clean_text(namespace.volume),
"dialect": _clean_text(namespace.dialect),
}
@ -237,11 +334,36 @@ def _build_request_headers(config: dict) -> dict[str, str]:
return headers
def _build_request_body(config: dict, content: str, emotion: str, context_texts: list[str]) -> dict:
def _build_control_texts(params: dict) -> list[str]:
controls = list(params.get("context_texts") or [])
controls.extend(params.get("style_prompt") or [])
labeled_fields = [
("emotion", "情绪/风格"),
("voice_prompt", "音色描述"),
("speaking_rate", "语速"),
("pitch", "音高"),
("volume", "音量"),
("dialect", "方言/口音"),
]
for field_name, label in labeled_fields:
value = _clean_text(params.get(field_name))
if value:
controls.append(f"{label}: {value}")
for tag in params.get("audio_tags") or []:
controls.append(f"音频标签: {tag}")
return [item for item in controls if item]
def _build_request_body(config: dict, params: dict) -> dict:
request_body = config.get("request_body") or {}
if not isinstance(request_body, dict):
raise RuntimeError("request_body 配置格式错误")
content = params.get("content", "")
body = json.loads(json.dumps(request_body))
user = body.setdefault("user", {})
if not isinstance(user, dict):
@ -252,7 +374,10 @@ def _build_request_body(config: dict, content: str, emotion: str, context_texts:
if not isinstance(req_params, dict):
raise RuntimeError("req_params 配置格式错误")
if not str(req_params.get("speaker") or "").strip():
voice = _clean_text(params.get("voice"))
if voice:
req_params["speaker"] = voice
elif not str(req_params.get("speaker") or "").strip():
req_params["speaker"] = DEFAULT_SPEAKER
req_params["text"] = content
@ -261,6 +386,7 @@ def _build_request_body(config: dict, content: str, emotion: str, context_texts:
raise RuntimeError("audio_params 配置格式错误")
audio_params["format"] = DEFAULT_AUDIO_FORMAT
audio_params["sample_rate"] = DEFAULT_SAMPLE_RATE
emotion = _normalize_emotion(_clean_text(params.get("emotion")))
if emotion:
audio_params["emotion"] = emotion
audio_params["emotion_scale"] = 5
@ -268,19 +394,20 @@ def _build_request_body(config: dict, content: str, emotion: str, context_texts:
additions = req_params.setdefault("x-additions", {})
if not isinstance(additions, dict):
raise RuntimeError("x-additions 配置格式错误")
context_texts = _build_control_texts(params)
if context_texts:
additions["context_texts"] = context_texts
return body
def synthesize_audio(config: dict, content: str, emotion: str, context_texts: list[str]) -> tuple[bytes, str]:
def synthesize_audio(config: dict, params: dict) -> tuple[bytes, str]:
url = str(config.get("url") or "").strip()
if not url:
raise RuntimeError("语音合成地址不能为空")
request_headers = _build_request_headers(config)
request_body = _build_request_body(config, content, emotion, context_texts)
request_body = _build_request_body(config, params)
request_data = json.dumps(request_body).encode("utf-8")
req = urllib.request.Request(url, data=request_data, headers=request_headers, method="POST")
@ -363,42 +490,158 @@ def _pcm16le_to_wav(pcm_data: bytes, sample_rate: int = 24000, channels: int = 1
return header + pcm_data
def synthesize_audio_mimo(config: dict, content: str, voice: str) -> tuple[bytes, str]:
api_key = str(config.get("api_key") or "").strip()
base_url = str(config.get("base_url") or "https://api.xiaomimimo.com/v1").strip().rstrip("/")
model = str(config.get("model") or "mimo-v2.5-tts").strip()
if not voice:
voice = str(config.get("voice") or "mimo_default").strip()
if not api_key:
raise RuntimeError("mimo api_key 不能为空")
def _config_texts(config: dict, key: str) -> list[str]:
value = config.get(key)
if isinstance(value, list):
return _clean_text_list(value)
text = _clean_text(value)
return [text] if text else []
url = f"{base_url}/chat/completions"
payload = json.dumps({
"model": model,
"messages": [{"role": "assistant", "content": content}],
"audio": {"format": "pcm16", "voice": voice},
"stream": True,
}).encode("utf-8")
req = urllib.request.Request(
url,
data=payload,
headers={
"Content-Type": "application/json",
"api-key": api_key,
},
method="POST",
def _resolve_mimo_model(config: dict, params: dict) -> str:
configured_model = _clean_text(config.get("model"))
if _clean_text(params.get("voice_clone_audio")):
return MIMO_VOICE_CLONE_MODEL
auto_model = _coerce_bool(config.get("auto_model"), True)
if auto_model and _clean_text(config.get("voice_clone_audio")):
return MIMO_VOICE_CLONE_MODEL
if auto_model and (_clean_text(params.get("voice_prompt")) or _clean_text(config.get("voice_prompt"))):
return MIMO_VOICE_DESIGN_MODEL
if configured_model:
return configured_model
return DEFAULT_MIMO_MODEL
def _format_mimo_audio_tags(tags: list[str]) -> str:
cleaned_tags = [tag.strip("()[] ") for tag in tags if tag.strip("()[] ")]
if not cleaned_tags:
return ""
return f"({' '.join(cleaned_tags)})"
def _build_mimo_assistant_content(params: dict) -> str:
content = _clean_text(params.get("content"))
tags = _format_mimo_audio_tags(params.get("audio_tags") or [])
return f"{tags}{content}" if tags else content
def _build_mimo_user_content(config: dict, params: dict, model: str) -> str:
parts: list[str] = []
voice_prompt = _clean_text(params.get("voice_prompt")) or _clean_text(config.get("voice_prompt"))
if voice_prompt:
if model == MIMO_VOICE_DESIGN_MODEL:
parts.append(voice_prompt)
else:
parts.append(f"音色/声线: {voice_prompt}")
parts.extend(_config_texts(config, "style_prompt"))
parts.extend(params.get("style_prompt") or [])
parts.extend(_config_texts(config, "context_texts"))
parts.extend(params.get("context_texts") or [])
labeled_fields = [
("emotion", "情绪/风格"),
("speaking_rate", "语速"),
("pitch", "音高"),
("volume", "音量"),
("dialect", "方言/口音"),
]
for field_name, label in labeled_fields:
value = _clean_text(params.get(field_name)) or _clean_text(config.get(field_name))
if value:
parts.append(f"{label}: {value}")
if model == MIMO_VOICE_DESIGN_MODEL and not parts:
raise RuntimeError("mimo 文本音色设计模型需要 voice_prompt 或 style_prompt")
return "\n".join(parts)
def _resolve_mimo_voice(config: dict, params: dict, model: str) -> str:
if model == MIMO_VOICE_DESIGN_MODEL:
return ""
if model == MIMO_VOICE_CLONE_MODEL:
voice_clone_audio = _clean_text(params.get("voice_clone_audio")) or _clean_text(config.get("voice_clone_audio"))
if not voice_clone_audio:
raise RuntimeError("mimo 音色复刻模型需要引用一条语音消息或配置 voice_clone_audio")
if voice_clone_audio.startswith("data:"):
return voice_clone_audio
mime_type = (
_clean_text(params.get("voice_clone_mime_type"))
or _clean_text(config.get("voice_clone_mime_type"))
or "audio/mpeg"
)
return f"data:{mime_type};base64,{voice_clone_audio}"
return _clean_text(params.get("voice")) or _clean_text(config.get("voice")) or DEFAULT_MIMO_VOICE
def _build_mimo_payload(config: dict, params: dict) -> tuple[dict, str, bool]:
model = _resolve_mimo_model(config, params)
stream = _coerce_bool(config.get("stream"), False)
audio_format = MIMO_STREAM_AUDIO_FORMAT if stream else (
_clean_text(config.get("audio_format")) or _clean_text(config.get("format")) or DEFAULT_MIMO_AUDIO_FORMAT
)
pcm_chunks = bytearray()
try:
response = urllib.request.urlopen(req, timeout=300)
except urllib.error.HTTPError as exc:
error_body = exc.read().decode("utf-8", errors="replace")
raise RuntimeError(f"mimo API请求失败状态码 {exc.code}: {error_body}") from exc
except urllib.error.URLError as exc:
raise RuntimeError(f"mimo 发送请求失败: {exc}") from exc
messages = []
user_content = _build_mimo_user_content(config, params, model)
if user_content or model == MIMO_VOICE_CLONE_MODEL:
messages.append({"role": "user", "content": user_content})
messages.append({"role": "assistant", "content": _build_mimo_assistant_content(params)})
audio = {"format": audio_format}
voice = _resolve_mimo_voice(config, params, model)
if voice:
audio["voice"] = voice
payload = {
"model": model,
"messages": messages,
"audio": audio,
}
if stream:
payload["stream"] = True
return payload, audio_format, stream
def _decode_mimo_audio(audio_b64: object, audio_format: str) -> tuple[bytes, str]:
if not isinstance(audio_b64, str) or not audio_b64:
raise RuntimeError("mimo 响应未包含音频数据")
try:
audio_bytes = base64.b64decode(audio_b64)
except Exception as exc:
raise RuntimeError(f"解码 mimo 音频数据失败: {exc}") from exc
if audio_format == MIMO_STREAM_AUDIO_FORMAT:
return _pcm16le_to_wav(audio_bytes, sample_rate=MIMO_PCM_SAMPLE_RATE), "wav"
return audio_bytes, audio_format
def _read_mimo_non_stream_response(response, audio_format: str) -> tuple[bytes, str]:
raw_body = response.read().decode("utf-8", errors="replace")
try:
payload = json.loads(raw_body)
except json.JSONDecodeError as exc:
raise RuntimeError(f"解析 mimo 响应失败: {exc}, 响应内容: {raw_body}") from exc
if isinstance(payload.get("error"), dict):
error = payload["error"]
message = _clean_text(error.get("message")) or json.dumps(error, ensure_ascii=False)
raise RuntimeError(f"mimo 合成失败: {message}")
choices = payload.get("choices") or []
if not choices:
raise RuntimeError(f"mimo 响应缺少 choices: {raw_body}")
message = choices[0].get("message") or {}
audio = message.get("audio") or {}
audio_b64 = audio.get("data") if isinstance(audio, dict) else None
return _decode_mimo_audio(audio_b64, audio_format)
def _read_mimo_stream_response(response) -> tuple[bytes, str]:
pcm_chunks = bytearray()
with response:
for raw_line in response:
line = raw_line.decode("utf-8", errors="replace").strip()
@ -411,6 +654,9 @@ def synthesize_audio_mimo(config: dict, content: str, voice: str) -> tuple[bytes
chunk = json.loads(data_str)
except json.JSONDecodeError:
continue
if isinstance(chunk.get("error"), dict):
message = _clean_text(chunk["error"].get("message")) or json.dumps(chunk["error"], ensure_ascii=False)
raise RuntimeError(f"mimo 合成失败: {message}")
choices = chunk.get("choices") or []
if not choices:
continue
@ -426,8 +672,42 @@ def synthesize_audio_mimo(config: dict, content: str, voice: str) -> tuple[bytes
if not pcm_chunks:
raise RuntimeError("mimo 未接收到音频数据")
wav_data = _pcm16le_to_wav(bytes(pcm_chunks))
return wav_data, "wav"
return _pcm16le_to_wav(bytes(pcm_chunks), sample_rate=MIMO_PCM_SAMPLE_RATE), "wav"
def synthesize_audio_mimo(config: dict, params: dict) -> tuple[bytes, str]:
api_key = str(config.get("api_key") or "").strip()
base_url = str(config.get("base_url") or DEFAULT_MIMO_BASE_URL).strip().rstrip("/")
if not api_key:
raise RuntimeError("mimo api_key 不能为空")
url = f"{base_url}/chat/completions"
payload, audio_format, stream = _build_mimo_payload(config, params)
request_data = json.dumps(payload, ensure_ascii=False).encode("utf-8")
req = urllib.request.Request(
url,
data=request_data,
headers={
"Content-Type": "application/json",
"api-key": api_key,
},
method="POST",
)
try:
response = urllib.request.urlopen(req, timeout=300)
except urllib.error.HTTPError as exc:
error_body = exc.read().decode("utf-8", errors="replace")
raise RuntimeError(f"mimo API请求失败状态码 {exc.code}: {error_body}") from exc
except urllib.error.URLError as exc:
raise RuntimeError(f"mimo 发送请求失败: {exc}") from exc
if stream:
return _read_mimo_stream_response(response)
with response:
return _read_mimo_non_stream_response(response, audio_format)
def _guess_mime_type(audio_format: str) -> str:
@ -531,16 +811,6 @@ def main() -> int:
sys.stdout.write("你要说的也太多了,要不你还是说点别的吧。\n")
return 1
emotion = params.get("emotion", "").strip()
if emotion:
try:
emotion = _normalize_emotion(emotion)
except ValueError as exc:
sys.stdout.write(f"参数格式错误: {exc}\n")
return 1
context_texts = params.get("context_texts", [])
from_wx_id = os.environ.get("ROBOT_FROM_WX_ID", "").strip()
if not from_wx_id:
sys.stdout.write("环境变量 ROBOT_FROM_WX_ID 未配置\n")
@ -552,11 +822,22 @@ def main() -> int:
sys.stdout.write(f"数据库连接失败: {exc}\n")
return 1
try:
try:
enabled, tts_model, tts_settings, fallback_base_url, fallback_api_key = load_tts_settings(conn, from_wx_id)
except Exception as exc:
sys.stdout.write(f"加载文本转语音配置失败: {exc}\n")
return 1
try:
if tts_model == "mimo":
voice_clone_audio = _load_referenced_voice_clone(conn)
if voice_clone_audio:
params = dict(params)
params["voice_clone_audio"] = voice_clone_audio
except Exception as exc:
sys.stdout.write(f"加载引用语音失败: {exc}\n")
return 1
finally:
try:
conn.close()
@ -578,7 +859,7 @@ def main() -> int:
try:
if tts_model == "doubao":
audio_data, audio_format = synthesize_audio(model_config, content, emotion, context_texts)
audio_data, audio_format = synthesize_audio(model_config, params)
elif tts_model == "mimo":
if not str(model_config.get("api_key") or "").strip() and fallback_api_key:
model_config = dict(model_config)
@ -586,7 +867,7 @@ def main() -> int:
if not str(model_config.get("base_url") or "").strip() and fallback_base_url:
model_config = dict(model_config)
model_config["base_url"] = fallback_base_url
audio_data, audio_format = synthesize_audio_mimo(model_config, content, "")
audio_data, audio_format = synthesize_audio_mimo(model_config, params)
else:
sys.stdout.write(f"未知的 TTS 模型: {tts_model}\n")
return 1