1
0
mirror of https://github.com/vikstrous/pirate-get synced 2025-01-10 10:04:21 +01:00

fix test falldown from new api change

This commit is contained in:
Michele Guerini Rocco 2020-05-21 14:21:55 +02:00
parent f9f491ea65
commit 7f879cc31f
Signed by: rnhmjoj
GPG Key ID: BFBAF4C975F76450
12 changed files with 229 additions and 816 deletions

View File

@ -286,7 +286,7 @@ def search_mirrors(printer, args):
# try default or user mirrors # try default or user mirrors
for mirror in args.mirror: for mirror in args.mirror:
result = connect_mirror(mirror, printer, args) result = connect_mirror(mirror, printer, args)
if result: if result is not None:
return result return result
# download mirror list # download mirror list
@ -308,7 +308,7 @@ def search_mirrors(printer, args):
if mirror in pirate.data.blacklist: if mirror in pirate.data.blacklist:
continue continue
result = connect_mirror(mirror, printer, args) result = connect_mirror(mirror, printer, args)
if result: if result is not None:
return result return result
else: else:
raise IOError('No more available mirrors') raise IOError('No more available mirrors')

View File

@ -9,8 +9,8 @@ import pirate.data
import pirate.torrent import pirate.torrent
import colorama import colorama
import veryprettytable as pretty
from veryprettytable import VeryPrettyTable
from io import BytesIO from io import BytesIO
@ -45,12 +45,12 @@ class Printer:
even = True even = True
if local: if local:
table = VeryPrettyTable(['LINK', 'DATE', 'SIZE', 'NAME']) table = pretty.VeryPrettyTable(['LINK', 'DATE', 'SIZE', 'NAME'])
table.align['SIZE'] = 'r' table.align['SIZE'] = 'r'
table.align['NAME'] = 'l' table.align['NAME'] = 'l'
else: else:
table = VeryPrettyTable(['LINK', 'SEED', 'LEECH', table = pretty.VeryPrettyTable(['LINK', 'SEED', 'LEECH',
'RATIO', 'SIZE', 'RATIO', 'SIZE',
'UPLOAD', 'NAME']) 'UPLOAD', 'NAME'])
table.align['NAME'] = 'l' table.align['NAME'] = 'l'
@ -99,7 +99,7 @@ class Printer:
for link in chosen_links: for link in chosen_links:
result = results[link] result = results[link]
req = request.Request( req = request.Request(
site + '/t.php?id=' + result['id'], site + '/t.php?id=' + str(result['id']),
headers=pirate.data.default_headers) headers=pirate.data.default_headers)
req.add_header('Accept-encoding', 'gzip') req.add_header('Accept-encoding', 'gzip')
f = request.urlopen(req, timeout=timeout) f = request.urlopen(req, timeout=timeout)
@ -128,7 +128,7 @@ class Printer:
for link in chosen_links: for link in chosen_links:
result = results[link] result = results[link]
req = request.Request( req = request.Request(
site + '/f.php?id=' + result['id'], site + '/f.php?id=' + str(result['id']),
headers=pirate.data.default_headers) headers=pirate.data.default_headers)
req.add_header('Accept-encoding', 'gzip') req.add_header('Accept-encoding', 'gzip')
f = request.urlopen(req, timeout=timeout) f = request.urlopen(req, timeout=timeout)

View File

@ -64,9 +64,27 @@ def make_magnet(name, info_hash):
info_hash, parse.quote(name, '')) info_hash, parse.quote(name, ''))
def remote(printer, category, sort, mode, terms, mirror, timeout): def parse_page(page):
results = [] results = []
try:
data = json.load(page)
except json.decoder.JSONDecodeError:
raise IOError('invalid JSON in API reply: blocked mirror?')
if len(data) == 1 and 'No results' in data[0]['name']:
return results
for res in data:
res['size'] = pretty_size(int(res['size']))
res['magnet'] = make_magnet(res['name'], res['info_hash'])
res['info_hash'] = int(res['info_hash'], 16)
res['uploaded'] = pretty_date(res['added'])
results.append(res)
return results
def remote(printer, category, sort, mode, terms, mirror, timeout):
# special query when no terms # special query when no terms
if not terms: if not terms:
if category == 0: if category == 0:
@ -87,19 +105,7 @@ def remote(printer, category, sort, mode, terms, mirror, timeout):
if f.info().get('Content-Encoding') == 'gzip': if f.info().get('Content-Encoding') == 'gzip':
f = gzip.GzipFile(fileobj=BytesIO(f.read())) f = gzip.GzipFile(fileobj=BytesIO(f.read()))
data = json.load(f) return parse_page(f)
if len(data) == 1 and 'No results' in data[0]['name']:
return []
for res in data:
res['size'] = pretty_size(int(res['size']))
res['magnet'] = make_magnet(res['name'], res['info_hash'])
res['info_hash'] = int(res['info_hash'], 16)
res['uploaded'] = pretty_date(res['added'])
results.append(res)
return results
except KeyboardInterrupt: except KeyboardInterrupt:
printer.print('\nCancelled.') printer.print('\nCancelled.')

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

