1
0
mirror of https://github.com/vikstrous/pirate-get synced 2025-01-25 12:24:20 +01:00

tidy up search_mirrors function

This commit is contained in:
rnhmjoj 2016-08-31 21:48:25 +02:00
parent 00aad06acd
commit 2f40523db2
No known key found for this signature in database
GPG Key ID: 362BB82B7E496B7C
3 changed files with 23 additions and 13 deletions

View File

@ -11,3 +11,6 @@ blacklist = set(json.loads(get_resource('blacklist.json').decode()))
default_headers = {'User-Agent': 'pirate get'} default_headers = {'User-Agent': 'pirate get'}
default_timeout = 10 default_timeout = 10
default_mirror = 'https://thepiratebay.org/'
mirror_list = 'https://proxybay.co/list.txt'

View File

@ -241,6 +241,7 @@ def connect_mirror(mirror, printer, pages, category, sort, action, search):
mirror=mirror) mirror=mirror)
except (urllib.error.URLError, socket.timeout, IOError, ValueError): except (urllib.error.URLError, socket.timeout, IOError, ValueError):
printer.print('Failed', color='WARN') printer.print('Failed', color='WARN')
return None
else: else:
printer.print('Ok', color='alt') printer.print('Ok', color='alt')
return results, mirror return results, mirror
@ -248,21 +249,21 @@ def connect_mirror(mirror, printer, pages, category, sort, action, search):
def search_mirrors(printer, *args): def search_mirrors(printer, *args):
# try official site # try official site
result = connect_mirror('https://thepiratebay.mn', printer, *args) result = connect_mirror(pirate.data.default_mirror, printer, *args)
if result: if result:
return result return result
# download mirror list # download mirror list
try: try:
req = request.Request('https://proxybay.co/list.txt', req = request.Request(pirate.data.mirror_list,
headers=pirate.data.default_headers) headers=pirate.data.default_headers)
f = request.urlopen(req, timeout=pirate.data.default_timeout) f = request.urlopen(req, timeout=pirate.data.default_timeout)
except IOError: except urllib.error.URLError as e:
printer.print('Could not fetch mirrors :(', color='ERROR') raise IOError('Could not fetch mirrors', e.reason)
sys.exit(1)
if f.getcode() != 200: if f.getcode() != 200:
raise IOError('The proxy bay responded with an error') raise IOError('The proxy bay responded with an error',
f.read().decode('utf-8'))
mirrors = [i.decode('utf-8').strip() for i in f.readlines()][3:] mirrors = [i.decode('utf-8').strip() for i in f.readlines()][3:]
@ -274,8 +275,7 @@ def search_mirrors(printer, *args):
if result: if result:
return result return result
else: else:
printer.print('No more available mirrors :(', color='ERROR') raise IOError('No more available mirrors')
sys.exit(1)
def pirate_main(args): def pirate_main(args):
@ -311,8 +311,14 @@ def pirate_main(args):
if args.source == 'local_tpb': if args.source == 'local_tpb':
results = pirate.local.search(args.database, args.search) results = pirate.local.search(args.database, args.search)
elif args.source == 'tpb': elif args.source == 'tpb':
results, site = search_mirrors(printer, args.pages, args.category, try:
args.sort, args.action, args.search) results, site = search_mirrors(printer, args.pages, args.category,
args.sort, args.action, args.search)
except IOError as e:
printer.print(e.args[0] + ' :( ', color='ERROR')
if len(e.args) > 1:
printer.print(e.args[1])
sys.exit(1)
if len(results) == 0: if len(results) == 0:
printer.print('No results') printer.print('No results')

View File

@ -6,6 +6,7 @@ from unittest import mock
from unittest.mock import patch, call, MagicMock from unittest.mock import patch, call, MagicMock
import pirate.pirate import pirate.pirate
import pirate.data
from pirate.print import Printer from pirate.print import Printer
@ -157,14 +158,14 @@ class TestPirate(unittest.TestCase):
with patch('pirate.torrent.remote', return_value=[]) as remote: with patch('pirate.torrent.remote', return_value=[]) as remote:
results, mirror = pirate.pirate.search_mirrors(printer, pages, category, sort, action, search) results, mirror = pirate.pirate.search_mirrors(printer, pages, category, sort, action, search)
self.assertEqual(results, []) self.assertEqual(results, [])
self.assertEqual(mirror, 'https://thepiratebay.mn') self.assertEqual(mirror, pirate.data.default_mirror)
remote.assert_called_once_with(printer=printer, pages=1, category=100, sort=10, mode='browse', terms=[], mirror='https://thepiratebay.mn') remote.assert_called_once_with(printer=printer, pages=1, category=100, sort=10, mode='browse', terms=[], mirror=pirate.data.default_mirror)
with patch('pirate.torrent.remote', side_effect=[socket.timeout, []]) as remote: with patch('pirate.torrent.remote', side_effect=[socket.timeout, []]) as remote:
results, mirror = pirate.pirate.search_mirrors(printer, pages, category, sort, action, search) results, mirror = pirate.pirate.search_mirrors(printer, pages, category, sort, action, search)
self.assertEqual(results, []) self.assertEqual(results, [])
self.assertEqual(mirror, 'https://example.com') self.assertEqual(mirror, 'https://example.com')
remote.assert_has_calls([ remote.assert_has_calls([
call(printer=printer, pages=1, category=100, sort=10, mode='browse', terms=[], mirror='https://thepiratebay.mn'), call(printer=printer, pages=1, category=100, sort=10, mode='browse', terms=[], mirror=pirate.data.default_mirror),
call(printer=printer, pages=1, category=100, sort=10, mode='browse', terms=[], mirror='https://example.com') call(printer=printer, pages=1, category=100, sort=10, mode='browse', terms=[], mirror='https://example.com')
]) ])