#!/usr/bin/env nix-script
#!>python3
#! python3 | requests beautifulsoup4
#! shell | ffmpeg
#! env | EUSER EPASS

import requests
|
|
import subprocess
|
|
import argparse
|
|
import tempfile
|
|
import pathlib
|
|
import getpass
|
|
import json
|
|
import sys
|
|
import os
|
|
|
|
from datetime import datetime
|
|
from requests.utils import unquote, urlparse
|
|
from bs4 import BeautifulSoup
|
|
|
|
|
|
# Help formatter that prints the description and epilog exactly as
# written (raw) and appends the default value to each option's help.
formatter = type('CustomFormatter',
                 (argparse.RawDescriptionHelpFormatter,
                  argparse.ArgumentDefaultsHelpFormatter), {})

# Command line interface of the script. The description and epilog
# are printed verbatim, so their lines start at column 0 on purpose.
parser = argparse.ArgumentParser(
    formatter_class=formatter,
    description='''
Download all video lessons from an elearning course.

The videos are taken at the original quality and encoded
using x265 slow profile, 96kb/s opus for audio, via ffmpeg.

You can run the program multiple times to keep the archive
in sync with elearning: existing files won't be replaced or
downloaded again, even if you have renamed them.

If authentication is required the EUSER,EPASS variables
are tried for logging in, otherwise they will be prompted.
Only Kaltura videos are supported (dual screen and captions
work, though).''',
    epilog='''
Copyright (C) 2020 Michele Guerini Rocco (rnhmjoj)

This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.

This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.

You should have received a copy of the GNU General Public License
along with this program. If not, see <https://www.gnu.org/licenses/>.
''')
parser.add_argument('course_id', metavar='course-id', type=str,
                    help='the id of the course to download. it can be found'
                         ' at the end of the course homepage url')
parser.add_argument('--skip', '-s', metavar='N', type=int,
                    default=0, help='skip the first N links')
parser.add_argument('--link-only', '-l', action='store_true',
                    help='only print the links without downloading')
parser.add_argument('--json', '-j', action='store_true',
                    help='print the video metadata in JSON')
parser.add_argument('--directory', '-d', metavar='DIR',
                    default='.', type=pathlib.Path,
                    help='directory where to save the videos. defaults to'
                         ' the current directory if not given')
parser.add_argument('--ffmpeg', '-f', metavar='ARG',
                    type=str, default=[], nargs='+',
                    help='extra arguments to pass to ffmpeg')
parser.add_argument('--base-url', metavar='URL', type=str,
                    default='https://elearning.unimib.it',
                    help='the base elearning website url')
parser.add_argument('--auth-url', metavar='URL', type=str,
                    default='https://idp-idm.unimib.it/idp/'
                            'profile/SAML2/Redirect/SSO',
                    help='the url of Shibboleth identity provider.'
                         ' if you have no idea what it is, leave it')
|
|
|
|
|
|
def printr(*args, **kwargs):
    '''
    Same as print(), except that the text is
    written to the stderr instead of the stdout.
    '''
    print(*args, file=sys.stderr, **kwargs)
|
|
|
|
|
|
def inputr(prompt):
    '''
    Show the prompt on the stderr (without a trailing
    newline) and return one line read by input().
    '''
    printr(prompt, end='')
    answer = input()
    return answer
|
|
|
|
|
|
def getenv(var, fallback):
    '''
    Return the value of the environment variable
    `var`; when it is not set, call `fallback` (with
    no arguments) and return its result instead.
    '''
    if var in os.environ:
        return os.environ[var]
    return fallback()
