Merge pull request #58 from ad-m/patch-1
Limit metadata size, move constants to a new file, allow custom database paths
commit 818667b363
magneticod/magneticod/__main__.py
@@ -27,18 +27,15 @@ import time
 import typing
 
 import appdirs
+import humanfriendly
 
+from .constants import TICK_INTERVAL, MAX_ACTIVE_PEERS_PER_INFO_HASH, DEFAULT_MAX_METADATA_SIZE
 from . import __version__
 from . import bittorrent
 from . import dht
 from . import persistence
 
 
-TICK_INTERVAL = 1  # in seconds (soft constraint)
-# maximum (inclusive) number of active (disposable) peers to fetch the metadata per info hash at the same time:
-MAX_ACTIVE_PEERS_PER_INFO_HASH = 5
-
-
 # Global variables are bad bla bla bla, BUT these variables are used so many times that I think it is justified; else
 # the signatures of many functions are literally cluttered.
 #
@@ -55,16 +52,14 @@ complete_info_hashes = set()
 def main():
     global complete_info_hashes, database, node, peers, selector
 
-    logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)-8s %(message)s")
-    logging.info("magneticod v%d.%d.%d started", *__version__)
-
     arguments = parse_cmdline_arguments()
-    if arguments is None:
-        return 2
+
+    logging.basicConfig(level=arguments.loglevel, format="%(asctime)s %(levelname)-8s %(message)s")
+    logging.info("magneticod v%d.%d.%d started", *__version__)
 
     # noinspection PyBroadException
     try:
-        path = os.path.join(appdirs.user_data_dir("magneticod"), "database.sqlite3")
+        path = arguments.database_file
         database = persistence.Database(path)
     except:
         logging.exception("could NOT connect to the database!")
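Logging setup now runs after argument parsing, because the level comes from the new -d/--debug flag defined further down in parse_cmdline_arguments. A minimal sketch of that store_const pattern (illustrative, standalone):

    import argparse
    import logging

    parser = argparse.ArgumentParser()
    # Without -d, args.loglevel keeps the INFO default;
    # with -d, store_const overwrites it with DEBUG.
    parser.add_argument("-d", "--debug", action="store_const", dest="loglevel",
                        const=logging.DEBUG, default=logging.INFO)
    assert parser.parse_args([]).loglevel == logging.INFO
    assert parser.parse_args(["-d"]).loglevel == logging.DEBUG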
@@ -73,7 +68,10 @@ def main():
     complete_info_hashes = database.get_complete_info_hashes()
 
     node = dht.SybilNode(arguments.node_addr)
-    node.when_peer_found = on_peer_found
+    node.when_peer_found = lambda info_hash, peer_address: on_peer_found(info_hash=info_hash,
+                                                                         peer_address=peer_address,
+                                                                         max_metadata_size=arguments.max_metadata_size)
 
     selector.register(node, selectors.EVENT_READ)
 
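The lambda exists only to thread max_metadata_size through to on_peer_found. An equivalent sketch (assuming the same callback contract) using functools.partial reads a little tighter:

    import functools

    # Bind the size limit once; the DHT node still supplies info_hash and
    # peer_address positionally when it invokes the callback.
    node.when_peer_found = functools.partial(
        on_peer_found, max_metadata_size=arguments.max_metadata_size)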
@@ -92,14 +90,14 @@ def main():
     return 0
 
 
-def on_peer_found(info_hash: dht.InfoHash, peer_address) -> None:
+def on_peer_found(info_hash: dht.InfoHash, peer_address, max_metadata_size: int=DEFAULT_MAX_METADATA_SIZE) -> None:
     global selector, peers, complete_info_hashes
 
     if len(peers[info_hash]) > MAX_ACTIVE_PEERS_PER_INFO_HASH or info_hash in complete_info_hashes:
         return
 
     try:
-        peer = bittorrent.DisposablePeer(info_hash, peer_address)
+        peer = bittorrent.DisposablePeer(info_hash, peer_address, max_metadata_size)
     except ConnectionError:
         return
 
@@ -171,6 +169,33 @@ def loop() -> None:
     selector.modify(fileobj, selectors.EVENT_READ)
 
 
+def parse_ip_port(netloc) -> typing.Optional[typing.Tuple[str, int]]:
+    # In case no port supplied
+    try:
+        return str(ipaddress.ip_address(netloc)), 0
+    except ValueError:
+        pass
+
+    # In case port supplied
+    try:
+        parsed = urllib.parse.urlparse("//{}".format(netloc))
+        ip = str(ipaddress.ip_address(parsed.hostname))
+        port = parsed.port
+        if port is None:
+            raise argparse.ArgumentTypeError("Invalid node address supplied!")
+    except ValueError:
+        raise argparse.ArgumentTypeError("Invalid node address supplied!")
+
+    return ip, port
+
+
+def parse_size(value: str) -> int:
+    try:
+        return humanfriendly.parse_size(value)
+    except humanfriendly.InvalidSize as e:
+        raise argparse.ArgumentTypeError("Invalid argument. {}".format(e))
+
+
 def parse_cmdline_arguments() -> typing.Optional[argparse.Namespace]:
     parser = argparse.ArgumentParser(
         description="Autonomous BitTorrent DHT crawler and metadata fetcher.",
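A quick, illustrative exercise of the two new helpers (assuming the definitions above are in scope; humanfriendly's exact unit semantics depend on the installed version):

    # No port given: the ipaddress branch returns port 0.
    assert parse_ip_port("10.0.0.1") == ("10.0.0.1", 0)
    # host:port form goes through urllib.parse.urlparse.
    assert parse_ip_port("10.0.0.1:6881") == ("10.0.0.1", 6881)
    # Sizes accept human-friendly strings; "10 MB" parses to an integer byte count.
    assert isinstance(parse_size("10 MB"), int)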
@@ -194,40 +219,29 @@ def parse_cmdline_arguments() -> typing.Optional[argparse.Namespace]:
         allow_abbrev=False,
         formatter_class=argparse.RawDescriptionHelpFormatter
     )
 
     parser.add_argument(
-        "--node-addr", action="store", type=str, required=False,
+        "--node-addr", action="store", type=parse_ip_port, required=False, default="0.0.0.0:0",
         help="the address of the (DHT) node magneticod will use"
     )
 
-    args = parser.parse_args(sys.argv[1:])
+    parser.add_argument(
+        "--max-metadata-size", type=parse_size, default=DEFAULT_MAX_METADATA_SIZE,
+        help="Limit metadata size to protect against memory overflow. Provide in human-friendly format, e.g. 1 M, 1 GB"
+    )
 
-    args.node_addr = parse_ip_port(args.node_addr) if args.node_addr else ("0.0.0.0", 0)
-    if args.node_addr is None:
-        logging.critical("Invalid node address supplied!")
-        return None
+    default_database_dir = os.path.join(appdirs.user_data_dir("magneticod"), "database.sqlite3")
+    parser.add_argument(
+        "--database-file", type=str, default=default_database_dir,
+        help="Path to database file (default: {})".format(humanfriendly.format_path(default_database_dir))
+    )
+    parser.add_argument(
+        '-d', '--debug',
+        action="store_const", dest="loglevel", const=logging.DEBUG, default=logging.INFO,
+        help="Print debugging information in addition to normal processing.",
+    )
 
-    return args
-
-
-def parse_ip_port(netloc) -> typing.Optional[typing.Tuple[str, int]]:
-    # In case no port supplied
-    try:
-        return str(ipaddress.ip_address(netloc)), 0
-    except ValueError:
-        pass
-
-    # In case port supplied
-    try:
-        parsed = urllib.parse.urlparse("//{}".format(netloc))
-        ip = str(ipaddress.ip_address(parsed.hostname))
-        port = parsed.port
-        if port is None:
-            # Invalid port
-            return None
-    except ValueError:
-        return None
-
-    return ip, port
+    return parser.parse_args(sys.argv[1:])
 
 
 if __name__ == "__main__":
     sys.exit(main())
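One subtlety behind the --node-addr change: when an argparse default is a string, the parser runs it through the type callable as well, so the "0.0.0.0:0" default above is itself converted by parse_ip_port. A small sketch of that behaviour (standalone, assuming parse_ip_port is in scope):

    import argparse

    parser = argparse.ArgumentParser()
    # String defaults are passed through `type` just like command-line values.
    parser.add_argument("--node-addr", type=parse_ip_port, default="0.0.0.0:0")
    args = parser.parse_args([])
    assert args.node_addr == ("0.0.0.0", 0)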
magneticod/magneticod/bittorrent.py
@@ -21,14 +21,14 @@ import typing
 import os
 
 from . import bencode
+from .constants import DEFAULT_MAX_METADATA_SIZE
 
 InfoHash = bytes
 PeerAddress = typing.Tuple[str, int]
 
 
 class DisposablePeer:
-    def __init__(self, info_hash: InfoHash, peer_addr: PeerAddress):
+    def __init__(self, info_hash: InfoHash, peer_addr: PeerAddress, max_metadata_size: int=DEFAULT_MAX_METADATA_SIZE):
         self.__socket = socket.socket()
         self.__socket.setblocking(False)
         # To reduce the latency:
@@ -40,8 +40,11 @@ class DisposablePeer:
         if res != errno.EINPROGRESS:
             raise ConnectionError()
 
+        self.__peer_addr = peer_addr
         self.__info_hash = info_hash
 
+        self.__max_metadata_size = max_metadata_size
+
         self.__incoming_buffer = bytearray()
         self.__outgoing_buffer = bytearray()
 
@@ -209,8 +212,16 @@ class DisposablePeer:
             # Just to make sure that the remote peer supports ut_metadata extension:
             ut_metadata = msg_dict[b"m"][b"ut_metadata"]
             metadata_size = msg_dict[b"metadata_size"]
-            assert metadata_size > 0
-        except (AssertionError, KeyError):
+            assert metadata_size > 0, "Invalid (empty) metadata size"
+            assert metadata_size < self.__max_metadata_size, "Malicious or malfunctioning peer {}:{} tried to send " \
+                "above {} max metadata size".format(self.__peer_addr[0],
+                                                    self.__peer_addr[1],
+                                                    self.__max_metadata_size)
+        except KeyError:
+            self.when_error()
+            return
+        except AssertionError as e:
+            logging.debug(str(e))
             self.when_error()
             return
 
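Worth noting: assert statements are compiled out when CPython runs with -O, which would silently disable both size checks above. A minimal assert-free sketch (assuming the same attributes and callbacks as the surrounding class):

    # Sketch only: explicit checks keep working under `python -O`.
    if not (0 < metadata_size < self.__max_metadata_size):
        logging.debug("Peer %s:%d sent unacceptable metadata_size %d",
                      self.__peer_addr[0], self.__peer_addr[1], metadata_size)
        self.when_error()
        return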
magneticod/magneticod/constants.py (new file, 11 lines)
@@ -0,0 +1,11 @@
+# coding=utf-8
+DEFAULT_MAX_METADATA_SIZE = 10 * 1024 * 1024
+BOOTSTRAPPING_NODES = [
+    ("router.bittorrent.com", 6881),
+    ("dht.transmissionbt.com", 6881)
+]
+PENDING_INFO_HASHES = 10  # threshold for pending info hashes before being committed to database
+
+TICK_INTERVAL = 1  # in seconds (soft constraint)
+# maximum (inclusive) number of active (disposable) peers to fetch the metadata per info hash at the same time:
+MAX_ACTIVE_PEERS_PER_INFO_HASH = 5
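For reference, the new default caps metadata at 10 MiB. A tiny sketch to display it (humanfriendly is being added as a dependency anyway; the binary keyword assumes a reasonably recent version):

    import humanfriendly
    from magneticod.constants import DEFAULT_MAX_METADATA_SIZE

    # 10 * 1024 * 1024 bytes -> "10 MiB" with binary units.
    print(humanfriendly.format_size(DEFAULT_MAX_METADATA_SIZE, binary=True))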
magneticod/magneticod/dht.py
@@ -20,6 +20,7 @@ import socket
 import typing
 import os
 
+from .constants import BOOTSTRAPPING_NODES, DEFAULT_MAX_METADATA_SIZE
 from . import bencode
 
 NodeID = bytes
@@ -28,12 +29,6 @@ PeerAddress = typing.Tuple[str, int]
 InfoHash = bytes
 
 
-BOOTSTRAPPING_NODES = [
-    ("router.bittorrent.com", 6881),
-    ("dht.transmissionbt.com", 6881)
-]
-
-
 class SybilNode:
     def __init__(self, address: typing.Tuple[str, int]):
         self.__true_id = self.__random_bytes(20)
@@ -48,7 +43,6 @@ class SybilNode:
         self.__routing_table = {}  # type: typing.Dict[NodeID, NodeAddress]
-
         self.__token_secret = self.__random_bytes(4)
 
         # Maximum number of neighbours (this is a THRESHOLD where, once reached, the search for new neighbours will
         # stop; but until then, the total number of neighbours might exceed the threshold).
         self.__n_max_neighbours = 2000
magneticod/magneticod/persistence.py
@@ -18,11 +18,9 @@ import time
 import typing
 import os
 
-from . import bencode
+from magneticod import bencode
 
-# threshold for pending info hashes before being committed to database:
-PENDING_INFO_HASHES = 10
+from .constants import PENDING_INFO_HASHES
 
 
 class Database:
magneticod/setup.py
@@ -23,7 +23,8 @@ setup(
 
     install_requires=[
         "appdirs >= 1.4.3",
-        "bencoder.pyx >= 1.1.3"
+        "bencoder.pyx >= 1.1.3",
+        "humanfriendly"
     ],
 
     classifiers=[
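humanfriendly is added here without a version floor, unlike the neighbouring requirements. A hedged alternative (the floor below is illustrative only, not taken from the PR) would mirror the existing style:

    install_requires=[
        "appdirs >= 1.4.3",
        "bencoder.pyx >= 1.1.3",
        # Illustrative floor only; pin to whatever version is actually tested.
        "humanfriendly >= 2.0",
    ],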