Use named placeholders for sql queries.

This commit is contained in:
Ryan Roden-Corrent 2017-06-08 21:43:00 -04:00
parent 862f8d3188
commit 22b7b21d5a
5 changed files with 92 additions and 66 deletions

View File

@ -96,30 +96,31 @@ class WebHistory(sql.SqlTable):
self._between_query = sql.Query('SELECT * FROM History '
'where not redirect '
'and not url like "qute://%" '
'and atime > ? '
'and atime <= ? '
'and atime > :earliest '
'and atime <= :latest '
'ORDER BY atime desc')
self._before_query = sql.Query('SELECT * FROM History '
'where not redirect '
'and not url like "qute://%" '
'and atime <= ? '
'and atime <= :latest '
'ORDER BY atime desc '
'limit ? offset ?')
'limit :limit offset :offset')
def __repr__(self):
return utils.get_repr(self, length=len(self))
def __contains__(self, url):
return self._contains_query.run([url]).value()
return self._contains_query.run(val=url).value()
def _add_entry(self, entry):
"""Add an entry to the in-memory database."""
self.insert([entry.url_str(), entry.title, int(entry.atime),
entry.redirect])
self.insert(url=entry.url_str(), title=entry.title,
atime=int(entry.atime), redirect=entry.redirect)
if not entry.redirect:
self.completion.insert([entry.url_str(), entry.title,
int(entry.atime)], replace=True)
self.completion.insert_or_replace(url=entry.url_str(),
title=entry.title,
last_atime=int(entry.atime))
def get_recent(self):
"""Get the most recent history entries."""
@ -132,7 +133,7 @@ class WebHistory(sql.SqlTable):
earliest: Omit timestamps earlier than this.
latest: Omit timestamps later than this.
"""
self._between_query.run([earliest, latest])
self._between_query.run(earliest=earliest, latest=latest)
return iter(self._between_query)
def entries_before(self, latest, limit, offset):
@ -143,7 +144,7 @@ class WebHistory(sql.SqlTable):
limit: Max number of entries to include.
offset: Number of entries to skip.
"""
self._before_query.run([latest, limit, offset])
self._before_query.run(latest=latest, limit=limit, offset=offset)
return iter(self._before_query)
@cmdutils.register(name='history-clear', instance='web-history')

View File

