0
0
mirror of https://gitflic.ru/project/maks1ms/ocab.git synced 2025-11-28 10:21:55 +03:00

22 Commits

Author SHA1 Message Date
63b321a500 улучшена документация 2024-07-31 19:24:27 +03:00
ffa5af740e добавлен workspace 2024-07-31 17:03:50 +03:00
3130e820c3 исправлено сохранение настроек 2024-07-31 15:20:13 +03:00
d5f6f1bb4f подготовка к публикации 2024-07-28 18:48:25 +03:00
bfa1d13931 wip 2024-07-28 15:24:28 +03:00
5b02c2ce6a добавил типизацию 2024-07-27 22:23:42 +03:00
e48e83bf2c wip 2024-07-24 16:28:42 +03:00
6aab1ee244 wip 2024-07-22 00:44:27 +03:00
2177c712a3 фикс логгирования в hypercorn 2024-07-21 21:27:57 +03:00
34c365178b wip 2024-07-21 20:01:50 +03:00
d52864a231 wip 2024-07-20 12:01:00 +03:00
e8b5f79d99 wip 2024-07-18 11:32:09 +03:00
abf8f8047c Добавлен .mailmap 2024-07-17 19:37:48 +03:00
eecc59ca94 Merged with private/new-module-system 2024-07-14 17:07:46 +03:00
370b4fc648 Merged with private/new-module-system 2024-07-10 19:30:23 +03:00
2a2b9e15e8 Merged with chore/code-quality-tools 2024-07-08 14:20:21 +03:00
ef10f05a73 Добавлены nosec для прохождения bandit 2024-07-08 00:55:33 +03:00
ef0dda07f7 Добавлен bandit в pre-commit-hook 2024-07-08 00:49:04 +03:00
e80a01157f Автоматический рефакторинг и игнорирование flake8
Выполнен автоматический рефакторинг. Для тех файлов,
которые не прошли flake8 - был добавлен `noqa`, чтобы
в будущем исправить эти проблемы
2024-07-08 00:38:01 +03:00
4edeef4003 Добавлен pre-commit 2024-07-08 00:21:50 +03:00
31142dfb1c Добавлены инструменты для повышения качества кода 2024-07-07 23:59:33 +03:00
837613e072 Merged with chore/refactor-core 2024-07-07 21:25:10 +03:00
178 changed files with 10361 additions and 1364 deletions

1
.bandit Normal file
View File

@@ -0,0 +1 @@
[bandit]

6
.flake8 Normal file
View File

@@ -0,0 +1,6 @@
[flake8]
per-file-ignores =
__init__.py:F401
max-line-length = 88
count = true
extend-ignore = E203,E701

4
.gitignore vendored
View File

@@ -6,5 +6,5 @@ env
venv
__pycache__
OCAB.db
src/paths.json
src/core/config.yaml
config.yaml
dist

7
.mailmap Normal file
View File

@@ -0,0 +1,7 @@
armatik <s.fomchenkov@yandex.ru> <57626821+armatik@users.noreply.github.com>
armatik <s.fomchenkov@yandex.ru> <57626821+Armatik@users.noreply.github.com>
armatik <s.fomchenkov@yandex.ru> Armatik <s.fomchenkov@yandex.ru>
ilyazheprog <ilyazheprog@gmail.com> <ilya_zhenetskij@vk.com>
Maxim Slipenko <maxim@slipenko.com> Максим Слипенко <maxim@slipenko.com>

30
.pre-commit-config.yaml Normal file
View File

@@ -0,0 +1,30 @@
# See https://pre-commit.com for more information
# See https://pre-commit.com/hooks.html for more hooks
repos:
- repo: https://github.com/crashappsec/pre-commit-sync
rev: 04b0e02eefa7c41bedca7456ad542e60b67c16c6
hooks:
- id: pre-commit-sync
- repo: https://github.com/pre-commit/pre-commit-hooks
rev: v3.2.0
hooks:
- id: trailing-whitespace
- id: end-of-file-fixer
- id: check-yaml
- id: check-added-large-files
- repo: https://github.com/PyCQA/isort
rev: 5.13.2 # sync:isort:poetry.lock
hooks:
- id: isort
- repo: https://github.com/psf/black
rev: 24.4.2 # sync:black:poetry.lock
hooks:
- id: black
- repo: https://github.com/PyCQA/flake8
rev: 7.1.0 # sync:flake8:poetry.lock
hooks:
- id: flake8
- repo: https://github.com/PyCQA/bandit
rev: 1.7.9 # sync:bandit:poetry.lock
hooks:
- id: bandit

View File

