qutebrowser/tests/integration/quteprocess.py

456 lines
17 KiB
Python
Raw Normal View History

# vim: ft=python fileencoding=utf-8 sts=4 sw=4 et:
2016-01-04 07:12:39 +01:00
# Copyright 2015-2016 Florian Bruhin (The Compiler) <mail@qutebrowser.org>
#
# This file is part of qutebrowser.
#
# qutebrowser is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# qutebrowser is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with qutebrowser. If not, see <http://www.gnu.org/licenses/>.
"""Fixtures to run qutebrowser in a QProcess and communicate."""
import os
import re
import sys
2015-11-18 20:01:40 +01:00
import time
import os.path
2015-10-30 21:07:03 +01:00
import datetime
import logging
2015-11-03 14:55:46 +01:00
import tempfile
2015-11-23 13:31:17 +01:00
import contextlib
import itertools
2015-11-03 14:55:46 +01:00
import yaml
import pytest
from PyQt5.QtCore import pyqtSignal, QUrl
import testprocess
from qutebrowser.misc import ipc
from qutebrowser.utils import log, utils
from helpers import utils as testutils
instance_counter = itertools.count()
def is_ignored_qt_message(message):
"""Check if the message is listed in qt_log_ignore."""
2015-12-01 20:55:38 +01:00
# pylint: disable=no-member
# WORKAROUND for https://bitbucket.org/logilab/pylint/issues/717/
# we should switch to generated-members after that
regexes = pytest.config.getini('qt_log_ignore')
for regex in regexes:
if re.match(regex, message):
return True
return False
class LogLine(testprocess.Line):
2015-10-27 08:07:16 +01:00
"""A parsed line from the qutebrowser log output.
Attributes:
timestamp/loglevel/category/module/function/line/message:
Parsed from the log output.
expected: Whether the message was expected or not.
"""
LOG_RE = re.compile(r"""
(?P<timestamp>\d\d:\d\d:\d\d)
\ (?P<loglevel>VDEBUG|DEBUG|INFO|WARNING|ERROR)
\ +(?P<category>\w+)
2015-11-09 19:55:05 +01:00
\ +(?P<module>(\w+|Unknown\ module)):
(?P<function>[^"][^:]*|"[^"]+"):
2015-11-09 19:55:05 +01:00
(?P<line>\d+)
\ (?P<message>.+)
""", re.VERBOSE)
def __init__(self, data):
super().__init__(data)
match = self.LOG_RE.match(data)
2015-10-22 06:44:05 +02:00
if match is None:
raise testprocess.InvalidLine(data)
2015-10-30 21:07:03 +01:00
self.timestamp = datetime.datetime.strptime(match.group('timestamp'),
'%H:%M:%S')
loglevel = match.group('loglevel')
if loglevel == 'VDEBUG':
self.loglevel = log.VDEBUG_LEVEL
else:
self.loglevel = getattr(logging, loglevel)
self.category = match.group('category')
module = match.group('module')
if module == 'Unknown module':
self.module = None
else:
self.module = module
function = match.group('function')
if function == 'none':
self.function = None
else:
2015-11-09 19:55:05 +01:00
self.function = function.strip('"')
line = int(match.group('line'))
if self.function is None and line == 0:
self.line = None
else:
self.line = line
msg_match = re.match(r'^(\[(?P<prefix>\d+s ago)\] )?(?P<message>.*)',
match.group('message'))
self.prefix = msg_match.group('prefix')
self.message = msg_match.group('message')
2015-10-30 21:07:03 +01:00
self.expected = is_ignored_qt_message(self.message)
2015-10-22 06:44:05 +02:00
class QuteProc(testprocess.Process):
"""A running qutebrowser process used for tests.
Attributes:
2015-11-18 20:01:40 +01:00
_delay: Delay to wait between commands.
2015-10-22 06:44:05 +02:00
_ipc_socket: The IPC socket of the started instance.
_httpbin: The HTTPBin webserver.
basedir: The base directory for this instance.
_focus_ready: Whether the main window got focused.
_load_ready: Whether the about:blank page got loaded.
_profile: If True, do profiling of the subprocesses.
_instance_id: An unique ID for this QuteProc instance
_run_counter: A counter to get an unique ID for each run.
2015-11-26 14:25:33 +01:00
Signals:
got_error: Emitted when there was an error log line.
2015-10-22 06:44:05 +02:00
"""
2015-10-21 22:05:41 +02:00
got_error = pyqtSignal()
KEYS = ['timestamp', 'loglevel', 'category', 'module', 'function', 'line',
'message']
def __init__(self, httpbin, delay, *, profile=False, parent=None):
super().__init__(parent)
self._profile = profile
2015-11-18 20:01:40 +01:00
self._delay = delay
2015-10-11 13:53:59 +02:00
self._httpbin = httpbin
self._ipc_socket = None
self.basedir = None
self._focus_ready = False
self._load_ready = False
self._instance_id = next(instance_counter)
self._run_counter = itertools.count()
def _is_ready(self, what):
"""Called by _parse_line if loading/focusing is done.
When both are done, emits the 'ready' signal.
"""
if what == 'load':
self._load_ready = True
elif what == 'focus':
self._focus_ready = True
else:
raise ValueError("Invalid value {!r} for 'what'.".format(what))
if self._load_ready and self._focus_ready:
self.ready.emit()
def _parse_line(self, line):
2015-10-22 06:44:05 +02:00
try:
log_line = LogLine(line)
except testprocess.InvalidLine:
if line.startswith(' '):
# Multiple lines in some log output...
return None
elif not line.strip():
return None
elif is_ignored_qt_message(line):
2015-11-02 06:10:31 +01:00
return None
else:
raise
self._log(line)
start_okay_message_load = (
"load status for <qutebrowser.browser.webview.WebView tab_id=0 "
"url='about:blank'>: LoadStatus.success")
start_okay_message_focus = (
"Focus object changed: <qutebrowser.browser.webview.WebView "
"tab_id=0 url='about:blank'>")
if (log_line.category == 'ipc' and
log_line.message.startswith("Listening as ")):
self._ipc_socket = log_line.message.split(' ', maxsplit=2)[2]
elif (log_line.category == 'webview' and
log_line.message == start_okay_message_load):
self._is_ready('load')
elif (log_line.category == 'misc' and
log_line.message == start_okay_message_focus):
self._is_ready('focus')
elif (log_line.category == 'init' and
log_line.module == 'standarddir' and
log_line.function == 'init' and
log_line.message.startswith('Base directory:')):
self.basedir = log_line.message.split(':', maxsplit=1)[1].strip()
elif self._is_error_logline(log_line):
2015-10-27 06:38:45 +01:00
self.got_error.emit()
return log_line
def _executable_args(self):
if hasattr(sys, 'frozen'):
if self._profile:
raise Exception("Can't profile with sys.frozen!")
executable = os.path.join(os.path.dirname(sys.executable),
'qutebrowser')
args = []
else:
executable = sys.executable
if self._profile:
profile_dir = os.path.join(os.getcwd(), 'prof')
profile_id = '{}_{}'.format(self._instance_id,
next(self._run_counter))
profile_file = os.path.join(profile_dir,
'{}.pstats'.format(profile_id))
try:
os.mkdir(profile_dir)
except FileExistsError:
pass
args = [os.path.join('scripts', 'dev', 'run_profile.py'),
'--profile-tool', 'none',
'--profile-file', profile_file]
else:
args = ['-m', 'qutebrowser']
return executable, args
def _default_args(self):
return ['--debug', '--no-err-windows', '--temp-basedir', 'about:blank']
2016-01-12 22:48:38 +01:00
def path_to_url(self, path, *, port=None, https=False):
2015-11-26 14:25:33 +01:00
"""Get a URL based on a filename for the localhost webserver.
URLs like about:... and qute:... are handled specially and returned
verbatim.
"""
if path.startswith('about:') or path.startswith('qute:'):
return path
else:
2016-01-12 22:48:38 +01:00
return '{}://localhost:{}/{}'.format(
'https' if https else 'http',
self._httpbin.port if port is None else port,
path if path != '/' else '')
def wait_for_js(self, message):
"""Wait for the given javascript console message."""
self.wait_for(category='js', function='javaScriptConsoleMessage',
message='[*] {}'.format(message))
def _is_error_logline(self, msg):
"""Check if the given LogLine is some kind of error message."""
is_js_error = (msg.category == 'js' and
msg.function == 'javaScriptConsoleMessage' and
testutils.pattern_match(pattern='[*] [FAIL] *',
value=msg.message))
# Try to complain about the most common mistake when accidentally
# loading external resources.
is_ddg_load = testutils.pattern_match(
2016-02-27 02:41:01 +01:00
pattern="load status for <qutebrowser.browser.webview.WebView "
"tab_id=* url='*duckduckgo*'>: *",
value=msg.message)
return msg.loglevel > logging.INFO or is_js_error or is_ddg_load
def _maybe_skip(self):
"""Skip the test if [SKIP] lines were logged."""
skip_texts = []
for msg in self._data:
if (msg.category == 'js' and
msg.function == 'javaScriptConsoleMessage' and
testutils.pattern_match(pattern='[*] [SKIP] *',
value=msg.message)):
skip_texts.append(msg.message.partition(' [SKIP] ')[2])
if skip_texts:
pytest.skip(', '.join(skip_texts))
2016-02-04 07:10:06 +01:00
def after_test(self, did_fail): # pylint: disable=arguments-differ
"""Handle unexpected/skip logging and clean up after each test.
Args:
did_fail: Set if the main test failed already, then logged errors
are ignored.
"""
__tracebackhide__ = True
bad_msgs = [msg for msg in self._data
if self._is_error_logline(msg) and not msg.expected]
if did_fail:
super().after_test()
return
try:
if bad_msgs:
text = 'Logged unexpected errors:\n\n' + '\n'.join(
str(e) for e in bad_msgs)
# We'd like to use pytrace=False here but don't as a WORKAROUND
# for https://github.com/pytest-dev/pytest/issues/1316
pytest.fail(text)
else:
self._maybe_skip()
finally:
super().after_test()
2015-11-15 20:48:07 +01:00
def send_cmd(self, command, count=None):
2015-11-26 14:25:33 +01:00
"""Send a command to the running qutebrowser instance."""
assert self._ipc_socket is not None
2015-11-18 20:01:40 +01:00
time.sleep(self._delay / 1000)
2015-11-15 20:48:07 +01:00
if count is not None:
command = ':{}:{}'.format(count, command.lstrip(':'))
ipc.send_to_running_instance(self._ipc_socket, [command],
target_arg='')
self.wait_for(category='commands', module='command', function='run',
message='command called: *')
2015-11-23 13:30:49 +01:00
def get_setting(self, sect, opt):
"""Get the value of a qutebrowser setting."""
self.send_cmd(':set {} {}?'.format(sect, opt))
msg = self.wait_for(loglevel=logging.INFO, category='message',
message='{} {} = *'.format(sect, opt))
return msg.message.split(' = ')[1]
2015-10-11 13:53:59 +02:00
def set_setting(self, sect, opt, value):
self.send_cmd(':set "{}" "{}" "{}"'.format(sect, opt, value))
self.wait_for(category='config', message='Config option changed: *')
2015-10-11 13:53:59 +02:00
2015-11-23 13:31:17 +01:00
@contextlib.contextmanager
def temp_setting(self, sect, opt, value):
"""Context manager to set a setting and reset it on exit."""
old_value = self.get_setting(sect, opt)
self.set_setting(sect, opt, value)
yield
self.set_setting(sect, opt, old_value)
2016-01-12 22:48:38 +01:00
def open_path(self, path, *, new_tab=False, new_window=False, port=None,
https=False):
2015-11-26 14:25:33 +01:00
"""Open the given path on the local webserver in qutebrowser."""
2016-01-12 22:48:38 +01:00
url = self.path_to_url(path, port=port, https=https)
2016-03-28 23:06:05 +02:00
self.open_url(url, new_tab=new_tab, new_window=new_window)
2015-10-11 13:53:59 +02:00
def open_url(self, url, *, new_tab=False, new_window=False):
"""Open the given url in qutebrowser."""
if new_tab and new_window:
raise ValueError("new_tab and new_window given!")
if new_tab:
self.send_cmd(':open -t ' + url)
elif new_window:
self.send_cmd(':open -w ' + url)
else:
self.send_cmd(':open ' + url)
2015-11-01 22:10:48 +01:00
def mark_expected(self, category=None, loglevel=None, message=None):
2015-10-22 06:44:05 +02:00
"""Mark a given logging message as expected."""
line = self.wait_for(category=category, loglevel=loglevel,
message=message)
line.expected = True
2016-01-12 22:48:38 +01:00
def wait_for_load_finished(self, path, *, port=None, https=False,
2016-01-12 23:21:52 +01:00
timeout=None, load_status='success'):
"""Wait until any tab has finished loading."""
if timeout is None:
if 'CI' in os.environ:
timeout = 15000
else:
timeout = 5000
2016-01-12 22:48:38 +01:00
url = self.path_to_url(path, port=port, https=https)
# We really need the same representation that the webview uses in its
# __repr__
url = utils.elide(QUrl(url).toDisplayString(QUrl.EncodeUnicode), 100)
pattern = re.compile(
r"(load status for <qutebrowser\.browser\.webview\.WebView "
r"tab_id=\d+ url='{url}/?'>: LoadStatus\.{load_status}|fetch: "
2016-01-06 23:19:44 +01:00
r"PyQt5\.QtCore\.QUrl\('{url}'\) -> .*)".format(
2016-01-12 23:21:52 +01:00
load_status=re.escape(load_status), url=re.escape(url)))
self.wait_for(message=pattern, timeout=timeout)
2015-11-03 14:55:46 +01:00
def get_session(self):
"""Save the session and get the parsed session data."""
with tempfile.TemporaryDirectory() as tmpdir:
session = os.path.join(tmpdir, 'session.yml')
self.send_cmd(':session-save "{}"'.format(session))
self.wait_for(category='message', loglevel=logging.INFO,
message='Saved session {}.'.format(session))
with open(session, encoding='utf-8') as f:
data = f.read()
self._log(data)
return yaml.load(data)
2015-11-03 14:55:46 +01:00
def get_content(self, plain=True):
2015-11-23 14:37:29 +01:00
"""Get the contents of the current page."""
with tempfile.TemporaryDirectory() as tmpdir:
path = os.path.join(tmpdir, 'page')
if plain:
self.send_cmd(':debug-dump-page --plain "{}"'.format(path))
else:
self.send_cmd(':debug-dump-page "{}"'.format(path))
self.wait_for(category='message', loglevel=logging.INFO,
message='Dumped page to {}.'.format(path))
2015-11-23 14:37:29 +01:00
with open(path, 'r', encoding='utf-8') as f:
return f.read()
2015-11-25 17:19:16 +01:00
def press_keys(self, keys):
"""Press the given keys using :fake-key."""
self.send_cmd(':fake-key -g "{}"'.format(keys))
@pytest.yield_fixture(scope='module')
def quteproc_process(qapp, httpbin, request):
"""Fixture for qutebrowser process which is started once per file."""
2015-11-18 20:01:40 +01:00
delay = request.config.getoption('--qute-delay')
profile = request.config.getoption('--qute-profile-subprocs')
proc = QuteProc(httpbin, delay, profile=profile)
proc.start()
yield proc
proc.terminate()
@pytest.yield_fixture
def quteproc(quteproc_process, httpbin, request):
"""Per-test qutebrowser fixture which uses the per-file process."""
request.node._quteproc_log = quteproc_process.captured_log
quteproc_process.before_test()
yield quteproc_process
quteproc_process.after_test(did_fail=request.node.rep_call.failed)
@pytest.yield_fixture
def quteproc_new(qapp, httpbin, request):
"""Per-test qutebrowser process to test invocations."""
delay = request.config.getoption('--qute-delay')
profile = request.config.getoption('--qute-profile-subprocs')
proc = QuteProc(httpbin, delay, profile=profile)
request.node._quteproc_log = proc.captured_log
# Not calling before_test here as that would start the process
yield proc
proc.after_test(did_fail=request.node.rep_call.failed)