Discord bot — discord.py + SQLite + aiosqlite
[← Back to Portfolio]Lacie is a Discord bot I've been building for my own server. It's organised as a cog-based discord.py application with around 18 feature modules covering an XP/levelling system, moderation tooling, mini-games, and various fun commands. The whole thing is open source on GitHub.
Some of the more technically interesting pieces: the XP system tracks progress across five parallel time-scoped databases (lifetime, annual, monthly, weekly, daily) in one atomic pass per message. The spam protection system uses an async message queue and per-user sliding deques to detect flood patterns in real time. The minesweeper game implements a full 13×13 board with BFS flood-fill, safe-first-click generation, and per-user stats.
async def add_xp(user): if not isinstance(user, discord.Member): return # check cooldown against lifetime DB only (60s between XP gains) conn_check, cur_check = get_db("lifetime") cur_check.execute("SELECT last_message FROM xp WHERE user_id = ?", (str(user.id),)) row = cur_check.fetchone() conn_check.close() if row and not can_get_xp(row[0]): return base_xp = random.randint(50, 100) # write to all five leaderboard databases in one pass leaderboard_types = [ ("lifetime", True), # apply role multiplier ("annual", False), ("monthly", False), ("weekly", False), ("daily", False), ] for db_type, apply_multiplier in leaderboard_types: conn, cur = get_db(db_type) cur.execute("SELECT xp, level FROM xp WHERE user_id = ?", (str(user.id),)) row = cur.fetchone() xp, level = row if row else (0, 0) gained = int(base_xp * get_multiplier(user, apply_multiplier=apply_multiplier)) cur.execute("UPDATE xp SET xp = ?, last_message = ? WHERE user_id = ?", (xp + gained, int(time.time()), str(user.id))) conn.commit() await check_level_up(user, cur, conn, lifetime=(db_type == "lifetime")) conn.close() def xp_for_level(level: int) -> int: # cubic curve: grows slowly at low levels, steeply at high levels xp = (level**3 * 1) + (level**2 * 50) + (level * 100) return int(math.floor(xp / 100) * 100) # round down to nearest 100 # role multipliers: higher roles grant up to 3x XP on lifetime leaderboard MULTIPLIERS = { role_id_booster: 1.25, role_id_active: 1.5, role_id_supporter: 2.0, role_id_partner: 2.5, role_id_staff: 3.0, } def get_multiplier(member, apply_multiplier=True): if not apply_multiplier: return 1 return max((1, *(MULTIPLIERS[r.id] for r in member.roles if r.id in MULTIPLIERS)))
def setup_board(self, safe_row: int, safe_col: int): # exclude the safe zone from mine placement positions = [ (r, c) for r in range(self.rows) for c in range(self.cols) if not (abs(r - safe_row) <= 1 and abs(c - safe_col) <= 1) ] mine_positions = random.sample(positions, self.mine_count) for r, c in mine_positions: self.board[r][c] = -1 # fill in adjacent-mine counts for all non-mine cells for r in range(self.rows): for c in range(self.cols): if self.board[r][c] != -1: self.board[r][c] = sum( 1 for dr in (-1, 0, 1) for dc in (-1, 0, 1) if not (dr == dc == 0) and 0 <= r+dr < self.rows and 0 <= c+dc < self.cols and self.board[r+dr][c+dc] == -1 ) def _flood_fill(self, start_row: int, start_col: int): # BFS: auto-reveal all connected empty cells queue = deque([(start_row, start_col)]) visited = {(start_row, start_col)} while queue: row, col = queue.popleft() for dr in (-1, 0, 1): for dc in (-1, 0, 1): if dr == dc == 0: continue nr, nc = row + dr, col + dc if not (0 <= nr < self.rows and 0 <= nc < self.cols): continue if (nr, nc) in visited or self.flags[nr][nc]: continue visited.add((nr, nc)) self.revealed[nr][nc] = True if self.board[nr][nc] == 0: # empty cell — keep spreading queue.append((nr, nc))
deque(maxlen=50) so old entries fall off automatically. Two patterns are
checked: 10+ messages in the same channel within 5 seconds, or messages spread across
10+ different channels in 5 seconds (raid pattern). Confirmed spam immediately mutes the
user and sends a staff alert with interactive resolution buttons; if staff takes no
action within 12 hours, a background task applies an automatic 1-day mute.# per-user sliding window — deque drops entries automatically when maxlen is hit self.user_messages = defaultdict(lambda: deque(maxlen=50)) @tasks.loop(seconds=0.1) async def process_message_queue(self): for _ in range(10): # drain up to 10 per cycle try: msg = self.message_queue.get_nowait() await self._process_message(msg) except asyncio.QueueEmpty: break async def check_spam_patterns(self, member, guild): messages = self.user_messages[member.id] cutoff = datetime.now(timezone.utc) - timedelta(seconds=5) recent = [m for m in messages if m[0] >= cutoff] if len(recent) < 2: return None # Pattern 1: flood in one channel channel_counts = defaultdict(int) for _, ch_id, _ in recent: channel_counts[ch_id] += 1 for ch_id, count in channel_counts.items(): if count >= 10: return {"type": "same_channel", "count": count, ...} # Pattern 2: cross-channel raid unique_channels = len(set(m[1] for m in recent)) if unique_channels >= 10: return {"type": "multiple_channels", "channel_count": unique_channels, ...} return None # if staff ignores the alert, a background task auto-mutes after 12 hours @tasks.loop(minutes=1) async def check_pending_actions(self): now = datetime.now(timezone.utc).isoformat() c.execute("SELECT * FROM spam_actions WHERE expires_at <= ?", (now,)) for row in c.fetchall(): await self.apply_default_action(row["user_id"], row["guild_id"], row["spam_data"]) c.execute("DELETE FROM spam_actions WHERE message_id = ?", (row["message_id"],))
@commands.command) and newer slash commands
(@app_commands.command). Rather than writing two separate permission checks,
is_admin() returns a single predicate that detects which type it's running
under and responds appropriately — checking roles, handling errors, and sending messages
via the right API in both cases.@staticmethod def is_admin(): 'Decorator that works for both prefix commands and slash commands.' async def predicate(target): # target is Context for prefix commands, Interaction for slash commands user = getattr(target, "author", None) or getattr(target, "user", None) is_interaction = hasattr(target, "response") # unified send helper — right API, right ephemeral setting async def send_message(msg, ephemeral=False): if is_interaction: if not target.response.is_done(): await target.response.send_message(msg, ephemeral=ephemeral) else: await target.followup.send(msg, ephemeral=ephemeral) else: await target.send(msg) has_admin_role = any(r.id in ADMIN_ROLE_IDS for r in user.roles) is_owner = user.id == LILAC_ID # owner bypass if not (has_admin_role or is_owner): await send_message("You do not have permission to use this command.", ephemeral=is_interaction) raise CheckFailure("User lacks admin permissions.") return True # apply the same predicate to both command systems def decorator(func): func = commands.check(predicate)(func) func = app_commands.check(predicate)(func) return func return decorator
000
(regular sparkle), 1-in-10000 end in 0000 (rare), and
1-in-100000 end in 00000 (epic). No RNG needed — the ID itself is the
roll. The database write is pushed to a thread pool with
asyncio.to_thread so it doesn't block the event loop.CHANCES = {
"epic": ("00000", "💫", "an **epic sparkle**"), # ~1 in 100,000
"rare": ("0000", "🌟", "a **rare sparkle**"), # ~1 in 10,000
"regular": ("000", "✨", "a **regular sparkle**"), # ~1 in 1,000
}
@commands.Cog.listener()
async def on_message(self, message):
if message.author.bot or not message.guild: return
msg_id = str(message.id)
for sparkle_type, (suffix, emoji, desc) in CHANCES.items():
if msg_id.endswith(suffix):
await message.add_reaction(emoji)
await message.reply(f"**{message.author.name}** got {desc}! {emoji}",
mention_author=False)
await asyncio.to_thread(self._db_write, message, sparkle_type)
break # a message ending in 00000 matches all three — only award once