mirror of
https://github.com/RetroZelda/beacon-snatch.git
synced 2026-06-11 19:06:52 +00:00
321 lines
11 KiB
Python
321 lines
11 KiB
Python
|
|
import logging
|
|
import getpass
|
|
import click
|
|
import cmd
|
|
import os
|
|
import re
|
|
|
|
from . import helpers
|
|
from .series import BeaconSeries
|
|
from .content import BeaconContent
|
|
from .authentication import BeaconAuthentication
|
|
|
|
# Level names accepted by the --log-level option of the `cli` group below.
# NOTE(review): "VERBOSE" is not a standard `logging` level name; main()
# registers an extra level via logging.addLevelName, presumably this one
# (helpers.LOG_VERBOSE_NAME) - confirm against helpers.
LOG_LEVELS = ["DEBUG", "VERBOSE", "INFO", "WARNING", "ERROR", "CRITICAL"]
|
|
|
|
class BeaconSnatchCLI:
    """Session state and command implementations behind the Beacon Snatch CLI.

    An instance holds the resolved cache/output directories, the
    authentication handle, and in-memory caches of the series and content
    objects fetched so far.
    """

    def __init__(self, cache, output):
        """Resolve directories and create the authentication handle.

        Args:
            cache: Cache directory path; a falsy value selects
                ``helpers.DEFAULT_CACHE``.
            output: Download directory path; a falsy value selects
                ``helpers.DEFAULT_OUTPUT``.
        """
        self.cache_dir = os.path.expanduser(cache or helpers.DEFAULT_CACHE)
        self.output_dir = os.path.expanduser(output or helpers.DEFAULT_OUTPUT)
        self.auth = BeaconAuthentication(email=None, password=None, cookies_file=f"{self.cache_dir}/{helpers.COOKIE_NAME}")

        # generally just caching. Should prolly do something better eventually
        self.series_list = None
        self.series_info_cache = {}
        self.content_info_cache = {}

    # ------------------------------------------------------------------
    # internal helpers
    # ------------------------------------------------------------------

    def _get_series(self, series_id):
        """Return the series for ``series_id`` from the cache, fetching on a miss.

        A freshly fetched series is stored in ``series_info_cache`` and each of
        its content items is stored in ``content_info_cache`` under its slug.
        Returns whatever ``BeaconSeries.create`` returned when the fetch does
        not produce a series (``None`` is treated as "not found").
        """
        series = self.series_info_cache.get(series_id)
        if not series:
            series = BeaconSeries.create(self.auth, series_id, True)
            if series is not None:
                self.series_info_cache[series_id] = series
                for content in series.content:
                    self.content_info_cache[content.slug] = content
        return series

    def _get_content(self, content_id):
        """Return the content for ``content_id`` from the cache, fetching on a miss."""
        content = self.content_info_cache.get(content_id)
        if not content:
            content = BeaconContent.create(self.auth, content_id)
            if content is not None:
                self.content_info_cache[content_id] = content
        return content

    def _download_first_stream(self, content):
        """Download the first entry of ``content.video_and_audio_streams``.

        Returns True when a download was started, False when the content has
        no streams (previously this raised IndexError and aborted the caller).
        """
        streams = content.video_and_audio_streams
        if not streams:
            print(f"No downloadable streams for \"{content.slug}\".")
            return False
        content.download(streams[0], self.output_dir)
        return True

    # ------------------------------------------------------------------
    # commands
    # ------------------------------------------------------------------

    def authenticate(self):
        """Report the current login, or prompt for credentials and log in."""
        if self.auth.IsAuthenticated:
            print(f"Authenticated as: {self.auth.username}")
            return

        self.auth.email = input("Enter your Beacon Email: ").strip()
        self.auth.password = getpass.getpass("Enter your Beacon Password: ").strip()

        self.auth.authenticate(force=True)

    def list_series(self):
        """Print every known series, fetching the list once per session."""
        if self.series_list is None:
            self.series_list = BeaconSeries.get_all_series(self.auth)

        for num, series_id in enumerate(self.series_list):
            print(f"{num}) {series_id}")

    def series_info(self, series_id):
        """Print id, title, description, URL and content count of a series."""
        logging.info(f"Fetching information for series {series_id}...")

        series = self._get_series(series_id)
        if not series:
            print(f"Invalid series \"{series_id}\".")
            return

        print(f"\tid:\n\t\t{series.id}")
        print(f"\ttitle:\n\t\t{series.title}")
        print(f"\tdescription:\n\t\t{series.description}")
        print(f"\tseries_url:\n\t\t{series.series_url}")
        print(f"\tcontent count:\n\t\t{len(series.content)}")

    def series_list_content(self, series_id):
        """Print the slug and title of every content item in a series."""
        logging.info(f"Fetching information for series {series_id}...")

        series = self._get_series(series_id)
        if not series:
            print(f"Invalid series \"{series_id}\".")
            return

        print("\tcontent_id\t:\tcontent_title")
        for content in series.content:
            print(f"\t{content.slug}\t:\t{content.title}")

    def series_download(self, series_id):
        """Download the first stream of every content item in a series.

        Items without any stream are reported and skipped so one bad item
        does not abort the rest of the series.
        """
        logging.info(f"Downloading series {series_id}...")

        series = self._get_series(series_id)
        if not series:
            print(f"Invalid series \"{series_id}\".")
            return

        for content in series.content:
            self._download_first_stream(content)

    def content_info(self, content_id):
        """Print slug, title, description, duration and publish date of a content item."""
        logging.info(f"Fetching information for content {content_id}...")

        content = self._get_content(content_id)
        if not content:
            print(f"Invalid content \"{content_id}\".")
            return

        # Indent continuation lines so a multi-line description stays aligned;
        # a missing description is printed as empty instead of raising.
        adjusted_description = (content.description or "").replace("\n", "\n\t\t")
        print(f"\tid:\n\t\t{content.slug}")  # display the slug as the id because whatever
        print(f"\ttitle:\n\t\t{content.title}")
        print(f"\tdescription:\n\t\t{adjusted_description}")
        print(f"\tduration:\n\t\t{content.duration}")
        print(f"\tpublishedDate:\n\t\t{content.publishedDate}")

    def content_download(self, content_id):
        """Download the first stream of a single content item."""
        logging.info(f"Downloading content {content_id}...")

        content = self._get_content(content_id)
        if not content:
            print(f"Invalid content \"{content_id}\".")
            return

        self._download_first_stream(content)

    def set_output(self, output_dir):
        """Change the directory that downloads are written to."""
        logging.info(f"Setting output directory to {output_dir}...")
        # NOTE(review): helpers.set_output_directory is defined outside this
        # file; its effect is not visible here.
        helpers.set_output_directory(output_dir)
        # The download methods pass self.output_dir explicitly, so the change
        # only takes effect for this session if it is recorded here as well.
        self.output_dir = os.path.expanduser(output_dir)

    def clear_cookies(self):
        """Clear the stored authentication cookies."""
        self.auth.clear_cookies()
        logging.info("Cookies cleared.")

    def show_info(self):
        """Print the current user and the cache/output directories."""
        print(f"\tAuthenticated as: {self.auth.username}")
        print(f"\tCache Directory:\n\t\t{self.cache_dir}")
        print(f"\tOutput Directory:\n\t\t{self.output_dir}")

    def display_help(self):
        """Print the commands understood by :meth:`run`."""
        print("Available commands:")
        for usage in (
            "info",
            "authenticate",
            "set output <directory>",
            "clear cookies",
            "list series",
            "series info <series_id>",
            "series list content <series_id>",
            "series download <series_id>",
            "content info <content_id>",
            "content download <content_id>",
            "help",
            "exit",
        ):
            print(f"\t{usage}")

    # ------------------------------------------------------------------
    # simple prompt loop
    # ------------------------------------------------------------------

    def _run_authenticated(self, tokens, words):
        """Dispatch a command that needs a logged-in session.

        Args:
            tokens: The input split on whitespace, original case preserved
                (ids are taken from here).
            words: ``tokens`` lower-cased (keywords are matched against this).

        Returns:
            True when the command was recognised, False otherwise.
        """
        if words[:2] == ["list", "series"]:
            self.list_series()
            return True

        # (keyword prefix, handler); the token right after the prefix is the id.
        routes = (
            (["series", "info"], self.series_info),
            (["series", "list", "content"], self.series_list_content),
            (["series", "download"], self.series_download),
            (["content", "info"], self.content_info),
            (["content", "download"], self.content_download),
        )
        for prefix, handler in routes:
            size = len(prefix)
            if words[:size] != prefix:
                continue
            if len(tokens) > size:
                handler(tokens[size])
            else:
                usage = " ".join(prefix)
                print(f"Missing id. Usage: {usage} <id>")
            return True
        return False

    def run(self):
        """Read commands from standard input until ``exit`` or end of input.

        Only the command keywords are matched case-insensitively; ids and
        paths are passed on exactly as typed.
        """
        while True:
            try:
                raw = input("> ").strip()
            except EOFError:
                break  # treat end of input like "exit"
            if not raw:
                continue  # Skip empty inputs

            tokens = raw.split()
            words = [token.lower() for token in tokens]
            command = words[0]
            sub_command = words[1] if len(words) > 1 else None

            if command == "info":
                self.show_info()
            elif command == "authenticate":
                self.authenticate()
            elif command == "set" and sub_command == "output":
                # Keep everything after "set output" verbatim so paths with
                # spaces survive.
                pieces = raw.split(None, 2)
                if len(pieces) > 2:
                    self.set_output(pieces[2])
                else:
                    print("Usage: set output <directory>")
            elif command == "clear" and sub_command == "cookies":
                self.clear_cookies()
            elif command == "help":
                self.display_help()
            elif command == "exit":
                break
            elif self.auth is None or not self.auth.IsAuthenticated:
                print('Not authenticated. Use "help" to know what to do.')
            elif not self._run_authenticated(tokens, words):
                print(f'Unknown command: {command}. Use "help" to know what to do.')
