Merge pull request #214 from skobkin/feature_postgresql_support
[Partial] PostgreSQL support for magneticod
This commit is contained in:
commit
0a497e8fea
@ -36,7 +36,11 @@ the repository `boramalper/magnetico`. Images are tagged as `d-vMAJOR.MINOR.PATC
|
||||
## Usage
|
||||
### Database
|
||||
**magneticod** is designed to be able to use different database engines to store its data, but
|
||||
currently only SQLite 3 is supported. The database file can be found in:
|
||||
currently only SQLite 3 and PostgreSQL 9+ are supported.
|
||||
|
||||
#### SQLite
|
||||
|
||||
The database file can be found in:
|
||||
|
||||
- **On Linux**
|
||||
|
||||
@ -45,6 +49,10 @@ currently only SQLite 3 is supported. The database file can be found in:
|
||||
**magneticod** uses write-ahead logging (WAL) for its database, so there might be multiple
|
||||
files while it is operating, but ``database.sqlite3`` is *the database*.
|
||||
|
||||
#### More engines (PostgreSQL and others)
|
||||
|
||||
You can read about other supported persistence engines [here](pkg/README.md).
|
||||
|
||||
### Using the Docker Image
|
||||
You need to mount
|
||||
|
||||
|
10
go.mod
10
go.mod
@ -11,7 +11,7 @@ require (
|
||||
github.com/google/pprof v0.0.0-20190515194954-54271f7e092f // indirect
|
||||
github.com/gorilla/mux v1.7.4
|
||||
github.com/gorilla/schema v1.1.0
|
||||
github.com/ianlancetaylor/demangle v0.0.0-20181102032728-5e5cf60278f6 // indirect
|
||||
github.com/jackc/pgx/v4 v4.9.2
|
||||
github.com/jessevdk/go-flags v1.4.0
|
||||
github.com/kevinburke/go-bindata v3.16.0+incompatible // indirect
|
||||
github.com/libp2p/go-sockaddr v0.0.1
|
||||
@ -19,14 +19,12 @@ require (
|
||||
github.com/pkg/errors v0.9.1
|
||||
github.com/pkg/profile v1.4.0
|
||||
github.com/willf/bloom v2.0.3+incompatible
|
||||
go.uber.org/atomic v1.5.1 // indirect
|
||||
go.uber.org/multierr v1.4.0 // indirect
|
||||
go.uber.org/zap v1.14.0
|
||||
golang.org/x/crypto v0.0.0-20200221231518-2aa609cf4a9d
|
||||
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9
|
||||
golang.org/x/lint v0.0.0-20200130185559-910be7a94367 // indirect
|
||||
golang.org/x/net v0.0.0-20200222125558-5a598a2470a0 // indirect
|
||||
golang.org/x/sys v0.0.0-20200219091948-cb0a6d8edb6c
|
||||
golang.org/x/text v0.3.2
|
||||
golang.org/x/sys v0.0.0-20200223170610-d5e6a3e2c0ae
|
||||
golang.org/x/text v0.3.3
|
||||
golang.org/x/tools v0.0.0-20200221224223-e1da425f72fd // indirect
|
||||
honnef.co/go/tools v0.0.1-2020.1.3 // indirect
|
||||
)
|
||||
|
@ -2,10 +2,33 @@
|
||||
[![GoDoc](https://godoc.org/github.com/boramalper/magnetico?status.svg)](https://godoc.org/github.com/boramalper/magnetico)
|
||||
|
||||
- The most significant package is `persistence`, that abstracts access to the
|
||||
magnetico databases with different engines (currently, only SQLite).
|
||||
magnetico databases with different engines (currently, SQLite, stdout and partly PostgreSQL).
|
||||
|
||||
**For REST-ful magneticow API, see [https://app.swaggerhub.com/apis/boramalper/magneticow-api/](https://app.swaggerhub.com/apis/boramalper/magneticow-api/).**
|
||||
|
||||
## PostgreSQL database engine (only `magneticod` part implemented)
|
||||
|
||||
PostgreSQL database engine uses [PostgreSQL](https://www.postgresql.org/) to store indexed
|
||||
torrents. It's more performant and flexible than SQLite but requires additional software configuration.
|
||||
|
||||
**WARNING:** `pg_trgm` extension required. You need to enable it for your database before starting `magneticod`:
|
||||
|
||||
```postgresql
|
||||
CREATE EXTENSION pg_trgm;
|
||||
```
|
||||
|
||||
Engine usage example:
|
||||
|
||||
```shell
|
||||
magneticod --database=postgres://username:password@127.0.0.1:5432/database?schema=custom_schema_name&sslmode=disable
|
||||
```
|
||||
|
||||
See [pgx repository](https://github.com/jackc/pgx/blob/master/stdlib/sql.go)
|
||||
for more examples.
|
||||
|
||||
Optional parameter `schema` was added to choose which schema will be used to store magnetico tables,
|
||||
sequences and indexes.
|
||||
|
||||
## Stdout Dummy Database Engine for magneticod
|
||||
|
||||
Stdout dummy database engine for **magneticod** prints a new [JSON Line](http://jsonlines.org/)
|
||||
|
@ -59,7 +59,8 @@ const (
|
||||
type databaseEngine uint8
|
||||
|
||||
const (
|
||||
Sqlite3 databaseEngine = 1
|
||||
Sqlite3 databaseEngine = 1
|
||||
Postgres databaseEngine = 2
|
||||
Stdout
|
||||
)
|
||||
|
||||
@ -113,12 +114,12 @@ func MakeDatabase(rawURL string, logger *zap.Logger) (Database, error) {
|
||||
case "sqlite3":
|
||||
return makeSqlite3Database(url_)
|
||||
|
||||
case "postgres":
|
||||
return makePostgresDatabase(url_)
|
||||
|
||||
case "stdout":
|
||||
return makeStdoutDatabase(url_)
|
||||
|
||||
case "postgresql":
|
||||
return nil, fmt.Errorf("postgresql is not yet supported")
|
||||
|
||||
case "mysql":
|
||||
return nil, fmt.Errorf("mysql is not yet supported")
|
||||
|
||||
|
378
pkg/persistence/postgres.go
Normal file
378
pkg/persistence/postgres.go
Normal file
@ -0,0 +1,378 @@
|
||||
package persistence
|
||||
|
||||
import (
|
||||
"database/sql"
|
||||
"fmt"
|
||||
"net/url"
|
||||
"time"
|
||||
"unicode/utf8"
|
||||
|
||||
_ "github.com/jackc/pgx/v4"
|
||||
_ "github.com/jackc/pgx/v4/stdlib"
|
||||
|
||||
"github.com/pkg/errors"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
type postgresDatabase struct {
|
||||
conn *sql.DB
|
||||
schema string
|
||||
}
|
||||
|
||||
func makePostgresDatabase(url_ *url.URL) (Database, error) {
|
||||
db := new(postgresDatabase)
|
||||
|
||||
schema := url_.Query().Get("schema")
|
||||
if schema == "" {
|
||||
db.schema = "magneticod"
|
||||
url_.Query().Set("search_path", "magneticod")
|
||||
} else {
|
||||
db.schema = schema
|
||||
url_.Query().Set("search_path", schema)
|
||||
}
|
||||
url_.Query().Del("schema")
|
||||
|
||||
var err error
|
||||
db.conn, err = sql.Open("pgx", url_.String())
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "sql.Open")
|
||||
}
|
||||
|
||||
// > Open may just validate its arguments without creating a connection to the database. To
|
||||
// > verify that the data source Name is valid, call Ping.
|
||||
// https://golang.org/pkg/database/sql/#Open
|
||||
if err = db.conn.Ping(); err != nil {
|
||||
return nil, errors.Wrap(err, "sql.DB.Ping")
|
||||
}
|
||||
|
||||
// https://github.com/mattn/go-sqlite3/issues/618
|
||||
db.conn.SetConnMaxLifetime(0) // https://golang.org/pkg/database/sql/#DB.SetConnMaxLifetime
|
||||
db.conn.SetMaxOpenConns(3)
|
||||
db.conn.SetMaxIdleConns(3)
|
||||
|
||||
if err := db.setupDatabase(); err != nil {
|
||||
return nil, errors.Wrap(err, "setupDatabase")
|
||||
}
|
||||
|
||||
return db, nil
|
||||
}
|
||||
|
||||
func (db *postgresDatabase) Engine() databaseEngine {
|
||||
return Postgres
|
||||
}
|
||||
|
||||
func (db *postgresDatabase) DoesTorrentExist(infoHash []byte) (bool, error) {
|
||||
rows, err := db.conn.Query("SELECT 1 FROM torrents WHERE info_hash = $1;", infoHash)
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
defer rows.Close()
|
||||
|
||||
exists := rows.Next()
|
||||
if rows.Err() != nil {
|
||||
return false, err
|
||||
}
|
||||
|
||||
return exists, nil
|
||||
}
|
||||
|
||||
func (db *postgresDatabase) AddNewTorrent(infoHash []byte, name string, files []File) error {
|
||||
if !utf8.ValidString(name) {
|
||||
zap.L().Warn(
|
||||
"Ignoring a torrent whose name is not UTF-8 compliant.",
|
||||
zap.ByteString("infoHash", infoHash),
|
||||
zap.Binary("name", []byte(name)),
|
||||
)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
tx, err := db.conn.Begin()
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "conn.Begin")
|
||||
}
|
||||
// If everything goes as planned and no error occurs, we will commit the transaction before
|
||||
// returning from the function so the tx.Rollback() call will fail, trying to rollback a
|
||||
// committed transaction. BUT, if an error occurs, we'll get our transaction rollback'ed, which
|
||||
// is nice.
|
||||
defer tx.Rollback()
|
||||
|
||||
var totalSize uint64 = 0
|
||||
for _, file := range files {
|
||||
totalSize += uint64(file.Size)
|
||||
}
|
||||
|
||||
// This is a workaround for a bug: the database will not accept total_size to be zero.
|
||||
if totalSize == 0 {
|
||||
zap.L().Debug("Ignoring a torrent whose total size is zero.")
|
||||
return nil
|
||||
}
|
||||
|
||||
if exist, err := db.DoesTorrentExist(infoHash); exist || err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
var lastInsertId int64
|
||||
|
||||
err = tx.QueryRow(`
|
||||
INSERT INTO torrents (
|
||||
info_hash,
|
||||
name,
|
||||
total_size,
|
||||
discovered_on
|
||||
) VALUES ($1, $2, $3, $4)
|
||||
RETURNING id;
|
||||
`, infoHash, name, totalSize, time.Now().Unix()).Scan(&lastInsertId)
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "tx.QueryRow (INSERT INTO torrents)")
|
||||
}
|
||||
|
||||
for _, file := range files {
|
||||
if !utf8.ValidString(file.Path) {
|
||||
zap.L().Warn(
|
||||
"Ignoring a file whose path is not UTF-8 compliant.",
|
||||
zap.Binary("path", []byte(file.Path)),
|
||||
)
|
||||
|
||||
// Returning nil so deferred tx.Rollback() will be called and transaction will be canceled.
|
||||
return nil
|
||||
}
|
||||
|
||||
_, err = tx.Exec("INSERT INTO files (torrent_id, size, path) VALUES ($1, $2, $3);",
|
||||
lastInsertId, file.Size, file.Path,
|
||||
)
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "tx.Exec (INSERT INTO files)")
|
||||
}
|
||||
}
|
||||
|
||||
err = tx.Commit()
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "tx.Commit")
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (db *postgresDatabase) Close() error {
|
||||
return db.conn.Close()
|
||||
}
|
||||
|
||||
func (db *postgresDatabase) GetNumberOfTorrents() (uint, error) {
|
||||
// Using estimated number of rows which can make queries much faster
|
||||
// https://www.postgresql.org/message-id/568BF820.9060101%40comarch.com
|
||||
// https://wiki.postgresql.org/wiki/Count_estimate
|
||||
rows, err := db.conn.Query(
|
||||
"SELECT reltuples::BIGINT AS estimate_count FROM pg_class WHERE relname='torrents';",
|
||||
)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
defer rows.Close()
|
||||
|
||||
if !rows.Next() {
|
||||
return 0, fmt.Errorf("no rows returned from `SELECT reltuples::BIGINT AS estimate_count`")
|
||||
}
|
||||
|
||||
// Returns int64: https://godoc.org/github.com/lib/pq#hdr-Data_Types
|
||||
var n *uint
|
||||
if err = rows.Scan(&n); err != nil {
|
||||
return 0, err
|
||||
}
|
||||
|
||||
// If the database is empty (i.e. 0 entries in 'torrents') then the query will return nil.
|
||||
if n == nil {
|
||||
return 0, nil
|
||||
} else {
|
||||
return *n, nil
|
||||
}
|
||||
}
|
||||
|
||||
func (db *postgresDatabase) QueryTorrents(
|
||||
query string,
|
||||
epoch int64,
|
||||
orderBy OrderingCriteria,
|
||||
ascending bool,
|
||||
limit uint,
|
||||
lastOrderedValue *float64,
|
||||
lastID *uint64,
|
||||
) ([]TorrentMetadata, error) {
|
||||
err := fmt.Errorf("QueryTorrents is not supported yet by PostgreSQL backend")
|
||||
|
||||
torrents := make([]TorrentMetadata, 0)
|
||||
|
||||
return torrents, err
|
||||
}
|
||||
|
||||
func (db *postgresDatabase) GetTorrent(infoHash []byte) (*TorrentMetadata, error) {
|
||||
rows, err := db.conn.Query(`
|
||||
SELECT
|
||||
t.info_hash,
|
||||
t.name,
|
||||
t.total_size,
|
||||
t.discovered_on,
|
||||
(SELECT COUNT(*) FROM files f WHERE f.torrent_id = t.id) AS n_files
|
||||
FROM torrents t
|
||||
WHERE t.info_hash = $1;`,
|
||||
infoHash,
|
||||
)
|
||||
defer db.closeRows(rows)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if !rows.Next() {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
var tm TorrentMetadata
|
||||
if err = rows.Scan(&tm.InfoHash, &tm.Name, &tm.Size, &tm.DiscoveredOn, &tm.NFiles); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &tm, nil
|
||||
}
|
||||
|
||||
func (db *postgresDatabase) GetFiles(infoHash []byte) ([]File, error) {
|
||||
rows, err := db.conn.Query(`
|
||||
SELECT
|
||||
f.size,
|
||||
f.path
|
||||
FROM files f, torrents t WHERE f.torrent_id = t.id AND t.info_hash = $1;`,
|
||||
infoHash,
|
||||
)
|
||||
defer db.closeRows(rows)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
var files []File
|
||||
for rows.Next() {
|
||||
var file File
|
||||
if err = rows.Scan(&file.Size, &file.Path); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
files = append(files, file)
|
||||
}
|
||||
|
||||
return files, nil
|
||||
}
|
||||
|
||||
func (db *postgresDatabase) GetStatistics(from string, n uint) (*Statistics, error) {
|
||||
panic("Not implemented yet for PostgreSQL")
|
||||
}
|
||||
|
||||
func (db *postgresDatabase) setupDatabase() error {
|
||||
tx, err := db.conn.Begin()
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "sql.DB.Begin")
|
||||
}
|
||||
|
||||
defer tx.Rollback()
|
||||
|
||||
rows, err := db.conn.Query("SELECT 1 FROM pg_extension WHERE extname = 'pg_trgm';")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer db.closeRows(rows)
|
||||
|
||||
trgmInstalled := rows.Next()
|
||||
if rows.Err() != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if !trgmInstalled {
|
||||
return fmt.Errorf(
|
||||
"pg_trgm extension is not enabled. You need to execute 'CREATE EXTENSION pg_trgm' on this database",
|
||||
)
|
||||
}
|
||||
|
||||
// Initial Setup for schema version 0:
|
||||
// FROZEN.
|
||||
_, err = tx.Exec(`
|
||||
CREATE SCHEMA IF NOT EXISTS ` + db.schema + `;
|
||||
|
||||
-- Torrents ID sequence generator
|
||||
CREATE SEQUENCE IF NOT EXISTS seq_torrents_id;
|
||||
-- Files ID sequence generator
|
||||
CREATE SEQUENCE IF NOT EXISTS seq_files_id;
|
||||
|
||||
CREATE TABLE IF NOT EXISTS torrents (
|
||||
id INTEGER PRIMARY KEY DEFAULT nextval('seq_torrents_id'),
|
||||
info_hash bytea NOT NULL UNIQUE,
|
||||
name TEXT NOT NULL,
|
||||
total_size BIGINT NOT NULL CHECK(total_size > 0),
|
||||
discovered_on INTEGER NOT NULL CHECK(discovered_on > 0)
|
||||
);
|
||||
|
||||
-- Indexes for search sorting options
|
||||
CREATE INDEX IF NOT EXISTS idx_torrents_total_size ON torrents (total_size);
|
||||
CREATE INDEX IF NOT EXISTS idx_torrents_discovered_on ON torrents (discovered_on);
|
||||
|
||||
-- Using pg_trgm GIN index for fast ILIKE queries
|
||||
-- You need to execute "CREATE EXTENSION pg_trgm" on your database for this index to work
|
||||
-- Be aware that using this type of index implies that making ILIKE queries with less that
|
||||
-- 3 character values will cause full table scan instead of using index.
|
||||
-- You can try to avoid that by doing 'SET enable_seqscan=off'.
|
||||
CREATE INDEX IF NOT EXISTS idx_torrents_name_gin_trgm ON torrents USING GIN (name gin_trgm_ops);
|
||||
|
||||
CREATE TABLE IF NOT EXISTS files (
|
||||
id INTEGER PRIMARY KEY DEFAULT nextval('seq_files_id'),
|
||||
torrent_id INTEGER REFERENCES torrents ON DELETE CASCADE ON UPDATE RESTRICT,
|
||||
size BIGINT NOT NULL,
|
||||
path TEXT NOT NULL
|
||||
);
|
||||
|
||||
CREATE INDEX IF NOT EXISTS idx_files_torrent_id ON files (torrent_id);
|
||||
|
||||
CREATE TABLE IF NOT EXISTS migrations (
|
||||
schema_version SMALLINT NOT NULL UNIQUE
|
||||
);
|
||||
|
||||
INSERT INTO migrations (schema_version) VALUES (0) ON CONFLICT DO NOTHING;
|
||||
`)
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "sql.Tx.Exec (v0)")
|
||||
}
|
||||
|
||||
// Get current schema version
|
||||
rows, err = tx.Query("SELECT MAX(schema_version) FROM migrations;")
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "sql.Tx.Query (SELECT MAX(version) FROM migrations)")
|
||||
}
|
||||
defer db.closeRows(rows)
|
||||
|
||||
var schemaVersion int
|
||||
if !rows.Next() {
|
||||
return fmt.Errorf("sql.Rows.Next (SELECT MAX(version) FROM migrations): Query did not return any rows")
|
||||
}
|
||||
if err = rows.Scan(&schemaVersion); err != nil {
|
||||
return errors.Wrap(err, "sql.Rows.Scan (MAX(version))")
|
||||
}
|
||||
// If next line is removed we're getting error on sql.Tx.Commit: unexpected command tag SELECT
|
||||
// https://stackoverflow.com/questions/36295883/golang-postgres-commit-unknown-command-error/36866993#36866993
|
||||
db.closeRows(rows)
|
||||
|
||||
// Uncomment for future migrations:
|
||||
//switch schemaVersion {
|
||||
//case 0: // FROZEN.
|
||||
// zap.L().Warn("Updating (fake) database schema from 0 to 1...")
|
||||
// _, err = tx.Exec(`INSERT INTO migrations (schema_version) VALUES (1);`)
|
||||
// if err != nil {
|
||||
// return errors.Wrap(err, "sql.Tx.Exec (v0 -> v1)")
|
||||
// }
|
||||
// //fallthrough
|
||||
//}
|
||||
|
||||
if err = tx.Commit(); err != nil {
|
||||
return errors.Wrap(err, "sql.Tx.Commit")
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (db *postgresDatabase) closeRows(rows *sql.Rows) {
|
||||
if err := rows.Close(); err != nil {
|
||||
zap.L().Error("could not close row", zap.Error(err))
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue
Block a user