#!/usr/bin/env python3 from __future__ import annotations import html import json import os import re import sys import traceback import urllib.error import urllib.parse import urllib.request sys.stderr = sys.stdout DOUYIN_USER_AGENT = ( "Mozilla/5.0 (iPhone; CPU iPhone OS 14_0 like Mac OS X) " "AppleWebKit/605.1.15 (KHTML, like Gecko) " "Version/14.0 Mobile/15E148 Safari/604.1" ) DOUYIN_REFERER = "https://www.douyin.com/" FALLBACK_TEXT = "抖音解析失败,可能是链接已失效或格式不正确。" ROUTER_DATA_RE = re.compile(r"(?s)window\._ROUTER_DATA\s*=\s*(\{.*?\})\s*") DOUYIN_URL_RE = re.compile(r"https://[^\s]+") def build_request(url: str) -> urllib.request.Request: return urllib.request.Request( url, headers={ "User-Agent": DOUYIN_USER_AGENT, "Referer": DOUYIN_REFERER, }, ) def resolve_redirect(short_url: str) -> str | None: """Follow the 302 redirect to get the real page URL.""" class NoRedirectHandler(urllib.request.HTTPRedirectHandler): def redirect_request(self, req, fp, code, msg, headers, newurl): return None opener = urllib.request.build_opener(NoRedirectHandler) req = build_request(short_url) try: response = opener.open(req, timeout=15) return response.url except urllib.error.HTTPError as e: location = e.headers.get("Location") if location: return location return None except (urllib.error.URLError, TimeoutError): return None def fetch_page_html(page_url: str) -> str | None: """Fetch the Douyin page HTML content.""" req = build_request(page_url) try: with urllib.request.urlopen(req, timeout=15) as response: if response.status != 200: return None return response.read().decode("utf-8", errors="replace") except (urllib.error.URLError, TimeoutError): return None def decode_escaped_value(value: str) -> str: """Decode HTML entities and JSON escape sequences.""" decoded = html.unescape(value) if "\\" in decoded: try: unquoted = json.loads('"' + decoded.replace('"', '\\"') + '"') decoded = unquoted except (json.JSONDecodeError, ValueError): pass return html.unescape(decoded) def 
pick_preferred_url(urls: list[str]) -> str: """Pick the best URL from a list, preferring p26 CDN.""" first_url = "" for raw_url in urls: if not raw_url: continue decoded_url = decode_escaped_value(raw_url) if not decoded_url: continue if decoded_url.startswith("https://p26"): return decoded_url if not first_url: first_url = decoded_url return first_url def pick_video_url(urls: list[str]) -> str: """Pick the best video URL, preferring aweme.snssdk.com.""" decoded_urls = [] for raw_url in urls: if not raw_url: continue decoded_url = decode_escaped_value(raw_url).replace("playwm", "play") decoded_urls.append(decoded_url) for url in decoded_urls: if "aweme.snssdk.com" in url: return url return decoded_urls[0] if decoded_urls else "" def extract_aweme_item(html_content: str) -> dict | None: """Extract the first aweme item from _ROUTER_DATA.""" match = ROUTER_DATA_RE.search(html_content) if not match: return None try: router_data = json.loads(match.group(1)) except json.JSONDecodeError: return None loader_data = router_data.get("loaderData", {}) for page_data in loader_data.values(): if not isinstance(page_data, dict): continue video_info_res = page_data.get("videoInfoRes", {}) item_list = video_info_res.get("item_list", []) if item_list: return item_list[0] return None def parse_note_item(item: dict) -> dict | None: """Parse image/note type content.""" images = item.get("images") or item.get("image_infos") or [] if not images: return None image_urls = [] seen = set() for img_info in images: url_list = img_info.get("url_list", []) for url in url_list: if url and url.startswith("http"): decoded = html.unescape(url) if decoded not in seen: image_urls.append(decoded) seen.add(decoded) break if not image_urls: return None author = item.get("author", {}) music = item.get("music", {}) music_url = pick_preferred_url(music.get("play_url", {}).get("url_list", [])) # Fallback music URL from video play_addr if not music_url: video = item.get("video", {}) play_addr = 
video.get("play_addr", {}) uri = play_addr.get("uri", "") if uri.startswith("http"): music_url = decode_escaped_value(uri) else: music_url = pick_preferred_url(play_addr.get("url_list", [])) return { "type": "note", "author": html.unescape(author.get("nickname", "")), "title": html.unescape(item.get("desc", "")), "images": image_urls, "music_url": music_url, } def parse_video_item(item: dict) -> dict | None: """Parse video type content.""" video = item.get("video", {}) duration = video.get("duration") if duration is not None and duration == 0: return None play_addr = video.get("play_addr", {}) video_url = pick_video_url(play_addr.get("url_list", [])) if not video_url: return None author = item.get("author", {}) return { "type": "video", "author": html.unescape(author.get("nickname", "")), "title": html.unescape(item.get("desc", "")), "url": video_url, "cover": pick_preferred_url(video.get("cover", {}).get("url_list", [])), } def parse_douyin(short_url: str) -> dict | None: """Main parsing logic: resolve redirect -> fetch HTML -> extract data.""" resolved_url = resolve_redirect(short_url) if not resolved_url: return None html_content = fetch_page_html(resolved_url) if not html_content: return None item = extract_aweme_item(html_content) if not item: return None # Try note (images) first, then video result = parse_note_item(item) if result: return result result = parse_video_item(item) if result: return result return None def send_video(video_url: str, robot_port: str, to_wxid: str) -> bool: """Send video via local robot API.""" api_url = f"http://127.0.0.1:{robot_port}/api/v1/robot/message/send/video/url" body = json.dumps({ "to_wxid": to_wxid, "video_urls": [video_url], }).encode("utf-8") request = urllib.request.Request( api_url, data=body, headers={"Content-Type": "application/json"}, method="POST", ) try: with urllib.request.urlopen(request, timeout=60) as response: return 200 <= response.status < 300 except (urllib.error.URLError, TimeoutError): return False 
_DOUYIN_SHORT_HOST = "v.douyin.com"


