You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 

128 lines
4.7 KiB

import random
import asyncio
import logging
from rpcudp.protocol import RPCProtocol
from kademlia.node import Node
from kademlia.routing import RoutingTable
from kademlia.utils import digest
log = logging.getLogger(__name__)
class KademliaProtocol(RPCProtocol):
def __init__(self, sourceNode, storage, ksize):
RPCProtocol.__init__(self)
self.router = RoutingTable(self, ksize, sourceNode)
self.storage = storage
self.sourceNode = sourceNode
def getRefreshIDs(self):
"""
Get ids to search for to keep old buckets up to date.
"""
ids = []
for bucket in self.router.getLonelyBuckets():
rid = random.randint(*bucket.range).to_bytes(20, byteorder='big')
ids.append(rid)
return ids
def rpc_stun(self, sender):
return sender
def rpc_ping(self, sender, nodeid):
source = Node(nodeid, sender[0], sender[1])
self.welcomeIfNewNode(source)
return self.sourceNode.id
def rpc_store(self, sender, nodeid, key, value):
source = Node(nodeid, sender[0], sender[1])
self.welcomeIfNewNode(source)
log.debug("got a store request from %s, storing '%s'='%s'",
sender, key.hex(), value)
self.storage[key] = value
return True
def rpc_find_node(self, sender, nodeid, key):
log.info("finding neighbors of %i in local table",
int(nodeid.hex(), 16))
source = Node(nodeid, sender[0], sender[1])
self.welcomeIfNewNode(source)
node = Node(key)
neighbors = self.router.findNeighbors(node, exclude=source)
return list(map(tuple, neighbors))
def rpc_find_value(self, sender, nodeid, key):
source = Node(nodeid, sender[0], sender[1])
self.welcomeIfNewNode(source)
value = self.storage.get(key, None)
if value is None:
return self.rpc_find_node(sender, nodeid, key)
return {'value': value}
async def callFindNode(self, nodeToAsk, nodeToFind):
address = (nodeToAsk.ip, nodeToAsk.port)
result = await self.find_node(address, self.sourceNode.id,
nodeToFind.id)
return self.handleCallResponse(result, nodeToAsk)
async def callFindValue(self, nodeToAsk, nodeToFind):
address = (nodeToAsk.ip, nodeToAsk.port)
result = await self.find_value(address, self.sourceNode.id,
nodeToFind.id)
return self.handleCallResponse(result, nodeToAsk)
async def callPing(self, nodeToAsk):
address = (nodeToAsk.ip, nodeToAsk.port)
result = await self.ping(address, self.sourceNode.id)
return self.handleCallResponse(result, nodeToAsk)
async def callStore(self, nodeToAsk, key, value):
address = (nodeToAsk.ip, nodeToAsk.port)
result = await self.store(address, self.sourceNode.id, key, value)
return self.handleCallResponse(result, nodeToAsk)
def welcomeIfNewNode(self, node):
"""
Given a new node, send it all the keys/values it should be storing,
then add it to the routing table.
@param node: A new node that just joined (or that we just found out
about).
Process:
For each key in storage, get k closest nodes. If newnode is closer
than the furtherst in that list, and the node for this server
is closer than the closest in that list, then store the key/value
on the new node (per section 2.5 of the paper)
"""
if not self.router.isNewNode(node):
return
log.info("never seen %s before, adding to router", node)
for key, value in self.storage.items():
keynode = Node(digest(key))
neighbors = self.router.findNeighbors(keynode)
if len(neighbors) > 0:
last = neighbors[-1].distanceTo(keynode)
newNodeClose = node.distanceTo(keynode) < last
first = neighbors[0].distanceTo(keynode)
thisNodeClosest = self.sourceNode.distanceTo(keynode) < first
if len(neighbors) == 0 or (newNodeClose and thisNodeClosest):
asyncio.ensure_future(self.callStore(node, key, value))
self.router.addContact(node)
def handleCallResponse(self, result, node):
"""
If we get a response, add the node to the routing table. If
we get no response, make sure it's removed from the routing table.
"""
if not result[0]:
log.warning("no response from %s, removing from router", node)
self.router.removeContact(node)
return result
log.info("got successful response from %s", node)
self.welcomeIfNewNode(node)
return result