presumably fixes #97 (UNIQUE constaint failed)

This commit is contained in:
Bora M. Alper 2017-07-05 09:44:37 +03:00
parent ecd04a84a0
commit b3e77e60b6
2 changed files with 12 additions and 21 deletions

View File

@ -31,17 +31,6 @@ from . import dht
from . import persistence from . import persistence
async def metadata_queue_watcher(database: persistence.Database, metadata_queue: asyncio.Queue) -> None:
"""
Watches for the metadata queue to commit any complete info hashes to the database.
"""
while True:
info_hash, metadata = await metadata_queue.get()
succeeded = database.add_metadata(info_hash, metadata)
if not succeeded:
logging.info("Corrupt metadata for %s! Ignoring.", info_hash.hex())
def parse_ip_port(netloc: str) -> typing.Optional[typing.Tuple[str, int]]: def parse_ip_port(netloc: str) -> typing.Optional[typing.Tuple[str, int]]:
# In case no port supplied # In case no port supplied
try: try:
@ -140,17 +129,14 @@ def main() -> int:
return 1 return 1
loop = asyncio.get_event_loop() loop = asyncio.get_event_loop()
node = dht.SybilNode(database.is_infohash_new, arguments.max_metadata_size) node = dht.SybilNode(database, arguments.max_metadata_size)
loop.create_task(node.launch(arguments.node_addr)) loop.create_task(node.launch(arguments.node_addr))
# mypy ignored: mypy doesn't know (yet) about coroutines
metadata_queue_watcher_task = loop.create_task(metadata_queue_watcher(database, node.metadata_q())) # type: ignore
try: try:
asyncio.get_event_loop().run_forever() asyncio.get_event_loop().run_forever()
except KeyboardInterrupt: except KeyboardInterrupt:
logging.critical("Keyboard interrupt received! Exiting gracefully...") logging.critical("Keyboard interrupt received! Exiting gracefully...")
finally: finally:
metadata_queue_watcher_task.cancel()
loop.run_until_complete(node.shutdown()) loop.run_until_complete(node.shutdown())
database.close() database.close()

View File

@ -23,6 +23,7 @@ import os
from .constants import BOOTSTRAPPING_NODES, MAX_ACTIVE_PEERS_PER_INFO_HASH, PEER_TIMEOUT, TICK_INTERVAL from .constants import BOOTSTRAPPING_NODES, MAX_ACTIVE_PEERS_PER_INFO_HASH, PEER_TIMEOUT, TICK_INTERVAL
from . import bencode from . import bencode
from . import bittorrent from . import bittorrent
from . import persistence
NodeID = bytes NodeID = bytes
NodeAddress = typing.Tuple[str, int] NodeAddress = typing.Tuple[str, int]
@ -32,7 +33,7 @@ Metadata = bytes
class SybilNode(asyncio.DatagramProtocol): class SybilNode(asyncio.DatagramProtocol):
def __init__(self, is_info_hash_new, max_metadata_size): def __init__(self, database: persistence.Database, max_metadata_size):
self.__true_id = os.urandom(20) self.__true_id = os.urandom(20)
self._routing_table = {} # type: typing.Dict[NodeID, NodeAddress] self._routing_table = {} # type: typing.Dict[NodeID, NodeAddress]
@ -42,7 +43,7 @@ class SybilNode(asyncio.DatagramProtocol):
# stop; but until then, the total number of neighbours might exceed the threshold). # stop; but until then, the total number of neighbours might exceed the threshold).
self.__n_max_neighbours = 2000 self.__n_max_neighbours = 2000
self.__parent_futures = {} # type: typing.Dict[InfoHash, asyncio.Future] self.__parent_futures = {} # type: typing.Dict[InfoHash, asyncio.Future]
self._is_info_hash_new = is_info_hash_new self.__database = database
self.__max_metadata_size = max_metadata_size self.__max_metadata_size = max_metadata_size
# Complete metadatas will be added to the queue, to be retrieved and committed to the database. # Complete metadatas will be added to the queue, to be retrieved and committed to the database.
self.__metadata_queue = asyncio.Queue() # typing.Collection[typing.Tuple[InfoHash, Metadata]] self.__metadata_queue = asyncio.Queue() # typing.Collection[typing.Tuple[InfoHash, Metadata]]
@ -289,13 +290,17 @@ class SybilNode(asyncio.DatagramProtocol):
parent_task.set_result(None) parent_task.set_result(None)
def _parent_task_done(self, parent_task, info_hash): def _parent_task_done(self, parent_task, info_hash):
del self.__parent_futures[info_hash]
try: try:
metadata = parent_task.result() metadata = parent_task.result()
if metadata: if not metadata:
self.__metadata_queue.put_nowait((info_hash, metadata)) return
except asyncio.CancelledError: except asyncio.CancelledError:
pass return
del self.__parent_futures[info_hash]
succeeded = self.__database.add_metadata(info_hash, metadata)
if not succeeded:
logging.info("Corrupt metadata for %s! Ignoring.", info_hash.hex())
async def __bootstrap(self) -> None: async def __bootstrap(self) -> None:
event_loop = asyncio.get_event_loop() event_loop = asyncio.get_event_loop()