mirror of
https://github.com/vikstrous/pirate-get
synced 2025-01-09 09:59:51 +01:00
commit
07ad7e33e6
218
pirate/pirate.py
218
pirate/pirate.py
@ -49,6 +49,7 @@ def parse_config_file(text):
|
||||
|
||||
return config
|
||||
|
||||
|
||||
def load_config():
|
||||
# user-defined config files
|
||||
main = expandvars('$XDG_CONFIG_HOME/pirate-get')
|
||||
@ -172,108 +173,138 @@ def parse_args(args_in):
|
||||
help='disable colored output')
|
||||
args = parser.parse_args(args_in)
|
||||
|
||||
# figure out the mode - browse, search, top or recent
|
||||
return args
|
||||
|
||||
|
||||
def combine_configs(config, args):
|
||||
# figure out the action - browse, search, top, etc.
|
||||
if args.browse:
|
||||
args.mode = 'browse'
|
||||
args.action = 'browse'
|
||||
elif args.recent:
|
||||
args.mode = 'recent'
|
||||
args.action = 'recent'
|
||||
elif len(args.search) == 0:
|
||||
args.mode = 'top'
|
||||
args.action = 'top'
|
||||
elif args.list_categories:
|
||||
args.action = 'list_categories'
|
||||
elif args.list_sorts:
|
||||
args.action = 'list_sorts'
|
||||
else:
|
||||
args.mode = 'search'
|
||||
args.action = 'search'
|
||||
|
||||
args.source = 'tpb'
|
||||
if args.database or config.getboolean('LocalDB', 'enabled'):
|
||||
args.source = 'local_tpb'
|
||||
|
||||
if not args.database:
|
||||
args.database = config.get('LocalDB', 'path')
|
||||
|
||||
if not args.color or not config.getboolean('Misc', 'colors'):
|
||||
# TODO: consider how this can be moved to the args
|
||||
pirate.data.colored_output = False
|
||||
|
||||
if not args.save_directory:
|
||||
args.save_directory = config.get('Save', 'directory')
|
||||
|
||||
args.transmission_command = ['transmission-remote']
|
||||
if args.port:
|
||||
args.transmission_command.append(args.port)
|
||||
|
||||
args.output = 'browser_open'
|
||||
if args.transmission or config.getboolean('Misc', 'transmission'):
|
||||
args.output = 'transmission'
|
||||
elif args.save_magnets or config.getboolean('Save', 'magnets'):
|
||||
args.output = 'save_magnet_files'
|
||||
elif args.save_torrents or config.getboolean('Save', 'torrents'):
|
||||
args.output = 'save_torrent_files'
|
||||
elif args.command or config.get('Misc', 'openCommand'):
|
||||
args.output = 'open_command'
|
||||
|
||||
args.open_command = args.command
|
||||
if not args.open_command:
|
||||
args.open_command = config.get('Misc', 'openCommand')
|
||||
|
||||
return args
|
||||
|
||||
|
||||
def search_mirrors(args):
|
||||
mirrors = {'https://thepiratebay.mn'}
|
||||
try:
|
||||
req = request.Request('https://proxybay.co/list.txt',
|
||||
headers=pirate.data.default_headers)
|
||||
f = request.urlopen(req, timeout=pirate.data.default_timeout)
|
||||
except IOError:
|
||||
print('Could not fetch additional mirrors', color='WARN')
|
||||
else:
|
||||
if f.getcode() != 200:
|
||||
raise IOError('The proxy bay responded with an error.')
|
||||
mirrors = mirrors.union([i.decode('utf-8').strip()
|
||||
for i in f.readlines()][3:]
|
||||
).difference(pirate.data.blacklist)
|
||||
|
||||
for mirror in mirrors:
|
||||
try:
|
||||
print('Trying', mirror, end='... \n')
|
||||
results = pirate.torrent.remote(
|
||||
pages=args.pages,
|
||||
category=pirate.torrent.parse_category(args.category),
|
||||
sort=pirate.torrent.parse_sort(args.sort),
|
||||
mode=args.action,
|
||||
terms=args.search,
|
||||
mirror=mirror
|
||||
)
|
||||
except (urllib.error.URLError, socket.timeout,
|
||||
IOError, ValueError):
|
||||
print('Failed', color='WARN')
|
||||
else:
|
||||
print('Ok', color='alt')
|
||||
return results, mirror
|
||||
else:
|
||||
print('No available mirrors :(', color='WARN')
|
||||
return [], None
|
||||
|
||||
|
||||
def main():
|
||||
config = load_config()
|
||||
args = combine_configs(load_config(), parse_args(sys.argv[1:]))
|
||||
|
||||
args = parse_args(sys.argv[1:])
|
||||
|
||||
if (config.getboolean('Misc', 'colors') and not args.color
|
||||
or not config.getboolean('Misc', 'colors')):
|
||||
pirate.data.colored_output = False
|
||||
|
||||
if args.save_directory:
|
||||
config.set('Save', 'directory', args.save_directory)
|
||||
|
||||
transmission_command = ['transmission-remote']
|
||||
if args.port:
|
||||
transmission_command.append(args.port)
|
||||
|
||||
if args.transmission or config.getboolean('Misc', 'transmission'):
|
||||
ret = subprocess.call(transmission_command + ['-l'],
|
||||
# check it transmission is running
|
||||
if args.transmission:
|
||||
ret = subprocess.call(args.transmission_command + ['-l'],
|
||||
stdout=subprocess.DEVNULL,
|
||||
stderr=subprocess.DEVNULL)
|
||||
if ret != 0:
|
||||
print('Transmission is not running.')
|
||||
return
|
||||
sys.exit(1)
|
||||
|
||||
if args.list_categories:
|
||||
# non-torrent fetching actions
|
||||
|
||||
if args.action == 'list_categories':
|
||||
cur_color = 'zebra_0'
|
||||
for key, value in sorted(pirate.data.categories.items()):
|
||||
cur_color = 'zebra_0' if cur_color == 'zebra_1' else 'zebra_1'
|
||||
print(str(value), '\t', key, sep='', color=cur_color)
|
||||
return
|
||||
|
||||
if args.list_sorts:
|
||||
if args.action == 'list_sorts':
|
||||
cur_color = 'zebra_0'
|
||||
for key, value in sorted(pirate.data.sorts.items()):
|
||||
cur_color = 'zebra_0' if cur_color == 'zebra_1' else 'zebra_1'
|
||||
print(str(value), '\t', key, sep='', color=cur_color)
|
||||
return
|
||||
|
||||
if args.database or config.getboolean('LocalDB', 'enabled'):
|
||||
if args.database:
|
||||
path = args.database
|
||||
else:
|
||||
path = config.get('LocalDB', 'path')
|
||||
results = pirate.local.search(path, args.search)
|
||||
sizes, uploaded = [], []
|
||||
# fetch torrents
|
||||
|
||||
else:
|
||||
results, mirrors = [], {'https://thepiratebay.mn'}
|
||||
try:
|
||||
req = request.Request('https://proxybay.co/list.txt',
|
||||
headers=pirate.data.default_headers)
|
||||
f = request.urlopen(req, timeout=pirate.data.default_timeout)
|
||||
except IOError:
|
||||
print('Could not fetch additional mirrors', color='WARN')
|
||||
else:
|
||||
if f.getcode() != 200:
|
||||
raise IOError('The proxy bay responded with an error.')
|
||||
mirrors = mirrors.union([i.decode('utf-8').strip()
|
||||
for i in f.readlines()][3:]
|
||||
).difference(pirate.data.blacklist)
|
||||
|
||||
for mirror in mirrors:
|
||||
try:
|
||||
print('Trying', mirror, end='... \n')
|
||||
results = pirate.torrent.remote(
|
||||
pages=args.pages,
|
||||
category=pirate.torrent.parse_category(args.category),
|
||||
sort=pirate.torrent.parse_sort(args.sort),
|
||||
mode=args.mode,
|
||||
terms=args.search,
|
||||
mirror=mirror
|
||||
)
|
||||
except (urllib.error.URLError, socket.timeout,
|
||||
IOError, ValueError):
|
||||
print('Failed', color='WARN')
|
||||
else:
|
||||
site = mirror
|
||||
print('Ok', color='alt')
|
||||
break
|
||||
else:
|
||||
print('No available mirrors :(', color='WARN')
|
||||
return
|
||||
if args.source == 'local_tpb':
|
||||
results = pirate.local.search(args.database, args.search)
|
||||
elif args.source == 'tpb':
|
||||
results, site = search_mirrors(args)
|
||||
|
||||
if len(results) == 0:
|
||||
print('No results')
|
||||
return
|
||||
|
||||
pirate.print.search_results(results, local=args.database)
|
||||
pirate.print.search_results(results, local=args.source == 'local_tpb')
|
||||
|
||||
# number of results to pick
|
||||
if args.first:
|
||||
print('Choosing first result')
|
||||
choices = [0]
|
||||
@ -281,13 +312,13 @@ def main():
|
||||
print('Downloading all results')
|
||||
choices = range(len(results))
|
||||
else:
|
||||
# New input loop to support different link options
|
||||
# interactive loop for per-torrent actions
|
||||
while True:
|
||||
print("\nSelect links (Type 'h' for more options"
|
||||
", 'q' to quit)", end='\b', color='alt')
|
||||
try:
|
||||
l = input(': ')
|
||||
except KeyboardInterrupt:
|
||||
except (KeyboardInterrupt, EOFError):
|
||||
print('\nCancelled.')
|
||||
return
|
||||
|
||||
@ -314,53 +345,42 @@ def main():
|
||||
elif code == 'p':
|
||||
pirate.print.search_results(results)
|
||||
elif code == 'm':
|
||||
pirate.torrent.save_magnets(choices, results, config.get(
|
||||
'Save', 'directory'))
|
||||
pirate.torrent.save_magnets(choices, results, args.save_directory)
|
||||
elif code == 't':
|
||||
pirate.torrent.save_torrents(choices, results, config.get(
|
||||
'Save', 'directory'))
|
||||
pirate.torrent.save_torrents(choices, results, args.save_directory)
|
||||
elif not l:
|
||||
print('No links entered!', color='WARN')
|
||||
else:
|
||||
break
|
||||
except Exception as e:
|
||||
print('Exception:', e, color='ERROR')
|
||||
choices = ()
|
||||
return
|
||||
|
||||
save_to_file = False
|
||||
# output
|
||||
|
||||
if args.save_magnets or config.getboolean('Save', 'magnets'):
|
||||
if args.output == 'save_magnet_files':
|
||||
print('Saving selected magnets...')
|
||||
pirate.torrent.save_magnets(choices, results, config.get(
|
||||
'Save', 'directory'))
|
||||
save_to_file = True
|
||||
pirate.torrent.save_magnets(choices, results, args.save_directory)
|
||||
return
|
||||
|
||||
if args.save_torrents or config.getboolean('Save', 'torrents'):
|
||||
if args.output == 'save_torrent_files':
|
||||
print('Saving selected torrents...')
|
||||
pirate.torrent.save_torrents(choices, results, config.get(
|
||||
'Save', 'directory'))
|
||||
save_to_file = True
|
||||
|
||||
if save_to_file:
|
||||
pirate.torrent.save_torrents(choices, results, args.save_directory)
|
||||
return
|
||||
|
||||
for choice in choices:
|
||||
url = results[int(choice)]['magnet']
|
||||
url = results[choice]['magnet']
|
||||
|
||||
if args.transmission or config.getboolean('Misc', 'transmission'):
|
||||
subprocess.call(transmission_command + ['--add', url])
|
||||
|
||||
elif args.command or config.get('Misc', 'openCommand'):
|
||||
command = config.get('Misc', 'openCommand')
|
||||
if args.command:
|
||||
command = args.command
|
||||
subprocess.call(parse_cmd(command, url))
|
||||
|
||||
else:
|
||||
if args.output == 'transmission':
|
||||
subprocess.call(args.transmission_command + ['--add', url])
|
||||
elif args.output == 'open_command':
|
||||
subprocess.call(parse_cmd(args.open_command, url))
|
||||
elif args.output == 'browser_open':
|
||||
webbrowser.open(url)
|
||||
|
||||
if args.transmission or config.getboolean('Misc', 'transmission'):
|
||||
subprocess.call(transmission_command + ['-l'])
|
||||
if args.output == 'transmission':
|
||||
subprocess.call(args.transmission_command + ['-l'])
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
main()
|
||||
|
@ -45,10 +45,10 @@ def parse_sort(sort):
|
||||
return 99
|
||||
|
||||
|
||||
#TODO: warn users when using a sort in a mode that doesn't accept sorts
|
||||
#TODO: warn users when using search terms in a mode that doesn't accept search terms
|
||||
#TODO: same with page parameter for top and top48h
|
||||
#TODO: warn the user if trying to use a minor category with top48h
|
||||
# TODO: warn users when using a sort in a mode that doesn't accept sorts
|
||||
# TODO: warn users when using search terms in a mode that doesn't accept search terms
|
||||
# TODO: same with page parameter for top and top48h
|
||||
# TODO: warn the user if trying to use a minor category with top48h
|
||||
def build_request_path(page, category, sort, mode, terms):
|
||||
if mode == 'browse':
|
||||
if(category == 0):
|
||||
@ -151,7 +151,7 @@ def remote(pages, category, sort, mode, terms, mirror):
|
||||
def get_torrent(info_hash):
|
||||
url = 'http://torcache.net/torrent/{:X}.torrent'
|
||||
req = request.Request(url.format(info_hash),
|
||||
headers=pirate.data.default_headers)
|
||||
headers=pirate.data.default_headers)
|
||||
req.add_header('Accept-encoding', 'gzip')
|
||||
|
||||
torrent = request.urlopen(req, timeout=pirate.data.default_timeout)
|
||||
@ -175,7 +175,7 @@ def save_torrents(chosen_links, results, folder):
|
||||
except urllib.error.HTTPError:
|
||||
print('There is no cached file for this torrent :(', color='ERROR')
|
||||
else:
|
||||
open(file,'wb').write(torrent)
|
||||
open(file, 'wb').write(torrent)
|
||||
print('Saved {:X} in {}'.format(info_hash, file))
|
||||
|
||||
|
||||
|
@ -83,16 +83,18 @@ class TestPirate(unittest.TestCase):
|
||||
|
||||
def test_parse_args(self):
|
||||
tests = [
|
||||
(['-b'], {'mode': 'browse'}),
|
||||
([], {'mode': 'top'}),
|
||||
(['-R'], {'mode': 'recent'}),
|
||||
(['internets'], {'mode': 'search', 'search': ['internets']}),
|
||||
(['internets lol', 'lel'], {'mode': 'search', 'search': ['internets lol', 'lel']}),
|
||||
(['-b'], {'action': 'browse'}),
|
||||
([], {'action': 'top'}),
|
||||
(['-R'], {'action': 'recent'}),
|
||||
(['internets'], {'action': 'search', 'search': ['internets']}),
|
||||
(['internets lol', 'lel'], {'action': 'search', 'search': ['internets lol', 'lel']}),
|
||||
]
|
||||
for test in tests:
|
||||
config = pirate.pirate.parse_args(test[0])
|
||||
args = pirate.pirate.parse_args(test[0])
|
||||
config = pirate.pirate.parse_config_file('')
|
||||
args = pirate.pirate.combine_configs(config, args)
|
||||
for option in test[1].keys():
|
||||
value = getattr(config, option)
|
||||
value = getattr(args, option)
|
||||
self.assertEqual(test[1][option], value)
|
||||
|
||||
if __name__ == '__main__':
|
||||
|
Loading…
Reference in New Issue
Block a user