#!/usr/bin/env python3
"""Resolve a Douyin short link and forward its video or images over WeChat.

The script is meant to be run by a host robot whenever an incoming message
contains a ``https://v.douyin.com/`` link.  It is configured entirely through
environment variables:

* ``ROBOT_WECHAT_CLIENT_PORT`` - port of the local WeChat robot HTTP service.
* ``ROBOT_FROM_WX_ID`` - wxid of the user the reply is sent to.
* ``ROBOT_MESSAGE_CONTENT`` - raw message text that contains the short link.

Processing steps:

1. Extract the first ``https://v.douyin.com/...`` link from the message.
2. Request the short link without following redirects and take the real page
   URL from the ``Location`` header.
3. Download that page and decode the JSON object assigned to
   ``window._ROUTER_DATA``.
4. Build an image-note result or a video result from the first item found.
5. Send an informational text message, then the media, through the local
   robot API on ``127.0.0.1``:
   ``/api/v1/robot/message/send/text``,
   ``/api/v1/robot/message/send/video/url`` and
   ``/api/v1/robot/message/send/image/url``.

Every handled failure writes a fixed message to stdout and the process still
exits with status 0; only an unexpected exception prints a traceback and
exits with status 1.
"""

from __future__ import annotations

import html
import json
import os
import re
import sys
import traceback
import urllib.error
import urllib.parse
import urllib.request


DOUYIN_USER_AGENT = (
    "Mozilla/5.0 (iPhone; CPU iPhone OS 14_0 like Mac OS X) "
    "AppleWebKit/605.1.15 (KHTML, like Gecko) "
    "Version/14.0 Mobile/15E148 Safari/604.1"
)
DOUYIN_REFERER = "https://www.douyin.com/"
FALLBACK_TEXT = "抖音解析失败,可能是链接已失效或格式不正确。"

# Matches up to (but not including) the opening brace of the object assigned
# to window._ROUTER_DATA.  The object itself is decoded with
# JSONDecoder.raw_decode, because a regex cannot find the matching closing
# brace of nested JSON.
ROUTER_DATA_RE = re.compile(r"window\._ROUTER_DATA\s*=\s*(?=\{)")

# Only links on the v.douyin.com host are accepted, and only ASCII URL
# characters are consumed so that text glued to the link is not swallowed.
DOUYIN_URL_RE = re.compile(r"https://v\.douyin\.com/[A-Za-z0-9\-._~/?=&%#]+")

_JSON_DECODER = json.JSONDecoder()


class _NoRedirectHandler(urllib.request.HTTPRedirectHandler):
    """Redirect handler that refuses every redirect.

    Returning ``None`` from ``redirect_request`` makes urllib raise an
    ``HTTPError`` for the 3xx response, which still exposes its headers.
    """

    def redirect_request(self, req, fp, code, msg, headers, newurl):
        return None


def _as_dict(value: object) -> dict:
    """Return *value* if it is a dict, otherwise an empty dict."""
    return value if isinstance(value, dict) else {}


def _as_list(value: object) -> list:
    """Return *value* if it is a list, otherwise an empty list."""
    return value if isinstance(value, list) else []


def _text(value: object) -> str:
    """Return *value* with HTML entities decoded, or "" if it is not a str."""
    return html.unescape(value) if isinstance(value, str) else ""


def _reply(text: str) -> None:
    """Write one line of reply text to stdout."""
    sys.stdout.write(text + "\n")


def build_request(url: str) -> urllib.request.Request:
    """Build a GET request for *url* with the mobile User-Agent and Referer."""
    return urllib.request.Request(
        url,
        headers={
            "User-Agent": DOUYIN_USER_AGENT,
            "Referer": DOUYIN_REFERER,
        },
    )


def find_douyin_url(message: str) -> str | None:
    """Return the first ``https://v.douyin.com/`` link in *message*, if any."""
    match = DOUYIN_URL_RE.search(message)
    return match.group(0) if match else None


def resolve_redirect(short_url: str) -> str | None:
    """Return the URL that *short_url* redirects to, without following it.

    Returns the ``Location`` header of the error/redirect response (resolved
    against *short_url* when it is relative), the response URL when the
    server answers without redirecting, or ``None`` on a network error or
    when no ``Location`` header is present.
    """
    opener = urllib.request.build_opener(_NoRedirectHandler)
    try:
        with opener.open(build_request(short_url), timeout=15) as response:
            return response.url
    except urllib.error.HTTPError as err:
        # Redirects surface here because _NoRedirectHandler refuses them.
        try:
            location = err.headers.get("Location") if err.headers else None
        finally:
            err.close()
        if not location:
            return None
        return urllib.parse.urljoin(short_url, location)
    except (OSError, ValueError):
        # OSError covers URLError, timeouts and raw socket failures;
        # ValueError is raised by urllib for a malformed URL.
        return None


