Overview

Lacie is a Discord bot I've been building for my own server. It's organised as a cog-based discord.py application with around 18 feature modules covering an XP/levelling system, moderation tooling, mini-games, and various fun commands. The whole thing is open source on GitHub.

Some of the more technically interesting pieces: the XP system tracks progress across five parallel time-scoped databases (lifetime, annual, monthly, weekly, daily), updating all of them in a single pass per message (each database is committed separately). The spam protection system uses an async message queue and per-user sliding deques to detect flood patterns in real time. The minesweeper game implements a full 13×13 board with BFS flood-fill, safe-first-click generation, and per-user stats.

Python · discord.py · SQLite · aiosqlite · Asyncio · Pillow

Code

// XP system — five parallel databases per message
Every message that earns XP writes to five separate SQLite databases in a single function call — lifetime, annual, monthly, weekly, and daily. The cooldown is checked against the lifetime DB first; if the user is still on cooldown, all five databases are skipped entirely. The role multiplier (1.25× to 3×) is applied only to lifetime XP so the time-scoped leaderboards stay fair.
async def add_xp(user):
    """Award one randomised XP gain to ``user`` on every leaderboard.

    Users that are not ``discord.Member`` instances are ignored. The cooldown
    is read from the lifetime database only; when ``can_get_xp`` rejects the
    stored ``last_message`` value, nothing is written to any database.
    Otherwise a single base amount (50-100) is rolled and added to each of the
    five databases, with the role multiplier applied to lifetime XP only.

    Each database is updated and committed on its own connection, so the five
    writes are not a single transaction.
    """
    if not isinstance(user, discord.Member):
        return

    user_key = str(user.id)

    # check cooldown against lifetime DB only (60s between XP gains)
    conn_check, cur_check = get_db("lifetime")
    try:
        cur_check.execute("SELECT last_message FROM xp WHERE user_id = ?", (user_key,))
        row = cur_check.fetchone()
    finally:
        # always release the connection, even if the query raises
        conn_check.close()
    if row and not can_get_xp(row[0]):
        return

    base_xp = random.randint(50, 100)
    # one timestamp for all five rows so the databases agree with each other
    now = int(time.time())

    # write to all five leaderboard databases in one pass
    leaderboard_types = [
        ("lifetime", True),   # apply role multiplier
        ("annual",   False),
        ("monthly",  False),
        ("weekly",   False),
        ("daily",    False),
    ]
    for db_type, apply_multiplier in leaderboard_types:
        conn, cur = get_db(db_type)
        try:
            cur.execute("SELECT xp FROM xp WHERE user_id = ?", (user_key,))
            row = cur.fetchone()
            gained = int(base_xp * get_multiplier(user, apply_multiplier=apply_multiplier))

            if row is None:
                # First XP for this user in this database: an UPDATE would
                # match zero rows and the gain would be lost, so create the row.
                # NOTE(review): assumes the xp table has no other required
                # columns beyond the four referenced here — confirm schema.
                cur.execute(
                    "INSERT INTO xp (user_id, xp, level, last_message) VALUES (?, ?, ?, ?)",
                    (user_key, gained, 0, now),
                )
            else:
                cur.execute(
                    "UPDATE xp SET xp = ?, last_message = ? WHERE user_id = ?",
                    (row[0] + gained, now, user_key),
                )
            conn.commit()
            await check_level_up(user, cur, conn, lifetime=(db_type == "lifetime"))
        finally:
            conn.close()


