More cleanup. Only hit bootstrap if it seems necessary.

This commit is contained in:
Richard Kiss 2017-05-14 13:37:22 -07:00
parent 635fbe8cb1
commit 4515fa8b0a
4 changed files with 18 additions and 14 deletions

View File

@ -57,10 +57,9 @@ def main():
loop.run_forever() loop.run_forever()
except KeyboardInterrupt: except KeyboardInterrupt:
logging.critical("Keyboard interrupt received! Exiting gracefully...") logging.critical("Keyboard interrupt received! Exiting gracefully...")
pass
finally: finally:
database.close() database.close()
node.shutdown() loop.run_until_complete(node.shutdown())
return 0 return 0

View File

@ -20,7 +20,6 @@ import typing
import os import os
from . import bencode from . import bencode
from .constants import DEFAULT_MAX_METADATA_SIZE
InfoHash = bytes InfoHash = bytes
PeerAddress = typing.Tuple[str, int] PeerAddress = typing.Tuple[str, int]
@ -36,8 +35,7 @@ class ProtocolError(Exception):
class DisposablePeer: class DisposablePeer:
async def run(self, loop, info_hash: InfoHash, peer_addr: PeerAddress, async def run(self, loop, info_hash: InfoHash, peer_addr: PeerAddress, max_metadata_size: int):
max_metadata_size: int=DEFAULT_MAX_METADATA_SIZE):
self.__peer_addr = peer_addr self.__peer_addr = peer_addr
self.__info_hash = info_hash self.__info_hash = info_hash

View File

@ -6,8 +6,7 @@ BOOTSTRAPPING_NODES = [
] ]
PENDING_INFO_HASHES = 10 # threshold for pending info hashes before being committed to database: PENDING_INFO_HASHES = 10 # threshold for pending info hashes before being committed to database:
TICK_INTERVAL = 1 # in seconds (soft constraint)
# maximum (inclusive) number of active (disposable) peers to fetch the metadata per info hash at the same time: # maximum (inclusive) number of active (disposable) peers to fetch the metadata per info hash at the same time:
MAX_ACTIVE_PEERS_PER_INFO_HASH = 5 MAX_ACTIVE_PEERS_PER_INFO_HASH = 5
PEER_TIMEOUT=12 # seconds PEER_TIMEOUT=120 # seconds

View File

@ -12,7 +12,6 @@
# #
# You should have received a copy of the GNU Affero General Public License along with this program. If not, see # You should have received a copy of the GNU Affero General Public License along with this program. If not, see
# <http://www.gnu.org/licenses/>. # <http://www.gnu.org/licenses/>.
import array
import asyncio import asyncio
import collections import collections
import itertools import itertools
@ -22,7 +21,7 @@ import socket
import typing import typing
import os import os
from .constants import BOOTSTRAPPING_NODES, DEFAULT_MAX_METADATA_SIZE, MAX_ACTIVE_PEERS_PER_INFO_HASH, PEER_TIMEOUT from .constants import BOOTSTRAPPING_NODES, MAX_ACTIVE_PEERS_PER_INFO_HASH, PEER_TIMEOUT
from . import bencode from . import bencode
from . import bittorrent from . import bittorrent
@ -49,6 +48,7 @@ class SybilNode:
self._complete_info_hashes = complete_info_hashes self._complete_info_hashes = complete_info_hashes
self.__max_metadata_size = max_metadata_size self.__max_metadata_size = max_metadata_size
self._metadata_q = asyncio.Queue() self._metadata_q = asyncio.Queue()
self._tasks = []
logging.info("SybilNode %s on %s initialized!", self.__true_id.hex().upper(), address) logging.info("SybilNode %s on %s initialized!", self.__true_id.hex().upper(), address)
@ -57,8 +57,8 @@ class SybilNode:
await loop.create_datagram_endpoint(lambda: self, local_addr=self.__address) await loop.create_datagram_endpoint(lambda: self, local_addr=self.__address)
def connection_made(self, transport): def connection_made(self, transport):
self._loop.create_task(self.on_tick()) self._tasks.append(self._loop.create_task(self.on_tick()))
self._loop.create_task(self.increase_neighbour_task()) self._tasks.append(self._loop.create_task(self.increase_neighbour_task()))
self._transport = transport self._transport = transport
def error_received(self, exc): def error_received(self, exc):
@ -74,6 +74,7 @@ class SybilNode:
async def on_tick(self) -> None: async def on_tick(self) -> None:
while True: while True:
await asyncio.sleep(1) await asyncio.sleep(1)
if len(self._routing_table) == 0:
self.__bootstrap() self.__bootstrap()
self.__make_neighbours() self.__make_neighbours()
self._routing_table.clear() self._routing_table.clear()
@ -100,9 +101,16 @@ class SybilNode:
await asyncio.sleep(10) await asyncio.sleep(10)
self.__n_max_neighbours = self.__n_max_neighbours * 101 // 100 self.__n_max_neighbours = self.__n_max_neighbours * 101 // 100
def shutdown(self) -> None: async def shutdown(self) -> None:
for peer in itertools.chain.from_iterable(self.__peers.values()): peers = [peer for peer in itertools.chain.from_iterable(self.__peers.values())]
for peer in peers:
peer.cancel() peer.cancel()
for peer in peers:
await peer
for task in self._tasks:
task.cancel()
for task in self._tasks:
await task
self._transport.close() self._transport.close()
def __on_FIND_NODE_response(self, message: bencode.KRPCDict) -> None: def __on_FIND_NODE_response(self, message: bencode.KRPCDict) -> None: