1
0
mirror of https://github.com/vikstrous/pirate-get synced 2025-01-09 09:59:51 +01:00

initial work on tpb api support

This commit is contained in:
Michele Guerini Rocco 2020-05-21 01:57:46 +02:00
parent 567ea9db11
commit c23c3db3d8
Signed by: rnhmjoj
GPG Key ID: BFBAF4C975F76450
4 changed files with 126 additions and 208 deletions

View File

@ -5,7 +5,8 @@ import pkgutil
def get_resource(filename):
    """Load a data file shipped inside this package.

    ``filename`` is looked up under the package's ``data/`` directory and
    the result of ``pkgutil.get_data`` is returned unchanged.
    """
    resource_path = 'data/' + filename
    return pkgutil.get_data(__package__, resource_path)
version = '0.3.7'
version = '0.4.0'
categories = json.loads(get_resource('categories.json').decode())
sorts = json.loads(get_resource('sorts.json').decode())
@ -14,5 +15,5 @@ blacklist = set(json.loads(get_resource('blacklist.json').decode()))
default_headers = {'User-Agent': 'pirate get'}
default_timeout = 10
default_mirror = 'https://thepiratebay.org/'
default_mirror = 'https://apibay.org'
mirror_list = 'https://proxybay.bz/list.txt'

View File

@ -144,9 +144,6 @@ def parse_args(args_in):
help='a csv file containing the Pirate Bay database '
'downloaded from '
'https://thepiratebay.org/static/dump/csv/')
parser.add_argument('-p', dest='pages', default=1, type=int,
help='the number of pages to fetch '
"(doesn't work with --local)")
parser.add_argument('-0', dest='first',
action='store_true',
help='choose the top result')
@ -261,14 +258,14 @@ def combine_configs(config, args):
def connect_mirror(mirror, printer, args):
try:
printer.print('Trying', mirror, end='... ')
url = pirate.torrent.find_api(mirror)
results = pirate.torrent.remote(
printer=printer,
pages=args.pages,
category=pirate.torrent.parse_category(printer, args.category),
sort=pirate.torrent.parse_sort(printer, args.sort),
mode=args.action,
terms=args.search,
mirror=mirror)
mirror=url)
except (urllib.error.URLError, socket.timeout, IOError, ValueError) as e:
printer.print('Failed', color='WARN', end=' ')
printer.print('(', e, ')', sep='')
@ -380,13 +377,13 @@ def pirate_main(args):
printer.print("\nSelect links (Type 'h' for more options"
", 'q' to quit)", end='\b', color='alt')
try:
l = builtins.input(': ')
cmd = builtins.input(': ')
except (KeyboardInterrupt, EOFError):
printer.print('\nCancelled.')
return
try:
code, choices = parse_torrent_command(l)
code, choices = parse_torrent_command(cmd)
# Act on option, if supplied
printer.print('')
if code == 'h':
@ -416,7 +413,7 @@ def pirate_main(args):
elif code == 't':
pirate.torrent.save_torrents(printer, choices, results,
args.save_directory)
elif not l:
elif not cmd:
printer.print('No links entered!', color='WARN')
else:
break

View File

