Beanstalkd engine implementation.
This commit is contained in:
parent
9f9024674e
commit
3dfda7c2be
5
go.mod
5
go.mod
@ -1,5 +1,7 @@
|
|||||||
module github.com/boramalper/magnetico
|
module github.com/boramalper/magnetico
|
||||||
|
|
||||||
|
go 1.15
|
||||||
|
|
||||||
require (
|
require (
|
||||||
github.com/Wessie/appdirs v0.0.0-20141031215813-6573e894f8e2
|
github.com/Wessie/appdirs v0.0.0-20141031215813-6573e894f8e2
|
||||||
github.com/anacrolix/dht v1.0.1 // indirect
|
github.com/anacrolix/dht v1.0.1 // indirect
|
||||||
@ -11,6 +13,7 @@ require (
|
|||||||
github.com/google/pprof v0.0.0-20190515194954-54271f7e092f // indirect
|
github.com/google/pprof v0.0.0-20190515194954-54271f7e092f // indirect
|
||||||
github.com/gorilla/mux v1.7.4
|
github.com/gorilla/mux v1.7.4
|
||||||
github.com/gorilla/schema v1.1.0
|
github.com/gorilla/schema v1.1.0
|
||||||
|
github.com/iwanbk/gobeanstalk v0.0.0-20160903043409-dbbb23937c31
|
||||||
github.com/jackc/pgx/v4 v4.9.2
|
github.com/jackc/pgx/v4 v4.9.2
|
||||||
github.com/jessevdk/go-flags v1.4.0
|
github.com/jessevdk/go-flags v1.4.0
|
||||||
github.com/kevinburke/go-bindata v3.16.0+incompatible // indirect
|
github.com/kevinburke/go-bindata v3.16.0+incompatible // indirect
|
||||||
@ -28,5 +31,3 @@ require (
|
|||||||
golang.org/x/tools v0.0.0-20200221224223-e1da425f72fd // indirect
|
golang.org/x/tools v0.0.0-20200221224223-e1da425f72fd // indirect
|
||||||
honnef.co/go/tools v0.0.1-2020.1.3 // indirect
|
honnef.co/go/tools v0.0.1-2020.1.3 // indirect
|
||||||
)
|
)
|
||||||
|
|
||||||
go 1.13
|
|
||||||
|
@ -29,6 +29,27 @@ for more examples.
|
|||||||
Optional parameter `schema` was added to choose which schema will be used to store magnetico tables,
|
Optional parameter `schema` was added to choose which schema will be used to store magnetico tables,
|
||||||
sequences and indexes.
|
sequences and indexes.
|
||||||
|
|
||||||
|
## Beanstalk MQ engine for magneticod
|
||||||
|
|
||||||
|
[Beanstalkd](https://beanstalkd.github.io/) is very lightweight and simple MQ server implementation.
|
||||||
|
You can use it to organize delivery of the indexed data to your application.
|
||||||
|
|
||||||
|
Use `beanstalk` URL schema to connect to beanstalkd server. For example:
|
||||||
|
|
||||||
|
```shell
|
||||||
|
magneticod --database=beanstalkd://127.0.0.1:11300/magneticod_tube
|
||||||
|
```
|
||||||
|
|
||||||
|
Don't forget to [set](https://linux.die.net/man/1/beanstalkd) binlog persistence, change maximum job size
|
||||||
|
and `fsync()` period to be able to reliably save torrents with a large number of files:
|
||||||
|
|
||||||
|
```shell
|
||||||
|
# Example settings (may not work for you)
|
||||||
|
beanstalkd -z 1048560 -b /var/lib/beanstalkd -f 2400000
|
||||||
|
```
|
||||||
|
|
||||||
|
For job data example see `stdout` engine documentation below as `beanstalk` engine uses the same format.
|
||||||
|
|
||||||
## Stdout Dummy Database Engine for magneticod
|
## Stdout Dummy Database Engine for magneticod
|
||||||
|
|
||||||
Stdout dummy database engine for **magneticod** prints a new [JSON Line](http://jsonlines.org/)
|
Stdout dummy database engine for **magneticod** prints a new [JSON Line](http://jsonlines.org/)
|
||||||
|
108
pkg/persistence/beanstalkd.go
Normal file
108
pkg/persistence/beanstalkd.go
Normal file
@ -0,0 +1,108 @@
|
|||||||
|
package persistence
|
||||||
|
|
||||||
|
import (
|
||||||
|
"encoding/hex"
|
||||||
|
"encoding/json"
|
||||||
|
"go.uber.org/zap"
|
||||||
|
"net/url"
|
||||||
|
"strings"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/iwanbk/gobeanstalk"
|
||||||
|
"github.com/pkg/errors"
|
||||||
|
)
|
||||||
|
|
||||||
|
func makeBeanstalkDatabase(url_ *url.URL) (Database, error) {
|
||||||
|
s := new(beanstalkd)
|
||||||
|
|
||||||
|
var err error
|
||||||
|
s.bsQueue, err = gobeanstalk.Dial(url_.Hostname() + ":" + url_.Port())
|
||||||
|
if err != nil {
|
||||||
|
return nil, errors.Wrap(err, "Beanstalkd connection error")
|
||||||
|
}
|
||||||
|
|
||||||
|
tubeName := strings.TrimPrefix(url_.Path, "/")
|
||||||
|
|
||||||
|
err = s.bsQueue.Use(tubeName)
|
||||||
|
if err != nil {
|
||||||
|
return nil, errors.Wrap(err, "Beanstalkd tube set error")
|
||||||
|
}
|
||||||
|
|
||||||
|
zap.L().Info(
|
||||||
|
"Beanstalkd connection created",
|
||||||
|
zap.String("host", url_.Hostname()),
|
||||||
|
zap.String("port", url_.Port()),
|
||||||
|
zap.String("tube", tubeName),
|
||||||
|
)
|
||||||
|
|
||||||
|
return s, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
type beanstalkd struct {
|
||||||
|
bsQueue *gobeanstalk.Conn
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *beanstalkd) Engine() databaseEngine {
|
||||||
|
return Beanstalkd
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *beanstalkd) DoesTorrentExist(infoHash []byte) (bool, error) {
|
||||||
|
// Always say that "No the torrent does not exist" because we do not have
|
||||||
|
// a way to know if we have seen it before or not.
|
||||||
|
return false, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *beanstalkd) AddNewTorrent(infoHash []byte, name string, files []File) error {
|
||||||
|
payloadJson, err := json.Marshal(SimpleTorrentSummary{
|
||||||
|
InfoHash: hex.EncodeToString(infoHash),
|
||||||
|
Name: name,
|
||||||
|
Files: files,
|
||||||
|
})
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
return errors.Wrap(err, "DB engine beanstalkd encode error")
|
||||||
|
}
|
||||||
|
|
||||||
|
jobId, err := s.bsQueue.Put(payloadJson, 0, 0, 30*time.Second)
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
return errors.Wrap(err, "DB engine beanstalkd Put() error")
|
||||||
|
}
|
||||||
|
|
||||||
|
zap.L().Debug("New item put into the queue", zap.Uint64("job_id", jobId))
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *beanstalkd) Close() error {
|
||||||
|
s.bsQueue.Quit()
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *beanstalkd) GetNumberOfTorrents() (uint, error) {
|
||||||
|
return 0, NotImplementedError
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *beanstalkd) QueryTorrents(
|
||||||
|
query string,
|
||||||
|
epoch int64,
|
||||||
|
orderBy OrderingCriteria,
|
||||||
|
ascending bool,
|
||||||
|
limit uint,
|
||||||
|
lastOrderedValue *float64,
|
||||||
|
lastID *uint64,
|
||||||
|
) ([]TorrentMetadata, error) {
|
||||||
|
return nil, NotImplementedError
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *beanstalkd) GetTorrent(infoHash []byte) (*TorrentMetadata, error) {
|
||||||
|
return nil, NotImplementedError
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *beanstalkd) GetFiles(infoHash []byte) ([]File, error) {
|
||||||
|
return nil, NotImplementedError
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *beanstalkd) GetStatistics(from string, n uint) (*Statistics, error) {
|
||||||
|
return nil, NotImplementedError
|
||||||
|
}
|
@ -63,6 +63,7 @@ type databaseEngine uint8
|
|||||||
const (
|
const (
|
||||||
Sqlite3 databaseEngine = iota + 1
|
Sqlite3 databaseEngine = iota + 1
|
||||||
Postgres
|
Postgres
|
||||||
|
Beanstalkd
|
||||||
Stdout
|
Stdout
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -91,6 +92,12 @@ type TorrentMetadata struct {
|
|||||||
Relevance float64 `json:"relevance"`
|
Relevance float64 `json:"relevance"`
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type SimpleTorrentSummary struct {
|
||||||
|
InfoHash string `json:"infoHash"`
|
||||||
|
Name string `json:"name"`
|
||||||
|
Files []File `json:"files"`
|
||||||
|
}
|
||||||
|
|
||||||
func (tm *TorrentMetadata) MarshalJSON() ([]byte, error) {
|
func (tm *TorrentMetadata) MarshalJSON() ([]byte, error) {
|
||||||
type Alias TorrentMetadata
|
type Alias TorrentMetadata
|
||||||
return json.Marshal(&struct {
|
return json.Marshal(&struct {
|
||||||
@ -122,6 +129,9 @@ func MakeDatabase(rawURL string, logger *zap.Logger) (Database, error) {
|
|||||||
case "stdout":
|
case "stdout":
|
||||||
return makeStdoutDatabase(url_)
|
return makeStdoutDatabase(url_)
|
||||||
|
|
||||||
|
case "beanstalk", "beanstalkd":
|
||||||
|
return makeBeanstalkDatabase(url_)
|
||||||
|
|
||||||
case "mysql":
|
case "mysql":
|
||||||
return nil, fmt.Errorf("mysql is not yet supported")
|
return nil, fmt.Errorf("mysql is not yet supported")
|
||||||
|
|
||||||
|
@ -9,12 +9,6 @@ import (
|
|||||||
"github.com/pkg/errors"
|
"github.com/pkg/errors"
|
||||||
)
|
)
|
||||||
|
|
||||||
type out struct {
|
|
||||||
InfoHash string `json:"infoHash"`
|
|
||||||
Name string `json:"name"`
|
|
||||||
Files []File `json:"files"`
|
|
||||||
}
|
|
||||||
|
|
||||||
func makeStdoutDatabase(_ *url.URL) (Database, error) {
|
func makeStdoutDatabase(_ *url.URL) (Database, error) {
|
||||||
s := new(stdout)
|
s := new(stdout)
|
||||||
s.encoder = json.NewEncoder(os.Stdout)
|
s.encoder = json.NewEncoder(os.Stdout)
|
||||||
@ -39,7 +33,7 @@ func (s *stdout) DoesTorrentExist(infoHash []byte) (bool, error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (s *stdout) AddNewTorrent(infoHash []byte, name string, files []File) error {
|
func (s *stdout) AddNewTorrent(infoHash []byte, name string, files []File) error {
|
||||||
err := s.encoder.Encode(out{
|
err := s.encoder.Encode(SimpleTorrentSummary{
|
||||||
InfoHash: hex.EncodeToString(infoHash),
|
InfoHash: hex.EncodeToString(infoHash),
|
||||||
Name: name,
|
Name: name,
|
||||||
Files: files,
|
Files: files,
|
||||||
|
Loading…
Reference in New Issue
Block a user