feat: 豆包视频理解

This commit is contained in:
hp0912 2026-04-18 13:19:21 +08:00
parent 718eca02ca
commit b130f22012
5 changed files with 587 additions and 0 deletions

View File

@ -0,0 +1,16 @@
# 视频理解技能
**视频理解技能由豆包加持,使用本技能请将图片识别模型设置为豆包大模型**
这个技能需要注入 mysql 数据库环境变量
- MYSQL_HOST
- MYSQL_PORT
- MYSQL_USER
- MYSQL_PASSWORD
需要额外注入豆包密钥
- ARK_API_KEY
以上环境变量,在界面上安装完本技能后,点击`环境变量`按钮设置

View File

@ -0,0 +1,89 @@
---
name: doubao-video-understanding
description: "豆包视频解析理解工具。当用户提供一个视频链接并希望获得视频的详细描述、总结或理解时使用。"
argument-hint: "需要 prompt、video_url可选 fps、max_tokens。"
---
# Doubao Video Understanding Skill
## 描述
这是一个 AI 视频解析理解技能,输入一个视频链接,输出视频的详细描述、总结,或对视频内容的理解。
脚本会先从数据库读取当前会话的图像 AI 配置开关,再读取对应的 `image_recognition_model` 作为理解模型,并使用环境变量中的 `ARK_API_KEY` 调用 Ark 多模态对话接口完成视频分析。
这个仓库里额外提供了一个可执行脚本 `scripts/video_understanding.py`,方便宿主机器人直接调用。
## 触发条件
- 用户发来一个视频链接,并要求描述视频内容。
- 用户说「总结这个视频」「帮我理解这个视频」「分析一下这个视频讲了什么」。
- 用户希望获取视频的详细描述、核心摘要、主题理解。
## 入参规范
```json
{
"type": "object",
"properties": {
"prompt": {
"type": "string",
"description": "可选的分析指令。默认会要求模型输出详细描述、总结和理解。"
},
"video_url": {
"type": "string",
"description": "需要解析的视频链接,必须是 https 地址。"
},
"fps": {
"type": "integer",
"description": "抽帧频率,可选,默认 2。"
},
"max_tokens": {
"type": "integer",
"description": "模型输出最大 token 数,可选,默认 800。"
}
},
"required": ["prompt", "video_url"],
"additionalProperties": false
}
```
对应的命令行参数为:
- `--prompt <分析指令>` 必填
- `--video_url <视频链接>` 必填,必须是 `https` 地址
- `--fps <抽帧频率>` 可选
- `--max_tokens <最大输出 token 数>` 可选
## 依赖安装
- 脚本首次运行时会自动创建虚拟环境并安装依赖,无需手动执行。
- 如需手动重新安装,可执行:`python3 scripts/bootstrap.py`
## 执行步骤
1. 当用户提供视频链接并要求描述、总结或理解时触发该技能。
2. 提取 `prompt` 用户需求和 `video_url` 视频链接。可选提取 `fps`、`max_tokens`。
3. 在仓库根目录执行脚本,例如:
```bash
python3 scripts/video_understanding.py --prompt '请描述这个视频' --video_url 'https://example.com/demo.mp4'
```
4. 脚本会从数据库读取 `image_ai_enabled``image_recognition_model`。模型读取顺序为:当前会话覆盖配置优先,其次全局配置;如果表字段不存在,则回退到 `image_ai_settings` JSON 中的同名字段。
5. 脚本调用 `https://ark.cn-beijing.volces.com/api/v3/chat/completions`,将视频链接和分析指令一起发送给视觉模型。
6. 成功时,脚本输出文本结果,宿主机器人可直接作为消息回复给用户。
## 校验规则
- `prompt` 不能为空。
- `video_url` 不能为空,且必须是 `https` 链接。
- `fps` 必须大于 0。
- `max_tokens` 必须大于 0。
- 环境变量 `ARK_API_KEY` 必须存在。
- 数据库里必须开启图像 AI 能力,并能解析出 `image_recognition_model`
## 回复要求
- 成功时,脚本输出视频理解结果。
- 失败时,返回脚本输出的具体错误信息。

View File