1
tests/data/no_hits.json Normal file
View File

@ -0,0 +1 @@
[{"id":"0","name":"No results returned","info_hash":"0000000000000000000000000000000000000000","leechers":"0","seeders":"0","num_files":"0","size":"0","username":"","added":"0","status":"member","category":"0","imdb":"","total_found":"1"}]

1
tests/data/result.json Normal file

File diff suppressed because one or more lines are too long

View File

@ -1,7 +1,6 @@
#!/usr/bin/env python3 #!/usr/bin/env python3
import socket import socket
import unittest import unittest
import subprocess
from argparse import Namespace from argparse import Namespace
from unittest import mock from unittest import mock
from unittest.mock import patch, call, MagicMock from unittest.mock import patch, call, MagicMock
@ -28,33 +27,40 @@ class TestPirate(unittest.TestCase):
@patch('subprocess.call') @patch('subprocess.call')
def test_main(self, mock_call): def test_main(self, mock_call):
result = { result = {
'magnet': 'dn=derp', 'name': 'derp',
'seeds': '1', 'magnet': 'magnet:?xt=urn:btih:deadbeef&dn=derp',
'seeders': '1',
'leechers': '1', 'leechers': '1',
'size': ('1', 'mb'), 'size': ('1', 'mb'),
'uploaded': '1', 'uploaded': '1',
} }
with patch('pirate.torrent.remote', return_value=[result]) as mock_remote: with patch('pirate.torrent.remote', return_value=[result]):
config = pirate.pirate.parse_config_file('') config = pirate.pirate.parse_config_file('')
args = pirate.pirate.combine_configs(config, pirate.pirate.parse_args(['-0', 'term', '-C', 'blah %s'])) args = pirate.pirate.combine_configs(
config,
pirate.pirate.parse_args(['-0', 'term', '-C', 'blah %s']))
pirate.pirate.pirate_main(args) pirate.pirate.pirate_main(args)
mock_call.assert_called_once_with(['blah', 'dn=derp']) mock_call.assert_called_once_with(
['blah', 'magnet:?xt=urn:btih:deadbeef&dn=derp'])
@patch('pirate.pirate.builtins.input', return_value='0') @patch('pirate.pirate.builtins.input', return_value='0')
@patch('subprocess.call') @patch('subprocess.call')
def test_main_choice(self, mock_call, mock_input): def test_main_choice(self, mock_call, mock_input):
result = { result = {
'magnet': 'dn=derp', 'name': 'derp',
'seeds': '1', 'magnet': 'magnet:?xt=urn:btih:deadbeef&dn=derp',
'seeders': '1',
'leechers': '1', 'leechers': '1',
'size': ('1', 'mb'), 'size': ('1', 'mb'),
'uploaded': '1', 'uploaded': '1',
} }
with patch('pirate.torrent.remote', return_value=[result]) as mock_remote: with patch('pirate.torrent.remote', return_value=[result]):
config = pirate.pirate.parse_config_file('') config = pirate.pirate.parse_config_file('')
args = pirate.pirate.combine_configs(config, pirate.pirate.parse_args(['term', '-C', 'blah %s'])) args = pirate.pirate.combine_configs(
config, pirate.pirate.parse_args(['term', '-C', 'blah %s']))
pirate.pirate.pirate_main(args) pirate.pirate.pirate_main(args)
mock_call.assert_called_once_with(['blah', 'dn=derp']) mock_call.assert_called_once_with(
['blah', 'magnet:?xt=urn:btih:deadbeef&dn=derp'])
def test_parse_torrent_command(self): def test_parse_torrent_command(self):
tests = [ tests = [
@ -74,7 +80,9 @@ class TestPirate(unittest.TestCase):
[['1-3'], (None, [1, 2, 3])], [['1-3'], (None, [1, 2, 3])],
] ]
for test in tests: for test in tests:
self.assertEqual(pirate.pirate.parse_torrent_command(*test[0]), test[1]) self.assertEqual(
pirate.pirate.parse_torrent_command(*test[0]),
test[1])
def test_parse_config_file(self): def test_parse_config_file(self):
types = { types = {
@ -128,16 +136,30 @@ class TestPirate(unittest.TestCase):
('', ['-l'], {'action': 'list_categories'}), ('', ['-l'], {'action': 'list_categories'}),
('', ['--list_sorts'], {'action': 'list_sorts'}), ('', ['--list_sorts'], {'action': 'list_sorts'}),
('', ['term'], {'action': 'search', 'source': 'tpb'}), ('', ['term'], {'action': 'search', 'source': 'tpb'}),
('', ['-L', 'filename', 'term'], {'action': 'search', 'source': 'local_tpb', 'database': 'filename'}), ('',
('', ['term', '-S', 'dir'], {'action': 'search', 'save_directory': 'dir'}), ['-L', 'filename', 'term'],
('', ['-E', 'localhost:1337'], {'transmission_command': ['transmission-remote', 'localhost:1337']}), {'action': 'search', 'source': 'local_tpb',
'database': 'filename'}),
('',
['term', '-S', 'dir'],
{'action': 'search', 'save_directory': 'dir'}),
('',
['-E', 'localhost:1337'],
{'transmission_command':
['transmission-remote', 'localhost:1337']}),
('', ['term'], {'output': 'browser_open'}), ('', ['term'], {'output': 'browser_open'}),
('', ['term', '-t'], {'output': 'transmission'}), ('', ['term', '-t'], {'output': 'transmission'}),
('', ['term', '--save-magnets'], {'output': 'save_magnet_files'}), ('', ['term', '--save-magnets'], {'output': 'save_magnet_files'}),
('', ['term', '--save-torrents'], {'output': 'save_torrent_files'}), ('',
('', ['term', '-C', 'command'], {'output': 'open_command', 'open_command': 'command'}), ['term', '-C', 'command'],
{'output': 'open_command', 'open_command': 'command'}),
('', ['internets'], {'action': 'search', 'search': ['internets']}), ('', ['internets'], {'action': 'search', 'search': ['internets']}),
('', ['internets lol', 'lel'], {'action': 'search', 'search': ['internets lol', 'lel']}), ('',
['term', '--save-torrents'],
{'output': 'save_torrent_files'}),
('',
['internets lol', 'lel'],
{'action': 'search', 'search': ['internets lol', 'lel']}),
] ]
for test in tests: for test in tests:
args = pirate.pirate.parse_args(test[1]) args = pirate.pirate.parse_args(test[1])
@ -148,29 +170,36 @@ class TestPirate(unittest.TestCase):
self.assertEqual(test[2][option], value) self.assertEqual(test[2][option], value)
def test_search_mirrors(self): def test_search_mirrors(self):
args = Namespace(pages=1, category=100, sort=10, args = Namespace(
category=100, sort=10,
action='browse', search=[], action='browse', search=[],
mirror=[pirate.data.default_mirror]) mirror=[pirate.data.default_mirror],
timeout=pirate.data.default_timeout)
class MockResponse(): class MockResponse():
readlines = mock.MagicMock(return_value=[x.encode('utf-8') for x in ['', '', '', 'https://example.com']]) readlines = mock.MagicMock(
return_value=[
x.encode('utf-8') for x in
['', '', '', 'https://example.com']])
info = mock.MagicMock() info = mock.MagicMock()
getcode = mock.MagicMock(return_value=200) getcode = mock.MagicMock(return_value=200)
response_obj = MockResponse() response_obj = MockResponse()
returns = [None, ([], 'https://example.com')]
printer = MagicMock(Printer) printer = MagicMock(Printer)
with patch('urllib.request.urlopen', return_value=response_obj) as urlopen: with patch('pirate.pirate.connect_mirror',
with patch('pirate.torrent.remote', return_value=[]) as remote: side_effect=returns) as connect:
results, mirror = pirate.pirate.search_mirrors(printer, args) with patch('urllib.request.urlopen', return_value=response_obj):
self.assertEqual(results, [])
self.assertEqual(mirror, pirate.data.default_mirror)
remote.assert_called_once_with(printer=printer, pages=1, category=100, sort=10, mode='browse', terms=[], mirror=pirate.data.default_mirror)
with patch('pirate.torrent.remote', side_effect=[socket.timeout, []]) as remote:
results, mirror = pirate.pirate.search_mirrors(printer, args) results, mirror = pirate.pirate.search_mirrors(printer, args)
connect.assert_has_calls([
call(pirate.data.default_mirror, printer, args),
call('https://example.com', printer, args)])
self.assertEqual(results, []) self.assertEqual(results, [])
self.assertEqual(mirror, 'https://example.com') self.assertEqual(mirror, 'https://example.com')
remote.assert_has_calls([
call(printer=printer, pages=1, category=100, sort=10, mode='browse', terms=[], mirror=pirate.data.default_mirror),
call(printer=printer, pages=1, category=100, sort=10, mode='browse', terms=[], mirror='https://example.com')
])
if __name__ == '__main__': if __name__ == '__main__':
unittest.main() unittest.main()

