Удалены старые директории

This commit is contained in:
armatik 2023-11-25 19:46:09 +03:00
parent b386e258cc
commit 9e64cbfc4c
35 changed files with 1 additions and 1219 deletions

View File

@ -1 +1 @@
# OpenChatAiBot # OpenChatAiBot V2

View File

@ -1,3 +0,0 @@
aiogram==3.1.1
openai==0.27.8
PyYAML==6.0.1

View File

@ -1,198 +0,0 @@
import sqlite3
import os
import configparser
from openai import OpenAIError
import time
mother_path = os.path.dirname(os.path.dirname(os.getcwd()))
config = configparser.ConfigParser()
config.read(os.path.join(mother_path, 'src/config.ini'))
database = sqlite3.connect(os.path.join(mother_path, 'DataBase/OCAB_DB.db'))
cursor = database.cursor()
reply_ignore = config['Telegram']['reply_ignore'].split('| ')
reply_ignore = list(map(int, reply_ignore))
#print(reply_ignore)
min_token_for_answer = int(config['Openai']['min_token_for_answer'])
# Импорт библиотек
import openai
max_token_count = int(config['Openai']['max_token_count'])
# Создание файла лога если его нет
if not os.path.exists(os.path.join(mother_path, 'src/OpenAI/GPT35turbo/log.txt')):
with open(os.path.join(mother_path, 'src/OpenAI/GPT35turbo/log.txt'), 'w') as log_file:
log_file.write('')
def openai_response(message_formated_text):
# Запуск OpenAI
# Считаем размер полученного текста
#print(message_formated_text)
count_length = 0
if len(message_formated_text) == 0:
message_formated_text = [
{
"role": "user",
"content": "Напиши короткий ответ говорящий что контекст сообщения слишком длинный и попроси задать вопрос отдельно без ответа на другие сообщения по ключевому слову"
}
]
for message in message_formated_text:
#print(message["content"])
count_length += int(len(message["content"]))
#print(count_length)
try:
response = openai.ChatCompletion.create(
model="gpt-3.5-turbo",
messages=message_formated_text,
max_tokens=max_token_count - count_length
)
except OpenAIError as ex:
if 'on requests per min. Limit: 3 / min. Please try again' in str(ex):
response = ('Извини мой процессор перегрелся, дай мне минутку отдохнуть')
elif 'Bad gateway.' in str(ex):
response = (
'Ой, где я? Кажется кто то перерзал мой интернет кабель, подожди немного пока я его починю')
#запись ошибки в лог с указанием времени и даты
with open(os.path.join(mother_path, 'src/OpenAI/GPT35turbo/log.txt'), 'a') as log_file:
log_file.write('\n' + time.strftime("%d.%m.%Y %H:%M:%S") + ' ' + str(ex))
return response
def sort_message_from_user(message_formated_text, message_id):
# print(int(*(
# cursor.execute("SELECT message_sender FROM message_list WHERE message_id = ?", (message_id,)).fetchone())))
if int(*(
cursor.execute("SELECT message_sender FROM message_list WHERE message_id = ?",
(message_id,)).fetchone())) == 0:
message_formated_text.append({
"role": "assistant",
"content": str(*(cursor.execute("SELECT message_text FROM message_list WHERE message_id = ?",
(message_id,)).fetchone()))
})
else:
message_formated_text.append({
"role": "user",
"content": str(*(cursor.execute("SELECT message_text FROM message_list WHERE message_id = ?",
(message_id,)).fetchone()))
})
#Проверка что длина всех сообщений в кортеже не превышает max_token_count-min_token_for_answer
return message_formated_text
def openai_collecting_message(message_id, message_formated_text):
# собирает цепочку сообщений для OpenAI длинной до max_token_count
# проверяем что сообщение отвечает на другое сообщение
#print(int(*(cursor.execute("SELECT answer_id FROM message_list WHERE message_id = ?", (message_id,)).fetchone())))
#print(reply_ignore)
if int(*(cursor.execute("SELECT answer_id FROM message_list WHERE message_id = ?", (message_id,)).fetchone())) not in reply_ignore:
# Продолжаем искать ответы на сообщения
#print(int(*(cursor.execute("SELECT answer_id FROM message_list WHERE message_id = ?", (message_id,)).fetchone())))
message_formated_text = openai_collecting_message(int(*(cursor.execute("SELECT answer_id FROM message_list WHERE message_id = ?", (message_id,)).fetchone())), message_formated_text)
#Проверяем ID отправителя сообщения, если 0 то это сообщение от бота
sort_message_from_user(message_formated_text, message_id)
else:
# Проверяем ID отправителя сообщения, если 0 то это сообщение от бота
sort_message_from_user(message_formated_text, message_id)
return message_formated_text
def openai_message_processing(message_id):
#проверяем на наличие сообщения в базе данных
if cursor.execute("SELECT message_text FROM message_list WHERE message_id = ?", (message_id,)).fetchone() is None:
return None
else:
# проверяем на то что сообщение влезает в max_token_count с учётом message_formated_text
message_formated_text = [
{
"role": "system",
"content": config['Openai']['story_model']
}
]
if ((len(str(cursor.execute("SELECT message_text FROM message_list WHERE message_id")))) < (max_token_count - len(message_formated_text[0]['content']))):
message_formated_text = openai_collecting_message(message_id, message_formated_text)
count_length = 0
# Обработка невозможности ответить на сообщение
try:
for message in message_formated_text:
count_length += len(message['content'])
while count_length > max_token_count - min_token_for_answer:
message_formated_text.pop(1)
count_length = 0
for message in message_formated_text:
count_length += len(message['content'])
except IndexError:
message_formated_text = [
{
"role": "system",
"content": "Выведи сообщение об ошибке."
}
]
response = openai_response(message_formated_text)
return response
else:
return f"Сообщение слишком длинное, максимальная длина сообщения \
{max_token_count - len(message_formated_text[0]['content'])} символов, укоротите его на \
{len(str(cursor.execute('SELECT message_text FROM message_list WHERE message_id'))) - max_token_count} символов"
def openai_collecting_history_context(start_id, end_id, message_formated_text, message_id = 0):
# собираем список сообщений для OpenAI длинной до 14500 символов начиная с end_id и заканчивая start_id
if message_id == 0:
message_id = end_id
message_formated_text = openai_collecting_history_context(start_id, end_id, message_formated_text, message_id)
elif message_id > start_id:
message_formated_text = openai_collecting_history_context(start_id, end_id, message_formated_text, (message_id - 1))
try:
message_formated_text.append({
"role": "user",
"content": str(*(cursor.execute("SELECT message_text FROM message_list WHERE message_id = ?",
(message_id,)).fetchone()))
})
except Exception:
pass
return message_formated_text
def openai_message_history_processing(start_id, end_id):
message_formated_text = []
message_formated_text = openai_collecting_history_context(start_id, end_id, message_formated_text)
max_token_count_history = 15000
min_token_for_answer_history = 1500
count_length = 0
try:
for message in message_formated_text:
count_length += len(message['content'])
while count_length > max_token_count_history - min_token_for_answer_history:
message_formated_text.pop(1)
count_length = 0
for message in message_formated_text:
count_length += len(message['content'])
except IndexError:
message_formated_text = [
{
"role": "system",
"content": "Выведи сообщение об ошибке."
}
]
message_formated_text.append({
"role": "user",
"content": "Сделай пересказ всей истории сообщений без потери смысла от 3-его лица."
})
try:
response = openai.ChatCompletion.create(
model="gpt-3.5-turbo-16k",
messages=message_formated_text,
max_tokens=max_token_count_history - count_length
)
except OpenAIError as ex:
if 'on requests per min. Limit: 3 / min. Please try again' in str(ex):
response = ('Извини мой процессор перегрелся, дай мне минутку отдохнуть')
elif 'Bad gateway.' in str(ex):
response = (
'Ой, где я? Кажется кто то перерзал мой интернет кабель, подожди немного пока я его починю')
# запись ошибки в лог с указанием времени и даты
with open(os.path.join(mother_path, 'src/OpenAI/GPT35turbo/log.txt'), 'a') as log_file:
log_file.write('\n' + time.strftime("%d.%m.%Y %H:%M:%S") + ' ' + str(ex))
return response

