-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathmysql.py
255 lines (223 loc) · 8.51 KB
/
mysql.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
# made by realistikdash
import asyncio
from typing import Union, Tuple
import aiomysql
import pymysql
from config import config # type: ignore
"""
Entire module is deprecated and should be replaced with Tortoise ORM.
"""
class MySQLPool:
    """Thin asynchronous wrapper around an `aiomysql` connection pool.

    Every query helper acquires a connection from the pool, runs a single
    statement on a fresh cursor and releases the connection again. Keeping
    all database access behind this class means the underlying MySQL module
    can be swapped without rewriting a large portion of the codebase.

    If a query helper is called before a pool exists, the pool is created
    lazily from the `db_*` keys of the imported `config` mapping.

    Classmethods:
        connect: Creates a new object connected with explicit credentials.
        connect_with_config: Creates a new object connected with the
            credentials stored in `config`.
    """

    # Upper bound on how many times one statement is attempted when the
    # driver keeps raising `pymysql.InternalError`. Without a bound a
    # persistent error would be retried until the interpreter's recursion
    # limit was hit.
    MAX_QUERY_ATTEMPTS: int = 5

    def __init__(self) -> None:
        """Creates the default values for the connector. Use the `connect`
        or `connect_with_config` classmethods instead."""

        # `None` until `connect_local` has created the pool.
        self._pool: aiomysql.Pool = None
        # `lastrowid` reported by the cursor of the most recent `execute`.
        self.last_row_id: int = 0
        # Event loop handed to `connect_local`, `None` while unconnected.
        self._loop: Union[asyncio.AbstractEventLoop, None] = None

    @classmethod
    async def connect_with_config(
        cls, loop: Union[asyncio.AbstractEventLoop, None] = None
    ) -> "MySQLPool":
        """Creates a new object connected using the values in `config`.

        Reads `db_host`, `db_user`, `db_password`, `db_database` and
        `db_port` from the imported `config` mapping.

        Args:
            loop (AbstractEventLoop): The event loop that should be used
                for the MySQL pool. If falsy, `asyncio.get_event_loop()`
                is used.

        Returns:
            The newly created, connected `MySQLPool`.
        """

        if not loop:
            loop = asyncio.get_event_loop()

        return await cls.connect(
            config["db_host"],
            config["db_user"],
            config["db_password"],
            config["db_database"],
            config["db_port"],
            loop,
        )

    @classmethod
    async def connect(
        cls,
        host: str,
        user: str,
        password: str,
        database: str,
        port: int = 3306,
        loop: Union[asyncio.AbstractEventLoop, None] = None,
    ) -> "MySQLPool":
        """Creates a new object and connects it to the MySQL server.

        Note:
            The returned object is ready for `fetchone`, `fetchall` and
            `execute`.

        Args:
            host (str): The hostname of the MySQL server you would like to
                connect. Usually `localhost`.
            user (str): The username of the MySQL user you would like to log
                into.
            password (str): The password of the MySQL user you would like to
                log into.
            database (str): The database you would like to interact with.
            port (int): The port at which the MySQL server is located at.
                Default set to 3306.
            loop (AbstractEventLoop): The event loop that should be used
                for the MySQL pool. If None, `asyncio.get_event_loop()`
                is used.

        Returns:
            The newly created, connected `MySQLPool`.
        """

        # Use a separate local name so `cls` keeps referring to the class.
        obj = cls()
        await obj.connect_local(
            host,
            user,
            password,
            database,
            port,
            asyncio.get_event_loop() if loop is None else loop,
        )
        return obj

    async def connect_local(
        self,
        host: str,
        user: str,
        password: str,
        database: str,
        port: int = 3306,
        loop: Union[asyncio.AbstractEventLoop, None] = None,
    ) -> None:
        """Connects the current object to the pool without creating a new
        object.

        Args:
            host (str): The hostname of the MySQL server you would like to
                connect. Usually `localhost`.
            user (str): The username of the MySQL user you would like to log
                into.
            password (str): The password of the MySQL user you would like to
                log into.
            database (str): The database you would like to interact with.
            port (int): The port at which the MySQL server is located at.
                Default set to 3306.
            loop (AbstractEventLoop): The event loop that should be used
                for the MySQL pool. Passed to `aiomysql.create_pool` as is.
        """

        self._loop = loop
        self._pool = await aiomysql.create_pool(
            host=host, port=port, user=user, password=password, db=database, loop=loop
        )

    async def _ensure_pool(self) -> None:
        """Creates the pool from `config` if this object has none yet."""

        if self._pool is not None:
            return

        await self.connect_local(
            config["db_host"],
            config["db_user"],
            config["db_password"],
            config["db_database"],
            config["db_port"],
            asyncio.get_event_loop(),
        )

    async def _run_query(self, cur, query: str, args: tuple = ()) -> None:
        """Executes `query` on `cur`, retrying on `pymysql.InternalError`.

        Args:
            cur: The open cursor the statement is executed on.
            query (str): The MySQL query to be executed.
            args (tuple, list): The arguments to be safely formatted into
                the query.

        Raises:
            pymysql.InternalError: If the final attempt (see
                `MAX_QUERY_ATTEMPTS`) still fails.
        """

        # Always attempt at least once, even if the limit is misconfigured.
        attempts = max(1, self.MAX_QUERY_ATTEMPTS)

        for attempt in range(1, attempts + 1):
            try:
                await cur.execute(query, args)
            except pymysql.InternalError:
                # Out of attempts: surface the driver's own error.
                if attempt >= attempts:
                    raise
            else:
                return

    async def fetchone(self, query: str, args: tuple = ()) -> Union[tuple, None]:
        """Executes `query` in MySQL and returns the first result.

        Args:
            query (str): The MySQL query to be executed.
            args (tuple, list): The list or tuple of arguments to be safely
                formatted into the query.

        Returns:
            Whatever the cursor's `fetchone` yields: the first row in the
            arrangement specified within the query if a result is found,
            None if no results are found.

        Raises:
            pymysql.InternalError: If the query keeps failing with that
                error after `MAX_QUERY_ATTEMPTS` attempts.
        """

        await self._ensure_pool()

        # Borrow a connection from the pool for the duration of the query.
        async with self._pool.acquire() as conn:
            async with conn.cursor() as cur:
                await self._run_query(cur, query, args)
                return await cur.fetchone()

    async def fetchall(self, query: str, args: tuple = ()) -> Tuple[tuple, ...]:
        """Executes `query` in MySQL and returns all of the found results.

        Args:
            query (str): The MySQL query to be executed.
            args (tuple, list): The list or tuple of arguments to be safely
                formatted into the query.

        Returns:
            Whatever the cursor's `fetchall` yields; expected to be a
            sequence of row tuples.

        Raises:
            pymysql.InternalError: If the query keeps failing with that
                error after `MAX_QUERY_ATTEMPTS` attempts.
        """

        await self._ensure_pool()

        # Borrow a connection from the pool for the duration of the query.
        async with self._pool.acquire() as conn:
            async with conn.cursor() as cur:
                await self._run_query(cur, query, args)
                return await cur.fetchall()

    async def execute(self, query: str, args: tuple = ()) -> "aiomysql.Cursor":
        """Executes `query` and commits all changes made by it to the
        database.

        Note:
            Please don't use this function for select queries. For them
            rather use `fetchall` and `fetchone`.

        Args:
            query (str): The MySQL query to be executed.
            args (tuple, list): The list or tuple of arguments to be safely
                formatted into the query.

        Returns:
            The cursor the statement ran on. It is returned after its
            `async with` block has exited, so treat it as a read-only
            record (e.g. `lastrowid`). The ID of the last row affected is
            also stored on `self.last_row_id`.

        Raises:
            pymysql.InternalError: If the query keeps failing with that
                error after `MAX_QUERY_ATTEMPTS` attempts.
        """

        await self._ensure_pool()

        # Borrow a connection from the pool for the duration of the query.
        async with self._pool.acquire() as conn:
            async with conn.cursor() as cur:
                await self._run_query(cur, query, args)
                # Remember the row id before the cursor goes out of scope.
                self.last_row_id = cur.lastrowid

            # Persist the change while the connection is still held.
            await conn.commit()

        # Kept as the cursor (not an int) so existing callers keep working.
        return cur

    async def close(self) -> None:
        """Closes the connection to the MySQL server.

        Does nothing if the object was never connected.
        """

        if self._pool is None:
            return

        self._pool.close()
        await self._pool.wait_closed()