|
|
|
|
|
|
def open_course(url, args):
    '''
    GET and parse the course page.
    Also tames the legendary black beast of Shibboleth.

    url:  address of the course page
    args: parsed command line options; `base_url` and
          `auth_url` are used for the login requests

    If the request for `url` lands on an address containing
    'enrol', the login flow is run first. The credentials are
    taken from the EUSER/EPASS environment variables or, when
    those are unset, asked interactively.

    Returns the tuple (session, page): the requests session used
    for all the requests and the parsed course page.

    Exits the program with status 1 if the login is rejected.
    '''
    session = requests.Session()

    res = session.get(url)
    page = BeautifulSoup(res.content, 'html.parser')
    printr('done')

    # do the authentication
    if 'enrol' in res.url:
        printr('\n# authentication needed.. sigh')
        username = getenv('EUSER', lambda: inputr('username: '))
        password = getenv('EPASS', lambda: getpass.getpass('password: '))

        # elearning login request. the response body is not
        # needed: the request is made on the session only
        key = page.find('input', attrs={'name': 'sesskey'})['value']
        session.get(args.base_url + '/auth/unimibsaml/login.php',
                    params=dict(wantsurl=url, sesskey=key))

        # shibboleth auth request
        res = session.post(args.auth_url, params=dict(execution='e1s1'),
                           data=dict(_eventId_proceed=''))

        # submit shibboleth login form
        login_form = dict(j_username=username,
                          j_password=password,
                          _eventId_proceed='')
        res = session.post(res.url, data=login_form)

        # ending up on an 'e1s3' url is treated as a rejected login
        if 'e1s3' in res.url:
            printr('# authentication failed :(')
            # sys.exit, not the exit() helper of the site module,
            # which is meant for the interactive shell only
            sys.exit(1)

        # finally get the auth token
        page = BeautifulSoup(res.content, 'html.parser')
        form = page.find('form')
        resp = form.find('input', attrs={'name': 'SAMLResponse'})['value']
        res = session.post(unquote(form['action']),
                           data=dict(SAMLResponse=resp))
        page = BeautifulSoup(res.content, 'html.parser')

        printr('# done!\n')

    return session, page
|
|
|
|
|
|
def parse(url, session=requests):
    '''
    Fetch `url` with a GET request, made through `session`
    (the plain requests module unless one is given), and
    return the html of the response parsed by BeautifulSoup.
    '''
    return BeautifulSoup(session.get(url).content, 'html.parser')
|
|
|
|
|
|
def get_info(partner_id, entry_id):
    '''
    Fetch from the Kaltura API the metadata of the video
    'entry_id' that belongs to 'partner_id'.

    Returns a list of metadata dicts: the entry itself first,
    then the child entries (secondary streams), if any. The
    'description' of each one is reduced to plain text.
    '''
    endpoint = 'https://cdnapisec.kaltura.com/api_v3/index.php'

    # parameters of the multirequest itself
    query = {
        'action': 'null',
        'apiVersion': '3.1',
        'clientTag': 'kwidget:v2.80',
        'format': 1,
        'service': 'multirequest'}

    # 1. this returns a session key "ks"
    #    which is used in subsequent reqs.
    start_session = {
        'expiry': 86400,
        'service': 'session',
        'action': 'startWidgetSession',
        'widgetId': f'_{partner_id}'}

    # 2. video metadata
    get_parent = {
        'action': 'get',
        'entryId': entry_id,
        'service': 'baseentry',
        'ks': '{1:result:ks}'}

    # 3. child contains a secondary stream:
    #    it could be screen+webcam
    list_children = {
        'ks': '{1:result:ks}',
        'service': 'baseEntry',
        'action': 'list',
        'filter:objectType': 'KalturaBaseEntryFilter',
        'filter:typeEqual': 1,
        'filter:parentEntryIdEqual': entry_id}

    # join requests: the parameters of the n-th
    # request are prefixed with "n:"
    subrequests = (start_session, get_parent, list_children)
    for number, request in enumerate(subrequests, start=1):
        query.update({f'{number}:{key}': value
                      for key, value in request.items()})

    # the first result is the session, not needed here
    results = requests.get(endpoint, params=query).json()
    parent, children = results[1:]

    info = [parent]
    if children['totalCount'] > 0:
        info.extend(children['objects'])

    # strip html from description
    for entry in info:
        html = BeautifulSoup(entry['description'], 'html.parser')
        entry['description'] = html.get_text()
    return info
