#!/usr/bin/env python3 # pyright: reportExplicitAny=false, reportAny=false, reportUnknownMemberType=false, reportUnknownArgumentType=false, reportUnknownVariableType=false, reportUnusedVariable=false, reportUnusedImport=false """签到游戏插件 - 支持每日签到、连续签到奖励、排行榜""" from __future__ import annotations import json import os import sys import traceback from datetime import date, timedelta from typing import Any # ---------- 数据存储 ---------- DATA_DIR = os.path.join(os.path.dirname(os.path.abspath(__file__)), "..", "data") DATA_FILE = os.path.join(DATA_DIR, "qiandao.json") os.makedirs(DATA_DIR, exist_ok=True) # ---------- 签到奖励配置 ---------- BASE_POINTS = 10 # 每日基础签到积分 CONSECUTIVE_BONUS = { # 连续签到额外奖励 3: 5, # 连续3天额外+5 7: 15, # 连续7天额外+15 15: 30, # 连续15天额外+30 30: 80, # 连续30天额外+80 } MAX_HISTORY_DAYS = 365 # 保留签到历史的天数 def load_data() -> dict[str, Any]: """从 JSON 文件加载用户数据。""" if os.path.exists(DATA_FILE): try: with open(DATA_FILE, encoding="utf-8") as f: return json.load(f) except (json.JSONDecodeError, OSError): pass return {"users": {}, "last_cleanup": ""} def save_data(data: dict[str, Any]) -> None: """保存用户数据到 JSON 文件。""" with open(DATA_FILE, "w", encoding="utf-8") as f: json.dump(data, f, ensure_ascii=False, indent=2) def today_str() -> str: return date.today().isoformat() def get_or_create_user(data: dict[str, Any], user_id: str, nickname: str) -> dict[str, Any]: """获取或创建用户数据。""" users = data["users"] if user_id not in users: users[user_id] = { "nickname": nickname, "total_points": 0, "consecutive_days": 0, "last_checkin": "", "history": [], # 签到日期列表 ["2026-05-01", ...] "points_history": [], # [(日期, 获得积分), ...] } else: # 更新昵称 if nickname: users[user_id]["nickname"] = nickname return users[user_id] def do_checkin(user_id: str, nickname: str = "") -> str: """执行签到,返回回复文本。""" data = load_data() user = get_or_create_user(data, user_id, nickname) today = today_str() # ---------- 已签到 ---------- if today in user["history"]: return ( f"⚠️ {user['nickname']},你今天已经签过到啦!\n" f"📅 总积分:{user['total_points']} 分\n" f"🔥 连续签到:{user['consecutive_days']} 天" ) # ---------- 计算连续签到 ---------- yesterday = date.fromisoformat(today) yesterday_str = yesterday.isoformat() if user["last_checkin"] == yesterday_str: user["consecutive_days"] += 1 else: user["consecutive_days"] = 1 # ---------- 计算积分 ---------- earned = BASE_POINTS bonus_reason = [] # 连续签到额外奖励 for threshold, bonus in sorted(CONSECUTIVE_BONUS.items()): if user["consecutive_days"] == threshold: earned += bonus bonus_reason.append(f"🎉 连续签到 {threshold} 天奖励 +{bonus} 分") break # 只触发最近的里程碑 # 如果连续天数超过最大里程碑但又不是正好等于,奖励固定值 milestones = sorted(CONSECUTIVE_BONUS.keys()) if user["consecutive_days"] > milestones[-1]: # 超过最大里程碑的,每多一天额外+1 extra_days = user["consecutive_days"] - milestones[-1] earned += extra_days bonus_reason.append(f"🔥 长期签到奖励 +{extra_days} 分") user["total_points"] += earned user["last_checkin"] = today user["history"].append(today) user["points_history"].append(f"{today}|{earned}") # ---------- 清理超期历史 ---------- cleanup_old_history(data) save_data(data) # ---------- 构建回复 ---------- lines = [ f"✅ 签到成功!{user['nickname']}", f"📅 获得 {earned} 积分(基础 {BASE_POINTS} 分)", ] lines.extend(bonus_reason) lines.append(f"🔥 已连续签到 {user['consecutive_days']} 天") lines.append(f"💰 总积分:{user['total_points']}") return "\n".join(lines) def query_user(user_id: str, nickname: str = "") -> str: """查询用户签到状态。""" data = load_data() if user_id not in data["users"]: return f"❌ {nickname or user_id} 还没有签到记录,发送「签到」开始吧!" user = data["users"][user_id] if nickname: user["nickname"] = nickname today = today_str() checked = "✅ 已签到" if today in user["history"] else "❌ 未签到" return ( f"📊 {user['nickname']} 的签到信息\n" f"{'━' * 20}\n" f"📅 今日状态:{checked}\n" f"💰 总积分:{user['total_points']} 分\n" f"🔥 连续签到:{user['consecutive_days']} 天\n" f"📆 累计签到:{len(user['history'])} 天" ) def leaderboard(top_n: int = 10) -> str: """返回积分排行榜。""" data = load_data() sorted_users = sorted( data["users"].values(), key=lambda u: u["total_points"], reverse=True, ) if not sorted_users: return "📭 还没有人签到,快发送「签到」成为第一名吧!" lines = ["🏆 签到排行榜 🏆", "━" * 20] for rank, user in enumerate(sorted_users[:top_n], 1): medal = {1: "🥇", 2: "🥈", 3: "🥉"}.get(rank, f"{rank}.") lines.append(f"{medal} {user['nickname']} — {user['total_points']} 分(连续{user['consecutive_days']}天)") return "\n".join(lines) def cleanup_old_history(data: dict[str, Any]) -> None: """清理超过 MAX_HISTORY_DAYS 的历史记录。""" today = date.today() cutoff = (today - timedelta(days=MAX_HISTORY_DAYS)).isoformat() for uid, user in data["users"].items(): user["history"] = [d for d in user["history"] if d >= cutoff] user["points_history"] = [p for p in user["points_history"] if p.split("|")[0] >= cutoff] def reset_user(user_id: str) -> str: """重置用户数据(管理员功能)。""" data = load_data() if user_id in data["users"]: del data["users"][user_id] save_data(data) return f"✅ 已重置用户 {user_id} 的签到数据" return f"❌ 用户 {user_id} 不存在" def main() -> int: """主入口,根据命令行参数执行不同操作。 用法: python qiandao.py checkin [nickname] python qiandao.py query [nickname] python qiandao.py rank [top_n] python qiandao.py reset """ args = sys.argv[1:] if not args: print("用法: python qiandao.py <命令> [参数...]") print("命令: checkin, query, rank, reset") return 1 cmd = args[0] if cmd == "checkin": if len(args) < 2: print("请提供 user_id") return 1 uid = args[1] nickname = args[2] if len(args) > 2 else uid print(do_checkin(uid, nickname)) elif cmd == "query": if len(args) < 2: print("请提供 user_id") return 1 uid = args[1] nickname = args[2] if len(args) > 2 else uid print(query_user(uid, nickname)) elif cmd == "rank": top_n = int(args[1]) if len(args) > 1 else 10 print(leaderboard(top_n)) elif cmd == "reset": if len(args) < 2: print("请提供 user_id") return 1 print(reset_user(args[1])) else: print(f"未知命令: {cmd}") return 1 return 0 if __name__ == "__main__": try: raise SystemExit(main()) except SystemExit: raise except Exception: traceback.print_exc(file=sys.stdout) raise SystemExit(1)