From 4515fa8b0aded0781c9ede19aaf3fcbf8aec2ccf Mon Sep 17 00:00:00 2001 From: Richard Kiss Date: Sun, 14 May 2017 13:37:22 -0700 Subject: [PATCH] More cleanup. Only hit bootstrap if it seems necessary. --- magneticod/magneticod/__main__.py | 3 +-- magneticod/magneticod/bittorrent.py | 4 +--- magneticod/magneticod/constants.py | 3 +-- magneticod/magneticod/dht.py | 22 +++++++++++++++------- 4 files changed, 18 insertions(+), 14 deletions(-) diff --git a/magneticod/magneticod/__main__.py b/magneticod/magneticod/__main__.py index a3729c5..8295536 100644 --- a/magneticod/magneticod/__main__.py +++ b/magneticod/magneticod/__main__.py @@ -57,10 +57,9 @@ def main(): loop.run_forever() except KeyboardInterrupt: logging.critical("Keyboard interrupt received! Exiting gracefully...") - pass finally: database.close() - node.shutdown() + loop.run_until_complete(node.shutdown()) return 0 diff --git a/magneticod/magneticod/bittorrent.py b/magneticod/magneticod/bittorrent.py index 75e0c56..dce6336 100644 --- a/magneticod/magneticod/bittorrent.py +++ b/magneticod/magneticod/bittorrent.py @@ -20,7 +20,6 @@ import typing import os from . import bencode -from .constants import DEFAULT_MAX_METADATA_SIZE InfoHash = bytes PeerAddress = typing.Tuple[str, int] @@ -36,8 +35,7 @@ class ProtocolError(Exception): class DisposablePeer: - async def run(self, loop, info_hash: InfoHash, peer_addr: PeerAddress, - max_metadata_size: int=DEFAULT_MAX_METADATA_SIZE): + async def run(self, loop, info_hash: InfoHash, peer_addr: PeerAddress, max_metadata_size: int): self.__peer_addr = peer_addr self.__info_hash = info_hash diff --git a/magneticod/magneticod/constants.py b/magneticod/magneticod/constants.py index b99ebff..5f2b1b2 100644 --- a/magneticod/magneticod/constants.py +++ b/magneticod/magneticod/constants.py @@ -6,8 +6,7 @@ BOOTSTRAPPING_NODES = [ ] PENDING_INFO_HASHES = 10 # threshold for pending info hashes before being committed to database: -TICK_INTERVAL = 1 # in seconds (soft constraint) # maximum (inclusive) number of active (disposable) peers to fetch the metadata per info hash at the same time: MAX_ACTIVE_PEERS_PER_INFO_HASH = 5 -PEER_TIMEOUT=12 # seconds +PEER_TIMEOUT=120 # seconds diff --git a/magneticod/magneticod/dht.py b/magneticod/magneticod/dht.py index be4f045..79f64ac 100644 --- a/magneticod/magneticod/dht.py +++ b/magneticod/magneticod/dht.py @@ -12,7 +12,6 @@ # # You should have received a copy of the GNU Affero General Public License along with this program. If not, see # . -import array import asyncio import collections import itertools @@ -22,7 +21,7 @@ import socket import typing import os -from .constants import BOOTSTRAPPING_NODES, DEFAULT_MAX_METADATA_SIZE, MAX_ACTIVE_PEERS_PER_INFO_HASH, PEER_TIMEOUT +from .constants import BOOTSTRAPPING_NODES, MAX_ACTIVE_PEERS_PER_INFO_HASH, PEER_TIMEOUT from . import bencode from . import bittorrent @@ -49,6 +48,7 @@ class SybilNode: self._complete_info_hashes = complete_info_hashes self.__max_metadata_size = max_metadata_size self._metadata_q = asyncio.Queue() + self._tasks = [] logging.info("SybilNode %s on %s initialized!", self.__true_id.hex().upper(), address) @@ -57,8 +57,8 @@ class SybilNode: await loop.create_datagram_endpoint(lambda: self, local_addr=self.__address) def connection_made(self, transport): - self._loop.create_task(self.on_tick()) - self._loop.create_task(self.increase_neighbour_task()) + self._tasks.append(self._loop.create_task(self.on_tick())) + self._tasks.append(self._loop.create_task(self.increase_neighbour_task())) self._transport = transport def error_received(self, exc): @@ -74,7 +74,8 @@ class SybilNode: async def on_tick(self) -> None: while True: await asyncio.sleep(1) - self.__bootstrap() + if len(self._routing_table) == 0: + self.__bootstrap() self.__make_neighbours() self._routing_table.clear() @@ -100,9 +101,16 @@ class SybilNode: await asyncio.sleep(10) self.__n_max_neighbours = self.__n_max_neighbours * 101 // 100 - def shutdown(self) -> None: - for peer in itertools.chain.from_iterable(self.__peers.values()): + async def shutdown(self) -> None: + peers = [peer for peer in itertools.chain.from_iterable(self.__peers.values())] + for peer in peers: peer.cancel() + for peer in peers: + await peer + for task in self._tasks: + task.cancel() + for task in self._tasks: + await task self._transport.close() def __on_FIND_NODE_response(self, message: bencode.KRPCDict) -> None: