From ce393beb5eae11830f9bf78445522842f3cd45ff Mon Sep 17 00:00:00 2001 From: Alexey Skobkin Date: Sat, 30 Mar 2019 02:51:32 +0300 Subject: [PATCH] Implementing write-only part (magneticod) of the PostgreSQL back-end. --- cmd/magneticod/README.md | 10 +- go.mod | 10 +- pkg/README.md | 25 ++- pkg/persistence/interface.go | 9 +- pkg/persistence/postgres.go | 378 +++++++++++++++++++++++++++++++++++ 5 files changed, 420 insertions(+), 12 deletions(-) create mode 100644 pkg/persistence/postgres.go diff --git a/cmd/magneticod/README.md b/cmd/magneticod/README.md index 141f010..c423d40 100644 --- a/cmd/magneticod/README.md +++ b/cmd/magneticod/README.md @@ -36,7 +36,11 @@ the repository `boramalper/magnetico`. Images are tagged as `d-vMAJOR.MINOR.PATC ## Usage ### Database **magneticod** is designed to be able to use different database engines to store its data, but -currently only SQLite 3 is supported. The database file can be found in: +currently only SQLite 3 and PostgreSQL 9+ are supported. + +#### SQLite + +The database file can be found in: - **On Linux** @@ -45,6 +49,10 @@ currently only SQLite 3 is supported. The database file can be found in: **magneticod** uses write-ahead logging (WAL) for its database, so there might be multiple files while it is operating, but ``database.sqlite3`` is *the database*. +#### More engines (PostgreSQL and others) + +You can read about other supported persistence engines [here](pkg/README.md). + ### Using the Docker Image You need to mount diff --git a/go.mod b/go.mod index 7385d15..df5a46d 100644 --- a/go.mod +++ b/go.mod @@ -11,7 +11,7 @@ require ( github.com/google/pprof v0.0.0-20190515194954-54271f7e092f // indirect github.com/gorilla/mux v1.7.4 github.com/gorilla/schema v1.1.0 - github.com/ianlancetaylor/demangle v0.0.0-20181102032728-5e5cf60278f6 // indirect + github.com/jackc/pgx/v4 v4.9.2 github.com/jessevdk/go-flags v1.4.0 github.com/kevinburke/go-bindata v3.16.0+incompatible // indirect github.com/libp2p/go-sockaddr v0.0.1 @@ -19,14 +19,12 @@ require ( github.com/pkg/errors v0.9.1 github.com/pkg/profile v1.4.0 github.com/willf/bloom v2.0.3+incompatible - go.uber.org/atomic v1.5.1 // indirect - go.uber.org/multierr v1.4.0 // indirect go.uber.org/zap v1.14.0 - golang.org/x/crypto v0.0.0-20200221231518-2aa609cf4a9d + golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9 golang.org/x/lint v0.0.0-20200130185559-910be7a94367 // indirect golang.org/x/net v0.0.0-20200222125558-5a598a2470a0 // indirect - golang.org/x/sys v0.0.0-20200219091948-cb0a6d8edb6c - golang.org/x/text v0.3.2 + golang.org/x/sys v0.0.0-20200223170610-d5e6a3e2c0ae + golang.org/x/text v0.3.3 golang.org/x/tools v0.0.0-20200221224223-e1da425f72fd // indirect honnef.co/go/tools v0.0.1-2020.1.3 // indirect ) diff --git a/pkg/README.md b/pkg/README.md index 30ea5ee..8746f4b 100644 --- a/pkg/README.md +++ b/pkg/README.md @@ -2,10 +2,33 @@ [![GoDoc](https://godoc.org/github.com/boramalper/magnetico?status.svg)](https://godoc.org/github.com/boramalper/magnetico) - The most significant package is `persistence`, that abstracts access to the - magnetico databases with different engines (currently, only SQLite). + magnetico databases with different engines (currently, SQLite, stdout and partly PostgreSQL). **For REST-ful magneticow API, see [https://app.swaggerhub.com/apis/boramalper/magneticow-api/](https://app.swaggerhub.com/apis/boramalper/magneticow-api/).** +## PostgreSQL database engine (only `magneticod` part implemented) + +PostgreSQL database engine uses [PostgreSQL](https://www.postgresql.org/) to store indexed +torrents. It's more performant and flexible than SQLite but requires additional software configuration. + +**WARNING:** `pg_trgm` extension required. You need to enable it for your database before starting `magneticod`: + +```postgresql +CREATE EXTENSION pg_trgm; +``` + +Engine usage example: + +```shell +magneticod --database=postgres://username:password@127.0.0.1:5432/database?schema=custom_schema_name&sslmode=disable +``` + +See [pgx repository](https://github.com/jackc/pgx/blob/master/stdlib/sql.go) +for more examples. + +Optional parameter `schema` was added to choose which schema will be used to store magnetico tables, +sequences and indexes. + ## Stdout Dummy Database Engine for magneticod Stdout dummy database engine for **magneticod** prints a new [JSON Line](http://jsonlines.org/) diff --git a/pkg/persistence/interface.go b/pkg/persistence/interface.go index 1225ce1..26fd623 100644 --- a/pkg/persistence/interface.go +++ b/pkg/persistence/interface.go @@ -59,7 +59,8 @@ const ( type databaseEngine uint8 const ( - Sqlite3 databaseEngine = 1 + Sqlite3 databaseEngine = 1 + Postgres databaseEngine = 2 Stdout ) @@ -113,12 +114,12 @@ func MakeDatabase(rawURL string, logger *zap.Logger) (Database, error) { case "sqlite3": return makeSqlite3Database(url_) + case "postgres": + return makePostgresDatabase(url_) + case "stdout": return makeStdoutDatabase(url_) - case "postgresql": - return nil, fmt.Errorf("postgresql is not yet supported") - case "mysql": return nil, fmt.Errorf("mysql is not yet supported") diff --git a/pkg/persistence/postgres.go b/pkg/persistence/postgres.go new file mode 100644 index 0000000..edfdd71 --- /dev/null +++ b/pkg/persistence/postgres.go @@ -0,0 +1,378 @@ +package persistence + +import ( + "database/sql" + "fmt" + "net/url" + "time" + "unicode/utf8" + + _ "github.com/jackc/pgx/v4" + _ "github.com/jackc/pgx/v4/stdlib" + + "github.com/pkg/errors" + "go.uber.org/zap" +) + +type postgresDatabase struct { + conn *sql.DB + schema string +} + +func makePostgresDatabase(url_ *url.URL) (Database, error) { + db := new(postgresDatabase) + + schema := url_.Query().Get("schema") + if schema == "" { + db.schema = "magneticod" + url_.Query().Set("search_path", "magneticod") + } else { + db.schema = schema + url_.Query().Set("search_path", schema) + } + url_.Query().Del("schema") + + var err error + db.conn, err = sql.Open("pgx", url_.String()) + if err != nil { + return nil, errors.Wrap(err, "sql.Open") + } + + // > Open may just validate its arguments without creating a connection to the database. To + // > verify that the data source Name is valid, call Ping. + // https://golang.org/pkg/database/sql/#Open + if err = db.conn.Ping(); err != nil { + return nil, errors.Wrap(err, "sql.DB.Ping") + } + + // https://github.com/mattn/go-sqlite3/issues/618 + db.conn.SetConnMaxLifetime(0) // https://golang.org/pkg/database/sql/#DB.SetConnMaxLifetime + db.conn.SetMaxOpenConns(3) + db.conn.SetMaxIdleConns(3) + + if err := db.setupDatabase(); err != nil { + return nil, errors.Wrap(err, "setupDatabase") + } + + return db, nil +} + +func (db *postgresDatabase) Engine() databaseEngine { + return Postgres +} + +func (db *postgresDatabase) DoesTorrentExist(infoHash []byte) (bool, error) { + rows, err := db.conn.Query("SELECT 1 FROM torrents WHERE info_hash = $1;", infoHash) + if err != nil { + return false, err + } + defer rows.Close() + + exists := rows.Next() + if rows.Err() != nil { + return false, err + } + + return exists, nil +} + +func (db *postgresDatabase) AddNewTorrent(infoHash []byte, name string, files []File) error { + if !utf8.ValidString(name) { + zap.L().Warn( + "Ignoring a torrent whose name is not UTF-8 compliant.", + zap.ByteString("infoHash", infoHash), + zap.Binary("name", []byte(name)), + ) + + return nil + } + + tx, err := db.conn.Begin() + if err != nil { + return errors.Wrap(err, "conn.Begin") + } + // If everything goes as planned and no error occurs, we will commit the transaction before + // returning from the function so the tx.Rollback() call will fail, trying to rollback a + // committed transaction. BUT, if an error occurs, we'll get our transaction rollback'ed, which + // is nice. + defer tx.Rollback() + + var totalSize uint64 = 0 + for _, file := range files { + totalSize += uint64(file.Size) + } + + // This is a workaround for a bug: the database will not accept total_size to be zero. + if totalSize == 0 { + zap.L().Debug("Ignoring a torrent whose total size is zero.") + return nil + } + + if exist, err := db.DoesTorrentExist(infoHash); exist || err != nil { + return err + } + + var lastInsertId int64 + + err = tx.QueryRow(` + INSERT INTO torrents ( + info_hash, + name, + total_size, + discovered_on + ) VALUES ($1, $2, $3, $4) + RETURNING id; + `, infoHash, name, totalSize, time.Now().Unix()).Scan(&lastInsertId) + if err != nil { + return errors.Wrap(err, "tx.QueryRow (INSERT INTO torrents)") + } + + for _, file := range files { + if !utf8.ValidString(file.Path) { + zap.L().Warn( + "Ignoring a file whose path is not UTF-8 compliant.", + zap.Binary("path", []byte(file.Path)), + ) + + // Returning nil so deferred tx.Rollback() will be called and transaction will be canceled. + return nil + } + + _, err = tx.Exec("INSERT INTO files (torrent_id, size, path) VALUES ($1, $2, $3);", + lastInsertId, file.Size, file.Path, + ) + if err != nil { + return errors.Wrap(err, "tx.Exec (INSERT INTO files)") + } + } + + err = tx.Commit() + if err != nil { + return errors.Wrap(err, "tx.Commit") + } + + return nil +} + +func (db *postgresDatabase) Close() error { + return db.conn.Close() +} + +func (db *postgresDatabase) GetNumberOfTorrents() (uint, error) { + // Using estimated number of rows which can make queries much faster + // https://www.postgresql.org/message-id/568BF820.9060101%40comarch.com + // https://wiki.postgresql.org/wiki/Count_estimate + rows, err := db.conn.Query( + "SELECT reltuples::BIGINT AS estimate_count FROM pg_class WHERE relname='torrents';", + ) + if err != nil { + return 0, err + } + defer rows.Close() + + if !rows.Next() { + return 0, fmt.Errorf("no rows returned from `SELECT reltuples::BIGINT AS estimate_count`") + } + + // Returns int64: https://godoc.org/github.com/lib/pq#hdr-Data_Types + var n *uint + if err = rows.Scan(&n); err != nil { + return 0, err + } + + // If the database is empty (i.e. 0 entries in 'torrents') then the query will return nil. + if n == nil { + return 0, nil + } else { + return *n, nil + } +} + +func (db *postgresDatabase) QueryTorrents( + query string, + epoch int64, + orderBy OrderingCriteria, + ascending bool, + limit uint, + lastOrderedValue *float64, + lastID *uint64, +) ([]TorrentMetadata, error) { + err := fmt.Errorf("QueryTorrents is not supported yet by PostgreSQL backend") + + torrents := make([]TorrentMetadata, 0) + + return torrents, err +} + +func (db *postgresDatabase) GetTorrent(infoHash []byte) (*TorrentMetadata, error) { + rows, err := db.conn.Query(` + SELECT + t.info_hash, + t.name, + t.total_size, + t.discovered_on, + (SELECT COUNT(*) FROM files f WHERE f.torrent_id = t.id) AS n_files + FROM torrents t + WHERE t.info_hash = $1;`, + infoHash, + ) + defer db.closeRows(rows) + if err != nil { + return nil, err + } + + if !rows.Next() { + return nil, nil + } + + var tm TorrentMetadata + if err = rows.Scan(&tm.InfoHash, &tm.Name, &tm.Size, &tm.DiscoveredOn, &tm.NFiles); err != nil { + return nil, err + } + + return &tm, nil +} + +func (db *postgresDatabase) GetFiles(infoHash []byte) ([]File, error) { + rows, err := db.conn.Query(` + SELECT + f.size, + f.path + FROM files f, torrents t WHERE f.torrent_id = t.id AND t.info_hash = $1;`, + infoHash, + ) + defer db.closeRows(rows) + if err != nil { + return nil, err + } + + var files []File + for rows.Next() { + var file File + if err = rows.Scan(&file.Size, &file.Path); err != nil { + return nil, err + } + files = append(files, file) + } + + return files, nil +} + +func (db *postgresDatabase) GetStatistics(from string, n uint) (*Statistics, error) { + panic("Not implemented yet for PostgreSQL") +} + +func (db *postgresDatabase) setupDatabase() error { + tx, err := db.conn.Begin() + if err != nil { + return errors.Wrap(err, "sql.DB.Begin") + } + + defer tx.Rollback() + + rows, err := db.conn.Query("SELECT 1 FROM pg_extension WHERE extname = 'pg_trgm';") + if err != nil { + return err + } + defer db.closeRows(rows) + + trgmInstalled := rows.Next() + if rows.Err() != nil { + return err + } + + if !trgmInstalled { + return fmt.Errorf( + "pg_trgm extension is not enabled. You need to execute 'CREATE EXTENSION pg_trgm' on this database", + ) + } + + // Initial Setup for schema version 0: + // FROZEN. + _, err = tx.Exec(` + CREATE SCHEMA IF NOT EXISTS ` + db.schema + `; + + -- Torrents ID sequence generator + CREATE SEQUENCE IF NOT EXISTS seq_torrents_id; + -- Files ID sequence generator + CREATE SEQUENCE IF NOT EXISTS seq_files_id; + + CREATE TABLE IF NOT EXISTS torrents ( + id INTEGER PRIMARY KEY DEFAULT nextval('seq_torrents_id'), + info_hash bytea NOT NULL UNIQUE, + name TEXT NOT NULL, + total_size BIGINT NOT NULL CHECK(total_size > 0), + discovered_on INTEGER NOT NULL CHECK(discovered_on > 0) + ); + + -- Indexes for search sorting options + CREATE INDEX IF NOT EXISTS idx_torrents_total_size ON torrents (total_size); + CREATE INDEX IF NOT EXISTS idx_torrents_discovered_on ON torrents (discovered_on); + + -- Using pg_trgm GIN index for fast ILIKE queries + -- You need to execute "CREATE EXTENSION pg_trgm" on your database for this index to work + -- Be aware that using this type of index implies that making ILIKE queries with less that + -- 3 character values will cause full table scan instead of using index. + -- You can try to avoid that by doing 'SET enable_seqscan=off'. + CREATE INDEX IF NOT EXISTS idx_torrents_name_gin_trgm ON torrents USING GIN (name gin_trgm_ops); + + CREATE TABLE IF NOT EXISTS files ( + id INTEGER PRIMARY KEY DEFAULT nextval('seq_files_id'), + torrent_id INTEGER REFERENCES torrents ON DELETE CASCADE ON UPDATE RESTRICT, + size BIGINT NOT NULL, + path TEXT NOT NULL + ); + + CREATE INDEX IF NOT EXISTS idx_files_torrent_id ON files (torrent_id); + + CREATE TABLE IF NOT EXISTS migrations ( + schema_version SMALLINT NOT NULL UNIQUE + ); + + INSERT INTO migrations (schema_version) VALUES (0) ON CONFLICT DO NOTHING; + `) + if err != nil { + return errors.Wrap(err, "sql.Tx.Exec (v0)") + } + + // Get current schema version + rows, err = tx.Query("SELECT MAX(schema_version) FROM migrations;") + if err != nil { + return errors.Wrap(err, "sql.Tx.Query (SELECT MAX(version) FROM migrations)") + } + defer db.closeRows(rows) + + var schemaVersion int + if !rows.Next() { + return fmt.Errorf("sql.Rows.Next (SELECT MAX(version) FROM migrations): Query did not return any rows") + } + if err = rows.Scan(&schemaVersion); err != nil { + return errors.Wrap(err, "sql.Rows.Scan (MAX(version))") + } + // If next line is removed we're getting error on sql.Tx.Commit: unexpected command tag SELECT + // https://stackoverflow.com/questions/36295883/golang-postgres-commit-unknown-command-error/36866993#36866993 + db.closeRows(rows) + + // Uncomment for future migrations: + //switch schemaVersion { + //case 0: // FROZEN. + // zap.L().Warn("Updating (fake) database schema from 0 to 1...") + // _, err = tx.Exec(`INSERT INTO migrations (schema_version) VALUES (1);`) + // if err != nil { + // return errors.Wrap(err, "sql.Tx.Exec (v0 -> v1)") + // } + // //fallthrough + //} + + if err = tx.Commit(); err != nil { + return errors.Wrap(err, "sql.Tx.Commit") + } + + return nil +} + +func (db *postgresDatabase) closeRows(rows *sql.Rows) { + if err := rows.Close(); err != nil { + zap.L().Error("could not close row", zap.Error(err)) + } +}