1
0
mirror of https://github.com/vikstrous/pirate-get synced 2025-01-09 09:59:51 +01:00

reimplement sorting

This commit is contained in:
Michele Guerini Rocco 2020-05-22 10:04:15 +02:00
parent 8eefc63226
commit 03c5a396e1
Signed by: rnhmjoj
GPG Key ID: BFBAF4C975F76450
5 changed files with 65 additions and 49 deletions

View File

@ -1,15 +1,15 @@
{ {
"TitleDsc": 1, "TitleDsc": [1, "name", true],
"TitleAsc": 2, "TitleAsc": [2, "name", false],
"DateDsc": 3, "DateDsc": [3, "raw_uploaded", true],
"DateAsc": 4, "DateAsc": [4, "raw_uploaded", false],
"SizeDsc": 5, "SizeDsc": [5, "raw_size", true],
"SizeAsc": 6, "SizeAsc": [6, "raw_size", false],
"SeedersDsc": 7, "SeedersDsc": [7, "seeders", true],
"SeedersAsc": 8, "SeedersAsc": [8, "seeders", false],
"LeechersDsc": 9, "LeechersDsc": [9, "leechers", true],
"LeechersAsc": 10, "LeechersAsc": [10, "leechers", false],
"CategoryDsc": 13, "CategoryDsc": [13, "category", true],
"CategoryAsc": 14, "CategoryAsc": [14, "category", false],
"Default": 99 "Default": [99, "seeders", true]
} }

View File

@ -344,7 +344,7 @@ def pirate_main(args):
cur_color = 'zebra_0' cur_color = 'zebra_0'
for key, value in sorted(pirate.data.sorts.items()): for key, value in sorted(pirate.data.sorts.items()):
cur_color = 'zebra_0' if cur_color == 'zebra_1' else 'zebra_1' cur_color = 'zebra_0' if cur_color == 'zebra_1' else 'zebra_1'
printer.print(str(value), '\t', key, sep='', color=cur_color) printer.print(str(value[0]), '\t', key, sep='', color=cur_color)
return return
# fetch torrents # fetch torrents

View File

@ -33,13 +33,42 @@ def parse_sort(printer, sort):
sort = int(sort) sort = int(sort)
except ValueError: except ValueError:
pass pass
if sort in pirate.data.sorts.values(): for key, val in pirate.data.sorts.items():
return sort if sort == key or sort == val[0]:
elif sort in pirate.data.sorts.keys(): return val[1:]
return pirate.data.sorts[sort]
else: else:
printer.print('Invalid sort ignored', color='WARN') printer.print('Invalid sort ignored', color='WARN')
return 99 return pirate.data.sorts['Default'][1:]
def parse_page(page):
results = []
try:
data = json.load(page)
except json.decoder.JSONDecodeError:
raise IOError('invalid JSON in API reply: blocked mirror?')
if len(data) == 1 and 'No results' in data[0]['name']:
return results
for res in data:
res['raw_size'] = int(res['size'])
res['size'] = pretty_size(int(res['size']))
res['magnet'] = build_magnet(res['name'], res['info_hash'])
res['info_hash'] = int(res['info_hash'], 16)
res['raw_uploaded'] = int(res['added'])
res['uploaded'] = pretty_date(res['added'])
res['seeders'] = int(res['seeders'])
res['leechers'] = int(res['leechers'])
res['category'] = int(res['category'])
results.append(res)
return results
def sort_results(sort, res):
key, reverse = sort
return sorted(res, key=lambda x: x[key], reverse=reverse)
def pretty_size(size): def pretty_size(size):
@ -59,31 +88,11 @@ def pretty_date(ts):
return date.strftime('%Y-%m-%d %H:%M') return date.strftime('%Y-%m-%d %H:%M')
def make_magnet(name, info_hash): def build_magnet(name, info_hash):
return 'magnet:?xt=urn:btih:{}&dn={}'.format( return 'magnet:?xt=urn:btih:{}&dn={}'.format(
info_hash, parse.quote(name, '')) info_hash, parse.quote(name, ''))
def parse_page(page):
results = []
try:
data = json.load(page)
except json.decoder.JSONDecodeError:
raise IOError('invalid JSON in API reply: blocked mirror?')
if len(data) == 1 and 'No results' in data[0]['name']:
return results
for res in data:
res['size'] = pretty_size(int(res['size']))
res['magnet'] = make_magnet(res['name'], res['info_hash'])
res['info_hash'] = int(res['info_hash'], 16)
res['uploaded'] = pretty_date(res['added'])
results.append(res)
return results
def build_request_path(mode, category, terms): def build_request_path(mode, category, terms):
if mode == 'search': if mode == 'search':
query = '/q.php?q={}&cat={}'.format(' '.join(terms), category) query = '/q.php?q={}&cat={}'.format(' '.join(terms), category)
@ -114,12 +123,12 @@ def remote(printer, category, sort, mode, terms, mirror, timeout):
if f.info().get('Content-Encoding') == 'gzip': if f.info().get('Content-Encoding') == 'gzip':
f = gzip.GzipFile(fileobj=BytesIO(f.read())) f = gzip.GzipFile(fileobj=BytesIO(f.read()))
return parse_page(f)
except KeyboardInterrupt: except KeyboardInterrupt:
printer.print('\nCancelled.') printer.print('\nCancelled.')
sys.exit(0) sys.exit(0)
return sort_results(sort, parse_page(f))
def find_api(mirror, timeout): def find_api(mirror, timeout):
# try common paths # try common paths

