QueryTorrents() of sqlite3 is complete! (testing needed)

Bora Alper 2018-04-21 10:05:12 +01:00
parent f8b489f4a0
commit ac7d0a514f
4 changed files with 418 additions and 46 deletions


@@ -3,7 +3,6 @@ package persistence
 import (
     "fmt"
     "net/url"
-    "time"

     "go.uber.org/zap"
 )
@@ -22,12 +21,20 @@ type Database interface {
     // * that match the @query if it's not empty, else all torrents
     // * ordered by the @orderBy in ascending order if @ascending is true, else in descending order
     // after skipping (@page * @pageSize) torrents that also fits the criteria above.
-    QueryTorrents(query string, discoveredOnBefore int64, orderBy orderingCriteria, ascending bool, page uint, pageSize uint) ([]TorrentMetadata, error)
+    QueryTorrents(
+        query string,
+        epoch int64,
+        orderBy orderingCriteria,
+        ascending bool,
+        limit uint,
+        lastOrderedValue *uint,
+        lastID *uint,
+    ) ([]TorrentMetadata, error)
     // GetTorrents returns the TorrentExtMetadata for the torrent of the given InfoHash. Will return
     // nil, nil if the torrent does not exist in the database.
     GetTorrent(infoHash []byte) (*TorrentMetadata, error)
     GetFiles(infoHash []byte) ([]File, error)
-    GetStatistics(n uint, granularity Granularity, to time.Time) (*Statistics, error)
+    GetStatistics(n uint, to string) (*Statistics, error)
 }

 type orderingCriteria uint8
@@ -36,15 +43,8 @@ const (
     BySize
     ByDiscoveredOn
     ByNFiles
+    ByNSeeders
+    ByNLeechers
 )
-
-type Granularity uint8
-
-const (
-    Yearly Granularity = iota
-    Monthly
-    Weekly
-    Daily
-    Hourly
-)

 type databaseEngine uint8
@@ -53,11 +53,12 @@
 )

 type Statistics struct {
-    N uint
+    N uint64

     // All these slices below have the exact length equal to the Period.
-    NTorrentsDiscovered []uint
-    NFilesDiscovered    []uint
+    NTorrents []uint64
+    NFiles    []uint64
+    TotalSize []uint64
 }

 type File struct {
@@ -68,7 +69,7 @@ type File struct {
 type TorrentMetadata struct {
     InfoHash     []byte
     Name         string
-    TotalSize    uint64
+    Size         uint64
     DiscoveredOn int64
     NFiles       uint
 }
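
Not part of this commit, but for illustration: the new QueryTorrents signature replaces page/pageSize with keyset ("seek") pagination, so a caller passes the ordered value and the row id of the last row of the previous page instead of a page number. A minimal sketch under that assumption; the function name and the cursor values are hypothetical, and TorrentMetadata itself does not expose the row id, so a caller has to track it separately:

// Hypothetical sketch of paging through QueryTorrents with the keyset cursor.
func listRecentTorrents(db Database, epoch int64) error {
    // First page: no cursor, so lastOrderedValue and lastID are both nil.
    firstPage, err := db.QueryTorrents("ubuntu", epoch, ByDiscoveredOn, false, 20, nil, nil)
    if err != nil {
        return err
    }
    _ = firstPage

    // Next page: pass the ordered value and the row id of the last row of the previous page.
    // The values below are made up for the sake of the example.
    lastOrderedValue, lastID := uint(1524300000), uint(4242)
    secondPage, err := db.QueryTorrents("ubuntu", epoch, ByDiscoveredOn, false, 20, &lastOrderedValue, &lastID)
    if err != nil {
        return err
    }
    _ = secondPage
    return nil
}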

pkg/persistence/iso8601.go (new file)

@@ -0,0 +1,184 @@
package persistence

import (
    "fmt"
    "regexp"
    "strconv"
    "time"
)

var yearRE = regexp.MustCompile(`^(\d{4})$`)
var monthRE = regexp.MustCompile(`^(\d{4})-(\d{2})$`)
var weekRE = regexp.MustCompile(`^(\d{4})-W(\d{2})$`)
var dayRE = regexp.MustCompile(`^(\d{4})-(\d{2})-(\d{2})$`)
var hourRE = regexp.MustCompile(`^(\d{4})-(\d{2})-(\d{2})T(\d{2})$`)

type Granularity int

const (
    Year Granularity = iota
    Month
    Week
    Day
    Hour
)

// ParseISO8601 is **not** a function that can parse each and every valid ISO 8601
// date, nor is it intended to be, since we don't need that.
func ParseISO8601(s string) (*time.Time, Granularity, error) {
    if matches := yearRE.FindStringSubmatch(s); len(matches) != 0 {
        year, err := parseYear(matches[1])
        if err != nil {
            return nil, -1, err
        }
        t := time.Date(year, time.December, daysOfMonth(time.December, year), 23, 59, 59, 0, time.UTC)
        return &t, Year, nil
    }

    if matches := monthRE.FindStringSubmatch(s); len(matches) != 0 {
        year, err := parseYear(matches[1])
        if err != nil {
            return nil, -1, err
        }
        month, err := parseMonth(matches[2])
        if err != nil {
            return nil, -1, err
        }
        t := time.Date(year, month, daysOfMonth(month, year), 23, 59, 59, 0, time.UTC)
        return &t, Month, nil
    }

    if matches := weekRE.FindStringSubmatch(s); len(matches) != 0 {
        year, err := parseYear(matches[1])
        if err != nil {
            return nil, -1, err
        }
        week, err := parseWeek(matches[2])
        if err != nil {
            return nil, -1, err
        }
        // Approximation: the end of the week-th 7-day block of the year, which can be off
        // from the exact ISO week by a few days depending on the weekday of January 1st.
        t := time.Date(year, time.January, week*7, 23, 59, 59, 0, time.UTC)
        return &t, Week, nil
    }

    if matches := dayRE.FindStringSubmatch(s); len(matches) != 0 {
        year, err := parseYear(matches[1])
        if err != nil {
            return nil, -1, err
        }
        month, err := parseMonth(matches[2])
        if err != nil {
            return nil, -1, err
        }
        day, err := parseDay(matches[3], daysOfMonth(month, year))
        if err != nil {
            return nil, -1, err
        }
        t := time.Date(year, month, day, 23, 59, 59, 0, time.UTC)
        return &t, Day, nil
    }

    if matches := hourRE.FindStringSubmatch(s); len(matches) != 0 {
        year, err := parseYear(matches[1])
        if err != nil {
            return nil, -1, err
        }
        month, err := parseMonth(matches[2])
        if err != nil {
            return nil, -1, err
        }
        day, err := parseDay(matches[3], daysOfMonth(month, year))
        if err != nil {
            return nil, -1, err
        }
        hour, err := parseHour(matches[4])
        if err != nil {
            return nil, -1, err
        }
        t := time.Date(year, month, day, hour, 59, 59, 0, time.UTC)
        return &t, Hour, nil
    }

    return nil, -1, fmt.Errorf("string does not match any of the supported formats")
}

func daysOfMonth(month time.Month, year int) int {
    switch month {
    case time.January:
        return 31
    case time.February:
        if isLeap(year) {
            return 29
        } else {
            return 28
        }
    case time.March:
        return 31
    case time.April:
        return 30
    case time.May:
        return 31
    case time.June:
        return 30
    case time.July:
        return 31
    case time.August:
        return 31
    case time.September:
        return 30
    case time.October:
        return 31
    case time.November:
        return 30
    case time.December:
        return 31
    default:
        panic("invalid month!")
    }
}

func isLeap(year int) bool {
    if year%4 != 0 {
        return false
    } else if year%100 != 0 {
        return true
    } else if year%400 != 0 {
        return false
    } else {
        return true
    }
}

func atoi(s string) int {
    i, e := strconv.Atoi(s)
    if e != nil {
        // panic on error, since atoi() is called only on strings that the regexes above
        // have already matched as digits
        panic(e.Error())
    }
    return i
}

func parseYear(s string) (int, error) {
    year := atoi(s)
    if year < 1583 {
        return 0, fmt.Errorf("years before 1583 are not allowed")
    }
    return year, nil
}

func parseMonth(s string) (time.Month, error) {
    month := atoi(s)
    if month <= 0 || month >= 13 {
        return time.Month(-1), fmt.Errorf("month is not in range [01, 12]")
    }
    return time.Month(month), nil
}

func parseWeek(s string) (int, error) {
    week := atoi(s)
    if week <= 0 || week >= 54 {
        return -1, fmt.Errorf("week is not in range [01, 53]")
    }
    return week, nil
}

func parseDay(s string, max int) (int, error) {
    day := atoi(s)
    if day <= 0 || day > max {
        return -1, fmt.Errorf("day is not in range [01, %d]", max)
    }
    return day, nil
}

func parseHour(s string) (int, error) {
    hour := atoi(s)
    if hour <= -1 || hour >= 25 {
        return -1, fmt.Errorf("hour is not in range [00, 24]")
    }
    return hour, nil
}
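
A quick usage sketch of ParseISO8601 (illustrative only, not part of this commit); the function returns the end of the requested period together with its granularity:

func exampleParseISO8601() { // hypothetical
    t, granularity, err := ParseISO8601("2018-04-20")
    if err != nil {
        panic(err)
    }
    // Prints the end of the requested day: 2018-04-20 23:59:59 +0000 UTC true
    fmt.Println(*t, granularity == Day)
}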

@@ -0,0 +1,45 @@
package persistence

import "testing"

var validDates = []struct {
    date        string
    granularity Granularity
}{
    {
        "2018",
        Year,
    },
    {
        "2018-04",
        Month,
    },
    {
        "2018-W16",
        Week,
    },
    {
        "2018-04-20",
        Day,
    },
    {
        "2018-04-20T15",
        Hour,
    },
}

func TestParseISO8601(t *testing.T) {
    for i, date := range validDates {
        _, gr, err := ParseISO8601(date.date)
        if err != nil {
            t.Errorf("Error while parsing valid date #%d: %v", i+1, err)
            continue
        }
        if gr != date.granularity {
            t.Errorf("Granularity of the date #%d is wrong! Got %d (expected %d)",
                i+1, gr, date.granularity)
            continue
        }
    }
}
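
The table above only covers valid inputs. A hypothetical companion test (not part of this commit) could exercise the error paths in the same table-driven style; each string below is rejected either by the regexes or by one of the parse helpers:

var invalidDates = []string{
    "18",            // matches no supported format
    "2018-13",       // month out of range
    "2018-W54",      // week out of range
    "2018-02-30",    // day out of range for February
    "2018-04-20T25", // hour out of range
}

func TestParseISO8601Invalid(t *testing.T) {
    for i, s := range invalidDates {
        if _, _, err := ParseISO8601(s); err == nil {
            t.Errorf("expected an error for invalid date #%d (%q), got none", i+1, s)
        }
    }
}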


@@ -1,8 +1,10 @@
 package persistence

 import (
+    "bytes"
     "database/sql"
     "fmt"
+    "text/template"
     "net/url"
     "os"
     "path"
@@ -79,9 +81,9 @@ func (db *sqlite3Database) AddNewTorrent(infoHash []byte, name string, files []F
     // is nice.
     defer tx.Rollback()

-    var totalSize int64 = 0
+    var totalSize uint64 = 0
     for _, file := range files {
-        totalSize += file.Size
+        totalSize += uint64(file.Size)
     }

     // This is a workaround for a bug: the database will not accept total_size to be zero.
@@ -114,7 +116,7 @@ func (db *sqlite3Database) AddNewTorrent(infoHash []byte, name string, files []F
     }

     for _, file := range files {
-        _, err = tx.Exec("INSERT INTO files (torrent_id, Size, path) VALUES (?, ?, ?);",
+        _, err = tx.Exec("INSERT INTO files (torrent_id, size, path) VALUES (?, ?, ?);",
             lastInsertId, file.Size, file.Path,
         )
         if err != nil {
@@ -130,7 +132,6 @@ func (db *sqlite3Database) AddNewTorrent(infoHash []byte, name string, files []F
     return nil
 }

-
 func (db *sqlite3Database) Close() error {
     return db.conn.Close()
 }
@@ -144,7 +145,7 @@ func (db *sqlite3Database) GetNumberOfTorrents() (uint, error) {
     }

     if rows.Next() != true {
-        fmt.Errorf("No rows returned from `SELECT MAX(ROWID)`!")
+        fmt.Errorf("No rows returned from `SELECT MAX(ROWID)`")
     }

     var n uint
@@ -159,26 +160,137 @@ func (db *sqlite3Database) GetNumberOfTorrents() (uint, error) {
     return n, nil
 }

-func (db *sqlite3Database) QueryTorrents(query string, discoveredOnBefore int64, orderBy orderingCriteria, ascending bool, page uint, pageSize uint) ([]TorrentMetadata, error) {
+func (db *sqlite3Database) QueryTorrents(
+    query string,
+    epoch int64,
+    orderBy orderingCriteria,
+    ascending bool,
+    limit uint,
+    lastOrderedValue *uint,
+    lastID *uint,
+) ([]TorrentMetadata, error) {
     if query == "" && orderBy == ByRelevance {
-        return nil, fmt.Errorf("torrents cannot be ordered by \"relevance\" when the query is empty")
+        return nil, fmt.Errorf("torrents cannot be ordered by relevance when the query is empty")
     }
-
-    // TODO
-    return nil, nil
+    if (lastOrderedValue == nil) != (lastID == nil) {
+        return nil, fmt.Errorf("lastOrderedValue and lastID should be supplied together, if supplied")
+    }
+
+    doJoin := query != ""
+    firstPage := lastID == nil
+
+    // executeTemplate is used to prepare the SQL query, WITH PLACEHOLDERS FOR USER INPUT.
+    sqlQuery := executeTemplate(`
+        SELECT info_hash
+             , name
+             , total_size
+             , discovered_on
+             , (SELECT COUNT(*) FROM files WHERE torrents.id = files.torrent_id) AS n_files
+        FROM torrents
+        {{ if .DoJoin }}
+        INNER JOIN (
+            SELECT rowid AS id
+                 , bm25(torrents_idx) AS rank
+            FROM torrents_idx
+            WHERE torrents_idx MATCH ?
+        ) AS idx USING(id)
+        {{ end }}
+        WHERE modified_on <= ?
+        {{ if not .FirstPage }}
+        AND id > ?
+        AND {{ .OrderOn }} {{ GTEorLTE .Ascending }} ?
+        {{ end }}
+        ORDER BY {{ .OrderOn }} {{ AscOrDesc .Ascending }}, id ASC
+        LIMIT ?;
+    `, struct {
+        DoJoin    bool
+        FirstPage bool
+        OrderOn   string
+        Ascending bool
+    }{
+        DoJoin:    doJoin,    // if there is a query, do the JOIN against the FTS index
+        FirstPage: firstPage, // firstPage (i.e. lastID == nil) implies lastOrderedValue == nil as well, checked above
+        OrderOn:   orderOn(orderBy),
+        Ascending: ascending,
+    }, template.FuncMap{
+        "GTEorLTE": func(ascending bool) string {
+            // The comparison operator that continues past the last ordered value of the
+            // previous page. TODO: ties on the ordered value still need the id tie-breaker.
+            if ascending {
+                return ">"
+            } else {
+                return "<"
+            }
+        },
+        "AscOrDesc": func(ascending bool) string {
+            if ascending {
+                return "ASC"
+            } else {
+                return "DESC"
+            }
+        },
+    })
+
+    // Prepare the query arguments, in the same order as the placeholders above.
+    queryArgs := make([]interface{}, 0)
+    if doJoin {
+        queryArgs = append(queryArgs, query)
+    }
+    queryArgs = append(queryArgs, epoch)
+    if !firstPage {
+        queryArgs = append(queryArgs, lastID)
+        queryArgs = append(queryArgs, lastOrderedValue)
+    }
+    queryArgs = append(queryArgs, limit)
+
+    rows, err := db.conn.Query(sqlQuery, queryArgs...)
+    if err != nil {
+        return nil, fmt.Errorf("error while querying torrents: %s", err.Error())
+    }
+
+    var torrents []TorrentMetadata
+    for rows.Next() {
+        var torrent TorrentMetadata
+        if err = rows.Scan(&torrent.InfoHash, &torrent.Name, &torrent.Size, &torrent.DiscoveredOn, &torrent.NFiles); err != nil {
+            return nil, err
+        }
+        torrents = append(torrents, torrent)
+    }
+
+    if err := rows.Close(); err != nil {
+        return nil, err
+    }
+
+    return torrents, nil
 }
+
+func orderOn(orderBy orderingCriteria) string {
+    switch orderBy {
+    case ByRelevance:
+        return "idx.rank"
+    case BySize:
+        return "total_size"
+    case ByDiscoveredOn:
+        return "discovered_on"
+    case ByNFiles:
+        return "n_files"
+    default:
+        panic(fmt.Sprintf("unknown orderBy: %v", orderBy))
+    }
+}

 func (db *sqlite3Database) GetTorrent(infoHash []byte) (*TorrentMetadata, error) {
-    rows, err := db.conn.Query(
-        `SELECT
+    rows, err := db.conn.Query(`
+        SELECT
             info_hash,
             name,
             total_size,
             discovered_on,
-            (SELECT COUNT(1) FROM files WHERE torrent_id = torrents.id) AS n_files
+            (SELECT COUNT(*) FROM files WHERE torrent_id = torrents.id) AS n_files
         FROM torrents
         WHERE info_hash = ?`,
         infoHash,
@@ -188,12 +300,14 @@ func (db *sqlite3Database) GetTorrent(infoHash []byte) (*TorrentMetadata, error)
     }

     if rows.Next() != true {
+        zap.L().Warn("torrent not found")
         return nil, nil
     }

     var tm TorrentMetadata
-    rows.Scan(&tm.InfoHash, &tm.Name, &tm.Size, &tm.DiscoveredOn, &tm.NFiles)
+    if err = rows.Scan(&tm.InfoHash, &tm.Name, &tm.Size, &tm.DiscoveredOn, &tm.NFiles); err != nil {
+        return nil, err
+    }

     if err = rows.Close(); err != nil {
         return nil, err
     }
@@ -210,21 +324,28 @@ func (db *sqlite3Database) GetFiles(infoHash []byte) ([]File, error) {
     var files []File
     for rows.Next() {
         var file File
-        rows.Scan(&file.Size, &file.Path)
+        if err = rows.Scan(&file.Size, &file.Path); err != nil {
+            return nil, err
+        }
         files = append(files, file)
     }

+    if err := rows.Close(); err != nil {
+        return nil, err
+    }
+
     return files, nil
 }

-func (db *sqlite3Database) GetStatistics(from ISO8601, period uint) (*Statistics, error) {
+func (db *sqlite3Database) GetStatistics(n uint, to string) (*Statistics, error) {
+    toTime, granularity, err := ParseISO8601(to)
+    if err != nil {
+        return nil, fmt.Errorf("parsing @to error: %s", err.Error())
+    }
+    _, _ = toTime, granularity // TODO: use these once the statistics query below is implemented
+
     // TODO
     return nil, nil
 }
-
-func (db *sqlite3Database) commitQueuedTorrents() error {
-    // TODO
-    return nil
-}

 func (db *sqlite3Database) setupDatabase() error {
@@ -263,8 +384,10 @@ func (db *sqlite3Database) setupDatabase() error {
     // is nice.
     defer tx.Rollback()

-    // Essential, and valid for all user_version`s:
-    // TODO: "torrent_id" column of the "files" table can be NULL, how can we fix this in a new schema?
+    // Initial Setup for `user_version` 0:
+    // FROZEN.
+    // TODO: "torrent_id" column of the "files" table can be NULL, how can we fix this in a new
+    //       version schema?
     _, err = tx.Exec(`
         CREATE TABLE IF NOT EXISTS torrents (
             id INTEGER PRIMARY KEY,
@@ -303,13 +426,13 @@
     }

     switch userVersion {
-    case 0:
+    case 0: // FROZEN.
         // Upgrade from user_version 0 to 1
         // Changes:
         //   * `info_hash_index` is recreated as UNIQUE.
         zap.L().Warn("Updating database schema from 0 to 1... (this might take a while)")
         _, err = tx.Exec(`
-            DROP INDEX info_hash_index;
+            DROP INDEX IF EXISTS info_hash_index;
             CREATE UNIQUE INDEX info_hash_index ON torrents (info_hash);
             PRAGMA user_version = 1;
         `)
@@ -318,7 +441,7 @@
         }
         fallthrough

-    case 1:
+    case 1: // FROZEN.
         // Upgrade from user_version 1 to 2
         // Changes:
         //   * Added `n_seeders`, `n_leechers`, and `updated_on` columns to the `torrents` table, and
@@ -363,13 +486,16 @@
         }
         fallthrough

-    case 2:
+    case 2: // NOT FROZEN! (subject to change or complete removal)
         // Upgrade from user_version 2 to 3
         // Changes:
         //   * Created `torrents_idx` FTS5 virtual table.
+        //
         //     See:
         //       * https://sqlite.org/fts5.html
         //       * https://sqlite.org/fts3.html
+        //
+        //   * Added the `modified_on` column to the `torrents` table.
         zap.L().Warn("Updating database schema from 2 to 3... (this might take a while)")
         tx.Exec(`
            CREATE VIRTUAL TABLE torrents_idx USING fts5(name, content='torrents', content_rowid='id', tokenize="porter unicode61 separators ' !""#$%&''()*+,-./:;<=>?@[\]^_` + "`" + `{|}~'");
@@ -389,6 +515,11 @@
                INSERT INTO torrents_idx(rowid, name) VALUES (new.id, new.name);
            END;

+           -- Add column modified_on
+           ALTER TABLE torrents ADD COLUMN modified_on INTEGER;
+           CREATE INDEX modified_on_index ON torrents (modified_on);
+           UPDATE torrents SET modified_on = discovered_on;
+
            PRAGMA user_version = 3;
        `)
        if err != nil {
@@ -402,3 +533,14 @@

     return nil
 }
+
+func executeTemplate(text string, data interface{}, funcs template.FuncMap) string {
+    t := template.Must(template.New("anon").Funcs(funcs).Parse(text))
+
+    var buf bytes.Buffer
+    err := t.Execute(&buf, data)
+    if err != nil {
+        panic(err.Error())
+    }
+
+    return buf.String()
+}
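
For a sense of what the template in QueryTorrents produces (illustrative only, not part of this commit): a full-text query on a non-first page with orderBy = ByDiscoveredOn and ascending = true renders roughly to the following SQL, with queryArgs filling the placeholders in order:

SELECT info_hash, name, total_size, discovered_on,
       (SELECT COUNT(*) FROM files WHERE torrents.id = files.torrent_id) AS n_files
FROM torrents
INNER JOIN (
    SELECT rowid AS id, bm25(torrents_idx) AS rank
    FROM torrents_idx
    WHERE torrents_idx MATCH ?  -- queryArgs[0]: query
) AS idx USING(id)
WHERE modified_on <= ?          -- queryArgs[1]: epoch
  AND id > ?                    -- queryArgs[2]: lastID
  AND discovered_on > ?         -- queryArgs[3]: lastOrderedValue
ORDER BY discovered_on ASC, id ASC
LIMIT ?;                        -- queryArgs[4]: limit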