View File

@ -1,8 +0,0 @@
# Получение сообщений в чате, и запись их в базу данных
from aiogram import types
from src.TelegramBot.main import dp
from main import cursor, config
#импортировать функцию для обработки сообщений из OpenAI

View File

@ -1,566 +0,0 @@
# Импорт библиотек
import os
from contextlib import suppress
import openai
import configparser
import sqlite3
import asyncio
import sys
import datetime
from time import mktime
from aiogram import Bot, Dispatcher, executor, types
from aiogram.contrib.fsm_storage.memory import MemoryStorage
from aiogram.utils.exceptions import MessageCantBeDeleted, MessageToDeleteNotFound
mother_path = os.path.dirname(os.path.dirname(os.getcwd()))
sys.path.insert(1, mother_path)
# Импорт переменных из файла .ini
config = configparser.ConfigParser()
config.read(os.path.join(mother_path, 'src/config.ini'))
TOKEN = config['Telegram']['token']
OPENAI_API_KEY = (config['Openai']['api_key'])
bot_trigger_front = (config['Telegram']['bot_trigger_front']).split('|')
bot_trigger_all = (config['Telegram']['bot_trigger_all']).split('|')
# удаление лишних элементов массивов
bot_trigger_front.remove('')
bot_trigger_all.remove('')
DB_message_limit = int(config['DataBase']['message_limit'])
# Инициализация бота
bot = Bot(token=TOKEN)
dp = Dispatcher(bot, storage=MemoryStorage())
# Инициализация API OpenAI
openai.api_key = OPENAI_API_KEY
# Инициализация базы данных OCAB_DB в папке DataBase/OCAB_DB.db
# Создаём базу данных sqlite3 по пути /home/armatik/PycharmProjects/OpenChatAiBot/DataBase/OCAB_DB.db
database = sqlite3.connect(os.path.join(mother_path, 'DataBase/OCAB_DB.db'))
cursor = database.cursor()
# Создаём таблицу chat_list
cursor.execute("""CREATE TABLE IF NOT EXISTS chat_list (
chat_id INTEGER PRIMARY KEY,
chat_role INTEGER NOT NULL,
chat_stats INTEGER NOT NULL
)""")
# Создаём таблицу message_list
cursor.execute("""CREATE TABLE IF NOT EXISTS message_list (
message_id INTEGER PRIMARY KEY,
message_text TEXT NOT NULL,
message_sender INTEGER NOT NULL,
answer_id INTEGER
)""")
# Создаём таблицу user_list
cursor.execute("""CREATE TABLE IF NOT EXISTS user_list (
user_id INTEGER PRIMARY KEY,
user_name TEXT NOT NULL,
user_role INTEGER,
user_stats INTEGER
user_rep INTEGER
)""")
#запись информации о чате в базу данных
async def empty_role(id):
cursor.execute("UPDATE user_list SET user_role = ? WHERE user_id = ?", (0, id))
database.commit()
async def check(id):
#проверка что у человека есть роль
user_role = cursor.execute("SELECT user_role FROM user_list WHERE user_id = ?", (id,)).fetchone()[0]
if user_role not in [0, 1, 2]:
await empty_role(id)
async def get_role_name(rolenum):
rolenum = int(rolenum)
if rolenum == 0:
role = config['Roles']['user']
elif rolenum == 1:
role = config['Roles']['moderator']
elif rolenum == 2:
role = config['Roles']['admin']
return role
async def get_role(id):
#получение роли пользователя
user_role = cursor.execute("SELECT user_role FROM user_list WHERE user_id = ?", (id,)).fetchone()[0]
return await get_role_name(user_role)
async def check_admin(id, chat_id):
#Проверка что человек есть в списке администраторов чата
chat_admins = await bot.get_chat_administrators(chat_id)
flag = False
for admin in chat_admins:
if admin.user.id == id:
flag = True
return flag
async def check_moderator(id):
#Проверка что человек имеет роль модератора
if cursor.execute("SELECT user_role FROM user_list WHERE user_id = ?", (id,)).fetchone()[0] >= 1:
return True
else:
return False
async def check_user(id):
#Проверка что человек имеет роль пользователя
if cursor.execute("SELECT user_role FROM user_list WHERE user_id = ?", (id,)).fetchone()[0] == 0:
return True
else:
return False
async def save_message(message):
#Сохранение сообщения в базу данных
cursor.execute("INSERT INTO message_list VALUES (?, ?, ?, ?)", (message.message_id, message.text, message.from_user.id, 0))
if message.reply_to_message is not None:
cursor.execute("UPDATE message_list SET answer_id = ? WHERE message_id = ?", (message.reply_to_message.message_id, message.message_id))
#Если отправитель не зарегистрирован в базе данных, то добавляем его
if cursor.execute("SELECT user_id FROM user_list WHERE user_id = ?", (message.from_user.id,)).fetchone() is None:
cursor.execute("INSERT INTO user_list VALUES (?, ?, ?, ?)", (message.from_user.id, message.from_user.username, 0, 1))
#Добавляем статистику в чат и в данные пользователя, если пользователь не бот.
cursor.execute("UPDATE chat_list SET chat_stats = chat_stats + 1 WHERE chat_id = ?", (message.chat.id,))
cursor.execute("UPDATE user_list SET user_stats = user_stats + 1 WHERE user_id = ?", (message.from_user.id,))
database.commit()
async def time_to_seconds(time):
#Конвертация текстового указания времени по типу 3h, 5m, 10s в минуты
if time[-1] == 'd':
return int(time[:-1])*86400
elif time[-1] == 'h':
return int(time[:-1])*3600
elif time[-1] == 'm':
return int(time[:-1])*60
elif time[-1] == 's':
return int(time[:-1])
async def short_time_to_time(time):
#Конвертация времени в длинное название
if time[-1] == 'd':
return str(f"{time[0:-1]} дней")
elif time[-1] == 'h':
return str(f"{time[0:-1]} часов")
elif time[-1] == 'm':
return str(f"{time[0:-1]} минут")
elif time[-1] == 's':
return str(f"{time[0:-1]} секунд")
@dp.message_handler(commands=['mute'])
async def mute(message: types.Message):
#Проверка что отправитель является администратором чата
try:
if await check_moderator(message.from_user.id):
#Проверка отвечает ли сообщение на другое сообщение
if message.reply_to_message is not None:
time = message.text.split(' ')[1]
#получаем id отправителя сообщение на которое отвечает message
target_id = message.reply_to_message.from_user.id
target_name = cursor.execute("SELECT user_name FROM user_list WHERE user_id = ?", (target_id,)).fetchone()[0]
#Проверка, что человек пользователь
if cursor.execute("SELECT user_role FROM user_list WHERE user_id = ?", (target_id,)).fetchone()[0] == 0:
#ограничения прав пользователя по отправке сообщений на time секунд
time_sec = await time_to_seconds(message.text.split(' ')[1])
if time_sec <= 30 or time_sec >= 31536000:
await message.reply("Время мута должно быть больше 30 секунд и меньше 365 дней")
return
date_string = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
date_time = datetime.datetime.strptime(date_string, "%Y-%m-%d %H:%M:%S")
unix_time = int(mktime(date_time.timetuple()))
await bot.restrict_chat_member(message.chat.id, target_id, until_date=unix_time+time_sec, permissions=types.ChatPermissions(can_send_messages=False))
await message.reply(
f"Пользователь {target_name} замьючен на {await short_time_to_time(time)}")
else:
await message.reply(f"Пользователь [{target_name}](tg://user?id={target_id}) является {await get_role(target_id)} и не может быть замьючен",
parse_mode='Markdown')
return
else:
target_tag = message.text.split(' ')[1]
target_tag = target_tag[1:]
target_id = int(cursor.execute("SELECT user_id FROM user_list WHERE user_name = ?", (target_tag,)).fetchone()[0])
target_name = cursor.execute("SELECT user_name FROM user_list WHERE user_id = ?", (target_id,)).fetchone()[0]
#ограничения прав пользователя по отправке сообщений на time секунд
time_mute = message.text.split(' ')[2]
if cursor.execute("SELECT user_role FROM user_list WHERE user_id = ?", (target_id,)).fetchone()[0] == 0:
#ограничения прав пользователя по отправке сообщений на time секунд
time_sec = await time_to_seconds(message.text.split(' ')[2])
if time_sec <= 30 or time_sec >= 31536000:
await message.reply("Время мута должно быть больше 30 секунд и меньше 365 дней")
return
date_string = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
date_time = datetime.datetime.strptime(date_string, "%Y-%m-%d %H:%M:%S")
unix_time = int(mktime(date_time.timetuple()))
await bot.restrict_chat_member(message.chat.id, target_id, until_date=unix_time+time_sec, permissions=types.ChatPermissions(can_send_messages=False))
await message.reply(
f"Пользователь {target_name} замьючен на {await short_time_to_time(time_mute)}.")
else:
await message.reply(f"Пользователь [{target_name}](tg://user?id={target_id}) является {await get_role(target_id)} и не может быть замьючен",
parse_mode="Markdown")
return
except:
await message.reply("Ошибка данных. Возможно пользователь ещё ничего не написал.")
@dp.message_handler(commands=['unmute'])
async def unmute(message: types.Message):
chat_id= message.chat.id
if await check_moderator(message.from_user.id):
if message.reply_to_message is not None:
target_id = message.reply_to_message.from_user.id
target_name = cursor.execute("SELECT user_name FROM user_list WHERE user_id = ?", (target_id,)).fetchone()[0]
await bot.restrict_chat_member(message.chat.id, target_id, until_date=0, permissions=types.ChatPermissions(can_send_messages=True))
await message.reply(
f"Пользователь [{target_name}](tg://user?id={target_id}) размьючен",
parse_mode="Markdown")
else:
target_tag = message.text.split(' ')[1]
target_tag = target_tag[1:]
target_id = int(cursor.execute("SELECT user_id FROM user_list WHERE user_name = ?", (target_tag,)).fetchone()[0])
target_name = cursor.execute("SELECT user_name FROM user_list WHERE user_id = ?", (target_id,)).fetchone()[0]
#получаем стандартные права чата
if cursor.execute("SELECT user_role FROM user_list WHERE user_id = ?", (target_id,)).fetchone()[0] == 0:
#возвращаем пользователю стандартные права как у новых пользователей
chat_permisson = await bot.get_chat(chat_id)
await bot.restrict_chat_member(message.chat.id, target_id, until_date=0, permissions=chat_permisson.permissions)
await message.reply(
f"Пользователь [{target_name}](tg://user?id={target_id}) размьючен",
parse_mode="Markdown")
#обрабатываем вход пользователя в чат
@dp.message_handler(content_types=['new_chat_members'])
async def new_chat_members(message: types.Message):
#проверяем что пользователь не бот
if not message.new_chat_members[0].is_bot:
#Заносим пользователя в базу данных
cursor.execute("INSERT INTO user_list VALUES (?, ?, ?, ?)", (message.new_chat_members[0].id, message.new_chat_members[0].username, 0, 0))
@dp.message_handler(commands=['chatinfo'])
async def chat_info(message: types.Message):
#Выводит информацию о чате. Название, количество пользователей, количество администраторов, количество модераторов, количество сообщений.
if await check_moderator(message.from_user.id):
chat_id = message.chat.id
chat_title = message.chat.title
chat_members = await bot.get_chat_members_count(chat_id)
chat_admins = await bot.get_chat_administrators(chat_id)
chat_moderators = cursor.execute("SELECT COUNT(user_id) FROM user_list WHERE user_role = 1").fetchone()[0]
chat_messages = cursor.execute("SELECT chat_stats FROM chat_list WHERE chat_id = ?", (chat_id,)).fetchone()[0]
await message.reply(
f"<b>Название чата:</b> {chat_title}\n"
f"<b>Количество пользователей:</b> {chat_members}\n"
f"<b>Количество администраторов:</b> {len(chat_admins)}\n"
f"<b>Количество модераторов:</b> {chat_moderators}\n"
f"<b>Количество сообщений:</b> {chat_messages}",
parse_mode="HTML")
await top10(message)
else:
await message.reply(
f"У вас недостаточно прав для выполнения этой команды.")
@dp.message_handler(commands=['start'])
async def start(message: types.Message):
#проверка что чат есть в базе данных
if cursor.execute("SELECT chat_id FROM chat_list WHERE chat_id = ?", (message.chat.id,)).fetchone() is None:
chat_id = message.chat.id
chat_role = 0
chat_stats = 1
cursor.execute("INSERT INTO chat_list VALUES (?, ?, ?)", (chat_id, chat_role, chat_stats))
database.commit()
await message.reply(
f"{config['Telegram']['start_answer']}",
parse_mode="Markdown")
else:
await message.reply(
f"Чат уже инициализирован.",
parse_mode="Markdown")
# Проверка наличия столбца имени пользователя в базе данных, если его нет, то добавить.
try:
cursor.execute("SELECT user_name FROM user_list")
except sqlite3.OperationalError:
cursor.execute("ALTER TABLE user_list ADD COLUMN user_name TEXT")
database.commit()
await message.reply("База данных пользователей реструктурирована.")
# Проверка наличия столбца репутации пользователя в базе данных, если его нет, то добавить.
try:
cursor.execute("SELECT user_rep FROM user_list")
except sqlite3.OperationalError:
cursor.execute("ALTER TABLE user_list ADD COLUMN user_rep INTEGER")
database.commit()
await message.reply("База данных пользователей реструктурирована.")
@dp.message_handler(commands=['top10'])
async def top10(message: types.Message):
#топ 10 пользователей по количеству сообщений в user_stats в формате: Имя пользователя - количество сообщений
top10 = cursor.execute("SELECT user_id, user_stats FROM user_list ORDER BY user_stats DESC LIMIT 10").fetchall()
top10_message = ''
for user in top10:
username = (cursor.execute("SELECT user_name FROM user_list WHERE user_id = ?", (user[0],)).fetchone())[0]
if username is None:
username = "Аноним"
top10_message += f"{username} - {user[1]}\n"
#в начале сообщения берём текст из config.ini и вставляем в него топ 10 пользователей
await message.reply(
f"{config['Telegram']['top10_answer']}\n{top10_message}")
@dp.message_handler(commands=['aboutme'])
async def aboutme(message: types.Message):
your_id = message.from_id
await check(your_id)
your_name = message.from_user.username
user_stats = cursor.execute("SELECT user_stats FROM user_list WHERE user_id = ?", (message.from_user.id,)).fetchone()
rolenum = cursor.execute("SELECT user_role FROM user_list WHERE user_id = ?", (message.from_user.id,)).fetchone()
user_rep = cursor.execute("SELECT user_rep FROM user_list WHERE user_id = ?", (message.from_user.id,)).fetchone()
if "None" in str(user_rep):
user_rep = 0
cursor.execute("UPDATE user_list SET user_rep = ? WHERE user_id = ?", (user_rep, your_id))
role = await get_role_name(rolenum[0])
#Имя пользователя на следующей строке статистика сообщений на следующей строке права пользователя
#если пользователь есть в списке администраторов чата, то выдаём роль администратор
chat_admins = await bot.get_chat_administrators(message.chat.id)
default_for_admin = int(config['Roles']['default_for_admin'])
no_answer = False
if rolenum[0] < default_for_admin:
for admin in chat_admins:
if admin.user.id == your_id:
if default_for_admin == 0:
await message.reply(
f"<b>Вы обнаружены в списке администраторов чата!</b> \n"
f"<b>Вам будет выдана роль:</b> {config['Roles']['user']}\n"
f"<b>Имя:</b> <a href=\"tg://user?id={str(your_id)}\">{your_name}</a>\n"
f"<b>Кол-во сообщений:</b> {user_stats[0]}\n"
f"<b>Репутация:</b> {user_rep[0]}\n"
f"<b>Роль:</b> {role}",
parse_mode="HTML")
no_answer = True
elif default_for_admin == 1:
await message.reply(
f"<b>Вы обнаружены в списке администраторов чата!</b> \n"
f"<b>Вам будет выдана роль:</b> {config['Roles']['moderator']}\n"
f"<b>Имя:</b> <a href=\"tg://user?id={str(your_id)}\">{your_name}</a>\n"
f"<b>Кол-во сообщений:</b> {user_stats[0]}\n"
f"<b>Репутация:</b> {user_rep[0]}\n"
f"<b>Роль:</b> {role}",
parse_mode="HTML")
no_answer = True
elif default_for_admin == 2:
await message.reply(
f"<b>Вы обнаружены в списке администраторов чата!</b> \n"
f"<b>Вам будет выдана роль:</b> {config['Roles']['admin']}\n"
f"<b>Имя:</b> <a href=\"tg://user?id={str(your_id)}\">{your_name}</a>\n"
f"<b>Кол-во сообщений:</b> {user_stats[0]}\n"
f"<b>Репутация:</b> {user_rep[0]}\n"
f"<b>Роль:</b> {role}",
parse_mode="HTML")
no_answer = True
cursor.execute("UPDATE user_list SET user_role = ? WHERE user_id = ?", (default_for_admin, your_id))
database.commit()
if no_answer == False:
await message.reply(
f"<b>Имя:</b> <a href=\"tg://user?id={str(your_id)}\">{your_name}</a>\n"
f"<b>Кол-во сообщений:</b> {user_stats[0]}\n"
f"<b>Репутация:</b> {user_rep[0]}\n"
f"<b>Роль:</b> {role}",
parse_mode="HTML")
@dp.message_handler(commands=['setrole'])
async def setrole(message: types.Message):
try:
user_role = cursor.execute("SELECT user_role FROM user_list WHERE user_id = ?", (message.from_user.id,)).fetchone()
user_name = message.from_user.username
target_name = message.text.split()[1]
target_name = target_name[1:]
target_role = cursor.execute("SELECT user_role FROM user_list WHERE user_name = ?", (target_name,)).fetchone()[0]
user_id = cursor.execute("SELECT user_id FROM user_list WHERE user_name = ?", (target_name,)).fetchone()
user_id = user_id[0]
except:
await message.reply("Пользователь не найден!")
return
if await check_admin(user_id, message.chat.id) and target_role == 2:
await message.reply("Вы не можете изменить роль этому пользователю!")
else:
if user_role[0] == 2:
try:
#Получаем id пользователя по нику в базе данных
role = message.text.split()[2]
if role == "0" or role == config['Roles']['user']:
role = config['Roles']['user']
rolenum = 0
elif role == "1" or role == config['Roles']['moderator']:
role = config['Roles']['moderator']
rolenum = 1
elif role == "2" or role == config['Roles']['admin']:
role = config['Roles']['admin']
rolenum = 2
cursor.execute("UPDATE user_list SET user_role = ? WHERE user_id = ?", (rolenum, user_id))
database.commit()
await message.reply(
f"Пользователю [{target_name}](tg://user?id={str(user_id)}) выдана роль: {role}",
parse_mode="Markdown")
except:
await message.reply(
f"Ошибка! Проверьте правильность написания команды.",
parse_mode="Markdown")
else:
await message.reply(
f"Ошибка! У вас нет прав на использование этой команды.",
parse_mode="Markdown")
@dp.message_handler(commands=['about'])
async def about(message: types.Message):
# проверка что в сообщении есть тег пользователя
if len(message.text.split()) == 2:
try:
target_name = message.text.split()[1]
target_name = target_name[1:]
target_id = cursor.execute("SELECT user_id FROM user_list WHERE user_name = ?", (target_name,)).fetchone()
target_id = target_id[0]
user_rep = cursor.execute("SELECT user_rep FROM user_list WHERE user_id = ?", (target_id,)).fetchone()[0]
if "None" in str(user_rep):
user_rep = 0
cursor.execute("UPDATE user_list SET user_rep = ? WHERE user_id = ?", (user_rep, target_id))
await check(target_id)
except:
await message.reply(
f"Ошибка! Проверьте правильность написания тега пользователя.",
parse_mode="Markdown")
user_stats = cursor.execute("SELECT user_stats FROM user_list WHERE user_id = ?", (target_id,)).fetchone()[0]
user_role = cursor.execute("SELECT user_role FROM user_list WHERE user_id = ?", (target_id,)).fetchone()[0]
user_role = await get_role_name(user_role)
await message.reply(
f"<b>Имя:</b> {target_name}\n"
f"<b>Кол-во сообщений:</b> {user_stats}\n"
f"<b>Репутация:</b> {user_rep}\n"
f"<b>Роль:</b> {user_role}", parse_mode="HTML")
else:
await message.reply(
f"Ошибка! Проверьте правильность написания команды.")
from src.OpenAI.GPT35turbo.OA_processing import openai_message_history_processing
@dp.message_handler(commands=['history'])
async def history(message: types.Message):
if message.reply_to_message is not None:
# Получаем id сообщения на которое отвечает message
start_message_id = message.reply_to_message.message_id
# Получаем id сообщения message
end_message_id = message.message_id
elif message.text.split()[1] != '':
end_message_id = message.message_id
start_message_id = end_message_id - int(message.text.split()[1])
else:
await message.reply("Ошибка! Проверьте правильность написания команды.")
return
temp_message = await message.reply("Подождите немного, я обрабатываю историю сообщений... Функция ЭКСПЕРЕМЕНТАЛЬНАЯ "
"и может работать некорректно.")
# Получаем историю сообщений
response = openai_message_history_processing(start_message_id, end_message_id)
if response is None:
await message.reply("Я не понял тебя, попробуй перефразировать")
else:
bot_message_id = await message.reply(response['choices'][0]['message']['content'], parse_mode="markdown")
# заносим сообщение в базу данных в качестве message_id мы пишем id сообщения в bot_message_id
cursor.execute("INSERT INTO message_list VALUES (?, ?, ?, ?)",
(bot_message_id.message_id, response['choices'][0]['message']['content'], 0, message.message_id))
# очищаем сообщение
asyncio.create_task(delete_message(temp_message, 0))
from src.OpenAI.GPT35turbo.OA_processing import openai_message_processing
async def delete_message(message: types.Message, sleep_time: int = 0):
await asyncio.sleep(sleep_time)
with suppress(MessageCantBeDeleted, MessageToDeleteNotFound):
await message.delete()
@dp.message_handler()
async def in_message(message: types.Message):
chat_id = message.chat.id
# Получение сообщений в чате, и запись их в базу данных
if (message.chat.type == "private" or message.chat.type == "channel"):
await message.reply(
f"{config['Telegram']['private_answer']}",
parse_mode="Markdown")
elif (message.chat.type != "group" or message.chat.type != "supergroup") and \
message.text != '' and message.text != ' ' and \
(cursor.execute("SELECT chat_role FROM chat_list WHERE chat_id;") == 1): return None
else:
# Запись сообщения в базу данных
await save_message(message)
# Обработка сообщения OpenAI
send_answer = False
typing_mode = False
# импортируем массив триггеров из файла .ini
if message.reply_to_message and message.reply_to_message.from_user.id == (await bot.me).id:
send_answer = True
typing_mode = True
for trigger in bot_trigger_all:
if trigger.lower() in message.text.lower():
send_answer = True
typing_mode = False
for trigger in bot_trigger_front:
if message.text.lower().startswith(trigger.lower()):
send_answer = True
typing_mode = False
if send_answer == False:
# Если сообщение отвечает на другое сообщение, то проверяем наличие слова спасибо
if message.reply_to_message is not None:
if message.text.startswith("Спасибо"):
target_id = message.reply_to_message.from_user.id
target_name = cursor.execute("SELECT user_name FROM user_list WHERE user_id = ?", (target_id,)).fetchone()[0]
cursor.execute("UPDATE user_list SET user_rep = user_rep + 1 WHERE user_id = ?", (target_id,))
database.commit()
await message.reply(
f"Спасибо, [{target_name}](tg://user?id={str(target_id)}) за ваш ответ!",
parse_mode="Markdown")
if send_answer:
if typing_mode is False:
your_id = message.from_id
your_name = message.from_user.username
temp_msg = await message.reply(
f"[{your_name}](tg://user?id={str(your_id)}), Подожди немного и я обязательно отвечу тебе!",
parse_mode="Markdown")
# Пишем что бот печатает
await bot.send_chat_action(message.chat.id, "typing")
# Получаем ответ от OpenAI
response = openai_message_processing(message.message_id)
if response is None:
bot_message_id = await message.reply("Я не понял тебя, попробуй перефразировать")
if typing_mode is False:
asyncio.create_task(delete_message(temp_msg, 0))
# заносим сообщение в базу данных в качестве message_id пишем id сообщения которое отправил бот
cursor.execute("INSERT INTO message_list VALUES (?, ?, ?, ?)",
(bot_message_id, "Я не понял тебя, попробуй перефразировать", 0, message.message_id))
else:
bot_message_id = await message.reply(response['choices'][0]['message']['content'], parse_mode="markdown")
if typing_mode is False:
asyncio.create_task(delete_message(temp_msg, 0))
# заносим сообщение в базу данных в качестве message_id мы пишем id сообщения в bot_message_id
cursor.execute("INSERT INTO message_list VALUES (?, ?, ?, ?)",
(bot_message_id.message_id, response['choices'][0]['message']['content'], 0, message.message_id))
# очищаем базу данных от старых сообщений
cursor.execute("DELETE FROM message_list WHERE message_id < ?", (bot_message_id.message_id - DB_message_limit,))
database.commit()
if __name__ == '__main__':
executor.start_polling(dp, skip_updates=True)