def _post_json(api_url: str, payload: dict, timeout: float) -> bool:
    """POST *payload* as JSON to *api_url*; return True on a 2xx response.

    Any other status, a malformed URL/port, or a network failure yields False
    instead of an exception.
    """
    # Local import: keeps this helper self-sufficient regardless of the
    # module's top-level import list.
    import http.client

    body = json.dumps(payload).encode("utf-8")
    try:
        request = urllib.request.Request(
            api_url,
            data=body,
            headers={"Content-Type": "application/json"},
            method="POST",
        )
        with urllib.request.urlopen(request, timeout=timeout) as response:
            return 200 <= response.status < 300
    except (OSError, ValueError, http.client.HTTPException):
        # OSError covers URLError/HTTPError/TimeoutError; HTTPException covers
        # a non-numeric port (InvalidURL) and malformed responses.
        return False


def _is_douyin_short_url(url: str) -> bool:
    """Return True when *url*'s hostname is exactly the Douyin short-link host.

    The parsed hostname is compared rather than doing a substring test, so a
    URL that merely mentions the host in its path or query is rejected.
    """
    try:
        hostname = urllib.parse.urlsplit(url).hostname
    except ValueError:
        return False
    return hostname == _DOUYIN_SHORT_HOST


def send_images(image_urls: list[str], robot_port: str, to_wxid: str) -> bool:
    """Send *image_urls* to *to_wxid* via the local robot API.

    Returns:
        ``True`` for a 2xx response, ``False`` otherwise.
    """
    return _post_json(
        f"http://127.0.0.1:{robot_port}/api/v1/robot/message/send/image/url",
        {"to_wxid": to_wxid, "image_urls": image_urls},
        timeout=60,
    )


def send_text(text: str, robot_port: str, to_wxid: str) -> bool:
    """Send a text message to *to_wxid* via the local robot API.

    Returns:
        ``True`` for a 2xx response, ``False`` otherwise.
    """
    return _post_json(
        f"http://127.0.0.1:{robot_port}/api/v1/robot/message/send/text",
        {"to_wxid": to_wxid, "content": text},
        timeout=10,
    )


def main() -> int:
    """Parse the Douyin link in the incoming message and forward its media.

    Reads ``ROBOT_WECHAT_CLIENT_PORT``, ``ROBOT_FROM_WX_ID`` and
    ``ROBOT_MESSAGE_CONTENT`` from the environment.  Failures are reported by
    writing a message to stdout.

    Returns:
        Always ``0``.
    """
    robot_port = os.environ.get("ROBOT_WECHAT_CLIENT_PORT", "").strip()
    to_wxid = os.environ.get("ROBOT_FROM_WX_ID", "").strip()
    message_content = os.environ.get("ROBOT_MESSAGE_CONTENT", "").strip()

    if not (robot_port and to_wxid and message_content):
        sys.stdout.write(FALLBACK_TEXT + "\n")
        return 0

    # First link in the message whose host really is the Douyin short-link host.
    douyin_url = next(
        (
            url
            for url in DOUYIN_URL_RE.findall(message_content)
            if _is_douyin_short_url(url)
        ),
        None,
    )
    if douyin_url is None:
        sys.stdout.write(FALLBACK_TEXT + "\n")
        return 0

    result = parse_douyin(douyin_url)
    if not result:
        sys.stdout.write(FALLBACK_TEXT + "\n")
        return 0

    if result["type"] == "video":
        info_text = f"抖音视频解析成功\n作者: {result['author']}\n标题: {result['title']}"
        # The info message is best-effort; only the media send is reported.
        send_text(info_text, robot_port, to_wxid)
        if not send_video(result["url"], robot_port, to_wxid):
            sys.stdout.write("发送抖音视频失败,请稍后重试。\n")
    elif result["type"] == "note":
        info_text = (
            f"抖音图片解析成功\n"
            f"作者: {result['author']}\n"
            f"标题: {result['title']}\n\n"
            f"{len(result['images'])}张图片正在发送中..."
        )
        send_text(info_text, robot_port, to_wxid)
        if not send_images(result["images"], robot_port, to_wxid):
            sys.stdout.write("发送抖音图片失败,请稍后重试。\n")

    return 0


if __name__ == "__main__":
    # Top-level boundary: print the traceback to stdout and exit with status 1
    # on any unexpected error.
    try:
        exit_code = main()
    except Exception:
        traceback.print_exc(file=sys.stdout)
        exit_code = 1
    raise SystemExit(exit_code)