@@ -28,10 +28,11 @@ OCAB - это бот для Telegram, который призван помочь
## Дополнительные официальные модули
* `gpt` - модуль для генерации ответов на основе нейросети GPT-3.5. Позволяет боту отвечать на сообщения
* `yandexgpt` - модуль для генерации ответов на основе нейросети GPT-3.5. Позволяет боту отвечать на сообщения
пользователей, используя нейросеть. Ключевой особенностью является построение линии контекста для нейросети,
которая позволяет боту отвечать на вопросы, используя контекст предыдущих сообщений. Для этого используется
модуль база данных хранящий историю сообщений.
<!--
* `bugzilla` - модуль для интеграции с BugZilla. Позволяет получать уведомления о новых багах в BugZilla, отслеживать их
статус, формировать стандартизированные сообщения для корректного описания багов. В будущем планируется интеграция с
API BugZilla для возможности создания багов из чата.
@@ -40,6 +41,7 @@ OCAB - это бот для Telegram, который призван помочь
прочей информации о пакете.
* `notes` - модуль заметок. Позволяет сохранять заметки для пользователей и чатов. Заметки являются ссылками на
сообщения в чате.
-->
Список модулей будет пополняться. Идеи для модулей можно оставлять в [issues](https://gitflic.ru/project/armatik/ocab/issue/create).

107
docs/MODULES-SPEC.md Normal file
View File

@@ -0,0 +1,107 @@
# Спецификация модулей
> **Внимание!**
>
> Данная спецификация еще не закончена и активно разрабатывается.
> Могут быть значительные изменения (breaking changes).
Каждый модуль представлен в виде папки, содержащей два обязательных файла: info.json и `__init__.py`.
## Метаинформация о модуле (info.json)
Этот файл содержит метаинформацию о модуле в формате JSON. Пример структуры info.json приведен ниже:
```json
{
"id": "standard.info",
"name": "Info",
"description": "Модуль с информацией",
"author": "OCAB Team",
"version": "1.0.0",
"privileged": false,
"dependencies": {
"required": {
"standard.roles": "^1.0.0",
"standard.database": {
"version": "^1.0.0",
"uses": [
"db_api"
]
}
},
"optional": {
"external.yandexgpt": "*"
}
},
"pythonDependencies": {
"required": {
"some_package": "^1.2.3"
},
"optional": {
"another_package": "*"
}
}
}
```
- `id`: Уникальный идентификатор модуля.
- `name`: Название модуля.
- `description`: Описание функциональности модуля.
- `author`: Автор модуля.
- `version`: Версия модуля в формате [SemVer](https://semver.org/).
- `privileged`: Булево значение, указывающее, является ли модуль привилегированным.
- `dependencies`: Объект, описывающий зависимости модуля от других **OCAB** модулей.
- `required`: Обязательные зависимости. Ключ - идентификатор модуля, значение - версия или объект `DependencyInfo`.
- `optional`: Необязательные зависимости. Ключ - идентификатор модуля, значение - версия или объект `DependencyInfo`.
- `pythonDependencies`: Объект, описывающий зависимости модуля от внешних Python пакетов.
- `required`: Обязательные зависимости. Ключ - название пакета, значение - версия.
- `optional`: Необязательные зависимости. Ключ - название пакета, значение - версия.
### DependencyInfo
Объект `DependencyInfo` позволяет указать не только версию зависимости, но и список разрешенных к использованию
атрибутов модуля (`uses`). Если `uses` не указан, то доступ к модулю целиком запрещен.
```json
{
"version": "^1.0.0",
"uses": [
"db_api"
]
}
```
- `version`: Версия модуля.
- `uses`: Список разрешенных атрибутов модуля.
## Режимы выполнения модулей
**Непривилегированный режим** (`privileged: false`):
- Модуль выполняется в доверенной среде на основе RestrictedPython (это накладывает ряд ограничений).
- Может импортировать только явно разрешенные модули, указанные в `pythonDependencies`,
а также несколько стандартных модулей, необходимых для работы.
- Имеет доступ к пакету `ocab_core.modules_system.public_api` для взаимодействия с ботом.
**Привилегированный режим** (`privileged: true`):
- Модуль выполняется без ограничений.
- Имеет полный доступ ко всем пакетам, доступным в окружении.
- Должен использоваться с осторожностью и только для модулей, требующих расширенных прав.
## Жизненный цикл модуля
1. Загрузка метаданных из `info.json`.
2. Проверка зависимостей:
- Проверяется наличие всех обязательных зависимостей.
- Проверяется совместимость версий зависимостей.
- Проверяется наличие Python зависимостей.
3. Загрузка кода модуля из `__init__.py`.
4. Вызов функции `module_init` (если она есть).
5. После загрузки всех модулей вызывается функция `module_late_init` (если она есть).
## Взаимодействие между модулями
Модули могут взаимодействовать друг с другом через [API](../src/ocab_core/ocab_core/modules_system/public_api/__init__.py),
предоставляемое системой управления модулями.
Например, есть функция `get_module`. Она позволяет получить модуль или предоставляемые им объекты по его
идентификатору.

15
init.py
View File

@@ -1,15 +0,0 @@
from pathlib import Path
from json import dumps
pwd = Path().cwd()
dir_core = pwd / "src" / "core"
dir_modules_standard = pwd / "src" / "modules" / "standard"
dir_modules_custom = pwd / "src" / "modules" / "custom"
json = {
'core': str(dir_core),
'modules standard': str(dir_modules_standard),
'modules custom': str(dir_modules_custom),
}
with open("src/paths.json", "w", encoding="utf8") as f:
f.write(dumps(json, indent=4))

25
ocab.code-workspace Normal file
View File

@@ -0,0 +1,25 @@
{
"folders": [
{
"name": "OCAB Monorepo Root",
"path": ".",
},
{
"name": "OCAB Modules",
"path": "src/ocab_modules"
},
{
"name": "OCAB Core",
"path": "src/ocab_core"
},
{
"name": "Gnomik",
"path": "src/gnomik"
}
],
"extensions": {
"recommendations": [
"ms-python.python"
]
},
}

1044
poetry.lock generated

File diff suppressed because it is too large Load Diff

2
poetry.toml Normal file
View File

@@ -0,0 +1,2 @@
[virtualenvs]
in-project = true

View File

@@ -1,24 +1,48 @@
[tool.poetry]
name = "ocab"
version = "0.1.0"
description = ""
name = "ocab-monorepo"
version = "2.0.0"
description = "OCAB is a modular Telegram bot"
license = "GPL-3.0-only"
authors = ["Семён Фомченков <s.fomchenkov@yandex.ru>"]
maintainers = [
"Илья Женецкий <ilya_zhenetskij@vk.com>",
"qualimock <qualimock@yandex.ru>",
"Кирилл Уницаев fiersik.kouji@yandex.ru",
"Кирилл Уницаев <fiersik.kouji@yandex.ru>",
"Максим Слипенко <maxim@slipenko.com>"
]
readme = "README.md"
repository = "https://gitflic.ru/project/armatik/ocab"
packages = [
{ include = "scripts" }
]
[tool.poetry.urls]
"Bug Tracker" = "https://gitflic.ru/project/armatik/ocab/issue?status=OPEN"
[tool.poetry.scripts]
test = 'scripts.test:main'
init = 'scripts.init:main'
module = 'scripts.module:main'
[tool.poetry.dependencies]
python = "^3.11.6"
aiogram = "^3.2.0"
peewee = "^3.17.0"
pyyaml = "^6.0.1"
requests = "^2.31.0"
python = "~3.12"
[tool.poetry.group.dev.dependencies]
flake8 = "^7.1.0"
black = "^24.4.2"
isort = "^5.13.2"
bandit = "^1.7.9"
pre-commit = "^3.7.1"
semver = "^3.0.2"
[tool.black]
line-length = 88
[tool.isort]
profile = "black"
line_length = 88
multi_line_output = 3
skip_gitignore = true
[build-system]
requires = ["poetry-core"]

View File

@@ -1,4 +0,0 @@
#! /bin/sh
cd src
python -m unittest discover -v
cd ..

21
scripts/init.py Normal file
View File

@@ -0,0 +1,21 @@
from json import dumps
from pathlib import Path
def main():
pwd = Path().cwd()
dir_core = pwd / "src" / "ocab_core"
dir_modules_standard = pwd / "src" / "ocab_modules" / "standard"
dir_modules_external = pwd / "src" / "ocab_modules" / "external"
json = {
"core": str(dir_core),
"modules standard": str(dir_modules_standard),
"modules external": str(dir_modules_external),
}
with open("src/paths.json", "w", encoding="utf8") as f:
f.write(dumps(json, indent=4))
if __name__ == "__main__":
main()

115
scripts/module.py Normal file
View File

@@ -0,0 +1,115 @@
import argparse
import json
import os
DEFAULTS = {
"description": "Очень полезный модуль",
"author": "OCAB Team",
"version": "1.0.0",
"privileged": "false",
}
def create_module(args):
module_dir = os.path.join("src/ocab_modules/standard", args.module_name)
os.makedirs(module_dir, exist_ok=True)
module_info = {
"id": args.id,
"name": args.name,
"description": args.description,
"author": args.author,
"version": args.version,
"privileged": args.privileged.lower() == "true",
"dependencies": {},
}
with open(os.path.join(module_dir, "info.json"), "w", encoding="utf-8") as f:
json.dump(module_info, f, ensure_ascii=False, indent=4)
with open(os.path.join(module_dir, "__init__.py"), "w", encoding="utf-8") as f:
f.write("# Init file for the module\n")
print(f"Module {args.module_name} created successfully.")
def interactive_mode(args):
def get_input(prompt, default=None):
if default:
value = input(f"{prompt} [{default}]: ")
return value if value else default
else:
value = input(f"{prompt}: ")
return value
module_name = get_input("Введите название модуля (папки)")
module_id = get_input("Введите ID")
name = get_input("Введите название модуля")
description = get_input(
"Введите описание модуля", args.description or DEFAULTS["description"]
)
author = get_input("Введите автора", args.author or DEFAULTS["author"])
version = get_input("Введите версию", args.version or DEFAULTS["version"])
privileged = get_input(
"Модуль привилегированный (true/false)",
args.privileged or DEFAULTS["privileged"],
)
args = argparse.Namespace(
command="create",
module_name=module_name,
id=module_id,
name=name,
description=description,
author=author,
version=version,
privileged=privileged,
dependencies="",
)
create_module(args)
def main():
parser = argparse.ArgumentParser(
description="Утилита для создания директории модуля с файлами."
)
subparsers = parser.add_subparsers(dest="command", required=True)
create_parser = subparsers.add_parser("create", help="Создать новый модуль")
create_parser.add_argument("--module_name", help="Название директории модуля")
create_parser.add_argument("--id", help="ID модуля")
create_parser.add_argument("--name", help="Название модуля")
create_parser.add_argument("--description", help="Описание модуля")
create_parser.add_argument("--author", help="Автор модуля")
create_parser.add_argument("--version", help="Версия модуля")
create_parser.add_argument(
"--privileged", help="Привилегированный модуль (true/false)"
)
create_parser.add_argument(
"--dependencies", help="Список зависимостей в формате имя:версия через запятую"
)
args = parser.parse_args()
if args.command == "create":
if not all(
[
args.module_name,
args.id,
args.name,
args.description,
args.author,
args.version,
args.privileged,
args.dependencies,
]
):
print("Переход в интерактивный режим...")
interactive_mode(args)
else:
create_module(args)
if __name__ == "__main__":
main()

9
scripts/test.py Normal file
View File

@@ -0,0 +1,9 @@
import subprocess # nosec
def main():
subprocess.run(["python", "-u", "-m", "unittest", "discover"]) # nosec
if __name__ == "__main__":
main()

View File

@@ -1,2 +0,0 @@
import src.service
import src.core

View File

@@ -1,13 +0,0 @@
TELEGRAM:
TOKEN:
YANDEXGPT:
TOKEN:
CATALOGID:
PROMPT:
ROLES:
ADMIN: 0
MODERATOR: 1
USER: 2
BOT: 3

View File

@@ -1,21 +0,0 @@
TELEGRAM:
TOKEN: xxxxxxxxxxxxxxxxxxxxxxxxx
APPROVED_CHAT_ID: "-123456789 | -012345678"
ADMINCHATID: -12345678
DEFAULT_CHAT_TAG: "@alt_gnome_chat"
CHECK_BOT: True
YANDEXGPT:
TOKEN: xxxxxxxxxxxxxxxxxxxxxxxxx
TOKEN_FOR_REQUEST: 8000
TOKEN_FOR_ANSWER: 2000
CATALOGID: xxxxxxxxxxxxxxxxxxxxxxxxx
PROMPT: "Ты чат-бот ..."
STARTWORD: "Бот| Бот, | бот | бот,"
INWORD: "помогите | не работает"
ROLES:
ADMIN: 2
MODERATOR: 1
USER: 0
BOT: 3

View File

@@ -1,24 +0,0 @@
import os
import time
async def check_log_file():
# Проверка наличия файла для логов в формате log-dd-mm-yyyy.log
# Если файл существует, то pass
# Если файл не существует, то создаём его. файл лежит в директории src.core.log
current_data = time.strftime("%d-%m-%Y")
log_file = os.path.join(os.path.dirname(__file__), "log/", f"log-{current_data}.log")
if not os.path.exists(log_file):
with open(log_file, 'w') as file:
file.write("Log file created\n")
file.close()
else:
pass
async def log(message):
await check_log_file()
current_data = time.strftime("%d-%m-%Y")
log_file = os.path.join(os.path.dirname(__file__), "log/", f"log-{current_data}.log")
# print(log_file)
with open(log_file, 'a') as file:
file.write(f"{time.strftime('%H:%M:%S')} {message}\n")
file.close()

View File

@@ -1,26 +0,0 @@
from routers import include_routers
from src.modules.standard.config.config import get_telegram_token
from src.modules.standard.database.db_api import connect_database, create_tables
from asyncio import run
from aiogram import Bot, Dispatcher
async def main(bot: Bot):
try:
database, path = connect_database()
database.connect()
create_tables(database)
dp = Dispatcher()
await include_routers(dp)
await dp.start_polling(bot)
finally:
await bot.session.close()
database.close()
if __name__ == "__main__":
bot = Bot(token=get_telegram_token())
run(main(bot))

View File

@@ -1,18 +0,0 @@
from aiogram import Dispatcher, Router, F, Bot
from aiogram.types import Message
from src.modules.standard.info.routers import router as info_router
from src.modules.standard.admin.routers import router as admin_router
from src.modules.standard.message_processing.message_api import router as process_message
from src.modules.standard.welcome.routers import router as welcome_router
async def include_routers(dp: Dispatcher):
"""
Подключение роутеров в бота
dp.include_router()
"""
dp.include_router(info_router)
dp.include_router(admin_router)
dp.include_router(process_message)

23
src/gnomik/Dockerfile Normal file
View File

@@ -0,0 +1,23 @@
FROM python:3.12-slim as builder
RUN pip install poetry
RUN mkdir -p /app
COPY . /app
# Фикс
RUN sed -i '/ocab-core = {/{s/, develop = true//}' /app/src/gnomik/pyproject.toml && \
sed -i '/ocab-modules = {/{s/, develop = true//}' /app/src/gnomik/pyproject.toml && \
sed -i '/ocab-core = {/{s/, develop = true//}' /app/src/ocab_modules/pyproject.toml
WORKDIR /app/src/gnomik
RUN poetry lock && poetry install
FROM python:3.12-slim as base
COPY --from=builder /app/src/gnomik /app
WORKDIR /app
ENV PATH="/app/.venv/bin:$PATH"
CMD ["python", "-m", "gnomik"]

View File

@@ -0,0 +1,14 @@
**/Dockerfile
**/*.dockerignore
**/docker-compose.yml
**/.git
**/.gitignore
**/.venv
**/.mypy_cache
**/__pycache__/
src/gnomik/config.yaml
src/gnomik/database/*

37
src/gnomik/README.md Normal file
View File

@@ -0,0 +1,37 @@
# Gnomик
![Логотип](./docs/gnomik.jpg)
Чат-бот помощник в [ALT Gnome Chat](https://t.me/alt_gnome_chat).
ALT Regular Gnome Community - открытое сообщество пользователей операционной системы ALT Regular Gnome.
- [Канал](https://t.me/alt_gnome)
- [Wiki](https://alt-gnome.wiki)
## Описание
Gnomик - это чат-бот, разработанный на платформе Open Chat AI Bot (OCAB) для Telegram. Он предоставляет различные функции и возможности, помогающие пользователям операционной системы ALT Regular Gnome.
## Функционал
<!--
TODO: описать функционал
-->
## Запуск
Запуск бота осуществляется с помощью команды:
```bash
python -m gnomik
```
## Конфигурация
Конфигурация бота находится в файле `config.yaml`.
## Модули
Список загружаемых модулей указан в файле `__main__.py`.

View File

@@ -0,0 +1,12 @@
version: '3'
services:
app:
build:
context: ../..
dockerfile: src/gnomik/Dockerfile
ports:
- 9000:9000
volumes:
- ./config.yaml:/app/config.yaml
- ./database:/app/database

BIN
src/gnomik/docs/gnomik.jpg Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 61 KiB

View File

@@ -0,0 +1,29 @@
import asyncio
from ocab_core import OCAB
from ocab_modules import module_loader
async def main():
ocab = OCAB()
await ocab.init_app(
[
module_loader("standard", "config", safe=False),
module_loader("standard", "database", safe=False),
module_loader("standard", "fsm_database_storage", safe=False),
module_loader("standard", "roles", safe=False),
module_loader("external", "yandexgpt", safe=False),
#
module_loader("standard", "command_helper"),
module_loader("standard", "info"),
module_loader("standard", "filters"),
module_loader("external", "create_report_apps"),
module_loader("standard", "admin"),
module_loader("standard", "message_processing"),
module_loader("standard", "miniapp", safe=False),
]
)
await ocab.start()
asyncio.run(main())

2162
src/gnomik/poetry.lock generated Normal file

File diff suppressed because it is too large Load Diff

2
src/gnomik/poetry.toml Normal file
View File

@@ -0,0 +1,2 @@
[virtualenvs]
in-project = true

15
src/gnomik/pyproject.toml Normal file
View File

@@ -0,0 +1,15 @@
[tool.poetry]
name = "gnomik"
version = "0.1.0"
description = ""
authors = ["Максим Слипенко <maxim@slipenko.com>"]
readme = "README.md"
[tool.poetry.dependencies]
python = "~3.12"
ocab-core = { extras=["webhook"], path = "../ocab_core", develop = true }
ocab-modules = { path = "../ocab_modules", develop = true }
[build-system]
requires = ["poetry-core"]
build-backend = "poetry.core.masonry.api"

View File

@@ -1 +0,0 @@

View File

@@ -1 +0,0 @@
from . import yandexgpt

View File

@@ -1,19 +0,0 @@
from aiogram import Bot
from aiogram.types import Message
from src.modules.external.yandexgpt.yandexgpt import *
from src.modules.standard.config.config import get_yandexgpt_token, get_yandexgpt_catalog_id, get_yandexgpt_prompt
from src.modules.standard.database.db_api import add_message
from src.core.logger import log
import asyncio
async def answer_to_message(message: Message, bot: Bot):
# print("answer_to_message")
await log("answer_to_message")
yagpt = YandexGPT(get_yandexgpt_token(), get_yandexgpt_catalog_id())
text = message.text
prompt = get_yandexgpt_prompt()
# response = await yagpt.async_yandexgpt(system_prompt=prompt, input_messages=text)
response = await yagpt.yandexgpt_request(chat_id = message.chat.id, message_id = message.message_id, type = "yandexgpt")
reply = await message.reply(response, parse_mode="Markdown")
add_message(reply, message_ai_model="yandexgpt")

View File

@@ -1,6 +0,0 @@
{
"name": "YandexGPT",
"description": "Модуль для работы с Yandex GPT",
"author": "OCAB Team",
"version": "1.0"
}

View File

@@ -1 +0,0 @@
from . import config, database, exceptions, roles

View File

@@ -1 +0,0 @@
from . import routers

View File

@@ -1,6 +0,0 @@
{
"name": "Admin",
"description": "Модуль для работы с админкой",
"author": "OCAB Team",
"version": "1.0"
}

View File

@@ -1,15 +0,0 @@
from aiogram import Router, F
from src.modules.standard.admin.handlers import delete_message, error_access, get_chat_id, chat_not_in_approve_list
from src.modules.standard.filters.filters import ChatModerOrAdminFilter, ChatNotInApproveFilter
router = Router()
# Если сообщение содержит какой либо текст и выполняется фильтр ChatNotInApproveFilter, то вызывается функция chat_not_in_approve_list
router.message.register(chat_not_in_approve_list, ChatNotInApproveFilter(), F.text)
router.message.register(get_chat_id, ChatModerOrAdminFilter(), F.text == '/chatID')
router.message.register(delete_message, ChatModerOrAdminFilter(), F.text == '/rm')
router.message.register(error_access, F.text == '/rm')
router.message.register(error_access, F.text == '/chatID')

View File

@@ -1 +0,0 @@
from . import config

View File

@@ -1,60 +0,0 @@
import yaml
from ....service import paths
def get_config(is_test: bool = False) -> dict:
if is_test:
path = f"{paths.modules_standard}/config/tests"
else:
path = paths.core
path = f"{path}/config.yaml"
with open(path, 'r') as file:
return yaml.full_load(file)
config = get_config()
def get_telegram_token() -> str:
return config["TELEGRAM"]["TOKEN"]
def get_telegram_check_bot() -> bool:
return config["TELEGRAM"]["CHECK_BOT"]
def get_aproved_chat_id() -> list:
# Возваращем сплитованный список id чатов в формате int
return [int(chat_id) for chat_id in config["TELEGRAM"]["APPROVED_CHAT_ID"].split(" | ")]
def get_user_role_name(role_number) -> dict:
# Возвращаем название роли пользвателя по номеру роли, если такой роли нет, возвращаем неизвестно
return config["ROLES"].get(role_number, "Неизвестно")
def get_default_chat_tag() -> str:
return config["TELEGRAM"]["DEFAULT_CHAT_TAG"]
def get_yandexgpt_token() -> str:
return config["YANDEXGPT"]["TOKEN"]
def get_yandexgpt_catalog_id() -> str:
return config["YANDEXGPT"]["CATALOGID"]
def get_yandexgpt_prompt() -> str:
return config["YANDEXGPT"]["PROMPT"]
def get_yandexgpt_start_words() -> list:
return config["YANDEXGPT"]["STARTWORD"].split(" | ")
def get_yandexgpt_in_words() -> list:
return config["YANDEXGPT"]["INWORD"].split(" | ")
def get_yandexgpt_token_for_request() -> int:
return config["YANDEXGPT"]["TOKEN_FOR_REQUEST"]
def get_yandexgpt_token_for_answer() -> int:
return config["YANDEXGPT"]["TOKEN_FOR_ANSWER"]
def get_access_rights() -> dict:
return get_config()["ACCESS_RIGHTS"]

View File

@@ -1,6 +0,0 @@
{
"name": "Config YAML",
"description": "Модуль для работы с конфигурационным файлом бота (YAML)",
"author": "OCAB Team",
"version": "1.0"
}

View File

@@ -1,7 +0,0 @@
TELEGRAM:
TOKEN: xxxxxxxxxxxxxxxxxxxx
ROLES:
ADMIN: 0
MODERATOR: 1
USER: 2
BOT: 3

View File

@@ -1 +0,0 @@
from . import db_api, models

View File

@@ -1 +0,0 @@
Эта директория для тестовой БД

View File

@@ -1 +0,0 @@
from . import module_exceptions

View File

@@ -1,6 +0,0 @@
{
"name": "Exceptions",
"description": "Модуль с исключениями",
"author": "OCAB Team",
"version": "1.0"
}

View File

@@ -1,29 +0,0 @@
from aiogram.filters import BaseFilter
from aiogram.types import Message
from aiogram import Bot
from src.modules.standard.roles.roles import Roles
from src.modules.standard.config.config import get_aproved_chat_id
from src.core.logger import log
class ChatModerOrAdminFilter(BaseFilter):
async def __call__(self, message: Message, bot: Bot) -> bool:
user_id = message.from_user.id
roles = Roles()
admins = await bot.get_chat_administrators(message.chat.id)
return await roles.check_admin_permission(user_id) or \
await roles.check_moderator_permission(user_id) or any(user_id == admin.user.id for admin in admins)
class ChatNotInApproveFilter(BaseFilter):
async def __call__(self, message: Message, bot: Bot) -> bool:
# print("chat_check")
await log("chat_check")
chat_id = message.chat.id
if chat_id in get_aproved_chat_id():
# print(f"Chat in approve list: {chat_id}")
await log(f"Chat in approve list: {chat_id}")
return False
else:
# print(f"Chat not in approve list: {chat_id}")
await log(f"Chat not in approve list: {chat_id}")
return True

View File

@@ -1,6 +0,0 @@
{
"name": "Filters",
"description": "Модуль с фильтрами",
"author": "OCAB Team",
"version": "1.0"
}

View File

@@ -1,8 +0,0 @@
from aiogram import Router, F
from src.modules.standard.info.handlers import get_user_info, get_chat_info
router = Router()
router.message.register(get_user_info, F.text.startswith("/info") == True)
router.message.register(get_chat_info, F.text.startswith("/chatinfo") == True)

View File

@@ -1,6 +0,0 @@
{
"name": "Roles",
"description": "Модуль для работы с ролями",
"author": "OCAB Team",
"version": "1.0"
}

View File

@@ -1,7 +0,0 @@
TELEGRAM:
TOKEN: xxxxxxxxxxxxxxxxxxxx
ROLES:
ADMIN: 0
MODERATOR: 1
USER: 2
BOT: 3

23
src/ocab_core/README.md Normal file
View File

@@ -0,0 +1,23 @@
# OCAB Core
Это ядро OCAB, содержащее базовые компоненты:
- Система управления модулями.
- Логирование.
- Утилиты.
## Система управления модулями
Система управления модулями отвечает за:
- Загрузку модулей.
- Проверку зависимостей.
- Предоставление API для взаимодействия между модулями.
## Логирование
Модуль логирования предоставляет функции для записи логов в консоль.
## Утилиты
Модуль утилит содержит вспомогательные функции, например, для форматирования текста.

View File

@@ -0,0 +1 @@
from .main import OCAB

View File

@@ -0,0 +1,38 @@
import importlib
import os
import traceback
from aiogram import Bot, Dispatcher
from aiogram.types import Update
def get_module_directory(module_name):
spec = importlib.util.find_spec(module_name)
if spec is None:
raise ImportError(f"Module {module_name} not found")
module_path = spec.origin
if module_path is None:
raise ImportError(f"Module {module_name} has no origin path")
return os.path.dirname(module_path)
try:
from fastapi import FastAPI, Request
async def register_bot_webhook(app: FastAPI, bot: Bot, dp: Dispatcher):
async def handle_webhook(request: Request):
try:
update = Update.model_validate(
await request.json(), context={"bot": bot}
)
await dp.feed_update(bot, update)
except Exception:
traceback.print_exc()
return {"ok": False}
return {"ok": True}
app.post("/webhook")(handle_webhook)
except ImportError:
pass

View File

@@ -0,0 +1,46 @@
import logging
import traceback
app_logger = logging.getLogger("ocab")
log_level = logging.INFO
def patch_logger(logger_: logging.Logger):
logger_.handlers = []
formatter = logging.Formatter("%(asctime)s %(message)s", datefmt="%H:%M:%S")
console_handler = logging.StreamHandler()
console_handler.setFormatter(formatter)
logger_.addHandler(console_handler)
logger_.propagate = False
logger_.setLevel(log_level)
return logger_
def setup_logger():
"""
Настройка логирования
"""
patch_logger(app_logger)
def log(message):
if isinstance(message, Exception):
error_message = f"Error: {str(message)}\n{traceback.format_exc()}"
app_logger.error(error_message)
else:
app_logger.info(message)
try:
from hypercorn.logging import Logger as HypercornLogger
class CustomLogger(HypercornLogger):
def __init__(self, *args, **kwargs) -> None:
super().__init__(*args, **kwargs)
if self.error_logger:
patch_logger(self.error_logger)
if self.access_logger:
patch_logger(self.access_logger)
except ImportError:
pass

View File

@@ -0,0 +1,118 @@
import traceback
from typing import TYPE_CHECKING
from aiogram import Bot, Dispatcher
from ocab_core.lib import register_bot_webhook
from ocab_core.logger import CustomLogger, log, setup_logger
from ocab_core.modules_system import ModulesManager
from ocab_core.modules_system.public_api import get_module
from ocab_core.singleton import Singleton
if TYPE_CHECKING:
from ocab_modules.standard.config import IConfig
class OCAB:
def __init__(self) -> None:
pass
async def init_app(self, bot_modules):
setup_logger()
singleton = Singleton()
try:
singleton.modules_manager = ModulesManager()
for module_loader in bot_modules:
info = module_loader.info()
log(f"Loading {info.name} ({info.id}) module")
await singleton.modules_manager.load(module_loader)
register_config()
config: "IConfig" = get_module("standard.config", "config")
config.load()
singleton.bot = Bot(token=config.get("core::token"))
singleton.dp = Dispatcher(storage=singleton.storage["_fsm_storage"])
singleton.dp.include_routers(*singleton.storage["_routers"])
for middleware in singleton.storage["_outer_message_middlewares"]:
singleton.dp.message.outer_middleware.register(middleware)
await singleton.modules_manager.late_init()
except Exception:
traceback.print_exc()
raise
async def start(self):
config: "IConfig" = get_module("standard.config", "config")
if config.get("core::mode") == "WEBHOOK":
await self.start_webhook_mode()
else:
await self.start_long_polling_mode()
return
async def start_long_polling_mode(self):
singleton = Singleton()
await singleton.bot.delete_webhook()
await singleton.dp.start_polling(singleton.bot)
async def start_webhook_mode(self):
try:
from fastapi import FastAPI
from hypercorn.asyncio import serve
from hypercorn.config import Config as HyperConfig
except ImportError:
log(
"Error: FastAPI and Hypercorn are required"
"for webhook mode. Please install them."
)
return
singleton = Singleton()
app = FastAPI()
config = get_module("standard.config", "config")
app.mount("/webapp", singleton.storage["webapp"])
await register_bot_webhook(app, singleton.bot, singleton.dp)
await singleton.bot.set_webhook(config.get("core::webhook::public_url"))
hyperConfig = HyperConfig()
hyperConfig.bind = [f"0.0.0.0:{config.get("core::webhook::port")}"]
hyperConfig.logger_class = CustomLogger
await serve(app, hyperConfig)
def register_config():
config: "IConfig" = get_module("standard.config", "config")
config.register(
"core::token",
"password",
visible=False,
)
config.register(
"core::mode",
"select",
options=["WEBHOOK", "LONG_POLLING"],
default_value="WEBHOOK",
visible=False,
)
config.register(
"core::webhook::port",
"int",
default_value=9000,
visible=False,
)
config.register(
"core::webhook::public_url",
"string",
visible=False,
)

View File

@@ -0,0 +1 @@
from .modules_manager import ModulesManager

View File

@@ -0,0 +1,2 @@
from .fs_loader import FSLoader
from .unsafe_fs_loader import UnsafeFSLoader

View File

@@ -0,0 +1,43 @@
import types
from dataclasses import dataclass
from typing import Dict, List, Optional, Union
from dataclasses_json import dataclass_json
@dataclass_json
@dataclass
class DependencyInfo:
version: str
uses: Optional[List[str]] = None
DependencyType = Union[str, DependencyInfo]
@dataclass_json
@dataclass
class Dependencies:
required: Optional[Dict[str, DependencyType]] = None
optional: Optional[Dict[str, DependencyType]] = None
@dataclass_json
@dataclass
class ModuleInfo:
id: str
name: str
description: str
version: str
author: Union[str, List[str]]
privileged: bool
dependencies: Dependencies
pythonDependencies: Optional[Dependencies] = None
class AbstractLoader:
def info(self) -> ModuleInfo:
raise NotImplementedError
def load(self) -> types.ModuleType:
raise NotImplementedError

View File

@@ -0,0 +1,91 @@
import types
from pathlib import Path
from RestrictedPython import compile_restricted_exec
# from ocab_core.logger import log
from ocab_core.modules_system.loaders.unsafe_fs_loader import UnsafeFSLoader
from ocab_core.modules_system.safe.policy import (
ALLOWED_IMPORTS,
BUILTINS,
RestrictedPythonPolicy,
)
class FSLoader(UnsafeFSLoader):
def __init__(self, path):
super().__init__(path)
self.builtins = BUILTINS.copy()
self.builtins["__import__"] = self._hook_import
self.module_info = self.info()
self.allowed_python_dependencies = self._get_allowed_python_dependencies()
def load(self):
if self.module_info.privileged:
raise Exception("Only non privileged modules are allowed to be imported")
self.module_id = self.module_info.id
return self._hook_import(".")
def _get_allowed_python_dependencies(self):
allowed = {}
if self.module_info.pythonDependencies:
if self.module_info.pythonDependencies.required:
allowed.update(self.module_info.pythonDependencies.required)
if self.module_info.pythonDependencies.optional:
allowed.update(self.module_info.pythonDependencies.optional)
for allowed_module in ALLOWED_IMPORTS:
allowed[allowed_module] = "*"
return allowed
def _resolve_module_from_path(self, module_name: str):
path = Path(self.path)
if module_name != ".":
path = path.joinpath(module_name.replace(".", "/"))
if path.is_dir():
init_file_path = path / "__init__.py"
if not init_file_path.exists():
raise FileNotFoundError(f"File {init_file_path} does not exist.")
file_path = init_file_path
else:
path = path.with_suffix(".py")
if path.is_file():
file_path = path
else:
raise ValueError(f"Module not found: {module_name}")
return file_path
def _hook_import(self, name: str, *args, **kwargs):
if name == "ocab_core.modules_system.public_api":
module = __import__(name, *args, **kwargs)
module.__ocab_module_id__ = self.module_id
return module
for key in self.allowed_python_dependencies.keys():
if name == key or name.startswith(f"{key}."):
return __import__(name, *args, **kwargs)
module_file_path = self._resolve_module_from_path(name)
with open(module_file_path, "r") as f:
src = f.read()
module = types.ModuleType(name)
module.__dict__.update(
{"__builtins__": self.builtins, "__ocab_module_id__": self.module_id}
)
result = compile_restricted_exec(src, "<string>", policy=RestrictedPythonPolicy)
if result.errors:
for error in result.errors:
print(error)
exec(result.code, module.__dict__) # nosec
return module

View File

@@ -0,0 +1 @@
from .FSLoader import FSLoader

View File

@@ -0,0 +1,61 @@
import importlib.util
import os
import sys
from pathlib import Path
from ocab_core.modules_system.loaders.base import AbstractLoader, ModuleInfo
class UnsafeFSLoader(AbstractLoader):
def __init__(self, path):
self.path = path
def info(self):
with open(os.path.join(self.path, "info.json"), "r") as f:
return ModuleInfo.from_json(f.read())
def _resolve_module_from_path(self, module_name: str):
path = Path(self.path)
if module_name != ".":
path = path.joinpath(module_name.replace(".", "/"))
if path.is_dir():
init_file_path = path / "__init__.py"
if not init_file_path.exists():
raise FileNotFoundError(f"File {init_file_path} does not exist.")
file_path = init_file_path
else:
path = path.with_suffix(".py")
if path.is_file():
file_path = path
else:
raise ValueError(f"Module not found: {module_name}")
return file_path
def load(self):
self.info()
full_path = self._resolve_module_from_path(".")
if full_path.name == "__init__.py":
module_name = full_path.parent.name
path = full_path.parent.parent.absolute()
else:
module_name = full_path.stem
path = full_path.parent.absolute()
# Добавляем директорию модуля в sys.path
sys.path.insert(0, str(path))
# Загружаем спецификацию модуля
spec = importlib.util.spec_from_file_location(module_name, full_path)
# Создаем модуль
module = importlib.util.module_from_spec(spec)
# Выполняем модуль
spec.loader.exec_module(module)
return module

View File

@@ -0,0 +1 @@
from .UnsafeFSLoader import UnsafeFSLoader

View File

@@ -0,0 +1,169 @@
import importlib
import inspect
import pkg_resources
import semver
from ocab_core.modules_system.loaders.base import (
AbstractLoader,
DependencyInfo,
ModuleInfo,
)
def is_version_compatible(version, requirement):
def parse_requirement(req):
if req.startswith("^"):
base_version = req[1:]
base_version_info = semver.VersionInfo.parse(base_version)
range_start = base_version_info
range_end = base_version_info.bump_major()
return [f">={range_start}", f"<{range_end}"]
else:
return [req]
for r in parse_requirement(requirement):
if r == "*":
continue
if not semver.Version.parse(version).match(r):
return False
return True
def check_python_dependencies(info: ModuleInfo):
if info.pythonDependencies and info.pythonDependencies.required:
for dependency, req in info.pythonDependencies.required.items():
try:
importlib.import_module(dependency)
except ImportError:
raise Exception(
f"Module {info.id} requires {dependency}, "
f"but it is not installed"
)
try:
installed_version = pkg_resources.get_distribution(dependency).version
except pkg_resources.DistributionNotFound:
installed_version = "*"
if isinstance(req, str):
required_version = req
elif isinstance(req, DependencyInfo):
required_version = req.version
else:
raise ValueError(f"Invalid dependency specification for {dependency}")
if not is_version_compatible(installed_version, required_version):
raise Exception(
f"Module {info.id} depends on {dependency} {required_version}, "
f"but version {installed_version} is installed"
)
def check_dependency_uses(
loaded_dependency, required_uses, dependent_module_id, dependency_id
):
module = loaded_dependency.get("module")
if not module:
raise Exception(f"Module object not found for dependency {dependency_id}")
for required_attr in required_uses:
if not hasattr(module, required_attr):
raise Exception(
f"Module {dependent_module_id} requires '{required_attr}' "
f"from {dependency_id}, but it is not available"
)
async def await_if_async(module, method_name):
if hasattr(module, method_name):
method = getattr(module, method_name)
if inspect.iscoroutinefunction(method):
await method()
else:
method()
class ModulesManager:
def __init__(self):
self.modules = []
async def load(self, loader: AbstractLoader):
info = loader.info()
# Check if the module is already loaded
if any(mod["info"].id == info.id for mod in self.modules):
return
self.check_module_dependencies(info)
check_python_dependencies(info)
module_info = {
"info": info,
"module": None,
}
self.modules.append(module_info)
module = loader.load()
module_info["module"] = module
await await_if_async(module, "module_init")
def check_module_dependencies(self, info: ModuleInfo):
if info.dependencies.required:
for dependency, req in info.dependencies.required.items():
loaded_dependency = next(
(mod for mod in self.modules if mod["info"].id == dependency), None
)
if not loaded_dependency:
raise Exception(
f"Module {info.id} depends on {dependency},"
f"but it is not loaded"
)
loaded_dependency_info = loaded_dependency["info"]
if isinstance(req, str):
required_version = req
elif isinstance(req, DependencyInfo):
required_version = req.version
if req.uses:
check_dependency_uses(
loaded_dependency, req.uses, info.id, dependency
)
else:
raise ValueError(
f"Invalid dependency specification for {dependency}"
)
if not is_version_compatible(
loaded_dependency_info.version, required_version
):
raise Exception(
f"Module {info.id} depends on {dependency} {required_version}, "
f"but version {loaded_dependency_info.version} is loaded"
)
async def late_init(self):
for m in self.modules:
module = m["module"]
await await_if_async(module, "module_late_init")
def get_by_id(self, module_id: str):
module = next(
(mod for mod in self.modules if mod["info"].id == module_id), None
)
if not module:
raise Exception(f"Module with id {module_id} not loaded")
return module["module"]
def get_info_by_id(self, module_id: str):
module = next(
(mod for mod in self.modules if mod["info"].id == module_id), None
)
if not module:
raise Exception(f"Module with id {module_id} not loaded")
return module["info"]

View File

@@ -0,0 +1,12 @@
from ocab_core.logger import log
from .public_api import (
Storage,
get_fsm_context,
get_module,
register_outer_message_middleware,
register_router,
set_chat_menu_button,
set_my_commands,
)
from .utils import Utils

View File

@@ -0,0 +1,131 @@
import inspect
import types
from typing import Any, Tuple, Union
from aiogram import BaseMiddleware, Router
from aiogram.fsm.context import FSMContext
from aiogram.fsm.storage.base import StorageKey
# from ocab_core.logger import log
from ocab_core.modules_system.loaders.base import DependencyInfo
from ocab_core.singleton import Singleton
async def set_chat_menu_button(menu_button):
app = Singleton()
await app.bot.set_chat_menu_button(menu_button=menu_button)
def register_router(router: Router):
app = Singleton()
app.storage["_routers"].append(router)
def register_outer_message_middleware(middleware: BaseMiddleware):
app = Singleton()
app.storage["_outer_message_middlewares"].append(middleware)
async def set_my_commands(commands):
app = Singleton()
await app.bot.set_my_commands(commands)
async def get_fsm_context(chat_id: int, user_id: int) -> FSMContext:
dp = Singleton().dp
bot = Singleton().bot
return FSMContext(
storage=dp.storage,
key=StorageKey(
chat_id=chat_id,
user_id=user_id,
bot_id=bot.id,
),
)
def set_fsm(storage):
app = Singleton()
app.storage["_fsm_storage"] = storage
def get_module(
module_id: str, paths=None
) -> Union[types.ModuleType, Union[Any, None], Tuple[Union[Any, None], ...]]:
caller_globals = inspect.currentframe().f_back.f_globals
app = Singleton()
allowed_uses = None
if "__ocab_module_id__" in caller_globals:
caller_module_id = caller_globals["__ocab_module_id__"]
caller_module_info = app.modules_manager.get_info_by_id(caller_module_id)
if caller_module_info and caller_module_info.dependencies:
dependency = None
if caller_module_info.dependencies.required:
dependency = caller_module_info.dependencies.required.get(module_id)
if not dependency and caller_module_info.dependencies.optional:
dependency = caller_module_info.dependencies.optional.get(module_id)
if (
dependency
and isinstance(dependency, DependencyInfo)
and dependency.uses
):
allowed_uses = set(dependency.uses)
module = app.modules_manager.get_by_id(module_id)
if not module:
raise ModuleNotFoundError(f"Module {module_id} not found")
if paths is None:
if allowed_uses is not None:
raise PermissionError(
f"Direct access to module {module_id} is "
f"not allowed for {caller_module_id}. Specify allowed attributes."
)
return module
if isinstance(paths, str):
paths = [paths]
results = []
for path in paths:
current_obj = module
try:
parts = path.split(".")
for part in parts:
if allowed_uses is not None and part not in allowed_uses:
raise AttributeError(
f"Access to '{part}' is not allowed "
+ f"for module {caller_module_id}"
)
current_obj = getattr(current_obj, part)
results.append(current_obj)
except AttributeError as e:
if "is not allowed" in str(e):
raise PermissionError(str(e))
results.append(None)
if len(results) == 1:
return results[0]
else:
return tuple(results)
class Storage:
@staticmethod
def set(key: str, value: Any):
storage = Singleton().storage
storage[key] = value
@staticmethod
def get(key: str):
storage = Singleton().storage
return storage.get(key)

View File

@@ -0,0 +1,12 @@
import re
CLEAN_HTML = re.compile("<.*?>")
class Utils:
@staticmethod
def code_format(code: str, lang: str):
if lang:
return f'<pre><code class="language-{lang}">{code}</code></pre>'
else:
return f"<pre>{code}</pre>"

View File

@@ -0,0 +1,144 @@
import types
from _ast import AnnAssign
from typing import Any
from aiogram import Bot
from RestrictedPython import (
RestrictingNodeTransformer,
limited_builtins,
safe_builtins,
utility_builtins,
)
from RestrictedPython.Eval import default_guarded_getitem, default_guarded_getiter
from RestrictedPython.Guards import ( # guarded_setattr,; full_write_guard,
_write_wrapper,
guarded_unpack_sequence,
safer_getattr,
)
from ocab_core.logger import log
from ocab_core.modules_system.safe.zope_guards import extra_safe_builtins
class RestrictedPythonPolicy(RestrictingNodeTransformer):
def visit_AsyncFunctionDef(self, node):
return self.node_contents_visit(node)
def visit_Await(self, node):
return self.node_contents_visit(node)
def visit_AsyncFor(self, node):
return self.node_contents_visit(node)
def visit_AsyncWith(self, node):
return self.node_contents_visit(node)
"""
Не работает из-за getattr
def visit_Match(self, node) -> Any:
return self.node_contents_visit(node)
def visit_match_case(self, node) -> Any:
return self.node_contents_visit(node)
def visit_MatchAs(self, node) -> Any:
return self.node_contents_visit(node)
def visit_MatchValue(self, node) -> Any:
return self.node_contents_visit(node)
"""
def visit_AnnAssign(self, node: AnnAssign) -> Any:
# missing in RestrictingNodeTransformer
# this doesn't need the logic that is in visit_Assign
# because it doesn't have a "targets" attribute,
# and node.target: Name | Attribute | Subscript
return self.node_contents_visit(node)
# new Python 3.12 nodes
def visit_TypeAlias(self, node) -> Any:
# missing in RestrictingNodeTransformer
return self.node_contents_visit(node)
def visit_TypeVar(self, node) -> Any:
# missing in RestrictingNodeTransformer
return self.node_contents_visit(node)
def visit_TypeVarTuple(self, node) -> Any:
# missing in RestrictingNodeTransformer
return self.node_contents_visit(node)
def visit_ParamSpec(self, node) -> Any:
# missing in RestrictingNodeTransformer
return self.node_contents_visit(node)
def _metaclass(name, bases, dict):
ob = type(name, bases, dict)
ob.__allow_access_to_unprotected_subobjects__ = 1
ob._guarded_writes = 1
return ob
ALLOWED_IMPORTS = [
"typing",
"aiogram",
"warnings",
]
def safes_getattr(object, name, default=None, getattr=safer_getattr):
if isinstance(object, Bot) and name == "token":
log("Bot.token is not allowed")
raise Exception("Bot.token is not allowed")
return getattr(object, name, default)
trusted_settters_classes = []
def safes_setattr(self, key, value):
if (
isinstance(getattr(type(self), key, None), property)
and getattr(type(self), key).fset is not None
):
getattr(type(self), key).fset(self, value)
return
def write_guard():
# ed scope abuse!
# safetypes and Wrapper variables are used by guard()
safetypes = {dict, list}
Wrapper = _write_wrapper()
def guard(ob):
# Don't bother wrapping simple types, or objects that claim to
# handle their own write security.
if type(ob) in safetypes or hasattr(ob, "_guarded_writes"):
return ob
if type(ob) in trusted_settters_classes:
setattr(ob, "__guarded_setattr__", types.MethodType(safes_setattr, ob))
# Hand the object to the Wrapper instance, then return the instance.
return Wrapper(ob)
return guard
BUILTINS = safe_builtins.copy()
BUILTINS.update(utility_builtins)
BUILTINS.update(limited_builtins)
BUILTINS.update(extra_safe_builtins)
BUILTINS["__metaclass__"] = _metaclass
BUILTINS["_getitem_"] = default_guarded_getitem
BUILTINS["_getattr_"] = safes_getattr
BUILTINS["_getiter_"] = default_guarded_getiter
BUILTINS["_write_"] = write_guard()
BUILTINS["_unpack_sequence_"] = guarded_unpack_sequence
BUILTINS["staticmethod"] = staticmethod
BUILTINS["tuple"] = tuple
BUILTINS["reversed"] = reversed

View File

@@ -0,0 +1,225 @@
#############################################################################
#
# Copyright (c) 2024 OCAB Team
# Copyright (c) 2002 Zope Foundation and Contributors.
#
# This software includes a function derived from the software subject to the
# provisions of the Zope Public License, Version 2.1 (ZPL). A copy of the ZPL
# should accompany this distribution. THIS SOFTWARE IS PROVIDED "AS IS" AND ANY
# AND ALL EXPRESS OR IMPLIED WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST
# INFRINGEMENT, AND FITNESS FOR A PARTICULAR PURPOSE.
#
#
##############################################################################
extra_safe_builtins = {}
class GuardedDictType:
def __call__(self, *args, **kwargs):
return dict(*args, **kwargs)
def fromkeys(self, S, v=None):
return dict.fromkeys(S, v)
extra_safe_builtins["dict"] = GuardedDictType()
ContainerAssertions = {
type(()): 1,
bytes: 1,
str: 1,
range: 1,
}
Containers = ContainerAssertions.get
def _error(index):
raise Exception("unauthorized access to element")
def guard(container, value, index=None):
# if Containers(type(container)) and Containers(type(value)):
# # Simple type. Short circuit.
# return
# I don't know how to do this.
# if getSecurityManager().validate(container, container, index, value):
# return
# _error(index)
return
class SafeIter:
__allow_access_to_unprotected_subobjects__ = 1
def __init__(self, ob, container=None):
self._iter = iter(ob)
if container is None:
container = ob
self.container = container
def __iter__(self):
return self
def __next__(self):
ob = next(self._iter)
guard(self.container, ob)
return ob
next = __next__
class NullIter(SafeIter):
def __init__(self, ob):
self._iter = ob
def __next__(self):
return next(self._iter)
next = __next__
def guarded_iter(*args):
if len(args) == 1:
i = args[0]
# Don't double-wrap
if isinstance(i, SafeIter):
return i
if not isinstance(i, range):
return SafeIter(i)
# Other call styles / targets don't need to be guarded
return NullIter(iter(*args))
extra_safe_builtins["iter"] = guarded_iter
def guarded_any(seq):
return any(guarded_iter(seq))
extra_safe_builtins["any"] = guarded_any
def guarded_all(seq):
return all(guarded_iter(seq))
extra_safe_builtins["all"] = guarded_all
valid_inplace_types = (list, set)
inplace_slots = {
"+=": "__iadd__",
"-=": "__isub__",
"*=": "__imul__",
"/=": (1 / 2 == 0) and "__idiv__" or "__itruediv__",
"//=": "__ifloordiv__",
"%=": "__imod__",
"**=": "__ipow__",
"<<=": "__ilshift__",
">>=": "__irshift__",
"&=": "__iand__",
"^=": "__ixor__",
"|=": "__ior__",
}
def __iadd__(x, y):
x += y
return x
def __isub__(x, y):
x -= y
return x
def __imul__(x, y):
x *= y
return x
def __idiv__(x, y):
x /= y
return x
def __ifloordiv__(x, y):
x //= y
return x
def __imod__(x, y):
x %= y
return x
def __ipow__(x, y):
x **= y
return x
def __ilshift__(x, y):
x <<= y
return x
def __irshift__(x, y):
x >>= y
return x
def __iand__(x, y):
x &= y
return x
def __ixor__(x, y):
x ^= y
return x
def __ior__(x, y):
x |= y
return x
inplace_ops = {
"+=": __iadd__,
"-=": __isub__,
"*=": __imul__,
"/=": __idiv__,
"//=": __ifloordiv__,
"%=": __imod__,
"**=": __ipow__,
"<<=": __ilshift__,
">>=": __irshift__,
"&=": __iand__,
"^=": __ixor__,
"|=": __ior__,
}
def protected_inplacevar(op, var, expr):
"""Do an inplace operation
If the var has an inplace slot, then disallow the operation
unless the var an instance of ``valid_inplace_types``.
"""
if hasattr(var, inplace_slots[op]) and not isinstance(var, valid_inplace_types):
try:
cls = var.__class__
except AttributeError:
cls = type(var)
raise TypeError(
"Augmented assignment to %s objects is not allowed"
" in untrusted code" % cls.__name__
)
return inplace_ops[op](var, expr)
extra_safe_builtins["_inplacevar_"] = protected_inplacevar

View File

@@ -0,0 +1,25 @@
from aiogram import Bot, Dispatcher
from aiogram.fsm.storage.memory import MemoryStorage
from ocab_core.modules_system import ModulesManager
class SingletonMeta(type):
_instances = {}
def __call__(cls, *args, **kwargs):
if cls not in cls._instances:
instance = super().__call__(*args, **kwargs)
cls._instances[cls] = instance
return cls._instances[cls]
class Singleton(metaclass=SingletonMeta):
bot: Bot
dp: Dispatcher = None
modules_manager: ModulesManager = None
storage = {
"_fsm_storage": MemoryStorage(),
"_routers": [],
"_outer_message_middlewares": [],
}

2140
src/ocab_core/poetry.lock generated Normal file

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,2 @@
[virtualenvs]
in-project = true

View File

@@ -0,0 +1,26 @@
[tool.poetry]
name = "ocab-core"
version = "0.1.0"
description = ""
authors = ["Максим Слипенко <maxim@slipenko.com>"]
readme = "README.md"
[tool.poetry.dependencies]
python = "~3.12"
aiogram = "^3.10.0"
setuptools = "^71.0.1"
restrictedpython = "^7.1"
semver = "^3.0.2"
dataclasses-json = "^0.6.7"
fastapi = { version = "^0.111.1", optional = true }
hypercorn = { version = "^0.17.3", optional = true }
[tool.poetry.group.dev.dependencies]
ocab-modules = { path = "../ocab_modules", develop = true }
[tool.poetry.extras]
webhook = ["fastapi", "hypercorn"]
[build-system]
requires = ["poetry-core"]
build-backend = "poetry.core.masonry.api"

View File

@@ -0,0 +1,12 @@
# OCAB Modules
OCAB Modules содержит набор модулей для платформы Open Chat AI Bot (OCAB).
## Описание
OCAB - это платформа для создания чат-ботов Telegram. Модули - это расширения, которые добавляют функциональность ботам OCAB.
## Типы модулей
* **Стандартные модули (standard.*):** Предоставляют основные функции, такие как управление пользователями, ролями и настройками.
* **Дополнительные официальные модули (external.*):** Разработаны командой OCAB и предоставляют расширенные возможности, такие как интеграция с нейросетями, внешними сервисами и API.

View File

@@ -0,0 +1 @@
from .lib import module_loader

View File

@@ -0,0 +1 @@
from . import yandexgpt

View File

@@ -0,0 +1,19 @@
# Модуль Create Report Apps
Модуль `create_report_apps` предназначен для помощи пользователям в создании отчетов об ошибках в приложениях.
## Функциональность
- Задает пользователю ряд вопросов, необходимых для составления отчета.
- Собирает информацию о системе пользователя.
- Формирует отчет в текстовом формате.
## Команды
- `/create_report_apps` - запустить процесс создания отчета.
## Использование
1. Отправьте команду `/create_report_apps` боту в личных сообщениях или в групповом чате.
2. Ответьте на вопросы бота.
3. Бот сформирует отчет и отправит его вам.

View File

@@ -0,0 +1 @@
from .main import module_init

View File

@@ -0,0 +1,136 @@
from aiogram import Bot, Router
from aiogram.enums import ParseMode
from aiogram.fsm.context import FSMContext
from aiogram.fsm.state import State, StatesGroup
from aiogram.types import (
BufferedInputFile,
KeyboardButton,
Message,
ReplyKeyboardMarkup,
ReplyKeyboardRemove,
)
from ocab_core.modules_system.public_api import Utils, get_fsm_context
from .report import Report
router = Router()
class ReportState(StatesGroup):
input_system_info = State()
input_app_name = State()
input_problem_step_by_step = State()
input_actual_result = State()
input_expected_result = State()
input_additional_info = State()
system_info_code = """echo "SESSION_TYPE: ${XDG_SESSION_TYPE:-Unknown}"
[ -f /etc/os-release ] && grep "^PRETTY_NAME=" /etc/os-release | cut -d= -f2 \
| tr -d '"' | xargs echo "OS: "
echo "Kernel: $(uname -r)"
echo "DE: ${XDG_CURRENT_DESKTOP:-Unknown}"
grep "^model name" /proc/cpuinfo | head -n1 | cut -d: -f2 \
| xargs echo "CPU: "
lspci | grep "VGA compatible controller" | cut -d: -f3 \
| xargs -I{} echo "GPU: {}"
"""
system_info_message = """Укажите параметры свой системы.
Собрать информацию о системе можно с помощью данного скрипта:
""" + Utils.code_format(
system_info_code,
"shell",
)
async def start_report(chat_id: int, bot: Bot):
await bot.send_message(
chat_id=chat_id,
text=system_info_message,
parse_mode=ParseMode.HTML,
reply_markup=ReplyKeyboardRemove(),
)
state = await get_fsm_context(chat_id, chat_id)
await state.set_state(ReportState.input_system_info)
app_info_message = """Укажите название и версию приложения.
Узнать можно с помощью данной команды:""" + Utils.code_format(
"rpm -qa | grep -i НАЗВАНИЕРИЛОЖЕНИЯ", "shell"
)
@router.message(ReportState.input_system_info)
async def system_entered(message: Message, state: FSMContext):
await state.update_data(system=message.text)
await message.answer(
text=app_info_message,
parse_mode=ParseMode.HTML,
)
await state.set_state(ReportState.input_app_name)
step_by_step_message = (
"""Опиши проблему пошагово, что ты делал, что происходило, что не так."""
)
@router.message(ReportState.input_app_name)
async def app_name_entered(message: Message, state: FSMContext):
await state.update_data(app=message.text)
await message.answer(text=step_by_step_message)
await state.set_state(ReportState.input_problem_step_by_step)
@router.message(ReportState.input_problem_step_by_step)
async def problem_step_by_step_entered(message: Message, state: FSMContext):
await state.update_data(problem_step_by_step=message.text)
await message.answer(text="Опиши, что произошло (фактический результат).")
await state.set_state(ReportState.input_actual_result)
@router.message(ReportState.input_actual_result)
async def actual_result_entered(message: Message, state: FSMContext):
await state.update_data(actual=message.text)
await message.answer(text="Опиши ожидаемый результат.")
await state.set_state(ReportState.input_expected_result)
@router.message(ReportState.input_expected_result)
async def expected_result_entered(message: Message, state: FSMContext):
await state.update_data(expected=message.text)
await message.answer(
text="Если есть дополнительная информация, то напиши ее.",
reply_markup=ReplyKeyboardMarkup(
resize_keyboard=True,
keyboard=[
[KeyboardButton(text="Дополнительной информации нет")],
],
),
)
await state.set_state(ReportState.input_additional_info)
@router.message(ReportState.input_additional_info)
async def additional_info_entered(message: Message, state: FSMContext):
if message.text == "Дополнительной информации нет":
additional_info = ""
else:
additional_info = message.text
await state.update_data(additional=additional_info)
await message.answer(
text="Вот твой отчет сообщением, а также файлом:",
reply_markup=ReplyKeyboardRemove(),
)
data = await state.get_data()
report = Report(data)
file_report = report.export().encode()
await message.answer(text=report.export())
await message.answer_document(document=BufferedInputFile(file_report, "report.txt"))
await state.clear()

View File

@@ -0,0 +1,14 @@
{
"id": "external.create_report_apps",
"name": "Create Report Apps",
"description": "Модуль для создания отчетов о ошибках в приложениях",
"author": [
"OCAB Team",
"Maxim Slipenko"
],
"version": "1.0.0",
"privileged": false,
"dependencies": {
"standard.command_helper": "^1.0.0"
}
}

View File

@@ -0,0 +1,115 @@
from typing import Union
from aiogram import Bot, F, Router
from aiogram.exceptions import TelegramForbiddenError
from aiogram.filters import BaseFilter, Command, CommandStart
from aiogram.types import (
CallbackQuery,
InlineKeyboardButton,
InlineKeyboardMarkup,
Message,
)
from ocab_core.modules_system.public_api import get_module, register_router
from .create_report import router as create_report_router
from .create_report import start_report
register_command = get_module("standard.command_helper", "register_command")
router = Router()
class ChatTypeFilter(BaseFilter):
def __init__(self, chat_type: Union[str, list]):
self.chat_type = chat_type
async def __call__(self, message: Message) -> bool:
if isinstance(self.chat_type, str):
return message.chat.type == self.chat_type
return message.chat.type in self.chat_type
@router.message(
ChatTypeFilter(chat_type=["group", "supergroup"]), Command("create_report_apps")
)
async def create_report_apps_command_group(message: Message):
keyboard = InlineKeyboardMarkup(
inline_keyboard=[
[
InlineKeyboardButton(
text="Да", callback_data=f"create_report:{message.from_user.id}"
),
InlineKeyboardButton(
text="Нет", callback_data=f"cancel_report:{message.from_user.id}"
),
]
]
)
await message.answer(
"Я могу отправить тебе пару вопросов "
"для помощи в составлении репорта личными "
"сообщениями.",
reply_markup=keyboard,
)
@router.message(
ChatTypeFilter(chat_type=["private"]),
CommandStart(deep_link=True, magic=F.args == "create_report_apps"),
)
@router.message(ChatTypeFilter(chat_type=["private"]), Command("create_report_apps"))
async def create_report_apps_command(message: Message, bot: Bot):
await start_report(message.from_user.id, bot)
@router.callback_query(F.data.startswith("cancel_report"))
async def cancel_report_callback(callback_query: CallbackQuery):
callback_user_id = int(callback_query.data.split(":")[1])
if callback_query.from_user.id != callback_user_id:
await callback_query.answer("Эта кнопка не для вас.", show_alert=True)
return
await callback_query.message.delete()
@router.callback_query(F.data.startswith("create_report"))
async def create_report_callback(callback_query: CallbackQuery, bot: Bot):
callback_user_id = int(callback_query.data.split(":")[1])
if callback_query.from_user.id != callback_user_id:
await callback_query.answer("Эта кнопка не для вас.", show_alert=True)
return
user_id = callback_query.from_user.id
async def on_chat_unavailable():
await callback_query.message.edit_text(
"Я в личных сообщениях задам тебе вопросы "
"для помощи в составлении репорта. "
'Но перед этим ты должен нажать кнопку "Запустить"'
)
info = await bot.get_me()
await callback_query.answer(
url=f"https://t.me/{info.username}?start=create_report_apps"
)
try:
chat_member = await bot.get_chat_member(chat_id=user_id, user_id=user_id)
if chat_member.status != "left":
await start_report(user_id, bot)
await callback_query.message.edit_text(
"Я в личных сообщениях задам тебе "
"вопросы для помощи в составлении "
"репорта."
)
else:
await on_chat_unavailable()
except TelegramForbiddenError:
await on_chat_unavailable()
async def module_init():
router.include_router(create_report_router)
register_router(router)
register_command("create_report_apps", "Написать репорт о приложении")

View File

@@ -0,0 +1,59 @@
import aiogram
class ReportFormatter:
def __init__(self, html=True):
self.html = html
def bold(self, string):
if self.html:
return f"<b>{self.text(string)}</b>"
return self.text(string)
def text(self, string):
if self.html:
return aiogram.html.quote(string)
return string
class Report:
def __init__(self, data: dict):
self.data = data
def export(self):
data = self.data
report = f"""
Стенд с ошибкой:
==============================
{data['system']}
Пакет:
==============================
{data['app']}
Шаги, приводящие к ошибке:
==============================
{data['problem_step_by_step']}
Фактический результат:
==============================
{data['actual']}
Ожидаемый результат:
==============================
{data['expected']}
"""
if data["additional"] != "":
report += f"""
Дополнительно:
==============================
{data['additional']}
"""
return report

View File

@@ -0,0 +1,22 @@
# Модуль YandexGPT
Модуль `yandexgpt` интегрирует в бота OCAB нейросеть YandexGPT.
## Функциональность
- Позволяет боту отвечать на сообщения пользователей, используя YandexGPT.
- Строит линию контекста для нейросети, используя историю сообщений.
## Конфигурация
- `yandexgpt::token` - API-ключ для доступа к YandexGPT.
- `yandexgpt::catalogid` - идентификатор каталога YandexGPT.
- `yandexgpt::prompt` - системная подсказка для YandexGPT.
- `yandexgpt::startword` - слова, с которых должно начинаться сообщение, чтобы бот ответил.
- `yandexgpt::inword` - слова, которые должны быть в сообщении, чтобы бот ответил.
## Использование
1. Настройте конфигурационные параметры модуля.
2. Отправьте боту сообщение, которое соответствует условиям, указанным в параметрах `startword` и `inword`.
3. Бот ответит на сообщение, используя YandexGPT.

View File

@@ -0,0 +1,2 @@
from .handlers import answer_to_message
from .main import module_init

View File

@@ -0,0 +1,47 @@
# flake8: noqa
from typing import TYPE_CHECKING
from aiogram import Bot
from aiogram.types import Message
from ocab_core.modules_system.public_api import get_module, log
from .yandexgpt import YandexGPT
if TYPE_CHECKING:
from ocab_modules.standard.config import IConfig
from ocab_modules.standard.database.db_api import add_message as IAddMessage
config: "IConfig" = get_module(
"standard.config",
"config",
)
def get_yandexgpt_catalog_id():
return config.get("yandexgpt::catalogid")
def get_yandexgpt_token():
return config.get("yandexgpt::token")
def get_yandexgpt_prompt():
return config.get("yandexgpt::prompt")
add_message: "IAddMessage" = get_module("standard.database", "db_api.add_message")
async def answer_to_message(message: Message, bot: Bot):
# print("answer_to_message")
log("answer_to_message")
yagpt = YandexGPT(get_yandexgpt_token(), get_yandexgpt_catalog_id())
text = message.text
prompt = get_yandexgpt_prompt()
# response = await yagpt.async_yandexgpt(system_prompt=prompt, input_messages=text)
response = await yagpt.yandexgpt_request(
chat_id=message.chat.id, message_id=message.message_id, type="yandexgpt"
)
reply = await message.reply(response, parse_mode="Markdown")
add_message(reply, message_ai_model="yandexgpt")

View File

@@ -0,0 +1,21 @@
{
"id": "external.yandexgpt",
"name": "Yandex GPT",
"description": "Модуль для работы с Yandex GPT",
"author": "OCAB Team",
"version": "1.0.0",
"privileged": false,
"dependencies": {
"required": {
"standard.config": "^1.0.0",
"standard.database": "^1.0.0"
}
},
"pythonDependencies": {
"required": {
"aiohttp": "*",
"requests": "*",
"json": "*"
}
}
}

View File

@@ -0,0 +1,50 @@
from typing import TYPE_CHECKING
from ocab_core.modules_system.public_api import get_module
if TYPE_CHECKING:
from ocab_modules.standard.config import IConfig
config: "IConfig" = get_module("standard.config", "config")
def module_init():
config.register(
"yandexgpt::token",
"password",
required=True,
)
config.register(
"yandexgpt::token_for_request",
"int",
default_value=8000,
)
config.register(
"yandexgpt::token_for_answer",
"int",
default_value=2000,
)
config.register(
"yandexgpt::catalogid",
"password",
required=True,
)
config.register(
"yandexgpt::prompt",
"string",
default_value="Ты чат-бот ...",
)
config.register(
"yandexgpt::startword",
"string",
default_value="Бот| Бот, | бот | бот,",
)
config.register(
"yandexgpt::inword",
"string",
default_value="помогите | не работает",
)

View File

@@ -1,7 +1,10 @@
from aiogram import Router, F
# flake8: noqa
from aiogram import F, Router
from src.modules.external.yandexgpt.handlers import answer_to_message
from .handlers import answer_to_message
router = Router()
# Если сообщение содержит в начале текст "Гномик" или "гномик" или отвечает на сообщение бота, то вызывается функция answer_to_message
router.message.register(answer_to_message, F.text.startswith("Гномик") | F.text.startswith("гномик"))
router.message.register(
answer_to_message, F.text.startswith("Гномик") | F.text.startswith("гномик")
)

View File

@@ -1,24 +1,44 @@
import requests
# flake8: noqa
import json
import asyncio
import aiohttp
from src.core.logger import log
from typing import TYPE_CHECKING
from ...standard.database import *
from ...standard.config.config import *
import aiohttp
import requests
from ocab_core.modules_system.public_api import get_module, log
if TYPE_CHECKING:
from ocab_modules.standard.config import IConfig
from ocab_modules.standard.database import db_api as IDbApi
db_api: "IDbApi" = get_module("standard.database", "db_api")
config: "IConfig" = get_module("standard.config", "config")
def get_yandexgpt_token_for_answer():
return config.get("yandexgpt::token_for_answer")
def get_yandexgpt_token_for_request():
return config.get("yandexgpt::token_for_request")
def get_yandexgpt_prompt():
return config.get("yandexgpt::prompt")
class YandexGPT:
token = None
catalog_id = None
languages = {
"ru": "русский язык",
"en": "английский язык",
"de": "немецкий язык",
"uk": "украинский язык",
"es": "испанский язык",
"be": "белорусский язык",
}
"ru": "русский язык",
"en": "английский язык",
"de": "немецкий язык",
"uk": "украинский язык",
"es": "испанский язык",
"be": "белорусский язык",
}
def __init__(self, token, catalog_id):
self.token = token
@@ -29,11 +49,13 @@ class YandexGPT:
async with session.post(url, headers=headers, json=prompt) as response:
return await response.json()
async def async_token_check(self, messages, gpt, max_tokens, stream, temperature, del_msg_id=1):
async def async_token_check(
self, messages, gpt, max_tokens, stream, temperature, del_msg_id=1
):
url = "https://llm.api.cloud.yandex.net/foundationModels/v1/tokenizeCompletion"
headers = {
"Content-Type": "application/json",
"Authorization": f"Api-Key {self.token}"
"Authorization": f"Api-Key {self.token}",
}
answer_token = get_yandexgpt_token_for_answer()
while True:
@@ -43,14 +65,16 @@ class YandexGPT:
"completionOptions": {
"stream": stream,
"temperature": temperature,
"maxTokens": max_tokens
"maxTokens": max_tokens,
},
"messages": messages
"messages": messages,
}
response = await self.async_request(url=url, headers=headers, prompt=request)
except Exception as e: # TODO: Переделать обработку ошибок
response = await self.async_request(
url=url, headers=headers, prompt=request
)
except Exception as e: # TODO: Переделать обработку ошибок
# print(e)
await log(f"Error: {e}")
log(f"Error: {e}")
continue
if int(len(response["tokens"])) < (max_tokens - answer_token):
@@ -62,12 +86,19 @@ class YandexGPT:
Exception("IndexError: list index out of range")
return messages
async def async_yandexgpt_lite(self, system_prompt, input_messages, stream=False, temperature=0.6, max_tokens=8000):
async def async_yandexgpt_lite(
self,
system_prompt,
input_messages,
stream=False,
temperature=0.6,
max_tokens=8000,
):
url = "https://llm.api.cloud.yandex.net/foundationModels/v1/completion"
gpt = f"gpt://{self.catalog_id}/yandexgpt-lite/latest"
headers = {
"Content-Type": "application/json",
"Authorization": f"Api-Key {self.token}"
"Authorization": f"Api-Key {self.token}",
}
messages = [{"role": "system", "text": system_prompt}]
@@ -80,27 +111,30 @@ class YandexGPT:
"completionOptions": {
"stream": stream,
"temperature": temperature,
"maxTokens": max_tokens
"maxTokens": max_tokens,
},
"messages": messages
"messages": messages,
}
response = requests.post(url, headers=headers, json=prompt).text
response = requests.post(url, headers=headers, json=prompt).text # nosec
return json.loads(response)["result"]["alternatives"][0]["message"]["text"]
async def async_yandexgpt(
self,
system_prompt,
input_messages,
stream=False,
temperature=0.6,
max_tokens=get_yandexgpt_token_for_request()
self,
system_prompt,
input_messages,
stream=False,
temperature=0.6,
max_tokens=None,
):
if max_tokens is None:
max_tokens = get_yandexgpt_token_for_request()
url = "https://llm.api.cloud.yandex.net/foundationModels/v1/completion"
gpt = f"gpt://{self.catalog_id}/yandexgpt/latest"
headers = {
"Content-Type": "application/json",
"Authorization": f"Api-Key {self.token}"
"Authorization": f"Api-Key {self.token}",
}
messages = []
@@ -108,21 +142,24 @@ class YandexGPT:
for message in input_messages:
messages.append(message)
messages = await self.async_token_check(messages, gpt, max_tokens, stream, temperature)
messages = await self.async_token_check(
messages, gpt, max_tokens, stream, temperature
)
request = {
"modelUri": gpt,
"completionOptions": {
"stream": stream,
"temperature": temperature,
"maxTokens": max_tokens
"maxTokens": max_tokens,
},
"messages": messages
"messages": messages,
}
response = await self.async_request(url=url, headers=headers, prompt=request)
response = await self.async_request(
url=url, headers=headers, prompt=request
) # nosec
return response["result"]["alternatives"][0]["message"]["text"]
async def async_yandexgpt_translate(self, input_language, output_language, text):
input_language = self.languages[input_language]
output_language = self.languages[output_language]
@@ -130,7 +167,9 @@ class YandexGPT:
return await self.async_yandexgpt(
f"Переведи на {output_language} сохранив оригинальный смысл текста. Верни только результат:",
[{"role": "user", "text": text}],
stream=False, temperature=0.6, max_tokens=8000
stream=False,
temperature=0.6,
max_tokens=8000,
)
async def async_yandexgpt_spelling_check(self, input_language, text):
@@ -140,15 +179,19 @@ class YandexGPT:
f"Проверьте орфографию и пунктуацию текста на {input_language}. Верни исправленный текст "
f"без смысловых искажений:",
[{"role": "user", "text": text}],
stream=False, temperature=0.6, max_tokens=8000
stream=False,
temperature=0.6,
max_tokens=8000,
)
async def async_yandexgpt_text_history(self, input_messages, stream=False, temperature=0.6, max_tokens=8000):
async def async_yandexgpt_text_history(
self, input_messages, stream=False, temperature=0.6, max_tokens=8000
):
url = "https://llm.api.cloud.yandex.net/foundationModels/v1/completion"
gpt = f"gpt://{self.catalog_id}/summarization/latest"
headers = {
"Content-Type": "application/json",
"Authorization": f"Api-Key {self.token}"
"Authorization": f"Api-Key {self.token}",
}
messages = []
@@ -161,15 +204,17 @@ class YandexGPT:
"completionOptions": {
"stream": stream,
"temperature": temperature,
"maxTokens": max_tokens
"maxTokens": max_tokens,
},
"messages": messages
"messages": messages,
}
response = requests.post(url, headers=headers, json=prompt).text
response = requests.post(url, headers=headers, json=prompt).text # nosec
return json.loads(response)["result"]["alternatives"][0]["message"]["text"]
async def async_yandex_cloud_text_to_speech(self, text, voice, emotion, speed, format, quality):
async def async_yandex_cloud_text_to_speech(
self, text, voice, emotion, speed, format, quality
):
tts = "tts.api.cloud.yandex.net/speech/v1/tts:synthesize"
# TODO: Сделать функцию TTS
return 0
@@ -187,14 +232,18 @@ class YandexGPT:
if db_api.get_message_ai_model(chat_id, message_id) != None:
messages.append({"role": "assistant", "text": message})
else:
sender_name = db_api.get_user_name(db_api.get_message_sender_id(chat_id, message_id))
sender_name = db_api.get_user_name(
db_api.get_message_sender_id(chat_id, message_id)
)
messages.append({"role": "user", "text": sender_name + ": " + message})
message_id = db_api.get_answer_to_message_id(chat_id, message_id)
if message_id is None:
break
return list(reversed(messages))
async def collecting_messages_for_history(self, start_message_id, end_message_id, chat_id):
async def collecting_messages_for_history(
self, start_message_id, end_message_id, chat_id
):
messages = []
# Собираем цепочку сообщений в формате: [{"role": "user", "text": "<Имя_пользователя>: Привет!"},
# {"role": "assistant", "text": "Привет!"}]
@@ -203,47 +252,61 @@ class YandexGPT:
if db_api.get_message_ai_model(chat_id, start_message_id) != None:
messages.append({"role": "assistant", "text": message})
else:
sender_name = db_api.get_user_name(db_api.get_message_sender_id(chat_id, start_message_id))
sender_name = db_api.get_user_name(
db_api.get_message_sender_id(chat_id, start_message_id)
)
messages.append({"role": "user", "text": sender_name + ": " + message})
start_message_id -= 1
if start_message_id <= end_message_id:
break
return messages.reverse()
async def yandexgpt_request(self, message_id = None, type = "yandexgpt-lite", chat_id = None,
message_id_end = None, input_language = None, output_language = None, text = None):
async def yandexgpt_request(
self,
message_id=None,
type="yandexgpt-lite",
chat_id=None,
message_id_end=None,
input_language=None,
output_language=None,
text=None,
):
if type == "yandexgpt-lite":
messages = await self.collect_messages(message_id, chat_id)
return await self.async_yandexgpt_lite(
system_prompt=get_yandexgpt_prompt(),
input_messages=messages,
stream=False, temperature=0.6, max_tokens=8000
stream=False,
temperature=0.6,
max_tokens=8000,
)
elif type == "yandexgpt":
# print("yandexgpt_request")
await log("yandexgpt_request")
log("yandexgpt_request")
messages = await self.collect_messages(message_id, chat_id)
return await self.async_yandexgpt(
system_prompt=get_yandexgpt_prompt(),
input_messages=messages,
stream=False, temperature=0.6, max_tokens=get_yandexgpt_token_for_request()
stream=False,
temperature=0.6,
max_tokens=get_yandexgpt_token_for_request(),
)
elif type == "yandexgpt-translate":
return await self.async_yandexgpt_translate(
input_language,
output_language,
text=db_api.get_message_text(chat_id, message_id)
text=db_api.get_message_text(chat_id, message_id),
)
elif type == "yandexgpt-spelling-check":
return await self.async_yandexgpt_spelling_check(
input_language,
text=db_api.get_message_text(chat_id, message_id)
input_language, text=db_api.get_message_text(chat_id, message_id)
)
elif type == "yandexgpt-text-history":
messages = await self.collect_messages_for_history(message_id, message_id_end, chat_id)
messages = await self.collect_messages_for_history(
message_id, message_id_end, chat_id
)
return await self.async_yandexgpt_text_history(
messages=messages,
stream=False, temperature=0.6, max_tokens=8000
messages=messages, stream=False, temperature=0.6, max_tokens=8000
)
else:
return "Ошибка: Неизвестный тип запроса | Error: Unknown request type"

View File

@@ -0,0 +1 @@
from .moderation import ban_user, unmute_user

Some files were not shown because too many files have changed in this diff Show More