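"""
tg-scraper.py

Scrape posts from a public Telegram channel with snscrape and save them as a
compressed parquet file in the 'data' folder. Requires the snscrape and
pyarrow packages.
"""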
from datetime import datetime
import time
import random
from sys import exit
import os
import snscrape.modules.telegram as tg
from pyarrow import Table
from pyarrow.parquet import write_table
# Check if 'data' folder exists, else create one
if not os.path.exists('data'):
    os.makedirs('data')
# Channel name variable
CHANNEL_NAME = input("\n🔸 Enter a channel name to scrape, use the XXXX part in 'https://web.telegram.org/k/#@XXXX': ").strip()
# Check for input
if not CHANNEL_NAME:
    exit('\n🔴 A channel name was not provided. Exiting program.')
# Default date values
_default_start = f"{datetime.today().year}-01-01"
_default_finish = f"{datetime.today().year}-12-31"
# Lower date boundary. Posts before it are not included
START_DATE = input("🔸 Enter the first date to begin scraping from, in YYYY-MM-DD format (beginning of the current year by default): ").strip() or _default_start
# Upper date boundary. Posts after it are not included
FINISH_DATE = input("🔸 Enter the last date to scrape, in YYYY-MM-DD format (end of the current year by default): ").strip() or _default_finish
# Include post contents in the output file if 'y', else mask posts with '#####'
VERBOSE = (input("🔸 Do you want to include post contents in the output file? y / n ('yes' by default): ").strip() or 'y').lower()
# Validate the answer up front so the script does not fail mid-scrape
if VERBOSE not in ('y', 'n'):
    exit('\n🔴 Received an invalid response. Exiting program.')
# Maximum delay between requests
MAX_SLEEP = 0.1 # seconds
def validate_choice():
    """
    Ask the user to confirm their choice before proceeding with scraping posts.

    Returns:
        None
    """
    print(f"\nYou are about to scrape all {CHANNEL_NAME} posts from {START_DATE} to {FINISH_DATE}.")
    choice = input('🔸 Are you sure you want to continue? y / n: ').strip().lower()
    if choice == 'n':
        exit('Exiting program')
    elif choice != 'y':
        exit('\n🔴 Received an invalid response. Exiting program.')
def scrape_channel(channel_name=CHANNEL_NAME, start_date=START_DATE, finish_date=FINISH_DATE, max_sleep=MAX_SLEEP, verbose=VERBOSE):
    """
    Scrape a specified Telegram channel for posts.

    Parameters:
    - channel_name (str): The name of the Telegram channel to scrape.
      Defaults to the name entered at the prompt.
    - start_date (str): The date from which to start scraping posts,
      in 'YYYY-MM-DD' format.
    - finish_date (str): The date up to which (inclusive) posts will be
      scraped, in 'YYYY-MM-DD' format.
    - max_sleep (float): The maximum number of seconds to wait between
      requests to avoid overloading the server or getting blocked. The
      function waits a random number of seconds between 0 and max_sleep
      before each request. Defaults to MAX_SLEEP.
    - verbose (str): If 'n', post content is replaced with '#####'.
      Defaults to 'y'.

    Returns:
    - A list of posts from the specified Telegram channel. Each post is
      represented as a dictionary with keys for its attributes
      (post_id, post_url, date, content).
    """
    # Display target channel name
    print(f"\n🧲 Target Telegram channel >>> '{channel_name}'\n")
    # Create a Telegram channel scraper
    channel = tg.TelegramChannelScraper(channel_name)
    # Empty list to be filled with one dictionary per post
    raw = []
    # Convert date strings to datetime objects
    start_datetime_object = datetime.strptime(start_date, '%Y-%m-%d')
    finish_datetime_object = datetime.strptime(finish_date, '%Y-%m-%d')
    # Start the timer
    start_time = time.time()
    elapsed_time_list = []
    # Iterate over posts (snscrape yields the newest posts first)
    for post in channel.get_items():
        post_day = post.date.date()
        # Skip posts newer than the upper date boundary
        if post_day > finish_datetime_object.date():
            continue
        # Once a post is older than the lower boundary, every remaining
        # post is older still, so stop iterating
        if post_day < start_datetime_object.date():
            break
        # Display contents of the first matching post
        if not raw:
            print('🔸 Output sample:')
            print(f"◻️ Post #{post.url.split('/')[-1]}:")
            print(f"◻️ URL: {post.url}")
            print(f"◻️ Date: {post.date}")
            print(f"◻️ Content: {post.content[:50]}...")
            print(f"\n⏰ Scraping posts between {start_date} and {finish_date} with random delay up to {max_sleep} seconds...\n")
        # Mask post contents in the output file unless verbose is 'y'
        content = post.content if verbose == 'y' else '#####'
        # Append posts that satisfy the conditions
        raw.append({
            'post_id': post.url.split('/')[-1],  # the last part of a split URL is the post number (id)
            'post_url': post.url,
            'date': post.date,
            'content': content
        })
        # Record when the post was stored, relative to the start of the loop
        elapsed_time_list.append(time.time() - start_time)
        # Introduce a random delay between 0 and max_sleep seconds
        time.sleep(random.uniform(0, max_sleep))
    # Total loop time
    elapsed_time_total = time.time() - start_time

    def post_download_speed():
        """
        Return the approximate number of appended posts per minute.

        Returns:
            int
        """
        # At least two timestamps are needed to estimate a rate
        if len(elapsed_time_list) < 2:
            return 0
        # Calculate differences between consecutive timestamps
        differences = [elapsed_time_list[i] - elapsed_time_list[i - 1] for i in range(1, len(elapsed_time_list))]
        # Calculate the mean time per post
        mean_sleep = sum(differences) / len(differences)
        # Approximate speed: 60 seconds / mean time per post = posts per minute
        return int(60 / mean_sleep)

    print(f"✔️ Successfully scraped {len(raw)} posts from the '{channel_name}' Telegram channel.")
    print(f"🔹 Script took {elapsed_time_total:.0f} seconds to run.")
    print(f"🔹 The approximate scraping speed was {post_download_speed()} posts per minute.")
    return raw
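# scrape_channel can also be called directly with explicit arguments rather
# than the prompted defaults, e.g. (the channel name here is just an example):
#
#   posts = scrape_channel(channel_name='durov', start_date='2023-01-01',
#                          finish_date='2023-06-30', max_sleep=0.5, verbose='n')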
def main():
    """
    Convert the raw list of dictionaries to a pyarrow table and write it to
    a compressed parquet file. Additionally, create a custom output file
    name based on the input parameters.

    Returns:
        None
    """
    # Write scraped data into a pyarrow table
    table = Table.from_pylist(scrape_channel())
    print(f"\n🔹 The dataset has {table.shape[0]} rows and {table.shape[1]} columns")
    # Create an output file name
    output_name = f"tg-posts-{CHANNEL_NAME}-{START_DATE}-{FINISH_DATE}.parquet.gzip"
    # Export the pyarrow table as a gzip-compressed parquet file; pass the
    # compression explicitly so the file matches its name (write_table
    # defaults to snappy otherwise)
    write_table(table, os.path.join('data', output_name), compression='gzip')
    print(f"\n🔽 Dataset saved in 'data' folder as '{output_name}'")
if __name__ == '__main__':
    validate_choice()
    main()
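# To inspect the saved dataset afterwards, something like the following works,
# assuming pandas is installed (the file name below is a placeholder):
#
#   import pandas as pd
#   df = pd.read_parquet('data/tg-posts-<channel>-<start>-<finish>.parquet.gzip')
#   print(df.head())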