Skip to content

Commit

Permalink
✨ optimize download flow
Browse files Browse the repository at this point in the history
  • Loading branch information
zhaomaoniu committed May 12, 2024
1 parent 7a71e0b commit fb50418
Show file tree
Hide file tree
Showing 6 changed files with 114 additions and 92 deletions.
160 changes: 76 additions & 84 deletions nonebot_plugin_anime_downloader/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
from .tasks import TaskManager
from .routes import VideoManager
from .downloader import TorrentDownloader
from .downloader.exceptions import TorrentExistsError
from .data_source import ACGRIPBase, UserBase, VideoBase, ACGRIPData, User, Video
from .utils import is_tag_match_title, generate_folder_name, extract_tags_from_title
from .acgrip import (
Expand Down Expand Up @@ -93,29 +94,21 @@
video_session = sessionmaker(bind=video_engine)()


async def check_video_downloaded(title: str, torrent_id: int, user_id: str):
logger.info(f"Checking whether {title} is downloaded for {user_id}...")
if torrent_id in video_manager.ids:
logger.info(f"{title} is downloaded for {user_id}! Sending message...")
# send message to user
msg = (
f"{title} 现在可以观看了!\n"
+ f"{plugin_config.anime_url}:{nonebot.get_driver().config.port}/anime/{torrent_id}"
)
chat_type = str(user_id).split("_")[0]
id_ = str(user_id).split("_")[-1]
target = Target(
id=id_,
parent_id=id_ if chat_type == "group" else "",
channel=chat_type == "group",
private=chat_type == "private",
)
await UniMessage(msg).send(target)
logger.info(f"Message sent to {user_id}!")

scheduler.remove_job(f"{title}_{user_id}")
else:
logger.info(f"{title} is not downloaded yet for {user_id}.")
async def send_notification(title: str, torrent_id: int, user_id: str):
msg = (
f"{title} 现在可以观看了!\n"
+ f"{plugin_config.anime_url}:{nonebot.get_driver().config.port}/anime/{torrent_id}"
)
chat_type = str(user_id).split("_")[0]
id_ = str(user_id).split("_")[-1]
target = Target(
id=id_,
parent_id=id_ if chat_type == "group" else "",
channel=chat_type == "group",
private=chat_type == "private",
)
await UniMessage(msg).send(target)
logger.info(f"Message sent to {user_id}!")


@scheduler.scheduled_job("interval", seconds=plugin_config.acgrip_interval)
Expand All @@ -134,7 +127,7 @@ async def fetch_acgrip_data():
logger.debug("No new data found.")
return None

logger.info(f"ACG.RIP updated with {len(new_data)} new entries.\n{new_data}")
logger.info(f"ACG.RIP updated with {len(new_data)} new entries.")

# store new data
for entry in new_data:
Expand All @@ -151,20 +144,7 @@ async def fetch_acgrip_data():
if is_tag_match_title(tags, entry["title"]):
# check if the video is already downloaded
if entry["id"] in video_manager.ids:
# send message to user
msg = (
f"{entry['title']} 现在可以观看了!\n"
+ f"{plugin_config.anime_url}:{nonebot.get_driver().config.port}/anime/{entry['id']}"
)
chat_type = str(user.id).split("_")[0]
id_ = str(user.id).split("_")[-1]
target = Target(
id=id_,
parent_id=id_ if chat_type == "group" else "",
channel=chat_type == "group",
private=chat_type == "private",
)
await UniMessage(msg).send(target)
await send_notification(entry["title"], entry["id"], user.id)
continue

# start download torrent
Expand All @@ -180,6 +160,11 @@ async def fetch_acgrip_data():
torrent_info = await torrent_downloader.download_torrent(
torrent_data, generate_folder_name(tags)
)
except TorrentExistsError:
logger.warning(
f"Torrent {entry['title']} already exists in the download queue."
)
continue
except Exception:
logger.warning(
f"Failed to download {entry['title']} for {user.id}! Probably due to the same torrent."
Expand All @@ -189,59 +174,71 @@ async def fetch_acgrip_data():
"id": user.id,
"content": torrent_info,
"torrent_id": int(entry["id"]),
"status": "downloading",
}
)

# set a scheduler to check if the video is downloaded
scheduler.add_job(
check_video_downloaded,
"interval",
seconds=10,
args=(entry["title"], int(entry["id"]), user.id),
id=f"{entry['title']}_{user.id}",
)


@scheduler.scheduled_job("interval", seconds=60)
async def check_for_tasks():
tasks = task_manager.get()

for task in tasks:
for task in task_manager.content:
hash_str = task["content"]["hash"]

torrent_info = await torrent_downloader.get_torrent_info(hash_str)

