-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathutils.py
42 lines (34 loc) · 1.15 KB
/
utils.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
import json
import threading
# Function to retrieve users in a channel
def get_channel_users(client, channel_id):
response = client.conversations_members(channel=channel_id)
user_ids = response["members"]
users = []
for user_id in user_ids:
user_info = client.users_info(user=user_id)
users.append(user_info["user"])
return users
def ping_reviewer(client, review):
msg = f"Please review {review.url}. Submitted by <@{review.user.slack_id}>."
client.chat_postMessage(
channel=review.reviewer.slack_id,
text=msg
)
def metadata_serializer(object):
return json.dumps(object)
def metadata_deserializer(object):
return json.loads(object)
class DelayedExecutor:
def __init__(self):
self.timers = []
def set_timeout(self, callback, delay_in_seconds, *args, **kwargs):
timer = threading.Timer(delay_in_seconds, callback, args=args, kwargs=kwargs)
self.timers.append(timer)
timer.start()
return timer
def cancel_all(self):
"""Cancel all scheduled timers."""
for timer in self.timers:
timer.cancel()
self.timers.clear()