Browse Source

Merge pull request #22 from ipfs/feat/fallback-dialer

Add fallback dialer
pull/17/merge
Jeromy Johnson 9 years ago
parent
commit
be1f55801d
  1. 2
      .travis.yml
  2. 5
      p2p/net/conn/dial.go
  3. 2
      p2p/net/conn/interface.go
  4. 55
      p2p/net/swarm/dial_test.go
  5. 25
      p2p/net/swarm/swarm_test.go
  6. 64
      p2p/net/transport/fallback.go
  7. 135
      p2p/net/transport/transport_test.go

2
.travis.yml

@ -5,7 +5,7 @@ os:
language: go
go:
- 1.5.1
- 1.5.2
env:
- GO15VENDOREXPERIMENT=1

5
p2p/net/conn/dial.go

@ -22,6 +22,7 @@ func NewDialer(p peer.ID, pk ci.PrivKey, wrap WrapFunc) *Dialer {
LocalPeer: p,
PrivateKey: pk,
Wrapper: wrap,
fallback: new(transport.FallbackDialer),
}
}
@ -115,6 +116,10 @@ func (d *Dialer) subDialerForAddr(raddr ma.Multiaddr) transport.Dialer {
}
}
if d.fallback.Matches(raddr) {
return d.fallback
}
return nil
}

2
p2p/net/conn/interface.go

@ -65,6 +65,8 @@ type Dialer struct {
// Wrapper to wrap the raw connection (optional)
Wrapper WrapFunc
fallback transport.Dialer
}
// Listener is an object that can accept connections. It matches net.Listener

55
p2p/net/swarm/dial_test.go