|
|
|
|
|
|
def extract_ids(page, partner_id=None):
    '''
    Given the player iframe page extracts the
    'partner_id' and 'entry_id' of the video.
    The partner id is only fetched if 'partner_id' is
    None, this saves one http request per video.

    page:       parsed page containing the element with
                id 'contentframe'
    partner_id: partner id obtained from a previous call, if any

    Returns the tuple (partner_id, entry_id).

    Raises KeyError if the frame url has no 'source' parameter
    and ValueError if the source path has no 'entryid' segment.
    '''
    url = page.find(id='contentframe')['src']
    query = urlparse(url).query

    # split each pair on the first '=' only, so that a value
    # which itself contains '=' doesn't break the parsing
    params = {}
    for pair in query.split('&'):
        key, _, value = pair.partition('=')
        params[key] = value
    source = unquote(params['source'])

    # the entry id is the path segment following 'entryid'
    settings = urlparse(source).path.split('/')
    entry_id = settings[settings.index('entryid') + 1]

    if partner_id is None:
        iframe = parse(url)
        partner_id = iframe.find(
            'input', attrs=dict(name='oauth_consumer_key'))['value']

    return partner_id, entry_id
|
|
|
|
|
|
def save_video(infos, files, args):
    '''
    Download and convert the video
    using ffmpeg and x265.

    infos: list of metadata dicts of the video: the main
           stream first, then any secondary stream
    files: mapping from the `IDS` tag of the videos already
           saved to their path; if the ids of this video are
           found there nothing is downloaded
    args:  parsed command line options; `directory` and
           `ffmpeg` are used

    The first entry of `infos` is modified in place: an empty
    description is replaced by the name, 'createdAt' becomes an
    ISO date string and 'duration' is converted to minutes.

    All the messages and the retry prompt are written to the
    stderr, so that the stdout only carries the program output.
    '''
    info = infos[0]

    # fallback to name if no description
    if not info['description']:
        info['description'] = info['name']

    # use the description as a filename: lowercase words joined
    # by dashes. path separators are replaced too, otherwise the
    # title would be taken for a path into a missing directory
    words = [w.lower() for w in info['description'].split() if w != '-']
    filename = '-'.join(words)
    for separator in {'/', os.sep}:
        filename = filename.replace(separator, '-')
    # nothing usable in the title: name the file after the video id
    filename = filename or info['id']

    # parse creation date
    date = datetime.fromtimestamp(info['createdAt'])
    info['createdAt'] = date.isoformat()

    # create directory if necessary
    directory = pathlib.Path(args.directory)
    directory.mkdir(parents=True, exist_ok=True)

    # create ffmpeg input args: every stream of the first
    # input is kept, only the video of the following ones
    inputs, maps = [], []
    for i, stream in enumerate(infos):
        inputs.extend(['-i', stream['downloadUrl']])
        maps.extend(['-map', str(i) + (':v' if i > 0 else '')])

    # video ids, used to check for existing files
    ids = ','.join(i['id'] for i in infos)

    if ids in files:
        printr('# already downloaded "{description}"'.format_map(info))
        printr('# skipping', end='\n\n')
        return

    # where to save the stream. the suffix is appended by hand:
    # with_suffix() would cut the title at its last dot
    tmp = pathlib.Path(tempfile.gettempdir())
    original = tmp / (filename + '.mkv')
    converted = directory / (filename + '.mkv')

    base = [
        'ffmpeg', '-hide_banner',
        '-loglevel', 'error',
        '-stats'
    ]
    download = base + inputs + maps + ['-y', original]

    convert = base + args.ffmpeg + [
        # source
        '-i', original,
        '-map', '0',
        # video
        '-c:v', 'libx265', '-preset', 'slow', '-crf', '23',
        # audio
        '-c:a', 'libopus', '-b:a', '96k',
        # metadata
        '-metadata', 'title=' + info['description'],
        '-metadata', 'AUTHOR=' + info['userId'],
        '-metadata', 'DATE=' + info['createdAt'],
        '-metadata', 'IDS=' + ids,
        # output
        converted
    ]

    info['duration'] = int(info['duration'])/60
    printr('# downloading "{description}" '
           '- {duration:.1f}min'.format_map(info))
    printr('# by {userId}, {views} views'.format_map(info))

    try:
        # retry on failure
        for _ in range(3):
            try:
                printr('# copying')
                subprocess.run(download, check=True)
                printr('# converting')
                subprocess.run(convert, check=True)
                printr()
                break
            except subprocess.CalledProcessError:
                if inputr('Conversion failed, retry? [Y/n]') == 'n':
                    break
    finally:
        # remove original file, also when interrupted
        original.unlink(missing_ok=True)
