#!/usr/bin/env nix-script
#!>python3
#! python3 | requests beautifulsoup4
#! shell | ffmpeg
#! env | EUSER EPASS

import requests
import subprocess
import argparse
import pathlib
import getpass
import json
import sys
import os

from datetime import datetime
from requests.utils import unquote, urlparse
from bs4 import BeautifulSoup

# combines raw descriptions and default values
formatter = type('CustomFormatter',
                 (argparse.RawDescriptionHelpFormatter,
                  argparse.ArgumentDefaultsHelpFormatter), {})

parser = argparse.ArgumentParser(
    formatter_class=formatter,
    description='''
Download all video lessons from an elearning course.

The videos are taken at the original quality and encoded
using h.265 slow profile, 96kb/s opus for audio, via ffmpeg.

You can run the program multiple times to keep the archive
in sync with elearning: existing files won't be replaced or
downloaded again, even if you have renamed them.

If authentication is required the EUSER,EPASS variables
are tried for logging in, otherwise they will be prompted.
Only Kaltura videos are supported (dual screen and captions
work, though).''',
    epilog='''
Copyright (C) 2020  Michele Guerini Rocco (rnhmjoj)

This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program.  If not, see <https://www.gnu.org/licenses/>.
''')
parser.add_argument('course_id', metavar='course-id', type=str,
                    help='the id of the course to download. it can be found'
                         ' at the end of the course homepage url')
parser.add_argument('--skip', '-s', metavar='N', type=int,
                    default=0, help='skip the first N links')
parser.add_argument('--link-only', '-l', action='store_true',
                    help='only print the links without downloading')
parser.add_argument('--json', '-j', action='store_true',
                    help='print the video metadata in JSON')
parser.add_argument('--directory', '-d', metavar='DIR',
                    default='.', type=pathlib.Path,
                    help='directory where to save the videos. defaults to'
                         ' the current directory if not given')
parser.add_argument('--ffmpeg', '-f', metavar='ARG',
                    type=str, default=[], nargs='+',
                    help='extra arguments to pass to ffmpeg')
parser.add_argument('--base-url', metavar='URL', type=str,
                    default='https://elearning.unimib.it',
                    help='the base elearning website url')
parser.add_argument('--auth-url', metavar='URL', type=str,
                    default='https://idp-idm.unimib.it/idp/'
                            'profile/SAML2/Redirect/SSO',
                    help='the url of Shibboleth identity provider.'
                         ' if you have no idea what it is, leave it')


def printr(*args, **kwargs):
    '''
    Shorthand for print to the stderr.
    Accepts the same arguments as print, except `file`.
    '''
    print(*args, **kwargs, file=sys.stderr)


def inputr(prompt):
    '''
    Like input, but the prompt is written to the stderr so
    that the stdout only carries the program output.
    '''
    printr(prompt, end='')
    return input()


def getenv(var, fallback):
    '''
    Read an environment variable or call a function
    to obtain a default value.

    `fallback` is only invoked when `var` is unset, so it
    can safely prompt the user.
    '''
    val = os.getenv(var)
    if val is None:
        return fallback()
    return val


def open_course(url, args):
    '''
    GET and parse the course page.
    Also tames the legendary black beast of Shibboleth.

    Returns the requests session (holding the login cookies,
    if any) and the parsed course page. Exits the program
    with status 1 if the login is rejected.
    '''
    session = requests.Session()

    res = session.get(url)
    page = BeautifulSoup(res.content, 'html.parser')
    printr('done')

    # do the authentication: being sent to an "enrol"
    # url means the course is not publicly visible
    if 'enrol' in res.url:
        printr('\n# authentication needed.. sigh')
        username = getenv('EUSER', lambda: inputr('username: '))
        password = getenv('EPASS', lambda: getpass.getpass('password: '))

        # elearning login request
        # (the response is not needed, only the session state)
        key = page.find('input', attrs={'name': 'sesskey'})['value']
        session.get(args.base_url + '/auth/unimibsaml/login.php',
                    params=dict(wantsurl=url, sesskey=key))

        # shibboleth auth request
        res = session.post(args.auth_url,
                           params=dict(execution='e1s1'),
                           data=dict(_eventId_proceed=''))

        # submit shibboleth login form
        login_form = dict(j_username=username,
                          j_password=password,
                          _eventId_proceed='')
        res = session.post(res.url, data=login_form)

        if 'e1s3' in res.url:
            printr('# authentication failed :(')
            # sys.exit, not the `exit` helper that is only
            # injected by the site module
            sys.exit(1)

        # finally get the auth token
        page = BeautifulSoup(res.content, 'html.parser')
        form = page.find('form')
        resp = form.find('input', attrs={'name': 'SAMLResponse'})['value']
        res = session.post(unquote(form['action']),
                           data=dict(SAMLResponse=resp))
        page = BeautifulSoup(res.content, 'html.parser')

        printr('# done!\n')

    return session, page


