cumulative commit! (see the description for changes)

magneticod:
!!! disabled the gradual increase in congestion control, for some reason we still can't detect congestion...
- `*net.UDPAddr` in dht/mainline instead of `net.Addr`
- fixed a bug when a very small extension message received
- simplified how peer adress is handled in bittorrent/metadata/sink
- simplified TrawlingResult in dht/mainline

magneticow:
- use WAL for sqlite3

persistence:
- use URL.String() instead of url.Path in sql.Open() so that URL parameters are not lost...
This commit is contained in:
Bora Alper 2018-08-03 11:28:50 +03:00
parent c07daa3eca
commit dc420da802
11 changed files with 150 additions and 114 deletions

View File

@ -81,21 +81,18 @@ func (l *Leech) writeAll(b []byte) error {
func (l *Leech) doBtHandshake() error { func (l *Leech) doBtHandshake() error {
lHandshake := []byte(fmt.Sprintf( lHandshake := []byte(fmt.Sprintf(
"\x13BitTorrent protocol\x00\x00\x00\x00\x00\x10\x00\x01%s%s", "\x13BitTorrent protocol\x00\x00\x00\x00\x00\x10\x00\x01%s%s",
l.infoHash[:], l.infoHash,
l.clientID, l.clientID,
)) ))
// ASSERTION // ASSERTION
if len(lHandshake) != 68 { panic(fmt.Sprintf("len(lHandshake) == %d", len(lHandshake))) } if len(lHandshake) != 68 { panic(fmt.Sprintf("len(lHandshake) == %d", len(lHandshake))) }
err := l.writeAll(lHandshake) err := l.writeAll(lHandshake)
if err != nil { if err != nil {
return errors.Wrap(err, "writeAll lHandshake") return errors.Wrap(err, "writeAll lHandshake")
} }
zap.L().Debug("BitTorrent handshake sent, waiting for the remote's...")
rHandshake, err := l.readExactly(68) rHandshake, err := l.readExactly(68)
if err != nil { if err != nil {
return errors.Wrap(err, "readExactly rHandshake") return errors.Wrap(err, "readExactly rHandshake")
@ -151,6 +148,15 @@ func (l *Leech) doExHandshake() error {
} }
func (l *Leech) requestAllPieces() error { func (l *Leech) requestAllPieces() error {
/*
* reqq
* An integer, the number of outstanding request messages this client supports without
* dropping any. The default in in libtorrent is 250.
* "handshake message" @ "Extension Protocol" @ http://www.bittorrent.org/beps/bep_0010.html
*
* TODO: maybe by requesting all pieces at once we are exceeding this limit? maybe we should
* request as we receive pieces?
*/
// Request all the pieces of metadata // Request all the pieces of metadata
nPieces := int(math.Ceil(float64(l.metadataSize) / math.Pow(2, 14))) nPieces := int(math.Ceil(float64(l.metadataSize) / math.Pow(2, 14)))
for piece := 0; piece < nPieces; piece++ { for piece := 0; piece < nPieces; piece++ {
@ -205,6 +211,11 @@ func (l *Leech) readExMessage() ([]byte, error) {
return nil, errors.Wrap(err, "readMessage") return nil, errors.Wrap(err, "readMessage")
} }
// Every extension message has at least 2 bytes.
if len(rMessage) < 2 {
continue;
}
// We are interested only in extension messages, whose first byte is always 20 // We are interested only in extension messages, whose first byte is always 20
if rMessage[0] == 20 { if rMessage[0] == 20 {
return rMessage, nil return rMessage, nil
@ -232,19 +243,50 @@ func (l *Leech) readUmMessage() ([]byte, error) {
func (l *Leech) connect(deadline time.Time) error { func (l *Leech) connect(deadline time.Time) error {
var err error var err error
l.conn, err = net.DialTCP("tcp", nil, l.peerAddr) l.conn, err = net.DialTCP("tcp4", nil, l.peerAddr)
if err != nil { if err != nil {
return errors.Wrap(err, "dial") return errors.Wrap(err, "dial")
} }
defer l.conn.Close()
// > If sec == 0, operating system discards any unsent or unacknowledged data [after Close()
// > has been called].
err = l.conn.SetLinger(0)
if err != nil {
if err := l.conn.Close(); err != nil {
zap.L().Panic("couldn't close leech connection!", zap.Error(err))
}
return errors.Wrap(err, "SetLinger")
}
err = l.conn.SetKeepAlive(true)
if err != nil {
if err := l.conn.Close(); err != nil {
zap.L().Panic("couldn't close leech connection!", zap.Error(err))
}
return errors.Wrap(err, "SetKeepAlive")
}
err = l.conn.SetKeepAlivePeriod(10 * time.Second)
if err != nil {
if err := l.conn.Close(); err != nil {
zap.L().Panic("couldn't close leech connection!", zap.Error(err))
}
return errors.Wrap(err, "SetKeepAlivePeriod")
}
err = l.conn.SetNoDelay(true) err = l.conn.SetNoDelay(true)
if err != nil { if err != nil {
if err := l.conn.Close(); err != nil {
zap.L().Panic("couldn't close leech connection!", zap.Error(err))
}
return errors.Wrap(err, "NODELAY") return errors.Wrap(err, "NODELAY")
} }
err = l.conn.SetDeadline(deadline) err = l.conn.SetDeadline(deadline)
if err != nil { if err != nil {
if err := l.conn.Close(); err != nil {
zap.L().Panic("couldn't close leech connection!", zap.Error(err))
}
return errors.Wrap(err, "SetDeadline") return errors.Wrap(err, "SetDeadline")
} }
@ -257,6 +299,11 @@ func (l *Leech) Do(deadline time.Time) {
l.OnError(errors.Wrap(err, "connect")) l.OnError(errors.Wrap(err, "connect"))
return return
} }
defer func() {
if err := l.conn.Close(); err != nil {
zap.L().Panic("couldn't close leech connection!", zap.Error(err))
}
}()
err = l.doBtHandshake() err = l.doBtHandshake()
if err != nil { if err != nil {

View File

@ -2,11 +2,9 @@ package metadata
import ( import (
"crypto/rand" "crypto/rand"
"fmt"
"net"
"strings"
"time" "time"
"github.com/boramalper/magnetico/pkg/util"
"go.uber.org/zap" "go.uber.org/zap"
"github.com/boramalper/magnetico/cmd/magneticod/dht/mainline" "github.com/boramalper/magnetico/cmd/magneticod/dht/mainline"
@ -62,29 +60,9 @@ func (ms *Sink) Sink(res mainline.TrawlingResult) {
// check whether res.infoHash exists in the ms.incomingInfoHashes, and where we add the infoHash // check whether res.infoHash exists in the ms.incomingInfoHashes, and where we add the infoHash
// to the incomingInfoHashes at the end of this function. // to the incomingInfoHashes at the end of this function.
zap.L().Info("Sunk!", zap.String("infoHash", res.InfoHash.String())) zap.L().Info("Sunk!", zap.Int("leeches", len(ms.incomingInfoHashes)), util.HexField("infoHash", res.InfoHash[:]))
IPs := res.PeerIP.String() leech := NewLeech(res.InfoHash, res.PeerAddr, LeechEventHandlers{
var rhostport string
if IPs == "<nil>" {
zap.L().Debug("Sink.Sink: Peer IP is nil!")
return
} else if IPs[0] == '?' {
zap.L().Debug("Sink.Sink: Peer IP is invalid!")
return
} else if strings.ContainsRune(IPs, ':') { // IPv6
rhostport = fmt.Sprintf("[%s]:%d", IPs, res.PeerPort)
} else { // IPv4
rhostport = fmt.Sprintf("%s:%d", IPs, res.PeerPort)
}
raddr, err := net.ResolveTCPAddr("tcp", rhostport)
if err != nil {
zap.L().Debug("Sink.Sink: Couldn't resolve peer address!", zap.Error(err))
return
}
leech := NewLeech(res.InfoHash, raddr, LeechEventHandlers{
OnSuccess: ms.flush, OnSuccess: ms.flush,
OnError: ms.onLeechError, OnError: ms.onLeechError,
}) })
@ -118,6 +96,6 @@ func (ms *Sink) flush(result Metadata) {
} }
func (ms *Sink) onLeechError(infoHash [20]byte, err error) { func (ms *Sink) onLeechError(infoHash [20]byte, err error) {
zap.L().Debug("leech error", zap.ByteString("infoHash", infoHash[:]), zap.Error(err)) zap.L().Debug("leech error", util.HexField("infoHash", infoHash[:]), zap.Error(err))
delete(ms.incomingInfoHashes, infoHash) delete(ms.incomingInfoHashes, infoHash)
} }

View File

@ -19,13 +19,13 @@ type Protocol struct {
} }
type ProtocolEventHandlers struct { type ProtocolEventHandlers struct {
OnPingQuery func(*Message, net.Addr) OnPingQuery func(*Message, *net.UDPAddr)
OnFindNodeQuery func(*Message, net.Addr) OnFindNodeQuery func(*Message, *net.UDPAddr)
OnGetPeersQuery func(*Message, net.Addr) OnGetPeersQuery func(*Message, *net.UDPAddr)
OnAnnouncePeerQuery func(*Message, net.Addr) OnAnnouncePeerQuery func(*Message, *net.UDPAddr)
OnGetPeersResponse func(*Message, net.Addr) OnGetPeersResponse func(*Message, *net.UDPAddr)
OnFindNodeResponse func(*Message, net.Addr) OnFindNodeResponse func(*Message, *net.UDPAddr)
OnPingORAnnouncePeerResponse func(*Message, net.Addr) OnPingORAnnouncePeerResponse func(*Message, *net.UDPAddr)
OnCongestion func() OnCongestion func()
} }
@ -58,13 +58,13 @@ func (p *Protocol) Terminate() {
p.transport.Terminate() p.transport.Terminate()
} }
func (p *Protocol) onMessage(msg *Message, addr net.Addr) { func (p *Protocol) onMessage(msg *Message, addr *net.UDPAddr) {
switch msg.Y { switch msg.Y {
case "q": case "q":
switch msg.Q { switch msg.Q {
case "ping": case "ping":
if !validatePingQueryMessage(msg) { if !validatePingQueryMessage(msg) {
zap.L().Debug("An invalid ping query received!") // zap.L().Debug("An invalid ping query received!")
return return
} }
// Check whether there is a registered event handler for the ping queries, before // Check whether there is a registered event handler for the ping queries, before
@ -75,7 +75,7 @@ func (p *Protocol) onMessage(msg *Message, addr net.Addr) {
case "find_node": case "find_node":
if !validateFindNodeQueryMessage(msg) { if !validateFindNodeQueryMessage(msg) {
zap.L().Debug("An invalid find_node query received!") // zap.L().Debug("An invalid find_node query received!")
return return
} }
if p.eventHandlers.OnFindNodeQuery != nil { if p.eventHandlers.OnFindNodeQuery != nil {
@ -84,7 +84,7 @@ func (p *Protocol) onMessage(msg *Message, addr net.Addr) {
case "get_peers": case "get_peers":
if !validateGetPeersQueryMessage(msg) { if !validateGetPeersQueryMessage(msg) {
zap.L().Debug("An invalid get_peers query received!") // zap.L().Debug("An invalid get_peers query received!")
return return
} }
if p.eventHandlers.OnGetPeersQuery != nil { if p.eventHandlers.OnGetPeersQuery != nil {
@ -93,7 +93,7 @@ func (p *Protocol) onMessage(msg *Message, addr net.Addr) {
case "announce_peer": case "announce_peer":
if !validateAnnouncePeerQueryMessage(msg) { if !validateAnnouncePeerQueryMessage(msg) {
zap.L().Debug("An invalid announce_peer query received!") // zap.L().Debug("An invalid announce_peer query received!")
return return
} }
if p.eventHandlers.OnAnnouncePeerQuery != nil { if p.eventHandlers.OnAnnouncePeerQuery != nil {
@ -104,15 +104,14 @@ func (p *Protocol) onMessage(msg *Message, addr net.Addr) {
// Although we are aware that such method exists, we ignore. // Although we are aware that such method exists, we ignore.
default: default:
zap.L().Debug("A KRPC query of an unknown method received!", // zap.L().Debug("A KRPC query of an unknown method received!", zap.String("method", msg.Q))
zap.String("method", msg.Q))
return return
} }
case "r": case "r":
// get_peers > find_node > ping / announce_peer // get_peers > find_node > ping / announce_peer
if len(msg.R.Token) != 0 { // The message should be a get_peers response. if len(msg.R.Token) != 0 { // The message should be a get_peers response.
if !validateGetPeersResponseMessage(msg) { if !validateGetPeersResponseMessage(msg) {
zap.L().Debug("An invalid get_peers response received!") // zap.L().Debug("An invalid get_peers response received!")
return return
} }
if p.eventHandlers.OnGetPeersResponse != nil { if p.eventHandlers.OnGetPeersResponse != nil {
@ -120,7 +119,7 @@ func (p *Protocol) onMessage(msg *Message, addr net.Addr) {
} }
} else if len(msg.R.Nodes) != 0 { // The message should be a find_node response. } else if len(msg.R.Nodes) != 0 { // The message should be a find_node response.
if !validateFindNodeResponseMessage(msg) { if !validateFindNodeResponseMessage(msg) {
zap.L().Debug("An invalid find_node response received!") // zap.L().Debug("An invalid find_node response received!")
return return
} }
if p.eventHandlers.OnFindNodeResponse != nil { if p.eventHandlers.OnFindNodeResponse != nil {
@ -128,7 +127,7 @@ func (p *Protocol) onMessage(msg *Message, addr net.Addr) {
} }
} else { // The message should be a ping or an announce_peer response. } else { // The message should be a ping or an announce_peer response.
if !validatePingORannouncePeerResponseMessage(msg) { if !validatePingORannouncePeerResponseMessage(msg) {
zap.L().Debug("An invalid ping OR announce_peer response received!") // zap.L().Debug("An invalid ping OR announce_peer response received!")
return return
} }
if p.eventHandlers.OnPingORAnnouncePeerResponse != nil { if p.eventHandlers.OnPingORAnnouncePeerResponse != nil {
@ -147,7 +146,7 @@ func (p *Protocol) onMessage(msg *Message, addr net.Addr) {
} }
} }
func (p *Protocol) SendMessage(msg *Message, addr net.Addr) { func (p *Protocol) SendMessage(msg *Message, addr *net.UDPAddr) {
p.transport.WriteMessages(msg, addr) p.transport.WriteMessages(msg, addr)
} }

View File

@ -1,21 +1,17 @@
package mainline package mainline
import ( import (
"math/rand" "crypto/rand"
"net" "net"
"sync" "sync"
"time" "time"
"github.com/anacrolix/torrent"
"github.com/anacrolix/torrent/metainfo"
"go.uber.org/zap" "go.uber.org/zap"
) )
type TrawlingResult struct { type TrawlingResult struct {
InfoHash metainfo.Hash InfoHash [20]byte
Peer torrent.Peer PeerAddr *net.TCPAddr
PeerIP net.IP
PeerPort int
} }
type TrawlingService struct { type TrawlingService struct {
@ -29,7 +25,7 @@ type TrawlingService struct {
// understandably) slices cannot be used as keys (since they are not hashable), and using arrays // understandably) slices cannot be used as keys (since they are not hashable), and using arrays
// (or even the conversion between each other) is a pain; hence map[string]net.UDPAddr // (or even the conversion between each other) is a pain; hence map[string]net.UDPAddr
// ^~~~~~ // ^~~~~~
routingTable map[string]net.Addr routingTable map[string]*net.UDPAddr
routingTableMutex *sync.Mutex routingTableMutex *sync.Mutex
maxNeighbors uint maxNeighbors uint
} }
@ -50,7 +46,7 @@ func NewTrawlingService(laddr string, initialMaxNeighbors uint, eventHandlers Tr
}, },
) )
service.trueNodeID = make([]byte, 20) service.trueNodeID = make([]byte, 20)
service.routingTable = make(map[string]net.Addr) service.routingTable = make(map[string]*net.UDPAddr)
service.routingTableMutex = new(sync.Mutex) service.routingTableMutex = new(sync.Mutex)
service.eventHandlers = eventHandlers service.eventHandlers = eventHandlers
service.maxNeighbors = initialMaxNeighbors service.maxNeighbors = initialMaxNeighbors
@ -80,8 +76,11 @@ func (s *TrawlingService) Terminate() {
} }
func (s *TrawlingService) trawl() { func (s *TrawlingService) trawl() {
for range time.Tick(3 * time.Second) { for range time.Tick(1 * time.Second) {
s.maxNeighbors = uint(float32(s.maxNeighbors) * 1.01) // TODO
// For some reason, we can't still detect congestion and this keeps increasing...
// Disable for now.
// s.maxNeighbors = uint(float32(s.maxNeighbors) * 1.001)
s.routingTableMutex.Lock() s.routingTableMutex.Lock()
if len(s.routingTable) == 0 { if len(s.routingTable) == 0 {
@ -90,7 +89,7 @@ func (s *TrawlingService) trawl() {
zap.L().Warn("Latest status:", zap.Int("n", len(s.routingTable)), zap.L().Warn("Latest status:", zap.Int("n", len(s.routingTable)),
zap.Uint("maxNeighbors", s.maxNeighbors)) zap.Uint("maxNeighbors", s.maxNeighbors))
s.findNeighbors() s.findNeighbors()
s.routingTable = make(map[string]net.Addr) s.routingTable = make(map[string]*net.UDPAddr)
} }
s.routingTableMutex.Unlock() s.routingTableMutex.Unlock()
} }
@ -114,6 +113,7 @@ func (s *TrawlingService) bootstrap() {
if err != nil { if err != nil {
zap.L().Error("Could NOT resolve (UDP) address of the bootstrapping node!", zap.L().Error("Could NOT resolve (UDP) address of the bootstrapping node!",
zap.String("node", node)) zap.String("node", node))
continue;
} }
s.protocol.SendMessage(NewFindNodeQuery(s.trueNodeID, target), addr) s.protocol.SendMessage(NewFindNodeQuery(s.trueNodeID, target), addr)
@ -135,7 +135,7 @@ func (s *TrawlingService) findNeighbors() {
} }
} }
func (s *TrawlingService) onGetPeersQuery(query *Message, addr net.Addr) { func (s *TrawlingService) onGetPeersQuery(query *Message, addr *net.UDPAddr) {
s.protocol.SendMessage( s.protocol.SendMessage(
NewGetPeersResponseWithNodes( NewGetPeersResponseWithNodes(
query.T, query.T,
@ -147,35 +147,35 @@ func (s *TrawlingService) onGetPeersQuery(query *Message, addr net.Addr) {
) )
} }
func (s *TrawlingService) onAnnouncePeerQuery(query *Message, addr net.Addr) { func (s *TrawlingService) onAnnouncePeerQuery(query *Message, addr *net.UDPAddr) {
/* BEP 5
*
* There is an optional argument called implied_port which value is either 0 or 1. If it is
* present and non-zero, the port argument should be ignored and the source port of the UDP
* packet should be used as the peer's port instead. This is useful for peers behind a NAT that
* may not know their external port, and supporting uTP, they accept incoming connections on the
* same port as the DHT port.
*/
var peerPort int var peerPort int
if query.A.ImpliedPort != 0 { if query.A.ImpliedPort != 0 {
peerPort = addr.(*net.UDPAddr).Port // TODO: Peer uses uTP, ignore for now
// return
peerPort = addr.Port
} else { } else {
peerPort = query.A.Port peerPort = query.A.Port
// return
} }
// TODO: It looks ugly, am I doing it right? --Bora // TODO: It looks ugly, am I doing it right? --Bora
// (Converting slices to arrays in Go shouldn't have been such a pain...) // (Converting slices to arrays in Go shouldn't have been such a pain...)
var peerId, infoHash [20]byte var infoHash [20]byte
copy(peerId[:], []byte("\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00"))
copy(infoHash[:], query.A.InfoHash) copy(infoHash[:], query.A.InfoHash)
s.eventHandlers.OnResult(TrawlingResult{ s.eventHandlers.OnResult(TrawlingResult{
InfoHash: infoHash, InfoHash: infoHash,
Peer: torrent.Peer{ PeerAddr: &net.TCPAddr{
// As we don't know the ID of the remote peer, set it empty. IP: addr.IP,
Id: peerId,
IP: addr.(*net.UDPAddr).IP,
Port: peerPort, Port: peerPort,
// "Ha" indicates that we discovered the peer through DHT Announce Peer (query); not
// sure how anacrolix/torrent utilizes that information though.
Source: "Ha",
// We don't know whether the remote peer supports encryption either, but let's pretend
// that it doesn't.
SupportsEncryption: false,
}, },
PeerIP: addr.(*net.UDPAddr).IP,
PeerPort: peerPort,
}) })
s.protocol.SendMessage( s.protocol.SendMessage(
@ -187,19 +187,15 @@ func (s *TrawlingService) onAnnouncePeerQuery(query *Message, addr net.Addr) {
) )
} }
func (s *TrawlingService) onFindNodeResponse(response *Message, addr net.Addr) { func (s *TrawlingService) onFindNodeResponse(response *Message, addr *net.UDPAddr) {
s.routingTableMutex.Lock() s.routingTableMutex.Lock()
defer s.routingTableMutex.Unlock() defer s.routingTableMutex.Unlock()
zap.L().Debug("find node response!!", zap.Uint("maxNeighbors", s.maxNeighbors),
zap.Int("response.R.Nodes length", len(response.R.Nodes)))
for _, node := range response.R.Nodes { for _, node := range response.R.Nodes {
if uint(len(s.routingTable)) >= s.maxNeighbors { if uint(len(s.routingTable)) >= s.maxNeighbors {
break break
} }
if node.Addr.Port == 0 { // Ignore nodes who "use" port 0. if node.Addr.Port == 0 { // Ignore nodes who "use" port 0.
zap.L().Debug("ignoring 0 port!!!")
continue continue
} }
@ -219,6 +215,4 @@ func (s *TrawlingService) onCongestion() {
} }
s.maxNeighbors = uint(float32(s.maxNeighbors) * 0.9) s.maxNeighbors = uint(float32(s.maxNeighbors) * 0.9)
zap.L().Debug("Max. number of neighbours updated!",
zap.Uint("s.maxNeighbors", s.maxNeighbors))
} }

View File

@ -18,12 +18,12 @@ type Transport struct {
// OnMessage is the function that will be called when Transport receives a packet that is // OnMessage is the function that will be called when Transport receives a packet that is
// successfully unmarshalled as a syntactically correct Message (but -of course- the checking // successfully unmarshalled as a syntactically correct Message (but -of course- the checking
// the semantic correctness of the Message is left to Protocol). // the semantic correctness of the Message is left to Protocol).
onMessage func(*Message, net.Addr) onMessage func(*Message, *net.UDPAddr)
// OnCongestion // OnCongestion
onCongestion func() onCongestion func()
} }
func NewTransport(laddr string, onMessage func(*Message, net.Addr), onCongestion func()) *Transport { func NewTransport(laddr string, onMessage func(*Message, *net.UDPAddr), onCongestion func()) *Transport {
t := new(Transport) t := new(Transport)
/* The field size sets a theoretical limit of 65,535 bytes (8 byte header + 65,527 bytes of /* The field size sets a theoretical limit of 65,535 bytes (8 byte header + 65,527 bytes of
* data) for a UDP datagram. However the actual limit for the data length, which is imposed by * data) for a UDP datagram. However the actual limit for the data length, which is imposed by
@ -95,39 +95,44 @@ func (t *Transport) readMessages() {
t.onCongestion() t.onCongestion()
} else if err != nil { } else if err != nil {
// TODO: isn't there a more reliable way to detect if UDPConn is closed? // TODO: isn't there a more reliable way to detect if UDPConn is closed?
zap.L().Debug("Could NOT read an UDP packet!", zap.Error(err)) zap.L().Warn("Could NOT read an UDP packet!", zap.Error(err))
} }
if n == 0 { if n == 0 {
/* Datagram sockets in various domains (e.g., the UNIX and Internet domains) permit /* Datagram sockets in various domains (e.g., the UNIX and Internet domains) permit
* zero-length datagrams. When such a datagram is received, the return value (n) is 0. * zero-length datagrams. When such a datagram is received, the return value (n) is 0.
*/ */
zap.L().Debug("zero-length received!!")
continue continue
} }
from := sockaddr.SockaddrToUDPAddr(fromSA) from := sockaddr.SockaddrToUDPAddr(fromSA)
if from == nil {
zap.L().Panic("dht mainline transport SockaddrToUDPAddr: nil")
}
var msg Message var msg Message
err = bencode.Unmarshal(t.buffer[:n], &msg) err = bencode.Unmarshal(t.buffer[:n], &msg)
if err != nil { if err != nil {
zap.L().Debug("Could NOT unmarshal packet data!", zap.Error(err)) // couldn't unmarshal packet data
continue
} }
zap.L().Debug("message read! (first 20...)", zap.ByteString("msg", t.buffer[:20]))
t.onMessage(&msg, from) t.onMessage(&msg, from)
} }
} }
func (t *Transport) WriteMessages(msg *Message, addr net.Addr) { func (t *Transport) WriteMessages(msg *Message, addr *net.UDPAddr) {
data, err := bencode.Marshal(msg) data, err := bencode.Marshal(msg)
if err != nil { if err != nil {
zap.L().Panic("Could NOT marshal an outgoing message! (Programmer error.)") zap.L().Panic("Could NOT marshal an outgoing message! (Programmer error.)")
} }
addrSA := sockaddr.NetAddrToSockaddr(addr) addrSA := sockaddr.NetAddrToSockaddr(addr)
if addrSA == nil {
zap.L().Debug("sent message!!!") zap.L().Debug("Wrong net address for the remote peer!",
zap.String("addr", addr.String()))
return;
}
err = unix.Sendto(t.fd, data, 0, addrSA) err = unix.Sendto(t.fd, data, 0, addrSA)
if err == unix.EPERM || err == unix.ENOBUFS { if err == unix.EPERM || err == unix.ENOBUFS {
@ -148,6 +153,6 @@ func (t *Transport) WriteMessages(msg *Message, addr net.Addr) {
zap.L().Warn("WRITE CONGESTION!", zap.Error(err)) zap.L().Warn("WRITE CONGESTION!", zap.Error(err))
t.onCongestion() t.onCongestion()
} else if err != nil { } else if err != nil {
zap.L().Debug("Could NOT write an UDP packet!", zap.Error(err)) zap.L().Warn("Could NOT write an UDP packet!", zap.Error(err))
} }
} }

View File

@ -1,7 +1,6 @@
package main package main
import ( import (
"encoding/hex"
"fmt" "fmt"
"net" "net"
"os" "os"
@ -9,6 +8,7 @@ import (
"runtime/pprof" "runtime/pprof"
"time" "time"
"github.com/boramalper/magnetico/pkg/util"
"github.com/jessevdk/go-flags" "github.com/jessevdk/go-flags"
"go.uber.org/zap" "go.uber.org/zap"
"go.uber.org/zap/zapcore" "go.uber.org/zap/zapcore"
@ -74,8 +74,6 @@ func main() {
zap.ReplaceGlobals(logger) zap.ReplaceGlobals(logger)
zap.L().Debug("debug message!")
switch opFlags.Profile { switch opFlags.Profile {
case "cpu": case "cpu":
file, err := os.OpenFile("magneticod_cpu.prof", os.O_CREATE | os.O_WRONLY, 0755) file, err := os.OpenFile("magneticod_cpu.prof", os.O_CREATE | os.O_WRONLY, 0755)
@ -109,7 +107,7 @@ func main() {
for stopped := false; !stopped; { for stopped := false; !stopped; {
select { select {
case result := <-trawlingManager.Output(): case result := <-trawlingManager.Output():
zap.L().Debug("Trawled!", zap.String("infoHash", result.InfoHash.String())) zap.L().Debug("Trawled!", util.HexField("infoHash", result.InfoHash[:]))
exists, err := database.DoesTorrentExist(result.InfoHash[:]) exists, err := database.DoesTorrentExist(result.InfoHash[:])
if err != nil { if err != nil {
zap.L().Fatal("Could not check whether torrent exists!", zap.Error(err)) zap.L().Fatal("Could not check whether torrent exists!", zap.Error(err))
@ -117,12 +115,12 @@ func main() {
metadataSink.Sink(result) metadataSink.Sink(result)
} }
case metadata := <-metadataSink.Drain(): case md := <-metadataSink.Drain():
if err := database.AddNewTorrent(metadata.InfoHash, metadata.Name, metadata.Files); err != nil { if err := database.AddNewTorrent(md.InfoHash, md.Name, md.Files); err != nil {
logger.Sugar().Fatalf("Could not add new torrent %x to the database: %s", logger.Sugar().Fatalf("Could not add new torrent %x to the database: %s",
metadata.InfoHash, err.Error()) md.InfoHash, err.Error())
} }
zap.L().Info("Fetched!", zap.String("name", metadata.Name), zap.String("infoHash", hex.EncodeToString(metadata.InfoHash))) zap.L().Info("Fetched!", zap.String("name", md.Name), util.HexField("infoHash", md.InfoHash))
case <-interruptChan: case <-interruptChan:
trawlingManager.Terminate() trawlingManager.Terminate()

View File

@ -162,7 +162,7 @@ func main() {
decoder.IgnoreUnknownKeys(false) decoder.IgnoreUnknownKeys(false)
decoder.ZeroEmpty(true) decoder.ZeroEmpty(true)
zap.L().Info("magneticow is ready to serve!") zap.S().Infof("magneticow is ready to serve on %s!", opts.Addr)
err = http.ListenAndServe(opts.Addr, router) err = http.ListenAndServe(opts.Addr, router)
if err != nil { if err != nil {
zap.L().Error("ListenAndServe error", zap.Error(err)) zap.L().Error("ListenAndServe error", zap.Error(err))
@ -203,10 +203,13 @@ func parseFlags() error {
opts.Addr = cmdFlags.Addr opts.Addr = cmdFlags.Addr
if cmdFlags.Database == "" { if cmdFlags.Database == "" {
opts.Database = "sqlite3://" + path.Join( opts.Database =
appdirs.UserDataDir("magneticod", "", "", false), "sqlite3://" +
"database.sqlite3", appdirs.UserDataDir("magneticod", "", "", false) +
) "/database.sqlite3" +
"?_journal_mode=WAL" // https://github.com/mattn/go-sqlite3#connection-string
} else {
opts.Database = cmdFlags.Database
} }
if cmdFlags.Cred == "" && !cmdFlags.NoAuth { if cmdFlags.Cred == "" && !cmdFlags.NoAuth {

View File

@ -109,13 +109,13 @@ func MakeDatabase(rawURL string, logger *zap.Logger) (Database, error) {
return makeSqlite3Database(url_) return makeSqlite3Database(url_)
case "postgresql": case "postgresql":
return nil, fmt.Errorf("postgresql is not yet supported!") return nil, fmt.Errorf("postgresql is not yet supported")
case "mysql": case "mysql":
return nil, fmt.Errorf("mysql is not yet supported!") return nil, fmt.Errorf("mysql is not yet supported")
default: default:
return nil, fmt.Errorf("unknown URI scheme (database engine)!") return nil, fmt.Errorf("unknown URI scheme: `%s`", url_.Scheme)
} }
} }

View File

@ -32,7 +32,7 @@ func TestParseISO8601(t *testing.T) {
for i, date := range validDates { for i, date := range validDates {
_, gr, err := ParseISO8601(date.date) _, gr, err := ParseISO8601(date.date)
if err != nil { if err != nil {
t.Errorf("Error while parsing valid date #%d", i+1, err) t.Errorf("Error while parsing valid date #%d: %s", i+1, err.Error())
continue continue
} }

View File

@ -30,7 +30,8 @@ func makeSqlite3Database(url_ *url.URL) (Database, error) {
} }
var err error var err error
db.conn, err = sql.Open("sqlite3", url_.Path) url_.Scheme = ""
db.conn, err = sql.Open("sqlite3", url_.String())
if err != nil { if err != nil {
return nil, fmt.Errorf("sql.Open: %s", err.Error()) return nil, fmt.Errorf("sql.Open: %s", err.Error())
} }

11
pkg/util/util.go Normal file
View File

@ -0,0 +1,11 @@
package util
import (
"encoding/hex"
"go.uber.org/zap/zapcore"
)
func HexField(key string, val []byte) zapcore.Field {
return zapcore.Field{Key: key, Type: zapcore.StringType, String: hex.EncodeToString(val)}
}