-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathdbseeker.py
170 lines (125 loc) · 6.07 KB
/
dbseeker.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
import mysql.connector
from tabulate import tabulate
import argparse
import time
SEARCH_PADDING_CHARS = 25
PREPEND_APPEND_SEARCH_CHARS = "..."
def connect_to_mysql_server(host, port, user, password: str = None):
connection = mysql.connector.connect(user=user, host=host, port=port, password=password)
cursor = connection.cursor()
return connection, cursor
def get_databases(cursor, fallback_databases: list[str] = None):
if fallback_databases:
return fallback_databases
cursor.execute("SHOW DATABASES")
databases = [database[0] for database in cursor.fetchall()]
return databases
def filter_databases(databases, blacklisted_databases):
filtered_databases = [db for db in databases if db not in blacklisted_databases]
return filtered_databases
def row_map(row, search_term):
_row = []
for item in row:
_item = str(item)
search_begin_str = _item.lower().find(search_term.lower())
if search_begin_str > -1:
search_after_str = search_begin_str + len(search_term)
item_begin_str = item_after_str = SEARCH_PADDING_CHARS
if search_begin_str < item_begin_str:
item_begin_str = search_begin_str
if (len(_item) - search_after_str) < item_after_str:
item_after_str = len(_item) - search_after_str
padding_left = search_begin_str - item_begin_str
padding_right = search_after_str + item_after_str
prepend = append = ""
if padding_left > 0:
prepend = PREPEND_APPEND_SEARCH_CHARS
if padding_right < len(_item):
append = PREPEND_APPEND_SEARCH_CHARS
_item = '\033[32m' + prepend + _item[padding_left:padding_right] + append + '\033[0m'
else:
if len(_item) > 5:
_item = _item[0:5] + PREPEND_APPEND_SEARCH_CHARS
_row.append(_item)
return _row
def search_tables(databases, cursor, search_term):
total_row_count = 0
for database in databases:
cursor.execute(f"USE `{database}`")
cursor.execute("SHOW TABLES")
tables = [table[0] for table in cursor.fetchall()]
table_count = len(tables)
if table_count == 0:
print(f"No tables found in {database}.")
continue
for table_name in tables:
try:
cursor.execute(f"SHOW COLUMNS FROM `{table_name}`")
columns = [column[0] for column in cursor.fetchall()]
total_columns = len(columns)
query = f"SELECT * FROM `{table_name}` WHERE 1=1 AND "
for i, column_name in enumerate(columns):
query += f"`{column_name}` LIKE '%{search_term}%' "
if i + 1 < total_columns:
query += "OR "
cursor.execute(query)
rows = [row_map(list(row), search_term) for row in cursor.fetchall()]
row_count = len(rows)
if row_count == 0:
continue
total_row_count += row_count
print(f"Found {row_count} rows for {table_name} (DB: {database}):\n")
print(tabulate(rows, headers=columns, tablefmt="double_grid"))
except mysql.connector.errors.ProgrammingError as error:
print(f"Programming Error: {error}\n")
continue
except Exception as error:
print(f"General Error: {error}\n")
continue
return total_row_count
def main():
parser = argparse.ArgumentParser(description="Search for a term in a database")
parser.add_argument("-a", "--address", metavar="address", type=str, help="Enter the host address", required=True)
parser.add_argument("-P", "--port", metavar="port", type=str, help="Enter the port", required=True)
parser.add_argument("-u", "--user", metavar="user", type=str, help="Enter the user", required=True)
parser.add_argument("-p", "--password", metavar="password", type=str, help="Enter the password")
group = parser.add_mutually_exclusive_group()
group.add_argument("-d", "--database", metavar="database", type=str, help="Enter databases you want "
"to search in")
group.add_argument("-bl", "--blacklist", metavar="blacklist", type=str, help="Enter databases you want"
" to be excluded, separated by commas")
parser.add_argument("-s", "--search", metavar="search", type=str, help="Enter the search term", required=True)
args = parser.parse_args()
host = args.address
port = args.port
user = args.user
password = args.password
search_term = args.search
user_databases = args.database
user_blacklisted_databases = args.blacklist
start = time.time()
connection, cursor = connect_to_mysql_server(host, port, user, password)
databases = get_databases(cursor, [db.strip() for db in user_databases.split(",")] if user_databases else None)
if len(databases) == 0:
print("No databases found.")
exit(0)
blacklisted_databases = ["mysql", "information_schema", "performance_schema", "sys"]
blacklisted_databases.extend([db.strip() for db in user_blacklisted_databases.split(',')]
if user_blacklisted_databases else [])
filtered_databases = filter_databases(databases, blacklisted_databases)
if len(filtered_databases) == 0:
print("No databases to search on.")
exit(0)
print("Databases to search on:", "\n", tabulate([filtered_databases]), "\n", "searching...")
if len(search_term) < 3:
print("Search term should be at least 3 characters long.")
exit(0)
total_row_count = search_tables(filtered_databases, cursor, search_term)
cursor.close()
connection.close()
end = time.time()
time_execution = end - start
print("Execution time:", time_execution, "seconds")
print("Total rows found:", total_row_count)
if __name__ == "__main__":
main()