1
0
mirror of https://github.com/vikstrous/pirate-get synced 2025-01-09 09:59:51 +01:00

refactor pirate.py

This commit is contained in:
Viktor Stanchev 2015-09-16 23:15:27 -07:00
parent 06d551f69d
commit d77118055d
3 changed files with 135 additions and 113 deletions

View File

@ -49,6 +49,7 @@ def parse_config_file(text):
return config
def load_config():
# user-defined config files
main = expandvars('$XDG_CONFIG_HOME/pirate-get')
@ -172,108 +173,138 @@ def parse_args(args_in):
help='disable colored output')
args = parser.parse_args(args_in)
# figure out the mode - browse, search, top or recent
return args
def combine_configs(config, args):
# figure out the action - browse, search, top, etc.
if args.browse:
args.mode = 'browse'
args.action = 'browse'
elif args.recent:
args.mode = 'recent'
args.action = 'recent'
elif len(args.search) == 0:
args.mode = 'top'
args.action = 'top'
elif args.list_categories:
args.action = 'list_categories'
elif args.list_sorts:
args.action = 'list_sorts'
else:
args.mode = 'search'
args.action = 'search'
args.source = 'tpb'
if args.database or config.getboolean('LocalDB', 'enabled'):
args.source = 'local_tpb'
if not args.database:
args.database = config.get('LocalDB', 'path')
if not args.color or not config.getboolean('Misc', 'colors'):
# TODO: consider how this can be moved to the args
pirate.data.colored_output = False
if not args.save_directory:
args.save_directory = config.get('Save', 'directory')
args.transmission_command = ['transmission-remote']
if args.port:
args.transmission_command.append(args.port)
args.output = 'browser_open'
if args.transmission or config.getboolean('Misc', 'transmission'):
args.output = 'transmission'
elif args.save_magnets or config.getboolean('Save', 'magnets'):
args.output = 'save_magnet_files'
elif args.save_torrents or config.getboolean('Save', 'torrents'):
args.output = 'save_torrent_files'
elif args.command or config.get('Misc', 'openCommand'):
args.output = 'open_command'
args.open_command = args.command
if not args.open_command:
args.open_command = config.get('Misc', 'openCommand')
return args
def search_mirrors(args):
mirrors = {'https://thepiratebay.mn'}
try:
req = request.Request('https://proxybay.co/list.txt',
headers=pirate.data.default_headers)
f = request.urlopen(req, timeout=pirate.data.default_timeout)
except IOError:
print('Could not fetch additional mirrors', color='WARN')
else:
if f.getcode() != 200:
raise IOError('The proxy bay responded with an error.')
mirrors = mirrors.union([i.decode('utf-8').strip()
for i in f.readlines()][3:]
).difference(pirate.data.blacklist)
for mirror in mirrors:
try:
print('Trying', mirror, end='... \n')
results = pirate.torrent.remote(
pages=args.pages,
category=pirate.torrent.parse_category(args.category),
sort=pirate.torrent.parse_sort(args.sort),
mode=args.action,
terms=args.search,
mirror=mirror
)
except (urllib.error.URLError, socket.timeout,
IOError, ValueError):
print('Failed', color='WARN')
else:
print('Ok', color='alt')
return results, mirror
else:
print('No available mirrors :(', color='WARN')
return [], None
def main():
config = load_config()
args = combine_configs(load_config(), parse_args(sys.argv[1:]))
args = parse_args(sys.argv[1:])
if (config.getboolean('Misc', 'colors') and not args.color
or not config.getboolean('Misc', 'colors')):
pirate.data.colored_output = False
if args.save_directory:
config.set('Save', 'directory', args.save_directory)
transmission_command = ['transmission-remote']
if args.port:
transmission_command.append(args.port)
if args.transmission or config.getboolean('Misc', 'transmission'):
ret = subprocess.call(transmission_command + ['-l'],
# check it transmission is running
if args.transmission:
ret = subprocess.call(args.transmission_command + ['-l'],
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL)
if ret != 0:
print('Transmission is not running.')
return
sys.exit(1)
if args.list_categories:
# non-torrent fetching actions
if args.action == 'list_categories':
cur_color = 'zebra_0'
for key, value in sorted(pirate.data.categories.items()):
cur_color = 'zebra_0' if cur_color == 'zebra_1' else 'zebra_1'
print(str(value), '\t', key, sep='', color=cur_color)
return
if args.list_sorts:
if args.action == 'list_sorts':
cur_color = 'zebra_0'
for key, value in sorted(pirate.data.sorts.items()):
cur_color = 'zebra_0' if cur_color == 'zebra_1' else 'zebra_1'
print(str(value), '\t', key, sep='', color=cur_color)
return
if args.database or config.getboolean('LocalDB', 'enabled'):
if args.database:
path = args.database
else:
path = config.get('LocalDB', 'path')
results = pirate.local.search(path, args.search)
sizes, uploaded = [], []
# fetch torrents
else:
results, mirrors = [], {'https://thepiratebay.mn'}
try:
req = request.Request('https://proxybay.co/list.txt',
headers=pirate.data.default_headers)
f = request.urlopen(req, timeout=pirate.data.default_timeout)
except IOError:
print('Could not fetch additional mirrors', color='WARN')
else:
if f.getcode() != 200:
raise IOError('The proxy bay responded with an error.')
mirrors = mirrors.union([i.decode('utf-8').strip()
for i in f.readlines()][3:]
).difference(pirate.data.blacklist)
for mirror in mirrors:
try:
print('Trying', mirror, end='... \n')
results = pirate.torrent.remote(
pages=args.pages,
category=pirate.torrent.parse_category(args.category),
sort=pirate.torrent.parse_sort(args.sort),
mode=args.mode,
terms=args.search,
mirror=mirror
)
except (urllib.error.URLError, socket.timeout,
IOError, ValueError):
print('Failed', color='WARN')
else:
site = mirror
print('Ok', color='alt')
break
else:
print('No available mirrors :(', color='WARN')
return
if args.source == 'local_tpb':
results = pirate.local.search(args.database, args.search)
elif args.source == 'tpb':
results, site = search_mirrors(args)
if len(results) == 0:
print('No results')
return
pirate.print.search_results(results, local=args.database)
pirate.print.search_results(results, local=args.source == 'local_tpb')
# number of results to pick
if args.first:
print('Choosing first result')
choices = [0]
@ -281,13 +312,13 @@ def main():
print('Downloading all results')
choices = range(len(results))
else:
# New input loop to support different link options
# interactive loop for per-torrent actions
while True:
print("\nSelect links (Type 'h' for more options"
", 'q' to quit)", end='\b', color='alt')
try:
l = input(': ')
except KeyboardInterrupt:
except (KeyboardInterrupt, EOFError):
print('\nCancelled.')
return
@ -314,53 +345,42 @@ def main():
elif code == 'p':
pirate.print.search_results(results)
elif code == 'm':
pirate.torrent.save_magnets(choices, results, config.get(
'Save', 'directory'))
pirate.torrent.save_magnets(choices, results, args.save_directory)
elif code == 't':
pirate.torrent.save_torrents(choices, results, config.get(
'Save', 'directory'))
pirate.torrent.save_torrents(choices, results, args.save_directory)
elif not l:
print('No links entered!', color='WARN')
else:
break
except Exception as e:
print('Exception:', e, color='ERROR')
choices = ()
return
save_to_file = False
# output
if args.save_magnets or config.getboolean('Save', 'magnets'):
if args.output == 'save_magnet_files':
print('Saving selected magnets...')
pirate.torrent.save_magnets(choices, results, config.get(
'Save', 'directory'))
save_to_file = True
pirate.torrent.save_magnets(choices, results, args.save_directory)
return
if args.save_torrents or config.getboolean('Save', 'torrents'):
if args.output == 'save_torrent_files':
print('Saving selected torrents...')
pirate.torrent.save_torrents(choices, results, config.get(
'Save', 'directory'))
save_to_file = True
if save_to_file:
pirate.torrent.save_torrents(choices, results, args.save_directory)
return
for choice in choices:
url = results[int(choice)]['magnet']
url = results[choice]['magnet']
if args.transmission or config.getboolean('Misc', 'transmission'):
subprocess.call(transmission_command + ['--add', url])
elif args.command or config.get('Misc', 'openCommand'):
command = config.get('Misc', 'openCommand')
if args.command:
command = args.command
subprocess.call(parse_cmd(command, url))
else:
if args.output == 'transmission':
subprocess.call(args.transmission_command + ['--add', url])
elif args.output == 'open_command':
subprocess.call(parse_cmd(args.open_command, url))
elif args.output == 'browser_open':
webbrowser.open(url)
if args.transmission or config.getboolean('Misc', 'transmission'):
subprocess.call(transmission_command + ['-l'])
if args.output == 'transmission':
subprocess.call(args.transmission_command + ['-l'])
if __name__ == '__main__':
main()

View File

@ -45,10 +45,10 @@ def parse_sort(sort):
return 99
#TODO: warn users when using a sort in a mode that doesn't accept sorts
#TODO: warn users when using search terms in a mode that doesn't accept search terms
#TODO: same with page parameter for top and top48h
#TODO: warn the user if trying to use a minor category with top48h
# TODO: warn users when using a sort in a mode that doesn't accept sorts
# TODO: warn users when using search terms in a mode that doesn't accept search terms
# TODO: same with page parameter for top and top48h
# TODO: warn the user if trying to use a minor category with top48h
def build_request_path(page, category, sort, mode, terms):
if mode == 'browse':
if(category == 0):
@ -151,9 +151,9 @@ def remote(pages, category, sort, mode, terms, mirror):
def get_torrent(info_hash):
url = 'http://torcache.net/torrent/{:X}.torrent'
req = request.Request(url.format(info_hash),
headers=pirate.data.default_headers)
headers=pirate.data.default_headers)
req.add_header('Accept-encoding', 'gzip')
torrent = request.urlopen(req, timeout=pirate.data.default_timeout)
if torrent.info().get('Content-Encoding') == 'gzip':
torrent = gzip.GzipFile(fileobj=BytesIO(torrent.read()))
@ -175,7 +175,7 @@ def save_torrents(chosen_links, results, folder):
except urllib.error.HTTPError:
print('There is no cached file for this torrent :(', color='ERROR')
else:
open(file,'wb').write(torrent)
open(file, 'wb').write(torrent)
print('Saved {:X} in {}'.format(info_hash, file))

View File

@ -83,16 +83,18 @@ class TestPirate(unittest.TestCase):
def test_parse_args(self):
tests = [
(['-b'], {'mode': 'browse'}),
([], {'mode': 'top'}),
(['-R'], {'mode': 'recent'}),
(['internets'], {'mode': 'search', 'search': ['internets']}),
(['internets lol', 'lel'], {'mode': 'search', 'search': ['internets lol', 'lel']}),
(['-b'], {'action': 'browse'}),
([], {'action': 'top'}),
(['-R'], {'action': 'recent'}),
(['internets'], {'action': 'search', 'search': ['internets']}),
(['internets lol', 'lel'], {'action': 'search', 'search': ['internets lol', 'lel']}),
]
for test in tests:
config = pirate.pirate.parse_args(test[0])
args = pirate.pirate.parse_args(test[0])
config = pirate.pirate.parse_config_file('')
args = pirate.pirate.combine_configs(config, args)
for option in test[1].keys():
value = getattr(config, option)
value = getattr(args, option)
self.assertEqual(test[1][option], value)
if __name__ == '__main__':