Browse Source

Merge pull request #2535 from libp2p/merge-gostream

move libp2p/go-libp2p-gostream to p2p/net/gostream
pull/2537/head
Marten Seemann 1 year ago
committed by GitHub
parent
commit
b427242ee2
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
  1. 14
      p2p/net/gostream/addr.go
  2. 43
      p2p/net/gostream/conn.go
  3. 19
      p2p/net/gostream/gostream.go
  4. 141
      p2p/net/gostream/gostream_test.go
  5. 71
      p2p/net/gostream/listener.go

14
p2p/net/gostream/addr.go

@ -0,0 +1,14 @@
package gostream
import "github.com/libp2p/go-libp2p/core/peer"
// addr implements net.Addr and holds a libp2p peer ID.
type addr struct{ id peer.ID }
// Network returns the name of the network that this address belongs to
// (libp2p).
func (a *addr) Network() string { return Network }
// String returns the peer ID of this address in string form
// (B58-encoded).
func (a *addr) String() string { return a.id.String() }

43
p2p/net/gostream/conn.go

@ -0,0 +1,43 @@
package gostream
import (
"context"
"net"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/protocol"
)
// conn is an implementation of net.Conn which wraps
// libp2p streams.
type conn struct {
network.Stream
}
// newConn creates a conn given a libp2p stream
func newConn(s network.Stream) net.Conn {
return &conn{s}
}
// LocalAddr returns the local network address.
func (c *conn) LocalAddr() net.Addr {
return &addr{c.Stream.Conn().LocalPeer()}
}
// RemoteAddr returns the remote network address.
func (c *conn) RemoteAddr() net.Addr {
return &addr{c.Stream.Conn().RemotePeer()}
}
// Dial opens a stream to the destination address
// (which should parseable to a peer ID) using the given
// host and returns it as a standard net.Conn.
func Dial(ctx context.Context, h host.Host, pid peer.ID, tag protocol.ID) (net.Conn, error) {
s, err := h.NewStream(ctx, pid, tag)
if err != nil {
return nil, err
}
return newConn(s), nil
}

19
p2p/net/gostream/gostream.go

@ -0,0 +1,19 @@
// Package gostream allows to replace the standard net stack in Go
// with [LibP2P](https://github.com/libp2p/libp2p) streams.
//
// Given a libp2p.Host, gostream provides Dial() and Listen() methods which
// return implementations of net.Conn and net.Listener.
//
// Instead of the regular "host:port" addressing, `gostream` uses a Peer ID,
// and rather than a raw TCP connection, gostream will use libp2p's net.Stream.
// This means your connections will take advantage of LibP2P's multi-routes,
// NAT transversal and stream multiplexing.
//
// Note that LibP2P hosts cannot dial to themselves, so there is no possibility
// of using the same Host as server and as client.
package gostream
// Network is the "net.Addr.Network()" name returned by
// addresses used by gostream connections. In turn, the "net.Addr.String()" will
// be a peer ID.
var Network = "libp2p"

141
p2p/net/gostream/gostream_test.go

