Initial Submit

This commit is contained in:
retrozelda
2024-08-24 01:24:34 -04:00
commit 8616521124
13 changed files with 1040 additions and 0 deletions

22
.gitignore vendored Normal file
View File

@@ -0,0 +1,22 @@
# Python bytecode
__pycache__/
*.py[cod]
# Distribution / packaging
build/
dist/
*.egg-info/
# Virtual environment
venv/
env/
.venv/
# Editor and IDE files
.vscode/
.idea/
*.swp
# OS-specific files
.DS_Store
Thumbs.db

9
MANIFEST.in Normal file
View File

@@ -0,0 +1,9 @@
include dependencies
include README.md
include LICENSE
include setup.py
include pyproject.toml
recursive-include beacon_snatch *.py

2
README.md Normal file
View File

@@ -0,0 +1,2 @@
Snatch from Beacon.tv

17
beacon_snatch/__init__.py Normal file
View File

@@ -0,0 +1,17 @@
import logging
from . import helpers
from .series import BeaconSeries
from .content import BeaconContent
from .stream import BeaconStreamInfo
from .authentication import BeaconAuthentication
# Names re-exported as the package's public API.
__all__ = [
    "BeaconSeries",
    "BeaconContent",
    "BeaconStreamInfo",
    "BeaconAuthentication",
]

# Give the custom numeric level from helpers a readable name in log output.
logging.addLevelName(helpers.LOG_VERBOSE, helpers.LOG_VERBOSE_NAME)
# Configure root logging when the package is imported.
logging.basicConfig(format='%(levelname)s: %(message)s', level=logging.INFO)

View File

@@ -0,0 +1,173 @@
from . import helpers
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.chrome.options import Options
from selenium.common.exceptions import InvalidCookieDomainException
import logging
import json
import time
import os
# Site root, and the profile page that check_authentication() loads to
# verify that the session is logged in.
base_url = "https://beacon.tv"
profile_url = f"{base_url}/profile"
class BeaconAuthentication:
    """Log in to beacon.tv through headless Chrome and persist the session cookies.

    Cookies captured after a login are written as JSON to ``cookies_file`` and
    replayed into the browser the next time a driver is created.

    State exposed to callers:
        IsAuthenticated / CheckedAuthentication: both True only after
            ``check_authentication`` stayed on the profile page.
        username: text of the profile heading, or None when not logged in.
        authenticated_cookies: list of cookie dicts, or None when none are loaded.
    """

    def __init__(self, email = None, password = None, cookies_file = None):
        """Store credentials, build the Chrome options and load saved cookies.

        Args:
            email: account email; may be None and assigned later.
            password: account password; may be None and assigned later.
            cookies_file: path of the JSON cookie store; defaults to
                ``helpers.DEFAULT_COOKIES``. ``~`` is expanded.

        Note: when cookies are found on disk, a browser is started right away
        to verify them via ``check_authentication``.
        """
        self.email = email
        self.password = password
        self.cookies_file = cookies_file
        self.driver = None
        self.authenticated_cookies = None
        self.username = None
        self.IsAuthenticated = False
        self.CheckedAuthentication = False

        # Set up Chrome options to simulate a real user
        self.chrome_options = Options()
        self.chrome_options.add_argument("--headless")  # comment this out to debug view what is going on
        self.chrome_options.add_argument("--no-sandbox")
        self.chrome_options.add_argument("--disable-dev-shm-usage")
        self.chrome_options.add_argument("user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/114.0.5735.199 Safari/537.36")

        if self.cookies_file is None:
            self.cookies_file = helpers.DEFAULT_COOKIES
        self.cookies_file = os.path.expanduser(self.cookies_file)

        # Credentials are deliberately NOT validated here: callers may create
        # this object first and supply email/password before authenticate().
        self.load_cookies()
        if self.authenticated_cookies is not None:
            self.check_authentication()

    def __del__(self):
        # getattr: __init__ may have failed before ``driver`` was assigned.
        driver = getattr(self, "driver", None)
        if driver is not None:
            try:
                driver.quit()
            except Exception:
                # Best effort only; this can run during interpreter shutdown.
                pass
            self.driver = None

    def _mark_unauthenticated(self):
        """Reset every field that describes a verified login."""
        self.username = None
        self.IsAuthenticated = False
        self.CheckedAuthentication = False

    def get_driver(self):
        """Return the shared Chrome driver, creating it on first use.

        On creation the driver opens ``base_url`` and any loaded cookies are
        added to the browser session.
        """
        if self.driver is None:
            self.driver = webdriver.Chrome(options=self.chrome_options)
            self.driver.implicitly_wait(10)
            # A page on the target domain must be open before cookies can be added.
            self.driver.get(base_url)

            # set cookies if they are loaded
            for cookie in self.authenticated_cookies or []:
                domain = cookie.get('domain')
                if domain:
                    cookie['domain'] = domain.lstrip('.')
                try:
                    self.driver.add_cookie(cookie)
                except InvalidCookieDomainException:
                    logging.log(helpers.LOG_VERBOSE, f"Wrong domain for cookie: {cookie}")
        return self.driver

    def authenticate(self, force : bool = False):
        """Log in with ``email``/``password`` and save the resulting cookies.

        Args:
            force: log in again even if a verified session already exists.

        Failures are logged as warnings; nothing is raised for a failed login.
        """
        if self.IsAuthenticated and self.CheckedAuthentication and not force:
            return

        if not self.email or not self.password:
            logging.warning("Unable to login: an email and password are required.")
            return

        # Open the login page
        driver = self.get_driver()
        try:
            driver.get(base_url)

            # find and click the login button(Note we need to either do this now, or we need to do it after we enter our credentials)
            driver.find_element(By.LINK_TEXT, 'Login').click()

            # Find the email input field and enter the email address
            driver.find_element(By.ID, 'session_email').send_keys(self.email)

            # Click the "Continue" button
            driver.find_element(By.NAME, 'commit').click()

            # Find the password input field and enter the password
            driver.find_element(By.ID, 'session_password').send_keys(self.password)

            # Click the "Sign In" button
            driver.find_element(By.NAME, 'commit').click()

            # wait for our cookies to arrive
            time.sleep(5)

            # Capture all cookies after logging in
            self.authenticated_cookies = driver.get_cookies()
            self.save_cookies()
            self.check_authentication()
        except Exception:
            logging.warning("Unable to login. Please check your credentials or clear your cookies and try again.")
            logging.debug("Login failure details", exc_info=True)

    def check_authentication(self):
        """Load the profile page and record whether the session is logged in."""
        driver = self.get_driver()
        try:
            driver.get(profile_url)

            # if we arent properly logged in, we will redirect back to the homepage
            if driver.current_url != profile_url:
                self._mark_unauthenticated()
                logging.warning("Not properly authenticated. Please check your credentials or clear your cookies and try again.")
                return

            profile_name = driver.find_element(By.XPATH, "//h1[contains(@class, 'is_Type') and contains(@class, 'font_heading')]")
            self.username = profile_name.text
            self.IsAuthenticated = True
            self.CheckedAuthentication = True
            logging.info(f"Authenticated as: {self.username}")
        except Exception:
            self._mark_unauthenticated()
            logging.warning("Unable to verify authentication. Please check your credentials or clear your cookies and try again.")
            logging.debug("Authentication check failure details", exc_info=True)

    def save_cookies(self):
        """Write ``authenticated_cookies`` to ``cookies_file`` as JSON."""
        cookies_dict = {
            "cookies": self.authenticated_cookies
        }
        # dirname is "" for a bare filename, and os.makedirs("") raises.
        directory = os.path.dirname(self.cookies_file)
        if directory:
            os.makedirs(directory, exist_ok=True)
        with open(self.cookies_file, 'w') as file:
            json.dump(cookies_dict, file, indent=4)

    def load_cookies(self):
        """Load cookies from ``cookies_file`` if it exists and is readable.

        An unreadable or malformed file is ignored with a warning, leaving
        ``authenticated_cookies`` unchanged.
        """
        if self.cookies_file is None or not os.path.exists(self.cookies_file):
            return
        try:
            with open(self.cookies_file, 'r') as file:
                data = json.load(file)
        except (OSError, ValueError) as error:  # JSONDecodeError is a ValueError
            logging.warning(f"Ignoring unreadable cookies file \"{self.cookies_file}\": {error}")
            return
        if not isinstance(data, dict):
            logging.warning(f"Ignoring malformed cookies file \"{self.cookies_file}\".")
            return
        self.authenticated_cookies = data.get('cookies', [])

    def clear_cookies(self):
        """Forget credentials and cookies, in memory, on disk and in the browser."""
        self.email = None
        self.password = None
        self.authenticated_cookies = None
        self._mark_unauthenticated()

        if self.cookies_file and os.path.exists(self.cookies_file):
            os.remove(self.cookies_file)

        if self.driver:
            self.driver.delete_all_cookies()
            self.driver.get(base_url)

        logging.info("Cookies cleared.")

