-
Notifications
You must be signed in to change notification settings - Fork 3
/
Copy pathmain.py
141 lines (128 loc) · 4.94 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
import asyncio
import re
from secrets import token_hex
from arclet.alconna import Alconna, Args, CommandMeta, Option, OptionResult, namespace, store_true
from arclet.alconna.avilla import AlconnaBehaviour, AlconnaOutputMessage
from arclet.alconna.tools import MarkdownTextFormatter
from avilla.core import Avilla, Context, Picture, RawResource
from avilla.core.exceptions import ActionFailed
from creart import it
from graia.amnesia.builtins.aiohttp import AiohttpClientService
from graia.broadcast import Broadcast
from graia.saya import Saya
from graia.scheduler import GraiaScheduler
from graia.scheduler.service import SchedulerService
from graiax.playwright import PlaywrightService
from launart import Launart
from loguru import logger
from app.config import load_config
from app.core import RaianBotDispatcher, RaianBotService
from app.image import md2img, setup_qrcode
from app.logger import loguru_exc_callback_async, setup_logger
from app.shortcut import picture
cli = Alconna(
Option(
"--root-dir|-D",
Args["dir", str],
dest="root",
help_text="配置文件根目录",
default=OptionResult(args={"dir": "config"}),
),
Option("--pw-head", default=False, action=store_true, help_text="是否取消 Playwright 的 headless 模式"),
meta=CommandMeta("Raianbot 的命令行工具", hide=True),
)
arp = cli()
if not arp.matched:
exit()
config = load_config(root_dir=arp.query[str]("root.dir", "config"))
setup_logger(config.log_level)
setup_qrcode(config)
with namespace("Alconna") as np:
np.prefixes = config.command.headers
np.builtin_option_name["help"] = set(config.command.help)
np.builtin_option_name["shortcut"] = set(config.command.shortcut)
np.builtin_option_name["completion"] = set(config.command.completion)
np.disable_builtin_options = config.command.disables
np.formatter_type = MarkdownTextFormatter
manager = Launart()
loop = it(asyncio.AbstractEventLoop)
loop.set_exception_handler(loguru_exc_callback_async)
saya = it(Saya)
bcc = it(Broadcast)
it(AlconnaBehaviour)
it(GraiaScheduler)
manager.add_component(
PlaywrightService(
config.browser.type,
headless=not arp.query[bool]("pw-head.value"),
channel=config.browser.channel,
auto_download_browser=(not config.browser.channel),
# user_data_dir=Path(config.cache_dir) / "browser"
)
)
manager.add_component(AiohttpClientService())
# manager.add_component(FastAPIService(fastapi))
# manager.add_component(UvicornASGIService(config.api.host, config.api.port))
manager.add_component(SchedulerService(it(GraiaScheduler)))
manager.add_component(bot_service := RaianBotService(config))
bcc.finale_dispatchers.append(RaianBotDispatcher(bot_service))
avilla = Avilla(broadcast=bcc, launch_manager=manager, record_send=True, message_cache_size=0)
protocols = {}
for bot_config in config.bots:
t, c = bot_config.export()
protocols.setdefault(t, []).append(c)
for protocol_type, configs in protocols.items():
protocol = protocol_type()
for config in configs:
protocol.configure(config)
avilla.apply_protocols(protocol)
@avilla.listen(AlconnaOutputMessage)
async def send_handler(output: str, otype: str, ctx: Context):
bcc.listeners[1].oplog.clear()
# length = (output.count("\n") + 5) * 16
if otype in ("shortcut", "error"):
if ctx.scene.follows("::group"):
output = f"\n{output}"
return await ctx.scene.send_message(re.sub(r"\w\.\w", lambda mat: mat[0].replace(".", ". "), output))
if otype == "completion":
output = (
output.replace("\n\n", "\n")
.replace("<", "<")
.replace(">", ">")
.replace("{", "{")
.replace("}", "}")
)
if ctx.scene.follows("::group"):
output = f"\n{output}"
return await ctx.scene.send_message(output)
if not output.startswith("#"):
output = f"# {output}"
output = (
output.replace("\n\n", "\n")
.replace("\n", "\n\n")
.replace("#", "##")
.replace("<", "<")
.replace(">", ">")
)
img = await md2img(output)
try:
return await ctx.scene.send_message(Picture(RawResource(img)))
except Exception as e:
logger.error(e)
url = await bot_service.upload_to_cos(img, f"output_{token_hex(16)}.png")
try:
return await ctx.scene.send_message(picture(url, ctx))
except ActionFailed:
output = (
output.replace("<", "<")
.replace(">", ">")
.replace("\n\n", "\n")
.replace("##", "#")
.replace("**", "")
)
if ctx.scene.follows("::group"):
output = f"\n{output}"
return await ctx.scene.send_message(output)
logger.success("------------------机器人初始化完毕--------------------")
avilla.launch()
logger.success("机器人关闭成功. 晚安")