feat: update event mechanism

update EventBroker and EventCenter to use ContractManager.instance.masterStub to deliver event, and use DHTUtil to compute target nodes
This commit is contained in:
Frank.R.Wu 2021-12-06 18:05:48 +08:00
parent 9657f1930f
commit 9137563db8
7 changed files with 114 additions and 73 deletions

View File

@ -4,10 +4,13 @@ import org.bdware.sc.bean.ContractRequest;
import org.bdware.sc.conn.ResultCallback; import org.bdware.sc.conn.ResultCallback;
public interface AgentPeerManagerIntf { public interface AgentPeerManagerIntf {
void transferToOtherNode(String pubKey, String contractID); void transferToOtherNode(String pubKey, String contractID);
void executeByOtherNodeAsync(String pubKey, ContractRequest c, ResultCallback cb); void executeByOtherNodeAsync(String pubKey, ContractRequest c, ResultCallback cb);
boolean hasAgentConnection(String pubKey); boolean hasAgentConnection(String pubKey);
boolean deliverEvent(String pubKey, String event);
String[] listNodes();
} }

View File

@ -4,7 +4,6 @@ import org.bdware.sc.bean.ContractRequest;
import org.bdware.sc.conn.OnHashCallback; import org.bdware.sc.conn.OnHashCallback;
public interface ChainOpener { public interface ChainOpener {
void writeContractResultToLocalAndLedger( void writeContractResultToLocalAndLedger(
String result, String result,
ContractClient client, ContractClient client,

View File

@ -56,7 +56,7 @@ public class EventBroker {
tempTopics.keySet().forEach(topic -> { tempTopics.keySet().forEach(topic -> {
if (tempTopics.get(topic) + EXPIRED_TIME > current) { if (tempTopics.get(topic) + EXPIRED_TIME > current) {
String reqID = String reqID =
ContractManager.instance.nodeCenterConn.getNodeKeyPair().getPublicKeyStr() + ContractManager.instance.nodeCenterConn.getNodeId() +
"_" + System.currentTimeMillis(); "_" + System.currentTimeMillis();
REvent cleanEvent = REvent cleanEvent =
new REvent( new REvent(

View File

@ -1,5 +1,6 @@
package org.bdware.sc.event; package org.bdware.sc.event;
import org.bdware.sc.util.DHTUtil;
import org.bdware.sc.util.JsonUtil; import org.bdware.sc.util.JsonUtil;
import static org.bdware.sc.ContractManager.instance; import static org.bdware.sc.ContractManager.instance;
@ -20,8 +21,8 @@ public class EventCenter {
if (null == instance.nodeCenterConn) { if (null == instance.nodeCenterConn) {
return null; return null;
} }
String[] centers = instance.nodeCenterConn.getClusterByKey(topic, 1); String[] centers = DHTUtil.getClusterByKey(topic, 1);
if (null == centers) { if (null == centers || centers.length == 0) {
return null; return null;
} }
return centers[0]; return centers[0];
@ -41,12 +42,12 @@ public class EventCenter {
topic, topic,
REvent.REventType.SUBSCRIBE, REvent.REventType.SUBSCRIBE,
String.format("{\"subscriber\":\"%s\"}", String.format("{\"subscriber\":\"%s\"}",
instance.nodeCenterConn.getNodeId(null)), instance.nodeCenterConn.getNodeId()),
""); "");
msg.setSemantics(semantics); msg.setSemantics(semantics);
msg.doSignature(instance.nodeCenterConn.getNodeKeyPair()); msg.doSignature(instance.nodeCenterConn.getNodeKeyPair());
String nodeId = getCenterByTopic(topic); String nodeId = getCenterByTopic(topic);
instance.nodeCenterConn.deliverEvent(JsonUtil.toJson(msg), nodeId); instance.masterStub.deliverEvent(JsonUtil.toJson(msg), nodeId);
} }
/** /**
@ -57,11 +58,11 @@ public class EventCenter {
* @return if this node is the center, return false; otherwise, return true * @return if this node is the center, return false; otherwise, return true
*/ */
public boolean deliverEvent(String topic, REvent event) { public boolean deliverEvent(String topic, REvent event) {
if (null == instance.nodeCenterConn) { if (null == instance.masterStub) {
return false; return false;
} }
String nodeId = getCenterByTopic(topic); String nodeId = getCenterByTopic(topic);
return instance.nodeCenterConn.deliverEvent(JsonUtil.toJson(event), nodeId); return instance.masterStub.deliverEvent(JsonUtil.toJson(event), nodeId);
} }
/** /**
@ -71,8 +72,8 @@ public class EventCenter {
* @param target id of the target node * @param target id of the target node
*/ */
public void publishEvent(String eStr, String target) { public void publishEvent(String eStr, String target) {
if (null != instance.nodeCenterConn) { if (null != instance.masterStub) {
instance.nodeCenterConn.deliverEvent(eStr, target); instance.masterStub.deliverEvent(eStr, target);
} }
} }
} }

View File

@ -89,13 +89,4 @@ public class ManagerHandler extends MsgHandler {
ContractRequest cr = JsonUtil.fromJson(msg.arg, ContractRequest.class); ContractRequest cr = JsonUtil.fromJson(msg.arg, ContractRequest.class);
cb.onResult(cm.addDEMember(cr.getContractID(), cr.getArg().getAsString())); cb.onResult(cm.addDEMember(cr.getContractID(), cr.getArg().getAsString()));
} }
/**
* @author Kaidong Wu
*/
@Description("Deliver event message")
public void deliverEMessage(GetMessage msg, ResultCallback cb) {
REvent eMsg = JsonUtil.fromJson(msg.arg, REvent.class);
cb.onResult(cm.deliverEvent(eMsg));
}
} }

View File

@ -1,53 +0,0 @@
package org.bdware.sc.units;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
public class ByteUtil {
// public static byte[] readRest(ByteArrayInputStream bi) {
// byte[] ret = new byte[bi.available()];
// bi.read(ret, 0, ret.length);
// return ret;
// }
public static int readInt(ByteArrayInputStream bi) {
int ret = 0;
ret |= (bi.read() & 0xff);
ret <<= 8;
ret |= (bi.read() & 0xff);
ret <<= 8;
ret |= (bi.read() & 0xff);
ret <<= 8;
ret |= (bi.read() & 0xff);
return ret;
}
public static void writeInt(ByteArrayOutputStream bo, int i) {
bo.write((i >> 24) & 0xff);
bo.write((i >> 16) & 0xff);
bo.write((i >> 8) & 0xff);
bo.write(i & 0xff);
}
public static void writeLong(ByteArrayOutputStream bo, long l) {
for (int i = 56; i >= 0; i -= 8) {
bo.write(((int) (l >> i)) & 0xff);
}
}
public static long readLong(ByteArrayInputStream bi) {
long ret = 0L;
for (int i = 0; i < 8; i++) {
ret <<= 8;
ret |= (bi.read() & 0xff);
}
return ret;
}
public static byte[] readBytes(ByteArrayInputStream bi, int len) {
byte[] ret = new byte[len];
bi.read(ret, 0, len);
return ret;
}
}

View File

@ -0,0 +1,100 @@
package org.bdware.sc.util;
import org.bdware.sc.ContractManager;
import java.math.BigInteger;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
public class DHTUtil {
private static final Map<String, BigInteger> ID_INTEGER_CACHE = new HashMap<>();
public static String[] getClusterByKey(String key, int k) {
String[] nodes = ContractManager.instance.masterStub.listNodes();
if (nodes.length == 0) {
return null;
}
if (nodes.length <= k) {
return nodes;
}
String hash = HashUtil.sha3ToFixedLen(key, nodes[0].length());
int l = 0, r = nodes.length - 1, m,
h2l = hash.compareTo(nodes[l]), r2h = nodes[r].compareTo(hash),
h2m;
BigInteger bigH = null;
String selected;
do {
if (h2l < 1) { // if the left is bigger than hash
selected = nodes[l];
m = l;
break;
}
if (r2h < 1) { // if the right is smaller than hash
selected = nodes[r];
m = r;
break;
}
if (l + 1 == r) {
BigInteger bigL = getBigInteger(nodes[l]),
bigR = getBigInteger(nodes[r]);
bigH = new BigInteger(hash.substring(2));
if (bigR.subtract(bigH).compareTo(bigH.subtract(bigL)) > -1) {
selected = nodes[l];
m = l;
} else {
selected = nodes[r];
m = r;
}
break;
}
m = (l + r) >> 1;
h2m = hash.compareTo(nodes[m]);
if (h2m < 1) {
r = m;
r2h = -h2m;
} else {
l = m;
h2l = h2m;
}
} while (true);
if (k == 1) {
return new String[]{selected};
}
List<String> ret = new ArrayList<>();
ret.add(selected);
if (k > 1) {
l = m - 1;
r = m + 1;
while (ret.size() < k && (l >= 0 || r < nodes.length)) {
if (l < 0) {
ret.add(nodes[r++]);
} else if (r >= nodes.length) {
ret.add(nodes[l--]);
} else {
if (null == bigH) {
bigH = new BigInteger(hash.substring(2));
}
if (getBigInteger(nodes[r]).subtract(bigH)
.compareTo(bigH.subtract(getBigInteger(nodes[l]))) > -1) {
ret.add(nodes[l--]);
} else {
ret.add(nodes[r++]);
}
}
}
}
return ret.toArray(new String[0]);
}
private static BigInteger getBigInteger(String id) {
String bigInt = id.substring(2);
if (!ID_INTEGER_CACHE.containsKey(bigInt)) {
ID_INTEGER_CACHE.put(bigInt, new BigInteger(bigInt));
}
return ID_INTEGER_CACHE.get(bigInt);
}
}