From 57d466a666c68fa10cc0269897a05eae5ccf4a8a Mon Sep 17 00:00:00 2001 From: "Bora M. Alper" Date: Fri, 14 Jul 2017 21:19:07 +0300 Subject: [PATCH] d/dht/mainline/protocol completed --- magneticod/magneticod/dht/mainline/codec.py | 14 + .../magneticod/dht/mainline/protocol.py | 284 ++++++++++++++++-- magneticod/magneticod/dht/mainline/service.py | 14 + .../magneticod/dht/mainline/transport.py | 14 + magneticod/setup.py | 3 +- 5 files changed, 308 insertions(+), 21 deletions(-) diff --git a/magneticod/magneticod/dht/mainline/codec.py b/magneticod/magneticod/dht/mainline/codec.py index 6170537..709e1e7 100644 --- a/magneticod/magneticod/dht/mainline/codec.py +++ b/magneticod/magneticod/dht/mainline/codec.py @@ -1,3 +1,17 @@ +# magneticod - Autonomous BitTorrent DHT crawler and metadata fetcher. +# Copyright (C) 2017 Mert Bora ALPER +# Dedicated to Cemile Binay, in whose hands I thrived. +# +# This program is free software: you can redistribute it and/or modify it under the terms of the GNU Affero General +# Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any +# later version. +# +# This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied +# warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more +# details. +# +# You should have received a copy of the GNU Affero General Public License along with this program. If not, see +# <http://www.gnu.org/licenses/>. def encode(): pass diff --git a/magneticod/magneticod/dht/mainline/protocol.py b/magneticod/magneticod/dht/mainline/protocol.py index 155442d..d20eb26 100644 --- a/magneticod/magneticod/dht/mainline/protocol.py +++ b/magneticod/magneticod/dht/mainline/protocol.py @@ -1,42 +1,126 @@ +# magneticod - Autonomous BitTorrent DHT crawler and metadata fetcher. +# Copyright (C) 2017 Mert Bora ALPER +# Dedicated to Cemile Binay, in whose hands I thrived. 
+# +# This program is free software: you can redistribute it and/or modify it under the terms of the GNU Affero General +# Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any +# later version. +# +# This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied +# warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more +# details. +# +# You should have received a copy of the GNU Affero General Public License along with this program. If not, see +# <http://www.gnu.org/licenses/>. +import asyncio +import enum +import functools import typing +import cerberus + from . import transport class Protocol: - def __init__(self): - pass + def __init__(self, *, client_version: bytes=b"mc00"): + self.client_version = client_version + self.transport = transport.Transport() + + self.transport.on_message = functools.partial(self.__when_message, self) + + async def launch(self, address: transport.Address): + await asyncio.get_event_loop().create_datagram_endpoint(lambda: self.transport, local_addr=address) # Offered Functionality # ===================== - def on_ping_query(self, query: PingQuery) -> typing.Optional[typing.Union[PingResponse, Error]]: + @staticmethod + def on_ping_query(query: PingQuery) -> typing.Optional[typing.Union[PingResponse, Error]]: pass - def on_find_node_query(self, query: FindNodeQuery) -> typing.Optional[typing.Union[FindNodeResponse, Error]]: + @staticmethod + def on_find_node_query(query: FindNodeQuery) -> typing.Optional[typing.Union[FindNodeResponse, Error]]: pass - def on_get_peers_query(self, query: GetPeersQuery) -> typing.Optional[typing.Union[GetPeersQuery, Error]]: + @staticmethod + def on_get_peers_query(query: GetPeersQuery) -> typing.Optional[typing.Union[GetPeersQuery, Error]]: pass - def on_announce_peer_query(self, query: AnnouncePeerQuery) -> typing.Optional[typing.Union[AnnouncePeerResponse, 
Error]]: + @staticmethod + def on_announce_peer_query(query: AnnouncePeerQuery) -> typing.Optional[typing.Union[AnnouncePeerResponse, Error]]: pass - def on_ping_OR_announce_peer_response(self, response: PingResponse) -> None: + @staticmethod + def on_ping_OR_announce_peer_response(response: PingResponse) -> None: pass - def on_find_node_response(self, response: FindNodeResponse) -> None: + @staticmethod + def on_find_node_response(response: FindNodeResponse) -> None: pass - def on_get_peers_response(self, response: GetPeersResponse) -> None: + @staticmethod + def on_get_peers_response(response: GetPeersResponse) -> None: pass - def on_error(self, response: Error) -> None: + @staticmethod + def on_error(error: Error) -> None: pass # Private Functionality # ===================== - def when_message_received(self, message): - pass + def __when_message(self, message: typing.Dict[bytes, typing.Any], address: transport.Address) -> None: + # We need to ignore unknown fields in the messages, in consideration of forward-compatibility, but that also + # requires us to be careful about the "order" we are following. For instance, every single query can also be + # misunderstood as a ping query, since they all have `id` as an argument. Hence, we start validating against the + # query/response type that is the most distinct from all the others. 
+ + if BaseQuery.validate_message(message): + args = message[b"a"] + if AnnouncePeerQuery.validate_message(message): + response = self.on_announce_peer_query(AnnouncePeerQuery( + args[b"id"], args[b"info_hash"], args[b"port"], args[b"token"], args[b"implied_port"] + )) + elif GetPeersQuery.validate_message(message): + response = self.on_get_peers_query(GetPeersQuery(args[b"id"], args[b"info_hash"])) + elif FindNodeQuery.validate_message(message): + response = self.on_find_node_query(FindNodeQuery(args[b"id"], args[b"target"])) + elif PingQuery.validate_message(message): + response = self.on_ping_query(PingQuery(args[b"id"])) + else: + # Unknown Query received! + response = None + if response: + self.transport.send_message(response.to_message(message[b"t"], self.client_version), address) + + elif BaseResponse.validate_message(message): + return_values = message[b"r"] + if GetPeersResponse.validate_message(message): + if b"nodes" in return_values: + self.on_get_peers_response(GetPeersResponse( + return_values[b"id"], return_values[b"token"], nodes=return_values[b"nodes"] + )) + else: + self.on_get_peers_response(GetPeersResponse( + return_values[b"id"], return_values[b"token"], values=return_values[b"values"] + )) + elif FindNodeResponse.validate_message(message): + self.on_find_node_response(FindNodeResponse(return_values[b"id"], return_values[b"nodes"])) + elif PingResponse.validate_message(message): + self.on_ping_OR_announce_peer_response(PingResponse(return_values[b"id"])) + else: + # Unknown Response received! + pass + + elif Error.validate_message(message): + if Error.validate_message(message): + self.on_error(Error(message[b"e"][0], message[b"e"][1])) + else: + # Erroneous Error received! + pass + + else: + # Unknown message received! 
+ pass NodeID = typing.NewType("NodeID", bytes) @@ -49,11 +133,15 @@ NodeInfo = typing.NamedTuple("NodeInfo", [ class BaseQuery: method_name = b"" + _arguments_schema = { + b"id": {"type": "binary", "minlength": 20, "maxlength": 20, "required": True} + } + __validator = cerberus.Validator() def __init__(self, id_: NodeID): self.id = id_ - def to_message(self, *, transaction_id: bytes, client_version: bytes=b"") -> typing.Dict[bytes, typing.Any]: + def to_message(self, transaction_id: bytes, client_version: bytes) -> typing.Dict[bytes, typing.Any]: return { b"t": transaction_id, b"y": b"q", @@ -62,6 +150,23 @@ class BaseQuery: b"a": self.__dict__ } + @classmethod + def validate_message(cls, message: typing.Dict[bytes, typing.Any]) -> bool: + if cls.__validator.schema is None: + cls.__validator.schema = cls.__get_message_schema() + + return cls.__validator.validate(message) + + @classmethod + def __get_message_schema(cls): + return { + b"t": {"type": "binary", "empty": False, "required": True}, + b"y": {"type": "binary", "empty": False, "required": True}, + b"v": {"type": "binary", "empty": False, "required": False}, + b"q": {"type": "binary", "empty": False, "required": True}, + b"a": cls._arguments_schema + } + class PingQuery(BaseQuery): method_name = b"ping" @@ -72,6 +177,10 @@ class PingQuery(BaseQuery): class FindNodeQuery(BaseQuery): method_name = b"find_node" + _arguments_schema = { + **super()._arguments_schema, + b"target": {"type": "binary", "minlength": 20, "maxlength": 20, "required": True} + } def __init__(self, id_: NodeID, target: NodeID): super().__init__(id_) @@ -80,6 +189,10 @@ class FindNodeQuery(BaseQuery): class GetPeersQuery(BaseQuery): method_name = b"get_peers" + _arguments_schema = { + **super()._arguments_schema, + b"info_hash": {"type": "binary", "minlength": 20, "maxlength": 20, "required": True} + } def __init__(self, id_: NodeID, info_hash: InfoHash): super().__init__(id_) @@ -88,6 +201,13 @@ class GetPeersQuery(BaseQuery): class 
AnnouncePeerQuery(BaseQuery): method_name = b"announce_peer" + _arguments_schema = { + **super()._arguments_schema, + b"info_hash": {"type": "binary", "minlength": 20, "maxlength": 20, "required": True}, + b"port": {"type": "integer", "min": 1, "max": 2**16 - 1, "required": True}, + b"token": {"type": "binary", "empty": False, "required": True}, + b"implied_port": {"type": "integer", "required": False} + } def __init__(self, id_: NodeID, info_hash: InfoHash, port: int, token: bytes, implied_port: int=0): super().__init__(id_) @@ -98,10 +218,15 @@ class AnnouncePeerQuery(BaseQuery): class BaseResponse: + _return_values_schema = { + b"id": {"type": "binary", "minlength": 20, "maxlength": 20, "required": True} + } + __validator = cerberus.Validator() + def __init__(self, id_: NodeID): self.id = id_ - def to_message(self, *, transaction_id: bytes, client_version: bytes = b"") -> typing.Dict[bytes, typing.Any]: + def to_message(self, transaction_id: bytes, client_version: bytes) -> typing.Dict[bytes, typing.Any]: return { b"t": transaction_id, b"y": b"r", @@ -109,9 +234,25 @@ class BaseResponse: b"r": self._return_values() } + @classmethod + def validate_message(cls, message: typing.Dict[bytes, typing.Any]) -> bool: + if cls.__validator.schema is None: + cls.__validator.schema = cls.__get_message_schema() + + return cls.__validator.validate(message) + def _return_values(self) -> typing.Dict[bytes, typing.Any]: return {b"id": self.id} + @classmethod + def __get_message_schema(cls): + return { + b"t": {"type": "binary", "empty": False, "required": True}, + b"y": {"type": "binary", "empty": False, "required": True}, + b"v": {"type": "binary", "empty": False, "required": False}, + b"r": cls._return_values_schema + } + class PingResponse(BaseResponse): def __init__(self, id_: NodeID): @@ -119,28 +260,131 @@ class PingResponse(BaseResponse): class FindNodeResponse(BaseResponse): + _return_values_schema = { + **super()._return_values_schema, + b"nodes": {"type": "binary", 
"required": True} + } + __validator = cerberus.Validator() + def __init__(self, id_: NodeID, nodes: typing.List[NodeInfo]): super().__init__(id_) self.nodes = nodes + @classmethod + def validate_message(cls, message: typing.Dict[bytes, typing.Any]) -> bool: + if cls.__validator.schema is None: + cls.__validator.schema = cls.__get_message_schema() + + if not cls.__validator.validate(message): + return False + + # Unfortunately, Cerberus cannot check some fine details. + # For instance, the length of the `nodes` field in the return values of the response message has to be a + # multiple of 26, as "contact information for nodes is encoded as a 26-byte string" (BEP 5). + if not message[b"r"][b"nodes"] % 26 == 0: + return False + + return True + def _return_values(self) -> typing.Dict[bytes, typing.Any]: - d = super()._return_values() - d.update({ + return { + **super()._return_values(), b"nodes": self.nodes # TODO: this is not right obviously, encode & decode! - }) - return d + } class GetPeersResponse(BaseResponse): - def __init__(self, id_: NodeID, token: bytes, *, values, nodes: typing.Optional[typing.List[NodeInfo]]=None): - assert bool(values) ^ bool(nodes) + _return_values_schema = { + **super()._return_values_schema, + b"token": {"type": "binary", "empty": False, "required": True}, + b"values": { + "type": "list", + "schema": {"type": "binary", "minlength": 6, "maxlength": 6}, + "excludes": b"nodes", + "empty": False, + "require": True + }, + b"nodes": {"type": "binary", "excludes": b"values", "empty": True, "require": True} + } + __validator = cerberus.Validator() + + def __init__(self, id_: NodeID, token: bytes, *, values: typing.Optional[typing.List[bytes]]=None, + nodes: typing.Optional[typing.List[NodeInfo]]=None + ): + if not bool(values) ^ bool(nodes): + raise ValueError("Supply either `values` or `nodes` but not both or neither.") super().__init__(id_) self.token = token self.values = values, self.nodes = nodes + @classmethod + def validate_message(cls, 
message: typing.Dict[bytes, typing.Any]) -> bool: + if cls.__validator.schema is None: + cls.__validator.schema = cls.__get_message_schema() + + if not cls.__validator.validate(message): + return False + + # Unfortunately, Cerberus cannot check some fine details. + # For instance, the length of the `nodes` field in the return values of the response message has to be a + # multiple of 26, as "contact information for nodes is encoded as a 26-byte string" (BEP 5). + if b"nodes" in message[b"r"] ^ message[b"r"][b"nodes"] % 26 == 0: + return False + + return True + class AnnouncePeerResponse(BaseResponse): def __init__(self, id_: NodeID): super().__init__(id_) + + +@enum.unique +class ErrorCodes(enum.IntEnum): + GENERIC = 201 + SERVER = 202 + PROTOCOL = 203 + METHOD_UNKNOWN = 204 + + +class Error: + __validator = cerberus.Validator() + + def __init__(self, code: ErrorCodes, error_message: bytes): + self.code = code + self.error_message = error_message + + def to_message(self, transaction_id: bytes, client_version: bytes) -> typing.Dict[bytes, typing.Any]: + return { + b"t": transaction_id, + b"y": b"e", + b"v": client_version, + b"e": [self.code, self.error_message] + } + + @classmethod + def validate_message(cls, message: typing.Dict[bytes, typing.Any]) -> bool: + if cls.__validator.schema is None: + cls.__validator.schema = cls.__get_message_schema() + + if not cls.__validator.validate(message): + return False + + # Unfortunately, Cerberus cannot check some fine details. + # For instance, the `e` field of the error message should be an array with first element being an integer, and + # the second element being a (binary) string. 
+ if not (isinstance(message[b"e"], int) and isinstance(message[b"e"], bytes)): + return False + + return True + + @classmethod + def __get_message_schema(cls): + return { + b"t": {"type": "binary", "empty": False, "required": True}, + b"y": {"type": "binary", "empty": False, "required": True}, + b"v": {"type": "binary", "empty": False, "required": False}, + b"e": {"type": "list", "minlength": 2, "maxlength": 2, "required": True} + } diff --git a/magneticod/magneticod/dht/mainline/service.py b/magneticod/magneticod/dht/mainline/service.py index e69de29..1178873 100644 --- a/magneticod/magneticod/dht/mainline/service.py +++ b/magneticod/magneticod/dht/mainline/service.py @@ -0,0 +1,14 @@ +# magneticod - Autonomous BitTorrent DHT crawler and metadata fetcher. +# Copyright (C) 2017 Mert Bora ALPER +# Dedicated to Cemile Binay, in whose hands I thrived. +# +# This program is free software: you can redistribute it and/or modify it under the terms of the GNU Affero General +# Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any +# later version. +# +# This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied +# warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more +# details. +# +# You should have received a copy of the GNU Affero General Public License along with this program. If not, see +# <http://www.gnu.org/licenses/>. diff --git a/magneticod/magneticod/dht/mainline/transport.py b/magneticod/magneticod/dht/mainline/transport.py index 5cdc6ab..c097a85 100644 --- a/magneticod/magneticod/dht/mainline/transport.py +++ b/magneticod/magneticod/dht/mainline/transport.py @@ -1,3 +1,17 @@ +# magneticod - Autonomous BitTorrent DHT crawler and metadata fetcher. +# Copyright (C) 2017 Mert Bora ALPER +# Dedicated to Cemile Binay, in whose hands I thrived. 
+# +# This program is free software: you can redistribute it and/or modify it under the terms of the GNU Affero General +# Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any +# later version. +# +# This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied +# warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more +# details. +# +# You should have received a copy of the GNU Affero General Public License along with this program. If not, see +# <http://www.gnu.org/licenses/>. import asyncio import collections import logging diff --git a/magneticod/setup.py b/magneticod/setup.py index 4ac2c56..d655461 100644 --- a/magneticod/setup.py +++ b/magneticod/setup.py @@ -11,7 +11,8 @@ def run_setup(): install_requirements = [ "appdirs >= 1.4.3", "humanfriendly", - "better_bencode >= 0.2.1" + "better_bencode >= 0.2.1", + "cerberus >= 1.1" ] if sys.platform in ["linux", "darwin"]: