started working on BEP 51: DHT Infohash Indexing

This commit is contained in:
Bora M. Alper 2019-01-05 21:35:13 +03:00
parent 10ba415c9a
commit b0c9198f8d
No known key found for this signature in database
GPG Key ID: 8F1A9504E1BD114D
7 changed files with 227 additions and 27 deletions

View File

@ -7,7 +7,7 @@ import (
"go.uber.org/zap" "go.uber.org/zap"
"github.com/boramalper/magnetico/cmd/magneticod/dht/mainline" "github.com/boramalper/magnetico/cmd/magneticod/dht"
"github.com/boramalper/magnetico/pkg/persistence" "github.com/boramalper/magnetico/pkg/persistence"
"github.com/boramalper/magnetico/pkg/util" "github.com/boramalper/magnetico/pkg/util"
) )
@ -81,7 +81,10 @@ func NewSink(deadline time.Duration, maxNLeeches int) *Sink {
return ms return ms
} }
func (ms *Sink) Sink(res mainline.TrawlingResult) { func (ms *Sink) Sink(res dht.Result) {
infoHash := res.InfoHash()
peerAddr := res.PeerAddr()
if ms.terminated { if ms.terminated {
zap.L().Panic("Trying to Sink() an already closed Sink!") zap.L().Panic("Trying to Sink() an already closed Sink!")
} }
@ -93,7 +96,7 @@ func (ms *Sink) Sink(res mainline.TrawlingResult) {
return return
} }
if _, exists := ms.incomingInfoHashes[res.InfoHash]; exists { if _, exists := ms.incomingInfoHashes[infoHash]; exists {
return return
} }
// BEWARE! // BEWARE!
@ -102,14 +105,14 @@ func (ms *Sink) Sink(res mainline.TrawlingResult) {
// check whether res.infoHash exists in the ms.incomingInfoHashes, and where we add the infoHash // check whether res.infoHash exists in the ms.incomingInfoHashes, and where we add the infoHash
// to the incomingInfoHashes at the end of this function. // to the incomingInfoHashes at the end of this function.
zap.L().Info("Sunk!", zap.Int("leeches", len(ms.incomingInfoHashes)), util.HexField("infoHash", res.InfoHash[:])) zap.L().Info("Sunk!", zap.Int("leeches", len(ms.incomingInfoHashes)), util.HexField("infoHash", infoHash[:]))
go NewLeech(res.InfoHash, res.PeerAddr, ms.PeerID, LeechEventHandlers{ go NewLeech(infoHash, peerAddr, ms.PeerID, LeechEventHandlers{
OnSuccess: ms.flush, OnSuccess: ms.flush,
OnError: ms.onLeechError, OnError: ms.onLeechError,
}).Do(time.Now().Add(ms.deadline)) }).Do(time.Now().Add(ms.deadline))
ms.incomingInfoHashes[res.InfoHash] = struct{}{} ms.incomingInfoHashes[infoHash] = struct{}{}
} }
func (ms *Sink) Drain() <-chan Metadata { func (ms *Sink) Drain() <-chan Metadata {

View File

@ -20,7 +20,12 @@ import (
) )
type Message struct { type Message struct {
// Query method (one of 4: "ping", "find_node", "get_peers", "announce_peer") // Query method. One of 5:
// - "ping"
// - "find_node"
// - "get_peers"
// - "announce_peer"
// - "sample_infohashes" (added by BEP 51)
Q string `bencode:"q,omitempty"` Q string `bencode:"q,omitempty"`
// named QueryArguments sent with a query // named QueryArguments sent with a query
A QueryArguments `bencode:"a,omitempty"` A QueryArguments `bencode:"a,omitempty"`
@ -74,6 +79,13 @@ type ResponseValues struct {
// Torrent peers // Torrent peers
Values []CompactPeer `bencode:"values,omitempty"` Values []CompactPeer `bencode:"values,omitempty"`
// The subset refresh interval in seconds. Added by BEP 51.
Interval int `bencode:"interval,omitempty"`
// Number of infohashes in storage. Added by BEP 51.
Num int `bencode:"num,omitempty"`
// Subset of stored infohashes, N × 20 bytes. Added by BEP 51.
Samples []byte `bencode:"samples,omitempty"`
// If `scrape` is set to 1 in the `get_peers` query then the responding node should add the // If `scrape` is set to 1 in the `get_peers` query then the responding node should add the
// below two fields to the "r" dictionary in the response: // below two fields to the "r" dictionary in the response:
// Defined in BEP 33 "DHT Scrapes" for responses to `get_peers` queries. // Defined in BEP 33 "DHT Scrapes" for responses to `get_peers` queries.

View File

@ -0,0 +1,111 @@
package mainline
import (
"net"
"sync"
"time"
)
// IndexingService follows up BEP 51 `sample_infohashes` responses: for every sampled infohash it
// sends a `get_peers` query to the responding node, and for every peer listed in the matching
// `get_peers` response it calls eventHandlers.OnResult.
type IndexingService struct {
// Private
// protocol is created by NewIndexingService and is used to send the get_peers queries.
protocol *Protocol
// NOTE(review): started is neither read nor written in the code visible here; presumably it is
// maintained by Start/Terminate methods that are yet to be written. Confirm.
started bool
// interval is stored by NewIndexingService. NOTE(review): nothing visible here consumes it yet.
interval time.Duration
// eventHandlers holds the caller-supplied callbacks (see OnResult).
eventHandlers IndexingServiceEventHandlers
// nodeID is handed to NewGetPeersQuery as its first argument (presumably the ID we announce
// ourselves with; confirm). NewIndexingService allocates it as 20 zero bytes.
nodeID []byte
// []byte type would be a much better fit for the keys but unfortunately (and quite
// understandably) slices cannot be used as keys (since they are not hashable), and using arrays
// (or even the conversion between each other) is a pain; hence the string keys in
// map[string]*net.UDPAddr.
routingTable map[string]*net.UDPAddr
// routingTableMutex is allocated alongside routingTable; presumably it guards it (no locking is
// visible in this part of the file; confirm).
routingTableMutex *sync.Mutex
// counter is the transaction ID of the next get_peers query; it is incremented after each query
// sent and, being a uint16, wraps around after 65535.
counter uint16
// getPeersRequests remembers which infohash each outstanding get_peers query asked for, so that
// the response (which echoes only the transaction ID) can be attributed to it.
getPeersRequests map[[2]byte][20]byte // GetPeersQuery.`t` -> infohash
}
// IndexingServiceEventHandlers groups the callbacks an IndexingService invokes.
type IndexingServiceEventHandlers struct {
// OnResult is called once for every peer listed in the `values` of a get_peers response that the
// service handles. It is called without a nil check, so it must be set.
OnResult func(IndexingResult)
}
// IndexingResult pairs an infohash with the address of one peer reported for
// it. The fields are unexported; read them through InfoHash and PeerAddr.
type IndexingResult struct {
	infoHash [20]byte
	peerAddr *net.TCPAddr
}

// InfoHash returns a copy of the 20-byte infohash carried by the result.
func (res IndexingResult) InfoHash() [20]byte { return res.infoHash }

// PeerAddr returns the peer address carried by the result (the stored pointer,
// not a copy of the address).
func (res IndexingResult) PeerAddr() *net.TCPAddr { return res.peerAddr }
// NewIndexingService builds an IndexingService bound to laddr.
//
// The underlying Protocol is created with the service's own handlers for
// get_peers and sample_infohashes responses. interval and eventHandlers are
// stored as given. The returned service is fully allocated but not started.
func NewIndexingService(laddr string, interval time.Duration, eventHandlers IndexingServiceEventHandlers) *IndexingService {
	service := new(IndexingService)
	service.interval = interval
	service.protocol = NewProtocol(
		laddr,
		ProtocolEventHandlers{
			OnGetPeersResponse:         service.onGetPeersResponse,
			OnSampleInfohashesResponse: service.onSampleInfohashesResponse,
		},
	)
	// The node ID is left as 20 zero bytes here.
	service.nodeID = make([]byte, 20)
	service.routingTable = make(map[string]*net.UDPAddr)
	service.routingTableMutex = new(sync.Mutex)
	// Must be allocated up front: onSampleInfohashesResponse stores into this
	// map, and assigning to an entry of a nil map panics at run time.
	service.getPeersRequests = make(map[[2]byte][20]byte)
	service.eventHandlers = eventHandlers

	return service
}
// onGetPeersResponse handles a get_peers response received from addr.
//
// The response is matched to the query that caused it through its transaction
// ID (`t`). Responses whose transaction ID is not exactly two bytes long, or
// does not belong to an outstanding query (unsolicited, duplicate or stale
// responses), are dropped; otherwise they would be attributed to the all-zero
// infohash. For every peer in `values`, OnResult is invoked with the infohash
// of the original query.
func (is *IndexingService) onGetPeersResponse(msg *Message, addr *net.UDPAddr) {
	// We only ever hand out two-byte transaction IDs (see uint16BE), so anything
	// else cannot be an answer to one of our queries.
	if len(msg.T) != 2 {
		return
	}
	var t [2]byte
	copy(t[:], msg.T)

	infoHash, pending := is.getPeersRequests[t]
	if !pending {
		return
	}
	// We got a response, so free the key!
	delete(is.getPeersRequests, t)

	// BEP 51 specifies that
	//     The new sample_infohashes remote procedure call requests that a remote node return a string of multiple
	//     concatenated infohashes (20 bytes each) FOR WHICH IT HOLDS GET_PEERS VALUES.
	// So theoretically we should never hit the case where `values` is empty, but c'est la vie.
	if len(msg.R.Values) == 0 {
		return
	}

	for _, peer := range msg.R.Values {
		is.eventHandlers.OnResult(IndexingResult{
			infoHash: infoHash,
			peerAddr: &net.TCPAddr{
				IP:   peer.IP,
				Port: peer.Port,
			},
		})
	}
}
// onSampleInfohashesResponse handles a sample_infohashes response received
// from addr.
//
// `samples` is a concatenation of 20-byte infohashes. For each complete
// infohash a get_peers query is sent back to addr, and the query's transaction
// ID is recorded in getPeersRequests so that onGetPeersResponse can tell which
// infohash the eventual response is about. Trailing bytes that do not form a
// complete infohash are ignored.
func (is *IndexingService) onSampleInfohashesResponse(msg *Message, addr *net.UDPAddr) {
	samples := msg.R.Samples
	for off := 0; off+20 <= len(samples); off += 20 {
		// The n-th sample occupies bytes [n*20, (n+1)*20); starting the slice at
		// n instead of n*20 would read every sample but the first from the
		// wrong position.
		var infoHash [20]byte
		copy(infoHash[:], samples[off:off+20])

		query := NewGetPeersQuery(is.nodeID, infoHash[:])
		t := uint16BE(is.counter)
		query.T = t[:]
		is.protocol.SendMessage(query, addr)

		is.getPeersRequests[t] = infoHash
		// counter is a uint16 and wraps around; a wrapped ID overwrites whatever
		// request was still recorded under it.
		is.counter++
	}
}
// uint16BE encodes v as two bytes in big-endian order: the most significant
// byte first, the least significant byte second.
func uint16BE(v uint16) (b [2]byte) {
	return [2]byte{byte(v >> 8), byte(v & 0xFF)}
}

View File

@ -26,7 +26,12 @@ type ProtocolEventHandlers struct {
OnGetPeersResponse func(*Message, *net.UDPAddr) OnGetPeersResponse func(*Message, *net.UDPAddr)
OnFindNodeResponse func(*Message, *net.UDPAddr) OnFindNodeResponse func(*Message, *net.UDPAddr)
OnPingORAnnouncePeerResponse func(*Message, *net.UDPAddr) OnPingORAnnouncePeerResponse func(*Message, *net.UDPAddr)
OnCongestion func()
// Added by BEP 51
OnSampleInfohashesQuery func(*Message, *net.UDPAddr)
OnSampleInfohashesResponse func(*Message, *net.UDPAddr)
OnCongestion func()
} }
func NewProtocol(laddr string, eventHandlers ProtocolEventHandlers) (p *Protocol) { func NewProtocol(laddr string, eventHandlers ProtocolEventHandlers) (p *Protocol) {
@ -46,7 +51,7 @@ func NewProtocol(laddr string, eventHandlers ProtocolEventHandlers) (p *Protocol
func (p *Protocol) Start() { func (p *Protocol) Start() {
if p.started { if p.started {
zap.L().Panic("Attempting to Start() a mainline/Transport that has been already started! (Programmer error.)") zap.L().Panic("Attempting to Start() a mainline/Protocol that has been already started! (Programmer error.)")
} }
p.started = true p.started = true
@ -55,6 +60,10 @@ func (p *Protocol) Start() {
} }
func (p *Protocol) Terminate() { func (p *Protocol) Terminate() {
if !p.started {
zap.L().Panic("Attempted to Terminate() a mainline/Protocol that has not been Start()ed! (Programmer error.)")
}
p.transport.Terminate() p.transport.Terminate()
} }
@ -103,13 +112,42 @@ func (p *Protocol) onMessage(msg *Message, addr *net.UDPAddr) {
case "vote": case "vote":
// Although we are aware that such method exists, we ignore. // Although we are aware that such method exists, we ignore.
case "sample_infohashes": // Added by BEP 51
if !validateSampleInfohashesQueryMessage(msg) {
// zap.L().Debug("An invalid sample_infohashes query received!")
return
}
if p.eventHandlers.OnSampleInfohashesQuery != nil {
p.eventHandlers.OnSampleInfohashesQuery(msg, addr)
}
default: default:
// zap.L().Debug("A KRPC query of an unknown method received!", zap.String("method", msg.Q)) // zap.L().Debug("A KRPC query of an unknown method received!", zap.String("method", msg.Q))
return return
} }
case "r": case "r":
// get_peers > find_node > ping / announce_peer // Query messages have a `q` field which indicates their type but response messages have no such field that we
if len(msg.R.Token) != 0 { // The message should be a get_peers response. // can rely on.
// The idea is you'd use transaction ID (the `t` key) to deduce the type of a response message, as it must be
// sent in response to a query message (with the same transaction ID) that we have sent earlier.
// This approach is, unfortunately, not very practical for our needs since we send up to thousands of messages per
// second, meaning that we'd run out of transaction IDs very quickly (since some [many?] clients assume
// transaction IDs are no longer than 2 bytes), and we'd also then have to consider retention too (as we might
// not get a response at all).
// Our approach uses ad-hoc pattern matching: all response messages share a subset of fields (such as `t` and
// `y`), but each type has a field that only it contains (for example, `token` is unique to `get_peers`
// responses and `samples` is unique to `sample_infohashes` responses).
//
// sample_infohashes > get_peers > find_node > ping / announce_peer
if len(msg.R.Samples) != 0 { // The message should be a sample_infohashes response.
if !validateSampleInfohashesResponseMessage(msg) {
// zap.L().Debug("An invalid sample_infohashes response received!")
return
}
if p.eventHandlers.OnSampleInfohashesResponse != nil {
p.eventHandlers.OnSampleInfohashesResponse(msg, addr)
}
} else if len(msg.R.Token) != 0 { // The message should be a get_peers response.
if !validateGetPeersResponseMessage(msg) { if !validateGetPeersResponseMessage(msg) {
// zap.L().Debug("An invalid get_peers response received!") // zap.L().Debug("An invalid get_peers response received!")
return return
@ -259,6 +297,11 @@ func validateAnnouncePeerQueryMessage(msg *Message) bool {
len(msg.A.Token) > 0 len(msg.A.Token) > 0
} }
// validateSampleInfohashesQueryMessage reports whether msg carries well-formed
// sample_infohashes query arguments: both `id` and `target` must be exactly
// 20 bytes long.
func validateSampleInfohashesQueryMessage(msg *Message) bool {
	if len(msg.A.ID) != 20 {
		return false
	}
	return len(msg.A.Target) == 20
}
func validatePingORannouncePeerResponseMessage(msg *Message) bool { func validatePingORannouncePeerResponseMessage(msg *Message) bool {
return len(msg.R.ID) == 20 return len(msg.R.ID) == 20
} }
@ -279,3 +322,11 @@ func validateGetPeersResponseMessage(msg *Message) bool {
// TODO: check for values or nodes // TODO: check for values or nodes
} }
// validateSampleInfohashesResponseMessage reports whether msg carries
// well-formed sample_infohashes response values: a 20-byte `id`, non-negative
// `interval` and `num`, and a `samples` string whose length is a multiple of
// 20 bytes (an empty one included).
func validateSampleInfohashesResponseMessage(msg *Message) bool {
	switch {
	case len(msg.R.ID) != 20:
		return false
	case msg.R.Interval < 0:
		return false
	// TODO: check for nodes
	case msg.R.Num < 0:
		return false
	case len(msg.R.Samples)%20 != 0:
		return false
	}
	return true
}

View File

@ -9,11 +9,6 @@ import (
"go.uber.org/zap" "go.uber.org/zap"
) )
type TrawlingResult struct {
InfoHash [20]byte
PeerAddr *net.TCPAddr
}
type TrawlingService struct { type TrawlingService struct {
// Private // Private
protocol *Protocol protocol *Protocol
@ -35,6 +30,19 @@ type TrawlingServiceEventHandlers struct {
OnResult func(TrawlingResult) OnResult func(TrawlingResult)
} }
// TrawlingResult pairs an infohash with the address of the peer that announced
// it. The fields are unexported; read them through InfoHash and PeerAddr.
type TrawlingResult struct {
	infoHash [20]byte
	peerAddr *net.TCPAddr
}

// InfoHash returns a copy of the 20-byte infohash carried by the result.
func (res TrawlingResult) InfoHash() [20]byte { return res.infoHash }

// PeerAddr returns the peer address carried by the result (the stored pointer,
// not a copy of the address).
func (res TrawlingResult) PeerAddr() *net.TCPAddr { return res.peerAddr }
func NewTrawlingService(laddr string, initialMaxNeighbors uint, interval time.Duration, eventHandlers TrawlingServiceEventHandlers) *TrawlingService { func NewTrawlingService(laddr string, initialMaxNeighbors uint, interval time.Duration, eventHandlers TrawlingServiceEventHandlers) *TrawlingService {
service := new(TrawlingService) service := new(TrawlingService)
service.interval = interval service.interval = interval
@ -173,8 +181,8 @@ func (s *TrawlingService) onAnnouncePeerQuery(query *Message, addr *net.UDPAddr)
var infoHash [20]byte var infoHash [20]byte
copy(infoHash[:], query.A.InfoHash) copy(infoHash[:], query.A.InfoHash)
s.eventHandlers.OnResult(TrawlingResult{ s.eventHandlers.OnResult(TrawlingResult{
InfoHash: infoHash, infoHash: infoHash,
PeerAddr: &net.TCPAddr{ peerAddr: &net.TCPAddr{
IP: addr.IP, IP: addr.IP,
Port: peerPort, Port: peerPort,
}, },

View File

@ -1,19 +1,28 @@
package dht package dht
import ( import (
"github.com/boramalper/magnetico/cmd/magneticod/dht/mainline" "net"
"time" "time"
"go.uber.org/zap"
"github.com/boramalper/magnetico/cmd/magneticod/dht/mainline"
) )
type TrawlingManager struct { type TrawlingManager struct {
// private // private
output chan mainline.TrawlingResult output chan Result
services []*mainline.TrawlingService services []*mainline.TrawlingService
} }
// Result is what the manager's output channel carries: an infohash together with the address of a
// peer for it. It abstracts over the concrete result types of the individual services (for
// example mainline.TrawlingResult), which expose exactly these two accessors.
type Result interface {
// InfoHash returns the 20-byte infohash of the torrent.
InfoHash() [20]byte
// PeerAddr returns the TCP address of a peer for that torrent.
PeerAddr() *net.TCPAddr
}
func NewTrawlingManager(mlAddrs []string, interval time.Duration) *TrawlingManager { func NewTrawlingManager(mlAddrs []string, interval time.Duration) *TrawlingManager {
manager := new(TrawlingManager) manager := new(TrawlingManager)
manager.output = make(chan mainline.TrawlingResult) manager.output = make(chan Result, 20)
if mlAddrs == nil { if mlAddrs == nil {
mlAddrs = []string{"0.0.0.0:0"} mlAddrs = []string{"0.0.0.0:0"}
@ -24,7 +33,7 @@ func NewTrawlingManager(mlAddrs []string, interval time.Duration) *TrawlingManag
2000, 2000,
interval, interval,
mainline.TrawlingServiceEventHandlers{ mainline.TrawlingServiceEventHandlers{
OnResult: manager.onResult, OnResult: manager.onTrawlingResult,
}, },
)) ))
} }
@ -36,11 +45,15 @@ func NewTrawlingManager(mlAddrs []string, interval time.Duration) *TrawlingManag
return manager return manager
} }
func (m *TrawlingManager) onResult(res mainline.TrawlingResult) { func (m *TrawlingManager) onTrawlingResult(res mainline.TrawlingResult) {
m.output <- res select {
case m.output <- res:
default:
zap.L().Warn("DHT manager output ch is full, result dropped!")
}
} }
func (m *TrawlingManager) Output() <-chan mainline.TrawlingResult { func (m *TrawlingManager) Output() <-chan Result {
return m.output return m.output
} }

View File

@ -126,8 +126,10 @@ func main() {
for stopped := false; !stopped; { for stopped := false; !stopped; {
select { select {
case result := <-trawlingManager.Output(): case result := <-trawlingManager.Output():
zap.L().Debug("Trawled!", util.HexField("infoHash", result.InfoHash[:])) infoHash := result.InfoHash()
exists, err := database.DoesTorrentExist(result.InfoHash[:])
zap.L().Debug("Trawled!", util.HexField("infoHash", infoHash[:]))
exists, err := database.DoesTorrentExist(infoHash[:])
if err != nil { if err != nil {
zap.L().Fatal("Could not check whether torrent exists!", zap.Error(err)) zap.L().Fatal("Could not check whether torrent exists!", zap.Error(err))
} else if !exists { } else if !exists {