@ -1,17 +1,17 @@
import builtins
import re
import gzip
import urllib.parse as parse
import urllib.request as request
import shutil
import json
import pirate.data
import pirate.torrent
import colorama
import veryprettytable
from veryprettytable import VeryPrettyTable
from io import BytesIO
from http.cookiejar import CookieJar
class Printer:
@ -45,14 +45,14 @@ class Printer:
even = True
if local:
table = veryprettytable.VeryPrettyTable(['LINK', 'DATE', 'SIZE', 'NAME'])
table = VeryPrettyTable(['LINK', 'DATE', 'SIZE', 'NAME'])
table.align['SIZE'] = 'r'
table.align['NAME'] = 'l'
else:
table = veryprettytable.VeryPrettyTable(['LINK', 'SEED', 'LEECH',
'RATIO', 'SIZE',
'UPLOAD', 'NAME'])
table = VeryPrettyTable(['LINK', 'SEED', 'LEECH',
'RATIO', 'SIZE',
'UPLOAD', 'NAME'])
table.align['NAME'] = 'l'
table.align['SEED'] = 'r'
table.align['LEECH'] = 'r'
@ -65,21 +65,15 @@ class Printer:
table.padding_width = 1
for n, result in enumerate(results):
name = re.search(r'dn=([^\&]*)', result['magnet'])
torrent_name = parse.unquote_plus(name.group(1))
torrent_name = result['name']
if local:
content = [n, result['date'], result['size'], torrent_name[:columns - 42]]
content = [n, result['date'], result['size'],
torrent_name[:columns - 42]]
else:
no_seeders = int(result['seeds'])
no_seeders = int(result['seeders'])
no_leechers = int(result['leechers'])
if result['size'] != []:
size = float(result['size'][0])
unit = result['size'][1]
else:
size = 0
unit = '???'
size = result['size']
date = result['uploaded']
# compute the S/L ratio (Higher is better)
@ -90,8 +84,7 @@ class Printer:
content = [n, no_seeders, no_leechers,
'{:.1f}'.format(ratio),
'{:.1f} '.format(size) + unit,
date, torrent_name[:columns - 50]]
size, date, torrent_name[:columns - 50]]
if even or not self.enable_color:
table.add_row(content)
@ -103,64 +96,63 @@ class Printer:
self.print(table)
def descriptions(self, chosen_links, results, site):
jar = CookieJar()
opener = request.build_opener(
request.HTTPErrorProcessor,
request.HTTPCookieProcessor(jar))
opener = request.build_opener(request.HTTPErrorProcessor)
for link in chosen_links:
path = '/torrent/%s/' % results[link]['id']
req = request.Request(site + path,
headers=pirate.data.default_headers)
result = results[link]
req = request.Request(
site + '/t.php?id=' + result['id'],
headers=pirate.data.default_headers)
req.add_header('Accept-encoding', 'gzip')
f = opener.open(req, timeout=pirate.data.default_timeout)
if f.info().get('Content-Encoding') == 'gzip':
f = gzip.GzipFile(fileobj=BytesIO(f.read()))
res = f.read().decode('utf-8')
name = re.search(r'dn=([^\&]*)', results[link]['magnet'])
torrent_name = parse.unquote(name.group(1)).replace('+', ' ')
desc = re.search(r'<div class="nfo">\s*<pre>(.+?)(?=</pre>)',
res, re.DOTALL).group(1)
res = json.load(f)
# Replace HTML links with markdown style versions
desc = re.sub(r'<a href="\s*([^"]+?)\s*"[^>]*>(\s*)([^<]+?)(\s*'
r')</a>', r'\2[\3](\1)\4', desc)
r')</a>', r'\2[\3](\1)\4', res['descr'])
self.print('Description for "%s":' % torrent_name, color='zebra_1')
self.print('Description for "{}":'.format(result['name']),
color='zebra_1')
self.print(desc, color='zebra_0')
def file_lists(self, chosen_links, results, site):
jar = CookieJar()
opener = request.build_opener(
request.HTTPErrorProcessor,
request.HTTPCookieProcessor(jar))
opener = request.build_opener(request.HTTPErrorProcessor)
# the API may return an object instead of a list
def get(obj):
try:
return obj[0]
except KeyError:
return obj['0']
for link in chosen_links:
path = '/ajax_details_filelist.php'
query = '?id=' + results[link]['id']
req = request.Request(site + path + query,
headers=pirate.data.default_headers)
result = results[link]
req = request.Request(
site + '/f.php?id=' + result['id'],
headers=pirate.data.default_headers)
req.add_header('Accept-encoding', 'gzip')
f = opener.open(req, timeout=pirate.data.default_timeout)
if f.info().get('Content-Encoding') == 'gzip':
f = gzip.GzipFile(fileobj=BytesIO(f.read()))
# TODO: proper html decoding/parsing
res = f.read().decode('utf-8').replace('&nbsp;', ' ')
if 'File list not available.' in res:
res = json.load(f)
if len(res) == 1 and 'not found' in get(res[0]['name']):
self.print('File list not available.')
return
files = re.findall(r'<td align="left">\s*([^<]+?)\s*</td><td ali'
r'gn="right">\s*([^<]+?)\s*</tr>', res)
name = re.search(r'dn=([^\&]*)', results[link]['magnet'])
torrent_name = parse.unquote(name.group(1)).replace('+', ' ')
self.print('Files in "%s":' % torrent_name, color='zebra_1')
self.print('Files in {}:'.format(result['name']), color='zebra_1')
cur_color = 'zebra_0'
for f in files:
self.print('{0[0]:>11} {0[1]}'.format(f), color=cur_color)
for f in res:
name = get(f['name'])
size = pirate.torrent.pretty_size(int(get(f['size'])))
self.print('{:>11} {}'.format(
size, name),
color=cur_color)
cur_color = 'zebra_0' if cur_color == 'zebra_1' else 'zebra_1'

