You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

294 lines
11 KiB

##
## cli/dl-kaltura.py
## Downloads videos from Kaltura (external tool #78985) with
## Selenium WebDriver informed by the bCourses API.
##
## Starts a Selenium session, authenticates as a Calnet user,
## asks for a course to download from, then downloads all
## found videos to a target location.
##
## Copyright (c) 2022 Kevin Mo
##
## Libraries
import shutil
import time
import re
import os
import sys
import requests
# from selenium import webdriver
from seleniumwire import webdriver
from selenium.webdriver.firefox.options import Options
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from getpass import getpass
import json
from api import auth
from tqdm import tqdm, trange
from tqdm.contrib.concurrent import thread_map
## STEP 0: Common variables
DATA_LOCATION = "D:/Class/Lectures"
IMPLICIT_WAIT_PERIOD = 10 # how long the driver should wait to find elements before quitting (general)
IMPLICIT_LONG_PERIOD = 30 # how long the driver should wait to find elements before quitting (ajax/spa elements)
LECTURE_FETCH_PERIOD = 10 # how long the driver should stream lectures to fetch all download links/resources
FETCH_ALL_STALL_TIME = 3 # how long the driver should stall before expanding more through the expand button
NUM_PARALLEL_THREADS = 5 # how many parallel threads to download lecture resources
DRIVER_HEADLESS_MODE = True # whether to run the driver in headless mode (browser does not show up)
DRIVER_AUDIO_DISABLE = True # whether to disable browser audio on the driver
## STEP 1: Start session and load cookies
options = Options()
if DRIVER_HEADLESS_MODE:
options.add_argument("--headless")
if DRIVER_AUDIO_DISABLE:
options.set_preference("media.volume_scale", "0.0")
driver = webdriver.Firefox(options=options)
driver.implicitly_wait(IMPLICIT_WAIT_PERIOD)
## STEP 2: Direct to bcourses, then check auth.berkeley.edu
BCOURSES_URL = "https://bcourses.berkeley.edu"
driver.get("https://bcourses.berkeley.edu")
if auth.check_calnet_auth(driver):
# Prompt user for username
uid = input("CalNet ID: ")
pwd = getpass("CalNet password: ")
print("Performing authentication using credentials...")
auth.perform_calnet_auth(driver, uid, pwd)
wait = WebDriverWait(driver, IMPLICIT_WAIT_PERIOD)
if not wait.until(EC.url_contains(BCOURSES_URL)):
raise Exception("Could not navigate to bCourses url")
## STEP 3: Navigate to bCourses API and parse JSON
BCOURSES_API = "https://bcourses.berkeley.edu/api/v1/courses"
page_num = 1
selected_class = None
while True:
driver.get(BCOURSES_API + "?page=" + str(page_num))
json_content = driver.find_element(By.ID, "json").text
all_classes = json.loads(json_content)
if not isinstance(all_classes, list):
print("Received unexpected JSON response from bCourses API.")
print()
print("Select a class to download from:")
for i, bclass in enumerate(all_classes):
if "name" not in bclass:
bclass["name"] = "Unknown class"
print(str(i+1) + ") " + bclass["name"] + " (class ID #" + str(bclass["id"]) + ")")
class_option = input("Select an numbered option ('b' for last page, 'n' for next page): ")
if class_option == 'b':
page_num = max(1, page_num-1)
elif class_option == 'n':
page_num = page_num + 1
else:
try:
class_idx = int(class_option)
selected_class = all_classes[class_idx-1]
break
except:
print("Invalid input or numbered option!")
if selected_class is None:
raise Exception("No class ID found.")
print("\nSelected " + selected_class["name"])
## STEP 3.5: Initialize directory for video transfer
final_dir = DATA_LOCATION + "/" + re.sub(r'[^\w\s\(\)]', '', selected_class["name"])
if os.path.isdir(final_dir):
print("Folder already detected for this class in data directory.")
confirm_del = input("Delete this directory to proceed? (y/N): ")
if confirm_del.lower() != "y":
sys.exit(1)
print("Deleting class directory...")
shutil.rmtree(final_dir)
os.makedirs(final_dir, exist_ok=False)
## STEP 4: Navigate to Kaltura and pull all videos
driver.get(BCOURSES_URL + "/courses/" + str(selected_class["id"]) + "/external_tools/78985")
kaltura_frame = driver.find_element(By.ID, "tool_content")
driver.switch_to.frame(kaltura_frame)
# Fetch more gallery items by clicking on
# the more button
print("Fetching all videos from Kaltura library...")
try:
while True:
expand_more = driver.find_element(By.CSS_SELECTOR, ".endless-scroll-more > .btn")
expand_more.click()
time.sleep(FETCH_ALL_STALL_TIME)
except:
print("Expanded all videos from the listing as possible.")
gallery_elems = driver.find_elements(By.CLASS_NAME, "galleryItem")
num_elems = len(gallery_elems)
class GalleryItem:
def __init__(self, elem, index=-1):
self.index = index
self.title = elem.find_element(By.CLASS_NAME, "thumb_name_content").text
self.author = elem.find_element(By.CLASS_NAME, "userLink").text
self.date_added = elem.find_element(By.CSS_SELECTOR, ".thumbTimeAdded > span > span").text
self.thumbnail = elem.find_element(By.CLASS_NAME, "thumb_img").get_attribute("src")
self.video_url = elem.find_element(By.CLASS_NAME, "item_link").get_attribute("href")
self.download_urls = {}
self.srt_urls = {}
self.download_path = None
self.processed = False
self.downloaded = False
def __str__(self):
return "(#" + self.str_index() + ") " + self.title + " - " + self.author
def get_folder_name(self):
return re.sub(r'[^\w\s\(\)\-]', '', str(self))
def str_index(self):
if self.index < 0:
return "UNK"
return f'{self.index:03}'
gallery_items = [GalleryItem(g, index=num_elems-i) for i, g in enumerate(gallery_elems)]
print("This tool will download", len(gallery_items), "videos from the Kaltura gallery.\n")
driver.switch_to.parent_frame()
# Regex patterns
re_vid = re.compile(r"\/(scf\/hls)\/p\/(\d+)\/sp\/(\d+)\/serveFlavor\/entryId\/(\w+)\/v\/\d+\/ev\/\d+\/flavorId\/(\w+)\/name\/([\w\.]+)\/seg-(\d+)-[\w\-]+.ts")
re_str = re.compile(r"\/api_v3\/index.php\/service\/caption_captionAsset\/action\/serve\/captionAssetId\/(\w+)(:?[\/\w]+)\/ks\/([\w\-]+)\/.srt")
print("Now processing detailed metadata and download links for all videos.")
print("This process will take a while to complete.")
def process_gallery_item(gallery_item):
"""
Gather full information for each video and
find video + srt links from browser requests.
"""
# Read in requests
def read_requests(request):
if request.host == "cfvod.kaltura.com":
vid_match = re_vid.match(request.path)
srt_match = re_str.match(request.path)
if vid_match:
gallery_item.download_urls[vid_match.group(4) + '.mp4'] = request.url.replace("/scf/hls/", "/pd/")
elif srt_match:
gallery_item.srt_urls[srt_match.group(1) + '.srt'] = request.url
# Reset all requests to proxy
del driver.requests
driver.request_interceptor = read_requests
driver.get(gallery_item.video_url)
wait = WebDriverWait(driver, IMPLICIT_LONG_PERIOD)
wait.until(EC.visibility_of_all_elements_located((By.CLASS_NAME, "userLink")))
gallery_item.author = driver.find_element(By.CLASS_NAME, "userLink").text
wait = WebDriverWait(driver, IMPLICIT_LONG_PERIOD)
wait.until(EC.visibility_of_all_elements_located((By.CSS_SELECTOR, "#js-entry-create-at > span")))
gallery_item.date_added = driver.find_element(By.CSS_SELECTOR, "#js-entry-create-at > span").text
wait = WebDriverWait(driver, IMPLICIT_LONG_PERIOD)
wait.until(EC.element_to_be_clickable((By.ID, "kplayer_ifp")))
play_frame = driver.find_element(By.ID, "kplayer_ifp")
driver.switch_to.frame(play_frame)
play_button = driver.find_element(By.CLASS_NAME, "largePlayBtn")
play_button.click()
# Open quality options
settings_button = driver.find_element(By.CSS_SELECTOR, ".sourceSelector > button")
settings_button.click()
"""
pbar = trange(50, position=1)
for _ in pbar:
time.sleep(0.1)
pbar.set_description("Reading network")
"""
time.sleep(LECTURE_FETCH_PERIOD)
gallery_item.processed = True
del driver.request_interceptor
def print_gallery_item(gallery_item):
print(str(gallery_item))
print(gallery_item.date_added)
print(gallery_item.thumbnail)
print(gallery_item.video_url)
print(gallery_item.download_urls)
print(gallery_item.srt_urls)
with tqdm(gallery_items, position=0) as pbar:
for gallery_item in pbar:
pbar.set_description("Processing '" + gallery_item.title + "'")
process_gallery_item(gallery_item)
print()
print("All links have finished pre-processing.")
print()
print("Creating the required folders for each lecture and")
print("saving pre-processed metadata...")
with tqdm(gallery_items, position=0) as pbar:
for gallery_item in pbar:
pbar.set_description("Allocating '" + gallery_item.title + "'")
# Create the subfolder
dl_path = final_dir + "/" + gallery_item.get_folder_name()
os.makedirs(dl_path)
gallery_item.download_path = dl_path
with open(dl_path + "/download.json", 'w') as f:
f.write(json.dumps(gallery_item.__dict__, indent=4))
# Close the driver connection
driver.close()
## STEP 5: Download all the lecture data in parallel
print()
print("Downloading lecture data in parallel (# of streams: " + str(NUM_PARALLEL_THREADS) + ").")
print("This process will take very long depending on your internet speed,")
print("but should take less time to finish towards the end.")
def download_lecture(gallery_item):
dl_path = gallery_item.download_path
# Download all subtitles
dl_list = {**gallery_item.download_urls, **gallery_item.srt_urls}
for fname in dl_list:
if not os.path.exists(dl_path + "/" + fname):
r = requests.get(dl_list[fname], stream=True)
if r.status_code == 200:
total_length = int(r.headers.get("Content-Length"))
with tqdm.wrapattr(r.raw, "read", total=total_length, desc=gallery_item.title + " (" + fname + ")") as raw:
# save the output to a file
with open(dl_path + "/" + fname, 'wb') as output:
shutil.copyfileobj(raw, output)
"""
with open(dl_path + "/" + fname, "wb") as f:
for chunk in r:
f.write(chunk)
"""
else:
print("Encountered unexpected status code", r.status_code, "while downloading", fname)
gallery_item.downloaded = True
return gallery_item
next_items = thread_map(download_lecture, gallery_items, max_workers=NUM_PARALLEL_THREADS, position=0)
print()
print("Status Report:")
print("Total videos downloaded", sum([i.downloaded for i in next_items]), "out of", len(next_items))
print("Tool has finished execution.")