mirror of https://github.com/libp2p/go-libp2p.git
Steven Allen
6 years ago
25 changed files with 4 additions and 1972 deletions
@ -1,15 +1,3 @@ |
|||
# go-libp2p Examples |
|||
|
|||
# `go-libp2p` examples and tutorials |
|||
|
|||
In this folder, you can find a variety of examples to help you get started in using go-libp2p. Every example as a specific purpose and some of each incorporate a full tutorial that you can follow through, helping you expand your knowledge about libp2p and p2p networks in general. |
|||
|
|||
Let us know if you find any issue or if you want to contribute and add a new tutorial, feel welcome to submit a pr, thank you! |
|||
|
|||
## Examples and Tutorials |
|||
|
|||
- [The libp2p 'host'](./libp2p-host) |
|||
- [Building an http proxy with libp2p](./http-proxy) |
|||
- [Protocol Multiplexing with multicodecs](./protocol-multiplexing-with-multicodecs) |
|||
- [An echo host](./echo) |
|||
- [Multicodecs with protobufs](./multipro) |
|||
- [P2P chat application](./chat) |
|||
The go-libp2p examples have moved to [go-libp2p-examples](https://github.com/libp2p/go-libp2p-examples). |
|||
|
@ -1,49 +0,0 @@ |
|||
# p2p chat app with libp2p |
|||
|
|||
This program demonstrates a simple p2p chat application. It can work between two peers if |
|||
1. Both have private IP address (same network). |
|||
2. At least one of them has a public IP address. |
|||
|
|||
Assume if 'A' and 'B' are on different networks host 'A' may or may not have a public IP address but host 'B' has one. |
|||
|
|||
Usage: Run `./chat -sp <SOURCE_PORT>` on host 'B' where <SOURCE_PORT> can be any port number. Now run `./chat -d <MULTIADDR_B>` on node 'A' [`<MULTIADDR_B>` is multiaddress of host 'B' which can be obtained from host 'B' console]. |
|||
|
|||
## Build |
|||
|
|||
To build the example, first run `make deps` in the root directory. |
|||
|
|||
``` |
|||
> make deps |
|||
> go build ./examples/chat |
|||
``` |
|||
|
|||
## Usage |
|||
|
|||
On node 'B' |
|||
|
|||
``` |
|||
> ./chat -sp 3001 |
|||
Run ./chat -d /ip4/127.0.0.1/tcp/3001/ipfs/QmdXGaeGiVA745XorV1jr11RHxB9z4fqykm6xCUPX1aTJo |
|||
|
|||
2018/02/27 01:21:32 Got a new stream! |
|||
> hi (received messages in green colour) |
|||
> hello (sent messages in white colour) |
|||
> no |
|||
``` |
|||
|
|||
On node 'A'. Replace 127.0.0.1 with <PUBLIC_IP> if node 'B' has one. |
|||
|
|||
``` |
|||
> ./chat -d /ip4/127.0.0.1/tcp/3001/ipfs/QmdXGaeGiVA745XorV1jr11RHxB9z4fqykm6xCUPX1aTJo |
|||
Run ./chat -d /ip4/127.0.0.1/tcp/3001/ipfs/QmdXGaeGiVA745XorV1jr11RHxB9z4fqykm6xCUPX1aTJo |
|||
|
|||
This node's multiaddress: |
|||
/ip4/0.0.0.0/tcp/0/ipfs/QmWVx9NwsgaVWMRHNCpesq1WQAw2T3JurjGDNeVNWifPS7 |
|||
> hi |
|||
> hello |
|||
``` |
|||
|
|||
**NOTE: debug mode is enabled by default, debug mode will always generate same node id (on each node) on every execution. Disable debug using `--debug false` flag while running your executable.** |
|||
|
|||
## Authors |
|||
1. Abhishek Upperwal |
@ -1,221 +0,0 @@ |
|||
/* |
|||
* |
|||
* The MIT License (MIT) |
|||
* |
|||
* Copyright (c) 2014 Juan Batiz-Benet |
|||
* |
|||
* Permission is hereby granted, free of charge, to any person obtaining a copy |
|||
* of this software and associated documentation files (the "Software"), to deal |
|||
* in the Software without restriction, including without limitation the rights |
|||
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell |
|||
* copies of the Software, and to permit persons to whom the Software is |
|||
* furnished to do so, subject to the following conditions: |
|||
* |
|||
* The above copyright notice and this permission notice shall be included in |
|||
* all copies or substantial portions of the Software. |
|||
* |
|||
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR |
|||
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, |
|||
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE |
|||
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER |
|||
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, |
|||
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN |
|||
* THE SOFTWARE. |
|||
* |
|||
* This program demonstrate a simple chat application using p2p communication. |
|||
* |
|||
*/ |
|||
|
|||
package main |
|||
|
|||
import ( |
|||
"bufio" |
|||
"context" |
|||
"crypto/rand" |
|||
"flag" |
|||
"fmt" |
|||
"io" |
|||
"log" |
|||
mrand "math/rand" |
|||
"os" |
|||
|
|||
"github.com/libp2p/go-libp2p" |
|||
|
|||
"github.com/libp2p/go-libp2p-crypto" |
|||
"github.com/libp2p/go-libp2p-host" |
|||
"github.com/libp2p/go-libp2p-net" |
|||
"github.com/libp2p/go-libp2p-peer" |
|||
"github.com/libp2p/go-libp2p-peerstore" |
|||
"github.com/multiformats/go-multiaddr" |
|||
) |
|||
|
|||
/* |
|||
* addAddrToPeerstore parses a peer multiaddress and adds |
|||
* it to the given host's peerstore, so it knows how to |
|||
* contact it. It returns the peer ID of the remote peer. |
|||
* @credit examples/http-proxy/proxy.go |
|||
*/ |
|||
func addAddrToPeerstore(h host.Host, addr string) peer.ID { |
|||
// The following code extracts target's the peer ID from the
|
|||
// given multiaddress
|
|||
ipfsaddr, err := multiaddr.NewMultiaddr(addr) |
|||
if err != nil { |
|||
log.Fatalln(err) |
|||
} |
|||
pid, err := ipfsaddr.ValueForProtocol(multiaddr.P_IPFS) |
|||
if err != nil { |
|||
log.Fatalln(err) |
|||
} |
|||
|
|||
peerid, err := peer.IDB58Decode(pid) |
|||
if err != nil { |
|||
log.Fatalln(err) |
|||
} |
|||
|
|||
// Decapsulate the /ipfs/<peerID> part from the target
|
|||
// /ip4/<a.b.c.d>/ipfs/<peer> becomes /ip4/<a.b.c.d>
|
|||
targetPeerAddr, _ := multiaddr.NewMultiaddr( |
|||
fmt.Sprintf("/ipfs/%s", peer.IDB58Encode(peerid))) |
|||
targetAddr := ipfsaddr.Decapsulate(targetPeerAddr) |
|||
|
|||
// We have a peer ID and a targetAddr so we add
|
|||
// it to the peerstore so LibP2P knows how to contact it
|
|||
h.Peerstore().AddAddr(peerid, targetAddr, peerstore.PermanentAddrTTL) |
|||
return peerid |
|||
} |
|||
|
|||
func handleStream(s net.Stream) { |
|||
log.Println("Got a new stream!") |
|||
|
|||
// Create a buffer stream for non blocking read and write.
|
|||
rw := bufio.NewReadWriter(bufio.NewReader(s), bufio.NewWriter(s)) |
|||
|
|||
go readData(rw) |
|||
go writeData(rw) |
|||
|
|||
// stream 's' will stay open until you close it (or the other side closes it).
|
|||
} |
|||
func readData(rw *bufio.ReadWriter) { |
|||
for { |
|||
str, _ := rw.ReadString('\n') |
|||
|
|||
if str == "" { |
|||
return |
|||
} |
|||
if str != "\n" { |
|||
// Green console colour: \x1b[32m
|
|||
// Reset console colour: \x1b[0m
|
|||
fmt.Printf("\x1b[32m%s\x1b[0m> ", str) |
|||
} |
|||
|
|||
} |
|||
} |
|||
|
|||
func writeData(rw *bufio.ReadWriter) { |
|||
stdReader := bufio.NewReader(os.Stdin) |
|||
|
|||
for { |
|||
fmt.Print("> ") |
|||
sendData, err := stdReader.ReadString('\n') |
|||
|
|||
if err != nil { |
|||
panic(err) |
|||
} |
|||
|
|||
rw.WriteString(fmt.Sprintf("%s\n", sendData)) |
|||
rw.Flush() |
|||
} |
|||
|
|||
} |
|||
|
|||
func main() { |
|||
|
|||
sourcePort := flag.Int("sp", 0, "Source port number") |
|||
dest := flag.String("d", "", "Dest MultiAddr String") |
|||
help := flag.Bool("help", false, "Display Help") |
|||
debug := flag.Bool("debug", true, "Debug generated same node id on every execution.") |
|||
|
|||
flag.Parse() |
|||
|
|||
if *help { |
|||
fmt.Printf("This program demonstrates a simple p2p chat application using libp2p\n\n") |
|||
fmt.Printf("Usage: Run './chat -sp <SOURCE_PORT>' where <SOURCE_PORT> can be any port number. Now run './chat -d <MULTIADDR>' where <MULTIADDR> is multiaddress of previous listener host.\n") |
|||
|
|||
os.Exit(0) |
|||
} |
|||
|
|||
// If debug is enabled used constant random source else cryptographic randomness.
|
|||
var r io.Reader |
|||
if *debug { |
|||
// Constant random source. This will always generate the same host ID on multiple execution.
|
|||
// Don't do this in production code.
|
|||
r = mrand.New(mrand.NewSource(int64(*sourcePort))) |
|||
} else { |
|||
r = rand.Reader |
|||
} |
|||
|
|||
// Creates a new RSA key pair for this host
|
|||
prvKey, _, err := crypto.GenerateKeyPairWithReader(crypto.RSA, 2048, r) |
|||
|
|||
if err != nil { |
|||
panic(err) |
|||
} |
|||
|
|||
// 0.0.0.0 will listen on any interface device
|
|||
sourceMultiAddr, _ := multiaddr.NewMultiaddr(fmt.Sprintf("/ip4/0.0.0.0/tcp/%d", *sourcePort)) |
|||
|
|||
// libp2p.New constructs a new libp2p Host.
|
|||
// Other options can be added here.
|
|||
host, err := libp2p.New( |
|||
context.Background(), |
|||
libp2p.ListenAddrs(sourceMultiAddr), |
|||
libp2p.Identity(prvKey), |
|||
) |
|||
|
|||
if err != nil { |
|||
panic(err) |
|||
} |
|||
|
|||
if *dest == "" { |
|||
// Set a function as stream handler.
|
|||
// This function is called when a peer initiate a connection and starts a stream with this peer.
|
|||
// Only applicable on the receiving side.
|
|||
host.SetStreamHandler("/chat/1.0.0", handleStream) |
|||
|
|||
fmt.Printf("Run './chat -d /ip4/127.0.0.1/tcp/%d/ipfs/%s' on another console.\n You can replace 127.0.0.1 with public IP as well.\n", *sourcePort, host.ID().Pretty()) |
|||
fmt.Printf("\nWaiting for incoming connection\n\n") |
|||
// Hang forever
|
|||
<-make(chan struct{}) |
|||
|
|||
} else { |
|||
|
|||
// Add destination peer multiaddress in the peerstore.
|
|||
// This will be used during connection and stream creation by libp2p.
|
|||
peerID := addAddrToPeerstore(host, *dest) |
|||
|
|||
fmt.Println("This node's multiaddress: ") |
|||
// IP will be 0.0.0.0 (listen on any interface) and port will be 0 (choose one for me).
|
|||
// Although this node will not listen for any connection. It will just initiate a connect with
|
|||
// one of its peer and use that stream to communicate.
|
|||
fmt.Printf("%s/ipfs/%s\n", sourceMultiAddr, host.ID().Pretty()) |
|||
|
|||
// Start a stream with peer with peer Id: 'peerId'.
|
|||
// Multiaddress of the destination peer is fetched from the peerstore using 'peerId'.
|
|||
s, err := host.NewStream(context.Background(), peerID, "/chat/1.0.0") |
|||
|
|||
if err != nil { |
|||
panic(err) |
|||
} |
|||
|
|||
// Create a buffered stream so that read and writes are non blocking.
|
|||
rw := bufio.NewReadWriter(bufio.NewReader(s), bufio.NewWriter(s)) |
|||
|
|||
// Create a thread to read and write data.
|
|||
go writeData(rw) |
|||
go readData(rw) |
|||
|
|||
// Hang forever.
|
|||
select {} |
|||
|
|||
} |
|||
} |
@ -1,52 +0,0 @@ |
|||
# Echo client/server with libp2p |
|||
|
|||
This is an example that quickly shows how to use the `go-libp2p` stack, including Host/Basichost, Network/Swarm, Streams, Peerstores and Multiaddresses. |
|||
|
|||
This example can be started in either listen mode, or dial mode. |
|||
|
|||
In listen mode, it will sit and wait for incoming connections on the `/echo/1.0.0` protocol. Whenever it receives a stream, it will write the message `"Hello, world!"` over the stream and close it. |
|||
|
|||
In dial mode, the node will start up, connect to the given address, open a stream to the target peer, and read a message on the protocol `/echo/1.0.0`. |
|||
|
|||
## Build |
|||
|
|||
From `go-libp2p` base folder: |
|||
|
|||
``` |
|||
> make deps |
|||
> go build ./examples/echo |
|||
``` |
|||
|
|||
## Usage |
|||
|
|||
``` |
|||
> ./echo -secio -l 10000 |
|||
2017/03/15 14:11:32 I am /ip4/127.0.0.1/tcp/10000/ipfs/QmYo41GybvrXk8y8Xnm1P7pfA4YEXCpfnLyzgRPnNbG35e |
|||
2017/03/15 14:11:32 Now run "./echo -l 10001 -d /ip4/127.0.0.1/tcp/10000/ipfs/QmYo41GybvrXk8y8Xnm1P7pfA4YEXCpfnLyzgRPnNbG35e -secio" on a different terminal |
|||
2017/03/15 14:11:32 listening for connections |
|||
``` |
|||
|
|||
The listener libp2p host will print its `Multiaddress`, which indicates how it can be reached (ip4+tcp) and its randomly generated ID (`QmYo41Gyb...`) |
|||
|
|||
Now, launch another node that talks to the listener: |
|||
|
|||
``` |
|||
> ./echo -secio -l 10001 -d /ip4/127.0.0.1/tcp/10000/ipfs/QmYo41GybvrXk8y8Xnm1P7pfA4YEXCpfnLyzgRPnNbG35e |
|||
``` |
|||
|
|||
The new node with send the message `"Hello, world!"` to the listener, which will in turn echo it over the stream and close it. The listener logs the message, and the sender logs the response. |
|||
|
|||
## Details |
|||
|
|||
The `makeBasicHost()` function creates a [go-libp2p-basichost](https://godoc.org/github.com/libp2p/go-libp2p/p2p/host/basic) object. `basichost` objects wrap [go-libp2 swarms](https://godoc.org/github.com/libp2p/go-libp2p-swarm#Swarm) and should be used preferentially. A [go-libp2p-swarm Network](https://godoc.org/github.com/libp2p/go-libp2p-swarm#Network) is a `swarm` which complies to the [go-libp2p-net Network interface](https://godoc.org/github.com/libp2p/go-libp2p-net#Network) and takes care of maintaining streams, connections, multiplexing different protocols on them, handling incoming connections etc. |
|||
|
|||
In order to create the swarm (and a `basichost`), the example needs: |
|||
|
|||
- An [ipfs-procotol ID](https://godoc.org/github.com/libp2p/go-libp2p-peer#ID) like `QmNtX1cvrm2K6mQmMEaMxAuB4rTexhd87vpYVot4sEZzxc`. The example autogenerates a key pair on every run and uses an ID extracted from the public key (the hash of the public key). When using `-secio`, it uses the key pair to encrypt communications (otherwise, it leaves the connections unencrypted). |
|||
- A [Multiaddress](https://godoc.org/github.com/multiformats/go-multiaddr), which indicates how to reach this peer. There can be several of them (using different protocols or locations for example). Example: `/ip4/127.0.0.1/tcp/1234`. |
|||
- A [go-libp2p-peerstore](https://godoc.org/github.com/libp2p/go-libp2p-peerstore), which is used as a address book which matches node IDs to the multiaddresses through which they can be contacted. This peerstore gets autopopulated when manually opening a connection (with [`Connect()`](https://godoc.org/github.com/libp2p/go-libp2p/p2p/host/basic#BasicHost.Connect). Alternatively, we can manually [`AddAddr()`](https://godoc.org/github.com/libp2p/go-libp2p-peerstore#AddrManager.AddAddr) as in the example. |
|||
|
|||
A `basichost` can now open streams (bi-directional channel between to peers) using [NewStream](https://godoc.org/github.com/libp2p/go-libp2p/p2p/host/basic#BasicHost.NewStream) and use them to send and receive data tagged with a `Protocol.ID` (a string). The host can also listen for incoming connections for a given |
|||
`Protocol` with [`SetStreamHandle()`](https://godoc.org/github.com/libp2p/go-libp2p/p2p/host/basic#BasicHost.SetStreamHandler). |
|||
|
|||
The example makes use of all of this to enable communication between a listener and a sender using protocol `/echo/1.0.0` (which could be any other thing). |
@ -1,178 +0,0 @@ |
|||
package main |
|||
|
|||
import ( |
|||
"bufio" |
|||
"context" |
|||
"crypto/rand" |
|||
"flag" |
|||
"fmt" |
|||
"io" |
|||
"io/ioutil" |
|||
"log" |
|||
mrand "math/rand" |
|||
|
|||
golog "github.com/ipfs/go-log" |
|||
libp2p "github.com/libp2p/go-libp2p" |
|||
crypto "github.com/libp2p/go-libp2p-crypto" |
|||
host "github.com/libp2p/go-libp2p-host" |
|||
net "github.com/libp2p/go-libp2p-net" |
|||
peer "github.com/libp2p/go-libp2p-peer" |
|||
pstore "github.com/libp2p/go-libp2p-peerstore" |
|||
ma "github.com/multiformats/go-multiaddr" |
|||
gologging "github.com/whyrusleeping/go-logging" |
|||
) |
|||
|
|||
// makeBasicHost creates a LibP2P host with a random peer ID listening on the
|
|||
// given multiaddress. It will use secio if secio is true.
|
|||
func makeBasicHost(listenPort int, secio bool, randseed int64) (host.Host, error) { |
|||
|
|||
// If the seed is zero, use real cryptographic randomness. Otherwise, use a
|
|||
// deterministic randomness source to make generated keys stay the same
|
|||
// across multiple runs
|
|||
var r io.Reader |
|||
if randseed == 0 { |
|||
r = rand.Reader |
|||
} else { |
|||
r = mrand.New(mrand.NewSource(randseed)) |
|||
} |
|||
|
|||
// Generate a key pair for this host. We will use it at least
|
|||
// to obtain a valid host ID.
|
|||
priv, _, err := crypto.GenerateKeyPairWithReader(crypto.RSA, 2048, r) |
|||
if err != nil { |
|||
return nil, err |
|||
} |
|||
|
|||
opts := []libp2p.Option{ |
|||
libp2p.ListenAddrStrings(fmt.Sprintf("/ip4/127.0.0.1/tcp/%d", listenPort)), |
|||
libp2p.Identity(priv), |
|||
} |
|||
|
|||
if !secio { |
|||
opts = append(opts, libp2p.NoSecurity) |
|||
} |
|||
|
|||
basicHost, err := libp2p.New(context.Background(), opts...) |
|||
if err != nil { |
|||
return nil, err |
|||
} |
|||
|
|||
// Build host multiaddress
|
|||
hostAddr, _ := ma.NewMultiaddr(fmt.Sprintf("/ipfs/%s", basicHost.ID().Pretty())) |
|||
|
|||
// Now we can build a full multiaddress to reach this host
|
|||
// by encapsulating both addresses:
|
|||
addr := basicHost.Addrs()[0] |
|||
fullAddr := addr.Encapsulate(hostAddr) |
|||
log.Printf("I am %s\n", fullAddr) |
|||
if secio { |
|||
log.Printf("Now run \"./echo -l %d -d %s -secio\" on a different terminal\n", listenPort+1, fullAddr) |
|||
} else { |
|||
log.Printf("Now run \"./echo -l %d -d %s\" on a different terminal\n", listenPort+1, fullAddr) |
|||
} |
|||
|
|||
return basicHost, nil |
|||
} |
|||
|
|||
func main() { |
|||
// LibP2P code uses golog to log messages. They log with different
|
|||
// string IDs (i.e. "swarm"). We can control the verbosity level for
|
|||
// all loggers with:
|
|||
golog.SetAllLoggers(gologging.INFO) // Change to DEBUG for extra info
|
|||
|
|||
// Parse options from the command line
|
|||
listenF := flag.Int("l", 0, "wait for incoming connections") |
|||
target := flag.String("d", "", "target peer to dial") |
|||
secio := flag.Bool("secio", false, "enable secio") |
|||
seed := flag.Int64("seed", 0, "set random seed for id generation") |
|||
flag.Parse() |
|||
|
|||
if *listenF == 0 { |
|||
log.Fatal("Please provide a port to bind on with -l") |
|||
} |
|||
|
|||
// Make a host that listens on the given multiaddress
|
|||
ha, err := makeBasicHost(*listenF, *secio, *seed) |
|||
if err != nil { |
|||
log.Fatal(err) |
|||
} |
|||
|
|||
// Set a stream handler on host A. /echo/1.0.0 is
|
|||
// a user-defined protocol name.
|
|||
ha.SetStreamHandler("/echo/1.0.0", func(s net.Stream) { |
|||
log.Println("Got a new stream!") |
|||
if err := doEcho(s); err != nil { |
|||
log.Println(err) |
|||
s.Reset() |
|||
} else { |
|||
s.Close() |
|||
} |
|||
}) |
|||
|
|||
if *target == "" { |
|||
log.Println("listening for connections") |
|||
select {} // hang forever
|
|||
} |
|||
/**** This is where the listener code ends ****/ |
|||
|
|||
// The following code extracts target's the peer ID from the
|
|||
// given multiaddress
|
|||
ipfsaddr, err := ma.NewMultiaddr(*target) |
|||
if err != nil { |
|||
log.Fatalln(err) |
|||
} |
|||
|
|||
pid, err := ipfsaddr.ValueForProtocol(ma.P_IPFS) |
|||
if err != nil { |
|||
log.Fatalln(err) |
|||
} |
|||
|
|||
peerid, err := peer.IDB58Decode(pid) |
|||
if err != nil { |
|||
log.Fatalln(err) |
|||
} |
|||
|
|||
// Decapsulate the /ipfs/<peerID> part from the target
|
|||
// /ip4/<a.b.c.d>/ipfs/<peer> becomes /ip4/<a.b.c.d>
|
|||
targetPeerAddr, _ := ma.NewMultiaddr( |
|||
fmt.Sprintf("/ipfs/%s", peer.IDB58Encode(peerid))) |
|||
targetAddr := ipfsaddr.Decapsulate(targetPeerAddr) |
|||
|
|||
// We have a peer ID and a targetAddr so we add it to the peerstore
|
|||
// so LibP2P knows how to contact it
|
|||
ha.Peerstore().AddAddr(peerid, targetAddr, pstore.PermanentAddrTTL) |
|||
|
|||
log.Println("opening stream") |
|||
// make a new stream from host B to host A
|
|||
// it should be handled on host A by the handler we set above because
|
|||
// we use the same /echo/1.0.0 protocol
|
|||
s, err := ha.NewStream(context.Background(), peerid, "/echo/1.0.0") |
|||
if err != nil { |
|||
log.Fatalln(err) |
|||
} |
|||
|
|||
_, err = s.Write([]byte("Hello, world!\n")) |
|||
if err != nil { |
|||
log.Fatalln(err) |
|||
} |
|||
|
|||
out, err := ioutil.ReadAll(s) |
|||
if err != nil { |
|||
log.Fatalln(err) |
|||
} |
|||
|
|||
log.Printf("read reply: %q\n", out) |
|||
} |
|||
|
|||
// doEcho reads a line of data a stream and writes it back
|
|||
func doEcho(s net.Stream) error { |
|||
buf := bufio.NewReader(s) |
|||
str, err := buf.ReadString('\n') |
|||
if err != nil { |
|||
return err |
|||
} |
|||
|
|||
log.Printf("read: %s\n", str) |
|||
_, err = s.Write([]byte(str)) |
|||
return err |
|||
} |
@ -1,58 +0,0 @@ |
|||
# HTTP proxy service with libp2p |
|||
|
|||
This examples shows how to create a simple HTTP proxy service with libp2p: |
|||
|
|||
``` |
|||
XXX |
|||
XX XXXXXX |
|||
X XX |
|||
XXXXXXX XX XX XXXXXXXXXX |
|||
+----------------+ +-----------------+ XXX XXX XXX XXX |
|||
HTTP Request | | | | XX XX |
|||
+-----------------> | libp2p stream | | HTTP X X |
|||
| Local peer <----------------> Remote peer <-------------> HTTP SERVER - THE INTERNET XX |
|||
<-----------------+ | | | Req & Resp XX X |
|||
HTTP Response | libp2p host | | libp2p host | XXXX XXXX XXXXXXXXXXXXXXXXXXXX XXXX |
|||
+----------------+ +-----------------+ XXXXX |
|||
``` |
|||
|
|||
In order to proxy an HTTP request, we create a local peer which listens on `localhost:9900`. HTTP requests performed to that address are tunneled via a libp2p stream to a remote peer, which then performs the HTTP requests and sends the response back to the local peer, which relays it to the user. |
|||
|
|||
Note that this is a very simple approach to a proxy, and does not perform any header management, nor supports HTTPS. The `proxy.go` code is thoroughly commeted, detailing what is happening in every step. |
|||
|
|||
## Build |
|||
|
|||
From `go-libp2p` base folder: |
|||
|
|||
``` |
|||
> make deps |
|||
> go build ./examples/http-proxy |
|||
``` |
|||
|
|||
## Usage |
|||
|
|||
First run the "remote" peer as follows. It will print a local peer address. If you would like to run this on a separate machine, please replace the IP accordingly: |
|||
|
|||
```sh |
|||
> ./http-proxy |
|||
Proxy server is ready |
|||
libp2p-peer addresses: |
|||
/ip4/127.0.0.1/tcp/12000/ipfs/QmddTrQXhA9AkCpXPTkcY7e22NK73TwkUms3a44DhTKJTD |
|||
``` |
|||
|
|||
The run the local peer, indicating that it will need to forward http requests to the remote peer as follows: |
|||
|
|||
``` |
|||
> ./http-proxy -d /ip4/127.0.0.1/tcp/12000/ipfs/QmddTrQXhA9AkCpXPTkcY7e22NK73TwkUms3a44DhTKJTD |
|||
Proxy server is ready |
|||
libp2p-peer addresses: |
|||
/ip4/127.0.0.1/tcp/12001/ipfs/Qmaa2AYTha1UqcFVX97p9R1UP7vbzDLY7bqWsZw1135QvN |
|||
proxy listening on 127.0.0.1:9900 |
|||
``` |
|||
|
|||
As you can see, the proxy prints the listening address `127.0.0.1:9900`. You can now use this address as proxy, for example with `curl`: |
|||
|
|||
``` |
|||
> curl -x "127.0.0.1:9900" "http://ipfs.io/ipfs/QmfUX75pGRBRDnjeoMkQzuQczuCup2aYbeLxz5NzeSu9G6" |
|||
it works! |
|||
``` |
@ -1,276 +0,0 @@ |
|||
package main |
|||
|
|||
import ( |
|||
"bufio" |
|||
"context" |
|||
"flag" |
|||
"fmt" |
|||
"io" |
|||
"log" |
|||
"net/http" |
|||
"strings" |
|||
|
|||
// We need to import libp2p's libraries that we use in this project.
|
|||
// In order to work, these libraries need to be rewritten by gx-go.
|
|||
host "github.com/libp2p/go-libp2p-host" |
|||
inet "github.com/libp2p/go-libp2p-net" |
|||
peer "github.com/libp2p/go-libp2p-peer" |
|||
ps "github.com/libp2p/go-libp2p-peerstore" |
|||
ma "github.com/multiformats/go-multiaddr" |
|||
manet "github.com/multiformats/go-multiaddr-net" |
|||
|
|||
libp2p "github.com/libp2p/go-libp2p" |
|||
) |
|||
|
|||
// Protocol defines the libp2p protocol that we will use for the libp2p proxy
|
|||
// service that we are going to provide. This will tag the streams used for
|
|||
// this service. Streams are multiplexed and their protocol tag helps
|
|||
// libp2p handle them to the right handler functions.
|
|||
const Protocol = "/proxy-example/0.0.1" |
|||
|
|||
// makeRandomHost creates a libp2p host with a randomly generated identity.
|
|||
// This step is described in depth in other tutorials.
|
|||
func makeRandomHost(port int) host.Host { |
|||
host, err := libp2p.New(context.Background(), libp2p.ListenAddrStrings(fmt.Sprintf("/ip4/127.0.0.1/tcp/%d", port))) |
|||
if err != nil { |
|||
log.Fatalln(err) |
|||
} |
|||
return host |
|||
} |
|||
|
|||
// ProxyService provides HTTP proxying on top of libp2p by launching an
|
|||
// HTTP server which tunnels the requests to a destination peer running
|
|||
// ProxyService too.
|
|||
type ProxyService struct { |
|||
host host.Host |
|||
dest peer.ID |
|||
proxyAddr ma.Multiaddr |
|||
} |
|||
|
|||
// NewProxyService attaches a proxy service to the given libp2p Host.
|
|||
// The proxyAddr parameter specifies the address on which the
|
|||
// HTTP proxy server listens. The dest parameter specifies the peer
|
|||
// ID of the remote peer in charge of performing the HTTP requests.
|
|||
//
|
|||
// ProxyAddr/dest may be nil/"" it is not necessary that this host
|
|||
// provides a listening HTTP server (and instead its only function is to
|
|||
// perform the proxied http requests it receives from a different peer.
|
|||
//
|
|||
// The addresses for the dest peer should be part of the host's peerstore.
|
|||
func NewProxyService(h host.Host, proxyAddr ma.Multiaddr, dest peer.ID) *ProxyService { |
|||
// We let our host know that it needs to handle streams tagged with the
|
|||
// protocol id that we have defined, and then handle them to
|
|||
// our own streamHandling function.
|
|||
h.SetStreamHandler(Protocol, streamHandler) |
|||
|
|||
fmt.Println("Proxy server is ready") |
|||
fmt.Println("libp2p-peer addresses:") |
|||
for _, a := range h.Addrs() { |
|||
fmt.Printf("%s/ipfs/%s\n", a, peer.IDB58Encode(h.ID())) |
|||
} |
|||
|
|||
return &ProxyService{ |
|||
host: h, |
|||
dest: dest, |
|||
proxyAddr: proxyAddr, |
|||
} |
|||
} |
|||
|
|||
// streamHandler is our function to handle any libp2p-net streams that belong
|
|||
// to our protocol. The streams should contain an HTTP request which we need
|
|||
// to parse, make on behalf of the original node, and then write the response
|
|||
// on the stream, before closing it.
|
|||
func streamHandler(stream inet.Stream) { |
|||
// Remember to close the stream when we are done.
|
|||
defer stream.Close() |
|||
|
|||
// Create a new buffered reader, as ReadRequest needs one.
|
|||
// The buffered reader reads from our stream, on which we
|
|||
// have sent the HTTP request (see ServeHTTP())
|
|||
buf := bufio.NewReader(stream) |
|||
// Read the HTTP request from the buffer
|
|||
req, err := http.ReadRequest(buf) |
|||
if err != nil { |
|||
stream.Reset() |
|||
log.Println(err) |
|||
return |
|||
} |
|||
defer req.Body.Close() |
|||
|
|||
// We need to reset these fields in the request
|
|||
// URL as they are not maintained.
|
|||
req.URL.Scheme = "http" |
|||
hp := strings.Split(req.Host, ":") |
|||
if len(hp) > 1 && hp[1] == "443" { |
|||
req.URL.Scheme = "https" |
|||
} else { |
|||
req.URL.Scheme = "http" |
|||
} |
|||
req.URL.Host = req.Host |
|||
|
|||
outreq := new(http.Request) |
|||
*outreq = *req |
|||
|
|||
// We now make the request
|
|||
fmt.Printf("Making request to %s\n", req.URL) |
|||
resp, err := http.DefaultTransport.RoundTrip(outreq) |
|||
if err != nil { |
|||
stream.Reset() |
|||
log.Println(err) |
|||
return |
|||
} |
|||
|
|||
// resp.Write writes whatever response we obtained for our
|
|||
// request back to the stream.
|
|||
resp.Write(stream) |
|||
} |
|||
|
|||
// Serve listens on the ProxyService's proxy address. This effectively
|
|||
// allows to set the listening address as http proxy.
|
|||
func (p *ProxyService) Serve() { |
|||
_, serveArgs, _ := manet.DialArgs(p.proxyAddr) |
|||
fmt.Println("proxy listening on ", serveArgs) |
|||
if p.dest != "" { |
|||
http.ListenAndServe(serveArgs, p) |
|||
} |
|||
} |
|||
|
|||
// ServeHTTP implements the http.Handler interface. WARNING: This is the
|
|||
// simplest approach to a proxy. Therefore we do not do any of the things
|
|||
// that should be done when implementing a reverse proxy (like handling
|
|||
// headers correctly). For how to do it properly, see:
|
|||
// https://golang.org/src/net/http/httputil/reverseproxy.go?s=3845:3920#L121
|
|||
//
|
|||
// ServeHTTP opens a stream to the dest peer for every HTTP request.
|
|||
// Streams are multiplexed over single connections so, unlike connections
|
|||
// themselves, they are cheap to create and dispose of.
|
|||
func (p *ProxyService) ServeHTTP(w http.ResponseWriter, r *http.Request) { |
|||
fmt.Printf("proxying request for %s to peer %s\n", r.URL, p.dest.Pretty()) |
|||
// We need to send the request to the remote libp2p peer, so
|
|||
// we open a stream to it
|
|||
stream, err := p.host.NewStream(context.Background(), p.dest, Protocol) |
|||
// If an error happens, we write an error for response.
|
|||
if err != nil { |
|||
log.Println(err) |
|||
http.Error(w, err.Error(), http.StatusInternalServerError) |
|||
return |
|||
} |
|||
defer stream.Close() |
|||
|
|||
// r.Write() writes the HTTP request to the stream.
|
|||
err = r.Write(stream) |
|||
if err != nil { |
|||
stream.Reset() |
|||
log.Println(err) |
|||
http.Error(w, err.Error(), http.StatusServiceUnavailable) |
|||
return |
|||
} |
|||
|
|||
// Now we read the response that was sent from the dest
|
|||
// peer
|
|||
buf := bufio.NewReader(stream) |
|||
resp, err := http.ReadResponse(buf, r) |
|||
if err != nil { |
|||
stream.Reset() |
|||
log.Println(err) |
|||
http.Error(w, err.Error(), http.StatusServiceUnavailable) |
|||
return |
|||
} |
|||
|
|||
// Copy any headers
|
|||
for k, v := range resp.Header { |
|||
for _, s := range v { |
|||
w.Header().Add(k, s) |
|||
} |
|||
} |
|||
|
|||
// Write response status and headers
|
|||
w.WriteHeader(resp.StatusCode) |
|||
|
|||
// Finally copy the body
|
|||
io.Copy(w, resp.Body) |
|||
resp.Body.Close() |
|||
} |
|||
|
|||
// addAddrToPeerstore parses a peer multiaddress and adds
|
|||
// it to the given host's peerstore, so it knows how to
|
|||
// contact it. It returns the peer ID of the remote peer.
|
|||
func addAddrToPeerstore(h host.Host, addr string) peer.ID { |
|||
// The following code extracts target's the peer ID from the
|
|||
// given multiaddress
|
|||
ipfsaddr, err := ma.NewMultiaddr(addr) |
|||
if err != nil { |
|||
log.Fatalln(err) |
|||
} |
|||
pid, err := ipfsaddr.ValueForProtocol(ma.P_IPFS) |
|||
if err != nil { |
|||
log.Fatalln(err) |
|||
} |
|||
|
|||
peerid, err := peer.IDB58Decode(pid) |
|||
if err != nil { |
|||
log.Fatalln(err) |
|||
} |
|||
|
|||
// Decapsulate the /ipfs/<peerID> part from the target
|
|||
// /ip4/<a.b.c.d>/ipfs/<peer> becomes /ip4/<a.b.c.d>
|
|||
targetPeerAddr, _ := ma.NewMultiaddr( |
|||
fmt.Sprintf("/ipfs/%s", peer.IDB58Encode(peerid))) |
|||
targetAddr := ipfsaddr.Decapsulate(targetPeerAddr) |
|||
|
|||
// We have a peer ID and a targetAddr so we add
|
|||
// it to the peerstore so LibP2P knows how to contact it
|
|||
h.Peerstore().AddAddr(peerid, targetAddr, ps.PermanentAddrTTL) |
|||
return peerid |
|||
} |
|||
|
|||
const help = ` |
|||
This example creates a simple HTTP Proxy using two libp2p peers. The first peer |
|||
provides an HTTP server locally which tunnels the HTTP requests with libp2p |
|||
to a remote peer. The remote peer performs the requests and |
|||
send the sends the response back. |
|||
|
|||
Usage: Start remote peer first with: ./proxy |
|||
Then start the local peer with: ./proxy -d <remote-peer-multiaddress> |
|||
|
|||
Then you can do something like: curl -x "localhost:9900" "http://ipfs.io". |
|||
This proxies sends the request through the local peer, which proxies it to |
|||
the remote peer, which makes it and sends the response back. |
|||
` |
|||
|
|||
func main() { |
|||
flag.Usage = func() { |
|||
fmt.Println(help) |
|||
flag.PrintDefaults() |
|||
} |
|||
|
|||
// Parse some flags
|
|||
destPeer := flag.String("d", "", "destination peer address") |
|||
port := flag.Int("p", 9900, "proxy port") |
|||
p2pport := flag.Int("l", 12000, "libp2p listen port") |
|||
flag.Parse() |
|||
|
|||
// If we have a destination peer we will start a local server
|
|||
if *destPeer != "" { |
|||
// We use p2pport+1 in order to not collide if the user
|
|||
// is running the remote peer locally on that port
|
|||
host := makeRandomHost(*p2pport + 1) |
|||
// Make sure our host knows how to reach destPeer
|
|||
destPeerID := addAddrToPeerstore(host, *destPeer) |
|||
proxyAddr, err := ma.NewMultiaddr(fmt.Sprintf("/ip4/127.0.0.1/tcp/%d", *port)) |
|||
if err != nil { |
|||
log.Fatalln(err) |
|||
} |
|||
// Create the proxy service and start the http server
|
|||
proxy := NewProxyService(host, proxyAddr, destPeerID) |
|||
proxy.Serve() // serve hangs forever
|
|||
} else { |
|||
host := makeRandomHost(*p2pport) |
|||
// In this case we only need to make sure our host
|
|||
// knows how to handle incoming proxied requests from
|
|||
// another peer.
|
|||
_ = NewProxyService(host, nil, "") |
|||
<-make(chan struct{}) // hang forever
|
|||
} |
|||
|
|||
} |
@ -1,63 +0,0 @@ |
|||
# The libp2p 'host' |
|||
|
|||
For most applications, the host is the basic building block you'll need to get started. This guide will show how to construct and use a simple host. |
|||
|
|||
The host is an abstraction that manages services on top of a swarm. It provides a clean interface to connect to a service on a given remote peer. |
|||
|
|||
If you want to create a host with a default configuration, you can do the following: |
|||
|
|||
```go |
|||
import ( |
|||
"context" |
|||
"crypto/rand" |
|||
"fmt" |
|||
|
|||
libp2p "github.com/libp2p/go-libp2p" |
|||
crypto "github.com/libp2p/go-libp2p-crypto" |
|||
) |
|||
|
|||
|
|||
// The context governs the lifetime of the libp2p node |
|||
ctx, cancel := context.WithCancel(context.Background()) |
|||
defer cancel() |
|||
|
|||
// To construct a simple host with all the default settings, just use `New` |
|||
h, err := libp2p.New(ctx) |
|||
if err != nil { |
|||
panic(err) |
|||
} |
|||
|
|||
fmt.Printf("Hello World, my hosts ID is %s\n", h.ID()) |
|||
``` |
|||
|
|||
If you want more control over the configuration, you can specify some options to the constructor. For a full list of all the configuration supported by the constructor see: [options.go](https://github.com/libp2p/go-libp2p/blob/master/options.go) |
|||
|
|||
In this snippet we generate our own ID and specified on which address we want to listen: |
|||
|
|||
```go |
|||
// Set your own keypair |
|||
priv, _, err := crypto.GenerateEd25519Key(rand.Reader) |
|||
if err != nil { |
|||
panic(err) |
|||
} |
|||
|
|||
h2, err := libp2p.New(ctx, |
|||
// Use your own created keypair |
|||
libp2p.Identity(priv), |
|||
|
|||
// Set your own listen address |
|||
// The config takes an array of addresses, specify as many as you want. |
|||
libp2p.ListenAddrStrings("/ip4/0.0.0.0/tcp/9000"), |
|||
) |
|||
if err != nil { |
|||
panic(err) |
|||
} |
|||
|
|||
fmt.Printf("Hello World, my second hosts ID is %s\n", h2.ID()) |
|||
``` |
|||
|
|||
And thats it, you have a libp2p host and you're ready to start doing some awesome p2p networking! |
|||
|
|||
In future guides we will go over ways to use hosts, configure them differently (hint: there are a huge number of ways to set these up), and interesting ways to apply this technology to various applications you might want to build. |
|||
|
|||
To see this code all put together, take a look at [host.go](host.go). |
@ -1,47 +0,0 @@ |
|||
package main |
|||
|
|||
import ( |
|||
"context" |
|||
"crypto/rand" |
|||
"fmt" |
|||
|
|||
libp2p "github.com/libp2p/go-libp2p" |
|||
crypto "github.com/libp2p/go-libp2p-crypto" |
|||
) |
|||
|
|||
func main() { |
|||
// The context governs the lifetime of the libp2p node
|
|||
ctx, cancel := context.WithCancel(context.Background()) |
|||
defer cancel() |
|||
|
|||
// To construct a simple host with all the default settings, just use `New`
|
|||
h, err := libp2p.New(ctx) |
|||
if err != nil { |
|||
panic(err) |
|||
} |
|||
|
|||
fmt.Printf("Hello World, my hosts ID is %s\n", h.ID()) |
|||
|
|||
// If you want more control over the configuration, you can specify some
|
|||
// options to the constructor
|
|||
|
|||
// Set your own keypair
|
|||
priv, _, err := crypto.GenerateEd25519Key(rand.Reader) |
|||
if err != nil { |
|||
panic(err) |
|||
} |
|||
|
|||
h2, err := libp2p.New(ctx, |
|||
// Use your own created keypair
|
|||
libp2p.Identity(priv), |
|||
|
|||
// Set your own listen address
|
|||
// The config takes an array of addresses, specify as many as you want.
|
|||
libp2p.ListenAddrStrings("/ip4/0.0.0.0/tcp/9000"), |
|||
) |
|||
if err != nil { |
|||
panic(err) |
|||
} |
|||
|
|||
fmt.Printf("Hello World, my second hosts ID is %s\n", h2.ID()) |
|||
} |
@ -1,3 +0,0 @@ |
|||
# This is the official list of authors for copyright purposes. |
|||
|
|||
Aviv Eyal <aviveyal07@gmail.com> |
@ -1,21 +0,0 @@ |
|||
The MIT License (MIT) |
|||
|
|||
Copyright (c) 2017 Aviv Eyal |
|||
|
|||
Permission is hereby granted, free of charge, to any person obtaining a copy |
|||
of this software and associated documentation files (the "Software"), to deal |
|||
in the Software without restriction, including without limitation the rights |
|||
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell |
|||
copies of the Software, and to permit persons to whom the Software is |
|||
furnished to do so, subject to the following conditions: |
|||
|
|||
The above copyright notice and this permission notice shall be included in |
|||
all copies or substantial portions of the Software. |
|||
|
|||
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR |
|||
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, |
|||
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE |
|||
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER |
|||
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, |
|||
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN |
|||
THE SOFTWARE. |
@ -1,62 +0,0 @@ |
|||
# Protocol Multiplexing using rpc-style multicodecs, protobufs with libp2p |
|||
|
|||
This examples shows how to use multicodecs (i.e. protobufs) to encode and transmit information between LibP2P hosts using LibP2P Streams. |
|||
Multicodecs present a common interface, making it very easy to swap the codec implementation if needed. |
|||
This example expects that you area already familiar with the [echo example](https://github.com/libp2p/go-libp2p/tree/master/examples/echo). |
|||
|
|||
## Build |
|||
|
|||
Install gx: |
|||
```sh |
|||
> go get -u github.com/whyrusleeping/gx |
|||
|
|||
``` |
|||
|
|||
Run GX from the root libp2p source dir: |
|||
```sh |
|||
>gx install |
|||
``` |
|||
|
|||
Build libp2p: |
|||
```sh |
|||
> make deps |
|||
> make |
|||
``` |
|||
|
|||
Run from `multipro` directory |
|||
|
|||
```sh |
|||
> go build |
|||
``` |
|||
|
|||
|
|||
## Usage |
|||
|
|||
```sh |
|||
> ./multipro |
|||
|
|||
``` |
|||
|
|||
## Details |
|||
|
|||
The example creates two LibP2P Hosts supporting 2 protocols: ping and echo. |
|||
|
|||
Each protocol consists RPC-style requests and responses and each request and response is a typed protobufs message (and a go data object). |
|||
|
|||
This is a different pattern then defining a whole p2p protocol as one protobuf message with lots of optional fields (as can be observed in various p2p-lib protocols using protobufs such as dht). |
|||
|
|||
The example shows how to match async received responses with their requests. This is useful when processing a response requires access to the request data. |
|||
|
|||
The idea is to use lib-p2p protocol multiplexing on a per-message basis. |
|||
|
|||
### Features |
|||
1. 2 fully implemented protocols using an RPC-like request-response pattern - Ping and Echo |
|||
2. Scaffolding for quickly implementing new app-level versioned RPC-like protocols |
|||
3. Full authentication of incoming message data by author (who might not be the message's sender peer) |
|||
4. Base p2p format in protobufs with fields shared by all protocol messages |
|||
5. Full access to request data when processing a response. |
|||
|
|||
## Author |
|||
@avive |
|||
|
|||
|
@ -1,157 +0,0 @@ |
|||
package main |
|||
|
|||
import ( |
|||
"bufio" |
|||
"context" |
|||
"fmt" |
|||
"log" |
|||
|
|||
inet "github.com/libp2p/go-libp2p-net" |
|||
|
|||
uuid "github.com/google/uuid" |
|||
"github.com/libp2p/go-libp2p-host" |
|||
pb "github.com/libp2p/go-libp2p/examples/multipro/pb" |
|||
protobufCodec "github.com/multiformats/go-multicodec/protobuf" |
|||
) |
|||
|
|||
// pattern: /protocol-name/request-or-response-message/version
|
|||
const echoRequest = "/echo/echoreq/0.0.1" |
|||
const echoResponse = "/echo/echoresp/0.0.1" |
|||
|
|||
type EchoProtocol struct { |
|||
node *Node // local host
|
|||
requests map[string]*pb.EchoRequest // used to access request data from response handlers
|
|||
done chan bool // only for demo purposes to hold main from terminating
|
|||
} |
|||
|
|||
func NewEchoProtocol(node *Node, done chan bool) *EchoProtocol { |
|||
e := EchoProtocol{node: node, requests: make(map[string]*pb.EchoRequest), done: done} |
|||
node.SetStreamHandler(echoRequest, e.onEchoRequest) |
|||
node.SetStreamHandler(echoResponse, e.onEchoResponse) |
|||
|
|||
// design note: to implement fire-and-forget style messages you may just skip specifying a response callback.
|
|||
// a fire-and-forget message will just include a request and not specify a response object
|
|||
|
|||
return &e |
|||
} |
|||
|
|||
// remote peer requests handler
|
|||
func (e *EchoProtocol) onEchoRequest(s inet.Stream) { |
|||
// get request data
|
|||
data := &pb.EchoRequest{} |
|||
decoder := protobufCodec.Multicodec(nil).Decoder(bufio.NewReader(s)) |
|||
err := decoder.Decode(data) |
|||
if err != nil { |
|||
log.Println(err) |
|||
return |
|||
} |
|||
|
|||
log.Printf("%s: Received echo request from %s. Message: %s", s.Conn().LocalPeer(), s.Conn().RemotePeer(), data.Message) |
|||
|
|||
valid := e.node.authenticateMessage(data, data.MessageData) |
|||
|
|||
if !valid { |
|||
log.Println("Failed to authenticate message") |
|||
return |
|||
} |
|||
|
|||
log.Printf("%s: Sending echo response to %s. Message id: %s...", s.Conn().LocalPeer(), s.Conn().RemotePeer(), data.MessageData.Id) |
|||
|
|||
// send response to the request using the message string he provided
|
|||
|
|||
resp := &pb.EchoResponse{ |
|||
MessageData: e.node.NewMessageData(data.MessageData.Id, false), |
|||
Message: data.Message} |
|||
|
|||
// sign the data
|
|||
signature, err := e.node.signProtoMessage(resp) |
|||
if err != nil { |
|||
log.Println("failed to sign response") |
|||
return |
|||
} |
|||
|
|||
// add the signature to the message
|
|||
resp.MessageData.Sign = string(signature) |
|||
|
|||
s, respErr := e.node.NewStream(context.Background(), s.Conn().RemotePeer(), echoResponse) |
|||
if respErr != nil { |
|||
log.Println(respErr) |
|||
return |
|||
} |
|||
|
|||
ok := e.node.sendProtoMessage(resp, s) |
|||
|
|||
if ok { |
|||
log.Printf("%s: Echo response to %s sent.", s.Conn().LocalPeer().String(), s.Conn().RemotePeer().String()) |
|||
} |
|||
} |
|||
|
|||
// remote echo response handler
|
|||
func (e *EchoProtocol) onEchoResponse(s inet.Stream) { |
|||
data := &pb.EchoResponse{} |
|||
decoder := protobufCodec.Multicodec(nil).Decoder(bufio.NewReader(s)) |
|||
err := decoder.Decode(data) |
|||
if err != nil { |
|||
return |
|||
} |
|||
|
|||
// authenticate message content
|
|||
valid := e.node.authenticateMessage(data, data.MessageData) |
|||
|
|||
if !valid { |
|||
log.Println("Failed to authenticate message") |
|||
return |
|||
} |
|||
|
|||
// locate request data and remove it if found
|
|||
req, ok := e.requests[data.MessageData.Id] |
|||
if ok { |
|||
// remove request from map as we have processed it here
|
|||
delete(e.requests, data.MessageData.Id) |
|||
} else { |
|||
log.Println("Failed to locate request data boject for response") |
|||
return |
|||
} |
|||
|
|||
if req.Message != data.Message { |
|||
log.Fatalln("Expected echo to respond with request message") |
|||
} |
|||
|
|||
log.Printf("%s: Received echo response from %s. Message id:%s. Message: %s.", s.Conn().LocalPeer(), s.Conn().RemotePeer(), data.MessageData.Id, data.Message) |
|||
e.done <- true |
|||
} |
|||
|
|||
func (e *EchoProtocol) Echo(host host.Host) bool { |
|||
log.Printf("%s: Sending echo to: %s....", e.node.ID(), host.ID()) |
|||
|
|||
// create message data
|
|||
req := &pb.EchoRequest{ |
|||
MessageData: e.node.NewMessageData(uuid.New().String(), false), |
|||
Message: fmt.Sprintf("Echo from %s", e.node.ID())} |
|||
|
|||
signature, err := e.node.signProtoMessage(req) |
|||
if err != nil { |
|||
log.Println("failed to sign message") |
|||
return false |
|||
} |
|||
|
|||
// add the signature to the message
|
|||
req.MessageData.Sign = string(signature) |
|||
|
|||
s, err := e.node.NewStream(context.Background(), host.ID(), echoRequest) |
|||
if err != nil { |
|||
log.Println(err) |
|||
return false |
|||
} |
|||
|
|||
ok := e.node.sendProtoMessage(req, s) |
|||
|
|||
if !ok { |
|||
return false |
|||
} |
|||
|
|||
// store request so response handler has access to it
|
|||
e.requests[req.MessageData.Id] = req |
|||
log.Printf("%s: Echo to: %s was sent. Message Id: %s, Message: %s", e.node.ID(), host.ID(), req.MessageData.Id, req.Message) |
|||
return true |
|||
} |
@ -1,56 +0,0 @@ |
|||
package main |
|||
|
|||
import ( |
|||
"context" |
|||
"fmt" |
|||
"log" |
|||
"math/rand" |
|||
|
|||
libp2p "github.com/libp2p/go-libp2p" |
|||
crypto "github.com/libp2p/go-libp2p-crypto" |
|||
ps "github.com/libp2p/go-libp2p-peerstore" |
|||
ma "github.com/multiformats/go-multiaddr" |
|||
) |
|||
|
|||
// helper method - create a lib-p2p host to listen on a port
|
|||
func makeRandomNode(port int, done chan bool) *Node { |
|||
// Ignoring most errors for brevity
|
|||
// See echo example for more details and better implementation
|
|||
priv, _, _ := crypto.GenerateKeyPair(crypto.Secp256k1, 256) |
|||
listen, _ := ma.NewMultiaddr(fmt.Sprintf("/ip4/127.0.0.1/tcp/%d", port)) |
|||
host, _ := libp2p.New( |
|||
context.Background(), |
|||
libp2p.ListenAddrs(listen), |
|||
libp2p.Identity(priv), |
|||
) |
|||
|
|||
return NewNode(host, done) |
|||
} |
|||
|
|||
func main() { |
|||
// Choose random ports between 10000-10100
|
|||
rand.Seed(666) |
|||
port1 := rand.Intn(100) + 10000 |
|||
port2 := port1 + 1 |
|||
|
|||
done := make(chan bool, 1) |
|||
|
|||
// Make 2 hosts
|
|||
h1 := makeRandomNode(port1, done) |
|||
h2 := makeRandomNode(port2, done) |
|||
h1.Peerstore().AddAddrs(h2.ID(), h2.Addrs(), ps.PermanentAddrTTL) |
|||
h2.Peerstore().AddAddrs(h1.ID(), h1.Addrs(), ps.PermanentAddrTTL) |
|||
|
|||
log.Printf("This is a conversation between %s and %s\n", h1.ID(), h2.ID()) |
|||
|
|||
// send messages using the protocols
|
|||
h1.Ping(h2.Host) |
|||
h2.Ping(h1.Host) |
|||
h1.Echo(h2.Host) |
|||
h2.Echo(h1.Host) |
|||
|
|||
// block until all responses have been processed
|
|||
for i := 0; i < 4; i++ { |
|||
<-done |
|||
} |
|||
} |
@ -1,150 +0,0 @@ |
|||
package main |
|||
|
|||
import ( |
|||
"bufio" |
|||
"log" |
|||
"time" |
|||
|
|||
"github.com/gogo/protobuf/proto" |
|||
crypto "github.com/libp2p/go-libp2p-crypto" |
|||
host "github.com/libp2p/go-libp2p-host" |
|||
inet "github.com/libp2p/go-libp2p-net" |
|||
peer "github.com/libp2p/go-libp2p-peer" |
|||
p2p "github.com/libp2p/go-libp2p/examples/multipro/pb" |
|||
protobufCodec "github.com/multiformats/go-multicodec/protobuf" |
|||
) |
|||
|
|||
// node client version
|
|||
const clientVersion = "go-p2p-node/0.0.1" |
|||
|
|||
// Node type - a p2p host implementing one or more p2p protocols
|
|||
type Node struct { |
|||
host.Host // lib-p2p host
|
|||
*PingProtocol // ping protocol impl
|
|||
*EchoProtocol // echo protocol impl
|
|||
// add other protocols here...
|
|||
} |
|||
|
|||
// Create a new node with its implemented protocols
|
|||
func NewNode(host host.Host, done chan bool) *Node { |
|||
node := &Node{Host: host} |
|||
node.PingProtocol = NewPingProtocol(node, done) |
|||
node.EchoProtocol = NewEchoProtocol(node, done) |
|||
return node |
|||
} |
|||
|
|||
// Authenticate incoming p2p message
|
|||
// message: a protobufs go data object
|
|||
// data: common p2p message data
|
|||
func (n *Node) authenticateMessage(message proto.Message, data *p2p.MessageData) bool { |
|||
// store a temp ref to signature and remove it from message data
|
|||
// sign is a string to allow easy reset to zero-value (empty string)
|
|||
sign := data.Sign |
|||
data.Sign = "" |
|||
|
|||
// marshall data without the signature to protobufs3 binary format
|
|||
bin, err := proto.Marshal(message) |
|||
if err != nil { |
|||
log.Println(err, "failed to marshal pb message") |
|||
return false |
|||
} |
|||
|
|||
// restore sig in message data (for possible future use)
|
|||
data.Sign = sign |
|||
|
|||
// restore peer id binary format from base58 encoded node id data
|
|||
peerId, err := peer.IDB58Decode(data.NodeId) |
|||
if err != nil { |
|||
log.Println(err, "Failed to decode node id from base58") |
|||
return false |
|||
} |
|||
|
|||
// verify the data was authored by the signing peer identified by the public key
|
|||
// and signature included in the message
|
|||
return n.verifyData(bin, []byte(sign), peerId, data.NodePubKey) |
|||
} |
|||
|
|||
// sign an outgoing p2p message payload
|
|||
func (n *Node) signProtoMessage(message proto.Message) ([]byte, error) { |
|||
data, err := proto.Marshal(message) |
|||
if err != nil { |
|||
return nil, err |
|||
} |
|||
return n.signData(data) |
|||
} |
|||
|
|||
// sign binary data using the local node's private key
|
|||
func (n *Node) signData(data []byte) ([]byte, error) { |
|||
key := n.Peerstore().PrivKey(n.ID()) |
|||
res, err := key.Sign(data) |
|||
return res, err |
|||
} |
|||
|
|||
// Verify incoming p2p message data integrity
|
|||
// data: data to verify
|
|||
// signature: author signature provided in the message payload
|
|||
// peerId: author peer id from the message payload
|
|||
// pubKeyData: author public key from the message payload
|
|||
func (n *Node) verifyData(data []byte, signature []byte, peerId peer.ID, pubKeyData []byte) bool { |
|||
key, err := crypto.UnmarshalPublicKey(pubKeyData) |
|||
if err != nil { |
|||
log.Println(err, "Failed to extract key from message key data") |
|||
return false |
|||
} |
|||
|
|||
// extract node id from the provided public key
|
|||
idFromKey, err := peer.IDFromPublicKey(key) |
|||
|
|||
if err != nil { |
|||
log.Println(err, "Failed to extract peer id from public key") |
|||
return false |
|||
} |
|||
|
|||
// verify that message author node id matches the provided node public key
|
|||
if idFromKey != peerId { |
|||
log.Println(err, "Node id and provided public key mismatch") |
|||
return false |
|||
} |
|||
|
|||
res, err := key.Verify(data, signature) |
|||
if err != nil { |
|||
log.Println(err, "Error authenticating data") |
|||
return false |
|||
} |
|||
|
|||
return res |
|||
} |
|||
|
|||
// helper method - generate message data shared between all node's p2p protocols
|
|||
// messageId: unique for requests, copied from request for responses
|
|||
func (n *Node) NewMessageData(messageId string, gossip bool) *p2p.MessageData { |
|||
// Add protobufs bin data for message author public key
|
|||
// this is useful for authenticating messages forwarded by a node authored by another node
|
|||
nodePubKey, err := n.Peerstore().PubKey(n.ID()).Bytes() |
|||
|
|||
if err != nil { |
|||
panic("Failed to get public key for sender from local peer store.") |
|||
} |
|||
|
|||
return &p2p.MessageData{ClientVersion: clientVersion, |
|||
NodeId: peer.IDB58Encode(n.ID()), |
|||
NodePubKey: nodePubKey, |
|||
Timestamp: time.Now().Unix(), |
|||
Id: messageId, |
|||
Gossip: gossip} |
|||
} |
|||
|
|||
// helper method - writes a protobuf go data object to a network stream
|
|||
// data: reference of protobuf go data object to send (not the object itself)
|
|||
// s: network stream to write the data to
|
|||
func (n *Node) sendProtoMessage(data proto.Message, s inet.Stream) bool { |
|||
writer := bufio.NewWriter(s) |
|||
enc := protobufCodec.Multicodec(nil).Encoder(writer) |
|||
err := enc.Encode(data) |
|||
if err != nil { |
|||
log.Println(err) |
|||
return false |
|||
} |
|||
writer.Flush() |
|||
return true |
|||
} |
@ -1,121 +0,0 @@ |
|||
// Code generated by protoc-gen-gogo.
|
|||
// source: p2p.proto
|
|||
// DO NOT EDIT!
|
|||
|
|||
/* |
|||
Package protocols_p2p is a generated protocol buffer package. |
|||
|
|||
It is generated from these files: |
|||
p2p.proto |
|||
|
|||
It has these top-level messages: |
|||
MessageData |
|||
PingRequest |
|||
PingResponse |
|||
EchoRequest |
|||
EchoResponse |
|||
*/ |
|||
package protocols_p2p |
|||
|
|||
import proto "github.com/gogo/protobuf/proto" |
|||
import fmt "fmt" |
|||
import math "math" |
|||
|
|||
// Reference imports to suppress errors if they are not otherwise used.
|
|||
var _ = proto.Marshal |
|||
var _ = fmt.Errorf |
|||
var _ = math.Inf |
|||
|
|||
// designed to be shared between all app protocols
|
|||
type MessageData struct { |
|||
// shared between all requests
|
|||
ClientVersion string `protobuf:"bytes,1,opt,name=clientVersion,proto3" json:"clientVersion,omitempty"` |
|||
Timestamp int64 `protobuf:"varint,2,opt,name=timestamp,proto3" json:"timestamp,omitempty"` |
|||
Id string `protobuf:"bytes,3,opt,name=id,proto3" json:"id,omitempty"` |
|||
Gossip bool `protobuf:"varint,4,opt,name=gossip,proto3" json:"gossip,omitempty"` |
|||
NodeId string `protobuf:"bytes,5,opt,name=nodeId,proto3" json:"nodeId,omitempty"` |
|||
NodePubKey []byte `protobuf:"bytes,6,opt,name=nodePubKey,proto3" json:"nodePubKey,omitempty"` |
|||
Sign string `protobuf:"bytes,7,opt,name=sign,proto3" json:"sign,omitempty"` |
|||
} |
|||
|
|||
func (m *MessageData) Reset() { *m = MessageData{} } |
|||
func (m *MessageData) String() string { return proto.CompactTextString(m) } |
|||
func (*MessageData) ProtoMessage() {} |
|||
|
|||
// a protocol define a set of reuqest and responses
|
|||
type PingRequest struct { |
|||
MessageData *MessageData `protobuf:"bytes,1,opt,name=messageData" json:"messageData,omitempty"` |
|||
// method specific data
|
|||
Message string `protobuf:"bytes,2,opt,name=message,proto3" json:"message,omitempty"` |
|||
} |
|||
|
|||
func (m *PingRequest) Reset() { *m = PingRequest{} } |
|||
func (m *PingRequest) String() string { return proto.CompactTextString(m) } |
|||
func (*PingRequest) ProtoMessage() {} |
|||
|
|||
func (m *PingRequest) GetMessageData() *MessageData { |
|||
if m != nil { |
|||
return m.MessageData |
|||
} |
|||
return nil |
|||
} |
|||
|
|||
type PingResponse struct { |
|||
MessageData *MessageData `protobuf:"bytes,1,opt,name=messageData" json:"messageData,omitempty"` |
|||
// response specific data
|
|||
Message string `protobuf:"bytes,2,opt,name=message,proto3" json:"message,omitempty"` |
|||
} |
|||
|
|||
func (m *PingResponse) Reset() { *m = PingResponse{} } |
|||
func (m *PingResponse) String() string { return proto.CompactTextString(m) } |
|||
func (*PingResponse) ProtoMessage() {} |
|||
|
|||
func (m *PingResponse) GetMessageData() *MessageData { |
|||
if m != nil { |
|||
return m.MessageData |
|||
} |
|||
return nil |
|||
} |
|||
|
|||
// a protocol define a set of reuqest and responses
|
|||
type EchoRequest struct { |
|||
MessageData *MessageData `protobuf:"bytes,1,opt,name=messageData" json:"messageData,omitempty"` |
|||
// method specific data
|
|||
Message string `protobuf:"bytes,2,opt,name=message,proto3" json:"message,omitempty"` |
|||
} |
|||
|
|||
func (m *EchoRequest) Reset() { *m = EchoRequest{} } |
|||
func (m *EchoRequest) String() string { return proto.CompactTextString(m) } |
|||
func (*EchoRequest) ProtoMessage() {} |
|||
|
|||
func (m *EchoRequest) GetMessageData() *MessageData { |
|||
if m != nil { |
|||
return m.MessageData |
|||
} |
|||
return nil |
|||
} |
|||
|
|||
type EchoResponse struct { |
|||
MessageData *MessageData `protobuf:"bytes,1,opt,name=messageData" json:"messageData,omitempty"` |
|||
// response specific data
|
|||
Message string `protobuf:"bytes,2,opt,name=message,proto3" json:"message,omitempty"` |
|||
} |
|||
|
|||
func (m *EchoResponse) Reset() { *m = EchoResponse{} } |
|||
func (m *EchoResponse) String() string { return proto.CompactTextString(m) } |
|||
func (*EchoResponse) ProtoMessage() {} |
|||
|
|||
func (m *EchoResponse) GetMessageData() *MessageData { |
|||
if m != nil { |
|||
return m.MessageData |
|||
} |
|||
return nil |
|||
} |
|||
|
|||
func init() { |
|||
proto.RegisterType((*MessageData)(nil), "protocols.p2p.MessageData") |
|||
proto.RegisterType((*PingRequest)(nil), "protocols.p2p.PingRequest") |
|||
proto.RegisterType((*PingResponse)(nil), "protocols.p2p.PingResponse") |
|||
proto.RegisterType((*EchoRequest)(nil), "protocols.p2p.EchoRequest") |
|||
proto.RegisterType((*EchoResponse)(nil), "protocols.p2p.EchoResponse") |
|||
} |
@ -1,56 +0,0 @@ |
|||
syntax = "proto3"; |
|||
|
|||
package protocols.p2p; |
|||
|
|||
// designed to be shared between all app protocols |
|||
message MessageData { |
|||
// shared between all requests |
|||
string clientVersion = 1; // client version |
|||
int64 timestamp = 2; // unix time |
|||
string id = 3; // allows requesters to use request data when processing a response |
|||
bool gossip = 4; // true to have receiver peer gossip the message to neighbors |
|||
string nodeId = 5; // id of node that created the message (not the peer that may have sent it). =base58(mh(sha256(nodePubKey))) |
|||
bytes nodePubKey = 6; // Authoring node Secp256k1 public key (32bytes) - protobufs serielized |
|||
string sign = 7; // signature of message data + method specific data by message authoring node. format: string([]bytes) |
|||
} |
|||
|
|||
//// ping protocol |
|||
|
|||
// a protocol define a set of reuqest and responses |
|||
message PingRequest { |
|||
MessageData messageData = 1; |
|||
|
|||
// method specific data |
|||
string message = 2; |
|||
// add any data here.... |
|||
} |
|||
|
|||
message PingResponse { |
|||
MessageData messageData = 1; |
|||
|
|||
// response specific data |
|||
string message = 2; |
|||
|
|||
// ... add any additional message data here |
|||
} |
|||
|
|||
//// echo protocol |
|||
|
|||
// a protocol define a set of reuqest and responses |
|||
message EchoRequest { |
|||
MessageData messageData = 1; |
|||
|
|||
// method specific data |
|||
string message = 2; |
|||
|
|||
// add any additional message data here.... |
|||
} |
|||
|
|||
message EchoResponse { |
|||
MessageData messageData = 1; |
|||
|
|||
// response specific data |
|||
string message = 2; |
|||
|
|||
// ... add any additional message data here.... |
|||
} |
@ -1,4 +0,0 @@ |
|||
# building p2p.pb.go: |
|||
protoc --gogo_out=. --proto_path=../../../../../../:/usr/local/opt/protobuf/include:. *.proto |
|||
|
|||
|
@ -1,148 +0,0 @@ |
|||
package main |
|||
|
|||
import ( |
|||
"bufio" |
|||
"context" |
|||
"fmt" |
|||
"log" |
|||
|
|||
uuid "github.com/google/uuid" |
|||
"github.com/libp2p/go-libp2p-host" |
|||
inet "github.com/libp2p/go-libp2p-net" |
|||
p2p "github.com/libp2p/go-libp2p/examples/multipro/pb" |
|||
protobufCodec "github.com/multiformats/go-multicodec/protobuf" |
|||
) |
|||
|
|||
// pattern: /protocol-name/request-or-response-message/version
|
|||
const pingRequest = "/ping/pingreq/0.0.1" |
|||
const pingResponse = "/ping/pingresp/0.0.1" |
|||
|
|||
// PingProtocol type
|
|||
type PingProtocol struct { |
|||
node *Node // local host
|
|||
requests map[string]*p2p.PingRequest // used to access request data from response handlers
|
|||
done chan bool // only for demo purposes to stop main from terminating
|
|||
} |
|||
|
|||
func NewPingProtocol(node *Node, done chan bool) *PingProtocol { |
|||
p := &PingProtocol{node: node, requests: make(map[string]*p2p.PingRequest), done: done} |
|||
node.SetStreamHandler(pingRequest, p.onPingRequest) |
|||
node.SetStreamHandler(pingResponse, p.onPingResponse) |
|||
return p |
|||
} |
|||
|
|||
// remote peer requests handler
|
|||
func (p *PingProtocol) onPingRequest(s inet.Stream) { |
|||
|
|||
// get request data
|
|||
data := &p2p.PingRequest{} |
|||
decoder := protobufCodec.Multicodec(nil).Decoder(bufio.NewReader(s)) |
|||
err := decoder.Decode(data) |
|||
if err != nil { |
|||
log.Println(err) |
|||
return |
|||
} |
|||
|
|||
log.Printf("%s: Received ping request from %s. Message: %s", s.Conn().LocalPeer(), s.Conn().RemotePeer(), data.Message) |
|||
|
|||
valid := p.node.authenticateMessage(data, data.MessageData) |
|||
|
|||
if !valid { |
|||
log.Println("Failed to authenticate message") |
|||
return |
|||
} |
|||
|
|||
// generate response message
|
|||
log.Printf("%s: Sending ping response to %s. Message id: %s...", s.Conn().LocalPeer(), s.Conn().RemotePeer(), data.MessageData.Id) |
|||
|
|||
resp := &p2p.PingResponse{MessageData: p.node.NewMessageData(data.MessageData.Id, false), |
|||
Message: fmt.Sprintf("Ping response from %s", p.node.ID())} |
|||
|
|||
// sign the data
|
|||
signature, err := p.node.signProtoMessage(resp) |
|||
if err != nil { |
|||
log.Println("failed to sign response") |
|||
return |
|||
} |
|||
|
|||
// add the signature to the message
|
|||
resp.MessageData.Sign = string(signature) |
|||
|
|||
// send the response
|
|||
s, respErr := p.node.NewStream(context.Background(), s.Conn().RemotePeer(), pingResponse) |
|||
if respErr != nil { |
|||
log.Println(respErr) |
|||
return |
|||
} |
|||
|
|||
ok := p.node.sendProtoMessage(resp, s) |
|||
|
|||
if ok { |
|||
log.Printf("%s: Ping response to %s sent.", s.Conn().LocalPeer().String(), s.Conn().RemotePeer().String()) |
|||
} |
|||
} |
|||
|
|||
// remote ping response handler
|
|||
func (p *PingProtocol) onPingResponse(s inet.Stream) { |
|||
data := &p2p.PingResponse{} |
|||
decoder := protobufCodec.Multicodec(nil).Decoder(bufio.NewReader(s)) |
|||
err := decoder.Decode(data) |
|||
if err != nil { |
|||
return |
|||
} |
|||
|
|||
valid := p.node.authenticateMessage(data, data.MessageData) |
|||
|
|||
if !valid { |
|||
log.Println("Failed to authenticate message") |
|||
return |
|||
} |
|||
|
|||
// locate request data and remove it if found
|
|||
_, ok := p.requests[data.MessageData.Id] |
|||
if ok { |
|||
// remove request from map as we have processed it here
|
|||
delete(p.requests, data.MessageData.Id) |
|||
} else { |
|||
log.Println("Failed to locate request data boject for response") |
|||
return |
|||
} |
|||
|
|||
log.Printf("%s: Received ping response from %s. Message id:%s. Message: %s.", s.Conn().LocalPeer(), s.Conn().RemotePeer(), data.MessageData.Id, data.Message) |
|||
p.done <- true |
|||
} |
|||
|
|||
func (p *PingProtocol) Ping(host host.Host) bool { |
|||
log.Printf("%s: Sending ping to: %s....", p.node.ID(), host.ID()) |
|||
|
|||
// create message data
|
|||
req := &p2p.PingRequest{MessageData: p.node.NewMessageData(uuid.New().String(), false), |
|||
Message: fmt.Sprintf("Ping from %s", p.node.ID())} |
|||
|
|||
// sign the data
|
|||
signature, err := p.node.signProtoMessage(req) |
|||
if err != nil { |
|||
log.Println("failed to sign pb data") |
|||
return false |
|||
} |
|||
|
|||
// add the signature to the message
|
|||
req.MessageData.Sign = string(signature) |
|||
|
|||
s, err := p.node.NewStream(context.Background(), host.ID(), pingRequest) |
|||
if err != nil { |
|||
log.Println(err) |
|||
return false |
|||
} |
|||
|
|||
ok := p.node.sendProtoMessage(req, s) |
|||
|
|||
if !ok { |
|||
return false |
|||
} |
|||
|
|||
// store ref request so response handler has access to it
|
|||
p.requests[req.MessageData.Id] = req |
|||
log.Printf("%s: Ping to: %s was sent. Message Id: %s, Message: %s", p.node.ID(), host.ID(), req.MessageData.Id, req.Message) |
|||
return true |
|||
} |
@ -1,41 +0,0 @@ |
|||
|
|||
|
|||
# Protocol Multiplexing using multicodecs with libp2p |
|||
|
|||
This examples shows how to use multicodecs (i.e. json) to encode and transmit information between LibP2P hosts using LibP2P Streams. |
|||
|
|||
Multicodecs present a common interface, making it very easy to swap the codec implementation if needed. |
|||
|
|||
This example expects that you area already familiar with the [echo example](https://github.com/libp2p/go-libp2p/tree/master/examples/echo). |
|||
|
|||
## Build |
|||
|
|||
From `go-libp2p` base folder: |
|||
|
|||
``` |
|||
> make deps-protocol-muxing |
|||
> go build -o multicodecs ./examples/protocol-multiplexing-with-multicodecs |
|||
``` |
|||
|
|||
## Usage |
|||
|
|||
``` |
|||
> ./multicodecs |
|||
|
|||
``` |
|||
|
|||
## Details |
|||
|
|||
The example creates two LibP2P Hosts. Host1 opens a stream to Host2. Host2 has an `StreamHandler` to deal with the incoming stream. This is covered in the `echo` example. |
|||
|
|||
Both hosts simulate a conversation. But rather than sending raw messages on the stream, each message in the conversation is encoded under a `json` object (using the `json` multicodec). For example: |
|||
|
|||
``` |
|||
{ |
|||
"Msg": "This is the message", |
|||
"Index": 3, |
|||
"HangUp": false |
|||
} |
|||
``` |
|||
|
|||
The stream lasts until one of the sides closes it when the HangUp field is `true`. |
@ -1,171 +0,0 @@ |
|||
package main |
|||
|
|||
import ( |
|||
"bufio" |
|||
"context" |
|||
"fmt" |
|||
"log" |
|||
"math/rand" |
|||
"time" |
|||
|
|||
host "github.com/libp2p/go-libp2p-host" |
|||
inet "github.com/libp2p/go-libp2p-net" |
|||
ps "github.com/libp2p/go-libp2p-peerstore" |
|||
|
|||
libp2p "github.com/libp2p/go-libp2p" |
|||
multicodec "github.com/multiformats/go-multicodec" |
|||
json "github.com/multiformats/go-multicodec/json" |
|||
) |
|||
|
|||
const proto = "/example/1.0.0" |
|||
|
|||
// Message is a serializable/encodable object that we will send
|
|||
// on a Stream.
|
|||
type Message struct { |
|||
Msg string |
|||
Index int |
|||
HangUp bool |
|||
} |
|||
|
|||
// streamWrap wraps a libp2p stream. We encode/decode whenever we
|
|||
// write/read from a stream, so we can just carry the encoders
|
|||
// and bufios with us
|
|||
type WrappedStream struct { |
|||
stream inet.Stream |
|||
enc multicodec.Encoder |
|||
dec multicodec.Decoder |
|||
w *bufio.Writer |
|||
r *bufio.Reader |
|||
} |
|||
|
|||
// wrapStream takes a stream and complements it with r/w bufios and
|
|||
// decoder/encoder. In order to write raw data to the stream we can use
|
|||
// wrap.w.Write(). To encode something into it we can wrap.enc.Encode().
|
|||
// Finally, we should wrap.w.Flush() to actually send the data. Handling
|
|||
// incoming data works similarly with wrap.r.Read() for raw-reading and
|
|||
// wrap.dec.Decode() to decode.
|
|||
func WrapStream(s inet.Stream) *WrappedStream { |
|||
reader := bufio.NewReader(s) |
|||
writer := bufio.NewWriter(s) |
|||
// This is where we pick our specific multicodec. In order to change the
|
|||
// codec, we only need to change this place.
|
|||
// See https://godoc.org/github.com/multiformats/go-multicodec/json
|
|||
dec := json.Multicodec(false).Decoder(reader) |
|||
enc := json.Multicodec(false).Encoder(writer) |
|||
return &WrappedStream{ |
|||
stream: s, |
|||
r: reader, |
|||
w: writer, |
|||
enc: enc, |
|||
dec: dec, |
|||
} |
|||
} |
|||
|
|||
// messages that will be sent between the hosts.
|
|||
var conversationMsgs = []string{ |
|||
"Hello!", |
|||
"Hey!", |
|||
"How are you doing?", |
|||
"Very good! It is great that you can send data on a stream to me!", |
|||
"Not only that, the data is encoded in a JSON object.", |
|||
"Yeah, and we are using the multicodecs interface to encode and decode.", |
|||
"This way we could swap it easily for, say, cbor, or msgpack!", |
|||
"Let's leave that as an excercise for the reader...", |
|||
"Agreed, our last message should activate the HangUp flag", |
|||
"Yes, and the example code will close streams. So sad :(. Bye!", |
|||
} |
|||
|
|||
func makeRandomHost(port int) host.Host { |
|||
h, err := libp2p.New(context.Background(), libp2p.ListenAddrStrings(fmt.Sprintf("/ip4/127.0.0.1/tcp/%d", port))) |
|||
if err != nil { |
|||
panic(err) |
|||
} |
|||
return h |
|||
} |
|||
|
|||
func main() { |
|||
// Choose random ports between 10000-10100
|
|||
rand.Seed(666) |
|||
port1 := rand.Intn(100) + 10000 |
|||
port2 := port1 + 1 |
|||
|
|||
// Make 2 hosts
|
|||
h1 := makeRandomHost(port1) |
|||
h2 := makeRandomHost(port2) |
|||
h1.Peerstore().AddAddrs(h2.ID(), h2.Addrs(), ps.PermanentAddrTTL) |
|||
h2.Peerstore().AddAddrs(h1.ID(), h1.Addrs(), ps.PermanentAddrTTL) |
|||
|
|||
log.Printf("This is a conversation between %s and %s\n", h1.ID(), h2.ID()) |
|||
|
|||
// Define a stream handler for host number 2
|
|||
h2.SetStreamHandler(proto, func(stream inet.Stream) { |
|||
log.Printf("%s: Received a stream", h2.ID()) |
|||
wrappedStream := WrapStream(stream) |
|||
defer stream.Close() |
|||
handleStream(wrappedStream) |
|||
}) |
|||
|
|||
// Create new stream from h1 to h2 and start the conversation
|
|||
stream, err := h1.NewStream(context.Background(), h2.ID(), proto) |
|||
if err != nil { |
|||
log.Fatal(err) |
|||
} |
|||
wrappedStream := WrapStream(stream) |
|||
// This sends the first message
|
|||
sendMessage(0, wrappedStream) |
|||
// We keep the conversation on the created stream so we launch
|
|||
// this to handle any responses
|
|||
handleStream(wrappedStream) |
|||
// When we are done, close the stream on our side and exit.
|
|||
stream.Close() |
|||
} |
|||
|
|||
// receiveMessage reads and decodes a message from the stream
|
|||
func receiveMessage(ws *WrappedStream) (*Message, error) { |
|||
var msg Message |
|||
err := ws.dec.Decode(&msg) |
|||
if err != nil { |
|||
return nil, err |
|||
} |
|||
return &msg, nil |
|||
} |
|||
|
|||
// sendMessage encodes and writes a message to the stream
|
|||
func sendMessage(index int, ws *WrappedStream) error { |
|||
msg := &Message{ |
|||
Msg: conversationMsgs[index], |
|||
Index: index, |
|||
HangUp: index >= len(conversationMsgs)-1, |
|||
} |
|||
|
|||
err := ws.enc.Encode(msg) |
|||
// Because output is buffered with bufio, we need to flush!
|
|||
ws.w.Flush() |
|||
return err |
|||
} |
|||
|
|||
// handleStream is a for loop which receives and then sends a message
|
|||
// an artificial delay of 500ms happens in-between.
|
|||
// When Message.HangUp is true, it exists. This will close the stream
|
|||
// on one of the sides. The other side's receiveMessage() will error
|
|||
// with EOF, thus also breaking out from the loop.
|
|||
func handleStream(ws *WrappedStream) { |
|||
for { |
|||
// Read
|
|||
msg, err := receiveMessage(ws) |
|||
if err != nil { |
|||
break |
|||
} |
|||
pid := ws.stream.Conn().LocalPeer() |
|||
log.Printf("%s says: %s\n", pid, msg.Msg) |
|||
time.Sleep(500 * time.Millisecond) |
|||
if msg.HangUp { |
|||
break |
|||
} |
|||
// Send response
|
|||
err = sendMessage(msg.Index+1, ws) |
|||
if err != nil { |
|||
break |
|||
} |
|||
} |
|||
} |
Loading…
Reference in new issue