feat: 画图支持 OpenAI
This commit is contained in:
parent
66c4cb51e7
commit
20ea36a340
@ -10,7 +10,7 @@ argument-hint: "需要 prompt(提示词)和 images(图片链接列表)
|
||||
|
||||
这是一个 AI 图生图技能,基于输入的一张或多张图片,结合文本提示词生成新的图片。支持图片混合、风格转换、内容合成等多种创作模式。
|
||||
|
||||
支持多个绘图模型:即梦(JiMeng)、豆包(DouBao)、造相(Z-Image)。
|
||||
支持多个绘图模型:即梦(JiMeng)、豆包(DouBao)、造相(Z-Image)、OpenAI GPT Image。
|
||||
|
||||
从数据库中读取绘图配置(API 密钥、Base URL 等),根据用户选择的模型调用对应的绘图 API,返回生成的图片 URL。
|
||||
|
||||
@ -37,7 +37,7 @@ argument-hint: "需要 prompt(提示词)和 images(图片链接列表)
|
||||
},
|
||||
"model": {
|
||||
"type": "string",
|
||||
"description": "画图模型选择(可选):即梦4.5(jimeng-4.5) / 即梦4.6(jimeng-4.6) / 即梦5.0(jimeng-5.0) / 豆包图生图(doubao-seededit-3.0-i2i) / 造相基础版(Z-Image) / 造相蒸馏版(Z-Image-Turbo) / 造相图片编辑(Qwen-Image-Edit-2511),默认: 空(none)。",
|
||||
"description": "画图模型选择(可选):即梦4.5(jimeng-4.5) / 即梦4.6(jimeng-4.6) / 即梦5.0(jimeng-5.0) / 豆包图生图(doubao-seededit-3.0-i2i) / 造相基础版(Z-Image) / 造相蒸馏版(Z-Image-Turbo) / 造相图片编辑(Qwen-Image-Edit-2511) / OpenAI GPT Image(gpt-image-2),默认: 空(none)。",
|
||||
"enum": [
|
||||
"none",
|
||||
"jimeng-4.5",
|
||||
@ -46,7 +46,8 @@ argument-hint: "需要 prompt(提示词)和 images(图片链接列表)
|
||||
"doubao-seededit-3.0-i2i",
|
||||
"Z-Image",
|
||||
"Z-Image-Turbo",
|
||||
"Qwen-Image-Edit-2511"
|
||||
"Qwen-Image-Edit-2511",
|
||||
"gpt-image-2"
|
||||
],
|
||||
"default": "none"
|
||||
},
|
||||
|
||||
@ -3,13 +3,17 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import argparse
|
||||
import base64
|
||||
import json
|
||||
import mimetypes
|
||||
import os
|
||||
import re
|
||||
import subprocess
|
||||
import sys
|
||||
import tempfile
|
||||
import time
|
||||
import traceback
|
||||
import urllib.parse
|
||||
import urllib.request
|
||||
from pathlib import Path
|
||||
|
||||
@ -67,6 +71,7 @@ _ensure_skill_venv_python()
|
||||
|
||||
try:
|
||||
import pymysql # type: ignore # noqa: E402
|
||||
from openai import OpenAI # type: ignore # noqa: E402
|
||||
except ModuleNotFoundError:
|
||||
_run_bootstrap()
|
||||
_py = _get_python_executable()
|
||||
@ -167,6 +172,188 @@ def _http_get_json(url: str, headers: dict, timeout: int = 30) -> dict:
|
||||
return json.loads(resp.read().decode("utf-8"))
|
||||
|
||||
|
||||
def _coerce_int(value, default: int, minimum: int, maximum: int) -> int:
|
||||
try:
|
||||
parsed = int(value)
|
||||
except (TypeError, ValueError):
|
||||
parsed = default
|
||||
return min(max(parsed, minimum), maximum)
|
||||
|
||||
|
||||
def _openai_output_format(config: dict) -> str:
|
||||
output_format = str(config.get("output_format", "png") or "png").lower()
|
||||
if output_format not in {"png", "jpeg", "webp"}:
|
||||
return "png"
|
||||
return output_format
|
||||
|
||||
|
||||
def _openai_size(config: dict, ratio: str, resolution: str) -> str:
|
||||
configured = str(config.get("size", "") or "").strip()
|
||||
if configured:
|
||||
return configured
|
||||
|
||||
normalized_ratio = (ratio or "").replace(" ", "").lower()
|
||||
normalized_resolution = (resolution or "").replace(" ", "").lower()
|
||||
|
||||
if normalized_resolution in {"4k", "2160p", "3840x2160"}:
|
||||
sizes = {
|
||||
"16:9": "3840x2160",
|
||||
"9:16": "2160x3840",
|
||||
"1:1": "2048x2048",
|
||||
"3:2": "3072x2048",
|
||||
"2:3": "2048x3072",
|
||||
}
|
||||
elif normalized_resolution in {"2k", "1440p", "2048"}:
|
||||
sizes = {
|
||||
"16:9": "2048x1152",
|
||||
"9:16": "1152x2048",
|
||||
"1:1": "2048x2048",
|
||||
"3:2": "2048x1360",
|
||||
"2:3": "1360x2048",
|
||||
}
|
||||
elif normalized_resolution in {"1k", "1024", "1024p"}:
|
||||
sizes = {
|
||||
"16:9": "1536x864",
|
||||
"9:16": "864x1536",
|
||||
"1:1": "1024x1024",
|
||||
"3:2": "1536x1024",
|
||||
"2:3": "1024x1536",
|
||||
}
|
||||
else:
|
||||
return "auto"
|
||||
|
||||
return sizes.get(normalized_ratio, "auto")
|
||||
|
||||
|
||||
def _openai_prompt(prompt: str, negative_prompt: str) -> str:
|
||||
if not negative_prompt:
|
||||
return prompt
|
||||
return f"{prompt}\n\n不要包含: {negative_prompt}"
|
||||
|
||||
|
||||
def _openai_client(config: dict) -> OpenAI:
|
||||
api_key = str(config.get("api_key", "")).strip()
|
||||
if not api_key:
|
||||
raise RuntimeError("OpenAI 绘图配置缺少 api_key")
|
||||
|
||||
kwargs: dict[str, str | float] = {"api_key": api_key}
|
||||
base_url = str(config.get("base_url", "") or "").strip()
|
||||
if base_url:
|
||||
kwargs["base_url"] = base_url
|
||||
organization = str(config.get("organization", "") or "").strip()
|
||||
if organization:
|
||||
kwargs["organization"] = organization
|
||||
project = str(config.get("project", "") or "").strip()
|
||||
if project:
|
||||
kwargs["project"] = project
|
||||
timeout_value = config.get("timeout")
|
||||
if timeout_value not in (None, ""):
|
||||
kwargs["timeout"] = float(timeout_value)
|
||||
|
||||
return OpenAI(**kwargs)
|
||||
|
||||
|
||||
def _openai_image_suffix(output_format: str) -> str:
|
||||
if output_format == "jpeg":
|
||||
return ".jpg"
|
||||
return f".{output_format}"
|
||||
|
||||
|
||||
def _write_openai_image_file(b64_json: str, output_format: str) -> str:
|
||||
image_bytes = base64.b64decode(b64_json)
|
||||
with tempfile.NamedTemporaryFile(
|
||||
prefix="wechat-openai-image-",
|
||||
suffix=_openai_image_suffix(output_format),
|
||||
delete=False,
|
||||
) as image_file:
|
||||
image_file.write(image_bytes)
|
||||
return image_file.name
|
||||
|
||||
|
||||
def _openai_images_from_response(response, output_format: str) -> list[str]:
|
||||
outputs: list[str] = []
|
||||
for item in getattr(response, "data", []) or []:
|
||||
b64_json = getattr(item, "b64_json", None)
|
||||
if b64_json:
|
||||
outputs.append(_write_openai_image_file(b64_json, output_format))
|
||||
continue
|
||||
url = getattr(item, "url", None)
|
||||
if url:
|
||||
outputs.append(url)
|
||||
return outputs
|
||||
|
||||
|
||||
def _is_remote_image_url(value: str) -> bool:
|
||||
return urllib.parse.urlparse(value).scheme in {"http", "https"}
|
||||
|
||||
|
||||
def _send_image_outputs(client_port: str, from_wx_id: str, image_outputs: list[str]) -> None:
|
||||
remote_urls = [value for value in image_outputs if value and _is_remote_image_url(value)]
|
||||
local_paths = [value for value in image_outputs if value and not _is_remote_image_url(value)]
|
||||
|
||||
if remote_urls:
|
||||
send_url = f"http://127.0.0.1:{client_port}/api/v1/robot/message/send/image/url"
|
||||
send_body = {
|
||||
"to_wxid": from_wx_id,
|
||||
"image_urls": remote_urls,
|
||||
}
|
||||
_http_post_json(send_url, send_body, {"Content-Type": "application/json"}, timeout=60)
|
||||
|
||||
if local_paths:
|
||||
send_url = f"http://127.0.0.1:{client_port}/api/v1/robot/message/send/image/local"
|
||||
send_body = {
|
||||
"to_wxid": from_wx_id,
|
||||
"file_path": local_paths,
|
||||
}
|
||||
_http_post_json(send_url, send_body, {"Content-Type": "application/json"}, timeout=60)
|
||||
|
||||
|
||||
def _cleanup_openai_temp_files(image_outputs: list[str]) -> None:
|
||||
for value in image_outputs:
|
||||
path = Path(value)
|
||||
if path.name.startswith("wechat-openai-image-") and path.is_file():
|
||||
try:
|
||||
path.unlink()
|
||||
except OSError:
|
||||
pass
|
||||
|
||||
|
||||
def _extension_from_mime(mime_type: str) -> str:
|
||||
if mime_type == "image/jpeg":
|
||||
return ".jpg"
|
||||
guessed = mimetypes.guess_extension(mime_type)
|
||||
if guessed in {".png", ".jpg", ".jpeg", ".webp"}:
|
||||
return guessed
|
||||
return ".png"
|
||||
|
||||
|
||||
def _download_openai_input_image(image: str, directory: str, index: int) -> Path:
|
||||
stripped = image.strip()
|
||||
if stripped.startswith("data:"):
|
||||
header, encoded = stripped.split(",", 1)
|
||||
mime_type = header[5:].split(";", 1)[0] or "image/png"
|
||||
path = Path(directory) / f"input-{index}{_extension_from_mime(mime_type)}"
|
||||
path.write_bytes(base64.b64decode(encoded))
|
||||
return path
|
||||
|
||||
parsed = urllib.parse.urlparse(stripped)
|
||||
if parsed.scheme in {"http", "https"}:
|
||||
request = urllib.request.Request(stripped, headers={"User-Agent": "wechat-robot-skills/1.0"})
|
||||
with urllib.request.urlopen(request, timeout=60) as response:
|
||||
content_type = response.headers.get("Content-Type", "image/png").split(";", 1)[0].strip()
|
||||
suffix = Path(parsed.path).suffix.lower()
|
||||
if suffix not in {".png", ".jpg", ".jpeg", ".webp"}:
|
||||
suffix = _extension_from_mime(content_type)
|
||||
path = Path(directory) / f"input-{index}{suffix}"
|
||||
path.write_bytes(response.read())
|
||||
return path
|
||||
|
||||
path = Path(stripped).expanduser()
|
||||
if path.is_file():
|
||||
return path
|
||||
raise RuntimeError(f"无法读取图片: {image}")
|
||||
|
||||
|
||||
def call_jimeng(config: dict, prompt: str, model: str, images: list[str],
|
||||
negative_prompt: str, ratio: str, resolution: str) -> list[str]:
|
||||
"""Call JiMeng (即梦) image compositions API (图生图)."""
|
||||
@ -309,6 +496,44 @@ def call_zimage(config: dict, prompt: str, model: str, images: list[str]) -> lis
|
||||
raise RuntimeError("造相绘图任务超时")
|
||||
|
||||
|
||||
def call_openai(config: dict, prompt: str, model: str, images: list[str],
|
||||
negative_prompt: str, ratio: str, resolution: str) -> list[str]:
|
||||
"""Call OpenAI GPT Image API for image editing."""
|
||||
client = _openai_client(config)
|
||||
output_format = _openai_output_format(config)
|
||||
quality = str(config.get("quality", "auto") or "auto")
|
||||
background = str(config.get("background", "auto") or "auto")
|
||||
if background == "transparent":
|
||||
background = "auto"
|
||||
|
||||
with tempfile.TemporaryDirectory() as temp_dir:
|
||||
input_paths = [
|
||||
_download_openai_input_image(image, temp_dir, index)
|
||||
for index, image in enumerate(images[:16], start=1)
|
||||
]
|
||||
input_files = [path.open("rb") for path in input_paths]
|
||||
try:
|
||||
kwargs = {
|
||||
"model": model or "gpt-image-2",
|
||||
"prompt": _openai_prompt(prompt, negative_prompt),
|
||||
"image": input_files,
|
||||
"n": _coerce_int(config.get("n"), 1, 1, 10),
|
||||
"size": _openai_size(config, ratio, resolution),
|
||||
"quality": quality,
|
||||
"background": background,
|
||||
"output_format": output_format,
|
||||
}
|
||||
if output_format in {"jpeg", "webp"} and config.get("output_compression") is not None:
|
||||
kwargs["output_compression"] = _coerce_int(config.get("output_compression"), 100, 0, 100)
|
||||
|
||||
response = client.images.edit(**kwargs)
|
||||
finally:
|
||||
for input_file in input_files:
|
||||
input_file.close()
|
||||
|
||||
return _openai_images_from_response(response, output_format)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Main
|
||||
# ---------------------------------------------------------------------------
|
||||
@ -316,6 +541,7 @@ def call_zimage(config: dict, prompt: str, model: str, images: list[str]) -> lis
|
||||
JIMENG_MODELS = {"jimeng-4.5", "jimeng-4.6", "jimeng-5.0"}
|
||||
DOUBAO_MODELS = {"doubao-seededit-3.0-i2i"}
|
||||
ZIMAGE_MODELS = {"Z-Image", "Z-Image-Turbo", "Qwen-Image-Edit-2511"}
|
||||
OPENAI_MODELS = {"gpt-image-2"}
|
||||
|
||||
|
||||
def _parse_cli_params(argv: list[str]) -> dict:
|
||||
@ -424,6 +650,13 @@ def main() -> int:
|
||||
return 0
|
||||
image_urls = call_zimage(zimage_config, prompt, model, images)
|
||||
|
||||
elif model in OPENAI_MODELS:
|
||||
openai_config = settings_json.get("OpenAI", {})
|
||||
if not openai_config.get("enabled", False):
|
||||
sys.stdout.write("OpenAI 绘图未开启\n")
|
||||
return 0
|
||||
image_urls = call_openai(openai_config, prompt, model, images, negative_prompt, ratio, resolution)
|
||||
|
||||
else:
|
||||
sys.stdout.write("不支持的 AI 图像模型\n")
|
||||
return 1
|
||||
@ -439,20 +672,18 @@ def main() -> int:
|
||||
# 通过客户端接口发送图片
|
||||
client_port = os.environ.get("ROBOT_WECHAT_CLIENT_PORT", "").strip()
|
||||
if not client_port:
|
||||
_cleanup_openai_temp_files(image_urls)
|
||||
sys.stdout.write("环境变量 ROBOT_WECHAT_CLIENT_PORT 未配置\n")
|
||||
return 1
|
||||
|
||||
send_url = f"http://127.0.0.1:{client_port}/api/v1/robot/message/send/image/url"
|
||||
send_body = {
|
||||
"to_wxid": from_wx_id,
|
||||
"image_urls": [u for u in image_urls if u],
|
||||
}
|
||||
try:
|
||||
_http_post_json(send_url, send_body, {"Content-Type": "application/json"}, timeout=60)
|
||||
_send_image_outputs(client_port, from_wx_id, image_urls)
|
||||
sys.stdout.write("图片发送成功\n")
|
||||
except Exception as exc:
|
||||
sys.stdout.write(f"发送图片失败: {exc}\n")
|
||||
return 1
|
||||
finally:
|
||||
_cleanup_openai_temp_files(image_urls)
|
||||
|
||||
return 0
|
||||
|
||||
|
||||
@ -1,2 +1,3 @@
|
||||
cryptography
|
||||
openai>=2.34.0
|
||||
pymysql>=1.1,<2
|
||||
|
||||
@ -8,7 +8,7 @@ argument-hint: "需要 prompt 参数(画图提示词),可选 model(模
|
||||
|
||||
## 描述
|
||||
|
||||
这是一个 AI 文生图技能,当用户想通过文本描述生成图像时触发。支持多个绘图模型:即梦(JiMeng)、豆包(DouBao)、造相(Z-Image)。
|
||||
这是一个 AI 文生图技能,当用户想通过文本描述生成图像时触发。支持多个绘图模型:即梦(JiMeng)、豆包(DouBao)、造相(Z-Image)、OpenAI GPT Image。
|
||||
|
||||
从数据库中读取绘图配置(API 密钥、Base URL 等),根据用户选择的模型调用对应的绘图 API,返回生成的图片 URL。
|
||||
|
||||
@ -35,7 +35,7 @@ argument-hint: "需要 prompt 参数(画图提示词),可选 model(模
|
||||
},
|
||||
"model": {
|
||||
"type": "string",
|
||||
"description": "画图模型选择(可选):即梦4.5(jimeng-4.5) / 即梦4.6(jimeng-4.6) / 即梦5.0(jimeng-5.0) / 豆包4.5(doubao-seedream-4.5) / 豆包4.0(doubao-seedream-4.0) / 豆包文生图(doubao-seedream-3.0-t2i) / 豆包图生图(doubao-seededit-3.0-i2i) / 造相基础版(Z-Image) / 造相蒸馏版(Z-Image-Turbo) / 造相图片编辑(Qwen-Image-Edit-2511),默认: 空(none)。",
|
||||
"description": "画图模型选择(可选):即梦4.5(jimeng-4.5) / 即梦4.6(jimeng-4.6) / 即梦5.0(jimeng-5.0) / 豆包4.5(doubao-seedream-4.5) / 豆包4.0(doubao-seedream-4.0) / 豆包文生图(doubao-seedream-3.0-t2i) / 豆包图生图(doubao-seededit-3.0-i2i) / 造相基础版(Z-Image) / 造相蒸馏版(Z-Image-Turbo) / 造相图片编辑(Qwen-Image-Edit-2511) / OpenAI GPT Image(gpt-image-2),默认: 空(none)。",
|
||||
"enum": [
|
||||
"none",
|
||||
"jimeng-4.5",
|
||||
@ -47,7 +47,8 @@ argument-hint: "需要 prompt 参数(画图提示词),可选 model(模
|
||||
"doubao-seededit-3.0-i2i",
|
||||
"Z-Image",
|
||||
"Z-Image-Turbo",
|
||||
"Qwen-Image-Edit-2511"
|
||||
"Qwen-Image-Edit-2511",
|
||||
"gpt-image-2"
|
||||
],
|
||||
"default": "none"
|
||||
},
|
||||
|
||||
@ -1,2 +1,3 @@
|
||||
cryptography
|
||||
pymysql>=1.1,<2
|
||||
openai>=2.34.0
|
||||
pymysql>=1.1,<2
|
||||
|
||||
@ -3,13 +3,16 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import argparse
|
||||
import base64
|
||||
import json
|
||||
import os
|
||||
import re
|
||||
import subprocess
|
||||
import sys
|
||||
import tempfile
|
||||
import time
|
||||
import traceback
|
||||
import urllib.parse
|
||||
import urllib.request
|
||||
from pathlib import Path
|
||||
|
||||
@ -67,6 +70,7 @@ _ensure_skill_venv_python()
|
||||
|
||||
try:
|
||||
import pymysql # type: ignore # noqa: E402
|
||||
from openai import OpenAI # type: ignore # noqa: E402
|
||||
except ModuleNotFoundError:
|
||||
_run_bootstrap()
|
||||
_py = _get_python_executable()
|
||||
@ -169,6 +173,152 @@ def _http_get_json(url: str, headers: dict, timeout: int = 30) -> dict:
|
||||
return json.loads(resp.read().decode("utf-8"))
|
||||
|
||||
|
||||
def _coerce_int(value, default: int, minimum: int, maximum: int) -> int:
|
||||
try:
|
||||
parsed = int(value)
|
||||
except (TypeError, ValueError):
|
||||
parsed = default
|
||||
return min(max(parsed, minimum), maximum)
|
||||
|
||||
|
||||
def _openai_output_format(config: dict) -> str:
|
||||
output_format = str(config.get("output_format", "png") or "png").lower()
|
||||
if output_format not in {"png", "jpeg", "webp"}:
|
||||
return "png"
|
||||
return output_format
|
||||
|
||||
|
||||
def _openai_size(config: dict, ratio: str, resolution: str) -> str:
|
||||
configured = str(config.get("size", "") or "").strip()
|
||||
if configured:
|
||||
return configured
|
||||
|
||||
normalized_ratio = (ratio or "").replace(" ", "").lower()
|
||||
normalized_resolution = (resolution or "").replace(" ", "").lower()
|
||||
|
||||
if normalized_resolution in {"4k", "2160p", "3840x2160"}:
|
||||
sizes = {
|
||||
"16:9": "3840x2160",
|
||||
"9:16": "2160x3840",
|
||||
"1:1": "2048x2048",
|
||||
"3:2": "3072x2048",
|
||||
"2:3": "2048x3072",
|
||||
}
|
||||
elif normalized_resolution in {"2k", "1440p", "2048"}:
|
||||
sizes = {
|
||||
"16:9": "2048x1152",
|
||||
"9:16": "1152x2048",
|
||||
"1:1": "2048x2048",
|
||||
"3:2": "2048x1360",
|
||||
"2:3": "1360x2048",
|
||||
}
|
||||
elif normalized_resolution in {"1k", "1024", "1024p"}:
|
||||
sizes = {
|
||||
"16:9": "1536x864",
|
||||
"9:16": "864x1536",
|
||||
"1:1": "1024x1024",
|
||||
"3:2": "1536x1024",
|
||||
"2:3": "1024x1536",
|
||||
}
|
||||
else:
|
||||
return "auto"
|
||||
|
||||
return sizes.get(normalized_ratio, "auto")
|
||||
|
||||
|
||||
def _openai_prompt(prompt: str, negative_prompt: str) -> str:
|
||||
if not negative_prompt:
|
||||
return prompt
|
||||
return f"{prompt}\n\n不要包含: {negative_prompt}"
|
||||
|
||||
|
||||
def _openai_client(config: dict) -> OpenAI:
|
||||
api_key = str(config.get("api_key", "")).strip()
|
||||
if not api_key:
|
||||
raise RuntimeError("OpenAI 绘图配置缺少 api_key")
|
||||
|
||||
kwargs: dict[str, str | float] = {"api_key": api_key}
|
||||
base_url = str(config.get("base_url", "") or "").strip()
|
||||
if base_url:
|
||||
kwargs["base_url"] = base_url
|
||||
organization = str(config.get("organization", "") or "").strip()
|
||||
if organization:
|
||||
kwargs["organization"] = organization
|
||||
project = str(config.get("project", "") or "").strip()
|
||||
if project:
|
||||
kwargs["project"] = project
|
||||
timeout_value = config.get("timeout")
|
||||
if timeout_value not in (None, ""):
|
||||
kwargs["timeout"] = float(timeout_value)
|
||||
|
||||
return OpenAI(**kwargs)
|
||||
|
||||
|
||||
def _openai_image_suffix(output_format: str) -> str:
|
||||
if output_format == "jpeg":
|
||||
return ".jpg"
|
||||
return f".{output_format}"
|
||||
|
||||
|
||||
def _write_openai_image_file(b64_json: str, output_format: str) -> str:
|
||||
image_bytes = base64.b64decode(b64_json)
|
||||
with tempfile.NamedTemporaryFile(
|
||||
prefix="wechat-openai-image-",
|
||||
suffix=_openai_image_suffix(output_format),
|
||||
delete=False,
|
||||
) as image_file:
|
||||
image_file.write(image_bytes)
|
||||
return image_file.name
|
||||
|
||||
|
||||
def _openai_images_from_response(response, output_format: str) -> list[str]:
|
||||
outputs: list[str] = []
|
||||
for item in getattr(response, "data", []) or []:
|
||||
b64_json = getattr(item, "b64_json", None)
|
||||
if b64_json:
|
||||
outputs.append(_write_openai_image_file(b64_json, output_format))
|
||||
continue
|
||||
url = getattr(item, "url", None)
|
||||
if url:
|
||||
outputs.append(url)
|
||||
return outputs
|
||||
|
||||
|
||||
def _is_remote_image_url(value: str) -> bool:
|
||||
return urllib.parse.urlparse(value).scheme in {"http", "https"}
|
||||
|
||||
|
||||
def _send_image_outputs(client_port: str, from_wx_id: str, image_outputs: list[str]) -> None:
|
||||
remote_urls = [value for value in image_outputs if value and _is_remote_image_url(value)]
|
||||
local_paths = [value for value in image_outputs if value and not _is_remote_image_url(value)]
|
||||
|
||||
if remote_urls:
|
||||
send_url = f"http://127.0.0.1:{client_port}/api/v1/robot/message/send/image/url"
|
||||
send_body = {
|
||||
"to_wxid": from_wx_id,
|
||||
"image_urls": remote_urls,
|
||||
}
|
||||
_http_post_json(send_url, send_body, {"Content-Type": "application/json"}, timeout=60)
|
||||
|
||||
if local_paths:
|
||||
send_url = f"http://127.0.0.1:{client_port}/api/v1/robot/message/send/image/local"
|
||||
send_body = {
|
||||
"to_wxid": from_wx_id,
|
||||
"file_path": local_paths,
|
||||
}
|
||||
_http_post_json(send_url, send_body, {"Content-Type": "application/json"}, timeout=60)
|
||||
|
||||
|
||||
def _cleanup_openai_temp_files(image_outputs: list[str]) -> None:
|
||||
for value in image_outputs:
|
||||
path = Path(value)
|
||||
if path.name.startswith("wechat-openai-image-") and path.is_file():
|
||||
try:
|
||||
path.unlink()
|
||||
except OSError:
|
||||
pass
|
||||
|
||||
|
||||
def call_jimeng(config: dict, prompt: str, model: str,
|
||||
negative_prompt: str, ratio: str, resolution: str) -> list[str]:
|
||||
"""Call JiMeng (即梦) image generation API."""
|
||||
@ -315,6 +465,34 @@ def call_zimage(config: dict, prompt: str, model: str) -> list[str]:
|
||||
raise RuntimeError("造相绘图任务超时")
|
||||
|
||||
|
||||
def call_openai(config: dict, prompt: str, model: str,
|
||||
negative_prompt: str, ratio: str, resolution: str) -> list[str]:
|
||||
"""Call OpenAI GPT Image API for text-to-image generation."""
|
||||
client = _openai_client(config)
|
||||
output_format = _openai_output_format(config)
|
||||
quality = str(config.get("quality", "auto") or "auto")
|
||||
moderation = str(config.get("moderation", "auto") or "auto")
|
||||
background = str(config.get("background", "auto") or "auto")
|
||||
if background == "transparent":
|
||||
background = "auto"
|
||||
|
||||
kwargs = {
|
||||
"model": model or "gpt-image-2",
|
||||
"prompt": _openai_prompt(prompt, negative_prompt),
|
||||
"n": _coerce_int(config.get("n"), 1, 1, 10),
|
||||
"size": _openai_size(config, ratio, resolution),
|
||||
"quality": quality,
|
||||
"background": background,
|
||||
"moderation": moderation,
|
||||
"output_format": output_format,
|
||||
}
|
||||
if output_format in {"jpeg", "webp"} and config.get("output_compression") is not None:
|
||||
kwargs["output_compression"] = _coerce_int(config.get("output_compression"), 100, 0, 100)
|
||||
|
||||
response = client.images.generate(**kwargs)
|
||||
return _openai_images_from_response(response, output_format)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Main
|
||||
# ---------------------------------------------------------------------------
|
||||
@ -322,6 +500,7 @@ def call_zimage(config: dict, prompt: str, model: str) -> list[str]:
|
||||
JIMENG_MODELS = {"jimeng-4.5", "jimeng-4.6", "jimeng-5.0"}
|
||||
DOUBAO_MODELS = {"doubao-seedream-4.5", "doubao-seedream-4.0", "doubao-seedream-3.0-t2i", "doubao-seededit-3.0-i2i"}
|
||||
ZIMAGE_MODELS = {"Z-Image", "Z-Image-Turbo", "Qwen-Image-Edit-2511"}
|
||||
OPENAI_MODELS = {"gpt-image-2"}
|
||||
|
||||
|
||||
def _parse_cli_params(argv: list[str]) -> dict[str, str]:
|
||||
@ -423,6 +602,13 @@ def main() -> int:
|
||||
return 0
|
||||
image_urls = call_zimage(zimage_config, prompt, model)
|
||||
|
||||
elif model in OPENAI_MODELS:
|
||||
openai_config = settings_json.get("OpenAI", {})
|
||||
if not openai_config.get("enabled", False):
|
||||
sys.stdout.write("OpenAI 绘图未开启\n")
|
||||
return 0
|
||||
image_urls = call_openai(openai_config, prompt, model, negative_prompt, ratio, resolution)
|
||||
|
||||
else:
|
||||
sys.stdout.write("不支持的 AI 图像模型\n")
|
||||
return 1
|
||||
@ -438,20 +624,18 @@ def main() -> int:
|
||||
# 通过客户端接口发送图片
|
||||
client_port = os.environ.get("ROBOT_WECHAT_CLIENT_PORT", "").strip()
|
||||
if not client_port:
|
||||
_cleanup_openai_temp_files(image_urls)
|
||||
sys.stdout.write("环境变量 ROBOT_WECHAT_CLIENT_PORT 未配置\n")
|
||||
return 1
|
||||
|
||||
send_url = f"http://127.0.0.1:{client_port}/api/v1/robot/message/send/image/url"
|
||||
send_body = {
|
||||
"to_wxid": from_wx_id,
|
||||
"image_urls": [u for u in image_urls if u],
|
||||
}
|
||||
try:
|
||||
_http_post_json(send_url, send_body, {"Content-Type": "application/json"}, timeout=60)
|
||||
_send_image_outputs(client_port, from_wx_id, image_urls)
|
||||
sys.stdout.write("图片发送成功\n")
|
||||
except Exception as exc:
|
||||
sys.stdout.write(f"发送图片失败: {exc}\n")
|
||||
return 1
|
||||
finally:
|
||||
_cleanup_openai_temp_files(image_urls)
|
||||
|
||||
return 0
|
||||
|
||||
|
||||
Loading…
Reference in New Issue
Block a user