Переименование файлов и директорий

This commit is contained in:
Armatik
2024-08-16 22:53:33 +03:00
parent 2f634a4eef
commit 1fbe2b0c18
112 changed files with 0 additions and 0 deletions

View File

@@ -0,0 +1,12 @@
# Karkas Blocks
Karkas Blocks содержит набор модулей для платформы Open Chat AI Bot (Karkas).
## Описание
Karkas - это платформа для создания чат-ботов Telegram. Модули - это расширения, которые добавляют функциональность ботам Karkas.
## Типы модулей
* **Стандартные модули (standard.*):** Предоставляют основные функции, такие как управление пользователями, ролями и настройками.
* **Дополнительные официальные модули (external.*):** Разработаны командой Karkas и предоставляют расширенные возможности, такие как интеграция с нейросетями, внешними сервисами и API.

View File

@@ -0,0 +1 @@
from .lib import block_loader

View File

@@ -0,0 +1,25 @@
import importlib
import os
from karkas_core.modules_system.loaders.fs_loader import FSLoader
from karkas_core.modules_system.loaders.unsafe_fs_loader import UnsafeFSLoader
def get_module_directory(module_name):
spec = importlib.util.find_spec(module_name)
if spec is None:
raise ImportError(f"Module {module_name} not found")
module_path = spec.origin
if module_path is None:
raise ImportError(f"Module {module_name} has no origin path")
return os.path.dirname(module_path)
karkas_blocks_path = get_module_directory("karkas_blocks")
def block_loader(namespace: str, module_name: str, safe=True):
if not safe:
return UnsafeFSLoader(f"{karkas_blocks_path}/{namespace}/{module_name}")
else:
return FSLoader(f"{karkas_blocks_path}/{namespace}/{module_name}")

View File

@@ -0,0 +1,20 @@
# Модуль Admin
Модуль `admin` предоставляет администраторам и модераторам чата инструменты для управления:
## Функциональность
- Удаление сообщений.
- Получение ID чата.
## Команды
- `/rm` - удалить сообщение, на которое отвечает команда.
- `/chatID` - получить ID текущего чата.
## Использование
1. Ответьте на сообщение, которое нужно удалить.
2. Отправьте команду `/rm`.
Чтобы получить ID чата, отправьте команду `/chatID`.

View File

@@ -0,0 +1 @@
from .main import module_init

View File

@@ -0,0 +1,65 @@
# flake8: noqa
from typing import TYPE_CHECKING
from aiogram import Bot
from aiogram.types import Message
from karkas_core.modules_system.public_api import get_module
if TYPE_CHECKING:
from karkas_blocks.standard.config import IConfig
config: "IConfig" = get_module("standard.config", "config")
def get_default_chat_tag():
return config.get("filters::default_chat_tag")
async def delete_message(message: Message, bot: Bot):
reply_message_id = message.reply_to_message.message_id
await bot.delete_message(message.chat.id, reply_message_id)
async def error_access(message: Message, bot: Bot):
await message.reply("Вы не админ/модератор")
async def get_chat_id(message: Message, bot: Bot):
await message.reply(
f"ID данного чата: `{message.chat.id}`", parse_mode="MarkdownV2"
)
async def chat_not_in_approve_list(message: Message, bot: Bot):
await message.reply(
f"Бот недоступен в данном чате, пожалуйста,"
f" обратитесь к администратору для добавления чата в список доступных или перейдите в чат "
f"{get_default_chat_tag()}"
)
await get_chat_id(message, bot)
async def mute_user(chat_id: int, user_id: int, time: int, bot: Bot):
# *, can_send_messages: bool | None = None, can_send_audios: bool | None = None, can_send_documents: bool | None = None, can_send_photos: bool | None = None, can_send_videos: bool | None = None, can_send_video_notes: bool | None = None, can_send_voice_notes: bool | None = None, can_send_polls: bool | None = None, can_send_other_messages: bool | None = None, can_add_web_page_previews: bool | None = None, can_change_info: bool | None = None, can_invite_users: bool | None = None, can_pin_messages: bool | None = None, can_manage_topics: bool | None = None, **extra_data: Any)
mutePermissions = {
"can_send_messages": False,
"can_send_audios": False,
"can_send_documents": False,
"can_send_photos": False,
"can_send_videos": False,
"can_send_video_notes": False,
"can_send_voice_notes": False,
"can_send_polls": False,
"can_send_other_messages": False,
"can_add_web_page_previews": False,
"can_change_info": False,
"can_invite_users": False,
"can_pin_messages": False,
"can_manage_topics": False,
}
end_time = time + int(time.time())
await bot.restrict_chat_member(
chat_id, user_id, until_date=end_time, **mutePermissions
)

View File

@@ -0,0 +1,14 @@
{
"id": "standard.admin",
"name": "Admin",
"description": "Модуль для работы с админкой",
"author": "Karkas Team",
"version": "1.0.0",
"privileged": false,
"dependencies": {
"required": {
"standard.filters": "^1.0.0",
"standard.roles": "^1.0.0"
}
}
}

View File

@@ -0,0 +1,7 @@
from karkas_core.modules_system.public_api import register_router
from .routers import router
async def module_init():
register_router(router)

View File

@@ -0,0 +1,27 @@
# flake8: noqa
from aiogram import F, Router
from aiogram.filters import Command
from karkas_core.modules_system.public_api import get_module, log
from .handlers import (
chat_not_in_approve_list,
delete_message,
error_access,
get_chat_id,
)
(ChatModerOrAdminFilter, ChatNotInApproveFilter) = get_module(
"standard.filters", ["ChatModerOrAdminFilter", "ChatNotInApproveFilter"]
)
router = Router()
# Если сообщение содержит какой либо текст и выполняется фильтр ChatNotInApproveFilter, то вызывается функция chat_not_in_approve_list
router.message.register(chat_not_in_approve_list, ChatNotInApproveFilter(), F.text)
router.message.register(get_chat_id, ChatModerOrAdminFilter(), Command("chatID"))
router.message.register(delete_message, ChatModerOrAdminFilter(), Command("rm"))
router.message.register(error_access, Command("rm"))
router.message.register(error_access, Command("chatID"))

View File

@@ -0,0 +1,24 @@
# Модуль Command Helper
Модуль `command_helper` упрощает регистрацию команд бота и управление ими.
## Функциональность
- Регистрация команд бота.
- Установка команд для пользователей в зависимости от их роли.
## Использование
1. Импортируйте функцию `register_command`.
2. Вызовите функцию `register_command`, передав ей название команды, ее описание и роль пользователя,
которому доступна эта команда.
## Пример
```python
from karkas_core.modules_system.public_api import get_module
register_command = get_module("standard.command_helper", "register_command")
register_command("my_command", "Описание моей команды", role="ADMIN")
```

View File

@@ -0,0 +1 @@
from .main import get_user_commands, module_late_init, register_command

View File

@@ -0,0 +1,9 @@
{
"id": "standard.command_helper",
"name": "Command helper",
"description": "Модуль для отображения команд при вводе '/'",
"author": "Karkas Team",
"version": "1.0.0",
"privileged": false,
"dependencies": {}
}

View File

@@ -0,0 +1,42 @@
from aiogram.types import BotCommand
from karkas_core.modules_system.public_api import set_my_commands
commands = dict()
def register_command(command, description, role="USER"):
if role not in commands:
commands[role] = dict()
commands[role][command] = {
"description": description,
}
async def set_user_commands():
bot_commands = []
if "USER" in commands:
user_commands = commands["USER"]
for command in user_commands:
bot_commands.append(
BotCommand(
command=command,
description=user_commands[command]["description"],
)
)
# log(bot_commands)
await set_my_commands(
bot_commands,
)
def get_user_commands():
if "USER" in commands:
return commands["USER"].copy()
return {}
async def module_late_init():
await set_user_commands()

View File

@@ -0,0 +1,28 @@
# Модуль Config
Модуль `config` управляет конфигурацией бота.
## Функциональность
- Загрузка конфигурации из файла `config.yaml`.
- Сохранение конфигурации в файл.
- Регистрация параметров конфигурации.
- Получение значений параметров.
## Использование
1. Импортируйте объект `config`.
2. Вызовите метод `register`, чтобы зарегистрировать параметр конфигурации.
3. Вызовите метод `get`, чтобы получить значение параметра.
## Пример
```python
from karkas_core.modules_system.public_api import get_module
config = get_module("standard.config", "config")
config.register("my_parameter", "string", default_value="default")
value = config.get("my_parameter")
```

View File

@@ -0,0 +1,2 @@
from .config import IConfig, config
from .main import module_late_init

View File

@@ -0,0 +1,7 @@
# flake8: noqa
from .config_manager import ConfigManager
IConfig = ConfigManager
config: ConfigManager = ConfigManager(config_path="config.yaml")

View File

@@ -0,0 +1,115 @@
import inspect
from typing import Any, Dict, List
import yaml
class ConfigManager:
def __init__(self, config_path: str):
self.config_path = config_path
self._config: Dict[str, Dict[str, Any]] = {}
self._metadata: Dict[str, Dict[str, Any]] = {}
def load(self, file_path: str = ""):
if not file_path:
file_path = self.config_path
def build_key(prev, next):
if prev:
return f"{prev}::{next}"
return next
def recurse_set(value, key=""):
if isinstance(value, dict):
for k, v in value.items():
recurse_set(v, build_key(key, k))
return
if key in self._metadata:
self._config[key] = value
with open(file_path, "r", encoding="utf-8") as file:
data = yaml.safe_load(file)
recurse_set(data)
def save(self, file_path: str = ""):
if not file_path:
file_path = self.config_path
def nested_dict(flat_dict):
result = {}
for key, value in flat_dict.items():
keys = key.split("::")
d = result
for k in keys[:-1]:
d = d.setdefault(k, {})
d[keys[-1]] = value
return result
with open(file_path, "w", encoding="utf-8") as file:
yaml.dump(nested_dict(self._config), file, allow_unicode=True)
def _check_rights(self, key, module_id, access_type="get"):
return
def get(self, key: str):
module_id = self._get_module_id()
self._check_rights(key, module_id)
return self._config.get(key, self._metadata.get(key).get("default_value"))
def get_meta(self, key: str):
module_id = self._get_module_id()
self._check_rights(key, module_id, "get_meta")
return self._metadata.get(key)
def _get_module_id(self):
caller_frame = inspect.currentframe().f_back.f_back
caller_globals = caller_frame.f_globals
module_id = caller_globals.get("__karkas_block_id__")
return module_id
def mass_set(self, updates: Dict[str, Any]):
module_id = self._get_module_id()
for key, value in updates.items():
self._check_rights(key, module_id, "set")
if key in self._metadata:
# TODO: add checks to validate the type and value based on metadata
self._config[key] = value
else:
raise KeyError(f"Key {key} is not registered.")
def register(
self,
key: str,
value_type: str,
options: List[Any] = None,
multiple: bool = False,
default_value=None,
editable: bool = True,
shared: bool = False,
required: bool = False,
visible: bool = True,
pretty_name: str = "",
description: str = "",
):
module_id = self._get_module_id()
self._check_rights(key, module_id, "register")
if key in self._metadata:
raise ValueError("ERROR")
self._metadata[key] = {
"type": value_type,
"multiple": multiple,
"options": options,
"default_value": default_value,
"visible": visible,
"editable": editable,
"shared": shared,
"required": required,
"pretty_name": pretty_name,
"description": description,
"module_id": module_id,
}

View File

@@ -0,0 +1,18 @@
{
"id": "standard.config",
"name": "Config YAML",
"description": "Модуль для работы с конфигурационным файлом бота (YAML)",
"author": "Karkas Team",
"version": "1.0.0",
"privileged": true,
"dependencies": {
"optional": {
"standard.miniapp": "^1.0.0"
}
},
"pythonDependencies": {
"optional": {
"flet": "^0.23.2"
}
}
}

View File

@@ -0,0 +1,30 @@
from karkas_core.modules_system.public_api import get_module, log
from .config import config
from .miniapp_ui import get_miniapp_blueprint
def register_settings_page():
try:
register_page = get_module("standard.miniapp", "register_page")
prefix = "settings"
register_page(
name="Настройки",
path="/settings",
blueprint=get_miniapp_blueprint(config, prefix),
prefix=prefix,
role="ADMIN",
)
pass
except Exception as e:
log(str(e))
pass
def module_late_init():
register_settings_page()
pass

View File

