Rewrite userscripts to work with async dumping
This commit is contained in:
parent
edafa7c99f
commit
a6307497c0
@ -1033,14 +1033,13 @@ class CommandDispatcher:
|
|||||||
if idx != -1:
|
if idx != -1:
|
||||||
env['QUTE_TITLE'] = self._tabbed_browser.page_title(idx)
|
env['QUTE_TITLE'] = self._tabbed_browser.page_title(idx)
|
||||||
|
|
||||||
webview = self._tabbed_browser.currentWidget()
|
tab = self._tabbed_browser.currentWidget()
|
||||||
if webview is None:
|
if tab is None:
|
||||||
mainframe = None
|
mainframe = None
|
||||||
else:
|
else:
|
||||||
if webview.caret.has_selection():
|
if tab.caret.has_selection():
|
||||||
env['QUTE_SELECTED_TEXT'] = webview.caret.selection()
|
env['QUTE_SELECTED_TEXT'] = tab.caret.selection()
|
||||||
env['QUTE_SELECTED_HTML'] = webview.caret.selection(html=True)
|
env['QUTE_SELECTED_HTML'] = tab.caret.selection(html=True)
|
||||||
mainframe = webview.page().mainFrame()
|
|
||||||
|
|
||||||
try:
|
try:
|
||||||
url = self._tabbed_browser.current_url()
|
url = self._tabbed_browser.current_url()
|
||||||
@ -1049,8 +1048,7 @@ class CommandDispatcher:
|
|||||||
else:
|
else:
|
||||||
env['QUTE_URL'] = url.toString(QUrl.FullyEncoded)
|
env['QUTE_URL'] = url.toString(QUrl.FullyEncoded)
|
||||||
|
|
||||||
env.update(userscripts.store_source(mainframe))
|
userscripts.run_async(tab, cmd, *args, win_id=self._win_id, env=env,
|
||||||
userscripts.run(cmd, *args, win_id=self._win_id, env=env,
|
|
||||||
verbose=verbose)
|
verbose=verbose)
|
||||||
|
|
||||||
@cmdutils.register(instance='command-dispatcher', scope='window')
|
@cmdutils.register(instance='command-dispatcher', scope='window')
|
||||||
|
@ -84,6 +84,7 @@ class HintContext:
|
|||||||
args: Custom arguments for userscript/spawn
|
args: Custom arguments for userscript/spawn
|
||||||
rapid: Whether to do rapid hinting.
|
rapid: Whether to do rapid hinting.
|
||||||
mainframe: The main QWebFrame where we started hinting in.
|
mainframe: The main QWebFrame where we started hinting in.
|
||||||
|
tab: The WebTab object we started hinting in.
|
||||||
group: The group of web elements to hint.
|
group: The group of web elements to hint.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
@ -98,6 +99,7 @@ class HintContext:
|
|||||||
self.destroyed_frames = []
|
self.destroyed_frames = []
|
||||||
self.args = []
|
self.args = []
|
||||||
self.mainframe = None
|
self.mainframe = None
|
||||||
|
self.tab = None
|
||||||
self.group = None
|
self.group = None
|
||||||
|
|
||||||
def get_args(self, urlstr):
|
def get_args(self, urlstr):
|
||||||
@ -569,7 +571,6 @@ class HintManager(QObject):
|
|||||||
"""
|
"""
|
||||||
cmd = context.args[0]
|
cmd = context.args[0]
|
||||||
args = context.args[1:]
|
args = context.args[1:]
|
||||||
frame = context.mainframe
|
|
||||||
env = {
|
env = {
|
||||||
'QUTE_MODE': 'hints',
|
'QUTE_MODE': 'hints',
|
||||||
'QUTE_SELECTED_TEXT': str(elem),
|
'QUTE_SELECTED_TEXT': str(elem),
|
||||||
@ -578,8 +579,9 @@ class HintManager(QObject):
|
|||||||
url = self._resolve_url(elem, context.baseurl)
|
url = self._resolve_url(elem, context.baseurl)
|
||||||
if url is not None:
|
if url is not None:
|
||||||
env['QUTE_URL'] = url.toString(QUrl.FullyEncoded)
|
env['QUTE_URL'] = url.toString(QUrl.FullyEncoded)
|
||||||
env.update(userscripts.store_source(frame))
|
|
||||||
userscripts.run(cmd, *args, win_id=self._win_id, env=env)
|
userscripts.run_async(context.tab, cmd, *args, win_id=self._win_id,
|
||||||
|
env=env)
|
||||||
|
|
||||||
def _spawn(self, url, context):
|
def _spawn(self, url, context):
|
||||||
"""Spawn a simple command from a hint.
|
"""Spawn a simple command from a hint.
|
||||||
@ -837,6 +839,7 @@ class HintManager(QObject):
|
|||||||
|
|
||||||
self._check_args(target, *args)
|
self._check_args(target, *args)
|
||||||
self._context = HintContext()
|
self._context = HintContext()
|
||||||
|
self._context.tab = tab
|
||||||
self._context.target = target
|
self._context.target = target
|
||||||
self._context.rapid = rapid
|
self._context.rapid = rapid
|
||||||
try:
|
try:
|
||||||
|
@ -86,6 +86,10 @@ class _BaseUserscriptRunner(QObject):
|
|||||||
_proc: The GUIProcess which is being executed.
|
_proc: The GUIProcess which is being executed.
|
||||||
_win_id: The window ID this runner is associated with.
|
_win_id: The window ID this runner is associated with.
|
||||||
_cleaned_up: Whether temporary files were cleaned up.
|
_cleaned_up: Whether temporary files were cleaned up.
|
||||||
|
_text_stored: Set when the page text was stored async.
|
||||||
|
_html_stored: Set when the page html was stored async.
|
||||||
|
_args: Arguments to pass to _run_process.
|
||||||
|
_kwargs: Keyword arguments to pass to _run_process.
|
||||||
|
|
||||||
Signals:
|
Signals:
|
||||||
got_cmd: Emitted when a new command arrived and should be executed.
|
got_cmd: Emitted when a new command arrived and should be executed.
|
||||||
@ -101,9 +105,41 @@ class _BaseUserscriptRunner(QObject):
|
|||||||
self._win_id = win_id
|
self._win_id = win_id
|
||||||
self._filepath = None
|
self._filepath = None
|
||||||
self._proc = None
|
self._proc = None
|
||||||
self._env = None
|
self._env = {}
|
||||||
|
self._text_stored = False
|
||||||
|
self._html_stored = False
|
||||||
|
self._args = None
|
||||||
|
self._kwargs = None
|
||||||
|
|
||||||
def _run_process(self, cmd, *args, env, verbose):
|
def store_text(self, text):
|
||||||
|
"""Called as callback when the text is ready from the web backend."""
|
||||||
|
with tempfile.NamedTemporaryFile(mode='w', encoding='utf-8',
|
||||||
|
suffix='.txt',
|
||||||
|
delete=False) as txt_file:
|
||||||
|
txt_file.write(text)
|
||||||
|
self._env['QUTE_TEXT'] = txt_file.name
|
||||||
|
|
||||||
|
self._text_stored = True
|
||||||
|
log.procs.debug("Text stored from webview")
|
||||||
|
if self._text_stored and self._html_stored:
|
||||||
|
log.procs.debug("Both text/HTML stored, kicking off userscript!")
|
||||||
|
self._run_process(*self._args, **self._kwargs)
|
||||||
|
|
||||||
|
def store_html(self, html):
|
||||||
|
"""Called as callback when the html is ready from the web backend."""
|
||||||
|
with tempfile.NamedTemporaryFile(mode='w', encoding='utf-8',
|
||||||
|
suffix='.html',
|
||||||
|
delete=False) as html_file:
|
||||||
|
html_file.write(html)
|
||||||
|
self._env['QUTE_HTML'] = html_file.name
|
||||||
|
|
||||||
|
self._html_stored = True
|
||||||
|
log.procs.debug("HTML stored from webview")
|
||||||
|
if self._text_stored and self._html_stored:
|
||||||
|
log.procs.debug("Both text/HTML stored, kicking off userscript!")
|
||||||
|
self._run_process(*self._args, **self._kwargs)
|
||||||
|
|
||||||
|
def _run_process(self, cmd, *args, env=None, verbose=False):
|
||||||
"""Start the given command.
|
"""Start the given command.
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
@ -112,7 +148,7 @@ class _BaseUserscriptRunner(QObject):
|
|||||||
env: A dictionary of environment variables to add.
|
env: A dictionary of environment variables to add.
|
||||||
verbose: Show notifications when the command started/exited.
|
verbose: Show notifications when the command started/exited.
|
||||||
"""
|
"""
|
||||||
self._env = {'QUTE_FIFO': self._filepath}
|
self._env['QUTE_FIFO'] = self._filepath
|
||||||
if env is not None:
|
if env is not None:
|
||||||
self._env.update(env)
|
self._env.update(env)
|
||||||
self._proc = guiprocess.GUIProcess(self._win_id, 'userscript',
|
self._proc = guiprocess.GUIProcess(self._win_id, 'userscript',
|
||||||
@ -144,18 +180,19 @@ class _BaseUserscriptRunner(QObject):
|
|||||||
fn, e))
|
fn, e))
|
||||||
self._filepath = None
|
self._filepath = None
|
||||||
self._proc = None
|
self._proc = None
|
||||||
self._env = None
|
self._env = {}
|
||||||
|
self._text_stored = False
|
||||||
|
self._html_stored = False
|
||||||
|
|
||||||
def run(self, cmd, *args, env=None, verbose=False):
|
def prepare_run(self, *args, **kwargs):
|
||||||
"""Run the userscript given.
|
"""Prepare ruinning the userscript given.
|
||||||
|
|
||||||
Needs to be overridden by subclasses.
|
Needs to be overridden by subclasses.
|
||||||
|
The script will actually run after store_text and store_html have been
|
||||||
|
called.
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
cmd: The command to be started.
|
Passed to _run_process.
|
||||||
*args: The arguments to hand to the command
|
|
||||||
env: A dictionary of environment variables to add.
|
|
||||||
verbose: Show notifications when the command started/exited.
|
|
||||||
"""
|
"""
|
||||||
raise NotImplementedError
|
raise NotImplementedError
|
||||||
|
|
||||||
@ -190,7 +227,10 @@ class _POSIXUserscriptRunner(_BaseUserscriptRunner):
|
|||||||
super().__init__(win_id, parent)
|
super().__init__(win_id, parent)
|
||||||
self._reader = None
|
self._reader = None
|
||||||
|
|
||||||
def run(self, cmd, *args, env=None, verbose=False):
|
def prepare_run(self, *args, **kwargs):
|
||||||
|
self._args = args
|
||||||
|
self._kwargs = kwargs
|
||||||
|
|
||||||
try:
|
try:
|
||||||
# tempfile.mktemp is deprecated and discouraged, but we use it here
|
# tempfile.mktemp is deprecated and discouraged, but we use it here
|
||||||
# to create a FIFO since the only other alternative would be to
|
# to create a FIFO since the only other alternative would be to
|
||||||
@ -209,8 +249,6 @@ class _POSIXUserscriptRunner(_BaseUserscriptRunner):
|
|||||||
self._reader = _QtFIFOReader(self._filepath)
|
self._reader = _QtFIFOReader(self._filepath)
|
||||||
self._reader.got_line.connect(self.got_cmd)
|
self._reader.got_line.connect(self.got_cmd)
|
||||||
|
|
||||||
self._run_process(cmd, *args, env=env, verbose=verbose)
|
|
||||||
|
|
||||||
@pyqtSlot()
|
@pyqtSlot()
|
||||||
def on_proc_finished(self):
|
def on_proc_finished(self):
|
||||||
self._cleanup()
|
self._cleanup()
|
||||||
@ -280,14 +318,16 @@ class _WindowsUserscriptRunner(_BaseUserscriptRunner):
|
|||||||
"""Read back the commands when the process finished."""
|
"""Read back the commands when the process finished."""
|
||||||
self._cleanup()
|
self._cleanup()
|
||||||
|
|
||||||
def run(self, cmd, *args, env=None, verbose=False):
|
def prepare_run(self, *args, **kwargs):
|
||||||
|
self._args = args
|
||||||
|
self._kwargs = kwargs
|
||||||
|
|
||||||
try:
|
try:
|
||||||
self._oshandle, self._filepath = tempfile.mkstemp(text=True)
|
self._oshandle, self._filepath = tempfile.mkstemp(text=True)
|
||||||
except OSError as e:
|
except OSError as e:
|
||||||
message.error(self._win_id, "Error while creating tempfile: "
|
message.error(self._win_id, "Error while creating tempfile: "
|
||||||
"{}".format(e))
|
"{}".format(e))
|
||||||
return
|
return
|
||||||
self._run_process(cmd, *args, env=env, verbose=verbose)
|
|
||||||
|
|
||||||
|
|
||||||
class _DummyUserscriptRunner(QObject):
|
class _DummyUserscriptRunner(QObject):
|
||||||
@ -307,7 +347,7 @@ class _DummyUserscriptRunner(QObject):
|
|||||||
# pylint: disable=unused-argument
|
# pylint: disable=unused-argument
|
||||||
super().__init__(parent)
|
super().__init__(parent)
|
||||||
|
|
||||||
def run(self, cmd, *args, env=None, verbose=False):
|
def prepare_run(self, *args, **kwargs):
|
||||||
"""Print an error as userscripts are not supported."""
|
"""Print an error as userscripts are not supported."""
|
||||||
# pylint: disable=unused-argument,unused-variable
|
# pylint: disable=unused-argument,unused-variable
|
||||||
self.finished.emit()
|
self.finished.emit()
|
||||||
@ -325,41 +365,11 @@ else: # pragma: no cover
|
|||||||
UserscriptRunner = _DummyUserscriptRunner
|
UserscriptRunner = _DummyUserscriptRunner
|
||||||
|
|
||||||
|
|
||||||
def store_source(frame):
|
def run_async(tab, cmd, *args, win_id, env, verbose=False):
|
||||||
"""Store HTML/plaintext in files.
|
|
||||||
|
|
||||||
This writes files containing the HTML/plaintext source of the page, and
|
|
||||||
returns a dict with the paths as QUTE_HTML/QUTE_TEXT.
|
|
||||||
|
|
||||||
Args:
|
|
||||||
frame: The QWebFrame to get the info from, or None to do nothing.
|
|
||||||
|
|
||||||
Return:
|
|
||||||
A dictionary with the needed environment variables.
|
|
||||||
|
|
||||||
Warning:
|
|
||||||
The caller is responsible to delete the files after using them!
|
|
||||||
"""
|
|
||||||
if frame is None:
|
|
||||||
return {}
|
|
||||||
env = {}
|
|
||||||
with tempfile.NamedTemporaryFile(mode='w', encoding='utf-8',
|
|
||||||
suffix='.html',
|
|
||||||
delete=False) as html_file:
|
|
||||||
html_file.write(frame.toHtml())
|
|
||||||
env['QUTE_HTML'] = html_file.name
|
|
||||||
with tempfile.NamedTemporaryFile(mode='w', encoding='utf-8',
|
|
||||||
suffix='.txt',
|
|
||||||
delete=False) as txt_file:
|
|
||||||
txt_file.write(frame.toPlainText())
|
|
||||||
env['QUTE_TEXT'] = txt_file.name
|
|
||||||
return env
|
|
||||||
|
|
||||||
|
|
||||||
def run(cmd, *args, win_id, env, verbose=False):
|
|
||||||
"""Convenience method to run a userscript.
|
"""Convenience method to run a userscript.
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
|
tab: The WebKitTab/WebEngineTab to get the source from.
|
||||||
cmd: The userscript binary to run.
|
cmd: The userscript binary to run.
|
||||||
*args: The arguments to pass to the userscript.
|
*args: The arguments to pass to the userscript.
|
||||||
win_id: The window id the userscript is executed in.
|
win_id: The window id the userscript is executed in.
|
||||||
@ -398,6 +408,9 @@ def run(cmd, *args, win_id, env, verbose=False):
|
|||||||
"userscripts", cmd)
|
"userscripts", cmd)
|
||||||
log.misc.debug("Userscript to run: {}".format(cmd_path))
|
log.misc.debug("Userscript to run: {}".format(cmd_path))
|
||||||
|
|
||||||
runner.run(cmd_path, *args, env=env, verbose=verbose)
|
|
||||||
runner.finished.connect(commandrunner.deleteLater)
|
runner.finished.connect(commandrunner.deleteLater)
|
||||||
runner.finished.connect(runner.deleteLater)
|
runner.finished.connect(runner.deleteLater)
|
||||||
|
|
||||||
|
runner.prepare_run(cmd_path, *args, env=env, verbose=verbose)
|
||||||
|
tab.dump_async(runner.store_html)
|
||||||
|
tab.dump_async(runner.store_text, plain=True)
|
||||||
|
@ -80,7 +80,9 @@ def test_command(qtbot, py_proc, runner):
|
|||||||
f.write('foo\n')
|
f.write('foo\n')
|
||||||
""")
|
""")
|
||||||
with qtbot.waitSignal(runner.got_cmd, timeout=10000) as blocker:
|
with qtbot.waitSignal(runner.got_cmd, timeout=10000) as blocker:
|
||||||
runner.run(cmd, *args)
|
runner.prepare_run(cmd, *args)
|
||||||
|
runner.store_html('')
|
||||||
|
runner.store_text('')
|
||||||
assert blocker.args == ['foo']
|
assert blocker.args == ['foo']
|
||||||
|
|
||||||
|
|
||||||
@ -100,7 +102,9 @@ def test_custom_env(qtbot, monkeypatch, py_proc, runner):
|
|||||||
""")
|
""")
|
||||||
|
|
||||||
with qtbot.waitSignal(runner.got_cmd, timeout=10000) as blocker:
|
with qtbot.waitSignal(runner.got_cmd, timeout=10000) as blocker:
|
||||||
runner.run(cmd, *args, env=env)
|
runner.prepare_run(cmd, *args, env=env)
|
||||||
|
runner.store_html('')
|
||||||
|
runner.store_text('')
|
||||||
|
|
||||||
data = blocker.args[0]
|
data = blocker.args[0]
|
||||||
ret_env = json.loads(data)
|
ret_env = json.loads(data)
|
||||||
@ -108,20 +112,16 @@ def test_custom_env(qtbot, monkeypatch, py_proc, runner):
|
|||||||
assert 'QUTEBROWSER_TEST_2' in ret_env
|
assert 'QUTEBROWSER_TEST_2' in ret_env
|
||||||
|
|
||||||
|
|
||||||
def test_temporary_files(qtbot, tmpdir, py_proc, runner):
|
def test_source(qtbot, py_proc, runner):
|
||||||
"""Make sure temporary files are passed and cleaned up correctly."""
|
"""Make sure the page source is read and cleaned up correctly."""
|
||||||
text_file = tmpdir / 'text'
|
|
||||||
text_file.write('This is text')
|
|
||||||
html_file = tmpdir / 'html'
|
|
||||||
html_file.write('This is HTML')
|
|
||||||
|
|
||||||
env = {'QUTE_TEXT': str(text_file), 'QUTE_HTML': str(html_file)}
|
|
||||||
|
|
||||||
cmd, args = py_proc(r"""
|
cmd, args = py_proc(r"""
|
||||||
import os
|
import os
|
||||||
import json
|
import json
|
||||||
|
|
||||||
data = {'html': None, 'text': None}
|
data = {
|
||||||
|
'html_file': os.environ['QUTE_HTML'],
|
||||||
|
'text_file': os.environ['QUTE_TEXT'],
|
||||||
|
}
|
||||||
|
|
||||||
with open(os.environ['QUTE_HTML'], 'r') as f:
|
with open(os.environ['QUTE_HTML'], 'r') as f:
|
||||||
data['html'] = f.read()
|
data['html'] = f.read()
|
||||||
@ -136,76 +136,85 @@ def test_temporary_files(qtbot, tmpdir, py_proc, runner):
|
|||||||
|
|
||||||
with qtbot.waitSignal(runner.finished, timeout=10000):
|
with qtbot.waitSignal(runner.finished, timeout=10000):
|
||||||
with qtbot.waitSignal(runner.got_cmd, timeout=10000) as blocker:
|
with qtbot.waitSignal(runner.got_cmd, timeout=10000) as blocker:
|
||||||
runner.run(cmd, *args, env=env)
|
runner.prepare_run(cmd, *args)
|
||||||
|
runner.store_html('This is HTML')
|
||||||
|
runner.store_text('This is text')
|
||||||
|
|
||||||
data = blocker.args[0]
|
data = blocker.args[0]
|
||||||
parsed = json.loads(data)
|
parsed = json.loads(data)
|
||||||
assert parsed['text'] == 'This is text'
|
assert parsed['text'] == 'This is text'
|
||||||
assert parsed['html'] == 'This is HTML'
|
assert parsed['html'] == 'This is HTML'
|
||||||
|
|
||||||
assert not text_file.exists()
|
assert not os.path.exists(parsed['text_file'])
|
||||||
assert not html_file.exists()
|
assert not os.path.exists(parsed['html_file'])
|
||||||
|
|
||||||
|
|
||||||
def test_command_with_error(qtbot, tmpdir, py_proc, runner):
|
def test_command_with_error(qtbot, py_proc, runner):
|
||||||
text_file = tmpdir / 'text'
|
|
||||||
text_file.write('This is text')
|
|
||||||
|
|
||||||
env = {'QUTE_TEXT': str(text_file)}
|
|
||||||
cmd, args = py_proc(r"""
|
cmd, args = py_proc(r"""
|
||||||
import sys
|
import sys, os, json
|
||||||
|
|
||||||
|
with open(os.environ['QUTE_FIFO'], 'w') as f:
|
||||||
|
json.dump(os.environ['QUTE_TEXT'], f)
|
||||||
|
f.write('\n')
|
||||||
|
|
||||||
sys.exit(1)
|
sys.exit(1)
|
||||||
""")
|
""")
|
||||||
|
|
||||||
with qtbot.waitSignal(runner.finished, timeout=10000):
|
with qtbot.waitSignal(runner.finished, timeout=10000):
|
||||||
runner.run(cmd, *args, env=env)
|
with qtbot.waitSignal(runner.got_cmd, timeout=10000) as blocker:
|
||||||
|
runner.prepare_run(cmd, *args)
|
||||||
|
runner.store_text('Hello World')
|
||||||
|
runner.store_html('')
|
||||||
|
|
||||||
assert not text_file.exists()
|
data = json.loads(blocker.args[0])
|
||||||
|
assert not os.path.exists(data)
|
||||||
|
|
||||||
|
|
||||||
def test_killed_command(qtbot, tmpdir, py_proc, runner):
|
def test_killed_command(qtbot, tmpdir, py_proc, runner):
|
||||||
text_file = tmpdir / 'text'
|
data_file = tmpdir / 'data'
|
||||||
text_file.write('This is text')
|
|
||||||
|
|
||||||
pidfile = tmpdir / 'pid'
|
|
||||||
watcher = QFileSystemWatcher()
|
watcher = QFileSystemWatcher()
|
||||||
watcher.addPath(str(tmpdir))
|
watcher.addPath(str(tmpdir))
|
||||||
|
|
||||||
env = {'QUTE_TEXT': str(text_file)}
|
|
||||||
cmd, args = py_proc(r"""
|
cmd, args = py_proc(r"""
|
||||||
import os
|
import os
|
||||||
import time
|
import time
|
||||||
import sys
|
import sys
|
||||||
|
import json
|
||||||
|
|
||||||
|
data = {
|
||||||
|
'pid': os.getpid(),
|
||||||
|
'text_file': os.environ['QUTE_TEXT'],
|
||||||
|
}
|
||||||
|
|
||||||
# We can't use QUTE_FIFO to transmit the PID because that wouldn't work
|
# We can't use QUTE_FIFO to transmit the PID because that wouldn't work
|
||||||
# on Windows, where QUTE_FIFO is only monitored after the script has
|
# on Windows, where QUTE_FIFO is only monitored after the script has
|
||||||
# exited.
|
# exited.
|
||||||
|
|
||||||
with open(sys.argv[1], 'w') as f:
|
with open(sys.argv[1], 'w') as f:
|
||||||
f.write(str(os.getpid()))
|
json.dump(data, f)
|
||||||
|
|
||||||
time.sleep(30)
|
time.sleep(30)
|
||||||
""")
|
""")
|
||||||
args.append(str(pidfile))
|
args.append(str(data_file))
|
||||||
|
|
||||||
with qtbot.waitSignal(watcher.directoryChanged, timeout=10000):
|
with qtbot.waitSignal(watcher.directoryChanged, timeout=10000):
|
||||||
runner.run(cmd, *args, env=env)
|
runner.prepare_run(cmd, *args)
|
||||||
|
runner.store_text('Hello World')
|
||||||
|
runner.store_html('')
|
||||||
|
|
||||||
# Make sure the PID was written to the file, not just the file created
|
# Make sure the PID was written to the file, not just the file created
|
||||||
time.sleep(0.5)
|
time.sleep(0.5)
|
||||||
|
|
||||||
|
data = json.load(data_file)
|
||||||
|
|
||||||
with qtbot.waitSignal(runner.finished):
|
with qtbot.waitSignal(runner.finished):
|
||||||
os.kill(int(pidfile.read()), signal.SIGTERM)
|
os.kill(int(data['pid']), signal.SIGTERM)
|
||||||
|
|
||||||
assert not text_file.exists()
|
assert not os.path.exists(data['text_file'])
|
||||||
|
|
||||||
|
|
||||||
def test_temporary_files_failed_cleanup(caplog, qtbot, tmpdir, py_proc,
|
def test_temporary_files_failed_cleanup(caplog, qtbot, py_proc, runner):
|
||||||
runner):
|
|
||||||
"""Delete a temporary file from the script so cleanup fails."""
|
"""Delete a temporary file from the script so cleanup fails."""
|
||||||
test_file = tmpdir / 'test'
|
|
||||||
test_file.write('foo')
|
|
||||||
|
|
||||||
cmd, args = py_proc(r"""
|
cmd, args = py_proc(r"""
|
||||||
import os
|
import os
|
||||||
os.remove(os.environ['QUTE_HTML'])
|
os.remove(os.environ['QUTE_HTML'])
|
||||||
@ -213,10 +222,12 @@ def test_temporary_files_failed_cleanup(caplog, qtbot, tmpdir, py_proc,
|
|||||||
|
|
||||||
with caplog.at_level(logging.ERROR):
|
with caplog.at_level(logging.ERROR):
|
||||||
with qtbot.waitSignal(runner.finished, timeout=10000):
|
with qtbot.waitSignal(runner.finished, timeout=10000):
|
||||||
runner.run(cmd, *args, env={'QUTE_HTML': str(test_file)})
|
runner.prepare_run(cmd, *args)
|
||||||
|
runner.store_text('')
|
||||||
|
runner.store_html('')
|
||||||
|
|
||||||
assert len(caplog.records) == 1
|
assert len(caplog.records) == 1
|
||||||
expected = "Failed to delete tempfile {} (".format(test_file)
|
expected = "Failed to delete tempfile"
|
||||||
assert caplog.records[0].message.startswith(expected)
|
assert caplog.records[0].message.startswith(expected)
|
||||||
|
|
||||||
|
|
||||||
@ -224,30 +235,4 @@ def test_dummy_runner(qtbot):
|
|||||||
runner = userscripts._DummyUserscriptRunner(0)
|
runner = userscripts._DummyUserscriptRunner(0)
|
||||||
with pytest.raises(cmdexc.CommandError):
|
with pytest.raises(cmdexc.CommandError):
|
||||||
with qtbot.waitSignal(runner.finished):
|
with qtbot.waitSignal(runner.finished):
|
||||||
runner.run('cmd', 'arg')
|
runner.prepare_run('cmd', 'arg')
|
||||||
|
|
||||||
|
|
||||||
def test_store_source_none():
|
|
||||||
assert userscripts.store_source(None) == {}
|
|
||||||
|
|
||||||
|
|
||||||
def test_store_source(stubs):
|
|
||||||
expected_text = 'This is text'
|
|
||||||
expected_html = 'This is HTML'
|
|
||||||
|
|
||||||
frame = stubs.FakeWebFrame(plaintext=expected_text, html=expected_html)
|
|
||||||
env = userscripts.store_source(frame)
|
|
||||||
|
|
||||||
with open(env['QUTE_TEXT'], 'r', encoding='utf-8') as f:
|
|
||||||
text = f.read()
|
|
||||||
with open(env['QUTE_HTML'], 'r', encoding='utf-8') as f:
|
|
||||||
html = f.read()
|
|
||||||
|
|
||||||
os.remove(env['QUTE_TEXT'])
|
|
||||||
os.remove(env['QUTE_HTML'])
|
|
||||||
|
|
||||||
assert set(env.keys()) == {'QUTE_TEXT', 'QUTE_HTML'}
|
|
||||||
assert text == expected_text
|
|
||||||
assert html == expected_html
|
|
||||||
assert env['QUTE_TEXT'].endswith('.txt')
|
|
||||||
assert env['QUTE_HTML'].endswith('.html')
|
|
||||||
|
Loading…
Reference in New Issue
Block a user