Browse Source

Merge pull request #454 from libp2p/feat/autorelay

autorelay
pull/476/head
vyzo 6 years ago
committed by GitHub
parent
commit
3e2dc09212
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
  1. 44
      config/config.go
  2. 11
      options.go
  3. 23
      p2p/host/basic/basic_host.go
  4. 304
      p2p/host/relay/autorelay.go
  5. 219
      p2p/host/relay/autorelay_test.go
  6. 30
      p2p/host/relay/doc.go
  7. 7
      p2p/host/relay/log.go
  8. 36
      p2p/host/relay/relay.go
  9. 25
      p2p/protocol/identify/id.go
  10. 12
      package.json

44
config/config.go

@ -5,10 +5,13 @@ import (
"fmt"
bhost "github.com/libp2p/go-libp2p/p2p/host/basic"
relay "github.com/libp2p/go-libp2p/p2p/host/relay"
routed "github.com/libp2p/go-libp2p/p2p/host/routed"
logging "github.com/ipfs/go-log"
circuit "github.com/libp2p/go-libp2p-circuit"
crypto "github.com/libp2p/go-libp2p-crypto"
discovery "github.com/libp2p/go-libp2p-discovery"
host "github.com/libp2p/go-libp2p-host"
ifconnmgr "github.com/libp2p/go-libp2p-interface-connmgr"
pnet "github.com/libp2p/go-libp2p-interface-pnet"
@ -16,6 +19,7 @@ import (
inet "github.com/libp2p/go-libp2p-net"
peer "github.com/libp2p/go-libp2p-peer"
pstore "github.com/libp2p/go-libp2p-peerstore"
routing "github.com/libp2p/go-libp2p-routing"
swarm "github.com/libp2p/go-libp2p-swarm"
tptu "github.com/libp2p/go-libp2p-transport-upgrader"
filter "github.com/libp2p/go-maddr-filter"
@ -31,6 +35,8 @@ type AddrsFactory = bhost.AddrsFactory
// NATManagerC is a NATManager constructor.
type NATManagerC func(inet.Network) bhost.NATManager
type RoutingC func(host.Host) (routing.PeerRouting, error)
// Config describes a set of settings for a libp2p node
//
// This is *not* a stable interface. Use the options defined in the root
@ -58,6 +64,8 @@ type Config struct {
Reporter metrics.Reporter
DisablePing bool
Routing RoutingC
}
// NewNode constructs a new libp2p Host from the Config.
@ -99,8 +107,8 @@ func (cfg *Config) NewNode(ctx context.Context) (host.Host, error) {
swrm.Filters = cfg.Filters
}
// TODO: make host implementation configurable.
h, err := bhost.NewHost(ctx, swrm, &bhost.HostOpts{
var h host.Host
h, err = bhost.NewHost(ctx, swrm, &bhost.HostOpts{
ConnManager: cfg.ConnManager,
AddrsFactory: cfg.AddrsFactory,
NATManager: cfg.NATManager,
@ -158,7 +166,37 @@ func (cfg *Config) NewNode(ctx context.Context) (host.Host, error) {
return nil, err
}
// TODO: Configure routing (it's a pain to setup).
if cfg.Routing != nil {
router, err := cfg.Routing(h)
if err != nil {
h.Close()
return nil, err
}
crouter, ok := router.(routing.ContentRouting)
if ok {
if cfg.Relay {
discovery := discovery.NewRoutingDiscovery(crouter)
hop := false
for _, opt := range cfg.RelayOpts {
if opt == circuit.OptHop {
hop = true
break
}
}
if hop {
h = relay.NewRelayHost(swrm.Context(), h.(*bhost.BasicHost), discovery)
} else {
h = relay.NewAutoRelayHost(swrm.Context(), h.(*bhost.BasicHost), discovery)
}
}
}
h = routed.Wrap(h, router)
}
// TODO: Bootstrapping.
return h, nil

11
options.go

@ -260,6 +260,17 @@ func Ping(enable bool) Option {
}
}
// Routing will configure libp2p to use routing.
func Routing(rt config.RoutingC) Option {
return func(cfg *Config) error {
if cfg.Routing != nil {
return fmt.Errorf("cannot specify multiple routing options")
}
cfg.Routing = rt
return nil
}
}
// NoListenAddrs will configure libp2p to not listen by default.
//
// This will both clear any configured listen addrs and prevent libp2p from

23
p2p/host/basic/basic_host.go

@ -59,10 +59,11 @@ type BasicHost struct {
ids *identify.IDService
pings *ping.PingService
natmgr NATManager
addrs AddrsFactory
maResolver *madns.Resolver
cmgr ifconnmgr.ConnManager
AddrsFactory AddrsFactory
negtimeout time.Duration
proc goprocess.Process
@ -106,11 +107,11 @@ type HostOpts struct {
// NewHost constructs a new *BasicHost and activates it by attaching its stream and connection handlers to the given inet.Network.
func NewHost(ctx context.Context, net inet.Network, opts *HostOpts) (*BasicHost, error) {
h := &BasicHost{
network: net,
mux: msmux.NewMultistreamMuxer(),
negtimeout: DefaultNegotiationTimeout,
addrs: DefaultAddrsFactory,
maResolver: madns.DefaultResolver,
network: net,
mux: msmux.NewMultistreamMuxer(),
negtimeout: DefaultNegotiationTimeout,
AddrsFactory: DefaultAddrsFactory,
maResolver: madns.DefaultResolver,
}
h.proc = goprocessctx.WithContextAndTeardown(ctx, func() error {
@ -136,7 +137,7 @@ func NewHost(ctx context.Context, net inet.Network, opts *HostOpts) (*BasicHost,
}
if opts.AddrsFactory != nil {
h.addrs = opts.AddrsFactory
h.AddrsFactory = opts.AddrsFactory
}
if opts.NATManager != nil {
@ -256,6 +257,12 @@ func (h *BasicHost) newStreamHandler(s inet.Stream) {
go handle(protoID, s)
}
// PushIdentify pushes an identify update through the identify push protocol
// Warning: this interface is unstable and may disappear in the future.
func (h *BasicHost) PushIdentify() {
h.ids.Push()
}
// ID returns the (local) peer.ID associated with this Host
func (h *BasicHost) ID() peer.ID {
return h.Network().LocalPeer()
@ -474,7 +481,7 @@ func (h *BasicHost) ConnManager() ifconnmgr.ConnManager {
// Addrs returns listening addresses that are safe to announce to the network.
// The output is the same as AllAddrs, but processed by AddrsFactory.
func (h *BasicHost) Addrs() []ma.Multiaddr {
return h.addrs(h.AllAddrs())
return h.AddrsFactory(h.AllAddrs())
}
// mergeAddrs merges input address lists, leave only unique addresses

304
p2p/host/relay/autorelay.go

@ -0,0 +1,304 @@
package relay
import (
"context"
"fmt"
"math/rand"
"sync"
"time"
basic "github.com/libp2p/go-libp2p/p2p/host/basic"
autonat "github.com/libp2p/go-libp2p-autonat"
_ "github.com/libp2p/go-libp2p-circuit"
discovery "github.com/libp2p/go-libp2p-discovery"
host "github.com/libp2p/go-libp2p-host"
inet "github.com/libp2p/go-libp2p-net"
peer "github.com/libp2p/go-libp2p-peer"
pstore "github.com/libp2p/go-libp2p-peerstore"
ma "github.com/multiformats/go-multiaddr"
manet "github.com/multiformats/go-multiaddr-net"
)
const (
RelayRendezvous = "/libp2p/relay"
)
var (
DesiredRelays = 3
BootDelay = 60 * time.Second
unspecificRelay ma.Multiaddr
)
func init() {
var err error
unspecificRelay, err = ma.NewMultiaddr("/p2p-circuit")
if err != nil {
panic(err)
}
}
// AutoRelayHost is a Host that uses relays for connectivity when a NAT is detected.
type AutoRelayHost struct {
*basic.BasicHost
discover discovery.Discoverer
autonat autonat.AutoNAT
addrsF basic.AddrsFactory
disconnect chan struct{}
mx sync.Mutex
relays map[peer.ID]pstore.PeerInfo
addrs []ma.Multiaddr
}
func NewAutoRelayHost(ctx context.Context, bhost *basic.BasicHost, discover discovery.Discoverer) *AutoRelayHost {
h := &AutoRelayHost{
BasicHost: bhost,
discover: discover,
addrsF: bhost.AddrsFactory,
relays: make(map[peer.ID]pstore.PeerInfo),
disconnect: make(chan struct{}, 1),
}
h.autonat = autonat.NewAutoNAT(ctx, bhost, h.baseAddrs)
bhost.AddrsFactory = h.hostAddrs
bhost.Network().Notify(h)
go h.background(ctx)
return h
}
func (h *AutoRelayHost) hostAddrs(addrs []ma.Multiaddr) []ma.Multiaddr {
h.mx.Lock()
defer h.mx.Unlock()
if h.addrs != nil && h.autonat.Status() == autonat.NATStatusPrivate {
return h.addrs
} else {
return filterUnspecificRelay(h.addrsF(addrs))
}
}
func (h *AutoRelayHost) baseAddrs() []ma.Multiaddr {
return filterUnspecificRelay(h.addrsF(h.AllAddrs()))
}
func (h *AutoRelayHost) background(ctx context.Context) {
select {
case <-time.After(autonat.AutoNATBootDelay + BootDelay):
case <-ctx.Done():
return
}
for {
wait := autonat.AutoNATRefreshInterval
switch h.autonat.Status() {
case autonat.NATStatusUnknown:
wait = autonat.AutoNATRetryInterval
case autonat.NATStatusPublic:
case autonat.NATStatusPrivate:
h.findRelays(ctx)
}
select {
case <-h.disconnect:
// invalidate addrs
h.mx.Lock()
h.addrs = nil
h.mx.Unlock()
case <-time.After(wait):
case <-ctx.Done():
return
}
}
}
func (h *AutoRelayHost) findRelays(ctx context.Context) {
h.mx.Lock()
if len(h.relays) >= DesiredRelays {
h.mx.Unlock()
return
}
need := DesiredRelays - len(h.relays)
h.mx.Unlock()
limit := 20
if need > limit/2 {
limit = 2 * need
}
dctx, cancel := context.WithTimeout(ctx, 60*time.Second)
pis, err := discovery.FindPeers(dctx, h.discover, RelayRendezvous, limit)
cancel()
if err != nil {
log.Debugf("error discovering relays: %s", err.Error())
return
}
pis = h.selectRelays(pis)
update := 0
for _, pi := range pis {
h.mx.Lock()
if _, ok := h.relays[pi.ID]; ok {
h.mx.Unlock()
continue
}
h.mx.Unlock()
cctx, cancel := context.WithTimeout(ctx, 60*time.Second)
err = h.Connect(cctx, pi)
cancel()
if err != nil {
log.Debugf("error connecting to relay %s: %s", pi.ID, err.Error())
continue
}
log.Debugf("connected to relay %s", pi.ID)
h.mx.Lock()
h.relays[pi.ID] = pi
h.mx.Unlock()
// tag the connection as very important
h.ConnManager().TagPeer(pi.ID, "relay", 42)
update++
need--
if need == 0 {
break
}
}
if update > 0 || h.addrs == nil {
h.updateAddrs()
}
}
func (h *AutoRelayHost) selectRelays(pis []pstore.PeerInfo) []pstore.PeerInfo {
// TODO better relay selection strategy; this just selects random relays
// but we should probably use ping latency as the selection metric
shuffleRelays(pis)
return pis
}
func (h *AutoRelayHost) updateAddrs() {
h.doUpdateAddrs()
h.PushIdentify()
}
// This function updates our NATed advertised addrs (h.addrs)
// The public addrs are rewritten so that they only retain the public IP part; they
// become undialable but are useful as a hint to the dialer to determine whether or not
// to dial private addrs.
// The non-public addrs are included verbatim so that peers behind the same NAT/firewall
// can still dial us directly.
// On top of those, we add the relay-specific addrs for the relays to which we are
// connected. For each non-private relay addr, we encapsulate the p2p-circuit addr
// through which we can be dialed.
func (h *AutoRelayHost) doUpdateAddrs() {
h.mx.Lock()
defer h.mx.Unlock()
addrs := h.baseAddrs()
raddrs := make([]ma.Multiaddr, 0, len(addrs)+len(h.relays))
// remove our public addresses from the list and replace them by just the public IP
for _, addr := range addrs {
if manet.IsPublicAddr(addr) {
ip, err := addr.ValueForProtocol(ma.P_IP4)
if err == nil {
pub, err := ma.NewMultiaddr(fmt.Sprintf("/ip4/%s", ip))
if err != nil {
panic(err)
}
if !containsAddr(raddrs, pub) {
raddrs = append(raddrs, pub)
}
continue
}
ip, err = addr.ValueForProtocol(ma.P_IP6)
if err == nil {
pub, err := ma.NewMultiaddr(fmt.Sprintf("/ip6/%s", ip))
if err != nil {
panic(err)
}
if !containsAddr(raddrs, pub) {
raddrs = append(raddrs, pub)
}
continue
}
} else {
raddrs = append(raddrs, addr)
}
}
// add relay specific addrs to the list
for _, pi := range h.relays {
circuit, err := ma.NewMultiaddr(fmt.Sprintf("/p2p/%s/p2p-circuit", pi.ID.Pretty()))
if err != nil {
panic(err)
}
for _, addr := range pi.Addrs {
if !manet.IsPrivateAddr(addr) {
pub := addr.Encapsulate(circuit)
raddrs = append(raddrs, pub)
}
}
}
h.addrs = raddrs
}
func filterUnspecificRelay(addrs []ma.Multiaddr) []ma.Multiaddr {
res := make([]ma.Multiaddr, 0, len(addrs))
for _, addr := range addrs {
if addr.Equal(unspecificRelay) {
continue
}
res = append(res, addr)
}
return res
}
func shuffleRelays(pis []pstore.PeerInfo) {
for i := range pis {
j := rand.Intn(i + 1)
pis[i], pis[j] = pis[j], pis[i]
}
}
func containsAddr(lst []ma.Multiaddr, addr ma.Multiaddr) bool {
for _, xaddr := range lst {
if xaddr.Equal(addr) {
return true
}
}
return false
}
// notify
func (h *AutoRelayHost) Listen(inet.Network, ma.Multiaddr) {}
func (h *AutoRelayHost) ListenClose(inet.Network, ma.Multiaddr) {}
func (h *AutoRelayHost) Connected(inet.Network, inet.Conn) {}
func (h *AutoRelayHost) Disconnected(_ inet.Network, c inet.Conn) {
p := c.RemotePeer()
h.mx.Lock()
defer h.mx.Unlock()
if _, ok := h.relays[p]; ok {
delete(h.relays, p)
select {
case h.disconnect <- struct{}{}:
default:
}
}
}
func (h *AutoRelayHost) OpenedStream(inet.Network, inet.Stream) {}
func (h *AutoRelayHost) ClosedStream(inet.Network, inet.Stream) {}
var _ host.Host = (*AutoRelayHost)(nil)

219
p2p/host/relay/autorelay_test.go

@ -0,0 +1,219 @@
package relay_test
import (
"context"
"net"
"sync"
"testing"
"time"
libp2p "github.com/libp2p/go-libp2p"
relay "github.com/libp2p/go-libp2p/p2p/host/relay"
ggio "github.com/gogo/protobuf/io"
cid "github.com/ipfs/go-cid"
autonat "github.com/libp2p/go-libp2p-autonat"
autonatpb "github.com/libp2p/go-libp2p-autonat/pb"
circuit "github.com/libp2p/go-libp2p-circuit"
host "github.com/libp2p/go-libp2p-host"
inet "github.com/libp2p/go-libp2p-net"
peer "github.com/libp2p/go-libp2p-peer"
pstore "github.com/libp2p/go-libp2p-peerstore"
routing "github.com/libp2p/go-libp2p-routing"
ma "github.com/multiformats/go-multiaddr"
manet "github.com/multiformats/go-multiaddr-net"
)
// test specific parameters
func init() {
autonat.AutoNATIdentifyDelay = 100 * time.Millisecond
autonat.AutoNATBootDelay = 1 * time.Second
relay.BootDelay = 1 * time.Second
manet.Private4 = []*net.IPNet{}
}
// mock routing
type mockRoutingTable struct {
mx sync.Mutex
providers map[string]map[peer.ID]pstore.PeerInfo
}
type mockRouting struct {
h host.Host
tab *mockRoutingTable
}
func newMockRoutingTable() *mockRoutingTable {
return &mockRoutingTable{providers: make(map[string]map[peer.ID]pstore.PeerInfo)}
}
func newMockRouting(h host.Host, tab *mockRoutingTable) *mockRouting {
return &mockRouting{h: h, tab: tab}
}
func (m *mockRouting) FindPeer(ctx context.Context, p peer.ID) (pstore.PeerInfo, error) {
return pstore.PeerInfo{}, routing.ErrNotFound
}
func (m *mockRouting) Provide(ctx context.Context, cid cid.Cid, bcast bool) error {
m.tab.mx.Lock()
defer m.tab.mx.Unlock()
pmap, ok := m.tab.providers[cid.String()]
if !ok {
pmap = make(map[peer.ID]pstore.PeerInfo)
m.tab.providers[cid.String()] = pmap
}
pmap[m.h.ID()] = pstore.PeerInfo{ID: m.h.ID(), Addrs: m.h.Addrs()}
return nil
}
func (m *mockRouting) FindProvidersAsync(ctx context.Context, cid cid.Cid, limit int) <-chan pstore.PeerInfo {
ch := make(chan pstore.PeerInfo)
go func() {
defer close(ch)
m.tab.mx.Lock()
defer m.tab.mx.Unlock()
pmap, ok := m.tab.providers[cid.String()]
if !ok {
return
}
for _, pi := range pmap {
select {
case ch <- pi:
case <-ctx.Done():
return
}
}
}()
return ch
}
// mock autonat
func makeAutoNATServicePrivate(ctx context.Context, t *testing.T) host.Host {
h, err := libp2p.New(ctx)
if err != nil {
t.Fatal(err)
}
h.SetStreamHandler(autonat.AutoNATProto, sayAutoNATPrivate)
return h
}
func sayAutoNATPrivate(s inet.Stream) {
defer s.Close()
w := ggio.NewDelimitedWriter(s)
res := autonatpb.Message{
Type: autonatpb.Message_DIAL_RESPONSE.Enum(),
DialResponse: newDialResponseError(autonatpb.Message_E_DIAL_ERROR, "no dialable addresses"),
}
w.WriteMsg(&res)
}
func newDialResponseError(status autonatpb.Message_ResponseStatus, text string) *autonatpb.Message_DialResponse {
dr := new(autonatpb.Message_DialResponse)
dr.Status = status.Enum()
dr.StatusText = &text
return dr
}
// connector
func connect(t *testing.T, a, b host.Host) {
pinfo := pstore.PeerInfo{ID: a.ID(), Addrs: a.Addrs()}
err := b.Connect(context.Background(), pinfo)
if err != nil {
t.Fatal(err)
}
}
// and the actual test!
func TestAutoRelay(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
mtab := newMockRoutingTable()
makeRouting := func(h host.Host) (routing.PeerRouting, error) {
mr := newMockRouting(h, mtab)
return mr, nil
}
h1 := makeAutoNATServicePrivate(ctx, t)
_, err := libp2p.New(ctx, libp2p.EnableRelay(circuit.OptHop), libp2p.Routing(makeRouting))
if err != nil {
t.Fatal(err)
}
h3, err := libp2p.New(ctx, libp2p.EnableRelay(), libp2p.Routing(makeRouting))
if err != nil {
t.Fatal(err)
}
h4, err := libp2p.New(ctx, libp2p.EnableRelay())
// verify that we don't advertise relay addrs initially
for _, addr := range h3.Addrs() {
_, err := addr.ValueForProtocol(circuit.P_CIRCUIT)
if err == nil {
t.Fatal("relay addr advertised before auto detection")
}
}
// connect to AutoNAT and let detection/discovery work its magic
connect(t, h1, h3)
time.Sleep(3 * time.Second)
// verify that we now advertise relay addrs (but not unspecific relay addrs)
unspecificRelay, err := ma.NewMultiaddr("/p2p-circuit")
if err != nil {
t.Fatal(err)
}
haveRelay := false
for _, addr := range h3.Addrs() {
if addr.Equal(unspecificRelay) {
t.Fatal("unspecific relay addr advertised")
}
_, err := addr.ValueForProtocol(circuit.P_CIRCUIT)
if err == nil {
haveRelay = true
}
}
if !haveRelay {
t.Fatal("No relay addrs advertised")
}
// verify that we can connect through the relay
var raddrs []ma.Multiaddr
for _, addr := range h3.Addrs() {
_, err := addr.ValueForProtocol(circuit.P_CIRCUIT)
if err == nil {
raddrs = append(raddrs, addr)
}
}
err = h4.Connect(ctx, pstore.PeerInfo{ID: h3.ID(), Addrs: raddrs})
if err != nil {
t.Fatal(err)
}
// verify that we have pushed relay addrs to connected peers
haveRelay = false
for _, addr := range h1.Peerstore().Addrs(h3.ID()) {
if addr.Equal(unspecificRelay) {
t.Fatal("unspecific relay addr advertised")
}
_, err := addr.ValueForProtocol(circuit.P_CIRCUIT)
if err == nil {
haveRelay = true
}
}
if !haveRelay {
t.Fatal("No relay addrs pushed")
}
}

30
p2p/host/relay/doc.go

@ -0,0 +1,30 @@
/*
The relay package contains host implementations that automatically
advertise relay addresses when the presence of NAT is detected. This
feature is dubbed `autorelay`.
Warning: the internal interfaces are unstable.
System Components:
- AutoNATService instances -- see https://github.com/libp2p/go-libp2p-autonat-svc
- One or more relays, instances of `RelayHost`
- The autorelayed hosts, instances of `AutoRelayHost`.
How it works:
- `AutoNATService` instances are instantiated in the
bootstrappers (or other well known publicly reachable hosts)
- `RelayHost`s are constructed with
`libp2p.New(libp2p.EnableRelay(circuit.OptHop), libp2p.Routing(makeDHT))`.
They provide Relay Hop services, and advertise through the DHT
in the `/libp2p/relay` namespace
- `AutoRelayHost`s are constructed with `libp2p.New(libp2p.Routing(makeDHT))`
They passively discover autonat service instances and test dialability of
their listen address set through them. When the presence of NAT is detected,
they discover relays through the DHT, connect to some of them and begin
advertising relay addresses. The new set of addresses is propagated to
connected peers through the `identify/push` protocol.
*/
package relay

7
p2p/host/relay/log.go

@ -0,0 +1,7 @@
package relay
import (
logging "github.com/ipfs/go-log"
)
var log = logging.Logger("autorelay")

36
p2p/host/relay/relay.go

@ -0,0 +1,36 @@
package relay
import (
"context"
basic "github.com/libp2p/go-libp2p/p2p/host/basic"
discovery "github.com/libp2p/go-libp2p-discovery"
host "github.com/libp2p/go-libp2p-host"
ma "github.com/multiformats/go-multiaddr"
)
// RelayHost is a Host that provides Relay services.
type RelayHost struct {
*basic.BasicHost
advertise discovery.Advertiser
addrsF basic.AddrsFactory
}
// New constructs a new RelayHost
func NewRelayHost(ctx context.Context, bhost *basic.BasicHost, advertise discovery.Advertiser) *RelayHost {
h := &RelayHost{
BasicHost: bhost,
addrsF: bhost.AddrsFactory,
advertise: advertise,
}
bhost.AddrsFactory = h.hostAddrs
discovery.Advertise(ctx, advertise, RelayRendezvous)
return h
}
func (h *RelayHost) hostAddrs(addrs []ma.Multiaddr) []ma.Multiaddr {
return filterUnspecificRelay(h.addrsF(addrs))
}
var _ host.Host = (*RelayHost)(nil)

25
p2p/protocol/identify/id.go

@ -3,6 +3,7 @@ package identify
import (
"context"
"sync"
"time"
pb "github.com/libp2p/go-libp2p/p2p/protocol/identify/pb"
@ -23,6 +24,9 @@ var log = logging.Logger("net/identify")
// ID is the protocol.ID of the Identify Service.
const ID = "/ipfs/id/1.0.0"
// IDPush is the protocol.ID of the Identify push protocol
const IDPush = "/ipfs/id/push/1.0.0"
// LibP2PVersion holds the current protocol version for a client running this code
// TODO(jbenet): fix the versioning mess.
const LibP2PVersion = "ipfs/0.1.0"
@ -60,6 +64,7 @@ func NewIDService(h host.Host) *IDService {
currid: make(map[inet.Conn]chan struct{}),
}
h.SetStreamHandler(ID, s.requestHandler)
h.SetStreamHandler(IDPush, s.pushHandler)
h.Network().Notify((*netNotifiee)(s))
return s
}
@ -138,6 +143,26 @@ func (ids *IDService) responseHandler(s inet.Stream) {
go inet.FullClose(s)
}
func (ids *IDService) pushHandler(s inet.Stream) {
ids.responseHandler(s)
}
func (ids *IDService) Push() {
for _, p := range ids.Host.Network().Peers() {
go func(p peer.ID) {
ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second)
defer cancel()
s, err := ids.Host.NewStream(ctx, p, IDPush)
if err != nil {
log.Debugf("error opening push stream: %s", err.Error())
return
}
ids.requestHandler(s)
}(p)
}
}
func (ids *IDService) populateMessage(mes *pb.Identify, c inet.Conn) {
// set protocols this node is currently handling

12
package.json

@ -226,6 +226,18 @@
"hash": "QmdxUuburamoF6zF9qjeQC4WYcWGbWuRmdLacMEsW8ioD8",
"name": "gogo-protobuf",
"version": "0.0.0"
},
{
"author": "vyzo",
"hash": "QmT8eNT96scr6wcrARzyxrCcsBAPKn9GkEx2TeXZniHiMP",
"name": "go-libp2p-discovery",
"version": "1.0.2"
},
{
"author": "vyzo",
"hash": "QmU2XQcwPxikg1jZqDBMFgLQVan7zjT2HtQWrWui3vbUVS",
"name": "go-libp2p-autonat",
"version": "1.0.2"
}
],
"gxVersion": "0.4.0",

Loading…
Cancel
Save