0
0
mirror of https://gitflic.ru/project/maks1ms/ocab.git synced 2025-11-28 10:21:55 +03:00

41 Commits
dev ... OCAB-V2

Author SHA1 Message Date
eecc59ca94 Merged with private/new-module-system 2024-07-14 17:07:46 +03:00
370b4fc648 Merged with private/new-module-system 2024-07-10 19:30:23 +03:00
2a2b9e15e8 Merged with chore/code-quality-tools 2024-07-08 14:20:21 +03:00
ef10f05a73 Добавлены nosec для прохождения bandit 2024-07-08 00:55:33 +03:00
ef0dda07f7 Добавлен bandit в pre-commit-hook 2024-07-08 00:49:04 +03:00
e80a01157f Автоматический рефакторинг и игнорирование flake8
Выполнен автоматический рефакторинг. Для тех файлов,
которые не прошли flake8 - был добавлен `noqa`, чтобы
в будущем исправить эти проблемы
2024-07-08 00:38:01 +03:00
4edeef4003 Добавлен pre-commit 2024-07-08 00:21:50 +03:00
31142dfb1c Добавлены инструменты для повышения качества кода 2024-07-07 23:59:33 +03:00
837613e072 Merged with chore/refactor-core 2024-07-07 21:25:10 +03:00
armatik
9d89aee578 Открытое бета тестирование + фиксы первого дня 2024-05-10 18:18:08 +03:00
armatik
370ac7d02b Исправление конфига 2024-05-01 01:50:29 +03:00
armatik
85a357e37b Множественные мелкие исправления 2024-05-01 01:50:18 +03:00
armatik
661209055d Тестирование YandexGPT API 2024-05-01 01:49:20 +03:00
armatik
53ed19d25c Poetry fix 2024-04-30 23:58:30 +03:00
Armatik
c6e60b7810 Изменение названия db_api в main.py 2024-04-25 15:57:37 +03:00
Armatik
2cbe898a57 Merge remote-tracking branch 'origin/OCAB-V2' into OCAB-V2
# Conflicts:
#	src/core/main.py
2024-04-25 15:57:06 +03:00
Armatik
ca9834206b Начало разработки модуля модерации 2024-04-25 15:55:50 +03:00
Armatik
fb6da3d627 Изменение наименования db_api 2024-04-25 15:55:23 +03:00
Armatik
b7fdd75458 Доработка реквестов и исправление ошибок 2024-04-25 15:54:44 +03:00
Armatik
28e157a9b6 Более понятное наименование 2024-04-25 15:54:27 +03:00
Armatik
192b2ed51c Добавлено получение id роли пользователя 2024-04-25 15:42:17 +03:00
ilyazheprog
4d2f61b1c2 Добавил /chatID
Проверка админ прав используя телеграм
2024-02-25 03:59:26 +07:00
fiersik
4d6ee64731 Обновил pyproject 2024-02-24 23:49:50 +04:00
fiersik
0dc216d9f4 Обновил how-to install deps 2024-02-24 21:13:51 +04:00
ilyazheprog
88f40c7f82 Админ модуль: удаление сообщений
Модуль фильтров
Добавлены роутеры
2024-02-12 14:55:14 +07:00
ilyazheprog
d71fef0fed обновил стек: Python 3.11.6 или выше 2024-02-12 11:48:55 +07:00
qualimock
68406fee53 Fix E302 2024-02-05 15:51:50 +03:00
Armatik
cfaffd337d Мелки исправления форматирование и описание модуля welcome 2024-02-05 15:43:26 +03:00
armatik
6f1efa6b7b Реализован модуль YandexGPT первой версии. Реализованы методы: генерация ответа с помощью YandexGPT и YandexGPT-lite, перевод текста с помощью YandexGPT, проверка орфографии и пунктуации с помощью YandexGPT, краткий пересказ истории чата с помощью Yandex Text Summarization Model. 2024-01-14 00:57:33 +03:00
Armatik
dee4f5bc79 Начало разработки модуля YandexGPT 2024-01-02 22:39:00 +03:00
Armatik
649e780109 Добавлены новые методы и изменены старые в API модуля DataBase. 2024-01-02 21:26:09 +03:00
Armatik
7918620b99 Изменение в моделях баз данных. Добавлен README.md к модулю DataBase с информацией о базе данных. 2024-01-02 14:59:10 +03:00
ilyazheprog
1e7c70078b Исправлены импорты внутри модулей, добавлены: модуль для исключений, скрипт инициализации путей, переделано подключение к бд 2023-12-10 23:04:17 +07:00
ilyazheprog
88db66c360 добавлены модули config, role, исправлена бд 2023-12-07 18:02:04 +07:00
ilyazheprog
bc5de691a7 написана точка входа бота, конфиг, добавлена зависимость PyYAML 2023-11-30 20:23:37 +07:00
ilyazheprog
0d614cee69 модуль Database приведён к правильному виду 2023-11-29 20:09:08 +07:00
ilyazheprog
b33db9203a Уточнил версии в README.md "Технологический стек" 2023-11-27 15:18:20 +07:00
armatik
d7bafdf513 Базовая разметка README.md 2023-11-27 00:27:41 +03:00
ilyazheprog
8378625694 Написал интерфейс взаимодействия с БД и покрыл тестами 2023-11-26 20:23:39 +07:00
ilyazheprog
9dde0530d0 Внедрил poetry, установил в зависимости: aiogram3, peewee, добавил гайд на установку poetry 2023-11-26 12:24:47 +07:00
armatik
9e64cbfc4c Удалены старые директории 2023-11-25 19:46:09 +03:00
131 changed files with 5097 additions and 1219 deletions

1
.bandit Normal file
View File

@@ -0,0 +1 @@
[bandit]

6
.flake8 Normal file
View File

@@ -0,0 +1,6 @@
[flake8]
per-file-ignores =
__init__.py:F401
max-line-length = 88
count = true
extend-ignore = E203,E701

5
.gitignore vendored
View File

@@ -2,5 +2,10 @@
.vscode
.env
env
.venv
venv
__pycache__
OCAB.db
src/paths.json
src/ocab_core/config.yaml
src/ocab_core/log/**/*

30
.pre-commit-config.yaml Normal file
View File

@@ -0,0 +1,30 @@
# See https://pre-commit.com for more information
# See https://pre-commit.com/hooks.html for more hooks
repos:
- repo: https://github.com/crashappsec/pre-commit-sync
rev: 04b0e02eefa7c41bedca7456ad542e60b67c16c6
hooks:
- id: pre-commit-sync
- repo: https://github.com/pre-commit/pre-commit-hooks
rev: v3.2.0
hooks:
- id: trailing-whitespace
- id: end-of-file-fixer
- id: check-yaml
- id: check-added-large-files
- repo: https://github.com/PyCQA/isort
rev: 5.13.2 # sync:isort:poetry.lock
hooks:
- id: isort
- repo: https://github.com/psf/black
rev: 24.4.2 # sync:black:poetry.lock
hooks:
- id: black
- repo: https://github.com/PyCQA/flake8
rev: 7.1.0 # sync:flake8:poetry.lock
hooks:
- id: flake8
- repo: https://github.com/PyCQA/bandit
rev: 1.7.9 # sync:bandit:poetry.lock
hooks:
- id: bandit

View File

