[magneticod] add parameters to adjust resource usage

This commit is contained in:
Bora M. Alper 2019-05-21 13:31:01 +01:00
parent d55e65419d
commit 75abc0ee02
No known key found for this signature in database
GPG Key ID: 8F1A9504E1BD114D
4 changed files with 30 additions and 17 deletions

View File

@ -46,7 +46,7 @@ func (ir IndexingResult) PeerAddrs() []net.TCPAddr {
return ir.peerAddrs
}
func NewIndexingService(laddr string, interval time.Duration, eventHandlers IndexingServiceEventHandlers) *IndexingService {
func NewIndexingService(laddr string, interval time.Duration, maxNeighbors uint, eventHandlers IndexingServiceEventHandlers) *IndexingService {
service := new(IndexingService)
service.interval = interval
service.protocol = NewProtocol(
@ -60,7 +60,7 @@ func NewIndexingService(laddr string, interval time.Duration, eventHandlers Inde
service.nodeID = make([]byte, 20)
service.routingTable = make(map[string]*net.UDPAddr)
service.routingTableMutex = new(sync.Mutex)
service.maxNeighbors = 50
service.maxNeighbors = maxNeighbors
service.eventHandlers = eventHandlers
service.getPeersRequests = make(map[[2]byte][20]byte)
@ -92,6 +92,7 @@ func (is *IndexingService) index() {
} else {
zap.L().Info("Latest status:", zap.Int("n", len(is.routingTable)),
zap.Uint("maxNeighbors", is.maxNeighbors))
//TODO
is.findNeighbors()
is.routingTable = make(map[string]*net.UDPAddr)
}
@ -134,7 +135,7 @@ func (is *IndexingService) findNeighbors() {
}
is.protocol.SendMessage(
NewFindNodeQuery(is.nodeID, target),
NewSampleInfohashesQuery(is.nodeID, []byte("aa"), target),
addr,
)
}
@ -144,8 +145,6 @@ func (is *IndexingService) onFindNodeResponse(response *Message, addr *net.UDPAd
is.routingTableMutex.Lock()
defer is.routingTableMutex.Unlock()
//zap.S().Debugf("find node response from %+v -- %+v", addr, response)
for _, node := range response.R.Nodes {
if uint(len(is.routingTable)) >= is.maxNeighbors {
break
@ -221,6 +220,16 @@ func (is *IndexingService) onSampleInfohashesResponse(msg *Message, addr *net.UD
// iterate
for _, node := range msg.R.Nodes {
if uint(len(is.routingTable)) >= is.maxNeighbors {
break
}
if node.Addr.Port == 0 { // Ignore nodes who "use" port 0.
continue
}
is.routingTable[string(node.ID)] = &node.Addr
// TODO
/*
target := make([]byte, 20)
_, err := rand.Read(target)
if err != nil {
@ -230,6 +239,7 @@ func (is *IndexingService) onSampleInfohashesResponse(msg *Message, addr *net.UD
NewSampleInfohashesQuery(is.nodeID, []byte("aa"), target),
&node.Addr,
)
*/
}
}

View File

@ -24,12 +24,12 @@ type Manager struct {
indexingServices []Service
}
func NewTrawlingManager(addrs []string, interval time.Duration) *Manager {
func NewManager(addrs []string, interval time.Duration, maxNeighbors uint) *Manager {
manager := new(Manager)
manager.output = make(chan Result, 20)
for _, addr := range addrs {
service := mainline.NewIndexingService(addr, 2*time.Second, mainline.IndexingServiceEventHandlers{
service := mainline.NewIndexingService(addr, interval, maxNeighbors, mainline.IndexingServiceEventHandlers{
OnResult: manager.onIndexingResult,
})
manager.indexingServices = append(manager.indexingServices, service)

View File

@ -28,6 +28,7 @@ type opFlags struct {
IndexerAddrs []string
IndexerInterval time.Duration
IndexerMaxNeighbors uint
LeechMaxN int
@ -96,7 +97,7 @@ func main() {
logger.Sugar().Fatalf("Could not open the database at `%s`", opFlags.DatabaseURL, zap.Error(err))
}
trawlingManager := dht.NewTrawlingManager(opFlags.IndexerAddrs, opFlags.IndexerInterval)
trawlingManager := dht.NewManager(opFlags.IndexerAddrs, opFlags.IndexerInterval, opFlags.IndexerMaxNeighbors)
metadataSink := metadata.NewSink(5*time.Second, opFlags.LeechMaxN)
// The Event Loop
@ -136,7 +137,8 @@ func parseFlags() (*opFlags, error) {
DatabaseURL string `long:"database" description:"URL of the database."`
IndexerAddrs []string `long:"indexer-addr" description:"Address(es) to be used by indexing DHT nodes." default:"0.0.0.0:0"`
IndexerInterval uint `long:"indexer-interval" description:"Indexing interval in integer seconds."`
IndexerInterval uint `long:"indexer-interval" description:"Indexing interval in integer seconds." default:"1"`
IndexerMaxNeighbors uint `long:"indexer-max-neighbors" description:"Maximum number of neighbors of an indexer." default:"10000"`
LeechMaxN uint `long:"leech-max-n" description:"Maximum number of leeches." default:"200"`
@ -170,11 +172,8 @@ func parseFlags() (*opFlags, error) {
opF.IndexerAddrs = cmdF.IndexerAddrs
}
if cmdF.IndexerInterval == 0 {
opF.IndexerInterval = 2 * time.Second
} else {
opF.IndexerInterval = time.Duration(cmdF.IndexerInterval) * time.Second
}
opF.IndexerMaxNeighbors = cmdF.IndexerMaxNeighbors
opF.LeechMaxN = int(cmdF.LeechMaxN)
if opF.LeechMaxN > 1000 {

4
go.mod
View File

@ -4,14 +4,18 @@ require (
github.com/Wessie/appdirs v0.0.0-20141031215813-6573e894f8e2
github.com/anacrolix/missinggo v1.1.0
github.com/anacrolix/torrent v1.1.4
github.com/chzyer/readline v0.0.0-20180603132655-2972be24d48e // indirect
github.com/dustin/go-humanize v1.0.0
github.com/google/pprof v0.0.0-20190515194954-54271f7e092f // indirect
github.com/gorilla/mux v1.7.2
github.com/gorilla/schema v1.1.0
github.com/ianlancetaylor/demangle v0.0.0-20181102032728-5e5cf60278f6 // indirect
github.com/jessevdk/go-flags v1.4.0
github.com/kevinburke/go-bindata v3.13.0+incompatible // indirect
github.com/libp2p/go-sockaddr v0.0.1
github.com/mattn/go-sqlite3 v1.10.0
github.com/pkg/errors v0.8.1
github.com/pkg/profile v1.3.0
github.com/willf/bloom v2.0.3+incompatible
go.uber.org/atomic v1.4.0 // indirect
go.uber.org/multierr v1.1.0 // indirect