diff --git a/src/ocab_modules/ocab_modules/standard/command_helper/main.py b/src/ocab_modules/ocab_modules/standard/command_helper/main.py
index d33d1ba..f7c417e 100644
--- a/src/ocab_modules/ocab_modules/standard/command_helper/main.py
+++ b/src/ocab_modules/ocab_modules/standard/command_helper/main.py
@@ -1,11 +1,10 @@
from aiogram.types import BotCommand
-from ocab_core.modules_system.public_api import (
- set_my_commands, log
-)
+from ocab_core.modules_system.public_api import set_my_commands
commands = dict()
+
def register_command(command, description, role="USER"):
if role not in commands:
commands[role] = dict()
@@ -26,7 +25,7 @@ async def set_user_commands():
)
)
- log(bot_commands)
+ # log(bot_commands)
await set_my_commands(
bot_commands,
diff --git a/src/ocab_modules/ocab_modules/standard/filters/filters.py b/src/ocab_modules/ocab_modules/standard/filters/filters.py
index 3457048..4370baf 100644
--- a/src/ocab_modules/ocab_modules/standard/filters/filters.py
+++ b/src/ocab_modules/ocab_modules/standard/filters/filters.py
@@ -54,6 +54,11 @@ class ChatIDFilter(BaseFilter):
chat_id = message.chat.id
approved_chats = self.approved_chats or get_approved_chat_id()
+
+ # Если список для фильтрации пуст - разрешаем всем.
+ if len(approved_chats) == 0:
+ return True
+
res = chat_id in approved_chats
return res ^ (self.blacklist)
diff --git a/src/ocab_modules/ocab_modules/standard/welcome/__init__.py b/src/ocab_modules/ocab_modules/standard/welcome/__init__.py
index c8fccb0..91f142d 100644
--- a/src/ocab_modules/ocab_modules/standard/welcome/__init__.py
+++ b/src/ocab_modules/ocab_modules/standard/welcome/__init__.py
@@ -1 +1 @@
-from .main import module_init
+from .main import module_init, module_late_init
diff --git a/src/ocab_modules/ocab_modules/standard/welcome/info.json b/src/ocab_modules/ocab_modules/standard/welcome/info.json
index 356aa8f..bf99dba 100644
--- a/src/ocab_modules/ocab_modules/standard/welcome/info.json
+++ b/src/ocab_modules/ocab_modules/standard/welcome/info.json
@@ -6,6 +6,9 @@
"version": "1.0.0",
"privileged": true,
"dependencies": {
+ "required": {
+ "standard.config": "^1.0.0"
+ },
"optional": {
"standard.command_helper": "^1.0.0",
"standard.filters": "^1.0.0"
diff --git a/src/ocab_modules/ocab_modules/standard/welcome/main.py b/src/ocab_modules/ocab_modules/standard/welcome/main.py
index 1cbb27c..0d7de71 100644
--- a/src/ocab_modules/ocab_modules/standard/welcome/main.py
+++ b/src/ocab_modules/ocab_modules/standard/welcome/main.py
@@ -1,76 +1,182 @@
import asyncio
import random
+from string import Template
+from typing import TYPE_CHECKING
from aiogram import Bot, Router, types
+from aiogram.enums import ChatMemberStatus, ParseMode
from aiogram.exceptions import TelegramBadRequest
-from aiogram.filters import IS_MEMBER, IS_NOT_MEMBER, ChatMemberUpdatedFilter
-from aiogram.types import ChatMemberUpdated
+from aiogram.filters import JOIN_TRANSITION, LEAVE_TRANSITION, ChatMemberUpdatedFilter
+from aiogram.types import ChatMemberUpdated, PollAnswer
-from ocab_core.modules_system.public_api import log, register_router
+from ocab_core.modules_system.public_api import get_module, log, register_router
-from .verifications_methods.base import VerificationCallback
-from .verifications_methods.iamhuman import IAmHumanButton, IAmHumanInput
-from .verifications_methods.math import (
- MathButtonsVerification,
- MathInputVerificationMethod,
-)
-from .verifications_methods.question import (
- QuestionButtonsVerification,
- QuestionInputVerification,
+from .utils import MultiKeyDict, get_plural_form, key_from_poll, key_from_user_chat
+from .verifications_methods.base import BaseTask, VerificationCallback
+from .verifications_methods.math import MathInlineButtonsTask, MathPollTask
+from .verifications_methods.question import QuestionInlineButtonsTask, QuestionPollTask
+from .verifications_methods.simple import (
+ SimpleVariantsBaseTask,
+ SimpleVariantsBaseTaskConfig,
)
+from .verifications_methods.utils import user_mention
-verification_methods = [
- IAmHumanButton(),
- IAmHumanInput(),
- MathButtonsVerification(),
- MathInputVerificationMethod(),
- QuestionButtonsVerification(),
- QuestionInputVerification(),
+if TYPE_CHECKING:
+ from ocab_modules.standard.config import IConfig
+ from ocab_modules.standard.filters import ChatIDFilter as IChatIDFilter
+
+config: "IConfig" = get_module("standard.config", "config")
+
+try:
+ ChatIDFilter: "type[IChatIDFilter]" = get_module("standard.filters", "ChatIDFilter")
+ FILTERS_MODULE_LOADED = True
+except Exception:
+ FILTERS_MODULE_LOADED = False
+ pass
+
+all_tasks = [
+ MathInlineButtonsTask,
+ MathPollTask,
+ QuestionInlineButtonsTask,
+ QuestionPollTask,
]
-verification_tasks = {}
+
+class TaskManager:
+ def __init__(self, config: "IConfig"):
+ self.config = config
+ self.available_tasks = []
+ self.max_attempts = 1
+
+ def init(self):
+ for cls in all_tasks:
+ type_name = cls.type_name()
+ if self.config.get(f"welcome::tasks::{type_name}::enabled"):
+ log(f"Task {cls.type_name()} enabled")
+ self.available_tasks.append(cls)
+
+ self.max_attempts = self.config.get("welcome::max_attempts")
+
+ def build_random_task(self, event, bot, attempt_number=1) -> BaseTask:
+ cls = random.choice(self.available_tasks) # nosec
+ obj = cls(
+ event,
+ bot,
+ timeout_func=verify_timeout,
+ attempt_number=attempt_number,
+ max_attempts=self.max_attempts,
+ )
+
+ if isinstance(obj, SimpleVariantsBaseTask):
+ cfg = SimpleVariantsBaseTaskConfig(obj.type_name(), self.config)
+ obj.set_config(cfg)
+
+ return obj
+
+
+verification_tasks = MultiKeyDict()
+last_success = {}
async def new_member_handler(event: ChatMemberUpdated, bot: Bot):
- if event.new_chat_member.status == "member":
- user_id = event.from_user.id
- chat_id = event.chat.id
+ # НЕ СРАБОТАЕТ, ЕСЛИ ЧЕЛОВЕК УЖЕ ОГРАНИЧЕН В ПРАВАХ (RESTRICTED)
+ if event.new_chat_member.status == ChatMemberStatus.MEMBER:
+ task = task_manager.build_random_task(event, bot)
+ keys = await task.run()
- method = random.choice(verification_methods) # nosec
- task_data = await method.create_task(event, bot)
-
- task_data["user_id"] = user_id
- task_data["chat_id"] = chat_id
- task_data["method"] = method
-
- task = asyncio.create_task(verify_timeout(bot, task_data))
-
- verification_tasks[(user_id, chat_id)] = {
- "task": task,
- "task_data": task_data,
- }
+ verification_tasks.add(task, keys)
-async def verify_timeout(bot: Bot, task_data: dict):
+async def left_member_handler(event: ChatMemberUpdated, bot: Bot):
+ user_id = event.from_user.id
+ chat_id = event.chat.id
+
+ key = key_from_user_chat(user_id, chat_id)
+
+ if not verification_tasks.exists(key):
+ return
+
+ task = verification_tasks.get(key)
+ await task.end(success=False)
+
+ verification_tasks.pop((user_id, chat_id), None)
+
+
+async def verify_timeout(task: BaseTask):
+ user_id = task.from_user_id
+ chat_id = task.from_chat_id
+
try:
- chat_id = task_data["chat_id"]
- user_id = task_data["user_id"]
+ timeout = task.get_timeout()
+ # log(f"Start timeout {timeout}")
+ await asyncio.sleep(timeout)
- await asyncio.sleep(30)
- try:
- if "message_id" in task_data:
- await bot.delete_message(chat_id, task_data["message_id"])
- except TelegramBadRequest:
- return
-
- chat_member = await bot.get_chat_member(chat_id, user_id)
- if chat_member.status == "member":
- await bot.ban_chat_member(chat_id, user_id)
+ await task.end(success=False)
+ await task.bot.ban_chat_member(chat_id, user_id)
except Exception as e:
log(f"Error in verify_timeout: {e}")
finally:
- verification_tasks.pop((user_id, chat_id), None)
+ verification_tasks.remove(key_from_user_chat(user_id, chat_id))
+
+
+async def success_end(task: BaseTask):
+ await task.end()
+
+ await asyncio.sleep(3)
+
+ if task.from_chat_id in last_success:
+ message_id = last_success.pop(task.from_chat_id)
+ try:
+ await task.bot.delete_message(task.from_chat_id, message_id)
+ except TelegramBadRequest:
+ pass
+
+ if config.get("welcome::show_success_message"):
+ message = await task.bot.send_message(
+ task.from_chat_id,
+ Template(config.get("welcome::success_message")).substitute(
+ mention=user_mention(task.from_user)
+ ),
+ parse_mode=ParseMode.HTML,
+ )
+
+ last_success[task.from_chat_id] = message.message_id
+
+
+async def handle_poll_verification(answer: PollAnswer, bot: Bot):
+ key = key_from_poll(answer.poll_id)
+ if not verification_tasks.exists(key):
+ return
+
+ task: BaseTask = verification_tasks.get(key)
+
+ if task.from_user_id != answer.user.id:
+ return
+
+ result = await task.verify(answer.option_ids[0])
+
+ if result:
+ await success_end(task)
+ return
+
+ await task.end(success=False)
+
+ current_attempt = task.attempt_number
+
+ if current_attempt >= task_manager.max_attempts:
+ await task.bot.ban_chat_member(task.from_chat_id, task.from_user_id)
+ return
+
+ await asyncio.sleep(5)
+
+ current_attempt = current_attempt + 1
+ new_task = task_manager.build_random_task(
+ task.event, task.bot, attempt_number=current_attempt
+ )
+ keys = await new_task.run()
+ log(keys)
+ verification_tasks.add(new_task, keys)
async def handle_inline_button_verification(
@@ -79,55 +185,178 @@ async def handle_inline_button_verification(
user_id = callback_data.user_id
chat_id = callback_data.chat_id
- if callback_query.from_user.id == user_id:
- if (user_id, chat_id) in verification_tasks:
- task = verification_tasks[(user_id, chat_id)]
- task_data = task["task_data"]
- method = task_data["method"]
- task_data["answer"] = callback_data.answer
-
- result = await method.verify(task_data)
-
- if result:
- task["task"].cancel()
- await bot.delete_message(chat_id, callback_query.message.message_id)
- else:
- await callback_query.answer("Неправильный ответ!", show_alert=True)
- pass
- else:
+ if callback_query.from_user.id != user_id:
await callback_query.answer("Эта кнопка не для вас!", show_alert=True)
+ return
+
+ key = key_from_user_chat(user_id, chat_id)
+
+ if not verification_tasks.exists(key):
+ await callback_query.answer()
+ return
+
+ task: BaseTask = verification_tasks.get(key)
+
+ result = await task.verify(callback_data.answer)
+
+ if result:
+ await success_end(task)
+ return
+
+ await task.end(success=False)
+
+ current_attempt = task.attempt_number
+
+ if current_attempt >= task_manager.max_attempts:
+ await callback_query.answer()
+ await task.bot.ban_chat_member(chat_id, user_id)
+ return
+
+ await callback_query.answer(
+ Template(config.get("welcome::retry_message")).substitute(
+ attempts=get_plural_form(
+ task_manager.max_attempts - current_attempt,
+ "попытка",
+ "попытки",
+ "попыток",
+ )
+ ),
+ show_alert=True,
+ )
+ current_attempt = current_attempt + 1
+ new_task = task_manager.build_random_task(
+ task.event, task.bot, attempt_number=current_attempt
+ )
+ keys = await new_task.run()
+ verification_tasks.add(new_task, keys)
-async def handle_input_verification(message: types.Message, bot: Bot):
- user_id = message.from_user.id
- chat_id = message.chat.id
+config: "IConfig" = get_module("standard.config", "config")
- if (user_id, chat_id) in verification_tasks:
- task = verification_tasks[(user_id, chat_id)]
- task_data = task["task_data"]
- method = task_data["method"]
- task_data["answer"] = message.text
-
- result = await method.verify(task_data)
-
- if result:
- task["task"].cancel()
- await bot.delete_message(chat_id, task_data["message_id"])
- await bot.delete_message(chat_id, message.message_id)
-
- pass
+task_manager = TaskManager(config)
async def module_init():
+ config.register("welcome::timeout", "int", default_value=60)
+ config.register("welcome::max_attempts", "int", default_value=5)
+ config.register(
+ "welcome::retry_message",
+ "string",
+ default_value="Неправильный ответ! У вас еще $attempts.",
+ )
+ config.register("welcome::show_success_message", "boolean", default_value=True)
+ config.register(
+ "welcome::success_message",
+ "string",
+ default_value="$mention, вы успешно прошли проверку!",
+ )
+
+ # MATH BUTTONS
+
+ config.register(
+ "welcome::tasks::math_buttons::enabled", "boolean", default_value=False
+ )
+ config.register(
+ "welcome::tasks::math_buttons::message_text",
+ "int",
+ default_value="Привет, $mention!\n"
+ "Ответьте на вопрос, "
+ "чтобы подтвердить, что вы не робот:\n\n$task",
+ )
+ config.register(
+ "welcome::tasks::math_buttons::retry_message_text",
+ "int",
+ default_value="$mention, неправильный ответ! У вас еще $attempts\n"
+ "Ответьте на вопрос, "
+ "чтобы подтвердить, что вы не робот:\n\n$task",
+ )
+ config.register("welcome::tasks::math_buttons::timeout", "int", default_value=None)
+
+ # MATH POLL
+
+ config.register("welcome::tasks::math_poll::enabled", "boolean", default_value=True)
+ config.register(
+ "welcome::tasks::math_poll::message_text",
+ "string",
+ default_value="Привет, $mention!\n"
+ "Ответьте на вопрос, "
+ "чтобы подтвердить, что вы не робот:\n\n$task",
+ )
+ config.register(
+ "welcome::tasks::math_poll::retry_message_text",
+ "string",
+ default_value="$mention, неправильный ответ! У вас еще $attempts\n"
+ "Ответьте на вопрос, "
+ "чтобы подтвердить, что вы не робот:\n\n$task",
+ )
+ config.register("welcome::tasks::math_poll::timeout", "int", default_value=None)
+
+ # QUESTION BUTTONS
+
+ config.register(
+ "welcome::tasks::question_buttons::enabled", "boolean", default_value=False
+ )
+ config.register(
+ "welcome::tasks::question_buttons::message_text",
+ "string",
+ default_value="Привет, $mention!\n"
+ "Ответьте на вопрос, "
+ "чтобы подтвердить, что вы не робот:\n\n$task",
+ )
+ config.register(
+ "welcome::tasks::question_buttons::retry_message_text",
+ "string",
+ default_value="$mention, неправильный ответ! У вас еще $attempts.\n"
+ "Ответьте на вопрос, "
+ "чтобы подтвердить, что вы не робот:\n\n$task",
+ )
+ config.register(
+ "welcome::tasks::question_buttons::timeout", "int", default_value=None
+ )
+
+ # QUESTION POLL
+
+ config.register(
+ "welcome::tasks::question_poll::enabled", "boolean", default_value=True
+ )
+ config.register(
+ "welcome::tasks::question_poll::message_text",
+ "string",
+ default_value="Привет, $mention!\n"
+ "Ответьте на вопрос, "
+ "чтобы подтвердить, что вы не робот:\n\n$task",
+ )
+ config.register(
+ "welcome::tasks::question_poll::retry_message_text",
+ "string",
+ default_value="$mention, неправильный ответ! У вас еще $attempts.\n"
+ "Ответьте на вопрос, "
+ "чтобы подтвердить, что вы не робот:\n\n$task",
+ )
+ config.register("welcome::tasks::question_poll::timeout", "int", default_value=None)
+
router = Router()
- router.chat_member(ChatMemberUpdatedFilter(IS_NOT_MEMBER >> IS_MEMBER))(
+ common_filters_pre = []
+
+ if FILTERS_MODULE_LOADED:
+ common_filters_pre.append(ChatIDFilter())
+
+ router.chat_member(*common_filters_pre, ChatMemberUpdatedFilter(JOIN_TRANSITION))(
new_member_handler
)
-
- router.callback_query(VerificationCallback.filter())(
+ router.chat_member(*common_filters_pre, ChatMemberUpdatedFilter(LEAVE_TRANSITION))(
+ left_member_handler
+ )
+ router.callback_query(*common_filters_pre, VerificationCallback.filter())(
handle_inline_button_verification
)
- router.message()(handle_input_verification)
+
+ # Нельзя применить ChatIDFilter из-за отстутсвия id чата
+ router.poll_answer()(handle_poll_verification)
register_router(router)
+
+
+async def module_late_init():
+ task_manager.init()
diff --git a/src/ocab_modules/ocab_modules/standard/welcome/utils.py b/src/ocab_modules/ocab_modules/standard/welcome/utils.py
new file mode 100644
index 0000000..86ec5e9
--- /dev/null
+++ b/src/ocab_modules/ocab_modules/standard/welcome/utils.py
@@ -0,0 +1,45 @@
+def get_plural_form(number, singular, genitive_singular, plural):
+ if 11 <= number % 100 <= 19:
+ return f"{number} {plural}"
+ elif number % 10 == 1:
+ return f"{number} {singular}"
+ elif 2 <= number % 10 <= 4:
+ return f"{number} {genitive_singular}"
+ else:
+ return f"{number} {plural}"
+
+
+class MultiKeyDict:
+ def __init__(self):
+ self.value_to_keys = {} # Словарь значений и связанных с ними ключей
+ self.key_to_value = {} # Словарь ключей и связанных с ними значений
+
+ def add(self, value, keys):
+ # Добавляем значение в словарь с множеством ключей
+ self.value_to_keys[value] = set(keys)
+
+ # Для каждого ключа создаем запись в словаре key_to_value
+ for key in keys:
+ self.key_to_value[key] = value
+
+ def get(self, key):
+ return self.key_to_value.get(key)
+
+ def exists(self, key):
+ return key in self.key_to_value
+
+ def remove(self, key):
+ if key in self.key_to_value:
+ value = self.key_to_value.pop(key)
+ self.value_to_keys[value].remove(key)
+
+ for k in self.value_to_keys[value]:
+ del self.key_to_value[k]
+
+
+def key_from_user_chat(user_id, chat_id):
+ return f"uc:{user_id}_{chat_id}"
+
+
+def key_from_poll(poll_id):
+ return f"p:{poll_id}"
diff --git a/src/ocab_modules/ocab_modules/standard/welcome/verifications_methods/base.py b/src/ocab_modules/ocab_modules/standard/welcome/verifications_methods/base.py
index aba1f27..8172900 100644
--- a/src/ocab_modules/ocab_modules/standard/welcome/verifications_methods/base.py
+++ b/src/ocab_modules/ocab_modules/standard/welcome/verifications_methods/base.py
@@ -1,18 +1,102 @@
+import asyncio
+from functools import wraps
+
+from aiogram import Bot
+from aiogram.exceptions import TelegramBadRequest
from aiogram.filters.callback_data import CallbackData
+from aiogram.types import ChatMemberUpdated
+
+from ocab_core.modules_system.public_api import log
+
+from .utils import mute_user, unmute_user
-class VerificationMethod:
- pass
+class BaseTask:
+ def __init__(
+ self,
+ event: ChatMemberUpdated,
+ bot: Bot,
+ timeout_func=None,
+ attempt_number=1,
+ max_attempts=1,
+ ):
+ self.bot = bot
+ self.event = event
+ self.timeout_func = timeout_func
+ self.attempt_number = attempt_number
+ self.max_attempts = max_attempts
+ self.timeout_func_task = None
+
+ @property
+ def from_chat_id(self):
+ return self.event.chat.id
+
+ @property
+ def from_user_id(self):
+ return self.event.from_user.id
+
+ @property
+ def from_user(self):
+ return self.event.from_user
+
+ @property
+ def attemps_left(self):
+ return self.max_attempts - self.attempt_number + 1
+
+ async def start_timeout_func(self):
+ if self.timeout_func:
+ self.timeout_func_task = asyncio.create_task(self.timeout_func(self))
+
+ @staticmethod
+ def type_name():
+ raise NotImplementedError()
+
+ async def run(self):
+ raise NotImplementedError()
+
+ async def verify(self, data):
+ raise NotImplementedError()
+
+ async def end(self, success=True):
+ raise NotImplementedError()
+
+ async def get_timeout():
+ raise NotImplementedError()
-class InputVerificationMethod(VerificationMethod):
- def verify(input_value: str, task_data):
- pass
+def mute_while_task(cls):
+ original_run = getattr(cls, "run", None)
+ original_end = getattr(cls, "end", None)
+ if not original_run and not original_end:
+ return cls
-class InlineButtonVerificationMethod(VerificationMethod):
- def verify(input_value: str, task_data):
- pass
+ @wraps(original_run)
+ async def wrapped_run(self: BaseTask):
+ chat_id = self.from_chat_id
+ user_id = self.from_user_id
+ try:
+ await mute_user(chat_id, user_id, 0, self.bot)
+ except TelegramBadRequest as e:
+ log(e)
+ pass
+ return await original_run(self)
+
+ @wraps(original_end)
+ async def wrapped_end(self: BaseTask, success=True):
+ await original_end(self, success)
+ if success:
+ chat_id = self.from_chat_id
+ user_id = self.from_user_id
+ try:
+ await unmute_user(chat_id, user_id, self.bot)
+ except TelegramBadRequest as e:
+ log(e)
+ pass
+
+ cls.run = wrapped_run
+ cls.end = wrapped_end
+ return cls
class VerificationCallback(CallbackData, prefix="verify"):
diff --git a/src/ocab_modules/ocab_modules/standard/welcome/verifications_methods/iamhuman.py b/src/ocab_modules/ocab_modules/standard/welcome/verifications_methods/iamhuman.py
deleted file mode 100644
index acd50da..0000000
--- a/src/ocab_modules/ocab_modules/standard/welcome/verifications_methods/iamhuman.py
+++ /dev/null
@@ -1,74 +0,0 @@
-import random
-
-from aiogram import Bot
-from aiogram.types import ChatMemberUpdated, InlineKeyboardButton, InlineKeyboardMarkup
-
-from .base import (
- InlineButtonVerificationMethod,
- InputVerificationMethod,
- VerificationCallback,
-)
-
-
-class IAmHumanButton(InlineButtonVerificationMethod):
- def __init__(self):
- pass
-
- def method_name(self):
- return "i_am_human_button"
-
- async def create_task(self, event: ChatMemberUpdated, bot: Bot):
- user_id = event.from_user.id
- chat_id = event.chat.id
-
- keyboard = InlineKeyboardMarkup(
- inline_keyboard=[
- [
- InlineKeyboardButton(
- text="Я человек!",
- callback_data=VerificationCallback(
- user_id=user_id, chat_id=chat_id, answer="OK"
- ).pack(),
- )
- ]
- ]
- )
-
- message = await bot.send_message(
- chat_id,
- f"Привет, {event.from_user.first_name}! "
- "Нажмите кнопку, чтобы подтвердить, что вы не робот.",
- reply_markup=keyboard,
- )
-
- return {"message_id": message.message_id}
-
- async def verify(self, task_data):
- return True
-
-
-class IAmHumanInput(InputVerificationMethod):
- def __init__(self):
- pass
-
- def method_name(self):
- return "i_am_human_input"
-
- def get_text(self):
- return random.choice(["Я человек", "Я не робот"]) # nosec
-
- async def create_task(self, event: ChatMemberUpdated, bot: Bot):
- chat_id = event.chat.id
- text = self.get_text()
- message = await bot.send_message(
- chat_id,
- f"Привет, {event.from_user.first_name}! "
- f'Напишите "{text}", чтобы подтвердить, что вы не робот.',
- )
- return {"message_id": message.message_id, "correct": text}
-
- async def verify(self, task_data):
- correct: str = task_data["correct"]
- answer: str = task_data["answer"]
-
- return answer.lower() == correct.lower()
diff --git a/src/ocab_modules/ocab_modules/standard/welcome/verifications_methods/math.py b/src/ocab_modules/ocab_modules/standard/welcome/verifications_methods/math.py
index 97e01ed..301459b 100644
--- a/src/ocab_modules/ocab_modules/standard/welcome/verifications_methods/math.py
+++ b/src/ocab_modules/ocab_modules/standard/welcome/verifications_methods/math.py
@@ -1,59 +1,9 @@
import random
-from aiogram import Bot
-from aiogram.types import ChatMemberUpdated, InlineKeyboardButton, InlineKeyboardMarkup
-
-from .base import (
- InlineButtonVerificationMethod,
- InputVerificationMethod,
- VerificationCallback,
-)
+from .simple import SimpleInlineButtonsTask, SimplePollTask, SimpleVariantsBaseTask
-class MathInputVerificationMethod(InputVerificationMethod):
- def __init__(self):
- pass
-
- def method_name(self):
- return "math_input"
-
- def generate_math_problem(self):
- a = random.randint(1, 10) # nosec
- b = random.randint(1, 10) # nosec
- operation = random.choice(["+", "-", "*"]) # nosec
- if operation == "+":
- answer = a + b
- elif operation == "-":
- answer = a - b
- else:
- answer = a * b
- return f"{a} {operation} {b}", str(answer)
-
- async def create_task(self, event: ChatMemberUpdated, bot: Bot):
- chat_id = event.chat.id
- problem, answer = self.generate_math_problem()
- message = await bot.send_message(
- chat_id,
- f"Привет, {event.from_user.first_name}! "
- "Решите простую математическую задачу, "
- f"чтобы подтвердить, что вы не робот: {problem} = ?",
- )
- return {"message_id": message.message_id, "correct": answer}
-
- async def verify(self, task_data):
- correct: str = task_data["correct"]
- answer: str = task_data["answer"]
-
- return answer.strip() == correct
-
-
-class MathButtonsVerification(InlineButtonVerificationMethod):
- def __init__(self):
- pass
-
- def method_name(self):
- return "math_buttons"
-
+class BaseMathTask(SimpleVariantsBaseTask):
def generate_math_problem(self):
a = random.randint(1, 10) # nosec
b = random.randint(1, 10) # nosec
@@ -66,46 +16,38 @@ class MathButtonsVerification(InlineButtonVerificationMethod):
answer = a * b
return f"{a} {operation} {b}", answer
- async def create_task(self, event: ChatMemberUpdated, bot: Bot):
- user_id = event.from_user.id
- chat_id = event.chat.id
-
+ async def init(self):
problem, correct_answer = self.generate_math_problem()
- options = [correct_answer]
- while len(options) < 4:
+ self.variants = [correct_answer]
+ while len(self.variants) < 4:
wrong_answer = random.randint(
- max(1, correct_answer - 5), correct_answer + 5
+ correct_answer - 5, correct_answer + 5
) # nosec
- if wrong_answer not in options:
- options.append(wrong_answer)
- random.shuffle(options) # nosec
+ if wrong_answer not in self.variants:
+ self.variants.append(wrong_answer)
+ random.shuffle(self.variants) # nosec
- keyboard = InlineKeyboardMarkup(
- inline_keyboard=[
- [
- InlineKeyboardButton(
- text=str(option),
- callback_data=VerificationCallback(
- user_id=user_id, chat_id=chat_id, answer=str(option)
- ).pack(),
- )
- for option in options
- ]
- ]
- )
+ self.variants = [str(x) for x in self.variants]
- message = await bot.send_message(
- chat_id,
- f"Привет, {event.from_user.first_name}! "
- "Решите простую математическую задачу, "
- f"чтобы подтвердить, что вы не робот: {problem} = ?",
- reply_markup=keyboard,
- )
+ self.task = f"{problem} = ?"
+ self.correct = str(correct_answer)
- return {"message_id": message.message_id, "correct": str(correct_answer)}
- async def verify(self, task_data):
- correct: str = task_data["correct"]
- answer: str = task_data["answer"]
+class MathInlineButtonsTask(BaseMathTask, SimpleInlineButtonsTask):
+ """
+ Математическая задача с выбором через inline-кнопки
+ """
- return answer == correct
+ @staticmethod
+ def type_name():
+ return "math_buttons"
+
+
+class MathPollTask(BaseMathTask, SimplePollTask):
+ """
+ Математическая задача с выбором через Poll
+ """
+
+ @staticmethod
+ def type_name():
+ return "math_poll"
diff --git a/src/ocab_modules/ocab_modules/standard/welcome/verifications_methods/question.py b/src/ocab_modules/ocab_modules/standard/welcome/verifications_methods/question.py
index a7b51ea..4e28139 100644
--- a/src/ocab_modules/ocab_modules/standard/welcome/verifications_methods/question.py
+++ b/src/ocab_modules/ocab_modules/standard/welcome/verifications_methods/question.py
@@ -1,13 +1,9 @@
import random
-from aiogram import Bot
-from aiogram.types import ChatMemberUpdated, InlineKeyboardButton, InlineKeyboardMarkup
+from aiogram.types import InlineKeyboardButton, InlineKeyboardMarkup
-from .base import (
- InlineButtonVerificationMethod,
- InputVerificationMethod,
- VerificationCallback,
-)
+from .base import VerificationCallback
+from .simple import SimpleInlineButtonsTask, SimplePollTask, SimpleVariantsBaseTask
QUESTIONS = [
(
@@ -39,82 +35,45 @@ QUESTIONS = [
]
-class QuestionInputVerification(InputVerificationMethod):
- def __init__(self):
- pass
-
- def method_name(self):
- return "question_input"
-
- def get_random_question(self):
- return random.choice(QUESTIONS) # nosec
-
- async def create_task(self, event: ChatMemberUpdated, bot: Bot):
- chat_id = event.chat.id
- question, answer, _ = self.get_random_question()
- message = await bot.send_message(
- chat_id,
- f"Привет, {event.from_user.first_name}! "
- "Пожалуйста, ответьте на следующий вопрос, "
- f"чтобы подтвердить, что вы не робот: {question}",
- )
- return {"message_id": message.message_id, "correct": answer.lower()}
-
- async def verify(self, task_data):
- correct: str = task_data["correct"]
- answer: str = task_data["answer"]
-
- return answer.lower().strip() == correct
-
-
-class QuestionButtonsVerification(InlineButtonVerificationMethod):
- def __init__(self):
- pass
-
- def method_name(self):
- return "question_inline"
-
- def get_random_question(self):
- return random.choice(QUESTIONS) # nosec
-
- async def create_task(self, event: ChatMemberUpdated, bot: Bot):
- user_id = event.from_user.id
- chat_id = event.chat.id
-
- question, correct_answer, wrong_answers = self.get_random_question()
+class BaseQuestionsTask(SimpleVariantsBaseTask):
+ async def init(self):
+ question, correct_answer, wrong_answers = random.choice(QUESTIONS) # nosec
options = [correct_answer] + wrong_answers
random.shuffle(options) # nosec
- keyboard = InlineKeyboardMarkup(
+ self.variants = [str(x) for x in options]
+
+ self.task = question
+ self.correct = correct_answer
+
+ async def verify(self, data):
+ return self.variants[int(data)] == self.correct
+
+
+class QuestionInlineButtonsTask(BaseQuestionsTask, SimpleInlineButtonsTask):
+ @staticmethod
+ def type_name():
+ return "question_buttons"
+
+ def build_keyboard(self):
+ return InlineKeyboardMarkup(
inline_keyboard=[
[
InlineKeyboardButton(
- text=option,
+ text=str(option),
callback_data=VerificationCallback(
- user_id=user_id, chat_id=chat_id, answer=str(i)
+ user_id=self.from_user_id,
+ chat_id=self.from_chat_id,
+ answer=str(i),
).pack(),
)
- for i, option in enumerate(options)
]
+ for i, option in enumerate(self.variants)
]
)
- message = await bot.send_message(
- chat_id,
- f"Привет, {event.from_user.first_name}! "
- "Пожалуйста, ответьте на следующий вопрос, "
- f"чтобы подтвердить, что вы не робот: {question}",
- reply_markup=keyboard,
- )
- return {
- "message_id": message.message_id,
- "correct": correct_answer,
- "options": options,
- }
-
- async def verify(self, task_data):
- correct: str = task_data["correct"]
- answer: str = task_data["answer"]
-
- return task_data["options"][int(answer)] == correct
+class QuestionPollTask(BaseQuestionsTask, SimplePollTask):
+ @staticmethod
+ def type_name():
+ return "question_poll"
diff --git a/src/ocab_modules/ocab_modules/standard/welcome/verifications_methods/simple.py b/src/ocab_modules/ocab_modules/standard/welcome/verifications_methods/simple.py
new file mode 100644
index 0000000..f396822
--- /dev/null
+++ b/src/ocab_modules/ocab_modules/standard/welcome/verifications_methods/simple.py
@@ -0,0 +1,178 @@
+from string import Template
+
+from aiogram import Bot
+from aiogram.enums import ParseMode, PollType
+from aiogram.types import ChatMemberUpdated, InlineKeyboardButton, InlineKeyboardMarkup
+
+from ..utils import get_plural_form, key_from_poll, key_from_user_chat
+from .base import BaseTask, VerificationCallback, mute_while_task
+from .utils import user_mention
+
+
+class SimpleBaseTask(BaseTask):
+ pass
+
+
+class SimpleVariantsBaseTaskConfig:
+ def __init__(self, task_type, config: dict):
+ self.config = config
+ self.task_type = task_type
+
+ @property
+ def timeout(self):
+ timeout = self.config.get(f"welcome::tasks::{self.task_type}::timeout")
+
+ if timeout is None:
+ return self.config.get("welcome::timeout")
+
+ return timeout
+
+ @property
+ def task_message_text(self):
+ return self.config.get(f"welcome::tasks::{self.task_type}::message_text")
+
+ @property
+ def task_retry_message_text(self):
+ return self.config.get(f"welcome::tasks::{self.task_type}::retry_message_text")
+
+
+class SimpleVariantsBaseTask(SimpleBaseTask):
+ def __init__(self, *args, **kwargs):
+ super().__init__(*args, **kwargs)
+
+ self.config = None
+
+ self.variants = []
+ self.task = ""
+ self.correct = None
+
+ self.task_message_id = None
+
+ def set_config(self, cfg: SimpleVariantsBaseTaskConfig):
+ self.config = cfg
+
+ def get_timeout(self):
+ return self.config.timeout
+
+
+@mute_while_task
+class SimpleInlineButtonsTask(SimpleVariantsBaseTask):
+ async def init(self):
+ raise NotImplementedError()
+
+ def build_keyboard(self):
+ return InlineKeyboardMarkup(
+ inline_keyboard=[
+ [
+ InlineKeyboardButton(
+ text=str(option),
+ callback_data=VerificationCallback(
+ user_id=self.from_user_id,
+ chat_id=self.from_chat_id,
+ answer=str(option),
+ ).pack(),
+ )
+ for option in self.variants
+ ]
+ ]
+ )
+
+ async def run(self):
+ await self.init()
+
+ message_template = Template(
+ self.config.task_message_text
+ if self.attempt_number == 1
+ else self.config.task_retry_message_text
+ )
+
+ chat_id = self.from_chat_id
+ message = await self.bot.send_message(
+ chat_id,
+ text=message_template.substitute(
+ mention=user_mention(self.from_user),
+ task=self.task,
+ attempts=get_plural_form(
+ self.attemps_left, "попытка", "попытки", "попыток"
+ ),
+ ),
+ reply_markup=self.build_keyboard(),
+ parse_mode=ParseMode.HTML,
+ )
+
+ self.task_message_id = message.message_id
+
+ await self.start_timeout_func()
+
+ return [key_from_user_chat(self.from_user_id, self.from_chat_id)]
+
+ async def verify(self, data):
+ return self.correct == data
+
+ async def end(self, success=True):
+ await self.bot.delete_message(self.from_chat_id, self.task_message_id)
+ if self.timeout_func_task:
+ self.timeout_func_task.cancel()
+
+
+@mute_while_task
+class SimplePollTask(SimpleVariantsBaseTask):
+ def __init__(
+ self,
+ event: ChatMemberUpdated,
+ bot: Bot,
+ timeout_func=None,
+ attempt_number=1,
+ max_attempts=1,
+ ):
+ super().__init__(event, bot, timeout_func, attempt_number, max_attempts)
+ self.correct_index = None
+
+ async def init(self):
+ raise NotImplementedError()
+
+ async def run(self):
+ await self.init()
+
+ self.correct_index = self.variants.index(self.correct)
+
+ message_template = Template(
+ self.config.task_message_text
+ if self.attempt_number == 1
+ else self.config.task_retry_message_text
+ )
+
+ chat_id = self.from_chat_id
+ message = await self.bot.send_poll(
+ chat_id,
+ question=message_template.substitute(
+ mention=self.from_user.first_name,
+ task=self.task,
+ attempts=get_plural_form(
+ self.attemps_left, "попытка", "попытки", "попыток"
+ ),
+ ),
+ options=self.variants,
+ type=PollType.QUIZ,
+ correct_option_id=self.correct_index,
+ allows_multiple_answers=False,
+ is_anonymous=False,
+ # parse_mode=ParseMode.HTML
+ )
+
+ self.task_message_id = message.message_id
+
+ await self.start_timeout_func()
+
+ return [
+ key_from_poll(message.poll.id),
+ key_from_user_chat(self.from_user_id, self.from_chat_id),
+ ]
+
+ async def verify(self, data):
+ return self.correct_index == data
+
+ async def end(self, success=True):
+ await self.bot.delete_message(self.from_chat_id, self.task_message_id)
+ if self.timeout_func_task:
+ self.timeout_func_task.cancel()
diff --git a/src/ocab_modules/ocab_modules/standard/welcome/verifications_methods/utils.py b/src/ocab_modules/ocab_modules/standard/welcome/verifications_methods/utils.py
new file mode 100644
index 0000000..f544735
--- /dev/null
+++ b/src/ocab_modules/ocab_modules/standard/welcome/verifications_methods/utils.py
@@ -0,0 +1,64 @@
+import time
+
+from aiogram import Bot
+from aiogram.enums import ParseMode
+from aiogram.types import ChatPermissions, User
+
+
+def user_mention(user: User, mode=ParseMode.HTML):
+ if mode == ParseMode.HTML:
+ return f"{user.first_name}"
+ elif mode == ParseMode.MARKDOWN:
+ return f"[{user.first_name}](tg://user?id={user.id})"
+ else:
+ raise ValueError(f"Unknown parse mode {mode}")
+
+
+async def mute_user(chat_id, user_id, until, bot: Bot):
+ end_time = until + int(time.time())
+ await bot.restrict_chat_member(
+ chat_id,
+ user_id,
+ until_date=end_time,
+ use_independent_chat_permissions=True,
+ permissions=ChatPermissions(
+ can_send_messages=False,
+ can_send_audios=False,
+ can_send_documents=False,
+ can_send_photos=False,
+ can_send_videos=False,
+ can_send_video_notes=False,
+ can_send_voice_notes=False,
+ can_send_polls=False,
+ can_send_other_messages=False,
+ can_add_web_page_previews=False,
+ can_change_info=False,
+ can_invite_users=False,
+ can_pin_messages=False,
+ can_manage_topics=False,
+ ),
+ )
+
+
+async def unmute_user(chat_id, user_id, bot: Bot):
+ await bot.restrict_chat_member(
+ chat_id,
+ user_id,
+ use_independent_chat_permissions=True,
+ permissions=ChatPermissions(
+ can_send_messages=True,
+ can_send_audios=True,
+ can_send_documents=True,
+ can_send_photos=True,
+ can_send_videos=True,
+ can_send_video_notes=True,
+ can_send_voice_notes=True,
+ can_send_polls=True,
+ can_send_other_messages=True,
+ can_add_web_page_previews=True,
+ can_change_info=True,
+ can_invite_users=True,
+ can_pin_messages=True,
+ can_manage_topics=True,
+ ),
+ )