fixed concurrency on routingTable map

This commit is contained in:
Luca Ruggieri 2019-10-27 10:26:24 +01:00
parent c4215d2657
commit 7ec193b598

View File

@ -22,7 +22,7 @@ type IndexingService struct {
// (or even the conversion between each other) is a pain; hence map[string]net.UDPAddr
// ^~~~~~
routingTable map[string]*net.UDPAddr
routingTableMutex *sync.Mutex
routingTableMutex sync.RWMutex
maxNeighbors uint
counter uint16
@ -59,7 +59,6 @@ func NewIndexingService(laddr string, interval time.Duration, maxNeighbors uint,
)
service.nodeID = make([]byte, 20)
service.routingTable = make(map[string]*net.UDPAddr)
service.routingTableMutex = new(sync.Mutex)
service.maxNeighbors = maxNeighbors
service.eventHandlers = eventHandlers
@ -86,18 +85,21 @@ func (is *IndexingService) Terminate() {
func (is *IndexingService) index() {
for range time.Tick(is.interval) {
is.routingTableMutex.Lock()
if len(is.routingTable) == 0 {
is.routingTableMutex.RLock()
routingTableLen := len(is.routingTable)
is.routingTableMutex.RUnlock()
if routingTableLen == 0 {
is.bootstrap()
} else {
zap.L().Info("Latest status:", zap.Int("n", len(is.routingTable)),
zap.L().Info("Latest status:", zap.Int("n", routingTableLen),
zap.Uint("maxNeighbors", is.maxNeighbors))
//TODO
is.findNeighbors()
is.routingTableMutex.Lock()
is.routingTable = make(map[string]*net.UDPAddr)
}
is.routingTableMutex.Unlock()
}
}
}
func (is *IndexingService) bootstrap() {
@ -128,7 +130,20 @@ func (is *IndexingService) bootstrap() {
func (is *IndexingService) findNeighbors() {
target := make([]byte, 20)
/*
We could just RLock and defer RUnlock here, but that would mean that each response that we get could not Lock
the table because we are sending. So we would basically make read and write NOT concurrent.
A better approach would be to get all addresses to send in a slice and then work on that, releasing the main map.
*/
is.routingTableMutex.RLock()
addressesToSend := make([]*net.UDPAddr, 0, len(is.routingTable))
for _, addr := range is.routingTable {
addressesToSend = append(addressesToSend, addr)
}
is.routingTableMutex.RUnlock()
for _, addr := range addressesToSend {
_, err := rand.Read(target)
if err != nil {
zap.L().Panic("Could NOT generate random bytes during bootstrapping!")
@ -219,6 +234,8 @@ func (is *IndexingService) onSampleInfohashesResponse(msg *Message, addr *net.UD
}
// iterate
is.routingTableMutex.Lock()
defer is.routingTableMutex.Unlock()
for _, node := range msg.R.Nodes {
if uint(len(is.routingTable)) >= is.maxNeighbors {
break