320
beacon_snatch/cli.py Normal file
View File

@@ -0,0 +1,320 @@
import logging
import getpass
import click
import cmd
import os
import re
from . import helpers
from .series import BeaconSeries
from .content import BeaconContent
from .authentication import BeaconAuthentication
# Level names offered as choices for the --log-level option of the click group.
LOG_LEVELS = ["DEBUG", "VERBOSE", "INFO", "WARNING", "ERROR", "CRITICAL"]
class BeaconSnatchCLI:
    """Session object behind the command line: authentication, lookups and downloads.

    Series and content objects are cached in dictionaries for the lifetime of
    this object so repeated commands do not re-scrape the site.
    """

    def __init__(self, cache, output):
        """Resolve the cache/output directories and create the authentication object.

        Args:
            cache: cache directory, or a falsy value to use ``helpers.DEFAULT_CACHE``.
            output: download directory, or a falsy value to use ``helpers.DEFAULT_OUTPUT``.
        """
        self.cache_dir = os.path.expanduser(cache or helpers.DEFAULT_CACHE)
        self.output_dir = os.path.expanduser(output or helpers.DEFAULT_OUTPUT)
        self.auth = BeaconAuthentication(email=None, password=None, cookies_file=os.path.join(self.cache_dir, helpers.COOKIE_NAME))

        # generally just caching. Should prolly do something better eventually
        self.series_list = None
        self.series_info_cache = {}
        self.content_info_cache = {}

    def _get_series(self, series_id):
        """Return the cached series for ``series_id``, creating and caching it if needed."""
        series_info = self.series_info_cache.get(series_id)
        if not series_info:
            series_info = BeaconSeries.create(self.auth, series_id)
            if series_info is not None:
                self.series_info_cache[series_id] = series_info
                for content in series_info.content:
                    self.content_info_cache[content.slug] = content
        return series_info

    def _get_content(self, content_id):
        """Return the cached content for ``content_id``, creating and caching it if needed."""
        content_info = self.content_info_cache.get(content_id)
        if not content_info:
            content_info = BeaconContent.create(self.auth, content_id)
            if content_info is not None:
                self.content_info_cache[content_id] = content_info
        return content_info

    def _download(self, content):
        """Download the first combined video+audio stream of ``content``, if any."""
        streams = content.video_and_audio_streams
        if not streams:
            logging.warning(f"No combined video and audio stream found for \"{content.slug}\"; skipping.")
            return
        content.download(streams[0], self.output_dir)

    def authenticate(self):
        """Prompt for credentials and log in, unless already authenticated."""
        if self.auth.IsAuthenticated:
            print(f"Authenticated as: {self.auth.username}")
            return

        self.auth.email = input("Enter your Beacon Email: ").strip()
        self.auth.password = getpass.getpass("Enter your Beacon Password: ").strip()
        self.auth.authenticate(force=True)

    def list_series(self):
        """Print every series ID, fetching the list on first use."""
        if self.series_list is None:
            self.series_list = BeaconSeries.get_all_series(self.auth)

        for num, series_id in enumerate(self.series_list):
            print(f"{num}) {series_id}")

    def series_info(self, series_id):
        """Print the details of one series."""
        logging.info(f"Fetching information for series {series_id}...")
        series_info = self._get_series(series_id)
        if not series_info:
            print(f"Invalid series \"{series_id}\".")
            return
        print(f"\tid:\n\t\t{series_info.id}")
        print(f"\ttitle:\n\t\t{series_info.title}")
        print(f"\tdescription:\n\t\t{series_info.description}")
        print(f"\tseries_url:\n\t\t{series_info.series_url}")
        print(f"\tcontent count:\n\t\t{len(series_info.content)}")

    def series_list_content(self, series_id):
        """Print the slug and title of every content item in a series."""
        logging.info(f"Fetching information for series {series_id}...")
        series_info = self._get_series(series_id)
        if not series_info:
            print(f"Invalid series \"{series_id}\".")
            return
        print("\tcontent_id\t:\tcontent_title")
        for content in series_info.content:
            print(f"\t{content.slug}\t:\t{content.title}")

    def series_download(self, series_id):
        """Download every content item of a series into the output directory."""
        logging.info(f"Downloading series {series_id}...")
        series_info = self._get_series(series_id)
        if not series_info:
            print(f"Invalid series \"{series_id}\".")
            return
        for content in series_info.content:
            self._download(content)

    def content_info(self, content_id):
        """Print the details of one content item."""
        logging.info(f"Fetching information for content {content_id}...")
        content_info = self._get_content(content_id)
        if not content_info:
            print(f"Invalid content \"{content_id}\".")
            return
        # Built outside the f-string: backslashes inside an f-string expression
        # are a syntax error before Python 3.12.
        description = (content_info.description or "").replace("\n", "\n\t\t")
        print(f"\tid:\n\t\t{content_info.slug}")  # display the slug as the id because whatever
        print(f"\ttitle:\n\t\t{content_info.title}")
        print(f"\tdescription:\n\t\t{description}")
        print(f"\tduration:\n\t\t{content_info.duration}")
        print(f"\tpublishedDate:\n\t\t{content_info.publishedDate}")

    def content_download(self, content_id):
        """Download one content item into the output directory."""
        logging.info(f"Downloading content {content_id}...")
        content_info = self._get_content(content_id)
        if not content_info:
            print(f"Invalid content \"{content_id}\".")
            return
        self._download(content_info)

    def set_output(self, output_dir):
        """Change the download directory used by this session object."""
        logging.info(f"Setting output directory to {output_dir}...")
        self.output_dir = os.path.expanduser(output_dir)

    def clear_cookies(self):
        """Clear stored credentials and cookies (the auth object logs the result)."""
        self.auth.clear_cookies()

    def show_info(self):
        """Print the authenticated user and the configured directories."""
        print(f"\tAuthenticated as: {self.auth.username}")
        print(f"\tCache Directory:\n\t\t{self.cache_dir}")
        print(f"\tOutput Directory:\n\t\t{self.output_dir}")

    def display_help(self):
        """Print the commands understood by ``run``."""
        commands = (
            ("info", "show session configuration"),
            ("authenticate", "log in to Beacon"),
            ("set output <dir>", "change the download directory"),
            ("clear cookies", "forget the stored login"),
            ("list series", "list all series IDs"),
            ("series info <series_id>", "show details of a series"),
            ("series list content <series_id>", "list the content of a series"),
            ("series download <series_id>", "download a whole series"),
            ("content info <content_id>", "show details of one content item"),
            ("content download <content_id>", "download one content item"),
            ("help", "show this text"),
            ("exit", "leave the prompt"),
        )
        print("Available commands:")
        for usage, summary in commands:
            print(f"\t{usage:<34}{summary}")

    def run(self):
        """Run a simple read-evaluate loop on stdin until "exit" or end of input."""
        while True:
            try:
                user_input = input("> ").strip()
            except EOFError:
                break
            if not user_input:
                continue  # Skip empty inputs

            parts = user_input.split()
            # Only the keywords are matched case-insensitively; arguments such
            # as directory paths keep the case the user typed.
            words = [part.lower() for part in parts]
            command = words[0]

            if command == "info":
                self.show_info()
            elif command == "authenticate":
                self.authenticate()
            elif words[:2] == ["set", "output"] and len(parts) > 2:
                self.set_output(" ".join(parts[2:]))
            elif words[:2] == ["clear", "cookies"]:
                self.clear_cookies()
            elif command == "help":
                self.display_help()
            elif command == "exit":
                break
            elif self.auth is None or not self.auth.IsAuthenticated:
                print('Not authenticated. Use "help" to know what to do.')
            elif words[:2] == ["list", "series"]:
                self.list_series()
            elif words[:2] == ["series", "info"] and len(parts) > 2:
                self.series_info(parts[2])
            elif words[:3] == ["series", "list", "content"] and len(parts) > 3:
                self.series_list_content(parts[3])
            elif words[:2] == ["series", "download"] and len(parts) > 2:
                self.series_download(parts[2])
            elif words[:2] == ["content", "info"] and len(parts) > 2:
                self.content_info(parts[2])
            elif words[:2] == ["content", "download"] and len(parts) > 2:
                self.content_download(parts[2])
            else:
                print(f'Unknown command: {command}. Use "help" to know what to do.')
