diff --git a/README.md b/README.md
index 90bf046..8f13515 100644
--- a/README.md
+++ b/README.md
@@ -37,6 +37,13 @@ MYSQL_PASSWORD=houhou
视频URL2
```
+**需要发语音的时候可以在控制台输出如下内容**
+
+```
+语音URL1
+语音URL2
+```
+
**发送图片的时候也可以调用 Agent 接口**
```
@@ -62,5 +69,25 @@ MYSQL_PASSWORD=houhou
"to_wxid": "{{ROBOT_FROM_WX_ID}}",
"video_urls": ["{{videourl}}"]
}
+```
+
+**发送语音的时候也可以调用 Agent 接口**
```
+[POST] http://127.0.0.1:{ROBOT_WECHAT_CLIENT_PORT}/api/v1/robot/message/send/voice
+
+说明:
+该接口用于上传语音文件并发送给指定微信用户或群聊。
+请求方式为 multipart/form-data,支持 .amr、.mp3、.wav 格式,单个文件大小不能超过 50MB。
+
+表单参数:
+- to_wxid: 接收方微信 ID,必填
+- voice: 语音文件,必填
+
+请求体 Body:
+
+{
+ "to_wxid": "{{ROBOT_FROM_WX_ID}}",
+ "voice": "@/path/to/voice.amr"
+}
+```
diff --git a/skills/video-generation/SKILL.md b/skills/video-generation/SKILL.md
index e686b4e..0631f90 100644
--- a/skills/video-generation/SKILL.md
+++ b/skills/video-generation/SKILL.md
@@ -101,7 +101,7 @@ argument-hint: "需要 prompt;可选 model、file_paths、ratio、resolution
python3 video-generation/scripts/video_generation.py --prompt '海边日落,镜头缓慢推进' --file_paths 'https://example.com/start.jpg'
```
-6. 脚本生成视频后会自动调用客户端接口 `POST http://127.0.0.1:{ROBOT_WECHAT_CLIENT_PORT}/api/v1/robot/message/send/video/url` 将视频发送给用户,成功时输出「视频发送成功」。
+6. 脚本生成视频后会自动调用客户端接口 `POST http://127.0.0.1:{ROBOT_WECHAT_CLIENT_PORT}/api/v1/robot/message/send/video/url` 将视频发送给用户,成功时输出「ended」。
## 校验规则
@@ -112,5 +112,5 @@ python3 video-generation/scripts/video_generation.py --prompt '海边日落,
## 回复要求
-- 成功时,脚本输出「视频发送成功」,表示视频已通过客户端接口直接发送,无需 AI 智能体再做额外处理。
+- 成功时,脚本输出「ended」,表示视频已通过客户端接口直接发送,无需 AI 智能体再做额外处理。
- 失败时,返回脚本输出的具体错误信息。
diff --git a/skills/video-generation/scripts/video_generation.py b/skills/video-generation/scripts/video_generation.py
index d661011..ccbbca4 100644
--- a/skills/video-generation/scripts/video_generation.py
+++ b/skills/video-generation/scripts/video_generation.py
@@ -340,7 +340,7 @@ def main() -> int:
try:
send_videos(from_wx_id, video_urls)
- sys.stdout.write("视频发送成功\n")
+ sys.stdout.write("ended")
except Exception as exc:
sys.stdout.write(f"发送视频失败: {exc}\n")
return 1
diff --git a/skills/voice-message/SKILL.md b/skills/voice-message/SKILL.md
new file mode 100644
index 0000000..d20face
--- /dev/null
+++ b/skills/voice-message/SKILL.md
@@ -0,0 +1,99 @@
+---
+name: voice-message
+description: "文本转语音与语音消息发送技能。当用户想让我说话、发语音、把一段话转成语音、用某种情绪读出来时使用。支持 content、emotion、context_texts 参数,并自动把合成结果作为语音消息发给当前会话。"
+argument-hint: "需要 content;可选 emotion、context_texts。context_texts 可重复传入。"
+---
+
+# Voice Message Skill
+
+## 描述
+
+这是一个将文本合成为语音并直接发送到当前微信会话的技能。
+
+技能脚本位于 `voice-message/cripts/voice_message.py`。
+
+## 触发条件
+
+- 用户想让你发语音、说一句话、用语音回复。
+- 用户说「把这句话读出来」「帮我发个语音」「用开心一点的语气说」。
+- 用户明确要求文本转语音。
+
+## 入参规范
+
+```json
+{
+ "type": "object",
+ "properties": {
+ "content": {
+ "type": "string",
+ "description": "要转成语音的文本内容。必须保留用户原意,不要无故扩写。最长 260 个字符。"
+ },
+ "emotion": {
+ "type": "string",
+ "description": "可选,输出语音的情绪类型。仅在用户明确要求语气、情绪或声线风格时传入。",
+ "enum": [
+ "happy",
+ "sad",
+ "angry",
+ "surprised",
+ "fear",
+ "hate",
+ "excited",
+ "lovey-dovey",
+ "shy",
+ "comfort",
+ "tension",
+ "tender",
+ "magnetic",
+ "vocal-fry",
+ "ASMR"
+ ]
+ },
+ "context_texts": {
+ "type": "array",
+ "items": {
+ "type": "string"
+ },
+ "description": "可选,语音合成辅助信息。仅在需要引导语速、情绪、音量、说话方式时使用,例如“你可以说慢一点吗?”“你用很委屈的语气说”。"
+ }
+ },
+ "required": ["content"],
+ "additionalProperties": false
+}
+```
+
+对应命令行参数:
+
+- `--content <文本>` 必填
+- `--emotion <情绪>` 可选
+- `--context_texts <辅助文本>` 可选,可重复传入多次
+
+## 参数抽取规则
+
+1. `content` 必须来自用户明确想让你说出的内容,不要加入寒暄、解释或额外总结。
+2. 如果用户只说“你用语音回复我”但没有提供具体要说的话,应先基于上下文生成一段简洁、自然、适合直接播报的回复,再把这段回复作为 `content`。
+3. 只有当用户明确要求情绪或语气时才传 `emotion`。
+4. `context_texts` 适合表达细粒度播报要求,优先用于语速、语调、音量、说话状态的补充说明。
+5. `content` 超过 260 个字符时,不应该调用本技能。
+
+## 执行步骤
+
+1. 识别用户是否明确需要语音消息。
+2. 提取 `content`,可选提取 `emotion`、`context_texts`。
+3. 在仓库根目录执行:
+
+```bash
+python3 voice-message/scripts/voice_message.py --content '这是一条语音消息' --emotion happy --context_texts '请自然一点'
+```
+
+4. 脚本会读取数据库中的 TTS 配置,调用语音合成接口并通过客户端接口 `POST http://127.0.0.1:{ROBOT_WECHAT_CLIENT_PORT}/api/v1/robot/message/send/voice` 直接发送语音。
+
+## 依赖安装
+
+- 脚本首次运行时会自动创建虚拟环境并安装依赖,无需手动执行。
+- 如需手动重新安装,可执行:`python3 voice-message/scripts/bootstrap.py`
+
+## 回复要求
+
+- 成功时,脚本输出「ended」,表示语音已直接发送,无需 AI 智能体再拼装额外消息。
+- 失败时,返回脚本输出的具体错误信息。
diff --git a/skills/voice-message/scripts/bootstrap.py b/skills/voice-message/scripts/bootstrap.py
new file mode 100644
index 0000000..d267b18
--- /dev/null
+++ b/skills/voice-message/scripts/bootstrap.py
@@ -0,0 +1,109 @@
+#!/usr/bin/env python3
+
+from __future__ import annotations
+
+import hashlib
+import subprocess
+import sys
+import traceback
+from pathlib import Path
+
+sys.stderr = sys.stdout
+
+
+def _skill_root_from(script_dir: Path) -> Path:
+ return script_dir.parent
+
+
+def _venv_dir(script_dir: Path) -> Path:
+ return _skill_root_from(script_dir) / ".venv"
+
+
+def _venv_python(venv_dir: Path) -> Path:
+ if sys.platform == "win32":
+ return venv_dir / "Scripts" / "python.exe"
+ return venv_dir / "bin" / "python"
+
+
+def _stamp_file(venv_dir: Path) -> Path:
+ return venv_dir / ".req_hash"
+
+
+def _file_hash(path: Path) -> str:
+ return hashlib.sha256(path.read_bytes()).hexdigest()
+
+
+def _deps_up_to_date(requirements_file: Path, venv_dir: Path) -> bool:
+ stamp = _stamp_file(venv_dir)
+ if not stamp.is_file():
+ return False
+ return stamp.read_text().strip() == _file_hash(requirements_file)
+
+
+def _write_stamp(requirements_file: Path, venv_dir: Path) -> None:
+ _stamp_file(venv_dir).write_text(_file_hash(requirements_file))
+
+
+def _ensure_venv(venv_dir: Path, venv_python: Path) -> int:
+ if venv_python.is_file():
+ return 0
+
+ sys.stdout.write(f"未检测到技能虚拟环境,正在创建: {venv_dir}\n")
+ command = [sys.executable, "-m", "venv", str(venv_dir)]
+
+ try:
+ subprocess.run(command, check=True, stdout=sys.stdout, stderr=sys.stdout)
+ except subprocess.CalledProcessError as exc:
+ sys.stdout.write(f"创建虚拟环境失败,退出码: {exc.returncode}\n")
+ return exc.returncode or 1
+
+ return 0
+
+
+def main() -> int:
+ script_dir = Path(__file__).resolve().parent
+ requirements_file = script_dir / "requirements.txt"
+ venv_dir = _venv_dir(script_dir)
+ venv_python = _venv_python(venv_dir)
+
+ if not requirements_file.is_file():
+ sys.stdout.write(f"未找到依赖文件: {requirements_file}\n")
+ return 1
+
+ ensure_result = _ensure_venv(venv_dir, venv_python)
+ if ensure_result != 0:
+ return ensure_result
+
+ if _deps_up_to_date(requirements_file, venv_dir):
+ sys.stdout.write("依赖已是最新,跳过安装\n")
+ return 0
+
+ command = [str(venv_python), "-m", "pip", "install", "--upgrade", "pip"]
+
+ try:
+ subprocess.run(command, check=True, stdout=sys.stdout, stderr=sys.stdout)
+ except subprocess.CalledProcessError as exc:
+ sys.stdout.write(f"升级 pip 失败,退出码: {exc.returncode}\n")
+ return exc.returncode or 1
+
+ command = [str(venv_python), "-m", "pip", "install", "-r", str(requirements_file)]
+
+ try:
+ subprocess.run(command, check=True, stdout=sys.stdout, stderr=sys.stdout)
+ except subprocess.CalledProcessError as exc:
+ sys.stdout.write(f"安装依赖失败,退出码: {exc.returncode}\n")
+ return exc.returncode or 1
+
+ _write_stamp(requirements_file, venv_dir)
+ sys.stdout.write(f"依赖安装完成,当前技能虚拟环境: {venv_dir}\n")
+ return 0
+
+
+if __name__ == "__main__":
+ try:
+ raise SystemExit(main())
+ except SystemExit:
+ raise
+ except Exception:
+ traceback.print_exc(file=sys.stdout)
+ raise SystemExit(1)
\ No newline at end of file
diff --git a/skills/voice-message/scripts/requirements.txt b/skills/voice-message/scripts/requirements.txt
new file mode 100644
index 0000000..2c34aed
--- /dev/null
+++ b/skills/voice-message/scripts/requirements.txt
@@ -0,0 +1 @@
+pymysql>=1.1,<2
\ No newline at end of file
diff --git a/skills/voice-message/scripts/voice_message.py b/skills/voice-message/scripts/voice_message.py
new file mode 100644
index 0000000..fcf94c4
--- /dev/null
+++ b/skills/voice-message/scripts/voice_message.py
@@ -0,0 +1,489 @@
+#!/usr/bin/env python3
+
+from __future__ import annotations
+
+import argparse
+import base64
+import json
+import os
+import subprocess
+import sys
+import tempfile
+import traceback
+import urllib.error
+import urllib.request
+import uuid
+from pathlib import Path
+
+sys.stderr = sys.stdout
+
+
+VALID_EMOTIONS = {
+ "happy",
+ "sad",
+ "angry",
+ "surprised",
+ "fear",
+ "hate",
+ "excited",
+ "lovey-dovey",
+ "shy",
+ "comfort",
+ "tension",
+ "tender",
+ "magnetic",
+ "vocal-fry",
+ "ASMR",
+}
+
+EMOTION_ALIASES = {
+ "vocal - fry": "vocal-fry",
+}
+
+DEFAULT_SPEAKER = "zh_female_vv_uranus_bigtts"
+DEFAULT_AUDIO_FORMAT = "mp3"
+DEFAULT_SAMPLE_RATE = 24000
+MAX_CONTENT_LENGTH = 260
+STREAM_END_CODE = 20000000
+
+
+def _skill_root() -> Path:
+ return Path(__file__).resolve().parent.parent
+
+
+def _skill_venv_python() -> Path:
+ venv_dir = _skill_root() / ".venv"
+ if sys.platform == "win32":
+ return venv_dir / "Scripts" / "python.exe"
+ return venv_dir / "bin" / "python"
+
+
+def _run_bootstrap() -> None:
+ bootstrap = Path(__file__).resolve().parent / "bootstrap.py"
+ result = subprocess.run([sys.executable, str(bootstrap)])
+ if result.returncode != 0:
+ raise SystemExit(result.returncode)
+
+
+def _ensure_skill_venv_python() -> None:
+ venv_python = _skill_venv_python()
+ if not venv_python.is_file():
+ _run_bootstrap()
+ venv_python = _skill_venv_python()
+ if not venv_python.is_file():
+ sys.stdout.write("bootstrap 后仍未找到虚拟环境\n")
+ raise SystemExit(1)
+
+ venv_dir = _skill_root() / ".venv"
+ if Path(sys.prefix) == venv_dir.resolve():
+ return
+
+ os.execv(str(venv_python), [str(venv_python), str(Path(__file__).resolve()), *sys.argv[1:]])
+
+
+_ensure_skill_venv_python()
+
+try:
+ import pymysql # type: ignore # noqa: E402
+except ModuleNotFoundError:
+ _run_bootstrap()
+ os.execv(sys.executable, [sys.executable, str(Path(__file__).resolve()), *sys.argv[1:]])
+
+
+def _mysql_connect():
+ host = os.environ.get("MYSQL_HOST", "127.0.0.1")
+ port = int(os.environ.get("MYSQL_PORT", "3306"))
+ user = os.environ.get("MYSQL_USER", "root")
+ password = os.environ.get("MYSQL_PASSWORD", "")
+ database = os.environ.get("ROBOT_CODE", "")
+ if not database:
+ raise RuntimeError("环境变量 ROBOT_CODE 未配置")
+
+ return pymysql.connect(
+ host=host,
+ port=port,
+ user=user,
+ password=password,
+ database=database,
+ charset="utf8mb4",
+ connect_timeout=10,
+ read_timeout=300,
+ write_timeout=300,
+ )
+
+
+def _query_one(conn, sql: str, params: tuple = ()) -> dict | None:
+ cur = conn.cursor()
+ cur.execute(sql, params)
+ columns = [desc[0] for desc in cur.description] if cur.description else []
+ row = cur.fetchone()
+ cur.close()
+ if row is None:
+ return None
+ return dict(zip(columns, row))
+
+
+def _load_json_field(raw: object) -> dict:
+ if raw is None:
+ return {}
+ if isinstance(raw, (bytes, bytearray)):
+ raw = raw.decode("utf-8")
+ if isinstance(raw, str):
+ if not raw.strip():
+ return {}
+ value = json.loads(raw)
+ return value if isinstance(value, dict) else {}
+ if isinstance(raw, dict):
+ return raw
+ return {}
+
+
+def load_tts_settings(conn, from_wx_id: str) -> tuple[bool, dict]:
+ global_row = _query_one(conn, "SELECT tts_enabled, tts_settings FROM global_settings LIMIT 1")
+ enabled = False
+ settings_json: dict = {}
+
+ if global_row:
+ if global_row.get("tts_enabled") is not None:
+ enabled = bool(global_row["tts_enabled"])
+ settings_json = _load_json_field(global_row.get("tts_settings"))
+
+ if from_wx_id.endswith("@chatroom"):
+ override = _query_one(
+ conn,
+ "SELECT tts_enabled, tts_settings FROM chat_room_settings WHERE chat_room_id = %s LIMIT 1",
+ (from_wx_id,),
+ )
+ else:
+ override = _query_one(
+ conn,
+ "SELECT tts_enabled, tts_settings FROM friend_settings WHERE wechat_id = %s LIMIT 1",
+ (from_wx_id,),
+ )
+
+ if override:
+ if override.get("tts_enabled") is not None:
+ enabled = bool(override["tts_enabled"])
+ override_settings = _load_json_field(override.get("tts_settings"))
+ if override_settings:
+ settings_json = override_settings
+
+ return enabled, settings_json
+
+
+def _normalize_emotion(emotion: str) -> str:
+ normalized = EMOTION_ALIASES.get(emotion.strip(), emotion.strip())
+ if normalized not in VALID_EMOTIONS:
+ raise ValueError("emotion 不在支持范围内")
+ return normalized
+
+
+def _parse_cli_params(argv: list[str]) -> dict:
+ parser = argparse.ArgumentParser(add_help=False)
+ parser.add_argument("--content", default="")
+ parser.add_argument("--emotion", default="")
+ parser.add_argument("--context_texts", action="append", default=[])
+
+ namespace, unknown = parser.parse_known_args(argv)
+ if unknown:
+ raise ValueError(f"存在不支持的参数: {' '.join(unknown)}")
+
+ return {
+ "content": namespace.content,
+ "emotion": namespace.emotion,
+ "context_texts": [item for item in namespace.context_texts if item.strip()],
+ }
+
+
+def _build_request_headers(config: dict) -> dict[str, str]:
+ request_header = config.get("request_header") or {}
+ if not isinstance(request_header, dict):
+ raise RuntimeError("request_header 配置格式错误")
+
+ app_id = str(request_header.get("X-Api-App-Id") or "").strip()
+ access_key = str(request_header.get("X-Api-Access-Key") or "").strip()
+ resource_id = str(request_header.get("X-Api-Resource-Id") or "").strip()
+ if not app_id or not access_key or not resource_id:
+ raise RuntimeError("请求头参数不能为空")
+
+ headers = {
+ "Content-Type": "application/json",
+ "X-Api-App-Id": app_id,
+ "X-Api-Access-Key": access_key,
+ "X-Api-Resource-Id": resource_id,
+ }
+ request_id = str(request_header.get("X-Api-Request-Id") or "").strip()
+ if request_id:
+ headers["X-Api-Request-Id"] = request_id
+ usage_header = str(request_header.get("X-Control-Require-Usage-Tokens-Return") or "").strip()
+ if usage_header:
+ headers["X-Control-Require-Usage-Tokens-Return"] = usage_header
+ return headers
+
+
+def _build_request_body(config: dict, content: str, emotion: str, context_texts: list[str]) -> dict:
+ request_body = config.get("request_body") or {}
+ if not isinstance(request_body, dict):
+ raise RuntimeError("request_body 配置格式错误")
+
+ body = json.loads(json.dumps(request_body))
+ user = body.setdefault("user", {})
+ if not isinstance(user, dict):
+ raise RuntimeError("user 配置格式错误")
+ user["uid"] = str(uuid.uuid4())
+
+ req_params = body.setdefault("req_params", {})
+ if not isinstance(req_params, dict):
+ raise RuntimeError("req_params 配置格式错误")
+
+ if not str(req_params.get("speaker") or "").strip():
+ req_params["speaker"] = DEFAULT_SPEAKER
+ req_params["text"] = content
+
+ audio_params = req_params.setdefault("audio_params", {})
+ if not isinstance(audio_params, dict):
+ raise RuntimeError("audio_params 配置格式错误")
+ audio_params["format"] = DEFAULT_AUDIO_FORMAT
+ audio_params["sample_rate"] = DEFAULT_SAMPLE_RATE
+ if emotion:
+ audio_params["emotion"] = emotion
+ audio_params["emotion_scale"] = 5
+
+ additions = req_params.setdefault("x-additions", {})
+ if not isinstance(additions, dict):
+ raise RuntimeError("x-additions 配置格式错误")
+ if context_texts:
+ additions["context_texts"] = context_texts
+
+ return body
+
+
+def synthesize_audio(config: dict, content: str, emotion: str, context_texts: list[str]) -> tuple[bytes, str]:
+ url = str(config.get("url") or "").strip()
+ if not url:
+ raise RuntimeError("语音合成地址不能为空")
+
+ request_headers = _build_request_headers(config)
+ request_body = _build_request_body(config, content, emotion, context_texts)
+ request_data = json.dumps(request_body).encode("utf-8")
+
+ req = urllib.request.Request(url, data=request_data, headers=request_headers, method="POST")
+ try:
+ response = urllib.request.urlopen(req, timeout=300)
+ except urllib.error.HTTPError as exc:
+ error_body = exc.read().decode("utf-8", errors="replace")
+ raise RuntimeError(f"API请求失败,状态码 {exc.code}: {error_body}") from exc
+ except urllib.error.URLError as exc:
+ raise RuntimeError(f"发送请求失败: {exc}") from exc
+
+ audio_chunks = bytearray()
+ audio_format = str(
+ ((request_body.get("req_params") or {}).get("audio_params") or {}).get("format") or DEFAULT_AUDIO_FORMAT
+ ).strip() or DEFAULT_AUDIO_FORMAT
+
+ with response:
+ for raw_line in response:
+ line = raw_line.decode("utf-8", errors="replace").strip()
+ if not line:
+ continue
+ if line.startswith("data:"):
+ line = line[5:].strip()
+ if not line:
+ continue
+
+ try:
+ payload = json.loads(line)
+ except json.JSONDecodeError as exc:
+ raise RuntimeError(f"解析响应失败: {exc}, 行内容: {line}") from exc
+
+ code = int(payload.get("code") or 0)
+ message = str(payload.get("message") or "")
+ audio_b64 = payload.get("data")
+
+ if code == 0 and isinstance(audio_b64, str) and audio_b64:
+ try:
+ audio_chunks.extend(base64.b64decode(audio_b64))
+ except Exception as exc:
+ raise RuntimeError(f"解码音频数据失败: {exc}") from exc
+ continue
+
+ if code == 0 and isinstance(payload.get("sentence"), dict):
+ continue
+
+ if code == STREAM_END_CODE:
+ break
+
+ if code > 0:
+ raise RuntimeError(f"合成失败,错误码: {code}, 错误信息: {message}")
+
+ if not audio_chunks:
+ raise RuntimeError("未接收到音频数据")
+
+ return bytes(audio_chunks), audio_format
+
+
+def _guess_mime_type(audio_format: str) -> str:
+ fmt = audio_format.lower()
+ if fmt == "mp3":
+ return "audio/mpeg"
+ if fmt == "wav":
+ return "audio/wav"
+ if fmt == "amr":
+ return "audio/amr"
+ return "application/octet-stream"
+
+
+def _encode_multipart_formdata(fields: dict[str, str], files: list[tuple[str, str, bytes, str]]) -> tuple[bytes, str]:
+ boundary = f"----wechatrobot{uuid.uuid4().hex}"
+ chunks: list[bytes] = []
+
+ for name, value in fields.items():
+ chunks.extend(
+ [
+ f"--{boundary}\r\n".encode("utf-8"),
+ f'Content-Disposition: form-data; name="{name}"\r\n\r\n'.encode("utf-8"),
+ value.encode("utf-8"),
+ b"\r\n",
+ ]
+ )
+
+ for field_name, filename, data, content_type in files:
+ chunks.extend(
+ [
+ f"--{boundary}\r\n".encode("utf-8"),
+ (
+ f'Content-Disposition: form-data; name="{field_name}"; '
+ f'filename="{filename}"\r\n'
+ ).encode("utf-8"),
+ f"Content-Type: {content_type}\r\n\r\n".encode("utf-8"),
+ data,
+ b"\r\n",
+ ]
+ )
+
+ chunks.append(f"--{boundary}--\r\n".encode("utf-8"))
+ return b"".join(chunks), boundary
+
+
+def send_voice(from_wx_id: str, audio_data: bytes, audio_format: str) -> None:
+ client_port = os.environ.get("ROBOT_WECHAT_CLIENT_PORT", "").strip()
+ if not client_port:
+ raise RuntimeError("环境变量 ROBOT_WECHAT_CLIENT_PORT 未配置")
+
+ send_url = f"http://127.0.0.1:{client_port}/api/v1/robot/message/send/voice"
+ suffix = f".{audio_format.lower() or DEFAULT_AUDIO_FORMAT}"
+
+ with tempfile.NamedTemporaryFile(prefix="voice-message-", suffix=suffix, delete=False) as temp_file:
+ temp_file.write(audio_data)
+ temp_path = Path(temp_file.name)
+
+ try:
+ file_bytes = temp_path.read_bytes()
+ body, boundary = _encode_multipart_formdata(
+ {"to_wxid": from_wx_id},
+ [("voice", temp_path.name, file_bytes, _guess_mime_type(audio_format))],
+ )
+ req = urllib.request.Request(
+ send_url,
+ data=body,
+ headers={"Content-Type": f"multipart/form-data; boundary={boundary}"},
+ method="POST",
+ )
+ try:
+ with urllib.request.urlopen(req, timeout=60) as resp:
+ resp.read()
+ except urllib.error.HTTPError as exc:
+ error_body = exc.read().decode("utf-8", errors="replace")
+ raise RuntimeError(f"发送语音失败,状态码 {exc.code}: {error_body}") from exc
+ except urllib.error.URLError as exc:
+ raise RuntimeError(f"发送语音失败: {exc}") from exc
+ finally:
+ try:
+ temp_path.unlink(missing_ok=True)
+ except Exception:
+ pass
+
+
+def main() -> int:
+ if len(sys.argv) < 2:
+ sys.stdout.write("缺少输入参数\n")
+ return 1
+
+ try:
+ params = _parse_cli_params(sys.argv[1:])
+ except ValueError as exc:
+ sys.stdout.write(f"参数格式错误: {exc}\n")
+ return 1
+
+ content = params.get("content", "").strip()
+ if not content:
+ sys.stdout.write("文本转语音的输入文本不能为空\n")
+ return 1
+ if len(content) > MAX_CONTENT_LENGTH:
+ sys.stdout.write("你要说的也太多了,要不你还是说点别的吧。\n")
+ return 1
+
+ emotion = params.get("emotion", "").strip()
+ if emotion:
+ try:
+ emotion = _normalize_emotion(emotion)
+ except ValueError as exc:
+ sys.stdout.write(f"参数格式错误: {exc}\n")
+ return 1
+
+ context_texts = params.get("context_texts", [])
+
+ from_wx_id = os.environ.get("ROBOT_FROM_WX_ID", "").strip()
+ if not from_wx_id:
+ sys.stdout.write("环境变量 ROBOT_FROM_WX_ID 未配置\n")
+ return 1
+
+ try:
+ conn = _mysql_connect()
+ except Exception as exc:
+ sys.stdout.write(f"数据库连接失败: {exc}\n")
+ return 1
+
+ try:
+ enabled, tts_settings = load_tts_settings(conn, from_wx_id)
+ except Exception as exc:
+ sys.stdout.write(f"加载文本转语音配置失败: {exc}\n")
+ return 1
+ finally:
+ try:
+ conn.close()
+ except Exception:
+ pass
+
+ if not enabled:
+ sys.stdout.write("文本转语音未开启\n")
+ return 0
+
+ if not isinstance(tts_settings, dict) or not tts_settings:
+ sys.stdout.write("未找到文本转语音配置\n")
+ return 1
+
+ try:
+ audio_data, audio_format = synthesize_audio(tts_settings, content, emotion, context_texts)
+ except Exception as exc:
+ sys.stdout.write(f"语音合成失败: {exc}\n")
+ return 1
+
+ try:
+ send_voice(from_wx_id, audio_data, audio_format)
+ sys.stdout.write("ended")
+ except Exception as exc:
+ sys.stdout.write(f"发送语音失败: {exc}\n")
+ return 1
+
+ return 0
+
+
+if __name__ == "__main__":
+ try:
+ raise SystemExit(main())
+ except SystemExit:
+ raise
+ except Exception:
+ traceback.print_exc(file=sys.stdout)
+ raise SystemExit(1)
\ No newline at end of file