diff --git a/cmd/magneticod/bittorrent/metadata/sink.go b/cmd/magneticod/bittorrent/metadata/sink.go index 386a1d3..518ecd0 100644 --- a/cmd/magneticod/bittorrent/metadata/sink.go +++ b/cmd/magneticod/bittorrent/metadata/sink.go @@ -7,7 +7,7 @@ import ( "go.uber.org/zap" - "github.com/boramalper/magnetico/cmd/magneticod/dht/mainline" + "github.com/boramalper/magnetico/cmd/magneticod/dht" "github.com/boramalper/magnetico/pkg/persistence" "github.com/boramalper/magnetico/pkg/util" ) @@ -81,7 +81,10 @@ func NewSink(deadline time.Duration, maxNLeeches int) *Sink { return ms } -func (ms *Sink) Sink(res mainline.TrawlingResult) { +func (ms *Sink) Sink(res dht.Result) { + infoHash := res.InfoHash() + peerAddr := res.PeerAddr() + if ms.terminated { zap.L().Panic("Trying to Sink() an already closed Sink!") } @@ -93,7 +96,7 @@ func (ms *Sink) Sink(res mainline.TrawlingResult) { return } - if _, exists := ms.incomingInfoHashes[res.InfoHash]; exists { + if _, exists := ms.incomingInfoHashes[infoHash]; exists { return } // BEWARE! @@ -102,14 +105,14 @@ func (ms *Sink) Sink(res mainline.TrawlingResult) { // check whether res.infoHash exists in the ms.incomingInfoHashes, and where we add the infoHash // to the incomingInfoHashes at the end of this function. - zap.L().Info("Sunk!", zap.Int("leeches", len(ms.incomingInfoHashes)), util.HexField("infoHash", res.InfoHash[:])) + zap.L().Info("Sunk!", zap.Int("leeches", len(ms.incomingInfoHashes)), util.HexField("infoHash", infoHash[:])) - go NewLeech(res.InfoHash, res.PeerAddr, ms.PeerID, LeechEventHandlers{ + go NewLeech(infoHash, peerAddr, ms.PeerID, LeechEventHandlers{ OnSuccess: ms.flush, OnError: ms.onLeechError, }).Do(time.Now().Add(ms.deadline)) - ms.incomingInfoHashes[res.InfoHash] = struct{}{} + ms.incomingInfoHashes[infoHash] = struct{}{} } func (ms *Sink) Drain() <-chan Metadata { diff --git a/cmd/magneticod/dht/mainline/codec.go b/cmd/magneticod/dht/mainline/codec.go index 768be3c..7852762 100644 --- a/cmd/magneticod/dht/mainline/codec.go +++ b/cmd/magneticod/dht/mainline/codec.go @@ -20,7 +20,12 @@ import ( ) type Message struct { - // Query method (one of 4: "ping", "find_node", "get_peers", "announce_peer") + // Query method. One of 5: + // - "ping" + // - "find_node" + // - "get_peers" + // - "announce_peer" + // - "sample_infohashes" (added by BEP 51) Q string `bencode:"q,omitempty"` // named QueryArguments sent with a query A QueryArguments `bencode:"a,omitempty"` @@ -74,6 +79,13 @@ type ResponseValues struct { // Torrent peers Values []CompactPeer `bencode:"values,omitempty"` + // The subset refresh interval in seconds. Added by BEP 51. + Interval int `bencode:"interval,omitempty"` + // Number of infohashes in storage. Added by BEP 51. + Num int `bencode:"num,omitempty"` + // Subset of stored infohashes, N × 20 bytes. Added by BEP 51. + Samples []byte `bencode:"samples,omitempty"` + // If `scrape` is set to 1 in the `get_peers` query then the responding node should add the // below two fields to the "r" dictionary in the response: // Defined in BEP 33 "DHT Scrapes" for responses to `get_peers` queries. diff --git a/cmd/magneticod/dht/mainline/indexingService.go b/cmd/magneticod/dht/mainline/indexingService.go new file mode 100644 index 0000000..307c13d --- /dev/null +++ b/cmd/magneticod/dht/mainline/indexingService.go @@ -0,0 +1,111 @@ +package mainline + +import ( + "net" + "sync" + "time" +) + +type IndexingService struct { + // Private + protocol *Protocol + started bool + interval time.Duration + eventHandlers IndexingServiceEventHandlers + + nodeID []byte + // []byte type would be a much better fit for the keys but unfortunately (and quite + // understandably) slices cannot be used as keys (since they are not hashable), and using arrays + // (or even the conversion between each other) is a pain; hence map[string]net.UDPAddr + // ^~~~~~ + routingTable map[string]*net.UDPAddr + routingTableMutex *sync.Mutex + + counter uint16 + getPeersRequests map[[2]byte][20]byte // GetPeersQuery.`t` -> infohash +} + +type IndexingServiceEventHandlers struct { + OnResult func(IndexingResult) +} + +type IndexingResult struct { + infoHash [20]byte + peerAddr *net.TCPAddr +} + +func (ir IndexingResult) InfoHash() [20]byte { + return ir.infoHash +} + +func (ir IndexingResult) PeerAddr() *net.TCPAddr { + return ir.peerAddr +} + +func NewIndexingService(laddr string, interval time.Duration, eventHandlers IndexingServiceEventHandlers) *IndexingService { + service := new(IndexingService) + service.interval = interval + service.protocol = NewProtocol( + laddr, + ProtocolEventHandlers{ + OnGetPeersResponse: service.onGetPeersResponse, + OnSampleInfohashesResponse: service.onSampleInfohashesResponse, + }, + ) + service.nodeID = make([]byte, 20) + service.routingTable = make(map[string]*net.UDPAddr) + service.routingTableMutex = new(sync.Mutex) + service.eventHandlers = eventHandlers + + return service +} + +func (is *IndexingService) onGetPeersResponse(msg *Message, addr *net.UDPAddr) { + var t [2]byte + copy(t[:], msg.T) + + infoHash := is.getPeersRequests[t] + // We got a response, so free the key! + delete(is.getPeersRequests, t) + + // BEP 51 specifies that + // The new sample_infohashes remote procedure call requests that a remote node return a string of multiple + // concatenated infohashes (20 bytes each) FOR WHICH IT HOLDS GET_PEERS VALUES. + // ^^^^^^ + // So theoretically we should never hit the case where `values` is empty, but c'est la vie. + if len(msg.R.Values) == 0 { + return + } + + for _, peer := range msg.R.Values { + is.eventHandlers.OnResult(IndexingResult{ + infoHash: infoHash, + peerAddr: &net.TCPAddr{ + IP: peer.IP, + Port: peer.Port, + }, + }) + } +} + +func (is *IndexingService) onSampleInfohashesResponse(msg *Message, addr *net.UDPAddr) { + for i := 0; i < len(msg.R.Samples)/20; i++ { + var infoHash [20]byte + copy(infoHash[:], msg.R.Samples[i:(i+1)*20]) + + msg := NewGetPeersQuery(is.nodeID, infoHash[:]) + t := uint16BE(is.counter) + msg.T = t[:] + + is.protocol.SendMessage(msg, addr) + + is.getPeersRequests[t] = infoHash + is.counter++ + } +} + +func uint16BE(v uint16) (b [2]byte) { + b[0] = byte(v >> 8) + b[1] = byte(v) + return +} diff --git a/cmd/magneticod/dht/mainline/protocol.go b/cmd/magneticod/dht/mainline/protocol.go index f6b3b6c..a436147 100644 --- a/cmd/magneticod/dht/mainline/protocol.go +++ b/cmd/magneticod/dht/mainline/protocol.go @@ -26,7 +26,12 @@ type ProtocolEventHandlers struct { OnGetPeersResponse func(*Message, *net.UDPAddr) OnFindNodeResponse func(*Message, *net.UDPAddr) OnPingORAnnouncePeerResponse func(*Message, *net.UDPAddr) - OnCongestion func() + + // Added by BEP 51 + OnSampleInfohashesQuery func(*Message, *net.UDPAddr) + OnSampleInfohashesResponse func(*Message, *net.UDPAddr) + + OnCongestion func() } func NewProtocol(laddr string, eventHandlers ProtocolEventHandlers) (p *Protocol) { @@ -46,7 +51,7 @@ func NewProtocol(laddr string, eventHandlers ProtocolEventHandlers) (p *Protocol func (p *Protocol) Start() { if p.started { - zap.L().Panic("Attempting to Start() a mainline/Transport that has been already started! (Programmer error.)") + zap.L().Panic("Attempting to Start() a mainline/Protocol that has been already started! (Programmer error.)") } p.started = true @@ -55,6 +60,10 @@ func (p *Protocol) Start() { } func (p *Protocol) Terminate() { + if !p.started { + zap.L().Panic("Attempted to Terminate() a mainline/Protocol that has not been Start()ed! (Programmer error.)") + } + p.transport.Terminate() } @@ -103,13 +112,42 @@ func (p *Protocol) onMessage(msg *Message, addr *net.UDPAddr) { case "vote": // Although we are aware that such method exists, we ignore. + case "sample_infohashes": // Added by BEP 51 + if !validateSampleInfohashesQueryMessage(msg) { + // zap.L().Debug("An invalid sample_infohashes query received!") + return + } + if p.eventHandlers.OnSampleInfohashesQuery != nil { + p.eventHandlers.OnSampleInfohashesQuery(msg, addr) + } + default: // zap.L().Debug("A KRPC query of an unknown method received!", zap.String("method", msg.Q)) return } case "r": - // get_peers > find_node > ping / announce_peer - if len(msg.R.Token) != 0 { // The message should be a get_peers response. + // Query messages have a `q` field which indicates their type but response messages have no such field that we + // can rely on. + // The idea is you'd use transaction ID (the `t` key) to deduce the type of a response message, as it must be + // sent in response to a query message (with the same transaction ID) that we have sent earlier. + // This approach is, unfortunately, not very practical for our needs since we send up to thousands messages per + // second, meaning that we'd run out of transaction IDs very quickly (since some [many?] clients assume + // transaction IDs are no longer than 2 bytes), and we'd also then have to consider retention too (as we might + // not get a response at all). + // Our approach uses an ad-hoc pattern matching: all response messages share a subset of fields (such as `t`, + // `y`) but only one type of them contain a particular field (such as `token` field is unique to `get_peers` + // responses, `samples` is unique to `sample_infohashes` etc). + // + // sample_infohashes > get_peers > find_node > ping / announce_peer + if len(msg.R.Samples) != 0 { // The message should be a sample_infohashes response. + if !validateSampleInfohashesResponseMessage(msg) { + // zap.L().Debug("An invalid sample_infohashes response received!") + return + } + if p.eventHandlers.OnSampleInfohashesResponse != nil { + p.eventHandlers.OnSampleInfohashesResponse(msg, addr) + } + } else if len(msg.R.Token) != 0 { // The message should be a get_peers response. if !validateGetPeersResponseMessage(msg) { // zap.L().Debug("An invalid get_peers response received!") return @@ -259,6 +297,11 @@ func validateAnnouncePeerQueryMessage(msg *Message) bool { len(msg.A.Token) > 0 } +func validateSampleInfohashesQueryMessage(msg *Message) bool { + return len(msg.A.ID) == 20 && + len(msg.A.Target) == 20 +} + func validatePingORannouncePeerResponseMessage(msg *Message) bool { return len(msg.R.ID) == 20 } @@ -279,3 +322,11 @@ func validateGetPeersResponseMessage(msg *Message) bool { // TODO: check for values or nodes } + +func validateSampleInfohashesResponseMessage(msg *Message) bool { + return len(msg.R.ID) == 20 && + msg.R.Interval >= 0 && + // TODO: check for nodes + msg.R.Num >= 0 && + len(msg.R.Samples)%20 == 0 +} diff --git a/cmd/magneticod/dht/mainline/service.go b/cmd/magneticod/dht/mainline/trawlingService.go similarity index 96% rename from cmd/magneticod/dht/mainline/service.go rename to cmd/magneticod/dht/mainline/trawlingService.go index f8787d4..f88d7e0 100644 --- a/cmd/magneticod/dht/mainline/service.go +++ b/cmd/magneticod/dht/mainline/trawlingService.go @@ -9,11 +9,6 @@ import ( "go.uber.org/zap" ) -type TrawlingResult struct { - InfoHash [20]byte - PeerAddr *net.TCPAddr -} - type TrawlingService struct { // Private protocol *Protocol @@ -35,6 +30,19 @@ type TrawlingServiceEventHandlers struct { OnResult func(TrawlingResult) } +type TrawlingResult struct { + infoHash [20]byte + peerAddr *net.TCPAddr +} + +func (tr TrawlingResult) InfoHash() [20]byte { + return tr.infoHash +} + +func (tr TrawlingResult) PeerAddr() *net.TCPAddr { + return tr.peerAddr +} + func NewTrawlingService(laddr string, initialMaxNeighbors uint, interval time.Duration, eventHandlers TrawlingServiceEventHandlers) *TrawlingService { service := new(TrawlingService) service.interval = interval @@ -173,8 +181,8 @@ func (s *TrawlingService) onAnnouncePeerQuery(query *Message, addr *net.UDPAddr) var infoHash [20]byte copy(infoHash[:], query.A.InfoHash) s.eventHandlers.OnResult(TrawlingResult{ - InfoHash: infoHash, - PeerAddr: &net.TCPAddr{ + infoHash: infoHash, + peerAddr: &net.TCPAddr{ IP: addr.IP, Port: peerPort, }, diff --git a/cmd/magneticod/dht/managers.go b/cmd/magneticod/dht/managers.go index 32a370f..2578912 100644 --- a/cmd/magneticod/dht/managers.go +++ b/cmd/magneticod/dht/managers.go @@ -1,19 +1,28 @@ package dht import ( - "github.com/boramalper/magnetico/cmd/magneticod/dht/mainline" + "net" "time" + + "go.uber.org/zap" + + "github.com/boramalper/magnetico/cmd/magneticod/dht/mainline" ) type TrawlingManager struct { // private - output chan mainline.TrawlingResult + output chan Result services []*mainline.TrawlingService } +type Result interface { + InfoHash() [20]byte + PeerAddr() *net.TCPAddr +} + func NewTrawlingManager(mlAddrs []string, interval time.Duration) *TrawlingManager { manager := new(TrawlingManager) - manager.output = make(chan mainline.TrawlingResult) + manager.output = make(chan Result, 20) if mlAddrs == nil { mlAddrs = []string{"0.0.0.0:0"} @@ -24,7 +33,7 @@ func NewTrawlingManager(mlAddrs []string, interval time.Duration) *TrawlingManag 2000, interval, mainline.TrawlingServiceEventHandlers{ - OnResult: manager.onResult, + OnResult: manager.onTrawlingResult, }, )) } @@ -36,11 +45,15 @@ func NewTrawlingManager(mlAddrs []string, interval time.Duration) *TrawlingManag return manager } -func (m *TrawlingManager) onResult(res mainline.TrawlingResult) { - m.output <- res +func (m *TrawlingManager) onTrawlingResult(res mainline.TrawlingResult) { + select { + case m.output <- res: + default: + zap.L().Warn("DHT manager output ch is full, result dropped!") + } } -func (m *TrawlingManager) Output() <-chan mainline.TrawlingResult { +func (m *TrawlingManager) Output() <-chan Result { return m.output } diff --git a/cmd/magneticod/main.go b/cmd/magneticod/main.go index 3ecb6d3..9bfd686 100644 --- a/cmd/magneticod/main.go +++ b/cmd/magneticod/main.go @@ -126,8 +126,10 @@ func main() { for stopped := false; !stopped; { select { case result := <-trawlingManager.Output(): - zap.L().Debug("Trawled!", util.HexField("infoHash", result.InfoHash[:])) - exists, err := database.DoesTorrentExist(result.InfoHash[:]) + infoHash := result.InfoHash() + + zap.L().Debug("Trawled!", util.HexField("infoHash", infoHash[:])) + exists, err := database.DoesTorrentExist(infoHash[:]) if err != nil { zap.L().Fatal("Could not check whether torrent exists!", zap.Error(err)) } else if !exists {