|
|
|
|
|
|
def get_filenames(dir):
    '''
    This is where the magic happens. This extracts the `IDS`
    tag from the downloaded videos and builds a dictionary
    ids -> filename. Checking these ids we can avoid downloading
    existing videos even if they were renamed.

    dir: directory (a pathlib.Path) whose *.mkv files are probed

    Files without the tag, and files for which ffprobe gives
    no usable output, are left out of the result.
    '''
    files = {}
    for file in dir.glob('*.mkv'):
        ffprobe = ['ffprobe', file, '-show_format', '-of', 'json']
        output = subprocess.run(ffprobe, capture_output=True).stdout
        try:
            metadata = json.loads(output)['format']
            files[metadata['tags']['IDS']] = file
        except KeyError:
            # no IDS tag: not a video saved by this program
            continue
        except ValueError:
            # empty or invalid JSON: ffprobe couldn't read the
            # file. one bad file shouldn't stop the whole run
            continue

    return files
|
|
|
|
|
|
def main(args):
    '''
    Run the program with the parsed command line options:
    open the course page, collect the links to the videos
    and, for each one, either print its urls or download it.
    With `args.json` the collected metadata is printed at the end.
    '''
    course = ('{base_url}/course'
              '/view.php?id={course_id}'.format(**vars(args)))

    printr('* opening course...', end='', flush=True)
    session, page = open_course(course, args)

    links = [li.find('a')['href']
             for li in page.find_all('li', class_='kalvidres')]
    printr('* {} videos found!\n'.format(len(links) or 'no'))

    # filenames of already saved videos
    files = get_filenames(args.directory)

    # the partner id is fetched along with the first video
    # and then passed on to the following calls
    partner = None
    output = []
    remaining = links[args.skip:]
    for i, link in enumerate(remaining, start=args.skip):
        page = parse(link, session)

        printr(f'{i+1}. fetching video metadata...', end='', flush=True)
        partner, entry = extract_ids(page, partner)
        info = get_info(partner, entry)
        printr('done')

        if not args.link_only:
            save_video(info, files, args)
        else:
            print('desc: {description}\n'
                  'author: {userId}\n'
                  'views: {views}'.format_map(info[0]))
            if len(info) <= 1:
                print('url:', info[0]['downloadUrl'])
            else:
                print('dual video')
                print('camera url:', info[0]['downloadUrl'])
                print('screen url:', info[1]['downloadUrl'])
            printr()

        if args.json:
            output.append(info)

    if args.json:
        print(json.dumps(output))
|
|
|
|
|
|
# Script entry point: Ctrl-C ends the program
# with a short goodbye instead of a traceback.
if __name__ == '__main__':
    try:
        options = parser.parse_args()
        main(options)
    except KeyboardInterrupt:
        printr('\nbye!')
|