def fetch_page_html(page_url: str) -> str | None:
    """Download *page_url* and return its body decoded as UTF-8.

    Returns ``None`` for a non-http(s) URL, a non-200 status, or a network
    error.  Undecodable bytes are replaced rather than raising.
    """
    # page_url comes from a Location header; urlopen would otherwise also
    # accept schemes such as file: or ftp:.
    if urllib.parse.urlsplit(page_url).scheme not in ("http", "https"):
        return None
    try:
        with urllib.request.urlopen(build_request(page_url), timeout=15) as response:
            if response.status != 200:
                return None
            return response.read().decode("utf-8", errors="replace")
    except (OSError, ValueError):
        return None


def decode_escaped_value(value: str) -> str:
    """Decode HTML entities and JSON backslash escapes in *value*.

    When the text contains a backslash it is wrapped in double quotes and
    parsed as a JSON string so that sequences such as ``\\u002F`` are
    decoded.  If that parse fails the HTML-unescaped text is kept as is.
    HTML entities are decoded once more at the end.
    """
    decoded = html.unescape(value)
    if "\\" in decoded:
        try:
            decoded = json.loads('"' + decoded.replace('"', '\\"') + '"')
        except ValueError:  # json.JSONDecodeError is a ValueError
            pass
    return html.unescape(decoded)


def _decoded_urls(urls: list[str]) -> list[str]:
    """Decode every non-empty string in *urls*, dropping empty results."""
    decoded = (
        decode_escaped_value(raw_url)
        for raw_url in urls
        if isinstance(raw_url, str) and raw_url
    )
    return [url for url in decoded if url]


def pick_preferred_url(urls: list[str]) -> str:
    """Return the first decoded URL starting with ``https://p26``.

    Falls back to the first non-empty decoded URL, or "" when there is none.
    """
    candidates = _decoded_urls(urls)
    for url in candidates:
        if url.startswith("https://p26"):
            return url
    return candidates[0] if candidates else ""


def pick_video_url(urls: list[str]) -> str:
    """Return the preferred video URL from *urls*.

    Each URL is decoded and has ``playwm`` replaced by ``play``.  The first
    URL containing ``aweme.snssdk.com`` wins; otherwise the first URL is
    returned, or "" when the list has no usable entry.
    """
    candidates = [url.replace("playwm", "play") for url in _decoded_urls(urls)]
    for url in candidates:
        if "aweme.snssdk.com" in url:
            return url
    return candidates[0] if candidates else ""


def extract_aweme_item(html_content: str) -> dict | None:
    """Return the first item of the first non-empty ``item_list`` in the page.

    The JSON object assigned to ``window._ROUTER_DATA`` is decoded and
    ``loaderData.<page>.videoInfoRes.item_list`` is searched page by page.
    Returns ``None`` when the assignment is missing, the JSON is invalid, or
    no page carries a dict item.
    """
    match = ROUTER_DATA_RE.search(html_content)
    if not match:
        return None

    try:
        # raw_decode stops at the end of the object, so whatever follows it
        # in the HTML does not matter.
        router_data, _ = _JSON_DECODER.raw_decode(html_content, match.end())
    except json.JSONDecodeError:
        return None

    loader_data = _as_dict(_as_dict(router_data).get("loaderData"))
    for page_data in loader_data.values():
        video_info_res = _as_dict(_as_dict(page_data).get("videoInfoRes"))
        item_list = _as_list(video_info_res.get("item_list"))
        if item_list and isinstance(item_list[0], dict):
            return item_list[0]
    return None


def parse_note_item(item: dict) -> dict | None:
    """Build a ``note`` result from an item that carries images.

    Returns ``None`` when the item has no ``images``/``image_infos`` or none
    of them yields an http(s) URL.  Otherwise returns a dict with the keys
    ``type``, ``author``, ``title``, ``images`` and ``music_url``.
    """
    images = _as_list(item.get("images")) or _as_list(item.get("image_infos"))
    if not images:
        return None

    image_urls: list[str] = []
    seen: set[str] = set()
    for img_info in images:
        for url in _as_list(_as_dict(img_info).get("url_list")):
            if isinstance(url, str) and url.startswith("http"):
                decoded = html.unescape(url)
                if decoded not in seen:
                    seen.add(decoded)
                    image_urls.append(decoded)
                # Only the first usable URL of each image is considered.
                break

    if not image_urls:
        return None

    music = _as_dict(item.get("music"))
    music_url = pick_preferred_url(
        _as_list(_as_dict(music.get("play_url")).get("url_list"))
    )
    if not music_url:
        # Fall back to the video play address when the item has no music URL.
        play_addr = _as_dict(_as_dict(item.get("video")).get("play_addr"))
        uri = play_addr.get("uri")
        if isinstance(uri, str) and uri.startswith("http"):
            music_url = decode_escaped_value(uri)
        else:
            music_url = pick_preferred_url(_as_list(play_addr.get("url_list")))

    return {
        "type": "note",
        "author": _text(_as_dict(item.get("author")).get("nickname")),
        "title": _text(item.get("desc")),
        "images": image_urls,
        "music_url": music_url,
    }


