250 lines
8.1 KiB
Python
250 lines
8.1 KiB
Python
#!/usr/bin/env python3
|
|
"""签到游戏插件 - 支持每日签到、连续签到奖励、排行榜"""
|
|
from __future__ import annotations
|
|
|
|
import json
|
|
import os
|
|
import sys
|
|
import time
|
|
import traceback
|
|
from datetime import date, datetime
|
|
from typing import Any
|
|
|
|
sys.stderr = sys.stdout
|
|
|
|
# ---------- 数据存储 ----------
|
|
DATA_DIR = os.path.join(os.path.dirname(os.path.abspath(__file__)), "..", "data")
|
|
DATA_FILE = os.path.join(DATA_DIR, "qiandao.json")
|
|
os.makedirs(DATA_DIR, exist_ok=True)
|
|
|
|
# ---------- 签到奖励配置 ----------
|
|
BASE_POINTS = 10 # 每日基础签到积分
|
|
CONSECUTIVE_BONUS = { # 连续签到额外奖励
|
|
3: 5, # 连续3天额外+5
|
|
7: 15, # 连续7天额外+15
|
|
15: 30, # 连续15天额外+30
|
|
30: 80, # 连续30天额外+80
|
|
}
|
|
MAX_HISTORY_DAYS = 365 # 保留签到历史的天数
|
|
|
|
|
|
def load_data() -> dict[str, Any]:
|
|
"""从 JSON 文件加载用户数据。"""
|
|
if os.path.exists(DATA_FILE):
|
|
try:
|
|
with open(DATA_FILE, encoding="utf-8") as f:
|
|
return json.load(f)
|
|
except (json.JSONDecodeError, OSError):
|
|
pass
|
|
return {"users": {}, "last_cleanup": ""}
|
|
|
|
|
|
def save_data(data: dict[str, Any]) -> None:
|
|
"""保存用户数据到 JSON 文件。"""
|
|
with open(DATA_FILE, "w", encoding="utf-8") as f:
|
|
json.dump(data, f, ensure_ascii=False, indent=2)
|
|
|
|
|
|
def today_str() -> str:
|
|
return date.today().isoformat()
|
|
|
|
|
|
def get_or_create_user(data: dict[str, Any], user_id: str, nickname: str) -> dict[str, Any]:
|
|
"""获取或创建用户数据。"""
|
|
users = data["users"]
|
|
if user_id not in users:
|
|
users[user_id] = {
|
|
"nickname": nickname,
|
|
"total_points": 0,
|
|
"consecutive_days": 0,
|
|
"last_checkin": "",
|
|
"history": [], # 签到日期列表 ["2026-05-01", ...]
|
|
"points_history": [], # [(日期, 获得积分), ...]
|
|
}
|
|
else:
|
|
# 更新昵称
|
|
if nickname:
|
|
users[user_id]["nickname"] = nickname
|
|
return users[user_id]
|
|
|
|
|
|
def do_checkin(user_id: str, nickname: str = "") -> str:
|
|
"""执行签到,返回回复文本。"""
|
|
data = load_data()
|
|
user = get_or_create_user(data, user_id, nickname)
|
|
today = today_str()
|
|
|
|
# ---------- 已签到 ----------
|
|
if today in user["history"]:
|
|
return (
|
|
f"⚠️ {user['nickname']},你今天已经签过到啦!\n"
|
|
f"📅 总积分:{user['total_points']} 分\n"
|
|
f"🔥 连续签到:{user['consecutive_days']} 天"
|
|
)
|
|
|
|
# ---------- 计算连续签到 ----------
|
|
yesterday = date.fromisoformat(today)
|
|
yesterday_str = yesterday.isoformat()
|
|
|
|
if user["last_checkin"] == yesterday_str:
|
|
user["consecutive_days"] += 1
|
|
else:
|
|
user["consecutive_days"] = 1
|
|
|
|
# ---------- 计算积分 ----------
|
|
earned = BASE_POINTS
|
|
bonus_reason = []
|
|
|
|
# 连续签到额外奖励
|
|
for threshold, bonus in sorted(CONSECUTIVE_BONUS.items()):
|
|
if user["consecutive_days"] == threshold:
|
|
earned += bonus
|
|
bonus_reason.append(f"🎉 连续签到 {threshold} 天奖励 +{bonus} 分")
|
|
break # 只触发最近的里程碑
|
|
|
|
# 如果连续天数超过最大里程碑但又不是正好等于,奖励固定值
|
|
milestones = sorted(CONSECUTIVE_BONUS.keys())
|
|
if user["consecutive_days"] > milestones[-1]:
|
|
# 超过最大里程碑的,每多一天额外+1
|
|
extra_days = user["consecutive_days"] - milestones[-1]
|
|
earned += extra_days
|
|
bonus_reason.append(f"🔥 长期签到奖励 +{extra_days} 分")
|
|
|
|
user["total_points"] += earned
|
|
user["last_checkin"] = today
|
|
user["history"].append(today)
|
|
user["points_history"].append(f"{today}|{earned}")
|
|
|
|
# ---------- 清理超期历史 ----------
|
|
cleanup_old_history(data)
|
|
|
|
save_data(data)
|
|
|
|
# ---------- 构建回复 ----------
|
|
lines = [
|
|
f"✅ 签到成功!{user['nickname']}",
|
|
f"📅 获得 {earned} 积分(基础 {BASE_POINTS} 分)",
|
|
]
|
|
lines.extend(bonus_reason)
|
|
lines.append(f"🔥 已连续签到 {user['consecutive_days']} 天")
|
|
lines.append(f"💰 总积分:{user['total_points']}")
|
|
return "\n".join(lines)
|
|
|
|
|
|
def query_user(user_id: str, nickname: str = "") -> str:
|
|
"""查询用户签到状态。"""
|
|
data = load_data()
|
|
if user_id not in data["users"]:
|
|
return f"❌ {nickname or user_id} 还没有签到记录,发送「签到」开始吧!"
|
|
user = data["users"][user_id]
|
|
if nickname:
|
|
user["nickname"] = nickname
|
|
today = today_str()
|
|
checked = "✅ 已签到" if today in user["history"] else "❌ 未签到"
|
|
return (
|
|
f"📊 {user['nickname']} 的签到信息\n"
|
|
f"{'━' * 20}\n"
|
|
f"📅 今日状态:{checked}\n"
|
|
f"💰 总积分:{user['total_points']} 分\n"
|
|
f"🔥 连续签到:{user['consecutive_days']} 天\n"
|
|
f"📆 累计签到:{len(user['history'])} 天"
|
|
)
|
|
|
|
|
|
def leaderboard(top_n: int = 10) -> str:
|
|
"""返回积分排行榜。"""
|
|
data = load_data()
|
|
sorted_users = sorted(
|
|
data["users"].values(),
|
|
key=lambda u: u["total_points"],
|
|
reverse=True,
|
|
)
|
|
if not sorted_users:
|
|
return "📭 还没有人签到,快发送「签到」成为第一名吧!"
|
|
|
|
lines = ["🏆 签到排行榜 🏆", "━" * 20]
|
|
for rank, user in enumerate(sorted_users[:top_n], 1):
|
|
medal = {1: "🥇", 2: "🥈", 3: "🥉"}.get(rank, f"{rank}.")
|
|
lines.append(f"{medal} {user['nickname']} — {user['total_points']} 分(连续{user['consecutive_days']}天)")
|
|
return "\n".join(lines)
|
|
|
|
|
|
def cleanup_old_history(data: dict[str, Any]) -> None:
|
|
"""清理超过 MAX_HISTORY_DAYS 的历史记录。"""
|
|
today = date.today()
|
|
threshold = today.isoformat()
|
|
for uid, user in data["users"].items():
|
|
cutoff = (today - __import__("datetime").timedelta(days=MAX_HISTORY_DAYS)).isoformat()
|
|
user["history"] = [d for d in user["history"] if d >= cutoff]
|
|
user["points_history"] = [p for p in user["points_history"] if p.split("|")[0] >= cutoff]
|
|
|
|
|
|
def reset_user(user_id: str) -> str:
|
|
"""重置用户数据(管理员功能)。"""
|
|
data = load_data()
|
|
if user_id in data["users"]:
|
|
del data["users"][user_id]
|
|
save_data(data)
|
|
return f"✅ 已重置用户 {user_id} 的签到数据"
|
|
return f"❌ 用户 {user_id} 不存在"
|
|
|
|
|
|
def main() -> int:
|
|
"""主入口,根据命令行参数执行不同操作。
|
|
|
|
用法:
|
|
python qiandao.py checkin <user_id> [nickname]
|
|
python qiandao.py query <user_id> [nickname]
|
|
python qiandao.py rank [top_n]
|
|
python qiandao.py reset <user_id>
|
|
"""
|
|
args = sys.argv[1:]
|
|
if not args:
|
|
print("用法: python qiandao.py <命令> [参数...]")
|
|
print("命令: checkin, query, rank, reset")
|
|
return 1
|
|
|
|
cmd = args[0]
|
|
|
|
if cmd == "checkin":
|
|
if len(args) < 2:
|
|
print("请提供 user_id")
|
|
return 1
|
|
uid = args[1]
|
|
nickname = args[2] if len(args) > 2 else uid
|
|
print(do_checkin(uid, nickname))
|
|
|
|
elif cmd == "query":
|
|
if len(args) < 2:
|
|
print("请提供 user_id")
|
|
return 1
|
|
uid = args[1]
|
|
nickname = args[2] if len(args) > 2 else uid
|
|
print(query_user(uid, nickname))
|
|
|
|
elif cmd == "rank":
|
|
top_n = int(args[1]) if len(args) > 1 else 10
|
|
print(leaderboard(top_n))
|
|
|
|
elif cmd == "reset":
|
|
if len(args) < 2:
|
|
print("请提供 user_id")
|
|
return 1
|
|
print(reset_user(args[1]))
|
|
|
|
else:
|
|
print(f"未知命令: {cmd}")
|
|
return 1
|
|
|
|
return 0
|
|
|
|
|
|
if __name__ == "__main__":
|
|
try:
|
|
raise SystemExit(main())
|
|
except SystemExit:
|
|
raise
|
|
except Exception:
|
|
traceback.print_exc(file=sys.stdout)
|
|
raise SystemExit(1)
|