上传文件至 gupiao/scripts

This commit is contained in:
lj091715 2026-05-20 21:38:50 +08:00
parent 47d9a5bbb3
commit 8912f45ceb

603
gupiao/scripts/gupiao.py Normal file
View File

@ -0,0 +1,603 @@
#!/usr/bin/env python3
"""股票游戏插件 - 10只真实股票交易与签到积分联动"""
from __future__ import annotations
import json
import os
import sys
import traceback
import urllib.error
import urllib.request
from datetime import datetime
from typing import Any
sys.stderr = sys.stdout
# ---------- 路径配置 ----------
SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__))
DATA_DIR = os.path.join(SCRIPT_DIR, "..", "data")
os.makedirs(DATA_DIR, exist_ok=True)
GUPIAO_FILE = os.path.join(DATA_DIR, "gupiao.json")
QIANDAN_FILE = os.path.join(
SCRIPT_DIR, "..", "..", "qiandao", "data", "qiandao.json"
)
# ---------- 股票池定义 ----------
STOCK_DEFINITIONS = [
{"symbol": "腾讯", "api_code": "hk00700"},
{"symbol": "阿里", "api_code": "hk09988"},
{"symbol": "苹果", "api_code": "usAAPL"},
{"symbol": "平安", "api_code": "sh601318"},
{"symbol": "特斯拉", "api_code": "usTSLA"},
{"symbol": "工业富联","api_code": "sh601138"},
{"symbol": "药明康德","api_code": "sh603259"},
{"symbol": "宁德时代","api_code": "sz300750"},
{"symbol": "茅台", "api_code": "sh600519"},
{"symbol": "三七互娱","api_code": "sz002555"},
]
SYMBOLS_LOWER = {s["symbol"].lower(): s for s in STOCK_DEFINITIONS}
# ---------- 游戏参数 ----------
TRANSACTION_FEE_RATE = 0.001 # 交易手续费 0.1%
TAX_RATE = 0.0005 # 印花税费率 0.05%(卖出时收取)
MIN_TRANSACTION_SHARES = 1 # 最小交易数量
API_TIMEOUT = 10 # API 请求超时(秒)
# ====================================================================
# API 数据获取
# ====================================================================
# --- qt.gtimg.cn (股票) ---
def parse_gtimg_response(raw: str) -> dict[str, Any] | None:
"""解析 qt.gtimg.cn 返回的 ~ 分隔数据。"""
try:
eq_pos = raw.find("=")
if eq_pos == -1:
return None
content = raw[eq_pos + 1:].strip().strip('"').strip(";")
fields = content.split("~")
if len(fields) < 34:
return None
name = fields[1] if len(fields) > 1 else ""
try:
price = float(fields[3]) if fields[3] else 0.0
except (ValueError, TypeError):
return None
try:
change = float(fields[31]) if fields[31] else 0.0
except ValueError:
change = 0.0
try:
change_pct = float(fields[32]) if fields[32] else 0.0
except ValueError:
change_pct = 0.0
try:
high = float(fields[33]) if fields[33] else price
except ValueError:
high = price
try:
low = float(fields[34]) if fields[34] else price
except ValueError:
low = price
return {
"name": name, "price": price,
"change": change, "change_pct": change_pct,
"high": high, "low": low,
}
except Exception:
return None
def fetch_stock_price(api_code: str) -> dict[str, Any] | None:
"""从 qt.gtimg.cn 获取单只股票实时行情。"""
url = f"https://qt.gtimg.cn/q={api_code}"
try:
req = urllib.request.Request(url, headers={
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36",
})
with urllib.request.urlopen(req, timeout=API_TIMEOUT) as resp:
raw = resp.read().decode("gbk")
return parse_gtimg_response(raw)
except (urllib.error.URLError, TimeoutError, OSError):
return None
# --- 统一查询 ---
def fetch_all_prices() -> dict[str, dict[str, Any]]:
"""批量获取所有10只股票的真实价格。"""
results: dict[str, dict[str, Any]] = {}
for sd in STOCK_DEFINITIONS:
api_data = fetch_stock_price(sd["api_code"])
if api_data:
results[sd["symbol"]] = {
"price": api_data["price"],
"change": api_data.get("change", 0),
"change_pct": api_data.get("change_pct", 0),
"high": api_data.get("high", api_data["price"]),
"low": api_data.get("low", api_data["price"]),
}
else:
results[sd["symbol"]] = {"price": None, "error": True}
return results
# ====================================================================
# 数据加载 / 保存
# ====================================================================
def load_game_data() -> dict[str, Any]:
"""加载股票游戏数据(持仓和交易记录)。"""
if os.path.exists(GUPIAO_FILE):
try:
with open(GUPIAO_FILE, encoding="utf-8") as f:
return json.load(f)
except (json.JSONDecodeError, OSError):
pass
return {"portfolios": {}}
def save_game_data(data: dict[str, Any]) -> None:
"""保存股票游戏数据。"""
with open(GUPIAO_FILE, "w", encoding="utf-8") as f:
json.dump(data, f, ensure_ascii=False, indent=2)
def load_qiandao_data() -> dict[str, Any]:
"""加载签到数据。"""
if os.path.exists(QIANDAN_FILE):
try:
with open(QIANDAN_FILE, encoding="utf-8") as f:
return json.load(f)
except (json.JSONDecodeError, OSError):
pass
return {"users": {}}
def save_qiandao_data(data: dict[str, Any]) -> None:
"""保存签到数据。"""
with open(QIANDAN_FILE, "w", encoding="utf-8") as f:
json.dump(data, f, ensure_ascii=False, indent=2)
# ====================================================================
# 签到积分操作
# ====================================================================
def get_user_cash(user_id: str) -> float:
"""获取用户的可用现金。"""
qd = load_qiandao_data()
user = qd.get("users", {}).get(user_id)
if user is None:
return 0.0
return float(user.get("total_points", 0))
def deduct_cash(user_id: str, amount: float) -> bool:
"""从签到积分扣除金额(买入)。"""
qd = load_qiandao_data()
user = qd.get("users", {}).get(user_id)
if user is None:
return False
current = float(user.get("total_points", 0))
if current < amount - 0.001:
return False
user["total_points"] = round(current - amount, 2)
save_qiandao_data(qd)
return True
def add_cash(user_id: str, amount: float) -> None:
"""向签到积分增加金额(卖出所得)。"""
qd = load_qiandao_data()
user = qd.get("users", {}).get(user_id)
if user is None:
return
current = float(user.get("total_points", 0))
user["total_points"] = round(current + amount, 2)
save_qiandao_data(qd)
# ====================================================================
# 核心业务函数
# ====================================================================
def resolve_symbol(text: str) -> dict[str, Any] | None:
"""将用户输入转换为股票定义。支持「苹果」「苹果股票」等格式。"""
key = text.strip().lower().replace("股票", "").replace(" ", "")
return SYMBOLS_LOWER.get(key)
def parse_symbol_shares(combined: str) -> tuple[str | None, int | None]:
"""解析合并格式如「苹果股票13股」→ (苹果, 13)。"""
raw = combined.strip()
# 提取尾部数字, support 13 or 13股
import re as _re
m = _re.search(r'(\d+)(?:股)?$', raw)
if not m:
return None, None
shares = int(m.group(1))
sym_raw = raw[:m.start()].strip().replace("股票", "").replace(" ", "")
if sym_raw:
return sym_raw, shares
return None, None
def stock_display_name(sd: dict[str, Any]) -> str:
"""返回显示名,如「腾讯股票」「比特币股票」。"""
return f"{sd['symbol']}股票"
def market(user_id: str, nickname: str) -> str:
"""查看行情。"""
prices = fetch_all_prices()
lines = ["📈 股票行情", "" * 22]
for sd in STOCK_DEFINITIONS:
sym = sd["symbol"]
info = prices.get(sym, {})
if info.get("error") or info.get("price") is None:
lines.append(f"⚠️ 【{stock_display_name(sd)}】行情获取失败...")
continue
price = info["price"]
change = info["change"]
arrow = "📈" if change >= 0 else "📉"
pct_abs = abs(info["change_pct"])
vol = "" if pct_abs >= 2 else ("" if pct_abs >= 0.5 else "")
lines.append(
f"{arrow}{stock_display_name(sd)}{price:.2f} (波动:{vol})"
)
return "\n".join(lines)
def buy(user_id: str, nickname: str, symbol: str, shares: int) -> str:
"""买入股票。"""
if shares < MIN_TRANSACTION_SHARES:
return f"❌ 最少买入 {MIN_TRANSACTION_SHARES}"
sd = resolve_symbol(symbol)
if not sd:
return f"❌ 未知股票「{symbol}」,支持的:{', '.join(s['symbol'] for s in STOCK_DEFINITIONS)}"
# 获取实时价格
api_data = fetch_stock_price(sd["api_code"])
if not api_data:
return f"❌ 获取 {stock_display_name(sd)} 实时行情失败,请稍后再试"
price = api_data["price"]
total_cost = round(price * shares, 2)
fee = round(total_cost * TRANSACTION_FEE_RATE, 2)
total_need = round(total_cost + fee, 2)
fee_pct = TRANSACTION_FEE_RATE * 100
cash = get_user_cash(user_id)
if cash < total_need:
return (
f"❌ 现金不足!\n"
f" 💰 需要:{total_need:.2f} 积分(含手续费 {fee:.2f}\n"
f" 💵 持有:{cash:.2f} 积分\n"
f" 💡 还差 {total_need - cash:.2f} 积分,去签到赚积分吧!"
)
if not deduct_cash(user_id, total_need):
return "❌ 扣款失败,请重试"
data = load_game_data()
portfolio = data.setdefault("portfolios", {}).setdefault(
user_id, {"holdings": {}, "transactions": []}
)
holding = portfolio["holdings"].setdefault(sd["symbol"], {"shares": 0, "avg_cost": 0.0})
total_shares = holding["shares"] + shares
holding["avg_cost"] = round(
(holding["avg_cost"] * holding["shares"] + price * shares) / total_shares, 2
)
holding["shares"] = total_shares
portfolio["transactions"].append({
"time": datetime.now().strftime("%Y-%m-%d %H:%M"),
"type": "BUY",
"symbol": sd["symbol"],
"shares": shares,
"price": price,
"fee": fee,
"total": total_need,
})
save_game_data(data)
return (
f"@{nickname} ✅ 买入 {stock_display_name(sd)} {shares}\n"
f"💰 成交价:{price:.2f}\n"
f"💵 成本:{total_cost:.2f}\n"
f"💸 手续费:{fee:.2f}{fee_pct}%\n"
f"💡 卖出时还需{fee_pct}%手续费+利润税"
)
def sell(user_id: str, nickname: str, symbol: str, shares: int) -> str:
"""卖出股票。"""
if shares < MIN_TRANSACTION_SHARES:
return f"❌ 最少卖出 {MIN_TRANSACTION_SHARES}"
sd = resolve_symbol(symbol)
if not sd:
return f"❌ 未知股票「{symbol}」,支持的:{', '.join(s['symbol'] for s in STOCK_DEFINITIONS)}"
data = load_game_data()
portfolio = data.get("portfolios", {}).get(user_id)
if not portfolio:
return "❌ 你还没有持仓哦,先买入一些股票吧!"
holding = portfolio["holdings"].get(sd["symbol"])
if not holding or holding["shares"] < shares:
return f"❌ 持仓不足!当前持有 {stock_display_name(sd)}{holding['shares'] if holding else 0}"
is_liquidation = holding["shares"] == shares
# 获取实时价格
api_data = fetch_stock_price(sd["api_code"])
if not api_data:
return f"❌ 获取 {stock_display_name(sd)} 实时行情失败,请稍后再试"
sell_price = api_data["price"]
buy_price = holding["avg_cost"]
total_income = round(sell_price * shares, 2)
fee = round(total_income * TRANSACTION_FEE_RATE, 2)
tax = round(total_income * TAX_RATE, 2)
net_income = round(total_income - fee - tax, 2)
cost_basis = round(buy_price * shares, 2)
profit = round(net_income - cost_basis, 2)
add_cash(user_id, net_income)
holding["shares"] -= shares
if holding["shares"] <= 0:
del portfolio["holdings"][sd["symbol"]]
portfolio["transactions"].append({
"time": datetime.now().strftime("%Y-%m-%d %H:%M"),
"type": "SELL",
"symbol": sd["symbol"],
"shares": shares,
"price": sell_price,
"fee": fee,
"tax": tax,
"total": net_income,
})
save_game_data(data)
action = "清仓" if is_liquidation else "卖出"
profit_emoji = "🟢" if profit >= 0 else "🔴"
return (
f"@{nickname} {action} {stock_display_name(sd)} {shares}\n"
f"买入价格:{buy_price:.2f}\n"
f"抛售价格:{sell_price:.2f}\n"
f"💵 毛收入:{total_income:.2f}\n"
f"💸 手续费:{fee:.2f}\n"
f"📄 税费:{tax:.2f}\n"
f"📥 实际到账:{net_income:.2f}\n"
f"{profit_emoji} 总盈亏:{profit:+.2f}\n"
f"💰 资金已到账!"
)
def portfolio(user_id: str, nickname: str) -> str:
"""查看我的持仓。"""
data = load_game_data()
portfolio_data = data.get("portfolios", {}).get(user_id)
if not portfolio_data or not portfolio_data.get("holdings"):
qd = load_qiandao_data()
user = qd.get("users", {}).get(user_id, {})
name = user.get("nickname", nickname)
return (
f"📭 {name} 还没有持仓\n"
f"💡 发送 买入股票 名称 数量 来购买"
)
prices = fetch_all_prices()
lines = [f"📊 {nickname} 的持仓", "" * 22]
total_market_value = 0.0
total_cost = 0.0
for sym, holding in portfolio_data["holdings"].items():
sd = SYMBOLS_LOWER.get(sym.lower())
if not sd:
continue
info = prices.get(sym, {})
price = info.get("price")
if price is None:
lines.append(f"⚠️ 【{stock_display_name(sd)}】行情获取失败...")
continue
shares = holding["shares"]
market_value = round(price * shares, 2)
cost = round(holding["avg_cost"] * shares, 2)
profit = round(market_value - cost, 2)
profit_pct = round((profit / cost) * 100, 2) if cost > 0 else 0
total_market_value += market_value
total_cost += cost
emoji = "📈" if profit >= 0 else "📉"
lines.append(
f"{emoji}{stock_display_name(sd)}{shares}\n"
f" 均价:{holding['avg_cost']:.2f} | 现价:{price:.2f}\n"
f" 市值:{market_value:.2f} | {emoji} {profit:+.2f}{profit_pct:+.1f}%"
)
lines.append("" * 22)
total_profit = round(total_market_value - total_cost, 2)
total_profit_pct = round((total_profit / (total_cost or 1)) * 100, 1)
lines.append(f"💎 总市值:{total_market_value:.2f}(投入{total_cost:.2f}")
emoji = "📈" if total_profit >= 0 else "📉"
lines.append(f"{emoji} 总盈亏:{total_profit:+.2f}{total_profit_pct:.1f}%")
return "\n".join(lines)
def rank(user_id: str, nickname: str, top_n: int = 10) -> str:
"""股市排行榜。"""
data = load_game_data()
prices = fetch_all_prices()
qd = load_qiandao_data()
qd_users = qd.get("users", {})
game_ports = data.get("portfolios", {})
user_list: list[tuple[str, float, float, float, str]] = []
for uid, quser in qd_users.items():
name = quser.get("nickname", uid)
holdings_value = 0.0
total_cost = 0.0
port = game_ports.get(uid, {}).get("holdings", {})
for sym, h in port.items():
info = prices.get(sym, {})
price = info.get("price")
if price is not None:
holdings_value += price * h["shares"]
total_cost += h["avg_cost"] * h["shares"]
if holdings_value > 0:
profit = round(holdings_value - total_cost, 2)
profit_pct = round((profit / (total_cost or 1)) * 100, 1)
user_list.append((uid, round(holdings_value, 2), profit, profit_pct, name))
user_list.sort(key=lambda x: x[1], reverse=True)
if not user_list:
return "📭 还没有人进行股票交易,快来成为第一个股神吧!"
lines = ["🏆 股市排行榜", "" * 22]
for rank_i, (uid, mv, profit, pct, name) in enumerate(user_list[:top_n], 1):
medal = {1: "🥇", 2: "🥈", 3: "🥉"}.get(rank_i, f"#{rank_i}")
profit_str = f"+{profit}" if profit >= 0 else f"{profit}"
lines.append(
f"{medal} {name}\n"
f" 市值:{mv:.1f} | 盈亏:{profit_str} ({pct:.1f}%)"
)
lines.append("" * 22)
lines.append("💡 发送 买入股票 名称 数量 来购买")
lines.append("💡 发送 我的持仓 查看持股")
lines.append("💡 发送 股市排行榜 查看股神榜")
return "\n".join(lines)
def history(user_id: str, nickname: str, limit: int = 10) -> str:
"""查看交易记录。"""
data = load_game_data()
port = data.get("portfolios", {}).get(user_id)
if not port or not port.get("transactions"):
return f"📭 {nickname} 还没有交易记录"
txns = port["transactions"][-limit:]
lines = [f"📋 {nickname} 的交易记录(最近{len(txns)}条)", "" * 28]
for txn in reversed(txns):
emoji = "🟢" if txn["type"] == "BUY" else "🔴"
lines.append(
f"{emoji} [{txn['time']}] {txn['type']} {txn['symbol']}\n"
f" 数量 {txn['shares']}× {txn['price']:.2f} | 手续费 {txn['fee']:.2f}"
)
return "\n".join(lines)
def reset_game(user_id: str) -> str:
"""重置用户股票数据。"""
data = load_game_data()
if user_id in data.get("portfolios", {}):
del data["portfolios"][user_id]
save_game_data(data)
return f"✅ 已重置用户 {user_id} 的股票数据"
return f"❌ 用户 {user_id} 没有股票数据"
# ====================================================================
# CLI 主入口
# ====================================================================
def main() -> int:
"""CLI 入口。"""
args = sys.argv[1:]
if not args:
print("用法: python gupiao.py <命令> [参数...]")
print("命令: market, buy, sell, portfolio, rank, history, reset")
return 1
cmd = args[0]
if cmd == "market":
uid = args[1] if len(args) > 1 else "unknown"
nickname = args[2] if len(args) > 2 else uid
print(market(uid, nickname))
elif cmd in ("buy", "sell"):
if len(args) < 3:
print(f"用法: python gupiao.py {cmd} <user_id> <名称> <数量> [nickname]")
return 1
uid = args[1]
# 判断 args[2] 是否为合并格式含数字如「苹果股票13股」
import re as _re2
if _re2.search(r'\d', args[2]):
sym_raw, shares_num = parse_symbol_shares(args[2])
if sym_raw is None or shares_num is None:
print("❌ 格式错误,示例: buy 腾讯 10 或 buy 腾讯股票10股")
return 1
symbol = sym_raw
shares = shares_num
nickname = args[3] if len(args) > 3 else uid
else:
if len(args) < 4:
print(f"用法: python gupiao.py {cmd} <user_id> <名称> <数量> [nickname]")
return 1
symbol = args[2]
try:
shares = int(args[3])
except ValueError:
print("❌ 数量必须是整数")
return 1
nickname = args[4] if len(args) > 4 else uid
if cmd == "buy":
print(buy(uid, nickname, symbol, shares))
else:
print(sell(uid, nickname, symbol, shares))
elif cmd == "portfolio":
uid = args[1] if len(args) > 1 else "unknown"
nickname = args[2] if len(args) > 2 else uid
print(portfolio(uid, nickname))
elif cmd == "rank":
top_n = int(args[1]) if len(args) > 1 else 10
print(rank("", "", top_n))
elif cmd == "history":
uid = args[1] if len(args) > 1 else "unknown"
limit = int(args[2]) if len(args) > 2 and args[2].isdigit() else 10
nickname = args[3] if len(args) > 3 else uid
print(history(uid, nickname, limit))
elif cmd == "reset":
if len(args) < 2:
print("请提供 user_id")
return 1
print(reset_game(args[1]))
else:
print(f"未知命令: {cmd}")
return 1
return 0
if __name__ == "__main__":
try:
raise SystemExit(main())
except SystemExit:
raise
except Exception:
traceback.print_exc(file=sys.stdout)
raise SystemExit(1)