Lacie

Technical Paper — Architecture, Async Design, and Systems

What It Is

Lacie is a Discord bot I've been building for my own server, named after a character from a game I love. It's built as a cog-based discord.py application with around 18 feature modules, covering an XP and levelling system, moderation tools, mini-games, utility commands, and several smaller features that exist purely because I wanted them.

This paper covers the more interesting technical decisions: the multi-database XP architecture, the async spam detection pipeline, the minesweeper implementation, the sparkle system, and how the permission layer works across both of discord.py's command systems.

Architecture

The bot is organised as a set of discord.py cogs — classes that group related commands and listeners. Each cog is loaded at startup and can be reloaded at runtime without restarting the bot. The main entry point registers all cogs, sets up logging, and connects to Discord's gateway.

Cog Structure

Cogs encapsulate both commands and event listeners. This means the spam detection system, for instance, registers its own on_message listener in its own class rather than polluting a central dispatcher. The XP system does the same — it listens for messages, applies the cooldown, and writes to databases entirely within its cog.

# cog registration at startup
initial_extensions = [
    'cogs.xp',
    'cogs.moderation',
    'cogs.spam',
    'cogs.games',
    'cogs.sparkle',
    'cogs.fun',
    'cogs.utility',
    'cogs.admin',
    'cogs.stats',
    # ... 9 more
]

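from discord.ext import commands

class Lacie(commands.Bot):   # class name is illustrative
    async def setup_hook(self):
        # setup_hook runs once, before the bot connects to the gateway
        for ext in initial_extensions:
            await self.load_extension(ext)
        await self.tree.sync()   # sync slash command tree with Discord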

Database Layout

The bot uses separate SQLite databases for different data lifetimes. The XP system uses five databases (one per time scope). Moderation data, sparkles, and game stats each live in their own database files. This keeps queries simple: you never need a cross-database join between, say, lifetime XP and minesweeper stats.

Database         Contents                        Type
xp_lifetime.db   XP, level, last message time    SQLite (sync)
xp_annual.db     Annual leaderboard XP           SQLite (sync)
xp_monthly.db    Monthly leaderboard XP          SQLite (sync)
xp_weekly.db     Weekly leaderboard XP           SQLite (sync)
xp_daily.db      Daily leaderboard XP            SQLite (sync)
moderation.db    Warnings, mutes, bans           aiosqlite (async)
sparkles.db      Sparkle records per user        SQLite + thread pool
games.db         Minesweeper stats               SQLite (sync)
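
The XP code later in this paper calls a get_db helper that isn't shown. A minimal sketch of what such a helper might look like, assuming one file per scope named as in the table above and a single xp table whose columns match the queries used later. The db_dir parameter, the XP_SCOPES tuple, and the exact schema are assumptions for illustration, not the bot's actual code:

```python
import sqlite3
from pathlib import Path

XP_SCOPES = ("lifetime", "annual", "monthly", "weekly", "daily")

def get_db(db_type: str, db_dir: str = "."):
    """Open the XP database for one time scope and return (connection, cursor)."""
    if db_type not in XP_SCOPES:
        raise ValueError(f"unknown XP scope: {db_type!r}")
    conn = sqlite3.connect(Path(db_dir) / f"xp_{db_type}.db")
    cur = conn.cursor()
    # Assumed schema: only the columns that the queries in this paper touch.
    cur.execute(
        "CREATE TABLE IF NOT EXISTS xp ("
        "user_id TEXT PRIMARY KEY, "
        "xp INTEGER NOT NULL DEFAULT 0, "
        "level INTEGER NOT NULL DEFAULT 0, "
        "last_message INTEGER NOT NULL DEFAULT 0)"
    )
    conn.commit()
    return conn, cur
```

Opening a fresh connection per call matches how the XP code uses it (open, write, commit, close), which is simple but does mean five connects per qualifying message.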

XP System

Every message that qualifies for XP writes to five separate databases in a single function call. The five time-scoped leaderboards (lifetime, annual, monthly, weekly, daily) let the server have both long-term rankings and short-term competitive resets without storing redundant data — each database independently tracks only XP for its scope and gets wiped on the appropriate reset cycle.

Cooldown and Award

The cooldown (60 seconds between XP gains) is checked against the lifetime database only, so there is one source of truth for timing. If the user is still on cooldown, all five writes are skipped immediately. Base XP is a random integer between 50 and 100 (inclusive), rolled once after the cooldown check so the same value is used across all five databases for that message.

import random
import time

import discord

async def add_xp(user):
    if not isinstance(user, discord.Member):
        return

    # cooldown checked against lifetime DB only: single source of truth
    conn_check, cur_check = get_db("lifetime")
    cur_check.execute("SELECT last_message FROM xp WHERE user_id = ?", (str(user.id),))
    row = cur_check.fetchone()
    conn_check.close()
    if row and not can_get_xp(row[0]):
        return

    base_xp = random.randint(50, 100)   # one roll, used across all five DBs
    now = int(time.time())

    leaderboard_types = [
        ("lifetime", True),   # role multiplier applied only here
        ("annual",   False),
        ("monthly",  False),
        ("weekly",   False),
        ("daily",    False),
    ]
    for db_type, apply_multiplier in leaderboard_types:
        conn, cur = get_db(db_type)
        cur.execute("SELECT xp, level FROM xp WHERE user_id = ?", (str(user.id),))
        row = cur.fetchone()

        gained = int(base_xp * get_multiplier(user, apply_multiplier=apply_multiplier))
        if row is None:
            # first message in this scope: an UPDATE alone would match no rows
            cur.execute(
                "INSERT INTO xp (user_id, xp, level, last_message) VALUES (?, ?, 0, ?)",
                (str(user.id), gained, now)
            )
        else:
            cur.execute(
                "UPDATE xp SET xp = ?, last_message = ? WHERE user_id = ?",
                (row[0] + gained, now, str(user.id))
            )
        conn.commit()
        await check_level_up(user, cur, conn, lifetime=(db_type == "lifetime"))
        conn.close()
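
The can_get_xp helper used in the cooldown check isn't shown above. A minimal sketch of how it might work, assuming last_message is stored as Unix seconds (which is what add_xp writes) and that the cooldown boundary is inclusive. The constant name and the optional now parameter are my own additions for illustration:

```python
import time
from typing import Optional

XP_COOLDOWN_SECONDS = 60  # the cooldown stated in the text

def can_get_xp(last_message: int, now: Optional[float] = None) -> bool:
    """Return True once the cooldown has elapsed since last_message (Unix seconds)."""
    if now is None:
        now = time.time()
    return now - last_message >= XP_COOLDOWN_SECONDS
```

Passing now explicitly makes the cooldown easy to check without waiting a real minute: can_get_xp(1000, now=1059) is False and can_get_xp(1000, now=1060) is True.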

Level Curve

The XP threshold per level follows a cubic polynomial. At low levels the steps are small enough that new users rank up fairly quickly. At high levels the curve steepens so the upper end of the leaderboard represents genuine long-term activity. The threshold is rounded down to the nearest 100 so level boundaries always land on clean numbers.

import math

def xp_for_level(level: int) -> int:
    # cubic: grows slowly at first, steeply at high levels
    xp = (level**3 * 1) + (level**2 * 50) + (level * 100)
    return int(math.floor(xp / 100) * 100)

Level   XP Required
1       100
5       1,800
10      7,000
25      49,300
50      255,000
100     1,510,000

Role Multipliers

Server members with certain roles earn bonus XP, but only on the lifetime leaderboard. The time-scoped boards (annual, monthly, etc.) use a flat multiplier of 1 so they remain a fair measure of recent activity rather than a reflection of server role status. The get_multiplier function inspects the member's roles and returns the highest applicable multiplier, or 1 if none match.

MULTIPLIERS = {
    role_id_booster:    1.25,
    role_id_active:     1.5,
    role_id_supporter:  2.0,
    role_id_partner:    2.5,
    role_id_staff:      3.0,
}

def get_multiplier(member, apply_multiplier=True):
    if not apply_multiplier:
        return 1
    # max() over a tuple: 1 (base) plus every matching role multiplier
    return max((1, *(MULTIPLIERS[r.id] for r in member.roles if r.id in MULTIPLIERS)))

Level-Up Detection

After each XP write, check_level_up computes the level the user should be at given their current cumulative XP total, compares it to the stored level, and if they've crossed a threshold it updates the level and sends a congratulations message. Level-up announcements are only sent for the lifetime database — the time-scoped boards don't announce levels since they reset periodically.

async def check_level_up(user, cur, conn, lifetime=False):
    cur.execute("SELECT xp, level FROM xp WHERE user_id = ?", (str(user.id),))
    row = cur.fetchone()
    if not row: return
    current_xp, current_level = row

    # find the highest level whose threshold the user has passed
    new_level = current_level
    while current_xp >= xp_for_level(new_level + 1):
        new_level += 1

    if new_level > current_level:
        cur.execute("UPDATE xp SET level = ? WHERE user_id = ?", (new_level, str(user.id)))
        conn.commit()
        if lifetime:
            await announce_level_up(user, new_level)
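
The threshold search inside check_level_up can be pulled out into a pure function, which makes the curve easy to poke at on its own. A small sketch using the same formula as xp_for_level, rewritten with integer arithmetic. The level_for_xp name and its start_level parameter are mine:

```python
def xp_for_level(level: int) -> int:
    # same cubic as in the Level Curve section, rounded down to a multiple of 100
    raw = level**3 + 50 * level**2 + 100 * level
    return raw // 100 * 100

def level_for_xp(total_xp: int, start_level: int = 0) -> int:
    """Highest level whose threshold total_xp has reached."""
    level = start_level
    while total_xp >= xp_for_level(level + 1):
        level += 1
    return level
```

The loop matters because a single award can cross more than one threshold at low levels: level 1 needs 100 XP and level 2 needs 400, so a new user who earns a 3.0x lifetime award of 300 on top of 100 existing XP jumps straight to level 2.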

Spam Detection

The spam system intercepts every incoming message without blocking the event loop. It uses an async queue so on_message returns immediately and actual spam analysis happens in a separate task. Per-user message history is stored in bounded deques so old entries fall off automatically without any explicit cleanup.

Queue Architecture

The on_message listener only does one thing: push the message into a queue and return. A tasks.loop running every 100ms drains up to 10 messages per cycle. This decoupling means a spam burst (say, 50 messages in a second) queues up without stalling the bot's event loop — everything else continues to work normally while the queue drains.

import asyncio
from collections import defaultdict, deque

from discord.ext import commands, tasks

class SpamDetection(commands.Cog):
    def __init__(self, bot):
        self.bot           = bot
        self.message_queue = asyncio.Queue()
        self.user_messages = defaultdict(lambda: deque(maxlen=50))
        self.process_message_queue.start()    # the loop does not run until started

    def cog_unload(self):
        self.process_message_queue.cancel()   # stop draining when the cog is unloaded

    @commands.Cog.listener()
    async def on_message(self, message):
        if message.author.bot or not message.guild: return
        self.message_queue.put_nowait(message)   # unbounded queue: never blocks

    @tasks.loop(seconds=0.1)
    async def process_message_queue(self):
        for _ in range(10):          # process up to 10 per 100ms cycle
            try:
                msg = self.message_queue.get_nowait()
            except asyncio.QueueEmpty:
                break
            self.user_messages[msg.author.id].append(
                (msg.created_at, msg.channel.id, msg.content)
            )
            await self._process_message(msg)
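
To get a feel for the drain rate, here is a standalone simulation of the same batch-of-10 pattern using plain asyncio, with no Discord objects involved. The drain_batch and simulate_burst names are hypothetical, and the ticks here are loop iterations rather than real 100ms intervals:

```python
import asyncio

BATCH_SIZE = 10  # messages handled per tick, as in the loop above

def drain_batch(queue: asyncio.Queue, handled: list) -> int:
    """Pull at most BATCH_SIZE items off the queue; return how many were taken."""
    taken = 0
    for _ in range(BATCH_SIZE):
        try:
            item = queue.get_nowait()
        except asyncio.QueueEmpty:
            break
        handled.append(item)
        taken += 1
    return taken

async def simulate_burst(n_messages: int) -> int:
    """Queue a burst, then count the ticks needed to drain it."""
    queue: asyncio.Queue = asyncio.Queue()
    for i in range(n_messages):
        queue.put_nowait(i)          # what on_message does: enqueue and return
    handled: list = []
    ticks = 0
    while not queue.empty():
        drain_batch(queue, handled)
        ticks += 1
        await asyncio.sleep(0)       # yield to other tasks between ticks
    assert handled == list(range(n_messages))  # FIFO order is preserved
    return ticks

ticks = asyncio.run(simulate_burst(50))
```

A 50-message burst drains in 5 ticks, which at one tick per 100ms is about half a second of added detection latency in the worst case.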

Detection Patterns

Two patterns are checked on every message. The first catches classic channel floods: 10 or more messages in the same channel within a 5-second window. The second catches raid behaviour: a single user posting across 10 or more different channels within 5 seconds — a pattern typical of coordinated raids where bots spread across a server simultaneously.

async def check_spam_patterns(self, member, guild):
    messages = self.user_messages[member.id]
    cutoff   = datetime.now(timezone.utc) - timedelta(seconds=5)
    recent   = [m for m in messages if m[0] >= cutoff]

    if len(recent) < 2: return None

    # Pattern 1: flood in one channel (≥10 messages in 5s)
    channel_counts = defaultdict(int)
    for _, ch_id, _ in recent:
        channel_counts[ch_id] += 1
    for ch_id, count in channel_counts.items():
        if count >= 10:
            return {"type": "same_channel", "channel_id": ch_id, "count": count}

    # Pattern 2: cross-channel raid (≥10 different channels in 5s)
    unique_channels = len(set(m[1] for m in recent))
    if unique_channels >= 10:
        return {"type": "multiple_channels", "channel_count": unique_channels}

    return None
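
The two patterns are easy to exercise outside the bot with synthetic history tuples. A sketch of the same logic as a pure function, assuming the (created_at, channel_id, content) tuple layout used above. The classify name, the constants, and the sample data are made up for the example:

```python
from collections import Counter
from datetime import datetime, timedelta, timezone

WINDOW = timedelta(seconds=5)
THRESHOLD = 10

def classify(history, now):
    """history: iterable of (created_at, channel_id, content) tuples."""
    recent = [m for m in history if m[0] >= now - WINDOW]
    per_channel = Counter(ch_id for _, ch_id, _ in recent)
    for ch_id, count in per_channel.items():
        if count >= THRESHOLD:
            return {"type": "same_channel", "channel_id": ch_id, "count": count}
    if len(per_channel) >= THRESHOLD:
        return {"type": "multiple_channels", "channel_count": len(per_channel)}
    return None

now = datetime(2024, 1, 1, 12, 0, 0, tzinfo=timezone.utc)
# ten messages in one channel, 0.4s apart: a flood
flood = [(now - timedelta(seconds=i * 0.4), 111, "spam") for i in range(10)]
# ten messages in ten different channels, 0.4s apart: a raid
raid = [(now - timedelta(seconds=i * 0.4), 200 + i, "spam") for i in range(10)]
# ten messages in one channel, 2s apart: only three fall inside the window
slow = [(now - timedelta(seconds=i * 2), 111, "hello") for i in range(10)]
```

Here classify(flood, now) reports same_channel, classify(raid, now) reports multiple_channels, and classify(slow, now) returns None.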

Response and Escalation

When spam is confirmed, the bot immediately mutes the user and sends an interactive staff alert. The alert embed includes the spam pattern type, message count, and action buttons for staff to kick, ban, or dismiss. If staff takes no action within 12 hours, a background task applies a default 1-day mute automatically — so the system degrades gracefully even when staff aren't online.

# staff alert with resolution buttons; button confirmations are ephemeral to keep channels clean
class SpamActionView(discord.ui.View):
    def __init__(self, user_id, guild_id, spam_data):
        super().__init__(timeout=None)
        self.user_id   = user_id
        self.guild_id  = guild_id
        self.spam_data = spam_data

    @discord.ui.button(label="Kick", style=discord.ButtonStyle.danger)
    async def kick_button(self, interaction, button):
        guild  = interaction.client.get_guild(self.guild_id)
        member = guild.get_member(self.user_id)
        if member: await member.kick(reason="Spam — staff action")
        await interaction.response.send_message("User kicked.", ephemeral=True)
        self.cleanup_pending(self.spam_data["message_id"])

    @discord.ui.button(label="Ban", style=discord.ButtonStyle.danger)
    async def ban_button(self, interaction, button):
        guild = interaction.client.get_guild(self.guild_id)
        await guild.ban(discord.Object(id=self.user_id), reason="Spam — staff action")
        await interaction.response.send_message("User banned.", ephemeral=True)
        self.cleanup_pending(self.spam_data["message_id"])

    @discord.ui.button(label="Dismiss", style=discord.ButtonStyle.secondary)
    async def dismiss_button(self, interaction, button):
        self.cleanup_pending(self.spam_data["message_id"])
        await interaction.response.send_message("Alert dismissed.", ephemeral=True)


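# fallback: auto-apply if staff ignores the alert for 12 hours
@tasks.loop(minutes=1)
async def check_pending_actions(self):
    # c is a sqlite3 cursor created elsewhere (not shown); rows are read by
    # column name, so its connection needs row_factory = sqlite3.Row
    now = datetime.now(timezone.utc).isoformat()
    c.execute("SELECT * FROM spam_actions WHERE expires_at <= ?", (now,))
    for row in c.fetchall():
        await self.apply_default_action(row["user_id"], row["guild_id"], row["spam_data"])
        c.execute("DELETE FROM spam_actions WHERE message_id = ?", (row["message_id"],))
    c.connection.commit()   # persist the deletes so handled alerts are not re-applied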
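
The expiry query relies on comparing ISO-8601 timestamp strings. A self-contained sketch of that pattern against an in-memory database, using the spam_actions table and column names from the query above. The column types, the record_pending and expired helpers, and storing spam_data as JSON are assumptions:

```python
import json
import sqlite3
from datetime import datetime, timedelta, timezone

ESCALATION_DELAY = timedelta(hours=12)

conn = sqlite3.connect(":memory:")
conn.row_factory = sqlite3.Row          # lets rows be read by column name
conn.execute(
    "CREATE TABLE spam_actions ("
    "message_id INTEGER PRIMARY KEY, user_id INTEGER, guild_id INTEGER, "
    "spam_data TEXT, expires_at TEXT)"
)

def record_pending(message_id, user_id, guild_id, spam_data, now):
    """Store an alert together with the moment its default action falls due."""
    expires_at = (now + ESCALATION_DELAY).isoformat()
    conn.execute(
        "INSERT INTO spam_actions VALUES (?, ?, ?, ?, ?)",
        (message_id, user_id, guild_id, json.dumps(spam_data), expires_at),
    )
    conn.commit()

def expired(now):
    """Message IDs of alerts whose deadline has passed."""
    rows = conn.execute(
        "SELECT * FROM spam_actions WHERE expires_at <= ?", (now.isoformat(),)
    )
    return [row["message_id"] for row in rows]

t0 = datetime(2024, 1, 1, 0, 0, tzinfo=timezone.utc)
record_pending(1, 42, 7, {"type": "same_channel", "count": 12}, t0)
```

The string comparison is only chronological when every timestamp carries the same UTC offset, which is why both the writer and the reader should use timezone-aware UTC datetimes.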

Minesweeper

The minesweeper game implements a full 13×13 board directly in Discord using button components. Each cell is a button; revealed cells show their adjacent mine count or are blank if zero. The board is generated lazily — mines aren't placed until the first click, which guarantees the first cell is always safe.

Safe First Click

When the player clicks their first cell, the board calls setup_board with the clicked row and column. Every cell in the 3×3 block centred on that cell is excluded from the mine placement candidates. The clicked cell therefore has zero adjacent mines, so the first click, and the flood-fill it triggers, always lands on safe ground.

def setup_board(self, safe_row: int, safe_col: int):
    # exclude the 3×3 safe zone from mine placement
    candidates = [
        (r, c) for r in range(self.rows)
               for c in range(self.cols)
               if not (abs(r - safe_row) <= 1 and abs(c - safe_col) <= 1)
    ]
    mine_positions = random.sample(candidates, self.mine_count)
    for r, c in mine_positions:
        self.board[r][c] = -1    # -1 = mine

    # fill in adjacent mine counts for all non-mine cells
    for r in range(self.rows):
        for c in range(self.cols):
            if self.board[r][c] != -1:
                self.board[r][c] = sum(
                    1 for dr in (-1, 0, 1)
                      for dc in (-1, 0, 1)
                      if not (dr == dc == 0)
                      and 0 <= r+dr < self.rows
                      and 0 <= c+dc < self.cols
                      and self.board[r+dr][c+dc] == -1
                )
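
The candidate filter is the part worth testing in isolation. A sketch of just the placement step as a free function, with an injectable random source so the result is repeatable. The place_mines name, the rng parameter, and the mine count of 30 are assumptions for the example:

```python
import random

def place_mines(rows, cols, mine_count, safe_row, safe_col, rng=random):
    """Return a set of mine coordinates that avoids the 3x3 block around the first click."""
    candidates = [
        (r, c) for r in range(rows) for c in range(cols)
        if not (abs(r - safe_row) <= 1 and abs(c - safe_col) <= 1)
    ]
    if mine_count > len(candidates):
        raise ValueError("too many mines for this board")
    return set(rng.sample(candidates, mine_count))

mines = place_mines(13, 13, 30, safe_row=6, safe_col=6, rng=random.Random(0))
```

On a 13×13 board a central click removes 9 of the 169 cells from the candidate list, while a corner click removes only 4, because the rest of its 3×3 block lies off the board.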

BFS Flood-Fill

Clicking a cell with zero adjacent mines triggers an auto-reveal of all connected empty cells — matching the behaviour of standard minesweeper. This is a breadth-first search starting from the clicked cell. Empty cells (value 0) are added to the queue and their neighbours continue the spread. Cells with an adjacent mine count (1–8) are revealed but not queued — they form the boundary of the flood.

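def _flood_fill(self, start_row: int, start_col: int):
    # assumes the caller only invokes this for a cell whose value is 0
    self.revealed[start_row][start_col] = True
    queue   = deque([(start_row, start_col)])
    visited = {(start_row, start_col)}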

    while queue:
        row, col = queue.popleft()
        for dr in (-1, 0, 1):
            for dc in (-1, 0, 1):
                if dr == dc == 0: continue
                nr, nc = row + dr, col + dc
                if not (0 <= nr < self.rows and 0 <= nc < self.cols): continue
                if (nr, nc) in visited or self.flags[nr][nc]: continue
                visited.add((nr, nc))
                self.revealed[nr][nc] = True
                if self.board[nr][nc] == 0:
                    queue.append((nr, nc))   # only empty cells continue the fill
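
Here is the same fill run on a small hand-built board, which makes the boundary behaviour visible. This is a standalone sketch (no flags, and a set of revealed cells instead of the revealed grid), not the bot's method:

```python
from collections import deque

def flood_fill(board, start):
    """Return the set of cells revealed by clicking `start`, which must hold 0."""
    rows, cols = len(board), len(board[0])
    revealed = {start}
    queue = deque([start])
    while queue:
        row, col = queue.popleft()
        for dr in (-1, 0, 1):
            for dc in (-1, 0, 1):
                nr, nc = row + dr, col + dc
                if not (0 <= nr < rows and 0 <= nc < cols):
                    continue
                if (nr, nc) in revealed:
                    continue
                revealed.add((nr, nc))
                if board[nr][nc] == 0:
                    queue.append((nr, nc))   # numbered cells are revealed but not expanded
    return revealed

# -1 is a mine; the other values are adjacent-mine counts
board = [
    [0, 0, 1, -1],
    [0, 0, 1,  1],
    [0, 0, 0,  0],
]
revealed = flood_fill(board, (0, 0))
```

Clicking the top-left corner reveals 11 of the 12 cells. The mine at (0, 3) stays hidden because all of its neighbours are numbered cells, and numbered cells never expand the fill.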

Discord UI Integration

A 13×13 board has 169 cells, far more than Discord allows in a single message: a message can hold at most 5 action rows of 5 buttons each, 25 buttons in total. The full board therefore doesn't fit in a single message view, so the game uses paginated rows, and each button click re-renders the board state through a message edit. Each button carries its row/column coordinates encoded in its custom_id, so the click handler knows exactly which cell was pressed without any external state lookup.

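def build_view(self, game: MinesweeperGame, top: int = 0, left: int = 0) -> discord.ui.View:
    # A View holds at most 25 items (5 rows of 5), so render one 5×5 window
    # of the board. The top/left paging parameters are an assumed scheme;
    # how the visible page is chosen is not covered here.
    view = discord.ui.View(timeout=300)
    for r in range(top, min(top + 5, game.rows)):
        for c in range(left, min(left + 5, game.cols)):
            label, style = self._cell_display(game, r, c)
            btn = discord.ui.Button(
                label=label,
                style=style,
                custom_id=f"ms_{r}_{c}",   # row/col encoded in custom_id
                row=r - top               # action row 0-4 within the window
            )
            btn.callback = self._make_callback(r, c)
            view.add_item(btn)
    return view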


def _cell_display(self, game, r, c):
    if game.flags[r][c]:
        return "🚩", discord.ButtonStyle.danger
    if not game.revealed[r][c]:
        return "·", discord.ButtonStyle.secondary
    val = game.board[r][c]
    if val == -1:
        return "💣", discord.ButtonStyle.danger
    if val == 0:
        return " ", discord.ButtonStyle.success
    return str(val), discord.ButtonStyle.primary
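
The custom_id encoding and the page arithmetic can both be checked without Discord. A small sketch, assuming the ms_{row}_{col} format shown above and a board tiled into 5×5 windows. The function names and the tiling itself are my assumptions:

```python
import math

MAX_ROWS, MAX_PER_ROW = 5, 5          # Discord's per-message component limits

def encode_cell(row: int, col: int) -> str:
    return f"ms_{row}_{col}"

def decode_cell(custom_id: str):
    """Inverse of encode_cell: 'ms_3_12' -> (3, 12)."""
    prefix, row, col = custom_id.split("_")
    if prefix != "ms":
        raise ValueError(f"not a minesweeper button: {custom_id!r}")
    return int(row), int(col)

def pages_needed(rows: int, cols: int) -> int:
    """Number of 5x5 windows needed to cover the whole board."""
    return math.ceil(rows / MAX_ROWS) * math.ceil(cols / MAX_PER_ROW)
```

For a 13×13 board, pages_needed returns 9: three windows across and three down, with the last window in each direction only partly filled.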

Sparkle System

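The sparkle system awards rare reactions to messages whose Discord snowflake ID ends in a specific pattern. Discord message IDs are 64-bit snowflakes: the high bits encode a millisecond timestamp and the low bits hold worker, process, and sequence fields, so IDs increase roughly in step with time. In practice the trailing decimal digits are close to uniformly distributed, which makes them a convenient probability source with no external RNG required.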

Three tiers exist: a message ending in 000 is approximately 1-in-1000 (regular sparkle); ending in 0000 is roughly 1-in-10,000 (rare); ending in 00000 is roughly 1-in-100,000 (epic). The check is a simple string suffix test that tries the longest suffix first and stops at the first match, so a message ending in 00000 only triggers the epic tier, not all three.

CHANCES = {
    "epic":    ("00000", "💫", "an **epic sparkle**"),
    "rare":    ("0000",  "🌟", "a **rare sparkle**"),
    "regular": ("000",   "✨", "a **regular sparkle**"),
}

@commands.Cog.listener()
async def on_message(self, message):
    if message.author.bot or not message.guild: return
    msg_id = str(message.id)

    for sparkle_type, (suffix, emoji, desc) in CHANCES.items():
        if msg_id.endswith(suffix):
            await message.add_reaction(emoji)
            await message.reply(
                f"**{message.author.name}** got {desc}! {emoji}",
                mention_author=False
            )
            await asyncio.to_thread(self._db_write, message, sparkle_type)
            break   # break: 00000 matches "000" too — only award once

The database write is pushed to a thread pool with asyncio.to_thread so the blocking SQLite call doesn't stall the async event loop. Sparkles are rare enough that the write queue never backs up, so no queue management is needed here — unlike the spam system.
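
The tier odds can be sanity-checked by running the suffix test over a block of consecutive integers. A sketch with the tier logic as a pure function. Real snowflakes aren't consecutive, so this only confirms the arithmetic of the suffix test, not the distribution of live message IDs:

```python
TIERS = (("epic", "00000"), ("rare", "0000"), ("regular", "000"))  # longest suffix first

def sparkle_tier(message_id: int):
    """Return the tier name for a message ID, or None if it earns no sparkle."""
    text = str(message_id)
    for name, suffix in TIERS:
        if text.endswith(suffix):
            return name
    return None

# Count tiers over one million consecutive IDs to check the stated odds.
counts = {"epic": 0, "rare": 0, "regular": 0}
for message_id in range(1, 1_000_001):
    tier = sparkle_tier(message_id)
    if tier:
        counts[tier] += 1
```

Out of one million IDs this gives 10 epic, 90 rare, and 900 regular. Because the tiers are nested and only the highest one is awarded, a regular sparkle is slightly rarer than 1-in-1000 (900 per million), and 1,000 per million IDs earn a sparkle of any kind.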

Unified Permission Check

discord.py has two separate command systems: legacy prefix commands (@commands.command) that respond to text like !ban, and newer slash commands (@app_commands.command) that use Discord's native UI. The two systems have different API surfaces: prefix commands receive a Context object, while slash commands receive an Interaction. They also respond differently: ctx.send() vs interaction.response.send_message().

Rather than duplicating permission logic in every command, is_admin() returns a single predicate that detects which system it's running under and adapts accordingly. The same decorator works on both command types, which means a staff command added once gets consistent permission enforcement whether it's invoked as a prefix command or a slash command.

@staticmethod
def is_admin():
    """Decorator that works for both prefix commands and slash commands."""
    async def predicate(target):
        # target is Context for prefix commands, Interaction for slash
        user           = getattr(target, "author", None) or getattr(target, "user", None)
        is_interaction = hasattr(target, "response")

        async def send_message(msg, ephemeral=False):
            if is_interaction:
                if not target.response.is_done():
                    await target.response.send_message(msg, ephemeral=ephemeral)
                else:
                    await target.followup.send(msg, ephemeral=ephemeral)
            else:
                await target.send(msg)

        # getattr guards DMs, where the user is a discord.User with no roles
        has_admin_role = any(r.id in ADMIN_ROLE_IDS for r in getattr(user, "roles", []))
        is_owner       = user.id == LILAC_ID

        if not (has_admin_role or is_owner):
            await send_message("You do not have permission to use this command.",
                               ephemeral=is_interaction)
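            # each command system expects its own CheckFailure type
            failure = app_commands.CheckFailure if is_interaction else commands.CheckFailure
            raise failure("User lacks admin permissions.")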
        return True

    # apply the same predicate to both command systems with one decorator call
    def decorator(func):
        func = commands.check(predicate)(func)
        func = app_commands.check(predicate)(func)
        return func
    return decorator

Applied to a command it looks like this — the decorator is the same regardless of which command type is being decorated:

@is_admin()
@commands.command()
async def purge(ctx, amount: int):
    await ctx.channel.purge(limit=amount)

@is_admin()
@app_commands.command(name="purge")
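async def purge_slash(interaction: discord.Interaction, amount: int):
    # bulk deletes can outlast the 3-second interaction window, so defer first
    await interaction.response.defer(ephemeral=True)
    await interaction.channel.purge(limit=amount)
    await interaction.followup.send("Done.", ephemeral=True)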
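
The duck-typing at the heart of the predicate can be tried out with stand-in objects instead of real discord.py ones. A sketch that mirrors the attribute checks above. The helper names and the role and user IDs are made up, and SimpleNamespace is only standing in for Context and Interaction:

```python
from types import SimpleNamespace

def resolve_user(target):
    """Context exposes .author, Interaction exposes .user; return whichever exists."""
    return getattr(target, "author", None) or getattr(target, "user", None)

def is_interaction(target) -> bool:
    """Interaction objects carry a .response attribute; Context objects do not."""
    return hasattr(target, "response")

def is_allowed(target, admin_role_ids, owner_id) -> bool:
    user = resolve_user(target)
    roles = getattr(user, "roles", [])
    return user.id == owner_id or any(r.id in admin_role_ids for r in roles)

# Stand-ins carrying only the attributes the check reads.
staff = SimpleNamespace(id=1, roles=[SimpleNamespace(id=500)])
guest = SimpleNamespace(id=2, roles=[SimpleNamespace(id=900)])
fake_ctx = SimpleNamespace(author=staff)
fake_interaction = SimpleNamespace(user=guest, response=object())
```

With 500 as the only admin role, is_allowed(fake_ctx, {500}, owner_id=99) is True and is_allowed(fake_interaction, {500}, owner_id=99) is False, while the same guest passes if their ID is the owner's.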

Leaderboard Resets

The time-scoped leaderboard databases (daily, weekly, monthly, annual) are cleared on schedule rather than queried with date filters. This keeps leaderboard queries simple — a SELECT * FROM xp ORDER BY xp DESC always returns the current period's data with no date arithmetic — at the cost of a scheduled reset task for each time scope.

Resets are implemented as scheduled async tasks that clear the appropriate database and log the event. The schedule is evaluated at startup so a restart during a reset window still correctly identifies whether a reset has already run that day/week/month.

from datetime import datetime, time as dtime, timezone

# dtime alias: "datetime" here is the class, so datetime.time(0, 0) would not build a time of day
@tasks.loop(time=dtime(0, 0, tzinfo=timezone.utc))   # fires at midnight UTC
async def daily_reset(self):
    conn, cur = get_db("daily")
    cur.execute("DELETE FROM xp")
    conn.commit()
    conn.close()
    await self.bot.get_channel(RESET_LOG_CHANNEL).send(
        "[SYSTEM] Daily leaderboard reset."
    )

@tasks.loop(time=dtime(0, 0, tzinfo=timezone.utc))   # checked daily, fires only on Monday
async def weekly_reset(self):
    if datetime.now(timezone.utc).weekday() != 0: return
    conn, cur = get_db("weekly")
    cur.execute("DELETE FROM xp")
    conn.commit()
    conn.close()
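
The startup check for whether a reset has already run isn't shown above. One way it could work is to label each period with a key and compare it to the key stored at the last reset. This is a sketch under that assumption: period_key, reset_due, and the idea of persisting a key are hypothetical and not taken from the bot's code:

```python
from datetime import datetime, timezone

def period_key(scope: str, now: datetime) -> str:
    """Label for the reset period containing `now` (evaluated in UTC)."""
    now = now.astimezone(timezone.utc)
    if scope == "daily":
        return now.strftime("%Y-%m-%d")
    if scope == "weekly":
        iso = now.isocalendar()            # ISO weeks start on Monday
        return f"{iso[0]}-W{iso[1]:02d}"
    if scope == "monthly":
        return now.strftime("%Y-%m")
    if scope == "annual":
        return now.strftime("%Y")
    raise ValueError(f"unknown scope: {scope!r}")

def reset_due(scope: str, last_reset_key: str, now: datetime) -> bool:
    """True if no reset has been recorded yet for the current period."""
    return period_key(scope, now) != last_reset_key
```

If the bot was offline at midnight on Monday, comparing keys at startup still catches the missed weekly reset, because the stored key belongs to the previous ISO week.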