@@ -0,0 +1,266 @@
import asyncio
from typing import TYPE_CHECKING
from .config_manager import ConfigManager
try:
import dash_bootstrap_components as dbc
import flask
from dash_extensions.enrich import ALL, Input, Output, State, dcc, html
DASH_AVAILABLE = True
except ImportError:
DASH_AVAILABLE = False
from karkas_core.modules_system.public_api import get_module
if TYPE_CHECKING:
from karkas_blocks.standard.roles import Roles as IRoles
def create_control(key: str, config: ConfigManager):
value = config.get(key)
meta = config.get_meta(key)
component_id = {
"type": "setting",
"key": key,
}
label_text = meta.get("pretty_name") or key
if meta.get("type") in ["string", "int", "float", "password"]:
input_type = {
"string": "text",
"int": "number",
"float": "number",
"password": "password",
}.get(meta.get("type"), "text")
input_props = {
"id": component_id,
"type": input_type,
}
if meta.get("type") != "password":
input_props["value"] = value
if meta.get("type") == "int":
input_props["step"] = 1
input_props["pattern"] = r"\d+"
elif meta.get("type") == "float":
input_props["step"] = "any"
component = dbc.Input(**input_props, invalid=False)
elif meta.get("type") == "select":
options = [{"label": opt, "value": opt} for opt in meta.get("options", [])]
component = dcc.Dropdown(
id=component_id,
options=options,
value=value,
style={
"padding-left": 0,
"padding-right": 0,
},
)
elif meta.get("type") == "checkbox":
component = dbc.Checkbox(
id=component_id,
checked=value,
label=label_text,
)
else:
return None
row = []
if meta.get("type") != "checkbox":
row.append(dbc.Label(label_text))
row.append(component)
if meta.get("description"):
row.append(dbc.FormText(meta.get("description")))
return dbc.Row(row, className="mb-3 mx-1")
def build_settings_tree(config: ConfigManager):
tree = {}
for key, value in config._metadata.items():
if not value["visible"]:
continue
parts = key.split("::")
control = create_control(key, config)
current = tree
for i, part in enumerate(parts[:-1]):
if part not in current:
current[part] = {"__controls": []}
current = current[part]
current["__controls"].append(control)
return tree
def create_card(category, controls):
return dbc.Card(
[
dbc.CardHeader(html.H3(category, className="mb-0")),
dbc.CardBody(controls),
],
className="mb-3",
)
def create_settings_components(tree, level=0):
components = []
for category, subtree in tree.items():
if category == "__controls":
continue
controls = subtree.get("__controls", [])
subcomponents = create_settings_components(subtree, level + 1)
if controls or subcomponents:
card_content = controls + subcomponents
card = create_card(category, card_content)
components.append(card)
return components
def get_miniapp_blueprint(config: ConfigManager, prefix: str):
Roles: "type[IRoles]" = get_module("standard.roles", "Roles")
roles = Roles()
import datetime
from dash_extensions.enrich import DashBlueprint
bp = DashBlueprint()
def create_layout():
settings_tree = build_settings_tree(config)
settings_components = create_settings_components(settings_tree)
layout = html.Div(
[
html.Script(),
html.H1("Настройки"),
dbc.Form(settings_components),
html.Div(id="save-confirmation"),
dbc.Button(
"Сохранить",
id="save-settings",
color="primary",
className="mt-3 w-100",
n_clicks=0,
),
html.Div(id="settings-update-trigger", style={"display": "none"}),
dcc.Store(id="settings-store"),
],
style={
"padding": "20px",
},
)
return layout
bp.layout = create_layout
@bp.callback(
Output("save-confirmation", "children"),
Output("settings-store", "data"),
Input("save-settings", "n_clicks"),
State({"type": "setting", "key": ALL}, "value"),
State({"type": "setting", "key": ALL}, "id"),
prevent_initial_call=True,
allow_duplicate=True,
)
def save_settings(n_clicks, values, keys):
if n_clicks > 0:
user = getattr(flask.g, "user", None)
if user is None:
return (
dbc.Alert(
"Вы не авторизованы!",
color="danger",
duration=10000,
),
"-",
)
if not asyncio.run(roles.check_admin_permission(user["id"])):
return (
dbc.Alert(
"Вы не администратор!",
color="danger",
duration=10000,
),
"-",
)
# TODO: добавить валидацию значений
updated_settings = {}
for value, id_dict in zip(values, keys):
key: str = id_dict["key"]
if prefix:
key = key.removeprefix(f"{prefix}-")
meta = config.get_meta(key)
if meta["type"] == "password":
if value: # Only update if a new value is provided
updated_settings[key] = value
else:
updated_settings[key] = value
config.mass_set(updated_settings)
config.save()
now = datetime.datetime.now()
date_str = now.strftime("%H:%M:%S")
return (
dbc.Alert(
f"Настройки сохранены в {date_str}",
color="success",
duration=10000,
),
date_str,
)
bp.clientside_callback(
"""
function(n_clicks) {
const buttonSelector = '#%s-save-settings';
if (n_clicks > 0) {
document.querySelector(buttonSelector).disabled = true;
}
}
"""
% (prefix),
Input("save-settings", "n_clicks"),
)
bp.clientside_callback(
"""
function(data) {
const buttonSelector = '#%s-save-settings';
if (data) {
document.querySelector(buttonSelector).disabled = false;
}
}
"""
% (prefix),
Input("settings-store", "data"),
)
return bp

View File

@@ -0,0 +1,45 @@
import unittest
from src.karkas_blocks.standard.config.config import get_config
yaml_load = get_config(is_test=True)
class TestConfig(unittest.TestCase):
def test_yaml_load_correctness(self):
self.assertIsNotNone(yaml_load)
self.assertIn("TELEGRAM", yaml_load)
self.assertIn("TOKEN", yaml_load["TELEGRAM"])
self.assertIn("ROLES", yaml_load)
self.assertIn("ADMIN", yaml_load["ROLES"])
self.assertIn("MODERATOR", yaml_load["ROLES"])
self.assertIn("USER", yaml_load["ROLES"])
self.assertIn("BOT", yaml_load["ROLES"])
def test_yaml_keys_existence(self):
self.assertTrue(all(key in yaml_load for key in ["TELEGRAM", "ROLES"]))
self.assertIn("TOKEN", yaml_load["TELEGRAM"])
self.assertTrue(
all(role in yaml_load["ROLES"] for role in ["ADMIN", "MODERATOR", "USER"])
)
def test_yaml_yaml_load_types(self):
self.assertIsInstance(yaml_load["TELEGRAM"]["TOKEN"], str)
self.assertTrue(
all(
isinstance(yaml_load["ROLES"][role], int)
for role in ["ADMIN", "MODERATOR", "USER"]
)
)
def test_yaml_values(self):
expected_token = "xxxxxxxxxxxxxxxxxxxx" # nosec
expected_role_values = {"ADMIN": 0, "MODERATOR": 1, "USER": 2, "BOT": 3}
self.assertEqual(yaml_load["TELEGRAM"]["TOKEN"], expected_token)
for role, value in expected_role_values.items():
self.assertEqual(yaml_load["ROLES"][role], value)
if __name__ == "__main__":
unittest.main()

View File

@@ -0,0 +1,48 @@
## Модуль DataBase
Модуль DataBase предназначен для ведения и работы с базами данных Karkas.
Модуль содержит в себе следующие таблицы:
* `Chats` - таблица для хранения информации о чатах.
* `Users` - таблица для хранения информации о пользователях.
* `Messages` - таблица для хранения информации о сообщениях.
* `ChatStats` - таблица для хранения статистики чатов по дням.
* `UserStats` - таблица для хранения статистики пользователей по дням.
руктура таблицы `Chats`:
* `chat_id` - идентификатор чата.
* `chat_name` - название чата.
* `chat_type` - тип чата. (0 - Чат администраторов, 1 - Пользовательский чат, 3 - Чат разрешённых личных запросов к боту
10 - Не инициализированный чат)
* `chat_stats` - количество всех отправленных сообщений в чате.
руктура таблицы `Users`:
* `user_id` - идентификатор пользователя telegram.
* `user_tag` - тег пользователя telegram.
* `user_name` - имя пользователя telegram.
* `user_role` - роль пользователя в чате. (0 - Администратор, 1 - Модератор, 2 - Пользователь)
* `user_stats` - количество всех отправленных сообщений пользователем.
* `user_rep` - репутация пользователя.
руктура таблицы `Messages`:
* `message_chat_id` - идентификатор чата в котором отправлено сообщение.
* `message_id` - идентификатор сообщения.
* `messag_sender_id` - идентификатор пользователя отправившего сообщение. Если сообщение отправил бот, то
`messag_sender_id` = 0.
* `answer_to_message_id` - идентификатор сообщения на которое дан ответ. Если ответа нет или ответ на служебное
сообщение о создании топика в чатах с форумным типом, то `answer_to_message_id` = 0.
* `message_ai_model` - идентификатор модели нейросети, которая использовалась для генерации ответа. Если ответ'
сгенерирован не был, то `message_ai_model` = null.
* `message_text` - текст сообщения.
руктура таблицы `ChatStats`:
* `chat_id` - идентификатор чата для которого собрана статистика.
* `date` - дата на которую собрана статистика.
* `messages_count` - количество сообщений отправленных в чат за день.
руктура таблицы `UserStats`:
* `chat_id` - идентификатор чата для которого собрана статистика.
* `user_id` - идентификатор пользователя для которого собрана статистика.
* `date` - дата на которую собрана статистика.
* `messages_count` - количество сообщений отправленных пользователем в чат за день.

View File

@@ -0,0 +1,5 @@
from . import db_api, models, repositories
async def module_init():
db_api.connect_database()

View File

