Merge branch 'download-page' of https://github.com/Kingdread/qutebrowser into Kingdread-download-page

This commit is contained in:
Florian Bruhin 2015-11-10 22:39:07 +01:00
commit 7cddd52b2d
14 changed files with 968 additions and 44 deletions

View File

@ -100,6 +100,9 @@ The following software and libraries are required to run qutebrowser:
* http://pygments.org/[pygments] * http://pygments.org/[pygments]
* http://pyyaml.org/wiki/PyYAML[PyYAML] * http://pyyaml.org/wiki/PyYAML[PyYAML]
The following libraries are optional and provide a better user experience:
* http://cthedot.de/cssutils/[cssutils]
To generate the documentation for the `:help` command, when using the git To generate the documentation for the `:help` command, when using the git
repository (rather than a release), http://asciidoc.org/[asciidoc] is needed. repository (rather than a release), http://asciidoc.org/[asciidoc] is needed.

View File

@ -145,13 +145,19 @@ Close the current window.
[[download]] [[download]]
=== download === download
Syntax: +:download ['url'] ['dest']+ Syntax: +:download [*--mhtml*] [*--dest* 'DEST'] ['url'] ['dest-old']+
Download a given URL, or current page if no URL given. Download a given URL, or current page if no URL given.
The form `:download [url] [dest]` is deprecated, use `:download --dest [dest] [url]` instead.
==== positional arguments ==== positional arguments
* +'url'+: The URL to download. If not given, download the current page. * +'url'+: The URL to download. If not given, download the current page.
* +'dest'+: The file path to write the download to, or not given to ask. * +'dest-old'+: (deprecated) Same as dest.
==== optional arguments
* +*-m*+, +*--mhtml*+: Download the current page and all assets as mhtml file.
* +*-d*+, +*--dest*+: The file path to write the download to, or not given to ask.
[[download-cancel]] [[download-cancel]]
=== download-cancel === download-cancel

View File

@ -37,7 +37,7 @@ import pygments.formatters
from qutebrowser.commands import userscripts, cmdexc, cmdutils, runners from qutebrowser.commands import userscripts, cmdexc, cmdutils, runners
from qutebrowser.config import config, configexc from qutebrowser.config import config, configexc
from qutebrowser.browser import webelem, inspector, urlmarks from qutebrowser.browser import webelem, inspector, urlmarks, downloads, mhtml
from qutebrowser.keyinput import modeman from qutebrowser.keyinput import modeman
from qutebrowser.utils import (message, usertypes, log, qtutils, urlutils, from qutebrowser.utils import (message, usertypes, log, qtutils, urlutils,
objreg, utils) objreg, utils)
@ -1140,22 +1140,68 @@ class CommandDispatcher:
cur.inspector.show() cur.inspector.show()
@cmdutils.register(instance='command-dispatcher', scope='window') @cmdutils.register(instance='command-dispatcher', scope='window')
def download(self, url=None, dest=None): def download(self, url=None, dest_old=None, *, mhtml_=False, dest=None):
"""Download a given URL, or current page if no URL given. """Download a given URL, or current page if no URL given.
The form `:download [url] [dest]` is deprecated, use `:download --dest
[dest] [url]` instead.
Args: Args:
url: The URL to download. If not given, download the current page. url: The URL to download. If not given, download the current page.
dest_old: (deprecated) Same as dest.
dest: The file path to write the download to, or None to ask. dest: The file path to write the download to, or None to ask.
mhtml_: Download the current page and all assets as mhtml file.
""" """
if dest_old is not None:
message.warning(
self._win_id, ":download [url] [dest] is deprecated - use"
" download --dest [dest] [url]")
if dest is not None:
raise cmdexc.CommandError("Can't give two destinations for the"
" download.")
dest = dest_old
download_manager = objreg.get('download-manager', scope='window', download_manager = objreg.get('download-manager', scope='window',
window=self._win_id) window=self._win_id)
if url: if url:
if mhtml_:
raise cmdexc.CommandError("Can only download the current page"
" as mhtml.")
url = urlutils.qurl_from_user_input(url) url = urlutils.qurl_from_user_input(url)
urlutils.raise_cmdexc_if_invalid(url) urlutils.raise_cmdexc_if_invalid(url)
download_manager.get(url, filename=dest) download_manager.get(url, filename=dest)
else: else:
page = self._current_widget().page() if mhtml_:
download_manager.get(self._current_url(), page=page) self._download_mhtml(dest)
else:
page = self._current_widget().page()
download_manager.get(self._current_url(), page=page,
filename=dest)
def _download_mhtml(self, dest=None):
    """Save the current page, together with its assets, as MHTML.

    When no destination is passed, the user is prompted for one and the
    download is started from the prompt's answer.

    Args:
        dest: The file path to write the download to.
    """
    tab_id = self._current_index()
    if dest is not None:
        mhtml.start_download_checked(dest, win_id=self._win_id,
                                     tab_id=tab_id)
        return

    default_name = utils.sanitize_filename(self._current_title() + ".mht")
    # The answer of the prompt is handed over as the destination argument.
    start_download = functools.partial(
        mhtml.start_download_checked, win_id=self._win_id, tab_id=tab_id)

    question = usertypes.Question()
    question.text = "Save page to: "
    question.mode = usertypes.PromptMode.text
    question.completed.connect(question.deleteLater)
    question.default = downloads.path_suggestion(default_name)
    question.answered.connect(start_download)

    bridge = objreg.get("message-bridge", scope="window",
                        window=self._win_id)
    bridge.ask(question, blocking=False)
@cmdutils.register(instance='command-dispatcher', scope='window', @cmdutils.register(instance='command-dispatcher', scope='window',
deprecated="Use :download instead.") deprecated="Use :download instead.")

View File

