-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathmain.py
165 lines (139 loc) · 5.95 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
import requests
import random
import os
import io
import time
import sys
import subprocess
import json
import importlib
import inspect
from concurrent.futures import ThreadPoolExecutor, as_completed
from proxy_provider import ProxyProvider
from proxy_providers import *
from tqdm import tqdm
# Payload fetched through each proxy to time its download speed; test_proxy()
# rejects responses whose content-length header is not 5242880 bytes.
SPEEDTEST_URL = "http://212.183.159.230/5MB.zip"
def is_valid_proxy(proxy):
    """Tell whether a proxy entry should be kept for testing.

    An entry qualifies when its "host" value is not None and its "country"
    value is anything other than "Russia".
    """
    if proxy.get("host") is None:
        return False
    return proxy.get("country") != "Russia"
def construct_proxy_string(proxy):
    """Build the "host:port" address of a proxy entry.

    When the entry has a truthy "username", the address is prefixed with
    "username:password@".
    """
    credentials = ""
    if proxy.get("username"):
        credentials = f'{proxy["username"]}:{proxy["password"]}@'
    return f'{credentials}{proxy["host"]}:{proxy["port"]}'
def test_proxy(proxy):
    """Time a partial download of the speed-test file through a proxy.

    Args:
        proxy: Proxy entry (dict) accepted by construct_proxy_string().

    Returns:
        A new dict holding "time" (the duration reported by
        download_with_progress()) merged with the entry's own keys, or None
        when the request fails, the status is an error, or the
        content-length header is missing, malformed, or not 5242880.
    """
    proxy_str = construct_proxy_string(proxy)
    start_time = time.perf_counter()
    try:
        # The download stops long before the body ends, so the streamed
        # response must be closed explicitly to release its connection.
        with requests.get(
            SPEEDTEST_URL,
            stream=True,
            proxies={"http": f"http://{proxy_str}"},
            timeout=5,
        ) as response:
            response.raise_for_status()  # Ensure we raise an error for bad responses
            total_length = response.headers.get("content-length")
            try:
                expected_size = int(total_length)
            except (TypeError, ValueError):
                # Header absent or not a number: the proxy did not serve the file.
                return None
            if expected_size != 5242880:
                return None
            with io.BytesIO() as f:
                download_time, _ = download_with_progress(
                    response, f, total_length, start_time
                )
    except requests.RequestException:
        return None
    return {"time": download_time, **proxy}  # Include original proxy info
def download_with_progress(response, f, total_length, start_time):
    """Stream part of a response into `f` while watching the throughput.

    Progress is measured in thirtieths of `total_length`. Reading stops as
    soon as progress equals 6. While progress is above 3, the download is
    abandoned if the average rate since `start_time` is under 100000 bytes
    per second.

    Returns:
        (seconds, byte_count): seconds elapsed since `start_time` rounded to
        two decimals, or float("inf") when the download was abandoned as too
        slow, together with the number of bytes written to `f`.
    """
    received = 0
    for piece in response.iter_content(1024):
        received += len(piece)
        f.write(piece)
        progress = int(30 * received / int(total_length))
        if progress == 6:
            break
        if progress <= 3:
            continue
        elapsed = time.perf_counter() - start_time
        rate = received // elapsed / 100000
        if rate < 1.0:
            return float("inf"), received
    return round(time.perf_counter() - start_time, 2), received
def save_proxies_to_file(proxies, filename="proxy.json"):
    """Write `proxies` as 4-space-indented JSON beside this script.

    `filename` is joined onto the directory containing this module; an
    existing file at that location is overwritten.
    """
    script_dir = os.path.dirname(__file__)
    target = os.path.join(script_dir, filename)
    with open(target, "w") as out:
        json.dump(proxies, out, indent=4)
def get_best_proxies(providers):
    """Return the five fastest working proxies gathered from all providers.

    Args:
        providers: Iterable of objects exposing fetch_proxies(), which
            returns proxy entries (dicts).

    Returns:
        At most five result dicts produced by test_proxy(), ordered by
        ascending "time". A provider whose fetch fails is reported and
        skipped; a proxy whose test raises is reported and skipped.
    """
    all_proxies = []
    for provider in providers:
        provider_name = provider.__class__.__name__
        try:
            print(f"Fetching proxies from {provider_name}")
            # Build the filtered list first so a failure midway adds nothing.
            all_proxies.extend(
                [proxy for proxy in provider.fetch_proxies() if is_valid_proxy(proxy)]
            )
        except Exception as e:
            print(f"Failed to fetch proxies from {provider_name}: {e}")
    best_proxies = []
    with ThreadPoolExecutor(max_workers=2) as executor:
        futures = {executor.submit(test_proxy, proxy): proxy for proxy in all_proxies}
        for future in tqdm(as_completed(futures), total=len(futures), desc="Testing proxies", bar_format="{l_bar}{bar}| {n_fmt}/{total_fmt} [{elapsed}<{remaining}, {rate_noinv_fmt}]", unit=' proxies', unit_scale=True, ncols=80):
            try:
                result = future.result()
            except Exception as e:
                # test_proxy() only absorbs request errors; a malformed entry
                # (e.g. one without "port") must not abort the whole run.
                tqdm.write(f"Skipping proxy {futures[future].get('host')}: {e}")
                continue
            if result is not None:
                best_proxies.append(result)
    return sorted(best_proxies, key=lambda x: x["time"])[:5]