View File

@ -8,13 +8,10 @@ import urllib.error
import os.path
import pirate.data
import json
from bs4 import BeautifulSoup
from datetime import datetime
from io import BytesIO
from http.cookiejar import CookieJar
parser_regex = r'"(magnet\:\?xt=[^"]*)|<td align="right">([^<]+)</td>'
def parse_category(printer, category):
@ -45,144 +42,82 @@ def parse_sort(printer, sort):
return 99
# TODO:
# * warn users when using a sort in a mode that doesn't accept sorts
# * warn users when using search terms in a mode
# that doesn't accept search terms
# * same with page parameter for top and top48h
# * warn the user if trying to use a minor category with top48h
def build_request_path(page, category, sort, mode, terms):
    """Return the site-relative URL path for a listing request.

    ``mode`` selects the layout of the path:

    * ``'browse'`` -> ``/browse/<category>/<page>/<sort>`` (category 0 is
      replaced by 100)
    * ``'recent'`` -> ``/top/48h<category>`` (``all`` for category 0)
    * ``'top'``    -> ``/top/<category>`` (``all`` for category 0)
    * ``'search'`` -> ``/search/<query>/<page>/<sort>/<category>`` where
      the query is ``terms`` joined by spaces and URL-quoted

    Any other mode raises ``Exception('Unknown mode.')``.
    """
    if mode == 'browse':
        section = 100 if category == 0 else category
        return '/browse/{}/{}/{}'.format(section, page, sort)

    if mode == 'recent' or mode == 'top':
        # Not a typo: the 48h prefix is glued to the category without a '/'.
        # NOTE: the site reportedly accepts only major categories for 48h.
        prefix = '/top/48h' if mode == 'recent' else '/top/'
        suffix = 'all' if category == 0 else str(category)
        return prefix + suffix

    if mode == 'search':
        encoded_terms = urllib.parse.quote_plus(' '.join(terms))
        return '/search/{}/{}/{}/{}'.format(
            encoded_terms, page, sort, category)

    raise Exception('Unknown mode.')
def pretty_size(size):
    """Format a byte count using binary (1024-based) units.

    The largest unit from KiB up to PiB that does not exceed ``size`` is
    used, with one decimal place; anything below 1024 is shown as a plain
    number of bytes followed by `` B``.
    """
    # each unit is 2**exponent bytes, checked from largest to smallest
    for exponent, unit in ((50, 'PiB'), (40, 'TiB'), (30, 'GiB'),
                           (20, 'MiB'), (10, 'KiB')):
        threshold = 1 << exponent
        if size >= threshold:
            return '{:.1f} {}'.format(size / threshold, unit)
    return str(size) + ' B'
# this returns a list of dictionaries
def parse_page(html):
    """Extract the torrent entries from a results page.

    Returns the first non-empty list produced by ``parse_table`` for a
    ``searchResult`` table, or an empty list when the page carries the
    site's "No hits" message.

    Raises ``IOError`` when the page has neither result tables nor the
    "No hits" message (taken as a blocked or fake mirror), or when none of
    the tables yields any entry.
    """
    soup = BeautifulSoup(html, 'html.parser')
    tables = soup.find_all('table', id='searchResult')
    no_results = re.search(r'No hits\. Try adding an asterisk in '
                           r'you search phrase\.', html)

    # the page explicitly reports an empty result set
    if no_results:
        return []

    # No tables and no "No hits" message either: this is probably not
    # the real site, so signal the caller to try another mirror.
    if not tables:
        raise IOError('Blocked mirror detected.')

    # ads may be disguised as result tables; take the first one that
    # actually yields entries
    for candidate in tables:
        entries = parse_table(candidate)
        if entries:
            return entries

    raise IOError('Mirror does not contain magnets.')
