mirror of
https://gitflic.ru/project/alt-gnome/karkas.git
synced 2025-10-06 21:06:06 +03:00
Merged with private/new-module-system
This commit is contained in:
0
src/ocab_modules/__init__.py
Normal file
0
src/ocab_modules/__init__.py
Normal file
1
src/ocab_modules/external/__init__.py
vendored
Normal file
1
src/ocab_modules/external/__init__.py
vendored
Normal file
@@ -0,0 +1 @@
|
||||
from . import yandexgpt
|
0
src/ocab_modules/external/yandexgpt/__init__.py
vendored
Normal file
0
src/ocab_modules/external/yandexgpt/__init__.py
vendored
Normal file
27
src/ocab_modules/external/yandexgpt/handlers.py
vendored
Normal file
27
src/ocab_modules/external/yandexgpt/handlers.py
vendored
Normal file
@@ -0,0 +1,27 @@
|
||||
# flake8: noqa
|
||||
import asyncio
|
||||
|
||||
from aiogram import Bot
|
||||
from aiogram.types import Message
|
||||
|
||||
from ocab_modules.external.yandexgpt.yandexgpt import *
|
||||
from ocab_modules.standard.config.config import (
|
||||
get_yandexgpt_catalog_id,
|
||||
get_yandexgpt_prompt,
|
||||
get_yandexgpt_token,
|
||||
)
|
||||
from ocab_modules.standard.database.db_api import add_message
|
||||
|
||||
|
||||
async def answer_to_message(message: Message, bot: Bot):
|
||||
# print("answer_to_message")
|
||||
await log("answer_to_message")
|
||||
yagpt = YandexGPT(get_yandexgpt_token(), get_yandexgpt_catalog_id())
|
||||
text = message.text
|
||||
prompt = get_yandexgpt_prompt()
|
||||
# response = await yagpt.async_yandexgpt(system_prompt=prompt, input_messages=text)
|
||||
response = await yagpt.yandexgpt_request(
|
||||
chat_id=message.chat.id, message_id=message.message_id, type="yandexgpt"
|
||||
)
|
||||
reply = await message.reply(response, parse_mode="Markdown")
|
||||
add_message(reply, message_ai_model="yandexgpt")
|
6
src/ocab_modules/external/yandexgpt/info.json
vendored
Normal file
6
src/ocab_modules/external/yandexgpt/info.json
vendored
Normal file
@@ -0,0 +1,6 @@
|
||||
{
|
||||
"name": "YandexGPT",
|
||||
"description": "Модуль для работы с Yandex GPT",
|
||||
"author": "OCAB Team",
|
||||
"version": "1.0"
|
||||
}
|
10
src/ocab_modules/external/yandexgpt/routers.py
vendored
Normal file
10
src/ocab_modules/external/yandexgpt/routers.py
vendored
Normal file
@@ -0,0 +1,10 @@
|
||||
# flake8: noqa
|
||||
from aiogram import F, Router
|
||||
|
||||
from src.ocab_modules.external.yandexgpt.handlers import answer_to_message
|
||||
|
||||
router = Router()
|
||||
# Если сообщение содержит в начале текст "Гномик" или "гномик" или отвечает на сообщение бота, то вызывается функция answer_to_message
|
||||
router.message.register(
|
||||
answer_to_message, F.text.startswith("Гномик") | F.text.startswith("гномик")
|
||||
)
|
292
src/ocab_modules/external/yandexgpt/yandexgpt.py
vendored
Normal file
292
src/ocab_modules/external/yandexgpt/yandexgpt.py
vendored
Normal file
@@ -0,0 +1,292 @@
|
||||
# flake8: noqa
|
||||
import asyncio
|
||||
import json
|
||||
|
||||
import aiohttp
|
||||
import requests
|
||||
|
||||
from ocab_core.logger import log
|
||||
|
||||
from ...standard.config.config import *
|
||||
from ...standard.database import *
|
||||
|
||||
|
||||
class YandexGPT:
|
||||
token = None
|
||||
catalog_id = None
|
||||
languages = {
|
||||
"ru": "русский язык",
|
||||
"en": "английский язык",
|
||||
"de": "немецкий язык",
|
||||
"uk": "украинский язык",
|
||||
"es": "испанский язык",
|
||||
"be": "белорусский язык",
|
||||
}
|
||||
|
||||
def __init__(self, token, catalog_id):
|
||||
self.token = token
|
||||
self.catalog_id = catalog_id
|
||||
|
||||
async def async_request(self, url, headers, prompt) -> dict:
|
||||
async with aiohttp.ClientSession() as session:
|
||||
async with session.post(url, headers=headers, json=prompt) as response:
|
||||
return await response.json()
|
||||
|
||||
async def async_token_check(
|
||||
self, messages, gpt, max_tokens, stream, temperature, del_msg_id=1
|
||||
):
|
||||
url = "https://llm.api.cloud.yandex.net/foundationModels/v1/tokenizeCompletion"
|
||||
headers = {
|
||||
"Content-Type": "application/json",
|
||||
"Authorization": f"Api-Key {self.token}",
|
||||
}
|
||||
answer_token = get_yandexgpt_token_for_answer()
|
||||
while True:
|
||||
try:
|
||||
request = {
|
||||
"modelUri": gpt,
|
||||
"completionOptions": {
|
||||
"stream": stream,
|
||||
"temperature": temperature,
|
||||
"maxTokens": max_tokens,
|
||||
},
|
||||
"messages": messages,
|
||||
}
|
||||
response = await self.async_request(
|
||||
url=url, headers=headers, prompt=request
|
||||
)
|
||||
except Exception as e: # TODO: Переделать обработку ошибок
|
||||
# print(e)
|
||||
await log(f"Error: {e}")
|
||||
|
||||
continue
|
||||
if int(len(response["tokens"])) < (max_tokens - answer_token):
|
||||
break
|
||||
else:
|
||||
try:
|
||||
messages.pop(del_msg_id)
|
||||
except IndexError:
|
||||
Exception("IndexError: list index out of range")
|
||||
return messages
|
||||
|
||||
async def async_yandexgpt_lite(
|
||||
self,
|
||||
system_prompt,
|
||||
input_messages,
|
||||
stream=False,
|
||||
temperature=0.6,
|
||||
max_tokens=8000,
|
||||
):
|
||||
url = "https://llm.api.cloud.yandex.net/foundationModels/v1/completion"
|
||||
gpt = f"gpt://{self.catalog_id}/yandexgpt-lite/latest"
|
||||
headers = {
|
||||
"Content-Type": "application/json",
|
||||
"Authorization": f"Api-Key {self.token}",
|
||||
}
|
||||
|
||||
messages = [{"role": "system", "text": system_prompt}]
|
||||
for message in input_messages:
|
||||
messages.append(message)
|
||||
messages = await self.async_token_check(messages, gpt, max_tokens)
|
||||
|
||||
prompt = {
|
||||
"modelUri": gpt,
|
||||
"completionOptions": {
|
||||
"stream": stream,
|
||||
"temperature": temperature,
|
||||
"maxTokens": max_tokens,
|
||||
},
|
||||
"messages": messages,
|
||||
}
|
||||
|
||||
response = requests.post(url, headers=headers, json=prompt).text # nosec
|
||||
return json.loads(response)["result"]["alternatives"][0]["message"]["text"]
|
||||
|
||||
async def async_yandexgpt(
|
||||
self,
|
||||
system_prompt,
|
||||
input_messages,
|
||||
stream=False,
|
||||
temperature=0.6,
|
||||
max_tokens=get_yandexgpt_token_for_request(),
|
||||
):
|
||||
url = "https://llm.api.cloud.yandex.net/foundationModels/v1/completion"
|
||||
gpt = f"gpt://{self.catalog_id}/yandexgpt/latest"
|
||||
headers = {
|
||||
"Content-Type": "application/json",
|
||||
"Authorization": f"Api-Key {self.token}",
|
||||
}
|
||||
|
||||
messages = []
|
||||
messages.append({"role": "system", "text": system_prompt})
|
||||
for message in input_messages:
|
||||
messages.append(message)
|
||||
|
||||
messages = await self.async_token_check(
|
||||
messages, gpt, max_tokens, stream, temperature
|
||||
)
|
||||
|
||||
request = {
|
||||
"modelUri": gpt,
|
||||
"completionOptions": {
|
||||
"stream": stream,
|
||||
"temperature": temperature,
|
||||
"maxTokens": max_tokens,
|
||||
},
|
||||
"messages": messages,
|
||||
}
|
||||
response = await self.async_request(
|
||||
url=url, headers=headers, prompt=request
|
||||
) # nosec
|
||||
return response["result"]["alternatives"][0]["message"]["text"]
|
||||
|
||||
async def async_yandexgpt_translate(self, input_language, output_language, text):
|
||||
input_language = self.languages[input_language]
|
||||
output_language = self.languages[output_language]
|
||||
|
||||
return await self.async_yandexgpt(
|
||||
f"Переведи на {output_language} сохранив оригинальный смысл текста. Верни только результат:",
|
||||
[{"role": "user", "text": text}],
|
||||
stream=False,
|
||||
temperature=0.6,
|
||||
max_tokens=8000,
|
||||
)
|
||||
|
||||
async def async_yandexgpt_spelling_check(self, input_language, text):
|
||||
input_language = self.languages[input_language]
|
||||
|
||||
return await self.async_yandexgpt(
|
||||
f"Проверьте орфографию и пунктуацию текста на {input_language}. Верни исправленный текст "
|
||||
f"без смысловых искажений:",
|
||||
[{"role": "user", "text": text}],
|
||||
stream=False,
|
||||
temperature=0.6,
|
||||
max_tokens=8000,
|
||||
)
|
||||
|
||||
async def async_yandexgpt_text_history(
|
||||
self, input_messages, stream=False, temperature=0.6, max_tokens=8000
|
||||
):
|
||||
url = "https://llm.api.cloud.yandex.net/foundationModels/v1/completion"
|
||||
gpt = f"gpt://{self.catalog_id}/summarization/latest"
|
||||
headers = {
|
||||
"Content-Type": "application/json",
|
||||
"Authorization": f"Api-Key {self.token}",
|
||||
}
|
||||
|
||||
messages = []
|
||||
for message in input_messages:
|
||||
messages.append(message)
|
||||
messages = await self.async_token_check(messages, gpt, max_tokens, del_msg_id=0)
|
||||
|
||||
prompt = {
|
||||
"modelUri": gpt,
|
||||
"completionOptions": {
|
||||
"stream": stream,
|
||||
"temperature": temperature,
|
||||
"maxTokens": max_tokens,
|
||||
},
|
||||
"messages": messages,
|
||||
}
|
||||
|
||||
response = requests.post(url, headers=headers, json=prompt).text # nosec
|
||||
return json.loads(response)["result"]["alternatives"][0]["message"]["text"]
|
||||
|
||||
async def async_yandex_cloud_text_to_speech(
|
||||
self, text, voice, emotion, speed, format, quality
|
||||
):
|
||||
tts = "tts.api.cloud.yandex.net/speech/v1/tts:synthesize"
|
||||
# TODO: Сделать функцию TTS
|
||||
return 0
|
||||
|
||||
async def async_yandex_cloud_vision(self, image, features, language):
|
||||
# TODO: Сделать функцию Vision
|
||||
return 0
|
||||
|
||||
async def collect_messages(self, message_id, chat_id):
|
||||
messages = []
|
||||
# Собираем цепочку сообщений в формате: [{"role": "user", "text": "<Имя_пользователя>: Привет!"},
|
||||
# {"role": "assistant", "text": "Привет!"}]
|
||||
while True:
|
||||
message = db_api.get_message_text(chat_id, message_id)
|
||||
if db_api.get_message_ai_model(chat_id, message_id) != None:
|
||||
messages.append({"role": "assistant", "text": message})
|
||||
else:
|
||||
sender_name = db_api.get_user_name(
|
||||
db_api.get_message_sender_id(chat_id, message_id)
|
||||
)
|
||||
messages.append({"role": "user", "text": sender_name + ": " + message})
|
||||
message_id = db_api.get_answer_to_message_id(chat_id, message_id)
|
||||
if message_id is None:
|
||||
break
|
||||
return list(reversed(messages))
|
||||
|
||||
async def collecting_messages_for_history(
|
||||
self, start_message_id, end_message_id, chat_id
|
||||
):
|
||||
messages = []
|
||||
# Собираем цепочку сообщений в формате: [{"role": "user", "text": "<Имя_пользователя>: Привет!"},
|
||||
# {"role": "assistant", "text": "Привет!"}]
|
||||
while True:
|
||||
message = db_api.get_message_text(chat_id, start_message_id)
|
||||
if db_api.get_message_ai_model(chat_id, start_message_id) != None:
|
||||
messages.append({"role": "assistant", "text": message})
|
||||
else:
|
||||
sender_name = db_api.get_user_name(
|
||||
db_api.get_message_sender_id(chat_id, start_message_id)
|
||||
)
|
||||
messages.append({"role": "user", "text": sender_name + ": " + message})
|
||||
start_message_id -= 1
|
||||
if start_message_id <= end_message_id:
|
||||
break
|
||||
return messages.reverse()
|
||||
|
||||
async def yandexgpt_request(
|
||||
self,
|
||||
message_id=None,
|
||||
type="yandexgpt-lite",
|
||||
chat_id=None,
|
||||
message_id_end=None,
|
||||
input_language=None,
|
||||
output_language=None,
|
||||
text=None,
|
||||
):
|
||||
if type == "yandexgpt-lite":
|
||||
messages = await self.collect_messages(message_id, chat_id)
|
||||
return await self.async_yandexgpt_lite(
|
||||
system_prompt=get_yandexgpt_prompt(),
|
||||
input_messages=messages,
|
||||
stream=False,
|
||||
temperature=0.6,
|
||||
max_tokens=8000,
|
||||
)
|
||||
elif type == "yandexgpt":
|
||||
# print("yandexgpt_request")
|
||||
await log("yandexgpt_request")
|
||||
messages = await self.collect_messages(message_id, chat_id)
|
||||
return await self.async_yandexgpt(
|
||||
system_prompt=get_yandexgpt_prompt(),
|
||||
input_messages=messages,
|
||||
stream=False,
|
||||
temperature=0.6,
|
||||
max_tokens=get_yandexgpt_token_for_request(),
|
||||
)
|
||||
elif type == "yandexgpt-translate":
|
||||
return await self.async_yandexgpt_translate(
|
||||
input_language,
|
||||
output_language,
|
||||
text=db_api.get_message_text(chat_id, message_id),
|
||||
)
|
||||
elif type == "yandexgpt-spelling-check":
|
||||
return await self.async_yandexgpt_spelling_check(
|
||||
input_language, text=db_api.get_message_text(chat_id, message_id)
|
||||
)
|
||||
elif type == "yandexgpt-text-history":
|
||||
messages = await self.collect_messages_for_history(
|
||||
message_id, message_id_end, chat_id
|
||||
)
|
||||
return await self.async_yandexgpt_text_history(
|
||||
messages=messages, stream=False, temperature=0.6, max_tokens=8000
|
||||
)
|
||||
else:
|
||||
return "Ошибка: Неизвестный тип запроса | Error: Unknown request type"
|
1
src/ocab_modules/standard/admin/__init__.py
Normal file
1
src/ocab_modules/standard/admin/__init__.py
Normal file
@@ -0,0 +1 @@
|
||||
from . import routers
|
56
src/ocab_modules/standard/admin/handlers.py
Normal file
56
src/ocab_modules/standard/admin/handlers.py
Normal file
@@ -0,0 +1,56 @@
|
||||
# flake8: noqa
|
||||
import time
|
||||
|
||||
from aiogram import Bot
|
||||
from aiogram.types import Message
|
||||
|
||||
from src.ocab_modules.standard.config.config import get_default_chat_tag
|
||||
|
||||
|
||||
async def delete_message(message: Message, bot: Bot):
|
||||
reply_message_id = message.reply_to_message.message_id
|
||||
await bot.delete_message(message.chat.id, reply_message_id)
|
||||
|
||||
|
||||
async def error_access(message: Message, bot: Bot):
|
||||
await message.reply("Вы не админ/модератор")
|
||||
|
||||
|
||||
async def get_chat_id(message: Message, bot: Bot):
|
||||
await message.reply(
|
||||
f"ID данного чата: `{message.chat.id}`", parse_mode="MarkdownV2"
|
||||
)
|
||||
|
||||
|
||||
async def chat_not_in_approve_list(message: Message, bot: Bot):
|
||||
await message.reply(
|
||||
f"Бот недоступен в данном чате, пожалуйста,"
|
||||
f" обратитесь к администратору для добавления чата в список доступных или перейдите в чат "
|
||||
f"{get_default_chat_tag()}"
|
||||
)
|
||||
await get_chat_id(message, bot)
|
||||
|
||||
|
||||
async def mute_user(chat_id: int, user_id: int, time: int, bot: Bot):
|
||||
# *, can_send_messages: bool | None = None, can_send_audios: bool | None = None, can_send_documents: bool | None = None, can_send_photos: bool | None = None, can_send_videos: bool | None = None, can_send_video_notes: bool | None = None, can_send_voice_notes: bool | None = None, can_send_polls: bool | None = None, can_send_other_messages: bool | None = None, can_add_web_page_previews: bool | None = None, can_change_info: bool | None = None, can_invite_users: bool | None = None, can_pin_messages: bool | None = None, can_manage_topics: bool | None = None, **extra_data: Any)
|
||||
|
||||
mutePermissions = {
|
||||
"can_send_messages": False,
|
||||
"can_send_audios": False,
|
||||
"can_send_documents": False,
|
||||
"can_send_photos": False,
|
||||
"can_send_videos": False,
|
||||
"can_send_video_notes": False,
|
||||
"can_send_voice_notes": False,
|
||||
"can_send_polls": False,
|
||||
"can_send_other_messages": False,
|
||||
"can_add_web_page_previews": False,
|
||||
"can_change_info": False,
|
||||
"can_invite_users": False,
|
||||
"can_pin_messages": False,
|
||||
"can_manage_topics": False,
|
||||
}
|
||||
end_time = time + int(time.time())
|
||||
await bot.restrict_chat_member(
|
||||
chat_id, user_id, until_date=end_time, **mutePermissions
|
||||
)
|
6
src/ocab_modules/standard/admin/info.json
Normal file
6
src/ocab_modules/standard/admin/info.json
Normal file
@@ -0,0 +1,6 @@
|
||||
{
|
||||
"name": "Admin",
|
||||
"description": "Модуль для работы с админкой",
|
||||
"author": "OCAB Team",
|
||||
"version": "1.0"
|
||||
}
|
24
src/ocab_modules/standard/admin/routers.py
Normal file
24
src/ocab_modules/standard/admin/routers.py
Normal file
@@ -0,0 +1,24 @@
|
||||
# flake8: noqa
|
||||
from aiogram import F, Router
|
||||
|
||||
from src.ocab_modules.standard.admin.handlers import (
|
||||
chat_not_in_approve_list,
|
||||
delete_message,
|
||||
error_access,
|
||||
get_chat_id,
|
||||
)
|
||||
from src.ocab_modules.standard.filters.filters import (
|
||||
ChatModerOrAdminFilter,
|
||||
ChatNotInApproveFilter,
|
||||
)
|
||||
|
||||
router = Router()
|
||||
|
||||
# Если сообщение содержит какой либо текст и выполняется фильтр ChatNotInApproveFilter, то вызывается функция chat_not_in_approve_list
|
||||
router.message.register(chat_not_in_approve_list, ChatNotInApproveFilter(), F.text)
|
||||
|
||||
router.message.register(get_chat_id, ChatModerOrAdminFilter(), F.text == "/chatID")
|
||||
|
||||
router.message.register(delete_message, ChatModerOrAdminFilter(), F.text == "/rm")
|
||||
router.message.register(error_access, F.text == "/rm")
|
||||
router.message.register(error_access, F.text == "/chatID")
|
1
src/ocab_modules/standard/config/__init__.py
Normal file
1
src/ocab_modules/standard/config/__init__.py
Normal file
@@ -0,0 +1 @@
|
||||
from .config import config
|
75
src/ocab_modules/standard/config/config.py
Normal file
75
src/ocab_modules/standard/config/config.py
Normal file
@@ -0,0 +1,75 @@
|
||||
# flake8: noqa
|
||||
|
||||
import yaml
|
||||
|
||||
from src.service import paths
|
||||
|
||||
|
||||
def get_config(is_test: bool = False) -> dict:
|
||||
if is_test:
|
||||
path = f"{paths.modules_standard}/config/tests"
|
||||
else:
|
||||
path = paths.core
|
||||
path = f"{path}/config.yaml"
|
||||
|
||||
with open(path, "r") as file:
|
||||
return yaml.full_load(file)
|
||||
|
||||
|
||||
config = get_config()
|
||||
|
||||
|
||||
def get_telegram_token() -> str:
|
||||
return config["TELEGRAM"]["TOKEN"]
|
||||
|
||||
|
||||
def get_telegram_check_bot() -> bool:
|
||||
return config["TELEGRAM"]["CHECK_BOT"]
|
||||
|
||||
|
||||
def get_aproved_chat_id() -> list:
|
||||
# Возваращем сплитованный список id чатов в формате int
|
||||
return [
|
||||
int(chat_id) for chat_id in config["TELEGRAM"]["APPROVED_CHAT_ID"].split(" | ")
|
||||
]
|
||||
|
||||
|
||||
def get_user_role_name(role_number) -> dict:
|
||||
# Возвращаем название роли пользвателя по номеру роли, если такой роли нет, возвращаем неизвестно
|
||||
return config["ROLES"].get(role_number, "Неизвестно")
|
||||
|
||||
|
||||
def get_default_chat_tag() -> str:
|
||||
return config["TELEGRAM"]["DEFAULT_CHAT_TAG"]
|
||||
|
||||
|
||||
def get_yandexgpt_token() -> str:
|
||||
return config["YANDEXGPT"]["TOKEN"]
|
||||
|
||||
|
||||
def get_yandexgpt_catalog_id() -> str:
|
||||
return config["YANDEXGPT"]["CATALOGID"]
|
||||
|
||||
|
||||
def get_yandexgpt_prompt() -> str:
|
||||
return config["YANDEXGPT"]["PROMPT"]
|
||||
|
||||
|
||||
def get_yandexgpt_start_words() -> list:
|
||||
return config["YANDEXGPT"]["STARTWORD"].split(" | ")
|
||||
|
||||
|
||||
def get_yandexgpt_in_words() -> list:
|
||||
return config["YANDEXGPT"]["INWORD"].split(" | ")
|
||||
|
||||
|
||||
def get_yandexgpt_token_for_request() -> int:
|
||||
return config["YANDEXGPT"]["TOKEN_FOR_REQUEST"]
|
||||
|
||||
|
||||
def get_yandexgpt_token_for_answer() -> int:
|
||||
return config["YANDEXGPT"]["TOKEN_FOR_ANSWER"]
|
||||
|
||||
|
||||
def get_access_rights() -> dict:
|
||||
return get_config()["ACCESS_RIGHTS"]
|
9
src/ocab_modules/standard/config/info.json
Normal file
9
src/ocab_modules/standard/config/info.json
Normal file
@@ -0,0 +1,9 @@
|
||||
{
|
||||
"id": "standard.config",
|
||||
"name": "Config YAML",
|
||||
"description": "Модуль для работы с конфигурационным файлом бота (YAML)",
|
||||
"author": "OCAB Team",
|
||||
"version": "1.0.0",
|
||||
"privileged": true,
|
||||
"dependencies": {}
|
||||
}
|
7
src/ocab_modules/standard/config/tests/config.yaml
Normal file
7
src/ocab_modules/standard/config/tests/config.yaml
Normal file
@@ -0,0 +1,7 @@
|
||||
TELEGRAM:
|
||||
TOKEN: xxxxxxxxxxxxxxxxxxxx
|
||||
ROLES:
|
||||
ADMIN: 0
|
||||
MODERATOR: 1
|
||||
USER: 2
|
||||
BOT: 3
|
45
src/ocab_modules/standard/config/tests/test_config.py
Normal file
45
src/ocab_modules/standard/config/tests/test_config.py
Normal file
@@ -0,0 +1,45 @@
|
||||
import unittest
|
||||
|
||||
from src.ocab_modules.standard.config.config import get_config
|
||||
|
||||
yaml_load = get_config(is_test=True)
|
||||
|
||||
|
||||
class TestConfig(unittest.TestCase):
|
||||
def test_yaml_load_correctness(self):
|
||||
self.assertIsNotNone(yaml_load)
|
||||
self.assertIn("TELEGRAM", yaml_load)
|
||||
self.assertIn("TOKEN", yaml_load["TELEGRAM"])
|
||||
self.assertIn("ROLES", yaml_load)
|
||||
self.assertIn("ADMIN", yaml_load["ROLES"])
|
||||
self.assertIn("MODERATOR", yaml_load["ROLES"])
|
||||
self.assertIn("USER", yaml_load["ROLES"])
|
||||
self.assertIn("BOT", yaml_load["ROLES"])
|
||||
|
||||
def test_yaml_keys_existence(self):
|
||||
self.assertTrue(all(key in yaml_load for key in ["TELEGRAM", "ROLES"]))
|
||||
self.assertIn("TOKEN", yaml_load["TELEGRAM"])
|
||||
self.assertTrue(
|
||||
all(role in yaml_load["ROLES"] for role in ["ADMIN", "MODERATOR", "USER"])
|
||||
)
|
||||
|
||||
def test_yaml_yaml_load_types(self):
|
||||
self.assertIsInstance(yaml_load["TELEGRAM"]["TOKEN"], str)
|
||||
self.assertTrue(
|
||||
all(
|
||||
isinstance(yaml_load["ROLES"][role], int)
|
||||
for role in ["ADMIN", "MODERATOR", "USER"]
|
||||
)
|
||||
)
|
||||
|
||||
def test_yaml_values(self):
|
||||
expected_token = "xxxxxxxxxxxxxxxxxxxx" # nosec
|
||||
expected_role_values = {"ADMIN": 0, "MODERATOR": 1, "USER": 2, "BOT": 3}
|
||||
|
||||
self.assertEqual(yaml_load["TELEGRAM"]["TOKEN"], expected_token)
|
||||
for role, value in expected_role_values.items():
|
||||
self.assertEqual(yaml_load["ROLES"][role], value)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
48
src/ocab_modules/standard/database/README.md
Normal file
48
src/ocab_modules/standard/database/README.md
Normal file
@@ -0,0 +1,48 @@
|
||||
## Модуль DataBase
|
||||
|
||||
Модуль DataBase предназначен для ведения и работы с базами данных OCAB.
|
||||
|
||||
Модуль содержит в себе следующие таблицы:
|
||||
|
||||
* `Chats` - таблица для хранения информации о чатах.
|
||||
* `Users` - таблица для хранения информации о пользователях.
|
||||
* `Messages` - таблица для хранения информации о сообщениях.
|
||||
* `ChatStats` - таблица для хранения статистики чатов по дням.
|
||||
* `UserStats` - таблица для хранения статистики пользователей по дням.
|
||||
|
||||
Cтруктура таблицы `Chats`:
|
||||
* `chat_id` - идентификатор чата.
|
||||
* `chat_name` - название чата.
|
||||
* `chat_type` - тип чата. (0 - Чат администраторов, 1 - Пользовательский чат, 3 - Чат разрешённых личных запросов к боту
|
||||
10 - Не инициализированный чат)
|
||||
* `chat_stats` - количество всех отправленных сообщений в чате.
|
||||
|
||||
Cтруктура таблицы `Users`:
|
||||
* `user_id` - идентификатор пользователя telegram.
|
||||
* `user_tag` - тег пользователя telegram.
|
||||
* `user_name` - имя пользователя telegram.
|
||||
* `user_role` - роль пользователя в чате. (0 - Администратор, 1 - Модератор, 2 - Пользователь)
|
||||
* `user_stats` - количество всех отправленных сообщений пользователем.
|
||||
* `user_rep` - репутация пользователя.
|
||||
|
||||
Cтруктура таблицы `Messages`:
|
||||
* `message_chat_id` - идентификатор чата в котором отправлено сообщение.
|
||||
* `message_id` - идентификатор сообщения.
|
||||
* `messag_sender_id` - идентификатор пользователя отправившего сообщение. Если сообщение отправил бот, то
|
||||
`messag_sender_id` = 0.
|
||||
* `answer_to_message_id` - идентификатор сообщения на которое дан ответ. Если ответа нет или ответ на служебное
|
||||
сообщение о создании топика в чатах с форумным типом, то `answer_to_message_id` = 0.
|
||||
* `message_ai_model` - идентификатор модели нейросети, которая использовалась для генерации ответа. Если ответ'
|
||||
сгенерирован не был, то `message_ai_model` = null.
|
||||
* `message_text` - текст сообщения.
|
||||
|
||||
Cтруктура таблицы `ChatStats`:
|
||||
* `chat_id` - идентификатор чата для которого собрана статистика.
|
||||
* `date` - дата на которую собрана статистика.
|
||||
* `messages_count` - количество сообщений отправленных в чат за день.
|
||||
|
||||
Cтруктура таблицы `UserStats`:
|
||||
* `chat_id` - идентификатор чата для которого собрана статистика.
|
||||
* `user_id` - идентификатор пользователя для которого собрана статистика.
|
||||
* `date` - дата на которую собрана статистика.
|
||||
* `messages_count` - количество сообщений отправленных пользователем в чат за день.
|
5
src/ocab_modules/standard/database/__init__.py
Normal file
5
src/ocab_modules/standard/database/__init__.py
Normal file
@@ -0,0 +1,5 @@
|
||||
from . import db_api, models
|
||||
|
||||
|
||||
def module_init():
|
||||
db_api.connect_database()
|
307
src/ocab_modules/standard/database/db_api.py
Normal file
307
src/ocab_modules/standard/database/db_api.py
Normal file
@@ -0,0 +1,307 @@
|
||||
import peewee as pw
|
||||
from aiogram.types import Message
|
||||
|
||||
from src.service import paths
|
||||
|
||||
from .exceptions import MissingModuleName, NotExpectedModuleName
|
||||
from .models.chat_stats import ChatStats
|
||||
from .models.chats import Chats
|
||||
from .models.db import database_proxy
|
||||
from .models.messages import Messages
|
||||
from .models.user_stats import UserStats
|
||||
from .models.users import Users
|
||||
|
||||
|
||||
def connect_database(is_test: bool = False, module: str | None = None):
|
||||
if is_test:
|
||||
if not module:
|
||||
raise MissingModuleName()
|
||||
db_path = f"{paths.modules_standard}/{module}/tests/database"
|
||||
else:
|
||||
if module:
|
||||
raise NotExpectedModuleName()
|
||||
db_path = f"{paths.core}/database"
|
||||
|
||||
database = pw.SqliteDatabase(f"{db_path}/OCAB.db")
|
||||
database_proxy.initialize(database)
|
||||
database.connect()
|
||||
create_tables(database)
|
||||
|
||||
return database, f"{db_path}/OCAB.db"
|
||||
|
||||
|
||||
def create_tables(db: pw.SqliteDatabase):
|
||||
"""Создание таблиц"""
|
||||
for table in Chats, Messages, Users, UserStats, ChatStats:
|
||||
if not table.table_exists():
|
||||
db.create_tables([table])
|
||||
|
||||
|
||||
def add_chat(chat_id, chat_name, chat_type=10, chat_stats=0):
|
||||
chat, created = Chats.get_or_create(
|
||||
id=chat_id,
|
||||
defaults={
|
||||
"chat_name": chat_name,
|
||||
"chat_type": chat_type,
|
||||
"chat_all_stat": chat_stats,
|
||||
},
|
||||
)
|
||||
if not created:
|
||||
# Обновить существующий чат, если он уже существует
|
||||
chat.chat_name = chat_name
|
||||
chat.chat_type = chat_type
|
||||
chat.chat_stats = chat_stats
|
||||
chat.save()
|
||||
|
||||
|
||||
def add_user(
|
||||
user_id,
|
||||
user_first_name,
|
||||
user_last_name=None,
|
||||
user_tag=None,
|
||||
user_role=0,
|
||||
user_stats=0,
|
||||
user_rep=0,
|
||||
):
|
||||
if user_last_name is None:
|
||||
user_name = user_first_name
|
||||
else:
|
||||
user_name = user_first_name + " " + user_last_name
|
||||
|
||||
user, created = Users.get_or_create(
|
||||
id=user_id,
|
||||
defaults={
|
||||
"user_tag": user_tag,
|
||||
"user_name": user_name,
|
||||
"user_role": user_role,
|
||||
"user_stats": user_stats,
|
||||
"user_rep": user_rep,
|
||||
},
|
||||
)
|
||||
if not created:
|
||||
# Обновить существующего пользователя, если он уже существует
|
||||
user.user_tag = user_tag
|
||||
user.user_name = user_name
|
||||
user.user_role = user_role
|
||||
user.user_stats = user_stats
|
||||
user.user_rep = user_rep
|
||||
user.save()
|
||||
|
||||
|
||||
def add_message(message: Message, message_ai_model=None):
|
||||
if message.reply_to_message:
|
||||
answer_to_message_id = message.reply_to_message.message_id
|
||||
else:
|
||||
answer_to_message_id = None
|
||||
Messages.create(
|
||||
message_chat_id=message.chat.id,
|
||||
message_id=message.message_id,
|
||||
message_sender_id=message.from_user.id,
|
||||
answer_to_message_id=answer_to_message_id,
|
||||
message_ai_model=message_ai_model,
|
||||
message_text=message.text,
|
||||
)
|
||||
|
||||
|
||||
def add_chat_stats(chat_id, date, messages_count):
|
||||
ChatStats.create(chat_id=chat_id, date=date, messages_count=messages_count)
|
||||
|
||||
|
||||
def add_user_stats(chat_id, user_id, date, messages_count):
|
||||
UserStats.create(
|
||||
chat_id=chat_id, user_id=user_id, date=date, messages_count=messages_count
|
||||
)
|
||||
|
||||
|
||||
# Работа с таблицей чатов
|
||||
|
||||
|
||||
def get_chat(chat_id):
|
||||
return Chats.get_or_none(Chats.id == chat_id)
|
||||
|
||||
|
||||
def change_chat_name(chat_id, new_chat_name):
|
||||
query = Chats.update(chat_name=new_chat_name).where(Chats.id == chat_id)
|
||||
query.execute()
|
||||
|
||||
|
||||
def change_chat_type(chat_id, new_chat_type):
|
||||
query = Chats.update(chat_type=new_chat_type).where(Chats.id == chat_id)
|
||||
query.execute()
|
||||
|
||||
|
||||
def get_chat_all_stat(chat_id):
|
||||
chat = Chats.get_or_none(Chats.id == chat_id)
|
||||
return chat.chat_all_stat if chat else None
|
||||
|
||||
|
||||
# Работа с таблицей пользователей
|
||||
|
||||
|
||||
def get_user(user_id) -> Users | None:
|
||||
return Users.get_or_none(Users.id == user_id)
|
||||
|
||||
|
||||
def get_user_tag(user_id):
|
||||
user = Users.get_or_none(Users.id == user_id)
|
||||
return user.user_tag if user else None
|
||||
|
||||
|
||||
def get_user_id(user_tag):
|
||||
user = Users.get_or_none(Users.user_tag == user_tag)
|
||||
return user.id if user else None
|
||||
|
||||
|
||||
def get_user_name(user_id):
|
||||
user = Users.get_or_none(Users.id == user_id)
|
||||
return user.user_name if user else None
|
||||
|
||||
|
||||
def get_user_role(user_id):
|
||||
user = Users.get_or_none(Users.id == user_id)
|
||||
return user.user_role if user else None
|
||||
|
||||
|
||||
def get_user_all_stats(user_id):
|
||||
user = Users.get_or_none(Users.id == user_id)
|
||||
return user.user_stats if user else None
|
||||
|
||||
|
||||
def get_user_rep(user_id):
|
||||
user = Users.get_or_none(Users.id == user_id)
|
||||
return user.user_rep if user else None
|
||||
|
||||
|
||||
def change_user_name(user_id, user_first_name, user_last_name=None):
|
||||
if user_last_name is None:
|
||||
new_user_name = user_first_name
|
||||
else:
|
||||
new_user_name = user_first_name + " " + user_last_name
|
||||
query = Users.update(user_name=new_user_name).where(Users.id == user_id)
|
||||
query.execute()
|
||||
|
||||
|
||||
def change_user_tag(user_id, new_user_tag):
|
||||
query = Users.update(user_tag=new_user_tag).where(Users.id == user_id)
|
||||
query.execute()
|
||||
|
||||
|
||||
def change_user_role(user_id, new_user_role):
|
||||
query = Users.update(user_role=new_user_role).where(Users.id == user_id)
|
||||
query.execute()
|
||||
|
||||
|
||||
# Работа с таблицей сообщений
|
||||
|
||||
|
||||
def get_message(message_chat_id, message_id):
|
||||
return Messages.get_or_none(
|
||||
Messages.message_chat_id == message_chat_id,
|
||||
Messages.message_id == message_id,
|
||||
)
|
||||
|
||||
|
||||
def get_message_sender_id(message_chat_id, message_id):
|
||||
message = Messages.get_or_none(
|
||||
Messages.message_chat_id == message_chat_id, Messages.message_id == message_id
|
||||
)
|
||||
return message.message_sender_id if message else None
|
||||
|
||||
|
||||
def get_message_text(message_chat_id, message_id):
|
||||
message = Messages.get_or_none(
|
||||
Messages.message_chat_id == message_chat_id, Messages.message_id == message_id
|
||||
)
|
||||
return message.message_text if message else None
|
||||
|
||||
|
||||
def get_message_ai_model(message_chat_id, message_id):
|
||||
message = Messages.get_or_none(
|
||||
Messages.message_chat_id == message_chat_id, Messages.message_id == message_id
|
||||
)
|
||||
return message.message_ai_model if message else None
|
||||
|
||||
|
||||
def get_answer_to_message_id(message_chat_id, message_id):
|
||||
message = Messages.get_or_none(
|
||||
Messages.message_chat_id == message_chat_id, Messages.message_id == message_id
|
||||
)
|
||||
return message.answer_to_message_id if message else None
|
||||
|
||||
|
||||
# Работа с таблицей статистики чатов
|
||||
|
||||
|
||||
def get_chat_stats(chat_id):
|
||||
chat_stats = {}
|
||||
for chat_stat in ChatStats.select().where(ChatStats.chat_id == chat_id):
|
||||
chat_stats[chat_stat.date] = chat_stat.messages_count
|
||||
return chat_stats
|
||||
|
||||
|
||||
# Работа с таблицей статистики пользователей
|
||||
|
||||
|
||||
def get_user_stats(user_id):
|
||||
user_stats = {}
|
||||
for user_stat in UserStats.select().where(UserStats.user_id == user_id):
|
||||
user_stats[user_stat.date] = user_stat.messages_count
|
||||
return user_stats
|
||||
|
||||
|
||||
# Функции обновления
|
||||
|
||||
|
||||
def update_chat_all_stat(chat_id):
|
||||
query = Chats.update(chat_all_stat=Chats.chat_all_stat + 1).where(
|
||||
Chats.id == chat_id
|
||||
)
|
||||
query.execute()
|
||||
|
||||
|
||||
def update_chat_stats(chat_id, date):
|
||||
chat_stats = ChatStats.get_or_none(
|
||||
ChatStats.chat_id == chat_id, ChatStats.date == date
|
||||
)
|
||||
if chat_stats:
|
||||
query = ChatStats.update(messages_count=ChatStats.messages_count + 1).where(
|
||||
ChatStats.chat_id == chat_id, ChatStats.date == date
|
||||
)
|
||||
query.execute()
|
||||
else:
|
||||
ChatStats.create(chat_id=chat_id, date=date, messages_count=1)
|
||||
|
||||
|
||||
def update_user_all_stat(user_id):
|
||||
user = Users.get_or_none(Users.id == user_id)
|
||||
if user:
|
||||
query = Users.update(user_stats=Users.user_stats + 1).where(Users.id == user_id)
|
||||
query.execute()
|
||||
else:
|
||||
Users.create(id=user_id, user_stats=1)
|
||||
|
||||
|
||||
def update_user_rep(user_id):
|
||||
user = Users.get_or_none(Users.id == user_id)
|
||||
if user:
|
||||
query = Users.update(user_rep=Users.user_rep + 1).where(Users.id == user_id)
|
||||
query.execute()
|
||||
else:
|
||||
Users.create(id=user_id, user_rep=1)
|
||||
|
||||
|
||||
def update_user_stats(chat_id, user_id, date):
|
||||
user_stats = UserStats.get_or_none(
|
||||
UserStats.chat_id == chat_id,
|
||||
UserStats.user_id == user_id,
|
||||
UserStats.date == date,
|
||||
)
|
||||
if user_stats:
|
||||
query = UserStats.update(messages_count=UserStats.messages_count + 1).where(
|
||||
UserStats.chat_id == chat_id,
|
||||
UserStats.user_id == user_id,
|
||||
UserStats.date == date,
|
||||
)
|
||||
query.execute()
|
||||
else:
|
||||
UserStats.create(chat_id=chat_id, user_id=user_id, date=date, messages_count=1)
|
12
src/ocab_modules/standard/database/exceptions.py
Normal file
12
src/ocab_modules/standard/database/exceptions.py
Normal file
@@ -0,0 +1,12 @@
|
||||
class MissingModuleName(BaseException):
|
||||
def __init__(self):
|
||||
self.message = "Пропущено название директории модуля"
|
||||
|
||||
super().__init__(self.message)
|
||||
|
||||
|
||||
class NotExpectedModuleName(BaseException):
|
||||
def __init__(self):
|
||||
self.message = "Не ожидалось название директории модуля"
|
||||
|
||||
super().__init__(self.message)
|
9
src/ocab_modules/standard/database/info.json
Normal file
9
src/ocab_modules/standard/database/info.json
Normal file
@@ -0,0 +1,9 @@
|
||||
{
|
||||
"id": "standard.database",
|
||||
"name": "Database",
|
||||
"description": "Модуль для работы с БД",
|
||||
"author": "OCAB Team",
|
||||
"version": "1.0.0",
|
||||
"privileged": true,
|
||||
"dependencies": {}
|
||||
}
|
12
src/ocab_modules/standard/database/models/chat_stats.py
Normal file
12
src/ocab_modules/standard/database/models/chat_stats.py
Normal file
@@ -0,0 +1,12 @@
|
||||
import peewee as pw
|
||||
|
||||
from .db import database_proxy
|
||||
|
||||
|
||||
class ChatStats(pw.Model):
|
||||
class Meta:
|
||||
database = database_proxy
|
||||
|
||||
chat_id = pw.IntegerField(null=False)
|
||||
date = pw.DateField(null=False)
|
||||
messages_count = pw.IntegerField(null=False, default=0)
|
12
src/ocab_modules/standard/database/models/chats.py
Normal file
12
src/ocab_modules/standard/database/models/chats.py
Normal file
@@ -0,0 +1,12 @@
|
||||
import peewee as pw
|
||||
|
||||
from .db import database_proxy
|
||||
|
||||
|
||||
class Chats(pw.Model):
|
||||
class Meta:
|
||||
database = database_proxy
|
||||
|
||||
chat_name = pw.CharField(null=False)
|
||||
chat_type = pw.IntegerField(null=False, default=10)
|
||||
chat_all_stat = pw.IntegerField(null=False)
|
3
src/ocab_modules/standard/database/models/db.py
Normal file
3
src/ocab_modules/standard/database/models/db.py
Normal file
@@ -0,0 +1,3 @@
|
||||
from peewee import DatabaseProxy
|
||||
|
||||
database_proxy = DatabaseProxy()
|
15
src/ocab_modules/standard/database/models/messages.py
Normal file
15
src/ocab_modules/standard/database/models/messages.py
Normal file
@@ -0,0 +1,15 @@
|
||||
import peewee as pw
|
||||
|
||||
from .db import database_proxy
|
||||
|
||||
|
||||
class Messages(pw.Model):
|
||||
class Meta:
|
||||
database = database_proxy
|
||||
|
||||
message_chat_id = pw.IntegerField(null=False)
|
||||
message_id = pw.IntegerField(null=False)
|
||||
message_sender_id = pw.IntegerField(null=False)
|
||||
answer_to_message_id = pw.IntegerField(null=True)
|
||||
message_ai_model = pw.TextField(null=True)
|
||||
message_text = pw.TextField(null=False)
|
13
src/ocab_modules/standard/database/models/user_stats.py
Normal file
13
src/ocab_modules/standard/database/models/user_stats.py
Normal file
@@ -0,0 +1,13 @@
|
||||
import peewee as pw
|
||||
|
||||
from .db import database_proxy
|
||||
|
||||
|
||||
class UserStats(pw.Model):
|
||||
class Meta:
|
||||
database = database_proxy
|
||||
|
||||
chat_id = pw.IntegerField(null=False)
|
||||
user_id = pw.IntegerField(null=False)
|
||||
date = pw.DateField(null=False)
|
||||
messages_count = pw.IntegerField(null=False, default=0)
|
14
src/ocab_modules/standard/database/models/users.py
Normal file
14
src/ocab_modules/standard/database/models/users.py
Normal file
@@ -0,0 +1,14 @@
|
||||
import peewee as pw
|
||||
|
||||
from .db import database_proxy
|
||||
|
||||
|
||||
class Users(pw.Model):
|
||||
class Meta:
|
||||
database = database_proxy
|
||||
|
||||
user_tag = pw.CharField(null=True)
|
||||
user_name = pw.CharField(null=False) # до 255 символов
|
||||
user_role = pw.IntegerField(null=True, default=3)
|
||||
user_stats = pw.IntegerField(null=True, default=0)
|
||||
user_rep = pw.IntegerField(null=True, default=0)
|
1
src/ocab_modules/standard/database/tests/database/file
Normal file
1
src/ocab_modules/standard/database/tests/database/file
Normal file
@@ -0,0 +1 @@
|
||||
Эта директория для тестовой БД
|
142
src/ocab_modules/standard/database/tests/test_db.py
Normal file
142
src/ocab_modules/standard/database/tests/test_db.py
Normal file
@@ -0,0 +1,142 @@
|
||||
# flake8: noqa
|
||||
|
||||
import os
|
||||
import unittest
|
||||
|
||||
from ...exceptions.module_exceptions import MissingModuleName, NotExpectedModuleName
|
||||
from ..db_api import *
|
||||
|
||||
|
||||
class TestDatabaseAPI(unittest.TestCase):
|
||||
database = None
|
||||
path = None
|
||||
|
||||
@classmethod
|
||||
def setUpClass(cls):
|
||||
cls.database, cls.path = connect_database(is_test=True, module="database")
|
||||
create_tables(cls.database)
|
||||
|
||||
def test_fail_connect(cls):
|
||||
with cls.assertRaises(MissingModuleName):
|
||||
cls.database, cls.path = connect_database(is_test=True)
|
||||
|
||||
with cls.assertRaises(NotExpectedModuleName):
|
||||
cls.database, cls.path = connect_database(module="database")
|
||||
|
||||
def test_add_and_get_chat(self):
|
||||
add_chat(chat_id=21, chat_role=0, chat_stats=0, chat_federation=0)
|
||||
add_chat(chat_id=22, chat_role=1, chat_stats=100, chat_federation=1)
|
||||
|
||||
chat1 = get_chat(21)
|
||||
self.assertIsNotNone(chat1)
|
||||
self.assertEqual(chat1.id, 21)
|
||||
self.assertEqual(chat1.chat_role, 0)
|
||||
|
||||
chat2 = get_chat(22)
|
||||
self.assertIsNotNone(chat2)
|
||||
self.assertEqual(chat2.id, 22)
|
||||
self.assertEqual(chat2.chat_role, 1)
|
||||
|
||||
def test_add_and_get_message(self):
|
||||
add_message(
|
||||
message_id=1, message_text="Test Message 1", message_sender=1, answer_id=2
|
||||
)
|
||||
add_message(
|
||||
message_id=2, message_text="Test Message 2", message_sender=2, answer_id=1
|
||||
)
|
||||
|
||||
message1 = get_message(1)
|
||||
self.assertIsNotNone(message1)
|
||||
self.assertEqual(message1.id, 1)
|
||||
self.assertEqual(message1.message_text, "Test Message 1")
|
||||
|
||||
message2 = get_message(2)
|
||||
self.assertIsNotNone(message2)
|
||||
self.assertEqual(message2.id, 2)
|
||||
self.assertEqual(message2.message_text, "Test Message 2")
|
||||
|
||||
def test_add_and_get_user(self):
|
||||
add_user(
|
||||
user_id=100,
|
||||
user_name="TestUser1",
|
||||
user_tag="TestTag1",
|
||||
user_role=0,
|
||||
user_stats=10,
|
||||
user_rep=5,
|
||||
)
|
||||
add_user(
|
||||
user_id=101,
|
||||
user_name="TestUser2",
|
||||
user_tag="TestTag2",
|
||||
user_role=1,
|
||||
user_stats=20,
|
||||
user_rep=10,
|
||||
)
|
||||
|
||||
user1 = get_user(100)
|
||||
self.assertIsNotNone(user1)
|
||||
self.assertEqual(user1.id, 100)
|
||||
self.assertEqual(user1.user_name, "TestUser1")
|
||||
|
||||
user2 = get_user(101)
|
||||
self.assertIsNotNone(user2)
|
||||
self.assertEqual(user2.id, 101)
|
||||
self.assertEqual(user2.user_name, "TestUser2")
|
||||
|
||||
def test_get_user_role(self):
|
||||
add_user(
|
||||
user_id=102,
|
||||
user_name="TestUser3",
|
||||
user_tag="TestTag3",
|
||||
user_role=0,
|
||||
user_stats=30,
|
||||
user_rep=15,
|
||||
)
|
||||
add_user(
|
||||
user_id=103,
|
||||
user_name="TestUser4",
|
||||
user_tag="TestTag4",
|
||||
user_role=1,
|
||||
user_stats=40,
|
||||
user_rep=20,
|
||||
)
|
||||
|
||||
user_role1 = get_user_role(102)
|
||||
self.assertEqual(user_role1, 0)
|
||||
|
||||
user_role2 = get_user_role(103)
|
||||
self.assertEqual(user_role2, 1)
|
||||
|
||||
def test_change_user_name(self):
|
||||
add_user(
|
||||
user_id=104,
|
||||
user_name="OldName1",
|
||||
user_tag="TestTag5",
|
||||
user_role=0,
|
||||
user_stats=50,
|
||||
user_rep=25,
|
||||
)
|
||||
change_user_name(104, "NewName1")
|
||||
updated_user1 = get_user(104)
|
||||
self.assertEqual(updated_user1.user_name, "NewName1")
|
||||
|
||||
add_user(
|
||||
user_id=105,
|
||||
user_name="OldName2",
|
||||
user_tag="TestTag6",
|
||||
user_role=1,
|
||||
user_stats=60,
|
||||
user_rep=30,
|
||||
)
|
||||
change_user_name(105, "NewName2")
|
||||
updated_user2 = get_user(105)
|
||||
self.assertEqual(updated_user2.user_name, "NewName2")
|
||||
|
||||
@classmethod
|
||||
def tearDownClass(cls):
|
||||
cls.database.close()
|
||||
os.system(f"rm {cls.path}") # nosec
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
0
src/ocab_modules/standard/filters/__init__.py
Normal file
0
src/ocab_modules/standard/filters/__init__.py
Normal file
34
src/ocab_modules/standard/filters/filters.py
Normal file
34
src/ocab_modules/standard/filters/filters.py
Normal file
@@ -0,0 +1,34 @@
|
||||
from aiogram import Bot
|
||||
from aiogram.filters import BaseFilter
|
||||
from aiogram.types import Message
|
||||
|
||||
from ocab_core.logger import log
|
||||
from ocab_modules.standard.config.config import get_aproved_chat_id
|
||||
from ocab_modules.standard.roles.roles import Roles
|
||||
|
||||
|
||||
class ChatModerOrAdminFilter(BaseFilter):
|
||||
async def __call__(self, message: Message, bot: Bot) -> bool:
|
||||
user_id = message.from_user.id
|
||||
roles = Roles()
|
||||
admins = await bot.get_chat_administrators(message.chat.id)
|
||||
return (
|
||||
await roles.check_admin_permission(user_id)
|
||||
or await roles.check_moderator_permission(user_id)
|
||||
or any(user_id == admin.user.id for admin in admins)
|
||||
)
|
||||
|
||||
|
||||
class ChatNotInApproveFilter(BaseFilter):
|
||||
async def __call__(self, message: Message, bot: Bot) -> bool:
|
||||
# print("chat_check")
|
||||
await log("chat_check")
|
||||
chat_id = message.chat.id
|
||||
if chat_id in get_aproved_chat_id():
|
||||
# print(f"Chat in approve list: {chat_id}")
|
||||
await log(f"Chat in approve list: {chat_id}")
|
||||
return False
|
||||
else:
|
||||
# print(f"Chat not in approve list: {chat_id}")
|
||||
await log(f"Chat not in approve list: {chat_id}")
|
||||
return True
|
6
src/ocab_modules/standard/filters/info.json
Normal file
6
src/ocab_modules/standard/filters/info.json
Normal file
@@ -0,0 +1,6 @@
|
||||
{
|
||||
"name": "Filters",
|
||||
"description": "Модуль с фильтрами",
|
||||
"author": "OCAB Team",
|
||||
"version": "1.0"
|
||||
}
|
14
src/ocab_modules/standard/info/__init__.py
Normal file
14
src/ocab_modules/standard/info/__init__.py
Normal file
@@ -0,0 +1,14 @@
|
||||
from aiogram import F, Router
|
||||
|
||||
from ocab_core.modules_system.public_api import register_router
|
||||
|
||||
from .handlers import get_chat_info, get_user_info
|
||||
|
||||
|
||||
def module_init():
|
||||
router = Router()
|
||||
|
||||
router.message.register(get_user_info, F.text.startswith("/info"))
|
||||
router.message.register(get_chat_info, F.text.startswith("/chatinfo"))
|
||||
|
||||
register_router(router)
|
83
src/ocab_modules/standard/info/handlers.py
Normal file
83
src/ocab_modules/standard/info/handlers.py
Normal file
@@ -0,0 +1,83 @@
|
||||
# flake8: noqa
|
||||
from typing import Type
|
||||
|
||||
from aiogram import Bot
|
||||
from aiogram.types import Message
|
||||
|
||||
from ocab_core.modules_system.public_api import get_module, log
|
||||
|
||||
from .interfaces import IDbApi, IRoles
|
||||
|
||||
db_api: Type[IDbApi] = get_module(
|
||||
"standard.database",
|
||||
"db_api",
|
||||
)
|
||||
|
||||
Roles: Type[IRoles] = get_module("standard.roles", "Roles")
|
||||
|
||||
|
||||
async def get_info_answer_by_id(message: Message, bot: Bot, user_id: int):
|
||||
ai_model = db_api.get_message_ai_model(message.chat.id, message.message_id)
|
||||
if ai_model is not None:
|
||||
await message.reply(
|
||||
"Это сообщение было сгенерировано ботом используя модель: " + ai_model
|
||||
)
|
||||
return
|
||||
|
||||
if user_id == bot.id:
|
||||
await message.reply("Это сообщение было отправлено ботом")
|
||||
return
|
||||
|
||||
user = db_api.get_user(user_id)
|
||||
|
||||
if user is None:
|
||||
await message.reply("Пользователь не найден")
|
||||
await log(f"Пользователь не найден: {user_id}, {user}")
|
||||
return
|
||||
|
||||
roles = Roles()
|
||||
answer = (
|
||||
f"Пользователь: {user.user_name}\n"
|
||||
f"Роль: {await roles.get_role_name(role_id=user.user_role)}\n"
|
||||
f"Тег: @{user.user_tag}\n"
|
||||
f"Кол-во сообщений: {user.user_stats}\n"
|
||||
f"Репутация: {user.user_rep}"
|
||||
)
|
||||
await message.reply(answer)
|
||||
|
||||
|
||||
async def get_user_info(message: Message, bot: Bot):
|
||||
# Проверяем содержимое сообщения, если содержит вторым элементом тег пользователя, то выводим информацию о нем
|
||||
# Если сообщение отвечает на другое сообщение, то выводим информацию о пользователе, на чье сообщение был ответ
|
||||
# Если это бот то выводим информацию, что это бот и какая модель yandexgpt используется
|
||||
try:
|
||||
if len(message.text.split()) > 1 and message.text.split()[1].startswith("@"):
|
||||
user_tag = message.text.split()[1][1:]
|
||||
user_id = db_api.get_user_id(user_tag)
|
||||
if user_id:
|
||||
await get_info_answer_by_id(message, bot, user_id)
|
||||
else:
|
||||
await message.reply(f"Пользователь с тегом @{user_tag} не найден")
|
||||
elif message.reply_to_message:
|
||||
user_id = message.reply_to_message.from_user.id
|
||||
await get_info_answer_by_id(message, bot, user_id)
|
||||
else:
|
||||
await get_info_answer_by_id(message, bot, message.from_user.id)
|
||||
except Exception as e:
|
||||
await message.reply(
|
||||
"В вашем запросе что-то пошло не так,"
|
||||
" попробуйте запросить информацию о пользователе по его тегу или ответив на его сообщение"
|
||||
)
|
||||
# print(e)
|
||||
await log(e)
|
||||
|
||||
|
||||
async def get_chat_info(message: Message, bot: Bot):
|
||||
answer = (
|
||||
f"*Название чата:* {message.chat.title}\n"
|
||||
f"*ID чата:* `{message.chat.id}`\n \n"
|
||||
f"*Суммарное количество сообщений в чате:* {db_api.get_chat_all_stat(message.chat.id)}\n"
|
||||
f"*Количество пользователей в чате:* {await bot.get_chat_member_count(message.chat.id)}\n"
|
||||
f"*Количество администраторов в чате:* {len(await bot.get_chat_administrators(message.chat.id))}"
|
||||
)
|
||||
await message.reply(answer, parse_mode="MarkdownV2")
|
12
src/ocab_modules/standard/info/info.json
Normal file
12
src/ocab_modules/standard/info/info.json
Normal file
@@ -0,0 +1,12 @@
|
||||
{
|
||||
"id": "standard.info",
|
||||
"name": "Info",
|
||||
"description": "Модуль с информацией",
|
||||
"author": "OCAB Team",
|
||||
"version": "1.0.0",
|
||||
"privileged": false,
|
||||
"dependencies": {
|
||||
"standard.roles": "^1.0.0",
|
||||
"standard.database": "^1.0.0"
|
||||
}
|
||||
}
|
47
src/ocab_modules/standard/info/interfaces.py
Normal file
47
src/ocab_modules/standard/info/interfaces.py
Normal file
@@ -0,0 +1,47 @@
|
||||
from typing import Any
|
||||
|
||||
|
||||
class IRoles:
|
||||
user: str
|
||||
moderator: str
|
||||
admin: str
|
||||
bot: str
|
||||
|
||||
def __init__(self):
|
||||
pass
|
||||
|
||||
async def check_admin_permission(self, user_id: int) -> bool:
|
||||
pass
|
||||
|
||||
async def check_moderator_permission(self, user_id) -> bool:
|
||||
pass
|
||||
|
||||
async def get_role_name(self, role_id) -> str:
|
||||
pass
|
||||
|
||||
async def get_user_permission(self, user_id) -> str | None:
|
||||
pass
|
||||
|
||||
|
||||
class IUsers:
|
||||
user_id: int
|
||||
user_name: str
|
||||
user_role: int
|
||||
user_tag: str
|
||||
user_stats: int
|
||||
user_rep: int
|
||||
|
||||
|
||||
class IDbApi:
|
||||
|
||||
@staticmethod
|
||||
def get_message_ai_model(message_chat_id: Any, message_id: Any) -> Any | None:
|
||||
pass
|
||||
|
||||
@staticmethod
|
||||
def get_user(user_id: int) -> IUsers | None:
|
||||
pass
|
||||
|
||||
@staticmethod
|
||||
def get_chat_all_stat(chat_id: int) -> int:
|
||||
pass
|
117
src/ocab_modules/standard/message_processing/message_api.py
Normal file
117
src/ocab_modules/standard/message_processing/message_api.py
Normal file
@@ -0,0 +1,117 @@
|
||||
# flake8: noqa
|
||||
|
||||
from aiogram import Bot, F, Router, types
|
||||
|
||||
from ocab_core.logger import log
|
||||
from ocab_modules.external.yandexgpt.handlers import answer_to_message
|
||||
from ocab_modules.standard.config.config import (
|
||||
get_aproved_chat_id,
|
||||
get_yandexgpt_in_words,
|
||||
get_yandexgpt_start_words,
|
||||
)
|
||||
from ocab_modules.standard.database.db_api import *
|
||||
|
||||
|
||||
async def chat_check(message: types.Message):
|
||||
# Проверка наличия id чата в базе данных чатов
|
||||
# Если чата нет в базе данных, то проверяем его в наличии в конфиге и если он там есть то добавляем его в БД
|
||||
# Если чат есть в базе данных, то pass
|
||||
if get_chat(message.chat.id) is None:
|
||||
if message.chat.id in get_aproved_chat_id():
|
||||
# print(f"Chat in approve list: {message.chat.id} {message.chat.title}")
|
||||
await log(f"Chat in approve list: {message.chat.id} {message.chat.title}")
|
||||
add_chat(message.chat.id, message.chat.title)
|
||||
# print(f"Chat added: {message.chat.id} {message.chat.title}")
|
||||
await log(f"Chat added: {message.chat.id} {message.chat.title}")
|
||||
else:
|
||||
# print(f"Chat not in approve list: {message.chat.id} {message.chat.title}")
|
||||
await log(
|
||||
f"Chat not in approve list: {message.chat.id} {message.chat.title}"
|
||||
)
|
||||
pass
|
||||
else:
|
||||
# Проверяем обновление названия чата
|
||||
chat = get_chat(message.chat.id)
|
||||
if chat.chat_name != message.chat.title:
|
||||
chat.chat_name = message.chat.title
|
||||
chat.save()
|
||||
# print(f"Chat updated: {message.chat.id} {message.chat.title}")
|
||||
await log(f"Chat updated: {message.chat.id} {message.chat.title}")
|
||||
else:
|
||||
# print(f"Chat already exists: {message.chat.id} {message.chat.title}")
|
||||
await log(f"Chat already exists: {message.chat.id} {message.chat.title}")
|
||||
pass
|
||||
|
||||
|
||||
async def user_check(message: types.Message):
|
||||
# Проверка наличия id пользователя в базе данных пользователей
|
||||
# Если пользователя нет в базе данных, то добавляем его
|
||||
# Если пользователь есть в базе данных, то pass
|
||||
current_user_name = ""
|
||||
if message.from_user.last_name is None:
|
||||
current_user_name = message.from_user.first_name
|
||||
else:
|
||||
current_user_name = (
|
||||
message.from_user.first_name + " " + message.from_user.last_name
|
||||
)
|
||||
|
||||
if get_user(message.from_user.id) is None:
|
||||
add_user(
|
||||
message.from_user.id,
|
||||
message.from_user.first_name,
|
||||
message.from_user.last_name,
|
||||
message.from_user.username,
|
||||
)
|
||||
# print(f"User added: {message.from_user.id} {message.from_user.first_name} {message.from_user.last_name}")
|
||||
await log(f"User added: {message.from_user.id} {current_user_name}")
|
||||
else:
|
||||
# print(f"User already exists: {message.from_user.id} {message.from_user.first_name} {message.from_user.last_name} {message.from_user.username}")
|
||||
await log(
|
||||
f"User already exists: {message.from_user.id} {current_user_name} {message.from_user.username}"
|
||||
)
|
||||
# Проверяем обновление имени пользователя
|
||||
if get_user_name(message.from_user.id) != current_user_name:
|
||||
change_user_name(message.from_user.id, current_user_name)
|
||||
# print(f"User updated: {message.from_user.id} {message.from_user.first_name} {message.from_user.last_name}")
|
||||
await log(f"User name updated: {message.from_user.id} {current_user_name}")
|
||||
# Проверяем обновление username пользователя
|
||||
if get_user_tag(message.from_user.id) != message.from_user.username:
|
||||
change_user_tag(message.from_user.id, message.from_user.username)
|
||||
# print(f"User updated: {message.from_user.id} {message.from_user.username}")
|
||||
await log(
|
||||
f"User tag updated: {message.from_user.id} {message.from_user.username}"
|
||||
)
|
||||
pass
|
||||
|
||||
|
||||
async def add_stats(message: types.Message):
|
||||
# Добавляем пользователю и чату статистику
|
||||
update_chat_all_stat(message.chat.id)
|
||||
update_user_all_stat(message.from_user.id)
|
||||
|
||||
|
||||
async def message_processing(message: types.Message, bot: Bot):
|
||||
await chat_check(message)
|
||||
await user_check(message)
|
||||
await add_stats(message)
|
||||
|
||||
add_message(message)
|
||||
# Если сообщение в начале содержит слово из списка или внутри сообщения содержится слово из списка или сообщение отвечает на сообщение бота
|
||||
|
||||
if (message.text.split(" ")[0] in get_yandexgpt_start_words()) or (
|
||||
any(word in message.text for word in get_yandexgpt_in_words())
|
||||
):
|
||||
# print("message_processing")
|
||||
await log("message_processing")
|
||||
await answer_to_message(message, bot)
|
||||
|
||||
elif message.reply_to_message is not None:
|
||||
if message.reply_to_message.from_user.is_bot:
|
||||
# print("message_processing")
|
||||
await log("message_processing")
|
||||
await answer_to_message(message, bot)
|
||||
|
||||
|
||||
router = Router()
|
||||
# Если сообщение содержит текст то вызывается функция message_processing
|
||||
router.message.register(message_processing, F.text)
|
0
src/ocab_modules/standard/moderation/__init__.py
Normal file
0
src/ocab_modules/standard/moderation/__init__.py
Normal file
6
src/ocab_modules/standard/moderation/info.json
Normal file
6
src/ocab_modules/standard/moderation/info.json
Normal file
@@ -0,0 +1,6 @@
|
||||
{
|
||||
"name": "Moderation",
|
||||
"description": "Moderation commands for OCAB",
|
||||
"author": "OCAB Team",
|
||||
"version": "1.0"
|
||||
}
|
82
src/ocab_modules/standard/moderation/moderation.py
Normal file
82
src/ocab_modules/standard/moderation/moderation.py
Normal file
@@ -0,0 +1,82 @@
|
||||
# flake8: noqa
|
||||
|
||||
import asyncio
|
||||
import time
|
||||
|
||||
import aiogram
|
||||
import aiohttp
|
||||
|
||||
from ...standard.config.config import *
|
||||
from ...standard.roles.roles import *
|
||||
|
||||
|
||||
class Moderation:
|
||||
def __init__(self):
|
||||
access_rights = get_access_rights()
|
||||
bot_check_message = bool(self.access_rights["BOT_CHECK_MESSAGE"])
|
||||
bot_ban_user = bool(self.access_rights["BOT_BAN_USER"])
|
||||
bot_mute_user = bool(self.access_rights["BOT_MUTE_USER"])
|
||||
moderator_rights = self.access_rights["MODERATOR_RIGHTS"]
|
||||
ai_check_message = bool(self.access_rights["AI_CHECK_MESSAGE"])
|
||||
beta_ai_check_message = bool(self.access_rights["BETA_AI_CHECK_MESSAGE"])
|
||||
|
||||
async def time_to_seconds(time):
|
||||
# Конвертация текстового указания времени по типу 3h, 5m, 10s в минуты
|
||||
if time[-1] == "d":
|
||||
return int(time[:-1]) * 86400
|
||||
elif time[-1] == "h":
|
||||
return int(time[:-1]) * 3600
|
||||
elif time[-1] == "m":
|
||||
return int(time[:-1]) * 60
|
||||
elif time[-1] == "s":
|
||||
return int(time[:-1])
|
||||
|
||||
async def short_time_to_time(self, time):
|
||||
# Конвертация времени в длинное название
|
||||
if time[-1] == "d":
|
||||
return str(f"{time[0:-1]} дней")
|
||||
elif time[-1] == "h":
|
||||
return str(f"{time[0:-1]} часов")
|
||||
elif time[-1] == "m":
|
||||
return str(f"{time[0:-1]} минут")
|
||||
elif time[-1] == "s":
|
||||
return str(f"{time[0:-1]} секунд")
|
||||
|
||||
async def delete_message(self, chat_id, message_id, bot: aiogram.Bot):
|
||||
await bot.delete_message(chat_id, message_id)
|
||||
|
||||
async def ban_user(self, chat_id, user_id, bot: aiogram.Bot):
|
||||
await bot.ban_chat_member(chat_id, user_id)
|
||||
|
||||
|
||||
async def mute_user(chat_id, user_id, time, bot: aiogram.Bot):
|
||||
mutePermissions = {
|
||||
"can_send_messages": False,
|
||||
"can_send_audios": False,
|
||||
"can_send_documents": False,
|
||||
"can_send_photos": False,
|
||||
"can_send_videos": False,
|
||||
"can_send_video_notes": False,
|
||||
"can_send_voice_notes": False,
|
||||
"can_send_polls": False,
|
||||
"can_send_other_messages": False,
|
||||
"can_add_web_page_previews": False,
|
||||
"can_change_info": False,
|
||||
"can_invite_users": False,
|
||||
"can_pin_messages": False,
|
||||
"can_manage_topics": False,
|
||||
}
|
||||
end_time = time + int(time.time())
|
||||
await bot.restrict_chat_member(
|
||||
chat_id, user_id, until_date=end_time, **mutePermissions
|
||||
)
|
||||
|
||||
|
||||
async def unmute_user(chat_id, user_id, bot: aiogram.Bot):
|
||||
await bot.restrict_chat_member(
|
||||
chat_id, user_id, use_independent_chat_permissions=True
|
||||
)
|
||||
|
||||
|
||||
async def ban_user(chat_id, user_id, bot: aiogram.Bot):
|
||||
await bot.ban_chat_member(chat_id, user_id)
|
1
src/ocab_modules/standard/roles/__init__.py
Normal file
1
src/ocab_modules/standard/roles/__init__.py
Normal file
@@ -0,0 +1 @@
|
||||
from .roles import Roles
|
12
src/ocab_modules/standard/roles/info.json
Normal file
12
src/ocab_modules/standard/roles/info.json
Normal file
@@ -0,0 +1,12 @@
|
||||
{
|
||||
"id": "standard.roles",
|
||||
"name": "Roles",
|
||||
"description": "Модуль для работы с ролями",
|
||||
"author": "OCAB Team",
|
||||
"version": "1.0.0",
|
||||
"privileged": true,
|
||||
"dependencies": {
|
||||
"standard.config": "^1.0.0",
|
||||
"standard.database": "^1.0.0"
|
||||
}
|
||||
}
|
58
src/ocab_modules/standard/roles/roles.py
Normal file
58
src/ocab_modules/standard/roles/roles.py
Normal file
@@ -0,0 +1,58 @@
|
||||
from ocab_core.modules_system.public_api import get_module
|
||||
|
||||
get_user_role = get_module("standard.database", "db_api.get_user_role")
|
||||
config: dict = get_module("standard.config", "config")
|
||||
|
||||
|
||||
class Roles:
|
||||
user = "USER"
|
||||
moderator = "MODERATOR"
|
||||
admin = "ADMIN"
|
||||
bot = "BOT"
|
||||
__roles = config["ROLES"]
|
||||
|
||||
def __init__(self):
|
||||
self.user_role_id = self.__roles[self.user]
|
||||
self.moderator_role_id = self.__roles[self.moderator]
|
||||
self.admin_role_id = self.__roles[self.admin]
|
||||
self.bot_role_id = self.__roles[self.bot]
|
||||
|
||||
async def check_admin_permission(self, user_id):
|
||||
match get_user_role(user_id):
|
||||
case self.admin_role_id:
|
||||
return True
|
||||
case _:
|
||||
return False
|
||||
|
||||
async def check_moderator_permission(self, user_id):
|
||||
match get_user_role(user_id):
|
||||
case self.moderator_role_id:
|
||||
return True
|
||||
case _:
|
||||
return False
|
||||
|
||||
async def get_role_name(self, role_id):
|
||||
match role_id:
|
||||
case self.admin_role_id:
|
||||
return self.admin
|
||||
case self.moderator_role_id:
|
||||
return self.moderator
|
||||
case self.user_role_id:
|
||||
return self.user
|
||||
case self.bot_role_id:
|
||||
return self.bot
|
||||
case _:
|
||||
raise ValueError(f"Нет роли с id={role_id}")
|
||||
|
||||
async def get_user_permission(self, user_id):
|
||||
match get_user_role(user_id):
|
||||
case self.admin_role_id:
|
||||
return self.admin
|
||||
case self.moderator_role_id:
|
||||
return self.moderator
|
||||
case self.user_role_id:
|
||||
return self.user
|
||||
case self.bot_role_id:
|
||||
return self.bot
|
||||
case _:
|
||||
return None
|
0
src/ocab_modules/standard/roles/tests/__init__.py
Normal file
0
src/ocab_modules/standard/roles/tests/__init__.py
Normal file
7
src/ocab_modules/standard/roles/tests/config.yaml
Normal file
7
src/ocab_modules/standard/roles/tests/config.yaml
Normal file
@@ -0,0 +1,7 @@
|
||||
TELEGRAM:
|
||||
TOKEN: xxxxxxxxxxxxxxxxxxxx
|
||||
ROLES:
|
||||
ADMIN: 0
|
||||
MODERATOR: 1
|
||||
USER: 2
|
||||
BOT: 3
|
80
src/ocab_modules/standard/roles/tests/test_roles.py
Normal file
80
src/ocab_modules/standard/roles/tests/test_roles.py
Normal file
@@ -0,0 +1,80 @@
|
||||
import os
|
||||
import unittest
|
||||
|
||||
from ...database.db_api import add_user, connect_database, create_tables
|
||||
from ..roles import Roles
|
||||
|
||||
|
||||
class TestRoles(unittest.IsolatedAsyncioTestCase):
|
||||
database = None
|
||||
path = None
|
||||
|
||||
@classmethod
|
||||
def setUpClass(cls):
|
||||
cls.roles = Roles()
|
||||
cls.database, cls.path = connect_database(is_test=True, module="roles")
|
||||
create_tables(cls.database)
|
||||
|
||||
add_user(
|
||||
user_id=1,
|
||||
user_name="TestUser1",
|
||||
user_tag="TestTag1",
|
||||
user_role=0,
|
||||
user_stats=30,
|
||||
user_rep=15,
|
||||
)
|
||||
add_user(
|
||||
user_id=2,
|
||||
user_name="TestUser3",
|
||||
user_tag="TestTag3",
|
||||
user_role=1,
|
||||
user_stats=30,
|
||||
user_rep=15,
|
||||
)
|
||||
add_user(
|
||||
user_id=3,
|
||||
user_name="TestUser4",
|
||||
user_tag="TestTag4",
|
||||
user_role=2,
|
||||
user_stats=30,
|
||||
user_rep=15,
|
||||
)
|
||||
add_user(
|
||||
user_id=4,
|
||||
user_name="TestUser2",
|
||||
user_tag="TestTag2",
|
||||
user_role=3,
|
||||
user_stats=30,
|
||||
user_rep=15,
|
||||
)
|
||||
|
||||
async def test_check_admin_permission(cls):
|
||||
cls.assertTrue(await cls.roles.check_admin_permission(1))
|
||||
cls.assertFalse(await cls.roles.check_admin_permission(2))
|
||||
cls.assertFalse(await cls.roles.check_admin_permission(3))
|
||||
cls.assertFalse(await cls.roles.check_admin_permission(4))
|
||||
|
||||
async def test_check_moderator_permission(cls):
|
||||
cls.assertTrue(await cls.roles.check_moderator_permission(2))
|
||||
cls.assertFalse(await cls.roles.check_moderator_permission(0))
|
||||
cls.assertFalse(await cls.roles.check_moderator_permission(1))
|
||||
cls.assertFalse(await cls.roles.check_moderator_permission(3))
|
||||
|
||||
async def test_get_role_name(cls):
|
||||
cls.assertEqual(await cls.roles.get_role_name(cls.roles.admin_role_id), "ADMIN")
|
||||
cls.assertEqual(
|
||||
await cls.roles.get_role_name(cls.roles.moderator_role_id), "MODERATOR"
|
||||
)
|
||||
cls.assertEqual(await cls.roles.get_role_name(cls.roles.user_role_id), "USER")
|
||||
cls.assertEqual(await cls.roles.get_role_name(cls.roles.bot_role_id), "BOT")
|
||||
with cls.assertRaises(ValueError):
|
||||
await cls.roles.get_role_name(999) # Несуществующий ID роли
|
||||
|
||||
@classmethod
|
||||
def tearDownClass(cls):
|
||||
cls.database.close()
|
||||
os.system(f"rm {cls.path}") # nosec
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
0
src/ocab_modules/standard/welcome/__init__.py
Normal file
0
src/ocab_modules/standard/welcome/__init__.py
Normal file
90
src/ocab_modules/standard/welcome/handlers.py
Normal file
90
src/ocab_modules/standard/welcome/handlers.py
Normal file
@@ -0,0 +1,90 @@
|
||||
# flake8: noqa
|
||||
|
||||
import asyncio
|
||||
import random
|
||||
from threading import Thread
|
||||
|
||||
from aiogram import Bot
|
||||
from aiogram.types import Message
|
||||
from aiogram.types import inline_keyboard_button as types
|
||||
from aiogram.utils.keyboard import InlineKeyboardBuilder
|
||||
|
||||
from src.ocab_modules.standard.config.config import get_telegram_check_bot
|
||||
from src.ocab_modules.standard.database.db_api import *
|
||||
from src.ocab_modules.standard.moderation.moderation import (
|
||||
ban_user,
|
||||
mute_user,
|
||||
unmute_user,
|
||||
)
|
||||
from src.ocab_modules.standard.roles.roles import Roles
|
||||
|
||||
|
||||
async def create_math_task():
|
||||
first_number = random.randint(1, 100) # nosec
|
||||
second_number = random.randint(1, 100) # nosec
|
||||
answer = first_number + second_number
|
||||
fake_answers = []
|
||||
for i in range(3):
|
||||
diff = random.randint(1, 10) # nosec
|
||||
diff_sign = random.choice(["+", "-"]) # nosec
|
||||
fake_answers.append(answer + diff if diff_sign == "+" else answer - diff)
|
||||
fake_answers.append(answer)
|
||||
random.shuffle(fake_answers)
|
||||
return [answer, first_number, second_number, fake_answers]
|
||||
|
||||
|
||||
async def ban_user_timer(chat_id: int, user_id: int, time: int, bot: Bot):
|
||||
await asyncio.sleep(time)
|
||||
if get_user(user_id) is not None:
|
||||
pass
|
||||
else:
|
||||
await ban_user()
|
||||
|
||||
|
||||
async def check_new_user(message: Message, bot: Bot):
|
||||
print("check_new_user")
|
||||
if get_telegram_check_bot():
|
||||
# Проверяем наличие пользователя в базе данных
|
||||
|
||||
if get_user(message.from_user.id) is None:
|
||||
# Выдаём пользователю ограничение на отправку сообщений на 3 минуты
|
||||
ban_task = Thread(
|
||||
target=ban_user_timer,
|
||||
args=(message.chat.id, message.from_user.id, 180, bot),
|
||||
)
|
||||
ban_task.start()
|
||||
# Создаём задачу с отложенным выполнением на 3 минуты
|
||||
|
||||
math_task = await create_math_task()
|
||||
text = f"{math_task[1]} + {math_task[2]}"
|
||||
builder = InlineKeyboardBuilder()
|
||||
for answer in math_task[3]:
|
||||
if answer == math_task[0]:
|
||||
builder.add(
|
||||
types.InlineKeyboardButton(
|
||||
text=answer, callback_data=f"check_math_task_true"
|
||||
)
|
||||
)
|
||||
else:
|
||||
builder.add(
|
||||
types.InlineKeyboardButton(
|
||||
text=answer, callback_data=f"check_math_task_false"
|
||||
)
|
||||
)
|
||||
await message.reply(
|
||||
f"Приветствую, {message.from_user.first_name}!\n"
|
||||
f"Для продолжения работы с ботом, пожалуйста, решите математический пример в течении 3х минут:\n"
|
||||
f"*{text}*",
|
||||
reply_markup=builder.as_markup(),
|
||||
)
|
||||
|
||||
|
||||
async def math_task_true(message: Message, bot: Bot):
|
||||
await message.reply(f"Верно! Добро пожаловать в чат {message.from_user.first_name}")
|
||||
await unmute_user(message.chat.id, message.from_user.id, bot)
|
||||
add_user(
|
||||
message.from_user.id,
|
||||
message.from_user.first_name + " " + message.from_user.last_name,
|
||||
message.from_user.username,
|
||||
)
|
||||
pass
|
6
src/ocab_modules/standard/welcome/info.json
Normal file
6
src/ocab_modules/standard/welcome/info.json
Normal file
@@ -0,0 +1,6 @@
|
||||
{
|
||||
"name": "Welcome",
|
||||
"description": "Мо",
|
||||
"author": "OCAB Team",
|
||||
"version": "1.0"
|
||||
}
|
12
src/ocab_modules/standard/welcome/routers.py
Normal file
12
src/ocab_modules/standard/welcome/routers.py
Normal file
@@ -0,0 +1,12 @@
|
||||
from aiogram import F, Router
|
||||
|
||||
from src.ocab_modules.standard.welcome.handlers import check_new_user
|
||||
|
||||
router = Router()
|
||||
|
||||
# Если в чат пришел новый пользователь
|
||||
router.message.register(check_new_user, F.new_chat_members.exists())
|
||||
# Ловин колбеки от кнопок с callback_data=f"check_math_task_true"
|
||||
router.callback_query.register(
|
||||
check_new_user, F.callback_data == "check_math_task_true"
|
||||
)
|
0
src/ocab_modules/standard/welcome/welcome.py
Normal file
0
src/ocab_modules/standard/welcome/welcome.py
Normal file
Reference in New Issue
Block a user