-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathdb.py
305 lines (273 loc) · 11.4 KB
/
db.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
import datetime
import os
import psycopg2
from logger import logger
class DB:
"""Class for handling database methods.
Attributes:
__verbose -- turns the verbose mode on, useful for debugging
__location -- the absolute location for the bot
"""
__verbose = False
def __init__(self):
self.__conn = psycopg2.connect(
database=os.environ['POSTGRES_DB'],
user=os.environ['POSTGRES_USER'],
password=os.environ['POSTGRES_PASSWORD'],
host=os.environ['POSTGRES_HOST'],
port="5432",
)
self.__cursor = self.__conn.cursor()
def __del__(self):
self.__cursor.close()
self.__conn.close()
def get_token(self):
"""Fetches the Telegram Bot API Token from the database.
"""
try:
self.__cursor.execute(''' SELECT data from settings WHERE id = 2 ''')
return self.__cursor.fetchone()[0]
except:
logger.critical("Telegram Bot Secret is not in the database")
exit()
def get_offset(self):
"""Fetches the message offset from the database.
"""
self.__cursor.execute(''' SELECT data from settings WHERE id = 1 ''')
return self.__cursor.fetchone()[0]
def update_offset(self, offset):
"""Updates the message offset according to the offset parameter.
Returns boolean.
"""
res = True
try:
self.__cursor.execute(''' UPDATE settings SET data = (%s) WHERE id = 1 ''', (offset,))
logger.debug("Offset updated with %s" % offset)
except:
res = False
logger.warning("Offset could not be updated")
finally:
self.__conn.commit()
return res
def check_if_user_exists(self, uid):
"""Checks if a user with the given user id exists.
Returns boolean.
"""
self.__cursor.execute(''' SELECT COUNT(*) from people WHERE uid = (%s) ''', (uid,))
status = self.__cursor.fetchone()[0]
return status == 1
def add_user(self, uid, fname, lname, uname):
"""Adds a user with the given id, first name, last name, and username to the database.
Returns boolean.
"""
res = True
if self.check_if_user_exists(uid):
logger.debug("User %s %s (%s) exists in the database" % (fname, lname, uname))
return res
else:
try:
self.__cursor.execute(''' INSERT into people VALUES((%s),(%s),(%s),(%s)) ''',
(uid, fname, lname, uname))
logger.debug("User %s %s (%s) created successfully" % (fname, lname, uname))
except:
res = False
logger.warning("User %s %s (%s) cannot be inserted" % (fname, lname, uname))
finally:
self.__conn.commit()
return res
def check_if_group_exists(self, gid):
"""Checks if a group with the given group id exists.
Returns boolean.
"""
self.__cursor.execute(''' SELECT COUNT(*) from groups WHERE gid = (%s) ''', (gid,))
status = self.__cursor.fetchone()[0]
return status == 1
def add_group(self, gid, title):
"""Adds a group with the given id, and title to the database.
Returns boolean.
"""
res = True
if self.check_if_group_exists(gid):
logger.debug("Group %s (%s) exists in the database" % (title, gid))
return res
else:
try:
self.__cursor.execute(''' INSERT into groups VALUES((%s),(%s)) ''', (gid, title))
logger.debug("Group %s (%s) created successfully" % (title, gid))
except:
res = False
logger.warning("Group %s (%s) cannot be inserted" % (title, gid))
finally:
self.__conn.commit()
return res
def log(self, uid, fname, lname, uname, mid, time, content, gid, gtitle='private'):
"""Logs the activity to the database.
Returns boolean.
"""
self.add_user(uid, fname, lname, uname)
if gid != 0:
self.add_group(gid, gtitle)
res = True
try:
self.__cursor.execute(''' INSERT into logs VALUES((%s),(%s),(%s),(%s),(%s)) ''',
(uid, mid, time, content, gid))
logger.debug("Message %s logged successfully" % (mid))
except:
res = False
logger.warning("Message %s cannot be logged" % (mid))
finally:
self.__conn.commit()
return res
def get_service_title(self, sid):
"""Fetches the service title with the given service id from the database.
"""
try:
self.__cursor.execute(''' SELECT name from services WHERE id = (%s) ''', (sid,))
return self.__cursor.fetchone()[0]
except:
return False
def check_service(self, uid, service):
"""Checks if a user with the given user id subscribed to the service.
Returns boolean.
"""
self.__cursor.execute(''' SELECT COUNT(*) from subscriptions WHERE uid = (%s) and service = (%s)''',
(uid, service))
status = self.__cursor.fetchone()[0]
return status == 1
def add_service(self, uid, service):
"""Adds a service subscription to the database.
Returns boolean.
"""
res = True
if self.check_service(uid, service) is True:
logger.debug("User %s already subscribed to service %s" % (uid, self.get_service_title(service)))
return res
try:
self.__cursor.execute(''' INSERT into subscriptions VALUES((%s),(%s)) ''', (uid, service))
logger.debug("User %s subscribed to %s service successfully" % (uid, self.get_service_title(service)))
except:
res = False
logger.warning("User %s cannot subscribe to %s service" % (uid, self.get_service_title(service)))
finally:
self.__conn.commit()
return res
def remove_service(self, uid, service):
"""Removes a service subscription from the database.
Returns boolean.
"""
res = False
if self.check_service(uid, service) is False:
logger.debug("User %s has not subscribed to service %s" % (uid, self.get_service_title(service)))
return res
try:
self.__cursor.execute(''' DELETE from subscriptions WHERE uid = (%s) and service = (%s) ''', (uid, service))
res = True
logger.debug("User %s unsubscribed from %s service successfully" % (uid, self.get_service_title(service)))
except:
logger.warning("User %s cannot unsubscribe to %s service" % (uid, self.get_service_title(service)))
finally:
self.__conn.commit()
return res
def check_if_service_sent_today(self, service):
"""Checks if the service blast has been sent today.
Returns boolean.
"""
now = datetime.datetime.now()
today = str(now.year) + str(now.month) + str(now.day)
self.__cursor.execute(''' SELECT COUNT(*) from servicedays WHERE id = (%s) and day = (%s) ''', (service, today))
status = self.__cursor.fetchone()[0]
return status == 1
def mark_service_sent_today(self, service):
"""Updates a service blast as sent today.
Returns boolean.
"""
now = datetime.datetime.now()
today = str(now.year) + str(now.month) + str(now.day)
res = True
try:
self.__cursor.execute(''' INSERT into servicedays VALUES((%s),(%s)) ''', (service, today))
logger.debug("Service days for %s updated as sent" % self.get_service_title(service))
except:
res = False
logger.warning("Service days could not be updated")
finally:
self.__conn.commit()
return res
def get_group_title(self, gid):
"""Fetches the group title with the given group id from the database.
"""
if self.check_if_group_exists(gid):
self.__cursor.execute(''' SELECT title from groups WHERE gid = (%s) ''', (gid,))
return self.__cursor.fetchone()[0]
else:
return False
def get_user(self, uid):
"""Fetches the user details with the given user id from the database.
"""
if self.check_if_user_exists(uid):
self.__cursor.execute(''' SELECT * from people WHERE uid = (%s) ''', (uid,))
return self.__cursor.fetchone()
else:
return False
def get_service_users(self, service):
"""Fetches the user ids subscribed to the given service from the database.
"""
self.__cursor.execute(''' SELECT * from subscriptions WHERE service = (%s) ''', (service,))
return self.__cursor.fetchall()
def list_logs(self):
"""Prints all logs from the database.
"""
self.__cursor.execute(''' SELECT * FROM logs ORDER by mid ''')
logger.debug("Printing all logs")
for i in self.__cursor.fetchall():
logger.debug(i)
def list_logs_from_user(self, uid):
"""Prints all logs with the given user id from the database.
"""
if self.check_if_user_exists(uid):
user = self.get_user(uid)
self.__cursor.execute(''' SELECT * FROM logs WHERE uid = (%s) ORDER by mid ''', (uid,))
logger.debug("Printing all logs from user %s %s (%s)" % (user[1], user[2], user[3]))
for i in self.__cursor.fetchall():
logger.debug(i)
else:
logger.debug("No user exists with User ID %s" % uid)
def list_logs_with_user(self):
"""Prints all logs and the user details as a joined table from the database.
"""
self.__cursor.execute(''' SELECT * FROM logs INNER JOIN people ON logs.uid = people.uid ORDER by mid ''')
logger.debug("Printing all logs with user data")
for i in self.__cursor.fetchall():
logger.debug(i)
def list_logs_from_group(self, gid):
"""Prints all logs with the given group id from the database.
"""
if self.check_if_group_exists(gid):
gtitle = self.get_group_title(gid)
self.__cursor.execute(''' SELECT * FROM logs WHERE gid = (%s) ORDER by mid ''', (gid,))
logger.debug("Printing all logs from group %s (%s)" % (gtitle, gid))
for i in self.__cursor.fetchall():
logger.debug(i)
else:
logger.debug("No group exists with Group ID %s" % gid)
def list_db(self):
"""Prints all table names from the database.
"""
self.__cursor.execute(''' SELECT name FROM sqlite_master WHERE type='table' ''')
logger.debug("Printing all databases")
for i in self.__cursor.fetchall():
logger.debug(i)
def list_users(self):
"""Prints all users from the database.
"""
self.__cursor.execute(''' SELECT * FROM people ORDER by uid ''')
logger.debug("Printing all users")
for i in self.__cursor.fetchall():
logger.debug(i)
def list_groups(self):
"""Prints all groups from the database.
"""
self.__cursor.execute(''' SELECT * FROM groups ORDER by gid ''')
logger.debug("Printing all groups")
for i in self.__cursor.fetchall():
logger.debug(i)