This repository has been archived by the owner on Jun 5, 2023. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathdatabase.py
232 lines (185 loc) · 7.73 KB
/
database.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
from decouple import config
from sqlalchemy import create_engine, Column, Text, BigInteger, Boolean
from sqlalchemy.orm import sessionmaker, declarative_base
from logger import Logger
# Module-level logger, created from the Logger class imported from the
# `logger` module and named after this module.
logger = Logger(__name__)
# Declarative base class that the ORM models in this module inherit from.
Base = declarative_base()
class User(Base):
    """ORM mapping for a single row of the ``users`` table.

    Attributes:
        telegram_id: Telegram ID of the user; primary key of the table.
        username: Telegram username of the user.
        location: String-like location of the user in WeatherAPI format.
        notify_today: Whether the "today" notification is enabled.
        notify_tomorrow: Whether the "tomorrow" notification is enabled.
    """

    __tablename__ = "users"

    # Identity column.
    telegram_id = Column(
        BigInteger,
        nullable=False,
        primary_key=True,
    )

    # Free-form profile data.
    username = Column(Text)
    location = Column(Text)

    # Notification switches; both are off for newly inserted rows.
    notify_today = Column(
        Boolean,
        default=False,
    )
    notify_tomorrow = Column(
        Boolean,
        default=False,
    )
