WIP for BEP 51 (feature-complete, but still too buggy to enable)

This commit is contained in:
Bora M. Alper 2019-05-15 13:18:42 +01:00
parent 88df49cb40
commit aae67090af
No known key found for this signature in database
GPG Key ID: 8F1A9504E1BD114D
8 changed files with 191 additions and 23 deletions

View File

@ -105,7 +105,7 @@ func (ms *Sink) Sink(res dht.Result) {
// check whether res.infoHash exists in the ms.incomingInfoHashes, and where we add the infoHash // check whether res.infoHash exists in the ms.incomingInfoHashes, and where we add the infoHash
// to the incomingInfoHashes at the end of this function. // to the incomingInfoHashes at the end of this function.
zap.L().Info("Sunk!", zap.Int("leeches", len(ms.incomingInfoHashes)), util.HexField("infoHash", infoHash[:])) zap.L().Debug("Sunk!", zap.Int("leeches", len(ms.incomingInfoHashes)), util.HexField("infoHash", infoHash[:]))
go NewLeech(infoHash, peerAddr, ms.PeerID, LeechEventHandlers{ go NewLeech(infoHash, peerAddr, ms.PeerID, LeechEventHandlers{
OnSuccess: ms.flush, OnSuccess: ms.flush,

View File

@ -1,9 +1,12 @@
package mainline package mainline
import ( import (
"math/rand"
"net" "net"
"sync" "sync"
"time" "time"
"go.uber.org/zap"
) )
type IndexingService struct { type IndexingService struct {
@ -20,6 +23,7 @@ type IndexingService struct {
// ^~~~~~ // ^~~~~~
routingTable map[string]*net.UDPAddr routingTable map[string]*net.UDPAddr
routingTableMutex *sync.Mutex routingTableMutex *sync.Mutex
maxNeighbors uint
counter uint16 counter uint16
getPeersRequests map[[2]byte][20]byte // GetPeersQuery.`t` -> infohash getPeersRequests map[[2]byte][20]byte // GetPeersQuery.`t` -> infohash
@ -48,6 +52,7 @@ func NewIndexingService(laddr string, interval time.Duration, eventHandlers Inde
service.protocol = NewProtocol( service.protocol = NewProtocol(
laddr, laddr,
ProtocolEventHandlers{ ProtocolEventHandlers{
OnFindNodeResponse: service.onFindNodeResponse,
OnGetPeersResponse: service.onGetPeersResponse, OnGetPeersResponse: service.onGetPeersResponse,
OnSampleInfohashesResponse: service.onSampleInfohashesResponse, OnSampleInfohashesResponse: service.onSampleInfohashesResponse,
}, },
@ -55,11 +60,119 @@ func NewIndexingService(laddr string, interval time.Duration, eventHandlers Inde
service.nodeID = make([]byte, 20) service.nodeID = make([]byte, 20)
service.routingTable = make(map[string]*net.UDPAddr) service.routingTable = make(map[string]*net.UDPAddr)
service.routingTableMutex = new(sync.Mutex) service.routingTableMutex = new(sync.Mutex)
service.maxNeighbors = 50
service.eventHandlers = eventHandlers service.eventHandlers = eventHandlers
service.getPeersRequests = make(map[[2]byte][20]byte)
return service return service
} }
// Start activates the indexing service: it starts the underlying DHT
// protocol and launches the periodic indexing loop in its own goroutine.
// Starting a service twice is a programmer error and panics.
func (is *IndexingService) Start() {
	if is.started {
		zap.L().Panic("Attempting to Start() a mainline/IndexingService that has been already started! (Programmer error.)")
	}
	is.started = true

	is.protocol.Start()
	go is.index()

	zap.L().Info("Indexing Service started!")
}
// Terminate shuts the service down by terminating the underlying DHT
// protocol instance. It does not stop the index() goroutine explicitly.
func (is *IndexingService) Terminate() {
	is.protocol.Terminate()
}
// index is the periodic driver loop. On every tick it either bootstraps
// (when the routing table is empty) or queries the current neighbours for
// more nodes, and then clears the table so the next round starts from the
// freshly discovered peers.
func (is *IndexingService) index() {
	for range time.Tick(is.interval) {
		// TODO:
		// For some reason, we can't still detect congestion and this keeps increasing...
		// Disable for now.
		// is.maxNeighbors = uint(float32(is.maxNeighbors) * 1.001)

		is.routingTableMutex.Lock()
		routingTableSize := len(is.routingTable)
		if routingTableSize == 0 {
			is.bootstrap()
		} else {
			zap.L().Info("Latest status:", zap.Int("n", routingTableSize),
				zap.Uint("maxNeighbors", is.maxNeighbors))
			is.findNeighbors()
			// Reset the table; find_node responses will repopulate it.
			is.routingTable = make(map[string]*net.UDPAddr)
		}
		is.routingTableMutex.Unlock()
	}
}
// bootstrap seeds an empty routing table by sending find_node queries with
// random targets to a set of well-known bootstrap nodes. Unresolvable
// bootstrap addresses are logged and skipped.
func (is *IndexingService) bootstrap() {
	zap.L().Info("Bootstrapping as routing table is empty...")

	bootstrappingNodes := []string{
		"router.bittorrent.com:6881",
		"dht.transmissionbt.com:6881",
		"dht.libtorrent.org:25401",
	}
	for _, node := range bootstrappingNodes {
		target := make([]byte, 20)
		if _, err := rand.Read(target); err != nil {
			zap.L().Panic("Could NOT generate random bytes during bootstrapping!")
		}

		addr, err := net.ResolveUDPAddr("udp", node)
		if err != nil {
			zap.L().Error("Could NOT resolve (UDP) address of the bootstrapping node!",
				zap.String("node", node))
			continue
		}

		is.protocol.SendMessage(NewFindNodeQuery(is.nodeID, target), addr)
	}
}
// findNeighbors sends a find_node query with a fresh random target to every
// node currently in the routing table, in order to discover new neighbours.
//
// Must be called with routingTableMutex held (index() does so).
func (is *IndexingService) findNeighbors() {
	for _, addr := range is.routingTable {
		// Allocate a fresh target buffer per query rather than reusing one:
		// SendMessage may serialise the message asynchronously (TODO confirm),
		// in which case a shared buffer mutated by the next iteration would
		// corrupt in-flight queries.
		target := make([]byte, 20)
		if _, err := rand.Read(target); err != nil {
			// Fixed: the previous message said "during bootstrapping!",
			// copy-pasted from bootstrap().
			zap.L().Panic("Could NOT generate random bytes!")
		}

		is.protocol.SendMessage(
			NewFindNodeQuery(is.nodeID, target),
			addr,
		)
	}
}
// onFindNodeResponse records the nodes returned by a find_node response in
// the routing table (capped at maxNeighbors) and immediately probes each
// accepted node with a sample_infohashes query (BEP 51).
func (is *IndexingService) onFindNodeResponse(response *Message, addr *net.UDPAddr) {
	is.routingTableMutex.Lock()
	defer is.routingTableMutex.Unlock()

	for _, node := range response.R.Nodes {
		if uint(len(is.routingTable)) >= is.maxNeighbors {
			break
		}
		if node.Addr.Port == 0 { // Ignore nodes who "use" port 0.
			continue
		}

		// BUG FIX: taking &node.Addr directly aliases the range variable, so
		// every routing-table entry (and every queued query) would end up
		// pointing at the address of the LAST node iterated. Copy it into a
		// per-iteration variable instead.
		nodeAddr := node.Addr
		is.routingTable[string(node.ID)] = &nodeAddr

		target := make([]byte, 20)
		if _, err := rand.Read(target); err != nil {
			zap.L().Panic("Could NOT generate random bytes!")
		}
		is.protocol.SendMessage(
			NewSampleInfohashesQuery(is.nodeID, []byte("aa"), target),
			&nodeAddr,
		)
	}
}
func (is *IndexingService) onGetPeersResponse(msg *Message, addr *net.UDPAddr) { func (is *IndexingService) onGetPeersResponse(msg *Message, addr *net.UDPAddr) {
var t [2]byte var t [2]byte
copy(t[:], msg.T) copy(t[:], msg.T)
@ -89,6 +202,7 @@ func (is *IndexingService) onGetPeersResponse(msg *Message, addr *net.UDPAddr) {
} }
func (is *IndexingService) onSampleInfohashesResponse(msg *Message, addr *net.UDPAddr) { func (is *IndexingService) onSampleInfohashesResponse(msg *Message, addr *net.UDPAddr) {
// request samples
for i := 0; i < len(msg.R.Samples)/20; i++ { for i := 0; i < len(msg.R.Samples)/20; i++ {
var infoHash [20]byte var infoHash [20]byte
copy(infoHash[:], msg.R.Samples[i:(i+1)*20]) copy(infoHash[:], msg.R.Samples[i:(i+1)*20])
@ -102,6 +216,19 @@ func (is *IndexingService) onSampleInfohashesResponse(msg *Message, addr *net.UD
is.getPeersRequests[t] = infoHash is.getPeersRequests[t] = infoHash
is.counter++ is.counter++
} }
// iterate
for _, node := range msg.R.Nodes {
target := make([]byte, 20)
_, err := rand.Read(target)
if err != nil {
zap.L().Panic("Could NOT generate random bytes!")
}
is.protocol.SendMessage(
NewSampleInfohashesQuery(is.nodeID, []byte("aa"), target),
&node.Addr,
)
}
} }
func uint16BE(v uint16) (b [2]byte) { func uint16BE(v uint16) (b [2]byte) {

View File

@ -204,14 +204,32 @@ func NewFindNodeQuery(id []byte, target []byte) *Message {
} }
} }
func NewGetPeersQuery(id []byte, info_hash []byte) *Message { func NewGetPeersQuery(id []byte, infoHash []byte) *Message {
return &Message{
Y: "q",
T: []byte("aa"),
Q: "get_peers",
A: QueryArguments{
ID: id,
InfoHash: infoHash,
},
}
}
// NewAnnouncePeerQuery is intended to build an announce_peer query.
//
// TODO: not implemented yet — calling this always panics.
func NewAnnouncePeerQuery(id []byte, implied_port bool, info_hash []byte, port uint16, token []byte) *Message {
	panic("Not implemented yet!")
}
func NewAnnouncePeerQuery(id []byte, implied_port bool, info_hash []byte, port uint16, func NewSampleInfohashesQuery(id []byte, t []byte, target []byte) *Message {
token []byte) *Message { return &Message{
Y: "q",
panic("Not implemented yet!") T: t,
Q: "sample_infohashes",
A: QueryArguments {
ID: id,
Target: target,
},
}
} }
func NewPingResponse(t []byte, id []byte) *Message { func NewPingResponse(t []byte, id []byte) *Message {

View File

@ -0,0 +1,5 @@
package mainline

// Service is the common interface that mainline DHT services (trawling,
// indexing) are expected to satisfy, so the manager can treat them
// uniformly.
type Service interface {
	// TODO: develop a service interface to be used by the manager
}

View File

@ -150,8 +150,10 @@ func (t *Transport) WriteMessages(msg *Message, addr *net.UDPAddr) {
* *
* Source: https://docs.python.org/3/library/asyncio-protocol.html#flow-control-callbacks * Source: https://docs.python.org/3/library/asyncio-protocol.html#flow-control-callbacks
*/ */
zap.L().Warn("WRITE CONGESTION!", zap.Error(err)) //zap.L().Warn("WRITE CONGESTION!", zap.Error(err))
t.onCongestion() if t.onCongestion != nil {
t.onCongestion()
}
} else if err != nil { } else if err != nil {
zap.L().Warn("Could NOT write an UDP packet!", zap.Error(err)) zap.L().Warn("Could NOT write an UDP packet!", zap.Error(err))
} }

View File

@ -1,7 +1,7 @@
package mainline package mainline
import ( import (
"crypto/rand" "math/rand"
"net" "net"
"sync" "sync"
"time" "time"

View File

@ -11,8 +11,9 @@ import (
// TrawlingManager owns a set of mainline DHT services and funnels their
// results into a single output channel.
type TrawlingManager struct {
	// private
	output           chan Result                  // fan-in channel for results from all services
	trawlingServices []*mainline.TrawlingService  // BEP 5 trawlers
	indexingServices []*mainline.IndexingService  // BEP 51 indexers
}
type Result interface { type Result interface {
@ -20,25 +21,30 @@ type Result interface {
PeerAddr() *net.TCPAddr PeerAddr() *net.TCPAddr
} }
func NewTrawlingManager(mlAddrs []string, interval time.Duration) *TrawlingManager { func NewTrawlingManager(tsAddrs []string, isAddrs []string, interval time.Duration) *TrawlingManager {
manager := new(TrawlingManager) manager := new(TrawlingManager)
manager.output = make(chan Result, 20) manager.output = make(chan Result, 20)
if mlAddrs == nil { // Trawling Services
mlAddrs = []string{"0.0.0.0:0"} for _, addr := range tsAddrs {
} service := mainline.NewTrawlingService(
for _, addr := range mlAddrs {
manager.services = append(manager.services, mainline.NewTrawlingService(
addr, addr,
2000, 2000,
interval, interval,
mainline.TrawlingServiceEventHandlers{ mainline.TrawlingServiceEventHandlers{
OnResult: manager.onTrawlingResult, OnResult: manager.onTrawlingResult,
}, },
)) )
manager.trawlingServices = append(manager.trawlingServices, service)
service.Start()
} }
for _, service := range manager.services { // Indexing Services
for _, addr := range isAddrs {
service := mainline.NewIndexingService(addr, 2 * time.Second, mainline.IndexingServiceEventHandlers{
OnResult: manager.onIndexingResult,
})
manager.indexingServices = append(manager.indexingServices, service)
service.Start() service.Start()
} }
@ -49,7 +55,17 @@ func (m *TrawlingManager) onTrawlingResult(res mainline.TrawlingResult) {
select { select {
case m.output <- res: case m.output <- res:
default: default:
zap.L().Warn("DHT manager output ch is full, result dropped!") // TODO: should be a warn
zap.L().Debug("DHT manager output ch is full, result dropped!")
}
}
func (m *TrawlingManager) onIndexingResult(res mainline.IndexingResult) {
select {
case m.output <- res:
default:
// TODO: should be a warn
zap.L().Debug("DHT manager output ch is full, idx result dropped!")
} }
} }
@ -58,7 +74,7 @@ func (m *TrawlingManager) Output() <-chan Result {
} }
func (m *TrawlingManager) Terminate() { func (m *TrawlingManager) Terminate() {
for _, service := range m.services { for _, service := range m.trawlingServices {
service.Terminate() service.Terminate()
} }
} }

View File

@ -117,7 +117,7 @@ func main() {
logger.Sugar().Fatalf("Could not open the database at `%s`", opFlags.DatabaseURL, zap.Error(err)) logger.Sugar().Fatalf("Could not open the database at `%s`", opFlags.DatabaseURL, zap.Error(err))
} }
trawlingManager := dht.NewTrawlingManager(opFlags.TrawlerMlAddrs, opFlags.TrawlerMlInterval) trawlingManager := dht.NewTrawlingManager(nil, []string{"0.0.0.0:0"}, opFlags.TrawlerMlInterval)
metadataSink := metadata.NewSink(2*time.Minute, opFlags.LeechMaxN) metadataSink := metadata.NewSink(2*time.Minute, opFlags.LeechMaxN)
zap.L().Debug("Peer ID", zap.ByteString("peerID", metadataSink.PeerID)) zap.L().Debug("Peer ID", zap.ByteString("peerID", metadataSink.PeerID))
@ -161,7 +161,7 @@ func parseFlags() (*opFlags, error) {
TrawlerMlAddrs []string `long:"trawler-ml-addr" description:"Address(es) to be used by trawling DHT (Mainline) nodes." default:"0.0.0.0:0"` TrawlerMlAddrs []string `long:"trawler-ml-addr" description:"Address(es) to be used by trawling DHT (Mainline) nodes." default:"0.0.0.0:0"`
TrawlerMlInterval uint `long:"trawler-ml-interval" description:"Trawling interval in integer seconds."` TrawlerMlInterval uint `long:"trawler-ml-interval" description:"Trawling interval in integer seconds."`
LeechMaxN uint `long:"leech-max-n" description:"Maximum number of leeches." default:"1000"` LeechMaxN uint `long:"leech-max-n" description:"Maximum number of leeches." default:"100"`
Verbose []bool `short:"v" long:"verbose" description:"Increases verbosity."` Verbose []bool `short:"v" long:"verbose" description:"Increases verbosity."`
Profile string `long:"profile" description:"Enable profiling." choice:"cpu" choice:"memory" choice:"trace"` Profile string `long:"profile" description:"Enable profiling." choice:"cpu" choice:"memory" choice:"trace"`