class InteractiveCLI(cmd.Cmd):
    """Interactive prompt that forwards each typed line to the click command group."""

    intro = "Welcome to Beacon Snatch Interactive CLI. Type help to list commands.\n"
    prompt = "(Beacon) > "

    def __init__(self, cli_context):
        """Keep the click context whose ``obj`` is shared with every forwarded command."""
        super().__init__()
        self.cli_context = cli_context

    def emptyline(self):
        """Ignore empty input.

        The ``cmd.Cmd`` default repeats the previous command, which would
        silently re-run e.g. a download when Enter is pressed.
        """
        return False

    def default(self, line):
        """Run ``line`` as a click command line; errors are printed, never raised."""
        try:
            # ensure we dont recursivly handle interactive mode
            # NOTE: this also drops any *argument* that is literally "interactive".
            filtered_parts = [part for part in line.split() if part.lower() != "interactive"]
            if filtered_parts:
                cli.main(args=filtered_parts, prog_name="beacon_snatch", standalone_mode=False, obj=self.cli_context.obj)
        except SystemExit:
            # Prevent cmd from exiting due to click's SystemExit
            pass
        except Exception as e:
            print(f"Error: {str(e)}")

    def do_help(self, arg):
        """Display the same help text as Click"""
        # Get Click's help text and print it
        click.echo(cli.get_help(self.cli_context))

    def do_exit(self, arg):
        """Exit the CLI"""
        print("Goodbye!")
        return True

    def do_EOF(self, arg):
        """Exit the CLI on end of input (e.g. Ctrl-D)"""
        print()  # finish the prompt line before the goodbye message
        return self.do_exit(arg)