View File

@ -1,8 +1,9 @@
#!/usr/bin/env python3 #!/usr/bin/env python3
import os import os
import unittest import unittest
from unittest.mock import patch, call, MagicMock import json
from unittest.mock import patch, call, MagicMock
from pirate.print import Printer from pirate.print import Printer
@ -19,17 +20,21 @@ class TestPrint(unittest.TestCase):
mock = MockTable() mock = MockTable()
printer = Printer(False) printer = Printer(False)
printer.print = MagicMock() printer.print = MagicMock()
with patch('veryprettytable.VeryPrettyTable', return_value=mock) as prettytable: with patch('veryprettytable.VeryPrettyTable',
return_value=mock) as prettytable:
results = [{ results = [{
'magnet': 'dn=name', 'name': 'name',
'seeds': 1, 'seeders': 1,
'leechers': 2, 'leechers': 2,
'size': ['3','MiB'], 'size': '3.0 MiB',
'uploaded': 'never' 'uploaded': 'never'
}] }]
printer.search_results(results) printer.search_results(results)
prettytable.assert_called_once_with(['LINK', 'SEED', 'LEECH', 'RATIO', 'SIZE', 'UPLOAD', 'NAME']) prettytable.assert_called_once_with([
mock.add_row.assert_has_calls([call([0, 1, 2, '0.5', '3.0 MiB', 'never', 'name'])]) 'LINK', 'SEED', 'LEECH', 'RATIO',
'SIZE', 'UPLOAD', 'NAME'])
mock.add_row.assert_has_calls([
call([0, 1, 2, '0.5', '3.0 MiB', 'never', 'name'])])
def test_print_results_local(self): def test_print_results_local(self):
class MockTable: class MockTable:
@ -38,19 +43,22 @@ class TestPrint(unittest.TestCase):
mock = MockTable() mock = MockTable()
printer = Printer(False) printer = Printer(False)
printer.print = MagicMock() printer.print = MagicMock()
with patch('veryprettytable.VeryPrettyTable', return_value=mock) as prettytable: with patch('veryprettytable.VeryPrettyTable',
return_value=mock) as prettytable:
results = [{ results = [{
'magnet': 'dn=name', 'name': 'name1',
'date': '1', 'date': '1',
'size': '1', 'size': '1',
}, { }, {
'magnet': 'dn=name2', 'name': 'name2',
'date': '2', 'date': '2',
'size': '2', 'size': '2',
}] }]
printer.search_results(results, local=True) printer.search_results(results, local=True)
prettytable.assert_called_once_with(['LINK', 'DATE', 'SIZE', 'NAME']) prettytable.assert_called_once_with(
mock.add_row.assert_has_calls([call([0, '1', '1', 'name']), call([1, '2', '2', 'name2'])]) ['LINK', 'DATE', 'SIZE', 'NAME'])
mock.add_row.assert_has_calls(
[call([0, '1', '1', 'name1']), call([1, '2', '2', 'name2'])])
def test_print_color(self): def test_print_color(self):
printer = Printer(False) printer = Printer(False)
@ -69,57 +77,77 @@ class TestPrint(unittest.TestCase):
mock = MockTable() mock = MockTable()
printer = Printer(True) printer = Printer(True)
printer.print = MagicMock() printer.print = MagicMock()
with patch('veryprettytable.VeryPrettyTable', return_value=mock) as prettytable: with patch('veryprettytable.VeryPrettyTable',
return_value=mock) as prettytable:
results = [{ results = [{
'magnet': 'dn=name', 'name': 'name1',
'date': '1', 'date': '1',
'size': '1', 'size': '1',
}, { }, {
'magnet': 'dn=name2', 'name': 'name2',
'date': '2', 'date': '2',
'size': '2', 'size': '2',
}] }]
printer.search_results(results, local=True) printer.search_results(results, local=True)
prettytable.assert_called_once_with(['LINK', 'DATE', 'SIZE', 'NAME']) prettytable.assert_called_once_with(
mock.add_row.assert_has_calls([call([0, '1', '1', 'name']), call([1, '2', '2', 'name2'], fore_color='blue')]) ['LINK', 'DATE', 'SIZE', 'NAME'])
mock.add_row.assert_has_calls([
call([0, '1', '1', 'name1']),
call([1, '2', '2', 'name2'], fore_color='blue')])
def test_print_descriptions(self): def test_print_descriptions(self):
printer = Printer(False) printer = Printer(False)
printer.print = MagicMock() printer.print = MagicMock()
class MockRequest(): class MockRequest():
add_header = MagicMock() add_header = MagicMock()
request_obj = MockRequest() request_obj = MockRequest()
class MockResponse(): class MockResponse():
read = MagicMock(return_value='<html><div class="nfo"><pre>stuff <a href="href">link</a></pre></div></html>'.encode('utf8')) read = MagicMock(return_value=json.dumps(
{'name': 'cool torrent',
'descr': 'A fake torrent.\n'}))
info = MagicMock() info = MagicMock()
response_obj = MockResponse() response_obj = MockResponse()
class MockOpener():
open = MagicMock(return_value=response_obj) with patch('urllib.request.Request', return_value=request_obj):
add_handler = MagicMock() with patch('urllib.request.urlopen',
opener_obj = MockOpener() return_value=response_obj):
with patch('urllib.request.Request', return_value=request_obj) as request: printer.descriptions([0], [{'id': '1', 'name': 'name'}],
with patch('urllib.request.OpenerDirector', return_value=opener_obj) as opener: 'example.com', 9)
printer.descriptions([0], [{'id': '1', 'magnet': 'dn=name'}], 'example.com') printer.print.assert_has_calls([
printer.print.assert_has_calls([call('Description for "name":', color='zebra_1'),call('stuff [link](href)', color='zebra_0')]) call('Description for "name":', color='zebra_1'),
call('A fake torrent.\n', color='zebra_0')])
def test_print_file_lists(self): def test_print_file_lists(self):
printer = Printer(False) printer = Printer(False)
printer.print = MagicMock() printer.print = MagicMock()
class MockRequest(): class MockRequest():
add_header = MagicMock() add_header = MagicMock()
info = MagicMock()
request_obj = MockRequest() request_obj = MockRequest()
class MockResponse(): class MockResponse():
read = MagicMock(return_value='<html><tr><td align="left">1.</td><td align="right">filename</tr></html>'.encode('utf8')) read = MagicMock(return_value=json.dumps(
[{'name': ['readme.txt'], 'size': [16]},
{'name': ['a.mkv'], 'size': [677739464]},
{'name': ['b.nfo'], 'size': [61]}]))
info = MagicMock() info = MagicMock()
response_obj = MockResponse() response_obj = MockResponse()
class MockOpener():
open = MagicMock(return_value=response_obj) with patch('urllib.request.Request',
add_handler = MagicMock() return_value=request_obj):
opener_obj = MockOpener() with patch('urllib.request.urlopen',
with patch('urllib.request.Request', return_value=request_obj) as request: return_value=response_obj):
with patch('urllib.request.OpenerDirector', return_value=opener_obj) as opener: printer.file_lists([0], [{'id': '1', 'name': 'name'}],
printer.file_lists([0], [{'id': '1', 'magnet': 'dn=name'}], 'example.com') 'example.com', 9)
printer.print.assert_has_calls([call('Files in "name":', color='zebra_1'),call(' 1. filename', color='zebra_0')]) printer.print.assert_has_calls([
call('Files in name:', color='zebra_1'),
call(' 16 B readme.txt', color='zebra_0'),
call(' 646.3 MiB a.mkv', color='zebra_1'),
call(' 61 B b.nfo', color='zebra_0')])
if __name__ == '__main__': if __name__ == '__main__':
unittest.main() unittest.main()

