v0.8.0 featuring BEP 51 "DHT Infohash Indexing"
This commit is contained in:
parent
aae67090af
commit
1fdfa131aa
@ -234,10 +234,11 @@ func (l *Leech) readUmMessage() ([]byte, error) {
|
|||||||
func (l *Leech) connect(deadline time.Time) error {
|
func (l *Leech) connect(deadline time.Time) error {
|
||||||
var err error
|
var err error
|
||||||
|
|
||||||
l.conn, err = net.DialTCP("tcp4", nil, l.peerAddr)
|
x, err := net.DialTimeout("tcp4", l.peerAddr.String(), 1*time.Second)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return errors.Wrap(err, "dial")
|
return errors.Wrap(err, "dial")
|
||||||
}
|
}
|
||||||
|
l.conn = x.(*net.TCPConn)
|
||||||
|
|
||||||
// > If sec == 0, operating system discards any unsent or unacknowledged data [after Close()
|
// > If sec == 0, operating system discards any unsent or unacknowledged data [after Close()
|
||||||
// > has been called].
|
// > has been called].
|
||||||
@ -249,22 +250,6 @@ func (l *Leech) connect(deadline time.Time) error {
|
|||||||
return errors.Wrap(err, "SetLinger")
|
return errors.Wrap(err, "SetLinger")
|
||||||
}
|
}
|
||||||
|
|
||||||
err = l.conn.SetKeepAlive(true)
|
|
||||||
if err != nil {
|
|
||||||
if err := l.conn.Close(); err != nil {
|
|
||||||
zap.L().Panic("couldn't close leech connection!", zap.Error(err))
|
|
||||||
}
|
|
||||||
return errors.Wrap(err, "SetKeepAlive")
|
|
||||||
}
|
|
||||||
|
|
||||||
err = l.conn.SetKeepAlivePeriod(10 * time.Second)
|
|
||||||
if err != nil {
|
|
||||||
if err := l.conn.Close(); err != nil {
|
|
||||||
zap.L().Panic("couldn't close leech connection!", zap.Error(err))
|
|
||||||
}
|
|
||||||
return errors.Wrap(err, "SetKeepAlivePeriod")
|
|
||||||
}
|
|
||||||
|
|
||||||
err = l.conn.SetNoDelay(true)
|
err = l.conn.SetNoDelay(true)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if err := l.conn.Close(); err != nil {
|
if err := l.conn.Close(); err != nil {
|
||||||
|
@ -2,6 +2,7 @@ package metadata
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"math/rand"
|
"math/rand"
|
||||||
|
"net"
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
@ -24,14 +25,18 @@ type Metadata struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
type Sink struct {
|
type Sink struct {
|
||||||
PeerID []byte
|
PeerID []byte
|
||||||
deadline time.Duration
|
deadline time.Duration
|
||||||
maxNLeeches int
|
maxNLeeches int
|
||||||
drain chan Metadata
|
drain chan Metadata
|
||||||
incomingInfoHashes map[[20]byte]struct{}
|
|
||||||
|
incomingInfoHashes map[[20]byte][]net.TCPAddr
|
||||||
incomingInfoHashesMx sync.Mutex
|
incomingInfoHashesMx sync.Mutex
|
||||||
terminated bool
|
|
||||||
termination chan interface{}
|
terminated bool
|
||||||
|
termination chan interface{}
|
||||||
|
|
||||||
|
deleted int
|
||||||
}
|
}
|
||||||
|
|
||||||
func randomID() []byte {
|
func randomID() []byte {
|
||||||
@ -52,7 +57,7 @@ func randomID() []byte {
|
|||||||
* - Last two digits for the minor version number
|
* - Last two digits for the minor version number
|
||||||
* - Patch version number is not encoded.
|
* - Patch version number is not encoded.
|
||||||
*/
|
*/
|
||||||
prefix := []byte("-MC0007-")
|
prefix := []byte("-MC0008-")
|
||||||
|
|
||||||
var rando []byte
|
var rando []byte
|
||||||
for i := 20 - len(prefix); i >= 0; i-- {
|
for i := 20 - len(prefix); i >= 0; i-- {
|
||||||
@ -74,17 +79,28 @@ func NewSink(deadline time.Duration, maxNLeeches int) *Sink {
|
|||||||
ms.PeerID = randomID()
|
ms.PeerID = randomID()
|
||||||
ms.deadline = deadline
|
ms.deadline = deadline
|
||||||
ms.maxNLeeches = maxNLeeches
|
ms.maxNLeeches = maxNLeeches
|
||||||
ms.drain = make(chan Metadata)
|
ms.drain = make(chan Metadata, 10)
|
||||||
ms.incomingInfoHashes = make(map[[20]byte]struct{})
|
ms.incomingInfoHashes = make(map[[20]byte][]net.TCPAddr)
|
||||||
ms.termination = make(chan interface{})
|
ms.termination = make(chan interface{})
|
||||||
|
|
||||||
|
go func() {
|
||||||
|
for range time.Tick(deadline) {
|
||||||
|
ms.incomingInfoHashesMx.Lock()
|
||||||
|
l := len(ms.incomingInfoHashes)
|
||||||
|
ms.incomingInfoHashesMx.Unlock()
|
||||||
|
zap.L().Info("Sink status",
|
||||||
|
zap.Int("activeLeeches", l),
|
||||||
|
zap.Int("nDeleted", ms.deleted),
|
||||||
|
zap.Int("drainQueue", len(ms.drain)),
|
||||||
|
)
|
||||||
|
ms.deleted = 0
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
return ms
|
return ms
|
||||||
}
|
}
|
||||||
|
|
||||||
func (ms *Sink) Sink(res dht.Result) {
|
func (ms *Sink) Sink(res dht.Result) {
|
||||||
infoHash := res.InfoHash()
|
|
||||||
peerAddr := res.PeerAddr()
|
|
||||||
|
|
||||||
if ms.terminated {
|
if ms.terminated {
|
||||||
zap.L().Panic("Trying to Sink() an already closed Sink!")
|
zap.L().Panic("Trying to Sink() an already closed Sink!")
|
||||||
}
|
}
|
||||||
@ -96,23 +112,22 @@ func (ms *Sink) Sink(res dht.Result) {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
infoHash := res.InfoHash()
|
||||||
|
peerAddrs := res.PeerAddrs()
|
||||||
|
|
||||||
if _, exists := ms.incomingInfoHashes[infoHash]; exists {
|
if _, exists := ms.incomingInfoHashes[infoHash]; exists {
|
||||||
return
|
return
|
||||||
|
} else if len(peerAddrs) > 0 {
|
||||||
|
peer := peerAddrs[0]
|
||||||
|
ms.incomingInfoHashes[infoHash] = peerAddrs[1:]
|
||||||
|
|
||||||
|
go NewLeech(infoHash, &peer, ms.PeerID, LeechEventHandlers{
|
||||||
|
OnSuccess: ms.flush,
|
||||||
|
OnError: ms.onLeechError,
|
||||||
|
}).Do(time.Now().Add(ms.deadline))
|
||||||
}
|
}
|
||||||
// BEWARE!
|
|
||||||
// Although not crucial, the assumption is that Sink.Sink() will be called by only one
|
|
||||||
// goroutine (i.e. it's not thread-safe), lest there might be a race condition between where we
|
|
||||||
// check whether res.infoHash exists in the ms.incomingInfoHashes, and where we add the infoHash
|
|
||||||
// to the incomingInfoHashes at the end of this function.
|
|
||||||
|
|
||||||
zap.L().Debug("Sunk!", zap.Int("leeches", len(ms.incomingInfoHashes)), util.HexField("infoHash", infoHash[:]))
|
zap.L().Debug("Sunk!", zap.Int("leeches", len(ms.incomingInfoHashes)), util.HexField("infoHash", infoHash[:]))
|
||||||
|
|
||||||
go NewLeech(infoHash, peerAddr, ms.PeerID, LeechEventHandlers{
|
|
||||||
OnSuccess: ms.flush,
|
|
||||||
OnError: ms.onLeechError,
|
|
||||||
}).Do(time.Now().Add(ms.deadline))
|
|
||||||
|
|
||||||
ms.incomingInfoHashes[infoHash] = struct{}{}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (ms *Sink) Drain() <-chan Metadata {
|
func (ms *Sink) Drain() <-chan Metadata {
|
||||||
@ -136,17 +151,29 @@ func (ms *Sink) flush(result Metadata) {
|
|||||||
ms.drain <- result
|
ms.drain <- result
|
||||||
// Delete the infoHash from ms.incomingInfoHashes ONLY AFTER once we've flushed the
|
// Delete the infoHash from ms.incomingInfoHashes ONLY AFTER once we've flushed the
|
||||||
// metadata!
|
// metadata!
|
||||||
|
ms.incomingInfoHashesMx.Lock()
|
||||||
|
defer ms.incomingInfoHashesMx.Unlock()
|
||||||
|
|
||||||
var infoHash [20]byte
|
var infoHash [20]byte
|
||||||
copy(infoHash[:], result.InfoHash)
|
copy(infoHash[:], result.InfoHash)
|
||||||
ms.incomingInfoHashesMx.Lock()
|
|
||||||
delete(ms.incomingInfoHashes, infoHash)
|
delete(ms.incomingInfoHashes, infoHash)
|
||||||
ms.incomingInfoHashesMx.Unlock()
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (ms *Sink) onLeechError(infoHash [20]byte, err error) {
|
func (ms *Sink) onLeechError(infoHash [20]byte, err error) {
|
||||||
zap.L().Debug("leech error", util.HexField("infoHash", infoHash[:]), zap.Error(err))
|
zap.L().Debug("leech error", util.HexField("infoHash", infoHash[:]), zap.Error(err))
|
||||||
|
|
||||||
ms.incomingInfoHashesMx.Lock()
|
ms.incomingInfoHashesMx.Lock()
|
||||||
delete(ms.incomingInfoHashes, infoHash)
|
defer ms.incomingInfoHashesMx.Unlock()
|
||||||
ms.incomingInfoHashesMx.Unlock()
|
|
||||||
|
if len(ms.incomingInfoHashes[infoHash]) > 0 {
|
||||||
|
peer := ms.incomingInfoHashes[infoHash][0]
|
||||||
|
ms.incomingInfoHashes[infoHash] = ms.incomingInfoHashes[infoHash][1:]
|
||||||
|
go NewLeech(infoHash, &peer, ms.PeerID, LeechEventHandlers{
|
||||||
|
OnSuccess: ms.flush,
|
||||||
|
OnError: ms.onLeechError,
|
||||||
|
}).Do(time.Now().Add(ms.deadline))
|
||||||
|
} else {
|
||||||
|
ms.deleted++
|
||||||
|
delete(ms.incomingInfoHashes, infoHash)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -34,16 +34,16 @@ type IndexingServiceEventHandlers struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
type IndexingResult struct {
|
type IndexingResult struct {
|
||||||
infoHash [20]byte
|
infoHash [20]byte
|
||||||
peerAddr *net.TCPAddr
|
peerAddrs []net.TCPAddr
|
||||||
}
|
}
|
||||||
|
|
||||||
func (ir IndexingResult) InfoHash() [20]byte {
|
func (ir IndexingResult) InfoHash() [20]byte {
|
||||||
return ir.infoHash
|
return ir.infoHash
|
||||||
}
|
}
|
||||||
|
|
||||||
func (ir IndexingResult) PeerAddr() *net.TCPAddr {
|
func (ir IndexingResult) PeerAddrs() []net.TCPAddr {
|
||||||
return ir.peerAddr
|
return ir.peerAddrs
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewIndexingService(laddr string, interval time.Duration, eventHandlers IndexingServiceEventHandlers) *IndexingService {
|
func NewIndexingService(laddr string, interval time.Duration, eventHandlers IndexingServiceEventHandlers) *IndexingService {
|
||||||
@ -86,11 +86,6 @@ func (is *IndexingService) Terminate() {
|
|||||||
|
|
||||||
func (is *IndexingService) index() {
|
func (is *IndexingService) index() {
|
||||||
for range time.Tick(is.interval) {
|
for range time.Tick(is.interval) {
|
||||||
// TODO
|
|
||||||
// For some reason, we can't still detect congestion and this keeps increasing...
|
|
||||||
// Disable for now.
|
|
||||||
// s.maxNeighbors = uint(float32(s.maxNeighbors) * 1.001)
|
|
||||||
|
|
||||||
is.routingTableMutex.Lock()
|
is.routingTableMutex.Lock()
|
||||||
if len(is.routingTable) == 0 {
|
if len(is.routingTable) == 0 {
|
||||||
is.bootstrap()
|
is.bootstrap()
|
||||||
@ -190,15 +185,22 @@ func (is *IndexingService) onGetPeersResponse(msg *Message, addr *net.UDPAddr) {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
peerAddrs := make([]net.TCPAddr, 0)
|
||||||
for _, peer := range msg.R.Values {
|
for _, peer := range msg.R.Values {
|
||||||
is.eventHandlers.OnResult(IndexingResult{
|
if peer.Port == 0 {
|
||||||
infoHash: infoHash,
|
continue
|
||||||
peerAddr: &net.TCPAddr{
|
}
|
||||||
IP: peer.IP,
|
|
||||||
Port: peer.Port,
|
peerAddrs = append(peerAddrs, net.TCPAddr{
|
||||||
},
|
IP: peer.IP,
|
||||||
|
Port: peer.Port,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
is.eventHandlers.OnResult(IndexingResult{
|
||||||
|
infoHash: infoHash,
|
||||||
|
peerAddrs: peerAddrs,
|
||||||
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
func (is *IndexingService) onSampleInfohashesResponse(msg *Message, addr *net.UDPAddr) {
|
func (is *IndexingService) onSampleInfohashesResponse(msg *Message, addr *net.UDPAddr) {
|
||||||
|
@ -225,8 +225,8 @@ func NewSampleInfohashesQuery(id []byte, t []byte, target []byte) *Message {
|
|||||||
Y: "q",
|
Y: "q",
|
||||||
T: t,
|
T: t,
|
||||||
Q: "sample_infohashes",
|
Q: "sample_infohashes",
|
||||||
A: QueryArguments {
|
A: QueryArguments{
|
||||||
ID: id,
|
ID: id,
|
||||||
Target: target,
|
Target: target,
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
@ -1,5 +0,0 @@
|
|||||||
package mainline
|
|
||||||
|
|
||||||
type Service interface {
|
|
||||||
// TODO: develop a service interface to be used by the manager
|
|
||||||
}
|
|
@ -94,12 +94,11 @@ func (t *Transport) readMessages() {
|
|||||||
zap.L().Warn("READ CONGESTION!", zap.Error(err))
|
zap.L().Warn("READ CONGESTION!", zap.Error(err))
|
||||||
t.onCongestion()
|
t.onCongestion()
|
||||||
} else if err != nil {
|
} else if err != nil {
|
||||||
// TODO: isn't there a more reliable way to detect if UDPConn is closed?
|
|
||||||
zap.L().Warn("Could NOT read an UDP packet!", zap.Error(err))
|
zap.L().Warn("Could NOT read an UDP packet!", zap.Error(err))
|
||||||
}
|
}
|
||||||
|
|
||||||
if n == 0 {
|
if n == 0 {
|
||||||
/* Datagram sockets in various domains (e.g., the UNIX and Internet domains) permit
|
/* Datagram sockets in various domains (e.g., the UNIX and Internet domains) permit
|
||||||
* zero-length datagrams. When such a datagram is received, the return value (n) is 0.
|
* zero-length datagrams. When such a datagram is received, the return value (n) is 0.
|
||||||
*/
|
*/
|
||||||
continue
|
continue
|
||||||
@ -150,7 +149,7 @@ func (t *Transport) WriteMessages(msg *Message, addr *net.UDPAddr) {
|
|||||||
*
|
*
|
||||||
* Source: https://docs.python.org/3/library/asyncio-protocol.html#flow-control-callbacks
|
* Source: https://docs.python.org/3/library/asyncio-protocol.html#flow-control-callbacks
|
||||||
*/
|
*/
|
||||||
//zap.L().Warn("WRITE CONGESTION!", zap.Error(err))
|
zap.L().Warn("WRITE CONGESTION!", zap.Error(err))
|
||||||
if t.onCongestion != nil {
|
if t.onCongestion != nil {
|
||||||
t.onCongestion()
|
t.onCongestion()
|
||||||
}
|
}
|
||||||
|
@ -1,228 +0,0 @@
|
|||||||
package mainline
|
|
||||||
|
|
||||||
import (
|
|
||||||
"math/rand"
|
|
||||||
"net"
|
|
||||||
"sync"
|
|
||||||
"time"
|
|
||||||
|
|
||||||
"go.uber.org/zap"
|
|
||||||
)
|
|
||||||
|
|
||||||
type TrawlingService struct {
|
|
||||||
// Private
|
|
||||||
protocol *Protocol
|
|
||||||
started bool
|
|
||||||
interval time.Duration
|
|
||||||
eventHandlers TrawlingServiceEventHandlers
|
|
||||||
|
|
||||||
trueNodeID []byte
|
|
||||||
// []byte type would be a much better fit for the keys but unfortunately (and quite
|
|
||||||
// understandably) slices cannot be used as keys (since they are not hashable), and using arrays
|
|
||||||
// (or even the conversion between each other) is a pain; hence map[string]net.UDPAddr
|
|
||||||
// ^~~~~~
|
|
||||||
routingTable map[string]*net.UDPAddr
|
|
||||||
routingTableMutex *sync.Mutex
|
|
||||||
maxNeighbors uint
|
|
||||||
}
|
|
||||||
|
|
||||||
type TrawlingServiceEventHandlers struct {
|
|
||||||
OnResult func(TrawlingResult)
|
|
||||||
}
|
|
||||||
|
|
||||||
type TrawlingResult struct {
|
|
||||||
infoHash [20]byte
|
|
||||||
peerAddr *net.TCPAddr
|
|
||||||
}
|
|
||||||
|
|
||||||
func (tr TrawlingResult) InfoHash() [20]byte {
|
|
||||||
return tr.infoHash
|
|
||||||
}
|
|
||||||
|
|
||||||
func (tr TrawlingResult) PeerAddr() *net.TCPAddr {
|
|
||||||
return tr.peerAddr
|
|
||||||
}
|
|
||||||
|
|
||||||
func NewTrawlingService(laddr string, initialMaxNeighbors uint, interval time.Duration, eventHandlers TrawlingServiceEventHandlers) *TrawlingService {
|
|
||||||
service := new(TrawlingService)
|
|
||||||
service.interval = interval
|
|
||||||
service.protocol = NewProtocol(
|
|
||||||
laddr,
|
|
||||||
ProtocolEventHandlers{
|
|
||||||
OnGetPeersQuery: service.onGetPeersQuery,
|
|
||||||
OnAnnouncePeerQuery: service.onAnnouncePeerQuery,
|
|
||||||
OnFindNodeResponse: service.onFindNodeResponse,
|
|
||||||
OnCongestion: service.onCongestion,
|
|
||||||
},
|
|
||||||
)
|
|
||||||
service.trueNodeID = make([]byte, 20)
|
|
||||||
service.routingTable = make(map[string]*net.UDPAddr)
|
|
||||||
service.routingTableMutex = new(sync.Mutex)
|
|
||||||
service.eventHandlers = eventHandlers
|
|
||||||
service.maxNeighbors = initialMaxNeighbors
|
|
||||||
|
|
||||||
_, err := rand.Read(service.trueNodeID)
|
|
||||||
if err != nil {
|
|
||||||
zap.L().Panic("Could NOT generate random bytes for node ID!")
|
|
||||||
}
|
|
||||||
|
|
||||||
return service
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *TrawlingService) Start() {
|
|
||||||
if s.started {
|
|
||||||
zap.L().Panic("Attempting to Start() a mainline/TrawlingService that has been already started! (Programmer error.)")
|
|
||||||
}
|
|
||||||
s.started = true
|
|
||||||
|
|
||||||
s.protocol.Start()
|
|
||||||
go s.trawl()
|
|
||||||
|
|
||||||
zap.L().Info("Trawling Service started!")
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *TrawlingService) Terminate() {
|
|
||||||
s.protocol.Terminate()
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *TrawlingService) trawl() {
|
|
||||||
for range time.Tick(s.interval) {
|
|
||||||
// TODO
|
|
||||||
// For some reason, we can't still detect congestion and this keeps increasing...
|
|
||||||
// Disable for now.
|
|
||||||
// s.maxNeighbors = uint(float32(s.maxNeighbors) * 1.001)
|
|
||||||
|
|
||||||
s.routingTableMutex.Lock()
|
|
||||||
if len(s.routingTable) == 0 {
|
|
||||||
s.bootstrap()
|
|
||||||
} else {
|
|
||||||
zap.L().Info("Latest status:", zap.Int("n", len(s.routingTable)),
|
|
||||||
zap.Uint("maxNeighbors", s.maxNeighbors))
|
|
||||||
s.findNeighbors()
|
|
||||||
s.routingTable = make(map[string]*net.UDPAddr)
|
|
||||||
}
|
|
||||||
s.routingTableMutex.Unlock()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *TrawlingService) bootstrap() {
|
|
||||||
bootstrappingNodes := []string{
|
|
||||||
"router.bittorrent.com:6881",
|
|
||||||
"dht.transmissionbt.com:6881",
|
|
||||||
"dht.libtorrent.org:25401",
|
|
||||||
}
|
|
||||||
zap.L().Info("Bootstrapping as routing table is empty...")
|
|
||||||
for _, node := range bootstrappingNodes {
|
|
||||||
target := make([]byte, 20)
|
|
||||||
_, err := rand.Read(target)
|
|
||||||
if err != nil {
|
|
||||||
zap.L().Panic("Could NOT generate random bytes during bootstrapping!")
|
|
||||||
}
|
|
||||||
|
|
||||||
addr, err := net.ResolveUDPAddr("udp", node)
|
|
||||||
if err != nil {
|
|
||||||
zap.L().Error("Could NOT resolve (UDP) address of the bootstrapping node!",
|
|
||||||
zap.String("node", node))
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
s.protocol.SendMessage(NewFindNodeQuery(s.trueNodeID, target), addr)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *TrawlingService) findNeighbors() {
|
|
||||||
target := make([]byte, 20)
|
|
||||||
for nodeID, addr := range s.routingTable {
|
|
||||||
_, err := rand.Read(target)
|
|
||||||
if err != nil {
|
|
||||||
zap.L().Panic("Could NOT generate random bytes during bootstrapping!")
|
|
||||||
}
|
|
||||||
|
|
||||||
s.protocol.SendMessage(
|
|
||||||
NewFindNodeQuery(append([]byte(nodeID[:15]), s.trueNodeID[:5]...), target),
|
|
||||||
addr,
|
|
||||||
)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *TrawlingService) onGetPeersQuery(query *Message, addr *net.UDPAddr) {
|
|
||||||
s.protocol.SendMessage(
|
|
||||||
NewGetPeersResponseWithNodes(
|
|
||||||
query.T,
|
|
||||||
append(query.A.ID[:15], s.trueNodeID[:5]...),
|
|
||||||
s.protocol.CalculateToken(net.ParseIP(addr.String()))[:],
|
|
||||||
[]CompactNodeInfo{},
|
|
||||||
),
|
|
||||||
addr,
|
|
||||||
)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *TrawlingService) onAnnouncePeerQuery(query *Message, addr *net.UDPAddr) {
|
|
||||||
/* BEP 5
|
|
||||||
*
|
|
||||||
* There is an optional argument called implied_port which value is either 0 or 1. If it is
|
|
||||||
* present and non-zero, the port argument should be ignored and the source port of the UDP
|
|
||||||
* packet should be used as the peer's port instead. This is useful for peers behind a NAT that
|
|
||||||
* may not know their external port, and supporting uTP, they accept incoming connections on the
|
|
||||||
* same port as the DHT port.
|
|
||||||
*/
|
|
||||||
var peerPort int
|
|
||||||
if query.A.ImpliedPort != 0 {
|
|
||||||
// TODO: Peer uses uTP, ignore for now
|
|
||||||
// return
|
|
||||||
peerPort = addr.Port
|
|
||||||
} else {
|
|
||||||
peerPort = query.A.Port
|
|
||||||
// return
|
|
||||||
}
|
|
||||||
|
|
||||||
// TODO: It looks ugly, am I doing it right? --Bora
|
|
||||||
// (Converting slices to arrays in Go shouldn't have been such a pain...)
|
|
||||||
var infoHash [20]byte
|
|
||||||
copy(infoHash[:], query.A.InfoHash)
|
|
||||||
s.eventHandlers.OnResult(TrawlingResult{
|
|
||||||
infoHash: infoHash,
|
|
||||||
peerAddr: &net.TCPAddr{
|
|
||||||
IP: addr.IP,
|
|
||||||
Port: peerPort,
|
|
||||||
},
|
|
||||||
})
|
|
||||||
|
|
||||||
s.protocol.SendMessage(
|
|
||||||
NewAnnouncePeerResponse(
|
|
||||||
query.T,
|
|
||||||
append(query.A.ID[:15], s.trueNodeID[:5]...),
|
|
||||||
),
|
|
||||||
addr,
|
|
||||||
)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *TrawlingService) onFindNodeResponse(response *Message, addr *net.UDPAddr) {
|
|
||||||
s.routingTableMutex.Lock()
|
|
||||||
defer s.routingTableMutex.Unlock()
|
|
||||||
|
|
||||||
for _, node := range response.R.Nodes {
|
|
||||||
if uint(len(s.routingTable)) >= s.maxNeighbors {
|
|
||||||
break
|
|
||||||
}
|
|
||||||
if node.Addr.Port == 0 { // Ignore nodes who "use" port 0.
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
s.routingTable[string(node.ID)] = &node.Addr
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *TrawlingService) onCongestion() {
|
|
||||||
/* The Congestion Prevention Strategy:
|
|
||||||
*
|
|
||||||
* In case of congestion, decrease the maximum number of nodes to the 90% of the current value.
|
|
||||||
*/
|
|
||||||
if s.maxNeighbors < 200 {
|
|
||||||
zap.L().Warn("Max. number of neighbours are < 200 and there is still congestion!" +
|
|
||||||
"(check your network connection if this message recurs)")
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
s.maxNeighbors = uint(float32(s.maxNeighbors) * 0.9)
|
|
||||||
}
|
|
@ -9,39 +9,27 @@ import (
|
|||||||
"github.com/boramalper/magnetico/cmd/magneticod/dht/mainline"
|
"github.com/boramalper/magnetico/cmd/magneticod/dht/mainline"
|
||||||
)
|
)
|
||||||
|
|
||||||
type TrawlingManager struct {
|
type Service interface {
|
||||||
// private
|
Start()
|
||||||
output chan Result
|
Terminate()
|
||||||
trawlingServices []*mainline.TrawlingService
|
|
||||||
indexingServices []*mainline.IndexingService
|
|
||||||
}
|
}
|
||||||
|
|
||||||
type Result interface {
|
type Result interface {
|
||||||
InfoHash() [20]byte
|
InfoHash() [20]byte
|
||||||
PeerAddr() *net.TCPAddr
|
PeerAddrs() []net.TCPAddr
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewTrawlingManager(tsAddrs []string, isAddrs []string, interval time.Duration) *TrawlingManager {
|
type Manager struct {
|
||||||
manager := new(TrawlingManager)
|
output chan Result
|
||||||
|
indexingServices []Service
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewTrawlingManager(addrs []string, interval time.Duration) *Manager {
|
||||||
|
manager := new(Manager)
|
||||||
manager.output = make(chan Result, 20)
|
manager.output = make(chan Result, 20)
|
||||||
|
|
||||||
// Trawling Services
|
for _, addr := range addrs {
|
||||||
for _, addr := range tsAddrs {
|
service := mainline.NewIndexingService(addr, 2*time.Second, mainline.IndexingServiceEventHandlers{
|
||||||
service := mainline.NewTrawlingService(
|
|
||||||
addr,
|
|
||||||
2000,
|
|
||||||
interval,
|
|
||||||
mainline.TrawlingServiceEventHandlers{
|
|
||||||
OnResult: manager.onTrawlingResult,
|
|
||||||
},
|
|
||||||
)
|
|
||||||
manager.trawlingServices = append(manager.trawlingServices, service)
|
|
||||||
service.Start()
|
|
||||||
}
|
|
||||||
|
|
||||||
// Indexing Services
|
|
||||||
for _, addr := range isAddrs {
|
|
||||||
service := mainline.NewIndexingService(addr, 2 * time.Second, mainline.IndexingServiceEventHandlers{
|
|
||||||
OnResult: manager.onIndexingResult,
|
OnResult: manager.onIndexingResult,
|
||||||
})
|
})
|
||||||
manager.indexingServices = append(manager.indexingServices, service)
|
manager.indexingServices = append(manager.indexingServices, service)
|
||||||
@ -51,30 +39,20 @@ func NewTrawlingManager(tsAddrs []string, isAddrs []string, interval time.Durati
|
|||||||
return manager
|
return manager
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *TrawlingManager) onTrawlingResult(res mainline.TrawlingResult) {
|
func (m *Manager) onIndexingResult(res mainline.IndexingResult) {
|
||||||
select {
|
select {
|
||||||
case m.output <- res:
|
case m.output <- res:
|
||||||
default:
|
default:
|
||||||
// TODO: should be a warn
|
|
||||||
zap.L().Debug("DHT manager output ch is full, result dropped!")
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (m *TrawlingManager) onIndexingResult(res mainline.IndexingResult) {
|
|
||||||
select {
|
|
||||||
case m.output <- res:
|
|
||||||
default:
|
|
||||||
// TODO: should be a warn
|
|
||||||
zap.L().Debug("DHT manager output ch is full, idx result dropped!")
|
zap.L().Debug("DHT manager output ch is full, idx result dropped!")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *TrawlingManager) Output() <-chan Result {
|
func (m *Manager) Output() <-chan Result {
|
||||||
return m.output
|
return m.output
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *TrawlingManager) Terminate() {
|
func (m *Manager) Terminate() {
|
||||||
for _, service := range m.trawlingServices {
|
for _, service := range m.indexingServices {
|
||||||
service.Terminate()
|
service.Terminate()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -5,6 +5,7 @@ import (
|
|||||||
"net"
|
"net"
|
||||||
"os"
|
"os"
|
||||||
"os/signal"
|
"os/signal"
|
||||||
|
"runtime"
|
||||||
"runtime/pprof"
|
"runtime/pprof"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
@ -27,8 +28,8 @@ import (
|
|||||||
type opFlags struct {
|
type opFlags struct {
|
||||||
DatabaseURL string
|
DatabaseURL string
|
||||||
|
|
||||||
TrawlerMlAddrs []string
|
IndexerAddrs []string
|
||||||
TrawlerMlInterval time.Duration
|
IndexerInterval time.Duration
|
||||||
|
|
||||||
LeechMaxN int
|
LeechMaxN int
|
||||||
|
|
||||||
@ -46,11 +47,7 @@ func main() {
|
|||||||
zapcore.Lock(os.Stderr),
|
zapcore.Lock(os.Stderr),
|
||||||
loggerLevel,
|
loggerLevel,
|
||||||
))
|
))
|
||||||
defer func() {
|
defer logger.Sync()
|
||||||
if err := logger.Sync(); err != nil {
|
|
||||||
panic(err)
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
zap.ReplaceGlobals(logger)
|
zap.ReplaceGlobals(logger)
|
||||||
|
|
||||||
// opFlags is the "operational flags"
|
// opFlags is the "operational flags"
|
||||||
@ -60,8 +57,8 @@ func main() {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
zap.L().Info("magneticod v0.7.2 has been started.")
|
zap.L().Info("magneticod v0.8.0 has been started.")
|
||||||
zap.L().Info("Copyright (C) 2018 Mert Bora ALPER <bora@boramalper.org>.")
|
zap.L().Info("Copyright (C) 2017-2019 Mert Bora ALPER <bora@boramalper.org>.")
|
||||||
zap.L().Info("Dedicated to Cemile Binay, in whose hands I thrived.")
|
zap.L().Info("Dedicated to Cemile Binay, in whose hands I thrived.")
|
||||||
zap.S().Infof("Compiled on %s", compiledOn)
|
zap.S().Infof("Compiled on %s", compiledOn)
|
||||||
|
|
||||||
@ -77,8 +74,7 @@ func main() {
|
|||||||
|
|
||||||
zap.ReplaceGlobals(logger)
|
zap.ReplaceGlobals(logger)
|
||||||
|
|
||||||
switch opFlags.Profile {
|
if opFlags.Profile == "cpu" {
|
||||||
case "cpu":
|
|
||||||
file, err := os.OpenFile("magneticod_cpu.prof", os.O_CREATE|os.O_WRONLY, 0755)
|
file, err := os.OpenFile("magneticod_cpu.prof", os.O_CREATE|os.O_WRONLY, 0755)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
zap.L().Panic("Could not open the cpu profile file!", zap.Error(err))
|
zap.L().Panic("Could not open the cpu profile file!", zap.Error(err))
|
||||||
@ -97,12 +93,6 @@ func main() {
|
|||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
defer pprof.StopCPUProfile()
|
defer pprof.StopCPUProfile()
|
||||||
|
|
||||||
case "memory":
|
|
||||||
zap.L().Panic("NOT IMPLEMENTED")
|
|
||||||
|
|
||||||
case "trace":
|
|
||||||
zap.L().Panic("NOT IMPLEMENTED")
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Initialise the random number generator
|
// Initialise the random number generator
|
||||||
@ -117,8 +107,8 @@ func main() {
|
|||||||
logger.Sugar().Fatalf("Could not open the database at `%s`", opFlags.DatabaseURL, zap.Error(err))
|
logger.Sugar().Fatalf("Could not open the database at `%s`", opFlags.DatabaseURL, zap.Error(err))
|
||||||
}
|
}
|
||||||
|
|
||||||
trawlingManager := dht.NewTrawlingManager(nil, []string{"0.0.0.0:0"}, opFlags.TrawlerMlInterval)
|
trawlingManager := dht.NewTrawlingManager(opFlags.IndexerAddrs, opFlags.IndexerInterval)
|
||||||
metadataSink := metadata.NewSink(2*time.Minute, opFlags.LeechMaxN)
|
metadataSink := metadata.NewSink(5*time.Second, opFlags.LeechMaxN)
|
||||||
|
|
||||||
zap.L().Debug("Peer ID", zap.ByteString("peerID", metadataSink.PeerID))
|
zap.L().Debug("Peer ID", zap.ByteString("peerID", metadataSink.PeerID))
|
||||||
|
|
||||||
@ -144,6 +134,23 @@ func main() {
|
|||||||
zap.L().Info("Fetched!", zap.String("name", md.Name), util.HexField("infoHash", md.InfoHash))
|
zap.L().Info("Fetched!", zap.String("name", md.Name), util.HexField("infoHash", md.InfoHash))
|
||||||
|
|
||||||
case <-interruptChan:
|
case <-interruptChan:
|
||||||
|
if opFlags.Profile == "heap" {
|
||||||
|
file, err := os.OpenFile("magneticod_heap.prof", os.O_CREATE|os.O_WRONLY, 0755)
|
||||||
|
if err != nil {
|
||||||
|
zap.L().Panic("Could not open the memory profile file!", zap.Error(err))
|
||||||
|
}
|
||||||
|
runtime.GC() // get up-to-date statistics
|
||||||
|
if err = pprof.WriteHeapProfile(file); err != nil {
|
||||||
|
zap.L().Fatal("Could not write heap profile!", zap.Error(err))
|
||||||
|
}
|
||||||
|
if err = file.Sync(); err != nil {
|
||||||
|
zap.L().Fatal("Could not sync profiling file!", zap.Error(err))
|
||||||
|
}
|
||||||
|
if err = file.Close(); err != nil {
|
||||||
|
zap.L().Fatal("Could not close profiling file!", zap.Error(err))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
trawlingManager.Terminate()
|
trawlingManager.Terminate()
|
||||||
stopped = true
|
stopped = true
|
||||||
}
|
}
|
||||||
@ -158,13 +165,13 @@ func parseFlags() (*opFlags, error) {
|
|||||||
var cmdF struct {
|
var cmdF struct {
|
||||||
DatabaseURL string `long:"database" description:"URL of the database."`
|
DatabaseURL string `long:"database" description:"URL of the database."`
|
||||||
|
|
||||||
TrawlerMlAddrs []string `long:"trawler-ml-addr" description:"Address(es) to be used by trawling DHT (Mainline) nodes." default:"0.0.0.0:0"`
|
IndexerAddrs []string `long:"indexer-addr" description:"Address(es) to be used by indexing DHT nodes." default:"0.0.0.0:0"`
|
||||||
TrawlerMlInterval uint `long:"trawler-ml-interval" description:"Trawling interval in integer seconds."`
|
IndexerInterval uint `long:"indexer-interval" description:"Indexing interval in integer seconds."`
|
||||||
|
|
||||||
LeechMaxN uint `long:"leech-max-n" description:"Maximum number of leeches." default:"100"`
|
LeechMaxN uint `long:"leech-max-n" description:"Maximum number of leeches." default:"200"`
|
||||||
|
|
||||||
Verbose []bool `short:"v" long:"verbose" description:"Increases verbosity."`
|
Verbose []bool `short:"v" long:"verbose" description:"Increases verbosity."`
|
||||||
Profile string `long:"profile" description:"Enable profiling." choice:"cpu" choice:"memory" choice:"trace"`
|
Profile string `long:"profile" description:"Enable profiling." choice:"cpu" choice:"heap"`
|
||||||
}
|
}
|
||||||
|
|
||||||
opF := new(opFlags)
|
opF := new(opFlags)
|
||||||
@ -187,16 +194,16 @@ func parseFlags() (*opFlags, error) {
|
|||||||
opF.DatabaseURL = cmdF.DatabaseURL
|
opF.DatabaseURL = cmdF.DatabaseURL
|
||||||
}
|
}
|
||||||
|
|
||||||
if err = checkAddrs(cmdF.TrawlerMlAddrs); err != nil {
|
if err = checkAddrs(cmdF.IndexerAddrs); err != nil {
|
||||||
zap.S().Fatalf("Of argument (list) `trawler-ml-addr`", zap.Error(err))
|
zap.S().Fatalf("Of argument (list) `trawler-ml-addr`", zap.Error(err))
|
||||||
} else {
|
} else {
|
||||||
opF.TrawlerMlAddrs = cmdF.TrawlerMlAddrs
|
opF.IndexerAddrs = cmdF.IndexerAddrs
|
||||||
}
|
}
|
||||||
|
|
||||||
if cmdF.TrawlerMlInterval == 0 {
|
if cmdF.IndexerInterval == 0 {
|
||||||
opF.TrawlerMlInterval = 1 * time.Second
|
opF.IndexerInterval = 2 * time.Second
|
||||||
} else {
|
} else {
|
||||||
opF.TrawlerMlInterval = time.Duration(cmdF.TrawlerMlInterval) * time.Second
|
opF.IndexerInterval = time.Duration(cmdF.IndexerInterval) * time.Second
|
||||||
}
|
}
|
||||||
|
|
||||||
opF.LeechMaxN = int(cmdF.LeechMaxN)
|
opF.LeechMaxN = int(cmdF.LeechMaxN)
|
||||||
|
@ -62,8 +62,8 @@ func main() {
|
|||||||
defer logger.Sync()
|
defer logger.Sync()
|
||||||
zap.ReplaceGlobals(logger)
|
zap.ReplaceGlobals(logger)
|
||||||
|
|
||||||
zap.L().Info("magneticow v0.7.2 has been started.")
|
zap.L().Info("magneticow v0.8 has been started.")
|
||||||
zap.L().Info("Copyright (C) 2018 Mert Bora ALPER <bora@boramalper.org>.")
|
zap.L().Info("Copyright (C) 2017-2019 Mert Bora ALPER <bora@boramalper.org>.")
|
||||||
zap.L().Info("Dedicated to Cemile Binay, in whose hands I thrived.")
|
zap.L().Info("Dedicated to Cemile Binay, in whose hands I thrived.")
|
||||||
zap.S().Infof("Compiled on %s", compiledOn)
|
zap.S().Infof("Compiled on %s", compiledOn)
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user