@@ -0,0 +1,301 @@
import peewee as pw
from aiogram.types import Message
from .exceptions import NotExpectedModuleName
from .models.chat_stats import ChatStats
from .models.chats import Chats
from .models.db import database_proxy
from .models.fsm_data import FSMData
from .models.messages import Messages
from .models.user_stats import UserStats
from .models.users import Users
def connect_database(is_test: bool = False, module: str | None = None):
if module:
raise NotExpectedModuleName()
db_path = "database"
database = pw.SqliteDatabase(f"{db_path}/Karkas.db")
database_proxy.initialize(database)
database.connect()
create_tables(database)
return database, f"{db_path}/Karkas.db"
def create_tables(db: pw.SqliteDatabase):
"""Создание таблиц"""
for table in Chats, Messages, Users, UserStats, ChatStats, FSMData:
if not table.table_exists():
db.create_tables([table])
def add_chat(chat_id, chat_name, chat_type=10, chat_stats=0):
chat, created = Chats.get_or_create(
id=chat_id,
defaults={
"chat_name": chat_name,
"chat_type": chat_type,
"chat_all_stat": chat_stats,
},
)
if not created:
# Обновить существующий чат, если он уже существует
chat.chat_name = chat_name
chat.chat_type = chat_type
chat.chat_stats = chat_stats
chat.save()
def add_user(
user_id,
user_first_name,
user_last_name=None,
user_tag=None,
user_role=0,
user_stats=0,
user_rep=0,
):
if user_last_name is None:
user_name = user_first_name
else:
user_name = user_first_name + " " + user_last_name
user, created = Users.get_or_create(
id=user_id,
defaults={
"user_tag": user_tag,
"user_name": user_name,
"user_role": user_role,
"user_stats": user_stats,
"user_rep": user_rep,
},
)
if not created:
# Обновить существующего пользователя, если он уже существует
user.user_tag = user_tag
user.user_name = user_name
user.user_role = user_role
user.user_stats = user_stats
user.user_rep = user_rep
user.save()
def add_message(message: Message, message_ai_model=None):
if message.reply_to_message:
answer_to_message_id = message.reply_to_message.message_id
else:
answer_to_message_id = None
Messages.create(
message_chat_id=message.chat.id,
message_id=message.message_id,
message_sender_id=message.from_user.id,
answer_to_message_id=answer_to_message_id,
message_ai_model=message_ai_model,
message_text=message.text,
)
def add_chat_stats(chat_id, date, messages_count):
ChatStats.create(chat_id=chat_id, date=date, messages_count=messages_count)
def add_user_stats(chat_id, user_id, date, messages_count):
UserStats.create(
chat_id=chat_id, user_id=user_id, date=date, messages_count=messages_count
)
# Работа с таблицей чатов
def get_chat(chat_id):
return Chats.get_or_none(Chats.id == chat_id)
def change_chat_name(chat_id, new_chat_name):
query = Chats.update(chat_name=new_chat_name).where(Chats.id == chat_id)
query.execute()
def change_chat_type(chat_id, new_chat_type):
query = Chats.update(chat_type=new_chat_type).where(Chats.id == chat_id)
query.execute()
def get_chat_all_stat(chat_id):
chat = Chats.get_or_none(Chats.id == chat_id)
return chat.chat_all_stat if chat else None
# Работа с таблицей пользователей
def get_user(user_id) -> Users | None:
return Users.get_or_none(Users.id == user_id)
def get_user_tag(user_id):
user = Users.get_or_none(Users.id == user_id)
return user.user_tag if user else None
def get_user_id(user_tag):
user = Users.get_or_none(Users.user_tag == user_tag)
return user.id if user else None
def get_user_name(user_id):
user = Users.get_or_none(Users.id == user_id)
return user.user_name if user else None
def get_user_role(user_id: str):
user = Users.get_or_none(Users.id == user_id)
return user.user_role if user else None
def get_user_all_stats(user_id):
user = Users.get_or_none(Users.id == user_id)
return user.user_stats if user else None
def get_user_rep(user_id):
user = Users.get_or_none(Users.id == user_id)
return user.user_rep if user else None
def change_user_name(user_id, user_first_name, user_last_name=None):
if user_last_name is None:
new_user_name = user_first_name
else:
new_user_name = user_first_name + " " + user_last_name
query = Users.update(user_name=new_user_name).where(Users.id == user_id)
query.execute()
def change_user_tag(user_id, new_user_tag):
query = Users.update(user_tag=new_user_tag).where(Users.id == user_id)
query.execute()
def change_user_role(user_id, new_user_role):
query = Users.update(user_role=new_user_role).where(Users.id == user_id)
query.execute()
# Работа с таблицей сообщений
def get_message(message_chat_id, message_id):
return Messages.get_or_none(
Messages.message_chat_id == message_chat_id,
Messages.message_id == message_id,
)
def get_message_sender_id(message_chat_id, message_id):
message = Messages.get_or_none(
Messages.message_chat_id == message_chat_id, Messages.message_id == message_id
)
return message.message_sender_id if message else None
def get_message_text(message_chat_id, message_id):
message = Messages.get_or_none(
Messages.message_chat_id == message_chat_id, Messages.message_id == message_id
)
return message.message_text if message else None
def get_message_ai_model(message_chat_id, message_id):
message = Messages.get_or_none(
Messages.message_chat_id == message_chat_id, Messages.message_id == message_id
)
return message.message_ai_model if message else None
def get_answer_to_message_id(message_chat_id, message_id):
message = Messages.get_or_none(
Messages.message_chat_id == message_chat_id, Messages.message_id == message_id
)
return message.answer_to_message_id if message else None
# Работа с таблицей статистики чатов
def get_chat_stats(chat_id):
chat_stats = {}
for chat_stat in ChatStats.select().where(ChatStats.chat_id == chat_id):
chat_stats[chat_stat.date] = chat_stat.messages_count
return chat_stats
# Работа с таблицей статистики пользователей
def get_user_stats(user_id):
user_stats = {}
for user_stat in UserStats.select().where(UserStats.user_id == user_id):
user_stats[user_stat.date] = user_stat.messages_count
return user_stats
# Функции обновления
def update_chat_all_stat(chat_id):
query = Chats.update(chat_all_stat=Chats.chat_all_stat + 1).where(
Chats.id == chat_id
)
query.execute()
def update_chat_stats(chat_id, date):
chat_stats = ChatStats.get_or_none(
ChatStats.chat_id == chat_id, ChatStats.date == date
)
if chat_stats:
query = ChatStats.update(messages_count=ChatStats.messages_count + 1).where(
ChatStats.chat_id == chat_id, ChatStats.date == date
)
query.execute()
else:
ChatStats.create(chat_id=chat_id, date=date, messages_count=1)
def update_user_all_stat(user_id):
user = Users.get_or_none(Users.id == user_id)
if user:
query = Users.update(user_stats=Users.user_stats + 1).where(Users.id == user_id)
query.execute()
else:
Users.create(id=user_id, user_stats=1)
def update_user_rep(user_id):
user = Users.get_or_none(Users.id == user_id)
if user:
query = Users.update(user_rep=Users.user_rep + 1).where(Users.id == user_id)
query.execute()
else:
Users.create(id=user_id, user_rep=1)
def update_user_stats(chat_id, user_id, date):
user_stats = UserStats.get_or_none(
UserStats.chat_id == chat_id,
UserStats.user_id == user_id,
UserStats.date == date,
)
if user_stats:
query = UserStats.update(messages_count=UserStats.messages_count + 1).where(
UserStats.chat_id == chat_id,
UserStats.user_id == user_id,
UserStats.date == date,
)
query.execute()
else:
UserStats.create(chat_id=chat_id, user_id=user_id, date=date, messages_count=1)

View File

@@ -0,0 +1,12 @@
class MissingModuleName(BaseException):
def __init__(self):
self.message = "Пропущено название директории модуля"
super().__init__(self.message)
class NotExpectedModuleName(BaseException):
def __init__(self):
self.message = "Не ожидалось название директории модуля"
super().__init__(self.message)

View File

@@ -0,0 +1,9 @@
{
"id": "standard.database",
"name": "Database",
"description": "Модуль для работы с БД",
"author": "Karkas Team",
"version": "1.0.0",
"privileged": true,
"dependencies": {}
}

View File

@@ -0,0 +1 @@
from .fsm_data import FSMData

View File

@@ -0,0 +1,12 @@
import peewee as pw
from .db import database_proxy
class ChatStats(pw.Model):
class Meta:
database = database_proxy
chat_id = pw.IntegerField(null=False)
date = pw.DateField(null=False)
messages_count = pw.IntegerField(null=False, default=0)

View File

@@ -0,0 +1,12 @@
import peewee as pw
from .db import database_proxy
class Chats(pw.Model):
class Meta:
database = database_proxy
chat_name = pw.CharField(null=False)
chat_type = pw.IntegerField(null=False, default=10)
chat_all_stat = pw.IntegerField(null=False)

View File

@@ -0,0 +1,3 @@
from peewee import DatabaseProxy
database_proxy = DatabaseProxy()

View File

@@ -0,0 +1,12 @@
import peewee as pw
from .db import database_proxy
class FSMData(pw.Model):
class Meta:
database = database_proxy
key = pw.CharField(primary_key=True)
state = pw.CharField(null=True)
data = pw.CharField(null=True)

View File

@@ -0,0 +1,15 @@
import peewee as pw
from .db import database_proxy
class Messages(pw.Model):
class Meta:
database = database_proxy
message_chat_id = pw.IntegerField(null=False)
message_id = pw.IntegerField(null=False)
message_sender_id = pw.IntegerField(null=False)
answer_to_message_id = pw.IntegerField(null=True)
message_ai_model = pw.TextField(null=True)
message_text = pw.TextField(null=False)

View File

@@ -0,0 +1,13 @@
import peewee as pw
from .db import database_proxy
class UserStats(pw.Model):
class Meta:
database = database_proxy
chat_id = pw.IntegerField(null=False)
user_id = pw.IntegerField(null=False)
date = pw.DateField(null=False)
messages_count = pw.IntegerField(null=False, default=0)

View File

@@ -0,0 +1,14 @@
import peewee as pw
from .db import database_proxy
class Users(pw.Model):
class Meta:
database = database_proxy
user_tag = pw.CharField(null=True)
user_name = pw.CharField(null=False) # до 255 символов
user_role = pw.IntegerField(null=True, default=3)
user_stats = pw.IntegerField(null=True, default=0)
user_rep = pw.IntegerField(null=True, default=0)

View File

@@ -0,0 +1 @@
from .fsm_data import FSMDataRepository

View File

@@ -0,0 +1,32 @@
from peewee import fn
from ..models import FSMData
class FSMDataRepository:
def get(self, key: str):
return FSMData.get_or_none(key=key)
def set_state(self, key: str, state: str):
FSMData.insert(
key=key,
state=state,
data=fn.COALESCE(
FSMData.select(FSMData.data).where(FSMData.key == key), None
),
).on_conflict(
conflict_target=[FSMData.key],
update={FSMData.state: state},
).execute()
def set_data(self, key: str, data: str):
FSMData.insert(
key=key,
data=data,
state=fn.COALESCE(
FSMData.select(FSMData.state).where(FSMData.key == key), None
),
).on_conflict(
conflict_target=[FSMData.key],
update={FSMData.data: data},
).execute()

View File

@@ -0,0 +1 @@
Эта директория для тестовой БД

View File

@@ -0,0 +1,142 @@
# flake8: noqa
import os
import unittest
from ...exceptions.module_exceptions import MissingModuleName, NotExpectedModuleName
from ..db_api import *
class TestDatabaseAPI(unittest.TestCase):
database = None
path = None
@classmethod
def setUpClass(cls):
cls.database, cls.path = connect_database(is_test=True, module="database")
create_tables(cls.database)
def test_fail_connect(cls):
with cls.assertRaises(MissingModuleName):
cls.database, cls.path = connect_database(is_test=True)
with cls.assertRaises(NotExpectedModuleName):
cls.database, cls.path = connect_database(module="database")
def test_add_and_get_chat(self):
add_chat(chat_id=21, chat_role=0, chat_stats=0, chat_federation=0)
add_chat(chat_id=22, chat_role=1, chat_stats=100, chat_federation=1)
chat1 = get_chat(21)
self.assertIsNotNone(chat1)
self.assertEqual(chat1.id, 21)
self.assertEqual(chat1.chat_role, 0)
chat2 = get_chat(22)
self.assertIsNotNone(chat2)
self.assertEqual(chat2.id, 22)
self.assertEqual(chat2.chat_role, 1)
def test_add_and_get_message(self):
add_message(
message_id=1, message_text="Test Message 1", message_sender=1, answer_id=2
)
add_message(
message_id=2, message_text="Test Message 2", message_sender=2, answer_id=1
)
message1 = get_message(1)
self.assertIsNotNone(message1)
self.assertEqual(message1.id, 1)
self.assertEqual(message1.message_text, "Test Message 1")
message2 = get_message(2)
self.assertIsNotNone(message2)
self.assertEqual(message2.id, 2)
self.assertEqual(message2.message_text, "Test Message 2")
def test_add_and_get_user(self):
add_user(
user_id=100,
user_name="TestUser1",
user_tag="TestTag1",
user_role=0,
user_stats=10,
user_rep=5,
)
add_user(
user_id=101,
user_name="TestUser2",
user_tag="TestTag2",
user_role=1,
user_stats=20,
user_rep=10,
)
user1 = get_user(100)
self.assertIsNotNone(user1)
self.assertEqual(user1.id, 100)
self.assertEqual(user1.user_name, "TestUser1")
user2 = get_user(101)
self.assertIsNotNone(user2)
self.assertEqual(user2.id, 101)
self.assertEqual(user2.user_name, "TestUser2")
def test_get_user_role(self):
add_user(
user_id=102,
user_name="TestUser3",
user_tag="TestTag3",
user_role=0,
user_stats=30,
user_rep=15,
)
add_user(
user_id=103,
user_name="TestUser4",
user_tag="TestTag4",
user_role=1,
user_stats=40,
user_rep=20,
)
user_role1 = get_user_role(102)
self.assertEqual(user_role1, 0)
user_role2 = get_user_role(103)
self.assertEqual(user_role2, 1)
def test_change_user_name(self):
add_user(
user_id=104,
user_name="OldName1",
user_tag="TestTag5",
user_role=0,
user_stats=50,
user_rep=25,
)
change_user_name(104, "NewName1")
updated_user1 = get_user(104)
self.assertEqual(updated_user1.user_name, "NewName1")
add_user(
user_id=105,
user_name="OldName2",
user_tag="TestTag6",
user_role=1,
user_stats=60,
user_rep=30,
)
change_user_name(105, "NewName2")
updated_user2 = get_user(105)
self.assertEqual(updated_user2.user_name, "NewName2")
@classmethod
def tearDownClass(cls):
cls.database.close()
os.system(f"rm {cls.path}") # nosec
if __name__ == "__main__":
unittest.main()

View File

@@ -0,0 +1,29 @@
# Модуль Filters
Модуль `filters` предоставляет фильтры для aiogram, которые используются для ограничения доступа к командам
и обработчикам событий.
## Фильтры
- `ChatModerOrAdminFilter` - пропускает сообщения только от модераторов и администраторов чата.
- `ChatNotInApproveFilter` - пропускает сообщения только из чатов, не входящих в список разрешенных.
## Использование
Фильтры можно использовать в декораторах `@router.message` и `@router.callback_query`.
## Пример
```python
from aiogram import Router
from karkas_core.modules_system.public_api import get_module
ChatModerOrAdminFilter = get_module("standard.filters", "ChatModerOrAdminFilter")
router = Router()
@router.message(ChatModerOrAdminFilter())
async def admin_command(message: Message):
# Обработка команды, доступной только администраторам и модераторам.
pass
```

View File

@@ -0,0 +1,7 @@
from .filters import (
ChatIDFilter,
ChatModerOrAdminFilter,
ChatNotInApproveFilter,
chat_not_in_approve,
module_init,
)

View File

