Merge pull request #225 from lruggieri/concurrency_fix

fixed concurrency on routingTable map
This commit is contained in:
Bora M. Alper 2019-11-09 16:49:32 +00:00 committed by GitHub
commit 17e43fa504
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -22,7 +22,7 @@ type IndexingService struct {
// (or even the conversion between each other) is a pain; hence map[string]net.UDPAddr // (or even the conversion between each other) is a pain; hence map[string]net.UDPAddr
// ^~~~~~ // ^~~~~~
routingTable map[string]*net.UDPAddr routingTable map[string]*net.UDPAddr
routingTableMutex *sync.Mutex routingTableMutex sync.RWMutex
maxNeighbors uint maxNeighbors uint
counter uint16 counter uint16
@ -59,7 +59,6 @@ func NewIndexingService(laddr string, interval time.Duration, maxNeighbors uint,
) )
service.nodeID = make([]byte, 20) service.nodeID = make([]byte, 20)
service.routingTable = make(map[string]*net.UDPAddr) service.routingTable = make(map[string]*net.UDPAddr)
service.routingTableMutex = new(sync.Mutex)
service.maxNeighbors = maxNeighbors service.maxNeighbors = maxNeighbors
service.eventHandlers = eventHandlers service.eventHandlers = eventHandlers
@ -86,17 +85,20 @@ func (is *IndexingService) Terminate() {
func (is *IndexingService) index() { func (is *IndexingService) index() {
for range time.Tick(is.interval) { for range time.Tick(is.interval) {
is.routingTableMutex.Lock() is.routingTableMutex.RLock()
if len(is.routingTable) == 0 { routingTableLen := len(is.routingTable)
is.routingTableMutex.RUnlock()
if routingTableLen == 0 {
is.bootstrap() is.bootstrap()
} else { } else {
zap.L().Info("Latest status:", zap.Int("n", len(is.routingTable)), zap.L().Info("Latest status:", zap.Int("n", routingTableLen),
zap.Uint("maxNeighbors", is.maxNeighbors)) zap.Uint("maxNeighbors", is.maxNeighbors))
//TODO //TODO
is.findNeighbors() is.findNeighbors()
is.routingTableMutex.Lock()
is.routingTable = make(map[string]*net.UDPAddr) is.routingTable = make(map[string]*net.UDPAddr)
is.routingTableMutex.Unlock()
} }
is.routingTableMutex.Unlock()
} }
} }
@ -128,7 +130,20 @@ func (is *IndexingService) bootstrap() {
func (is *IndexingService) findNeighbors() { func (is *IndexingService) findNeighbors() {
target := make([]byte, 20) target := make([]byte, 20)
/*
We could just RLock and defer RUnlock here, but that would mean that each response that we get could not Lock
the table because we are sending. So we would basically make read and write NOT concurrent.
A better approach would be to get all addresses to send in a slice and then work on that, releasing the main map.
*/
is.routingTableMutex.RLock()
addressesToSend := make([]*net.UDPAddr, 0, len(is.routingTable))
for _, addr := range is.routingTable { for _, addr := range is.routingTable {
addressesToSend = append(addressesToSend, addr)
}
is.routingTableMutex.RUnlock()
for _, addr := range addressesToSend {
_, err := rand.Read(target) _, err := rand.Read(target)
if err != nil { if err != nil {
zap.L().Panic("Could NOT generate random bytes during bootstrapping!") zap.L().Panic("Could NOT generate random bytes during bootstrapping!")
@ -219,6 +234,8 @@ func (is *IndexingService) onSampleInfohashesResponse(msg *Message, addr *net.UD
} }
// iterate // iterate
is.routingTableMutex.Lock()
defer is.routingTableMutex.Unlock()
for _, node := range msg.R.Nodes { for _, node := range msg.R.Nodes {
if uint(len(is.routingTable)) >= is.maxNeighbors { if uint(len(is.routingTable)) >= is.maxNeighbors {
break break