diff --git a/cmd/magneticod/bittorrent/metadata/leech.go b/cmd/magneticod/bittorrent/metadata/leech.go index 22b5729..3ca7397 100644 --- a/cmd/magneticod/bittorrent/metadata/leech.go +++ b/cmd/magneticod/bittorrent/metadata/leech.go @@ -309,7 +309,7 @@ func (l *Leech) Do(deadline time.Time) { l.OnError(errors.Wrap(err, "doExHandshake")) return } - + err = l.requestAllPieces() if err != nil { l.OnError(errors.Wrap(err, "requestAllPieces")) diff --git a/cmd/magneticod/bittorrent/metadata/sink.go b/cmd/magneticod/bittorrent/metadata/sink.go index 0fa8e10..2842a9c 100644 --- a/cmd/magneticod/bittorrent/metadata/sink.go +++ b/cmd/magneticod/bittorrent/metadata/sink.go @@ -2,6 +2,7 @@ package metadata import ( "crypto/rand" + "sync" "time" "github.com/boramalper/magnetico/pkg/util" @@ -23,12 +24,13 @@ type Metadata struct { } type Sink struct { - clientID []byte - deadline time.Duration - drain chan Metadata - incomingInfoHashes map[[20]byte]struct{} - terminated bool - termination chan interface{} + clientID []byte + deadline time.Duration + drain chan Metadata + incomingInfoHashes map[[20]byte]struct{} + incomingInfoHashesMx sync.Mutex + terminated bool + termination chan interface{} } func NewSink(deadline time.Duration) *Sink { @@ -50,6 +52,8 @@ func (ms *Sink) Sink(res mainline.TrawlingResult) { if ms.terminated { zap.L().Panic("Trying to Sink() an already closed Sink!") } + ms.incomingInfoHashesMx.Lock() + defer ms.incomingInfoHashesMx.Unlock() if _, exists := ms.incomingInfoHashes[res.InfoHash]; exists { return @@ -62,11 +66,10 @@ func (ms *Sink) Sink(res mainline.TrawlingResult) { zap.L().Info("Sunk!", zap.Int("leeches", len(ms.incomingInfoHashes)), util.HexField("infoHash", res.InfoHash[:])) - leech := NewLeech(res.InfoHash, res.PeerAddr, LeechEventHandlers{ + go NewLeech(res.InfoHash, res.PeerAddr, LeechEventHandlers{ OnSuccess: ms.flush, OnError: ms.onLeechError, - }) - go leech.Do(time.Now().Add(ms.deadline)) + }).Do(time.Now().Add(ms.deadline)) ms.incomingInfoHashes[res.InfoHash] = struct{}{} } @@ -91,11 +94,15 @@ func (ms *Sink) flush(result Metadata) { // metadata! var infoHash [20]byte copy(infoHash[:], result.InfoHash) + ms.incomingInfoHashesMx.Lock() delete(ms.incomingInfoHashes, infoHash) + ms.incomingInfoHashesMx.Unlock() } } func (ms *Sink) onLeechError(infoHash [20]byte, err error) { zap.L().Debug("leech error", util.HexField("infoHash", infoHash[:]), zap.Error(err)) + ms.incomingInfoHashesMx.Lock() delete(ms.incomingInfoHashes, infoHash) + ms.incomingInfoHashesMx.Unlock() }