diff --git a/nonebot_plugin_anime_downloader/__init__.py b/nonebot_plugin_anime_downloader/__init__.py index 97c8329..41ff83e 100644 --- a/nonebot_plugin_anime_downloader/__init__.py +++ b/nonebot_plugin_anime_downloader/__init__.py @@ -25,6 +25,7 @@ from .tasks import TaskManager from .routes import VideoManager from .downloader import TorrentDownloader +from .downloader.exceptions import TorrentExistsError from .data_source import ACGRIPBase, UserBase, VideoBase, ACGRIPData, User, Video from .utils import is_tag_match_title, generate_folder_name, extract_tags_from_title from .acgrip import ( @@ -93,29 +94,21 @@ video_session = sessionmaker(bind=video_engine)() -async def check_video_downloaded(title: str, torrent_id: int, user_id: str): - logger.info(f"Checking whether {title} is downloaded for {user_id}...") - if torrent_id in video_manager.ids: - logger.info(f"{title} is downloaded for {user_id}! Sending message...") - # send message to user - msg = ( - f"{title} 现在可以观看了!\n" - + f"{plugin_config.anime_url}:{nonebot.get_driver().config.port}/anime/{torrent_id}" - ) - chat_type = str(user_id).split("_")[0] - id_ = str(user_id).split("_")[-1] - target = Target( - id=id_, - parent_id=id_ if chat_type == "group" else "", - channel=chat_type == "group", - private=chat_type == "private", - ) - await UniMessage(msg).send(target) - logger.info(f"Message sent to {user_id}!") - - scheduler.remove_job(f"{title}_{user_id}") - else: - logger.info(f"{title} is not downloaded yet for {user_id}.") +async def send_notification(title: str, torrent_id: int, user_id: str): + msg = ( + f"{title} 现在可以观看了!\n" + + f"{plugin_config.anime_url}:{nonebot.get_driver().config.port}/anime/{torrent_id}" + ) + chat_type = str(user_id).split("_")[0] + id_ = str(user_id).split("_")[-1] + target = Target( + id=id_, + parent_id=id_ if chat_type == "group" else "", + channel=chat_type == "group", + private=chat_type == "private", + ) + await UniMessage(msg).send(target) + logger.info(f"Message sent to {user_id}!") @scheduler.scheduled_job("interval", seconds=plugin_config.acgrip_interval) @@ -134,7 +127,7 @@ async def fetch_acgrip_data(): logger.debug("No new data found.") return None - logger.info(f"ACG.RIP updated with {len(new_data)} new entries.\n{new_data}") + logger.info(f"ACG.RIP updated with {len(new_data)} new entries.") # store new data for entry in new_data: @@ -151,20 +144,7 @@ async def fetch_acgrip_data(): if is_tag_match_title(tags, entry["title"]): # check if the video is already downloaded if entry["id"] in video_manager.ids: - # send message to user - msg = ( - f"{entry['title']} 现在可以观看了!\n" - + f"{plugin_config.anime_url}:{nonebot.get_driver().config.port}/anime/{entry['id']}" - ) - chat_type = str(user.id).split("_")[0] - id_ = str(user.id).split("_")[-1] - target = Target( - id=id_, - parent_id=id_ if chat_type == "group" else "", - channel=chat_type == "group", - private=chat_type == "private", - ) - await UniMessage(msg).send(target) + await send_notification(entry["title"], entry["id"], user.id) continue # start download torrent @@ -180,6 +160,11 @@ async def fetch_acgrip_data(): torrent_info = await torrent_downloader.download_torrent( torrent_data, generate_folder_name(tags) ) + except TorrentExistsError: + logger.warning( + f"Torrent {entry['title']} already exists in the download queue." + ) + continue except Exception: logger.warning( f"Failed to download {entry['title']} for {user.id}! Probably due to the same torrent." @@ -189,59 +174,71 @@ async def fetch_acgrip_data(): "id": user.id, "content": torrent_info, "torrent_id": int(entry["id"]), + "status": "downloading", } ) - # set a scheduler to check if the video is downloaded - scheduler.add_job( - check_video_downloaded, - "interval", - seconds=10, - args=(entry["title"], int(entry["id"]), user.id), - id=f"{entry['title']}_{user.id}", - ) - @scheduler.scheduled_job("interval", seconds=60) async def check_for_tasks(): - tasks = task_manager.get() - - for task in tasks: + for task in task_manager.content: hash_str = task["content"]["hash"] torrent_info = await torrent_downloader.get_torrent_info(hash_str) - if torrent_info["progress"] != 1: - logger.info(f"Torrent {task['content']['name']} is not downloaded yet.") - task_manager.add( - { - "id": task["id"], - "content": torrent_info, - "torrent_id": task["torrent_id"], - } + if torrent_info["progress"] == 1: + logger.info(f"Torrent {task['content']['name']} downloaded.") + task["status"] = "downloaded" + task_manager.save() + + if task["status"] == "downloaded": + existing_video = ( + video_session.query(Video).filter_by(id=task["torrent_id"]).first() ) - return None + if existing_video is None: + video_session.add( + Video( + id=task["torrent_id"], + title=torrent_info["name"], + path=f"{torrent_info['save_path']}\\{torrent_info['name']}", + ) + ) - logger.success( - f"Torrent {task['content']['name']} is downloaded, waiting to be added to the route..." - ) + video_session.commit() - existing_video = ( - video_session.query(Video).filter_by(id=task["torrent_id"]).first() - ) - if existing_video is None: - video_session.add( - Video( - id=task["torrent_id"], - title=torrent_info["name"], - path=f"{torrent_info['save_path']}\\{torrent_info['name']}", - ) + if not Path( + f"{torrent_info['save_path']}\\{torrent_info['name']}" + ).exists(): + continue + + video_manager.add_route( + Path(f"{torrent_info['save_path']}\\{torrent_info['name']}"), + task["torrent_id"], ) - video_session.commit() + logger.success(f"Added route for video {torrent_info['name']}.") + + task["status"] = "wait_for_send" + + if task["status"] == "wait_for_send": + await send_notification( + torrent_info["name"], task["torrent_id"], task["id"] + ) + + task["status"] = "sent" + task_manager.save() + + logger.info( + f"Notification of {torrent_info['name']} sent for {task['id']}." + ) + + # remove sent tasks + task_manager.content = [ + task for task in task_manager.content if task["status"] != "sent" + ] + task_manager.save() -@scheduler.scheduled_job("interval", seconds=30) @nonebot.get_driver().on_startup async def check_for_new_videos(): videos = video_session.query(Video).all() @@ -412,23 +409,18 @@ async def download_handle( torrent_info = await torrent_downloader.download_torrent( torrent_data, generate_folder_name(tags) ) - except Exception: - await download.finish("下载失败!请稍后重试。") + except TorrentExistsError: + await download.finish("任务已处于下载队列中") + except Exception as e: + await download.finish(f"下载失败!请稍后重试。\n{type(e).__name__}: {e}") task_manager.add( { "id": id_, "content": torrent_info, "torrent_id": int(torrent_id), + "status": "downloading", } ) await download.send(f"开始下载 {anime_entry.title}...") - - scheduler.add_job( - check_video_downloaded, - "interval", - seconds=10, - args=(anime_entry.title, int(torrent_id), id_), - id=f"{anime_entry.title}_{id_}", - ) diff --git a/nonebot_plugin_anime_downloader/downloader/__init__.py b/nonebot_plugin_anime_downloader/downloader/__init__.py index 8b55fc9..a19ac47 100644 --- a/nonebot_plugin_anime_downloader/downloader/__init__.py +++ b/nonebot_plugin_anime_downloader/downloader/__init__.py @@ -8,6 +8,7 @@ from nonebot.utils import run_sync from .models import TorrentInfo +from .exceptions import TorrentExistsError, TorrentUnexistsError class TorrentDownloader: @@ -24,6 +25,11 @@ def _get_torrent_hash(self, torrent_file: bytes): async def download_torrent( self, torrent_file: bytes, folder_name: str ) -> TorrentInfo: + hash_str = self._get_torrent_hash(torrent_file) + + if (await self.is_torrent_exists(hash_str)): + raise TorrentExistsError(f"Torrent with hash {hash_str} already exists.") + text = await run_sync(self.client.torrents_add)( torrent_files=torrent_file, save_path=self.download_path / folder_name ) @@ -31,7 +37,6 @@ async def download_torrent( if text == "Fails.": raise Exception("Failed to download torrent.") - hash_str = self._get_torrent_hash(torrent_file) await run_sync(self.client.torrents_reannounce)(torrent_hashes=hash_str) torrent_info = ( @@ -40,6 +45,12 @@ async def download_torrent( return torrent_info + async def is_torrent_exists(self, hash_str: str) -> bool: + torrents = await run_sync(self.client.torrents_info)(torrent_hashes=hash_str) + return torrents != [] + async def get_torrent_info(self, hash_str: str) -> TorrentInfo: torrents = await run_sync(self.client.torrents_info)(torrent_hashes=hash_str) + if torrents == []: + raise TorrentUnexistsError(f"Torrent with hash {hash_str} does not exist.") return torrents[0] diff --git a/nonebot_plugin_anime_downloader/downloader/exceptions.py b/nonebot_plugin_anime_downloader/downloader/exceptions.py new file mode 100644 index 0000000..b3f27b2 --- /dev/null +++ b/nonebot_plugin_anime_downloader/downloader/exceptions.py @@ -0,0 +1,20 @@ +class TorrentExistsError(Exception): + def __init__(self, message: str): + self.message = message + + def __str__(self): + return f"TorrentExistsError: {self.message}" + + def __repr__(self): + return f"TorrentExistsError: {self.message}" + + +class TorrentUnexistsError(Exception): + def __init__(self, message: str): + self.message = message + + def __str__(self): + return f"TorrentUnexistsError: {self.message}" + + def __repr__(self): + return f"TorrentUnexistsError: {self.message}" diff --git a/nonebot_plugin_anime_downloader/models.py b/nonebot_plugin_anime_downloader/models.py index 606b1da..c3bbc7a 100644 --- a/nonebot_plugin_anime_downloader/models.py +++ b/nonebot_plugin_anime_downloader/models.py @@ -7,3 +7,4 @@ class Task(TypedDict): id: str # group/private_id content: TorrentInfo torrent_id: int + status: str diff --git a/nonebot_plugin_anime_downloader/routes.py b/nonebot_plugin_anime_downloader/routes.py index 37451e8..0aeb34f 100644 --- a/nonebot_plugin_anime_downloader/routes.py +++ b/nonebot_plugin_anime_downloader/routes.py @@ -75,4 +75,8 @@ def iterfile(): media_type=f"video/{video_path.suffix[1:]}", ) - logger.success(f"Added route for video {video_path.name}.") + def add_routes(self, video_paths: List[Path], torrent_ids: List[int]) -> None: + for video_path, torrent_id in zip(video_paths, torrent_ids): + self.add_route(video_path, torrent_id) + + logger.success(f"Added routes for {len(video_paths)} videos.") diff --git a/nonebot_plugin_anime_downloader/tasks.py b/nonebot_plugin_anime_downloader/tasks.py index 1d8cc8d..a36d814 100644 --- a/nonebot_plugin_anime_downloader/tasks.py +++ b/nonebot_plugin_anime_downloader/tasks.py @@ -22,9 +22,3 @@ def save(self) -> None: def add(self, content: Task) -> None: self.content.append(content) self.save() - - def get(self) -> List[Task]: - tasks = self.content - self.content = [] - self.save() - return tasks