# Root click group. On first entry it configures logging and creates the
# BeaconSnatchCLI session object that every subcommand receives as ctx.obj.
# When ctx.obj is already set (the interactive prompt passes it in), it is reused.
@click.group()
@click.option("--log-level", default="INFO", type=click.Choice(LOG_LEVELS), help="Set the logging level.")
@click.option("--cache", help="Path to the cache directory. If empty, a default will be used.")
@click.option("--output", help="Path to save downloads. If empty, a default will be used.")
@click.pass_context
def cli(ctx, log_level, cache, output):
    """CLI Interface to Snatch from Beacon"""
    if ctx.obj is None:
        logging.basicConfig(level=log_level, format='%(levelname)s: %(message)s')
        # basicConfig() does nothing once the root logger already has handlers
        # (importing the package configures it), so apply the level explicitly;
        # otherwise --log-level would be ignored.
        logging.getLogger().setLevel(log_level)
        ctx.obj = BeaconSnatchCLI(cache, output)
# Subcommand wrapper: forwards to the session object's authenticate().
@cli.command()
@click.pass_obj
def authenticate(app):
    """Authenticate with Beacon using your credentials."""
    app.authenticate()
# Subcommand wrapper: forwards to the session object's list_series().
@cli.command()
@click.pass_obj
def list_series(app):
    """List all available series on Beacon."""
    app.list_series()
# Subcommand wrapper: forwards SERIES_ID to the session object's series_info().
@cli.command()
@click.argument("series_id")
@click.pass_obj
def series_info(app, series_id):
    """Get detailed information about a specific series."""
    app.series_info(series_id)
# Subcommand wrapper: forwards SERIES_ID to the session object's series_list_content().
@cli.command()
@click.argument("series_id")
@click.pass_obj
def series_list_content(app, series_id):
    """Get basic information for each content in a series."""
    app.series_list_content(series_id)
# Subcommand wrapper: forwards SERIES_ID to the session object's series_download().
@cli.command()
@click.argument("series_id")
@click.pass_obj
def series_download(app, series_id):
    """Download all content from a specific series."""
    app.series_download(series_id)
# Subcommand wrapper: forwards CONTENT_ID to the session object's content_info().
@cli.command()
@click.argument("content_id")
@click.pass_obj
def content_info(app, content_id):
    """Get detailed information about specific content."""
    app.content_info(content_id)
# Subcommand wrapper: forwards CONTENT_ID to the session object's content_download().
@cli.command()
@click.argument("content_id")
@click.pass_obj
def content_download(app, content_id):
    """Download specific content by content ID."""
    app.content_download(content_id)
# Subcommand wrapper: forwards OUTPUT_DIR to the session object's set_output().
@cli.command()
@click.argument("output_dir")
@click.pass_obj
def set_output(app, output_dir):
    """Set the directory where downloaded files will be saved."""
    app.set_output(output_dir)
# Subcommand wrapper: forwards to the session object's clear_cookies().
@cli.command()
@click.pass_obj
def clear_cookies(app):
    """Clear the stored authentication cookies."""
    app.clear_cookies()
# Subcommand wrapper: forwards to the session object's show_info().
@cli.command()
@click.pass_obj
def info(app):
    """View configuration info for the current session."""
    app.show_info()
# Subcommand that prints a farewell and terminates via SystemExit(0).
# NOTE: the function name shadows the ``exit`` builtin inside this module.
@cli.command()
def exit():
    """Exit the CLI interface."""
    click.echo("Goodbye!")
    raise SystemExit(0)