View File

@ -1,43 +0,0 @@
[Telegram]
token=****
admin_password="Test_pass"
# Пока не используется
# Массивы заполнять через запятую. Пример: test|test2 |test3,
bot_trigger_front=
# Живой пример: Арма |Армат |Арма, |
bot_trigger_all=
# Живой пример: @arma_ai_bot |помогите |
private_answer=
# Живой пример: Я не понимаю тебя, но я могу поговорить с тобой в группе [название группы](https://t.me/) и ещё в некоторых других группах
reply_ignore=0
# По умолчанию: 0
# Содержит в себе все id топиков чата для чатов с форумным типом, если не заполнить контекст бота СЛОМАЕТСЯ!
# Пример заполнения для одного из чатов: 0| 643885| 476959| 1| 476977| 633077| 630664| 476966| 634567
start_answer= Привет! Я OCAB, открытый чат бот с ИИ для вашего чата!
top10_answer= Вот топ 10 самых разговорчивых пользователей чата:
[Roles]
user= Пользователь
moderator= Модератор
admin= Администратор
default_for_admin= 2
[Openai]
api_key=****
chat_model=gpt-3.5-turbo
story_model=
# Тут должен быть текст истории бота, но я его не показываю)))
max_token_count=4000
# максимальное количество токенов в сообщении
min_token_for_answer=800
# минимальное количество токенов в сообщении ответа бота (чем больше, тем более длинный ответ ГАРАНТИРОВАН)
[DataBase]
message_limit=3000
# Максимальное количество сообщений в базе данных
[AI_Dungeon]
use=openai
# "openai" or "YaGPT" Пока не используется

