# Импорт библиотек import os from contextlib import suppress import openai import configparser import sqlite3 import asyncio import sys import datetime from time import mktime from aiogram import Bot, Dispatcher, executor, types from aiogram.contrib.fsm_storage.memory import MemoryStorage from aiogram.utils.exceptions import MessageCantBeDeleted, MessageToDeleteNotFound mother_path = os.path.dirname(os.path.dirname(os.getcwd())) sys.path.insert(1, mother_path) # Импорт переменных из файла .ini config = configparser.ConfigParser() config.read(os.path.join(mother_path, 'src/config.ini')) TOKEN = config['Telegram']['token'] OPENAI_API_KEY = (config['Openai']['api_key']) bot_trigger_front = (config['Telegram']['bot_trigger_front']).split('|') bot_trigger_all = (config['Telegram']['bot_trigger_all']).split('|') # удаление лишних элементов массивов bot_trigger_front.remove('') bot_trigger_all.remove('') DB_message_limit = int(config['DataBase']['message_limit']) # Инициализация бота bot = Bot(token=TOKEN) dp = Dispatcher(bot, storage=MemoryStorage()) # Инициализация API OpenAI openai.api_key = OPENAI_API_KEY # Инициализация базы данных OCAB_DB в папке DataBase/OCAB_DB.db # Создаём базу данных sqlite3 по пути /home/armatik/PycharmProjects/OpenChatAiBot/DataBase/OCAB_DB.db database = sqlite3.connect(os.path.join(mother_path, 'DataBase/OCAB_DB.db')) cursor = database.cursor() # Создаём таблицу chat_list cursor.execute("""CREATE TABLE IF NOT EXISTS chat_list ( chat_id INTEGER PRIMARY KEY, chat_role INTEGER NOT NULL, chat_stats INTEGER NOT NULL )""") # Создаём таблицу message_list cursor.execute("""CREATE TABLE IF NOT EXISTS message_list ( message_id INTEGER PRIMARY KEY, message_text TEXT NOT NULL, message_sender INTEGER NOT NULL, answer_id INTEGER )""") # Создаём таблицу user_list cursor.execute("""CREATE TABLE IF NOT EXISTS user_list ( user_id INTEGER PRIMARY KEY, user_name TEXT NOT NULL, user_role INTEGER, user_stats INTEGER user_rep INTEGER )""") #запись информации о чате в базу данных async def empty_role(id): cursor.execute("UPDATE user_list SET user_role = ? WHERE user_id = ?", (0, id)) database.commit() async def check(id): #проверка что у человека есть роль user_role = cursor.execute("SELECT user_role FROM user_list WHERE user_id = ?", (id,)).fetchone()[0] if user_role not in [0, 1, 2]: await empty_role(id) async def get_role_name(rolenum): rolenum = int(rolenum) if rolenum == 0: role = config['Roles']['user'] elif rolenum == 1: role = config['Roles']['moderator'] elif rolenum == 2: role = config['Roles']['admin'] return role async def get_role(id): #получение роли пользователя user_role = cursor.execute("SELECT user_role FROM user_list WHERE user_id = ?", (id,)).fetchone()[0] return await get_role_name(user_role) async def check_admin(id, chat_id): #Проверка что человек есть в списке администраторов чата chat_admins = await bot.get_chat_administrators(chat_id) flag = False for admin in chat_admins: if admin.user.id == id: flag = True return flag async def check_moderator(id): #Проверка что человек имеет роль модератора if cursor.execute("SELECT user_role FROM user_list WHERE user_id = ?", (id,)).fetchone()[0] >= 1: return True else: return False async def check_user(id): #Проверка что человек имеет роль пользователя if cursor.execute("SELECT user_role FROM user_list WHERE user_id = ?", (id,)).fetchone()[0] == 0: return True else: return False async def save_message(message): #Сохранение сообщения в базу данных cursor.execute("INSERT INTO message_list VALUES (?, ?, ?, ?)", (message.message_id, message.text, message.from_user.id, 0)) if message.reply_to_message is not None: cursor.execute("UPDATE message_list SET answer_id = ? WHERE message_id = ?", (message.reply_to_message.message_id, message.message_id)) #Если отправитель не зарегистрирован в базе данных, то добавляем его if cursor.execute("SELECT user_id FROM user_list WHERE user_id = ?", (message.from_user.id,)).fetchone() is None: cursor.execute("INSERT INTO user_list VALUES (?, ?, ?, ?)", (message.from_user.id, message.from_user.username, 0, 1)) #Добавляем статистику в чат и в данные пользователя, если пользователь не бот. cursor.execute("UPDATE chat_list SET chat_stats = chat_stats + 1 WHERE chat_id = ?", (message.chat.id,)) cursor.execute("UPDATE user_list SET user_stats = user_stats + 1 WHERE user_id = ?", (message.from_user.id,)) database.commit() async def time_to_seconds(time): #Конвертация текстового указания времени по типу 3h, 5m, 10s в минуты if time[-1] == 'd': return int(time[:-1])*86400 elif time[-1] == 'h': return int(time[:-1])*3600 elif time[-1] == 'm': return int(time[:-1])*60 elif time[-1] == 's': return int(time[:-1]) async def short_time_to_time(time): #Конвертация времени в длинное название if time[-1] == 'd': return str(f"{time[0:-1]} дней") elif time[-1] == 'h': return str(f"{time[0:-1]} часов") elif time[-1] == 'm': return str(f"{time[0:-1]} минут") elif time[-1] == 's': return str(f"{time[0:-1]} секунд") @dp.message_handler(commands=['mute']) async def mute(message: types.Message): #Проверка что отправитель является администратором чата try: if await check_moderator(message.from_user.id): #Проверка отвечает ли сообщение на другое сообщение if message.reply_to_message is not None: time = message.text.split(' ')[1] #получаем id отправителя сообщение на которое отвечает message target_id = message.reply_to_message.from_user.id target_name = cursor.execute("SELECT user_name FROM user_list WHERE user_id = ?", (target_id,)).fetchone()[0] #Проверка, что человек пользователь if cursor.execute("SELECT user_role FROM user_list WHERE user_id = ?", (target_id,)).fetchone()[0] == 0: #ограничения прав пользователя по отправке сообщений на time секунд time_sec = await time_to_seconds(message.text.split(' ')[1]) if time_sec <= 30 or time_sec >= 31536000: await message.reply("Время мута должно быть больше 30 секунд и меньше 365 дней") return date_string = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S") date_time = datetime.datetime.strptime(date_string, "%Y-%m-%d %H:%M:%S") unix_time = int(mktime(date_time.timetuple())) await bot.restrict_chat_member(message.chat.id, target_id, until_date=unix_time+time_sec, permissions=types.ChatPermissions(can_send_messages=False)) await message.reply( f"Пользователь {target_name} замьючен на {await short_time_to_time(time)}") else: await message.reply(f"Пользователь [{target_name}](tg://user?id={target_id}) является {await get_role(target_id)} и не может быть замьючен", parse_mode='Markdown') return else: target_tag = message.text.split(' ')[1] target_tag = target_tag[1:] target_id = int(cursor.execute("SELECT user_id FROM user_list WHERE user_name = ?", (target_tag,)).fetchone()[0]) target_name = cursor.execute("SELECT user_name FROM user_list WHERE user_id = ?", (target_id,)).fetchone()[0] #ограничения прав пользователя по отправке сообщений на time секунд time_mute = message.text.split(' ')[2] if cursor.execute("SELECT user_role FROM user_list WHERE user_id = ?", (target_id,)).fetchone()[0] == 0: #ограничения прав пользователя по отправке сообщений на time секунд time_sec = await time_to_seconds(message.text.split(' ')[2]) if time_sec <= 30 or time_sec >= 31536000: await message.reply("Время мута должно быть больше 30 секунд и меньше 365 дней") return date_string = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S") date_time = datetime.datetime.strptime(date_string, "%Y-%m-%d %H:%M:%S") unix_time = int(mktime(date_time.timetuple())) await bot.restrict_chat_member(message.chat.id, target_id, until_date=unix_time+time_sec, permissions=types.ChatPermissions(can_send_messages=False)) await message.reply( f"Пользователь {target_name} замьючен на {await short_time_to_time(time_mute)}.") else: await message.reply(f"Пользователь [{target_name}](tg://user?id={target_id}) является {await get_role(target_id)} и не может быть замьючен", parse_mode="Markdown") return except: await message.reply("Ошибка данных. Возможно пользователь ещё ничего не написал.") @dp.message_handler(commands=['unmute']) async def unmute(message: types.Message): chat_id= message.chat.id if await check_moderator(message.from_user.id): if message.reply_to_message is not None: target_id = message.reply_to_message.from_user.id target_name = cursor.execute("SELECT user_name FROM user_list WHERE user_id = ?", (target_id,)).fetchone()[0] await bot.restrict_chat_member(message.chat.id, target_id, until_date=0, permissions=types.ChatPermissions(can_send_messages=True)) await message.reply( f"Пользователь [{target_name}](tg://user?id={target_id}) размьючен", parse_mode="Markdown") else: target_tag = message.text.split(' ')[1] target_tag = target_tag[1:] target_id = int(cursor.execute("SELECT user_id FROM user_list WHERE user_name = ?", (target_tag,)).fetchone()[0]) target_name = cursor.execute("SELECT user_name FROM user_list WHERE user_id = ?", (target_id,)).fetchone()[0] #получаем стандартные права чата if cursor.execute("SELECT user_role FROM user_list WHERE user_id = ?", (target_id,)).fetchone()[0] == 0: #возвращаем пользователю стандартные права как у новых пользователей chat_permisson = await bot.get_chat(chat_id) await bot.restrict_chat_member(message.chat.id, target_id, until_date=0, permissions=chat_permisson.permissions) await message.reply( f"Пользователь [{target_name}](tg://user?id={target_id}) размьючен", parse_mode="Markdown") #обрабатываем вход пользователя в чат @dp.message_handler(content_types=['new_chat_members']) async def new_chat_members(message: types.Message): #проверяем что пользователь не бот if not message.new_chat_members[0].is_bot: #Заносим пользователя в базу данных cursor.execute("INSERT INTO user_list VALUES (?, ?, ?, ?)", (message.new_chat_members[0].id, message.new_chat_members[0].username, 0, 0)) @dp.message_handler(commands=['chatinfo']) async def chat_info(message: types.Message): #Выводит информацию о чате. Название, количество пользователей, количество администраторов, количество модераторов, количество сообщений. if await check_moderator(message.from_user.id): chat_id = message.chat.id chat_title = message.chat.title chat_members = await bot.get_chat_members_count(chat_id) chat_admins = await bot.get_chat_administrators(chat_id) chat_moderators = cursor.execute("SELECT COUNT(user_id) FROM user_list WHERE user_role = 1").fetchone()[0] chat_messages = cursor.execute("SELECT chat_stats FROM chat_list WHERE chat_id = ?", (chat_id,)).fetchone()[0] await message.reply( f"Название чата: {chat_title}\n" f"Количество пользователей: {chat_members}\n" f"Количество администраторов: {len(chat_admins)}\n" f"Количество модераторов: {chat_moderators}\n" f"Количество сообщений: {chat_messages}", parse_mode="HTML") await top10(message) else: await message.reply( f"У вас недостаточно прав для выполнения этой команды.") @dp.message_handler(commands=['start']) async def start(message: types.Message): #проверка что чат есть в базе данных if cursor.execute("SELECT chat_id FROM chat_list WHERE chat_id = ?", (message.chat.id,)).fetchone() is None: chat_id = message.chat.id chat_role = 0 chat_stats = 1 cursor.execute("INSERT INTO chat_list VALUES (?, ?, ?)", (chat_id, chat_role, chat_stats)) database.commit() await message.reply( f"{config['Telegram']['start_answer']}", parse_mode="Markdown") else: await message.reply( f"Чат уже инициализирован.", parse_mode="Markdown") # Проверка наличия столбца имени пользователя в базе данных, если его нет, то добавить. try: cursor.execute("SELECT user_name FROM user_list") except sqlite3.OperationalError: cursor.execute("ALTER TABLE user_list ADD COLUMN user_name TEXT") database.commit() await message.reply("База данных пользователей реструктурирована.") # Проверка наличия столбца репутации пользователя в базе данных, если его нет, то добавить. try: cursor.execute("SELECT user_rep FROM user_list") except sqlite3.OperationalError: cursor.execute("ALTER TABLE user_list ADD COLUMN user_rep INTEGER") database.commit() await message.reply("База данных пользователей реструктурирована.") @dp.message_handler(commands=['top10']) async def top10(message: types.Message): #топ 10 пользователей по количеству сообщений в user_stats в формате: Имя пользователя - количество сообщений top10 = cursor.execute("SELECT user_id, user_stats FROM user_list ORDER BY user_stats DESC LIMIT 10").fetchall() top10_message = '' for user in top10: username = (cursor.execute("SELECT user_name FROM user_list WHERE user_id = ?", (user[0],)).fetchone())[0] if username is None: username = "Аноним" top10_message += f"{username} - {user[1]}\n" #в начале сообщения берём текст из config.ini и вставляем в него топ 10 пользователей await message.reply( f"{config['Telegram']['top10_answer']}\n{top10_message}") @dp.message_handler(commands=['aboutme']) async def aboutme(message: types.Message): your_id = message.from_id await check(your_id) your_name = message.from_user.username user_stats = cursor.execute("SELECT user_stats FROM user_list WHERE user_id = ?", (message.from_user.id,)).fetchone() rolenum = cursor.execute("SELECT user_role FROM user_list WHERE user_id = ?", (message.from_user.id,)).fetchone() user_rep = cursor.execute("SELECT user_rep FROM user_list WHERE user_id = ?", (message.from_user.id,)).fetchone() if "None" in str(user_rep): user_rep = 0 cursor.execute("UPDATE user_list SET user_rep = ? WHERE user_id = ?", (user_rep, your_id)) role = await get_role_name(rolenum[0]) #Имя пользователя на следующей строке статистика сообщений на следующей строке права пользователя #если пользователь есть в списке администраторов чата, то выдаём роль администратор chat_admins = await bot.get_chat_administrators(message.chat.id) default_for_admin = int(config['Roles']['default_for_admin']) no_answer = False if rolenum[0] < default_for_admin: for admin in chat_admins: if admin.user.id == your_id: if default_for_admin == 0: await message.reply( f"Вы обнаружены в списке администраторов чата! \n" f"Вам будет выдана роль: {config['Roles']['user']}\n" f"Имя: {your_name}\n" f"Кол-во сообщений: {user_stats[0]}\n" f"Репутация: {user_rep[0]}\n" f"Роль: {role}", parse_mode="HTML") no_answer = True elif default_for_admin == 1: await message.reply( f"Вы обнаружены в списке администраторов чата! \n" f"Вам будет выдана роль: {config['Roles']['moderator']}\n" f"Имя: {your_name}\n" f"Кол-во сообщений: {user_stats[0]}\n" f"Репутация: {user_rep[0]}\n" f"Роль: {role}", parse_mode="HTML") no_answer = True elif default_for_admin == 2: await message.reply( f"Вы обнаружены в списке администраторов чата! \n" f"Вам будет выдана роль: {config['Roles']['admin']}\n" f"Имя: {your_name}\n" f"Кол-во сообщений: {user_stats[0]}\n" f"Репутация: {user_rep[0]}\n" f"Роль: {role}", parse_mode="HTML") no_answer = True cursor.execute("UPDATE user_list SET user_role = ? WHERE user_id = ?", (default_for_admin, your_id)) database.commit() if no_answer == False: await message.reply( f"Имя: {your_name}\n" f"Кол-во сообщений: {user_stats[0]}\n" f"Репутация: {user_rep[0]}\n" f"Роль: {role}", parse_mode="HTML") @dp.message_handler(commands=['setrole']) async def setrole(message: types.Message): try: user_role = cursor.execute("SELECT user_role FROM user_list WHERE user_id = ?", (message.from_user.id,)).fetchone() user_name = message.from_user.username target_name = message.text.split()[1] target_name = target_name[1:] target_role = cursor.execute("SELECT user_role FROM user_list WHERE user_name = ?", (target_name,)).fetchone()[0] user_id = cursor.execute("SELECT user_id FROM user_list WHERE user_name = ?", (target_name,)).fetchone() user_id = user_id[0] except: await message.reply("Пользователь не найден!") return if await check_admin(user_id, message.chat.id) and target_role == 2: await message.reply("Вы не можете изменить роль этому пользователю!") else: if user_role[0] == 2: try: #Получаем id пользователя по нику в базе данных role = message.text.split()[2] if role == "0" or role == config['Roles']['user']: role = config['Roles']['user'] rolenum = 0 elif role == "1" or role == config['Roles']['moderator']: role = config['Roles']['moderator'] rolenum = 1 elif role == "2" or role == config['Roles']['admin']: role = config['Roles']['admin'] rolenum = 2 cursor.execute("UPDATE user_list SET user_role = ? WHERE user_id = ?", (rolenum, user_id)) database.commit() await message.reply( f"Пользователю [{target_name}](tg://user?id={str(user_id)}) выдана роль: {role}", parse_mode="Markdown") except: await message.reply( f"Ошибка! Проверьте правильность написания команды.", parse_mode="Markdown") else: await message.reply( f"Ошибка! У вас нет прав на использование этой команды.", parse_mode="Markdown") @dp.message_handler(commands=['about']) async def about(message: types.Message): # проверка что в сообщении есть тег пользователя if len(message.text.split()) == 2: try: target_name = message.text.split()[1] target_name = target_name[1:] target_id = cursor.execute("SELECT user_id FROM user_list WHERE user_name = ?", (target_name,)).fetchone() target_id = target_id[0] user_rep = cursor.execute("SELECT user_rep FROM user_list WHERE user_id = ?", (target_id,)).fetchone()[0] if "None" in str(user_rep): user_rep = 0 cursor.execute("UPDATE user_list SET user_rep = ? WHERE user_id = ?", (user_rep, target_id)) await check(target_id) except: await message.reply( f"Ошибка! Проверьте правильность написания тега пользователя.", parse_mode="Markdown") user_stats = cursor.execute("SELECT user_stats FROM user_list WHERE user_id = ?", (target_id,)).fetchone()[0] user_role = cursor.execute("SELECT user_role FROM user_list WHERE user_id = ?", (target_id,)).fetchone()[0] user_role = await get_role_name(user_role) await message.reply( f"Имя: {target_name}\n" f"Кол-во сообщений: {user_stats}\n" f"Репутация: {user_rep}\n" f"Роль: {user_role}", parse_mode="HTML") else: await message.reply( f"Ошибка! Проверьте правильность написания команды.") from src.OpenAI.GPT35turbo.OA_processing import openai_message_history_processing @dp.message_handler(commands=['history']) async def history(message: types.Message): if message.reply_to_message is not None: # Получаем id сообщения на которое отвечает message start_message_id = message.reply_to_message.message_id # Получаем id сообщения message end_message_id = message.message_id elif message.text.split()[1] != '': end_message_id = message.message_id start_message_id = end_message_id - int(message.text.split()[1]) else: await message.reply("Ошибка! Проверьте правильность написания команды.") return temp_message = await message.reply("Подождите немного, я обрабатываю историю сообщений... Функция ЭКСПЕРЕМЕНТАЛЬНАЯ " "и может работать некорректно.") # Получаем историю сообщений response = openai_message_history_processing(start_message_id, end_message_id) if response is None: await message.reply("Я не понял тебя, попробуй перефразировать") else: bot_message_id = await message.reply(response['choices'][0]['message']['content'], parse_mode="markdown") # заносим сообщение в базу данных в качестве message_id мы пишем id сообщения в bot_message_id cursor.execute("INSERT INTO message_list VALUES (?, ?, ?, ?)", (bot_message_id.message_id, response['choices'][0]['message']['content'], 0, message.message_id)) # очищаем сообщение asyncio.create_task(delete_message(temp_message, 0)) from src.OpenAI.GPT35turbo.OA_processing import openai_message_processing async def delete_message(message: types.Message, sleep_time: int = 0): await asyncio.sleep(sleep_time) with suppress(MessageCantBeDeleted, MessageToDeleteNotFound): await message.delete() @dp.message_handler() async def in_message(message: types.Message): chat_id = message.chat.id # Получение сообщений в чате, и запись их в базу данных if (message.chat.type == "private" or message.chat.type == "channel"): await message.reply( f"{config['Telegram']['private_answer']}", parse_mode="Markdown") elif (message.chat.type != "group" or message.chat.type != "supergroup") and \ message.text != '' and message.text != ' ' and \ (cursor.execute("SELECT chat_role FROM chat_list WHERE chat_id;") == 1): return None else: # Запись сообщения в базу данных await save_message(message) # Обработка сообщения OpenAI send_answer = False typing_mode = False # импортируем массив триггеров из файла .ini if message.reply_to_message and message.reply_to_message.from_user.id == (await bot.me).id: send_answer = True typing_mode = True for trigger in bot_trigger_all: if trigger.lower() in message.text.lower(): send_answer = True typing_mode = False for trigger in bot_trigger_front: if message.text.lower().startswith(trigger.lower()): send_answer = True typing_mode = False if send_answer == False: # Если сообщение отвечает на другое сообщение, то проверяем наличие слова спасибо if message.reply_to_message is not None: if message.text.startswith("Спасибо"): target_id = message.reply_to_message.from_user.id target_name = cursor.execute("SELECT user_name FROM user_list WHERE user_id = ?", (target_id,)).fetchone()[0] cursor.execute("UPDATE user_list SET user_rep = user_rep + 1 WHERE user_id = ?", (target_id,)) database.commit() await message.reply( f"Спасибо, [{target_name}](tg://user?id={str(target_id)}) за ваш ответ!", parse_mode="Markdown") if send_answer: if typing_mode is False: your_id = message.from_id your_name = message.from_user.username temp_msg = await message.reply( f"[{your_name}](tg://user?id={str(your_id)}), Подожди немного и я обязательно отвечу тебе!", parse_mode="Markdown") # Пишем что бот печатает await bot.send_chat_action(message.chat.id, "typing") # Получаем ответ от OpenAI response = openai_message_processing(message.message_id) if response is None: bot_message_id = await message.reply("Я не понял тебя, попробуй перефразировать") if typing_mode is False: asyncio.create_task(delete_message(temp_msg, 0)) # заносим сообщение в базу данных в качестве message_id пишем id сообщения которое отправил бот cursor.execute("INSERT INTO message_list VALUES (?, ?, ?, ?)", (bot_message_id, "Я не понял тебя, попробуй перефразировать", 0, message.message_id)) else: bot_message_id = await message.reply(response['choices'][0]['message']['content'], parse_mode="markdown") if typing_mode is False: asyncio.create_task(delete_message(temp_msg, 0)) # заносим сообщение в базу данных в качестве message_id мы пишем id сообщения в bot_message_id cursor.execute("INSERT INTO message_list VALUES (?, ?, ?, ?)", (bot_message_id.message_id, response['choices'][0]['message']['content'], 0, message.message_id)) # очищаем базу данных от старых сообщений cursor.execute("DELETE FROM message_list WHERE message_id < ?", (bot_message_id.message_id - DB_message_limit,)) database.commit() if __name__ == '__main__': executor.start_polling(dp, skip_updates=True)