mirror of
https://github.com/RetroZelda/beacon-snatch.git
synced 2026-06-11 19:06:52 +00:00
151 lines
6.1 KiB
Python
151 lines
6.1 KiB
Python
from .authentication import BeaconAuthentication
|
|
from .stream import BeaconStreamInfo
|
|
from . import helpers
|
|
|
|
import subprocess
|
|
import requests
|
|
import logging
|
|
import json
|
|
import m3u8
|
|
import os
|
|
|
|
from selenium import webdriver
|
|
from selenium.webdriver.common.by import By
|
|
|
|
content_url = "https://beacon.tv/content"
|
|
|
|
class BeaconContent:
|
|
def __init__(self, auth : BeaconAuthentication):
|
|
self.auth = auth
|
|
self.id = None
|
|
self.title = None
|
|
self.description = None
|
|
self.duration = None
|
|
self.slug = None
|
|
self.publishedDate = None
|
|
self.primaryCollection = None
|
|
self.closedCaptions = None
|
|
self.m3u8_url = None
|
|
self.m3u8_obj = None
|
|
self.available_streams = []
|
|
|
|
@property
|
|
def video_only_streams(self):
|
|
return [stream for stream in self.available_streams if not stream.audio_codec and stream.video_codec]
|
|
|
|
@property
|
|
def audio_only_streams(self):
|
|
return [stream for stream in self.available_streams if stream.audio_codec and not stream.video_codec]
|
|
|
|
@property
|
|
def video_and_audio_streams(self):
|
|
return [stream for stream in self.available_streams if stream.audio_codec and stream.video_codec]
|
|
|
|
@classmethod
|
|
def create(cls, auth : BeaconAuthentication, content_id : str):
|
|
|
|
# Initialize the browser
|
|
driver = auth.get_driver()
|
|
new_content = None
|
|
|
|
# grab the chunk of json that holds the key to where we can get our m3u8 url
|
|
driver.get(f"{content_url}/{content_id}")
|
|
script_block = driver.find_element(By.ID, '__NEXT_DATA__')
|
|
json_data = script_block.get_attribute('innerHTML')
|
|
json_blob = json.loads(json_data)
|
|
|
|
# Traverse the JSON to find the "Content_ContentVideo" block
|
|
# this block is under a block with a key of "Content:[content_id]"
|
|
# so we need to find an element under "__APOLLO_STATE__" with "__typename" of "Content" first
|
|
apollo_state = json_blob.get("props", {}).get("pageProps", {}).get("__APOLLO_STATE__", {})
|
|
content_block = None
|
|
for key, value in apollo_state.items():
|
|
if isinstance(value, dict) and value.get("__typename") == "Content":
|
|
if value["slug"] != content_id:
|
|
continue
|
|
content_block = value
|
|
break
|
|
|
|
if not content_block:
|
|
logging.warn(f"cant find content for content_id \"{content_id}\"")
|
|
return None
|
|
|
|
if "video" not in content_block["contentType"]:
|
|
logging.warn(f"Skipping non-video content \"{ content_block['contentType'] }\" for Content \"{content_id}\"")
|
|
return None
|
|
|
|
content_video_block = content_block["contentVideo"]
|
|
if content_video_block and "video" in content_video_block:
|
|
|
|
new_content = BeaconContent(auth)
|
|
|
|
# Extract all the info for this video from the block
|
|
new_content.id = content_block["id"]
|
|
new_content.title = content_block["title"]
|
|
new_content.description = content_block["description"]
|
|
new_content.duration = content_block["duration"]
|
|
new_content.slug = content_block["slug"]
|
|
new_content.publishedDate = content_block["publishedAt"]
|
|
# new_content.primaryCollection = content_block["primaryCollection"]["__ref"] # sometimes crashes but we dont need it so it can remain commented out
|
|
new_content.m3u8_url = content_video_block["video"]["video"]
|
|
new_content.closedCaptions = content_video_block["video"]["closedCaptions"]
|
|
|
|
new_content.fetch()
|
|
else:
|
|
logging.error("Content_ContentVideo block or video data not found.")
|
|
|
|
return new_content
|
|
|
|
# Fetches the m3u8 playlist to get all available streams that we can download
|
|
def fetch(self):
|
|
|
|
try:
|
|
# Fetch the m3u8 playlist content
|
|
response = requests.get(self.m3u8_url)
|
|
response.raise_for_status()
|
|
|
|
self.m3u8_obj = m3u8.loads(response.text)
|
|
logging.log(helpers.LOG_VERBOSE, f'M3U8 file fetched')
|
|
|
|
# create BeaconStreamInfo for every found in this m3u8's playlist
|
|
for stream_info in self.m3u8_obj.playlists:
|
|
new_stream = BeaconStreamInfo.from_m3u8_playlist(stream_info)
|
|
if new_stream is not None:
|
|
self.available_streams.append(new_stream)
|
|
|
|
self.available_streams.sort(
|
|
key=lambda stream: ((stream.width or 0) * (stream.height or 0), stream.bandwidth or 0),
|
|
reverse=True)
|
|
|
|
except requests.exceptions.RequestException as e:
|
|
logging.error(f"Error fetching the M3U8 file: {e}")
|
|
|
|
# Downloads the given stream using ffmpeg and saves it to the destination folder.
|
|
def download(self, stream: BeaconStreamInfo, destination_folder: str = "."):
|
|
|
|
# Ensure the destination folder exists
|
|
if not os.path.exists(destination_folder):
|
|
os.makedirs(destination_folder)
|
|
|
|
# Sanitize the title to create a safe filename
|
|
safe_title = helpers.sanitize(self.slug)
|
|
file_extension = "mp4" if stream.video_codec else "m4a"
|
|
output_filename = f"{safe_title}.{file_extension}"
|
|
output_path = os.path.join(destination_folder, output_filename)
|
|
|
|
# Build the ffmpeg command
|
|
command = [
|
|
"ffmpeg",
|
|
"-i", stream.m3u8_uri, # Input M3U8 URI from the stream
|
|
"-c", "copy", # Copy codecs without re-encoding
|
|
"-bsf:a", "aac_adtstoasc", # Bitstream filter for AAC audio
|
|
"-y",
|
|
output_path
|
|
]
|
|
|
|
# Run the ffmpeg command and capture output
|
|
logging.log(helpers.LOG_VERBOSE, f"Starting download for {self.title}...")
|
|
helpers.run_ffmpeg_with_progress(command=command, progress_header=f"Downloading \"{self.title}\"")
|
|
logging.log(helpers.LOG_VERBOSE, f"Download saved at '{output_path}'")
|
|
return output_path
|