diff --git a/config/config.go b/config/config.go index 8be5a4399..ff6fa765e 100644 --- a/config/config.go +++ b/config/config.go @@ -1,6 +1,7 @@ package config import ( + "context" "crypto/rand" "errors" "fmt" @@ -38,6 +39,7 @@ import ( ma "github.com/multiformats/go-multiaddr" madns "github.com/multiformats/go-multiaddr-dns" + "github.com/quic-go/quic-go" "go.uber.org/fx" "go.uber.org/fx/fxevent" ) @@ -190,20 +192,11 @@ func (cfg *Config) makeSwarm(eventBus event.Bus, enableMetrics bool) (*swarm.Swa return swarm.NewSwarm(pid, cfg.Peerstore, eventBus, opts...) } -func (cfg *Config) addTransports(h host.Host) error { - swrm, ok := h.Network().(transport.TransportNetwork) - if !ok { - // Should probably skip this if no transports. - return fmt.Errorf("swarm does not support transports") - } - +func (cfg *Config) addTransports() ([]fx.Option, error) { fxopts := []fx.Option{ fx.WithLogger(func() fxevent.Logger { return getFXLogger() }), fx.Provide(fx.Annotate(tptu.New, fx.ParamTags(`name:"security"`))), fx.Supply(cfg.Muxers), - fx.Supply(h.ID()), - fx.Provide(func() host.Host { return h }), - fx.Provide(func() crypto.PrivKey { return h.Peerstore().PrivKey(h.ID()) }), fx.Provide(func() connmgr.ConnectionGater { return cfg.ConnectionGater }), fx.Provide(func() pnet.PSK { return cfg.PSK }), fx.Provide(func() network.ResourceManager { return cfg.ResourceManager }), @@ -265,12 +258,21 @@ func (cfg *Config) addTransports(h host.Host) error { if cfg.QUICReuse != nil { fxopts = append(fxopts, cfg.QUICReuse...) } else { - fxopts = append(fxopts, fx.Provide(quicreuse.NewConnManager)) // TODO: close the ConnManager when shutting down the node + fxopts = append(fxopts, + fx.Provide(func(key quic.StatelessResetKey, tokenGenerator quic.TokenGeneratorKey, _ *swarm.Swarm, lifecycle fx.Lifecycle) (*quicreuse.ConnManager, error) { + cm, err := quicreuse.NewConnManager(key, tokenGenerator) + if err != nil { + return nil, err + } + lifecycle.Append(fx.StopHook(cm.Close)) + return cm, nil + }), + ) } fxopts = append(fxopts, fx.Invoke( fx.Annotate( - func(tpts []transport.Transport) error { + func(swrm *swarm.Swarm, tpts []transport.Transport) error { for _, t := range tpts { if err := swrm.AddTransport(t); err != nil { return err @@ -278,43 +280,16 @@ func (cfg *Config) addTransports(h host.Host) error { } return nil }, - fx.ParamTags(`group:"transport"`), + fx.ParamTags("", `group:"transport"`), )), ) if cfg.Relay { fxopts = append(fxopts, fx.Invoke(circuitv2.AddTransport)) } - app := fx.New(fxopts...) - if err := app.Err(); err != nil { - h.Close() - return err - } - return nil + return fxopts, nil } -// NewNode constructs a new libp2p Host from the Config. -// -// This function consumes the config. Do not reuse it (really!). -func (cfg *Config) NewNode() (host.Host, error) { - // If possible check that the resource manager conn limit is higher than the - // limit set in the conn manager. - if l, ok := cfg.ResourceManager.(connmgr.GetConnLimiter); ok { - err := cfg.ConnManager.CheckLimit(l) - if err != nil { - log.Warn(fmt.Sprintf("rcmgr limit conflicts with connmgr limit: %v", err)) - } - } - - eventBus := eventbus.NewBus(eventbus.WithMetricsTracer(eventbus.NewMetricsTracer(eventbus.WithRegisterer(cfg.PrometheusRegisterer)))) - swrm, err := cfg.makeSwarm(eventBus, !cfg.DisableMetrics) - if err != nil { - return nil, err - } - - if !cfg.DisableMetrics { - rcmgr.MustRegisterWith(cfg.PrometheusRegisterer) - } - +func (cfg *Config) newBasicHost(swrm *swarm.Swarm, eventBus event.Bus) (*bhost.BasicHost, error) { h, err := bhost.NewHost(swrm, &bhost.HostOpts{ EventBus: eventBus, ConnManager: cfg.ConnManager, @@ -331,10 +306,8 @@ func (cfg *Config) NewNode() (host.Host, error) { PrometheusRegisterer: cfg.PrometheusRegisterer, }) if err != nil { - swrm.Close() return nil, err } - if cfg.Relay { // If we've enabled the relay, we should filter out relay // addresses by default. @@ -345,60 +318,137 @@ func (cfg *Config) NewNode() (host.Host, error) { return oldFactory(autorelay.Filter(addrs)) } } + return h, nil +} - if err := cfg.addTransports(h); err != nil { - h.Close() - return nil, err +// NewNode constructs a new libp2p Host from the Config. +// +// This function consumes the config. Do not reuse it (really!). +func (cfg *Config) NewNode() (host.Host, error) { + if cfg.EnableAutoRelay && !cfg.Relay { + return nil, fmt.Errorf("cannot enable autorelay; relay is not enabled") + } + // If possible check that the resource manager conn limit is higher than the + // limit set in the conn manager. + if l, ok := cfg.ResourceManager.(connmgr.GetConnLimiter); ok { + err := cfg.ConnManager.CheckLimit(l) + if err != nil { + log.Warn(fmt.Sprintf("rcmgr limit conflicts with connmgr limit: %v", err)) + } } - // TODO: This method succeeds if listening on one address succeeds. We - // should probably fail if listening on *any* addr fails. - if err := h.Network().Listen(cfg.ListenAddrs...); err != nil { - h.Close() + if !cfg.DisableMetrics { + rcmgr.MustRegisterWith(cfg.PrometheusRegisterer) + } + + fxopts := []fx.Option{ + fx.Provide(func() event.Bus { + return eventbus.NewBus(eventbus.WithMetricsTracer(eventbus.NewMetricsTracer(eventbus.WithRegisterer(cfg.PrometheusRegisterer)))) + }), + fx.Provide(func(eventBus event.Bus, lifecycle fx.Lifecycle) (*swarm.Swarm, error) { + sw, err := cfg.makeSwarm(eventBus, !cfg.DisableMetrics) + if err != nil { + return nil, err + } + lifecycle.Append(fx.StopHook(sw.Close)) + return sw, nil + }), + // Make sure the swarm constructor depends on the quicreuse.ConnManager. + // That way, the ConnManager will be started before the swarm, and more importantly, + // the swarm will be stopped before the ConnManager. + fx.Decorate(func(sw *swarm.Swarm, _ *quicreuse.ConnManager, lifecycle fx.Lifecycle) *swarm.Swarm { + lifecycle.Append(fx.Hook{ + OnStart: func(context.Context) error { + // TODO: This method succeeds if listening on one address succeeds. We + // should probably fail if listening on *any* addr fails. + return sw.Listen(cfg.ListenAddrs...) + }, + OnStop: func(context.Context) error { + return sw.Close() + }, + }) + return sw + }), + fx.Provide(cfg.newBasicHost), + fx.Provide(func(h *bhost.BasicHost, lifecycle fx.Lifecycle) host.Host { + lifecycle.Append(fx.StartHook(h.Start)) + return h + }), + fx.Provide(func(h host.Host) peer.ID { return h.ID() }), + fx.Provide(func(h host.Host) crypto.PrivKey { return h.Peerstore().PrivKey(h.ID()) }), + } + transportOpts, err := cfg.addTransports() + if err != nil { return nil, err } + fxopts = append(fxopts, transportOpts...) // Configure routing and autorelay - var router routing.PeerRouting if cfg.Routing != nil { - router, err = cfg.Routing(h) - if err != nil { - h.Close() - return nil, err - } + fxopts = append(fxopts, + fx.Provide(cfg.Routing), + fx.Provide(func(h host.Host, router routing.PeerRouting) *routed.RoutedHost { + return routed.Wrap(h, router) + }), + ) } // Note: h.AddrsFactory may be changed by relayFinder, but non-relay version is // used by AutoNAT below. - var ar *autorelay.AutoRelay - addrF := h.AddrsFactory if cfg.EnableAutoRelay { - if !cfg.Relay { - h.Close() - return nil, fmt.Errorf("cannot enable autorelay; relay is not enabled") - } if !cfg.DisableMetrics { mt := autorelay.WithMetricsTracer( autorelay.NewMetricsTracer(autorelay.WithRegisterer(cfg.PrometheusRegisterer))) mtOpts := []autorelay.Option{mt} cfg.AutoRelayOpts = append(mtOpts, cfg.AutoRelayOpts...) } + fxopts = append(fxopts, + fx.Invoke(func(h *bhost.BasicHost, lifecycle fx.Lifecycle) (*autorelay.AutoRelay, error) { + ar, err := autorelay.NewAutoRelay(h, cfg.AutoRelayOpts...) + if err != nil { + return nil, err + } + lifecycle.Append(fx.StartStopHook(ar.Start, ar.Close)) + return ar, nil + }), + ) + } - ar, err = autorelay.NewAutoRelay(h, cfg.AutoRelayOpts...) - if err != nil { - return nil, err - } + var bh *bhost.BasicHost + fxopts = append(fxopts, fx.Invoke(func(bho *bhost.BasicHost) { bh = bho })) + + var rh *routed.RoutedHost + if cfg.Routing != nil { + fxopts = append(fxopts, fx.Invoke(func(bho *routed.RoutedHost) { rh = bho })) + } + + app := fx.New(fxopts...) + if err := app.Start(context.Background()); err != nil { + return nil, err } + if err := cfg.addAutoNAT(bh); err != nil { + rh.Close() + return nil, err + } + + if cfg.Routing != nil { + return &closableRoutedHost{App: app, RoutedHost: rh}, nil + } + return &closableBasicHost{App: app, BasicHost: bh}, nil +} + +func (cfg *Config) addAutoNAT(h *bhost.BasicHost) error { + addrF := h.AddrsFactory autonatOpts := []autonat.Option{ autonat.UsingAddresses(func() []ma.Multiaddr { return addrF(h.AllAddrs()) }), } if !cfg.DisableMetrics { - autonatOpts = append(autonatOpts, - autonat.WithMetricsTracer( - autonat.NewMetricsTracer(autonat.WithRegisterer(cfg.PrometheusRegisterer)))) + autonatOpts = append(autonatOpts, autonat.WithMetricsTracer( + autonat.NewMetricsTracer(autonat.WithRegisterer(cfg.PrometheusRegisterer)), + )) } if cfg.AutoNATConfig.ThrottleInterval != 0 { autonatOpts = append(autonatOpts, @@ -408,11 +458,11 @@ func (cfg *Config) NewNode() (host.Host, error) { if cfg.AutoNATConfig.EnableService { autonatPrivKey, _, err := crypto.GenerateEd25519Key(rand.Reader) if err != nil { - return nil, err + return err } ps, err := pstoremem.NewPeerstore() if err != nil { - return nil, err + return err } // Pull out the pieces of the config that we _actually_ care about. @@ -438,14 +488,23 @@ func (cfg *Config) NewNode() (host.Host, error) { dialer, err := autoNatCfg.makeSwarm(eventbus.NewBus(), false) if err != nil { - h.Close() - return nil, err + return err } dialerHost := blankhost.NewBlankHost(dialer) - if err := autoNatCfg.addTransports(dialerHost); err != nil { + fxopts, err := autoNatCfg.addTransports() + if err != nil { + dialerHost.Close() + return err + } + fxopts = append(fxopts, + fx.Supply(dialerHost.ID()), + fx.Supply(dialer), + fx.Provide(func() crypto.PrivKey { return autonatPrivKey }), + ) + app := fx.New(fxopts...) + if err := app.Err(); err != nil { dialerHost.Close() - h.Close() - return nil, err + return err } // NOTE: We're dropping the blank host here but that's fine. It // doesn't really _do_ anything and doesn't even need to be @@ -458,25 +517,10 @@ func (cfg *Config) NewNode() (host.Host, error) { autonat, err := autonat.New(h, autonatOpts...) if err != nil { - h.Close() - return nil, fmt.Errorf("cannot enable autorelay; autonat failed to start: %v", err) + return fmt.Errorf("cannot enable autorelay; autonat failed to start: %v", err) } h.SetAutoNat(autonat) - - // start the host background tasks - h.Start() - - var ho host.Host - ho = h - if router != nil { - ho = routed.Wrap(h, router) - } - if ar != nil { - arh := autorelay.NewAutoRelayHost(ho, ar) - arh.Start() - ho = arh - } - return ho, nil + return nil } // Option is a libp2p config option that can be given to the libp2p constructor diff --git a/config/host.go b/config/host.go new file mode 100644 index 000000000..ac61df2cd --- /dev/null +++ b/config/host.go @@ -0,0 +1,30 @@ +package config + +import ( + "context" + + basichost "github.com/libp2p/go-libp2p/p2p/host/basic" + routed "github.com/libp2p/go-libp2p/p2p/host/routed" + + "go.uber.org/fx" +) + +type closableBasicHost struct { + *fx.App + *basichost.BasicHost +} + +func (h *closableBasicHost) Close() error { + _ = h.App.Stop(context.Background()) + return h.BasicHost.Close() +} + +type closableRoutedHost struct { + *fx.App + *routed.RoutedHost +} + +func (h *closableRoutedHost) Close() error { + _ = h.App.Stop(context.Background()) + return h.RoutedHost.Close() +} diff --git a/leaky_tests/README.md b/leaky_tests/README.md new file mode 100644 index 000000000..398a91a8e --- /dev/null +++ b/leaky_tests/README.md @@ -0,0 +1 @@ +Tests that leak goroutines for various reasons. Mostly because libp2p node shutdown logic doesn't run if we fail to construct the node. diff --git a/leaky_tests/leaky_test.go b/leaky_tests/leaky_test.go new file mode 100644 index 000000000..fd7d164ac --- /dev/null +++ b/leaky_tests/leaky_test.go @@ -0,0 +1,26 @@ +package leaky_test + +import ( + "strings" + "testing" + + "github.com/libp2p/go-libp2p" + "github.com/stretchr/testify/require" +) + +func TestBadTransportConstructor(t *testing.T) { + h, err := libp2p.New(libp2p.Transport(func() {})) + if err == nil { + h.Close() + t.Fatal("expected an error") + } + if !strings.Contains(err.Error(), "_test.go") { + t.Error("expected error to contain debugging info") + } +} + +func TestAutoNATService(t *testing.T) { + h, err := libp2p.New(libp2p.EnableNATService()) + require.NoError(t, err) + h.Close() +} diff --git a/libp2p_test.go b/libp2p_test.go index 7e67a6224..fe05b5aae 100644 --- a/libp2p_test.go +++ b/libp2p_test.go @@ -2,15 +2,17 @@ package libp2p import ( "context" + "crypto/rand" + "errors" "fmt" "regexp" - "strings" "testing" "github.com/libp2p/go-libp2p/core/connmgr" "github.com/libp2p/go-libp2p/core/crypto" "github.com/libp2p/go-libp2p/core/host" "github.com/libp2p/go-libp2p/core/peer" + "github.com/libp2p/go-libp2p/core/routing" "github.com/libp2p/go-libp2p/core/transport" "github.com/libp2p/go-libp2p/p2p/net/swarm" "github.com/libp2p/go-libp2p/p2p/security/noise" @@ -18,6 +20,7 @@ import ( quic "github.com/libp2p/go-libp2p/p2p/transport/quic" "github.com/libp2p/go-libp2p/p2p/transport/tcp" webtransport "github.com/libp2p/go-libp2p/p2p/transport/webtransport" + "go.uber.org/goleak" ma "github.com/multiformats/go-multiaddr" "github.com/stretchr/testify/require" @@ -31,17 +34,6 @@ func TestNewHost(t *testing.T) { h.Close() } -func TestBadTransportConstructor(t *testing.T) { - h, err := New(Transport(func() {})) - if err == nil { - h.Close() - t.Fatal("expected an error") - } - if !strings.Contains(err.Error(), "libp2p_test.go") { - t.Error("expected error to contain debugging info") - } -} - func TestTransportConstructor(t *testing.T) { ctor := func( h host.Host, @@ -91,12 +83,6 @@ func TestInsecure(t *testing.T) { h.Close() } -func TestAutoNATService(t *testing.T) { - h, err := New(EnableNATService()) - require.NoError(t, err) - h.Close() -} - func TestDefaultListenAddrs(t *testing.T) { reTCP := regexp.MustCompile("/(ip)[4|6]/((0.0.0.0)|(::))/tcp/") reQUIC := regexp.MustCompile("/(ip)[4|6]/((0.0.0.0)|(::))/udp/([0-9]*)/quic-v1") @@ -356,3 +342,42 @@ func TestTransportCustomAddressWebTransportDoesNotStall(t *testing.T) { // We did not add the certhash to the multiaddr require.Equal(t, addrs[0], customAddr) } + +type mockPeerRouting struct { + queried []peer.ID +} + +func (r *mockPeerRouting) FindPeer(_ context.Context, id peer.ID) (peer.AddrInfo, error) { + r.queried = append(r.queried, id) + return peer.AddrInfo{}, errors.New("mock peer routing error") +} + +func TestRoutedHost(t *testing.T) { + mockRouter := &mockPeerRouting{} + h, err := New( + NoListenAddrs, + Routing(func(host.Host) (routing.PeerRouting, error) { return mockRouter, nil }), + DisableRelay(), + ) + require.NoError(t, err) + defer h.Close() + + priv, _, err := crypto.GenerateEd25519Key(rand.Reader) + require.NoError(t, err) + id, err := peer.IDFromPrivateKey(priv) + require.NoError(t, err) + require.EqualError(t, h.Connect(context.Background(), peer.AddrInfo{ID: id}), "mock peer routing error") + require.Equal(t, []peer.ID{id}, mockRouter.queried) +} + +func TestMain(m *testing.M) { + goleak.VerifyTestMain( + m, + // This will return eventually (5s timeout) but doesn't take a context. + goleak.IgnoreAnyFunction("github.com/koron/go-ssdp.Search"), + // Logging & Stats + goleak.IgnoreTopFunction("github.com/ipfs/go-log/v2/writer.(*MirrorWriter).logRoutine"), + goleak.IgnoreTopFunction("go.opencensus.io/stats/view.(*worker).start"), + goleak.IgnoreAnyFunction("github.com/jackpal/go-nat-pmp.(*Client).GetExternalAddress"), + ) +} diff --git a/p2p/host/autorelay/host.go b/p2p/host/autorelay/host.go deleted file mode 100644 index c6bd9c570..000000000 --- a/p2p/host/autorelay/host.go +++ /dev/null @@ -1,23 +0,0 @@ -package autorelay - -import ( - "github.com/libp2p/go-libp2p/core/host" -) - -type AutoRelayHost struct { - host.Host - ar *AutoRelay -} - -func (h *AutoRelayHost) Close() error { - _ = h.ar.Close() - return h.Host.Close() -} - -func (h *AutoRelayHost) Start() { - h.ar.Start() -} - -func NewAutoRelayHost(h host.Host, ar *AutoRelay) *AutoRelayHost { - return &AutoRelayHost{Host: h, ar: ar} -} diff --git a/p2p/host/basic/basic_host.go b/p2p/host/basic/basic_host.go index c148b5286..123293911 100644 --- a/p2p/host/basic/basic_host.go +++ b/p2p/host/basic/basic_host.go @@ -1029,7 +1029,6 @@ func (h *BasicHost) Close() error { _ = h.emitters.evtLocalProtocolsUpdated.Close() _ = h.emitters.evtLocalAddrsUpdated.Close() - h.Network().Close() h.psManager.Close() if h.Peerstore() != nil { diff --git a/p2p/net/swarm/swarm.go b/p2p/net/swarm/swarm.go index a76edce6c..f1a0f590d 100644 --- a/p2p/net/swarm/swarm.go +++ b/p2p/net/swarm/swarm.go @@ -281,9 +281,10 @@ func (s *Swarm) close() { // Lots of goroutines but we might as well do this in parallel. We want to shut down as fast as // possible. - + s.refs.Add(len(listeners)) for l := range listeners { go func(l transport.Listener) { + defer s.refs.Done() if err := l.Close(); err != nil && err != transport.ErrListenerClosed { log.Errorf("error when shutting down listener: %s", err) } diff --git a/p2p/protocol/identify/id_test.go b/p2p/protocol/identify/id_test.go index 02e425143..0f9203568 100644 --- a/p2p/protocol/identify/id_test.go +++ b/p2p/protocol/identify/id_test.go @@ -18,7 +18,6 @@ import ( "github.com/libp2p/go-libp2p/core/protocol" "github.com/libp2p/go-libp2p/core/record" coretest "github.com/libp2p/go-libp2p/core/test" - basichost "github.com/libp2p/go-libp2p/p2p/host/basic" blhost "github.com/libp2p/go-libp2p/p2p/host/blank" "github.com/libp2p/go-libp2p/p2p/host/eventbus" "github.com/libp2p/go-libp2p/p2p/host/peerstore/pstoremem" @@ -855,11 +854,10 @@ func TestOutOfOrderConnectedNotifs(t *testing.T) { // This callback may be called before identify's Connnected callback completes. If it does, the IdentifyWait should still finish successfully. h1.Network().Notify(&network.NotifyBundle{ ConnectedF: func(n network.Network, c network.Conn) { - bh1 := h1.(*basichost.BasicHost) - idChan := bh1.IDService().IdentifyWait(c) + idChan := h1.(interface{ IDService() identify.IDService }).IDService().IdentifyWait(c) go func() { <-idChan - protos, err := bh1.Peerstore().GetProtocols(h2.ID()) + protos, err := h1.Peerstore().GetProtocols(h2.ID()) if err != nil { errCh <- err } diff --git a/p2p/test/transport/rcmgr_test.go b/p2p/test/transport/rcmgr_test.go index 9a58a344f..c19a05dfc 100644 --- a/p2p/test/transport/rcmgr_test.go +++ b/p2p/test/transport/rcmgr_test.go @@ -15,6 +15,7 @@ import ( "github.com/libp2p/go-libp2p/core/peer" "github.com/libp2p/go-libp2p/p2p/protocol/identify" "github.com/libp2p/go-libp2p/p2p/protocol/ping" + "github.com/stretchr/testify/require" "go.uber.org/mock/gomock" )