def update_proxies():
    """Rebuild the proxy list and save the fastest proxies.

    Imports every Python module (other than __init__.py) found in the
    "proxy_providers" directory next to this script, instantiates the first
    class each module itself defines (by name order, excluding any class
    named "ProxyProvider"), then passes the instances to get_best_proxies()
    and stores the result with save_proxies_to_file().
    """
    providers = []
    providers_dir = os.path.join(os.path.dirname(__file__), "proxy_providers")
    for filename in os.listdir(providers_dir):
        # Only regular provider modules are of interest.
        if not filename.endswith(".py") or filename == "__init__.py":
            continue
        module = importlib.import_module(f"proxy_providers.{filename[:-3]}")
        # getmembers() also lists classes the module merely imported; keep
        # only those defined in the module so a helper import is never
        # mistaken for the provider.
        provider_classes = [
            cls
            for name, cls in inspect.getmembers(module, inspect.isclass)
            if name != "ProxyProvider" and cls.__module__ == module.__name__
        ]
        if not provider_classes:
            print(f"No provider class found in {module.__name__}, skipping")
            continue
        providers.append(provider_classes[0]())
    best_proxies = get_best_proxies(providers)
    save_proxies_to_file(best_proxies)
    print("All done.")
def run_yt_dlp():
    """Run yt-dlp through randomly chosen saved proxies until one succeeds.

    The proxy list is read from "proxy.json" in this script's directory,
    the location save_proxies_to_file() writes to by default. If the file
    is missing, update_proxies() is run and the read is retried. If the
    list is empty, a message is printed and the function returns. After a
    failed attempt another random proxy is tried, without a retry limit.
    """
    # Resolve the path next to the script rather than the current directory;
    # otherwise a run from elsewhere never finds the freshly saved file.
    proxy_file = os.path.join(os.path.dirname(__file__), "proxy.json")
    while True:
        # Keep the try body minimal so that only a missing proxy list, not a
        # missing "tempout", triggers an update.
        try:
            with open(proxy_file, "r") as f:
                proxies = json.load(f)
        except FileNotFoundError:
            print("'proxy.json' not found. Starting proxy list update...")
            update_proxies()
            continue
        if not proxies:
            print("No working proxies in 'proxy.json'. Run the 'update' command and try again.")
            return
        proxy = random.choice(proxies)
        proxy_str = construct_proxy_string(proxy)
        print(
            f"Using proxy from {proxy.get('city', 'unknown')}, "
            f"{proxy.get('country', 'unknown')}"
        )
        if execute_yt_dlp_command(proxy_str):
            os.remove("tempout")
            break  # Exit loop if command was successful
        print("Got 'Sign in to confirm' error. Trying again with another proxy...")
def execute_yt_dlp_command(proxy_str):
    """Run yt-dlp through the given proxy and report whether it succeeded.

    Every element of sys.argv is forwarded to yt-dlp, so the caller is
    expected to have removed the script name beforehand. The combined
    stdout/stderr is shown on the terminal and copied to "tempout" in the
    current directory via `tee`.

    Args:
        proxy_str: Proxy address as produced by construct_proxy_string().

    Returns:
        True when the captured output contains neither "Sign in to" nor
        "403", otherwise False.
    """
    import shlex

    # Quote every piece: URLs routinely contain '&' and '?', which the shell
    # would otherwise interpret instead of passing through to yt-dlp.
    forwarded_args = " ".join(shlex.quote(str(arg)) for arg in sys.argv)
    proxy_url = shlex.quote(f"http://{proxy_str}")
    command = (
        f"yt-dlp --color always --proxy {proxy_url} {forwarded_args} "
        "2>&1 | tee tempout"
    )
    subprocess.run(command, shell=True)
    # Undecodable bytes in the output must not crash the log check.
    with open("tempout", "r", errors="replace") as log_fl:
        log_text = log_fl.read()
    # NOTE(review): the bare "403" check matches any occurrence of those
    # digits in the output, not only an HTTP 403 error - confirm intent.
    return "Sign in to" not in log_text and "403" not in log_text
def main():
    """Dispatch on the command-line arguments.

    Runs the proxy update when "update" appears anywhere in sys.argv,
    prints the usage text when no argument was given, and otherwise drops
    the script name from sys.argv and starts yt-dlp. A KeyboardInterrupt
    during any of these is reported instead of propagating.
    """
    usage_text = "usage: main.py update | <yt-dlp args> \nScript for starting yt-dlp with best free proxy\nCommands:\n update Update best proxy"
    try:
        if "update" in sys.argv:
            update_proxies()
            return
        if len(sys.argv) < 2:
            print(usage_text)
            return
        # The remaining entries are forwarded to yt-dlp as-is.
        del sys.argv[0]
        run_yt_dlp()
    except KeyboardInterrupt:
        print("Canceled by user")
# Run the command-line entry point only when executed as a script.
if __name__ == "__main__":
    main()