diff --git a/magneticod/magneticod/bittorrent/codec.py b/magneticod/magneticod/bittorrent/codec.py new file mode 100644 index 0000000..e69de29 diff --git a/magneticod/magneticod/bittorrent/protocol.py b/magneticod/magneticod/bittorrent/protocol.py new file mode 100644 index 0000000..e69de29 diff --git a/magneticod/magneticod/bittorrent/service.py b/magneticod/magneticod/bittorrent/service.py new file mode 100644 index 0000000..e69de29 diff --git a/magneticod/magneticod/bittorrent/transport.py b/magneticod/magneticod/bittorrent/transport.py new file mode 100644 index 0000000..e69de29 diff --git a/magneticod/magneticod/dht/mainline/__init__.py b/magneticod/magneticod/dht/mainline/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/magneticod/magneticod/dht/mainline/codec.py b/magneticod/magneticod/dht/mainline/codec.py new file mode 100644 index 0000000..6170537 --- /dev/null +++ b/magneticod/magneticod/dht/mainline/codec.py @@ -0,0 +1,14 @@ +def encode(): + pass + + +def decode(data): + pass + + +class EncodeError(Exception): + pass + + +class DecodeError(Exception): + pass diff --git a/magneticod/magneticod/dht/mainline/protocol.py b/magneticod/magneticod/dht/mainline/protocol.py new file mode 100644 index 0000000..155442d --- /dev/null +++ b/magneticod/magneticod/dht/mainline/protocol.py @@ -0,0 +1,146 @@ +import typing + +from . import transport + + +class Protocol: + def __init__(self): + pass + + # Offered Functionality + # ===================== + def on_ping_query(self, query: PingQuery) -> typing.Optional[typing.Union[PingResponse, Error]]: + pass + + def on_find_node_query(self, query: FindNodeQuery) -> typing.Optional[typing.Union[FindNodeResponse, Error]]: + pass + + def on_get_peers_query(self, query: GetPeersQuery) -> typing.Optional[typing.Union[GetPeersQuery, Error]]: + pass + + def on_announce_peer_query(self, query: AnnouncePeerQuery) -> typing.Optional[typing.Union[AnnouncePeerResponse, Error]]: + pass + + def on_ping_OR_announce_peer_response(self, response: PingResponse) -> None: + pass + + def on_find_node_response(self, response: FindNodeResponse) -> None: + pass + + def on_get_peers_response(self, response: GetPeersResponse) -> None: + pass + + def on_error(self, response: Error) -> None: + pass + + # Private Functionality + # ===================== + def when_message_received(self, message): + pass + + +NodeID = typing.NewType("NodeID", bytes) +InfoHash = typing.NewType("InfoHash", bytes) +NodeInfo = typing.NamedTuple("NodeInfo", [ + ("id", NodeID), + ("address", transport.Address), +]) + + +class BaseQuery: + method_name = b"" + + def __init__(self, id_: NodeID): + self.id = id_ + + def to_message(self, *, transaction_id: bytes, client_version: bytes=b"") -> typing.Dict[bytes, typing.Any]: + return { + b"t": transaction_id, + b"y": b"q", + b"v": client_version, + b"q": self.method_name, + b"a": self.__dict__ + } + + +class PingQuery(BaseQuery): + method_name = b"ping" + + def __init__(self, id_: NodeID): + super().__init__(id_) + + +class FindNodeQuery(BaseQuery): + method_name = b"find_node" + + def __init__(self, id_: NodeID, target: NodeID): + super().__init__(id_) + self.target = target + + +class GetPeersQuery(BaseQuery): + method_name = b"get_peers" + + def __init__(self, id_: NodeID, info_hash: InfoHash): + super().__init__(id_) + self.info_hash = info_hash + + +class AnnouncePeerQuery(BaseQuery): + method_name = b"announce_peer" + + def __init__(self, id_: NodeID, info_hash: InfoHash, port: int, token: bytes, implied_port: int=0): + super().__init__(id_) + self.info_hash = info_hash + self.port = port + self.token = token + self.implied_port = implied_port + + +class BaseResponse: + def __init__(self, id_: NodeID): + self.id = id_ + + def to_message(self, *, transaction_id: bytes, client_version: bytes = b"") -> typing.Dict[bytes, typing.Any]: + return { + b"t": transaction_id, + b"y": b"r", + b"v": client_version, + b"r": self._return_values() + } + + def _return_values(self) -> typing.Dict[bytes, typing.Any]: + return {b"id": self.id} + + +class PingResponse(BaseResponse): + def __init__(self, id_: NodeID): + super().__init__(id_) + + +class FindNodeResponse(BaseResponse): + def __init__(self, id_: NodeID, nodes: typing.List[NodeInfo]): + super().__init__(id_) + self.nodes = nodes + + def _return_values(self) -> typing.Dict[bytes, typing.Any]: + d = super()._return_values() + d.update({ + b"nodes": self.nodes # TODO: this is not right obviously, encode & decode! + }) + return d + + +class GetPeersResponse(BaseResponse): + def __init__(self, id_: NodeID, token: bytes, *, values, nodes: typing.Optional[typing.List[NodeInfo]]=None): + assert bool(values) ^ bool(nodes) + + super().__init__(id_) + self.token = token + self.values = values, + self.nodes = nodes + + +class AnnouncePeerResponse(BaseResponse): + def __init__(self, id_: NodeID): + super().__init__(id_) diff --git a/magneticod/magneticod/dht/mainline/service.py b/magneticod/magneticod/dht/mainline/service.py new file mode 100644 index 0000000..e69de29 diff --git a/magneticod/magneticod/dht/mainline/transport.py b/magneticod/magneticod/dht/mainline/transport.py new file mode 100644 index 0000000..5cdc6ab --- /dev/null +++ b/magneticod/magneticod/dht/mainline/transport.py @@ -0,0 +1,95 @@ +import asyncio +import collections +import logging +import sys +import time +import typing + +from . import codec + +Address = typing.Tuple[str, int] + +MessageQueueEntry = typing.NamedTuple("MessageQueueEntry", [ + ("queued_on", float), + ("message", bytes), + ("address", Address) +]) + + +class Transport(asyncio.DatagramProtocol): + """ + Mainline DHT Transport + + The signature `class Transport(asyncio.DatagramProtocol)` seems almost oxymoron, but it's indeed more sensible than + it first seems. `Transport` handles ALL that is related to transporting messages, which includes receiving them + (`asyncio.DatagramProtocol.datagram_received`), sending them (`asyncio.DatagramTransport.send_to`), pausing and + resuming writing as requested by the asyncio, and also handling operational errors. + """ + + def __init__(self): + super().__init__() + self.__datagram_transport = asyncio.DatagramTransport() + self.__write_allowed = asyncio.Event() + self.__queue_nonempty = asyncio.Event() + self.__message_queue = collections.deque() # type: typing.Deque[MessageQueueEntry] + self.__messenger_task = asyncio.Task(self.__send_messages()) + + # Offered Functionality + # ===================== + def send_message(self, message, address: Address) -> None: + self.__message_queue.append(MessageQueueEntry(time.monotonic(), message, address)) + if not self.__queue_nonempty.is_set(): + self.__queue_nonempty.set() + + @staticmethod + def on_message(message: dict, address: Address): + pass + + # Private Functionality + # ===================== + def connection_made(self, transport: asyncio.DatagramTransport) -> None: + self.__datagram_transport = transport + self.__write_allowed.set() + + def datagram_received(self, data: bytes, address: Address) -> None: + try: + message = codec.decode(data) + except codec.EncodeError: + return + + if not isinstance(message, dict): + return + + self.on_message(message, address) + + def error_received(self, exc: OSError): + logging.debug("Mainline DHT received error!", exc_info=exc) + + def pause_writing(self): + self.__write_allowed.clear() + + def resume_writing(self): + self.__write_allowed.set() + + def connection_lost(self, exc: Exception): + if exc: + logging.fatal("Mainline DHT lost connection! (See the following log entry for the exception.)", + exc_info=exc + ) + else: + logging.fatal("Mainline DHT lost connection!") + sys.exit(1) + + async def __send_messages(self) -> None: + while True: + await asyncio.wait([self.__write_allowed.wait(), self.__queue_nonempty.wait()]) + try: + queued_on, message, address = self.__message_queue.pop() + except IndexError: + self.__queue_nonempty.clear() + continue + + if time.monotonic() - queued_on > 60: + return + + self.__datagram_transport.sendto(message, address) diff --git a/magneticod/magneticod/dht/sink.py b/magneticod/magneticod/dht/sink.py new file mode 100644 index 0000000..e69de29