changed directory structure, bittorrent is still being refactored

This commit is contained in:
Bora M. Alper 2017-07-18 17:51:33 +03:00
parent e0241fe48c
commit 5ee325f8f5
16 changed files with 579 additions and 48 deletions

View File

@ -61,7 +61,7 @@ class DisposablePeer:
try: try:
self._reader, self._writer = await asyncio.open_connection(*self.__peer_addr, loop=event_loop) # type: ignore self._reader, self._writer = await asyncio.open_connection(*self.__peer_addr, loop=event_loop) # type: ignore
# Send the BitTorrent handshake message (0x13 = 19 in decimal, the length of the handshake message) # Send the BitTorrent initiate_the_bittorrent_handshake message (0x13 = 19 in decimal, the length of the initiate_the_bittorrent_handshake message)
self._writer.write(b"\x13BitTorrent protocol%s%s%s" % ( # type: ignore self._writer.write(b"\x13BitTorrent protocol%s%s%s" % ( # type: ignore
b"\x00\x00\x00\x00\x00\x10\x00\x01", b"\x00\x00\x00\x00\x00\x10\x00\x01",
self.__info_hash, self.__info_hash,
@ -70,13 +70,13 @@ class DisposablePeer:
# Honestly speaking, BitTorrent protocol might be one of the most poorly documented and (not the most but) # Honestly speaking, BitTorrent protocol might be one of the most poorly documented and (not the most but)
# badly designed protocols I have ever seen (I am 19 years old so what I could have seen?). # badly designed protocols I have ever seen (I am 19 years old so what I could have seen?).
# #
# Anyway, all the messages EXCEPT the handshake are length-prefixed by 4 bytes in network order, BUT the # Anyway, all the messages EXCEPT the initiate_the_bittorrent_handshake are length-prefixed by 4 bytes in network order, BUT the
# size of the handshake message is the 1-byte length prefix + 49 bytes, but luckily, there is only one # size of the initiate_the_bittorrent_handshake message is the 1-byte length prefix + 49 bytes, but luckily, there is only one
# canonical way of handshaking in the wild. # canonical way of handshaking in the wild.
message = await self._reader.readexactly(68) message = await self._reader.readexactly(68)
if message[1:20] != b"BitTorrent protocol": if message[1:20] != b"BitTorrent protocol":
# Erroneous handshake, possibly unknown version... # Erroneous initiate_the_bittorrent_handshake, possibly unknown version...
raise ProtocolError("Erroneous BitTorrent handshake! %s" % message) raise ProtocolError("Erroneous BitTorrent initiate_the_bittorrent_handshake! %s" % message)
self.__on_bt_handshake(message) self.__on_bt_handshake(message)
@ -114,7 +114,7 @@ class DisposablePeer:
self.__on_ext_message(message[2:]) self.__on_ext_message(message[2:])
def __on_bt_handshake(self, message: bytes) -> None: def __on_bt_handshake(self, message: bytes) -> None:
""" on BitTorrent Handshake... send the extension handshake! """ """ on BitTorrent Handshake... send the extension initiate_the_bittorrent_handshake! """
if message[25] != 16: if message[25] != 16:
logging.info("Peer does NOT support the extension protocol") logging.info("Peer does NOT support the extension protocol")
@ -125,7 +125,7 @@ class DisposablePeer:
}) })
# In case you cannot read hex: # In case you cannot read hex:
# 0x14 = 20 (BitTorrent ID indicating that it's an extended message) # 0x14 = 20 (BitTorrent ID indicating that it's an extended message)
# 0x00 = 0 (Extension ID indicating that it's the handshake message) # 0x00 = 0 (Extension ID indicating that it's the initiate_the_bittorrent_handshake message)
self._writer.write(b"%b\x14%s" % ( # type: ignore self._writer.write(b"%b\x14%s" % ( # type: ignore
(2 + len(msg_dict_dump)).to_bytes(4, "big"), (2 + len(msg_dict_dump)).to_bytes(4, "big"),
b'\0' + msg_dict_dump b'\0' + msg_dict_dump
@ -140,7 +140,7 @@ class DisposablePeer:
except bencode.BencodeDecodingError: except bencode.BencodeDecodingError:
# One might be tempted to close the connection, but why care? Any DisposableNode will be disposed # One might be tempted to close the connection, but why care? Any DisposableNode will be disposed
# automatically anyway (after a certain amount of time if the metadata is still not complete). # automatically anyway (after a certain amount of time if the metadata is still not complete).
logging.debug("Could NOT decode extension handshake message! %s", message[:200]) logging.debug("Could NOT decode extension initiate_the_bittorrent_handshake message! %s", message[:200])
return return
try: try:
@ -166,7 +166,7 @@ class DisposablePeer:
self.__metadata_size = metadata_size self.__metadata_size = metadata_size
self.__ext_handshake_complete = True self.__ext_handshake_complete = True
# After the handshake is complete, request all the pieces of metadata # After the initiate_the_bittorrent_handshake is complete, request all the pieces of metadata
n_pieces = math.ceil(self.__metadata_size / (2 ** 14)) n_pieces = math.ceil(self.__metadata_size / (2 ** 14))
for piece in range(n_pieces): for piece in range(n_pieces):
self.__request_metadata_piece(piece) self.__request_metadata_piece(piece)

View File

@ -0,0 +1,64 @@
# magneticod - Autonomous BitTorrent DHT crawler and metadata fetcher.
# Copyright (C) 2017 Mert Bora ALPER <bora@boramalper.org>
# Dedicated to Cemile Binay, in whose hands I thrived.
#
# This program is free software: you can redistribute it and/or modify it under the terms of the GNU Affero General
# Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any
# later version.
#
# This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied
# warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more
# details.
#
# You should have received a copy of the GNU Affero General Public License along with this program. If not, see
# <http://www.gnu.org/licenses/>.
import typing
import io
import better_bencode
Message = typing.Dict[bytes, typing.Any]
def encode(message: Message) -> bytes:
try:
return better_bencode.dumps(message)
except Exception as exc:
raise EncodeError(exc)
def decode(data: typing.ByteString) -> Message:
try:
return better_bencode.loads(data)
except Exception as exc:
raise DecodeError(exc)
def decode_prefix(data: typing.ByteString) -> typing.Tuple[Message, int]:
"""
Returns the bencoded object AND the index where the dump of the decoded object ends (exclusive). In less words:
dump = b"i12eOH YEAH"
object, i = decode_prefix(dump)
print(">>>", dump[i:]) # OUTPUT: >>> b'OH YEAH'
"""
bio = io.BytesIO(data)
try:
return better_bencode.load(bio), bio.tell()
except Exception as exc:
raise DecodeError(exc)
class BaseCodecError(Exception):
def __init__(self, original_exception: Exception):
self.original_exception = original_exception
class EncodeError(BaseCodecError):
pass
class DecodeError(BaseCodecError):
pass

View File

@ -12,8 +12,31 @@
# #
# You should have received a copy of the GNU Affero General Public License along with this program. If not, see # You should have received a copy of the GNU Affero General Public License along with this program. If not, see
# <http://www.gnu.org/licenses/>. # <http://www.gnu.org/licenses/>.
import asyncio
from .dht import mainline
from . import bittorrent
class InfoHashSink: class Coordinator:
def __init__(self): def __init__(self):
self._peer_service = mainline.service.PeerService()
self._metadata_service_tasks = {}
async def run(self):
await self._peer_service.launch(("0.0.0.0", 0))
# Private Functionality
# =====================
def _when_peer(self, info_hash: mainline.protocol.InfoHash, address: mainline.transport.Address) \
-> None:
if info_hash in self._metadata_service_tasks:
return
self._metadata_service_tasks[info_hash] = asyncio.ensure_future()
def _when_metadata(self, info_hash: mainline.protocol.InfoHash, address: mainline.transport.Address) -> None:
pass pass

View File

@ -1,28 +0,0 @@
# magneticod - Autonomous BitTorrent DHT crawler and metadata fetcher.
# Copyright (C) 2017 Mert Bora ALPER <bora@boramalper.org>
# Dedicated to Cemile Binay, in whose hands I thrived.
#
# This program is free software: you can redistribute it and/or modify it under the terms of the GNU Affero General
# Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any
# later version.
#
# This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied
# warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more
# details.
#
# You should have received a copy of the GNU Affero General Public License along with this program. If not, see
# <http://www.gnu.org/licenses/>.
def encode():
pass
def decode(data):
pass
class EncodeError(Exception):
pass
class DecodeError(Exception):
pass

View File

@ -0,0 +1,296 @@
# magneticod - Autonomous BitTorrent DHT crawler and metadata fetcher.
# Copyright (C) 2017 Mert Bora ALPER <bora@boramalper.org>
# Dedicated to Cemile Binay, in whose hands I thrived.
#
# This program is free software: you can redistribute it and/or modify it under the terms of the GNU Affero General
# Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any
# later version.
#
# This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied
# warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more
# details.
#
# You should have received a copy of the GNU Affero General Public License along with this program. If not, see
# <http://www.gnu.org/licenses/>.
import enum
import typing
import cerberus
from magneticod import types
from ..codecs import bencode
from .. import transports
class Protocol:
def __init__(self):
self._transport = transports.bittorrent.TCPTransport()
self._transport.on_the_bittorrent_handshake_completed = self._when_the_bittorrent_handshake_completed
self._transport.on_message = self._when_message
self._transport.on_keepalive = self._when_keepalive
# When we initiate extension handshake, we set the keys of this dictionary with the ExtensionType's we show
# interest in, and if the remote peer is also interested in them, we record which type code the remote peer
# prefers for them in this dictionary, but until then, the keys have the None value. If our interest for an
# ExtensionType is not mutual, we remove the key from the dictionary.
self._enabled_extensions = {} # typing.Dict[ExtensionType, typing.Optional[int]]
async def launch(self) -> None:
await self._transport.launch()
# Offered Functionality
# =====================
def initiate_the_bittorrent_handshake(self, reserved: bytes, info_hash: types.InfoHash, peer_id: types.PeerID) \
-> None:
self._transport.initiate_the_bittorrent_handshake(reserved, info_hash, peer_id)
def send_keepalive(self) -> None:
pass
def send_choke_message(self) -> None:
raise NotImplementedError()
def send_unchoke_message(self) -> None:
raise NotImplementedError()
def send_interested_message(self) -> None:
raise NotImplementedError()
def send_not_interested_message(self) -> None:
raise NotImplementedError()
def send_have_message(self) -> None:
raise NotImplementedError()
def send_bitfield_message(self) -> None:
raise NotImplementedError()
def send_request_message(self) -> None:
raise NotImplementedError()
def send_piece_message(self) -> None:
raise NotImplementedError()
def send_cancel_message(self) -> None:
raise NotImplementedError()
def send_extension_handshake(
self,
supported_extensions: typing.Set[ExtensionType],
local_port: int=-1,
name_and_version: str="magneticod 0.x.x"
) -> None:
pass
@staticmethod
def on_the_bittorrent_handshake_completed(reserved: bytes, info_hash: types.InfoHash, peer_id: types.PeerID) \
-> None:
pass
@staticmethod
def on_keepalive() -> None:
pass
@staticmethod
def on_choke_message() -> None:
pass
@staticmethod
def on_unchoke_message() -> None:
pass
@staticmethod
def on_interested_message() -> None:
pass
@staticmethod
def on_not_interested_message() -> None:
pass
@staticmethod
def on_have_message(index: int) -> None:
pass
@staticmethod
def on_bitfield_message() -> None:
raise NotImplementedError()
@staticmethod
def on_request_message(index: int, begin: int, length: int) -> None:
pass
@staticmethod
def on_piece_message(index: int, begin: int, piece: int) -> None:
pass
@staticmethod
def on_cancel_message(index: int, begin: int, length: int) -> None:
pass
@staticmethod
def on_extension_handshake_completed(payload: types.Dictionary) -> None:
pass
# Private Functionality
# =====================
def _when_the_bittorrent_handshake_completed(
self,
reserved: bytes,
info_hash: types.InfoHash,
peer_id: types.PeerID
) -> None:
self.on_the_bittorrent_handshake_completed(reserved, info_hash, peer_id)
def _when_keepalive(self) -> None:
self.on_keepalive()
def _when_message(self, type_: bytes, payload: typing.ByteString) -> None:
if type_ == MessageTypes.CHOKE:
self.on_choke_message()
elif type_ == MessageTypes.UNCHOKE:
self.on_unchoke_message()
elif type_ == MessageTypes.INTERESTED:
self.on_interested_message()
elif type_ == MessageTypes.NOT_INTERESTED:
self.on_not_interested_message()
elif type_ == MessageTypes.HAVE:
index = int.from_bytes(payload[:4], "big")
self.on_have_message(index)
elif type_ == MessageTypes.BITFIELD:
raise NotImplementedError()
elif type_ == MessageTypes.REQUEST:
index = int.from_bytes(payload[:4], "big")
begin = int.from_bytes(payload[4:8], "big")
length = int.from_bytes(payload[8:12], "big")
self.on_request_message(index, begin, length)
elif type_ == MessageTypes.PIECE:
index = int.from_bytes(payload[:4], "big")
begin = int.from_bytes(payload[4:8], "big")
piece = int.from_bytes(payload[8:12], "big")
self.on_piece_message(index, begin, piece)
elif type_ == MessageTypes.CANCEL:
index = int.from_bytes(payload[:4], "big")
begin = int.from_bytes(payload[4:8], "big")
length = int.from_bytes(payload[8:12], "big")
self.on_cancel_message(index, begin, length)
elif type_ == MessageTypes.EXTENDED:
self._when_extended_message(type_=payload[:1], payload=payload[1:])
else:
pass
def _when_extended_message(self, type_: bytes, payload: typing.ByteString) -> None:
if type_ == 0:
self._when_extension_handshake(payload)
elif type_ == ExtensionType.UT_METADATA.value:
pass
else:
pass
def _when_extension_handshake(self, payload: typing.ByteString) -> None:
dictionary_schema = {
b"m": {
"type": "dict",
"keyschema": {"type": "binary", "empty": False},
"valueschema": {"type": "integer", "min": 0},
"required": True,
},
b"p": {
"type": "integer",
"min": 1,
"max": 2**16 - 1,
"required": False
},
b"v": {
"type": "binary",
"empty": False,
"required": False
},
b"yourip": {
"type": "binary",
# It's actually EITHER 4 OR 16, not anything in-between. We need to validate this ourselves!
"minlength": 4,
"maxlength": 16,
"required": False
},
b"ipv6": {
"type": "binary",
"minlength": 16,
"maxlength": 16,
"required": False
},
b"ipv4": {
"type": "binary",
"minlength": 4,
"maxlength": 4,
"required": False
},
b"reqq": {
"type": "integer",
"min": 0,
"required": False
}
}
try:
dictionary = bencode.decode(payload)
except bencode.DecodeError:
return
if not cerberus.Validator(dictionary_schema).validate(dictionary):
return
# Check which extensions, that we show interest in, are enabled by the remote peer.
if ExtensionType.UT_METADATA in self._enabled_extensions and b"ut_metadata" in dictionary[b"m"]:
self._enabled_extensions[ExtensionType.UT_METADATA] = dictionary[b"m"][b"ut_metadata"]
# As there can be multiple SUBSEQUENT extension-handshake-messages, check for the existence of b"metadata_size"
# in the top level dictionary ONLY IF b"ut_metadata" exists in b"m". `ut_metadata` might be enabled before,
# and other subsequent extension-handshake-messages do not need to include 'metadata_size` field in the top
# level dictionary whilst enabling-and/or-disabling other extension features.
if (b"ut_metadata" in dictionary[b"m"]) ^ (b"metadata_size" not in dictionary):
return
self.on_extension_handshake_completed(dictionary)
@enum.unique
class MessageTypes(enum.IntEnum):
CHOKE = 0
UNCHOKE = 1
INTERESTED = 2
NOT_INTERESTED = 3
HAVE = 4
BITFIELD = 5
REQUEST = 6
PIECE = 7
CANCEL = 8
EXTENDED = 20
@enum.unique
class ReservedFeature(enum.Enum):
# What do values mean?
# The first number is the offset, and the second number is the bit to set. For instance,
# EXTENSION_PROTOCOL = (5, 0x10) means that reserved[5] & 0x10 should be true.
DHT = (7, 0x01)
EXTENSION_PROTOCOL = (5, 0x10)
def reserved_feature_set_to_reserved(reserved_feature_set: typing.Set[ReservedFeature]) -> bytes:
reserved = 8 * b"\x00"
for reserved_feature in reserved_feature_set:
reserved[reserved_feature.value[0]] |= reserved_feature.value[1]
return reserved
def reserved_to_reserved_feature_set(reserved: bytes) -> typing.Set[ReservedFeature]:
return {
reserved_feature
for reserved_feature in ReservedFeature
if reserved[reserved_feature.value[0]] & reserved_feature.value[1]
}
@enum.unique
class ExtensionType(enum.IntEnum):
UT_METADATA = 1

View File

@ -0,0 +1,63 @@
# magneticod - Autonomous BitTorrent DHT crawler and metadata fetcher.
# Copyright (C) 2017 Mert Bora ALPER <bora@boramalper.org>
# Dedicated to Cemile Binay, in whose hands I thrived.
#
# This program is free software: you can redistribute it and/or modify it under the terms of the GNU Affero General
# Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any
# later version.
#
# This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied
# warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more
# details.
#
# You should have received a copy of the GNU Affero General Public License along with this program. If not, see
# <http://www.gnu.org/licenses/>.
import logging
from ..protocols import bittorrent as bt_protocol
from magneticod import types
class MetadataService:
def __init__(self, peer_id: types.PeerID, info_hash: types.InfoHash):
self._protocol = bt_protocol.Protocol()
self._protocol.on_the_bittorrent_handshake_completed = self._when_the_bittorrent_handshake_completed
self._protocol.on_extension_handshake_completed = self._when_extension_handshake_completed
self._peer_id = peer_id
self._info_hash = info_hash
async def launch(self) -> None:
await self._protocol.launch()
self._protocol.initiate_the_bittorrent_handshake(
bt_protocol.reserved_feature_set_to_reserved({
bt_protocol.ReservedFeature.EXTENSION_PROTOCOL,
bt_protocol.ReservedFeature.DHT
}),
self._info_hash,
self._peer_id
)
# Offered Functionality
# =====================
@staticmethod
def on_fatal_failure() -> None:
pass
# Private Functionality
# =====================
def _when_the_bittorrent_handshake_completed(
self,
reserved: bytes,
info_hash: types.InfoHash,
peer_id: types.PeerID
) -> None:
if bt_protocol.ReservedFeature.EXTENSION_PROTOCOL not in bt_protocol.reserved_to_reserved_feature_set(reserved):
logging.info("Peer does NOT support the extension protocol.")
self.on_fatal_failure()
self._protocol.send_extension_handshake({bt_protocol.ExtensionType.UT_METADATA})
def _when_extension_handshake_completed(self, payload: types.Dictionary) -> None:
pass

View File

@ -22,7 +22,7 @@ from magneticod import constants
from . import protocol from . import protocol
class TrawlingService: class PeerService:
def __init__(self): def __init__(self):
self._protocol = protocol.Protocol(b"mc00") self._protocol = protocol.Protocol(b"mc00")
@ -34,18 +34,21 @@ class TrawlingService:
self._token_secret = os.urandom(4) self._token_secret = os.urandom(4)
self._routing_table = {} # typing.Dict[protocol.NodeID, protocol.transport.Address] self._routing_table = {} # typing.Dict[protocol.NodeID, protocol.transport.Address]
self._tick_task = None
async def launch(self, address: protocol.transport.Address): async def launch(self, address: protocol.transport.Address):
await self._protocol.launch(address) await self._protocol.launch(address)
self._tick_task = asyncio.ensure_future(self._tick_periodically())
# Offered Functionality # Offered Functionality
# ===================== # =====================
@staticmethod @staticmethod
def on_info_hash_and_peer(info_hash: protocol.InfoHash, address: protocol.transport.Address) -> None: def on_peer(info_hash: protocol.InfoHash, address: protocol.transport.Address) -> None:
pass pass
# Private Functionality # Private Functionality
# ===================== # =====================
async def tick_periodically(self) -> None: async def _tick_periodically(self) -> None:
while True: while True:
if not self._routing_table: if not self._routing_table:
await self._bootstrap() await self._bootstrap()

View File

@ -0,0 +1,102 @@
# magneticod - Autonomous BitTorrent DHT crawler and metadata fetcher.
# Copyright (C) 2017 Mert Bora ALPER <bora@boramalper.org>
# Dedicated to Cemile Binay, in whose hands I thrived.
#
# This program is free software: you can redistribute it and/or modify it under the terms of the GNU Affero General
# Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any
# later version.
#
# This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied
# warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more
# details.
#
# You should have received a copy of the GNU Affero General Public License along with this program. If not, see
# <http://www.gnu.org/licenses/>.
import asyncio
import typing
from magneticod import types
class TCPTransport(asyncio.Protocol):
def __init__(self):
self._stream_transport = asyncio.Transport()
self._incoming = bytearray()
self._awaiting_the_bittorrent_handshake = True
async def launch(self):
await asyncio.get_event_loop().create_connection(lambda: self, "0.0.0.0", 0)
# Offered Functionality
# =====================
def initiate_the_bittorrent_handshake(self, reserved: bytes, info_hash: types.InfoHash, peer_id: types.PeerID) -> None:
self._stream_transport.write(b"\x13BitTorrent protocol%s%s%s" % (
reserved,
info_hash,
peer_id
))
def send_keepalive(self) -> None:
self._stream_transport.write(b"\x00\x00\x00\x00")
def send_message(self, type_: bytes, payload: typing.ByteString) -> None:
if len(type_) != 1:
raise ValueError("Argument `type_` must be a single byte!")
length = 1 + len(payload)
@staticmethod
def on_keepalive() -> None:
pass
@staticmethod
def on_message(type_: bytes, payload: typing.ByteString) -> None:
pass
@staticmethod
def on_the_bittorrent_handshake_completed(
reserved: typing.ByteString,
info_hash: types.InfoHash,
peer_id: types.PeerID
) -> None:
pass
# Private Functionality
# =====================
def connection_made(self, transport: asyncio.Transport) -> None:
self._stream_transport = transport
def data_received(self, data: typing.ByteString) -> None:
self._incoming += data
if self._awaiting_the_bittorrent_handshake:
if len(self._incoming) >= 68:
assert self._incoming.startswith(b"\x13BitTorrent protocol")
self.on_the_bittorrent_handshake_completed(
reserved=self._incoming[20:28],
info_hash=self._incoming[28:48],
peer_id=self._incoming[48:68]
)
self._incoming = self._incoming[68:]
self._awaiting_the_bittorrent_handshake = False
else:
return
# Continue or Start the "usual" processing from here below
if len(self._incoming) >= 4 and len(self._incoming) - 1 >= int.from_bytes(self._incoming[:4], "big"):
if int.from_bytes(self._incoming[:4], "big"):
self.on_keepalive()
else:
self.on_message(self._incoming[4], self._incoming[5:])
def eof_received(self):
pass
def connection_lost(self, exc: Exception) -> None:
pass
class UTPTransport:
pass

View File

@ -24,8 +24,8 @@ from . import codec
Address = typing.Tuple[str, int] Address = typing.Tuple[str, int]
MessageQueueEntry = typing.NamedTuple("MessageQueueEntry", [ MessageQueueEntry = typing.NamedTuple("MessageQueueEntry", [
("queued_on", float), ("queued_on", int),
("message", bytes), ("message", codec.Message),
("address", Address) ("address", Address)
]) ])
@ -50,13 +50,13 @@ class Transport(asyncio.DatagramProtocol):
# Offered Functionality # Offered Functionality
# ===================== # =====================
def send_message(self, message, address: Address) -> None: def send_message(self, message: codec.Message, address: Address) -> None:
self._message_queue.append(MessageQueueEntry(time.monotonic(), message, address)) self._message_queue.append(MessageQueueEntry(int(time.monotonic()), message, address))
if not self._queue_nonempty.is_set(): if not self._queue_nonempty.is_set():
self._queue_nonempty.set() self._queue_nonempty.set()
@staticmethod @staticmethod
def on_message(message: dict, address: Address): def on_message(message: codec.Message, address: Address):
pass pass
# Private Functionality # Private Functionality
@ -73,7 +73,7 @@ class Transport(asyncio.DatagramProtocol):
try: try:
message = codec.decode(data) message = codec.decode(data)
except codec.EncodeError: except codec.DecodeError:
return return
if not isinstance(message, dict): if not isinstance(message, dict):
@ -111,4 +111,4 @@ class Transport(asyncio.DatagramProtocol):
if time.monotonic() - queued_on > 60: if time.monotonic() - queued_on > 60:
return return
self._datagram_transport.sendto(message, address) self._datagram_transport.sendto(codec.encode(message), address)

View File

@ -0,0 +1,8 @@
import typing
InfoHash = typing.ByteString
NodeID = typing.ByteString
PeerID = typing.ByteString
IPAddress = typing.Tuple[str, int]
Dictionary = typing.Dict[bytes, typing.Any]