mirror of
				https://gitflic.ru/project/alt-gnome/karkas.git
				synced 2025-11-03 23:12:27 +03:00 
			
		
		
		
	wip: Правки по standard.welcome
This commit is contained in:
		@@ -1 +1 @@
 | 
			
		||||
from .main import module_init
 | 
			
		||||
from .main import module_init, module_late_init
 | 
			
		||||
 
 | 
			
		||||
@@ -6,6 +6,9 @@
 | 
			
		||||
    "version": "1.0.0",
 | 
			
		||||
    "privileged": true,
 | 
			
		||||
    "dependencies": {
 | 
			
		||||
        "required": {
 | 
			
		||||
            "standard.config": "^1.0.0"
 | 
			
		||||
        },
 | 
			
		||||
        "optional": {
 | 
			
		||||
            "standard.command_helper": "^1.0.0",
 | 
			
		||||
            "standard.filters": "^1.0.0"
 | 
			
		||||
 
 | 
			
		||||
@@ -1,94 +1,165 @@
 | 
			
		||||
import asyncio
 | 
			
		||||
import random
 | 
			
		||||
from string import Template
 | 
			
		||||
from typing import TYPE_CHECKING
 | 
			
		||||
 | 
			
		||||
from aiogram import Bot, Router, types
 | 
			
		||||
from aiogram.enums import ChatMemberStatus
 | 
			
		||||
from aiogram.enums import ChatMemberStatus, ParseMode
 | 
			
		||||
from aiogram.filters import JOIN_TRANSITION, LEAVE_TRANSITION, ChatMemberUpdatedFilter
 | 
			
		||||
from aiogram.types import ChatMemberUpdated
 | 
			
		||||
from aiogram.types import ChatMemberUpdated, PollAnswer
 | 
			
		||||
 | 
			
		||||
from ocab_core.modules_system.public_api import log, register_router
 | 
			
		||||
from ocab_core.modules_system.public_api import get_module, log, register_router
 | 
			
		||||
 | 
			
		||||
from .utils import get_plural_form
 | 
			
		||||
from .verifications_methods.base import VerificationCallback, VerificationMethod
 | 
			
		||||
from .verifications_methods.iamhuman import IAmHumanButton
 | 
			
		||||
from .verifications_methods.math import MathButtonsVerification
 | 
			
		||||
from .verifications_methods.question import QuestionButtonsVerification
 | 
			
		||||
from .utils import MultiKeyDict, get_plural_form, key_from_poll, key_from_user_chat
 | 
			
		||||
from .verifications_methods.base import BaseTask, VerificationCallback
 | 
			
		||||
 | 
			
		||||
# По хорошему, надо вынести в конфиг, но пока оставим так.
 | 
			
		||||
verification_methods = [
 | 
			
		||||
    IAmHumanButton(),
 | 
			
		||||
    # IAmHumanInput(),
 | 
			
		||||
    MathButtonsVerification(),
 | 
			
		||||
    # MathInputVerificationMethod(),
 | 
			
		||||
    QuestionButtonsVerification(),
 | 
			
		||||
    # QuestionInputVerification(),
 | 
			
		||||
# from .verifications_methods.simple import SimpleInlineButtonsMethod
 | 
			
		||||
# from .verifications_methods.iamhuman import IAmHumanButton
 | 
			
		||||
from .verifications_methods.math import MathInlineButtonsTask, MathPollTask
 | 
			
		||||
from .verifications_methods.question import QuestionInlineButtonsTask, QuestionPollTask
 | 
			
		||||
from .verifications_methods.simple import (
 | 
			
		||||
    SimpleVariantsBaseTask,
 | 
			
		||||
    SimpleVariantsBaseTaskConfig,
 | 
			
		||||
)
 | 
			
		||||
from .verifications_methods.utils import user_mention
 | 
			
		||||
 | 
			
		||||
# from .verifications_methods.question import QuestionButtonsVerification
 | 
			
		||||
 | 
			
		||||
all_tasks = [
 | 
			
		||||
    MathInlineButtonsTask,
 | 
			
		||||
    MathPollTask,
 | 
			
		||||
    QuestionInlineButtonsTask,
 | 
			
		||||
    QuestionPollTask,
 | 
			
		||||
]
 | 
			
		||||
 | 
			
		||||
verification_tasks = {}
 | 
			
		||||
 | 
			
		||||
class TaskManager:
 | 
			
		||||
    def __init__(self, config: "IConfig"):
 | 
			
		||||
        self.config = config
 | 
			
		||||
        self.available_tasks = []
 | 
			
		||||
        self.max_attempts = 1
 | 
			
		||||
 | 
			
		||||
    def init(self):
 | 
			
		||||
        for cls in all_tasks:
 | 
			
		||||
            type_name = cls.type_name()
 | 
			
		||||
            if self.config.get(f"welcome::tasks::{type_name}::enabled"):
 | 
			
		||||
                self.available_tasks.append(cls)
 | 
			
		||||
 | 
			
		||||
        self.max_attempts = self.config.get("welcome::max_attempts")
 | 
			
		||||
 | 
			
		||||
    def build_random_task(self, event, bot, attempt_number=1) -> BaseTask:
 | 
			
		||||
        cls = random.choice(self.available_tasks)  # nosec
 | 
			
		||||
        obj = cls(
 | 
			
		||||
            event,
 | 
			
		||||
            bot,
 | 
			
		||||
            timeout_func=verify_timeout,
 | 
			
		||||
            attempt_number=attempt_number,
 | 
			
		||||
            max_attempts=self.max_attempts,
 | 
			
		||||
        )
 | 
			
		||||
 | 
			
		||||
        if isinstance(obj, SimpleVariantsBaseTask):
 | 
			
		||||
            cfg = SimpleVariantsBaseTaskConfig(obj.type_name(), self.config)
 | 
			
		||||
            obj.set_config(cfg)
 | 
			
		||||
 | 
			
		||||
        return obj
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
verification_tasks = MultiKeyDict()
 | 
			
		||||
 | 
			
		||||
if TYPE_CHECKING:
 | 
			
		||||
    from ocab_modules.standard.config import IConfig
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
async def new_member_handler(event: ChatMemberUpdated, bot: Bot):
 | 
			
		||||
    # НЕ СРАБОТАЕТ, ЕСЛИ ЧЕЛОВЕК УЖЕ ОГРАНИЧЕН В ПРАВАХ (RESTRICTED)
 | 
			
		||||
    if event.new_chat_member.status == ChatMemberStatus.MEMBER:
 | 
			
		||||
        user_id = event.from_user.id
 | 
			
		||||
        chat_id = event.chat.id
 | 
			
		||||
        task = task_manager.build_random_task(event, bot)
 | 
			
		||||
        keys = await task.run()
 | 
			
		||||
 | 
			
		||||
        method: VerificationMethod = random.choice(verification_methods)  # nosec
 | 
			
		||||
 | 
			
		||||
        await method.pre_task(
 | 
			
		||||
            chat_id,
 | 
			
		||||
            user_id,
 | 
			
		||||
            bot,
 | 
			
		||||
        )
 | 
			
		||||
 | 
			
		||||
        task_data = await method.create_task(event, bot)
 | 
			
		||||
 | 
			
		||||
        task_data["user_id"] = user_id
 | 
			
		||||
        task_data["chat_id"] = chat_id
 | 
			
		||||
        task_data["method"] = method
 | 
			
		||||
        task_data["timeout"] = method.timeout(task_data)
 | 
			
		||||
 | 
			
		||||
        task = asyncio.create_task(verify_timeout(bot, task_data))
 | 
			
		||||
 | 
			
		||||
        verification_tasks[(user_id, chat_id)] = {
 | 
			
		||||
            "task": task,
 | 
			
		||||
            "task_data": task_data,
 | 
			
		||||
        }
 | 
			
		||||
        verification_tasks.add(task, keys)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
async def left_member_handler(event: ChatMemberUpdated, bot: Bot):
 | 
			
		||||
    user_id = event.from_user.id
 | 
			
		||||
    chat_id = event.chat.id
 | 
			
		||||
 | 
			
		||||
    if (user_id, chat_id) not in verification_tasks:
 | 
			
		||||
    key = key_from_user_chat(user_id, chat_id)
 | 
			
		||||
 | 
			
		||||
    if not verification_tasks.exists(key):
 | 
			
		||||
        return
 | 
			
		||||
 | 
			
		||||
    task = verification_tasks[(user_id, chat_id)]
 | 
			
		||||
    task["task"].cancel()
 | 
			
		||||
    task = verification_tasks.get(key)
 | 
			
		||||
    await task.end(success=False)
 | 
			
		||||
 | 
			
		||||
    verification_tasks.pop((user_id, chat_id), None)
 | 
			
		||||
    task_data = task["task_data"]
 | 
			
		||||
    method: VerificationMethod = task_data["method"]
 | 
			
		||||
    await method.post_task(task_data, bot, success=False)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
async def verify_timeout(bot: Bot, task_data: dict):
 | 
			
		||||
async def verify_timeout(task: BaseTask):
 | 
			
		||||
    user_id = task.from_user_id
 | 
			
		||||
    chat_id = task.from_chat_id
 | 
			
		||||
 | 
			
		||||
    try:
 | 
			
		||||
        chat_id = task_data["chat_id"]
 | 
			
		||||
        user_id = task_data["user_id"]
 | 
			
		||||
        method: VerificationMethod = task_data["method"]
 | 
			
		||||
        timeout = task.get_timeout()
 | 
			
		||||
        # log(f"Start timeout {timeout}")
 | 
			
		||||
        await asyncio.sleep(timeout)
 | 
			
		||||
 | 
			
		||||
        await asyncio.sleep(task_data["timeout"])
 | 
			
		||||
 | 
			
		||||
        await method.post_task(task_data, success=False)
 | 
			
		||||
 | 
			
		||||
        chat_member = await bot.get_chat_member(chat_id, user_id)
 | 
			
		||||
        if chat_member.status in [ChatMemberStatus.MEMBER, ChatMemberStatus.RESTRICTED]:
 | 
			
		||||
            await bot.ban_chat_member(chat_id, user_id)
 | 
			
		||||
        await task.end(success=False)
 | 
			
		||||
 | 
			
		||||
        await task.bot.ban_chat_member(chat_id, user_id)
 | 
			
		||||
    except Exception as e:
 | 
			
		||||
        log(f"Error in verify_timeout: {e}")
 | 
			
		||||
    finally:
 | 
			
		||||
        verification_tasks.pop((user_id, chat_id), None)
 | 
			
		||||
        verification_tasks.remove(key_from_user_chat(user_id, chat_id))
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
async def success_end(task: BaseTask):
 | 
			
		||||
    await task.end()
 | 
			
		||||
 | 
			
		||||
    await asyncio.sleep(3)
 | 
			
		||||
 | 
			
		||||
    if config.get("welcome::show_success_message"):
 | 
			
		||||
        await task.bot.send_message(
 | 
			
		||||
            task.from_chat_id,
 | 
			
		||||
            Template(config.get("welcome::success_message")).substitute(
 | 
			
		||||
                mention=user_mention(task.from_user)
 | 
			
		||||
            ),
 | 
			
		||||
            parse_mode=ParseMode.HTML,
 | 
			
		||||
        )
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
async def handle_poll_verification(answer: PollAnswer, bot: Bot):
 | 
			
		||||
    key = key_from_poll(answer.poll_id)
 | 
			
		||||
    if not verification_tasks.exists(key):
 | 
			
		||||
        return
 | 
			
		||||
 | 
			
		||||
    task: BaseTask = verification_tasks.get(key)
 | 
			
		||||
 | 
			
		||||
    if task.from_user_id != answer.user.id:
 | 
			
		||||
        return
 | 
			
		||||
 | 
			
		||||
    result = await task.verify(answer.option_ids[0])
 | 
			
		||||
 | 
			
		||||
    if result:
 | 
			
		||||
        await success_end(task)
 | 
			
		||||
        return
 | 
			
		||||
 | 
			
		||||
    await task.end(success=False)
 | 
			
		||||
 | 
			
		||||
    current_attempt = task.attempt_number
 | 
			
		||||
 | 
			
		||||
    if current_attempt >= task_manager.max_attempts:
 | 
			
		||||
        await task.bot.ban_chat_member(task.from_chat_id, task.from_user_id)
 | 
			
		||||
        return
 | 
			
		||||
 | 
			
		||||
    await asyncio.sleep(5)
 | 
			
		||||
 | 
			
		||||
    current_attempt = current_attempt + 1
 | 
			
		||||
    new_task = task_manager.build_random_task(
 | 
			
		||||
        task.event, task.bot, attempt_number=current_attempt
 | 
			
		||||
    )
 | 
			
		||||
    keys = await new_task.run()
 | 
			
		||||
    log(keys)
 | 
			
		||||
    verification_tasks.add(new_task, keys)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
async def handle_inline_button_verification(
 | 
			
		||||
@@ -101,71 +172,152 @@ async def handle_inline_button_verification(
 | 
			
		||||
        await callback_query.answer("Эта кнопка не для вас!", show_alert=True)
 | 
			
		||||
        return
 | 
			
		||||
 | 
			
		||||
    if (user_id, chat_id) not in verification_tasks:
 | 
			
		||||
    key = key_from_user_chat(user_id, chat_id)
 | 
			
		||||
 | 
			
		||||
    if not verification_tasks.exists(key):
 | 
			
		||||
        await callback_query.answer()
 | 
			
		||||
        return
 | 
			
		||||
 | 
			
		||||
    verification_task = verification_tasks[(user_id, chat_id)]
 | 
			
		||||
    task_data = verification_task["task_data"]
 | 
			
		||||
    method: VerificationMethod = task_data["method"]
 | 
			
		||||
    task_data["answer"] = callback_data.answer
 | 
			
		||||
    task: BaseTask = verification_tasks.get(key)
 | 
			
		||||
 | 
			
		||||
    result = await method.verify(task_data)
 | 
			
		||||
    result = await task.verify(callback_data.answer)
 | 
			
		||||
 | 
			
		||||
    if result:
 | 
			
		||||
        verification_task["task"].cancel()
 | 
			
		||||
        await method.post_task(task_data, bot)
 | 
			
		||||
        await success_end(task)
 | 
			
		||||
        return
 | 
			
		||||
 | 
			
		||||
    if "attempts_count" not in task_data:
 | 
			
		||||
        await callback_query.answer("Неправильный ответ!", show_alert=True)
 | 
			
		||||
        return
 | 
			
		||||
    await task.end(success=False)
 | 
			
		||||
 | 
			
		||||
    attempts_count = task_data["attempts_count"]
 | 
			
		||||
    current_attempt = task.attempt_number
 | 
			
		||||
 | 
			
		||||
    verification_task["task"].cancel()
 | 
			
		||||
    attempts_count = attempts_count - 1
 | 
			
		||||
 | 
			
		||||
    if attempts_count > 0:
 | 
			
		||||
        await callback_query.answer(
 | 
			
		||||
            "Неправильный ответ! "
 | 
			
		||||
            + "У вас еще "
 | 
			
		||||
            + get_plural_form(attempts_count, "попытка", "попытки", "попыток"),
 | 
			
		||||
            show_alert=True,
 | 
			
		||||
        )
 | 
			
		||||
        task_data["timeout"] = method.timeout(task_data)
 | 
			
		||||
        task_data["attempts_count"] = attempts_count
 | 
			
		||||
    else:
 | 
			
		||||
        task_data["timeout"] = 0
 | 
			
		||||
    if current_attempt >= task_manager.max_attempts:
 | 
			
		||||
        await callback_query.answer()
 | 
			
		||||
 | 
			
		||||
    task = asyncio.create_task(verify_timeout(bot, task_data))
 | 
			
		||||
    verification_task["task"] = task
 | 
			
		||||
 | 
			
		||||
    # Эта строчка нужна, т.к. во время cancel происходит pop
 | 
			
		||||
    verification_tasks[(user_id, chat_id)] = verification_task
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
async def handle_input_verification(message: types.Message, bot: Bot):
 | 
			
		||||
    user_id = message.from_user.id
 | 
			
		||||
    chat_id = message.chat.id
 | 
			
		||||
 | 
			
		||||
    if (user_id, chat_id) not in verification_tasks:
 | 
			
		||||
        await task.bot.ban_chat_member(chat_id, user_id)
 | 
			
		||||
        return
 | 
			
		||||
    task = verification_tasks[(user_id, chat_id)]
 | 
			
		||||
    task_data = task["task_data"]
 | 
			
		||||
    method: VerificationMethod = task_data["method"]
 | 
			
		||||
    task_data["answer"] = message.text
 | 
			
		||||
    task_data["answer_message_id"] = message.message_id
 | 
			
		||||
 | 
			
		||||
    result = await method.verify(task_data)
 | 
			
		||||
    await callback_query.answer(
 | 
			
		||||
        Template(config.get("welcome::retry_message")).substitute(
 | 
			
		||||
            attempts=get_plural_form(
 | 
			
		||||
                task_manager.max_attempts - current_attempt,
 | 
			
		||||
                "попытка",
 | 
			
		||||
                "попытки",
 | 
			
		||||
                "попыток",
 | 
			
		||||
            )
 | 
			
		||||
        ),
 | 
			
		||||
        show_alert=True,
 | 
			
		||||
    )
 | 
			
		||||
    current_attempt = current_attempt + 1
 | 
			
		||||
    new_task = task_manager.build_random_task(
 | 
			
		||||
        task.event, task.bot, attempt_number=current_attempt
 | 
			
		||||
    )
 | 
			
		||||
    keys = await new_task.run()
 | 
			
		||||
    verification_tasks.add(new_task, keys)
 | 
			
		||||
 | 
			
		||||
    if result:
 | 
			
		||||
        task["task"].cancel()
 | 
			
		||||
        await method.post_task(task_data, bot)
 | 
			
		||||
 | 
			
		||||
config: "IConfig" = get_module("standard.config", "config")
 | 
			
		||||
 | 
			
		||||
task_manager = TaskManager(config)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
async def module_init():
 | 
			
		||||
    config.register("welcome::timeout", "int", default_value=45)
 | 
			
		||||
    config.register("welcome::max_attempts", "int", default_value=2)
 | 
			
		||||
    config.register(
 | 
			
		||||
        "welcome::retry_message",
 | 
			
		||||
        "string",
 | 
			
		||||
        default_value="Неправильный ответ! У вас еще $attempts",
 | 
			
		||||
    )
 | 
			
		||||
    config.register("welcome::show_success_message", "boolean", default_value=True)
 | 
			
		||||
    config.register(
 | 
			
		||||
        "welcome::success_message",
 | 
			
		||||
        "string",
 | 
			
		||||
        default_value="$mention, вы успешно прошли проверку!",
 | 
			
		||||
    )
 | 
			
		||||
 | 
			
		||||
    # MATH BUTTONS
 | 
			
		||||
 | 
			
		||||
    config.register(
 | 
			
		||||
        "welcome::tasks::math_buttons::enabled", "boolean", default_value=True
 | 
			
		||||
    )
 | 
			
		||||
    config.register(
 | 
			
		||||
        "welcome::tasks::math_buttons::message_text",
 | 
			
		||||
        "int",
 | 
			
		||||
        default_value="Привет, $mention!\n"
 | 
			
		||||
        "Решите простую математическую задачу,"
 | 
			
		||||
        "чтобы подтвердить, что вы не робот:\n\n<b>$task</b>",
 | 
			
		||||
    )
 | 
			
		||||
    config.register(
 | 
			
		||||
        "welcome::tasks::math_buttons::retry_message_text",
 | 
			
		||||
        "int",
 | 
			
		||||
        default_value="$mention, неправильный ответ! У вас еще $attempts\n"
 | 
			
		||||
        "Решите простую математическую задачу,"
 | 
			
		||||
        "чтобы подтвердить, что вы не робот:\n\n<b>$task</b>",
 | 
			
		||||
    )
 | 
			
		||||
    config.register("welcome::tasks::math_buttons::timeout", "int", default_value=None)
 | 
			
		||||
 | 
			
		||||
    # MATH POLL
 | 
			
		||||
 | 
			
		||||
    config.register("welcome::tasks::math_poll::enabled", "boolean", default_value=True)
 | 
			
		||||
    config.register(
 | 
			
		||||
        "welcome::tasks::math_poll::message_text",
 | 
			
		||||
        "string",
 | 
			
		||||
        default_value="Привет, $mention!\n"
 | 
			
		||||
        "Решите простую математическую задачу,"
 | 
			
		||||
        "чтобы подтвердить, что вы не робот:\n\n$task",
 | 
			
		||||
    )
 | 
			
		||||
    config.register(
 | 
			
		||||
        "welcome::tasks::math_poll::retry_message_text",
 | 
			
		||||
        "string",
 | 
			
		||||
        default_value="$mention, неправильный ответ! У вас еще $attempts\n"
 | 
			
		||||
        "Решите простую математическую задачу,"
 | 
			
		||||
        "чтобы подтвердить, что вы не робот:\n\n$task",
 | 
			
		||||
    )
 | 
			
		||||
    config.register("welcome::tasks::math_poll::timeout", "int", default_value=None)
 | 
			
		||||
 | 
			
		||||
    # QUESTION BUTTONS
 | 
			
		||||
 | 
			
		||||
    config.register(
 | 
			
		||||
        "welcome::tasks::question_buttons::enabled", "boolean", default_value=True
 | 
			
		||||
    )
 | 
			
		||||
    config.register(
 | 
			
		||||
        "welcome::tasks::question_buttons::message_text",
 | 
			
		||||
        "string",
 | 
			
		||||
        default_value="Привет, $mention!\n"
 | 
			
		||||
        "Решите простую математическую задачу,"
 | 
			
		||||
        "чтобы подтвердить, что вы не робот:\n\n<b>$task</b>",
 | 
			
		||||
    )
 | 
			
		||||
    config.register(
 | 
			
		||||
        "welcome::tasks::question_buttons::retry_message_text",
 | 
			
		||||
        "string",
 | 
			
		||||
        default_value="$mention, неправильный ответ! У вас еще $attempts\n"
 | 
			
		||||
        "Решите простую математическую задачу,"
 | 
			
		||||
        "чтобы подтвердить, что вы не робот:\n\n<b>$task</b>",
 | 
			
		||||
    )
 | 
			
		||||
    config.register(
 | 
			
		||||
        "welcome::tasks::question_buttons::timeout", "int", default_value=None
 | 
			
		||||
    )
 | 
			
		||||
 | 
			
		||||
    # QUESTION POLL
 | 
			
		||||
 | 
			
		||||
    config.register(
 | 
			
		||||
        "welcome::tasks::question_poll::enabled", "boolean", default_value=True
 | 
			
		||||
    )
 | 
			
		||||
    config.register(
 | 
			
		||||
        "welcome::tasks::question_poll::message_text",
 | 
			
		||||
        "string",
 | 
			
		||||
        default_value="Привет, $mention!\n"
 | 
			
		||||
        "Решите простую математическую задачу,"
 | 
			
		||||
        "чтобы подтвердить, что вы не робот:\n\n$task",
 | 
			
		||||
    )
 | 
			
		||||
    config.register(
 | 
			
		||||
        "welcome::tasks::question_poll::retry_message_text",
 | 
			
		||||
        "string",
 | 
			
		||||
        default_value="$mention, неправильный ответ! У вас еще $attempts\n"
 | 
			
		||||
        "Решите простую математическую задачу,"
 | 
			
		||||
        "чтобы подтвердить, что вы не робот:\n\n$task",
 | 
			
		||||
    )
 | 
			
		||||
    config.register("welcome::tasks::question_poll::timeout", "int", default_value=None)
 | 
			
		||||
 | 
			
		||||
    router = Router()
 | 
			
		||||
 | 
			
		||||
    router.chat_member(ChatMemberUpdatedFilter(JOIN_TRANSITION))(new_member_handler)
 | 
			
		||||
@@ -175,6 +327,11 @@ async def module_init():
 | 
			
		||||
    router.callback_query(VerificationCallback.filter())(
 | 
			
		||||
        handle_inline_button_verification
 | 
			
		||||
    )
 | 
			
		||||
    router.message()(handle_input_verification)
 | 
			
		||||
    router.poll_answer()(handle_poll_verification)
 | 
			
		||||
    # router.message()(handle_input_verification)
 | 
			
		||||
 | 
			
		||||
    register_router(router)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
async def module_late_init():
 | 
			
		||||
    task_manager.init()
 | 
			
		||||
 
 | 
			
		||||
@@ -7,3 +7,39 @@ def get_plural_form(number, singular, genitive_singular, plural):
 | 
			
		||||
        return f"{number} {genitive_singular}"
 | 
			
		||||
    else:
 | 
			
		||||
        return f"{number} {plural}"
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class MultiKeyDict:
 | 
			
		||||
    def __init__(self):
 | 
			
		||||
        self.value_to_keys = {}  # Словарь значений и связанных с ними ключей
 | 
			
		||||
        self.key_to_value = {}  # Словарь ключей и связанных с ними значений
 | 
			
		||||
 | 
			
		||||
    def add(self, value, keys):
 | 
			
		||||
        # Добавляем значение в словарь с множеством ключей
 | 
			
		||||
        self.value_to_keys[value] = set(keys)
 | 
			
		||||
 | 
			
		||||
        # Для каждого ключа создаем запись в словаре key_to_value
 | 
			
		||||
        for key in keys:
 | 
			
		||||
            self.key_to_value[key] = value
 | 
			
		||||
 | 
			
		||||
    def get(self, key):
 | 
			
		||||
        return self.key_to_value.get(key)
 | 
			
		||||
 | 
			
		||||
    def exists(self, key):
 | 
			
		||||
        return key in self.key_to_value
 | 
			
		||||
 | 
			
		||||
    def remove(self, key):
 | 
			
		||||
        if key in self.key_to_value:
 | 
			
		||||
            value = self.key_to_value.pop(key)
 | 
			
		||||
            self.value_to_keys[value].remove(key)
 | 
			
		||||
 | 
			
		||||
            for k in self.value_to_keys[value]:
 | 
			
		||||
                del self.key_to_value[k]
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def key_from_user_chat(user_id, chat_id):
 | 
			
		||||
    return f"uc:{user_id}_{chat_id}"
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def key_from_poll(poll_id):
 | 
			
		||||
    return f"p:{poll_id}"
 | 
			
		||||
 
 | 
			
		||||
@@ -1,133 +1,102 @@
 | 
			
		||||
import time
 | 
			
		||||
import asyncio
 | 
			
		||||
from functools import wraps
 | 
			
		||||
 | 
			
		||||
from aiogram import Bot
 | 
			
		||||
from aiogram.exceptions import TelegramBadRequest
 | 
			
		||||
from aiogram.filters.callback_data import CallbackData
 | 
			
		||||
from aiogram.types import ChatMemberUpdated, ChatPermissions
 | 
			
		||||
from aiogram.types import ChatMemberUpdated
 | 
			
		||||
 | 
			
		||||
from .utils import user_mention
 | 
			
		||||
from ocab_core.modules_system.public_api import log
 | 
			
		||||
 | 
			
		||||
from .utils import mute_user, unmute_user
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
async def mute_user(chat_id, user_id, until, bot: Bot):
 | 
			
		||||
    end_time = until + int(time.time())
 | 
			
		||||
    await bot.restrict_chat_member(
 | 
			
		||||
        chat_id,
 | 
			
		||||
        user_id,
 | 
			
		||||
        until_date=end_time,
 | 
			
		||||
        use_independent_chat_permissions=True,
 | 
			
		||||
        permissions=ChatPermissions(
 | 
			
		||||
            can_send_messages=False,
 | 
			
		||||
            can_send_audios=False,
 | 
			
		||||
            can_send_documents=False,
 | 
			
		||||
            can_send_photos=False,
 | 
			
		||||
            can_send_videos=False,
 | 
			
		||||
            can_send_video_notes=False,
 | 
			
		||||
            can_send_voice_notes=False,
 | 
			
		||||
            can_send_polls=False,
 | 
			
		||||
            can_send_other_messages=False,
 | 
			
		||||
            can_add_web_page_previews=False,
 | 
			
		||||
            can_change_info=False,
 | 
			
		||||
            can_invite_users=False,
 | 
			
		||||
            can_pin_messages=False,
 | 
			
		||||
            can_manage_topics=False,
 | 
			
		||||
        ),
 | 
			
		||||
    )
 | 
			
		||||
class BaseTask:
 | 
			
		||||
    def __init__(
 | 
			
		||||
        self,
 | 
			
		||||
        event: ChatMemberUpdated,
 | 
			
		||||
        bot: Bot,
 | 
			
		||||
        timeout_func=None,
 | 
			
		||||
        attempt_number=1,
 | 
			
		||||
        max_attempts=1,
 | 
			
		||||
    ):
 | 
			
		||||
        self.bot = bot
 | 
			
		||||
        self.event = event
 | 
			
		||||
        self.timeout_func = timeout_func
 | 
			
		||||
        self.attempt_number = attempt_number
 | 
			
		||||
        self.max_attempts = max_attempts
 | 
			
		||||
        self.timeout_func_task = None
 | 
			
		||||
 | 
			
		||||
    @property
 | 
			
		||||
    def from_chat_id(self):
 | 
			
		||||
        return self.event.chat.id
 | 
			
		||||
 | 
			
		||||
    @property
 | 
			
		||||
    def from_user_id(self):
 | 
			
		||||
        return self.event.from_user.id
 | 
			
		||||
 | 
			
		||||
    @property
 | 
			
		||||
    def from_user(self):
 | 
			
		||||
        return self.event.from_user
 | 
			
		||||
 | 
			
		||||
    @property
 | 
			
		||||
    def attemps_left(self):
 | 
			
		||||
        return self.max_attempts - self.attempt_number + 1
 | 
			
		||||
 | 
			
		||||
    async def start_timeout_func(self):
 | 
			
		||||
        if self.timeout_func:
 | 
			
		||||
            self.timeout_func_task = asyncio.create_task(self.timeout_func(self))
 | 
			
		||||
 | 
			
		||||
    @staticmethod
 | 
			
		||||
    def type_name():
 | 
			
		||||
        raise NotImplementedError()
 | 
			
		||||
 | 
			
		||||
    async def run(self):
 | 
			
		||||
        raise NotImplementedError()
 | 
			
		||||
 | 
			
		||||
    async def verify(self, data):
 | 
			
		||||
        raise NotImplementedError()
 | 
			
		||||
 | 
			
		||||
    async def end(self, success=True):
 | 
			
		||||
        raise NotImplementedError()
 | 
			
		||||
 | 
			
		||||
    async def get_timeout():
 | 
			
		||||
        raise NotImplementedError()
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
async def unmute_user(chat_id, user_id, bot: Bot):
 | 
			
		||||
    await bot.restrict_chat_member(
 | 
			
		||||
        chat_id,
 | 
			
		||||
        user_id,
 | 
			
		||||
        use_independent_chat_permissions=True,
 | 
			
		||||
        permissions=ChatPermissions(
 | 
			
		||||
            can_send_messages=True,
 | 
			
		||||
            can_send_audios=True,
 | 
			
		||||
            can_send_documents=True,
 | 
			
		||||
            can_send_photos=True,
 | 
			
		||||
            can_send_videos=True,
 | 
			
		||||
            can_send_video_notes=True,
 | 
			
		||||
            can_send_voice_notes=True,
 | 
			
		||||
            can_send_polls=True,
 | 
			
		||||
            can_send_other_messages=True,
 | 
			
		||||
            can_add_web_page_previews=True,
 | 
			
		||||
            can_change_info=True,
 | 
			
		||||
            can_invite_users=True,
 | 
			
		||||
            can_pin_messages=True,
 | 
			
		||||
            can_manage_topics=True,
 | 
			
		||||
        ),
 | 
			
		||||
    )
 | 
			
		||||
def mute_while_task(cls):
 | 
			
		||||
    original_run = getattr(cls, "run", None)
 | 
			
		||||
    original_end = getattr(cls, "end", None)
 | 
			
		||||
 | 
			
		||||
    if not original_run and not original_end:
 | 
			
		||||
        return cls
 | 
			
		||||
 | 
			
		||||
class VerificationMethod:
 | 
			
		||||
 | 
			
		||||
    def timeout(self, task_data=None) -> int:
 | 
			
		||||
        """
 | 
			
		||||
        Время ожидания
 | 
			
		||||
        """
 | 
			
		||||
        return 30
 | 
			
		||||
 | 
			
		||||
    def method_name(self):
 | 
			
		||||
        pass
 | 
			
		||||
 | 
			
		||||
    async def pre_task(self, chat_id, user_id, bot: Bot):
 | 
			
		||||
        pass
 | 
			
		||||
 | 
			
		||||
    async def create_task(self, event: ChatMemberUpdated, bot: Bot):
 | 
			
		||||
        pass
 | 
			
		||||
 | 
			
		||||
    async def post_task(self, task_data, bot: Bot, success=True):
 | 
			
		||||
        pass
 | 
			
		||||
 | 
			
		||||
    async def verify(self, task_data):
 | 
			
		||||
        pass
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class InputVerificationMethod(VerificationMethod):
 | 
			
		||||
    async def post_task(self, task_data, bot: Bot, success=True, user=None):
 | 
			
		||||
        chat_id = task_data["chat_id"]
 | 
			
		||||
        message_id = task_data["message_id"]
 | 
			
		||||
        answer_message_id = task_data["answer_message_id"]
 | 
			
		||||
 | 
			
		||||
        await bot.delete_message(chat_id, message_id)
 | 
			
		||||
        await bot.delete_message(chat_id, answer_message_id)
 | 
			
		||||
 | 
			
		||||
        if not success or user is None:
 | 
			
		||||
            return
 | 
			
		||||
 | 
			
		||||
        await bot.send_message(
 | 
			
		||||
            chat_id,
 | 
			
		||||
            f"{user_mention(user)}, успешно прошли проверку! "
 | 
			
		||||
            "Пожалуйста, соблюдайте правила группы.",
 | 
			
		||||
        )
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class InlineButtonVerificationMethod(VerificationMethod):
 | 
			
		||||
    async def pre_task(self, chat_id, user_id, bot: Bot):
 | 
			
		||||
    @wraps(original_run)
 | 
			
		||||
    async def wrapped_run(self: BaseTask):
 | 
			
		||||
        chat_id = self.from_chat_id
 | 
			
		||||
        user_id = self.from_user_id
 | 
			
		||||
        try:
 | 
			
		||||
            await mute_user(chat_id, user_id, 0, bot)
 | 
			
		||||
        except TelegramBadRequest:
 | 
			
		||||
            await mute_user(chat_id, user_id, 0, self.bot)
 | 
			
		||||
        except TelegramBadRequest as e:
 | 
			
		||||
            log(e)
 | 
			
		||||
            pass
 | 
			
		||||
        return await original_run(self)
 | 
			
		||||
 | 
			
		||||
    async def post_task(self, task_data, bot: Bot, success=True, user=None):
 | 
			
		||||
        user_id = task_data["user_id"]
 | 
			
		||||
        chat_id = task_data["chat_id"]
 | 
			
		||||
        message_id = task_data["message_id"]
 | 
			
		||||
    @wraps(original_end)
 | 
			
		||||
    async def wrapped_end(self: BaseTask, success=True):
 | 
			
		||||
        await original_end(self, success)
 | 
			
		||||
        if success:
 | 
			
		||||
            chat_id = self.from_chat_id
 | 
			
		||||
            user_id = self.from_user_id
 | 
			
		||||
            try:
 | 
			
		||||
                await unmute_user(chat_id, user_id, self.bot)
 | 
			
		||||
            except TelegramBadRequest as e:
 | 
			
		||||
                log(e)
 | 
			
		||||
                pass
 | 
			
		||||
 | 
			
		||||
        await bot.delete_message(chat_id, message_id)
 | 
			
		||||
 | 
			
		||||
        try:
 | 
			
		||||
            await unmute_user(chat_id, user_id, bot)
 | 
			
		||||
        except TelegramBadRequest:
 | 
			
		||||
            pass
 | 
			
		||||
 | 
			
		||||
        if not success or user is None:
 | 
			
		||||
            return
 | 
			
		||||
 | 
			
		||||
        await bot.send_message(
 | 
			
		||||
            chat_id,
 | 
			
		||||
            f"{user_mention(user)}, успешно прошли проверку! "
 | 
			
		||||
            "Пожалуйста, соблюдайте правила группы.",
 | 
			
		||||
        )
 | 
			
		||||
    cls.run = wrapped_run
 | 
			
		||||
    cls.end = wrapped_end
 | 
			
		||||
    return cls
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class VerificationCallback(CallbackData, prefix="verify"):
 | 
			
		||||
 
 | 
			
		||||
@@ -1,78 +0,0 @@
 | 
			
		||||
import random
 | 
			
		||||
 | 
			
		||||
from aiogram import Bot
 | 
			
		||||
from aiogram.enums import ParseMode
 | 
			
		||||
from aiogram.types import ChatMemberUpdated, InlineKeyboardButton, InlineKeyboardMarkup
 | 
			
		||||
 | 
			
		||||
from .base import (
 | 
			
		||||
    InlineButtonVerificationMethod,
 | 
			
		||||
    InputVerificationMethod,
 | 
			
		||||
    VerificationCallback,
 | 
			
		||||
)
 | 
			
		||||
from .utils import user_mention
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class IAmHumanButton(InlineButtonVerificationMethod):
 | 
			
		||||
    def __init__(self):
 | 
			
		||||
        pass
 | 
			
		||||
 | 
			
		||||
    def method_name(self):
 | 
			
		||||
        return "i_am_human_button"
 | 
			
		||||
 | 
			
		||||
    async def create_task(self, event: ChatMemberUpdated, bot: Bot):
 | 
			
		||||
        user_id = event.from_user.id
 | 
			
		||||
        chat_id = event.chat.id
 | 
			
		||||
 | 
			
		||||
        keyboard = InlineKeyboardMarkup(
 | 
			
		||||
            inline_keyboard=[
 | 
			
		||||
                [
 | 
			
		||||
                    InlineKeyboardButton(
 | 
			
		||||
                        text="Я человек!",
 | 
			
		||||
                        callback_data=VerificationCallback(
 | 
			
		||||
                            user_id=user_id, chat_id=chat_id, answer="OK"
 | 
			
		||||
                        ).pack(),
 | 
			
		||||
                    )
 | 
			
		||||
                ]
 | 
			
		||||
            ]
 | 
			
		||||
        )
 | 
			
		||||
 | 
			
		||||
        message = await bot.send_message(
 | 
			
		||||
            chat_id,
 | 
			
		||||
            f"Привет, {user_mention(event.from_user)}! ",
 | 
			
		||||
            "Нажмите кнопку, чтобы подтвердить, что вы не робот.",
 | 
			
		||||
            reply_markup=keyboard,
 | 
			
		||||
            parse_mode=ParseMode.HTML,
 | 
			
		||||
        )
 | 
			
		||||
 | 
			
		||||
        return {"message_id": message.message_id}
 | 
			
		||||
 | 
			
		||||
    async def verify(self, task_data):
 | 
			
		||||
        return True
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class IAmHumanInput(InputVerificationMethod):
 | 
			
		||||
    def __init__(self):
 | 
			
		||||
        pass
 | 
			
		||||
 | 
			
		||||
    def method_name(self):
 | 
			
		||||
        return "i_am_human_input"
 | 
			
		||||
 | 
			
		||||
    def get_text(self):
 | 
			
		||||
        return random.choice(["Я человек", "Я не робот"])  # nosec
 | 
			
		||||
 | 
			
		||||
    async def create_task(self, event: ChatMemberUpdated, bot: Bot):
 | 
			
		||||
        chat_id = event.chat.id
 | 
			
		||||
        text = self.get_text()
 | 
			
		||||
        message = await bot.send_message(
 | 
			
		||||
            chat_id,
 | 
			
		||||
            f"Привет, {user_mention(event.from_user)}! "
 | 
			
		||||
            f'Напишите "{text}", чтобы подтвердить, что вы не робот.',
 | 
			
		||||
            parse_mode=ParseMode.HTML,
 | 
			
		||||
        )
 | 
			
		||||
        return {"message_id": message.message_id, "correct": text}
 | 
			
		||||
 | 
			
		||||
    async def verify(self, task_data):
 | 
			
		||||
        correct: str = task_data["correct"]
 | 
			
		||||
        answer: str = task_data["answer"]
 | 
			
		||||
 | 
			
		||||
        return answer.lower() == correct.lower()
 | 
			
		||||
@@ -1,63 +1,9 @@
 | 
			
		||||
import random
 | 
			
		||||
 | 
			
		||||
from aiogram import Bot
 | 
			
		||||
from aiogram.enums import ParseMode
 | 
			
		||||
from aiogram.types import ChatMemberUpdated, InlineKeyboardButton, InlineKeyboardMarkup
 | 
			
		||||
 | 
			
		||||
from .base import (
 | 
			
		||||
    InlineButtonVerificationMethod,
 | 
			
		||||
    InputVerificationMethod,
 | 
			
		||||
    VerificationCallback,
 | 
			
		||||
)
 | 
			
		||||
from .utils import user_mention
 | 
			
		||||
from .simple import SimpleInlineButtonsTask, SimplePollTask, SimpleVariantsBaseTask
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class MathInputVerificationMethod(InputVerificationMethod):
 | 
			
		||||
    def __init__(self):
 | 
			
		||||
        pass
 | 
			
		||||
 | 
			
		||||
    def method_name(self):
 | 
			
		||||
        return "math_input"
 | 
			
		||||
 | 
			
		||||
    def generate_math_problem(self):
 | 
			
		||||
        a = random.randint(1, 10)  # nosec
 | 
			
		||||
        b = random.randint(1, 10)  # nosec
 | 
			
		||||
        operation = random.choice(["+", "-", "*"])  # nosec
 | 
			
		||||
        if operation == "+":
 | 
			
		||||
            answer = a + b
 | 
			
		||||
        elif operation == "-":
 | 
			
		||||
            answer = a - b
 | 
			
		||||
        else:
 | 
			
		||||
            answer = a * b
 | 
			
		||||
        return f"{a} {operation} {b}", str(answer)
 | 
			
		||||
 | 
			
		||||
    async def create_task(self, event: ChatMemberUpdated, bot: Bot):
 | 
			
		||||
        chat_id = event.chat.id
 | 
			
		||||
        problem, answer = self.generate_math_problem()
 | 
			
		||||
        message = await bot.send_message(
 | 
			
		||||
            chat_id,
 | 
			
		||||
            f"Привет, {user_mention(event.from_user)}! "
 | 
			
		||||
            "Решите простую математическую задачу, "
 | 
			
		||||
            "чтобы подтвердить, что вы не робот:\\n"
 | 
			
		||||
            f"{problem} = ?",
 | 
			
		||||
            parse_mode=ParseMode.HTML,
 | 
			
		||||
        )
 | 
			
		||||
        return {"message_id": message.message_id, "correct": answer}
 | 
			
		||||
 | 
			
		||||
    async def verify(self, task_data):
 | 
			
		||||
        correct: str = task_data["correct"]
 | 
			
		||||
        answer: str = task_data["answer"]
 | 
			
		||||
 | 
			
		||||
        return answer.strip() == correct
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class MathButtonsVerification(InlineButtonVerificationMethod):
 | 
			
		||||
    def __init__(self):
 | 
			
		||||
        pass
 | 
			
		||||
 | 
			
		||||
    def method_name(self):
 | 
			
		||||
        return "math_buttons"
 | 
			
		||||
 | 
			
		||||
class BaseMathTask(SimpleVariantsBaseTask):
 | 
			
		||||
    def generate_math_problem(self):
 | 
			
		||||
        a = random.randint(1, 10)  # nosec
 | 
			
		||||
        b = random.randint(1, 10)  # nosec
 | 
			
		||||
@@ -70,52 +16,38 @@ class MathButtonsVerification(InlineButtonVerificationMethod):
 | 
			
		||||
            answer = a * b
 | 
			
		||||
        return f"{a} {operation} {b}", answer
 | 
			
		||||
 | 
			
		||||
    async def create_task(self, event: ChatMemberUpdated, bot: Bot):
 | 
			
		||||
        user_id = event.from_user.id
 | 
			
		||||
        chat_id = event.chat.id
 | 
			
		||||
 | 
			
		||||
    async def init(self):
 | 
			
		||||
        problem, correct_answer = self.generate_math_problem()
 | 
			
		||||
        options = [correct_answer]
 | 
			
		||||
        while len(options) < 4:
 | 
			
		||||
        self.variants = [correct_answer]
 | 
			
		||||
        while len(self.variants) < 4:
 | 
			
		||||
            wrong_answer = random.randint(
 | 
			
		||||
                correct_answer - 5, correct_answer + 5
 | 
			
		||||
            )  # nosec
 | 
			
		||||
            if wrong_answer not in options:
 | 
			
		||||
                options.append(wrong_answer)
 | 
			
		||||
        random.shuffle(options)  # nosec
 | 
			
		||||
            if wrong_answer not in self.variants:
 | 
			
		||||
                self.variants.append(wrong_answer)
 | 
			
		||||
        random.shuffle(self.variants)  # nosec
 | 
			
		||||
 | 
			
		||||
        keyboard = InlineKeyboardMarkup(
 | 
			
		||||
            inline_keyboard=[
 | 
			
		||||
                [
 | 
			
		||||
                    InlineKeyboardButton(
 | 
			
		||||
                        text=str(option),
 | 
			
		||||
                        callback_data=VerificationCallback(
 | 
			
		||||
                            user_id=user_id, chat_id=chat_id, answer=str(option)
 | 
			
		||||
                        ).pack(),
 | 
			
		||||
                    )
 | 
			
		||||
                    for option in options
 | 
			
		||||
                ]
 | 
			
		||||
            ]
 | 
			
		||||
        )
 | 
			
		||||
        self.variants = [str(x) for x in self.variants]
 | 
			
		||||
 | 
			
		||||
        message = await bot.send_message(
 | 
			
		||||
            chat_id,
 | 
			
		||||
            f"Привет, {user_mention(event.from_user)}! "
 | 
			
		||||
            "Решите простую математическую задачу, "
 | 
			
		||||
            "чтобы подтвердить, что вы не робот:\\n"
 | 
			
		||||
            f"{problem} = ?",
 | 
			
		||||
            reply_markup=keyboard,
 | 
			
		||||
            parse_mode=ParseMode.HTML,
 | 
			
		||||
        )
 | 
			
		||||
        self.task = f"{problem} = ?"
 | 
			
		||||
        self.correct = str(correct_answer)
 | 
			
		||||
 | 
			
		||||
        return {
 | 
			
		||||
            "message_id": message.message_id,
 | 
			
		||||
            "correct": str(correct_answer),
 | 
			
		||||
            "attempts_count": 2,
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
    async def verify(self, task_data):
 | 
			
		||||
        correct: str = task_data["correct"]
 | 
			
		||||
        answer: str = task_data["answer"]
 | 
			
		||||
class MathInlineButtonsTask(BaseMathTask, SimpleInlineButtonsTask):
 | 
			
		||||
    """
 | 
			
		||||
    Математическая задача с выбором через inline-кнопки
 | 
			
		||||
    """
 | 
			
		||||
 | 
			
		||||
        return answer == correct
 | 
			
		||||
    @staticmethod
 | 
			
		||||
    def type_name():
 | 
			
		||||
        return "math_buttons"
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class MathPollTask(BaseMathTask, SimplePollTask):
 | 
			
		||||
    """
 | 
			
		||||
    Математическая задача с выбором через Poll
 | 
			
		||||
    """
 | 
			
		||||
 | 
			
		||||
    @staticmethod
 | 
			
		||||
    def type_name():
 | 
			
		||||
        return "math_poll"
 | 
			
		||||
 
 | 
			
		||||
@@ -1,16 +1,9 @@
 | 
			
		||||
import random
 | 
			
		||||
 | 
			
		||||
from aiogram import Bot
 | 
			
		||||
from aiogram.enums import ParseMode
 | 
			
		||||
from aiogram.types import ChatMemberUpdated, InlineKeyboardButton, InlineKeyboardMarkup
 | 
			
		||||
from aiogram.types import InlineKeyboardButton, InlineKeyboardMarkup
 | 
			
		||||
 | 
			
		||||
from ocab_modules.standard.welcome.verifications_methods.utils import user_mention
 | 
			
		||||
 | 
			
		||||
from .base import (
 | 
			
		||||
    InlineButtonVerificationMethod,
 | 
			
		||||
    InputVerificationMethod,
 | 
			
		||||
    VerificationCallback,
 | 
			
		||||
)
 | 
			
		||||
from .base import VerificationCallback
 | 
			
		||||
from .simple import SimpleInlineButtonsTask, SimplePollTask, SimpleVariantsBaseTask
 | 
			
		||||
 | 
			
		||||
QUESTIONS = [
 | 
			
		||||
    (
 | 
			
		||||
@@ -42,85 +35,45 @@ QUESTIONS = [
 | 
			
		||||
]
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class QuestionInputVerification(InputVerificationMethod):
 | 
			
		||||
    def __init__(self):
 | 
			
		||||
        pass
 | 
			
		||||
 | 
			
		||||
    def method_name(self):
 | 
			
		||||
        return "question_input"
 | 
			
		||||
 | 
			
		||||
    def get_random_question(self):
 | 
			
		||||
        return random.choice(QUESTIONS)  # nosec
 | 
			
		||||
 | 
			
		||||
    async def create_task(self, event: ChatMemberUpdated, bot: Bot):
 | 
			
		||||
        chat_id = event.chat.id
 | 
			
		||||
        question, answer, _ = self.get_random_question()
 | 
			
		||||
        message = await bot.send_message(
 | 
			
		||||
            chat_id,
 | 
			
		||||
            f"Привет, {user_mention(event.from_user)}! "
 | 
			
		||||
            "Пожалуйста, ответьте на следующий вопрос, "
 | 
			
		||||
            f"чтобы подтвердить, что вы не робот: {question}",
 | 
			
		||||
            parse_mode=ParseMode.HTML,
 | 
			
		||||
        )
 | 
			
		||||
        return {"message_id": message.message_id, "correct": answer.lower()}
 | 
			
		||||
 | 
			
		||||
    async def verify(self, task_data):
 | 
			
		||||
        correct: str = task_data["correct"]
 | 
			
		||||
        answer: str = task_data["answer"]
 | 
			
		||||
 | 
			
		||||
        return answer.lower().strip() == correct
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class QuestionButtonsVerification(InlineButtonVerificationMethod):
 | 
			
		||||
    def __init__(self):
 | 
			
		||||
        pass
 | 
			
		||||
 | 
			
		||||
    def method_name(self):
 | 
			
		||||
        return "question_inline"
 | 
			
		||||
 | 
			
		||||
    def get_random_question(self):
 | 
			
		||||
        return random.choice(QUESTIONS)  # nosec
 | 
			
		||||
 | 
			
		||||
    async def create_task(self, event: ChatMemberUpdated, bot: Bot):
 | 
			
		||||
        user_id = event.from_user.id
 | 
			
		||||
        chat_id = event.chat.id
 | 
			
		||||
 | 
			
		||||
        question, correct_answer, wrong_answers = self.get_random_question()
 | 
			
		||||
class BaseQuestionsTask(SimpleVariantsBaseTask):
 | 
			
		||||
    async def init(self):
 | 
			
		||||
        question, correct_answer, wrong_answers = random.choice(QUESTIONS)  # nosec
 | 
			
		||||
        options = [correct_answer] + wrong_answers
 | 
			
		||||
        random.shuffle(options)  # nosec
 | 
			
		||||
 | 
			
		||||
        keyboard = InlineKeyboardMarkup(
 | 
			
		||||
        self.variants = [str(x) for x in options]
 | 
			
		||||
 | 
			
		||||
        self.task = question
 | 
			
		||||
        self.correct = correct_answer
 | 
			
		||||
 | 
			
		||||
    async def verify(self, data):
 | 
			
		||||
        return self.variants[data] == self.correct
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class QuestionInlineButtonsTask(BaseQuestionsTask, SimpleInlineButtonsTask):
 | 
			
		||||
    @staticmethod
 | 
			
		||||
    def type_name():
 | 
			
		||||
        return "question_buttons"
 | 
			
		||||
 | 
			
		||||
    def build_keyboard(self):
 | 
			
		||||
        return InlineKeyboardMarkup(
 | 
			
		||||
            inline_keyboard=[
 | 
			
		||||
                [
 | 
			
		||||
                    InlineKeyboardButton(
 | 
			
		||||
                        text=option,
 | 
			
		||||
                        text=str(option),
 | 
			
		||||
                        callback_data=VerificationCallback(
 | 
			
		||||
                            user_id=user_id, chat_id=chat_id, answer=str(i)
 | 
			
		||||
                            user_id=self.from_user_id,
 | 
			
		||||
                            chat_id=self.from_chat_id,
 | 
			
		||||
                            answer=str(i),
 | 
			
		||||
                        ).pack(),
 | 
			
		||||
                    )
 | 
			
		||||
                    for i, option in enumerate(self.variants)
 | 
			
		||||
                ]
 | 
			
		||||
                for i, option in enumerate(options)
 | 
			
		||||
            ]
 | 
			
		||||
        )
 | 
			
		||||
 | 
			
		||||
        message = await bot.send_message(
 | 
			
		||||
            chat_id,
 | 
			
		||||
            f"Привет, {user_mention(event.from_user)}! "
 | 
			
		||||
            "Пожалуйста, ответьте на следующий вопрос, "
 | 
			
		||||
            f"чтобы подтвердить, что вы не робот: {question}",
 | 
			
		||||
            reply_markup=keyboard,
 | 
			
		||||
            parse_mode=ParseMode.HTML,
 | 
			
		||||
        )
 | 
			
		||||
 | 
			
		||||
        return {
 | 
			
		||||
            "message_id": message.message_id,
 | 
			
		||||
            "correct": correct_answer,
 | 
			
		||||
            "options": options,
 | 
			
		||||
            "attempts_count": 2,
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
    async def verify(self, task_data):
 | 
			
		||||
        correct: str = task_data["correct"]
 | 
			
		||||
        answer: str = task_data["answer"]
 | 
			
		||||
 | 
			
		||||
        return task_data["options"][int(answer)] == correct
 | 
			
		||||
class QuestionPollTask(BaseQuestionsTask, SimplePollTask):
 | 
			
		||||
    @staticmethod
 | 
			
		||||
    def type_name():
 | 
			
		||||
        return "question_poll"
 | 
			
		||||
 
 | 
			
		||||
@@ -0,0 +1,178 @@
 | 
			
		||||
from string import Template
 | 
			
		||||
 | 
			
		||||
from aiogram import Bot
 | 
			
		||||
from aiogram.enums import ParseMode, PollType
 | 
			
		||||
from aiogram.types import ChatMemberUpdated, InlineKeyboardButton, InlineKeyboardMarkup
 | 
			
		||||
 | 
			
		||||
from ..utils import get_plural_form, key_from_poll, key_from_user_chat
 | 
			
		||||
from .base import BaseTask, VerificationCallback, mute_while_task
 | 
			
		||||
from .utils import user_mention
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class SimpleBaseTask(BaseTask):
 | 
			
		||||
    pass
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class SimpleVariantsBaseTaskConfig:
 | 
			
		||||
    def __init__(self, task_type, config: dict):
 | 
			
		||||
        self.config = config
 | 
			
		||||
        self.task_type = task_type
 | 
			
		||||
 | 
			
		||||
    @property
 | 
			
		||||
    def timeout(self):
 | 
			
		||||
        timeout = self.config.get(f"welcome::tasks::{self.task_type}::timeout")
 | 
			
		||||
 | 
			
		||||
        if timeout is None:
 | 
			
		||||
            return self.config.get("welcome::timeout")
 | 
			
		||||
 | 
			
		||||
        return timeout
 | 
			
		||||
 | 
			
		||||
    @property
 | 
			
		||||
    def task_message_text(self):
 | 
			
		||||
        return self.config.get(f"welcome::tasks::{self.task_type}::message_text")
 | 
			
		||||
 | 
			
		||||
    @property
 | 
			
		||||
    def task_retry_message_text(self):
 | 
			
		||||
        return self.config.get(f"welcome::tasks::{self.task_type}::retry_message_text")
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class SimpleVariantsBaseTask(SimpleBaseTask):
 | 
			
		||||
    def __init__(self, *args, **kwargs):
 | 
			
		||||
        super().__init__(*args, **kwargs)
 | 
			
		||||
 | 
			
		||||
        self.config = None
 | 
			
		||||
 | 
			
		||||
        self.variants = []
 | 
			
		||||
        self.task = ""
 | 
			
		||||
        self.correct = None
 | 
			
		||||
 | 
			
		||||
        self.task_message_id = None
 | 
			
		||||
 | 
			
		||||
    def set_config(self, cfg: SimpleVariantsBaseTaskConfig):
 | 
			
		||||
        self.config = cfg
 | 
			
		||||
 | 
			
		||||
    def get_timeout(self):
 | 
			
		||||
        return self.config.timeout
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@mute_while_task
 | 
			
		||||
class SimpleInlineButtonsTask(SimpleVariantsBaseTask):
 | 
			
		||||
    async def init(self):
 | 
			
		||||
        raise NotImplementedError()
 | 
			
		||||
 | 
			
		||||
    def build_keyboard(self):
 | 
			
		||||
        return InlineKeyboardMarkup(
 | 
			
		||||
            inline_keyboard=[
 | 
			
		||||
                [
 | 
			
		||||
                    InlineKeyboardButton(
 | 
			
		||||
                        text=str(option),
 | 
			
		||||
                        callback_data=VerificationCallback(
 | 
			
		||||
                            user_id=self.from_user_id,
 | 
			
		||||
                            chat_id=self.from_chat_id,
 | 
			
		||||
                            answer=str(option),
 | 
			
		||||
                        ).pack(),
 | 
			
		||||
                    )
 | 
			
		||||
                    for option in self.variants
 | 
			
		||||
                ]
 | 
			
		||||
            ]
 | 
			
		||||
        )
 | 
			
		||||
 | 
			
		||||
    async def run(self):
 | 
			
		||||
        await self.init()
 | 
			
		||||
 | 
			
		||||
        message_template = Template(
 | 
			
		||||
            self.config.task_message_text
 | 
			
		||||
            if self.attempt_number == 1
 | 
			
		||||
            else self.config.task_retry_message_text
 | 
			
		||||
        )
 | 
			
		||||
 | 
			
		||||
        chat_id = self.from_chat_id
 | 
			
		||||
        message = await self.bot.send_message(
 | 
			
		||||
            chat_id,
 | 
			
		||||
            text=message_template.substitute(
 | 
			
		||||
                mention=user_mention(self.from_user),
 | 
			
		||||
                task=self.task,
 | 
			
		||||
                attempts=get_plural_form(
 | 
			
		||||
                    self.attemps_left, "попытка", "попытки", "попыток"
 | 
			
		||||
                ),
 | 
			
		||||
            ),
 | 
			
		||||
            reply_markup=self.build_keyboard(),
 | 
			
		||||
            parse_mode=ParseMode.HTML,
 | 
			
		||||
        )
 | 
			
		||||
 | 
			
		||||
        self.task_message_id = message.message_id
 | 
			
		||||
 | 
			
		||||
        await self.start_timeout_func()
 | 
			
		||||
 | 
			
		||||
        return [key_from_user_chat(self.from_user_id, self.from_chat_id)]
 | 
			
		||||
 | 
			
		||||
    async def verify(self, data):
 | 
			
		||||
        return self.correct == data
 | 
			
		||||
 | 
			
		||||
    async def end(self, success=True):
 | 
			
		||||
        await self.bot.delete_message(self.from_chat_id, self.task_message_id)
 | 
			
		||||
        if self.timeout_func_task:
 | 
			
		||||
            self.timeout_func_task.cancel()
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@mute_while_task
 | 
			
		||||
class SimplePollTask(SimpleVariantsBaseTask):
 | 
			
		||||
    def __init__(
 | 
			
		||||
        self,
 | 
			
		||||
        event: ChatMemberUpdated,
 | 
			
		||||
        bot: Bot,
 | 
			
		||||
        timeout_func=None,
 | 
			
		||||
        attempt_number=1,
 | 
			
		||||
        max_attempts=1,
 | 
			
		||||
    ):
 | 
			
		||||
        super().__init__(event, bot, timeout_func, attempt_number, max_attempts)
 | 
			
		||||
        self.correct_index = None
 | 
			
		||||
 | 
			
		||||
    async def init(self):
 | 
			
		||||
        raise NotImplementedError()
 | 
			
		||||
 | 
			
		||||
    async def run(self):
 | 
			
		||||
        await self.init()
 | 
			
		||||
 | 
			
		||||
        self.correct_index = self.variants.index(self.correct)
 | 
			
		||||
 | 
			
		||||
        message_template = Template(
 | 
			
		||||
            self.config.task_message_text
 | 
			
		||||
            if self.attempt_number == 1
 | 
			
		||||
            else self.config.task_retry_message_text
 | 
			
		||||
        )
 | 
			
		||||
 | 
			
		||||
        chat_id = self.from_chat_id
 | 
			
		||||
        message = await self.bot.send_poll(
 | 
			
		||||
            chat_id,
 | 
			
		||||
            question=message_template.substitute(
 | 
			
		||||
                mention=self.from_user.first_name,
 | 
			
		||||
                task=self.task,
 | 
			
		||||
                attempts=get_plural_form(
 | 
			
		||||
                    self.attemps_left, "попытка", "попытки", "попыток"
 | 
			
		||||
                ),
 | 
			
		||||
            ),
 | 
			
		||||
            options=self.variants,
 | 
			
		||||
            type=PollType.QUIZ,
 | 
			
		||||
            correct_option_id=self.correct_index,
 | 
			
		||||
            allows_multiple_answers=False,
 | 
			
		||||
            is_anonymous=False,
 | 
			
		||||
            # parse_mode=ParseMode.HTML
 | 
			
		||||
        )
 | 
			
		||||
 | 
			
		||||
        self.task_message_id = message.message_id
 | 
			
		||||
 | 
			
		||||
        await self.start_timeout_func()
 | 
			
		||||
 | 
			
		||||
        return [
 | 
			
		||||
            key_from_poll(message.poll.id),
 | 
			
		||||
            key_from_user_chat(self.from_user_id, self.from_chat_id),
 | 
			
		||||
        ]
 | 
			
		||||
 | 
			
		||||
    async def verify(self, data):
 | 
			
		||||
        return self.correct_index == data
 | 
			
		||||
 | 
			
		||||
    async def end(self, success=True):
 | 
			
		||||
        await self.bot.delete_message(self.from_chat_id, self.task_message_id)
 | 
			
		||||
        if self.timeout_func_task:
 | 
			
		||||
            self.timeout_func_task.cancel()
 | 
			
		||||
@@ -1,5 +1,8 @@
 | 
			
		||||
import time
 | 
			
		||||
 | 
			
		||||
from aiogram import Bot
 | 
			
		||||
from aiogram.enums import ParseMode
 | 
			
		||||
from aiogram.types import User
 | 
			
		||||
from aiogram.types import ChatPermissions, User
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def user_mention(user: User, mode=ParseMode.HTML):
 | 
			
		||||
@@ -9,3 +12,53 @@ def user_mention(user: User, mode=ParseMode.HTML):
 | 
			
		||||
        return f"[{user.first_name}](tg://user?id={user.id})"
 | 
			
		||||
    else:
 | 
			
		||||
        raise ValueError(f"Unknown parse mode {mode}")
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
async def mute_user(chat_id, user_id, until, bot: Bot):
 | 
			
		||||
    end_time = until + int(time.time())
 | 
			
		||||
    await bot.restrict_chat_member(
 | 
			
		||||
        chat_id,
 | 
			
		||||
        user_id,
 | 
			
		||||
        until_date=end_time,
 | 
			
		||||
        use_independent_chat_permissions=True,
 | 
			
		||||
        permissions=ChatPermissions(
 | 
			
		||||
            can_send_messages=False,
 | 
			
		||||
            can_send_audios=False,
 | 
			
		||||
            can_send_documents=False,
 | 
			
		||||
            can_send_photos=False,
 | 
			
		||||
            can_send_videos=False,
 | 
			
		||||
            can_send_video_notes=False,
 | 
			
		||||
            can_send_voice_notes=False,
 | 
			
		||||
            can_send_polls=False,
 | 
			
		||||
            can_send_other_messages=False,
 | 
			
		||||
            can_add_web_page_previews=False,
 | 
			
		||||
            can_change_info=False,
 | 
			
		||||
            can_invite_users=False,
 | 
			
		||||
            can_pin_messages=False,
 | 
			
		||||
            can_manage_topics=False,
 | 
			
		||||
        ),
 | 
			
		||||
    )
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
async def unmute_user(chat_id, user_id, bot: Bot):
 | 
			
		||||
    await bot.restrict_chat_member(
 | 
			
		||||
        chat_id,
 | 
			
		||||
        user_id,
 | 
			
		||||
        use_independent_chat_permissions=True,
 | 
			
		||||
        permissions=ChatPermissions(
 | 
			
		||||
            can_send_messages=True,
 | 
			
		||||
            can_send_audios=True,
 | 
			
		||||
            can_send_documents=True,
 | 
			
		||||
            can_send_photos=True,
 | 
			
		||||
            can_send_videos=True,
 | 
			
		||||
            can_send_video_notes=True,
 | 
			
		||||
            can_send_voice_notes=True,
 | 
			
		||||
            can_send_polls=True,
 | 
			
		||||
            can_send_other_messages=True,
 | 
			
		||||
            can_add_web_page_previews=True,
 | 
			
		||||
            can_change_info=True,
 | 
			
		||||
            can_invite_users=True,
 | 
			
		||||
            can_pin_messages=True,
 | 
			
		||||
            can_manage_topics=True,
 | 
			
		||||
        ),
 | 
			
		||||
    )
 | 
			
		||||
 
 | 
			
		||||
		Reference in New Issue
	
	Block a user