fixed database URL issue, and some other changes as well

This commit is contained in:
Bora M. Alper 2017-08-15 18:00:55 +03:00
parent 374ce0538a
commit cab372c504
4 changed files with 43 additions and 46 deletions

View File

@ -11,26 +11,30 @@ import (
func (ms *MetadataSink) awaitMetadata(infoHash metainfo.Hash, peer torrent.Peer) { func (ms *MetadataSink) awaitMetadata(infoHash metainfo.Hash, peer torrent.Peer) {
zap.L().Sugar().Debugf("awaiting %x...", infoHash[:])
t, isNew := ms.client.AddTorrentInfoHash(infoHash) t, isNew := ms.client.AddTorrentInfoHash(infoHash)
// If the infoHash we added was not new (i.e. it's already being downloaded by the client)
// then t is the handle of the (old) torrent. We add the (presumably new) peer to the torrent
// so we can increase the chance of operation being successful, or that the metadata might be
// fetched.
t.AddPeers([]torrent.Peer{peer}) t.AddPeers([]torrent.Peer{peer})
if !isNew { if !isNew {
// If the recently added torrent is not new, then quit as we do not want multiple // If the recently added torrent is not new, then quit as we do not want multiple
// awaitMetadata goroutines waiting on the same torrent. // awaitMetadata goroutines waiting on the same torrent.
return return
} else { } else {
// Drop the torrent once we got the metadata.
defer t.Drop() defer t.Drop()
} }
// Wait for the torrent client to receive the metadata for the torrent, meanwhile allowing // Wait for the torrent client to receive the metadata for the torrent, meanwhile allowing
// termination to be handled gracefully. // termination to be handled gracefully.
zap.L().Sugar().Debugf("awaiting %x...", infoHash[:])
select { select {
case <- ms.termination: case <- ms.termination:
return return
case <- t.GotInfo(): case <- t.GotInfo():
} }
zap.L().Sugar().Warnf("==== GOT INFO for %x", infoHash[:])
info := t.Info() info := t.Info()
var files []metainfo.FileInfo var files []metainfo.FileInfo
@ -38,6 +42,7 @@ func (ms *MetadataSink) awaitMetadata(infoHash metainfo.Hash, peer torrent.Peer)
if strings.ContainsRune(info.Name, '/') { if strings.ContainsRune(info.Name, '/') {
// A single file torrent cannot have any '/' characters in its name. We treat it as // A single file torrent cannot have any '/' characters in its name. We treat it as
// illegal. // illegal.
zap.L().Sugar().Debugf("!!!! illegal character in name! \"%s\"", info.Name)
return return
} }
files = []metainfo.FileInfo{{Length: info.Length, Path:[]string{info.Name}}} files = []metainfo.FileInfo{{Length: info.Length, Path:[]string{info.Name}}}
@ -52,6 +57,7 @@ func (ms *MetadataSink) awaitMetadata(infoHash metainfo.Hash, peer torrent.Peer)
if file.Length < 0 { if file.Length < 0 {
// All files' sizes must be greater than or equal to zero, otherwise treat them as // All files' sizes must be greater than or equal to zero, otherwise treat them as
// illegal and ignore. // illegal and ignore.
zap.L().Sugar().Debugf("!!!! file size zero or less! \"%s\" (%d)", file.Path, file.Length)
return return
} }
totalSize += uint64(file.Length) totalSize += uint64(file.Length)

View File

@ -39,7 +39,7 @@ func NewMetadataSink(laddr net.TCPAddr) *MetadataSink {
DisableTrackers: true, DisableTrackers: true,
DisablePEX: true, DisablePEX: true,
// TODO: Should we disable DHT to force the client to use the peers we supplied only, or not? // TODO: Should we disable DHT to force the client to use the peers we supplied only, or not?
NoDHT: false, NoDHT: true,
PreferNoEncryption: true, PreferNoEncryption: true,
}) })
@ -80,7 +80,7 @@ func (ms *MetadataSink) Terminate() {
func (ms *MetadataSink) flush(metadata Metadata) { func (ms *MetadataSink) flush(metadata Metadata) {
if ms.terminated { if !ms.terminated {
ms.drain <- metadata ms.drain <- metadata
} }
} }

View File

@ -136,8 +136,11 @@ func main() {
metadataSink.Sink(result) metadataSink.Sink(result)
case metadata := <-metadataSink.Drain(): case metadata := <-metadataSink.Drain():
logger.Sugar().Infof("D I S C O V E R E D: `%s` %x",
metadata.Name, metadata.InfoHash)
if err := database.AddNewTorrent(metadata); err != nil { if err := database.AddNewTorrent(metadata); err != nil {
logger.Sugar().Fatalf("Could not add new torrent %x to the database: %s", metadata.InfoHash, err.Error()) logger.Sugar().Fatalf("Could not add new torrent %x to the database: %s",
metadata.InfoHash, err.Error())
} }
case <-interrupt_chan: case <-interrupt_chan:

View File

@ -26,7 +26,7 @@ const (
type Database struct { type Database struct {
database *sql.DB database *sql.DB
engine engineType engine engineType
newTorrents chan bittorrent.Metadata newTorrents []bittorrent.Metadata
} }
@ -44,23 +44,11 @@ func NewDatabase(rawurl string) (*Database, error) {
switch dbURL.Scheme { switch dbURL.Scheme {
case "sqlite": case "sqlite":
db.engine = SQLITE db.engine = SQLITE
// All this pain is to make sure that an empty file exist (in case the database is not there
// yet) so that sql.Open won't fail.
dbDir, _ := path.Split(dbURL.Path) dbDir, _ := path.Split(dbURL.Path)
if err := os.MkdirAll(dbDir, 0755); err != nil { if err := os.MkdirAll(dbDir, 0755); err != nil {
return nil, fmt.Errorf("for directory `%s`: %s", dbDir, err.Error()) return nil, fmt.Errorf("for directory `%s`: %s", dbDir, err.Error())
} }
f, err := os.OpenFile(dbURL.Path, os.O_CREATE, 0666) db.database, err = sql.Open("sqlite3", dbURL.Path)
if err != nil {
return nil, fmt.Errorf("for file `%s`: %s", dbURL.Path, err.Error())
}
if err := f.Sync(); err != nil {
return nil, fmt.Errorf("for file `%s`: %s", dbURL.Path, err.Error())
}
if err := f.Close(); err != nil {
return nil, fmt.Errorf("for file `%s`: %s", dbURL.Path, err.Error())
}
db.database, err = sql.Open("sqlite3", dbURL.RawPath)
case "postgresql": case "postgresql":
db.engine = POSTGRESQL db.engine = POSTGRESQL
@ -72,53 +60,51 @@ func NewDatabase(rawurl string) (*Database, error) {
// Check for errors from sql.Open() // Check for errors from sql.Open()
if err != nil { if err != nil {
return nil, fmt.Errorf("sql.Open()! %s", err.Error()) return nil, fmt.Errorf("error in sql.Open(): %s", err.Error())
} }
if err = db.database.Ping(); err != nil { if err = db.database.Ping(); err != nil {
return nil, fmt.Errorf("DB.Ping()! %s", err.Error()) return nil, fmt.Errorf("error in DB.Ping(): %s", err.Error())
} }
if err := db.setupDatabase(); err != nil { if err := db.setupDatabase(); err != nil {
return nil, fmt.Errorf("setupDatabase()! %s", err.Error()) return nil, fmt.Errorf("error in setupDatabase(): %s", err.Error())
} }
db.newTorrents = make(chan bittorrent.Metadata, 10)
return &db, nil return &db, nil
} }
// AddNewTorrent adds a new torrent to the *queue* to be flushed to the persistent database. // AddNewTorrent adds a new torrent to the *queue* to be flushed to the persistent database.
func (db *Database) AddNewTorrent(torrent bittorrent.Metadata) error { func (db *Database) AddNewTorrent(torrent bittorrent.Metadata) error {
for { if len(db.newTorrents) >= 1 {
select { zap.L().Sugar().Debugf("newTorrents queue is full, attempting to commit %d torrents...",
case db.newTorrents <- torrent: len(db.newTorrents))
return nil if err := db.commitNewTorrents(); err != nil {
default: return err
// newTorrents queue was full: flush and try again and again (and again)...
err := db.flushNewTorrents()
if err != nil {
return err
}
continue
} }
} }
db.newTorrents = append(db.newTorrents, torrent)
return nil
} }
func (db *Database) flushNewTorrents() error { func (db *Database) commitNewTorrents() error {
tx, err := db.database.Begin() tx, err := db.database.Begin()
if err != nil { if err != nil {
return fmt.Errorf("sql.DB.Begin()! %s", err.Error()) return fmt.Errorf("sql.DB.Begin()! %s", err.Error())
} }
var nTorrents, nFiles uint var nTorrents, nFiles uint
for torrent := range db.newTorrents { nTorrents = uint(len(db.newTorrents))
res, err := tx.Exec("INSERT INTO torrents (info_hash, name, total_size, discovered_on) VALUES ($1, $2, $3, $4);", for i, torrent := range db.newTorrents {
zap.L().Sugar().Debugf("Flushing torrent %d of %d: `%s` (%x)...",
i + 1, len(db.newTorrents), torrent.Name, torrent.InfoHash)
res, err := tx.Exec("INSERT INTO torrents (info_hash, name, total_size, discovered_on) VALUES (?, ?, ?, ?);",
torrent.InfoHash, torrent.Name, torrent.TotalSize, torrent.DiscoveredOn) torrent.InfoHash, torrent.Name, torrent.TotalSize, torrent.DiscoveredOn)
if err != nil { if err != nil {
ourError := fmt.Errorf("error while INSERTing INTO torrents! %s", err.Error()) ourError := fmt.Errorf("error while INSERTing INTO torrent: %s", err.Error())
if err := tx.Rollback(); err != nil { if err := tx.Rollback(); err != nil {
return fmt.Errorf("%s\tmeanwhile, could not rollback the current transaction either! %s", ourError.Error(), err.Error()) return fmt.Errorf("%s\tmeanwhile, could not rollback the current transaction either! %s", ourError.Error(), err.Error())
} }
@ -130,25 +116,27 @@ func (db *Database) flushNewTorrents() error {
} }
for _, file := range torrent.Files { for _, file := range torrent.Files {
_, err := tx.Exec("INSERT INTO files (torrent_id, size, path) VALUES($1, $2, $3);", zap.L().Sugar().Debugf("Flushing file `%s` (of torrent %x)", path.Join(file.Path...), torrent.InfoHash)
lastInsertId, file.Length, file.Path) _, err := tx.Exec("INSERT INTO files (torrent_id, size, path) VALUES(?, ?, ?);",
lastInsertId, file.Length, path.Join(file.Path...))
if err != nil { if err != nil {
ourError := fmt.Errorf("error while INSERTing INTO files! %s", err.Error()) ourError := fmt.Errorf("error while INSERTing INTO files: %s", err.Error())
if err := tx.Rollback(); err != nil { if err := tx.Rollback(); err != nil {
return fmt.Errorf("%s\tmeanwhile, could not rollback the current transaction either! %s", ourError.Error(), err.Error()) return fmt.Errorf("%s\tmeanwhile, could not rollback the current transaction either! %s", ourError.Error(), err.Error())
} }
return ourError return ourError
} }
nFiles++
} }
nTorrents++ nFiles += uint(len(torrent.Files))
} }
err = tx.Commit() if err = tx.Commit(); err != nil {
if err != nil {
return fmt.Errorf("sql.Tx.Commit()! %s", err.Error()) return fmt.Errorf("sql.Tx.Commit()! %s", err.Error())
} }
// Clear the queue
db.newTorrents = nil
zap.L().Sugar().Infof("%d torrents (%d files) are flushed to the database successfully.", zap.L().Sugar().Infof("%d torrents (%d files) are flushed to the database successfully.",
nTorrents, nFiles) nTorrents, nFiles)
return nil return nil