@ -0,0 +1,141 @@
package gostream
import (
"bufio"
"context"
"io"
"testing"
"time"
"github.com/libp2p/go-libp2p"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/peerstore"
"github.com/libp2p/go-libp2p/core/protocol"
"github.com/multiformats/go-multiaddr"
)
// newHost illustrates how to build a libp2p host with secio using
// a randomly generated key-pair
func newHost(t *testing.T, listen multiaddr.Multiaddr) host.Host {
h, err := libp2p.New(
libp2p.ListenAddrs(listen),
)
if err != nil {
t.Fatal(err)
}
return h
}
func TestServerClient(t *testing.T) {
m1, _ := multiaddr.NewMultiaddr("/ip4/127.0.0.1/tcp/10000")
m2, _ := multiaddr.NewMultiaddr("/ip4/127.0.0.1/tcp/10001")
srvHost := newHost(t, m1)
clientHost := newHost(t, m2)
defer srvHost.Close()
defer clientHost.Close()
srvHost.Peerstore().AddAddrs(clientHost.ID(), clientHost.Addrs(), peerstore.PermanentAddrTTL)
clientHost.Peerstore().AddAddrs(srvHost.ID(), srvHost.Addrs(), peerstore.PermanentAddrTTL)
var tag protocol.ID = "/testitytest"
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
done := make(chan struct{})
go func() {
defer close(done)
listener, err := Listen(srvHost, tag)
if err != nil {
t.Error(err)
return
}
defer listener.Close()
if listener.Addr().String() != srvHost.ID().Pretty() {
t.Error("bad listener address")
return
}
servConn, err := listener.Accept()
if err != nil {
t.Error(err)
return
}
defer servConn.Close()
reader := bufio.NewReader(servConn)
for {
msg, err := reader.ReadString('\n')
if err == io.EOF {
break
}
if err != nil {
t.Error(err)
return
}
if msg != "is libp2p awesome?\n" {
t.Errorf("Bad incoming message: %s", msg)
return
}
_, err = servConn.Write([]byte("yes it is\n"))
if err != nil {
t.Error(err)
return
}
}
}()
clientConn, err := Dial(ctx, clientHost, srvHost.ID(), tag)
if err != nil {
t.Fatal(err)
}
if clientConn.LocalAddr().String() != clientHost.ID().Pretty() {
t.Fatal("Bad LocalAddr")
}
if clientConn.RemoteAddr().String() != srvHost.ID().Pretty() {
t.Fatal("Bad RemoteAddr")
}
if clientConn.LocalAddr().Network() != Network {
t.Fatal("Bad Network()")
}
err = clientConn.SetDeadline(time.Now().Add(time.Second))
if err != nil {
t.Fatal(err)
}
err = clientConn.SetReadDeadline(time.Now().Add(time.Second))
if err != nil {
t.Fatal(err)
}
err = clientConn.SetWriteDeadline(time.Now().Add(time.Second))
if err != nil {
t.Fatal(err)
}
_, err = clientConn.Write([]byte("is libp2p awesome?\n"))
if err != nil {
t.Fatal(err)
}
reader := bufio.NewReader(clientConn)
resp, err := reader.ReadString('\n')
if err != nil {
t.Fatal(err)
}
if string(resp) != "yes it is\n" {
t.Errorf("Bad response: %s", resp)
}
err = clientConn.Close()
if err != nil {
t.Fatal(err)
}
<-done
}

71
p2p/net/gostream/listener.go

@ -0,0 +1,71 @@
package gostream
import (
"context"
"net"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/protocol"
)
// listener is an implementation of net.Listener which handles
// http-tagged streams from a libp2p connection.
// A listener can be built with Listen()
type listener struct {
host host.Host
ctx context.Context
tag protocol.ID
cancel func()
streamCh chan network.Stream
}
// Accept returns the next a connection to this listener.
// It blocks if there are no connections. Under the hood,
// connections are libp2p streams.
func (l *listener) Accept() (net.Conn, error) {
select {
case s := <-l.streamCh:
return newConn(s), nil
case <-l.ctx.Done():
return nil, l.ctx.Err()
}
}
// Close terminates this listener. It will no longer handle any
// incoming streams
func (l *listener) Close() error {
l.cancel()
l.host.RemoveStreamHandler(l.tag)
return nil
}
// Addr returns the address for this listener, which is its libp2p Peer ID.
func (l *listener) Addr() net.Addr {
return &addr{l.host.ID()}
}
// Listen provides a standard net.Listener ready to accept "connections".
// Under the hood, these connections are libp2p streams tagged with the
// given protocol.ID.
func Listen(h host.Host, tag protocol.ID) (net.Listener, error) {
ctx, cancel := context.WithCancel(context.Background())
l := &listener{
host: h,
ctx: ctx,
cancel: cancel,
tag: tag,
streamCh: make(chan network.Stream),
}
h.SetStreamHandler(tag, func(s network.Stream) {
select {
case l.streamCh <- s:
case <-ctx.Done():
s.Reset()
}
})
return l, nil
}
Loading…
Cancel
Save