[magneticod] added mutex to BT/metadata/Sink.incomingInfoHashes

This commit is contained in:
Bora Alper 2018-08-07 10:32:12 +03:00
parent aa40589e0a
commit 4854239576
2 changed files with 17 additions and 10 deletions

View File

@ -309,7 +309,7 @@ func (l *Leech) Do(deadline time.Time) {
l.OnError(errors.Wrap(err, "doExHandshake")) l.OnError(errors.Wrap(err, "doExHandshake"))
return return
} }
err = l.requestAllPieces() err = l.requestAllPieces()
if err != nil { if err != nil {
l.OnError(errors.Wrap(err, "requestAllPieces")) l.OnError(errors.Wrap(err, "requestAllPieces"))

View File

@ -2,6 +2,7 @@ package metadata
import ( import (
"crypto/rand" "crypto/rand"
"sync"
"time" "time"
"github.com/boramalper/magnetico/pkg/util" "github.com/boramalper/magnetico/pkg/util"
@ -23,12 +24,13 @@ type Metadata struct {
} }
type Sink struct { type Sink struct {
clientID []byte clientID []byte
deadline time.Duration deadline time.Duration
drain chan Metadata drain chan Metadata
incomingInfoHashes map[[20]byte]struct{} incomingInfoHashes map[[20]byte]struct{}
terminated bool incomingInfoHashesMx sync.Mutex
termination chan interface{} terminated bool
termination chan interface{}
} }
func NewSink(deadline time.Duration) *Sink { func NewSink(deadline time.Duration) *Sink {
@ -50,6 +52,8 @@ func (ms *Sink) Sink(res mainline.TrawlingResult) {
if ms.terminated { if ms.terminated {
zap.L().Panic("Trying to Sink() an already closed Sink!") zap.L().Panic("Trying to Sink() an already closed Sink!")
} }
ms.incomingInfoHashesMx.Lock()
defer ms.incomingInfoHashesMx.Unlock()
if _, exists := ms.incomingInfoHashes[res.InfoHash]; exists { if _, exists := ms.incomingInfoHashes[res.InfoHash]; exists {
return return
@ -62,11 +66,10 @@ func (ms *Sink) Sink(res mainline.TrawlingResult) {
zap.L().Info("Sunk!", zap.Int("leeches", len(ms.incomingInfoHashes)), util.HexField("infoHash", res.InfoHash[:])) zap.L().Info("Sunk!", zap.Int("leeches", len(ms.incomingInfoHashes)), util.HexField("infoHash", res.InfoHash[:]))
leech := NewLeech(res.InfoHash, res.PeerAddr, LeechEventHandlers{ go NewLeech(res.InfoHash, res.PeerAddr, LeechEventHandlers{
OnSuccess: ms.flush, OnSuccess: ms.flush,
OnError: ms.onLeechError, OnError: ms.onLeechError,
}) }).Do(time.Now().Add(ms.deadline))
go leech.Do(time.Now().Add(ms.deadline))
ms.incomingInfoHashes[res.InfoHash] = struct{}{} ms.incomingInfoHashes[res.InfoHash] = struct{}{}
} }
@ -91,11 +94,15 @@ func (ms *Sink) flush(result Metadata) {
// metadata! // metadata!
var infoHash [20]byte var infoHash [20]byte
copy(infoHash[:], result.InfoHash) copy(infoHash[:], result.InfoHash)
ms.incomingInfoHashesMx.Lock()
delete(ms.incomingInfoHashes, infoHash) delete(ms.incomingInfoHashes, infoHash)
ms.incomingInfoHashesMx.Unlock()
} }
} }
func (ms *Sink) onLeechError(infoHash [20]byte, err error) { func (ms *Sink) onLeechError(infoHash [20]byte, err error) {
zap.L().Debug("leech error", util.HexField("infoHash", infoHash[:]), zap.Error(err)) zap.L().Debug("leech error", util.HexField("infoHash", infoHash[:]), zap.Error(err))
ms.incomingInfoHashesMx.Lock()
delete(ms.incomingInfoHashes, infoHash) delete(ms.incomingInfoHashes, infoHash)
ms.incomingInfoHashesMx.Unlock()
} }