# Subcommand that hands the current click context to the interactive prompt
# and blocks until its command loop ends.
@cli.command()
@click.pass_context
def interactive(ctx):
    """Start the interactive CLI mode."""
    shell = InteractiveCLI(ctx)
    shell.cmdloop()
def main():
    """Console entry point: register the custom log level name, then run the click group."""
    # Register the level name before click validates/uses --log-level.
    logging.addLevelName(helpers.LOG_VERBOSE, helpers.LOG_VERBOSE_NAME)
    cli()
if __name__ == "__main__":
main()

150
beacon_snatch/content.py Normal file
View File

@@ -0,0 +1,150 @@
from .authentication import BeaconAuthentication
from .stream import BeaconStreamInfo
from . import helpers
import subprocess
import requests
import logging
import json
import m3u8
import os
from selenium import webdriver
from selenium.webdriver.common.by import By
# Base URL of content pages; BeaconContent.create() appends "/<content_id>".
content_url = "https://beacon.tv/content"
class BeaconContent:
    """One piece of video content on beacon.tv and the streams it can be downloaded as."""

    def __init__(self, auth : "BeaconAuthentication"):
        """Create an empty content record bound to ``auth``; ``create`` fills it in."""
        self.auth = auth
        self.id = None
        self.title = None
        self.description = None
        self.duration = None
        self.slug = None
        self.publishedDate = None
        self.primaryCollection = None
        self.closedCaptions = None
        self.m3u8_url = None
        self.m3u8_obj = None
        self.available_streams = []

    @property
    def video_only_streams(self):
        """Streams that have a video codec but no audio codec."""
        return [stream for stream in self.available_streams if not stream.audio_codec and stream.video_codec]

    @property
    def audio_only_streams(self):
        """Streams that have an audio codec but no video codec."""
        return [stream for stream in self.available_streams if stream.audio_codec and not stream.video_codec]

    @property
    def video_and_audio_streams(self):
        """Streams that have both an audio and a video codec."""
        return [stream for stream in self.available_streams if stream.audio_codec and stream.video_codec]

    @staticmethod
    def _find_content_block(apollo_state, content_id):
        """Return the "Content" entry of ``apollo_state`` whose slug is ``content_id``, or None."""
        for value in apollo_state.values():
            if isinstance(value, dict) and value.get("__typename") == "Content" and value.get("slug") == content_id:
                return value
        return None

    @classmethod
    def create(cls, auth : "BeaconAuthentication", content_id : str):
        """Scrape the content page for ``content_id`` and build a populated instance.

        Returns:
            A new instance with its streams fetched, or None when the page
            cannot be read, the content is not found, or it is not video.
        """
        # Imported here so the module's import block stays untouched.
        from selenium.common.exceptions import WebDriverException

        # Initialize the browser
        driver = auth.get_driver()

        # grab the chunk of json that holds the key to where we can get our m3u8 url
        try:
            driver.get(f"{content_url}/{content_id}")
            script_block = driver.find_element(By.ID, '__NEXT_DATA__')
            json_blob = json.loads(script_block.get_attribute('innerHTML'))
        except (WebDriverException, ValueError, TypeError) as error:
            logging.warning(f"Unable to read page data for content_id \"{content_id}\": {error}")
            return None

        # Traverse the JSON to find the "Content_ContentVideo" block
        # this block is under a block with a key of "Content:[content_id]"
        # so we need to find an element under "__APOLLO_STATE__" with "__typename" of "Content" first
        page_props = (json_blob.get("props") or {}).get("pageProps") or {}
        apollo_state = page_props.get("__APOLLO_STATE__") or {}
        content_block = cls._find_content_block(apollo_state, content_id)
        if not content_block:
            logging.warning(f"cant find content for content_id \"{content_id}\"")
            return None

        # Looked up outside the f-string: reusing the same quote type inside an
        # f-string expression is a syntax error before Python 3.12.
        content_type = content_block.get("contentType") or ""
        if "video" not in content_type:
            logging.warning(f"Skipping non-video content \"{content_type}\" for Content \"{content_id}\"")
            return None

        content_video_block = content_block.get("contentVideo") or {}
        video_block = content_video_block.get("video")
        if not video_block:
            logging.error("Content_ContentVideo block or video data not found.")
            return None

        # Extract all the info for this video from the block
        new_content = cls(auth)
        new_content.id = content_block.get("id")
        new_content.title = content_block.get("title")
        new_content.description = content_block.get("description")
        new_content.duration = content_block.get("duration")
        new_content.slug = content_block.get("slug")
        new_content.publishedDate = content_block.get("publishedAt")
        new_content.primaryCollection = (content_block.get("primaryCollection") or {}).get("__ref")
        new_content.m3u8_url = video_block.get("video")
        new_content.closedCaptions = video_block.get("closedCaptions")
        new_content.fetch()
        return new_content

    # Fetches the m3u8 playlist to get all available streams that we can download
    def fetch(self, timeout : float = 30):
        """Download the master playlist and rebuild ``available_streams``.

        Args:
            timeout: seconds to wait for the playlist request.

        Streams are sorted by pixel count, then bandwidth, largest first.
        Request failures are logged and leave the stream list empty.
        """
        # Start from scratch so a second call does not append duplicates.
        self.available_streams = []
        if not self.m3u8_url:
            logging.error("Error fetching the M3U8 file: no playlist URL is set")
            return

        try:
            # Fetch the m3u8 playlist content
            response = requests.get(self.m3u8_url, timeout=timeout)
            response.raise_for_status()
        except requests.exceptions.RequestException as e:
            logging.error(f"Error fetching the M3U8 file: {e}")
            return

        # Passing the source URL lets relative variant URIs resolve to absolute ones.
        self.m3u8_obj = m3u8.loads(response.text, uri=self.m3u8_url)
        logging.log(helpers.LOG_VERBOSE, 'M3U8 file fetched')

        # create BeaconStreamInfo for every found in this m3u8's playlist
        for stream_info in self.m3u8_obj.playlists:
            new_stream = BeaconStreamInfo.from_m3u8_playlist(stream_info)
            if new_stream is not None:
                self.available_streams.append(new_stream)

        self.available_streams.sort(
            key=lambda stream: ((stream.width or 0) * (stream.height or 0), stream.bandwidth or 0),
            reverse=True)

    # Downloads the given stream using ffmpeg and saves it to the destination folder.
    def download(self, stream : "BeaconStreamInfo", destination_folder : str = "."):
        """Copy ``stream`` into ``destination_folder`` with ffmpeg.

        The file is named after the sanitized slug, with extension ``mp4``
        when the stream has a video codec and ``m4a`` otherwise. An existing
        file is overwritten (ffmpeg ``-y``).
        """
        # Ensure the destination folder exists
        if destination_folder:
            os.makedirs(destination_folder, exist_ok=True)

        # Sanitize the slug to create a safe filename
        safe_title = helpers.sanitize(self.slug)
        file_extension = "mp4" if stream.video_codec else "m4a"
        output_path = os.path.join(destination_folder, f"{safe_title}.{file_extension}")

        # Build the ffmpeg command
        command = [
            "ffmpeg",
            "-i", stream.m3u8_uri,  # Input M3U8 URI from the stream
            "-c", "copy",  # Copy codecs without re-encoding
            "-bsf:a", "aac_adtstoasc",  # Bitstream filter for AAC audio
            "-y",
            output_path
        ]

        # Run the ffmpeg command and capture output
        logging.log(helpers.LOG_VERBOSE, f"Starting download for {self.title}...")
        helpers.run_ffmpeg_with_progress(command=command, progress_header=f"Downloading \"{self.title}\"")
        logging.log(helpers.LOG_VERBOSE, f"Download saved at '{output_path}'")

