Tidy up clean-up. Simplify fetch_metadata.

This commit is contained in:
Richard Kiss 2017-05-24 14:50:26 -07:00
parent 8df4015e06
commit 4dc11b047f
2 changed files with 17 additions and 18 deletions

View File

@ -26,15 +26,10 @@ PeerAddress = typing.Tuple[str, int]
async def fetch_metadata(info_hash: InfoHash, peer_addr: PeerAddress, max_metadata_size, timeout=None): async def fetch_metadata(info_hash: InfoHash, peer_addr: PeerAddress, max_metadata_size, timeout=None):
loop = asyncio.get_event_loop()
task = asyncio.ensure_future(DisposablePeer().run(
asyncio.get_event_loop(), info_hash, peer_addr, max_metadata_size))
h = None
if timeout is not None:
h = loop.call_later(timeout, lambda: task.cancel())
try: try:
return await task return await asyncio.wait_for(DisposablePeer().run(
except asyncio.CancelledError: asyncio.get_event_loop(), info_hash, peer_addr, max_metadata_size), timeout=timeout)
except asyncio.TimeoutError:
return None return None

View File

@ -114,12 +114,11 @@ class SybilNode:
self.__on_ANNOUNCE_PEER_query(message, addr) self.__on_ANNOUNCE_PEER_query(message, addr)
async def shutdown(self) -> None: async def shutdown(self) -> None:
futures = list(self.__tasks.values()) tasks = list(self.__tasks.values())
if self._tick_task: for t in tasks:
futures.append(self._tick_task) t.set_result(None)
for future in futures: self._tick_task.cancel()
future.cancel() await asyncio.wait([self._tick_task])
await asyncio.wait(futures)
self._transport.close() self._transport.close()
def __on_FIND_NODE_response(self, message: bencode.KRPCDict) -> None: def __on_FIND_NODE_response(self, message: bencode.KRPCDict) -> None:
@ -224,16 +223,21 @@ class SybilNode:
metadata = child_task.result() metadata = child_task.result()
if metadata and not parent_task.done(): if metadata and not parent_task.done():
parent_task.set_result(metadata) parent_task.set_result(metadata)
except asyncio.CancelledError:
pass
except Exception: except Exception:
logging.exception("child result is exception") logging.exception("child result is exception")
if parent_task.child_count <= 0 and not parent_task.done(): if parent_task.child_count <= 0 and not parent_task.done():
parent_task.set_result(None) parent_task.set_result(None)
def _parent_task_done(self, parent_task, info_hash): def _parent_task_done(self, parent_task, info_hash):
metadata = parent_task.result() try:
if metadata: metadata = parent_task.result()
self._complete_info_hashes.add(info_hash) if metadata:
self._metadata_q.put_nowait((info_hash, metadata)) self._complete_info_hashes.add(info_hash)
self._metadata_q.put_nowait((info_hash, metadata))
except asyncio.CancelledError:
pass
del self.__tasks[info_hash] del self.__tasks[info_hash]
async def __bootstrap(self) -> None: async def __bootstrap(self) -> None: