added incomingInfoHashes to keep track of active fetching operations
This commit is contained in:
parent
7048110cd0
commit
453ecae634
@ -32,6 +32,7 @@ type MetadataSink struct {
|
|||||||
clientID []byte
|
clientID []byte
|
||||||
deadline time.Duration
|
deadline time.Duration
|
||||||
drain chan Metadata
|
drain chan Metadata
|
||||||
|
incomingInfoHashes map[[20]byte]struct{}
|
||||||
terminated bool
|
terminated bool
|
||||||
termination chan interface{}
|
termination chan interface{}
|
||||||
}
|
}
|
||||||
@ -44,13 +45,9 @@ func NewMetadataSink(deadline time.Duration) *MetadataSink {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
zap.L().Panic("sinkMetadata couldn't read 20 random bytes for client ID!", zap.Error(err))
|
zap.L().Panic("sinkMetadata couldn't read 20 random bytes for client ID!", zap.Error(err))
|
||||||
}
|
}
|
||||||
// TODO: remove this
|
|
||||||
if len(ms.clientID) != 20 {
|
|
||||||
panic("CLIENT ID NOT 20!")
|
|
||||||
}
|
|
||||||
|
|
||||||
ms.deadline = deadline
|
ms.deadline = deadline
|
||||||
ms.drain = make(chan Metadata)
|
ms.drain = make(chan Metadata)
|
||||||
|
ms.incomingInfoHashes = make(map[[20]byte]struct{})
|
||||||
ms.termination = make(chan interface{})
|
ms.termination = make(chan interface{})
|
||||||
return ms
|
return ms
|
||||||
}
|
}
|
||||||
@ -60,6 +57,15 @@ func (ms *MetadataSink) Sink(res mainline.TrawlingResult) {
|
|||||||
zap.L().Panic("Trying to Sink() an already closed MetadataSink!")
|
zap.L().Panic("Trying to Sink() an already closed MetadataSink!")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if _, exists := ms.incomingInfoHashes[res.InfoHash]; exists {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
// BEWARE!
|
||||||
|
// Although not crucial, the assumption is that MetadataSink.Sink() will be called by only one
|
||||||
|
// goroutine (i.e. it's not thread-safe), lest there might be a race condition between where we
|
||||||
|
// check whether res.infoHash exists in the ms.incomingInfoHashes, and where we add the infoHash
|
||||||
|
// to the incomingInfoHashes at the end of this function.
|
||||||
|
|
||||||
IPs := res.PeerIP.String()
|
IPs := res.PeerIP.String()
|
||||||
var rhostport string
|
var rhostport string
|
||||||
if IPs == "<nil>" {
|
if IPs == "<nil>" {
|
||||||
@ -81,6 +87,8 @@ func (ms *MetadataSink) Sink(res mainline.TrawlingResult) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
go ms.awaitMetadata(res.InfoHash, Peer{Addr: raddr})
|
go ms.awaitMetadata(res.InfoHash, Peer{Addr: raddr})
|
||||||
|
|
||||||
|
ms.incomingInfoHashes[res.InfoHash] = struct{}{}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (ms *MetadataSink) Drain() <-chan Metadata {
|
func (ms *MetadataSink) Drain() <-chan Metadata {
|
||||||
@ -99,5 +107,10 @@ func (ms *MetadataSink) Terminate() {
|
|||||||
func (ms *MetadataSink) flush(result Metadata) {
|
func (ms *MetadataSink) flush(result Metadata) {
|
||||||
if !ms.terminated {
|
if !ms.terminated {
|
||||||
ms.drain <- result
|
ms.drain <- result
|
||||||
|
// Delete the infoHash from ms.incomingInfoHashes ONLY AFTER once we've flushed the
|
||||||
|
// metadata!
|
||||||
|
var infoHash [20]byte
|
||||||
|
copy(infoHash[:], result.InfoHash)
|
||||||
|
delete(ms.incomingInfoHashes, infoHash)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user