Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat/youtube summarisation #2

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -18,3 +18,6 @@ Thumbs.db

# Debugging
debug.log

downloads/
src/test/
81 changes: 71 additions & 10 deletions src/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,24 @@ def save_debug_info(output_folder, content, topics, clips):

def upload_to_s3(file_path, config):
logger.debug(f"Uploading file to S3: {file_path}")
command = f"{config['aws_cli_path']} s3 cp {file_path} s3://{config['s3_bucket']}/public/{os.path.basename(file_path)}"
# Use raw string literals or replace single backslashes with double backslashes
file_path = r"{}".format(file_path)
s3_destination = f"s3://{config['s3_bucket']}/public/{os.path.basename(file_path)}"
command = f"{config['aws_cli_path']} s3 cp \"{file_path}\" \"{s3_destination}\""
logger.debug(f"S3 upload command: {command}")
subprocess.run(command, shell=True, check=True)
logger.info(f"File uploaded successfully to S3: {file_path}")

# Check if the file exists before attempting to upload
if not os.path.isfile(file_path):
logger.error(f"The file does not exist: {file_path}")
return

try:
subprocess.run(command, shell=True, check=True)
logger.info(f"File uploaded successfully to S3: {file_path}")
except subprocess.CalledProcessError as e:
logger.error(f"Failed to upload file to S3: {e}")
raise


def get_s3_presigned_url(file_name, config):
logger.debug(f"Getting presigned URL for file: {file_name}")
Expand Down Expand Up @@ -255,7 +269,7 @@ def find_sentence_boundary(transcript, time, direction):
start_time = max(0, start_time - buffer)
end_time += buffer

command = f"/opt/homebrew/bin/ffmpeg -i {source_file} -ss {start_time:.2f} -to {end_time:.2f} -y -c copy {output_file}"
command = f"ffmpeg -i {source_file} -ss {start_time:.2f} -to {end_time:.2f} -y -c copy {output_file}"
ffmpeg_commands.append(command)

logger.debug(f"Generated FFmpeg commands: {ffmpeg_commands}")
Expand All @@ -269,7 +283,7 @@ def execute_ffmpeg_commands(commands):
subprocess.run(command.strip(), shell=True, check=True)
logger.info("All FFmpeg commands executed successfully")

def main(media_file, goal=TranscriptionGoal.GENERAL_TRANSCRIPTION, progress_callback=None):
def transcribe_video(media_file, goal=TranscriptionGoal.GENERAL_TRANSCRIPTION, progress_callback=None):
try:
logger.info(f"Starting main process for file: {media_file}")
if progress_callback:
Expand Down Expand Up @@ -324,13 +338,60 @@ def main(media_file, goal=TranscriptionGoal.GENERAL_TRANSCRIPTION, progress_call
logger.error(f"An error occurred: {str(e)}", exc_info=True)
raise

if __name__ == "__main__":
logger.info("Script started")
media_file = prompt_for_media_file()

def download_youtube_video(youtube_url):
# Extract the video ID from the YouTube URL
video_id = youtube_url.split('v=')[1]
# Define the output template for the downloaded video using the video ID
output_template = os.path.join('downloads', f'{video_id}.%(ext)s')

# Ensure the downloads directory exists
if not os.path.exists('downloads'):
os.makedirs('downloads')

# Define the command to download the video using yt-dlp
command = [
'yt-dlp',
'-f', 'bestvideo[ext=mp4]+bestaudio[ext=m4a]/mp4',
'--output', output_template,
youtube_url
]

# Run the yt-dlp command
result = subprocess.run(command, capture_output=True, text=True)
# Check for errors in stderr
if result.stderr:
print(result.stderr)

# Check if the file with the video ID already exists
downloaded_file_path = output_template.replace('%(ext)s', 'mp4') # Assuming mp4 is the extension
if os.path.exists(downloaded_file_path):
return downloaded_file_path
else:
# If the file path couldn't be found, raise an error
raise Exception("The video could not be downloaded or the file path could not be found.")



def main():
choice = input("Enter '1' to upload a local file or '2' to transcribe a YouTube video: ")
if choice == '1':
media_file = prompt_for_media_file()
elif choice == '2':
youtube_url = input("Enter the full YouTube video URL: ")
media_file = download_youtube_video(youtube_url)
else:
print("Invalid choice. Exiting.")
return

if media_file:
logger.info(f"Media file selected: {media_file}")
goal = prompt_for_goal()
logger.info(f"Transcription goal selected: {goal.value}")
main(media_file, goal)
transcribe_video(media_file, goal)
else:
logger.warning("No media file selected. Exiting.")
logger.warning("No media file selected. Exiting.")


if __name__ == "__main__":
main()