SybilNode now support pause_writing.

This commit is contained in:
Richard Kiss 2017-05-14 13:58:21 -07:00
parent 73d97d8188
commit 4b4c312fbe
2 changed files with 16 additions and 11 deletions

View File

@ -52,7 +52,6 @@ class DisposablePeer:
self._metadata_future = loop.create_future()
self._writer = None
self._is_paused = False
try:
self._reader, self._writer = await asyncio.open_connection(
@ -89,12 +88,6 @@ class DisposablePeer:
self._writer.close()
return self._metadata_future.result()
def pause_writing(self):
self._is_paused = True
def resume_writing(self):
self._is_paused = False
def __on_message(self, message: bytes) -> None:
length = len(message)

View File

@ -49,6 +49,7 @@ class SybilNode:
self.__max_metadata_size = max_metadata_size
self._metadata_q = asyncio.Queue()
self._tasks = []
self._is_paused = False
logging.info("SybilNode %s on %s initialized!", self.__true_id.hex().upper(), address)
@ -61,6 +62,17 @@ class SybilNode:
self._tasks.append(self._loop.create_task(self.increase_neighbour_task()))
self._transport = transport
def pause_writing(self):
self._is_paused = True
def resume_writing(self):
self._is_paused = False
def sendto(self, data, addr):
if self._is_paused:
return
self._transport.sendto(data, addr)
def error_received(self, exc):
logging.error("got error %s", exc)
if isinstance(exc, PermissionError):
@ -148,7 +160,7 @@ class SybilNode:
})
# we want to prioritise GET_PEERS responses as they are the most fruitful ones!
# but there is no easy way to do this with asyncio
self._transport.sendto(data, addr)
self.sendto(data, addr)
def __on_ANNOUNCE_PEER_query(self, message: bencode.KRPCDict, addr: NodeAddress) -> None:
try:
@ -178,7 +190,7 @@ class SybilNode:
b"id": node_id[:15] + self.__true_id[:5]
}
})
self._transport.sendto(data, addr)
self.sendto(data, addr)
if implied_port:
peer_addr = (addr[0], addr[1])
@ -208,11 +220,11 @@ class SybilNode:
def __bootstrap(self) -> None:
for addr in BOOTSTRAPPING_NODES:
data = self.__build_FIND_NODE_query(self.__true_id)
self._transport.sendto(data, addr)
self.sendto(data, addr)
def __make_neighbours(self) -> None:
for node_id, addr in self._routing_table.items():
self._transport.sendto(self.__build_FIND_NODE_query(node_id[:15] + self.__true_id[:5]), addr)
self.sendto(self.__build_FIND_NODE_query(node_id[:15] + self.__true_id[:5]), addr)
@staticmethod
def __decode_nodes(infos: bytes) -> typing.List[typing.Tuple[NodeID, NodeAddress]]: