добавлен модуль welcome

This commit is contained in:
Maxim Slipenko 2024-08-11 16:15:47 +03:00
parent e3443f835a
commit 5732b1bcc3
10 changed files with 515 additions and 11 deletions

View File

@ -13,6 +13,7 @@ async def main():
# safe=False из-за super().__init__()
module_loader("standard", "filters", safe=False),
module_loader("standard", "report"),
module_loader("standard", "welcome", safe=False),
]
)
await ocab.start()

View File

@ -1,16 +1,16 @@
from typing import TYPE_CHECKING
from typing_extensions import deprecated
from aiogram import Bot
from aiogram.filters import BaseFilter
from aiogram.types import Message
from typing_extensions import deprecated
from ocab_core.modules_system.public_api import get_module, log
from ocab_core.modules_system.public_api import get_module
if TYPE_CHECKING:
from ocab_modules.standard.config import IConfig
from ocab_modules.standard.roles import Roles as IRoles
config: "IConfig" = get_module("standard.config", "config")
@ -21,8 +21,11 @@ except Exception:
ROLES_MODULE_LOADED = False
pass
def module_init():
config.register("filters::approved_chat_id", "int", multiple=True, shared=True, default_value=[])
config.register(
"filters::approved_chat_id", "int", multiple=True, shared=True, default_value=[]
)
config.register("filters::default_chat_tag", "string", shared=True)
@ -40,8 +43,9 @@ def chat_not_in_approve(message: Message) -> bool:
# log(f"Chat not in approve list: {chat_id}")
return True
class ChatIDFilter(BaseFilter):
def __init__(self, blacklist = False, approved_chats = None) -> None:
def __init__(self, blacklist=False, approved_chats=None) -> None:
self.blacklist = blacklist
self.approved_chats = approved_chats
super().__init__()
@ -50,22 +54,21 @@ class ChatIDFilter(BaseFilter):
chat_id = message.chat.id
approved_chats = self.approved_chats or get_approved_chat_id()
print(approved_chats)
res = chat_id in approved_chats
return res ^ (self.blacklist)
class ChatNotInApproveFilter(ChatIDFilter):
def __init__(self) -> None:
super().__init__(allow = False)
super().__init__(allow=False)
class ChatModerOrAdminFilter(BaseFilter):
async def __call__(self, message: Message, bot: Bot) -> bool:
if not ROLES_MODULE_LOADED:
raise Exception("Roles module not loaded")
user_id = message.from_user.id
roles = Roles()
admins = await bot.get_chat_administrators(message.chat.id)
@ -73,4 +76,4 @@ class ChatModerOrAdminFilter(BaseFilter):
await roles.check_admin_permission(user_id)
or await roles.check_moderator_permission(user_id)
or any(user_id == admin.user.id for admin in admins)
)
)

View File

@ -0,0 +1,21 @@
## Модуль Welcome
Модуль `welcome` отвечает за верификацию новых участников чата, используя различные методы проверки. Он помогает предотвратить спам и автоматические атаки на чат, обеспечивая, что новые участники подтверждают свою человеческую природу перед получением доступа.
## Команды и Методы
Модуль поддерживает несколько методов верификации, которые случайным образом применяются к новым участникам чата:
- **IAmHumanButton** - Верификация с помощью кнопки.
- **IAmHumanInput** - Верификация с помощью ввода текста.
- **MathButtonsVerification** - Верификация решением математической задачи с помощью кнопок.
- **MathInputVerificationMethod** - Верификация решением математической задачи с помощью ввода.
- **QuestionButtonsVerification** - Верификация ответом на вопрос с помощью кнопок.
- **QuestionInputVerification** - Верификация ответом на вопрос с помощью ввода.
## Как это работает
1. **Обработка новых участников**: Когда новый участник присоединяется к чату, выбирается случайный метод верификации, и создается задача проверки.
2. **Тайм-аут проверки**: Если новый участник не проходит проверку в течение 30 секунд, его статус в чате меняется на "забанен".
3. **Верификация по кнопкам**: Если верификация осуществляется с помощью кнопок, обработчик будет ожидать нажатие кнопки от пользователя и проверит правильность ответа.
4. **Верификация по вводу**: Если верификация осуществляется путем ввода текста, обработчик будет проверять введенный текст и действовать в зависимости от результата проверки.

