Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,15 @@ A comprehensive Telegram bot for managing group members with profile verificatio
- **New user probation**: New members restricted from sending links/forwarded messages for 3 days (configurable)
- **Contact card blocking**: Prevents all non-admin members from sharing contact cards/phone numbers (delete + restrict)
- **Anti-spam enforcement**: Tracks violations and restricts spammers after threshold
- **Trusted users**: Admin-managed trusted list to bypass anti-spam + duplicate-spam checks

### Admin Tools
- **/verify command**: Whitelist users with hidden profile pictures (DM only)
- **/unverify command**: Remove users from verification whitelist (DM only)
- **Inline verification**: Forward messages to bot for quick verify/unverify buttons
- **/trust command**: Add trusted users (DM only, supports user ID or forwarded message)
- **/untrust command**: Remove trusted users from trusted list (DM only)
- **/trusted command**: List all trusted users (DM only)
- **Automatic clearance**: Sends notification when verified users' warnings are cleared

## Requirements
Expand Down
36 changes: 36 additions & 0 deletions src/bot/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,42 @@ def format_hours_display(hours: int) -> str:

ADMIN_WARN_SENT_MESSAGE = "✅ Peringatan telah dikirim ke {user_mention} di grup."

TRUST_USER_ID_REQUIRED_MESSAGE = (
"❌ Penggunaan: /trust USER_ID atau /untrust USER_ID, atau forward pesan user ke bot."
)

TRUST_USER_ID_INVALID_MESSAGE = "❌ User ID harus berupa angka."

TRUST_ADDED_MESSAGE = (
"✅ User `{user_id}` ditambahkan ke trusted list.\n"
"• Probation dibersihkan di {probation_clear_count} grup\n"
"• Unrestrict dicoba di {unrestrict_count} grup"
)

TRUST_ALREADY_EXISTS_MESSAGE = "ℹ️ User `{user_id}` sudah ada di trusted list."

TRUST_REMOVED_MESSAGE = "✅ User `{user_id}` dihapus dari trusted list."

TRUST_USER_NOT_FOUND_MESSAGE = "ℹ️ User `{user_id}` tidak ada di trusted list."

TRUST_LIST_EMPTY_MESSAGE = "ℹ️ Trusted list masih kosong."

TRUST_LIST_HEADER = "📋 Trusted Users:\n{trusted_lines}"

TRUST_DM_ONLY_MESSAGE = (
"❌ Perintah ini hanya bisa digunakan di chat pribadi dengan bot."
)

TRUST_NO_PERMISSION_MESSAGE = (
"❌ Kamu tidak memiliki izin untuk menggunakan perintah ini."
)

TRUST_CALLBACK_INVALID_MESSAGE = "❌ Data callback tidak valid."

CHECK_TRUST_BUTTON_LABEL = "🤝 Trust User"

CHECK_UNTRUST_BUTTON_LABEL = "🤝 Untrust User"

