initial commit for refactoring magneticod (far, far away from complete!)

This commit is contained in:
Bora M. Alper 2017-07-09 11:59:48 +03:00
parent 90538b10af
commit 1df6204a5f
10 changed files with 255 additions and 0 deletions

View File

@ -0,0 +1,14 @@
def encode():
pass
def decode(data):
pass
class EncodeError(Exception):
pass
class DecodeError(Exception):
pass

View File

@ -0,0 +1,146 @@
import typing
from . import transport
class Protocol:
def __init__(self):
pass
# Offered Functionality
# =====================
def on_ping_query(self, query: PingQuery) -> typing.Optional[typing.Union[PingResponse, Error]]:
pass
def on_find_node_query(self, query: FindNodeQuery) -> typing.Optional[typing.Union[FindNodeResponse, Error]]:
pass
def on_get_peers_query(self, query: GetPeersQuery) -> typing.Optional[typing.Union[GetPeersQuery, Error]]:
pass
def on_announce_peer_query(self, query: AnnouncePeerQuery) -> typing.Optional[typing.Union[AnnouncePeerResponse, Error]]:
pass
def on_ping_OR_announce_peer_response(self, response: PingResponse) -> None:
pass
def on_find_node_response(self, response: FindNodeResponse) -> None:
pass
def on_get_peers_response(self, response: GetPeersResponse) -> None:
pass
def on_error(self, response: Error) -> None:
pass
# Private Functionality
# =====================
def when_message_received(self, message):
pass
NodeID = typing.NewType("NodeID", bytes)
InfoHash = typing.NewType("InfoHash", bytes)
NodeInfo = typing.NamedTuple("NodeInfo", [
("id", NodeID),
("address", transport.Address),
])
class BaseQuery:
method_name = b""
def __init__(self, id_: NodeID):
self.id = id_
def to_message(self, *, transaction_id: bytes, client_version: bytes=b"") -> typing.Dict[bytes, typing.Any]:
return {
b"t": transaction_id,
b"y": b"q",
b"v": client_version,
b"q": self.method_name,
b"a": self.__dict__
}
class PingQuery(BaseQuery):
method_name = b"ping"
def __init__(self, id_: NodeID):
super().__init__(id_)
class FindNodeQuery(BaseQuery):
method_name = b"find_node"
def __init__(self, id_: NodeID, target: NodeID):
super().__init__(id_)
self.target = target
class GetPeersQuery(BaseQuery):
method_name = b"get_peers"
def __init__(self, id_: NodeID, info_hash: InfoHash):
super().__init__(id_)
self.info_hash = info_hash
class AnnouncePeerQuery(BaseQuery):
method_name = b"announce_peer"
def __init__(self, id_: NodeID, info_hash: InfoHash, port: int, token: bytes, implied_port: int=0):
super().__init__(id_)
self.info_hash = info_hash
self.port = port
self.token = token
self.implied_port = implied_port
class BaseResponse:
def __init__(self, id_: NodeID):
self.id = id_
def to_message(self, *, transaction_id: bytes, client_version: bytes = b"") -> typing.Dict[bytes, typing.Any]:
return {
b"t": transaction_id,
b"y": b"r",
b"v": client_version,
b"r": self._return_values()
}
def _return_values(self) -> typing.Dict[bytes, typing.Any]:
return {b"id": self.id}
class PingResponse(BaseResponse):
def __init__(self, id_: NodeID):
super().__init__(id_)
class FindNodeResponse(BaseResponse):
def __init__(self, id_: NodeID, nodes: typing.List[NodeInfo]):
super().__init__(id_)
self.nodes = nodes
def _return_values(self) -> typing.Dict[bytes, typing.Any]:
d = super()._return_values()
d.update({
b"nodes": self.nodes # TODO: this is not right obviously, encode & decode!
})
return d
class GetPeersResponse(BaseResponse):
def __init__(self, id_: NodeID, token: bytes, *, values, nodes: typing.Optional[typing.List[NodeInfo]]=None):
assert bool(values) ^ bool(nodes)
super().__init__(id_)
self.token = token
self.values = values,
self.nodes = nodes
class AnnouncePeerResponse(BaseResponse):
def __init__(self, id_: NodeID):
super().__init__(id_)

View File

@ -0,0 +1,95 @@
import asyncio
import collections
import logging
import sys
import time
import typing
from . import codec
Address = typing.Tuple[str, int]
MessageQueueEntry = typing.NamedTuple("MessageQueueEntry", [
("queued_on", float),
("message", bytes),
("address", Address)
])
class Transport(asyncio.DatagramProtocol):
"""
Mainline DHT Transport
The signature `class Transport(asyncio.DatagramProtocol)` seems almost oxymoron, but it's indeed more sensible than
it first seems. `Transport` handles ALL that is related to transporting messages, which includes receiving them
(`asyncio.DatagramProtocol.datagram_received`), sending them (`asyncio.DatagramTransport.send_to`), pausing and
resuming writing as requested by the asyncio, and also handling operational errors.
"""
def __init__(self):
super().__init__()
self.__datagram_transport = asyncio.DatagramTransport()
self.__write_allowed = asyncio.Event()
self.__queue_nonempty = asyncio.Event()
self.__message_queue = collections.deque() # type: typing.Deque[MessageQueueEntry]
self.__messenger_task = asyncio.Task(self.__send_messages())
# Offered Functionality
# =====================
def send_message(self, message, address: Address) -> None:
self.__message_queue.append(MessageQueueEntry(time.monotonic(), message, address))
if not self.__queue_nonempty.is_set():
self.__queue_nonempty.set()
@staticmethod
def on_message(message: dict, address: Address):
pass
# Private Functionality
# =====================
def connection_made(self, transport: asyncio.DatagramTransport) -> None:
self.__datagram_transport = transport
self.__write_allowed.set()
def datagram_received(self, data: bytes, address: Address) -> None:
try:
message = codec.decode(data)
except codec.EncodeError:
return
if not isinstance(message, dict):
return
self.on_message(message, address)
def error_received(self, exc: OSError):
logging.debug("Mainline DHT received error!", exc_info=exc)
def pause_writing(self):
self.__write_allowed.clear()
def resume_writing(self):
self.__write_allowed.set()
def connection_lost(self, exc: Exception):
if exc:
logging.fatal("Mainline DHT lost connection! (See the following log entry for the exception.)",
exc_info=exc
)
else:
logging.fatal("Mainline DHT lost connection!")
sys.exit(1)
async def __send_messages(self) -> None:
while True:
await asyncio.wait([self.__write_allowed.wait(), self.__queue_nonempty.wait()])
try:
queued_on, message, address = self.__message_queue.pop()
except IndexError:
self.__queue_nonempty.clear()
continue
if time.monotonic() - queued_on > 60:
return
self.__datagram_transport.sendto(message, address)

View File