Technical Paper — Architecture, Async Design, and Systems
Lacie is a Discord bot I've been building for my own server, named after a character from a game I love. It's built as a cog-based discord.py application with around 18 feature modules, covering an XP and levelling system, moderation tools, mini-games, utility commands, and several smaller features that exist purely because I wanted them.
This paper covers the more interesting technical decisions: the multi-database XP architecture, the async spam detection pipeline, the minesweeper implementation, the sparkle system, and how the permission layer works across both of discord.py's command systems.
The bot is organised as a set of discord.py cogs — classes that group related commands and listeners. Each cog is loaded at startup and can be reloaded at runtime without restarting the bot. The main entry point registers all cogs, sets up logging, and connects to Discord's gateway.
Cogs encapsulate both commands and event listeners. This means the spam detection system,
for instance, registers its own on_message listener in its own class rather than
polluting a central dispatcher. The XP system does the same — it listens for messages, applies
the cooldown, and writes to databases entirely within its cog.
# cog registration at startup initial_extensions = [ 'cogs.xp', 'cogs.moderation', 'cogs.spam', 'cogs.games', 'cogs.sparkle', 'cogs.fun', 'cogs.utility', 'cogs.admin', 'cogs.stats', # ... 9 more ] async def setup_hook(self): for ext in initial_extensions: await self.load_extension(ext) await self.tree.sync() # sync slash command tree with Discord
The bot uses separate SQLite databases for different data lifetimes. The XP system uses five databases (one per time scope). Moderation data, sparkles, and game stats each live in their own database files. This keeps queries simple — you never need a cross-join between, say, lifetime XP and minesweeper stats.
| Database | Contents | Type |
|---|---|---|
| xp_lifetime.db | XP, level, last message time | SQLite (sync) |
| xp_annual.db | Annual leaderboard XP | SQLite (sync) |
| xp_monthly.db | Monthly leaderboard XP | SQLite (sync) |
| xp_weekly.db | Weekly leaderboard XP | SQLite (sync) |
| xp_daily.db | Daily leaderboard XP | SQLite (sync) |
| moderation.db | Warnings, mutes, bans | aiosqlite (async) |
| sparkles.db | Sparkle records per user | SQLite + thread pool |
| games.db | Minesweeper stats | SQLite (sync) |
Every message that qualifies for XP writes to five separate databases in a single function call. The five time-scoped leaderboards (lifetime, annual, monthly, weekly, daily) let the server have both long-term rankings and short-term competitive resets without storing redundant data — each database independently tracks only XP for its scope and gets wiped on the appropriate reset cycle.
The cooldown (60 seconds between XP gains) is checked against the lifetime database only — there's one source of truth for timing. If the user is still on cooldown, all five writes are skipped immediately. Base XP is a random integer between 50 and 100, applied after the cooldown check so the same random value is used across all five databases for that message.
async def add_xp(user): if not isinstance(user, discord.Member): return # cooldown checked against lifetime DB only — single source of truth conn_check, cur_check = get_db("lifetime") cur_check.execute("SELECT last_message FROM xp WHERE user_id = ?", (str(user.id),)) row = cur_check.fetchone() conn_check.close() if row and not can_get_xp(row[0]): return base_xp = random.randint(50, 100) # one roll, used across all five DBs leaderboard_types = [ ("lifetime", True), # role multiplier applied only here ("annual", False), ("monthly", False), ("weekly", False), ("daily", False), ] for db_type, apply_multiplier in leaderboard_types: conn, cur = get_db(db_type) cur.execute("SELECT xp, level FROM xp WHERE user_id = ?", (str(user.id),)) row = cur.fetchone() xp, level = row if row else (0, 0) gained = int(base_xp * get_multiplier(user, apply_multiplier=apply_multiplier)) cur.execute( "UPDATE xp SET xp = ?, last_message = ? WHERE user_id = ?", (xp + gained, int(time.time()), str(user.id)) ) conn.commit() await check_level_up(user, cur, conn, lifetime=(db_type == "lifetime")) conn.close()
The XP threshold per level follows a cubic polynomial. At low levels the steps are small enough that new users rank up fairly quickly. At high levels the curve steepens so the upper end of the leaderboard represents genuine long-term activity. The threshold is rounded down to the nearest 100 so level boundaries always land on clean numbers.
def xp_for_level(level: int) -> int: # cubic: grows slowly at first, steeply at high levels xp = (level**3 * 1) + (level**2 * 50) + (level * 100) return int(math.floor(xp / 100) * 100)
| Level | XP Required |
|---|---|
| 1 | 100 |
| 5 | 875 |
| 10 | 6,000 |
| 25 | 47,100 |
| 50 | 252,500 |
| 100 | 1,510,000 |
Server members with certain roles earn bonus XP — but only on the lifetime leaderboard.
The time-scoped boards (annual, monthly, etc.) use a flat multiplier of 1 so they remain
a fair measure of recent activity rather than a reflection of server role status. The
get_multiplier function takes the member's full role list and returns the highest
applicable multiplier.
MULTIPLIERS = {
role_id_booster: 1.25,
role_id_active: 1.5,
role_id_supporter: 2.0,
role_id_partner: 2.5,
role_id_staff: 3.0,
}
def get_multiplier(member, apply_multiplier=True):
if not apply_multiplier:
return 1
# max() over a generator: 1 (base) or the highest matching role multiplier
return max((1, *(MULTIPLIERS[r.id] for r in member.roles if r.id in MULTIPLIERS)))
After each XP write, check_level_up computes the level the user should be at
given their current cumulative XP total, compares it to the stored level, and if they've
crossed a threshold it updates the level and sends a congratulations message. Level-up
announcements are only sent for the lifetime database — the time-scoped boards don't announce
levels since they reset periodically.
async def check_level_up(user, cur, conn, lifetime=False): cur.execute("SELECT xp, level FROM xp WHERE user_id = ?", (str(user.id),)) row = cur.fetchone() if not row: return current_xp, current_level = row # find the highest level whose threshold the user has passed new_level = current_level while current_xp >= xp_for_level(new_level + 1): new_level += 1 if new_level > current_level: cur.execute("UPDATE xp SET level = ? WHERE user_id = ?", (new_level, str(user.id))) conn.commit() if lifetime: await announce_level_up(user, new_level)
The spam system intercepts every incoming message without blocking the event loop. It uses an
async queue so on_message returns immediately and actual spam analysis happens in a
separate task. Per-user message history is stored in bounded deques so old entries fall off
automatically without any explicit cleanup.
The on_message listener only does one thing: push the message into a queue and
return. A tasks.loop running every 100ms drains up to 10 messages per cycle. This
decoupling means a spam burst (say, 50 messages in a second) queues up without stalling the bot's
event loop — everything else continues to work normally while the queue drains.
class SpamDetection(commands.Cog): def __init__(self, bot): self.bot = bot self.message_queue = asyncio.Queue() self.user_messages = defaultdict(lambda: deque(maxlen=50)) @commands.Cog.listener() async def on_message(self, message): if message.author.bot or not message.guild: return await self.message_queue.put(message) # non-blocking — returns immediately @tasks.loop(seconds=0.1) async def process_message_queue(self): for _ in range(10): # process up to 10 per 100ms cycle try: msg = self.message_queue.get_nowait() except asyncio.QueueEmpty: break self.user_messages[msg.author.id].append( (msg.created_at, msg.channel.id, msg.content) ) await self._process_message(msg)
Two patterns are checked on every message. The first catches classic channel floods: 10 or more messages in the same channel within a 5-second window. The second catches raid behaviour: a single user posting across 10 or more different channels within 5 seconds — a pattern typical of coordinated raids where bots spread across a server simultaneously.
async def check_spam_patterns(self, member, guild): messages = self.user_messages[member.id] cutoff = datetime.now(timezone.utc) - timedelta(seconds=5) recent = [m for m in messages if m[0] >= cutoff] if len(recent) < 2: return None # Pattern 1: flood in one channel (≥10 messages in 5s) channel_counts = defaultdict(int) for _, ch_id, _ in recent: channel_counts[ch_id] += 1 for ch_id, count in channel_counts.items(): if count >= 10: return {"type": "same_channel", "channel_id": ch_id, "count": count} # Pattern 2: cross-channel raid (≥10 different channels in 5s) unique_channels = len(set(m[1] for m in recent)) if unique_channels >= 10: return {"type": "multiple_channels", "channel_count": unique_channels} return None
When spam is confirmed, the bot immediately mutes the user and sends an interactive staff alert. The alert embed includes the spam pattern type, message count, and action buttons for staff to kick, ban, or dismiss. If staff takes no action within 12 hours, a background task applies a default 1-day mute automatically — so the system degrades gracefully even when staff aren't online.
# staff alert with resolution buttons — ephemeral to keep channels clean class SpamActionView(discord.ui.View): def __init__(self, user_id, guild_id, spam_data): super().__init__(timeout=None) self.user_id = user_id self.guild_id = guild_id self.spam_data = spam_data @discord.ui.button(label="Kick", style=discord.ButtonStyle.danger) async def kick_button(self, interaction, button): guild = interaction.client.get_guild(self.guild_id) member = guild.get_member(self.user_id) if member: await member.kick(reason="Spam — staff action") await interaction.response.send_message("User kicked.", ephemeral=True) self.cleanup_pending(self.spam_data["message_id"]) @discord.ui.button(label="Ban", style=discord.ButtonStyle.danger) async def ban_button(self, interaction, button): guild = interaction.client.get_guild(self.guild_id) await guild.ban(discord.Object(id=self.user_id), reason="Spam — staff action") await interaction.response.send_message("User banned.", ephemeral=True) self.cleanup_pending(self.spam_data["message_id"]) @discord.ui.button(label="Dismiss", style=discord.ButtonStyle.secondary) async def dismiss_button(self, interaction, button): self.cleanup_pending(self.spam_data["message_id"]) await interaction.response.send_message("Alert dismissed.", ephemeral=True) # fallback: auto-apply if staff ignores the alert for 12 hours @tasks.loop(minutes=1) async def check_pending_actions(self): now = datetime.now(timezone.utc).isoformat() c.execute("SELECT * FROM spam_actions WHERE expires_at <= ?", (now,)) for row in c.fetchall(): await self.apply_default_action(row["user_id"], row["guild_id"], row["spam_data"]) c.execute("DELETE FROM spam_actions WHERE message_id = ?", (row["message_id"],))
The minesweeper game implements a full 13×13 board directly in Discord using button components. Each cell is a button; revealed cells show their adjacent mine count or are blank if zero. The board is generated lazily — mines aren't placed until the first click, which guarantees the first cell is always safe.
When the player clicks their first cell, the board calls setup_board with the
clicked row and column. All cells within a 3×3 radius around that cell are excluded from mine
placement candidates, so the first click — and the flood-fill it triggers — always lands on
safe ground.
def setup_board(self, safe_row: int, safe_col: int): # exclude the 3×3 safe zone from mine placement candidates = [ (r, c) for r in range(self.rows) for c in range(self.cols) if not (abs(r - safe_row) <= 1 and abs(c - safe_col) <= 1) ] mine_positions = random.sample(candidates, self.mine_count) for r, c in mine_positions: self.board[r][c] = -1 # -1 = mine # fill in adjacent mine counts for all non-mine cells for r in range(self.rows): for c in range(self.cols): if self.board[r][c] != -1: self.board[r][c] = sum( 1 for dr in (-1, 0, 1) for dc in (-1, 0, 1) if not (dr == dc == 0) and 0 <= r+dr < self.rows and 0 <= c+dc < self.cols and self.board[r+dr][c+dc] == -1 )
Clicking a cell with zero adjacent mines triggers an auto-reveal of all connected empty cells — matching the behaviour of standard minesweeper. This is a breadth-first search starting from the clicked cell. Empty cells (value 0) are added to the queue and their neighbours continue the spread. Cells with an adjacent mine count (1–8) are revealed but not queued — they form the boundary of the flood.
def _flood_fill(self, start_row: int, start_col: int): queue = deque([(start_row, start_col)]) visited = {(start_row, start_col)} while queue: row, col = queue.popleft() for dr in (-1, 0, 1): for dc in (-1, 0, 1): if dr == dc == 0: continue nr, nc = row + dr, col + dc if not (0 <= nr < self.rows and 0 <= nc < self.cols): continue if (nr, nc) in visited or self.flags[nr][nc]: continue visited.add((nr, nc)) self.revealed[nr][nc] = True if self.board[nr][nc] == 0: queue.append((nr, nc)) # only empty cells continue the fill
A 13×13 grid of buttons is the maximum Discord allows in a single message — Discord limits
component rows to 5, each holding 5 buttons, for 25 maximum. A 13-column board doesn't fit in
a single message view, so the game uses paginated rows: each button click re-renders the full
board state into a new message edit. Each button carries its row/column coordinates encoded in
its custom_id, so the click handler knows exactly which cell was pressed without
any external state lookup.
def build_view(self, game: MinesweeperGame) -> discord.ui.View: view = discord.ui.View(timeout=300) for r in range(game.rows): for c in range(game.cols): label, style = self._cell_display(game, r, c) btn = discord.ui.Button( label=label, style=style, custom_id=f"ms_{r}_{c}", # row/col encoded in custom_id row=r % 5 # Discord allows max 5 action rows ) btn.callback = self._make_callback(r, c) view.add_item(btn) return view def _cell_display(self, game, r, c): if game.flags[r][c]: return "🚩", discord.ButtonStyle.danger if not game.revealed[r][c]: return "·", discord.ButtonStyle.secondary val = game.board[r][c] if val == -1: return "💣", discord.ButtonStyle.danger if val == 0: return " ", discord.ButtonStyle.success return str(val), discord.ButtonStyle.primary
The sparkle system awards rare reactions to messages whose Discord snowflake ID ends in a specific pattern. Discord message IDs are 64-bit integers that increase monotonically — the trailing digits serve as a perfectly uniform probability source with no external RNG required.
Three tiers exist: a message ending in 000 is approximately 1-in-1000
(regular sparkle); ending in 0000 is roughly 1-in-10,000 (rare); ending in
00000 is roughly 1-in-100,000 (epic). The check is a simple string suffix test
and short-circuits at the first match, so a message ending in 00000 only triggers
the epic tier — not all three.
CHANCES = {
"epic": ("00000", "💫", "an **epic sparkle**"),
"rare": ("0000", "🌟", "a **rare sparkle**"),
"regular": ("000", "✨", "a **regular sparkle**"),
}
@commands.Cog.listener()
async def on_message(self, message):
if message.author.bot or not message.guild: return
msg_id = str(message.id)
for sparkle_type, (suffix, emoji, desc) in CHANCES.items():
if msg_id.endswith(suffix):
await message.add_reaction(emoji)
await message.reply(
f"**{message.author.name}** got {desc}! {emoji}",
mention_author=False
)
await asyncio.to_thread(self._db_write, message, sparkle_type)
break # break: 00000 matches "000" too — only award once
The database write is pushed to a thread pool with asyncio.to_thread so the
blocking SQLite call doesn't stall the async event loop. Sparkles are rare enough that the
write queue never backs up, so no queue management is needed here — unlike the spam system.
Discord.py has two separate command systems: legacy prefix commands (@commands.command)
that respond to text like !ban, and newer slash commands (@app_commands.command)
that use Discord's native UI. The two systems have different API surfaces — prefix commands receive a
Context object, slash commands receive an Interaction. They also respond
differently: ctx.send() vs interaction.response.send_message().
Rather than duplicating permission logic in every command, is_admin() returns a
single predicate that detects which system it's running under and adapts accordingly. The same
decorator works on both command types, which means a staff command added once gets consistent
permission enforcement whether it's invoked as a prefix command or a slash command.
@staticmethod def is_admin(): 'Decorator that works for both prefix commands and slash commands.' async def predicate(target): # target is Context for prefix commands, Interaction for slash user = getattr(target, "author", None) or getattr(target, "user", None) is_interaction = hasattr(target, "response") async def send_message(msg, ephemeral=False): if is_interaction: if not target.response.is_done(): await target.response.send_message(msg, ephemeral=ephemeral) else: await target.followup.send(msg, ephemeral=ephemeral) else: await target.send(msg) has_admin_role = any(r.id in ADMIN_ROLE_IDS for r in user.roles) is_owner = user.id == LILAC_ID if not (has_admin_role or is_owner): await send_message("You do not have permission to use this command.", ephemeral=is_interaction) raise CheckFailure("User lacks admin permissions.") return True # apply the same predicate to both command systems with one decorator call def decorator(func): func = commands.check(predicate)(func) func = app_commands.check(predicate)(func) return func return decorator
Applied to a command it looks like this — the decorator is the same regardless of which command type is being decorated:
@is_admin() @commands.command() async def purge(ctx, amount: int): await ctx.channel.purge(limit=amount) @is_admin() @app_commands.command(name="purge") async def purge_slash(interaction: discord.Interaction, amount: int): await interaction.channel.purge(limit=amount) await interaction.response.send_message("Done.", ephemeral=True)
The time-scoped leaderboard databases (daily, weekly, monthly, annual) are cleared on
schedule rather than queried with date filters. This keeps leaderboard queries simple —
a SELECT * FROM xp ORDER BY xp DESC always returns the current period's data with
no date arithmetic — at the cost of a scheduled reset task for each time scope.
Resets are implemented as scheduled async tasks that clear the appropriate database and log the event. The schedule is evaluated at startup so a restart during a reset window still correctly identifies whether a reset has already run that day/week/month.
@tasks.loop(time=datetime.time(0, 0)) # fires at midnight UTC async def daily_reset(self): conn, cur = get_db("daily") cur.execute("DELETE FROM xp") conn.commit() conn.close() await self.bot.get_channel(RESET_LOG_CHANNEL).send( "[SYSTEM] Daily leaderboard reset." ) @tasks.loop(time=datetime.time(0, 0)) # checked daily, fires only on Monday async def weekly_reset(self): if datetime.now(timezone.utc).weekday() != 0: return conn, cur = get_db("weekly") cur.execute("DELETE FROM xp") conn.commit() conn.close()