@ -18,6 +18,61 @@ import (
ma "gx/ipfs/QmcobAGsCjYt5DXoq9et9L8yR8er7o7Cu3DTvpaq12jYSz/go-multiaddr"
)
func closeSwarms(swarms []*Swarm) {
for _, s := range swarms {
s.Close()
}
}
func TestBasicDial(t *testing.T) {
t.Parallel()
ctx := context.Background()
swarms := makeSwarms(ctx, t, 2)
defer closeSwarms(swarms)
s1 := swarms[0]
s2 := swarms[1]
s1.peers.AddAddrs(s2.local, s2.ListenAddresses(), peer.PermanentAddrTTL)
c, err := s1.Dial(ctx, s2.local)
if err != nil {
t.Fatal(err)
}
s, err := c.NewStream()
if err != nil {
t.Fatal(err)
}
s.Close()
}
func TestDialWithNoListeners(t *testing.T) {
t.Parallel()
ctx := context.Background()
s1 := makeDialOnlySwarm(ctx, t)
swarms := makeSwarms(ctx, t, 1)
defer closeSwarms(swarms)
s2 := swarms[0]
s1.peers.AddAddrs(s2.local, s2.ListenAddresses(), peer.PermanentAddrTTL)
c, err := s1.Dial(ctx, s2.local)
if err != nil {
t.Fatal(err)
}
s, err := c.NewStream()
if err != nil {
t.Fatal(err)
}
s.Close()
}
func acceptAndHang(l net.Listener) {
conns := make([]net.Conn, 0, 10)
for {

25
p2p/net/swarm/swarm_test.go

@ -24,31 +24,48 @@ func EchoStreamHandler(stream inet.Stream) {
// pull out the ipfs conn
c := stream.Conn()
log.Infof("%s ponging to %s", c.LocalPeer(), c.RemotePeer())
log.Errorf("%s ponging to %s", c.LocalPeer(), c.RemotePeer())
buf := make([]byte, 4)
for {
if _, err := stream.Read(buf); err != nil {
if err != io.EOF {
log.Info("ping receive error:", err)
log.Error("ping receive error:", err)
}
return
}
if !bytes.Equal(buf, []byte("ping")) {
log.Infof("ping receive error: ping != %s %v", buf, buf)
log.Errorf("ping receive error: ping != %s %v", buf, buf)
return
}
if _, err := stream.Write([]byte("pong")); err != nil {
log.Info("pond send error:", err)
log.Error("pond send error:", err)
return
}
}
}()
}
func makeDialOnlySwarm(ctx context.Context, t *testing.T) *Swarm {
id := testutil.RandIdentityOrFatal(t)
peerstore := peer.NewPeerstore()
peerstore.AddPubKey(id.ID(), id.PublicKey())
peerstore.AddPrivKey(id.ID(), id.PrivateKey())
swarm, err := NewSwarm(ctx, nil, id.ID(), peerstore, metrics.NewBandwidthCounter())
if err != nil {
t.Fatal(err)
}
swarm.SetStreamHandler(EchoStreamHandler)
return swarm
}
func makeSwarms(ctx context.Context, t *testing.T, num int) []*Swarm {
swarms := make([]*Swarm, 0, num)

64
p2p/net/transport/fallback.go

@ -0,0 +1,64 @@
package transport
import (
"fmt"
manet "gx/ipfs/QmQB7mNP3QE7b4zP2MQmsyJDqG5hzYE2CL8k1VyLWky2Ed/go-multiaddr-net"
mautp "gx/ipfs/QmQB7mNP3QE7b4zP2MQmsyJDqG5hzYE2CL8k1VyLWky2Ed/go-multiaddr-net/utp"
utp "gx/ipfs/QmVs3wq4cN64TFCxANzgSHjGPrjMnRnwPrxU8bqc7YP42s/utp"
mafmt "gx/ipfs/QmWLfU4tstw2aNcTykDm44xbSTCYJ9pUJwfhQCKGwckcHx/mafmt"
ma "gx/ipfs/QmcobAGsCjYt5DXoq9et9L8yR8er7o7Cu3DTvpaq12jYSz/go-multiaddr"
)
type FallbackDialer struct {
madialer manet.Dialer
}
func (fbd *FallbackDialer) Matches(a ma.Multiaddr) bool {
return mafmt.TCP.Matches(a) || mafmt.UTP.Matches(a)
}
func (fbd *FallbackDialer) Dial(a ma.Multiaddr) (Conn, error) {
if mafmt.TCP.Matches(a) {
return fbd.tcpDial(a)
}
if mafmt.UTP.Matches(a) {
return fbd.tcpDial(a)
}
return nil, fmt.Errorf("cannot dial %s with fallback dialer", a)
}
func (fbd *FallbackDialer) tcpDial(raddr ma.Multiaddr) (Conn, error) {
var c manet.Conn
var err error
c, err = fbd.madialer.Dial(raddr)
if err != nil {
return nil, err
}
return &connWrap{
Conn: c,
}, nil
}
func (fbd *FallbackDialer) utpDial(raddr ma.Multiaddr) (Conn, error) {
_, addr, err := manet.DialArgs(raddr)
if err != nil {
return nil, err
}
con, err := utp.Dial(addr)
if err != nil {
return nil, err
}
mnc, err := manet.WrapNetConn(&mautp.Conn{Conn: con})
if err != nil {
return nil, err
}
return &connWrap{
Conn: mnc,
}, nil
}

135
p2p/net/transport/transport_test.go

@ -0,0 +1,135 @@
package transport
import (
"fmt"
"io"
"testing"
ma "gx/ipfs/QmcobAGsCjYt5DXoq9et9L8yR8er7o7Cu3DTvpaq12jYSz/go-multiaddr"
)
func TestTcpTransport(t *testing.T) {
ta := NewTCPTransport()
tb := NewTCPTransport()
zero := "/ip4/127.0.0.1/tcp/0"
subtestTransport(t, ta, tb, zero)
}
func TestUtpTransport(t *testing.T) {
ta := NewUtpTransport()
tb := NewUtpTransport()
zero := "/ip4/127.0.0.1/udp/0/utp"
subtestTransport(t, ta, tb, zero)
}
func subtestTransport(t *testing.T, ta, tb Transport, addr string) {
maddr, err := ma.NewMultiaddr(addr)
if err != nil {
t.Fatal(err)
}
list, err := ta.Listen(maddr)
if err != nil {
t.Fatal(err)
}
dialer, err := tb.Dialer(maddr)
if err != nil {
t.Fatal(err)
}
accepted := make(chan Conn, 1)
errs := make(chan error, 1)
go func() {
b, err := list.Accept()
if err != nil {
errs <- err
return
}
accepted <- b
}()
a, err := dialer.Dial(list.Multiaddr())
if err != nil {
t.Fatal(err)
}
var b Conn
select {
case b = <-accepted:
case err := <-errs:
t.Fatal(err)
}
defer a.Close()
defer b.Close()
err = checkDataTransfer(a, b)
if err != nil {
t.Fatal(err)
}
}
func checkDataTransfer(a, b io.ReadWriter) error {
errs := make(chan error, 2)
data := []byte("this is some test data")
go func() {
n, err := a.Write(data)
if err != nil {
errs <- err
return
}
if n != len(data) {
errs <- fmt.Errorf("failed to write enough data (a->b)")
return
}
buf := make([]byte, len(data))
_, err = io.ReadFull(a, buf)
if err != nil {
errs <- err
return
}
errs <- nil
}()
go func() {
buf := make([]byte, len(data))
_, err := io.ReadFull(b, buf)
if err != nil {
errs <- err
return
}
n, err := b.Write(data)
if err != nil {
errs <- err
return
}
if n != len(data) {
errs <- fmt.Errorf("failed to write enough data (b->a)")
return
}
errs <- nil
}()
err := <-errs
if err != nil {
return err
}
err = <-errs
if err != nil {
return err
}
return nil
}
Loading…
Cancel
Save