Be a little smarter with task clean-up.
This commit is contained in:
parent
9b1bbfcaa1
commit
8df4015e06
@ -13,7 +13,6 @@
|
|||||||
# You should have received a copy of the GNU Affero General Public License along with this program. If not, see
|
# You should have received a copy of the GNU Affero General Public License along with this program. If not, see
|
||||||
# <http://www.gnu.org/licenses/>.
|
# <http://www.gnu.org/licenses/>.
|
||||||
import asyncio
|
import asyncio
|
||||||
import collections
|
|
||||||
import itertools
|
import itertools
|
||||||
import zlib
|
import zlib
|
||||||
import logging
|
import logging
|
||||||
@ -43,8 +42,7 @@ class SybilNode:
|
|||||||
# Maximum number of neighbours (this is a THRESHOLD where, once reached, the search for new neighbours will
|
# Maximum number of neighbours (this is a THRESHOLD where, once reached, the search for new neighbours will
|
||||||
# stop; but until then, the total number of neighbours might exceed the threshold).
|
# stop; but until then, the total number of neighbours might exceed the threshold).
|
||||||
self.__n_max_neighbours = 2000
|
self.__n_max_neighbours = 2000
|
||||||
self.__tasks = collections.defaultdict(
|
self.__tasks = {} # type: typing.Dict[dht.InfoHash, asyncio.Future]
|
||||||
set) # type: typing.DefaultDict[dht.InfoHash, typing.Set[asyncio.Task]]
|
|
||||||
self._complete_info_hashes = complete_info_hashes
|
self._complete_info_hashes = complete_info_hashes
|
||||||
self.__max_metadata_size = max_metadata_size
|
self.__max_metadata_size = max_metadata_size
|
||||||
self._metadata_q = asyncio.Queue()
|
self._metadata_q = asyncio.Queue()
|
||||||
@ -116,7 +114,7 @@ class SybilNode:
|
|||||||
self.__on_ANNOUNCE_PEER_query(message, addr)
|
self.__on_ANNOUNCE_PEER_query(message, addr)
|
||||||
|
|
||||||
async def shutdown(self) -> None:
|
async def shutdown(self) -> None:
|
||||||
futures = [task for task in itertools.chain.from_iterable(self.__tasks.values())]
|
futures = list(self.__tasks.values())
|
||||||
if self._tick_task:
|
if self._tick_task:
|
||||||
futures.append(self._tick_task)
|
futures.append(self._tick_task)
|
||||||
for future in futures:
|
for future in futures:
|
||||||
@ -197,23 +195,45 @@ class SybilNode:
|
|||||||
else:
|
else:
|
||||||
peer_addr = (addr[0], port)
|
peer_addr = (addr[0], port)
|
||||||
|
|
||||||
if info_hash in self._complete_info_hashes or \
|
if info_hash in self._complete_info_hashes:
|
||||||
len(self.__tasks[info_hash]) > MAX_ACTIVE_PEERS_PER_INFO_HASH:
|
|
||||||
return
|
return
|
||||||
task = self._loop.create_task(bittorrent.fetch_metadata(
|
|
||||||
info_hash, peer_addr, self.__max_metadata_size, timeout=PEER_TIMEOUT))
|
|
||||||
self.__tasks[info_hash].add(task)
|
|
||||||
task.add_done_callback(lambda f: self._got_result(task, info_hash))
|
|
||||||
|
|
||||||
def _got_result(self, task, info_hash):
|
# create the parent future
|
||||||
task_set = self.__tasks[info_hash]
|
if info_hash not in self.__tasks:
|
||||||
metadata = task.result()
|
parent_f = self._loop.create_future()
|
||||||
|
parent_f.child_count = 0
|
||||||
|
parent_f.add_done_callback(lambda f: self._parent_task_done(f, info_hash))
|
||||||
|
self.__tasks[info_hash] = parent_f
|
||||||
|
|
||||||
|
parent_f = self.__tasks[info_hash]
|
||||||
|
|
||||||
|
if parent_f.done():
|
||||||
|
return
|
||||||
|
if parent_f.child_count > MAX_ACTIVE_PEERS_PER_INFO_HASH:
|
||||||
|
return
|
||||||
|
|
||||||
|
task = asyncio.ensure_future(bittorrent.fetch_metadata(
|
||||||
|
info_hash, peer_addr, self.__max_metadata_size, timeout=PEER_TIMEOUT))
|
||||||
|
task.add_done_callback(lambda task: self._got_child_result(parent_f, task))
|
||||||
|
parent_f.child_count += 1
|
||||||
|
parent_f.add_done_callback(lambda f: task.cancel())
|
||||||
|
|
||||||
|
def _got_child_result(self, parent_task, child_task):
|
||||||
|
parent_task.child_count -= 1
|
||||||
|
try:
|
||||||
|
metadata = child_task.result()
|
||||||
|
if metadata and not parent_task.done():
|
||||||
|
parent_task.set_result(metadata)
|
||||||
|
except Exception:
|
||||||
|
logging.exception("child result is exception")
|
||||||
|
if parent_task.child_count <= 0 and not parent_task.done():
|
||||||
|
parent_task.set_result(None)
|
||||||
|
|
||||||
|
def _parent_task_done(self, parent_task, info_hash):
|
||||||
|
metadata = parent_task.result()
|
||||||
if metadata:
|
if metadata:
|
||||||
self._complete_info_hashes.add(info_hash)
|
self._complete_info_hashes.add(info_hash)
|
||||||
self._metadata_q.put_nowait((info_hash, metadata))
|
self._metadata_q.put_nowait((info_hash, metadata))
|
||||||
for task in task_set:
|
|
||||||
task.cancel()
|
|
||||||
if len(task_set) == 0:
|
|
||||||
del self.__tasks[info_hash]
|
del self.__tasks[info_hash]
|
||||||
|
|
||||||
async def __bootstrap(self) -> None:
|
async def __bootstrap(self) -> None:
|
||||||
|
Loading…
Reference in New Issue
Block a user