1
0
mirror of https://github.com/vikstrous/pirate-get synced 2025-01-09 09:59:51 +01:00

Merge pull request #132 from vikstrous/cookie

handle cookies for tpb http requests
This commit is contained in:
Michele Guerini Rocco 2020-03-22 18:01:20 +01:00 committed by GitHub
commit e58ecb87dd
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 46 additions and 12 deletions

View File

@ -4,12 +4,14 @@ import gzip
import urllib.parse as parse import urllib.parse as parse
import urllib.request as request import urllib.request as request
import shutil import shutil
from io import BytesIO
import pirate.data
import colorama import colorama
import veryprettytable import veryprettytable
import pirate.data from io import BytesIO
from http.cookiejar import CookieJar
class Printer: class Printer:
@ -101,12 +103,17 @@ class Printer:
self.print(table) self.print(table)
def descriptions(self, chosen_links, results, site): def descriptions(self, chosen_links, results, site):
jar = CookieJar()
opener = request.build_opener(
request.HTTPErrorProcessor,
request.HTTPCookieProcessor(jar))
for link in chosen_links: for link in chosen_links:
path = '/torrent/%s/' % results[link]['id'] path = '/torrent/%s/' % results[link]['id']
req = request.Request(site + path, req = request.Request(site + path,
headers=pirate.data.default_headers) headers=pirate.data.default_headers)
req.add_header('Accept-encoding', 'gzip') req.add_header('Accept-encoding', 'gzip')
f = request.urlopen(req, timeout=pirate.data.default_timeout) f = opener.open(req, timeout=pirate.data.default_timeout)
if f.info().get('Content-Encoding') == 'gzip': if f.info().get('Content-Encoding') == 'gzip':
f = gzip.GzipFile(fileobj=BytesIO(f.read())) f = gzip.GzipFile(fileobj=BytesIO(f.read()))
@ -125,13 +132,18 @@ class Printer:
self.print(desc, color='zebra_0') self.print(desc, color='zebra_0')
def file_lists(self, chosen_links, results, site): def file_lists(self, chosen_links, results, site):
jar = CookieJar()
opener = request.build_opener(
request.HTTPErrorProcessor,
request.HTTPCookieProcessor(jar))
for link in chosen_links: for link in chosen_links:
path = '/ajax_details_filelist.php' path = '/ajax_details_filelist.php'
query = '?id=' + results[link]['id'] query = '?id=' + results[link]['id']
req = request.Request(site + path + query, req = request.Request(site + path + query,
headers=pirate.data.default_headers) headers=pirate.data.default_headers)
req.add_header('Accept-encoding', 'gzip') req.add_header('Accept-encoding', 'gzip')
f = request.urlopen(req, timeout=pirate.data.default_timeout) f = opener.open(req, timeout=pirate.data.default_timeout)
if f.info().get('Content-Encoding') == 'gzip': if f.info().get('Content-Encoding') == 'gzip':
f = gzip.GzipFile(fileobj=BytesIO(f.read())) f = gzip.GzipFile(fileobj=BytesIO(f.read()))

View File

@ -7,11 +7,11 @@ import urllib.parse as parse
import urllib.error import urllib.error
import os.path import os.path
from bs4 import BeautifulSoup
import pirate.data import pirate.data
from bs4 import BeautifulSoup
from io import BytesIO from io import BytesIO
from http.cookiejar import CookieJar
parser_regex = r'"(magnet\:\?xt=[^"]*)|<td align="right">([^<]+)</td>' parser_regex = r'"(magnet\:\?xt=[^"]*)|<td align="right">([^<]+)</td>'
@ -152,13 +152,18 @@ def remote(printer, pages, category, sort, mode, terms, mirror):
# Catch the Ctrl-C exception and exit cleanly # Catch the Ctrl-C exception and exit cleanly
try: try:
jar = CookieJar()
opener = request.build_opener(
request.HTTPErrorProcessor,
request.HTTPCookieProcessor(jar))
for page in range(pages): for page in range(pages):
path = build_request_path(page, category, sort, mode, terms) path = build_request_path(page, category, sort, mode, terms)
req = request.Request(mirror + path, req = request.Request(mirror + path,
headers=pirate.data.default_headers) headers=pirate.data.default_headers)
req.add_header('Accept-encoding', 'gzip') req.add_header('Accept-encoding', 'gzip')
f = request.urlopen(req, timeout=pirate.data.default_timeout) f = opener.open(req, timeout=pirate.data.default_timeout)
if f.info().get('Content-Encoding') == 'gzip': if f.info().get('Content-Encoding') == 'gzip':
f = gzip.GzipFile(fileobj=BytesIO(f.read())) f = gzip.GzipFile(fileobj=BytesIO(f.read()))
res = f.read().decode('utf-8') res = f.read().decode('utf-8')

View File