View File

@ -0,0 +1 @@
from .main import module_init

View File

@ -0,0 +1,19 @@
{
"id": "standard.welcome",
"name": "Welcome",
"description": "Модуль для проверки на бота",
"author": "OCAB Team",
"version": "1.0.0",
"privileged": true,
"dependencies": {
"optional": {
"standard.command_helper": "^1.0.0",
"standard.filters": "^1.0.0"
}
},
"pythonDependencies": {
"required": {
"asyncio": "*"
}
}
}

View File

@ -0,0 +1,133 @@
import asyncio
import random
from aiogram import Bot, Router, types
from aiogram.exceptions import TelegramBadRequest
from aiogram.filters import IS_MEMBER, IS_NOT_MEMBER, ChatMemberUpdatedFilter
from aiogram.types import ChatMemberUpdated
from ocab_core.modules_system.public_api import log, register_router
from .verifications_methods.base import VerificationCallback
from .verifications_methods.iamhuman import IAmHumanButton, IAmHumanInput
from .verifications_methods.math import (
MathButtonsVerification,
MathInputVerificationMethod,
)
from .verifications_methods.question import (
QuestionButtonsVerification,
QuestionInputVerification,
)
verification_methods = [
IAmHumanButton(),
IAmHumanInput(),
MathButtonsVerification(),
MathInputVerificationMethod(),
QuestionButtonsVerification(),
QuestionInputVerification(),
]
verification_tasks = {}
async def new_member_handler(event: ChatMemberUpdated, bot: Bot):
if event.new_chat_member.status == "member":
user_id = event.from_user.id
chat_id = event.chat.id
method = random.choice(verification_methods) # nosec
task_data = await method.create_task(event, bot)
task_data["user_id"] = user_id
task_data["chat_id"] = chat_id
task_data["method"] = method
task = asyncio.create_task(verify_timeout(bot, task_data))
verification_tasks[(user_id, chat_id)] = {
"task": task,
"task_data": task_data,
}
async def verify_timeout(bot: Bot, task_data: dict):
try:
chat_id = task_data["chat_id"]
user_id = task_data["user_id"]
await asyncio.sleep(30)
try:
if "message_id" in task_data:
await bot.delete_message(chat_id, task_data["message_id"])
except TelegramBadRequest:
return
chat_member = await bot.get_chat_member(chat_id, user_id)
if chat_member.status == "member":
await bot.ban_chat_member(chat_id, user_id)
except Exception as e:
log(f"Error in verify_timeout: {e}")
finally:
verification_tasks.pop((user_id, chat_id), None)
async def handle_inline_button_verification(
callback_query: types.CallbackQuery, callback_data: VerificationCallback, bot: Bot
):
user_id = callback_data.user_id
chat_id = callback_data.chat_id
if callback_query.from_user.id == user_id:
if (user_id, chat_id) in verification_tasks:
task = verification_tasks[(user_id, chat_id)]
task_data = task["task_data"]
method = task_data["method"]
task_data["answer"] = callback_data.answer
result = await method.verify(task_data)
if result:
task["task"].cancel()
await bot.delete_message(chat_id, callback_query.message.message_id)
else:
await callback_query.answer("Неправильный ответ!", show_alert=True)
pass
else:
await callback_query.answer("Эта кнопка не для вас!", show_alert=True)
async def handle_input_verification(message: types.Message, bot: Bot):
user_id = message.from_user.id
chat_id = message.chat.id
if (user_id, chat_id) in verification_tasks:
task = verification_tasks[(user_id, chat_id)]
task_data = task["task_data"]
method = task_data["method"]
task_data["answer"] = message.text
result = await method.verify(task_data)
if result:
task["task"].cancel()
await bot.delete_message(chat_id, task_data["message_id"])
await bot.delete_message(chat_id, message.message_id)
pass
async def module_init():
router = Router()
router.chat_member(ChatMemberUpdatedFilter(IS_NOT_MEMBER >> IS_MEMBER))(
new_member_handler
)
router.callback_query(VerificationCallback.filter())(
handle_inline_button_verification
)
router.message()(handle_input_verification)
register_router(router)

View File