|
|
|
|
class InteractiveCLI(cmd.Cmd):
    """Interactive shell that forwards each typed line to the Click ``cli`` group.

    ``help`` and ``exit`` are handled by the shell itself; every other line is
    passed to Click with the session object of the context the shell was
    created with.
    """

    intro = "Welcome to Beacon Snatch Interactive CLI. Type help to list commands.\n"
    prompt = "(Beacon) > "

    def __init__(self, cli_context):
        """Store the Click context whose ``obj`` is shared with forwarded commands."""
        super().__init__()
        self.cli_context = cli_context

    def emptyline(self):
        """Do nothing on a blank line.

        ``cmd.Cmd`` repeats the previous command by default, which would
        silently re-run e.g. a download when the user just presses Enter.
        """
        return False

    def default(self, line):
        """Run ``line`` as a Click command line; errors are printed, not raised."""
        try:
            # ensure we dont recursivly handle interactive mode
            parts = line.split()
            filtered_parts = [part for part in parts if part.lower() != "interactive"]
            if filtered_parts:
                cli.main(args=filtered_parts, prog_name="beacon_snatch", standalone_mode=False, obj=self.cli_context.obj)
        except SystemExit:
            # Prevent cmd from exiting due to click's SystemExit
            pass
        except Exception as e:
            print(f"Error: {str(e)}")

    def do_help(self, arg):
        """Display the same help text as Click"""
        # Get Click's help text and print it
        click.echo(cli.get_help(self.cli_context))

    def do_exit(self, arg):
        """Exit the CLI"""
        print("Goodbye!")
        return True

    def do_EOF(self, arg):
        """Exit the CLI on end of input (Ctrl-D).

        Without this handler ``cmd.Cmd`` routes the synthetic ``EOF`` line to
        :meth:`default`, which reports an error and keeps looping.
        """
        print()
        return self.do_exit(arg)