@ -0,0 +1,128 @@
#!/usr/bin/env python3
from __future__ import annotations
import hashlib
import subprocess
import sys
import traceback
from pathlib import Path
sys.stderr = sys.stdout
def _skill_root_from(script_dir: Path) -> Path:
return script_dir.parent
def _venv_dir(script_dir: Path) -> Path:
return _skill_root_from(script_dir) / ".venv"
def _venv_python(venv_dir: Path) -> Path:
if sys.platform == "win32":
return venv_dir / "Scripts" / "python.exe"
return venv_dir / "bin" / "python"
def _stamp_file(venv_dir: Path) -> Path:
return venv_dir / ".req_hash"
def _file_hash(path: Path) -> str:
return hashlib.sha256(path.read_bytes()).hexdigest()
def _deps_up_to_date(requirements_file: Path, venv_dir: Path) -> bool:
stamp = _stamp_file(venv_dir)
if not stamp.is_file():
return False
return stamp.read_text().strip() == _file_hash(requirements_file)
def _write_stamp(requirements_file: Path, venv_dir: Path) -> None:
_stamp_file(venv_dir).write_text(_file_hash(requirements_file))
def _ensure_venv(venv_dir: Path, venv_python: Path) -> int:
if venv_python.is_file():
return 0
sys.stdout.write(f"未检测到技能虚拟环境,正在创建: {venv_dir}\n")
command = [
sys.executable,
"-m",
"venv",
str(venv_dir),
]
try:
subprocess.run(command, check=True, stdout=sys.stdout, stderr=sys.stdout)
except subprocess.CalledProcessError as exc:
sys.stdout.write(f"创建虚拟环境失败,退出码: {exc.returncode}\n")
return exc.returncode or 1
return 0
def main() -> int:
script_dir = Path(__file__).resolve().parent
requirements_file = script_dir / "requirements.txt"
venv_dir = _venv_dir(script_dir)
venv_python = _venv_python(venv_dir)
if not requirements_file.is_file():
sys.stdout.write(f"未找到依赖文件: {requirements_file}\n")
return 1
ensure_result = _ensure_venv(venv_dir, venv_python)
if ensure_result != 0:
return ensure_result
if _deps_up_to_date(requirements_file, venv_dir):
sys.stdout.write("依赖已是最新,跳过安装\n")
return 0
command = [
str(venv_python),
"-m",
"pip",
"install",
"--upgrade",
"pip",
]
try:
subprocess.run(command, check=True, stdout=sys.stdout, stderr=sys.stdout)
except subprocess.CalledProcessError as exc:
sys.stdout.write(f"升级 pip 失败,退出码: {exc.returncode}\n")
return exc.returncode or 1
command = [
str(venv_python),
"-m",
"pip",
"install",
"-r",
str(requirements_file),
]
try:
subprocess.run(command, check=True, stdout=sys.stdout, stderr=sys.stdout)
except subprocess.CalledProcessError as exc:
sys.stdout.write(f"安装依赖失败,退出码: {exc.returncode}\n")
return exc.returncode or 1
_write_stamp(requirements_file, venv_dir)
sys.stdout.write(f"依赖安装完成,当前技能虚拟环境: {venv_dir}\n")
return 0
if __name__ == "__main__":
try:
raise SystemExit(main())
except SystemExit:
raise
except Exception:
traceback.print_exc(file=sys.stdout)
raise SystemExit(1)

View File

@ -0,0 +1 @@
pymysql>=1.1,<2

View File

