added fetcher timeout, proto. tests, fixed codec, ignored "vote" queries
This commit is contained in:
parent
da5d5d9d42
commit
4b987d904a
@ -22,18 +22,22 @@ func (ms *MetadataSink) awaitMetadata(infoHash metainfo.Hash, peer torrent.Peer)
|
|||||||
// awaitMetadata goroutines waiting on the same torrent.
|
// awaitMetadata goroutines waiting on the same torrent.
|
||||||
return
|
return
|
||||||
} else {
|
} else {
|
||||||
// Drop the torrent once we got the metadata.
|
// Drop the torrent once we return from this function, whether we got the metadata or an
|
||||||
|
// error.
|
||||||
defer t.Drop()
|
defer t.Drop()
|
||||||
}
|
}
|
||||||
|
|
||||||
// Wait for the torrent client to receive the metadata for the torrent, meanwhile allowing
|
// Wait for the torrent client to receive the metadata for the torrent, meanwhile allowing
|
||||||
// termination to be handled gracefully.
|
// termination to be handled gracefully.
|
||||||
zap.L().Sugar().Debugf("awaiting %x...", infoHash[:])
|
|
||||||
select {
|
select {
|
||||||
case <- ms.termination:
|
case <- t.GotInfo():
|
||||||
|
|
||||||
|
case <-time.After(5 * time.Minute):
|
||||||
|
zap.L().Sugar().Debugf("Fetcher timeout! %x", infoHash)
|
||||||
return
|
return
|
||||||
|
|
||||||
case <- t.GotInfo():
|
case <- ms.termination:
|
||||||
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
info := t.Info()
|
info := t.Info()
|
||||||
|
@ -40,7 +40,8 @@ func NewMetadataSink(laddr net.TCPAddr) *MetadataSink {
|
|||||||
DisablePEX: true,
|
DisablePEX: true,
|
||||||
// TODO: Should we disable DHT to force the client to use the peers we supplied only, or not?
|
// TODO: Should we disable DHT to force the client to use the peers we supplied only, or not?
|
||||||
NoDHT: true,
|
NoDHT: true,
|
||||||
PreferNoEncryption: true,
|
Seed: false,
|
||||||
|
|
||||||
|
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -236,11 +236,12 @@ func (e Error) MarshalBencode() ([]byte, error) {
|
|||||||
func (e *Error) UnmarshalBencode(b []byte) (err error) {
|
func (e *Error) UnmarshalBencode(b []byte) (err error) {
|
||||||
var code, msgLen int
|
var code, msgLen int
|
||||||
|
|
||||||
regex := regexp.MustCompile(`li([0-9]+)e([0-9]+):(.+)e`)
|
result := regexp.MustCompile(`li([0-9]+)e([0-9]+):(.+)e`).FindAllSubmatch(b, 1)
|
||||||
// I don't know how to use regexp.Regexp.FindAllSubmatch:
|
if len(result) == 0 {
|
||||||
// TODO: Why three level deep slices?
|
return fmt.Errorf("could not parse the error list")
|
||||||
// TODO: What is @n?
|
}
|
||||||
matches := regex.FindAllSubmatch(b, 1)[0][1:]
|
|
||||||
|
matches := result[0][1:]
|
||||||
if _, err := fmt.Sscanf(string(matches[0]), "%d", &code); err != nil {
|
if _, err := fmt.Sscanf(string(matches[0]), "%d", &code); err != nil {
|
||||||
return fmt.Errorf("could not parse the error code: %s", err.Error())
|
return fmt.Errorf("could not parse the error code: %s", err.Error())
|
||||||
}
|
}
|
||||||
|
@ -105,6 +105,9 @@ func (p *Protocol) onMessage(msg *Message, addr net.Addr) {
|
|||||||
p.eventHandlers.OnAnnouncePeerQuery(msg, addr)
|
p.eventHandlers.OnAnnouncePeerQuery(msg, addr)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
case "vote":
|
||||||
|
// Although we are aware that such method exists, we ignore.
|
||||||
|
|
||||||
default:
|
default:
|
||||||
zap.L().Debug("A KRPC query of an unknown method received!",
|
zap.L().Debug("A KRPC query of an unknown method received!",
|
||||||
zap.String("method", msg.Q))
|
zap.String("method", msg.Q))
|
||||||
@ -140,8 +143,9 @@ func (p *Protocol) onMessage(msg *Message, addr net.Addr) {
|
|||||||
case "e":
|
case "e":
|
||||||
zap.L().Sugar().Debugf("Protocol error received: `%s` (%d)", msg.E.Message, msg.E.Code)
|
zap.L().Sugar().Debugf("Protocol error received: `%s` (%d)", msg.E.Message, msg.E.Code)
|
||||||
default:
|
default:
|
||||||
zap.L().Debug("A KRPC message of an unknown type received!",
|
/* zap.L().Debug("A KRPC message of an unknown type received!",
|
||||||
zap.String("type", msg.Y))
|
zap.String("type", msg.Y))
|
||||||
|
*/
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -159,7 +163,7 @@ func NewPingQuery(id []byte) *Message {
|
|||||||
func NewFindNodeQuery(id []byte, target []byte) *Message {
|
func NewFindNodeQuery(id []byte, target []byte) *Message {
|
||||||
return &Message{
|
return &Message{
|
||||||
Y: "q",
|
Y: "q",
|
||||||
T: []byte("\x00"),
|
T: []byte("aa"),
|
||||||
Q: "find_node",
|
Q: "find_node",
|
||||||
A: QueryArguments{
|
A: QueryArguments{
|
||||||
ID: id,
|
ID: id,
|
||||||
|
@ -198,3 +198,24 @@ func TestValidators(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
func TestNewFindNodeQuery(t *testing.T) {
|
||||||
|
if !validateFindNodeQueryMessage(NewFindNodeQuery([]byte("qwertyuopasdfghjklzx"), []byte("xzlkjhgfdsapouytrewq"))) {
|
||||||
|
t.Errorf("NewFindNodeQuery returned an invalid message!")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
func TestNewPingResponse(t *testing.T) {
|
||||||
|
if !validatePingORannouncePeerResponseMessage(NewPingResponse([]byte("tt"), []byte("qwertyuopasdfghjklzx"))) {
|
||||||
|
t.Errorf("NewPingResponse returned an invalid message!")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
func TestNewGetPeersResponseWithNodes(t *testing.T) {
|
||||||
|
if !validateGetPeersResponseMessage(NewGetPeersResponseWithNodes([]byte("tt"), []byte("qwertyuopasdfghjklzx"), []byte("token"), []CompactNodeInfo{})) {
|
||||||
|
t.Errorf("NewGetPeersResponseWithNodes returned an invalid message!")
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue
Block a user