magnetico/cmd/magneticod/dht/mainline/indexingService.go
Michał Gątkowski 53d0ba0410 Clean up logs
2022-08-05 22:39:09 +02:00

275 lines
7.1 KiB
Go

package mainline
import (
"math/rand"
"net"
"sync"
"time"
"go.uber.org/zap"
)
// IndexingService crawls the DHT: it keeps a table of neighbouring nodes, periodically sends them
// sample_infohashes queries, follows up each sampled infohash with a get_peers query, and reports
// the peers it learns about through eventHandlers.OnResult.
type IndexingService struct {
// Private
// protocol sends the queries and dispatches the responses to the on*Response handlers.
protocol *Protocol
// started is set by Start() and guards against starting the service twice.
started bool
// interval is the period of the ticker that drives index().
interval time.Duration
eventHandlers IndexingServiceEventHandlers
// nodeID is passed as our own ID to every query constructor.
nodeID []byte
// Keyed by string(node ID). []byte would be a much better fit for the keys, but slices cannot
// be used as map keys (they are not hashable), and using arrays (or converting between the two)
// is a pain; hence map[string]*net.UDPAddr.
routingTable map[string]*net.UDPAddr
// routingTableMutex guards routingTable.
routingTableMutex sync.RWMutex
// maxNeighbors is the size at which the response handlers stop adding nodes to routingTable.
maxNeighbors uint
// counter is the next transaction ID for outgoing get_peers queries; being a uint16 it wraps
// around on overflow.
counter uint16
getPeersRequests map[[2]byte][20]byte // GetPeersQuery.`t` -> infohash
}
// IndexingServiceEventHandlers holds the callbacks through which an IndexingService reports its
// findings to its owner.
type IndexingServiceEventHandlers struct {
// OnResult is called from onGetPeersResponse with the infohash that was queried and the peer
// addresses found in the response.
OnResult func(IndexingResult)
}
// IndexingResult pairs an infohash with the addresses of peers reported for it.
// Its fields are private; use the accessor methods to read them.
type IndexingResult struct {
	infoHash  [20]byte
	peerAddrs []net.TCPAddr
}

// InfoHash returns the 20-byte infohash this result is about.
func (ir IndexingResult) InfoHash() [20]byte {
	return ir.infoHash
}

