diff --git a/cmd/magneticod/bittorrent/metadata/sink.go b/cmd/magneticod/bittorrent/metadata/sink.go index 39bc7c6..386a1d3 100644 --- a/cmd/magneticod/bittorrent/metadata/sink.go +++ b/cmd/magneticod/bittorrent/metadata/sink.go @@ -5,11 +5,11 @@ import ( "sync" "time" - "github.com/boramalper/magnetico/pkg/util" "go.uber.org/zap" "github.com/boramalper/magnetico/cmd/magneticod/dht/mainline" "github.com/boramalper/magnetico/pkg/persistence" + "github.com/boramalper/magnetico/pkg/util" ) type Metadata struct { @@ -26,6 +26,7 @@ type Metadata struct { type Sink struct { PeerID []byte deadline time.Duration + maxNLeeches int drain chan Metadata incomingInfoHashes map[[20]byte]struct{} incomingInfoHashesMx sync.Mutex @@ -67,11 +68,12 @@ func randomDigit() byte { return byte(rand.Intn(max-min) + min) } -func NewSink(deadline time.Duration) *Sink { +func NewSink(deadline time.Duration, maxNLeeches int) *Sink { ms := new(Sink) ms.PeerID = randomID() ms.deadline = deadline + ms.maxNLeeches = maxNLeeches ms.drain = make(chan Metadata) ms.incomingInfoHashes = make(map[[20]byte]struct{}) ms.termination = make(chan interface{}) @@ -86,6 +88,11 @@ func (ms *Sink) Sink(res mainline.TrawlingResult) { ms.incomingInfoHashesMx.Lock() defer ms.incomingInfoHashesMx.Unlock() + // cap the max # of leeches + if len(ms.incomingInfoHashes) >= ms.maxNLeeches { + return + } + if _, exists := ms.incomingInfoHashes[res.InfoHash]; exists { return } @@ -135,6 +142,7 @@ func (ms *Sink) flush(result Metadata) { func (ms *Sink) onLeechError(infoHash [20]byte, err error) { zap.L().Debug("leech error", util.HexField("infoHash", infoHash[:]), zap.Error(err)) + ms.incomingInfoHashesMx.Lock() delete(ms.incomingInfoHashes, infoHash) ms.incomingInfoHashesMx.Unlock() diff --git a/cmd/magneticod/dht/mainline/service.go b/cmd/magneticod/dht/mainline/service.go index 6e2101f..4a735af 100644 --- a/cmd/magneticod/dht/mainline/service.go +++ b/cmd/magneticod/dht/mainline/service.go @@ -18,6 +18,7 @@ type TrawlingService struct { // Private protocol *Protocol started bool + interval time.Duration eventHandlers TrawlingServiceEventHandlers trueNodeID []byte @@ -34,8 +35,9 @@ type TrawlingServiceEventHandlers struct { OnResult func(TrawlingResult) } -func NewTrawlingService(laddr string, initialMaxNeighbors uint, eventHandlers TrawlingServiceEventHandlers) *TrawlingService { +func NewTrawlingService(laddr string, initialMaxNeighbors uint, interval time.Duration, eventHandlers TrawlingServiceEventHandlers) *TrawlingService { service := new(TrawlingService) + service.interval = interval service.protocol = NewProtocol( laddr, ProtocolEventHandlers{ @@ -76,7 +78,7 @@ func (s *TrawlingService) Terminate() { } func (s *TrawlingService) trawl() { - for range time.Tick(1 * time.Second) { + for range time.Tick(s.interval) { // TODO // For some reason, we can't still detect congestion and this keeps increasing... // Disable for now. diff --git a/cmd/magneticod/dht/managers.go b/cmd/magneticod/dht/managers.go index 95990d0..32a370f 100644 --- a/cmd/magneticod/dht/managers.go +++ b/cmd/magneticod/dht/managers.go @@ -1,6 +1,9 @@ package dht -import "github.com/boramalper/magnetico/cmd/magneticod/dht/mainline" +import ( + "github.com/boramalper/magnetico/cmd/magneticod/dht/mainline" + "time" +) type TrawlingManager struct { // private @@ -8,7 +11,7 @@ type TrawlingManager struct { services []*mainline.TrawlingService } -func NewTrawlingManager(mlAddrs []string) *TrawlingManager { +func NewTrawlingManager(mlAddrs []string, interval time.Duration) *TrawlingManager { manager := new(TrawlingManager) manager.output = make(chan mainline.TrawlingResult) @@ -19,6 +22,7 @@ func NewTrawlingManager(mlAddrs []string) *TrawlingManager { manager.services = append(manager.services, mainline.NewTrawlingService( addr, 2000, + interval, mainline.TrawlingServiceEventHandlers{ OnResult: manager.onResult, }, diff --git a/cmd/magneticod/main.go b/cmd/magneticod/main.go index 188679b..1cfb1ed 100644 --- a/cmd/magneticod/main.go +++ b/cmd/magneticod/main.go @@ -1,7 +1,6 @@ package main import ( - "github.com/pkg/errors" "math/rand" "net" "os" @@ -9,34 +8,30 @@ import ( "runtime/pprof" "time" - "github.com/boramalper/magnetico/pkg/util" + "github.com/pkg/errors" + "github.com/jessevdk/go-flags" "go.uber.org/zap" "go.uber.org/zap/zapcore" + "github.com/boramalper/magnetico/pkg/util" + "github.com/boramalper/magnetico/cmd/magneticod/bittorrent/metadata" "github.com/boramalper/magnetico/cmd/magneticod/dht" "github.com/Wessie/appdirs" + "github.com/boramalper/magnetico/pkg/persistence" ) -type cmdFlags struct { - DatabaseURL string `long:"database" description:"URL of the database."` - - TrawlerMlAddrs []string `long:"trawler-ml-addr" description:"Address(es) to be used by trawling DHT (Mainline) nodes." default:"0.0.0.0:0"` - TrawlerMlInterval uint `long:"trawler-ml-interval" description:"Trawling interval in integer deciseconds (one tenth of a second)."` - - Verbose []bool `short:"v" long:"verbose" description:"Increases verbosity."` - Profile string `long:"profile" description:"Enable profiling." choice:"cpu" choice:"memory" choice:"trace"` -} - type opFlags struct { DatabaseURL string TrawlerMlAddrs []string TrawlerMlInterval time.Duration + LeechMaxN int + Verbosity int Profile string } @@ -51,7 +46,11 @@ func main() { zapcore.Lock(os.Stderr), loggerLevel, )) - defer logger.Sync() + defer func() { + if err := logger.Sync(); err != nil { + panic(err) + } + }() zap.ReplaceGlobals(logger) // opFlags is the "operational flags" @@ -84,8 +83,19 @@ func main() { if err != nil { zap.L().Panic("Could not open the cpu profile file!", zap.Error(err)) } - pprof.StartCPUProfile(file) - defer file.Close() + if err = pprof.StartCPUProfile(file); err != nil { + zap.L().Fatal("Could not start CPU profiling!", zap.Error(err)) + } + defer func() { + if err = file.Sync(); err != nil { + zap.L().Fatal("Could not sync profiling file!", zap.Error(err)) + } + }() + defer func() { + if err = file.Close(); err != nil { + zap.L().Fatal("Could not close profiling file!", zap.Error(err)) + } + }() defer pprof.StopCPUProfile() case "memory": @@ -99,7 +109,7 @@ func main() { rand.Seed(time.Now().UnixNano()) // Handle Ctrl-C gracefully. - interruptChan := make(chan os.Signal) + interruptChan := make(chan os.Signal, 1) signal.Notify(interruptChan, os.Interrupt) database, err := persistence.MakeDatabase(opFlags.DatabaseURL, logger) @@ -107,8 +117,8 @@ func main() { logger.Sugar().Fatalf("Could not open the database at `%s`", opFlags.DatabaseURL, zap.Error(err)) } - trawlingManager := dht.NewTrawlingManager(opFlags.TrawlerMlAddrs) - metadataSink := metadata.NewSink(2 * time.Minute) + trawlingManager := dht.NewTrawlingManager(opFlags.TrawlerMlAddrs, opFlags.TrawlerMlInterval) + metadataSink := metadata.NewSink(2*time.Minute, opFlags.LeechMaxN) zap.L().Debug("Peer ID", zap.ByteString("peerID", metadataSink.PeerID)) @@ -126,8 +136,8 @@ func main() { case md := <-metadataSink.Drain(): if err := database.AddNewTorrent(md.InfoHash, md.Name, md.Files); err != nil { - logger.Sugar().Fatalf("Could not add new torrent %x to the database", - md.InfoHash, zap.Error(err)) + zap.L().Fatal("Could not add new torrent to the database", + util.HexField("infohash", md.InfoHash), zap.Error(err)) } zap.L().Info("Fetched!", zap.String("name", md.Name), util.HexField("infoHash", md.InfoHash)) @@ -143,10 +153,21 @@ func main() { } func parseFlags() (*opFlags, error) { - opF := new(opFlags) - cmdF := new(cmdFlags) + var cmdF struct { + DatabaseURL string `long:"database" description:"URL of the database."` - _, err := flags.Parse(cmdF) + TrawlerMlAddrs []string `long:"trawler-ml-addr" description:"Address(es) to be used by trawling DHT (Mainline) nodes." default:"0.0.0.0:0"` + TrawlerMlInterval uint `long:"trawler-ml-interval" description:"Trawling interval in integer seconds."` + + LeechMaxN uint `long:"leech-max-n" description:"Maximum number of leeches." default:"1000"` + + Verbose []bool `short:"v" long:"verbose" description:"Increases verbosity."` + Profile string `long:"profile" description:"Enable profiling." choice:"cpu" choice:"memory" choice:"trace"` + } + + opF := new(opFlags) + + _, err := flags.Parse(&cmdF) if err != nil { return nil, err } @@ -170,11 +191,18 @@ func parseFlags() (*opFlags, error) { opF.TrawlerMlAddrs = cmdF.TrawlerMlAddrs } - // 1 decisecond = 100 milliseconds = 0.1 seconds if cmdF.TrawlerMlInterval == 0 { - opF.TrawlerMlInterval = time.Duration(1) * 100 * time.Millisecond + opF.TrawlerMlInterval = 1 * time.Second } else { - opF.TrawlerMlInterval = time.Duration(cmdF.TrawlerMlInterval) * 100 * time.Millisecond + opF.TrawlerMlInterval = time.Duration(cmdF.TrawlerMlInterval) * time.Second + } + + opF.LeechMaxN = int(cmdF.LeechMaxN) + if opF.LeechMaxN > 1000 { + zap.S().Warnf( + "Beware that on many systems max # of file descriptors per process is limited to 1024. " + + "Setting maximum number of leeches greater than 1k might cause \"too many open files\" errors!", + ) } opF.Verbosity = len(cmdF.Verbose)