@@ -0,0 +1,84 @@
from typing import TYPE_CHECKING
from aiogram import Bot
from aiogram.filters import BaseFilter
from aiogram.types import Message
from typing_extensions import deprecated
from karkas_core.modules_system.public_api import get_module
if TYPE_CHECKING:
from karkas_blocks.standard.config import IConfig
from karkas_blocks.standard.roles import Roles as IRoles
config: "IConfig" = get_module("standard.config", "config")
try:
Roles: "type[IRoles]" = get_module("standard.roles", "Roles")
ROLES_MODULE_LOADED = True
except Exception:
ROLES_MODULE_LOADED = False
pass
def module_init():
config.register(
"filters::approved_chat_id", "int", multiple=True, shared=True, default_value=[]
)
config.register("filters::default_chat_tag", "string", shared=True)
def get_approved_chat_id() -> list:
return config.get("filters::approved_chat_id")
@deprecated("Use ChatIDFilter or own implementation")
def chat_not_in_approve(message: Message) -> bool:
chat_id = message.chat.id
if chat_id in get_approved_chat_id():
# log(f"Chat in approve list: {chat_id}")
return False
else:
# log(f"Chat not in approve list: {chat_id}")
return True
class ChatIDFilter(BaseFilter):
def __init__(self, blacklist=False, approved_chats=None) -> None:
self.blacklist = blacklist
self.approved_chats = approved_chats
super().__init__()
async def __call__(self, message: Message, bot: Bot) -> bool:
chat_id = message.chat.id
approved_chats = self.approved_chats or get_approved_chat_id()
# Если список для фильтрации пуст - разрешаем всем.
if len(approved_chats) == 0:
return True
res = chat_id in approved_chats
return res ^ (self.blacklist)
class ChatNotInApproveFilter(ChatIDFilter):
def __init__(self) -> None:
super().__init__(allow=False)
class ChatModerOrAdminFilter(BaseFilter):
async def __call__(self, message: Message, bot: Bot) -> bool:
if not ROLES_MODULE_LOADED:
raise Exception("Roles module not loaded")
user_id = message.from_user.id
roles = Roles()
admins = await bot.get_chat_administrators(message.chat.id)
return (
await roles.check_admin_permission(user_id)
or await roles.check_moderator_permission(user_id)
or any(user_id == admin.user.id for admin in admins)
)

View File

@@ -0,0 +1,16 @@
{
"id": "standard.filters",
"name": "Filters",
"description": "Модуль с фильтрами",
"author": "Karkas Team",
"version": "1.0.0",
"privileged": true,
"dependencies": {
"required": {
"standard.config": "^1.0.0"
},
"optional": {
"standard.roles": "^1.0.0"
}
}
}

View File

@@ -0,0 +1,13 @@
# Модуль FSM Database Storage
Модуль `fsm_database_storage` реализует хранение состояний FSM (Finite State Machine) в базе данных.
## Функциональность
- Сохранение состояния FSM в базу данных.
- Получение состояния FSM из базы данных.
- Обновление данных состояния FSM.
## Использование
Модуль автоматически регистрирует хранилище состояний FSM при инициализации.

View File

@@ -0,0 +1 @@
from .fsm import module_init

View File

@@ -0,0 +1,129 @@
import json
from typing import TYPE_CHECKING, Any, Dict, Optional
from aiogram.fsm.state import State
from aiogram.fsm.storage.base import BaseStorage, StorageKey
from karkas_core.modules_system.public_api import get_module, log
from karkas_core.modules_system.public_api.public_api import set_fsm
if TYPE_CHECKING:
from karkas_blocks.standard.database.repositories import (
FSMDataRepository as IFSMDataRepository,
)
FSMDataRepository: "type[IFSMDataRepository]" = get_module(
"standard.database", "repositories.FSMDataRepository"
)
def serialize_key(key: StorageKey) -> str:
return f"{key.bot_id}:{key.chat_id}:{key.user_id}"
def serialize_object(obj: object) -> str | None:
try:
return json.dumps(obj)
except Exception as e:
log(f"Serializing error! {e}")
return None
def deserialize_object(obj):
try:
return json.loads(obj)
except Exception as e:
log(f"Deserializing error! {e}")
return None
class SQLStorage(BaseStorage):
def __init__(self):
super().__init__()
self.repo = FSMDataRepository()
async def set_state(self, key: StorageKey, state: State | None = None) -> None:
"""
Set state for specified key
:param key: storage key
:param state: new state
"""
s_key = serialize_key(key)
s_state = state.state if isinstance(state, State) else state
try:
self.repo.set_state(s_key, s_state)
except Exception as e:
log(f"FSM Storage database error: {e}")
async def get_state(self, key: StorageKey) -> Optional[str]:
"""
Get key state
:param key: storage key
:return: current state
"""
s_key = serialize_key(key)
try:
s_state = self.repo.get(s_key)
return s_state.state if s_state else None
except Exception as e:
log(f"FSM Storage database error: {e}")
return None
async def set_data(self, key: StorageKey, data: Dict[str, Any]) -> None:
"""
Write data (replace)
:param key: storage key
:param data: new data
"""
s_key = serialize_key(key)
s_data = serialize_object(data)
try:
self.repo.set_data(s_key, s_data)
except Exception as e:
log(f"FSM Storage database error: {e}")
async def get_data(self, key: StorageKey) -> Optional[Dict[str, Any]]:
"""
Get current data for key
:param key: storage key
:return: current data
"""
s_key = serialize_key(key)
try:
s_data = self.repo.get(s_key)
return deserialize_object(s_data.data) if s_data else None
except Exception as e:
log(f"FSM Storage database error: {e}")
return None
async def update_data(
self, key: StorageKey, data: Dict[str, Any]
) -> Dict[str, Any]:
"""
Update data in the storage for key (like dict.update)
:param key: storage key
:param data: partial data
:return: new data
"""
current_data = await self.get_data(key=key)
if not current_data:
current_data = {}
current_data.update(data)
await self.set_data(key=key, data=current_data)
return current_data.copy()
async def close(self) -> None: # pragma: no cover
pass
async def module_init():
set_fsm(SQLStorage())

View File

@@ -0,0 +1,13 @@
{
"id": "standard.fsm_database_storage",
"name": "FSM Database Storage",
"description": "Очень полезный модуль",
"author": "Karkas Team",
"version": "1.0.0",
"privileged": false,
"dependencies": {
"required": {
"standard.database": "^1.0.0"
}
}
}

View File

@@ -0,0 +1 @@
from .main import module_init

View File

@@ -0,0 +1,21 @@
{
"id": "standard.help",
"name": "Help",
"description": "Модуль для вывода /help сообщения",
"author": "Karkas Team",
"version": "1.0.0",
"privileged": false,
"dependencies": {
"required": {
"standard.config": "^1.0.0"
},
"optional": {
"standard.command_helper": "^1.0.0"
}
},
"pythonDependencies": {
"required": {
"string": "*"
}
}
}

View File

@@ -0,0 +1,81 @@
import string
from typing import TYPE_CHECKING
from aiogram import Router
from aiogram.filters import Command
from aiogram.types import Message
from karkas_core.modules_system.public_api import (
get_metainfo,
get_module,
register_router,
)
if TYPE_CHECKING:
from karkas_blocks.standard.config import IConfig
config: "IConfig" = get_module("standard.config", "config")
try:
(register_command, get_user_commands) = get_module(
"standard.command_helper", ["register_command", "get_user_commands"]
)
COMMAND_HELPER_MODULE_LOADED = True
except Exception:
COMMAND_HELPER_MODULE_LOADED = False
pass
FOOTER = """===
Разработано командой ALT Gnome Infrastructure в рамках проекта Каркас.
Исходный код: https://gitflic.ru/project/alt-gnome/karkas
Оставить репорт: https://gitflic.ru/project/alt-gnome/karkas/issue/create
Руководитель проекта: Семен Фомченков
Ведущий разработчик: Максим Слипенко
Версия: $version
"""
def format_commands(commands_dict):
formatted_commands = []
for command, details in commands_dict.items():
formatted_commands.append(f"/{command} - {details['description']}")
return "\n".join(formatted_commands)
async def help(message: Message):
commands = ""
version = ""
if COMMAND_HELPER_MODULE_LOADED:
commands = format_commands(get_user_commands())
metainfo = get_metainfo()
if "app_version" in metainfo:
version = metainfo["app_version"]
await message.reply(
string.Template(config.get("help::message") + "\n\n" + FOOTER).substitute(
commands=commands, version=version or "не указана"
)
)
async def module_init():
config.register(
"help::message",
"string",
default_value="$commands",
)
router = Router()
router.message.register(help, Command("help"))
register_router(router)
if COMMAND_HELPER_MODULE_LOADED:
register_command("help", "Cправка")

View File

@@ -0,0 +1,15 @@
# Модуль Info
Модуль `info` предоставляет информацию о пользователях и чатах.
## Команды
- `/info` - получить информацию о пользователе.
- `/chatinfo` - получить информацию о чате.
## Использование
Чтобы получить информацию о пользователе, отправьте команду `/info`,
ответив на сообщение пользователя или указав его тег.
Чтобы получить информацию о чате, отправьте команду `/chatinfo`.

View File

@@ -0,0 +1,2 @@
from .handlers import get_chat_info, get_user_info
from .main import module_init

View File

@@ -0,0 +1,86 @@
# flake8: noqa
from typing import TYPE_CHECKING
from aiogram import Bot
from aiogram.types import Message
from karkas_core.modules_system.public_api import get_module, log
if TYPE_CHECKING:
from karkas_blocks.standard.database import db_api as IDbApi
from karkas_blocks.standard.roles import Roles as IRoles
db_api: "IDbApi" = get_module(
"standard.database",
"db_api",
)
Roles: "type[IRoles]" = get_module("standard.roles", "Roles")
async def get_info_answer_by_id(message: Message, bot: Bot, user_id: int):
ai_model = db_api.get_message_ai_model(message.chat.id, message.message_id)
if ai_model is not None:
await message.reply(
"Это сообщение было сгенерировано ботом используя модель: " + ai_model
)
return
if user_id == bot.id:
await message.reply("Это сообщение было отправлено ботом")
return
user = db_api.get_user(user_id)
if user is None:
await message.reply("Пользователь не найден")
log(f"Пользователь не найден: {user_id}, {user}")
return
roles = Roles()
answer = (
f"Пользователь: {user.user_name}\n"
f"Роль: {await roles.get_role_name(role_id=user.user_role)}\n"
f"Тег: @{user.user_tag}\n"
f"Кол-во сообщений: {user.user_stats}\n"
f"Репутация: {user.user_rep}"
)
await message.reply(answer)
async def get_user_info(message: Message, bot: Bot):
# Проверяем содержимое сообщения, если содержит вторым элементом тег пользователя, то выводим информацию о нем
# Если сообщение отвечает на другое сообщение, то выводим информацию о пользователе, на чье сообщение был ответ
# Если это бот то выводим информацию, что это бот и какая модель yandexgpt используется
try:
if len(message.text.split()) > 1 and message.text.split()[1].startswith("@"):
user_tag = message.text.split()[1][1:]
user_id = db_api.get_user_id(user_tag)
if user_id:
await get_info_answer_by_id(message, bot, user_id)
else:
await message.reply(f"Пользователь с тегом @{user_tag} не найден")
elif message.reply_to_message:
user_id = message.reply_to_message.from_user.id
await get_info_answer_by_id(message, bot, user_id)
else:
await get_info_answer_by_id(message, bot, message.from_user.id)
except Exception as e:
await message.reply(
"В вашем запросе что-то пошло не так,"
" попробуйте запросить информацию о пользователе по его тегу или ответив на его сообщение"
)
# print(e)
log(e)
async def get_chat_info(message: Message, bot: Bot):
answer = (
f"*Название чата:* {message.chat.title}\n"
f"*ID чата:* `{message.chat.id}`\n \n"
f"*Суммарное количество сообщений в чате:* {db_api.get_chat_all_stat(message.chat.id)}\n"
f"*Количество пользователей в чате:* {await bot.get_chat_member_count(message.chat.id)}\n"
f"*Количество администраторов в чате:* {len(await bot.get_chat_administrators(message.chat.id))}"
)
await message.reply(answer, parse_mode="MarkdownV2")

View File

@@ -0,0 +1,15 @@
{
"id": "standard.info",
"name": "Info",
"description": "Модуль с информацией",
"author": "Karkas Team",
"version": "1.0.0",
"privileged": false,
"dependencies": {
"required": {
"standard.roles": "^1.0.0",
"standard.database": "^1.0.0",
"standard.command_helper": "^1.0.0"
}
}
}

View File

@@ -0,0 +1,20 @@
from aiogram import Router
from aiogram.filters import Command
from karkas_core.modules_system.public_api import get_module, register_router
from .handlers import get_chat_info, get_user_info
register_command = get_module("standard.command_helper", "register_command")
async def module_init():
router = Router()
router.message.register(get_user_info, Command("info"))
router.message.register(get_chat_info, Command("chatinfo"))
register_router(router)
register_command("info", "Информация о пользователе")
register_command("chatinfo", "Информация о чате")

