0
0
mirror of https://gitflic.ru/project/maks1ms/ocab.git synced 2025-11-28 10:21:55 +03:00

2 Commits

Author SHA1 Message Date
armatik
e8953b2a15 Удалены лишние директории 2023-11-25 19:44:50 +03:00
armatik
de9a954fe3 Разработка перенесена в ветку OCAB-V2 2023-11-25 19:38:34 +03:00
83 changed files with 1028 additions and 3291 deletions

View File

@@ -1 +0,0 @@
[bandit]

View File

@@ -1,6 +0,0 @@
[flake8]
per-file-ignores =
__init__.py:F401
max-line-length = 88
count = true
extend-ignore = E203,E701

11
.gitignore vendored
View File

@@ -1,11 +0,0 @@
.idea
.vscode
.env
env
.venv
venv
__pycache__
OCAB.db
src/paths.json
src/core/config.yaml
src/core/log/**/*

View File

@@ -1,30 +0,0 @@
# See https://pre-commit.com for more information
# See https://pre-commit.com/hooks.html for more hooks
repos:
- repo: https://github.com/crashappsec/pre-commit-sync
rev: 04b0e02eefa7c41bedca7456ad542e60b67c16c6
hooks:
- id: pre-commit-sync
- repo: https://github.com/pre-commit/pre-commit-hooks
rev: v3.2.0
hooks:
- id: trailing-whitespace
- id: end-of-file-fixer
- id: check-yaml
- id: check-added-large-files
- repo: https://github.com/PyCQA/isort
rev: 5.13.2 # sync:isort:poetry.lock
hooks:
- id: isort
- repo: https://github.com/psf/black
rev: 24.4.2 # sync:black:poetry.lock
hooks:
- id: black
- repo: https://github.com/PyCQA/flake8
rev: 7.1.0 # sync:flake8:poetry.lock
hooks:
- id: flake8
- repo: https://github.com/PyCQA/bandit
rev: 1.7.9 # sync:bandit:poetry.lock
hooks:
- id: bandit

View File

