mirror of https://github.com/libp2p/go-libp2p.git
Steven Allen
7 years ago
committed by
GitHub
15 changed files with 799 additions and 0 deletions
@ -1,3 +1,4 @@ |
|||
*.swp |
|||
examples/echo/echo |
|||
examples/multicodecs/multicodecs |
|||
.idea |
@ -0,0 +1,3 @@ |
|||
# This is the official list of authors for copyright purposes. |
|||
|
|||
Aviv Eyal <aviveyal07@gmail.com> |
@ -0,0 +1,21 @@ |
|||
The MIT License (MIT) |
|||
|
|||
Copyright (c) 2017 Aviv Eyal |
|||
|
|||
Permission is hereby granted, free of charge, to any person obtaining a copy |
|||
of this software and associated documentation files (the "Software"), to deal |
|||
in the Software without restriction, including without limitation the rights |
|||
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell |
|||
copies of the Software, and to permit persons to whom the Software is |
|||
furnished to do so, subject to the following conditions: |
|||
|
|||
The above copyright notice and this permission notice shall be included in |
|||
all copies or substantial portions of the Software. |
|||
|
|||
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR |
|||
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, |
|||
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE |
|||
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER |
|||
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, |
|||
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN |
|||
THE SOFTWARE. |
@ -0,0 +1,62 @@ |
|||
# Protocol Multiplexing using rpc-style multicodecs, protobufs with libp2p |
|||
|
|||
This examples shows how to use multicodecs (i.e. protobufs) to encode and transmit information between LibP2P hosts using LibP2P Streams. |
|||
Multicodecs present a common interface, making it very easy to swap the codec implementation if needed. |
|||
This example expects that you area already familiar with the [echo example](https://github.com/libp2p/go-libp2p/tree/master/examples/echo). |
|||
|
|||
## Build |
|||
|
|||
Install gx: |
|||
```sh |
|||
> go get -u github.com/whyrusleeping/gx |
|||
|
|||
``` |
|||
|
|||
Run GX from the root libp2p source dir: |
|||
```sh |
|||
>gx install |
|||
``` |
|||
|
|||
Build libp2p: |
|||
```sh |
|||
> make deps |
|||
> make |
|||
``` |
|||
|
|||
Run from `multipro` directory |
|||
|
|||
```sh |
|||
> go build |
|||
``` |
|||
|
|||
|
|||
## Usage |
|||
|
|||
```sh |
|||
> ./multipro |
|||
|
|||
``` |
|||
|
|||
## Details |
|||
|
|||
The example creates two LibP2P Hosts supporting 2 protocols: ping and echo. |
|||
|
|||
Each protocol consists RPC-style requests and responses and each request and response is a typed protobufs message (and a go data object). |
|||
|
|||
This is a different pattern then defining a whole p2p protocol as one protobuf message with lots of optional fields (as can be observed in various p2p-lib protocols using protobufs such as dht). |
|||
|
|||
The example shows how to match async received responses with their requests. This is useful when processing a response requires access to the request data. |
|||
|
|||
The idea is to use lib-p2p protocol multiplexing on a per-message basis. |
|||
|
|||
### Features |
|||
1. 2 fully implemented protocols using an RPC-like request-response pattern - Ping and Echo |
|||
2. Scaffolding for quickly implementing new app-level versioned RPC-like protocols |
|||
3. Full authentication of incoming message data by author (who might not be the message's sender peer) |
|||
4. Base p2p format in protobufs with fields shared by all protocol messages |
|||
5. Full access to request data when processing a response. |
|||
|
|||
## Author |
|||
@avive |
|||
|
|||
|
@ -0,0 +1,157 @@ |
|||
package main |
|||
|
|||
import ( |
|||
"bufio" |
|||
"context" |
|||
"fmt" |
|||
"log" |
|||
|
|||
inet "github.com/libp2p/go-libp2p-net" |
|||
|
|||
"github.com/libp2p/go-libp2p-host" |
|||
p2p "github.com/libp2p/go-libp2p/examples/multipro/pb" |
|||
protobufCodec "github.com/multiformats/go-multicodec/protobuf" |
|||
uuid "github.com/satori/go.uuid" |
|||
) |
|||
|
|||
// pattern: /protocol-name/request-or-response-message/version
|
|||
const echoRequest = "/echo/echoreq/0.0.1" |
|||
const echoResponse = "/echo/echoresp/0.0.1" |
|||
|
|||
type EchoProtocol struct { |
|||
node *Node // local host
|
|||
requests map[string]*p2p.EchoRequest // used to access request data from response handlers
|
|||
done chan bool // only for demo purposes to hold main from terminating
|
|||
} |
|||
|
|||
func NewEchoProtocol(node *Node, done chan bool) *EchoProtocol { |
|||
e := EchoProtocol{node: node, requests: make(map[string]*p2p.EchoRequest), done: done} |
|||
node.SetStreamHandler(echoRequest, e.onEchoRequest) |
|||
node.SetStreamHandler(echoResponse, e.onEchoResponse) |
|||
|
|||
// design note: to implement fire-and-forget style messages you may just skip specifying a response callback.
|
|||
// a fire-and-forget message will just include a request and not specify a response object
|
|||
|
|||
return &e |
|||
} |
|||
|
|||
// remote peer requests handler
|
|||
func (e *EchoProtocol) onEchoRequest(s inet.Stream) { |
|||
// get request data
|
|||
data := &p2p.EchoRequest{} |
|||
decoder := protobufCodec.Multicodec(nil).Decoder(bufio.NewReader(s)) |
|||
err := decoder.Decode(data) |
|||
if err != nil { |
|||
log.Println(err) |
|||
return |
|||
} |
|||
|
|||
log.Printf("%s: Received echo request from %s. Message: %s", s.Conn().LocalPeer(), s.Conn().RemotePeer(), data.Message) |
|||
|
|||
valid := e.node.authenticateMessage(data, data.MessageData) |
|||
|
|||
if !valid { |
|||
log.Println("Failed to authenticate message") |
|||
return |
|||
} |
|||
|
|||
log.Printf("%s: Sending echo response to %s. Message id: %s...", s.Conn().LocalPeer(), s.Conn().RemotePeer(), data.MessageData.Id) |
|||
|
|||
// send response to the request using the message string he provided
|
|||
|
|||
resp := &p2p.EchoResponse{ |
|||
MessageData: e.node.NewMessageData(data.MessageData.Id, false), |
|||
Message: data.Message} |
|||
|
|||
// sign the data
|
|||
signature, err := e.node.signProtoMessage(resp) |
|||
if err != nil { |
|||
log.Println("failed to sign response") |
|||
return |
|||
} |
|||
|
|||
// add the signature to the message
|
|||
resp.MessageData.Sign = string(signature) |
|||
|
|||
s, respErr := e.node.NewStream(context.Background(), s.Conn().RemotePeer(), echoResponse) |
|||
if respErr != nil { |
|||
log.Println(respErr) |
|||
return |
|||
} |
|||
|
|||
ok := e.node.sendProtoMessage(resp, s) |
|||
|
|||
if ok { |
|||
log.Printf("%s: Echo response to %s sent.", s.Conn().LocalPeer().String(), s.Conn().RemotePeer().String()) |
|||
} |
|||
} |
|||
|
|||
// remote echo response handler
|
|||
func (e *EchoProtocol) onEchoResponse(s inet.Stream) { |
|||
data := &p2p.EchoResponse{} |
|||
decoder := protobufCodec.Multicodec(nil).Decoder(bufio.NewReader(s)) |
|||
err := decoder.Decode(data) |
|||
if err != nil { |
|||
return |
|||
} |
|||
|
|||
// authenticate message content
|
|||
valid := e.node.authenticateMessage(data, data.MessageData) |
|||
|
|||
if !valid { |
|||
log.Println("Failed to authenticate message") |
|||
return |
|||
} |
|||
|
|||
// locate request data and remove it if found
|
|||
req, ok := e.requests[data.MessageData.Id] |
|||
if ok { |
|||
// remove request from map as we have processed it here
|
|||
delete(e.requests, data.MessageData.Id) |
|||
} else { |
|||
log.Println("Failed to locate request data boject for response") |
|||
return |
|||
} |
|||
|
|||
if req.Message != data.Message { |
|||
log.Fatalln("Expected echo to respond with request message") |
|||
} |
|||
|
|||
log.Printf("%s: Received echo response from %s. Message id:%s. Message: %s.", s.Conn().LocalPeer(), s.Conn().RemotePeer(), data.MessageData.Id, data.Message) |
|||
e.done <- true |
|||
} |
|||
|
|||
func (e *EchoProtocol) Echo(host host.Host) bool { |
|||
log.Printf("%s: Sending echo to: %s....", e.node.ID(), host.ID()) |
|||
|
|||
// create message data
|
|||
req := &p2p.EchoRequest{ |
|||
MessageData: e.node.NewMessageData(uuid.Must(uuid.NewV4()).String(), false), |
|||
Message: fmt.Sprintf("Echo from %s", e.node.ID())} |
|||
|
|||
signature, err := e.node.signProtoMessage(req) |
|||
if err != nil { |
|||
log.Println("failed to sign message") |
|||
return false |
|||
} |
|||
|
|||
// add the signature to the message
|
|||
req.MessageData.Sign = string(signature) |
|||
|
|||
s, err := e.node.NewStream(context.Background(), host.ID(), echoRequest) |
|||
if err != nil { |
|||
log.Println(err) |
|||
return false |
|||
} |
|||
|
|||
ok := e.node.sendProtoMessage(req, s) |
|||
|
|||
if !ok { |
|||
return false |
|||
} |
|||
|
|||
// store request so response handler has access to it
|
|||
e.requests[req.MessageData.Id] = req |
|||
log.Printf("%s: Echo to: %s was sent. Message Id: %s, Message: %s", e.node.ID(), host.ID(), req.MessageData.Id, req.Message) |
|||
return true |
|||
} |
@ -0,0 +1,59 @@ |
|||
package main |
|||
|
|||
import ( |
|||
"context" |
|||
"fmt" |
|||
"log" |
|||
"math/rand" |
|||
|
|||
crypto "github.com/libp2p/go-libp2p-crypto" |
|||
peer "github.com/libp2p/go-libp2p-peer" |
|||
ps "github.com/libp2p/go-libp2p-peerstore" |
|||
swarm "github.com/libp2p/go-libp2p-swarm" |
|||
bhost "github.com/libp2p/go-libp2p/p2p/host/basic" |
|||
ma "github.com/multiformats/go-multiaddr" |
|||
) |
|||
|
|||
// helper method - create a lib-p2p host to listen on a port
|
|||
func makeRandomNode(port int, done chan bool) *Node { |
|||
// Ignoring most errors for brevity
|
|||
// See echo example for more details and better implementation
|
|||
priv, pub, _ := crypto.GenerateKeyPair(crypto.Secp256k1, 256) |
|||
pid, _ := peer.IDFromPublicKey(pub) |
|||
listen, _ := ma.NewMultiaddr(fmt.Sprintf("/ip4/127.0.0.1/tcp/%d", port)) |
|||
peerStore := ps.NewPeerstore() |
|||
peerStore.AddPrivKey(pid, priv) |
|||
peerStore.AddPubKey(pid, pub) |
|||
n, _ := swarm.NewNetwork(context.Background(), []ma.Multiaddr{listen}, pid, peerStore, nil) |
|||
host := bhost.New(n) |
|||
|
|||
return NewNode(host, done) |
|||
} |
|||
|
|||
func main() { |
|||
// Choose random ports between 10000-10100
|
|||
rand.Seed(666) |
|||
port1 := rand.Intn(100) + 10000 |
|||
port2 := port1 + 1 |
|||
|
|||
done := make(chan bool, 1) |
|||
|
|||
// Make 2 hosts
|
|||
h1 := makeRandomNode(port1, done) |
|||
h2 := makeRandomNode(port2, done) |
|||
h1.Peerstore().AddAddrs(h2.ID(), h2.Addrs(), ps.PermanentAddrTTL) |
|||
h2.Peerstore().AddAddrs(h1.ID(), h1.Addrs(), ps.PermanentAddrTTL) |
|||
|
|||
log.Printf("This is a conversation between %s and %s\n", h1.ID(), h2.ID()) |
|||
|
|||
// send messages using the protocols
|
|||
h1.Ping(h2.Host) |
|||
h2.Ping(h1.Host) |
|||
h1.Echo(h2.Host) |
|||
h2.Echo(h1.Host) |
|||
|
|||
// block until all responses have been processed
|
|||
for i := 0; i < 4; i++ { |
|||
<-done |
|||
} |
|||
} |
@ -0,0 +1,150 @@ |
|||
package main |
|||
|
|||
import ( |
|||
"bufio" |
|||
"log" |
|||
"time" |
|||
|
|||
"github.com/gogo/protobuf/proto" |
|||
crypto "github.com/libp2p/go-libp2p-crypto" |
|||
host "github.com/libp2p/go-libp2p-host" |
|||
inet "github.com/libp2p/go-libp2p-net" |
|||
peer "github.com/libp2p/go-libp2p-peer" |
|||
p2p "github.com/libp2p/go-libp2p/examples/multipro/pb" |
|||
protobufCodec "github.com/multiformats/go-multicodec/protobuf" |
|||
) |
|||
|
|||
// node client version
|
|||
const clientVersion = "go-p2p-node/0.0.1" |
|||
|
|||
// Node type - a p2p host implementing one or more p2p protocols
|
|||
type Node struct { |
|||
host.Host // lib-p2p host
|
|||
*PingProtocol // ping protocol impl
|
|||
*EchoProtocol // echo protocol impl
|
|||
// add other protocols here...
|
|||
} |
|||
|
|||
// Create a new node with its implemented protocols
|
|||
func NewNode(host host.Host, done chan bool) *Node { |
|||
node := &Node{Host: host} |
|||
node.PingProtocol = NewPingProtocol(node, done) |
|||
node.EchoProtocol = NewEchoProtocol(node, done) |
|||
return node |
|||
} |
|||
|
|||
// Authenticate incoming p2p message
|
|||
// message: a protobufs go data object
|
|||
// data: common p2p message data
|
|||
func (n *Node) authenticateMessage(message proto.Message, data *p2p.MessageData) bool { |
|||
// store a temp ref to signature and remove it from message data
|
|||
// sign is a string to allow easy reset to zero-value (empty string)
|
|||
sign := data.Sign |
|||
data.Sign = "" |
|||
|
|||
// marshall data without the signature to protobufs3 binary format
|
|||
bin, err := proto.Marshal(message) |
|||
if err != nil { |
|||
log.Println(err, "failed to marshal pb message") |
|||
return false |
|||
} |
|||
|
|||
// restore sig in message data (for possible future use)
|
|||
data.Sign = sign |
|||
|
|||
// restore peer id binary format from base58 encoded node id data
|
|||
peerId, err := peer.IDB58Decode(data.NodeId) |
|||
if err != nil { |
|||
log.Println(err, "Failed to decode node id from base58") |
|||
return false |
|||
} |
|||
|
|||
// verify the data was authored by the signing peer identified by the public key
|
|||
// and signature included in the message
|
|||
return n.verifyData(bin, []byte(sign), peerId, data.NodePubKey) |
|||
} |
|||
|
|||
// sign an outgoing p2p message payload
|
|||
func (n *Node) signProtoMessage(message proto.Message) ([]byte, error) { |
|||
data, err := proto.Marshal(message) |
|||
if err != nil { |
|||
return nil, err |
|||
} |
|||
return n.signData(data) |
|||
} |
|||
|
|||
// sign binary data using the local node's private key
|
|||
func (n *Node) signData(data []byte) ([]byte, error) { |
|||
key := n.Peerstore().PrivKey(n.ID()) |
|||
res, err := key.Sign(data) |
|||
return res, err |
|||
} |
|||
|
|||
// Verify incoming p2p message data integrity
|
|||
// data: data to verify
|
|||
// signature: author signature provided in the message payload
|
|||
// peerId: author peer id from the message payload
|
|||
// pubKeyData: author public key from the message payload
|
|||
func (n *Node) verifyData(data []byte, signature []byte, peerId peer.ID, pubKeyData []byte) bool { |
|||
key, err := crypto.UnmarshalPublicKey(pubKeyData) |
|||
if err != nil { |
|||
log.Println(err, "Failed to extract key from message key data") |
|||
return false |
|||
} |
|||
|
|||
// extract node id from the provided public key
|
|||
idFromKey, err := peer.IDFromPublicKey(key) |
|||
|
|||
if err != nil { |
|||
log.Println(err, "Failed to extract peer id from public key") |
|||
return false |
|||
} |
|||
|
|||
// verify that message author node id matches the provided node public key
|
|||
if idFromKey != peerId { |
|||
log.Println(err, "Node id and provided public key mismatch") |
|||
return false |
|||
} |
|||
|
|||
res, err := key.Verify(data, signature) |
|||
if err != nil { |
|||
log.Println(err, "Error authenticating data") |
|||
return false |
|||
} |
|||
|
|||
return res |
|||
} |
|||
|
|||
// helper method - generate message data shared between all node's p2p protocols
|
|||
// messageId: unique for requests, copied from request for responses
|
|||
func (n *Node) NewMessageData(messageId string, gossip bool) *p2p.MessageData { |
|||
// Add protobufs bin data for message author public key
|
|||
// this is useful for authenticating messages forwarded by a node authored by another node
|
|||
nodePubKey, err := n.Peerstore().PubKey(n.ID()).Bytes() |
|||
|
|||
if err != nil { |
|||
panic("Failed to get public key for sender from local peer store.") |
|||
} |
|||
|
|||
return &p2p.MessageData{ClientVersion: clientVersion, |
|||
NodeId: peer.IDB58Encode(n.ID()), |
|||
NodePubKey: nodePubKey, |
|||
Timestamp: time.Now().Unix(), |
|||
Id: messageId, |
|||
Gossip: gossip} |
|||
} |
|||
|
|||
// helper method - writes a protobuf go data object to a network stream
|
|||
// data: reference of protobuf go data object to send (not the object itself)
|
|||
// s: network stream to write the data to
|
|||
func (n *Node) sendProtoMessage(data proto.Message, s inet.Stream) bool { |
|||
writer := bufio.NewWriter(s) |
|||
enc := protobufCodec.Multicodec(nil).Encoder(writer) |
|||
err := enc.Encode(data) |
|||
if err != nil { |
|||
log.Println(err) |
|||
return false |
|||
} |
|||
writer.Flush() |
|||
return true |
|||
} |
@ -0,0 +1,121 @@ |
|||
// Code generated by protoc-gen-gogo.
|
|||
// source: p2p.proto
|
|||
// DO NOT EDIT!
|
|||
|
|||
/* |
|||
Package protocols_p2p is a generated protocol buffer package. |
|||
|
|||
It is generated from these files: |
|||
p2p.proto |
|||
|
|||
It has these top-level messages: |
|||
MessageData |
|||
PingRequest |
|||
PingResponse |
|||
EchoRequest |
|||
EchoResponse |
|||
*/ |
|||
package protocols_p2p |
|||
|
|||
import proto "github.com/gogo/protobuf/proto" |
|||
import fmt "fmt" |
|||
import math "math" |
|||
|
|||
// Reference imports to suppress errors if they are not otherwise used.
|
|||
var _ = proto.Marshal |
|||
var _ = fmt.Errorf |
|||
var _ = math.Inf |
|||
|
|||
// designed to be shared between all app protocols
|
|||
type MessageData struct { |
|||
// shared between all requests
|
|||
ClientVersion string `protobuf:"bytes,1,opt,name=clientVersion,proto3" json:"clientVersion,omitempty"` |
|||
Timestamp int64 `protobuf:"varint,2,opt,name=timestamp,proto3" json:"timestamp,omitempty"` |
|||
Id string `protobuf:"bytes,3,opt,name=id,proto3" json:"id,omitempty"` |
|||
Gossip bool `protobuf:"varint,4,opt,name=gossip,proto3" json:"gossip,omitempty"` |
|||
NodeId string `protobuf:"bytes,5,opt,name=nodeId,proto3" json:"nodeId,omitempty"` |
|||
NodePubKey []byte `protobuf:"bytes,6,opt,name=nodePubKey,proto3" json:"nodePubKey,omitempty"` |
|||
Sign string `protobuf:"bytes,7,opt,name=sign,proto3" json:"sign,omitempty"` |
|||
} |
|||
|
|||
func (m *MessageData) Reset() { *m = MessageData{} } |
|||
func (m *MessageData) String() string { return proto.CompactTextString(m) } |
|||
func (*MessageData) ProtoMessage() {} |
|||
|
|||
// a protocol define a set of reuqest and responses
|
|||
type PingRequest struct { |
|||
MessageData *MessageData `protobuf:"bytes,1,opt,name=messageData" json:"messageData,omitempty"` |
|||
// method specific data
|
|||
Message string `protobuf:"bytes,2,opt,name=message,proto3" json:"message,omitempty"` |
|||
} |
|||
|
|||
func (m *PingRequest) Reset() { *m = PingRequest{} } |
|||
func (m *PingRequest) String() string { return proto.CompactTextString(m) } |
|||
func (*PingRequest) ProtoMessage() {} |
|||
|
|||
func (m *PingRequest) GetMessageData() *MessageData { |
|||
if m != nil { |
|||
return m.MessageData |
|||
} |
|||
return nil |
|||
} |
|||
|
|||
type PingResponse struct { |
|||
MessageData *MessageData `protobuf:"bytes,1,opt,name=messageData" json:"messageData,omitempty"` |
|||
// response specific data
|
|||
Message string `protobuf:"bytes,2,opt,name=message,proto3" json:"message,omitempty"` |
|||
} |
|||
|
|||
func (m *PingResponse) Reset() { *m = PingResponse{} } |
|||
func (m *PingResponse) String() string { return proto.CompactTextString(m) } |
|||
func (*PingResponse) ProtoMessage() {} |
|||
|
|||
func (m *PingResponse) GetMessageData() *MessageData { |
|||
if m != nil { |
|||
return m.MessageData |
|||
} |
|||
return nil |
|||
} |
|||
|
|||
// a protocol define a set of reuqest and responses
|
|||
type EchoRequest struct { |
|||
MessageData *MessageData `protobuf:"bytes,1,opt,name=messageData" json:"messageData,omitempty"` |
|||
// method specific data
|
|||
Message string `protobuf:"bytes,2,opt,name=message,proto3" json:"message,omitempty"` |
|||
} |
|||
|
|||
func (m *EchoRequest) Reset() { *m = EchoRequest{} } |
|||
func (m *EchoRequest) String() string { return proto.CompactTextString(m) } |
|||
func (*EchoRequest) ProtoMessage() {} |
|||
|
|||
func (m *EchoRequest) GetMessageData() *MessageData { |
|||
if m != nil { |
|||
return m.MessageData |
|||
} |
|||
return nil |
|||
} |
|||
|
|||
type EchoResponse struct { |
|||
MessageData *MessageData `protobuf:"bytes,1,opt,name=messageData" json:"messageData,omitempty"` |
|||
// response specific data
|
|||
Message string `protobuf:"bytes,2,opt,name=message,proto3" json:"message,omitempty"` |
|||
} |
|||
|
|||
func (m *EchoResponse) Reset() { *m = EchoResponse{} } |
|||
func (m *EchoResponse) String() string { return proto.CompactTextString(m) } |
|||
func (*EchoResponse) ProtoMessage() {} |
|||
|
|||
func (m *EchoResponse) GetMessageData() *MessageData { |
|||
if m != nil { |
|||
return m.MessageData |
|||
} |
|||
return nil |
|||
} |
|||
|
|||
func init() { |
|||
proto.RegisterType((*MessageData)(nil), "protocols.p2p.MessageData") |
|||
proto.RegisterType((*PingRequest)(nil), "protocols.p2p.PingRequest") |
|||
proto.RegisterType((*PingResponse)(nil), "protocols.p2p.PingResponse") |
|||
proto.RegisterType((*EchoRequest)(nil), "protocols.p2p.EchoRequest") |
|||
proto.RegisterType((*EchoResponse)(nil), "protocols.p2p.EchoResponse") |
|||
} |
@ -0,0 +1,56 @@ |
|||
syntax = "proto3"; |
|||
|
|||
package protocols.p2p; |
|||
|
|||
// designed to be shared between all app protocols |
|||
message MessageData { |
|||
// shared between all requests |
|||
string clientVersion = 1; // client version |
|||
int64 timestamp = 2; // unix time |
|||
string id = 3; // allows requesters to use request data when processing a response |
|||
bool gossip = 4; // true to have receiver peer gossip the message to neighbors |
|||
string nodeId = 5; // id of node that created the message (not the peer that may have sent it). =base58(mh(sha256(nodePubKey))) |
|||
bytes nodePubKey = 6; // Authoring node Secp256k1 public key (32bytes) - protobufs serielized |
|||
string sign = 7; // signature of message data + method specific data by message authoring node. format: string([]bytes) |
|||
} |
|||
|
|||
//// ping protocol |
|||
|
|||
// a protocol define a set of reuqest and responses |
|||
message PingRequest { |
|||
MessageData messageData = 1; |
|||
|
|||
// method specific data |
|||
string message = 2; |
|||
// add any data here.... |
|||
} |
|||
|
|||
message PingResponse { |
|||
MessageData messageData = 1; |
|||
|
|||
// response specific data |
|||
string message = 2; |
|||
|
|||
// ... add any additional message data here |
|||
} |
|||
|
|||
//// echo protocol |
|||
|
|||
// a protocol define a set of reuqest and responses |
|||
message EchoRequest { |
|||
MessageData messageData = 1; |
|||
|
|||
// method specific data |
|||
string message = 2; |
|||
|
|||
// add any additional message data here.... |
|||
} |
|||
|
|||
message EchoResponse { |
|||
MessageData messageData = 1; |
|||
|
|||
// response specific data |
|||
string message = 2; |
|||
|
|||
// ... add any additional message data here.... |
|||
} |
@ -0,0 +1,4 @@ |
|||
# building p2p.pb.go: |
|||
protoc --gogo_out=. --proto_path=../../../../../../:/usr/local/opt/protobuf/include:. *.proto |
|||
|
|||
|
@ -0,0 +1,148 @@ |
|||
package main |
|||
|
|||
import ( |
|||
"bufio" |
|||
"context" |
|||
"fmt" |
|||
"log" |
|||
|
|||
"github.com/libp2p/go-libp2p-host" |
|||
inet "github.com/libp2p/go-libp2p-net" |
|||
p2p "github.com/libp2p/go-libp2p/examples/multipro/pb" |
|||
protobufCodec "github.com/multiformats/go-multicodec/protobuf" |
|||
uuid "github.com/satori/go.uuid" |
|||
) |
|||
|
|||
// pattern: /protocol-name/request-or-response-message/version
|
|||
const pingRequest = "/ping/pingreq/0.0.1" |
|||
const pingResponse = "/ping/pingresp/0.0.1" |
|||
|
|||
// PingProtocol type
|
|||
type PingProtocol struct { |
|||
node *Node // local host
|
|||
requests map[string]*p2p.PingRequest // used to access request data from response handlers
|
|||
done chan bool // only for demo purposes to stop main from terminating
|
|||
} |
|||
|
|||
func NewPingProtocol(node *Node, done chan bool) *PingProtocol { |
|||
p := &PingProtocol{node: node, requests: make(map[string]*p2p.PingRequest), done: done} |
|||
node.SetStreamHandler(pingRequest, p.onPingRequest) |
|||
node.SetStreamHandler(pingResponse, p.onPingResponse) |
|||
return p |
|||
} |
|||
|
|||
// remote peer requests handler
|
|||
func (p *PingProtocol) onPingRequest(s inet.Stream) { |
|||
|
|||
// get request data
|
|||
data := &p2p.PingRequest{} |
|||
decoder := protobufCodec.Multicodec(nil).Decoder(bufio.NewReader(s)) |
|||
err := decoder.Decode(data) |
|||
if err != nil { |
|||
log.Println(err) |
|||
return |
|||
} |
|||
|
|||
log.Printf("%s: Received ping request from %s. Message: %s", s.Conn().LocalPeer(), s.Conn().RemotePeer(), data.Message) |
|||
|
|||
valid := p.node.authenticateMessage(data, data.MessageData) |
|||
|
|||
if !valid { |
|||
log.Println("Failed to authenticate message") |
|||
return |
|||
} |
|||
|
|||
// generate response message
|
|||
log.Printf("%s: Sending ping response to %s. Message id: %s...", s.Conn().LocalPeer(), s.Conn().RemotePeer(), data.MessageData.Id) |
|||
|
|||
resp := &p2p.PingResponse{MessageData: p.node.NewMessageData(data.MessageData.Id, false), |
|||
Message: fmt.Sprintf("Ping response from %s", p.node.ID())} |
|||
|
|||
// sign the data
|
|||
signature, err := p.node.signProtoMessage(resp) |
|||
if err != nil { |
|||
log.Println("failed to sign response") |
|||
return |
|||
} |
|||
|
|||
// add the signature to the message
|
|||
resp.MessageData.Sign = string(signature) |
|||
|
|||
// send the response
|
|||
s, respErr := p.node.NewStream(context.Background(), s.Conn().RemotePeer(), pingResponse) |
|||
if respErr != nil { |
|||
log.Println(respErr) |
|||
return |
|||
} |
|||
|
|||
ok := p.node.sendProtoMessage(resp, s) |
|||
|
|||
if ok { |
|||
log.Printf("%s: Ping response to %s sent.", s.Conn().LocalPeer().String(), s.Conn().RemotePeer().String()) |
|||
} |
|||
} |
|||
|
|||
// remote ping response handler
|
|||
func (p *PingProtocol) onPingResponse(s inet.Stream) { |
|||
data := &p2p.PingResponse{} |
|||
decoder := protobufCodec.Multicodec(nil).Decoder(bufio.NewReader(s)) |
|||
err := decoder.Decode(data) |
|||
if err != nil { |
|||
return |
|||
} |
|||
|
|||
valid := p.node.authenticateMessage(data, data.MessageData) |
|||
|
|||
if !valid { |
|||
log.Println("Failed to authenticate message") |
|||
return |
|||
} |
|||
|
|||
// locate request data and remove it if found
|
|||
_, ok := p.requests[data.MessageData.Id] |
|||
if ok { |
|||
// remove request from map as we have processed it here
|
|||
delete(p.requests, data.MessageData.Id) |
|||
} else { |
|||
log.Println("Failed to locate request data boject for response") |
|||
return |
|||
} |
|||
|
|||
log.Printf("%s: Received ping response from %s. Message id:%s. Message: %s.", s.Conn().LocalPeer(), s.Conn().RemotePeer(), data.MessageData.Id, data.Message) |
|||
p.done <- true |
|||
} |
|||
|
|||
func (p *PingProtocol) Ping(host host.Host) bool { |
|||
log.Printf("%s: Sending ping to: %s....", p.node.ID(), host.ID()) |
|||
|
|||
// create message data
|
|||
req := &p2p.PingRequest{MessageData: p.node.NewMessageData(uuid.Must(uuid.NewV4()).String(), false), |
|||
Message: fmt.Sprintf("Ping from %s", p.node.ID())} |
|||
|
|||
// sign the data
|
|||
signature, err := p.node.signProtoMessage(req) |
|||
if err != nil { |
|||
log.Println("failed to sign pb data") |
|||
return false |
|||
} |
|||
|
|||
// add the signature to the message
|
|||
req.MessageData.Sign = string(signature) |
|||
|
|||
s, err := p.node.NewStream(context.Background(), host.ID(), pingRequest) |
|||
if err != nil { |
|||
log.Println(err) |
|||
return false |
|||
} |
|||
|
|||
ok := p.node.sendProtoMessage(req, s) |
|||
|
|||
if !ok { |
|||
return false |
|||
} |
|||
|
|||
// store ref request so response handler has access to it
|
|||
p.requests[req.MessageData.Id] = req |
|||
log.Printf("%s: Ping to: %s was sent. Message Id: %s, Message: %s", p.node.ID(), host.ID(), req.MessageData.Id, req.Message) |
|||
return true |
|||
} |
Loading…
Reference in new issue