def parse(url, session=requests):
    '''
    GET a url and parse the html response.
    Without a session a plain, unauthenticated request is made.
    '''
    res = session.get(url)
    return BeautifulSoup(res.content, 'html.parser')


def get_info(partner_id, entry_id):
    '''
    Downloads metadata information of the video
    with 'entry_id' from the 'partner_id'.

    Returns a list of entries: the first is the requested
    video, any other is a secondary stream attached to it.
    The 'description' of each entry is stripped of html.
    '''
    url = 'https://cdnapisec.kaltura.com/api_v3/index.php'

    init = dict(
        action='null',
        apiVersion='3.1',
        clientTag='kwidget:v2.80',
        format=1,
        service='multirequest')

    # this returns a session key "ks"
    # which is used in subsequent reqs.
    session = dict(
        expiry=86400,
        service='session',
        action='startWidgetSession',
        widgetId=f'_{partner_id}')

    # video metadata
    info_parent = {
        'action': 'get',
        'entryId': entry_id,
        'service': 'baseentry',
        'ks': '{1:result:ks}'}

    # child contains a secondary stream:
    # it could be screen+webcam
    info_child = {
        'ks': '{1:result:ks}',
        'service': 'baseEntry',
        'action': 'list',
        'filter:objectType': 'KalturaBaseEntryFilter',
        'filter:typeEqual': 1,
        'filter:parentEntryIdEqual': entry_id}

    # join requests: the keys of the n-th
    # sub-request are prefixed with "n:"
    query = init
    requests_ = [session, info_parent, info_child]
    for i, a in enumerate(requests_, start=1):
        for k, v in a.items():
            query[f'{i}:{k}'] = v

    # the first result is the session, which is not needed
    info_parent, info_child = requests.get(url, params=query).json()[1:]

    info = [info_parent]
    if info_child.get('totalCount', 0) > 0:
        info += info_child['objects']

    # strip html from description
    # (a missing description becomes an empty string)
    for i in info:
        html = i.get('description') or ''
        i['description'] = BeautifulSoup(html, 'html.parser').get_text()

    return info


def extract_ids(page, partner_id=None):
    '''
    Given the player iframe page extracts the
    'partner_id' and 'entry_id' of the video.

    The partner id is only fetched if 'partner_id' is
    None, this saves one http request per video.
    '''
    url = page.find(id='contentframe')['src']
    query = urlparse(url).query
    # split on the first "=" only: values may contain
    # more of them and a parameter may have no value
    params = dict(p.partition('=')[::2] for p in query.split('&'))
    source = unquote(params['source'])

    settings = urlparse(source).path.split('/')
    entry_id = settings[settings.index('entryid') + 1]

    if partner_id is None:
        iframe = parse(url)
        partner_id = iframe.find(
            'input', attrs=dict(name='oauth_consumer_key'))['value']

    return partner_id, entry_id


def _make_filename(info):
    '''
    Build a file name (without extension) from the video
    description, falling back to the video id if it is empty.
    '''
    words = [w.lower() for w in info['description'].split() if w != '-']
    # a slash would be taken as a directory separator
    name = '-'.join(words).replace('/', '-')
    return name or info['id']


