diff --git a/src/magneticod/bittorrent/operations.go b/src/magneticod/bittorrent/operations.go index ea8cadc..ca229a6 100644 --- a/src/magneticod/bittorrent/operations.go +++ b/src/magneticod/bittorrent/operations.go @@ -11,26 +11,30 @@ import ( func (ms *MetadataSink) awaitMetadata(infoHash metainfo.Hash, peer torrent.Peer) { - zap.L().Sugar().Debugf("awaiting %x...", infoHash[:]) t, isNew := ms.client.AddTorrentInfoHash(infoHash) + // If the infoHash we added was not new (i.e. it's already being downloaded by the client) + // then t is the handle of the (old) torrent. We add the (presumably new) peer to the torrent + // so we can increase the chance of operation being successful, or that the metadata might be + // fetched. t.AddPeers([]torrent.Peer{peer}) if !isNew { // If the recently added torrent is not new, then quit as we do not want multiple // awaitMetadata goroutines waiting on the same torrent. return } else { + // Drop the torrent once we got the metadata. defer t.Drop() } // Wait for the torrent client to receive the metadata for the torrent, meanwhile allowing // termination to be handled gracefully. + zap.L().Sugar().Debugf("awaiting %x...", infoHash[:]) select { case <- ms.termination: return case <- t.GotInfo(): } - zap.L().Sugar().Warnf("==== GOT INFO for %x", infoHash[:]) info := t.Info() var files []metainfo.FileInfo @@ -38,6 +42,7 @@ func (ms *MetadataSink) awaitMetadata(infoHash metainfo.Hash, peer torrent.Peer) if strings.ContainsRune(info.Name, '/') { // A single file torrent cannot have any '/' characters in its name. We treat it as // illegal. + zap.L().Sugar().Debugf("!!!! illegal character in name! \"%s\"", info.Name) return } files = []metainfo.FileInfo{{Length: info.Length, Path:[]string{info.Name}}} @@ -52,6 +57,7 @@ func (ms *MetadataSink) awaitMetadata(infoHash metainfo.Hash, peer torrent.Peer) if file.Length < 0 { // All files' sizes must be greater than or equal to zero, otherwise treat them as // illegal and ignore. + zap.L().Sugar().Debugf("!!!! file size zero or less! \"%s\" (%d)", file.Path, file.Length) return } totalSize += uint64(file.Length) diff --git a/src/magneticod/bittorrent/sink.go b/src/magneticod/bittorrent/sink.go index 717f31b..30fe7bb 100644 --- a/src/magneticod/bittorrent/sink.go +++ b/src/magneticod/bittorrent/sink.go @@ -39,7 +39,7 @@ func NewMetadataSink(laddr net.TCPAddr) *MetadataSink { DisableTrackers: true, DisablePEX: true, // TODO: Should we disable DHT to force the client to use the peers we supplied only, or not? - NoDHT: false, + NoDHT: true, PreferNoEncryption: true, }) @@ -80,7 +80,7 @@ func (ms *MetadataSink) Terminate() { func (ms *MetadataSink) flush(metadata Metadata) { - if ms.terminated { + if !ms.terminated { ms.drain <- metadata } } diff --git a/src/magneticod/main.go b/src/magneticod/main.go index f3bdfb1..2c743c2 100644 --- a/src/magneticod/main.go +++ b/src/magneticod/main.go @@ -4,15 +4,14 @@ import ( "net" "os" "os/signal" + "regexp" "go.uber.org/zap" + "go.uber.org/zap/zapcore" "github.com/jessevdk/go-flags" -// "magneticod/bittorrent" - "magneticod/dht" - "go.uber.org/zap/zapcore" - "regexp" "magneticod/bittorrent" + "magneticod/dht" ) @@ -133,11 +132,16 @@ func main() { select { case result := <-trawlingManager.Output(): logger.Info("result: ", zap.String("hash", result.InfoHash.String())) - metadataSink.Sink(result) + if !database.DoesExist(result.InfoHash[:]) { + metadataSink.Sink(result) + } case metadata := <-metadataSink.Drain(): + logger.Sugar().Infof("D I S C O V E R E D: `%s` %x", + metadata.Name, metadata.InfoHash) if err := database.AddNewTorrent(metadata); err != nil { - logger.Sugar().Fatalf("Could not add new torrent %x to the database: %s", metadata.InfoHash, err.Error()) + logger.Sugar().Fatalf("Could not add new torrent %x to the database: %s", + metadata.InfoHash, err.Error()) } case <-interrupt_chan: diff --git a/src/magneticod/persistence.go b/src/magneticod/persistence.go index 4cae745..ade719f 100644 --- a/src/magneticod/persistence.go +++ b/src/magneticod/persistence.go @@ -13,6 +13,7 @@ import ( "path" "os" + "bytes" ) type engineType uint8 @@ -26,7 +27,7 @@ const ( type Database struct { database *sql.DB engine engineType - newTorrents chan bittorrent.Metadata + newTorrents [] bittorrent.Metadata } // NewDatabase creates a new Database. @@ -43,23 +44,11 @@ func NewDatabase(rawurl string) (*Database, error) { switch dbURL.Scheme { case "sqlite": db.engine = SQLITE - // All this pain is to make sure that an empty file exist (in case the database is not there - // yet) so that sql.Open won't fail. dbDir, _ := path.Split(dbURL.Path) if err := os.MkdirAll(dbDir, 0755); err != nil { return nil, fmt.Errorf("for directory `%s`: %s", dbDir, err.Error()) } - f, err := os.OpenFile(dbURL.Path, os.O_CREATE, 0666) - if err != nil { - return nil, fmt.Errorf("for file `%s`: %s", dbURL.Path, err.Error()) - } - if err := f.Sync(); err != nil { - return nil, fmt.Errorf("for file `%s`: %s", dbURL.Path, err.Error()) - } - if err := f.Close(); err != nil { - return nil, fmt.Errorf("for file `%s`: %s", dbURL.Path, err.Error()) - } - db.database, err = sql.Open("sqlite3", dbURL.RawPath) + db.database, err = sql.Open("sqlite3", dbURL.Path) case "postgresql": db.engine = POSTGRESQL @@ -75,51 +64,83 @@ func NewDatabase(rawurl string) (*Database, error) { // Check for errors from sql.Open() if err != nil { - return nil, fmt.Errorf("sql.Open()! %s", err.Error()) + return nil, fmt.Errorf("error in sql.Open(): %s", err.Error()) } if err = db.database.Ping(); err != nil { - return nil, fmt.Errorf("DB.Ping()! %s", err.Error()) + return nil, fmt.Errorf("error in DB.Ping(): %s", err.Error()) } if err := db.setupDatabase(); err != nil { - return nil, fmt.Errorf("setupDatabase()! %s", err.Error()) + return nil, fmt.Errorf("error in setupDatabase(): %s", err.Error()) } - db.newTorrents = make(chan bittorrent.Metadata, 10) - return &db, nil } -// AddNewTorrent adds a new torrent to the *queue* to be flushed to the persistent database. -func (db *Database) AddNewTorrent(torrent bittorrent.Metadata) error { - for { - select { - case db.newTorrents <- torrent: - return nil - default: - // newTorrents queue was full: flush and try again and again (and again)... - err := db.flushNewTorrents() - if err != nil { - return err - } - continue + +func (db *Database) DoesExist(infoHash []byte) bool { + for _, torrent := range db.newTorrents { + if bytes.Equal(infoHash, torrent.InfoHash) { + return true; } } + + rows, err := db.database.Query("SELECT info_hash FROM torrents WHERE info_hash = ?;", infoHash) + if err != nil { + zap.L().Sugar().Fatalf("Could not query whether a torrent exists in the database! %s", err.Error()) + } + defer rows.Close() + + // If rows.Next() returns true, meaning that the torrent is in the database, return true; else + // return false. + return rows.Next() } -func (db *Database) flushNewTorrents() error { - tx, err := db.database.Begin() + +// AddNewTorrent adds a new torrent to the *queue* to be flushed to the persistent database. +func (db *Database) AddNewTorrent(torrent bittorrent.Metadata) error { + // Although we check whether the torrent exists in the database before asking MetadataSink to + // fetch its metadata, the torrent can also exists in the Sink before that. Now, if a torrent in + // the sink is still being fetched, that's still not a problem as we just add the new peer for + // the torrent and exit, but if the torrent is complete (i.e. its metadata) and if its waiting + // in the channel to be received, a race condition arises when we query the database and seeing + // that it doesn't exists there, add it to the sink. + // Hence check for the last time whether the torrent exists in the database, and only if not, + // add it. + if db.DoesExist(torrent.InfoHash) { + return nil; + } + + db.newTorrents = append(db.newTorrents, torrent) + + if len(db.newTorrents) >= 10 { + zap.L().Sugar().Debugf("newTorrents queue is full, attempting to commit %d torrents...", + len(db.newTorrents)) + if err := db.commitNewTorrents(); err != nil { + return err + } + } + + return nil +} + + +func (db *Database) commitNewTorrents() error { + tx, err := db.database.Begin() if err != nil { return fmt.Errorf("sql.DB.Begin()! %s", err.Error()) } var nTorrents, nFiles uint - for torrent := range db.newTorrents { - res, err := tx.Exec("INSERT INTO torrents (info_hash, name, total_size, discovered_on) VALUES ($1, $2, $3, $4);", + nTorrents = uint(len(db.newTorrents)) + for i, torrent := range db.newTorrents { + zap.L().Sugar().Debugf("Flushing torrent %d of %d: `%s` (%x)...", + i + 1, len(db.newTorrents), torrent.Name, torrent.InfoHash) + res, err := tx.Exec("INSERT INTO torrents (info_hash, name, total_size, discovered_on) VALUES (?, ?, ?, ?);", torrent.InfoHash, torrent.Name, torrent.TotalSize, torrent.DiscoveredOn) if err != nil { - ourError := fmt.Errorf("error while INSERTing INTO torrents! %s", err.Error()) + ourError := fmt.Errorf("error while INSERTing INTO torrent: %s", err.Error()) if err := tx.Rollback(); err != nil { return fmt.Errorf("%s\tmeanwhile, could not rollback the current transaction either! %s", ourError.Error(), err.Error()) } @@ -131,25 +152,27 @@ func (db *Database) flushNewTorrents() error { } for _, file := range torrent.Files { - _, err := tx.Exec("INSERT INTO files (torrent_id, size, path) VALUES($1, $2, $3);", - lastInsertId, file.Length, file.Path) + zap.L().Sugar().Debugf("Flushing file `%s` (of torrent %x)", path.Join(file.Path...), torrent.InfoHash) + _, err := tx.Exec("INSERT INTO files (torrent_id, size, path) VALUES(?, ?, ?);", + lastInsertId, file.Length, path.Join(file.Path...)) if err != nil { - ourError := fmt.Errorf("error while INSERTing INTO files! %s", err.Error()) + ourError := fmt.Errorf("error while INSERTing INTO files: %s", err.Error()) if err := tx.Rollback(); err != nil { return fmt.Errorf("%s\tmeanwhile, could not rollback the current transaction either! %s", ourError.Error(), err.Error()) } return ourError } - nFiles++ } - nTorrents++ + nFiles += uint(len(torrent.Files)) } - err = tx.Commit() - if err != nil { + if err = tx.Commit(); err != nil { return fmt.Errorf("sql.Tx.Commit()! %s", err.Error()) } + // Clear the queue + db.newTorrents = nil + zap.L().Sugar().Infof("%d torrents (%d files) are flushed to the database successfully.", nTorrents, nFiles) return nil