From 75abc0ee02960959879b3b9bc192b203229cb6ae Mon Sep 17 00:00:00 2001 From: "Bora M. Alper" Date: Tue, 21 May 2019 13:31:01 +0100 Subject: [PATCH] [magneticod] add parameters to adjust resource usage --- .../dht/mainline/indexingService.go | 20 ++++++++++++++----- cmd/magneticod/dht/managers.go | 4 ++-- cmd/magneticod/main.go | 19 +++++++++--------- go.mod | 4 ++++ 4 files changed, 30 insertions(+), 17 deletions(-) diff --git a/cmd/magneticod/dht/mainline/indexingService.go b/cmd/magneticod/dht/mainline/indexingService.go index b40387e..0a6d029 100644 --- a/cmd/magneticod/dht/mainline/indexingService.go +++ b/cmd/magneticod/dht/mainline/indexingService.go @@ -46,7 +46,7 @@ func (ir IndexingResult) PeerAddrs() []net.TCPAddr { return ir.peerAddrs } -func NewIndexingService(laddr string, interval time.Duration, eventHandlers IndexingServiceEventHandlers) *IndexingService { +func NewIndexingService(laddr string, interval time.Duration, maxNeighbors uint, eventHandlers IndexingServiceEventHandlers) *IndexingService { service := new(IndexingService) service.interval = interval service.protocol = NewProtocol( @@ -60,7 +60,7 @@ func NewIndexingService(laddr string, interval time.Duration, eventHandlers Inde service.nodeID = make([]byte, 20) service.routingTable = make(map[string]*net.UDPAddr) service.routingTableMutex = new(sync.Mutex) - service.maxNeighbors = 50 + service.maxNeighbors = maxNeighbors service.eventHandlers = eventHandlers service.getPeersRequests = make(map[[2]byte][20]byte) @@ -92,6 +92,7 @@ func (is *IndexingService) index() { } else { zap.L().Info("Latest status:", zap.Int("n", len(is.routingTable)), zap.Uint("maxNeighbors", is.maxNeighbors)) + //TODO is.findNeighbors() is.routingTable = make(map[string]*net.UDPAddr) } @@ -134,7 +135,7 @@ func (is *IndexingService) findNeighbors() { } is.protocol.SendMessage( - NewFindNodeQuery(is.nodeID, target), + NewSampleInfohashesQuery(is.nodeID, []byte("aa"), target), addr, ) } @@ -144,8 +145,6 @@ func (is *IndexingService) onFindNodeResponse(response *Message, addr *net.UDPAd is.routingTableMutex.Lock() defer is.routingTableMutex.Unlock() - //zap.S().Debugf("find node response from %+v -- %+v", addr, response) - for _, node := range response.R.Nodes { if uint(len(is.routingTable)) >= is.maxNeighbors { break @@ -221,6 +220,16 @@ func (is *IndexingService) onSampleInfohashesResponse(msg *Message, addr *net.UD // iterate for _, node := range msg.R.Nodes { + if uint(len(is.routingTable)) >= is.maxNeighbors { + break + } + if node.Addr.Port == 0 { // Ignore nodes who "use" port 0. + continue + } + is.routingTable[string(node.ID)] = &node.Addr + + // TODO + /* target := make([]byte, 20) _, err := rand.Read(target) if err != nil { @@ -230,6 +239,7 @@ func (is *IndexingService) onSampleInfohashesResponse(msg *Message, addr *net.UD NewSampleInfohashesQuery(is.nodeID, []byte("aa"), target), &node.Addr, ) + */ } } diff --git a/cmd/magneticod/dht/managers.go b/cmd/magneticod/dht/managers.go index 1c655d6..0665e9b 100644 --- a/cmd/magneticod/dht/managers.go +++ b/cmd/magneticod/dht/managers.go @@ -24,12 +24,12 @@ type Manager struct { indexingServices []Service } -func NewTrawlingManager(addrs []string, interval time.Duration) *Manager { +func NewManager(addrs []string, interval time.Duration, maxNeighbors uint) *Manager { manager := new(Manager) manager.output = make(chan Result, 20) for _, addr := range addrs { - service := mainline.NewIndexingService(addr, 2*time.Second, mainline.IndexingServiceEventHandlers{ + service := mainline.NewIndexingService(addr, interval, maxNeighbors, mainline.IndexingServiceEventHandlers{ OnResult: manager.onIndexingResult, }) manager.indexingServices = append(manager.indexingServices, service) diff --git a/cmd/magneticod/main.go b/cmd/magneticod/main.go index 251c671..938d0bf 100644 --- a/cmd/magneticod/main.go +++ b/cmd/magneticod/main.go @@ -26,8 +26,9 @@ import ( type opFlags struct { DatabaseURL string - IndexerAddrs []string - IndexerInterval time.Duration + IndexerAddrs []string + IndexerInterval time.Duration + IndexerMaxNeighbors uint LeechMaxN int @@ -96,7 +97,7 @@ func main() { logger.Sugar().Fatalf("Could not open the database at `%s`", opFlags.DatabaseURL, zap.Error(err)) } - trawlingManager := dht.NewTrawlingManager(opFlags.IndexerAddrs, opFlags.IndexerInterval) + trawlingManager := dht.NewManager(opFlags.IndexerAddrs, opFlags.IndexerInterval, opFlags.IndexerMaxNeighbors) metadataSink := metadata.NewSink(5*time.Second, opFlags.LeechMaxN) // The Event Loop @@ -135,8 +136,9 @@ func parseFlags() (*opFlags, error) { var cmdF struct { DatabaseURL string `long:"database" description:"URL of the database."` - IndexerAddrs []string `long:"indexer-addr" description:"Address(es) to be used by indexing DHT nodes." default:"0.0.0.0:0"` - IndexerInterval uint `long:"indexer-interval" description:"Indexing interval in integer seconds."` + IndexerAddrs []string `long:"indexer-addr" description:"Address(es) to be used by indexing DHT nodes." default:"0.0.0.0:0"` + IndexerInterval uint `long:"indexer-interval" description:"Indexing interval in integer seconds." default:"1"` + IndexerMaxNeighbors uint `long:"indexer-max-neighbors" description:"Maximum number of neighbors of an indexer." default:"10000"` LeechMaxN uint `long:"leech-max-n" description:"Maximum number of leeches." default:"200"` @@ -170,11 +172,8 @@ func parseFlags() (*opFlags, error) { opF.IndexerAddrs = cmdF.IndexerAddrs } - if cmdF.IndexerInterval == 0 { - opF.IndexerInterval = 2 * time.Second - } else { - opF.IndexerInterval = time.Duration(cmdF.IndexerInterval) * time.Second - } + opF.IndexerInterval = time.Duration(cmdF.IndexerInterval) * time.Second + opF.IndexerMaxNeighbors = cmdF.IndexerMaxNeighbors opF.LeechMaxN = int(cmdF.LeechMaxN) if opF.LeechMaxN > 1000 { diff --git a/go.mod b/go.mod index 980b210..b8d1f96 100644 --- a/go.mod +++ b/go.mod @@ -4,14 +4,18 @@ require ( github.com/Wessie/appdirs v0.0.0-20141031215813-6573e894f8e2 github.com/anacrolix/missinggo v1.1.0 github.com/anacrolix/torrent v1.1.4 + github.com/chzyer/readline v0.0.0-20180603132655-2972be24d48e // indirect github.com/dustin/go-humanize v1.0.0 + github.com/google/pprof v0.0.0-20190515194954-54271f7e092f // indirect github.com/gorilla/mux v1.7.2 github.com/gorilla/schema v1.1.0 + github.com/ianlancetaylor/demangle v0.0.0-20181102032728-5e5cf60278f6 // indirect github.com/jessevdk/go-flags v1.4.0 github.com/kevinburke/go-bindata v3.13.0+incompatible // indirect github.com/libp2p/go-sockaddr v0.0.1 github.com/mattn/go-sqlite3 v1.10.0 github.com/pkg/errors v0.8.1 + github.com/pkg/profile v1.3.0 github.com/willf/bloom v2.0.3+incompatible go.uber.org/atomic v1.4.0 // indirect go.uber.org/multierr v1.1.0 // indirect