From e10866ff8bfb9d411b6c05749437f0cde6a8f8a2 Mon Sep 17 00:00:00 2001 From: CaiHQ Date: Thu, 19 May 2022 11:06:06 +0800 Subject: [PATCH] rebase --- .../AbstractContextContractExecutor.java | 1 + .../MultiPointCooperationExecutor.java | 64 +++++++++++++------ 2 files changed, 47 insertions(+), 18 deletions(-) diff --git a/src/main/java/org/bdware/consistency/plugin/common/AbstractContextContractExecutor.java b/src/main/java/org/bdware/consistency/plugin/common/AbstractContextContractExecutor.java index 76f69ea..1785741 100644 --- a/src/main/java/org/bdware/consistency/plugin/common/AbstractContextContractExecutor.java +++ b/src/main/java/org/bdware/consistency/plugin/common/AbstractContextContractExecutor.java @@ -8,6 +8,7 @@ public abstract class AbstractContextContractExecutor implements ContractExecuto static protected IGlobalConf globalConf = ConsistencyPluginManager.getContext().getGlobalConf(); static protected ICMActions cmActions = ConsistencyPluginManager.getContext().getCMActions(); static protected INetworkManager networkManager = ConsistencyPluginManager.getContext().getNetworkManager(); + static protected IMasterClientTCPAction masterClientTCPAction = ConsistencyPluginManager.getContext().getMasterClientTCPAction(); static protected IMasterServerTCPAction masterServerTCPAction = ConsistencyPluginManager.getContext().getMasterServerTCPAction(); static protected IMasterServerRecoverMechAction masterServerRecoverMechAction = ConsistencyPluginManager.getContext().getMasterServerRecoverMechAction(); } diff --git a/src/main/java/org/bdware/consistency/plugin/sharding/MultiPointCooperationExecutor.java b/src/main/java/org/bdware/consistency/plugin/sharding/MultiPointCooperationExecutor.java index 65689ca..f91fc69 100644 --- a/src/main/java/org/bdware/consistency/plugin/sharding/MultiPointCooperationExecutor.java +++ b/src/main/java/org/bdware/consistency/plugin/sharding/MultiPointCooperationExecutor.java @@ -1,5 +1,6 @@ package org.bdware.consistency.plugin.sharding; +import com.google.gson.JsonArray; import com.google.gson.JsonObject; import com.google.gson.JsonParser; import com.google.gson.JsonPrimitive; @@ -7,6 +8,7 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.bdware.consistency.plugin.common.AbstractContextContractExecutor; import org.bdware.sc.ComponedContractResult; +import org.bdware.sc.ContractClient; import org.bdware.sc.ContractMeta; import org.bdware.sc.ContractResult; import org.bdware.sc.bean.*; @@ -20,10 +22,7 @@ import org.bdware.sc.util.JsonUtil; import org.bdware.server.trustedmodel.MultiReqSeq; import java.math.BigInteger; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Map; -import java.util.Set; +import java.util.*; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicInteger; @@ -66,13 +65,13 @@ public class MultiPointCooperationExecutor extends AbstractContextContractExecut count); // 把count改成了1,设置成获得1个响应就行 } - public void sendRequest(String id, ContractRequest req, String[] nodes) { - Map reqStr = new HashMap<>(); - reqStr.put("uniReqID", id); - reqStr.put("data", req); + public void sendRequest(String id, ContractRequest req, String[] nodes, ResultCallback rc) { req.needSeq = false; - reqStr.put("action", "executeContractLocally"); - String sendStr = JsonUtil.toJson(reqStr); + JsonObject jo = new JsonObject(); + jo.addProperty("uniReqID", id); + jo.add("data", JsonUtil.parseObject(req)); + jo.addProperty("action", "executeContractLocally"); + String sendStr = jo.toString(); // master负责缓存请求 if (!masterServerTCPAction.getReqCache().containsKey(contractID)) { masterServerTCPAction.getReqCache().put(contractID, new RequestCache()); @@ -83,13 +82,17 @@ public class MultiPointCooperationExecutor extends AbstractContextContractExecut LOGGER.info("node size = " + nodes.length); LOGGER.debug("nodes:" + JsonUtil.toJson(nodes)); for (String node : nodes) { - LOGGER.info( - "[sendRequests] get cmNode " - + node.substring(0, 5) - + " not null " - + "RequestAllExecutor 发送请求给 " - + node.substring(0, 5)); - networkManager.sendToAgent(node, sendStr); + if (node.equals(globalConf.getNodeID())) { + masterClientTCPAction.asyncExecuteContractLocally(jo, rc); + } else { + LOGGER.info( + "[sendRequests] get cmNode " + + node.substring(0, 5) + + " not null " + + "RequestAllExecutor 发送请求给 " + + node.substring(0, 5)); + networkManager.sendToAgent(node, sendStr); + } } } @@ -123,6 +126,31 @@ public class MultiPointCooperationExecutor extends AbstractContextContractExecut val = val + members.length; } return new String[]{members[val]}; + case byFunc: + ContractClient client = cmActions.getManager().getClient(req.getContractID()); + + JsonArray membersArr = new JsonArray(members.length); + for (String member : members) { + membersArr.add(member); + } + + JsonObject arg = new JsonObject(); + arg.addProperty("funcName", routeInfo.funcName); + // func myFunc (currentNode, members, membersCount, sourceArg) + JsonArray funcArgs = new JsonArray(); + funcArgs.add(globalConf.getNodeID()); + funcArgs.add(membersArr); + funcArgs.add(membersArr.size()); + funcArgs.add(req.getArg()); + arg.add("funcArgs", funcArgs); + + String routeResultStr = client.executeMethod("", "invokeFunctionWithoutLimit", arg.toString()); + JsonObject routeResult = JsonUtil.parseString(routeResultStr).getAsJsonObject(); + List nodes = new ArrayList<>(); + for (String key: routeResult.keySet()) { + nodes.add(routeResult.get(key).getAsString()); + } + return nodes.toArray(new String[]{}); default: return members; } @@ -194,7 +222,7 @@ public class MultiPointCooperationExecutor extends AbstractContextContractExecut createResultCallback(id, rc, count, req.seq, req.getContractID(), joinInfo); // 初始化结果收集器 masterServerTCPAction.getSync().sleep(id, collector); LOGGER.info("requestID=" + requestID + " master broadcasts request " + req.seq); - sendRequest(id, req, nodes); // 发送请求 + sendRequest(id, req, nodes, collector); // 发送请求 } else { LOGGER.info("invalidNodeNumOnResult"); request_index.getAndDecrement();