Browse Source

Merge pull request #22 from libp2p/feat/consolidate-abstractions

Consolidate abstractions and core types into go-libp2p-core (#28)
pull/1463/head
Steven Allen 6 years ago
committed by GitHub
parent
commit
b1dd953814
  1. 12
      p2p/net/upgrader/conn.go
  2. 11
      p2p/net/upgrader/listener.go
  3. 52
      p2p/net/upgrader/listener_test.go
  4. 31
      p2p/net/upgrader/upgrader.go

12
p2p/net/upgrader/conn.go

@ -3,15 +3,15 @@ package stream
import (
"fmt"
inet "github.com/libp2p/go-libp2p-net"
transport "github.com/libp2p/go-libp2p-transport"
smux "github.com/libp2p/go-stream-muxer"
mux "github.com/libp2p/go-libp2p-core/mux"
network "github.com/libp2p/go-libp2p-core/network"
transport "github.com/libp2p/go-libp2p-core/transport"
)
type transportConn struct {
smux.Conn
inet.ConnMultiaddrs
inet.ConnSecurity
mux.MuxedConn
network.ConnMultiaddrs
network.ConnSecurity
transport transport.Transport
}

11
p2p/net/upgrader/listener.go

@ -7,24 +7,19 @@ import (
logging "github.com/ipfs/go-log"
tec "github.com/jbenet/go-temp-err-catcher"
transport "github.com/libp2p/go-libp2p-transport"
transport "github.com/libp2p/go-libp2p-core/transport"
manet "github.com/multiformats/go-multiaddr-net"
)
var log = logging.Logger("stream-upgrader")
type connErr struct {
conn transport.Conn
err error
}
type listener struct {
manet.Listener
transport transport.Transport
upgrader *Upgrader
incoming chan transport.Conn
incoming chan transport.CapableConn
err error
// Used for backpressure
@ -139,7 +134,7 @@ func (l *listener) handleIncoming() {
}
// Accept accepts a connection.
func (l *listener) Accept() (transport.Conn, error) {
func (l *listener) Accept() (transport.CapableConn, error) {
for c := range l.incoming {
// Could have been sitting there for a while.
if !c.IsClosed() {

52
p2p/net/upgrader/listener_test.go

@ -7,14 +7,14 @@ import (
"sync"
"time"
insecure "github.com/libp2p/go-conn-security/insecure"
peer "github.com/libp2p/go-libp2p-peer"
tpt "github.com/libp2p/go-libp2p-transport"
core "github.com/libp2p/go-libp2p-core"
mux "github.com/libp2p/go-libp2p-core/mux"
insecure "github.com/libp2p/go-libp2p-core/sec/insecure"
tpt "github.com/libp2p/go-libp2p-core/transport"
mplex "github.com/libp2p/go-libp2p-mplex"
st "github.com/libp2p/go-libp2p-transport-upgrader"
smux "github.com/libp2p/go-stream-muxer"
ma "github.com/multiformats/go-multiaddr"
manet "github.com/multiformats/go-multiaddr-net"
mplex "github.com/libp2p/go-libp2p-mplex"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
@ -24,7 +24,7 @@ import (
// It makes sure that this happens at the same time for client and server.
type negotiatingMuxer struct{}
func (m *negotiatingMuxer) NewConn(c net.Conn, isServer bool) (smux.Conn, error) {
func (m *negotiatingMuxer) NewConn(c net.Conn, isServer bool) (mux.MuxedConn, error) {
var err error
// run a fake muxer negotiation
if isServer {
@ -43,10 +43,10 @@ type blockingMuxer struct {
unblock chan struct{}
}
var _ smux.Transport = &blockingMuxer{}
var _ mux.Multiplexer = &blockingMuxer{}
func newBlockingMuxer() *blockingMuxer { return &blockingMuxer{unblock: make(chan struct{})} }
func (m *blockingMuxer) NewConn(c net.Conn, isServer bool) (smux.Conn, error) {
func (m *blockingMuxer) NewConn(c net.Conn, isServer bool) (mux.MuxedConn, error) {
<-m.unblock
return (&negotiatingMuxer{}).NewConn(c, isServer)
}
@ -55,21 +55,21 @@ func (m *blockingMuxer) Unblock() { close(m.unblock) }
// errorMuxer is a muxer that errors while setting up
type errorMuxer struct{}
var _ smux.Transport = &errorMuxer{}
var _ mux.Multiplexer = &errorMuxer{}
func (m *errorMuxer) NewConn(c net.Conn, isServer bool) (smux.Conn, error) {
func (m *errorMuxer) NewConn(c net.Conn, isServer bool) (mux.MuxedConn, error) {
return nil, errors.New("mux error")
}
var _ = Describe("Listener", func() {
var (
defaultUpgrader = &st.Upgrader{
Secure: insecure.New(peer.ID(1)),
Secure: insecure.New(core.PeerID(1)),
Muxer: &negotiatingMuxer{},
}
)
testConn := func(clientConn, serverConn tpt.Conn) {
testConn := func(clientConn, serverConn tpt.CapableConn) {
cstr, err := clientConn.OpenStream()
ExpectWithOffset(0, err).ToNot(HaveOccurred())
_, err = cstr.Write([]byte("foobar"))
@ -90,7 +90,7 @@ var _ = Describe("Listener", func() {
return upgrader.UpgradeListener(nil, ln)
}
dial := func(upgrader *st.Upgrader, raddr ma.Multiaddr, p peer.ID) (tpt.Conn, error) {
dial := func(upgrader *st.Upgrader, raddr ma.Multiaddr, p core.PeerID) (tpt.CapableConn, error) {
macon, err := manet.Dial(raddr)
if err != nil {
return nil, err
@ -105,7 +105,7 @@ var _ = Describe("Listener", func() {
It("accepts a single connection", func() {
ln := createListener(defaultUpgrader)
defer ln.Close()
cconn, err := dial(defaultUpgrader, ln.Multiaddr(), peer.ID(1))
cconn, err := dial(defaultUpgrader, ln.Multiaddr(), core.PeerID(1))
Expect(err).ToNot(HaveOccurred())
sconn, err := ln.Accept()
Expect(err).ToNot(HaveOccurred())
@ -117,7 +117,7 @@ var _ = Describe("Listener", func() {
defer ln.Close()
const num = 10
for i := 0; i < 10; i++ {
cconn, err := dial(defaultUpgrader, ln.Multiaddr(), peer.ID(1))
cconn, err := dial(defaultUpgrader, ln.Multiaddr(), core.PeerID(1))
Expect(err).ToNot(HaveOccurred())
sconn, err := ln.Accept()
Expect(err).ToNot(HaveOccurred())
@ -130,7 +130,7 @@ var _ = Describe("Listener", func() {
tpt.AcceptTimeout = timeout
ln := createListener(defaultUpgrader)
defer ln.Close()
conn, err := dial(defaultUpgrader, ln.Multiaddr(), peer.ID(2))
conn, err := dial(defaultUpgrader, ln.Multiaddr(), core.PeerID(2))
if !Expect(err).ToNot(HaveOccurred()) {
return
}
@ -150,7 +150,7 @@ var _ = Describe("Listener", func() {
It("doesn't accept connections that fail to setup", func() {
upgrader := &st.Upgrader{
Secure: insecure.New(peer.ID(1)),
Secure: insecure.New(core.PeerID(1)),
Muxer: &errorMuxer{},
}
ln := createListener(upgrader)
@ -163,7 +163,7 @@ var _ = Describe("Listener", func() {
}
close(done)
}()
conn, err := dial(defaultUpgrader, ln.Multiaddr(), peer.ID(2))
conn, err := dial(defaultUpgrader, ln.Multiaddr(), core.PeerID(2))
if !Expect(err).To(HaveOccurred()) {
conn.Close()
}
@ -178,11 +178,11 @@ var _ = Describe("Listener", func() {
num := 3 * st.AcceptQueueLength
bm := newBlockingMuxer()
upgrader := &st.Upgrader{
Secure: insecure.New(peer.ID(1)),
Secure: insecure.New(core.PeerID(1)),
Muxer: bm,
}
ln := createListener(upgrader)
accepted := make(chan tpt.Conn, num)
accepted := make(chan tpt.CapableConn, num)
go func() {
defer GinkgoRecover()
for {
@ -200,7 +200,7 @@ var _ = Describe("Listener", func() {
wg.Add(1)
go func() {
defer GinkgoRecover()
conn, err := dial(defaultUpgrader, ln.Multiaddr(), peer.ID(2))
conn, err := dial(defaultUpgrader, ln.Multiaddr(), core.PeerID(2))
if Expect(err).ToNot(HaveOccurred()) {
stream, err := conn.AcceptStream() // wait for conn to be accepted.
if !Expect(err).To(HaveOccurred()) {
@ -223,11 +223,11 @@ var _ = Describe("Listener", func() {
defer ln.Close()
// setup AcceptQueueLength connections, but don't accept any of them
dialed := make(chan tpt.Conn, 10*st.AcceptQueueLength) // used as a thread-safe counter
dialed := make(chan tpt.CapableConn, 10*st.AcceptQueueLength) // used as a thread-safe counter
for i := 0; i < st.AcceptQueueLength; i++ {
go func() {
defer GinkgoRecover()
conn, err := dial(defaultUpgrader, ln.Multiaddr(), peer.ID(2))
conn, err := dial(defaultUpgrader, ln.Multiaddr(), core.PeerID(2))
Expect(err).ToNot(HaveOccurred())
dialed <- conn
}()
@ -236,7 +236,7 @@ var _ = Describe("Listener", func() {
// dial a new connection. This connection should not complete setup, since the queue is full
go func() {
defer GinkgoRecover()
conn, err := dial(defaultUpgrader, ln.Multiaddr(), peer.ID(2))
conn, err := dial(defaultUpgrader, ln.Multiaddr(), core.PeerID(2))
Expect(err).ToNot(HaveOccurred())
dialed <- conn
}()
@ -279,7 +279,7 @@ var _ = Describe("Listener", func() {
It("doesn't accept new connections when it is closed", func() {
ln := createListener(defaultUpgrader)
Expect(ln.Close()).To(Succeed())
conn, err := dial(defaultUpgrader, ln.Multiaddr(), peer.ID(1))
conn, err := dial(defaultUpgrader, ln.Multiaddr(), core.PeerID(1))
if !Expect(err).To(HaveOccurred()) {
conn.Close()
}
@ -287,7 +287,7 @@ var _ = Describe("Listener", func() {
It("closes incoming connections that have not yet been accepted", func() {
ln := createListener(defaultUpgrader)
conn, err := dial(defaultUpgrader, ln.Multiaddr(), peer.ID(2))
conn, err := dial(defaultUpgrader, ln.Multiaddr(), core.PeerID(2))
if !Expect(err).ToNot(HaveOccurred()) {
ln.Close()
return

31
p2p/net/upgrader/upgrader.go

@ -6,12 +6,12 @@ import (
"fmt"
"net"
ss "github.com/libp2p/go-conn-security"
pnet "github.com/libp2p/go-libp2p-interface-pnet"
peer "github.com/libp2p/go-libp2p-peer"
transport "github.com/libp2p/go-libp2p-transport"
core "github.com/libp2p/go-libp2p-core"
mux "github.com/libp2p/go-libp2p-core/mux"
pnet "github.com/libp2p/go-libp2p-core/pnet"
sec "github.com/libp2p/go-libp2p-core/sec"
transport "github.com/libp2p/go-libp2p-core/transport"
filter "github.com/libp2p/go-maddr-filter"
smux "github.com/libp2p/go-stream-muxer"
manet "github.com/multiformats/go-multiaddr-net"
)
@ -26,8 +26,8 @@ var AcceptQueueLength = 16
// to a full transport connection (secure and multiplexed).
type Upgrader struct {
Protector pnet.Protector
Secure ss.Transport
Muxer smux.Transport
Secure sec.SecureTransport
Muxer mux.Multiplexer
Filters *filter.Filters
}
@ -39,7 +39,7 @@ func (u *Upgrader) UpgradeListener(t transport.Transport, list manet.Listener) t
upgrader: u,
transport: t,
threshold: newThreshold(AcceptQueueLength),
incoming: make(chan transport.Conn),
incoming: make(chan transport.CapableConn),
cancel: cancel,
ctx: ctx,
}
@ -49,7 +49,7 @@ func (u *Upgrader) UpgradeListener(t transport.Transport, list manet.Listener) t
// UpgradeOutbound upgrades the given outbound multiaddr-net connection into a
// full libp2p-transport connection.
func (u *Upgrader) UpgradeOutbound(ctx context.Context, t transport.Transport, maconn manet.Conn, p peer.ID) (transport.Conn, error) {
func (u *Upgrader) UpgradeOutbound(ctx context.Context, t transport.Transport, maconn manet.Conn, p core.PeerID) (transport.CapableConn, error) {
if p == "" {
return nil, ErrNilPeer
}
@ -58,11 +58,11 @@ func (u *Upgrader) UpgradeOutbound(ctx context.Context, t transport.Transport, m
// UpgradeInbound upgrades the given inbound multiaddr-net connection into a
// full libp2p-transport connection.
func (u *Upgrader) UpgradeInbound(ctx context.Context, t transport.Transport, maconn manet.Conn) (transport.Conn, error) {
func (u *Upgrader) UpgradeInbound(ctx context.Context, t transport.Transport, maconn manet.Conn) (transport.CapableConn, error) {
return u.upgrade(ctx, t, maconn, "")
}
func (u *Upgrader) upgrade(ctx context.Context, t transport.Transport, maconn manet.Conn, p peer.ID) (transport.Conn, error) {
func (u *Upgrader) upgrade(ctx context.Context, t transport.Transport, maconn manet.Conn, p core.PeerID) (transport.CapableConn, error) {
if u.Filters != nil && u.Filters.AddrBlocked(maconn.RemoteMultiaddr()) {
log.Debugf("blocked connection from %s", maconn.RemoteMultiaddr())
maconn.Close()
@ -78,6 +78,7 @@ func (u *Upgrader) upgrade(ctx context.Context, t transport.Transport, maconn ma
}
conn = pconn
} else if pnet.ForcePrivateNetwork {
conn.Close()
log.Error("tried to dial with no Private Network Protector but usage" +
" of Private Networks is forced by the enviroment")
return nil, pnet.ErrNotInPrivateNetwork
@ -93,25 +94,25 @@ func (u *Upgrader) upgrade(ctx context.Context, t transport.Transport, maconn ma
return nil, fmt.Errorf("failed to negotiate security stream multiplexer: %s", err)
}
return &transportConn{
Conn: smconn,
MuxedConn: smconn,
ConnMultiaddrs: maconn,
ConnSecurity: sconn,
transport: t,
}, nil
}
func (u *Upgrader) setupSecurity(ctx context.Context, conn net.Conn, p peer.ID) (ss.Conn, error) {
func (u *Upgrader) setupSecurity(ctx context.Context, conn net.Conn, p core.PeerID) (sec.SecureConn, error) {
if p == "" {
return u.Secure.SecureInbound(ctx, conn)
}
return u.Secure.SecureOutbound(ctx, conn, p)
}
func (u *Upgrader) setupMuxer(ctx context.Context, conn net.Conn, p peer.ID) (smux.Conn, error) {
func (u *Upgrader) setupMuxer(ctx context.Context, conn net.Conn, p core.PeerID) (mux.MuxedConn, error) {
// TODO: The muxer should take a context.
done := make(chan struct{})
var smconn smux.Conn
var smconn mux.MuxedConn
var err error
go func() {
defer close(done)

Loading…
Cancel
Save