Reduce leakage in bittorrent.py.

This commit is contained in:
Richard Kiss 2017-05-14 11:34:38 -07:00
parent 2f68ac3c7a
commit 4aea5df886

View File

@ -34,7 +34,8 @@ async def fetch_metadata(info_hash: InfoHash, peer_addr: PeerAddress, max_metada
class DisposablePeer: class DisposablePeer:
def __init__(self, info_hash: InfoHash, peer_addr: PeerAddress, max_metadata_size: int= DEFAULT_MAX_METADATA_SIZE): def __init__(self, info_hash: InfoHash, peer_addr: PeerAddress,
max_metadata_size: int=DEFAULT_MAX_METADATA_SIZE):
self.__peer_addr = peer_addr self.__peer_addr = peer_addr
self.__info_hash = info_hash self.__info_hash = info_hash
@ -47,16 +48,24 @@ class DisposablePeer:
self.__metadata_size = None self.__metadata_size = None
self.__metadata_received = 0 # Amount of metadata bytes received... self.__metadata_received = 0 # Amount of metadata bytes received...
self.__metadata = None self.__metadata = None
self._writer = None
# After 120 ticks passed, a peer should report an error and shut itself down due to being stall. def launch(self, loop):
self.__ticks_passed = 0
async def launch(self, loop):
self._loop = loop self._loop = loop
self._metadata_future = self._loop.create_future() self._metadata_future = self._loop.create_future()
self._loop.create_task(self.run())
return self._metadata_future
async def timeout(self):
# After 12 seconds passed, a peer should report an error and shut itself down due to being stall.
await asyncio.sleep(12)
self.close()
async def run(self):
try:
self._reader, self._writer = await asyncio.open_connection( self._reader, self._writer = await asyncio.open_connection(
self.__peer_addr[0], self.__peer_addr[1], loop=loop) self.__peer_addr[0], self.__peer_addr[1], loop=self._loop)
self._loop.create_task(self.timeout())
# Send the BitTorrent handshake message (0x13 = 19 in decimal, the length of the handshake message) # Send the BitTorrent handshake message (0x13 = 19 in decimal, the length of the handshake message)
self._writer.write(b"\x13BitTorrent protocol%s%s%s" % ( self._writer.write(b"\x13BitTorrent protocol%s%s%s" % (
@ -79,24 +88,20 @@ class DisposablePeer:
self.__on_bt_handshake(message) self.__on_bt_handshake(message)
try:
while not self._metadata_future.done(): while not self._metadata_future.done():
buffer = await self._reader.readexactly(4) buffer = await self._reader.readexactly(4)
length = int.from_bytes(buffer, "big") length = int.from_bytes(buffer, "big")
message = await self._reader.readexactly(length) message = await self._reader.readexactly(length)
self.__on_message(message) self.__on_message(message)
except Exception as ex: except Exception as ex:
self.close() pass
return await self._metadata_future
def when_metadata_found(self, info_hash: InfoHash, metadata: bytes) -> None:
self._metadata_future.set_result((info_hash, metadata))
self.close() self.close()
def close(self): def close(self):
self._writer.close()
if not self._metadata_future.done(): if not self._metadata_future.done():
self._metadata_future.set_result(None) self._metadata_future.set_result(None)
if self._writer:
self._writer.close()
def __on_message(self, message: bytes) -> None: def __on_message(self, message: bytes) -> None:
length = len(message) length = len(message)
@ -163,11 +168,11 @@ class DisposablePeer:
self.__peer_addr[1], self.__peer_addr[1],
self.__max_metadata_size) self.__max_metadata_size)
except KeyError: except KeyError:
self.when_error() self.close()
return return
except AssertionError as e: except AssertionError as e:
logging.debug(str(e)) logging.debug(str(e))
self.when_error() self.close()
return return
self.__ut_metadata = ut_metadata self.__ut_metadata = ut_metadata
@ -175,7 +180,7 @@ class DisposablePeer:
self.__metadata = bytearray(metadata_size) self.__metadata = bytearray(metadata_size)
except MemoryError: except MemoryError:
logging.exception("Could not allocate %.1f KiB for the metadata!", metadata_size / 1024) logging.exception("Could not allocate %.1f KiB for the metadata!", metadata_size / 1024)
self.when_error() self.close()
return return
self.__metadata_size = metadata_size self.__metadata_size = metadata_size
@ -213,13 +218,14 @@ class DisposablePeer:
if self.__metadata_received == self.__metadata_size: if self.__metadata_received == self.__metadata_size:
if hashlib.sha1(self.__metadata).digest() == self.__info_hash: if hashlib.sha1(self.__metadata).digest() == self.__info_hash:
self.when_metadata_found(self.__info_hash, bytes(self.__metadata)) self._metadata_future.set_result((self.__info_hash, bytes(self.__metadata)))
self.close()
else: else:
logging.debug("Invalid Metadata! Ignoring.") logging.debug("Invalid Metadata! Ignoring.")
elif msg_type == 2: # reject elif msg_type == 2: # reject
logging.info("Peer rejected us.") logging.info("Peer rejected us.")
self.when_error() self.close()
def __request_metadata_piece(self, piece: int) -> None: def __request_metadata_piece(self, piece: int) -> None:
msg_dict_dump = bencode.dumps({ msg_dict_dump = bencode.dumps({