|
|
|
|
@click.group()
@click.option(
    "--log-level",
    default="INFO",
    type=click.Choice(LOG_LEVELS),
    help="Set the logging level.",
)
@click.option(
    "--cache",
    help="Path to the cache directory. If empty, a default will be used.",
)
@click.option(
    "--output",
    help="Path to save downloads. If empty, a default will be used.",
)
@click.pass_context
def cli(ctx, log_level, cache, output):
    """CLI Interface to Snatch from Beacon"""
    # A context that already carries a session object (as when the interactive
    # shell re-enters this group with obj=...) is left untouched.
    if ctx.obj is not None:
        return

    # First entry: configure logging and create the session shared by every
    # sub-command through ctx.obj.
    logging.basicConfig(level=log_level, format='%(levelname)s: %(message)s')
    ctx.obj = BeaconSnatchCLI(cache, output)
|
|
|
|
@cli.command()
@click.pass_obj
def authenticate(app):
    """Authenticate with Beacon using your credentials."""
    # `app` is the session object supplied positionally by click.pass_obj; the
    # name avoids shadowing the module-level `cli` group.
    app.authenticate()
|
|
|
|
@cli.command()
@click.pass_obj
def list_series(app):
    """List all available series on Beacon."""
    # `app` is the session object supplied positionally by click.pass_obj; the
    # name avoids shadowing the module-level `cli` group.
    app.list_series()