if torrent_info["progress"] != 1:
logger.info(f"Torrent {task['content']['name']} is not downloaded yet.")
task_manager.add(
{
"id": task["id"],
"content": torrent_info,
"torrent_id": task["torrent_id"],
}
if torrent_info["progress"] == 1:
logger.info(f"Torrent {task['content']['name']} downloaded.")
task["status"] = "downloaded"
task_manager.save()

if task["status"] == "downloaded":
existing_video = (
video_session.query(Video).filter_by(id=task["torrent_id"]).first()
)
return None
if existing_video is None:
video_session.add(
Video(
id=task["torrent_id"],
title=torrent_info["name"],
path=f"{torrent_info['save_path']}\\{torrent_info['name']}",
)
)

logger.success(
f"Torrent {task['content']['name']} is downloaded, waiting to be added to the route..."
)
video_session.commit()

existing_video = (
video_session.query(Video).filter_by(id=task["torrent_id"]).first()
)
if existing_video is None:
video_session.add(
Video(
id=task["torrent_id"],
title=torrent_info["name"],
path=f"{torrent_info['save_path']}\\{torrent_info['name']}",
)
if not Path(
f"{torrent_info['save_path']}\\{torrent_info['name']}"
).exists():
continue

video_manager.add_route(
Path(f"{torrent_info['save_path']}\\{torrent_info['name']}"),
task["torrent_id"],
)

video_session.commit()
logger.success(f"Added route for video {torrent_info['name']}.")

task["status"] = "wait_for_send"

if task["status"] == "wait_for_send":
await send_notification(
torrent_info["name"], task["torrent_id"], task["id"]
)

task["status"] = "sent"
task_manager.save()

logger.info(
f"Notification of {torrent_info['name']} sent for {task['id']}."
)

# remove sent tasks
task_manager.content = [
task for task in task_manager.content if task["status"] != "sent"
]
task_manager.save()


@scheduler.scheduled_job("interval", seconds=30)
@nonebot.get_driver().on_startup
async def check_for_new_videos():
videos = video_session.query(Video).all()
Expand Down Expand Up @@ -412,23 +409,18 @@ async def download_handle(
torrent_info = await torrent_downloader.download_torrent(
torrent_data, generate_folder_name(tags)
)
except Exception:
await download.finish("下载失败!请稍后重试。")
except TorrentExistsError:
await download.finish("任务已处于下载队列中")
except Exception as e:
await download.finish(f"下载失败!请稍后重试。\n{type(e).__name__}: {e}")

task_manager.add(
{
"id": id_,
"content": torrent_info,
"torrent_id": int(torrent_id),
"status": "downloading",
}
)

await download.send(f"开始下载 {anime_entry.title}...")

scheduler.add_job(
check_video_downloaded,
"interval",
seconds=10,
args=(anime_entry.title, int(torrent_id), id_),
id=f"{anime_entry.title}_{id_}",
)
13 changes: 12 additions & 1 deletion nonebot_plugin_anime_downloader/downloader/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from nonebot.utils import run_sync

from .models import TorrentInfo
from .exceptions import TorrentExistsError, TorrentUnexistsError


class TorrentDownloader:
Expand All @@ -24,14 +25,18 @@ def _get_torrent_hash(self, torrent_file: bytes):
async def download_torrent(
self, torrent_file: bytes, folder_name: str
) -> TorrentInfo:
hash_str = self._get_torrent_hash(torrent_file)

if (await self.is_torrent_exists(hash_str)):
raise TorrentExistsError(f"Torrent with hash {hash_str} already exists.")

text = await run_sync(self.client.torrents_add)(
torrent_files=torrent_file, save_path=self.download_path / folder_name
)

if text == "Fails.":
raise Exception("Failed to download torrent.")

hash_str = self._get_torrent_hash(torrent_file)
await run_sync(self.client.torrents_reannounce)(torrent_hashes=hash_str)

torrent_info = (
Expand All @@ -40,6 +45,12 @@ async def download_torrent(

return torrent_info

async def is_torrent_exists(self, hash_str: str) -> bool:
torrents = await run_sync(self.client.torrents_info)(torrent_hashes=hash_str)
return torrents != []

async def get_torrent_info(self, hash_str: str) -> TorrentInfo:
torrents = await run_sync(self.client.torrents_info)(torrent_hashes=hash_str)
if torrents == []:
raise TorrentUnexistsError(f"Torrent with hash {hash_str} does not exist.")
return torrents[0]
20 changes: 20 additions & 0 deletions nonebot_plugin_anime_downloader/downloader/exceptions.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
class TorrentExistsError(Exception):
def __init__(self, message: str):
self.message = message

def __str__(self):
return f"TorrentExistsError: {self.message}"

def __repr__(self):
return f"TorrentExistsError: {self.message}"


class TorrentUnexistsError(Exception):
def __init__(self, message: str):
self.message = message

def __str__(self):
return f"TorrentUnexistsError: {self.message}"

def __repr__(self):
return f"TorrentUnexistsError: {self.message}"
1 change: 1 addition & 0 deletions nonebot_plugin_anime_downloader/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,4 @@ class Task(TypedDict):
id: str # group/private_id
content: TorrentInfo
torrent_id: int
status: str
6 changes: 5 additions & 1 deletion nonebot_plugin_anime_downloader/routes.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,4 +75,8 @@ def iterfile():
media_type=f"video/{video_path.suffix[1:]}",
)

logger.success(f"Added route for video {video_path.name}.")
def add_routes(self, video_paths: List[Path], torrent_ids: List[int]) -> None:
for video_path, torrent_id in zip(video_paths, torrent_ids):
self.add_route(video_path, torrent_id)

logger.success(f"Added routes for {len(video_paths)} videos.")
6 changes: 0 additions & 6 deletions nonebot_plugin_anime_downloader/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,3 @@ def save(self) -> None:
def add(self, content: Task) -> None:
self.content.append(content)
self.save()

def get(self) -> List[Task]:
tasks = self.content
self.content = []
self.save()
return tasks

0 comments on commit fb50418

Please sign in to comment.