diff --git a/magneticod/magneticod/__main__.py b/magneticod/magneticod/__main__.py index 32bd4ba..9e3d80b 100644 --- a/magneticod/magneticod/__main__.py +++ b/magneticod/magneticod/__main__.py @@ -27,18 +27,15 @@ import time import typing import appdirs +import humanfriendly +from .constants import TICK_INTERVAL, MAX_ACTIVE_PEERS_PER_INFO_HASH, DEFAULT_MAX_METADATA_SIZE from . import __version__ from . import bittorrent from . import dht from . import persistence -TICK_INTERVAL = 1 # in seconds (soft constraint) -# maximum (inclusive) number of active (disposable) peers to fetch the metadata per info hash at the same time: -MAX_ACTIVE_PEERS_PER_INFO_HASH = 5 - - # Global variables are bad bla bla bla, BUT these variables are used so many times that I think it is justified; else # the signatures of many functions are literally cluttered. # @@ -55,16 +52,14 @@ complete_info_hashes = set() def main(): global complete_info_hashes, database, node, peers, selector - logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)-8s %(message)s") - logging.info("magneticod v%d.%d.%d started", *__version__) - arguments = parse_cmdline_arguments() - if arguments is None: - return 2 + + logging.basicConfig(level=arguments.loglevel, format="%(asctime)s %(levelname)-8s %(message)s") + logging.info("magneticod v%d.%d.%d started", *__version__) # noinspection PyBroadException try: - path = os.path.join(appdirs.user_data_dir("magneticod"), "database.sqlite3") + path = arguments.database_file database = persistence.Database(path) except: logging.exception("could NOT connect to the database!") @@ -73,7 +68,10 @@ def main(): complete_info_hashes = database.get_complete_info_hashes() node = dht.SybilNode(arguments.node_addr) - node.when_peer_found = on_peer_found + + node.when_peer_found = lambda info_hash, peer_address: on_peer_found(info_hash=info_hash, + peer_address=peer_address, + max_metadata_size=arguments.max_metadata_size) selector.register(node, selectors.EVENT_READ) @@ -92,14 +90,14 @@ def main(): return 0 -def on_peer_found(info_hash: dht.InfoHash, peer_address) -> None: +def on_peer_found(info_hash: dht.InfoHash, peer_address, max_metadata_size: int=DEFAULT_MAX_METADATA_SIZE) -> None: global selector, peers, complete_info_hashes if len(peers[info_hash]) > MAX_ACTIVE_PEERS_PER_INFO_HASH or info_hash in complete_info_hashes: return try: - peer = bittorrent.DisposablePeer(info_hash, peer_address) + peer = bittorrent.DisposablePeer(info_hash, peer_address, max_metadata_size) except ConnectionError: return @@ -171,6 +169,33 @@ def loop() -> None: selector.modify(fileobj, selectors.EVENT_READ) +def parse_ip_port(netloc) -> typing.Optional[typing.Tuple[str, int]]: + # In case no port supplied + try: + return str(ipaddress.ip_address(netloc)), 0 + except ValueError: + pass + + # In case port supplied + try: + parsed = urllib.parse.urlparse("//{}".format(netloc)) + ip = str(ipaddress.ip_address(parsed.hostname)) + port = parsed.port + if port is None: + raise argparse.ArgumentParser("Invalid node address supplied!") + except ValueError: + raise argparse.ArgumentParser("Invalid node address supplied!") + + return ip, port + + +def parse_size(value: str) -> int: + try: + return humanfriendly.parse_size(value) + except humanfriendly.InvalidSize as e: + raise argparse.ArgumentTypeError("Invalid argument. {}".format(e)) + + def parse_cmdline_arguments() -> typing.Optional[argparse.Namespace]: parser = argparse.ArgumentParser( description="Autonomous BitTorrent DHT crawler and metadata fetcher.", @@ -194,40 +219,29 @@ def parse_cmdline_arguments() -> typing.Optional[argparse.Namespace]: allow_abbrev=False, formatter_class=argparse.RawDescriptionHelpFormatter ) + parser.add_argument( - "--node-addr", action="store", type=str, required=False, + "--node-addr", action="store", type=parse_ip_port, required=False, default="0.0.0.0:0", help="the address of the (DHT) node magneticod will use" ) - args = parser.parse_args(sys.argv[1:]) + parser.add_argument( + "--max-metadata-size", type=parse_size, default=DEFAULT_MAX_METADATA_SIZE, + help="Limit metadata size to protect memory overflow. Provide in human friendly format eg. 1 M, 1 GB" + ) - args.node_addr = parse_ip_port(args.node_addr) if args.node_addr else ("0.0.0.0", 0) - if args.node_addr is None: - logging.critical("Invalid node address supplied!") - return None + default_database_dir = os.path.join(appdirs.user_data_dir("magneticod"), "database.sqlite3") + parser.add_argument( + "--database-file", type=str, default=default_database_dir, + help="Path to database file (default: {})".format(humanfriendly.format_path(default_database_dir)) + ) + parser.add_argument( + '-d', '--debug', + action="store_const", dest="loglevel", const=logging.DEBUG, default=logging.INFO, + help="Print debugging information in addition to normal processing.", + ) + return parser.parse_args(sys.argv[1:]) - return args - - -def parse_ip_port(netloc) -> typing.Optional[typing.Tuple[str, int]]: - # In case no port supplied - try: - return str(ipaddress.ip_address(netloc)), 0 - except ValueError: - pass - - # In case port supplied - try: - parsed = urllib.parse.urlparse("//{}".format(netloc)) - ip = str(ipaddress.ip_address(parsed.hostname)) - port = parsed.port - if port is None: - # Invalid port - return None - except ValueError: - return None - - return ip, port if __name__ == "__main__": sys.exit(main()) diff --git a/magneticod/magneticod/bittorrent.py b/magneticod/magneticod/bittorrent.py index 9e4ac54..5a5b4be 100644 --- a/magneticod/magneticod/bittorrent.py +++ b/magneticod/magneticod/bittorrent.py @@ -21,14 +21,14 @@ import typing import os from . import bencode - +from .constants import DEFAULT_MAX_METADATA_SIZE InfoHash = bytes PeerAddress = typing.Tuple[str, int] class DisposablePeer: - def __init__(self, info_hash: InfoHash, peer_addr: PeerAddress): + def __init__(self, info_hash: InfoHash, peer_addr: PeerAddress, max_metadata_size: int= DEFAULT_MAX_METADATA_SIZE): self.__socket = socket.socket() self.__socket.setblocking(False) # To reduce the latency: @@ -40,8 +40,11 @@ class DisposablePeer: if res != errno.EINPROGRESS: raise ConnectionError() + self.__peer_addr = peer_addr self.__info_hash = info_hash + self.__max_metadata_size = max_metadata_size + self.__incoming_buffer = bytearray() self.__outgoing_buffer = bytearray() @@ -209,8 +212,16 @@ class DisposablePeer: # Just to make sure that the remote peer supports ut_metadata extension: ut_metadata = msg_dict[b"m"][b"ut_metadata"] metadata_size = msg_dict[b"metadata_size"] - assert metadata_size > 0 - except (AssertionError, KeyError): + assert metadata_size > 0, "Invalid (empty) metadata size" + assert metadata_size < self.__max_metadata_size, "Malicious or malfunctioning peer {}:{} tried send above" \ + " {} max metadata size".format(self.__peer_addr[0], + self.__peer_addr[1], + self.__max_metadata_size) + except KeyError: + self.when_error() + return + except AssertionError as e: + logging.debug(str(e)) self.when_error() return diff --git a/magneticod/magneticod/constants.py b/magneticod/magneticod/constants.py new file mode 100644 index 0000000..bc2c954 --- /dev/null +++ b/magneticod/magneticod/constants.py @@ -0,0 +1,11 @@ +# coding=utf-8 +DEFAULT_MAX_METADATA_SIZE = 10 * 1024 * 1024 +BOOTSTRAPPING_NODES = [ + ("router.bittorrent.com", 6881), + ("dht.transmissionbt.com", 6881) +] +PENDING_INFO_HASHES = 10 # threshold for pending info hashes before being committed to database: + +TICK_INTERVAL = 1 # in seconds (soft constraint) + # maximum (inclusive) number of active (disposable) peers to fetch the metadata per info hash at the same time: +MAX_ACTIVE_PEERS_PER_INFO_HASH = 5 diff --git a/magneticod/magneticod/dht.py b/magneticod/magneticod/dht.py index dfbfb36..5e2f057 100644 --- a/magneticod/magneticod/dht.py +++ b/magneticod/magneticod/dht.py @@ -20,6 +20,7 @@ import socket import typing import os +from .constants import BOOTSTRAPPING_NODES, DEFAULT_MAX_METADATA_SIZE from . import bencode NodeID = bytes @@ -28,12 +29,6 @@ PeerAddress = typing.Tuple[str, int] InfoHash = bytes -BOOTSTRAPPING_NODES = [ - ("router.bittorrent.com", 6881), - ("dht.transmissionbt.com", 6881) -] - - class SybilNode: def __init__(self, address: typing.Tuple[str, int]): self.__true_id = self.__random_bytes(20) @@ -48,7 +43,6 @@ class SybilNode: self.__routing_table = {} # type: typing.Dict[NodeID, NodeAddress] self.__token_secret = self.__random_bytes(4) - # Maximum number of neighbours (this is a THRESHOLD where, once reached, the search for new neighbours will # stop; but until then, the total number of neighbours might exceed the threshold). self.__n_max_neighbours = 2000 diff --git a/magneticod/magneticod/persistence.py b/magneticod/magneticod/persistence.py index c1f73fd..56d1966 100644 --- a/magneticod/magneticod/persistence.py +++ b/magneticod/magneticod/persistence.py @@ -18,11 +18,9 @@ import time import typing import os -from . import bencode +from magneticod import bencode - -# threshold for pending info hashes before being committed to database: -PENDING_INFO_HASHES = 10 +from .constants import PENDING_INFO_HASHES class Database: diff --git a/magneticod/setup.py b/magneticod/setup.py index 1219fe4..b8ba4d9 100644 --- a/magneticod/setup.py +++ b/magneticod/setup.py @@ -23,7 +23,8 @@ setup( install_requires=[ "appdirs >= 1.4.3", - "bencoder.pyx >= 1.1.3" + "bencoder.pyx >= 1.1.3", + "humanfriendly" ], classifiers=[