def xp_for_level(level: int) -> int:
    """Return the XP figure for ``level``, rounded down to a multiple of 100.

    The raw curve is ``level**3 + 50 * level**2 + 100 * level``: the cubic
    term dominates at high levels, so it grows slowly at first and steeply
    later.
    """
    raw = level**3 + 50 * level**2 + 100 * level
    # Floor-division keeps the rounding exact for arbitrarily large ints;
    # going through float division can lose precision.
    return int(raw // 100 * 100)


# Role multipliers: maps a role ID to the XP multiplier that role grants, up to
# 3x. get_multiplier() takes the largest value among a member's roles, so
# multipliers do not stack. The role_id_* names are defined elsewhere in the
# module.
MULTIPLIERS = {
    role_id_booster:    1.25,
    role_id_active:     1.5,
    role_id_supporter:  2.0,
    role_id_partner:    2.5,
    role_id_staff:      3.0,
}

def get_multiplier(member, apply_multiplier=True):
    """Return the XP multiplier for ``member``.

    With ``apply_multiplier`` false the result is always 1. Otherwise it is
    the largest ``MULTIPLIERS`` value among the member's roles, never less
    than 1 (a member with no listed role gets 1).
    """
    if apply_multiplier:
        # seed with the baseline so max() always has something to compare
        candidates = [1]
        for role in member.roles:
            if role.id in MULTIPLIERS:
                candidates.append(MULTIPLIERS[role.id])
        return max(candidates)
    return 1
// Minesweeper — board generation and BFS flood-fill
The board is only generated on the first click, and mines are excluded from a 3×3 safe zone around that cell — so the first move can never be a mine. Revealing a cell with zero adjacent mines triggers BFS flood-fill to auto-reveal all connected empty cells, matching the behaviour of standard minesweeper.
def setup_board(self, safe_row: int, safe_col: int):
    """Place mines and fill in neighbour counts on ``self.board``.

    Mines (stored as -1) are sampled from every cell outside the 3x3 square
    centred on ``(safe_row, safe_col)``. Each remaining cell is then set to
    the number of mines among its in-bounds neighbours (up to eight).
    """
    # every cell that is allowed to hold a mine, in row-major order
    candidates = []
    for r in range(self.rows):
        for c in range(self.cols):
            in_safe_zone = abs(r - safe_row) <= 1 and abs(c - safe_col) <= 1
            if not in_safe_zone:
                candidates.append((r, c))

    for mine_r, mine_c in random.sample(candidates, self.mine_count):
        self.board[mine_r][mine_c] = -1

    # the eight surrounding offsets, excluding the cell itself
    neighbour_offsets = [
        (dr, dc)
        for dr in (-1, 0, 1)
        for dc in (-1, 0, 1)
        if (dr, dc) != (0, 0)
    ]

    for r in range(self.rows):
        for c in range(self.cols):
            if self.board[r][c] == -1:
                continue
            adjacent = 0
            for dr, dc in neighbour_offsets:
                nr, nc = r + dr, c + dc
                if (
                    0 <= nr < self.rows
                    and 0 <= nc < self.cols
                    and self.board[nr][nc] == -1
                ):
                    adjacent += 1
            self.board[r][c] = adjacent


def _flood_fill(self, start_row: int, start_col: int):
    """Reveal the region connected to the starting cell, breadth-first.

    Every unflagged, in-bounds neighbour of a processed cell is marked in
    ``self.revealed``; only neighbours whose board value is 0 are expanded
    further. The starting cell itself is not marked here.
    """
    neighbour_steps = [
        (dr, dc)
        for dr in (-1, 0, 1)
        for dc in (-1, 0, 1)
        if not (dr == 0 and dc == 0)
    ]

    origin = (start_row, start_col)
    seen = {origin}
    pending = deque([origin])

    while pending:
        cur_row, cur_col = pending.popleft()
        for dr, dc in neighbour_steps:
            nr, nc = cur_row + dr, cur_col + dc
            # bounds are tested first so the flag lookup never goes off-board
            in_bounds = 0 <= nr < self.rows and 0 <= nc < self.cols
            if in_bounds and (nr, nc) not in seen and not self.flags[nr][nc]:
                seen.add((nr, nc))
                self.revealed[nr][nc] = True
                if self.board[nr][nc] == 0:
                    # empty cell: its own neighbours get revealed too
                    pending.append((nr, nc))
// Spam detection — async queue + sliding window
Each incoming message is pushed to an async queue so the on_message listener returns immediately without blocking. A separate task processes up to 10 messages per 100ms cycle. Per-user message history is stored in a deque(maxlen=50) so old entries fall off automatically. Two patterns are checked: 10+ messages in the same channel within 5 seconds, or messages spread across 10+ different channels in 5 seconds (raid pattern). Confirmed spam immediately mutes the user and sends a staff alert with interactive resolution buttons; if staff takes no action within 12 hours, a background task applies an automatic 1-day mute.
# Per-user sliding window: each key gets its own deque, created on first
# access, holding at most 50 entries — once full, appending drops the oldest
# entry automatically. check_spam_patterns() looks entries up by member.id.
self.user_messages = defaultdict(lambda: deque(maxlen=50))

@tasks.loop(seconds=0.1)
async def process_message_queue(self):
    """Drain up to 10 queued messages per loop iteration.

    Messages are taken from ``self.message_queue`` without waiting and handed
    to ``self._process_message`` one at a time; the iteration ends early as
    soon as the queue is empty.
    """
    for _ in range(10):          # drain up to 10 per cycle
        try:
            msg = self.message_queue.get_nowait()
        except asyncio.QueueEmpty:
            break
        # Processing stays outside the try so a QueueEmpty raised while
        # handling a message is not mistaken for "nothing left to drain".
        await self._process_message(msg)

async def check_spam_patterns(self, member, guild):
    """Check ``member``'s recent messages for flood patterns.

    Only entries from the last 5 seconds are considered. Returns a dict
    describing the first pattern that matches, or ``None`` when fewer than two
    recent entries exist or nothing matches. ``guild`` is not used in the
    lines shown here.

    NOTE(review): the returned dicts are abbreviated with ``...`` in this
    excerpt, so their full set of keys is not visible.
    """
    # entries are 3-item sequences: item 0 is compared against an aware UTC
    # datetime and item 1 is used as a channel id; item 2 is not read here
    messages = self.user_messages[member.id]
    cutoff   = datetime.now(timezone.utc) - timedelta(seconds=5)
    recent   = [m for m in messages if m[0] >= cutoff]

    # a single message cannot form a pattern
    if len(recent) < 2: return None

    # Pattern 1: flood in one channel (10 or more recent entries sharing a channel id)
    channel_counts = defaultdict(int)
    for _, ch_id, _ in recent:
        channel_counts[ch_id] += 1
    for ch_id, count in channel_counts.items():
        if count >= 10:
            return {"type": "same_channel", "count": count, ...}

    # Pattern 2: cross-channel raid (recent entries span 10 or more distinct channels)
    unique_channels = len(set(m[1] for m in recent))
    if unique_channels >= 10:
        return {"type": "multiple_channels", "channel_count": unique_channels, ...}

    return None


# Fallback for alerts staff never resolved: once a row's expires_at has passed,
# the default action is applied automatically.
@tasks.loop(minutes=1)
async def check_pending_actions(self):
    """Apply the default action to every expired ``spam_actions`` row.

    Runs once a minute. Rows whose ``expires_at`` is at or before the current
    UTC time (compared as ISO-8601 strings) are passed to
    ``apply_default_action`` and then deleted by ``message_id``.
    """
    # NOTE(review): `c` is a cursor defined outside this block, and no commit
    # is visible here — confirm the DELETE is persisted by the caller/connection.
    cutoff = datetime.now(timezone.utc).isoformat()
    c.execute("SELECT * FROM spam_actions WHERE expires_at <= ?", (cutoff,))
    expired_rows = c.fetchall()
    for action in expired_rows:
        await self.apply_default_action(
            action["user_id"], action["guild_id"], action["spam_data"]
        )
        c.execute(
            "DELETE FROM spam_actions WHERE message_id = ?",
            (action["message_id"],),
        )
// Admin permission check — one decorator for prefix and slash
discord.py has two separate command systems: legacy prefix commands (@commands.command) and newer slash commands (@app_commands.command). Rather than writing two separate permission checks, is_admin() returns a single predicate that detects which type it's running under and responds appropriately — checking roles, handling errors, and sending messages via the right API in both cases.
@staticmethod
def is_admin():
    """Return a decorator that restricts a command to admins or the owner.

    The decorator registers the same check with both the prefix-command and
    the slash-command systems. The check passes when the invoking user holds
    a role listed in ``ADMIN_ROLE_IDS`` or is the owner (``LILAC_ID``).
    Otherwise it sends a refusal message (ephemeral for interactions) and
    raises ``CheckFailure``.
    """
    async def predicate(target):
        # target is Context for prefix commands, Interaction for slash commands
        user           = getattr(target, "author", None) or getattr(target, "user", None)
        is_interaction = hasattr(target, "response")

        # unified send helper — right API, right ephemeral setting
        async def send_message(msg, ephemeral=False):
            if is_interaction:
                # an interaction can only be answered once; later messages
                # must go through the followup webhook
                if not target.response.is_done():
                    await target.response.send_message(msg, ephemeral=ephemeral)
                else:
                    await target.followup.send(msg, ephemeral=ephemeral)
            else:
                await target.send(msg)

        # The invoker may have no ``roles`` attribute (e.g. a plain User
        # outside a guild) or may not be resolvable at all; treat both as
        # "no admin role" instead of raising AttributeError, so the owner
        # bypass still works and everyone else gets the normal refusal.
        roles          = getattr(user, "roles", None) or ()
        has_admin_role = any(r.id in ADMIN_ROLE_IDS for r in roles)
        is_owner       = user is not None and user.id == LILAC_ID   # owner bypass

        if not (has_admin_role or is_owner):
            await send_message("You do not have permission to use this command.",
                               ephemeral=is_interaction)
            raise CheckFailure("User lacks admin permissions.")
        return True

    # apply the same predicate to both command systems
    def decorator(func):
        func = commands.check(predicate)(func)
        func = app_commands.check(predicate)(func)
        return func
    return decorator
// Sparkle system — probability via message ID
Discord message IDs are snowflakes — 64-bit integers that increase monotonically. The sparkle system uses the trailing digits as a cheap probability source: roughly 1-in-1000 messages end in 000 (regular sparkle), 1-in-10000 end in 0000 (rare), and 1-in-100000 end in 00000 (epic). No RNG needed — the ID itself is the roll. The database write is pushed to a thread pool with asyncio.to_thread so it doesn't block the event loop.
# Sparkle tiers: name -> (message-ID suffix to match, reaction emoji, phrase
# used in the reply). Order matters: on_message walks this dict in insertion
# order and stops at the first suffix that matches, so the longest suffix must
# come first — an ID ending in 00000 also ends in 0000 and 000.
CHANCES = {
    "epic":    ("00000", "💫", "an **epic sparkle**"),    # ~1 in 100,000
    "rare":    ("0000",  "🌟", "a **rare sparkle**"),     # ~1 in 10,000
    "regular": ("000",   "✨", "a **regular sparkle**"),  # ~1 in 1,000
}

@commands.Cog.listener()
async def on_message(self, message):
    """Award a sparkle when the message ID ends in one of the CHANCES suffixes.

    Bot authors and messages outside a guild are ignored. For the first
    matching tier the message gets a reaction and a non-pinging reply, and
    ``self._db_write`` is then run in a worker thread via
    ``asyncio.to_thread``.
    """
    if message.author.bot or not message.guild:
        return

    id_text = str(message.id)

    # CHANCES is scanned in insertion order and only the first hit counts —
    # an ID ending in 00000 matches all three suffixes but is awarded once.
    hit = next(
        (entry for entry in CHANCES.items() if id_text.endswith(entry[1][0])),
        None,
    )
    if hit is None:
        return

    sparkle_type, (_suffix, emoji, desc) = hit
    await message.add_reaction(emoji)
    await message.reply(
        f"**{message.author.name}** got {desc}! {emoji}",
        mention_author=False,
    )
    await asyncio.to_thread(self._db_write, message, sparkle_type)

Links