misc/python/elearning.py

382 lines
12 KiB
Python
Executable File

#!/usr/bin/env nix-script
#!>python3
#! python3 | requests beautifulsoup4
#! shell | ffmpeg
#! env | EUSER EPASS
import requests
import subprocess
import argparse
import pathlib
import getpass
import json
import sys
import os
from datetime import datetime
from requests.utils import unquote, urlparse
from bs4 import BeautifulSoup
# combines raw descriptions and default values
formatter = type('CustomFormatter',
(argparse.RawDescriptionHelpFormatter,
argparse.ArgumentDefaultsHelpFormatter), {})
parser = argparse.ArgumentParser(
formatter_class=formatter,
description='''
Download all video lessons from an elearning course.
The videos are taken at the original quality and encoded
using h.265 slow profile, 96kb/s opus for audio, via ffmpeg.
You can run the program multiple times to keep the archive
in sync with elearning: existing files won't be replaced or
downloaded again, even if you have renamed them.
If authentication is required the EUSER,EPASS variables
are tried for logging in, otherwise they will be prompted.
Only Kaltura videos are supported (dual screen and captions
work, though).''',
epilog='''
Copyright (C) 2020 Michele Guerini Rocco (rnhmjoj)
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <https://www.gnu.org/licenses/>.
''')
parser.add_argument('course_id', metavar='course-id', type=str,
help='the id of the course to download. it can be found'
' at the end of the course homepage url')
parser.add_argument('--skip', '-s', metavar='N', type=int,
default=0, help='skip the first N links')
parser.add_argument('--link-only', '-l', action='store_true',
help='only print the links without downloading')
parser.add_argument('--json', '-j', action='store_true',
help='print the video metadata in JSON')
parser.add_argument('--directory', '-d', metavar='DIR',
default='.', type=pathlib.Path,
help='directory where to save the videos. defaults to'
' the currenct directory if not given')
parser.add_argument('--ffmpeg', '-f', metavar='ARG',
type=str, default=[], nargs='+',
help='extra arguments to pass to ffmpeg')
parser.add_argument('--base-url', metavar='URL', type=str,
default='https://elearning.unimib.it',
help='the base elearning website url')
parser.add_argument('--auth-url', metavar='URL', type=str,
default='https://idp-idm.unimib.it/idp/'
'profile/SAML2/Redirect/SSO',
help='the url of Shibboleth identity provider.'
' if you have no idea what it is, leave it')
def printr(*args, **kwargs):
'''
Shorthand for print to the stderr.
'''
print(*args, **kwargs, file=sys.stderr)
def inputr(prompt):
printr(prompt, end='')
return input()
def getenv(var, fallback):
'''
Read an environment variable or use
a call a function for a default value.
'''
val = os.getenv(var)
if val is None:
return fallback()
def open_course(url, args):
'''
GET and parse the couse page.
Also tames the legendary black beast of Shibboleth.
'''
session = requests.Session()
res = session.get(url)
page = BeautifulSoup(res.content, 'html.parser')
printr('done')
# do the authentication
if 'enrol' in res.url:
printr('\n# authentication needed.. sigh')
username = getenv('EUSER', lambda: inputr('username: '))
password = getenv('EPASS', lambda: getpass.getpass('password: '))
# elearning login request
key = page.find('input', attrs={'name': 'sesskey'})['value']
res = session.get(args.base_url + '/auth/unimibsaml/login.php',
params=dict(wantsurl=url, sesskey=key))
# shibboleth auth request
page = BeautifulSoup(res.content, 'html.parser')
res = session.post(args.auth_url, params=dict(execution='e1s1'),
data=dict(_eventId_proceed=''))
# submit shibboleth login form
login_form = dict(j_username=username,
j_password=password,
_eventId_proceed='')
res = session.post(res.url, data=login_form)
if 'e1s3' in res.url:
printr('# authentication failed :(')
exit(1)
# finally get the auth token
page = BeautifulSoup(res.content, 'html.parser')
form = page.find('form')
resp = form.find('input', attrs={'name': 'SAMLResponse'})['value']
res = session.post(unquote(form['action']),
data=dict(SAMLResponse=resp))
page = BeautifulSoup(res.content, 'html.parser')
printr('# done!\n')
return session, page
def parse(url, session=requests):
'''
GET a url and parse the html response.
'''
res = session.get(url)
return BeautifulSoup(res.content, 'html.parser')
def get_info(partner_id, entry_id):
'''
Downloads metadata information of the video
with 'entry_id' from the 'partner_id'.
'''
url = 'https://cdnapisec.kaltura.com/api_v3/index.php'
init = dict(
action='null',
apiVersion='3.1',
clientTag='kwidget:v2.80',
format=1,
service='multirequest')
# this returns a session key "ks"
# which is used in subsequest reqs.
session = dict(
expiry=86400,
service='session',
action='startWidgetSession',
widgetId=f'_{partner_id}')
# video metadata
info_parent = {
'action': 'get',
'entryId': entry_id,
'service': 'baseentry',
'ks': '{1:result:ks}'}
# child contains a secondary stream:
# it could be screen+webcam
info_child = {
'ks': '{1:result:ks}',
'service': 'baseEntry',
'action': 'list',
'filter:objectType': 'KalturaBaseEntryFilter',
'filter:typeEqual': 1,
'filter:parentEntryIdEqual': entry_id}
# join requests
query = init
for i, a in enumerate([session, info_parent, info_child], start=1):
for k, v in a.items():
query['%d:%s' % (i, k)] = v
info_parent, info_child = requests.get(url, params=query).json()[1:]
info = [info_parent]
if info_child['totalCount'] > 0:
info += info_child['objects']
# strip html from description
for i in info:
text = BeautifulSoup(i['description'], 'html.parser').get_text()
i['description'] = text
return info
def extract_ids(page, partner_id=None):
'''
Given the player iframe page extracts the
'partner_id' and 'entry_id' of the video.
The partner is is only fetcher if 'partner_id' is
None, this saves one http request per video.
'''
url = page.find(id='contentframe')['src']
query = urlparse(url).query
params = dict(i.split('=') for i in query.split('&'))
source = unquote(params['source'])
settings = urlparse(source).path.split('/')
entry_id = settings[settings.index('entryid') + 1]
if partner_id is None:
iframe = parse(url)
partner_id = iframe.find(
'input', attrs=dict(name='oauth_consumer_key'))['value']
return partner_id, entry_id
def save_video(infos, files, args):
'''
Download and convert the video
using ffmpeg and x265.
'''
urls = (i['downloadUrl'] for i in infos)
info = infos[0]
# fallback to name if no description
if not info['description']:
info['description'] = info['name']
# use the description as a filename
title = []
for word in info['description'].split():
if word != '-':
title.append(word.lower())
filename = '-'.join(title)
# parse creation date
date = datetime.fromtimestamp(info['createdAt'])
info['createdAt'] = date.isoformat()
# create directory if necessary
dir = pathlib.Path(args.directory)
dir.mkdir(parents=True, exist_ok=True)
# create ffmpeg input args
inputs, maps = [], []
for i, url in enumerate(urls):
inputs.extend(['-i', url])
maps.extend(['-map', str(i) + (':v' if i > 0 else '')])
# video ids, used to check for existing files
ids = ','.join(i['id'] for i in infos)
if ids in files:
printr('# already downloaded "{description}"'.format_map(info))
printr('# skipping', end='\n\n')
return
ffmpeg = [
'ffmpeg', '-hide_banner',
'-loglevel', 'error',
'-stats'
] + inputs + maps + args.ffmpeg + [
# video
'-c:v', 'libx265', '-preset', 'slow', '-crf', '23',
'-x265-params', 'log-level=error',
# audio
'-c:a', 'libopus', '-b:a', '96k',
# metadata
'-metadata', 'title=' + info['description'],
'-metadata', 'AUTHOR=' + info['userId'],
'-metadata', 'DATE=' + info['createdAt'],
'-metadata', 'IDS=' + ids,
# output
(dir / filename).with_suffix('.mkv')
]
info['duration'] = int(info['duration'])/60
printr('# downloading "{description}" '
'- {duration:.1f}min'.format_map(info))
printr('# by {userId}, {views} views'.format_map(info))
subprocess.run(ffmpeg)
printr()
def get_filenames(dir):
'''
This is where the magic happens. This extracts the `IDS`
tag from the downloaded videos and builts a dictionary
ids -> filename. Checking these ids we can avoid downloading
existing videos even if they were renamed.
'''
files = {}
for file in dir.glob('*.mkv'):
ffprobe = ['ffprobe', file, '-show_format', '-of', 'json']
output = subprocess.run(ffprobe, capture_output=True).stdout
metadata = json.loads(output)['format']
files[metadata['tags']['IDS']] = file
return files
def main(args):
course = ('{base_url}/course'
'/view.php?id={course_id}'.format_map(vars(args)))
printr('* opening course...', end='', flush=True)
session, page = open_course(course, args)
links = []
for li in page.find_all('li', class_='kalvidres'):
links.append(li.find('a')['href'])
printr('* {} videos found!\n'.format(len(links) or 'no'))
# filenames of already saved videos
files = get_filenames(args.directory)
partner = None
output = []
for i, link in enumerate(links[args.skip:], start=args.skip):
page = parse(link, session)
printr(f'{i+1}. fetching video metadata...', end='', flush=True)
partner, entry = extract_ids(page, partner)
info = get_info(partner, entry)
printr('done')
if args.link_only:
print('desc: {description}\n'
'author: {userId}\n'
'views: {views}'.format_map(info[0]))
if len(info) > 1:
print('dual video')
print('camera url:', info[0]['downloadUrl'])
print('screen url:', info[1]['downloadUrl'])
else:
print('url:', info[0]['downloadUrl'])
printr()
else:
save_video(info, files, args)
if args.json:
output.append(info)
if args.json:
print(json.dumps(output))
if __name__ == '__main__':
try:
main(parser.parse_args())
except KeyboardInterrupt:
printr('\nbye!')