diff --git a/magneticod/magneticod/__init__.py b/magneticod/magneticod/__init__.py index 0c2c6c3..97e0021 100644 --- a/magneticod/magneticod/__init__.py +++ b/magneticod/magneticod/__init__.py @@ -12,4 +12,4 @@ # # You should have received a copy of the GNU Affero General Public License along with this program. If not, see # <http://www.gnu.org/licenses/>. -__version__ = (0, 1, 0) +__version__ = (0, 2, 0) diff --git a/magneticod/magneticod/__main__.py b/magneticod/magneticod/__main__.py index 8344f9e..9f2bf73 100644 --- a/magneticod/magneticod/__main__.py +++ b/magneticod/magneticod/__main__.py @@ -12,10 +12,14 @@ # # You should have received a copy of the GNU Affero General Public License along with this program. If not, see # <http://www.gnu.org/licenses/>. +import argparse import collections import functools import logging +import ipaddress import selectors +import textwrap +import urllib.parse import itertools import os import sys @@ -51,9 +55,13 @@ complete_info_hashes = set() def main(): global complete_info_hashes, database, node, peers, selector - logging.basicConfig(level=logging.DEBUG, format="%(asctime)s %(levelname)8s %(message)s") + logging.basicConfig(level=logging.DEBUG, format="%(asctime)s %(levelname)-8s %(message)s") logging.info("magneticod v%d.%d.%d started", *__version__) + arguments = parse_cmdline_arguments() + if arguments is None: + return 2 + # noinspection PyBroadException try: path = os.path.join(appdirs.user_data_dir("magneticod"), "database.sqlite3") @@ -64,7 +72,7 @@ def main(): complete_info_hashes = database.get_complete_info_hashes() - node = dht.SybilNode() + node = dht.SybilNode(arguments.node_addr) node.when_peer_found = on_peer_found selector.register(node, selectors.EVENT_READ) @@ -155,5 +163,63 @@ def loop() -> None: selector.modify(fileobj, selectors.EVENT_READ | selectors.EVENT_WRITE) +def parse_cmdline_arguments() -> typing.Optional[argparse.Namespace]: + parser = argparse.ArgumentParser( + description="Autonomous BitTorrent DHT crawler and metadata fetcher.", +
epilog=textwrap.dedent("""\ + Copyright (C) 2017 Mert Bora ALPER + Dedicated to Cemile Binay, in whose hands I thrived. + + This program is free software: you can redistribute it and/or modify it under + the terms of the GNU Affero General Public License as published by the Free + Software Foundation, either version 3 of the License, or (at your option) any + later version. + + This program is distributed in the hope that it will be useful, but WITHOUT ANY + WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A + PARTICULAR PURPOSE. See the GNU Affero General Public License for more + details. + + You should have received a copy of the GNU Affero General Public License along + with this program. If not, see <http://www.gnu.org/licenses/>. + """), + allow_abbrev=False, + formatter_class=argparse.RawDescriptionHelpFormatter + ) + parser.add_argument( + "--node-addr", action="store", type=str, required=False, + help="the address of the (DHT) node magneticod will use" + ) + + args = parser.parse_args(sys.argv[1:]) + + args.node_addr = parse_ip_port(args.node_addr) if args.node_addr else ("0.0.0.0", 0) + if args.node_addr is None: + logging.critical("Invalid node address supplied!") + return None + + return args + + +def parse_ip_port(netloc) -> typing.Optional[typing.Tuple[str, int]]: + # In case no port supplied + try: + return str(ipaddress.ip_address(netloc)), 0 + except ValueError: + pass + + # In case port supplied + try: + parsed = urllib.parse.urlparse("//{}".format(netloc)) + ip = str(ipaddress.ip_address(parsed.hostname)) + port = parsed.port + if port is None: + # Invalid port + return None + except ValueError: + return None + + return ip, port + if __name__ == "__main__": sys.exit(main()) diff --git a/magneticod/magneticod/bencode.py b/magneticod/magneticod/bencode.py index 959c526..e65e209 100755 --- a/magneticod/magneticod/bencode.py +++ b/magneticod/magneticod/bencode.py @@ -16,21 +16,23 @@ """ bencode + Wrapper around bencoder.pyx library.
+ +bencoder.pyx + Copyright (c) 2016, whtsky + All rights reserved. + https://github.com/whtsky/bencoder.pyx Warning: Encoders do NOT check for circular objects! (and will NEVER check due to speed concerns). TODO: - Add support for integers in scientific notation. (?) - Please do re-write this as a shared C module so that we can gain a H U G E speed & performance gain! - - I M P O R T A N T // U R G E N T Support bytearrays as well! (Currently, only bytes). """ - - import typing +import bencoder + Types = typing.Union[int, bytes, list, "KRPCDict"] KRPCDict = typing.Dict[bytes, Types] @@ -38,14 +40,14 @@ KRPCDict = typing.Dict[bytes, Types] def dumps(obj) -> bytes: try: - return __encode[type(obj)](obj) + return bencoder.bencode(obj) except: raise BencodeEncodingError() def loads(bytes_object: bytes) -> Types: try: - return __decoders[bytes_object[0]](bytes_object, 0)[0] + return bencoder.decode_func[bytes_object[0]](bytes_object, 0)[0] except Exception as exc: raise BencodeDecodingError(exc) @@ -59,102 +61,11 @@ def loads2(bytes_object: bytes) -> typing.Tuple[Types, int]: print(">>>", dump[i:]) # OUTPUT: >>> b'OH YEAH' """ try: - return __decoders[bytes_object[0]](bytes_object, 0) + return bencoder.decode_func[bytes_object[0]](bytes_object, 0) except Exception as exc: raise BencodeDecodingError(exc) -def __encode_int(i: int) -> bytes: - # False positive... - return b"i%de" % i - - -def __encode_str(s: typing.ByteString) -> bytes: - return b"%d:%s" % (len(s), s) - - -def __encode_list(l: typing.Sequence) -> bytes: - """ REFERENCE IMPLEMENTATION - s = bytearray() - for obj in l: - s += __encode[type(obj)](obj) - return b"l%se" % (s,) - """ - return b"l%se" % b"".join(__encode[type(obj)](obj) for obj in l) - - -def __encode_dict(d: typing.Dict[typing.ByteString, typing.Any]) -> bytes: - s = bytearray() - # Making sure that the keys are in lexicographical order. 
- # Source: http://stackoverflow.com/a/7375703/4466589 - items = sorted(d.items(), key=lambda k: (k[0].lower(), k[0])) - for key, value in items: - s += __encode_str(key) - s += __encode[type(value)](value) - return b"d%se" % (s, ) - - -__encode = { - int: __encode_int, - bytes: __encode_str, - bytearray: __encode_str, - list: __encode_list, - dict: __encode_dict -} - - -def __decode_int(b: bytes, start_i: int) -> typing.Tuple[int, int]: - end_i = b.find(b"e", start_i) - assert end_i != -1 - return int(b[start_i + 1: end_i]), end_i + 1 - - -def __decode_str(b: bytes, start_i: int) -> typing.Tuple[bytes, int]: - separator_i = b.find(b":", start_i) - assert separator_i != -1 - length = int(b[start_i: separator_i]) - return b[separator_i + 1: separator_i + 1 + length], separator_i + 1 + length - - -def __decode_list(b: bytes, start_i: int) -> typing.Tuple[list, int]: - list_ = [] - i = start_i + 1 - while b[i] != 101: # 101 = ord(b"e") - item, i = __decoders[b[i]](b, i) - list_.append(item) - return list_, i + 1 - - -def __decode_dict(b: bytes, start_i: int) -> typing.Tuple[dict, int]: - dict_ = {} - - i = start_i + 1 - while b[i] != 101: # 101 = ord(b"e") - # Making sure it's between b"0" and b"9" (incl.) 
- assert 48 <= b[i] <= 57 - key, end_i = __decode_str(b, i) - dict_[key], i = __decoders[b[end_i]](b, end_i) - - return dict_, i + 1 - - -__decoders = { - ord(b"i"): __decode_int, - ord(b"0"): __decode_str, - ord(b"1"): __decode_str, - ord(b"2"): __decode_str, - ord(b"3"): __decode_str, - ord(b"4"): __decode_str, - ord(b"5"): __decode_str, - ord(b"6"): __decode_str, - ord(b"7"): __decode_str, - ord(b"8"): __decode_str, - ord(b"9"): __decode_str, - ord(b"l"): __decode_list, - ord(b"d"): __decode_dict -} - - class BencodeEncodingError(Exception): pass diff --git a/magneticod/magneticod/bittorrent.py b/magneticod/magneticod/bittorrent.py index a3c399b..472dae4 100644 --- a/magneticod/magneticod/bittorrent.py +++ b/magneticod/magneticod/bittorrent.py @@ -33,8 +33,9 @@ class DisposablePeer: self.__socket.setblocking(False) # To reduce the latency: self.__socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, True) - if hasattr(socket, 'TCP_QUICKACK'): + if hasattr(socket, "TCP_QUICKACK"): self.__socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_QUICKACK, True) + res = self.__socket.connect_ex(peer_addr) if res != errno.EINPROGRESS: raise ConnectionError() diff --git a/magneticod/magneticod/dht.py b/magneticod/magneticod/dht.py index cc7d508..dffefb9 100644 --- a/magneticod/magneticod/dht.py +++ b/magneticod/magneticod/dht.py @@ -35,14 +35,11 @@ BOOTSTRAPPING_NODES = [ class SybilNode: - # Maximum number of neighbours (this is a THRESHOLD where, once reached, the search for new neighbours will stop; - # but until then, the total number of neighbours might exceed the threshold). 
- - def __init__(self): + def __init__(self, address: typing.Tuple[str, int]): self.__true_id = self.__random_bytes(20) self.__socket = socket.socket(type=socket.SOCK_DGRAM) - self.__socket.bind(("0.0.0.0", 0)) + self.__socket.bind(address) self.__socket.setblocking(False) self.__incoming_buffer = array.array("B", (0 for _ in range(65536))) @@ -52,9 +49,11 @@ class SybilNode: self.__token_secret = self.__random_bytes(4) + # Maximum number of neighbours (this is a THRESHOLD where, once reached, the search for new neighbours will + # stop; but until then, the total number of neighbours might exceed the threshold). self.__n_max_neighbours = 2000 - logging.debug("SybilNode %s initialized!", self.__true_id.hex().upper()) + logging.debug("SybilNode %s on %s initialized!", self.__true_id.hex().upper(), address) @staticmethod def when_peer_found(info_hash: InfoHash, peer_addr: PeerAddress) -> None: @@ -83,7 +82,7 @@ class SybilNode: except bencode.BencodeDecodingError: continue - if type(message.get(b"r")) is dict and type(message[b"r"].get(b"nodes")) is bytes: + if isinstance(message.get(b"r"), dict) and type(message[b"r"].get(b"nodes")) is bytes: self.__on_FIND_NODE_response(message) elif message.get(b"q") == b"get_peers": self.__on_GET_PEERS_query(message, addr) diff --git a/magneticod/magneticod/persistence.py b/magneticod/magneticod/persistence.py index 168fd43..f00e547 100644 --- a/magneticod/magneticod/persistence.py +++ b/magneticod/magneticod/persistence.py @@ -41,16 +41,18 @@ class Database: db_conn = sqlite3.connect(database, isolation_level=None) db_conn.execute("PRAGMA journal_mode=WAL;") + db_conn.execute("PRAGMA temp_store = 2;") db_conn.execute("PRAGMA foreign_keys=ON;") with db_conn: db_conn.execute("CREATE TABLE IF NOT EXISTS torrents (" - "id INTEGER PRIMARY KEY," + "id INTEGER PRIMARY KEY AUTOINCREMENT," "info_hash BLOB NOT NULL UNIQUE," "name TEXT NOT NULL," "total_size INTEGER NOT NULL CHECK(total_size > 0)," "discovered_on INTEGER NOT NULL 
CHECK(discovered_on > 0)" ");") + db_conn.execute("CREATE INDEX IF NOT EXISTS info_hash_index ON torrents (info_hash);") db_conn.execute("CREATE TABLE IF NOT EXISTS files (" "id INTEGER PRIMARY KEY," "torrent_id INTEGER REFERENCES torrents ON DELETE CASCADE ON UPDATE RESTRICT," diff --git a/magneticod/setup.py b/magneticod/setup.py index 29b90d1..daca224 100644 --- a/magneticod/setup.py +++ b/magneticod/setup.py @@ -8,7 +8,7 @@ def read_file(path): setup( name="magneticod", - version="0.1.0", + version="0.2.0", description="Autonomous BitTorrent DHT crawler and metadata fetcher.", long_description=read_file("README.rst"), url="http://magnetico.org", @@ -22,7 +22,8 @@ setup( }, install_requires=[ - "appdirs>=1.4.3" + "appdirs >= 1.4.3", + "bencoder.pyx >= 1.1.3" ], classifiers=[ diff --git a/magneticow/magneticow/magneticow.py b/magneticow/magneticow/magneticow.py index 7cf7c80..c6d43fc 100644 --- a/magneticow/magneticow/magneticow.py +++ b/magneticow/magneticow/magneticow.py @@ -101,12 +101,12 @@ def search_torrents(): " discovered_on " "FROM torrents " "INNER JOIN (" - " SELECT torrent_id, rank(matchinfo(fts_torrents, 'pcnxal')) AS rank " + " SELECT docid AS id, rank(matchinfo(fts_torrents, 'pcnxal')) AS rank " " FROM fts_torrents " " WHERE name MATCH ? " " ORDER BY rank ASC" " LIMIT 20 OFFSET ?" 
- ") AS ranktable ON torrents.id=ranktable.torrent_id;", + ") AS ranktable USING(id);", (search, 20 * page) ) context["torrents"] = [Torrent(t[0].hex(), t[1], utils.to_human_size(t[2]), @@ -138,7 +138,7 @@ def newest_torrents(): " total_size, " " discovered_on " "FROM torrents " - "ORDER BY discovered_on DESC LIMIT 20 OFFSET ?", + "ORDER BY id DESC LIMIT 20 OFFSET ?", (20 * page,) ) context["torrents"] = [Torrent(t[0].hex(), t[1], utils.to_human_size(t[2]), datetime.fromtimestamp(t[3]).strftime("%d/%m/%Y"), []) @@ -187,15 +187,14 @@ def torrent(**kwargs): return flask.abort(400) with magneticod_db: - cur = magneticod_db.execute("SELECT name, discovered_on FROM torrents WHERE info_hash=? LIMIT 1;", (info_hash,)) + cur = magneticod_db.execute("SELECT id, name, discovered_on FROM torrents WHERE info_hash=? LIMIT 1;", + (info_hash,)) try: - name, discovered_on = cur.fetchone() + torrent_id, name, discovered_on = cur.fetchone() except TypeError: # In case no results returned, TypeError will be raised when we try to subscript None object return flask.abort(404) - cur = magneticod_db.execute("SELECT path, size FROM files " - "WHERE torrent_id=(SELECT id FROM torrents WHERE info_hash=? 
LIMIT 1);", - (info_hash,)) + cur = magneticod_db.execute("SELECT path, size FROM files WHERE torrent_id=?;", (torrent_id,)) raw_files = cur.fetchall() size = sum(f[1] for f in raw_files) files = [File(f[0], utils.to_human_size(f[1])) for f in raw_files] @@ -214,12 +213,15 @@ def get_magneticod_db(): magneticod_db = flask.g.magneticod_db = sqlite3.connect(magneticod_db_path, isolation_level=None) with magneticod_db: - magneticod_db.execute("CREATE VIRTUAL TABLE temp.fts_torrents USING fts4(torrent_id INTEGER, name TEXT NOT NULL);") - magneticod_db.execute("INSERT INTO fts_torrents (torrent_id, name) SELECT id, name FROM torrents;") + magneticod_db.execute("PRAGMA journal_mode=WAL;") + magneticod_db.execute("PRAGMA temp_store=2;") + + magneticod_db.execute("CREATE VIRTUAL TABLE temp.fts_torrents USING fts4(name);") + magneticod_db.execute("INSERT INTO fts_torrents (docid, name) SELECT id, name FROM torrents;") magneticod_db.execute("INSERT INTO fts_torrents (fts_torrents) VALUES ('optimize');") magneticod_db.execute("CREATE TEMPORARY TRIGGER on_torrents_insert AFTER INSERT ON torrents FOR EACH ROW BEGIN" - " INSERT INTO fts_torrents (torrent_id, name) VALUES (NEW.id, NEW.name);" + " INSERT INTO fts_torrents (docid, name) VALUES (NEW.id, NEW.name);" "END;") magneticod_db.create_function("rank", 1, utils.rank) diff --git a/magneticow/magneticow/utils.py b/magneticow/magneticow/utils.py index 1297f6e..5125b20 100644 --- a/magneticow/magneticow/utils.py +++ b/magneticow/magneticow/utils.py @@ -37,10 +37,10 @@ def rank(blob): x.append((x0, x1, x2)) # Ignore the first column (torrent_id) - _, avgdl = unpack_from("=LL", blob, 12 + 3*c*p*4) + avgdl = unpack_from("=L", blob, 12 + 3*c*p*4)[0] # Ignore the first column (torrent_id) - _, l = unpack_from("=LL", blob, (12 + 3*c*p*4) + 4*c) + l = unpack_from("=L", blob, (12 + 3*c*p*4) + 4*c)[0] # Multiply by -1 so that sorting in the ASC order would yield the best match first return -1 * okapi_bm25(term_frequencies=[X[0] for X in 
x], dl=l, avgdl=avgdl, N=n, nq=[X[2] for X in x]) diff --git a/magneticow/setup.py b/magneticow/setup.py index 08627df..4066bec 100644 --- a/magneticow/setup.py +++ b/magneticow/setup.py @@ -8,7 +8,7 @@ def read_file(path): setup( name="magneticow", - version="0.1.0", + version="0.2.0", description="Lightweight web interface for magnetico.", long_description=read_file("README.rst"), url="http://magnetico.org", @@ -23,9 +23,9 @@ setup( }, install_requires=[ - "appdirs>=1.4.3", - "flask>=0.12.1", - "gevent>=1.2.1" + "appdirs >= 1.4.3", + "flask >= 0.12.1", + "gevent >= 1.2.1" ], classifiers=[