From 1fdfa131aaeae371a1702ecd02fc8933287ce44a Mon Sep 17 00:00:00 2001 From: "Bora M. Alper" Date: Sun, 19 May 2019 00:07:37 +0100 Subject: [PATCH] v0.8.0 featuring BEP 51 "DHT Infohash Indexing" --- cmd/magneticod/bittorrent/metadata/leech.go | 19 +- cmd/magneticod/bittorrent/metadata/sink.go | 85 ++++--- .../dht/mainline/indexingService.go | 32 +-- cmd/magneticod/dht/mainline/protocol.go | 4 +- cmd/magneticod/dht/mainline/service.go | 5 - cmd/magneticod/dht/mainline/transport.go | 5 +- .../dht/mainline/trawlingService.go | 228 ------------------ cmd/magneticod/dht/managers.go | 56 ++--- cmd/magneticod/main.go | 63 ++--- cmd/magneticow/main.go | 4 +- 10 files changed, 133 insertions(+), 368 deletions(-) delete mode 100644 cmd/magneticod/dht/mainline/service.go delete mode 100644 cmd/magneticod/dht/mainline/trawlingService.go diff --git a/cmd/magneticod/bittorrent/metadata/leech.go b/cmd/magneticod/bittorrent/metadata/leech.go index fad486a..db44dcc 100644 --- a/cmd/magneticod/bittorrent/metadata/leech.go +++ b/cmd/magneticod/bittorrent/metadata/leech.go @@ -234,10 +234,11 @@ func (l *Leech) readUmMessage() ([]byte, error) { func (l *Leech) connect(deadline time.Time) error { var err error - l.conn, err = net.DialTCP("tcp4", nil, l.peerAddr) + x, err := net.DialTimeout("tcp4", l.peerAddr.String(), 1*time.Second) if err != nil { return errors.Wrap(err, "dial") } + l.conn = x.(*net.TCPConn) // > If sec == 0, operating system discards any unsent or unacknowledged data [after Close() // > has been called]. @@ -249,22 +250,6 @@ func (l *Leech) connect(deadline time.Time) error { return errors.Wrap(err, "SetLinger") } - err = l.conn.SetKeepAlive(true) - if err != nil { - if err := l.conn.Close(); err != nil { - zap.L().Panic("couldn't close leech connection!", zap.Error(err)) - } - return errors.Wrap(err, "SetKeepAlive") - } - - err = l.conn.SetKeepAlivePeriod(10 * time.Second) - if err != nil { - if err := l.conn.Close(); err != nil { - zap.L().Panic("couldn't close leech connection!", zap.Error(err)) - } - return errors.Wrap(err, "SetKeepAlivePeriod") - } - err = l.conn.SetNoDelay(true) if err != nil { if err := l.conn.Close(); err != nil { diff --git a/cmd/magneticod/bittorrent/metadata/sink.go b/cmd/magneticod/bittorrent/metadata/sink.go index dab24b6..cec518e 100644 --- a/cmd/magneticod/bittorrent/metadata/sink.go +++ b/cmd/magneticod/bittorrent/metadata/sink.go @@ -2,6 +2,7 @@ package metadata import ( "math/rand" + "net" "sync" "time" @@ -24,14 +25,18 @@ type Metadata struct { } type Sink struct { - PeerID []byte - deadline time.Duration - maxNLeeches int - drain chan Metadata - incomingInfoHashes map[[20]byte]struct{} + PeerID []byte + deadline time.Duration + maxNLeeches int + drain chan Metadata + + incomingInfoHashes map[[20]byte][]net.TCPAddr incomingInfoHashesMx sync.Mutex - terminated bool - termination chan interface{} + + terminated bool + termination chan interface{} + + deleted int } func randomID() []byte { @@ -52,7 +57,7 @@ func randomID() []byte { * - Last two digits for the minor version number * - Patch version number is not encoded. */ - prefix := []byte("-MC0007-") + prefix := []byte("-MC0008-") var rando []byte for i := 20 - len(prefix); i >= 0; i-- { @@ -74,17 +79,28 @@ func NewSink(deadline time.Duration, maxNLeeches int) *Sink { ms.PeerID = randomID() ms.deadline = deadline ms.maxNLeeches = maxNLeeches - ms.drain = make(chan Metadata) - ms.incomingInfoHashes = make(map[[20]byte]struct{}) + ms.drain = make(chan Metadata, 10) + ms.incomingInfoHashes = make(map[[20]byte][]net.TCPAddr) ms.termination = make(chan interface{}) + go func() { + for range time.Tick(deadline) { + ms.incomingInfoHashesMx.Lock() + l := len(ms.incomingInfoHashes) + ms.incomingInfoHashesMx.Unlock() + zap.L().Info("Sink status", + zap.Int("activeLeeches", l), + zap.Int("nDeleted", ms.deleted), + zap.Int("drainQueue", len(ms.drain)), + ) + ms.deleted = 0 + } + }() + return ms } func (ms *Sink) Sink(res dht.Result) { - infoHash := res.InfoHash() - peerAddr := res.PeerAddr() - if ms.terminated { zap.L().Panic("Trying to Sink() an already closed Sink!") } @@ -96,23 +112,22 @@ func (ms *Sink) Sink(res dht.Result) { return } + infoHash := res.InfoHash() + peerAddrs := res.PeerAddrs() + if _, exists := ms.incomingInfoHashes[infoHash]; exists { return + } else if len(peerAddrs) > 0 { + peer := peerAddrs[0] + ms.incomingInfoHashes[infoHash] = peerAddrs[1:] + + go NewLeech(infoHash, &peer, ms.PeerID, LeechEventHandlers{ + OnSuccess: ms.flush, + OnError: ms.onLeechError, + }).Do(time.Now().Add(ms.deadline)) } - // BEWARE! - // Although not crucial, the assumption is that Sink.Sink() will be called by only one - // goroutine (i.e. it's not thread-safe), lest there might be a race condition between where we - // check whether res.infoHash exists in the ms.incomingInfoHashes, and where we add the infoHash - // to the incomingInfoHashes at the end of this function. zap.L().Debug("Sunk!", zap.Int("leeches", len(ms.incomingInfoHashes)), util.HexField("infoHash", infoHash[:])) - - go NewLeech(infoHash, peerAddr, ms.PeerID, LeechEventHandlers{ - OnSuccess: ms.flush, - OnError: ms.onLeechError, - }).Do(time.Now().Add(ms.deadline)) - - ms.incomingInfoHashes[infoHash] = struct{}{} } func (ms *Sink) Drain() <-chan Metadata { @@ -136,17 +151,29 @@ func (ms *Sink) flush(result Metadata) { ms.drain <- result // Delete the infoHash from ms.incomingInfoHashes ONLY AFTER once we've flushed the // metadata! + ms.incomingInfoHashesMx.Lock() + defer ms.incomingInfoHashesMx.Unlock() + var infoHash [20]byte copy(infoHash[:], result.InfoHash) - ms.incomingInfoHashesMx.Lock() delete(ms.incomingInfoHashes, infoHash) - ms.incomingInfoHashesMx.Unlock() } func (ms *Sink) onLeechError(infoHash [20]byte, err error) { zap.L().Debug("leech error", util.HexField("infoHash", infoHash[:]), zap.Error(err)) ms.incomingInfoHashesMx.Lock() - delete(ms.incomingInfoHashes, infoHash) - ms.incomingInfoHashesMx.Unlock() + defer ms.incomingInfoHashesMx.Unlock() + + if len(ms.incomingInfoHashes[infoHash]) > 0 { + peer := ms.incomingInfoHashes[infoHash][0] + ms.incomingInfoHashes[infoHash] = ms.incomingInfoHashes[infoHash][1:] + go NewLeech(infoHash, &peer, ms.PeerID, LeechEventHandlers{ + OnSuccess: ms.flush, + OnError: ms.onLeechError, + }).Do(time.Now().Add(ms.deadline)) + } else { + ms.deleted++ + delete(ms.incomingInfoHashes, infoHash) + } } diff --git a/cmd/magneticod/dht/mainline/indexingService.go b/cmd/magneticod/dht/mainline/indexingService.go index 67bb81d..b40387e 100644 --- a/cmd/magneticod/dht/mainline/indexingService.go +++ b/cmd/magneticod/dht/mainline/indexingService.go @@ -34,16 +34,16 @@ type IndexingServiceEventHandlers struct { } type IndexingResult struct { - infoHash [20]byte - peerAddr *net.TCPAddr + infoHash [20]byte + peerAddrs []net.TCPAddr } func (ir IndexingResult) InfoHash() [20]byte { return ir.infoHash } -func (ir IndexingResult) PeerAddr() *net.TCPAddr { - return ir.peerAddr +func (ir IndexingResult) PeerAddrs() []net.TCPAddr { + return ir.peerAddrs } func NewIndexingService(laddr string, interval time.Duration, eventHandlers IndexingServiceEventHandlers) *IndexingService { @@ -86,11 +86,6 @@ func (is *IndexingService) Terminate() { func (is *IndexingService) index() { for range time.Tick(is.interval) { - // TODO - // For some reason, we can't still detect congestion and this keeps increasing... - // Disable for now. - // s.maxNeighbors = uint(float32(s.maxNeighbors) * 1.001) - is.routingTableMutex.Lock() if len(is.routingTable) == 0 { is.bootstrap() @@ -190,15 +185,22 @@ func (is *IndexingService) onGetPeersResponse(msg *Message, addr *net.UDPAddr) { return } + peerAddrs := make([]net.TCPAddr, 0) for _, peer := range msg.R.Values { - is.eventHandlers.OnResult(IndexingResult{ - infoHash: infoHash, - peerAddr: &net.TCPAddr{ - IP: peer.IP, - Port: peer.Port, - }, + if peer.Port == 0 { + continue + } + + peerAddrs = append(peerAddrs, net.TCPAddr{ + IP: peer.IP, + Port: peer.Port, }) } + + is.eventHandlers.OnResult(IndexingResult{ + infoHash: infoHash, + peerAddrs: peerAddrs, + }) } func (is *IndexingService) onSampleInfohashesResponse(msg *Message, addr *net.UDPAddr) { diff --git a/cmd/magneticod/dht/mainline/protocol.go b/cmd/magneticod/dht/mainline/protocol.go index fab7d80..9a88ac3 100644 --- a/cmd/magneticod/dht/mainline/protocol.go +++ b/cmd/magneticod/dht/mainline/protocol.go @@ -225,8 +225,8 @@ func NewSampleInfohashesQuery(id []byte, t []byte, target []byte) *Message { Y: "q", T: t, Q: "sample_infohashes", - A: QueryArguments { - ID: id, + A: QueryArguments{ + ID: id, Target: target, }, } diff --git a/cmd/magneticod/dht/mainline/service.go b/cmd/magneticod/dht/mainline/service.go deleted file mode 100644 index c7a9752..0000000 --- a/cmd/magneticod/dht/mainline/service.go +++ /dev/null @@ -1,5 +0,0 @@ -package mainline - -type Service interface { - // TODO: develop a service interface to be used by the manager -} diff --git a/cmd/magneticod/dht/mainline/transport.go b/cmd/magneticod/dht/mainline/transport.go index 2902160..0554237 100644 --- a/cmd/magneticod/dht/mainline/transport.go +++ b/cmd/magneticod/dht/mainline/transport.go @@ -94,12 +94,11 @@ func (t *Transport) readMessages() { zap.L().Warn("READ CONGESTION!", zap.Error(err)) t.onCongestion() } else if err != nil { - // TODO: isn't there a more reliable way to detect if UDPConn is closed? zap.L().Warn("Could NOT read an UDP packet!", zap.Error(err)) } if n == 0 { - /* Datagram sockets in various domains (e.g., the UNIX and Internet domains) permit + /* Datagram sockets in various domains (e.g., the UNIX and Internet domains) permit * zero-length datagrams. When such a datagram is received, the return value (n) is 0. */ continue @@ -150,7 +149,7 @@ func (t *Transport) WriteMessages(msg *Message, addr *net.UDPAddr) { * * Source: https://docs.python.org/3/library/asyncio-protocol.html#flow-control-callbacks */ - //zap.L().Warn("WRITE CONGESTION!", zap.Error(err)) + zap.L().Warn("WRITE CONGESTION!", zap.Error(err)) if t.onCongestion != nil { t.onCongestion() } diff --git a/cmd/magneticod/dht/mainline/trawlingService.go b/cmd/magneticod/dht/mainline/trawlingService.go deleted file mode 100644 index 08e44cd..0000000 --- a/cmd/magneticod/dht/mainline/trawlingService.go +++ /dev/null @@ -1,228 +0,0 @@ -package mainline - -import ( - "math/rand" - "net" - "sync" - "time" - - "go.uber.org/zap" -) - -type TrawlingService struct { - // Private - protocol *Protocol - started bool - interval time.Duration - eventHandlers TrawlingServiceEventHandlers - - trueNodeID []byte - // []byte type would be a much better fit for the keys but unfortunately (and quite - // understandably) slices cannot be used as keys (since they are not hashable), and using arrays - // (or even the conversion between each other) is a pain; hence map[string]net.UDPAddr - // ^~~~~~ - routingTable map[string]*net.UDPAddr - routingTableMutex *sync.Mutex - maxNeighbors uint -} - -type TrawlingServiceEventHandlers struct { - OnResult func(TrawlingResult) -} - -type TrawlingResult struct { - infoHash [20]byte - peerAddr *net.TCPAddr -} - -func (tr TrawlingResult) InfoHash() [20]byte { - return tr.infoHash -} - -func (tr TrawlingResult) PeerAddr() *net.TCPAddr { - return tr.peerAddr -} - -func NewTrawlingService(laddr string, initialMaxNeighbors uint, interval time.Duration, eventHandlers TrawlingServiceEventHandlers) *TrawlingService { - service := new(TrawlingService) - service.interval = interval - service.protocol = NewProtocol( - laddr, - ProtocolEventHandlers{ - OnGetPeersQuery: service.onGetPeersQuery, - OnAnnouncePeerQuery: service.onAnnouncePeerQuery, - OnFindNodeResponse: service.onFindNodeResponse, - OnCongestion: service.onCongestion, - }, - ) - service.trueNodeID = make([]byte, 20) - service.routingTable = make(map[string]*net.UDPAddr) - service.routingTableMutex = new(sync.Mutex) - service.eventHandlers = eventHandlers - service.maxNeighbors = initialMaxNeighbors - - _, err := rand.Read(service.trueNodeID) - if err != nil { - zap.L().Panic("Could NOT generate random bytes for node ID!") - } - - return service -} - -func (s *TrawlingService) Start() { - if s.started { - zap.L().Panic("Attempting to Start() a mainline/TrawlingService that has been already started! (Programmer error.)") - } - s.started = true - - s.protocol.Start() - go s.trawl() - - zap.L().Info("Trawling Service started!") -} - -func (s *TrawlingService) Terminate() { - s.protocol.Terminate() -} - -func (s *TrawlingService) trawl() { - for range time.Tick(s.interval) { - // TODO - // For some reason, we can't still detect congestion and this keeps increasing... - // Disable for now. - // s.maxNeighbors = uint(float32(s.maxNeighbors) * 1.001) - - s.routingTableMutex.Lock() - if len(s.routingTable) == 0 { - s.bootstrap() - } else { - zap.L().Info("Latest status:", zap.Int("n", len(s.routingTable)), - zap.Uint("maxNeighbors", s.maxNeighbors)) - s.findNeighbors() - s.routingTable = make(map[string]*net.UDPAddr) - } - s.routingTableMutex.Unlock() - } -} - -func (s *TrawlingService) bootstrap() { - bootstrappingNodes := []string{ - "router.bittorrent.com:6881", - "dht.transmissionbt.com:6881", - "dht.libtorrent.org:25401", - } - zap.L().Info("Bootstrapping as routing table is empty...") - for _, node := range bootstrappingNodes { - target := make([]byte, 20) - _, err := rand.Read(target) - if err != nil { - zap.L().Panic("Could NOT generate random bytes during bootstrapping!") - } - - addr, err := net.ResolveUDPAddr("udp", node) - if err != nil { - zap.L().Error("Could NOT resolve (UDP) address of the bootstrapping node!", - zap.String("node", node)) - continue - } - - s.protocol.SendMessage(NewFindNodeQuery(s.trueNodeID, target), addr) - } -} - -func (s *TrawlingService) findNeighbors() { - target := make([]byte, 20) - for nodeID, addr := range s.routingTable { - _, err := rand.Read(target) - if err != nil { - zap.L().Panic("Could NOT generate random bytes during bootstrapping!") - } - - s.protocol.SendMessage( - NewFindNodeQuery(append([]byte(nodeID[:15]), s.trueNodeID[:5]...), target), - addr, - ) - } -} - -func (s *TrawlingService) onGetPeersQuery(query *Message, addr *net.UDPAddr) { - s.protocol.SendMessage( - NewGetPeersResponseWithNodes( - query.T, - append(query.A.ID[:15], s.trueNodeID[:5]...), - s.protocol.CalculateToken(net.ParseIP(addr.String()))[:], - []CompactNodeInfo{}, - ), - addr, - ) -} - -func (s *TrawlingService) onAnnouncePeerQuery(query *Message, addr *net.UDPAddr) { - /* BEP 5 - * - * There is an optional argument called implied_port which value is either 0 or 1. If it is - * present and non-zero, the port argument should be ignored and the source port of the UDP - * packet should be used as the peer's port instead. This is useful for peers behind a NAT that - * may not know their external port, and supporting uTP, they accept incoming connections on the - * same port as the DHT port. - */ - var peerPort int - if query.A.ImpliedPort != 0 { - // TODO: Peer uses uTP, ignore for now - // return - peerPort = addr.Port - } else { - peerPort = query.A.Port - // return - } - - // TODO: It looks ugly, am I doing it right? --Bora - // (Converting slices to arrays in Go shouldn't have been such a pain...) - var infoHash [20]byte - copy(infoHash[:], query.A.InfoHash) - s.eventHandlers.OnResult(TrawlingResult{ - infoHash: infoHash, - peerAddr: &net.TCPAddr{ - IP: addr.IP, - Port: peerPort, - }, - }) - - s.protocol.SendMessage( - NewAnnouncePeerResponse( - query.T, - append(query.A.ID[:15], s.trueNodeID[:5]...), - ), - addr, - ) -} - -func (s *TrawlingService) onFindNodeResponse(response *Message, addr *net.UDPAddr) { - s.routingTableMutex.Lock() - defer s.routingTableMutex.Unlock() - - for _, node := range response.R.Nodes { - if uint(len(s.routingTable)) >= s.maxNeighbors { - break - } - if node.Addr.Port == 0 { // Ignore nodes who "use" port 0. - continue - } - - s.routingTable[string(node.ID)] = &node.Addr - } -} - -func (s *TrawlingService) onCongestion() { - /* The Congestion Prevention Strategy: - * - * In case of congestion, decrease the maximum number of nodes to the 90% of the current value. - */ - if s.maxNeighbors < 200 { - zap.L().Warn("Max. number of neighbours are < 200 and there is still congestion!" + - "(check your network connection if this message recurs)") - return - } - - s.maxNeighbors = uint(float32(s.maxNeighbors) * 0.9) -} diff --git a/cmd/magneticod/dht/managers.go b/cmd/magneticod/dht/managers.go index aed2efb..1c655d6 100644 --- a/cmd/magneticod/dht/managers.go +++ b/cmd/magneticod/dht/managers.go @@ -9,39 +9,27 @@ import ( "github.com/boramalper/magnetico/cmd/magneticod/dht/mainline" ) -type TrawlingManager struct { - // private - output chan Result - trawlingServices []*mainline.TrawlingService - indexingServices []*mainline.IndexingService +type Service interface { + Start() + Terminate() } type Result interface { InfoHash() [20]byte - PeerAddr() *net.TCPAddr + PeerAddrs() []net.TCPAddr } -func NewTrawlingManager(tsAddrs []string, isAddrs []string, interval time.Duration) *TrawlingManager { - manager := new(TrawlingManager) +type Manager struct { + output chan Result + indexingServices []Service +} + +func NewTrawlingManager(addrs []string, interval time.Duration) *Manager { + manager := new(Manager) manager.output = make(chan Result, 20) - // Trawling Services - for _, addr := range tsAddrs { - service := mainline.NewTrawlingService( - addr, - 2000, - interval, - mainline.TrawlingServiceEventHandlers{ - OnResult: manager.onTrawlingResult, - }, - ) - manager.trawlingServices = append(manager.trawlingServices, service) - service.Start() - } - - // Indexing Services - for _, addr := range isAddrs { - service := mainline.NewIndexingService(addr, 2 * time.Second, mainline.IndexingServiceEventHandlers{ + for _, addr := range addrs { + service := mainline.NewIndexingService(addr, 2*time.Second, mainline.IndexingServiceEventHandlers{ OnResult: manager.onIndexingResult, }) manager.indexingServices = append(manager.indexingServices, service) @@ -51,30 +39,20 @@ func NewTrawlingManager(tsAddrs []string, isAddrs []string, interval time.Durati return manager } -func (m *TrawlingManager) onTrawlingResult(res mainline.TrawlingResult) { +func (m *Manager) onIndexingResult(res mainline.IndexingResult) { select { case m.output <- res: default: - // TODO: should be a warn - zap.L().Debug("DHT manager output ch is full, result dropped!") - } -} - -func (m *TrawlingManager) onIndexingResult(res mainline.IndexingResult) { - select { - case m.output <- res: - default: - // TODO: should be a warn zap.L().Debug("DHT manager output ch is full, idx result dropped!") } } -func (m *TrawlingManager) Output() <-chan Result { +func (m *Manager) Output() <-chan Result { return m.output } -func (m *TrawlingManager) Terminate() { - for _, service := range m.trawlingServices { +func (m *Manager) Terminate() { + for _, service := range m.indexingServices { service.Terminate() } } diff --git a/cmd/magneticod/main.go b/cmd/magneticod/main.go index 3102544..cdead3f 100644 --- a/cmd/magneticod/main.go +++ b/cmd/magneticod/main.go @@ -5,6 +5,7 @@ import ( "net" "os" "os/signal" + "runtime" "runtime/pprof" "time" @@ -27,8 +28,8 @@ import ( type opFlags struct { DatabaseURL string - TrawlerMlAddrs []string - TrawlerMlInterval time.Duration + IndexerAddrs []string + IndexerInterval time.Duration LeechMaxN int @@ -46,11 +47,7 @@ func main() { zapcore.Lock(os.Stderr), loggerLevel, )) - defer func() { - if err := logger.Sync(); err != nil { - panic(err) - } - }() + defer logger.Sync() zap.ReplaceGlobals(logger) // opFlags is the "operational flags" @@ -60,8 +57,8 @@ func main() { return } - zap.L().Info("magneticod v0.7.2 has been started.") - zap.L().Info("Copyright (C) 2018 Mert Bora ALPER .") + zap.L().Info("magneticod v0.8.0 has been started.") + zap.L().Info("Copyright (C) 2017-2019 Mert Bora ALPER .") zap.L().Info("Dedicated to Cemile Binay, in whose hands I thrived.") zap.S().Infof("Compiled on %s", compiledOn) @@ -77,8 +74,7 @@ func main() { zap.ReplaceGlobals(logger) - switch opFlags.Profile { - case "cpu": + if opFlags.Profile == "cpu" { file, err := os.OpenFile("magneticod_cpu.prof", os.O_CREATE|os.O_WRONLY, 0755) if err != nil { zap.L().Panic("Could not open the cpu profile file!", zap.Error(err)) @@ -97,12 +93,6 @@ func main() { } }() defer pprof.StopCPUProfile() - - case "memory": - zap.L().Panic("NOT IMPLEMENTED") - - case "trace": - zap.L().Panic("NOT IMPLEMENTED") } // Initialise the random number generator @@ -117,8 +107,8 @@ func main() { logger.Sugar().Fatalf("Could not open the database at `%s`", opFlags.DatabaseURL, zap.Error(err)) } - trawlingManager := dht.NewTrawlingManager(nil, []string{"0.0.0.0:0"}, opFlags.TrawlerMlInterval) - metadataSink := metadata.NewSink(2*time.Minute, opFlags.LeechMaxN) + trawlingManager := dht.NewTrawlingManager(opFlags.IndexerAddrs, opFlags.IndexerInterval) + metadataSink := metadata.NewSink(5*time.Second, opFlags.LeechMaxN) zap.L().Debug("Peer ID", zap.ByteString("peerID", metadataSink.PeerID)) @@ -144,6 +134,23 @@ func main() { zap.L().Info("Fetched!", zap.String("name", md.Name), util.HexField("infoHash", md.InfoHash)) case <-interruptChan: + if opFlags.Profile == "heap" { + file, err := os.OpenFile("magneticod_heap.prof", os.O_CREATE|os.O_WRONLY, 0755) + if err != nil { + zap.L().Panic("Could not open the memory profile file!", zap.Error(err)) + } + runtime.GC() // get up-to-date statistics + if err = pprof.WriteHeapProfile(file); err != nil { + zap.L().Fatal("Could not write heap profile!", zap.Error(err)) + } + if err = file.Sync(); err != nil { + zap.L().Fatal("Could not sync profiling file!", zap.Error(err)) + } + if err = file.Close(); err != nil { + zap.L().Fatal("Could not close profiling file!", zap.Error(err)) + } + } + trawlingManager.Terminate() stopped = true } @@ -158,13 +165,13 @@ func parseFlags() (*opFlags, error) { var cmdF struct { DatabaseURL string `long:"database" description:"URL of the database."` - TrawlerMlAddrs []string `long:"trawler-ml-addr" description:"Address(es) to be used by trawling DHT (Mainline) nodes." default:"0.0.0.0:0"` - TrawlerMlInterval uint `long:"trawler-ml-interval" description:"Trawling interval in integer seconds."` + IndexerAddrs []string `long:"indexer-addr" description:"Address(es) to be used by indexing DHT nodes." default:"0.0.0.0:0"` + IndexerInterval uint `long:"indexer-interval" description:"Indexing interval in integer seconds."` - LeechMaxN uint `long:"leech-max-n" description:"Maximum number of leeches." default:"100"` + LeechMaxN uint `long:"leech-max-n" description:"Maximum number of leeches." default:"200"` Verbose []bool `short:"v" long:"verbose" description:"Increases verbosity."` - Profile string `long:"profile" description:"Enable profiling." choice:"cpu" choice:"memory" choice:"trace"` + Profile string `long:"profile" description:"Enable profiling." choice:"cpu" choice:"heap"` } opF := new(opFlags) @@ -187,16 +194,16 @@ func parseFlags() (*opFlags, error) { opF.DatabaseURL = cmdF.DatabaseURL } - if err = checkAddrs(cmdF.TrawlerMlAddrs); err != nil { + if err = checkAddrs(cmdF.IndexerAddrs); err != nil { zap.S().Fatalf("Of argument (list) `trawler-ml-addr`", zap.Error(err)) } else { - opF.TrawlerMlAddrs = cmdF.TrawlerMlAddrs + opF.IndexerAddrs = cmdF.IndexerAddrs } - if cmdF.TrawlerMlInterval == 0 { - opF.TrawlerMlInterval = 1 * time.Second + if cmdF.IndexerInterval == 0 { + opF.IndexerInterval = 2 * time.Second } else { - opF.TrawlerMlInterval = time.Duration(cmdF.TrawlerMlInterval) * time.Second + opF.IndexerInterval = time.Duration(cmdF.IndexerInterval) * time.Second } opF.LeechMaxN = int(cmdF.LeechMaxN) diff --git a/cmd/magneticow/main.go b/cmd/magneticow/main.go index f01319e..a3beffa 100644 --- a/cmd/magneticow/main.go +++ b/cmd/magneticow/main.go @@ -62,8 +62,8 @@ func main() { defer logger.Sync() zap.ReplaceGlobals(logger) - zap.L().Info("magneticow v0.7.2 has been started.") - zap.L().Info("Copyright (C) 2018 Mert Bora ALPER .") + zap.L().Info("magneticow v0.8 has been started.") + zap.L().Info("Copyright (C) 2017-2019 Mert Bora ALPER .") zap.L().Info("Dedicated to Cemile Binay, in whose hands I thrived.") zap.S().Infof("Compiled on %s", compiledOn)