555 lines
14 KiB
Go
555 lines
14 KiB
Go
package persistence
|
|
|
|
import (
|
|
"database/sql"
|
|
"fmt"
|
|
"github.com/boramalper/magnetico/pkg/util"
|
|
"net/url"
|
|
"text/template"
|
|
"time"
|
|
"unicode/utf8"
|
|
|
|
_ "github.com/jackc/pgx/v4"
|
|
_ "github.com/jackc/pgx/v4/stdlib"
|
|
|
|
"github.com/pkg/errors"
|
|
"go.uber.org/zap"
|
|
)
|
|
|
|
// postgresDatabase is the PostgreSQL-backed implementation of the persistence
// layer. Values are built by makePostgresDatabase.
type postgresDatabase struct {
	// conn is the database/sql connection pool opened with the "pgx" driver.
	conn *sql.DB
	// schema is the schema name that setupDatabase passes to CREATE SCHEMA.
	schema string
}
|
|
|
|
func makePostgresDatabase(url_ *url.URL) (Database, error) {
|
|
db := new(postgresDatabase)
|
|
|
|
schema := url_.Query().Get("schema")
|
|
if schema == "" {
|
|
db.schema = "magneticod"
|
|
url_.Query().Set("search_path", "magneticod")
|
|
} else {
|
|
db.schema = schema
|
|
url_.Query().Set("search_path", schema)
|
|
}
|
|
url_.Query().Del("schema")
|
|
|
|
var err error
|
|
db.conn, err = sql.Open("pgx", url_.String())
|
|
if err != nil {
|
|
return nil, errors.Wrap(err, "sql.Open")
|
|
}
|
|
|
|
// > Open may just validate its arguments without creating a connection to the database. To
|
|
// > verify that the data source Name is valid, call Ping.
|
|
// https://golang.org/pkg/database/sql/#Open
|
|
if err = db.conn.Ping(); err != nil {
|
|
return nil, errors.Wrap(err, "sql.DB.Ping")
|
|
}
|
|
|
|
// https://github.com/mattn/go-sqlite3/issues/618
|
|
db.conn.SetConnMaxLifetime(0) // https://golang.org/pkg/database/sql/#DB.SetConnMaxLifetime
|
|
db.conn.SetMaxOpenConns(3)
|
|
db.conn.SetMaxIdleConns(3)
|
|
|
|
if err := db.setupDatabase(); err != nil {
|
|
return nil, errors.Wrap(err, "setupDatabase")
|
|
}
|
|
|
|
return db, nil
|
|
}
|
|
|
|
func (db *postgresDatabase) Engine() databaseEngine {
|
|
return Postgres
|
|
}
|
|
|
|
func (db *postgresDatabase) DoesTorrentExist(infoHash []byte) (bool, error) {
|
|
rows, err := db.conn.Query("SELECT 1 FROM torrents WHERE info_hash = $1;", infoHash)
|
|
if err != nil {
|
|
return false, err
|
|
}
|
|
defer rows.Close()
|
|
|
|
exists := rows.Next()
|
|
if rows.Err() != nil {
|
|
return false, err
|
|
}
|
|
|
|
return exists, nil
|
|
}
|
|
|
|
func (db *postgresDatabase) AddNewTorrent(infoHash []byte, name string, files []File) error {
|
|
if !utf8.ValidString(name) {
|
|
zap.L().Warn(
|
|
"Ignoring a torrent whose name is not UTF-8 compliant.",
|
|
util.HexField("infoHash", infoHash),
|
|
zap.Binary("name", []byte(name)),
|
|
)
|
|
|
|
return nil
|
|
}
|
|
|
|
tx, err := db.conn.Begin()
|
|
if err != nil {
|
|
return errors.Wrap(err, "conn.Begin")
|
|
}
|
|
// If everything goes as planned and no error occurs, we will commit the transaction before
|
|
// returning from the function so the tx.Rollback() call will fail, trying to rollback a
|
|
// committed transaction. BUT, if an error occurs, we'll get our transaction rollback'ed, which
|
|
// is nice.
|
|
defer tx.Rollback()
|
|
|
|
var totalSize uint64 = 0
|
|
for _, file := range files {
|
|
totalSize += uint64(file.Size)
|
|
}
|
|
|
|
// This is a workaround for a bug: the database will not accept total_size to be zero.
|
|
if totalSize == 0 {
|
|
zap.L().Debug("Ignoring a torrent whose total size is zero.")
|
|
return nil
|
|
}
|
|
|
|
if exist, err := db.DoesTorrentExist(infoHash); exist || err != nil {
|
|
return err
|
|
}
|
|
|
|
var lastInsertId int64
|
|
|
|
err = tx.QueryRow(`
|
|
INSERT INTO torrents (
|
|
info_hash,
|
|
name,
|
|
total_size,
|
|
discovered_on
|
|
) VALUES ($1, $2, $3, $4)
|
|
RETURNING id;
|
|
`, infoHash, name, totalSize, time.Now().Unix()).Scan(&lastInsertId)
|
|
if err != nil {
|
|
return errors.Wrap(err, "tx.QueryRow (INSERT INTO torrents)")
|
|
}
|
|
|
|
for _, file := range files {
|
|
if !utf8.ValidString(file.Path) {
|
|
zap.L().Warn(
|
|
"Ignoring a file whose path is not UTF-8 compliant.",
|
|
zap.Binary("path", []byte(file.Path)),
|
|
)
|
|
|
|
// Returning nil so deferred tx.Rollback() will be called and transaction will be canceled.
|
|
return nil
|
|
}
|
|
|
|
_, err = tx.Exec("INSERT INTO files (torrent_id, size, path) VALUES ($1, $2, $3);",
|
|
lastInsertId, file.Size, file.Path,
|
|
)
|
|
if err != nil {
|
|
return errors.Wrap(err, "tx.Exec (INSERT INTO files)")
|
|
}
|
|
}
|
|
|
|
err = tx.Commit()
|
|
if err != nil {
|
|
return errors.Wrap(err, "tx.Commit")
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (db *postgresDatabase) Close() error {
|
|
return db.conn.Close()
|
|
}
|
|
|
|
func (db *postgresDatabase) GetNumberOfTorrents() (uint, error) {
|
|
// Using estimated number of rows which can make queries much faster
|
|
// https://www.postgresql.org/message-id/568BF820.9060101%40comarch.com
|
|
// https://wiki.postgresql.org/wiki/Count_estimate
|
|
rows, err := db.conn.Query(
|
|
"SELECT reltuples::BIGINT AS estimate_count FROM pg_class WHERE relname='torrents';",
|
|
)
|
|
if err != nil {
|
|
return 0, err
|
|
}
|
|
defer rows.Close()
|
|
|
|
if !rows.Next() {
|
|
return 0, fmt.Errorf("no rows returned from `SELECT reltuples::BIGINT AS estimate_count`")
|
|
}
|
|
|
|
// Returns int64: https://godoc.org/github.com/lib/pq#hdr-Data_Types
|
|
var n *uint
|
|
if err = rows.Scan(&n); err != nil {
|
|
return 0, err
|
|
}
|
|
|
|
// If the database is empty (i.e. 0 entries in 'torrents') then the query will return nil.
|
|
if n == nil {
|
|
return 0, nil
|
|
} else {
|
|
return *n, nil
|
|
}
|
|
}
|
|
|
|
func (db *postgresDatabase) QueryTorrents(
|
|
query string,
|
|
epoch int64,
|
|
orderBy OrderingCriteria,
|
|
ascending bool,
|
|
limit uint,
|
|
lastOrderedValue *float64,
|
|
lastID *uint64,
|
|
) ([]TorrentMetadata, error) {
|
|
if query == "" && orderBy == ByRelevance {
|
|
return nil, fmt.Errorf("torrents cannot be ordered by relevance when the query is empty")
|
|
}
|
|
if (lastOrderedValue == nil) != (lastID == nil) {
|
|
return nil, fmt.Errorf("lastOrderedValue and lastID should be supplied together, if supplied")
|
|
}
|
|
|
|
firstPage := lastID == nil
|
|
|
|
// executeTemplate is used to prepare the SQL query, WITH PLACEHOLDERS FOR USER INPUT.
|
|
sqlQuery := executeTemplate(`
|
|
SELECT id
|
|
, info_hash
|
|
, name
|
|
, total_size
|
|
, discovered_on
|
|
, (SELECT COUNT(*) FROM files WHERE torrents.id = files.torrent_id) AS n_files
|
|
{{ if .QueryExists }}
|
|
, similarity(name, '{{ .Query }}') * -1 as relevance
|
|
{{ else }}
|
|
, 0
|
|
{{ end }}
|
|
FROM torrents
|
|
{{ if not .FirstPage }}
|
|
WHERE
|
|
{{ if not .QueryExists }}
|
|
{{.OrderOn}}
|
|
{{ else }}
|
|
similarity(name, '{{ .Query }}') * -1
|
|
{{ end }}
|
|
{{GTEorLTE .Ascending}} {{.LastOrderedValue}}
|
|
{{ if .QueryExists }}
|
|
AND
|
|
{{ end }}
|
|
{{ end }}
|
|
{{ if and .QueryExists .FirstPage }} WHERE {{ end }}
|
|
{{ if .QueryExists }}
|
|
to_tsvector(regexp_replace(name, '\W+', ' ', 'g')) @@ websearch_to_tsquery('{{ .Query }}')
|
|
{{ end }}
|
|
ORDER BY {{.OrderOn}} {{AscOrDesc .Ascending}}
|
|
LIMIT {{.Limit}};
|
|
`, struct {
|
|
FirstPage bool
|
|
OrderOn string
|
|
Ascending bool
|
|
Limit uint
|
|
LastOrderedValue *float64
|
|
QueryExists bool
|
|
Query string
|
|
}{
|
|
FirstPage: firstPage,
|
|
OrderOn: orderOnPostgreSQL(orderBy),
|
|
Ascending: ascending,
|
|
Limit: limit,
|
|
LastOrderedValue: lastOrderedValue,
|
|
QueryExists: query != "",
|
|
Query: query,
|
|
}, template.FuncMap{
|
|
"GTEorLTE": func(ascending bool) string {
|
|
if ascending {
|
|
return ">"
|
|
} else {
|
|
return "<"
|
|
}
|
|
},
|
|
"AscOrDesc": func(ascending bool) string {
|
|
if ascending {
|
|
return "ASC"
|
|
} else {
|
|
return "DESC"
|
|
}
|
|
},
|
|
})
|
|
|
|
rows, err := db.conn.Query(sqlQuery)
|
|
defer db.closeRows(rows)
|
|
if err != nil {
|
|
zap.L().Error(fmt.Sprint(err))
|
|
return nil, errors.Wrap(err, "query error")
|
|
}
|
|
|
|
torrents := make([]TorrentMetadata, 0)
|
|
for rows.Next() {
|
|
var torrent TorrentMetadata
|
|
err = rows.Scan(
|
|
&torrent.ID,
|
|
&torrent.InfoHash,
|
|
&torrent.Name,
|
|
&torrent.Size,
|
|
&torrent.DiscoveredOn,
|
|
&torrent.NFiles,
|
|
&torrent.Relevance,
|
|
)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
torrents = append(torrents, torrent)
|
|
}
|
|
|
|
return torrents, nil
|
|
}
|
|
|
|
func orderOnPostgreSQL(orderBy OrderingCriteria) string {
|
|
switch orderBy {
|
|
case ByRelevance:
|
|
return "relevance"
|
|
|
|
case ByTotalSize:
|
|
return "total_size"
|
|
|
|
case ByDiscoveredOn:
|
|
return "discovered_on"
|
|
|
|
case ByNFiles:
|
|
return "n_files"
|
|
|
|
default:
|
|
panic(fmt.Sprintf("unknown orderBy: %v", orderBy))
|
|
}
|
|
}
|
|
|
|
func (db *postgresDatabase) GetTorrent(infoHash []byte) (*TorrentMetadata, error) {
|
|
rows, err := db.conn.Query(`
|
|
SELECT
|
|
t.info_hash,
|
|
t.name,
|
|
t.total_size,
|
|
t.discovered_on,
|
|
(SELECT COUNT(*) FROM files f WHERE f.torrent_id = t.id) AS n_files
|
|
FROM torrents t
|
|
WHERE t.info_hash = $1;`,
|
|
infoHash,
|
|
)
|
|
defer db.closeRows(rows)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
if !rows.Next() {
|
|
return nil, nil
|
|
}
|
|
|
|
var tm TorrentMetadata
|
|
if err = rows.Scan(&tm.InfoHash, &tm.Name, &tm.Size, &tm.DiscoveredOn, &tm.NFiles); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return &tm, nil
|
|
}
|
|
|
|
func (db *postgresDatabase) GetFiles(infoHash []byte) ([]File, error) {
|
|
rows, err := db.conn.Query(`
|
|
SELECT
|
|
f.size,
|
|
f.path
|
|
FROM files f, torrents t
|
|
WHERE f.torrent_id = t.id AND t.info_hash = $1;`,
|
|
infoHash,
|
|
)
|
|
defer db.closeRows(rows)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
var files []File
|
|
for rows.Next() {
|
|
var file File
|
|
if err = rows.Scan(&file.Size, &file.Path); err != nil {
|
|
return nil, err
|
|
}
|
|
files = append(files, file)
|
|
}
|
|
|
|
return files, nil
|
|
}
|
|
|
|
func (db *postgresDatabase) GetStatistics(from string, n uint) (*Statistics, error) {
|
|
fromTime, gran, err := ParseISO8601(from)
|
|
if err != nil {
|
|
return nil, errors.Wrap(err, "parsing ISO8601 error")
|
|
}
|
|
|
|
var toTime time.Time
|
|
var timeF string // time format: https://www.postgresql.org/docs/current/functions-datetime.html#FUNCTIONS-DATETIME-TRUNC
|
|
|
|
switch gran {
|
|
case Year:
|
|
toTime = fromTime.AddDate(int(n), 0, 0)
|
|
timeF = "year"
|
|
case Month:
|
|
toTime = fromTime.AddDate(0, int(n), 0)
|
|
timeF = "month"
|
|
case Week:
|
|
toTime = fromTime.AddDate(0, 0, int(n)*7)
|
|
timeF = "week"
|
|
case Day:
|
|
toTime = fromTime.AddDate(0, 0, int(n))
|
|
timeF = "day"
|
|
case Hour:
|
|
toTime = fromTime.Add(time.Duration(n) * time.Hour)
|
|
timeF = "hour"
|
|
}
|
|
|
|
// TODO: make it faster!
|
|
rows, err := db.conn.Query(fmt.Sprintf(`
|
|
SELECT date_trunc('%s', to_timestamp(discovered_on)) AS dT
|
|
, sum(files.size) AS tS
|
|
, count(DISTINCT torrents.id) AS nD
|
|
, count(DISTINCT files.id) AS nF
|
|
FROM torrents, files
|
|
WHERE torrents.id = files.torrent_id
|
|
AND discovered_on >= $1
|
|
AND discovered_on <= $2
|
|
GROUP BY dt;`, timeF),
|
|
fromTime.Unix(), toTime.Unix())
|
|
defer closeRows(rows)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
stats := NewStatistics()
|
|
|
|
for rows.Next() {
|
|
var dT string
|
|
var tS, nD, nF uint64
|
|
if err := rows.Scan(&dT, &tS, &nD, &nF); err != nil {
|
|
if err := rows.Close(); err != nil {
|
|
panic(err.Error())
|
|
}
|
|
return nil, err
|
|
}
|
|
stats.NDiscovered[dT] = nD
|
|
stats.TotalSize[dT] = tS
|
|
stats.NFiles[dT] = nF
|
|
}
|
|
|
|
return stats, nil
|
|
}
|
|
|
|
func (db *postgresDatabase) setupDatabase() error {
|
|
tx, err := db.conn.Begin()
|
|
if err != nil {
|
|
return errors.Wrap(err, "sql.DB.Begin")
|
|
}
|
|
|
|
defer tx.Rollback()
|
|
|
|
rows, err := db.conn.Query("SELECT 1 FROM pg_extension WHERE extname = 'pg_trgm';")
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer db.closeRows(rows)
|
|
|
|
trgmInstalled := rows.Next()
|
|
if rows.Err() != nil {
|
|
return err
|
|
}
|
|
|
|
if !trgmInstalled {
|
|
return fmt.Errorf(
|
|
"pg_trgm extension is not enabled. You need to execute 'CREATE EXTENSION pg_trgm' on this database",
|
|
)
|
|
}
|
|
|
|
// Initial Setup for schema version 0:
|
|
// FROZEN.
|
|
_, err = tx.Exec(`
|
|
CREATE SCHEMA IF NOT EXISTS ` + db.schema + `;
|
|
|
|
-- Torrents ID sequence generator
|
|
CREATE SEQUENCE IF NOT EXISTS seq_torrents_id;
|
|
-- Files ID sequence generator
|
|
CREATE SEQUENCE IF NOT EXISTS seq_files_id;
|
|
|
|
CREATE TABLE IF NOT EXISTS torrents (
|
|
id INTEGER PRIMARY KEY DEFAULT nextval('seq_torrents_id'),
|
|
info_hash bytea NOT NULL UNIQUE,
|
|
name TEXT NOT NULL,
|
|
total_size BIGINT NOT NULL CHECK(total_size > 0),
|
|
discovered_on INTEGER NOT NULL CHECK(discovered_on > 0)
|
|
);
|
|
|
|
-- Indexes for search sorting options
|
|
CREATE INDEX IF NOT EXISTS idx_torrents_total_size ON torrents (total_size);
|
|
CREATE INDEX IF NOT EXISTS idx_torrents_discovered_on ON torrents (discovered_on);
|
|
|
|
-- Using pg_trgm GIN index for fast ILIKE queries
|
|
-- You need to execute "CREATE EXTENSION pg_trgm" on your database for this index to work
|
|
-- Be aware that using this type of index implies that making ILIKE queries with less that
|
|
-- 3 character values will cause full table scan instead of using index.
|
|
-- You can try to avoid that by doing 'SET enable_seqscan=off'.
|
|
CREATE INDEX IF NOT EXISTS idx_torrents_name_gin_trgm ON torrents USING GIN (name gin_trgm_ops);
|
|
|
|
CREATE TABLE IF NOT EXISTS files (
|
|
id INTEGER PRIMARY KEY DEFAULT nextval('seq_files_id'),
|
|
torrent_id INTEGER REFERENCES torrents ON DELETE CASCADE ON UPDATE RESTRICT,
|
|
size BIGINT NOT NULL,
|
|
path TEXT NOT NULL
|
|
);
|
|
|
|
CREATE INDEX IF NOT EXISTS idx_files_torrent_id ON files (torrent_id);
|
|
|
|
CREATE TABLE IF NOT EXISTS migrations (
|
|
schema_version SMALLINT NOT NULL UNIQUE
|
|
);
|
|
|
|
INSERT INTO migrations (schema_version) VALUES (0) ON CONFLICT DO NOTHING;
|
|
`)
|
|
if err != nil {
|
|
return errors.Wrap(err, "sql.Tx.Exec (v0)")
|
|
}
|
|
|
|
// Get current schema version
|
|
rows, err = tx.Query("SELECT MAX(schema_version) FROM migrations;")
|
|
if err != nil {
|
|
return errors.Wrap(err, "sql.Tx.Query (SELECT MAX(version) FROM migrations)")
|
|
}
|
|
defer db.closeRows(rows)
|
|
|
|
var schemaVersion int
|
|
if !rows.Next() {
|
|
return fmt.Errorf("sql.Rows.Next (SELECT MAX(version) FROM migrations): Query did not return any rows")
|
|
}
|
|
if err = rows.Scan(&schemaVersion); err != nil {
|
|
return errors.Wrap(err, "sql.Rows.Scan (MAX(version))")
|
|
}
|
|
// If next line is removed we're getting error on sql.Tx.Commit: unexpected command tag SELECT
|
|
// https://stackoverflow.com/questions/36295883/golang-postgres-commit-unknown-command-error/36866993#36866993
|
|
db.closeRows(rows)
|
|
|
|
// Uncomment for future migrations:
|
|
//switch schemaVersion {
|
|
//case 0: // FROZEN.
|
|
// zap.L().Warn("Updating (fake) database schema from 0 to 1...")
|
|
// _, err = tx.Exec(`INSERT INTO migrations (schema_version) VALUES (1);`)
|
|
// if err != nil {
|
|
// return errors.Wrap(err, "sql.Tx.Exec (v0 -> v1)")
|
|
// }
|
|
// //fallthrough
|
|
//}
|
|
|
|
if err = tx.Commit(); err != nil {
|
|
return errors.Wrap(err, "sql.Tx.Commit")
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (db *postgresDatabase) closeRows(rows *sql.Rows) {
|
|
if err := rows.Close(); err != nil {
|
|
zap.L().Error("could not close row", zap.Error(err))
|
|
}
|
|
}
|