def pretty_date(ts):
    """Format a Unix timestamp as ``YYYY-MM-DD HH:MM``.

    ``ts`` may be anything ``int()`` accepts (for example a numeric
    string).  The conversion uses ``datetime.fromtimestamp`` without a
    timezone, i.e. the local time of the machine.
    """
    seconds = int(ts)
    return datetime.fromtimestamp(seconds).strftime('%Y-%m-%d %H:%M')
def parse_table(table):
def make_magnet(name, info_hash):
    """Build a magnet URI from a display name and an info hash.

    ``info_hash`` is inserted as given after ``xt=urn:btih:``; ``name`` is
    percent-encoded and used as the ``dn`` parameter.
    """
    # empty safe set so that '/' in the name is escaped as well
    encoded_name = parse.quote(name, safe='')
    template = 'magnet:?xt=urn:btih:{}&dn={}'
    return template.format(info_hash, encoded_name)
def remote(printer, category, sort, mode, terms, mirror):
results = []
# parse the rows one by one (skipping headings)
for row in table('tr')[1:]:
# grab info about the row
row_link = row.find('a', class_='detLink')
if row_link is None:
continue
id_ = row_link['href'].split('/')[2]
seeds, leechers = [i.text for i in row('td')[-2:]]
magnet_tag = row.find(lambda tag: tag.name == 'a' and
tag['href'].startswith('magnet'))
if magnet_tag is None:
continue
magnet = magnet_tag['href']
# parse descriptions separately
description = row.find('font', class_='detDesc').text
size = re.findall(r'(?<=Size )[0-9.]+\s[KMGT]*[i ]*B',
description)[0].split()
uploaded = re.findall(r'(?<=Uploaded ).+(?=\, Size)',
description)[0]
results.append({
'magnet': magnet,
'seeds': seeds,
'leechers': leechers,
'size': size,
'uploaded': uploaded,
'id': id_
})
return results
def remote(printer, pages, category, sort, mode, terms, mirror):
res_l = []
if pages < 1:
raise ValueError('Please provide an integer greater than 0 '
'for the number of pages to fetch.')
# Catch the Ctrl-C exception and exit cleanly
try:
jar = CookieJar()
opener = request.build_opener(
request.HTTPErrorProcessor,
request.HTTPCookieProcessor(jar))
req = request.Request(
'{}/q.php?q={}&cat={}'.format(
mirror, ' '.join(terms), category),
headers=pirate.data.default_headers)
try:
f = request.urlopen(req, timeout=pirate.data.default_timeout)
except urllib.error.URLError as e:
raise e
for page in range(pages):
path = build_request_path(page, category, sort, mode, terms)
if f.info().get('Content-Encoding') == 'gzip':
f = gzip.GzipFile(fileobj=BytesIO(f.read()))
for res in json.load(f):
res['size'] = pretty_size(int(res['size']))
res['magnet'] = make_magnet(res['name'], res['info_hash'])
res['info_hash'] = int(res['info_hash'], 16)
res['uploaded'] = pretty_date(res['added'])
results.append(res)
req = request.Request(mirror + path,
headers=pirate.data.default_headers)
req.add_header('Accept-encoding', 'gzip')
try:
f = opener.open(req, timeout=pirate.data.default_timeout)
except urllib.error.URLError as e:
res = e.fp.read().decode()
if e.code == 503 and 'cf-browser-verification' in res:
raise IOError('Cloudflare protected')
raise e
if f.info().get('Content-Encoding') == 'gzip':
f = gzip.GzipFile(fileobj=BytesIO(f.read()))
res = f.read().decode('utf-8')
res_l += parse_page(res)
return results
except KeyboardInterrupt:
printer.print('\nCancelled.')
sys.exit(0)
return res_l
def find_api(mirror):
    """Locate the base URL of the JSON API exposed by a mirror.

    A test query (``/q.php?q=test&cat=0``) is first sent to a few common
    locations; the first one that answers with an ``application/json``
    content type is returned.  Otherwise the mirror's ``/static/main.js``
    is downloaded and searched for its ``var server='...'`` assignment.

    :param mirror: base URL of the mirror; each candidate path is appended
        to it verbatim.
    :returns: ``mirror`` concatenated with the API path that was found.
    :raises IOError: if main.js cannot be fetched, or if no API location
        can be determined from it.
    """
    # try common paths
    for path in ['', '/apip', '/api.php?url=']:
        req = request.Request(mirror + path + '/q.php?q=test&cat=0',
                              headers=pirate.data.default_headers)
        try:
            # the context manager closes the response once it was probed
            with request.urlopen(
                    req, timeout=pirate.data.default_timeout) as f:
                if f.info().get_content_type() == 'application/json':
                    return mirror + path
        except urllib.error.URLError:
            # this candidate is unreachable: try the next one
            continue

    # extract api path from main.js
    req = request.Request(mirror + '/static/main.js',
                          headers=pirate.data.default_headers)
    try:
        with request.urlopen(
                req, timeout=pirate.data.default_timeout) as f:
            if f.info().get_content_type() == 'application/javascript':
                match = re.search("var server='([^']+)'", f.read().decode())
                # A script without the assignment must not blow up with an
                # AttributeError (which callers do not handle); fall
                # through to the generic failure below instead.
                if match is not None:
                    return mirror + match.group(1)
    except urllib.error.URLError:
        raise IOError('API not found: no main.js')

    raise IOError('API not found')