@ -0,0 +1,21 @@
from aiogram.filters.callback_data import CallbackData
class VerificationMethod:
pass
class InputVerificationMethod(VerificationMethod):
def verify(input_value: str, task_data):
pass
class InlineButtonVerificationMethod(VerificationMethod):
def verify(input_value: str, task_data):
pass
class VerificationCallback(CallbackData, prefix="verify"):
user_id: int
chat_id: int
answer: str = None

View File

@ -0,0 +1,74 @@
import random
from aiogram import Bot
from aiogram.types import ChatMemberUpdated, InlineKeyboardButton, InlineKeyboardMarkup
from .base import (
InlineButtonVerificationMethod,
InputVerificationMethod,
VerificationCallback,
)
class IAmHumanButton(InlineButtonVerificationMethod):
def __init__(self):
pass
def method_name(self):
return "i_am_human_button"
async def create_task(self, event: ChatMemberUpdated, bot: Bot):
user_id = event.from_user.id
chat_id = event.chat.id
keyboard = InlineKeyboardMarkup(
inline_keyboard=[
[
InlineKeyboardButton(
text="Я человек!",
callback_data=VerificationCallback(
user_id=user_id, chat_id=chat_id, answer="OK"
).pack(),
)
]
]
)
message = await bot.send_message(
chat_id,
f"Привет, {event.from_user.first_name}! "
"Нажмите кнопку, чтобы подтвердить, что вы не робот.",
reply_markup=keyboard,
)
return {"message_id": message.message_id}
async def verify(self, task_data):
return True
class IAmHumanInput(InputVerificationMethod):
def __init__(self):
pass
def method_name(self):
return "i_am_human_input"
def get_text(self):
return random.choice(["Я человек", "Я не робот"]) # nosec
async def create_task(self, event: ChatMemberUpdated, bot: Bot):
chat_id = event.chat.id
text = self.get_text()
message = await bot.send_message(
chat_id,
f"Привет, {event.from_user.first_name}! "
f'Напишите "{text}", чтобы подтвердить, что вы не робот.',
)
return {"message_id": message.message_id, "correct": text}
async def verify(self, task_data):
correct: str = task_data["correct"]
answer: str = task_data["answer"]
return answer.lower() == correct.lower()

View File

@ -0,0 +1,111 @@
import random
from aiogram import Bot
from aiogram.types import ChatMemberUpdated, InlineKeyboardButton, InlineKeyboardMarkup
from .base import (
InlineButtonVerificationMethod,
InputVerificationMethod,
VerificationCallback,
)
class MathInputVerificationMethod(InputVerificationMethod):
def __init__(self):
pass
def method_name(self):
return "math_input"
def generate_math_problem(self):
a = random.randint(1, 10) # nosec
b = random.randint(1, 10) # nosec
operation = random.choice(["+", "-", "*"]) # nosec
if operation == "+":
answer = a + b
elif operation == "-":
answer = a - b
else:
answer = a * b
return f"{a} {operation} {b}", str(answer)
async def create_task(self, event: ChatMemberUpdated, bot: Bot):
chat_id = event.chat.id
problem, answer = self.generate_math_problem()
message = await bot.send_message(
chat_id,
f"Привет, {event.from_user.first_name}! "
"Решите простую математическую задачу, "
f"чтобы подтвердить, что вы не робот: {problem} = ?",
)
return {"message_id": message.message_id, "correct": answer}
async def verify(self, task_data):
correct: str = task_data["correct"]
answer: str = task_data["answer"]
return answer.strip() == correct
class MathButtonsVerification(InlineButtonVerificationMethod):
def __init__(self):
pass
def method_name(self):
return "math_buttons"
def generate_math_problem(self):
a = random.randint(1, 10) # nosec
b = random.randint(1, 10) # nosec
operation = random.choice(["+", "-", "*"]) # nosec
if operation == "+":
answer = a + b
elif operation == "-":
answer = a - b
else:
answer = a * b
return f"{a} {operation} {b}", answer
async def create_task(self, event: ChatMemberUpdated, bot: Bot):
user_id = event.from_user.id
chat_id = event.chat.id
problem, correct_answer = self.generate_math_problem()
options = [correct_answer]
while len(options) < 4:
wrong_answer = random.randint(
max(1, correct_answer - 5), correct_answer + 5
) # nosec
if wrong_answer not in options:
options.append(wrong_answer)
random.shuffle(options) # nosec
keyboard = InlineKeyboardMarkup(
inline_keyboard=[
[
InlineKeyboardButton(
text=str(option),
callback_data=VerificationCallback(
user_id=user_id, chat_id=chat_id, answer=str(option)
).pack(),
)
for option in options
]
]
)
message = await bot.send_message(
chat_id,
f"Привет, {event.from_user.first_name}! "
"Решите простую математическую задачу, "
f"чтобы подтвердить, что вы не робот: {problem} = ?",
reply_markup=keyboard,
)
return {"message_id": message.message_id, "correct": str(correct_answer)}
async def verify(self, task_data):
correct: str = task_data["correct"]
answer: str = task_data["answer"]
return answer == correct

