Merged with fix/welcome-stage-1

This commit is contained in:
Maxim Slipenko 2024-08-14 00:08:25 +03:00
commit 3f46a83de3
12 changed files with 767 additions and 333 deletions

View File

@ -1,11 +1,10 @@
from aiogram.types import BotCommand
from ocab_core.modules_system.public_api import (
set_my_commands, log
)
from ocab_core.modules_system.public_api import set_my_commands
commands = dict()
def register_command(command, description, role="USER"):
if role not in commands:
commands[role] = dict()
@ -26,7 +25,7 @@ async def set_user_commands():
)
)
log(bot_commands)
# log(bot_commands)
await set_my_commands(
bot_commands,

View File

@ -54,6 +54,11 @@ class ChatIDFilter(BaseFilter):
chat_id = message.chat.id
approved_chats = self.approved_chats or get_approved_chat_id()
# Если список для фильтрации пуст - разрешаем всем.
if len(approved_chats) == 0:
return True
res = chat_id in approved_chats
return res ^ (self.blacklist)

View File

@ -1 +1 @@
from .main import module_init
from .main import module_init, module_late_init

View File

@ -6,6 +6,9 @@
"version": "1.0.0",
"privileged": true,
"dependencies": {
"required": {
"standard.config": "^1.0.0"
},
"optional": {
"standard.command_helper": "^1.0.0",
"standard.filters": "^1.0.0"

View File

@ -1,76 +1,182 @@
import asyncio
import random
from string import Template
from typing import TYPE_CHECKING
from aiogram import Bot, Router, types
from aiogram.enums import ChatMemberStatus, ParseMode
from aiogram.exceptions import TelegramBadRequest
from aiogram.filters import IS_MEMBER, IS_NOT_MEMBER, ChatMemberUpdatedFilter
from aiogram.types import ChatMemberUpdated
from aiogram.filters import JOIN_TRANSITION, LEAVE_TRANSITION, ChatMemberUpdatedFilter
from aiogram.types import ChatMemberUpdated, PollAnswer
from ocab_core.modules_system.public_api import log, register_router
from ocab_core.modules_system.public_api import get_module, log, register_router
from .verifications_methods.base import VerificationCallback
from .verifications_methods.iamhuman import IAmHumanButton, IAmHumanInput
from .verifications_methods.math import (
MathButtonsVerification,
MathInputVerificationMethod,
)
from .verifications_methods.question import (
QuestionButtonsVerification,
QuestionInputVerification,
from .utils import MultiKeyDict, get_plural_form, key_from_poll, key_from_user_chat
from .verifications_methods.base import BaseTask, VerificationCallback
from .verifications_methods.math import MathInlineButtonsTask, MathPollTask
from .verifications_methods.question import QuestionInlineButtonsTask, QuestionPollTask
from .verifications_methods.simple import (
SimpleVariantsBaseTask,
SimpleVariantsBaseTaskConfig,
)
from .verifications_methods.utils import user_mention
verification_methods = [
IAmHumanButton(),
IAmHumanInput(),
MathButtonsVerification(),
MathInputVerificationMethod(),
QuestionButtonsVerification(),
QuestionInputVerification(),
if TYPE_CHECKING:
from ocab_modules.standard.config import IConfig
from ocab_modules.standard.filters import ChatIDFilter as IChatIDFilter
config: "IConfig" = get_module("standard.config", "config")
try:
ChatIDFilter: "type[IChatIDFilter]" = get_module("standard.filters", "ChatIDFilter")
FILTERS_MODULE_LOADED = True
except Exception:
FILTERS_MODULE_LOADED = False
pass
all_tasks = [
MathInlineButtonsTask,
MathPollTask,
QuestionInlineButtonsTask,
QuestionPollTask,
]
verification_tasks = {}
class TaskManager:
def __init__(self, config: "IConfig"):
self.config = config
self.available_tasks = []
self.max_attempts = 1
def init(self):
for cls in all_tasks:
type_name = cls.type_name()
if self.config.get(f"welcome::tasks::{type_name}::enabled"):
log(f"Task {cls.type_name()} enabled")
self.available_tasks.append(cls)
self.max_attempts = self.config.get("welcome::max_attempts")
def build_random_task(self, event, bot, attempt_number=1) -> BaseTask:
cls = random.choice(self.available_tasks) # nosec
obj = cls(
event,
bot,
timeout_func=verify_timeout,
attempt_number=attempt_number,
max_attempts=self.max_attempts,
)
if isinstance(obj, SimpleVariantsBaseTask):
cfg = SimpleVariantsBaseTaskConfig(obj.type_name(), self.config)
obj.set_config(cfg)
return obj
verification_tasks = MultiKeyDict()
last_success = {}
async def new_member_handler(event: ChatMemberUpdated, bot: Bot):
if event.new_chat_member.status == "member":
# НЕ СРАБОТАЕТ, ЕСЛИ ЧЕЛОВЕК УЖЕ ОГРАНИЧЕН В ПРАВАХ (RESTRICTED)
if event.new_chat_member.status == ChatMemberStatus.MEMBER:
task = task_manager.build_random_task(event, bot)
keys = await task.run()
verification_tasks.add(task, keys)
async def left_member_handler(event: ChatMemberUpdated, bot: Bot):
user_id = event.from_user.id
chat_id = event.chat.id
method = random.choice(verification_methods) # nosec
task_data = await method.create_task(event, bot)
key = key_from_user_chat(user_id, chat_id)
task_data["user_id"] = user_id
task_data["chat_id"] = chat_id
task_data["method"] = method
task = asyncio.create_task(verify_timeout(bot, task_data))
verification_tasks[(user_id, chat_id)] = {
"task": task,
"task_data": task_data,
}
async def verify_timeout(bot: Bot, task_data: dict):
try:
chat_id = task_data["chat_id"]
user_id = task_data["user_id"]
await asyncio.sleep(30)
try:
if "message_id" in task_data:
await bot.delete_message(chat_id, task_data["message_id"])
except TelegramBadRequest:
if not verification_tasks.exists(key):
return
chat_member = await bot.get_chat_member(chat_id, user_id)
if chat_member.status == "member":
await bot.ban_chat_member(chat_id, user_id)
task = verification_tasks.get(key)
await task.end(success=False)
verification_tasks.pop((user_id, chat_id), None)
async def verify_timeout(task: BaseTask):
user_id = task.from_user_id
chat_id = task.from_chat_id
try:
timeout = task.get_timeout()
# log(f"Start timeout {timeout}")
await asyncio.sleep(timeout)
await task.end(success=False)
await task.bot.ban_chat_member(chat_id, user_id)
except Exception as e:
log(f"Error in verify_timeout: {e}")
finally:
verification_tasks.pop((user_id, chat_id), None)
verification_tasks.remove(key_from_user_chat(user_id, chat_id))
async def success_end(task: BaseTask):
await task.end()
await asyncio.sleep(3)
if task.from_chat_id in last_success:
message_id = last_success.pop(task.from_chat_id)
try:
await task.bot.delete_message(task.from_chat_id, message_id)
except TelegramBadRequest:
pass
if config.get("welcome::show_success_message"):
message = await task.bot.send_message(
task.from_chat_id,
Template(config.get("welcome::success_message")).substitute(
mention=user_mention(task.from_user)
),
parse_mode=ParseMode.HTML,
)
last_success[task.from_chat_id] = message.message_id
async def handle_poll_verification(answer: PollAnswer, bot: Bot):
key = key_from_poll(answer.poll_id)
if not verification_tasks.exists(key):
return
task: BaseTask = verification_tasks.get(key)
if task.from_user_id != answer.user.id:
return
result = await task.verify(answer.option_ids[0])
if result:
await success_end(task)
return
await task.end(success=False)
current_attempt = task.attempt_number
if current_attempt >= task_manager.max_attempts:
await task.bot.ban_chat_member(task.from_chat_id, task.from_user_id)
return
await asyncio.sleep(5)
current_attempt = current_attempt + 1
new_task = task_manager.build_random_task(
task.event, task.bot, attempt_number=current_attempt
)
keys = await new_task.run()
log(keys)
verification_tasks.add(new_task, keys)
async def handle_inline_button_verification(
@ -79,55 +185,178 @@ async def handle_inline_button_verification(
user_id = callback_data.user_id
chat_id = callback_data.chat_id
if callback_query.from_user.id == user_id:
if (user_id, chat_id) in verification_tasks:
task = verification_tasks[(user_id, chat_id)]
task_data = task["task_data"]
method = task_data["method"]
task_data["answer"] = callback_data.answer
result = await method.verify(task_data)
if result:
task["task"].cancel()
await bot.delete_message(chat_id, callback_query.message.message_id)
else:
await callback_query.answer("Неправильный ответ!", show_alert=True)
pass
else:
if callback_query.from_user.id != user_id:
await callback_query.answer("Эта кнопка не для вас!", show_alert=True)
return
key = key_from_user_chat(user_id, chat_id)
async def handle_input_verification(message: types.Message, bot: Bot):
user_id = message.from_user.id
chat_id = message.chat.id
if not verification_tasks.exists(key):
await callback_query.answer()
return
if (user_id, chat_id) in verification_tasks:
task = verification_tasks[(user_id, chat_id)]
task_data = task["task_data"]
method = task_data["method"]
task_data["answer"] = message.text
task: BaseTask = verification_tasks.get(key)
result = await method.verify(task_data)
result = await task.verify(callback_data.answer)
if result:
task["task"].cancel()
await bot.delete_message(chat_id, task_data["message_id"])
await bot.delete_message(chat_id, message.message_id)
await success_end(task)
return
pass
await task.end(success=False)
current_attempt = task.attempt_number
if current_attempt >= task_manager.max_attempts:
await callback_query.answer()
await task.bot.ban_chat_member(chat_id, user_id)
return
await callback_query.answer(
Template(config.get("welcome::retry_message")).substitute(
attempts=get_plural_form(
task_manager.max_attempts - current_attempt,
"попытка",
"попытки",
"попыток",
)
),
show_alert=True,
)
current_attempt = current_attempt + 1
new_task = task_manager.build_random_task(
task.event, task.bot, attempt_number=current_attempt
)
keys = await new_task.run()
verification_tasks.add(new_task, keys)
config: "IConfig" = get_module("standard.config", "config")
task_manager = TaskManager(config)
async def module_init():
config.register("welcome::timeout", "int", default_value=60)
config.register("welcome::max_attempts", "int", default_value=5)
config.register(
"welcome::retry_message",
"string",
default_value="Неправильный ответ! У вас еще $attempts.",
)
config.register("welcome::show_success_message", "boolean", default_value=True)
config.register(
"welcome::success_message",
"string",
default_value="$mention, вы успешно прошли проверку!",
)
# MATH BUTTONS
config.register(
"welcome::tasks::math_buttons::enabled", "boolean", default_value=False
)
config.register(
"welcome::tasks::math_buttons::message_text",
"int",
default_value="Привет, $mention!\n"
"Ответьте на вопрос, "
"чтобы подтвердить, что вы не робот:\n\n<b>$task</b>",
)
config.register(
"welcome::tasks::math_buttons::retry_message_text",
"int",
default_value="$mention, неправильный ответ! У вас еще $attempts\n"
"Ответьте на вопрос, "
"чтобы подтвердить, что вы не робот:\n\n<b>$task</b>",
)
config.register("welcome::tasks::math_buttons::timeout", "int", default_value=None)
# MATH POLL
config.register("welcome::tasks::math_poll::enabled", "boolean", default_value=True)
config.register(
"welcome::tasks::math_poll::message_text",
"string",
default_value="Привет, $mention!\n"
"Ответьте на вопрос, "
"чтобы подтвердить, что вы не робот:\n\n$task",
)
config.register(
"welcome::tasks::math_poll::retry_message_text",
"string",
default_value="$mention, неправильный ответ! У вас еще $attempts\n"
"Ответьте на вопрос, "
"чтобы подтвердить, что вы не робот:\n\n$task",
)
config.register("welcome::tasks::math_poll::timeout", "int", default_value=None)
# QUESTION BUTTONS
config.register(
"welcome::tasks::question_buttons::enabled", "boolean", default_value=False
)
config.register(
"welcome::tasks::question_buttons::message_text",
"string",
default_value="Привет, $mention!\n"
"Ответьте на вопрос, "
"чтобы подтвердить, что вы не робот:\n\n<b>$task</b>",
)
config.register(
"welcome::tasks::question_buttons::retry_message_text",
"string",
default_value="$mention, неправильный ответ! У вас еще $attempts.\n"
"Ответьте на вопрос, "
"чтобы подтвердить, что вы не робот:\n\n<b>$task</b>",
)
config.register(
"welcome::tasks::question_buttons::timeout", "int", default_value=None
)
# QUESTION POLL
config.register(
"welcome::tasks::question_poll::enabled", "boolean", default_value=True
)
config.register(
"welcome::tasks::question_poll::message_text",
"string",
default_value="Привет, $mention!\n"
"Ответьте на вопрос, "
"чтобы подтвердить, что вы не робот:\n\n$task",
)
config.register(
"welcome::tasks::question_poll::retry_message_text",
"string",
default_value="$mention, неправильный ответ! У вас еще $attempts.\n"
"Ответьте на вопрос, "
"чтобы подтвердить, что вы не робот:\n\n$task",
)
config.register("welcome::tasks::question_poll::timeout", "int", default_value=None)
router = Router()
router.chat_member(ChatMemberUpdatedFilter(IS_NOT_MEMBER >> IS_MEMBER))(
common_filters_pre = []
if FILTERS_MODULE_LOADED:
common_filters_pre.append(ChatIDFilter())
router.chat_member(*common_filters_pre, ChatMemberUpdatedFilter(JOIN_TRANSITION))(
new_member_handler
)
router.callback_query(VerificationCallback.filter())(
router.chat_member(*common_filters_pre, ChatMemberUpdatedFilter(LEAVE_TRANSITION))(
left_member_handler
)
router.callback_query(*common_filters_pre, VerificationCallback.filter())(
handle_inline_button_verification
)
router.message()(handle_input_verification)
# Нельзя применить ChatIDFilter из-за отстутсвия id чата
router.poll_answer()(handle_poll_verification)
register_router(router)
async def module_late_init():
task_manager.init()

View File

@ -0,0 +1,45 @@
def get_plural_form(number, singular, genitive_singular, plural):
if 11 <= number % 100 <= 19:
return f"{number} {plural}"
elif number % 10 == 1:
return f"{number} {singular}"
elif 2 <= number % 10 <= 4:
return f"{number} {genitive_singular}"
else:
return f"{number} {plural}"
class MultiKeyDict:
def __init__(self):
self.value_to_keys = {} # Словарь значений и связанных с ними ключей
self.key_to_value = {} # Словарь ключей и связанных с ними значений
def add(self, value, keys):
# Добавляем значение в словарь с множеством ключей
self.value_to_keys[value] = set(keys)
# Для каждого ключа создаем запись в словаре key_to_value
for key in keys:
self.key_to_value[key] = value
def get(self, key):
return self.key_to_value.get(key)
def exists(self, key):
return key in self.key_to_value
def remove(self, key):
if key in self.key_to_value:
value = self.key_to_value.pop(key)
self.value_to_keys[value].remove(key)
for k in self.value_to_keys[value]:
del self.key_to_value[k]
def key_from_user_chat(user_id, chat_id):
return f"uc:{user_id}_{chat_id}"
def key_from_poll(poll_id):
return f"p:{poll_id}"

View File

@ -1,18 +1,102 @@
import asyncio
from functools import wraps
from aiogram import Bot
from aiogram.exceptions import TelegramBadRequest
from aiogram.filters.callback_data import CallbackData
from aiogram.types import ChatMemberUpdated
from ocab_core.modules_system.public_api import log
from .utils import mute_user, unmute_user
class VerificationMethod:
class BaseTask:
def __init__(
self,
event: ChatMemberUpdated,
bot: Bot,
timeout_func=None,
attempt_number=1,
max_attempts=1,
):
self.bot = bot
self.event = event
self.timeout_func = timeout_func
self.attempt_number = attempt_number
self.max_attempts = max_attempts
self.timeout_func_task = None
@property
def from_chat_id(self):
return self.event.chat.id
@property
def from_user_id(self):
return self.event.from_user.id
@property
def from_user(self):
return self.event.from_user
@property
def attemps_left(self):
return self.max_attempts - self.attempt_number + 1
async def start_timeout_func(self):
if self.timeout_func:
self.timeout_func_task = asyncio.create_task(self.timeout_func(self))
@staticmethod
def type_name():
raise NotImplementedError()
async def run(self):
raise NotImplementedError()
async def verify(self, data):
raise NotImplementedError()
async def end(self, success=True):
raise NotImplementedError()
async def get_timeout():
raise NotImplementedError()
def mute_while_task(cls):
original_run = getattr(cls, "run", None)
original_end = getattr(cls, "end", None)
if not original_run and not original_end:
return cls
@wraps(original_run)
async def wrapped_run(self: BaseTask):
chat_id = self.from_chat_id
user_id = self.from_user_id
try:
await mute_user(chat_id, user_id, 0, self.bot)
except TelegramBadRequest as e:
log(e)
pass
return await original_run(self)
@wraps(original_end)
async def wrapped_end(self: BaseTask, success=True):
await original_end(self, success)
if success:
chat_id = self.from_chat_id
user_id = self.from_user_id
try:
await unmute_user(chat_id, user_id, self.bot)
except TelegramBadRequest as e:
log(e)
pass
class InputVerificationMethod(VerificationMethod):
def verify(input_value: str, task_data):
pass
class InlineButtonVerificationMethod(VerificationMethod):
def verify(input_value: str, task_data):
pass
cls.run = wrapped_run
cls.end = wrapped_end
return cls
class VerificationCallback(CallbackData, prefix="verify"):

View File

@ -1,74 +0,0 @@
import random
from aiogram import Bot
from aiogram.types import ChatMemberUpdated, InlineKeyboardButton, InlineKeyboardMarkup
from .base import (
InlineButtonVerificationMethod,
InputVerificationMethod,
VerificationCallback,
)
class IAmHumanButton(InlineButtonVerificationMethod):
def __init__(self):
pass
def method_name(self):
return "i_am_human_button"
async def create_task(self, event: ChatMemberUpdated, bot: Bot):
user_id = event.from_user.id
chat_id = event.chat.id
keyboard = InlineKeyboardMarkup(
inline_keyboard=[
[
InlineKeyboardButton(
text="Я человек!",
callback_data=VerificationCallback(
user_id=user_id, chat_id=chat_id, answer="OK"
).pack(),
)
]
]
)
message = await bot.send_message(
chat_id,
f"Привет, {event.from_user.first_name}! "
"Нажмите кнопку, чтобы подтвердить, что вы не робот.",
reply_markup=keyboard,
)
return {"message_id": message.message_id}
async def verify(self, task_data):
return True
class IAmHumanInput(InputVerificationMethod):
def __init__(self):
pass
def method_name(self):
return "i_am_human_input"
def get_text(self):
return random.choice(["Я человек", "Я не робот"]) # nosec
async def create_task(self, event: ChatMemberUpdated, bot: Bot):
chat_id = event.chat.id
text = self.get_text()
message = await bot.send_message(
chat_id,
f"Привет, {event.from_user.first_name}! "
f'Напишите "{text}", чтобы подтвердить, что вы не робот.',
)
return {"message_id": message.message_id, "correct": text}
async def verify(self, task_data):
correct: str = task_data["correct"]
answer: str = task_data["answer"]
return answer.lower() == correct.lower()

View File

@ -1,59 +1,9 @@
import random
from aiogram import Bot
from aiogram.types import ChatMemberUpdated, InlineKeyboardButton, InlineKeyboardMarkup
from .base import (
InlineButtonVerificationMethod,
InputVerificationMethod,
VerificationCallback,
)
from .simple import SimpleInlineButtonsTask, SimplePollTask, SimpleVariantsBaseTask
class MathInputVerificationMethod(InputVerificationMethod):
def __init__(self):
pass
def method_name(self):
return "math_input"
def generate_math_problem(self):
a = random.randint(1, 10) # nosec
b = random.randint(1, 10) # nosec
operation = random.choice(["+", "-", "*"]) # nosec
if operation == "+":
answer = a + b
elif operation == "-":
answer = a - b
else:
answer = a * b
return f"{a} {operation} {b}", str(answer)
async def create_task(self, event: ChatMemberUpdated, bot: Bot):
chat_id = event.chat.id
problem, answer = self.generate_math_problem()
message = await bot.send_message(
chat_id,
f"Привет, {event.from_user.first_name}! "
"Решите простую математическую задачу, "
f"чтобы подтвердить, что вы не робот: {problem} = ?",
)
return {"message_id": message.message_id, "correct": answer}
async def verify(self, task_data):
correct: str = task_data["correct"]
answer: str = task_data["answer"]
return answer.strip() == correct
class MathButtonsVerification(InlineButtonVerificationMethod):
def __init__(self):
pass
def method_name(self):
return "math_buttons"
class BaseMathTask(SimpleVariantsBaseTask):
def generate_math_problem(self):
a = random.randint(1, 10) # nosec
b = random.randint(1, 10) # nosec
@ -66,46 +16,38 @@ class MathButtonsVerification(InlineButtonVerificationMethod):
answer = a * b
return f"{a} {operation} {b}", answer
async def create_task(self, event: ChatMemberUpdated, bot: Bot):
user_id = event.from_user.id
chat_id = event.chat.id
async def init(self):
problem, correct_answer = self.generate_math_problem()
options = [correct_answer]
while len(options) < 4:
self.variants = [correct_answer]
while len(self.variants) < 4:
wrong_answer = random.randint(
max(1, correct_answer - 5), correct_answer + 5
correct_answer - 5, correct_answer + 5
) # nosec
if wrong_answer not in options:
options.append(wrong_answer)
random.shuffle(options) # nosec
if wrong_answer not in self.variants:
self.variants.append(wrong_answer)
random.shuffle(self.variants) # nosec
keyboard = InlineKeyboardMarkup(
inline_keyboard=[
[
InlineKeyboardButton(
text=str(option),
callback_data=VerificationCallback(
user_id=user_id, chat_id=chat_id, answer=str(option)
).pack(),
)
for option in options
]
]
)
self.variants = [str(x) for x in self.variants]
message = await bot.send_message(
chat_id,
f"Привет, {event.from_user.first_name}! "
"Решите простую математическую задачу, "
f"чтобы подтвердить, что вы не робот: {problem} = ?",
reply_markup=keyboard,
)
self.task = f"{problem} = ?"
self.correct = str(correct_answer)
return {"message_id": message.message_id, "correct": str(correct_answer)}
async def verify(self, task_data):
correct: str = task_data["correct"]
answer: str = task_data["answer"]
class MathInlineButtonsTask(BaseMathTask, SimpleInlineButtonsTask):
"""
Математическая задача с выбором через inline-кнопки
"""
return answer == correct
@staticmethod
def type_name():
return "math_buttons"
class MathPollTask(BaseMathTask, SimplePollTask):
"""
Математическая задача с выбором через Poll
"""
@staticmethod
def type_name():
return "math_poll"

View File

@ -1,13 +1,9 @@
import random
from aiogram import Bot
from aiogram.types import ChatMemberUpdated, InlineKeyboardButton, InlineKeyboardMarkup
from aiogram.types import InlineKeyboardButton, InlineKeyboardMarkup
from .base import (
InlineButtonVerificationMethod,
InputVerificationMethod,
VerificationCallback,
)
from .base import VerificationCallback
from .simple import SimpleInlineButtonsTask, SimplePollTask, SimpleVariantsBaseTask
QUESTIONS = [
(
@ -39,82 +35,45 @@ QUESTIONS = [
]
class QuestionInputVerification(InputVerificationMethod):
def __init__(self):
pass
def method_name(self):
return "question_input"
def get_random_question(self):
return random.choice(QUESTIONS) # nosec
async def create_task(self, event: ChatMemberUpdated, bot: Bot):
chat_id = event.chat.id
question, answer, _ = self.get_random_question()
message = await bot.send_message(
chat_id,
f"Привет, {event.from_user.first_name}! "
"Пожалуйста, ответьте на следующий вопрос, "
f"чтобы подтвердить, что вы не робот: {question}",
)
return {"message_id": message.message_id, "correct": answer.lower()}
async def verify(self, task_data):
correct: str = task_data["correct"]
answer: str = task_data["answer"]
return answer.lower().strip() == correct
class QuestionButtonsVerification(InlineButtonVerificationMethod):
def __init__(self):
pass
def method_name(self):
return "question_inline"
def get_random_question(self):
return random.choice(QUESTIONS) # nosec
async def create_task(self, event: ChatMemberUpdated, bot: Bot):
user_id = event.from_user.id
chat_id = event.chat.id
question, correct_answer, wrong_answers = self.get_random_question()
class BaseQuestionsTask(SimpleVariantsBaseTask):
async def init(self):
question, correct_answer, wrong_answers = random.choice(QUESTIONS) # nosec
options = [correct_answer] + wrong_answers
random.shuffle(options) # nosec
keyboard = InlineKeyboardMarkup(
self.variants = [str(x) for x in options]
self.task = question
self.correct = correct_answer
async def verify(self, data):
return self.variants[int(data)] == self.correct
class QuestionInlineButtonsTask(BaseQuestionsTask, SimpleInlineButtonsTask):
@staticmethod
def type_name():
return "question_buttons"
def build_keyboard(self):
return InlineKeyboardMarkup(
inline_keyboard=[
[
InlineKeyboardButton(
text=option,
text=str(option),
callback_data=VerificationCallback(
user_id=user_id, chat_id=chat_id, answer=str(i)
user_id=self.from_user_id,
chat_id=self.from_chat_id,
answer=str(i),
).pack(),
)
for i, option in enumerate(options)
]
for i, option in enumerate(self.variants)
]
)
message = await bot.send_message(
chat_id,
f"Привет, {event.from_user.first_name}! "
"Пожалуйста, ответьте на следующий вопрос, "
f"чтобы подтвердить, что вы не робот: {question}",
reply_markup=keyboard,
)
return {
"message_id": message.message_id,
"correct": correct_answer,
"options": options,
}
async def verify(self, task_data):
correct: str = task_data["correct"]
answer: str = task_data["answer"]
return task_data["options"][int(answer)] == correct
class QuestionPollTask(BaseQuestionsTask, SimplePollTask):
@staticmethod
def type_name():
return "question_poll"

View File

@ -0,0 +1,178 @@
from string import Template
from aiogram import Bot
from aiogram.enums import ParseMode, PollType
from aiogram.types import ChatMemberUpdated, InlineKeyboardButton, InlineKeyboardMarkup
from ..utils import get_plural_form, key_from_poll, key_from_user_chat
from .base import BaseTask, VerificationCallback, mute_while_task
from .utils import user_mention
class SimpleBaseTask(BaseTask):
pass
class SimpleVariantsBaseTaskConfig:
def __init__(self, task_type, config: dict):
self.config = config
self.task_type = task_type
@property
def timeout(self):
timeout = self.config.get(f"welcome::tasks::{self.task_type}::timeout")
if timeout is None:
return self.config.get("welcome::timeout")
return timeout
@property
def task_message_text(self):
return self.config.get(f"welcome::tasks::{self.task_type}::message_text")
@property
def task_retry_message_text(self):
return self.config.get(f"welcome::tasks::{self.task_type}::retry_message_text")
class SimpleVariantsBaseTask(SimpleBaseTask):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.config = None
self.variants = []
self.task = ""
self.correct = None
self.task_message_id = None
def set_config(self, cfg: SimpleVariantsBaseTaskConfig):
self.config = cfg
def get_timeout(self):
return self.config.timeout
@mute_while_task
class SimpleInlineButtonsTask(SimpleVariantsBaseTask):
async def init(self):
raise NotImplementedError()
def build_keyboard(self):
return InlineKeyboardMarkup(
inline_keyboard=[
[
InlineKeyboardButton(
text=str(option),
callback_data=VerificationCallback(
user_id=self.from_user_id,
chat_id=self.from_chat_id,
answer=str(option),
).pack(),
)
for option in self.variants
]
]
)
async def run(self):
await self.init()
message_template = Template(
self.config.task_message_text
if self.attempt_number == 1
else self.config.task_retry_message_text
)
chat_id = self.from_chat_id
message = await self.bot.send_message(
chat_id,
text=message_template.substitute(
mention=user_mention(self.from_user),
task=self.task,
attempts=get_plural_form(
self.attemps_left, "попытка", "попытки", "попыток"
),
),
reply_markup=self.build_keyboard(),
parse_mode=ParseMode.HTML,
)
self.task_message_id = message.message_id
await self.start_timeout_func()
return [key_from_user_chat(self.from_user_id, self.from_chat_id)]
async def verify(self, data):
return self.correct == data
async def end(self, success=True):
await self.bot.delete_message(self.from_chat_id, self.task_message_id)
if self.timeout_func_task:
self.timeout_func_task.cancel()
@mute_while_task
class SimplePollTask(SimpleVariantsBaseTask):
def __init__(
self,
event: ChatMemberUpdated,
bot: Bot,
timeout_func=None,
attempt_number=1,
max_attempts=1,
):
super().__init__(event, bot, timeout_func, attempt_number, max_attempts)
self.correct_index = None
async def init(self):
raise NotImplementedError()
async def run(self):
await self.init()
self.correct_index = self.variants.index(self.correct)
message_template = Template(
self.config.task_message_text
if self.attempt_number == 1
else self.config.task_retry_message_text
)
chat_id = self.from_chat_id
message = await self.bot.send_poll(
chat_id,
question=message_template.substitute(
mention=self.from_user.first_name,
task=self.task,
attempts=get_plural_form(
self.attemps_left, "попытка", "попытки", "попыток"
),
),
options=self.variants,
type=PollType.QUIZ,
correct_option_id=self.correct_index,
allows_multiple_answers=False,
is_anonymous=False,
# parse_mode=ParseMode.HTML
)
self.task_message_id = message.message_id
await self.start_timeout_func()
return [
key_from_poll(message.poll.id),
key_from_user_chat(self.from_user_id, self.from_chat_id),
]
async def verify(self, data):
return self.correct_index == data
async def end(self, success=True):
await self.bot.delete_message(self.from_chat_id, self.task_message_id)
if self.timeout_func_task:
self.timeout_func_task.cancel()

View File

@ -0,0 +1,64 @@
import time
from aiogram import Bot
from aiogram.enums import ParseMode
from aiogram.types import ChatPermissions, User
def user_mention(user: User, mode=ParseMode.HTML):
if mode == ParseMode.HTML:
return f"<a href='tg://user?id={user.id}'>{user.first_name}</a>"
elif mode == ParseMode.MARKDOWN:
return f"[{user.first_name}](tg://user?id={user.id})"
else:
raise ValueError(f"Unknown parse mode {mode}")
async def mute_user(chat_id, user_id, until, bot: Bot):
end_time = until + int(time.time())
await bot.restrict_chat_member(
chat_id,
user_id,
until_date=end_time,
use_independent_chat_permissions=True,
permissions=ChatPermissions(
can_send_messages=False,
can_send_audios=False,
can_send_documents=False,
can_send_photos=False,
can_send_videos=False,
can_send_video_notes=False,
can_send_voice_notes=False,
can_send_polls=False,
can_send_other_messages=False,
can_add_web_page_previews=False,
can_change_info=False,
can_invite_users=False,
can_pin_messages=False,
can_manage_topics=False,
),
)
async def unmute_user(chat_id, user_id, bot: Bot):
await bot.restrict_chat_member(
chat_id,
user_id,
use_independent_chat_permissions=True,
permissions=ChatPermissions(
can_send_messages=True,
can_send_audios=True,
can_send_documents=True,
can_send_photos=True,
can_send_videos=True,
can_send_video_notes=True,
can_send_voice_notes=True,
can_send_polls=True,
can_send_other_messages=True,
can_add_web_page_previews=True,
can_change_info=True,
can_invite_users=True,
can_pin_messages=True,
can_manage_topics=True,
),
)