Browse Source

exchange signed routing records in identify (#747)

*  Exchange signed routing records in identify


Co-authored-by: Aarsh Shah <aarshkshah1992@gmail.com>
pull/905/head
Yusef Napora 5 years ago
committed by GitHub
parent
commit
077a81814f
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
  1. 94
      p2p/host/basic/basic_host.go
  2. 131
      p2p/host/basic/basic_host_test.go
  3. 200
      p2p/protocol/identify/id.go
  4. 2
      p2p/protocol/identify/id_delta.go
  5. 15
      p2p/protocol/identify/id_push.go
  6. 147
      p2p/protocol/identify/id_test.go
  7. 101
      p2p/protocol/identify/pb/identify.pb.go
  8. 7
      p2p/protocol/identify/pb/identify.proto

94
p2p/host/basic/basic_host.go

@ -2,27 +2,30 @@ package basichost
import (
"context"
"errors"
"io"
"net"
"sync"
"time"
"github.com/libp2p/go-libp2p/p2p/protocol/identify"
"github.com/libp2p/go-libp2p/p2p/protocol/ping"
"github.com/libp2p/go-libp2p-core/connmgr"
"github.com/libp2p/go-libp2p-core/crypto"
"github.com/libp2p/go-libp2p-core/event"
"github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p-core/peerstore"
"github.com/libp2p/go-libp2p-core/protocol"
"github.com/libp2p/go-libp2p-core/record"
"github.com/libp2p/go-eventbus"
inat "github.com/libp2p/go-libp2p-nat"
"github.com/libp2p/go-libp2p/p2p/protocol/identify"
"github.com/libp2p/go-libp2p/p2p/protocol/ping"
logging "github.com/ipfs/go-log"
"github.com/multiformats/go-multiaddr"
ma "github.com/multiformats/go-multiaddr"
madns "github.com/multiformats/go-multiaddr-dns"
manet "github.com/multiformats/go-multiaddr-net"
@ -93,6 +96,9 @@ type BasicHost struct {
}
addrChangeChan chan struct{}
signKey crypto.PrivKey
caBook peerstore.CertifiedAddrBook
}
var _ host.Host = (*BasicHost)(nil)
@ -150,10 +156,21 @@ func NewHost(ctx context.Context, net network.Network, opts *HostOpts) (*BasicHo
if h.emitters.evtLocalProtocolsUpdated, err = h.eventbus.Emitter(&event.EvtLocalProtocolsUpdated{}); err != nil {
return nil, err
}
if h.emitters.evtLocalAddrsUpdated, err = h.eventbus.Emitter(&event.EvtLocalAddressesUpdated{}); err != nil {
if h.emitters.evtLocalAddrsUpdated, err = h.eventbus.Emitter(&event.EvtLocalAddressesUpdated{}, eventbus.Stateful); err != nil {
return nil, err
}
cab, ok := peerstore.GetCertifiedAddrBook(net.Peerstore())
if !ok {
return nil, errors.New("peerstore should also be a certified address book")
}
h.caBook = cab
h.signKey = h.Peerstore().PrivKey(h.ID())
if h.signKey == nil {
return nil, errors.New("unable to access host key")
}
if opts.MultistreamMuxer != nil {
h.mux = opts.MultistreamMuxer
}
@ -221,12 +238,12 @@ func New(net network.Network, opts ...interface{}) *BasicHost {
}
h, err := NewHost(context.Background(), net, hostopts)
h.Start()
if err != nil {
// this cannot happen with legacy options
// plus we want to keep the (deprecated) legacy interface unchanged
panic(err)
}
h.Start()
return h
}
@ -327,39 +344,68 @@ func makeUpdatedAddrEvent(prev, current []ma.Multiaddr) *event.EvtLocalAddresses
return &evt
}
func (h *BasicHost) makeSignedPeerRecord(evt *event.EvtLocalAddressesUpdated) (*record.Envelope, error) {
current := make([]multiaddr.Multiaddr, 0, len(evt.Current))
for _, a := range evt.Current {
current = append(current, a.Address)
}
rec := peer.PeerRecordFromAddrInfo(peer.AddrInfo{h.ID(), current})
return record.Seal(rec, h.signKey)
}
func (h *BasicHost) background() {
defer h.refCount.Done()
var lastAddrs []ma.Multiaddr
emitAddrChange := func(currentAddrs []ma.Multiaddr, lastAddrs []ma.Multiaddr) {
// nothing to do if both are nil..defensive check
if currentAddrs == nil && lastAddrs == nil {
return
}
changeEvt := makeUpdatedAddrEvent(lastAddrs, currentAddrs)
if changeEvt == nil {
return
}
// add signed peer record to the event
sr, err := h.makeSignedPeerRecord(changeEvt)
if err != nil {
log.Errorf("error creating a signed peer record from the set of current addresses, err=%s", err)
return
}
changeEvt.SignedPeerRecord = *sr
// persist the signed record to the peerstore
if _, err := h.caBook.ConsumePeerRecord(sr, peerstore.PermanentAddrTTL); err != nil {
log.Errorf("failed to persist signed peer record in peer store, err=%s", err)
return
}
// emit addr change event on the bus
if err := h.emitters.evtLocalAddrsUpdated.Emit(*changeEvt); err != nil {
log.Warnf("error emitting event for updated addrs: %s", err)
}
}
// periodically schedules an IdentifyPush to update our peers for changes
// in our address set (if needed)
ticker := time.NewTicker(10 * time.Second)
ticker := time.NewTicker(5 * time.Second)
defer ticker.Stop()
// initialize lastAddrs
lastAddrs := h.Addrs()
for {
curr := h.Addrs()
emitAddrChange(curr, lastAddrs)
lastAddrs = curr
select {
case <-ticker.C:
case <-h.addrChangeChan:
case <-h.ctx.Done():
return
}
// emit an EvtLocalAddressesUpdatedEvent & a Push Identify if our listen addresses have changed.
addrs := h.Addrs()
changeEvt := makeUpdatedAddrEvent(lastAddrs, addrs)
if changeEvt != nil {
lastAddrs = addrs
}
if changeEvt != nil {
err := h.emitters.evtLocalAddrsUpdated.Emit(*changeEvt)
if err != nil {
log.Warnf("error emitting event for updated addrs: %s", err)
}
h.ids.Push()
}
}
}

131
p2p/host/basic/basic_host_test.go

@ -5,21 +5,24 @@ import (
"context"
"io"
"reflect"
"sort"
"sync"
"testing"
"time"
"github.com/libp2p/go-eventbus"
"github.com/libp2p/go-libp2p-core/event"
"github.com/libp2p/go-libp2p-core/helpers"
"github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p-core/peerstore"
"github.com/libp2p/go-libp2p-core/protocol"
"github.com/libp2p/go-libp2p-core/record"
"github.com/libp2p/go-libp2p-core/test"
"github.com/libp2p/go-eventbus"
swarmt "github.com/libp2p/go-libp2p-swarm/testing"
"github.com/libp2p/go-libp2p/p2p/protocol/identify"
ma "github.com/multiformats/go-multiaddr"
madns "github.com/multiformats/go-multiaddr-dns"
"github.com/stretchr/testify/require"
@ -102,16 +105,35 @@ func TestProtocolHandlerEvents(t *testing.T) {
}
defer sub.Close()
assert := func(added, removed []protocol.ID) {
var next event.EvtLocalProtocolsUpdated
select {
case evt := <-sub.Out():
next = evt.(event.EvtLocalProtocolsUpdated)
break
case <-time.After(5 * time.Second):
t.Fatal("event not received in 5 seconds")
// the identify service adds new protocol handlers shortly after the host
// starts. this helps us filter those events out, since they're unrelated
// to the test.
isIdentify := func(evt event.EvtLocalProtocolsUpdated) bool {
for _, p := range evt.Added {
if p == identify.ID || p == identify.IDPush {
return true
}
}
return false
}
nextEvent := func() event.EvtLocalProtocolsUpdated {
for {
select {
case evt := <-sub.Out():
next := evt.(event.EvtLocalProtocolsUpdated)
if isIdentify(next) {
continue
}
return next
case <-time.After(5 * time.Second):
t.Fatal("event not received in 5 seconds")
}
}
}
assert := func(added, removed []protocol.ID) {
next := nextEvent()
if !reflect.DeepEqual(added, next.Added) {
t.Errorf("expected added: %v; received: %v", added, next.Added)
}
@ -460,11 +482,10 @@ func TestAddrResolution(t *testing.T) {
_ = h.Connect(tctx, *pi)
addrs := h.Peerstore().Addrs(pi.ID)
sort.Sort(sortedMultiaddrs(addrs))
if len(addrs) != 2 || !addrs[0].Equal(addr1) || !addrs[1].Equal(addr2) {
t.Fatalf("expected [%s %s], got %+v", addr1, addr2, addrs)
}
require.Len(t, addrs, 2)
require.Contains(t, addrs, addr1)
require.Contains(t, addrs, addr2)
}
func TestAddrResolutionRecursive(t *testing.T) {
@ -515,11 +536,9 @@ func TestAddrResolutionRecursive(t *testing.T) {
_ = h.Connect(tctx, *pi1)
addrs1 := h.Peerstore().Addrs(pi1.ID)
sort.Sort(sortedMultiaddrs(addrs1))
if len(addrs1) != 2 || !addrs1[0].Equal(addr1) || !addrs1[1].Equal(addr2) {
t.Fatalf("expected [%s %s], got %+v", addr1, addr2, addrs1)
}
require.Len(t, addrs1, 2)
require.Contains(t, addrs1, addr1)
require.Contains(t, addrs1, addr2)
pi2, err := peer.AddrInfoFromP2pAddr(p2paddr2)
if err != nil {
@ -529,11 +548,49 @@ func TestAddrResolutionRecursive(t *testing.T) {
_ = h.Connect(tctx, *pi2)
addrs2 := h.Peerstore().Addrs(pi2.ID)
sort.Sort(sortedMultiaddrs(addrs2))
require.Len(t, addrs2, 1)
require.Contains(t, addrs2, addr1)
}
func TestAddrChangeImmediatelyIfAddressNonEmpty(t *testing.T) {
ctx := context.Background()
taddrs := []ma.Multiaddr{ma.StringCast("/ip4/1.2.3.4/tcp/1234")}
if len(addrs2) != 1 || !addrs2[0].Equal(addr1) {
t.Fatalf("expected [%s], got %+v", addr1, addrs2)
h := New(swarmt.GenSwarm(t, ctx), AddrsFactory(func(addrs []ma.Multiaddr) []ma.Multiaddr {
return taddrs
}))
defer h.Close()
sub, err := h.EventBus().Subscribe(&event.EvtLocalAddressesUpdated{})
if err != nil {
t.Error(err)
}
defer sub.Close()
// wait for the host background thread to start
time.Sleep(1 * time.Second)
expected := event.EvtLocalAddressesUpdated{
Diffs: true,
Current: []event.UpdatedAddress{
{Action: event.Added, Address: ma.StringCast("/ip4/1.2.3.4/tcp/1234")},
},
Removed: []event.UpdatedAddress{}}
// assert we get expected event
evt := waitForAddrChangeEvent(ctx, sub, t)
if !updatedAddrEventsEqual(expected, evt) {
t.Errorf("change events not equal: \n\texpected: %v \n\tactual: %v", expected, evt)
}
// assert it's on the signed record
rc := peerRecordFromEnvelope(t, evt.SignedPeerRecord)
require.Equal(t, taddrs, rc.Addrs)
// assert it's in the peerstore
ev := h.Peerstore().(peerstore.CertifiedAddrBook).GetPeerRecord(h.ID())
require.NotNil(t, ev)
rc = peerRecordFromEnvelope(t, *ev)
require.Equal(t, taddrs, rc.Addrs)
}
func TestHostAddrChangeDetection(t *testing.T) {
@ -611,9 +668,18 @@ func TestHostAddrChangeDetection(t *testing.T) {
h.SignalAddressChange()
evt := waitForAddrChangeEvent(ctx, sub, t)
if !updatedAddrEventsEqual(expectedEvents[i-1], evt) {
t.Errorf("change events not equal: \n\texpected: %v \n\tactual: %v", expectedEvents[i], evt)
t.Errorf("change events not equal: \n\texpected: %v \n\tactual: %v", expectedEvents[i-1], evt)
}
// assert it's on the signed record
rc := peerRecordFromEnvelope(t, evt.SignedPeerRecord)
require.Equal(t, addrSets[i], rc.Addrs)
// assert it's in the peerstore
ev := h.Peerstore().(peerstore.CertifiedAddrBook).GetPeerRecord(h.ID())
require.NotNil(t, ev)
rc = peerRecordFromEnvelope(t, *ev)
require.Equal(t, addrSets[i], rc.Addrs)
}
}
@ -672,10 +738,17 @@ func updatedAddrEventsEqual(a, b event.EvtLocalAddressesUpdated) bool {
updatedAddrsEqual(a.Removed, b.Removed)
}
type sortedMultiaddrs []ma.Multiaddr
func (sma sortedMultiaddrs) Len() int { return len(sma) }
func (sma sortedMultiaddrs) Swap(i, j int) { sma[i], sma[j] = sma[j], sma[i] }
func (sma sortedMultiaddrs) Less(i, j int) bool {
return bytes.Compare(sma[i].Bytes(), sma[j].Bytes()) == 1
func peerRecordFromEnvelope(t *testing.T, ev record.Envelope) *peer.PeerRecord {
t.Helper()
rec, err := ev.Record()
if err != nil {
t.Fatalf("error getting PeerRecord from event: %v", err)
return nil
}
peerRec, ok := rec.(*peer.PeerRecord)
if !ok {
t.Fatalf("wrong type for peer record")
return nil
}
return peerRec
}

200
p2p/protocol/identify/id.go

@ -15,13 +15,13 @@ import (
"github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p-core/peerstore"
"github.com/libp2p/go-libp2p-core/protocol"
"github.com/libp2p/go-libp2p-core/record"
"github.com/libp2p/go-eventbus"
pb "github.com/libp2p/go-libp2p/p2p/protocol/identify/pb"
ggio "github.com/gogo/protobuf/io"
logging "github.com/ipfs/go-log"
ma "github.com/multiformats/go-multiaddr"
manet "github.com/multiformats/go-multiaddr-net"
msmux "github.com/multiformats/go-multistream"
@ -30,7 +30,11 @@ import (
var log = logging.Logger("net/identify")
// ID is the protocol.ID of the Identify Service.
const ID = "/ipfs/id/1.0.0"
const ID = "/p2p/id/1.1.0"
// LegacyID is the protocol.ID of version 1.0.0 of the identify
// service, which does not support signed peer records.
const LegacyID = "/ipfs/id/1.0.0"
// LibP2PVersion holds the current protocol version for a client running this code
// TODO(jbenet): fix the versioning mess.
@ -84,11 +88,13 @@ type IDService struct {
addrMu sync.Mutex
peerrec *record.Envelope
peerrecMu sync.RWMutex
// our own observed addresses.
observedAddrs *ObservedAddrManager
subscription event.Subscription
emitters struct {
emitters struct {
evtPeerProtocolsUpdated event.Emitter
evtPeerIdentificationCompleted event.Emitter
evtPeerIdentificationFailed event.Emitter
@ -121,61 +127,90 @@ func NewIDService(h host.Host, opts ...Option) *IDService {
// handle local protocol handler updates, and push deltas to peers.
var err error
s.subscription, err = h.EventBus().Subscribe(&event.EvtLocalProtocolsUpdated{}, eventbus.BufSize(128))
if err != nil {
log.Warningf("identify service not subscribed to local protocol handlers updates; err: %s", err)
} else {
s.refCount.Add(1)
go s.handleEvents()
}
s.refCount.Add(1)
go s.handleEvents()
s.emitters.evtPeerProtocolsUpdated, err = h.EventBus().Emitter(&event.EvtPeerProtocolsUpdated{})
if err != nil {
log.Warningf("identify service not emitting peer protocol updates; err: %s", err)
log.Warnf("identify service not emitting peer protocol updates; err: %s", err)
}
s.emitters.evtPeerIdentificationCompleted, err = h.EventBus().Emitter(&event.EvtPeerIdentificationCompleted{})
if err != nil {
log.Warningf("identify service not emitting identification completed events; err: %s", err)
log.Warnf("identify service not emitting identification completed events; err: %s", err)
}
s.emitters.evtPeerIdentificationFailed, err = h.EventBus().Emitter(&event.EvtPeerIdentificationFailed{})
if err != nil {
log.Warningf("identify service not emitting identification failed events; err: %s", err)
log.Warnf("identify service not emitting identification failed events; err: %s", err)
}
// register protocols that do not depend on peer records.
h.SetStreamHandler(IDDelta, s.deltaHandler)
h.SetStreamHandler(LegacyID, s.requestHandler)
h.SetStreamHandler(LegacyIDPush, s.pushHandler)
// register protocols that depend on peer records.
h.SetStreamHandler(ID, s.requestHandler)
h.SetStreamHandler(IDPush, s.pushHandler)
h.SetStreamHandler(IDDelta, s.deltaHandler)
h.Network().Notify((*netNotifiee)(s))
return s
}
// Close shuts down the IDService
func (ids *IDService) Close() error {
ids.closeSync.Do(func() {
ids.ctxCancel()
ids.refCount.Wait()
})
return nil
}
func (ids *IDService) handleEvents() {
sub := ids.subscription
defer ids.refCount.Done()
sub, err := ids.Host.EventBus().Subscribe([]interface{}{&event.EvtLocalProtocolsUpdated{},
&event.EvtLocalAddressesUpdated{}}, eventbus.BufSize(256))
if err != nil {
log.Errorf("failed to subscribe to events on the bus, err=%s", err)
return
}
defer sub.Close()
for {
select {
case evt, more := <-sub.Out():
case e, more := <-sub.Out():
if !more {
return
}
ids.fireProtocolDelta(evt.(event.EvtLocalProtocolsUpdated))
switch evt := e.(type) {
case event.EvtLocalAddressesUpdated:
ids.handleLocalAddrsUpdated(evt)
case event.EvtLocalProtocolsUpdated:
ids.handleProtosChanged(evt)
}
case <-ids.ctx.Done():
return
}
}
}
// Close shuts down the IDService
func (ids *IDService) Close() error {
ids.closeSync.Do(func() {
ids.ctxCancel()
ids.refCount.Wait()
})
return nil
}
func (ids *IDService) handleProtosChanged(evt event.EvtLocalProtocolsUpdated) {
ids.fireProtocolDelta(evt)
}
func (ids *IDService) handleLocalAddrsUpdated(evt event.EvtLocalAddressesUpdated) {
ids.peerrecMu.Lock()
rec := evt.SignedPeerRecord
ids.peerrec = &rec
ids.peerrecMu.Unlock()
log.Debug("triggering push based on updated local PeerRecord")
ids.Push()
}
// OwnObservedAddrs returns the addresses peers have reported we've dialed from
func (ids *IDService) OwnObservedAddrs() []ma.Multiaddr {
return ids.observedAddrs.Addrs()
@ -259,25 +294,29 @@ func (ids *IDService) identifyConn(c network.Conn, signal chan struct{}) {
return
}
s.SetProtocol(ID)
protocolIDs := []string{ID, LegacyID}
// ok give the response to our handler.
if err = msmux.SelectProtoOrFail(ID, s); err != nil {
var selectedProto string
if selectedProto, err = msmux.SelectOneOf(protocolIDs, s); err != nil {
log.Event(context.TODO(), "IdentifyOpenFailed", c.RemotePeer(), logging.Metadata{"error": err})
s.Reset()
return
}
s.SetProtocol(protocol.ID(selectedProto))
ids.responseHandler(s)
}
func protoSupportsPeerRecords(proto protocol.ID) bool {
return proto == ID || proto == IDPush
}
func (ids *IDService) requestHandler(s network.Stream) {
defer helpers.FullClose(s)
c := s.Conn()
w := ggio.NewDelimitedWriter(s)
mes := pb.Identify{}
ids.populateMessage(&mes, s.Conn())
ids.populateMessage(&mes, s.Conn(), protoSupportsPeerRecords(s.Protocol()))
w.WriteMsg(&mes)
log.Debugf("%s sent message to %s %s", ID, c.RemotePeer(), c.RemoteMultiaddr())
@ -297,14 +336,15 @@ func (ids *IDService) responseHandler(s network.Stream) {
defer func() { go helpers.FullClose(s) }()
log.Debugf("%s received message from %s %s", s.Protocol(), c.RemotePeer(), c.RemoteMultiaddr())
ids.consumeMessage(&mes, c)
ids.consumeMessage(&mes, c, protoSupportsPeerRecords(s.Protocol()))
}
func (ids *IDService) broadcast(proto protocol.ID, payloadWriter func(s network.Stream)) {
func (ids *IDService) broadcast(protos []protocol.ID, payloadWriter func(s network.Stream)) {
var wg sync.WaitGroup
protoStrs := protocol.ConvertToStrings(protos)
ctx, cancel := context.WithTimeout(ids.ctx, 30*time.Second)
ctx = network.WithNoDial(ctx, string(proto))
ctx = network.WithNoDial(ctx, protoStrs[0])
pstore := ids.Host.Peerstore()
for _, p := range ids.Host.Network().Peers() {
@ -324,13 +364,13 @@ func (ids *IDService) broadcast(proto protocol.ID, payloadWriter func(s network.
}
// avoid the unnecessary stream if the peer does not support the protocol.
if sup, err := pstore.SupportsProtocols(p, string(proto)); err != nil && len(sup) == 0 {
if sup, err := pstore.SupportsProtocols(p, protoStrs...); err != nil && len(sup) == 0 {
// the peer does not support the required protocol.
return
}
// if the peerstore query errors, we go ahead anyway.
s, err := ids.Host.NewStream(ctx, p, proto)
s, err := ids.Host.NewStream(ctx, p, protos...)
if err != nil {
log.Debugf("error opening push stream to %s: %s", p, err.Error())
return
@ -358,7 +398,7 @@ func (ids *IDService) broadcast(proto protocol.ID, payloadWriter func(s network.
}()
}
func (ids *IDService) populateMessage(mes *pb.Identify, c network.Conn) {
func (ids *IDService) populateMessage(mes *pb.Identify, c network.Conn, usePeerRecords bool) {
// set protocols this node is currently handling
protos := ids.Host.Mux().Protocols()
mes.Protocols = make([]string, len(protos))
@ -370,18 +410,35 @@ func (ids *IDService) populateMessage(mes *pb.Identify, c network.Conn) {
// "public" address, at least in relation to us.
mes.ObservedAddr = c.RemoteMultiaddr().Bytes()
// set listen addrs, get our latest addrs from Host.
laddrs := ids.Host.Addrs()
// Note: LocalMultiaddr is sometimes 0.0.0.0
viaLoopback := manet.IsIPLoopback(c.LocalMultiaddr()) || manet.IsIPLoopback(c.RemoteMultiaddr())
mes.ListenAddrs = make([][]byte, 0, len(laddrs))
for _, addr := range laddrs {
if !viaLoopback && manet.IsIPLoopback(addr) {
continue
if usePeerRecords {
ids.peerrecMu.RLock()
rec := ids.peerrec
ids.peerrecMu.RUnlock()
if rec == nil {
log.Errorf("latest peer record does not exist. identify message incomplete!")
} else {
recBytes, err := rec.Marshal()
if err != nil {
log.Errorf("error marshaling peer record: %v", err)
} else {
mes.SignedPeerRecord = recBytes
log.Debugf("%s sent peer record to %s", c.LocalPeer(), c.RemotePeer())
}
}
} else {
// set listen addrs, get our latest addrs from Host.
laddrs := ids.Host.Addrs()
// Note: LocalMultiaddr is sometimes 0.0.0.0
viaLoopback := manet.IsIPLoopback(c.LocalMultiaddr()) || manet.IsIPLoopback(c.RemoteMultiaddr())
mes.ListenAddrs = make([][]byte, 0, len(laddrs))
for _, addr := range laddrs {
if !viaLoopback && manet.IsIPLoopback(addr) {
continue
}
mes.ListenAddrs = append(mes.ListenAddrs, addr.Bytes())
}
mes.ListenAddrs = append(mes.ListenAddrs, addr.Bytes())
}
log.Debugf("%s sent listen addrs to %s: %s", c.LocalPeer(), c.RemotePeer(), laddrs)
// set our public key
ownKey := ids.Host.Peerstore().PubKey(ids.Host.ID())
@ -411,7 +468,7 @@ func (ids *IDService) populateMessage(mes *pb.Identify, c network.Conn) {
mes.AgentVersion = &av
}
func (ids *IDService) consumeMessage(mes *pb.Identify, c network.Conn) {
func (ids *IDService) consumeMessage(mes *pb.Identify, c network.Conn, usePeerRecords bool) {
p := c.RemotePeer()
// mes.Protocols
@ -441,18 +498,37 @@ func (ids *IDService) consumeMessage(mes *pb.Identify, c network.Conn) {
// that picks random source ports, this can cause DHT nodes to collect
// many undialable addresses for other peers.
// add certified addresses for the peer, if they sent us a signed peer record
var signedPeerRecord *record.Envelope
if usePeerRecords {
var err error
signedPeerRecord, err = signedPeerRecordFromMessage(mes)
if err != nil {
log.Errorf("error getting peer record from Identify message: %v", err)
}
}
// Extend the TTLs on the known (probably) good addresses.
// Taking the lock ensures that we don't concurrently process a disconnect.
ids.addrMu.Lock()
switch ids.Host.Network().Connectedness(p) {
case network.Connected:
// invalidate previous addrs -- we use a transient ttl instead of 0 to ensure there
// is no period of having no good addrs whatsoever
ids.Host.Peerstore().UpdateAddrs(p, peerstore.ConnectedAddrTTL, transientTTL)
ids.Host.Peerstore().AddAddrs(p, lmaddrs, peerstore.ConnectedAddrTTL)
default:
ids.Host.Peerstore().UpdateAddrs(p, peerstore.ConnectedAddrTTL, transientTTL)
ids.Host.Peerstore().AddAddrs(p, lmaddrs, peerstore.RecentlyConnectedAddrTTL)
ttl := peerstore.RecentlyConnectedAddrTTL
if ids.Host.Network().Connectedness(p) == network.Connected {
ttl = peerstore.ConnectedAddrTTL
}
// invalidate previous addrs -- we use a transient ttl instead of 0 to ensure there
// is no period of having no good addrs whatsoever
ids.Host.Peerstore().UpdateAddrs(p, peerstore.ConnectedAddrTTL, transientTTL)
// add signed addrs if we have them and the peerstore supports them
cab, ok := peerstore.GetCertifiedAddrBook(ids.Host.Peerstore())
if ok && signedPeerRecord != nil {
_, addErr := cab.ConsumePeerRecord(signedPeerRecord, ttl)
if addErr != nil {
log.Debugf("error adding signed addrs to peerstore: %v", addErr)
}
} else {
ids.Host.Peerstore().AddAddrs(p, lmaddrs, ttl)
}
ids.addrMu.Unlock()
@ -595,6 +671,14 @@ func addrInAddrs(a ma.Multiaddr, as []ma.Multiaddr) bool {
return false
}
func signedPeerRecordFromMessage(msg *pb.Identify) (*record.Envelope, error) {
if msg.SignedPeerRecord == nil || len(msg.SignedPeerRecord) == 0 {
return nil, nil
}
env, _, err := record.ConsumeEnvelope(msg.SignedPeerRecord, peer.PeerRecordEnvelopeDomain)
return env, err
}
// netNotifiee defines methods to be used with the IpfsDHT
type netNotifiee IDService

2
p2p/protocol/identify/id_delta.go

@ -58,7 +58,7 @@ func (ids *IDService) fireProtocolDelta(evt event.EvtLocalProtocolsUpdated) {
}
log.Debugf("%s sent delta update to %s: %s", IDDelta, c.RemotePeer(), c.RemoteMultiaddr())
}
ids.broadcast(IDDelta, deltaWriter)
ids.broadcast([]protocol.ID{IDDelta}, deltaWriter)
}
// consumeDelta processes an incoming delta from a peer, updating the peerstore

15
p2p/protocol/identify/id_push.go

@ -1,17 +1,26 @@
package identify
import "github.com/libp2p/go-libp2p-core/network"
import (
"github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/protocol"
)
// IDPush is the protocol.ID of the Identify push protocol. It sends full identify messages containing
// the current state of the peer.
//
// It is in the process of being replaced by identify delta, which sends only diffs for better
// resource utilisation.
const IDPush = "/ipfs/id/push/1.0.0"
const IDPush = "/p2p/id/push/1.1.0"
// LegacyIDPush is the protocol.ID of the previous version of the Identify push protocol,
// which does not support exchanging signed addresses in PeerRecords.
// It is still supported for backwards compatibility if a remote peer does not support
// the current version.
const LegacyIDPush = "/ipfs/id/push/1.0.0"
// Push pushes a full identify message to all peers containing the current state.
func (ids *IDService) Push() {
ids.broadcast(IDPush, ids.requestHandler)
ids.broadcast([]protocol.ID{IDPush, LegacyIDPush}, ids.requestHandler)
}
// pushHandler handles incoming identify push streams. The behaviour is identical to the ordinary identify protocol.

147
p2p/protocol/identify/id_test.go

@ -2,6 +2,8 @@ package identify_test
import (
"context"
"fmt"
"github.com/libp2p/go-libp2p-core/record"
"reflect"
"sort"
"testing"
@ -34,6 +36,8 @@ func subtestIDService(t *testing.T) {
h1 := blhost.NewBlankHost(swarmt.GenSwarm(t, ctx))
h2 := blhost.NewBlankHost(swarmt.GenSwarm(t, ctx))
generatePeerRecord(t, h1)
generatePeerRecord(t, h2)
h1p := h1.ID()
h2p := h2.ID()
@ -46,6 +50,9 @@ func subtestIDService(t *testing.T) {
testKnowsAddrs(t, h1, h2p, []ma.Multiaddr{}) // nothing
testKnowsAddrs(t, h2, h1p, []ma.Multiaddr{}) // nothing
// the forgetMe addr represents an address for h1 that h2 has learned out of band
// (not via identify protocol). Shortly after the identify exchange, it will be
// forgotten and replaced by the addrs h1 sends during identify
forgetMe, _ := ma.NewMultiaddr("/ip4/1.2.3.4/tcp/1234")
h2.Peerstore().AddAddr(h1p, forgetMe, peerstore.RecentlyConnectedAddrTTL)
@ -71,7 +78,8 @@ func subtestIDService(t *testing.T) {
// the IDService should be opened automatically, by the network.
// what we should see now is that both peers know about each others listen addresses.
t.Log("test peer1 has peer2 addrs correctly")
testKnowsAddrs(t, h1, h2p, h2.Peerstore().Addrs(h2p)) // has them
testKnowsAddrs(t, h1, h2p, h2.Peerstore().Addrs(h2p)) // has them
testHasCertifiedAddrs(t, h1, h2p, h2.Peerstore().Addrs(h2p)) // should have signed addrs also
testHasProtocolVersions(t, h1, h2p)
testHasPublicKey(t, h1, h2p, h2.Peerstore().PubKey(h2p)) // h1 should have h2's public key
@ -89,6 +97,7 @@ func subtestIDService(t *testing.T) {
// and the protocol versions.
t.Log("test peer2 has peer1 addrs correctly")
testKnowsAddrs(t, h2, h1p, addrs) // has them
testHasCertifiedAddrs(t, h2, h1p, h1.Peerstore().Addrs(h1p))
testHasProtocolVersions(t, h2, h1p)
testHasPublicKey(t, h2, h1p, h1.Peerstore().PubKey(h1p)) // h1 should have h2's public key
@ -99,19 +108,21 @@ func subtestIDService(t *testing.T) {
t.Fatal("should have no connections")
}
t.Log("testing addrs just after disconnect")
// addresses don't immediately expire on disconnect, so we should still have them
testKnowsAddrs(t, h2, h1p, addrs)
testKnowsAddrs(t, h1, h2p, h2.Peerstore().Addrs(h2p))
testHasCertifiedAddrs(t, h1, h2p, h2.Peerstore().Addrs(h2p))
testHasCertifiedAddrs(t, h2, h1p, h1.Peerstore().Addrs(h1p))
time.Sleep(500 * time.Millisecond)
// Forget the first one.
testKnowsAddrs(t, h2, h1p, addrs[:len(addrs)-1])
time.Sleep(1 * time.Second)
// Forget the rest.
// the addrs had their TTLs reduced on disconnect, and
// will be forgotten soon after
t.Log("testing addrs after TTL expiration")
time.Sleep(2 * time.Second)
testKnowsAddrs(t, h1, h2p, []ma.Multiaddr{})
testKnowsAddrs(t, h2, h1p, []ma.Multiaddr{})
testHasCertifiedAddrs(t, h1, h2p, []ma.Multiaddr{})
testHasCertifiedAddrs(t, h2, h1p, []ma.Multiaddr{})
// test that we received the "identify completed" event.
select {
@ -125,7 +136,36 @@ func testKnowsAddrs(t *testing.T, h host.Host, p peer.ID, expected []ma.Multiadd
t.Helper()
actual := h.Peerstore().Addrs(p)
checkAddrs(t, expected, actual, fmt.Sprintf("%s did not have addr for %s", h.ID(), p))
}
func testHasCertifiedAddrs(t *testing.T, h host.Host, p peer.ID, expected []ma.Multiaddr) {
t.Helper()
cab, ok := peerstore.GetCertifiedAddrBook(h.Peerstore())
if !ok {
t.Error("expected peerstore to implement CertifiedAddrBook")
}
recordEnvelope := cab.GetPeerRecord(p)
if recordEnvelope == nil {
if len(expected) == 0 {
return
}
t.Fatalf("peerstore has no signed record for peer %s", p)
}
r, err := recordEnvelope.Record()
if err != nil {
t.Error("Error unwrapping signed PeerRecord from envelope", err)
}
rec, ok := r.(*peer.PeerRecord)
if !ok {
t.Error("unexpected record type")
}
checkAddrs(t, expected, rec.Addrs, fmt.Sprintf("%s did not have certified addr for %s", h.ID(), p))
}
func checkAddrs(t *testing.T, expected, actual []ma.Multiaddr, msg string) {
t.Helper()
if len(actual) != len(expected) {
t.Errorf("expected: %s", expected)
t.Errorf("actual: %s", actual)
@ -138,7 +178,7 @@ func testKnowsAddrs(t *testing.T, h host.Host, p peer.ID, expected []ma.Multiadd
}
for _, addr := range expected {
if _, found := have[addr.String()]; !found {
t.Errorf("%s did not have addr for %s: %s", h.ID(), p, addr)
t.Errorf("%s: %s", msg, addr)
}
}
}
@ -177,6 +217,36 @@ func testHasPublicKey(t *testing.T, h host.Host, p peer.ID, shouldBe ic.PubKey)
}
}
// we're using BlankHost in our tests, which doesn't automatically generate peer records
// like BasicHost. This generates a record and puts it on the host's event bus, which
// will cause the identify service to start supporting new protocol versions that
// depend on peer records being available.
func generatePeerRecord(t *testing.T, h host.Host) {
t.Helper()
key := h.Peerstore().PrivKey(h.ID())
if key == nil {
t.Fatal("no private key for host")
}
rec := peer.NewPeerRecord()
rec.PeerID = h.ID()
rec.Addrs = h.Addrs()
signed, err := record.Seal(rec, key)
if err != nil {
t.Fatalf("error generating peer record: %s", err)
}
evt := event.EvtLocalAddressesUpdated{SignedPeerRecord: *signed}
emitter, err := h.EventBus().Emitter(new(event.EvtLocalAddressesUpdated), eventbus.Stateful)
if err != nil {
t.Fatal(err)
}
err = emitter.Emit(evt)
if err != nil {
t.Fatal(err)
}
}
// TestIDServiceWait gives the ID service 1s to finish after dialing
// this is because it used to be concurrent. Now, Dial wait till the
// id service is done.
@ -423,11 +493,14 @@ func TestIdentifyDeltaWhileIdentifyingConn(t *testing.T) {
// replace the original identify handler by one that blocks until we close the block channel.
// this allows us to control how long identify runs.
block := make(chan struct{})
h1.RemoveStreamHandler(identify.ID)
h1.SetStreamHandler(identify.ID, func(s network.Stream) {
handler := func(s network.Stream) {
<-block
go helpers.FullClose(s)
})
}
h1.RemoveStreamHandler(identify.ID)
h1.RemoveStreamHandler(identify.LegacyID)
h1.SetStreamHandler(identify.ID, handler)
h1.SetStreamHandler(identify.LegacyID, handler)
// from h2 connect to h1.
if err := h2.Connect(ctx, peer.AddrInfo{ID: h1.ID(), Addrs: h1.Addrs()}); err != nil {
@ -507,3 +580,51 @@ func TestUserAgent(t *testing.T) {
t.Errorf("expected agent version %q, got %q", "bar", av)
}
}
// make sure that we still support older peers using "legacy" versions of identify
func TestCompatibilityWithPeersThatDoNotSupportSignedAddrs(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
h1 := blhost.NewBlankHost(swarmt.GenSwarm(t, ctx))
h2 := blhost.NewBlankHost(swarmt.GenSwarm(t, ctx))
defer h2.Close()
defer h1.Close()
ids := identify.NewIDService(h1)
ids2 := identify.NewIDService(h2)
defer ids.Close()
defer ids2.Close()
// generate initial peer record only for h1. this will cause h1 to enable
// the new protocols, but h2 will still use legacy protos
generatePeerRecord(t, h1)
h2p := h2.ID()
h2pi := h2.Peerstore().PeerInfo(h2p)
if err := h1.Connect(ctx, h2pi); err != nil {
t.Fatal(err)
}
h1t2c := h1.Network().ConnsToPeer(h2p)
if len(h1t2c) == 0 {
t.Fatal("should have a conn here")
}
ids.IdentifyConn(h1t2c[0])
// the IDService should be opened automatically, by the network.
// what we should see now is that both peers know about each others listen addresses.
t.Log("test peer1 has peer2 addrs correctly")
testKnowsAddrs(t, h1, h2p, h2.Peerstore().Addrs(h2p)) // has them
testHasCertifiedAddrs(t, h1, h2p, []ma.Multiaddr{}) // should not have signed addrs
// double check that it works when both peers support the new protos
// enable new protos for h2 by generating a peer record
generatePeerRecord(t, h2)
// if we re-identify, h1 should now have certified addrs for h2
ids.IdentifyConn(h1t2c[0])
t.Log("test peer1 has peer2 certified addrs correctly")
testHasCertifiedAddrs(t, h1, h2p, h2.Peerstore().Addrs(h2p))
}

101
p2p/protocol/identify/pb/identify.pb.go

@ -98,7 +98,13 @@ type Identify struct {
// protocols are the services this node is running
Protocols []string `protobuf:"bytes,3,rep,name=protocols" json:"protocols,omitempty"`
// a delta update is incompatible with everything else. If this field is included, none of the others can appear.
Delta *Delta `protobuf:"bytes,7,opt,name=delta" json:"delta,omitempty"`
Delta *Delta `protobuf:"bytes,7,opt,name=delta" json:"delta,omitempty"`
// signedPeerRecord contains a serialized SignedEnvelope containing a PeerRecord,
// signed by the sending node. It contains the same addresses as the listenAddrs field, but
// in a form that lets us share authenticated addrs with other peers.
// see github.com/libp2p/go-libp2p-core/record/pb/envelope.proto and
// github.com/libp2p/go-libp2p-core/peer/pb/peer_record.proto for message definitions.
SignedPeerRecord []byte `protobuf:"bytes,8,opt,name=signedPeerRecord" json:"signedPeerRecord,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
@ -186,6 +192,13 @@ func (m *Identify) GetDelta() *Delta {
return nil
}
func (m *Identify) GetSignedPeerRecord() []byte {
if m != nil {
return m.SignedPeerRecord
}
return nil
}
func init() {
proto.RegisterType((*Delta)(nil), "identify.pb.Delta")
proto.RegisterType((*Identify)(nil), "identify.pb.Identify")
@ -194,23 +207,24 @@ func init() {
func init() { proto.RegisterFile("identify.proto", fileDescriptor_83f1e7e6b485409f) }
var fileDescriptor_83f1e7e6b485409f = []byte{
// 251 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x5c, 0x90, 0xb1, 0x4e, 0xc3, 0x30,
0x14, 0x45, 0xe5, 0x96, 0x02, 0x79, 0xb1, 0x5a, 0xe9, 0x4d, 0x1e, 0x50, 0x64, 0xb2, 0xe0, 0x29,
0x03, 0x7f, 0x00, 0x62, 0x41, 0x2c, 0xc8, 0x48, 0xac, 0x28, 0xa9, 0x1f, 0xc8, 0x52, 0x1a, 0x57,
0x8e, 0x41, 0xea, 0xce, 0xc7, 0x31, 0xf2, 0x09, 0x28, 0x5f, 0x82, 0xe2, 0x92, 0x26, 0x65, 0xf4,
0xd1, 0x91, 0xef, 0xbb, 0x17, 0x96, 0xd6, 0x50, 0x13, 0xec, 0xeb, 0xae, 0xd8, 0x7a, 0x17, 0x1c,
0xa6, 0xe3, 0xbb, 0xca, 0x9f, 0x60, 0x71, 0x47, 0x75, 0x28, 0xf1, 0x0a, 0x56, 0xa5, 0x31, 0x64,
0x5e, 0xa2, 0xb4, 0x76, 0x75, 0x2b, 0x98, 0x9c, 0xab, 0x44, 0x2f, 0x23, 0x7e, 0x1c, 0x28, 0x5e,
0x02, 0xf7, 0x9b, 0x89, 0x35, 0x8b, 0x56, 0xea, 0x37, 0x07, 0x25, 0xff, 0x9c, 0xc1, 0xf9, 0xfd,
0x5f, 0x08, 0x2a, 0x58, 0x0d, 0xf2, 0x33, 0xf9, 0xd6, 0xba, 0x46, 0x2c, 0x24, 0x53, 0x89, 0xfe,
0x8f, 0x31, 0x07, 0x5e, 0xbe, 0x51, 0x13, 0x06, 0xed, 0x34, 0x6a, 0x47, 0x0c, 0x2f, 0x20, 0xd9,
0xbe, 0x57, 0xb5, 0x5d, 0x3f, 0xd0, 0x4e, 0x30, 0xc9, 0x14, 0xd7, 0x23, 0x40, 0x09, 0x69, 0x6d,
0xdb, 0x40, 0xcd, 0x8d, 0x31, 0x7e, 0x7f, 0x1a, 0xd7, 0x53, 0xd4, 0x67, 0xb8, 0xaa, 0x25, 0xff,
0x41, 0xa6, 0x07, 0xe2, 0x24, 0x7e, 0x71, 0xc4, 0x62, 0xc6, 0xa1, 0xde, 0x3c, 0xd6, 0x1b, 0x01,
0x2a, 0x58, 0x98, 0x7e, 0x31, 0x71, 0x26, 0x99, 0x4a, 0xaf, 0xb1, 0x98, 0xcc, 0x59, 0xc4, 0x2d,
0xf5, 0x5e, 0xb8, 0xe5, 0x5f, 0x5d, 0xc6, 0xbe, 0xbb, 0x8c, 0xfd, 0x74, 0x19, 0xfb, 0x0d, 0x00,
0x00, 0xff, 0xff, 0xa1, 0x77, 0x03, 0x42, 0x87, 0x01, 0x00, 0x00,
// 272 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x64, 0x90, 0x41, 0x4a, 0xc3, 0x40,
0x14, 0x86, 0x99, 0xd6, 0x6a, 0xf3, 0x12, 0x5a, 0x99, 0xd5, 0x2c, 0x24, 0xc4, 0x6c, 0x1c, 0x5c,
0x64, 0xe1, 0x0d, 0x14, 0x37, 0xe2, 0xa6, 0x8c, 0xe0, 0x56, 0x92, 0xbc, 0x67, 0x19, 0x48, 0x33,
0x65, 0x32, 0x0a, 0xbd, 0x95, 0xc7, 0x70, 0xe9, 0x11, 0x24, 0x27, 0x91, 0x4c, 0x4d, 0x93, 0xea,
0x72, 0x3e, 0x3e, 0xe6, 0x7f, 0xff, 0x0f, 0x0b, 0x8d, 0x54, 0x3b, 0xfd, 0xba, 0xcb, 0xb6, 0xd6,
0x38, 0xc3, 0xc3, 0xe1, 0x5d, 0xa4, 0x4f, 0x30, 0xbb, 0xa7, 0xca, 0xe5, 0xfc, 0x0a, 0x96, 0x39,
0x22, 0xe1, 0x8b, 0x97, 0x4a, 0x53, 0x35, 0x82, 0x25, 0x53, 0x19, 0xa8, 0x85, 0xc7, 0xab, 0x9e,
0xf2, 0x4b, 0x88, 0xec, 0x66, 0x64, 0x4d, 0xbc, 0x15, 0xda, 0xcd, 0x41, 0x49, 0x3f, 0x26, 0x30,
0x7f, 0xf8, 0x0d, 0xe1, 0x12, 0x96, 0xbd, 0xfc, 0x4c, 0xb6, 0xd1, 0xa6, 0x16, 0xb3, 0x84, 0xc9,
0x40, 0xfd, 0xc5, 0x3c, 0x85, 0x28, 0x5f, 0x53, 0xed, 0x7a, 0xed, 0xd4, 0x6b, 0x47, 0x8c, 0x5f,
0x40, 0xb0, 0x7d, 0x2b, 0x2a, 0x5d, 0x3e, 0xd2, 0x4e, 0xb0, 0x84, 0xc9, 0x48, 0x0d, 0x80, 0x27,
0x10, 0x56, 0xba, 0x71, 0x54, 0xdf, 0x22, 0xda, 0xfd, 0x69, 0x91, 0x1a, 0xa3, 0x2e, 0xc3, 0x14,
0x0d, 0xd9, 0x77, 0xc2, 0x0e, 0x88, 0x13, 0xff, 0xc5, 0x11, 0xf3, 0x19, 0x87, 0x7a, 0x53, 0x5f,
0x6f, 0x00, 0x5c, 0xc2, 0x0c, 0xbb, 0xc5, 0xc4, 0x59, 0xc2, 0x64, 0x78, 0xc3, 0xb3, 0xd1, 0x9c,
0x99, 0xdf, 0x52, 0xed, 0x05, 0x7e, 0x0d, 0xe7, 0x8d, 0x5e, 0xd7, 0x84, 0x2b, 0x22, 0xab, 0xa8,
0x34, 0x16, 0xc5, 0xdc, 0xe7, 0xfd, 0xe3, 0x77, 0xd1, 0x67, 0x1b, 0xb3, 0xaf, 0x36, 0x66, 0xdf,
0x6d, 0xcc, 0x7e, 0x02, 0x00, 0x00, 0xff, 0xff, 0xc0, 0x03, 0xc8, 0x41, 0xb3, 0x01, 0x00, 0x00,
}
func (m *Delta) Marshal() (dAtA []byte, err error) {
@ -282,6 +296,13 @@ func (m *Identify) MarshalToSizedBuffer(dAtA []byte) (int, error) {
i -= len(m.XXX_unrecognized)
copy(dAtA[i:], m.XXX_unrecognized)
}
if m.SignedPeerRecord != nil {
i -= len(m.SignedPeerRecord)
copy(dAtA[i:], m.SignedPeerRecord)
i = encodeVarintIdentify(dAtA, i, uint64(len(m.SignedPeerRecord)))
i--
dAtA[i] = 0x42
}
if m.Delta != nil {
{
size, err := m.Delta.MarshalToSizedBuffer(dAtA[:i])
@ -416,6 +437,10 @@ func (m *Identify) Size() (n int) {
l = m.Delta.Size()
n += 1 + l + sovIdentify(uint64(l))
}
if m.SignedPeerRecord != nil {
l = len(m.SignedPeerRecord)
n += 1 + l + sovIdentify(uint64(l))
}
if m.XXX_unrecognized != nil {
n += len(m.XXX_unrecognized)
}
@ -809,6 +834,40 @@ func (m *Identify) Unmarshal(dAtA []byte) error {
return err
}
iNdEx = postIndex
case 8:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field SignedPeerRecord", wireType)
}
var byteLen int
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowIdentify
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
byteLen |= int(b&0x7F) << shift
if b < 0x80 {
break
}
}
if byteLen < 0 {
return ErrInvalidLengthIdentify
}
postIndex := iNdEx + byteLen
if postIndex < 0 {
return ErrInvalidLengthIdentify
}
if postIndex > l {
return io.ErrUnexpectedEOF
}
m.SignedPeerRecord = append(m.SignedPeerRecord[:0], dAtA[iNdEx:postIndex]...)
if m.SignedPeerRecord == nil {
m.SignedPeerRecord = []byte{}
}
iNdEx = postIndex
default:
iNdEx = preIndex
skippy, err := skipIdentify(dAtA[iNdEx:])
@ -891,9 +950,6 @@ func skipIdentify(dAtA []byte) (n int, err error) {
return 0, ErrInvalidLengthIdentify
}
iNdEx += length
if iNdEx < 0 {
return 0, ErrInvalidLengthIdentify
}
case 3:
depth++
case 4:
@ -906,6 +962,9 @@ func skipIdentify(dAtA []byte) (n int, err error) {
default:
return 0, fmt.Errorf("proto: illegal wireType %d", wireType)
}
if iNdEx < 0 {
return 0, ErrInvalidLengthIdentify
}
if depth == 0 {
return iNdEx, nil
}

7
p2p/protocol/identify/pb/identify.proto

@ -36,4 +36,11 @@ message Identify {
// a delta update is incompatible with everything else. If this field is included, none of the others can appear.
optional Delta delta = 7;
// signedPeerRecord contains a serialized SignedEnvelope containing a PeerRecord,
// signed by the sending node. It contains the same addresses as the listenAddrs field, but
// in a form that lets us share authenticated addrs with other peers.
// see github.com/libp2p/go-libp2p-core/record/pb/envelope.proto and
// github.com/libp2p/go-libp2p-core/peer/pb/peer_record.proto for message definitions.
optional bytes signedPeerRecord = 8;
}

Loading…
Cancel
Save