diff --git a/magneticod/magneticod/bittorrent.py b/magneticod/magneticod/bittorrent.py index e0fe2b2..75e0c56 100644 --- a/magneticod/magneticod/bittorrent.py +++ b/magneticod/magneticod/bittorrent.py @@ -27,15 +27,17 @@ PeerAddress = typing.Tuple[str, int] async def fetch_metadata(info_hash: InfoHash, peer_addr: PeerAddress, max_metadata_size): - loop = asyncio.get_event_loop() - peer = DisposablePeer(info_hash, peer_addr, max_metadata_size) - r = await peer.launch(loop) - return r + return await DisposablePeer().run( + asyncio.get_event_loop(), info_hash, peer_addr, max_metadata_size) + + +class ProtocolError(Exception): + pass class DisposablePeer: - def __init__(self, info_hash: InfoHash, peer_addr: PeerAddress, - max_metadata_size: int=DEFAULT_MAX_METADATA_SIZE): + async def run(self, loop, info_hash: InfoHash, peer_addr: PeerAddress, + max_metadata_size: int=DEFAULT_MAX_METADATA_SIZE): self.__peer_addr = peer_addr self.__info_hash = info_hash @@ -48,25 +50,14 @@ class DisposablePeer: self.__metadata_size = None self.__metadata_received = 0 # Amount of metadata bytes received... self.__metadata = None - self._writer = None self._run_task = None - def launch(self, loop): - self._loop = loop - self._metadata_future = self._loop.create_future() - self._run_task = self._loop.create_task(self.run()) - self._loop.create_task(self.timeout()) - return self._metadata_future + self._metadata_future = loop.create_future() + self._writer = None - async def timeout(self): - # After 12 seconds passed, a peer should report an error and shut itself down due to being stall. - await asyncio.sleep(12) - self.close() - - async def run(self): try: self._reader, self._writer = await asyncio.open_connection( - self.__peer_addr[0], self.__peer_addr[1], loop=self._loop) + self.__peer_addr[0], self.__peer_addr[1], loop=loop) # Send the BitTorrent handshake message (0x13 = 19 in decimal, the length of the handshake message) self._writer.write(b"\x13BitTorrent protocol%s%s%s" % ( b"\x00\x00\x00\x00\x00\x10\x00\x01", @@ -82,9 +73,7 @@ class DisposablePeer: message = await self._reader.readexactly(68) if message[1:20] != b"BitTorrent protocol": # Erroneous handshake, possibly unknown version... - logging.debug("Erroneous BitTorrent handshake! %s", message) - self.close() - return + raise ProtocolError("Erroneous BitTorrent handshake! %s" % message) self.__on_bt_handshake(message) @@ -94,16 +83,12 @@ class DisposablePeer: message = await self._reader.readexactly(length) self.__on_message(message) except Exception as ex: - pass - self.close() - - def close(self): - if not self._metadata_future.done(): - self._metadata_future.set_result(None) + logging.info("closing %s to %s", self.__info_hash.hex(), self.__peer_addr) + if not self._metadata_future.done(): + self._metadata_future.set_result(None) if self._writer: self._writer.close() - if self._run_task: - self._run_task.cancel() + return self._metadata_future.result() def __on_message(self, message: bytes) -> None: length = len(message) @@ -169,21 +154,16 @@ class DisposablePeer: " {} max metadata size".format(self.__peer_addr[0], self.__peer_addr[1], self.__max_metadata_size) - except KeyError: - self.close() - return except AssertionError as e: logging.debug(str(e)) - self.close() - return + raise self.__ut_metadata = ut_metadata try: self.__metadata = bytearray(metadata_size) except MemoryError: logging.exception("Could not allocate %.1f KiB for the metadata!", metadata_size / 1024) - self.close() - return + raise self.__metadata_size = metadata_size self.__ext_handshake_complete = True @@ -222,13 +202,11 @@ class DisposablePeer: if hashlib.sha1(self.__metadata).digest() == self.__info_hash: if not self._metadata_future.done(): self._metadata_future.set_result((self.__info_hash, bytes(self.__metadata))) - self.close() else: logging.debug("Invalid Metadata! Ignoring.") elif msg_type == 2: # reject logging.info("Peer rejected us.") - self.close() def __request_metadata_piece(self, piece: int) -> None: msg_dict_dump = bencode.dumps({ diff --git a/magneticod/magneticod/constants.py b/magneticod/magneticod/constants.py index bc2c954..b99ebff 100644 --- a/magneticod/magneticod/constants.py +++ b/magneticod/magneticod/constants.py @@ -9,3 +9,5 @@ PENDING_INFO_HASHES = 10 # threshold for pending info hashes before being commi TICK_INTERVAL = 1 # in seconds (soft constraint) # maximum (inclusive) number of active (disposable) peers to fetch the metadata per info hash at the same time: MAX_ACTIVE_PEERS_PER_INFO_HASH = 5 + +PEER_TIMEOUT=12 # seconds diff --git a/magneticod/magneticod/dht.py b/magneticod/magneticod/dht.py index dae64ac..be4f045 100644 --- a/magneticod/magneticod/dht.py +++ b/magneticod/magneticod/dht.py @@ -22,7 +22,7 @@ import socket import typing import os -from .constants import BOOTSTRAPPING_NODES, DEFAULT_MAX_METADATA_SIZE, MAX_ACTIVE_PEERS_PER_INFO_HASH +from .constants import BOOTSTRAPPING_NODES, DEFAULT_MAX_METADATA_SIZE, MAX_ACTIVE_PEERS_PER_INFO_HASH, PEER_TIMEOUT from . import bencode from . import bittorrent @@ -102,7 +102,7 @@ class SybilNode: def shutdown(self) -> None: for peer in itertools.chain.from_iterable(self.__peers.values()): - peer.close() + peer.cancel() self._transport.close() def __on_FIND_NODE_response(self, message: bencode.KRPCDict) -> None: @@ -182,18 +182,21 @@ class SybilNode: info_hash in self._complete_info_hashes: return - peer = bittorrent.fetch_metadata(info_hash, peer_addr, self.__max_metadata_size) + peer = self._loop.create_task(self._launch_fetch(info_hash, peer_addr)) self.__peers[info_hash].append(peer) - self._loop.create_task(peer).add_done_callback(self.metadata_found) - def metadata_found(self, future): - r = future.result() - if r: - info_hash, metadata = r - for peer in self.__peers[info_hash]: - peer.close() - self._metadata_q.put_nowait(r) - self._complete_info_hashes.add(info_hash) + async def _launch_fetch(self, info_hash, peer_addr): + try: + f = bittorrent.fetch_metadata(info_hash, peer_addr, self.__max_metadata_size) + r = await asyncio.wait_for(f, timeout=PEER_TIMEOUT) + if r: + info_hash, metadata = r + for peer in self.__peers[info_hash]: + peer.cancel() + self._complete_info_hashes.add(info_hash) + await self._metadata_q.put(r) + except asyncio.TimeoutError: + pass def __bootstrap(self) -> None: for addr in BOOTSTRAPPING_NODES: