From da5d5d9d4283ecf737d869f9255f068f9fdc8ba2 Mon Sep 17 00:00:00 2001 From: "Bora M. Alper" Date: Wed, 16 Aug 2017 20:28:33 +0300 Subject: [PATCH] now we check whether a torrent exists in the database --- src/magneticod/main.go | 11 ++++++----- src/magneticod/persistence.go | 32 ++++++++++++++++++++++++++++++++ 2 files changed, 38 insertions(+), 5 deletions(-) diff --git a/src/magneticod/main.go b/src/magneticod/main.go index 3da7e6e..2c743c2 100644 --- a/src/magneticod/main.go +++ b/src/magneticod/main.go @@ -4,15 +4,14 @@ import ( "net" "os" "os/signal" + "regexp" "go.uber.org/zap" + "go.uber.org/zap/zapcore" "github.com/jessevdk/go-flags" -// "magneticod/bittorrent" - "magneticod/dht" - "go.uber.org/zap/zapcore" - "regexp" "magneticod/bittorrent" + "magneticod/dht" ) @@ -133,7 +132,9 @@ func main() { select { case result := <-trawlingManager.Output(): logger.Info("result: ", zap.String("hash", result.InfoHash.String())) - metadataSink.Sink(result) + if !database.DoesExist(result.InfoHash[:]) { + metadataSink.Sink(result) + } case metadata := <-metadataSink.Drain(): logger.Sugar().Infof("D I S C O V E R E D: `%s` %x", diff --git a/src/magneticod/persistence.go b/src/magneticod/persistence.go index 208714d..eed15fd 100644 --- a/src/magneticod/persistence.go +++ b/src/magneticod/persistence.go @@ -12,6 +12,7 @@ import ( "path" "os" + "bytes" ) @@ -75,8 +76,39 @@ func NewDatabase(rawurl string) (*Database, error) { } +func (db *Database) DoesExist(infoHash []byte) bool { + for _, torrent := range db.newTorrents { + if bytes.Equal(infoHash, torrent.InfoHash) { + return true; + } + } + + rows, err := db.database.Query("SELECT info_hash FROM torrents WHERE info_hash = ?;", infoHash) + if err != nil { + zap.L().Sugar().Fatalf("Could not query whether a torrent exists in the database! %s", err.Error()) + } + defer rows.Close() + + // If rows.Next() returns true, meaning that the torrent is in the database, return true; else + // return false. + return rows.Next() +} + + // AddNewTorrent adds a new torrent to the *queue* to be flushed to the persistent database. func (db *Database) AddNewTorrent(torrent bittorrent.Metadata) error { + // Although we check whether the torrent exists in the database before asking MetadataSink to + // fetch its metadata, the torrent can also exists in the Sink before that. Now, if a torrent in + // the sink is still being fetched, that's still not a problem as we just add the new peer for + // the torrent and exit, but if the torrent is complete (i.e. its metadata) and if its waiting + // in the channel to be received, a race condition arises when we query the database and seeing + // that it doesn't exists there, add it to the sink. + // Hence check for the last time whether the torrent exists in the database, and only if not, + // add it. + if db.DoesExist(torrent.InfoHash) { + return nil; + } + db.newTorrents = append(db.newTorrents, torrent) if len(db.newTorrents) >= 10 {