// PeerAddrs returns the peer addresses collected for the infohash. The slice is returned as
// stored (it is not copied), so callers share its backing array with the result.
func (ir IndexingResult) PeerAddrs() []net.TCPAddr {
	return ir.peerAddrs
}
// NewIndexingService builds an IndexingService that is ready to be Start()ed.
//
// laddr is handed to NewProtocol unchanged, interval is the period of the indexing loop,
// maxNeighbors caps the number of nodes kept in the routing table, and eventHandlers receives the
// results. The node ID is allocated here as 20 zero bytes.
func NewIndexingService(laddr string, interval time.Duration, maxNeighbors uint, eventHandlers IndexingServiceEventHandlers) *IndexingService {
	service := &IndexingService{
		interval:         interval,
		nodeID:           make([]byte, 20),
		routingTable:     make(map[string]*net.UDPAddr),
		maxNeighbors:     maxNeighbors,
		eventHandlers:    eventHandlers,
		getPeersRequests: make(map[[2]byte][20]byte),
	}

	// The protocol callbacks are method values bound to service, so the service has to exist
	// before the protocol can be created.
	handlers := ProtocolEventHandlers{
		OnFindNodeResponse:         service.onFindNodeResponse,
		OnGetPeersResponse:         service.onGetPeersResponse,
		OnSampleInfohashesResponse: service.onSampleInfohashesResponse,
	}
	service.protocol = NewProtocol(laddr, handlers)

	return service
}
// Start starts the underlying protocol and launches the indexing loop in its own goroutine.
// Calling Start on a service that has already been started is a programmer error and is logged
// through zap at panic level.
func (is *IndexingService) Start() {
	if alreadyStarted := is.started; alreadyStarted {
		zap.L().Panic("Attempting to Start() a mainline/IndexingService that has been already started! (Programmer error.)")
	}
	is.started = true

	is.protocol.Start()
	go is.index()

	zap.L().Info("Indexing Service started!")
}
// Terminate terminates the underlying protocol. It does nothing else: in particular it does not
// signal the goroutine running index(), whose loop has no exit condition.
func (is *IndexingService) Terminate() {
	is.protocol.Terminate()
}
// index is the indexing loop, run in its own goroutine by Start(). On every tick of the interval
// timer it either bootstraps (when the routing table is empty) or queries all current neighbours
// and then replaces the routing table with an empty one, to be refilled by incoming responses.
func (is *IndexingService) index() {
	ticks := time.Tick(is.interval)
	for {
		<-ticks

		// Only the size is needed; hold the read lock as briefly as possible.
		is.routingTableMutex.RLock()
		neighborCount := len(is.routingTable)
		is.routingTableMutex.RUnlock()

		if neighborCount == 0 {
			is.bootstrap()
			continue
		}

		zap.L().Debug("Latest status:", zap.Int("n", neighborCount),
			zap.Uint("maxNeighbors", is.maxNeighbors))
		//TODO
		is.findNeighbors()

		// Drop every neighbour that has just been queried.
		is.routingTableMutex.Lock()
		is.routingTable = make(map[string]*net.UDPAddr)
		is.routingTableMutex.Unlock()
	}
}
// bootstrap sends a find_node query for a fresh random target to each of the hard-coded
// bootstrapping nodes. A node whose address cannot be resolved is logged and skipped.
func (is *IndexingService) bootstrap() {
	zap.L().Info("Bootstrapping as routing table is empty...")

	wellKnownNodes := [...]string{
		"router.bittorrent.com:6881",
		"dht.transmissionbt.com:6881",
		"dht.libtorrent.org:25401",
	}
	for _, host := range wellKnownNodes {
		// Every bootstrapping node is asked about a different random target.
		target := make([]byte, 20)
		if _, err := rand.Read(target); err != nil {
			zap.L().Panic("Could NOT generate random bytes during bootstrapping!")
		}

		udpAddr, err := net.ResolveUDPAddr("udp", host)
		if err != nil {
			zap.L().Error("Could NOT resolve (UDP) address of the bootstrapping node!",
				zap.String("node", host))
			continue
		}

		query := NewFindNodeQuery(is.nodeID, target)
		is.protocol.SendMessage(query, udpAddr)
	}
}
// findNeighbors sends a sample_infohashes query, each with a fresh random target, to every node
// currently in the routing table.
func (is *IndexingService) findNeighbors() {
	/*
		We could just RLock and defer RUnlock here, but that would mean that each response that we get
		could not Lock the table because we are sending. So we would basically make read and write NOT
		concurrent. Instead, snapshot all addresses into a slice and work on that, releasing the map.
	*/
	is.routingTableMutex.RLock()
	addressesToSend := make([]*net.UDPAddr, 0, len(is.routingTable))
	for _, addr := range is.routingTable {
		addressesToSend = append(addressesToSend, addr)
	}
	is.routingTableMutex.RUnlock()

	// One buffer, refilled with new random bytes before each query.
	target := make([]byte, 20)
	for _, addr := range addressesToSend {
		if _, err := rand.Read(target); err != nil {
			// The message used to say "during bootstrapping" (copied from bootstrap()), which
			// pointed at the wrong call site.
			zap.L().Panic("Could NOT generate random bytes while finding neighbors!")
		}

		is.protocol.SendMessage(
			NewSampleInfohashesQuery(is.nodeID, []byte("aa"), target),
			addr,
		)
	}
}
// onFindNodeResponse handles a find_node response: every node it lists (until the routing table
// holds maxNeighbors entries) is added to the routing table and is immediately sent a
// sample_infohashes query with a random target. Nodes advertising port 0 are ignored.
//
// The routing table stays write-locked for the whole call, including while the queries are sent.
func (is *IndexingService) onFindNodeResponse(response *Message, addr *net.UDPAddr) {
	is.routingTableMutex.Lock()
	defer is.routingTableMutex.Unlock()

	for _, node := range response.R.Nodes {
		if uint(len(is.routingTable)) >= is.maxNeighbors {
			break
		}
		if node.Addr.Port == 0 { // Ignore nodes who "use" port 0.
			continue
		}

		// Store a pointer to a per-node copy. Before Go 1.22 `node` is a single variable reused
		// by every iteration, so storing &node.Addr made all routing table entries point at the
		// same address (whatever node the loop visited last).
		nodeAddr := node.Addr
		is.routingTable[string(node.ID)] = &nodeAddr

		target := make([]byte, 20)
		if _, err := rand.Read(target); err != nil {
			zap.L().Panic("Could NOT generate random bytes!")
		}
		is.protocol.SendMessage(
			NewSampleInfohashesQuery(is.nodeID, []byte("aa"), target),
			&nodeAddr,
		)
	}
}
// onGetPeersResponse handles a get_peers response: it looks up which infohash the response
// belongs to by its transaction ID, converts the returned peers to TCP addresses (skipping peers
// that advertise port 0) and hands them to eventHandlers.OnResult.
//
// Responses that cannot be matched to a pending request, or that yield no usable peer, are
// dropped without calling OnResult.
func (is *IndexingService) onGetPeersResponse(msg *Message, addr *net.UDPAddr) {
	var t [2]byte
	// Our get_peers queries always carry a 2-byte transaction ID; anything else cannot be an
	// answer to one of them (and a shorter ID would otherwise be zero-padded into a wrong key).
	if len(msg.T) != len(t) {
		return
	}
	copy(t[:], msg.T)

	infoHash, pending := is.getPeersRequests[t]
	if !pending {
		// Unsolicited or duplicate response: without this check it would be reported under the
		// all-zero infohash.
		return
	}
	// We got a response, so free the key!
	delete(is.getPeersRequests, t)

	// BEP 51 specifies that
	//     The new sample_infohashes remote procedure call requests that a remote node return a string of multiple
	//     concatenated infohashes (20 bytes each) FOR WHICH IT HOLDS GET_PEERS VALUES.
	//                                                                          ^^^^^^
	// So theoretically we should never hit the case where `values` is empty, but c'est la vie.
	if len(msg.R.Values) == 0 {
		return
	}

	peerAddrs := make([]net.TCPAddr, 0, len(msg.R.Values))
	for _, peer := range msg.R.Values {
		if peer.Port == 0 { // Ignore peers who "use" port 0.
			continue
		}
		peerAddrs = append(peerAddrs, net.TCPAddr{
			IP:   peer.IP,
			Port: peer.Port,
		})
	}
	// Same reasoning as for an empty `values`: a result without peers is of no use.
	if len(peerAddrs) == 0 {
		return
	}

	is.eventHandlers.OnResult(IndexingResult{
		infoHash:  infoHash,
		peerAddrs: peerAddrs,
	})
}
// onSampleInfohashesResponse handles a sample_infohashes response. It does two things:
//
//  1. For every complete 20-byte infohash in the samples, it sends a get_peers query back to the
//     responding node and records transaction ID -> infohash in getPeersRequests, so that
//     onGetPeersResponse can tell which infohash an answer belongs to.
//  2. It adds the nodes listed in the response to the routing table (until it holds maxNeighbors
//     entries), ignoring nodes that advertise port 0.
//
// NOTE(review): entries in getPeersRequests are only removed when a response arrives, and the
// 16-bit counter wraps around, so an unanswered request stays until its ID is reused.
func (is *IndexingService) onSampleInfohashesResponse(msg *Message, addr *net.UDPAddr) {
	// request samples
	samples := msg.R.Samples
	for offset := 0; offset+20 <= len(samples); offset += 20 {
		// The i-th infohash occupies bytes [20*i, 20*i+20). The slice used to start at byte i
		// instead of 20*i, which yielded wrong infohashes for every sample after the first.
		var infoHash [20]byte
		copy(infoHash[:], samples[offset:offset+20])

		query := NewGetPeersQuery(is.nodeID, infoHash[:])
		t := uint16BE(is.counter)
		query.T = t[:]
		is.protocol.SendMessage(query, addr)

		is.getPeersRequests[t] = infoHash
		is.counter++
	}

	// TODO: good idea, but also need to track how long they have been here
	//if msg.R.Num > len(msg.R.Samples) / 20 && time.Duration(msg.R.Interval) <= is.interval {
	//	if addr.Port != 0 {  // ignore nodes who "use" port 0...
	//		is.routingTable[string(msg.R.ID)] = addr
	//	}
	//}

	// iterate
	is.routingTableMutex.Lock()
	defer is.routingTableMutex.Unlock()

	for _, node := range msg.R.Nodes {
		if uint(len(is.routingTable)) >= is.maxNeighbors {
			break
		}
		if node.Addr.Port == 0 { // Ignore nodes who "use" port 0.
			continue
		}

		// Store a pointer to a per-node copy. Before Go 1.22 `node` is a single variable reused
		// by every iteration, so storing &node.Addr made all routing table entries point at the
		// same address.
		nodeAddr := node.Addr
		is.routingTable[string(node.ID)] = &nodeAddr

		// TODO
		/*
			target := make([]byte, 20)
			_, err := rand.Read(target)
			if err != nil {
				zap.L().Panic("Could NOT generate random bytes!")
			}
			is.protocol.SendMessage(
				NewSampleInfohashesQuery(is.nodeID, []byte("aa"), target),
				&nodeAddr,
			)
		*/
	}
}
// uint16BE returns v as two bytes in big-endian order: the most significant byte first, the least
// significant byte second.
func uint16BE(v uint16) (b [2]byte) {
	high, low := byte(v>>8), byte(v&0xff)
	b = [2]byte{high, low}
	return b
}