-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathread_ytchat.py
149 lines (120 loc) · 4.26 KB
/
read_ytchat.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
import asyncio
import os
from pytchat import LiveChatAsync
from setup import logger
class YTchat:
    """Watch one YouTube live chat and forward its items to a send coroutine.

    Args:
        ytid: YouTube video id handed to ``LiveChatAsync``.
        chid: Channel identifier. When truthy, ``self.id`` becomes
            ``"<chid>.<ytid>"``; otherwise ``self.id`` is just ``ytid``.
        func_send: Awaitable callable. It receives chat items from ``post``
            and a plain ``str`` notice from ``close``.
        normal_msg: When true, items whose type is ``"textMessage"`` are
            forwarded as well (by default only other item types are sent).
        save: When true, every item's JSON is appended to
            ``<chat_folder>/<id>.data``.
        live: When true, ``chatdata.tick_async()`` is awaited after each item.
        chat_folder: Directory used for the saved chat files.

    Raises:
        ValueError: If the chat reports that it is not alive right after
            it has been created.
    """

    def __init__(self, ytid, chid, func_send, normal_msg=False,
                 save=False, live=True, chat_folder="chat"):
        # Everything ``post`` reads is assigned before the callback is
        # registered, so the callback can never see a half-built object.
        if chid:
            self.id = str(chid) + "." + ytid
        else:
            self.id = ytid

        # discord channel and post function
        self.chid = str(chid)
        self.send = func_send

        # pytchat parameters
        self.ytid = ytid
        self.normal_msg = normal_msg
        self.live = live

        # save the chat
        self.save = save
        self.folder = chat_folder + "/"

        # main
        self.livechat = LiveChatAsync(ytid, callback=self.post)

        if save:
            os.makedirs(self.folder, exist_ok=True)

        if not self.is_alive():
            raise ValueError("Is not live")
        logger.info(self.id + " is added")

    def is_alive(self):
        """Return whatever the underlying live chat reports for ``is_alive()``."""
        return self.livechat.is_alive()

    async def close(self):
        """Announce the stop through ``send`` and terminate the live chat."""
        logger.info(self.id + " to stopped")
        try:
            await self.send(f"{self.ytid} is stopped")
        finally:
            # Terminate even when the notification fails; otherwise the
            # underlying chat would be left running with nobody tracking it.
            self.livechat.terminate()

    def raise_for_status(self):
        """Delegate to the underlying live chat's ``raise_for_status()``."""
        return self.livechat.raise_for_status()

    async def post(self, chatdata):
        """Handle one batch of chat data delivered by the live chat.

        Each item is optionally appended to the save file, forwarded to
        ``send`` when it is not a plain text message (or when ``normal_msg``
        is set), and followed by ``chatdata.tick_async()`` when ``live`` is set.
        """
        save_path = self.folder + self.id + ".data" if self.save else None
        for c in chatdata.items:
            if save_path is not None:
                # Chat text routinely contains non-ASCII characters; pin the
                # encoding instead of depending on the platform default.
                with open(save_path, "a", encoding="utf-8") as f:
                    f.write(c.json() + "\n")
            if c.type != "textMessage" or self.normal_msg:
                logger.debug("post")
                await self.send(c)
            if self.live:
                await chatdata.tick_async()
async def console_print(c):
    """Default send function: write the payload to the log instead of a channel.

    Strings are logged at info level. Anything else is expected to provide a
    ``json()`` method, whose result is logged at debug level.
    """
    # isinstance (rather than an exact type comparison) so that str
    # subclasses are logged as text too instead of failing on ``.json()``.
    if isinstance(c, str):
        logger.info(c)
    else:
        logger.debug(f"Print data: {str(c.json())}")
