From 4b4c312fbed4f5c4561cc046cd69c66955629941 Mon Sep 17 00:00:00 2001 From: Richard Kiss Date: Sun, 14 May 2017 13:58:21 -0700 Subject: [PATCH] SybilNode now support pause_writing. --- magneticod/magneticod/bittorrent.py | 7 ------- magneticod/magneticod/dht.py | 20 ++++++++++++++++---- 2 files changed, 16 insertions(+), 11 deletions(-) diff --git a/magneticod/magneticod/bittorrent.py b/magneticod/magneticod/bittorrent.py index f5784af..dce6336 100644 --- a/magneticod/magneticod/bittorrent.py +++ b/magneticod/magneticod/bittorrent.py @@ -52,7 +52,6 @@ class DisposablePeer: self._metadata_future = loop.create_future() self._writer = None - self._is_paused = False try: self._reader, self._writer = await asyncio.open_connection( @@ -89,12 +88,6 @@ class DisposablePeer: self._writer.close() return self._metadata_future.result() - def pause_writing(self): - self._is_paused = True - - def resume_writing(self): - self._is_paused = False - def __on_message(self, message: bytes) -> None: length = len(message) diff --git a/magneticod/magneticod/dht.py b/magneticod/magneticod/dht.py index a39ef4d..91cbe4a 100644 --- a/magneticod/magneticod/dht.py +++ b/magneticod/magneticod/dht.py @@ -49,6 +49,7 @@ class SybilNode: self.__max_metadata_size = max_metadata_size self._metadata_q = asyncio.Queue() self._tasks = [] + self._is_paused = False logging.info("SybilNode %s on %s initialized!", self.__true_id.hex().upper(), address) @@ -61,6 +62,17 @@ class SybilNode: self._tasks.append(self._loop.create_task(self.increase_neighbour_task())) self._transport = transport + def pause_writing(self): + self._is_paused = True + + def resume_writing(self): + self._is_paused = False + + def sendto(self, data, addr): + if self._is_paused: + return + self._transport.sendto(data, addr) + def error_received(self, exc): logging.error("got error %s", exc) if isinstance(exc, PermissionError): @@ -148,7 +160,7 @@ class SybilNode: }) # we want to prioritise GET_PEERS responses as they are the most fruitful ones! # but there is no easy way to do this with asyncio - self._transport.sendto(data, addr) + self.sendto(data, addr) def __on_ANNOUNCE_PEER_query(self, message: bencode.KRPCDict, addr: NodeAddress) -> None: try: @@ -178,7 +190,7 @@ class SybilNode: b"id": node_id[:15] + self.__true_id[:5] } }) - self._transport.sendto(data, addr) + self.sendto(data, addr) if implied_port: peer_addr = (addr[0], addr[1]) @@ -208,11 +220,11 @@ class SybilNode: def __bootstrap(self) -> None: for addr in BOOTSTRAPPING_NODES: data = self.__build_FIND_NODE_query(self.__true_id) - self._transport.sendto(data, addr) + self.sendto(data, addr) def __make_neighbours(self) -> None: for node_id, addr in self._routing_table.items(): - self._transport.sendto(self.__build_FIND_NODE_query(node_id[:15] + self.__true_id[:5]), addr) + self.sendto(self.__build_FIND_NODE_query(node_id[:15] + self.__true_id[:5]), addr) @staticmethod def __decode_nodes(infos: bytes) -> typing.List[typing.Tuple[NodeID, NodeAddress]]: