Files
beacon-snatch/beacon_snatch/content.py
2025-02-09 23:15:21 -05:00

151 lines
6.1 KiB
Python

from .authentication import BeaconAuthentication
from .stream import BeaconStreamInfo
from . import helpers
import subprocess
import requests
import logging
import json
import m3u8
import os
from selenium import webdriver
from selenium.webdriver.common.by import By
# Base URL of Beacon content pages; BeaconContent.create() appends "/<content_id>" to it.
content_url = "https://beacon.tv/content"
class BeaconContent:
    """A single piece of Beacon video content and the streams available for it.

    Instances are normally built with :meth:`create`, which scrapes the
    content page, fills in the metadata attributes and then calls
    :meth:`fetch` to populate ``available_streams``.
    """

    def __init__(self, auth: BeaconAuthentication):
        """Create an empty content record bound to *auth*.

        Every metadata attribute starts as ``None`` and ``available_streams``
        starts as an empty list.
        """
        self.auth = auth
        self.id = None
        self.title = None
        self.description = None
        self.duration = None
        self.slug = None
        self.publishedDate = None
        self.primaryCollection = None
        self.closedCaptions = None
        self.m3u8_url = None
        self.m3u8_obj = None
        self.available_streams = []

    @property
    def video_only_streams(self) -> list:
        """Streams that have a video codec but no audio codec."""
        return [
            stream
            for stream in self.available_streams
            if not stream.audio_codec and stream.video_codec
        ]

    @property
    def audio_only_streams(self) -> list:
        """Streams that have an audio codec but no video codec."""
        return [
            stream
            for stream in self.available_streams
            if stream.audio_codec and not stream.video_codec
        ]

    @property
    def video_and_audio_streams(self) -> list:
        """Streams that have both an audio codec and a video codec."""
        return [
            stream
            for stream in self.available_streams
            if stream.audio_codec and stream.video_codec
        ]

    @staticmethod
    def _find_content_block(apollo_state, content_id):
        """Return the "Content" entry of *apollo_state* whose slug is *content_id*.

        Non-dict values, entries of another ``__typename`` and "Content"
        entries without a ``slug`` key are skipped. Returns ``None`` when
        nothing matches.
        """
        for value in apollo_state.values():
            if (
                isinstance(value, dict)
                and value.get("__typename") == "Content"
                and value.get("slug") == content_id
            ):
                return value
        return None

    @classmethod
    def create(cls, auth: BeaconAuthentication, content_id: str):
        """Scrape the page for *content_id* and build a populated instance.

        Args:
            auth: Authentication object; its ``get_driver()`` supplies the
                selenium driver used to load the page.
            content_id: Slug of the content, appended to ``content_url``.

        Returns:
            A new instance with metadata filled in and :meth:`fetch` already
            called, or ``None`` when the content cannot be found, is not
            video content, or carries no video data.
        """
        driver = auth.get_driver()

        # The page embeds its data as JSON in the "__NEXT_DATA__" script tag;
        # that JSON holds the key to where we can get our m3u8 url.
        driver.get(f"{content_url}/{content_id}")
        script_block = driver.find_element(By.ID, "__NEXT_DATA__")
        json_blob = json.loads(script_block.get_attribute("innerHTML"))

        # The entry we want lives under "__APOLLO_STATE__", has a
        # "__typename" of "Content" and a slug equal to content_id.
        apollo_state = (
            json_blob.get("props", {})
            .get("pageProps", {})
            .get("__APOLLO_STATE__", {})
        )
        content_block = cls._find_content_block(apollo_state, content_id)
        if not content_block:
            logging.warning(f"cant find content for content_id \"{content_id}\"")
            return None

        # A missing or null contentType is treated like non-video content.
        content_type = content_block.get("contentType") or ""
        if "video" not in content_type:
            logging.warning(
                f"Skipping non-video content \"{content_type}\" for Content \"{content_id}\""
            )
            return None

        content_video_block = content_block.get("contentVideo") or {}
        video_block = content_video_block.get("video") or {}
        m3u8_url = video_block.get("video")
        if not m3u8_url:
            logging.error("Content_ContentVideo block or video data not found.")
            return None

        new_content = cls(auth)
        # Descriptive metadata is read leniently so one missing field does
        # not abort an otherwise downloadable video.
        new_content.id = content_block.get("id")
        new_content.title = content_block.get("title")
        new_content.description = content_block.get("description")
        new_content.duration = content_block.get("duration")
        new_content.slug = content_block.get("slug")
        new_content.publishedDate = content_block.get("publishedAt")
        # NOTE: primaryCollection is intentionally left as None. Reading
        # content_block["primaryCollection"]["__ref"] sometimes crashes and
        # the value is not needed.
        new_content.m3u8_url = m3u8_url
        new_content.closedCaptions = video_block.get("closedCaptions")
        new_content.fetch()
        return new_content

    def fetch(self, timeout: float = 30.0) -> None:
        """Fetch the M3U8 playlist and rebuild ``available_streams``.

        The stream list is rebuilt from scratch on every successful call, so
        calling this method again does not duplicate entries. Streams are
        sorted best-first by resolution (width * height), then bandwidth.

        Args:
            timeout: Seconds passed to ``requests.get`` as its ``timeout``,
                so a stalled server cannot block forever.

        Request failures are logged, not raised; in that case
        ``available_streams`` keeps its previous contents.
        """
        try:
            response = requests.get(self.m3u8_url, timeout=timeout)
            response.raise_for_status()
            self.m3u8_obj = m3u8.loads(response.text)
            logging.log(helpers.LOG_VERBOSE, "M3U8 file fetched")

            # Create a BeaconStreamInfo for every entry in this m3u8's playlist.
            streams = []
            for stream_info in self.m3u8_obj.playlists:
                new_stream = BeaconStreamInfo.from_m3u8_playlist(stream_info)
                if new_stream is not None:
                    streams.append(new_stream)

            streams.sort(
                key=lambda stream: (
                    (stream.width or 0) * (stream.height or 0),
                    stream.bandwidth or 0,
                ),
                reverse=True,
            )
            # Swap in the finished list only once everything succeeded.
            self.available_streams = streams
        except requests.exceptions.RequestException as e:
            logging.error(f"Error fetching the M3U8 file: {e}")

    def download(self, stream: BeaconStreamInfo, destination_folder: str = ".") -> str:
        """Download *stream* with ffmpeg into *destination_folder*.

        Args:
            stream: Stream to download; its ``m3u8_uri`` is the ffmpeg input
                and its ``video_codec`` decides the file extension.
            destination_folder: Output directory, created if missing.

        Returns:
            Path of the output file: the sanitized slug plus ``.mp4`` for
            streams with a video codec, ``.m4a`` otherwise.
        """
        # exist_ok avoids the check-then-create race of exists() + makedirs().
        os.makedirs(destination_folder, exist_ok=True)

        # The file name comes from the slug (not the title), sanitized.
        safe_name = helpers.sanitize(self.slug)
        file_extension = "mp4" if stream.video_codec else "m4a"
        output_path = os.path.join(destination_folder, f"{safe_name}.{file_extension}")

        command = [
            "ffmpeg",
            "-i", stream.m3u8_uri,  # Input M3U8 URI from the stream
            "-c", "copy",  # Copy codecs without re-encoding
            "-bsf:a", "aac_adtstoasc",  # Bitstream filter for AAC audio
            "-y",  # Overwrite an existing output file without asking
            output_path,
        ]

        logging.log(helpers.LOG_VERBOSE, f"Starting download for {self.title}...")
        helpers.run_ffmpeg_with_progress(
            command=command, progress_header=f"Downloading \"{self.title}\""
        )
        logging.log(helpers.LOG_VERBOSE, f"Download saved at '{output_path}'")
        return output_path