import peewee as pw
from aiogram.types import Message

from src.service import paths

from .exceptions import MissingModuleName, NotExpectedModuleName
from .models.chat_stats import ChatStats
from .models.chats import Chats
from .models.db import database_proxy
from .models.messages import Messages
from .models.user_stats import UserStats
from .models.users import Users


def connect_database(
    is_test: bool = False, module: str | None = None
) -> tuple[pw.SqliteDatabase, str]:
    """Open the SQLite database, bind it to the model proxy and create tables.

    Args:
        is_test: When true, the database of a module's test directory is
            used instead of the core database.
        module: Module name; required when ``is_test`` is true and must be
            omitted otherwise.

    Returns:
        A ``(database, database_file_path)`` tuple.

    Raises:
        MissingModuleName: ``is_test`` is true but ``module`` is empty.
        NotExpectedModuleName: ``module`` is given while ``is_test`` is false.
    """
    if is_test:
        if not module:
            raise MissingModuleName()
        db_path = f"{paths.modules_standard}/{module}/tests/database"
    else:
        if module:
            raise NotExpectedModuleName()
        db_path = f"{paths.core}/database"

    db_file = f"{db_path}/OCAB.db"
    database = pw.SqliteDatabase(db_file)
    # The models are declared against a proxy; point it at the real database
    # before any query is issued.
    database_proxy.initialize(database)
    database.connect()
    create_tables(database)

    return database, db_file


def create_tables(db: pw.SqliteDatabase) -> None:
    """Create every model table that does not exist yet."""
    for table in Chats, Messages, Users, UserStats, ChatStats:
        if not table.table_exists():
            db.create_tables([table])


def add_chat(chat_id, chat_name, chat_type=10, chat_stats=0) -> None:
    """Insert a chat, or overwrite the stored values if it already exists.

    Args:
        chat_id: Primary key of the chat.
        chat_name: Chat name to store.
        chat_type: Value for the ``chat_type`` field.
        chat_stats: Value for the ``chat_all_stat`` (total counter) field.
    """
    chat, created = Chats.get_or_create(
        id=chat_id,
        defaults={
            "chat_name": chat_name,
            "chat_type": chat_type,
            "chat_all_stat": chat_stats,
        },
    )
    if not created:
        # Update the existing chat if it already exists.
        chat.chat_name = chat_name
        chat.chat_type = chat_type
        # The total counter lives in the ``chat_all_stat`` field (the same
        # one used in ``defaults`` above); assigning to any other attribute
        # name would not be persisted by ``save()``.
        chat.chat_all_stat = chat_stats
        chat.save()


def _compose_user_name(user_first_name, user_last_name=None):
    """Return the first name, followed by a space and the last name if given."""
    if user_last_name is None:
        return user_first_name
    return user_first_name + " " + user_last_name


def add_user(
    user_id,
    user_first_name,
    user_last_name=None,
    user_tag=None,
    user_role=0,
    user_stats=0,
    user_rep=0,
) -> None:
    """Insert a user, or overwrite the stored values if it already exists.

    The stored ``user_name`` is the first name, optionally followed by a
    space and the last name.
    """
    user_name = _compose_user_name(user_first_name, user_last_name)

    user, created = Users.get_or_create(
        id=user_id,
        defaults={
            "user_tag": user_tag,
            "user_name": user_name,
            "user_role": user_role,
            "user_stats": user_stats,
            "user_rep": user_rep,
        },
    )
    if not created:
        # Update the existing user if it already exists.
        user.user_tag = user_tag
        user.user_name = user_name
        user.user_role = user_role
        user.user_stats = user_stats
        user.user_rep = user_rep
        user.save()


