From 7ec193b598d3cfb779e289f0658201dd7699c25e Mon Sep 17 00:00:00 2001 From: Luca Ruggieri Date: Sun, 27 Oct 2019 10:26:24 +0100 Subject: [PATCH] fixed concurrency on routingTable map --- .../dht/mainline/indexingService.go | 29 +++++++++++++++---- 1 file changed, 23 insertions(+), 6 deletions(-) diff --git a/cmd/magneticod/dht/mainline/indexingService.go b/cmd/magneticod/dht/mainline/indexingService.go index 2dcdc78..b4f88bb 100644 --- a/cmd/magneticod/dht/mainline/indexingService.go +++ b/cmd/magneticod/dht/mainline/indexingService.go @@ -22,7 +22,7 @@ type IndexingService struct { // (or even the conversion between each other) is a pain; hence map[string]net.UDPAddr // ^~~~~~ routingTable map[string]*net.UDPAddr - routingTableMutex *sync.Mutex + routingTableMutex sync.RWMutex maxNeighbors uint counter uint16 @@ -59,7 +59,6 @@ func NewIndexingService(laddr string, interval time.Duration, maxNeighbors uint, ) service.nodeID = make([]byte, 20) service.routingTable = make(map[string]*net.UDPAddr) - service.routingTableMutex = new(sync.Mutex) service.maxNeighbors = maxNeighbors service.eventHandlers = eventHandlers @@ -86,17 +85,20 @@ func (is *IndexingService) Terminate() { func (is *IndexingService) index() { for range time.Tick(is.interval) { - is.routingTableMutex.Lock() - if len(is.routingTable) == 0 { + is.routingTableMutex.RLock() + routingTableLen := len(is.routingTable) + is.routingTableMutex.RUnlock() + if routingTableLen == 0 { is.bootstrap() } else { - zap.L().Info("Latest status:", zap.Int("n", len(is.routingTable)), + zap.L().Info("Latest status:", zap.Int("n", routingTableLen), zap.Uint("maxNeighbors", is.maxNeighbors)) //TODO is.findNeighbors() + is.routingTableMutex.Lock() is.routingTable = make(map[string]*net.UDPAddr) + is.routingTableMutex.Unlock() } - is.routingTableMutex.Unlock() } } @@ -128,7 +130,20 @@ func (is *IndexingService) bootstrap() { func (is *IndexingService) findNeighbors() { target := make([]byte, 20) + + /* + We could just RLock and defer RUnlock here, but that would mean that each response that we get could not Lock + the table because we are sending. So we would basically make read and write NOT concurrent. + A better approach would be to get all addresses to send in a slice and then work on that, releasing the main map. + */ + is.routingTableMutex.RLock() + addressesToSend := make([]*net.UDPAddr, 0, len(is.routingTable)) for _, addr := range is.routingTable { + addressesToSend = append(addressesToSend, addr) + } + is.routingTableMutex.RUnlock() + + for _, addr := range addressesToSend { _, err := rand.Read(target) if err != nil { zap.L().Panic("Could NOT generate random bytes during bootstrapping!") @@ -219,6 +234,8 @@ func (is *IndexingService) onSampleInfohashesResponse(msg *Message, addr *net.UD } // iterate + is.routingTableMutex.Lock() + defer is.routingTableMutex.Unlock() for _, node := range msg.R.Nodes { if uint(len(is.routingTable)) >= is.maxNeighbors { break