View File

@@ -0,0 +1,14 @@
# Модуль Message Processing
Модуль `message_processing` обрабатывает все входящие сообщения.
## Функциональность
- Проверка чата и пользователя на наличие в базе данных.
- Обновление информации о чате и пользователе.
- Добавление статистики сообщений.
- Передача сообщения модулю `yandexgpt`, если оно соответствует условиям.
## Использование
Модуль автоматически обрабатывает все входящие сообщения.

View File

@@ -0,0 +1 @@
from .message_api import module_init

View File

@@ -0,0 +1,15 @@
{
"id": "standard.message_processing",
"name": "Info",
"description": "Модуль с информацией",
"author": "Karkas Team",
"version": "1.0.0",
"privileged": false,
"dependencies": {
"required": {
"standard.roles": "^1.0.0",
"standard.database": "^1.0.0",
"standard.command_helper": "^1.0.0"
}
}
}

View File

@@ -0,0 +1,155 @@
# flake8: noqa
from typing import TYPE_CHECKING
from aiogram import Bot, F, Router, types
from karkas_core.modules_system.public_api import get_module, log, register_router
if TYPE_CHECKING:
from karkas_blocks.standard.config import IConfig
config: "IConfig" = get_module("standard.config", "config")
def get_yandexgpt_start_words():
return config.get("yandexgpt::startword").split(" | ")
def get_yandexgpt_in_words():
return config.get("yandexgpt::inword").split(" | ")
chat_not_in_approve = get_module("standard.filters", ["chat_not_in_approve"])
answer_to_message = get_module("external.yandexgpt", "answer_to_message")
(
get_chat,
add_chat,
get_user,
add_user,
get_user_name,
change_user_name,
get_user_tag,
change_user_tag,
update_chat_all_stat,
update_user_all_stat,
add_message,
) = get_module(
"standard.database",
[
"db_api.get_chat",
"db_api.add_chat",
"db_api.get_user",
"db_api.add_user",
"db_api.get_user_name",
"db_api.change_user_name",
"db_api.get_user_tag",
"db_api.change_user_tag",
"db_api.update_chat_all_stat",
"db_api.update_user_all_stat",
"db_api.add_message",
],
)
async def chat_check(message: types.Message):
# Проверка наличия id чата в базе данных чатов
# Если чата нет в базе данных, то проверяем его в наличии в конфиге и если он там есть то добавляем его в БД
# Если чат есть в базе данных, то pass
if get_chat(message.chat.id) is None:
if not chat_not_in_approve(message):
# print(f"Chat in approve list: {message.chat.id} {message.chat.title}")
log(f"Chat in approve list: {message.chat.id} {message.chat.title}")
add_chat(message.chat.id, message.chat.title)
# print(f"Chat added: {message.chat.id} {message.chat.title}")
log(f"Chat added: {message.chat.id} {message.chat.title}")
else:
# print(f"Chat not in approve list: {message.chat.id} {message.chat.title}")
log(f"Chat not in approve list: {message.chat.id} {message.chat.title}")
pass
else:
# Проверяем обновление названия чата
chat = get_chat(message.chat.id)
if chat.chat_name != message.chat.title:
chat.chat_name = message.chat.title
chat.save()
# print(f"Chat updated: {message.chat.id} {message.chat.title}")
log(f"Chat updated: {message.chat.id} {message.chat.title}")
else:
# print(f"Chat already exists: {message.chat.id} {message.chat.title}")
log(f"Chat already exists: {message.chat.id} {message.chat.title}")
pass
async def user_check(message: types.Message):
# Проверка наличия id пользователя в базе данных пользователей
# Если пользователя нет в базе данных, то добавляем его
# Если пользователь есть в базе данных, то pass
current_user_name = ""
if message.from_user.last_name is None:
current_user_name = message.from_user.first_name
else:
current_user_name = (
message.from_user.first_name + " " + message.from_user.last_name
)
if get_user(message.from_user.id) is None:
add_user(
message.from_user.id,
message.from_user.first_name,
message.from_user.last_name,
message.from_user.username,
)
log(f"User added: {message.from_user.id} {current_user_name}")
else:
log(
f"User already exists: {message.from_user.id} {current_user_name} {message.from_user.username}"
)
# Проверяем обновление имени пользователя
if get_user_name(message.from_user.id) != current_user_name:
change_user_name(message.from_user.id, current_user_name)
log(f"User name updated: {message.from_user.id} {current_user_name}")
# Проверяем обновление username пользователя
if get_user_tag(message.from_user.id) != message.from_user.username:
change_user_tag(message.from_user.id, message.from_user.username)
log(
f"User tag updated: {message.from_user.id} {message.from_user.username}"
)
pass
async def add_stats(message: types.Message):
# Добавляем пользователю и чату статистику
update_chat_all_stat(message.chat.id)
update_user_all_stat(message.from_user.id)
async def message_processing(message: types.Message, bot: Bot):
await chat_check(message)
await user_check(message)
await add_stats(message)
add_message(message)
# Если сообщение в начале содержит слово из списка или внутри сообщения содержится слово из списка или сообщение отвечает на сообщение бота
if (message.text.split(" ")[0] in get_yandexgpt_start_words()) or (
any(word in message.text for word in get_yandexgpt_in_words())
):
log("message_processing")
await answer_to_message(message, bot)
elif message.reply_to_message is not None:
if message.reply_to_message.from_user.is_bot:
log("message_processing")
await answer_to_message(message, bot)
router = Router()
# Если сообщение содержит текст то вызывается функция message_processing
router.message.register(message_processing, F.text)
async def module_init():
register_router(router)

View File

@@ -0,0 +1,23 @@
# Модуль Miniapp
Модуль `miniapp` реализует веб-интерфейс для бота, доступный через Telegram Mini Apps.
## Функциональность
- Регистрация страниц веб-интерфейса.
- Авторизация пользователей через Telegram.
## Использование
1. Импортируйте функцию `register_page`.
2. Вызовите функцию `register_page`, передав ей название страницы, ее путь, blueprint Dash и префикс.
## Пример
```python
from karkas_core.modules_system.public_api import get_module
register_page = get_module("standard.miniapp", "register_page")
register_page("Моя страница", "/my_page", my_blueprint, prefix="my_page")
```

View File

@@ -0,0 +1,2 @@
from .lib import register_page
from .main import module_init, module_late_init

View File

@@ -0,0 +1,146 @@
import flask
from aiogram.utils.web_app import safe_parse_webapp_init_data
from dash import Dash
from dash_extensions.enrich import Input, Output
from flask import request
# TODO: добавить прокидывание BASE_PATH, т.к. это параметр из настроек
WEBAPP_LOADER_TEMPLATE = """
<!DOCTYPE html>
<html lang="ru">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>Karkas</title>
<script src="https://telegram.org/js/telegram-web-app.js"></script>
<script>
window.addEventListener('message', function(event) {
if (event.origin !== window.location.origin) return;
if (event.data.type === 'iframe-url-changed') {
history.pushState(
null,
'',
window.BASE_PATH + event.data.pathname.substring(
window.INTERNAL_PATH.length
)
);
}
});
window.addEventListener('popstate', function(event) {
var iframe = document.getElementById('app-frame');
var iframeWindow = iframe.contentWindow;
iframeWindow.history.back();
});
</script>
<style>
#app-frame {
display:none;
width:100%;
height:100vh;
border:none;
position: absolute;
top: 0;
left: 0;
}
</style>
</head>
<body>
<div id="loading">Loading...</div>
<iframe id="app-frame"></iframe>
<script>
document.addEventListener('DOMContentLoaded', function() {
const tg = window.Telegram.WebApp;
document.cookie = `tg_init_data=${JSON.stringify(tg.initData)}; path=/`;
// if (!tg.initData) return;
const iframe = document.getElementById('app-frame');
// Константы для путей
const BASE_PATH = '/webapp';
const INTERNAL_PATH = '/webapp/_internal';
window.BASE_PATH = BASE_PATH;
window.INTERNAL_PATH = INTERNAL_PATH
// Текущий путь страницы
const currentPath = window.location.pathname;
// Формируем новый путь для iframe
let iframeSrc = INTERNAL_PATH;
// Если текущий путь начинается с BASE_PATH, убираем BASE_PATH из текущего пути
if (currentPath.startsWith(BASE_PATH)
&& currentPath.length > BASE_PATH.length) {
iframeSrc += currentPath.substring(BASE_PATH.length);
} else if (currentPath !== '/') {
iframeSrc += currentPath;
}
iframe.src = iframeSrc;
iframe.onload = function() {
document.getElementById('loading').style.display = 'none';
iframe.style.display = 'block';
};
});
</script>
</body>
</html>
"""
def get_auth_server(bot_token: str):
server = flask.Flask(__name__)
@server.route("/<path:rest>")
@server.route("/")
def webapp_loader(rest=None):
return flask.Response(WEBAPP_LOADER_TEMPLATE, mimetype="text/html")
@server.before_request
def add_auth_data():
init_data = request.cookies.get("tg_init_data")
if init_data:
try:
data = safe_parse_webapp_init_data(token=bot_token, init_data=init_data)
flask.g.user = data.user.model_dump()
except ValueError:
pass
return server
def setup_auth_clientcallbacks(app: Dash):
app.clientside_callback(
"""
function(n_intervals) {
if (window.webAppData) {
return window.webAppData;
}
function receiveMessage(event) {
if (event.data.type === 'webAppData') {
window.webAppData = event.data.webApp;
window.removeEventListener('message', receiveMessage);
}
}
window.addEventListener('message', receiveMessage, false);
return window.dash_clientside.no_update;
}
""",
Output("hidden-div", "children"),
Input("interval-component", "n_intervals"),
)
app.clientside_callback(
"""
function(pathname) {
window.parent.postMessage({ type: 'iframe-url-changed', pathname }, '*');
}
""",
Input("url", "pathname"),
)

View File

@@ -0,0 +1,23 @@
{
"id": "standard.miniapp",
"name": "Miniapp",
"description": "Очень полезный модуль",
"author": "Karkas Team",
"version": "1.0.0",
"privileged": false,
"dependencies": {
"required": {
"standard.config": {
"version": "^1.0.0",
"uses": []
}
}
},
"pythonDependencies": {
"required": {
"dash": "^2.17.1",
"dash_extensions": "^1.0.18",
"dash_bootstrap_components": "^1.6.0"
}
}
}

View File

@@ -0,0 +1,191 @@
import asyncio
from collections import OrderedDict
from typing import TYPE_CHECKING
import dash
import dash_bootstrap_components as dbc
import flask
from dash_extensions.enrich import DashBlueprint, DashProxy, Input, Output, dcc, html
from dash_extensions.pages import setup_page_components
from karkas_core.modules_system.public_api import get_module, log
from .dash_telegram_auth import get_auth_server, setup_auth_clientcallbacks
if TYPE_CHECKING:
from karkas_blocks.standard.config import IConfig
from karkas_blocks.standard.roles import Roles as IRoles
pages = OrderedDict()
def register_page(name, path, blueprint, prefix="", role="USER"):
pages[path] = {
"name": name,
"blueprint": blueprint,
"prefix": prefix,
"role": role,
}
def register_home_page():
page = DashBlueprint()
page.layout = html.Div([html.H1("Главная")])
register_page("Главная", path="/", blueprint=page)
register_home_page()
config: "IConfig" = get_module("standard.config", "config")
Roles: "type[IRoles]" = get_module("standard.roles", "Roles")
def create_dash_app(requests_pathname_prefix: str = None) -> dash.Dash:
log(requests_pathname_prefix)
real_prefix = f"{requests_pathname_prefix}_internal/"
server = get_auth_server(config.get("core::token"))
app = DashProxy(
pages_folder="",
use_pages=True,
suppress_callback_exceptions=True,
external_stylesheets=[
dbc.themes.BOOTSTRAP,
dbc.icons.BOOTSTRAP,
],
external_scripts=[
#
"https://telegram.org/js/telegram-web-app.js"
],
server=server,
requests_pathname_prefix=real_prefix,
routes_pathname_prefix="/_internal/",
# requests_pathname_prefix=requests_pathname_prefix,
meta_tags=[
{"name": "viewport", "content": "width=device-width, initial-scale=1"},
],
)
# app.enable_dev_tools(
# dev_tools_ui=True,
# dev_tools_serve_dev_bundles=True,
# )
# Register pages
for path, page in pages.items():
page["blueprint"].register(app, path, prefix=page["prefix"])
# Create navbar
navbar = dbc.Navbar(
dbc.Container(
[
dbc.Button(
html.I(className="bi bi-list"),
id="open-offcanvas",
color="light",
className="me-2",
),
dbc.NavbarBrand("Karkas"),
]
),
color="primary",
dark=True,
)
roles = Roles()
def create_layout():
user = getattr(flask.g, "user", None)
if not user:
return html.Div()
user_id = user["id"]
user_permission = asyncio.run(roles.get_user_permission(user_id)) or "USER"
available_pages = {
path: page
for path, page in pages.items()
if (isinstance(page["role"], list) and user_permission in page["role"])
or page["role"] == user_permission
or page["role"] == "USER"
}
# Create sidebar
sidebar = dbc.Offcanvas(
id="offcanvas",
title="Меню",
is_open=False,
children=[
dbc.Nav(
[
dbc.NavLink(
page["name"],
href=f"{real_prefix}/{path.lstrip('/')}",
id={"type": "nav-link", "index": path},
)
for path, page in available_pages.items()
],
vertical=True,
pills=True,
),
],
)
layout = html.Div(
[
dcc.Location(id="url", refresh=False),
dcc.Interval(
id="init-telegram-interval",
interval=100,
n_intervals=0,
max_intervals=1,
),
navbar,
sidebar,
dash.page_container,
setup_page_components(),
]
)
return layout
app.layout = create_layout
setup_auth_clientcallbacks(app)
# Открытие на кнопку меню
app.clientside_callback(
"""
function(n_clicks) {
if (n_clicks == null) {
return false;
}
return true;
}
""",
Output(
"offcanvas",
"is_open",
),
Input("open-offcanvas", "n_clicks"),
)
# Закрываем offcanvas при клике на ссылку в меню
app.clientside_callback(
"""
function(n_clicks) {
if (n_clicks == null) {
return true;
}
return false;
}
""",
Output("offcanvas", "is_open", allow_duplicate=True),
Input({"type": "nav-link", "index": dash.dependencies.ALL}, "n_clicks"),
prevent_initial_call="initial_duplicate",
)
return app

