diff --git a/cmd/magneticod/bittorrent/metadata/leech.go b/cmd/magneticod/bittorrent/metadata/leech.go index ff1c18c..d2e0726 100644 --- a/cmd/magneticod/bittorrent/metadata/leech.go +++ b/cmd/magneticod/bittorrent/metadata/leech.go @@ -81,21 +81,18 @@ func (l *Leech) writeAll(b []byte) error { func (l *Leech) doBtHandshake() error { lHandshake := []byte(fmt.Sprintf( "\x13BitTorrent protocol\x00\x00\x00\x00\x00\x10\x00\x01%s%s", - l.infoHash[:], + l.infoHash, l.clientID, )) // ASSERTION if len(lHandshake) != 68 { panic(fmt.Sprintf("len(lHandshake) == %d", len(lHandshake))) } - err := l.writeAll(lHandshake) if err != nil { return errors.Wrap(err, "writeAll lHandshake") } - zap.L().Debug("BitTorrent handshake sent, waiting for the remote's...") - rHandshake, err := l.readExactly(68) if err != nil { return errors.Wrap(err, "readExactly rHandshake") @@ -151,6 +148,15 @@ func (l *Leech) doExHandshake() error { } func (l *Leech) requestAllPieces() error { + /* + * reqq + * An integer, the number of outstanding request messages this client supports without + * dropping any. The default in in libtorrent is 250. + * "handshake message" @ "Extension Protocol" @ http://www.bittorrent.org/beps/bep_0010.html + * + * TODO: maybe by requesting all pieces at once we are exceeding this limit? maybe we should + * request as we receive pieces? + */ // Request all the pieces of metadata nPieces := int(math.Ceil(float64(l.metadataSize) / math.Pow(2, 14))) for piece := 0; piece < nPieces; piece++ { @@ -205,6 +211,11 @@ func (l *Leech) readExMessage() ([]byte, error) { return nil, errors.Wrap(err, "readMessage") } + // Every extension message has at least 2 bytes. + if len(rMessage) < 2 { + continue; + } + // We are interested only in extension messages, whose first byte is always 20 if rMessage[0] == 20 { return rMessage, nil @@ -232,19 +243,50 @@ func (l *Leech) readUmMessage() ([]byte, error) { func (l *Leech) connect(deadline time.Time) error { var err error - l.conn, err = net.DialTCP("tcp", nil, l.peerAddr) + l.conn, err = net.DialTCP("tcp4", nil, l.peerAddr) if err != nil { return errors.Wrap(err, "dial") } - defer l.conn.Close() + + // > If sec == 0, operating system discards any unsent or unacknowledged data [after Close() + // > has been called]. + err = l.conn.SetLinger(0) + if err != nil { + if err := l.conn.Close(); err != nil { + zap.L().Panic("couldn't close leech connection!", zap.Error(err)) + } + return errors.Wrap(err, "SetLinger") + } + + err = l.conn.SetKeepAlive(true) + if err != nil { + if err := l.conn.Close(); err != nil { + zap.L().Panic("couldn't close leech connection!", zap.Error(err)) + } + return errors.Wrap(err, "SetKeepAlive") + } + + err = l.conn.SetKeepAlivePeriod(10 * time.Second) + if err != nil { + if err := l.conn.Close(); err != nil { + zap.L().Panic("couldn't close leech connection!", zap.Error(err)) + } + return errors.Wrap(err, "SetKeepAlivePeriod") + } err = l.conn.SetNoDelay(true) if err != nil { + if err := l.conn.Close(); err != nil { + zap.L().Panic("couldn't close leech connection!", zap.Error(err)) + } return errors.Wrap(err, "NODELAY") } err = l.conn.SetDeadline(deadline) if err != nil { + if err := l.conn.Close(); err != nil { + zap.L().Panic("couldn't close leech connection!", zap.Error(err)) + } return errors.Wrap(err, "SetDeadline") } @@ -257,6 +299,11 @@ func (l *Leech) Do(deadline time.Time) { l.OnError(errors.Wrap(err, "connect")) return } + defer func() { + if err := l.conn.Close(); err != nil { + zap.L().Panic("couldn't close leech connection!", zap.Error(err)) + } + }() err = l.doBtHandshake() if err != nil { diff --git a/cmd/magneticod/bittorrent/metadata/sink.go b/cmd/magneticod/bittorrent/metadata/sink.go index fef4572..605817f 100644 --- a/cmd/magneticod/bittorrent/metadata/sink.go +++ b/cmd/magneticod/bittorrent/metadata/sink.go @@ -2,11 +2,9 @@ package metadata import ( "crypto/rand" - "fmt" - "net" - "strings" "time" + "github.com/boramalper/magnetico/pkg/util" "go.uber.org/zap" "github.com/boramalper/magnetico/cmd/magneticod/dht/mainline" @@ -62,29 +60,9 @@ func (ms *Sink) Sink(res mainline.TrawlingResult) { // check whether res.infoHash exists in the ms.incomingInfoHashes, and where we add the infoHash // to the incomingInfoHashes at the end of this function. - zap.L().Info("Sunk!", zap.String("infoHash", res.InfoHash.String())) + zap.L().Info("Sunk!", zap.Int("leeches", len(ms.incomingInfoHashes)), util.HexField("infoHash", res.InfoHash[:])) - IPs := res.PeerIP.String() - var rhostport string - if IPs == "" { - zap.L().Debug("Sink.Sink: Peer IP is nil!") - return - } else if IPs[0] == '?' { - zap.L().Debug("Sink.Sink: Peer IP is invalid!") - return - } else if strings.ContainsRune(IPs, ':') { // IPv6 - rhostport = fmt.Sprintf("[%s]:%d", IPs, res.PeerPort) - } else { // IPv4 - rhostport = fmt.Sprintf("%s:%d", IPs, res.PeerPort) - } - - raddr, err := net.ResolveTCPAddr("tcp", rhostport) - if err != nil { - zap.L().Debug("Sink.Sink: Couldn't resolve peer address!", zap.Error(err)) - return - } - - leech := NewLeech(res.InfoHash, raddr, LeechEventHandlers{ + leech := NewLeech(res.InfoHash, res.PeerAddr, LeechEventHandlers{ OnSuccess: ms.flush, OnError: ms.onLeechError, }) @@ -118,6 +96,6 @@ func (ms *Sink) flush(result Metadata) { } func (ms *Sink) onLeechError(infoHash [20]byte, err error) { - zap.L().Debug("leech error", zap.ByteString("infoHash", infoHash[:]), zap.Error(err)) + zap.L().Debug("leech error", util.HexField("infoHash", infoHash[:]), zap.Error(err)) delete(ms.incomingInfoHashes, infoHash) } diff --git a/cmd/magneticod/dht/mainline/protocol.go b/cmd/magneticod/dht/mainline/protocol.go index 9ec2849..db67136 100644 --- a/cmd/magneticod/dht/mainline/protocol.go +++ b/cmd/magneticod/dht/mainline/protocol.go @@ -19,13 +19,13 @@ type Protocol struct { } type ProtocolEventHandlers struct { - OnPingQuery func(*Message, net.Addr) - OnFindNodeQuery func(*Message, net.Addr) - OnGetPeersQuery func(*Message, net.Addr) - OnAnnouncePeerQuery func(*Message, net.Addr) - OnGetPeersResponse func(*Message, net.Addr) - OnFindNodeResponse func(*Message, net.Addr) - OnPingORAnnouncePeerResponse func(*Message, net.Addr) + OnPingQuery func(*Message, *net.UDPAddr) + OnFindNodeQuery func(*Message, *net.UDPAddr) + OnGetPeersQuery func(*Message, *net.UDPAddr) + OnAnnouncePeerQuery func(*Message, *net.UDPAddr) + OnGetPeersResponse func(*Message, *net.UDPAddr) + OnFindNodeResponse func(*Message, *net.UDPAddr) + OnPingORAnnouncePeerResponse func(*Message, *net.UDPAddr) OnCongestion func() } @@ -58,13 +58,13 @@ func (p *Protocol) Terminate() { p.transport.Terminate() } -func (p *Protocol) onMessage(msg *Message, addr net.Addr) { +func (p *Protocol) onMessage(msg *Message, addr *net.UDPAddr) { switch msg.Y { case "q": switch msg.Q { case "ping": if !validatePingQueryMessage(msg) { - zap.L().Debug("An invalid ping query received!") + // zap.L().Debug("An invalid ping query received!") return } // Check whether there is a registered event handler for the ping queries, before @@ -75,7 +75,7 @@ func (p *Protocol) onMessage(msg *Message, addr net.Addr) { case "find_node": if !validateFindNodeQueryMessage(msg) { - zap.L().Debug("An invalid find_node query received!") + // zap.L().Debug("An invalid find_node query received!") return } if p.eventHandlers.OnFindNodeQuery != nil { @@ -84,7 +84,7 @@ func (p *Protocol) onMessage(msg *Message, addr net.Addr) { case "get_peers": if !validateGetPeersQueryMessage(msg) { - zap.L().Debug("An invalid get_peers query received!") + // zap.L().Debug("An invalid get_peers query received!") return } if p.eventHandlers.OnGetPeersQuery != nil { @@ -93,7 +93,7 @@ func (p *Protocol) onMessage(msg *Message, addr net.Addr) { case "announce_peer": if !validateAnnouncePeerQueryMessage(msg) { - zap.L().Debug("An invalid announce_peer query received!") + // zap.L().Debug("An invalid announce_peer query received!") return } if p.eventHandlers.OnAnnouncePeerQuery != nil { @@ -104,15 +104,14 @@ func (p *Protocol) onMessage(msg *Message, addr net.Addr) { // Although we are aware that such method exists, we ignore. default: - zap.L().Debug("A KRPC query of an unknown method received!", - zap.String("method", msg.Q)) + // zap.L().Debug("A KRPC query of an unknown method received!", zap.String("method", msg.Q)) return } case "r": // get_peers > find_node > ping / announce_peer if len(msg.R.Token) != 0 { // The message should be a get_peers response. if !validateGetPeersResponseMessage(msg) { - zap.L().Debug("An invalid get_peers response received!") + // zap.L().Debug("An invalid get_peers response received!") return } if p.eventHandlers.OnGetPeersResponse != nil { @@ -120,7 +119,7 @@ func (p *Protocol) onMessage(msg *Message, addr net.Addr) { } } else if len(msg.R.Nodes) != 0 { // The message should be a find_node response. if !validateFindNodeResponseMessage(msg) { - zap.L().Debug("An invalid find_node response received!") + // zap.L().Debug("An invalid find_node response received!") return } if p.eventHandlers.OnFindNodeResponse != nil { @@ -128,7 +127,7 @@ func (p *Protocol) onMessage(msg *Message, addr net.Addr) { } } else { // The message should be a ping or an announce_peer response. if !validatePingORannouncePeerResponseMessage(msg) { - zap.L().Debug("An invalid ping OR announce_peer response received!") + // zap.L().Debug("An invalid ping OR announce_peer response received!") return } if p.eventHandlers.OnPingORAnnouncePeerResponse != nil { @@ -147,7 +146,7 @@ func (p *Protocol) onMessage(msg *Message, addr net.Addr) { } } -func (p *Protocol) SendMessage(msg *Message, addr net.Addr) { +func (p *Protocol) SendMessage(msg *Message, addr *net.UDPAddr) { p.transport.WriteMessages(msg, addr) } diff --git a/cmd/magneticod/dht/mainline/service.go b/cmd/magneticod/dht/mainline/service.go index e0e754e..f1a8573 100644 --- a/cmd/magneticod/dht/mainline/service.go +++ b/cmd/magneticod/dht/mainline/service.go @@ -1,21 +1,17 @@ package mainline import ( - "math/rand" + "crypto/rand" "net" "sync" "time" - "github.com/anacrolix/torrent" - "github.com/anacrolix/torrent/metainfo" "go.uber.org/zap" ) type TrawlingResult struct { - InfoHash metainfo.Hash - Peer torrent.Peer - PeerIP net.IP - PeerPort int + InfoHash [20]byte + PeerAddr *net.TCPAddr } type TrawlingService struct { @@ -29,7 +25,7 @@ type TrawlingService struct { // understandably) slices cannot be used as keys (since they are not hashable), and using arrays // (or even the conversion between each other) is a pain; hence map[string]net.UDPAddr // ^~~~~~ - routingTable map[string]net.Addr + routingTable map[string]*net.UDPAddr routingTableMutex *sync.Mutex maxNeighbors uint } @@ -50,7 +46,7 @@ func NewTrawlingService(laddr string, initialMaxNeighbors uint, eventHandlers Tr }, ) service.trueNodeID = make([]byte, 20) - service.routingTable = make(map[string]net.Addr) + service.routingTable = make(map[string]*net.UDPAddr) service.routingTableMutex = new(sync.Mutex) service.eventHandlers = eventHandlers service.maxNeighbors = initialMaxNeighbors @@ -80,8 +76,11 @@ func (s *TrawlingService) Terminate() { } func (s *TrawlingService) trawl() { - for range time.Tick(3 * time.Second) { - s.maxNeighbors = uint(float32(s.maxNeighbors) * 1.01) + for range time.Tick(1 * time.Second) { + // TODO + // For some reason, we can't still detect congestion and this keeps increasing... + // Disable for now. + // s.maxNeighbors = uint(float32(s.maxNeighbors) * 1.001) s.routingTableMutex.Lock() if len(s.routingTable) == 0 { @@ -90,7 +89,7 @@ func (s *TrawlingService) trawl() { zap.L().Warn("Latest status:", zap.Int("n", len(s.routingTable)), zap.Uint("maxNeighbors", s.maxNeighbors)) s.findNeighbors() - s.routingTable = make(map[string]net.Addr) + s.routingTable = make(map[string]*net.UDPAddr) } s.routingTableMutex.Unlock() } @@ -114,6 +113,7 @@ func (s *TrawlingService) bootstrap() { if err != nil { zap.L().Error("Could NOT resolve (UDP) address of the bootstrapping node!", zap.String("node", node)) + continue; } s.protocol.SendMessage(NewFindNodeQuery(s.trueNodeID, target), addr) @@ -135,7 +135,7 @@ func (s *TrawlingService) findNeighbors() { } } -func (s *TrawlingService) onGetPeersQuery(query *Message, addr net.Addr) { +func (s *TrawlingService) onGetPeersQuery(query *Message, addr *net.UDPAddr) { s.protocol.SendMessage( NewGetPeersResponseWithNodes( query.T, @@ -147,35 +147,35 @@ func (s *TrawlingService) onGetPeersQuery(query *Message, addr net.Addr) { ) } -func (s *TrawlingService) onAnnouncePeerQuery(query *Message, addr net.Addr) { +func (s *TrawlingService) onAnnouncePeerQuery(query *Message, addr *net.UDPAddr) { + /* BEP 5 + * + * There is an optional argument called implied_port which value is either 0 or 1. If it is + * present and non-zero, the port argument should be ignored and the source port of the UDP + * packet should be used as the peer's port instead. This is useful for peers behind a NAT that + * may not know their external port, and supporting uTP, they accept incoming connections on the + * same port as the DHT port. + */ var peerPort int if query.A.ImpliedPort != 0 { - peerPort = addr.(*net.UDPAddr).Port + // TODO: Peer uses uTP, ignore for now + // return + peerPort = addr.Port } else { peerPort = query.A.Port + // return } // TODO: It looks ugly, am I doing it right? --Bora // (Converting slices to arrays in Go shouldn't have been such a pain...) - var peerId, infoHash [20]byte - copy(peerId[:], []byte("\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00")) + var infoHash [20]byte copy(infoHash[:], query.A.InfoHash) s.eventHandlers.OnResult(TrawlingResult{ InfoHash: infoHash, - Peer: torrent.Peer{ - // As we don't know the ID of the remote peer, set it empty. - Id: peerId, - IP: addr.(*net.UDPAddr).IP, + PeerAddr: &net.TCPAddr{ + IP: addr.IP, Port: peerPort, - // "Ha" indicates that we discovered the peer through DHT Announce Peer (query); not - // sure how anacrolix/torrent utilizes that information though. - Source: "Ha", - // We don't know whether the remote peer supports encryption either, but let's pretend - // that it doesn't. - SupportsEncryption: false, }, - PeerIP: addr.(*net.UDPAddr).IP, - PeerPort: peerPort, }) s.protocol.SendMessage( @@ -187,19 +187,15 @@ func (s *TrawlingService) onAnnouncePeerQuery(query *Message, addr net.Addr) { ) } -func (s *TrawlingService) onFindNodeResponse(response *Message, addr net.Addr) { +func (s *TrawlingService) onFindNodeResponse(response *Message, addr *net.UDPAddr) { s.routingTableMutex.Lock() defer s.routingTableMutex.Unlock() - zap.L().Debug("find node response!!", zap.Uint("maxNeighbors", s.maxNeighbors), - zap.Int("response.R.Nodes length", len(response.R.Nodes))) - for _, node := range response.R.Nodes { if uint(len(s.routingTable)) >= s.maxNeighbors { break } if node.Addr.Port == 0 { // Ignore nodes who "use" port 0. - zap.L().Debug("ignoring 0 port!!!") continue } @@ -219,6 +215,4 @@ func (s *TrawlingService) onCongestion() { } s.maxNeighbors = uint(float32(s.maxNeighbors) * 0.9) - zap.L().Debug("Max. number of neighbours updated!", - zap.Uint("s.maxNeighbors", s.maxNeighbors)) } diff --git a/cmd/magneticod/dht/mainline/transport.go b/cmd/magneticod/dht/mainline/transport.go index 60f37e7..2cdbbcf 100644 --- a/cmd/magneticod/dht/mainline/transport.go +++ b/cmd/magneticod/dht/mainline/transport.go @@ -18,12 +18,12 @@ type Transport struct { // OnMessage is the function that will be called when Transport receives a packet that is // successfully unmarshalled as a syntactically correct Message (but -of course- the checking // the semantic correctness of the Message is left to Protocol). - onMessage func(*Message, net.Addr) + onMessage func(*Message, *net.UDPAddr) // OnCongestion onCongestion func() } -func NewTransport(laddr string, onMessage func(*Message, net.Addr), onCongestion func()) *Transport { +func NewTransport(laddr string, onMessage func(*Message, *net.UDPAddr), onCongestion func()) *Transport { t := new(Transport) /* The field size sets a theoretical limit of 65,535 bytes (8 byte header + 65,527 bytes of * data) for a UDP datagram. However the actual limit for the data length, which is imposed by @@ -95,39 +95,44 @@ func (t *Transport) readMessages() { t.onCongestion() } else if err != nil { // TODO: isn't there a more reliable way to detect if UDPConn is closed? - zap.L().Debug("Could NOT read an UDP packet!", zap.Error(err)) + zap.L().Warn("Could NOT read an UDP packet!", zap.Error(err)) } if n == 0 { /* Datagram sockets in various domains (e.g., the UNIX and Internet domains) permit * zero-length datagrams. When such a datagram is received, the return value (n) is 0. */ - zap.L().Debug("zero-length received!!") continue } from := sockaddr.SockaddrToUDPAddr(fromSA) + if from == nil { + zap.L().Panic("dht mainline transport SockaddrToUDPAddr: nil") + } var msg Message err = bencode.Unmarshal(t.buffer[:n], &msg) if err != nil { - zap.L().Debug("Could NOT unmarshal packet data!", zap.Error(err)) + // couldn't unmarshal packet data + continue } - zap.L().Debug("message read! (first 20...)", zap.ByteString("msg", t.buffer[:20])) t.onMessage(&msg, from) } } -func (t *Transport) WriteMessages(msg *Message, addr net.Addr) { +func (t *Transport) WriteMessages(msg *Message, addr *net.UDPAddr) { data, err := bencode.Marshal(msg) if err != nil { zap.L().Panic("Could NOT marshal an outgoing message! (Programmer error.)") } addrSA := sockaddr.NetAddrToSockaddr(addr) - - zap.L().Debug("sent message!!!") + if addrSA == nil { + zap.L().Debug("Wrong net address for the remote peer!", + zap.String("addr", addr.String())) + return; + } err = unix.Sendto(t.fd, data, 0, addrSA) if err == unix.EPERM || err == unix.ENOBUFS { @@ -148,6 +153,6 @@ func (t *Transport) WriteMessages(msg *Message, addr net.Addr) { zap.L().Warn("WRITE CONGESTION!", zap.Error(err)) t.onCongestion() } else if err != nil { - zap.L().Debug("Could NOT write an UDP packet!", zap.Error(err)) + zap.L().Warn("Could NOT write an UDP packet!", zap.Error(err)) } } diff --git a/cmd/magneticod/main.go b/cmd/magneticod/main.go index 9d92ff1..eaf544e 100644 --- a/cmd/magneticod/main.go +++ b/cmd/magneticod/main.go @@ -1,7 +1,6 @@ package main import ( - "encoding/hex" "fmt" "net" "os" @@ -9,6 +8,7 @@ import ( "runtime/pprof" "time" + "github.com/boramalper/magnetico/pkg/util" "github.com/jessevdk/go-flags" "go.uber.org/zap" "go.uber.org/zap/zapcore" @@ -74,8 +74,6 @@ func main() { zap.ReplaceGlobals(logger) - zap.L().Debug("debug message!") - switch opFlags.Profile { case "cpu": file, err := os.OpenFile("magneticod_cpu.prof", os.O_CREATE | os.O_WRONLY, 0755) @@ -109,7 +107,7 @@ func main() { for stopped := false; !stopped; { select { case result := <-trawlingManager.Output(): - zap.L().Debug("Trawled!", zap.String("infoHash", result.InfoHash.String())) + zap.L().Debug("Trawled!", util.HexField("infoHash", result.InfoHash[:])) exists, err := database.DoesTorrentExist(result.InfoHash[:]) if err != nil { zap.L().Fatal("Could not check whether torrent exists!", zap.Error(err)) @@ -117,12 +115,12 @@ func main() { metadataSink.Sink(result) } - case metadata := <-metadataSink.Drain(): - if err := database.AddNewTorrent(metadata.InfoHash, metadata.Name, metadata.Files); err != nil { + case md := <-metadataSink.Drain(): + if err := database.AddNewTorrent(md.InfoHash, md.Name, md.Files); err != nil { logger.Sugar().Fatalf("Could not add new torrent %x to the database: %s", - metadata.InfoHash, err.Error()) + md.InfoHash, err.Error()) } - zap.L().Info("Fetched!", zap.String("name", metadata.Name), zap.String("infoHash", hex.EncodeToString(metadata.InfoHash))) + zap.L().Info("Fetched!", zap.String("name", md.Name), util.HexField("infoHash", md.InfoHash)) case <-interruptChan: trawlingManager.Terminate() diff --git a/cmd/magneticow/main.go b/cmd/magneticow/main.go index 9012540..b9adb63 100644 --- a/cmd/magneticow/main.go +++ b/cmd/magneticow/main.go @@ -162,7 +162,7 @@ func main() { decoder.IgnoreUnknownKeys(false) decoder.ZeroEmpty(true) - zap.L().Info("magneticow is ready to serve!") + zap.S().Infof("magneticow is ready to serve on %s!", opts.Addr) err = http.ListenAndServe(opts.Addr, router) if err != nil { zap.L().Error("ListenAndServe error", zap.Error(err)) @@ -203,10 +203,13 @@ func parseFlags() error { opts.Addr = cmdFlags.Addr if cmdFlags.Database == "" { - opts.Database = "sqlite3://" + path.Join( - appdirs.UserDataDir("magneticod", "", "", false), - "database.sqlite3", - ) + opts.Database = + "sqlite3://" + + appdirs.UserDataDir("magneticod", "", "", false) + + "/database.sqlite3" + + "?_journal_mode=WAL" // https://github.com/mattn/go-sqlite3#connection-string + } else { + opts.Database = cmdFlags.Database } if cmdFlags.Cred == "" && !cmdFlags.NoAuth { diff --git a/pkg/persistence/interface.go b/pkg/persistence/interface.go index 1e8bd91..9ad824e 100644 --- a/pkg/persistence/interface.go +++ b/pkg/persistence/interface.go @@ -109,13 +109,13 @@ func MakeDatabase(rawURL string, logger *zap.Logger) (Database, error) { return makeSqlite3Database(url_) case "postgresql": - return nil, fmt.Errorf("postgresql is not yet supported!") + return nil, fmt.Errorf("postgresql is not yet supported") case "mysql": - return nil, fmt.Errorf("mysql is not yet supported!") + return nil, fmt.Errorf("mysql is not yet supported") default: - return nil, fmt.Errorf("unknown URI scheme (database engine)!") + return nil, fmt.Errorf("unknown URI scheme: `%s`", url_.Scheme) } } diff --git a/pkg/persistence/iso8601_test.go b/pkg/persistence/iso8601_test.go index 4120937..ac3b8fa 100644 --- a/pkg/persistence/iso8601_test.go +++ b/pkg/persistence/iso8601_test.go @@ -32,7 +32,7 @@ func TestParseISO8601(t *testing.T) { for i, date := range validDates { _, gr, err := ParseISO8601(date.date) if err != nil { - t.Errorf("Error while parsing valid date #%d", i+1, err) + t.Errorf("Error while parsing valid date #%d: %s", i+1, err.Error()) continue } diff --git a/pkg/persistence/sqlite3.go b/pkg/persistence/sqlite3.go index 131d89a..9ed410d 100644 --- a/pkg/persistence/sqlite3.go +++ b/pkg/persistence/sqlite3.go @@ -30,7 +30,8 @@ func makeSqlite3Database(url_ *url.URL) (Database, error) { } var err error - db.conn, err = sql.Open("sqlite3", url_.Path) + url_.Scheme = "" + db.conn, err = sql.Open("sqlite3", url_.String()) if err != nil { return nil, fmt.Errorf("sql.Open: %s", err.Error()) } diff --git a/pkg/util/util.go b/pkg/util/util.go new file mode 100644 index 0000000..eff2728 --- /dev/null +++ b/pkg/util/util.go @@ -0,0 +1,11 @@ +package util + +import ( + "encoding/hex" + + "go.uber.org/zap/zapcore" +) + +func HexField(key string, val []byte) zapcore.Field { + return zapcore.Field{Key: key, Type: zapcore.StringType, String: hex.EncodeToString(val)} +} \ No newline at end of file