|
|
|
|
@cli.command()
@click.argument("series_id")
@click.pass_obj
def series_info(app, series_id):
    """Get detailed information about a specific series."""
    # `app` is the session object supplied positionally by click.pass_obj; the
    # name avoids shadowing the module-level `cli` group.
    app.series_info(series_id)
|
|
|
|
@cli.command()
@click.argument("series_id")
@click.pass_obj
def series_list_content(app, series_id):
    """Get basic information for each content in a series."""
    # `app` is the session object supplied positionally by click.pass_obj; the
    # name avoids shadowing the module-level `cli` group.
    app.series_list_content(series_id)
|
|
|
|
@cli.command()
@click.argument("series_id")
@click.pass_obj
def series_download(app, series_id):
    """Download all content from a specific series."""
    # `app` is the session object supplied positionally by click.pass_obj; the
    # name avoids shadowing the module-level `cli` group.
    app.series_download(series_id)
|
|
|
|
@cli.command()
@click.argument("content_id")
@click.pass_obj
def content_info(app, content_id):
    """Get detailed information about specific content."""
    # `app` is the session object supplied positionally by click.pass_obj; the
    # name avoids shadowing the module-level `cli` group.
    app.content_info(content_id)
|
|
|
|
@cli.command()
@click.argument("content_id")
@click.pass_obj
def content_download(app, content_id):
    """Download specific content by content ID."""
    # `app` is the session object supplied positionally by click.pass_obj; the
    # name avoids shadowing the module-level `cli` group.
    app.content_download(content_id)
|
|
|
|
@cli.command()
@click.argument("output_dir")
@click.pass_obj
def set_output(app, output_dir):
    """Set the directory where downloaded files will be saved."""
    # `app` is the session object supplied positionally by click.pass_obj; the
    # name avoids shadowing the module-level `cli` group.
    app.set_output(output_dir)
|
|
|
|
@cli.command()
@click.pass_obj
def clear_cookies(app):
    """Clear the stored authentication cookies."""
    # `app` is the session object supplied positionally by click.pass_obj; the
    # name avoids shadowing the module-level `cli` group.
    app.clear_cookies()
|
|
|
|
@cli.command()
@click.pass_obj
def info(app):
    """View configuration info for the current session."""
    # `app` is the session object supplied positionally by click.pass_obj; the
    # name avoids shadowing the module-level `cli` group.
    app.show_info()
|
|
|
|
@cli.command()
def exit():
    """Exit the CLI interface."""
    # NOTE: this function shadows the builtin `exit` within this module.
    click.echo("Goodbye!")
    # Status 0 signals a normal exit. When a line is forwarded from
    # InteractiveCLI.default, that method catches SystemExit, so this cannot
    # terminate the interactive shell from there.
    raise SystemExit(0)
|
|
|
|
@cli.command()
@click.pass_context
def interactive(ctx):
    """Start the interactive CLI mode."""
    # The shell keeps the Click context so forwarded commands reuse ctx.obj.
    shell = InteractiveCLI(ctx)
    shell.cmdloop()
|
|
|
|
def main():
    """Register the extra log level defined in ``helpers`` and run the Click group."""
    # The level must be registered before `cli` runs, because `cli` passes the
    # chosen level name to logging.basicConfig.
    level_number = helpers.LOG_VERBOSE
    level_name = helpers.LOG_VERBOSE_NAME
    logging.addLevelName(level_number, level_name)

    cli()
|
|
|
|
# Start the CLI only when this file is executed as the main module, not when
# it is imported.
if __name__ == "__main__":
    main()