@ -0,0 +1,353 @@
#!/usr/bin/env python3
from __future__ import annotations
import argparse
import json
import os
import subprocess
import sys
import traceback
import urllib.error
import urllib.request
from pathlib import Path
from urllib.parse import urlparse
sys.stderr = sys.stdout
DEFAULT_PROMPT = "请用中文输出分成三部分1. 详细描述视频内容2. 总结核心信息3. 给出对视频的理解。"
DEFAULT_FPS = 2
DEFAULT_MAX_TOKENS = 800
def _skill_root() -> Path:
return Path(__file__).resolve().parent.parent
def _skill_venv_python() -> Path:
venv_dir = _skill_root() / ".venv"
if sys.platform == "win32":
return venv_dir / "Scripts" / "python.exe"
return venv_dir / "bin" / "python"
def _run_bootstrap() -> None:
bootstrap = Path(__file__).resolve().parent / "bootstrap.py"
result = subprocess.run([sys.executable, str(bootstrap)])
if result.returncode != 0:
raise SystemExit(result.returncode)
def _ensure_skill_venv_python() -> None:
venv_python = _skill_venv_python()
if not venv_python.is_file():
_run_bootstrap()
venv_python = _skill_venv_python()
if not venv_python.is_file():
sys.stdout.write("bootstrap 后仍未找到虚拟环境\n")
raise SystemExit(1)
venv_dir = _skill_root() / ".venv"
if Path(sys.prefix) == venv_dir.resolve():
return
os.execv(str(venv_python), [str(venv_python), str(Path(__file__).resolve()), *sys.argv[1:]])
_ensure_skill_venv_python()
try:
import pymysql # type: ignore # noqa: E402
except ModuleNotFoundError:
_run_bootstrap()
os.execv(sys.executable, [sys.executable, str(Path(__file__).resolve()), *sys.argv[1:]])
def _mysql_connect():
host = os.environ.get("MYSQL_HOST", "127.0.0.1")
port = int(os.environ.get("MYSQL_PORT", "3306"))
user = os.environ.get("MYSQL_USER", "root")
password = os.environ.get("MYSQL_PASSWORD", "")
database = os.environ.get("ROBOT_CODE", "")
if not database:
raise RuntimeError("环境变量 ROBOT_CODE 未配置")
return pymysql.connect(
host=host,
port=port,
user=user,
password=password,
database=database,
charset="utf8mb4",
connect_timeout=10,
read_timeout=30,
)
def _query_one(conn, sql: str, params: tuple = ()) -> dict | None:
cur = conn.cursor()
cur.execute(sql, params)
columns = [desc[0] for desc in cur.description] if cur.description else []
row = cur.fetchone()
cur.close()
if row is None:
return None
return dict(zip(columns, row))
def _table_has_column(conn, table_name: str, column_name: str) -> bool:
sql = (
"SELECT 1 FROM information_schema.columns "
"WHERE table_schema = %s AND table_name = %s AND column_name = %s LIMIT 1"
)
database_name = conn.db
if isinstance(database_name, (bytes, bytearray)):
database_name = database_name.decode("utf-8")
cur = conn.cursor()
cur.execute(sql, (database_name, table_name, column_name))
row = cur.fetchone()
cur.close()
return row is not None
def _decode_settings(raw: object) -> dict:
if not raw:
return {}
if isinstance(raw, (bytes, bytearray)):
raw = raw.decode("utf-8")
if isinstance(raw, str) and raw.strip():
return json.loads(raw)
return {}
def _extract_model(record: dict | None, settings_json: dict) -> str:
if record:
model = record.get("image_recognition_model")
if isinstance(model, (bytes, bytearray)):
model = model.decode("utf-8")
if isinstance(model, str) and model.strip():
return model.strip()
for key in ("image_recognition_model", "imageRecognitionModel"):
value = settings_json.get(key)
if isinstance(value, str) and value.strip():
return value.strip()
return ""
def load_understanding_settings(conn, from_wx_id: str) -> tuple[bool, str]:
global_has_model = _table_has_column(conn, "global_settings", "image_recognition_model")
chatroom_has_model = _table_has_column(conn, "chat_room_settings", "image_recognition_model")
friend_has_model = _table_has_column(conn, "friend_settings", "image_recognition_model")
global_fields = "image_ai_enabled, image_ai_settings"
if global_has_model:
global_fields += ", image_recognition_model"
global_record = _query_one(conn, f"SELECT {global_fields} FROM global_settings LIMIT 1")
enabled = False
settings_json: dict = {}
model = ""
if global_record:
if global_record.get("image_ai_enabled") is not None:
enabled = bool(global_record["image_ai_enabled"])
settings_json = _decode_settings(global_record.get("image_ai_settings"))
model = _extract_model(global_record, settings_json)
if from_wx_id.endswith("@chatroom"):
override_fields = "image_ai_enabled, image_ai_settings"
if chatroom_has_model:
override_fields += ", image_recognition_model"
override = _query_one(
conn,
f"SELECT {override_fields} FROM chat_room_settings WHERE chat_room_id = %s LIMIT 1",
(from_wx_id,),
)
else:
override_fields = "image_ai_enabled, image_ai_settings"
if friend_has_model:
override_fields += ", image_recognition_model"
override = _query_one(
conn,
f"SELECT {override_fields} FROM friend_settings WHERE wechat_id = %s LIMIT 1",
(from_wx_id,),
)
if override:
if override.get("image_ai_enabled") is not None:
enabled = bool(override["image_ai_enabled"])
override_settings = _decode_settings(override.get("image_ai_settings"))
if override_settings:
settings_json = override_settings
override_model = _extract_model(override, settings_json)
if override_model:
model = override_model
return enabled, model
def _http_post_json(url: str, body: dict, headers: dict, timeout: int = 300) -> dict:
data = json.dumps(body).encode("utf-8")
req = urllib.request.Request(url, data=data, headers=headers, method="POST")
try:
with urllib.request.urlopen(req, timeout=timeout) as resp:
return json.loads(resp.read().decode("utf-8"))
except urllib.error.HTTPError as exc:
error_body = exc.read().decode("utf-8", errors="replace")
raise RuntimeError(f"HTTP {exc.code}: {error_body}") from exc
except urllib.error.URLError as exc:
raise RuntimeError(str(exc)) from exc
def _extract_response_text(payload: dict) -> str:
choices = payload.get("choices", [])
if not choices:
return ""
message = choices[0].get("message", {})
content = message.get("content", "")
if isinstance(content, str):
return content.strip()
if isinstance(content, list):
texts: list[str] = []
for item in content:
if not isinstance(item, dict):
continue
if item.get("type") == "text" and isinstance(item.get("text"), str):
texts.append(item["text"].strip())
return "\n".join(text for text in texts if text)
return ""
def analyze_video(video_url: str, prompt: str, model: str, fps: int, max_tokens: int) -> str:
api_key = os.environ.get("ARK_API_KEY", "").strip()
if not api_key:
raise RuntimeError("环境变量 ARK_API_KEY 未配置")
if not model:
raise RuntimeError("数据库中未配置 image_recognition_model")
body = {
"model": model,
"messages": [
{
"role": "user",
"content": [
{"type": "video_url", "video_url": {"url": video_url}, "fps": str(fps)},
{"type": "text", "text": prompt},
],
}
],
"max_tokens": max_tokens,
}
response = _http_post_json(
"https://ark.cn-beijing.volces.com/api/v3/chat/completions",
body,
{"Content-Type": "application/json", "Authorization": f"Bearer {api_key}"},
timeout=300,
)
text = _extract_response_text(response)
if not text:
raise RuntimeError("视频理解接口未返回文本内容")
return text
def _validate_video_url(value: str) -> str:
parsed = urlparse(value)
if parsed.scheme != "https" or not parsed.netloc:
raise ValueError("video_url 必须是 https 链接")
return value
def _parse_cli_params(argv: list[str]) -> dict:
parser = argparse.ArgumentParser(add_help=False)
parser.add_argument("--video_url", default="")
parser.add_argument("--prompt", default=DEFAULT_PROMPT)
parser.add_argument("--fps", type=int, default=DEFAULT_FPS)
parser.add_argument("--max_tokens", type=int, default=DEFAULT_MAX_TOKENS)
namespace, unknown = parser.parse_known_args(argv)
if unknown:
raise ValueError(f"存在不支持的参数: {' '.join(unknown)}")
if namespace.fps <= 0:
raise ValueError("fps 必须大于 0")
if namespace.max_tokens <= 0:
raise ValueError("max_tokens 必须大于 0")
return {
"video_url": namespace.video_url,
"prompt": namespace.prompt,
"fps": namespace.fps,
"max_tokens": namespace.max_tokens,
}
def main() -> int:
if len(sys.argv) < 2:
sys.stdout.write("缺少输入参数\n")
return 1
try:
params = _parse_cli_params(sys.argv[1:])
except ValueError as exc:
sys.stdout.write(f"参数格式错误: {exc}\n")
return 1
video_url = params.get("video_url", "").strip()
if not video_url:
sys.stdout.write("缺少视频链接\n")
return 1
try:
_validate_video_url(video_url)
except ValueError as exc:
sys.stdout.write(f"参数格式错误: {exc}\n")
return 1
prompt = params.get("prompt", "").strip() or DEFAULT_PROMPT
fps = int(params.get("fps", DEFAULT_FPS))
max_tokens = int(params.get("max_tokens", DEFAULT_MAX_TOKENS))
from_wx_id = os.environ.get("ROBOT_FROM_WX_ID", "").strip()
if not from_wx_id:
sys.stdout.write("环境变量 ROBOT_FROM_WX_ID 未配置\n")
return 1
try:
conn = _mysql_connect()
except Exception as exc:
sys.stdout.write(f"数据库连接失败: {exc}\n")
return 1
try:
enabled, model = load_understanding_settings(conn, from_wx_id)
except Exception as exc:
sys.stdout.write(f"加载视频理解配置失败: {exc}\n")
return 1
finally:
try:
conn.close()
except Exception:
pass
if not enabled:
sys.stdout.write("AI 图像识别未开启\n")
return 0
try:
content = analyze_video(video_url, prompt, model, fps, max_tokens)
except Exception as exc:
sys.stdout.write(f"调用视频理解接口失败: {exc}\n")
return 1
sys.stdout.write(f"{content}\n")
return 0
if __name__ == "__main__":
try:
raise SystemExit(main())
except SystemExit:
raise
except Exception:
traceback.print_exc(file=sys.stdout)
raise SystemExit(1)