diff --git a/magneticod/magneticod/bittorrent.py b/magneticod/magneticod/bittorrent.py index 083553f..49c97d0 100644 --- a/magneticod/magneticod/bittorrent.py +++ b/magneticod/magneticod/bittorrent.py @@ -61,7 +61,7 @@ class DisposablePeer: try: self._reader, self._writer = await asyncio.open_connection(*self.__peer_addr, loop=event_loop) # type: ignore - # Send the BitTorrent handshake message (0x13 = 19 in decimal, the length of the handshake message) + # Send the BitTorrent initiate_the_bittorrent_handshake message (0x13 = 19 in decimal, the length of the initiate_the_bittorrent_handshake message) self._writer.write(b"\x13BitTorrent protocol%s%s%s" % ( # type: ignore b"\x00\x00\x00\x00\x00\x10\x00\x01", self.__info_hash, @@ -70,13 +70,13 @@ class DisposablePeer: # Honestly speaking, BitTorrent protocol might be one of the most poorly documented and (not the most but) # badly designed protocols I have ever seen (I am 19 years old so what I could have seen?). # - # Anyway, all the messages EXCEPT the handshake are length-prefixed by 4 bytes in network order, BUT the - # size of the handshake message is the 1-byte length prefix + 49 bytes, but luckily, there is only one + # Anyway, all the messages EXCEPT the initiate_the_bittorrent_handshake are length-prefixed by 4 bytes in network order, BUT the + # size of the initiate_the_bittorrent_handshake message is the 1-byte length prefix + 49 bytes, but luckily, there is only one # canonical way of handshaking in the wild. message = await self._reader.readexactly(68) if message[1:20] != b"BitTorrent protocol": - # Erroneous handshake, possibly unknown version... - raise ProtocolError("Erroneous BitTorrent handshake! %s" % message) + # Erroneous initiate_the_bittorrent_handshake, possibly unknown version... + raise ProtocolError("Erroneous BitTorrent initiate_the_bittorrent_handshake! %s" % message) self.__on_bt_handshake(message) @@ -114,7 +114,7 @@ class DisposablePeer: self.__on_ext_message(message[2:]) def __on_bt_handshake(self, message: bytes) -> None: - """ on BitTorrent Handshake... send the extension handshake! """ + """ on BitTorrent Handshake... send the extension initiate_the_bittorrent_handshake! """ if message[25] != 16: logging.info("Peer does NOT support the extension protocol") @@ -125,7 +125,7 @@ class DisposablePeer: }) # In case you cannot read hex: # 0x14 = 20 (BitTorrent ID indicating that it's an extended message) - # 0x00 = 0 (Extension ID indicating that it's the handshake message) + # 0x00 = 0 (Extension ID indicating that it's the initiate_the_bittorrent_handshake message) self._writer.write(b"%b\x14%s" % ( # type: ignore (2 + len(msg_dict_dump)).to_bytes(4, "big"), b'\0' + msg_dict_dump @@ -140,7 +140,7 @@ class DisposablePeer: except bencode.BencodeDecodingError: # One might be tempted to close the connection, but why care? Any DisposableNode will be disposed # automatically anyway (after a certain amount of time if the metadata is still not complete). - logging.debug("Could NOT decode extension handshake message! %s", message[:200]) + logging.debug("Could NOT decode extension initiate_the_bittorrent_handshake message! %s", message[:200]) return try: @@ -166,7 +166,7 @@ class DisposablePeer: self.__metadata_size = metadata_size self.__ext_handshake_complete = True - # After the handshake is complete, request all the pieces of metadata + # After the initiate_the_bittorrent_handshake is complete, request all the pieces of metadata n_pieces = math.ceil(self.__metadata_size / (2 ** 14)) for piece in range(n_pieces): self.__request_metadata_piece(piece) diff --git a/magneticod/magneticod/bittorrent/codec.py b/magneticod/magneticod/bittorrent/codec.py deleted file mode 100644 index e69de29..0000000 diff --git a/magneticod/magneticod/bittorrent/protocol.py b/magneticod/magneticod/bittorrent/protocol.py deleted file mode 100644 index e69de29..0000000 diff --git a/magneticod/magneticod/bittorrent/service.py b/magneticod/magneticod/bittorrent/service.py deleted file mode 100644 index e69de29..0000000 diff --git a/magneticod/magneticod/bittorrent/transport.py b/magneticod/magneticod/bittorrent/transport.py deleted file mode 100644 index e69de29..0000000 diff --git a/magneticod/magneticod/codecs/bencode.py b/magneticod/magneticod/codecs/bencode.py new file mode 100644 index 0000000..6e123df --- /dev/null +++ b/magneticod/magneticod/codecs/bencode.py @@ -0,0 +1,64 @@ +# magneticod - Autonomous BitTorrent DHT crawler and metadata fetcher. +# Copyright (C) 2017 Mert Bora ALPER +# Dedicated to Cemile Binay, in whose hands I thrived. +# +# This program is free software: you can redistribute it and/or modify it under the terms of the GNU Affero General +# Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any +# later version. +# +# This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied +# warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more +# details. +# +# You should have received a copy of the GNU Affero General Public License along with this program. If not, see +# . +import typing +import io + +import better_bencode + + +Message = typing.Dict[bytes, typing.Any] + + +def encode(message: Message) -> bytes: + try: + return better_bencode.dumps(message) + except Exception as exc: + raise EncodeError(exc) + + +def decode(data: typing.ByteString) -> Message: + try: + return better_bencode.loads(data) + except Exception as exc: + raise DecodeError(exc) + + +def decode_prefix(data: typing.ByteString) -> typing.Tuple[Message, int]: + """ + Returns the bencoded object AND the index where the dump of the decoded object ends (exclusive). In less words: + + dump = b"i12eOH YEAH" + object, i = decode_prefix(dump) + print(">>>", dump[i:]) # OUTPUT: >>> b'OH YEAH' + """ + bio = io.BytesIO(data) + try: + return better_bencode.load(bio), bio.tell() + except Exception as exc: + raise DecodeError(exc) + + +class BaseCodecError(Exception): + def __init__(self, original_exception: Exception): + self.original_exception = original_exception + + +class EncodeError(BaseCodecError): + pass + + +class DecodeError(BaseCodecError): + pass + diff --git a/magneticod/magneticod/dht/sink.py b/magneticod/magneticod/coordinator.py similarity index 55% rename from magneticod/magneticod/dht/sink.py rename to magneticod/magneticod/coordinator.py index 2896666..b2a6826 100644 --- a/magneticod/magneticod/dht/sink.py +++ b/magneticod/magneticod/coordinator.py @@ -12,8 +12,31 @@ # # You should have received a copy of the GNU Affero General Public License along with this program. If not, see # . +import asyncio + +from .dht import mainline +from . import bittorrent -class InfoHashSink: +class Coordinator: def __init__(self): + self._peer_service = mainline.service.PeerService() + + self._metadata_service_tasks = {} + + async def run(self): + await self._peer_service.launch(("0.0.0.0", 0)) + + # Private Functionality + # ===================== + def _when_peer(self, info_hash: mainline.protocol.InfoHash, address: mainline.transport.Address) \ + -> None: + if info_hash in self._metadata_service_tasks: + return + + self._metadata_service_tasks[info_hash] = asyncio.ensure_future() + + + + def _when_metadata(self, info_hash: mainline.protocol.InfoHash, address: mainline.transport.Address) -> None: pass diff --git a/magneticod/magneticod/dht/mainline/__init__.py b/magneticod/magneticod/dht/mainline/__init__.py deleted file mode 100644 index e69de29..0000000 diff --git a/magneticod/magneticod/dht/mainline/codec.py b/magneticod/magneticod/dht/mainline/codec.py deleted file mode 100644 index 709e1e7..0000000 --- a/magneticod/magneticod/dht/mainline/codec.py +++ /dev/null @@ -1,28 +0,0 @@ -# magneticod - Autonomous BitTorrent DHT crawler and metadata fetcher. -# Copyright (C) 2017 Mert Bora ALPER -# Dedicated to Cemile Binay, in whose hands I thrived. -# -# This program is free software: you can redistribute it and/or modify it under the terms of the GNU Affero General -# Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any -# later version. -# -# This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied -# warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more -# details. -# -# You should have received a copy of the GNU Affero General Public License along with this program. If not, see -# . -def encode(): - pass - - -def decode(data): - pass - - -class EncodeError(Exception): - pass - - -class DecodeError(Exception): - pass diff --git a/magneticod/magneticod/protocols/bittorrent.py b/magneticod/magneticod/protocols/bittorrent.py new file mode 100644 index 0000000..aff5f2a --- /dev/null +++ b/magneticod/magneticod/protocols/bittorrent.py @@ -0,0 +1,296 @@ +# magneticod - Autonomous BitTorrent DHT crawler and metadata fetcher. +# Copyright (C) 2017 Mert Bora ALPER +# Dedicated to Cemile Binay, in whose hands I thrived. +# +# This program is free software: you can redistribute it and/or modify it under the terms of the GNU Affero General +# Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any +# later version. +# +# This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied +# warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more +# details. +# +# You should have received a copy of the GNU Affero General Public License along with this program. If not, see +# . +import enum +import typing + +import cerberus + +from magneticod import types +from ..codecs import bencode +from .. import transports + + +class Protocol: + def __init__(self): + self._transport = transports.bittorrent.TCPTransport() + + self._transport.on_the_bittorrent_handshake_completed = self._when_the_bittorrent_handshake_completed + self._transport.on_message = self._when_message + self._transport.on_keepalive = self._when_keepalive + + # When we initiate extension handshake, we set the keys of this dictionary with the ExtensionType's we show + # interest in, and if the remote peer is also interested in them, we record which type code the remote peer + # prefers for them in this dictionary, but until then, the keys have the None value. If our interest for an + # ExtensionType is not mutual, we remove the key from the dictionary. + self._enabled_extensions = {} # typing.Dict[ExtensionType, typing.Optional[int]] + + async def launch(self) -> None: + await self._transport.launch() + + # Offered Functionality + # ===================== + def initiate_the_bittorrent_handshake(self, reserved: bytes, info_hash: types.InfoHash, peer_id: types.PeerID) \ + -> None: + self._transport.initiate_the_bittorrent_handshake(reserved, info_hash, peer_id) + + def send_keepalive(self) -> None: + pass + + def send_choke_message(self) -> None: + raise NotImplementedError() + + def send_unchoke_message(self) -> None: + raise NotImplementedError() + + def send_interested_message(self) -> None: + raise NotImplementedError() + + def send_not_interested_message(self) -> None: + raise NotImplementedError() + + def send_have_message(self) -> None: + raise NotImplementedError() + + def send_bitfield_message(self) -> None: + raise NotImplementedError() + + def send_request_message(self) -> None: + raise NotImplementedError() + + def send_piece_message(self) -> None: + raise NotImplementedError() + + def send_cancel_message(self) -> None: + raise NotImplementedError() + + def send_extension_handshake( + self, + supported_extensions: typing.Set[ExtensionType], + local_port: int=-1, + name_and_version: str="magneticod 0.x.x" + ) -> None: + pass + + @staticmethod + def on_the_bittorrent_handshake_completed(reserved: bytes, info_hash: types.InfoHash, peer_id: types.PeerID) \ + -> None: + pass + + @staticmethod + def on_keepalive() -> None: + pass + + @staticmethod + def on_choke_message() -> None: + pass + + @staticmethod + def on_unchoke_message() -> None: + pass + + @staticmethod + def on_interested_message() -> None: + pass + + @staticmethod + def on_not_interested_message() -> None: + pass + + @staticmethod + def on_have_message(index: int) -> None: + pass + + @staticmethod + def on_bitfield_message() -> None: + raise NotImplementedError() + + @staticmethod + def on_request_message(index: int, begin: int, length: int) -> None: + pass + + @staticmethod + def on_piece_message(index: int, begin: int, piece: int) -> None: + pass + + @staticmethod + def on_cancel_message(index: int, begin: int, length: int) -> None: + pass + + @staticmethod + def on_extension_handshake_completed(payload: types.Dictionary) -> None: + pass + + # Private Functionality + # ===================== + def _when_the_bittorrent_handshake_completed( + self, + reserved: bytes, + info_hash: types.InfoHash, + peer_id: types.PeerID + ) -> None: + self.on_the_bittorrent_handshake_completed(reserved, info_hash, peer_id) + + def _when_keepalive(self) -> None: + self.on_keepalive() + + def _when_message(self, type_: bytes, payload: typing.ByteString) -> None: + if type_ == MessageTypes.CHOKE: + self.on_choke_message() + elif type_ == MessageTypes.UNCHOKE: + self.on_unchoke_message() + elif type_ == MessageTypes.INTERESTED: + self.on_interested_message() + elif type_ == MessageTypes.NOT_INTERESTED: + self.on_not_interested_message() + elif type_ == MessageTypes.HAVE: + index = int.from_bytes(payload[:4], "big") + self.on_have_message(index) + elif type_ == MessageTypes.BITFIELD: + raise NotImplementedError() + elif type_ == MessageTypes.REQUEST: + index = int.from_bytes(payload[:4], "big") + begin = int.from_bytes(payload[4:8], "big") + length = int.from_bytes(payload[8:12], "big") + self.on_request_message(index, begin, length) + elif type_ == MessageTypes.PIECE: + index = int.from_bytes(payload[:4], "big") + begin = int.from_bytes(payload[4:8], "big") + piece = int.from_bytes(payload[8:12], "big") + self.on_piece_message(index, begin, piece) + elif type_ == MessageTypes.CANCEL: + index = int.from_bytes(payload[:4], "big") + begin = int.from_bytes(payload[4:8], "big") + length = int.from_bytes(payload[8:12], "big") + self.on_cancel_message(index, begin, length) + elif type_ == MessageTypes.EXTENDED: + self._when_extended_message(type_=payload[:1], payload=payload[1:]) + else: + pass + + def _when_extended_message(self, type_: bytes, payload: typing.ByteString) -> None: + if type_ == 0: + self._when_extension_handshake(payload) + elif type_ == ExtensionType.UT_METADATA.value: + pass + else: + pass + + def _when_extension_handshake(self, payload: typing.ByteString) -> None: + dictionary_schema = { + b"m": { + "type": "dict", + "keyschema": {"type": "binary", "empty": False}, + "valueschema": {"type": "integer", "min": 0}, + "required": True, + }, + b"p": { + "type": "integer", + "min": 1, + "max": 2**16 - 1, + "required": False + }, + b"v": { + "type": "binary", + "empty": False, + "required": False + }, + b"yourip": { + "type": "binary", + # It's actually EITHER 4 OR 16, not anything in-between. We need to validate this ourselves! + "minlength": 4, + "maxlength": 16, + "required": False + }, + b"ipv6": { + "type": "binary", + "minlength": 16, + "maxlength": 16, + "required": False + }, + b"ipv4": { + "type": "binary", + "minlength": 4, + "maxlength": 4, + "required": False + }, + b"reqq": { + "type": "integer", + "min": 0, + "required": False + } + } + + try: + dictionary = bencode.decode(payload) + except bencode.DecodeError: + return + + if not cerberus.Validator(dictionary_schema).validate(dictionary): + return + + # Check which extensions, that we show interest in, are enabled by the remote peer. + if ExtensionType.UT_METADATA in self._enabled_extensions and b"ut_metadata" in dictionary[b"m"]: + self._enabled_extensions[ExtensionType.UT_METADATA] = dictionary[b"m"][b"ut_metadata"] + + # As there can be multiple SUBSEQUENT extension-handshake-messages, check for the existence of b"metadata_size" + # in the top level dictionary ONLY IF b"ut_metadata" exists in b"m". `ut_metadata` might be enabled before, + # and other subsequent extension-handshake-messages do not need to include 'metadata_size` field in the top + # level dictionary whilst enabling-and/or-disabling other extension features. + if (b"ut_metadata" in dictionary[b"m"]) ^ (b"metadata_size" not in dictionary): + return + + self.on_extension_handshake_completed(dictionary) + +@enum.unique +class MessageTypes(enum.IntEnum): + CHOKE = 0 + UNCHOKE = 1 + INTERESTED = 2 + NOT_INTERESTED = 3 + HAVE = 4 + BITFIELD = 5 + REQUEST = 6 + PIECE = 7 + CANCEL = 8 + EXTENDED = 20 + + +@enum.unique +class ReservedFeature(enum.Enum): + # What do values mean? + # The first number is the offset, and the second number is the bit to set. For instance, + # EXTENSION_PROTOCOL = (5, 0x10) means that reserved[5] & 0x10 should be true. + DHT = (7, 0x01) + EXTENSION_PROTOCOL = (5, 0x10) + + +def reserved_feature_set_to_reserved(reserved_feature_set: typing.Set[ReservedFeature]) -> bytes: + reserved = 8 * b"\x00" + for reserved_feature in reserved_feature_set: + reserved[reserved_feature.value[0]] |= reserved_feature.value[1] + return reserved + + +def reserved_to_reserved_feature_set(reserved: bytes) -> typing.Set[ReservedFeature]: + return { + reserved_feature + for reserved_feature in ReservedFeature + if reserved[reserved_feature.value[0]] & reserved_feature.value[1] + } + + +@enum.unique +class ExtensionType(enum.IntEnum): + UT_METADATA = 1 diff --git a/magneticod/magneticod/dht/mainline/protocol.py b/magneticod/magneticod/protocols/mainline.py similarity index 100% rename from magneticod/magneticod/dht/mainline/protocol.py rename to magneticod/magneticod/protocols/mainline.py diff --git a/magneticod/magneticod/services/bittorrent.py b/magneticod/magneticod/services/bittorrent.py new file mode 100644 index 0000000..2684ab5 --- /dev/null +++ b/magneticod/magneticod/services/bittorrent.py @@ -0,0 +1,63 @@ +# magneticod - Autonomous BitTorrent DHT crawler and metadata fetcher. +# Copyright (C) 2017 Mert Bora ALPER +# Dedicated to Cemile Binay, in whose hands I thrived. +# +# This program is free software: you can redistribute it and/or modify it under the terms of the GNU Affero General +# Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any +# later version. +# +# This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied +# warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more +# details. +# +# You should have received a copy of the GNU Affero General Public License along with this program. If not, see +# . +import logging + +from ..protocols import bittorrent as bt_protocol +from magneticod import types + + +class MetadataService: + def __init__(self, peer_id: types.PeerID, info_hash: types.InfoHash): + self._protocol = bt_protocol.Protocol() + + self._protocol.on_the_bittorrent_handshake_completed = self._when_the_bittorrent_handshake_completed + self._protocol.on_extension_handshake_completed = self._when_extension_handshake_completed + + self._peer_id = peer_id + self._info_hash = info_hash + + async def launch(self) -> None: + await self._protocol.launch() + + self._protocol.initiate_the_bittorrent_handshake( + bt_protocol.reserved_feature_set_to_reserved({ + bt_protocol.ReservedFeature.EXTENSION_PROTOCOL, + bt_protocol.ReservedFeature.DHT + }), + self._info_hash, + self._peer_id + ) + + # Offered Functionality + # ===================== + @staticmethod + def on_fatal_failure() -> None: + pass + + # Private Functionality + # ===================== + def _when_the_bittorrent_handshake_completed( + self, + reserved: bytes, + info_hash: types.InfoHash, + peer_id: types.PeerID + ) -> None: + if bt_protocol.ReservedFeature.EXTENSION_PROTOCOL not in bt_protocol.reserved_to_reserved_feature_set(reserved): + logging.info("Peer does NOT support the extension protocol.") + self.on_fatal_failure() + self._protocol.send_extension_handshake({bt_protocol.ExtensionType.UT_METADATA}) + + def _when_extension_handshake_completed(self, payload: types.Dictionary) -> None: + pass diff --git a/magneticod/magneticod/dht/mainline/service.py b/magneticod/magneticod/services/dht.py similarity index 93% rename from magneticod/magneticod/dht/mainline/service.py rename to magneticod/magneticod/services/dht.py index f6dea09..90257b5 100644 --- a/magneticod/magneticod/dht/mainline/service.py +++ b/magneticod/magneticod/services/dht.py @@ -22,7 +22,7 @@ from magneticod import constants from . import protocol -class TrawlingService: +class PeerService: def __init__(self): self._protocol = protocol.Protocol(b"mc00") @@ -34,18 +34,21 @@ class TrawlingService: self._token_secret = os.urandom(4) self._routing_table = {} # typing.Dict[protocol.NodeID, protocol.transport.Address] + self._tick_task = None + async def launch(self, address: protocol.transport.Address): await self._protocol.launch(address) + self._tick_task = asyncio.ensure_future(self._tick_periodically()) # Offered Functionality # ===================== @staticmethod - def on_info_hash_and_peer(info_hash: protocol.InfoHash, address: protocol.transport.Address) -> None: + def on_peer(info_hash: protocol.InfoHash, address: protocol.transport.Address) -> None: pass # Private Functionality # ===================== - async def tick_periodically(self) -> None: + async def _tick_periodically(self) -> None: while True: if not self._routing_table: await self._bootstrap() diff --git a/magneticod/magneticod/transports/bittorrent.py b/magneticod/magneticod/transports/bittorrent.py new file mode 100644 index 0000000..19c0e1a --- /dev/null +++ b/magneticod/magneticod/transports/bittorrent.py @@ -0,0 +1,102 @@ +# magneticod - Autonomous BitTorrent DHT crawler and metadata fetcher. +# Copyright (C) 2017 Mert Bora ALPER +# Dedicated to Cemile Binay, in whose hands I thrived. +# +# This program is free software: you can redistribute it and/or modify it under the terms of the GNU Affero General +# Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any +# later version. +# +# This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied +# warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more +# details. +# +# You should have received a copy of the GNU Affero General Public License along with this program. If not, see +# . +import asyncio +import typing + +from magneticod import types + + +class TCPTransport(asyncio.Protocol): + def __init__(self): + self._stream_transport = asyncio.Transport() + + self._incoming = bytearray() + + self._awaiting_the_bittorrent_handshake = True + + async def launch(self): + await asyncio.get_event_loop().create_connection(lambda: self, "0.0.0.0", 0) + + # Offered Functionality + # ===================== + def initiate_the_bittorrent_handshake(self, reserved: bytes, info_hash: types.InfoHash, peer_id: types.PeerID) -> None: + self._stream_transport.write(b"\x13BitTorrent protocol%s%s%s" % ( + reserved, + info_hash, + peer_id + )) + + def send_keepalive(self) -> None: + self._stream_transport.write(b"\x00\x00\x00\x00") + + def send_message(self, type_: bytes, payload: typing.ByteString) -> None: + if len(type_) != 1: + raise ValueError("Argument `type_` must be a single byte!") + length = 1 + len(payload) + + @staticmethod + def on_keepalive() -> None: + pass + + @staticmethod + def on_message(type_: bytes, payload: typing.ByteString) -> None: + pass + + @staticmethod + def on_the_bittorrent_handshake_completed( + reserved: typing.ByteString, + info_hash: types.InfoHash, + peer_id: types.PeerID + ) -> None: + pass + + # Private Functionality + # ===================== + def connection_made(self, transport: asyncio.Transport) -> None: + self._stream_transport = transport + + def data_received(self, data: typing.ByteString) -> None: + self._incoming += data + + if self._awaiting_the_bittorrent_handshake: + if len(self._incoming) >= 68: + assert self._incoming.startswith(b"\x13BitTorrent protocol") + self.on_the_bittorrent_handshake_completed( + reserved=self._incoming[20:28], + info_hash=self._incoming[28:48], + peer_id=self._incoming[48:68] + ) + self._incoming = self._incoming[68:] + self._awaiting_the_bittorrent_handshake = False + else: + return + + # Continue or Start the "usual" processing from here below + + if len(self._incoming) >= 4 and len(self._incoming) - 1 >= int.from_bytes(self._incoming[:4], "big"): + if int.from_bytes(self._incoming[:4], "big"): + self.on_keepalive() + else: + self.on_message(self._incoming[4], self._incoming[5:]) + + def eof_received(self): + pass + + def connection_lost(self, exc: Exception) -> None: + pass + + +class UTPTransport: + pass diff --git a/magneticod/magneticod/dht/mainline/transport.py b/magneticod/magneticod/transports/mainline.py similarity index 90% rename from magneticod/magneticod/dht/mainline/transport.py rename to magneticod/magneticod/transports/mainline.py index 9678688..4829f29 100644 --- a/magneticod/magneticod/dht/mainline/transport.py +++ b/magneticod/magneticod/transports/mainline.py @@ -24,8 +24,8 @@ from . import codec Address = typing.Tuple[str, int] MessageQueueEntry = typing.NamedTuple("MessageQueueEntry", [ - ("queued_on", float), - ("message", bytes), + ("queued_on", int), + ("message", codec.Message), ("address", Address) ]) @@ -50,13 +50,13 @@ class Transport(asyncio.DatagramProtocol): # Offered Functionality # ===================== - def send_message(self, message, address: Address) -> None: - self._message_queue.append(MessageQueueEntry(time.monotonic(), message, address)) + def send_message(self, message: codec.Message, address: Address) -> None: + self._message_queue.append(MessageQueueEntry(int(time.monotonic()), message, address)) if not self._queue_nonempty.is_set(): self._queue_nonempty.set() @staticmethod - def on_message(message: dict, address: Address): + def on_message(message: codec.Message, address: Address): pass # Private Functionality @@ -73,7 +73,7 @@ class Transport(asyncio.DatagramProtocol): try: message = codec.decode(data) - except codec.EncodeError: + except codec.DecodeError: return if not isinstance(message, dict): @@ -111,4 +111,4 @@ class Transport(asyncio.DatagramProtocol): if time.monotonic() - queued_on > 60: return - self._datagram_transport.sendto(message, address) + self._datagram_transport.sendto(codec.encode(message), address) diff --git a/magneticod/magneticod/types.py b/magneticod/magneticod/types.py new file mode 100644 index 0000000..90228bd --- /dev/null +++ b/magneticod/magneticod/types.py @@ -0,0 +1,8 @@ +import typing + +InfoHash = typing.ByteString +NodeID = typing.ByteString +PeerID = typing.ByteString +IPAddress = typing.Tuple[str, int] + +Dictionary = typing.Dict[bytes, typing.Any] \ No newline at end of file