diff --git a/magneticod/bittorrent/sinkMetadata.go b/magneticod/bittorrent/sinkMetadata.go index 55842f3..2d7ff32 100644 --- a/magneticod/bittorrent/sinkMetadata.go +++ b/magneticod/bittorrent/sinkMetadata.go @@ -32,6 +32,7 @@ type MetadataSink struct { clientID []byte deadline time.Duration drain chan Metadata + incomingInfoHashes map[[20]byte]struct{} terminated bool termination chan interface{} } @@ -44,13 +45,9 @@ func NewMetadataSink(deadline time.Duration) *MetadataSink { if err != nil { zap.L().Panic("sinkMetadata couldn't read 20 random bytes for client ID!", zap.Error(err)) } - // TODO: remove this - if len(ms.clientID) != 20 { - panic("CLIENT ID NOT 20!") - } - ms.deadline = deadline ms.drain = make(chan Metadata) + ms.incomingInfoHashes = make(map[[20]byte]struct{}) ms.termination = make(chan interface{}) return ms } @@ -60,6 +57,15 @@ func (ms *MetadataSink) Sink(res mainline.TrawlingResult) { zap.L().Panic("Trying to Sink() an already closed MetadataSink!") } + if _, exists := ms.incomingInfoHashes[res.InfoHash]; exists { + return + } + // BEWARE! + // Although not crucial, the assumption is that MetadataSink.Sink() will be called by only one + // goroutine (i.e. it's not thread-safe), lest there might be a race condition between where we + // check whether res.infoHash exists in the ms.incomingInfoHashes, and where we add the infoHash + // to the incomingInfoHashes at the end of this function. + IPs := res.PeerIP.String() var rhostport string if IPs == "" { @@ -81,6 +87,8 @@ func (ms *MetadataSink) Sink(res mainline.TrawlingResult) { } go ms.awaitMetadata(res.InfoHash, Peer{Addr: raddr}) + + ms.incomingInfoHashes[res.InfoHash] = struct{}{} } func (ms *MetadataSink) Drain() <-chan Metadata { @@ -99,5 +107,10 @@ func (ms *MetadataSink) Terminate() { func (ms *MetadataSink) flush(result Metadata) { if !ms.terminated { ms.drain <- result + // Delete the infoHash from ms.incomingInfoHashes ONLY AFTER once we've flushed the + // metadata! + var infoHash [20]byte + copy(infoHash[:], result.InfoHash) + delete(ms.incomingInfoHashes, infoHash) } }