View File

@@ -0,0 +1,54 @@
from typing import TYPE_CHECKING
from aiogram import types
from fastapi.middleware.wsgi import WSGIMiddleware
from karkas_core.modules_system.public_api import (
Storage,
get_module,
set_chat_menu_button,
)
if TYPE_CHECKING:
from karkas_blocks.standard.config import IConfig
config: "IConfig" = get_module("standard.config", "config")
def get_link():
pass
def module_init():
config.register(
"miniapp::prefix",
"string",
default_value="/webapp/",
visible=False,
)
config.register(
"miniapp::public_url",
"string",
visible=False,
)
pass
def register_page():
pass
async def module_late_init():
from .lib import create_dash_app
dash_app = create_dash_app(requests_pathname_prefix=config.get("miniapp::prefix"))
Storage.set("webapp", WSGIMiddleware(dash_app.server))
web_app_info = types.WebAppInfo(url=config.get("miniapp::public_url"))
menu_button = types.MenuButtonWebApp(text="Меню", web_app=web_app_info)
await set_chat_menu_button(menu_button)

View File

@@ -0,0 +1,18 @@
# Модуль Report
Модуль `report` позволяет пользователям сообщать о спам-сообщениях в чате.
## Команды
- `/report` - пожаловаться на сообщение как на спам.
## Использование
Чтобы сообщить о сообщении как о спаме, отправьте команду `/report`, ответив на сообщение, которое вы хотите отметить. Модуль уведомит администраторов, которые имеют права модерации.
### Пример использования
1. Найдите сообщение, которое вы хотите отметить как спам.
2. Ответьте на это сообщение командой `/report`.
Примечание: Команда `/report` должна быть отправлена в ответ на сообщение, которое вы хотите отметить.

View File

@@ -0,0 +1 @@
from .main import module_init

View File

@@ -0,0 +1,23 @@
{
"id": "standard.report",
"name": "Report",
"description": "Модуль для быстрой жалобы на спам",
"author": "Karkas Team",
"version": "1.0.0",
"privileged": false,
"dependencies": {
"required": {
"standard.config": "^1.0.0"
},
"optional": {
"standard.command_helper": "^1.0.0",
"standard.filters": "^1.0.0"
}
},
"pythonDependencies": {
"required": {
"random": "*",
"string": "*"
}
}
}

View File

@@ -0,0 +1,98 @@
import random
from string import Template
from typing import TYPE_CHECKING
from aiogram import Router
from aiogram.filters import Command
from aiogram.types import ChatMemberAdministrator, ChatMemberOwner, Message
from karkas_core.modules_system.public_api import get_module, log, register_router
if TYPE_CHECKING:
from karkas_blocks.standard.config import IConfig
from karkas_blocks.standard.filters import ChatIDFilter as IChatIDFilter
config: "IConfig" = get_module("standard.config", "config")
try:
ChatIDFilter: "type[IChatIDFilter]" = get_module("standard.filters", "ChatIDFilter")
FILTERS_MODULE_LOADED = True
except Exception:
FILTERS_MODULE_LOADED = False
pass
try:
register_command = get_module("standard.command_helper", "register_command")
COMMAND_HELPER_MODULE_LOADED = True
except Exception:
COMMAND_HELPER_MODULE_LOADED = False
pass
def can_moderate(admin: ChatMemberOwner | ChatMemberAdministrator) -> bool:
if isinstance(admin, ChatMemberOwner):
return True
return admin.user.is_bot is False and (
admin.can_delete_messages and admin.can_restrict_members
)
async def report(message: Message):
try:
if message.reply_to_message is None:
await message.reply(config.get("report::errors::no_reply_message"))
return
mention_list = config.get("report::mention::list")
if mention_list is None:
admins = await message.chat.get_administrators()
mention_list = [
admin.user.mention_html() for admin in admins if can_moderate(admin)
]
random.shuffle(mention_list)
limit = config.get("report::mention::limit")
if limit != -1:
mention_list = mention_list[:limit]
if mention_list:
await message.reply_to_message.reply(
Template(config.get("report::mention::text")).substitute(
mention=", ".join(mention_list)
),
parse_mode="HTML",
)
except Exception as e:
log(e)
async def module_init():
config.register("report::mention::limit", "int", default_value=5)
config.register(
"report::mention::list", "string", multiple=True, default_value=None
)
config.register(
"report::mention::text",
"string",
default_value="⚠️ Внимание, жалоба на спам! $mention",
)
config.register(
"report::errors::no_reply_message",
"string",
default_value="Пожалуйста, используйте команду в ответ на сообщение",
)
router = Router()
if FILTERS_MODULE_LOADED:
router.message.register(report, ChatIDFilter(), Command("report"))
else:
router.message.register(report, Command("report"))
register_router(router)
if COMMAND_HELPER_MODULE_LOADED:
register_command("report", "Пожаловаться на спам")

View File

@@ -0,0 +1,36 @@
# Модуль Roles
Модуль `roles` управляет ролями пользователей.
## Роли
- `USER` - обычный пользователь.
- `MODERATOR` - модератор.
- `ADMIN` - администратор.
- `BOT` - бот.
## Функциональность
- Проверка роли пользователя.
- Получение имени роли по ID.
- Получение ID роли по имени.
## Использование
1. Импортируйте класс `Roles`.
2. Создайте экземпляр класса.
3. Вызовите методы класса для проверки роли пользователя или получения имени роли.
## Пример
```python
from karkas_core.modules_system.public_api import get_module
Roles = get_module("standard.roles", "Roles")
roles = Roles()
is_admin = await roles.check_admin_permission(user_id)
role_name = await roles.get_role_name(role_id)
```

View File

@@ -0,0 +1,2 @@
from .main import module_init
from .roles import Roles

View File

@@ -0,0 +1,14 @@
{
"id": "standard.roles",
"name": "Roles",
"description": "Модуль для работы с ролями",
"author": "Karkas Team",
"version": "1.0.0",
"privileged": true,
"dependencies": {
"required": {
"standard.config": "^1.0.0",
"standard.database": "^1.0.0"
}
}
}

View File

@@ -0,0 +1,37 @@
from typing import TYPE_CHECKING
from karkas_core.modules_system.public_api import get_module
if TYPE_CHECKING:
from karkas_blocks.standard.config import IConfig
def module_init():
config: "IConfig" = get_module("standard.config", "config")
config.register(
"roles::admin",
"number",
default_value=2,
visible=False,
)
config.register(
"roles::moderator",
"number",
default_value=1,
visible=False,
)
config.register(
"roles::user",
"number",
default_value=0,
visible=False,
)
config.register(
"roles::bot",
"number",
default_value=3,
visible=False,
)
pass

View File

@@ -0,0 +1,72 @@
from typing import TYPE_CHECKING
from karkas_core.modules_system.public_api import get_module
if TYPE_CHECKING:
from karkas_blocks.standard.config import IConfig
from karkas_blocks.standard.database.db_api import get_user_role as IGetUserRoleType
get_user_role: "IGetUserRoleType" = get_module(
"standard.database", "db_api.get_user_role"
)
config: "IConfig" = get_module("standard.config", "config")
class Roles:
user = "USER"
moderator = "MODERATOR"
admin = "ADMIN"
bot = "BOT"
def __init__(self):
pass
def update_roles(self):
self.user_role_id = config.get("roles::user")
self.moderator_role_id = config.get("roles::moderator")
self.admin_role_id = config.get("roles::admin")
self.bot_role_id = config.get("roles::bot")
async def check_admin_permission(self, user_id):
self.update_roles()
match get_user_role(user_id):
case self.admin_role_id:
return True
case _:
return False
async def check_moderator_permission(self, user_id):
self.update_roles()
match get_user_role(user_id):
case self.moderator_role_id:
return True
case _:
return False
async def get_role_name(self, role_id):
self.update_roles()
match role_id:
case self.admin_role_id:
return self.admin
case self.moderator_role_id:
return self.moderator
case self.user_role_id:
return self.user
case self.bot_role_id:
return self.bot
case _:
raise ValueError(f"Нет роли с id={role_id}")
async def get_user_permission(self, user_id):
self.update_roles()
match get_user_role(user_id):
case self.admin_role_id:
return self.admin
case self.moderator_role_id:
return self.moderator
case self.user_role_id:
return self.user
case self.bot_role_id:
return self.bot
case _:
return None

View File

@@ -0,0 +1,80 @@
import os
import unittest
from ...database.db_api import add_user, connect_database, create_tables
from ..roles import Roles
class TestRoles(unittest.IsolatedAsyncioTestCase):
database = None
path = None
@classmethod
def setUpClass(cls):
cls.roles = Roles()
cls.database, cls.path = connect_database(is_test=True, module="roles")
create_tables(cls.database)
add_user(
user_id=1,
user_name="TestUser1",
user_tag="TestTag1",
user_role=0,
user_stats=30,
user_rep=15,
)
add_user(
user_id=2,
user_name="TestUser3",
user_tag="TestTag3",
user_role=1,
user_stats=30,
user_rep=15,
)
add_user(
user_id=3,
user_name="TestUser4",
user_tag="TestTag4",
user_role=2,
user_stats=30,
user_rep=15,
)
add_user(
user_id=4,
user_name="TestUser2",
user_tag="TestTag2",
user_role=3,
user_stats=30,
user_rep=15,
)
async def test_check_admin_permission(cls):
cls.assertTrue(await cls.roles.check_admin_permission(1))
cls.assertFalse(await cls.roles.check_admin_permission(2))
cls.assertFalse(await cls.roles.check_admin_permission(3))
cls.assertFalse(await cls.roles.check_admin_permission(4))
async def test_check_moderator_permission(cls):
cls.assertTrue(await cls.roles.check_moderator_permission(2))
cls.assertFalse(await cls.roles.check_moderator_permission(0))
cls.assertFalse(await cls.roles.check_moderator_permission(1))
cls.assertFalse(await cls.roles.check_moderator_permission(3))
async def test_get_role_name(cls):
cls.assertEqual(await cls.roles.get_role_name(cls.roles.admin_role_id), "ADMIN")
cls.assertEqual(
await cls.roles.get_role_name(cls.roles.moderator_role_id), "MODERATOR"
)
cls.assertEqual(await cls.roles.get_role_name(cls.roles.user_role_id), "USER")
cls.assertEqual(await cls.roles.get_role_name(cls.roles.bot_role_id), "BOT")
with cls.assertRaises(ValueError):
await cls.roles.get_role_name(999) # Несуществующий ID роли
@classmethod
def tearDownClass(cls):
cls.database.close()
os.system(f"rm {cls.path}") # nosec
if __name__ == "__main__":
unittest.main()

View File