def parse_video_item(item: dict) -> dict | None:
    """Build a ``video`` result from an item.

    Returns ``None`` when the video duration is 0 or no play URL is found.
    Otherwise returns a dict with the keys ``type``, ``author``, ``title``,
    ``url`` and ``cover``.
    """
    video = _as_dict(item.get("video"))
    if video.get("duration") == 0:
        return None

    play_addr = _as_dict(video.get("play_addr"))
    video_url = pick_video_url(_as_list(play_addr.get("url_list")))
    if not video_url:
        return None

    cover = _as_dict(video.get("cover"))
    return {
        "type": "video",
        "author": _text(_as_dict(item.get("author")).get("nickname")),
        "title": _text(item.get("desc")),
        "url": video_url,
        "cover": pick_preferred_url(_as_list(cover.get("url_list"))),
    }


def parse_douyin(short_url: str) -> dict | None:
    """Resolve *short_url*, fetch its page and parse the first item.

    An image note is tried first, then a video.  Returns ``None`` when any
    step fails.
    """
    resolved_url = resolve_redirect(short_url)
    if not resolved_url:
        return None

    html_content = fetch_page_html(resolved_url)
    if not html_content:
        return None

    item = extract_aweme_item(html_content)
    if not item:
        return None

    return parse_note_item(item) or parse_video_item(item)


def _post_json(api_url: str, payload: dict, timeout: float) -> bool:
    """POST *payload* as JSON to *api_url*; return True on a 2xx response."""
    request = urllib.request.Request(
        api_url,
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )
    try:
        with urllib.request.urlopen(request, timeout=timeout) as response:
            return 200 <= response.status < 300
    except OSError:
        return False


def send_video(video_url: str, robot_port: str, to_wxid: str) -> bool:
    """Ask the local robot API to send *video_url* to *to_wxid*."""
    api_url = f"http://127.0.0.1:{robot_port}/api/v1/robot/message/send/video/url"
    payload = {"to_wxid": to_wxid, "video_urls": [video_url]}
    return _post_json(api_url, payload, timeout=60)


def send_images(image_urls: list[str], robot_port: str, to_wxid: str) -> bool:
    """Ask the local robot API to send *image_urls* to *to_wxid*."""
    api_url = f"http://127.0.0.1:{robot_port}/api/v1/robot/message/send/image/url"
    payload = {"to_wxid": to_wxid, "image_urls": image_urls}
    return _post_json(api_url, payload, timeout=60)


def send_text(text: str, robot_port: str, to_wxid: str) -> bool:
    """Ask the local robot API to send a text message to *to_wxid*."""
    api_url = f"http://127.0.0.1:{robot_port}/api/v1/robot/message/send/text"
    payload = {"to_wxid": to_wxid, "content": text}
    return _post_json(api_url, payload, timeout=10)


def main() -> int:
    """Run the skill once using the ``ROBOT_*`` environment variables.

    Always returns 0; handled failures are reported as text on stdout.
    """
    robot_port = os.environ.get("ROBOT_WECHAT_CLIENT_PORT", "").strip()
    to_wxid = os.environ.get("ROBOT_FROM_WX_ID", "").strip()
    message_content = os.environ.get("ROBOT_MESSAGE_CONTENT", "").strip()

    if not (robot_port and to_wxid and message_content):
        _reply(FALLBACK_TEXT)
        return 0

    douyin_url = find_douyin_url(message_content)
    if not douyin_url:
        _reply(FALLBACK_TEXT)
        return 0

    result = parse_douyin(douyin_url)
    if not result:
        _reply(FALLBACK_TEXT)
        return 0

    if result["type"] == "video":
        info_text = f"抖音视频解析成功\n作者: {result['author']}\n标题: {result['title']}"
        # The informational text is best effort; only the media send result
        # is reported back.
        send_text(info_text, robot_port, to_wxid)
        if not send_video(result["url"], robot_port, to_wxid):
            _reply("发送抖音视频失败,请稍后重试。")
    elif result["type"] == "note":
        info_text = (
            f"抖音图片解析成功\n"
            f"作者: {result['author']}\n"
            f"标题: {result['title']}\n\n"
            f"{len(result['images'])}张图片正在发送中..."
        )
        send_text(info_text, robot_port, to_wxid)
        if not send_images(result["images"], robot_port, to_wxid):
            _reply("发送抖音图片失败,请稍后重试。")

    return 0


if __name__ == "__main__":
    # Route stderr to stdout so that everything the script prints ends up on
    # the single stream it already uses for replies and tracebacks.
    sys.stderr = sys.stdout
    try:
        exit_code = main()
    except Exception:
        traceback.print_exc(file=sys.stdout)
        exit_code = 1
    raise SystemExit(exit_code)