diff --git a/p2p/net/swarm/limiter.go b/p2p/net/swarm/limiter.go index 19fc84c32..4bbfb1186 100644 --- a/p2p/net/swarm/limiter.go +++ b/p2p/net/swarm/limiter.go @@ -131,18 +131,6 @@ func (dl *dialLimiter) AddDialJob(dj *dialJob) { go dl.executeDial(dj) } -func (dl *dialLimiter) schedulePerPeerDial(j *dialJob) { - if dl.activePerPeer[j.peer] >= dl.perPeerLimit { - wlist := dl.waitingOnPeerLimit[j.peer] - dl.waitingOnPeerLimit[j.peer] = append(wlist, j) - return - } - - // take second needed token and start dial! - dl.activePerPeer[j.peer]++ - go dl.executeDial(j) -} - // executeDial calls the dialFunc, and reports the result through the response // channel when finished. Once the response is sent it also releases all tokens // it held during the dial. diff --git a/p2p/net/swarm/swarm_addr.go b/p2p/net/swarm/swarm_addr.go index 85d4aafe8..39fbe02af 100644 --- a/p2p/net/swarm/swarm_addr.go +++ b/p2p/net/swarm/swarm_addr.go @@ -25,15 +25,3 @@ func (s *Swarm) ListenAddresses() []ma.Multiaddr { func (s *Swarm) InterfaceListenAddresses() ([]ma.Multiaddr, error) { return addrutil.ResolveUnspecifiedAddresses(s.ListenAddresses(), nil) } - -// checkNATWarning checks if our observed addresses differ. if so, -// informs the user that certain things might not work yet -func checkNATWarning(s *Swarm, observed ma.Multiaddr, expected ma.Multiaddr) { - listen, err := s.InterfaceListenAddresses() - if err != nil { - log.Debugf("Error retrieving swarm.InterfaceListenAddresses: %s", err) - return - } - - addrutil.CheckNATWarning(observed, expected, listen) -} diff --git a/p2p/net/swarm/swarm_net_test.go b/p2p/net/swarm/swarm_net_test.go index fddc8523f..1294e899b 100644 --- a/p2p/net/swarm/swarm_net_test.go +++ b/p2p/net/swarm/swarm_net_test.go @@ -35,9 +35,10 @@ func TestConnectednessCorrect(t *testing.T) { dial(nets[1], nets[2]) dial(nets[3], nets[2]) - // there's something wrong with dial, i think. it's not finishing - // completely. there must be some async stuff. - <-time.After(100 * time.Millisecond) + // The notifications for new connections get sent out asynchronously. + // There is the potential for a race condition here, so we sleep to ensure + // that they have been received. + time.Sleep(time.Millisecond * 100) // test those connected show up correctly @@ -51,20 +52,44 @@ func TestConnectednessCorrect(t *testing.T) { expectConnectedness(t, nets[0], nets[2], inet.NotConnected) expectConnectedness(t, nets[1], nets[3], inet.NotConnected) + if len(nets[0].Peers()) != 2 { + t.Fatal("expected net 0 to have two peers") + } + + if len(nets[2].Conns()) != 2 { + t.Fatal("expected net 2 to have two conns") + } + + if len(nets[1].ConnsToPeer(nets[3].LocalPeer())) != 0 { + t.Fatal("net 1 should have no connections to net 3") + } + + if err := nets[2].ClosePeer(nets[1].LocalPeer()); err != nil { + t.Fatal(err) + } + + expectConnectedness(t, nets[2], nets[1], inet.NotConnected) + for _, n := range nets { n.Close() } + + for _, n := range nets { + <-n.Process().Closed() + } } func expectConnectedness(t *testing.T, a, b inet.Network, expected inet.Connectedness) { - es := "%s is connected to %s, but Connectedness incorrect. %s %s" - if a.Connectedness(b.LocalPeer()) != expected { - t.Errorf(es, a, b, printConns(a), printConns(b)) + es := "%s is connected to %s, but Connectedness incorrect. %s %s %s" + atob := a.Connectedness(b.LocalPeer()) + btoa := b.Connectedness(a.LocalPeer()) + if atob != expected { + t.Errorf(es, a, b, printConns(a), printConns(b), atob) } // test symmetric case - if b.Connectedness(a.LocalPeer()) != expected { - t.Errorf(es, b, a, printConns(b), printConns(a)) + if btoa != expected { + t.Errorf(es, b, a, printConns(b), printConns(a), btoa) } } @@ -75,3 +100,66 @@ func printConns(n inet.Network) string { } return s } + +func TestNetworkOpenStream(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + nets := make([]inet.Network, 4) + for i := 0; i < 4; i++ { + nets[i] = testutil.GenSwarmNetwork(t, ctx) + } + + dial := func(a, b inet.Network) { + testutil.DivulgeAddresses(b, a) + if _, err := a.DialPeer(ctx, b.LocalPeer()); err != nil { + t.Fatalf("Failed to dial: %s", err) + } + } + + dial(nets[0], nets[1]) + dial(nets[0], nets[3]) + dial(nets[1], nets[2]) + + done := make(chan bool) + nets[1].SetStreamHandler(func(s inet.Stream) { + defer close(done) + defer s.Close() + + buf := make([]byte, 10) + _, err := s.Read(buf) + if err != nil { + t.Error(err) + return + } + if string(buf) != "hello ipfs" { + t.Error("got wrong message") + } + }) + + s, err := nets[0].NewStream(ctx, nets[1].LocalPeer()) + if err != nil { + t.Fatal(err) + } + + _, err = s.Write([]byte("hello ipfs")) + if err != nil { + t.Fatal(err) + } + + err = s.Close() + if err != nil { + t.Fatal(err) + } + + select { + case <-done: + case <-time.After(time.Millisecond * 100): + t.Fatal("timed out waiting on stream") + } + + _, err = nets[1].NewStream(ctx, nets[3].LocalPeer()) + if err == nil { + t.Fatal("expected stream open 1->3 to fail") + } +}