1
0
mirror of https://github.com/vikstrous/pirate-get synced 2025-01-09 09:59:51 +01:00

Merge pull request #136 from vikstrous/api

Big rewrite for the pirate bay new API
This commit is contained in:
Michele Guerini Rocco 2020-05-28 12:20:08 +02:00 committed by GitHub
commit 7c276882ab
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
15 changed files with 490 additions and 1072 deletions

View File

@ -5,7 +5,8 @@ import pkgutil
def get_resource(filename):
return pkgutil.get_data(__package__, 'data/' + filename)
version = '0.3.7'
version = '0.4.0'
categories = json.loads(get_resource('categories.json').decode())
sorts = json.loads(get_resource('sorts.json').decode())
@ -14,5 +15,5 @@ blacklist = set(json.loads(get_resource('blacklist.json').decode()))
default_headers = {'User-Agent': 'pirate get'}
default_timeout = 10
default_mirror = 'https://thepiratebay.org/'
default_mirror = 'https://apibay.org'
mirror_list = 'https://proxybay.bz/list.txt'

View File

@ -1,15 +1,15 @@
{
"TitleDsc": 1,
"TitleAsc": 2,
"DateDsc": 3,
"DateAsc": 4,
"SizeDsc": 5,
"SizeAsc": 6,
"SeedersDsc": 7,
"SeedersAsc": 8,
"LeechersDsc": 9,
"LeechersAsc": 10,
"CategoryDsc": 13,
"CategoryAsc": 14,
"Default": 99
}
"TitleDsc": [1, "name", true],
"TitleAsc": [2, "name", false],
"DateDsc": [3, "raw_uploaded", true],
"DateAsc": [4, "raw_uploaded", false],
"SizeDsc": [5, "raw_size", true],
"SizeAsc": [6, "raw_size", false],
"SeedersDsc": [7, "seeders", true],
"SeedersAsc": [8, "seeders", false],
"LeechersDsc": [9, "leechers", true],
"LeechersAsc": [10, "leechers", false],
"CategoryDsc": [13, "category", true],
"CategoryAsc": [14, "category", false],
"Default": [99, "seeders", true]
}

View File

