ipc tests: More tests.
This commit is contained in:
parent
b4c90c5db4
commit
d966720900
@ -267,107 +267,110 @@ class TestHandleConnection:
|
||||
assert "We can read a line immediately." in all_msgs
|
||||
|
||||
|
||||
class TestRealConnections:
|
||||
@pytest.yield_fixture
|
||||
def connected_socket(qtbot, qlocalsocket, ipc_server):
|
||||
ipc_server.listen()
|
||||
with qtbot.waitSignal(ipc_server._server.newConnection, raising=True):
|
||||
qlocalsocket.connectToServer('qutebrowser-test')
|
||||
yield qlocalsocket
|
||||
qlocalsocket.disconnectFromServer()
|
||||
|
||||
@pytest.yield_fixture
|
||||
def connected_socket(self, qtbot, qlocalsocket, ipc_server):
|
||||
ipc_server.listen()
|
||||
with qtbot.waitSignal(ipc_server._server.newConnection, raising=True):
|
||||
qlocalsocket.connectToServer('qutebrowser-test')
|
||||
yield qlocalsocket
|
||||
qlocalsocket.disconnectFromServer()
|
||||
|
||||
@pytest.mark.parametrize('has_cwd', [True, False])
|
||||
def test_normal(self, qtbot, tmpdir, ipc_server, mocker, has_cwd):
|
||||
ipc_server.listen()
|
||||
spy = QSignalSpy(ipc_server.got_args)
|
||||
error_spy = QSignalSpy(ipc_server.got_invalid_data)
|
||||
@pytest.mark.parametrize('has_cwd', [True, False])
|
||||
def test_normal(qtbot, tmpdir, ipc_server, mocker, has_cwd):
|
||||
ipc_server.listen()
|
||||
spy = QSignalSpy(ipc_server.got_args)
|
||||
error_spy = QSignalSpy(ipc_server.got_invalid_data)
|
||||
|
||||
with qtbot.waitSignal(ipc_server.got_args, raising=True):
|
||||
with tmpdir.as_cwd():
|
||||
if not has_cwd:
|
||||
m = mocker.patch('qutebrowser.misc.ipc.os')
|
||||
m.getcwd.side_effect = OSError
|
||||
sent = ipc.send_to_running_instance('qutebrowser-test',
|
||||
['foo'])
|
||||
with qtbot.waitSignal(ipc_server.got_args, raising=True):
|
||||
with tmpdir.as_cwd():
|
||||
if not has_cwd:
|
||||
m = mocker.patch('qutebrowser.misc.ipc.os')
|
||||
m.getcwd.side_effect = OSError
|
||||
sent = ipc.send_to_running_instance('qutebrowser-test', ['foo'])
|
||||
|
||||
assert sent
|
||||
assert len(spy) == 1
|
||||
assert not error_spy
|
||||
assert sent
|
||||
assert len(spy) == 1
|
||||
assert not error_spy
|
||||
|
||||
if has_cwd:
|
||||
expected_cwd = str(tmpdir)
|
||||
else:
|
||||
expected_cwd = ''
|
||||
assert spy[0] == [['foo'], expected_cwd]
|
||||
if has_cwd:
|
||||
expected_cwd = str(tmpdir)
|
||||
else:
|
||||
expected_cwd = ''
|
||||
assert spy[0] == [['foo'], expected_cwd]
|
||||
|
||||
def test_double_connection(self, qtbot, connected_socket, ipc_server,
|
||||
caplog):
|
||||
spy = QSignalSpy(ipc_server.got_args)
|
||||
error_spy = QSignalSpy(ipc_server.got_invalid_data)
|
||||
with qtbot.waitSignal(ipc_server._server.newConnection, raising=True):
|
||||
sent = ipc.send_to_running_instance('qutebrowser-test', [])
|
||||
assert sent
|
||||
assert not spy
|
||||
assert not error_spy
|
||||
message = ("Got new connection but ignoring it because we're still "
|
||||
"handling another one.")
|
||||
assert message in [rec.message for rec in caplog.records()]
|
||||
|
||||
def test_disconnected_without_data(self, qtbot, connected_socket,
|
||||
ipc_server, caplog):
|
||||
"""Disconnect without sending data.
|
||||
|
||||
This means self._socket will be None on on_disconnected.
|
||||
"""
|
||||
connected_socket.disconnectFromServer()
|
||||
|
||||
def test_partial_line(self, connected_socket):
|
||||
connected_socket.write(b'foo')
|
||||
|
||||
@pytest.mark.parametrize('data', [
|
||||
b'\x80\n', # invalid UTF8
|
||||
b'\n',
|
||||
b'{"is this invalid json?": true\n',
|
||||
b'{"valid json without args": true}\n',
|
||||
])
|
||||
def test_invalid_data(self, qtbot, ipc_server, connected_socket, caplog,
|
||||
data):
|
||||
signals = [ipc_server.got_invalid_data, connected_socket.disconnected]
|
||||
with caplog.atLevel(logging.ERROR):
|
||||
with qtbot.waitSignals(signals, raising=True):
|
||||
connected_socket.write(data)
|
||||
messages = [r.message for r in caplog.records()]
|
||||
assert messages[-1] == 'Ignoring invalid IPC data.'
|
||||
|
||||
def test_multiline(self, qtbot, ipc_server, connected_socket):
|
||||
spy = QSignalSpy(ipc_server.got_args)
|
||||
error_spy = QSignalSpy(ipc_server.got_invalid_data)
|
||||
|
||||
with qtbot.waitSignals([ipc_server.got_args, ipc_server.got_args],
|
||||
raising=True):
|
||||
connected_socket.write(b'{"args": ["one"]}\n{"args": ["two"]}\n')
|
||||
|
||||
assert len(spy) == 2
|
||||
assert not error_spy
|
||||
assert spy[0][0] == ['one']
|
||||
assert spy[1][0] == ['two']
|
||||
|
||||
def test_connect_no_server(self, caplog):
|
||||
def test_double_connection(qtbot, connected_socket, ipc_server, caplog):
|
||||
spy = QSignalSpy(ipc_server.got_args)
|
||||
error_spy = QSignalSpy(ipc_server.got_invalid_data)
|
||||
with qtbot.waitSignal(ipc_server._server.newConnection, raising=True):
|
||||
sent = ipc.send_to_running_instance('qutebrowser-test', [])
|
||||
assert not sent
|
||||
msg = caplog.records()[-1].message
|
||||
assert msg == "No existing instance present (error 2)"
|
||||
assert sent
|
||||
assert not spy
|
||||
assert not error_spy
|
||||
message = ("Got new connection but ignoring it because we're still "
|
||||
"handling another one.")
|
||||
assert message in [rec.message for rec in caplog.records()]
|
||||
|
||||
def test_timeout(self, qtbot, caplog, qlocalsocket, ipc_server):
|
||||
ipc_server._timer.setInterval(100)
|
||||
ipc_server.listen()
|
||||
|
||||
with qtbot.waitSignal(ipc_server._server.newConnection, raising=True):
|
||||
qlocalsocket.connectToServer('qutebrowser-test')
|
||||
def test_disconnected_without_data(qtbot, connected_socket,
|
||||
ipc_server, caplog):
|
||||
"""Disconnect without sending data.
|
||||
|
||||
with caplog.atLevel(logging.ERROR):
|
||||
with qtbot.waitSignal(qlocalsocket.disconnected, raising=True):
|
||||
pass
|
||||
This means self._socket will be None on on_disconnected.
|
||||
"""
|
||||
connected_socket.disconnectFromServer()
|
||||
|
||||
assert caplog.records()[-1].message == "IPC connection timed out."
|
||||
|
||||
def test_partial_line(connected_socket):
|
||||
connected_socket.write(b'foo')
|
||||
|
||||
|
||||
@pytest.mark.parametrize('data', [
|
||||
b'\x80\n', # invalid UTF8
|
||||
b'\n',
|
||||
b'{"is this invalid json?": true\n',
|
||||
b'{"valid json without args": true}\n',
|
||||
])
|
||||
def test_invalid_data(qtbot, ipc_server, connected_socket, caplog, data):
|
||||
signals = [ipc_server.got_invalid_data, connected_socket.disconnected]
|
||||
with caplog.atLevel(logging.ERROR):
|
||||
with qtbot.waitSignals(signals, raising=True):
|
||||
connected_socket.write(data)
|
||||
messages = [r.message for r in caplog.records()]
|
||||
assert messages[-1] == 'Ignoring invalid IPC data.'
|
||||
|
||||
|
||||
def test_multiline(qtbot, ipc_server, connected_socket):
|
||||
spy = QSignalSpy(ipc_server.got_args)
|
||||
error_spy = QSignalSpy(ipc_server.got_invalid_data)
|
||||
|
||||
with qtbot.waitSignals([ipc_server.got_args, ipc_server.got_args],
|
||||
raising=True):
|
||||
connected_socket.write(b'{"args": ["one"]}\n{"args": ["two"]}\n')
|
||||
|
||||
assert len(spy) == 2
|
||||
assert not error_spy
|
||||
assert spy[0][0] == ['one']
|
||||
assert spy[1][0] == ['two']
|
||||
|
||||
|
||||
def test_connect_no_server(caplog):
|
||||
sent = ipc.send_to_running_instance('qutebrowser-test', [])
|
||||
assert not sent
|
||||
msg = caplog.records()[-1].message
|
||||
assert msg == "No existing instance present (error 2)"
|
||||
|
||||
|
||||
def test_timeout(qtbot, caplog, qlocalsocket, ipc_server):
|
||||
ipc_server._timer.setInterval(100)
|
||||
ipc_server.listen()
|
||||
|
||||
with qtbot.waitSignal(ipc_server._server.newConnection, raising=True):
|
||||
qlocalsocket.connectToServer('qutebrowser-test')
|
||||
|
||||
with caplog.atLevel(logging.ERROR):
|
||||
with qtbot.waitSignal(qlocalsocket.disconnected, raising=True):
|
||||
pass
|
||||
|
||||
assert caplog.records()[-1].message == "IPC connection timed out."
|
||||
|
Loading…
Reference in New Issue
Block a user