From 4b987d904a38fbc7e322da21903dba22498fe922 Mon Sep 17 00:00:00 2001 From: "Bora M. Alper" Date: Mon, 21 Aug 2017 17:15:41 +0300 Subject: [PATCH] added fetcher timeout, proto. tests, fixed codec, ignored "vote" queries --- src/magneticod/bittorrent/operations.go | 12 +++++++---- src/magneticod/bittorrent/sink.go | 3 ++- src/magneticod/dht/mainline/codec.go | 11 +++++----- src/magneticod/dht/mainline/protocol.go | 8 ++++++-- src/magneticod/dht/mainline/protocol_test.go | 21 ++++++++++++++++++++ 5 files changed, 43 insertions(+), 12 deletions(-) diff --git a/src/magneticod/bittorrent/operations.go b/src/magneticod/bittorrent/operations.go index ca229a6..fec2768 100644 --- a/src/magneticod/bittorrent/operations.go +++ b/src/magneticod/bittorrent/operations.go @@ -22,18 +22,22 @@ func (ms *MetadataSink) awaitMetadata(infoHash metainfo.Hash, peer torrent.Peer) // awaitMetadata goroutines waiting on the same torrent. return } else { - // Drop the torrent once we got the metadata. + // Drop the torrent once we return from this function, whether we got the metadata or an + // error. defer t.Drop() } // Wait for the torrent client to receive the metadata for the torrent, meanwhile allowing // termination to be handled gracefully. - zap.L().Sugar().Debugf("awaiting %x...", infoHash[:]) select { - case <- ms.termination: + case <- t.GotInfo(): + + case <-time.After(5 * time.Minute): + zap.L().Sugar().Debugf("Fetcher timeout! %x", infoHash) return - case <- t.GotInfo(): + case <- ms.termination: + return } info := t.Info() diff --git a/src/magneticod/bittorrent/sink.go b/src/magneticod/bittorrent/sink.go index 30fe7bb..09f7192 100644 --- a/src/magneticod/bittorrent/sink.go +++ b/src/magneticod/bittorrent/sink.go @@ -40,7 +40,8 @@ func NewMetadataSink(laddr net.TCPAddr) *MetadataSink { DisablePEX: true, // TODO: Should we disable DHT to force the client to use the peers we supplied only, or not? NoDHT: true, - PreferNoEncryption: true, + Seed: false, + }) if err != nil { diff --git a/src/magneticod/dht/mainline/codec.go b/src/magneticod/dht/mainline/codec.go index 0cee2f6..ee66ed6 100644 --- a/src/magneticod/dht/mainline/codec.go +++ b/src/magneticod/dht/mainline/codec.go @@ -236,11 +236,12 @@ func (e Error) MarshalBencode() ([]byte, error) { func (e *Error) UnmarshalBencode(b []byte) (err error) { var code, msgLen int - regex := regexp.MustCompile(`li([0-9]+)e([0-9]+):(.+)e`) - // I don't know how to use regexp.Regexp.FindAllSubmatch: - // TODO: Why three level deep slices? - // TODO: What is @n? - matches := regex.FindAllSubmatch(b, 1)[0][1:] + result := regexp.MustCompile(`li([0-9]+)e([0-9]+):(.+)e`).FindAllSubmatch(b, 1) + if len(result) == 0 { + return fmt.Errorf("could not parse the error list") + } + + matches := result[0][1:] if _, err := fmt.Sscanf(string(matches[0]), "%d", &code); err != nil { return fmt.Errorf("could not parse the error code: %s", err.Error()) } diff --git a/src/magneticod/dht/mainline/protocol.go b/src/magneticod/dht/mainline/protocol.go index 73de6dd..42249ca 100644 --- a/src/magneticod/dht/mainline/protocol.go +++ b/src/magneticod/dht/mainline/protocol.go @@ -105,6 +105,9 @@ func (p *Protocol) onMessage(msg *Message, addr net.Addr) { p.eventHandlers.OnAnnouncePeerQuery(msg, addr) } + case "vote": + // Although we are aware that such method exists, we ignore. + default: zap.L().Debug("A KRPC query of an unknown method received!", zap.String("method", msg.Q)) @@ -140,8 +143,9 @@ func (p *Protocol) onMessage(msg *Message, addr net.Addr) { case "e": zap.L().Sugar().Debugf("Protocol error received: `%s` (%d)", msg.E.Message, msg.E.Code) default: - zap.L().Debug("A KRPC message of an unknown type received!", + /* zap.L().Debug("A KRPC message of an unknown type received!", zap.String("type", msg.Y)) + */ } } @@ -159,7 +163,7 @@ func NewPingQuery(id []byte) *Message { func NewFindNodeQuery(id []byte, target []byte) *Message { return &Message{ Y: "q", - T: []byte("\x00"), + T: []byte("aa"), Q: "find_node", A: QueryArguments{ ID: id, diff --git a/src/magneticod/dht/mainline/protocol_test.go b/src/magneticod/dht/mainline/protocol_test.go index c12ea23..abbb0de 100644 --- a/src/magneticod/dht/mainline/protocol_test.go +++ b/src/magneticod/dht/mainline/protocol_test.go @@ -198,3 +198,24 @@ func TestValidators(t *testing.T) { } } } + + +func TestNewFindNodeQuery(t *testing.T) { + if !validateFindNodeQueryMessage(NewFindNodeQuery([]byte("qwertyuopasdfghjklzx"), []byte("xzlkjhgfdsapouytrewq"))) { + t.Errorf("NewFindNodeQuery returned an invalid message!") + } +} + + +func TestNewPingResponse(t *testing.T) { + if !validatePingORannouncePeerResponseMessage(NewPingResponse([]byte("tt"), []byte("qwertyuopasdfghjklzx"))) { + t.Errorf("NewPingResponse returned an invalid message!") + } +} + + +func TestNewGetPeersResponseWithNodes(t *testing.T) { + if !validateGetPeersResponseMessage(NewGetPeersResponseWithNodes([]byte("tt"), []byte("qwertyuopasdfghjklzx"), []byte("token"), []CompactNodeInfo{})) { + t.Errorf("NewGetPeersResponseWithNodes returned an invalid message!") + } +} \ No newline at end of file