File diff suppressed because one or more lines are too long

View File

@ -38,6 +38,7 @@ class TestTorrent(unittest.TestCase):
expected = json.load(file) expected = json.load(file)
with util.open_data('debian_iso.json') as res: with util.open_data('debian_iso.json') as res:
actual = pirate.torrent.parse_page(res) actual = pirate.torrent.parse_page(res)
json.dump(actual, open('result.json', 'w'))
self.assertEqual(actual, expected) self.assertEqual(actual, expected)
def test_parse_category(self): def test_parse_category(self):
@ -54,13 +55,17 @@ class TestTorrent(unittest.TestCase):
def test_parse_sort(self): def test_parse_sort(self):
sort = pirate.torrent.parse_sort(MagicMock(Printer), 'SeedersDsc') sort = pirate.torrent.parse_sort(MagicMock(Printer), 'SeedersDsc')
self.assertEqual(7, sort) self.assertEqual(['seeders', True], sort)
sort = pirate.torrent.parse_sort(MagicMock(Printer), 'CategoryAsc')
self.assertEqual(['category', False], sort)
sort = pirate.torrent.parse_sort(MagicMock(Printer), 'DateAsc')
self.assertEqual(['raw_uploaded', False], sort)
sort = pirate.torrent.parse_sort(MagicMock(Printer), '7') sort = pirate.torrent.parse_sort(MagicMock(Printer), '7')
self.assertEqual(7, sort) self.assertEqual(['seeders', True], sort)
sort = pirate.torrent.parse_sort(MagicMock(Printer), 'asdf') sort = pirate.torrent.parse_sort(MagicMock(Printer), 'asdf')
self.assertEqual(99, sort) self.assertEqual(['seeders', True], sort)
sort = pirate.torrent.parse_sort(MagicMock(Printer), '7000') sort = pirate.torrent.parse_sort(MagicMock(Printer), '7000')
self.assertEqual(99, sort) self.assertEqual(['seeders', True], sort)
def test_request_path(self): def test_request_path(self):
# the args are (mode, category, terms) # the args are (mode, category, terms)
@ -144,10 +149,12 @@ class TestTorrent(unittest.TestCase):
info = mock.MagicMock(return_value=MockInfo()) info = mock.MagicMock(return_value=MockInfo())
res_obj = MockResponse() res_obj = MockResponse()
sort = pirate.torrent.parse_sort(MagicMock(Printer), 10)
with patch('urllib.request.Request', return_value=req_obj) as req: with patch('urllib.request.Request', return_value=req_obj) as req:
with patch('urllib.request.urlopen', return_value=res_obj) as res: with patch('urllib.request.urlopen', return_value=res_obj) as res:
results = pirate.torrent.remote( results = pirate.torrent.remote(
MagicMock(Printer), 100, 10, 'top', MagicMock(Printer), 100, sort, 'top',
[], 'http://example.com', 9) [], 'http://example.com', 9)
req.assert_called_once_with( req.assert_called_once_with(
'http://example.com/precompiled/data_top100_100.json', 'http://example.com/precompiled/data_top100_100.json',