def get_torrent(info_hash):
@ -200,44 +135,37 @@ def get_torrent(info_hash):
def save_torrents(printer, chosen_links, results, folder):
for link in chosen_links:
magnet = results[link]['magnet']
name = re.search(r'dn=([^\&]*)', magnet)
torrent_name = parse.unquote(name.group(1)).replace('+', ' ')
info_hash = int(re.search(r'btih:([a-f0-9]{40})', magnet).group(1), 16)
torrent_name = torrent_name.replace('/', '_').replace('\\', '_')
result = results[link]
torrent_name = result['name'].replace('/', '_').replace('\\', '_')
file = os.path.join(folder, torrent_name + '.torrent')
try:
torrent = get_torrent(info_hash)
torrent = get_torrent(result['info_hash'])
except urllib.error.HTTPError as e:
printer.print('There is no cached file for this torrent :('
' \nCode: {} - {}'.format(e.code, e.reason),
color='ERROR')
else:
open(file, 'wb').write(torrent)
printer.print('Saved {:X} in {}'.format(info_hash, file))
printer.print('Saved {:X} in {}'.format(result['info_hash'], file))
def save_magnets(printer, chosen_links, results, folder):
for link in chosen_links:
magnet = results[link]['magnet']
name = re.search(r'dn=([^\&]*)', magnet)
torrent_name = parse.unquote(name.group(1)).replace('+', ' ')
info_hash = int(re.search(r'btih:([a-f0-9]{40})', magnet).group(1), 16)
torrent_name = torrent_name.replace('/', '_').replace('\\', '_')
result = results[link]
torrent_name = result['name'].replace('/', '_').replace('\\', '_')
file = os.path.join(folder, torrent_name + '.magnet')
printer.print('Saved {:X} in {}'.format(info_hash, file))
printer.print('Saved {:X} in {}'.format(result['info_hash'], file))
with open(file, 'w') as f:
f.write(magnet + '\n')
f.write(result['magnet'] + '\n')
def copy_magnets(printer, chosen_links, results):
clipboard_text = ''
for link in chosen_links:
magnet = results[link]['magnet']
info_hash = int(re.search(r'btih:([a-fA-F0-9]{40})', magnet).group(1), 16)
clipboard_text += magnet + "\n"
printer.print('Copying {:X} to clipboard'.format(info_hash))
result = results[link]
clipboard_text += result['magnet'] + "\n"
printer.print('Copying {:X} to clipboard'.format(result['info_hash']))
pyperclip.copy(clipboard_text)