0
0
mirror of https://gitflic.ru/project/maks1ms/ocab.git synced 2025-01-11 17:28:12 +03:00
This commit is contained in:
Maxim Slipenko 2024-07-13 15:06:29 +03:00
parent 3b849417c3
commit 4a609db595
20 changed files with 413 additions and 48 deletions

View File

@ -21,6 +21,7 @@ packages = [{include = "scripts"}]
[tool.poetry.scripts]
test = 'scripts.test:main'
init = 'scripts.init:main'
module = 'scripts.module:main'
[tool.poetry.dependencies]
python = ">=3.11.6,<3.13"

View File

@ -27,12 +27,12 @@ async def log(message):
Она асинхронная, хотя таковой на самом деле не является.
"""
log_new(message)
def log_new(message):
if isinstance(message, Exception):
error_message = f"Error: {str(message)}\n{traceback.format_exc()}"
logging.error(error_message)
else:
logging.info(message)
def log_new(message):
logging.info(message)

View File

@ -15,34 +15,35 @@ bot_modules = [
UnsafeFSLoader(f"{paths.modules_standard}/config"),
UnsafeFSLoader(f"{paths.modules_standard}/database"),
UnsafeFSLoader(f"{paths.modules_standard}/roles"),
FSLoader(f"{paths.modules_standard}/command_helper"),
FSLoader(f"{paths.modules_standard}/info"),
FSLoader(f"{paths.modules_standard}/create_report_apps"),
FSLoader(f"{paths.modules_standard}/admin"),
]
async def main():
bot = None
database = None
setup_logger()
app = Singleton()
try:
bot = Bot(token=get_telegram_token())
app.bot = Bot(token=get_telegram_token())
app.dp = Dispatcher()
app.modules_manager = ModulesManager()
for module_loader in bot_modules:
app.modules_manager.load(module_loader)
await app.modules_manager.load(module_loader)
await app.dp.start_polling(bot)
await app.modules_manager.late_init()
await app.dp.start_polling(app.bot)
except Exception:
traceback.print_exc()
finally:
if bot is not None:
await bot.session.close()
if database is not None:
database.close()
await app.bot.session.close()
if __name__ == "__main__":

View File

@ -1,13 +1,17 @@
from _ast import AnnAssign
from typing import Any
from aiogram import Bot
from RestrictedPython import (
RestrictingNodeTransformer,
limited_builtins,
safe_builtins,
utility_builtins,
)
from RestrictedPython.Eval import default_guarded_getitem
from RestrictedPython.Eval import default_guarded_getitem, default_guarded_getiter
from RestrictedPython.Guards import full_write_guard, safer_getattr
from ocab_core.logger import log_new
class RestrictedPythonPolicy(RestrictingNodeTransformer):
@ -77,12 +81,24 @@ ALLOWED_IMPORTS = [
"warnings",
]
def safes_getattr(object, name, default=None, getattr=safer_getattr):
if isinstance(object, Bot) and name == "token":
log_new("Bot.token is not allowed")
raise Exception("Bot.token is not allowed")
return getattr(object, name, default)
BUILTINS = safe_builtins.copy()
BUILTINS.update(utility_builtins)
BUILTINS.update(limited_builtins)
BUILTINS["__metaclass__"] = _metaclass
BUILTINS["_getitem_"] = default_guarded_getitem
# BUILTINS["_write_"] = full_write_guard
BUILTINS["_getattr_"] = safes_getattr
BUILTINS["_getiter_"] = default_guarded_getiter
BUILTINS["_write_"] = full_write_guard
BUILTINS["staticmethod"] = staticmethod

View File

@ -49,9 +49,6 @@ class UnsafeFSLoader(AbstractLoader):
# Добавляем директорию модуля в sys.path
sys.path.insert(0, str(path))
print(full_path.parent.absolute())
print(module_name)
# Загружаем спецификацию модуля
spec = importlib.util.spec_from_file_location(module_name, full_path)

View File

@ -23,20 +23,25 @@ def is_version_compatible(version, requirement):
class ModulesManager:
def __init__(self):
self.modules = {}
self.modules = []
def load(self, loader: AbstractLoader):
async def load(self, loader: AbstractLoader):
info = loader.info()
if info.id in self.modules:
# Check if the module is already loaded
if any(mod["info"].id == info.id for mod in self.modules):
return
# Check dependencies
for dependency, version in info.dependencies.items():
if dependency not in self.modules:
loaded_dependency = next(
(mod for mod in self.modules if mod["info"].id == dependency), None
)
if not loaded_dependency:
raise Exception(
f"Module {info.id} depends on {dependency}, but it is not loaded"
)
loaded_dependency_info = self.modules[dependency]["info"]
loaded_dependency_info = loaded_dependency["info"]
if not is_version_compatible(loaded_dependency_info.version, version):
raise Exception(
f"Module {info.id} depends on {dependency}, "
@ -45,22 +50,36 @@ class ModulesManager:
module = loader.load()
self.modules[info.id] = {
self.modules.append(
{
"info": info,
"module": module,
}
)
if hasattr(module, "module_init"):
module.module_init()
await module.module_init()
async def late_init(self):
for m in self.modules:
module = m["module"]
if hasattr(module, "module_late_init"):
await module.module_late_init()
def get_by_id(self, module_id: str):
if module_id not in self.modules:
module = next(
(mod for mod in self.modules if mod["info"].id == module_id), None
)
if not module:
raise Exception(f"Module with id {module_id} not loaded")
return self.modules[module_id]["module"]
return module["module"]
def get_info_by_id(self, module_id: str):
if module_id not in self.modules:
module = next(
(mod for mod in self.modules if mod["info"].id == module_id), None
)
if not module:
raise Exception(f"Module with id {module_id} not loaded")
return self.modules[module_id]["info"]
return module["info"]

View File

@ -1,3 +1,10 @@
from ocab_core.logger import log # noqa
from .public_api import Storage, get_module, register_router
from .public_api import (
Storage,
get_fsm_context,
get_module,
register_outer_message_middleware,
register_router,
set_my_commands,
)

View File

@ -1,7 +1,9 @@
import types
from typing import Any, Tuple, Union
from aiogram import Router
from aiogram import BaseMiddleware, Router
from aiogram.fsm.context import FSMContext
from aiogram.fsm.storage.base import StorageKey
from ocab_core.singleton import Singleton
@ -11,6 +13,30 @@ def register_router(router: Router):
app.dp.include_router(router)
def register_outer_message_middleware(middleware: BaseMiddleware):
app = Singleton()
app.dp.message.outer_middleware.register(middleware)
async def set_my_commands(commands):
app = Singleton()
await app.bot.set_my_commands(commands)
async def get_fsm_context(chat_id: int, user_id: int) -> FSMContext:
dp = Singleton().dp
bot = Singleton().bot
return FSMContext(
storage=dp.storage,
key=StorageKey(
chat_id=chat_id,
user_id=user_id,
bot_id=bot.id,
),
)
def get_module(
module_id: str, paths=None
) -> Union[types.ModuleType, Union[Any, None], Tuple[Union[Any, None], ...]]:

View File

@ -1,4 +1,4 @@
from aiogram import Dispatcher
from aiogram import Bot, Dispatcher
from ocab_core.modules_system import ModulesManager
@ -14,6 +14,7 @@ class SingletonMeta(type):
class Singleton(metaclass=SingletonMeta):
bot: Bot
dp: Dispatcher = None
modules_manager: ModulesManager = None
storage = dict()

View File

@ -0,0 +1 @@
from .main import module_init, module_late_init, register_command

View File

@ -0,0 +1,12 @@
{
"id": "standard.command_helper",
"name": "Command helper",
"description": "Модуль для отображения команд при вводе '/'",
"author": "OCAB Team",
"version": "1.0.0",
"privileged": false,
"dependencies": {
"standard.roles": "^1.0.0",
"standard.database": "^1.0.0"
}
}

View File

@ -0,0 +1,98 @@
from typing import Any, Awaitable, Callable, Dict
from aiogram import BaseMiddleware
from aiogram.types import BotCommand, Message, TelegramObject
from ocab_core.modules_system.public_api import (
get_module,
log,
register_outer_message_middleware,
set_my_commands,
)
commands = dict()
db_api = get_module(
"standard.database",
"db_api",
)
Roles = get_module("standard.roles", "Roles")
def register_command(command, description, role="USER"):
if role not in commands:
commands[role] = dict()
commands[role][command] = {
"description": description,
}
class OuterMiddleware(BaseMiddleware):
async def __call__(
self,
handler: Callable[[TelegramObject, Dict[str, Any]], Awaitable[Any]],
event: TelegramObject,
data: Dict[str, Any],
):
if not isinstance(event, Message):
return await handler(event, data)
user = db_api.get_user(event.from_user.id)
if user is None:
return
roles = Roles()
role_name = await roles.get_role_name(role_id=user.user_role)
if role_name not in commands:
return await handler(event, data)
# bot_commands = []
# for role_command in commands[role_name]:
# bot_commands.append(
# BotCommand(
# command=role_command,
# description=commands[role_name][role_command]["description"],
# )
# )
# await event.bot.set_my_commands(
# bot_commands,
# BotCommandScopeChatMember(
# chat_id=event.chat.id,
# user_id=event.from_user.id,
# ),
# )
return await handler(event, data)
async def module_init():
register_outer_message_middleware(OuterMiddleware())
async def set_user_commands():
bot_commands = []
if "USER" in commands:
user_commands = commands["USER"]
for command in user_commands:
bot_commands.append(
BotCommand(
command=command,
description=user_commands[command]["description"],
)
)
await set_my_commands(
bot_commands,
)
async def module_late_init():
await log("module_late_init")
await set_user_commands()

View File

@ -0,0 +1 @@
from .main import module_init

View File

@ -0,0 +1,64 @@
from aiogram import Bot, Router
from aiogram.fsm.context import FSMContext
from aiogram.fsm.state import State, StatesGroup
from aiogram.types import BufferedInputFile, Message
from ocab_core.modules_system.public_api import get_fsm_context
router = Router()
class ReportState(StatesGroup):
input_kernel_info = State()
input_app_name = State()
input_problem_step_by_step = State()
async def start_report(chat_id: int, bot: Bot):
await bot.send_message(
chat_id=chat_id,
text="Какая версия ядра у тебя на "
"текущий момент? Можно узнать "
"командой `uname -rm`",
parse_mode="Markdown",
)
state = await get_fsm_context(chat_id, chat_id)
await state.set_state(ReportState.input_kernel_info)
@router.message(ReportState.input_kernel_info)
async def kernel_version_entered(message: Message, state: FSMContext):
await state.update_data(kernel=message.text)
await message.answer(text="В каком приложении " "возникла проблема?")
await state.set_state(ReportState.input_app_name)
@router.message(ReportState.input_app_name)
async def app_name_entered(message: Message, state: FSMContext):
await state.update_data(app_name=message.text)
await message.answer(
text="Опиши проблему пошагово, " "что ты делал, что происходило, что не так"
)
await state.set_state(ReportState.input_problem_step_by_step)
@router.message(ReportState.input_problem_step_by_step)
async def problem_step_by_step_entered(message: Message, state: FSMContext):
await state.update_data(problem_step_by_step=message.text)
await message.answer(text="Вот твой отчет сообщением, " "а также файлом:")
data = await state.get_data()
report = f"""Стенд с ошибкой:
# uname -rm
{data['kernel']}
Шаги, приводящие к ошибке:
{data['problem_step_by_step']}
"""
await message.answer(text=report)
await message.answer_document(
document=BufferedInputFile(report.encode(), "report.txt")
)
await state.clear()

View File

@ -0,0 +1,11 @@
{
"id": "standard.create_report_apps",
"name": "Create Report Apps",
"description": "Модуль для создания отчетов о ошибках в приложениях",
"author": "OCAB Team",
"version": "1.0.0",
"privileged": false,
"dependencies": {
"standard.command_helper": "^1.0.0"
}
}

View File

@ -0,0 +1,101 @@
from typing import Union
from aiogram import Bot, F, Router
from aiogram.exceptions import TelegramForbiddenError
from aiogram.filters import BaseFilter, Command, CommandStart
from aiogram.types import (
CallbackQuery,
InlineKeyboardButton,
InlineKeyboardMarkup,
Message,
)
from ocab_core.modules_system.public_api import get_module, register_router
from .create_report import router as create_report_router
from .create_report import start_report
register_command = get_module("standard.command_helper", "register_command")
router = Router()
class ChatTypeFilter(BaseFilter):
def __init__(self, chat_type: Union[str, list]):
self.chat_type = chat_type
async def __call__(self, message: Message) -> bool:
if isinstance(self.chat_type, str):
return message.chat.type == self.chat_type
return message.chat.type in self.chat_type
@router.message(
ChatTypeFilter(chat_type=["group", "supergroup"]), Command("create_report_apps")
)
async def create_report_apps_command_group(message: Message):
keyboard = InlineKeyboardMarkup(
inline_keyboard=[
[
InlineKeyboardButton(text="Да", callback_data="create_report"),
InlineKeyboardButton(text="Нет", callback_data="cancel_report"),
]
]
)
await message.answer(
"Я могу отправить тебе пару вопросов "
"для помощи в составлении репорта личными "
"сообщениями.",
reply_markup=keyboard,
)
@router.message(
ChatTypeFilter(chat_type=["private"]),
CommandStart(deep_link=True, magic=F.args == "create_report_apps"),
)
@router.message(ChatTypeFilter(chat_type=["private"]), Command("create_report_apps"))
async def create_report_apps_command(message: Message, bot: Bot):
await start_report(message.from_user.id, bot)
@router.callback_query(F.data == "cancel_report")
async def cancel_report_callback(callback_query: CallbackQuery):
await callback_query.message.delete()
@router.callback_query(F.data == "create_report")
async def create_report_callback(callback_query: CallbackQuery, bot: Bot):
user_id = callback_query.from_user.id
async def on_chat_unavailable():
await callback_query.message.edit_text(
"Я в личных сообщениях задам тебе вопросы "
"для помощи в составлении репорта. "
'Но перед этим ты должен нажать кнопку "Запустить"'
)
info = await bot.get_me()
await callback_query.answer(
url=f"https://t.me/{info.username}?start=create_report_apps"
)
try:
chat_member = await bot.get_chat_member(chat_id=user_id, user_id=user_id)
if chat_member.status != "left":
await start_report(user_id, bot)
await callback_query.message.edit_text(
"Я в личных сообщениях задам тебе "
"вопросы для помощи в составлении "
"репорта."
)
else:
await on_chat_unavailable()
except TelegramForbiddenError:
await on_chat_unavailable()
async def module_init():
router.include_router(create_report_router)
register_router(router)
register_command("create_report_apps", "Написать репорт о приложении")

View File

@ -1,5 +1,5 @@
from . import db_api, models
def module_init():
async def module_init():
db_api.connect_database()

View File

@ -1,14 +1,2 @@
from aiogram import F, Router
from ocab_core.modules_system.public_api import register_router
from .handlers import get_chat_info, get_user_info
def module_init():
router = Router()
router.message.register(get_user_info, F.text.startswith("/info"))
router.message.register(get_chat_info, F.text.startswith("/chatinfo"))
register_router(router)
from .main import module_init

View File

@ -7,6 +7,7 @@
"privileged": false,
"dependencies": {
"standard.roles": "^1.0.0",
"standard.database": "^1.0.0"
"standard.database": "^1.0.0",
"standard.command_helper": "^1.0.0"
}
}

View File

@ -0,0 +1,20 @@
from aiogram import Router
from aiogram.filters import Command
from ocab_core.modules_system.public_api import get_module, register_router
from .handlers import get_chat_info, get_user_info
register_command = get_module("standard.command_helper", "register_command")
async def module_init():
router = Router()
router.message.register(get_user_info, Command("info"))
router.message.register(get_chat_info, Command("chatinfo"))
register_router(router)
register_command("info", "Информация о пользователе")
register_command("chatinfo", "Информация о чате")