initial commit of go-rewrite

Bora M. Alper 2017-08-08 14:58:51 +03:00
parent 35f07d84b9
commit 374ce0538a
40 changed files with 2202 additions and 2420 deletions

4
.gitignore vendored

@ -1,3 +1,7 @@
src/magneticod/vendor
src/magneticod/Gopkg.lock
src/magneticow/vendor
src/magneticow/Gopkg.lock
# Created by https://www.gitignore.io/api/linux,python,pycharm

4
bin/.gitignore vendored Normal file

@ -0,0 +1,4 @@
# Ignore everything in this directory
*
# Except this file
!.gitignore

File diff suppressed because one or more lines are too long



@ -1,2 +0,0 @@
Dockerfile
.dockerignore


@ -1,9 +0,0 @@
FROM python:3.6
RUN mkdir -p /usr/src/app
WORKDIR /usr/src/app
COPY . .
RUN pip install -e .
CMD ["python", "-mmagneticod"]


@ -1,161 +0,0 @@
==========
magneticod
==========
*Autonomous BitTorrent DHT crawler and metadata fetcher.*
**magneticod** is the daemon that crawls the BitTorrent DHT network in the background to discover info hashes and
fetches metadata from the peers. It uses SQLite 3, which is built into your Python 3.x distribution, to persist data.
Installation
============
Requirements
------------
- Python 3.5 or above.
**WARNING:**
Python 3.6.0 and 3.6.1 suffer from a bug (`issue #29714 <http://bugs.python.org/issue29714>`_) that causes
magneticod to fail. As it is an interpreter bug that I have no control over, please make sure that you are not using
either of those Python 3 versions to run magneticod.
- Decent Internet access (IPv4)
**magneticod** uses UDP to communicate with the nodes in the DHT network, and TCP to communicate with the
peers while fetching metadata. **Please make sure you have a healthy connection;** you can confirm this by checking
the *connection status indicator* of your BitTorrent client: if it does not indicate any error, **magneticod** should
work just fine.
Instructions
------------
1. Download the latest version of **magneticod** from PyPI using pip3: ::
pip3 install magneticod --user
2. Add installation path to the ``$PATH``; append the following line to your ``~/.profile`` if you are using bash ::
export PATH=$PATH:~/.local/bin
**or if you are on macOS** and using bash, (assuming that you are using Python 3.5): ::
export PATH="${PATH}:${HOME}/Library/Python/3.5/bin/"
3. Activate the changes to ``$PATH`` (again, if you are using bash): ::
source ~/.profile
4. Confirm that it is running: ::
magneticod
Within at most 5 minutes (and usually under a minute) **magneticod** will discover a few torrents! This, of course,
depends on your bandwidth and your network configuration (existence of a firewall, misconfigured NAT, etc.).
5. *(only for systemd users, skip the rest of the steps and proceed to the* `Using`_ *section if you are not a systemd
user or want to use a different solution)*
Download the magneticod systemd service file (at
`magneticod/systemd/magneticod.service <systemd/magneticod.service>`_) and replace the tilde symbol with
the path of your home directory, and ``PORT_NUMBER`` with your preferred port number. For example, if my username
is ``bora`` and I prefer the port 64879, this line ::
ExecStart=~/.local/bin/magneticod --node-addr 0.0.0.0:PORT_NUMBER
should become this: ::
ExecStart=/home/bora/.local/bin/magneticod --node-addr 0.0.0.0:64879
Here, the tilde (``~``) is replaced with ``/home/bora`` and ``PORT_NUMBER`` with 64879. Run ``echo ~`` to see the
path of your own home directory, if you do not already know it. Port numbers 1024 and above typically do not require
special permissions.
6. Copy the magneticod systemd service file to your local systemd configuration directory: ::
cp magneticod.service ~/.config/systemd/user/
You might need to create the intermediate directories (``.config``, ``systemd``, and ``user``) if they do not exist.
7. (Optional, **requires root**) Disable iptables connection tracking for the specified port: ::
iptables -I OUTPUT -t raw -p udp --sport PORT_NUMBER -j NOTRACK
iptables -I PREROUTING -t raw -p udp --dport PORT_NUMBER -j NOTRACK
This prevents an excessive number of ``EPERM`` "Operation not permitted" errors, which also have a negative impact
on performance.
8. Start **magneticod**: ::
systemctl --user enable magneticod --now
**magneticod** should now be running under the supervision of systemd and it should also be automatically started
whenever you boot your machine.
You can check its status and most recent log entries using the following command: ::
systemctl --user status magneticod
To stop **magneticod**, issue the following: ::
systemctl --user stop magneticod
\
**Suggestion:**
Keep **magneticod** running so that when you finish installing **magneticow**, the database will already be populated
and you can see some results.
Using
=====
**magneticod** does not require user intervention to operate once it starts running. Hence, there is no "user manual",
although you should be aware of these points:
1. **Network Usage:**
**magneticod** does *not* have any built-in rate limiter *yet*, and it will literally suck the hell out of your
bandwidth. Unless you are running **magneticod** on a separate machine dedicated for it, you might want to consider
starting it manually only when network load is low (e.g. when you are at work or sleeping at night).
2. **Pre-Alpha Bugs:**
**magneticod** is *supposed* to work "just fine", but since it is at the pre-alpha stage, you are likely to find
some bugs. It will be much appreciated if you report those bugs, so that **magneticod** can be improved. See the
next sub-section for how to mitigate the issue if you are *not* using systemd.
Automatic Restarting
--------------------
Due to minor bugs at this stage of its development, **magneticod** should be supervised by another program that
ensures it is running and restarts it if it is not. The systemd service file supplied with **magneticod** implements
that, although (if you wish) you can also use a much more primitive approach using GNU screen (which comes
pre-installed in many GNU/Linux distributions):
1. Start screen session named ``magneticod``: ::
screen -S magneticod
2. Run **magneticod** forever: ::
until magneticod; do echo "restarting..."; sleep 5; done;
This will keep restarting **magneticod** after five seconds whenever it fails.
3. Detach the session by pressing Ctrl+A followed by Ctrl+D.
4. If you wish to see the logs, or to kill **magneticod**, ``screen -r magneticod`` will attach the original screen
session back. **magneticod** will exit gracefully upon keyboard interrupt (Ctrl+C) [SIGINT].
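
The ``until ...; do ...; done`` loop above can also be sketched in Python. This is only an illustration of the same restart-until-success idea: the function name ``run_until_success`` and its ``delay`` and ``max_restarts`` parameters are assumptions made for this example and are not part of magneticod.

```python
import subprocess
import sys
import time
import typing


def run_until_success(argv: typing.List[str], delay: float = 5.0,
                      max_restarts: typing.Optional[int] = None) -> int:
    """Re-run argv until it exits with status 0; return how many restarts were needed.

    Hypothetical helper: it mirrors `until CMD; do sleep 5; done`, with an
    optional upper bound on restarts so that the loop can give up.
    """
    restarts = 0
    while subprocess.call(argv) != 0:
        if max_restarts is not None and restarts >= max_restarts:
            raise RuntimeError("giving up after %d restarts" % restarts)
        restarts += 1
        print("restarting...")
        time.sleep(delay)
    return restarts
```

For instance, ``run_until_success(["magneticod"])`` would keep relaunching the daemon every five seconds until it exits cleanly, assuming ``magneticod`` is on the ``$PATH``.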
Database
--------
**magneticod** uses SQLite 3, which is built in by default in almost all Python distributions. The
`appdirs <https://pypi.python.org/pypi/appdirs/>`_ package is used to determine the user data directory, which is often
``~/.local/share/magneticod``. **magneticod** uses write-ahead logging for its database, so there might be multiple
files while it is operating, but ``database.sqlite3`` is *the main database where every torrent metadata is stored*.
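
As a rough illustration of why several files can appear next to ``database.sqlite3``, the sketch below opens a SQLite database in write-ahead logging mode using only the standard library. The helper names and the ``demo`` table are assumptions made for this example; they are not magneticod's actual schema or persistence code.

```python
import os
import sqlite3
import tempfile
import typing


def open_wal_database(path: str) -> typing.Tuple[sqlite3.Connection, str]:
    """Open (or create) a database and ask SQLite to use write-ahead logging."""
    connection = sqlite3.connect(path)
    # The pragma returns a single row holding the journal mode now in effect.
    mode = connection.execute("PRAGMA journal_mode=WAL;").fetchone()[0]
    return connection, mode


def demo() -> typing.Tuple[str, typing.List[str], int]:
    """Write one row and report the journal mode, the files on disk and the row count."""
    with tempfile.TemporaryDirectory() as directory:
        path = os.path.join(directory, "database.sqlite3")
        connection, mode = open_wal_database(path)
        try:
            # Hypothetical table, used only to trigger a write.
            connection.execute("CREATE TABLE demo (info_hash BLOB PRIMARY KEY, name TEXT)")
            connection.execute("INSERT INTO demo VALUES (?, ?)", (b"\x00" * 20, "example"))
            connection.commit()
            files = sorted(os.listdir(directory))
            rows = connection.execute("SELECT count(*) FROM demo").fetchone()[0]
        finally:
            connection.close()
    return mode, files, rows
```

While the connection is open, the file listing returned by ``demo()`` would typically include companion files alongside the main database; they belong to the write-ahead log and should not be deleted by hand.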
License
=======
All the code is licensed under AGPLv3, unless otherwise stated in the specific source file. See the ``COPYING`` file
in the ``magnetico`` directory for the full license text.
----
Dedicated to Cemile Binay, in whose hands I thrived.
Bora M. ALPER <bora@boramalper.org>


@ -1,15 +0,0 @@
# magneticod - Autonomous BitTorrent DHT crawler and metadata fetcher.
# Copyright (C) 2017 Mert Bora ALPER <bora@boramalper.org>
# Dedicated to Cemile Binay, in whose hands I thrived.
#
# This program is free software: you can redistribute it and/or modify it under the terms of the GNU Affero General
# Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any
# later version.
#
# This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied
# warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more
# details.
#
# You should have received a copy of the GNU Affero General Public License along with this program. If not, see
# <http://www.gnu.org/licenses/>.
__version__ = (0, 6, 0)


@ -1,147 +0,0 @@
# magneticod - Autonomous BitTorrent DHT crawler and metadata fetcher.
# Copyright (C) 2017 Mert Bora ALPER <bora@boramalper.org>
# Dedicated to Cemile Binay, in whose hands I thrived.
#
# This program is free software: you can redistribute it and/or modify it under the terms of the GNU Affero General
# Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any
# later version.
#
# This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied
# warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more
# details.
#
# You should have received a copy of the GNU Affero General Public License along with this program. If not, see
# <http://www.gnu.org/licenses/>.
import argparse
import asyncio
import logging
import ipaddress
import textwrap
import urllib.parse
import os
import sys
import typing
import appdirs
import humanfriendly
from .constants import DEFAULT_MAX_METADATA_SIZE
from . import __version__
from . import dht
from . import persistence
def parse_ip_port(netloc: str) -> typing.Optional[typing.Tuple[str, int]]:
    # In case no port supplied
    try:
        return str(ipaddress.ip_address(netloc)), 0
    except ValueError:
        pass
    # In case port supplied
    try:
        parsed = urllib.parse.urlparse("//{}".format(netloc))
        ip = str(ipaddress.ip_address(parsed.hostname))
        port = parsed.port
        if port is None:
            return None
    except ValueError:
        return None
    return ip, port


def parse_size(value: str) -> int:
    try:
        return humanfriendly.parse_size(value)
    except humanfriendly.InvalidSize as e:
        raise argparse.ArgumentTypeError("Invalid argument. {}".format(e))
def parse_cmdline_arguments(args: typing.List[str]) -> typing.Optional[argparse.Namespace]:
parser = argparse.ArgumentParser(
description="Autonomous BitTorrent DHT crawler and metadata fetcher.",
epilog=textwrap.dedent("""\
Copyright (C) 2017 Mert Bora ALPER <bora@boramalper.org>
Dedicated to Cemile Binay, in whose hands I thrived.
This program is free software: you can redistribute it and/or modify it under
the terms of the GNU Affero General Public License as published by the Free
Software Foundation, either version 3 of the License, or (at your option) any
later version.
This program is distributed in the hope that it will be useful, but WITHOUT ANY
WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
PARTICULAR PURPOSE. See the GNU Affero General Public License for more
details.
You should have received a copy of the GNU Affero General Public License along
with this program. If not, see <http://www.gnu.org/licenses/>.
"""),
allow_abbrev=False,
formatter_class=argparse.RawDescriptionHelpFormatter
)
parser.add_argument(
"--node-addr", action="store", type=parse_ip_port, required=False, default="0.0.0.0:0",
help="the address of the (DHT) node magneticod will use"
)
parser.add_argument(
"--max-metadata-size", type=parse_size, default=DEFAULT_MAX_METADATA_SIZE,
help="Limit metadata size to protect memory overflow. Provide in human friendly format eg. 1 M, 1 GB"
)
default_database_dir = os.path.join(appdirs.user_data_dir("magneticod"), "database.sqlite3")
parser.add_argument(
"--database-file", type=str, default=default_database_dir,
help="Path to database file (default: {})".format(humanfriendly.format_path(default_database_dir))
)
parser.add_argument(
'-d', '--debug',
action="store_const", dest="loglevel", const=logging.DEBUG, default=logging.INFO,
help="Print debugging information in addition to normal processing.",
)
return parser.parse_args(args)
def main() -> int:
# main_task = create_tasks()
arguments = parse_cmdline_arguments(sys.argv[1:])
logging.basicConfig(level=arguments.loglevel, format="%(asctime)s %(levelname)-8s %(message)s")
logging.info("magneticod v%d.%d.%d started", *__version__)
# use uvloop if it's installed
try:
import uvloop
asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
logging.info("uvloop is in use")
except ImportError:
if sys.platform not in ["linux", "darwin"]:
logging.warning("uvloop could not be imported, using the default asyncio implementation")
# noinspection PyBroadException
try:
database = persistence.Database(arguments.database_file)
except:
logging.exception("could NOT connect to the database!")
return 1
loop = asyncio.get_event_loop()
node = dht.SybilNode(database, arguments.max_metadata_size)
loop.create_task(node.launch(arguments.node_addr))
try:
asyncio.get_event_loop().run_forever()
except KeyboardInterrupt:
logging.critical("Keyboard interrupt received! Exiting gracefully...")
finally:
loop.run_until_complete(node.shutdown())
database.close()
return 0
if __name__ == "__main__":
sys.exit(main())
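
A note on the `--node-addr` handling above: `parse_ip_port` returns `None` for malformed input, and argparse would accept that `None` as a valid value. The sketch below, using only the standard library, shows one way the same parsing could be wrapped so that bad addresses are rejected at the command line. The names `ip_port_argument` and `build_parser` are hypothetical and were not part of the deleted module.

```python
import argparse
import ipaddress
import typing
import urllib.parse


def parse_ip_port(netloc: str) -> typing.Optional[typing.Tuple[str, int]]:
    """Parse "IP" or "IP:PORT"; a bare IP yields port 0, malformed input yields None."""
    try:
        return str(ipaddress.ip_address(netloc)), 0
    except ValueError:
        pass
    try:
        parsed = urllib.parse.urlparse("//{}".format(netloc))
        ip = str(ipaddress.ip_address(parsed.hostname))
        port = parsed.port
    except ValueError:
        return None
    if port is None:
        return None
    return ip, port


def ip_port_argument(value: str) -> typing.Tuple[str, int]:
    """Hypothetical argparse `type=` hook that rejects unparsable addresses."""
    result = parse_ip_port(value)
    if result is None:
        raise argparse.ArgumentTypeError("invalid address: {!r}".format(value))
    return result


def build_parser() -> argparse.ArgumentParser:
    parser = argparse.ArgumentParser(prog="example")
    # argparse also runs the type hook on string defaults, so the default is parsed too.
    parser.add_argument("--node-addr", type=ip_port_argument, default="0.0.0.0:0")
    return parser
```

With this wrapper, `build_parser().parse_args(["--node-addr", "10.0.0.1:6881"]).node_addr` is a ready-to-use `(ip, port)` tuple, and an unparsable value makes argparse print a usage error instead of passing `None` further down.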


@ -1,80 +0,0 @@
# magneticod - Autonomous BitTorrent DHT crawler and metadata fetcher.
# Copyright (C) 2017 Mert Bora ALPER <bora@boramalper.org>
# Dedicated to Cemile Binay, in whose hands I thrived.
#
# This program is free software: you can redistribute it and/or modify it under the terms of the GNU Affero General
# Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any
# later version.
#
# This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied
# warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more
# details.
#
# You should have received a copy of the GNU Affero General Public License along with this program. If not, see
# <http://www.gnu.org/licenses/>.
"""
Warning:
Encoders do NOT check for circular objects! (and will NEVER check due to speed concerns).
"""
import typing
from io import BytesIO
import better_bencode
"""
The type definitions under this comment are actually these:
KRPCTypes = typing.Union[int, bytes, "KRPCList", "KRPCDict"]
KRPCList = typing.List[KRPCTypes]
KRPCDict = typing.Dict[bytes, KRPCTypes]
But since mypy:
* does NOT support self-referential types
* has problems with complex Unions (in case you thought about expanding manually: I tried)
we just write `typing.Any`. =(
"""
KRPCTypes = typing.Any
KRPCList = typing.Any
KRPCDict = typing.Any
def dumps(obj: KRPCTypes) -> bytes:
    try:
        return better_bencode.dumps(obj)
    except Exception:
        raise BencodeEncodingError()


def loads(bytes_object: bytes) -> KRPCTypes:
    try:
        return better_bencode.loads(bytes_object)
    except Exception as exc:
        raise BencodeDecodingError(exc)


def loads2(bytes_object: bytes) -> typing.Tuple[KRPCTypes, int]:
    """
    Returns the bencoded object AND the index where the dump of the decoded object ends (exclusive). In fewer words:
        dump = b"i12eOH YEAH"
        object, i = loads2(dump)
        print(">>>", dump[i:])  # OUTPUT: >>> b'OH YEAH'
    """
    bio = BytesIO(bytes_object)
    try:
        return better_bencode.load(bio), bio.tell()
    except Exception as exc:
        raise BencodeDecodingError(exc)


class BencodeEncodingError(Exception):
    pass


class BencodeDecodingError(Exception):
    def __init__(self, original_exc):
        super().__init__()
        self.original_exc = original_exc
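
To make the contract of `loads2` concrete (decoded object plus the exclusive end index of its dump), here is a minimal pure-Python sketch of a bencode prefix decoder. It is an illustration written for this note, not the `better_bencode` implementation that the module delegates to, and the name `decode_prefix` is chosen here for the example.

```python
import typing


def decode_prefix(data: bytes, i: int = 0) -> typing.Tuple[typing.Any, int]:
    """Decode one bencoded value starting at data[i]; return (value, end index)."""
    lead = data[i:i + 1]
    if lead == b"i":  # integer: i<digits>e
        end = data.index(b"e", i)
        return int(data[i + 1:end]), end + 1
    if lead == b"l":  # list: l<values>e
        items = []
        i += 1
        while data[i:i + 1] != b"e":
            item, i = decode_prefix(data, i)
            items.append(item)
        return items, i + 1
    if lead == b"d":  # dictionary: d<key><value>...e
        result = {}
        i += 1
        while data[i:i + 1] != b"e":
            key, i = decode_prefix(data, i)
            value, i = decode_prefix(data, i)
            result[key] = value
        return result, i + 1
    if lead.isdigit():  # byte string: <length>:<bytes>
        colon = data.index(b":", i)
        length = int(data[i:colon])
        start = colon + 1
        if start + length > len(data):
            raise ValueError("byte string is truncated")
        return data[start:start + length], start + length
    raise ValueError("invalid bencode at offset %d" % i)
```

The returned index is what lets the ut_metadata handler split an extension message into its bencoded header and the raw metadata piece that follows it.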


@ -1,221 +0,0 @@
# magneticod - Autonomous BitTorrent DHT crawler and metadata fetcher.
# Copyright (C) 2017 Mert Bora ALPER <bora@boramalper.org>
# Dedicated to Cemile Binay, in whose hands I thrived.
#
# This program is free software: you can redistribute it and/or modify it under the terms of the GNU Affero General
# Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any
# later version.
#
# This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied
# warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more
# details.
#
# You should have received a copy of the GNU Affero General Public License along with this program. If not, see
# <http://www.gnu.org/licenses/>.
import asyncio
import logging
import hashlib
import math
import typing
import os
from . import bencode
InfoHash = bytes
PeerAddress = typing.Tuple[str, int]
async def fetch_metadata_from_peer(info_hash: InfoHash, peer_addr: PeerAddress, max_metadata_size: int, timeout=None) \
        -> typing.Optional[bytes]:
    try:
        # asyncio.wait_for "returns result of the Future or coroutine."
        return await asyncio.wait_for(DisposablePeer(info_hash, peer_addr, max_metadata_size).run(), timeout=timeout)
    except asyncio.TimeoutError:
        return None


class ProtocolError(Exception):
    pass
class DisposablePeer:
def __init__(self, info_hash: InfoHash, peer_addr: PeerAddress, max_metadata_size: int) -> None:
self.__peer_addr = peer_addr
self.__info_hash = info_hash
self.__ext_handshake_complete = False # Extension Handshake
self.__ut_metadata = int() # Since we don't know ut_metadata code that remote peer uses...
self.__max_metadata_size = max_metadata_size
self.__metadata_size = None
self.__metadata_received = 0 # Amount of metadata bytes received...
self.__metadata = bytearray()
self._run_task = None
self._writer = None
async def run(self) -> typing.Optional[bytes]:
event_loop = asyncio.get_event_loop()
self._metadata_future = event_loop.create_future()
try:
self._reader, self._writer = await asyncio.open_connection(*self.__peer_addr, loop=event_loop) # type: ignore
# Send the BitTorrent handshake message (0x13 = 19 in decimal, the length of the protocol string that follows)
self._writer.write(b"\x13BitTorrent protocol%s%s%s" % ( # type: ignore
b"\x00\x00\x00\x00\x00\x10\x00\x01",
self.__info_hash,
os.urandom(20)
))
# Honestly speaking, BitTorrent protocol might be one of the most poorly documented and (not the most but)
# badly designed protocols I have ever seen (I am 19 years old so what I could have seen?).
#
# Anyway, all the messages EXCEPT the handshake are length-prefixed by 4 bytes in network order, BUT the
# size of the handshake message is the 1-byte length prefix + the 19-byte protocol string + 48 bytes (68 bytes in
# total); luckily, there is only one canonical way of handshaking in the wild.
message = await self._reader.readexactly(68)
if message[1:20] != b"BitTorrent protocol":
# Erroneous handshake, possibly unknown version...
raise ProtocolError("Erroneous BitTorrent handshake! %s" % message)
self.__on_bt_handshake(message)
while not self._metadata_future.done():
buffer = await self._reader.readexactly(4)
length = int.from_bytes(buffer, "big")
message = await self._reader.readexactly(length)
self.__on_message(message)
except Exception:
logging.debug("closing %s to %s", self.__info_hash.hex(), self.__peer_addr)
finally:
if not self._metadata_future.done():
self._metadata_future.set_result(None)
if self._writer:
self._writer.close()
return self._metadata_future.result()
def __on_message(self, message: bytes) -> None:
# Every extension message has BitTorrent Message ID = 20
if message[0] != 20:
# logging.debug("Message is NOT an EXTension message! %s", message[:200])
return
# Extension Handshake has the Extension Message ID = 0
if message[1] == 0:
self.__on_ext_handshake_message(message[2:])
return
# ut_metadata extension messages has the Extension Message ID = 1 (as we arbitrarily decided!)
if message[1] != 1:
logging.debug("Message is NOT an ut_metadata message! %s", message[:200])
return
# Okay, now we are -almost- sure that this is an extension message, a kind we are most likely interested in...
self.__on_ext_message(message[2:])
def __on_bt_handshake(self, message: bytes) -> None:
""" on BitTorrent Handshake... send the extension handshake! """
if message[25] != 16:
logging.info("Peer does NOT support the extension protocol")
msg_dict_dump = bencode.dumps({
b"m": {
b"ut_metadata": 1
}
})
# In case you cannot read hex:
# 0x14 = 20 (BitTorrent ID indicating that it's an extended message)
# 0x00 = 0 (Extension ID indicating that it's the handshake message)
self._writer.write(b"%b\x14%s" % ( # type: ignore
(2 + len(msg_dict_dump)).to_bytes(4, "big"),
b'\0' + msg_dict_dump
))
def __on_ext_handshake_message(self, message: bytes) -> None:
if self.__ext_handshake_complete:
return
try:
msg_dict = bencode.loads(bytes(message))
except bencode.BencodeDecodingError:
# One might be tempted to close the connection, but why care? Any DisposableNode will be disposed
# automatically anyway (after a certain amount of time if the metadata is still not complete).
logging.debug("Could NOT decode extension handshake message! %s", message[:200])
return
try:
# Just to make sure that the remote peer supports ut_metadata extension:
ut_metadata = msg_dict[b"m"][b"ut_metadata"]
metadata_size = msg_dict[b"metadata_size"]
assert metadata_size > 0, "Invalid (empty) metadata size"
assert metadata_size < self.__max_metadata_size, "Malicious or malfunctioning peer {}:{} tried send above" \
" {} max metadata size".format(self.__peer_addr[0],
self.__peer_addr[1],
self.__max_metadata_size)
except AssertionError as e:
logging.debug(str(e))
raise
self.__ut_metadata = ut_metadata
try:
self.__metadata = bytearray(metadata_size) # type: ignore
except MemoryError:
logging.exception("Could not allocate %.1f KiB for the metadata!", metadata_size / 1024)
raise
self.__metadata_size = metadata_size
self.__ext_handshake_complete = True
# After the handshake is complete, request all the pieces of metadata
n_pieces = math.ceil(self.__metadata_size / (2 ** 14))
for piece in range(n_pieces):
self.__request_metadata_piece(piece)
def __on_ext_message(self, message: bytes) -> None:
try:
msg_dict, i = bencode.loads2(bytes(message))
except bencode.BencodeDecodingError:
# One might be tempted to close the connection, but why care? Any DisposableNode will be disposed
# automatically anyway (after a certain amount of time if the metadata is still not complete).
logging.debug("Could NOT decode extension message! %s", message[:200])
return
try:
msg_type = msg_dict[b"msg_type"]
piece = msg_dict[b"piece"]
except KeyError:
logging.debug("Missing EXT keys! %s", msg_dict)
return
if msg_type == 1: # data
metadata_piece = message[i:]
self.__metadata[piece * 2**14: piece * 2**14 + len(metadata_piece)] = metadata_piece
self.__metadata_received += len(metadata_piece)
# self.__metadata += metadata_piece
# logging.debug("PIECE %d RECEIVED %s", piece, metadata_piece[:200])
if self.__metadata_received == self.__metadata_size:
if hashlib.sha1(self.__metadata).digest() == self.__info_hash:
if not self._metadata_future.done():
self._metadata_future.set_result(bytes(self.__metadata))
else:
logging.debug("Invalid Metadata! Ignoring.")
elif msg_type == 2: # reject
logging.info("Peer rejected us.")
def __request_metadata_piece(self, piece: int) -> None:
msg_dict_dump = bencode.dumps({
b"msg_type": 0,
b"piece": piece
})
# In case you cannot read hex:
# 0x14 = 20 (BitTorrent ID indicating that it's an extended message)
# It is followed by the ut_metadata Extension ID that the remote peer announced in its extension handshake
self._writer.write(b"%b\x14%s%s" % ( # type: ignore
(2 + len(msg_dict_dump)).to_bytes(4, "big"),
self.__ut_metadata.to_bytes(1, "big"),
msg_dict_dump
))
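
The wire framing used by `DisposablePeer` can be summarised in a short, self-contained sketch: a fixed 68-byte handshake, then extended messages carrying a 4-byte big-endian length prefix, the message ID 20 and an extension ID. The helper names below (`build_handshake`, `frame_extended_message`, `metadata_piece_count`) are assumptions made for this illustration and do not appear in the deleted module.

```python
import math

PROTOCOL_STRING = b"BitTorrent protocol"
# Reserved bytes as sent above; 0x10 in the sixth byte advertises the extension protocol.
RESERVED = b"\x00\x00\x00\x00\x00\x10\x00\x01"
EXTENDED_MESSAGE_ID = 20
METADATA_PIECE_SIZE = 2 ** 14  # ut_metadata pieces are 16 KiB each


def build_handshake(info_hash: bytes, peer_id: bytes) -> bytes:
    """Return the 68-byte handshake: length byte, protocol string, reserved, info hash, peer ID."""
    if len(info_hash) != 20 or len(peer_id) != 20:
        raise ValueError("info_hash and peer_id must be 20 bytes each")
    return bytes([len(PROTOCOL_STRING)]) + PROTOCOL_STRING + RESERVED + info_hash + peer_id


def frame_extended_message(extension_id: int, payload: bytes) -> bytes:
    """Prefix an extended message with its 4-byte big-endian length."""
    body = bytes([EXTENDED_MESSAGE_ID, extension_id]) + payload
    return len(body).to_bytes(4, "big") + body


def metadata_piece_count(metadata_size: int) -> int:
    """Number of 16 KiB pieces needed to transfer metadata_size bytes."""
    return math.ceil(metadata_size / METADATA_PIECE_SIZE)
```

Byte 25 of the handshake built this way equals 16, which is the same position `__on_bt_handshake` inspects on the peer's reply to decide whether the extension protocol is supported.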


@ -1,64 +0,0 @@
# magneticod - Autonomous BitTorrent DHT crawler and metadata fetcher.
# Copyright (C) 2017 Mert Bora ALPER <bora@boramalper.org>
# Dedicated to Cemile Binay, in whose hands I thrived.
#
# This program is free software: you can redistribute it and/or modify it under the terms of the GNU Affero General
# Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any
# later version.
#
# This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied
# warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more
# details.
#
# You should have received a copy of the GNU Affero General Public License along with this program. If not, see
# <http://www.gnu.org/licenses/>.
import typing
import io
import better_bencode
Message = typing.Dict[bytes, typing.Any]
def encode(message: Message) -> bytes:
    try:
        return better_bencode.dumps(message)
    except Exception as exc:
        raise EncodeError(exc)


def decode(data: typing.ByteString) -> Message:
    try:
        return better_bencode.loads(data)
    except Exception as exc:
        raise DecodeError(exc)


def decode_prefix(data: typing.ByteString) -> typing.Tuple[Message, int]:
    """
    Returns the bencoded object AND the index where the dump of the decoded object ends (exclusive). In fewer words:
        dump = b"i12eOH YEAH"
        object, i = decode_prefix(dump)
        print(">>>", dump[i:])  # OUTPUT: >>> b'OH YEAH'
    """
    bio = io.BytesIO(data)
    try:
        return better_bencode.load(bio), bio.tell()
    except Exception as exc:
        raise DecodeError(exc)


class BaseCodecError(Exception):
    def __init__(self, original_exception: Exception):
        self.original_exception = original_exception


class EncodeError(BaseCodecError):
    pass


class DecodeError(BaseCodecError):
    pass


@ -1,14 +0,0 @@
# coding=utf-8
DEFAULT_MAX_METADATA_SIZE = 10 * 1024 * 1024
BOOTSTRAPPING_NODES = [
    ("router.bittorrent.com", 6881),
    ("dht.transmissionbt.com", 6881)
]
PENDING_INFO_HASHES = 10  # threshold for pending info hashes before being committed to database
TICK_INTERVAL = 1  # in seconds
# maximum (inclusive) number of active (disposable) peers to fetch the metadata per info hash at the same time:
MAX_ACTIVE_PEERS_PER_INFO_HASH = 5
PEER_TIMEOUT = 120  # in seconds


@ -1,42 +0,0 @@
# magneticod - Autonomous BitTorrent DHT crawler and metadata fetcher.
# Copyright (C) 2017 Mert Bora ALPER <bora@boramalper.org>
# Dedicated to Cemile Binay, in whose hands I thrived.
#
# This program is free software: you can redistribute it and/or modify it under the terms of the GNU Affero General
# Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any
# later version.
#
# This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied
# warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more
# details.
#
# You should have received a copy of the GNU Affero General Public License along with this program. If not, see
# <http://www.gnu.org/licenses/>.
import asyncio
from .dht import mainline
from . import bittorrent
class Coordinator:
def __init__(self):
self._peer_service = mainline.service.PeerService()
self._metadata_service_tasks = {}
async def run(self):
await self._peer_service.launch(("0.0.0.0", 0))
# Private Functionality
# =====================
def _when_peer(self, info_hash: mainline.protocol.InfoHash, address: mainline.transport.Address) \
-> None:
if info_hash in self._metadata_service_tasks:
return
self._metadata_service_tasks[info_hash] = asyncio.ensure_future()
def _when_metadata(self, info_hash: mainline.protocol.InfoHash, address: mainline.transport.Address) -> None:
pass


@ -1,393 +0,0 @@
# magneticod - Autonomous BitTorrent DHT crawler and metadata fetcher.
# Copyright (C) 2017 Mert Bora ALPER <bora@boramalper.org>
# Dedicated to Cemile Binay, in whose hands I thrived.
#
# This program is free software: you can redistribute it and/or modify it under the terms of the GNU Affero General
# Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any
# later version.
#
# This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied
# warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more
# details.
#
# You should have received a copy of the GNU Affero General Public License along with this program. If not, see
# <http://www.gnu.org/licenses/>.
import asyncio
import errno
import zlib
import logging
import socket
import typing
import os
from .constants import BOOTSTRAPPING_NODES, MAX_ACTIVE_PEERS_PER_INFO_HASH, PEER_TIMEOUT, TICK_INTERVAL
from . import bencode
from . import bittorrent
from . import persistence
NodeID = bytes
NodeAddress = typing.Tuple[str, int]
PeerAddress = typing.Tuple[str, int]
InfoHash = bytes
Metadata = bytes
class SybilNode(asyncio.DatagramProtocol):
def __init__(self, database: persistence.Database, max_metadata_size):
self.__true_id = os.urandom(20)
self._routing_table = {} # type: typing.Dict[NodeID, NodeAddress]
self.__token_secret = os.urandom(4)
# Maximum number of neighbours (this is a THRESHOLD where, once reached, the search for new neighbours will
# stop; but until then, the total number of neighbours might exceed the threshold).
self.__n_max_neighbours = 2000
self.__parent_futures = {} # type: typing.Dict[InfoHash, asyncio.Future]
self.__database = database
self.__max_metadata_size = max_metadata_size
self._is_writing_paused = False
self._tick_task = None
logging.info("SybilNode %s initialized!", self.__true_id.hex().upper())
async def launch(self, address):
await asyncio.get_event_loop().create_datagram_endpoint(lambda: self, local_addr=address)
logging.info("SybilNode is launched on %s!", address)
# mypy ignored: mypy errors because we explicitly stated `transport`s type =)
def connection_made(self, transport: asyncio.DatagramTransport) -> None: # type: ignore
# mypy ignored: mypy doesn't know (yet) about coroutines
self._tick_task = asyncio.get_event_loop().create_task(self.tick_periodically()) # type: ignore
self._transport = transport
def connection_lost(self, exc) -> None:
logging.critical("SybilNode's connection is lost.")
self._is_writing_paused = True
def pause_writing(self) -> None:
self._is_writing_paused = True
# In case of congestion, decrease the maximum number of nodes to the 90% of the current value.
self.__n_max_neighbours = self.__n_max_neighbours * 9 // 10
logging.debug("Maximum number of neighbours now %d", self.__n_max_neighbours)
def resume_writing(self) -> None:
self._is_writing_paused = False
def sendto(self, data, addr) -> None:
if not self._is_writing_paused:
self._transport.sendto(data, addr)
def error_received(self, exc: Exception) -> None:
if isinstance(exc, PermissionError) or (isinstance(exc, OSError) and exc.errno == errno.ENOBUFS):
# This exception (EPERM errno: 1) is the kernel's way of saying "you are far too fast, chill".
# It is also likely that we have received an ICMP source quench packet (meaning that we really need to
# slow down).
#
# Read more here: http://www.archivum.info/comp.protocols.tcp-ip/2009-05/00088/UDP-socket-amp-amp-sendto
# -amp-amp-EPERM.html
# > Note On BSD systems (OS X, FreeBSD, etc.) flow control is not supported for DatagramProtocol, because
# > send failures caused by writing too many packets cannot be detected easily. The socket always appears
# > ready and excess packets are dropped; an OSError with errno set to errno.ENOBUFS may or may not be
# > raised; if it is raised, it will be reported to DatagramProtocol.error_received() but otherwise ignored.
# Source: https://docs.python.org/3/library/asyncio-protocol.html#flow-control-callbacks
# In case of congestion, decrease the maximum number of nodes to the 90% of the current value.
if self.__n_max_neighbours < 200:
logging.warning("Max. number of neighbours are < 200 and there is still congestion! (check your network "
"connection if this message recurs)")
else:
self.__n_max_neighbours = self.__n_max_neighbours * 9 // 10
logging.debug("Maximum number of neighbours now %d", self.__n_max_neighbours)
else:
# The previous "exception" was kind of "unexceptional", but we should log anything else.
logging.error("SybilNode operational error: `%s`", exc)
async def tick_periodically(self) -> None:
while True:
await asyncio.sleep(TICK_INTERVAL)
# Bootstrap (by querying the bootstrapping servers) ONLY IF the routing table is empty (i.e. we don't have
# any neighbours). Otherwise we'll increase the load on those central servers by querying them every second.
if not self._routing_table:
await self.__bootstrap()
self.__make_neighbours()
self._routing_table.clear()
if not self._is_writing_paused:
self.__n_max_neighbours = self.__n_max_neighbours * 101 // 100
# mypy ignore: because .child_count on Future is monkey-patched
logging.debug("fetch metadata task count: %d", sum(
x.child_count for x in self.__parent_futures.values())) # type: ignore
logging.debug("asyncio task count: %d", len(asyncio.Task.all_tasks()))
def datagram_received(self, data, addr) -> None:
# Ignore nodes that "use" port 0, as we cannot communicate with them reliably across different systems.
# See https://tools.cisco.com/security/center/viewAlert.x?alertId=19935 for slightly more details
if addr[1] == 0:
return
if self._transport.is_closing():
return
try:
message = bencode.loads(data)
except bencode.BencodeDecodingError:
return
if isinstance(message.get(b"r"), dict) and type(message[b"r"].get(b"nodes")) is bytes:
self.__on_FIND_NODE_response(message)
elif message.get(b"q") == b"get_peers":
self.__on_GET_PEERS_query(message, addr)
elif message.get(b"q") == b"announce_peer":
self.__on_ANNOUNCE_PEER_query(message, addr)
async def shutdown(self) -> None:
parent_futures = list(self.__parent_futures.values())
for pf in parent_futures:
pf.set_result(None)
self._tick_task.cancel()
await asyncio.wait([self._tick_task])
self._transport.close()
def __on_FIND_NODE_response(self, message: bencode.KRPCDict) -> None: # pylint: disable=invalid-name
# Well, we are not really interested in your response if our routing table is already full; sorry.
# (Thanks to Glandos@GitHub for the heads up!)
if len(self._routing_table) >= self.__n_max_neighbours:
return
try:
nodes_arg = message[b"r"][b"nodes"]
assert type(nodes_arg) is bytes and len(nodes_arg) % 26 == 0
except (TypeError, KeyError, AssertionError):
return
try:
nodes = self.__decode_nodes(nodes_arg)
except AssertionError:
return
nodes = [n for n in nodes if n[1][1] != 0] # Ignore nodes with port 0.
self._routing_table.update(nodes[:self.__n_max_neighbours - len(self._routing_table)])
def __on_GET_PEERS_query(self, message: bencode.KRPCDict, addr: NodeAddress) -> None: # pylint: disable=invalid-name
try:
transaction_id = message[b"t"]
assert type(transaction_id) is bytes and transaction_id
info_hash = message[b"a"][b"info_hash"]
assert type(info_hash) is bytes and len(info_hash) == 20
except (TypeError, KeyError, AssertionError):
return
data = self.__build_GET_PEERS_response(
info_hash[:15] + self.__true_id[:5], transaction_id, self.__calculate_token(addr, info_hash)
)
# TODO:
# We would like to prioritise GET_PEERS responses as they are the most fruitful ones, i.e., that leads to the
# discovery of an info hash & metadata! But there is no easy way to do this with asyncio...
# Maybe use priority queues to prioritise certain messages and let them accumulate, and dispatch them to the
# transport at every tick?
self.sendto(data, addr)
def __on_ANNOUNCE_PEER_query(self, message: bencode.KRPCDict, addr: NodeAddress) -> None: # pylint: disable=invalid-name
try:
node_id = message[b"a"][b"id"]
assert type(node_id) is bytes and len(node_id) == 20
transaction_id = message[b"t"]
assert type(transaction_id) is bytes and transaction_id
token = message[b"a"][b"token"]
assert type(token) is bytes
info_hash = message[b"a"][b"info_hash"]
assert type(info_hash) is bytes and len(info_hash) == 20
if b"implied_port" in message[b"a"]:
implied_port = message[b"a"][b"implied_port"]
assert implied_port in (0, 1)
else:
implied_port = None
port = message[b"a"][b"port"]
assert type(port) is int and 0 < port < 65536
except (TypeError, KeyError, AssertionError):
return
data = self.__build_ANNOUNCE_PEER_response(node_id[:15] + self.__true_id[:5], transaction_id)
self.sendto(data, addr)
if implied_port:
peer_addr = (addr[0], addr[1])
else:
peer_addr = (addr[0], port)
if not self.__database.is_infohash_new(info_hash):
return
event_loop = asyncio.get_event_loop()
# A little clarification about parent and child futures might be really useful here:
# For every info hash we are interested in, we create ONE parent future and save it in the
# self.__parent_futures (info_hash -> future) dictionary.
# For EVERY DisposablePeer working to fetch the metadata of that info hash, we create a child future. Hence, for
# every parent future, there should be *at least* one child future.
#
# Parent and child futures are "connected" to each other through `add_done_callback` functionality:
# When a child is successfully done, it sets the result of its parent (`set_result()`), and if it was
# unsuccessful to fetch the metadata, it just checks whether there are any other child futures left and if not
# it terminates the parent future (by setting its result to None) and quits.
# When a parent future is successfully done, (through the callback) it adds the info hash to the set of
# completed metadatas and puts the metadata in the queue to be committed to the database.
# create the parent future
if info_hash not in self.__parent_futures:
parent_f = event_loop.create_future()
# mypy ignore: because .child_count on Future is being monkey-patched here!
parent_f.child_count = 0 # type: ignore
parent_f.add_done_callback(lambda f: self._parent_task_done(f, info_hash))
self.__parent_futures[info_hash] = parent_f
parent_f = self.__parent_futures[info_hash]
if parent_f.done():
return
# mypy ignore: because .child_count on Future is monkey-patched
if parent_f.child_count > MAX_ACTIVE_PEERS_PER_INFO_HASH: # type: ignore
return
task = asyncio.ensure_future(bittorrent.fetch_metadata_from_peer(
info_hash, peer_addr, self.__max_metadata_size, timeout=PEER_TIMEOUT))
task.add_done_callback(lambda task: self._got_child_result(parent_f, task))
# mypy ignore: because .child_count on Future is monkey-patched
parent_f.child_count += 1 # type: ignore
parent_f.add_done_callback(lambda f: task.cancel())
def _got_child_result(self, parent_task, child_task):
parent_task.child_count -= 1
try:
metadata = child_task.result()
# Bora asked:
# Why do we check for parent_task being done here when a child got result? I mean, if parent_task is
# done before, and successful, all of its childs will be terminated and this function cannot be called
# anyway.
#
# --- https://github.com/boramalper/magnetico/pull/76#discussion_r119555423
#
# Suppose two child tasks are fetching the same metadata for a parent and they finish at the same time
# (or very close). The first one wakes up, sets the parent_task result which will cause the done
# callback to be scheduled. The scheduler might still then chooses the second child task to run next
# (why not? It's been waiting longer) before the parent has a chance to cancel it.
#
# Thus spoke Richard.
if metadata and not parent_task.done():
parent_task.set_result(metadata)
except asyncio.CancelledError:
pass
except Exception:
logging.exception("child result is exception")
if parent_task.child_count <= 0 and not parent_task.done():
parent_task.set_result(None)
def _parent_task_done(self, parent_task, info_hash):
del self.__parent_futures[info_hash]
try:
metadata = parent_task.result()
if not metadata:
return
except asyncio.CancelledError:
return
succeeded = self.__database.add_metadata(info_hash, metadata)
if not succeeded:
logging.info("Corrupt metadata for %s! Ignoring.", info_hash.hex())
async def __bootstrap(self) -> None:
event_loop = asyncio.get_event_loop()
for node in BOOTSTRAPPING_NODES:
try:
# AF_INET means ip4 only
responses = await event_loop.getaddrinfo(*node, family=socket.AF_INET)
for (family, type, proto, canonname, sockaddr) in responses:
data = self.__build_FIND_NODE_query(self.__true_id)
self.sendto(data, sockaddr)
except Exception:
logging.exception("An exception occurred during bootstrapping!")
def __make_neighbours(self) -> None:
for node_id, addr in self._routing_table.items():
self.sendto(self.__build_FIND_NODE_query(node_id[:15] + self.__true_id[:5]), addr)
@staticmethod
def __decode_nodes(infos: bytes) -> typing.List[typing.Tuple[NodeID, NodeAddress]]:
""" Reference Implementation:
nodes = []
for i in range(0, len(infos), 26):
info = infos[i: i + 26]
node_id = info[:20]
node_host = socket.inet_ntoa(info[20:24])
node_port = int.from_bytes(info[24:], "big")
nodes.append((node_id, (node_host, node_port)))
return nodes
"""
""" Optimized Version: """
# Because dot-access also has a cost
inet_ntoa = socket.inet_ntoa
int_from_bytes = int.from_bytes
return [
(infos[i:i+20], (inet_ntoa(infos[i+20:i+24]), int_from_bytes(infos[i+24:i+26], "big")))
for i in range(0, len(infos), 26)
]
def __calculate_token(self, addr: NodeAddress, info_hash: InfoHash) -> bytes:
# Believe it or not, faster than using built-in hash (including conversion from int -> bytes of course)
checksum = zlib.adler32(b"%s%s%d%s" % (self.__token_secret, socket.inet_aton(addr[0]), addr[1], info_hash))
return checksum.to_bytes(4, "big")
@staticmethod
def __build_FIND_NODE_query(id_: bytes) -> bytes: # pylint: disable=invalid-name
""" Reference Implementation:
bencode.dumps({
b"y": b"q",
b"q": b"find_node",
b"t": b"aa",
b"a": {
b"id": id_,
b"target": self.__random_bytes(20)
}
})
"""
""" Optimized Version: """
return b"d1:ad2:id20:%s6:target20:%se1:q9:find_node1:t2:aa1:y1:qe" % (
id_,
os.urandom(20)
)
@staticmethod
def __build_GET_PEERS_response(id_: bytes, transaction_id: bytes, token: bytes) -> bytes: # pylint: disable=invalid-name
""" Reference Implementation:
bencode.dumps({
b"y": b"r",
b"t": transaction_id,
b"r": {
b"id": info_hash[:15] + self.__true_id[:5],
b"nodes": b"",
b"token": self.__calculate_token(addr, info_hash)
}
})
"""
""" Optimized Version: """
return b"d1:rd2:id20:%s5:nodes0:5:token%d:%se1:t%d:%s1:y1:re" % (
id_, len(token), token, len(transaction_id), transaction_id
)
@staticmethod
def __build_ANNOUNCE_PEER_response(id_: bytes, transaction_id: bytes) -> bytes: # pylint: disable=invalid-name
""" Reference Implementation:
bencode.dumps({
b"y": b"r",
b"t": transaction_id,
b"r": {
b"id": node_id[:15] + self.__true_id[:5]
}
})
"""
""" Optimized Version: """
return b"d1:rd2:id20:%se1:t%d:%s1:y1:re" % (id_, len(transaction_id), transaction_id)
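The hand-written byte templates and the compact node decoding above can be checked in isolation. The following is a minimal sketch, not the module's actual code: `decode_compact_nodes` and `build_find_node_query` are hypothetical stand-alone names, and the target is passed in explicitly (instead of being drawn from `os.urandom`) so that the result is deterministic.

```python
import socket
import typing

# (node_id, (host, port)); a hypothetical alias used only in this sketch.
NodeEntry = typing.Tuple[bytes, typing.Tuple[str, int]]


def decode_compact_nodes(infos: bytes) -> typing.List[NodeEntry]:
    """Split a BEP 5 compact "nodes" string into (node_id, (host, port)) pairs."""
    if len(infos) % 26 != 0:
        raise ValueError("compact node info must be a multiple of 26 bytes")
    nodes = []
    for i in range(0, len(infos), 26):
        node_id = infos[i:i + 20]                            # 20-byte node ID
        host = socket.inet_ntoa(infos[i + 20:i + 24])        # 4-byte IPv4 address
        port = int.from_bytes(infos[i + 24:i + 26], "big")   # 2-byte big-endian port
        nodes.append((node_id, (host, port)))
    return nodes


def build_find_node_query(id_: bytes, target: bytes) -> bytes:
    """Bencode a find_node query by hand; dictionary keys appear in sorted order."""
    return b"d1:ad2:id20:%s6:target20:%se1:q9:find_node1:t2:aa1:y1:qe" % (id_, target)


# Usage: one node entry for 10.0.0.1:6881 with a dummy ID.
sample = b"A" * 20 + socket.inet_aton("10.0.0.1") + (6881).to_bytes(2, "big")
nodes = decode_compact_nodes(sample)
query = build_find_node_query(b"A" * 20, b"B" * 20)
```

Writing the bencoded message as a template avoids a generic encoder on the hot path, at the cost of having to keep the key order and length prefixes correct by hand.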

View File

@ -1,140 +0,0 @@
# magneticod - Autonomous BitTorrent DHT crawler and metadata fetcher.
# Copyright (C) 2017 Mert Bora ALPER <bora@boramalper.org>
# Dedicated to Cemile Binay, in whose hands I thrived.
#
# This program is free software: you can redistribute it and/or modify it under the terms of the GNU Affero General
# Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any
# later version.
#
# This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied
# warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more
# details.
#
# You should have received a copy of the GNU Affero General Public License along with this program. If not, see
# <http://www.gnu.org/licenses/>.
import logging
import sqlite3
import time
import typing
import os
from magneticod import bencode
from .constants import PENDING_INFO_HASHES
class Database:
def __init__(self, database) -> None:
self.__db_conn = self.__connect(database)
# We buffer metadata to flush many entries at once, for performance reasons.
# list of tuple (info_hash, name, total_size, discovered_on)
self.__pending_metadata = [] # type: typing.List[typing.Tuple[bytes, str, int, int]]
# list of tuple (info_hash, size, path); paths are stored as decoded str, not bytes
self.__pending_files = [] # type: typing.List[typing.Tuple[bytes, int, str]]
@staticmethod
def __connect(database) -> sqlite3.Connection:
os.makedirs(os.path.split(database)[0], exist_ok=True)
db_conn = sqlite3.connect(database, isolation_level=None)
db_conn.execute("PRAGMA journal_mode=WAL;")
db_conn.execute("PRAGMA temp_store=1;")
db_conn.execute("PRAGMA foreign_keys=ON;")
with db_conn:
db_conn.execute("CREATE TABLE IF NOT EXISTS torrents ("
"id INTEGER PRIMARY KEY AUTOINCREMENT,"
"info_hash BLOB NOT NULL UNIQUE,"
"name TEXT NOT NULL,"
"total_size INTEGER NOT NULL CHECK(total_size > 0),"
"discovered_on INTEGER NOT NULL CHECK(discovered_on > 0)"
");")
db_conn.execute("CREATE INDEX IF NOT EXISTS info_hash_index ON torrents (info_hash);")
db_conn.execute("CREATE TABLE IF NOT EXISTS files ("
"id INTEGER PRIMARY KEY,"
"torrent_id INTEGER REFERENCES torrents ON DELETE CASCADE ON UPDATE RESTRICT,"
"size INTEGER NOT NULL,"
"path TEXT NOT NULL"
");")
return db_conn
def add_metadata(self, info_hash: bytes, metadata: bytes) -> bool:
files = []
discovered_on = int(time.time())
try:
info = bencode.loads(metadata)
assert b"/" not in info[b"name"]
name = info[b"name"].decode("utf-8")
if b"files" in info: # Multiple File torrent:
for file in info[b"files"]:
assert type(file[b"length"]) is int
# Refuse a slash anywhere inside any of the path items
assert not any(b"/" in item for item in file[b"path"])
path = "/".join(i.decode("utf-8") for i in file[b"path"])
files.append((info_hash, file[b"length"], path))
else: # Single File torrent:
assert type(info[b"length"]) is int
files.append((info_hash, info[b"length"], name))
# TODO: Make sure this catches ALL, AND ONLY operational errors
except (bencode.BencodeDecodingError, AssertionError, KeyError, AttributeError, UnicodeDecodeError, TypeError):
return False
self.__pending_metadata.append((info_hash, name, sum(f[1] for f in files), discovered_on))
# MYPY BUG: error: Argument 1 to "__iadd__" of "list" has incompatible type List[Tuple[bytes, Any, str]];
# expected Iterable[Tuple[bytes, int, bytes]]
# List is an Iterable man...
self.__pending_files += files # type: ignore
logging.info("Added: `%s`", name)
# Automatically check if the buffer is full, and commit to the SQLite database if so.
if len(self.__pending_metadata) >= PENDING_INFO_HASHES:
self.__commit_metadata()
return True
def is_infohash_new(self, info_hash):
if info_hash in [x[0] for x in self.__pending_metadata]:
return False
cur = self.__db_conn.cursor()
try:
cur.execute("SELECT count(info_hash) FROM torrents where info_hash = ?;", [info_hash])
x, = cur.fetchone()
return x == 0
finally:
cur.close()
def __commit_metadata(self) -> None:
cur = self.__db_conn.cursor()
cur.execute("BEGIN;")
# noinspection PyBroadException
try:
cur.executemany(
"INSERT INTO torrents (info_hash, name, total_size, discovered_on) VALUES (?, ?, ?, ?);",
self.__pending_metadata
)
cur.executemany(
"INSERT INTO files (torrent_id, size, path) "
"VALUES ((SELECT id FROM torrents WHERE info_hash=?), ?, ?);",
self.__pending_files
)
cur.execute("COMMIT;")
logging.info("%d metadata (%d files) are committed to the database.",
len(self.__pending_metadata), len(self.__pending_files))
self.__pending_metadata.clear()
self.__pending_files.clear()
except:
cur.execute("ROLLBACK;")
logging.exception("Could NOT commit metadata to the database! (%d metadata are pending)",
len(self.__pending_metadata))
finally:
cur.close()
def close(self) -> None:
if self.__pending_metadata:
self.__commit_metadata()
self.__db_conn.close()
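The buffer-then-commit pattern used by `Database` can be tried against an in-memory SQLite database. This is a rough sketch under stated assumptions: `open_db` and `flush` are hypothetical helper names, the schema is a trimmed version of the one above (the CHECK constraints are omitted), and only `sqlite3.Error` is caught rather than every exception.

```python
import sqlite3
import typing

TorrentRow = typing.Tuple[bytes, str, int, int]   # (info_hash, name, total_size, discovered_on)
FileRow = typing.Tuple[bytes, int, str]           # (info_hash, size, path)


def open_db(path: str = ":memory:") -> sqlite3.Connection:
    # isolation_level=None puts the connection in autocommit mode, so the
    # explicit BEGIN/COMMIT/ROLLBACK statements below control the transaction.
    conn = sqlite3.connect(path, isolation_level=None)
    conn.execute("PRAGMA foreign_keys=ON;")
    conn.execute("CREATE TABLE IF NOT EXISTS torrents ("
                 "id INTEGER PRIMARY KEY AUTOINCREMENT,"
                 "info_hash BLOB NOT NULL UNIQUE,"
                 "name TEXT NOT NULL,"
                 "total_size INTEGER NOT NULL,"
                 "discovered_on INTEGER NOT NULL);")
    conn.execute("CREATE TABLE IF NOT EXISTS files ("
                 "id INTEGER PRIMARY KEY,"
                 "torrent_id INTEGER REFERENCES torrents ON DELETE CASCADE,"
                 "size INTEGER NOT NULL,"
                 "path TEXT NOT NULL);")
    return conn


def flush(conn: sqlite3.Connection, torrents: typing.List[TorrentRow], files: typing.List[FileRow]) -> bool:
    """Insert all buffered rows in one transaction; roll back everything on failure."""
    cur = conn.cursor()
    cur.execute("BEGIN;")
    try:
        cur.executemany(
            "INSERT INTO torrents (info_hash, name, total_size, discovered_on) VALUES (?, ?, ?, ?);",
            torrents)
        cur.executemany(
            "INSERT INTO files (torrent_id, size, path) "
            "VALUES ((SELECT id FROM torrents WHERE info_hash=?), ?, ?);",
            files)
        cur.execute("COMMIT;")
        return True
    except sqlite3.Error:
        cur.execute("ROLLBACK;")
        return False
    finally:
        cur.close()


# Usage: one torrent with two files, flushed in a single transaction.
conn = open_db()
info_hash = b"\x01" * 20
ok = flush(conn, [(info_hash, "demo", 30, 1)], [(info_hash, 10, "a"), (info_hash, 20, "b")])
```

Because the UNIQUE constraint on `info_hash` aborts the whole batch, a single duplicate in the buffer rolls back every pending row, which is presumably why the caller checks `is_infohash_new` before fetching metadata.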

View File

@ -1,296 +0,0 @@
# magneticod - Autonomous BitTorrent DHT crawler and metadata fetcher.
# Copyright (C) 2017 Mert Bora ALPER <bora@boramalper.org>
# Dedicated to Cemile Binay, in whose hands I thrived.
#
# This program is free software: you can redistribute it and/or modify it under the terms of the GNU Affero General
# Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any
# later version.
#
# This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied
# warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more
# details.
#
# You should have received a copy of the GNU Affero General Public License along with this program. If not, see
# <http://www.gnu.org/licenses/>.
import enum
import typing
import cerberus
from magneticod import types
from ..codecs import bencode
from .. import transports
class Protocol:
def __init__(self):
self._transport = transports.bittorrent.TCPTransport()
self._transport.on_the_bittorrent_handshake_completed = self._when_the_bittorrent_handshake_completed
self._transport.on_message = self._when_message
self._transport.on_keepalive = self._when_keepalive
# When we initiate extension handshake, we set the keys of this dictionary with the ExtensionType's we show
# interest in, and if the remote peer is also interested in them, we record which type code the remote peer
# prefers for them in this dictionary, but until then, the keys have the None value. If our interest for an
# ExtensionType is not mutual, we remove the key from the dictionary.
self._enabled_extensions = {} # typing.Dict[ExtensionType, typing.Optional[int]]
async def launch(self) -> None:
await self._transport.launch()
# Offered Functionality
# =====================
def initiate_the_bittorrent_handshake(self, reserved: bytes, info_hash: types.InfoHash, peer_id: types.PeerID) \
-> None:
self._transport.initiate_the_bittorrent_handshake(reserved, info_hash, peer_id)
def send_keepalive(self) -> None:
pass
def send_choke_message(self) -> None:
raise NotImplementedError()
def send_unchoke_message(self) -> None:
raise NotImplementedError()
def send_interested_message(self) -> None:
raise NotImplementedError()
def send_not_interested_message(self) -> None:
raise NotImplementedError()
def send_have_message(self) -> None:
raise NotImplementedError()
def send_bitfield_message(self) -> None:
raise NotImplementedError()
def send_request_message(self) -> None:
raise NotImplementedError()
def send_piece_message(self) -> None:
raise NotImplementedError()
def send_cancel_message(self) -> None:
raise NotImplementedError()
def send_extension_handshake(
self,
supported_extensions: typing.Set[ExtensionType],
local_port: int=-1,
name_and_version: str="magneticod 0.x.x"
) -> None:
pass
@staticmethod
def on_the_bittorrent_handshake_completed(reserved: bytes, info_hash: types.InfoHash, peer_id: types.PeerID) \
-> None:
pass
@staticmethod
def on_keepalive() -> None:
pass
@staticmethod
def on_choke_message() -> None:
pass
@staticmethod
def on_unchoke_message() -> None:
pass
@staticmethod
def on_interested_message() -> None:
pass
@staticmethod
def on_not_interested_message() -> None:
pass
@staticmethod
def on_have_message(index: int) -> None:
pass
@staticmethod
def on_bitfield_message() -> None:
raise NotImplementedError()
@staticmethod
def on_request_message(index: int, begin: int, length: int) -> None:
pass
@staticmethod
def on_piece_message(index: int, begin: int, piece: bytes) -> None:
pass
@staticmethod
def on_cancel_message(index: int, begin: int, length: int) -> None:
pass
@staticmethod
def on_extension_handshake_completed(payload: types.Dictionary) -> None:
pass
# Private Functionality
# =====================
def _when_the_bittorrent_handshake_completed(
self,
reserved: bytes,
info_hash: types.InfoHash,
peer_id: types.PeerID
) -> None:
self.on_the_bittorrent_handshake_completed(reserved, info_hash, peer_id)
def _when_keepalive(self) -> None:
self.on_keepalive()
def _when_message(self, type_: bytes, payload: typing.ByteString) -> None:
if type_ == MessageTypes.CHOKE:
self.on_choke_message()
elif type_ == MessageTypes.UNCHOKE:
self.on_unchoke_message()
elif type_ == MessageTypes.INTERESTED:
self.on_interested_message()
elif type_ == MessageTypes.NOT_INTERESTED:
self.on_not_interested_message()
elif type_ == MessageTypes.HAVE:
index = int.from_bytes(payload[:4], "big")
self.on_have_message(index)
elif type_ == MessageTypes.BITFIELD:
raise NotImplementedError()
elif type_ == MessageTypes.REQUEST:
index = int.from_bytes(payload[:4], "big")
begin = int.from_bytes(payload[4:8], "big")
length = int.from_bytes(payload[8:12], "big")
self.on_request_message(index, begin, length)
elif type_ == MessageTypes.PIECE:
index = int.from_bytes(payload[:4], "big")
begin = int.from_bytes(payload[4:8], "big")
piece = bytes(payload[8:])  # the block data itself follows index and begin; it is not an integer
self.on_piece_message(index, begin, piece)
elif type_ == MessageTypes.CANCEL:
index = int.from_bytes(payload[:4], "big")
begin = int.from_bytes(payload[4:8], "big")
length = int.from_bytes(payload[8:12], "big")
self.on_cancel_message(index, begin, length)
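elif type_ == MessageTypes.EXTENDED and payload:
# Index (rather than slice) the payload so that the extended message ID is an int; a one-byte slice would be
# a bytes object and would never compare equal to 0 or to ExtensionType values.
self._when_extended_message(type_=payload[0], payload=payload[1:])
else:
pass
def _when_extended_message(self, type_: int, payload: typing.ByteString) -> None:
if type_ == 0: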
self._when_extension_handshake(payload)
elif type_ == ExtensionType.UT_METADATA.value:
pass
else:
pass
def _when_extension_handshake(self, payload: typing.ByteString) -> None:
dictionary_schema = {
b"m": {
"type": "dict",
"keyschema": {"type": "binary", "empty": False},
"valueschema": {"type": "integer", "min": 0},
"required": True,
},
b"p": {
"type": "integer",
"min": 1,
"max": 2**16 - 1,
"required": False
},
b"v": {
"type": "binary",
"empty": False,
"required": False
},
b"yourip": {
"type": "binary",
# It's actually EITHER 4 OR 16, not anything in-between. We need to validate this ourselves!
"minlength": 4,
"maxlength": 16,
"required": False
},
b"ipv6": {
"type": "binary",
"minlength": 16,
"maxlength": 16,
"required": False
},
b"ipv4": {
"type": "binary",
"minlength": 4,
"maxlength": 4,
"required": False
},
b"reqq": {
"type": "integer",
"min": 0,
"required": False
}
}
try:
dictionary = bencode.decode(payload)
except bencode.DecodeError:
return
if not cerberus.Validator(dictionary_schema).validate(dictionary):
return
# Check which extensions, that we show interest in, are enabled by the remote peer.
if ExtensionType.UT_METADATA in self._enabled_extensions and b"ut_metadata" in dictionary[b"m"]:
self._enabled_extensions[ExtensionType.UT_METADATA] = dictionary[b"m"][b"ut_metadata"]
# As there can be multiple SUBSEQUENT extension-handshake-messages, check for the existence of b"metadata_size"
# in the top level dictionary ONLY IF b"ut_metadata" exists in b"m". `ut_metadata` might be enabled before,
# and other subsequent extension-handshake-messages do not need to include the `metadata_size` field in the top
# level dictionary whilst enabling-and/or-disabling other extension features.
if b"ut_metadata" in dictionary[b"m"] and b"metadata_size" not in dictionary:
return
self.on_extension_handshake_completed(dictionary)
@enum.unique
class MessageTypes(enum.IntEnum):
CHOKE = 0
UNCHOKE = 1
INTERESTED = 2
NOT_INTERESTED = 3
HAVE = 4
BITFIELD = 5
REQUEST = 6
PIECE = 7
CANCEL = 8
EXTENDED = 20
@enum.unique
class ReservedFeature(enum.Enum):
# What do values mean?
# The first number is the offset, and the second number is the bit to set. For instance,
# EXTENSION_PROTOCOL = (5, 0x10) means that reserved[5] & 0x10 should be true.
DHT = (7, 0x01)
EXTENSION_PROTOCOL = (5, 0x10)
def reserved_feature_set_to_reserved(reserved_feature_set: typing.Set[ReservedFeature]) -> bytes:
reserved = bytearray(8)  # mutable; bytes objects do not support item assignment
for reserved_feature in reserved_feature_set:
reserved[reserved_feature.value[0]] |= reserved_feature.value[1]
return bytes(reserved)
def reserved_to_reserved_feature_set(reserved: bytes) -> typing.Set[ReservedFeature]:
return {
reserved_feature
for reserved_feature in ReservedFeature
if reserved[reserved_feature.value[0]] & reserved_feature.value[1]
}
@enum.unique
class ExtensionType(enum.IntEnum):
UT_METADATA = 1
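The reserved field of the BitTorrent handshake is an 8-byte bitmask, and the two helper functions above convert between that field and a set of features. A self-contained sketch of the round trip follows; `Feature`, `to_reserved`, and `from_reserved` are hypothetical names chosen for this example, mirroring `ReservedFeature` and its helpers.

```python
import enum
import typing


@enum.unique
class Feature(enum.Enum):
    # (byte offset into the reserved field, bit mask within that byte)
    DHT = (7, 0x01)
    EXTENSION_PROTOCOL = (5, 0x10)


def to_reserved(features: typing.Iterable[Feature]) -> bytes:
    """Build the 8-byte reserved field with the bit of every given feature set."""
    reserved = bytearray(8)  # bytearray, because bytes is immutable
    for feature in features:
        offset, mask = feature.value
        reserved[offset] |= mask
    return bytes(reserved)


def from_reserved(reserved: bytes) -> typing.Set[Feature]:
    """Return the set of known features whose bit is set in the reserved field."""
    return {f for f in Feature if reserved[f.value[0]] & f.value[1]}


# Usage: advertise both features, then read them back.
field = to_reserved({Feature.DHT, Feature.EXTENSION_PROTOCOL})
features = from_reserved(field)
```

Unknown bits set by the remote peer are simply ignored by `from_reserved`, which keeps the parsing forward-compatible with features this sketch does not know about.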

View File

@ -1,396 +0,0 @@
# magneticod - Autonomous BitTorrent DHT crawler and metadata fetcher.
# Copyright (C) 2017 Mert Bora ALPER <bora@boramalper.org>
# Dedicated to Cemile Binay, in whose hands I thrived.
#
# This program is free software: you can redistribute it and/or modify it under the terms of the GNU Affero General
# Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any
# later version.
#
# This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied
# warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more
# details.
#
# You should have received a copy of the GNU Affero General Public License along with this program. If not, see
# <http://www.gnu.org/licenses/>.
import asyncio
import enum
import typing
import cerberus
from . import transport
class Protocol:
def __init__(self, client_version: bytes):
self._client_version = client_version
self._transport = transport.Transport()
self._transport.on_message = self.__when_message
async def launch(self, address: transport.Address):
await asyncio.get_event_loop().create_datagram_endpoint(lambda: self._transport, local_addr=address)
# Offered Functionality
# =====================
def make_query(self, query: BaseQuery, address: transport.Address) -> None:
return self._transport.send_message(query.to_message(b"\0\0", self._client_version), address)
@staticmethod
def on_ping_query(query: PingQuery, address: transport.Address) \
-> typing.Optional[typing.Union[PingResponse, Error]]:
pass
@staticmethod
def on_find_node_query(query: FindNodeQuery, address: transport.Address) \
-> typing.Optional[typing.Union[FindNodeResponse, Error]]:
pass
@staticmethod
def on_get_peers_query(query: GetPeersQuery, address: transport.Address) \
-> typing.Optional[typing.Union[GetPeersResponse, Error]]:
pass
@staticmethod
def on_announce_peer_query(query: AnnouncePeerQuery, address: transport.Address) \
-> typing.Optional[typing.Union[AnnouncePeerResponse, Error]]:
pass
@staticmethod
def on_ping_OR_announce_peer_response(response: PingResponse, address: transport.Address) -> None:
pass
@staticmethod
def on_find_node_response(response: FindNodeResponse, address: transport.Address) -> None:
pass
@staticmethod
def on_get_peers_response(response: GetPeersResponse, address: transport.Address) -> None:
pass
@staticmethod
def on_error(error: Error, address: transport.Address) -> None:
pass
# Private Functionality
# =====================
def __when_message(self, message: typing.Dict[bytes, typing.Any], address: transport.Address) -> None:
# We need to ignore unknown fields in the messages, in consideration of forward-compatibility, but that also
# requires us to be careful about the "order" we are following. For instance, every single query can also be
# misunderstood as a ping query, since they all have `id` as an argument. Hence, we start validating against the
# query/response type that is most distinguishing against all other.
if BaseQuery.validate_message(message):
args = message[b"a"]
if AnnouncePeerQuery.validate_message(message):
response = self.on_announce_peer_query(AnnouncePeerQuery(
args[b"id"], args[b"info_hash"], args[b"port"], args[b"token"], args.get(b"implied_port", 0)
), address)
elif GetPeersQuery.validate_message(message):
response = self.on_get_peers_query(GetPeersQuery(args[b"id"], args[b"info_hash"]), address)
elif FindNodeQuery.validate_message(message):
response = self.on_find_node_query(FindNodeQuery(args[b"id"], args[b"target"]), address)
elif PingQuery.validate_message(message):
response = self.on_ping_query(PingQuery(args[b"id"]), address)
else:
# Unknown Query received!
response = None
if response:
self._transport.send_message(response.to_message(message[b"t"], self._client_version), address)
elif BaseResponse.validate_message(message):
return_values = message[b"r"]
if GetPeersResponse.validate_message(message):
if b"nodes" in return_values:
self.on_get_peers_response(GetPeersResponse(
return_values[b"id"], return_values[b"token"], nodes=return_values[b"nodes"]
), address)
else:
self.on_get_peers_response(GetPeersResponse(
return_values[b"id"], return_values[b"token"], values=return_values[b"values"]
), address)
elif FindNodeResponse.validate_message(message):
self.on_find_node_response(FindNodeResponse(return_values[b"id"], return_values[b"nodes"]), address)
elif PingResponse.validate_message(message):
self.on_ping_OR_announce_peer_response(PingResponse(return_values[b"id"]), address)
else:
# Unknown Response received!
pass
elif Error.validate_message(message):
self.on_error(Error(message[b"e"][0], message[b"e"][1]), address)
else:
# Unknown message received!
pass
NodeID = typing.NewType("NodeID", bytes)
InfoHash = typing.NewType("InfoHash", bytes)
NodeInfo = typing.NamedTuple("NodeInfo", [
("id", NodeID),
("address", transport.Address),
])
class BaseQuery:
method_name = b""
_arguments_schema = {
b"id": {"type": "binary", "minlength": 20, "maxlength": 20, "required": True}
}
__validator = cerberus.Validator()
def __init__(self, id_: NodeID):
self.id = id_
def to_message(self, transaction_id: bytes, client_version: bytes) -> typing.Dict[bytes, typing.Any]:
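return {
b"t": transaction_id,
b"y": b"q",
b"v": client_version,
b"q": self.method_name,
# Attribute names are str; KRPC argument keys must be bytes.
b"a": {key.encode("ascii"): value for key, value in self.__dict__.items()}
}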
@classmethod
def validate_message(cls, message: typing.Dict[bytes, typing.Any]) -> bool:
if cls.__validator.schema is None:
cls.__validator.schema = cls.__get_message_schema()
return cls.__validator.validate(message)
@classmethod
def __get_message_schema(cls):
return {
b"t": {"type": "binary", "empty": False, "required": True},
b"y": {"type": "binary", "empty": False, "required": True},
b"v": {"type": "binary", "empty": False, "required": False},
b"q": {"type": "binary", "empty": False, "required": True},
# The arguments are a nested dictionary, so their schema goes under a "dict" rule.
b"a": {"type": "dict", "schema": cls._arguments_schema, "required": True}
}
class PingQuery(BaseQuery):
method_name = b"ping"
def __init__(self, id_: NodeID):
super().__init__(id_)
class FindNodeQuery(BaseQuery):
method_name = b"find_node"
_arguments_schema = {
**BaseQuery._arguments_schema,  # zero-argument super() is not available in a class body
b"target": {"type": "binary", "minlength": 20, "maxlength": 20, "required": True}
}
def __init__(self, id_: NodeID, target: NodeID):
super().__init__(id_)
self.target = target
class GetPeersQuery(BaseQuery):
method_name = b"get_peers"
_arguments_schema = {
**BaseQuery._arguments_schema,  # zero-argument super() is not available in a class body
b"info_hash": {"type": "binary", "minlength": 20, "maxlength": 20, "required": True}
}
def __init__(self, id_: NodeID, info_hash: InfoHash):
super().__init__(id_)
self.info_hash = info_hash
class AnnouncePeerQuery(BaseQuery):
method_name = b"announce_peer"
_arguments_schema = {
**BaseQuery._arguments_schema,  # zero-argument super() is not available in a class body
b"info_hash": {"type": "binary", "minlength": 20, "maxlength": 20, "required": True},
b"port": {"type": "integer", "min": 1, "max": 2**16 - 1, "required": True},
b"token": {"type": "binary", "empty": False, "required": True},
b"implied_port": {"type": "integer", "required": False}
}
def __init__(self, id_: NodeID, info_hash: InfoHash, port: int, token: bytes, implied_port: int=0):
super().__init__(id_)
self.info_hash = info_hash
self.port = port
self.token = token
self.implied_port = implied_port
class BaseResponse:
_return_values_schema = {
b"id": {"type": "binary", "minlength": 20, "maxlength": 20, "required": True}
}
__validator = cerberus.Validator()
def __init__(self, id_: NodeID):
self.id = id_
def to_message(self, transaction_id: bytes, client_version: bytes) -> typing.Dict[bytes, typing.Any]:
return {
b"t": transaction_id,
b"y": b"r",
b"v": client_version,
b"r": self._return_values()
}
@classmethod
def validate_message(cls, message: typing.Dict[bytes, typing.Any]) -> bool:
if cls.__validator.schema is None:
cls.__validator.schema = cls.__get_message_schema()
return cls.__validator.validate(message)
def _return_values(self) -> typing.Dict[bytes, typing.Any]:
return {b"id": self.id}
@classmethod
def __get_message_schema(cls):
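return {
b"t": {"type": "binary", "empty": False, "required": True},
b"y": {"type": "binary", "empty": False, "required": True},
b"v": {"type": "binary", "empty": False, "required": False},
# The return values are a nested dictionary, so their schema goes under a "dict" rule.
b"r": {"type": "dict", "schema": cls._return_values_schema, "required": True}
}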
class PingResponse(BaseResponse):
def __init__(self, id_: NodeID):
super().__init__(id_)
class FindNodeResponse(BaseResponse):
_return_values_schema = {
**super()._return_values_schema,
b"nodes": {"type": "binary", "required": True}
}
__validator = cerberus.Validator()
def __init__(self, id_: NodeID, nodes: typing.List[NodeInfo]):
super().__init__(id_)
self.nodes = nodes
@classmethod
def validate_message(cls, message: typing.Dict[bytes, typing.Any]) -> bool:
if cls.__validator.schema is None:
cls.__validator.schema = cls.__get_message_schema()
if not cls.__validator.validate(message):
return False
# Unfortunately, Cerberus cannot check some fine details.
# For instance, the length of the `nodes` field in the return values of the response message has to be a
# multiple of 26, as "contact information for nodes is encoded as a 26-byte string" (BEP 5).
if len(message[b"r"][b"nodes"]) % 26 != 0:
return False
return True
def _return_values(self) -> typing.Dict[bytes, typing.Any]:
return {
**super()._return_values(),
b"nodes": self.nodes # TODO: this is not right obviously, encode & decode!
}
class GetPeersResponse(BaseResponse):
_return_values_schema = {
**super()._return_values_schema,
b"token": {"type": "binary", "empty": False, "required": True},
b"values": {
"type": "list",
"schema": {"type": "binary", "minlength": 6, "maxlength": 6},
"excludes": b"nodes",
"empty": False,
"required": True
},
b"nodes": {"type": "binary", "excludes": b"values", "empty": True, "required": True}
}
__validator = cerberus.Validator()
def __init__(self, id_: NodeID, token: bytes, *, values: typing.Optional[typing.List[transport.Address]]=None,
nodes: typing.Optional[typing.List[NodeInfo]]=None
):
if values and nodes:
raise ValueError("Supply either `values` or `nodes` or neither but not both.")
super().__init__(id_)
self.token = token
self.values = values
self.nodes = nodes
@classmethod
def validate_message(cls, message: typing.Dict[bytes, typing.Any]) -> bool:
if cls.__validator.schema is None:
cls.__validator.schema = cls.__get_message_schema()
if not cls.__validator.validate(message):
return False
# Unfortunately, Cerberus cannot check some fine details.
# For instance, the length of the `nodes` field in the return values of the response message has to be a
# multiple of 26, as "contact information for nodes is encoded as a 26-byte string" (BEP 5).
if b"nodes" in message[b"r"] and len(message[b"r"][b"nodes"]) % 26 != 0:
return False
return True
class AnnouncePeerResponse(BaseResponse):
def __init__(self, id_: NodeID):
super().__init__(id_)
@enum.unique
class ErrorCodes(enum.IntEnum):
GENERIC = 201
SERVER = 202
PROTOCOL = 203
METHOD_UNKNOWN = 204
class Error:
__validator = cerberus.Validator()
def __init__(self, code: ErrorCodes, error_message: bytes):
self.code = code
self.error_message = error_message
def to_message(self, transaction_id: bytes, client_version: bytes) -> typing.Dict[bytes, typing.Any]:
return {
b"t": transaction_id,
b"y": b"e",
b"v": client_version,
b"e": [self.code, self.error_message]
}
@classmethod
def validate_message(cls, message: typing.Dict[bytes, typing.Any]) -> bool:
if cls.__validator.schema is None:
cls.__validator.schema = cls.__get_message_schema()
if not cls.__validator.validate(message):
return False
# Unfortunately, Cerberus cannot check some fine details.
# For instance, the `e` field of the error message should be an array with first element being an integer, and
# the second element being a (binary) string.
if not (isinstance(message[b"e"][0], int) and isinstance(message[b"e"][1], bytes)):
return False
return True
@classmethod
def __get_message_schema(cls):
return {
b"t": {"type": "binary", "empty": False, "required": True},
b"y": {"type": "binary", "empty": False, "required": True},
b"v": {"type": "binary", "empty": False, "required": False},
b"e": {"type": "list", "minlength": 2, "maxlength": 2, "required": True}
}
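The validators above only check that the length of `nodes` is a multiple of 26; the encoding and decoding step is still marked as a TODO. A minimal sketch of the decoding side, assuming the BEP 5 layout of a 20-byte node ID followed by a 4-byte IPv4 address and a 2-byte big-endian port. The name `decode_compact_nodes` is hypothetical and not part of magneticod:

```python
import socket
import struct
from typing import List, Tuple

# Assumed BEP 5 layout: 20-byte node ID + 4-byte IPv4 address + 2-byte port.
COMPACT_NODE_INFO_LENGTH = 26


def decode_compact_nodes(nodes: bytes) -> List[Tuple[bytes, str, int]]:
    """Split a compact `nodes` string into (node_id, ip, port) tuples.

    Hypothetical helper for illustration only.
    """
    if len(nodes) % COMPACT_NODE_INFO_LENGTH != 0:
        raise ValueError("length of `nodes` must be a multiple of 26")

    result = []
    for offset in range(0, len(nodes), COMPACT_NODE_INFO_LENGTH):
        # "!" selects network byte order without padding, so the record is exactly 26 bytes.
        node_id, packed_ip, port = struct.unpack_from("!20s4sH", nodes, offset)
        result.append((node_id, socket.inet_ntoa(packed_ip), port))
    return result
```

Raising on a bad length rather than returning a partial list mirrors the validators' choice to reject the whole message.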


@ -1,63 +0,0 @@
# magneticod - Autonomous BitTorrent DHT crawler and metadata fetcher.
# Copyright (C) 2017 Mert Bora ALPER <bora@boramalper.org>
# Dedicated to Cemile Binay, in whose hands I thrived.
#
# This program is free software: you can redistribute it and/or modify it under the terms of the GNU Affero General
# Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any
# later version.
#
# This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied
# warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more
# details.
#
# You should have received a copy of the GNU Affero General Public License along with this program. If not, see
# <http://www.gnu.org/licenses/>.
import logging
from ..protocols import bittorrent as bt_protocol
from magneticod import types
class MetadataService:
def __init__(self, peer_id: types.PeerID, info_hash: types.InfoHash):
self._protocol = bt_protocol.Protocol()
self._protocol.on_the_bittorrent_handshake_completed = self._when_the_bittorrent_handshake_completed
self._protocol.on_extension_handshake_completed = self._when_extension_handshake_completed
self._peer_id = peer_id
self._info_hash = info_hash
async def launch(self) -> None:
await self._protocol.launch()
self._protocol.initiate_the_bittorrent_handshake(
bt_protocol.reserved_feature_set_to_reserved({
bt_protocol.ReservedFeature.EXTENSION_PROTOCOL,
bt_protocol.ReservedFeature.DHT
}),
self._info_hash,
self._peer_id
)
# Offered Functionality
# =====================
@staticmethod
def on_fatal_failure() -> None:
pass
# Private Functionality
# =====================
def _when_the_bittorrent_handshake_completed(
self,
reserved: bytes,
info_hash: types.InfoHash,
peer_id: types.PeerID
) -> None:
if bt_protocol.ReservedFeature.EXTENSION_PROTOCOL not in bt_protocol.reserved_to_reserved_feature_set(reserved):
logging.info("Peer does NOT support the extension protocol.")
self.on_fatal_failure()
self._protocol.send_extension_handshake({bt_protocol.ExtensionType.UT_METADATA})
def _when_extension_handshake_completed(self, payload: types.Dictionary) -> None:
pass


@ -1,92 +0,0 @@
# magneticod - Autonomous BitTorrent DHT crawler and metadata fetcher.
# Copyright (C) 2017 Mert Bora ALPER <bora@boramalper.org>
# Dedicated to Cemile Binay, in whose hands I thrived.
#
# This program is free software: you can redistribute it and/or modify it under the terms of the GNU Affero General
# Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any
# later version.
#
# This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied
# warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more
# details.
#
# You should have received a copy of the GNU Affero General Public License along with this program. If not, see
# <http://www.gnu.org/licenses/>.
import asyncio
import hashlib
import os
import socket
import typing
from magneticod import constants
from . import protocol
class PeerService:
def __init__(self):
self._protocol = protocol.Protocol(b"mc00")
self._protocol.on_get_peers_query = self._when_get_peers_query
self._protocol.on_announce_peer_query = self._when_announce_peer_query
self._protocol.on_find_node_response = self._when_find_node_response
self._true_node_id = os.urandom(20)
self._token_secret = os.urandom(4)
self._routing_table = {} # typing.Dict[protocol.NodeID, protocol.transport.Address]
self._tick_task = None
async def launch(self, address: protocol.transport.Address):
await self._protocol.launch(address)
self._tick_task = asyncio.ensure_future(self._tick_periodically())
# Offered Functionality
# =====================
@staticmethod
def on_peer(info_hash: protocol.InfoHash, address: protocol.transport.Address) -> None:
pass
# Private Functionality
# =====================
async def _tick_periodically(self) -> None:
while True:
if not self._routing_table:
await self._bootstrap()
else:
self._make_neighbors()
self._routing_table.clear()
await asyncio.sleep(constants.TICK_INTERVAL)
async def _bootstrap(self) -> None:
event_loop = asyncio.get_event_loop()
for node in constants.BOOTSTRAPPING_NODES:
for *_, address in await event_loop.getaddrinfo(*node, family=socket.AF_INET):
self._protocol.make_query(protocol.FindNodeQuery(self._true_node_id, os.urandom(20)), address)
def _make_neighbors(self) -> None:
for id_, address in self._routing_table.items():
self._protocol.make_query(
protocol.FindNodeQuery(id_[:15] + self._true_node_id[:5], os.urandom(20)),
address
)
def _when_get_peers_query(self, query: protocol.GetPeersQuery, address: protocol.transport.Address) \
-> typing.Optional[typing.Union[protocol.GetPeersResponse, protocol.Error]]:
return protocol.GetPeersResponse(query.info_hash[:15] + self._true_node_id[:5], self._calculate_token(address))
def _when_announce_peer_query(self, query: protocol.AnnouncePeerQuery, address: protocol.transport.Address) \
-> typing.Optional[typing.Union[protocol.AnnouncePeerResponse, protocol.Error]]:
if query.implied_port:
peer_address = (address[0], address[1])
else:
peer_address = (address[0], query.port)
self.on_peer(query.info_hash, peer_address)
return protocol.AnnouncePeerResponse(query.info_hash[:15] + self._true_node_id[:5])
def _when_find_node_response(self, response: protocol.FindNodeResponse, address: protocol.transport.Address) \
-> None:
self._routing_table.update({node.id: node.address for node in response.nodes if node.address[1] != 0})
def _calculate_token(self, address: protocol.transport.Address) -> bytes:
return hashlib.sha1(b"%s%d" % (socket.inet_aton(address[0]), socket.htons(address[1]))).digest()
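The service above answers queries with an ID built as `target[:15] + self._true_node_id[:5]`, so that it appears close to whatever ID the other node is interested in. A minimal sketch of that trick, assuming 20-byte IDs and the usual XOR distance metric. Both function names are hypothetical:

```python
def forge_neighbor_id(target: bytes, true_node_id: bytes) -> bytes:
    """Build an ID sharing its first 15 bytes with `target` (hypothetical helper)."""
    if len(target) != 20 or len(true_node_id) != 20:
        raise ValueError("both IDs must be exactly 20 bytes long")
    # Same slicing as in the service above: 15 bytes of the target, then 5 of our own ID.
    return target[:15] + true_node_id[:5]


def shared_prefix_bits(a: bytes, b: bytes) -> int:
    """Number of leading bits two equal-length IDs have in common."""
    distance = int.from_bytes(a, "big") ^ int.from_bytes(b, "big")
    return len(a) * 8 - distance.bit_length()
```

Since the first 15 bytes are copied, the forged ID always shares at least 120 leading bits with the target.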


@ -1,102 +0,0 @@
# magneticod - Autonomous BitTorrent DHT crawler and metadata fetcher.
# Copyright (C) 2017 Mert Bora ALPER <bora@boramalper.org>
# Dedicated to Cemile Binay, in whose hands I thrived.
#
# This program is free software: you can redistribute it and/or modify it under the terms of the GNU Affero General
# Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any
# later version.
#
# This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied
# warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more
# details.
#
# You should have received a copy of the GNU Affero General Public License along with this program. If not, see
# <http://www.gnu.org/licenses/>.
import asyncio
import typing
from magneticod import types
class TCPTransport(asyncio.Protocol):
def __init__(self):
self._stream_transport = asyncio.Transport()
self._incoming = bytearray()
self._awaiting_the_bittorrent_handshake = True
async def launch(self):
await asyncio.get_event_loop().create_connection(lambda: self, "0.0.0.0", 0)
# Offered Functionality
# =====================
def initiate_the_bittorrent_handshake(self, reserved: bytes, info_hash: types.InfoHash, peer_id: types.PeerID) -> None:
self._stream_transport.write(b"\x13BitTorrent protocol%s%s%s" % (
reserved,
info_hash,
peer_id
))
def send_keepalive(self) -> None:
self._stream_transport.write(b"\x00\x00\x00\x00")
def send_message(self, type_: bytes, payload: typing.ByteString) -> None:
if len(type_) != 1:
raise ValueError("Argument `type_` must be a single byte!")
length = 1 + len(payload)
self._stream_transport.write(length.to_bytes(4, "big") + type_ + payload)
@staticmethod
def on_keepalive() -> None:
pass
@staticmethod
def on_message(type_: bytes, payload: typing.ByteString) -> None:
pass
@staticmethod
def on_the_bittorrent_handshake_completed(
reserved: typing.ByteString,
info_hash: types.InfoHash,
peer_id: types.PeerID
) -> None:
pass
# Private Functionality
# =====================
def connection_made(self, transport: asyncio.Transport) -> None:
self._stream_transport = transport
def data_received(self, data: typing.ByteString) -> None:
self._incoming += data
if self._awaiting_the_bittorrent_handshake:
if len(self._incoming) >= 68:
assert self._incoming.startswith(b"\x13BitTorrent protocol")
self.on_the_bittorrent_handshake_completed(
reserved=self._incoming[20:28],
info_hash=self._incoming[28:48],
peer_id=self._incoming[48:68]
)
self._incoming = self._incoming[68:]
self._awaiting_the_bittorrent_handshake = False
else:
return
# Continue or Start the "usual" processing from here below
if len(self._incoming) >= 4 and len(self._incoming) - 4 >= int.from_bytes(self._incoming[:4], "big"):
if int.from_bytes(self._incoming[:4], "big") == 0:
self.on_keepalive()
else:
self.on_message(self._incoming[4], self._incoming[5:])
def eof_received(self):
pass
def connection_lost(self, exc: Exception) -> None:
pass
class UTPTransport:
pass
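`data_received` above has to deal with two framings on the same stream: the fixed 68-byte handshake, then messages prefixed with a 4-byte big-endian length, where a length of zero is a keepalive. A minimal sketch of both steps, written as plain functions over a buffer so they can be tried without a socket. The names `parse_handshake` and `extract_messages` are hypothetical:

```python
from typing import List, Optional, Tuple

HANDSHAKE_PREFIX = b"\x13BitTorrent protocol"
HANDSHAKE_LENGTH = 68  # 1 + 19 + 8 (reserved) + 20 (info hash) + 20 (peer ID)


def parse_handshake(buffer: bytes) -> Optional[Tuple[bytes, bytes, bytes]]:
    """Return (reserved, info_hash, peer_id), or None if more data is needed."""
    if len(buffer) < HANDSHAKE_LENGTH:
        return None
    if not buffer.startswith(HANDSHAKE_PREFIX):
        raise ValueError("not a BitTorrent handshake")
    return bytes(buffer[20:28]), bytes(buffer[28:48]), bytes(buffer[48:68])


def extract_messages(buffer: bytearray) -> List[Optional[Tuple[int, bytes]]]:
    """Remove every complete message from `buffer` and return them in order.

    A keepalive is reported as None, any other message as (type, payload).
    Incomplete trailing data is left in `buffer` for the next call.
    """
    messages = []  # type: List[Optional[Tuple[int, bytes]]]
    while len(buffer) >= 4:
        length = int.from_bytes(buffer[:4], "big")
        if len(buffer) - 4 < length:
            break  # the rest of this message has not arrived yet
        if length == 0:
            messages.append(None)
        else:
            messages.append((buffer[4], bytes(buffer[5:4 + length])))
        del buffer[:4 + length]
    return messages
```

Consuming the bytes of each parsed message is what lets the same buffer be reused across successive `data_received` calls.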


@ -1,114 +0,0 @@
# magneticod - Autonomous BitTorrent DHT crawler and metadata fetcher.
# Copyright (C) 2017 Mert Bora ALPER <bora@boramalper.org>
# Dedicated to Cemile Binay, in whose hands I thrived.
#
# This program is free software: you can redistribute it and/or modify it under the terms of the GNU Affero General
# Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any
# later version.
#
# This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied
# warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more
# details.
#
# You should have received a copy of the GNU Affero General Public License along with this program. If not, see
# <http://www.gnu.org/licenses/>.
import asyncio
import collections
import logging
import sys
import time
import typing
from . import codec
Address = typing.Tuple[str, int]
MessageQueueEntry = typing.NamedTuple("MessageQueueEntry", [
("queued_on", int),
("message", codec.Message),
("address", Address)
])
class Transport(asyncio.DatagramProtocol):
"""
Mainline DHT Transport
The signature `class Transport(asyncio.DatagramProtocol)` seems almost an oxymoron, but it is more sensible than
it first seems. `Transport` handles ALL that is related to transporting messages, which includes receiving them
(`asyncio.DatagramProtocol.datagram_received`), sending them (`asyncio.DatagramTransport.sendto`), pausing and
resuming writing as requested by asyncio, and also handling operational errors.
"""
def __init__(self):
super().__init__()
self._datagram_transport = asyncio.DatagramTransport()
self._write_allowed = asyncio.Event()
self._queue_nonempty = asyncio.Event()
self._message_queue = collections.deque() # type: typing.Deque[MessageQueueEntry]
self._messenger_task = asyncio.Task(self._send_messages())
# Offered Functionality
# =====================
def send_message(self, message: codec.Message, address: Address) -> None:
self._message_queue.append(MessageQueueEntry(int(time.monotonic()), message, address))
if not self._queue_nonempty.is_set():
self._queue_nonempty.set()
@staticmethod
def on_message(message: codec.Message, address: Address):
pass
# Private Functionality
# =====================
def connection_made(self, transport: asyncio.DatagramTransport) -> None:
self._datagram_transport = transport
self._write_allowed.set()
def datagram_received(self, data: bytes, address: Address) -> None:
# Ignore nodes that "use" port 0, as we cannot communicate with them reliably across different systems.
# See https://tools.cisco.com/security/center/viewAlert.x?alertId=19935 for slightly more details
if address[1] == 0:
return
try:
message = codec.decode(data)
except codec.DecodeError:
return
if not isinstance(message, dict):
return
self.on_message(message, address)
def error_received(self, exc: OSError):
logging.debug("Mainline DHT received error!", exc_info=exc)
def pause_writing(self):
self._write_allowed.clear()
def resume_writing(self):
self._write_allowed.set()
def connection_lost(self, exc: Exception):
if exc:
logging.fatal("Mainline DHT lost connection! (See the following log entry for the exception.)",
exc_info=exc
)
else:
logging.fatal("Mainline DHT lost connection!")
sys.exit(1)
async def _send_messages(self) -> None:
while True:
await asyncio.wait([self._write_allowed.wait(), self._queue_nonempty.wait()])
try:
queued_on, message, address = self._message_queue.popleft()
except IndexError:
self._queue_nonempty.clear()
continue
if time.monotonic() - queued_on > 60:
continue
self._datagram_transport.sendto(codec.encode(message), address)
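The sender loop above drops messages that have waited in the queue for more than 60 seconds. A minimal sketch of that policy in isolation, assuming first-in-first-out order and entries of the form `(queued_on, message)`. The function name `take_fresh` and the constant are hypothetical:

```python
import collections
from typing import Any, Deque, List, Tuple

MAX_AGE_SECONDS = 60  # assumed to match the threshold used in the sender loop


def take_fresh(queue: Deque[Tuple[float, Any]], now: float,
               max_age: float = MAX_AGE_SECONDS) -> List[Any]:
    """Drain `queue` in FIFO order, skipping entries older than `max_age`."""
    fresh = []
    while queue:
        queued_on, message = queue.popleft()
        if now - queued_on > max_age:
            continue  # stale: skip this entry but keep draining the queue
        fresh.append(message)
    return fresh
```

Skipping a stale entry instead of leaving the loop keeps one old message from stopping all later sends.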


@ -1,8 +0,0 @@
import typing
InfoHash = typing.ByteString
NodeID = typing.ByteString
PeerID = typing.ByteString
IPAddress = typing.Tuple[str, int]
Dictionary = typing.Dict[bytes, typing.Any]


@ -1,51 +0,0 @@
from setuptools import find_packages, setup, Extension
import sys
def read_file(path):
with open(path) as file:
return file.read()
def run_setup():
install_requirements = [
"appdirs >= 1.4.3",
"humanfriendly",
"better_bencode >= 0.2.1",
"cerberus >= 1.1"
]
if sys.platform in ["linux", "darwin"]:
install_requirements.append("uvloop >= 0.8.0")
setup(
name="magneticod",
version="0.6.0",
description="Autonomous BitTorrent DHT crawler and metadata fetcher.",
long_description=read_file("README.rst"),
url="http://magnetico.org",
author="Mert Bora ALPER",
author_email="bora@boramalper.org",
license="GNU Affero General Public License v3 or later (AGPLv3+)",
packages=find_packages(),
zip_safe=False,
entry_points={
"console_scripts": ["magneticod=magneticod.__main__:main"]
},
install_requires=install_requirements,
classifiers=[
"Development Status :: 2 - Pre-Alpha",
"Environment :: No Input/Output (Daemon)",
"Intended Audience :: End Users/Desktop",
"License :: OSI Approved :: GNU Affero General Public License v3 or later (AGPLv3+)",
"Natural Language :: English",
"Operating System :: POSIX :: Linux",
"Programming Language :: Python :: 3 :: Only",
"Programming Language :: Python :: Implementation :: CPython",
]
)
run_setup()


@ -1,10 +0,0 @@
[Unit]
Description=magneticod: autonomous BitTorrent DHT crawler and metadata fetcher
[Service]
ExecStart=~/.local/bin/magneticod --node-addr 0.0.0.0:PORT_NUMBER
Restart=always
RestartSec=5
[Install]
WantedBy=multi-user.target

4
pkg/.gitignore vendored Normal file

@ -0,0 +1,4 @@
# Ignore everything in this directory
*
# Except this file
!.gitignore

46
src/magneticod/Gopkg.toml Normal file

@ -0,0 +1,46 @@
# Gopkg.toml example
#
# Refer to https://github.com/golang/dep/blob/master/docs/Gopkg.toml.md
# for detailed Gopkg.toml documentation.
#
# required = ["github.com/user/thing/cmd/thing"]
# ignored = ["github.com/user/project/pkgX", "bitbucket.org/user/project/pkgA/pkgY"]
#
# [[constraint]]
# name = "github.com/user/project"
# version = "1.0.0"
#
# [[constraint]]
# name = "github.com/user/project2"
# branch = "dev"
# source = "github.com/myfork/project2"
#
# [[override]]
# name = "github.com/x/y"
# version = "2.4.0"
[[constraint]]
branch = "master"
name = "github.com/anacrolix/missinggo"
[[constraint]]
branch = "master"
name = "github.com/anacrolix/torrent"
[[constraint]]
branch = "master"
name = "github.com/bradfitz/iter"
[[constraint]]
name = "github.com/jessevdk/go-flags"
version = "1.3.0"
[[constraint]]
name = "github.com/mattn/go-sqlite3"
version = "1.2.0"
[[constraint]]
name = "go.uber.org/zap"
version = "1.5.0"


@ -0,0 +1,67 @@
package bittorrent
import (
"time"
"strings"
"github.com/anacrolix/torrent"
"github.com/anacrolix/torrent/metainfo"
"go.uber.org/zap"
)
func (ms *MetadataSink) awaitMetadata(infoHash metainfo.Hash, peer torrent.Peer) {
zap.L().Sugar().Debugf("awaiting %x...", infoHash[:])
t, isNew := ms.client.AddTorrentInfoHash(infoHash)
t.AddPeers([]torrent.Peer{peer})
if !isNew {
// If the recently added torrent is not new, then quit as we do not want multiple
// awaitMetadata goroutines waiting on the same torrent.
return
}
defer t.Drop()
// Wait for the torrent client to receive the metadata for the torrent, meanwhile allowing
// termination to be handled gracefully.
select {
case <- ms.termination:
return
case <- t.GotInfo():
}
zap.L().Sugar().Warnf("==== GOT INFO for %x", infoHash[:])
info := t.Info()
var files []metainfo.FileInfo
if len(info.Files) == 0 {
if strings.ContainsRune(info.Name, '/') {
// A single file torrent cannot have any '/' characters in its name. We treat it as
// illegal.
return
}
files = []metainfo.FileInfo{{Length: info.Length, Path:[]string{info.Name}}}
} else {
// TODO: We have to make sure that anacrolix/torrent checks for '/' character in file paths
// before concatenating them. This is currently assumed here. We should write a test for it.
files = info.Files
}
var totalSize uint64
for _, file := range files {
if file.Length < 0 {
// All files' sizes must be greater than or equal to zero, otherwise treat them as
// illegal and ignore.
return
}
totalSize += uint64(file.Length)
}
ms.flush(Metadata{
InfoHash: infoHash[:],
Name: info.Name,
TotalSize: totalSize,
DiscoveredOn: time.Now().Unix(),
Files: files,
})
}


@ -0,0 +1,86 @@
package bittorrent
import (
"go.uber.org/zap"
"github.com/anacrolix/torrent"
"github.com/anacrolix/torrent/metainfo"
"magneticod/dht/mainline"
"net"
)
type Metadata struct {
InfoHash []byte
// Name should be thought of as the "title" of the torrent. For single-file torrents, it is the
// name of the file, and for multi-file torrents, it is the name of the root directory.
Name string
TotalSize uint64
DiscoveredOn int64
// Files must be populated for both single-file and multi-file torrents!
Files []metainfo.FileInfo
}
type MetadataSink struct {
activeInfoHashes []metainfo.Hash
client *torrent.Client
drain chan Metadata
terminated bool
termination chan interface{}
}
func NewMetadataSink(laddr net.TCPAddr) *MetadataSink {
ms := new(MetadataSink)
var err error
ms.client, err = torrent.NewClient(&torrent.Config{
ListenAddr: laddr.String(),
DisableTrackers: true,
DisablePEX: true,
// TODO: Should we disable DHT to force the client to use the peers we supplied only, or not?
NoDHT: false,
PreferNoEncryption: true,
})
if err != nil {
zap.L().Fatal("Fetcher could NOT create a new torrent client!", zap.Error(err))
}
ms.drain = make(chan Metadata)
ms.termination = make(chan interface{})
return ms
}
func (ms *MetadataSink) Sink(res mainline.TrawlingResult) {
if ms.terminated {
zap.L().Panic("Trying to Sink() an already closed MetadataSink!")
}
ms.activeInfoHashes = append(ms.activeInfoHashes, res.InfoHash)
go ms.awaitMetadata(res.InfoHash, res.Peer)
}
func (ms *MetadataSink) Drain() <-chan Metadata {
if ms.terminated {
zap.L().Panic("Trying to Drain() an already closed MetadataSink!")
}
return ms.drain
}
func (ms *MetadataSink) Terminate() {
ms.terminated = true
close(ms.termination)
ms.client.Close()
close(ms.drain)
}
func (ms *MetadataSink) flush(metadata Metadata) {
if !ms.terminated {
ms.drain <- metadata
}
}


@ -0,0 +1,260 @@
// TODO: This file, as a whole, needs a little skim-through to clear things up, sprinkle a little
// documentation here and there, and also to make the test coverage 100%.
// It, most importantly, lacks IPv6 support, if it's not altogether messy and unreliable
// (hint: it is).
package mainline
import (
"fmt"
"encoding/binary"
"net"
"github.com/anacrolix/torrent/bencode"
"github.com/anacrolix/missinggo/iter"
"regexp"
)
type Message struct {
// Query method (one of 4: "ping", "find_node", "get_peers", "announce_peer")
Q string `bencode:"q,omitempty"`
// named QueryArguments sent with a query
A QueryArguments `bencode:"a,omitempty"`
// required: transaction ID
T []byte `bencode:"t"`
// required: type of the message: q for QUERY, r for RESPONSE, e for ERROR
Y string `bencode:"y"`
// RESPONSE type only
R ResponseValues `bencode:"r,omitempty"`
// ERROR type only
E Error `bencode:"e,omitempty"`
}
type QueryArguments struct {
// ID of the querying node
ID []byte `bencode:"id"`
// InfoHash of the torrent
InfoHash []byte `bencode:"info_hash,omitempty"`
// ID of the node sought
Target []byte `bencode:"target,omitempty"`
// Token received from an earlier get_peers query
Token []byte `bencode:"token,omitempty"`
// Sender's torrent port
Port int `bencode:"port,omitempty"`
// Use sender's apparent DHT port
ImpliedPort int `bencode:"implied_port,omitempty"`
}
type ResponseValues struct {
// ID of the responding (queried) node
ID []byte `bencode:"id"`
// K closest nodes to the requested target
Nodes CompactNodeInfos `bencode:"nodes,omitempty"`
// Token for future announce_peer
Token []byte `bencode:"token,omitempty"`
// Torrent peers
Values []CompactPeer `bencode:"values,omitempty"`
}
type Error struct {
Code int
Message []byte
}
// Represents peer address in either IPv6 or IPv4 form.
type CompactPeer struct {
IP net.IP
Port int
}
type CompactPeers []CompactPeer
type CompactNodeInfo struct {
ID []byte
Addr net.UDPAddr
}
type CompactNodeInfos []CompactNodeInfo
// This allows bencode.Unmarshal to do better than a string or []byte.
func (cps *CompactPeers) UnmarshalBencode(b []byte) (err error) {
var bb []byte
err = bencode.Unmarshal(b, &bb)
if err != nil {
return
}
*cps, err = UnmarshalCompactPeers(bb)
return
}
func (cps CompactPeers) MarshalBinary() (ret []byte, err error) {
ret = make([]byte, len(cps)*6)
for i, cp := range cps {
copy(ret[6*i:], cp.IP.To4())
binary.BigEndian.PutUint16(ret[6*i+4:], uint16(cp.Port))
}
return
}
func (cp CompactPeer) MarshalBencode() (ret []byte, err error) {
ip := cp.IP
if ip4 := ip.To4(); ip4 != nil {
ip = ip4
}
ret = make([]byte, len(ip)+2)
copy(ret, ip)
binary.BigEndian.PutUint16(ret[len(ip):], uint16(cp.Port))
return bencode.Marshal(ret)
}
func (cp *CompactPeer) UnmarshalBinary(b []byte) error {
switch len(b) {
case 18:
cp.IP = make([]byte, 16)
case 6:
cp.IP = make([]byte, 4)
default:
return fmt.Errorf("bad compact peer string: %q", b)
}
copy(cp.IP, b)
b = b[len(cp.IP):]
cp.Port = int(binary.BigEndian.Uint16(b))
return nil
}
func (cp *CompactPeer) UnmarshalBencode(b []byte) (err error) {
var _b []byte
err = bencode.Unmarshal(b, &_b)
if err != nil {
return
}
return cp.UnmarshalBinary(_b)
}
func UnmarshalCompactPeers(b []byte) (ret []CompactPeer, err error) {
num := len(b) / 6
ret = make([]CompactPeer, num)
for i := range iter.N(num) {
off := i * 6
err = ret[i].UnmarshalBinary(b[off : off+6])
if err != nil {
return
}
}
return
}
// This allows bencode.Unmarshal to do better than a string or []byte.
func (cnis *CompactNodeInfos) UnmarshalBencode(b []byte) (err error) {
var bb []byte
err = bencode.Unmarshal(b, &bb)
if err != nil {
return
}
*cnis, err = UnmarshalCompactNodeInfos(bb)
return
}
func UnmarshalCompactNodeInfos(b []byte) (ret []CompactNodeInfo, err error) {
if len(b) % 26 != 0 {
err = fmt.Errorf("length of compact node infos is not a multiple of 26")
return
}
num := len(b) / 26
ret = make([]CompactNodeInfo, num)
for i := range iter.N(num) {
off := i * 26
ret[i].ID = make([]byte, 20)
err = ret[i].UnmarshalBinary(b[off : off+26])
if err != nil {
return
}
}
return
}
func (cni *CompactNodeInfo) UnmarshalBinary(b []byte) error {
copy(cni.ID[:], b)
b = b[len(cni.ID):]
cni.Addr.IP = make([]byte, 4)
copy(cni.Addr.IP, b)
b = b[len(cni.Addr.IP):]
cni.Addr.Port = int(binary.BigEndian.Uint16(b))
cni.Addr.Zone = ""
return nil
}
func (cnis CompactNodeInfos) MarshalBencode() ([]byte, error) {
ret := make([]byte, 0) // TODO: this doesn't look idiomatic at all, is this the right way?
for _, cni := range cnis {
ret = append(ret, cni.MarshalBinary()...)
}
return bencode.Marshal(ret)
}
func (cni CompactNodeInfo) MarshalBinary() []byte {
ret := make([]byte, 20)
copy(ret, cni.ID)
ret = append(ret, cni.Addr.IP.To4()...)
portEncoding := make([]byte, 2)
binary.BigEndian.PutUint16(portEncoding, uint16(cni.Addr.Port))
ret = append(ret, portEncoding...)
return ret
}
func (e Error) MarshalBencode() ([]byte, error) {
return []byte(fmt.Sprintf("li%de%d:%se", e.Code, len(e.Message), e.Message)), nil
}
func (e *Error) UnmarshalBencode(b []byte) (err error) {
var code, msgLen int
regex := regexp.MustCompile(`li([0-9]+)e([0-9]+):(.+)e`)
// I don't know how to use regexp.Regexp.FindAllSubmatch:
// TODO: Why three level deep slices?
// TODO: What is @n?
matches := regex.FindAllSubmatch(b, 1)[0][1:]
if _, err := fmt.Sscanf(string(matches[0]), "%d", &code); err != nil {
return fmt.Errorf("could not parse the error code: %s", err.Error())
}
if _, err := fmt.Sscanf(string(matches[1]), "%d", &msgLen); err != nil {
return fmt.Errorf("could not parse the error message length: %s", err.Error())
}
if len(matches[2]) != msgLen {
return fmt.Errorf("error message has a different length than declared (%d vs %d): %q", len(matches[2]), msgLen, matches[2])
}
e.Code = code
e.Message = matches[2]
return nil
}
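The regular expression used above requires a non-empty message (`(.+)`), and the TODO in the tests notes that the empty-message case is untested. As an illustration of the wire format only, here is a minimal sketch in Python that parses the same bencoded `[code, message]` list by position instead of by pattern. It assumes the list contains exactly an integer followed by a byte string, and the name `decode_krpc_error` is hypothetical:

```python
from typing import Tuple


def decode_krpc_error(data: bytes) -> Tuple[int, bytes]:
    """Parse b"li<code>e<len>:<message>e" into (code, message).

    Hypothetical helper; raises ValueError on anything malformed.
    """
    if not data.startswith(b"li"):
        raise ValueError("expected a list starting with an integer")

    code_end = data.index(b"e", 2)          # end of the "i<code>e" integer
    code = int(data[2:code_end])

    colon = data.index(b":", code_end + 1)  # separates "<len>" from the message
    length = int(data[code_end + 1:colon])
    if length < 0:
        raise ValueError("negative message length")

    message = data[colon + 1:colon + 1 + length]
    # The declared length must be fully present, followed only by the list terminator.
    if len(message) != length or data[colon + 1 + length:] != b"e":
        raise ValueError("message length does not match the declared length")
    return code, message
```

Reading the message by its declared length means an `e` or `:` inside the message, or an empty message, needs no special handling.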


@ -0,0 +1,234 @@
package mainline
import (
"bytes"
"net"
"reflect"
"testing"
"github.com/anacrolix/torrent/bencode"
)
var codecTest_validInstances = []struct{
data []byte
msg Message
}{
// ping Query:
{
data: []byte("d1:ad2:id20:abcdefghij0123456789e1:q4:ping1:t2:aa1:y1:qe"),
msg: Message{
T: []byte("aa"),
Y: "q",
Q: "ping",
A: QueryArguments{
ID: []byte("abcdefghij0123456789"),
},
},
},
// ping or announce_peer Response:
// Also, includes NUL and EOT characters as transaction ID (`t`).
{
data: []byte("d1:rd2:id20:mnopqrstuvwxyz123456e1:t2:\x00\x041:y1:re"),
msg: Message{
T: []byte("\x00\x04"),
Y: "r",
R: ResponseValues{
ID: []byte("mnopqrstuvwxyz123456"),
},
},
},
// find_node Query:
{
data: []byte("d1:ad2:id20:abcdefghij01234567896:target20:mnopqrstuvwxyz123456e1:q9:find_node1:t2:\x09\x0a1:y1:qe"),
msg: Message{
T: []byte("\x09\x0a"),
Y: "q",
Q: "find_node",
A: QueryArguments{
ID: []byte("abcdefghij0123456789"),
Target: []byte("mnopqrstuvwxyz123456"),
},
},
},
// find_node Response with no nodes (`nodes` key still exists):
{
data: []byte("d1:rd2:id20:0123456789abcdefghij5:nodes0:e1:t2:aa1:y1:re"),
msg: Message{
T: []byte("aa"),
Y: "r",
R: ResponseValues{
ID: []byte("0123456789abcdefghij"),
Nodes: []CompactNodeInfo{},
},
},
},
// find_node Response with a single node:
{
data: []byte("d1:rd2:id20:0123456789abcdefghij5:nodes26:abcdefghijklmnopqrst\x8b\x82\x8e\xf5\x0cae1:t2:aa1:y1:re"),
msg: Message{
T: []byte("aa"),
Y: "r",
R: ResponseValues{
ID: []byte("0123456789abcdefghij"),
Nodes: []CompactNodeInfo{
{
ID: []byte("abcdefghijklmnopqrst"),
Addr: net.UDPAddr{IP: []byte("\x8b\x82\x8e\xf5"), Port: 3169, Zone: ""},
},
},
},
},
},
// find_node Response with 8 nodes (all the same except the very last one):
{
data: []byte("d1:rd2:id20:0123456789abcdefghij5:nodes208:abcdefghijklmnopqrst\x8b\x82\x8e\xf5\x0caabcdefghijklmnopqrst\x8b\x82\x8e\xf5\x0caabcdefghijklmnopqrst\x8b\x82\x8e\xf5\x0caabcdefghijklmnopqrst\x8b\x82\x8e\xf5\x0caabcdefghijklmnopqrst\x8b\x82\x8e\xf5\x0caabcdefghijklmnopqrst\x8b\x82\x8e\xf5\x0caabcdefghijklmnopqrst\x8b\x82\x8e\xf5\x0cazyxwvutsrqponmlkjihg\xf5\x8e\x82\x8b\x1b\x13e1:t2:aa1:y1:re"),
msg: Message{
T: []byte("aa"),
Y: "r",
R: ResponseValues{
ID: []byte("0123456789abcdefghij"),
Nodes: []CompactNodeInfo{
{
ID: []byte("abcdefghijklmnopqrst"),
Addr: net.UDPAddr{IP: []byte("\x8b\x82\x8e\xf5"), Port: 3169, Zone: ""},
},
{
ID: []byte("abcdefghijklmnopqrst"),
Addr: net.UDPAddr{IP: []byte("\x8b\x82\x8e\xf5"), Port: 3169, Zone: ""},
},
{
ID: []byte("abcdefghijklmnopqrst"),
Addr: net.UDPAddr{IP: []byte("\x8b\x82\x8e\xf5"), Port: 3169, Zone: ""},
},
{
ID: []byte("abcdefghijklmnopqrst"),
Addr: net.UDPAddr{IP: []byte("\x8b\x82\x8e\xf5"), Port: 3169, Zone: ""},
},
{
ID: []byte("abcdefghijklmnopqrst"),
Addr: net.UDPAddr{IP: []byte("\x8b\x82\x8e\xf5"), Port: 3169, Zone: ""},
},
{
ID: []byte("abcdefghijklmnopqrst"),
Addr: net.UDPAddr{IP: []byte("\x8b\x82\x8e\xf5"), Port: 3169, Zone: ""},
},
{
ID: []byte("abcdefghijklmnopqrst"),
Addr: net.UDPAddr{IP: []byte("\x8b\x82\x8e\xf5"), Port: 3169, Zone: ""},
},
{
ID: []byte("zyxwvutsrqponmlkjihg"),
Addr: net.UDPAddr{IP: []byte("\xf5\x8e\x82\x8b"), Port: 6931, Zone: ""},
},
},
},
},
},
// get_peers Query:
{
data: []byte("d1:ad2:id20:abcdefghij01234567899:info_hash20:mnopqrstuvwxyz123456e1:q9:get_peers1:t2:aa1:y1:qe"),
msg: Message{
T: []byte("aa"),
Y: "q",
Q: "get_peers",
A: QueryArguments{
ID: []byte("abcdefghij0123456789"),
InfoHash: []byte("mnopqrstuvwxyz123456"),
},
},
},
// get_peers Response with 2 peers (`values`):
{
data: []byte("d1:rd2:id20:abcdefghij01234567895:token8:aoeusnth6:valuesl6:axje.u6:idhtnmee1:t2:aa1:y1:re"),
msg: Message{
T: []byte("aa"),
Y: "r",
R: ResponseValues{
ID: []byte("abcdefghij0123456789"),
Token: []byte("aoeusnth"),
Values: []CompactPeer{
{IP: []byte("axje"), Port: 11893},
{IP: []byte("idht"), Port: 28269},
},
},
},
},
// get_peers Response with 2 closest nodes (`nodes`):
{
data: []byte("d1:rd2:id20:abcdefghij01234567895:nodes52:abcdefghijklmnopqrst\x8b\x82\x8e\xf5\x0cazyxwvutsrqponmlkjihg\xf5\x8e\x82\x8b\x1b\x135:token8:aoeusnthe1:t2:aa1:y1:re"),
msg: Message{
T: []byte("aa"),
Y: "r",
R: ResponseValues{
ID: []byte("abcdefghij0123456789"),
Token: []byte("aoeusnth"),
Nodes: []CompactNodeInfo{
{
ID: []byte("abcdefghijklmnopqrst"),
Addr: net.UDPAddr{IP: []byte("\x8b\x82\x8e\xf5"), Port: 3169, Zone: ""},
},
{
ID: []byte("zyxwvutsrqponmlkjihg"),
Addr: net.UDPAddr{IP: []byte("\xf5\x8e\x82\x8b"), Port: 6931, Zone: ""},
},
},
},
},
},
// announce_peer Query without optional `implied_port` argument:
{
data: []byte("d1:ad2:id20:abcdefghij01234567899:info_hash20:mnopqrstuvwxyz1234564:porti6881e5:token8:aoeusnthe1:q13:announce_peer1:t2:aa1:y1:qe"),
msg: Message{
T: []byte("aa"),
Y: "q",
Q: "announce_peer",
A: QueryArguments{
ID: []byte("abcdefghij0123456789"),
InfoHash: []byte("mnopqrstuvwxyz123456"),
Port: 6881,
Token: []byte("aoeusnth"),
},
},
},
// Error Message:
{
data: []byte("d1:eli201e23:A Generic Error Ocurrede1:t2:aa1:y1:ee"),
msg: Message{
T: []byte("aa"),
Y: "e",
E: Error{Code: 201, Message: []byte("A Generic Error Ocurred")},
},
},
// TODO: Test Error where E.Message is an empty string, and E.Message contains invalid Unicode characters.
// TODO: Add announce_peer Query with optional `implied_port` argument.
}
func TestUnmarshal(t *testing.T) {
for i, instance := range codecTest_validInstances {
msg := Message{}
err := bencode.Unmarshal(instance.data, &msg)
if err != nil {
t.Errorf("Error while unmarshalling valid data #%d: %v", i + 1, err)
continue
}
if !reflect.DeepEqual(msg, instance.msg) {
t.Errorf("Valid data #%d unmarshalled wrong!\n\tGot : %+v\n\tExpected: %+v",
i + 1, msg, instance.msg)
}
}
}
func TestMarshal(t *testing.T) {
for i, instance := range codecTest_validInstances {
data, err := bencode.Marshal(instance.msg)
if err != nil {
t.Errorf("Error while marshalling valid msg #%d: %v", i + 1, err)
continue
}
if !bytes.Equal(data, instance.data) {
t.Errorf("Valid msg #%d marshalled wrong!\n\tGot : %q\n\tExpected: %q",
i + 1, data, instance.data)
}
}
}
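
The `nodes` strings in the test vectors above pack each node as 26 bytes: a 20-byte node ID, a 4-byte IPv4 address and a 2-byte big-endian port. A minimal sketch of that decoding follows, assuming IPv4-only entries; `decodeCompactNode` is a hypothetical helper written for illustration and is not the decoder used by this commit.

```go
package main

import (
	"encoding/binary"
	"errors"
	"fmt"
	"net"
)

// decodeCompactNode (hypothetical helper) splits one 26-byte compact node info
// entry into its 20-byte node ID and its IPv4 address/port pair.
func decodeCompactNode(b []byte) (id []byte, addr net.UDPAddr, err error) {
	if len(b) != 26 {
		return nil, net.UDPAddr{}, errors.New("compact node info must be exactly 26 bytes")
	}
	// Copy the sub-slices so the result does not alias the input buffer.
	id = append([]byte(nil), b[:20]...)
	addr = net.UDPAddr{
		IP:   net.IP(append([]byte(nil), b[20:24]...)),
		Port: int(binary.BigEndian.Uint16(b[24:26])), // network byte order
	}
	return id, addr, nil
}

func main() {
	// Same bytes as the single-node find_node response in the test table.
	id, addr, err := decodeCompactNode([]byte("abcdefghijklmnopqrst\x8b\x82\x8e\xf5\x0ca"))
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Printf("%s %s\n", id, addr.String())
}
```

The trailing bytes `\x0c` and `a` (0x61) form 0x0c61, which is the port 3169 expected by the test table.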

View File

@ -0,0 +1,300 @@
package mainline
import (
"crypto/rand"
"crypto/sha1"
"net"
"sync"
"time"
"go.uber.org/zap"
)
type Protocol struct {
previousTokenSecret, currentTokenSecret []byte
tokenLock sync.Mutex
transport *Transport
eventHandlers ProtocolEventHandlers
started bool
}
type ProtocolEventHandlers struct {
OnPingQuery func(*Message, net.Addr)
OnFindNodeQuery func(*Message, net.Addr)
OnGetPeersQuery func(*Message, net.Addr)
OnAnnouncePeerQuery func(*Message, net.Addr)
OnGetPeersResponse func(*Message, net.Addr)
OnFindNodeResponse func(*Message, net.Addr)
OnPingORAnnouncePeerResponse func(*Message, net.Addr)
}
func NewProtocol(laddr net.UDPAddr, eventHandlers ProtocolEventHandlers) (p *Protocol) {
p = new(Protocol)
p.transport = NewTransport(laddr, p.onMessage)
p.eventHandlers = eventHandlers
p.currentTokenSecret, p.previousTokenSecret = make([]byte, 20), make([]byte, 20)
_, err := rand.Read(p.currentTokenSecret)
if err != nil {
zap.L().Fatal("Could NOT generate random bytes for token secret!", zap.Error(err))
}
copy(p.previousTokenSecret, p.currentTokenSecret)
return
}
func (p *Protocol) Start() {
if p.started {
zap.L().Panic("Attempting to Start() a mainline/Protocol that has been already started! (Programmer error.)")
}
p.started = true
p.transport.Start()
go p.updateTokenSecret()
}
func (p *Protocol) Terminate() {
p.transport.Terminate()
}
func (p *Protocol) onMessage(msg *Message, addr net.Addr) {
switch msg.Y {
case "q":
switch msg.Q {
case "ping":
if !validatePingQueryMessage(msg) {
zap.L().Debug("An invalid ping query received!")
return
}
// Check whether there is a registered event handler for the ping queries, before
// attempting to call.
if p.eventHandlers.OnPingQuery != nil {
p.eventHandlers.OnPingQuery(msg, addr)
}
case "find_node":
if !validateFindNodeQueryMessage(msg) {
zap.L().Debug("An invalid find_node query received!")
return
}
if p.eventHandlers.OnFindNodeQuery != nil {
p.eventHandlers.OnFindNodeQuery(msg, addr)
}
case "get_peers":
if !validateGetPeersQueryMessage(msg) {
zap.L().Debug("An invalid get_peers query received!")
return
}
if p.eventHandlers.OnGetPeersQuery != nil {
p.eventHandlers.OnGetPeersQuery(msg, addr)
}
case "announce_peer":
if !validateAnnouncePeerQueryMessage(msg) {
zap.L().Debug("An invalid announce_peer query received!")
return
}
if p.eventHandlers.OnAnnouncePeerQuery != nil {
p.eventHandlers.OnAnnouncePeerQuery(msg, addr)
}
default:
zap.L().Debug("A KRPC query of an unknown method received!",
zap.String("method", msg.Q))
return
}
case "r":
// get_peers > find_node > ping / announce_peer
if len(msg.R.Token) != 0 { // The message should be a get_peers response.
if !validateGetPeersResponseMessage(msg) {
zap.L().Debug("An invalid get_peers response received!")
return
}
if p.eventHandlers.OnGetPeersResponse != nil {
p.eventHandlers.OnGetPeersResponse(msg, addr)
}
} else if len(msg.R.Nodes) != 0 { // The message should be a find_node response.
if !validateFindNodeResponseMessage(msg) {
zap.L().Debug("An invalid find_node response received!")
return
}
if p.eventHandlers.OnFindNodeResponse != nil {
p.eventHandlers.OnFindNodeResponse(msg, addr)
}
} else { // The message should be a ping or an announce_peer response.
if !validatePingORannouncePeerResponseMessage(msg) {
zap.L().Debug("An invalid ping OR announce_peer response received!")
return
}
if p.eventHandlers.OnPingORAnnouncePeerResponse != nil {
p.eventHandlers.OnPingORAnnouncePeerResponse(msg, addr)
}
}
case "e":
zap.L().Sugar().Debugf("Protocol error received: `%s` (%d)", msg.E.Message, msg.E.Code)
default:
zap.L().Debug("A KRPC message of an unknown type received!",
zap.String("type", msg.Y))
}
}
func (p *Protocol) SendMessage(msg *Message, addr net.Addr) {
p.transport.WriteMessages(msg, addr)
}
func NewPingQuery(id []byte) *Message {
panic("Not implemented yet!")
}
func NewFindNodeQuery(id []byte, target []byte) *Message {
return &Message{
Y: "q",
T: []byte("\x00"),
Q: "find_node",
A: QueryArguments{
ID: id,
Target: target,
},
}
}
func NewGetPeersQuery(id []byte, info_hash []byte) *Message {
panic("Not implemented yet!")
}
func NewAnnouncePeerQuery(id []byte, implied_port bool, info_hash []byte, port uint16,
token []byte) *Message {
panic("Not implemented yet!")
}
func NewPingResponse(t []byte, id []byte) *Message {
return &Message{
Y: "r",
T: t,
R: ResponseValues{
ID: id,
},
}
}
func NewFindNodeResponse(t []byte, id []byte, nodes []CompactNodeInfo) *Message {
panic("Not implemented yet!")
}
func NewGetPeersResponseWithValues(t []byte, id []byte, token []byte, values []CompactPeer) *Message {
panic("Not implemented yet!")
}
func NewGetPeersResponseWithNodes(t []byte, id []byte, token []byte, nodes []CompactNodeInfo) *Message {
return &Message{
Y: "r",
T: t,
R: ResponseValues{
ID: id,
Token: token,
Nodes: nodes,
},
}
}
func NewAnnouncePeerResponse(t []byte, id []byte) *Message {
// Because they are indistinguishable.
return NewPingResponse(t, id)
}
func (p *Protocol) CalculateToken(address net.IP) []byte {
p.tokenLock.Lock()
defer p.tokenLock.Unlock()
sum := sha1.Sum(append(p.currentTokenSecret, address...))
return sum[:]
}
func (p *Protocol) VerifyToken(address net.IP, token []byte) bool {
p.tokenLock.Lock()
defer p.tokenLock.Unlock()
// TODO: Compare token against the tokens derived from both currentTokenSecret and
// previousTokenSecret.
panic("VerifyToken() not implemented yet!")
}
func (p *Protocol) updateTokenSecret() {
for range time.Tick(10 * time.Minute) {
p.tokenLock.Lock()
copy(p.previousTokenSecret, p.currentTokenSecret)
_, err := rand.Read(p.currentTokenSecret)
if err != nil {
p.tokenLock.Unlock()
zap.L().Fatal("Could NOT generate random bytes for token secret!", zap.Error(err))
}
p.tokenLock.Unlock()
}
}
func validatePingQueryMessage(msg *Message) bool {
return len(msg.A.ID) == 20
}
func validateFindNodeQueryMessage(msg *Message) bool {
return len(msg.A.ID) == 20 &&
len(msg.A.Target) == 20
}
func validateGetPeersQueryMessage(msg *Message) bool {
return len(msg.A.ID) == 20 &&
len(msg.A.InfoHash) == 20
}
func validateAnnouncePeerQueryMessage(msg *Message) bool {
return len(msg.A.ID) == 20 &&
len(msg.A.InfoHash) == 20 &&
msg.A.Port > 0 &&
len(msg.A.Token) > 0
}
func validatePingORannouncePeerResponseMessage(msg *Message) bool {
return len(msg.R.ID) == 20
}
func validateFindNodeResponseMessage(msg *Message) bool {
if len(msg.R.ID) != 20 {
return false
}
// TODO: check nodes field
return true
}
func validateGetPeersResponseMessage(msg *Message) bool {
return len(msg.R.ID) == 20 &&
len(msg.R.Token) > 0
// TODO: check for values or nodes
}

View File

@ -0,0 +1,200 @@
package mainline
import (
"testing"
"net"
)
var protocolTest_validInstances = []struct {
validator func(*Message) bool
msg Message
} {
// ping Query:
{
validator: validatePingQueryMessage,
msg: Message{
T: []byte("aa"),
Y: "q",
Q: "ping",
A: QueryArguments{
ID: []byte("abcdefghij0123456789"),
},
},
},
// ping or announce_peer Response:
// Also, includes NUL and EOT characters as transaction ID (`t`).
{
validator: validatePingORannouncePeerResponseMessage,
msg: Message{
T: []byte("\x00\x04"),
Y: "r",
R: ResponseValues{
ID: []byte("mnopqrstuvwxyz123456"),
},
},
},
// find_node Query:
{
validator: validateFindNodeQueryMessage,
msg: Message{
T: []byte("\x09\x0a"),
Y: "q",
Q: "find_node",
A: QueryArguments{
ID: []byte("abcdefghij0123456789"),
Target: []byte("mnopqrstuvwxyz123456"),
},
},
},
// find_node Response with no nodes (`nodes` key still exists):
{
validator: validateFindNodeResponseMessage,
msg: Message{
T: []byte("aa"),
Y: "r",
R: ResponseValues{
ID: []byte("0123456789abcdefghij"),
Nodes: []CompactNodeInfo{},
},
},
},
// find_node Response with a single node:
{
validator: validateFindNodeResponseMessage,
msg: Message{
T: []byte("aa"),
Y: "r",
R: ResponseValues{
ID: []byte("0123456789abcdefghij"),
Nodes: []CompactNodeInfo{
{
ID: []byte("abcdefghijklmnopqrst"),
Addr: net.UDPAddr{IP: []byte("\x8b\x82\x8e\xf5"), Port: 3169, Zone: ""},
},
},
},
},
},
// find_node Response with 8 nodes (all the same except the very last one):
{
validator: validateFindNodeResponseMessage,
msg: Message{
T: []byte("aa"),
Y: "r",
R: ResponseValues{
ID: []byte("0123456789abcdefghij"),
Nodes: []CompactNodeInfo{
{
ID: []byte("abcdefghijklmnopqrst"),
Addr: net.UDPAddr{IP: []byte("\x8b\x82\x8e\xf5"), Port: 3169, Zone: ""},
},
{
ID: []byte("abcdefghijklmnopqrst"),
Addr: net.UDPAddr{IP: []byte("\x8b\x82\x8e\xf5"), Port: 3169, Zone: ""},
},
{
ID: []byte("abcdefghijklmnopqrst"),
Addr: net.UDPAddr{IP: []byte("\x8b\x82\x8e\xf5"), Port: 3169, Zone: ""},
},
{
ID: []byte("abcdefghijklmnopqrst"),
Addr: net.UDPAddr{IP: []byte("\x8b\x82\x8e\xf5"), Port: 3169, Zone: ""},
},
{
ID: []byte("abcdefghijklmnopqrst"),
Addr: net.UDPAddr{IP: []byte("\x8b\x82\x8e\xf5"), Port: 3169, Zone: ""},
},
{
ID: []byte("abcdefghijklmnopqrst"),
Addr: net.UDPAddr{IP: []byte("\x8b\x82\x8e\xf5"), Port: 3169, Zone: ""},
},
{
ID: []byte("abcdefghijklmnopqrst"),
Addr: net.UDPAddr{IP: []byte("\x8b\x82\x8e\xf5"), Port: 3169, Zone: ""},
},
{
ID: []byte("zyxwvutsrqponmlkjihg"),
Addr: net.UDPAddr{IP: []byte("\xf5\x8e\x82\x8b"), Port: 6931, Zone: ""},
},
},
},
},
},
// get_peers Query:
{
validator: validateGetPeersQueryMessage,
msg: Message{
T: []byte("aa"),
Y: "q",
Q: "get_peers",
A: QueryArguments{
ID: []byte("abcdefghij0123456789"),
InfoHash: []byte("mnopqrstuvwxyz123456"),
},
},
},
// get_peers Response with 2 peers (`values`):
{
validator: validateGetPeersResponseMessage,
msg: Message{
T: []byte("aa"),
Y: "r",
R: ResponseValues{
ID: []byte("abcdefghij0123456789"),
Token: []byte("aoeusnth"),
Values: []CompactPeer{
{IP: []byte("axje"), Port: 11893},
{IP: []byte("idht"), Port: 28269},
},
},
},
},
// get_peers Response with 2 closest nodes (`nodes`):
{
validator: validateGetPeersResponseMessage,
msg: Message{
T: []byte("aa"),
Y: "r",
R: ResponseValues{
ID: []byte("abcdefghij0123456789"),
Token: []byte("aoeusnth"),
Nodes: []CompactNodeInfo{
{
ID: []byte("abcdefghijklmnopqrst"),
Addr: net.UDPAddr{IP: []byte("\x8b\x82\x8e\xf5"), Port: 3169, Zone: ""},
},
{
ID: []byte("zyxwvutsrqponmlkjihg"),
Addr: net.UDPAddr{IP: []byte("\xf5\x8e\x82\x8b"), Port: 6931, Zone: ""},
},
},
},
},
},
// announce_peer Query without optional `implied_port` argument:
{
validator: validateAnnouncePeerQueryMessage,
msg: Message{
T: []byte("aa"),
Y: "q",
Q: "announce_peer",
A: QueryArguments{
ID: []byte("abcdefghij0123456789"),
InfoHash: []byte("mnopqrstuvwxyz123456"),
Port: 6881,
Token: []byte("aoeusnth"),
},
},
},
// TODO: Add announce_peer Query with optional `implied_port` argument.
}
func TestValidators(t *testing.T) {
for i, instance := range protocolTest_validInstances {
if isValid := instance.validator(&instance.msg); !isValid {
t.Errorf("False-negative for valid msg #%d!", i + 1)
}
}
}

View File

@ -0,0 +1,200 @@
package mainline
import (
"crypto/rand"
"net"
"sync"
"time"
"github.com/anacrolix/torrent"
"github.com/anacrolix/torrent/metainfo"
"go.uber.org/zap"
)
type TrawlingResult struct {
InfoHash metainfo.Hash
Peer torrent.Peer
}
type TrawlingService struct {
// Private
protocol *Protocol
started bool
eventHandlers TrawlingServiceEventHandlers
trueNodeID []byte
// []byte type would be a much better fit for the keys but unfortunately (and quite
// understandably) slices cannot be used as map keys (since they are not comparable), and
// converting between slices and arrays is a pain; hence the string keys in
// map[string]net.Addr.
routingTable map[string]net.Addr
routingTableMutex *sync.Mutex
}
type TrawlingServiceEventHandlers struct {
OnResult func(TrawlingResult)
}
func NewTrawlingService(laddr net.UDPAddr, eventHandlers TrawlingServiceEventHandlers) *TrawlingService {
service := new(TrawlingService)
service.protocol = NewProtocol(
laddr,
ProtocolEventHandlers{
OnGetPeersQuery: service.onGetPeersQuery,
OnAnnouncePeerQuery: service.onAnnouncePeerQuery,
OnFindNodeResponse: service.onFindNodeResponse,
},
)
service.trueNodeID = make([]byte, 20)
service.routingTable = make(map[string]net.Addr)
service.routingTableMutex = new(sync.Mutex)
service.eventHandlers = eventHandlers
_, err := rand.Read(service.trueNodeID)
if err != nil {
zap.L().Panic("Could NOT generate random bytes for node ID!")
}
return service
}
func (s *TrawlingService) Start() {
if s.started {
zap.L().Panic("Attempting to Start() a mainline/TrawlingService that has been already started! (Programmer error.)")
}
s.started = true
s.protocol.Start()
go s.trawl()
zap.L().Info("Trawling Service started!")
}
func (s *TrawlingService) Terminate() {
s.protocol.Terminate()
}
func (s *TrawlingService) trawl() {
for range time.Tick(1 * time.Second) {
s.routingTableMutex.Lock()
if len(s.routingTable) == 0 {
s.bootstrap()
} else {
zap.L().Info("Latest status:", zap.Int("n", len(s.routingTable)))
s.findNeighbors()
s.routingTable = make(map[string]net.Addr)
}
s.routingTableMutex.Unlock()
}
}
func (s *TrawlingService) bootstrap() {
bootstrappingNodes := []string{
"router.bittorrent.com:6881",
"dht.transmissionbt.com:6881",
"dht.libtorrent.org:25401",
}
zap.L().Info("Bootstrapping as routing table is empty...")
for _, node := range bootstrappingNodes {
target := make([]byte, 20)
_, err := rand.Read(target)
if err != nil {
zap.L().Panic("Could NOT generate random bytes during bootstrapping!")
}
addr, err := net.ResolveUDPAddr("udp", node)
if err != nil {
zap.L().Error("Could NOT resolve (UDP) address of the bootstrapping node!",
zap.String("node", node), zap.Error(err))
// Skip this node: addr is nil, so there is nowhere to send the query.
continue
}
s.protocol.SendMessage(NewFindNodeQuery(s.trueNodeID, target), addr)
}
}
func (s *TrawlingService) findNeighbors() {
target := make([]byte, 20)
for nodeID, addr := range s.routingTable {
_, err := rand.Read(target)
if err != nil {
zap.L().Panic("Could NOT generate random bytes while finding neighbors!")
}
s.protocol.SendMessage(
NewFindNodeQuery(append([]byte(nodeID[:15]), s.trueNodeID[:5]...), target),
addr,
)
}
}
func (s *TrawlingService) onGetPeersQuery(query *Message, addr net.Addr) {
s.protocol.SendMessage(
NewGetPeersResponseWithNodes(
query.T,
append(query.A.ID[:15], s.trueNodeID[:5]...),
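// addr.String() is "host:port", which net.ParseIP cannot parse (it returns nil), so take
// the IP from the UDP address directly.
s.protocol.CalculateToken(addr.(*net.UDPAddr).IP),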
[]CompactNodeInfo{},
),
addr,
)
}
func (s *TrawlingService) onAnnouncePeerQuery(query *Message, addr net.Addr) {
var peerPort int
if query.A.ImpliedPort != 0 {
peerPort = addr.(*net.UDPAddr).Port
} else {
peerPort = query.A.Port
}
// Arrays are zero-initialised in Go, so peerId is already all zeros; only infoHash needs to
// be copied from the query.
var peerId, infoHash [20]byte
copy(infoHash[:], query.A.InfoHash)
s.eventHandlers.OnResult(TrawlingResult{
InfoHash: infoHash,
Peer: torrent.Peer{
// As we don't know the ID of the remote peer, set it empty.
Id: peerId,
IP: addr.(*net.UDPAddr).IP,
Port: peerPort,
// "Ha" indicates that we discovered the peer through DHT Announce Peer (query); not
// sure how anacrolix/torrent utilizes that information though.
Source: "Ha",
// We don't know whether the remote peer supports encryption either, but let's pretend
// that it doesn't.
SupportsEncryption: false,
},
})
s.protocol.SendMessage(
NewAnnouncePeerResponse(
query.T,
append(query.A.ID[:15], s.trueNodeID[:5]...),
),
addr,
)
}
func (s *TrawlingService) onFindNodeResponse(response *Message, addr net.Addr) {
s.routingTableMutex.Lock()
defer s.routingTableMutex.Unlock()
for _, node := range response.R.Nodes {
if node.Addr.Port != 0 { // Ignore nodes who "use" port 0.
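// Copy the address: before Go 1.22 `node` is reused across iterations, so storing
// &node.Addr would make every entry point at the same value.
nodeAddr := node.Addr
s.routingTable[string(node.ID)] = &nodeAddr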
}
}
}
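
The service above answers queries with an ID made of the first 15 bytes of the other node's ID followed by 5 bytes of its own ID, so that it appears close to whoever it talks to. The `append(query.A.ID[:15], ...)` form writes into the backing array of the incoming ID. A sketch of the same construction without that aliasing follows; `neighborID` is a hypothetical helper name, and the 20-byte length check is an assumption added for safety.

```go
package main

import "fmt"

// neighborID (hypothetical helper) builds a 20-byte ID that shares its first
// 15 bytes with target and takes its last 5 bytes from the start of own.
// It allocates a new slice, so neither input is modified.
func neighborID(target, own []byte) ([]byte, error) {
	if len(target) != 20 || len(own) != 20 {
		return nil, fmt.Errorf("node IDs must be 20 bytes, got %d and %d", len(target), len(own))
	}
	id := make([]byte, 0, 20)
	id = append(id, target[:15]...)
	id = append(id, own[:5]...)
	return id, nil
}

func main() {
	target := []byte("abcdefghij0123456789")
	own := []byte("mnopqrstuvwxyz123456")
	id, err := neighborID(target, own)
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Printf("%s\n%s\n", id, target)
}
```

Because the result is a fresh slice, the original query message can still be logged or inspected after the response has been built.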

View File

@ -0,0 +1,99 @@
package mainline
import (
"net"
"go.uber.org/zap"
"github.com/anacrolix/torrent/bencode"
"strings"
)
type Transport struct {
conn *net.UDPConn
laddr net.UDPAddr
started bool
// onMessage is the function that will be called when Transport receives a packet that is
// successfully unmarshalled as a syntactically correct Message (but, of course, checking
// the semantic correctness of the Message is left to Protocol).
onMessage func(*Message, net.Addr)
}
func NewTransport(laddr net.UDPAddr, onMessage func(*Message, net.Addr)) (*Transport) {
transport := new(Transport)
transport.onMessage = onMessage
transport.laddr = laddr
return transport
}
func (t *Transport) Start() {
// Why check whether the Transport `t` started or not, here and not -for instance- in
// t.Terminate()?
// Because in t.Terminate() the programmer (i.e. you & me) would stumble upon an error while
// trying to close an uninitialised net.UDPConn or something like that: it's mostly harmless
// because its effects are immediate. But if you try to start a Transport `t` for the second
// (or the third, 4th, ...) time, it will keep spawning goroutines and any small mistake may
// end up in a debugging horror.
// Here ends my justification.
if t.started {
zap.L().Panic("Attempting to Start() a mainline/Transport that has been already started! (Programmer error.)")
}
t.started = true
var err error
t.conn, err = net.ListenUDP("udp", &t.laddr)
if err != nil {
zap.L().Fatal("Could NOT create a UDP socket!", zap.Error(err))
}
go t.readMessages()
}
func (t *Transport) Terminate() {
t.conn.Close()
}
// readMessages is a goroutine!
func (t *Transport) readMessages() {
buffer := make([]byte, 65536)
for {
n, addr, err := t.conn.ReadFrom(buffer)
if err != nil {
// TODO: isn't there a more reliable way to detect if UDPConn is closed?
if strings.HasSuffix(err.Error(), "use of closed network connection") {
break
}
zap.L().Debug("Could NOT read a UDP packet!", zap.Error(err))
// Nothing useful was read, so do not try to unmarshal the buffer.
continue
}
var msg Message
err = bencode.Unmarshal(buffer[:n], &msg)
if err != nil {
zap.L().Debug("Could NOT unmarshal packet data!", zap.Error(err))
// Do not hand a half-filled Message to the handler.
continue
}
t.onMessage(&msg, addr)
}
}
func (t *Transport) WriteMessages(msg *Message, addr net.Addr) {
data, err := bencode.Marshal(msg)
if err != nil {
zap.L().Panic("Could NOT marshal an outgoing message! (Programmer error.)")
}
_, err = t.conn.WriteTo(data, addr)
// TODO: isn't there a more reliable way to detect if UDPConn is closed?
if err != nil && !strings.HasSuffix(err.Error(), "use of closed network connection") {
zap.L().Debug("Could NOT write a UDP packet!", zap.Error(err))
}
}

View File

@ -0,0 +1,55 @@
package mainline
import (
"net"
"strings"
"testing"
)
func TestReadFromOnClosedConn(t *testing.T) {
// Initialization
laddr, err := net.ResolveUDPAddr("udp", "0.0.0.0:0")
if err != nil {
t.Skipf("Skipping due to an error during initialization!")
}
conn, err := net.ListenUDP("udp", laddr)
if err != nil {
t.Skipf("Skipping due to an error during initialization!")
}
buffer := make([]byte, 65536)
// Setting Up
conn.Close()
// Testing
_, _, err = conn.ReadFrom(buffer)
if !(err != nil && strings.HasSuffix(err.Error(), "use of closed network connection")) {
t.Fatalf("Unexpected suffix in the error message!")
}
}
func TestWriteToOnClosedConn(t *testing.T) {
// Initialization
laddr, err := net.ResolveUDPAddr("udp", "0.0.0.0:0")
if err != nil {
t.Skipf("Skipping due to an error during initialization!")
}
conn, err := net.ListenUDP("udp", laddr)
if err != nil {
t.Skipf("Skipping due to an error during initialization!")
}
// Setting Up
conn.Close()
// Testing
_, err = conn.WriteTo([]byte("estarabim"), laddr)
if !(err != nil && strings.HasSuffix(err.Error(), "use of closed network connection")) {
t.Fatalf("Unexpected suffix in the error message!")
}
}

View File

@ -0,0 +1,64 @@
package dht
import (
"magneticod/dht/mainline"
"net"
"github.com/bradfitz/iter"
)
type TrawlingManager struct {
// private
output chan mainline.TrawlingResult
services []*mainline.TrawlingService
}
func NewTrawlingManager(mlAddrs []net.UDPAddr) *TrawlingManager {
manager := new(TrawlingManager)
manager.output = make(chan mainline.TrawlingResult)
if mlAddrs != nil {
for _, addr := range mlAddrs {
manager.services = append(manager.services, mainline.NewTrawlingService(
addr,
mainline.TrawlingServiceEventHandlers{
OnResult: manager.onResult,
},
))
}
} else {
addr := net.UDPAddr{IP: []byte("\x00\x00\x00\x00"), Port: 0}
for range iter.N(1) {
manager.services = append(manager.services, mainline.NewTrawlingService(
addr,
mainline.TrawlingServiceEventHandlers{
OnResult: manager.onResult,
},
))
}
}
for _, service := range manager.services {
service.Start()
}
return manager
}
func (m *TrawlingManager) onResult(res mainline.TrawlingResult) {
m.output <- res
}
func (m *TrawlingManager) Output() <-chan mainline.TrawlingResult {
return m.output
}
func (m *TrawlingManager) Terminate() {
for _, service := range m.services {
service.Terminate()
}
}

260
src/magneticod/main.go Normal file
View File

@ -0,0 +1,260 @@
package main
import (
"net"
"os"
"os/signal"
"go.uber.org/zap"
"github.com/jessevdk/go-flags"
// "magneticod/bittorrent"
"magneticod/dht"
"go.uber.org/zap/zapcore"
"regexp"
"magneticod/bittorrent"
)
type cmdFlags struct {
Database string `long:"database" description:"URL of the database."`
MlTrawlerAddrs []string `long:"ml-trawler-addrs" description:"Address(es) to be used by trawling DHT (Mainline) nodes." default:"0.0.0.0:0"`
TrawlingInterval uint `long:"trawling-interval" description:"Trawling interval in integer seconds."`
// TODO: is this even supported by anacrolix/torrent?
FetcherAddr string `long:"fetcher-addr" description:"Address(es) to be used by ephemeral peers fetching torrent metadata." default:"0.0.0.0:0"`
FetcherTimeout uint `long:"fetcher-timeout" description:"Number of integer seconds before a fetcher times out."`
// TODO: is this even supported by anacrolix/torrent?
MaxMetadataSize uint `long:"max-metadata-size" description:"Maximum metadata size -which must be greater than zero- in bytes."`
MlStatisticianAddrs []string `long:"ml-statistician-addrs" description:"Address(es) to be used by ephemeral nodes fetching latest statistics about individual torrents." default:"0.0.0.0:0"`
StatisticianTimeout uint `long:"statistician-timeout" description:"Number of integer seconds before a statistician times out."`
// TODO: is this even supported by anacrolix/torrent?
LeechAddr string `long:"leech-addr" description:"Address(es) to be used by ephemeral peers fetching README files." default:"0.0.0.0:0"`
LeechTimeout uint `long:"leech-timeout" description:"Number of integer seconds before a leech times out."`
MaxDescriptionSize uint `long:"max-description-size" description:"Maximum size -which must be greater than zero- of a description file in bytes"`
DescriptionNames []string `long:"description-names" description:"Regular expression(s) which will be tested against the name of the description files, in the supplied order."`
Verbose []bool `short:"v" long:"verbose" description:"Increases verbosity."`
// ==== Deprecated Flags ====
// TODO: don't even support deprecated flags!
// DatabaseFile is akin to Database flag, except that it was used when SQLite was the only
// persistence backend ever conceived, so it's the path* to the database file, which was -by
// default- located in wherever appdata module on Python said:
// On GNU/Linux : `/home/<USER>/.local/share/magneticod/database.sqlite3`
// On Windows : TODO?
// On MacOS (OS X) : TODO?
// On BSDs? : TODO?
// On anywhere else: TODO?
// TODO: Is the path* absolute or can be relative as well?
// DatabaseFile string
}
type opFlags struct {
Database string
MlTrawlerAddrs []net.UDPAddr
TrawlingInterval uint
FetcherAddr net.TCPAddr
FetcherTimeout uint
// TODO: is this even supported by anacrolix/torrent?
MaxMetadataSize uint
MlStatisticianAddrs []net.UDPAddr
StatisticianTimeout uint
LeechAddr net.TCPAddr
LeechTimeout uint
MaxDescriptionSize uint
DescriptionNames []regexp.Regexp
Verbosity uint
}
func main() {
atom := zap.NewAtomicLevel()
// Logging levels: ("debug", "info", "warn", "error", "dpanic", "panic", and "fatal").
logger := zap.New(zapcore.NewCore(
zapcore.NewConsoleEncoder(zap.NewDevelopmentEncoderConfig()),
zapcore.Lock(os.Stderr),
atom,
))
defer logger.Sync()
zap.ReplaceGlobals(logger)
zap.L().Info("magneticod v0.7.0 has been started.")
zap.L().Info("Copyright (C) 2017 Mert Bora ALPER <bora@boramalper.org>.")
zap.L().Info("Dedicated to Cemile Binay, in whose hands I thrived.")
// opFlags is the "operational flags"
opFlags := parseFlags()
logger.Sugar().Warn(">>>", opFlags.MlTrawlerAddrs)
switch opFlags.Verbosity {
case 0:
atom.SetLevel(zap.WarnLevel)
case 1:
atom.SetLevel(zap.InfoLevel)
// Default: i.e. in case of 2 or more.
default:
atom.SetLevel(zap.DebugLevel)
}
zap.ReplaceGlobals(logger)
/*
updating_manager := nil
statistics_sink := nil
completing_manager := nil
file_sink := nil
*/
// Handle Ctrl-C gracefully.
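// signal.Notify does not block when sending, so the channel must be buffered or a signal
// arriving while nobody is receiving would be dropped.
interrupt_chan := make(chan os.Signal, 1)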
signal.Notify(interrupt_chan, os.Interrupt)
database, err := NewDatabase(opFlags.Database)
if err != nil {
logger.Sugar().Fatalf("Could not open the database at `%s`: %s", opFlags.Database, err.Error())
}
go func() {
trawlingManager := dht.NewTrawlingManager(opFlags.MlTrawlerAddrs)
metadataSink := bittorrent.NewMetadataSink(opFlags.FetcherAddr)
for {
select {
case result := <-trawlingManager.Output():
logger.Info("result: ", zap.String("hash", result.InfoHash.String()))
metadataSink.Sink(result)
case metadata := <-metadataSink.Drain():
if err := database.AddNewTorrent(metadata); err != nil {
logger.Sugar().Fatalf("Could not add new torrent %x to the database: %s", metadata.InfoHash, err.Error())
}
case <-interrupt_chan:
trawlingManager.Terminate()
// A bare `break` would only leave the select, not the enclosing for loop.
return
}
}
}()
go func() {
}()
go func() {
}()
/*
for {
select {
case updating_manager.Output():
case statistics_sink.Sink():
case completing_manager.Output():
case file_sink.Sink():
*/
<-interrupt_chan
}
func parseFlags() (opFlags) {
var cmdF cmdFlags
_, err := flags.Parse(&cmdF)
if err != nil {
zap.L().Fatal("Error while parsing command-line flags: ", zap.Error(err))
}
mlTrawlerAddrs, err := hostPortsToUDPAddrs(cmdF.MlTrawlerAddrs)
if err != nil {
zap.L().Fatal("Erroneous ml-trawler-addrs argument supplied: ", zap.Error(err))
}
fetcherAddr, err := hostPortsToTCPAddr(cmdF.FetcherAddr)
if err != nil {
zap.L().Fatal("Erroneous fetcher-addr argument supplied: ", zap.Error(err))
}
mlStatisticianAddrs, err := hostPortsToUDPAddrs(cmdF.MlStatisticianAddrs)
if err != nil {
zap.L().Fatal("Erroneous ml-statistician-addrs argument supplied: ", zap.Error(err))
}
leechAddr, err := hostPortsToTCPAddr(cmdF.LeechAddr)
if err != nil {
zap.L().Fatal("Erroneous leech-addr argument supplied: ", zap.Error(err))
}
var descriptionNames []regexp.Regexp
for _, expr := range cmdF.DescriptionNames {
regex, err := regexp.Compile(expr)
if err != nil {
zap.L().Fatal("Erroneous description-names argument supplied: ", zap.Error(err))
}
descriptionNames = append(descriptionNames, *regex)
}
opF := opFlags{
Database: cmdF.Database,
MlTrawlerAddrs: mlTrawlerAddrs,
TrawlingInterval: cmdF.TrawlingInterval,
FetcherAddr: fetcherAddr,
FetcherTimeout: cmdF.FetcherTimeout,
MaxMetadataSize: cmdF.MaxMetadataSize,
MlStatisticianAddrs: mlStatisticianAddrs,
StatisticianTimeout: cmdF.StatisticianTimeout,
LeechAddr: leechAddr,
LeechTimeout: cmdF.LeechTimeout,
MaxDescriptionSize: cmdF.MaxDescriptionSize,
DescriptionNames: descriptionNames,
Verbosity: uint(len(cmdF.Verbose)),
}
return opF
}
func hostPortsToUDPAddrs(hostport []string) ([]net.UDPAddr, error) {
udpAddrs := make([]net.UDPAddr, len(hostport))
for i, hp := range hostport {
udpAddr, err := net.ResolveUDPAddr("udp", hp)
if err != nil {
return nil, err
}
udpAddrs[i] = *udpAddr
}
return udpAddrs, nil
}
func hostPortsToTCPAddr(hostport string) (net.TCPAddr, error) {
tcpAddr, err := net.ResolveTCPAddr("tcp", hostport)
if err != nil {
return net.TCPAddr{}, err
}
return *tcpAddr, nil
}

View File

@ -0,0 +1,228 @@
package main
import (
"fmt"
"database/sql"
"net/url"
_ "github.com/mattn/go-sqlite3"
"go.uber.org/zap"
"magneticod/bittorrent"
"path"
"os"
)
type engineType uint8
const (
SQLITE engineType = 0
POSTGRESQL engineType = 1
)
type Database struct {
database *sql.DB
engine engineType
newTorrents chan bittorrent.Metadata
}
// NewDatabase creates a new Database.
//
// rawurl must start with either "sqlite:" or "postgresql:".
func NewDatabase(rawurl string) (*Database, error) {
db := Database{}
dbURL, err := url.Parse(rawurl)
if err != nil {
return nil, err
}
switch dbURL.Scheme {
case "sqlite":
db.engine = SQLITE
// All this pain is to make sure that an empty file exist (in case the database is not there
// yet) so that sql.Open won't fail.
dbDir, _ := path.Split(dbURL.Path)
if err := os.MkdirAll(dbDir, 0755); err != nil {
return nil, fmt.Errorf("for directory `%s`: %s", dbDir, err.Error())
}
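// Use a distinct name for this error: redeclaring `err` with := here would shadow the
// outer `err`, and the result of sql.Open below would never reach the check after the
// switch.
f, openErr := os.OpenFile(dbURL.Path, os.O_CREATE, 0666)
if openErr != nil {
return nil, fmt.Errorf("for file `%s`: %s", dbURL.Path, openErr.Error())
}
if err := f.Sync(); err != nil {
return nil, fmt.Errorf("for file `%s`: %s", dbURL.Path, err.Error())
}
if err := f.Close(); err != nil {
return nil, fmt.Errorf("for file `%s`: %s", dbURL.Path, err.Error())
}
// dbURL.RawPath is only set when the path needs non-default escaping (it is usually
// empty), so open the same dbURL.Path that was created above.
db.database, err = sql.Open("sqlite3", dbURL.Path)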
case "postgresql":
db.engine = POSTGRESQL
db.database, err = sql.Open("postgresql", rawurl)
default:
return nil, fmt.Errorf("unknown URI scheme (or malformed URI)!")
}
// Check for errors from sql.Open()
if err != nil {
return nil, fmt.Errorf("sql.Open()! %s", err.Error())
}
if err = db.database.Ping(); err != nil {
return nil, fmt.Errorf("DB.Ping()! %s", err.Error())
}
if err := db.setupDatabase(); err != nil {
return nil, fmt.Errorf("setupDatabase()! %s", err.Error())
}
db.newTorrents = make(chan bittorrent.Metadata, 10)
return &db, nil
}
// AddNewTorrent adds a new torrent to the *queue* to be flushed to the persistent database.
func (db *Database) AddNewTorrent(torrent bittorrent.Metadata) error {
	for {
		select {
		case db.newTorrents <- torrent:
			return nil
		default:
			// newTorrents queue was full: flush and try again and again (and again)...
			err := db.flushNewTorrents()
			if err != nil {
				return err
			}
			continue
		}
	}
}
// flushNewTorrents writes every torrent currently waiting in the queue to the database within a
// single transaction.
func (db *Database) flushNewTorrents() error {
	tx, err := db.database.Begin()
	if err != nil {
		return fmt.Errorf("sql.DB.Begin()! %s", err.Error())
	}

	var nTorrents, nFiles uint
	// Drain only what is queued right now: ranging over db.newTorrents would block forever once
	// the queue is empty, because the channel is never closed.
drain:
	for {
		var torrent bittorrent.Metadata
		select {
		case torrent = <-db.newTorrents:
		default:
			break drain
		}

		res, err := tx.Exec("INSERT INTO torrents (info_hash, name, total_size, discovered_on) VALUES ($1, $2, $3, $4);",
			torrent.InfoHash, torrent.Name, torrent.TotalSize, torrent.DiscoveredOn)
		if err != nil {
			ourError := fmt.Errorf("error while INSERTing INTO torrents! %s", err.Error())
			if err := tx.Rollback(); err != nil {
				return fmt.Errorf("%s\tmeanwhile, could not rollback the current transaction either! %s", ourError.Error(), err.Error())
			}
			return ourError
		}

		lastInsertId, err := res.LastInsertId()
		if err != nil {
			// Roll back here as well, so that the transaction is not left open.
			ourError := fmt.Errorf("sql.Result.LastInsertId()! %s", err.Error())
			if err := tx.Rollback(); err != nil {
				return fmt.Errorf("%s\tmeanwhile, could not rollback the current transaction either! %s", ourError.Error(), err.Error())
			}
			return ourError
		}

		for _, file := range torrent.Files {
			_, err := tx.Exec("INSERT INTO files (torrent_id, size, path) VALUES($1, $2, $3);",
				lastInsertId, file.Length, file.Path)
			if err != nil {
				ourError := fmt.Errorf("error while INSERTing INTO files! %s", err.Error())
				if err := tx.Rollback(); err != nil {
					return fmt.Errorf("%s\tmeanwhile, could not rollback the current transaction either! %s", ourError.Error(), err.Error())
				}
				return ourError
			}
			nFiles++
		}
		nTorrents++
	}

	err = tx.Commit()
	if err != nil {
		return fmt.Errorf("sql.Tx.Commit()! %s", err.Error())
	}

	zap.L().Sugar().Infof("%d torrents (%d files) are flushed to the database successfully.",
		nTorrents, nFiles)
	return nil
}
// Close closes the underlying database connection.
func (db *Database) Close() {
	// Be careful not to get into an infinite loop. =)
	db.database.Close()
}
func (db *Database) setupDatabase() error {
	switch db.engine {
	case SQLITE:
		return setupSqliteDatabase(db.database)

	case POSTGRESQL:
		zap.L().Fatal("setupDatabase() is not implemented for PostgreSQL yet!")

	default:
		zap.L().Sugar().Fatalf("Unknown database engine value %d! (programmer error)", db.engine)
	}

	return nil
}
func setupSqliteDatabase(database *sql.DB) error {
	// Enable Write-Ahead Logging for SQLite as "WAL provides more concurrency as readers do not
	// block writers and a writer does not block readers. Reading and writing can proceed
	// concurrently."
	// Caveats:
	//   * Might be unsupported by OSes other than Windows and UNIXes.
	//   * Does not work over a network filesystem.
	//   * Transactions that involve changes against multiple ATTACHed databases are not atomic
	//     across all databases as a set.
	// See: https://www.sqlite.org/wal.html
	//
	// Force SQLite to use disk, instead of memory, for all temporary files to reduce the memory
	// footprint.
	//
	// Enable foreign key constraints in SQLite which are crucial to prevent programmer errors on
	// our side.
	_, err := database.Exec(
		`PRAGMA journal_mode=WAL;
		PRAGMA temp_store=1;
		PRAGMA foreign_keys=ON;`,
	)
	if err != nil {
		return err
	}

	_, err = database.Exec(
		`CREATE TABLE IF NOT EXISTS torrents (
			id             INTEGER PRIMARY KEY,
			info_hash      BLOB NOT NULL UNIQUE,
			name           TEXT NOT NULL,
			total_size     INTEGER NOT NULL CHECK(total_size > 0),
			discovered_on  INTEGER NOT NULL CHECK(discovered_on > 0)
		);

		CREATE INDEX IF NOT EXISTS info_hash_index ON torrents (info_hash);

		CREATE TABLE IF NOT EXISTS files (
			id          INTEGER PRIMARY KEY,
			torrent_id  INTEGER REFERENCES torrents ON DELETE CASCADE ON UPDATE RESTRICT,
			size        INTEGER NOT NULL,
			path        TEXT NOT NULL
		);`,
	)
	if err != nil {
		return err
	}

	return nil
}

26
src/magneticow/Gopkg.toml Normal file
View File

@ -0,0 +1,26 @@
# Gopkg.toml example
#
# Refer to https://github.com/golang/dep/blob/master/docs/Gopkg.toml.md
# for detailed Gopkg.toml documentation.
#
# required = ["github.com/user/thing/cmd/thing"]
# ignored = ["github.com/user/project/pkgX", "bitbucket.org/user/project/pkgA/pkgY"]
#
# [[constraint]]
# name = "github.com/user/project"
# version = "1.0.0"
#
# [[constraint]]
# name = "github.com/user/project2"
# branch = "dev"
# source = "github.com/myfork/project2"
#
# [[override]]
# name = "github.com/x/y"
# version = "2.4.0"
[[constraint]]
name = "github.com/gorilla/mux"
version = "1.4.0"

63
src/magneticow/main.go Normal file
View File

@ -0,0 +1,63 @@
// magneticow - Lightweight web interface for magnetico.
// Copyright (C) 2017 Mert Bora ALPER <bora@boramalper.org>
// Dedicated to Cemile Binay, in whose hands I thrived.
//
// This program is free software: you can redistribute it and/or modify it under the terms of the
// GNU General Public License as published by the Free Software Foundation, either version 3 of the
// License, or (at your option) any later version.
//
// This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without
// even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
// General Public License for more details.
//
// You should have received a copy of the GNU General Public License along with this program. If
// not, see <http://www.gnu.org/licenses/>.
package main

import (
	"log"
	"net/http"

	"github.com/gorilla/mux"
)

func main() {
	router := mux.NewRouter()
	router.HandleFunc("/", rootHandler)
	router.HandleFunc("/torrents", torrentsHandler)
	router.HandleFunc("/torrents/{infohash}", torrentsInfohashHandler)
	router.HandleFunc("/torrents/{infohash}/{name}", torrentsInfohashNameHandler)
	router.HandleFunc("/statistics", statisticsHandler)
	router.HandleFunc("/feed", feedHandler)

	// ListenAndServe returns only on failure, so do not discard its error.
	log.Fatal(http.ListenAndServe(":8080", router))
}

// The handlers below are empty placeholders for now.

func rootHandler(w http.ResponseWriter, r *http.Request) {
}

func torrentsHandler(w http.ResponseWriter, r *http.Request) {
}

func torrentsInfohashHandler(w http.ResponseWriter, r *http.Request) {
}

func torrentsInfohashNameHandler(w http.ResponseWriter, r *http.Request) {
}

func statisticsHandler(w http.ResponseWriter, r *http.Request) {
}

func feedHandler(w http.ResponseWriter, r *http.Request) {
}