View File

@ -1,23 +0,0 @@
TELEGRAM:
TOKEN: 1234567890ABCDEFGHIJKLMNOPQRSTUVWXYZ1234567890
PRIVATE_ANSWER:
BOT:
ANSWER:
HELP_ANSWER:
START_ANSWER:
ABOUT_ANSWER:
ROLES:
ADMIN:
MODERATOR:
USER:
DEFAULT_FOR_ADMIN:
DATABASE:
MESSAGE_LIMIT:
GPT_MODULE:
TRIGGER_FRONT:
TRIGGER_ALL:
REPLY_IGNORE:

View File

@ -1,5 +0,0 @@
from aiogram.filters import BaseFilter
from aiogram.fsm.context import FSMContext
from aiogram.types import Message
# You filters

View File

@ -1,4 +0,0 @@
from aiogram.fsm.context import FSMContext
from aiogram.types import Message
# Your handlers

View File

@ -1,3 +0,0 @@
from aiogram.types import KeyboardButton, ReplyKeyboardMarkup
# Your keyboards

View File

@ -1,3 +0,0 @@
from aiogram.fsm.context import FSMContext
# Your funcs

View File

@ -1,5 +0,0 @@
from aiogram.fsm.state import State, StatesGroup
class MyState(StatesGroup):
state1 = State()

