更新 qiandao/scripts/qiandao.py

This commit is contained in:
lj091715 2026-05-20 21:55:10 +08:00
parent 195d8eca99
commit 22336380ad

View File

@ -1,249 +1,246 @@
#!/usr/bin/env python3 #!/usr/bin/env python3
"""签到游戏插件 - 支持每日签到、连续签到奖励、排行榜""" # pyright: reportExplicitAny=false, reportAny=false, reportUnknownMemberType=false, reportUnknownArgumentType=false, reportUnknownVariableType=false, reportUnusedVariable=false, reportUnusedImport=false
from __future__ import annotations """签到游戏插件 - 支持每日签到、连续签到奖励、排行榜"""
from __future__ import annotations
import json
import os import json
import sys import os
import time import sys
import traceback import traceback
from datetime import date, datetime from datetime import date, timedelta
from typing import Any from typing import Any
sys.stderr = sys.stdout # ---------- 数据存储 ----------
DATA_DIR = os.path.join(os.path.dirname(os.path.abspath(__file__)), "..", "data")
# ---------- 数据存储 ---------- DATA_FILE = os.path.join(DATA_DIR, "qiandao.json")
DATA_DIR = os.path.join(os.path.dirname(os.path.abspath(__file__)), "..", "data") os.makedirs(DATA_DIR, exist_ok=True)
DATA_FILE = os.path.join(DATA_DIR, "qiandao.json")
os.makedirs(DATA_DIR, exist_ok=True) # ---------- 签到奖励配置 ----------
BASE_POINTS = 10 # 每日基础签到积分
# ---------- 签到奖励配置 ---------- CONSECUTIVE_BONUS = { # 连续签到额外奖励
BASE_POINTS = 10 # 每日基础签到积分 3: 5, # 连续3天额外+5
CONSECUTIVE_BONUS = { # 连续签到额外奖励 7: 15, # 连续7天额外+15
3: 5, # 连续3天额外+5 15: 30, # 连续15天额外+30
7: 15, # 连续7天额外+15 30: 80, # 连续30天额外+80
15: 30, # 连续15天额外+30 }
30: 80, # 连续30天额外+80 MAX_HISTORY_DAYS = 365 # 保留签到历史的天数
}
MAX_HISTORY_DAYS = 365 # 保留签到历史的天数
def load_data() -> dict[str, Any]:
"""从 JSON 文件加载用户数据。"""
def load_data() -> dict[str, Any]: if os.path.exists(DATA_FILE):
"""从 JSON 文件加载用户数据。""" try:
if os.path.exists(DATA_FILE): with open(DATA_FILE, encoding="utf-8") as f:
try: return json.load(f)
with open(DATA_FILE, encoding="utf-8") as f: except (json.JSONDecodeError, OSError):
return json.load(f) pass
except (json.JSONDecodeError, OSError): return {"users": {}, "last_cleanup": ""}
pass
return {"users": {}, "last_cleanup": ""}
def save_data(data: dict[str, Any]) -> None:
"""保存用户数据到 JSON 文件。"""
def save_data(data: dict[str, Any]) -> None: with open(DATA_FILE, "w", encoding="utf-8") as f:
"""保存用户数据到 JSON 文件。""" json.dump(data, f, ensure_ascii=False, indent=2)
with open(DATA_FILE, "w", encoding="utf-8") as f:
json.dump(data, f, ensure_ascii=False, indent=2)
def today_str() -> str:
return date.today().isoformat()
def today_str() -> str:
return date.today().isoformat()
def get_or_create_user(data: dict[str, Any], user_id: str, nickname: str) -> dict[str, Any]:
"""获取或创建用户数据。"""
def get_or_create_user(data: dict[str, Any], user_id: str, nickname: str) -> dict[str, Any]: users = data["users"]
"""获取或创建用户数据。""" if user_id not in users:
users = data["users"] users[user_id] = {
if user_id not in users: "nickname": nickname,
users[user_id] = { "total_points": 0,
"nickname": nickname, "consecutive_days": 0,
"total_points": 0, "last_checkin": "",
"consecutive_days": 0, "history": [], # 签到日期列表 ["2026-05-01", ...]
"last_checkin": "", "points_history": [], # [(日期, 获得积分), ...]
"history": [], # 签到日期列表 ["2026-05-01", ...] }
"points_history": [], # [(日期, 获得积分), ...] else:
} # 更新昵称
else: if nickname:
# 更新昵称 users[user_id]["nickname"] = nickname
if nickname: return users[user_id]
users[user_id]["nickname"] = nickname
return users[user_id]
def do_checkin(user_id: str, nickname: str = "") -> str:
"""执行签到,返回回复文本。"""
def do_checkin(user_id: str, nickname: str = "") -> str: data = load_data()
"""执行签到,返回回复文本。""" user = get_or_create_user(data, user_id, nickname)
data = load_data() today = today_str()
user = get_or_create_user(data, user_id, nickname)
today = today_str() # ---------- 已签到 ----------
if today in user["history"]:
# ---------- 已签到 ---------- return (
if today in user["history"]: f"⚠️ {user['nickname']},你今天已经签过到啦!\n"
return ( f"📅 总积分:{user['total_points']}\n"
f"⚠️ {user['nickname']},你今天已经签过到啦!\n" f"🔥 连续签到:{user['consecutive_days']}"
f"📅 总积分:{user['total_points']}\n" )
f"🔥 连续签到:{user['consecutive_days']}"
) # ---------- 计算连续签到 ----------
yesterday = date.fromisoformat(today)
# ---------- 计算连续签到 ---------- yesterday_str = yesterday.isoformat()
yesterday = date.fromisoformat(today)
yesterday_str = yesterday.isoformat() if user["last_checkin"] == yesterday_str:
user["consecutive_days"] += 1
if user["last_checkin"] == yesterday_str: else:
user["consecutive_days"] += 1 user["consecutive_days"] = 1
else:
user["consecutive_days"] = 1 # ---------- 计算积分 ----------
earned = BASE_POINTS
# ---------- 计算积分 ---------- bonus_reason = []
earned = BASE_POINTS
bonus_reason = [] # 连续签到额外奖励
for threshold, bonus in sorted(CONSECUTIVE_BONUS.items()):
# 连续签到额外奖励 if user["consecutive_days"] == threshold:
for threshold, bonus in sorted(CONSECUTIVE_BONUS.items()): earned += bonus
if user["consecutive_days"] == threshold: bonus_reason.append(f"🎉 连续签到 {threshold} 天奖励 +{bonus}")
earned += bonus break # 只触发最近的里程碑
bonus_reason.append(f"🎉 连续签到 {threshold} 天奖励 +{bonus}")
break # 只触发最近的里程碑 # 如果连续天数超过最大里程碑但又不是正好等于,奖励固定值
milestones = sorted(CONSECUTIVE_BONUS.keys())
# 如果连续天数超过最大里程碑但又不是正好等于,奖励固定值 if user["consecutive_days"] > milestones[-1]:
milestones = sorted(CONSECUTIVE_BONUS.keys()) # 超过最大里程碑的,每多一天额外+1
if user["consecutive_days"] > milestones[-1]: extra_days = user["consecutive_days"] - milestones[-1]
# 超过最大里程碑的,每多一天额外+1 earned += extra_days
extra_days = user["consecutive_days"] - milestones[-1] bonus_reason.append(f"🔥 长期签到奖励 +{extra_days}")
earned += extra_days
bonus_reason.append(f"🔥 长期签到奖励 +{extra_days}") user["total_points"] += earned
user["last_checkin"] = today
user["total_points"] += earned user["history"].append(today)
user["last_checkin"] = today user["points_history"].append(f"{today}|{earned}")
user["history"].append(today)
user["points_history"].append(f"{today}|{earned}") # ---------- 清理超期历史 ----------
cleanup_old_history(data)
# ---------- 清理超期历史 ----------
cleanup_old_history(data) save_data(data)
save_data(data) # ---------- 构建回复 ----------
lines = [
# ---------- 构建回复 ---------- f"✅ 签到成功!{user['nickname']}",
lines = [ f"📅 获得 {earned} 积分(基础 {BASE_POINTS} 分)",
f"✅ 签到成功!{user['nickname']}", ]
f"📅 获得 {earned} 积分(基础 {BASE_POINTS} 分)", lines.extend(bonus_reason)
] lines.append(f"🔥 已连续签到 {user['consecutive_days']}")
lines.extend(bonus_reason) lines.append(f"💰 总积分:{user['total_points']}")
lines.append(f"🔥 已连续签到 {user['consecutive_days']}") return "\n".join(lines)
lines.append(f"💰 总积分:{user['total_points']}")
return "\n".join(lines)
def query_user(user_id: str, nickname: str = "") -> str:
"""查询用户签到状态。"""
def query_user(user_id: str, nickname: str = "") -> str: data = load_data()
"""查询用户签到状态。""" if user_id not in data["users"]:
data = load_data() return f"{nickname or user_id} 还没有签到记录,发送「签到」开始吧!"
if user_id not in data["users"]: user = data["users"][user_id]
return f"{nickname or user_id} 还没有签到记录,发送「签到」开始吧!" if nickname:
user = data["users"][user_id] user["nickname"] = nickname
if nickname: today = today_str()
user["nickname"] = nickname checked = "✅ 已签到" if today in user["history"] else "❌ 未签到"
today = today_str() return (
checked = "✅ 已签到" if today in user["history"] else "❌ 未签到" f"📊 {user['nickname']} 的签到信息\n"
return ( f"{'' * 20}\n"
f"📊 {user['nickname']} 的签到信息\n" f"📅 今日状态:{checked}\n"
f"{'' * 20}\n" f"💰 总积分:{user['total_points']}\n"
f"📅 今日状态:{checked}\n" f"🔥 连续签到:{user['consecutive_days']}\n"
f"💰 总积分:{user['total_points']}\n" f"📆 累计签到:{len(user['history'])}"
f"🔥 连续签到:{user['consecutive_days']}\n" )
f"📆 累计签到:{len(user['history'])}"
)
def leaderboard(top_n: int = 10) -> str:
"""返回积分排行榜。"""
def leaderboard(top_n: int = 10) -> str: data = load_data()
"""返回积分排行榜。""" sorted_users = sorted(
data = load_data() data["users"].values(),
sorted_users = sorted( key=lambda u: u["total_points"],
data["users"].values(), reverse=True,
key=lambda u: u["total_points"], )
reverse=True, if not sorted_users:
) return "📭 还没有人签到,快发送「签到」成为第一名吧!"
if not sorted_users:
return "📭 还没有人签到,快发送「签到」成为第一名吧!" lines = ["🏆 签到排行榜 🏆", "" * 20]
for rank, user in enumerate(sorted_users[:top_n], 1):
lines = ["🏆 签到排行榜 🏆", "" * 20] medal = {1: "🥇", 2: "🥈", 3: "🥉"}.get(rank, f"{rank}.")
for rank, user in enumerate(sorted_users[:top_n], 1): lines.append(f"{medal} {user['nickname']}{user['total_points']} 分(连续{user['consecutive_days']}天)")
medal = {1: "🥇", 2: "🥈", 3: "🥉"}.get(rank, f"{rank}.") return "\n".join(lines)
lines.append(f"{medal} {user['nickname']}{user['total_points']} 分(连续{user['consecutive_days']}天)")
return "\n".join(lines)
def cleanup_old_history(data: dict[str, Any]) -> None:
"""清理超过 MAX_HISTORY_DAYS 的历史记录。"""
def cleanup_old_history(data: dict[str, Any]) -> None: today = date.today()
"""清理超过 MAX_HISTORY_DAYS 的历史记录。""" cutoff = (today - timedelta(days=MAX_HISTORY_DAYS)).isoformat()
today = date.today() for uid, user in data["users"].items():
threshold = today.isoformat() user["history"] = [d for d in user["history"] if d >= cutoff]
for uid, user in data["users"].items(): user["points_history"] = [p for p in user["points_history"] if p.split("|")[0] >= cutoff]
cutoff = (today - __import__("datetime").timedelta(days=MAX_HISTORY_DAYS)).isoformat()
user["history"] = [d for d in user["history"] if d >= cutoff]
user["points_history"] = [p for p in user["points_history"] if p.split("|")[0] >= cutoff] def reset_user(user_id: str) -> str:
"""重置用户数据(管理员功能)。"""
data = load_data()
def reset_user(user_id: str) -> str: if user_id in data["users"]:
"""重置用户数据(管理员功能)。""" del data["users"][user_id]
data = load_data() save_data(data)
if user_id in data["users"]: return f"✅ 已重置用户 {user_id} 的签到数据"
del data["users"][user_id] return f"❌ 用户 {user_id} 不存在"
save_data(data)
return f"✅ 已重置用户 {user_id} 的签到数据"
return f"❌ 用户 {user_id} 不存在" def main() -> int:
"""主入口,根据命令行参数执行不同操作。
def main() -> int: 用法:
"""主入口,根据命令行参数执行不同操作。 python qiandao.py checkin <user_id> [nickname]
python qiandao.py query <user_id> [nickname]
用法: python qiandao.py rank [top_n]
python qiandao.py checkin <user_id> [nickname] python qiandao.py reset <user_id>
python qiandao.py query <user_id> [nickname] """
python qiandao.py rank [top_n] args = sys.argv[1:]
python qiandao.py reset <user_id> if not args:
""" print("用法: python qiandao.py <命令> [参数...]")
args = sys.argv[1:] print("命令: checkin, query, rank, reset")
if not args: return 1
print("用法: python qiandao.py <命令> [参数...]")
print("命令: checkin, query, rank, reset") cmd = args[0]
return 1
if cmd == "checkin":
cmd = args[0] if len(args) < 2:
print("请提供 user_id")
if cmd == "checkin": return 1
if len(args) < 2: uid = args[1]
print("请提供 user_id") nickname = args[2] if len(args) > 2 else uid
return 1 print(do_checkin(uid, nickname))
uid = args[1]
nickname = args[2] if len(args) > 2 else uid elif cmd == "query":
print(do_checkin(uid, nickname)) if len(args) < 2:
print("请提供 user_id")
elif cmd == "query": return 1
if len(args) < 2: uid = args[1]
print("请提供 user_id") nickname = args[2] if len(args) > 2 else uid
return 1 print(query_user(uid, nickname))
uid = args[1]
nickname = args[2] if len(args) > 2 else uid elif cmd == "rank":
print(query_user(uid, nickname)) top_n = int(args[1]) if len(args) > 1 else 10
print(leaderboard(top_n))
elif cmd == "rank":
top_n = int(args[1]) if len(args) > 1 else 10 elif cmd == "reset":
print(leaderboard(top_n)) if len(args) < 2:
print("请提供 user_id")
elif cmd == "reset": return 1
if len(args) < 2: print(reset_user(args[1]))
print("请提供 user_id")
return 1 else:
print(reset_user(args[1])) print(f"未知命令: {cmd}")
return 1
else:
print(f"未知命令: {cmd}") return 0
return 1
return 0 if __name__ == "__main__":
try:
raise SystemExit(main())
if __name__ == "__main__": except SystemExit:
try: raise
raise SystemExit(main()) except Exception:
except SystemExit: traceback.print_exc(file=sys.stdout)
raise raise SystemExit(1)
except Exception:
traceback.print_exc(file=sys.stdout)
raise SystemExit(1)