max # of leeches limited, fixed parameters are now configurable

This commit is contained in:
Bora M. Alper 2018-12-30 08:24:14 +03:00
parent c30f69d1e9
commit 32fbe90604
No known key found for this signature in database
GPG Key ID: 8F1A9504E1BD114D
4 changed files with 74 additions and 32 deletions

View File

@ -5,11 +5,11 @@ import (
"sync" "sync"
"time" "time"
"github.com/boramalper/magnetico/pkg/util"
"go.uber.org/zap" "go.uber.org/zap"
"github.com/boramalper/magnetico/cmd/magneticod/dht/mainline" "github.com/boramalper/magnetico/cmd/magneticod/dht/mainline"
"github.com/boramalper/magnetico/pkg/persistence" "github.com/boramalper/magnetico/pkg/persistence"
"github.com/boramalper/magnetico/pkg/util"
) )
type Metadata struct { type Metadata struct {
@ -26,6 +26,7 @@ type Metadata struct {
type Sink struct { type Sink struct {
PeerID []byte PeerID []byte
deadline time.Duration deadline time.Duration
maxNLeeches int
drain chan Metadata drain chan Metadata
incomingInfoHashes map[[20]byte]struct{} incomingInfoHashes map[[20]byte]struct{}
incomingInfoHashesMx sync.Mutex incomingInfoHashesMx sync.Mutex
@ -67,11 +68,12 @@ func randomDigit() byte {
return byte(rand.Intn(max-min) + min) return byte(rand.Intn(max-min) + min)
} }
func NewSink(deadline time.Duration) *Sink { func NewSink(deadline time.Duration, maxNLeeches int) *Sink {
ms := new(Sink) ms := new(Sink)
ms.PeerID = randomID() ms.PeerID = randomID()
ms.deadline = deadline ms.deadline = deadline
ms.maxNLeeches = maxNLeeches
ms.drain = make(chan Metadata) ms.drain = make(chan Metadata)
ms.incomingInfoHashes = make(map[[20]byte]struct{}) ms.incomingInfoHashes = make(map[[20]byte]struct{})
ms.termination = make(chan interface{}) ms.termination = make(chan interface{})
@ -86,6 +88,11 @@ func (ms *Sink) Sink(res mainline.TrawlingResult) {
ms.incomingInfoHashesMx.Lock() ms.incomingInfoHashesMx.Lock()
defer ms.incomingInfoHashesMx.Unlock() defer ms.incomingInfoHashesMx.Unlock()
// cap the max # of leeches
if len(ms.incomingInfoHashes) >= ms.maxNLeeches {
return
}
if _, exists := ms.incomingInfoHashes[res.InfoHash]; exists { if _, exists := ms.incomingInfoHashes[res.InfoHash]; exists {
return return
} }
@ -135,6 +142,7 @@ func (ms *Sink) flush(result Metadata) {
func (ms *Sink) onLeechError(infoHash [20]byte, err error) { func (ms *Sink) onLeechError(infoHash [20]byte, err error) {
zap.L().Debug("leech error", util.HexField("infoHash", infoHash[:]), zap.Error(err)) zap.L().Debug("leech error", util.HexField("infoHash", infoHash[:]), zap.Error(err))
ms.incomingInfoHashesMx.Lock() ms.incomingInfoHashesMx.Lock()
delete(ms.incomingInfoHashes, infoHash) delete(ms.incomingInfoHashes, infoHash)
ms.incomingInfoHashesMx.Unlock() ms.incomingInfoHashesMx.Unlock()

View File

@ -18,6 +18,7 @@ type TrawlingService struct {
// Private // Private
protocol *Protocol protocol *Protocol
started bool started bool
interval time.Duration
eventHandlers TrawlingServiceEventHandlers eventHandlers TrawlingServiceEventHandlers
trueNodeID []byte trueNodeID []byte
@ -34,8 +35,9 @@ type TrawlingServiceEventHandlers struct {
OnResult func(TrawlingResult) OnResult func(TrawlingResult)
} }
func NewTrawlingService(laddr string, initialMaxNeighbors uint, eventHandlers TrawlingServiceEventHandlers) *TrawlingService { func NewTrawlingService(laddr string, initialMaxNeighbors uint, interval time.Duration, eventHandlers TrawlingServiceEventHandlers) *TrawlingService {
service := new(TrawlingService) service := new(TrawlingService)
service.interval = interval
service.protocol = NewProtocol( service.protocol = NewProtocol(
laddr, laddr,
ProtocolEventHandlers{ ProtocolEventHandlers{
@ -76,7 +78,7 @@ func (s *TrawlingService) Terminate() {
} }
func (s *TrawlingService) trawl() { func (s *TrawlingService) trawl() {
for range time.Tick(1 * time.Second) { for range time.Tick(s.interval) {
// TODO // TODO
// For some reason, we can't still detect congestion and this keeps increasing... // For some reason, we can't still detect congestion and this keeps increasing...
// Disable for now. // Disable for now.

View File

@ -1,6 +1,9 @@
package dht package dht
import "github.com/boramalper/magnetico/cmd/magneticod/dht/mainline" import (
"github.com/boramalper/magnetico/cmd/magneticod/dht/mainline"
"time"
)
type TrawlingManager struct { type TrawlingManager struct {
// private // private
@ -8,7 +11,7 @@ type TrawlingManager struct {
services []*mainline.TrawlingService services []*mainline.TrawlingService
} }
func NewTrawlingManager(mlAddrs []string) *TrawlingManager { func NewTrawlingManager(mlAddrs []string, interval time.Duration) *TrawlingManager {
manager := new(TrawlingManager) manager := new(TrawlingManager)
manager.output = make(chan mainline.TrawlingResult) manager.output = make(chan mainline.TrawlingResult)
@ -19,6 +22,7 @@ func NewTrawlingManager(mlAddrs []string) *TrawlingManager {
manager.services = append(manager.services, mainline.NewTrawlingService( manager.services = append(manager.services, mainline.NewTrawlingService(
addr, addr,
2000, 2000,
interval,
mainline.TrawlingServiceEventHandlers{ mainline.TrawlingServiceEventHandlers{
OnResult: manager.onResult, OnResult: manager.onResult,
}, },

View File

@ -1,7 +1,6 @@
package main package main
import ( import (
"github.com/pkg/errors"
"math/rand" "math/rand"
"net" "net"
"os" "os"
@ -9,34 +8,30 @@ import (
"runtime/pprof" "runtime/pprof"
"time" "time"
"github.com/boramalper/magnetico/pkg/util" "github.com/pkg/errors"
"github.com/jessevdk/go-flags" "github.com/jessevdk/go-flags"
"go.uber.org/zap" "go.uber.org/zap"
"go.uber.org/zap/zapcore" "go.uber.org/zap/zapcore"
"github.com/boramalper/magnetico/pkg/util"
"github.com/boramalper/magnetico/cmd/magneticod/bittorrent/metadata" "github.com/boramalper/magnetico/cmd/magneticod/bittorrent/metadata"
"github.com/boramalper/magnetico/cmd/magneticod/dht" "github.com/boramalper/magnetico/cmd/magneticod/dht"
"github.com/Wessie/appdirs" "github.com/Wessie/appdirs"
"github.com/boramalper/magnetico/pkg/persistence" "github.com/boramalper/magnetico/pkg/persistence"
) )
type cmdFlags struct {
DatabaseURL string `long:"database" description:"URL of the database."`
TrawlerMlAddrs []string `long:"trawler-ml-addr" description:"Address(es) to be used by trawling DHT (Mainline) nodes." default:"0.0.0.0:0"`
TrawlerMlInterval uint `long:"trawler-ml-interval" description:"Trawling interval in integer deciseconds (one tenth of a second)."`
Verbose []bool `short:"v" long:"verbose" description:"Increases verbosity."`
Profile string `long:"profile" description:"Enable profiling." choice:"cpu" choice:"memory" choice:"trace"`
}
type opFlags struct { type opFlags struct {
DatabaseURL string DatabaseURL string
TrawlerMlAddrs []string TrawlerMlAddrs []string
TrawlerMlInterval time.Duration TrawlerMlInterval time.Duration
LeechMaxN int
Verbosity int Verbosity int
Profile string Profile string
} }
@ -51,7 +46,11 @@ func main() {
zapcore.Lock(os.Stderr), zapcore.Lock(os.Stderr),
loggerLevel, loggerLevel,
)) ))
defer logger.Sync() defer func() {
if err := logger.Sync(); err != nil {
panic(err)
}
}()
zap.ReplaceGlobals(logger) zap.ReplaceGlobals(logger)
// opFlags is the "operational flags" // opFlags is the "operational flags"
@ -84,8 +83,19 @@ func main() {
if err != nil { if err != nil {
zap.L().Panic("Could not open the cpu profile file!", zap.Error(err)) zap.L().Panic("Could not open the cpu profile file!", zap.Error(err))
} }
pprof.StartCPUProfile(file) if err = pprof.StartCPUProfile(file); err != nil {
defer file.Close() zap.L().Fatal("Could not start CPU profiling!", zap.Error(err))
}
defer func() {
if err = file.Sync(); err != nil {
zap.L().Fatal("Could not sync profiling file!", zap.Error(err))
}
}()
defer func() {
if err = file.Close(); err != nil {
zap.L().Fatal("Could not close profiling file!", zap.Error(err))
}
}()
defer pprof.StopCPUProfile() defer pprof.StopCPUProfile()
case "memory": case "memory":
@ -99,7 +109,7 @@ func main() {
rand.Seed(time.Now().UnixNano()) rand.Seed(time.Now().UnixNano())
// Handle Ctrl-C gracefully. // Handle Ctrl-C gracefully.
interruptChan := make(chan os.Signal) interruptChan := make(chan os.Signal, 1)
signal.Notify(interruptChan, os.Interrupt) signal.Notify(interruptChan, os.Interrupt)
database, err := persistence.MakeDatabase(opFlags.DatabaseURL, logger) database, err := persistence.MakeDatabase(opFlags.DatabaseURL, logger)
@ -107,8 +117,8 @@ func main() {
logger.Sugar().Fatalf("Could not open the database at `%s`", opFlags.DatabaseURL, zap.Error(err)) logger.Sugar().Fatalf("Could not open the database at `%s`", opFlags.DatabaseURL, zap.Error(err))
} }
trawlingManager := dht.NewTrawlingManager(opFlags.TrawlerMlAddrs) trawlingManager := dht.NewTrawlingManager(opFlags.TrawlerMlAddrs, opFlags.TrawlerMlInterval)
metadataSink := metadata.NewSink(2 * time.Minute) metadataSink := metadata.NewSink(2*time.Minute, opFlags.LeechMaxN)
zap.L().Debug("Peer ID", zap.ByteString("peerID", metadataSink.PeerID)) zap.L().Debug("Peer ID", zap.ByteString("peerID", metadataSink.PeerID))
@ -126,8 +136,8 @@ func main() {
case md := <-metadataSink.Drain(): case md := <-metadataSink.Drain():
if err := database.AddNewTorrent(md.InfoHash, md.Name, md.Files); err != nil { if err := database.AddNewTorrent(md.InfoHash, md.Name, md.Files); err != nil {
logger.Sugar().Fatalf("Could not add new torrent %x to the database", zap.L().Fatal("Could not add new torrent to the database",
md.InfoHash, zap.Error(err)) util.HexField("infohash", md.InfoHash), zap.Error(err))
} }
zap.L().Info("Fetched!", zap.String("name", md.Name), util.HexField("infoHash", md.InfoHash)) zap.L().Info("Fetched!", zap.String("name", md.Name), util.HexField("infoHash", md.InfoHash))
@ -143,10 +153,21 @@ func main() {
} }
func parseFlags() (*opFlags, error) { func parseFlags() (*opFlags, error) {
opF := new(opFlags) var cmdF struct {
cmdF := new(cmdFlags) DatabaseURL string `long:"database" description:"URL of the database."`
_, err := flags.Parse(cmdF) TrawlerMlAddrs []string `long:"trawler-ml-addr" description:"Address(es) to be used by trawling DHT (Mainline) nodes." default:"0.0.0.0:0"`
TrawlerMlInterval uint `long:"trawler-ml-interval" description:"Trawling interval in integer seconds."`
LeechMaxN uint `long:"leech-max-n" description:"Maximum number of leeches." default:"1000"`
Verbose []bool `short:"v" long:"verbose" description:"Increases verbosity."`
Profile string `long:"profile" description:"Enable profiling." choice:"cpu" choice:"memory" choice:"trace"`
}
opF := new(opFlags)
_, err := flags.Parse(&cmdF)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -170,11 +191,18 @@ func parseFlags() (*opFlags, error) {
opF.TrawlerMlAddrs = cmdF.TrawlerMlAddrs opF.TrawlerMlAddrs = cmdF.TrawlerMlAddrs
} }
// 1 decisecond = 100 milliseconds = 0.1 seconds
if cmdF.TrawlerMlInterval == 0 { if cmdF.TrawlerMlInterval == 0 {
opF.TrawlerMlInterval = time.Duration(1) * 100 * time.Millisecond opF.TrawlerMlInterval = 1 * time.Second
} else { } else {
opF.TrawlerMlInterval = time.Duration(cmdF.TrawlerMlInterval) * 100 * time.Millisecond opF.TrawlerMlInterval = time.Duration(cmdF.TrawlerMlInterval) * time.Second
}
opF.LeechMaxN = int(cmdF.LeechMaxN)
if opF.LeechMaxN > 1000 {
zap.S().Warnf(
"Beware that on many systems max # of file descriptors per process is limited to 1024. " +
"Setting maximum number of leeches greater than 1k might cause \"too many open files\" errors!",
)
} }
opF.Verbosity = len(cmdF.Verbose) opF.Verbosity = len(cmdF.Verbose)