@@ -0,0 +1,21 @@
## Модуль Welcome
Модуль `welcome` отвечает за верификацию новых участников чата, используя различные методы проверки. Он помогает предотвратить спам и автоматические атаки на чат, обеспечивая, что новые участники подтверждают свою человеческую природу перед получением доступа.
## Команды и Методы
Модуль поддерживает несколько методов верификации, которые случайным образом применяются к новым участникам чата:
- **IAmHumanButton** - Верификация с помощью кнопки.
- **IAmHumanInput** - Верификация с помощью ввода текста.
- **MathButtonsVerification** - Верификация решением математической задачи с помощью кнопок.
- **MathInputVerificationMethod** - Верификация решением математической задачи с помощью ввода.
- **QuestionButtonsVerification** - Верификация ответом на вопрос с помощью кнопок.
- **QuestionInputVerification** - Верификация ответом на вопрос с помощью ввода.
## Как это работает
1. **Обработка новых участников**: Когда новый участник присоединяется к чату, выбирается случайный метод верификации, и создается задача проверки.
2. **Тайм-аут проверки**: Если новый участник не проходит проверку в течение 30 секунд, его статус в чате меняется на "забанен".
3. **Верификация по кнопкам**: Если верификация осуществляется с помощью кнопок, обработчик будет ожидать нажатие кнопки от пользователя и проверит правильность ответа.
4. **Верификация по вводу**: Если верификация осуществляется путем ввода текста, обработчик будет проверять введенный текст и действовать в зависимости от результата проверки.

View File

@@ -0,0 +1 @@
from .main import module_init, module_late_init

View File

@@ -0,0 +1,22 @@
{
"id": "standard.welcome",
"name": "Welcome",
"description": "Модуль для проверки на бота",
"author": "Karkas Team",
"version": "1.0.0",
"privileged": true,
"dependencies": {
"required": {
"standard.config": "^1.0.0"
},
"optional": {
"standard.command_helper": "^1.0.0",
"standard.filters": "^1.0.0"
}
},
"pythonDependencies": {
"required": {
"asyncio": "*"
}
}
}

View File

@@ -0,0 +1,362 @@
import asyncio
import random
from string import Template
from typing import TYPE_CHECKING
from aiogram import Bot, Router, types
from aiogram.enums import ChatMemberStatus, ParseMode
from aiogram.exceptions import TelegramBadRequest
from aiogram.filters import JOIN_TRANSITION, LEAVE_TRANSITION, ChatMemberUpdatedFilter
from aiogram.types import ChatMemberUpdated, PollAnswer
from karkas_core.modules_system.public_api import get_module, log, register_router
from .utils import MultiKeyDict, get_plural_form, key_from_poll, key_from_user_chat
from .verifications_methods.base import BaseTask, VerificationCallback
from .verifications_methods.math import MathInlineButtonsTask, MathPollTask
from .verifications_methods.question import QuestionInlineButtonsTask, QuestionPollTask
from .verifications_methods.simple import (
SimpleVariantsBaseTask,
SimpleVariantsBaseTaskConfig,
)
from .verifications_methods.utils import user_mention
if TYPE_CHECKING:
from karkas_blocks.standard.config import IConfig
from karkas_blocks.standard.filters import ChatIDFilter as IChatIDFilter
config: "IConfig" = get_module("standard.config", "config")
try:
ChatIDFilter: "type[IChatIDFilter]" = get_module("standard.filters", "ChatIDFilter")
FILTERS_MODULE_LOADED = True
except Exception:
FILTERS_MODULE_LOADED = False
pass
all_tasks = [
MathInlineButtonsTask,
MathPollTask,
QuestionInlineButtonsTask,
QuestionPollTask,
]
class TaskManager:
def __init__(self, config: "IConfig"):
self.config = config
self.available_tasks = []
self.max_attempts = 1
def init(self):
for cls in all_tasks:
type_name = cls.type_name()
if self.config.get(f"welcome::tasks::{type_name}::enabled"):
log(f"Task {cls.type_name()} enabled")
self.available_tasks.append(cls)
self.max_attempts = self.config.get("welcome::max_attempts")
def build_random_task(self, event, bot, attempt_number=1) -> BaseTask:
cls = random.choice(self.available_tasks) # nosec
obj = cls(
event,
bot,
timeout_func=verify_timeout,
attempt_number=attempt_number,
max_attempts=self.max_attempts,
)
if isinstance(obj, SimpleVariantsBaseTask):
cfg = SimpleVariantsBaseTaskConfig(obj.type_name(), self.config)
obj.set_config(cfg)
return obj
verification_tasks = MultiKeyDict()
last_success = {}
async def new_member_handler(event: ChatMemberUpdated, bot: Bot):
# НЕ СРАБОТАЕТ, ЕСЛИ ЧЕЛОВЕК УЖЕ ОГРАНИЧЕН В ПРАВАХ (RESTRICTED)
if event.new_chat_member.status == ChatMemberStatus.MEMBER:
task = task_manager.build_random_task(event, bot)
keys = await task.run()
verification_tasks.add(task, keys)
async def left_member_handler(event: ChatMemberUpdated, bot: Bot):
user_id = event.from_user.id
chat_id = event.chat.id
key = key_from_user_chat(user_id, chat_id)
if not verification_tasks.exists(key):
return
task = verification_tasks.get(key)
await task.end(success=False)
verification_tasks.pop((user_id, chat_id), None)
async def verify_timeout(task: BaseTask):
user_id = task.from_user_id
chat_id = task.from_chat_id
try:
timeout = task.get_timeout()
# log(f"Start timeout {timeout}")
await asyncio.sleep(timeout)
await task.end(success=False)
await task.bot.ban_chat_member(chat_id, user_id)
except Exception as e:
log(f"Error in verify_timeout: {e}")
finally:
verification_tasks.remove(key_from_user_chat(user_id, chat_id))
async def success_end(task: BaseTask):
await task.end()
await asyncio.sleep(3)
if task.from_chat_id in last_success:
message_id = last_success.pop(task.from_chat_id)
try:
await task.bot.delete_message(task.from_chat_id, message_id)
except TelegramBadRequest:
pass
if config.get("welcome::show_success_message"):
message = await task.bot.send_message(
task.from_chat_id,
Template(config.get("welcome::success_message")).substitute(
mention=user_mention(task.from_user)
),
parse_mode=ParseMode.HTML,
)
last_success[task.from_chat_id] = message.message_id
async def handle_poll_verification(answer: PollAnswer, bot: Bot):
key = key_from_poll(answer.poll_id)
if not verification_tasks.exists(key):
return
task: BaseTask = verification_tasks.get(key)
if task.from_user_id != answer.user.id:
return
result = await task.verify(answer.option_ids[0])
if result:
await success_end(task)
return
await task.end(success=False)
current_attempt = task.attempt_number
if current_attempt >= task_manager.max_attempts:
await task.bot.ban_chat_member(task.from_chat_id, task.from_user_id)
return
await asyncio.sleep(5)
current_attempt = current_attempt + 1
new_task = task_manager.build_random_task(
task.event, task.bot, attempt_number=current_attempt
)
keys = await new_task.run()
log(keys)
verification_tasks.add(new_task, keys)
async def handle_inline_button_verification(
callback_query: types.CallbackQuery, callback_data: VerificationCallback, bot: Bot
):
user_id = callback_data.user_id
chat_id = callback_data.chat_id
if callback_query.from_user.id != user_id:
await callback_query.answer("Эта кнопка не для вас!", show_alert=True)
return
key = key_from_user_chat(user_id, chat_id)
if not verification_tasks.exists(key):
await callback_query.answer()
return
task: BaseTask = verification_tasks.get(key)
result = await task.verify(callback_data.answer)
if result:
await success_end(task)
return
await task.end(success=False)
current_attempt = task.attempt_number
if current_attempt >= task_manager.max_attempts:
await callback_query.answer()
await task.bot.ban_chat_member(chat_id, user_id)
return
await callback_query.answer(
Template(config.get("welcome::retry_message")).substitute(
attempts=get_plural_form(
task_manager.max_attempts - current_attempt,
"попытка",
"попытки",
"попыток",
)
),
show_alert=True,
)
current_attempt = current_attempt + 1
new_task = task_manager.build_random_task(
task.event, task.bot, attempt_number=current_attempt
)
keys = await new_task.run()
verification_tasks.add(new_task, keys)
config: "IConfig" = get_module("standard.config", "config")
task_manager = TaskManager(config)
async def module_init():
config.register("welcome::timeout", "int", default_value=60)
config.register("welcome::max_attempts", "int", default_value=5)
config.register(
"welcome::retry_message",
"string",
default_value="Неправильный ответ! У вас еще $attempts.",
)
config.register("welcome::show_success_message", "boolean", default_value=True)
config.register(
"welcome::success_message",
"string",
default_value="$mention, вы успешно прошли проверку!",
)
# MATH BUTTONS
config.register(
"welcome::tasks::math_buttons::enabled", "boolean", default_value=False
)
config.register(
"welcome::tasks::math_buttons::message_text",
"int",
default_value="Привет, $mention!\n"
"Ответьте на вопрос, "
"чтобы подтвердить, что вы не робот:\n\n<b>$task</b>",
)
config.register(
"welcome::tasks::math_buttons::retry_message_text",
"int",
default_value="$mention, неправильный ответ! У вас еще $attempts\n"
"Ответьте на вопрос, "
"чтобы подтвердить, что вы не робот:\n\n<b>$task</b>",
)
config.register("welcome::tasks::math_buttons::timeout", "int", default_value=None)
# MATH POLL
config.register("welcome::tasks::math_poll::enabled", "boolean", default_value=True)
config.register(
"welcome::tasks::math_poll::message_text",
"string",
default_value="Привет, $mention!\n"
"Ответьте на вопрос, "
"чтобы подтвердить, что вы не робот:\n\n$task",
)
config.register(
"welcome::tasks::math_poll::retry_message_text",
"string",
default_value="$mention, неправильный ответ! У вас еще $attempts\n"
"Ответьте на вопрос, "
"чтобы подтвердить, что вы не робот:\n\n$task",
)
config.register("welcome::tasks::math_poll::timeout", "int", default_value=None)
# QUESTION BUTTONS
config.register(
"welcome::tasks::question_buttons::enabled", "boolean", default_value=False
)
config.register(
"welcome::tasks::question_buttons::message_text",
"string",
default_value="Привет, $mention!\n"
"Ответьте на вопрос, "
"чтобы подтвердить, что вы не робот:\n\n<b>$task</b>",
)
config.register(
"welcome::tasks::question_buttons::retry_message_text",
"string",
default_value="$mention, неправильный ответ! У вас еще $attempts.\n"
"Ответьте на вопрос, "
"чтобы подтвердить, что вы не робот:\n\n<b>$task</b>",
)
config.register(
"welcome::tasks::question_buttons::timeout", "int", default_value=None
)
# QUESTION POLL
config.register(
"welcome::tasks::question_poll::enabled", "boolean", default_value=True
)
config.register(
"welcome::tasks::question_poll::message_text",
"string",
default_value="Привет, $mention!\n"
"Ответьте на вопрос, "
"чтобы подтвердить, что вы не робот:\n\n$task",
)
config.register(
"welcome::tasks::question_poll::retry_message_text",
"string",
default_value="$mention, неправильный ответ! У вас еще $attempts.\n"
"Ответьте на вопрос, "
"чтобы подтвердить, что вы не робот:\n\n$task",
)
config.register("welcome::tasks::question_poll::timeout", "int", default_value=None)
router = Router()
common_filters_pre = []
if FILTERS_MODULE_LOADED:
common_filters_pre.append(ChatIDFilter())
router.chat_member(*common_filters_pre, ChatMemberUpdatedFilter(JOIN_TRANSITION))(
new_member_handler
)
router.chat_member(*common_filters_pre, ChatMemberUpdatedFilter(LEAVE_TRANSITION))(
left_member_handler
)
router.callback_query(*common_filters_pre, VerificationCallback.filter())(
handle_inline_button_verification
)
# Нельзя применить ChatIDFilter из-за отстутсвия id чата
router.poll_answer()(handle_poll_verification)
register_router(router)
async def module_late_init():
task_manager.init()

View File

@@ -0,0 +1,45 @@
def get_plural_form(number, singular, genitive_singular, plural):
if 11 <= number % 100 <= 19:
return f"{number} {plural}"
elif number % 10 == 1:
return f"{number} {singular}"
elif 2 <= number % 10 <= 4:
return f"{number} {genitive_singular}"
else:
return f"{number} {plural}"
class MultiKeyDict:
def __init__(self):
self.value_to_keys = {} # Словарь значений и связанных с ними ключей
self.key_to_value = {} # Словарь ключей и связанных с ними значений
def add(self, value, keys):
# Добавляем значение в словарь с множеством ключей
self.value_to_keys[value] = set(keys)
# Для каждого ключа создаем запись в словаре key_to_value
for key in keys:
self.key_to_value[key] = value
def get(self, key):
return self.key_to_value.get(key)
def exists(self, key):
return key in self.key_to_value
def remove(self, key):
if key in self.key_to_value:
value = self.key_to_value.pop(key)
self.value_to_keys[value].remove(key)
for k in self.value_to_keys[value]:
del self.key_to_value[k]
def key_from_user_chat(user_id, chat_id):
return f"uc:{user_id}_{chat_id}"
def key_from_poll(poll_id):
return f"p:{poll_id}"

