|
|
@@ -0,0 +1,1095 @@
|
|
|
+#!/usr/bin/env python3
|
|
|
+# -*- coding: utf-8 -*-
|
|
|
+
|
|
|
+"""
|
|
|
+Module Name: disbot-a4
|
|
|
+======================
|
|
|
+
|
|
|
+Description:
|
|
|
+ disbot-a4 — py-cord (Python 3.12) single-file Discord bot with:
|
|
|
+
|
|
|
+ - Public "status panel" showing pickable roles, member counts, and
|
|
|
+ (inactive)/(full) labels
|
|
|
+ - Personal role picking via /roles (DM personalized panel with toggle
|
|
|
+ buttons)
|
|
|
+ - User commands: /role-list, /role-pick, /role-unpick, /role-toggle,
|
|
|
+ /role-help
|
|
|
+ - Admin commands (all hyphen names): post status panel, add/remove
|
|
|
+ pickables, set limits, set inactivity days, set refresh minutes,
|
|
|
+ list users, list config, add/remove user to/from role
|
|
|
+ - Daily inactivity cleanup removing pickable roles from inactive users
|
|
|
+ - Auto-refresh of all public panels and per-user DM panels at a configurable interval
|
|
|
+ - SQLite persistence (settings, pickable roles, activity, panel tracking)
|
|
|
+
|
|
|
+
|
|
|
+Author:
|
|
|
+ Lari Natri
|
|
|
+
|
|
|
+License:
|
|
|
+ MIT License
|
|
|
+
|
|
|
+Version:
|
|
|
+ 0.0.1
|
|
|
+
|
|
|
+Notes:
|
|
|
+ Depends on py-cord
|
|
|
+"""
|
|
|
+
|
|
|
+import asyncio
|
|
|
+import logging
|
|
|
+import os
|
|
|
+import sqlite3
|
|
|
+import time
|
|
|
+from typing import Optional, List, Dict, Tuple
|
|
|
+
|
|
|
+import discord
|
|
|
+from discord.ext import tasks
|
|
|
+
|
|
|
+# ---------- Config & Logging ----------
|
|
|
+
|
|
|
+LOG_LEVEL = os.getenv("LOG_LEVEL", "INFO").upper()
|
|
|
+logging.basicConfig(level=LOG_LEVEL, format="%(asctime)s [%(levelname)s] %(message)s")
|
|
|
+log = logging.getLogger("disbot-a4")
|
|
|
+
|
|
|
+TOKEN_PATH = "/run/secrets/discord_token"
|
|
|
+with open(TOKEN_PATH, "r", encoding="utf-8") as f:
|
|
|
+ DISCORD_TOKEN = f.read().strip()
|
|
|
+
|
|
|
+
|
|
|
+def getenv_int(name: str) -> Optional[int]:
|
|
|
+ """Parse optional int env var; return None if missing/empty/invalid."""
|
|
|
+ v = os.getenv(name)
|
|
|
+ if v is None or v.strip() == "":
|
|
|
+ return None
|
|
|
+ try:
|
|
|
+ return int(v)
|
|
|
+ except ValueError:
|
|
|
+ log.warning("Invalid int for %s=%r; ignoring.", name, v)
|
|
|
+ return None
|
|
|
+
|
|
|
+
|
|
|
+GUILD_ID = getenv_int("GUILD_ID")
|
|
|
+ROLE_PANEL_CHANNEL_ID = getenv_int("ROLE_PANEL_CHANNEL_ID")
|
|
|
+DATABASE_PATH = os.getenv("DATABASE_PATH", "/app/data/bot.db")
|
|
|
+DEFAULT_INACTIVITY_DAYS = int(os.getenv("INACTIVITY_DAYS_DEFAULT", "14"))
|
|
|
+DEFAULT_REFRESH_MINUTES = int(os.getenv("REFRESH_MINUTES_DEFAULT", "5")) # admin can override
|
|
|
+
|
|
|
+# ------------- Database Helpers -------------
|
|
|
+
|
|
|
+SCHEMA = """
|
|
|
+PRAGMA journal_mode=WAL;
|
|
|
+CREATE TABLE IF NOT EXISTS settings (
|
|
|
+ guild_id INTEGER PRIMARY KEY,
|
|
|
+ inactivity_days INTEGER NOT NULL,
|
|
|
+ refresh_minutes INTEGER NOT NULL
|
|
|
+);
|
|
|
+CREATE TABLE IF NOT EXISTS roles (
|
|
|
+ guild_id INTEGER NOT NULL,
|
|
|
+ role_id INTEGER PRIMARY KEY,
|
|
|
+ min_users INTEGER,
|
|
|
+ max_users INTEGER
|
|
|
+);
|
|
|
+CREATE TABLE IF NOT EXISTS activity (
|
|
|
+ guild_id INTEGER NOT NULL,
|
|
|
+ user_id INTEGER NOT NULL,
|
|
|
+ last_active_ts INTEGER NOT NULL,
|
|
|
+ PRIMARY KEY (guild_id, user_id)
|
|
|
+);
|
|
|
+CREATE TABLE IF NOT EXISTS panels (
|
|
|
+ guild_id INTEGER NOT NULL,
|
|
|
+ channel_id INTEGER NOT NULL,
|
|
|
+ message_id INTEGER NOT NULL,
|
|
|
+ PRIMARY KEY (guild_id, message_id)
|
|
|
+);
|
|
|
+CREATE TABLE IF NOT EXISTS dm_panels (
|
|
|
+ guild_id INTEGER NOT NULL,
|
|
|
+ user_id INTEGER NOT NULL,
|
|
|
+ channel_id INTEGER NOT NULL,
|
|
|
+ message_id INTEGER NOT NULL,
|
|
|
+ PRIMARY KEY (guild_id, user_id)
|
|
|
+);
|
|
|
+"""
|
|
|
+
|
|
|
+def db_connect():
|
|
|
+ os.makedirs(os.path.dirname(DATABASE_PATH), exist_ok=True)
|
|
|
+ conn = sqlite3.connect(DATABASE_PATH, isolation_level=None) # autocommit
|
|
|
+ conn.row_factory = sqlite3.Row
|
|
|
+ return conn
|
|
|
+
|
|
|
+def _ensure_settings_columns(conn: sqlite3.Connection):
|
|
|
+ cols = {r["name"] for r in conn.execute("PRAGMA table_info(settings)")}
|
|
|
+ if "refresh_minutes" not in cols:
|
|
|
+ conn.execute("ALTER TABLE settings ADD COLUMN refresh_minutes INTEGER NOT NULL DEFAULT ?", (DEFAULT_REFRESH_MINUTES,))
|
|
|
+
|
|
|
+def db_init():
|
|
|
+ with db_connect() as conn:
|
|
|
+ conn.executescript(SCHEMA)
|
|
|
+ _ensure_settings_columns(conn)
|
|
|
+ if GUILD_ID:
|
|
|
+ row = conn.execute("SELECT inactivity_days, refresh_minutes FROM settings WHERE guild_id=?", (GUILD_ID,)).fetchone()
|
|
|
+ if not row:
|
|
|
+ conn.execute(
|
|
|
+ "INSERT INTO settings (guild_id, inactivity_days, refresh_minutes) VALUES (?,?,?)",
|
|
|
+ (GUILD_ID, DEFAULT_INACTIVITY_DAYS, DEFAULT_REFRESH_MINUTES),
|
|
|
+ )
|
|
|
+
|
|
|
+def get_inactivity_days(conn, guild_id: int) -> int:
|
|
|
+ row = conn.execute("SELECT inactivity_days FROM settings WHERE guild_id=?", (guild_id,)).fetchone()
|
|
|
+ return int(row["inactivity_days"]) if row else DEFAULT_INACTIVITY_DAYS
|
|
|
+
|
|
|
+def set_inactivity_days(conn, guild_id: int, days: int):
|
|
|
+ conn.execute("""
|
|
|
+ INSERT INTO settings (guild_id, inactivity_days, refresh_minutes) VALUES (?,?,COALESCE((SELECT refresh_minutes FROM settings WHERE guild_id=?), ?))
|
|
|
+ ON CONFLICT(guild_id) DO UPDATE SET inactivity_days=excluded.inactivity_days
|
|
|
+ """, (guild_id, days, guild_id, DEFAULT_REFRESH_MINUTES))
|
|
|
+
|
|
|
+def get_refresh_minutes(conn, guild_id: int) -> int:
|
|
|
+ row = conn.execute("SELECT refresh_minutes FROM settings WHERE guild_id=?", (guild_id,)).fetchone()
|
|
|
+ return int(row["refresh_minutes"]) if row and row["refresh_minutes"] is not None else DEFAULT_REFRESH_MINUTES
|
|
|
+
|
|
|
+def set_refresh_minutes(conn, guild_id: int, minutes: int):
|
|
|
+ conn.execute("""
|
|
|
+ INSERT INTO settings (guild_id, inactivity_days, refresh_minutes) VALUES (?, COALESCE((SELECT inactivity_days FROM settings WHERE guild_id=?), ?), ?)
|
|
|
+ ON CONFLICT(guild_id) DO UPDATE SET refresh_minutes=excluded.refresh_minutes
|
|
|
+ """, (guild_id, guild_id, DEFAULT_INACTIVITY_DAYS, minutes))
|
|
|
+
|
|
|
+def set_activity(conn, guild_id: int, user_id: int, ts: Optional[int] = None):
|
|
|
+ ts = ts or int(time.time())
|
|
|
+ conn.execute("""
|
|
|
+ INSERT INTO activity (guild_id, user_id, last_active_ts) VALUES (?,?,?)
|
|
|
+ ON CONFLICT(guild_id, user_id) DO UPDATE SET last_active_ts=excluded.last_active_ts
|
|
|
+ """, (guild_id, user_id, ts))
|
|
|
+
|
|
|
+def get_last_active(conn, guild_id: int, user_id: int) -> Optional[int]:
|
|
|
+ row = conn.execute("SELECT last_active_ts FROM activity WHERE guild_id=? AND user_id=?", (guild_id, user_id)).fetchone()
|
|
|
+ return int(row["last_active_ts"]) if row else None
|
|
|
+
|
|
|
+def add_selectable_role(conn, guild_id: int, role_id: int, min_users: Optional[int], max_users: Optional[int]):
|
|
|
+ conn.execute("""
|
|
|
+ INSERT INTO roles (guild_id, role_id, min_users, max_users) VALUES (?,?,?,?)
|
|
|
+ ON CONFLICT(role_id) DO UPDATE SET min_users=excluded.min_users, max_users=excluded.max_users
|
|
|
+ """, (guild_id, role_id, min_users, max_users))
|
|
|
+
|
|
|
+def remove_selectable_role(conn, guild_id: int, role_id: int) -> bool:
|
|
|
+ cur = conn.execute("DELETE FROM roles WHERE guild_id=? AND role_id=?", (guild_id, role_id))
|
|
|
+ return cur.rowcount > 0
|
|
|
+
|
|
|
+def list_selectable_roles(conn, guild_id: int) -> List[sqlite3.Row]:
|
|
|
+ return list(conn.execute("SELECT role_id, min_users, max_users FROM roles WHERE guild_id=? ORDER BY role_id", (guild_id,)))
|
|
|
+
|
|
|
+def set_role_limits(conn, guild_id: int, role_id: int, min_users: Optional[int], max_users: Optional[int]):
|
|
|
+ conn.execute("""
|
|
|
+ UPDATE roles SET min_users=?, max_users=? WHERE guild_id=? AND role_id=?
|
|
|
+ """, (min_users, max_users, guild_id, role_id))
|
|
|
+
|
|
|
+def add_panel_record(conn, guild_id: int, channel_id: int, message_id: int):
|
|
|
+ conn.execute("""
|
|
|
+ INSERT OR IGNORE INTO panels (guild_id, channel_id, message_id) VALUES (?,?,?)
|
|
|
+ """, (guild_id, channel_id, message_id))
|
|
|
+
|
|
|
+def list_panel_records(conn, guild_id: int) -> List[sqlite3.Row]:
|
|
|
+ return list(conn.execute("SELECT channel_id, message_id FROM panels WHERE guild_id=?", (guild_id,)))
|
|
|
+
|
|
|
+def remove_panel_record(conn, guild_id: int, message_id: int):
|
|
|
+ conn.execute("DELETE FROM panels WHERE guild_id=? AND message_id=?", (guild_id, message_id))
|
|
|
+
|
|
|
+def upsert_dm_panel(conn, guild_id: int, user_id: int, channel_id: int, message_id: int):
|
|
|
+ conn.execute("""
|
|
|
+ INSERT INTO dm_panels (guild_id, user_id, channel_id, message_id) VALUES (?,?,?,?)
|
|
|
+ ON CONFLICT(guild_id, user_id) DO UPDATE SET channel_id=excluded.channel_id, message_id=excluded.message_id
|
|
|
+ """, (guild_id, user_id, channel_id, message_id))
|
|
|
+
|
|
|
+def list_dm_panels(conn, guild_id: int) -> List[sqlite3.Row]:
|
|
|
+ return list(conn.execute("SELECT user_id, channel_id, message_id FROM dm_panels WHERE guild_id=?", (guild_id,)))
|
|
|
+
|
|
|
+def remove_dm_panel(conn, guild_id: int, user_id: int):
|
|
|
+ conn.execute("DELETE FROM dm_panels WHERE guild_id=? AND user_id=?", (guild_id, user_id))
|
|
|
+
|
|
|
+# ------------- Discord Bot Setup (py-cord) -------------
|
|
|
+
|
|
|
+intents = discord.Intents.default()
|
|
|
+intents.guilds = True
|
|
|
+intents.members = True # enable in Developer Portal (Server Members Intent)
|
|
|
+intents.messages = True
|
|
|
+intents.message_content = True # optional; used for activity timestamps
|
|
|
+
|
|
|
+bot = discord.Bot(intents=intents)
|
|
|
+
|
|
|
+# ------------- Role helpers & UI building -------------
|
|
|
+
|
|
|
+MAX_COMPONENTS_PER_VIEW = 25
|
|
|
+
|
|
|
+def role_label_with_status(role: discord.Role, count: int, min_users: Optional[int], max_users: Optional[int]) -> Tuple[str, Optional[str]]:
|
|
|
+ """Return (base label, status_suffix)."""
|
|
|
+ label = role.name
|
|
|
+ status = None
|
|
|
+ if min_users is not None and count < min_users:
|
|
|
+ status = "inactive"
|
|
|
+ if max_users is not None and count >= max_users and status is None:
|
|
|
+ status = "full"
|
|
|
+ return label, status
|
|
|
+
|
|
|
+def make_status_lines(guild: discord.Guild, selectable_rows: List[sqlite3.Row]) -> List[str]:
|
|
|
+ lines: List[str] = []
|
|
|
+ for row in selectable_rows[:1000]: # guard
|
|
|
+ role = guild.get_role(int(row["role_id"]))
|
|
|
+ if not role:
|
|
|
+ continue
|
|
|
+ count = sum(1 for m in guild.members if role in m.roles)
|
|
|
+ base, status = role_label_with_status(role, count, row["min_users"], row["max_users"])
|
|
|
+ status_str = f" ({status})" if status else ""
|
|
|
+ lines.append(f"- {role.mention} — {count} member(s){status_str}")
|
|
|
+ return lines or ["(No pickable roles configured.)"]
|
|
|
+
|
|
|
+def dm_header_for_guild(guild: discord.Guild) -> str:
|
|
|
+ return f"**Server:** {guild.name} (ID: {guild.id})"
|
|
|
+
|
|
|
+def is_admin(member: Optional[discord.Member], channel: Optional[discord.abc.GuildChannel]) -> bool:
|
|
|
+ if not member or not getattr(member, "guild", None):
|
|
|
+ return False
|
|
|
+ if member.id == member.guild.owner_id:
|
|
|
+ return True
|
|
|
+ try:
|
|
|
+ if channel and hasattr(channel, "permissions_for"):
|
|
|
+ perms = channel.permissions_for(member)
|
|
|
+ return bool(perms.administrator or perms.manage_guild or perms.manage_roles)
|
|
|
+ except Exception:
|
|
|
+ pass
|
|
|
+ try:
|
|
|
+ gp = member.guild_permissions
|
|
|
+ return bool(gp.administrator or gp.manage_guild or gp.manage_roles)
|
|
|
+ except Exception:
|
|
|
+ return False
|
|
|
+
|
|
|
+class RoleToggleButton(discord.ui.Button):
|
|
|
+ """Button used in personalized panels (DM) to toggle user's roles."""
|
|
|
+ def __init__(self, guild_id: int, role_id: int, label: str, has_role: bool):
|
|
|
+ # custom_id format embeds guild_id for DM interactions
|
|
|
+ super().__init__(
|
|
|
+ style=discord.ButtonStyle.primary if has_role else discord.ButtonStyle.secondary,
|
|
|
+ label=label,
|
|
|
+ custom_id=f"role:{guild_id}:{role_id}"
|
|
|
+ )
|
|
|
+
|
|
|
+ async def callback(self, interaction: discord.Interaction):
|
|
|
+ # Resolve guild & member from custom_id, because DM interactions lack interaction.guild
|
|
|
+ try:
|
|
|
+ _, gid_s, rid_s = self.custom_id.split(":")
|
|
|
+ gid = int(gid_s); rid = int(rid_s)
|
|
|
+ except Exception:
|
|
|
+ await interaction.response.send_message("Invalid button metadata.", ephemeral=True)
|
|
|
+ return
|
|
|
+
|
|
|
+ guild = interaction.guild or bot.get_guild(gid)
|
|
|
+ if guild is None:
|
|
|
+ await interaction.response.send_message("Cannot resolve the server context.", ephemeral=True)
|
|
|
+ return
|
|
|
+
|
|
|
+ member = interaction.user if isinstance(interaction.user, discord.Member) else guild.get_member(interaction.user.id)
|
|
|
+ if member is None:
|
|
|
+ try:
|
|
|
+ member = await guild.fetch_member(interaction.user.id)
|
|
|
+ except Exception:
|
|
|
+ await interaction.response.send_message("Could not resolve your member record.", ephemeral=True)
|
|
|
+ return
|
|
|
+
|
|
|
+ role = guild.get_role(rid)
|
|
|
+ if role is None:
|
|
|
+ await interaction.response.send_message("This role no longer exists.", ephemeral=True)
|
|
|
+ return
|
|
|
+
|
|
|
+ if not guild.me or role >= guild.me.top_role:
|
|
|
+ await interaction.response.send_message("I cannot manage that role due to role hierarchy/permissions.", ephemeral=True)
|
|
|
+ return
|
|
|
+
|
|
|
+ has_role = role in member.roles
|
|
|
+ try:
|
|
|
+ if has_role:
|
|
|
+ await member.remove_roles(role, reason="disbot-a4: user toggled off")
|
|
|
+ msg = f"Removed **{role.name}**."
|
|
|
+ else:
|
|
|
+ await member.add_roles(role, reason="disbot-a4: user toggled on")
|
|
|
+ msg = f"Added **{role.name}**."
|
|
|
+ except discord.Forbidden:
|
|
|
+ await interaction.response.send_message("I lack permissions to manage that role.", ephemeral=True)
|
|
|
+ return
|
|
|
+ except discord.HTTPException as e:
|
|
|
+ await interaction.response.send_message(f"Discord error: {e}", ephemeral=True)
|
|
|
+ return
|
|
|
+
|
|
|
+ with db_connect() as conn:
|
|
|
+ set_activity(conn, guild.id, member.id)
|
|
|
+
|
|
|
+ # Rebuild personalized view in the same DM (include server info in content)
|
|
|
+ v = await build_member_panel_view(guild, member)
|
|
|
+ content = f"{dm_header_for_guild(guild)}\nSelect or unselect your roles:"
|
|
|
+ try:
|
|
|
+ await interaction.response.edit_message(content=content, view=v)
|
|
|
+ except discord.InteractionResponded:
|
|
|
+ await interaction.edit_original_response(content=content, view=v)
|
|
|
+
|
|
|
+ # Trigger global refreshes (public panels + DM panels)
|
|
|
+ await refresh_all_public_panels(guild)
|
|
|
+ await refresh_all_dm_panels(guild)
|
|
|
+
|
|
|
+ await interaction.followup.send(msg, ephemeral=True)
|
|
|
+
|
|
|
+async def build_member_panel_view(guild: discord.Guild, member: discord.Member) -> discord.ui.View:
|
|
|
+ """Build personalized DM view with toggle buttons; button style indicates possession."""
|
|
|
+ with db_connect() as conn:
|
|
|
+ selectable = list_selectable_roles(conn, guild.id)
|
|
|
+
|
|
|
+ if not selectable:
|
|
|
+ view = discord.ui.View(timeout=None)
|
|
|
+ view.add_item(discord.ui.Button(label="No pickable roles configured.", disabled=True))
|
|
|
+ return view
|
|
|
+
|
|
|
+ # Ensure role cache
|
|
|
+ need_ids = [int(r["role_id"]) for r in selectable[:MAX_COMPONENTS_PER_VIEW]]
|
|
|
+ by_id = {r.id: r for r in getattr(guild, "roles", [])}
|
|
|
+ if any(rid not in by_id for rid in need_ids):
|
|
|
+ try:
|
|
|
+ roles = await guild.fetch_roles()
|
|
|
+ by_id = {r.id: r for r in roles}
|
|
|
+ except Exception:
|
|
|
+ pass
|
|
|
+
|
|
|
+ user_roles = {r.id for r in getattr(member, "roles", [])}
|
|
|
+ view = discord.ui.View(timeout=None)
|
|
|
+
|
|
|
+ for row in selectable[:MAX_COMPONENTS_PER_VIEW]:
|
|
|
+ role = by_id.get(int(row["role_id"]))
|
|
|
+ if role is None:
|
|
|
+ continue
|
|
|
+ count = sum(1 for m in guild.members if role in m.roles)
|
|
|
+ base, status = role_label_with_status(role, count, row["min_users"], row["max_users"])
|
|
|
+ label = f"{base}" + (f" ({status})" if status else "")
|
|
|
+ has_role = role.id in user_roles
|
|
|
+ view.add_item(RoleToggleButton(guild.id, role.id, label[:80], has_role))
|
|
|
+
|
|
|
+ class Refresh(discord.ui.Button):
|
|
|
+ def __init__(self):
|
|
|
+ super().__init__(style=discord.ButtonStyle.primary, label="Refresh", custom_id="refresh_member")
|
|
|
+
|
|
|
+ async def callback(self, interaction: discord.Interaction):
|
|
|
+ g = guild
|
|
|
+ member2 = interaction.user if isinstance(interaction.user, discord.Member) else (g.get_member(interaction.user.id) or await g.fetch_member(interaction.user.id))
|
|
|
+ v = await build_member_panel_view(g, member2)
|
|
|
+ content = f"{dm_header_for_guild(g)}\nSelect or unselect your roles:"
|
|
|
+ try:
|
|
|
+ await interaction.response.edit_message(content=content, view=v)
|
|
|
+ except discord.InteractionResponded:
|
|
|
+ await interaction.edit_original_response(content=content, view=v)
|
|
|
+
|
|
|
+ view.add_item(Refresh())
|
|
|
+ return view
|
|
|
+
|
|
|
+def build_public_status_content(guild: discord.Guild) -> str:
|
|
|
+ with db_connect() as conn:
|
|
|
+ rows = list_selectable_roles(conn, guild.id)
|
|
|
+ if not rows:
|
|
|
+ return ("**Role status**\n"
|
|
|
+ "(No pickable roles configured.)\n"
|
|
|
+ "Use **/role-admin-add-pickable** to add roles.\n")
|
|
|
+ lines = make_status_lines(guild, rows) # includes (inactive)/(full) markers when applicable
|
|
|
+ return ("**Role status**\n" +
|
|
|
+ "\n".join(lines) +
|
|
|
+ "\n\nUse **/roles** to open your personal role picker (in DMs).")
|
|
|
+
|
|
|
+class PublicRefreshButton(discord.ui.Button):
|
|
|
+ def __init__(self):
|
|
|
+ super().__init__(style=discord.ButtonStyle.primary, label="Refresh", custom_id="refresh_public")
|
|
|
+
|
|
|
+ async def callback(self, interaction: discord.Interaction):
|
|
|
+ content = build_public_status_content(interaction.guild)
|
|
|
+ try:
|
|
|
+ await interaction.response.edit_message(content=content, view=make_public_status_view())
|
|
|
+ except discord.InteractionResponded:
|
|
|
+ await interaction.edit_original_response(content=content, view=make_public_status_view())
|
|
|
+
|
|
|
+def make_public_status_view() -> discord.ui.View:
|
|
|
+ v = discord.ui.View(timeout=None)
|
|
|
+ v.add_item(PublicRefreshButton())
|
|
|
+ return v
|
|
|
+
|
|
|
+# --- Panel posting & tracking ---
|
|
|
+
|
|
|
+async def post_public_status_panel(guild: discord.Guild, channel) -> discord.Message:
|
|
|
+ if not isinstance(channel, (discord.TextChannel, discord.Thread)):
|
|
|
+ raise RuntimeError("Role status panel can only be posted in a text channel or thread.")
|
|
|
+ me = guild.me
|
|
|
+ perms = channel.permissions_for(me) if me else None
|
|
|
+ if not perms or not perms.view_channel or not perms.send_messages:
|
|
|
+ missing = []
|
|
|
+ if not perms or not perms.view_channel: missing.append("View Channel")
|
|
|
+ if not perms or not perms.send_messages: missing.append("Send Messages")
|
|
|
+ raise RuntimeError(f"Missing permissions in #{channel.name}: {', '.join(missing)}")
|
|
|
+
|
|
|
+ content = build_public_status_content(guild)
|
|
|
+ msg = await channel.send(content, view=make_public_status_view())
|
|
|
+
|
|
|
+ with db_connect() as conn:
|
|
|
+ add_panel_record(conn, guild.id, channel.id, msg.id)
|
|
|
+
|
|
|
+ return msg
|
|
|
+
|
|
|
+async def post_personal_dm_panel(guild: discord.Guild, user: discord.User | discord.Member) -> Optional[discord.Message]:
|
|
|
+ try:
|
|
|
+ dm = await user.create_dm()
|
|
|
+ except discord.Forbidden:
|
|
|
+ return None
|
|
|
+ member = user if isinstance(user, discord.Member) else guild.get_member(user.id)
|
|
|
+ if not member:
|
|
|
+ try:
|
|
|
+ member = await guild.fetch_member(user.id)
|
|
|
+ except Exception:
|
|
|
+ return None
|
|
|
+
|
|
|
+ view = await build_member_panel_view(guild, member)
|
|
|
+ content = f"{dm_header_for_guild(guild)}\nSelect or unselect your roles:"
|
|
|
+ msg = await dm.send(content, view=view)
|
|
|
+
|
|
|
+ with db_connect() as conn:
|
|
|
+ upsert_dm_panel(conn, guild.id, member.id, dm.id, msg.id)
|
|
|
+
|
|
|
+ return msg
|
|
|
+
|
|
|
+_last_public_refresh_ts: Dict[int, int] = {}
|
|
|
+_last_dm_refresh_ts: Dict[int, int] = {}
|
|
|
+
|
|
|
+async def refresh_all_public_panels(guild: discord.Guild, force: bool = False):
|
|
|
+ """Edit each stored public panel message to reflect latest counts/labels."""
|
|
|
+ now = int(time.time())
|
|
|
+ last = _last_public_refresh_ts.get(guild.id, 0)
|
|
|
+ with db_connect() as conn:
|
|
|
+ refresh_minutes = get_refresh_minutes(conn, guild.id)
|
|
|
+ if not force and now - last < refresh_minutes * 60:
|
|
|
+ return
|
|
|
+ _last_public_refresh_ts[guild.id] = now
|
|
|
+
|
|
|
+ with db_connect() as conn:
|
|
|
+ records = list_panel_records(conn, guild.id)
|
|
|
+
|
|
|
+ if not records:
|
|
|
+ return
|
|
|
+
|
|
|
+ content = build_public_status_content(guild)
|
|
|
+ view = make_public_status_view()
|
|
|
+
|
|
|
+ for rec in records:
|
|
|
+ ch = guild.get_channel(int(rec["channel_id"]))
|
|
|
+ if not isinstance(ch, (discord.TextChannel, discord.Thread)):
|
|
|
+ with db_connect() as conn:
|
|
|
+ remove_panel_record(conn, guild.id, int(rec["message_id"]))
|
|
|
+ continue
|
|
|
+ try:
|
|
|
+ msg = await ch.fetch_message(int(rec["message_id"]))
|
|
|
+ except discord.NotFound:
|
|
|
+ with db_connect() as conn:
|
|
|
+ remove_panel_record(conn, guild.id, int(rec["message_id"]))
|
|
|
+ continue
|
|
|
+ except discord.Forbidden:
|
|
|
+ with db_connect() as conn:
|
|
|
+ remove_panel_record(conn, guild.id, int(rec["message_id"]))
|
|
|
+ continue
|
|
|
+ except discord.HTTPException:
|
|
|
+ continue
|
|
|
+
|
|
|
+ try:
|
|
|
+ await msg.edit(content=content, view=view)
|
|
|
+ except discord.HTTPException:
|
|
|
+ pass
|
|
|
+
|
|
|
+async def refresh_all_dm_panels(guild: discord.Guild, force: bool = False):
|
|
|
+ """Edit each stored DM panel message to reflect latest counts & their state (includes server info in content)."""
|
|
|
+ now = int(time.time())
|
|
|
+ last = _last_dm_refresh_ts.get(guild.id, 0)
|
|
|
+ with db_connect() as conn:
|
|
|
+ refresh_minutes = get_refresh_minutes(conn, guild.id)
|
|
|
+ if not force and now - last < refresh_minutes * 60:
|
|
|
+ return
|
|
|
+ _last_dm_refresh_ts[guild.id] = now
|
|
|
+
|
|
|
+ with db_connect() as conn:
|
|
|
+ records = list_dm_panels(conn, guild.id)
|
|
|
+
|
|
|
+ for rec in records:
|
|
|
+ user_id = int(rec["user_id"])
|
|
|
+ ch_id = int(rec["channel_id"])
|
|
|
+ msg_id = int(rec["message_id"])
|
|
|
+ # Resolve member
|
|
|
+ member = guild.get_member(user_id)
|
|
|
+ if member is None:
|
|
|
+ try:
|
|
|
+ member = await guild.fetch_member(user_id)
|
|
|
+ except Exception:
|
|
|
+ # Member left; remove tracking
|
|
|
+ with db_connect() as conn:
|
|
|
+ remove_dm_panel(conn, guild.id, user_id)
|
|
|
+ continue
|
|
|
+ channel = bot.get_channel(ch_id)
|
|
|
+ if not channel:
|
|
|
+ with db_connect() as conn:
|
|
|
+ remove_dm_panel(conn, guild.id, user_id)
|
|
|
+ continue
|
|
|
+ try:
|
|
|
+ msg = await channel.fetch_message(msg_id)
|
|
|
+ except discord.NotFound:
|
|
|
+ with db_connect() as conn:
|
|
|
+ remove_dm_panel(conn, guild.id, user_id)
|
|
|
+ continue
|
|
|
+ except discord.Forbidden:
|
|
|
+ with db_connect() as conn:
|
|
|
+ remove_dm_panel(conn, guild.id, user_id)
|
|
|
+ continue
|
|
|
+ except discord.HTTPException:
|
|
|
+ continue
|
|
|
+
|
|
|
+ view = await build_member_panel_view(guild, member)
|
|
|
+ content = f"{dm_header_for_guild(guild)}\nSelect or unselect your roles:"
|
|
|
+ try:
|
|
|
+ await msg.edit(content=content, view=view)
|
|
|
+ except discord.HTTPException:
|
|
|
+ pass
|
|
|
+
|
|
|
+# ------------- Activity Tracking -------------
|
|
|
+
|
|
|
+@bot.event
|
|
|
+async def on_message(message: discord.Message):
|
|
|
+ if message.guild and not message.author.bot:
|
|
|
+ with db_connect() as conn:
|
|
|
+ set_activity(conn, message.guild.id, message.author.id)
|
|
|
+
|
|
|
+# ------------- Slash Commands -------------
|
|
|
+
|
|
|
+def guild_ids_kw():
|
|
|
+ return {"guild_ids": [GUILD_ID]} if GUILD_ID else {}
|
|
|
+
|
|
|
+ADMIN_PERMS = discord.Permissions(administrator=True)
|
|
|
+
|
|
|
+# --- Admin (hyphen names) ---
|
|
|
+
|
|
|
+@bot.slash_command(
|
|
|
+ name="role-admin-add-pickable",
|
|
|
+ description="Add or update a pickable role with optional min/max.",
|
|
|
+ default_member_permissions=ADMIN_PERMS,
|
|
|
+ **guild_ids_kw()
|
|
|
+)
|
|
|
+async def role_admin_add_pickable(
|
|
|
+ ctx: discord.ApplicationContext,
|
|
|
+ role: discord.Option(discord.Role, "Role to add/update"),
|
|
|
+ min_users: discord.Option(int, "Minimum users hint", required=False) = None,
|
|
|
+ max_users: discord.Option(int, "Maximum users hint", required=False) = None
|
|
|
+):
|
|
|
+ assert ctx.guild is not None
|
|
|
+ with db_connect() as conn:
|
|
|
+ add_selectable_role(conn, ctx.guild.id, role.id, min_users, max_users)
|
|
|
+ await ctx.respond(f"Configured pickable role: **{role.name}** (min={min_users}, max={max_users}).", ephemeral=True)
|
|
|
+ await refresh_all_public_panels(ctx.guild, force=True)
|
|
|
+ await refresh_all_dm_panels(ctx.guild, force=True)
|
|
|
+
|
|
|
+@bot.slash_command(
|
|
|
+ name="role-admin-remove-pickable",
|
|
|
+ description="Remove a pickable role.",
|
|
|
+ default_member_permissions=ADMIN_PERMS,
|
|
|
+ **guild_ids_kw()
|
|
|
+)
|
|
|
+async def role_admin_remove_pickable(
|
|
|
+ ctx: discord.ApplicationContext,
|
|
|
+ role: discord.Option(discord.Role, "Role to remove")
|
|
|
+):
|
|
|
+ assert ctx.guild is not None
|
|
|
+ with db_connect() as conn:
|
|
|
+ removed = remove_selectable_role(conn, ctx.guild.id, role.id)
|
|
|
+ if removed:
|
|
|
+ await ctx.respond(f"Removed pickable role: **{role.name}**.", ephemeral=True)
|
|
|
+ else:
|
|
|
+ await ctx.respond(f"**{role.name}** was not in the pickable list.", ephemeral=True)
|
|
|
+ await refresh_all_public_panels(ctx.guild, force=True)
|
|
|
+ await refresh_all_dm_panels(ctx.guild, force=True)
|
|
|
+
|
|
|
+@bot.slash_command(
|
|
|
+ name="role-admin-set-role-limits",
|
|
|
+ description="Adjust min/max hints for a pickable role.",
|
|
|
+ default_member_permissions=ADMIN_PERMS,
|
|
|
+ **guild_ids_kw()
|
|
|
+)
|
|
|
+async def role_admin_set_role_limits(
|
|
|
+ ctx: discord.ApplicationContext,
|
|
|
+ role: discord.Option(discord.Role, "Role to update"),
|
|
|
+ min_users: discord.Option(int, "Minimum users hint", required=False) = None,
|
|
|
+ max_users: discord.Option(int, "Maximum users hint", required=False) = None
|
|
|
+):
|
|
|
+ assert ctx.guild is not None
|
|
|
+ with db_connect() as conn:
|
|
|
+ set_role_limits(conn, ctx.guild.id, role.id, min_users, max_users)
|
|
|
+ await ctx.respond(f"Updated **{role.name}** limits: min={min_users}, max={max_users}.", ephemeral=True)
|
|
|
+ await refresh_all_public_panels(ctx.guild, force=True)
|
|
|
+ await refresh_all_dm_panels(ctx.guild, force=True)
|
|
|
+
|
|
|
+@bot.slash_command(
|
|
|
+ name="role-admin-set-inactivity-days",
|
|
|
+ description="Set inactivity timeout in days.",
|
|
|
+ default_member_permissions=ADMIN_PERMS,
|
|
|
+ **guild_ids_kw()
|
|
|
+)
|
|
|
+async def role_admin_set_inactivity_days(
|
|
|
+ ctx: discord.ApplicationContext,
|
|
|
+ days: discord.Option(int, "Days (>=1)")
|
|
|
+):
|
|
|
+ if days < 1:
|
|
|
+ await ctx.respond("Days must be >= 1.", ephemeral=True)
|
|
|
+ return
|
|
|
+ assert ctx.guild is not None
|
|
|
+ with db_connect() as conn:
|
|
|
+ set_inactivity_days(conn, ctx.guild.id, days)
|
|
|
+ await ctx.respond(f"Inactivity timeout set to **{days}** days.", ephemeral=True)
|
|
|
+
|
|
|
+@bot.slash_command(
|
|
|
+ name="role-admin-refresh",
|
|
|
+ description="Set auto-refresh interval (minutes) for panels.",
|
|
|
+ default_member_permissions=ADMIN_PERMS,
|
|
|
+ **guild_ids_kw()
|
|
|
+)
|
|
|
+async def role_admin_refresh(
|
|
|
+ ctx: discord.ApplicationContext,
|
|
|
+ minutes: discord.Option(int, "Refresh interval in minutes (>=1)")
|
|
|
+):
|
|
|
+ if minutes < 1:
|
|
|
+ await ctx.respond("Minutes must be >= 1.", ephemeral=True)
|
|
|
+ return
|
|
|
+ assert ctx.guild is not None
|
|
|
+ with db_connect() as conn:
|
|
|
+ set_refresh_minutes(conn, ctx.guild.id, minutes)
|
|
|
+ await ctx.respond(f"Panel refresh interval set to **{minutes} minute(s)**.", ephemeral=True)
|
|
|
+ await refresh_all_public_panels(ctx.guild, force=True)
|
|
|
+ await refresh_all_dm_panels(ctx.guild, force=True)
|
|
|
+
|
|
|
+@bot.slash_command(
|
|
|
+ name="role-admin-list-users",
|
|
|
+ description="List all pickable roles and their users.",
|
|
|
+ default_member_permissions=ADMIN_PERMS,
|
|
|
+ **guild_ids_kw()
|
|
|
+)
|
|
|
+async def role_admin_list_users(ctx: discord.ApplicationContext):
|
|
|
+ assert ctx.guild is not None
|
|
|
+ with db_connect() as conn:
|
|
|
+ rows = list_selectable_roles(conn, ctx.guild.id)
|
|
|
+ if not rows:
|
|
|
+ await ctx.respond("No pickable roles configured.", ephemeral=True)
|
|
|
+ return
|
|
|
+
|
|
|
+ lines: List[str] = []
|
|
|
+ for r in rows:
|
|
|
+ rid = int(r["role_id"])
|
|
|
+ role = ctx.guild.get_role(rid)
|
|
|
+ if not role:
|
|
|
+ lines.append(f"[missing role {rid}]")
|
|
|
+ lines.append("")
|
|
|
+ continue
|
|
|
+ members = [m.mention for m in role.members if not m.bot]
|
|
|
+ header = f"{role.name} ({len(members)}):"
|
|
|
+ lines.append(header)
|
|
|
+ if members:
|
|
|
+ lines.extend([", ".join(members)])
|
|
|
+ else:
|
|
|
+ lines.append("- (none)")
|
|
|
+ lines.append("") # blank line
|
|
|
+
|
|
|
+ text = "\n".join(lines)
|
|
|
+ if len(text) < 1800:
|
|
|
+ await ctx.respond(text, ephemeral=True)
|
|
|
+ else:
|
|
|
+ from io import BytesIO
|
|
|
+ fp = BytesIO(text.encode("utf-8"))
|
|
|
+ await ctx.respond("Exported role membership:", file=discord.File(fp, filename="role_members.txt"), ephemeral=True)
|
|
|
+
|
|
|
+@bot.slash_command(
|
|
|
+ name="role-admin-list-config",
|
|
|
+ description="Show current configuration.",
|
|
|
+ default_member_permissions=ADMIN_PERMS,
|
|
|
+ **guild_ids_kw()
|
|
|
+)
|
|
|
+async def role_admin_list_config(ctx: discord.ApplicationContext):
|
|
|
+ assert ctx.guild is not None
|
|
|
+ with db_connect() as conn:
|
|
|
+ inactivity = get_inactivity_days(conn, ctx.guild.id)
|
|
|
+ refresh_m = get_refresh_minutes(conn, ctx.guild.id)
|
|
|
+ rows = list_selectable_roles(conn, ctx.guild.id)
|
|
|
+ role_lines = []
|
|
|
+ for r in rows:
|
|
|
+ rid = int(r["role_id"])
|
|
|
+ role = ctx.guild.get_role(rid)
|
|
|
+ mention = role.mention if role else f"<@&{rid}>"
|
|
|
+ role_lines.append(f"- {mention} (min={r['min_users']}, max={r['max_users']})")
|
|
|
+ role_block = "\n".join(role_lines) if role_lines else "(no pickable roles)"
|
|
|
+ msg = (f"**Configuration**\n"
|
|
|
+ f"- Inactivity days: **{inactivity}**\n"
|
|
|
+ f"- Refresh interval (minutes): **{refresh_m}**\n"
|
|
|
+ f"- Pickable roles:\n{role_block}")
|
|
|
+ await ctx.respond(msg, ephemeral=True)
|
|
|
+
|
|
|
+@bot.slash_command(
|
|
|
+ name="role-admin-add-user-for-role",
|
|
|
+ description="Admin: add a specific user to a pickable role.",
|
|
|
+ default_member_permissions=ADMIN_PERMS,
|
|
|
+ **guild_ids_kw()
|
|
|
+)
|
|
|
+async def role_admin_add_user_for_role(
|
|
|
+ ctx: discord.ApplicationContext,
|
|
|
+ user: discord.Option(discord.Member, "User to add"),
|
|
|
+ role: discord.Option(discord.Role, "Pickable role")
|
|
|
+):
|
|
|
+ assert ctx.guild is not None
|
|
|
+ with db_connect() as conn:
|
|
|
+ rows = list_selectable_roles(conn, ctx.guild.id)
|
|
|
+ pickable = {int(r["role_id"]) for r in rows}
|
|
|
+ if role.id not in pickable:
|
|
|
+ await ctx.respond("That role is not pickable.", ephemeral=True)
|
|
|
+ return
|
|
|
+ if not ctx.guild.me or role >= ctx.guild.me.top_role:
|
|
|
+ await ctx.respond("I cannot manage that role due to role hierarchy/permissions.", ephemeral=True)
|
|
|
+ return
|
|
|
+ if role in user.roles:
|
|
|
+ await ctx.respond(f"{user.mention} already has **{role.name}**.", ephemeral=True)
|
|
|
+ return
|
|
|
+ try:
|
|
|
+ await user.add_roles(role, reason=f"disbot-a4: admin add by {ctx.author}")
|
|
|
+ except discord.Forbidden:
|
|
|
+ await ctx.respond("I lack permissions to manage that role.", ephemeral=True)
|
|
|
+ return
|
|
|
+ with db_connect() as conn:
|
|
|
+ set_activity(conn, ctx.guild.id, user.id)
|
|
|
+ await ctx.respond(f"Added **{role.name}** to {user.mention}.", ephemeral=True)
|
|
|
+ await refresh_all_public_panels(ctx.guild)
|
|
|
+ await refresh_all_dm_panels(ctx.guild)
|
|
|
+
|
|
|
+@bot.slash_command(
|
|
|
+ name="role-admin-remove-user-from-role",
|
|
|
+ description="Admin: remove a specific user from a pickable role.",
|
|
|
+ default_member_permissions=ADMIN_PERMS,
|
|
|
+ **guild_ids_kw()
|
|
|
+)
|
|
|
+async def role_admin_remove_user_from_role(
|
|
|
+ ctx: discord.ApplicationContext,
|
|
|
+ user: discord.Option(discord.Member, "User to remove"),
|
|
|
+ role: discord.Option(discord.Role, "Pickable role")
|
|
|
+):
|
|
|
+ assert ctx.guild is not None
|
|
|
+ with db_connect() as conn:
|
|
|
+ rows = list_selectable_roles(conn, ctx.guild.id)
|
|
|
+ pickable = {int(r["role_id"]) for r in rows}
|
|
|
+ if role.id not in pickable:
|
|
|
+ await ctx.respond("That role is not pickable.", ephemeral=True)
|
|
|
+ return
|
|
|
+ if not ctx.guild.me or role >= ctx.guild.me.top_role:
|
|
|
+ await ctx.respond("I cannot manage that role due to role hierarchy/permissions.", ephemeral=True)
|
|
|
+ return
|
|
|
+ if role not in user.roles:
|
|
|
+ await ctx.respond(f"{user.mention} does not have **{role.name}**.", ephemeral=True)
|
|
|
+ return
|
|
|
+ try:
|
|
|
+ await user.remove_roles(role, reason=f"disbot-a4: admin remove by {ctx.author}")
|
|
|
+ except discord.Forbidden:
|
|
|
+ await ctx.respond("I lack permissions to manage that role.", ephemeral=True)
|
|
|
+ return
|
|
|
+ await ctx.respond(f"Removed **{role.name}** from {user.mention}.", ephemeral=True)
|
|
|
+ await refresh_all_public_panels(ctx.guild)
|
|
|
+ await refresh_all_dm_panels(ctx.guild)
|
|
|
+
|
|
|
+# --- Public status panel (admin-only command, posts for everyone) ---
|
|
|
+
|
|
|
+@bot.slash_command(
|
|
|
+ name="role-admin-post-status",
|
|
|
+ description="(Admin) (Re)create a public role status panel here (shows counts and inactive/full).",
|
|
|
+ default_member_permissions=ADMIN_PERMS,
|
|
|
+ **guild_ids_kw()
|
|
|
+)
|
|
|
+async def role_admin_post_status(ctx: discord.ApplicationContext):
|
|
|
+ assert ctx.guild is not None
|
|
|
+ await ctx.defer(ephemeral=True)
|
|
|
+ try:
|
|
|
+ msg = await post_public_status_panel(ctx.guild, ctx.channel)
|
|
|
+ except RuntimeError as e:
|
|
|
+ await ctx.followup.send(str(e), ephemeral=True)
|
|
|
+ return
|
|
|
+ except discord.Forbidden:
|
|
|
+ await ctx.followup.send("I don’t have access to post here. Grant **View Channel** and **Send Messages**.", ephemeral=True)
|
|
|
+ return
|
|
|
+ await ctx.followup.send(f"Role status panel created: [jump to message]({msg.jump_url})", ephemeral=True)
|
|
|
+
|
|
|
+# --- User utilities (hyphen names) ---
|
|
|
+
|
|
|
+@bot.slash_command(
|
|
|
+ name="role-list",
|
|
|
+ description="List the pickable roles you currently have.",
|
|
|
+ **({"guild_ids": [GUILD_ID]} if GUILD_ID else {})
|
|
|
+)
|
|
|
+async def role_list(ctx: discord.ApplicationContext):
|
|
|
+ assert ctx.guild is not None
|
|
|
+ member = ctx.author if isinstance(ctx.author, discord.Member) else ctx.guild.get_member(ctx.user.id)
|
|
|
+ if member is None:
|
|
|
+ await ctx.respond("Could not resolve your member record.", ephemeral=True)
|
|
|
+ return
|
|
|
+ with db_connect() as conn:
|
|
|
+ rows = list_selectable_roles(conn, ctx.guild.id)
|
|
|
+ selectable_ids = {int(r["role_id"]) for r in rows}
|
|
|
+ mine = [r for r in getattr(member, "roles", []) if r.id in selectable_ids]
|
|
|
+ if not mine:
|
|
|
+ await ctx.respond("You currently have the following roles: (none)", ephemeral=True)
|
|
|
+ return
|
|
|
+ mentions = ", ".join([r.mention for r in mine])
|
|
|
+ await ctx.respond(f"You currently have the following roles: {mentions}", ephemeral=True)
|
|
|
+
|
|
|
+@bot.slash_command(
|
|
|
+ name="roles",
|
|
|
+ description="Open your personal role picker (sent to your DMs).",
|
|
|
+ **({"guild_ids": [GUILD_ID]} if GUILD_ID else {})
|
|
|
+)
|
|
|
+async def roles_cmd(ctx: discord.ApplicationContext):
|
|
|
+ assert ctx.guild is not None
|
|
|
+ await ctx.defer(ephemeral=True)
|
|
|
+ msg = await post_personal_dm_panel(ctx.guild, ctx.user)
|
|
|
+ if msg is None:
|
|
|
+ # Fallback ephemeral if DMs are closed (not auto-refreshed globally)
|
|
|
+ member = ctx.author if isinstance(ctx.author, discord.Member) else ctx.guild.get_member(ctx.user.id)
|
|
|
+ if member is None:
|
|
|
+ await ctx.followup.send("Could not DM you or resolve your member record.", ephemeral=True)
|
|
|
+ return
|
|
|
+ view = await build_member_panel_view(ctx.guild, member)
|
|
|
+ content = f"{dm_header_for_guild(ctx.guild)}\nSelect or unselect your roles:"
|
|
|
+ await ctx.followup.send("Your DMs are closed; showing the picker here (won’t auto-refresh):", ephemeral=True)
|
|
|
+ await ctx.channel.send(content, view=view)
|
|
|
+ return
|
|
|
+ await ctx.followup.send(f"I’ve sent you a DM with your personal role picker for **{ctx.guild.name}**. 📬", ephemeral=True)
|
|
|
+
|
|
|
+@bot.slash_command(
|
|
|
+ name="role-pick",
|
|
|
+ description="Add yourself to a pickable role.",
|
|
|
+ **({"guild_ids": [GUILD_ID]} if GUILD_ID else {})
|
|
|
+)
|
|
|
+async def role_pick(ctx: discord.ApplicationContext,
|
|
|
+ role: discord.Option(discord.Role, "Role to pick")):
|
|
|
+ assert ctx.guild is not None
|
|
|
+ member = ctx.author if isinstance(ctx.author, discord.Member) else ctx.guild.get_member(ctx.user.id)
|
|
|
+ if not member:
|
|
|
+ await ctx.respond("Could not resolve your member record.", ephemeral=True)
|
|
|
+ return
|
|
|
+ with db_connect() as conn:
|
|
|
+ rows = list_selectable_roles(conn, ctx.guild.id)
|
|
|
+ if role.id not in {int(r["role_id"]) for r in rows}:
|
|
|
+ await ctx.respond("That role is not pickable.", ephemeral=True)
|
|
|
+ return
|
|
|
+ if role in member.roles:
|
|
|
+ await ctx.respond(f"You already have **{role.name}**.", ephemeral=True)
|
|
|
+ return
|
|
|
+ if not ctx.guild.me or role >= ctx.guild.me.top_role:
|
|
|
+ await ctx.respond("I cannot manage that role due to role hierarchy/permissions.", ephemeral=True)
|
|
|
+ return
|
|
|
+ try:
|
|
|
+ await member.add_roles(role, reason="disbot-a4: role-pick")
|
|
|
+ except discord.Forbidden:
|
|
|
+ await ctx.respond("I lack permissions to manage that role.", ephemeral=True)
|
|
|
+ return
|
|
|
+ with db_connect() as conn:
|
|
|
+ set_activity(conn, ctx.guild.id, member.id)
|
|
|
+ await ctx.respond(f"Added **{role.name}**.", ephemeral=True)
|
|
|
+ await refresh_all_public_panels(ctx.guild)
|
|
|
+ await refresh_all_dm_panels(ctx.guild)
|
|
|
+
|
|
|
+@bot.slash_command(
|
|
|
+ name="role-unpick",
|
|
|
+ description="Remove yourself from a pickable role.",
|
|
|
+ **({"guild_ids": [GUILD_ID]} if GUILD_ID else {})
|
|
|
+)
|
|
|
+async def role_unpick(ctx: discord.ApplicationContext,
|
|
|
+ role: discord.Option(discord.Role, "Role to unpick")):
|
|
|
+ assert ctx.guild is not None
|
|
|
+ member = ctx.author if isinstance(ctx.author, discord.Member) else ctx.guild.get_member(ctx.user.id)
|
|
|
+ if not member:
|
|
|
+ await ctx.respond("Could not resolve your member record.", ephemeral=True)
|
|
|
+ return
|
|
|
+ with db_connect() as conn:
|
|
|
+ rows = list_selectable_roles(conn, ctx.guild.id)
|
|
|
+ if role.id not in {int(r["role_id"]) for r in rows}:
|
|
|
+ await ctx.respond("That role is not pickable.", ephemeral=True)
|
|
|
+ return
|
|
|
+ if role not in member.roles:
|
|
|
+ await ctx.respond(f"You don’t have **{role.name}**.", ephemeral=True)
|
|
|
+ return
|
|
|
+ if not ctx.guild.me or role >= ctx.guild.me.top_role:
|
|
|
+ await ctx.respond("I cannot manage that role due to role hierarchy/permissions.", ephemeral=True)
|
|
|
+ return
|
|
|
+ try:
|
|
|
+ await member.remove_roles(role, reason="disbot-a4: role-unpick")
|
|
|
+ except discord.Forbidden:
|
|
|
+ await ctx.respond("I lack permissions to manage that role.", ephemeral=True)
|
|
|
+ return
|
|
|
+ with db_connect() as conn:
|
|
|
+ set_activity(conn, ctx.guild.id, member.id)
|
|
|
+ await ctx.respond(f"Removed **{role.name}**.", ephemeral=True)
|
|
|
+ await refresh_all_public_panels(ctx.guild)
|
|
|
+ await refresh_all_dm_panels(ctx.guild)
|
|
|
+
|
|
|
+@bot.slash_command(
|
|
|
+ name="role-toggle",
|
|
|
+ description="Toggle a pickable role on/off for yourself.",
|
|
|
+ **({"guild_ids": [GUILD_ID]} if GUILD_ID else {})
|
|
|
+)
|
|
|
+async def role_toggle(ctx: discord.ApplicationContext,
|
|
|
+ role: discord.Option(discord.Role, "Role to toggle")):
|
|
|
+ assert ctx.guild is not None
|
|
|
+ member = ctx.author if isinstance(ctx.author, discord.Member) else ctx.guild.get_member(ctx.user.id)
|
|
|
+ if not member:
|
|
|
+ await ctx.respond("Could not resolve your member record.", ephemeral=True)
|
|
|
+ return
|
|
|
+ with db_connect() as conn:
|
|
|
+ rows = list_selectable_roles(conn, ctx.guild.id)
|
|
|
+ if role.id not in {int(r["role_id"]) for r in rows}:
|
|
|
+ await ctx.respond("That role is not pickable.", ephemeral=True)
|
|
|
+ return
|
|
|
+ if not ctx.guild.me or role >= ctx.guild.me.top_role:
|
|
|
+ await ctx.respond("I cannot manage that role due to role hierarchy/permissions.", ephemeral=True)
|
|
|
+ return
|
|
|
+ try:
|
|
|
+ if role in member.roles:
|
|
|
+ await member.remove_roles(role, reason="disbot-a4: role-toggle off")
|
|
|
+ msg = f"Removed **{role.name}**."
|
|
|
+ else:
|
|
|
+ await member.add_roles(role, reason="disbot-a4: role-toggle on")
|
|
|
+ msg = f"Added **{role.name}**."
|
|
|
+ except discord.Forbidden:
|
|
|
+ await ctx.respond("I lack permissions to manage that role.", ephemeral=True)
|
|
|
+ return
|
|
|
+ with db_connect() as conn:
|
|
|
+ set_activity(conn, ctx.guild.id, member.id)
|
|
|
+ await ctx.respond(msg, ephemeral=True)
|
|
|
+ await refresh_all_public_panels(ctx.guild)
|
|
|
+ await refresh_all_dm_panels(ctx.guild)
|
|
|
+
|
|
|
+@bot.slash_command(
|
|
|
+ name="role-help",
|
|
|
+ description="Show available role commands."
|
|
|
+)
|
|
|
+async def role_help(ctx: discord.ApplicationContext):
|
|
|
+ """List commands available to the invoking user (admin commands shown only to admins)."""
|
|
|
+ is_admin_user = False
|
|
|
+ if ctx.guild:
|
|
|
+ m = ctx.author if isinstance(ctx.author, discord.Member) else ctx.guild.get_member(getattr(ctx.user, "id", 0))
|
|
|
+ is_admin_user = is_admin(m, ctx.channel) # robust channel-based check
|
|
|
+
|
|
|
+ user_cmds = [
|
|
|
+ "/roles — open your personal role picker (DM)",
|
|
|
+ "/role-list — list your current pickable roles",
|
|
|
+ "/role-pick <role> — add yourself to a pickable role",
|
|
|
+ "/role-unpick <role> — remove yourself from a pickable role",
|
|
|
+ "/role-toggle <role> — toggle a pickable role on/off",
|
|
|
+ ]
|
|
|
+ admin_cmds = [
|
|
|
+ "/role-admin-post-status — post the public status panel in this channel",
|
|
|
+ "/role-admin-add-pickable <role> [min] [max] — add/update a pickable role",
|
|
|
+ "/role-admin-remove-pickable <role> — remove a pickable role",
|
|
|
+ "/role-admin-set-role-limits <role> [min] [max] — set min/max hints",
|
|
|
+ "/role-admin-set-inactivity-days <days> — set inactivity timeout",
|
|
|
+ "/role-admin-refresh <minutes> — set auto-refresh interval",
|
|
|
+ "/role-admin-list-users — list users per pickable role",
|
|
|
+ "/role-admin-list-config — show current configuration",
|
|
|
+ "/role-admin-add-user-for-role <user> <role> — add user to role",
|
|
|
+ "/role-admin-remove-user-from-role <user> <role> — remove user from role",
|
|
|
+ ]
|
|
|
+
|
|
|
+ text = "**Role commands available to you:**\n" + "\n".join(f"- {c}" for c in user_cmds)
|
|
|
+ if is_admin_user:
|
|
|
+ text += "\n\n**Admin commands:**\n" + "\n".join(f"- {c}" for c in admin_cmds)
|
|
|
+ await ctx.respond(text, ephemeral=True)
|
|
|
+
|
|
|
+# ------------- Inactivity Cleanup Task -------------
|
|
|
+
|
|
|
+@tasks.loop(hours=24)
|
|
|
+async def daily_inactivity_cleanup():
|
|
|
+ if not GUILD_ID:
|
|
|
+ log.warning("GUILD_ID not set; skipping inactivity cleanup.")
|
|
|
+ return
|
|
|
+ guild = bot.get_guild(GUILD_ID)
|
|
|
+ if guild is None:
|
|
|
+ log.warning("Target guild not found in cache; skipping inactivity cleanup.")
|
|
|
+ return
|
|
|
+
|
|
|
+ with db_connect() as conn:
|
|
|
+ inactivity_days = get_inactivity_days(conn, guild.id)
|
|
|
+ cutoff_ts = int(time.time()) - inactivity_days * 24 * 3600
|
|
|
+ selectable = list_selectable_roles(conn, guild.id)
|
|
|
+ role_ids = {int(r["role_id"]) for r in selectable}
|
|
|
+
|
|
|
+ if not role_ids:
|
|
|
+ log.info("No pickable roles configured; inactivity cleanup skipped.")
|
|
|
+ return
|
|
|
+
|
|
|
+ removed_count = 0
|
|
|
+ for member in guild.members:
|
|
|
+ if member.bot:
|
|
|
+ continue
|
|
|
+ with db_connect() as conn:
|
|
|
+ last = get_last_active(conn, guild.id, member.id)
|
|
|
+ if last is not None and last >= cutoff_ts:
|
|
|
+ continue
|
|
|
+ roles_to_remove = [r for r in member.roles if r.id in role_ids]
|
|
|
+ if not roles_to_remove:
|
|
|
+ continue
|
|
|
+ if not guild.me:
|
|
|
+ continue
|
|
|
+ manageable = [r for r in roles_to_remove if r < guild.me.top_role]
|
|
|
+ if not manageable:
|
|
|
+ continue
|
|
|
+ try:
|
|
|
+ await member.remove_roles(*manageable, reason=f"disbot-a4: inactive >= {inactivity_days} days")
|
|
|
+ removed_count += len(manageable)
|
|
|
+ except discord.Forbidden:
|
|
|
+ log.warning("Forbidden removing roles from %s", member)
|
|
|
+ except discord.HTTPException as e:
|
|
|
+ log.warning("HTTP error removing roles: %s", e)
|
|
|
+
|
|
|
+ log.info("Inactivity cleanup complete. Roles removed: %s", removed_count)
|
|
|
+
|
|
|
+@daily_inactivity_cleanup.before_loop
|
|
|
+async def before_cleanup():
|
|
|
+ await bot.wait_until_ready()
|
|
|
+
|
|
|
+# Periodic refresh supervisor (checks per-minute, honors per-guild refresh_minutes)
|
|
|
+@tasks.loop(minutes=1)
|
|
|
+async def periodic_refresh_supervisor():
|
|
|
+ guilds = [bot.get_guild(GUILD_ID)] if GUILD_ID else list(bot.guilds)
|
|
|
+ for g in guilds:
|
|
|
+ if not g:
|
|
|
+ continue
|
|
|
+ await refresh_all_public_panels(g)
|
|
|
+ await refresh_all_dm_panels(g)
|
|
|
+
|
|
|
+@periodic_refresh_supervisor.before_loop
|
|
|
+async def before_periodic():
|
|
|
+ await bot.wait_until_ready()
|
|
|
+
|
|
|
+# ------------- Startup -------------
|
|
|
+
|
|
|
+@bot.event
|
|
|
+async def on_ready():
|
|
|
+ log.info("Logged in as %s (%s)", bot.user, bot.user.id)
|
|
|
+ db_init()
|
|
|
+
|
|
|
+ # Auto-post public status panel if configured (avoid Forum roots)
|
|
|
+ if ROLE_PANEL_CHANNEL_ID:
|
|
|
+ ch = bot.get_channel(ROLE_PANEL_CHANNEL_ID)
|
|
|
+ if isinstance(ch, (discord.TextChannel, discord.Thread)):
|
|
|
+ try:
|
|
|
+ msg = await post_public_status_panel(ch.guild, ch)
|
|
|
+ log.info("Role status panel posted at: %s", msg.jump_url)
|
|
|
+ except Exception as e:
|
|
|
+ log.warning("Failed to post role status panel automatically: %s", e)
|
|
|
+
|
|
|
+ if not daily_inactivity_cleanup.is_running():
|
|
|
+ daily_inactivity_cleanup.start()
|
|
|
+ if not periodic_refresh_supervisor.is_running():
|
|
|
+ periodic_refresh_supervisor.start()
|
|
|
+
|
|
|
+# Run
|
|
|
+bot.run(DISCORD_TOKEN)
|