@ -49,7 +49,7 @@ class SqlCategory(QSqlQueryModel):
querystr = 'select {} from {} where ('.format(select, name)
# the incoming pattern will have literal % and _ escaped with '\'
# we need to tell sql to treat '\' as an escape character
querystr += ' or '.join("{} like ? escape '\\'".format(f)
querystr += ' or '.join("{} like :pattern escape '\\'".format(f)
for f in filter_fields)
querystr += ')'
@ -83,5 +83,5 @@ class SqlCategory(QSqlQueryModel):
pattern = re.sub(r' +', '%', pattern)
pattern = '%{}%'.format(pattern)
with debug.log_time('sql', 'Running completion query'):
self._query.run([pattern] * self._param_count)
self._query.run(pattern=pattern)
self.setQuery(self._query)

View File

@ -84,11 +84,11 @@ class Query(QSqlQuery):
rec = self.record()
yield rowtype(*[rec.value(i) for i in range(rec.count())])
def run(self, values=None):
def run(self, **values):
"""Execute the prepared query."""
log.sql.debug('Running SQL query: "{}"'.format(self.lastQuery()))
for val in values or []:
self.addBindValue(val)
for key, val in values.items():
self.bindValue(':{}'.format(key), val)
log.sql.debug('self bindings: {}'.format(self.boundValues()))
if not self.exec_():
raise SqlException('Failed to exec query "{}": "{}"'.format(
@ -162,7 +162,7 @@ class SqlTable(QObject):
Args:
field: Field to match.
"""
return Query("SELECT EXISTS(SELECT * FROM {} WHERE {} = ?)"
return Query("SELECT EXISTS(SELECT * FROM {} WHERE {} = :val)"
.format(self._name, field))
def __len__(self):
@ -181,23 +181,34 @@ class SqlTable(QObject):
Return:
The number of rows deleted.
"""
q = Query("DELETE FROM {} where {} = ?".format(self._name, field))
q.run([value])
q = Query("DELETE FROM {} where {} = :val".format(self._name, field))
q.run(val=value)
if not q.numRowsAffected():
raise KeyError('No row with {} = "{}"'.format(field, value))
self.changed.emit()
def insert(self, values, replace=False):
def insert(self, **values):
"""Append a row to the table.
Args:
values: A list of values to insert.
replace: If set, replace existing values.
"""
paramstr = ','.join(['?'] * len(values))
q = Query("INSERT {} INTO {} values({})".format(
'OR REPLACE' if replace else '', self._name, paramstr))
q.run(values)
paramstr = ','.join(':{}'.format(key) for key in values.keys())
q = Query("INSERT INTO {} values({})".format(self._name, paramstr))
q.run(**values)
self.changed.emit()
def insert_or_replace(self, **values):
"""Append a row to the table.
Args:
values: A list of values to insert.
replace: If set, replace existing values.
"""
paramstr = ','.join(':{}'.format(key) for key in values.keys())
q = Query("REPLACE INTO {} values({})".format(self._name, paramstr))
q.run(**values)
self.changed.emit()
def insert_batch(self, rows, replace=False):
@ -238,7 +249,7 @@ class SqlTable(QObject):
Return: A prepared and executed select query.
"""
q = Query('SELECT * FROM {} ORDER BY {} {} LIMIT ?'
q = Query('SELECT * FROM {} ORDER BY {} {} LIMIT :limit'
.format(self._name, sort_by, sort_order))
q.run([limit])
q.run(limit=limit)
return q

View File

@ -61,7 +61,7 @@ pytestmark = pytest.mark.usefixtures('init_sql')
def test_sorting(sort_by, sort_order, data, expected):
table = sql.SqlTable('Foo', ['a', 'b', 'c'])
for row in data:
table.insert(row)
table.insert(a=row[0], b=row[1], c=row[2])
cat = sqlcategory.SqlCategory('Foo', filter_fields=['a'], sort_by=sort_by,
sort_order=sort_order)
cat.set_pattern('')
@ -117,7 +117,7 @@ def test_set_pattern(pattern, filter_cols, before, after):
"""Validate the filtering and sorting results of set_pattern."""
table = sql.SqlTable('Foo', ['a', 'b', 'c'])
for row in before:
table.insert(row)
table.insert(a=row[0], b=row[1], c=row[2])
filter_fields = [['a', 'b', 'c'][i] for i in filter_cols]
cat = sqlcategory.SqlCategory('Foo', filter_fields=filter_fields)
cat.set_pattern(pattern)
@ -126,7 +126,7 @@ def test_set_pattern(pattern, filter_cols, before, after):
def test_select():
table = sql.SqlTable('Foo', ['a', 'b', 'c'])
table.insert(['foo', 'bar', 'baz'])
table.insert({'a': 'foo', 'b': 'bar', 'c': 'baz'})
cat = sqlcategory.SqlCategory('Foo', filter_fields=['a'], select='b, c, a')
cat.set_pattern('')
utils.validate_model(cat, [('bar', 'baz', 'foo')])
@ -134,8 +134,8 @@ def test_select():
def test_where():
table = sql.SqlTable('Foo', ['a', 'b', 'c'])
table.insert(['foo', 'bar', False])
table.insert(['baz', 'biz', True])
table.insert({'a': 'foo', 'b': 'bar', 'c': False})
table.insert({'a': 'baz', 'b': 'biz', 'c': True})
cat = sqlcategory.SqlCategory('Foo', filter_fields=['a'], where='not c')
cat.set_pattern('')
utils.validate_model(cat, [('foo', 'bar', False)])
@ -143,10 +143,10 @@ def test_where():
def test_group():
table = sql.SqlTable('Foo', ['a', 'b'])
table.insert(['foo', 1])
table.insert(['bar', 3])
table.insert(['foo', 2])
table.insert(['bar', 0])
table.insert({'a': 'foo', 'b': 1})
table.insert({'a': 'bar', 'b': 3})
table.insert({'a': 'foo', 'b': 2})
table.insert({'a': 'bar', 'b': 0})
cat = sqlcategory.SqlCategory('Foo', filter_fields=['a'],
select='a, max(b)', group_by='a')
cat.set_pattern('')

View File

@ -35,39 +35,53 @@ def test_init():
def test_insert(qtbot):
table = sql.SqlTable('Foo', ['name', 'val', 'lucky'])
with qtbot.waitSignal(table.changed):
table.insert(['one', 1, False])
table.insert(name='one', val=1, lucky=False)
with qtbot.waitSignal(table.changed):
table.insert(['wan', 1, False])
table.insert(name='wan', val=1, lucky=False)
def test_insert_or_replace(qtbot):
table = sql.SqlTable('Foo', ['name', 'val', 'lucky'],
constraints={'name': 'PRIMARY KEY'})
with qtbot.waitSignal(table.changed):
table.insert_or_replace(name='one', val=1, lucky=False)
with qtbot.waitSignal(table.changed):
table.insert_or_replace(name='one', val=11, lucky=True)
assert list(table) == [('one', 11, True)]
def test_iter():
table = sql.SqlTable('Foo', ['name', 'val', 'lucky'])
table.insert(['one', 1, False])
table.insert(['nine', 9, False])
table.insert(['thirteen', 13, True])
table.insert(name='one', val=1, lucky=False)
table.insert(name='nine', val=9, lucky=False)
table.insert(name='thirteen', val=13, lucky=True)
assert list(table) == [('one', 1, False),
('nine', 9, False),
('thirteen', 13, True)]
@pytest.mark.parametrize('rows, sort_by, sort_order, limit, result', [
([[2, 5], [1, 6], [3, 4]], 'a', 'asc', 5, [(1, 6), (2, 5), (3, 4)]),
([[2, 5], [1, 6], [3, 4]], 'a', 'desc', 3, [(3, 4), (2, 5), (1, 6)]),
([[2, 5], [1, 6], [3, 4]], 'b', 'desc', 2, [(1, 6), (2, 5)]),
([[2, 5], [1, 6], [3, 4]], 'a', 'asc', -1, [(1, 6), (2, 5), (3, 4)]),
([{"a": 2, "b": 5}, {"a": 1, "b": 6}, {"a": 3, "b": 4}], 'a', 'asc', 5,
[(1, 6), (2, 5), (3, 4)]),
([{"a": 2, "b": 5}, {"a": 1, "b": 6}, {"a": 3, "b": 4}], 'a', 'desc', 3,
[(3, 4), (2, 5), (1, 6)]),
([{"a": 2, "b": 5}, {"a": 1, "b": 6}, {"a": 3, "b": 4}], 'b', 'desc', 2,
[(1, 6), (2, 5)]),
([{"a": 2, "b": 5}, {"a": 1, "b": 6}, {"a": 3, "b": 4}], 'a', 'asc', -1,
[(1, 6), (2, 5), (3, 4)]),
])
def test_select(rows, sort_by, sort_order, limit, result):
table = sql.SqlTable('Foo', ['a', 'b'])
for row in rows:
table.insert(row)
table.insert(**row)
assert list(table.select(sort_by, sort_order, limit)) == result
def test_delete(qtbot):
table = sql.SqlTable('Foo', ['name', 'val', 'lucky'])
table.insert(['one', 1, False])
table.insert(['nine', 9, False])
table.insert(['thirteen', 13, True])
table.insert(name='one', val=1, lucky=False)
table.insert(name='nine', val=9, lucky=False)
table.insert(name='thirteen', val=13, lucky=True)
with pytest.raises(KeyError):
table.delete('nope', 'name')
with qtbot.waitSignal(table.changed):
@ -81,40 +95,40 @@ def test_delete(qtbot):
def test_len():
table = sql.SqlTable('Foo', ['name', 'val', 'lucky'])
assert len(table) == 0
table.insert(['one', 1, False])
table.insert(name='one', val=1, lucky=False)
assert len(table) == 1
table.insert(['nine', 9, False])
table.insert(name='nine', val=9, lucky=False)
assert len(table) == 2
table.insert(['thirteen', 13, True])
table.insert(name='thirteen', val=13, lucky=True)
assert len(table) == 3
def test_contains():
table = sql.SqlTable('Foo', ['name', 'val', 'lucky'])
table.insert(['one', 1, False])
table.insert(['nine', 9, False])
table.insert(['thirteen', 13, True])
table.insert(name='one', val=1, lucky=False)
table.insert(name='nine', val=9, lucky=False)
table.insert(name='thirteen', val=13, lucky=True)
name_query = table.contains_query('name')
val_query = table.contains_query('val')
lucky_query = table.contains_query('lucky')
assert name_query.run(['one']).value()
assert name_query.run(['thirteen']).value()
assert val_query.run([9]).value()
assert lucky_query.run([False]).value()
assert lucky_query.run([True]).value()
assert not name_query.run(['oone']).value()
assert not name_query.run([1]).value()
assert not name_query.run(['*']).value()
assert not val_query.run([10]).value()
assert name_query.run(val='one').value()
assert name_query.run(val='thirteen').value()
assert val_query.run(val=9).value()
assert lucky_query.run(val=False).value()
assert lucky_query.run(val=True).value()
assert not name_query.run(val='oone').value()
assert not name_query.run(val=1).value()
assert not name_query.run(val='*').value()
assert not val_query.run(val=10).value()
def test_delete_all(qtbot):
table = sql.SqlTable('Foo', ['name', 'val', 'lucky'])
table.insert(['one', 1, False])
table.insert(['nine', 9, False])
table.insert(['thirteen', 13, True])
table.insert(name='one', val=1, lucky=False)
table.insert(name='nine', val=9, lucky=False)
table.insert(name='thirteen', val=13, lucky=True)
with qtbot.waitSignal(table.changed):
table.delete_all()
assert list(table) == []