# Anti-spam probation warning for new users
NEW_USER_SPAM_WARNING = (
"⚠️ {user_mention} baru bergabung dan sedang dalam masa percobaan.\n"
Expand Down
29 changes: 29 additions & 0 deletions src/bot/database/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,35 @@ class PhotoVerificationWhitelist(SQLModel, table=True):
notes: str | None = Field(default=None)


class TrustedUser(SQLModel, table=True):
"""
Trusted users who bypass anti-spam and duplicate spam enforcement.

`group_id=0` is used as a global trust scope. Future per-group trust can
use a real Telegram group ID without changing the schema.

Attributes:
id: Primary key (auto-generated).
user_id: Telegram user ID.
group_id: Scope identifier (0 = global).
trusted_by_admin_id: Telegram user ID of admin granting trust.
trusted_at: Timestamp when trust was granted.
notes: Optional admin notes.
"""

__tablename__ = "trusted_users"
__table_args__ = (
UniqueConstraint('user_id', 'group_id', name='uix_trusted_user_group'),
)

id: int | None = Field(default=None, primary_key=True)
user_id: int = Field(index=True)
group_id: int = Field(default=0, index=True)
trusted_by_admin_id: int
trusted_at: datetime = Field(default_factory=lambda: datetime.now(UTC))
notes: str | None = Field(default=None)


class PendingCaptchaValidation(SQLModel, table=True):
"""
Tracks users who need to complete captcha verification.
Expand Down
159 changes: 159 additions & 0 deletions src/bot/database/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
NewUserProbation,
PendingCaptchaValidation,
PhotoVerificationWhitelist,
TrustedUser,
UserWarning,
)

Expand Down Expand Up @@ -347,6 +348,164 @@ def remove_photo_verification_whitelist(self, user_id: int) -> None:
session.commit()
logger.info(f"Removed from photo whitelist: user_id={user_id}")

def add_trusted_user(
self,
user_id: int,
trusted_by_admin_id: int,
group_id: int = 0,
notes: str | None = None,
) -> TrustedUser:
"""
Add a user to trusted list.

Args:
user_id: Telegram user ID.
trusted_by_admin_id: Telegram user ID of admin granting trust.
group_id: Trust scope ID (0 means global).
notes: Optional admin notes.

Returns:
TrustedUser: Created trusted record.

Raises:
ValueError: If user is already trusted in the scope.
"""
with Session(self._engine) as session:
statement = select(TrustedUser).where(
TrustedUser.user_id == user_id,
TrustedUser.group_id == group_id,
)
existing = session.exec(statement).first()

if existing:
raise ValueError(f"User {user_id} is already trusted for scope {group_id}")

record = TrustedUser(
user_id=user_id,
group_id=group_id,
trusted_by_admin_id=trusted_by_admin_id,
notes=notes,
)
session.add(record)
session.commit()
session.refresh(record)
logger.info(
f"Added trusted user: user_id={user_id}, admin_id={trusted_by_admin_id}, scope={group_id}"
)
return record

def remove_trusted_user(self, user_id: int, group_id: int = 0) -> None:
"""
Remove a user from trusted list.

Args:
user_id: Telegram user ID.
group_id: Trust scope ID (0 means global).

Raises:
ValueError: If user is not trusted in the scope.
"""
with Session(self._engine) as session:
statement = select(TrustedUser).where(
TrustedUser.user_id == user_id,
TrustedUser.group_id == group_id,
)
record = session.exec(statement).first()

if not record:
raise ValueError(f"User {user_id} is not trusted for scope {group_id}")

session.delete(record)
session.commit()
logger.info(f"Removed trusted user: user_id={user_id}, scope={group_id}")

def is_user_trusted(self, user_id: int, group_id: int | None = None) -> bool:
"""
Check whether a user is trusted.

Currently only global trust (group_id=None or 0) is supported; passing
a non-zero group_id raises NotImplementedError.

Args:
user_id: Telegram user ID.
group_id: Optional group scope. Must be None or 0 for now.

Returns:
bool: True if user is trusted globally.

Raises:
NotImplementedError: If a non-zero group_id is provided.
"""
if group_id is not None and group_id != 0:
raise NotImplementedError(
"Per-group trusted user reads are not yet supported; "
"only global (group_id=0) trust is implemented."
)

with Session(self._engine) as session:
statement = select(TrustedUser).where(
TrustedUser.user_id == user_id,
TrustedUser.group_id == 0,
)
record = session.exec(statement).first()
return record is not None

def get_trusted_user_ids(self, group_id: int | None = None) -> set[int]:
"""
Get trusted user IDs.

Currently only global trust (group_id=None or 0) is supported; passing
a non-zero group_id raises NotImplementedError.

Args:
group_id: Optional group scope. Must be None or 0 for now.

Returns:
set[int]: Trusted user IDs scoped to global (group_id=0).

Raises:
NotImplementedError: If a non-zero group_id is provided.
"""
if group_id is not None and group_id != 0:
raise NotImplementedError(
"Per-group trusted user reads are not yet supported; "
"only global (group_id=0) trust is implemented."
)

with Session(self._engine) as session:
statement = select(TrustedUser.user_id).where(TrustedUser.group_id == 0)
return set(session.exec(statement).all())

def get_trusted_users(self, group_id: int | None = None) -> list[TrustedUser]:
"""
Get trusted user records with metadata.

Currently only global trust (group_id=None or 0) is supported; passing
a non-zero group_id raises NotImplementedError.

Args:
group_id: Optional group scope. Must be None or 0 for now.

Returns:
list[TrustedUser]: Trusted user records scoped to global (group_id=0).

Raises:
NotImplementedError: If a non-zero group_id is provided.
"""
if group_id is not None and group_id != 0:
raise NotImplementedError(
"Per-group trusted user reads are not yet supported; "
"only global (group_id=0) trust is implemented."
)

with Session(self._engine) as session:
statement = (
select(TrustedUser)
.where(TrustedUser.group_id == 0)
.order_by(TrustedUser.trusted_at.desc())
)
return list(session.exec(statement).all())

def get_warnings_past_time_threshold(
self, threshold: timedelta
) -> list[UserWarning]:
Expand Down
11 changes: 6 additions & 5 deletions src/bot/handlers/anti_spam.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
)
from bot.database.service import get_database
from bot.group_config import get_group_config_for_update
from bot.services.telegram_utils import get_user_mention
from bot.services.telegram_utils import get_user_mention, is_user_admin_or_trusted

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -305,8 +305,7 @@ async def handle_contact_spam(
if user.is_bot:
return

admin_ids = context.bot_data.get("group_admin_ids", {}).get(group_config.group_id, [])
if user.id in admin_ids:
if is_user_admin_or_trusted(context, group_config.group_id, user.id):
return

msg = update.message
Expand Down Expand Up @@ -394,8 +393,7 @@ async def handle_inline_keyboard_spam(
if user.is_bot:
return

admin_ids = context.bot_data.get("group_admin_ids", {}).get(group_config.group_id, [])
if user.id in admin_ids:
if is_user_admin_or_trusted(context, group_config.group_id, user.id):
return

msg = update.message
Expand Down Expand Up @@ -488,6 +486,9 @@ async def handle_new_user_spam(
if user.is_bot:
return

if is_user_admin_or_trusted(context, group_config.group_id, user.id):
return

db = get_database()
record = db.get_new_user_probation(user.id, group_config.group_id)

Expand Down
27 changes: 23 additions & 4 deletions src/bot/handlers/check.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
ADMIN_CHECK_PROMPT,
ADMIN_WARN_SENT_MESSAGE,
ADMIN_WARN_USER_MESSAGE,
CHECK_TRUST_BUTTON_LABEL,
CHECK_UNTRUST_BUTTON_LABEL,
MISSING_ITEMS_SEPARATOR,
)
from bot.database.service import get_database
Expand Down Expand Up @@ -60,15 +62,24 @@ async def _build_check_response(

db = get_database()
is_whitelisted = db.is_user_photo_whitelisted(user_id)
is_trusted = db.is_user_trusted(user_id) is True

if result.is_complete:
action_prompt = ADMIN_CHECK_ACTION_COMPLETE
buttons: list[InlineKeyboardButton] = []
if is_whitelisted:
keyboard = InlineKeyboardMarkup([
[InlineKeyboardButton("❌ Unverify User", callback_data=f"unverify:{user_id}")]
])
buttons.append(
InlineKeyboardButton("❌ Unverify User", callback_data=f"unverify:{user_id}")
)
if is_trusted:
buttons.append(
InlineKeyboardButton(CHECK_UNTRUST_BUTTON_LABEL, callback_data=f"untrust:{user_id}")
)
else:
keyboard = None
buttons.append(
InlineKeyboardButton(CHECK_TRUST_BUTTON_LABEL, callback_data=f"trust:{user_id}")
)
keyboard = InlineKeyboardMarkup([buttons]) if buttons else None
else:
action_prompt = ADMIN_CHECK_ACTION_INCOMPLETE
# Store missing items in callback data (photo,username format)
Expand All @@ -77,10 +88,18 @@ async def _build_check_response(
missing_code += "p"
if not result.has_username:
missing_code += "u"

trust_button = (
InlineKeyboardButton(CHECK_UNTRUST_BUTTON_LABEL, callback_data=f"untrust:{user_id}")
if is_trusted
else InlineKeyboardButton(CHECK_TRUST_BUTTON_LABEL, callback_data=f"trust:{user_id}")
)

keyboard = InlineKeyboardMarkup([
[
InlineKeyboardButton("⚠️ Warn User", callback_data=f"warn:{user_id}:{missing_code}"),
InlineKeyboardButton("✅ Verify User", callback_data=f"verify:{user_id}"),
trust_button,
]
])

Expand Down
5 changes: 2 additions & 3 deletions src/bot/handlers/duplicate_spam.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
RESTRICTED_PERMISSIONS,
)
from bot.group_config import GroupConfig, get_group_config_for_update
from bot.services.telegram_utils import get_user_mention
from bot.services.telegram_utils import get_user_mention, is_user_admin_or_trusted

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -117,8 +117,7 @@ async def handle_duplicate_spam(
if user.is_bot:
return

admin_ids = context.bot_data.get("group_admin_ids", {}).get(group_config.group_id, [])
if user.id in admin_ids:
if is_user_admin_or_trusted(context, group_config.group_id, user.id):
return

text = update.message.text or update.message.caption
Expand Down
Loading
Loading