From 4aea5df886d285249c8ac664678cd98704cfb8f6 Mon Sep 17 00:00:00 2001 From: Richard Kiss Date: Sun, 14 May 2017 11:34:38 -0700 Subject: [PATCH] Reduce leakage in bittorrent.py. --- magneticod/magneticod/bittorrent.py | 84 +++++++++++++++-------------- 1 file changed, 45 insertions(+), 39 deletions(-) diff --git a/magneticod/magneticod/bittorrent.py b/magneticod/magneticod/bittorrent.py index 92efc19..c4d9eb5 100644 --- a/magneticod/magneticod/bittorrent.py +++ b/magneticod/magneticod/bittorrent.py @@ -34,7 +34,8 @@ async def fetch_metadata(info_hash: InfoHash, peer_addr: PeerAddress, max_metada class DisposablePeer: - def __init__(self, info_hash: InfoHash, peer_addr: PeerAddress, max_metadata_size: int= DEFAULT_MAX_METADATA_SIZE): + def __init__(self, info_hash: InfoHash, peer_addr: PeerAddress, + max_metadata_size: int=DEFAULT_MAX_METADATA_SIZE): self.__peer_addr = peer_addr self.__info_hash = info_hash @@ -47,56 +48,60 @@ class DisposablePeer: self.__metadata_size = None self.__metadata_received = 0 # Amount of metadata bytes received... self.__metadata = None + self._writer = None - # After 120 ticks passed, a peer should report an error and shut itself down due to being stall. - self.__ticks_passed = 0 - - async def launch(self, loop): + def launch(self, loop): self._loop = loop self._metadata_future = self._loop.create_future() + self._loop.create_task(self.run()) + return self._metadata_future - self._reader, self._writer = await asyncio.open_connection( - self.__peer_addr[0], self.__peer_addr[1], loop=loop) - - # Send the BitTorrent handshake message (0x13 = 19 in decimal, the length of the handshake message) - self._writer.write(b"\x13BitTorrent protocol%s%s%s" % ( - b"\x00\x00\x00\x00\x00\x10\x00\x01", - self.__info_hash, - self.__random_bytes(20) - )) - # Honestly speaking, BitTorrent protocol might be one of the most poorly documented and (not the most but) badly - # designed protocols I have ever seen (I am 19 years old so what I could have seen?). - # - # Anyway, all the messages EXCEPT the handshake are length-prefixed by 4 bytes in network order, BUT the - # size of the handshake message is the 1-byte length prefix + 49 bytes, but luckily, there is only one canonical - # way of handshaking in the wild. - message = await self._reader.readexactly(68) - if message[1:20] != b"BitTorrent protocol": - # Erroneous handshake, possibly unknown version... - logging.debug("Erroneous BitTorrent handshake! %s", message) - self.close() - return - - self.__on_bt_handshake(message) + async def timeout(self): + # After 12 seconds passed, a peer should report an error and shut itself down due to being stall. + await asyncio.sleep(12) + self.close() + async def run(self): try: + self._reader, self._writer = await asyncio.open_connection( + self.__peer_addr[0], self.__peer_addr[1], loop=self._loop) + self._loop.create_task(self.timeout()) + + # Send the BitTorrent handshake message (0x13 = 19 in decimal, the length of the handshake message) + self._writer.write(b"\x13BitTorrent protocol%s%s%s" % ( + b"\x00\x00\x00\x00\x00\x10\x00\x01", + self.__info_hash, + self.__random_bytes(20) + )) + # Honestly speaking, BitTorrent protocol might be one of the most poorly documented and (not the most but) badly + # designed protocols I have ever seen (I am 19 years old so what I could have seen?). + # + # Anyway, all the messages EXCEPT the handshake are length-prefixed by 4 bytes in network order, BUT the + # size of the handshake message is the 1-byte length prefix + 49 bytes, but luckily, there is only one canonical + # way of handshaking in the wild. + message = await self._reader.readexactly(68) + if message[1:20] != b"BitTorrent protocol": + # Erroneous handshake, possibly unknown version... + logging.debug("Erroneous BitTorrent handshake! %s", message) + self.close() + return + + self.__on_bt_handshake(message) + while not self._metadata_future.done(): buffer = await self._reader.readexactly(4) length = int.from_bytes(buffer, "big") message = await self._reader.readexactly(length) self.__on_message(message) except Exception as ex: - self.close() - return await self._metadata_future - - def when_metadata_found(self, info_hash: InfoHash, metadata: bytes) -> None: - self._metadata_future.set_result((info_hash, metadata)) + pass self.close() def close(self): - self._writer.close() if not self._metadata_future.done(): self._metadata_future.set_result(None) + if self._writer: + self._writer.close() def __on_message(self, message: bytes) -> None: length = len(message) @@ -163,11 +168,11 @@ class DisposablePeer: self.__peer_addr[1], self.__max_metadata_size) except KeyError: - self.when_error() + self.close() return except AssertionError as e: logging.debug(str(e)) - self.when_error() + self.close() return self.__ut_metadata = ut_metadata @@ -175,7 +180,7 @@ class DisposablePeer: self.__metadata = bytearray(metadata_size) except MemoryError: logging.exception("Could not allocate %.1f KiB for the metadata!", metadata_size / 1024) - self.when_error() + self.close() return self.__metadata_size = metadata_size @@ -213,13 +218,14 @@ class DisposablePeer: if self.__metadata_received == self.__metadata_size: if hashlib.sha1(self.__metadata).digest() == self.__info_hash: - self.when_metadata_found(self.__info_hash, bytes(self.__metadata)) + self._metadata_future.set_result((self.__info_hash, bytes(self.__metadata))) + self.close() else: logging.debug("Invalid Metadata! Ignoring.") elif msg_type == 2: # reject logging.info("Peer rejected us.") - self.when_error() + self.close() def __request_metadata_piece(self, piece: int) -> None: msg_dict_dump = bencode.dumps({