1
0
mirror of https://github.com/vikstrous/pirate-get synced 2025-01-25 12:24:20 +01:00
pirate-get/pirate/torrent.py

231 lines
7.3 KiB
Python
Raw Normal View History

2015-08-30 03:28:43 +02:00
import re
import sys
import gzip
import pyperclip
2015-08-30 03:28:43 +02:00
import urllib.request as request
import urllib.parse as parse
import urllib.error
import os.path
2015-08-30 03:28:43 +02:00
2016-07-03 19:26:58 +02:00
from bs4 import BeautifulSoup
2015-09-04 06:44:02 +02:00
2015-08-30 03:28:43 +02:00
import pirate.data
from io import BytesIO
# Regex with two alternatives: group 1 captures a magnet URI that directly
# follows a double quote (everything up to the next double quote); group 2
# captures the text content of a right-aligned <td> cell.
# NOTE(review): no use of this pattern is visible in this part of the file;
# confirm it is still needed.
parser_regex = r'"(magnet\:\?xt=[^"]*)|<td align="right">([^<]+)</td>'
def parse_category(printer, category):
    """Resolve a category given as a number or a name to its numeric id.

    A value that converts to ``int`` is converted first. If the (possibly
    converted) value is one of the values of ``pirate.data.categories`` it is
    returned unchanged; if it is one of its keys, the mapped value is
    returned. Anything else prints a warning through ``printer`` and the
    function returns 0.
    """
    try:
        category = int(category)
    except ValueError:
        # not numeric; it may still be a category name
        pass

    known = pirate.data.categories
    if category in known.values():
        return category
    if category in known.keys():
        return known[category]

    printer.print('Invalid category ignored', color='WARN')
    return 0
2015-08-30 03:28:43 +02:00
def parse_sort(printer, sort):
    """Resolve a sort order given as a number or a name to its numeric id.

    A value that converts to ``int`` is converted first. If the (possibly
    converted) value is one of the values of ``pirate.data.sorts`` it is
    returned unchanged; if it is one of its keys, the mapped value is
    returned. Anything else prints a warning through ``printer`` and the
    function returns 99.
    """
    try:
        sort = int(sort)
    except ValueError:
        # not numeric; it may still be a sort name
        pass

    known = pirate.data.sorts
    if sort in known.values():
        return sort
    if sort in known.keys():
        return known[sort]

    printer.print('Invalid sort ignored', color='WARN')
    return 99
2016-07-07 03:51:13 +02:00
# TODO:
# * warn users when using a sort in a mode that doesn't accept sorts
# * warn users when using search terms in a mode
# that doesn't accept search terms
# * same with page parameter for top and top48h
# * warn the user if trying to use a minor category with top48h
2015-09-04 05:25:24 +02:00
def build_request_path(page, category, sort, mode, terms):
    """Return the URL path (starting with '/') for one results page.

    ``mode`` selects the layout:

    * ``'search'`` - ``/search/<query>/<page>/<sort>/<category>`` where the
      query is the space-joined ``terms``, URL-encoded with ``quote_plus``.
    * ``'browse'`` - ``/browse/<category>/<page>/<sort>``; category 0 is
      replaced by 100.
    * ``'recent'`` - ``/top/48h<category>``, or ``/top/48hall`` for
      category 0.
    * ``'top'``    - ``/top/<category>``, or ``/top/all`` for category 0.

    Any other mode raises ``Exception('Unknown mode.')``.
    """
    if mode == 'search':
        encoded = urllib.parse.quote_plus(' '.join(terms))
        return '/search/{}/{}/{}/{}'.format(encoded, page, sort, category)

    if mode == 'browse':
        browse_category = 100 if category == 0 else category
        return '/browse/{}/{}/{}'.format(browse_category, page, sort)

    if mode in ('recent', 'top'):
        # This is not a typo. There is no / between 48h and the category.
        prefix = '/top/48h' if mode == 'recent' else '/top/'
        suffix = 'all' if category == 0 else str(category)
        return prefix + suffix

    raise Exception('Unknown mode.')
2015-09-04 07:18:38 +02:00
# this returns a list of dictionaries
2015-09-04 06:44:02 +02:00
def parse_page(html):
    """Parse one results page and return its entries as a list of dicts.

    Returns an empty list when the page contains the site's "No hits" message.
    Raises ``IOError`` when the page has neither a ``searchResult`` table nor
    that message (treated as a blocked mirror), or when none of the
    ``searchResult`` tables yields any entry.
    """
    soup = BeautifulSoup(html, 'html.parser')
    result_tables = soup.find_all('table', id='searchResult')

    said_no_hits = re.search(r'No hits\. Try adding an asterisk in '
                             r'you search phrase\.', html) is not None
    if said_no_hits:
        return []

    if not result_tables:
        # Contradiction - we found no results,
        # but the page didn't say there were no results.
        # The page is probably not actually the pirate bay,
        # so let's try another mirror
        raise IOError('Blocked mirror detected.')

    # handle ads disguised as fake result tables:
    # the first table that produces entries wins
    for candidate in result_tables:
        entries = parse_table(candidate)
        if entries:
            return entries

    raise IOError('Mirror does not contain magnets.')