View File

@@ -0,0 +1,105 @@
import asyncio
from functools import wraps
from aiogram import Bot
from aiogram.exceptions import TelegramBadRequest
from aiogram.filters.callback_data import CallbackData
from aiogram.types import ChatMemberUpdated
from karkas_core.modules_system.public_api import log
from .utils import mute_user, unmute_user
class BaseTask:
def __init__(
self,
event: ChatMemberUpdated,
bot: Bot,
timeout_func=None,
attempt_number=1,
max_attempts=1,
):
self.bot = bot
self.event = event
self.timeout_func = timeout_func
self.attempt_number = attempt_number
self.max_attempts = max_attempts
self.timeout_func_task = None
@property
def from_chat_id(self):
return self.event.chat.id
@property
def from_user_id(self):
return self.event.from_user.id
@property
def from_user(self):
return self.event.from_user
@property
def attemps_left(self):
return self.max_attempts - self.attempt_number + 1
async def start_timeout_func(self):
if self.timeout_func:
self.timeout_func_task = asyncio.create_task(self.timeout_func(self))
@staticmethod
def type_name():
raise NotImplementedError()
async def run(self):
raise NotImplementedError()
async def verify(self, data):
raise NotImplementedError()
async def end(self, success=True):
raise NotImplementedError()
async def get_timeout():
raise NotImplementedError()
def mute_while_task(cls):
original_run = getattr(cls, "run", None)
original_end = getattr(cls, "end", None)
if not original_run and not original_end:
return cls
@wraps(original_run)
async def wrapped_run(self: BaseTask):
chat_id = self.from_chat_id
user_id = self.from_user_id
try:
await mute_user(chat_id, user_id, 0, self.bot)
except TelegramBadRequest as e:
log(e)
pass
return await original_run(self)
@wraps(original_end)
async def wrapped_end(self: BaseTask, success=True):
await original_end(self, success)
if success:
chat_id = self.from_chat_id
user_id = self.from_user_id
try:
await unmute_user(chat_id, user_id, self.bot)
except TelegramBadRequest as e:
log(e)
pass
cls.run = wrapped_run
cls.end = wrapped_end
return cls
class VerificationCallback(CallbackData, prefix="verify"):
user_id: int
chat_id: int
answer: str = None

View File

@@ -0,0 +1,53 @@
import random
from .simple import SimpleInlineButtonsTask, SimplePollTask, SimpleVariantsBaseTask
class BaseMathTask(SimpleVariantsBaseTask):
def generate_math_problem(self):
a = random.randint(1, 10) # nosec
b = random.randint(1, 10) # nosec
operation = random.choice(["+", "-", "*"]) # nosec
if operation == "+":
answer = a + b
elif operation == "-":
answer = a - b
else:
answer = a * b
return f"{a} {operation} {b}", answer
async def init(self):
problem, correct_answer = self.generate_math_problem()
self.variants = [correct_answer]
while len(self.variants) < 4:
wrong_answer = random.randint(
correct_answer - 5, correct_answer + 5
) # nosec
if wrong_answer not in self.variants:
self.variants.append(wrong_answer)
random.shuffle(self.variants) # nosec
self.variants = [str(x) for x in self.variants]
self.task = f"{problem} = ?"
self.correct = str(correct_answer)
class MathInlineButtonsTask(BaseMathTask, SimpleInlineButtonsTask):
"""
Математическая задача с выбором через inline-кнопки
"""
@staticmethod
def type_name():
return "math_buttons"
class MathPollTask(BaseMathTask, SimplePollTask):
"""
Математическая задача с выбором через Poll
"""
@staticmethod
def type_name():
return "math_poll"

View File

@@ -0,0 +1,79 @@
import random
from aiogram.types import InlineKeyboardButton, InlineKeyboardMarkup
from .base import VerificationCallback
from .simple import SimpleInlineButtonsTask, SimplePollTask, SimpleVariantsBaseTask
QUESTIONS = [
(
"Какой город является столицей России?",
"Москва",
["Санкт-Петербург", "Новосибирск", "Екатеринбург"],
),
(
"Какой город называют северной столицей России?",
"Санкт-Петербург",
["Владивосток", "Новосибирск", "Екатеринбург"],
),
("Какая национальная валюта в России?", "Рубль", ["Евро", "Доллар", "Юань"]),
("Год окончания Великой Отечественной войны?", "1945", ["2024", "862", "1721"]),
(
"Самая БОЛЬШАЯ страна по площади?",
"Россия",
["Люксембург", "Ватикан", "Лихтенштейн"],
),
("Сколько лап у кошки?", "4", ["10", "12", "14"]),
("Сколько ног у осьминога?", "8", ["6", "10", "12"]),
(
"Какой день недели идет после понедельника?",
"Вторник",
["Среда", "Четверг", "Пятница"],
),
("Сколько часов в сутках?", "24", ["12", "48", "60"]),
("Какой месяц самый короткий?", "Февраль", ["Март", "Апрель", "Май"]),
]
class BaseQuestionsTask(SimpleVariantsBaseTask):
async def init(self):
question, correct_answer, wrong_answers = random.choice(QUESTIONS) # nosec
options = [correct_answer] + wrong_answers
random.shuffle(options) # nosec
self.variants = [str(x) for x in options]
self.task = question
self.correct = correct_answer
async def verify(self, data):
return self.variants[int(data)] == self.correct
class QuestionInlineButtonsTask(BaseQuestionsTask, SimpleInlineButtonsTask):
@staticmethod
def type_name():
return "question_buttons"
def build_keyboard(self):
return InlineKeyboardMarkup(
inline_keyboard=[
[
InlineKeyboardButton(
text=str(option),
callback_data=VerificationCallback(
user_id=self.from_user_id,
chat_id=self.from_chat_id,
answer=str(i),
).pack(),
)
]
for i, option in enumerate(self.variants)
]
)
class QuestionPollTask(BaseQuestionsTask, SimplePollTask):
@staticmethod
def type_name():
return "question_poll"

View File

@@ -0,0 +1,178 @@
from string import Template
from aiogram import Bot
from aiogram.enums import ParseMode, PollType
from aiogram.types import ChatMemberUpdated, InlineKeyboardButton, InlineKeyboardMarkup
from ..utils import get_plural_form, key_from_poll, key_from_user_chat
from .base import BaseTask, VerificationCallback, mute_while_task
from .utils import user_mention
class SimpleBaseTask(BaseTask):
pass
class SimpleVariantsBaseTaskConfig:
def __init__(self, task_type, config: dict):
self.config = config
self.task_type = task_type
@property
def timeout(self):
timeout = self.config.get(f"welcome::tasks::{self.task_type}::timeout")
if timeout is None:
return self.config.get("welcome::timeout")
return timeout
@property
def task_message_text(self):
return self.config.get(f"welcome::tasks::{self.task_type}::message_text")
@property
def task_retry_message_text(self):
return self.config.get(f"welcome::tasks::{self.task_type}::retry_message_text")
class SimpleVariantsBaseTask(SimpleBaseTask):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.config = None
self.variants = []
self.task = ""
self.correct = None
self.task_message_id = None
def set_config(self, cfg: SimpleVariantsBaseTaskConfig):
self.config = cfg
def get_timeout(self):
return self.config.timeout
@mute_while_task
class SimpleInlineButtonsTask(SimpleVariantsBaseTask):
async def init(self):
raise NotImplementedError()
def build_keyboard(self):
return InlineKeyboardMarkup(
inline_keyboard=[
[
InlineKeyboardButton(
text=str(option),
callback_data=VerificationCallback(
user_id=self.from_user_id,
chat_id=self.from_chat_id,
answer=str(option),
).pack(),
)
for option in self.variants
]
]
)
async def run(self):
await self.init()
message_template = Template(
self.config.task_message_text
if self.attempt_number == 1
else self.config.task_retry_message_text
)
chat_id = self.from_chat_id
message = await self.bot.send_message(
chat_id,
text=message_template.substitute(
mention=user_mention(self.from_user),
task=self.task,
attempts=get_plural_form(
self.attemps_left, "попытка", "попытки", "попыток"
),
),
reply_markup=self.build_keyboard(),
parse_mode=ParseMode.HTML,
)
self.task_message_id = message.message_id
await self.start_timeout_func()
return [key_from_user_chat(self.from_user_id, self.from_chat_id)]
async def verify(self, data):
return self.correct == data
async def end(self, success=True):
await self.bot.delete_message(self.from_chat_id, self.task_message_id)
if self.timeout_func_task:
self.timeout_func_task.cancel()
@mute_while_task
class SimplePollTask(SimpleVariantsBaseTask):
def __init__(
self,
event: ChatMemberUpdated,
bot: Bot,
timeout_func=None,
attempt_number=1,
max_attempts=1,
):
super().__init__(event, bot, timeout_func, attempt_number, max_attempts)
self.correct_index = None
async def init(self):
raise NotImplementedError()
async def run(self):
await self.init()
self.correct_index = self.variants.index(self.correct)
message_template = Template(
self.config.task_message_text
if self.attempt_number == 1
else self.config.task_retry_message_text
)
chat_id = self.from_chat_id
message = await self.bot.send_poll(
chat_id,
question=message_template.substitute(
mention=self.from_user.first_name,
task=self.task,
attempts=get_plural_form(
self.attemps_left, "попытка", "попытки", "попыток"
),
),
options=self.variants,
type=PollType.QUIZ,
correct_option_id=self.correct_index,
allows_multiple_answers=False,
is_anonymous=False,
# parse_mode=ParseMode.HTML
)
self.task_message_id = message.message_id
await self.start_timeout_func()
return [
key_from_poll(message.poll.id),
key_from_user_chat(self.from_user_id, self.from_chat_id),
]
async def verify(self, data):
return self.correct_index == data
async def end(self, success=True):
await self.bot.delete_message(self.from_chat_id, self.task_message_id)
if self.timeout_func_task:
self.timeout_func_task.cancel()

View File

@@ -0,0 +1,64 @@
import time
from aiogram import Bot
from aiogram.enums import ParseMode
from aiogram.types import ChatPermissions, User
def user_mention(user: User, mode=ParseMode.HTML):
if mode == ParseMode.HTML:
return f"<a href='tg://user?id={user.id}'>{user.first_name}</a>"
elif mode == ParseMode.MARKDOWN:
return f"[{user.first_name}](tg://user?id={user.id})"
else:
raise ValueError(f"Unknown parse mode {mode}")
async def mute_user(chat_id, user_id, until, bot: Bot):
end_time = until + int(time.time())
await bot.restrict_chat_member(
chat_id,
user_id,
until_date=end_time,
use_independent_chat_permissions=True,
permissions=ChatPermissions(
can_send_messages=False,
can_send_audios=False,
can_send_documents=False,
can_send_photos=False,
can_send_videos=False,
can_send_video_notes=False,
can_send_voice_notes=False,
can_send_polls=False,
can_send_other_messages=False,
can_add_web_page_previews=False,
can_change_info=False,
can_invite_users=False,
can_pin_messages=False,
can_manage_topics=False,
),
)
async def unmute_user(chat_id, user_id, bot: Bot):
await bot.restrict_chat_member(
chat_id,
user_id,
use_independent_chat_permissions=True,
permissions=ChatPermissions(
can_send_messages=True,
can_send_audios=True,
can_send_documents=True,
can_send_photos=True,
can_send_videos=True,
can_send_video_notes=True,
can_send_voice_notes=True,
can_send_polls=True,
can_send_other_messages=True,
can_add_web_page_previews=True,
can_change_info=True,
can_invite_users=True,
can_pin_messages=True,
can_manage_topics=True,
),
)

1465
src/karkas_blocks/poetry.lock generated Normal file

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,2 @@
[virtualenvs]
in-project = true

View File

@@ -0,0 +1,25 @@
[tool.poetry]
name = "karkas-blocks"
version = "0.1.0"
description = ""
authors = ["Maxim Slipenko <maxim@slipenko.com>"]
readme = "README.md"
[tool.poetry.dependencies]
python = "~3.12"
karkas-core = { path = "../karkas_core", develop = true }
peewee = "^3.17.6"
pyyaml = "^6.0.1"
dash = "^2.17.1"
dash-extensions = "^1.0.18"
dash-bootstrap-components = "^1.6.0"
[tool.poetry-monorepo.deps]
enabled = true
[build-system]
requires = ["poetry-core"]
build-backend = "poetry.core.masonry.api"