First working version

The files can be opened with qutebrowser

Problems still with Umlauts in the encoded file.
This commit is contained in:
Daniel 2015-09-20 14:30:12 +02:00
parent fbe5386e56
commit 930871be01

View File

@ -20,11 +20,13 @@
"""Utils for writing a MHTML file."""
import functools
import quopri
import io
from collections import namedtuple
from base64 import b64encode
from urllib.parse import urljoin
from uuid import uuid4
from PyQt5.QtCore import QUrl
from PyQt5.QtNetwork import QNetworkRequest, QNetworkReply
@ -36,18 +38,44 @@ _File = namedtuple("_File",
"content content_type content_location transfer_encoding")
def _chunked_base64(data, maxlen=76, linesep=b"\r\n"):
"""Just like b64encode, except that it breaks long lines.
Args:
maxlen: Maximum length of a line, not including the line separator.
linesep: Line separator to use as bytes.
"""
encoded = b64encode(data)
result = []
for i in range(0, len(encoded), maxlen):
result.append(encoded[i:i+maxlen])
return linesep.join(result)
def _rn_quopri(data):
"""Return a quoted-printable representation of data."""
orig_funcs = (quopri.b2a_qp, quopri.a2b_qp)
# Workaround for quopri mixing \n and \r\n
quopri.b2a_qp = quopri.a2b_qp = None
encoded = quopri.encodestring(data)
quopri.b2a_qp, quopri.a2b_qp = orig_funcs
return encoded.replace(b"\n", b"\r\n")
E_NONE = (None, lambda x: x)
"""No transfer encoding, copy the bytes from input to output"""
E_BASE64 = ("BASE64", b64encode)
E_BASE64 = ("base64", _chunked_base64)
"""Encode the file using base64 encoding"""
E_QUOPRI = ("quoted-printable", _rn_quopri)
"""Encode the file using MIME quoted-printable encoding."""
class MHTMLWriter(object):
"""A class for aggregating multiple files and outputting them to a MHTML
file."""
BOUNDARY = b"qute-mhtml"
BOUNDARY = b"---qute-mhtml-" + str(uuid4()).encode("ascii")
def __init__(self, root_content=None, content_location=None,
content_type=None):
@ -90,6 +118,9 @@ class MHTMLWriter(object):
self._output_root_file(fp)
for file_data in self._files.values():
self._output_file(fp, file_data)
fp.write(b"\r\n--")
fp.write(self.BOUNDARY)
fp.write(b"--")
def _output_header(self, fp):
if self.content_location is None:
@ -99,34 +130,34 @@ class MHTMLWriter(object):
fp.write(b"Content-Location: ")
fp.write(self.content_location.encode("utf-8"))
fp.write(b'\nContent-Type: multipart/related;boundary="')
fp.write(b'\r\nContent-Type: multipart/related;boundary="')
fp.write(self.BOUNDARY)
fp.write(b'";type="')
fp.write(self.content_type.encode("utf-8"))
fp.write(b'"\n\n')
fp.write(b'"\r\n\r\n')
def _output_root_file(self, fp):
root_file = _File(
content=self.root_content, content_type=self.content_type,
content_location=self.content_location, transfer_encoding=E_BASE64
content_location=self.content_location, transfer_encoding=E_QUOPRI,
)
self._output_file(fp, root_file)
def _output_file(self, fp, file_struct):
fp.write(b"--")
fp.write(self.BOUNDARY)
fp.write(b"\nContent-Location: ")
fp.write(b"\r\nContent-Location: ")
fp.write(file_struct.content_location.encode("utf-8"))
if file_struct.content_type is not None:
fp.write(b"\nContent-Type: ")
fp.write(b"\r\nContent-Type: ")
fp.write(file_struct.content_type.encode("utf-8"))
encoding_name, encoding_func = file_struct.transfer_encoding
if encoding_name:
fp.write(b"\nContent-Transfer-Encoding: ")
fp.write(b"\r\nContent-Transfer-Encoding: ")
fp.write(encoding_name.encode("utf-8"))
fp.write(b"\n\n")
fp.write(b"\r\n\r\n")
fp.write(encoding_func(file_struct.content))
fp.write(b"\n\n")
fp.write(b"\r\n\r\n")
def start_download(dest):
@ -163,13 +194,15 @@ def start_download(dest):
pending_downloads.remove(item)
mime = item.raw_headers.get(b"Content-Type", b"")
mime = mime.decode("ascii", "ignore")
writer.add_file(name, item.fileobj.getvalue(), mime)
encode = E_QUOPRI if mime.startswith("text/") else E_BASE64
writer.add_file(name, item.fileobj.getvalue(), mime, encode)
if pending_downloads:
return
finish_file()
def error(item, *args):
def error(name, item, *args):
pending_downloads.remove(item)
writer.add_file(name, b"")
if pending_downloads:
return
finish_file()
@ -190,7 +223,6 @@ def start_download(dest):
# Might be a local <script> tag or something else
continue
absolute_url_str = urljoin(web_url_str, element_url)
name = absolute_url_str if element_url.startswith("//") else element_url
# Prevent loading an asset twice
if absolute_url_str in loaded_urls:
continue
@ -204,8 +236,8 @@ def start_download(dest):
auto_remove=True)
pending_downloads.add(item)
item.finished.connect(
functools.partial(finished, name, item))
functools.partial(finished, absolute_url_str, item))
item.error.connect(
functools.partial(error, item))
functools.partial(error, absolute_url_str, item))
item.cancelled.connect(
functools.partial(error, item))
functools.partial(error, absolute_url_str, item))