#!/usr/bin/env nix-script
#!>python3
#! python3 | requests beautifulsoup4
#! shell | ffmpeg
#! env | EUSER EPASS
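# the header above appears to be read by nix-script: it declares
# the interpreter and python packages (requests, beautifulsoup4),
# the programs needed at runtime (ffmpeg) and the environment
# variables to forward (EUSER, EPASS)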
import requests
import subprocess
import argparse
import tempfile
import pathlib
import getpass
import json
import sys
import os
from datetime import datetime
from requests.utils import unquote, urlparse
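# (unquote and urlparse are re-exports of the urllib.parse functions)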
from bs4 import BeautifulSoup
# combines raw descriptions and default values
formatter = type('CustomFormatter',
(argparse.RawDescriptionHelpFormatter,
argparse.ArgumentDefaultsHelpFormatter), {})
parser = argparse.ArgumentParser(
formatter_class=formatter,
description='''
Download all video lessons from an elearning course.
The videos are downloaded at the original quality and
re-encoded with ffmpeg, using x265 (slow preset) for
video and 96 kb/s Opus for audio.

You can run the program multiple times to keep the archive
in sync with elearning: existing files won't be replaced or
downloaded again, even if you have renamed them.
If authentication is required, the EUSER and EPASS
environment variables are used to log in; if unset,
the credentials will be prompted for.
Only Kaltura videos are supported (dual screen and captions
work, though).''',
epilog='''
Copyright (C) 2020 Michele Guerini Rocco (rnhmjoj)
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <https://www.gnu.org/licenses/>.
''')
parser.add_argument('course_id', metavar='course-id', type=str,
help='the id of the course to download. it can be found'
' at the end of the course homepage url')
parser.add_argument('--skip', '-s', metavar='N', type=int,
default=0, help='skip the first N links')
parser.add_argument('--link-only', '-l', action='store_true',
help='only print the links without downloading')
parser.add_argument('--json', '-j', action='store_true',
help='print the video metadata in JSON')
parser.add_argument('--directory', '-d', metavar='DIR',
default='.', type=pathlib.Path,
help='directory where to save the videos. defaults to'
                         ' the current directory if not given')
parser.add_argument('--ffmpeg', '-f', metavar='ARG',
type=str, default=[], nargs='+',
help='extra arguments to pass to ffmpeg')
parser.add_argument('--base-url', metavar='URL', type=str,
default='https://elearning.unimib.it',
help='the base elearning website url')
parser.add_argument('--auth-url', metavar='URL', type=str,
default='https://idp-idm.unimib.it/idp/'
'profile/SAML2/Redirect/SSO',
                    help='the url of the Shibboleth identity provider.'
' if you have no idea what it is, leave it')
def printr(*args, **kwargs):
'''
    Shorthand for printing to stderr.
'''
print(*args, **kwargs, file=sys.stderr)
def inputr(prompt):
    '''
    Prompt on stderr and read a line from stdin.
    '''
    printr(prompt, end='')
    return input()
def getenv(var, fallback):
'''
    Read an environment variable or call
    a function to get a default value.
'''
val = os.environ.get(var)
return fallback() if val is None else val
def open_course(url, args):
'''
    GET and parse the course page.
Also tames the legendary black beast of Shibboleth.
'''
session = requests.Session()
res = session.get(url)
page = BeautifulSoup(res.content, 'html.parser')
printr('done')
# do the authentication
if 'enrol' in res.url:
printr('\n# authentication needed.. sigh')
username = getenv('EUSER', lambda: inputr('username: '))
password = getenv('EPASS', lambda: getpass.getpass('password: '))
# elearning login request
key = page.find('input', attrs={'name': 'sesskey'})['value']
res = session.get(args.base_url + '/auth/unimibsaml/login.php',
params=dict(wantsurl=url, sesskey=key))
# shibboleth auth request
page = BeautifulSoup(res.content, 'html.parser')
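        # 'execution=e1s1' selects the first step of the IdP
        # login flow; being redirected to 'e1s3' after the form
        # submission means the credentials were rejected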
res = session.post(args.auth_url, params=dict(execution='e1s1'),
data=dict(_eventId_proceed=''))
# submit shibboleth login form
login_form = dict(j_username=username,
j_password=password,
_eventId_proceed='')
res = session.post(res.url, data=login_form)
if 'e1s3' in res.url:
printr('# authentication failed :(')
exit(1)
# finally get the auth token
page = BeautifulSoup(res.content, 'html.parser')
form = page.find('form')
resp = form.find('input', attrs={'name': 'SAMLResponse'})['value']
res = session.post(unquote(form['action']),
data=dict(SAMLResponse=resp))
page = BeautifulSoup(res.content, 'html.parser')
printr('# done!\n')
return session, page
def parse(url, session=requests):
'''
GET a url and parse the html response.
'''
res = session.get(url)
return BeautifulSoup(res.content, 'html.parser')
def get_info(partner_id, entry_id):
'''
    Download the metadata of the video 'entry_id'
    belonging to the partner 'partner_id'.
'''
url = 'https://cdnapisec.kaltura.com/api_v3/index.php'
init = dict(
action='null',
apiVersion='3.1',
clientTag='kwidget:v2.80',
format=1,
service='multirequest')
    # this request returns a session key "ks", which
    # later requests reference via the {1:result:ks}
    # placeholder. the widget id is the partner id
    # prefixed with an underscore.
session = dict(
expiry=86400,
service='session',
action='startWidgetSession',
widgetId=f'_{partner_id}')
# video metadata
info_parent = {
'action': 'get',
'entryId': entry_id,
'service': 'baseentry',
'ks': '{1:result:ks}'}
    # a child entry contains a secondary stream:
    # eg. the screen capture next to the webcam
info_child = {
'ks': '{1:result:ks}',
'service': 'baseEntry',
'action': 'list',
'filter:objectType': 'KalturaBaseEntryFilter',
'filter:typeEqual': 1,
'filter:parentEntryIdEqual': entry_id}
    # join everything into a single "multirequest":
    # each request's keys get prefixed by its index
query = init
for i, a in enumerate([session, info_parent, info_child], start=1):
for k, v in a.items():
query['%d:%s' % (i, k)] = v
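    # the query now contains keys such as
    # 1:service=session, 2:action=get, 3:filter:typeEqual=1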
info_parent, info_child = requests.get(url, params=query).json()[1:]
info = [info_parent]
if info_child['totalCount'] > 0:
info += info_child['objects']
# strip html from description
for i in info:
text = BeautifulSoup(i['description'], 'html.parser').get_text()
i['description'] = text
return info
def extract_ids(page, partner_id=None):
'''
    Given the player iframe page, extract the
    'partner_id' and 'entry_id' of the video.
    The partner id is only fetched if 'partner_id' is
    None: this saves one http request per video.
'''
url = page.find(id='contentframe')['src']
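    # the kaltura source url is passed, percent-encoded,
    # in the 'source' query parameter of the iframe url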
query = urlparse(url).query
params = dict(i.split('=') for i in query.split('&'))
source = unquote(params['source'])
settings = urlparse(source).path.split('/')
entry_id = settings[settings.index('entryid') + 1]
if partner_id is None:
iframe = parse(url)
partner_id = iframe.find(
'input', attrs=dict(name='oauth_consumer_key'))['value']
return partner_id, entry_id
def save_video(infos, files, args):
'''
Download and convert the video
using ffmpeg and x265.
'''
urls = (i['downloadUrl'] for i in infos)
info = infos[0]
    # fall back to the name if there's no description
if not info['description']:
info['description'] = info['name']
# use the description as a filename
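    # eg. "Lesson 3 - Integrals" becomes "lesson-3-integrals"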
title = []
for word in info['description'].split():
if word != '-':
title.append(word.lower())
filename = '-'.join(title)
# parse creation date
date = datetime.fromtimestamp(info['createdAt'])
info['createdAt'] = date.isoformat()
# create directory if necessary
dir = pathlib.Path(args.directory)
dir.mkdir(parents=True, exist_ok=True)
    # create the ffmpeg input args: map every stream of
    # the first input, but only video from the secondary
    # (screen) inputs
inputs, maps = [], []
for i, url in enumerate(urls):
inputs.extend(['-i', url])
maps.extend(['-map', str(i) + (':v' if i > 0 else '')])
# video ids, used to check for existing files
ids = ','.join(i['id'] for i in infos)
if ids in files:
printr('# already downloaded "{description}"'.format_map(info))
printr('# skipping', end='\n\n')
return
    # where to temporarily store the original stream
tmp = pathlib.Path(tempfile.gettempdir())
original = (tmp / filename).with_suffix('.mkv')
base = [
'ffmpeg', '-hide_banner',
'-loglevel', 'error',
'-stats'
]
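    # download the original streams to a temporary matroska
    # file first, then re-encode it into the final output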
download = base + inputs + maps + ['-y', original]
convert = base + args.ffmpeg + [
# source
'-i', original,
'-map', '0',
# video
'-c:v', 'libx265', '-preset', 'slow', '-crf', '23',
# audio
'-c:a', 'libopus', '-b:a', '96k',
# metadata
'-metadata', 'title=' + info['description'],
'-metadata', 'AUTHOR=' + info['userId'],
'-metadata', 'DATE=' + info['createdAt'],
'-metadata', 'IDS=' + ids,
# output
(dir / filename).with_suffix('.mkv')
]
info['duration'] = int(info['duration'])/60
printr('# downloading "{description}" '
'- {duration:.1f}min'.format_map(info))
printr('# by {userId}, {views} views'.format_map(info))
# retry on failure
for _ in range(3):
try:
            printr('# copying')
            subprocess.run(download, check=True)
            printr('# converting')
            subprocess.run(convert, check=True)
            printr()
            break
        except subprocess.CalledProcessError:
            if inputr('conversion failed, retry? [Y/n] ') == 'n':
break
# remove original file
original.unlink(missing_ok=True)
def get_filenames(dir):
'''
    This is where the magic happens. It extracts the `IDS`
    tag from the downloaded videos and builds a dictionary
    ids -> filename. By checking these ids we can avoid
    downloading existing videos, even if they were renamed.
'''
files = {}
for file in dir.glob('*.mkv'):
ffprobe = ['ffprobe', file, '-show_format', '-of', 'json']
output = subprocess.run(ffprobe, capture_output=True).stdout
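        # skip files without an IDS tag,
        # ie. not created by this script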
try:
metadata = json.loads(output)['format']
files[metadata['tags']['IDS']] = file
except KeyError:
pass
return files
def main(args):
course = ('{base_url}/course'
'/view.php?id={course_id}'.format_map(vars(args)))
printr('* opening course...', end='', flush=True)
session, page = open_course(course, args)
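    # collect the links to the kaltura video resources
    # (the list items with class "kalvidres")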
links = []
for li in page.find_all('li', class_='kalvidres'):
links.append(li.find('a')['href'])
printr('* {} videos found!\n'.format(len(links) or 'no'))
# filenames of already saved videos
files = get_filenames(args.directory)
partner = None
output = []
for i, link in enumerate(links[args.skip:], start=args.skip):
page = parse(link, session)
printr(f'{i+1}. fetching video metadata...', end='', flush=True)
partner, entry = extract_ids(page, partner)
info = get_info(partner, entry)
printr('done')
if args.link_only:
print('desc: {description}\n'
'author: {userId}\n'
'views: {views}'.format_map(info[0]))
if len(info) > 1:
print('dual video')
print('camera url:', info[0]['downloadUrl'])
print('screen url:', info[1]['downloadUrl'])
else:
print('url:', info[0]['downloadUrl'])
printr()
else:
save_video(info, files, args)
if args.json:
output.append(info)
if args.json:
print(json.dumps(output))
if __name__ == '__main__':
try:
main(parser.parse_args())
except KeyboardInterrupt:
printr('\nbye!')