View File

@ -1 +0,0 @@
# Here text of buttons

View File

@ -1,14 +0,0 @@
from aiogram import Dispatcher
def registry_middlewares(dp: Dispatcher):
pass
def registry_handlers(dp: Dispatcher):
pass
def registry(dp: Dispatcher):
registry_middlewares(dp)
registry_handlers(dp)

View File

@ -1,62 +0,0 @@
import openai
import configparser
from dataclasses import dataclass
@dataclass
class App:
telegram_token: str
openai_key: str
bot_trigger_front: list[str]
bot_trigger_all: list[str]
@dataclass
class Database:
path: str
message_limit: int
@dataclass
class Config:
app: App
database: Database
def _clean_bot_trigger(bot_trigger_front, bot_trigger_all):
"""удаление лишних элементов массивов"""
bot_trigger_front.remove('')
bot_trigger_all.remove('')
def get_settings():
# Импорт переменных из файла .ini
config = configparser.ConfigParser()
config.read('core/config.ini')
bot_trigger_front = (config['Telegram']['bot_trigger_front']).split('|')
bot_trigger_all = (config['Telegram']['bot_trigger_all']).split('|')
_clean_bot_trigger(bot_trigger_front, bot_trigger_all)
return Config(
app=App(
telegram_token=config['Telegram']['token'],
openai_key=config['Openai']['api_key'],
bot_trigger_front = bot_trigger_front,
bot_trigger_all = bot_trigger_all,
),
database=Database(
path="core/database/data.db",
message_limit=int(config['DataBase']['message_limit']),
),
)
def _openai_init(settings: Config):
openai.api_key = settings.app.openai_key
settings = get_settings()
_openai_init(settings)