View File

@ -0,0 +1,120 @@
import random
from aiogram import Bot
from aiogram.types import ChatMemberUpdated, InlineKeyboardButton, InlineKeyboardMarkup
from .base import (
InlineButtonVerificationMethod,
InputVerificationMethod,
VerificationCallback,
)
QUESTIONS = [
(
"Какой город является столицей России?",
"Москва",
["Санкт-Петербург", "Новосибирск", "Екатеринбург"],
),
(
"Какой город называют северной столицей России?",
"Санкт-Петербург",
["Владивосток", "Новосибирск", "Екатеринбург"],
),
("Какая национальная валюта в России?", "Рубль", ["Евро", "Доллар", "Юань"]),
("Год окончания Великой Отечественной войны?", "1945", ["2024", "862", "1721"]),
(
"Самая БОЛЬШАЯ страна по площади?",
"Россия",
["Люксембург", "Ватикан", "Лихтенштейн"],
),
("Сколько лап у кошки?", "4", ["10", "12", "14"]),
("Сколько ног у осьминога?", "8", ["6", "10", "12"]),
(
"Какой день недели идет после понедельника?",
"Вторник",
["Среда", "Четверг", "Пятница"],
),
("Сколько часов в сутках?", "24", ["12", "48", "60"]),
("Какой месяц самый короткий?", "Февраль", ["Март", "Апрель", "Май"]),
]
class QuestionInputVerification(InputVerificationMethod):
def __init__(self):
pass
def method_name(self):
return "question_input"
def get_random_question(self):
return random.choice(QUESTIONS) # nosec
async def create_task(self, event: ChatMemberUpdated, bot: Bot):
chat_id = event.chat.id
question, answer, _ = self.get_random_question()
message = await bot.send_message(
chat_id,
f"Привет, {event.from_user.first_name}! "
"Пожалуйста, ответьте на следующий вопрос, "
f"чтобы подтвердить, что вы не робот: {question}",
)
return {"message_id": message.message_id, "correct": answer.lower()}
async def verify(self, task_data):
correct: str = task_data["correct"]
answer: str = task_data["answer"]
return answer.lower().strip() == correct
class QuestionButtonsVerification(InlineButtonVerificationMethod):
def __init__(self):
pass
def method_name(self):
return "question_inline"
def get_random_question(self):
return random.choice(QUESTIONS) # nosec
async def create_task(self, event: ChatMemberUpdated, bot: Bot):
user_id = event.from_user.id
chat_id = event.chat.id
question, correct_answer, wrong_answers = self.get_random_question()
options = [correct_answer] + wrong_answers
random.shuffle(options) # nosec
keyboard = InlineKeyboardMarkup(
inline_keyboard=[
[
InlineKeyboardButton(
text=option,
callback_data=VerificationCallback(
user_id=user_id, chat_id=chat_id, answer=str(i)
).pack(),
)
for i, option in enumerate(options)
]
]
)
message = await bot.send_message(
chat_id,
f"Привет, {event.from_user.first_name}! "
"Пожалуйста, ответьте на следующий вопрос, "
f"чтобы подтвердить, что вы не робот: {question}",
reply_markup=keyboard,
)
return {
"message_id": message.message_id,
"correct": correct_answer,
"options": options,
}
async def verify(self, task_data):
correct: str = task_data["correct"]
answer: str = task_data["answer"]
return task_data["options"][int(answer)] == correct