now we check whether a torrent exists in the database
This commit is contained in:
parent
09d1a57a87
commit
da5d5d9d42
@ -4,15 +4,14 @@ import (
|
|||||||
"net"
|
"net"
|
||||||
"os"
|
"os"
|
||||||
"os/signal"
|
"os/signal"
|
||||||
|
"regexp"
|
||||||
|
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
|
"go.uber.org/zap/zapcore"
|
||||||
"github.com/jessevdk/go-flags"
|
"github.com/jessevdk/go-flags"
|
||||||
|
|
||||||
// "magneticod/bittorrent"
|
|
||||||
"magneticod/dht"
|
|
||||||
"go.uber.org/zap/zapcore"
|
|
||||||
"regexp"
|
|
||||||
"magneticod/bittorrent"
|
"magneticod/bittorrent"
|
||||||
|
"magneticod/dht"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
@ -133,7 +132,9 @@ func main() {
|
|||||||
select {
|
select {
|
||||||
case result := <-trawlingManager.Output():
|
case result := <-trawlingManager.Output():
|
||||||
logger.Info("result: ", zap.String("hash", result.InfoHash.String()))
|
logger.Info("result: ", zap.String("hash", result.InfoHash.String()))
|
||||||
|
if !database.DoesExist(result.InfoHash[:]) {
|
||||||
metadataSink.Sink(result)
|
metadataSink.Sink(result)
|
||||||
|
}
|
||||||
|
|
||||||
case metadata := <-metadataSink.Drain():
|
case metadata := <-metadataSink.Drain():
|
||||||
logger.Sugar().Infof("D I S C O V E R E D: `%s` %x",
|
logger.Sugar().Infof("D I S C O V E R E D: `%s` %x",
|
||||||
|
@ -12,6 +12,7 @@ import (
|
|||||||
|
|
||||||
"path"
|
"path"
|
||||||
"os"
|
"os"
|
||||||
|
"bytes"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
@ -75,8 +76,39 @@ func NewDatabase(rawurl string) (*Database, error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
func (db *Database) DoesExist(infoHash []byte) bool {
|
||||||
|
for _, torrent := range db.newTorrents {
|
||||||
|
if bytes.Equal(infoHash, torrent.InfoHash) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
rows, err := db.database.Query("SELECT info_hash FROM torrents WHERE info_hash = ?;", infoHash)
|
||||||
|
if err != nil {
|
||||||
|
zap.L().Sugar().Fatalf("Could not query whether a torrent exists in the database! %s", err.Error())
|
||||||
|
}
|
||||||
|
defer rows.Close()
|
||||||
|
|
||||||
|
// If rows.Next() returns true, meaning that the torrent is in the database, return true; else
|
||||||
|
// return false.
|
||||||
|
return rows.Next()
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
// AddNewTorrent adds a new torrent to the *queue* to be flushed to the persistent database.
|
// AddNewTorrent adds a new torrent to the *queue* to be flushed to the persistent database.
|
||||||
func (db *Database) AddNewTorrent(torrent bittorrent.Metadata) error {
|
func (db *Database) AddNewTorrent(torrent bittorrent.Metadata) error {
|
||||||
|
// Although we check whether the torrent exists in the database before asking MetadataSink to
|
||||||
|
// fetch its metadata, the torrent can also exists in the Sink before that. Now, if a torrent in
|
||||||
|
// the sink is still being fetched, that's still not a problem as we just add the new peer for
|
||||||
|
// the torrent and exit, but if the torrent is complete (i.e. its metadata) and if its waiting
|
||||||
|
// in the channel to be received, a race condition arises when we query the database and seeing
|
||||||
|
// that it doesn't exists there, add it to the sink.
|
||||||
|
// Hence check for the last time whether the torrent exists in the database, and only if not,
|
||||||
|
// add it.
|
||||||
|
if db.DoesExist(torrent.InfoHash) {
|
||||||
|
return nil;
|
||||||
|
}
|
||||||
|
|
||||||
db.newTorrents = append(db.newTorrents, torrent)
|
db.newTorrents = append(db.newTorrents, torrent)
|
||||||
|
|
||||||
if len(db.newTorrents) >= 10 {
|
if len(db.newTorrents) >= 10 {
|
||||||
|
Loading…
Reference in New Issue
Block a user