-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmoce.py
172 lines (133 loc) · 5.16 KB
/
moce.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
import os
import json
import chainlit as cl
from embedchain import Pipeline as App
from datetime import datetime
# Fail fast at import time when the Hugging Face credential is missing.
# os.environ[...] raises KeyError instead of returning None, so the lookup
# uses .get() to make the explicit "not set" error below actually reachable.
HF_TOKEN = os.environ.get('HUGGINGFACE_API_KEY')
if HF_TOKEN is None:
    raise ValueError('HUGGINGFACE_API_KEY is not set')
class DatabaseError(Exception):
    """Raised when the JSON index file cannot be read or written."""
class JSONDB:
    """Minimal JSON-file-backed store holding a list of record dicts.

    The file at ``file_path`` contains a single JSON array. Records are
    appended with :meth:`add_record` and read back with
    :meth:`get_all_records` / :meth:`get_top_records`.
    """

    # Layout of the 'added' timestamp string stored with each record; needed
    # to order records chronologically (the day-first text does not sort
    # correctly as a plain string).
    _ADDED_FORMAT = '%d/%m/%Y %H:%M:%S'

    def __init__(self, file_path):
        """Remember the path of the backing JSON file; performs no I/O."""
        self.file_path = file_path

    def _create_file_if_not_exists(self):
        """Create the backing file holding an empty JSON array if it is absent."""
        if not os.path.exists(self.file_path):
            with open(self.file_path, 'w') as fp:
                json.dump([], fp)

    @classmethod
    def _added_sort_key(cls, record):
        """Return the record's 'added' timestamp as a datetime for sorting.

        Records whose timestamp is missing or not in the expected format
        sort as the oldest possible entry instead of raising.
        """
        try:
            return datetime.strptime(record.get('added', ''), cls._ADDED_FORMAT)
        except (TypeError, ValueError):
            return datetime.min

    def add_record(self, record):
        """Append *record* to the store unless an identical record is present.

        Args:
            record: JSON-serialisable value (a dict in this file's usage).

        Raises:
            DatabaseError: if the file cannot be created, read, parsed or
                written.
        """
        try:
            self._create_file_if_not_exists()
            with open(self.file_path, 'r+') as fp:
                data = json.load(fp)
                if record in data:
                    # Already stored: nothing to write.
                    return
                data.append(record)
                # Serialise before touching the file so that a
                # non-serialisable record cannot leave it half-written.
                serialized = json.dumps(data, indent=4)
                fp.seek(0)
                fp.write(serialized)
                # Discard any bytes of the previous content that extend past
                # the new payload; otherwise the file would be invalid JSON.
                fp.truncate()
        except (OSError, json.JSONDecodeError) as e:
            raise DatabaseError(f"Error adding record: {str(e)}") from e

    def get_all_records(self):
        """Return every stored record as a list.

        A missing, empty or unparsable file yields an empty list, so the
        store can be queried before anything has been added.

        Raises:
            DatabaseError: if the file exists but cannot be opened or read.
        """
        try:
            with open(self.file_path, 'r') as fp:
                try:
                    return json.load(fp)
                except json.JSONDecodeError:
                    # Empty (or otherwise unparsable) file: treat as no data.
                    return []
        except FileNotFoundError:
            # Nothing has been written yet.
            return []
        except OSError as e:
            raise DatabaseError(f"Error getting all records: {str(e)}") from e

    def get_top_records(self, n):
        """Return up to *n* records, most recently added first.

        Raises:
            DatabaseError: propagated from :meth:`get_all_records`.
        """
        records = self.get_all_records()
        return sorted(records, key=self._added_sort_key, reverse=True)[:n]
@cl.on_chat_start
async def setup_app():
    """Prepare per-session state when a chat starts.

    Builds the embedchain app from ``data/config.yaml`` with
    ``collect_metrics`` switched off, then stores it under ``'app'`` and a
    ``JSONDB`` handle for ``data/index.json`` under ``'db'`` in the Chainlit
    user session.
    """
    pipeline = App.from_config(config_path='data/config.yaml')
    pipeline.collect_metrics = False
    cl.user_session.set('app', pipeline)
    cl.user_session.set('db', JSONDB('data/index.json'))