def save_video(infos, files, args):
    '''
    Download and convert the video
    using ffmpeg and x265.

    'infos' is the list returned by get_info, 'files' the
    dictionary returned by get_filenames. Nothing is done
    if the video ids are already found in 'files'.
    '''
    info = infos[0]

    # video ids, used to check for existing files
    ids = ','.join(i['id'] for i in infos)
    if ids in files:
        printr('# already downloaded "{description}"'.format_map(info))
        printr('# skipping', end='\n\n')
        return

    # parse creation date
    created = datetime.fromtimestamp(info['createdAt']).isoformat()

    # create directory if necessary
    directory = pathlib.Path(args.directory)
    directory.mkdir(parents=True, exist_ok=True)

    # use the description as a filename. the extension is
    # appended by hand: with_suffix would cut a title at
    # its last dot ("lesson-2.5" -> "lesson-2.mkv")
    output = directory / (_make_filename(info) + '.mkv')

    # create ffmpeg input args: the first input is taken
    # whole, from the others only the video stream
    inputs, maps = [], []
    for i, video in enumerate(infos):
        inputs.extend(['-i', video['downloadUrl']])
        maps.extend(['-map', str(i) + (':v' if i > 0 else '')])

    ffmpeg = [
        'ffmpeg', '-hide_banner',
        '-loglevel', 'error',
        '-stats', '-y'
    ] + inputs + maps + args.ffmpeg + [
        # video
        '-c:v', 'libx265', '-preset', 'slow', '-crf', '23',
        '-x265-params', 'log-level=error',
        # audio
        '-c:a', 'libopus', '-b:a', '96k',

        # metadata
        '-metadata', 'title=' + info['description'],
        '-metadata', 'AUTHOR=' + info['userId'],
        '-metadata', 'DATE=' + created,
        '-metadata', 'IDS=' + ids,

        # output
        str(output)
    ]

    minutes = int(info['duration'])/60
    printr('# downloading "{}" - {:.1f}min'.format(
        info['description'], minutes))
    printr('# by {userId}, {views} views'.format_map(info))
    result = subprocess.run(ffmpeg)
    if result.returncode != 0:
        printr('# ffmpeg failed with status', result.returncode)
    printr()


def get_filenames(dir):
    '''
    This is where the magic happens. This extracts the `IDS`
    tag from the downloaded videos and builds a dictionary
    ids -> filename. Checking these ids we can avoid
    downloading existing videos even if they were renamed.

    Files that ffprobe can't read or that have no `IDS`
    tag (e.g. videos from other sources) are ignored.
    '''
    files = {}
    for file in dir.glob('*.mkv'):
        ffprobe = ['ffprobe', str(file), '-show_format', '-of', 'json']
        output = subprocess.run(ffprobe, capture_output=True).stdout
        try:
            ids = json.loads(output)['format']['tags']['IDS']
        except (ValueError, KeyError):
            # unreadable file or not one of ours
            continue
        files[ids] = file
    return files


def main(args):
    '''
    Open the course, find the video links and then print
    and/or download each of them, depending on the options.
    '''
    course = ('{base_url}/course'
              '/view.php?id={course_id}'.format_map(vars(args)))

    printr('* opening course...', end='', flush=True)
    session, page = open_course(course, args)

    links = [li.find('a')['href']
             for li in page.find_all('li', class_='kalvidres')]
    printr('* {} videos found!\n'.format(len(links) or 'no'))

    # --link-only and --json only print, they never download
    download = not (args.link_only or args.json)

    # filenames of already saved videos
    # (probing them is only needed when downloading)
    files = get_filenames(args.directory) if download else {}

    partner = None
    output = []
    for i, link in enumerate(links[args.skip:], start=args.skip):
        page = parse(link, session)

        printr(f'{i+1}. fetching video metadata...', end='', flush=True)
        partner, entry = extract_ids(page, partner)
        info = get_info(partner, entry)
        printr('done')

        if args.link_only:
            print('desc:   {description}\n'
                  'author: {userId}\n'
                  'views:  {views}'.format_map(info[0]))
            if len(info) > 1:
                print('dual video')
                print('camera url:', info[0]['downloadUrl'])
                print('screen url:', info[1]['downloadUrl'])
            else:
                print('url:', info[0]['downloadUrl'])
            printr()

        if args.json:
            output.append(info)

        if download:
            save_video(info, files, args)

    if args.json:
        print(json.dumps(output))


if __name__ == '__main__':
    try:
        main(parser.parse_args())
    except KeyboardInterrupt:
        printr('\nbye!')