View File

@ -2,48 +2,34 @@
import unittest import unittest
from unittest import mock from unittest import mock
from unittest.mock import patch, MagicMock from unittest.mock import patch, MagicMock
import os
import io import io
import urllib import urllib
import json
import pirate.torrent import pirate.torrent
import pirate.data import pirate.data
from pirate.print import Printer from pirate.print import Printer
from tests import util from tests import util
class TestTorrent(unittest.TestCase): class TestTorrent(unittest.TestCase):
def test_no_hits(self): def test_no_hits(self):
res = util.read_data('no_hits.html')
actual = pirate.torrent.parse_page(res)
expected = [] expected = []
with util.open_data('no_hits.json') as res:
actual = pirate.torrent.parse_page(res)
self.assertEqual(actual, expected) self.assertEqual(actual, expected)
def test_blocked_mirror(self): def test_blocked_mirror(self):
res = util.read_data('blocked.html') with util.open_data('blocked.html') as res:
with self.assertRaises(IOError): with self.assertRaises(IOError):
pirate.torrent.parse_page(res) pirate.torrent.parse_page(res)
def test_search_results(self): def test_search_results(self):
res = util.read_data('dan_bull_search.html') with util.open_data('result.json') as file:
expected = json.load(file)
with util.open_data('debian_iso.json') as res:
actual = pirate.torrent.parse_page(res) actual = pirate.torrent.parse_page(res)
expected = [
{'uploaded': '04-04\xa02014', 'seeds': '16', 'leechers': '1', 'id': '9890864', 'magnet': 'magnet:?xt=urn:btih:30df4f8b42b8fd77f5e5aa34abbffe97f5e81fbf&dn=Dan+Croll+%26bull%3B+Sweet+Disarray+%5B2014%5D+320&tr=udp%3A%2F%2Ftracker.openbittorrent.com%3A80&tr=udp%3A%2F%2Fopen.demonii.com%3A1337&tr=udp%3A%2F%2Ftracker.coppersurfer.tk%3A6969&tr=udp%3A%2F%2Fexodus.desync.com%3A6969', 'size': ['89.33', 'MiB']},
{'uploaded': '03-02\xa02014', 'seeds': '4', 'leechers': '0', 'id': '9684858', 'magnet': 'magnet:?xt=urn:btih:7abd3eda600996b8e6fc9a61b83288e0c6ac0d83&dn=Dan+Bull+-+Massive+Collection&tr=udp%3A%2F%2Ftracker.openbittorrent.com%3A80&tr=udp%3A%2F%2Fopen.demonii.com%3A1337&tr=udp%3A%2F%2Ftracker.coppersurfer.tk%3A6969&tr=udp%3A%2F%2Fexodus.desync.com%3A6969', 'size': ['294', 'MiB']},
{'uploaded': '01-19\xa02013', 'seeds': '2', 'leechers': '0', 'id': '8037968', 'magnet': 'magnet:?xt=urn:btih:8f8d68fd0a51237c89692c428ed8a8f64a969c70&dn=Dan+Bull+-+Generation+Gaming+-+2013&tr=udp%3A%2F%2Ftracker.openbittorrent.com%3A80&tr=udp%3A%2F%2Fopen.demonii.com%3A1337&tr=udp%3A%2F%2Ftracker.coppersurfer.tk%3A6969&tr=udp%3A%2F%2Fexodus.desync.com%3A6969', 'size': ['54.86', 'MiB']},
{'uploaded': '01-21\xa02010', 'seeds': '1', 'leechers': '0', 'id': '5295449', 'magnet': 'magnet:?xt=urn:btih:3da6a0fdc1d67a768cb32597e926abdf3e1a2fdd&dn=Dan+Bull+Collection&tr=udp%3A%2F%2Ftracker.openbittorrent.com%3A80&tr=udp%3A%2F%2Fopen.demonii.com%3A1337&tr=udp%3A%2F%2Ftracker.coppersurfer.tk%3A6969&tr=udp%3A%2F%2Fexodus.desync.com%3A6969', 'size': ['236.78', 'MiB']},
{'uploaded': '09-02\xa02014', 'seeds': '1', 'leechers': '0', 'id': '10954408', 'magnet': 'magnet:?xt=urn:btih:5cd371a235317319db7da52c64422f9c2ac75d77&dn=Dan+Bull+-+The+Garden+%7B2014-Album%7D&tr=udp%3A%2F%2Ftracker.openbittorrent.com%3A80&tr=udp%3A%2F%2Fopen.demonii.com%3A1337&tr=udp%3A%2F%2Ftracker.coppersurfer.tk%3A6969&tr=udp%3A%2F%2Fexodus.desync.com%3A6969', 'size': ['36.27', 'MiB']},
{'uploaded': '09-27\xa02009', 'seeds': '0', 'leechers': '1', 'id': '5101630', 'magnet': 'magnet:?xt=urn:btih:4e14dbd077c920875be4c15971b23b609ad6716a&dn=Dan+Bull+-+Dear+Lily+%5Ban+open+letter+to+Lily+Allen%5D+-+2009%5BMP3+%40&tr=udp%3A%2F%2Ftracker.openbittorrent.com%3A80&tr=udp%3A%2F%2Fopen.demonii.com%3A1337&tr=udp%3A%2F%2Ftracker.coppersurfer.tk%3A6969&tr=udp%3A%2F%2Fexodus.desync.com%3A6969', 'size': ['5.51', 'MiB']},
{'uploaded': '11-29\xa02009', 'seeds': '0', 'leechers': '0', 'id': '5185893', 'magnet': 'magnet:?xt=urn:btih:5d9319cf852f7462422cb1bffc37b65174645047&dn=Dan+Bull+-+Dear+Mandy+%5Ban+open+letter+to+Lord+Mandelson%5D&tr=udp%3A%2F%2Ftracker.openbittorrent.com%3A80&tr=udp%3A%2F%2Fopen.demonii.com%3A1337&tr=udp%3A%2F%2Ftracker.coppersurfer.tk%3A6969&tr=udp%3A%2F%2Fexodus.desync.com%3A6969', 'size': ['5.07', 'MiB']},
{'uploaded': '11-10\xa02011', 'seeds': '0', 'leechers': '0', 'id': '6806996', 'magnet': 'magnet:?xt=urn:btih:1c54af57426f53fdef4bbf1a9dbddf32f7b4988a&dn=Dan+Bull+-+Dear+Lily+%28Lily+Allen%29+%28Song+about+filesharing%29&tr=udp%3A%2F%2Ftracker.openbittorrent.com%3A80&tr=udp%3A%2F%2Fopen.demonii.com%3A1337&tr=udp%3A%2F%2Ftracker.coppersurfer.tk%3A6969&tr=udp%3A%2F%2Fexodus.desync.com%3A6969', 'size': ['5.34', 'MiB']},
{'uploaded': '12-20\xa02011', 'seeds': '0', 'leechers': '0', 'id': '6901871', 'magnet': 'magnet:?xt=urn:btih:942c5bf3e1e9bc263939e13cea6ad7bd5f62aa36&dn=Dan+Bull+-+SOPA+Cabana.mp3&tr=udp%3A%2F%2Ftracker.openbittorrent.com%3A80&tr=udp%3A%2F%2Fopen.demonii.com%3A1337&tr=udp%3A%2F%2Ftracker.coppersurfer.tk%3A6969&tr=udp%3A%2F%2Fexodus.desync.com%3A6969', 'size': ['4.8', 'MiB']},
{'uploaded': '12-21\xa02011', 'seeds': '0', 'leechers': '1', 'id': '6902247', 'magnet': 'magnet:?xt=urn:btih:d376f68a31b0db652234e790ed7256ac5e32db57&dn=Dan+Bull+-+SOPA+Cabana&tr=udp%3A%2F%2Ftracker.openbittorrent.com%3A80&tr=udp%3A%2F%2Fopen.demonii.com%3A1337&tr=udp%3A%2F%2Ftracker.coppersurfer.tk%3A6969&tr=udp%3A%2F%2Fexodus.desync.com%3A6969', 'size': ['3.4', 'MiB']},
{'uploaded': '12-21\xa02011', 'seeds': '0', 'leechers': '1', 'id': '6903548', 'magnet': 'magnet:?xt=urn:btih:28163770a532eb24b9e0865878288a9bbdb7a5e6&dn=Dan+Bull+-+SOPA+Cabana+%5BWORKING%5D&tr=udp%3A%2F%2Ftracker.openbittorrent.com%3A80&tr=udp%3A%2F%2Fopen.demonii.com%3A1337&tr=udp%3A%2F%2Ftracker.coppersurfer.tk%3A6969&tr=udp%3A%2F%2Fexodus.desync.com%3A6969', 'size': ['4.8', 'MiB']},
{'uploaded': '03-09\xa02012', 'seeds': '0', 'leechers': '1', 'id': '7088979', 'magnet': 'magnet:?xt=urn:btih:779ab0f13a3fbb12ba68b27721491e4d143f26eb&dn=Dan+Bull+-+Bye+Bye+BPI+2012++%5BMP3%40192%5D%28oan%29&tr=udp%3A%2F%2Ftracker.openbittorrent.com%3A80&tr=udp%3A%2F%2Fopen.demonii.com%3A1337&tr=udp%3A%2F%2Ftracker.coppersurfer.tk%3A6969&tr=udp%3A%2F%2Fexodus.desync.com%3A6969', 'size': ['60.72', 'MiB']},
{'uploaded': '10-24\xa02012', 'seeds': '0', 'leechers': '0', 'id': '7756344', 'magnet': 'magnet:?xt=urn:btih:2667e4795bd5c868dedcabcb52943f4bb7212bab&dn=Dan+Bull+-+Dishonored+%5BExplicit+ver.%5D+%28Single+2012%29&tr=udp%3A%2F%2Ftracker.openbittorrent.com%3A80&tr=udp%3A%2F%2Fopen.demonii.com%3A1337&tr=udp%3A%2F%2Ftracker.coppersurfer.tk%3A6969&tr=udp%3A%2F%2Fexodus.desync.com%3A6969', 'size': ['6.29', 'MiB']},
{'uploaded': '11-10\xa02012', 'seeds': '0', 'leechers': '0', 'id': '7812951', 'magnet': 'magnet:?xt=urn:btih:16364f83c556ad0fd3bb57a4a7c890e7e8087414&dn=Halo+4+EPIC+Rap+By+Dan+Bull&tr=udp%3A%2F%2Ftracker.openbittorrent.com%3A80&tr=udp%3A%2F%2Fopen.demonii.com%3A1337&tr=udp%3A%2F%2Ftracker.coppersurfer.tk%3A6969&tr=udp%3A%2F%2Fexodus.desync.com%3A6969', 'size': ['6.41', 'MiB']},
{'uploaded': '01-19\xa02013', 'seeds': '0', 'leechers': '1', 'id': '8037899', 'magnet': 'magnet:?xt=urn:btih:843b466d9fd1f0bee3a476573b272dc2d6d0ebae&dn=Dan+Bull+-+Generation+Gaming+-+2013&tr=udp%3A%2F%2Ftracker.openbittorrent.com%3A80&tr=udp%3A%2F%2Fopen.demonii.com%3A1337&tr=udp%3A%2F%2Ftracker.coppersurfer.tk%3A6969&tr=udp%3A%2F%2Fexodus.desync.com%3A6969', 'size': ['54.87', 'MiB']}
]
self.assertEqual(actual, expected) self.assertEqual(actual, expected)
def test_parse_category(self): def test_parse_category(self):
@ -92,51 +78,74 @@ class TestTorrent(unittest.TestCase):
@patch('pirate.torrent.get_torrent') @patch('pirate.torrent.get_torrent')
def test_save_torrents(self, get_torrent): def test_save_torrents(self, get_torrent):
with patch('pirate.torrent.open', mock.mock_open(), create=True) as open_: with patch('pirate.torrent.open',
magnet = 'magnet:?xt=urn:btih:335fcd3cfbecc85554616d73de888033c6c16d37&dn=Test+Drive+Unl\im/ited+%5BPC+Version%5D&tr=udp%3A%2F%2Ftracker.openbittorrent.com%3A80&tr=udp%3A%2F%2Fopen.demonii.com%3A1337&tr=udp%3A%2F%2Ftracker.coppersurfer.tk%3A6969&tr=udp%3A%2F%2Fexodus.desync.com%3A6969' mock.mock_open(), create=True) as open_:
pirate.torrent.save_torrents(MagicMock(Printer), [0], [{'magnet':magnet}], 'path') pirate.torrent.save_torrents(
get_torrent.assert_called_once_with(293294978876299923284263767676068334936407502135) MagicMock(Printer), [0],
open_.assert_called_once_with('path/Test Drive Unl_im_ited [PC Version].torrent', 'wb') [{'name': 'cool torrent',
'info_hash': 3735928559,
'magnet': 'magnet:?xt=urn:btih:deadbeef'}], 'path', 9)
get_torrent.assert_called_once_with(3735928559, 9)
open_.assert_called_once_with('path/cool torrent.torrent', 'wb')
@patch('pirate.torrent.get_torrent', side_effect=urllib.error.HTTPError('', '', '', '', io.StringIO())) @patch('pirate.torrent.get_torrent',
side_effect=urllib.error.HTTPError('', '', '', '', io.StringIO()))
def test_save_torrents_fail(self, get_torrent): def test_save_torrents_fail(self, get_torrent):
magnet = 'magnet:?xt=urn:btih:335fcd3cfbecc85554616d73de888033c6c16d37&dn=Test+Drive+Unlimited+%5BPC+Version%5D&tr=udp%3A%2F%2Ftracker.openbittorrent.com%3A80&tr=udp%3A%2F%2Fopen.demonii.com%3A1337&tr=udp%3A%2F%2Ftracker.coppersurfer.tk%3A6969&tr=udp%3A%2F%2Fexodus.desync.com%3A6969' pirate.torrent.save_torrents(
pirate.torrent.save_torrents(MagicMock(Printer), [0], [{'magnet':magnet}], 'path') MagicMock(Printer), [0],
[{'name': 'cool torrent',
'info_hash': 3735928559,
'magnet': 'magnet:?xt=urn:btih:deadbeef'}], 'path', 9)
def test_save_magnets(self): def test_save_magnets(self):
with patch('pirate.torrent.open', mock.mock_open(), create=True) as open_: with patch('pirate.torrent.open',
magnet = 'magnet:?xt=urn:btih:335fcd3cfbecc85554616d73de888033c6c16d37&dn=Test+Drive+Unl\im/ited+%5BPC+Version%5D&tr=udp%3A%2F%2Ftracker.openbittorrent.com%3A80&tr=udp%3A%2F%2Fopen.demonii.com%3A1337&tr=udp%3A%2F%2Ftracker.coppersurfer.tk%3A6969&tr=udp%3A%2F%2Fexodus.desync.com%3A6969' mock.mock_open(), create=True) as open_:
pirate.torrent.save_magnets(MagicMock(Printer), [0], [{'magnet':magnet}], 'path') pirate.torrent.save_magnets(
open_.assert_called_once_with('path/Test Drive Unl_im_ited [PC Version].magnet', 'w') MagicMock(Printer), [0],
[{'name': 'cool torrent',
'info_hash': 3735928559,
'magnet': 'magnet:?xt=urn:btih:deadbeef'}], 'path')
open_.assert_called_once_with('path/cool torrent.magnet', 'w')
@patch('urllib.request.urlopen') @patch('urllib.request.urlopen')
def test_get_torrent(self, urlopen): def test_get_torrent(self, urlopen):
class MockRequest(): class MockRequest():
add_header = mock.MagicMock() add_header = mock.MagicMock()
request_obj = MockRequest() request_obj = MockRequest()
with patch('urllib.request.Request', return_value=request_obj) as request: with patch('urllib.request.Request', return_value=request_obj) as req:
pirate.torrent.get_torrent(100000000000000) pirate.torrent.get_torrent(100000000000000, 9)
request.assert_called_once_with('http://itorrents.org/torrent/5AF3107A4000.torrent', headers=pirate.data.default_headers) req.assert_called_once_with(
urlopen.assert_called_once_with(request_obj, timeout=pirate.data.default_timeout) 'http://itorrents.org/torrent/5AF3107A4000.torrent',
headers=pirate.data.default_headers)
urlopen.assert_called_once_with(
request_obj,
timeout=9)
def test_remote(self): def test_remote(self):
class MockRequest(): class MockRequest():
add_header = mock.MagicMock() add_header = mock.MagicMock()
request_obj = MockRequest() req_obj = MockRequest()
class MockInfo():
get_content_type = mock.MagicMock(return_value='application/json')
get = mock.MagicMock()
class MockResponse(): class MockResponse():
read = mock.MagicMock(return_value=b'<html>No hits. Try adding an asterisk in you search phrase.</html>') read = mock.MagicMock(return_value=b'[]')
info = mock.MagicMock() info = mock.MagicMock(return_value=MockInfo())
response_obj = MockResponse() res_obj = MockResponse()
class MockOpener():
open = mock.MagicMock(return_value=response_obj) with patch('urllib.request.Request', return_value=req_obj) as req:
add_handler = mock.MagicMock() with patch('urllib.request.urlopen', return_value=res_obj) as res:
opener_obj = MockOpener() results = pirate.torrent.remote(
with patch('urllib.request.Request', return_value=request_obj) as request: MagicMock(Printer), 100, 10, 'browse',
with patch('urllib.request.OpenerDirector', return_value=opener_obj) as opener: [], 'http://example.com', 9)
res = pirate.torrent.remote(MagicMock(Printer), 1, 100, 10, 'browse', [], 'http://example.com') req.assert_called_once_with(
request.assert_called_once_with('http://example.com/browse/100/0/10', headers=pirate.data.default_headers) 'http://example.com/precompiled/data_top100_100.json',
opener_obj.open.assert_called_once_with(request_obj, timeout=pirate.data.default_timeout) headers=pirate.data.default_headers)
self.assertEqual(res, []) res.assert_called_once_with(req_obj, timeout=9)
self.assertEqual(results, [])
if __name__ == '__main__': if __name__ == '__main__':
unittest.main() unittest.main()

View File

@ -3,6 +3,5 @@ import os
def data_path(name): def data_path(name):
return os.path.join(os.path.dirname(os.path.realpath(__file__)), 'data', name) return os.path.join(os.path.dirname(os.path.realpath(__file__)), 'data', name)
def read_data(name): def open_data(name):
with open(data_path(name)) as f: return open(data_path(name))
return f.read()