Query DB when checking if an infohash is new or not.
This commit is contained in:
parent
d7ead951a4
commit
0e389aa619
@ -56,7 +56,7 @@ def create_tasks():
|
|||||||
complete_info_hashes = database.get_complete_info_hashes()
|
complete_info_hashes = database.get_complete_info_hashes()
|
||||||
|
|
||||||
loop = asyncio.get_event_loop()
|
loop = asyncio.get_event_loop()
|
||||||
node = dht.SybilNode(arguments.node_addr, complete_info_hashes, arguments.max_metadata_size)
|
node = dht.SybilNode(arguments.node_addr, database.is_infohash_new, arguments.max_metadata_size)
|
||||||
loop.create_task(node.launch(loop))
|
loop.create_task(node.launch(loop))
|
||||||
watch_q_task = loop.create_task(watch_q(database, node.metadata_q()))
|
watch_q_task = loop.create_task(watch_q(database, node.metadata_q()))
|
||||||
watch_q_task.add_done_callback(lambda x: clean_up(loop, database, node))
|
watch_q_task.add_done_callback(lambda x: clean_up(loop, database, node))
|
||||||
|
@ -31,7 +31,7 @@ InfoHash = bytes
|
|||||||
|
|
||||||
|
|
||||||
class SybilNode:
|
class SybilNode:
|
||||||
def __init__(self, address: typing.Tuple[str, int], complete_info_hashes, max_metadata_size):
|
def __init__(self, address: typing.Tuple[str, int], is_infohash_new, max_metadata_size):
|
||||||
self.__true_id = self.__random_bytes(20)
|
self.__true_id = self.__random_bytes(20)
|
||||||
|
|
||||||
self.__address = address
|
self.__address = address
|
||||||
@ -43,7 +43,7 @@ class SybilNode:
|
|||||||
# stop; but until then, the total number of neighbours might exceed the threshold).
|
# stop; but until then, the total number of neighbours might exceed the threshold).
|
||||||
self.__n_max_neighbours = 2000
|
self.__n_max_neighbours = 2000
|
||||||
self.__tasks = {} # type: typing.Dict[dht.InfoHash, asyncio.Future]
|
self.__tasks = {} # type: typing.Dict[dht.InfoHash, asyncio.Future]
|
||||||
self._complete_info_hashes = complete_info_hashes
|
self._is_inforhash_new = is_infohash_new
|
||||||
self.__max_metadata_size = max_metadata_size
|
self.__max_metadata_size = max_metadata_size
|
||||||
self._metadata_q = asyncio.Queue()
|
self._metadata_q = asyncio.Queue()
|
||||||
self._is_paused = False
|
self._is_paused = False
|
||||||
@ -200,7 +200,7 @@ class SybilNode:
|
|||||||
else:
|
else:
|
||||||
peer_addr = (addr[0], port)
|
peer_addr = (addr[0], port)
|
||||||
|
|
||||||
if info_hash in self._complete_info_hashes:
|
if not self._is_inforhash_new(info_hash):
|
||||||
return
|
return
|
||||||
|
|
||||||
# create the parent future
|
# create the parent future
|
||||||
@ -240,7 +240,6 @@ class SybilNode:
|
|||||||
try:
|
try:
|
||||||
metadata = parent_task.result()
|
metadata = parent_task.result()
|
||||||
if metadata:
|
if metadata:
|
||||||
self._complete_info_hashes.add(info_hash)
|
|
||||||
self._metadata_q.put_nowait((info_hash, metadata))
|
self._metadata_q.put_nowait((info_hash, metadata))
|
||||||
except asyncio.CancelledError:
|
except asyncio.CancelledError:
|
||||||
pass
|
pass
|
||||||
|
@ -94,11 +94,14 @@ class Database:
|
|||||||
|
|
||||||
return True
|
return True
|
||||||
|
|
||||||
def get_complete_info_hashes(self) -> typing.Set[bytes]:
|
def is_infohash_new(self, info_hash):
|
||||||
|
if info_hash in [x[0] for x in self.__pending_metadata]:
|
||||||
|
return False
|
||||||
cur = self.__db_conn.cursor()
|
cur = self.__db_conn.cursor()
|
||||||
try:
|
try:
|
||||||
cur.execute("SELECT info_hash FROM torrents;")
|
cur.execute("SELECT count(info_hash) FROM torrents where info_hash = ?;", [info_hash])
|
||||||
return set(x[0] for x in cur.fetchall())
|
x, = cur.fetchone()
|
||||||
|
return x == 0
|
||||||
finally:
|
finally:
|
||||||
cur.close()
|
cur.close()
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user