diff --git a/magneticod/magneticod/__main__.py b/magneticod/magneticod/__main__.py index a99dbdd..fd80c43 100644 --- a/magneticod/magneticod/__main__.py +++ b/magneticod/magneticod/__main__.py @@ -31,17 +31,6 @@ from . import dht from . import persistence -async def metadata_queue_watcher(database: persistence.Database, metadata_queue: asyncio.Queue) -> None: - """ - Watches for the metadata queue to commit any complete info hashes to the database. - """ - while True: - info_hash, metadata = await metadata_queue.get() - succeeded = database.add_metadata(info_hash, metadata) - if not succeeded: - logging.info("Corrupt metadata for %s! Ignoring.", info_hash.hex()) - - def parse_ip_port(netloc: str) -> typing.Optional[typing.Tuple[str, int]]: # In case no port supplied try: @@ -140,17 +129,14 @@ def main() -> int: return 1 loop = asyncio.get_event_loop() - node = dht.SybilNode(database.is_infohash_new, arguments.max_metadata_size) + node = dht.SybilNode(database, arguments.max_metadata_size) loop.create_task(node.launch(arguments.node_addr)) - # mypy ignored: mypy doesn't know (yet) about coroutines - metadata_queue_watcher_task = loop.create_task(metadata_queue_watcher(database, node.metadata_q())) # type: ignore try: asyncio.get_event_loop().run_forever() except KeyboardInterrupt: logging.critical("Keyboard interrupt received! Exiting gracefully...") finally: - metadata_queue_watcher_task.cancel() loop.run_until_complete(node.shutdown()) database.close() diff --git a/magneticod/magneticod/dht.py b/magneticod/magneticod/dht.py index 7991dba..452a9b8 100644 --- a/magneticod/magneticod/dht.py +++ b/magneticod/magneticod/dht.py @@ -23,6 +23,7 @@ import os from .constants import BOOTSTRAPPING_NODES, MAX_ACTIVE_PEERS_PER_INFO_HASH, PEER_TIMEOUT, TICK_INTERVAL from . import bencode from . import bittorrent +from . import persistence NodeID = bytes NodeAddress = typing.Tuple[str, int] @@ -32,7 +33,7 @@ Metadata = bytes class SybilNode(asyncio.DatagramProtocol): - def __init__(self, is_info_hash_new, max_metadata_size): + def __init__(self, database: persistence.Database, max_metadata_size): self.__true_id = os.urandom(20) self._routing_table = {} # type: typing.Dict[NodeID, NodeAddress] @@ -42,7 +43,7 @@ class SybilNode(asyncio.DatagramProtocol): # stop; but until then, the total number of neighbours might exceed the threshold). self.__n_max_neighbours = 2000 self.__parent_futures = {} # type: typing.Dict[InfoHash, asyncio.Future] - self._is_info_hash_new = is_info_hash_new + self.__database = database self.__max_metadata_size = max_metadata_size # Complete metadatas will be added to the queue, to be retrieved and committed to the database. self.__metadata_queue = asyncio.Queue() # typing.Collection[typing.Tuple[InfoHash, Metadata]] @@ -289,13 +290,17 @@ class SybilNode(asyncio.DatagramProtocol): parent_task.set_result(None) def _parent_task_done(self, parent_task, info_hash): + del self.__parent_futures[info_hash] try: metadata = parent_task.result() - if metadata: - self.__metadata_queue.put_nowait((info_hash, metadata)) + if not metadata: + return except asyncio.CancelledError: - pass - del self.__parent_futures[info_hash] + return + + succeeded = self.__database.add_metadata(info_hash, metadata) + if not succeeded: + logging.info("Corrupt metadata for %s! Ignoring.", info_hash.hex()) async def __bootstrap(self) -> None: event_loop = asyncio.get_event_loop()