@ -49,7 +49,7 @@ ModelRole = usertypes.enum('ModelRole', ['item'], start=Qt.UserRole,
RetryInfo = collections.namedtuple('RetryInfo', ['request', 'manager']) RetryInfo = collections.namedtuple('RetryInfo', ['request', 'manager'])
# Remember the last used directory # Remember the last used directory
_last_used_directory = None last_used_directory = None
# All REFRESH_INTERVAL milliseconds, speeds will be recalculated and downloads # All REFRESH_INTERVAL milliseconds, speeds will be recalculated and downloads
@ -57,20 +57,20 @@ _last_used_directory = None
REFRESH_INTERVAL = 500 REFRESH_INTERVAL = 500
def _download_dir(): def download_dir():
"""Get the download directory to use.""" """Get the download directory to use."""
directory = config.get('storage', 'download-directory') directory = config.get('storage', 'download-directory')
remember_dir = config.get('storage', 'remember-download-directory') remember_dir = config.get('storage', 'remember-download-directory')
if remember_dir and _last_used_directory is not None: if remember_dir and last_used_directory is not None:
return _last_used_directory return last_used_directory
elif directory is None: elif directory is None:
return standarddir.download() return standarddir.download()
else: else:
return directory return directory
def _path_suggestion(filename): def path_suggestion(filename):
"""Get the suggested file path. """Get the suggested file path.
Args: Args:
@ -79,15 +79,36 @@ def _path_suggestion(filename):
suggestion = config.get('completion', 'download-path-suggestion') suggestion = config.get('completion', 'download-path-suggestion')
if suggestion == 'path': if suggestion == 'path':
# add trailing '/' if not present # add trailing '/' if not present
return os.path.join(_download_dir(), '') return os.path.join(download_dir(), '')
elif suggestion == 'filename': elif suggestion == 'filename':
return filename return filename
elif suggestion == 'both': elif suggestion == 'both':
return os.path.join(_download_dir(), filename) return os.path.join(download_dir(), filename)
else: else:
raise ValueError("Invalid suggestion value {}!".format(suggestion)) raise ValueError("Invalid suggestion value {}!".format(suggestion))
def create_full_filename(basename, filename):
    """Turn a user-supplied absolute path into the final download path.

    Args:
        basename: The basename to use if filename is a directory.
        filename: The path to a folder or file where you want to save.

    Return:
        The full absolute path, or None if filename creation was not possible.
    """
    if not os.path.isabs(filename):
        # Relative paths can't be resolved here; the caller has to retry
        # with a directory prepended.
        return None
    if os.path.isdir(filename):
        # An existing absolute directory: store the file inside it, using
        # the default name.
        return os.path.join(filename, basename)
    # Any other absolute path is taken as the target file itself.
    return filename
class DownloadItemStats(QObject): class DownloadItemStats(QObject):
"""Statistics (bytes done, total bytes, time, etc.) about a download. """Statistics (bytes done, total bytes, time, etc.) about a download.
@ -201,6 +222,7 @@ class DownloadItem(QObject):
fileobj: The file object to download the file to. fileobj: The file object to download the file to.
reply: The QNetworkReply associated with this download. reply: The QNetworkReply associated with this download.
retry_info: A RetryInfo instance. retry_info: A RetryInfo instance.
raw_headers: The headers sent by the server.
_filename: The filename of the download. _filename: The filename of the download.
_redirects: How many time we were redirected already. _redirects: How many time we were redirected already.
_buffer: A BytesIO object to buffer incoming data until we know the _buffer: A BytesIO object to buffer incoming data until we know the
@ -255,6 +277,7 @@ class DownloadItem(QObject):
self._filename = None self._filename = None
self.init_reply(reply) self.init_reply(reply)
self._win_id = win_id self._win_id = win_id
self.raw_headers = {}
def __repr__(self): def __repr__(self):
return utils.get_repr(self, basename=self.basename) return utils.get_repr(self, basename=self.basename)
@ -354,6 +377,7 @@ class DownloadItem(QObject):
reply.finished.connect(self.on_reply_finished) reply.finished.connect(self.on_reply_finished)
reply.error.connect(self.on_reply_error) reply.error.connect(self.on_reply_error)
reply.readyRead.connect(self.on_ready_read) reply.readyRead.connect(self.on_ready_read)
reply.metaDataChanged.connect(self.on_meta_data_changed)
self.retry_info = RetryInfo(request=reply.request(), self.retry_info = RetryInfo(request=reply.request(),
manager=reply.manager()) manager=reply.manager())
if not self.fileobj: if not self.fileobj:
@ -444,7 +468,7 @@ class DownloadItem(QObject):
filename: The full filename to save the download to. filename: The full filename to save the download to.
None: special value to stop the download. None: special value to stop the download.
""" """
global _last_used_directory global last_used_directory
if self.fileobj is not None: if self.fileobj is not None:
raise ValueError("fileobj was already set! filename: {}, " raise ValueError("fileobj was already set! filename: {}, "
"existing: {}, fileobj {}".format( "existing: {}, fileobj {}".format(
@ -454,13 +478,16 @@ class DownloadItem(QObject):
# See https://github.com/The-Compiler/qutebrowser/issues/427 # See https://github.com/The-Compiler/qutebrowser/issues/427
encoding = sys.getfilesystemencoding() encoding = sys.getfilesystemencoding()
filename = utils.force_encoding(filename, encoding) filename = utils.force_encoding(filename, encoding)
if not self._create_full_filename(filename): self._filename = create_full_filename(self.basename, filename)
if self._filename is None:
# We only got a filename (without directory) or a relative path # We only got a filename (without directory) or a relative path
# from the user, so we append that to the default directory and # from the user, so we append that to the default directory and
# try again. # try again.
self._create_full_filename(os.path.join(_download_dir(), filename)) self._filename = create_full_filename(
self.basename, os.path.join(download_dir(), filename))
_last_used_directory = os.path.dirname(self._filename) self.basename = os.path.basename(self._filename)
last_used_directory = os.path.dirname(self._filename)
log.downloads.debug("Setting filename to {}".format(filename)) log.downloads.debug("Setting filename to {}".format(filename))
if os.path.isfile(self._filename): if os.path.isfile(self._filename):
@ -477,25 +504,6 @@ class DownloadItem(QObject):
else: else:
self._create_fileobj() self._create_fileobj()
def _create_full_filename(self, filename):
"""Try to create the full filename.
Return:
True if the full filename was created, False otherwise.
"""
if os.path.isabs(filename) and os.path.isdir(filename):
# We got an absolute directory from the user, so we save it under
# the default filename in that directory.
self._filename = os.path.join(filename, self.basename)
return True
elif os.path.isabs(filename):
# We got an absolute filename from the user, so we save it under
# that filename.
self._filename = filename
self.basename = os.path.basename(self._filename)
return True
return False
def set_fileobj(self, fileobj): def set_fileobj(self, fileobj):
""""Set the file object to write the download to. """"Set the file object to write the download to.
@ -593,6 +601,15 @@ class DownloadItem(QObject):
if data is not None: if data is not None:
self._buffer.write(data) self._buffer.write(data)
@pyqtSlot()
def on_meta_data_changed(self):
    """Rebuild raw_headers from the header pairs of the current reply."""
    reply = self.reply
    if reply is None:
        return
    # Names and values are stored as plain bytes objects.
    self.raw_headers = {bytes(name): bytes(content)
                        for name, content in reply.rawHeaderPairs()}
def _handle_redirect(self): def _handle_redirect(self):
"""Handle a HTTP redirect. """Handle a HTTP redirect.
@ -720,7 +737,7 @@ class DownloadManager(QAbstractListModel):
prompt_download_directory = config.get( prompt_download_directory = config.get(
'storage', 'prompt-download-directory') 'storage', 'prompt-download-directory')
if not prompt_download_directory and not fileobj: if not prompt_download_directory and not fileobj:
filename = _download_dir() filename = download_dir()
if fileobj is not None or filename is not None: if fileobj is not None or filename is not None:
return self.fetch_request(request, return self.fetch_request(request,
@ -735,7 +752,7 @@ class DownloadManager(QAbstractListModel):
suggested_fn = utils.force_encoding(suggested_fn, encoding) suggested_fn = utils.force_encoding(suggested_fn, encoding)
q = self._prepare_question() q = self._prepare_question()
q.default = _path_suggestion(suggested_fn) q.default = path_suggestion(suggested_fn)
message_bridge = objreg.get('message-bridge', scope='window', message_bridge = objreg.get('message-bridge', scope='window',
window=self._win_id) window=self._win_id)
q.answered.connect( q.answered.connect(
@ -820,7 +837,7 @@ class DownloadManager(QAbstractListModel):
prompt_download_directory = config.get('storage', prompt_download_directory = config.get('storage',
'prompt-download-directory') 'prompt-download-directory')
if not prompt_download_directory and not fileobj: if not prompt_download_directory and not fileobj:
filename = _download_dir() filename = download_dir()
if filename is not None: if filename is not None:
download.set_filename(filename) download.set_filename(filename)
@ -829,7 +846,7 @@ class DownloadManager(QAbstractListModel):
download.autoclose = False download.autoclose = False
else: else:
q = self._prepare_question() q = self._prepare_question()
q.default = _path_suggestion(suggested_filename) q.default = path_suggestion(suggested_filename)
q.answered.connect(download.set_filename) q.answered.connect(download.set_filename)
q.cancelled.connect(download.cancel) q.cancelled.connect(download.cancel)
download.cancelled.connect(q.abort) download.cancelled.connect(q.abort)

View File

@ -0,0 +1,511 @@
# vim: ft=python fileencoding=utf-8 sts=4 sw=4 et:
# Copyright 2015 Daniel Schadt
#
# This file is part of qutebrowser.
#
# qutebrowser is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# qutebrowser is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with qutebrowser. If not, see <http://www.gnu.org/licenses/>.
"""Utils for writing a MHTML file."""
import functools
import io
import os
import re
import sys
import collections
import uuid
import email.policy
import email.generator
import email.encoders
import email.mime.multipart
from PyQt5.QtCore import QUrl
from qutebrowser.browser import webelem, downloads
from qutebrowser.utils import log, objreg, message, usertypes, utils, urlutils
try:
import cssutils
except (ImportError, re.error):
# Catching re.error because cssutils in earlier releases (<= 1.0) is broken
# on Python 3.5
# See https://bitbucket.org/cthedot/cssutils/issues/52
cssutils = None
# Record describing one file stored in the MHTML document: its content, its
# content type, the location it was loaded from and the transfer encoding.
_File = collections.namedtuple('_File',
                               ['content', 'content_type', 'content_location',
                                'transfer_encoding'])
_CSS_URL_PATTERNS = [re.compile(x) for x in [
r"@import\s+'(?P<url>[^']+)'",
r'@import\s+"(?P<url>[^"]+)"',
r'''url\((?P<url>[^'"][^)]*)\)''',
r'url\("(?P<url>[^"]+)"\)',
r"url\('(?P<url>[^']+)'\)",
]]
def _get_css_imports_regex(data):
    """Collect the assets referenced by a CSS document using regexes.

    The returned URLs are relative to the stylesheet's URL. Results are
    grouped by pattern, in the order of _CSS_URL_PATTERNS.

    Args:
        data: The content of the stylesheet to scan as string.
    """
    return [found.group("url")
            for pattern in _CSS_URL_PATTERNS
            for found in pattern.finditer(data)
            if found.group("url")]
def _get_css_imports_cssutils(data, inline=False):
    """Collect the assets referenced by a CSS document using cssutils.

    The returned URLs are relative to the stylesheet's URL.

    Args:
        data: The content of the stylesheet to scan as string.
        inline: True if the argument is a inline HTML style attribute.
    """
    # Invalid CSS is of no interest here; reporting it would only clutter
    # the log output with CSS errors.
    parser = cssutils.CSSParser(loglevel=100,
                                fetcher=lambda url: (None, ""),
                                validate=False)
    if inline:
        # Each property is something like background, color, margin, ...
        # and each of its values something like red, 10px, url(foobar), ...
        return [value.uri
                for prop in parser.parseStyle(data)
                for value in prop.propertyValue
                if isinstance(value, cssutils.css.URIValue) and value.uri]
    return list(cssutils.getUrls(parser.parseString(data)))
def _get_css_imports(data, inline=False):
    """Collect the assets referenced by a CSS document.

    Uses cssutils when it could be imported and falls back to the
    regex-based scanner otherwise. The returned URLs are relative to the
    stylesheet's URL.

    Args:
        data: The content of the stylesheet to scan as string.
        inline: True if the argument is a inline HTML style attribute.
    """
    if cssutils is not None:
        return _get_css_imports_cssutils(data, inline)
    # The regex scanner has no separate mode for inline style attributes.
    return _get_css_imports_regex(data)
def _check_rel(element):
"""Return true if the element's rel attribute fits our criteria.
rel has to contain 'stylesheet' or 'icon'. Also returns True if the rel
attribute is unset.
Args:
element: The WebElementWrapper which should be checked.
"""
if 'rel' not in element:
return True
must_have = {'stylesheet', 'icon'}
rels = [rel.lower() for rel in element['rel'].split(' ')]
return any(rel in rels for rel in must_have)
# Policy used when serializing the MHTML document: lines end in CRLF, and
# max_line_length=0 turns off line wrapping in the generated output.
MHTMLPolicy = email.policy.default.clone(linesep='\r\n', max_line_length=0)
# Encode the file using base64 encoding.
E_BASE64 = email.encoders.encode_base64
# Encode the file using MIME quoted-printable encoding.
E_QUOPRI = email.encoders.encode_quopri
class MHTMLWriter():

    """Collect a root document plus its assets and emit them as MHTML.

    Attributes:
        root_content: The root content as bytes.
        content_location: The url of the page as str.
        content_type: The MIME-type of the root content as str.
        _files: Mapping of location->_File namedtuple.
    """

    def __init__(self, root_content, content_location, content_type):
        self.root_content = root_content
        self.content_location = content_location
        self.content_type = content_type
        self._files = {}

    def add_file(self, location, content, content_type=None,
                 transfer_encoding=E_QUOPRI):
        """Register a file for inclusion in the MHTML document.

        A file added earlier under the same location is replaced.

        Args:
            location: The original location (URL) of the file.
            content: The binary content of the file.
            content_type: The MIME-type of the content (if available)
            transfer_encoding: The transfer encoding to use for this file.
        """
        entry = _File(content_location=location, content=content,
                      content_type=content_type,
                      transfer_encoding=transfer_encoding)
        self._files[location] = entry

    def write_to(self, fp):
        """Serialize the root document and all added files to fp.

        The root document comes first, followed by the files ordered by
        their location.

        Args:
            fp: The file-object, opened in "wb" mode.
        """
        boundary = '---=_qute-{}'.format(uuid.uuid4())
        container = email.mime.multipart.MIMEMultipart('related', boundary)
        container.attach(self._create_root_file())
        for location in sorted(self._files):
            container.attach(self._create_file(self._files[location]))
        generator = email.generator.BytesGenerator(fp, policy=MHTMLPolicy)
        generator.flatten(container)

    def _create_root_file(self):
        """Return the root document as MIMEMultipart."""
        root = _File(content=self.root_content,
                     content_type=self.content_type,
                     content_location=self.content_location,
                     transfer_encoding=E_QUOPRI)
        return self._create_file(root)

    def _create_file(self, f):
        """Return the single given file as MIMEMultipart."""
        part = email.mime.multipart.MIMEMultipart()
        part['Content-Location'] = f.content_location
        # Get rid of the default type multipart/mixed
        del part['Content-Type']
        if f.content_type:
            part.set_type(f.content_type)
        part.set_payload(f.content)
        # The encoder is applied to the message part itself.
        f.transfer_encoding(part)
        return part
class _Downloader():

    """A class to download whole websites.

    Attributes:
        web_view: The QWebView which contains the website that will be saved.
        dest: Destination filename.
        writer: The MHTMLWriter object which is used to save the page.
        loaded_urls: A set of QUrls of finished asset downloads.
        pending_downloads: A set of unfinished (url, DownloadItem) tuples.
        _finished: A flag indicating if the file has already been written.
        _used: A flag indicating if the downloader has already been used.
    """

    def __init__(self, web_view, dest):
        self.web_view = web_view
        self.dest = dest
        self.writer = None
        # The page itself is the root document and never fetched as asset.
        self.loaded_urls = {web_view.url()}
        self.pending_downloads = set()
        self._finished = False
        self._used = False

    def run(self):
        """Download and save the page.

        The object must not be reused, you should create a new one if
        you want to download another page.

        Raises:
            ValueError: If the downloader was already used.
        """
        if self._used:
            raise ValueError("Downloader already used")
        self._used = True
        web_url = self.web_view.url()
        web_frame = self.web_view.page().mainFrame()

        self.writer = MHTMLWriter(
            web_frame.toHtml().encode('utf-8'),
            content_location=urlutils.encoded_url(web_url),
            # I've found no way of getting the content type of a QWebView, but
            # since we're using .toHtml, it's probably safe to say that the
            # content-type is HTML
            content_type='text/html; charset="UTF-8"',
        )
        # Currently only downloading <link> (stylesheets), <script>
        # (javascript) and <img> (image) elements.
        elements = web_frame.findAllElements('link, script, img')

        for element in elements:
            element = webelem.WebElementWrapper(element)
            # Websites are free to set whatever rel=... attribute they want.
            # We just care about stylesheets and icons.
            if not _check_rel(element):
                continue
            if 'src' in element:
                element_url = element['src']
            elif 'href' in element:
                element_url = element['href']
            else:
                # Might be a local <script> tag or something else
                continue
            absolute_url = web_url.resolved(QUrl(element_url))
            self.fetch_url(absolute_url)

        styles = web_frame.findAllElements('style')
        for style in styles:
            style = webelem.WebElementWrapper(style)
            if 'type' in style and style['type'] != 'text/css':
                continue
            for element_url in _get_css_imports(str(style)):
                self.fetch_url(web_url.resolved(QUrl(element_url)))

        # Search for references in inline styles
        for element in web_frame.findAllElements('[style]'):
            element = webelem.WebElementWrapper(element)
            style = element['style']
            for element_url in _get_css_imports(style, inline=True):
                self.fetch_url(web_url.resolved(QUrl(element_url)))

        # Shortcut if no assets need to be downloaded, otherwise the file would
        # never be saved. Also might happen if the downloads are fast enough to
        # complete before connecting their finished signal.
        self.collect_zombies()
        if not self.pending_downloads and not self._finished:
            self.finish_file()

    def fetch_url(self, url):
        """Download the given url and add the file to the collection.

        URLs with a scheme other than http/https and URLs which were
        already requested are ignored.

        Args:
            url: The file to download as QUrl.
        """
        if url.scheme() not in {'http', 'https'}:
            return
        # Prevent loading an asset twice
        if url in self.loaded_urls:
            return
        self.loaded_urls.add(url)

        log.downloads.debug("loading asset at %s", url)

        # Using the download manager to download host-blocked urls might crash
        # qute, see the comments/discussion on
        # https://github.com/The-Compiler/qutebrowser/pull/962#discussion_r40256987
        # and https://github.com/The-Compiler/qutebrowser/issues/1053
        host_blocker = objreg.get('host-blocker')
        if host_blocker.is_blocked(url):
            log.downloads.debug("Skipping %s, host-blocked", url)
            # We still need an empty file in the output, QWebView can be pretty
            # picky about displaying a file correctly when not all assets are
            # at least referenced in the mhtml file.
            self.writer.add_file(urlutils.encoded_url(url), b'')
            return

        download_manager = objreg.get('download-manager', scope='window',
                                      window='current')
        item = download_manager.get(url, fileobj=_NoCloseBytesIO(),
                                    auto_remove=True)
        self.pending_downloads.add((url, item))
        item.finished.connect(
            functools.partial(self.finished, url, item))
        item.error.connect(
            functools.partial(self.error, url, item))
        item.cancelled.connect(
            functools.partial(self.error, url, item))

    def finished(self, url, item):
        """Callback when a single asset is downloaded.

        Stylesheets are additionally scanned for further references, which
        are fetched as well. The MHTML file is written once no download is
        pending anymore.

        Args:
            url: The original url of the asset as QUrl.
            item: The DownloadItem given by the DownloadManager
        """
        self.pending_downloads.remove((url, item))
        mime = item.raw_headers.get(b'Content-Type', b'')

        # Note that this decoding always works and doesn't produce errors
        # RFC 7230 (https://tools.ietf.org/html/rfc7230) states:
        # Historically, HTTP has allowed field content with text in the
        # ISO-8859-1 charset [ISO-8859-1], supporting other charsets only
        # through use of [RFC2047] encoding.  In practice, most HTTP header
        # field values use only a subset of the US-ASCII charset [USASCII].
        # Newly defined header fields SHOULD limit their field values to
        # US-ASCII octets.  A recipient SHOULD treat other octets in field
        # content (obs-text) as opaque data.
        mime = mime.decode('iso-8859-1')

        # The header may carry parameters ("text/css; charset=utf-8"), so
        # only the media type in front of the first ';' is compared.
        media_type = mime.split(';', 1)[0].strip().lower()
        if media_type == 'text/css':
            # We can't always assume that CSS files are UTF-8, but CSS files
            # shouldn't contain many non-ASCII characters anyway (in most
            # cases). Using "ignore" lets us decode the file even if it's
            # invalid UTF-8 data.
            # The file written to the MHTML file won't be modified by this
            # decoding, since there we're taking the original bytestream.
            try:
                css_string = item.fileobj.getvalue().decode('utf-8')
            except UnicodeDecodeError:
                log.downloads.warning("Invalid UTF-8 data in %s", url)
                css_string = item.fileobj.getvalue().decode('utf-8', 'ignore')
            import_urls = _get_css_imports(css_string)
            for import_url in import_urls:
                absolute_url = url.resolved(QUrl(import_url))
                self.fetch_url(absolute_url)

        encode = E_QUOPRI if mime.startswith('text/') else E_BASE64
        # Our MHTML handler refuses non-ASCII headers. This will replace every
        # non-ASCII char with '?'. This is probably okay, as official Content-
        # Type headers contain ASCII only anyway. Anything else is madness.
        mime = utils.force_encoding(mime, 'ascii')
        self.writer.add_file(urlutils.encoded_url(url),
                             item.fileobj.getvalue(), mime, encode)
        item.fileobj.actual_close()
        if self.pending_downloads:
            return
        self.finish_file()

    def error(self, url, item, *_args):
        """Callback when a download error occurred.

        Args:
            url: The original url of the asset as QUrl.
            item: The DownloadItem given by the DownloadManager.
        """
        try:
            self.pending_downloads.remove((url, item))
        except KeyError:
            # This might happen if .collect_zombies() calls .finished() and the
            # error handler will be called after .collect_zombies
            log.downloads.debug("Oops! Download already gone: %s", item)
            return
        item.fileobj.actual_close()
        # Add a stub file, see comment in .fetch_url() for more information
        self.writer.add_file(urlutils.encoded_url(url), b'')
        if self.pending_downloads:
            return
        self.finish_file()

    def finish_file(self):
        """Save the file to the filename given in __init__.

        Only the first call writes the file, later calls are ignored.
        """
        if self._finished:
            log.downloads.debug("finish_file called twice, ignored!")
            return
        self._finished = True
        log.downloads.debug("All assets downloaded, ready to finish off!")
        with open(self.dest, 'wb') as file_output:
            self.writer.write_to(file_output)
        message.info('current', "Page saved as {}".format(self.dest))

    def collect_zombies(self):
        """Collect done downloads and add their data to the MHTML file.

        This is needed if a download finishes before attaching its
        finished signal.
        """
        # Build a separate set first, .finished() modifies pending_downloads.
        items = set((url, item) for url, item in self.pending_downloads
                    if item.done)
        log.downloads.debug("Zombie downloads: %s", items)
        for url, item in items:
            self.finished(url, item)
class _NoCloseBytesIO(io.BytesIO): # pylint: disable=no-init
"""BytesIO that can't be .closed().
This is needed to prevent the DownloadManager from closing the stream, thus
discarding the data.
"""
def close(self):
"""Do nothing."""
pass
def actual_close(self):
"""Close the stream."""
super().close()
def _start_download(dest, win_id, tab_id):
    """Save the page of the given tab, with all assets, to a MHTML file.

    This will overwrite dest if it already exists.

    Args:
        dest: The filename where the resulting file should be saved.
        win_id, tab_id: Specify the tab whose page should be loaded.
    """
    view = objreg.get('webview', scope='tab', window=win_id, tab=tab_id)
    _Downloader(view, dest).run()
def start_download_checked(dest, win_id, tab_id):
    """Resolve dest to a full path and download, asking before overwriting.

    Args:
        dest: The filename where the resulting file should be saved.
        win_id, tab_id: Specify the tab whose page should be loaded.
    """
    web_view = objreg.get('webview', scope='tab', window=win_id, tab=tab_id)
    # Remove characters which cannot be expressed in the file system encoding
    fs_encoding = sys.getfilesystemencoding()
    # The default name is 'page title.mht'
    default_name = utils.force_encoding(
        utils.sanitize_filename(web_view.title() + '.mht'), fs_encoding)
    dest = os.path.expanduser(utils.force_encoding(dest, fs_encoding))

    # See if we already have an absolute path
    path = downloads.create_full_filename(default_name, dest)
    if path is None:
        # We still only have a relative path, prepend download_dir and
        # try again.
        path = downloads.create_full_filename(
            default_name, os.path.join(downloads.download_dir(), dest))
    downloads.last_used_directory = os.path.dirname(path)

    start = functools.partial(_start_download, path, win_id=win_id,
                              tab_id=tab_id)
    if not os.path.isfile(path):
        start()
        return

    # The target exists already, only overwrite it after confirmation.
    question = usertypes.Question()
    question.mode = usertypes.PromptMode.yesno
    question.text = "{} exists. Overwrite?".format(path)
    question.completed.connect(question.deleteLater)
    question.answered_yes.connect(start)
    bridge = objreg.get('message-bridge', scope='window', window=win_id)
    bridge.ask(question, blocking=False)

View File

@ -438,6 +438,15 @@ def same_domain(url1, url2):
return domain1 == domain2 return domain1 == domain2
def encoded_url(url):
    """Return the fully encoded url as string.

    Args:
        url: The url to encode as QUrl.
    """
    raw = url.toEncoded()
    # Decoded as ASCII, like the original one-liner did.
    return bytes(raw).decode('ascii')
class IncDecError(Exception): class IncDecError(Exception):
"""Exception raised by incdec_number on problems. """Exception raised by incdec_number on problems.

View File

@ -611,6 +611,27 @@ def force_encoding(text, encoding):
return text.encode(encoding, errors='replace').decode(encoding) return text.encode(encoding, errors='replace').decode(encoding)
def sanitize_filename(name, replacement='_'):
    """Substitute every character which is not allowed in a filename.

    Note: Only pass a basename to this function, as the path separators are
    replaced as well.

    Args:
        name: The filename to clean up.
        replacement: What to put in place of an invalid character. None
                     removes the invalid characters instead.

    Return:
        The filename with all invalid characters substituted.
    """
    substitute = '' if replacement is None else replacement
    # The set of reserved characters is the one from Windows, Linux has even
    # fewer of them. See also
    # https://en.wikipedia.org/wiki/Filename#Reserved_characters_and_words
    for forbidden in '\\/:*?"<>|':
        name = name.replace(forbidden, substitute)
    return name
def newest_slice(iterable, count): def newest_slice(iterable, count):
"""Get an iterable for the n newest items of the given iterable. """Get an iterable for the n newest items of the given iterable.

View File

@ -133,6 +133,7 @@ def _module_versions():
('jinja2', ['__version__']), ('jinja2', ['__version__']),
('pygments', ['__version__']), ('pygments', ['__version__']),
('yaml', ['__version__']), ('yaml', ['__version__']),
('cssutils', ['__version__']),
]) ])
for name, attributes in modules.items(): for name, attributes in modules.items():
try: try:

View File

@ -5,3 +5,4 @@ pyPEG2==2.15.2
PyYAML==3.11 PyYAML==3.11
colorama==0.3.3 colorama==0.3.3
colorlog==2.6.0 colorlog==2.6.0
cssutils==1.0.1

View File

@ -80,6 +80,7 @@ def whitelist_generator():
# https://bitbucket.org/jendrikseipp/vulture/issues/10/ # https://bitbucket.org/jendrikseipp/vulture/issues/10/
yield 'qutebrowser.misc.utilcmds.pyeval_output' yield 'qutebrowser.misc.utilcmds.pyeval_output'
yield 'utils.use_color' yield 'utils.use_color'
yield 'qutebrowser.browser.mhtml.last_used_directory'
# Other false-positives # Other false-positives
yield ('qutebrowser.completion.models.sortfilter.CompletionFilterModel().' yield ('qutebrowser.completion.models.sortfilter.CompletionFilterModel().'

View File

@ -0,0 +1,277 @@
# vim: ft=python fileencoding=utf-8 sts=4 sw=4 et:
import io
import textwrap
import re
import pytest
from qutebrowser.browser import mhtml
@pytest.fixture(autouse=True)
def patch_uuid(monkeypatch):
    """Swap uuid.uuid4 for a stub returning a constant for every test."""
    def fixed_uuid():
        return "UUID"

    monkeypatch.setattr("uuid.uuid4", fixed_uuid)
class Checker:

    """A helper to check mhtml output.

    Attrs:
        fp: A BytesIO object for passing to MHTMLWriter.write_to.
    """

    def __init__(self):
        self.fp = io.BytesIO()

    @property
    def value(self):
        """Everything which was written to fp so far, as bytes."""
        return self.fp.getvalue()

    def expect(self, expected):
        """Assert that the collected output equals the expected text.

        The output has to be pure ASCII and must use CRLF line endings
        exclusively. It is compared with LF line endings against the
        expectation.

        Args:
            expected: The expected output with LF line endings. Common leading
                      indentation and leading newlines are removed before the
                      comparison, so an indented triple-quoted string can be
                      passed.

        Raises:
            AssertionError: If a CR or LF which is not part of a CRLF pair is
                            found or if the output differs from expected.
        """
        actual = self.value.decode('ascii')
        # Make sure there are no stray \r or \n. Lookarounds are used instead
        # of [^\n] / [^\r] because those need a neighbouring character and
        # would miss a lone \r at the very end or a lone \n at the very start.
        assert re.search(r'\r(?!\n)', actual) is None
        assert re.search(r'(?<!\r)\n', actual) is None
        actual = actual.replace('\r\n', '\n')
        expected = textwrap.dedent(expected).lstrip('\n')
        assert expected == actual
@pytest.fixture
def checker():
    """Provide a new Checker instance to the requesting test."""
    helper = Checker()
    return helper
# Check that a root document with non-ASCII bytes is written with the
# quoted-printable transfer encoding, i.e. the umlauts show up as =XX escapes
# in the expected output.
def test_quoted_printable_umlauts(checker):
content = 'Die süße Hündin läuft in die Höhle des Bären'
# The writer gets bytes, so the text is encoded as latin-1 first
content = content.encode('iso-8859-1')
writer = mhtml.MHTMLWriter(root_content=content,
content_location='localhost',
content_type='text/plain')
writer.write_to(checker.fp)
# "UUID" in the boundary comes from the patched uuid.uuid4 (patch_uuid)
checker.expect("""
Content-Type: multipart/related; boundary="---=_qute-UUID"
MIME-Version: 1.0
-----=_qute-UUID
Content-Location: localhost
MIME-Version: 1.0
Content-Type: text/plain
Content-Transfer-Encoding: quoted-printable
Die=20s=FC=DFe=20H=FCndin=20l=E4uft=20in=20die=20H=F6hle=20des=20B=E4ren
-----=_qute-UUID--
""")
@pytest.mark.parametrize('header, value', [
    ('content_location', 'http://brötli.com'),
    ('content_type', 'text/pläin'),
])
def test_refuses_non_ascii_header_value(checker, header, value):
    """Writing has to fail if a header value contains non-ASCII characters."""
    kwargs = dict(
        root_content=b'',
        content_location='http://example.com',
        content_type='text/plain',
    )
    # Put the non-ASCII value in place of the valid one for the tested header
    kwargs[header] = value
    writer = mhtml.MHTMLWriter(**kwargs)
    with pytest.raises(UnicodeEncodeError) as excinfo:
        writer.write_to(checker.fp)
    message = str(excinfo.value)
    assert "'ascii' codec can't encode" in message
# Check that a file added with transfer_encoding=mhtml.E_BASE64 is written as
# a base64 part, while the root document stays quoted-printable.
def test_file_encoded_as_base64(checker):
content = b'Image file attached'
writer = mhtml.MHTMLWriter(root_content=content, content_type='text/plain',
content_location='http://example.com')
# The attached content starts with a non-BMP character encoded as UTF-8, so
# it contains non-ASCII bytes
writer.add_file(location='http://a.example.com/image.png',
content='\U0001F601 image data'.encode('utf-8'),
content_type='image/png',
transfer_encoding=mhtml.E_BASE64)
writer.write_to(checker.fp)
checker.expect("""
Content-Type: multipart/related; boundary="---=_qute-UUID"
MIME-Version: 1.0
-----=_qute-UUID
Content-Location: http://example.com
MIME-Version: 1.0
Content-Type: text/plain
Content-Transfer-Encoding: quoted-printable
Image=20file=20attached
-----=_qute-UUID
Content-Location: http://a.example.com/image.png
MIME-Version: 1.0
Content-Type: image/png
Content-Transfer-Encoding: base64
8J+YgSBpbWFnZSBkYXRh
-----=_qute-UUID--
""")
@pytest.mark.parametrize('transfer_encoding', [mhtml.E_BASE64, mhtml.E_QUOPRI],
                         ids=['base64', 'quoted-printable'])
def test_payload_lines_wrap(checker, transfer_encoding):
    """Every written line has to be shorter than 77 bytes."""
    writer = mhtml.MHTMLWriter(root_content=b'', content_type='text/plain',
                               content_location='http://example.com')
    # 100 bytes of payload, which is more than fits on a single line
    writer.add_file(location='http://example.com/payload',
                    content=b'1234567890' * 10,
                    content_type='text/plain',
                    transfer_encoding=transfer_encoding)
    writer.write_to(checker.fp)
    output_lines = checker.value.split(b'\r\n')
    for output_line in output_lines:
        assert len(output_line) < 77
# Check the order of the parts in the output: the root document is expected
# first, followed by the added files sorted by their location.
def test_files_appear_sorted(checker):
writer = mhtml.MHTMLWriter(root_content=b'root file',
content_type='text/plain',
content_location='http://www.example.com/')
# The files are deliberately added in a non-alphabetical order
for subdomain in 'ahgbizt':
writer.add_file(location='http://{}.example.com/'.format(subdomain),
content='file {}'.format(subdomain).encode('utf-8'),
content_type='text/plain',
transfer_encoding=mhtml.E_QUOPRI)
writer.write_to(checker.fp)
# The expected parts after the root are ordered a, b, g, h, i, t, z
checker.expect("""
Content-Type: multipart/related; boundary="---=_qute-UUID"
MIME-Version: 1.0
-----=_qute-UUID
Content-Location: http://www.example.com/
MIME-Version: 1.0
Content-Type: text/plain
Content-Transfer-Encoding: quoted-printable
root=20file
-----=_qute-UUID
Content-Location: http://a.example.com/
MIME-Version: 1.0
Content-Type: text/plain
Content-Transfer-Encoding: quoted-printable
file=20a
-----=_qute-UUID
Content-Location: http://b.example.com/
MIME-Version: 1.0
Content-Type: text/plain
Content-Transfer-Encoding: quoted-printable
file=20b
-----=_qute-UUID
Content-Location: http://g.example.com/
MIME-Version: 1.0
Content-Type: text/plain
Content-Transfer-Encoding: quoted-printable
file=20g
-----=_qute-UUID
Content-Location: http://h.example.com/
MIME-Version: 1.0
Content-Type: text/plain
Content-Transfer-Encoding: quoted-printable
file=20h
-----=_qute-UUID
Content-Location: http://i.example.com/
MIME-Version: 1.0
Content-Type: text/plain
Content-Transfer-Encoding: quoted-printable
file=20i
-----=_qute-UUID
Content-Location: http://t.example.com/
MIME-Version: 1.0
Content-Type: text/plain
Content-Transfer-Encoding: quoted-printable
file=20t
-----=_qute-UUID
Content-Location: http://z.example.com/
MIME-Version: 1.0
Content-Type: text/plain
Content-Transfer-Encoding: quoted-printable
file=20z
-----=_qute-UUID--
""")
# Check the output for a file which was added without a content type: the
# expected part for that file has no Content-Type header.
def test_empty_content_type(checker):
writer = mhtml.MHTMLWriter(root_content=b'',
content_location='http://example.com/',
content_type='text/plain')
# Only location and content are passed; per the expected output below, the
# part is written as quoted-printable
writer.add_file('http://example.com/file', b'file content')
writer.write_to(checker.fp)
checker.expect("""
Content-Type: multipart/related; boundary="---=_qute-UUID"
MIME-Version: 1.0
-----=_qute-UUID
Content-Location: http://example.com/
MIME-Version: 1.0
Content-Type: text/plain
Content-Transfer-Encoding: quoted-printable
-----=_qute-UUID
MIME-Version: 1.0
Content-Location: http://example.com/file
Content-Transfer-Encoding: quoted-printable
file=20content
-----=_qute-UUID--
""")
# Check which URLs mhtml._get_css_imports extracts from stylesheets
# (inline=False) and from inline style attributes (inline=True).
# Every case runs twice: once with cssutils (skipped if mhtml.cssutils is
# None) and once with mhtml.cssutils patched to None.
@pytest.mark.parametrize('has_cssutils', [
pytest.mark.skipif(mhtml.cssutils is None,
reason="requires cssutils")(True),
False,
], ids=['with_cssutils', 'no_cssutils'])
@pytest.mark.parametrize('inline, style, expected_urls', [
(False, "@import 'default.css'", ['default.css']),
(False, '@import "default.css"', ['default.css']),
(False, "@import \t 'tabbed.css'", ['tabbed.css']),
(False, "@import url('default.css')", ['default.css']),
(False, """body {
background: url("/bg-img.png")
}""", ['/bg-img.png']),
(True, 'background: url(folder/file.png) no-repeat', ['folder/file.png']),
(True, 'content: url()', []),
])
def test_css_url_scanner(monkeypatch, has_cssutils, inline, style,
expected_urls):
if not has_cssutils:
# Pretend cssutils is not installed
monkeypatch.setattr('qutebrowser.browser.mhtml.cssutils', None)
# Both lists are sorted so the comparison does not depend on the order.
# NOTE(review): this sorts the list from the parametrize table in place.
expected_urls.sort()
urls = mhtml._get_css_imports(style, inline=inline)
urls.sort()
assert urls == expected_urls
class TestNoCloseBytesIO:

    """Tests for close() and actual_close() of mhtml._NoCloseBytesIO."""

    # WORKAROUND for https://bitbucket.org/logilab/pylint/issues/540/
    # pylint: disable=no-member

    def test_fake_close(self):
        """After close() the buffer can still be read and written."""
        buf = mhtml._NoCloseBytesIO()
        buf.write(b'Value')
        buf.close()
        assert buf.getvalue() == b'Value'
        buf.write(b'Eulav')
        assert buf.getvalue() == b'ValueEulav'

    def test_actual_close(self):
        """After actual_close() reading and writing raise ValueError."""
        closed_message = 'I/O operation on closed file.'
        buf = mhtml._NoCloseBytesIO()
        buf.write(b'Value')
        buf.actual_close()
        with pytest.raises(ValueError) as excinfo:
            buf.getvalue()
        assert str(excinfo.value) == closed_message
        with pytest.raises(ValueError) as excinfo:
            buf.write(b'Closed')
        assert str(excinfo.value) == closed_message

View File

@ -527,6 +527,19 @@ def test_same_domain_invalid_url(url1, url2):
with pytest.raises(urlutils.InvalidUrlError): with pytest.raises(urlutils.InvalidUrlError):
urlutils.same_domain(QUrl(url1), QUrl(url2)) urlutils.same_domain(QUrl(url1), QUrl(url2))
@pytest.mark.parametrize('url, expected', [
    ('http://example.com', 'http://example.com'),
    ('http://ünicode.com', 'http://xn--nicode-2ya.com'),
    ('http://foo.bar/?header=text/pläin',
     'http://foo.bar/?header=text/pl%C3%A4in'),
])
def test_encoded_url(url, expected):
    """Check encoded_url with an ASCII URL and with non-ASCII host/query."""
    encoded = urlutils.encoded_url(QUrl(url))
    assert encoded == expected
class TestIncDecNumber: class TestIncDecNumber:
"""Tests for urlutils.incdec_number().""" """Tests for urlutils.incdec_number()."""

View File

@ -839,6 +839,20 @@ def test_force_encoding(inp, enc, expected):
assert utils.force_encoding(inp, enc) == expected assert utils.force_encoding(inp, enc) == expected
@pytest.mark.parametrize('inp, expected', [
    ('normal.txt', 'normal.txt'),
    ('user/repo issues.mht', 'user_repo issues.mht'),
    ('<Test\\File> - "*?:|', '_Test_File_ - _____'),
])
def test_sanitize_filename(inp, expected):
    """Check sanitize_filename with the default replacement character."""
    sanitized = utils.sanitize_filename(inp)
    assert sanitized == expected
def test_sanitize_filename_empty_replacement():
    """With replacement=None the invalid characters are removed."""
    sanitized = utils.sanitize_filename('/<Bad File>/', replacement=None)
    assert sanitized == 'Bad File'
class TestNewestSlice: class TestNewestSlice:
"""Test newest_slice.""" """Test newest_slice."""

View File

@ -324,6 +324,7 @@ class ImportFake:
'jinja2': True, 'jinja2': True,
'pygments': True, 'pygments': True,
'yaml': True, 'yaml': True,
'cssutils': True,
} }
self.version_attribute = '__version__' self.version_attribute = '__version__'
self.version = '1.2.3' self.version = '1.2.3'
@ -383,12 +384,13 @@ class TestModuleVersions:
"""Test with all modules present in version 1.2.3.""" """Test with all modules present in version 1.2.3."""
expected = ['sip: yes', 'colorlog: yes', 'colorama: 1.2.3', expected = ['sip: yes', 'colorlog: yes', 'colorama: 1.2.3',
'pypeg2: 1.2.3', 'jinja2: 1.2.3', 'pygments: 1.2.3', 'pypeg2: 1.2.3', 'jinja2: 1.2.3', 'pygments: 1.2.3',
'yaml: 1.2.3'] 'yaml: 1.2.3', 'cssutils: 1.2.3']
assert version._module_versions() == expected assert version._module_versions() == expected
@pytest.mark.parametrize('module, idx, expected', [ @pytest.mark.parametrize('module, idx, expected', [
('colorlog', 1, 'colorlog: no'), ('colorlog', 1, 'colorlog: no'),
('colorama', 2, 'colorama: no'), ('colorama', 2, 'colorama: no'),
('cssutils', 7, 'cssutils: no'),
]) ])
def test_missing_module(self, module, idx, expected, import_fake): def test_missing_module(self, module, idx, expected, import_fake):
"""Test with a module missing. """Test with a module missing.
@ -404,12 +406,13 @@ class TestModuleVersions:
@pytest.mark.parametrize('value, expected', [ @pytest.mark.parametrize('value, expected', [
('VERSION', ['sip: yes', 'colorlog: yes', 'colorama: 1.2.3', ('VERSION', ['sip: yes', 'colorlog: yes', 'colorama: 1.2.3',
'pypeg2: yes', 'jinja2: yes', 'pygments: yes', 'pypeg2: yes', 'jinja2: yes', 'pygments: yes',
'yaml: yes']), 'yaml: yes', 'cssutils: yes']),
('SIP_VERSION_STR', ['sip: 1.2.3', 'colorlog: yes', 'colorama: yes', ('SIP_VERSION_STR', ['sip: 1.2.3', 'colorlog: yes', 'colorama: yes',
'pypeg2: yes', 'jinja2: yes', 'pygments: yes', 'pypeg2: yes', 'jinja2: yes', 'pygments: yes',
'yaml: yes']), 'yaml: yes', 'cssutils: yes']),
(None, ['sip: yes', 'colorlog: yes', 'colorama: yes', 'pypeg2: yes', (None, ['sip: yes', 'colorlog: yes', 'colorama: yes', 'pypeg2: yes',
'jinja2: yes', 'pygments: yes', 'yaml: yes']), 'jinja2: yes', 'pygments: yes', 'yaml: yes',
'cssutils: yes']),
]) ])
def test_version_attribute(self, value, expected, import_fake): def test_version_attribute(self, value, expected, import_fake):
"""Test with a different version attribute. """Test with a different version attribute.
@ -432,6 +435,7 @@ class TestModuleVersions:
('jinja2', True), ('jinja2', True),
('pygments', True), ('pygments', True),
('yaml', True), ('yaml', True),
('cssutils', True),
]) ])
def test_existing_attributes(self, name, has_version): def test_existing_attributes(self, name, has_version):
"""Check if all dependencies have an expected __version__ attribute. """Check if all dependencies have an expected __version__ attribute.