1
0
mirror of https://github.com/vikstrous/pirate-get synced 2025-04-23 01:58:39 +02:00

Compare commits

..

No commits in common. "master" and "v0.3.7" have entirely different histories.

17 changed files with 1079 additions and 513 deletions

View File

@ -43,10 +43,6 @@ enabled = false
; path of the database
path = ~/downloads/pirate-get/db
[Search]
; maximum number of results to show
total-results = 50
[Misc]
; specify a custom command for opening the magnet
; ex. myprogram --open %s

View File

@ -5,8 +5,7 @@ import pkgutil
def get_resource(filename):
return pkgutil.get_data(__package__, 'data/' + filename)
version = '0.4.2'
version = '0.3.7'
categories = json.loads(get_resource('categories.json').decode())
sorts = json.loads(get_resource('sorts.json').decode())
@ -15,5 +14,5 @@ blacklist = set(json.loads(get_resource('blacklist.json').decode()))
default_headers = {'User-Agent': 'pirate get'}
default_timeout = 10
default_mirror = 'https://apibay.org'
mirror_list = 'https://proxy-bay.app/list.txt'
default_mirror = 'https://thepiratebay.org/'
mirror_list = 'https://proxybay.bz/list.txt'

View File

@ -3,5 +3,6 @@
"https://piratebaymirror.eu",
"http://proxyspotting.in",
"https://proxyspotting.in",
"https://knaben.xyz/ThePirateBay.php"
"https://knaben.xyz/ThePirateBay.php",
"https://piratesbaycc.com"
]

View File

@ -1,15 +1,15 @@
{
"TitleDsc": [1, "name", true],
"TitleAsc": [2, "name", false],
"DateDsc": [3, "raw_uploaded", true],
"DateAsc": [4, "raw_uploaded", false],
"SizeDsc": [5, "raw_size", true],
"SizeAsc": [6, "raw_size", false],
"SeedersDsc": [7, "seeders", true],
"SeedersAsc": [8, "seeders", false],
"LeechersDsc": [9, "leechers", true],
"LeechersAsc": [10, "leechers", false],
"CategoryDsc": [13, "category", true],
"CategoryAsc": [14, "category", false],
"Default": [99, "seeders", true]
"TitleDsc": 1,
"TitleAsc": 2,
"DateDsc": 3,
"DateAsc": 4,
"SizeDsc": 5,
"SizeAsc": 6,
"SeedersDsc": 7,
"SeedersAsc": 8,
"LeechersDsc": 9,
"LeechersAsc": 10,
"CategoryDsc": 13,
"CategoryAsc": 14,
"Default": 99
}

View File