class YTchats:
    """Keep a list of ``YTchat`` watchers and optionally persist their ids.

    Args:
        state: When true, previously saved ids are loaded on construction and
            the current ids are written back on every ``show_status`` call.
        state_file: Path of the file holding one chat id per line.
        **kwargs: Forwarded to ``add_video`` for every id restored from the
            state file.
    """

    def __init__(self, state=False, state_file="./state", **kwargs):
        self.videos = []
        # save the list of videos id into files
        self.state = state
        self.state_file = state_file
        if state:
            self.load_state(**kwargs)
            logger.debug(f"State will save to {self.state_file} while checking")

    def load_state(self, **kwargs):
        """Re-add every chat listed in ``self.state_file``; no-op if it is absent."""
        if not os.path.exists(self.state_file):
            return
        logger.info(f"Read last state from {self.state_file}")
        # Read the configured file (not a hard-coded name) and close it
        # before any chat is created.
        with open(self.state_file, encoding="utf-8") as f:
            entries = [line.strip() for line in f]
        for entry in entries:
            if not entry:
                continue
            # Ids are stored as "<channel>.<video>" or just "<video>".
            # rpartition yields an empty channel when there is no dot.
            channel, _, video_id = entry.rpartition(".")
            self.add_video(video_id, channel, **kwargs)

    def write_state(self):
        """Write the id of every tracked chat to ``self.state_file``, one per line."""
        with open(self.state_file, "w", encoding="utf-8") as f:
            # writelines adds no separators, so the newline is explicit;
            # load_state depends on one id per line.
            f.writelines(chat.id + "\n" for chat in self.videos)

    def add_video(self, id, channel="", func_send=console_print, **kwargs):
        """Create a ``YTchat`` and start tracking it.

        Args:
            id: YouTube video id.
            channel: Optional channel identifier used as the id prefix.
            func_send: Awaitable callable handed to the ``YTchat``.
            **kwargs: Extra keyword arguments for ``YTchat``.

        Returns:
            True when the chat was created and added, False when creation
            raised an exception (which is logged as a warning).
        """
        try:
            chat = YTchat(id, channel, func_send, **kwargs)
        except Exception as e:
            # Exception, not BaseException: KeyboardInterrupt, SystemExit and
            # task cancellation must keep propagating.
            logger.warning(str(type(e)) + str(e))
            logger.warning(f"{id} cannot be added")
            return False
        self.videos.append(chat)
        return True

    async def remove_video(self, id, channel=""):
        """Close and drop every tracked chat whose id matches.

        Args:
            id: Video id, or the full chat id when ``channel`` is empty.
            channel: Optional channel identifier; when given, the id looked
                up is ``"<channel>.<id>"``.

        Returns:
            True if at least one chat was removed, False otherwise.
        """
        if channel:
            id = str(channel) + "." + id
        remaining = []
        for chat in self.videos:
            if chat.id == id:
                logger.debug(f"Remove {chat.id}")
                await chat.close()
            else:
                remaining.append(chat)
        if len(remaining) == len(self.videos):
            return False
        self.videos = remaining
        return True

    async def remove_offline_video(self):
        """Remove every tracked chat that no longer reports being alive."""
        # Collect the ids first: remove_video rebinds self.videos.
        finished = [chat.id for chat in self.videos if not chat.is_alive()]
        for chat_id in finished:
            await self.remove_video(chat_id)

    def show_status(self):
        """Log the tracked chat ids and, when enabled, save them to the state file."""
        logger.info("check: " + ",".join([chat.id for chat in self.videos]))
        # save state to file
        if self.state:
            self.write_state()

    async def main(self, allow_empty=True):
        """Prune finished chats and report status every 10 seconds.

        Args:
            allow_empty: When false, the loop ends once no chat is left;
                when true, it keeps running indefinitely.
        """
        while True:
            await self.remove_offline_video()
            if not self.videos and not allow_empty:
                break
            self.show_status()
            await asyncio.sleep(10)
if __name__ == "__main__":

    async def main():
        """Track a single hard-coded video and keep polling indefinitely."""
        manager = YTchats(state=False)
        manager.add_video(
            "ejGH1BC1l98",
            "backup",
            normal_msg=True,
            save=True,
            live=False,
        )
        await manager.main(allow_empty=True)

    asyncio.run(main())