Browse Source

feat: update to go-libp2p-core 0.7.0

This updates for the stream interface changes in go-libp2p-core 0.7.0
merge-circuit-v1
Steven Allen 4 years ago
parent
commit
6775ddac87
  1. 2
      p2p/protocol/internal/circuitv1-deprecated/conn.go
  2. 2
      p2p/protocol/internal/circuitv1-deprecated/listen.go
  3. 15
      p2p/protocol/internal/circuitv1-deprecated/relay.go
  4. 10
      p2p/protocol/internal/circuitv1-deprecated/transport_test.go

2
p2p/protocol/internal/circuitv1-deprecated/conn.go

@ -10,7 +10,7 @@ import (
"github.com/libp2p/go-libp2p-core/peer"
ma "github.com/multiformats/go-multiaddr"
manet "github.com/multiformats/go-multiaddr-net"
manet "github.com/multiformats/go-multiaddr/net"
)
// HopTagWeight is the connection manager weight for connections carrying relay hop streams

2
p2p/protocol/internal/circuitv1-deprecated/listen.go

@ -6,7 +6,7 @@ import (
pb "github.com/libp2p/go-libp2p-circuit/pb"
ma "github.com/multiformats/go-multiaddr"
manet "github.com/multiformats/go-multiaddr-net"
manet "github.com/multiformats/go-multiaddr/net"
)
var _ manet.Listener = (*RelayListener)(nil)

15
p2p/protocol/internal/circuitv1-deprecated/relay.go

@ -10,7 +10,6 @@ import (
pb "github.com/libp2p/go-libp2p-circuit/pb"
"github.com/libp2p/go-libp2p-core/helpers"
"github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer"
@ -207,6 +206,7 @@ func CanHop(ctx context.Context, host host.Host, id peer.ID) (bool, error) {
if err != nil {
return false, err
}
defer s.Close()
rd := newDelimitedReader(s, maxMessageSize)
wr := newDelimitedWriter(s)
@ -227,9 +227,6 @@ func CanHop(ctx context.Context, host host.Host, id peer.ID) (bool, error) {
s.Reset()
return false, err
}
if err := helpers.FullClose(s); err != nil {
return false, err
}
if msg.GetType() != pb.CircuitRelay_STATUS {
return false, fmt.Errorf("unexpected relay response; not a status message (%d)", msg.GetType())
@ -390,6 +387,8 @@ func (r *Relay) handleHopStream(s network.Stream, msg *pb.CircuitRelay) {
*goroutines = 2
done := func() {
if atomic.AddInt32(goroutines, -1) == 0 {
s.Close()
bs.Close()
r.rmLiveHop(src.ID, dst.ID)
}
}
@ -410,7 +409,7 @@ func (r *Relay) handleHopStream(s network.Stream, msg *pb.CircuitRelay) {
bs.Reset()
} else {
// propagate the close
s.Close()
s.CloseWrite()
}
log.Debugf("relayed %d bytes from %s to %s", count, dst.ID.Pretty(), src.ID.Pretty())
}()
@ -429,7 +428,7 @@ func (r *Relay) handleHopStream(s network.Stream, msg *pb.CircuitRelay) {
s.Reset()
} else {
// propagate the close
bs.Close()
bs.CloseWrite()
}
log.Debugf("relayed %d bytes from %s to %s", count, src.ID.Pretty(), dst.ID.Pretty())
}()
@ -474,7 +473,7 @@ func (r *Relay) handleCanHop(s network.Stream, msg *pb.CircuitRelay) {
s.Reset()
log.Debugf("error writing relay response: %s", err.Error())
} else {
helpers.FullClose(s)
s.Close()
}
}
@ -485,7 +484,7 @@ func (r *Relay) handleError(s network.Stream, code pb.CircuitRelay_Status) {
s.Reset()
log.Debugf("error writing relay response: %s", err.Error())
} else {
helpers.FullClose(s)
s.Close()
}
}

10
p2p/protocol/internal/circuitv1-deprecated/transport_test.go

@ -65,7 +65,15 @@ func TestFullAddressTransportDial(t *testing.T) {
hosts := testSetupRelay(t, ctx)
addr, err := ma.NewMultiaddr(fmt.Sprintf("%s/ipfs/%s/p2p-circuit/ipfs/%s", hosts[1].Addrs()[0].String(), hosts[1].ID().Pretty(), hosts[2].ID().Pretty()))
var relayAddr ma.Multiaddr
for _, addr := range hosts[1].Addrs() {
// skip relay addrs.
if _, err := addr.ValueForProtocol(ma.P_CIRCUIT); err != nil {
relayAddr = addr
}
}
addr, err := ma.NewMultiaddr(fmt.Sprintf("%s/p2p/%s/p2p-circuit/p2p/%s", relayAddr.String(), hosts[1].ID().Pretty(), hosts[2].ID().Pretty()))
if err != nil {
t.Fatal(err)
}

Loading…
Cancel
Save