From ada2c135efb3bcf15370e197565adb54ae4e0741 Mon Sep 17 00:00:00 2001 From: Jeremy Watt Date: Wed, 17 Jul 2024 04:56:46 -0700 Subject: [PATCH] swap to gradio and docker added --- Dockerfile | 14 ++ README.md | 10 +- docker-compose.yml | 8 + requirements.txt | 5 +- tests/__init__.py | 2 +- tests/test_app.py | 27 +++ tests/test_streamlit.py | 27 --- youtube_downloader/app.py | 343 +++++------------------------- youtube_downloader/download.py | 42 ---- youtube_downloader/streams.py | 57 ----- youtube_downloader/yt_download.py | 44 ++++ 11 files changed, 162 insertions(+), 417 deletions(-) create mode 100644 Dockerfile create mode 100644 docker-compose.yml create mode 100644 tests/test_app.py delete mode 100644 tests/test_streamlit.py delete mode 100644 youtube_downloader/download.py delete mode 100644 youtube_downloader/streams.py create mode 100644 youtube_downloader/yt_download.py diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 0000000..ce623a2 --- /dev/null +++ b/Dockerfile @@ -0,0 +1,14 @@ +FROM python:3.10-slim + +WORKDIR /home + +ENV PYTHONPATH=. + +COPY requirements.txt /home/requirements.txt +COPY youtube_downloader /home/youtube_downloader +RUN pip3 install -r /home/requirements.txt + +EXPOSE 7860 +ENV GRADIO_SERVER_NAME="0.0.0.0" + +CMD ["gradio", "/home/youtube_downloader/app.py"] \ No newline at end of file diff --git a/README.md b/README.md index faac80e..9fd9f62 100644 --- a/README.md +++ b/README.md @@ -4,8 +4,14 @@ A simple python app that lets you painlessly download youtube / shorts video files without needing to visit chintzy sites online. Can be easily run locally. Try it out now in your browser at [![HuggingFace Space](https://img.shields.io/badge/🤗-HuggingFace%20Space-cyan.svg)](https://huggingface.co/spaces/neonwatty/youtube_downloader). 
"""Smoke test: the Gradio app process starts, stays up, and stops on request."""

import subprocess
import sys
import pytest
import time
from tests import APP_FILE


@pytest.fixture(scope="module")
def start_app():
    """Launch the app in a child process and yield the ``Popen`` handle.

    The app is started with the interpreter that is running the tests
    (``sys.executable``) and without a shell, so the handle refers to the app
    process itself and ``terminate()`` reaches it directly instead of a shell
    wrapper. Output is discarded rather than piped: nothing in the test reads
    it, and an unread pipe can fill up and block the child.
    """
    process = subprocess.Popen(
        [sys.executable, APP_FILE],
        stdout=subprocess.DEVNULL,
        stderr=subprocess.DEVNULL,
    )
    # Give the server a moment to either come up or crash.
    time.sleep(5)
    yield process
    # The test body may already have stopped the process.
    if process.poll() is None:
        process.terminate()
    process.wait()


def test_streamlit(subtests, start_app):
    """Check that the app is running after start-up and exits after terminate()."""
    with subtests.test(msg="server up"):
        assert start_app.poll() is None, "app failed to start"

    with subtests.test(msg="streamlit down"):
        start_app.terminate()
        try:
            # Wait for the exit instead of sleeping a fixed amount of time.
            start_app.wait(timeout=10)
        except subprocess.TimeoutExpired:
            pass  # reported by the assertion below
        assert start_app.poll() is not None, "app failed to stop"
a/tests/test_streamlit.py b/tests/test_streamlit.py deleted file mode 100644 index 90d214e..0000000 --- a/tests/test_streamlit.py +++ /dev/null @@ -1,27 +0,0 @@ -import subprocess -import pytest -import time -from tests import STREAMLIT_FILE - - -@pytest.fixture(scope="module") -def start_streamlit_app(): - cmd = f"python -m streamlit run {STREAMLIT_FILE} --server.headless true" - process = subprocess.Popen( - cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE - ) - time.sleep(5) - yield process - process.terminate() - process.wait() - - -def test_streamlit(subtests, start_streamlit_app): - with subtests.test(msg="streamlit up"): - assert start_streamlit_app.poll() is None, "Streamlit app failed to start" - - with subtests.test(msg="streamlit down"): - start_streamlit_app.terminate() - time.sleep(2) - assert start_streamlit_app.poll() is not None, "Streamlit app failed to stop" - diff --git a/youtube_downloader/app.py b/youtube_downloader/app.py index f27db4d..de2cf48 100644 --- a/youtube_downloader/app.py +++ b/youtube_downloader/app.py @@ -1,292 +1,65 @@ -import streamlit as st -from youtube_downloader.streams import get_yt_streams -from youtube_downloader.download import ( - download_joint_stream, - download_separate_streams_and_join, -) -import os - - -st.set_page_config(page_title="YT Downloader") -st.title("Youtube Downloader") -st.markdown("instructions: paste a valid youtube url in the textbox and download ") - -# Initialization -if "url" not in st.session_state: - st.session_state["url"] = "" -if "url_stream_count" not in st.session_state: - st.session_state["url_stream_count"] = 0 -if "stream_button_pressed" not in st.session_state: - st.session_state["stream_button_pressed"] = False -if "yt" not in st.session_state: - st.session_state["yt"] = None -if "yt_title" not in st.session_state: - st.session_state["yt_title"] = None -if "yt_thumbnail_url" not in st.session_state: - st.session_state["yt_thumbnail_url"] = None - -if 
"a_v_selection_index" not in st.session_state: - st.session_state["a_v_selection_index"] = 1 -if "audio_video_streams" not in st.session_state: - st.session_state["audio_video_streams"] = None -if "audio_video_choices" not in st.session_state: - st.session_state["audio_video_choices"] = None - -if "v_selection_index" not in st.session_state: - st.session_state["v_selection_index"] = 0 -if "video_only_streams" not in st.session_state: - st.session_state["video_streams"] = None -if "video_only_choices" not in st.session_state: - st.session_state["video_only_choices"] = None - -if "a_selection_index" not in st.session_state: - st.session_state["a_selection_index"] = 0 -if "audio_only_streams" not in st.session_state: - st.session_state["audio_only_streams"] = None -if "audio_only_choices" not in st.session_state: - st.session_state["audio_only_choices"] = None - - -def reset_session_state(): - if "stream_button_pressed" in st.session_state: - st.session_state["stream_button_pressed"] = False - if "yt" in st.session_state: - st.session_state["yt"] = None - if "yt_title" in st.session_state: - st.session_state["yt_title"] = None - if "yt_thumbnail_url" in st.session_state: - st.session_state["yt_thumbnail_url"] = None - - if "a_v_selection_index" in st.session_state: - st.session_state["a_v_selection_index"] = 1 - if "audio_video_streams" in st.session_state: - st.session_state["audio_video_streams"] = None - if "audio_video_choices" in st.session_state: - st.session_state["audio_video_choices"] = None - - if "v_selection_index" in st.session_state: - st.session_state["v_selection_index"] = 0 - if "video_only_streams" in st.session_state: - st.session_state["video_streams"] = None - if "video_only_choices" in st.session_state: - st.session_state["video_only_choices"] = None - - if "a_selection_index" in st.session_state: - st.session_state["a_selection_index"] = 0 - if "audio_only_streams" in st.session_state: - st.session_state["audio_only_streams"] = None - if 
"audio_only_choices" in st.session_state: - st.session_state["audio_only_choices"] = None - - if "panel" in st.session_state: - del st.session_state["panel"] - - -base = st.container(border=True) -with base: - x, col1, y = st.columns([3, 20, 3]) - col_a, col_b, col_c = st.columns([5, 5, 5]) - - with col1: - url = col1.text_input( - label="enter youtube url", - placeholder="your youtube url goes here", - value="https://www.youtube.com/watch?v=H1r4IMS0vf8", - ) - if st.session_state["url_stream_count"] == 0: - st.session_state["url"] = url - else: - if st.session_state["url"] != url: - st.session_state["url_stream_count"] = 0 - st.session_state["url"] = url - if "panel" in st.session_state: - print("Deleting panel for refresh") - del st.session_state["panel"] - - col2, col3, col4 = st.columns([3, 2, 3]) - with col2: - check_button_val = st.button(label="fetch available streams", type="primary") - with col3: - panel = st.container() - with col4: - empty = st.empty() - -my_panel = st.empty() - - -def get_set_streams(url: str) -> None: - # collect video data - ( - yt, - yt_title, - yt_thumbnail_url, - audio_only_streams, - video_only_streams, - audio_video_streams, - ) = get_yt_streams(url) - - # save to session state - st.session_state["yt"] = yt - st.session_state["yt_title"] = yt_title - st.session_state["yt_thumbnail_url"] = yt_thumbnail_url - - audio_video_choices = [] - if len(audio_video_streams) > 0: - audio_video_choices = [(v.resolution, v.itag) for v in audio_video_streams] - audio_video_choices = tuple([None] + [v[0] for v in audio_video_choices]) - st.session_state["audio_video_choices"] = audio_video_choices - st.session_state["audio_video_streams"] = audio_video_streams - - video_only_choices = [] - if len(video_only_streams): - video_only_choices = [(v.resolution, v.itag) for v in video_only_streams] - video_only_choices = tuple([None] + [v[0] for v in video_only_choices]) - st.session_state["video_only_choices"] = video_only_choices - 
"""Gradio front end: paste a YouTube / Shorts URL, download it, preview the result."""

import io
import tempfile
from youtube_downloader.yt_download import download_video
import gradio as gr

video_choices = ["best", "1080", "720", "360"]

# Downloads are written here. The directory object is kept at module level so
# the directory exists for as long as the process runs (click handlers fire long
# after the UI has been built) and is removed when the object is finalized.
download_dir = tempfile.TemporaryDirectory()


print("Setting up Gradio interface...")
with gr.Blocks(theme=gr.themes.Soft(), title=" youtube downloader") as demo:
    with gr.Tabs():
        with gr.TabItem("youtube downloader"):
            with gr.Row():
                with gr.Column(scale=4):
                    url_input = gr.Textbox(
                        value="https://www.youtube.com/shorts/43BhDHYBG0o",
                        label="🔗 Paste YouTube / Shorts URL here",
                        placeholder="e.g., https://www.youtube.com/watch?v=.",
                        max_lines=1,
                    )
                with gr.Column(scale=3):
                    resolution_dropdown = gr.Dropdown(
                        choices=video_choices,
                        value="best",
                        label="video resolution",
                        info="choose video resolution",
                        interactive=True,
                    )

                with gr.Column(scale=2):
                    download_button = gr.Button("download", variant="primary")

            with gr.Row():
                # Hidden until a download finishes; the click handler returns a
                # visible replacement.
                og_video = gr.Video(
                    visible=False,
                )

            @download_button.click(inputs=[url_input, resolution_dropdown], outputs=[og_video])
            def download_this(url_input, resolution_dropdown):
                """Download the video behind ``url_input`` and return a visible player for it.

                Args:
                    url_input: URL typed into the textbox.
                    resolution_dropdown: value selected in the resolution dropdown.

                Returns:
                    A ``gr.Video`` pointing at the downloaded file.

                Raises:
                    gr.Error: if the download fails, so the reason is shown in the UI.
                """
                # TODO: the selected resolution is not forwarded here;
                # download_video is called with its default quality.
                try:
                    temporary_video_location = download_video(url_input, download_dir.name)
                except ValueError as err:
                    raise gr.Error(str(err)) from err

                return gr.Video(
                    value=temporary_video_location,
                    visible=True,
                    show_download_button=True,
                    show_label=True,
                    label="your video",
                    format="mp4",
                    width="50vw",
                    height="50vw",
                )


if __name__ == "__main__":
    print("Launching Gradio interface...")
    demo.launch()  # allow_flagging="never"
VideoFileClip(tmpvideopath) - audio_clip = AudioFileClip(tmpaudiopath) - video_clip.audio = audio_clip - final_save_path = save_dir + "/" + yt_title + ".mp4" - final_save_path = re.sub(r"[^a-zA-Z0-9./]", " ", final_save_path) - video_clip.write_videofile( - final_save_path, - codec="libx264", - audio_codec="aac", - temp_audiofile="temp-audio.m4a", - remove_temp=True, - ) - return final_save_path diff --git a/youtube_downloader/streams.py b/youtube_downloader/streams.py deleted file mode 100644 index 62c5642..0000000 --- a/youtube_downloader/streams.py +++ /dev/null @@ -1,57 +0,0 @@ -from typing import Tuple -from pytube import YouTube -from pytube.query import StreamQuery -import re - - -def is_valid_youtube_url(url: str) -> bool: - if not isinstance(url, str): - return False - pattern = r"^https://www\.youtube\.com/watch\?v=[A-Za-z0-9_-]{11}$" # youtube vido ids are always 11 chars long - if "shorts" in url: - pattern = r"^https://www\.youtube\.com/shorts/[A-Za-z0-9_-]{11}$" # youtube vido ids are always 11 chars long - return re.match(pattern, url) is not None - - -def get_yt_streams(url: str, my_proxies: dict = {}) -> Tuple[YouTube, str, str, StreamQuery, StreamQuery, StreamQuery]: - try: - # validate url - if is_valid_youtube_url(url): - # load in video - yt = YouTube(url, proxies=my_proxies) - - # audio only streams - audio_only_streams = yt.streams.filter(file_extension="mp4", only_audio=True, type="audio").order_by("abr").asc() - - # video only streams - video_only_streams = yt.streams.filter(file_extension="mp4", only_video=True, type="video").order_by("resolution").asc() - - # audio and video joint streams - audio_video_streams = ( - yt.streams.filter( - file_extension="mp4", - only_audio=False, - only_video=False, - progressive=True, - type="video", - ) - .order_by("resolution") - .asc() - ) - - # get title and thumbnail - yt_title = yt.title.replace("/", " ") - yt_thumbnail_url = yt.thumbnail_url - - return ( - yt, - yt_title, - yt_thumbnail_url, - 
"""URL validation and single-call video download built on yt-dlp."""

import re
from typing import Optional

# https://www.youtube.com/watch?v=<id> or https://www.youtube.com/shorts/<id>,
# where <id> is exactly 11 characters from [A-Za-z0-9_-].
_YOUTUBE_URL_RE = re.compile(r"^https://www\.youtube\.com/(?:watch\?v=|shorts/)[A-Za-z0-9_-]{11}$")

# Path separators and control characters must never reach the output file name.
_UNSAFE_FILENAME_CHARS_RE = re.compile(r"[\\/\x00-\x1f]")


def is_valid_youtube_url(url: str) -> bool:
    """Return True if *url* is a www.youtube.com watch or shorts URL with an 11-character id.

    The URL form is decided by the pattern itself rather than by searching for
    the substring "shorts", so a watch URL whose id happens to contain
    "shorts" is still accepted. Non-string input returns False.
    """
    return isinstance(url, str) and _YOUTUBE_URL_RE.match(url) is not None


def _safe_file_stem(name) -> str:
    """Return *name* with unsafe characters replaced by "_", or "" if nothing usable remains."""
    if name is None:
        return ""
    stem = _UNSAFE_FILENAME_CHARS_RE.sub("_", str(name)).strip()
    # Names made only of dots/underscores/spaces (".", "..", "___") are not usable.
    return stem if stem.strip("._ ") else ""


def _format_selector(resolution) -> str:
    """Build the yt-dlp format string for *resolution* ("best" or a maximum pixel height)."""
    if str(resolution).strip().lower() == "best":
        return "bestvideo+bestaudio/best"
    return f"bestvideo[height<={int(resolution)}]+bestaudio/best"


def download_video(url: str, savedir: str, my_proxies: Optional[dict] = None, resolution: str = "720") -> str:
    """Download the video at *url* into *savedir* as an mp4 and return the file path.

    Args:
        url: YouTube watch or shorts URL; checked with ``is_valid_youtube_url``.
        savedir: directory the file is written to.
        my_proxies: accepted for backward compatibility; not used by this function.
        resolution: "best" for no cap, or a maximum video height in pixels
            (e.g. "1080", "720", "360"). The default keeps the previous 720p cap.

    Returns:
        ``"<savedir>/<name>.mp4"``, where ``<name>`` is the video title with path
        separators and control characters replaced by "_", or the video id when
        no usable title is available.

    Raises:
        ValueError: for an invalid URL and for any failure while fetching
            metadata or downloading; the original exception is chained as
            ``__cause__``.
    """
    try:
        print("Downloading video from youtube...")
        if not is_valid_youtube_url(url):
            raise ValueError(f"invalid input url: {url}")

        # Imported only once a download is actually going to happen, so that
        # URL validation works without loading yt-dlp.
        from yt_dlp import YoutubeDL

        with YoutubeDL() as ydl:
            info_dict = ydl.extract_info(url, download=False)
        video_id = info_dict.get("id")
        video_title = info_dict.get("title")

        # The title comes from the remote site: strip anything that could move
        # the file out of savedir, and fall back to the id if nothing is left.
        file_stem = _safe_file_stem(video_title) or _safe_file_stem(video_id)
        if not file_stem:
            raise ValueError("video metadata contains neither a usable title nor an id")
        savepath = f"{savedir}/{file_stem}.mp4"

        print(f"Title: {video_title if video_title is not None else video_id}")

        ydl_opts = {
            "format": _format_selector(resolution),
            "merge_output_format": "mp4",
            # outtmpl is a template: a literal percent sign must be written as "%%".
            "outtmpl": savepath.replace("%", "%%"),
        }
        with YoutubeDL(ydl_opts) as ydl:
            ydl.download([url])

        print("...done!")
        return savepath
    except Exception as e:
        raise ValueError(f"yt_download failed with exception {e}") from e