From aae67090af3d3d47b0aec60d07f6ee54545fa71c Mon Sep 17 00:00:00 2001
From: "Bora M. Alper"
Date: Wed, 15 May 2019 13:18:42 +0100
Subject: [PATCH] WIP for BEP 51 (complete but too buggy)

---
 cmd/magneticod/bittorrent/metadata/sink.go |   2 +-
 .../dht/mainline/indexingService.go        | 127 ++++++++++++++++++
 cmd/magneticod/dht/mainline/protocol.go    |  28 +++-
 cmd/magneticod/dht/mainline/service.go     |   5 +
 cmd/magneticod/dht/mainline/transport.go   |   6 +-
 .../dht/mainline/trawlingService.go        |   2 +-
 cmd/magneticod/dht/managers.go             |  40 ++++--
 cmd/magneticod/main.go                     |   4 +-
 8 files changed, 191 insertions(+), 23 deletions(-)
 create mode 100644 cmd/magneticod/dht/mainline/service.go

diff --git a/cmd/magneticod/bittorrent/metadata/sink.go b/cmd/magneticod/bittorrent/metadata/sink.go
index 518ecd0..dab24b6 100644
--- a/cmd/magneticod/bittorrent/metadata/sink.go
+++ b/cmd/magneticod/bittorrent/metadata/sink.go
@@ -105,7 +105,7 @@ func (ms *Sink) Sink(res dht.Result) {
 	// check whether res.infoHash exists in the ms.incomingInfoHashes, and where we add the infoHash
 	// to the incomingInfoHashes at the end of this function.
 
-	zap.L().Info("Sunk!", zap.Int("leeches", len(ms.incomingInfoHashes)), util.HexField("infoHash", infoHash[:]))
+	zap.L().Debug("Sunk!", zap.Int("leeches", len(ms.incomingInfoHashes)), util.HexField("infoHash", infoHash[:]))
 
 	go NewLeech(infoHash, peerAddr, ms.PeerID, LeechEventHandlers{
 		OnSuccess: ms.flush,

diff --git a/cmd/magneticod/dht/mainline/indexingService.go b/cmd/magneticod/dht/mainline/indexingService.go
index 307c13d..67bb81d 100644
--- a/cmd/magneticod/dht/mainline/indexingService.go
+++ b/cmd/magneticod/dht/mainline/indexingService.go
@@ -1,9 +1,12 @@
 package mainline
 
 import (
+	"math/rand"
 	"net"
 	"sync"
 	"time"
+
+	"go.uber.org/zap"
 )
 
 type IndexingService struct {
@@ -20,6 +23,7 @@ type IndexingService struct {
 	// ^~~~~~
 	routingTable      map[string]*net.UDPAddr
 	routingTableMutex *sync.Mutex
+	maxNeighbors      uint
 
 	counter          uint16
 	getPeersRequests map[[2]byte][20]byte // GetPeersQuery.`t` -> infohash
@@ -48,6 +52,7 @@ func NewIndexingService(laddr string, interval time.Duration, eventHandlers Inde
 	service.protocol = NewProtocol(
 		laddr,
 		ProtocolEventHandlers{
+			OnFindNodeResponse:         service.onFindNodeResponse,
 			OnGetPeersResponse:         service.onGetPeersResponse,
 			OnSampleInfohashesResponse: service.onSampleInfohashesResponse,
 		},
@@ -55,11 +60,119 @@ func NewIndexingService(laddr string, interval time.Duration, eventHandlers Inde
 	service.nodeID = make([]byte, 20)
 	service.routingTable = make(map[string]*net.UDPAddr)
 	service.routingTableMutex = new(sync.Mutex)
+	service.maxNeighbors = 50
 	service.eventHandlers = eventHandlers
+	service.getPeersRequests = make(map[[2]byte][20]byte)
 
 	return service
 }
 
+func (is *IndexingService) Start() {
+	if is.started {
+		zap.L().Panic("Attempting to Start() a mainline/IndexingService that has been already started! (Programmer error.)")
+	}
+	is.started = true
+
+	is.protocol.Start()
+	go is.index()
+
+	zap.L().Info("Indexing Service started!")
+}
+
+func (is *IndexingService) Terminate() {
+	is.protocol.Terminate()
+}
+
+func (is *IndexingService) index() {
+	for range time.Tick(is.interval) {
+		// TODO
+		// For some reason, we still can't detect congestion and this keeps increasing...
+		// Disable for now.
+		// is.maxNeighbors = uint(float32(is.maxNeighbors) * 1.001)
+
+		is.routingTableMutex.Lock()
+		if len(is.routingTable) == 0 {
+			is.bootstrap()
+		} else {
+			zap.L().Info("Latest status:", zap.Int("n", len(is.routingTable)),
+				zap.Uint("maxNeighbors", is.maxNeighbors))
+			is.findNeighbors()
+			is.routingTable = make(map[string]*net.UDPAddr)
+		}
+		is.routingTableMutex.Unlock()
+	}
+}
+
+func (is *IndexingService) bootstrap() {
+	bootstrappingNodes := []string{
+		"router.bittorrent.com:6881",
+		"dht.transmissionbt.com:6881",
+		"dht.libtorrent.org:25401",
+	}
+
+	zap.L().Info("Bootstrapping as routing table is empty...")
+	for _, node := range bootstrappingNodes {
+		target := make([]byte, 20)
+		_, err := rand.Read(target)
+		if err != nil {
+			zap.L().Panic("Could NOT generate random bytes during bootstrapping!")
+		}
+
+		addr, err := net.ResolveUDPAddr("udp", node)
+		if err != nil {
+			zap.L().Error("Could NOT resolve (UDP) address of the bootstrapping node!",
+				zap.String("node", node))
+			continue
+		}
+
+		is.protocol.SendMessage(NewFindNodeQuery(is.nodeID, target), addr)
+	}
+}
+
+func (is *IndexingService) findNeighbors() {
+	target := make([]byte, 20)
+	for _, addr := range is.routingTable {
+		_, err := rand.Read(target)
+		if err != nil {
+			zap.L().Panic("Could NOT generate random bytes during bootstrapping!")
+		}
+
+		is.protocol.SendMessage(
+			NewFindNodeQuery(is.nodeID, target),
+			addr,
+		)
+	}
+}
+
+func (is *IndexingService) onFindNodeResponse(response *Message, addr *net.UDPAddr) {
+	is.routingTableMutex.Lock()
+	defer is.routingTableMutex.Unlock()
+
+	//zap.S().Debugf("find node response from %+v -- %+v", addr, response)
+
+	for _, node := range response.R.Nodes {
+		if uint(len(is.routingTable)) >= is.maxNeighbors {
+			break
+		}
+		if node.Addr.Port == 0 { // Ignore nodes who "use" port 0.
+			continue
+		}
+
+		is.routingTable[string(node.ID)] = &node.Addr
+
+		target := make([]byte, 20)
+		_, err := rand.Read(target)
+		if err != nil {
+			zap.L().Panic("Could NOT generate random bytes!")
+		}
+		is.protocol.SendMessage(
+			NewSampleInfohashesQuery(is.nodeID, []byte("aa"), target),
+			&node.Addr,
+		)
+	}
+}
+
 func (is *IndexingService) onGetPeersResponse(msg *Message, addr *net.UDPAddr) {
 	var t [2]byte
 	copy(t[:], msg.T)
@@ -89,6 +202,7 @@ func (is *IndexingService) onGetPeersResponse(msg *Message, addr *net.UDPAddr) {
 }
 
 func (is *IndexingService) onSampleInfohashesResponse(msg *Message, addr *net.UDPAddr) {
+	// request samples
 	for i := 0; i < len(msg.R.Samples)/20; i++ {
 		var infoHash [20]byte
 		copy(infoHash[:], msg.R.Samples[i:(i+1)*20])
@@ -102,6 +216,19 @@ func (is *IndexingService) onSampleInfohashesResponse(msg *Message, addr *net.UD
 		is.getPeersRequests[t] = infoHash
 		is.counter++
 	}
+
+	// iterate
+	for _, node := range msg.R.Nodes {
+		target := make([]byte, 20)
+		_, err := rand.Read(target)
+		if err != nil {
+			zap.L().Panic("Could NOT generate random bytes!")
+		}
+		is.protocol.SendMessage(
+			NewSampleInfohashesQuery(is.nodeID, []byte("aa"), target),
+			&node.Addr,
+		)
+	}
 }
 
 func uint16BE(v uint16) (b [2]byte) {

diff --git a/cmd/magneticod/dht/mainline/protocol.go b/cmd/magneticod/dht/mainline/protocol.go
index a436147..fab7d80 100644
--- a/cmd/magneticod/dht/mainline/protocol.go
+++ b/cmd/magneticod/dht/mainline/protocol.go
@@ -204,14 +204,32 @@ func NewFindNodeQuery(id []byte, target []byte) *Message {
 	}
 }
 
-func NewGetPeersQuery(id []byte, info_hash []byte) *Message {
+func NewGetPeersQuery(id []byte, infoHash []byte) *Message {
+	return &Message{
+		Y: "q",
+		T: []byte("aa"),
+		Q: "get_peers",
+		A: QueryArguments{
+			ID:       id,
+			InfoHash: infoHash,
+		},
+	}
+}
+
+func NewAnnouncePeerQuery(id []byte, implied_port bool, info_hash []byte, port uint16, token []byte) *Message {
 	panic("Not implemented yet!")
 }
 
-func NewAnnouncePeerQuery(id []byte, implied_port bool, info_hash []byte, port uint16,
-	token []byte) *Message {
-
-	panic("Not implemented yet!")
+func NewSampleInfohashesQuery(id []byte, t []byte, target []byte) *Message {
+	return &Message{
+		Y: "q",
+		T: t,
+		Q: "sample_infohashes",
+		A: QueryArguments{
+			ID:     id,
+			Target: target,
+		},
+	}
 }
 
 func NewPingResponse(t []byte, id []byte) *Message {

diff --git a/cmd/magneticod/dht/mainline/service.go b/cmd/magneticod/dht/mainline/service.go
new file mode 100644
index 0000000..c7a9752
--- /dev/null
+++ b/cmd/magneticod/dht/mainline/service.go
@@ -0,0 +1,5 @@
+package mainline
+
+type Service interface {
+	// TODO: develop a service interface to be used by the manager
+}

diff --git a/cmd/magneticod/dht/mainline/transport.go b/cmd/magneticod/dht/mainline/transport.go
index 5d21ab7..2902160 100644
--- a/cmd/magneticod/dht/mainline/transport.go
+++ b/cmd/magneticod/dht/mainline/transport.go
@@ -150,8 +150,10 @@ func (t *Transport) WriteMessages(msg *Message, addr *net.UDPAddr) {
 		 *
 		 * Source: https://docs.python.org/3/library/asyncio-protocol.html#flow-control-callbacks
 		 */
-		zap.L().Warn("WRITE CONGESTION!", zap.Error(err))
-		t.onCongestion()
+		//zap.L().Warn("WRITE CONGESTION!", zap.Error(err))
+		if t.onCongestion != nil {
+			t.onCongestion()
+		}
 	} else if err != nil {
 		zap.L().Warn("Could NOT write an UDP packet!", zap.Error(err))
 	}

diff --git a/cmd/magneticod/dht/mainline/trawlingService.go b/cmd/magneticod/dht/mainline/trawlingService.go
index f88d7e0..08e44cd 100644
--- a/cmd/magneticod/dht/mainline/trawlingService.go
+++ b/cmd/magneticod/dht/mainline/trawlingService.go
@@ -1,7 +1,7 @@
 package mainline
 
 import (
-	"crypto/rand"
+	"math/rand"
 	"net"
 	"sync"
 	"time"

diff --git a/cmd/magneticod/dht/managers.go b/cmd/magneticod/dht/managers.go
index 2578912..aed2efb 100644
--- a/cmd/magneticod/dht/managers.go
+++ b/cmd/magneticod/dht/managers.go
@@ -11,8 +11,9 @@ import (
 
 type TrawlingManager struct {
 	// private
-	output   chan Result
-	services []*mainline.TrawlingService
+	output           chan Result
+	trawlingServices []*mainline.TrawlingService
+	indexingServices []*mainline.IndexingService
 }
 
 type Result interface {
@@ -20,25 +21,30 @@ type Result interface {
 	InfoHash() [20]byte
 	PeerAddr() *net.TCPAddr
 }
 
-func NewTrawlingManager(mlAddrs []string, interval time.Duration) *TrawlingManager {
+func NewTrawlingManager(tsAddrs []string, isAddrs []string, interval time.Duration) *TrawlingManager {
 	manager := new(TrawlingManager)
 	manager.output = make(chan Result, 20)
 
-	if mlAddrs == nil {
-		mlAddrs = []string{"0.0.0.0:0"}
-	}
-	for _, addr := range mlAddrs {
-		manager.services = append(manager.services, mainline.NewTrawlingService(
+	// Trawling Services
+	for _, addr := range tsAddrs {
+		service := mainline.NewTrawlingService(
 			addr,
 			2000,
 			interval,
 			mainline.TrawlingServiceEventHandlers{
 				OnResult: manager.onTrawlingResult,
 			},
-		))
+		)
+		manager.trawlingServices = append(manager.trawlingServices, service)
+		service.Start()
 	}
 
-	for _, service := range manager.services {
+	// Indexing Services
+	for _, addr := range isAddrs {
+		service := mainline.NewIndexingService(addr, 2 * time.Second, mainline.IndexingServiceEventHandlers{
+			OnResult: manager.onIndexingResult,
+		})
+		manager.indexingServices = append(manager.indexingServices, service)
 		service.Start()
 	}
@@ -49,7 +55,17 @@ func (m *TrawlingManager) onTrawlingResult(res mainline.TrawlingResult) {
 	select {
 	case m.output <- res:
 	default:
-		zap.L().Warn("DHT manager output ch is full, result dropped!")
+		// TODO: should be a warn
+		zap.L().Debug("DHT manager output ch is full, result dropped!")
+	}
+}
+
+func (m *TrawlingManager) onIndexingResult(res mainline.IndexingResult) {
+	select {
+	case m.output <- res:
+	default:
+		// TODO: should be a warn
+		zap.L().Debug("DHT manager output ch is full, idx result dropped!")
 	}
 }
 
@@ -58,7 +74,7 @@ func (m *TrawlingManager) Output() <-chan Result {
 }
 
 func (m *TrawlingManager) Terminate() {
-	for _, service := range m.services {
+	for _, service := range m.trawlingServices {
 		service.Terminate()
 	}
 }

diff --git a/cmd/magneticod/main.go b/cmd/magneticod/main.go
index 9bfd686..3102544 100644
--- a/cmd/magneticod/main.go
+++ b/cmd/magneticod/main.go
@@ -117,7 +117,7 @@ func main() {
 		logger.Sugar().Fatalf("Could not open the database at `%s`", opFlags.DatabaseURL, zap.Error(err))
 	}
 
-	trawlingManager := dht.NewTrawlingManager(opFlags.TrawlerMlAddrs, opFlags.TrawlerMlInterval)
+	trawlingManager := dht.NewTrawlingManager(nil, []string{"0.0.0.0:0"}, opFlags.TrawlerMlInterval)
 	metadataSink := metadata.NewSink(2*time.Minute, opFlags.LeechMaxN)
 
 	zap.L().Debug("Peer ID", zap.ByteString("peerID", metadataSink.PeerID))
@@ -161,7 +161,7 @@ func parseFlags() (*opFlags, error) {
 		TrawlerMlAddrs    []string `long:"trawler-ml-addr" description:"Address(es) to be used by trawling DHT (Mainline) nodes." default:"0.0.0.0:0"`
 		TrawlerMlInterval uint     `long:"trawler-ml-interval" description:"Trawling interval in integer seconds."`
 
-		LeechMaxN uint `long:"leech-max-n" description:"Maximum number of leeches." default:"1000"`
default:"1000"` + LeechMaxN uint `long:"leech-max-n" description:"Maximum number of leeches." default:"100"` Verbose []bool `short:"v" long:"verbose" description:"Increases verbosity."` Profile string `long:"profile" description:"Enable profiling." choice:"cpu" choice:"memory" choice:"trace"`