def parse_table(table):
    """Extract the torrent entries from one result table.

    ``table`` is called as ``table('tr')`` to obtain its rows; the first row
    is skipped as the heading. Rows without an ``a.detLink`` anchor or without
    a magnet link are ignored.

    Returns a list of dicts with the keys ``magnet``, ``seeds``, ``leechers``,
    ``size`` (the matched size text split on whitespace), ``uploaded`` and
    ``id``.
    """
    results = []

    # parse the rows one by one (skipping headings)
    for row in table('tr')[1:]:
        # grab info about the row
        row_link = row.find('a', class_='detLink')
        if row_link is None:
            continue

        # the id is the third '/'-separated component of the detail link
        id_ = row_link['href'].split('/')[2]
        # the last two cells of the row hold seeders and leechers
        seeds, leechers = [i.text for i in row('td')[-2:]]

        # Use .get() rather than tag['href']: an <a> without an href
        # attribute would otherwise raise KeyError and abort the whole parse.
        magnet_tag = row.find(lambda tag: tag.name == 'a' and
                              tag.get('href', '').startswith('magnet'))
        if magnet_tag is None:
            continue
        magnet = magnet_tag['href']

        # parse descriptions separately
        description = row.find('font', class_='detDesc').text
        size = re.findall(r'(?<=Size )[0-9.]+\s[KMGT]*[i ]*B',
                          description)[0].split()
        uploaded = re.findall(r'(?<=Uploaded ).+(?=\, Size)',
                              description)[0]

        results.append({
            'magnet': magnet,
            'seeds': seeds,
            'leechers': leechers,
            'size': size,
            'uploaded': uploaded,
            'id': id_
        })

    return results
2015-09-04 05:25:24 +02:00
def remote(printer, pages, category, sort, mode, terms, mirror):
    """Fetch ``pages`` result pages from ``mirror`` and return all entries.

    Pages are requested in order starting at page 0. Each request path comes
    from ``build_request_path`` and each response body is decoded as UTF-8
    (after gunzipping when the server says it is gzip-encoded) and handed to
    ``parse_page``; the parsed lists are concatenated.

    Raises ``ValueError`` if ``pages`` is less than 1. On Ctrl-C it prints
    a message through ``printer`` and exits the process with status 0.
    """
    if pages < 1:
        raise ValueError('Please provide an integer greater than 0 '
                         'for the number of pages to fetch.')

    res_l = []

    # Catch the Ctrl-C exception and exit cleanly
    try:
        for page in range(pages):
            path = build_request_path(page, category, sort, mode, terms)

            req = request.Request(mirror + path,
                                  headers=pirate.data.default_headers)
            req.add_header('Accept-encoding', 'gzip')

            # The with-block closes the HTTP response even when reading
            # fails, instead of leaving the connection open.
            with request.urlopen(
                    req, timeout=pirate.data.default_timeout) as response:
                gzipped = response.info().get('Content-Encoding') == 'gzip'
                body = response.read()
            if gzipped:
                body = gzip.GzipFile(fileobj=BytesIO(body)).read()

            res_l += parse_page(body.decode('utf-8'))

    except KeyboardInterrupt:
        printer.print('\nCancelled.')
        sys.exit(0)

    return res_l
2015-08-30 03:28:43 +02:00
def get_torrent(info_hash):
    """Download the .torrent file for ``info_hash`` from itorrents.org.

    ``info_hash`` is the torrent's info hash as an integer. Returns the raw
    bytes of the response body, gunzipped when the server says it is
    gzip-encoded. Errors from ``urlopen`` (e.g. ``urllib.error.HTTPError``)
    propagate to the caller.
    """
    # Pad to 40 hex digits: plain {:X} drops leading zeros of the hash and
    # would request the wrong file for hashes that start with 0.
    url = 'http://itorrents.org/torrent/{:040X}.torrent'
    req = request.Request(url.format(info_hash),
                          headers=pirate.data.default_headers)
    req.add_header('Accept-encoding', 'gzip')

    # The with-block closes the HTTP response once the body has been read.
    with request.urlopen(
            req, timeout=pirate.data.default_timeout) as response:
        gzipped = response.info().get('Content-Encoding') == 'gzip'
        payload = response.read()
    if gzipped:
        payload = gzip.GzipFile(fileobj=BytesIO(payload)).read()
    return payload