@ -8,7 +8,7 @@ import socket
import urllib.request as request
import urllib.error
import builtins
import json
import webbrowser
import pirate.data
@ -32,9 +32,6 @@ def parse_config_file(text):
config.set('LocalDB', 'enabled', 'false')
config.set('LocalDB', 'path', expanduser('~/downloads/pirate-get/db'))
config.add_section('Search')
config.set('Search', 'total-results', 50)
config.add_section('Misc')
# TODO: try to use configparser.BasicInterpolation
# for interpolating in the command
@ -45,7 +42,6 @@ def parse_config_file(text):
config.set('Misc', 'transmission-port', '') # for backward compatibility
config.set('Misc', 'colors', 'true')
config.set('Misc', 'mirror', pirate.data.default_mirror)
config.set('Misc', 'timeout', pirate.data.default_timeout)
config.read_string(text)
@ -126,36 +122,31 @@ def parse_torrent_command(l):
def parse_args(args_in):
parser = argparse.ArgumentParser(
description='finds and downloads torrents from the Pirate Bay')
parser.add_argument('-b', '--browse',
parser.add_argument('-b', dest='browse',
action='store_true',
help='display in Browse mode')
parser.add_argument('search',
parser.add_argument('search', metavar='search',
nargs='*', help='term to search for')
parser.add_argument('-c', '--category',
parser.add_argument('-c', dest='category', metavar='category',
help='specify a category to search', default='All')
parser.add_argument('-s', '--sort',
parser.add_argument('-s', dest='sort', metavar='sort',
help='specify a sort option', default='SeedersDsc')
parser.add_argument('-R', '--recent',
action='store_true',
parser.add_argument('-R', dest='recent', action='store_true',
help='torrents uploaded in the last 48hours.'
'*ignored in searches*')
parser.add_argument('-l', '--list-categories',
parser.add_argument('-l', dest='list_categories',
action='store_true',
help='list categories')
parser.add_argument('--list-sorts', '--list_sorts',
parser.add_argument('--list_sorts', dest='list_sorts',
action='store_true',
help='list types by which results can be sorted')
parser.add_argument('-p', '--pages',
default=1, type=int,
help='the number of pages to fetch. '
'(only used with --recent)')
parser.add_argument('-r', '--total-results',
type=int,
help='maximum number of results to show')
help='list Sortable Types')
parser.add_argument('-L', '--local', dest='database',
help='a csv file containing the Pirate Bay database '
'downloaded from '
'https://thepiratebay.org/static/dump/csv/')
parser.add_argument('-p', dest='pages', default=1, type=int,
help='the number of pages to fetch '
"(doesn't work with --local)")
parser.add_argument('-0', dest='first',
action='store_true',
help='choose the top result')
@ -191,14 +182,9 @@ def parse_args(args_in):
parser.add_argument('-m', '--mirror',
type=str, nargs='+',
help='the pirate bay mirror(s) to use')
parser.add_argument('-z', '--timeout', type=int,
help='timeout in seconds for http requests')
parser.add_argument('-v', '--version',
action='store_true',
help='print pirate-get version number')
parser.add_argument('-j', '--json',
action='store_true',
help='print results in JSON format to stdout')
args = parser.parse_args(args_in)
return args
@ -237,13 +223,6 @@ def combine_configs(config, args):
if not args.mirror:
args.mirror = config.get('Misc', 'mirror').split()
if not args.timeout:
args.timeout = int(config.get('Misc', 'timeout'))
config_total_results = int(config.get('Search', 'total-results'))
if not args.total_results and config_total_results:
args.total_results = config_total_results
args.transmission_command = ['transmission-remote']
if args.endpoint:
args.transmission_command.append(args.endpoint)
@ -282,7 +261,6 @@ def combine_configs(config, args):
def connect_mirror(mirror, printer, args):
try:
printer.print('Trying', mirror, end='... ')
url = pirate.torrent.find_api(mirror, args.timeout)
results = pirate.torrent.remote(
printer=printer,
pages=args.pages,
@ -290,8 +268,7 @@ def connect_mirror(mirror, printer, args):
sort=pirate.torrent.parse_sort(printer, args.sort),
mode=args.action,
terms=args.search,
mirror=url,
timeout=args.timeout)
mirror=mirror)
except (urllib.error.URLError, socket.timeout, IOError, ValueError) as e:
printer.print('Failed', color='WARN', end=' ')
printer.print('(', e, ')', sep='')
@ -305,14 +282,14 @@ def search_mirrors(printer, args):
# try default or user mirrors
for mirror in args.mirror:
result = connect_mirror(mirror, printer, args)
if result is not None:
if result:
return result
# download mirror list
try:
req = request.Request(pirate.data.mirror_list,
headers=pirate.data.default_headers)
f = request.urlopen(req, timeout=args.timeout)
f = request.urlopen(req, timeout=pirate.data.default_timeout)
except urllib.error.URLError as e:
raise IOError('Could not fetch mirrors', e.reason)
@ -327,7 +304,7 @@ def search_mirrors(printer, args):
if mirror in pirate.data.blacklist:
continue
result = connect_mirror(mirror, printer, args)
if result is not None:
if result:
return result
else:
raise IOError('No more available mirrors')
@ -336,13 +313,6 @@ def search_mirrors(printer, args):
def pirate_main(args):
printer = Printer(args.color)
# browse mode needs a specific category
if args.browse:
if args.category == 'All' or args.category == 0:
printer.print('You must select a specific category in browse mode.'
' ("All" is not valid)', color='ERROR')
sys.exit(1)
# print version
if args.version:
printer.print('pirate-get, version {}'.format(pirate.data.version))
@ -370,7 +340,7 @@ def pirate_main(args):
cur_color = 'zebra_0'
for key, value in sorted(pirate.data.sorts.items()):
cur_color = 'zebra_0' if cur_color == 'zebra_1' else 'zebra_1'
printer.print(str(value[0]), '\t', key, sep='', color=cur_color)
printer.print(str(value), '\t', key, sep='', color=cur_color)
return
# fetch torrents
@ -395,13 +365,6 @@ def pirate_main(args):
printer.print('No results')
return
if args.json:
print(json.dumps(results))
return
else:
# Results are sorted on the request, so it's safe to remove results here.
if args.total_results:
results = results[0:args.total_results]
printer.search_results(results, local=args.source == 'local_tpb')
# number of results to pick
@ -417,13 +380,13 @@ def pirate_main(args):
printer.print("\nSelect links (Type 'h' for more options"
", 'q' to quit)", end='\b', color='alt')
try:
cmd = builtins.input(': ')
l = builtins.input(': ')
except (KeyboardInterrupt, EOFError):
printer.print('\nCancelled.')
return
try:
code, choices = parse_torrent_command(cmd)
code, choices = parse_torrent_command(l)
# Act on option, if supplied
printer.print('')
if code == 'h':
@ -440,9 +403,9 @@ def pirate_main(args):
printer.print('Bye.', color='alt')
return
elif code == 'd':
printer.descriptions(choices, results, site, args.timeout)
printer.descriptions(choices, results, site)
elif code == 'f':
printer.file_lists(choices, results, site, args.timeout)
printer.file_lists(choices, results, site)
elif code == 'p':
printer.search_results(results)
elif code == 'm':
@ -452,9 +415,8 @@ def pirate_main(args):
pirate.torrent.copy_magnets(printer, choices, results)
elif code == 't':
pirate.torrent.save_torrents(printer, choices, results,
args.save_directory,
args.timeout)
elif not cmd:
args.save_directory)
elif not l:
printer.print('No links entered!', color='WARN')
else:
break
@ -473,8 +435,7 @@ def pirate_main(args):
if args.output == 'save_torrent_files':
printer.print('Saving selected torrents...')
pirate.torrent.save_torrents(printer, choices,
results, args.save_directory,
args.timeout)
results, args.save_directory)
return
for choice in choices:

View File

@ -1,18 +1,17 @@
import builtins
import re
import gzip
import urllib.parse as parse
import urllib.request as request
import shutil
import json
import sys
import pirate.data
import pirate.torrent
import colorama
import veryprettytable as pretty
import veryprettytable
from io import BytesIO
from http.cookiejar import CookieJar
class Printer:
@ -34,10 +33,10 @@ class Printer:
c = color_dict[kwargs.pop('color')]
args = (c + args[0],) + args[1:] + (colorama.Style.RESET_ALL,)
kwargs.pop('color', None)
return builtins.print(*args, file=sys.stderr, **kwargs)
return builtins.print(*args, **kwargs)
else:
kwargs.pop('color', None)
return builtins.print(*args, file=sys.stderr, **kwargs)
return builtins.print(*args, **kwargs)
# TODO: extract the name from the search results
# instead of from the magnet link when possible
@ -46,12 +45,12 @@ class Printer:
even = True
if local:
table = pretty.VeryPrettyTable(['LINK', 'DATE', 'SIZE', 'NAME'])
table = veryprettytable.VeryPrettyTable(['LINK', 'DATE', 'SIZE', 'NAME'])
table.align['SIZE'] = 'r'
table.align['NAME'] = 'l'
else:
table = pretty.VeryPrettyTable(['LINK', 'SEED', 'LEECH',
table = veryprettytable.VeryPrettyTable(['LINK', 'SEED', 'LEECH',
'RATIO', 'SIZE',
'UPLOAD', 'NAME'])
table.align['NAME'] = 'l'
@ -66,15 +65,21 @@ class Printer:
table.padding_width = 1
for n, result in enumerate(results):
torrent_name = result['name']
name = re.search(r'dn=([^\&]*)', result['magnet'])
torrent_name = parse.unquote_plus(name.group(1))
if local:
content = [n, result['date'], result['size'],
torrent_name[:columns - 42]]
content = [n, result['date'], result['size'], torrent_name[:columns - 42]]
else:
no_seeders = int(result['seeders'])
no_seeders = int(result['seeds'])
no_leechers = int(result['leechers'])
size = result['size']
if result['size'] != []:
size = float(result['size'][0])
unit = result['size'][1]
else:
size = 0
unit = '???'
date = result['uploaded']
# compute the S/L ratio (Higher is better)
@ -85,7 +90,8 @@ class Printer:
content = [n, no_seeders, no_leechers,
'{:.1f}'.format(ratio),
size, date, torrent_name[:columns - 50]]
'{:.1f} '.format(size) + unit,
date, torrent_name[:columns - 50]]
if even or not self.enable_color:
table.add_row(content)
@ -96,60 +102,65 @@ class Printer:
even = not even
self.print(table)
def descriptions(self, chosen_links, results, site, timeout):
def descriptions(self, chosen_links, results, site):
jar = CookieJar()
opener = request.build_opener(
request.HTTPErrorProcessor,
request.HTTPCookieProcessor(jar))
for link in chosen_links:
result = results[link]
req = request.Request(
site + '/t.php?id=' + str(result['id']),
path = '/torrent/%s/' % results[link]['id']
req = request.Request(site + path,
headers=pirate.data.default_headers)
req.add_header('Accept-encoding', 'gzip')
f = request.urlopen(req, timeout=timeout)
f = opener.open(req, timeout=pirate.data.default_timeout)
if f.info().get('Content-Encoding') == 'gzip':
f = gzip.GzipFile(fileobj=BytesIO(f.read()))
res = json.load(f)
res = f.read().decode('utf-8')
name = re.search(r'dn=([^\&]*)', results[link]['magnet'])
torrent_name = parse.unquote(name.group(1)).replace('+', ' ')
desc = re.search(r'<div class="nfo">\s*<pre>(.+?)(?=</pre>)',
res, re.DOTALL).group(1)
# Replace HTML links with markdown style versions
desc = re.sub(r'<a href="\s*([^"]+?)\s*"[^>]*>(\s*)([^<]+?)(\s*'
r')</a>', r'\2[\3](\1)\4', res['descr'])
r')</a>', r'\2[\3](\1)\4', desc)
self.print('Description for "{}":'.format(result['name']),
color='zebra_1')
self.print('Description for "%s":' % torrent_name, color='zebra_1')
self.print(desc, color='zebra_0')
def file_lists(self, chosen_links, results, site, timeout):
# the API may returns object instead of list
def get(obj):
try:
return obj[0]
except KeyError:
return obj['0']
def file_lists(self, chosen_links, results, site):
jar = CookieJar()
opener = request.build_opener(
request.HTTPErrorProcessor,
request.HTTPCookieProcessor(jar))
for link in chosen_links:
result = results[link]
req = request.Request(
site + '/f.php?id=' + str(result['id']),
path = '/ajax_details_filelist.php'
query = '?id=' + results[link]['id']
req = request.Request(site + path + query,
headers=pirate.data.default_headers)
req.add_header('Accept-encoding', 'gzip')
f = request.urlopen(req, timeout=timeout)
f = opener.open(req, timeout=pirate.data.default_timeout)
if f.info().get('Content-Encoding') == 'gzip':
f = gzip.GzipFile(fileobj=BytesIO(f.read()))
res = json.load(f)
if len(res) == 1 and 'not found' in get(res[0]['name']):
# TODO: proper html decoding/parsing
res = f.read().decode('utf-8').replace('&nbsp;', ' ')
if 'File list not available.' in res:
self.print('File list not available.')
return
files = re.findall(r'<td align="left">\s*([^<]+?)\s*</td><td ali'
r'gn="right">\s*([^<]+?)\s*</tr>', res)
name = re.search(r'dn=([^\&]*)', results[link]['magnet'])
torrent_name = parse.unquote(name.group(1)).replace('+', ' ')
self.print('Files in {}:'.format(result['name']), color='zebra_1')
self.print('Files in "%s":' % torrent_name, color='zebra_1')
cur_color = 'zebra_0'
for f in res:
name = get(f['name'])
size = pirate.torrent.pretty_size(int(get(f['size'])))
self.print('{:>11} {}'.format(
size, name),
color=cur_color)
for f in files:
self.print('{0[0]:>11} {0[1]}'.format(f), color=cur_color)
cur_color = 'zebra_0' if cur_color == 'zebra_1' else 'zebra_1'

View File

@ -8,10 +8,13 @@ import urllib.error
import os.path
import pirate.data
import json
from datetime import datetime
from bs4 import BeautifulSoup
from io import BytesIO
from http.cookiejar import CookieJar
parser_regex = r'"(magnet\:\?xt=[^"]*)|<td align="right">([^<]+)</td>'
def parse_category(printer, category):
@ -33,184 +36,208 @@ def parse_sort(printer, sort):
sort = int(sort)
except ValueError:
pass
for key, val in pirate.data.sorts.items():
if sort == key or sort == val[0]:
return val[1:]
if sort in pirate.data.sorts.values():
return sort
elif sort in pirate.data.sorts.keys():
return pirate.data.sorts[sort]
else:
printer.print('Invalid sort ignored', color='WARN')
return pirate.data.sorts['Default'][1:]
return 99
def parse_page(page):
results = []
try:
data = json.load(page)
except json.decoder.JSONDecodeError:
raise IOError('invalid JSON in API reply: blocked mirror?')
if len(data) == 1 and 'No results' in data[0]['name']:
return results
for res in data:
res['raw_size'] = int(res['size'])
res['size'] = pretty_size(int(res['size']))
res['magnet'] = build_magnet(res['name'], res['info_hash'])
res['info_hash'] = int(res['info_hash'], 16)
res['raw_uploaded'] = int(res['added'])
res['uploaded'] = pretty_date(res['added'])
res['seeders'] = int(res['seeders'])
res['leechers'] = int(res['leechers'])
res['category'] = int(res['category'])
results.append(res)
return results
def sort_results(sort, res):
key, reverse = sort
return sorted(res, key=lambda x: x[key], reverse=reverse)
def pretty_size(size):
ranges = [('PiB', 1125899906842624),
('TiB', 1099511627776),
('GiB', 1073741824),
('MiB', 1048576),
('KiB', 1024)]
for unit, value in ranges:
if size >= value:
return '{:.1f} {}'.format(size/value, unit)
return str(size) + ' B'
def pretty_date(ts):
date = datetime.fromtimestamp(int(ts))
return date.strftime('%Y-%m-%d %H:%M')
def build_magnet(name, info_hash):
return 'magnet:?xt=urn:btih:{}&dn={}'.format(
info_hash, parse.quote(name, ''))
def build_request_path(mode, page, category, terms):
if mode == 'search':
query = '/q.php?q={}&cat={}'.format(' '.join(terms), category)
elif mode == 'top':
cat = 'all' if category == 0 else category
query = '/precompiled/data_top100_{}.json'.format(cat)
# TODO:
# * warn users when using a sort in a mode that doesn't accept sorts
# * warn users when using search terms in a mode
# that doesn't accept search terms
# * same with page parameter for top and top48h
# * warn the user if trying to use a minor category with top48h
def build_request_path(page, category, sort, mode, terms):
if mode == 'browse':
if(category == 0):
category = 100
return '/browse/{}/{}/{}'.format(category, page, sort)
elif mode == 'recent':
query = '/precompiled/data_top100_recent_{}.json'.format(page)
elif mode == 'browse':
if category == 0:
raise Exception('You must specify a category')
query = '/q.php?q=category:{}'.format(category)
# This is not a typo. There is no / between 48h and the category.
path = '/top/48h'
# only major categories can be used with this mode
if(category == 0):
return path + 'all'
else:
raise Exception('Invalid mode', mode)
return parse.quote(query, '?=&/')
return path + str(category)
elif mode == 'top':
path = '/top/'
if(category == 0):
return path + 'all'
else:
return path + str(category)
elif mode == 'search':
query = urllib.parse.quote_plus(' '.join(terms))
return '/search/{}/{}/{}/{}'.format(query, page, sort, category)
else:
raise Exception('Unknown mode.')
def remote(printer, pages, category, sort, mode, terms, mirror, timeout):
# this returns a list of dictionaries
def parse_page(html):
soup = BeautifulSoup(html, 'html.parser')
tables = soup.find_all('table', id='searchResult')
no_results = re.search(r'No hits\. Try adding an asterisk in '
r'you search phrase\.', html)
# check for a blocked mirror
if not tables and not no_results:
# Contradiction - we found no results,
# but the page didn't say there were no results.
# The page is probably not actually the pirate bay,
# so let's try another mirror
raise IOError('Blocked mirror detected.')
if no_results:
return []
# handle ads disguised as fake result tables
for table in tables:
results = parse_table(table)
if results:
break
else:
raise IOError('Mirror does not contain magnets.')
return results
def parse_table(table):
results = []
for i in range(1, pages + 1):
query = build_request_path(mode, i, category, terms)
# parse the rows one by one (skipping headings)
for row in table('tr')[1:]:
# grab info about the row
row_link = row.find('a', class_='detLink')
if row_link is None:
continue
id_ = row_link['href'].split('/')[2]
seeds, leechers = [i.text for i in row('td')[-2:]]
magnet_tag = row.find(lambda tag: tag.name == 'a' and
tag['href'].startswith('magnet'))
if magnet_tag is None:
continue
magnet = magnet_tag['href']
# parse descriptions separately
description = row.find('font', class_='detDesc').text
size = re.findall(r'(?<=Size )[0-9.]+\s[KMGT]*[i ]*B',
description)[0].split()
uploaded = re.findall(r'(?<=Uploaded ).+(?=\, Size)',
description)[0]
results.append({
'magnet': magnet,
'seeds': seeds,
'leechers': leechers,
'size': size,
'uploaded': uploaded,
'id': id_
})
return results
def remote(printer, pages, category, sort, mode, terms, mirror):
res_l = []
if pages < 1:
raise ValueError('Please provide an integer greater than 0 '
'for the number of pages to fetch.')
# Catch the Ctrl-C exception and exit cleanly
try:
req = request.Request(
mirror + query,
jar = CookieJar()
opener = request.build_opener(
request.HTTPErrorProcessor,
request.HTTPCookieProcessor(jar))
for page in range(pages):
path = build_request_path(page, category, sort, mode, terms)
req = request.Request(mirror + path,
headers=pirate.data.default_headers)
req.add_header('Accept-encoding', 'gzip')
try:
f = request.urlopen(req, timeout=timeout)
f = opener.open(req, timeout=pirate.data.default_timeout)
except urllib.error.URLError as e:
res = e.fp.read().decode()
if e.code == 503 and 'cf-browser-verification' in res:
raise IOError('Cloudflare protected')
raise e
if f.info().get('Content-Encoding') == 'gzip':
f = gzip.GzipFile(fileobj=BytesIO(f.read()))
res = f.read().decode('utf-8')
res_l += parse_page(res)
except KeyboardInterrupt:
printer.print('\nCancelled.')
sys.exit(0)
results.extend(parse_page(f))
return sort_results(sort, results)
return res_l
def find_api(mirror, timeout):
# try common paths
for path in ['', '/apip', '/api.php?url=']:
req = request.Request(mirror + path + '/q.php?q=test&cat=0',
headers=pirate.data.default_headers)
try:
f = request.urlopen(req, timeout=timeout)
if f.info().get_content_type() == 'application/json':
return mirror + path
except urllib.error.HTTPError as e:
res = e.fp.read().decode()
if e.code == 503 and 'cf-browser-verification' in res:
raise IOError('Cloudflare protected')
# extract api path from main.js
req = request.Request(mirror + '/static/main.js',
headers=pirate.data.default_headers)
try:
f = request.urlopen(req, timeout=timeout)
if f.info().get_content_type() == 'application/javascript':
match = re.search("var server='([^']+)'", f.read().decode())
return mirror + match.group(1)
except urllib.error.URLError:
raise IOError('API not found: no main.js')
raise IOError('API not found')
def get_torrent(info_hash, timeout):
def get_torrent(info_hash):
url = 'http://itorrents.org/torrent/{:X}.torrent'
req = request.Request(url.format(info_hash),
headers=pirate.data.default_headers)
req.add_header('Accept-encoding', 'gzip')
torrent = request.urlopen(req, timeout=timeout)
torrent = request.urlopen(req, timeout=pirate.data.default_timeout)
if torrent.info().get('Content-Encoding') == 'gzip':
torrent = gzip.GzipFile(fileobj=BytesIO(torrent.read()))
return torrent.read()
def save_torrents(printer, chosen_links, results, folder, timeout):
def save_torrents(printer, chosen_links, results, folder):
for link in chosen_links:
result = results[link]
torrent_name = result['name'].replace('/', '_').replace('\\', '_')
magnet = results[link]['magnet']
name = re.search(r'dn=([^\&]*)', magnet)
torrent_name = parse.unquote(name.group(1)).replace('+', ' ')
info_hash = int(re.search(r'btih:([a-f0-9]{40})', magnet).group(1), 16)
torrent_name = torrent_name.replace('/', '_').replace('\\', '_')
file = os.path.join(folder, torrent_name + '.torrent')
try:
torrent = get_torrent(result['info_hash'], timeout)
torrent = get_torrent(info_hash)
except urllib.error.HTTPError as e:
printer.print('There is no cached file for this torrent :('
' \nCode: {} - {}'.format(e.code, e.reason),
color='ERROR')
else:
open(file, 'wb').write(torrent)
printer.print('Saved {:X} in {}'.format(result['info_hash'], file))
printer.print('Saved {:X} in {}'.format(info_hash, file))
def save_magnets(printer, chosen_links, results, folder):
for link in chosen_links:
result = results[link]
torrent_name = result['name'].replace('/', '_').replace('\\', '_')
magnet = results[link]['magnet']
name = re.search(r'dn=([^\&]*)', magnet)
torrent_name = parse.unquote(name.group(1)).replace('+', ' ')
info_hash = int(re.search(r'btih:([a-f0-9]{40})', magnet).group(1), 16)
torrent_name = torrent_name.replace('/', '_').replace('\\', '_')
file = os.path.join(folder, torrent_name + '.magnet')
printer.print('Saved {:X} in {}'.format(result['info_hash'], file))
printer.print('Saved {:X} in {}'.format(info_hash, file))
with open(file, 'w') as f:
f.write(result['magnet'] + '\n')
f.write(magnet + '\n')
def copy_magnets(printer, chosen_links, results):
clipboard_text = ''
for link in chosen_links:
result = results[link]
clipboard_text += result['magnet'] + "\n"
printer.print('Copying {:X} to clipboard'.format(result['info_hash']))
magnet = results[link]['magnet']
info_hash = int(re.search(r'btih:([a-fA-F0-9]{40})', magnet).group(1), 16)
clipboard_text += magnet + "\n"
printer.print('Copying {:X} to clipboard'.format(info_hash))
pyperclip.copy(clipboard_text)

View File

@ -18,11 +18,12 @@ if __name__ == '__main__':
author_email='me@viktorstanchev.com',
license='AGPL',
packages=find_packages(),
package_data={'': ["data/*", "tests/data/*"]},
package_data={'': ["data/*.json"]},
entry_points={
'console_scripts': ['pirate-get = pirate.pirate:main']
},
install_requires=['colorama>=0.3.3',
'beautifulsoup4>=4.4.1',
'veryprettytable>=0.8.1',
'pyperclip>=1.6.2'],
keywords=['torrent', 'magnet', 'download', 'tpb', 'client'],

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

200
tests/data/no_hits.html Normal file

File diff suppressed because one or more lines are too long

View File

@ -1 +0,0 @@
[{"id":"0","name":"No results returned","info_hash":"0000000000000000000000000000000000000000","leechers":"0","seeders":"0","num_files":"0","size":"0","username":"","added":"0","status":"member","category":"0","imdb":"","total_found":"1"}]

File diff suppressed because one or more lines are too long

View File

@ -1,6 +1,7 @@
#!/usr/bin/env python3
import socket
import unittest
import subprocess
from argparse import Namespace
from unittest import mock
from unittest.mock import patch, call, MagicMock
@ -27,42 +28,33 @@ class TestPirate(unittest.TestCase):
@patch('subprocess.call')
def test_main(self, mock_call):
result = {
'name': 'derp',
'magnet': 'magnet:?xt=urn:btih:deadbeef&dn=derp',
'seeders': '1',
'magnet': 'dn=derp',
'seeds': '1',
'leechers': '1',
'size': '1 MB',
'size': ('1', 'mb'),
'uploaded': '1',
}
with patch('pirate.pirate.connect_mirror',
return_value=([result], '')):
with patch('pirate.torrent.remote', return_value=[result]) as mock_remote:
config = pirate.pirate.parse_config_file('')
args = pirate.pirate.combine_configs(
config,
pirate.pirate.parse_args(['-0', 'term', '-C', 'blah %s']))
args = pirate.pirate.combine_configs(config, pirate.pirate.parse_args(['-0', 'term', '-C', 'blah %s']))
pirate.pirate.pirate_main(args)
mock_call.assert_called_once_with(
['blah', 'magnet:?xt=urn:btih:deadbeef&dn=derp'])
mock_call.assert_called_once_with(['blah', 'dn=derp'])
@patch('pirate.pirate.builtins.input', return_value='0')
@patch('subprocess.call')
def test_main_choice(self, mock_call, mock_input):
result = {
'name': 'derp',
'magnet': 'magnet:?xt=urn:btih:deadbeef&dn=derp',
'seeders': '1',
'magnet': 'dn=derp',
'seeds': '1',
'leechers': '1',
'size': '1 MB',
'size': ('1', 'mb'),
'uploaded': '1',
}
with patch('pirate.pirate.connect_mirror',
return_value=([result], '')):
with patch('pirate.torrent.remote', return_value=[result]) as mock_remote:
config = pirate.pirate.parse_config_file('')
args = pirate.pirate.combine_configs(
config, pirate.pirate.parse_args(['term', '-C', 'blah %s']))
args = pirate.pirate.combine_configs(config, pirate.pirate.parse_args(['term', '-C', 'blah %s']))
pirate.pirate.pirate_main(args)
mock_call.assert_called_once_with(
['blah', 'magnet:?xt=urn:btih:deadbeef&dn=derp'])
mock_call.assert_called_once_with(['blah', 'dn=derp'])
def test_parse_torrent_command(self):
tests = [
@ -82,9 +74,7 @@ class TestPirate(unittest.TestCase):
[['1-3'], (None, [1,2,3])],
]
for test in tests:
self.assertEqual(
pirate.pirate.parse_torrent_command(*test[0]),
test[1])
self.assertEqual(pirate.pirate.parse_torrent_command(*test[0]), test[1])
def test_parse_config_file(self):
types = {
@ -138,30 +128,16 @@ class TestPirate(unittest.TestCase):
('', ['-l'], {'action': 'list_categories'}),
('', ['--list_sorts'], {'action': 'list_sorts'}),
('', ['term'], {'action': 'search', 'source': 'tpb'}),
('',
['-L', 'filename', 'term'],
{'action': 'search', 'source': 'local_tpb',
'database': 'filename'}),
('',
['term', '-S', 'dir'],
{'action': 'search', 'save_directory': 'dir'}),
('',
['-E', 'localhost:1337'],
{'transmission_command':
['transmission-remote', 'localhost:1337']}),
('', ['-L', 'filename', 'term'], {'action': 'search', 'source': 'local_tpb', 'database': 'filename'}),
('', ['term', '-S', 'dir'], {'action': 'search', 'save_directory': 'dir'}),
('', ['-E', 'localhost:1337'], {'transmission_command': ['transmission-remote', 'localhost:1337']}),
('', ['term'], {'output': 'browser_open'}),
('', ['term', '-t'], {'output': 'transmission'}),
('', ['term', '--save-magnets'], {'output': 'save_magnet_files'}),
('',
['term', '-C', 'command'],
{'output': 'open_command', 'open_command': 'command'}),
('', ['term', '--save-torrents'], {'output': 'save_torrent_files'}),
('', ['term', '-C', 'command'], {'output': 'open_command', 'open_command': 'command'}),
('', ['internets'], {'action': 'search', 'search': ['internets']}),
('',
['term', '--save-torrents'],
{'output': 'save_torrent_files'}),
('',
['internets lol', 'lel'],
{'action': 'search', 'search': ['internets lol', 'lel']}),
('', ['internets lol', 'lel'], {'action': 'search', 'search': ['internets lol', 'lel']}),
]
for test in tests:
args = pirate.pirate.parse_args(test[1])
@ -172,36 +148,29 @@ class TestPirate(unittest.TestCase):
self.assertEqual(test[2][option], value)
def test_search_mirrors(self):
args = Namespace(
category=100, sort=10,
args = Namespace(pages=1, category=100, sort=10,
action='browse', search=[],
mirror=[pirate.data.default_mirror],
timeout=pirate.data.default_timeout)
mirror=[pirate.data.default_mirror])
class MockResponse():
readlines = mock.MagicMock(
return_value=[
x.encode('utf-8') for x in
['', '', '', 'https://example.com']])
readlines = mock.MagicMock(return_value=[x.encode('utf-8') for x in ['', '', '', 'https://example.com']])
info = mock.MagicMock()
getcode = mock.MagicMock(return_value=200)
response_obj = MockResponse()
returns = [None, ([], 'https://example.com')]
printer = MagicMock(Printer)
with patch('pirate.pirate.connect_mirror',
side_effect=returns) as connect:
with patch('urllib.request.urlopen', return_value=response_obj):
with patch('urllib.request.urlopen', return_value=response_obj) as urlopen:
with patch('pirate.torrent.remote', return_value=[]) as remote:
results, mirror = pirate.pirate.search_mirrors(printer, args)
self.assertEqual(results, [])
self.assertEqual(mirror, pirate.data.default_mirror)
remote.assert_called_once_with(printer=printer, pages=1, category=100, sort=10, mode='browse', terms=[], mirror=pirate.data.default_mirror)
with patch('pirate.torrent.remote', side_effect=[socket.timeout, []]) as remote:
results, mirror = pirate.pirate.search_mirrors(printer, args)
connect.assert_has_calls([
call(pirate.data.default_mirror, printer, args),
call('https://example.com', printer, args)])
self.assertEqual(results, [])
self.assertEqual(mirror, 'https://example.com')
remote.assert_has_calls([
call(printer=printer, pages=1, category=100, sort=10, mode='browse', terms=[], mirror=pirate.data.default_mirror),
call(printer=printer, pages=1, category=100, sort=10, mode='browse', terms=[], mirror='https://example.com')
])
if __name__ == '__main__':
unittest.main()

View File

@ -1,10 +1,8 @@
#!/usr/bin/env python3
import os
import unittest
import json
import sys
from unittest.mock import patch, call, MagicMock
from pirate.print import Printer
@ -21,21 +19,17 @@ class TestPrint(unittest.TestCase):
mock = MockTable()
printer = Printer(False)
printer.print = MagicMock()
with patch('veryprettytable.VeryPrettyTable',
return_value=mock) as prettytable:
with patch('veryprettytable.VeryPrettyTable', return_value=mock) as prettytable:
results = [{
'name': 'name',
'seeders': 1,
'magnet': 'dn=name',
'seeds': 1,
'leechers': 2,
'size': '3.0 MiB',
'size': ['3','MiB'],
'uploaded': 'never'
}]
printer.search_results(results)
prettytable.assert_called_once_with([
'LINK', 'SEED', 'LEECH', 'RATIO',
'SIZE', 'UPLOAD', 'NAME'])
mock.add_row.assert_has_calls([
call([0, 1, 2, '0.5', '3.0 MiB', 'never', 'name'])])
prettytable.assert_called_once_with(['LINK', 'SEED', 'LEECH', 'RATIO', 'SIZE', 'UPLOAD', 'NAME'])
mock.add_row.assert_has_calls([call([0, 1, 2, '0.5', '3.0 MiB', 'never', 'name'])])
def test_print_results_local(self):
class MockTable:
@ -44,36 +38,29 @@ class TestPrint(unittest.TestCase):
mock = MockTable()
printer = Printer(False)
printer.print = MagicMock()
with patch('veryprettytable.VeryPrettyTable',
return_value=mock) as prettytable:
with patch('veryprettytable.VeryPrettyTable', return_value=mock) as prettytable:
results = [{
'name': 'name1',
'magnet': 'dn=name',
'date': '1',
'size': '1',
},{
'name': 'name2',
'magnet': 'dn=name2',
'date': '2',
'size': '2',
}]
printer.search_results(results, local=True)
prettytable.assert_called_once_with(
['LINK', 'DATE', 'SIZE', 'NAME'])
mock.add_row.assert_has_calls(
[call([0, '1', '1', 'name1']), call([1, '2', '2', 'name2'])])
prettytable.assert_called_once_with(['LINK', 'DATE', 'SIZE', 'NAME'])
mock.add_row.assert_has_calls([call([0, '1', '1', 'name']), call([1, '2', '2', 'name2'])])
def test_print_color(self):
printer = Printer(False)
with patch('pirate.print.builtins.print') as mock_print:
printer.print('abc', color='zebra_1')
mock_print.assert_called_once_with(
'abc',
file=sys.stderr)
mock_print.assert_called_once_with('abc')
printer = Printer(True)
with patch('pirate.print.builtins.print') as mock_print:
printer.print('abc', color='zebra_1')
mock_print.assert_called_once_with(
'\x1b[34mabc', '\x1b[0m',
file=sys.stderr)
mock_print.assert_called_once_with('\x1b[34mabc', '\x1b[0m')
def test_print_results_local2(self):
class MockTable:
@ -82,77 +69,57 @@ class TestPrint(unittest.TestCase):
mock = MockTable()
printer = Printer(True)
printer.print = MagicMock()
with patch('veryprettytable.VeryPrettyTable',
return_value=mock) as prettytable:
with patch('veryprettytable.VeryPrettyTable', return_value=mock) as prettytable:
results = [{
'name': 'name1',
'magnet': 'dn=name',
'date': '1',
'size': '1',
},{
'name': 'name2',
'magnet': 'dn=name2',
'date': '2',
'size': '2',
}]
printer.search_results(results, local=True)
prettytable.assert_called_once_with(
['LINK', 'DATE', 'SIZE', 'NAME'])
mock.add_row.assert_has_calls([
call([0, '1', '1', 'name1']),
call([1, '2', '2', 'name2'], fore_color='blue')])
prettytable.assert_called_once_with(['LINK', 'DATE', 'SIZE', 'NAME'])
mock.add_row.assert_has_calls([call([0, '1', '1', 'name']), call([1, '2', '2', 'name2'], fore_color='blue')])
def test_print_descriptions(self):
printer = Printer(False)
printer.print = MagicMock()
class MockRequest():
add_header = MagicMock()
request_obj = MockRequest()
class MockResponse():
read = MagicMock(return_value=json.dumps(
{'name': 'cool torrent',
'descr': 'A fake torrent.\n'}))
read = MagicMock(return_value='<html><div class="nfo"><pre>stuff <a href="href">link</a></pre></div></html>'.encode('utf8'))
info = MagicMock()
response_obj = MockResponse()
with patch('urllib.request.Request', return_value=request_obj):
with patch('urllib.request.urlopen',
return_value=response_obj):
printer.descriptions([0], [{'id': '1', 'name': 'name'}],
'example.com', 9)
printer.print.assert_has_calls([
call('Description for "name":', color='zebra_1'),
call('A fake torrent.\n', color='zebra_0')])
class MockOpener():
open = MagicMock(return_value=response_obj)
add_handler = MagicMock()
opener_obj = MockOpener()
with patch('urllib.request.Request', return_value=request_obj) as request:
with patch('urllib.request.OpenerDirector', return_value=opener_obj) as opener:
printer.descriptions([0], [{'id': '1', 'magnet': 'dn=name'}], 'example.com')
printer.print.assert_has_calls([call('Description for "name":', color='zebra_1'),call('stuff [link](href)', color='zebra_0')])
def test_print_file_lists(self):
printer = Printer(False)
printer.print = MagicMock()
class MockRequest():
add_header = MagicMock()
info = MagicMock()
request_obj = MockRequest()
class MockResponse():
read = MagicMock(return_value=json.dumps(
[{'name': ['readme.txt'], 'size': [16]},
{'name': ['a.mkv'], 'size': [677739464]},
{'name': ['b.nfo'], 'size': [61]}]))
read = MagicMock(return_value='<html><tr><td align="left">1.</td><td align="right">filename</tr></html>'.encode('utf8'))
info = MagicMock()
response_obj = MockResponse()
with patch('urllib.request.Request',
return_value=request_obj):
with patch('urllib.request.urlopen',
return_value=response_obj):
printer.file_lists([0], [{'id': '1', 'name': 'name'}],
'example.com', 9)
printer.print.assert_has_calls([
call('Files in name:', color='zebra_1'),
call(' 16 B readme.txt', color='zebra_0'),
call(' 646.3 MiB a.mkv', color='zebra_1'),
call(' 61 B b.nfo', color='zebra_0')])
class MockOpener():
open = MagicMock(return_value=response_obj)
add_handler = MagicMock()
opener_obj = MockOpener()
with patch('urllib.request.Request', return_value=request_obj) as request:
with patch('urllib.request.OpenerDirector', return_value=opener_obj) as opener:
printer.file_lists([0], [{'id': '1', 'magnet': 'dn=name'}], 'example.com')
printer.print.assert_has_calls([call('Files in "name":', color='zebra_1'),call(' 1. filename', color='zebra_0')])
if __name__ == '__main__':
unittest.main()

View File

@ -2,43 +2,48 @@
import unittest
from unittest import mock
from unittest.mock import patch, MagicMock
import os
import io
import urllib
import json
import os
import time
import pirate.torrent
import pirate.data
from pirate.print import Printer
from tests import util
class TestTorrent(unittest.TestCase):
@classmethod
def setUpClass(cls):
# to make test deterministic
os.environ['TZ'] = 'Etc/UTC'
time.tzset()
def test_no_hits(self):
expected = []
with util.open_data('no_hits.json') as res:
res = util.read_data('no_hits.html')
actual = pirate.torrent.parse_page(res)
expected = []
self.assertEqual(actual, expected)
def test_blocked_mirror(self):
with util.open_data('blocked.html') as res:
res = util.read_data('blocked.html')
with self.assertRaises(IOError):
pirate.torrent.parse_page(res)
def test_search_results(self):
with util.open_data('result.json') as file:
expected = json.load(file)
with util.open_data('debian_iso.json') as res:
res = util.read_data('dan_bull_search.html')
actual = pirate.torrent.parse_page(res)
json.dump(actual, open('result.json', 'w'))
expected = [
{'uploaded': '04-04\xa02014', 'seeds': '16', 'leechers': '1', 'id': '9890864', 'magnet': 'magnet:?xt=urn:btih:30df4f8b42b8fd77f5e5aa34abbffe97f5e81fbf&dn=Dan+Croll+%26bull%3B+Sweet+Disarray+%5B2014%5D+320&tr=udp%3A%2F%2Ftracker.openbittorrent.com%3A80&tr=udp%3A%2F%2Fopen.demonii.com%3A1337&tr=udp%3A%2F%2Ftracker.coppersurfer.tk%3A6969&tr=udp%3A%2F%2Fexodus.desync.com%3A6969', 'size': ['89.33', 'MiB']},
{'uploaded': '03-02\xa02014', 'seeds': '4', 'leechers': '0', 'id': '9684858', 'magnet': 'magnet:?xt=urn:btih:7abd3eda600996b8e6fc9a61b83288e0c6ac0d83&dn=Dan+Bull+-+Massive+Collection&tr=udp%3A%2F%2Ftracker.openbittorrent.com%3A80&tr=udp%3A%2F%2Fopen.demonii.com%3A1337&tr=udp%3A%2F%2Ftracker.coppersurfer.tk%3A6969&tr=udp%3A%2F%2Fexodus.desync.com%3A6969', 'size': ['294', 'MiB']},
{'uploaded': '01-19\xa02013', 'seeds': '2', 'leechers': '0', 'id': '8037968', 'magnet': 'magnet:?xt=urn:btih:8f8d68fd0a51237c89692c428ed8a8f64a969c70&dn=Dan+Bull+-+Generation+Gaming+-+2013&tr=udp%3A%2F%2Ftracker.openbittorrent.com%3A80&tr=udp%3A%2F%2Fopen.demonii.com%3A1337&tr=udp%3A%2F%2Ftracker.coppersurfer.tk%3A6969&tr=udp%3A%2F%2Fexodus.desync.com%3A6969', 'size': ['54.86', 'MiB']},
{'uploaded': '01-21\xa02010', 'seeds': '1', 'leechers': '0', 'id': '5295449', 'magnet': 'magnet:?xt=urn:btih:3da6a0fdc1d67a768cb32597e926abdf3e1a2fdd&dn=Dan+Bull+Collection&tr=udp%3A%2F%2Ftracker.openbittorrent.com%3A80&tr=udp%3A%2F%2Fopen.demonii.com%3A1337&tr=udp%3A%2F%2Ftracker.coppersurfer.tk%3A6969&tr=udp%3A%2F%2Fexodus.desync.com%3A6969', 'size': ['236.78', 'MiB']},
{'uploaded': '09-02\xa02014', 'seeds': '1', 'leechers': '0', 'id': '10954408', 'magnet': 'magnet:?xt=urn:btih:5cd371a235317319db7da52c64422f9c2ac75d77&dn=Dan+Bull+-+The+Garden+%7B2014-Album%7D&tr=udp%3A%2F%2Ftracker.openbittorrent.com%3A80&tr=udp%3A%2F%2Fopen.demonii.com%3A1337&tr=udp%3A%2F%2Ftracker.coppersurfer.tk%3A6969&tr=udp%3A%2F%2Fexodus.desync.com%3A6969', 'size': ['36.27', 'MiB']},
{'uploaded': '09-27\xa02009', 'seeds': '0', 'leechers': '1', 'id': '5101630', 'magnet': 'magnet:?xt=urn:btih:4e14dbd077c920875be4c15971b23b609ad6716a&dn=Dan+Bull+-+Dear+Lily+%5Ban+open+letter+to+Lily+Allen%5D+-+2009%5BMP3+%40&tr=udp%3A%2F%2Ftracker.openbittorrent.com%3A80&tr=udp%3A%2F%2Fopen.demonii.com%3A1337&tr=udp%3A%2F%2Ftracker.coppersurfer.tk%3A6969&tr=udp%3A%2F%2Fexodus.desync.com%3A6969', 'size': ['5.51', 'MiB']},
{'uploaded': '11-29\xa02009', 'seeds': '0', 'leechers': '0', 'id': '5185893', 'magnet': 'magnet:?xt=urn:btih:5d9319cf852f7462422cb1bffc37b65174645047&dn=Dan+Bull+-+Dear+Mandy+%5Ban+open+letter+to+Lord+Mandelson%5D&tr=udp%3A%2F%2Ftracker.openbittorrent.com%3A80&tr=udp%3A%2F%2Fopen.demonii.com%3A1337&tr=udp%3A%2F%2Ftracker.coppersurfer.tk%3A6969&tr=udp%3A%2F%2Fexodus.desync.com%3A6969', 'size': ['5.07', 'MiB']},
{'uploaded': '11-10\xa02011', 'seeds': '0', 'leechers': '0', 'id': '6806996', 'magnet': 'magnet:?xt=urn:btih:1c54af57426f53fdef4bbf1a9dbddf32f7b4988a&dn=Dan+Bull+-+Dear+Lily+%28Lily+Allen%29+%28Song+about+filesharing%29&tr=udp%3A%2F%2Ftracker.openbittorrent.com%3A80&tr=udp%3A%2F%2Fopen.demonii.com%3A1337&tr=udp%3A%2F%2Ftracker.coppersurfer.tk%3A6969&tr=udp%3A%2F%2Fexodus.desync.com%3A6969', 'size': ['5.34', 'MiB']},
{'uploaded': '12-20\xa02011', 'seeds': '0', 'leechers': '0', 'id': '6901871', 'magnet': 'magnet:?xt=urn:btih:942c5bf3e1e9bc263939e13cea6ad7bd5f62aa36&dn=Dan+Bull+-+SOPA+Cabana.mp3&tr=udp%3A%2F%2Ftracker.openbittorrent.com%3A80&tr=udp%3A%2F%2Fopen.demonii.com%3A1337&tr=udp%3A%2F%2Ftracker.coppersurfer.tk%3A6969&tr=udp%3A%2F%2Fexodus.desync.com%3A6969', 'size': ['4.8', 'MiB']},
{'uploaded': '12-21\xa02011', 'seeds': '0', 'leechers': '1', 'id': '6902247', 'magnet': 'magnet:?xt=urn:btih:d376f68a31b0db652234e790ed7256ac5e32db57&dn=Dan+Bull+-+SOPA+Cabana&tr=udp%3A%2F%2Ftracker.openbittorrent.com%3A80&tr=udp%3A%2F%2Fopen.demonii.com%3A1337&tr=udp%3A%2F%2Ftracker.coppersurfer.tk%3A6969&tr=udp%3A%2F%2Fexodus.desync.com%3A6969', 'size': ['3.4', 'MiB']},
{'uploaded': '12-21\xa02011', 'seeds': '0', 'leechers': '1', 'id': '6903548', 'magnet': 'magnet:?xt=urn:btih:28163770a532eb24b9e0865878288a9bbdb7a5e6&dn=Dan+Bull+-+SOPA+Cabana+%5BWORKING%5D&tr=udp%3A%2F%2Ftracker.openbittorrent.com%3A80&tr=udp%3A%2F%2Fopen.demonii.com%3A1337&tr=udp%3A%2F%2Ftracker.coppersurfer.tk%3A6969&tr=udp%3A%2F%2Fexodus.desync.com%3A6969', 'size': ['4.8', 'MiB']},
{'uploaded': '03-09\xa02012', 'seeds': '0', 'leechers': '1', 'id': '7088979', 'magnet': 'magnet:?xt=urn:btih:779ab0f13a3fbb12ba68b27721491e4d143f26eb&dn=Dan+Bull+-+Bye+Bye+BPI+2012++%5BMP3%40192%5D%28oan%29&tr=udp%3A%2F%2Ftracker.openbittorrent.com%3A80&tr=udp%3A%2F%2Fopen.demonii.com%3A1337&tr=udp%3A%2F%2Ftracker.coppersurfer.tk%3A6969&tr=udp%3A%2F%2Fexodus.desync.com%3A6969', 'size': ['60.72', 'MiB']},
{'uploaded': '10-24\xa02012', 'seeds': '0', 'leechers': '0', 'id': '7756344', 'magnet': 'magnet:?xt=urn:btih:2667e4795bd5c868dedcabcb52943f4bb7212bab&dn=Dan+Bull+-+Dishonored+%5BExplicit+ver.%5D+%28Single+2012%29&tr=udp%3A%2F%2Ftracker.openbittorrent.com%3A80&tr=udp%3A%2F%2Fopen.demonii.com%3A1337&tr=udp%3A%2F%2Ftracker.coppersurfer.tk%3A6969&tr=udp%3A%2F%2Fexodus.desync.com%3A6969', 'size': ['6.29', 'MiB']},
{'uploaded': '11-10\xa02012', 'seeds': '0', 'leechers': '0', 'id': '7812951', 'magnet': 'magnet:?xt=urn:btih:16364f83c556ad0fd3bb57a4a7c890e7e8087414&dn=Halo+4+EPIC+Rap+By+Dan+Bull&tr=udp%3A%2F%2Ftracker.openbittorrent.com%3A80&tr=udp%3A%2F%2Fopen.demonii.com%3A1337&tr=udp%3A%2F%2Ftracker.coppersurfer.tk%3A6969&tr=udp%3A%2F%2Fexodus.desync.com%3A6969', 'size': ['6.41', 'MiB']},
{'uploaded': '01-19\xa02013', 'seeds': '0', 'leechers': '1', 'id': '8037899', 'magnet': 'magnet:?xt=urn:btih:843b466d9fd1f0bee3a476573b272dc2d6d0ebae&dn=Dan+Bull+-+Generation+Gaming+-+2013&tr=udp%3A%2F%2Ftracker.openbittorrent.com%3A80&tr=udp%3A%2F%2Fopen.demonii.com%3A1337&tr=udp%3A%2F%2Ftracker.coppersurfer.tk%3A6969&tr=udp%3A%2F%2Fexodus.desync.com%3A6969', 'size': ['54.87', 'MiB']}
]
self.assertEqual(actual, expected)
def test_parse_category(self):
@ -55,113 +60,83 @@ class TestTorrent(unittest.TestCase):
def test_parse_sort(self):
sort = pirate.torrent.parse_sort(MagicMock(Printer), 'SeedersDsc')
self.assertEqual(['seeders', True], sort)
sort = pirate.torrent.parse_sort(MagicMock(Printer), 'CategoryAsc')
self.assertEqual(['category', False], sort)
sort = pirate.torrent.parse_sort(MagicMock(Printer), 'DateAsc')
self.assertEqual(['raw_uploaded', False], sort)
self.assertEqual(7, sort)
sort = pirate.torrent.parse_sort(MagicMock(Printer), '7')
self.assertEqual(['seeders', True], sort)
self.assertEqual(7, sort)
sort = pirate.torrent.parse_sort(MagicMock(Printer), 'asdf')
self.assertEqual(['seeders', True], sort)
self.assertEqual(99, sort)
sort = pirate.torrent.parse_sort(MagicMock(Printer), '7000')
self.assertEqual(['seeders', True], sort)
self.assertEqual(99, sort)
def test_request_path(self):
# the args are (mode, category, terms)
succeed = [
(('recent', 1, 0, []), '/precompiled/data_top100_recent_1.json'),
(('recent', 2, 100, []), '/precompiled/data_top100_recent_2.json'),
(('top', 1, 0, []), '/precompiled/data_top100_all.json'),
(('top', 1, 100, []), '/precompiled/data_top100_100.json'),
(('search', 1, 100, ['abc']), '/q.php?q=abc&cat=100'),
(('search', 1, 100, ['abc', 'def']), '/q.php?q=abc%20def&cat=100'),
(('search', 1, 100, ['\u1234']), '/q.php?q=%E1%88%B4&cat=100'),
(('browse', 1, 100, []), '/q.php?q=category%3A100'),
# the args are (page, category, sort, mode, terms)
tests = [
((0, 100, 10, 'browse', []), '/browse/100/0/10'),
((0, 0, 10, 'browse', []), '/browse/100/0/10'),
((0, 0, 10, 'recent', []), '/top/48hall'),
((0, 100, 10, 'recent', []), '/top/48h100'),
((0, 100, 10, 'top', []), '/top/100'),
((0, 0, 10, 'top', []), '/top/all'),
((0, 100, 10, 'search', ['abc']), '/search/abc/0/10/100'),
((0, 100, 10, 'search', ['abc', 'def']), '/search/abc+def/0/10/100'),
((0, 100, 10, 'search', [u'\u1234']), '/search/%E1%88%B4/0/10/100'),
((0, 100, 10, 'asdf', []), Exception),
]
fail = [
(('browse', 1, 0, []), Exception),
(('asdf', 1, 100, []), Exception)
]
for inp, out in succeed:
path = pirate.torrent.build_request_path(*inp)
self.assertEqual(out, path)
for inp, out, in fail:
with self.assertRaises(out):
pirate.torrent.build_request_path(*inp)
for test in tests:
if test[1] != Exception:
path = pirate.torrent.build_request_path(*test[0])
self.assertEqual(test[1], path)
else:
with self.assertRaises(test[1]):
pirate.torrent.build_request_path(test[0])
@patch('pirate.torrent.get_torrent')
def test_save_torrents(self, get_torrent):
with patch('pirate.torrent.open',
mock.mock_open(), create=True) as open_:
pirate.torrent.save_torrents(
MagicMock(Printer), [0],
[{'name': 'cool torrent',
'info_hash': 3735928559,
'magnet': 'magnet:?xt=urn:btih:deadbeef'}], 'path', 9)
get_torrent.assert_called_once_with(3735928559, 9)
open_.assert_called_once_with('path/cool torrent.torrent', 'wb')
with patch('pirate.torrent.open', mock.mock_open(), create=True) as open_:
magnet = 'magnet:?xt=urn:btih:335fcd3cfbecc85554616d73de888033c6c16d37&dn=Test+Drive+Unl\im/ited+%5BPC+Version%5D&tr=udp%3A%2F%2Ftracker.openbittorrent.com%3A80&tr=udp%3A%2F%2Fopen.demonii.com%3A1337&tr=udp%3A%2F%2Ftracker.coppersurfer.tk%3A6969&tr=udp%3A%2F%2Fexodus.desync.com%3A6969'
pirate.torrent.save_torrents(MagicMock(Printer), [0], [{'magnet':magnet}], 'path')
get_torrent.assert_called_once_with(293294978876299923284263767676068334936407502135)
open_.assert_called_once_with('path/Test Drive Unl_im_ited [PC Version].torrent', 'wb')
@patch('pirate.torrent.get_torrent',
side_effect=urllib.error.HTTPError('', '', '', '', io.StringIO()))
@patch('pirate.torrent.get_torrent', side_effect=urllib.error.HTTPError('', '', '', '', io.StringIO()))
def test_save_torrents_fail(self, get_torrent):
pirate.torrent.save_torrents(
MagicMock(Printer), [0],
[{'name': 'cool torrent',
'info_hash': 3735928559,
'magnet': 'magnet:?xt=urn:btih:deadbeef'}], 'path', 9)
magnet = 'magnet:?xt=urn:btih:335fcd3cfbecc85554616d73de888033c6c16d37&dn=Test+Drive+Unlimited+%5BPC+Version%5D&tr=udp%3A%2F%2Ftracker.openbittorrent.com%3A80&tr=udp%3A%2F%2Fopen.demonii.com%3A1337&tr=udp%3A%2F%2Ftracker.coppersurfer.tk%3A6969&tr=udp%3A%2F%2Fexodus.desync.com%3A6969'
pirate.torrent.save_torrents(MagicMock(Printer), [0], [{'magnet':magnet}], 'path')
def test_save_magnets(self):
with patch('pirate.torrent.open',
mock.mock_open(), create=True) as open_:
pirate.torrent.save_magnets(
MagicMock(Printer), [0],
[{'name': 'cool torrent',
'info_hash': 3735928559,
'magnet': 'magnet:?xt=urn:btih:deadbeef'}], 'path')
open_.assert_called_once_with('path/cool torrent.magnet', 'w')
with patch('pirate.torrent.open', mock.mock_open(), create=True) as open_:
magnet = 'magnet:?xt=urn:btih:335fcd3cfbecc85554616d73de888033c6c16d37&dn=Test+Drive+Unl\im/ited+%5BPC+Version%5D&tr=udp%3A%2F%2Ftracker.openbittorrent.com%3A80&tr=udp%3A%2F%2Fopen.demonii.com%3A1337&tr=udp%3A%2F%2Ftracker.coppersurfer.tk%3A6969&tr=udp%3A%2F%2Fexodus.desync.com%3A6969'
pirate.torrent.save_magnets(MagicMock(Printer), [0], [{'magnet':magnet}], 'path')
open_.assert_called_once_with('path/Test Drive Unl_im_ited [PC Version].magnet', 'w')
@patch('urllib.request.urlopen')
def test_get_torrent(self, urlopen):
class MockRequest():
add_header = mock.MagicMock()
request_obj = MockRequest()
with patch('urllib.request.Request', return_value=request_obj) as req:
pirate.torrent.get_torrent(100000000000000, 9)
req.assert_called_once_with(
'http://itorrents.org/torrent/5AF3107A4000.torrent',
headers=pirate.data.default_headers)
urlopen.assert_called_once_with(
request_obj,
timeout=9)
with patch('urllib.request.Request', return_value=request_obj) as request:
pirate.torrent.get_torrent(100000000000000)
request.assert_called_once_with('http://itorrents.org/torrent/5AF3107A4000.torrent', headers=pirate.data.default_headers)
urlopen.assert_called_once_with(request_obj, timeout=pirate.data.default_timeout)
def test_remote(self):
class MockRequest():
add_header = mock.MagicMock()
req_obj = MockRequest()
class MockInfo():
get_content_type = mock.MagicMock(return_value='application/json')
get = mock.MagicMock()
request_obj = MockRequest()
class MockResponse():
read = mock.MagicMock(return_value=b'[]')
info = mock.MagicMock(return_value=MockInfo())
res_obj = MockResponse()
sort = pirate.torrent.parse_sort(MagicMock(Printer), 10)
with patch('urllib.request.Request', return_value=req_obj) as req:
with patch('urllib.request.urlopen', return_value=res_obj) as res:
results = pirate.torrent.remote(
MagicMock(Printer), 1, 100, sort, 'top',
[], 'http://example.com', 9)
req.assert_called_once_with(
'http://example.com/precompiled/data_top100_100.json',
headers=pirate.data.default_headers)
res.assert_called_once_with(req_obj, timeout=9)
self.assertEqual(results, [])
read = mock.MagicMock(return_value=b'<html>No hits. Try adding an asterisk in you search phrase.</html>')
info = mock.MagicMock()
response_obj = MockResponse()
class MockOpener():
open = mock.MagicMock(return_value=response_obj)
add_handler = mock.MagicMock()
opener_obj = MockOpener()
with patch('urllib.request.Request', return_value=request_obj) as request:
with patch('urllib.request.OpenerDirector', return_value=opener_obj) as opener:
res = pirate.torrent.remote(MagicMock(Printer), 1, 100, 10, 'browse', [], 'http://example.com')
request.assert_called_once_with('http://example.com/browse/100/0/10', headers=pirate.data.default_headers)
opener_obj.open.assert_called_once_with(request_obj, timeout=pirate.data.default_timeout)
self.assertEqual(res, [])
if __name__ == '__main__':
unittest.main()

View File

@ -3,5 +3,6 @@ import os
def data_path(name):
return os.path.join(os.path.dirname(os.path.realpath(__file__)), 'data', name)
def open_data(name):
return open(data_path(name))
def read_data(name):
with open(data_path(name)) as f:
return f.read()