88
beacon_snatch/helpers.py Normal file
View File

@@ -0,0 +1,88 @@
from . import helpers
import subprocess
import logging
import time
import re
import progressbar
# Custom log level (numeric value 15) and the name it is registered under.
LOG_VERBOSE = 15
LOG_VERBOSE_NAME = "VERBOSE"

# Default locations; the "~" is expanded by the code that consumes them.
COOKIE_NAME = "cookies.json"
DEFAULT_CACHE = "~/.beacon-snatch"
DEFAULT_OUTPUT = f"{DEFAULT_CACHE}/downloads"
DEFAULT_COOKIES = f"{DEFAULT_CACHE}/{COOKIE_NAME}"
def sanitize(input_string):
    """Turn ``input_string`` into a lowercase, underscore-separated name.

    Surrounding whitespace is trimmed, every character other than word
    characters, whitespace and ``- . , ! ?`` is dropped, and each remaining
    run of whitespace becomes a single underscore.
    """
    kept = re.sub(r'[^\w\s\-.,!?]', '', input_string.strip())
    return re.sub(r'\s+', '_', kept).lower()
def format_duration(seconds):
    """Render a duration given in seconds as human-readable text.

    Below one minute the result keeps millisecond precision; from one
    minute upwards only whole minutes/seconds (and hours) are shown.
    """
    millis = int((seconds % 1) * 1000)
    whole = int(seconds)
    hours, remainder = divmod(whole, 3600)
    minutes, secs = divmod(remainder, 60)

    if whole < 60:
        return f"{whole}.{millis:03d} seconds"
    if whole < 3600:
        return f"{minutes} minutes {secs} seconds"
    return f"{hours} hours {minutes} minutes {secs} seconds"
def parse_ffmpeg_duration(output):
    """Return the ``Duration: HH:MM:SS.ss`` value found in ``output`` as seconds.

    Returns None when ``output`` contains no such field.
    """
    found = re.search(r'Duration: (\d{2}):(\d{2}):(\d{2}\.\d{2})', output)
    if found is None:
        return None
    hours, minutes, seconds = (float(group) for group in found.groups())
    return hours * 3600 + minutes * 60 + seconds
def parse_ffmpeg_time(output):
    """Return the ``time=HH:MM:SS.ss`` value found in ``output`` as seconds.

    Returns None when ``output`` contains no such field.
    """
    found = re.search(r'time=(\d{2}):(\d{2}):(\d{2}\.\d{2})', output)
    if found is None:
        return None
    hours, minutes, seconds = (float(group) for group in found.groups())
    return hours * 3600 + minutes * 60 + seconds
def run_ffmpeg_with_progress(command, progress_header: str = "Processing"):
    """Run an ffmpeg command while drawing a progress bar from its stderr output.

    Args:
        command: argument list passed to ``subprocess.Popen`` (no shell).
        progress_header: short description logged before the run starts.

    Returns:
        True when the process exited with code 0, False when it could not be
        started or exited with a non-zero code.
    """
    logging.info(progress_header)
    logging.log(LOG_VERBOSE, "Executing ffmpeg")
    logging.debug(" ".join(command))

    try:
        process = subprocess.Popen(command, stderr=subprocess.PIPE, universal_newlines=True)
    except (OSError, ValueError) as e:
        logging.error(f"Failed to start ffmpeg: {e}")
        return False

    duration = None
    progress_bar = None
    last_line = ""
    start_time = time.time()

    # The context manager closes the stderr pipe and waits for the process.
    with process:
        for line in process.stderr:
            if line.strip():
                last_line = line.strip()

            # The total duration is needed before progress can be measured.
            if duration is None:
                duration = parse_ffmpeg_duration(line)
                if duration:
                    progress_bar = progressbar.ProgressBar(max_value=duration)
                    progress_bar.start()

            if progress_bar and duration:
                current_time = parse_ffmpeg_time(line)
                if current_time:
                    # Clamp: the bar rejects values beyond max_value.
                    progress_bar.update(min(current_time, duration))

    if progress_bar:
        progress_bar.finish()

    if process.returncode != 0:
        logging.error(f"ffmpeg exited with code {process.returncode}: {last_line}")
        return False

    logging.log(LOG_VERBOSE, f"Completed in {format_duration(time.time() - start_time)}")
    return True