def add_message(message: Message, message_ai_model=None) -> None:
    """Store an incoming message.

    Args:
        message: The message to store.
        message_ai_model: Value for the ``message_ai_model`` field.
    """
    if message.reply_to_message:
        answer_to_message_id = message.reply_to_message.message_id
    else:
        answer_to_message_id = None

    # NOTE(review): ``message.from_user`` is dereferenced unconditionally;
    # confirm callers never pass a message without a sender.
    Messages.create(
        message_chat_id=message.chat.id,
        message_id=message.message_id,
        message_sender_id=message.from_user.id,
        answer_to_message_id=answer_to_message_id,
        message_ai_model=message_ai_model,
        message_text=message.text,
    )


def add_chat_stats(chat_id, date, messages_count) -> None:
    """Insert a chat statistics row for the given date."""
    ChatStats.create(chat_id=chat_id, date=date, messages_count=messages_count)


def add_user_stats(chat_id, user_id, date, messages_count) -> None:
    """Insert a user statistics row for the given chat and date."""
    UserStats.create(
        chat_id=chat_id, user_id=user_id, date=date, messages_count=messages_count
    )


# Chat table operations


def get_chat(chat_id) -> Chats | None:
    """Return the chat with the given id, or ``None`` if there is none."""
    return Chats.get_or_none(Chats.id == chat_id)


def change_chat_name(chat_id, new_chat_name) -> None:
    """Set the ``chat_name`` of the chat with the given id."""
    Chats.update(chat_name=new_chat_name).where(Chats.id == chat_id).execute()


def change_chat_type(chat_id, new_chat_type) -> None:
    """Set the ``chat_type`` of the chat with the given id."""
    Chats.update(chat_type=new_chat_type).where(Chats.id == chat_id).execute()


def get_chat_all_stat(chat_id):
    """Return the chat's ``chat_all_stat``, or ``None`` if the chat is unknown."""
    chat = get_chat(chat_id)
    return chat.chat_all_stat if chat else None


# User table operations


def get_user(user_id) -> Users | None:
    """Return the user with the given id, or ``None`` if there is none."""
    return Users.get_or_none(Users.id == user_id)


def get_user_tag(user_id):
    """Return the user's ``user_tag``, or ``None`` if the user is unknown."""
    user = get_user(user_id)
    return user.user_tag if user else None


def get_user_id(user_tag):
    """Return the id of the user with the given tag, or ``None`` if not found."""
    user = Users.get_or_none(Users.user_tag == user_tag)
    return user.id if user else None


def get_user_name(user_id):
    """Return the user's ``user_name``, or ``None`` if the user is unknown."""
    user = get_user(user_id)
    return user.user_name if user else None


def get_user_role(user_id):
    """Return the user's ``user_role``, or ``None`` if the user is unknown."""
    user = get_user(user_id)
    return user.user_role if user else None


def get_user_all_stats(user_id):
    """Return the user's ``user_stats``, or ``None`` if the user is unknown."""
    user = get_user(user_id)
    return user.user_stats if user else None


def get_user_rep(user_id):
    """Return the user's ``user_rep``, or ``None`` if the user is unknown."""
    user = get_user(user_id)
    return user.user_rep if user else None


def change_user_name(user_id, user_first_name, user_last_name=None) -> None:
    """Set the user's ``user_name`` from a first and optional last name."""
    new_user_name = _compose_user_name(user_first_name, user_last_name)
    Users.update(user_name=new_user_name).where(Users.id == user_id).execute()


def change_user_tag(user_id, new_user_tag) -> None:
    """Set the ``user_tag`` of the user with the given id."""
    Users.update(user_tag=new_user_tag).where(Users.id == user_id).execute()


def change_user_role(user_id, new_user_role) -> None:
    """Set the ``user_role`` of the user with the given id."""
    Users.update(user_role=new_user_role).where(Users.id == user_id).execute()


# Message table operations


def get_message(message_chat_id, message_id) -> Messages | None:
    """Return the message identified by chat id and message id, or ``None``."""
    return Messages.get_or_none(
        Messages.message_chat_id == message_chat_id,
        Messages.message_id == message_id,
    )