def update_db(data):
    """Store *data* in the session's JSON index together with a timestamp.

    The record has two fields: ``'url'`` (the value passed in) and
    ``'added'`` (the current time formatted as ``DD/MM/YYYY HH:MM:SS``).
    """
    index = cl.user_session.get('db')
    timestamp = datetime.now().strftime('%d/%m/%Y %H:%M:%S')
    index.add_record({'url': data, 'added': timestamp})
def _command_argument(text, command):
    """Return what follows the leading *command* in *text*, whitespace-stripped."""
    # Slice off only the leading command. str.replace() would also delete any
    # later occurrence inside the argument (e.g. 'https://host/address').
    return text[len(command):].strip()


@cl.on_message
async def main(message: cl.Message):
    """Dispatch an incoming chat message.

    Commands:
        /help  -- show the table of available commands.
        /add   -- add the given source to the knowledge base and record it in
                  the JSON index, unless it is already indexed.
        /kb    -- show up to 25 index entries as returned by
                  ``get_top_records``.

    Any other message is sent to the embedchain app as a chat query and the
    answer is streamed back to the user.
    """
    task_list = cl.TaskList()
    task_list.status = 'Running...'
    app = cl.user_session.get('app')
    msg = cl.Message(content='')
    user_message = message.content

    if user_message.startswith('/help'):
        markdown_content = (
            "| Command | Description |\n| --- | --- |\n"
            "| /add | Add a document to the knowledge base |\n"
            "| /kb | Display the knowledge base |\n"
            "| /help | Display the available commands |\n"
            "| * | Chat with the AI |\n"
        )
        await cl.Message(content=markdown_content).send()

    elif user_message.startswith('/add'):
        data = _command_argument(user_message, '/add')
        if not data:
            # A bare '/add' has nothing to ingest; do not pass an empty
            # source on to the knowledge base or the index.
            await cl.Message(
                content='Nothing to add! Usage: /add <source>'
            ).send()
            return

        db = cl.user_session.get('db')
        records = db.get_all_records()
        # .get() tolerates index entries that lack a 'url' field.
        if any(record.get('url') == data for record in records):
            await cl.Message(
                content='This document already exists in the knowledge base!'
            ).send()
        else:
            add_task = cl.Task(title='Adding to knowledge base', status=cl.TaskStatus.RUNNING)
            await task_list.add_task(add_task)
            await task_list.send()
            app.add(data)
            update_db(data)
            add_task.status = cl.TaskStatus.DONE
            await task_list.send()
            await cl.Message(
                content='Added data to knowledge base!'
            ).send()

    elif user_message.startswith('/kb'):
        kb_task = cl.Task(title='Getting records', status=cl.TaskStatus.RUNNING)
        await task_list.add_task(kb_task)
        await task_list.send()
        data = cl.user_session.get('db').get_top_records(25)
        kb_task.status = cl.TaskStatus.DONE
        await task_list.send()
        if not data:
            await cl.Message(
                content='No documents in json index!'
            ).send()
        else:
            header = "| URL | Added |\n| --- | --- |\n"
            rows = "".join(
                f"| {record.get('url', '')} | {record.get('added', '')} |\n"
                for record in data
            )
            await cl.Message(content=header + rows).send()

    else:
        chat_task = cl.Task(title='Querying LLM', status=cl.TaskStatus.RUNNING)
        await task_list.add_task(chat_task)
        await task_list.send()
        # app.chat is synchronous; run it via make_async so the event loop
        # is not blocked while waiting for the answer.
        for chunk in await cl.make_async(app.chat)(message.content):
            await msg.stream_token(chunk)
        chat_task.status = cl.TaskStatus.DONE
        await task_list.send()
        await msg.send()