@ -1,4 +1,5 @@
#!/usr/bin/env python3 #!/usr/bin/env python3
import os
import unittest import unittest
from unittest.mock import patch, call, MagicMock from unittest.mock import patch, call, MagicMock
@ -6,6 +7,10 @@ from pirate.print import Printer
class TestPrint(unittest.TestCase): class TestPrint(unittest.TestCase):
@classmethod
def setUpClass(cls):
# needed to display the results table
os.environ['COLUMNS'] = '80'
def test_print_results_remote(self): def test_print_results_remote(self):
class MockTable: class MockTable:
@ -88,8 +93,12 @@ class TestPrint(unittest.TestCase):
read = MagicMock(return_value='<html><div class="nfo"><pre>stuff <a href="href">link</a></pre></div></html>'.encode('utf8')) read = MagicMock(return_value='<html><div class="nfo"><pre>stuff <a href="href">link</a></pre></div></html>'.encode('utf8'))
info = MagicMock() info = MagicMock()
response_obj = MockResponse() response_obj = MockResponse()
class MockOpener():
open = MagicMock(return_value=response_obj)
add_handler = MagicMock()
opener_obj = MockOpener()
with patch('urllib.request.Request', return_value=request_obj) as request: with patch('urllib.request.Request', return_value=request_obj) as request:
with patch('urllib.request.urlopen', return_value=response_obj) as urlopen: with patch('urllib.request.OpenerDirector', return_value=opener_obj) as opener:
printer.descriptions([0], [{'id': '1', 'magnet': 'dn=name'}], 'example.com') printer.descriptions([0], [{'id': '1', 'magnet': 'dn=name'}], 'example.com')
printer.print.assert_has_calls([call('Description for "name":', color='zebra_1'),call('stuff [link](href)', color='zebra_0')]) printer.print.assert_has_calls([call('Description for "name":', color='zebra_1'),call('stuff [link](href)', color='zebra_0')])
@ -103,8 +112,12 @@ class TestPrint(unittest.TestCase):
read = MagicMock(return_value='<html><tr><td align="left">1.</td><td align="right">filename</tr></html>'.encode('utf8')) read = MagicMock(return_value='<html><tr><td align="left">1.</td><td align="right">filename</tr></html>'.encode('utf8'))
info = MagicMock() info = MagicMock()
response_obj = MockResponse() response_obj = MockResponse()
class MockOpener():
open = MagicMock(return_value=response_obj)
add_handler = MagicMock()
opener_obj = MockOpener()
with patch('urllib.request.Request', return_value=request_obj) as request: with patch('urllib.request.Request', return_value=request_obj) as request:
with patch('urllib.request.urlopen', return_value=response_obj) as urlopen: with patch('urllib.request.OpenerDirector', return_value=opener_obj) as opener:
printer.file_lists([0], [{'id': '1', 'magnet': 'dn=name'}], 'example.com') printer.file_lists([0], [{'id': '1', 'magnet': 'dn=name'}], 'example.com')
printer.print.assert_has_calls([call('Files in "name":', color='zebra_1'),call(' 1. filename', color='zebra_0')]) printer.print.assert_has_calls([call('Files in "name":', color='zebra_1'),call(' 1. filename', color='zebra_0')])

View File

@ -124,14 +124,18 @@ class TestTorrent(unittest.TestCase):
add_header = mock.MagicMock() add_header = mock.MagicMock()
request_obj = MockRequest() request_obj = MockRequest()
class MockResponse(): class MockResponse():
read = mock.MagicMock(return_value='<html>No hits. Try adding an asterisk in you search phrase.</html>'.encode('utf8')) read = mock.MagicMock(return_value=b'<html>No hits. Try adding an asterisk in you search phrase.</html>')
info = mock.MagicMock() info = mock.MagicMock()
response_obj = MockResponse() response_obj = MockResponse()
class MockOpener():
open = mock.MagicMock(return_value=response_obj)
add_handler = mock.MagicMock()
opener_obj = MockOpener()
with patch('urllib.request.Request', return_value=request_obj) as request: with patch('urllib.request.Request', return_value=request_obj) as request:
with patch('urllib.request.urlopen', return_value=response_obj) as urlopen: with patch('urllib.request.OpenerDirector', return_value=opener_obj) as opener:
res = pirate.torrent.remote(MagicMock(Printer), 1, 100, 10, 'browse', [], 'http://example.com') res = pirate.torrent.remote(MagicMock(Printer), 1, 100, 10, 'browse', [], 'http://example.com')
request.assert_called_once_with('http://example.com/browse/100/0/10', headers=pirate.data.default_headers) request.assert_called_once_with('http://example.com/browse/100/0/10', headers=pirate.data.default_headers)
urlopen.assert_called_once_with(request_obj, timeout=pirate.data.default_timeout) opener_obj.open.assert_called_once_with(request_obj, timeout=pirate.data.default_timeout)
self.assertEqual(res, []) self.assertEqual(res, [])
if __name__ == '__main__': if __name__ == '__main__':