@ -8,7 +8,7 @@ import socket
import urllib.request as request
import urllib.error
import builtins
import json
import webbrowser
import pirate.data
@ -42,6 +42,7 @@ def parse_config_file(text):
config.set('Misc', 'transmission-port', '') # for backward compatibility
config.set('Misc', 'colors', 'true')
config.set('Misc', 'mirror', pirate.data.default_mirror)
config.set('Misc', 'timeout', pirate.data.default_timeout)
config.read_string(text)
@ -122,31 +123,33 @@ def parse_torrent_command(l):
def parse_args(args_in):
parser = argparse.ArgumentParser(
description='finds and downloads torrents from the Pirate Bay')
parser.add_argument('-b', dest='browse',
parser.add_argument('-b', '--browse',
action='store_true',
help='display in Browse mode')
parser.add_argument('search', metavar='search',
parser.add_argument('search',
nargs='*', help='term to search for')
parser.add_argument('-c', dest='category', metavar='category',
parser.add_argument('-c', '--category',
help='specify a category to search', default='All')
parser.add_argument('-s', dest='sort', metavar='sort',
parser.add_argument('-s', '--sort',
help='specify a sort option', default='SeedersDsc')
parser.add_argument('-R', dest='recent', action='store_true',
help='torrents uploaded in the last 48hours.'
parser.add_argument('-R', '--recent',
action='store_true',
help='torrents uploaded in the last 48hours. '
'*ignored in searches*')
parser.add_argument('-l', dest='list_categories',
parser.add_argument('-l', '--list-categories',
action='store_true',
help='list categories')
parser.add_argument('--list_sorts', dest='list_sorts',
parser.add_argument('--list-sorts', '--list_sorts',
action='store_true',
help='list Sortable Types')
help='list types by which results can be sorted')
parser.add_argument('-p', '--pages',
default=1, type=int,
help='the number of pages to fetch. '
'(only used with --recent)')
parser.add_argument('-L', '--local', dest='database',
help='a csv file containing the Pirate Bay database '
'downloaded from '
'https://thepiratebay.org/static/dump/csv/')
parser.add_argument('-p', dest='pages', default=1, type=int,
help='the number of pages to fetch '
"(doesn't work with --local)")
parser.add_argument('-0', dest='first',
action='store_true',
help='choose the top result')
@ -182,9 +185,14 @@ def parse_args(args_in):
parser.add_argument('-m', '--mirror',
type=str, nargs='+',
help='the pirate bay mirror(s) to use')
parser.add_argument('-z', '--timeout', type=int,
help='timeout in seconds for http requests')
parser.add_argument('-v', '--version',
action='store_true',
help='print pirate-get version number')
parser.add_argument('-j', '--json',
action='store_true',
help='print results in JSON format to stdout')
args = parser.parse_args(args_in)
return args
@ -223,6 +231,9 @@ def combine_configs(config, args):
if not args.mirror:
args.mirror = config.get('Misc', 'mirror').split()
if not args.timeout:
args.timeout = int(config.get('Misc', 'timeout'))
args.transmission_command = ['transmission-remote']
if args.endpoint:
args.transmission_command.append(args.endpoint)
@ -261,6 +272,7 @@ def combine_configs(config, args):
def connect_mirror(mirror, printer, args):
try:
printer.print('Trying', mirror, end='... ')
url = pirate.torrent.find_api(mirror, args.timeout)
results = pirate.torrent.remote(
printer=printer,
pages=args.pages,
@ -268,7 +280,8 @@ def connect_mirror(mirror, printer, args):
sort=pirate.torrent.parse_sort(printer, args.sort),
mode=args.action,
terms=args.search,
mirror=mirror)
mirror=url,
timeout=args.timeout)
except (urllib.error.URLError, socket.timeout, IOError, ValueError) as e:
printer.print('Failed', color='WARN', end=' ')
printer.print('(', e, ')', sep='')
@ -282,14 +295,14 @@ def search_mirrors(printer, args):
# try default or user mirrors
for mirror in args.mirror:
result = connect_mirror(mirror, printer, args)
if result:
if result is not None:
return result
# download mirror list
try:
req = request.Request(pirate.data.mirror_list,
headers=pirate.data.default_headers)
f = request.urlopen(req, timeout=pirate.data.default_timeout)
f = request.urlopen(req, timeout=args.timeout)
except urllib.error.URLError as e:
raise IOError('Could not fetch mirrors', e.reason)
@ -304,7 +317,7 @@ def search_mirrors(printer, args):
if mirror in pirate.data.blacklist:
continue
result = connect_mirror(mirror, printer, args)
if result:
if result is not None:
return result
else:
raise IOError('No more available mirrors')
@ -313,6 +326,13 @@ def search_mirrors(printer, args):
def pirate_main(args):
printer = Printer(args.color)
# browse mode needs a specific category
if args.browse:
if args.category == 'All' or args.category == 0:
printer.print('You must select a specific category in browse mode.'
' ("All" is not valid)', color='ERROR')
sys.exit(1)
# print version
if args.version:
printer.print('pirate-get, version {}'.format(pirate.data.version))
@ -340,7 +360,7 @@ def pirate_main(args):
cur_color = 'zebra_0'
for key, value in sorted(pirate.data.sorts.items()):
cur_color = 'zebra_0' if cur_color == 'zebra_1' else 'zebra_1'
printer.print(str(value), '\t', key, sep='', color=cur_color)
printer.print(str(value[0]), '\t', key, sep='', color=cur_color)
return
# fetch torrents
@ -365,7 +385,11 @@ def pirate_main(args):
printer.print('No results')
return
printer.search_results(results, local=args.source == 'local_tpb')
if args.json:
print(json.dumps(results))
return
else:
printer.search_results(results, local=args.source == 'local_tpb')
# number of results to pick
if args.first:
@ -380,13 +404,13 @@ def pirate_main(args):
printer.print("\nSelect links (Type 'h' for more options"
", 'q' to quit)", end='\b', color='alt')
try:
l = builtins.input(': ')
cmd = builtins.input(': ')
except (KeyboardInterrupt, EOFError):
printer.print('\nCancelled.')
return
try:
code, choices = parse_torrent_command(l)
code, choices = parse_torrent_command(cmd)
# Act on option, if supplied
printer.print('')
if code == 'h':
@ -403,9 +427,9 @@ def pirate_main(args):
printer.print('Bye.', color='alt')
return
elif code == 'd':
printer.descriptions(choices, results, site)
printer.descriptions(choices, results, site, args.timeout)
elif code == 'f':
printer.file_lists(choices, results, site)
printer.file_lists(choices, results, site, args.timeout)
elif code == 'p':
printer.search_results(results)
elif code == 'm':
@ -415,8 +439,9 @@ def pirate_main(args):
pirate.torrent.copy_magnets(printer, choices, results)
elif code == 't':
pirate.torrent.save_torrents(printer, choices, results,
args.save_directory)
elif not l:
args.save_directory,
args.timeout)
elif not cmd:
printer.print('No links entered!', color='WARN')
else:
break
@ -435,7 +460,8 @@ def pirate_main(args):
if args.output == 'save_torrent_files':
printer.print('Saving selected torrents...')
pirate.torrent.save_torrents(printer, choices,
results, args.save_directory)
results, args.save_directory,
args.timeout)
return
for choice in choices:

View File

@ -1,17 +1,18 @@
import builtins
import re
import gzip
import urllib.parse as parse
import urllib.request as request
import shutil
import json
import sys
import pirate.data
import pirate.torrent
import colorama
import veryprettytable
import veryprettytable as pretty
from io import BytesIO
from http.cookiejar import CookieJar
class Printer:
@ -33,10 +34,10 @@ class Printer:
c = color_dict[kwargs.pop('color')]
args = (c + args[0],) + args[1:] + (colorama.Style.RESET_ALL,)
kwargs.pop('color', None)
return builtins.print(*args, **kwargs)
return builtins.print(*args, file=sys.stderr, **kwargs)
else:
kwargs.pop('color', None)
return builtins.print(*args, **kwargs)
return builtins.print(*args, file=sys.stderr, **kwargs)
# TODO: extract the name from the search results
# instead of from the magnet link when possible
@ -45,14 +46,14 @@ class Printer:
even = True
if local:
table = veryprettytable.VeryPrettyTable(['LINK', 'DATE', 'SIZE', 'NAME'])
table = pretty.VeryPrettyTable(['LINK', 'DATE', 'SIZE', 'NAME'])
table.align['SIZE'] = 'r'
table.align['NAME'] = 'l'
else:
table = veryprettytable.VeryPrettyTable(['LINK', 'SEED', 'LEECH',
'RATIO', 'SIZE',
'UPLOAD', 'NAME'])
table = pretty.VeryPrettyTable(['LINK', 'SEED', 'LEECH',
'RATIO', 'SIZE',
'UPLOAD', 'NAME'])
table.align['NAME'] = 'l'
table.align['SEED'] = 'r'
table.align['LEECH'] = 'r'
@ -65,21 +66,15 @@ class Printer:
table.padding_width = 1
for n, result in enumerate(results):
name = re.search(r'dn=([^\&]*)', result['magnet'])
torrent_name = parse.unquote_plus(name.group(1))
torrent_name = result['name']
if local:
content = [n, result['date'], result['size'], torrent_name[:columns - 42]]
content = [n, result['date'], result['size'],
torrent_name[:columns - 42]]
else:
no_seeders = int(result['seeds'])
no_seeders = int(result['seeders'])
no_leechers = int(result['leechers'])
if result['size'] != []:
size = float(result['size'][0])
unit = result['size'][1]
else:
size = 0
unit = '???'
size = result['size']
date = result['uploaded']
# compute the S/L ratio (Higher is better)
@ -90,8 +85,7 @@ class Printer:
content = [n, no_seeders, no_leechers,
'{:.1f}'.format(ratio),
'{:.1f} '.format(size) + unit,
date, torrent_name[:columns - 50]]
size, date, torrent_name[:columns - 50]]
if even or not self.enable_color:
table.add_row(content)
@ -102,65 +96,60 @@ class Printer:
even = not even
self.print(table)
def descriptions(self, chosen_links, results, site):
jar = CookieJar()
opener = request.build_opener(
request.HTTPErrorProcessor,
request.HTTPCookieProcessor(jar))
def descriptions(self, chosen_links, results, site, timeout):
for link in chosen_links:
path = '/torrent/%s/' % results[link]['id']
req = request.Request(site + path,
headers=pirate.data.default_headers)
result = results[link]
req = request.Request(
site + '/t.php?id=' + str(result['id']),
headers=pirate.data.default_headers)
req.add_header('Accept-encoding', 'gzip')
f = opener.open(req, timeout=pirate.data.default_timeout)
f = request.urlopen(req, timeout=timeout)
if f.info().get('Content-Encoding') == 'gzip':
f = gzip.GzipFile(fileobj=BytesIO(f.read()))
res = f.read().decode('utf-8')
name = re.search(r'dn=([^\&]*)', results[link]['magnet'])
torrent_name = parse.unquote(name.group(1)).replace('+', ' ')
desc = re.search(r'<div class="nfo">\s*<pre>(.+?)(?=</pre>)',
res, re.DOTALL).group(1)
res = json.load(f)
# Replace HTML links with markdown style versions
desc = re.sub(r'<a href="\s*([^"]+?)\s*"[^>]*>(\s*)([^<]+?)(\s*'
r')</a>', r'\2[\3](\1)\4', desc)
r')</a>', r'\2[\3](\1)\4', res['descr'])
self.print('Description for "%s":' % torrent_name, color='zebra_1')
self.print('Description for "{}":'.format(result['name']),
color='zebra_1')
self.print(desc, color='zebra_0')
def file_lists(self, chosen_links, results, site):
jar = CookieJar()
opener = request.build_opener(
request.HTTPErrorProcessor,
request.HTTPCookieProcessor(jar))
def file_lists(self, chosen_links, results, site, timeout):
# the API may returns object instead of list
def get(obj):
try:
return obj[0]
except KeyError:
return obj['0']
for link in chosen_links:
path = '/ajax_details_filelist.php'
query = '?id=' + results[link]['id']
req = request.Request(site + path + query,
headers=pirate.data.default_headers)
result = results[link]
req = request.Request(
site + '/f.php?id=' + str(result['id']),
headers=pirate.data.default_headers)
req.add_header('Accept-encoding', 'gzip')
f = opener.open(req, timeout=pirate.data.default_timeout)
f = request.urlopen(req, timeout=timeout)
if f.info().get('Content-Encoding') == 'gzip':
f = gzip.GzipFile(fileobj=BytesIO(f.read()))
# TODO: proper html decoding/parsing
res = f.read().decode('utf-8').replace('&nbsp;', ' ')
if 'File list not available.' in res:
res = json.load(f)
if len(res) == 1 and 'not found' in get(res[0]['name']):
self.print('File list not available.')
return
files = re.findall(r'<td align="left">\s*([^<]+?)\s*</td><td ali'
r'gn="right">\s*([^<]+?)\s*</tr>', res)
name = re.search(r'dn=([^\&]*)', results[link]['magnet'])
torrent_name = parse.unquote(name.group(1)).replace('+', ' ')
self.print('Files in "%s":' % torrent_name, color='zebra_1')
self.print('Files in {}:'.format(result['name']), color='zebra_1')
cur_color = 'zebra_0'
for f in files:
self.print('{0[0]:>11} {0[1]}'.format(f), color=cur_color)
for f in res:
name = get(f['name'])
size = pirate.torrent.pretty_size(int(get(f['size'])))
self.print('{:>11} {}'.format(
size, name),
color=cur_color)
cur_color = 'zebra_0' if cur_color == 'zebra_1' else 'zebra_1'

View File

@ -8,13 +8,10 @@ import urllib.error
import os.path
import pirate.data
import json
from bs4 import BeautifulSoup
from datetime import datetime
from io import BytesIO
from http.cookiejar import CookieJar
parser_regex = r'"(magnet\:\?xt=[^"]*)|<td align="right">([^<]+)</td>'
def parse_category(printer, category):
@ -36,208 +33,184 @@ def parse_sort(printer, sort):
sort = int(sort)
except ValueError:
pass
if sort in pirate.data.sorts.values():
return sort
elif sort in pirate.data.sorts.keys():
return pirate.data.sorts[sort]
for key, val in pirate.data.sorts.items():
if sort == key or sort == val[0]:
return val[1:]
else:
printer.print('Invalid sort ignored', color='WARN')
return 99
return pirate.data.sorts['Default'][1:]
# TODO:
# * warn users when using a sort in a mode that doesn't accept sorts
# * warn users when using search terms in a mode
# that doesn't accept search terms
# * same with page parameter for top and top48h
# * warn the user if trying to use a minor category with top48h
def build_request_path(page, category, sort, mode, terms):
if mode == 'browse':
if(category == 0):
category = 100
return '/browse/{}/{}/{}'.format(category, page, sort)
elif mode == 'recent':
# This is not a typo. There is no / between 48h and the category.
path = '/top/48h'
# only major categories can be used with this mode
if(category == 0):
return path + 'all'
else:
return path + str(category)
elif mode == 'top':
path = '/top/'
if(category == 0):
return path + 'all'
else:
return path + str(category)
elif mode == 'search':
query = urllib.parse.quote_plus(' '.join(terms))
return '/search/{}/{}/{}/{}'.format(query, page, sort, category)
else:
raise Exception('Unknown mode.')
# this returns a list of dictionaries
def parse_page(html):
soup = BeautifulSoup(html, 'html.parser')
tables = soup.find_all('table', id='searchResult')
no_results = re.search(r'No hits\. Try adding an asterisk in '
r'you search phrase\.', html)
# check for a blocked mirror
if not tables and not no_results:
# Contradiction - we found no results,
# but the page didn't say there were no results.
# The page is probably not actually the pirate bay,
# so let's try another mirror
raise IOError('Blocked mirror detected.')
if no_results:
return []
# handle ads disguised as fake result tables
for table in tables:
results = parse_table(table)
if results:
break
else:
raise IOError('Mirror does not contain magnets.')
return results
def parse_table(table):
def parse_page(page):
results = []
try:
data = json.load(page)
except json.decoder.JSONDecodeError:
raise IOError('invalid JSON in API reply: blocked mirror?')
# parse the rows one by one (skipping headings)
for row in table('tr')[1:]:
# grab info about the row
row_link = row.find('a', class_='detLink')
if row_link is None:
continue
if len(data) == 1 and 'No results' in data[0]['name']:
return results
id_ = row_link['href'].split('/')[2]
seeds, leechers = [i.text for i in row('td')[-2:]]
magnet_tag = row.find(lambda tag: tag.name == 'a' and
tag['href'].startswith('magnet'))
if magnet_tag is None:
continue
magnet = magnet_tag['href']
# parse descriptions separately
description = row.find('font', class_='detDesc').text
size = re.findall(r'(?<=Size )[0-9.]+\s[KMGT]*[i ]*B',
description)[0].split()
uploaded = re.findall(r'(?<=Uploaded ).+(?=\, Size)',
description)[0]
results.append({
'magnet': magnet,
'seeds': seeds,
'leechers': leechers,
'size': size,
'uploaded': uploaded,
'id': id_
})
for res in data:
res['raw_size'] = int(res['size'])
res['size'] = pretty_size(int(res['size']))
res['magnet'] = build_magnet(res['name'], res['info_hash'])
res['info_hash'] = int(res['info_hash'], 16)
res['raw_uploaded'] = int(res['added'])
res['uploaded'] = pretty_date(res['added'])
res['seeders'] = int(res['seeders'])
res['leechers'] = int(res['leechers'])
res['category'] = int(res['category'])
results.append(res)
return results
def remote(printer, pages, category, sort, mode, terms, mirror):
res_l = []
def sort_results(sort, res):
key, reverse = sort
return sorted(res, key=lambda x: x[key], reverse=reverse)
if pages < 1:
raise ValueError('Please provide an integer greater than 0 '
'for the number of pages to fetch.')
# Catch the Ctrl-C exception and exit cleanly
try:
jar = CookieJar()
opener = request.build_opener(
request.HTTPErrorProcessor,
request.HTTPCookieProcessor(jar))
def pretty_size(size):
ranges = [('PiB', 1125899906842624),
('TiB', 1099511627776),
('GiB', 1073741824),
('MiB', 1048576),
('KiB', 1024)]
for unit, value in ranges:
if size >= value:
return '{:.1f} {}'.format(size/value, unit)
return str(size) + ' B'
for page in range(pages):
path = build_request_path(page, category, sort, mode, terms)
req = request.Request(mirror + path,
headers=pirate.data.default_headers)
req.add_header('Accept-encoding', 'gzip')
def pretty_date(ts):
date = datetime.fromtimestamp(int(ts))
return date.strftime('%Y-%m-%d %H:%M')
def build_magnet(name, info_hash):
return 'magnet:?xt=urn:btih:{}&dn={}'.format(
info_hash, parse.quote(name, ''))
def build_request_path(mode, page, category, terms):
if mode == 'search':
query = '/q.php?q={}&cat={}'.format(' '.join(terms), category)
elif mode == 'top':
cat = 'all' if category == 0 else category
query = '/precompiled/data_top100_{}.json'.format(cat)
elif mode == 'recent':
query = '/precompiled/data_top100_recent_{}.json'.format(page)
elif mode == 'browse':
if category == 0:
raise Exception('You must specify a category')
query = '/q.php?q=category:{}'.format(category)
else:
raise Exception('Invalid mode', mode)
return parse.quote(query, '?=&/')
def remote(printer, pages, category, sort, mode, terms, mirror, timeout):
results = []
for i in range(1, pages + 1):
query = build_request_path(mode, i, category, terms)
# Catch the Ctrl-C exception and exit cleanly
try:
req = request.Request(
mirror + query,
headers=pirate.data.default_headers)
try:
f = opener.open(req, timeout=pirate.data.default_timeout)
f = request.urlopen(req, timeout=timeout)
except urllib.error.URLError as e:
res = e.fp.read().decode()
if e.code == 503 and 'cf-browser-verification' in res:
raise IOError('Cloudflare protected')
raise e
if f.info().get('Content-Encoding') == 'gzip':
f = gzip.GzipFile(fileobj=BytesIO(f.read()))
res = f.read().decode('utf-8')
except KeyboardInterrupt:
printer.print('\nCancelled.')
sys.exit(0)
res_l += parse_page(res)
results.extend(parse_page(f))
except KeyboardInterrupt:
printer.print('\nCancelled.')
sys.exit(0)
return res_l
return sort_results(sort, results)
def get_torrent(info_hash):
def find_api(mirror, timeout):
# try common paths
for path in ['', '/apip', '/api.php?url=']:
req = request.Request(mirror + path + '/q.php?q=test&cat=0',
headers=pirate.data.default_headers)
try:
f = request.urlopen(req, timeout=timeout)
if f.info().get_content_type() == 'application/json':
return mirror + path
except urllib.error.URLError as e:
res = e.fp.read().decode()
if e.code == 503 and 'cf-browser-verification' in res:
raise IOError('Cloudflare protected')
# extract api path from main.js
req = request.Request(mirror + '/static/main.js',
headers=pirate.data.default_headers)
try:
f = request.urlopen(req, timeout=timeout)
if f.info().get_content_type() == 'application/javascript':
match = re.search("var server='([^']+)'", f.read().decode())
return mirror + match.group(1)
except urllib.error.URLError:
raise IOError('API not found: no main.js')
raise IOError('API not found')
def get_torrent(info_hash, timeout):
url = 'http://itorrents.org/torrent/{:X}.torrent'
req = request.Request(url.format(info_hash),
headers=pirate.data.default_headers)
req.add_header('Accept-encoding', 'gzip')
torrent = request.urlopen(req, timeout=pirate.data.default_timeout)
torrent = request.urlopen(req, timeout=timeout)
if torrent.info().get('Content-Encoding') == 'gzip':
torrent = gzip.GzipFile(fileobj=BytesIO(torrent.read()))
return torrent.read()
def save_torrents(printer, chosen_links, results, folder):
def save_torrents(printer, chosen_links, results, folder, timeout):
for link in chosen_links:
magnet = results[link]['magnet']
name = re.search(r'dn=([^\&]*)', magnet)
torrent_name = parse.unquote(name.group(1)).replace('+', ' ')
info_hash = int(re.search(r'btih:([a-f0-9]{40})', magnet).group(1), 16)
torrent_name = torrent_name.replace('/', '_').replace('\\', '_')
result = results[link]
torrent_name = result['name'].replace('/', '_').replace('\\', '_')
file = os.path.join(folder, torrent_name + '.torrent')
try:
torrent = get_torrent(info_hash)
torrent = get_torrent(result['info_hash'], timeout)
except urllib.error.HTTPError as e:
printer.print('There is no cached file for this torrent :('
' \nCode: {} - {}'.format(e.code, e.reason),
color='ERROR')
else:
open(file, 'wb').write(torrent)
printer.print('Saved {:X} in {}'.format(info_hash, file))
printer.print('Saved {:X} in {}'.format(result['info_hash'], file))
def save_magnets(printer, chosen_links, results, folder):
for link in chosen_links:
magnet = results[link]['magnet']
name = re.search(r'dn=([^\&]*)', magnet)
torrent_name = parse.unquote(name.group(1)).replace('+', ' ')
info_hash = int(re.search(r'btih:([a-f0-9]{40})', magnet).group(1), 16)
torrent_name = torrent_name.replace('/', '_').replace('\\', '_')
result = results[link]
torrent_name = result['name'].replace('/', '_').replace('\\', '_')
file = os.path.join(folder, torrent_name + '.magnet')
printer.print('Saved {:X} in {}'.format(info_hash, file))
printer.print('Saved {:X} in {}'.format(result['info_hash'], file))
with open(file, 'w') as f:
f.write(magnet + '\n')
f.write(result['magnet'] + '\n')
def copy_magnets(printer, chosen_links, results):
clipboard_text = ''
for link in chosen_links:
magnet = results[link]['magnet']
info_hash = int(re.search(r'btih:([a-fA-F0-9]{40})', magnet).group(1), 16)
clipboard_text += magnet + "\n"
printer.print('Copying {:X} to clipboard'.format(info_hash))
result = results[link]
clipboard_text += result['magnet'] + "\n"
printer.print('Copying {:X} to clipboard'.format(result['info_hash']))
pyperclip.copy(clipboard_text)

View File

@ -18,12 +18,11 @@ if __name__ == '__main__':
author_email='me@viktorstanchev.com',
license='AGPL',
packages=find_packages(),
package_data={'': ["data/*.json"]},
package_data={'': ["data/*.json", "tests/data/*"]},
entry_points={
'console_scripts': ['pirate-get = pirate.pirate:main']
},
install_requires=['colorama>=0.3.3',
'beautifulsoup4>=4.4.1',
'veryprettytable>=0.8.1',
'pyperclip>=1.6.2'],
keywords=['torrent', 'magnet', 'download', 'tpb', 'client'],

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

1
tests/data/no_hits.json Normal file
View File

@ -0,0 +1 @@
[{"id":"0","name":"No results returned","info_hash":"0000000000000000000000000000000000000000","leechers":"0","seeders":"0","num_files":"0","size":"0","username":"","added":"0","status":"member","category":"0","imdb":"","total_found":"1"}]

1
tests/data/result.json Normal file

File diff suppressed because one or more lines are too long

View File

@ -1,7 +1,6 @@
#!/usr/bin/env python3
import socket
import unittest
import subprocess
from argparse import Namespace
from unittest import mock
from unittest.mock import patch, call, MagicMock
@ -28,33 +27,42 @@ class TestPirate(unittest.TestCase):
@patch('subprocess.call')
def test_main(self, mock_call):
result = {
'magnet': 'dn=derp',
'seeds': '1',
'name': 'derp',
'magnet': 'magnet:?xt=urn:btih:deadbeef&dn=derp',
'seeders': '1',
'leechers': '1',
'size': ('1', 'mb'),
'size': '1 MB',
'uploaded': '1',
}
with patch('pirate.torrent.remote', return_value=[result]) as mock_remote:
with patch('pirate.pirate.connect_mirror',
return_value=([result], '')):
config = pirate.pirate.parse_config_file('')
args = pirate.pirate.combine_configs(config, pirate.pirate.parse_args(['-0', 'term', '-C', 'blah %s']))
args = pirate.pirate.combine_configs(
config,
pirate.pirate.parse_args(['-0', 'term', '-C', 'blah %s']))
pirate.pirate.pirate_main(args)
mock_call.assert_called_once_with(['blah', 'dn=derp'])
mock_call.assert_called_once_with(
['blah', 'magnet:?xt=urn:btih:deadbeef&dn=derp'])
@patch('pirate.pirate.builtins.input', return_value='0')
@patch('subprocess.call')
def test_main_choice(self, mock_call, mock_input):
result = {
'magnet': 'dn=derp',
'seeds': '1',
'name': 'derp',
'magnet': 'magnet:?xt=urn:btih:deadbeef&dn=derp',
'seeders': '1',
'leechers': '1',
'size': ('1', 'mb'),
'size': '1 MB',
'uploaded': '1',
}
with patch('pirate.torrent.remote', return_value=[result]) as mock_remote:
with patch('pirate.pirate.connect_mirror',
return_value=([result], '')):
config = pirate.pirate.parse_config_file('')
args = pirate.pirate.combine_configs(config, pirate.pirate.parse_args(['term', '-C', 'blah %s']))
args = pirate.pirate.combine_configs(
config, pirate.pirate.parse_args(['term', '-C', 'blah %s']))
pirate.pirate.pirate_main(args)
mock_call.assert_called_once_with(['blah', 'dn=derp'])
mock_call.assert_called_once_with(
['blah', 'magnet:?xt=urn:btih:deadbeef&dn=derp'])
def test_parse_torrent_command(self):
tests = [
@ -70,11 +78,13 @@ class TestPirate(unittest.TestCase):
[['d 23, 1'], ('d', [23, 1])],
[['1d'], ('d', [1])],
[['1 ... d'], ('d', [1])],
[['1-3 d'], ('d', [1,2,3])],
[['1-3'], (None, [1,2,3])],
[['1-3 d'], ('d', [1, 2, 3])],
[['1-3'], (None, [1, 2, 3])],
]
for test in tests:
self.assertEqual(pirate.pirate.parse_torrent_command(*test[0]), test[1])
self.assertEqual(
pirate.pirate.parse_torrent_command(*test[0]),
test[1])
def test_parse_config_file(self):
types = {
@ -128,16 +138,30 @@ class TestPirate(unittest.TestCase):
('', ['-l'], {'action': 'list_categories'}),
('', ['--list_sorts'], {'action': 'list_sorts'}),
('', ['term'], {'action': 'search', 'source': 'tpb'}),
('', ['-L', 'filename', 'term'], {'action': 'search', 'source': 'local_tpb', 'database': 'filename'}),
('', ['term', '-S', 'dir'], {'action': 'search', 'save_directory': 'dir'}),
('', ['-E', 'localhost:1337'], {'transmission_command': ['transmission-remote', 'localhost:1337']}),
('',
['-L', 'filename', 'term'],
{'action': 'search', 'source': 'local_tpb',
'database': 'filename'}),
('',
['term', '-S', 'dir'],
{'action': 'search', 'save_directory': 'dir'}),
('',
['-E', 'localhost:1337'],
{'transmission_command':
['transmission-remote', 'localhost:1337']}),
('', ['term'], {'output': 'browser_open'}),
('', ['term', '-t'], {'output': 'transmission'}),
('', ['term', '--save-magnets'], {'output': 'save_magnet_files'}),
('', ['term', '--save-torrents'], {'output': 'save_torrent_files'}),
('', ['term', '-C', 'command'], {'output': 'open_command', 'open_command': 'command'}),
('',
['term', '-C', 'command'],
{'output': 'open_command', 'open_command': 'command'}),
('', ['internets'], {'action': 'search', 'search': ['internets']}),
('', ['internets lol', 'lel'], {'action': 'search', 'search': ['internets lol', 'lel']}),
('',
['term', '--save-torrents'],
{'output': 'save_torrent_files'}),
('',
['internets lol', 'lel'],
{'action': 'search', 'search': ['internets lol', 'lel']}),
]
for test in tests:
args = pirate.pirate.parse_args(test[1])
@ -148,29 +172,36 @@ class TestPirate(unittest.TestCase):
self.assertEqual(test[2][option], value)
def test_search_mirrors(self):
args = Namespace(pages=1, category=100, sort=10,
action='browse', search=[],
mirror=[pirate.data.default_mirror])
args = Namespace(
category=100, sort=10,
action='browse', search=[],
mirror=[pirate.data.default_mirror],
timeout=pirate.data.default_timeout)
class MockResponse():
readlines = mock.MagicMock(return_value=[x.encode('utf-8') for x in ['', '', '', 'https://example.com']])
readlines = mock.MagicMock(
return_value=[
x.encode('utf-8') for x in
['', '', '', 'https://example.com']])
info = mock.MagicMock()
getcode = mock.MagicMock(return_value=200)
response_obj = MockResponse()
returns = [None, ([], 'https://example.com')]
printer = MagicMock(Printer)
with patch('urllib.request.urlopen', return_value=response_obj) as urlopen:
with patch('pirate.torrent.remote', return_value=[]) as remote:
with patch('pirate.pirate.connect_mirror',
side_effect=returns) as connect:
with patch('urllib.request.urlopen', return_value=response_obj):
results, mirror = pirate.pirate.search_mirrors(printer, args)
self.assertEqual(results, [])
self.assertEqual(mirror, pirate.data.default_mirror)
remote.assert_called_once_with(printer=printer, pages=1, category=100, sort=10, mode='browse', terms=[], mirror=pirate.data.default_mirror)
with patch('pirate.torrent.remote', side_effect=[socket.timeout, []]) as remote:
results, mirror = pirate.pirate.search_mirrors(printer, args)
self.assertEqual(results, [])
self.assertEqual(mirror, 'https://example.com')
remote.assert_has_calls([
call(printer=printer, pages=1, category=100, sort=10, mode='browse', terms=[], mirror=pirate.data.default_mirror),
call(printer=printer, pages=1, category=100, sort=10, mode='browse', terms=[], mirror='https://example.com')
])
connect.assert_has_calls([
call(pirate.data.default_mirror, printer, args),
call('https://example.com', printer, args)])
self.assertEqual(results, [])
self.assertEqual(mirror, 'https://example.com')
if __name__ == '__main__':
unittest.main()

View File

@ -1,8 +1,10 @@
#!/usr/bin/env python3
import os
import unittest
from unittest.mock import patch, call, MagicMock
import json
import sys
from unittest.mock import patch, call, MagicMock
from pirate.print import Printer
@ -19,17 +21,21 @@ class TestPrint(unittest.TestCase):
mock = MockTable()
printer = Printer(False)
printer.print = MagicMock()
with patch('veryprettytable.VeryPrettyTable', return_value=mock) as prettytable:
with patch('veryprettytable.VeryPrettyTable',
return_value=mock) as prettytable:
results = [{
'magnet': 'dn=name',
'seeds': 1,
'name': 'name',
'seeders': 1,
'leechers': 2,
'size': ['3','MiB'],
'size': '3.0 MiB',
'uploaded': 'never'
}]
printer.search_results(results)
prettytable.assert_called_once_with(['LINK', 'SEED', 'LEECH', 'RATIO', 'SIZE', 'UPLOAD', 'NAME'])
mock.add_row.assert_has_calls([call([0, 1, 2, '0.5', '3.0 MiB', 'never', 'name'])])
prettytable.assert_called_once_with([
'LINK', 'SEED', 'LEECH', 'RATIO',
'SIZE', 'UPLOAD', 'NAME'])
mock.add_row.assert_has_calls([
call([0, 1, 2, '0.5', '3.0 MiB', 'never', 'name'])])
def test_print_results_local(self):
class MockTable:
@ -38,29 +44,36 @@ class TestPrint(unittest.TestCase):
mock = MockTable()
printer = Printer(False)
printer.print = MagicMock()
with patch('veryprettytable.VeryPrettyTable', return_value=mock) as prettytable:
with patch('veryprettytable.VeryPrettyTable',
return_value=mock) as prettytable:
results = [{
'magnet': 'dn=name',
'name': 'name1',
'date': '1',
'size': '1',
},{
'magnet': 'dn=name2',
}, {
'name': 'name2',
'date': '2',
'size': '2',
}]
printer.search_results(results, local=True)
prettytable.assert_called_once_with(['LINK', 'DATE', 'SIZE', 'NAME'])
mock.add_row.assert_has_calls([call([0, '1', '1', 'name']), call([1, '2', '2', 'name2'])])
prettytable.assert_called_once_with(
['LINK', 'DATE', 'SIZE', 'NAME'])
mock.add_row.assert_has_calls(
[call([0, '1', '1', 'name1']), call([1, '2', '2', 'name2'])])
def test_print_color(self):
printer = Printer(False)
with patch('pirate.print.builtins.print') as mock_print:
printer.print('abc', color='zebra_1')
mock_print.assert_called_once_with('abc')
mock_print.assert_called_once_with(
'abc',
file=sys.stderr)
printer = Printer(True)
with patch('pirate.print.builtins.print') as mock_print:
printer.print('abc', color='zebra_1')
mock_print.assert_called_once_with('\x1b[34mabc', '\x1b[0m')
mock_print.assert_called_once_with(
'\x1b[34mabc', '\x1b[0m',
file=sys.stderr)
def test_print_results_local2(self):
class MockTable:
@ -69,57 +82,77 @@ class TestPrint(unittest.TestCase):
mock = MockTable()
printer = Printer(True)
printer.print = MagicMock()
with patch('veryprettytable.VeryPrettyTable', return_value=mock) as prettytable:
with patch('veryprettytable.VeryPrettyTable',
return_value=mock) as prettytable:
results = [{
'magnet': 'dn=name',
'name': 'name1',
'date': '1',
'size': '1',
},{
'magnet': 'dn=name2',
}, {
'name': 'name2',
'date': '2',
'size': '2',
}]
printer.search_results(results, local=True)
prettytable.assert_called_once_with(['LINK', 'DATE', 'SIZE', 'NAME'])
mock.add_row.assert_has_calls([call([0, '1', '1', 'name']), call([1, '2', '2', 'name2'], fore_color='blue')])
prettytable.assert_called_once_with(
['LINK', 'DATE', 'SIZE', 'NAME'])
mock.add_row.assert_has_calls([
call([0, '1', '1', 'name1']),
call([1, '2', '2', 'name2'], fore_color='blue')])
def test_print_descriptions(self):
printer = Printer(False)
printer.print = MagicMock()
class MockRequest():
add_header = MagicMock()
request_obj = MockRequest()
class MockResponse():
read = MagicMock(return_value='<html><div class="nfo"><pre>stuff <a href="href">link</a></pre></div></html>'.encode('utf8'))
read = MagicMock(return_value=json.dumps(
{'name': 'cool torrent',
'descr': 'A fake torrent.\n'}))
info = MagicMock()
response_obj = MockResponse()
class MockOpener():
open = MagicMock(return_value=response_obj)
add_handler = MagicMock()
opener_obj = MockOpener()
with patch('urllib.request.Request', return_value=request_obj) as request:
with patch('urllib.request.OpenerDirector', return_value=opener_obj) as opener:
printer.descriptions([0], [{'id': '1', 'magnet': 'dn=name'}], 'example.com')
printer.print.assert_has_calls([call('Description for "name":', color='zebra_1'),call('stuff [link](href)', color='zebra_0')])
with patch('urllib.request.Request', return_value=request_obj):
with patch('urllib.request.urlopen',
return_value=response_obj):
printer.descriptions([0], [{'id': '1', 'name': 'name'}],
'example.com', 9)
printer.print.assert_has_calls([
call('Description for "name":', color='zebra_1'),
call('A fake torrent.\n', color='zebra_0')])
def test_print_file_lists(self):
printer = Printer(False)
printer.print = MagicMock()
class MockRequest():
add_header = MagicMock()
info = MagicMock()
request_obj = MockRequest()
class MockResponse():
read = MagicMock(return_value='<html><tr><td align="left">1.</td><td align="right">filename</tr></html>'.encode('utf8'))
read = MagicMock(return_value=json.dumps(
[{'name': ['readme.txt'], 'size': [16]},
{'name': ['a.mkv'], 'size': [677739464]},
{'name': ['b.nfo'], 'size': [61]}]))
info = MagicMock()
response_obj = MockResponse()
class MockOpener():
open = MagicMock(return_value=response_obj)
add_handler = MagicMock()
opener_obj = MockOpener()
with patch('urllib.request.Request', return_value=request_obj) as request:
with patch('urllib.request.OpenerDirector', return_value=opener_obj) as opener:
printer.file_lists([0], [{'id': '1', 'magnet': 'dn=name'}], 'example.com')
printer.print.assert_has_calls([call('Files in "name":', color='zebra_1'),call(' 1. filename', color='zebra_0')])
with patch('urllib.request.Request',
return_value=request_obj):
with patch('urllib.request.urlopen',
return_value=response_obj):
printer.file_lists([0], [{'id': '1', 'name': 'name'}],
'example.com', 9)
printer.print.assert_has_calls([
call('Files in name:', color='zebra_1'),
call(' 16 B readme.txt', color='zebra_0'),
call(' 646.3 MiB a.mkv', color='zebra_1'),
call(' 61 B b.nfo', color='zebra_0')])
if __name__ == '__main__':
unittest.main()

View File

@ -2,48 +2,43 @@
import unittest
from unittest import mock
from unittest.mock import patch, MagicMock
import os
import io
import urllib
import json
import os
import time
import pirate.torrent
import pirate.data
from pirate.print import Printer
from tests import util
class TestTorrent(unittest.TestCase):
@classmethod
def setUpClass(cls):
# to make test deterministic
os.environ['TZ'] = 'Etc/UTC'
time.tzset()
def test_no_hits(self):
res = util.read_data('no_hits.html')
actual = pirate.torrent.parse_page(res)
expected = []
with util.open_data('no_hits.json') as res:
actual = pirate.torrent.parse_page(res)
self.assertEqual(actual, expected)
def test_blocked_mirror(self):
res = util.read_data('blocked.html')
with self.assertRaises(IOError):
pirate.torrent.parse_page(res)
with util.open_data('blocked.html') as res:
with self.assertRaises(IOError):
pirate.torrent.parse_page(res)
def test_search_results(self):
res = util.read_data('dan_bull_search.html')
actual = pirate.torrent.parse_page(res)
expected = [
{'uploaded': '04-04\xa02014', 'seeds': '16', 'leechers': '1', 'id': '9890864', 'magnet': 'magnet:?xt=urn:btih:30df4f8b42b8fd77f5e5aa34abbffe97f5e81fbf&dn=Dan+Croll+%26bull%3B+Sweet+Disarray+%5B2014%5D+320&tr=udp%3A%2F%2Ftracker.openbittorrent.com%3A80&tr=udp%3A%2F%2Fopen.demonii.com%3A1337&tr=udp%3A%2F%2Ftracker.coppersurfer.tk%3A6969&tr=udp%3A%2F%2Fexodus.desync.com%3A6969', 'size': ['89.33', 'MiB']},
{'uploaded': '03-02\xa02014', 'seeds': '4', 'leechers': '0', 'id': '9684858', 'magnet': 'magnet:?xt=urn:btih:7abd3eda600996b8e6fc9a61b83288e0c6ac0d83&dn=Dan+Bull+-+Massive+Collection&tr=udp%3A%2F%2Ftracker.openbittorrent.com%3A80&tr=udp%3A%2F%2Fopen.demonii.com%3A1337&tr=udp%3A%2F%2Ftracker.coppersurfer.tk%3A6969&tr=udp%3A%2F%2Fexodus.desync.com%3A6969', 'size': ['294', 'MiB']},
{'uploaded': '01-19\xa02013', 'seeds': '2', 'leechers': '0', 'id': '8037968', 'magnet': 'magnet:?xt=urn:btih:8f8d68fd0a51237c89692c428ed8a8f64a969c70&dn=Dan+Bull+-+Generation+Gaming+-+2013&tr=udp%3A%2F%2Ftracker.openbittorrent.com%3A80&tr=udp%3A%2F%2Fopen.demonii.com%3A1337&tr=udp%3A%2F%2Ftracker.coppersurfer.tk%3A6969&tr=udp%3A%2F%2Fexodus.desync.com%3A6969', 'size': ['54.86', 'MiB']},
{'uploaded': '01-21\xa02010', 'seeds': '1', 'leechers': '0', 'id': '5295449', 'magnet': 'magnet:?xt=urn:btih:3da6a0fdc1d67a768cb32597e926abdf3e1a2fdd&dn=Dan+Bull+Collection&tr=udp%3A%2F%2Ftracker.openbittorrent.com%3A80&tr=udp%3A%2F%2Fopen.demonii.com%3A1337&tr=udp%3A%2F%2Ftracker.coppersurfer.tk%3A6969&tr=udp%3A%2F%2Fexodus.desync.com%3A6969', 'size': ['236.78', 'MiB']},
{'uploaded': '09-02\xa02014', 'seeds': '1', 'leechers': '0', 'id': '10954408', 'magnet': 'magnet:?xt=urn:btih:5cd371a235317319db7da52c64422f9c2ac75d77&dn=Dan+Bull+-+The+Garden+%7B2014-Album%7D&tr=udp%3A%2F%2Ftracker.openbittorrent.com%3A80&tr=udp%3A%2F%2Fopen.demonii.com%3A1337&tr=udp%3A%2F%2Ftracker.coppersurfer.tk%3A6969&tr=udp%3A%2F%2Fexodus.desync.com%3A6969', 'size': ['36.27', 'MiB']},
{'uploaded': '09-27\xa02009', 'seeds': '0', 'leechers': '1', 'id': '5101630', 'magnet': 'magnet:?xt=urn:btih:4e14dbd077c920875be4c15971b23b609ad6716a&dn=Dan+Bull+-+Dear+Lily+%5Ban+open+letter+to+Lily+Allen%5D+-+2009%5BMP3+%40&tr=udp%3A%2F%2Ftracker.openbittorrent.com%3A80&tr=udp%3A%2F%2Fopen.demonii.com%3A1337&tr=udp%3A%2F%2Ftracker.coppersurfer.tk%3A6969&tr=udp%3A%2F%2Fexodus.desync.com%3A6969', 'size': ['5.51', 'MiB']},
{'uploaded': '11-29\xa02009', 'seeds': '0', 'leechers': '0', 'id': '5185893', 'magnet': 'magnet:?xt=urn:btih:5d9319cf852f7462422cb1bffc37b65174645047&dn=Dan+Bull+-+Dear+Mandy+%5Ban+open+letter+to+Lord+Mandelson%5D&tr=udp%3A%2F%2Ftracker.openbittorrent.com%3A80&tr=udp%3A%2F%2Fopen.demonii.com%3A1337&tr=udp%3A%2F%2Ftracker.coppersurfer.tk%3A6969&tr=udp%3A%2F%2Fexodus.desync.com%3A6969', 'size': ['5.07', 'MiB']},
{'uploaded': '11-10\xa02011', 'seeds': '0', 'leechers': '0', 'id': '6806996', 'magnet': 'magnet:?xt=urn:btih:1c54af57426f53fdef4bbf1a9dbddf32f7b4988a&dn=Dan+Bull+-+Dear+Lily+%28Lily+Allen%29+%28Song+about+filesharing%29&tr=udp%3A%2F%2Ftracker.openbittorrent.com%3A80&tr=udp%3A%2F%2Fopen.demonii.com%3A1337&tr=udp%3A%2F%2Ftracker.coppersurfer.tk%3A6969&tr=udp%3A%2F%2Fexodus.desync.com%3A6969', 'size': ['5.34', 'MiB']},
{'uploaded': '12-20\xa02011', 'seeds': '0', 'leechers': '0', 'id': '6901871', 'magnet': 'magnet:?xt=urn:btih:942c5bf3e1e9bc263939e13cea6ad7bd5f62aa36&dn=Dan+Bull+-+SOPA+Cabana.mp3&tr=udp%3A%2F%2Ftracker.openbittorrent.com%3A80&tr=udp%3A%2F%2Fopen.demonii.com%3A1337&tr=udp%3A%2F%2Ftracker.coppersurfer.tk%3A6969&tr=udp%3A%2F%2Fexodus.desync.com%3A6969', 'size': ['4.8', 'MiB']},
{'uploaded': '12-21\xa02011', 'seeds': '0', 'leechers': '1', 'id': '6902247', 'magnet': 'magnet:?xt=urn:btih:d376f68a31b0db652234e790ed7256ac5e32db57&dn=Dan+Bull+-+SOPA+Cabana&tr=udp%3A%2F%2Ftracker.openbittorrent.com%3A80&tr=udp%3A%2F%2Fopen.demonii.com%3A1337&tr=udp%3A%2F%2Ftracker.coppersurfer.tk%3A6969&tr=udp%3A%2F%2Fexodus.desync.com%3A6969', 'size': ['3.4', 'MiB']},
{'uploaded': '12-21\xa02011', 'seeds': '0', 'leechers': '1', 'id': '6903548', 'magnet': 'magnet:?xt=urn:btih:28163770a532eb24b9e0865878288a9bbdb7a5e6&dn=Dan+Bull+-+SOPA+Cabana+%5BWORKING%5D&tr=udp%3A%2F%2Ftracker.openbittorrent.com%3A80&tr=udp%3A%2F%2Fopen.demonii.com%3A1337&tr=udp%3A%2F%2Ftracker.coppersurfer.tk%3A6969&tr=udp%3A%2F%2Fexodus.desync.com%3A6969', 'size': ['4.8', 'MiB']},
{'uploaded': '03-09\xa02012', 'seeds': '0', 'leechers': '1', 'id': '7088979', 'magnet': 'magnet:?xt=urn:btih:779ab0f13a3fbb12ba68b27721491e4d143f26eb&dn=Dan+Bull+-+Bye+Bye+BPI+2012++%5BMP3%40192%5D%28oan%29&tr=udp%3A%2F%2Ftracker.openbittorrent.com%3A80&tr=udp%3A%2F%2Fopen.demonii.com%3A1337&tr=udp%3A%2F%2Ftracker.coppersurfer.tk%3A6969&tr=udp%3A%2F%2Fexodus.desync.com%3A6969', 'size': ['60.72', 'MiB']},
{'uploaded': '10-24\xa02012', 'seeds': '0', 'leechers': '0', 'id': '7756344', 'magnet': 'magnet:?xt=urn:btih:2667e4795bd5c868dedcabcb52943f4bb7212bab&dn=Dan+Bull+-+Dishonored+%5BExplicit+ver.%5D+%28Single+2012%29&tr=udp%3A%2F%2Ftracker.openbittorrent.com%3A80&tr=udp%3A%2F%2Fopen.demonii.com%3A1337&tr=udp%3A%2F%2Ftracker.coppersurfer.tk%3A6969&tr=udp%3A%2F%2Fexodus.desync.com%3A6969', 'size': ['6.29', 'MiB']},
{'uploaded': '11-10\xa02012', 'seeds': '0', 'leechers': '0', 'id': '7812951', 'magnet': 'magnet:?xt=urn:btih:16364f83c556ad0fd3bb57a4a7c890e7e8087414&dn=Halo+4+EPIC+Rap+By+Dan+Bull&tr=udp%3A%2F%2Ftracker.openbittorrent.com%3A80&tr=udp%3A%2F%2Fopen.demonii.com%3A1337&tr=udp%3A%2F%2Ftracker.coppersurfer.tk%3A6969&tr=udp%3A%2F%2Fexodus.desync.com%3A6969', 'size': ['6.41', 'MiB']},
{'uploaded': '01-19\xa02013', 'seeds': '0', 'leechers': '1', 'id': '8037899', 'magnet': 'magnet:?xt=urn:btih:843b466d9fd1f0bee3a476573b272dc2d6d0ebae&dn=Dan+Bull+-+Generation+Gaming+-+2013&tr=udp%3A%2F%2Ftracker.openbittorrent.com%3A80&tr=udp%3A%2F%2Fopen.demonii.com%3A1337&tr=udp%3A%2F%2Ftracker.coppersurfer.tk%3A6969&tr=udp%3A%2F%2Fexodus.desync.com%3A6969', 'size': ['54.87', 'MiB']}
]
with util.open_data('result.json') as file:
expected = json.load(file)
with util.open_data('debian_iso.json') as res:
actual = pirate.torrent.parse_page(res)
json.dump(actual, open('result.json', 'w'))
self.assertEqual(actual, expected)
def test_parse_category(self):
@ -60,83 +55,113 @@ class TestTorrent(unittest.TestCase):
def test_parse_sort(self):
sort = pirate.torrent.parse_sort(MagicMock(Printer), 'SeedersDsc')
self.assertEqual(7, sort)
self.assertEqual(['seeders', True], sort)
sort = pirate.torrent.parse_sort(MagicMock(Printer), 'CategoryAsc')
self.assertEqual(['category', False], sort)
sort = pirate.torrent.parse_sort(MagicMock(Printer), 'DateAsc')
self.assertEqual(['raw_uploaded', False], sort)
sort = pirate.torrent.parse_sort(MagicMock(Printer), '7')
self.assertEqual(7, sort)
self.assertEqual(['seeders', True], sort)
sort = pirate.torrent.parse_sort(MagicMock(Printer), 'asdf')
self.assertEqual(99, sort)
self.assertEqual(['seeders', True], sort)
sort = pirate.torrent.parse_sort(MagicMock(Printer), '7000')
self.assertEqual(99, sort)
self.assertEqual(['seeders', True], sort)
def test_request_path(self):
# the args are (page, category, sort, mode, terms)
tests = [
((0, 100, 10, 'browse', []), '/browse/100/0/10'),
((0, 0, 10, 'browse', []), '/browse/100/0/10'),
((0, 0, 10, 'recent', []), '/top/48hall'),
((0, 100, 10, 'recent', []), '/top/48h100'),
((0, 100, 10, 'top', []), '/top/100'),
((0, 0, 10, 'top', []), '/top/all'),
((0, 100, 10, 'search', ['abc']), '/search/abc/0/10/100'),
((0, 100, 10, 'search', ['abc', 'def']), '/search/abc+def/0/10/100'),
((0, 100, 10, 'search', [u'\u1234']), '/search/%E1%88%B4/0/10/100'),
((0, 100, 10, 'asdf', []), Exception),
# the args are (mode, category, terms)
succeed = [
(('recent', 1, 0, []), '/precompiled/data_top100_recent_1.json'),
(('recent', 2, 100, []), '/precompiled/data_top100_recent_2.json'),
(('top', 1, 0, []), '/precompiled/data_top100_all.json'),
(('top', 1, 100, []), '/precompiled/data_top100_100.json'),
(('search', 1, 100, ['abc']), '/q.php?q=abc&cat=100'),
(('search', 1, 100, ['abc', 'def']), '/q.php?q=abc%20def&cat=100'),
(('search', 1, 100, ['\u1234']), '/q.php?q=%E1%88%B4&cat=100'),
(('browse', 1, 100, []), '/q.php?q=category%3A100'),
]
for test in tests:
if test[1] != Exception:
path = pirate.torrent.build_request_path(*test[0])
self.assertEqual(test[1], path)
else:
with self.assertRaises(test[1]):
pirate.torrent.build_request_path(test[0])
fail = [
(('browse', 1, 0, []), Exception),
(('asdf', 1, 100, []), Exception)
]
for inp, out in succeed:
path = pirate.torrent.build_request_path(*inp)
self.assertEqual(out, path)
for inp, out, in fail:
with self.assertRaises(out):
pirate.torrent.build_request_path(*inp)
@patch('pirate.torrent.get_torrent')
def test_save_torrents(self, get_torrent):
with patch('pirate.torrent.open', mock.mock_open(), create=True) as open_:
magnet = 'magnet:?xt=urn:btih:335fcd3cfbecc85554616d73de888033c6c16d37&dn=Test+Drive+Unl\im/ited+%5BPC+Version%5D&tr=udp%3A%2F%2Ftracker.openbittorrent.com%3A80&tr=udp%3A%2F%2Fopen.demonii.com%3A1337&tr=udp%3A%2F%2Ftracker.coppersurfer.tk%3A6969&tr=udp%3A%2F%2Fexodus.desync.com%3A6969'
pirate.torrent.save_torrents(MagicMock(Printer), [0], [{'magnet':magnet}], 'path')
get_torrent.assert_called_once_with(293294978876299923284263767676068334936407502135)
open_.assert_called_once_with('path/Test Drive Unl_im_ited [PC Version].torrent', 'wb')
with patch('pirate.torrent.open',
mock.mock_open(), create=True) as open_:
pirate.torrent.save_torrents(
MagicMock(Printer), [0],
[{'name': 'cool torrent',
'info_hash': 3735928559,
'magnet': 'magnet:?xt=urn:btih:deadbeef'}], 'path', 9)
get_torrent.assert_called_once_with(3735928559, 9)
open_.assert_called_once_with('path/cool torrent.torrent', 'wb')
@patch('pirate.torrent.get_torrent', side_effect=urllib.error.HTTPError('', '', '', '', io.StringIO()))
@patch('pirate.torrent.get_torrent',
side_effect=urllib.error.HTTPError('', '', '', '', io.StringIO()))
def test_save_torrents_fail(self, get_torrent):
magnet = 'magnet:?xt=urn:btih:335fcd3cfbecc85554616d73de888033c6c16d37&dn=Test+Drive+Unlimited+%5BPC+Version%5D&tr=udp%3A%2F%2Ftracker.openbittorrent.com%3A80&tr=udp%3A%2F%2Fopen.demonii.com%3A1337&tr=udp%3A%2F%2Ftracker.coppersurfer.tk%3A6969&tr=udp%3A%2F%2Fexodus.desync.com%3A6969'
pirate.torrent.save_torrents(MagicMock(Printer), [0], [{'magnet':magnet}], 'path')
pirate.torrent.save_torrents(
MagicMock(Printer), [0],
[{'name': 'cool torrent',
'info_hash': 3735928559,
'magnet': 'magnet:?xt=urn:btih:deadbeef'}], 'path', 9)
def test_save_magnets(self):
with patch('pirate.torrent.open', mock.mock_open(), create=True) as open_:
magnet = 'magnet:?xt=urn:btih:335fcd3cfbecc85554616d73de888033c6c16d37&dn=Test+Drive+Unl\im/ited+%5BPC+Version%5D&tr=udp%3A%2F%2Ftracker.openbittorrent.com%3A80&tr=udp%3A%2F%2Fopen.demonii.com%3A1337&tr=udp%3A%2F%2Ftracker.coppersurfer.tk%3A6969&tr=udp%3A%2F%2Fexodus.desync.com%3A6969'
pirate.torrent.save_magnets(MagicMock(Printer), [0], [{'magnet':magnet}], 'path')
open_.assert_called_once_with('path/Test Drive Unl_im_ited [PC Version].magnet', 'w')
with patch('pirate.torrent.open',
mock.mock_open(), create=True) as open_:
pirate.torrent.save_magnets(
MagicMock(Printer), [0],
[{'name': 'cool torrent',
'info_hash': 3735928559,
'magnet': 'magnet:?xt=urn:btih:deadbeef'}], 'path')
open_.assert_called_once_with('path/cool torrent.magnet', 'w')
@patch('urllib.request.urlopen')
def test_get_torrent(self, urlopen):
class MockRequest():
add_header = mock.MagicMock()
request_obj = MockRequest()
with patch('urllib.request.Request', return_value=request_obj) as request:
pirate.torrent.get_torrent(100000000000000)
request.assert_called_once_with('http://itorrents.org/torrent/5AF3107A4000.torrent', headers=pirate.data.default_headers)
urlopen.assert_called_once_with(request_obj, timeout=pirate.data.default_timeout)
with patch('urllib.request.Request', return_value=request_obj) as req:
pirate.torrent.get_torrent(100000000000000, 9)
req.assert_called_once_with(
'http://itorrents.org/torrent/5AF3107A4000.torrent',
headers=pirate.data.default_headers)
urlopen.assert_called_once_with(
request_obj,
timeout=9)
def test_remote(self):
class MockRequest():
add_header = mock.MagicMock()
request_obj = MockRequest()
req_obj = MockRequest()
class MockInfo():
get_content_type = mock.MagicMock(return_value='application/json')
get = mock.MagicMock()
class MockResponse():
read = mock.MagicMock(return_value=b'<html>No hits. Try adding an asterisk in you search phrase.</html>')
info = mock.MagicMock()
response_obj = MockResponse()
class MockOpener():
open = mock.MagicMock(return_value=response_obj)
add_handler = mock.MagicMock()
opener_obj = MockOpener()
with patch('urllib.request.Request', return_value=request_obj) as request:
with patch('urllib.request.OpenerDirector', return_value=opener_obj) as opener:
res = pirate.torrent.remote(MagicMock(Printer), 1, 100, 10, 'browse', [], 'http://example.com')
request.assert_called_once_with('http://example.com/browse/100/0/10', headers=pirate.data.default_headers)
opener_obj.open.assert_called_once_with(request_obj, timeout=pirate.data.default_timeout)
self.assertEqual(res, [])
read = mock.MagicMock(return_value=b'[]')
info = mock.MagicMock(return_value=MockInfo())
res_obj = MockResponse()
sort = pirate.torrent.parse_sort(MagicMock(Printer), 10)
with patch('urllib.request.Request', return_value=req_obj) as req:
with patch('urllib.request.urlopen', return_value=res_obj) as res:
results = pirate.torrent.remote(
MagicMock(Printer), 1, 100, sort, 'top',
[], 'http://example.com', 9)
req.assert_called_once_with(
'http://example.com/precompiled/data_top100_100.json',
headers=pirate.data.default_headers)
res.assert_called_once_with(req_obj, timeout=9)
self.assertEqual(results, [])
if __name__ == '__main__':
unittest.main()

View File

@ -3,6 +3,5 @@ import os
def data_path(name):
return os.path.join(os.path.dirname(os.path.realpath(__file__)), 'data', name)
def read_data(name):
with open(data_path(name)) as f:
return f.read()
def open_data(name):
return open(data_path(name))