More clean-up and simplification.
This commit is contained in:
parent
f3ae493308
commit
635fbe8cb1
@ -27,14 +27,16 @@ PeerAddress = typing.Tuple[str, int]
|
|||||||
|
|
||||||
|
|
||||||
async def fetch_metadata(info_hash: InfoHash, peer_addr: PeerAddress, max_metadata_size):
|
async def fetch_metadata(info_hash: InfoHash, peer_addr: PeerAddress, max_metadata_size):
|
||||||
loop = asyncio.get_event_loop()
|
return await DisposablePeer().run(
|
||||||
peer = DisposablePeer(info_hash, peer_addr, max_metadata_size)
|
asyncio.get_event_loop(), info_hash, peer_addr, max_metadata_size)
|
||||||
r = await peer.launch(loop)
|
|
||||||
return r
|
|
||||||
|
class ProtocolError(Exception):
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
class DisposablePeer:
|
class DisposablePeer:
|
||||||
def __init__(self, info_hash: InfoHash, peer_addr: PeerAddress,
|
async def run(self, loop, info_hash: InfoHash, peer_addr: PeerAddress,
|
||||||
max_metadata_size: int=DEFAULT_MAX_METADATA_SIZE):
|
max_metadata_size: int=DEFAULT_MAX_METADATA_SIZE):
|
||||||
self.__peer_addr = peer_addr
|
self.__peer_addr = peer_addr
|
||||||
self.__info_hash = info_hash
|
self.__info_hash = info_hash
|
||||||
@ -48,25 +50,14 @@ class DisposablePeer:
|
|||||||
self.__metadata_size = None
|
self.__metadata_size = None
|
||||||
self.__metadata_received = 0 # Amount of metadata bytes received...
|
self.__metadata_received = 0 # Amount of metadata bytes received...
|
||||||
self.__metadata = None
|
self.__metadata = None
|
||||||
self._writer = None
|
|
||||||
self._run_task = None
|
self._run_task = None
|
||||||
|
|
||||||
def launch(self, loop):
|
self._metadata_future = loop.create_future()
|
||||||
self._loop = loop
|
self._writer = None
|
||||||
self._metadata_future = self._loop.create_future()
|
|
||||||
self._run_task = self._loop.create_task(self.run())
|
|
||||||
self._loop.create_task(self.timeout())
|
|
||||||
return self._metadata_future
|
|
||||||
|
|
||||||
async def timeout(self):
|
|
||||||
# After 12 seconds passed, a peer should report an error and shut itself down due to being stall.
|
|
||||||
await asyncio.sleep(12)
|
|
||||||
self.close()
|
|
||||||
|
|
||||||
async def run(self):
|
|
||||||
try:
|
try:
|
||||||
self._reader, self._writer = await asyncio.open_connection(
|
self._reader, self._writer = await asyncio.open_connection(
|
||||||
self.__peer_addr[0], self.__peer_addr[1], loop=self._loop)
|
self.__peer_addr[0], self.__peer_addr[1], loop=loop)
|
||||||
# Send the BitTorrent handshake message (0x13 = 19 in decimal, the length of the handshake message)
|
# Send the BitTorrent handshake message (0x13 = 19 in decimal, the length of the handshake message)
|
||||||
self._writer.write(b"\x13BitTorrent protocol%s%s%s" % (
|
self._writer.write(b"\x13BitTorrent protocol%s%s%s" % (
|
||||||
b"\x00\x00\x00\x00\x00\x10\x00\x01",
|
b"\x00\x00\x00\x00\x00\x10\x00\x01",
|
||||||
@ -82,9 +73,7 @@ class DisposablePeer:
|
|||||||
message = await self._reader.readexactly(68)
|
message = await self._reader.readexactly(68)
|
||||||
if message[1:20] != b"BitTorrent protocol":
|
if message[1:20] != b"BitTorrent protocol":
|
||||||
# Erroneous handshake, possibly unknown version...
|
# Erroneous handshake, possibly unknown version...
|
||||||
logging.debug("Erroneous BitTorrent handshake! %s", message)
|
raise ProtocolError("Erroneous BitTorrent handshake! %s" % message)
|
||||||
self.close()
|
|
||||||
return
|
|
||||||
|
|
||||||
self.__on_bt_handshake(message)
|
self.__on_bt_handshake(message)
|
||||||
|
|
||||||
@ -94,16 +83,12 @@ class DisposablePeer:
|
|||||||
message = await self._reader.readexactly(length)
|
message = await self._reader.readexactly(length)
|
||||||
self.__on_message(message)
|
self.__on_message(message)
|
||||||
except Exception as ex:
|
except Exception as ex:
|
||||||
pass
|
logging.info("closing %s to %s", self.__info_hash.hex(), self.__peer_addr)
|
||||||
self.close()
|
|
||||||
|
|
||||||
def close(self):
|
|
||||||
if not self._metadata_future.done():
|
if not self._metadata_future.done():
|
||||||
self._metadata_future.set_result(None)
|
self._metadata_future.set_result(None)
|
||||||
if self._writer:
|
if self._writer:
|
||||||
self._writer.close()
|
self._writer.close()
|
||||||
if self._run_task:
|
return self._metadata_future.result()
|
||||||
self._run_task.cancel()
|
|
||||||
|
|
||||||
def __on_message(self, message: bytes) -> None:
|
def __on_message(self, message: bytes) -> None:
|
||||||
length = len(message)
|
length = len(message)
|
||||||
@ -169,21 +154,16 @@ class DisposablePeer:
|
|||||||
" {} max metadata size".format(self.__peer_addr[0],
|
" {} max metadata size".format(self.__peer_addr[0],
|
||||||
self.__peer_addr[1],
|
self.__peer_addr[1],
|
||||||
self.__max_metadata_size)
|
self.__max_metadata_size)
|
||||||
except KeyError:
|
|
||||||
self.close()
|
|
||||||
return
|
|
||||||
except AssertionError as e:
|
except AssertionError as e:
|
||||||
logging.debug(str(e))
|
logging.debug(str(e))
|
||||||
self.close()
|
raise
|
||||||
return
|
|
||||||
|
|
||||||
self.__ut_metadata = ut_metadata
|
self.__ut_metadata = ut_metadata
|
||||||
try:
|
try:
|
||||||
self.__metadata = bytearray(metadata_size)
|
self.__metadata = bytearray(metadata_size)
|
||||||
except MemoryError:
|
except MemoryError:
|
||||||
logging.exception("Could not allocate %.1f KiB for the metadata!", metadata_size / 1024)
|
logging.exception("Could not allocate %.1f KiB for the metadata!", metadata_size / 1024)
|
||||||
self.close()
|
raise
|
||||||
return
|
|
||||||
|
|
||||||
self.__metadata_size = metadata_size
|
self.__metadata_size = metadata_size
|
||||||
self.__ext_handshake_complete = True
|
self.__ext_handshake_complete = True
|
||||||
@ -222,13 +202,11 @@ class DisposablePeer:
|
|||||||
if hashlib.sha1(self.__metadata).digest() == self.__info_hash:
|
if hashlib.sha1(self.__metadata).digest() == self.__info_hash:
|
||||||
if not self._metadata_future.done():
|
if not self._metadata_future.done():
|
||||||
self._metadata_future.set_result((self.__info_hash, bytes(self.__metadata)))
|
self._metadata_future.set_result((self.__info_hash, bytes(self.__metadata)))
|
||||||
self.close()
|
|
||||||
else:
|
else:
|
||||||
logging.debug("Invalid Metadata! Ignoring.")
|
logging.debug("Invalid Metadata! Ignoring.")
|
||||||
|
|
||||||
elif msg_type == 2: # reject
|
elif msg_type == 2: # reject
|
||||||
logging.info("Peer rejected us.")
|
logging.info("Peer rejected us.")
|
||||||
self.close()
|
|
||||||
|
|
||||||
def __request_metadata_piece(self, piece: int) -> None:
|
def __request_metadata_piece(self, piece: int) -> None:
|
||||||
msg_dict_dump = bencode.dumps({
|
msg_dict_dump = bencode.dumps({
|
||||||
|
@ -9,3 +9,5 @@ PENDING_INFO_HASHES = 10 # threshold for pending info hashes before being commi
|
|||||||
TICK_INTERVAL = 1 # in seconds (soft constraint)
|
TICK_INTERVAL = 1 # in seconds (soft constraint)
|
||||||
# maximum (inclusive) number of active (disposable) peers to fetch the metadata per info hash at the same time:
|
# maximum (inclusive) number of active (disposable) peers to fetch the metadata per info hash at the same time:
|
||||||
MAX_ACTIVE_PEERS_PER_INFO_HASH = 5
|
MAX_ACTIVE_PEERS_PER_INFO_HASH = 5
|
||||||
|
|
||||||
|
PEER_TIMEOUT=12 # seconds
|
||||||
|
@ -22,7 +22,7 @@ import socket
|
|||||||
import typing
|
import typing
|
||||||
import os
|
import os
|
||||||
|
|
||||||
from .constants import BOOTSTRAPPING_NODES, DEFAULT_MAX_METADATA_SIZE, MAX_ACTIVE_PEERS_PER_INFO_HASH
|
from .constants import BOOTSTRAPPING_NODES, DEFAULT_MAX_METADATA_SIZE, MAX_ACTIVE_PEERS_PER_INFO_HASH, PEER_TIMEOUT
|
||||||
from . import bencode
|
from . import bencode
|
||||||
from . import bittorrent
|
from . import bittorrent
|
||||||
|
|
||||||
@ -102,7 +102,7 @@ class SybilNode:
|
|||||||
|
|
||||||
def shutdown(self) -> None:
|
def shutdown(self) -> None:
|
||||||
for peer in itertools.chain.from_iterable(self.__peers.values()):
|
for peer in itertools.chain.from_iterable(self.__peers.values()):
|
||||||
peer.close()
|
peer.cancel()
|
||||||
self._transport.close()
|
self._transport.close()
|
||||||
|
|
||||||
def __on_FIND_NODE_response(self, message: bencode.KRPCDict) -> None:
|
def __on_FIND_NODE_response(self, message: bencode.KRPCDict) -> None:
|
||||||
@ -182,18 +182,21 @@ class SybilNode:
|
|||||||
info_hash in self._complete_info_hashes:
|
info_hash in self._complete_info_hashes:
|
||||||
return
|
return
|
||||||
|
|
||||||
peer = bittorrent.fetch_metadata(info_hash, peer_addr, self.__max_metadata_size)
|
peer = self._loop.create_task(self._launch_fetch(info_hash, peer_addr))
|
||||||
self.__peers[info_hash].append(peer)
|
self.__peers[info_hash].append(peer)
|
||||||
self._loop.create_task(peer).add_done_callback(self.metadata_found)
|
|
||||||
|
|
||||||
def metadata_found(self, future):
|
async def _launch_fetch(self, info_hash, peer_addr):
|
||||||
r = future.result()
|
try:
|
||||||
|
f = bittorrent.fetch_metadata(info_hash, peer_addr, self.__max_metadata_size)
|
||||||
|
r = await asyncio.wait_for(f, timeout=PEER_TIMEOUT)
|
||||||
if r:
|
if r:
|
||||||
info_hash, metadata = r
|
info_hash, metadata = r
|
||||||
for peer in self.__peers[info_hash]:
|
for peer in self.__peers[info_hash]:
|
||||||
peer.close()
|
peer.cancel()
|
||||||
self._metadata_q.put_nowait(r)
|
|
||||||
self._complete_info_hashes.add(info_hash)
|
self._complete_info_hashes.add(info_hash)
|
||||||
|
await self._metadata_q.put(r)
|
||||||
|
except asyncio.TimeoutError:
|
||||||
|
pass
|
||||||
|
|
||||||
def __bootstrap(self) -> None:
|
def __bootstrap(self) -> None:
|
||||||
for addr in BOOTSTRAPPING_NODES:
|
for addr in BOOTSTRAPPING_NODES:
|
||||||
|
Loading…
Reference in New Issue
Block a user