class Database:
    """A class to create connection sessions to the database for user with specific telegram_id.

    Every instance builds its own engine from the ``DBUSER``, ``PASSWORD``,
    ``HOST``, ``PORT`` and ``DATABASE`` settings and opens a session
    immediately. Call :meth:`disconnect` when done, or use the instance as a
    context manager so that this happens automatically::

        with Database(telegram_id) as database:
            location = database.get_user_location()

    Args:
        telegram_id (int): telegram_id to connect to the database.
    """

    # Maps the public notification names onto the User attribute storing them.
    _NOTIFICATION_COLUMNS = {
        "today": "notify_today",
        "tomorrow": "notify_tomorrow",
    }

    def __init__(self, telegram_id: int):
        self.telegram_id = telegram_id
        # All connection details travel through connect_args, which is why
        # the URL below carries nothing but the dialect name.
        connection_config = {
            "user": config("DBUSER"),
            "password": config("PASSWORD"),
            "host": config("HOST"),
            "port": config("PORT"),
            "database": config("DATABASE"),
            "sslmode": "require",
        }
        self.engine = create_engine("postgresql://", connect_args=connection_config)
        self.connect()

    def __enter__(self) -> "Database":
        """Returns the instance itself so it can be used in a ``with`` block."""
        return self

    def __exit__(self, exc_type, exc_value, traceback) -> None:
        """Disconnects from the database when the ``with`` block is left."""
        self.disconnect()

    def connect(self):
        """Creates a connection session to the database.

        Raises:
            Exception: whatever error session creation raised; it is logged
                and re-raised so the instance is never left without a session.
        """
        try:
            self.session = sessionmaker(bind=self.engine)()
        except Exception as error:
            logger.error(
                f"Error connecting to database: {error} with telegram ID [{self.telegram_id}]."
            )
            # Swallowing the error would only postpone the failure to the
            # first query, where it would show up as a missing attribute.
            raise
        logger.info(
            f"Connected to database [{config('DATABASE')}] with telegram ID [{self.telegram_id}]."
        )

    def disconnect(self):
        """Closes the connection session to the database and releases the engine's pool."""
        self.session.close()
        # The engine is private to this instance, so its pooled connections
        # have to be released here as well; closing the session alone only
        # hands the connection back to the pool.
        self.engine.dispose()
        logger.debug(
            f"Disconnected from database with telegram ID [{self.telegram_id}]."
        )

    def _get_user(self) -> User | None:
        """Returns the User row for telegram_id, or None if there is no such row."""
        return (
            self.session.query(User)
            .filter(User.telegram_id == self.telegram_id)
            .first()
        )

    def _commit(self):
        """Commits the session, rolling it back first if the commit fails."""
        try:
            self.session.commit()
        except Exception:
            # Roll back so the session stays usable, then let the caller see
            # the original error.
            self.session.rollback()
            raise

    @classmethod
    def _notification_column(cls, notification: str) -> str:
        """Translates a notification name into the matching User attribute name.

        Raises:
            ValueError: if notification is neither "today" nor "tomorrow".
        """
        try:
            return cls._NOTIFICATION_COLUMNS[notification]
        except KeyError:
            raise ValueError(
                f'notification must be "today" or "tomorrow", got {notification!r}'
            ) from None

    def exists_in_database(self) -> bool:
        """Checks if user with telegram_id exists in the database.

        Returns:
            bool: True if user exists in the database, False otherwise.
        """
        matches = (
            self.session.query(User)
            .filter(User.telegram_id == self.telegram_id)
            .count()
        )
        exists = matches > 0
        logger.debug(
            f"User with telegram ID [{self.telegram_id}] exists: [{exists}] in the database."
        )
        return exists

    def update_user(self, username: str, location: str):
        """Adds user to the database if it doesn't exist, otherwise updates the username and location
        for existing user in the database.

        Args:
            username (str): telegram username
            location (str): string-like location of the user in WeatherAPI format
        """
        # A single fetch answers both "does the user exist?" and "which row
        # do we update?".
        user = self._get_user()

        if user is None:
            # Creating new user if it doesn't exist in the database.
            self.session.add(
                User(telegram_id=self.telegram_id, username=username, location=location)
            )
            self._commit()
            logger.info(
                f"User with telegram ID [{self.telegram_id}] added to the database."
            )
            return

        logger.debug(
            f"Updating username from [{user.username}] to [{username}] for "
            f"user with telegram ID [{self.telegram_id}]."
        )
        logger.debug(
            f"Updating location from [{user.location}] to [{location}] for "
            f"user with telegram ID [{self.telegram_id}]."
        )
        # Updating user data.
        user.username = username
        user.location = location
        self._commit()
        logger.debug(
            f"Location and username updated for user with telegram ID [{self.telegram_id}]."
        )

    def get_user_location(self) -> str | None:
        """Returns the location of the user with telegram_id if it exists in the database.

        Returns:
            str | None: location of the user in WeatherAPI format if it exists in the database, None otherwise.
        """
        user = self._get_user()
        return None if user is None else user.location

    def get_all_usernames(self) -> list[str]:
        """Retrieves all usernames from the database, adds @ to the beginning of
        each username and returns them as a list.

        Users without a stored username are skipped, so the list never
        contains a bogus "@None" entry.

        Returns:
            list[str]: list of usernames with @ in front of each username.
        """
        rows = self.session.query(User.username).all()
        usernames = [f"@{username}" for (username,) in rows if username]
        logger.debug(f"Get [{len(usernames)}] usernames from database.")
        return usernames

    def change_notification_status(self, notification: str):
        """Changes the notification status for user with telegram_id to opposite of the current status.
        Requires notification to be either "today" or "tomorrow".

        If the user is not in the database, an error is logged and nothing is
        changed.

        Args:
            notification (str): notification to change status for ("today" or "tomorrow")

        Raises:
            ValueError: if notification is neither "today" nor "tomorrow".
        """
        column = self._notification_column(notification)
        user = self._get_user()
        if user is None:
            logger.error(
                f"Cannot change {notification} notification status: user with "
                f"telegram ID [{self.telegram_id}] is not in the database."
            )
            return

        # Keep the new value in a local so the log line below does not have
        # to read the attribute back after the commit.
        new_status = not getattr(user, column)
        setattr(user, column, new_status)
        self._commit()
        logger.debug(
            f"User with telegram ID [{self.telegram_id}] changed {notification} "
            f"notification status to [{new_status}]."
        )

    def notification_status(self, notification: str) -> bool:
        """Retrieves the status for specified notification for user in the database.
        Returns True if notification is enabled, False otherwise (including
        when the user is not in the database).

        Args:
            notification (str): notification to retrieve status for ("today" or "tomorrow")

        Returns:
            bool: boolean value representing the status of the notification.

        Raises:
            ValueError: if notification is neither "today" nor "tomorrow".
        """
        column = self._notification_column(notification)
        logger.debug(
            f"Checking [{notification}] notification status for user with telegram ID [{self.telegram_id}]."
        )
        user = self._get_user()
        if user is None:
            return False
        # bool() turns an unset (None) column value into a plain False.
        return bool(getattr(user, column))

    def get_notified_users(self, notification: str) -> list[User]:
        """Retrieves all users with specified notification enabled.

        Args:
            notification (str): notification to retrieve users for ("today" or "tomorrow")

        Returns:
            list[User]: list of User objects with specified notification enabled.

        Raises:
            ValueError: if notification is neither "today" nor "tomorrow".
        """
        column = getattr(User, self._notification_column(notification))
        users = self.session.query(User).filter(column.is_(True)).all()
        logger.debug(
            f"Retrieved [{len(users)}] users with [{notification}] enabled notification status."
        )
        return users