From 9137563db8c9607c499c7257cc941c6942737d1e Mon Sep 17 00:00:00 2001 From: "Frank.R.Wu" Date: Mon, 6 Dec 2021 18:05:48 +0800 Subject: [PATCH] feat: update event mechanism update EventBroker and EventCenter to use ContractManager.instance.masterStub to deliver event, and use DHTUtil to compute target nodes --- .../org/bdware/sc/AgentPeerManagerIntf.java | 5 +- src/main/java/org/bdware/sc/ChainOpener.java | 1 - .../java/org/bdware/sc/event/EventBroker.java | 2 +- .../java/org/bdware/sc/event/EventCenter.java | 17 +-- .../org/bdware/sc/handler/ManagerHandler.java | 9 -- .../java/org/bdware/sc/units/ByteUtil.java | 53 ---------- src/main/java/org/bdware/sc/util/DHTUtil.java | 100 ++++++++++++++++++ 7 files changed, 114 insertions(+), 73 deletions(-) delete mode 100644 src/main/java/org/bdware/sc/units/ByteUtil.java create mode 100644 src/main/java/org/bdware/sc/util/DHTUtil.java diff --git a/src/main/java/org/bdware/sc/AgentPeerManagerIntf.java b/src/main/java/org/bdware/sc/AgentPeerManagerIntf.java index 2f25058..639bb57 100644 --- a/src/main/java/org/bdware/sc/AgentPeerManagerIntf.java +++ b/src/main/java/org/bdware/sc/AgentPeerManagerIntf.java @@ -4,10 +4,13 @@ import org.bdware.sc.bean.ContractRequest; import org.bdware.sc.conn.ResultCallback; public interface AgentPeerManagerIntf { - void transferToOtherNode(String pubKey, String contractID); void executeByOtherNodeAsync(String pubKey, ContractRequest c, ResultCallback cb); boolean hasAgentConnection(String pubKey); + + boolean deliverEvent(String pubKey, String event); + + String[] listNodes(); } diff --git a/src/main/java/org/bdware/sc/ChainOpener.java b/src/main/java/org/bdware/sc/ChainOpener.java index c62b6b6..9e8c427 100644 --- a/src/main/java/org/bdware/sc/ChainOpener.java +++ b/src/main/java/org/bdware/sc/ChainOpener.java @@ -4,7 +4,6 @@ import org.bdware.sc.bean.ContractRequest; import org.bdware.sc.conn.OnHashCallback; public interface ChainOpener { - void writeContractResultToLocalAndLedger( String result, ContractClient client, diff --git a/src/main/java/org/bdware/sc/event/EventBroker.java b/src/main/java/org/bdware/sc/event/EventBroker.java index 4b203e6..2cad159 100644 --- a/src/main/java/org/bdware/sc/event/EventBroker.java +++ b/src/main/java/org/bdware/sc/event/EventBroker.java @@ -56,7 +56,7 @@ public class EventBroker { tempTopics.keySet().forEach(topic -> { if (tempTopics.get(topic) + EXPIRED_TIME > current) { String reqID = - ContractManager.instance.nodeCenterConn.getNodeKeyPair().getPublicKeyStr() + + ContractManager.instance.nodeCenterConn.getNodeId() + "_" + System.currentTimeMillis(); REvent cleanEvent = new REvent( diff --git a/src/main/java/org/bdware/sc/event/EventCenter.java b/src/main/java/org/bdware/sc/event/EventCenter.java index ab5c6c7..dc95d0e 100644 --- a/src/main/java/org/bdware/sc/event/EventCenter.java +++ b/src/main/java/org/bdware/sc/event/EventCenter.java @@ -1,5 +1,6 @@ package org.bdware.sc.event; +import org.bdware.sc.util.DHTUtil; import org.bdware.sc.util.JsonUtil; import static org.bdware.sc.ContractManager.instance; @@ -20,8 +21,8 @@ public class EventCenter { if (null == instance.nodeCenterConn) { return null; } - String[] centers = instance.nodeCenterConn.getClusterByKey(topic, 1); - if (null == centers) { + String[] centers = DHTUtil.getClusterByKey(topic, 1); + if (null == centers || centers.length == 0) { return null; } return centers[0]; @@ -41,12 +42,12 @@ public class EventCenter { topic, REvent.REventType.SUBSCRIBE, String.format("{\"subscriber\":\"%s\"}", - instance.nodeCenterConn.getNodeId(null)), + instance.nodeCenterConn.getNodeId()), ""); msg.setSemantics(semantics); msg.doSignature(instance.nodeCenterConn.getNodeKeyPair()); String nodeId = getCenterByTopic(topic); - instance.nodeCenterConn.deliverEvent(JsonUtil.toJson(msg), nodeId); + instance.masterStub.deliverEvent(JsonUtil.toJson(msg), nodeId); } /** @@ -57,11 +58,11 @@ public class EventCenter { * @return if this node is the center, return false; otherwise, return true */ public boolean deliverEvent(String topic, REvent event) { - if (null == instance.nodeCenterConn) { + if (null == instance.masterStub) { return false; } String nodeId = getCenterByTopic(topic); - return instance.nodeCenterConn.deliverEvent(JsonUtil.toJson(event), nodeId); + return instance.masterStub.deliverEvent(JsonUtil.toJson(event), nodeId); } /** @@ -71,8 +72,8 @@ public class EventCenter { * @param target id of the target node */ public void publishEvent(String eStr, String target) { - if (null != instance.nodeCenterConn) { - instance.nodeCenterConn.deliverEvent(eStr, target); + if (null != instance.masterStub) { + instance.masterStub.deliverEvent(eStr, target); } } } \ No newline at end of file diff --git a/src/main/java/org/bdware/sc/handler/ManagerHandler.java b/src/main/java/org/bdware/sc/handler/ManagerHandler.java index 95f1f0a..91770ca 100644 --- a/src/main/java/org/bdware/sc/handler/ManagerHandler.java +++ b/src/main/java/org/bdware/sc/handler/ManagerHandler.java @@ -89,13 +89,4 @@ public class ManagerHandler extends MsgHandler { ContractRequest cr = JsonUtil.fromJson(msg.arg, ContractRequest.class); cb.onResult(cm.addDEMember(cr.getContractID(), cr.getArg().getAsString())); } - - /** - * @author Kaidong Wu - */ - @Description("Deliver event message") - public void deliverEMessage(GetMessage msg, ResultCallback cb) { - REvent eMsg = JsonUtil.fromJson(msg.arg, REvent.class); - cb.onResult(cm.deliverEvent(eMsg)); - } } diff --git a/src/main/java/org/bdware/sc/units/ByteUtil.java b/src/main/java/org/bdware/sc/units/ByteUtil.java deleted file mode 100644 index 85b0538..0000000 --- a/src/main/java/org/bdware/sc/units/ByteUtil.java +++ /dev/null @@ -1,53 +0,0 @@ -package org.bdware.sc.units; - -import java.io.ByteArrayInputStream; -import java.io.ByteArrayOutputStream; - -public class ByteUtil { -// public static byte[] readRest(ByteArrayInputStream bi) { -// byte[] ret = new byte[bi.available()]; -// bi.read(ret, 0, ret.length); -// return ret; -// } - - public static int readInt(ByteArrayInputStream bi) { - int ret = 0; - ret |= (bi.read() & 0xff); - ret <<= 8; - ret |= (bi.read() & 0xff); - ret <<= 8; - ret |= (bi.read() & 0xff); - ret <<= 8; - ret |= (bi.read() & 0xff); - return ret; - } - - public static void writeInt(ByteArrayOutputStream bo, int i) { - bo.write((i >> 24) & 0xff); - bo.write((i >> 16) & 0xff); - bo.write((i >> 8) & 0xff); - bo.write(i & 0xff); - - } - - public static void writeLong(ByteArrayOutputStream bo, long l) { - for (int i = 56; i >= 0; i -= 8) { - bo.write(((int) (l >> i)) & 0xff); - } - } - - public static long readLong(ByteArrayInputStream bi) { - long ret = 0L; - for (int i = 0; i < 8; i++) { - ret <<= 8; - ret |= (bi.read() & 0xff); - } - return ret; - } - - public static byte[] readBytes(ByteArrayInputStream bi, int len) { - byte[] ret = new byte[len]; - bi.read(ret, 0, len); - return ret; - } -} diff --git a/src/main/java/org/bdware/sc/util/DHTUtil.java b/src/main/java/org/bdware/sc/util/DHTUtil.java new file mode 100644 index 0000000..a4f87ef --- /dev/null +++ b/src/main/java/org/bdware/sc/util/DHTUtil.java @@ -0,0 +1,100 @@ +package org.bdware.sc.util; + +import org.bdware.sc.ContractManager; + +import java.math.BigInteger; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +public class DHTUtil { + private static final Map ID_INTEGER_CACHE = new HashMap<>(); + + public static String[] getClusterByKey(String key, int k) { + String[] nodes = ContractManager.instance.masterStub.listNodes(); + if (nodes.length == 0) { + return null; + } + if (nodes.length <= k) { + return nodes; + } + + String hash = HashUtil.sha3ToFixedLen(key, nodes[0].length()); + + int l = 0, r = nodes.length - 1, m, + h2l = hash.compareTo(nodes[l]), r2h = nodes[r].compareTo(hash), + h2m; + BigInteger bigH = null; + String selected; + do { + if (h2l < 1) { // if the left is bigger than hash + selected = nodes[l]; + m = l; + break; + } + if (r2h < 1) { // if the right is smaller than hash + selected = nodes[r]; + m = r; + break; + } + if (l + 1 == r) { + BigInteger bigL = getBigInteger(nodes[l]), + bigR = getBigInteger(nodes[r]); + bigH = new BigInteger(hash.substring(2)); + if (bigR.subtract(bigH).compareTo(bigH.subtract(bigL)) > -1) { + selected = nodes[l]; + m = l; + } else { + selected = nodes[r]; + m = r; + } + break; + } + m = (l + r) >> 1; + h2m = hash.compareTo(nodes[m]); + if (h2m < 1) { + r = m; + r2h = -h2m; + } else { + l = m; + h2l = h2m; + } + } while (true); + if (k == 1) { + return new String[]{selected}; + } + List ret = new ArrayList<>(); + ret.add(selected); + if (k > 1) { + l = m - 1; + r = m + 1; + while (ret.size() < k && (l >= 0 || r < nodes.length)) { + if (l < 0) { + ret.add(nodes[r++]); + } else if (r >= nodes.length) { + ret.add(nodes[l--]); + } else { + if (null == bigH) { + bigH = new BigInteger(hash.substring(2)); + } + if (getBigInteger(nodes[r]).subtract(bigH) + .compareTo(bigH.subtract(getBigInteger(nodes[l]))) > -1) { + ret.add(nodes[l--]); + } else { + ret.add(nodes[r++]); + } + } + } + } + return ret.toArray(new String[0]); + } + + private static BigInteger getBigInteger(String id) { + String bigInt = id.substring(2); + if (!ID_INTEGER_CACHE.containsKey(bigInt)) { + ID_INTEGER_CACHE.put(bigInt, new BigInteger(bigInt)); + } + return ID_INTEGER_CACHE.get(bigInt); + } +}