diff --git a/pirate/pirate.py b/pirate/pirate.py index 258edd3..e657573 100755 --- a/pirate/pirate.py +++ b/pirate/pirate.py @@ -7,6 +7,7 @@ import socket import urllib.request as request import urllib.error import sys +from collections import OrderedDict import webbrowser @@ -182,12 +183,12 @@ def combine_configs(config, args): args.action = 'browse' elif args.recent: args.action = 'recent' - elif len(args.search) == 0: - args.action = 'top' elif args.list_categories: args.action = 'list_categories' elif args.list_sorts: args.action = 'list_sorts' + elif len(args.search) == 0: + args.action = 'top' else: args.action = 'search' @@ -226,8 +227,9 @@ def combine_configs(config, args): return args -def search_mirrors(args): - mirrors = {'https://thepiratebay.mn'} +def search_mirrors(pages, category, sort, action, search): + mirrors = OrderedDict() + mirrors['https://thepiratebay.mn'] = None try: req = request.Request('https://proxybay.co/list.txt', headers=pirate.data.default_headers) @@ -237,19 +239,21 @@ def search_mirrors(args): else: if f.getcode() != 200: raise IOError('The proxy bay responded with an error.') - mirrors = mirrors.union([i.decode('utf-8').strip() - for i in f.readlines()][3:] - ).difference(pirate.data.blacklist) + for mirror in [i.decode('utf-8').strip() for i in f.readlines()][3:]: + mirrors[mirror] = None + for mirror in pirate.data.blacklist: + if mirror in mirrors: + del mirrors[mirror] - for mirror in mirrors: + for mirror in mirrors.keys(): try: print('Trying', mirror, end='... \n') results = pirate.torrent.remote( - pages=args.pages, - category=pirate.torrent.parse_category(args.category), - sort=pirate.torrent.parse_sort(args.sort), - mode=args.action, - terms=args.search, + pages=pages, + category=pirate.torrent.parse_category(category), + sort=pirate.torrent.parse_sort(sort), + mode=action, + terms=search, mirror=mirror ) except (urllib.error.URLError, socket.timeout, @@ -296,7 +300,7 @@ def main(): if args.source == 'local_tpb': results = pirate.local.search(args.database, args.search) elif args.source == 'tpb': - results, site = search_mirrors(args) + results, site = search_mirrors(args.pages, args.category, args.sort, args.action, args.search) if len(results) == 0: print('No results') diff --git a/tests/test_pirate.py b/tests/test_pirate.py index fcf567c..5f6d45c 100755 --- a/tests/test_pirate.py +++ b/tests/test_pirate.py @@ -1,5 +1,9 @@ #!/usr/bin/env python3 +import socket import unittest +from unittest import mock +from unittest.mock import patch, call + import pirate.pirate @@ -83,19 +87,48 @@ class TestPirate(unittest.TestCase): def test_parse_args(self): tests = [ - (['-b'], {'action': 'browse'}), - ([], {'action': 'top'}), - (['-R'], {'action': 'recent'}), - (['internets'], {'action': 'search', 'search': ['internets']}), - (['internets lol', 'lel'], {'action': 'search', 'search': ['internets lol', 'lel']}), + ('', ['-b'], {'action': 'browse'}), + ('', [], {'action': 'top'}), + ('', ['-R'], {'action': 'recent'}), + ('', ['-l'], {'action': 'list_categories'}), + ('', ['--list_sorts'], {'action': 'list_sorts'}), + ('', ['term'], {'action': 'search', 'source': 'tpb'}), + ('', ['-L', 'filename', 'term'], {'action': 'search', 'source': 'local_tpb', 'database': 'filename'}), + ('', ['term', '-S', 'dir'], {'action': 'search', 'save_directory': 'dir'}), + ('', ['-P', '1337'], {'transmission_command': ['transmission-remote', '1337']}), + ('', ['term'], {'output': 'browser_open'}), + ('', ['term', '-t'], {'output': 'transmission'}), + ('', ['term', '--save-magnets'], {'output': 'save_magnet_files'}), + ('', ['term', '--save-torrents'], {'output': 'save_torrent_files'}), + ('', ['term', '-C', 'command'], {'output': 'open_command', 'open_command': 'command'}), + ('', ['internets'], {'action': 'search', 'search': ['internets']}), + ('', ['internets lol', 'lel'], {'action': 'search', 'search': ['internets lol', 'lel']}), ] for test in tests: - args = pirate.pirate.parse_args(test[0]) - config = pirate.pirate.parse_config_file('') + args = pirate.pirate.parse_args(test[1]) + config = pirate.pirate.parse_config_file(test[0]) args = pirate.pirate.combine_configs(config, args) - for option in test[1].keys(): + for option in test[2].keys(): value = getattr(args, option) - self.assertEqual(test[1][option], value) + self.assertEqual(test[2][option], value) + + def test_search_mirrors(self): + pages, category, sort, action, search = (1, 100, 10, 'browse', []) + class MockResponse(): + readlines = mock.MagicMock(return_value=[x.encode('utf-8') for x in ['', '', '', 'https://example.com']]) + info = mock.MagicMock() + getcode = mock.MagicMock(return_value=200) + response_obj = MockResponse() + with patch('urllib.request.urlopen', return_value=response_obj) as urlopen: + with patch('pirate.torrent.remote', return_value=[]) as remote: + results, mirror = pirate.pirate.search_mirrors(pages, category, sort, action, search) + remote.assert_called_once_with(pages=1, category=100, sort=10, mode='browse', terms=[], mirror='https://thepiratebay.mn') + with patch('pirate.torrent.remote', side_effect=[socket.timeout, []]) as remote: + results, mirror = pirate.pirate.search_mirrors(pages, category, sort, action, search) + remote.assert_has_calls([ + call(pages=1, category=100, sort=10, mode='browse', terms=[], mirror='https://thepiratebay.mn'), + call(pages=1, category=100, sort=10, mode='browse', terms=[], mirror='https://example.com') + ]) if __name__ == '__main__': unittest.main() diff --git a/tests/test_torrent.py b/tests/test_torrent.py index 137e2df..d8fb4ad 100755 --- a/tests/test_torrent.py +++ b/tests/test_torrent.py @@ -109,15 +109,30 @@ class TestTorrent(unittest.TestCase): pirate.torrent.save_magnets([0], [{'magnet':magnet}], 'path') open_.assert_called_once_with('path/Test Drive Unlimited [PC Version].magnet', 'w') - def test_get_torrent(self): - with patch('urllib.request.urlopen') as urlopen: - class MockResponse(): - add_header = mock.MagicMock() - response = MockResponse() - with patch('urllib.request.Request', return_value=response) as request: - pirate.torrent.get_torrent(100000000000000) - request.assert_called_once_with('http://torcache.net/torrent/5AF3107A4000.torrent', headers=pirate.data.default_headers) - urlopen.assert_called_once_with(response, timeout=pirate.data.default_timeout) + @patch('urllib.request.urlopen') + def test_get_torrent(self, urlopen): + class MockRequest(): + add_header = mock.MagicMock() + request_obj = MockRequest() + with patch('urllib.request.Request', return_value=request_obj) as request: + pirate.torrent.get_torrent(100000000000000) + request.assert_called_once_with('http://torcache.net/torrent/5AF3107A4000.torrent', headers=pirate.data.default_headers) + urlopen.assert_called_once_with(request_obj, timeout=pirate.data.default_timeout) + + def test_remote(self): + class MockRequest(): + add_header = mock.MagicMock() + request_obj = MockRequest() + class MockResponse(): + read = mock.MagicMock(return_value='No hits. Try adding an asterisk in you search phrase.'.encode('utf8')) + info = mock.MagicMock() + response_obj = MockResponse() + with patch('urllib.request.Request', return_value=request_obj) as request: + with patch('urllib.request.urlopen', return_value=response_obj) as urlopen: + res = pirate.torrent.remote(1, 100, 10, 'browse', [], 'http://example.com') + request.assert_called_once_with('http://example.com/browse/100/0/10', headers=pirate.data.default_headers) + urlopen.assert_called_once_with(request_obj, timeout=pirate.data.default_timeout) + self.assertEqual(res, []) if __name__ == '__main__': unittest.main()