149
beacon_snatch/series.py Normal file
View File

@@ -0,0 +1,149 @@
from .authentication import BeaconAuthentication
from .content import BeaconContent
from . import helpers
import subprocess
import requests
import logging
import json
import time
import m3u8
import os
import progressbar
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.common.exceptions import NoSuchElementException
from selenium.common.exceptions import StaleElementReferenceException
from selenium.common.exceptions import ElementClickInterceptedException
# Series index page; individual series pages live at "<series_url>/<series_id>".
series_url = "https://beacon.tv/series"
class BeaconSeries:
    """A beacon.tv series together with the video content found on its page."""

    def __init__(self, auth : "BeaconAuthentication"):
        """Create an empty series record bound to ``auth``; ``create`` fills it in."""
        self.auth = auth
        self.id = None
        self.title = None
        self.description = None
        self.series_url = None
        self.content = []

    @staticmethod
    def _click_load_more(driver, max_intercepted_retries : int = 30):
        """Click the page's "Load More" button until it disappears.

        Returns the number of click attempts. Gives up after
        ``max_intercepted_retries`` consecutive intercepted clicks so a
        permanent overlay cannot cause an endless loop.
        """
        click_count = 0
        intercepted = 0
        while True:
            try:
                # find the button
                load_more_span = driver.find_element(By.XPATH, "//span[text()='Load More']")
                load_more_button = load_more_span.find_element(By.XPATH, "./ancestor::button")

                driver.execute_script("arguments[0].scrollIntoView();", load_more_button)

                logging.log(helpers.LOG_VERBOSE, f"\"Load More\" click #{click_count}")
                click_count = click_count + 1
                load_more_button.click()
                intercepted = 0
                time.sleep(1)
            except ElementClickInterceptedException:  # clicking too fast or while its loading will throw this, so we will just try again
                intercepted += 1
                if intercepted > max_intercepted_retries:
                    logging.warning("\"Load More\" button stayed blocked; continuing with what is loaded.")
                    break
                time.sleep(0.5)
            except NoSuchElementException:  # no button left: everything is loaded
                break
            except StaleElementReferenceException:  # if we get the element when the page removes it
                break
        return click_count

    @staticmethod
    def _extract_ids(links, marker, required=None):
        """Collect the text after ``marker`` from each link's href.

        Duplicates and empty IDs are dropped while the page order is kept.
        When ``required`` is given, hrefs that do not contain it are ignored.
        """
        ordered_ids = {}  # dict keys de-duplicate and preserve insertion order
        for link in links:
            href = link.get_attribute('href')
            if not href or marker not in href:
                continue
            if required is not None and required not in href:
                continue
            value = href.split(marker)[-1]
            if value:
                ordered_ids[value] = None
        return list(ordered_ids)

    @staticmethod
    def get_all_series(auth : "BeaconAuthentication"):
        """Return the IDs of all series linked from the series index page, in page order."""
        logging.info("Finding all series IDs")

        driver = auth.get_driver()
        driver.get(series_url)

        # click "load more" until everything is loaded
        click_count = BeaconSeries._click_load_more(driver)

        # get all the links; the bare index link has no "/series/" part and is skipped
        links = driver.find_elements(By.CSS_SELECTOR, 'a[href*="series"]')
        series_ids = BeaconSeries._extract_ids(links, "/series/", required=series_url)
        logging.info(f"found {len(series_ids)} series after {click_count} clicks to load")

        for series_id in series_ids:
            logging.log(helpers.LOG_VERBOSE, f"Found series \"{series_id}\"")

        return series_ids

    @classmethod
    def create(cls, auth : "BeaconAuthentication", series_id : str, auto_fetch : bool = False):
        """Scrape the page of ``series_id`` and return a populated series.

        Args:
            auth: authentication object providing the browser driver.
            series_id: ID as it appears in the series URL.
            auto_fetch: accepted for compatibility; the content is always fetched.

        Returns:
            The new series, or None (with a warning logged) when the page
            could not be scraped.
        """
        # Initialize the browser
        driver = auth.get_driver()
        try:
            url = f"{series_url}/{series_id}"
            driver.get(url)

            title = driver.find_element(By.CSS_SELECTOR, 'h2.is_Type.font_heading').text
            description = driver.find_element(By.CSS_SELECTOR, 'p.is_Type.font_body').text

            new_series = cls(auth)
            new_series.id = series_id
            new_series.title = title
            new_series.description = description
            new_series.series_url = url
            new_series.fetch(auth)
        except Exception:
            logging.warning(f"Unable to create series \"{series_id}\".")
            logging.debug("Series creation failure details", exc_info=True)
            # Do not hand back a half-populated series.
            return None

        return new_series

    # fetches all the content for this series
    def fetch(self, auth : "BeaconAuthentication"):
        """Rebuild ``self.content`` from the content links on the series page."""
        driver = auth.get_driver()
        driver.get(self.series_url)

        # click "load more" until everything is loaded
        click_count = self._click_load_more(driver)

        # get all the links
        logging.info("Finding all Content IDs")
        links = driver.find_elements(By.CSS_SELECTOR, 'a[href*="content"]')
        content_ids = self._extract_ids(links, '/content/')
        logging.info(f"found {len(content_ids)} content after {click_count} clicks to load")

        # Start from scratch so a second call does not append duplicates.
        self.content = []

        # create content info for each found id
        for content_id in progressbar.ProgressBar(redirect_stdout=True, redirect_stderr=True)(content_ids):
            logging.log(helpers.LOG_VERBOSE, f"Reading Content for \"{content_id}\"")
            new_content = BeaconContent.create(auth, content_id)
            if new_content is not None:
                self.content.append(new_content)

