2015-10-28 07:13:02 +01:00
|
|
|
# vim: ft=python fileencoding=utf-8 sts=4 sw=4 et:
|
|
|
|
|
|
|
|
# Copyright 2015 Florian Bruhin (The Compiler) <mail@qutebrowser.org>
|
|
|
|
#
|
|
|
|
# This file is part of qutebrowser.
|
|
|
|
#
|
|
|
|
# qutebrowser is free software: you can redistribute it and/or modify
|
|
|
|
# it under the terms of the GNU General Public License as published by
|
|
|
|
# the Free Software Foundation, either version 3 of the License, or
|
|
|
|
# (at your option) any later version.
|
|
|
|
#
|
|
|
|
# qutebrowser is distributed in the hope that it will be useful,
|
|
|
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
|
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
|
|
# GNU General Public License for more details.
|
|
|
|
#
|
|
|
|
# You should have received a copy of the GNU General Public License
|
|
|
|
# along with qutebrowser. If not, see <http://www.gnu.org/licenses/>.
|
|
|
|
|
|
|
|
"""Test testprocess.Process."""
|
|
|
|
|
|
|
|
import sys
|
2015-11-05 06:37:35 +01:00
|
|
|
import time
|
2015-10-28 07:13:02 +01:00
|
|
|
import contextlib
|
|
|
|
import datetime
|
|
|
|
|
|
|
|
import pytest
|
|
|
|
from PyQt5.QtCore import QProcess
|
|
|
|
|
|
|
|
import testprocess
|
|
|
|
|
|
|
|
pytestmark = [pytest.mark.not_frozen]
|
|
|
|
|
|
|
|
|
|
|
|
@contextlib.contextmanager
|
|
|
|
def stopwatch(min_ms=None, max_ms=None):
|
|
|
|
if min_ms is None and max_ms is None:
|
|
|
|
raise ValueError("Using stopwatch with both min_ms/max_ms None does "
|
|
|
|
"nothing.")
|
|
|
|
start = datetime.datetime.now()
|
|
|
|
yield
|
|
|
|
stop = datetime.datetime.now()
|
|
|
|
delta_ms = (stop - start).total_seconds() * 1000
|
|
|
|
if min_ms is not None:
|
|
|
|
assert delta_ms >= min_ms
|
|
|
|
if max_ms is not None:
|
|
|
|
assert delta_ms <= max_ms
|
|
|
|
|
|
|
|
|
|
|
|
class PythonProcess(testprocess.Process):
|
|
|
|
|
|
|
|
"""A testprocess which runs the given Python code."""
|
|
|
|
|
|
|
|
def __init__(self):
|
|
|
|
super().__init__()
|
|
|
|
self.proc.setReadChannel(QProcess.StandardOutput)
|
|
|
|
self.code = None
|
|
|
|
|
|
|
|
def _parse_line(self, line):
|
|
|
|
print("LINE: {}".format(line))
|
|
|
|
if line.strip() == 'ready':
|
|
|
|
self.ready.emit()
|
2015-11-06 06:49:36 +01:00
|
|
|
return testprocess.Line(line)
|
2015-10-28 07:13:02 +01:00
|
|
|
|
|
|
|
def _executable_args(self):
|
|
|
|
code = [
|
|
|
|
'import sys, time',
|
|
|
|
'print("ready")',
|
|
|
|
'sys.stdout.flush()',
|
|
|
|
self.code,
|
|
|
|
'sys.stdout.flush()',
|
|
|
|
'time.sleep(20)',
|
|
|
|
]
|
|
|
|
return (sys.executable, ['-c', ';'.join(code)])
|
|
|
|
|
|
|
|
|
2015-11-13 23:27:33 +01:00
|
|
|
@pytest.yield_fixture
|
|
|
|
def pyproc():
|
|
|
|
proc = PythonProcess()
|
|
|
|
yield proc
|
|
|
|
proc.terminate()
|
|
|
|
|
2015-10-28 07:13:02 +01:00
|
|
|
|
2015-11-13 23:27:33 +01:00
|
|
|
class TestWaitFor:
|
2015-10-28 07:13:02 +01:00
|
|
|
|
|
|
|
def test_successful(self, pyproc):
|
|
|
|
"""Using wait_for with the expected text."""
|
2015-11-05 07:40:10 +01:00
|
|
|
pyproc.code = "time.sleep(0.5); print('foobar')"
|
2015-11-12 20:29:06 +01:00
|
|
|
with stopwatch(min_ms=500):
|
|
|
|
pyproc.start()
|
2015-10-28 07:13:02 +01:00
|
|
|
pyproc.wait_for(data="foobar")
|
|
|
|
|
|
|
|
def test_other_text(self, pyproc):
|
|
|
|
"""Test wait_for when getting some unrelated text."""
|
2015-11-05 07:40:10 +01:00
|
|
|
pyproc.code = "time.sleep(0.1); print('blahblah')"
|
2015-10-28 07:13:02 +01:00
|
|
|
pyproc.start()
|
|
|
|
with pytest.raises(testprocess.WaitForTimeout):
|
|
|
|
pyproc.wait_for(data="foobar", timeout=500)
|
|
|
|
|
|
|
|
def test_no_text(self, pyproc):
|
|
|
|
"""Test wait_for when getting no text at all."""
|
|
|
|
pyproc.code = "pass"
|
|
|
|
pyproc.start()
|
|
|
|
with pytest.raises(testprocess.WaitForTimeout):
|
|
|
|
pyproc.wait_for(data="foobar", timeout=100)
|
2015-11-05 06:37:35 +01:00
|
|
|
|
2015-11-13 07:49:33 +01:00
|
|
|
@pytest.mark.parametrize('message', ['foobar', 'literal [x]'])
|
|
|
|
def test_existing_message(self, message, pyproc):
|
2015-11-05 06:37:35 +01:00
|
|
|
"""Test with a message which already passed when waiting."""
|
2015-11-13 07:49:33 +01:00
|
|
|
pyproc.code = "print('{}')".format(message)
|
2015-11-05 06:37:35 +01:00
|
|
|
pyproc.start()
|
|
|
|
time.sleep(0.5) # to make sure the message is printed
|
2015-11-13 07:49:33 +01:00
|
|
|
pyproc.wait_for(data=message)
|
2015-11-05 06:37:35 +01:00
|
|
|
|
|
|
|
def test_existing_message_previous_test(self, pyproc):
|
|
|
|
"""Make sure the message of a previous test gets ignored."""
|
tests: Don't wait for the same line twice.
We need to search for lines in the history because we could miss something
otherwise, but for subsequent wait_for calls, we really don't want to wait for
the same thing again.
This should make test_backforward.py more stable as it *actually* waits when
going back now. Before, it did produce failures such as this one on OS X:
____________________________ test_going_backforward ____________________________
[..]
@bdd.then(bdd.parsers.parse("The requests should be:\n{pages}"))
def list_of_loaded_pages(httpbin, pages):
requests = [httpbin.Request('GET', '/' + path.strip())
for path in pages.split('\n')]
> assert httpbin.get_requests() == requests
E assert [Request(verb...rward/1.txt')] == [Request(verb=...rward/2.txt')]
E At index 3 diff: Request(verb='GET', path='/data/backforward/1.txt') != Request(verb='GET', path='/data/backforward/2.txt')
E Full diff:
E [Request(verb='GET', path='/data/backforward/1.txt'),
E Request(verb='GET', path='/data/backforward/2.txt'),
E Request(verb='GET', path='/data/backforward/1.txt'),
E - Request(verb='GET', path='/data/backforward/1.txt')]
E ? ^
E + Request(verb='GET', path='/data/backforward/2.txt')]
E ? ^
tests/integration/features/conftest.py:85: AssertionError
2015-11-06 06:57:38 +01:00
|
|
|
pyproc.code = "print('foobar')"
|
|
|
|
pyproc.start()
|
2015-11-06 07:17:15 +01:00
|
|
|
line = pyproc.wait_for(data="foobar")
|
|
|
|
line.waited_for = False # so we don't test what the next test does
|
tests: Don't wait for the same line twice.
We need to search for lines in the history because we could miss something
otherwise, but for subsequent wait_for calls, we really don't want to wait for
the same thing again.
This should make test_backforward.py more stable as it *actually* waits when
going back now. Before, it did produce failures such as this one on OS X:
____________________________ test_going_backforward ____________________________
[..]
@bdd.then(bdd.parsers.parse("The requests should be:\n{pages}"))
def list_of_loaded_pages(httpbin, pages):
requests = [httpbin.Request('GET', '/' + path.strip())
for path in pages.split('\n')]
> assert httpbin.get_requests() == requests
E assert [Request(verb...rward/1.txt')] == [Request(verb=...rward/2.txt')]
E At index 3 diff: Request(verb='GET', path='/data/backforward/1.txt') != Request(verb='GET', path='/data/backforward/2.txt')
E Full diff:
E [Request(verb='GET', path='/data/backforward/1.txt'),
E Request(verb='GET', path='/data/backforward/2.txt'),
E Request(verb='GET', path='/data/backforward/1.txt'),
E - Request(verb='GET', path='/data/backforward/1.txt')]
E ? ^
E + Request(verb='GET', path='/data/backforward/2.txt')]
E ? ^
tests/integration/features/conftest.py:85: AssertionError
2015-11-06 06:57:38 +01:00
|
|
|
pyproc.after_test()
|
|
|
|
with pytest.raises(testprocess.WaitForTimeout):
|
|
|
|
pyproc.wait_for(data="foobar", timeout=100)
|
|
|
|
|
|
|
|
def test_existing_message_already_waited(self, pyproc):
|
|
|
|
"""Make sure an existing message doesn't stop waiting twice.
|
|
|
|
|
|
|
|
wait_for checks existing messages (see above), but we don't want it to
|
|
|
|
automatically proceed if we already *did* use wait_for on one of the
|
|
|
|
existing messages, as that makes it likely it's not what we actually
|
|
|
|
want.
|
|
|
|
"""
|
2015-11-05 07:40:10 +01:00
|
|
|
pyproc.code = "time.sleep(0.1); print('foobar')"
|
2015-11-05 06:37:35 +01:00
|
|
|
pyproc.start()
|
2015-11-05 07:39:31 +01:00
|
|
|
pyproc.wait_for(data="foobar")
|
2015-11-05 06:37:35 +01:00
|
|
|
with pytest.raises(testprocess.WaitForTimeout):
|
|
|
|
pyproc.wait_for(data="foobar", timeout=100)
|
2015-11-13 23:27:33 +01:00
|
|
|
|
|
|
|
|
|
|
|
class TestEnsureNotLogged:
|
|
|
|
|
|
|
|
@pytest.mark.parametrize('message, pattern', [
|
|
|
|
('blacklisted', 'blacklisted'),
|
|
|
|
('bl[a]cklisted', 'bl[a]cklisted'),
|
|
|
|
('blacklisted', 'black*'),
|
|
|
|
])
|
|
|
|
def test_existing_message(self, pyproc, message, pattern):
|
|
|
|
pyproc.code = "print('{}')".format(message)
|
|
|
|
pyproc.start()
|
|
|
|
with stopwatch(max_ms=1000):
|
|
|
|
with pytest.raises(testprocess.BlacklistedMessageError):
|
|
|
|
pyproc.ensure_not_logged(data=pattern, delay=2000)
|
|
|
|
|
|
|
|
def test_late_message(self, pyproc):
|
|
|
|
pyproc.code = "time.sleep(0.5); print('blacklisted')"
|
|
|
|
pyproc.start()
|
|
|
|
with pytest.raises(testprocess.BlacklistedMessageError):
|
|
|
|
pyproc.ensure_not_logged(data='blacklisted', delay=1000)
|
|
|
|
|
|
|
|
def test_no_matching_message(self, pyproc):
|
|
|
|
pyproc.code = "print('blacklisted... nope!')"
|
|
|
|
pyproc.start()
|
|
|
|
pyproc.ensure_not_logged(data='blacklisted', delay=100)
|
|
|
|
|
|
|
|
def test_wait_for_and_blacklist(self, pyproc):
|
|
|
|
pyproc.code = "print('blacklisted')"
|
|
|
|
pyproc.start()
|
|
|
|
pyproc.wait_for(data='blacklisted')
|
|
|
|
with pytest.raises(testprocess.BlacklistedMessageError):
|
|
|
|
pyproc.ensure_not_logged(data='blacklisted', delay=0)
|