@@ -1,58 +1 @@
# OpenChatAiBot V2 # OpenChatAiBot
## Что такое OCAB?
OCAB - это бот для Telegram, который призван помочь во взаимодействии с чатом.
Бот поддерживает интеграцию модулей для расширения функционала.
Фактически бот является платформой для запуска созданных для него модулей.
Модули могут взаимодействовать друг с другом или быть полностью независимыми.
## Что такое модуль?
Модуль - это директория, которая содержит в себе код модуля и его конфигурацию.
### Структура модуля
*Будет дополнено после закрытия [issue #17](https://gitflic.ru/project/armatik/ocab/issue/17).*
## Стандартные модули
В стандартный состав бота входят следующие модули:
* `admin` - модуль для модерирования чата. Позволяет удалять сообщения, банить пользователей и т.д.
* `reputation` - модуль репутации пользователей. Позволяет оценивать ответы пользователей и накапливать репутацию.
* `welcome` - модуль приветствия новых пользователей. Позволяет приветствовать новых пользователей в чате, а также
проверять пользователя капчей для предотвращения спама.
* `roles` - модуль ролей. Позволяет назначать пользователям роли и ограничивать доступ к командам бота по ролям.
Является важной частью системы прав доступа и модуля `admin`.
## Дополнительные официальные модули
* `gpt` - модуль для генерации ответов на основе нейросети GPT-3.5. Позволяет боту отвечать на сообщения
пользователей, используя нейросеть. Ключевой особенностью является построение линии контекста для нейросети,
которая позволяет боту отвечать на вопросы, используя контекст предыдущих сообщений. Для этого используется
модуль база данных хранящий историю сообщений.
* `bugzilla` - модуль для интеграции с BugZilla. Позволяет получать уведомления о новых багах в BugZilla, отслеживать их
статус, формировать стандартизированные сообщения для корректного описания багов. В будущем планируется интеграция с
API BugZilla для возможности создания багов из чата.
* `alt_packages` - модуль для интеграции с AltLinux Packages. Позволяет получать уведомления о новых пакетах в репозитории
AltLinux, поиска пакетов по названию, получения истории изменений, команды для установки пакета из репозитория и
прочей информации о пакете.
* `notes` - модуль заметок. Позволяет сохранять заметки для пользователей и чатов. Заметки являются ссылками на
сообщения в чате.
Список модулей будет пополняться. Идеи для модулей можно оставлять в [issues](https://gitflic.ru/project/armatik/ocab/issue/create).
## Установка бота
### Docker
### Вручную
## Технологический стек
* Python 3.11.6 или выше - основной язык программирования.
* SQLite 3 - база данных для хранения информации о чате и пользователях.
* [Poetry](https://gitflic.ru/project/armatik/ocab/blob?file=how-to%20install%20deps.md&branch=OCAB-V2) - менеджер зависимостей.
* aiogram 3 - библиотека для работы с Telegram API.
* peewee - ORM для работы с базой данных.

View File

@@ -1,47 +0,0 @@
## Poetry
### Установка с официального сайта
```shell
curl -sSL https://install.python-poetry.org | python3 -
```
### Установка с PyPi
```shell
python3 -m pip install poetry
```
Доп информация:https://www.8host.com/blog/ustanovka-menedzhera-zavisimostej-poetry/
## Зависимости
### Добавление зависимости
```shell
poetry add NAME
```
`NAME` - название зависимости.
### Установка зависимостей
```shell
poetry install
```
### Обновление зависимостей
```shell
poetry update
```
## Виртуальное окружение
### Создание/активация
```shell
poetry shell
```
### Настройка
Хранить окружение внутри проекта
```shell
poetry config virtualenvs.in-project true
```

15
init.py
View File

@@ -1,15 +0,0 @@
from json import dumps
from pathlib import Path
pwd = Path().cwd()
dir_core = pwd / "src" / "core"
dir_modules_standard = pwd / "src" / "modules" / "standard"
dir_modules_custom = pwd / "src" / "modules" / "custom"
json = {
"core": str(dir_core),
"modules standard": str(dir_modules_standard),
"modules custom": str(dir_modules_custom),
}
with open("src/paths.json", "w", encoding="utf8") as f:
f.write(dumps(json, indent=4))

1235
poetry.lock generated

File diff suppressed because it is too large Load Diff

View File

@@ -1,41 +0,0 @@
[tool.poetry]
name = "ocab"
version = "0.1.0"
description = ""
license = "GPL-3.0-only"
authors = ["Семён Фомченков <s.fomchenkov@yandex.ru>"]
maintainers = [
"Илья Женецкий <ilya_zhenetskij@vk.com>",
"qualimock <qualimock@yandex.ru>",
"Кирилл Уницаев fiersik.kouji@yandex.ru",
]
readme = "README.md"
repository = "https://gitflic.ru/project/armatik/ocab"
[tool.poetry.dependencies]
python = "^3.11.6"
aiogram = "^3.2.0"
peewee = "^3.17.0"
pyyaml = "^6.0.1"
requests = "^2.31.0"
[tool.poetry.group.dev.dependencies]
flake8 = "^7.1.0"
black = "^24.4.2"
isort = "^5.13.2"
bandit = "^1.7.9"
pre-commit = "^3.7.1"
[tool.black]
line-length = 88
[tool.isort]
profile = "black"
line_length = 88
multi_line_output = 3
skip_gitignore = true
[build-system]
requires = ["poetry-core"]
build-backend = "poetry.core.masonry.api"

3
requirements.txt Normal file
View File

@@ -0,0 +1,3 @@
aiogram==3.1.1
openai==0.27.8
PyYAML==6.0.1

View File

@@ -1,4 +0,0 @@
#! /bin/sh
cd src
python -m unittest discover -v
cd ..

View File

@@ -0,0 +1,198 @@
import sqlite3
import os
import configparser
from openai import OpenAIError
import time
mother_path = os.path.dirname(os.path.dirname(os.getcwd()))
config = configparser.ConfigParser()
config.read(os.path.join(mother_path, 'src/config.ini'))
database = sqlite3.connect(os.path.join(mother_path, 'DataBase/OCAB_DB.db'))
cursor = database.cursor()
reply_ignore = config['Telegram']['reply_ignore'].split('| ')
reply_ignore = list(map(int, reply_ignore))
#print(reply_ignore)
min_token_for_answer = int(config['Openai']['min_token_for_answer'])
# Импорт библиотек
import openai
max_token_count = int(config['Openai']['max_token_count'])
# Создание файла лога если его нет
if not os.path.exists(os.path.join(mother_path, 'src/OpenAI/GPT35turbo/log.txt')):
with open(os.path.join(mother_path, 'src/OpenAI/GPT35turbo/log.txt'), 'w') as log_file:
log_file.write('')
def openai_response(message_formated_text):
# Запуск OpenAI
# Считаем размер полученного текста
#print(message_formated_text)
count_length = 0
if len(message_formated_text) == 0:
message_formated_text = [
{
"role": "user",
"content": "Напиши короткий ответ говорящий что контекст сообщения слишком длинный и попроси задать вопрос отдельно без ответа на другие сообщения по ключевому слову"
}
]
for message in message_formated_text:
#print(message["content"])
count_length += int(len(message["content"]))
#print(count_length)
try:
response = openai.ChatCompletion.create(
model="gpt-3.5-turbo",
messages=message_formated_text,
max_tokens=max_token_count - count_length
)
except OpenAIError as ex:
if 'on requests per min. Limit: 3 / min. Please try again' in str(ex):
response = ('Извини мой процессор перегрелся, дай мне минутку отдохнуть')
elif 'Bad gateway.' in str(ex):
response = (
'Ой, где я? Кажется кто то перерзал мой интернет кабель, подожди немного пока я его починю')
#запись ошибки в лог с указанием времени и даты
with open(os.path.join(mother_path, 'src/OpenAI/GPT35turbo/log.txt'), 'a') as log_file:
log_file.write('\n' + time.strftime("%d.%m.%Y %H:%M:%S") + ' ' + str(ex))
return response
def sort_message_from_user(message_formated_text, message_id):
# print(int(*(
# cursor.execute("SELECT message_sender FROM message_list WHERE message_id = ?", (message_id,)).fetchone())))
if int(*(
cursor.execute("SELECT message_sender FROM message_list WHERE message_id = ?",
(message_id,)).fetchone())) == 0:
message_formated_text.append({
"role": "assistant",
"content": str(*(cursor.execute("SELECT message_text FROM message_list WHERE message_id = ?",
(message_id,)).fetchone()))
})
else:
message_formated_text.append({
"role": "user",
"content": str(*(cursor.execute("SELECT message_text FROM message_list WHERE message_id = ?",
(message_id,)).fetchone()))
})
#Проверка что длина всех сообщений в кортеже не превышает max_token_count-min_token_for_answer
return message_formated_text
def openai_collecting_message(message_id, message_formated_text):
# собирает цепочку сообщений для OpenAI длинной до max_token_count
# проверяем что сообщение отвечает на другое сообщение
#print(int(*(cursor.execute("SELECT answer_id FROM message_list WHERE message_id = ?", (message_id,)).fetchone())))
#print(reply_ignore)
if int(*(cursor.execute("SELECT answer_id FROM message_list WHERE message_id = ?", (message_id,)).fetchone())) not in reply_ignore:
# Продолжаем искать ответы на сообщения
#print(int(*(cursor.execute("SELECT answer_id FROM message_list WHERE message_id = ?", (message_id,)).fetchone())))
message_formated_text = openai_collecting_message(int(*(cursor.execute("SELECT answer_id FROM message_list WHERE message_id = ?", (message_id,)).fetchone())), message_formated_text)
#Проверяем ID отправителя сообщения, если 0 то это сообщение от бота
sort_message_from_user(message_formated_text, message_id)
else:
# Проверяем ID отправителя сообщения, если 0 то это сообщение от бота
sort_message_from_user(message_formated_text, message_id)
return message_formated_text
def openai_message_processing(message_id):
#проверяем на наличие сообщения в базе данных
if cursor.execute("SELECT message_text FROM message_list WHERE message_id = ?", (message_id,)).fetchone() is None:
return None
else:
# проверяем на то что сообщение влезает в max_token_count с учётом message_formated_text
message_formated_text = [
{
"role": "system",
"content": config['Openai']['story_model']
}
]
if ((len(str(cursor.execute("SELECT message_text FROM message_list WHERE message_id")))) < (max_token_count - len(message_formated_text[0]['content']))):
message_formated_text = openai_collecting_message(message_id, message_formated_text)
count_length = 0
# Обработка невозможности ответить на сообщение
try:
for message in message_formated_text:
count_length += len(message['content'])
while count_length > max_token_count - min_token_for_answer:
message_formated_text.pop(1)
count_length = 0
for message in message_formated_text:
count_length += len(message['content'])
except IndexError:
message_formated_text = [
{
"role": "system",
"content": "Выведи сообщение об ошибке."
}
]
response = openai_response(message_formated_text)
return response
else:
return f"Сообщение слишком длинное, максимальная длина сообщения \
{max_token_count - len(message_formated_text[0]['content'])} символов, укоротите его на \
{len(str(cursor.execute('SELECT message_text FROM message_list WHERE message_id'))) - max_token_count} символов"
def openai_collecting_history_context(start_id, end_id, message_formated_text, message_id = 0):
# собираем список сообщений для OpenAI длинной до 14500 символов начиная с end_id и заканчивая start_id
if message_id == 0:
message_id = end_id
message_formated_text = openai_collecting_history_context(start_id, end_id, message_formated_text, message_id)
elif message_id > start_id:
message_formated_text = openai_collecting_history_context(start_id, end_id, message_formated_text, (message_id - 1))
try:
message_formated_text.append({
"role": "user",
"content": str(*(cursor.execute("SELECT message_text FROM message_list WHERE message_id = ?",
(message_id,)).fetchone()))
})
except Exception:
pass
return message_formated_text
def openai_message_history_processing(start_id, end_id):
message_formated_text = []
message_formated_text = openai_collecting_history_context(start_id, end_id, message_formated_text)
max_token_count_history = 15000
min_token_for_answer_history = 1500
count_length = 0
try:
for message in message_formated_text:
count_length += len(message['content'])
while count_length > max_token_count_history - min_token_for_answer_history:
message_formated_text.pop(1)
count_length = 0
for message in message_formated_text:
count_length += len(message['content'])
except IndexError:
message_formated_text = [
{
"role": "system",
"content": "Выведи сообщение об ошибке."
}
]
message_formated_text.append({
"role": "user",
"content": "Сделай пересказ всей истории сообщений без потери смысла от 3-его лица."
})
try:
response = openai.ChatCompletion.create(
model="gpt-3.5-turbo-16k",
messages=message_formated_text,
max_tokens=max_token_count_history - count_length
)
except OpenAIError as ex:
if 'on requests per min. Limit: 3 / min. Please try again' in str(ex):
response = ('Извини мой процессор перегрелся, дай мне минутку отдохнуть')
elif 'Bad gateway.' in str(ex):
response = (
'Ой, где я? Кажется кто то перерзал мой интернет кабель, подожди немного пока я его починю')
# запись ошибки в лог с указанием времени и даты
with open(os.path.join(mother_path, 'src/OpenAI/GPT35turbo/log.txt'), 'a') as log_file:
log_file.write('\n' + time.strftime("%d.%m.%Y %H:%M:%S") + ' ' + str(ex))
return response

View File

@@ -0,0 +1,8 @@
# Получение сообщений в чате, и запись их в базу данных
from aiogram import types
from src.TelegramBot.main import dp
from main import cursor, config
#импортировать функцию для обработки сообщений из OpenAI

566
src/TelegramBot/main.py Normal file
View File

@@ -0,0 +1,566 @@
# Импорт библиотек
import os
from contextlib import suppress
import openai
import configparser
import sqlite3
import asyncio
import sys
import datetime
from time import mktime
from aiogram import Bot, Dispatcher, executor, types
from aiogram.contrib.fsm_storage.memory import MemoryStorage
from aiogram.utils.exceptions import MessageCantBeDeleted, MessageToDeleteNotFound
mother_path = os.path.dirname(os.path.dirname(os.getcwd()))
sys.path.insert(1, mother_path)
# Импорт переменных из файла .ini
config = configparser.ConfigParser()
config.read(os.path.join(mother_path, 'src/config.ini'))
TOKEN = config['Telegram']['token']
OPENAI_API_KEY = (config['Openai']['api_key'])
bot_trigger_front = (config['Telegram']['bot_trigger_front']).split('|')
bot_trigger_all = (config['Telegram']['bot_trigger_all']).split('|')
# удаление лишних элементов массивов
bot_trigger_front.remove('')
bot_trigger_all.remove('')
DB_message_limit = int(config['DataBase']['message_limit'])
# Инициализация бота
bot = Bot(token=TOKEN)
dp = Dispatcher(bot, storage=MemoryStorage())
# Инициализация API OpenAI
openai.api_key = OPENAI_API_KEY
# Инициализация базы данных OCAB_DB в папке DataBase/OCAB_DB.db
# Создаём базу данных sqlite3 по пути /home/armatik/PycharmProjects/OpenChatAiBot/DataBase/OCAB_DB.db
database = sqlite3.connect(os.path.join(mother_path, 'DataBase/OCAB_DB.db'))
cursor = database.cursor()
# Создаём таблицу chat_list
cursor.execute("""CREATE TABLE IF NOT EXISTS chat_list (
chat_id INTEGER PRIMARY KEY,
chat_role INTEGER NOT NULL,
chat_stats INTEGER NOT NULL
)""")
# Создаём таблицу message_list
cursor.execute("""CREATE TABLE IF NOT EXISTS message_list (
message_id INTEGER PRIMARY KEY,
message_text TEXT NOT NULL,
message_sender INTEGER NOT NULL,
answer_id INTEGER
)""")
# Создаём таблицу user_list
cursor.execute("""CREATE TABLE IF NOT EXISTS user_list (
user_id INTEGER PRIMARY KEY,
user_name TEXT NOT NULL,
user_role INTEGER,
user_stats INTEGER
user_rep INTEGER
)""")
#запись информации о чате в базу данных
async def empty_role(id):
cursor.execute("UPDATE user_list SET user_role = ? WHERE user_id = ?", (0, id))
database.commit()
async def check(id):
#проверка что у человека есть роль
user_role = cursor.execute("SELECT user_role FROM user_list WHERE user_id = ?", (id,)).fetchone()[0]
if user_role not in [0, 1, 2]:
await empty_role(id)
async def get_role_name(rolenum):
rolenum = int(rolenum)
if rolenum == 0:
role = config['Roles']['user']
elif rolenum == 1:
role = config['Roles']['moderator']
elif rolenum == 2:
role = config['Roles']['admin']
return role
async def get_role(id):
#получение роли пользователя
user_role = cursor.execute("SELECT user_role FROM user_list WHERE user_id = ?", (id,)).fetchone()[0]
return await get_role_name(user_role)
async def check_admin(id, chat_id):
#Проверка что человек есть в списке администраторов чата
chat_admins = await bot.get_chat_administrators(chat_id)
flag = False
for admin in chat_admins:
if admin.user.id == id:
flag = True
return flag
async def check_moderator(id):
#Проверка что человек имеет роль модератора
if cursor.execute("SELECT user_role FROM user_list WHERE user_id = ?", (id,)).fetchone()[0] >= 1:
return True
else:
return False
async def check_user(id):
#Проверка что человек имеет роль пользователя
if cursor.execute("SELECT user_role FROM user_list WHERE user_id = ?", (id,)).fetchone()[0] == 0:
return True
else:
return False
async def save_message(message):
#Сохранение сообщения в базу данных
cursor.execute("INSERT INTO message_list VALUES (?, ?, ?, ?)", (message.message_id, message.text, message.from_user.id, 0))
if message.reply_to_message is not None:
cursor.execute("UPDATE message_list SET answer_id = ? WHERE message_id = ?", (message.reply_to_message.message_id, message.message_id))
#Если отправитель не зарегистрирован в базе данных, то добавляем его
if cursor.execute("SELECT user_id FROM user_list WHERE user_id = ?", (message.from_user.id,)).fetchone() is None:
cursor.execute("INSERT INTO user_list VALUES (?, ?, ?, ?)", (message.from_user.id, message.from_user.username, 0, 1))
#Добавляем статистику в чат и в данные пользователя, если пользователь не бот.
cursor.execute("UPDATE chat_list SET chat_stats = chat_stats + 1 WHERE chat_id = ?", (message.chat.id,))
cursor.execute("UPDATE user_list SET user_stats = user_stats + 1 WHERE user_id = ?", (message.from_user.id,))
database.commit()
async def time_to_seconds(time):
#Конвертация текстового указания времени по типу 3h, 5m, 10s в минуты
if time[-1] == 'd':
return int(time[:-1])*86400
elif time[-1] == 'h':
return int(time[:-1])*3600
elif time[-1] == 'm':
return int(time[:-1])*60
elif time[-1] == 's':
return int(time[:-1])
async def short_time_to_time(time):
#Конвертация времени в длинное название
if time[-1] == 'd':
return str(f"{time[0:-1]} дней")
elif time[-1] == 'h':
return str(f"{time[0:-1]} часов")
elif time[-1] == 'm':
return str(f"{time[0:-1]} минут")
elif time[-1] == 's':
return str(f"{time[0:-1]} секунд")
@dp.message_handler(commands=['mute'])
async def mute(message: types.Message):
#Проверка что отправитель является администратором чата
try:
if await check_moderator(message.from_user.id):
#Проверка отвечает ли сообщение на другое сообщение
if message.reply_to_message is not None:
time = message.text.split(' ')[1]
#получаем id отправителя сообщение на которое отвечает message
target_id = message.reply_to_message.from_user.id
target_name = cursor.execute("SELECT user_name FROM user_list WHERE user_id = ?", (target_id,)).fetchone()[0]
#Проверка, что человек пользователь
if cursor.execute("SELECT user_role FROM user_list WHERE user_id = ?", (target_id,)).fetchone()[0] == 0:
#ограничения прав пользователя по отправке сообщений на time секунд
time_sec = await time_to_seconds(message.text.split(' ')[1])
if time_sec <= 30 or time_sec >= 31536000:
await message.reply("Время мута должно быть больше 30 секунд и меньше 365 дней")
return
date_string = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
date_time = datetime.datetime.strptime(date_string, "%Y-%m-%d %H:%M:%S")
unix_time = int(mktime(date_time.timetuple()))
await bot.restrict_chat_member(message.chat.id, target_id, until_date=unix_time+time_sec, permissions=types.ChatPermissions(can_send_messages=False))
await message.reply(
f"Пользователь {target_name} замьючен на {await short_time_to_time(time)}")
else:
await message.reply(f"Пользователь [{target_name}](tg://user?id={target_id}) является {await get_role(target_id)} и не может быть замьючен",
parse_mode='Markdown')
return
else:
target_tag = message.text.split(' ')[1]
target_tag = target_tag[1:]
target_id = int(cursor.execute("SELECT user_id FROM user_list WHERE user_name = ?", (target_tag,)).fetchone()[0])
target_name = cursor.execute("SELECT user_name FROM user_list WHERE user_id = ?", (target_id,)).fetchone()[0]
#ограничения прав пользователя по отправке сообщений на time секунд
time_mute = message.text.split(' ')[2]
if cursor.execute("SELECT user_role FROM user_list WHERE user_id = ?", (target_id,)).fetchone()[0] == 0:
#ограничения прав пользователя по отправке сообщений на time секунд
time_sec = await time_to_seconds(message.text.split(' ')[2])
if time_sec <= 30 or time_sec >= 31536000:
await message.reply("Время мута должно быть больше 30 секунд и меньше 365 дней")
return
date_string = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
date_time = datetime.datetime.strptime(date_string, "%Y-%m-%d %H:%M:%S")
unix_time = int(mktime(date_time.timetuple()))
await bot.restrict_chat_member(message.chat.id, target_id, until_date=unix_time+time_sec, permissions=types.ChatPermissions(can_send_messages=False))
await message.reply(
f"Пользователь {target_name} замьючен на {await short_time_to_time(time_mute)}.")
else:
await message.reply(f"Пользователь [{target_name}](tg://user?id={target_id}) является {await get_role(target_id)} и не может быть замьючен",
parse_mode="Markdown")
return
except:
await message.reply("Ошибка данных. Возможно пользователь ещё ничего не написал.")
@dp.message_handler(commands=['unmute'])
async def unmute(message: types.Message):
chat_id= message.chat.id
if await check_moderator(message.from_user.id):
if message.reply_to_message is not None:
target_id = message.reply_to_message.from_user.id
target_name = cursor.execute("SELECT user_name FROM user_list WHERE user_id = ?", (target_id,)).fetchone()[0]
await bot.restrict_chat_member(message.chat.id, target_id, until_date=0, permissions=types.ChatPermissions(can_send_messages=True))
await message.reply(
f"Пользователь [{target_name}](tg://user?id={target_id}) размьючен",
parse_mode="Markdown")
else:
target_tag = message.text.split(' ')[1]
target_tag = target_tag[1:]
target_id = int(cursor.execute("SELECT user_id FROM user_list WHERE user_name = ?", (target_tag,)).fetchone()[0])
target_name = cursor.execute("SELECT user_name FROM user_list WHERE user_id = ?", (target_id,)).fetchone()[0]
#получаем стандартные права чата
if cursor.execute("SELECT user_role FROM user_list WHERE user_id = ?", (target_id,)).fetchone()[0] == 0:
#возвращаем пользователю стандартные права как у новых пользователей
chat_permisson = await bot.get_chat(chat_id)
await bot.restrict_chat_member(message.chat.id, target_id, until_date=0, permissions=chat_permisson.permissions)
await message.reply(
f"Пользователь [{target_name}](tg://user?id={target_id}) размьючен",
parse_mode="Markdown")
#обрабатываем вход пользователя в чат
@dp.message_handler(content_types=['new_chat_members'])
async def new_chat_members(message: types.Message):
#проверяем что пользователь не бот
if not message.new_chat_members[0].is_bot:
#Заносим пользователя в базу данных
cursor.execute("INSERT INTO user_list VALUES (?, ?, ?, ?)", (message.new_chat_members[0].id, message.new_chat_members[0].username, 0, 0))
@dp.message_handler(commands=['chatinfo'])
async def chat_info(message: types.Message):
#Выводит информацию о чате. Название, количество пользователей, количество администраторов, количество модераторов, количество сообщений.
if await check_moderator(message.from_user.id):
chat_id = message.chat.id
chat_title = message.chat.title
chat_members = await bot.get_chat_members_count(chat_id)
chat_admins = await bot.get_chat_administrators(chat_id)
chat_moderators = cursor.execute("SELECT COUNT(user_id) FROM user_list WHERE user_role = 1").fetchone()[0]
chat_messages = cursor.execute("SELECT chat_stats FROM chat_list WHERE chat_id = ?", (chat_id,)).fetchone()[0]
await message.reply(
f"<b>Название чата:</b> {chat_title}\n"
f"<b>Количество пользователей:</b> {chat_members}\n"
f"<b>Количество администраторов:</b> {len(chat_admins)}\n"
f"<b>Количество модераторов:</b> {chat_moderators}\n"
f"<b>Количество сообщений:</b> {chat_messages}",
parse_mode="HTML")
await top10(message)
else:
await message.reply(
f"У вас недостаточно прав для выполнения этой команды.")
@dp.message_handler(commands=['start'])
async def start(message: types.Message):
#проверка что чат есть в базе данных
if cursor.execute("SELECT chat_id FROM chat_list WHERE chat_id = ?", (message.chat.id,)).fetchone() is None:
chat_id = message.chat.id
chat_role = 0
chat_stats = 1
cursor.execute("INSERT INTO chat_list VALUES (?, ?, ?)", (chat_id, chat_role, chat_stats))
database.commit()
await message.reply(
f"{config['Telegram']['start_answer']}",
parse_mode="Markdown")
else:
await message.reply(
f"Чат уже инициализирован.",
parse_mode="Markdown")
# Проверка наличия столбца имени пользователя в базе данных, если его нет, то добавить.
try:
cursor.execute("SELECT user_name FROM user_list")
except sqlite3.OperationalError:
cursor.execute("ALTER TABLE user_list ADD COLUMN user_name TEXT")
database.commit()
await message.reply("База данных пользователей реструктурирована.")
# Проверка наличия столбца репутации пользователя в базе данных, если его нет, то добавить.
try:
cursor.execute("SELECT user_rep FROM user_list")
except sqlite3.OperationalError:
cursor.execute("ALTER TABLE user_list ADD COLUMN user_rep INTEGER")
database.commit()
await message.reply("База данных пользователей реструктурирована.")
@dp.message_handler(commands=['top10'])
async def top10(message: types.Message):
#топ 10 пользователей по количеству сообщений в user_stats в формате: Имя пользователя - количество сообщений
top10 = cursor.execute("SELECT user_id, user_stats FROM user_list ORDER BY user_stats DESC LIMIT 10").fetchall()
top10_message = ''
for user in top10:
username = (cursor.execute("SELECT user_name FROM user_list WHERE user_id = ?", (user[0],)).fetchone())[0]
if username is None:
username = "Аноним"
top10_message += f"{username} - {user[1]}\n"
#в начале сообщения берём текст из config.ini и вставляем в него топ 10 пользователей
await message.reply(
f"{config['Telegram']['top10_answer']}\n{top10_message}")
@dp.message_handler(commands=['aboutme'])
async def aboutme(message: types.Message):
your_id = message.from_id
await check(your_id)
your_name = message.from_user.username
user_stats = cursor.execute("SELECT user_stats FROM user_list WHERE user_id = ?", (message.from_user.id,)).fetchone()
rolenum = cursor.execute("SELECT user_role FROM user_list WHERE user_id = ?", (message.from_user.id,)).fetchone()
user_rep = cursor.execute("SELECT user_rep FROM user_list WHERE user_id = ?", (message.from_user.id,)).fetchone()
if "None" in str(user_rep):
user_rep = 0
cursor.execute("UPDATE user_list SET user_rep = ? WHERE user_id = ?", (user_rep, your_id))
role = await get_role_name(rolenum[0])
#Имя пользователя на следующей строке статистика сообщений на следующей строке права пользователя
#если пользователь есть в списке администраторов чата, то выдаём роль администратор
chat_admins = await bot.get_chat_administrators(message.chat.id)
default_for_admin = int(config['Roles']['default_for_admin'])
no_answer = False
if rolenum[0] < default_for_admin:
for admin in chat_admins:
if admin.user.id == your_id:
if default_for_admin == 0:
await message.reply(
f"<b>Вы обнаружены в списке администраторов чата!</b> \n"
f"<b>Вам будет выдана роль:</b> {config['Roles']['user']}\n"
f"<b>Имя:</b> <a href=\"tg://user?id={str(your_id)}\">{your_name}</a>\n"
f"<b>Кол-во сообщений:</b> {user_stats[0]}\n"
f"<b>Репутация:</b> {user_rep[0]}\n"
f"<b>Роль:</b> {role}",
parse_mode="HTML")
no_answer = True
elif default_for_admin == 1:
await message.reply(
f"<b>Вы обнаружены в списке администраторов чата!</b> \n"
f"<b>Вам будет выдана роль:</b> {config['Roles']['moderator']}\n"
f"<b>Имя:</b> <a href=\"tg://user?id={str(your_id)}\">{your_name}</a>\n"
f"<b>Кол-во сообщений:</b> {user_stats[0]}\n"
f"<b>Репутация:</b> {user_rep[0]}\n"
f"<b>Роль:</b> {role}",
parse_mode="HTML")
no_answer = True
elif default_for_admin == 2:
await message.reply(
f"<b>Вы обнаружены в списке администраторов чата!</b> \n"
f"<b>Вам будет выдана роль:</b> {config['Roles']['admin']}\n"
f"<b>Имя:</b> <a href=\"tg://user?id={str(your_id)}\">{your_name}</a>\n"
f"<b>Кол-во сообщений:</b> {user_stats[0]}\n"
f"<b>Репутация:</b> {user_rep[0]}\n"
f"<b>Роль:</b> {role}",
parse_mode="HTML")
no_answer = True
cursor.execute("UPDATE user_list SET user_role = ? WHERE user_id = ?", (default_for_admin, your_id))
database.commit()
if no_answer == False:
await message.reply(
f"<b>Имя:</b> <a href=\"tg://user?id={str(your_id)}\">{your_name}</a>\n"
f"<b>Кол-во сообщений:</b> {user_stats[0]}\n"
f"<b>Репутация:</b> {user_rep[0]}\n"
f"<b>Роль:</b> {role}",
parse_mode="HTML")
@dp.message_handler(commands=['setrole'])
async def setrole(message: types.Message):
try:
user_role = cursor.execute("SELECT user_role FROM user_list WHERE user_id = ?", (message.from_user.id,)).fetchone()
user_name = message.from_user.username
target_name = message.text.split()[1]
target_name = target_name[1:]
target_role = cursor.execute("SELECT user_role FROM user_list WHERE user_name = ?", (target_name,)).fetchone()[0]
user_id = cursor.execute("SELECT user_id FROM user_list WHERE user_name = ?", (target_name,)).fetchone()
user_id = user_id[0]
except:
await message.reply("Пользователь не найден!")
return
if await check_admin(user_id, message.chat.id) and target_role == 2:
await message.reply("Вы не можете изменить роль этому пользователю!")
else:
if user_role[0] == 2:
try:
#Получаем id пользователя по нику в базе данных
role = message.text.split()[2]
if role == "0" or role == config['Roles']['user']:
role = config['Roles']['user']
rolenum = 0
elif role == "1" or role == config['Roles']['moderator']:
role = config['Roles']['moderator']
rolenum = 1
elif role == "2" or role == config['Roles']['admin']:
role = config['Roles']['admin']
rolenum = 2
cursor.execute("UPDATE user_list SET user_role = ? WHERE user_id = ?", (rolenum, user_id))
database.commit()
await message.reply(
f"Пользователю [{target_name}](tg://user?id={str(user_id)}) выдана роль: {role}",
parse_mode="Markdown")
except:
await message.reply(
f"Ошибка! Проверьте правильность написания команды.",
parse_mode="Markdown")
else:
await message.reply(
f"Ошибка! У вас нет прав на использование этой команды.",
parse_mode="Markdown")
@dp.message_handler(commands=['about'])
async def about(message: types.Message):
# проверка что в сообщении есть тег пользователя
if len(message.text.split()) == 2:
try:
target_name = message.text.split()[1]
target_name = target_name[1:]
target_id = cursor.execute("SELECT user_id FROM user_list WHERE user_name = ?", (target_name,)).fetchone()
target_id = target_id[0]
user_rep = cursor.execute("SELECT user_rep FROM user_list WHERE user_id = ?", (target_id,)).fetchone()[0]
if "None" in str(user_rep):
user_rep = 0
cursor.execute("UPDATE user_list SET user_rep = ? WHERE user_id = ?", (user_rep, target_id))
await check(target_id)
except:
await message.reply(
f"Ошибка! Проверьте правильность написания тега пользователя.",
parse_mode="Markdown")
user_stats = cursor.execute("SELECT user_stats FROM user_list WHERE user_id = ?", (target_id,)).fetchone()[0]
user_role = cursor.execute("SELECT user_role FROM user_list WHERE user_id = ?", (target_id,)).fetchone()[0]
user_role = await get_role_name(user_role)
await message.reply(
f"<b>Имя:</b> {target_name}\n"
f"<b>Кол-во сообщений:</b> {user_stats}\n"
f"<b>Репутация:</b> {user_rep}\n"
f"<b>Роль:</b> {user_role}", parse_mode="HTML")
else:
await message.reply(
f"Ошибка! Проверьте правильность написания команды.")
from src.OpenAI.GPT35turbo.OA_processing import openai_message_history_processing
@dp.message_handler(commands=['history'])
async def history(message: types.Message):
if message.reply_to_message is not None:
# Получаем id сообщения на которое отвечает message
start_message_id = message.reply_to_message.message_id
# Получаем id сообщения message
end_message_id = message.message_id
elif message.text.split()[1] != '':
end_message_id = message.message_id
start_message_id = end_message_id - int(message.text.split()[1])
else:
await message.reply("Ошибка! Проверьте правильность написания команды.")
return
temp_message = await message.reply("Подождите немного, я обрабатываю историю сообщений... Функция ЭКСПЕРЕМЕНТАЛЬНАЯ "
"и может работать некорректно.")
# Получаем историю сообщений
response = openai_message_history_processing(start_message_id, end_message_id)
if response is None:
await message.reply("Я не понял тебя, попробуй перефразировать")
else:
bot_message_id = await message.reply(response['choices'][0]['message']['content'], parse_mode="markdown")
# заносим сообщение в базу данных в качестве message_id мы пишем id сообщения в bot_message_id
cursor.execute("INSERT INTO message_list VALUES (?, ?, ?, ?)",
(bot_message_id.message_id, response['choices'][0]['message']['content'], 0, message.message_id))
# очищаем сообщение
asyncio.create_task(delete_message(temp_message, 0))
from src.OpenAI.GPT35turbo.OA_processing import openai_message_processing
async def delete_message(message: types.Message, sleep_time: int = 0):
await asyncio.sleep(sleep_time)
with suppress(MessageCantBeDeleted, MessageToDeleteNotFound):
await message.delete()
@dp.message_handler()
async def in_message(message: types.Message):
chat_id = message.chat.id
# Получение сообщений в чате, и запись их в базу данных
if (message.chat.type == "private" or message.chat.type == "channel"):
await message.reply(
f"{config['Telegram']['private_answer']}",
parse_mode="Markdown")
elif (message.chat.type != "group" or message.chat.type != "supergroup") and \
message.text != '' and message.text != ' ' and \
(cursor.execute("SELECT chat_role FROM chat_list WHERE chat_id;") == 1): return None
else:
# Запись сообщения в базу данных
await save_message(message)
# Обработка сообщения OpenAI
send_answer = False
typing_mode = False
# импортируем массив триггеров из файла .ini
if message.reply_to_message and message.reply_to_message.from_user.id == (await bot.me).id:
send_answer = True
typing_mode = True
for trigger in bot_trigger_all:
if trigger.lower() in message.text.lower():
send_answer = True
typing_mode = False
for trigger in bot_trigger_front:
if message.text.lower().startswith(trigger.lower()):
send_answer = True
typing_mode = False
if send_answer == False:
# Если сообщение отвечает на другое сообщение, то проверяем наличие слова спасибо
if message.reply_to_message is not None:
if message.text.startswith("Спасибо"):
target_id = message.reply_to_message.from_user.id
target_name = cursor.execute("SELECT user_name FROM user_list WHERE user_id = ?", (target_id,)).fetchone()[0]
cursor.execute("UPDATE user_list SET user_rep = user_rep + 1 WHERE user_id = ?", (target_id,))
database.commit()
await message.reply(
f"Спасибо, [{target_name}](tg://user?id={str(target_id)}) за ваш ответ!",
parse_mode="Markdown")
if send_answer:
if typing_mode is False:
your_id = message.from_id
your_name = message.from_user.username
temp_msg = await message.reply(
f"[{your_name}](tg://user?id={str(your_id)}), Подожди немного и я обязательно отвечу тебе!",
parse_mode="Markdown")
# Пишем что бот печатает
await bot.send_chat_action(message.chat.id, "typing")
# Получаем ответ от OpenAI
response = openai_message_processing(message.message_id)
if response is None:
bot_message_id = await message.reply("Я не понял тебя, попробуй перефразировать")
if typing_mode is False:
asyncio.create_task(delete_message(temp_msg, 0))
# заносим сообщение в базу данных в качестве message_id пишем id сообщения которое отправил бот
cursor.execute("INSERT INTO message_list VALUES (?, ?, ?, ?)",
(bot_message_id, "Я не понял тебя, попробуй перефразировать", 0, message.message_id))
else:
bot_message_id = await message.reply(response['choices'][0]['message']['content'], parse_mode="markdown")
if typing_mode is False:
asyncio.create_task(delete_message(temp_msg, 0))
# заносим сообщение в базу данных в качестве message_id мы пишем id сообщения в bot_message_id
cursor.execute("INSERT INTO message_list VALUES (?, ?, ?, ?)",
(bot_message_id.message_id, response['choices'][0]['message']['content'], 0, message.message_id))
# очищаем базу данных от старых сообщений
cursor.execute("DELETE FROM message_list WHERE message_id < ?", (bot_message_id.message_id - DB_message_limit,))
database.commit()
if __name__ == '__main__':
executor.start_polling(dp, skip_updates=True)

View File

@@ -1,2 +0,0 @@
import src.core
import src.service

43
src/config.ini Normal file
View File

@@ -0,0 +1,43 @@
[Telegram]
token=****
admin_password="Test_pass"
# Пока не используется
# Массивы заполнять через запятую. Пример: test|test2 |test3,
bot_trigger_front=
# Живой пример: Арма |Армат |Арма, |
bot_trigger_all=
# Живой пример: @arma_ai_bot |помогите |
private_answer=
# Живой пример: Я не понимаю тебя, но я могу поговорить с тобой в группе [название группы](https://t.me/) и ещё в некоторых других группах
reply_ignore=0
# По умолчанию: 0
# Содержит в себе все id топиков чата для чатов с форумным типом, если не заполнить контекст бота СЛОМАЕТСЯ!
# Пример заполнения для одного из чатов: 0| 643885| 476959| 1| 476977| 633077| 630664| 476966| 634567
start_answer= Привет! Я OCAB, открытый чат бот с ИИ для вашего чата!
top10_answer= Вот топ 10 самых разговорчивых пользователей чата:
[Roles]
user= Пользователь
moderator= Модератор
admin= Администратор
default_for_admin= 2
[Openai]
api_key=****
chat_model=gpt-3.5-turbo
story_model=
# Тут должен быть текст истории бота, но я его не показываю)))
max_token_count=4000
# максимальное количество токенов в сообщении
min_token_for_answer=800
# минимальное количество токенов в сообщении ответа бота (чем больше, тем более длинный ответ ГАРАНТИРОВАН)
[DataBase]
message_limit=3000
# Максимальное количество сообщений в базе данных
[AI_Dungeon]
use=openai
# "openai" or "YaGPT" Пока не используется

23
src/config.yaml Normal file
View File

@@ -0,0 +1,23 @@
TELEGRAM:
TOKEN: 1234567890ABCDEFGHIJKLMNOPQRSTUVWXYZ1234567890
PRIVATE_ANSWER:
BOT:
ANSWER:
HELP_ANSWER:
START_ANSWER:
ABOUT_ANSWER:
ROLES:
ADMIN:
MODERATOR:
USER:
DEFAULT_FOR_ADMIN:
DATABASE:
MESSAGE_LIMIT:
GPT_MODULE:
TRIGGER_FRONT:
TRIGGER_ALL:
REPLY_IGNORE:

1
src/core.py Normal file
View File

@@ -0,0 +1 @@

View File

@@ -1,21 +0,0 @@
TELEGRAM:
TOKEN: xxxxxxxxxxxxxxxxxxxxxxxxx
APPROVED_CHAT_ID: "-123456789 | -012345678"
ADMINCHATID: -12345678
DEFAULT_CHAT_TAG: "@alt_gnome_chat"
CHECK_BOT: True
YANDEXGPT:
TOKEN: xxxxxxxxxxxxxxxxxxxxxxxxx
TOKEN_FOR_REQUEST: 8000
TOKEN_FOR_ANSWER: 2000
CATALOGID: xxxxxxxxxxxxxxxxxxxxxxxxx
PROMPT: "Ты чат-бот ..."
STARTWORD: "Бот| Бот, | бот | бот,"
INWORD: "помогите | не работает"
ROLES:
ADMIN: 2
MODERATOR: 1
USER: 0
BOT: 3

View File

@@ -1,29 +0,0 @@
import logging
import os
import time
def setup_logger():
"""
Настройка логирования
"""
current_date = time.strftime("%d-%m-%Y")
log_dir = os.path.join(os.path.dirname(__file__), "log")
os.makedirs(log_dir, exist_ok=True)
log_file = os.path.join(log_dir, f"log-{current_date}.log")
logging.basicConfig(
filename=log_file,
level=logging.INFO,
format="%(asctime)s %(message)s",
datefmt="%H:%M:%S",
)
async def log(message):
"""
Функция для логирования сообщений
Она асинхронная, хотя таковой на самом деле не является.
"""
logging.info(message)

View File

@@ -1,35 +0,0 @@
import asyncio
from aiogram import Bot, Dispatcher
from routers import include_routers
from src.core.logger import log, setup_logger
from src.modules.standard.config.config import get_telegram_token
from src.modules.standard.database.db_api import connect_database, create_tables
async def main():
bot = None
database = None
setup_logger()
try:
bot = Bot(token=get_telegram_token())
database, path = connect_database()
database.connect()
create_tables(database)
dp = Dispatcher()
await include_routers(dp)
await dp.start_polling(bot)
except Exception as e:
log(e)
finally:
if bot is not None:
await bot.session.close()
if database is not None:
database.close()
if __name__ == "__main__":
asyncio.run(main())

View File

@@ -1,17 +0,0 @@
from aiogram import Dispatcher
from src.modules.standard.admin.routers import router as admin_router
from src.modules.standard.info.routers import router as info_router
from src.modules.standard.message_processing.message_api import (
router as process_message,
)
async def include_routers(dp: Dispatcher):
"""
Подключение роутеров в бота
dp.include_router()
"""
dp.include_router(info_router)
dp.include_router(admin_router)
dp.include_router(process_message)

21
src/modules/Config.py Normal file
View File

@@ -0,0 +1,21 @@
import os
from yaml import load, Loader
class Config:
def __init__(self):
self.bot_dir = os.path.dirname(os.path.dirname(os.path.dirname(os.getcwd())))
self.config_dir = os.path.join(self.bot_dir, 'config.yaml')
def get_config(self):
return load(open(self.config_dir), Loader=Loader)
def get_bot_token(self):
return self.get_config()['BOT']['TOKEN']
def get_roles(self):
return self.get_config()['ROLES']
def get_bot_answers(self):
return self.get_config()['BOT']['ANSWER']

131
src/modules/DataBase.py Normal file
View File

@@ -0,0 +1,131 @@
import os
import sqlite3
class DataBase:
def __init__(self):
self.bot_dir = os.path.dirname(os.path.dirname(os.path.dirname(os.getcwd())))
self.db = sqlite3.connect(os.path.join(self.bot_dir, 'DataBase/OCAB_DB.db'))
self.cursor = self.db.cursor()
# Проверки наличия таблиц в базе данных
def check_db(self):
self.check_chatdb()
self.check_userdb()
self.check_messagedb()
def check_chatdb(self):
self.cursor.execute("""SELECT name FROM sqlite_master WHERE type='table' AND name='chat_list'""")
if self.cursor.fetchone() is None:
self.create_chatdb()
def check_userdb(self):
self.cursor.execute("""SELECT name FROM sqlite_master WHERE type='table' AND name='user_list'""")
if self.cursor.fetchone() is None:
self.create_userdb()
def check_messagedb(self):
self.cursor.execute("""SELECT name FROM sqlite_master WHERE type='table' AND name='message_list'""")
if self.cursor.fetchone() is None:
self.create_messagedb()
# Создание таблиц в базе данных
def create_chatdb(self):
self.cursor.execute("""CREATE TABLE IF NOT EXISTS chat_list (
chat_id INTEGER PRIMARY KEY,
chat_role INTEGER NOT NULL,
chat_stats INTEGER NOT NULL,
chat_federation INTEGER,
)""")
self.db.commit()
def create_userdb(self):
self.cursor.execute("""CREATE TABLE IF NOT EXISTS user_list (
user_id INTEGER PRIMARY KEY,
user_name TEXT NOT NULL,
user_tag TEXT,
user_role INTEGER,
user_stats INTEGER
user_rep INTEGER
)""")
self.db.commit()
def create_messagedb(self):
self.cursor.execute("""CREATE TABLE IF NOT EXISTS message_list (
message_id INTEGER PRIMARY KEY,
message_text TEXT NOT NULL,
message_id_sender INTEGER NOT NULL,
answer_to_id INTEGER
)""")
self.db.commit()
async def add_chat(self, chat_id, chat_role, chat_stats=0, chat_federation=0):
self.cursor.execute("""INSERT INTO chat_list VALUES (?, ?, ?, ?)""",
(chat_id, chat_role, chat_stats, chat_federation))
async def add_user(self, user_id, user_name, user_tag=None, user_role=0, user_stats=0, user_rep=0):
self.cursor.execute("""INSERT INTO user_list VALUES (?, ?, ?, ?, ?, ?)""",
(user_id, user_name, user_tag, user_role, user_stats, user_rep))
self.db.commit()
async def add_message(self, message_id, message_text, message_sender, answer_id):
self.cursor.execute("""INSERT INTO message_list VALUES (?, ?, ?, ?)""",
(message_id, message_text, message_sender, answer_id))
self.db.commit()
async def get_chat(self, chat_id):
self.cursor.execute("""SELECT * FROM chat_list WHERE chat_id=?""", (chat_id,))
return self.cursor.fetchone()
async def get_user(self, user_id):
self.cursor.execute("""SELECT * FROM user_list WHERE user_id=?""", (user_id,))
return self.cursor.fetchone()
async def get_user_role(self, user_id):
self.cursor.execute("""SELECT user_role FROM user_list WHERE user_id=?""", (user_id,))
return self.cursor.fetchone()
async def get_message(self, message_id):
self.cursor.execute("""SELECT * FROM message_list WHERE message_id=?""", (message_id,))
return self.cursor.fetchone()
async def change_user_name(self, user_id, new_user_name):
self.cursor.execute("""UPDATE user_list SET user_name=? WHERE user_id=?""", (new_user_name, user_id))
self.db.commit()
async def change_user_role(self, user_id, new_user_role):
self.cursor.execute("""UPDATE user_list SET user_role=? WHERE user_id=?""", (new_user_role, user_id))
self.db.commit()
async def change_user_stats(self, user_id, new_user_stats):
self.cursor.execute("""UPDATE user_list SET user_stats=? WHERE user_id=?""", (new_user_stats, user_id))
self.db.commit()
async def change_user_rep(self, user_id, new_user_rep):
self.cursor.execute("""UPDATE user_list SET user_rep=? WHERE user_id=?""", (new_user_rep, user_id))
self.db.commit()
async def change_chat_role(self, chat_id, new_chat_role):
self.cursor.execute("""UPDATE chat_list SET chat_role=? WHERE chat_id=?""", (new_chat_role, chat_id))
self.db.commit()
async def change_chat_stats(self, chat_id, new_chat_stats):
self.cursor.execute("""UPDATE chat_list SET chat_stats=? WHERE chat_id=?""", (new_chat_stats, chat_id))
self.db.commit()
async def change_chat_federation(self, chat_id, new_chat_federation):
self.cursor.execute("""UPDATE chat_list SET chat_federation=? WHERE chat_id=?""",
(new_chat_federation, chat_id))
self.db.commit()
async def update_user_stats(self):
self.cursor.execute("""UPDATE user_list SET user_stats=user_stats+1""")
self.db.commit()
async def update_user_rep(self):
self.cursor.execute("""UPDATE user_list SET user_rep=user_rep+1""")
self.db.commit()
async def update_chat_stats(self):
self.cursor.execute("""UPDATE chat_list SET chat_stats=chat_stats+1""")
self.db.commit()

33
src/modules/Roles.py Normal file
View File

@@ -0,0 +1,33 @@
from Config import Config
from DataBase import DataBase
class Roles:
def __init__(self):
self.DB = DataBase()
self.Config = Config()
self.user_role_name = self.Config.get_roles()['USER']
self.moderator_role_name = self.Config.get_roles()['MODERATOR']
self.admin_role_name = self.Config.get_roles()['ADMIN']
async def check_admin_permission(self, user_id):
if await self.DB.get_user_role(user_id) == 2:
return True
else:
return False
async def check_moderator_permission(self, user_id):
if await self.DB.get_user_role(user_id) == 1:
return True
else:
return False
async def get_role_name(self, role_number):
if role_number == 0:
return self.user_role_name
elif role_number == 1:
return self.moderator_role_name
elif role_number == 2:
return self.admin_role_name
else:
return None

View File

@@ -1 +0,0 @@
from . import yandexgpt

View File

@@ -1,28 +0,0 @@
# flake8: noqa
import asyncio
from aiogram import Bot
from aiogram.types import Message
from src.core.logger import log
from src.modules.external.yandexgpt.yandexgpt import *
from src.modules.standard.config.config import (
get_yandexgpt_catalog_id,
get_yandexgpt_prompt,
get_yandexgpt_token,
)
from src.modules.standard.database.db_api import add_message
async def answer_to_message(message: Message, bot: Bot):
# print("answer_to_message")
await log("answer_to_message")
yagpt = YandexGPT(get_yandexgpt_token(), get_yandexgpt_catalog_id())
text = message.text
prompt = get_yandexgpt_prompt()
# response = await yagpt.async_yandexgpt(system_prompt=prompt, input_messages=text)
response = await yagpt.yandexgpt_request(
chat_id=message.chat.id, message_id=message.message_id, type="yandexgpt"
)
reply = await message.reply(response, parse_mode="Markdown")
add_message(reply, message_ai_model="yandexgpt")

View File

@@ -1,6 +0,0 @@
{
"name": "YandexGPT",
"description": "Модуль для работы с Yandex GPT",
"author": "OCAB Team",
"version": "1.0"
}

View File

@@ -1,10 +0,0 @@
# flake8: noqa
from aiogram import F, Router
from src.modules.external.yandexgpt.handlers import answer_to_message
router = Router()
# Если сообщение содержит в начале текст "Гномик" или "гномик" или отвечает на сообщение бота, то вызывается функция answer_to_message
router.message.register(
answer_to_message, F.text.startswith("Гномик") | F.text.startswith("гномик")
)

View File

@@ -1,292 +0,0 @@
# flake8: noqa
import asyncio
import json
import aiohttp
import requests
from src.core.logger import log
from ...standard.config.config import *
from ...standard.database import *
class YandexGPT:
token = None
catalog_id = None
languages = {
"ru": "русский язык",
"en": "английский язык",
"de": "немецкий язык",
"uk": "украинский язык",
"es": "испанский язык",
"be": "белорусский язык",
}
def __init__(self, token, catalog_id):
self.token = token
self.catalog_id = catalog_id
async def async_request(self, url, headers, prompt) -> dict:
async with aiohttp.ClientSession() as session:
async with session.post(url, headers=headers, json=prompt) as response:
return await response.json()
async def async_token_check(
self, messages, gpt, max_tokens, stream, temperature, del_msg_id=1
):
url = "https://llm.api.cloud.yandex.net/foundationModels/v1/tokenizeCompletion"
headers = {
"Content-Type": "application/json",
"Authorization": f"Api-Key {self.token}",
}
answer_token = get_yandexgpt_token_for_answer()
while True:
try:
request = {
"modelUri": gpt,
"completionOptions": {
"stream": stream,
"temperature": temperature,
"maxTokens": max_tokens,
},
"messages": messages,
}
response = await self.async_request(
url=url, headers=headers, prompt=request
)
except Exception as e: # TODO: Переделать обработку ошибок
# print(e)
await log(f"Error: {e}")
continue
if int(len(response["tokens"])) < (max_tokens - answer_token):
break
else:
try:
messages.pop(del_msg_id)
except IndexError:
Exception("IndexError: list index out of range")
return messages
async def async_yandexgpt_lite(
self,
system_prompt,
input_messages,
stream=False,
temperature=0.6,
max_tokens=8000,
):
url = "https://llm.api.cloud.yandex.net/foundationModels/v1/completion"
gpt = f"gpt://{self.catalog_id}/yandexgpt-lite/latest"
headers = {
"Content-Type": "application/json",
"Authorization": f"Api-Key {self.token}",
}
messages = [{"role": "system", "text": system_prompt}]
for message in input_messages:
messages.append(message)
messages = await self.async_token_check(messages, gpt, max_tokens)
prompt = {
"modelUri": gpt,
"completionOptions": {
"stream": stream,
"temperature": temperature,
"maxTokens": max_tokens,
},
"messages": messages,
}
response = requests.post(url, headers=headers, json=prompt).text # nosec
return json.loads(response)["result"]["alternatives"][0]["message"]["text"]
async def async_yandexgpt(
self,
system_prompt,
input_messages,
stream=False,
temperature=0.6,
max_tokens=get_yandexgpt_token_for_request(),
):
url = "https://llm.api.cloud.yandex.net/foundationModels/v1/completion"
gpt = f"gpt://{self.catalog_id}/yandexgpt/latest"
headers = {
"Content-Type": "application/json",
"Authorization": f"Api-Key {self.token}",
}
messages = []
messages.append({"role": "system", "text": system_prompt})
for message in input_messages:
messages.append(message)
messages = await self.async_token_check(
messages, gpt, max_tokens, stream, temperature
)
request = {
"modelUri": gpt,
"completionOptions": {
"stream": stream,
"temperature": temperature,
"maxTokens": max_tokens,
},
"messages": messages,
}
response = await self.async_request(
url=url, headers=headers, prompt=request
) # nosec
return response["result"]["alternatives"][0]["message"]["text"]
async def async_yandexgpt_translate(self, input_language, output_language, text):
input_language = self.languages[input_language]
output_language = self.languages[output_language]
return await self.async_yandexgpt(
f"Переведи на {output_language} сохранив оригинальный смысл текста. Верни только результат:",
[{"role": "user", "text": text}],
stream=False,
temperature=0.6,
max_tokens=8000,
)
async def async_yandexgpt_spelling_check(self, input_language, text):
input_language = self.languages[input_language]
return await self.async_yandexgpt(
f"Проверьте орфографию и пунктуацию текста на {input_language}. Верни исправленный текст "
f"без смысловых искажений:",
[{"role": "user", "text": text}],
stream=False,
temperature=0.6,
max_tokens=8000,
)
async def async_yandexgpt_text_history(
self, input_messages, stream=False, temperature=0.6, max_tokens=8000
):
url = "https://llm.api.cloud.yandex.net/foundationModels/v1/completion"
gpt = f"gpt://{self.catalog_id}/summarization/latest"
headers = {
"Content-Type": "application/json",
"Authorization": f"Api-Key {self.token}",
}
messages = []
for message in input_messages:
messages.append(message)
messages = await self.async_token_check(messages, gpt, max_tokens, del_msg_id=0)
prompt = {
"modelUri": gpt,
"completionOptions": {
"stream": stream,
"temperature": temperature,
"maxTokens": max_tokens,
},
"messages": messages,
}
response = requests.post(url, headers=headers, json=prompt).text # nosec
return json.loads(response)["result"]["alternatives"][0]["message"]["text"]
async def async_yandex_cloud_text_to_speech(
self, text, voice, emotion, speed, format, quality
):
tts = "tts.api.cloud.yandex.net/speech/v1/tts:synthesize"
# TODO: Сделать функцию TTS
return 0
async def async_yandex_cloud_vision(self, image, features, language):
# TODO: Сделать функцию Vision
return 0
async def collect_messages(self, message_id, chat_id):
messages = []
# Собираем цепочку сообщений в формате: [{"role": "user", "text": "<Имя_пользователя>: Привет!"},
# {"role": "assistant", "text": "Привет!"}]
while True:
message = db_api.get_message_text(chat_id, message_id)
if db_api.get_message_ai_model(chat_id, message_id) != None:
messages.append({"role": "assistant", "text": message})
else:
sender_name = db_api.get_user_name(
db_api.get_message_sender_id(chat_id, message_id)
)
messages.append({"role": "user", "text": sender_name + ": " + message})
message_id = db_api.get_answer_to_message_id(chat_id, message_id)
if message_id is None:
break
return list(reversed(messages))
async def collecting_messages_for_history(
self, start_message_id, end_message_id, chat_id
):
messages = []
# Собираем цепочку сообщений в формате: [{"role": "user", "text": "<Имя_пользователя>: Привет!"},
# {"role": "assistant", "text": "Привет!"}]
while True:
message = db_api.get_message_text(chat_id, start_message_id)
if db_api.get_message_ai_model(chat_id, start_message_id) != None:
messages.append({"role": "assistant", "text": message})
else:
sender_name = db_api.get_user_name(
db_api.get_message_sender_id(chat_id, start_message_id)
)
messages.append({"role": "user", "text": sender_name + ": " + message})
start_message_id -= 1
if start_message_id <= end_message_id:
break
return messages.reverse()
async def yandexgpt_request(
self,
message_id=None,
type="yandexgpt-lite",
chat_id=None,
message_id_end=None,
input_language=None,
output_language=None,
text=None,
):
if type == "yandexgpt-lite":
messages = await self.collect_messages(message_id, chat_id)
return await self.async_yandexgpt_lite(
system_prompt=get_yandexgpt_prompt(),
input_messages=messages,
stream=False,
temperature=0.6,
max_tokens=8000,
)
elif type == "yandexgpt":
# print("yandexgpt_request")
await log("yandexgpt_request")
messages = await self.collect_messages(message_id, chat_id)
return await self.async_yandexgpt(
system_prompt=get_yandexgpt_prompt(),
input_messages=messages,
stream=False,
temperature=0.6,
max_tokens=get_yandexgpt_token_for_request(),
)
elif type == "yandexgpt-translate":
return await self.async_yandexgpt_translate(
input_language,
output_language,
text=db_api.get_message_text(chat_id, message_id),
)
elif type == "yandexgpt-spelling-check":
return await self.async_yandexgpt_spelling_check(
input_language, text=db_api.get_message_text(chat_id, message_id)
)
elif type == "yandexgpt-text-history":
messages = await self.collect_messages_for_history(
message_id, message_id_end, chat_id
)
return await self.async_yandexgpt_text_history(
messages=messages, stream=False, temperature=0.6, max_tokens=8000
)
else:
return "Ошибка: Неизвестный тип запроса | Error: Unknown request type"

View File

@@ -1 +0,0 @@
from . import config, database, exceptions, roles

View File

@@ -1 +0,0 @@
from . import routers

View File

@@ -1,56 +0,0 @@
# flake8: noqa
import time
from aiogram import Bot
from aiogram.types import Message
from src.modules.standard.config.config import get_default_chat_tag
async def delete_message(message: Message, bot: Bot):
reply_message_id = message.reply_to_message.message_id
await bot.delete_message(message.chat.id, reply_message_id)
async def error_access(message: Message, bot: Bot):
await message.reply("Вы не админ/модератор")
async def get_chat_id(message: Message, bot: Bot):
await message.reply(
f"ID данного чата: `{message.chat.id}`", parse_mode="MarkdownV2"
)
async def chat_not_in_approve_list(message: Message, bot: Bot):
await message.reply(
f"Бот недоступен в данном чате, пожалуйста,"
f" обратитесь к администратору для добавления чата в список доступных или перейдите в чат "
f"{get_default_chat_tag()}"
)
await get_chat_id(message, bot)
async def mute_user(chat_id: int, user_id: int, time: int, bot: Bot):
# *, can_send_messages: bool | None = None, can_send_audios: bool | None = None, can_send_documents: bool | None = None, can_send_photos: bool | None = None, can_send_videos: bool | None = None, can_send_video_notes: bool | None = None, can_send_voice_notes: bool | None = None, can_send_polls: bool | None = None, can_send_other_messages: bool | None = None, can_add_web_page_previews: bool | None = None, can_change_info: bool | None = None, can_invite_users: bool | None = None, can_pin_messages: bool | None = None, can_manage_topics: bool | None = None, **extra_data: Any)
mutePermissions = {
"can_send_messages": False,
"can_send_audios": False,
"can_send_documents": False,
"can_send_photos": False,
"can_send_videos": False,
"can_send_video_notes": False,
"can_send_voice_notes": False,
"can_send_polls": False,
"can_send_other_messages": False,
"can_add_web_page_previews": False,
"can_change_info": False,
"can_invite_users": False,
"can_pin_messages": False,
"can_manage_topics": False,
}
end_time = time + int(time.time())
await bot.restrict_chat_member(
chat_id, user_id, until_date=end_time, **mutePermissions
)

View File

@@ -1,6 +0,0 @@
{
"name": "Admin",
"description": "Модуль для работы с админкой",
"author": "OCAB Team",
"version": "1.0"
}

View File

@@ -1,24 +0,0 @@
# flake8: noqa
from aiogram import F, Router
from src.modules.standard.admin.handlers import (
chat_not_in_approve_list,
delete_message,
error_access,
get_chat_id,
)
from src.modules.standard.filters.filters import (
ChatModerOrAdminFilter,
ChatNotInApproveFilter,
)
router = Router()
# Если сообщение содержит какой либо текст и выполняется фильтр ChatNotInApproveFilter, то вызывается функция chat_not_in_approve_list
router.message.register(chat_not_in_approve_list, ChatNotInApproveFilter(), F.text)
router.message.register(get_chat_id, ChatModerOrAdminFilter(), F.text == "/chatID")
router.message.register(delete_message, ChatModerOrAdminFilter(), F.text == "/rm")
router.message.register(error_access, F.text == "/rm")
router.message.register(error_access, F.text == "/chatID")

View File

@@ -1 +0,0 @@
from . import config

View File

@@ -1,75 +0,0 @@
# flake8: noqa
import yaml
from ....service import paths
def get_config(is_test: bool = False) -> dict:
if is_test:
path = f"{paths.modules_standard}/config/tests"
else:
path = paths.core
path = f"{path}/config.yaml"
with open(path, "r") as file:
return yaml.full_load(file)
config = get_config()
def get_telegram_token() -> str:
return config["TELEGRAM"]["TOKEN"]
def get_telegram_check_bot() -> bool:
return config["TELEGRAM"]["CHECK_BOT"]
def get_aproved_chat_id() -> list:
# Возваращем сплитованный список id чатов в формате int
return [
int(chat_id) for chat_id in config["TELEGRAM"]["APPROVED_CHAT_ID"].split(" | ")
]
def get_user_role_name(role_number) -> dict:
# Возвращаем название роли пользвателя по номеру роли, если такой роли нет, возвращаем неизвестно
return config["ROLES"].get(role_number, "Неизвестно")
def get_default_chat_tag() -> str:
return config["TELEGRAM"]["DEFAULT_CHAT_TAG"]
def get_yandexgpt_token() -> str:
return config["YANDEXGPT"]["TOKEN"]
def get_yandexgpt_catalog_id() -> str:
return config["YANDEXGPT"]["CATALOGID"]
def get_yandexgpt_prompt() -> str:
return config["YANDEXGPT"]["PROMPT"]
def get_yandexgpt_start_words() -> list:
return config["YANDEXGPT"]["STARTWORD"].split(" | ")
def get_yandexgpt_in_words() -> list:
return config["YANDEXGPT"]["INWORD"].split(" | ")
def get_yandexgpt_token_for_request() -> int:
return config["YANDEXGPT"]["TOKEN_FOR_REQUEST"]
def get_yandexgpt_token_for_answer() -> int:
return config["YANDEXGPT"]["TOKEN_FOR_ANSWER"]
def get_access_rights() -> dict:
return get_config()["ACCESS_RIGHTS"]

View File

@@ -1,6 +0,0 @@
{
"name": "Config YAML",
"description": "Модуль для работы с конфигурационным файлом бота (YAML)",
"author": "OCAB Team",
"version": "1.0"
}

View File

@@ -1,7 +0,0 @@
TELEGRAM:
TOKEN: xxxxxxxxxxxxxxxxxxxx
ROLES:
ADMIN: 0
MODERATOR: 1
USER: 2
BOT: 3

View File

@@ -1,45 +0,0 @@
import unittest
from src.modules.standard.config.config import get_config
yaml_load = get_config(is_test=True)
class TestConfig(unittest.TestCase):
def test_yaml_load_correctness(self):
self.assertIsNotNone(yaml_load)
self.assertIn("TELEGRAM", yaml_load)
self.assertIn("TOKEN", yaml_load["TELEGRAM"])
self.assertIn("ROLES", yaml_load)
self.assertIn("ADMIN", yaml_load["ROLES"])
self.assertIn("MODERATOR", yaml_load["ROLES"])
self.assertIn("USER", yaml_load["ROLES"])
self.assertIn("BOT", yaml_load["ROLES"])
def test_yaml_keys_existence(self):
self.assertTrue(all(key in yaml_load for key in ["TELEGRAM", "ROLES"]))
self.assertIn("TOKEN", yaml_load["TELEGRAM"])
self.assertTrue(
all(role in yaml_load["ROLES"] for role in ["ADMIN", "MODERATOR", "USER"])
)
def test_yaml_yaml_load_types(self):
self.assertIsInstance(yaml_load["TELEGRAM"]["TOKEN"], str)
self.assertTrue(
all(
isinstance(yaml_load["ROLES"][role], int)
for role in ["ADMIN", "MODERATOR", "USER"]
)
)
def test_yaml_values(self):
expected_token = "xxxxxxxxxxxxxxxxxxxx" # nosec
expected_role_values = {"ADMIN": 0, "MODERATOR": 1, "USER": 2, "BOT": 3}
self.assertEqual(yaml_load["TELEGRAM"]["TOKEN"], expected_token)
for role, value in expected_role_values.items():
self.assertEqual(yaml_load["ROLES"][role], value)
if __name__ == "__main__":
unittest.main()

View File

@@ -1,48 +0,0 @@
## Модуль DataBase
Модуль DataBase предназначен для ведения и работы с базами данных OCAB.
Модуль содержит в себе следующие таблицы:
* `Chats` - таблица для хранения информации о чатах.
* `Users` - таблица для хранения информации о пользователях.
* `Messages` - таблица для хранения информации о сообщениях.
* `ChatStats` - таблица для хранения статистики чатов по дням.
* `UserStats` - таблица для хранения статистики пользователей по дням.
руктура таблицы `Chats`:
* `chat_id` - идентификатор чата.
* `chat_name` - название чата.
* `chat_type` - тип чата. (0 - Чат администраторов, 1 - Пользовательский чат, 3 - Чат разрешённых личных запросов к боту
10 - Не инициализированный чат)
* `chat_stats` - количество всех отправленных сообщений в чате.
руктура таблицы `Users`:
* `user_id` - идентификатор пользователя telegram.
* `user_tag` - тег пользователя telegram.
* `user_name` - имя пользователя telegram.
* `user_role` - роль пользователя в чате. (0 - Администратор, 1 - Модератор, 2 - Пользователь)
* `user_stats` - количество всех отправленных сообщений пользователем.
* `user_rep` - репутация пользователя.
руктура таблицы `Messages`:
* `message_chat_id` - идентификатор чата в котором отправлено сообщение.
* `message_id` - идентификатор сообщения.
* `messag_sender_id` - идентификатор пользователя отправившего сообщение. Если сообщение отправил бот, то
`messag_sender_id` = 0.
* `answer_to_message_id` - идентификатор сообщения на которое дан ответ. Если ответа нет или ответ на служебное
сообщение о создании топика в чатах с форумным типом, то `answer_to_message_id` = 0.
* `message_ai_model` - идентификатор модели нейросети, которая использовалась для генерации ответа. Если ответ'
сгенерирован не был, то `message_ai_model` = null.
* `message_text` - текст сообщения.
руктура таблицы `ChatStats`:
* `chat_id` - идентификатор чата для которого собрана статистика.
* `date` - дата на которую собрана статистика.
* `messages_count` - количество сообщений отправленных в чат за день.
руктура таблицы `UserStats`:
* `chat_id` - идентификатор чата для которого собрана статистика.
* `user_id` - идентификатор пользователя для которого собрана статистика.
* `date` - дата на которую собрана статистика.
* `messages_count` - количество сообщений отправленных пользователем в чат за день.

View File

@@ -1 +0,0 @@
from . import db_api, models

View File

@@ -1,306 +0,0 @@
import peewee as pw
from aiogram.types import Message
from ....service import paths
from ..exceptions.module_exceptions import MissingModuleName, NotExpectedModuleName
from .models.chat_stats import ChatStats
from .models.chats import Chats
from .models.messages import Messages
from .models.user_stats import UserStats
from .models.users import Users
def connect_database(is_test: bool = False, module: str | None = None):
if is_test:
if not module:
raise MissingModuleName()
db_path = f"{paths.modules_standard}/{module}/tests/database"
else:
if module:
raise NotExpectedModuleName()
db_path = f"{paths.core}/database"
_database = pw.SqliteDatabase(f"{db_path}/OCAB.db")
Chats._meta.database = _database
Messages._meta.database = _database
Users._meta.database = _database
UserStats._meta.database = _database
ChatStats._meta.database = _database
return _database, f"{db_path}/OCAB.db"
def create_tables(db: pw.SqliteDatabase):
"""Создание таблиц"""
for table in Chats, Messages, Users, UserStats, ChatStats:
if not table.table_exists():
db.create_tables([table])
def add_chat(chat_id, chat_name, chat_type=10, chat_stats=0):
chat, created = Chats.get_or_create(
id=chat_id,
defaults={
"chat_name": chat_name,
"chat_type": chat_type,
"chat_all_stat": chat_stats,
},
)
if not created:
# Обновить существующий чат, если он уже существует
chat.chat_name = chat_name
chat.chat_type = chat_type
chat.chat_stats = chat_stats
chat.save()
def add_user(
user_id,
user_first_name,
user_last_name=None,
user_tag=None,
user_role=0,
user_stats=0,
user_rep=0,
):
if user_last_name is None:
user_name = user_first_name
else:
user_name = user_first_name + " " + user_last_name
user, created = Users.get_or_create(
id=user_id,
defaults={
"user_tag": user_tag,
"user_name": user_name,
"user_role": user_role,
"user_stats": user_stats,
"user_rep": user_rep,
},
)
if not created:
# Обновить существующего пользователя, если он уже существует
user.user_tag = user_tag
user.user_name = user_name
user.user_role = user_role
user.user_stats = user_stats
user.user_rep = user_rep
user.save()
def add_message(message: Message, message_ai_model=None):
if message.reply_to_message:
answer_to_message_id = message.reply_to_message.message_id
else:
answer_to_message_id = None
Messages.create(
message_chat_id=message.chat.id,
message_id=message.message_id,
message_sender_id=message.from_user.id,
answer_to_message_id=answer_to_message_id,
message_ai_model=message_ai_model,
message_text=message.text,
)
def add_chat_stats(chat_id, date, messages_count):
ChatStats.create(chat_id=chat_id, date=date, messages_count=messages_count)
def add_user_stats(chat_id, user_id, date, messages_count):
UserStats.create(
chat_id=chat_id, user_id=user_id, date=date, messages_count=messages_count
)
# Работа с таблицей чатов
def get_chat(chat_id):
return Chats.get_or_none(Chats.id == chat_id)
def change_chat_name(chat_id, new_chat_name):
query = Chats.update(chat_name=new_chat_name).where(Chats.id == chat_id)
query.execute()
def change_chat_type(chat_id, new_chat_type):
query = Chats.update(chat_type=new_chat_type).where(Chats.id == chat_id)
query.execute()
def get_chat_all_stat(chat_id):
chat = Chats.get_or_none(Chats.id == chat_id)
return chat.chat_all_stat if chat else None
# Работа с таблицей пользователей
def get_user(user_id):
return Users.get_or_none(Users.id == user_id)
def get_user_tag(user_id):
user = Users.get_or_none(Users.id == user_id)
return user.user_tag if user else None
def get_user_id(user_tag):
user = Users.get_or_none(Users.user_tag == user_tag)
return user.id if user else None
def get_user_name(user_id):
user = Users.get_or_none(Users.id == user_id)
return user.user_name if user else None
def get_user_role(user_id):
user = Users.get_or_none(Users.id == user_id)
return user.user_role if user else None
def get_user_all_stats(user_id):
user = Users.get_or_none(Users.id == user_id)
return user.user_stats if user else None
def get_user_rep(user_id):
user = Users.get_or_none(Users.id == user_id)
return user.user_rep if user else None
def change_user_name(user_id, user_first_name, user_last_name=None):
if user_last_name is None:
new_user_name = user_first_name
else:
new_user_name = user_first_name + " " + user_last_name
query = Users.update(user_name=new_user_name).where(Users.id == user_id)
query.execute()
def change_user_tag(user_id, new_user_tag):
query = Users.update(user_tag=new_user_tag).where(Users.id == user_id)
query.execute()
def change_user_role(user_id, new_user_role):
query = Users.update(user_role=new_user_role).where(Users.id == user_id)
query.execute()
# Работа с таблицей сообщений
def get_message(message_chat_id, message_id):
return Messages.get_or_none(
Messages.message_chat_id == message_chat_id, Messages.message_id == message_id
)
def get_message_sender_id(message_chat_id, message_id):
message = Messages.get_or_none(
Messages.message_chat_id == message_chat_id, Messages.message_id == message_id
)
return message.message_sender_id if message else None
def get_message_text(message_chat_id, message_id):
message = Messages.get_or_none(
Messages.message_chat_id == message_chat_id, Messages.message_id == message_id
)
return message.message_text if message else None
def get_message_ai_model(message_chat_id, message_id):
message = Messages.get_or_none(
Messages.message_chat_id == message_chat_id, Messages.message_id == message_id
)
return message.message_ai_model if message else None
def get_answer_to_message_id(message_chat_id, message_id):
message = Messages.get_or_none(
Messages.message_chat_id == message_chat_id, Messages.message_id == message_id
)
return message.answer_to_message_id if message else None
# Работа с таблицей статистики чатов
def get_chat_stats(chat_id):
chat_stats = {}
for chat_stat in ChatStats.select().where(ChatStats.chat_id == chat_id):
chat_stats[chat_stat.date] = chat_stat.messages_count
return chat_stats
# Работа с таблицей статистики пользователей
def get_user_stats(user_id):
user_stats = {}
for user_stat in UserStats.select().where(UserStats.user_id == user_id):
user_stats[user_stat.date] = user_stat.messages_count
return user_stats
# Функции обновления
def update_chat_all_stat(chat_id):
query = Chats.update(chat_all_stat=Chats.chat_all_stat + 1).where(
Chats.id == chat_id
)
query.execute()
def update_chat_stats(chat_id, date):
chat_stats = ChatStats.get_or_none(
ChatStats.chat_id == chat_id, ChatStats.date == date
)
if chat_stats:
query = ChatStats.update(messages_count=ChatStats.messages_count + 1).where(
ChatStats.chat_id == chat_id, ChatStats.date == date
)
query.execute()
else:
ChatStats.create(chat_id=chat_id, date=date, messages_count=1)
def update_user_all_stat(user_id):
user = Users.get_or_none(Users.id == user_id)
if user:
query = Users.update(user_stats=Users.user_stats + 1).where(Users.id == user_id)
query.execute()
else:
Users.create(id=user_id, user_stats=1)
def update_user_rep(user_id):
user = Users.get_or_none(Users.id == user_id)
if user:
query = Users.update(user_rep=Users.user_rep + 1).where(Users.id == user_id)
query.execute()
else:
Users.create(id=user_id, user_rep=1)
def update_user_stats(chat_id, user_id, date):
user_stats = UserStats.get_or_none(
UserStats.chat_id == chat_id,
UserStats.user_id == user_id,
UserStats.date == date,
)
if user_stats:
query = UserStats.update(messages_count=UserStats.messages_count + 1).where(
UserStats.chat_id == chat_id,
UserStats.user_id == user_id,
UserStats.date == date,
)
query.execute()
else:
UserStats.create(chat_id=chat_id, user_id=user_id, date=date, messages_count=1)

View File

@@ -1,6 +0,0 @@
{
"name": "Database",
"description": "Модуль для работы с БД",
"author": "OCAB Team",
"version": "1.0"
}

View File

@@ -1,9 +0,0 @@
import peewee as pw
class ChatStats(pw.Model):
class Meta: ...
chat_id = pw.IntegerField(null=False)
date = pw.DateField(null=False)
messages_count = pw.IntegerField(null=False, default=0)

View File

@@ -1,9 +0,0 @@
import peewee as pw
class Chats(pw.Model):
class Meta: ...
chat_name = pw.CharField(null=False)
chat_type = pw.IntegerField(null=False, default=10)
chat_all_stat = pw.IntegerField(null=False)

View File

@@ -1,12 +0,0 @@
import peewee as pw
class Messages(pw.Model):
class Meta: ...
message_chat_id = pw.IntegerField(null=False)
message_id = pw.IntegerField(null=False)
message_sender_id = pw.IntegerField(null=False)
answer_to_message_id = pw.IntegerField(null=True)
message_ai_model = pw.TextField(null=True)
message_text = pw.TextField(null=False)

View File

@@ -1,10 +0,0 @@
import peewee as pw
class UserStats(pw.Model):
class Meta: ...
chat_id = pw.IntegerField(null=False)
user_id = pw.IntegerField(null=False)
date = pw.DateField(null=False)
messages_count = pw.IntegerField(null=False, default=0)

View File

@@ -1,11 +0,0 @@
import peewee as pw
class Users(pw.Model):
class Meta: ...
user_tag = pw.CharField(null=True)
user_name = pw.CharField(null=False) # до 255 символов
user_role = pw.IntegerField(null=True, default=3)
user_stats = pw.IntegerField(null=True, default=0)
user_rep = pw.IntegerField(null=True, default=0)

View File

@@ -1 +0,0 @@
Эта директория для тестовой БД

View File

@@ -1,142 +0,0 @@
# flake8: noqa
import os
import unittest
from ...exceptions.module_exceptions import MissingModuleName, NotExpectedModuleName
from ..db_api import *
class TestDatabaseAPI(unittest.TestCase):
database = None
path = None
@classmethod
def setUpClass(cls):
cls.database, cls.path = connect_database(is_test=True, module="database")
create_tables(cls.database)
def test_fail_connect(cls):
with cls.assertRaises(MissingModuleName):
cls.database, cls.path = connect_database(is_test=True)
with cls.assertRaises(NotExpectedModuleName):
cls.database, cls.path = connect_database(module="database")
def test_add_and_get_chat(self):
add_chat(chat_id=21, chat_role=0, chat_stats=0, chat_federation=0)
add_chat(chat_id=22, chat_role=1, chat_stats=100, chat_federation=1)
chat1 = get_chat(21)
self.assertIsNotNone(chat1)
self.assertEqual(chat1.id, 21)
self.assertEqual(chat1.chat_role, 0)
chat2 = get_chat(22)
self.assertIsNotNone(chat2)
self.assertEqual(chat2.id, 22)
self.assertEqual(chat2.chat_role, 1)
def test_add_and_get_message(self):
add_message(
message_id=1, message_text="Test Message 1", message_sender=1, answer_id=2
)
add_message(
message_id=2, message_text="Test Message 2", message_sender=2, answer_id=1
)
message1 = get_message(1)
self.assertIsNotNone(message1)
self.assertEqual(message1.id, 1)
self.assertEqual(message1.message_text, "Test Message 1")
message2 = get_message(2)
self.assertIsNotNone(message2)
self.assertEqual(message2.id, 2)
self.assertEqual(message2.message_text, "Test Message 2")
def test_add_and_get_user(self):
add_user(
user_id=100,
user_name="TestUser1",
user_tag="TestTag1",
user_role=0,
user_stats=10,
user_rep=5,
)
add_user(
user_id=101,
user_name="TestUser2",
user_tag="TestTag2",
user_role=1,
user_stats=20,
user_rep=10,
)
user1 = get_user(100)
self.assertIsNotNone(user1)
self.assertEqual(user1.id, 100)
self.assertEqual(user1.user_name, "TestUser1")
user2 = get_user(101)
self.assertIsNotNone(user2)
self.assertEqual(user2.id, 101)
self.assertEqual(user2.user_name, "TestUser2")
def test_get_user_role(self):
add_user(
user_id=102,
user_name="TestUser3",
user_tag="TestTag3",
user_role=0,
user_stats=30,
user_rep=15,
)
add_user(
user_id=103,
user_name="TestUser4",
user_tag="TestTag4",
user_role=1,
user_stats=40,
user_rep=20,
)
user_role1 = get_user_role(102)
self.assertEqual(user_role1, 0)
user_role2 = get_user_role(103)
self.assertEqual(user_role2, 1)
def test_change_user_name(self):
add_user(
user_id=104,
user_name="OldName1",
user_tag="TestTag5",
user_role=0,
user_stats=50,
user_rep=25,
)
change_user_name(104, "NewName1")
updated_user1 = get_user(104)
self.assertEqual(updated_user1.user_name, "NewName1")
add_user(
user_id=105,
user_name="OldName2",
user_tag="TestTag6",
user_role=1,
user_stats=60,
user_rep=30,
)
change_user_name(105, "NewName2")
updated_user2 = get_user(105)
self.assertEqual(updated_user2.user_name, "NewName2")
@classmethod
def tearDownClass(cls):
cls.database.close()
os.system(f"rm {cls.path}") # nosec
if __name__ == "__main__":
unittest.main()

View File

@@ -1 +0,0 @@
from . import module_exceptions

View File

@@ -1,6 +0,0 @@
{
"name": "Exceptions",
"description": "Модуль с исключениями",
"author": "OCAB Team",
"version": "1.0"
}

View File

@@ -1,12 +0,0 @@
class MissingModuleName(BaseException):
def __init__(self):
self.message = "Пропущено название директории модуля"
super().__init__(self.message)
class NotExpectedModuleName(BaseException):
def __init__(self):
self.message = "Не ожидалось название директории модуля"
super().__init__(self.message)

View File

@@ -1,34 +0,0 @@
from aiogram import Bot
from aiogram.filters import BaseFilter
from aiogram.types import Message
from src.core.logger import log
from src.modules.standard.config.config import get_aproved_chat_id
from src.modules.standard.roles.roles import Roles
class ChatModerOrAdminFilter(BaseFilter):
async def __call__(self, message: Message, bot: Bot) -> bool:
user_id = message.from_user.id
roles = Roles()
admins = await bot.get_chat_administrators(message.chat.id)
return (
await roles.check_admin_permission(user_id)
or await roles.check_moderator_permission(user_id)
or any(user_id == admin.user.id for admin in admins)
)
class ChatNotInApproveFilter(BaseFilter):
async def __call__(self, message: Message, bot: Bot) -> bool:
# print("chat_check")
await log("chat_check")
chat_id = message.chat.id
if chat_id in get_aproved_chat_id():
# print(f"Chat in approve list: {chat_id}")
await log(f"Chat in approve list: {chat_id}")
return False
else:
# print(f"Chat not in approve list: {chat_id}")
await log(f"Chat not in approve list: {chat_id}")
return True

View File

@@ -1,6 +0,0 @@
{
"name": "Filters",
"description": "Модуль с фильтрами",
"author": "OCAB Team",
"version": "1.0"
}

View File

@@ -1,70 +0,0 @@
# flake8: noqa
from aiogram import Bot
from aiogram.types import Message
from src.core.logger import log
from src.modules.standard.config.config import get_user_role_name
from src.modules.standard.database.db_api import *
from src.modules.standard.roles.roles import Roles
async def get_info_answer_by_id(message: Message, bot: Bot, user_id: int):
if get_message_ai_model(message.chat.id, message.message_id) is not None:
await message.reply(
"Это сообщение было сгенерировано ботом используя модель: "
+ get_message_ai_model(message.chat.id, message.message_id)
)
elif user_id == bot.id:
await message.reply("Это сообщение было отправлено ботом")
elif get_user(user_id) is None:
await message.reply("Пользователь не найден")
# print(get_user(user_id))
await log(f"Пользователь не найден: {user_id}, {get_user(user_id)}")
else:
roles = Roles()
answer = (
f"Пользователь: {get_user_name(user_id)}\n"
f"Роль: {await roles.get_role_name(role_id=get_user_role(user_id))}\n"
f"Тег: @{get_user_tag(user_id)}\n"
f"Кол-во сообщений: {get_user_all_stats(user_id)}\n"
f"Репутация: {get_user_rep(user_id)}"
)
await message.reply(answer)
async def get_user_info(message: Message, bot: Bot):
# Проверяем содержимое сообщения, если содержит вторым элементом тег пользователя, то выводим информацию о нем
# Если сообщение отвечает на другое сообщение, то выводим информацию о пользователе, на чье сообщение был ответ
# Если это бот то выводим информацию что это бот и какая модель yandexgpt используется
try:
if len(message.text.split()) > 1 and message.text.split()[1].startswith("@"):
user_tag = message.text.split()[1][1:]
user_id = get_user_id(user_tag)
if user_id:
await get_info_answer_by_id(message, bot, user_id)
else:
await message.reply(f"Пользователь с тегом @{user_tag} не найден")
elif message.reply_to_message:
user_id = message.reply_to_message.from_user.id
await get_info_answer_by_id(message, bot, user_id)
else:
await get_info_answer_by_id(message, bot, message.from_user.id)
except Exception as e:
await message.reply(
"В вашем запросе что-то пошло не так,"
" попробуйте запросить информацию о пользователе по его тегу или ответив на его сообщение"
)
# print(e)
await log(e)
async def get_chat_info(message: Message, bot: Bot):
answer = (
f"*Название чата:* {message.chat.title}\n"
f"*ID чата:* `{message.chat.id}`\n \n"
f"*Суммарное количество сообщений в чате:* {get_chat_all_stat(message.chat.id)}\n"
f"*Количество пользователей в чате:* {await bot.get_chat_member_count(message.chat.id)}\n"
f"*Количество администраторов в чате:* {len(await bot.get_chat_administrators(message.chat.id))}"
)
await message.reply(answer, parse_mode="MarkdownV2")

View File

@@ -1,10 +0,0 @@
# flake8: noqa
from aiogram import F, Router
from src.modules.standard.info.handlers import get_chat_info, get_user_info
router = Router()
router.message.register(get_user_info, F.text.startswith("/info") == True)
router.message.register(get_chat_info, F.text.startswith("/chatinfo") == True)

View File

@@ -1,117 +0,0 @@
# flake8: noqa
from aiogram import Bot, F, Router, types
from src.core.logger import log
from src.modules.external.yandexgpt.handlers import answer_to_message
from src.modules.standard.config.config import (
get_aproved_chat_id,
get_yandexgpt_in_words,
get_yandexgpt_start_words,
)
from src.modules.standard.database.db_api import *
async def chat_check(message: types.Message):
# Проверка наличия id чата в базе данных чатов
# Если чата нет в базе данных, то проверяем его в наличии в конфиге и если он там есть то добавляем его в БД
# Если чат есть в базе данных, то pass
if get_chat(message.chat.id) is None:
if message.chat.id in get_aproved_chat_id():
# print(f"Chat in approve list: {message.chat.id} {message.chat.title}")
await log(f"Chat in approve list: {message.chat.id} {message.chat.title}")
add_chat(message.chat.id, message.chat.title)
# print(f"Chat added: {message.chat.id} {message.chat.title}")
await log(f"Chat added: {message.chat.id} {message.chat.title}")
else:
# print(f"Chat not in approve list: {message.chat.id} {message.chat.title}")
await log(
f"Chat not in approve list: {message.chat.id} {message.chat.title}"
)
pass
else:
# Проверяем обновление названия чата
chat = get_chat(message.chat.id)
if chat.chat_name != message.chat.title:
chat.chat_name = message.chat.title
chat.save()
# print(f"Chat updated: {message.chat.id} {message.chat.title}")
await log(f"Chat updated: {message.chat.id} {message.chat.title}")
else:
# print(f"Chat already exists: {message.chat.id} {message.chat.title}")
await log(f"Chat already exists: {message.chat.id} {message.chat.title}")
pass
async def user_check(message: types.Message):
# Проверка наличия id пользователя в базе данных пользователей
# Если пользователя нет в базе данных, то добавляем его
# Если пользователь есть в базе данных, то pass
current_user_name = ""
if message.from_user.last_name is None:
current_user_name = message.from_user.first_name
else:
current_user_name = (
message.from_user.first_name + " " + message.from_user.last_name
)
if get_user(message.from_user.id) is None:
add_user(
message.from_user.id,
message.from_user.first_name,
message.from_user.last_name,
message.from_user.username,
)
# print(f"User added: {message.from_user.id} {message.from_user.first_name} {message.from_user.last_name}")
await log(f"User added: {message.from_user.id} {current_user_name}")
else:
# print(f"User already exists: {message.from_user.id} {message.from_user.first_name} {message.from_user.last_name} {message.from_user.username}")
await log(
f"User already exists: {message.from_user.id} {current_user_name} {message.from_user.username}"
)
# Проверяем обновление имени пользователя
if get_user_name(message.from_user.id) != current_user_name:
change_user_name(message.from_user.id, current_user_name)
# print(f"User updated: {message.from_user.id} {message.from_user.first_name} {message.from_user.last_name}")
await log(f"User name updated: {message.from_user.id} {current_user_name}")
# Проверяем обновление username пользователя
if get_user_tag(message.from_user.id) != message.from_user.username:
change_user_tag(message.from_user.id, message.from_user.username)
# print(f"User updated: {message.from_user.id} {message.from_user.username}")
await log(
f"User tag updated: {message.from_user.id} {message.from_user.username}"
)
pass
async def add_stats(message: types.Message):
# Добавляем пользователю и чату статистику
update_chat_all_stat(message.chat.id)
update_user_all_stat(message.from_user.id)
async def message_processing(message: types.Message, bot: Bot):
await chat_check(message)
await user_check(message)
await add_stats(message)
add_message(message)
# Если сообщение в начале содержит слово из списка или внутри сообщения содержится слово из списка или сообщение отвечает на сообщение бота
if (message.text.split(" ")[0] in get_yandexgpt_start_words()) or (
any(word in message.text for word in get_yandexgpt_in_words())
):
# print("message_processing")
await log("message_processing")
await answer_to_message(message, bot)
elif message.reply_to_message is not None:
if message.reply_to_message.from_user.is_bot:
# print("message_processing")
await log("message_processing")
await answer_to_message(message, bot)
router = Router()
# Если сообщение содержит текст то вызывается функция message_processing
router.message.register(message_processing, F.text)

View File

@@ -1,6 +0,0 @@
{
"name": "Moderation",
"description": "Moderation commands for OCAB",
"author": "OCAB Team",
"version": "1.0"
}

View File

@@ -1,82 +0,0 @@
# flake8: noqa
import asyncio
import time
import aiogram
import aiohttp
from ...standard.config.config import *
from ...standard.roles.roles import *
class Moderation:
def __init__(self):
access_rights = get_access_rights()
bot_check_message = bool(self.access_rights["BOT_CHECK_MESSAGE"])
bot_ban_user = bool(self.access_rights["BOT_BAN_USER"])
bot_mute_user = bool(self.access_rights["BOT_MUTE_USER"])
moderator_rights = self.access_rights["MODERATOR_RIGHTS"]
ai_check_message = bool(self.access_rights["AI_CHECK_MESSAGE"])
beta_ai_check_message = bool(self.access_rights["BETA_AI_CHECK_MESSAGE"])
async def time_to_seconds(time):
# Конвертация текстового указания времени по типу 3h, 5m, 10s в минуты
if time[-1] == "d":
return int(time[:-1]) * 86400
elif time[-1] == "h":
return int(time[:-1]) * 3600
elif time[-1] == "m":
return int(time[:-1]) * 60
elif time[-1] == "s":
return int(time[:-1])
async def short_time_to_time(self, time):
# Конвертация времени в длинное название
if time[-1] == "d":
return str(f"{time[0:-1]} дней")
elif time[-1] == "h":
return str(f"{time[0:-1]} часов")
elif time[-1] == "m":
return str(f"{time[0:-1]} минут")
elif time[-1] == "s":
return str(f"{time[0:-1]} секунд")
async def delete_message(self, chat_id, message_id, bot: aiogram.Bot):
await bot.delete_message(chat_id, message_id)
async def ban_user(self, chat_id, user_id, bot: aiogram.Bot):
await bot.ban_chat_member(chat_id, user_id)
async def mute_user(chat_id, user_id, time, bot: aiogram.Bot):
mutePermissions = {
"can_send_messages": False,
"can_send_audios": False,
"can_send_documents": False,
"can_send_photos": False,
"can_send_videos": False,
"can_send_video_notes": False,
"can_send_voice_notes": False,
"can_send_polls": False,
"can_send_other_messages": False,
"can_add_web_page_previews": False,
"can_change_info": False,
"can_invite_users": False,
"can_pin_messages": False,
"can_manage_topics": False,
}
end_time = time + int(time.time())
await bot.restrict_chat_member(
chat_id, user_id, until_date=end_time, **mutePermissions
)
async def unmute_user(chat_id, user_id, bot: aiogram.Bot):
await bot.restrict_chat_member(
chat_id, user_id, use_independent_chat_permissions=True
)
async def ban_user(chat_id, user_id, bot: aiogram.Bot):
await bot.ban_chat_member(chat_id, user_id)

View File

@@ -1,6 +0,0 @@
{
"name": "Roles",
"description": "Модуль для работы с ролями",
"author": "OCAB Team",
"version": "1.0"
}

View File

@@ -1,58 +0,0 @@
from ..config.config import get_config
from ..database.db_api import get_user_role
yaml_load = get_config()
class Roles:
user = "USER"
moderator = "MODERATOR"
admin = "ADMIN"
bot = "BOT"
__roles = yaml_load["ROLES"]
def __init__(self):
self.user_role_id = self.__roles[self.user]
self.moderator_role_id = self.__roles[self.moderator]
self.admin_role_id = self.__roles[self.admin]
self.bot_role_id = self.__roles[self.bot]
async def check_admin_permission(self, user_id):
match get_user_role(user_id):
case self.admin_role_id:
return True
case _:
return False
async def check_moderator_permission(self, user_id):
match get_user_role(user_id):
case self.moderator_role_id:
return True
case _:
return False
async def get_role_name(self, role_id):
match role_id:
case self.admin_role_id:
return self.admin
case self.moderator_role_id:
return self.moderator
case self.user_role_id:
return self.user
case self.bot_role_id:
return self.bot
case _:
raise ValueError(f"Нет роли с id={role_id}")
async def get_user_permission(self, user_id):
match get_user_role(user_id):
case self.admin_role_id:
return self.admin
case self.moderator_role_id:
return self.moderator
case self.user_role_id:
return self.user
case self.bot_role_id:
return self.bot
case _:
return None

View File

@@ -1,7 +0,0 @@
TELEGRAM:
TOKEN: xxxxxxxxxxxxxxxxxxxx
ROLES:
ADMIN: 0
MODERATOR: 1
USER: 2
BOT: 3

View File

@@ -1,80 +0,0 @@
import os
import unittest
from ...database.db_api import add_user, connect_database, create_tables
from ..roles import Roles
class TestRoles(unittest.IsolatedAsyncioTestCase):
database = None
path = None
@classmethod
def setUpClass(cls):
cls.roles = Roles()
cls.database, cls.path = connect_database(is_test=True, module="roles")
create_tables(cls.database)
add_user(
user_id=1,
user_name="TestUser1",
user_tag="TestTag1",
user_role=0,
user_stats=30,
user_rep=15,
)
add_user(
user_id=2,
user_name="TestUser3",
user_tag="TestTag3",
user_role=1,
user_stats=30,
user_rep=15,
)
add_user(
user_id=3,
user_name="TestUser4",
user_tag="TestTag4",
user_role=2,
user_stats=30,
user_rep=15,
)
add_user(
user_id=4,
user_name="TestUser2",
user_tag="TestTag2",
user_role=3,
user_stats=30,
user_rep=15,
)
async def test_check_admin_permission(cls):
cls.assertTrue(await cls.roles.check_admin_permission(1))
cls.assertFalse(await cls.roles.check_admin_permission(2))
cls.assertFalse(await cls.roles.check_admin_permission(3))
cls.assertFalse(await cls.roles.check_admin_permission(4))
async def test_check_moderator_permission(cls):
cls.assertTrue(await cls.roles.check_moderator_permission(2))
cls.assertFalse(await cls.roles.check_moderator_permission(0))
cls.assertFalse(await cls.roles.check_moderator_permission(1))
cls.assertFalse(await cls.roles.check_moderator_permission(3))
async def test_get_role_name(cls):
cls.assertEqual(await cls.roles.get_role_name(cls.roles.admin_role_id), "ADMIN")
cls.assertEqual(
await cls.roles.get_role_name(cls.roles.moderator_role_id), "MODERATOR"
)
cls.assertEqual(await cls.roles.get_role_name(cls.roles.user_role_id), "USER")
cls.assertEqual(await cls.roles.get_role_name(cls.roles.bot_role_id), "BOT")
with cls.assertRaises(ValueError):
await cls.roles.get_role_name(999) # Несуществующий ID роли
@classmethod
def tearDownClass(cls):
cls.database.close()
os.system(f"rm {cls.path}") # nosec
if __name__ == "__main__":
unittest.main()

View File

@@ -1,86 +0,0 @@
# flake8: noqa
import asyncio
import random
from threading import Thread
from aiogram import Bot
from aiogram.types import Message
from aiogram.types import inline_keyboard_button as types
from aiogram.utils.keyboard import InlineKeyboardBuilder
from src.modules.standard.config.config import get_telegram_check_bot
from src.modules.standard.database.db_api import *
from src.modules.standard.moderation.moderation import ban_user, mute_user, unmute_user
from src.modules.standard.roles.roles import Roles
async def create_math_task():
first_number = random.randint(1, 100) # nosec
second_number = random.randint(1, 100) # nosec
answer = first_number + second_number
fake_answers = []
for i in range(3):
diff = random.randint(1, 10) # nosec
diff_sign = random.choice(["+", "-"]) # nosec
fake_answers.append(answer + diff if diff_sign == "+" else answer - diff)
fake_answers.append(answer)
random.shuffle(fake_answers)
return [answer, first_number, second_number, fake_answers]
async def ban_user_timer(chat_id: int, user_id: int, time: int, bot: Bot):
await asyncio.sleep(time)
if get_user(user_id) is not None:
pass
else:
await ban_user()
async def check_new_user(message: Message, bot: Bot):
print("check_new_user")
if get_telegram_check_bot():
# Проверяем наличие пользователя в базе данных
if get_user(message.from_user.id) is None:
# Выдаём пользователю ограничение на отправку сообщений на 3 минуты
ban_task = Thread(
target=ban_user_timer,
args=(message.chat.id, message.from_user.id, 180, bot),
)
ban_task.start()
# Создаём задачу с отложенным выполнением на 3 минуты
math_task = await create_math_task()
text = f"{math_task[1]} + {math_task[2]}"
builder = InlineKeyboardBuilder()
for answer in math_task[3]:
if answer == math_task[0]:
builder.add(
types.InlineKeyboardButton(
text=answer, callback_data=f"check_math_task_true"
)
)
else:
builder.add(
types.InlineKeyboardButton(
text=answer, callback_data=f"check_math_task_false"
)
)
await message.reply(
f"Приветствую, {message.from_user.first_name}!\n"
f"Для продолжения работы с ботом, пожалуйста, решите математический пример в течении 3х минут:\n"
f"*{text}*",
reply_markup=builder.as_markup(),
)
async def math_task_true(message: Message, bot: Bot):
await message.reply(f"Верно! Добро пожаловать в чат {message.from_user.first_name}")
await unmute_user(message.chat.id, message.from_user.id, bot)
add_user(
message.from_user.id,
message.from_user.first_name + " " + message.from_user.last_name,
message.from_user.username,
)
pass

View File

@@ -1,6 +0,0 @@
{
"name": "Welcome",
"description": "Мо",
"author": "OCAB Team",
"version": "1.0"
}

View File

@@ -1,12 +0,0 @@
from aiogram import F, Router
from src.modules.standard.welcome.handlers import check_new_user
router = Router()
# Если в чат пришел новый пользователь
router.message.register(check_new_user, F.new_chat_members.exists())
# Ловин колбеки от кнопок с callback_data=f"check_math_task_true"
router.callback_query.register(
check_new_user, F.callback_data == "check_math_task_true"
)

View File

@@ -1,26 +0,0 @@
import os.path
from dataclasses import dataclass
from json import loads
@dataclass
class Path:
core: str
modules_standard: str
modules_custom: str
def _get_paths(path_to_json: str):
with open(path_to_json, encoding="utf8") as f:
paths = loads(f.read())
return Path(
core=paths["core"],
modules_standard=paths["modules standard"],
modules_custom=paths["modules custom"],
)
cwd = os.getcwd()
cwd = cwd + "/src"
paths = _get_paths(f"{cwd}/paths.json")