View File

@ -1,28 +0,0 @@
from asyncio import run
from aiogram import Bot, Dispatcher
from core.utils.registry import registry
from core.utils.settings import settings
async def start(bot: Bot, dp: Dispatcher):
try:
await dp.start_polling(bot)
finally:
await bot.session.close()
# close db connection here
def main():
bot = Bot(token=settings.app.telegram_token)
dp = Dispatcher()
registry(dp)
run(start(bot, dp))
if __name__ == "__main__":
main()

View File

@ -1,43 +0,0 @@
[Telegram]
token=****
admin_password="Test_pass"
# Пока не используется
# Массивы заполнять через запятую. Пример: test|test2 |test3,
bot_trigger_front=
# Живой пример: Арма |Армат |Арма, |
bot_trigger_all=
# Живой пример: @arma_ai_bot |помогите |
private_answer=
# Живой пример: Я не понимаю тебя, но я могу поговорить с тобой в группе [название группы](https://t.me/) и ещё в некоторых других группах
reply_ignore=0
# По умолчанию: 0
# Содержит в себе все id топиков чата для чатов с форумным типом, если не заполнить контекст бота СЛОМАЕТСЯ!
# Пример заполнения для одного из чатов: 0| 643885| 476959| 1| 476977| 633077| 630664| 476966| 634567
start_answer= Привет! Я OCAB, открытый чат бот с ИИ для вашего чата!
top10_answer= Вот топ 10 самых разговорчивых пользователей чата:
[Roles]
user= Пользователь
moderator= Модератор
admin= Администратор
default_for_admin= 2
[Openai]
api_key=****
chat_model=gpt-3.5-turbo
story_model=
# Тут должен быть текст истории бота, но я его не показываю)))
max_token_count=4000
# максимальное количество токенов в сообщении
min_token_for_answer=800
# минимальное количество токенов в сообщении ответа бота (чем больше, тем более длинный ответ ГАРАНТИРОВАН)
[DataBase]
message_limit=3000
# Максимальное количество сообщений в базе данных
[AI_Dungeon]
use=openai
# "openai" or "YaGPT" Пока не используется

