bot.py
import openai
import os
import re
import trafilatura
from PyPDF2 import PdfReader
from concurrent.futures import ThreadPoolExecutor
from tqdm import tqdm
from telegram.ext import CommandHandler, MessageHandler, filters, ApplicationBuilder
from youtube_transcript_api import YouTubeTranscriptApi

telegram_token = ""
apikey = ""
model = ""
lang = ""
chunk_size = 1500
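
# The placeholders above must be filled in before running the bot. A minimal
# sketch (an assumption, not part of the original code) of reading them from
# environment variables instead of hard-coding secrets:
#
#     telegram_token = os.getenv("TELEGRAM_TOKEN", "")
#     apikey = os.getenv("OPENAI_API_KEY", "")
#     model = os.getenv("OPENAI_MODEL", "gpt-3.5-turbo")   # hypothetical default
#     lang = os.getenv("SUMMARY_LANG", "Russian")          # language of the final summary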


def split_user_input(text):
    # Split the input text into paragraphs
    paragraphs = text.split('\n')
    # Remove empty paragraphs and trim whitespace
    paragraphs = [paragraph.strip() for paragraph in paragraphs if paragraph.strip()]
    return paragraphs


def scrape_text_from_url(url):
    """
    Extract the readable content of a web page
    """
    article_content = []  # initialised so the function still returns a list if extraction fails
    try:
        downloaded = trafilatura.fetch_url(url)
        text = trafilatura.extract(downloaded, include_formatting=True)
        if text is None:
            return []
        text_chunks = text.split("\n")
        article_content = [text for text in text_chunks if text]
    except Exception as e:
        print(f"Error: {e}")
    return article_content
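
# summarize() below implements a simple map-reduce over the text: the input
# paragraphs are packed into roughly chunk_size-character chunks, each chunk is
# condensed in parallel via call_gpt_api, and the partial summaries are either
# merged in one final call (when there are at most 5 of them) or fed back into
# summarize() recursively. This note only describes the code as written; the
# threshold of 5 and the 1500-character chunk_size are the author's choices.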


def summarize(text_array):
    """
    Summarize the text using the GPT API
    """
    def create_chunks(paragraphs):
        chunks = []
        chunk = ''
        for paragraph in paragraphs:
            if len(chunk) + len(paragraph) < chunk_size:
                chunk += paragraph + ' '
            else:
                chunks.append(chunk.strip())
                chunk = paragraph + ' '
        if chunk:
            chunks.append(chunk.strip())
        return chunks

    try:
        text_chunks = create_chunks(text_array)
        text_chunks = [chunk for chunk in text_chunks if chunk]  # Remove empty chunks

        # Call the GPT API in parallel to summarize the text chunks
        summaries = []
        with ThreadPoolExecutor() as executor:
            futures = [executor.submit(call_gpt_api, f"Резюмируйте следующий текст, используя вдвое меньшее количество слов:\n{chunk}") for chunk in text_chunks]
            for future in tqdm(futures, total=len(text_chunks), desc="Summarizing"):
                summaries.append(future.result())

        if len(summaries) <= 5:
            summary = ' '.join(summaries)
            with tqdm(total=1, desc="Final summarization") as progress_bar:
                final_summary = call_gpt_api(f"Пожалуйста, обобщите следующий текст в виде списка в формате markdown в {lang}, обеспечение того, чтобы терминология оставалась непереведенной:\n{summary}")
                progress_bar.update(1)
            return final_summary
        else:
            # Too many partial summaries: summarize them again recursively
            return summarize(summaries)
    except Exception as e:
        print(f"Error: {e}")
        return "Неизвестная ошибка! Пожалуйста, свяжитесь с разработчиком @whitehodok."


def extract_youtube_transcript(youtube_url):
    try:
        # Accept both youtube.com/watch?v=<id> and youtu.be/<id> links
        # (the original split on 'v=' alone, which fails for youtu.be short links)
        match = re.search(r"(?:v=|youtu\.be/)([\w-]+)", youtube_url)
        if match is None:
            return "no transcript"
        video_id = match.group(1)
        transcript_list = YouTubeTranscriptApi.list_transcripts(video_id)
        transcript = transcript_list.find_transcript(['en', 'ja', 'ko', 'de', 'fr', 'ru', 'zh-TW', 'zh-CN'])
        transcript_text = ' '.join([item['text'] for item in transcript.fetch()])
        return transcript_text
    except Exception as e:
        print(f"Error: {e}")
        return "no transcript"


def retrieve_yt_transcript_from_url(youtube_url):
    output = extract_youtube_transcript(youtube_url)
    if output == 'no transcript':
        raise ValueError("Тут нет русских субтитров для видео!(это не ваша вина)")
    # Split the transcript into chunks no larger than chunk_size,
    # breaking on whitespace so that words stay intact
    output_sentences = output.split(' ')
    output_chunks = []
    current_chunk = ""
    for sentence in output_sentences:
        if len(current_chunk) + len(sentence) + 1 <= chunk_size:
            current_chunk += sentence + ' '
        else:
            output_chunks.append(current_chunk.strip())
            current_chunk = sentence + ' '
    if current_chunk:
        output_chunks.append(current_chunk.strip())
    return output_chunks
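
# call_gpt_api() below uses the pre-1.0 interface of the openai package
# (openai.ChatCompletion.create with a module-level api_key). If the newer
# openai>=1.0 client is installed, the equivalent call would look roughly like
# this sketch (an assumption about the environment, not part of the original code):
#
#     client = openai.OpenAI(api_key=apikey)
#     response = client.chat.completions.create(
#         model=model,
#         messages=[{"role": "user", "content": prompt}],
#     )
#     message = response.choices[0].message.content.strip()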


def call_gpt_api(prompt):
    """
    Call the GPT API to summarize text or extract its key points
    """
    try:
        openai.api_key = apikey
        response = openai.ChatCompletion.create(
            model=model,
            messages=[{"role": "user", "content": prompt}],
        )
        message = response.choices[0].message.content.strip()
        return message
    except Exception as e:
        print(f"Error: {e}")
        return ""


async def start(update, context):
    try:
        await context.bot.send_message(chat_id=update.effective_chat.id, text="Я могу обобщить для вас текст, URL, PDF и видео на YouTube.")
    except Exception as e:
        print(f"Error: {e}")


async def help(update, context):
    try:
        await context.bot.send_message(chat_id=update.effective_chat.id, text="Сообщайте о багах здесь. 👉 https://github.com/whitehodok/gigasum")
    except Exception as e:
        print(f"Error: {e}")


async def handle_summarize(update, context):
    chat_id = update.effective_chat.id
    message_id = update.message.message_id
    try:
        user_input = update.message.text
        print(user_input)
        youtube_pattern = re.compile(r"https?://(www\.|m\.)?(youtube\.com|youtu\.be)/")
        url_pattern = re.compile(r"https?://")

        if youtube_pattern.match(user_input):
            text_array = retrieve_yt_transcript_from_url(user_input)
        elif url_pattern.match(user_input):
            text_array = scrape_text_from_url(user_input)
        else:
            text_array = split_user_input(user_input)

        print(text_array)
        if not text_array:
            raise ValueError("Не найдено содержимого для конспектирования")

        # The Bot API expects the lowercase action name ("typing")
        await context.bot.send_chat_action(chat_id=chat_id, action="typing")
        summary = summarize(text_array)
        await context.bot.send_message(chat_id=chat_id, text=f"{summary}", reply_to_message_id=message_id)
    except Exception as e:
        print(f"Error: {e}")
        await context.bot.send_message(chat_id=chat_id, text=str(e))
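
# handle_file() downloads the incoming PDF to disk, extracts the text of each
# page with PyPDF2 and then removes the temporary file. PyPDF2's extract_text()
# can return empty strings for scanned (image-only) pages; such pages simply
# contribute nothing to the summary, since summarize() drops empty chunks.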


async def handle_file(update, context):
    chat_id = update.effective_chat.id
    message_id = update.message.message_id
    file_path = f"{update.message.document.file_unique_id}.pdf"
    try:
        file = await context.bot.get_file(update.message.document)
        await file.download_to_drive(file_path)

        text_array = []
        reader = PdfReader(file_path)
        for page_num in range(len(reader.pages)):
            page = reader.pages[page_num]
            text = page.extract_text()
            text_array.append(text)
        print(file_path)

        # The Bot API expects the lowercase action name ("typing")
        await context.bot.send_chat_action(chat_id=chat_id, action="typing")
        summary = summarize(text_array)
        await context.bot.send_message(chat_id=chat_id, text=f"{summary}", reply_to_message_id=message_id)
    except Exception as e:
        print(f"Error: {e}")
    # Clean up the downloaded file whether or not summarization succeeded
    try:
        os.remove(file_path)
    except Exception as e:
        print(f"Error: {e}")


def main():
    try:
        application = ApplicationBuilder().token(telegram_token).build()

        start_handler = CommandHandler('start', start)
        help_handler = CommandHandler('help', help)
        summarize_handler = MessageHandler(filters.TEXT & ~filters.COMMAND, handle_summarize)
        file_handler = MessageHandler(filters.Document.PDF, handle_file)

        application.add_handler(file_handler)
        application.add_handler(start_handler)
        application.add_handler(help_handler)
        application.add_handler(summarize_handler)

        application.run_polling()
    except Exception as e:
        print(e)


if __name__ == '__main__':
    main()
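
# To run the bot (assuming the configuration values at the top of the file have
# been filled in): python bot.py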