def get_message_sender_id(message_chat_id, message_id):
    """Return the message's ``message_sender_id``, or ``None`` if not found."""
    message = get_message(message_chat_id, message_id)
    return message.message_sender_id if message else None


def get_message_text(message_chat_id, message_id):
    """Return the message's ``message_text``, or ``None`` if not found."""
    message = get_message(message_chat_id, message_id)
    return message.message_text if message else None


def get_message_ai_model(message_chat_id, message_id):
    """Return the message's ``message_ai_model``, or ``None`` if not found."""
    message = get_message(message_chat_id, message_id)
    return message.message_ai_model if message else None


def get_answer_to_message_id(message_chat_id, message_id):
    """Return the message's ``answer_to_message_id``, or ``None`` if not found."""
    message = get_message(message_chat_id, message_id)
    return message.answer_to_message_id if message else None


# Chat statistics table operations


def get_chat_stats(chat_id) -> dict:
    """Return a ``{date: messages_count}`` mapping for the given chat."""
    rows = ChatStats.select().where(ChatStats.chat_id == chat_id)
    return {row.date: row.messages_count for row in rows}


# User statistics table operations


def get_user_stats(user_id) -> dict:
    """Return a ``{date: messages_count}`` mapping for the given user.

    Rows are selected by user id only, so if several rows share a date the
    last one read wins.
    """
    rows = UserStats.select().where(UserStats.user_id == user_id)
    return {row.date: row.messages_count for row in rows}


# Update functions


def update_chat_all_stat(chat_id) -> None:
    """Increment the chat's ``chat_all_stat`` by one."""
    # The increment is expressed on the column, so it is evaluated by the
    # database in a single UPDATE statement.
    query = Chats.update(chat_all_stat=Chats.chat_all_stat + 1).where(
        Chats.id == chat_id
    )
    query.execute()


def update_chat_stats(chat_id, date) -> None:
    """Increment the chat's message count for ``date``, creating the row if needed."""
    # NOTE(review): read-then-write; confirm callers do not run this
    # concurrently for the same (chat_id, date).
    chat_stats = ChatStats.get_or_none(
        ChatStats.chat_id == chat_id, ChatStats.date == date
    )
    if chat_stats:
        query = ChatStats.update(messages_count=ChatStats.messages_count + 1).where(
            ChatStats.chat_id == chat_id, ChatStats.date == date
        )
        query.execute()
    else:
        ChatStats.create(chat_id=chat_id, date=date, messages_count=1)


def update_user_all_stat(user_id) -> None:
    """Increment the user's ``user_stats`` by one, creating the user if needed."""
    user = get_user(user_id)
    if user:
        query = Users.update(user_stats=Users.user_stats + 1).where(
            Users.id == user_id
        )
        query.execute()
    else:
        Users.create(id=user_id, user_stats=1)


def update_user_rep(user_id) -> None:
    """Increment the user's ``user_rep`` by one, creating the user if needed."""
    user = get_user(user_id)
    if user:
        query = Users.update(user_rep=Users.user_rep + 1).where(Users.id == user_id)
        query.execute()
    else:
        Users.create(id=user_id, user_rep=1)


def update_user_stats(chat_id, user_id, date) -> None:
    """Increment the user's message count for a chat and date, creating the row if needed."""
    # NOTE(review): read-then-write; confirm callers do not run this
    # concurrently for the same (chat_id, user_id, date).
    user_stats = UserStats.get_or_none(
        UserStats.chat_id == chat_id,
        UserStats.user_id == user_id,
        UserStats.date == date,
    )
    if user_stats:
        query = UserStats.update(messages_count=UserStats.messages_count + 1).where(
            UserStats.chat_id == chat_id,
            UserStats.user_id == user_id,
            UserStats.date == date,
        )
        query.execute()
    else:
        UserStats.create(chat_id=chat_id, user_id=user_id, date=date, messages_count=1)