View File

@ -1,23 +0,0 @@
TELEGRAM:
TOKEN: 1234567890ABCDEFGHIJKLMNOPQRSTUVWXYZ1234567890
PRIVATE_ANSWER:
BOT:
ANSWER:
HELP_ANSWER:
START_ANSWER:
ABOUT_ANSWER:
ROLES:
ADMIN:
MODERATOR:
USER:
DEFAULT_FOR_ADMIN:
DATABASE:
MESSAGE_LIMIT:
GPT_MODULE:
TRIGGER_FRONT:
TRIGGER_ALL:
REPLY_IGNORE:

View File

@ -1 +0,0 @@

View File

@ -1,21 +0,0 @@
import os
from yaml import load, Loader
class Config:
def __init__(self):
self.bot_dir = os.path.dirname(os.path.dirname(os.path.dirname(os.getcwd())))
self.config_dir = os.path.join(self.bot_dir, 'config.yaml')
def get_config(self):
return load(open(self.config_dir), Loader=Loader)
def get_bot_token(self):
return self.get_config()['BOT']['TOKEN']
def get_roles(self):
return self.get_config()['ROLES']
def get_bot_answers(self):
return self.get_config()['BOT']['ANSWER']

View File

@ -1,131 +0,0 @@
import os
import sqlite3
class DataBase:
def __init__(self):
self.bot_dir = os.path.dirname(os.path.dirname(os.path.dirname(os.getcwd())))
self.db = sqlite3.connect(os.path.join(self.bot_dir, 'DataBase/OCAB_DB.db'))
self.cursor = self.db.cursor()
# Проверки наличия таблиц в базе данных
def check_db(self):
self.check_chatdb()
self.check_userdb()
self.check_messagedb()
def check_chatdb(self):
self.cursor.execute("""SELECT name FROM sqlite_master WHERE type='table' AND name='chat_list'""")
if self.cursor.fetchone() is None:
self.create_chatdb()
def check_userdb(self):
self.cursor.execute("""SELECT name FROM sqlite_master WHERE type='table' AND name='user_list'""")
if self.cursor.fetchone() is None:
self.create_userdb()
def check_messagedb(self):
self.cursor.execute("""SELECT name FROM sqlite_master WHERE type='table' AND name='message_list'""")
if self.cursor.fetchone() is None:
self.create_messagedb()
# Создание таблиц в базе данных
def create_chatdb(self):
self.cursor.execute("""CREATE TABLE IF NOT EXISTS chat_list (
chat_id INTEGER PRIMARY KEY,
chat_role INTEGER NOT NULL,
chat_stats INTEGER NOT NULL,
chat_federation INTEGER,
)""")
self.db.commit()
def create_userdb(self):
self.cursor.execute("""CREATE TABLE IF NOT EXISTS user_list (
user_id INTEGER PRIMARY KEY,
user_name TEXT NOT NULL,
user_tag TEXT,
user_role INTEGER,
user_stats INTEGER
user_rep INTEGER
)""")
self.db.commit()
def create_messagedb(self):
self.cursor.execute("""CREATE TABLE IF NOT EXISTS message_list (
message_id INTEGER PRIMARY KEY,
message_text TEXT NOT NULL,
message_id_sender INTEGER NOT NULL,
answer_to_id INTEGER
)""")
self.db.commit()
async def add_chat(self, chat_id, chat_role, chat_stats=0, chat_federation=0):
self.cursor.execute("""INSERT INTO chat_list VALUES (?, ?, ?, ?)""",
(chat_id, chat_role, chat_stats, chat_federation))
async def add_user(self, user_id, user_name, user_tag=None, user_role=0, user_stats=0, user_rep=0):
self.cursor.execute("""INSERT INTO user_list VALUES (?, ?, ?, ?, ?, ?)""",
(user_id, user_name, user_tag, user_role, user_stats, user_rep))
self.db.commit()
async def add_message(self, message_id, message_text, message_sender, answer_id):
self.cursor.execute("""INSERT INTO message_list VALUES (?, ?, ?, ?)""",
(message_id, message_text, message_sender, answer_id))
self.db.commit()
async def get_chat(self, chat_id):
self.cursor.execute("""SELECT * FROM chat_list WHERE chat_id=?""", (chat_id,))
return self.cursor.fetchone()
async def get_user(self, user_id):
self.cursor.execute("""SELECT * FROM user_list WHERE user_id=?""", (user_id,))
return self.cursor.fetchone()
async def get_user_role(self, user_id):
self.cursor.execute("""SELECT user_role FROM user_list WHERE user_id=?""", (user_id,))
return self.cursor.fetchone()
async def get_message(self, message_id):
self.cursor.execute("""SELECT * FROM message_list WHERE message_id=?""", (message_id,))
return self.cursor.fetchone()
async def change_user_name(self, user_id, new_user_name):
self.cursor.execute("""UPDATE user_list SET user_name=? WHERE user_id=?""", (new_user_name, user_id))
self.db.commit()
async def change_user_role(self, user_id, new_user_role):
self.cursor.execute("""UPDATE user_list SET user_role=? WHERE user_id=?""", (new_user_role, user_id))
self.db.commit()
async def change_user_stats(self, user_id, new_user_stats):
self.cursor.execute("""UPDATE user_list SET user_stats=? WHERE user_id=?""", (new_user_stats, user_id))
self.db.commit()
async def change_user_rep(self, user_id, new_user_rep):
self.cursor.execute("""UPDATE user_list SET user_rep=? WHERE user_id=?""", (new_user_rep, user_id))
self.db.commit()
async def change_chat_role(self, chat_id, new_chat_role):
self.cursor.execute("""UPDATE chat_list SET chat_role=? WHERE chat_id=?""", (new_chat_role, chat_id))
self.db.commit()
async def change_chat_stats(self, chat_id, new_chat_stats):
self.cursor.execute("""UPDATE chat_list SET chat_stats=? WHERE chat_id=?""", (new_chat_stats, chat_id))
self.db.commit()
async def change_chat_federation(self, chat_id, new_chat_federation):
self.cursor.execute("""UPDATE chat_list SET chat_federation=? WHERE chat_id=?""",
(new_chat_federation, chat_id))
self.db.commit()
async def update_user_stats(self):
self.cursor.execute("""UPDATE user_list SET user_stats=user_stats+1""")
self.db.commit()
async def update_user_rep(self):
self.cursor.execute("""UPDATE user_list SET user_rep=user_rep+1""")
self.db.commit()
async def update_chat_stats(self):
self.cursor.execute("""UPDATE chat_list SET chat_stats=chat_stats+1""")
self.db.commit()

View File

@ -1,33 +0,0 @@
from Config import Config
from DataBase import DataBase
class Roles:
def __init__(self):
self.DB = DataBase()
self.Config = Config()
self.user_role_name = self.Config.get_roles()['USER']
self.moderator_role_name = self.Config.get_roles()['MODERATOR']
self.admin_role_name = self.Config.get_roles()['ADMIN']
async def check_admin_permission(self, user_id):
if await self.DB.get_user_role(user_id) == 2:
return True
else:
return False
async def check_moderator_permission(self, user_id):
if await self.DB.get_user_role(user_id) == 1:
return True
else:
return False
async def get_role_name(self, role_number):
if role_number == 0:
return self.user_role_name
elif role_number == 1:
return self.moderator_role_name
elif role_number == 2:
return self.admin_role_name
else:
return None