diff --git a/qutebrowser/commands/userscripts.py b/qutebrowser/commands/userscripts.py index 759d96ba5..315433c0d 100644 --- a/qutebrowser/commands/userscripts.py +++ b/qutebrowser/commands/userscripts.py @@ -300,9 +300,9 @@ class _DummyUserscriptRunner(QObject): # right thing depending on the platform. if os.name == 'posix': UserscriptRunner = _POSIXUserscriptRunner -elif os.name == 'nt': +elif os.name == 'nt': # pragma: no cover UserscriptRunner = _WindowsUserscriptRunner -else: +else: # pragma: no cover UserscriptRunner = _DummyUserscriptRunner diff --git a/tests/helpers/stubs.py b/tests/helpers/stubs.py index bcc7b866e..90604602f 100644 --- a/tests/helpers/stubs.py +++ b/tests/helpers/stubs.py @@ -76,12 +76,15 @@ class FakeWebFrame: focus_elem: The 'focused' element. """ - def __init__(self, geometry, scroll=None, parent=None): + def __init__(self, geometry=None, *, scroll=None, plaintext=None, + html=None, parent=None): """Constructor. Args: geometry: The geometry of the frame as QRect. scroll: The scroll position as QPoint. + plaintext: Return value of toPlainText + html: Return value of tohtml. parent: The parent frame. """ if scroll is None: @@ -90,6 +93,8 @@ class FakeWebFrame: self.scrollPosition = mock.Mock(return_value=scroll) self.parentFrame = mock.Mock(return_value=parent) self.focus_elem = None + self.toPlainText = mock.Mock(return_value=plaintext) + self.toHtml = mock.Mock(return_value=html) def findFirstElement(self, selector): if selector == '*:focus': diff --git a/tests/unit/commands/test_userscripts.py b/tests/unit/commands/test_userscripts.py new file mode 100644 index 000000000..dab2aada7 --- /dev/null +++ b/tests/unit/commands/test_userscripts.py @@ -0,0 +1,254 @@ +# vim: ft=python fileencoding=utf-8 sts=4 sw=4 et: + +# Copyright 2015 Florian Bruhin (The Compiler) +# +# This file is part of qutebrowser. +# +# qutebrowser is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# qutebrowser is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with qutebrowser. If not, see . + +import os +import json +import logging +import signal + +import pytest +from PyQt5.QtCore import QFileSystemWatcher + +from qutebrowser.commands import userscripts, cmdexc + + +@pytest.fixture(autouse=True) +def guiprocess_message_mock(message_mock): + message_mock.patch('qutebrowser.misc.guiprocess.message') + return message_mock + + +@pytest.mark.posix +class TestQtFIFOReader: + + @pytest.yield_fixture + def reader(self, tmpdir, qapp): + fifo_path = str(tmpdir / 'fifo') + os.mkfifo(fifo_path) + reader = userscripts._QtFIFOReader(fifo_path) + yield reader + if reader._notifier.isEnabled(): + reader.cleanup() + + def test_single_line(self, reader, qtbot): + """Test QSocketNotifier with a single line of data.""" + with qtbot.waitSignal(reader.got_line) as blocker: + with open(reader._filepath, 'w', encoding='utf-8') as f: + f.write('foobar\n') + + assert blocker.args == ['foobar'] + + def test_cleanup(self, reader): + assert not reader._fifo.closed + reader.cleanup() + assert reader._fifo.closed + + +@pytest.fixture(params=[ + userscripts._POSIXUserscriptRunner, + userscripts._WindowsUserscriptRunner, +]) +def runner(request): + if (os.name != 'posix' and + request.param is userscripts._POSIXUserscriptRunner): + pytest.skip("Requires a POSIX os") + else: + return request.param(0) + + +def test_command(qtbot, py_proc, runner): + cmd, args = py_proc(r""" + import os + with open(os.environ['QUTE_FIFO'], 'w') as f: + f.write('foo\n') + """) + with qtbot.waitSignal(runner.got_cmd, raising=True, + timeout=10000) as blocker: + runner.run(cmd, *args) + assert blocker.args == ['foo'] + + +def test_custom_env(qtbot, monkeypatch, py_proc, runner): + monkeypatch.setenv('QUTEBROWSER_TEST_1', '1') + env = {'QUTEBROWSER_TEST_2': '2'} + + cmd, args = py_proc(r""" + import os + import json + + env = dict(os.environ) + + with open(os.environ['QUTE_FIFO'], 'w') as f: + json.dump(env, f) + f.write('\n') + """) + + with qtbot.waitSignal(runner.got_cmd, raising=True, + timeout=10000) as blocker: + runner.run(cmd, *args, env=env) + + data = blocker.args[0] + ret_env = json.loads(data) + assert 'QUTEBROWSER_TEST_1' in ret_env + assert 'QUTEBROWSER_TEST_2' in ret_env + + +def test_temporary_files(qtbot, tmpdir, py_proc, runner): + """Make sure temporary files are passed and cleaned up correctly.""" + text_file = tmpdir / 'text' + text_file.write('This is text') + html_file = tmpdir / 'html' + html_file.write('This is HTML') + + env = {'QUTE_TEXT': str(text_file), 'QUTE_HTML': str(html_file)} + + cmd, args = py_proc(r""" + import os + import json + + data = {'html': None, 'text': None} + + with open(os.environ['QUTE_HTML'], 'r') as f: + data['html'] = f.read() + + with open(os.environ['QUTE_TEXT'], 'r') as f: + data['text'] = f.read() + + with open(os.environ['QUTE_FIFO'], 'w') as f: + json.dump(data, f) + f.write('\n') + """) + + with qtbot.waitSignal(runner.finished, raising=True, timeout=10000): + with qtbot.waitSignal(runner.got_cmd, raising=True, + timeout=10000) as blocker: + runner.run(cmd, *args, env=env) + + data = blocker.args[0] + parsed = json.loads(data) + assert parsed['text'] == 'This is text' + assert parsed['html'] == 'This is HTML' + + assert not text_file.exists() + assert not html_file.exists() + + +def test_command_with_error(qtbot, tmpdir, py_proc, runner): + text_file = tmpdir / 'text' + text_file.write('This is text') + + env = {'QUTE_TEXT': str(text_file)} + cmd, args = py_proc(r""" + import sys + sys.exit(1) + """) + + with qtbot.waitSignal(runner.finished, raising=True, timeout=10000): + runner.run(cmd, *args, env=env) + + assert not text_file.exists() + + +def test_killed_command(qtbot, tmpdir, py_proc, runner): + text_file = tmpdir / 'text' + text_file.write('This is text') + + pidfile = tmpdir / 'pid' + watcher = QFileSystemWatcher() + watcher.addPath(str(tmpdir)) + + env = {'QUTE_TEXT': str(text_file)} + cmd, args = py_proc(r""" + import os + import time + import sys + + # We can't use QUTE_FIFO to transmit the PID because that wouldn't work + # on Windows, where QUTE_FIFO is only monitored after the script has + # exited. + + with open(sys.argv[1], 'w') as f: + f.write(str(os.getpid())) + + time.sleep(30) + """) + args.append(str(pidfile)) + + with qtbot.waitSignal(watcher.directoryChanged, raising=True, + timeout=10000): + runner.run(cmd, *args, env=env) + + with qtbot.waitSignal(runner.finished, raising=True): + os.kill(int(pidfile.read()), signal.SIGTERM) + + assert not text_file.exists() + + +def test_temporary_files_failed_cleanup(caplog, qtbot, tmpdir, py_proc, + runner): + """Delete a temporary file from the script so cleanup fails.""" + test_file = tmpdir / 'test' + test_file.write('foo') + + cmd, args = py_proc(r""" + import os + os.remove(os.environ['QUTE_HTML']) + """) + + with caplog.at_level(logging.ERROR): + with qtbot.waitSignal(runner.finished, raising=True, timeout=10000): + runner.run(cmd, *args, env={'QUTE_HTML': str(test_file)}) + + assert len(caplog.records) == 1 + expected = ("Failed to delete tempfile {file} ([Errno 2] No such file or " + "directory: '{file}')!".format(file=test_file)) + assert caplog.records[0].message == expected + + +def test_dummy_runner(qtbot): + runner = userscripts._DummyUserscriptRunner(0) + with pytest.raises(cmdexc.CommandError): + with qtbot.waitSignal(runner.finished): + runner.run('cmd', 'arg') + + +def test_store_source_none(): + assert userscripts.store_source(None) == {} + + +def test_store_source(stubs): + expected_text = 'This is text' + expected_html = 'This is HTML' + + frame = stubs.FakeWebFrame(plaintext=expected_text, html=expected_html) + env = userscripts.store_source(frame) + + with open(env['QUTE_TEXT'], 'r', encoding='utf-8') as f: + text = f.read() + with open(env['QUTE_HTML'], 'r', encoding='utf-8') as f: + html = f.read() + + os.remove(env['QUTE_TEXT']) + os.remove(env['QUTE_HTML']) + + assert set(env.keys()) == {'QUTE_TEXT', 'QUTE_HTML'} + assert text == expected_text + assert html == expected_html + assert env['QUTE_TEXT'].endswith('.txt') + assert env['QUTE_HTML'].endswith('.html')