Browse Source

more protocol strings

feat/signed-records-chunking
Aarsh Shah 4 years ago
committed by Steven Allen
parent
commit
60b52f10ae
  1. 30
      p2p/protocol/identify/id.go
  2. 7
      p2p/protocol/identify/id_test.go
  3. 57
      p2p/protocol/identify/pb/identify.pb.go
  4. 6
      p2p/protocol/identify/pb/identify.proto

30
p2p/protocol/identify/id.go

@ -406,7 +406,7 @@ func (ids *IDService) sendIdentifyResp(s network.Stream) {
func (ids *IDService) handleIdentifyResponse(s network.Stream) {
c := s.Conn()
r := ggio.NewDelimitedReader(s, legacyIDSize)
r := ggio.NewDelimitedReader(s, signedIDSize)
mes := pb.Identify{}
if err := r.ReadMsg(&mes); err != nil {
log.Warning("error reading identify message: ", err)
@ -414,9 +414,9 @@ func (ids *IDService) handleIdentifyResponse(s network.Stream) {
return
}
if mes.SignedRecordInNextMessage {
if mes.More {
m := &pb.Identify{}
if err := ggio.NewDelimitedReader(s, signedIDSize).ReadMsg(m); err != nil {
if err := r.ReadMsg(m); err != nil {
log.Warning("error reading identify message: ", err)
s.Reset()
return
@ -444,28 +444,28 @@ func (ids *IDService) getSnapshot() *identifySnapshot {
}
func (ids *IDService) writeChunkedIdentifyMsg(c network.Conn, snapshot *identifySnapshot, s network.Stream) error {
mes := ids.getIdentifyMsgUnsigned(c, snapshot)
mes := ids.createBaseIdentifyResponse(c, snapshot)
sr := ids.getSignedRecord(snapshot)
mes.SignedPeerRecord = sr
writer := ggio.NewDelimitedWriter(s)
if sr == nil || proto.Size(mes) <= legacyIDSize {
return writer.WriteMsg(mes)
} else {
mes.SignedPeerRecord = nil
mes.SignedRecordInNextMessage = true
if err := writer.WriteMsg(mes); err != nil {
return err
}
// then write just the signed record
m := &pb.Identify{SignedPeerRecord: sr}
err := writer.WriteMsg(m)
}
mes.SignedPeerRecord = nil
mes.More = true
if err := writer.WriteMsg(mes); err != nil {
return err
}
// then write just the signed record
m := &pb.Identify{SignedPeerRecord: sr}
err := writer.WriteMsg(m)
return err
}
func (ids *IDService) getIdentifyMsgUnsigned(
func (ids *IDService) createBaseIdentifyResponse(
conn network.Conn,
snapshot *identifySnapshot,
) *pb.Identify {

7
p2p/protocol/identify/id_test.go

@ -767,7 +767,7 @@ func TestSendPushIfDeltaNotSupported(t *testing.T) {
}, 5*time.Second, 500*time.Millisecond)
}
/*func TestLargeIdentifyMessage(t *testing.T) {
func TestLargeIdentifyMessage(t *testing.T) {
oldTTL := peerstore.RecentlyConnectedAddrTTL
peerstore.RecentlyConnectedAddrTTL = time.Second
defer func() {
@ -786,7 +786,8 @@ func TestSendPushIfDeltaNotSupported(t *testing.T) {
h2 := blhost.NewBlankHost(swarmt.GenSwarm(t, ctx, swarmt.OptPeerPrivateKey(sk2)))
// add protocol strings to make the message larger
for i := 0; i < 100; i++ {
// about 2K of protocol strings
for i := 0; i < 500; i++ {
r := "rand" + string(i)
h1.SetStreamHandler(protocol.ID(r), func(network.Stream) {})
h2.SetStreamHandler(protocol.ID(r), func(network.Stream) {})
@ -880,4 +881,4 @@ func TestSendPushIfDeltaNotSupported(t *testing.T) {
case <-time.After(10 * time.Second):
t.Fatalf("expected EvtPeerIdentificationCompleted event within 10 seconds; none received")
}
}*/
}

57
p2p/protocol/identify/pb/identify.pb.go

@ -100,10 +100,10 @@ type Identify struct {
Protocols []string `protobuf:"bytes,3,rep,name=protocols" json:"protocols,omitempty"`
// a delta update is incompatible with everything else. If this field is included, none of the others can appear.
Delta *Delta `protobuf:"bytes,7,opt,name=delta" json:"delta,omitempty"`
// signedRecordInNextMessage is set to true by a peer to indicate to the receiver that the signed record is absent in the current message
// more is set to true by a peer to indicate to the receiver that the signed record is absent in the current message
// and the peer should read the next message from the stream to fetch it.
// This is done because messages with signed records can exceed the legacy message size of 2K bytes.
SignedRecordInNextMessage bool `protobuf:"varint,9,opt,name=signedRecordInNextMessage" json:"signedRecordInNextMessage"`
More bool `protobuf:"varint,9,opt,name=more" json:"more"`
// signedPeerRecord contains a serialized SignedEnvelope containing a PeerRecord,
// signed by the sending node. It contains the same addresses as the listenAddrs field, but
// in a form that lets us share authenticated addrs with other peers.
@ -197,9 +197,9 @@ func (m *Identify) GetDelta() *Delta {
return nil
}
func (m *Identify) GetSignedRecordInNextMessage() bool {
func (m *Identify) GetMore() bool {
if m != nil {
return m.SignedRecordInNextMessage
return m.More
}
return false
}
@ -219,28 +219,27 @@ func init() {
func init() { proto.RegisterFile("identify.proto", fileDescriptor_83f1e7e6b485409f) }
var fileDescriptor_83f1e7e6b485409f = []byte{
// 331 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x74, 0x90, 0xdf, 0x4a, 0xfb, 0x30,
0x14, 0xc7, 0x7f, 0xd9, 0x9f, 0x9f, 0x6b, 0x5a, 0x36, 0xc9, 0x55, 0x15, 0x99, 0x75, 0x37, 0x06,
0xc1, 0x0e, 0x7c, 0x03, 0x87, 0x37, 0x43, 0x94, 0x11, 0xc1, 0x5b, 0x69, 0x9b, 0xb3, 0x1a, 0xe8,
0x9a, 0x91, 0x64, 0xe2, 0xde, 0x70, 0x97, 0x3e, 0x81, 0xc8, 0x9e, 0xc1, 0x07, 0x90, 0x9d, 0xba,
0x7f, 0x8a, 0x77, 0x39, 0x9f, 0x7c, 0x92, 0x73, 0xce, 0x97, 0xb6, 0x95, 0x84, 0xd2, 0xa9, 0xf1,
0x3c, 0x9e, 0x1a, 0xed, 0x34, 0xf3, 0xb7, 0x75, 0x7a, 0x7c, 0x99, 0x2b, 0xf7, 0x3c, 0x4b, 0xe3,
0x4c, 0x4f, 0xfa, 0xb9, 0xce, 0x75, 0x1f, 0x9d, 0x74, 0x36, 0xc6, 0x0a, 0x0b, 0x3c, 0x55, 0x6f,
0x7b, 0x0f, 0xb4, 0x79, 0x03, 0x85, 0x4b, 0xd8, 0x39, 0xed, 0x24, 0x52, 0x82, 0x7c, 0x42, 0x9e,
0xe9, 0xc2, 0x86, 0x24, 0xaa, 0x73, 0x4f, 0xb4, 0x11, 0x8f, 0xd6, 0x94, 0x9d, 0xd1, 0xc0, 0x4c,
0x76, 0xac, 0x1a, 0x5a, 0xbe, 0x99, 0x6c, 0x94, 0xde, 0x67, 0x8d, 0xb6, 0x86, 0xdf, 0x33, 0x31,
0x4e, 0x3b, 0x6b, 0xf9, 0x11, 0x8c, 0x55, 0xba, 0x0c, 0x9b, 0x11, 0xe1, 0x9e, 0xf8, 0x89, 0x59,
0x8f, 0x06, 0x49, 0x0e, 0xa5, 0x5b, 0x6b, 0xff, 0x51, 0xdb, 0x63, 0xec, 0x84, 0x7a, 0xd3, 0x59,
0x5a, 0xa8, 0xec, 0x16, 0xe6, 0x21, 0x89, 0x08, 0x0f, 0xc4, 0x16, 0xb0, 0x88, 0xfa, 0x85, 0xb2,
0x0e, 0xca, 0x6b, 0x29, 0x4d, 0x35, 0x5a, 0x20, 0x76, 0xd1, 0xaa, 0x87, 0x4e, 0x2d, 0x98, 0x17,
0x90, 0x2b, 0x10, 0x36, 0xf0, 0x8b, 0x3d, 0x86, 0x3d, 0x36, 0xeb, 0xd5, 0x71, 0xbd, 0x2d, 0x60,
0x9c, 0x36, 0xe5, 0x2a, 0xb1, 0xf0, 0x20, 0x22, 0xdc, 0xbf, 0x62, 0xf1, 0x4e, 0xfa, 0x31, 0x66,
0x29, 0x2a, 0x81, 0x0d, 0xe8, 0x91, 0x55, 0x79, 0x09, 0x52, 0x40, 0xa6, 0x8d, 0x1c, 0x96, 0xf7,
0xf0, 0xea, 0xee, 0xc0, 0xda, 0x24, 0x87, 0xd0, 0x8b, 0x08, 0x6f, 0x0d, 0x1a, 0x8b, 0xf7, 0xd3,
0x7f, 0xe2, 0x6f, 0x8d, 0x5d, 0xd0, 0xc3, 0xea, 0x72, 0x04, 0x60, 0x2a, 0x21, 0x6c, 0xe1, 0xcc,
0xbf, 0xf8, 0x20, 0x58, 0x2c, 0xbb, 0xe4, 0x6d, 0xd9, 0x25, 0x1f, 0xcb, 0x2e, 0xf9, 0x0a, 0x00,
0x00, 0xff, 0xff, 0x49, 0x35, 0xe7, 0x94, 0x26, 0x02, 0x00, 0x00,
// 316 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x64, 0x90, 0xcd, 0x4a, 0xf3, 0x40,
0x14, 0x86, 0xbf, 0xe9, 0xcf, 0x67, 0x73, 0x12, 0x5a, 0x99, 0xd5, 0x20, 0x52, 0x63, 0x37, 0x0e,
0x82, 0x29, 0x78, 0x07, 0x16, 0x37, 0xe2, 0xa6, 0x8c, 0xe0, 0x56, 0x9a, 0xcc, 0x69, 0x1c, 0x48,
0x32, 0x65, 0x32, 0x15, 0x7a, 0x87, 0xc5, 0x95, 0x57, 0x20, 0xd2, 0x2b, 0x91, 0x9e, 0xda, 0x3f,
0xdd, 0xe5, 0x3c, 0x3c, 0x99, 0xf3, 0x9e, 0x17, 0xba, 0x46, 0x63, 0xe5, 0xcd, 0x74, 0x91, 0xcc,
0x9c, 0xf5, 0x96, 0x87, 0xfb, 0x39, 0x3d, 0xbb, 0xc9, 0x8d, 0x7f, 0x9d, 0xa7, 0x49, 0x66, 0xcb,
0x61, 0x6e, 0x73, 0x3b, 0x24, 0x27, 0x9d, 0x4f, 0x69, 0xa2, 0x81, 0xbe, 0x36, 0xff, 0x0e, 0x9e,
0xa0, 0x7d, 0x8f, 0x85, 0x9f, 0xf0, 0x2b, 0xe8, 0x4d, 0xb4, 0x46, 0xfd, 0x42, 0x3c, 0xb3, 0x45,
0x2d, 0x58, 0xdc, 0x94, 0x81, 0xea, 0x12, 0x1e, 0x6f, 0x29, 0xbf, 0x84, 0xc8, 0x95, 0x07, 0x56,
0x83, 0xac, 0xd0, 0x95, 0x3b, 0x65, 0xf0, 0xde, 0x80, 0xce, 0xc3, 0x4f, 0x26, 0x2e, 0xa1, 0xb7,
0x95, 0x9f, 0xd1, 0xd5, 0xc6, 0x56, 0xa2, 0x1d, 0x33, 0x19, 0xa8, 0xdf, 0x98, 0x0f, 0x20, 0x9a,
0xe4, 0x58, 0xf9, 0xad, 0xf6, 0x9f, 0xb4, 0x23, 0xc6, 0xcf, 0x21, 0x98, 0xcd, 0xd3, 0xc2, 0x64,
0x8f, 0xb8, 0x10, 0x2c, 0x66, 0x32, 0x52, 0x7b, 0xc0, 0x63, 0x08, 0x0b, 0x53, 0x7b, 0xac, 0xee,
0xb4, 0x76, 0x9b, 0x68, 0x91, 0x3a, 0x44, 0xeb, 0x1d, 0x36, 0xad, 0xd1, 0xbd, 0xa1, 0x5e, 0x03,
0xd1, 0xa2, 0x27, 0x8e, 0x18, 0xed, 0xd8, 0x9d, 0xd7, 0xa4, 0xf3, 0xf6, 0x80, 0x4b, 0x68, 0xeb,
0x75, 0x63, 0xe2, 0x24, 0x66, 0x32, 0xbc, 0xe5, 0xc9, 0x41, 0xfb, 0x09, 0x75, 0xa9, 0x36, 0x02,
0x17, 0xd0, 0x2a, 0xad, 0x43, 0x11, 0xc4, 0x4c, 0x76, 0x46, 0xad, 0xe5, 0xe7, 0xc5, 0x3f, 0x45,
0x84, 0x5f, 0xc3, 0x69, 0x6d, 0xf2, 0x0a, 0xf5, 0x18, 0xd1, 0x29, 0xcc, 0xac, 0xd3, 0xa2, 0x43,
0x49, 0xfe, 0xf0, 0x51, 0xb4, 0x5c, 0xf5, 0xd9, 0xc7, 0xaa, 0xcf, 0xbe, 0x56, 0x7d, 0xf6, 0x1d,
0x00, 0x00, 0xff, 0xff, 0xff, 0xed, 0x85, 0x1b, 0xfc, 0x01, 0x00, 0x00,
}
func (m *Delta) Marshal() (dAtA []byte, err error) {
@ -313,7 +312,7 @@ func (m *Identify) MarshalToSizedBuffer(dAtA []byte) (int, error) {
copy(dAtA[i:], m.XXX_unrecognized)
}
i--
if m.SignedRecordInNextMessage {
if m.More {
dAtA[i] = 1
} else {
dAtA[i] = 0
@ -895,7 +894,7 @@ func (m *Identify) Unmarshal(dAtA []byte) error {
iNdEx = postIndex
case 9:
if wireType != 0 {
return fmt.Errorf("proto: wrong wireType = %d for field SignedRecordInNextMessage", wireType)
return fmt.Errorf("proto: wrong wireType = %d for field More", wireType)
}
var v int
for shift := uint(0); ; shift += 7 {
@ -912,7 +911,7 @@ func (m *Identify) Unmarshal(dAtA []byte) error {
break
}
}
m.SignedRecordInNextMessage = bool(v != 0)
m.More = bool(v != 0)
default:
iNdEx = preIndex
skippy, err := skipIdentify(dAtA[iNdEx:])

6
p2p/protocol/identify/pb/identify.proto

@ -39,10 +39,10 @@ message Identify {
// a delta update is incompatible with everything else. If this field is included, none of the others can appear.
optional Delta delta = 7;
// signedRecordInNextMessage is set to true by a peer to indicate to the receiver that the signed record is absent in the current message
// more is set to true by a peer to indicate to the receiver that the signed record is absent in the current message
// and the peer should read the next message from the stream to fetch it.
// This is done because messages with signed records can exceed the legacy message size of 2K bytes.
optional bool signedRecordInNextMessage = 9 [(gogoproto.nullable) = false];
optional bool more = 9 [(gogoproto.nullable) = false];
// signedPeerRecord contains a serialized SignedEnvelope containing a PeerRecord,
// signed by the sending node. It contains the same addresses as the listenAddrs field, but
@ -50,4 +50,4 @@ message Identify {
// see github.com/libp2p/go-libp2p-core/record/pb/envelope.proto and
// github.com/libp2p/go-libp2p-core/peer/pb/peer_record.proto for message definitions.
optional bytes signedPeerRecord = 8;
}
}

Loading…
Cancel
Save