@@ -1 +1,60 @@
# OpenChatAiBot
# OpenChatAiBot V2
## Что такое OCAB?
OCAB - это бот для Telegram, который призван помочь во взаимодействии с чатом.
Бот поддерживает интеграцию модулей для расширения функционала.
Фактически бот является платформой для запуска созданных для него модулей.
Модули могут взаимодействовать друг с другом или быть полностью независимыми.
## Что такое модуль?
Модуль - это директория, которая содержит в себе код модуля и его конфигурацию.
### Структура модуля
*Будет дополнено после закрытия [issue #17](https://gitflic.ru/project/armatik/ocab/issue/17).*
## Стандартные модули
В стандартный состав бота входят следующие модули:
* `admin` - модуль для модерирования чата. Позволяет удалять сообщения, банить пользователей и т.д.
* `reputation` - модуль репутации пользователей. Позволяет оценивать ответы пользователей и накапливать репутацию.
* `welcome` - модуль приветствия новых пользователей. Позволяет приветствовать новых пользователей в чате, а также
проверять пользователя капчей для предотвращения спама.
* `roles` - модуль ролей. Позволяет назначать пользователям роли и ограничивать доступ к командам бота по ролям.
Является важной частью системы прав доступа и модуля `admin`.
## Дополнительные официальные модули
* `yandexgpt` - модуль для генерации ответов на основе нейросети GPT-3.5. Позволяет боту отвечать на сообщения
пользователей, используя нейросеть. Ключевой особенностью является построение линии контекста для нейросети,
которая позволяет боту отвечать на вопросы, используя контекст предыдущих сообщений. Для этого используется
модуль база данных хранящий историю сообщений.
<!--
* `bugzilla` - модуль для интеграции с BugZilla. Позволяет получать уведомления о новых багах в BugZilla, отслеживать их
статус, формировать стандартизированные сообщения для корректного описания багов. В будущем планируется интеграция с
API BugZilla для возможности создания багов из чата.
* `alt_packages` - модуль для интеграции с AltLinux Packages. Позволяет получать уведомления о новых пакетах в репозитории
AltLinux, поиска пакетов по названию, получения истории изменений, команды для установки пакета из репозитория и
прочей информации о пакете.
* `notes` - модуль заметок. Позволяет сохранять заметки для пользователей и чатов. Заметки являются ссылками на
сообщения в чате.
-->
Список модулей будет пополняться. Идеи для модулей можно оставлять в [issues](https://gitflic.ru/project/armatik/ocab/issue/create).
## Установка бота
### Docker
### Вручную
## Технологический стек
* Python 3.11.6 или выше - основной язык программирования.
* SQLite 3 - база данных для хранения информации о чате и пользователях.
* [Poetry](https://gitflic.ru/project/armatik/ocab/blob?file=how-to%20install%20deps.md&branch=OCAB-V2) - менеджер зависимостей.
* aiogram 3 - библиотека для работы с Telegram API.
* peewee - ORM для работы с базой данных.

60
docs/MODULES-SPEC.md Normal file
View File

@@ -0,0 +1,60 @@
# Спецификация модулей
> **Внимание!**
>
> Данная спецификация еще не закончена и активно разрабатывается.
> Могут быть значительные изменения (breaking changes).
Каждый модуль представлен в виде папки, содержащей два обязательных файла: info.json и `__init__.py`.
## Метаинформация о модуле (info.json)
Этот файл содержит метаинформацию о модуле в формате JSON. Пример структуры info.json приведён ниже:
```json
{
"id": "standard.info",
"name": "Info",
"description": "Модуль с информацией",
"author": "OCAB Team",
"version": "1.0.0",
"privileged": false,
"dependencies": {
"standard.roles": "^1.0.0",
"standard.database": "^1.0.0"
}
}
```
- `id`: Уникальный идентификатор модуля.
- `name`: Название модуля.
- `description`: Описание функциональности модуля.
- `author`: Автор модуля.
- `version`: Версия модуля.
- `privileged`: Булево значение, указывающее, является ли модуль привилегированным.
- `dependencies`: Объект, описывающий зависимости модуля от других модулей с указанием версии.
## Режимы выполнения модулей
Непривилегированный режим (`privileged: false`):
- Модуль выполняется в доверенной среде на основе RestrictedPython (это накладывает ряд ограничений);
- Может использовать только определенный набор разрешенных пакетов
- Имеет доступ к пакету `ocab_core.modules_system.public_api` для взаимодействия с ботом.
Привилегированный режим (`privileged: true`):
- Модуль выполняется без ограничений.
- Имеет полный доступ ко всем пакетам.
- Должен использоваться с осторожностью и только для модулей, требующих расширенных прав.
## Жизненный цикл модуля
1. Загрузка метаданных из `info.json`
2. Проверка зависимостей
3. Загрузка кода модуля из `__init__.py`
4. Вызов функции `module_init` (если она есть)
## Взаимодействие между модулями
Модули могут взаимодействовать друг с другом через [API](../src/ocab_core/modules_system/public_api/__init__.py), предоставляемое системой управления модулями.
Например, есть функция `get_module`. Она позволяет получить модуль или предоставляемые им объекты по его идентификатору.

47
how-to install deps.md Normal file
View File

@@ -0,0 +1,47 @@
## Poetry
### Установка с официального сайта
```shell
curl -sSL https://install.python-poetry.org | python3 -
```
### Установка с PyPi
```shell
python3 -m pip install poetry
```
Доп информация:https://www.8host.com/blog/ustanovka-menedzhera-zavisimostej-poetry/
## Зависимости
### Добавление зависимости
```shell
poetry add NAME
```
`NAME` - название зависимости.
### Установка зависимостей
```shell
poetry install
```
### Обновление зависимостей
```shell
poetry update
```
## Виртуальное окружение
### Создание/активация
```shell
poetry shell
```
### Настройка
Хранить окружение внутри проекта
```shell
poetry config virtualenvs.in-project true
```

1321
poetry.lock generated Normal file

File diff suppressed because it is too large Load Diff

55
pyproject.toml Normal file
View File

@@ -0,0 +1,55 @@
[tool.poetry]
package-mode = false
name = "ocab"
version = "2.0.0"
description = "OCAB is a modular Telegram bot"
license = "GPL-3.0-only"
authors = ["Семён Фомченков <s.fomchenkov@yandex.ru>"]
maintainers = [
"Илья Женецкий <ilya_zhenetskij@vk.com>",
"qualimock <qualimock@yandex.ru>",
"Кирилл Уницаев <fiersik.kouji@yandex.ru>",
"Максим Слипенко <maxim@slipenko.com>"
]
readme = "README.md"
repository = "https://gitflic.ru/project/armatik/ocab"
packages = [{include = "scripts"}]
[tool.poetry.urls]
"Bug Tracker" = "https://gitflic.ru/project/armatik/ocab/issue?status=OPEN"
[tool.poetry.scripts]
test = 'scripts.test:main'
init = 'scripts.init:main'
module = 'scripts.module:main'
[tool.poetry.dependencies]
python = ">=3.11.6,<3.13"
aiogram = "^3.10.0"
peewee = "^3.17.6"
pyyaml = "^6.0.1"
requests = "^2.32.3"
restrictedpython = "^7.1"
dataclasses-json = "^0.6.7"
semver = "^3.0.2"
[tool.poetry.group.dev.dependencies]
flake8 = "^7.1.0"
black = "^24.4.2"
isort = "^5.13.2"
bandit = "^1.7.9"
pre-commit = "^3.7.1"
semver = "^3.0.2"
[tool.black]
line-length = 88
[tool.isort]
profile = "black"
line_length = 88
multi_line_output = 3
skip_gitignore = true
[build-system]
requires = ["poetry-core"]
build-backend = "poetry.core.masonry.api"

View File

@@ -1,3 +0,0 @@
aiogram==3.1.1
openai==0.27.8
PyYAML==6.0.1

21
scripts/init.py Normal file
View File

@@ -0,0 +1,21 @@
from json import dumps
from pathlib import Path
def main():
pwd = Path().cwd()
dir_core = pwd / "src" / "ocab_core"
dir_modules_standard = pwd / "src" / "ocab_modules" / "standard"
dir_modules_external = pwd / "src" / "ocab_modules" / "external"
json = {
"core": str(dir_core),
"modules standard": str(dir_modules_standard),
"modules external": str(dir_modules_external),
}
with open("src/paths.json", "w", encoding="utf8") as f:
f.write(dumps(json, indent=4))
if __name__ == "__main__":
main()

115
scripts/module.py Normal file
View File

@@ -0,0 +1,115 @@
import argparse
import json
import os
DEFAULTS = {
"description": "Очень полезный модуль",
"author": "OCAB Team",
"version": "1.0.0",
"privileged": "false",
}
def create_module(args):
module_dir = os.path.join("src/ocab_modules/standard", args.module_name)
os.makedirs(module_dir, exist_ok=True)
module_info = {
"id": args.id,
"name": args.name,
"description": args.description,
"author": args.author,
"version": args.version,
"privileged": args.privileged.lower() == "true",
"dependencies": {},
}
with open(os.path.join(module_dir, "info.json"), "w", encoding="utf-8") as f:
json.dump(module_info, f, ensure_ascii=False, indent=4)
with open(os.path.join(module_dir, "__init__.py"), "w", encoding="utf-8") as f:
f.write("# Init file for the module\n")
print(f"Module {args.module_name} created successfully.")
def interactive_mode(args):
def get_input(prompt, default=None):
if default:
value = input(f"{prompt} [{default}]: ")
return value if value else default
else:
value = input(f"{prompt}: ")
return value
module_name = get_input("Введите название модуля (папки)")
module_id = get_input("Введите ID")
name = get_input("Введите название модуля")
description = get_input(
"Введите описание модуля", args.description or DEFAULTS["description"]
)
author = get_input("Введите автора", args.author or DEFAULTS["author"])
version = get_input("Введите версию", args.version or DEFAULTS["version"])
privileged = get_input(
"Модуль привилегированный (true/false)",
args.privileged or DEFAULTS["privileged"],
)
args = argparse.Namespace(
command="create",
module_name=module_name,
id=module_id,
name=name,
description=description,
author=author,
version=version,
privileged=privileged,
dependencies="",
)
create_module(args)
def main():
parser = argparse.ArgumentParser(
description="Утилита для создания директории модуля с файлами."
)
subparsers = parser.add_subparsers(dest="command", required=True)
create_parser = subparsers.add_parser("create", help="Создать новый модуль")
create_parser.add_argument("--module_name", help="Название директории модуля")
create_parser.add_argument("--id", help="ID модуля")
create_parser.add_argument("--name", help="Название модуля")
create_parser.add_argument("--description", help="Описание модуля")
create_parser.add_argument("--author", help="Автор модуля")
create_parser.add_argument("--version", help="Версия модуля")
create_parser.add_argument(
"--privileged", help="Привилегированный модуль (true/false)"
)
create_parser.add_argument(
"--dependencies", help="Список зависимостей в формате имя:версия через запятую"
)
args = parser.parse_args()
if args.command == "create":
if not all(
[
args.module_name,
args.id,
args.name,
args.description,
args.author,
args.version,
args.privileged,
args.dependencies,
]
):
print("Переход в интерактивный режим...")
interactive_mode(args)
else:
create_module(args)
if __name__ == "__main__":
main()

9
scripts/test.py Normal file
View File

@@ -0,0 +1,9 @@
import subprocess # nosec
def main():
subprocess.run(["python", "-u", "-m", "unittest", "discover"]) # nosec
if __name__ == "__main__":
main()

View File

@@ -1,198 +0,0 @@
import sqlite3
import os
import configparser
from openai import OpenAIError
import time
mother_path = os.path.dirname(os.path.dirname(os.getcwd()))
config = configparser.ConfigParser()
config.read(os.path.join(mother_path, 'src/config.ini'))
database = sqlite3.connect(os.path.join(mother_path, 'DataBase/OCAB_DB.db'))
cursor = database.cursor()
reply_ignore = config['Telegram']['reply_ignore'].split('| ')
reply_ignore = list(map(int, reply_ignore))
#print(reply_ignore)
min_token_for_answer = int(config['Openai']['min_token_for_answer'])
# Импорт библиотек
import openai
max_token_count = int(config['Openai']['max_token_count'])
# Создание файла лога если его нет
if not os.path.exists(os.path.join(mother_path, 'src/OpenAI/GPT35turbo/log.txt')):
with open(os.path.join(mother_path, 'src/OpenAI/GPT35turbo/log.txt'), 'w') as log_file:
log_file.write('')
def openai_response(message_formated_text):
# Запуск OpenAI
# Считаем размер полученного текста
#print(message_formated_text)
count_length = 0
if len(message_formated_text) == 0:
message_formated_text = [
{
"role": "user",
"content": "Напиши короткий ответ говорящий что контекст сообщения слишком длинный и попроси задать вопрос отдельно без ответа на другие сообщения по ключевому слову"
}
]
for message in message_formated_text:
#print(message["content"])
count_length += int(len(message["content"]))
#print(count_length)
try:
response = openai.ChatCompletion.create(
model="gpt-3.5-turbo",
messages=message_formated_text,
max_tokens=max_token_count - count_length
)
except OpenAIError as ex:
if 'on requests per min. Limit: 3 / min. Please try again' in str(ex):
response = ('Извини мой процессор перегрелся, дай мне минутку отдохнуть')
elif 'Bad gateway.' in str(ex):
response = (
'Ой, где я? Кажется кто то перерзал мой интернет кабель, подожди немного пока я его починю')
#запись ошибки в лог с указанием времени и даты
with open(os.path.join(mother_path, 'src/OpenAI/GPT35turbo/log.txt'), 'a') as log_file:
log_file.write('\n' + time.strftime("%d.%m.%Y %H:%M:%S") + ' ' + str(ex))
return response
def sort_message_from_user(message_formated_text, message_id):
# print(int(*(
# cursor.execute("SELECT message_sender FROM message_list WHERE message_id = ?", (message_id,)).fetchone())))
if int(*(
cursor.execute("SELECT message_sender FROM message_list WHERE message_id = ?",
(message_id,)).fetchone())) == 0:
message_formated_text.append({
"role": "assistant",
"content": str(*(cursor.execute("SELECT message_text FROM message_list WHERE message_id = ?",
(message_id,)).fetchone()))
})
else:
message_formated_text.append({
"role": "user",
"content": str(*(cursor.execute("SELECT message_text FROM message_list WHERE message_id = ?",
(message_id,)).fetchone()))
})
#Проверка что длина всех сообщений в кортеже не превышает max_token_count-min_token_for_answer
return message_formated_text
def openai_collecting_message(message_id, message_formated_text):
# собирает цепочку сообщений для OpenAI длинной до max_token_count
# проверяем что сообщение отвечает на другое сообщение
#print(int(*(cursor.execute("SELECT answer_id FROM message_list WHERE message_id = ?", (message_id,)).fetchone())))
#print(reply_ignore)
if int(*(cursor.execute("SELECT answer_id FROM message_list WHERE message_id = ?", (message_id,)).fetchone())) not in reply_ignore:
# Продолжаем искать ответы на сообщения
#print(int(*(cursor.execute("SELECT answer_id FROM message_list WHERE message_id = ?", (message_id,)).fetchone())))
message_formated_text = openai_collecting_message(int(*(cursor.execute("SELECT answer_id FROM message_list WHERE message_id = ?", (message_id,)).fetchone())), message_formated_text)
#Проверяем ID отправителя сообщения, если 0 то это сообщение от бота
sort_message_from_user(message_formated_text, message_id)
else:
# Проверяем ID отправителя сообщения, если 0 то это сообщение от бота
sort_message_from_user(message_formated_text, message_id)
return message_formated_text
def openai_message_processing(message_id):
#проверяем на наличие сообщения в базе данных
if cursor.execute("SELECT message_text FROM message_list WHERE message_id = ?", (message_id,)).fetchone() is None:
return None
else:
# проверяем на то что сообщение влезает в max_token_count с учётом message_formated_text
message_formated_text = [
{
"role": "system",
"content": config['Openai']['story_model']
}
]
if ((len(str(cursor.execute("SELECT message_text FROM message_list WHERE message_id")))) < (max_token_count - len(message_formated_text[0]['content']))):
message_formated_text = openai_collecting_message(message_id, message_formated_text)
count_length = 0
# Обработка невозможности ответить на сообщение
try:
for message in message_formated_text:
count_length += len(message['content'])
while count_length > max_token_count - min_token_for_answer:
message_formated_text.pop(1)
count_length = 0
for message in message_formated_text:
count_length += len(message['content'])
except IndexError:
message_formated_text = [
{
"role": "system",
"content": "Выведи сообщение об ошибке."
}
]
response = openai_response(message_formated_text)
return response
else:
return f"Сообщение слишком длинное, максимальная длина сообщения \
{max_token_count - len(message_formated_text[0]['content'])} символов, укоротите его на \
{len(str(cursor.execute('SELECT message_text FROM message_list WHERE message_id'))) - max_token_count} символов"
def openai_collecting_history_context(start_id, end_id, message_formated_text, message_id = 0):
# собираем список сообщений для OpenAI длинной до 14500 символов начиная с end_id и заканчивая start_id
if message_id == 0:
message_id = end_id
message_formated_text = openai_collecting_history_context(start_id, end_id, message_formated_text, message_id)
elif message_id > start_id:
message_formated_text = openai_collecting_history_context(start_id, end_id, message_formated_text, (message_id - 1))
try:
message_formated_text.append({
"role": "user",
"content": str(*(cursor.execute("SELECT message_text FROM message_list WHERE message_id = ?",
(message_id,)).fetchone()))
})
except Exception:
pass
return message_formated_text
def openai_message_history_processing(start_id, end_id):
message_formated_text = []
message_formated_text = openai_collecting_history_context(start_id, end_id, message_formated_text)
max_token_count_history = 15000
min_token_for_answer_history = 1500
count_length = 0
try:
for message in message_formated_text:
count_length += len(message['content'])
while count_length > max_token_count_history - min_token_for_answer_history:
message_formated_text.pop(1)
count_length = 0
for message in message_formated_text:
count_length += len(message['content'])
except IndexError:
message_formated_text = [
{
"role": "system",
"content": "Выведи сообщение об ошибке."
}
]
message_formated_text.append({
"role": "user",
"content": "Сделай пересказ всей истории сообщений без потери смысла от 3-его лица."
})
try:
response = openai.ChatCompletion.create(
model="gpt-3.5-turbo-16k",
messages=message_formated_text,
max_tokens=max_token_count_history - count_length
)
except OpenAIError as ex:
if 'on requests per min. Limit: 3 / min. Please try again' in str(ex):
response = ('Извини мой процессор перегрелся, дай мне минутку отдохнуть')
elif 'Bad gateway.' in str(ex):
response = (
'Ой, где я? Кажется кто то перерзал мой интернет кабель, подожди немного пока я его починю')
# запись ошибки в лог с указанием времени и даты
with open(os.path.join(mother_path, 'src/OpenAI/GPT35turbo/log.txt'), 'a') as log_file:
log_file.write('\n' + time.strftime("%d.%m.%Y %H:%M:%S") + ' ' + str(ex))
return response

View File

@@ -1,8 +0,0 @@
# Получение сообщений в чате, и запись их в базу данных
from aiogram import types
from src.TelegramBot.main import dp
from main import cursor, config
#импортировать функцию для обработки сообщений из OpenAI

View File

@@ -1,566 +0,0 @@
# Импорт библиотек
import os
from contextlib import suppress
import openai
import configparser
import sqlite3
import asyncio
import sys
import datetime
from time import mktime
from aiogram import Bot, Dispatcher, executor, types
from aiogram.contrib.fsm_storage.memory import MemoryStorage
from aiogram.utils.exceptions import MessageCantBeDeleted, MessageToDeleteNotFound
mother_path = os.path.dirname(os.path.dirname(os.getcwd()))
sys.path.insert(1, mother_path)
# Импорт переменных из файла .ini
config = configparser.ConfigParser()
config.read(os.path.join(mother_path, 'src/config.ini'))
TOKEN = config['Telegram']['token']
OPENAI_API_KEY = (config['Openai']['api_key'])
bot_trigger_front = (config['Telegram']['bot_trigger_front']).split('|')
bot_trigger_all = (config['Telegram']['bot_trigger_all']).split('|')
# удаление лишних элементов массивов
bot_trigger_front.remove('')
bot_trigger_all.remove('')
DB_message_limit = int(config['DataBase']['message_limit'])
# Инициализация бота
bot = Bot(token=TOKEN)
dp = Dispatcher(bot, storage=MemoryStorage())
# Инициализация API OpenAI
openai.api_key = OPENAI_API_KEY
# Инициализация базы данных OCAB_DB в папке DataBase/OCAB_DB.db
# Создаём базу данных sqlite3 по пути /home/armatik/PycharmProjects/OpenChatAiBot/DataBase/OCAB_DB.db
database = sqlite3.connect(os.path.join(mother_path, 'DataBase/OCAB_DB.db'))
cursor = database.cursor()
# Создаём таблицу chat_list
cursor.execute("""CREATE TABLE IF NOT EXISTS chat_list (
chat_id INTEGER PRIMARY KEY,
chat_role INTEGER NOT NULL,
chat_stats INTEGER NOT NULL
)""")
# Создаём таблицу message_list
cursor.execute("""CREATE TABLE IF NOT EXISTS message_list (
message_id INTEGER PRIMARY KEY,
message_text TEXT NOT NULL,
message_sender INTEGER NOT NULL,
answer_id INTEGER
)""")
# Создаём таблицу user_list
cursor.execute("""CREATE TABLE IF NOT EXISTS user_list (
user_id INTEGER PRIMARY KEY,
user_name TEXT NOT NULL,
user_role INTEGER,
user_stats INTEGER
user_rep INTEGER
)""")
#запись информации о чате в базу данных
async def empty_role(id):
cursor.execute("UPDATE user_list SET user_role = ? WHERE user_id = ?", (0, id))
database.commit()
async def check(id):
#проверка что у человека есть роль
user_role = cursor.execute("SELECT user_role FROM user_list WHERE user_id = ?", (id,)).fetchone()[0]
if user_role not in [0, 1, 2]:
await empty_role(id)
async def get_role_name(rolenum):
rolenum = int(rolenum)
if rolenum == 0:
role = config['Roles']['user']
elif rolenum == 1:
role = config['Roles']['moderator']
elif rolenum == 2:
role = config['Roles']['admin']
return role
async def get_role(id):
#получение роли пользователя
user_role = cursor.execute("SELECT user_role FROM user_list WHERE user_id = ?", (id,)).fetchone()[0]
return await get_role_name(user_role)
async def check_admin(id, chat_id):
#Проверка что человек есть в списке администраторов чата
chat_admins = await bot.get_chat_administrators(chat_id)
flag = False
for admin in chat_admins:
if admin.user.id == id:
flag = True
return flag
async def check_moderator(id):
#Проверка что человек имеет роль модератора
if cursor.execute("SELECT user_role FROM user_list WHERE user_id = ?", (id,)).fetchone()[0] >= 1:
return True
else:
return False
async def check_user(id):
#Проверка что человек имеет роль пользователя
if cursor.execute("SELECT user_role FROM user_list WHERE user_id = ?", (id,)).fetchone()[0] == 0:
return True
else:
return False
async def save_message(message):
#Сохранение сообщения в базу данных
cursor.execute("INSERT INTO message_list VALUES (?, ?, ?, ?)", (message.message_id, message.text, message.from_user.id, 0))
if message.reply_to_message is not None:
cursor.execute("UPDATE message_list SET answer_id = ? WHERE message_id = ?", (message.reply_to_message.message_id, message.message_id))
#Если отправитель не зарегистрирован в базе данных, то добавляем его
if cursor.execute("SELECT user_id FROM user_list WHERE user_id = ?", (message.from_user.id,)).fetchone() is None:
cursor.execute("INSERT INTO user_list VALUES (?, ?, ?, ?)", (message.from_user.id, message.from_user.username, 0, 1))
#Добавляем статистику в чат и в данные пользователя, если пользователь не бот.
cursor.execute("UPDATE chat_list SET chat_stats = chat_stats + 1 WHERE chat_id = ?", (message.chat.id,))
cursor.execute("UPDATE user_list SET user_stats = user_stats + 1 WHERE user_id = ?", (message.from_user.id,))
database.commit()
async def time_to_seconds(time):
#Конвертация текстового указания времени по типу 3h, 5m, 10s в минуты
if time[-1] == 'd':
return int(time[:-1])*86400
elif time[-1] == 'h':
return int(time[:-1])*3600
elif time[-1] == 'm':
return int(time[:-1])*60
elif time[-1] == 's':
return int(time[:-1])
async def short_time_to_time(time):
#Конвертация времени в длинное название
if time[-1] == 'd':
return str(f"{time[0:-1]} дней")
elif time[-1] == 'h':
return str(f"{time[0:-1]} часов")
elif time[-1] == 'm':
return str(f"{time[0:-1]} минут")
elif time[-1] == 's':
return str(f"{time[0:-1]} секунд")
@dp.message_handler(commands=['mute'])
async def mute(message: types.Message):
#Проверка что отправитель является администратором чата
try:
if await check_moderator(message.from_user.id):
#Проверка отвечает ли сообщение на другое сообщение
if message.reply_to_message is not None:
time = message.text.split(' ')[1]
#получаем id отправителя сообщение на которое отвечает message
target_id = message.reply_to_message.from_user.id
target_name = cursor.execute("SELECT user_name FROM user_list WHERE user_id = ?", (target_id,)).fetchone()[0]
#Проверка, что человек пользователь
if cursor.execute("SELECT user_role FROM user_list WHERE user_id = ?", (target_id,)).fetchone()[0] == 0:
#ограничения прав пользователя по отправке сообщений на time секунд
time_sec = await time_to_seconds(message.text.split(' ')[1])
if time_sec <= 30 or time_sec >= 31536000:
await message.reply("Время мута должно быть больше 30 секунд и меньше 365 дней")
return
date_string = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
date_time = datetime.datetime.strptime(date_string, "%Y-%m-%d %H:%M:%S")
unix_time = int(mktime(date_time.timetuple()))
await bot.restrict_chat_member(message.chat.id, target_id, until_date=unix_time+time_sec, permissions=types.ChatPermissions(can_send_messages=False))
await message.reply(
f"Пользователь {target_name} замьючен на {await short_time_to_time(time)}")
else:
await message.reply(f"Пользователь [{target_name}](tg://user?id={target_id}) является {await get_role(target_id)} и не может быть замьючен",
parse_mode='Markdown')
return
else:
target_tag = message.text.split(' ')[1]
target_tag = target_tag[1:]
target_id = int(cursor.execute("SELECT user_id FROM user_list WHERE user_name = ?", (target_tag,)).fetchone()[0])
target_name = cursor.execute("SELECT user_name FROM user_list WHERE user_id = ?", (target_id,)).fetchone()[0]
#ограничения прав пользователя по отправке сообщений на time секунд
time_mute = message.text.split(' ')[2]
if cursor.execute("SELECT user_role FROM user_list WHERE user_id = ?", (target_id,)).fetchone()[0] == 0:
#ограничения прав пользователя по отправке сообщений на time секунд
time_sec = await time_to_seconds(message.text.split(' ')[2])
if time_sec <= 30 or time_sec >= 31536000:
await message.reply("Время мута должно быть больше 30 секунд и меньше 365 дней")
return
date_string = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
date_time = datetime.datetime.strptime(date_string, "%Y-%m-%d %H:%M:%S")
unix_time = int(mktime(date_time.timetuple()))
await bot.restrict_chat_member(message.chat.id, target_id, until_date=unix_time+time_sec, permissions=types.ChatPermissions(can_send_messages=False))
await message.reply(
f"Пользователь {target_name} замьючен на {await short_time_to_time(time_mute)}.")
else:
await message.reply(f"Пользователь [{target_name}](tg://user?id={target_id}) является {await get_role(target_id)} и не может быть замьючен",
parse_mode="Markdown")
return
except:
await message.reply("Ошибка данных. Возможно пользователь ещё ничего не написал.")
@dp.message_handler(commands=['unmute'])
async def unmute(message: types.Message):
chat_id= message.chat.id
if await check_moderator(message.from_user.id):
if message.reply_to_message is not None:
target_id = message.reply_to_message.from_user.id
target_name = cursor.execute("SELECT user_name FROM user_list WHERE user_id = ?", (target_id,)).fetchone()[0]
await bot.restrict_chat_member(message.chat.id, target_id, until_date=0, permissions=types.ChatPermissions(can_send_messages=True))
await message.reply(
f"Пользователь [{target_name}](tg://user?id={target_id}) размьючен",
parse_mode="Markdown")
else:
target_tag = message.text.split(' ')[1]
target_tag = target_tag[1:]
target_id = int(cursor.execute("SELECT user_id FROM user_list WHERE user_name = ?", (target_tag,)).fetchone()[0])
target_name = cursor.execute("SELECT user_name FROM user_list WHERE user_id = ?", (target_id,)).fetchone()[0]
#получаем стандартные права чата
if cursor.execute("SELECT user_role FROM user_list WHERE user_id = ?", (target_id,)).fetchone()[0] == 0:
#возвращаем пользователю стандартные права как у новых пользователей
chat_permisson = await bot.get_chat(chat_id)
await bot.restrict_chat_member(message.chat.id, target_id, until_date=0, permissions=chat_permisson.permissions)
await message.reply(
f"Пользователь [{target_name}](tg://user?id={target_id}) размьючен",
parse_mode="Markdown")
#обрабатываем вход пользователя в чат
@dp.message_handler(content_types=['new_chat_members'])
async def new_chat_members(message: types.Message):
#проверяем что пользователь не бот
if not message.new_chat_members[0].is_bot:
#Заносим пользователя в базу данных
cursor.execute("INSERT INTO user_list VALUES (?, ?, ?, ?)", (message.new_chat_members[0].id, message.new_chat_members[0].username, 0, 0))
@dp.message_handler(commands=['chatinfo'])
async def chat_info(message: types.Message):
#Выводит информацию о чате. Название, количество пользователей, количество администраторов, количество модераторов, количество сообщений.
if await check_moderator(message.from_user.id):
chat_id = message.chat.id
chat_title = message.chat.title
chat_members = await bot.get_chat_members_count(chat_id)
chat_admins = await bot.get_chat_administrators(chat_id)
chat_moderators = cursor.execute("SELECT COUNT(user_id) FROM user_list WHERE user_role = 1").fetchone()[0]
chat_messages = cursor.execute("SELECT chat_stats FROM chat_list WHERE chat_id = ?", (chat_id,)).fetchone()[0]
await message.reply(
f"<b>Название чата:</b> {chat_title}\n"
f"<b>Количество пользователей:</b> {chat_members}\n"
f"<b>Количество администраторов:</b> {len(chat_admins)}\n"
f"<b>Количество модераторов:</b> {chat_moderators}\n"
f"<b>Количество сообщений:</b> {chat_messages}",
parse_mode="HTML")
await top10(message)
else:
await message.reply(
f"У вас недостаточно прав для выполнения этой команды.")
@dp.message_handler(commands=['start'])
async def start(message: types.Message):
#проверка что чат есть в базе данных
if cursor.execute("SELECT chat_id FROM chat_list WHERE chat_id = ?", (message.chat.id,)).fetchone() is None:
chat_id = message.chat.id
chat_role = 0
chat_stats = 1
cursor.execute("INSERT INTO chat_list VALUES (?, ?, ?)", (chat_id, chat_role, chat_stats))
database.commit()
await message.reply(
f"{config['Telegram']['start_answer']}",
parse_mode="Markdown")
else:
await message.reply(
f"Чат уже инициализирован.",
parse_mode="Markdown")
# Проверка наличия столбца имени пользователя в базе данных, если его нет, то добавить.
try:
cursor.execute("SELECT user_name FROM user_list")
except sqlite3.OperationalError:
cursor.execute("ALTER TABLE user_list ADD COLUMN user_name TEXT")
database.commit()
await message.reply("База данных пользователей реструктурирована.")
# Проверка наличия столбца репутации пользователя в базе данных, если его нет, то добавить.
try:
cursor.execute("SELECT user_rep FROM user_list")
except sqlite3.OperationalError:
cursor.execute("ALTER TABLE user_list ADD COLUMN user_rep INTEGER")
database.commit()
await message.reply("База данных пользователей реструктурирована.")
@dp.message_handler(commands=['top10'])
async def top10(message: types.Message):
#топ 10 пользователей по количеству сообщений в user_stats в формате: Имя пользователя - количество сообщений
top10 = cursor.execute("SELECT user_id, user_stats FROM user_list ORDER BY user_stats DESC LIMIT 10").fetchall()
top10_message = ''
for user in top10:
username = (cursor.execute("SELECT user_name FROM user_list WHERE user_id = ?", (user[0],)).fetchone())[0]
if username is None:
username = "Аноним"
top10_message += f"{username} - {user[1]}\n"
#в начале сообщения берём текст из config.ini и вставляем в него топ 10 пользователей
await message.reply(
f"{config['Telegram']['top10_answer']}\n{top10_message}")
@dp.message_handler(commands=['aboutme'])
async def aboutme(message: types.Message):
your_id = message.from_id
await check(your_id)
your_name = message.from_user.username
user_stats = cursor.execute("SELECT user_stats FROM user_list WHERE user_id = ?", (message.from_user.id,)).fetchone()
rolenum = cursor.execute("SELECT user_role FROM user_list WHERE user_id = ?", (message.from_user.id,)).fetchone()
user_rep = cursor.execute("SELECT user_rep FROM user_list WHERE user_id = ?", (message.from_user.id,)).fetchone()
if "None" in str(user_rep):
user_rep = 0
cursor.execute("UPDATE user_list SET user_rep = ? WHERE user_id = ?", (user_rep, your_id))
role = await get_role_name(rolenum[0])
#Имя пользователя на следующей строке статистика сообщений на следующей строке права пользователя
#если пользователь есть в списке администраторов чата, то выдаём роль администратор
chat_admins = await bot.get_chat_administrators(message.chat.id)
default_for_admin = int(config['Roles']['default_for_admin'])
no_answer = False
if rolenum[0] < default_for_admin:
for admin in chat_admins:
if admin.user.id == your_id:
if default_for_admin == 0:
await message.reply(
f"<b>Вы обнаружены в списке администраторов чата!</b> \n"
f"<b>Вам будет выдана роль:</b> {config['Roles']['user']}\n"
f"<b>Имя:</b> <a href=\"tg://user?id={str(your_id)}\">{your_name}</a>\n"
f"<b>Кол-во сообщений:</b> {user_stats[0]}\n"
f"<b>Репутация:</b> {user_rep[0]}\n"
f"<b>Роль:</b> {role}",
parse_mode="HTML")
no_answer = True
elif default_for_admin == 1:
await message.reply(
f"<b>Вы обнаружены в списке администраторов чата!</b> \n"
f"<b>Вам будет выдана роль:</b> {config['Roles']['moderator']}\n"
f"<b>Имя:</b> <a href=\"tg://user?id={str(your_id)}\">{your_name}</a>\n"
f"<b>Кол-во сообщений:</b> {user_stats[0]}\n"
f"<b>Репутация:</b> {user_rep[0]}\n"
f"<b>Роль:</b> {role}",
parse_mode="HTML")
no_answer = True
elif default_for_admin == 2:
await message.reply(
f"<b>Вы обнаружены в списке администраторов чата!</b> \n"
f"<b>Вам будет выдана роль:</b> {config['Roles']['admin']}\n"
f"<b>Имя:</b> <a href=\"tg://user?id={str(your_id)}\">{your_name}</a>\n"
f"<b>Кол-во сообщений:</b> {user_stats[0]}\n"
f"<b>Репутация:</b> {user_rep[0]}\n"
f"<b>Роль:</b> {role}",
parse_mode="HTML")
no_answer = True
cursor.execute("UPDATE user_list SET user_role = ? WHERE user_id = ?", (default_for_admin, your_id))
database.commit()
if no_answer == False:
await message.reply(
f"<b>Имя:</b> <a href=\"tg://user?id={str(your_id)}\">{your_name}</a>\n"
f"<b>Кол-во сообщений:</b> {user_stats[0]}\n"
f"<b>Репутация:</b> {user_rep[0]}\n"
f"<b>Роль:</b> {role}",
parse_mode="HTML")
@dp.message_handler(commands=['setrole'])
async def setrole(message: types.Message):
try:
user_role = cursor.execute("SELECT user_role FROM user_list WHERE user_id = ?", (message.from_user.id,)).fetchone()
user_name = message.from_user.username
target_name = message.text.split()[1]
target_name = target_name[1:]
target_role = cursor.execute("SELECT user_role FROM user_list WHERE user_name = ?", (target_name,)).fetchone()[0]
user_id = cursor.execute("SELECT user_id FROM user_list WHERE user_name = ?", (target_name,)).fetchone()
user_id = user_id[0]
except:
await message.reply("Пользователь не найден!")
return
if await check_admin(user_id, message.chat.id) and target_role == 2:
await message.reply("Вы не можете изменить роль этому пользователю!")
else:
if user_role[0] == 2:
try:
#Получаем id пользователя по нику в базе данных
role = message.text.split()[2]
if role == "0" or role == config['Roles']['user']:
role = config['Roles']['user']
rolenum = 0
elif role == "1" or role == config['Roles']['moderator']:
role = config['Roles']['moderator']
rolenum = 1
elif role == "2" or role == config['Roles']['admin']:
role = config['Roles']['admin']
rolenum = 2
cursor.execute("UPDATE user_list SET user_role = ? WHERE user_id = ?", (rolenum, user_id))
database.commit()
await message.reply(
f"Пользователю [{target_name}](tg://user?id={str(user_id)}) выдана роль: {role}",
parse_mode="Markdown")
except:
await message.reply(
f"Ошибка! Проверьте правильность написания команды.",
parse_mode="Markdown")
else:
await message.reply(
f"Ошибка! У вас нет прав на использование этой команды.",
parse_mode="Markdown")
@dp.message_handler(commands=['about'])
async def about(message: types.Message):
# проверка что в сообщении есть тег пользователя
if len(message.text.split()) == 2:
try:
target_name = message.text.split()[1]
target_name = target_name[1:]
target_id = cursor.execute("SELECT user_id FROM user_list WHERE user_name = ?", (target_name,)).fetchone()
target_id = target_id[0]
user_rep = cursor.execute("SELECT user_rep FROM user_list WHERE user_id = ?", (target_id,)).fetchone()[0]
if "None" in str(user_rep):
user_rep = 0
cursor.execute("UPDATE user_list SET user_rep = ? WHERE user_id = ?", (user_rep, target_id))
await check(target_id)
except:
await message.reply(
f"Ошибка! Проверьте правильность написания тега пользователя.",
parse_mode="Markdown")
user_stats = cursor.execute("SELECT user_stats FROM user_list WHERE user_id = ?", (target_id,)).fetchone()[0]
user_role = cursor.execute("SELECT user_role FROM user_list WHERE user_id = ?", (target_id,)).fetchone()[0]
user_role = await get_role_name(user_role)
await message.reply(
f"<b>Имя:</b> {target_name}\n"
f"<b>Кол-во сообщений:</b> {user_stats}\n"
f"<b>Репутация:</b> {user_rep}\n"
f"<b>Роль:</b> {user_role}", parse_mode="HTML")
else:
await message.reply(
f"Ошибка! Проверьте правильность написания команды.")
from src.OpenAI.GPT35turbo.OA_processing import openai_message_history_processing
@dp.message_handler(commands=['history'])
async def history(message: types.Message):
if message.reply_to_message is not None:
# Получаем id сообщения на которое отвечает message
start_message_id = message.reply_to_message.message_id
# Получаем id сообщения message
end_message_id = message.message_id
elif message.text.split()[1] != '':
end_message_id = message.message_id
start_message_id = end_message_id - int(message.text.split()[1])
else:
await message.reply("Ошибка! Проверьте правильность написания команды.")
return
temp_message = await message.reply("Подождите немного, я обрабатываю историю сообщений... Функция ЭКСПЕРЕМЕНТАЛЬНАЯ "
"и может работать некорректно.")
# Получаем историю сообщений
response = openai_message_history_processing(start_message_id, end_message_id)
if response is None:
await message.reply("Я не понял тебя, попробуй перефразировать")
else:
bot_message_id = await message.reply(response['choices'][0]['message']['content'], parse_mode="markdown")
# заносим сообщение в базу данных в качестве message_id мы пишем id сообщения в bot_message_id
cursor.execute("INSERT INTO message_list VALUES (?, ?, ?, ?)",
(bot_message_id.message_id, response['choices'][0]['message']['content'], 0, message.message_id))
# очищаем сообщение
asyncio.create_task(delete_message(temp_message, 0))
from src.OpenAI.GPT35turbo.OA_processing import openai_message_processing
async def delete_message(message: types.Message, sleep_time: int = 0):
await asyncio.sleep(sleep_time)
with suppress(MessageCantBeDeleted, MessageToDeleteNotFound):
await message.delete()
@dp.message_handler()
async def in_message(message: types.Message):
chat_id = message.chat.id
# Получение сообщений в чате, и запись их в базу данных
if (message.chat.type == "private" or message.chat.type == "channel"):
await message.reply(
f"{config['Telegram']['private_answer']}",
parse_mode="Markdown")
elif (message.chat.type != "group" or message.chat.type != "supergroup") and \
message.text != '' and message.text != ' ' and \
(cursor.execute("SELECT chat_role FROM chat_list WHERE chat_id;") == 1): return None
else:
# Запись сообщения в базу данных
await save_message(message)
# Обработка сообщения OpenAI
send_answer = False
typing_mode = False
# импортируем массив триггеров из файла .ini
if message.reply_to_message and message.reply_to_message.from_user.id == (await bot.me).id:
send_answer = True
typing_mode = True
for trigger in bot_trigger_all:
if trigger.lower() in message.text.lower():
send_answer = True
typing_mode = False
for trigger in bot_trigger_front:
if message.text.lower().startswith(trigger.lower()):
send_answer = True
typing_mode = False
if send_answer == False:
# Если сообщение отвечает на другое сообщение, то проверяем наличие слова спасибо
if message.reply_to_message is not None:
if message.text.startswith("Спасибо"):
target_id = message.reply_to_message.from_user.id
target_name = cursor.execute("SELECT user_name FROM user_list WHERE user_id = ?", (target_id,)).fetchone()[0]
cursor.execute("UPDATE user_list SET user_rep = user_rep + 1 WHERE user_id = ?", (target_id,))
database.commit()
await message.reply(
f"Спасибо, [{target_name}](tg://user?id={str(target_id)}) за ваш ответ!",
parse_mode="Markdown")
if send_answer:
if typing_mode is False:
your_id = message.from_id
your_name = message.from_user.username
temp_msg = await message.reply(
f"[{your_name}](tg://user?id={str(your_id)}), Подожди немного и я обязательно отвечу тебе!",
parse_mode="Markdown")
# Пишем что бот печатает
await bot.send_chat_action(message.chat.id, "typing")
# Получаем ответ от OpenAI
response = openai_message_processing(message.message_id)
if response is None:
bot_message_id = await message.reply("Я не понял тебя, попробуй перефразировать")
if typing_mode is False:
asyncio.create_task(delete_message(temp_msg, 0))
# заносим сообщение в базу данных в качестве message_id пишем id сообщения которое отправил бот
cursor.execute("INSERT INTO message_list VALUES (?, ?, ?, ?)",
(bot_message_id, "Я не понял тебя, попробуй перефразировать", 0, message.message_id))
else:
bot_message_id = await message.reply(response['choices'][0]['message']['content'], parse_mode="markdown")
if typing_mode is False:
asyncio.create_task(delete_message(temp_msg, 0))
# заносим сообщение в базу данных в качестве message_id мы пишем id сообщения в bot_message_id
cursor.execute("INSERT INTO message_list VALUES (?, ?, ?, ?)",
(bot_message_id.message_id, response['choices'][0]['message']['content'], 0, message.message_id))
# очищаем базу данных от старых сообщений
cursor.execute("DELETE FROM message_list WHERE message_id < ?", (bot_message_id.message_id - DB_message_limit,))
database.commit()
if __name__ == '__main__':
executor.start_polling(dp, skip_updates=True)

2
src/__init__.py Normal file
View File

@@ -0,0 +1,2 @@
import ocab_core
import service

View File

@@ -1,43 +0,0 @@
[Telegram]
token=****
admin_password="Test_pass"
# Пока не используется
# Массивы заполнять через запятую. Пример: test|test2 |test3,
bot_trigger_front=
# Живой пример: Арма |Армат |Арма, |
bot_trigger_all=
# Живой пример: @arma_ai_bot |помогите |
private_answer=
# Живой пример: Я не понимаю тебя, но я могу поговорить с тобой в группе [название группы](https://t.me/) и ещё в некоторых других группах
reply_ignore=0
# По умолчанию: 0
# Содержит в себе все id топиков чата для чатов с форумным типом, если не заполнить контекст бота СЛОМАЕТСЯ!
# Пример заполнения для одного из чатов: 0| 643885| 476959| 1| 476977| 633077| 630664| 476966| 634567
start_answer= Привет! Я OCAB, открытый чат бот с ИИ для вашего чата!
top10_answer= Вот топ 10 самых разговорчивых пользователей чата:
[Roles]
user= Пользователь
moderator= Модератор
admin= Администратор
default_for_admin= 2
[Openai]
api_key=****
chat_model=gpt-3.5-turbo
story_model=
# Тут должен быть текст истории бота, но я его не показываю)))
max_token_count=4000
# максимальное количество токенов в сообщении
min_token_for_answer=800
# минимальное количество токенов в сообщении ответа бота (чем больше, тем более длинный ответ ГАРАНТИРОВАН)
[DataBase]
message_limit=3000
# Максимальное количество сообщений в базе данных
[AI_Dungeon]
use=openai
# "openai" or "YaGPT" Пока не используется

View File

@@ -1,23 +0,0 @@
TELEGRAM:
TOKEN: 1234567890ABCDEFGHIJKLMNOPQRSTUVWXYZ1234567890
PRIVATE_ANSWER:
BOT:
ANSWER:
HELP_ANSWER:
START_ANSWER:
ABOUT_ANSWER:
ROLES:
ADMIN:
MODERATOR:
USER:
DEFAULT_FOR_ADMIN:
DATABASE:
MESSAGE_LIMIT:
GPT_MODULE:
TRIGGER_FRONT:
TRIGGER_ALL:
REPLY_IGNORE:

View File

@@ -1,5 +0,0 @@
from aiogram.filters import BaseFilter
from aiogram.fsm.context import FSMContext
from aiogram.types import Message
# You filters

View File

@@ -1,4 +0,0 @@
from aiogram.fsm.context import FSMContext
from aiogram.types import Message
# Your handlers

View File

@@ -1,3 +0,0 @@
from aiogram.types import KeyboardButton, ReplyKeyboardMarkup
# Your keyboards

View File

@@ -1,3 +0,0 @@
from aiogram.fsm.context import FSMContext
# Your funcs

View File

@@ -1,5 +0,0 @@
from aiogram.fsm.state import State, StatesGroup
class MyState(StatesGroup):
state1 = State()

View File

@@ -1 +0,0 @@
# Here text of buttons

View File

@@ -1,14 +0,0 @@
from aiogram import Dispatcher
def registry_middlewares(dp: Dispatcher):
pass
def registry_handlers(dp: Dispatcher):
pass
def registry(dp: Dispatcher):
registry_middlewares(dp)
registry_handlers(dp)

View File

@@ -1,62 +0,0 @@
import openai
import configparser
from dataclasses import dataclass
@dataclass
class App:
telegram_token: str
openai_key: str
bot_trigger_front: list[str]
bot_trigger_all: list[str]
@dataclass
class Database:
path: str
message_limit: int
@dataclass
class Config:
app: App
database: Database
def _clean_bot_trigger(bot_trigger_front, bot_trigger_all):
"""удаление лишних элементов массивов"""
bot_trigger_front.remove('')
bot_trigger_all.remove('')
def get_settings():
# Импорт переменных из файла .ini
config = configparser.ConfigParser()
config.read('core/config.ini')
bot_trigger_front = (config['Telegram']['bot_trigger_front']).split('|')
bot_trigger_all = (config['Telegram']['bot_trigger_all']).split('|')
_clean_bot_trigger(bot_trigger_front, bot_trigger_all)
return Config(
app=App(
telegram_token=config['Telegram']['token'],
openai_key=config['Openai']['api_key'],
bot_trigger_front = bot_trigger_front,
bot_trigger_all = bot_trigger_all,
),
database=Database(
path="core/database/data.db",
message_limit=int(config['DataBase']['message_limit']),
),
)
def _openai_init(settings: Config):
openai.api_key = settings.app.openai_key
settings = get_settings()
_openai_init(settings)

View File

@@ -1,28 +0,0 @@
from asyncio import run
from aiogram import Bot, Dispatcher
from core.utils.registry import registry
from core.utils.settings import settings
async def start(bot: Bot, dp: Dispatcher):
try:
await dp.start_polling(bot)
finally:
await bot.session.close()
# close db connection here
def main():
bot = Bot(token=settings.app.telegram_token)
dp = Dispatcher()
registry(dp)
run(start(bot, dp))
if __name__ == "__main__":
main()

View File

@@ -1,43 +0,0 @@
[Telegram]
token=****
admin_password="Test_pass"
# Пока не используется
# Массивы заполнять через запятую. Пример: test|test2 |test3,
bot_trigger_front=
# Живой пример: Арма |Армат |Арма, |
bot_trigger_all=
# Живой пример: @arma_ai_bot |помогите |
private_answer=
# Живой пример: Я не понимаю тебя, но я могу поговорить с тобой в группе [название группы](https://t.me/) и ещё в некоторых других группах
reply_ignore=0
# По умолчанию: 0
# Содержит в себе все id топиков чата для чатов с форумным типом, если не заполнить контекст бота СЛОМАЕТСЯ!
# Пример заполнения для одного из чатов: 0| 643885| 476959| 1| 476977| 633077| 630664| 476966| 634567
start_answer= Привет! Я OCAB, открытый чат бот с ИИ для вашего чата!
top10_answer= Вот топ 10 самых разговорчивых пользователей чата:
[Roles]
user= Пользователь
moderator= Модератор
admin= Администратор
default_for_admin= 2
[Openai]
api_key=****
chat_model=gpt-3.5-turbo
story_model=
# Тут должен быть текст истории бота, но я его не показываю)))
max_token_count=4000
# максимальное количество токенов в сообщении
min_token_for_answer=800
# минимальное количество токенов в сообщении ответа бота (чем больше, тем более длинный ответ ГАРАНТИРОВАН)
[DataBase]
message_limit=3000
# Максимальное количество сообщений в базе данных
[AI_Dungeon]
use=openai
# "openai" or "YaGPT" Пока не используется

View File

@@ -1,23 +0,0 @@
TELEGRAM:
TOKEN: 1234567890ABCDEFGHIJKLMNOPQRSTUVWXYZ1234567890
PRIVATE_ANSWER:
BOT:
ANSWER:
HELP_ANSWER:
START_ANSWER:
ABOUT_ANSWER:
ROLES:
ADMIN:
MODERATOR:
USER:
DEFAULT_FOR_ADMIN:
DATABASE:
MESSAGE_LIMIT:
GPT_MODULE:
TRIGGER_FRONT:
TRIGGER_ALL:
REPLY_IGNORE:

View File

@@ -1 +0,0 @@

View File

@@ -1,21 +0,0 @@
import os
from yaml import load, Loader
class Config:
def __init__(self):
self.bot_dir = os.path.dirname(os.path.dirname(os.path.dirname(os.getcwd())))
self.config_dir = os.path.join(self.bot_dir, 'config.yaml')
def get_config(self):
return load(open(self.config_dir), Loader=Loader)
def get_bot_token(self):
return self.get_config()['BOT']['TOKEN']
def get_roles(self):
return self.get_config()['ROLES']
def get_bot_answers(self):
return self.get_config()['BOT']['ANSWER']

View File

@@ -1,131 +0,0 @@
import os
import sqlite3
class DataBase:
def __init__(self):
self.bot_dir = os.path.dirname(os.path.dirname(os.path.dirname(os.getcwd())))
self.db = sqlite3.connect(os.path.join(self.bot_dir, 'DataBase/OCAB_DB.db'))
self.cursor = self.db.cursor()
# Проверки наличия таблиц в базе данных
def check_db(self):
self.check_chatdb()
self.check_userdb()
self.check_messagedb()
def check_chatdb(self):
self.cursor.execute("""SELECT name FROM sqlite_master WHERE type='table' AND name='chat_list'""")
if self.cursor.fetchone() is None:
self.create_chatdb()
def check_userdb(self):
self.cursor.execute("""SELECT name FROM sqlite_master WHERE type='table' AND name='user_list'""")
if self.cursor.fetchone() is None:
self.create_userdb()
def check_messagedb(self):
self.cursor.execute("""SELECT name FROM sqlite_master WHERE type='table' AND name='message_list'""")
if self.cursor.fetchone() is None:
self.create_messagedb()
# Создание таблиц в базе данных
def create_chatdb(self):
self.cursor.execute("""CREATE TABLE IF NOT EXISTS chat_list (
chat_id INTEGER PRIMARY KEY,
chat_role INTEGER NOT NULL,
chat_stats INTEGER NOT NULL,
chat_federation INTEGER,
)""")
self.db.commit()
def create_userdb(self):
self.cursor.execute("""CREATE TABLE IF NOT EXISTS user_list (
user_id INTEGER PRIMARY KEY,
user_name TEXT NOT NULL,
user_tag TEXT,
user_role INTEGER,
user_stats INTEGER
user_rep INTEGER
)""")
self.db.commit()
def create_messagedb(self):
self.cursor.execute("""CREATE TABLE IF NOT EXISTS message_list (
message_id INTEGER PRIMARY KEY,
message_text TEXT NOT NULL,
message_id_sender INTEGER NOT NULL,
answer_to_id INTEGER
)""")
self.db.commit()
async def add_chat(self, chat_id, chat_role, chat_stats=0, chat_federation=0):
self.cursor.execute("""INSERT INTO chat_list VALUES (?, ?, ?, ?)""",
(chat_id, chat_role, chat_stats, chat_federation))
async def add_user(self, user_id, user_name, user_tag=None, user_role=0, user_stats=0, user_rep=0):
self.cursor.execute("""INSERT INTO user_list VALUES (?, ?, ?, ?, ?, ?)""",
(user_id, user_name, user_tag, user_role, user_stats, user_rep))
self.db.commit()
async def add_message(self, message_id, message_text, message_sender, answer_id):
self.cursor.execute("""INSERT INTO message_list VALUES (?, ?, ?, ?)""",
(message_id, message_text, message_sender, answer_id))
self.db.commit()
async def get_chat(self, chat_id):
self.cursor.execute("""SELECT * FROM chat_list WHERE chat_id=?""", (chat_id,))
return self.cursor.fetchone()
async def get_user(self, user_id):
self.cursor.execute("""SELECT * FROM user_list WHERE user_id=?""", (user_id,))
return self.cursor.fetchone()
async def get_user_role(self, user_id):
self.cursor.execute("""SELECT user_role FROM user_list WHERE user_id=?""", (user_id,))
return self.cursor.fetchone()
async def get_message(self, message_id):
self.cursor.execute("""SELECT * FROM message_list WHERE message_id=?""", (message_id,))
return self.cursor.fetchone()
async def change_user_name(self, user_id, new_user_name):
self.cursor.execute("""UPDATE user_list SET user_name=? WHERE user_id=?""", (new_user_name, user_id))
self.db.commit()
async def change_user_role(self, user_id, new_user_role):
self.cursor.execute("""UPDATE user_list SET user_role=? WHERE user_id=?""", (new_user_role, user_id))
self.db.commit()
async def change_user_stats(self, user_id, new_user_stats):
self.cursor.execute("""UPDATE user_list SET user_stats=? WHERE user_id=?""", (new_user_stats, user_id))
self.db.commit()
async def change_user_rep(self, user_id, new_user_rep):
self.cursor.execute("""UPDATE user_list SET user_rep=? WHERE user_id=?""", (new_user_rep, user_id))
self.db.commit()
async def change_chat_role(self, chat_id, new_chat_role):
self.cursor.execute("""UPDATE chat_list SET chat_role=? WHERE chat_id=?""", (new_chat_role, chat_id))
self.db.commit()
async def change_chat_stats(self, chat_id, new_chat_stats):
self.cursor.execute("""UPDATE chat_list SET chat_stats=? WHERE chat_id=?""", (new_chat_stats, chat_id))
self.db.commit()
async def change_chat_federation(self, chat_id, new_chat_federation):
self.cursor.execute("""UPDATE chat_list SET chat_federation=? WHERE chat_id=?""",
(new_chat_federation, chat_id))
self.db.commit()
async def update_user_stats(self):
self.cursor.execute("""UPDATE user_list SET user_stats=user_stats+1""")
self.db.commit()
async def update_user_rep(self):
self.cursor.execute("""UPDATE user_list SET user_rep=user_rep+1""")
self.db.commit()
async def update_chat_stats(self):
self.cursor.execute("""UPDATE chat_list SET chat_stats=chat_stats+1""")
self.db.commit()

View File

@@ -1,33 +0,0 @@
from Config import Config
from DataBase import DataBase
class Roles:
def __init__(self):
self.DB = DataBase()
self.Config = Config()
self.user_role_name = self.Config.get_roles()['USER']
self.moderator_role_name = self.Config.get_roles()['MODERATOR']
self.admin_role_name = self.Config.get_roles()['ADMIN']
async def check_admin_permission(self, user_id):
if await self.DB.get_user_role(user_id) == 2:
return True
else:
return False
async def check_moderator_permission(self, user_id):
if await self.DB.get_user_role(user_id) == 1:
return True
else:
return False
async def get_role_name(self, role_number):
if role_number == 0:
return self.user_role_name
elif role_number == 1:
return self.moderator_role_name
elif role_number == 2:
return self.admin_role_name
else:
return None

View File

@@ -0,0 +1,21 @@
TELEGRAM:
TOKEN: xxxxxxxxxxxxxxxxxxxxxxxxx
APPROVED_CHAT_ID: "-123456789 | -012345678"
ADMINCHATID: -12345678
DEFAULT_CHAT_TAG: "@alt_gnome_chat"
CHECK_BOT: True
YANDEXGPT:
TOKEN: xxxxxxxxxxxxxxxxxxxxxxxxx
TOKEN_FOR_REQUEST: 8000
TOKEN_FOR_ANSWER: 2000
CATALOGID: xxxxxxxxxxxxxxxxxxxxxxxxx
PROMPT: "Ты чат-бот ..."
STARTWORD: "Бот| Бот, | бот | бот,"
INWORD: "помогите | не работает"
ROLES:
ADMIN: 2
MODERATOR: 1
USER: 0
BOT: 3

29
src/ocab_core/logger.py Normal file
View File

@@ -0,0 +1,29 @@
import logging
import os
import time
import traceback
def setup_logger():
"""
Настройка логирования
"""
current_date = time.strftime("%d-%m-%Y")
log_dir = os.path.join(os.path.dirname(__file__), "log")
os.makedirs(log_dir, exist_ok=True)
log_file = os.path.join(log_dir, f"log-{current_date}.log")
logging.basicConfig(
filename=log_file,
level=logging.INFO,
format="%(asctime)s %(message)s",
datefmt="%H:%M:%S",
)
def log(message):
if isinstance(message, Exception):
error_message = f"Error: {str(message)}\n{traceback.format_exc()}"
logging.error(error_message)
else:
logging.info(message)

61
src/ocab_core/main.py Normal file
View File

@@ -0,0 +1,61 @@
import asyncio
import traceback
from aiogram import Bot, Dispatcher
from ocab_core.logger import log, setup_logger
from ocab_core.modules_system import ModulesManager
from ocab_core.modules_system.loaders import FSLoader
from ocab_core.modules_system.loaders.unsafe_fs_loader import UnsafeFSLoader
from ocab_core.singleton import Singleton
from ocab_modules.standard.config.config import get_telegram_token
from service import paths
bot_modules = [
UnsafeFSLoader(f"{paths.modules_standard}/config"),
UnsafeFSLoader(f"{paths.modules_standard}/database"),
UnsafeFSLoader(f"{paths.modules_standard}/fsm_database_storage"),
UnsafeFSLoader(f"{paths.modules_standard}/roles"),
UnsafeFSLoader(f"{paths.modules_external}/yandexgpt"),
FSLoader(f"{paths.modules_standard}/command_helper"),
FSLoader(f"{paths.modules_standard}/info"),
FSLoader(f"{paths.modules_standard}/filters"),
FSLoader(f"{paths.modules_external}/create_report_apps"),
FSLoader(f"{paths.modules_standard}/admin"),
FSLoader(f"{paths.modules_standard}/message_processing"),
]
async def main():
bot = None
setup_logger()
app = Singleton()
try:
app.bot = Bot(token=get_telegram_token())
app.modules_manager = ModulesManager()
for module_loader in bot_modules:
info = module_loader.info()
log(f"Loading {info.name}({info.id}) module")
await app.modules_manager.load(module_loader)
app.dp = Dispatcher(storage=app.storage["_fsm_storage"])
app.dp.include_routers(*app.storage["_routers"])
for middleware in app.storage["_outer_message_middlewares"]:
app.dp.message.outer_middleware.register(middleware)
await app.modules_manager.late_init()
await app.dp.start_polling(app.bot)
except Exception:
traceback.print_exc()
finally:
if bot is not None:
await app.bot.session.close()
if __name__ == "__main__":
asyncio.run(main())

View File

@@ -0,0 +1 @@
from .modules_manager import ModulesManager

View File

@@ -0,0 +1 @@
from .fs_loader import FSLoader

View File

@@ -0,0 +1,24 @@
import types
from dataclasses import dataclass
from dataclasses_json import dataclass_json
@dataclass_json
@dataclass
class ModuleInfo:
id: str
name: str
description: str
version: str
author: str | list[str]
privileged: bool
dependencies: dict
class AbstractLoader:
def info(self) -> ModuleInfo:
raise NotImplementedError
def load(self) -> types.ModuleType:
raise NotImplementedError

View File

@@ -0,0 +1,73 @@
import types
from pathlib import Path
from RestrictedPython import compile_restricted_exec
from ocab_core.modules_system.loaders.unsafe_fs_loader import UnsafeFSLoader
from ocab_core.modules_system.safe.policy import (
ALLOWED_IMPORTS,
BUILTINS,
RestrictedPythonPolicy,
)
class FSLoader(UnsafeFSLoader):
def __init__(self, path):
super().__init__(path)
self.builtins = BUILTINS.copy()
self.builtins["__import__"] = self._hook_import
def load(self):
info = self.info()
if info.privileged:
raise Exception("Only non privileged modules are allowed to be imported")
return self._hook_import(".")
def _resolve_module_from_path(self, module_name: str):
path = Path(self.path)
if module_name != ".":
path = path.joinpath(module_name.replace(".", "/"))
if path.is_dir():
init_file_path = path / "__init__.py"
if not init_file_path.exists():
raise FileNotFoundError(f"File {init_file_path} does not exist.")
file_path = init_file_path
else:
path = path.with_suffix(".py")
if path.is_file():
file_path = path
else:
raise ValueError(f"Module not found: {module_name}")
return file_path
def _hook_import(self, name: str, *args, **kwargs):
for allowed in ALLOWED_IMPORTS:
if name == allowed or name.startswith(f"{allowed}."):
return __import__(name, *args, **kwargs)
if name == "ocab_core.modules_system.public_api":
return __import__(name, *args, **kwargs)
module_file_path = self._resolve_module_from_path(name)
with open(module_file_path, "r") as f:
src = f.read()
module = types.ModuleType(name)
module.__dict__.update(
{
"__builtins__": self.builtins,
}
)
result = compile_restricted_exec(src, "<string>", policy=RestrictedPythonPolicy)
if result.errors:
for error in result.errors:
print(error)
exec(result.code, module.__dict__) # nosec
return module

View File

@@ -0,0 +1 @@
from .FSLoader import FSLoader

View File

@@ -0,0 +1,61 @@
import importlib.util
import os
import sys
from pathlib import Path
from ocab_core.modules_system.loaders.base import AbstractLoader, ModuleInfo
class UnsafeFSLoader(AbstractLoader):
def __init__(self, path):
self.path = path
def info(self):
with open(os.path.join(self.path, "info.json"), "r") as f:
return ModuleInfo.from_json(f.read())
def _resolve_module_from_path(self, module_name: str):
path = Path(self.path)
if module_name != ".":
path = path.joinpath(module_name.replace(".", "/"))
if path.is_dir():
init_file_path = path / "__init__.py"
if not init_file_path.exists():
raise FileNotFoundError(f"File {init_file_path} does not exist.")
file_path = init_file_path
else:
path = path.with_suffix(".py")
if path.is_file():
file_path = path
else:
raise ValueError(f"Module not found: {module_name}")
return file_path
def load(self):
self.info()
full_path = self._resolve_module_from_path(".")
if full_path.name == "__init__.py":
module_name = full_path.parent.name
path = full_path.parent.parent.absolute()
else:
module_name = full_path.stem
path = full_path.parent.absolute()
# Добавляем директорию модуля в sys.path
sys.path.insert(0, str(path))
# Загружаем спецификацию модуля
spec = importlib.util.spec_from_file_location(module_name, full_path)
# Создаем модуль
module = importlib.util.module_from_spec(spec)
# Выполняем модуль
spec.loader.exec_module(module)
return module

View File

@@ -0,0 +1 @@
from .UnsafeFSLoader import UnsafeFSLoader

View File

@@ -0,0 +1,85 @@
import semver
from ocab_core.modules_system.loaders.base import AbstractLoader
def is_version_compatible(version, requirement):
def parse_requirement(req):
if req.startswith("^"):
base_version = req[1:]
base_version_info = semver.VersionInfo.parse(base_version)
range_start = base_version_info
range_end = base_version_info.bump_major()
return [f">={range_start}", f"<{range_end}"]
else:
return [req]
for r in parse_requirement(requirement):
if not semver.Version.parse(version).match(r):
return False
return True
class ModulesManager:
def __init__(self):
self.modules = []
async def load(self, loader: AbstractLoader):
info = loader.info()
# Check if the module is already loaded
if any(mod["info"].id == info.id for mod in self.modules):
return
# Check dependencies
for dependency, version in info.dependencies.items():
loaded_dependency = next(
(mod for mod in self.modules if mod["info"].id == dependency), None
)
if not loaded_dependency:
raise Exception(
f"Module {info.id} depends on {dependency}, but it is not loaded"
)
loaded_dependency_info = loaded_dependency["info"]
if not is_version_compatible(loaded_dependency_info.version, version):
raise Exception(
f"Module {info.id} depends on {dependency}, "
f"but version {version} is not compatible"
)
module = loader.load()
self.modules.append(
{
"info": info,
"module": module,
}
)
if hasattr(module, "module_init"):
await module.module_init()
async def late_init(self):
for m in self.modules:
module = m["module"]
if hasattr(module, "module_late_init"):
await module.module_late_init()
def get_by_id(self, module_id: str):
module = next(
(mod for mod in self.modules if mod["info"].id == module_id), None
)
if not module:
raise Exception(f"Module with id {module_id} not loaded")
return module["module"]
def get_info_by_id(self, module_id: str):
module = next(
(mod for mod in self.modules if mod["info"].id == module_id), None
)
if not module:
raise Exception(f"Module with id {module_id} not loaded")
return module["info"]

View File

@@ -0,0 +1,10 @@
from .public_api import (
Storage,
get_fsm_context,
get_module,
log,
register_outer_message_middleware,
register_router,
set_my_commands,
)
from .utils import Utils

View File

@@ -0,0 +1,87 @@
import types
from typing import Any, Tuple, Union
from aiogram import BaseMiddleware, Router
from aiogram.fsm.context import FSMContext
from aiogram.fsm.storage.base import StorageKey
from ocab_core.logger import log
from ocab_core.singleton import Singleton
def register_router(router: Router):
app = Singleton()
app.storage["_routers"].append(router)
def register_outer_message_middleware(middleware: BaseMiddleware):
app = Singleton()
app.storage["_outer_message_middlewares"].append(middleware)
async def set_my_commands(commands):
app = Singleton()
await app.bot.set_my_commands(commands)
async def get_fsm_context(chat_id: int, user_id: int) -> FSMContext:
dp = Singleton().dp
bot = Singleton().bot
return FSMContext(
storage=dp.storage,
key=StorageKey(
chat_id=chat_id,
user_id=user_id,
bot_id=bot.id,
),
)
def set_fsm(storage):
app = Singleton()
log(storage)
app.storage["_fsm_storage"] = storage
def get_module(
module_id: str, paths=None
) -> Union[types.ModuleType, Union[Any, None], Tuple[Union[Any, None], ...]]:
app = Singleton()
module = app.modules_manager.get_by_id(module_id)
if paths is None:
return module
if isinstance(paths, str):
paths = [paths]
results = []
for path in paths:
current_obj = module
try:
parts = path.split(".")
for part in parts:
current_obj = getattr(current_obj, part)
results.append(current_obj)
except AttributeError:
results.append(None)
if len(results) == 1:
return results[0]
else:
return tuple(results)
class Storage:
@staticmethod
def set(key: str, value: Any):
storage = Singleton().storage
storage[key] = value
@staticmethod
def get(key: str):
storage = Singleton().storage
return storage.get(key)

View File

@@ -0,0 +1,12 @@
import re
CLEAN_HTML = re.compile("<.*?>")
class Utils:
@staticmethod
def code_format(code: str, lang: str):
if lang:
return f'<pre><code class="language-{lang}">{code}</code></pre>'
else:
return f"<pre>{code}</pre>"

View File

@@ -0,0 +1,109 @@
from _ast import AnnAssign
from typing import Any
from aiogram import Bot
from RestrictedPython import (
RestrictingNodeTransformer,
limited_builtins,
safe_builtins,
utility_builtins,
)
from RestrictedPython.Eval import default_guarded_getitem, default_guarded_getiter
from RestrictedPython.Guards import (
full_write_guard,
guarded_unpack_sequence,
safer_getattr,
)
from ocab_core.logger import log
from ocab_core.modules_system.safe.zope_guards import extra_safe_builtins
class RestrictedPythonPolicy(RestrictingNodeTransformer):
def visit_AsyncFunctionDef(self, node):
return self.node_contents_visit(node)
def visit_Await(self, node):
return self.node_contents_visit(node)
def visit_AsyncFor(self, node):
return self.node_contents_visit(node)
def visit_AsyncWith(self, node):
return self.node_contents_visit(node)
"""
Не работает из-за getattr
def visit_Match(self, node) -> Any:
return self.node_contents_visit(node)
def visit_match_case(self, node) -> Any:
return self.node_contents_visit(node)
def visit_MatchAs(self, node) -> Any:
return self.node_contents_visit(node)
def visit_MatchValue(self, node) -> Any:
return self.node_contents_visit(node)
"""
def visit_AnnAssign(self, node: AnnAssign) -> Any:
# missing in RestrictingNodeTransformer
# this doesn't need the logic that is in visit_Assign
# because it doesn't have a "targets" attribute,
# and node.target: Name | Attribute | Subscript
return self.node_contents_visit(node)
# new Python 3.12 nodes
def visit_TypeAlias(self, node) -> Any:
# missing in RestrictingNodeTransformer
return self.node_contents_visit(node)
def visit_TypeVar(self, node) -> Any:
# missing in RestrictingNodeTransformer
return self.node_contents_visit(node)
def visit_TypeVarTuple(self, node) -> Any:
# missing in RestrictingNodeTransformer
return self.node_contents_visit(node)
def visit_ParamSpec(self, node) -> Any:
# missing in RestrictingNodeTransformer
return self.node_contents_visit(node)
def _metaclass(name, bases, dict):
ob = type(name, bases, dict)
ob.__allow_access_to_unprotected_subobjects__ = 1
ob._guarded_writes = 1
return ob
ALLOWED_IMPORTS = [
"typing",
"aiogram",
"warnings",
]
def safes_getattr(object, name, default=None, getattr=safer_getattr):
if isinstance(object, Bot) and name == "token":
log("Bot.token is not allowed")
raise Exception("Bot.token is not allowed")
return getattr(object, name, default)
BUILTINS = safe_builtins.copy()
BUILTINS.update(utility_builtins)
BUILTINS.update(limited_builtins)
BUILTINS.update(extra_safe_builtins)
BUILTINS["__metaclass__"] = _metaclass
BUILTINS["_getitem_"] = default_guarded_getitem
BUILTINS["_getattr_"] = safes_getattr
BUILTINS["_getiter_"] = default_guarded_getiter
BUILTINS["_write_"] = full_write_guard
BUILTINS["_unpack_sequence_"] = guarded_unpack_sequence
BUILTINS["staticmethod"] = staticmethod
BUILTINS["tuple"] = tuple

View File

@@ -0,0 +1,225 @@
#############################################################################
#
# Copyright (c) 2024 OCAB Team
# Copyright (c) 2002 Zope Foundation and Contributors.
#
# This software includes a function derived from the software subject to the
# provisions of the Zope Public License, Version 2.1 (ZPL). A copy of the ZPL
# should accompany this distribution. THIS SOFTWARE IS PROVIDED "AS IS" AND ANY
# AND ALL EXPRESS OR IMPLIED WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST
# INFRINGEMENT, AND FITNESS FOR A PARTICULAR PURPOSE.
#
#
##############################################################################
extra_safe_builtins = {}
class GuardedDictType:
def __call__(self, *args, **kwargs):
return dict(*args, **kwargs)
def fromkeys(self, S, v=None):
return dict.fromkeys(S, v)
extra_safe_builtins["dict"] = GuardedDictType()
ContainerAssertions = {
type(()): 1,
bytes: 1,
str: 1,
range: 1,
}
Containers = ContainerAssertions.get
def _error(index):
raise Exception("unauthorized access to element")
def guard(container, value, index=None):
# if Containers(type(container)) and Containers(type(value)):
# # Simple type. Short circuit.
# return
# I don't know how to do this.
# if getSecurityManager().validate(container, container, index, value):
# return
# _error(index)
return
class SafeIter:
__allow_access_to_unprotected_subobjects__ = 1
def __init__(self, ob, container=None):
self._iter = iter(ob)
if container is None:
container = ob
self.container = container
def __iter__(self):
return self
def __next__(self):
ob = next(self._iter)
guard(self.container, ob)
return ob
next = __next__
class NullIter(SafeIter):
def __init__(self, ob):
self._iter = ob
def __next__(self):
return next(self._iter)
next = __next__
def guarded_iter(*args):
if len(args) == 1:
i = args[0]
# Don't double-wrap
if isinstance(i, SafeIter):
return i
if not isinstance(i, range):
return SafeIter(i)
# Other call styles / targets don't need to be guarded
return NullIter(iter(*args))
extra_safe_builtins["iter"] = guarded_iter
def guarded_any(seq):
return any(guarded_iter(seq))
extra_safe_builtins["any"] = guarded_any
def guarded_all(seq):
return all(guarded_iter(seq))
extra_safe_builtins["all"] = guarded_all
valid_inplace_types = (list, set)
inplace_slots = {
"+=": "__iadd__",
"-=": "__isub__",
"*=": "__imul__",
"/=": (1 / 2 == 0) and "__idiv__" or "__itruediv__",
"//=": "__ifloordiv__",
"%=": "__imod__",
"**=": "__ipow__",
"<<=": "__ilshift__",
">>=": "__irshift__",
"&=": "__iand__",
"^=": "__ixor__",
"|=": "__ior__",
}
def __iadd__(x, y):
x += y
return x
def __isub__(x, y):
x -= y
return x
def __imul__(x, y):
x *= y
return x
def __idiv__(x, y):
x /= y
return x
def __ifloordiv__(x, y):
x //= y
return x
def __imod__(x, y):
x %= y
return x
def __ipow__(x, y):
x **= y
return x
def __ilshift__(x, y):
x <<= y
return x
def __irshift__(x, y):
x >>= y
return x
def __iand__(x, y):
x &= y
return x
def __ixor__(x, y):
x ^= y
return x
def __ior__(x, y):
x |= y
return x
inplace_ops = {
"+=": __iadd__,
"-=": __isub__,
"*=": __imul__,
"/=": __idiv__,
"//=": __ifloordiv__,
"%=": __imod__,
"**=": __ipow__,
"<<=": __ilshift__,
">>=": __irshift__,
"&=": __iand__,
"^=": __ixor__,
"|=": __ior__,
}
def protected_inplacevar(op, var, expr):
"""Do an inplace operation
If the var has an inplace slot, then disallow the operation
unless the var an instance of ``valid_inplace_types``.
"""
if hasattr(var, inplace_slots[op]) and not isinstance(var, valid_inplace_types):
try:
cls = var.__class__
except AttributeError:
cls = type(var)
raise TypeError(
"Augmented assignment to %s objects is not allowed"
" in untrusted code" % cls.__name__
)
return inplace_ops[op](var, expr)
extra_safe_builtins["_inplacevar_"] = protected_inplacevar

View File

@@ -0,0 +1,25 @@
from aiogram import Bot, Dispatcher
from aiogram.fsm.storage.memory import MemoryStorage
from ocab_core.modules_system import ModulesManager
class SingletonMeta(type):
_instances = {}
def __call__(cls, *args, **kwargs):
if cls not in cls._instances:
instance = super().__call__(*args, **kwargs)
cls._instances[cls] = instance
return cls._instances[cls]
class Singleton(metaclass=SingletonMeta):
bot: Bot
dp: Dispatcher = None
modules_manager: ModulesManager = None
storage = {
"_fsm_storage": MemoryStorage(),
"_routers": [],
"_outer_message_middlewares": [],
}

1
src/ocab_modules/external/__init__.py vendored Normal file
View File

@@ -0,0 +1 @@
from . import yandexgpt

View File

@@ -0,0 +1 @@
from .main import module_init

View File

@@ -0,0 +1,136 @@
from aiogram import Bot, Router
from aiogram.enums import ParseMode
from aiogram.fsm.context import FSMContext
from aiogram.fsm.state import State, StatesGroup
from aiogram.types import (
BufferedInputFile,
KeyboardButton,
Message,
ReplyKeyboardMarkup,
ReplyKeyboardRemove,
)
from ocab_core.modules_system.public_api import Utils, get_fsm_context
from .report import Report
router = Router()
class ReportState(StatesGroup):
input_system_info = State()
input_app_name = State()
input_problem_step_by_step = State()
input_actual_result = State()
input_expected_result = State()
input_additional_info = State()
system_info_code = """echo "SESSION_TYPE: ${XDG_SESSION_TYPE:-Unknown}"
[ -f /etc/os-release ] && grep "^PRETTY_NAME=" /etc/os-release | cut -d= -f2 \
| tr -d '"' | xargs echo "OS: "
echo "Kernel: $(uname -r)"
echo "DE: ${XDG_CURRENT_DESKTOP:-Unknown}"
grep "^model name" /proc/cpuinfo | head -n1 | cut -d: -f2 \
| xargs echo "CPU: "
lspci | grep "VGA compatible controller" | cut -d: -f3 \
| xargs -I{} echo "GPU: {}"
"""
system_info_message = """Укажите параметры свой системы.
Собрать информацию о системе можно с помощью данного скрипта:
""" + Utils.code_format(
system_info_code,
"shell",
)
async def start_report(chat_id: int, bot: Bot):
await bot.send_message(
chat_id=chat_id,
text=system_info_message,
parse_mode=ParseMode.HTML,
reply_markup=ReplyKeyboardRemove(),
)
state = await get_fsm_context(chat_id, chat_id)
await state.set_state(ReportState.input_system_info)
app_info_message = """Укажите название и версию приложения.
Узнать можно с помощью данной команды:""" + Utils.code_format(
"rpm -qa | grep -i НАЗВАНИЕРИЛОЖЕНИЯ", "shell"
)
@router.message(ReportState.input_system_info)
async def system_entered(message: Message, state: FSMContext):
await state.update_data(system=message.text)
await message.answer(
text=app_info_message,
parse_mode=ParseMode.HTML,
)
await state.set_state(ReportState.input_app_name)
step_by_step_message = (
"""Опиши проблему пошагово, что ты делал, что происходило, что не так."""
)
@router.message(ReportState.input_app_name)
async def app_name_entered(message: Message, state: FSMContext):
await state.update_data(app=message.text)
await message.answer(text=step_by_step_message)
await state.set_state(ReportState.input_problem_step_by_step)
@router.message(ReportState.input_problem_step_by_step)
async def problem_step_by_step_entered(message: Message, state: FSMContext):
await state.update_data(problem_step_by_step=message.text)
await message.answer(text="Опиши, что произошло (фактический результат).")
await state.set_state(ReportState.input_actual_result)
@router.message(ReportState.input_actual_result)
async def actual_result_entered(message: Message, state: FSMContext):
await state.update_data(actual=message.text)
await message.answer(text="Опиши ожидаемый результат.")
await state.set_state(ReportState.input_expected_result)
@router.message(ReportState.input_expected_result)
async def expected_result_entered(message: Message, state: FSMContext):
await state.update_data(expected=message.text)
await message.answer(
text="Если есть дополнительная информация, то напиши ее.",
reply_markup=ReplyKeyboardMarkup(
resize_keyboard=True,
keyboard=[
[KeyboardButton(text="Дополнительной информации нет")],
],
),
)
await state.set_state(ReportState.input_additional_info)
@router.message(ReportState.input_additional_info)
async def additional_info_entered(message: Message, state: FSMContext):
if message.text == "Дополнительной информации нет":
additional_info = ""
else:
additional_info = message.text
await state.update_data(additional=additional_info)
await message.answer(
text="Вот твой отчет сообщением, а также файлом:",
reply_markup=ReplyKeyboardRemove(),
)
data = await state.get_data()
report = Report(data)
file_report = report.export().encode()
await message.answer(text=report.export())
await message.answer_document(document=BufferedInputFile(file_report, "report.txt"))
await state.clear()

View File

@@ -0,0 +1,14 @@
{
"id": "external.create_report_apps",
"name": "Create Report Apps",
"description": "Модуль для создания отчетов о ошибках в приложениях",
"author": [
"OCAB Team",
"Maxim Slipenko"
],
"version": "1.0.0",
"privileged": false,
"dependencies": {
"standard.command_helper": "^1.0.0"
}
}

View File

@@ -0,0 +1,115 @@
from typing import Union
from aiogram import Bot, F, Router
from aiogram.exceptions import TelegramForbiddenError
from aiogram.filters import BaseFilter, Command, CommandStart
from aiogram.types import (
CallbackQuery,
InlineKeyboardButton,
InlineKeyboardMarkup,
Message,
)
from ocab_core.modules_system.public_api import get_module, register_router
from .create_report import router as create_report_router
from .create_report import start_report
register_command = get_module("standard.command_helper", "register_command")
router = Router()
class ChatTypeFilter(BaseFilter):
def __init__(self, chat_type: Union[str, list]):
self.chat_type = chat_type
async def __call__(self, message: Message) -> bool:
if isinstance(self.chat_type, str):
return message.chat.type == self.chat_type
return message.chat.type in self.chat_type
@router.message(
ChatTypeFilter(chat_type=["group", "supergroup"]), Command("create_report_apps")
)
async def create_report_apps_command_group(message: Message):
keyboard = InlineKeyboardMarkup(
inline_keyboard=[
[
InlineKeyboardButton(
text="Да", callback_data=f"create_report:{message.from_user.id}"
),
InlineKeyboardButton(
text="Нет", callback_data=f"cancel_report:{message.from_user.id}"
),
]
]
)
await message.answer(
"Я могу отправить тебе пару вопросов "
"для помощи в составлении репорта личными "
"сообщениями.",
reply_markup=keyboard,
)
@router.message(
ChatTypeFilter(chat_type=["private"]),
CommandStart(deep_link=True, magic=F.args == "create_report_apps"),
)
@router.message(ChatTypeFilter(chat_type=["private"]), Command("create_report_apps"))
async def create_report_apps_command(message: Message, bot: Bot):
await start_report(message.from_user.id, bot)
@router.callback_query(F.data.startswith("cancel_report"))
async def cancel_report_callback(callback_query: CallbackQuery):
callback_user_id = int(callback_query.data.split(":")[1])
if callback_query.from_user.id != callback_user_id:
await callback_query.answer("Эта кнопка не для вас.", show_alert=True)
return
await callback_query.message.delete()
@router.callback_query(F.data.startswith("create_report"))
async def create_report_callback(callback_query: CallbackQuery, bot: Bot):
callback_user_id = int(callback_query.data.split(":")[1])
if callback_query.from_user.id != callback_user_id:
await callback_query.answer("Эта кнопка не для вас.", show_alert=True)
return
user_id = callback_query.from_user.id
async def on_chat_unavailable():
await callback_query.message.edit_text(
"Я в личных сообщениях задам тебе вопросы "
"для помощи в составлении репорта. "
'Но перед этим ты должен нажать кнопку "Запустить"'
)
info = await bot.get_me()
await callback_query.answer(
url=f"https://t.me/{info.username}?start=create_report_apps"
)
try:
chat_member = await bot.get_chat_member(chat_id=user_id, user_id=user_id)
if chat_member.status != "left":
await start_report(user_id, bot)
await callback_query.message.edit_text(
"Я в личных сообщениях задам тебе "
"вопросы для помощи в составлении "
"репорта."
)
else:
await on_chat_unavailable()
except TelegramForbiddenError:
await on_chat_unavailable()
async def module_init():
router.include_router(create_report_router)
register_router(router)
register_command("create_report_apps", "Написать репорт о приложении")

View File

@@ -0,0 +1,59 @@
import aiogram
class ReportFormatter:
def __init__(self, html=True):
self.html = html
def bold(self, string):
if self.html:
return f"<b>{self.text(string)}</b>"
return self.text(string)
def text(self, string):
if self.html:
return aiogram.html.quote(string)
return string
class Report:
def __init__(self, data: dict):
self.data = data
def export(self):
data = self.data
report = f"""
Стенд с ошибкой:
==============================
{data['system']}
Пакет:
==============================
{data['app']}
Шаги, приводящие к ошибке:
==============================
{data['problem_step_by_step']}
Фактический результат:
==============================
{data['actual']}
Ожидаемый результат:
==============================
{data['expected']}
"""
if data["additional"] != "":
report += f"""
Дополнительно:
==============================
{data['additional']}
"""
return report

View File

@@ -0,0 +1 @@
from .handlers import answer_to_message

View File

@@ -0,0 +1,25 @@
# flake8: noqa
from aiogram import Bot
from aiogram.types import Message
from ocab_modules.external.yandexgpt.yandexgpt import *
from ocab_modules.standard.config.config import (
get_yandexgpt_catalog_id,
get_yandexgpt_prompt,
get_yandexgpt_token,
)
from ocab_modules.standard.database.db_api import add_message
async def answer_to_message(message: Message, bot: Bot):
# print("answer_to_message")
log("answer_to_message")
yagpt = YandexGPT(get_yandexgpt_token(), get_yandexgpt_catalog_id())
text = message.text
prompt = get_yandexgpt_prompt()
# response = await yagpt.async_yandexgpt(system_prompt=prompt, input_messages=text)
response = await yagpt.yandexgpt_request(
chat_id=message.chat.id, message_id=message.message_id, type="yandexgpt"
)
reply = await message.reply(response, parse_mode="Markdown")
add_message(reply, message_ai_model="yandexgpt")

View File

@@ -0,0 +1,9 @@
{
"id": "external.yandexgpt",
"name": "Yandex GPT",
"description": "Модуль для работы с Yandex GPT",
"author": "OCAB Team",
"version": "1.0.0",
"privileged": true,
"dependencies": {}
}

View File

@@ -0,0 +1,10 @@
# flake8: noqa
from aiogram import F, Router
from src.ocab_modules.external.yandexgpt.handlers import answer_to_message
router = Router()
# Если сообщение содержит в начале текст "Гномик" или "гномик" или отвечает на сообщение бота, то вызывается функция answer_to_message
router.message.register(
answer_to_message, F.text.startswith("Гномик") | F.text.startswith("гномик")
)

View File

@@ -0,0 +1,292 @@
# flake8: noqa
import asyncio
import json
import aiohttp
import requests
from ocab_core.logger import log
from ...standard.config.config import *
from ...standard.database import *
class YandexGPT:
token = None
catalog_id = None
languages = {
"ru": "русский язык",
"en": "английский язык",
"de": "немецкий язык",
"uk": "украинский язык",
"es": "испанский язык",
"be": "белорусский язык",
}
def __init__(self, token, catalog_id):
self.token = token
self.catalog_id = catalog_id
async def async_request(self, url, headers, prompt) -> dict:
async with aiohttp.ClientSession() as session:
async with session.post(url, headers=headers, json=prompt) as response:
return await response.json()
async def async_token_check(
self, messages, gpt, max_tokens, stream, temperature, del_msg_id=1
):
url = "https://llm.api.cloud.yandex.net/foundationModels/v1/tokenizeCompletion"
headers = {
"Content-Type": "application/json",
"Authorization": f"Api-Key {self.token}",
}
answer_token = get_yandexgpt_token_for_answer()
while True:
try:
request = {
"modelUri": gpt,
"completionOptions": {
"stream": stream,
"temperature": temperature,
"maxTokens": max_tokens,
},
"messages": messages,
}
response = await self.async_request(
url=url, headers=headers, prompt=request
)
except Exception as e: # TODO: Переделать обработку ошибок
# print(e)
log(f"Error: {e}")
continue
if int(len(response["tokens"])) < (max_tokens - answer_token):
break
else:
try:
messages.pop(del_msg_id)
except IndexError:
Exception("IndexError: list index out of range")
return messages
async def async_yandexgpt_lite(
self,
system_prompt,
input_messages,
stream=False,
temperature=0.6,
max_tokens=8000,
):
url = "https://llm.api.cloud.yandex.net/foundationModels/v1/completion"
gpt = f"gpt://{self.catalog_id}/yandexgpt-lite/latest"
headers = {
"Content-Type": "application/json",
"Authorization": f"Api-Key {self.token}",
}
messages = [{"role": "system", "text": system_prompt}]
for message in input_messages:
messages.append(message)
messages = await self.async_token_check(messages, gpt, max_tokens)
prompt = {
"modelUri": gpt,
"completionOptions": {
"stream": stream,
"temperature": temperature,
"maxTokens": max_tokens,
},
"messages": messages,
}
response = requests.post(url, headers=headers, json=prompt).text # nosec
return json.loads(response)["result"]["alternatives"][0]["message"]["text"]
async def async_yandexgpt(
self,
system_prompt,
input_messages,
stream=False,
temperature=0.6,
max_tokens=get_yandexgpt_token_for_request(),
):
url = "https://llm.api.cloud.yandex.net/foundationModels/v1/completion"
gpt = f"gpt://{self.catalog_id}/yandexgpt/latest"
headers = {
"Content-Type": "application/json",
"Authorization": f"Api-Key {self.token}",
}
messages = []
messages.append({"role": "system", "text": system_prompt})
for message in input_messages:
messages.append(message)
messages = await self.async_token_check(
messages, gpt, max_tokens, stream, temperature
)
request = {
"modelUri": gpt,
"completionOptions": {
"stream": stream,
"temperature": temperature,
"maxTokens": max_tokens,
},
"messages": messages,
}
response = await self.async_request(
url=url, headers=headers, prompt=request
) # nosec
return response["result"]["alternatives"][0]["message"]["text"]
async def async_yandexgpt_translate(self, input_language, output_language, text):
input_language = self.languages[input_language]
output_language = self.languages[output_language]
return await self.async_yandexgpt(
f"Переведи на {output_language} сохранив оригинальный смысл текста. Верни только результат:",
[{"role": "user", "text": text}],
stream=False,
temperature=0.6,
max_tokens=8000,
)
async def async_yandexgpt_spelling_check(self, input_language, text):
input_language = self.languages[input_language]
return await self.async_yandexgpt(
f"Проверьте орфографию и пунктуацию текста на {input_language}. Верни исправленный текст "
f"без смысловых искажений:",
[{"role": "user", "text": text}],
stream=False,
temperature=0.6,
max_tokens=8000,
)
async def async_yandexgpt_text_history(
self, input_messages, stream=False, temperature=0.6, max_tokens=8000
):
url = "https://llm.api.cloud.yandex.net/foundationModels/v1/completion"
gpt = f"gpt://{self.catalog_id}/summarization/latest"
headers = {
"Content-Type": "application/json",
"Authorization": f"Api-Key {self.token}",
}
messages = []
for message in input_messages:
messages.append(message)
messages = await self.async_token_check(messages, gpt, max_tokens, del_msg_id=0)
prompt = {
"modelUri": gpt,
"completionOptions": {
"stream": stream,
"temperature": temperature,
"maxTokens": max_tokens,
},
"messages": messages,
}
response = requests.post(url, headers=headers, json=prompt).text # nosec
return json.loads(response)["result"]["alternatives"][0]["message"]["text"]
async def async_yandex_cloud_text_to_speech(
self, text, voice, emotion, speed, format, quality
):
tts = "tts.api.cloud.yandex.net/speech/v1/tts:synthesize"
# TODO: Сделать функцию TTS
return 0
async def async_yandex_cloud_vision(self, image, features, language):
# TODO: Сделать функцию Vision
return 0
async def collect_messages(self, message_id, chat_id):
messages = []
# Собираем цепочку сообщений в формате: [{"role": "user", "text": "<Имя_пользователя>: Привет!"},
# {"role": "assistant", "text": "Привет!"}]
while True:
message = db_api.get_message_text(chat_id, message_id)
if db_api.get_message_ai_model(chat_id, message_id) != None:
messages.append({"role": "assistant", "text": message})
else:
sender_name = db_api.get_user_name(
db_api.get_message_sender_id(chat_id, message_id)
)
messages.append({"role": "user", "text": sender_name + ": " + message})
message_id = db_api.get_answer_to_message_id(chat_id, message_id)
if message_id is None:
break
return list(reversed(messages))
async def collecting_messages_for_history(
self, start_message_id, end_message_id, chat_id
):
messages = []
# Собираем цепочку сообщений в формате: [{"role": "user", "text": "<Имя_пользователя>: Привет!"},
# {"role": "assistant", "text": "Привет!"}]
while True:
message = db_api.get_message_text(chat_id, start_message_id)
if db_api.get_message_ai_model(chat_id, start_message_id) != None:
messages.append({"role": "assistant", "text": message})
else:
sender_name = db_api.get_user_name(
db_api.get_message_sender_id(chat_id, start_message_id)
)
messages.append({"role": "user", "text": sender_name + ": " + message})
start_message_id -= 1
if start_message_id <= end_message_id:
break
return messages.reverse()
async def yandexgpt_request(
self,
message_id=None,
type="yandexgpt-lite",
chat_id=None,
message_id_end=None,
input_language=None,
output_language=None,
text=None,
):
if type == "yandexgpt-lite":
messages = await self.collect_messages(message_id, chat_id)
return await self.async_yandexgpt_lite(
system_prompt=get_yandexgpt_prompt(),
input_messages=messages,
stream=False,
temperature=0.6,
max_tokens=8000,
)
elif type == "yandexgpt":
# print("yandexgpt_request")
log("yandexgpt_request")
messages = await self.collect_messages(message_id, chat_id)
return await self.async_yandexgpt(
system_prompt=get_yandexgpt_prompt(),
input_messages=messages,
stream=False,
temperature=0.6,
max_tokens=get_yandexgpt_token_for_request(),
)
elif type == "yandexgpt-translate":
return await self.async_yandexgpt_translate(
input_language,
output_language,
text=db_api.get_message_text(chat_id, message_id),
)
elif type == "yandexgpt-spelling-check":
return await self.async_yandexgpt_spelling_check(
input_language, text=db_api.get_message_text(chat_id, message_id)
)
elif type == "yandexgpt-text-history":
messages = await self.collect_messages_for_history(
message_id, message_id_end, chat_id
)
return await self.async_yandexgpt_text_history(
messages=messages, stream=False, temperature=0.6, max_tokens=8000
)
else:
return "Ошибка: Неизвестный тип запроса | Error: Unknown request type"

View File

@@ -0,0 +1 @@
from .moderation import ban_user, unmute_user

View File

@@ -0,0 +1,6 @@
{
"name": "Moderation",
"description": "Moderation commands for OCAB",
"author": "OCAB Team",
"version": "1.0"
}

View File

@@ -0,0 +1,82 @@
# flake8: noqa
import asyncio
import time
import aiogram
import aiohttp
from ocab_modules.standard.config.config import *
from ocab_modules.standard.roles.roles import *
class Moderation:
def __init__(self):
access_rights = get_access_rights()
bot_check_message = bool(self.access_rights["BOT_CHECK_MESSAGE"])
bot_ban_user = bool(self.access_rights["BOT_BAN_USER"])
bot_mute_user = bool(self.access_rights["BOT_MUTE_USER"])
moderator_rights = self.access_rights["MODERATOR_RIGHTS"]
ai_check_message = bool(self.access_rights["AI_CHECK_MESSAGE"])
beta_ai_check_message = bool(self.access_rights["BETA_AI_CHECK_MESSAGE"])
async def time_to_seconds(time):
# Конвертация текстового указания времени по типу 3h, 5m, 10s в минуты
if time[-1] == "d":
return int(time[:-1]) * 86400
elif time[-1] == "h":
return int(time[:-1]) * 3600
elif time[-1] == "m":
return int(time[:-1]) * 60
elif time[-1] == "s":
return int(time[:-1])
async def short_time_to_time(self, time):
# Конвертация времени в длинное название
if time[-1] == "d":
return str(f"{time[0:-1]} дней")
elif time[-1] == "h":
return str(f"{time[0:-1]} часов")
elif time[-1] == "m":
return str(f"{time[0:-1]} минут")
elif time[-1] == "s":
return str(f"{time[0:-1]} секунд")
async def delete_message(self, chat_id, message_id, bot: aiogram.Bot):
await bot.delete_message(chat_id, message_id)
async def ban_user(self, chat_id, user_id, bot: aiogram.Bot):
await bot.ban_chat_member(chat_id, user_id)
async def mute_user(chat_id, user_id, time, bot: aiogram.Bot):
mutePermissions = {
"can_send_messages": False,
"can_send_audios": False,
"can_send_documents": False,
"can_send_photos": False,
"can_send_videos": False,
"can_send_video_notes": False,
"can_send_voice_notes": False,
"can_send_polls": False,
"can_send_other_messages": False,
"can_add_web_page_previews": False,
"can_change_info": False,
"can_invite_users": False,
"can_pin_messages": False,
"can_manage_topics": False,
}
end_time = time + int(time.time())
await bot.restrict_chat_member(
chat_id, user_id, until_date=end_time, **mutePermissions
)
async def unmute_user(chat_id, user_id, bot: aiogram.Bot):
await bot.restrict_chat_member(
chat_id, user_id, use_independent_chat_permissions=True
)
async def ban_user(chat_id, user_id, bot: aiogram.Bot):
await bot.ban_chat_member(chat_id, user_id)

View File

@@ -0,0 +1,84 @@
# flake8: noqa
import asyncio
import random
from threading import Thread
from aiogram import Bot
from aiogram.types import inline_keyboard_button as types
from aiogram.utils.keyboard import InlineKeyboardBuilder
from ocab_modules.legacy.moderation import ban_user, unmute_user
from src.ocab_modules.standard.config.config import get_telegram_check_bot
from src.ocab_modules.standard.database.db_api import *
async def create_math_task():
first_number = random.randint(1, 100) # nosec
second_number = random.randint(1, 100) # nosec
answer = first_number + second_number
fake_answers = []
for i in range(3):
diff = random.randint(1, 10) # nosec
diff_sign = random.choice(["+", "-"]) # nosec
fake_answers.append(answer + diff if diff_sign == "+" else answer - diff)
fake_answers.append(answer)
random.shuffle(fake_answers)
return [answer, first_number, second_number, fake_answers]
async def ban_user_timer(chat_id: int, user_id: int, time: int, bot: Bot):
await asyncio.sleep(time)
if get_user(user_id) is not None:
pass
else:
await ban_user()
async def check_new_user(message: Message, bot: Bot):
print("check_new_user")
if get_telegram_check_bot():
# Проверяем наличие пользователя в базе данных
if get_user(message.from_user.id) is None:
# Выдаём пользователю ограничение на отправку сообщений на 3 минуты
ban_task = Thread(
target=ban_user_timer,
args=(message.chat.id, message.from_user.id, 180, bot),
)
ban_task.start()
# Создаём задачу с отложенным выполнением на 3 минуты
math_task = await create_math_task()
text = f"{math_task[1]} + {math_task[2]}"
builder = InlineKeyboardBuilder()
for answer in math_task[3]:
if answer == math_task[0]:
builder.add(
types.InlineKeyboardButton(
text=answer, callback_data=f"check_math_task_true"
)
)
else:
builder.add(
types.InlineKeyboardButton(
text=answer, callback_data=f"check_math_task_false"
)
)
await message.reply(
f"Приветствую, {message.from_user.first_name}!\n"
f"Для продолжения работы с ботом, пожалуйста, решите математический пример в течении 3х минут:\n"
f"*{text}*",
reply_markup=builder.as_markup(),
)
async def math_task_true(message: Message, bot: Bot):
await message.reply(f"Верно! Добро пожаловать в чат {message.from_user.first_name}")
await unmute_user(message.chat.id, message.from_user.id, bot)
add_user(
message.from_user.id,
message.from_user.first_name + " " + message.from_user.last_name,
message.from_user.username,
)
pass

View File

@@ -0,0 +1,6 @@
{
"name": "Welcome",
"description": "Мо",
"author": "OCAB Team",
"version": "1.0"
}

View File

@@ -0,0 +1,12 @@
from aiogram import F, Router
from .handlers import check_new_user
router = Router()
# Если в чат пришел новый пользователь
router.message.register(check_new_user, F.new_chat_members.exists())
# Ловин колбеки от кнопок с callback_data=f"check_math_task_true"
router.callback_query.register(
check_new_user, F.callback_data == "check_math_task_true"
)

View File

@@ -0,0 +1 @@
from .main import module_init

View File

@@ -0,0 +1,56 @@
# flake8: noqa
from aiogram import Bot
from aiogram.types import Message
from ocab_core.modules_system.public_api import get_module
get_default_chat_tag = get_module("standard.config", "get_default_chat_tag")
async def delete_message(message: Message, bot: Bot):
reply_message_id = message.reply_to_message.message_id
await bot.delete_message(message.chat.id, reply_message_id)
async def error_access(message: Message, bot: Bot):
await message.reply("Вы не админ/модератор")
async def get_chat_id(message: Message, bot: Bot):
await message.reply(
f"ID данного чата: `{message.chat.id}`", parse_mode="MarkdownV2"
)
async def chat_not_in_approve_list(message: Message, bot: Bot):
await message.reply(
f"Бот недоступен в данном чате, пожалуйста,"
f" обратитесь к администратору для добавления чата в список доступных или перейдите в чат "
f"{get_default_chat_tag()}"
)
await get_chat_id(message, bot)
async def mute_user(chat_id: int, user_id: int, time: int, bot: Bot):
# *, can_send_messages: bool | None = None, can_send_audios: bool | None = None, can_send_documents: bool | None = None, can_send_photos: bool | None = None, can_send_videos: bool | None = None, can_send_video_notes: bool | None = None, can_send_voice_notes: bool | None = None, can_send_polls: bool | None = None, can_send_other_messages: bool | None = None, can_add_web_page_previews: bool | None = None, can_change_info: bool | None = None, can_invite_users: bool | None = None, can_pin_messages: bool | None = None, can_manage_topics: bool | None = None, **extra_data: Any)
mutePermissions = {
"can_send_messages": False,
"can_send_audios": False,
"can_send_documents": False,
"can_send_photos": False,
"can_send_videos": False,
"can_send_video_notes": False,
"can_send_voice_notes": False,
"can_send_polls": False,
"can_send_other_messages": False,
"can_add_web_page_previews": False,
"can_change_info": False,
"can_invite_users": False,
"can_pin_messages": False,
"can_manage_topics": False,
}
end_time = time + int(time.time())
await bot.restrict_chat_member(
chat_id, user_id, until_date=end_time, **mutePermissions
)

View File

@@ -0,0 +1,11 @@
{
"id": "standard.admin",
"name": "Admin",
"description": "Модуль для работы с админкой",
"author": "OCAB Team",
"version": "1.0.0",
"privileged": false,
"dependencies": {
"standard.filters": "^1.0.0"
}
}

View File

@@ -0,0 +1,7 @@
from ocab_core.modules_system.public_api import register_router
from .routers import router
async def module_init():
register_router(router)

View File

@@ -0,0 +1,27 @@
# flake8: noqa
from aiogram import F, Router
from aiogram.filters import Command
from ocab_core.modules_system.public_api import get_module, log
from .handlers import (
chat_not_in_approve_list,
delete_message,
error_access,
get_chat_id,
)
(ChatModerOrAdminFilter, ChatNotInApproveFilter) = get_module(
"standard.filters", ["ChatModerOrAdminFilter", "ChatNotInApproveFilter"]
)
router = Router()
# Если сообщение содержит какой либо текст и выполняется фильтр ChatNotInApproveFilter, то вызывается функция chat_not_in_approve_list
router.message.register(chat_not_in_approve_list, ChatNotInApproveFilter(), F.text)
router.message.register(get_chat_id, ChatModerOrAdminFilter(), Command("chatID"))
router.message.register(delete_message, ChatModerOrAdminFilter(), Command("rm"))
router.message.register(error_access, Command("rm"))
router.message.register(error_access, Command("chatID"))

View File

@@ -0,0 +1 @@
from .main import module_init, register_command

View File

@@ -0,0 +1,12 @@
{
"id": "standard.command_helper",
"name": "Command helper",
"description": "Модуль для отображения команд при вводе '/'",
"author": "OCAB Team",
"version": "1.0.0",
"privileged": false,
"dependencies": {
"standard.roles": "^1.0.0",
"standard.database": "^1.0.0"
}
}

View File

@@ -0,0 +1,96 @@
from typing import Any, Awaitable, Callable, Dict
from aiogram import BaseMiddleware
from aiogram.types import BotCommand, TelegramObject
from ocab_core.modules_system.public_api import (
get_module,
register_outer_message_middleware,
set_my_commands,
)
commands = dict()
db_api = get_module(
"standard.database",
"db_api",
)
Roles = get_module("standard.roles", "Roles")
def register_command(command, description, role="USER"):
if role not in commands:
commands[role] = dict()
commands[role][command] = {
"description": description,
}
class OuterMiddleware(BaseMiddleware):
async def __call__(
self,
handler: Callable[[TelegramObject, Dict[str, Any]], Awaitable[Any]],
event: TelegramObject,
data: Dict[str, Any],
):
# if not isinstance(event, Message):
# return await handler(event, data)
#
# user = db_api.get_user(event.from_user.id)
#
# if user is None:
# return
#
# roles = Roles()
# role_name = await roles.get_role_name(role_id=user.user_role)
#
# if role_name not in commands:
# return await handler(event, data)
# bot_commands = []
# for role_command in commands[role_name]:
# bot_commands.append(
# BotCommand(
# command=role_command,
# description=commands[role_name][role_command]["description"],
# )
# )
# await event.bot.set_my_commands(
# bot_commands,
# BotCommandScopeChatMember(
# chat_id=event.chat.id,
# user_id=event.from_user.id,
# ),
# )
return await handler(event, data)
async def module_init():
register_outer_message_middleware(OuterMiddleware())
async def set_user_commands():
bot_commands = []
if "USER" in commands:
user_commands = commands["USER"]
for command in user_commands:
bot_commands.append(
BotCommand(
command=command,
description=user_commands[command]["description"],
)
)
await set_my_commands(
bot_commands,
)
async def module_late_init():
await set_user_commands()

View File

@@ -0,0 +1,7 @@
from .config import (
get_approved_chat_id,
get_default_chat_tag,
get_roles,
get_yandexgpt_in_words,
get_yandexgpt_start_words,
)

View File

@@ -0,0 +1,79 @@
# flake8: noqa
import yaml
from src.service import paths
def get_config(is_test: bool = False) -> dict:
if is_test:
path = f"{paths.modules_standard}/config/tests"
else:
path = paths.core
path = f"{path}/config.yaml"
with open(path, "r") as file:
return yaml.full_load(file)
config = get_config()
def get_telegram_token() -> str:
return config["TELEGRAM"]["TOKEN"]
def get_telegram_check_bot() -> bool:
return config["TELEGRAM"]["CHECK_BOT"]
def get_approved_chat_id() -> list:
# Возваращем сплитованный список id чатов в формате int
return [
int(chat_id) for chat_id in config["TELEGRAM"]["APPROVED_CHAT_ID"].split(" | ")
]
def get_roles():
return config["ROLES"]
def get_user_role_name(role_number) -> dict:
# Возвращаем название роли пользвателя по номеру роли, если такой роли нет, возвращаем неизвестно
return config["ROLES"].get(role_number, "Неизвестно")
def get_default_chat_tag() -> str:
return config["TELEGRAM"]["DEFAULT_CHAT_TAG"]
def get_yandexgpt_token() -> str:
return config["YANDEXGPT"]["TOKEN"]
def get_yandexgpt_catalog_id() -> str:
return config["YANDEXGPT"]["CATALOGID"]
def get_yandexgpt_prompt() -> str:
return config["YANDEXGPT"]["PROMPT"]
def get_yandexgpt_start_words() -> list:
return config["YANDEXGPT"]["STARTWORD"].split(" | ")
def get_yandexgpt_in_words() -> list:
return config["YANDEXGPT"]["INWORD"].split(" | ")
def get_yandexgpt_token_for_request() -> int:
return config["YANDEXGPT"]["TOKEN_FOR_REQUEST"]
def get_yandexgpt_token_for_answer() -> int:
return config["YANDEXGPT"]["TOKEN_FOR_ANSWER"]
def get_access_rights() -> dict:
return get_config()["ACCESS_RIGHTS"]

View File

@@ -0,0 +1,9 @@
{
"id": "standard.config",
"name": "Config YAML",
"description": "Модуль для работы с конфигурационным файлом бота (YAML)",
"author": "OCAB Team",
"version": "1.0.0",
"privileged": true,
"dependencies": {}
}

View File

@@ -0,0 +1,7 @@
TELEGRAM:
TOKEN: xxxxxxxxxxxxxxxxxxxx
ROLES:
ADMIN: 0
MODERATOR: 1
USER: 2
BOT: 3

View File

@@ -0,0 +1,45 @@
import unittest
from src.ocab_modules.standard.config.config import get_config
yaml_load = get_config(is_test=True)
class TestConfig(unittest.TestCase):
def test_yaml_load_correctness(self):
self.assertIsNotNone(yaml_load)
self.assertIn("TELEGRAM", yaml_load)
self.assertIn("TOKEN", yaml_load["TELEGRAM"])
self.assertIn("ROLES", yaml_load)
self.assertIn("ADMIN", yaml_load["ROLES"])
self.assertIn("MODERATOR", yaml_load["ROLES"])
self.assertIn("USER", yaml_load["ROLES"])
self.assertIn("BOT", yaml_load["ROLES"])
def test_yaml_keys_existence(self):
self.assertTrue(all(key in yaml_load for key in ["TELEGRAM", "ROLES"]))
self.assertIn("TOKEN", yaml_load["TELEGRAM"])
self.assertTrue(
all(role in yaml_load["ROLES"] for role in ["ADMIN", "MODERATOR", "USER"])
)
def test_yaml_yaml_load_types(self):
self.assertIsInstance(yaml_load["TELEGRAM"]["TOKEN"], str)
self.assertTrue(
all(
isinstance(yaml_load["ROLES"][role], int)
for role in ["ADMIN", "MODERATOR", "USER"]
)
)
def test_yaml_values(self):
expected_token = "xxxxxxxxxxxxxxxxxxxx" # nosec
expected_role_values = {"ADMIN": 0, "MODERATOR": 1, "USER": 2, "BOT": 3}
self.assertEqual(yaml_load["TELEGRAM"]["TOKEN"], expected_token)
for role, value in expected_role_values.items():
self.assertEqual(yaml_load["ROLES"][role], value)
if __name__ == "__main__":
unittest.main()

View File

@@ -0,0 +1,48 @@
## Модуль DataBase
Модуль DataBase предназначен для ведения и работы с базами данных OCAB.
Модуль содержит в себе следующие таблицы:
* `Chats` - таблица для хранения информации о чатах.
* `Users` - таблица для хранения информации о пользователях.
* `Messages` - таблица для хранения информации о сообщениях.
* `ChatStats` - таблица для хранения статистики чатов по дням.
* `UserStats` - таблица для хранения статистики пользователей по дням.
руктура таблицы `Chats`:
* `chat_id` - идентификатор чата.
* `chat_name` - название чата.
* `chat_type` - тип чата. (0 - Чат администраторов, 1 - Пользовательский чат, 3 - Чат разрешённых личных запросов к боту
10 - Не инициализированный чат)
* `chat_stats` - количество всех отправленных сообщений в чате.
руктура таблицы `Users`:
* `user_id` - идентификатор пользователя telegram.
* `user_tag` - тег пользователя telegram.
* `user_name` - имя пользователя telegram.
* `user_role` - роль пользователя в чате. (0 - Администратор, 1 - Модератор, 2 - Пользователь)
* `user_stats` - количество всех отправленных сообщений пользователем.
* `user_rep` - репутация пользователя.
руктура таблицы `Messages`:
* `message_chat_id` - идентификатор чата в котором отправлено сообщение.
* `message_id` - идентификатор сообщения.
* `messag_sender_id` - идентификатор пользователя отправившего сообщение. Если сообщение отправил бот, то
`messag_sender_id` = 0.
* `answer_to_message_id` - идентификатор сообщения на которое дан ответ. Если ответа нет или ответ на служебное
сообщение о создании топика в чатах с форумным типом, то `answer_to_message_id` = 0.
* `message_ai_model` - идентификатор модели нейросети, которая использовалась для генерации ответа. Если ответ'
сгенерирован не был, то `message_ai_model` = null.
* `message_text` - текст сообщения.
руктура таблицы `ChatStats`:
* `chat_id` - идентификатор чата для которого собрана статистика.
* `date` - дата на которую собрана статистика.
* `messages_count` - количество сообщений отправленных в чат за день.
руктура таблицы `UserStats`:
* `chat_id` - идентификатор чата для которого собрана статистика.
* `user_id` - идентификатор пользователя для которого собрана статистика.
* `date` - дата на которую собрана статистика.
* `messages_count` - количество сообщений отправленных пользователем в чат за день.

View File

@@ -0,0 +1,5 @@
from . import db_api, models, repositories
async def module_init():
db_api.connect_database()

View File

@@ -0,0 +1,308 @@
import peewee as pw
from aiogram.types import Message
from src.service import paths
from .exceptions import MissingModuleName, NotExpectedModuleName
from .models.chat_stats import ChatStats
from .models.chats import Chats
from .models.db import database_proxy
from .models.fsm_data import FSMData
from .models.messages import Messages
from .models.user_stats import UserStats
from .models.users import Users
def connect_database(is_test: bool = False, module: str | None = None):
if is_test:
if not module:
raise MissingModuleName()
db_path = f"{paths.modules_standard}/{module}/tests/database"
else:
if module:
raise NotExpectedModuleName()
db_path = f"{paths.core}/database"
database = pw.SqliteDatabase(f"{db_path}/OCAB.db")
database_proxy.initialize(database)
database.connect()
create_tables(database)
return database, f"{db_path}/OCAB.db"
def create_tables(db: pw.SqliteDatabase):
"""Создание таблиц"""
for table in Chats, Messages, Users, UserStats, ChatStats, FSMData:
if not table.table_exists():
db.create_tables([table])
def add_chat(chat_id, chat_name, chat_type=10, chat_stats=0):
chat, created = Chats.get_or_create(
id=chat_id,
defaults={
"chat_name": chat_name,
"chat_type": chat_type,
"chat_all_stat": chat_stats,
},
)
if not created:
# Обновить существующий чат, если он уже существует
chat.chat_name = chat_name
chat.chat_type = chat_type
chat.chat_stats = chat_stats
chat.save()
def add_user(
user_id,
user_first_name,
user_last_name=None,
user_tag=None,
user_role=0,
user_stats=0,
user_rep=0,
):
if user_last_name is None:
user_name = user_first_name
else:
user_name = user_first_name + " " + user_last_name
user, created = Users.get_or_create(
id=user_id,
defaults={
"user_tag": user_tag,
"user_name": user_name,
"user_role": user_role,
"user_stats": user_stats,
"user_rep": user_rep,
},
)
if not created:
# Обновить существующего пользователя, если он уже существует
user.user_tag = user_tag
user.user_name = user_name
user.user_role = user_role
user.user_stats = user_stats
user.user_rep = user_rep
user.save()
def add_message(message: Message, message_ai_model=None):
if message.reply_to_message:
answer_to_message_id = message.reply_to_message.message_id
else:
answer_to_message_id = None
Messages.create(
message_chat_id=message.chat.id,
message_id=message.message_id,
message_sender_id=message.from_user.id,
answer_to_message_id=answer_to_message_id,
message_ai_model=message_ai_model,
message_text=message.text,
)
def add_chat_stats(chat_id, date, messages_count):
ChatStats.create(chat_id=chat_id, date=date, messages_count=messages_count)
def add_user_stats(chat_id, user_id, date, messages_count):
UserStats.create(
chat_id=chat_id, user_id=user_id, date=date, messages_count=messages_count
)
# Работа с таблицей чатов
def get_chat(chat_id):
return Chats.get_or_none(Chats.id == chat_id)
def change_chat_name(chat_id, new_chat_name):
query = Chats.update(chat_name=new_chat_name).where(Chats.id == chat_id)
query.execute()
def change_chat_type(chat_id, new_chat_type):
query = Chats.update(chat_type=new_chat_type).where(Chats.id == chat_id)
query.execute()
def get_chat_all_stat(chat_id):
chat = Chats.get_or_none(Chats.id == chat_id)
return chat.chat_all_stat if chat else None
# Работа с таблицей пользователей
def get_user(user_id) -> Users | None:
return Users.get_or_none(Users.id == user_id)
def get_user_tag(user_id):
user = Users.get_or_none(Users.id == user_id)
return user.user_tag if user else None
def get_user_id(user_tag):
user = Users.get_or_none(Users.user_tag == user_tag)
return user.id if user else None
def get_user_name(user_id):
user = Users.get_or_none(Users.id == user_id)
return user.user_name if user else None
def get_user_role(user_id):
user = Users.get_or_none(Users.id == user_id)
return user.user_role if user else None
def get_user_all_stats(user_id):
user = Users.get_or_none(Users.id == user_id)
return user.user_stats if user else None
def get_user_rep(user_id):
user = Users.get_or_none(Users.id == user_id)
return user.user_rep if user else None
def change_user_name(user_id, user_first_name, user_last_name=None):
if user_last_name is None:
new_user_name = user_first_name
else:
new_user_name = user_first_name + " " + user_last_name
query = Users.update(user_name=new_user_name).where(Users.id == user_id)
query.execute()
def change_user_tag(user_id, new_user_tag):
query = Users.update(user_tag=new_user_tag).where(Users.id == user_id)
query.execute()
def change_user_role(user_id, new_user_role):
query = Users.update(user_role=new_user_role).where(Users.id == user_id)
query.execute()
# Работа с таблицей сообщений
def get_message(message_chat_id, message_id):
return Messages.get_or_none(
Messages.message_chat_id == message_chat_id,
Messages.message_id == message_id,
)
def get_message_sender_id(message_chat_id, message_id):
message = Messages.get_or_none(
Messages.message_chat_id == message_chat_id, Messages.message_id == message_id
)
return message.message_sender_id if message else None
def get_message_text(message_chat_id, message_id):
message = Messages.get_or_none(
Messages.message_chat_id == message_chat_id, Messages.message_id == message_id
)
return message.message_text if message else None
def get_message_ai_model(message_chat_id, message_id):
message = Messages.get_or_none(
Messages.message_chat_id == message_chat_id, Messages.message_id == message_id
)
return message.message_ai_model if message else None
def get_answer_to_message_id(message_chat_id, message_id):
message = Messages.get_or_none(
Messages.message_chat_id == message_chat_id, Messages.message_id == message_id
)
return message.answer_to_message_id if message else None
# Работа с таблицей статистики чатов
def get_chat_stats(chat_id):
chat_stats = {}
for chat_stat in ChatStats.select().where(ChatStats.chat_id == chat_id):
chat_stats[chat_stat.date] = chat_stat.messages_count
return chat_stats
# Работа с таблицей статистики пользователей
def get_user_stats(user_id):
user_stats = {}
for user_stat in UserStats.select().where(UserStats.user_id == user_id):
user_stats[user_stat.date] = user_stat.messages_count
return user_stats
# Функции обновления
def update_chat_all_stat(chat_id):
query = Chats.update(chat_all_stat=Chats.chat_all_stat + 1).where(
Chats.id == chat_id
)
query.execute()
def update_chat_stats(chat_id, date):
chat_stats = ChatStats.get_or_none(
ChatStats.chat_id == chat_id, ChatStats.date == date
)
if chat_stats:
query = ChatStats.update(messages_count=ChatStats.messages_count + 1).where(
ChatStats.chat_id == chat_id, ChatStats.date == date
)
query.execute()
else:
ChatStats.create(chat_id=chat_id, date=date, messages_count=1)
def update_user_all_stat(user_id):
user = Users.get_or_none(Users.id == user_id)
if user:
query = Users.update(user_stats=Users.user_stats + 1).where(Users.id == user_id)
query.execute()
else:
Users.create(id=user_id, user_stats=1)
def update_user_rep(user_id):
user = Users.get_or_none(Users.id == user_id)
if user:
query = Users.update(user_rep=Users.user_rep + 1).where(Users.id == user_id)
query.execute()
else:
Users.create(id=user_id, user_rep=1)
def update_user_stats(chat_id, user_id, date):
user_stats = UserStats.get_or_none(
UserStats.chat_id == chat_id,
UserStats.user_id == user_id,
UserStats.date == date,
)
if user_stats:
query = UserStats.update(messages_count=UserStats.messages_count + 1).where(
UserStats.chat_id == chat_id,
UserStats.user_id == user_id,
UserStats.date == date,
)
query.execute()
else:
UserStats.create(chat_id=chat_id, user_id=user_id, date=date, messages_count=1)

View File

@@ -0,0 +1,12 @@
class MissingModuleName(BaseException):
def __init__(self):
self.message = "Пропущено название директории модуля"
super().__init__(self.message)
class NotExpectedModuleName(BaseException):
def __init__(self):
self.message = "Не ожидалось название директории модуля"
super().__init__(self.message)

View File

@@ -0,0 +1,9 @@
{
"id": "standard.database",
"name": "Database",
"description": "Модуль для работы с БД",
"author": "OCAB Team",
"version": "1.0.0",
"privileged": true,
"dependencies": {}
}

View File

@@ -0,0 +1 @@
from .fsm_data import FSMData

View File

@@ -0,0 +1,12 @@
import peewee as pw
from .db import database_proxy
class ChatStats(pw.Model):
class Meta:
database = database_proxy
chat_id = pw.IntegerField(null=False)
date = pw.DateField(null=False)
messages_count = pw.IntegerField(null=False, default=0)

View File

@@ -0,0 +1,12 @@
import peewee as pw
from .db import database_proxy
class Chats(pw.Model):
class Meta:
database = database_proxy
chat_name = pw.CharField(null=False)
chat_type = pw.IntegerField(null=False, default=10)
chat_all_stat = pw.IntegerField(null=False)

Some files were not shown because too many files have changed in this diff Show More