72
beacon_snatch/stream.py Normal file
View File

@@ -0,0 +1,72 @@
import logging
import m3u8
class BeaconStreamInfo:
    """Resolution, bandwidth, codecs and URI of one variant stream of a playlist."""

    # Codec identifier prefixes used to classify entries of a CODECS attribute.
    VIDEO_CODEC_PREFIXES = ("avc1", "avc3", "hev1", "hvc1")
    AUDIO_CODEC_PREFIXES = ("mp4a", "ac-3", "ec-3")

    def __init__(self, width : int = None,
                 height : int = None,
                 bandwidth : int = None,
                 video_codec : str = None,
                 audio_codec : str = None,
                 m3u8_uri=None):
        """Store the stream properties; ``source_playlist`` starts as None."""
        self.width = width
        self.height = height
        self.bandwidth = bandwidth
        self.video_codec = video_codec
        self.audio_codec = audio_codec
        self.m3u8_uri = m3u8_uri
        self.source_playlist = None

    # Returns the resolution as a tuple (width, height).
    @property
    def resolution(self):
        """``(width, height)``, or None when either value is unknown."""
        if self.width is not None and self.height is not None:
            return (self.width, self.height)
        return None

    # Creates an instance of BeaconStreamInfo from an m3u8.model.Playlist object.
    @classmethod
    def from_m3u8_playlist(cls, playlist : "m3u8.model.Playlist"):
        """Build a stream description from a playlist entry.

        Returns:
            The new instance, or None (with an error logged) when the
            playlist carries no ``stream_info``.
        """
        stream_info = getattr(playlist, 'stream_info', None)
        if stream_info is None:
            logging.error(f"Unable to get stream_info for playlist '{getattr(playlist, 'uri', None)}'")
            return None

        # Extract resolution if available
        width, height = None, None
        if stream_info.resolution:
            width, height = stream_info.resolution

        # Extract codecs if available; the first codec of each kind wins
        video_codec, audio_codec = None, None
        for raw_codec in (stream_info.codecs or "").split(","):
            # Entries may be separated by ", ", so trim before matching.
            codec = raw_codec.strip()
            if not codec:
                continue
            if codec.startswith(cls.VIDEO_CODEC_PREFIXES):
                if video_codec:
                    logging.warning(f"Attempting to set video codec '{codec}' but video codec '{video_codec}' is already set.")
                else:
                    video_codec = codec
            elif codec.startswith(cls.AUDIO_CODEC_PREFIXES):
                if audio_codec:
                    logging.warning(f"Attempting to set audio codec '{codec}' but audio codec '{audio_codec}' is already set.")
                else:
                    audio_codec = codec
            else:
                logging.error(f"Unable to match codec '{codec}'")

        new_stream = cls(width=width,
                         height=height,
                         bandwidth=stream_info.bandwidth,
                         video_codec=video_codec,
                         audio_codec=audio_codec,
                         m3u8_uri=getattr(playlist, 'absolute_uri', None))
        new_stream.source_playlist = playlist
        return new_stream

5
dependencies Normal file
View File

@@ -0,0 +1,5 @@
selenium==4.23.1
requests==2.32.3
progressbar2==4.4.2
m3u8==6.0.0
click==8.1.7

3
pyproject.toml Normal file
View File

@@ -0,0 +1,3 @@
[build-system]
requires = ["setuptools>=42", "wheel"]
build-backend = "setuptools.build_meta"

30
setup.py Normal file
View File

@@ -0,0 +1,30 @@
from setuptools import setup, find_packages
# Read the requirement specifiers from a pip-style requirements file.
def parse_requirements(filename):
    """Return the requirement lines of ``filename`` as a list of strings.

    Surrounding whitespace is removed; blank lines and ``#`` comment lines
    are skipped so they never reach ``install_requires``.
    """
    with open(filename, "r", encoding="utf-8") as req_file:
        stripped_lines = (line.strip() for line in req_file)
        return [line for line in stripped_lines if line and not line.startswith("#")]
# Read the long description up front so the file handle is closed promptly.
with open("README.md", "r", encoding="utf-8") as readme_file:
    long_description = readme_file.read()

setup(
    name="beacon_snatch",
    version="0.1.0",
    author="RetroZelda",
    author_email="retrozelda@gmail.com",
    description="Snatch from Beacon",
    long_description=long_description,
    long_description_content_type="text/markdown",
    url="https://github.com/retrozelda/beacon_snatch",
    packages=find_packages(),
    classifiers=[
        "Programming Language :: Python :: 3",
        "License :: OSI Approved :: MIT License",
        "Operating System :: OS Independent",
    ],
    # The pinned dependencies (e.g. requests 2.32.3) do not support Python < 3.8.
    python_requires=">=3.8",
    install_requires=parse_requirements("dependencies"),
    entry_points={
        "console_scripts": [
            "beacon-snatch=beacon_snatch.cli:main",
        ],
    },
)