def save_torrents(printer, chosen_links, results, folder):
    """Download and save a .torrent file for every chosen result.

    ``chosen_links`` holds indexes into ``results``; each result's ``magnet``
    entry supplies the display name (``dn=``) and the info hash (``btih:``).
    The file is written to ``folder`` as ``<name>.torrent`` with path
    separators in the name replaced by ``_``. When the magnet has no ``dn=``
    parameter the 40-digit hex hash is used as the name. An HTTP error while
    downloading is reported through ``printer`` and that torrent is skipped.

    A magnet without a 40-digit hexadecimal ``btih`` hash is not supported
    and raises ``AttributeError``.
    """
    for link in chosen_links:
        magnet = results[link]['magnet']
        # accept upper- and lower-case hex digits in the hash
        info_hash = int(
            re.search(r'btih:([a-fA-F0-9]{40})', magnet).group(1), 16)
        # zero-padded so hashes starting with 0 keep all 40 digits
        hash_hex = '{:040X}'.format(info_hash)

        name = re.search(r'dn=([^\&]*)', magnet)
        if name is None:
            torrent_name = hash_hex
        else:
            torrent_name = parse.unquote(name.group(1)).replace('+', ' ')
        # keep the name from pointing outside of the target folder
        torrent_name = torrent_name.replace('/', '_').replace('\\', '_')
        file = os.path.join(folder, torrent_name + '.torrent')

        try:
            torrent = get_torrent(info_hash)
        except urllib.error.HTTPError as e:
            printer.print('There is no cached file for this torrent :('
                          ' \nCode: {} - {}'.format(e.code, e.reason),
                          color='ERROR')
        else:
            # the with-block closes (and flushes) the file deterministically
            with open(file, 'wb') as f:
                f.write(torrent)
            printer.print('Saved {} in {}'.format(hash_hex, file))
2015-08-30 03:28:43 +02:00
def save_magnets(printer, chosen_links, results, folder):
    """Write the magnet link of every chosen result to its own file.

    ``chosen_links`` holds indexes into ``results``; each result's ``magnet``
    entry supplies the display name (``dn=``) and the info hash (``btih:``).
    The link, followed by a newline, is written to ``folder`` as
    ``<name>.magnet`` with path separators in the name replaced by ``_``.
    When the magnet has no ``dn=`` parameter the 40-digit hex hash is used
    as the name. A message is printed through ``printer`` for each file.

    A magnet without a 40-digit hexadecimal ``btih`` hash is not supported
    and raises ``AttributeError``.
    """
    for link in chosen_links:
        magnet = results[link]['magnet']
        # accept upper- and lower-case hex digits in the hash
        info_hash = int(
            re.search(r'btih:([a-fA-F0-9]{40})', magnet).group(1), 16)
        # zero-padded so hashes starting with 0 keep all 40 digits
        hash_hex = '{:040X}'.format(info_hash)

        name = re.search(r'dn=([^\&]*)', magnet)
        if name is None:
            torrent_name = hash_hex
        else:
            torrent_name = parse.unquote(name.group(1)).replace('+', ' ')
        # keep the name from pointing outside of the target folder
        torrent_name = torrent_name.replace('/', '_').replace('\\', '_')
        file = os.path.join(folder, torrent_name + '.magnet')

        with open(file, 'w') as f:
            f.write(magnet + '\n')
        # report success only after the file has actually been written
        printer.print('Saved {} in {}'.format(hash_hex, file))
2019-11-24 11:01:58 +01:00
def copy_magnets(printer, chosen_links, results):
    """Copy the magnet links of all chosen results to the clipboard.

    ``chosen_links`` holds indexes into ``results``. The links are joined,
    one per line (each followed by a newline), and placed on the clipboard
    with a single ``pyperclip.copy`` call; a message with the info hash is
    printed through ``printer`` for each link.

    A magnet without a 40-digit hexadecimal ``btih`` hash is not supported
    and raises ``AttributeError``.
    """
    lines = []
    for link in chosen_links:
        magnet = results[link]['magnet']
        # accept upper- and lower-case hex digits in the hash
        info_hash = int(
            re.search(r'btih:([a-fA-F0-9]{40})', magnet).group(1), 16)
        lines.append(magnet + "\n")
        # zero-padded so hashes starting with 0 keep all 40 digits
        printer.print('Copying {:040X} to clipboard'.format(info_hash))
    pyperclip.copy(''.join(lines))