1
0
mirror of https://github.com/vikstrous/pirate-get synced 2025-01-10 10:04:21 +01:00

Merge pull request #60 from vikstrous/refactor2

refactoring and testing
This commit is contained in:
Viktor Stanchev 2015-09-08 20:01:31 -07:00
commit 2970be2f52
13 changed files with 989 additions and 154 deletions

View File

@ -66,6 +66,43 @@ def parse_cmd(cmd, url):
return ret_no_quotes
def parse_torrent_command(l):
    """Parse an interactive torrent-selection command line.

    Returns a tuple ``(code, choices)`` where *code* is the single-letter
    command (h/d/f/p/m/t/q) in lower case, or ``None`` when no command
    letter is present, and *choices* is a list of the selected result
    indices as ints, with ranges such as ``1-3`` expanded.
    """
    # Very permissive handling:
    # check for any occurrence of h, d, f, p, m, t, or q
    cmd_code_match = re.search(r'([hdfpmtq])', l,
                               flags=re.IGNORECASE)
    if cmd_code_match:
        code = cmd_code_match.group(0).lower()
    else:
        code = None

    # Clean up the command string: strip command letters and separators
    # from both ends, collapse runs of spaces/commas into single commas,
    # then drop anything that is not a digit, comma or range dash.
    l = re.sub(r'^[hdfp, ]*|[hdfp, ]*$', '', l)
    l = re.sub('[ ,]+', ',', l)
    l = re.sub('[^0-9,-]', '', l)
    parsed_input = l.split(',')

    # expand ranges: each element becomes a list, e.g. '1-3' -> [1, 2, 3]
    choices = []
    for elem in parsed_input:
        left, sep, right = elem.partition('-')
        if right:
            choices.append(list(range(int(left), int(right) + 1)))
        elif left != '':
            choices.append([int(left)])
    # flatten the list of lists into the final index list
    choices = sum(choices, [])
    return code, choices
