qutebrowser/qutebrowser/utils/qtutils.py
Florian Bruhin 35f0b26f4a PyQIODevice: Remove readinto().
Our implementation was broken, and the BufferedIOBase mixin does a better job
at doing this.
2015-05-26 20:57:01 +02:00

370 lines
10 KiB
Python

# vim: ft=python fileencoding=utf-8 sts=4 sw=4 et:
# Copyright 2014-2015 Florian Bruhin (The Compiler) <mail@qutebrowser.org>
#
# This file is part of qutebrowser.
#
# qutebrowser is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# qutebrowser is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with qutebrowser. If not, see <http://www.gnu.org/licenses/>.
"""Misc. utilities related to Qt.
Module attributes:
MAXVALS: A dictionary of C/Qt types (as string) mapped to their maximum
value.
MINVALS: A dictionary of C/Qt types (as string) mapped to their minimum
value.
"""
import io
import os
import sys
import operator
import distutils.version # pylint: disable=no-name-in-module,import-error
# https://bitbucket.org/logilab/pylint/issue/73/
import contextlib
from PyQt5.QtCore import (qVersion, QEventLoop, QDataStream, QByteArray,
QIODevice, QSaveFile)
from PyQt5.QtWidgets import QApplication
MAXVALS = {
'int': 2 ** 31 - 1,
'int64': 2 ** 63 - 1,
}
MINVALS = {
'int': -(2 ** 31),
'int64': -(2 ** 63),
}
def version_check(version, op=operator.ge):
"""Check if the Qt runtime version is the version supplied or newer.
Args:
version: The version to check against.
op: The operator to use for the check.
"""
# pylint: disable=no-member
# https://bitbucket.org/logilab/pylint/issue/73/
return op(distutils.version.StrictVersion(qVersion()),
distutils.version.StrictVersion(version))
def check_overflow(arg, ctype, fatal=True):
"""Check if the given argument is in bounds for the given type.
Args:
arg: The argument to check
ctype: The C/Qt type to check as a string.
fatal: Wether to raise exceptions (True) or truncate values (False)
Return
The truncated argument if fatal=False
The original argument if it's in bounds.
"""
maxval = MAXVALS[ctype]
minval = MINVALS[ctype]
if arg > maxval:
if fatal:
raise OverflowError(arg)
else:
return maxval
elif arg < minval:
if fatal:
raise OverflowError(arg)
else:
return minval
else:
return arg
def get_args(namespace):
"""Get the Qt QApplication arguments based on an argparse namespace.
Args:
namespace: The argparse namespace.
Return:
The argv list to be passed to Qt.
"""
argv = [sys.argv[0]]
for argname, val in vars(namespace).items():
if not argname.startswith('qt_'):
continue
elif val is None:
# flag/argument not given
continue
elif val is True:
argv.append('-' + argname[3:])
else:
argv.append('-' + argname[3:])
argv.append(val)
return argv
def check_print_compat():
"""Check if printing should work in the given Qt version."""
# WORKAROUND (remove this when we bump the requirements to 5.3.0)
return not (os.name == 'nt' and version_check('5.3.0', operator.lt))
def ensure_valid(obj):
"""Ensure a Qt object with an .isValid() method is valid."""
if not obj.isValid():
raise QtValueError(obj)
def ensure_not_null(obj):
"""Ensure a Qt object with an .isNull() method is not null."""
if obj.isNull():
raise QtValueError(obj, null=True)
def check_qdatastream(stream):
"""Check the status of a QDataStream and raise OSError if it's not ok."""
status_to_str = {
QDataStream.Ok: "The data stream is operating normally.",
QDataStream.ReadPastEnd: ("The data stream has read past the end of "
"the data in the underlying device."),
QDataStream.ReadCorruptData: "The data stream has read corrupt data.",
QDataStream.WriteFailed: ("The data stream cannot write to the "
"underlying device."),
}
if stream.status() != QDataStream.Ok:
raise OSError(status_to_str[stream.status()])
def serialize(obj):
"""Serialize an object into a QByteArray."""
data = QByteArray()
stream = QDataStream(data, QIODevice.WriteOnly)
serialize_stream(stream, obj)
return data
def deserialize(data, obj):
"""Deserialize an object from a QByteArray."""
stream = QDataStream(data, QIODevice.ReadOnly)
deserialize_stream(stream, obj)
def serialize_stream(stream, obj):
"""Serialize an object into a QDataStream."""
check_qdatastream(stream)
stream << obj # pylint: disable=pointless-statement
check_qdatastream(stream)
def deserialize_stream(stream, obj):
"""Deserialize a QDataStream into an object."""
check_qdatastream(stream)
stream >> obj # pylint: disable=pointless-statement
check_qdatastream(stream)
@contextlib.contextmanager
def savefile_open(filename, binary=False, encoding='utf-8'):
"""Context manager to easily use a QSaveFile."""
f = QSaveFile(filename)
cancelled = False
try:
ok = f.open(QIODevice.WriteOnly)
if not ok:
raise OSError(f.errorString())
if binary:
new_f = PyQIODevice(f)
else:
new_f = io.TextIOWrapper(PyQIODevice(f), encoding=encoding)
yield new_f
except:
f.cancelWriting()
cancelled = True
raise
else:
if new_f is not None:
new_f.flush()
finally:
commit_ok = f.commit()
if not commit_ok and not cancelled:
raise OSError("Commit failed!")
@contextlib.contextmanager
def unset_organization():
"""Temporarily unset QApplication.organizationName().
This is primarily needed in config.py.
"""
qapp = QApplication.instance()
orgname = qapp.organizationName()
qapp.setOrganizationName(None)
try:
yield
finally:
qapp.setOrganizationName(orgname)
class PyQIODevice(io.BufferedIOBase):
"""Wrapper for a QIODevice which provides a python interface.
Attributes:
_dev: The underlying QIODevice.
"""
# pylint: disable=missing-docstring
def __init__(self, dev):
self._dev = dev
def __len__(self):
return self._dev.size()
def _check_open(self):
"""Check if the device is open, raise ValueError if not."""
if not self._dev.isOpen():
raise ValueError("IO operation on closed device!")
def _check_random(self):
"""Check if the device supports random access, raise OSError if not."""
if not self.seekable():
raise OSError("Random access not allowed!")
def fileno(self):
raise io.UnsupportedOperation
def seek(self, offset, whence=io.SEEK_SET):
self._check_open()
self._check_random()
if whence == io.SEEK_SET:
ok = self._dev.seek(offset)
elif whence == io.SEEK_CUR:
ok = self._dev.seek(self.tell() + offset)
elif whence == io.SEEK_END:
ok = self._dev.seek(len(self) + offset)
else:
raise io.UnsupportedOperation("whence = {} is not "
"supported!".format(whence))
if not ok:
raise OSError(self._dev.errorString())
def truncate(self, size=None): # pylint: disable=unused-argument
raise io.UnsupportedOperation
def close(self):
self._dev.close()
@property
def closed(self):
return not self._dev.isOpen()
def flush(self):
self._check_open()
self._dev.waitForBytesWritten(-1)
def isatty(self):
self._check_open()
return False
def readable(self):
return self._dev.isReadable()
def readline(self, size=-1):
self._check_open()
if size == -1:
size = 0
return self._dev.readLine(size)
def seekable(self):
return not self._dev.isSequential()
def tell(self):
self._check_open()
self._check_random()
return self._dev.pos()
def writable(self):
return self._dev.isWritable()
def write(self, b):
self._check_open()
num = self._dev.write(b)
if num == -1 or num < len(b):
raise OSError(self._dev.errorString())
return num
def read(self, size=-1):
self._check_open()
self._check_readable()
if size == 0:
# Read no data
return b''
elif size < 0:
# Read all data
if self._dev.bytesAvailable() > 0:
buf = self._dev.readAll()
if not buf:
raise OSError(self._dev.errorString())
else:
return b''
else:
if self._dev.bytesAvailable() > 0:
buf = self._dev.read(size)
if not buf:
raise OSError(self._dev.errorString())
else:
return b''
return buf
class QtValueError(ValueError):
"""Exception which gets raised by ensure_valid."""
def __init__(self, obj, null=False):
try:
self.reason = obj.errorString()
except AttributeError:
self.reason = None
if null:
err = "{} is null".format(obj)
else:
err = "{} is not valid".format(obj)
if self.reason:
err += ": {}".format(self.reason)
super().__init__(err)
class EventLoop(QEventLoop):
"""A thin wrapper around QEventLoop.
Raises an exception when doing exec_() multiple times.
"""
def __init__(self, parent=None):
super().__init__(parent)
self._executing = False
def exec_(self, flags=QEventLoop.AllEvents):
"""Override exec_ to raise an exception when re-running."""
if self._executing:
raise AssertionError("Eventloop is already running!")
self._executing = True
super().exec_(flags)
self._executing = False