From 5d37737d0dad2099fa407d3936aeb9a378946bab Mon Sep 17 00:00:00 2001 From: Richard Kiss Date: Sat, 27 May 2017 12:20:24 -0700 Subject: [PATCH 1/3] Add some resource debug logging. --- magneticod/magneticod/dht.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/magneticod/magneticod/dht.py b/magneticod/magneticod/dht.py index b2e83e1..c136401 100644 --- a/magneticod/magneticod/dht.py +++ b/magneticod/magneticod/dht.py @@ -92,6 +92,9 @@ class SybilNode: self._routing_table.clear() if not self._is_paused: self.__n_max_neighbours = self.__n_max_neighbours * 101 // 100 + logging.debug("fetch metadata task count: %d", sum( + x.child_count for x in self.__tasks.values())) + logging.debug("asyncio task count: %d", len(asyncio.Task.all_tasks())) def datagram_received(self, data, addr) -> None: # Ignore nodes that uses port 0 (assholes). From d7ead951a472e65ec707c92906c82224381ad63c Mon Sep 17 00:00:00 2001 From: Richard Kiss Date: Sat, 27 May 2017 16:05:22 -0700 Subject: [PATCH 2/3] Refactor create_tasks out of main. --- magneticod/magneticod/__main__.py | 40 ++++++++++++++++++------------- magneticod/magneticod/dht.py | 3 +++ 2 files changed, 26 insertions(+), 17 deletions(-) diff --git a/magneticod/magneticod/__main__.py b/magneticod/magneticod/__main__.py index 02a724b..dc7e0cf 100644 --- a/magneticod/magneticod/__main__.py +++ b/magneticod/magneticod/__main__.py @@ -31,8 +31,8 @@ from . import dht from . import persistence -def main(): - arguments = parse_cmdline_arguments() +def create_tasks(): + arguments = parse_cmdline_arguments(sys.argv[1:]) logging.basicConfig(level=arguments.loglevel, format="%(asctime)s %(levelname)-8s %(message)s") logging.info("magneticod v%d.%d.%d started", *__version__) @@ -57,21 +57,15 @@ def main(): loop = asyncio.get_event_loop() node = dht.SybilNode(arguments.node_addr, complete_info_hashes, arguments.max_metadata_size) - loop.run_until_complete(node.launch(loop)) + loop.create_task(node.launch(loop)) + watch_q_task = loop.create_task(watch_q(database, node.metadata_q())) + watch_q_task.add_done_callback(lambda x: clean_up(loop, database, node)) + return watch_q_task - watch_q_task = loop.create_task(watch_q(database, node._metadata_q)) - try: - loop.run_forever() - except KeyboardInterrupt: - logging.critical("Keyboard interrupt received! Exiting gracefully...") - finally: - database.close() - watch_q_task.cancel() - loop.run_until_complete(node.shutdown()) - loop.run_until_complete(asyncio.wait([watch_q_task])) - - return 0 +def clean_up(loop, database, node): + database.close() + loop.run_until_complete(node.shutdown()) async def watch_q(database, q): @@ -109,7 +103,7 @@ def parse_size(value: str) -> int: raise argparse.ArgumentTypeError("Invalid argument. {}".format(e)) -def parse_cmdline_arguments() -> typing.Optional[argparse.Namespace]: +def parse_cmdline_arguments(args) -> typing.Optional[argparse.Namespace]: parser = argparse.ArgumentParser( description="Autonomous BitTorrent DHT crawler and metadata fetcher.", epilog=textwrap.dedent("""\ @@ -153,7 +147,19 @@ def parse_cmdline_arguments() -> typing.Optional[argparse.Namespace]: action="store_const", dest="loglevel", const=logging.DEBUG, default=logging.INFO, help="Print debugging information in addition to normal processing.", ) - return parser.parse_args(sys.argv[1:]) + return parser.parse_args(args) + + +def main(): + main_task = create_tasks() + try: + asyncio.get_event_loop().run_forever() + except KeyboardInterrupt: + logging.critical("Keyboard interrupt received! Exiting gracefully...") + finally: + main_task.cancel() + + return 0 if __name__ == "__main__": diff --git a/magneticod/magneticod/dht.py b/magneticod/magneticod/dht.py index c136401..9393a16 100644 --- a/magneticod/magneticod/dht.py +++ b/magneticod/magneticod/dht.py @@ -51,6 +51,9 @@ class SybilNode: logging.info("SybilNode %s on %s initialized!", self.__true_id.hex().upper(), address) + def metadata_q(self): + return self._metadata_q + async def launch(self, loop): self._loop = loop await loop.create_datagram_endpoint(lambda: self, local_addr=self.__address) From 0e389aa619fed41c93dd7b6ef2eaf24fcbe25fa0 Mon Sep 17 00:00:00 2001 From: Richard Kiss Date: Sat, 27 May 2017 16:11:29 -0700 Subject: [PATCH 3/3] Query DB when checking if an infohash is new or not. --- magneticod/magneticod/__main__.py | 2 +- magneticod/magneticod/dht.py | 7 +++---- magneticod/magneticod/persistence.py | 9 ++++++--- 3 files changed, 10 insertions(+), 8 deletions(-) diff --git a/magneticod/magneticod/__main__.py b/magneticod/magneticod/__main__.py index dc7e0cf..1e9d899 100644 --- a/magneticod/magneticod/__main__.py +++ b/magneticod/magneticod/__main__.py @@ -56,7 +56,7 @@ def create_tasks(): complete_info_hashes = database.get_complete_info_hashes() loop = asyncio.get_event_loop() - node = dht.SybilNode(arguments.node_addr, complete_info_hashes, arguments.max_metadata_size) + node = dht.SybilNode(arguments.node_addr, database.is_infohash_new, arguments.max_metadata_size) loop.create_task(node.launch(loop)) watch_q_task = loop.create_task(watch_q(database, node.metadata_q())) watch_q_task.add_done_callback(lambda x: clean_up(loop, database, node)) diff --git a/magneticod/magneticod/dht.py b/magneticod/magneticod/dht.py index 9393a16..2d5e262 100644 --- a/magneticod/magneticod/dht.py +++ b/magneticod/magneticod/dht.py @@ -31,7 +31,7 @@ InfoHash = bytes class SybilNode: - def __init__(self, address: typing.Tuple[str, int], complete_info_hashes, max_metadata_size): + def __init__(self, address: typing.Tuple[str, int], is_infohash_new, max_metadata_size): self.__true_id = self.__random_bytes(20) self.__address = address @@ -43,7 +43,7 @@ class SybilNode: # stop; but until then, the total number of neighbours might exceed the threshold). self.__n_max_neighbours = 2000 self.__tasks = {} # type: typing.Dict[dht.InfoHash, asyncio.Future] - self._complete_info_hashes = complete_info_hashes + self._is_inforhash_new = is_infohash_new self.__max_metadata_size = max_metadata_size self._metadata_q = asyncio.Queue() self._is_paused = False @@ -200,7 +200,7 @@ class SybilNode: else: peer_addr = (addr[0], port) - if info_hash in self._complete_info_hashes: + if not self._is_inforhash_new(info_hash): return # create the parent future @@ -240,7 +240,6 @@ class SybilNode: try: metadata = parent_task.result() if metadata: - self._complete_info_hashes.add(info_hash) self._metadata_q.put_nowait((info_hash, metadata)) except asyncio.CancelledError: pass diff --git a/magneticod/magneticod/persistence.py b/magneticod/magneticod/persistence.py index 56d1966..9921c6e 100644 --- a/magneticod/magneticod/persistence.py +++ b/magneticod/magneticod/persistence.py @@ -94,11 +94,14 @@ class Database: return True - def get_complete_info_hashes(self) -> typing.Set[bytes]: + def is_infohash_new(self, info_hash): + if info_hash in [x[0] for x in self.__pending_metadata]: + return False cur = self.__db_conn.cursor() try: - cur.execute("SELECT info_hash FROM torrents;") - return set(x[0] for x in cur.fetchall()) + cur.execute("SELECT count(info_hash) FROM torrents where info_hash = ?;", [info_hash]) + x, = cur.fetchone() + return x == 0 finally: cur.close()