def main():
config = load_config()
@ -91,7 +128,7 @@ def main():
help='list Sortable Types')
parser.add_argument('-L', '--local', dest='database',
help='an xml file containing the Pirate Bay database')
parser.add_argument('-p', dest='pages', default=1,
parser.add_argument('-p', dest='pages', default=1, type=int,
help='the number of pages to fetch '
"(doesn't work with --local)")
parser.add_argument('-0', dest='first',
@ -123,6 +160,16 @@ def main():
help='disable colored output')
args = parser.parse_args()
# figure out the mode - browse, search, top or recent
if args.browse:
args.mode = 'browse'
elif args.recent:
args.mode = 'recent'
elif len(args.search) == 0:
args.mode = 'top'
else:
args.mode = 'search'
if (config.getboolean('Misc', 'colors') and not args.color
or not config.getboolean('Misc', 'colors')):
pirate.data.colored_output = False
@ -161,11 +208,11 @@ def main():
path = args.database
else:
path = config.get('LocalDB', 'path')
mags = pirate.local.search(path, args.search)
results = pirate.local.search(path, args.search)
sizes, uploaded = [], []
else:
mags, mirrors = [], {'https://thepiratebay.mn'}
results, mirrors = [], {'https://thepiratebay.mn'}
try:
req = request.Request('https://proxybay.co/list.txt',
headers=pirate.data.default_headers)
@ -181,9 +228,15 @@ def main():
for mirror in mirrors:
try:
print('Trying', mirror, end='... ')
mags, sizes, uploaded, ids = pirate.torrent.remote(args,
mirror)
print('Trying', mirror, end='... \n')
results = pirate.torrent.remote(
pages=args.pages,
category=pirate.torrent.parse_category(args.category),
sort=pirate.torrent.parse_sort(args.sort),
mode=args.mode,
terms=args.search,
mirror=mirror
)
except (urllib.error.URLError, socket.timeout,
IOError, ValueError):
print('Failed', color='WARN')
@ -195,18 +248,18 @@ def main():
print('No available mirrors :(', color='WARN')
return
if not mags:
if len(results) == 0:
print('No results')
return
pirate.print.search_results(mags, sizes, uploaded, local=args.database)
pirate.print.search_results(results, local=args.database)
if args.first:
print('Choosing first result')
choices = [0]
elif args.download_all:
print('Downloading all results')
choices = range(len(mags))
choices = range(len(results))
else:
# New input loop to support different link options
while True:
@ -219,40 +272,7 @@ def main():
return
try:
# Very permissive handling
# Check for any occurrences of h, d, f, p, t, m, or q
cmd_code_match = re.search(r'([hdfpmtq])', l,
flags=re.IGNORECASE)
if cmd_code_match:
code = cmd_code_match.group(0).lower()
else:
code = None
# Clean up command codes
# Substitute multiple consecutive spaces/commas for single
# comma remove anything that isn't an integer or comma.
# Turn into list
l = re.sub(r'^[hdfp, ]*|[hdfp, ]*$', '', l)
l = re.sub('[ ,]+', ',', l)
l = re.sub('[^0-9,-]', '', l)
parsed_input = l.split(',')
# expand ranges
choices = []
# loop will generate a list of lists
for elem in parsed_input:
left, sep, right = elem.partition('-')
if right:
choices.append(list(range(int(left), int(right) + 1)))
elif left != '':
choices.append([int(left)])
# flatten list
choices = sum(choices, [])
# the current code stores the choices as strings
# instead of ints. not sure if necessary
choices = [str(elem) for elem in choices]
code, choices = parse_torrent_command(l)
# Act on option, if supplied
print('')
if code == 'h':
@ -268,16 +288,16 @@ def main():
print('Bye.', color='alt')
return
elif code == 'd':
pirate.print.descriptions(choices, mags, site, ids)
pirate.print.descriptions(choices, results, site)
elif code == 'f':
pirate.print.file_lists(choices, mags, site, ids)
pirate.print.file_lists(choices, results, site)
elif code == 'p':
pirate.print.search_results(mags, sizes, uploaded)
pirate.print.search_results(results)
elif code == 'm':
pirate.torrent.save_magnets(choices, mags, config.get(
pirate.torrent.save_magnets(choices, results, config.get(
'Save', 'directory'))
elif code == 't':
pirate.torrent.save_torrents(choices, mags, config.get(
pirate.torrent.save_torrents(choices, results, config.get(
'Save', 'directory'))
elif not l:
print('No links entered!', color='WARN')
@ -291,13 +311,13 @@ def main():
if args.save_magnets or config.getboolean('Save', 'magnets'):
print('Saving selected magnets...')
pirate.torrent.save_magnets(choices, mags, config.get(
pirate.torrent.save_magnets(choices, results, config.get(
'Save', 'directory'))
save_to_file = True
if args.save_torrents or config.getboolean('Save', 'torrents'):
print('Saving selected torrents...')
pirate.torrent.save_torrents(choices, mags, config.get(
pirate.torrent.save_torrents(choices, results, config.get(
'Save', 'directory'))
save_to_file = True
@ -305,7 +325,7 @@ def main():
return
for choice in choices:
url = mags[int(choice)][0]
url = results[int(choice)]['magnet']
if args.transmission or config.getboolean('Misc', 'transmission'):
subprocess.call(transmission_command + ['--add', url])

View File

@ -5,6 +5,7 @@ import gzip
import colorama
import urllib.parse as parse
import urllib.request as request
import shutil
from io import BytesIO
import pirate.data
@ -31,8 +32,9 @@ def print(*args, **kwargs):
return builtins.print(*args, **kwargs)
def search_results(mags, sizes, uploaded, local=None):
columns = int(os.popen('stty size', 'r').read().split()[1])
# TODO: extract the name from the search results instead of the magnet link when possible
def search_results(results, local=None):
columns = shutil.get_terminal_size((80, 20)).columns
cur_color = 'zebra_0'
if local:
@ -45,21 +47,26 @@ def search_results(mags, sizes, uploaded, local=None):
'SIZE', 'UPLOAD', 'NAME', length=columns - 52),
color='header')
for m, magnet in enumerate(mags):
for n, result in enumerate(results):
# Alternate between colors
cur_color = 'zebra_0' if cur_color == 'zebra_1' else 'zebra_1'
name = re.search(r'dn=([^\&]*)', magnet[0])
torrent_name = parse.unquote(name.group(1)).replace('+', ' ')
name = re.search(r'dn=([^\&]*)', result['magnet'])
torrent_name = parse.unquote_plus(name.group(1))
if local:
line = '{:5} {:{length}}'
content = [m, torrent_name[:columns]]
content = [n, torrent_name[:columns]]
else:
no_seeders, no_leechers = map(int, magnet[1:])
size, unit = (float(sizes[m][0]),
sizes[m][1]) if sizes else (0, '???')
date = uploaded[m]
no_seeders = int(result['seeds'])
no_leechers = int(result['leechers'])
if result['size'] != []:
size = float(result['size'][0])
unit = result['size'][1]
else:
size = 0
unit = '???'
date = result['uploaded']
# compute the S/L ratio (Higher is better)
try:
@ -69,17 +76,16 @@ def search_results(mags, sizes, uploaded, local=None):
line = ('{:4} {:5} {:5} {:5.1f} {:5.1f}'
' {:3} {:<11} {:{length}}')
content = [m, no_seeders, no_leechers, ratio,
content = [n, no_seeders, no_leechers, ratio,
size, unit, date, torrent_name[:columns - 52]]
# enhanced print output with justified columns
print(line.format(*content, length=columns - 52), color=cur_color)
def descriptions(chosen_links, mags, site, identifiers):
def descriptions(chosen_links, results, site):
for link in chosen_links:
link = int(link)
path = '/torrent/%s/' % identifiers[link]
path = '/torrent/%s/' % results[link]['id']
req = request.Request(site + path, headers=pirate.data.default_headers)
req.add_header('Accept-encoding', 'gzip')
f = request.urlopen(req, timeout=pirate.data.default_timeout)
@ -88,7 +94,7 @@ def descriptions(chosen_links, mags, site, identifiers):
f = gzip.GzipFile(fileobj=BytesIO(f.read()))
res = f.read().decode('utf-8')
name = re.search(r'dn=([^\&]*)', mags[link][0])
name = re.search(r'dn=([^\&]*)', results[link]['magnet'])
torrent_name = parse.unquote(name.group(1)).replace('+', ' ')
desc = re.search(r'<div class="nfo">\s*<pre>(.+?)(?=</pre>)',
res, re.DOTALL).group(1)
@ -101,10 +107,10 @@ def descriptions(chosen_links, mags, site, identifiers):
print(desc, color='zebra_0')
def file_lists(chosen_links, mags, site, identifiers):
def file_lists(chosen_links, results, site):
for link in chosen_links:
path = '/ajax_details_filelist.php'
query = '?id=' + identifiers[int(link)]
query = '?id=' + results[link]['id']
req = request.Request(site + path + query,
headers=pirate.data.default_headers)
req.add_header('Accept-encoding', 'gzip')
@ -113,10 +119,14 @@ def file_lists(chosen_links, mags, site, identifiers):
if f.info().get('Content-Encoding') == 'gzip':
f = gzip.GzipFile(fileobj=BytesIO(f.read()))
# TODO: proper html decoding/parsing
res = f.read().decode('utf-8').replace('&nbsp;', ' ')
if 'File list not available.' in res:
print('File list not available.')
return
files = re.findall(r'<td align="left">\s*([^<]+?)\s*</td><td ali'
r'gn="right">\s*([^<]+?)\s*</tr>', res)
name = re.search(r'dn=([^\&]*)', mags[int(link)][0])
name = re.search(r'dn=([^\&]*)', results[link]['magnet'])
torrent_name = parse.unquote(name.group(1)).replace('+', ' ')
print('Files in "%s":' % torrent_name, color='zebra_1')

View File

@ -6,57 +6,130 @@ import urllib.parse as parse
import urllib.error
import os.path
from pyquery import PyQuery as pq
import pirate.data
from pirate.print import print
from io import BytesIO
#todo: redo this with html parser instead of regex
def remote(args, mirror):
parser_regex = r'"(magnet\:\?xt=[^"]*)|<td align="right">([^<]+)</td>'
def parse_category(category):
    """Resolve a user-supplied category (name or number) to its numeric id.

    Unknown values are ignored with a warning and '0' (all categories)
    is returned instead.
    """
    # Numeric strings are normalized to ints; names are left untouched.
    try:
        category = int(category)
    except ValueError:
        pass
    cats = pirate.data.categories
    if category in cats.values():
        return category
    if category in cats:
        return cats[category]
    print('Invalid category ignored', color='WARN')
    return '0'
def parse_sort(sort):
    """Resolve a user-supplied sort (name or number) to its numeric id.

    Unknown values are ignored with a warning and '99' (the site's
    default sort) is returned instead.
    """
    # Numeric strings are normalized to ints; names are left untouched.
    try:
        sort = int(sort)
    except ValueError:
        pass
    sorts = pirate.data.sorts
    if sort in sorts.values():
        return sort
    if sort in sorts:
        return sorts[sort]
    print('Invalid sort ignored', color='WARN')
    return '99'
#TODO: warn users when using a sort in a mode that doesn't accept sorts
#TODO: warn users when using search terms in a mode that doesn't accept search terms
#TODO: same with page parameter for top and top48h
#TODO: warn the user if trying to use a minor category with top48h
def build_request_path(page, category, sort, mode, terms):
    """Build the site-relative URL path for one results page.

    page     -- zero-based page number
    category -- numeric category id (0 means "all")
    sort     -- numeric sort id
    mode     -- one of 'browse', 'recent', 'top', 'search'
    terms    -- iterable of search terms (used only in 'search' mode)

    Raises Exception for an unrecognized mode.
    """
    if mode == 'browse':
        # browse has no "all" page; 100 is the first major category
        if category == 0:
            category = 100
        return '/browse/{}/{}/{}'.format(category, page, sort)
    elif mode == 'recent':
        # This is not a typo. There is no / between 48h and the category.
        # Only major categories can be used with this mode.
        suffix = 'all' if category == 0 else str(category)
        return '/top/48h' + suffix
    elif mode == 'top':
        suffix = 'all' if category == 0 else str(category)
        return '/top/' + suffix
    elif mode == 'search':
        # use the module's `parse` alias (urllib.parse) rather than relying
        # on `urllib.parse` attribute access, which only works here because
        # of the unrelated `import urllib.error`
        query = parse.quote_plus(' '.join(terms))
        return '/search/{}/{}/{}/{}'.format(query, page, sort, category)
    else:
        raise Exception('Unknown mode.')
# this returns a list of dictionaries
def parse_page(html):
    """Parse one search-results HTML page into a list of result dicts.

    Each dict carries the keys 'magnet', 'seeds', 'leechers', 'size'
    (a [value, unit] pair), 'uploaded' and 'id'.

    Raises IOError when the page has no result rows but also lacks the
    site's "no hits" message — taken as the signature of a blocked or
    fake mirror.
    """
    d = pq(html)
    results = []
    # parse the rows one by one
    for row in d('table#searchResult tr'):
        drow = d(row)
        # skip the header row — the only row containing <th> cells
        if len(drow('th')) > 0:
            continue
        # grab info about the row
        # NOTE(review): selectors assume the standard TPB results layout —
        # magnet link in column 2, seeds in column 3, leechers in column 4;
        # confirm against a live page before changing them
        magnet = pq(drow(':eq(0)>td:nth-child(2)>a:nth-child(2)')[0]).attr('href')
        seeds = pq(drow(':eq(0)>td:nth-child(3)')).text()
        leechers = pq(drow(':eq(0)>td:nth-child(4)')).text()
        # numeric torrent id from the details link, '/torrent/<id>/...'
        id_ = pq(drow('.detLink')).attr('href').split('/')[2]
        # parse descriptions separately
        desc_text = pq(drow('font.detDesc')[0]).text()
        # size ends up as a [value, unit] pair, e.g. ['703.25', 'MiB']
        size = re.findall(r'(?<=Size )[0-9.]+\s[KMGT]*[i ]*B', desc_text)[0].split()
        uploaded = re.findall(r'(?<=Uploaded ).+(?=\, Size)', desc_text)[0]
        results.append({
            'magnet': magnet,
            'seeds': seeds,
            'leechers': leechers,
            'size': size,
            'uploaded': uploaded,
            'id': id_
        })
    # check for a blocked mirror
    # ('you search phrase' appears to be the site's own typo — the pattern
    # must match the page text verbatim, so do not "fix" it here)
    no_results = re.search(r'No hits\. Try adding an asterisk in '
                           r'you search phrase\.', html)
    if len(results) == 0 and no_results is None:
        # Contradiction - we found no results,
        # but the page didn't say there were no results.
        # The page is probably not actually the pirate bay,
        # so let's try another mirror
        raise IOError('Blocked mirror detected.')
    return results
def remote(pages, category, sort, mode, terms, mirror):
res_l = []
pages = int(args.pages)
if pages < 1:
raise ValueError('Please provide an integer greater than 0 '
'for the number of pages to fetch.')
if str(args.category) in pirate.data.categories.values():
category = args.category
elif args.category in pirate.data.categories.keys():
category = pirate.data.categories[args.category]
else:
category = '0'
print('Invalid category ignored', color='WARN')
if str(args.sort) in pirate.data.sorts.values():
sort = args.sort
elif args.sort in pirate.data.sorts.keys():
sort = pirate.data.sorts[args.sort]
else:
sort = '99'
print('Invalid sort ignored', color='WARN')
# Catch the Ctrl-C exception and exit cleanly
try:
sizes = []
uploaded = []
identifiers = []
for page in range(pages):
if args.browse:
path = '/browse/'
if(category == 0):
category = 100
path = '/browse/' + '/'.join(str(i) for i in (
category, page, sort))
elif len(args.search) == 0:
path = '/top/48h' if args.recent else '/top/'
if(category == 0):
path += 'all'
else:
path += str(category)
else:
path = '/search/' + '/'.join(str(i) for i in (
'+'.join(args.search),
page, sort,
category))
path = build_request_path(page, category, sort, mode, terms)
req = request.Request(mirror + path,
headers=pirate.data.default_headers)
@ -65,53 +138,14 @@ def remote(args, mirror):
if f.info().get('Content-Encoding') == 'gzip':
f = gzip.GzipFile(fileobj=BytesIO(f.read()))
res = f.read().decode('utf-8')
found = re.findall(r'"(magnet\:\?xt=[^"]*)|<td align="right">'
r'([^<]+)</td>', res)
# check for a blocked mirror
no_results = re.search(r'No hits\. Try adding an asterisk in '
r'you search phrase\.', res)
if found == [] and no_results is None:
# Contradiction - we found no results,
# but the page didn't say there were no results.
# The page is probably not actually the pirate bay,
# so let's try another mirror
raise IOError('Blocked mirror detected.')
res_l += parse_page(res)
# get sizes as well and substitute the &nbsp; character
sizes.extend([match.replace('&nbsp;', ' ').split()
for match in re.findall(r'(?<=Size )[0-9.]'
r'+\&nbsp\;[KMGT]*[i ]*B', res)])
uploaded.extend([match.replace('&nbsp;', ' ')
for match in re.findall(r'(?<=Uploaded )'
r'.+(?=\, Size)',res)])
identifiers.extend([match.replace('&nbsp;', ' ')
for match in re.findall('(?<=/torrent/)'
'[0-9]+(?=/)',res)])
state = 'seeds'
curr = ['', 0, 0] #magnet, seeds, leeches
for f in found:
if f[1] == '':
curr[0] = f[0]
else:
if state == 'seeds':
curr[1] = f[1]
state = 'leeches'
else:
curr[2] = f[1]
state = 'seeds'
res_l.append(curr)
curr = ['', 0, 0]
except KeyboardInterrupt:
print('\nCancelled.')
sys.exit(0)
# return the sizes in a separate list
return res_l, sizes, uploaded, identifiers
return res_l
def get_torrent(info_hash):
@ -127,9 +161,9 @@ def get_torrent(info_hash):
return torrent.read()
def save_torrents(chosen_links, mags, folder):
def save_torrents(chosen_links, results, folder):
for link in chosen_links:
magnet = mags[int(link)][0]
magnet = results[link]['magnet']
name = re.search(r'dn=([^\&]*)', magnet)
torrent_name = parse.unquote(name.group(1)).replace('+', ' ')
info_hash = int(re.search(r'btih:([a-f0-9]{40})', magnet).group(1), 16)
@ -146,7 +180,7 @@ def save_torrents(chosen_links, mags, folder):
def save_magnets(chosen_links, mags, folder):
for link in chosen_links:
magnet = mags[int(link)][0]
magnet = results[link]['magnet']
name = re.search(r'dn=([^\&]*)', magnet)
torrent_name = parse.unquote(name.group(1)).replace('+', ' ')
info_hash = int(re.search(r'btih:([a-f0-9]{40})', magnet).group(1), 16)

View File

@ -13,7 +13,7 @@ setup(name='pirate-get',
entry_points={
'console_scripts': ['pirate-get = pirate.pirate:main']
},
install_requires=['colorama>=0.3.3'],
install_requires=['colorama>=0.3.3', 'pyquery>=1.2.9'],
keywords=['torrent', 'magnet', 'download', 'tpb', 'client'],
classifiers=[
'Topic :: Utilities',

1
tests/data/blocked.html Normal file
View File

@ -0,0 +1 @@
blocked.

File diff suppressed because one or more lines are too long

200
tests/data/no_hits.html Normal file

File diff suppressed because one or more lines are too long

View File

@ -3,10 +3,13 @@ import unittest
import pirate.local
import os
from tests import util
class TestLocal(unittest.TestCase):
def test_rich_xml(self):
path = os.path.join(os.path.dirname(os.path.realpath(__file__)), 'rich.xml')
path = util.data_path('rich.xml')
expected = [['magnet:?xt=urn:btih:b03c8641415d3a0fc7077f5bf567634442989a74&dn=High.Chaparall.S02E02.PDTV.XViD.SWEDiSH-HuBBaTiX', '?', '?']]
actual = pirate.local.search(path, ('High',))
self.assertEqual(actual, expected)

40
tests/test_pirate.py Executable file
View File

@ -0,0 +1,40 @@
#!/usr/bin/env python3
import unittest
import pirate.pirate
class TestPirate(unittest.TestCase):
    """Unit tests for the command-parsing helpers in pirate.pirate."""

    def test_parse_cmd(self):
        # (argument tuple, expected token list)
        cases = (
            (('abc', ''), ['abc']),
            (('abc %s', 'url'), ['abc', 'url']),
            (('abc "%s"', 'url'), ['abc', 'url']),
            (("abc '%s'", 'url'), ['abc', 'url']),
            (('abc bash -c "\'%s\'"', 'url'), ['abc', 'bash', '-c', "'url'"]),
            (('abc %s %s', 'url'), ['abc', 'url', 'url']),
        )
        for args, expected in cases:
            self.assertEqual(pirate.pirate.parse_cmd(*args), expected)

    def test_parse_torrent_command(self):
        # (input line, expected (code, choices) tuple)
        cases = (
            ('h', ('h', [])),
            ('q', ('q', [])),
            ('d1', ('d', [1])),
            ('f1', ('f', [1])),
            ('p1', ('p', [1])),
            ('t1', ('t', [1])),
            ('m1', ('m', [1])),
            ('d 23', ('d', [23])),
            ('d 23,1', ('d', [23, 1])),
            ('d 23, 1', ('d', [23, 1])),
            ('1d', ('d', [1])),
            ('1 ... d', ('d', [1])),
            ('1-3 d', ('d', [1, 2, 3])),
        )
        for line, expected in cases:
            self.assertEqual(pirate.pirate.parse_torrent_command(line),
                             expected)
if __name__ == '__main__':
unittest.main()

29
tests/test_print.py Executable file
View File

@ -0,0 +1,29 @@
#!/usr/bin/env python3
import unittest
from unittest.mock import patch
from unittest.mock import call
import pirate.print
class TestPrint(unittest.TestCase):
    """Tests for the result-rendering helpers in pirate.print."""

    def test_print_results(self):
        # Patch the module's own print wrapper so we can inspect the
        # rendered lines instead of writing to stdout.
        with patch('pirate.print.print') as mock:
            results = [{
                'magnet': 'dn=name',
                'seeds': 1,
                'leechers': 2,
                'size': ['3','MiB'],
                'uploaded': 'never'
            }]
            pirate.print.search_results(results)
            actual = mock.call_args_list
            # One header line, then one zebra-striped row per result.
            # NOTE(review): these strings are column-aligned output; the
            # spacing shown here may have been collapsed by extraction —
            # confirm against real search_results output before editing.
            expected = [
                call('LINK SEED LEECH RATIO SIZE UPLOAD NAME ', color='header'),
                call(' 0 1 2 0.5 3.0 MiB never name ', color='zebra_1'),
            ]
            self.assertEqual(expected, actual)
if __name__ == '__main__':
unittest.main()

29
tests/test_torrent.py Executable file

File diff suppressed because one or more lines are too long

8
tests/util.py Normal file
View File

@ -0,0 +1,8 @@
import os
def data_path(name):
    """Return the absolute path of test-data file *name*.

    Data files live in the 'data' directory next to this module.
    """
    here = os.path.dirname(os.path.realpath(__file__))
    return os.path.join(here, 'data', name)
def read_data(name):
    """Read and return the full contents of test-data file *name*."""
    with open(data_path(name)) as handle:
        return handle.read()