From 8912f45ceb63b790e7ef773cbe662c7b4e18f341 Mon Sep 17 00:00:00 2001 From: lj091715 <1091062319@qq.com> Date: Wed, 20 May 2026 21:38:50 +0800 Subject: [PATCH] =?UTF-8?q?=E4=B8=8A=E4=BC=A0=E6=96=87=E4=BB=B6=E8=87=B3?= =?UTF-8?q?=20gupiao/scripts?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- gupiao/scripts/gupiao.py | 603 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 603 insertions(+) create mode 100644 gupiao/scripts/gupiao.py diff --git a/gupiao/scripts/gupiao.py b/gupiao/scripts/gupiao.py new file mode 100644 index 0000000..f7ed4a3 --- /dev/null +++ b/gupiao/scripts/gupiao.py @@ -0,0 +1,603 @@ +#!/usr/bin/env python3 +"""股票游戏插件 - 10只真实股票交易,与签到积分联动""" +from __future__ import annotations + +import json +import os +import sys +import traceback +import urllib.error +import urllib.request +from datetime import datetime +from typing import Any + +sys.stderr = sys.stdout + +# ---------- 路径配置 ---------- +SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__)) +DATA_DIR = os.path.join(SCRIPT_DIR, "..", "data") +os.makedirs(DATA_DIR, exist_ok=True) + +GUPIAO_FILE = os.path.join(DATA_DIR, "gupiao.json") +QIANDAN_FILE = os.path.join( + SCRIPT_DIR, "..", "..", "qiandao", "data", "qiandao.json" +) + +# ---------- 股票池定义 ---------- +STOCK_DEFINITIONS = [ + {"symbol": "腾讯", "api_code": "hk00700"}, + {"symbol": "阿里", "api_code": "hk09988"}, + {"symbol": "苹果", "api_code": "usAAPL"}, + {"symbol": "平安", "api_code": "sh601318"}, + {"symbol": "特斯拉", "api_code": "usTSLA"}, + {"symbol": "工业富联","api_code": "sh601138"}, + {"symbol": "药明康德","api_code": "sh603259"}, + {"symbol": "宁德时代","api_code": "sz300750"}, + {"symbol": "茅台", "api_code": "sh600519"}, + {"symbol": "三七互娱","api_code": "sz002555"}, +] +SYMBOLS_LOWER = {s["symbol"].lower(): s for s in STOCK_DEFINITIONS} + +# ---------- 游戏参数 ---------- +TRANSACTION_FEE_RATE = 0.001 # 交易手续费 0.1% +TAX_RATE = 0.0005 # 印花税费率 0.05%(卖出时收取) +MIN_TRANSACTION_SHARES = 1 # 最小交易数量 +API_TIMEOUT = 10 # API 请求超时(秒) + + +# ==================================================================== +# API 数据获取 +# ==================================================================== + +# --- qt.gtimg.cn (股票) --- + +def parse_gtimg_response(raw: str) -> dict[str, Any] | None: + """解析 qt.gtimg.cn 返回的 ~ 分隔数据。""" + try: + eq_pos = raw.find("=") + if eq_pos == -1: + return None + content = raw[eq_pos + 1:].strip().strip('"').strip(";") + fields = content.split("~") + if len(fields) < 34: + return None + name = fields[1] if len(fields) > 1 else "" + try: + price = float(fields[3]) if fields[3] else 0.0 + except (ValueError, TypeError): + return None + try: + change = float(fields[31]) if fields[31] else 0.0 + except ValueError: + change = 0.0 + try: + change_pct = float(fields[32]) if fields[32] else 0.0 + except ValueError: + change_pct = 0.0 + try: + high = float(fields[33]) if fields[33] else price + except ValueError: + high = price + try: + low = float(fields[34]) if fields[34] else price + except ValueError: + low = price + return { + "name": name, "price": price, + "change": change, "change_pct": change_pct, + "high": high, "low": low, + } + except Exception: + return None + + +def fetch_stock_price(api_code: str) -> dict[str, Any] | None: + """从 qt.gtimg.cn 获取单只股票实时行情。""" + url = f"https://qt.gtimg.cn/q={api_code}" + try: + req = urllib.request.Request(url, headers={ + "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36", + }) + with urllib.request.urlopen(req, timeout=API_TIMEOUT) as resp: + raw = resp.read().decode("gbk") + return parse_gtimg_response(raw) + except (urllib.error.URLError, TimeoutError, OSError): + return None + + +# --- 统一查询 --- + +def fetch_all_prices() -> dict[str, dict[str, Any]]: + """批量获取所有10只股票的真实价格。""" + results: dict[str, dict[str, Any]] = {} + for sd in STOCK_DEFINITIONS: + api_data = fetch_stock_price(sd["api_code"]) + if api_data: + results[sd["symbol"]] = { + "price": api_data["price"], + "change": api_data.get("change", 0), + "change_pct": api_data.get("change_pct", 0), + "high": api_data.get("high", api_data["price"]), + "low": api_data.get("low", api_data["price"]), + } + else: + results[sd["symbol"]] = {"price": None, "error": True} + return results + + +# ==================================================================== +# 数据加载 / 保存 +# ==================================================================== + +def load_game_data() -> dict[str, Any]: + """加载股票游戏数据(持仓和交易记录)。""" + if os.path.exists(GUPIAO_FILE): + try: + with open(GUPIAO_FILE, encoding="utf-8") as f: + return json.load(f) + except (json.JSONDecodeError, OSError): + pass + return {"portfolios": {}} + + +def save_game_data(data: dict[str, Any]) -> None: + """保存股票游戏数据。""" + with open(GUPIAO_FILE, "w", encoding="utf-8") as f: + json.dump(data, f, ensure_ascii=False, indent=2) + + +def load_qiandao_data() -> dict[str, Any]: + """加载签到数据。""" + if os.path.exists(QIANDAN_FILE): + try: + with open(QIANDAN_FILE, encoding="utf-8") as f: + return json.load(f) + except (json.JSONDecodeError, OSError): + pass + return {"users": {}} + + +def save_qiandao_data(data: dict[str, Any]) -> None: + """保存签到数据。""" + with open(QIANDAN_FILE, "w", encoding="utf-8") as f: + json.dump(data, f, ensure_ascii=False, indent=2) + + +# ==================================================================== +# 签到积分操作 +# ==================================================================== + +def get_user_cash(user_id: str) -> float: + """获取用户的可用现金。""" + qd = load_qiandao_data() + user = qd.get("users", {}).get(user_id) + if user is None: + return 0.0 + return float(user.get("total_points", 0)) + + +def deduct_cash(user_id: str, amount: float) -> bool: + """从签到积分扣除金额(买入)。""" + qd = load_qiandao_data() + user = qd.get("users", {}).get(user_id) + if user is None: + return False + current = float(user.get("total_points", 0)) + if current < amount - 0.001: + return False + user["total_points"] = round(current - amount, 2) + save_qiandao_data(qd) + return True + + +def add_cash(user_id: str, amount: float) -> None: + """向签到积分增加金额(卖出所得)。""" + qd = load_qiandao_data() + user = qd.get("users", {}).get(user_id) + if user is None: + return + current = float(user.get("total_points", 0)) + user["total_points"] = round(current + amount, 2) + save_qiandao_data(qd) + + +# ==================================================================== +# 核心业务函数 +# ==================================================================== + +def resolve_symbol(text: str) -> dict[str, Any] | None: + """将用户输入转换为股票定义。支持「苹果」「苹果股票」等格式。""" + key = text.strip().lower().replace("股票", "").replace(" ", "") + return SYMBOLS_LOWER.get(key) + + +def parse_symbol_shares(combined: str) -> tuple[str | None, int | None]: + """解析合并格式,如「苹果股票13股」→ (苹果, 13)。""" + raw = combined.strip() + # 提取尾部数字, support 13 or 13股 + import re as _re + m = _re.search(r'(\d+)(?:股)?$', raw) + if not m: + return None, None + shares = int(m.group(1)) + sym_raw = raw[:m.start()].strip().replace("股票", "").replace(" ", "") + if sym_raw: + return sym_raw, shares + return None, None + + +def stock_display_name(sd: dict[str, Any]) -> str: + """返回显示名,如「腾讯股票」「比特币股票」。""" + return f"{sd['symbol']}股票" + + +def market(user_id: str, nickname: str) -> str: + """查看行情。""" + prices = fetch_all_prices() + lines = ["📈 股票行情", "━" * 22] + + for sd in STOCK_DEFINITIONS: + sym = sd["symbol"] + info = prices.get(sym, {}) + if info.get("error") or info.get("price") is None: + lines.append(f"⚠️ 【{stock_display_name(sd)}】行情获取失败...") + continue + + price = info["price"] + change = info["change"] + arrow = "📈" if change >= 0 else "📉" + pct_abs = abs(info["change_pct"]) + vol = "高" if pct_abs >= 2 else ("中" if pct_abs >= 0.5 else "低") + lines.append( + f"{arrow} 【{stock_display_name(sd)}】{price:.2f} (波动:{vol})" + ) + + return "\n".join(lines) + + +def buy(user_id: str, nickname: str, symbol: str, shares: int) -> str: + """买入股票。""" + if shares < MIN_TRANSACTION_SHARES: + return f"❌ 最少买入 {MIN_TRANSACTION_SHARES} 股" + + sd = resolve_symbol(symbol) + if not sd: + return f"❌ 未知股票「{symbol}」,支持的:{', '.join(s['symbol'] for s in STOCK_DEFINITIONS)}" + + # 获取实时价格 + api_data = fetch_stock_price(sd["api_code"]) + if not api_data: + return f"❌ 获取 {stock_display_name(sd)} 实时行情失败,请稍后再试" + price = api_data["price"] + + total_cost = round(price * shares, 2) + fee = round(total_cost * TRANSACTION_FEE_RATE, 2) + total_need = round(total_cost + fee, 2) + fee_pct = TRANSACTION_FEE_RATE * 100 + + cash = get_user_cash(user_id) + if cash < total_need: + return ( + f"❌ 现金不足!\n" + f" 💰 需要:{total_need:.2f} 积分(含手续费 {fee:.2f})\n" + f" 💵 持有:{cash:.2f} 积分\n" + f" 💡 还差 {total_need - cash:.2f} 积分,去签到赚积分吧!" + ) + + if not deduct_cash(user_id, total_need): + return "❌ 扣款失败,请重试" + data = load_game_data() + portfolio = data.setdefault("portfolios", {}).setdefault( + user_id, {"holdings": {}, "transactions": []} + ) + holding = portfolio["holdings"].setdefault(sd["symbol"], {"shares": 0, "avg_cost": 0.0}) + total_shares = holding["shares"] + shares + holding["avg_cost"] = round( + (holding["avg_cost"] * holding["shares"] + price * shares) / total_shares, 2 + ) + holding["shares"] = total_shares + portfolio["transactions"].append({ + "time": datetime.now().strftime("%Y-%m-%d %H:%M"), + "type": "BUY", + "symbol": sd["symbol"], + "shares": shares, + "price": price, + "fee": fee, + "total": total_need, + }) + save_game_data(data) + + return ( + f"@{nickname} ✅ 买入 {stock_display_name(sd)} {shares}股\n" + f"💰 成交价:{price:.2f}\n" + f"💵 成本:{total_cost:.2f}\n" + f"💸 手续费:{fee:.2f}({fee_pct}%)\n" + f"💡 卖出时还需{fee_pct}%手续费+利润税" + ) + + +def sell(user_id: str, nickname: str, symbol: str, shares: int) -> str: + """卖出股票。""" + if shares < MIN_TRANSACTION_SHARES: + return f"❌ 最少卖出 {MIN_TRANSACTION_SHARES} 股" + + sd = resolve_symbol(symbol) + if not sd: + return f"❌ 未知股票「{symbol}」,支持的:{', '.join(s['symbol'] for s in STOCK_DEFINITIONS)}" + + data = load_game_data() + portfolio = data.get("portfolios", {}).get(user_id) + if not portfolio: + return "❌ 你还没有持仓哦,先买入一些股票吧!" + + holding = portfolio["holdings"].get(sd["symbol"]) + if not holding or holding["shares"] < shares: + return f"❌ 持仓不足!当前持有 {stock_display_name(sd)}:{holding['shares'] if holding else 0} 股" + + is_liquidation = holding["shares"] == shares + + # 获取实时价格 + api_data = fetch_stock_price(sd["api_code"]) + if not api_data: + return f"❌ 获取 {stock_display_name(sd)} 实时行情失败,请稍后再试" + sell_price = api_data["price"] + buy_price = holding["avg_cost"] + + total_income = round(sell_price * shares, 2) + fee = round(total_income * TRANSACTION_FEE_RATE, 2) + tax = round(total_income * TAX_RATE, 2) + net_income = round(total_income - fee - tax, 2) + + cost_basis = round(buy_price * shares, 2) + profit = round(net_income - cost_basis, 2) + + add_cash(user_id, net_income) + holding["shares"] -= shares + if holding["shares"] <= 0: + del portfolio["holdings"][sd["symbol"]] + + portfolio["transactions"].append({ + "time": datetime.now().strftime("%Y-%m-%d %H:%M"), + "type": "SELL", + "symbol": sd["symbol"], + "shares": shares, + "price": sell_price, + "fee": fee, + "tax": tax, + "total": net_income, + }) + save_game_data(data) + + action = "清仓" if is_liquidation else "卖出" + profit_emoji = "🟢" if profit >= 0 else "🔴" + return ( + f"@{nickname} {action} {stock_display_name(sd)} {shares}股\n" + f"买入价格:{buy_price:.2f}\n" + f"抛售价格:{sell_price:.2f}\n" + f"💵 毛收入:{total_income:.2f}\n" + f"💸 手续费:{fee:.2f}\n" + f"📄 税费:{tax:.2f}\n" + f"📥 实际到账:{net_income:.2f}\n" + f"{profit_emoji} 总盈亏:{profit:+.2f}\n" + f"💰 资金已到账!" + ) + + +def portfolio(user_id: str, nickname: str) -> str: + """查看我的持仓。""" + data = load_game_data() + portfolio_data = data.get("portfolios", {}).get(user_id) + + if not portfolio_data or not portfolio_data.get("holdings"): + qd = load_qiandao_data() + user = qd.get("users", {}).get(user_id, {}) + name = user.get("nickname", nickname) + return ( + f"📭 {name} 还没有持仓\n" + f"💡 发送 买入股票 名称 数量 来购买" + ) + + prices = fetch_all_prices() + lines = [f"📊 {nickname} 的持仓", "━" * 22] + + total_market_value = 0.0 + total_cost = 0.0 + + for sym, holding in portfolio_data["holdings"].items(): + sd = SYMBOLS_LOWER.get(sym.lower()) + if not sd: + continue + info = prices.get(sym, {}) + price = info.get("price") + if price is None: + lines.append(f"⚠️ 【{stock_display_name(sd)}】行情获取失败...") + continue + + shares = holding["shares"] + market_value = round(price * shares, 2) + cost = round(holding["avg_cost"] * shares, 2) + profit = round(market_value - cost, 2) + profit_pct = round((profit / cost) * 100, 2) if cost > 0 else 0 + + total_market_value += market_value + total_cost += cost + + emoji = "📈" if profit >= 0 else "📉" + lines.append( + f"{emoji} 【{stock_display_name(sd)}】{shares}股\n" + f" 均价:{holding['avg_cost']:.2f} | 现价:{price:.2f}\n" + f" 市值:{market_value:.2f} | {emoji} {profit:+.2f}({profit_pct:+.1f}%)" + ) + + lines.append("━" * 22) + total_profit = round(total_market_value - total_cost, 2) + total_profit_pct = round((total_profit / (total_cost or 1)) * 100, 1) + lines.append(f"💎 总市值:{total_market_value:.2f}(投入{total_cost:.2f})") + emoji = "📈" if total_profit >= 0 else "📉" + lines.append(f"{emoji} 总盈亏:{total_profit:+.2f}({total_profit_pct:.1f}%)") + return "\n".join(lines) + + +def rank(user_id: str, nickname: str, top_n: int = 10) -> str: + """股市排行榜。""" + data = load_game_data() + prices = fetch_all_prices() + qd = load_qiandao_data() + qd_users = qd.get("users", {}) + game_ports = data.get("portfolios", {}) + + user_list: list[tuple[str, float, float, float, str]] = [] + + for uid, quser in qd_users.items(): + name = quser.get("nickname", uid) + holdings_value = 0.0 + total_cost = 0.0 + port = game_ports.get(uid, {}).get("holdings", {}) + for sym, h in port.items(): + info = prices.get(sym, {}) + price = info.get("price") + if price is not None: + holdings_value += price * h["shares"] + total_cost += h["avg_cost"] * h["shares"] + if holdings_value > 0: + profit = round(holdings_value - total_cost, 2) + profit_pct = round((profit / (total_cost or 1)) * 100, 1) + user_list.append((uid, round(holdings_value, 2), profit, profit_pct, name)) + + user_list.sort(key=lambda x: x[1], reverse=True) + + if not user_list: + return "📭 还没有人进行股票交易,快来成为第一个股神吧!" + + lines = ["🏆 股市排行榜", "━" * 22] + for rank_i, (uid, mv, profit, pct, name) in enumerate(user_list[:top_n], 1): + medal = {1: "🥇", 2: "🥈", 3: "🥉"}.get(rank_i, f"#{rank_i}") + profit_str = f"+{profit}" if profit >= 0 else f"{profit}" + lines.append( + f"{medal} {name}\n" + f" 市值:{mv:.1f} | 盈亏:{profit_str} ({pct:.1f}%)" + ) + + lines.append("━" * 22) + lines.append("💡 发送 买入股票 名称 数量 来购买") + lines.append("💡 发送 我的持仓 查看持股") + lines.append("💡 发送 股市排行榜 查看股神榜") + return "\n".join(lines) + + +def history(user_id: str, nickname: str, limit: int = 10) -> str: + """查看交易记录。""" + data = load_game_data() + port = data.get("portfolios", {}).get(user_id) + if not port or not port.get("transactions"): + return f"📭 {nickname} 还没有交易记录" + + txns = port["transactions"][-limit:] + lines = [f"📋 {nickname} 的交易记录(最近{len(txns)}条)", "─" * 28] + for txn in reversed(txns): + emoji = "🟢" if txn["type"] == "BUY" else "🔴" + lines.append( + f"{emoji} [{txn['time']}] {txn['type']} {txn['symbol']}\n" + f" 数量 {txn['shares']} 股 × {txn['price']:.2f} | 手续费 {txn['fee']:.2f}" + ) + return "\n".join(lines) + + +def reset_game(user_id: str) -> str: + """重置用户股票数据。""" + data = load_game_data() + if user_id in data.get("portfolios", {}): + del data["portfolios"][user_id] + save_game_data(data) + return f"✅ 已重置用户 {user_id} 的股票数据" + return f"❌ 用户 {user_id} 没有股票数据" + + +# ==================================================================== +# CLI 主入口 +# ==================================================================== + +def main() -> int: + """CLI 入口。""" + args = sys.argv[1:] + if not args: + print("用法: python gupiao.py <命令> [参数...]") + print("命令: market, buy, sell, portfolio, rank, history, reset") + return 1 + + cmd = args[0] + + if cmd == "market": + uid = args[1] if len(args) > 1 else "unknown" + nickname = args[2] if len(args) > 2 else uid + print(market(uid, nickname)) + + elif cmd in ("buy", "sell"): + if len(args) < 3: + print(f"用法: python gupiao.py {cmd} <名称> <数量> [nickname]") + return 1 + uid = args[1] + + # 判断 args[2] 是否为合并格式(含数字),如「苹果股票13股」 + import re as _re2 + if _re2.search(r'\d', args[2]): + sym_raw, shares_num = parse_symbol_shares(args[2]) + if sym_raw is None or shares_num is None: + print("❌ 格式错误,示例: buy 腾讯 10 或 buy 腾讯股票10股") + return 1 + symbol = sym_raw + shares = shares_num + nickname = args[3] if len(args) > 3 else uid + else: + if len(args) < 4: + print(f"用法: python gupiao.py {cmd} <名称> <数量> [nickname]") + return 1 + symbol = args[2] + try: + shares = int(args[3]) + except ValueError: + print("❌ 数量必须是整数") + return 1 + nickname = args[4] if len(args) > 4 else uid + + if cmd == "buy": + print(buy(uid, nickname, symbol, shares)) + else: + print(sell(uid, nickname, symbol, shares)) + + elif cmd == "portfolio": + uid = args[1] if len(args) > 1 else "unknown" + nickname = args[2] if len(args) > 2 else uid + print(portfolio(uid, nickname)) + + elif cmd == "rank": + top_n = int(args[1]) if len(args) > 1 else 10 + print(rank("", "", top_n)) + + elif cmd == "history": + uid = args[1] if len(args) > 1 else "unknown" + limit = int(args[2]) if len(args) > 2 and args[2].isdigit() else 10 + nickname = args[3] if len(args) > 3 else uid + print(history(uid, nickname, limit)) + + elif cmd == "reset": + if len(args) < 2: + print("请提供 user_id") + return 1 + print(reset_game(args[1])) + + else: + print(f"未知命令: {cmd}") + return 1 + + return 0 + + +if __name__ == "__main__": + try: + raise SystemExit(main()) + except SystemExit: + raise + except Exception: + traceback.print_exc(file=sys.stdout) + raise SystemExit(1)