Start work on httpbin integration tests.
This commit is contained in:
parent
db513aa956
commit
48b599a774
100
tests/integration/conftest.py
Normal file
100
tests/integration/conftest.py
Normal file
@ -0,0 +1,100 @@
|
|||||||
|
import re
|
||||||
|
import sys
|
||||||
|
import socket
|
||||||
|
import functools
|
||||||
|
import os.path
|
||||||
|
|
||||||
|
import pytest
|
||||||
|
import pytestqt.plugin
|
||||||
|
from PyQt5.QtCore import pyqtSlot, pyqtSignal, QProcess, QObject
|
||||||
|
|
||||||
|
|
||||||
|
class InvalidLine(Exception):
|
||||||
|
|
||||||
|
"""Exception raised when HTTPBin prints a line which is not parsable."""
|
||||||
|
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
class HTTPBin(QObject):
|
||||||
|
|
||||||
|
ready = pyqtSignal()
|
||||||
|
got_new_url = pyqtSignal(str)
|
||||||
|
|
||||||
|
def __init__(self, parent=None):
|
||||||
|
super().__init__(parent)
|
||||||
|
self._invalid = False
|
||||||
|
self._visited_urls = []
|
||||||
|
self.port = self._get_port()
|
||||||
|
self.proc = QProcess()
|
||||||
|
self.proc.setReadChannel(QProcess.StandardError)
|
||||||
|
|
||||||
|
def _get_port(self):
|
||||||
|
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
||||||
|
sock.bind(('localhost', 0))
|
||||||
|
port = sock.getsockname()[1]
|
||||||
|
sock.close()
|
||||||
|
return port
|
||||||
|
|
||||||
|
def get_visited(self):
|
||||||
|
self.proc.waitForReadyRead(500)
|
||||||
|
self.read_urls()
|
||||||
|
return self._visited_urls
|
||||||
|
|
||||||
|
@pyqtSlot()
|
||||||
|
def read_urls(self):
|
||||||
|
while self.proc.canReadLine():
|
||||||
|
line = self.proc.readLine()
|
||||||
|
line = bytes(line).decode('utf-8').rstrip('\n')
|
||||||
|
print(line)
|
||||||
|
|
||||||
|
if line == (' * Running on http://127.0.0.1:{}/ (Press CTRL+C to '
|
||||||
|
'quit)'.format(self.port)):
|
||||||
|
self.ready.emit()
|
||||||
|
continue
|
||||||
|
|
||||||
|
match = re.match(r'.*"(GET [^ ]*) .*', line) # FIXME
|
||||||
|
if match is None:
|
||||||
|
self._invalid = True
|
||||||
|
print("INVALID: {}".format(line))
|
||||||
|
continue
|
||||||
|
url = match.group(1)
|
||||||
|
print(url)
|
||||||
|
self._visited_urls.append(url)
|
||||||
|
self.got_new_url.emit(url)
|
||||||
|
|
||||||
|
def start(self):
|
||||||
|
filename = os.path.join(os.path.dirname(__file__), 'webserver.py')
|
||||||
|
self.proc.start(sys.executable, [filename, str(self.port)])
|
||||||
|
ok = self.proc.waitForStarted()
|
||||||
|
assert ok
|
||||||
|
self.proc.readyRead.connect(self.read_urls)
|
||||||
|
|
||||||
|
def after_test(self):
|
||||||
|
self._visited_urls.clear()
|
||||||
|
if self._invalid:
|
||||||
|
raise InvalidLine
|
||||||
|
|
||||||
|
def cleanup(self):
|
||||||
|
self.proc.terminate()
|
||||||
|
self.proc.waitForFinished()
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.yield_fixture(scope='session', autouse=True)
|
||||||
|
def httpbin(qapp):
|
||||||
|
httpbin = HTTPBin()
|
||||||
|
|
||||||
|
blocker = pytestqt.plugin.SignalBlocker(timeout=5000, raising=True)
|
||||||
|
blocker.connect(httpbin.ready)
|
||||||
|
with blocker:
|
||||||
|
httpbin.start()
|
||||||
|
|
||||||
|
yield httpbin
|
||||||
|
|
||||||
|
httpbin.cleanup()
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.yield_fixture(autouse=True)
|
||||||
|
def httpbin_clean(httpbin):
|
||||||
|
yield
|
||||||
|
httpbin.after_test()
|
1
tests/integration/html/hello.html
Normal file
1
tests/integration/html/hello.html
Normal file
@ -0,0 +1 @@
|
|||||||
|
foo
|
20
tests/integration/test_webserver.py
Normal file
20
tests/integration/test_webserver.py
Normal file
@ -0,0 +1,20 @@
|
|||||||
|
import urllib.request
|
||||||
|
|
||||||
|
import pytest
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.parametrize('path, content, expected', [
|
||||||
|
('/', '<title>httpbin(1): HTTP Client Testing Service</title>', True),
|
||||||
|
# https://github.com/Runscope/httpbin/issues/245
|
||||||
|
('/', 'www.google-analytics.com', False),
|
||||||
|
('/html/hello.html', 'Hello World!', True),
|
||||||
|
])
|
||||||
|
def test_httpbin(httpbin, qtbot, path, content, expected):
|
||||||
|
with qtbot.waitSignal(httpbin.got_new_url, raising=True, timeout=100):
|
||||||
|
url = 'http://localhost:{}{}'.format(httpbin.port, path)
|
||||||
|
response = urllib.request.urlopen(url)
|
||||||
|
|
||||||
|
data = response.read().decode('utf-8')
|
||||||
|
|
||||||
|
assert httpbin.get_visited() == ['GET {}'.format(path)]
|
||||||
|
assert (content in data) == expected
|
15
tests/integration/webserver.py
Normal file
15
tests/integration/webserver.py
Normal file
@ -0,0 +1,15 @@
|
|||||||
|
import sys
|
||||||
|
import os.path
|
||||||
|
|
||||||
|
from httpbin.core import app
|
||||||
|
import flask
|
||||||
|
|
||||||
|
|
||||||
|
@app.route('/html/<path:path>')
|
||||||
|
def send_html(path):
|
||||||
|
basedir = os.path.realpath(os.path.dirname(__file__))
|
||||||
|
print(basedir)
|
||||||
|
return flask.send_from_directory(os.path.join(basedir, 'html'), path)
|
||||||
|
|
||||||
|
|
||||||
|
app.run(port=int(sys.argv[1]), debug=True, use_reloader=False)
|
1
tox.ini
1
tox.ini
@ -26,6 +26,7 @@ deps =
|
|||||||
coverage==3.7.1
|
coverage==3.7.1
|
||||||
pytest-cov==2.1.0
|
pytest-cov==2.1.0
|
||||||
beautifulsoup4==4.4.0
|
beautifulsoup4==4.4.0
|
||||||
|
httpbin
|
||||||
commands =
|
commands =
|
||||||
{envpython} scripts/link_pyqt.py --tox {envdir}
|
{envpython} scripts/link_pyqt.py --tox {envdir}
|
||||||
{envpython} -m py.test --strict -rfEsw -m 'not integration' --cov qutebrowser --cov-report xml --cov-report=html --cov-report= {posargs:tests}
|
{envpython} -m py.test --strict -rfEsw -m 'not integration' --cov qutebrowser --cov-report xml --cov-report=html --cov-report= {posargs:tests}
|
||||||
|
Loading…
Reference in New Issue
Block a user