mirror of
https://gitflic.ru/project/alt-gnome/karkas.git
synced 2024-12-23 16:23:02 +03:00
wip
This commit is contained in:
parent
f5f662d6de
commit
9fb56ce1e0
@ -9,9 +9,11 @@ async def main():
|
||||
await ocab.init_app(
|
||||
[
|
||||
block_loader("standard", "config", safe=False),
|
||||
block_loader("experimental", "database", safe=False),
|
||||
block_loader("experimental", "roles", safe=False),
|
||||
block_loader("experimental", "message_db_logger", safe=False),
|
||||
block_loader("standard", "database", safe=False),
|
||||
block_loader("standard", "command_helper"),
|
||||
block_loader("standard", "roles", safe=False),
|
||||
# block_loader("experimental", "message_db_logger", safe=False),
|
||||
block_loader("standard", "info"),
|
||||
block_loader("standard", "help"),
|
||||
# block_loader("standard", "config", safe=False),
|
||||
# block_loader("standard", "database", safe=False),
|
||||
|
@ -1 +0,0 @@
|
||||
from .main import module_init, module_late_init, register_app_config
|
@ -1,9 +0,0 @@
|
||||
{
|
||||
"id": "experimental.database",
|
||||
"name": "Database",
|
||||
"description": "Модуль для работы с БД",
|
||||
"author": "Karkas Team",
|
||||
"version": "1.0.0",
|
||||
"privileged": true,
|
||||
"dependencies": {}
|
||||
}
|
@ -1,48 +0,0 @@
|
||||
## Модуль DataBase
|
||||
|
||||
Модуль DataBase предназначен для ведения и работы с базами данных Karkas.
|
||||
|
||||
Модуль содержит в себе следующие таблицы:
|
||||
|
||||
* `Chats` - таблица для хранения информации о чатах.
|
||||
* `Users` - таблица для хранения информации о пользователях.
|
||||
* `Messages` - таблица для хранения информации о сообщениях.
|
||||
* `ChatStats` - таблица для хранения статистики чатов по дням.
|
||||
* `UserStats` - таблица для хранения статистики пользователей по дням.
|
||||
|
||||
Cтруктура таблицы `Chats`:
|
||||
* `chat_id` - идентификатор чата.
|
||||
* `chat_name` - название чата.
|
||||
* `chat_type` - тип чата. (0 - Чат администраторов, 1 - Пользовательский чат, 3 - Чат разрешённых личных запросов к боту
|
||||
10 - Не инициализированный чат)
|
||||
* `chat_stats` - количество всех отправленных сообщений в чате.
|
||||
|
||||
Cтруктура таблицы `Users`:
|
||||
* `user_id` - идентификатор пользователя telegram.
|
||||
* `user_tag` - тег пользователя telegram.
|
||||
* `user_name` - имя пользователя telegram.
|
||||
* `user_role` - роль пользователя в чате. (0 - Администратор, 1 - Модератор, 2 - Пользователь)
|
||||
* `user_stats` - количество всех отправленных сообщений пользователем.
|
||||
* `user_rep` - репутация пользователя.
|
||||
|
||||
Cтруктура таблицы `Messages`:
|
||||
* `message_chat_id` - идентификатор чата в котором отправлено сообщение.
|
||||
* `message_id` - идентификатор сообщения.
|
||||
* `messag_sender_id` - идентификатор пользователя отправившего сообщение. Если сообщение отправил бот, то
|
||||
`messag_sender_id` = 0.
|
||||
* `answer_to_message_id` - идентификатор сообщения на которое дан ответ. Если ответа нет или ответ на служебное
|
||||
сообщение о создании топика в чатах с форумным типом, то `answer_to_message_id` = 0.
|
||||
* `message_ai_model` - идентификатор модели нейросети, которая использовалась для генерации ответа. Если ответ'
|
||||
сгенерирован не был, то `message_ai_model` = null.
|
||||
* `message_text` - текст сообщения.
|
||||
|
||||
Cтруктура таблицы `ChatStats`:
|
||||
* `chat_id` - идентификатор чата для которого собрана статистика.
|
||||
* `date` - дата на которую собрана статистика.
|
||||
* `messages_count` - количество сообщений отправленных в чат за день.
|
||||
|
||||
Cтруктура таблицы `UserStats`:
|
||||
* `chat_id` - идентификатор чата для которого собрана статистика.
|
||||
* `user_id` - идентификатор пользователя для которого собрана статистика.
|
||||
* `date` - дата на которую собрана статистика.
|
||||
* `messages_count` - количество сообщений отправленных пользователем в чат за день.
|
@ -1,5 +1 @@
|
||||
from . import db_api, models, repositories
|
||||
|
||||
|
||||
async def module_init():
|
||||
db_api.connect_database()
|
||||
from .main import module_init, module_late_init, register_app_config
|
||||
|
@ -1,301 +0,0 @@
|
||||
import peewee as pw
|
||||
from aiogram.types import Message
|
||||
|
||||
from .exceptions import NotExpectedModuleName
|
||||
from .models.chat_stats import ChatStats
|
||||
from .models.chats import Chats
|
||||
from .models.db import database_proxy
|
||||
from .models.fsm_data import FSMData
|
||||
from .models.messages import Messages
|
||||
from .models.user_stats import UserStats
|
||||
from .models.users import Users
|
||||
|
||||
|
||||
def connect_database(is_test: bool = False, module: str | None = None):
|
||||
if module:
|
||||
raise NotExpectedModuleName()
|
||||
db_path = "database"
|
||||
|
||||
database = pw.SqliteDatabase(f"{db_path}/Karkas.db")
|
||||
database_proxy.initialize(database)
|
||||
database.connect()
|
||||
create_tables(database)
|
||||
|
||||
return database, f"{db_path}/Karkas.db"
|
||||
|
||||
|
||||
def create_tables(db: pw.SqliteDatabase):
|
||||
"""Создание таблиц"""
|
||||
for table in Chats, Messages, Users, UserStats, ChatStats, FSMData:
|
||||
if not table.table_exists():
|
||||
db.create_tables([table])
|
||||
|
||||
|
||||
def add_chat(chat_id, chat_name, chat_type=10, chat_stats=0):
|
||||
chat, created = Chats.get_or_create(
|
||||
id=chat_id,
|
||||
defaults={
|
||||
"chat_name": chat_name,
|
||||
"chat_type": chat_type,
|
||||
"chat_all_stat": chat_stats,
|
||||
},
|
||||
)
|
||||
if not created:
|
||||
# Обновить существующий чат, если он уже существует
|
||||
chat.chat_name = chat_name
|
||||
chat.chat_type = chat_type
|
||||
chat.chat_stats = chat_stats
|
||||
chat.save()
|
||||
|
||||
|
||||
def add_user(
|
||||
user_id,
|
||||
user_first_name,
|
||||
user_last_name=None,
|
||||
user_tag=None,
|
||||
user_role=0,
|
||||
user_stats=0,
|
||||
user_rep=0,
|
||||
):
|
||||
if user_last_name is None:
|
||||
user_name = user_first_name
|
||||
else:
|
||||
user_name = user_first_name + " " + user_last_name
|
||||
|
||||
user, created = Users.get_or_create(
|
||||
id=user_id,
|
||||
defaults={
|
||||
"user_tag": user_tag,
|
||||
"user_name": user_name,
|
||||
"user_role": user_role,
|
||||
"user_stats": user_stats,
|
||||
"user_rep": user_rep,
|
||||
},
|
||||
)
|
||||
if not created:
|
||||
# Обновить существующего пользователя, если он уже существует
|
||||
user.user_tag = user_tag
|
||||
user.user_name = user_name
|
||||
user.user_role = user_role
|
||||
user.user_stats = user_stats
|
||||
user.user_rep = user_rep
|
||||
user.save()
|
||||
|
||||
|
||||
def add_message(message: Message, message_ai_model=None):
|
||||
if message.reply_to_message:
|
||||
answer_to_message_id = message.reply_to_message.message_id
|
||||
else:
|
||||
answer_to_message_id = None
|
||||
Messages.create(
|
||||
message_chat_id=message.chat.id,
|
||||
message_id=message.message_id,
|
||||
message_sender_id=message.from_user.id,
|
||||
answer_to_message_id=answer_to_message_id,
|
||||
message_ai_model=message_ai_model,
|
||||
message_text=message.text,
|
||||
)
|
||||
|
||||
|
||||
def add_chat_stats(chat_id, date, messages_count):
|
||||
ChatStats.create(chat_id=chat_id, date=date, messages_count=messages_count)
|
||||
|
||||
|
||||
def add_user_stats(chat_id, user_id, date, messages_count):
|
||||
UserStats.create(
|
||||
chat_id=chat_id, user_id=user_id, date=date, messages_count=messages_count
|
||||
)
|
||||
|
||||
|
||||
# Работа с таблицей чатов
|
||||
|
||||
|
||||
def get_chat(chat_id):
|
||||
return Chats.get_or_none(Chats.id == chat_id)
|
||||
|
||||
|
||||
def change_chat_name(chat_id, new_chat_name):
|
||||
query = Chats.update(chat_name=new_chat_name).where(Chats.id == chat_id)
|
||||
query.execute()
|
||||
|
||||
|
||||
def change_chat_type(chat_id, new_chat_type):
|
||||
query = Chats.update(chat_type=new_chat_type).where(Chats.id == chat_id)
|
||||
query.execute()
|
||||
|
||||
|
||||
def get_chat_all_stat(chat_id):
|
||||
chat = Chats.get_or_none(Chats.id == chat_id)
|
||||
return chat.chat_all_stat if chat else None
|
||||
|
||||
|
||||
# Работа с таблицей пользователей
|
||||
|
||||
|
||||
def get_user(user_id) -> Users | None:
|
||||
return Users.get_or_none(Users.id == user_id)
|
||||
|
||||
|
||||
def get_user_tag(user_id):
|
||||
user = Users.get_or_none(Users.id == user_id)
|
||||
return user.user_tag if user else None
|
||||
|
||||
|
||||
def get_user_id(user_tag):
|
||||
user = Users.get_or_none(Users.user_tag == user_tag)
|
||||
return user.id if user else None
|
||||
|
||||
|
||||
def get_user_name(user_id):
|
||||
user = Users.get_or_none(Users.id == user_id)
|
||||
return user.user_name if user else None
|
||||
|
||||
|
||||
def get_user_role(user_id: str):
|
||||
user = Users.get_or_none(Users.id == user_id)
|
||||
return user.user_role if user else None
|
||||
|
||||
|
||||
def get_user_all_stats(user_id):
|
||||
user = Users.get_or_none(Users.id == user_id)
|
||||
return user.user_stats if user else None
|
||||
|
||||
|
||||
def get_user_rep(user_id):
|
||||
user = Users.get_or_none(Users.id == user_id)
|
||||
return user.user_rep if user else None
|
||||
|
||||
|
||||
def change_user_name(user_id, user_first_name, user_last_name=None):
|
||||
if user_last_name is None:
|
||||
new_user_name = user_first_name
|
||||
else:
|
||||
new_user_name = user_first_name + " " + user_last_name
|
||||
query = Users.update(user_name=new_user_name).where(Users.id == user_id)
|
||||
query.execute()
|
||||
|
||||
|
||||
def change_user_tag(user_id, new_user_tag):
|
||||
query = Users.update(user_tag=new_user_tag).where(Users.id == user_id)
|
||||
query.execute()
|
||||
|
||||
|
||||
def change_user_role(user_id, new_user_role):
|
||||
query = Users.update(user_role=new_user_role).where(Users.id == user_id)
|
||||
query.execute()
|
||||
|
||||
|
||||
# Работа с таблицей сообщений
|
||||
|
||||
|
||||
def get_message(message_chat_id, message_id):
|
||||
return Messages.get_or_none(
|
||||
Messages.message_chat_id == message_chat_id,
|
||||
Messages.message_id == message_id,
|
||||
)
|
||||
|
||||
|
||||
def get_message_sender_id(message_chat_id, message_id):
|
||||
message = Messages.get_or_none(
|
||||
Messages.message_chat_id == message_chat_id, Messages.message_id == message_id
|
||||
)
|
||||
return message.message_sender_id if message else None
|
||||
|
||||
|
||||
def get_message_text(message_chat_id, message_id):
|
||||
message = Messages.get_or_none(
|
||||
Messages.message_chat_id == message_chat_id, Messages.message_id == message_id
|
||||
)
|
||||
return message.message_text if message else None
|
||||
|
||||
|
||||
def get_message_ai_model(message_chat_id, message_id):
|
||||
message = Messages.get_or_none(
|
||||
Messages.message_chat_id == message_chat_id, Messages.message_id == message_id
|
||||
)
|
||||
return message.message_ai_model if message else None
|
||||
|
||||
|
||||
def get_answer_to_message_id(message_chat_id, message_id):
|
||||
message = Messages.get_or_none(
|
||||
Messages.message_chat_id == message_chat_id, Messages.message_id == message_id
|
||||
)
|
||||
return message.answer_to_message_id if message else None
|
||||
|
||||
|
||||
# Работа с таблицей статистики чатов
|
||||
|
||||
|
||||
def get_chat_stats(chat_id):
|
||||
chat_stats = {}
|
||||
for chat_stat in ChatStats.select().where(ChatStats.chat_id == chat_id):
|
||||
chat_stats[chat_stat.date] = chat_stat.messages_count
|
||||
return chat_stats
|
||||
|
||||
|
||||
# Работа с таблицей статистики пользователей
|
||||
|
||||
|
||||
def get_user_stats(user_id):
|
||||
user_stats = {}
|
||||
for user_stat in UserStats.select().where(UserStats.user_id == user_id):
|
||||
user_stats[user_stat.date] = user_stat.messages_count
|
||||
return user_stats
|
||||
|
||||
|
||||
# Функции обновления
|
||||
|
||||
|
||||
def update_chat_all_stat(chat_id):
|
||||
query = Chats.update(chat_all_stat=Chats.chat_all_stat + 1).where(
|
||||
Chats.id == chat_id
|
||||
)
|
||||
query.execute()
|
||||
|
||||
|
||||
def update_chat_stats(chat_id, date):
|
||||
chat_stats = ChatStats.get_or_none(
|
||||
ChatStats.chat_id == chat_id, ChatStats.date == date
|
||||
)
|
||||
if chat_stats:
|
||||
query = ChatStats.update(messages_count=ChatStats.messages_count + 1).where(
|
||||
ChatStats.chat_id == chat_id, ChatStats.date == date
|
||||
)
|
||||
query.execute()
|
||||
else:
|
||||
ChatStats.create(chat_id=chat_id, date=date, messages_count=1)
|
||||
|
||||
|
||||
def update_user_all_stat(user_id):
|
||||
user = Users.get_or_none(Users.id == user_id)
|
||||
if user:
|
||||
query = Users.update(user_stats=Users.user_stats + 1).where(Users.id == user_id)
|
||||
query.execute()
|
||||
else:
|
||||
Users.create(id=user_id, user_stats=1)
|
||||
|
||||
|
||||
def update_user_rep(user_id):
|
||||
user = Users.get_or_none(Users.id == user_id)
|
||||
if user:
|
||||
query = Users.update(user_rep=Users.user_rep + 1).where(Users.id == user_id)
|
||||
query.execute()
|
||||
else:
|
||||
Users.create(id=user_id, user_rep=1)
|
||||
|
||||
|
||||
def update_user_stats(chat_id, user_id, date):
|
||||
user_stats = UserStats.get_or_none(
|
||||
UserStats.chat_id == chat_id,
|
||||
UserStats.user_id == user_id,
|
||||
UserStats.date == date,
|
||||
)
|
||||
if user_stats:
|
||||
query = UserStats.update(messages_count=UserStats.messages_count + 1).where(
|
||||
UserStats.chat_id == chat_id,
|
||||
UserStats.user_id == user_id,
|
||||
UserStats.date == date,
|
||||
)
|
||||
query.execute()
|
||||
else:
|
||||
UserStats.create(chat_id=chat_id, user_id=user_id, date=date, messages_count=1)
|
@ -1,12 +0,0 @@
|
||||
class MissingModuleName(BaseException):
|
||||
def __init__(self):
|
||||
self.message = "Пропущено название директории модуля"
|
||||
|
||||
super().__init__(self.message)
|
||||
|
||||
|
||||
class NotExpectedModuleName(BaseException):
|
||||
def __init__(self):
|
||||
self.message = "Не ожидалось название директории модуля"
|
||||
|
||||
super().__init__(self.message)
|
@ -1,9 +1,10 @@
|
||||
from karkas_piccolo.karkas import migrate_forward # isort:skip
|
||||
from karkas_piccolo.conf.apps import AppConfig, AppRegistry # isort:skip
|
||||
|
||||
from piccolo.conf.apps import PiccoloConfModule
|
||||
from piccolo.engine.sqlite import SQLiteEngine
|
||||
|
||||
from karkas_core.singleton import Singleton
|
||||
from karkas_piccolo.conf.apps import AppConfig, AppRegistry
|
||||
from karkas_piccolo.karkas import migrate_forward
|
||||
|
||||
# from karkas_piccolo.conf.apps import A
|
||||
|
||||
@ -28,7 +29,7 @@ async def module_late_init():
|
||||
|
||||
APP_REGISTRY = AppRegistry(apps_configs=APPS_CONFIGS)
|
||||
|
||||
module = PiccoloConfModule(name="experimental.database")
|
||||
module = PiccoloConfModule(name="standard.database")
|
||||
module.DB = DB
|
||||
module.APP_REGISTRY = APP_REGISTRY
|
||||
|
@ -1 +0,0 @@
|
||||
from .fsm_data import FSMData
|
@ -1,12 +0,0 @@
|
||||
import peewee as pw
|
||||
|
||||
from .db import database_proxy
|
||||
|
||||
|
||||
class ChatStats(pw.Model):
|
||||
class Meta:
|
||||
database = database_proxy
|
||||
|
||||
chat_id = pw.IntegerField(null=False)
|
||||
date = pw.DateField(null=False)
|
||||
messages_count = pw.IntegerField(null=False, default=0)
|
@ -1,12 +0,0 @@
|
||||
import peewee as pw
|
||||
|
||||
from .db import database_proxy
|
||||
|
||||
|
||||
class Chats(pw.Model):
|
||||
class Meta:
|
||||
database = database_proxy
|
||||
|
||||
chat_name = pw.CharField(null=False)
|
||||
chat_type = pw.IntegerField(null=False, default=10)
|
||||
chat_all_stat = pw.IntegerField(null=False)
|
@ -1,3 +0,0 @@
|
||||
from peewee import DatabaseProxy
|
||||
|
||||
database_proxy = DatabaseProxy()
|
@ -1,12 +0,0 @@
|
||||
import peewee as pw
|
||||
|
||||
from .db import database_proxy
|
||||
|
||||
|
||||
class FSMData(pw.Model):
|
||||
class Meta:
|
||||
database = database_proxy
|
||||
|
||||
key = pw.CharField(primary_key=True)
|
||||
state = pw.CharField(null=True)
|
||||
data = pw.CharField(null=True)
|
@ -1,15 +0,0 @@
|
||||
import peewee as pw
|
||||
|
||||
from .db import database_proxy
|
||||
|
||||
|
||||
class Messages(pw.Model):
|
||||
class Meta:
|
||||
database = database_proxy
|
||||
|
||||
message_chat_id = pw.IntegerField(null=False)
|
||||
message_id = pw.IntegerField(null=False)
|
||||
message_sender_id = pw.IntegerField(null=False)
|
||||
answer_to_message_id = pw.IntegerField(null=True)
|
||||
message_ai_model = pw.TextField(null=True)
|
||||
message_text = pw.TextField(null=False)
|
@ -1,13 +0,0 @@
|
||||
import peewee as pw
|
||||
|
||||
from .db import database_proxy
|
||||
|
||||
|
||||
class UserStats(pw.Model):
|
||||
class Meta:
|
||||
database = database_proxy
|
||||
|
||||
chat_id = pw.IntegerField(null=False)
|
||||
user_id = pw.IntegerField(null=False)
|
||||
date = pw.DateField(null=False)
|
||||
messages_count = pw.IntegerField(null=False, default=0)
|
@ -1,14 +0,0 @@
|
||||
import peewee as pw
|
||||
|
||||
from .db import database_proxy
|
||||
|
||||
|
||||
class Users(pw.Model):
|
||||
class Meta:
|
||||
database = database_proxy
|
||||
|
||||
user_tag = pw.CharField(null=True)
|
||||
user_name = pw.CharField(null=False) # до 255 символов
|
||||
user_role = pw.IntegerField(null=True, default=3)
|
||||
user_stats = pw.IntegerField(null=True, default=0)
|
||||
user_rep = pw.IntegerField(null=True, default=0)
|
@ -1 +0,0 @@
|
||||
from .fsm_data import FSMDataRepository
|
@ -1,32 +0,0 @@
|
||||
from peewee import fn
|
||||
|
||||
from ..models import FSMData
|
||||
|
||||
|
||||
class FSMDataRepository:
|
||||
def get(self, key: str):
|
||||
return FSMData.get_or_none(key=key)
|
||||
|
||||
def set_state(self, key: str, state: str):
|
||||
FSMData.insert(
|
||||
key=key,
|
||||
state=state,
|
||||
data=fn.COALESCE(
|
||||
FSMData.select(FSMData.data).where(FSMData.key == key), None
|
||||
),
|
||||
).on_conflict(
|
||||
conflict_target=[FSMData.key],
|
||||
update={FSMData.state: state},
|
||||
).execute()
|
||||
|
||||
def set_data(self, key: str, data: str):
|
||||
FSMData.insert(
|
||||
key=key,
|
||||
data=data,
|
||||
state=fn.COALESCE(
|
||||
FSMData.select(FSMData.state).where(FSMData.key == key), None
|
||||
),
|
||||
).on_conflict(
|
||||
conflict_target=[FSMData.key],
|
||||
update={FSMData.data: data},
|
||||
).execute()
|
@ -1 +0,0 @@
|
||||
Эта директория для тестовой БД
|
@ -1,142 +0,0 @@
|
||||
# flake8: noqa
|
||||
|
||||
import os
|
||||
import unittest
|
||||
|
||||
from ...exceptions.module_exceptions import MissingModuleName, NotExpectedModuleName
|
||||
from ..db_api import *
|
||||
|
||||
|
||||
class TestDatabaseAPI(unittest.TestCase):
|
||||
database = None
|
||||
path = None
|
||||
|
||||
@classmethod
|
||||
def setUpClass(cls):
|
||||
cls.database, cls.path = connect_database(is_test=True, module="database")
|
||||
create_tables(cls.database)
|
||||
|
||||
def test_fail_connect(cls):
|
||||
with cls.assertRaises(MissingModuleName):
|
||||
cls.database, cls.path = connect_database(is_test=True)
|
||||
|
||||
with cls.assertRaises(NotExpectedModuleName):
|
||||
cls.database, cls.path = connect_database(module="database")
|
||||
|
||||
def test_add_and_get_chat(self):
|
||||
add_chat(chat_id=21, chat_role=0, chat_stats=0, chat_federation=0)
|
||||
add_chat(chat_id=22, chat_role=1, chat_stats=100, chat_federation=1)
|
||||
|
||||
chat1 = get_chat(21)
|
||||
self.assertIsNotNone(chat1)
|
||||
self.assertEqual(chat1.id, 21)
|
||||
self.assertEqual(chat1.chat_role, 0)
|
||||
|
||||
chat2 = get_chat(22)
|
||||
self.assertIsNotNone(chat2)
|
||||
self.assertEqual(chat2.id, 22)
|
||||
self.assertEqual(chat2.chat_role, 1)
|
||||
|
||||
def test_add_and_get_message(self):
|
||||
add_message(
|
||||
message_id=1, message_text="Test Message 1", message_sender=1, answer_id=2
|
||||
)
|
||||
add_message(
|
||||
message_id=2, message_text="Test Message 2", message_sender=2, answer_id=1
|
||||
)
|
||||
|
||||
message1 = get_message(1)
|
||||
self.assertIsNotNone(message1)
|
||||
self.assertEqual(message1.id, 1)
|
||||
self.assertEqual(message1.message_text, "Test Message 1")
|
||||
|
||||
message2 = get_message(2)
|
||||
self.assertIsNotNone(message2)
|
||||
self.assertEqual(message2.id, 2)
|
||||
self.assertEqual(message2.message_text, "Test Message 2")
|
||||
|
||||
def test_add_and_get_user(self):
|
||||
add_user(
|
||||
user_id=100,
|
||||
user_name="TestUser1",
|
||||
user_tag="TestTag1",
|
||||
user_role=0,
|
||||
user_stats=10,
|
||||
user_rep=5,
|
||||
)
|
||||
add_user(
|
||||
user_id=101,
|
||||
user_name="TestUser2",
|
||||
user_tag="TestTag2",
|
||||
user_role=1,
|
||||
user_stats=20,
|
||||
user_rep=10,
|
||||
)
|
||||
|
||||
user1 = get_user(100)
|
||||
self.assertIsNotNone(user1)
|
||||
self.assertEqual(user1.id, 100)
|
||||
self.assertEqual(user1.user_name, "TestUser1")
|
||||
|
||||
user2 = get_user(101)
|
||||
self.assertIsNotNone(user2)
|
||||
self.assertEqual(user2.id, 101)
|
||||
self.assertEqual(user2.user_name, "TestUser2")
|
||||
|
||||
def test_get_user_role(self):
|
||||
add_user(
|
||||
user_id=102,
|
||||
user_name="TestUser3",
|
||||
user_tag="TestTag3",
|
||||
user_role=0,
|
||||
user_stats=30,
|
||||
user_rep=15,
|
||||
)
|
||||
add_user(
|
||||
user_id=103,
|
||||
user_name="TestUser4",
|
||||
user_tag="TestTag4",
|
||||
user_role=1,
|
||||
user_stats=40,
|
||||
user_rep=20,
|
||||
)
|
||||
|
||||
user_role1 = get_user_role(102)
|
||||
self.assertEqual(user_role1, 0)
|
||||
|
||||
user_role2 = get_user_role(103)
|
||||
self.assertEqual(user_role2, 1)
|
||||
|
||||
def test_change_user_name(self):
|
||||
add_user(
|
||||
user_id=104,
|
||||
user_name="OldName1",
|
||||
user_tag="TestTag5",
|
||||
user_role=0,
|
||||
user_stats=50,
|
||||
user_rep=25,
|
||||
)
|
||||
change_user_name(104, "NewName1")
|
||||
updated_user1 = get_user(104)
|
||||
self.assertEqual(updated_user1.user_name, "NewName1")
|
||||
|
||||
add_user(
|
||||
user_id=105,
|
||||
user_name="OldName2",
|
||||
user_tag="TestTag6",
|
||||
user_role=1,
|
||||
user_stats=60,
|
||||
user_rep=30,
|
||||
)
|
||||
change_user_name(105, "NewName2")
|
||||
updated_user2 = get_user(105)
|
||||
self.assertEqual(updated_user2.user_name, "NewName2")
|
||||
|
||||
@classmethod
|
||||
def tearDownClass(cls):
|
||||
cls.database.close()
|
||||
os.system(f"rm {cls.path}") # nosec
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
@ -1,2 +1,2 @@
|
||||
from .handlers import get_chat_info, get_user_info
|
||||
from .handlers import get_user_info
|
||||
from .main import module_init
|
||||
|
@ -20,32 +20,10 @@ Roles: "type[IRoles]" = get_module("standard.roles", "Roles")
|
||||
|
||||
|
||||
async def get_info_answer_by_id(message: Message, bot: Bot, user_id: int):
|
||||
ai_model = db_api.get_message_ai_model(message.chat.id, message.message_id)
|
||||
if ai_model is not None:
|
||||
await message.reply(
|
||||
"Это сообщение было сгенерировано ботом используя модель: " + ai_model
|
||||
)
|
||||
return
|
||||
|
||||
if user_id == bot.id:
|
||||
await message.reply("Это сообщение было отправлено ботом")
|
||||
return
|
||||
|
||||
user = db_api.get_user(user_id)
|
||||
|
||||
if user is None:
|
||||
await message.reply("Пользователь не найден")
|
||||
log(f"Пользователь не найден: {user_id}, {user}")
|
||||
return
|
||||
|
||||
roles = Roles()
|
||||
answer = (
|
||||
f"Пользователь: {user.user_name}\n"
|
||||
f"Роль: {await roles.get_role_name(role_id=user.user_role)}\n"
|
||||
f"Тег: @{user.user_tag}\n"
|
||||
f"Кол-во сообщений: {user.user_stats}\n"
|
||||
f"Репутация: {user.user_rep}"
|
||||
)
|
||||
role = roles.get_user_role(user_id)
|
||||
|
||||
answer = f"Имя: {user_id}\n" f"Роль: {role}\n"
|
||||
await message.reply(answer)
|
||||
|
||||
|
||||
@ -54,14 +32,7 @@ async def get_user_info(message: Message, bot: Bot):
|
||||
# Если сообщение отвечает на другое сообщение, то выводим информацию о пользователе, на чье сообщение был ответ
|
||||
# Если это бот то выводим информацию, что это бот и какая модель yandexgpt используется
|
||||
try:
|
||||
if len(message.text.split()) > 1 and message.text.split()[1].startswith("@"):
|
||||
user_tag = message.text.split()[1][1:]
|
||||
user_id = db_api.get_user_id(user_tag)
|
||||
if user_id:
|
||||
await get_info_answer_by_id(message, bot, user_id)
|
||||
else:
|
||||
await message.reply(f"Пользователь с тегом @{user_tag} не найден")
|
||||
elif message.reply_to_message:
|
||||
if message.reply_to_message:
|
||||
user_id = message.reply_to_message.from_user.id
|
||||
await get_info_answer_by_id(message, bot, user_id)
|
||||
else:
|
||||
|
@ -3,7 +3,7 @@ from aiogram.filters import Command
|
||||
|
||||
from karkas_core.modules_system.public_api import get_module, register_router
|
||||
|
||||
from .handlers import get_chat_info, get_user_info
|
||||
from .handlers import get_user_info
|
||||
|
||||
register_command = get_module("standard.command_helper", "register_command")
|
||||
|
||||
@ -12,9 +12,9 @@ async def module_init():
|
||||
router = Router()
|
||||
|
||||
router.message.register(get_user_info, Command("info"))
|
||||
router.message.register(get_chat_info, Command("chatinfo"))
|
||||
# router.message.register(get_chat_info, Command("chatinfo"))
|
||||
|
||||
register_router(router)
|
||||
|
||||
register_command("info", "Информация о пользователе")
|
||||
register_command("chatinfo", "Информация о чате")
|
||||
# register_command("chatinfo", "Информация о чате")
|
||||
|
@ -0,0 +1 @@
|
||||
from .piccolo_app import APP_CONFIG
|
@ -0,0 +1,15 @@
|
||||
import os
|
||||
|
||||
from karkas_piccolo.conf.apps import AppConfig
|
||||
|
||||
from .tables import Roles
|
||||
|
||||
CURRENT_DIRECTORY = os.path.dirname(os.path.abspath(__file__))
|
||||
|
||||
APP_CONFIG = AppConfig(
|
||||
app_name="standard.roles",
|
||||
migrations_folder_path=os.path.join(CURRENT_DIRECTORY, "piccolo_migrations"),
|
||||
table_classes=[Roles],
|
||||
migration_dependencies=[],
|
||||
commands=[],
|
||||
)
|
@ -0,0 +1,59 @@
|
||||
from piccolo.apps.migrations.auto.migration_manager import MigrationManager
|
||||
from piccolo.columns.column_types import Integer
|
||||
from piccolo.columns.indexes import IndexMethod
|
||||
|
||||
ID = "2024-08-20T13:36:02:417372"
|
||||
VERSION = "1.16.0"
|
||||
DESCRIPTION = ""
|
||||
|
||||
|
||||
async def forwards():
|
||||
manager = MigrationManager(
|
||||
migration_id=ID, app_name="standard.roles", description=DESCRIPTION
|
||||
)
|
||||
|
||||
manager.add_table(class_name="Roles", tablename="roles", schema=None, columns=None)
|
||||
|
||||
manager.add_column(
|
||||
table_class_name="Roles",
|
||||
tablename="roles",
|
||||
column_name="role_id",
|
||||
db_column_name="role_id",
|
||||
column_class_name="Integer",
|
||||
column_class=Integer,
|
||||
params={
|
||||
"default": 0,
|
||||
"null": False,
|
||||
"primary_key": False,
|
||||
"unique": False,
|
||||
"index": False,
|
||||
"index_method": IndexMethod.btree,
|
||||
"choices": None,
|
||||
"db_column_name": None,
|
||||
"secret": False,
|
||||
},
|
||||
schema=None,
|
||||
)
|
||||
|
||||
manager.add_column(
|
||||
table_class_name="Roles",
|
||||
tablename="roles",
|
||||
column_name="user_id",
|
||||
db_column_name="user_id",
|
||||
column_class_name="Integer",
|
||||
column_class=Integer,
|
||||
params={
|
||||
"default": 0,
|
||||
"null": False,
|
||||
"primary_key": False,
|
||||
"unique": False,
|
||||
"index": False,
|
||||
"index_method": IndexMethod.btree,
|
||||
"choices": None,
|
||||
"db_column_name": None,
|
||||
"secret": False,
|
||||
},
|
||||
schema=None,
|
||||
)
|
||||
|
||||
return manager
|
@ -0,0 +1,7 @@
|
||||
from piccolo.columns import Integer
|
||||
from piccolo.table import Table
|
||||
|
||||
|
||||
class Roles(Table):
|
||||
role_id = Integer()
|
||||
user_id = Integer()
|
@ -4,6 +4,9 @@ from karkas_core.modules_system.public_api import get_module
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from karkas_blocks.standard.config import IConfig
|
||||
from karkas_blocks.standard.database import (
|
||||
register_app_config as Iregister_app_config,
|
||||
)
|
||||
|
||||
|
||||
def module_init():
|
||||
@ -34,4 +37,12 @@ def module_init():
|
||||
visible=False,
|
||||
)
|
||||
|
||||
register_app_config: "Iregister_app_config" = get_module(
|
||||
"standard.database", "register_app_config"
|
||||
)
|
||||
|
||||
from .db import APP_CONFIG
|
||||
|
||||
register_app_config(APP_CONFIG)
|
||||
|
||||
pass
|
||||
|
@ -2,13 +2,11 @@ from typing import TYPE_CHECKING
|
||||
|
||||
from karkas_core.modules_system.public_api import get_module
|
||||
|
||||
from .db.tables import Roles as RolesTable
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from karkas_blocks.standard.config import IConfig
|
||||
from karkas_blocks.standard.database.db_api import get_user_role as IGetUserRoleType
|
||||
|
||||
get_user_role: "IGetUserRoleType" = get_module(
|
||||
"standard.database", "db_api.get_user_role"
|
||||
)
|
||||
config: "IConfig" = get_module("standard.config", "config")
|
||||
|
||||
|
||||
@ -21,6 +19,17 @@ class Roles:
|
||||
def __init__(self):
|
||||
pass
|
||||
|
||||
def get_user_role(self, user_id):
|
||||
row = (
|
||||
RolesTable.select(RolesTable.role_id)
|
||||
.where(RolesTable.user_id == user_id)
|
||||
.first()
|
||||
.run_sync()
|
||||
)
|
||||
if row is not None:
|
||||
return row["role_id"]
|
||||
return None
|
||||
|
||||
def update_roles(self):
|
||||
self.user_role_id = config.get("roles::user")
|
||||
self.moderator_role_id = config.get("roles::moderator")
|
||||
@ -29,7 +38,7 @@ class Roles:
|
||||
|
||||
async def check_admin_permission(self, user_id):
|
||||
self.update_roles()
|
||||
match get_user_role(user_id):
|
||||
match self.get_user_role(user_id):
|
||||
case self.admin_role_id:
|
||||
return True
|
||||
case _:
|
||||
@ -37,7 +46,7 @@ class Roles:
|
||||
|
||||
async def check_moderator_permission(self, user_id):
|
||||
self.update_roles()
|
||||
match get_user_role(user_id):
|
||||
match self.get_user_role(user_id):
|
||||
case self.moderator_role_id:
|
||||
return True
|
||||
case _:
|
||||
@ -59,7 +68,7 @@ class Roles:
|
||||
|
||||
async def get_user_permission(self, user_id):
|
||||
self.update_roles()
|
||||
match get_user_role(user_id):
|
||||
match self.get_user_role(user_id):
|
||||
case self.admin_role_id:
|
||||
return self.admin
|
||||
case self.moderator_role_id:
|
||||
|
@ -1,6 +1,6 @@
|
||||
from piccolo.apps.migrations.commands.forwards import run_forwards
|
||||
import karkas_piccolo.patches.app.finder # noqa: F401 isort:skip
|
||||
|
||||
import karkas_piccolo.patches.app.finder # noqa: F401
|
||||
from piccolo.apps.migrations.commands.forwards import run_forwards
|
||||
|
||||
|
||||
async def migrate_forward():
|
||||
|
@ -1,5 +1,5 @@
|
||||
import karkas_piccolo.patches.cli.finder # noqa: F401
|
||||
import karkas_piccolo.patches.migrations # noqa: F401
|
||||
import karkas_piccolo.patches.cli.finder # noqa: F401 isort:skip
|
||||
import karkas_piccolo.patches.migrations # noqa: F401 isort:skip
|
||||
|
||||
|
||||
def main():
|
||||
|
Loading…
Reference in New Issue
Block a user