diff --git a/src/complements_bot/bot.py b/src/complements_bot/bot.py index 07f36f3..b9e4ec6 100644 --- a/src/complements_bot/bot.py +++ b/src/complements_bot/bot.py @@ -6,16 +6,17 @@ import os import random import textwrap -from typing import Awaitable, Callable, Optional, Tuple, Union, Iterable +from typing import Optional, Tuple from twitchio import Message from twitchio.ext import commands # , routines , eventsub -from . import database, bot_controller -from .utilities import Awaitables, remove_chars, run_with_appropriate_awaiting +from . import bot_controller, database +from .utilities import Awaitables, remove_chars from ..app.app import run_app_and_bot from ..env_reader import CLIENT_SECRET, TMI_TOKEN + # logger = logging.getLogger(__name__) # TODO: @@ -57,7 +58,6 @@ class ComplementsBot(commands.Bot): CMD_PREFIX: str = '!' DEFAULT_MAX_MSG_LEN: int = 500 MAX_COMPLEMENT_LENGTH: int = 350 - F_USER: str = "{user}" SHOULD_LOG: bool = True OWNER_NICK: str = 'ereiarrus' OWNER_ID: str = "118034879" @@ -89,29 +89,9 @@ def run(self): self.loop.close() - async def name_to_id(self, username: str) -> Optional[str]: - """ - :param username: the username of the user whose user id we want - :return: the user id of the specified user, if the user exists; otherwise 'None' - """ - res = await self.fetch_users(names=[username]) - if len(res) > 0: - return str(res[0].id) - return None - - async def id_to_name(self, uid: str) -> Optional[str]: - """ - :param uid: the user id of the user whose username we want - :return: the username of the specified user, if the user exists; otherwise 'None' - """ - res = await self.fetch_users(ids=[int(uid)]) - if len(res) > 0: - return res[0].name - return None - async def event_ready(self) -> None: """ - Called once when the bot goes online; purely informational + Called once when the bot goes online """ joined_channels: list[int] = list(map(int, await database.get_joined_channels())) @@ -125,7 +105,8 @@ async def event_ready(self) -> None: channels: list = list(itertools.chain.from_iterable(await awaitables.gather())) channel_names: list[str] = list(map(lambda x: x.user.name, channels)) await asyncio.gather(self.join_channels(channel_names), - database.join_channel(username=self.nick, name_to_id=self.name_to_id)) + database.join_channel(username=self.nick, + name_to_id=lambda x: bot_controller.name_to_id(self, x))) if ComplementsBot.SHOULD_LOG: bot_controller.custom_log(f"{self.nick} is online!") @@ -148,8 +129,8 @@ async def event_message(self, message: Message) -> None: sender_id_raw: Optional[str] channel_id_raw: Optional[str] - sender_id_raw, channel_id_raw = await asyncio.gather(self.name_to_id(message.author.name), - self.name_to_id(message.channel.name)) + sender_id_raw, channel_id_raw = await asyncio.gather(bot_controller.name_to_id(self, message.author.name), + bot_controller.name_to_id(self, message.channel.name)) assert sender_id_raw assert channel_id_raw sender_id: str @@ -157,11 +138,12 @@ async def event_message(self, message: Message) -> None: sender_id, channel_id = str(sender_id_raw), str(channel_id_raw) sender: str = message.author.name - awaitables: Awaitables = Awaitables([database.is_user_ignored(userid=sender_id), - database.get_complement_chance(userid=channel_id), - database.is_ignoring_bots(userid=channel_id), - database.get_random_complement_enabled(userid=channel_id) - ]) + awaitables: Awaitables = Awaitables([ + database.is_user_ignored(userid=sender_id), + database.get_complement_chance(userid=channel_id), + database.is_ignoring_bots(userid=channel_id), + database.get_random_complement_enabled(userid=channel_id) + ]) is_author_ignored: bool chance: float is_ignoring_bots: bool @@ -202,7 +184,7 @@ async def choose_complement(self, channel: str) -> Tuple[str, bool]: custom_complements: list[str] = [] custom_complements_enabled: bool default_complements_enabled: bool - channel_id: str = str(await self.name_to_id(channel)) + channel_id: str = str(await bot_controller.name_to_id(self, channel)) custom_complements_enabled, default_complements_enabled = \ await asyncio.gather(database.are_custom_complements_enabled(userid=channel_id), database.are_default_complements_enabled(userid=channel_id)) @@ -242,7 +224,8 @@ async def complement_msg(self, who: str, channel: str, is_tts_muted: bool = True exists: bool if is_tts_muted: tts_mute_prefix: str - awaitables.add_task(database.get_tts_mute_prefix(channel, name_to_id=self.name_to_id)) + awaitables.add_task( + database.get_tts_mute_prefix(channel, name_to_id=lambda x: bot_controller.name_to_id(self, x))) (complement, exists), tts_mute_prefix = await awaitables.gather() prefix = f"{tts_mute_prefix} {prefix}" else: @@ -268,8 +251,8 @@ async def complement(self, ctx: commands.Context) -> None: sender_id_raw: Optional[str] channel_id_raw: Optional[str] - sender_id_raw, channel_id_raw = await asyncio.gather(self.name_to_id(ctx.author.name), - self.name_to_id(ctx.channel.name)) + sender_id_raw, channel_id_raw = await asyncio.gather(bot_controller.name_to_id(self, ctx.author.name), + bot_controller.name_to_id(self, ctx.channel.name)) assert sender_id_raw assert channel_id_raw sender_id: str @@ -302,13 +285,13 @@ async def compunignoreme(self, ctx: commands.Context) -> None: complement using the 'complement' command will work. """ - userid: str = str(await self.name_to_id(ctx.author.name)) + userid: str = str(await bot_controller.name_to_id(self, ctx.author.name)) - await ComplementsBot.cmd_body( + await bot_controller.cmd_body( ctx, lambda x: True, None, - ComplementsBot.DoIfElse((lambda ctx: database.is_user_ignored(userid=userid)), + bot_controller.DoIfElse((lambda ctx: database.is_user_ignored(userid=userid)), None, None, (lambda ctx: database.unignore(userid=userid)), @@ -321,13 +304,13 @@ async def compignoreme(self, ctx: commands.Context) -> None: The user of this command will not get any complements sent their way from ComplementsBot """ - userid: str = str(await self.name_to_id(ctx.author.name)) + userid: str = str(await bot_controller.name_to_id(self, ctx.author.name)) - await ComplementsBot.cmd_body( + await bot_controller.cmd_body( ctx, lambda x: True, None, - ComplementsBot.DoIfElse((lambda ctx: database.is_user_ignored(userid=userid)), + bot_controller.DoIfElse((lambda ctx: database.is_user_ignored(userid=userid)), None, None, None, @@ -341,7 +324,7 @@ async def is_in_bot_channel(self, ctx: commands.Context) -> bool: Checks if the context was created in the bot's channel (or the creator's) """ - return await self.name_to_id(ctx.channel.name) in (str(self.user_id), ComplementsBot.OWNER_ID) + return await bot_controller.name_to_id(self, ctx.channel.name) in (str(self.user_id), ComplementsBot.OWNER_ID) @staticmethod async def send_and_log(ctx: commands.Context, msg: Optional[str]) -> None: @@ -356,97 +339,6 @@ async def send_and_log(ctx: commands.Context, msg: Optional[str]) -> None: if ComplementsBot.SHOULD_LOG: bot_controller.custom_log(msg) - class DoIfElse: - """ - Serves as a way to store what to do in an if/else block of a lot of the bodies of the commands - """ - - def __init__(self, - if_check: Union[Callable[[commands.Context], Awaitable[bool]], Callable[[commands.Context], bool]], - true_msg: Optional[str], - false_msg: Optional[str], - do_true: Optional[Union[ - Callable[[commands.Context], Awaitable[None]], Callable[[commands.Context], None]]] = None, - do_false: Optional[Union[Callable[[commands.Context], Awaitable[None]], Callable[ - [commands.Context], None]]] = None) -> None: - """ - :param if_check: what the condition for entering 'if' statement is - :param do_true: what to do when the if_check succeeds (done before sending message to chat); - if 'None', does nothing - :param true_msg: what to send to chat when the 'if' if_check succeeds; any occurrence of - complements_bot.F_USER in the string is replaced with the name of the user in chat who called the - original command - :param do_false: what to do when if_check fails (done before sending message to chat); - if 'None', does nothing - :param false_msg: what to send to chat when the if_check fails; any occurrence of complements_bot.F_USER - in the string is replaced with the name of the user in chat who called the original command - """ - - self.if_check: Union[Callable[[commands.Context], Awaitable[bool]], Callable[[commands.Context], bool]] \ - = if_check - self.true_msg: Optional[str] = true_msg - self.false_msg: Optional[str] = false_msg - - self.do_true: Union[Callable[[commands.Context], Awaitable[None]], Callable[ - [commands.Context], None]] = do_true or ( - lambda ctx: None) - self.do_false: Union[Callable[[commands.Context], None], Callable[ - [commands.Context], Awaitable[None]]] = do_false or ( - lambda ctx: None) - - @staticmethod - async def cmd_body(ctx: commands.Context, - permission_check: Union[ - Callable[[commands.Context], bool], Callable[[commands.Context], Awaitable[bool]]], - do_always: Optional[ - Union[Callable[[commands.Context], Awaitable[None]], Callable[[commands.Context], None]]] = None, - do_if_else: Optional[DoIfElse] = None, - always_msg: Optional[str] = None) -> Optional[Iterable[Optional[str]]]: - """ - The main structure in which commands sent to the bot's channel need to be processed - :param ctx: context from the original call - :param permission_check: who is allowed to run the command in question - :param do_always: this is always ASYNCHRONOUSLY called if permission_check passes; if 'None', does nothing - :param do_if_else: specifies the if condition on which to enter the if block, what to do in the if and else - blocks, and the messages to send in either block; if None, all if/else logic is skipped - :param always_msg: message to always send to chat (after do_before_if and do_if_else); if None, send nothing; - any occurrence of ComplementsBot.F_USER in the string is replaced with the name of the user in chat who - called the original command - :return: True if permission_check passes, False otherwise - """ - - awaitables: Awaitables = Awaitables([]) - - user: str = ctx.author.name - - permission_check_task: asyncio.Task = asyncio.create_task(run_with_appropriate_awaiting(permission_check, ctx)) - - to_send: list[str] = [] - if do_if_else is not None: - permission_check_res, if_check_res = await asyncio.gather(permission_check_task, - run_with_appropriate_awaiting(do_if_else.if_check, ctx)) - if not permission_check_res: - return None - - if if_check_res: - awaitables.add_task(run_with_appropriate_awaiting(do_if_else.do_true, ctx)) - if do_if_else.true_msg: - to_send.append(do_if_else.true_msg.replace(ComplementsBot.F_USER, user)) - else: - awaitables.add_task(run_with_appropriate_awaiting(do_if_else.do_false, ctx)) - if do_if_else.false_msg: - to_send.append(do_if_else.false_msg.replace(ComplementsBot.F_USER, user)) - elif not await permission_check_task: - return None - - if always_msg is not None: - to_send.append(always_msg.replace(ComplementsBot.F_USER, user)) - - awaitables.add_task(run_with_appropriate_awaiting(do_always, ctx)) - await awaitables.gather() - - return to_send - @commands.command() async def joinme(self, ctx: commands.Context) -> None: """ @@ -454,7 +346,7 @@ async def joinme(self, ctx: commands.Context) -> None: Also used to reset channel name if streamer changed their username """ - raw_userid: Optional[str] = str(await self.name_to_id(ctx.author.name)) + raw_userid: Optional[str] = str(await bot_controller.name_to_id(self, ctx.author.name)) assert raw_userid userid: str = str(raw_userid) old_username: Optional[str] = await database.get_username(userid=userid) @@ -475,16 +367,17 @@ async def if_check(ctx: commands.Context) -> bool: return (await database.is_channel_joined(userid=userid) and ctx.author.name == old_username) - await ComplementsBot.cmd_body( + await bot_controller.cmd_body( ctx, self.is_in_bot_channel, None, - ComplementsBot.DoIfElse(if_check, - f"@{ComplementsBot.F_USER} I am already in your channel!", - f"@{ComplementsBot.F_USER} I have joined your channel!", - None, - do_false - ) + bot_controller.DoIfElse( + if_check, + f"@{bot_controller.F_USER} I am already in your channel!", + f"@{bot_controller.F_USER} I have joined your channel!", + None, + do_false + ) ) @commands.command() @@ -493,7 +386,7 @@ async def leaveme(self, ctx: commands.Context) -> None: Bot leaves the user's channel and no longer complements chatters there. """ - userid: str = str(await self.name_to_id(ctx.author.name)) + userid: str = str(await bot_controller.name_to_id(self, ctx.author.name)) async def do_true(ctx: commands.Context) -> None: # Update database and in realtime for "instant" effect @@ -501,16 +394,16 @@ async def do_true(ctx: commands.Context) -> None: self.part_channels([ctx.author.name])]) await awaitables.gather() - await ComplementsBot.cmd_body( + await bot_controller.cmd_body( ctx, self.is_in_bot_channel, None, - ComplementsBot.DoIfElse((lambda ctx: database.is_channel_joined(userid=userid)), - f"@{ComplementsBot.F_USER} I have left your channel; I would appreciate it if " + bot_controller.DoIfElse((lambda ctx: database.is_channel_joined(userid=userid)), + f"@{bot_controller.F_USER} I have left your channel; I would appreciate it if " f"you let me know why you asked me to leave! You can contact me in one of three ways: " f"DM ComplementsBot right here on Twitch; DM Ereiarrus on Twitch" f"(https://www.twitch.tv/ereiarrus) or Discord Ereiarrus#2900. Thank you!", - f"@{ComplementsBot.F_USER} I have not joined your channel.", + f"@{bot_controller.F_USER} I have not joined your channel.", do_true, None ) @@ -522,7 +415,7 @@ async def deleteme(self, ctx: commands.Context) -> None: Same as the 'leaveme' command, but on top, also delete any records of the user (e.g. custom complements) """ - userid: str = str(await self.name_to_id(ctx.author.name)) + userid: str = str(await bot_controller.name_to_id(self, ctx.author.name)) async def do_true(ctx: commands.Context) -> None: # Remove any user records from database and leave their channel NOW @@ -530,19 +423,20 @@ async def do_true(ctx: commands.Context) -> None: self.part_channels([ctx.author.name])]) await awaitables.gather() - await ComplementsBot.cmd_body( + await bot_controller.cmd_body( ctx, self.is_in_bot_channel, None, - ComplementsBot.DoIfElse((lambda ctx: database.channel_exists(userid=userid)), - f"@{ComplementsBot.F_USER} I have deleted your channel data; I would appreciate it if " - f"you let me know why you asked me to leave! You can contact me in one of three ways: " - f"DM ComplementsBot right here on Twitch; DM Ereiarrus on Twitch" - f"(https://www.twitch.tv/ereiarrus) or Discord Ereiarrus#2900. Thank you!", - f"@{ComplementsBot.F_USER} your channel does not exists in my records.", - do_true, - None - ) + bot_controller.DoIfElse( + (lambda ctx: database.channel_exists(userid=userid)), + f"@{bot_controller.F_USER} I have deleted your channel data; I would appreciate it if " + f"you let me know why you asked me to leave! You can contact me in one of three ways: " + f"DM ComplementsBot right here on Twitch; DM Ereiarrus on Twitch" + f"(https://www.twitch.tv/ereiarrus) or Discord Ereiarrus#2900. Thank you!", + f"@{bot_controller.F_USER} your channel does not exists in my records.", + do_true, + None + ) ) @commands.command() @@ -551,15 +445,15 @@ async def ignoreme(self, ctx: commands.Context) -> None: The user of this command will not get any complements sent their way from ComplementsBot """ - userid: str = str(await self.name_to_id(ctx.author.name)) + userid: str = str(await bot_controller.name_to_id(self, ctx.author.name)) - await ComplementsBot.cmd_body( + await bot_controller.cmd_body( ctx, self.is_in_bot_channel, None, - ComplementsBot.DoIfElse((lambda ctx: database.is_user_ignored(userid=userid)), - f"@{ComplementsBot.F_USER} I am already ignoring you.", - f"@{ComplementsBot.F_USER} I am now ignoring you.", + bot_controller.DoIfElse((lambda ctx: database.is_user_ignored(userid=userid)), + f"@{bot_controller.F_USER} I am already ignoring you.", + f"@{bot_controller.F_USER} I am now ignoring you.", None, (lambda ctx: database.ignore(userid=userid))) ) @@ -570,10 +464,10 @@ async def userid(self, ctx: commands.Context) -> None: Get the twitch user's ID from their username """ - userid: Optional[str] = await self.name_to_id(bot_controller.isolate_args(ctx.message.content)) - userid = userid or (await self.name_to_id(ctx.author.name)) + userid: Optional[str] = await bot_controller.name_to_id(self, bot_controller.isolate_args(ctx.message.content)) + userid = userid or (await bot_controller.name_to_id(self, ctx.author.name)) - await ComplementsBot.cmd_body( + await bot_controller.cmd_body( ctx, self.is_in_bot_channel, None, @@ -587,9 +481,9 @@ async def username(self, ctx: commands.Context) -> None: Get the twitch user's ID from their username """ - username: Optional[str] = await self.id_to_name(bot_controller.isolate_args(ctx.message.content)) + username: Optional[str] = await bot_controller.id_to_name(self, bot_controller.isolate_args(ctx.message.content)) - await ComplementsBot.cmd_body( + await bot_controller.cmd_body( ctx, self.is_in_bot_channel, None, @@ -604,15 +498,15 @@ async def unignoreme(self, ctx: commands.Context) -> None: complement using the 'complement' command will work. """ - userid: str = str(await self.name_to_id(ctx.author.name)) + userid: str = str(await bot_controller.name_to_id(self, ctx.author.name)) - await ComplementsBot.cmd_body( + await bot_controller.cmd_body( ctx, self.is_in_bot_channel, None, - ComplementsBot.DoIfElse((lambda ctx: database.is_user_ignored(userid=userid)), - f"@{ComplementsBot.F_USER} I am no longer ignoring you!", - f"@{ComplementsBot.F_USER} I am not ignoring you!", + bot_controller.DoIfElse((lambda ctx: database.is_user_ignored(userid=userid)), + f"@{bot_controller.F_USER} I am no longer ignoring you!", + f"@{bot_controller.F_USER} I am not ignoring you!", (lambda ctx: database.unignore(userid=userid)), None) ) @@ -623,10 +517,10 @@ async def count(self, ctx: commands.Context) -> None: Shows the number of channels that the bot is active in """ - await ComplementsBot.cmd_body( + await bot_controller.cmd_body( ctx, self.is_in_bot_channel, - always_msg=f"! @{ComplementsBot.F_USER} " + always_msg=f"! @{bot_controller.F_USER} " f"{str(await database.number_of_joined_channels())} channels and counting!" ) @@ -636,10 +530,10 @@ async def about(self, ctx: commands.Context) -> None: Shows some information about the bot """ - await ComplementsBot.cmd_body( + await bot_controller.cmd_body( ctx, self.is_in_bot_channel, - always_msg=f"! @{ComplementsBot.F_USER} " + always_msg=f"! @{bot_controller.F_USER} " "For most up-to-date information on commands, please have a look at " "https://github.com/Ereiarrus/ComplementsBotPy#readme " "and for most up-to-date complements, have a look at " @@ -688,7 +582,8 @@ async def setchance(self, ctx: commands.Context) -> None: return to_send = f"@{channel} complement chance set to {chance}!" - awaitables: Awaitables = Awaitables([database.set_complement_chance(chance, channel, name_to_id=self.name_to_id), + awaitables: Awaitables = Awaitables([database.set_complement_chance(chance, channel, name_to_id=( + lambda x: bot_controller.name_to_id(self, self, x))), ComplementsBot.send_and_log(ctx, to_send)]) await awaitables.gather() @@ -698,17 +593,17 @@ async def disablecmdcomplement(self, ctx: commands.Context) -> None: Prevent chatter from being able to use the !complement command in user's channel """ - userid: str = str(await self.name_to_id(ctx.channel.name)) + userid: str = str(await bot_controller.name_to_id(self, ctx.channel.name)) - await ComplementsBot.cmd_body( + await bot_controller.cmd_body( ctx, self.is_by_broadcaster_or_mod, None, - ComplementsBot.DoIfElse( + bot_controller.DoIfElse( (lambda ctx: database.get_cmd_complement_enabled(userid=userid)), - f"@{ComplementsBot.F_USER} your viewers will no longer be able to make use of the " + f"@{bot_controller.F_USER} your viewers will no longer be able to make use of the " f"!complement command.", - f"@{ComplementsBot.F_USER} your viewers already cannot make use of the !complement command.", + f"@{bot_controller.F_USER} your viewers already cannot make use of the !complement command.", (lambda ctx: database.set_cmd_complement_enabled(False, userid=userid)), None ) @@ -720,17 +615,17 @@ async def enablecmdcomplement(self, ctx: commands.Context) -> None: Allow chatters in user's chat to use the !complement command """ - userid: str = str(await self.name_to_id(ctx.channel.name)) + userid: str = str(await bot_controller.name_to_id(self, ctx.channel.name)) - await ComplementsBot.cmd_body( + await bot_controller.cmd_body( ctx, self.is_by_broadcaster_or_mod, None, - ComplementsBot.DoIfElse( + bot_controller.DoIfElse( (lambda ctx: database.get_cmd_complement_enabled(userid=userid)), - f"@{ComplementsBot.F_USER} your viewers can already make use of the !complement command!", + f"@{bot_controller.F_USER} your viewers can already make use of the !complement command!", - f"@{ComplementsBot.F_USER} your viewers will now be able to make use of the !complement command!", + f"@{bot_controller.F_USER} your viewers will now be able to make use of the !complement command!", None, (lambda ctx: database.set_cmd_complement_enabled(True, userid=userid)) ) @@ -742,16 +637,16 @@ async def disablerandomcomplement(self, ctx: commands.Context) -> None: Prevent the bot from randomly complementing chatters in user's chat """ - userid: str = str(await self.name_to_id(ctx.channel.name)) + userid: str = str(await bot_controller.name_to_id(self, ctx.channel.name)) - await ComplementsBot.cmd_body( + await bot_controller.cmd_body( ctx, self.is_by_broadcaster_or_mod, None, - ComplementsBot.DoIfElse( + bot_controller.DoIfElse( (lambda ctx: database.get_random_complement_enabled(userid=userid)), - f"@{ComplementsBot.F_USER} your viewers will no longer randomly receive complements.", - f"@{ComplementsBot.F_USER} your viewers already do not randomly receive complements.", + f"@{bot_controller.F_USER} your viewers will no longer randomly receive complements.", + f"@{bot_controller.F_USER} your viewers already do not randomly receive complements.", (lambda ctx: database.set_random_complement_enabled(False, userid=userid)), None ) @@ -763,16 +658,16 @@ async def enablerandomcomplement(self, ctx: commands.Context) -> None: Allow the bot to randomly complement chatters in user's chat """ - userid: str = str(await self.name_to_id(ctx.channel.name)) + userid: str = str(await bot_controller.name_to_id(self, ctx.channel.name)) - await ComplementsBot.cmd_body( + await bot_controller.cmd_body( ctx, self.is_by_broadcaster_or_mod, None, - ComplementsBot.DoIfElse( + bot_controller.DoIfElse( (lambda ctx: database.get_random_complement_enabled(userid=userid)), - f"@{ComplementsBot.F_USER} I already randomly send out complements!", - f"@{ComplementsBot.F_USER} your viewers will now randomly receive complements!", + f"@{bot_controller.F_USER} I already randomly send out complements!", + f"@{bot_controller.F_USER} your viewers will now randomly receive complements!", None, (lambda ctx: database.set_random_complement_enabled(True, userid=userid)) ) @@ -798,8 +693,9 @@ async def addcomplement(self, ctx: commands.Context) -> None: return awaitables: Awaitables = \ - Awaitables([database.add_complement(complement, user, name_to_id=self.name_to_id), - ComplementsBot.send_and_log(ctx, f"@{user} new complements added: '{complement}'")]) + Awaitables( + [database.add_complement(complement, user, name_to_id=lambda x: bot_controller.name_to_id(self, x)), + ComplementsBot.send_and_log(ctx, f"@{user} new complements added: '{complement}'")]) await awaitables.gather() @commands.command(aliases=["listcomps"]) @@ -814,7 +710,8 @@ async def listcomplements(self, ctx: commands.Context) -> None: return user: str = ctx.channel.name - custom_complements: list[str] = await database.get_custom_complements(user, name_to_id=self.name_to_id) + custom_complements: list[str] = await database.get_custom_complements(user, name_to_id=( + lambda x: bot_controller.name_to_id(self, x))) comps_msg: str = '"' + '", "'.join(custom_complements) + '"' msgs: list[str] = textwrap.wrap(f"@{user} complements: {comps_msg}", ComplementsBot.DEFAULT_MAX_MSG_LEN) @@ -845,7 +742,7 @@ async def removecomplement(self, ctx: commands.Context) -> None: phrase: str = remove_chars(msg[msg.find(" ") + 1:], regex=r"[^a-z0-9]") user: str = ctx.channel.name - userid: str = str(await self.name_to_id(ctx.channel.name)) + userid: str = str(await bot_controller.name_to_id(self, ctx.channel.name)) to_remove_comps, to_keep_comps = database.complements_to_remove( await database.get_custom_complements(userid=userid), phrase) await database.remove_complements(userid=userid, to_keep=to_keep_comps) @@ -870,12 +767,13 @@ async def removeallcomplements(self, ctx: commands.Context) -> None: Remove all custom complements a user has added """ - await ComplementsBot.cmd_body( + await bot_controller.cmd_body( ctx, self.is_by_broadcaster_or_mod, - (lambda ctx: database.remove_all_complements(ctx.channel.name, name_to_id=self.name_to_id)), + (lambda ctx: database.remove_all_complements(ctx.channel.name, + name_to_id=lambda x: bot_controller.name_to_id(self, x))), None, - f"@{ComplementsBot.F_USER} all of your custom complements have been removed.") + f"@{bot_controller.F_USER} all of your custom complements have been removed.") @commands.command() async def setmutettsprefix(self, ctx: commands.Context) -> None: @@ -890,8 +788,12 @@ async def setmutettsprefix(self, ctx: commands.Context) -> None: msg = msg.strip() prefix: str = msg[msg.find(" ") + 1:] awaitables: Awaitables = \ - Awaitables([database.set_tts_mute_prefix(prefix, ctx.channel.name, name_to_id=self.name_to_id), - ComplementsBot.send_and_log(ctx, f"@{ctx.author.name} mute TTS prefix changed to '{prefix}'.")]) + Awaitables([ + database.set_tts_mute_prefix( + prefix, ctx.channel.name, + name_to_id=lambda x: bot_controller.name_to_id(self, x) + ), + ComplementsBot.send_and_log(ctx, f"@{ctx.author.name} mute TTS prefix changed to '{prefix}'.")]) await awaitables.gather() @commands.command(aliases=["mutecommandcomplement", "mutecommandcomp", "mutecmdcomp"]) @@ -900,16 +802,16 @@ async def mutecmdcomplement(self, ctx: commands.Context) -> None: Mutes TTS for complements sent with !complement command """ - userid: str = str(await self.name_to_id(ctx.channel.name)) + userid: str = str(await bot_controller.name_to_id(self, ctx.channel.name)) - await ComplementsBot.cmd_body( + await bot_controller.cmd_body( ctx, self.is_by_broadcaster_or_mod, None, - ComplementsBot.DoIfElse( + bot_controller.DoIfElse( (lambda ctx: database.is_cmd_complement_muted(userid=userid)), - f"@{ComplementsBot.F_USER} command complements are already muted!", - f"@{ComplementsBot.F_USER} command complements are now muted.", + f"@{bot_controller.F_USER} command complements are already muted!", + f"@{bot_controller.F_USER} command complements are now muted.", None, (lambda ctx: database.set_cmd_complement_is_muted(True, userid=userid)) ) @@ -921,16 +823,16 @@ async def muterandomcomplement(self, ctx: commands.Context) -> None: Mutes TTS for complements given out randomly """ - userid: str = str(await self.name_to_id(ctx.channel.name)) + userid: str = str(await bot_controller.name_to_id(self, ctx.channel.name)) - await ComplementsBot.cmd_body( + await bot_controller.cmd_body( ctx, self.is_by_broadcaster_or_mod, None, - ComplementsBot.DoIfElse( + bot_controller.DoIfElse( (lambda ctx: database.are_random_complements_muted(userid=userid)), - f"@{ComplementsBot.F_USER} random complements are already muted!", - f"@{ComplementsBot.F_USER} random complements are now muted.", + f"@{bot_controller.F_USER} random complements are already muted!", + f"@{bot_controller.F_USER} random complements are now muted.", None, (lambda ctx: database.set_random_complements_are_muted(True, userid=userid)) ) @@ -942,16 +844,16 @@ async def unmutecmdcomplement(self, ctx: commands.Context) -> None: Unmutes TTS for complements sent with !complement command """ - userid: str = str(await self.name_to_id(ctx.channel.name)) + userid: str = str(await bot_controller.name_to_id(self, ctx.channel.name)) - await ComplementsBot.cmd_body( + await bot_controller.cmd_body( ctx, self.is_by_broadcaster_or_mod, None, - ComplementsBot.DoIfElse( + bot_controller.DoIfElse( (lambda ctx: database.is_cmd_complement_muted(userid=userid)), - f"@{ComplementsBot.F_USER} command complements are no longer muted!", - f"@{ComplementsBot.F_USER} command complements are already unmuted!", + f"@{bot_controller.F_USER} command complements are no longer muted!", + f"@{bot_controller.F_USER} command complements are already unmuted!", (lambda ctx: database.set_cmd_complement_is_muted(False, userid=userid)), None ) @@ -963,16 +865,16 @@ async def unmuterandomcomplement(self, ctx: commands.Context) -> None: Unmutes TTS for complements given out randomly """ - userid: str = str(await self.name_to_id(ctx.channel.name)) + userid: str = str(await bot_controller.name_to_id(self, ctx.channel.name)) - await ComplementsBot.cmd_body( + await bot_controller.cmd_body( ctx, self.is_by_broadcaster_or_mod, None, - ComplementsBot.DoIfElse( + bot_controller.DoIfElse( (lambda ctx: database.are_random_complements_muted(userid=userid)), - f"@{ComplementsBot.F_USER} random complements are no longer muted!", - f"@{ComplementsBot.F_USER} random complements are already unmuted!", + f"@{bot_controller.F_USER} random complements are no longer muted!", + f"@{bot_controller.F_USER} random complements are already unmuted!", (lambda ctx: database.set_random_complements_are_muted(False, userid=userid)), None ) @@ -984,16 +886,16 @@ async def enablecustomcomplements(self, ctx: commands.Context) -> None: All custom complements will be added to the pool that we choose complements for chatters from """ - userid: str = str(await self.name_to_id(ctx.channel.name)) + userid: str = str(await bot_controller.name_to_id(self, ctx.channel.name)) - await ComplementsBot.cmd_body( + await bot_controller.cmd_body( ctx, self.is_by_broadcaster_or_mod, None, - ComplementsBot.DoIfElse( + bot_controller.DoIfElse( (lambda ctx: database.are_custom_complements_enabled(userid=userid)), - f"@{ComplementsBot.F_USER} custom complements are already enabled!", - f"@{ComplementsBot.F_USER} custom complements are now enabled!", + f"@{bot_controller.F_USER} custom complements are already enabled!", + f"@{bot_controller.F_USER} custom complements are now enabled!", None, (lambda ctx: database.set_are_custom_complements_enabled(True, userid=userid)) ) @@ -1005,16 +907,16 @@ async def enabledefaultcomplements(self, ctx: commands.Context) -> None: All default complements will be added to the pool that we choose complements for chatters from """ - userid: str = str(await self.name_to_id(ctx.channel.name)) + userid: str = str(await bot_controller.name_to_id(self, ctx.channel.name)) - await ComplementsBot.cmd_body( + await bot_controller.cmd_body( ctx, self.is_by_broadcaster_or_mod, None, - ComplementsBot.DoIfElse( + bot_controller.DoIfElse( (lambda ctx: database.are_default_complements_enabled(userid=userid)), - f"@{ComplementsBot.F_USER} default complements are already enabled!", - f"@{ComplementsBot.F_USER} default complements are now enabled!", + f"@{bot_controller.F_USER} default complements are already enabled!", + f"@{bot_controller.F_USER} default complements are now enabled!", None, (lambda ctx: database.set_are_default_complements_enabled(True, userid=userid)) ) @@ -1027,16 +929,16 @@ async def disablecustomcomplements(self, ctx: commands.Context) -> None: delete the custom complements. """ - userid: str = str(await self.name_to_id(ctx.channel.name)) + userid: str = str(await bot_controller.name_to_id(self, ctx.channel.name)) - await ComplementsBot.cmd_body( + await bot_controller.cmd_body( ctx, self.is_by_broadcaster_or_mod, None, - ComplementsBot.DoIfElse( + bot_controller.DoIfElse( (lambda ctx: database.are_custom_complements_enabled(userid=userid)), - f"@{ComplementsBot.F_USER} custom complements are now disabled.", - f"@{ComplementsBot.F_USER} custom complements are already disabled.", + f"@{bot_controller.F_USER} custom complements are now disabled.", + f"@{bot_controller.F_USER} custom complements are already disabled.", (lambda ctx: database.set_are_custom_complements_enabled(False, userid=userid)), None ) @@ -1048,16 +950,16 @@ async def disabledefaultcomplements(self, ctx: commands.Context) -> None: All default complements will be removed from the pool that we choose complements for chatters from """ - userid: str = str(await self.name_to_id(ctx.channel.name)) + userid: str = str(await bot_controller.name_to_id(self, ctx.channel.name)) - await ComplementsBot.cmd_body( + await bot_controller.cmd_body( ctx, self.is_by_broadcaster_or_mod, None, - ComplementsBot.DoIfElse( + bot_controller.DoIfElse( (lambda ctx: database.are_default_complements_enabled(userid=userid)), - f"@{ComplementsBot.F_USER} default complements are now disabled.", - f"@{ComplementsBot.F_USER} default complements are already disabled!", + f"@{bot_controller.F_USER} default complements are now disabled.", + f"@{bot_controller.F_USER} default complements are already disabled!", (lambda ctx: database.set_are_default_complements_enabled(False, userid=userid)), None ) @@ -1069,16 +971,16 @@ async def unignorebots(self, ctx: commands.Context) -> None: Chatters that count as bots might be complemented by ComplementsBot """ - userid: str = str(await self.name_to_id(ctx.channel.name)) + userid: str = str(await bot_controller.name_to_id(self, ctx.channel.name)) - await ComplementsBot.cmd_body( + await bot_controller.cmd_body( ctx, self.is_by_broadcaster_or_mod, None, - ComplementsBot.DoIfElse( + bot_controller.DoIfElse( (lambda ctx: database.is_ignoring_bots(userid=userid)), - f"@{ComplementsBot.F_USER} bots have a chance of being complemented!", - f"@{ComplementsBot.F_USER} bots can already get complements!", + f"@{bot_controller.F_USER} bots have a chance of being complemented!", + f"@{bot_controller.F_USER} bots can already get complements!", (lambda ctx: database.set_should_ignore_bots(False, userid=userid)), None ) @@ -1090,16 +992,16 @@ async def ignorebots(self, ctx: commands.Context) -> None: Chatters that count as bots will not be complemented by ComplementsBot """ - userid: str = str(await self.name_to_id(ctx.channel.name)) + userid: str = str(await bot_controller.name_to_id(self, ctx.channel.name)) - await ComplementsBot.cmd_body( + await bot_controller.cmd_body( ctx, self.is_by_broadcaster_or_mod, None, - ComplementsBot.DoIfElse( + bot_controller.DoIfElse( (lambda ctx: database.is_ignoring_bots(userid=userid)), - f"@{ComplementsBot.F_USER} bots are already not getting complements.", - f"@{ComplementsBot.F_USER} bots will no longer get complemented.", + f"@{bot_controller.F_USER} bots are already not getting complements.", + f"@{bot_controller.F_USER} bots will no longer get complemented.", None, (lambda ctx: database.set_should_ignore_bots(True, userid=userid)) ) @@ -1111,7 +1013,7 @@ async def compleave(self, ctx: commands.Context) -> None: Allows the user to kick ComplementsBot out of their channel from their own channel chat """ - userid: str = str(await self.name_to_id(ctx.channel.name)) + userid: str = str(await bot_controller.name_to_id(self, ctx.channel.name)) async def do_true(ctx: commands.Context) -> None: # Update database and in realtime for "instant" effect @@ -1119,14 +1021,15 @@ async def do_true(ctx: commands.Context) -> None: self.part_channels([ctx.channel.name])]) await awaitables.gather() - await ComplementsBot.cmd_body( + await bot_controller.cmd_body( ctx, (lambda ctx: ctx.author.name == ctx.channel.name), None, - ComplementsBot.DoIfElse((lambda ctx: database.is_channel_joined(userid=userid)), - f"@{ComplementsBot.F_USER} I have left your channel.", - f"@{ComplementsBot.F_USER} I have not joined your channel.", - do_true, - None - ) + bot_controller.DoIfElse( + (lambda ctx: database.is_channel_joined(userid=userid)), + f"@{bot_controller.F_USER} I have left your channel.", + f"@{bot_controller.F_USER} I have not joined your channel.", + do_true, + None + ) ) diff --git a/src/complements_bot/bot_controller.py b/src/complements_bot/bot_controller.py index 3a4461e..5574939 100644 --- a/src/complements_bot/bot_controller.py +++ b/src/complements_bot/bot_controller.py @@ -1,8 +1,15 @@ """ Holds all logic for how ComplementsBot should complement Twitch chatters """ -# import asyncio -# from typing import Awaitable, Callable, Optional, Tuple, Union +import asyncio +from typing import Awaitable, Callable, Iterable, Optional, Union + +# from twitchio import Message +from twitchio.ext import commands # , routines , eventsub + +from .utilities import Awaitables, run_with_appropriate_awaiting + +F_USER: str = "{user}" def custom_log(msg: str) -> None: @@ -42,3 +49,119 @@ def isolate_args(full_cmd_msg: str) -> str: # won't give 'index out of range' as message can't end on a space due to the strip() return full_cmd_msg[first_space_at + 1:] + + +class DoIfElse: + """ + Serves as a way to store what to do in an if/else block of a lot of the bodies of the commands + """ + + def __init__(self, + if_check: Union[Callable[[commands.Context], Awaitable[bool]], Callable[[commands.Context], bool]], + true_msg: Optional[str], + false_msg: Optional[str], + do_true: Optional[Union[ + Callable[[commands.Context], Awaitable[None]], Callable[[commands.Context], None]]] = None, + do_false: Optional[Union[Callable[[commands.Context], Awaitable[None]], Callable[ + [commands.Context], None]]] = None) -> None: + """ + :param if_check: what the condition for entering 'if' statement is + :param do_true: what to do when the if_check succeeds (done before sending message to chat); + if 'None', does nothing + :param true_msg: what to send to chat when the 'if' if_check succeeds; any occurrence of + complements_bot.F_USER in the string is replaced with the name of the user in chat who called the + original command + :param do_false: what to do when if_check fails (done before sending message to chat); + if 'None', does nothing + :param false_msg: what to send to chat when the if_check fails; any occurrence of complements_bot.F_USER + in the string is replaced with the name of the user in chat who called the original command + """ + + self.if_check: Union[Callable[[commands.Context], Awaitable[bool]], Callable[[commands.Context], bool]] \ + = if_check + self.true_msg: Optional[str] = true_msg + self.false_msg: Optional[str] = false_msg + + self.do_true: Union[Callable[[commands.Context], Awaitable[None]], Callable[ + [commands.Context], None]] = do_true or ( + lambda ctx: None) + self.do_false: Union[Callable[[commands.Context], None], Callable[ + [commands.Context], Awaitable[None]]] = do_false or ( + lambda ctx: None) + + +async def cmd_body(ctx: commands.Context, + permission_check: Union[ + Callable[[commands.Context], bool], Callable[[commands.Context], Awaitable[bool]]], + do_always: Optional[ + Union[Callable[[commands.Context], Awaitable[None]], Callable[[commands.Context], None]]] = None, + do_if_else: Optional[DoIfElse] = None, + always_msg: Optional[str] = None) -> Optional[Iterable[Optional[str]]]: + """ + The main structure in which commands sent to the bot's channel need to be processed + :param ctx: context from the original call + :param permission_check: who is allowed to run the command in question + :param do_always: this is always ASYNCHRONOUSLY called if permission_check passes; if 'None', does nothing + :param do_if_else: specifies the if condition on which to enter the if block, what to do in the if and else + blocks, and the messages to send in either block; if None, all if/else logic is skipped + :param always_msg: message to always send to chat (after do_before_if and do_if_else); if None, send nothing; + any occurrence of ComplementsBot.F_USER in the string is replaced with the name of the user in chat who + called the original command + :return: True if permission_check passes, False otherwise + """ + + awaitables: Awaitables = Awaitables([]) + + user: str = ctx.author.name + + permission_check_task: asyncio.Task = asyncio.create_task(run_with_appropriate_awaiting(permission_check, ctx)) + + to_send: list[str] = [] + if do_if_else is not None: + permission_check_res, if_check_res = await asyncio.gather(permission_check_task, + run_with_appropriate_awaiting(do_if_else.if_check, ctx)) + if not permission_check_res: + return None + + if if_check_res: + awaitables.add_task(run_with_appropriate_awaiting(do_if_else.do_true, ctx)) + if do_if_else.true_msg: + to_send.append(do_if_else.true_msg.replace(F_USER, user)) + else: + awaitables.add_task(run_with_appropriate_awaiting(do_if_else.do_false, ctx)) + if do_if_else.false_msg: + to_send.append(do_if_else.false_msg.replace(F_USER, user)) + elif not await permission_check_task: + return None + + if always_msg is not None: + to_send.append(always_msg.replace(F_USER, user)) + + awaitables.add_task(run_with_appropriate_awaiting(do_always, ctx)) + await awaitables.gather() + + return to_send + + +async def name_to_id(bot: commands.Bot, username: str) -> Optional[str]: + """ + :param bot: the twitchio bot to use + :param username: the username of the user whose user id we want + :return: the user id of the specified user, if the user exists; otherwise 'None' + """ + res = await bot.fetch_users(names=[username]) + if len(res) > 0: + return str(res[0].id) + return None + + +async def id_to_name(bot: commands.Bot, uid: str) -> Optional[str]: + """ + :param bot: the twitchio bot to use + :param uid: the user id of the user whose username we want + :return: the username of the specified user, if the user exists; otherwise 'None' + """ + res = await bot.fetch_users(ids=[int(uid)]) + if len(res) > 0: + return res[0].name + return None