Merge branch 'go-rewrite' into go-mysql
This commit is contained in:
commit
19b0adcc3d
@ -11,26 +11,30 @@ import (
|
|||||||
|
|
||||||
|
|
||||||
func (ms *MetadataSink) awaitMetadata(infoHash metainfo.Hash, peer torrent.Peer) {
|
func (ms *MetadataSink) awaitMetadata(infoHash metainfo.Hash, peer torrent.Peer) {
|
||||||
zap.L().Sugar().Debugf("awaiting %x...", infoHash[:])
|
|
||||||
t, isNew := ms.client.AddTorrentInfoHash(infoHash)
|
t, isNew := ms.client.AddTorrentInfoHash(infoHash)
|
||||||
|
// If the infoHash we added was not new (i.e. it's already being downloaded by the client)
|
||||||
|
// then t is the handle of the (old) torrent. We add the (presumably new) peer to the torrent
|
||||||
|
// so we can increase the chance of operation being successful, or that the metadata might be
|
||||||
|
// fetched.
|
||||||
t.AddPeers([]torrent.Peer{peer})
|
t.AddPeers([]torrent.Peer{peer})
|
||||||
if !isNew {
|
if !isNew {
|
||||||
// If the recently added torrent is not new, then quit as we do not want multiple
|
// If the recently added torrent is not new, then quit as we do not want multiple
|
||||||
// awaitMetadata goroutines waiting on the same torrent.
|
// awaitMetadata goroutines waiting on the same torrent.
|
||||||
return
|
return
|
||||||
} else {
|
} else {
|
||||||
|
// Drop the torrent once we got the metadata.
|
||||||
defer t.Drop()
|
defer t.Drop()
|
||||||
}
|
}
|
||||||
|
|
||||||
// Wait for the torrent client to receive the metadata for the torrent, meanwhile allowing
|
// Wait for the torrent client to receive the metadata for the torrent, meanwhile allowing
|
||||||
// termination to be handled gracefully.
|
// termination to be handled gracefully.
|
||||||
|
zap.L().Sugar().Debugf("awaiting %x...", infoHash[:])
|
||||||
select {
|
select {
|
||||||
case <- ms.termination:
|
case <- ms.termination:
|
||||||
return
|
return
|
||||||
|
|
||||||
case <- t.GotInfo():
|
case <- t.GotInfo():
|
||||||
}
|
}
|
||||||
zap.L().Sugar().Warnf("==== GOT INFO for %x", infoHash[:])
|
|
||||||
|
|
||||||
info := t.Info()
|
info := t.Info()
|
||||||
var files []metainfo.FileInfo
|
var files []metainfo.FileInfo
|
||||||
@ -38,6 +42,7 @@ func (ms *MetadataSink) awaitMetadata(infoHash metainfo.Hash, peer torrent.Peer)
|
|||||||
if strings.ContainsRune(info.Name, '/') {
|
if strings.ContainsRune(info.Name, '/') {
|
||||||
// A single file torrent cannot have any '/' characters in its name. We treat it as
|
// A single file torrent cannot have any '/' characters in its name. We treat it as
|
||||||
// illegal.
|
// illegal.
|
||||||
|
zap.L().Sugar().Debugf("!!!! illegal character in name! \"%s\"", info.Name)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
files = []metainfo.FileInfo{{Length: info.Length, Path:[]string{info.Name}}}
|
files = []metainfo.FileInfo{{Length: info.Length, Path:[]string{info.Name}}}
|
||||||
@ -52,6 +57,7 @@ func (ms *MetadataSink) awaitMetadata(infoHash metainfo.Hash, peer torrent.Peer)
|
|||||||
if file.Length < 0 {
|
if file.Length < 0 {
|
||||||
// All files' sizes must be greater than or equal to zero, otherwise treat them as
|
// All files' sizes must be greater than or equal to zero, otherwise treat them as
|
||||||
// illegal and ignore.
|
// illegal and ignore.
|
||||||
|
zap.L().Sugar().Debugf("!!!! file size zero or less! \"%s\" (%d)", file.Path, file.Length)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
totalSize += uint64(file.Length)
|
totalSize += uint64(file.Length)
|
||||||
|
@ -39,7 +39,7 @@ func NewMetadataSink(laddr net.TCPAddr) *MetadataSink {
|
|||||||
DisableTrackers: true,
|
DisableTrackers: true,
|
||||||
DisablePEX: true,
|
DisablePEX: true,
|
||||||
// TODO: Should we disable DHT to force the client to use the peers we supplied only, or not?
|
// TODO: Should we disable DHT to force the client to use the peers we supplied only, or not?
|
||||||
NoDHT: false,
|
NoDHT: true,
|
||||||
PreferNoEncryption: true,
|
PreferNoEncryption: true,
|
||||||
|
|
||||||
})
|
})
|
||||||
@ -80,7 +80,7 @@ func (ms *MetadataSink) Terminate() {
|
|||||||
|
|
||||||
|
|
||||||
func (ms *MetadataSink) flush(metadata Metadata) {
|
func (ms *MetadataSink) flush(metadata Metadata) {
|
||||||
if ms.terminated {
|
if !ms.terminated {
|
||||||
ms.drain <- metadata
|
ms.drain <- metadata
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -4,15 +4,14 @@ import (
|
|||||||
"net"
|
"net"
|
||||||
"os"
|
"os"
|
||||||
"os/signal"
|
"os/signal"
|
||||||
|
"regexp"
|
||||||
|
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
|
"go.uber.org/zap/zapcore"
|
||||||
"github.com/jessevdk/go-flags"
|
"github.com/jessevdk/go-flags"
|
||||||
|
|
||||||
// "magneticod/bittorrent"
|
|
||||||
"magneticod/dht"
|
|
||||||
"go.uber.org/zap/zapcore"
|
|
||||||
"regexp"
|
|
||||||
"magneticod/bittorrent"
|
"magneticod/bittorrent"
|
||||||
|
"magneticod/dht"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
@ -133,11 +132,16 @@ func main() {
|
|||||||
select {
|
select {
|
||||||
case result := <-trawlingManager.Output():
|
case result := <-trawlingManager.Output():
|
||||||
logger.Info("result: ", zap.String("hash", result.InfoHash.String()))
|
logger.Info("result: ", zap.String("hash", result.InfoHash.String()))
|
||||||
|
if !database.DoesExist(result.InfoHash[:]) {
|
||||||
metadataSink.Sink(result)
|
metadataSink.Sink(result)
|
||||||
|
}
|
||||||
|
|
||||||
case metadata := <-metadataSink.Drain():
|
case metadata := <-metadataSink.Drain():
|
||||||
|
logger.Sugar().Infof("D I S C O V E R E D: `%s` %x",
|
||||||
|
metadata.Name, metadata.InfoHash)
|
||||||
if err := database.AddNewTorrent(metadata); err != nil {
|
if err := database.AddNewTorrent(metadata); err != nil {
|
||||||
logger.Sugar().Fatalf("Could not add new torrent %x to the database: %s", metadata.InfoHash, err.Error())
|
logger.Sugar().Fatalf("Could not add new torrent %x to the database: %s",
|
||||||
|
metadata.InfoHash, err.Error())
|
||||||
}
|
}
|
||||||
|
|
||||||
case <-interrupt_chan:
|
case <-interrupt_chan:
|
||||||
|
@ -13,6 +13,7 @@ import (
|
|||||||
|
|
||||||
"path"
|
"path"
|
||||||
"os"
|
"os"
|
||||||
|
"bytes"
|
||||||
)
|
)
|
||||||
|
|
||||||
type engineType uint8
|
type engineType uint8
|
||||||
@ -26,7 +27,7 @@ const (
|
|||||||
type Database struct {
|
type Database struct {
|
||||||
database *sql.DB
|
database *sql.DB
|
||||||
engine engineType
|
engine engineType
|
||||||
newTorrents chan bittorrent.Metadata
|
newTorrents [] bittorrent.Metadata
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewDatabase creates a new Database.
|
// NewDatabase creates a new Database.
|
||||||
@ -43,23 +44,11 @@ func NewDatabase(rawurl string) (*Database, error) {
|
|||||||
switch dbURL.Scheme {
|
switch dbURL.Scheme {
|
||||||
case "sqlite":
|
case "sqlite":
|
||||||
db.engine = SQLITE
|
db.engine = SQLITE
|
||||||
// All this pain is to make sure that an empty file exist (in case the database is not there
|
|
||||||
// yet) so that sql.Open won't fail.
|
|
||||||
dbDir, _ := path.Split(dbURL.Path)
|
dbDir, _ := path.Split(dbURL.Path)
|
||||||
if err := os.MkdirAll(dbDir, 0755); err != nil {
|
if err := os.MkdirAll(dbDir, 0755); err != nil {
|
||||||
return nil, fmt.Errorf("for directory `%s`: %s", dbDir, err.Error())
|
return nil, fmt.Errorf("for directory `%s`: %s", dbDir, err.Error())
|
||||||
}
|
}
|
||||||
f, err := os.OpenFile(dbURL.Path, os.O_CREATE, 0666)
|
db.database, err = sql.Open("sqlite3", dbURL.Path)
|
||||||
if err != nil {
|
|
||||||
return nil, fmt.Errorf("for file `%s`: %s", dbURL.Path, err.Error())
|
|
||||||
}
|
|
||||||
if err := f.Sync(); err != nil {
|
|
||||||
return nil, fmt.Errorf("for file `%s`: %s", dbURL.Path, err.Error())
|
|
||||||
}
|
|
||||||
if err := f.Close(); err != nil {
|
|
||||||
return nil, fmt.Errorf("for file `%s`: %s", dbURL.Path, err.Error())
|
|
||||||
}
|
|
||||||
db.database, err = sql.Open("sqlite3", dbURL.RawPath)
|
|
||||||
|
|
||||||
case "postgresql":
|
case "postgresql":
|
||||||
db.engine = POSTGRESQL
|
db.engine = POSTGRESQL
|
||||||
@ -75,51 +64,83 @@ func NewDatabase(rawurl string) (*Database, error) {
|
|||||||
|
|
||||||
// Check for errors from sql.Open()
|
// Check for errors from sql.Open()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("sql.Open()! %s", err.Error())
|
return nil, fmt.Errorf("error in sql.Open(): %s", err.Error())
|
||||||
}
|
}
|
||||||
|
|
||||||
if err = db.database.Ping(); err != nil {
|
if err = db.database.Ping(); err != nil {
|
||||||
return nil, fmt.Errorf("DB.Ping()! %s", err.Error())
|
return nil, fmt.Errorf("error in DB.Ping(): %s", err.Error())
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := db.setupDatabase(); err != nil {
|
if err := db.setupDatabase(); err != nil {
|
||||||
return nil, fmt.Errorf("setupDatabase()! %s", err.Error())
|
return nil, fmt.Errorf("error in setupDatabase(): %s", err.Error())
|
||||||
}
|
}
|
||||||
|
|
||||||
db.newTorrents = make(chan bittorrent.Metadata, 10)
|
|
||||||
|
|
||||||
return &db, nil
|
return &db, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// AddNewTorrent adds a new torrent to the *queue* to be flushed to the persistent database.
|
|
||||||
func (db *Database) AddNewTorrent(torrent bittorrent.Metadata) error {
|
func (db *Database) DoesExist(infoHash []byte) bool {
|
||||||
for {
|
for _, torrent := range db.newTorrents {
|
||||||
select {
|
if bytes.Equal(infoHash, torrent.InfoHash) {
|
||||||
case db.newTorrents <- torrent:
|
return true;
|
||||||
return nil
|
|
||||||
default:
|
|
||||||
// newTorrents queue was full: flush and try again and again (and again)...
|
|
||||||
err := db.flushNewTorrents()
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (db *Database) flushNewTorrents() error {
|
rows, err := db.database.Query("SELECT info_hash FROM torrents WHERE info_hash = ?;", infoHash)
|
||||||
|
if err != nil {
|
||||||
|
zap.L().Sugar().Fatalf("Could not query whether a torrent exists in the database! %s", err.Error())
|
||||||
|
}
|
||||||
|
defer rows.Close()
|
||||||
|
|
||||||
|
// If rows.Next() returns true, meaning that the torrent is in the database, return true; else
|
||||||
|
// return false.
|
||||||
|
return rows.Next()
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
// AddNewTorrent adds a new torrent to the *queue* to be flushed to the persistent database.
|
||||||
|
func (db *Database) AddNewTorrent(torrent bittorrent.Metadata) error {
|
||||||
|
// Although we check whether the torrent exists in the database before asking MetadataSink to
|
||||||
|
// fetch its metadata, the torrent can also exists in the Sink before that. Now, if a torrent in
|
||||||
|
// the sink is still being fetched, that's still not a problem as we just add the new peer for
|
||||||
|
// the torrent and exit, but if the torrent is complete (i.e. its metadata) and if its waiting
|
||||||
|
// in the channel to be received, a race condition arises when we query the database and seeing
|
||||||
|
// that it doesn't exists there, add it to the sink.
|
||||||
|
// Hence check for the last time whether the torrent exists in the database, and only if not,
|
||||||
|
// add it.
|
||||||
|
if db.DoesExist(torrent.InfoHash) {
|
||||||
|
return nil;
|
||||||
|
}
|
||||||
|
|
||||||
|
db.newTorrents = append(db.newTorrents, torrent)
|
||||||
|
|
||||||
|
if len(db.newTorrents) >= 10 {
|
||||||
|
zap.L().Sugar().Debugf("newTorrents queue is full, attempting to commit %d torrents...",
|
||||||
|
len(db.newTorrents))
|
||||||
|
if err := db.commitNewTorrents(); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
func (db *Database) commitNewTorrents() error {
|
||||||
tx, err := db.database.Begin()
|
tx, err := db.database.Begin()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("sql.DB.Begin()! %s", err.Error())
|
return fmt.Errorf("sql.DB.Begin()! %s", err.Error())
|
||||||
}
|
}
|
||||||
|
|
||||||
var nTorrents, nFiles uint
|
var nTorrents, nFiles uint
|
||||||
for torrent := range db.newTorrents {
|
nTorrents = uint(len(db.newTorrents))
|
||||||
res, err := tx.Exec("INSERT INTO torrents (info_hash, name, total_size, discovered_on) VALUES ($1, $2, $3, $4);",
|
for i, torrent := range db.newTorrents {
|
||||||
|
zap.L().Sugar().Debugf("Flushing torrent %d of %d: `%s` (%x)...",
|
||||||
|
i + 1, len(db.newTorrents), torrent.Name, torrent.InfoHash)
|
||||||
|
res, err := tx.Exec("INSERT INTO torrents (info_hash, name, total_size, discovered_on) VALUES (?, ?, ?, ?);",
|
||||||
torrent.InfoHash, torrent.Name, torrent.TotalSize, torrent.DiscoveredOn)
|
torrent.InfoHash, torrent.Name, torrent.TotalSize, torrent.DiscoveredOn)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
ourError := fmt.Errorf("error while INSERTing INTO torrents! %s", err.Error())
|
ourError := fmt.Errorf("error while INSERTing INTO torrent: %s", err.Error())
|
||||||
if err := tx.Rollback(); err != nil {
|
if err := tx.Rollback(); err != nil {
|
||||||
return fmt.Errorf("%s\tmeanwhile, could not rollback the current transaction either! %s", ourError.Error(), err.Error())
|
return fmt.Errorf("%s\tmeanwhile, could not rollback the current transaction either! %s", ourError.Error(), err.Error())
|
||||||
}
|
}
|
||||||
@ -131,25 +152,27 @@ func (db *Database) flushNewTorrents() error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
for _, file := range torrent.Files {
|
for _, file := range torrent.Files {
|
||||||
_, err := tx.Exec("INSERT INTO files (torrent_id, size, path) VALUES($1, $2, $3);",
|
zap.L().Sugar().Debugf("Flushing file `%s` (of torrent %x)", path.Join(file.Path...), torrent.InfoHash)
|
||||||
lastInsertId, file.Length, file.Path)
|
_, err := tx.Exec("INSERT INTO files (torrent_id, size, path) VALUES(?, ?, ?);",
|
||||||
|
lastInsertId, file.Length, path.Join(file.Path...))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
ourError := fmt.Errorf("error while INSERTing INTO files! %s", err.Error())
|
ourError := fmt.Errorf("error while INSERTing INTO files: %s", err.Error())
|
||||||
if err := tx.Rollback(); err != nil {
|
if err := tx.Rollback(); err != nil {
|
||||||
return fmt.Errorf("%s\tmeanwhile, could not rollback the current transaction either! %s", ourError.Error(), err.Error())
|
return fmt.Errorf("%s\tmeanwhile, could not rollback the current transaction either! %s", ourError.Error(), err.Error())
|
||||||
}
|
}
|
||||||
return ourError
|
return ourError
|
||||||
}
|
}
|
||||||
nFiles++
|
|
||||||
}
|
}
|
||||||
nTorrents++
|
nFiles += uint(len(torrent.Files))
|
||||||
}
|
}
|
||||||
|
|
||||||
err = tx.Commit()
|
if err = tx.Commit(); err != nil {
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("sql.Tx.Commit()! %s", err.Error())
|
return fmt.Errorf("sql.Tx.Commit()! %s", err.Error())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Clear the queue
|
||||||
|
db.newTorrents = nil
|
||||||
|
|
||||||
zap.L().Sugar().Infof("%d torrents (%d files) are flushed to the database successfully.",
|
zap.L().Sugar().Infof("%d torrents (%d files) are flushed to the database successfully.",
|
||||||
nTorrents, nFiles)
|
nTorrents, nFiles)
|
||||||
return nil
|
return nil
|
||||||
|
Loading…
Reference in New Issue
Block a user