From 8da8d20b53f10b20ef5310c2bc3ae7e9ee991815 Mon Sep 17 00:00:00 2001 From: Richard Kiss Date: Sat, 13 May 2017 22:25:34 -0700 Subject: [PATCH 01/20] First crack at porting to asyncio. --- magneticod/magneticod/__main__.py | 120 ++----------------- magneticod/magneticod/bittorrent.py | 163 +++++++------------------ magneticod/magneticod/dht.py | 177 +++++++++++++--------------- 3 files changed, 140 insertions(+), 320 deletions(-) diff --git a/magneticod/magneticod/__main__.py b/magneticod/magneticod/__main__.py index 9e3d80b..a3729c5 100644 --- a/magneticod/magneticod/__main__.py +++ b/magneticod/magneticod/__main__.py @@ -13,45 +13,25 @@ # You should have received a copy of the GNU Affero General Public License along with this program. If not, see # . import argparse -import collections -import functools +import asyncio import logging import ipaddress -import selectors import textwrap import urllib.parse -import itertools import os import sys -import time import typing import appdirs import humanfriendly -from .constants import TICK_INTERVAL, MAX_ACTIVE_PEERS_PER_INFO_HASH, DEFAULT_MAX_METADATA_SIZE +from .constants import DEFAULT_MAX_METADATA_SIZE from . import __version__ -from . import bittorrent from . import dht from . import persistence -# Global variables are bad bla bla bla, BUT these variables are used so many times that I think it is justified; else -# the signatures of many functions are literally cluttered. -# -# If you are using a global variable, please always indicate that at the VERY BEGINNING of the function instead of right -# before using the variable for the first time. -selector = selectors.DefaultSelector() -database = None # type: persistence.Database -node = None -peers = collections.defaultdict(list) # type: typing.DefaultDict[dht.InfoHash, typing.List[bittorrent.DisposablePeer]] -# info hashes whose metadata is valid & complete (OR complete but deemed to be corrupt) so do NOT download them again: -complete_info_hashes = set() - - def main(): - global complete_info_hashes, database, node, peers, selector - arguments = parse_cmdline_arguments() logging.basicConfig(level=arguments.loglevel, format="%(asctime)s %(levelname)-8s %(message)s") @@ -67,106 +47,30 @@ def main(): complete_info_hashes = database.get_complete_info_hashes() - node = dht.SybilNode(arguments.node_addr) + loop = asyncio.get_event_loop() + node = dht.SybilNode(arguments.node_addr, complete_info_hashes, arguments.max_metadata_size) + loop.run_until_complete(node.launch(loop)) - node.when_peer_found = lambda info_hash, peer_address: on_peer_found(info_hash=info_hash, - peer_address=peer_address, - max_metadata_size=arguments.max_metadata_size) - - selector.register(node, selectors.EVENT_READ) + loop.create_task(watch_q(database, node._metadata_q)) try: - loop() + loop.run_forever() except KeyboardInterrupt: logging.critical("Keyboard interrupt received! Exiting gracefully...") pass finally: database.close() - selector.close() node.shutdown() - for peer in itertools.chain.from_iterable(peers.values()): - peer.shutdown() return 0 -def on_peer_found(info_hash: dht.InfoHash, peer_address, max_metadata_size: int=DEFAULT_MAX_METADATA_SIZE) -> None: - global selector, peers, complete_info_hashes - - if len(peers[info_hash]) > MAX_ACTIVE_PEERS_PER_INFO_HASH or info_hash in complete_info_hashes: - return - - try: - peer = bittorrent.DisposablePeer(info_hash, peer_address, max_metadata_size) - except ConnectionError: - return - - selector.register(peer, selectors.EVENT_READ | selectors.EVENT_WRITE) - peer.when_metadata_found = on_metadata_found - peer.when_error = functools.partial(on_peer_error, peer, info_hash) - peers[info_hash].append(peer) - - -def on_metadata_found(info_hash: dht.InfoHash, metadata: bytes) -> None: - global complete_info_hashes, database, peers, selector - - succeeded = database.add_metadata(info_hash, metadata) - if not succeeded: - logging.info("Corrupt metadata for %s! Ignoring.", info_hash.hex()) - - # When we fetch the metadata of an info hash completely, shut down all other peers who are trying to do the same. - for peer in peers[info_hash]: - selector.unregister(peer) - peer.shutdown() - del peers[info_hash] - - complete_info_hashes.add(info_hash) - - -def on_peer_error(peer: bittorrent.DisposablePeer, info_hash: dht.InfoHash) -> None: - global peers, selector - peer.shutdown() - peers[info_hash].remove(peer) - selector.unregister(peer) - - -# TODO: -# Consider whether time.monotonic() is a good choice. Maybe we should use CLOCK_MONOTONIC_RAW as its not affected by NTP -# adjustments, and all we need is how many seconds passed since a certain point in time. -def loop() -> None: - global selector, node, peers - - t0 = time.monotonic() +async def watch_q(database, q): while True: - keys_and_events = selector.select(timeout=TICK_INTERVAL) - - # Check if it is time to tick - delta = time.monotonic() - t0 - if delta >= TICK_INTERVAL: - if not (delta < 2 * TICK_INTERVAL): - logging.warning("Belated TICK! (Δ = %d)", delta) - - node.on_tick() - for peer_list in peers.values(): - for peer in peer_list: - peer.on_tick() - - t0 = time.monotonic() - - for key, events in keys_and_events: - if events & selectors.EVENT_READ: - key.fileobj.on_receivable() - if events & selectors.EVENT_WRITE: - key.fileobj.on_sendable() - - # Check for entities that would like to write to their socket - keymap = selector.get_map() - for fd in keymap: - fileobj = keymap[fd].fileobj - if fileobj.would_send(): - selector.modify(fileobj, selectors.EVENT_READ | selectors.EVENT_WRITE) - else: - selector.modify(fileobj, selectors.EVENT_READ) + info_hash, metadata = await q.get() + succeeded = database.add_metadata(info_hash, metadata) + if not succeeded: + logging.info("Corrupt metadata for %s! Ignoring.", info_hash.hex()) def parse_ip_port(netloc) -> typing.Optional[typing.Tuple[str, int]]: diff --git a/magneticod/magneticod/bittorrent.py b/magneticod/magneticod/bittorrent.py index 5a5b4be..6f3c98d 100644 --- a/magneticod/magneticod/bittorrent.py +++ b/magneticod/magneticod/bittorrent.py @@ -12,11 +12,10 @@ # # You should have received a copy of the GNU Affero General Public License along with this program. If not, see # . -import errno +import asyncio import logging import hashlib import math -import socket import typing import os @@ -27,27 +26,20 @@ InfoHash = bytes PeerAddress = typing.Tuple[str, int] +async def get_torrent_data(info_hash: InfoHash, peer_addr: PeerAddress, max_metadata_size): + loop = asyncio.get_event_loop() + peer = DisposablePeer(info_hash, peer_addr, max_metadata_size) + r = await peer.launch(loop) + return r + + class DisposablePeer: def __init__(self, info_hash: InfoHash, peer_addr: PeerAddress, max_metadata_size: int= DEFAULT_MAX_METADATA_SIZE): - self.__socket = socket.socket() - self.__socket.setblocking(False) - # To reduce the latency: - self.__socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, True) - if hasattr(socket, "TCP_QUICKACK"): - self.__socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_QUICKACK, True) - - res = self.__socket.connect_ex(peer_addr) - if res != errno.EINPROGRESS: - raise ConnectionError() - self.__peer_addr = peer_addr self.__info_hash = info_hash self.__max_metadata_size = max_metadata_size - self.__incoming_buffer = bytearray() - self.__outgoing_buffer = bytearray() - self.__bt_handshake_complete = False # BitTorrent Handshake self.__ext_handshake_complete = False # Extension Handshake self.__ut_metadata = None # Since we don't know ut_metadata code that remote peer uses... @@ -56,102 +48,55 @@ class DisposablePeer: self.__metadata_received = 0 # Amount of metadata bytes received... self.__metadata = None - # To prevent double shutdown - self.__shutdown = False - # After 120 ticks passed, a peer should report an error and shut itself down due to being stall. self.__ticks_passed = 0 + async def launch(self, loop): + self._loop = loop + self._metadata_future = self._loop.create_future() + + self._reader, self._writer = await asyncio.open_connection( + self.__peer_addr[0], self.__peer_addr[1], loop=loop) + # Send the BitTorrent handshake message (0x13 = 19 in decimal, the length of the handshake message) - self.__outgoing_buffer += b"\x13BitTorrent protocol%s%s%s" % ( + self._writer.write(b"\x13BitTorrent protocol%s%s%s" % ( b"\x00\x00\x00\x00\x00\x10\x00\x01", self.__info_hash, self.__random_bytes(20) - ) - - @staticmethod - def when_error() -> None: - raise NotImplementedError() - - @staticmethod - def when_metadata_found(info_hash: InfoHash, metadata: bytes) -> None: - raise NotImplementedError() - - def on_tick(self): - self.__ticks_passed += 1 - - if self.__ticks_passed == 120: - logging.debug("Peer failed to fetch metadata in time for info hash %s!", self.__info_hash.hex()) - self.when_error() - - def on_receivable(self) -> None: - while True: - try: - received = self.__socket.recv(8192) - except BlockingIOError: - break - except ConnectionResetError: - self.when_error() - return - except ConnectionRefusedError: - self.when_error() - return - except OSError: # TODO: check for "no route to host 113" error - self.when_error() - return - - if not received: - self.when_error() - return - - self.__incoming_buffer += received + )) # Honestly speaking, BitTorrent protocol might be one of the most poorly documented and (not the most but) badly # designed protocols I have ever seen (I am 19 years old so what I could have seen?). # # Anyway, all the messages EXCEPT the handshake are length-prefixed by 4 bytes in network order, BUT the # size of the handshake message is the 1-byte length prefix + 49 bytes, but luckily, there is only one canonical # way of handshaking in the wild. - if not self.__bt_handshake_complete: - if len(self.__incoming_buffer) < 68: - # We are still receiving the handshake... - return + message = await self._reader.readexactly(68) + if message[1:20] != b"BitTorrent protocol": + # Erroneous handshake, possibly unknown version... + logging.debug("Erroneous BitTorrent handshake! %s", message) + self.close() + return - if self.__incoming_buffer[1:20] != b"BitTorrent protocol": - # Erroneous handshake, possibly unknown version... - logging.debug("Erroneous BitTorrent handshake! %s", self.__incoming_buffer[:68]) - self.when_error() - return + self.__on_bt_handshake(message) - self.__on_bt_handshake(self.__incoming_buffer[:68]) + try: + while not self._metadata_future.done(): + buffer = await self._reader.readexactly(4) + length = int.from_bytes(buffer, "big") + message = await self._reader.readexactly(length) + self.__on_message(message) + except Exception as ex: + self.close() + return await self._metadata_future - self.__bt_handshake_complete = True - self.__incoming_buffer = self.__incoming_buffer[68:] + def when_metadata_found(self, info_hash: InfoHash, metadata: bytes) -> None: + self._metadata_future.set_result((info_hash, metadata)) + self.close() - while len(self.__incoming_buffer) >= 4: - # Beware that while there are still messages in the incoming queue/buffer, one of previous messages might - # have caused an error that necessitates us to quit. - if self.__shutdown: - break - - length = int.from_bytes(self.__incoming_buffer[:4], "big") - if len(self.__incoming_buffer) - 4 < length: - # Message is still incoming... - return - - self.__on_message(self.__incoming_buffer[4:4+length]) - self.__incoming_buffer = self.__incoming_buffer[4+length:] - - def on_sendable(self) -> None: - while self.__outgoing_buffer: - try: - n_sent = self.__socket.send(self.__outgoing_buffer) - assert n_sent - self.__outgoing_buffer = self.__outgoing_buffer[n_sent:] - except BlockingIOError: - break - except OSError: - # In case -while looping- on_sendable is called after socket is closed (mostly because of an error) - return + def close(self): + self._writer.close() + if not self._metadata_future.done(): + self._metadata_future.set_result(None) def __on_message(self, message: bytes) -> None: length = len(message) @@ -191,10 +136,10 @@ class DisposablePeer: # In case you cannot read_file hex: # 0x14 = 20 (BitTorrent ID indicating that it's an extended message) # 0x00 = 0 (Extension ID indicating that it's the handshake message) - self.__outgoing_buffer += b"%s\x14\x00%s" % ( + self._writer.write(b"%b\x14%s" % ( (2 + len(msg_dict_dump)).to_bytes(4, "big"), - msg_dict_dump - ) + b'\0' + msg_dict_dump + )) def __on_ext_handshake_message(self, message: bytes) -> None: if self.__ext_handshake_complete: @@ -284,29 +229,11 @@ class DisposablePeer: # In case you cannot read_file hex: # 0x14 = 20 (BitTorrent ID indicating that it's an extended message) # 0x03 = 3 (Extension ID indicating that it's an ut_metadata message) - self.__outgoing_buffer += b"%b\x14%s%s" % ( + self._writer.write(b"%b\x14%s%s" % ( (2 + len(msg_dict_dump)).to_bytes(4, "big"), self.__ut_metadata.to_bytes(1, "big"), msg_dict_dump - ) - - def shutdown(self) -> None: - if self.__shutdown: - return - try: - self.__socket.shutdown(socket.SHUT_RDWR) - except OSError: - # OSError might be raised in case the connection to the remote peer fails: nevertheless, when_error should - # be called, and the supervisor will try to shutdown the peer, and ta da: OSError! - pass - self.__socket.close() - self.__shutdown = True - - def would_send(self) -> bool: - return bool(len(self.__outgoing_buffer)) - - def fileno(self) -> int: - return self.__socket.fileno() + )) @staticmethod def __random_bytes(n: int) -> bytes: diff --git a/magneticod/magneticod/dht.py b/magneticod/magneticod/dht.py index 5e2f057..ad60836 100644 --- a/magneticod/magneticod/dht.py +++ b/magneticod/magneticod/dht.py @@ -13,15 +13,18 @@ # You should have received a copy of the GNU Affero General Public License along with this program. If not, see # . import array +import asyncio import collections +import itertools import zlib import logging import socket import typing import os -from .constants import BOOTSTRAPPING_NODES, DEFAULT_MAX_METADATA_SIZE +from .constants import BOOTSTRAPPING_NODES, DEFAULT_MAX_METADATA_SIZE, MAX_ACTIVE_PEERS_PER_INFO_HASH from . import bencode +from . import bittorrent NodeID = bytes NodeAddress = typing.Tuple[str, int] @@ -30,107 +33,77 @@ InfoHash = bytes class SybilNode: - def __init__(self, address: typing.Tuple[str, int]): + def __init__(self, address: typing.Tuple[str, int], complete_info_hashes, max_metadata_size): self.__true_id = self.__random_bytes(20) - self.__socket = socket.socket(type=socket.SOCK_DGRAM) - self.__socket.bind(address) - self.__socket.setblocking(False) + self.__address = address - self.__incoming_buffer = array.array("B", (0 for _ in range(65536))) - self.__outgoing_queue = collections.deque() - - self.__routing_table = {} # type: typing.Dict[NodeID, NodeAddress] + self._routing_table = {} # type: typing.Dict[NodeID, NodeAddress] self.__token_secret = self.__random_bytes(4) # Maximum number of neighbours (this is a THRESHOLD where, once reached, the search for new neighbours will # stop; but until then, the total number of neighbours might exceed the threshold). self.__n_max_neighbours = 2000 + self.__peers = collections.defaultdict( + list) # type: typing.DefaultDict[dht.InfoHash, typing.List[bittorrent.DisposablePeer]] + self._complete_info_hashes = complete_info_hashes + self.__max_metadata_size = max_metadata_size + self._metadata_q = asyncio.Queue() logging.info("SybilNode %s on %s initialized!", self.__true_id.hex().upper(), address) - @staticmethod - def when_peer_found(info_hash: InfoHash, peer_addr: PeerAddress) -> None: - raise NotImplementedError() + async def launch(self, loop): + self._loop = loop + await loop.create_datagram_endpoint(lambda: self, local_addr=self.__address) - def on_tick(self) -> None: - self.__bootstrap() - self.__make_neighbours() - self.__routing_table.clear() + def connection_made(self, transport): + self._loop.create_task(self.on_tick()) + self._loop.create_task(self.increase_neighbour_task()) + self._transport = transport - def on_receivable(self) -> None: - buffer = self.__incoming_buffer - while True: - try: - _, addr = self.__socket.recvfrom_into(buffer, 65536) - data = buffer.tobytes() - except BlockingIOError: - break - except ConnectionResetError: - continue - except ConnectionRefusedError: - continue - - # Ignore nodes that uses port 0 (assholes). - if addr[1] == 0: - continue - - try: - message = bencode.loads(data) - except bencode.BencodeDecodingError: - continue - - if isinstance(message.get(b"r"), dict) and type(message[b"r"].get(b"nodes")) is bytes: - self.__on_FIND_NODE_response(message) - elif message.get(b"q") == b"get_peers": - self.__on_GET_PEERS_query(message, addr) - elif message.get(b"q") == b"announce_peer": - self.__on_ANNOUNCE_PEER_query(message, addr) - - def on_sendable(self) -> None: - congestion = None - while True: - try: - addr, data = self.__outgoing_queue.pop() - except IndexError: - break - - try: - self.__socket.sendto(data, addr) - except BlockingIOError: - self.__outgoing_queue.appendleft((addr, data)) - break - except PermissionError: - # This exception (EPERM errno: 1) is kernel's way of saying that "you are far too fast, chill". - # It is also likely that we have received a ICMP source quench packet (meaning, that we really need to - # slow down. - # - # Read more here: http://www.archivum.info/comp.protocols.tcp-ip/2009-05/00088/UDP-socket-amp-amp-sendto - # -amp-amp-EPERM.html - congestion = True - break - except OSError: - # Pass in case of trying to send to port 0 (it is much faster to catch exceptions than using an - # if-statement). - pass - - if congestion: - self.__outgoing_queue.clear() + def error_received(self, exc): + logging.error("got error %s", exc) + if isinstance(exc, PermissionError): # In case of congestion, decrease the maximum number of nodes to the 90% of the current value. if self.__n_max_neighbours < 200: logging.warning("Maximum number of neighbours are now less than 200 due to congestion!") else: self.__n_max_neighbours = self.__n_max_neighbours * 9 // 10 - else: - # In case of the lack of congestion, increase the maximum number of nodes by 1%. + logging.debug("Maximum number of neighbours now %d", self.__n_max_neighbours) + + async def on_tick(self) -> None: + while True: + await asyncio.sleep(1) + self.__bootstrap() + self.__make_neighbours() + self._routing_table.clear() + + def datagram_received(self, data, addr) -> None: + # Ignore nodes that uses port 0 (assholes). + if addr[1] == 0: + return + + try: + message = bencode.loads(data) + except bencode.BencodeDecodingError: + return + + if isinstance(message.get(b"r"), dict) and type(message[b"r"].get(b"nodes")) is bytes: + self.__on_FIND_NODE_response(message) + elif message.get(b"q") == b"get_peers": + self.__on_GET_PEERS_query(message, addr) + elif message.get(b"q") == b"announce_peer": + self.__on_ANNOUNCE_PEER_query(message, addr) + + async def increase_neighbour_task(self): + while True: + await asyncio.sleep(10) self.__n_max_neighbours = self.__n_max_neighbours * 101 // 100 - def would_send(self) -> bool: - """ Whether node is waiting to write on its socket or not. """ - return bool(self.__outgoing_queue) - def shutdown(self) -> None: - self.__socket.close() + for peer in itertools.chain.from_iterable(self.__peers.values()): + peer.close() + self._transport.close() def __on_FIND_NODE_response(self, message: bencode.KRPCDict) -> None: try: @@ -145,8 +118,8 @@ class SybilNode: return # Add new found nodes to the routing table, assuring that we have no more than n_max_neighbours in total. - if len(self.__routing_table) < self.__n_max_neighbours: - self.__routing_table.update(nodes) + if len(self._routing_table) < self.__n_max_neighbours: + self._routing_table.update(nodes) def __on_GET_PEERS_query(self, message: bencode.KRPCDict, addr: NodeAddress) -> None: try: @@ -157,8 +130,7 @@ class SybilNode: except (TypeError, KeyError, AssertionError): return - # appendleft to prioritise GET_PEERS responses as they are the most fruitful ones! - self.__outgoing_queue.appendleft((addr, bencode.dumps({ + data = bencode.dumps({ b"y": b"r", b"t": transaction_id, b"r": { @@ -166,7 +138,10 @@ class SybilNode: b"nodes": b"", b"token": self.__calculate_token(addr, info_hash) } - }))) + }) + # we want to prioritise GET_PEERS responses as they are the most fruitful ones! + # but there is no easy way to do this with asyncio + self._transport.sendto(data, addr) def __on_ANNOUNCE_PEER_query(self, message: bencode.KRPCDict, addr: NodeAddress) -> None: try: @@ -189,31 +164,45 @@ class SybilNode: except (TypeError, KeyError, AssertionError): return - self.__outgoing_queue.append((addr, bencode.dumps({ + data = bencode.dumps({ b"y": b"r", b"t": transaction_id, b"r": { b"id": node_id[:15] + self.__true_id[:5] } - }))) + }) + self._transport.sendto(data, addr) if implied_port: peer_addr = (addr[0], addr[1]) else: peer_addr = (addr[0], port) - self.when_peer_found(info_hash, peer_addr) + if len(self.__peers[info_hash]) > MAX_ACTIVE_PEERS_PER_INFO_HASH or \ + info_hash in self._complete_info_hashes: + return - def fileno(self) -> int: - return self.__socket.fileno() + peer = bittorrent.get_torrent_data(info_hash, peer_addr, self.__max_metadata_size) + self.__peers[info_hash].append(peer) + self._loop.create_task(peer).add_done_callback(self.metadata_found) + + def metadata_found(self, future): + r = future.result() + if r: + info_hash, metadata = r + for peer in self.__peers[info_hash]: + peer.close() + self._metadata_q.put_nowait(r) + self._complete_info_hashes.add(info_hash) def __bootstrap(self) -> None: for addr in BOOTSTRAPPING_NODES: - self.__outgoing_queue.append((addr, self.__build_FIND_NODE_query(self.__true_id))) + data = self.__build_FIND_NODE_query(self.__true_id) + self._transport.sendto(data, addr) def __make_neighbours(self) -> None: - for node_id, addr in self.__routing_table.items(): - self.__outgoing_queue.append((addr, self.__build_FIND_NODE_query(node_id[:15] + self.__true_id[:5]))) + for node_id, addr in self._routing_table.items(): + self._transport.sendto(self.__build_FIND_NODE_query(node_id[:15] + self.__true_id[:5]), addr) @staticmethod def __decode_nodes(infos: bytes) -> typing.List[typing.Tuple[NodeID, NodeAddress]]: From eb8a2c72e680d387813e84ed0080d3afeb5b4037 Mon Sep 17 00:00:00 2001 From: Richard Kiss Date: Sat, 13 May 2017 22:52:43 -0700 Subject: [PATCH 02/20] Fix conditional. --- magneticod/magneticod/dht.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/magneticod/magneticod/dht.py b/magneticod/magneticod/dht.py index ad60836..07053c6 100644 --- a/magneticod/magneticod/dht.py +++ b/magneticod/magneticod/dht.py @@ -192,8 +192,8 @@ class SybilNode: info_hash, metadata = r for peer in self.__peers[info_hash]: peer.close() - self._metadata_q.put_nowait(r) - self._complete_info_hashes.add(info_hash) + self._metadata_q.put_nowait(r) + self._complete_info_hashes.add(info_hash) def __bootstrap(self) -> None: for addr in BOOTSTRAPPING_NODES: From 2f68ac3c7a646c51be3f304d12a30f66799e1b17 Mon Sep 17 00:00:00 2001 From: Richard Kiss Date: Sat, 13 May 2017 22:55:49 -0700 Subject: [PATCH 03/20] Improve naming. --- magneticod/magneticod/bittorrent.py | 2 +- magneticod/magneticod/dht.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/magneticod/magneticod/bittorrent.py b/magneticod/magneticod/bittorrent.py index 6f3c98d..92efc19 100644 --- a/magneticod/magneticod/bittorrent.py +++ b/magneticod/magneticod/bittorrent.py @@ -26,7 +26,7 @@ InfoHash = bytes PeerAddress = typing.Tuple[str, int] -async def get_torrent_data(info_hash: InfoHash, peer_addr: PeerAddress, max_metadata_size): +async def fetch_metadata(info_hash: InfoHash, peer_addr: PeerAddress, max_metadata_size): loop = asyncio.get_event_loop() peer = DisposablePeer(info_hash, peer_addr, max_metadata_size) r = await peer.launch(loop) diff --git a/magneticod/magneticod/dht.py b/magneticod/magneticod/dht.py index 07053c6..dae64ac 100644 --- a/magneticod/magneticod/dht.py +++ b/magneticod/magneticod/dht.py @@ -182,7 +182,7 @@ class SybilNode: info_hash in self._complete_info_hashes: return - peer = bittorrent.get_torrent_data(info_hash, peer_addr, self.__max_metadata_size) + peer = bittorrent.fetch_metadata(info_hash, peer_addr, self.__max_metadata_size) self.__peers[info_hash].append(peer) self._loop.create_task(peer).add_done_callback(self.metadata_found) From 4aea5df886d285249c8ac664678cd98704cfb8f6 Mon Sep 17 00:00:00 2001 From: Richard Kiss Date: Sun, 14 May 2017 11:34:38 -0700 Subject: [PATCH 04/20] Reduce leakage in bittorrent.py. --- magneticod/magneticod/bittorrent.py | 84 +++++++++++++++-------------- 1 file changed, 45 insertions(+), 39 deletions(-) diff --git a/magneticod/magneticod/bittorrent.py b/magneticod/magneticod/bittorrent.py index 92efc19..c4d9eb5 100644 --- a/magneticod/magneticod/bittorrent.py +++ b/magneticod/magneticod/bittorrent.py @@ -34,7 +34,8 @@ async def fetch_metadata(info_hash: InfoHash, peer_addr: PeerAddress, max_metada class DisposablePeer: - def __init__(self, info_hash: InfoHash, peer_addr: PeerAddress, max_metadata_size: int= DEFAULT_MAX_METADATA_SIZE): + def __init__(self, info_hash: InfoHash, peer_addr: PeerAddress, + max_metadata_size: int=DEFAULT_MAX_METADATA_SIZE): self.__peer_addr = peer_addr self.__info_hash = info_hash @@ -47,56 +48,60 @@ class DisposablePeer: self.__metadata_size = None self.__metadata_received = 0 # Amount of metadata bytes received... self.__metadata = None + self._writer = None - # After 120 ticks passed, a peer should report an error and shut itself down due to being stall. - self.__ticks_passed = 0 - - async def launch(self, loop): + def launch(self, loop): self._loop = loop self._metadata_future = self._loop.create_future() + self._loop.create_task(self.run()) + return self._metadata_future - self._reader, self._writer = await asyncio.open_connection( - self.__peer_addr[0], self.__peer_addr[1], loop=loop) - - # Send the BitTorrent handshake message (0x13 = 19 in decimal, the length of the handshake message) - self._writer.write(b"\x13BitTorrent protocol%s%s%s" % ( - b"\x00\x00\x00\x00\x00\x10\x00\x01", - self.__info_hash, - self.__random_bytes(20) - )) - # Honestly speaking, BitTorrent protocol might be one of the most poorly documented and (not the most but) badly - # designed protocols I have ever seen (I am 19 years old so what I could have seen?). - # - # Anyway, all the messages EXCEPT the handshake are length-prefixed by 4 bytes in network order, BUT the - # size of the handshake message is the 1-byte length prefix + 49 bytes, but luckily, there is only one canonical - # way of handshaking in the wild. - message = await self._reader.readexactly(68) - if message[1:20] != b"BitTorrent protocol": - # Erroneous handshake, possibly unknown version... - logging.debug("Erroneous BitTorrent handshake! %s", message) - self.close() - return - - self.__on_bt_handshake(message) + async def timeout(self): + # After 12 seconds passed, a peer should report an error and shut itself down due to being stall. + await asyncio.sleep(12) + self.close() + async def run(self): try: + self._reader, self._writer = await asyncio.open_connection( + self.__peer_addr[0], self.__peer_addr[1], loop=self._loop) + self._loop.create_task(self.timeout()) + + # Send the BitTorrent handshake message (0x13 = 19 in decimal, the length of the handshake message) + self._writer.write(b"\x13BitTorrent protocol%s%s%s" % ( + b"\x00\x00\x00\x00\x00\x10\x00\x01", + self.__info_hash, + self.__random_bytes(20) + )) + # Honestly speaking, BitTorrent protocol might be one of the most poorly documented and (not the most but) badly + # designed protocols I have ever seen (I am 19 years old so what I could have seen?). + # + # Anyway, all the messages EXCEPT the handshake are length-prefixed by 4 bytes in network order, BUT the + # size of the handshake message is the 1-byte length prefix + 49 bytes, but luckily, there is only one canonical + # way of handshaking in the wild. + message = await self._reader.readexactly(68) + if message[1:20] != b"BitTorrent protocol": + # Erroneous handshake, possibly unknown version... + logging.debug("Erroneous BitTorrent handshake! %s", message) + self.close() + return + + self.__on_bt_handshake(message) + while not self._metadata_future.done(): buffer = await self._reader.readexactly(4) length = int.from_bytes(buffer, "big") message = await self._reader.readexactly(length) self.__on_message(message) except Exception as ex: - self.close() - return await self._metadata_future - - def when_metadata_found(self, info_hash: InfoHash, metadata: bytes) -> None: - self._metadata_future.set_result((info_hash, metadata)) + pass self.close() def close(self): - self._writer.close() if not self._metadata_future.done(): self._metadata_future.set_result(None) + if self._writer: + self._writer.close() def __on_message(self, message: bytes) -> None: length = len(message) @@ -163,11 +168,11 @@ class DisposablePeer: self.__peer_addr[1], self.__max_metadata_size) except KeyError: - self.when_error() + self.close() return except AssertionError as e: logging.debug(str(e)) - self.when_error() + self.close() return self.__ut_metadata = ut_metadata @@ -175,7 +180,7 @@ class DisposablePeer: self.__metadata = bytearray(metadata_size) except MemoryError: logging.exception("Could not allocate %.1f KiB for the metadata!", metadata_size / 1024) - self.when_error() + self.close() return self.__metadata_size = metadata_size @@ -213,13 +218,14 @@ class DisposablePeer: if self.__metadata_received == self.__metadata_size: if hashlib.sha1(self.__metadata).digest() == self.__info_hash: - self.when_metadata_found(self.__info_hash, bytes(self.__metadata)) + self._metadata_future.set_result((self.__info_hash, bytes(self.__metadata))) + self.close() else: logging.debug("Invalid Metadata! Ignoring.") elif msg_type == 2: # reject logging.info("Peer rejected us.") - self.when_error() + self.close() def __request_metadata_piece(self, piece: int) -> None: msg_dict_dump = bencode.dumps({ From f3ae493308569cb0e2147719bf69e0e53a234232 Mon Sep 17 00:00:00 2001 From: Richard Kiss Date: Sun, 14 May 2017 12:03:50 -0700 Subject: [PATCH 05/20] More clean-up in error cases. --- magneticod/magneticod/bittorrent.py | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/magneticod/magneticod/bittorrent.py b/magneticod/magneticod/bittorrent.py index c4d9eb5..e0fe2b2 100644 --- a/magneticod/magneticod/bittorrent.py +++ b/magneticod/magneticod/bittorrent.py @@ -49,11 +49,13 @@ class DisposablePeer: self.__metadata_received = 0 # Amount of metadata bytes received... self.__metadata = None self._writer = None + self._run_task = None def launch(self, loop): self._loop = loop self._metadata_future = self._loop.create_future() - self._loop.create_task(self.run()) + self._run_task = self._loop.create_task(self.run()) + self._loop.create_task(self.timeout()) return self._metadata_future async def timeout(self): @@ -65,8 +67,6 @@ class DisposablePeer: try: self._reader, self._writer = await asyncio.open_connection( self.__peer_addr[0], self.__peer_addr[1], loop=self._loop) - self._loop.create_task(self.timeout()) - # Send the BitTorrent handshake message (0x13 = 19 in decimal, the length of the handshake message) self._writer.write(b"\x13BitTorrent protocol%s%s%s" % ( b"\x00\x00\x00\x00\x00\x10\x00\x01", @@ -102,6 +102,8 @@ class DisposablePeer: self._metadata_future.set_result(None) if self._writer: self._writer.close() + if self._run_task: + self._run_task.cancel() def __on_message(self, message: bytes) -> None: length = len(message) @@ -218,8 +220,9 @@ class DisposablePeer: if self.__metadata_received == self.__metadata_size: if hashlib.sha1(self.__metadata).digest() == self.__info_hash: - self._metadata_future.set_result((self.__info_hash, bytes(self.__metadata))) - self.close() + if not self._metadata_future.done(): + self._metadata_future.set_result((self.__info_hash, bytes(self.__metadata))) + self.close() else: logging.debug("Invalid Metadata! Ignoring.") From 635fbe8cb154954233b6f2f841e76be922166c60 Mon Sep 17 00:00:00 2001 From: Richard Kiss Date: Sun, 14 May 2017 12:47:33 -0700 Subject: [PATCH 06/20] More clean-up and simplification. --- magneticod/magneticod/bittorrent.py | 58 +++++++++-------------------- magneticod/magneticod/constants.py | 2 + magneticod/magneticod/dht.py | 27 ++++++++------ 3 files changed, 35 insertions(+), 52 deletions(-) diff --git a/magneticod/magneticod/bittorrent.py b/magneticod/magneticod/bittorrent.py index e0fe2b2..75e0c56 100644 --- a/magneticod/magneticod/bittorrent.py +++ b/magneticod/magneticod/bittorrent.py @@ -27,15 +27,17 @@ PeerAddress = typing.Tuple[str, int] async def fetch_metadata(info_hash: InfoHash, peer_addr: PeerAddress, max_metadata_size): - loop = asyncio.get_event_loop() - peer = DisposablePeer(info_hash, peer_addr, max_metadata_size) - r = await peer.launch(loop) - return r + return await DisposablePeer().run( + asyncio.get_event_loop(), info_hash, peer_addr, max_metadata_size) + + +class ProtocolError(Exception): + pass class DisposablePeer: - def __init__(self, info_hash: InfoHash, peer_addr: PeerAddress, - max_metadata_size: int=DEFAULT_MAX_METADATA_SIZE): + async def run(self, loop, info_hash: InfoHash, peer_addr: PeerAddress, + max_metadata_size: int=DEFAULT_MAX_METADATA_SIZE): self.__peer_addr = peer_addr self.__info_hash = info_hash @@ -48,25 +50,14 @@ class DisposablePeer: self.__metadata_size = None self.__metadata_received = 0 # Amount of metadata bytes received... self.__metadata = None - self._writer = None self._run_task = None - def launch(self, loop): - self._loop = loop - self._metadata_future = self._loop.create_future() - self._run_task = self._loop.create_task(self.run()) - self._loop.create_task(self.timeout()) - return self._metadata_future + self._metadata_future = loop.create_future() + self._writer = None - async def timeout(self): - # After 12 seconds passed, a peer should report an error and shut itself down due to being stall. - await asyncio.sleep(12) - self.close() - - async def run(self): try: self._reader, self._writer = await asyncio.open_connection( - self.__peer_addr[0], self.__peer_addr[1], loop=self._loop) + self.__peer_addr[0], self.__peer_addr[1], loop=loop) # Send the BitTorrent handshake message (0x13 = 19 in decimal, the length of the handshake message) self._writer.write(b"\x13BitTorrent protocol%s%s%s" % ( b"\x00\x00\x00\x00\x00\x10\x00\x01", @@ -82,9 +73,7 @@ class DisposablePeer: message = await self._reader.readexactly(68) if message[1:20] != b"BitTorrent protocol": # Erroneous handshake, possibly unknown version... - logging.debug("Erroneous BitTorrent handshake! %s", message) - self.close() - return + raise ProtocolError("Erroneous BitTorrent handshake! %s" % message) self.__on_bt_handshake(message) @@ -94,16 +83,12 @@ class DisposablePeer: message = await self._reader.readexactly(length) self.__on_message(message) except Exception as ex: - pass - self.close() - - def close(self): - if not self._metadata_future.done(): - self._metadata_future.set_result(None) + logging.info("closing %s to %s", self.__info_hash.hex(), self.__peer_addr) + if not self._metadata_future.done(): + self._metadata_future.set_result(None) if self._writer: self._writer.close() - if self._run_task: - self._run_task.cancel() + return self._metadata_future.result() def __on_message(self, message: bytes) -> None: length = len(message) @@ -169,21 +154,16 @@ class DisposablePeer: " {} max metadata size".format(self.__peer_addr[0], self.__peer_addr[1], self.__max_metadata_size) - except KeyError: - self.close() - return except AssertionError as e: logging.debug(str(e)) - self.close() - return + raise self.__ut_metadata = ut_metadata try: self.__metadata = bytearray(metadata_size) except MemoryError: logging.exception("Could not allocate %.1f KiB for the metadata!", metadata_size / 1024) - self.close() - return + raise self.__metadata_size = metadata_size self.__ext_handshake_complete = True @@ -222,13 +202,11 @@ class DisposablePeer: if hashlib.sha1(self.__metadata).digest() == self.__info_hash: if not self._metadata_future.done(): self._metadata_future.set_result((self.__info_hash, bytes(self.__metadata))) - self.close() else: logging.debug("Invalid Metadata! Ignoring.") elif msg_type == 2: # reject logging.info("Peer rejected us.") - self.close() def __request_metadata_piece(self, piece: int) -> None: msg_dict_dump = bencode.dumps({ diff --git a/magneticod/magneticod/constants.py b/magneticod/magneticod/constants.py index bc2c954..b99ebff 100644 --- a/magneticod/magneticod/constants.py +++ b/magneticod/magneticod/constants.py @@ -9,3 +9,5 @@ PENDING_INFO_HASHES = 10 # threshold for pending info hashes before being commi TICK_INTERVAL = 1 # in seconds (soft constraint) # maximum (inclusive) number of active (disposable) peers to fetch the metadata per info hash at the same time: MAX_ACTIVE_PEERS_PER_INFO_HASH = 5 + +PEER_TIMEOUT=12 # seconds diff --git a/magneticod/magneticod/dht.py b/magneticod/magneticod/dht.py index dae64ac..be4f045 100644 --- a/magneticod/magneticod/dht.py +++ b/magneticod/magneticod/dht.py @@ -22,7 +22,7 @@ import socket import typing import os -from .constants import BOOTSTRAPPING_NODES, DEFAULT_MAX_METADATA_SIZE, MAX_ACTIVE_PEERS_PER_INFO_HASH +from .constants import BOOTSTRAPPING_NODES, DEFAULT_MAX_METADATA_SIZE, MAX_ACTIVE_PEERS_PER_INFO_HASH, PEER_TIMEOUT from . import bencode from . import bittorrent @@ -102,7 +102,7 @@ class SybilNode: def shutdown(self) -> None: for peer in itertools.chain.from_iterable(self.__peers.values()): - peer.close() + peer.cancel() self._transport.close() def __on_FIND_NODE_response(self, message: bencode.KRPCDict) -> None: @@ -182,18 +182,21 @@ class SybilNode: info_hash in self._complete_info_hashes: return - peer = bittorrent.fetch_metadata(info_hash, peer_addr, self.__max_metadata_size) + peer = self._loop.create_task(self._launch_fetch(info_hash, peer_addr)) self.__peers[info_hash].append(peer) - self._loop.create_task(peer).add_done_callback(self.metadata_found) - def metadata_found(self, future): - r = future.result() - if r: - info_hash, metadata = r - for peer in self.__peers[info_hash]: - peer.close() - self._metadata_q.put_nowait(r) - self._complete_info_hashes.add(info_hash) + async def _launch_fetch(self, info_hash, peer_addr): + try: + f = bittorrent.fetch_metadata(info_hash, peer_addr, self.__max_metadata_size) + r = await asyncio.wait_for(f, timeout=PEER_TIMEOUT) + if r: + info_hash, metadata = r + for peer in self.__peers[info_hash]: + peer.cancel() + self._complete_info_hashes.add(info_hash) + await self._metadata_q.put(r) + except asyncio.TimeoutError: + pass def __bootstrap(self) -> None: for addr in BOOTSTRAPPING_NODES: From 4515fa8b0aded0781c9ede19aaf3fcbf8aec2ccf Mon Sep 17 00:00:00 2001 From: Richard Kiss Date: Sun, 14 May 2017 13:37:22 -0700 Subject: [PATCH 07/20] More cleanup. Only hit bootstrap if it seems necessary. --- magneticod/magneticod/__main__.py | 3 +-- magneticod/magneticod/bittorrent.py | 4 +--- magneticod/magneticod/constants.py | 3 +-- magneticod/magneticod/dht.py | 22 +++++++++++++++------- 4 files changed, 18 insertions(+), 14 deletions(-) diff --git a/magneticod/magneticod/__main__.py b/magneticod/magneticod/__main__.py index a3729c5..8295536 100644 --- a/magneticod/magneticod/__main__.py +++ b/magneticod/magneticod/__main__.py @@ -57,10 +57,9 @@ def main(): loop.run_forever() except KeyboardInterrupt: logging.critical("Keyboard interrupt received! Exiting gracefully...") - pass finally: database.close() - node.shutdown() + loop.run_until_complete(node.shutdown()) return 0 diff --git a/magneticod/magneticod/bittorrent.py b/magneticod/magneticod/bittorrent.py index 75e0c56..dce6336 100644 --- a/magneticod/magneticod/bittorrent.py +++ b/magneticod/magneticod/bittorrent.py @@ -20,7 +20,6 @@ import typing import os from . import bencode -from .constants import DEFAULT_MAX_METADATA_SIZE InfoHash = bytes PeerAddress = typing.Tuple[str, int] @@ -36,8 +35,7 @@ class ProtocolError(Exception): class DisposablePeer: - async def run(self, loop, info_hash: InfoHash, peer_addr: PeerAddress, - max_metadata_size: int=DEFAULT_MAX_METADATA_SIZE): + async def run(self, loop, info_hash: InfoHash, peer_addr: PeerAddress, max_metadata_size: int): self.__peer_addr = peer_addr self.__info_hash = info_hash diff --git a/magneticod/magneticod/constants.py b/magneticod/magneticod/constants.py index b99ebff..5f2b1b2 100644 --- a/magneticod/magneticod/constants.py +++ b/magneticod/magneticod/constants.py @@ -6,8 +6,7 @@ BOOTSTRAPPING_NODES = [ ] PENDING_INFO_HASHES = 10 # threshold for pending info hashes before being committed to database: -TICK_INTERVAL = 1 # in seconds (soft constraint) # maximum (inclusive) number of active (disposable) peers to fetch the metadata per info hash at the same time: MAX_ACTIVE_PEERS_PER_INFO_HASH = 5 -PEER_TIMEOUT=12 # seconds +PEER_TIMEOUT=120 # seconds diff --git a/magneticod/magneticod/dht.py b/magneticod/magneticod/dht.py index be4f045..79f64ac 100644 --- a/magneticod/magneticod/dht.py +++ b/magneticod/magneticod/dht.py @@ -12,7 +12,6 @@ # # You should have received a copy of the GNU Affero General Public License along with this program. If not, see # . -import array import asyncio import collections import itertools @@ -22,7 +21,7 @@ import socket import typing import os -from .constants import BOOTSTRAPPING_NODES, DEFAULT_MAX_METADATA_SIZE, MAX_ACTIVE_PEERS_PER_INFO_HASH, PEER_TIMEOUT +from .constants import BOOTSTRAPPING_NODES, MAX_ACTIVE_PEERS_PER_INFO_HASH, PEER_TIMEOUT from . import bencode from . import bittorrent @@ -49,6 +48,7 @@ class SybilNode: self._complete_info_hashes = complete_info_hashes self.__max_metadata_size = max_metadata_size self._metadata_q = asyncio.Queue() + self._tasks = [] logging.info("SybilNode %s on %s initialized!", self.__true_id.hex().upper(), address) @@ -57,8 +57,8 @@ class SybilNode: await loop.create_datagram_endpoint(lambda: self, local_addr=self.__address) def connection_made(self, transport): - self._loop.create_task(self.on_tick()) - self._loop.create_task(self.increase_neighbour_task()) + self._tasks.append(self._loop.create_task(self.on_tick())) + self._tasks.append(self._loop.create_task(self.increase_neighbour_task())) self._transport = transport def error_received(self, exc): @@ -74,7 +74,8 @@ class SybilNode: async def on_tick(self) -> None: while True: await asyncio.sleep(1) - self.__bootstrap() + if len(self._routing_table) == 0: + self.__bootstrap() self.__make_neighbours() self._routing_table.clear() @@ -100,9 +101,16 @@ class SybilNode: await asyncio.sleep(10) self.__n_max_neighbours = self.__n_max_neighbours * 101 // 100 - def shutdown(self) -> None: - for peer in itertools.chain.from_iterable(self.__peers.values()): + async def shutdown(self) -> None: + peers = [peer for peer in itertools.chain.from_iterable(self.__peers.values())] + for peer in peers: peer.cancel() + for peer in peers: + await peer + for task in self._tasks: + task.cancel() + for task in self._tasks: + await task self._transport.close() def __on_FIND_NODE_response(self, message: bencode.KRPCDict) -> None: From d04634b57bbe7331abf9472e8e2d0b96de321083 Mon Sep 17 00:00:00 2001 From: Richard Kiss Date: Sun, 14 May 2017 13:43:15 -0700 Subject: [PATCH 08/20] Improve dht shutdown. Notice writing pauses. --- magneticod/magneticod/bittorrent.py | 7 +++++++ magneticod/magneticod/dht.py | 3 +++ 2 files changed, 10 insertions(+) diff --git a/magneticod/magneticod/bittorrent.py b/magneticod/magneticod/bittorrent.py index dce6336..f5784af 100644 --- a/magneticod/magneticod/bittorrent.py +++ b/magneticod/magneticod/bittorrent.py @@ -52,6 +52,7 @@ class DisposablePeer: self._metadata_future = loop.create_future() self._writer = None + self._is_paused = False try: self._reader, self._writer = await asyncio.open_connection( @@ -88,6 +89,12 @@ class DisposablePeer: self._writer.close() return self._metadata_future.result() + def pause_writing(self): + self._is_paused = True + + def resume_writing(self): + self._is_paused = False + def __on_message(self, message: bytes) -> None: length = len(message) diff --git a/magneticod/magneticod/dht.py b/magneticod/magneticod/dht.py index 79f64ac..bdd1f54 100644 --- a/magneticod/magneticod/dht.py +++ b/magneticod/magneticod/dht.py @@ -84,6 +84,9 @@ class SybilNode: if addr[1] == 0: return + if self._transport.is_closing(): + return + try: message = bencode.loads(data) except bencode.BencodeDecodingError: From 73d97d8188f723560953a3af25c25d1a7096b4b3 Mon Sep 17 00:00:00 2001 From: Richard Kiss Date: Sun, 14 May 2017 13:55:19 -0700 Subject: [PATCH 09/20] Better cancel all outstanding tasks. --- magneticod/magneticod/__main__.py | 4 +++- magneticod/magneticod/dht.py | 14 +++++--------- 2 files changed, 8 insertions(+), 10 deletions(-) diff --git a/magneticod/magneticod/__main__.py b/magneticod/magneticod/__main__.py index 8295536..0d412ae 100644 --- a/magneticod/magneticod/__main__.py +++ b/magneticod/magneticod/__main__.py @@ -51,7 +51,7 @@ def main(): node = dht.SybilNode(arguments.node_addr, complete_info_hashes, arguments.max_metadata_size) loop.run_until_complete(node.launch(loop)) - loop.create_task(watch_q(database, node._metadata_q)) + watch_q_task = loop.create_task(watch_q(database, node._metadata_q)) try: loop.run_forever() @@ -59,7 +59,9 @@ def main(): logging.critical("Keyboard interrupt received! Exiting gracefully...") finally: database.close() + watch_q_task.cancel() loop.run_until_complete(node.shutdown()) + loop.run_until_complete(watch_q_task) return 0 diff --git a/magneticod/magneticod/dht.py b/magneticod/magneticod/dht.py index bdd1f54..a39ef4d 100644 --- a/magneticod/magneticod/dht.py +++ b/magneticod/magneticod/dht.py @@ -105,15 +105,11 @@ class SybilNode: self.__n_max_neighbours = self.__n_max_neighbours * 101 // 100 async def shutdown(self) -> None: - peers = [peer for peer in itertools.chain.from_iterable(self.__peers.values())] - for peer in peers: - peer.cancel() - for peer in peers: - await peer - for task in self._tasks: - task.cancel() - for task in self._tasks: - await task + futures = [peer for peer in itertools.chain.from_iterable(self.__peers.values())] + futures.extend(self._tasks) + for future in futures: + future.cancel() + await asyncio.wait(futures) self._transport.close() def __on_FIND_NODE_response(self, message: bencode.KRPCDict) -> None: From 4b4c312fbed4f5c4561cc046cd69c66955629941 Mon Sep 17 00:00:00 2001 From: Richard Kiss Date: Sun, 14 May 2017 13:58:21 -0700 Subject: [PATCH 10/20] SybilNode now support pause_writing. --- magneticod/magneticod/bittorrent.py | 7 ------- magneticod/magneticod/dht.py | 20 ++++++++++++++++---- 2 files changed, 16 insertions(+), 11 deletions(-) diff --git a/magneticod/magneticod/bittorrent.py b/magneticod/magneticod/bittorrent.py index f5784af..dce6336 100644 --- a/magneticod/magneticod/bittorrent.py +++ b/magneticod/magneticod/bittorrent.py @@ -52,7 +52,6 @@ class DisposablePeer: self._metadata_future = loop.create_future() self._writer = None - self._is_paused = False try: self._reader, self._writer = await asyncio.open_connection( @@ -89,12 +88,6 @@ class DisposablePeer: self._writer.close() return self._metadata_future.result() - def pause_writing(self): - self._is_paused = True - - def resume_writing(self): - self._is_paused = False - def __on_message(self, message: bytes) -> None: length = len(message) diff --git a/magneticod/magneticod/dht.py b/magneticod/magneticod/dht.py index a39ef4d..91cbe4a 100644 --- a/magneticod/magneticod/dht.py +++ b/magneticod/magneticod/dht.py @@ -49,6 +49,7 @@ class SybilNode: self.__max_metadata_size = max_metadata_size self._metadata_q = asyncio.Queue() self._tasks = [] + self._is_paused = False logging.info("SybilNode %s on %s initialized!", self.__true_id.hex().upper(), address) @@ -61,6 +62,17 @@ class SybilNode: self._tasks.append(self._loop.create_task(self.increase_neighbour_task())) self._transport = transport + def pause_writing(self): + self._is_paused = True + + def resume_writing(self): + self._is_paused = False + + def sendto(self, data, addr): + if self._is_paused: + return + self._transport.sendto(data, addr) + def error_received(self, exc): logging.error("got error %s", exc) if isinstance(exc, PermissionError): @@ -148,7 +160,7 @@ class SybilNode: }) # we want to prioritise GET_PEERS responses as they are the most fruitful ones! # but there is no easy way to do this with asyncio - self._transport.sendto(data, addr) + self.sendto(data, addr) def __on_ANNOUNCE_PEER_query(self, message: bencode.KRPCDict, addr: NodeAddress) -> None: try: @@ -178,7 +190,7 @@ class SybilNode: b"id": node_id[:15] + self.__true_id[:5] } }) - self._transport.sendto(data, addr) + self.sendto(data, addr) if implied_port: peer_addr = (addr[0], addr[1]) @@ -208,11 +220,11 @@ class SybilNode: def __bootstrap(self) -> None: for addr in BOOTSTRAPPING_NODES: data = self.__build_FIND_NODE_query(self.__true_id) - self._transport.sendto(data, addr) + self.sendto(data, addr) def __make_neighbours(self) -> None: for node_id, addr in self._routing_table.items(): - self._transport.sendto(self.__build_FIND_NODE_query(node_id[:15] + self.__true_id[:5]), addr) + self.sendto(self.__build_FIND_NODE_query(node_id[:15] + self.__true_id[:5]), addr) @staticmethod def __decode_nodes(infos: bytes) -> typing.List[typing.Tuple[NodeID, NodeAddress]]: From f38a796181bb676b7091a473f7596611a549f783 Mon Sep 17 00:00:00 2001 From: Richard Kiss Date: Sun, 14 May 2017 14:04:46 -0700 Subject: [PATCH 11/20] Add connection_lost. Properly handle shutdown of watch_q. --- magneticod/magneticod/__main__.py | 2 +- magneticod/magneticod/dht.py | 3 +++ 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/magneticod/magneticod/__main__.py b/magneticod/magneticod/__main__.py index 0d412ae..bd075c9 100644 --- a/magneticod/magneticod/__main__.py +++ b/magneticod/magneticod/__main__.py @@ -61,7 +61,7 @@ def main(): database.close() watch_q_task.cancel() loop.run_until_complete(node.shutdown()) - loop.run_until_complete(watch_q_task) + loop.run_until_complete(asyncio.wait([watch_q_task])) return 0 diff --git a/magneticod/magneticod/dht.py b/magneticod/magneticod/dht.py index 91cbe4a..6b85446 100644 --- a/magneticod/magneticod/dht.py +++ b/magneticod/magneticod/dht.py @@ -62,6 +62,9 @@ class SybilNode: self._tasks.append(self._loop.create_task(self.increase_neighbour_task())) self._transport = transport + def connection_lost(self, exc): + self._is_paused = True + def pause_writing(self): self._is_paused = True From 71f55f0c919ca509645b34726fbc16ff52645979 Mon Sep 17 00:00:00 2001 From: Richard Kiss Date: Sun, 14 May 2017 14:13:34 -0700 Subject: [PATCH 12/20] Reduce noise. --- magneticod/magneticod/bittorrent.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/magneticod/magneticod/bittorrent.py b/magneticod/magneticod/bittorrent.py index dce6336..7a9a60a 100644 --- a/magneticod/magneticod/bittorrent.py +++ b/magneticod/magneticod/bittorrent.py @@ -81,7 +81,7 @@ class DisposablePeer: message = await self._reader.readexactly(length) self.__on_message(message) except Exception as ex: - logging.info("closing %s to %s", self.__info_hash.hex(), self.__peer_addr) + logging.debug("closing %s to %s", self.__info_hash.hex(), self.__peer_addr) if not self._metadata_future.done(): self._metadata_future.set_result(None) if self._writer: From 35c617654887c7883204facffa6421afb55bcd50 Mon Sep 17 00:00:00 2001 From: Richard Kiss Date: Sun, 14 May 2017 23:01:14 -0700 Subject: [PATCH 13/20] Move neighbour task work to tick task. --- magneticod/magneticod/dht.py | 15 ++++++--------- 1 file changed, 6 insertions(+), 9 deletions(-) diff --git a/magneticod/magneticod/dht.py b/magneticod/magneticod/dht.py index 6b85446..7303f29 100644 --- a/magneticod/magneticod/dht.py +++ b/magneticod/magneticod/dht.py @@ -48,8 +48,8 @@ class SybilNode: self._complete_info_hashes = complete_info_hashes self.__max_metadata_size = max_metadata_size self._metadata_q = asyncio.Queue() - self._tasks = [] self._is_paused = False + self._tick_task = None logging.info("SybilNode %s on %s initialized!", self.__true_id.hex().upper(), address) @@ -58,8 +58,7 @@ class SybilNode: await loop.create_datagram_endpoint(lambda: self, local_addr=self.__address) def connection_made(self, transport): - self._tasks.append(self._loop.create_task(self.on_tick())) - self._tasks.append(self._loop.create_task(self.increase_neighbour_task())) + self._tick_task = self._loop.create_task(self.on_tick()) self._transport = transport def connection_lost(self, exc): @@ -93,6 +92,8 @@ class SybilNode: self.__bootstrap() self.__make_neighbours() self._routing_table.clear() + if not self._is_paused: + self.__n_max_neighbours = self.__n_max_neighbours * 101 // 100 def datagram_received(self, data, addr) -> None: # Ignore nodes that uses port 0 (assholes). @@ -114,14 +115,10 @@ class SybilNode: elif message.get(b"q") == b"announce_peer": self.__on_ANNOUNCE_PEER_query(message, addr) - async def increase_neighbour_task(self): - while True: - await asyncio.sleep(10) - self.__n_max_neighbours = self.__n_max_neighbours * 101 // 100 - async def shutdown(self) -> None: futures = [peer for peer in itertools.chain.from_iterable(self.__peers.values())] - futures.extend(self._tasks) + if self._tick_task: + futures.append(self._tick_task) for future in futures: future.cancel() await asyncio.wait(futures) From e6098ffb4a7f3016af7f30250107c69eefa3b879 Mon Sep 17 00:00:00 2001 From: Richard Kiss Date: Mon, 15 May 2017 15:44:36 -0700 Subject: [PATCH 14/20] Use uvloop if available. --- magneticod/magneticod/__main__.py | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/magneticod/magneticod/__main__.py b/magneticod/magneticod/__main__.py index bd075c9..f17c2ee 100644 --- a/magneticod/magneticod/__main__.py +++ b/magneticod/magneticod/__main__.py @@ -37,6 +37,14 @@ def main(): logging.basicConfig(level=arguments.loglevel, format="%(asctime)s %(levelname)-8s %(message)s") logging.info("magneticod v%d.%d.%d started", *__version__) + # use uvloop if it's installed + try: + import uvloop + asyncio.set_event_loop_policy(uvloop.EventLoopPolicy()) + logging.info("using uvloop") + except ModuleNotFoundError: + pass + # noinspection PyBroadException try: path = arguments.database_file From 3e4eba740c2c2e77a950b1aa4ab4a857b6a06192 Mon Sep 17 00:00:00 2001 From: Richard Kiss Date: Mon, 15 May 2017 15:58:13 -0700 Subject: [PATCH 15/20] Do explicit look-up of bootstrap nodes, and query all responses. --- magneticod/magneticod/dht.py | 16 +++++++++++----- 1 file changed, 11 insertions(+), 5 deletions(-) diff --git a/magneticod/magneticod/dht.py b/magneticod/magneticod/dht.py index 7303f29..56e2417 100644 --- a/magneticod/magneticod/dht.py +++ b/magneticod/magneticod/dht.py @@ -89,7 +89,7 @@ class SybilNode: while True: await asyncio.sleep(1) if len(self._routing_table) == 0: - self.__bootstrap() + await self.__bootstrap() self.__make_neighbours() self._routing_table.clear() if not self._is_paused: @@ -217,10 +217,16 @@ class SybilNode: except asyncio.TimeoutError: pass - def __bootstrap(self) -> None: - for addr in BOOTSTRAPPING_NODES: - data = self.__build_FIND_NODE_query(self.__true_id) - self.sendto(data, addr) + async def __bootstrap(self) -> None: + for node in BOOTSTRAPPING_NODES: + try: + # AF_INET means ip4 only + responses = await self._loop.getaddrinfo(*node, family=socket.AF_INET) + for (family, type, proto, canonname, sockaddr) in responses: + data = self.__build_FIND_NODE_query(self.__true_id) + self.sendto(data, sockaddr) + except Exception: + logging.exception("bootstrap problem") def __make_neighbours(self) -> None: for node_id, addr in self._routing_table.items(): From 29b99a338e2ac64dcc01235cbfc0a994b6880ac9 Mon Sep 17 00:00:00 2001 From: Richard Kiss Date: Wed, 17 May 2017 12:10:40 -0700 Subject: [PATCH 16/20] ModuleNotFoundError (new in 3.6) => ImportError. --- magneticod/magneticod/__main__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/magneticod/magneticod/__main__.py b/magneticod/magneticod/__main__.py index f17c2ee..02a724b 100644 --- a/magneticod/magneticod/__main__.py +++ b/magneticod/magneticod/__main__.py @@ -42,7 +42,7 @@ def main(): import uvloop asyncio.set_event_loop_policy(uvloop.EventLoopPolicy()) logging.info("using uvloop") - except ModuleNotFoundError: + except ImportError: pass # noinspection PyBroadException From 9b1bbfcaa15e763f57320207b5fed6cbb1946dc5 Mon Sep 17 00:00:00 2001 From: Richard Kiss Date: Wed, 17 May 2017 13:16:30 -0700 Subject: [PATCH 17/20] Properly clean up fetch_metadata tasks. --- magneticod/magneticod/bittorrent.py | 23 +++++++++++------ magneticod/magneticod/dht.py | 39 ++++++++++++++--------------- 2 files changed, 35 insertions(+), 27 deletions(-) diff --git a/magneticod/magneticod/bittorrent.py b/magneticod/magneticod/bittorrent.py index 7a9a60a..1f9e0c7 100644 --- a/magneticod/magneticod/bittorrent.py +++ b/magneticod/magneticod/bittorrent.py @@ -25,9 +25,17 @@ InfoHash = bytes PeerAddress = typing.Tuple[str, int] -async def fetch_metadata(info_hash: InfoHash, peer_addr: PeerAddress, max_metadata_size): - return await DisposablePeer().run( - asyncio.get_event_loop(), info_hash, peer_addr, max_metadata_size) +async def fetch_metadata(info_hash: InfoHash, peer_addr: PeerAddress, max_metadata_size, timeout=None): + loop = asyncio.get_event_loop() + task = asyncio.ensure_future(DisposablePeer().run( + asyncio.get_event_loop(), info_hash, peer_addr, max_metadata_size)) + h = None + if timeout is not None: + h = loop.call_later(timeout, lambda: task.cancel()) + try: + return await task + except asyncio.CancelledError: + return None class ProtocolError(Exception): @@ -80,12 +88,13 @@ class DisposablePeer: length = int.from_bytes(buffer, "big") message = await self._reader.readexactly(length) self.__on_message(message) - except Exception as ex: + except Exception: logging.debug("closing %s to %s", self.__info_hash.hex(), self.__peer_addr) + finally: if not self._metadata_future.done(): self._metadata_future.set_result(None) - if self._writer: - self._writer.close() + if self._writer: + self._writer.close() return self._metadata_future.result() def __on_message(self, message: bytes) -> None: @@ -199,7 +208,7 @@ class DisposablePeer: if self.__metadata_received == self.__metadata_size: if hashlib.sha1(self.__metadata).digest() == self.__info_hash: if not self._metadata_future.done(): - self._metadata_future.set_result((self.__info_hash, bytes(self.__metadata))) + self._metadata_future.set_result(bytes(self.__metadata)) else: logging.debug("Invalid Metadata! Ignoring.") diff --git a/magneticod/magneticod/dht.py b/magneticod/magneticod/dht.py index 56e2417..ae6092b 100644 --- a/magneticod/magneticod/dht.py +++ b/magneticod/magneticod/dht.py @@ -43,8 +43,8 @@ class SybilNode: # Maximum number of neighbours (this is a THRESHOLD where, once reached, the search for new neighbours will # stop; but until then, the total number of neighbours might exceed the threshold). self.__n_max_neighbours = 2000 - self.__peers = collections.defaultdict( - list) # type: typing.DefaultDict[dht.InfoHash, typing.List[bittorrent.DisposablePeer]] + self.__tasks = collections.defaultdict( + set) # type: typing.DefaultDict[dht.InfoHash, typing.Set[asyncio.Task]] self._complete_info_hashes = complete_info_hashes self.__max_metadata_size = max_metadata_size self._metadata_q = asyncio.Queue() @@ -116,7 +116,7 @@ class SybilNode: self.__on_ANNOUNCE_PEER_query(message, addr) async def shutdown(self) -> None: - futures = [peer for peer in itertools.chain.from_iterable(self.__peers.values())] + futures = [task for task in itertools.chain.from_iterable(self.__tasks.values())] if self._tick_task: futures.append(self._tick_task) for future in futures: @@ -197,25 +197,24 @@ class SybilNode: else: peer_addr = (addr[0], port) - if len(self.__peers[info_hash]) > MAX_ACTIVE_PEERS_PER_INFO_HASH or \ - info_hash in self._complete_info_hashes: + if info_hash in self._complete_info_hashes or \ + len(self.__tasks[info_hash]) > MAX_ACTIVE_PEERS_PER_INFO_HASH: return + task = self._loop.create_task(bittorrent.fetch_metadata( + info_hash, peer_addr, self.__max_metadata_size, timeout=PEER_TIMEOUT)) + self.__tasks[info_hash].add(task) + task.add_done_callback(lambda f: self._got_result(task, info_hash)) - peer = self._loop.create_task(self._launch_fetch(info_hash, peer_addr)) - self.__peers[info_hash].append(peer) - - async def _launch_fetch(self, info_hash, peer_addr): - try: - f = bittorrent.fetch_metadata(info_hash, peer_addr, self.__max_metadata_size) - r = await asyncio.wait_for(f, timeout=PEER_TIMEOUT) - if r: - info_hash, metadata = r - for peer in self.__peers[info_hash]: - peer.cancel() - self._complete_info_hashes.add(info_hash) - await self._metadata_q.put(r) - except asyncio.TimeoutError: - pass + def _got_result(self, task, info_hash): + task_set = self.__tasks[info_hash] + metadata = task.result() + if metadata: + self._complete_info_hashes.add(info_hash) + self._metadata_q.put_nowait((info_hash, metadata)) + for task in task_set: + task.cancel() + if len(task_set) == 0: + del self.__tasks[info_hash] async def __bootstrap(self) -> None: for node in BOOTSTRAPPING_NODES: From 8df4015e06d099b58e7fb215f51ef73fcd2c5c2e Mon Sep 17 00:00:00 2001 From: Richard Kiss Date: Wed, 24 May 2017 12:36:47 -0700 Subject: [PATCH 18/20] Be a little smarter with task clean-up. --- magneticod/magneticod/dht.py | 54 ++++++++++++++++++++++++------------ 1 file changed, 37 insertions(+), 17 deletions(-) diff --git a/magneticod/magneticod/dht.py b/magneticod/magneticod/dht.py index ae6092b..ea2dcd2 100644 --- a/magneticod/magneticod/dht.py +++ b/magneticod/magneticod/dht.py @@ -13,7 +13,6 @@ # You should have received a copy of the GNU Affero General Public License along with this program. If not, see # . import asyncio -import collections import itertools import zlib import logging @@ -43,8 +42,7 @@ class SybilNode: # Maximum number of neighbours (this is a THRESHOLD where, once reached, the search for new neighbours will # stop; but until then, the total number of neighbours might exceed the threshold). self.__n_max_neighbours = 2000 - self.__tasks = collections.defaultdict( - set) # type: typing.DefaultDict[dht.InfoHash, typing.Set[asyncio.Task]] + self.__tasks = {} # type: typing.Dict[dht.InfoHash, asyncio.Future] self._complete_info_hashes = complete_info_hashes self.__max_metadata_size = max_metadata_size self._metadata_q = asyncio.Queue() @@ -116,7 +114,7 @@ class SybilNode: self.__on_ANNOUNCE_PEER_query(message, addr) async def shutdown(self) -> None: - futures = [task for task in itertools.chain.from_iterable(self.__tasks.values())] + futures = list(self.__tasks.values()) if self._tick_task: futures.append(self._tick_task) for future in futures: @@ -197,24 +195,46 @@ class SybilNode: else: peer_addr = (addr[0], port) - if info_hash in self._complete_info_hashes or \ - len(self.__tasks[info_hash]) > MAX_ACTIVE_PEERS_PER_INFO_HASH: + if info_hash in self._complete_info_hashes: return - task = self._loop.create_task(bittorrent.fetch_metadata( - info_hash, peer_addr, self.__max_metadata_size, timeout=PEER_TIMEOUT)) - self.__tasks[info_hash].add(task) - task.add_done_callback(lambda f: self._got_result(task, info_hash)) - def _got_result(self, task, info_hash): - task_set = self.__tasks[info_hash] - metadata = task.result() + # create the parent future + if info_hash not in self.__tasks: + parent_f = self._loop.create_future() + parent_f.child_count = 0 + parent_f.add_done_callback(lambda f: self._parent_task_done(f, info_hash)) + self.__tasks[info_hash] = parent_f + + parent_f = self.__tasks[info_hash] + + if parent_f.done(): + return + if parent_f.child_count > MAX_ACTIVE_PEERS_PER_INFO_HASH: + return + + task = asyncio.ensure_future(bittorrent.fetch_metadata( + info_hash, peer_addr, self.__max_metadata_size, timeout=PEER_TIMEOUT)) + task.add_done_callback(lambda task: self._got_child_result(parent_f, task)) + parent_f.child_count += 1 + parent_f.add_done_callback(lambda f: task.cancel()) + + def _got_child_result(self, parent_task, child_task): + parent_task.child_count -= 1 + try: + metadata = child_task.result() + if metadata and not parent_task.done(): + parent_task.set_result(metadata) + except Exception: + logging.exception("child result is exception") + if parent_task.child_count <= 0 and not parent_task.done(): + parent_task.set_result(None) + + def _parent_task_done(self, parent_task, info_hash): + metadata = parent_task.result() if metadata: self._complete_info_hashes.add(info_hash) self._metadata_q.put_nowait((info_hash, metadata)) - for task in task_set: - task.cancel() - if len(task_set) == 0: - del self.__tasks[info_hash] + del self.__tasks[info_hash] async def __bootstrap(self) -> None: for node in BOOTSTRAPPING_NODES: From 4dc11b047fc3b4ff5e76cc0067382721d9049e68 Mon Sep 17 00:00:00 2001 From: Richard Kiss Date: Wed, 24 May 2017 14:50:26 -0700 Subject: [PATCH 19/20] Tidy up clean-up. Simplify fetch_metadata. --- magneticod/magneticod/bittorrent.py | 11 +++-------- magneticod/magneticod/dht.py | 24 ++++++++++++++---------- 2 files changed, 17 insertions(+), 18 deletions(-) diff --git a/magneticod/magneticod/bittorrent.py b/magneticod/magneticod/bittorrent.py index 1f9e0c7..993ca0b 100644 --- a/magneticod/magneticod/bittorrent.py +++ b/magneticod/magneticod/bittorrent.py @@ -26,15 +26,10 @@ PeerAddress = typing.Tuple[str, int] async def fetch_metadata(info_hash: InfoHash, peer_addr: PeerAddress, max_metadata_size, timeout=None): - loop = asyncio.get_event_loop() - task = asyncio.ensure_future(DisposablePeer().run( - asyncio.get_event_loop(), info_hash, peer_addr, max_metadata_size)) - h = None - if timeout is not None: - h = loop.call_later(timeout, lambda: task.cancel()) try: - return await task - except asyncio.CancelledError: + return await asyncio.wait_for(DisposablePeer().run( + asyncio.get_event_loop(), info_hash, peer_addr, max_metadata_size), timeout=timeout) + except asyncio.TimeoutError: return None diff --git a/magneticod/magneticod/dht.py b/magneticod/magneticod/dht.py index ea2dcd2..b2e83e1 100644 --- a/magneticod/magneticod/dht.py +++ b/magneticod/magneticod/dht.py @@ -114,12 +114,11 @@ class SybilNode: self.__on_ANNOUNCE_PEER_query(message, addr) async def shutdown(self) -> None: - futures = list(self.__tasks.values()) - if self._tick_task: - futures.append(self._tick_task) - for future in futures: - future.cancel() - await asyncio.wait(futures) + tasks = list(self.__tasks.values()) + for t in tasks: + t.set_result(None) + self._tick_task.cancel() + await asyncio.wait([self._tick_task]) self._transport.close() def __on_FIND_NODE_response(self, message: bencode.KRPCDict) -> None: @@ -224,16 +223,21 @@ class SybilNode: metadata = child_task.result() if metadata and not parent_task.done(): parent_task.set_result(metadata) + except asyncio.CancelledError: + pass except Exception: logging.exception("child result is exception") if parent_task.child_count <= 0 and not parent_task.done(): parent_task.set_result(None) def _parent_task_done(self, parent_task, info_hash): - metadata = parent_task.result() - if metadata: - self._complete_info_hashes.add(info_hash) - self._metadata_q.put_nowait((info_hash, metadata)) + try: + metadata = parent_task.result() + if metadata: + self._complete_info_hashes.add(info_hash) + self._metadata_q.put_nowait((info_hash, metadata)) + except asyncio.CancelledError: + pass del self.__tasks[info_hash] async def __bootstrap(self) -> None: From 6a459d5e58431eba5562bb0fa1e2dbd4d1f037be Mon Sep 17 00:00:00 2001 From: "Bora M. Alper" Date: Fri, 2 Jun 2017 15:34:22 +0300 Subject: [PATCH 20/20] huge commit, code review done on asyncio port --- magneticod/magneticod/__main__.py | 15 ++-- magneticod/magneticod/bittorrent.py | 38 ++++------ magneticod/magneticod/constants.py | 2 + magneticod/magneticod/dht.py | 114 +++++++++++++++++++--------- 4 files changed, 106 insertions(+), 63 deletions(-) diff --git a/magneticod/magneticod/__main__.py b/magneticod/magneticod/__main__.py index 02a724b..9205ffb 100644 --- a/magneticod/magneticod/__main__.py +++ b/magneticod/magneticod/__main__.py @@ -41,9 +41,9 @@ def main(): try: import uvloop asyncio.set_event_loop_policy(uvloop.EventLoopPolicy()) - logging.info("using uvloop") + logging.info("uvloop is being used") except ImportError: - pass + logging.exception("uvloop could not be imported, using the default asyncio implementation") # noinspection PyBroadException try: @@ -57,9 +57,9 @@ def main(): loop = asyncio.get_event_loop() node = dht.SybilNode(arguments.node_addr, complete_info_hashes, arguments.max_metadata_size) - loop.run_until_complete(node.launch(loop)) + loop.run_until_complete(node.launch()) - watch_q_task = loop.create_task(watch_q(database, node._metadata_q)) + watch_q_task = loop.create_task(metadata_queue_watcher(database, node.__metadata_queue)) try: loop.run_forever() @@ -74,9 +74,12 @@ def main(): return 0 -async def watch_q(database, q): +async def metadata_queue_watcher(database: persistence.Database, metadata_queue: asyncio.Queue) -> None: + """ + Watches for the metadata queue to commit any complete info hashes to the database. + """ while True: - info_hash, metadata = await q.get() + info_hash, metadata = await metadata_queue.get() succeeded = database.add_metadata(info_hash, metadata) if not succeeded: logging.info("Corrupt metadata for %s! Ignoring.", info_hash.hex()) diff --git a/magneticod/magneticod/bittorrent.py b/magneticod/magneticod/bittorrent.py index 993ca0b..0e80bfc 100644 --- a/magneticod/magneticod/bittorrent.py +++ b/magneticod/magneticod/bittorrent.py @@ -25,10 +25,9 @@ InfoHash = bytes PeerAddress = typing.Tuple[str, int] -async def fetch_metadata(info_hash: InfoHash, peer_addr: PeerAddress, max_metadata_size, timeout=None): +async def fetch_metadata_from_peer(info_hash: InfoHash, peer_addr: PeerAddress, max_metadata_size: int, timeout=None): try: - return await asyncio.wait_for(DisposablePeer().run( - asyncio.get_event_loop(), info_hash, peer_addr, max_metadata_size), timeout=timeout) + return await asyncio.wait_for(DisposablePeer(info_hash, peer_addr, max_metadata_size).run(), timeout=timeout) except asyncio.TimeoutError: return None @@ -38,39 +37,40 @@ class ProtocolError(Exception): class DisposablePeer: - async def run(self, loop, info_hash: InfoHash, peer_addr: PeerAddress, max_metadata_size: int): + def __init__(self, info_hash: InfoHash, peer_addr: PeerAddress, max_metadata_size: int) -> None: self.__peer_addr = peer_addr self.__info_hash = info_hash - self.__max_metadata_size = max_metadata_size - - self.__bt_handshake_complete = False # BitTorrent Handshake self.__ext_handshake_complete = False # Extension Handshake self.__ut_metadata = None # Since we don't know ut_metadata code that remote peer uses... + self.__max_metadata_size = max_metadata_size self.__metadata_size = None self.__metadata_received = 0 # Amount of metadata bytes received... self.__metadata = None - self._run_task = None - self._metadata_future = loop.create_future() + self._run_task = None self._writer = None + + async def run(self): + event_loop = asyncio.get_event_loop() + self._metadata_future = event_loop.create_future() + try: - self._reader, self._writer = await asyncio.open_connection( - self.__peer_addr[0], self.__peer_addr[1], loop=loop) + self._reader, self._writer = await asyncio.open_connection(*self.__peer_addr, loop=event_loop) # Send the BitTorrent handshake message (0x13 = 19 in decimal, the length of the handshake message) self._writer.write(b"\x13BitTorrent protocol%s%s%s" % ( b"\x00\x00\x00\x00\x00\x10\x00\x01", self.__info_hash, self.__random_bytes(20) )) - # Honestly speaking, BitTorrent protocol might be one of the most poorly documented and (not the most but) badly - # designed protocols I have ever seen (I am 19 years old so what I could have seen?). + # Honestly speaking, BitTorrent protocol might be one of the most poorly documented and (not the most but) + # badly designed protocols I have ever seen (I am 19 years old so what I could have seen?). # # Anyway, all the messages EXCEPT the handshake are length-prefixed by 4 bytes in network order, BUT the - # size of the handshake message is the 1-byte length prefix + 49 bytes, but luckily, there is only one canonical - # way of handshaking in the wild. + # size of the handshake message is the 1-byte length prefix + 49 bytes, but luckily, there is only one + # canonical way of handshaking in the wild. message = await self._reader.readexactly(68) if message[1:20] != b"BitTorrent protocol": # Erroneous handshake, possibly unknown version... @@ -93,12 +93,6 @@ class DisposablePeer: return self._metadata_future.result() def __on_message(self, message: bytes) -> None: - length = len(message) - - if length < 2: - # An extension message has minimum length of 2. - return - # Every extension message has BitTorrent Message ID = 20 if message[0] != 20: # logging.debug("Message is NOT an EXTension message! %s", message[:200]) @@ -127,7 +121,7 @@ class DisposablePeer: b"ut_metadata": 1 } }) - # In case you cannot read_file hex: + # In case you cannot read hex: # 0x14 = 20 (BitTorrent ID indicating that it's an extended message) # 0x00 = 0 (Extension ID indicating that it's the handshake message) self._writer.write(b"%b\x14%s" % ( diff --git a/magneticod/magneticod/constants.py b/magneticod/magneticod/constants.py index 5f2b1b2..07f2e76 100644 --- a/magneticod/magneticod/constants.py +++ b/magneticod/magneticod/constants.py @@ -6,6 +6,8 @@ BOOTSTRAPPING_NODES = [ ] PENDING_INFO_HASHES = 10 # threshold for pending info hashes before being committed to database: +TICK_INTERVAL = 1 # in seconds + # maximum (inclusive) number of active (disposable) peers to fetch the metadata per info hash at the same time: MAX_ACTIVE_PEERS_PER_INFO_HASH = 5 diff --git a/magneticod/magneticod/dht.py b/magneticod/magneticod/dht.py index b2e83e1..f451710 100644 --- a/magneticod/magneticod/dht.py +++ b/magneticod/magneticod/dht.py @@ -20,7 +20,7 @@ import socket import typing import os -from .constants import BOOTSTRAPPING_NODES, MAX_ACTIVE_PEERS_PER_INFO_HASH, PEER_TIMEOUT +from .constants import BOOTSTRAPPING_NODES, MAX_ACTIVE_PEERS_PER_INFO_HASH, PEER_TIMEOUT, TICK_INTERVAL from . import bencode from . import bittorrent @@ -28,9 +28,10 @@ NodeID = bytes NodeAddress = typing.Tuple[str, int] PeerAddress = typing.Tuple[str, int] InfoHash = bytes +Metadata = bytes -class SybilNode: +class SybilNode(asyncio.DatagramProtocol): def __init__(self, address: typing.Tuple[str, int], complete_info_hashes, max_metadata_size): self.__true_id = self.__random_bytes(20) @@ -42,55 +43,69 @@ class SybilNode: # Maximum number of neighbours (this is a THRESHOLD where, once reached, the search for new neighbours will # stop; but until then, the total number of neighbours might exceed the threshold). self.__n_max_neighbours = 2000 - self.__tasks = {} # type: typing.Dict[dht.InfoHash, asyncio.Future] + self.__parent_futures = {} # type: typing.Dict[dht.InfoHash, asyncio.Future] self._complete_info_hashes = complete_info_hashes self.__max_metadata_size = max_metadata_size - self._metadata_q = asyncio.Queue() - self._is_paused = False + # Complete metadatas will be added to the queue, to be retrieved and committed to the database. + self.__metadata_queue = asyncio.Queue() # typing.Collection[typing.Tuple[InfoHash, Metadata]] + self._is_writing_paused = False self._tick_task = None logging.info("SybilNode %s on %s initialized!", self.__true_id.hex().upper(), address) - async def launch(self, loop): - self._loop = loop - await loop.create_datagram_endpoint(lambda: self, local_addr=self.__address) + async def launch(self) -> None: + event_loop = asyncio.get_event_loop() + await event_loop.create_datagram_endpoint(lambda: self, local_addr=self.__address) - def connection_made(self, transport): - self._tick_task = self._loop.create_task(self.on_tick()) + def connection_made(self, transport: asyncio.DatagramTransport) -> None: + event_loop = asyncio.get_event_loop() + self._tick_task = event_loop.create_task(self.tick_periodically()) self._transport = transport - def connection_lost(self, exc): - self._is_paused = True + def connection_lost(self, exc) -> None: + logging.critical("SybilNode's connection is lost.") + self._is_writing_paused = True - def pause_writing(self): - self._is_paused = True + def pause_writing(self) -> None: + self._is_writing_paused = True + # In case of congestion, decrease the maximum number of nodes to the 90% of the current value. + self.__n_max_neighbours = self.__n_max_neighbours * 9 // 10 + logging.debug("Maximum number of neighbours now %d", self.__n_max_neighbours) - def resume_writing(self): - self._is_paused = False + def resume_writing(self) -> None: + self._is_writing_paused = False - def sendto(self, data, addr): - if self._is_paused: - return - self._transport.sendto(data, addr) + def sendto(self, data, addr) -> None: + if not self._is_writing_paused: + self._transport.sendto(data, addr) - def error_received(self, exc): - logging.error("got error %s", exc) + def error_received(self, exc: Exception) -> None: if isinstance(exc, PermissionError): + # This exception (EPERM errno: 1) is kernel's way of saying that "you are far too fast, chill". + # It is also likely that we have received a ICMP source quench packet (meaning, that we really need to + # slow down. + # + # Read more here: http://www.archivum.info/comp.protocols.tcp-ip/2009-05/00088/UDP-socket-amp-amp-sendto + # -amp-amp-EPERM.html + # # In case of congestion, decrease the maximum number of nodes to the 90% of the current value. if self.__n_max_neighbours < 200: logging.warning("Maximum number of neighbours are now less than 200 due to congestion!") else: self.__n_max_neighbours = self.__n_max_neighbours * 9 // 10 logging.debug("Maximum number of neighbours now %d", self.__n_max_neighbours) + else: + # The previous "exception" was kind of "unexceptional", but we should log anything else. + logging.error("SybilNode operational error: `%s`", exc) - async def on_tick(self) -> None: + async def tick_periodically(self) -> None: while True: - await asyncio.sleep(1) - if len(self._routing_table) == 0: + await asyncio.sleep(TICK_INTERVAL) + if not self._routing_table: await self.__bootstrap() self.__make_neighbours() self._routing_table.clear() - if not self._is_paused: + if not self._is_writing_paused: self.__n_max_neighbours = self.__n_max_neighbours * 101 // 100 def datagram_received(self, data, addr) -> None: @@ -114,7 +129,7 @@ class SybilNode: self.__on_ANNOUNCE_PEER_query(message, addr) async def shutdown(self) -> None: - tasks = list(self.__tasks.values()) + tasks = list(self.__parent_futures.values()) for t in tasks: t.set_result(None) self._tick_task.cancel() @@ -197,21 +212,36 @@ class SybilNode: if info_hash in self._complete_info_hashes: return + event_loop = asyncio.get_event_loop() + + # A little clarification about parent and child futures might be really useful here: + # For every info hash we are interested in, we create ONE parent future and save it under self.__tasks + # (info_hash -> task) dictionary. + # For EVERY DisposablePeer working to fetch the metadata of that info hash, we create a child future. Hence, for + # every parent future, there should be *at least* one child future. + # + # Parent and child futures are "connected" to each other through `add_done_callback` functionality: + # When a child is successfully done, it sets the result of its parent (`set_result()`), and if it was + # unsuccessful to fetch the metadata, it just checks whether there are any other child futures left and if not + # it terminates the parent future (by setting its result to None) and quits. + # When a parent future is successfully done, (through the callback) it adds the info hash to the set of + # completed metadatas and puts the metadata in the queue to be committed to the database. + # create the parent future - if info_hash not in self.__tasks: - parent_f = self._loop.create_future() + if info_hash not in self.__parent_futures: + parent_f = event_loop.create_future() parent_f.child_count = 0 parent_f.add_done_callback(lambda f: self._parent_task_done(f, info_hash)) - self.__tasks[info_hash] = parent_f + self.__parent_futures[info_hash] = parent_f - parent_f = self.__tasks[info_hash] + parent_f = self.__parent_futures[info_hash] if parent_f.done(): return if parent_f.child_count > MAX_ACTIVE_PEERS_PER_INFO_HASH: return - task = asyncio.ensure_future(bittorrent.fetch_metadata( + task = asyncio.ensure_future(bittorrent.fetch_metadata_from_peer( info_hash, peer_addr, self.__max_metadata_size, timeout=PEER_TIMEOUT)) task.add_done_callback(lambda task: self._got_child_result(parent_f, task)) parent_f.child_count += 1 @@ -221,6 +251,19 @@ class SybilNode: parent_task.child_count -= 1 try: metadata = child_task.result() + # Bora asked: + # Why do we check for parent_task being done here when a child got result? I mean, if parent_task is + # done before, and successful, all of its childs will be terminated and this function cannot be called + # anyway. + # + # --- https://github.com/boramalper/magnetico/pull/76#discussion_r119555423 + # + # Suppose two child tasks are fetching the same metadata for a parent and they finish at the same time + # (or very close). The first one wakes up, sets the parent_task result which will cause the done + # callback to be scheduled. The scheduler might still then chooses the second child task to run next + # (why not? It's been waiting longer) before the parent has a chance to cancel it. + # + # Thus spoke Richard. if metadata and not parent_task.done(): parent_task.set_result(metadata) except asyncio.CancelledError: @@ -235,16 +278,17 @@ class SybilNode: metadata = parent_task.result() if metadata: self._complete_info_hashes.add(info_hash) - self._metadata_q.put_nowait((info_hash, metadata)) + self.__metadata_queue.put_nowait((info_hash, metadata)) except asyncio.CancelledError: pass - del self.__tasks[info_hash] + del self.__parent_futures[info_hash] async def __bootstrap(self) -> None: + event_loop = asyncio.get_event_loop() for node in BOOTSTRAPPING_NODES: try: # AF_INET means ip4 only - responses = await self._loop.getaddrinfo(*node, family=socket.AF_INET) + responses = await event_loop.getaddrinfo(*node, family=socket.AF_INET) for (family, type, proto, canonname, sockaddr) in responses: data = self.__build_FIND_NODE_query(self.__true_id) self.sendto(data, sockaddr)