mirror of
https://github.com/vikstrous/pirate-get
synced 2025-01-25 12:24:20 +01:00
commit
3d0707f3a7
@ -7,6 +7,7 @@ import socket
|
|||||||
import urllib.request as request
|
import urllib.request as request
|
||||||
import urllib.error
|
import urllib.error
|
||||||
import sys
|
import sys
|
||||||
|
from collections import OrderedDict
|
||||||
|
|
||||||
import webbrowser
|
import webbrowser
|
||||||
|
|
||||||
@ -182,12 +183,12 @@ def combine_configs(config, args):
|
|||||||
args.action = 'browse'
|
args.action = 'browse'
|
||||||
elif args.recent:
|
elif args.recent:
|
||||||
args.action = 'recent'
|
args.action = 'recent'
|
||||||
elif len(args.search) == 0:
|
|
||||||
args.action = 'top'
|
|
||||||
elif args.list_categories:
|
elif args.list_categories:
|
||||||
args.action = 'list_categories'
|
args.action = 'list_categories'
|
||||||
elif args.list_sorts:
|
elif args.list_sorts:
|
||||||
args.action = 'list_sorts'
|
args.action = 'list_sorts'
|
||||||
|
elif len(args.search) == 0:
|
||||||
|
args.action = 'top'
|
||||||
else:
|
else:
|
||||||
args.action = 'search'
|
args.action = 'search'
|
||||||
|
|
||||||
@ -226,8 +227,9 @@ def combine_configs(config, args):
|
|||||||
return args
|
return args
|
||||||
|
|
||||||
|
|
||||||
def search_mirrors(args):
|
def search_mirrors(pages, category, sort, action, search):
|
||||||
mirrors = {'https://thepiratebay.mn'}
|
mirrors = OrderedDict()
|
||||||
|
mirrors['https://thepiratebay.mn'] = None
|
||||||
try:
|
try:
|
||||||
req = request.Request('https://proxybay.co/list.txt',
|
req = request.Request('https://proxybay.co/list.txt',
|
||||||
headers=pirate.data.default_headers)
|
headers=pirate.data.default_headers)
|
||||||
@ -237,19 +239,21 @@ def search_mirrors(args):
|
|||||||
else:
|
else:
|
||||||
if f.getcode() != 200:
|
if f.getcode() != 200:
|
||||||
raise IOError('The proxy bay responded with an error.')
|
raise IOError('The proxy bay responded with an error.')
|
||||||
mirrors = mirrors.union([i.decode('utf-8').strip()
|
for mirror in [i.decode('utf-8').strip() for i in f.readlines()][3:]:
|
||||||
for i in f.readlines()][3:]
|
mirrors[mirror] = None
|
||||||
).difference(pirate.data.blacklist)
|
for mirror in pirate.data.blacklist:
|
||||||
|
if mirror in mirrors:
|
||||||
|
del mirrors[mirror]
|
||||||
|
|
||||||
for mirror in mirrors:
|
for mirror in mirrors.keys():
|
||||||
try:
|
try:
|
||||||
print('Trying', mirror, end='... \n')
|
print('Trying', mirror, end='... \n')
|
||||||
results = pirate.torrent.remote(
|
results = pirate.torrent.remote(
|
||||||
pages=args.pages,
|
pages=pages,
|
||||||
category=pirate.torrent.parse_category(args.category),
|
category=pirate.torrent.parse_category(category),
|
||||||
sort=pirate.torrent.parse_sort(args.sort),
|
sort=pirate.torrent.parse_sort(sort),
|
||||||
mode=args.action,
|
mode=action,
|
||||||
terms=args.search,
|
terms=search,
|
||||||
mirror=mirror
|
mirror=mirror
|
||||||
)
|
)
|
||||||
except (urllib.error.URLError, socket.timeout,
|
except (urllib.error.URLError, socket.timeout,
|
||||||
@ -296,7 +300,7 @@ def main():
|
|||||||
if args.source == 'local_tpb':
|
if args.source == 'local_tpb':
|
||||||
results = pirate.local.search(args.database, args.search)
|
results = pirate.local.search(args.database, args.search)
|
||||||
elif args.source == 'tpb':
|
elif args.source == 'tpb':
|
||||||
results, site = search_mirrors(args)
|
results, site = search_mirrors(args.pages, args.category, args.sort, args.action, args.search)
|
||||||
|
|
||||||
if len(results) == 0:
|
if len(results) == 0:
|
||||||
print('No results')
|
print('No results')
|
||||||
|
@ -1,5 +1,9 @@
|
|||||||
#!/usr/bin/env python3
|
#!/usr/bin/env python3
|
||||||
|
import socket
|
||||||
import unittest
|
import unittest
|
||||||
|
from unittest import mock
|
||||||
|
from unittest.mock import patch, call
|
||||||
|
|
||||||
import pirate.pirate
|
import pirate.pirate
|
||||||
|
|
||||||
|
|
||||||
@ -83,19 +87,48 @@ class TestPirate(unittest.TestCase):
|
|||||||
|
|
||||||
def test_parse_args(self):
|
def test_parse_args(self):
|
||||||
tests = [
|
tests = [
|
||||||
(['-b'], {'action': 'browse'}),
|
('', ['-b'], {'action': 'browse'}),
|
||||||
([], {'action': 'top'}),
|
('', [], {'action': 'top'}),
|
||||||
(['-R'], {'action': 'recent'}),
|
('', ['-R'], {'action': 'recent'}),
|
||||||
(['internets'], {'action': 'search', 'search': ['internets']}),
|
('', ['-l'], {'action': 'list_categories'}),
|
||||||
(['internets lol', 'lel'], {'action': 'search', 'search': ['internets lol', 'lel']}),
|
('', ['--list_sorts'], {'action': 'list_sorts'}),
|
||||||
|
('', ['term'], {'action': 'search', 'source': 'tpb'}),
|
||||||
|
('', ['-L', 'filename', 'term'], {'action': 'search', 'source': 'local_tpb', 'database': 'filename'}),
|
||||||
|
('', ['term', '-S', 'dir'], {'action': 'search', 'save_directory': 'dir'}),
|
||||||
|
('', ['-P', '1337'], {'transmission_command': ['transmission-remote', '1337']}),
|
||||||
|
('', ['term'], {'output': 'browser_open'}),
|
||||||
|
('', ['term', '-t'], {'output': 'transmission'}),
|
||||||
|
('', ['term', '--save-magnets'], {'output': 'save_magnet_files'}),
|
||||||
|
('', ['term', '--save-torrents'], {'output': 'save_torrent_files'}),
|
||||||
|
('', ['term', '-C', 'command'], {'output': 'open_command', 'open_command': 'command'}),
|
||||||
|
('', ['internets'], {'action': 'search', 'search': ['internets']}),
|
||||||
|
('', ['internets lol', 'lel'], {'action': 'search', 'search': ['internets lol', 'lel']}),
|
||||||
]
|
]
|
||||||
for test in tests:
|
for test in tests:
|
||||||
args = pirate.pirate.parse_args(test[0])
|
args = pirate.pirate.parse_args(test[1])
|
||||||
config = pirate.pirate.parse_config_file('')
|
config = pirate.pirate.parse_config_file(test[0])
|
||||||
args = pirate.pirate.combine_configs(config, args)
|
args = pirate.pirate.combine_configs(config, args)
|
||||||
for option in test[1].keys():
|
for option in test[2].keys():
|
||||||
value = getattr(args, option)
|
value = getattr(args, option)
|
||||||
self.assertEqual(test[1][option], value)
|
self.assertEqual(test[2][option], value)
|
||||||
|
|
||||||
|
def test_search_mirrors(self):
|
||||||
|
pages, category, sort, action, search = (1, 100, 10, 'browse', [])
|
||||||
|
class MockResponse():
|
||||||
|
readlines = mock.MagicMock(return_value=[x.encode('utf-8') for x in ['', '', '', 'https://example.com']])
|
||||||
|
info = mock.MagicMock()
|
||||||
|
getcode = mock.MagicMock(return_value=200)
|
||||||
|
response_obj = MockResponse()
|
||||||
|
with patch('urllib.request.urlopen', return_value=response_obj) as urlopen:
|
||||||
|
with patch('pirate.torrent.remote', return_value=[]) as remote:
|
||||||
|
results, mirror = pirate.pirate.search_mirrors(pages, category, sort, action, search)
|
||||||
|
remote.assert_called_once_with(pages=1, category=100, sort=10, mode='browse', terms=[], mirror='https://thepiratebay.mn')
|
||||||
|
with patch('pirate.torrent.remote', side_effect=[socket.timeout, []]) as remote:
|
||||||
|
results, mirror = pirate.pirate.search_mirrors(pages, category, sort, action, search)
|
||||||
|
remote.assert_has_calls([
|
||||||
|
call(pages=1, category=100, sort=10, mode='browse', terms=[], mirror='https://thepiratebay.mn'),
|
||||||
|
call(pages=1, category=100, sort=10, mode='browse', terms=[], mirror='https://example.com')
|
||||||
|
])
|
||||||
|
|
||||||
if __name__ == '__main__':
|
if __name__ == '__main__':
|
||||||
unittest.main()
|
unittest.main()
|
||||||
|
@ -109,15 +109,30 @@ class TestTorrent(unittest.TestCase):
|
|||||||
pirate.torrent.save_magnets([0], [{'magnet':magnet}], 'path')
|
pirate.torrent.save_magnets([0], [{'magnet':magnet}], 'path')
|
||||||
open_.assert_called_once_with('path/Test Drive Unlimited [PC Version].magnet', 'w')
|
open_.assert_called_once_with('path/Test Drive Unlimited [PC Version].magnet', 'w')
|
||||||
|
|
||||||
def test_get_torrent(self):
|
@patch('urllib.request.urlopen')
|
||||||
with patch('urllib.request.urlopen') as urlopen:
|
def test_get_torrent(self, urlopen):
|
||||||
class MockResponse():
|
class MockRequest():
|
||||||
add_header = mock.MagicMock()
|
add_header = mock.MagicMock()
|
||||||
response = MockResponse()
|
request_obj = MockRequest()
|
||||||
with patch('urllib.request.Request', return_value=response) as request:
|
with patch('urllib.request.Request', return_value=request_obj) as request:
|
||||||
pirate.torrent.get_torrent(100000000000000)
|
pirate.torrent.get_torrent(100000000000000)
|
||||||
request.assert_called_once_with('http://torcache.net/torrent/5AF3107A4000.torrent', headers=pirate.data.default_headers)
|
request.assert_called_once_with('http://torcache.net/torrent/5AF3107A4000.torrent', headers=pirate.data.default_headers)
|
||||||
urlopen.assert_called_once_with(response, timeout=pirate.data.default_timeout)
|
urlopen.assert_called_once_with(request_obj, timeout=pirate.data.default_timeout)
|
||||||
|
|
||||||
|
def test_remote(self):
|
||||||
|
class MockRequest():
|
||||||
|
add_header = mock.MagicMock()
|
||||||
|
request_obj = MockRequest()
|
||||||
|
class MockResponse():
|
||||||
|
read = mock.MagicMock(return_value='<html>No hits. Try adding an asterisk in you search phrase.</html>'.encode('utf8'))
|
||||||
|
info = mock.MagicMock()
|
||||||
|
response_obj = MockResponse()
|
||||||
|
with patch('urllib.request.Request', return_value=request_obj) as request:
|
||||||
|
with patch('urllib.request.urlopen', return_value=response_obj) as urlopen:
|
||||||
|
res = pirate.torrent.remote(1, 100, 10, 'browse', [], 'http://example.com')
|
||||||
|
request.assert_called_once_with('http://example.com/browse/100/0/10', headers=pirate.data.default_headers)
|
||||||
|
urlopen.assert_called_once_with(request_obj, timeout=pirate.data.default_timeout)
|
||||||
|
self.assertEqual(res, [])
|
||||||
|
|
||||||
if __name__ == '__main__':
|
if __name__ == '__main__':
|
||||||
unittest.main()
|
unittest.main()
|
||||||
|
Loading…
Reference in New Issue
Block a user