From 5e45904ceb3f20e035dc822f0526833c8de4ea12 Mon Sep 17 00:00:00 2001 From: CaiHQ Date: Tue, 28 Jun 2022 19:49:54 +0800 Subject: [PATCH] add join support --- .../MultiPointCooperationExecutor.java | 220 +++++++----------- 1 file changed, 85 insertions(+), 135 deletions(-) diff --git a/src/main/java/org/bdware/consistency/plugin/sharding/MultiPointCooperationExecutor.java b/src/main/java/org/bdware/consistency/plugin/sharding/MultiPointCooperationExecutor.java index 85a8cec..df41a0e 100644 --- a/src/main/java/org/bdware/consistency/plugin/sharding/MultiPointCooperationExecutor.java +++ b/src/main/java/org/bdware/consistency/plugin/sharding/MultiPointCooperationExecutor.java @@ -1,9 +1,6 @@ package org.bdware.consistency.plugin.sharding; -import com.google.gson.JsonArray; -import com.google.gson.JsonObject; -import com.google.gson.JsonParser; -import com.google.gson.JsonPrimitive; +import com.google.gson.*; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.bdware.consistency.plugin.common.AbstractContextContractExecutor; @@ -22,7 +19,9 @@ import org.bdware.sc.util.JsonUtil; import org.bdware.server.trustedmodel.MultiReqSeq; import java.math.BigInteger; -import java.util.*; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicInteger; @@ -52,17 +51,9 @@ public class MultiPointCooperationExecutor extends AbstractContextContractExecut request_index = new AtomicInteger(seq); } - public ResultCallback createResultCallback( - final String requestID, - final ResultCallback originalCb, - final int count, - final int request_seq, - final String contractID, JoinInfo joinInfo) { + public ResultCallback createResultCallback(final String requestID, final ResultCallback originalCb, final int count, final int request_seq, final String contractID, JoinInfo joinInfo) { // TODO 加对应的超时? - return networkManager.createResultCallback( - requestID, - new MultiPointCooperationExecutor.ResultMerger(originalCb, count, request_seq, contractID, joinInfo), - count); // 把count改成了1,设置成获得1个响应就行 + return networkManager.createResultCallback(requestID, new MultiPointCooperationExecutor.ResultMerger(originalCb, count, request_seq, contractID, joinInfo), count); // 把count改成了1,设置成获得1个响应就行 } public void sendRequest(String id, ContractRequest req, String[] nodes, ResultCallback rc) { @@ -85,26 +76,36 @@ public class MultiPointCooperationExecutor extends AbstractContextContractExecut if (node.equals(globalConf.getNodeID())) { masterClientTCPAction.asyncExecuteContractLocally(jo, rc); } else { - LOGGER.info( - "[sendRequests] get cmNode " - + node.substring(0, 5) - + " not null " - + "RequestAllExecutor 发送请求给 " - + node.substring(0, 5)); + LOGGER.info("[sendRequests] get cmNode " + node.substring(0, 5) + " not null " + "RequestAllExecutor 发送请求给 " + node.substring(0, 5)); networkManager.sendToAgent(node, sendStr); } } } + public static T invokeFunctionWithoutLimit(String contractID, String funcName, Class t, JsonElement... args) { + ContractClient client = cmActions.getManager().getClient(contractID); + JsonObject arg = new JsonObject(); + arg.addProperty("funcName", funcName); + + JsonArray funcArgs = new JsonArray(); + if (args != null && args.length > 0) for (JsonElement je : args) + funcArgs.add(je); + arg.add("funcArgs", funcArgs); + String routeResultStr = client.executeMethod("", "invokeFunctionWithoutLimit", arg.toString()); + T routeResult = JsonUtil.fromJson(routeResultStr, t); + return routeResult; + } + private String[] getAccordingToRouteInfo(RouteInfo routeInfo, ContractRequest req, String[] members) { try { int val; + if (routeInfo.useDefault == null) { + // func myFunc (requester, sourceArg) + return invokeFunctionWithoutLimit(req.getContractID(), routeInfo.funcName, String[].class, new JsonPrimitive(req.getRequester()), req.getArg()); + } switch (routeInfo.useDefault) { case byRequester: - val = - new BigInteger(req.getRequester(), 16) - .mod(new BigInteger("" + members.length)) - .intValue(); + val = new BigInteger(req.getRequester(), 16).mod(new BigInteger("" + members.length)).intValue(); while (val < 0) { val = val + members.length; } @@ -118,26 +119,11 @@ public class MultiPointCooperationExecutor extends AbstractContextContractExecut return new String[]{members[val]}; case byTarget: JsonObject jo = req.getArg().getAsJsonObject(); - val = - new BigInteger(jo.get("target").getAsString(), 16) - .mod(new BigInteger("" + members.length)) - .intValue(); + val = new BigInteger(jo.get("target").getAsString(), 16).mod(new BigInteger("" + members.length)).intValue(); while (val < 0) { val = val + members.length; } return new String[]{members[val]}; - case byFunc: - ContractClient client = cmActions.getManager().getClient(req.getContractID()); - JsonObject arg = new JsonObject(); - arg.addProperty("funcName", routeInfo.funcName); - // func myFunc (requester, sourceArg) - JsonArray funcArgs = new JsonArray(); - funcArgs.add(req.getRequester()); - funcArgs.add(req.getArg()); - arg.add("funcArgs", funcArgs); - String routeResultStr = client.executeMethod("", "invokeFunctionWithoutLimit", arg.toString()); - String[] routeResult = JsonUtil.fromJson(routeResultStr, String[].class); - return routeResult; default: return members; } @@ -183,21 +169,18 @@ public class MultiPointCooperationExecutor extends AbstractContextContractExecut req.seq = request_index.getAndIncrement(); } req.needSeq = true; - String id = - System.currentTimeMillis() + "_" + (int) (Math.random() * 1000000) + "_" + req.seq; + String id = System.currentTimeMillis() + "_" + (int) (Math.random() * 1000000) + "_" + req.seq; LOGGER.info("execute receive requestID= " + requestID + " msgID=" + id); if (checkCurNodeNumValid()) { // 校验成功 current node num 合法 LOGGER.info("checkCurNodeNumValid true"); - ContractMeta meta = - cmActions.getManager().statusRecorder.getContractMeta(req.getContractID()); + ContractMeta meta = cmActions.getManager().statusRecorder.getContractMeta(req.getContractID()); FunctionDesp fun = meta.getExportedFunction(req.getAction()); ResultCallback collector; - // TODO @fanbo 下面的count 1要改,应该是根据route的规则来。 //Count 根据join规则来。 //nodes 根据route规则来。 JoinInfo joinInfo = fun.joinInfo; RouteInfo routeInfo = fun.routeInfo; - int count = getJoinCount(joinInfo, contractID); + int count = getJoinCount(joinInfo, contractID, req); LOGGER.info("requestID=" + requestID + " join Count: " + count); String[] members = multiMeta.getMembers(); @@ -205,54 +188,44 @@ public class MultiPointCooperationExecutor extends AbstractContextContractExecut if (nodes.length < count) { count = nodes.length; } - collector = - createResultCallback(id, rc, count, req.seq, req.getContractID(), joinInfo); // 初始化结果收集器 + collector = createResultCallback(id, rc, count, req.seq, req.getContractID(), joinInfo); // 初始化结果收集器 masterServerTCPAction.getSync().sleep(id, collector); LOGGER.info("requestID=" + requestID + " master broadcasts request " + req.seq); sendRequest(id, req, nodes, collector); // 发送请求 } else { LOGGER.info("invalidNodeNumOnResult"); request_index.getAndDecrement(); - ContractResult finalResult = - new ContractResult( - ContractResult.Status.Error, - new JsonPrimitive("node number unavailbale,request refused.")); + ContractResult finalResult = new ContractResult(ContractResult.Status.Error, new JsonPrimitive("node number unavailbale,request refused.")); rc.onResult(JsonUtil.toJson(finalResult)); } } - private int getJoinCount(JoinInfo joinInfo, String contractID) { - if (joinInfo == null) return resultCount; - if (joinInfo != null) return joinInfo.joinCount; - + private int getJoinCount(JoinInfo joinInfo, String contractID, ContractRequest contractRequest) { try { - ContractRequest cr = new ContractRequest(); - cr.setContractID(contractID); - cr.setAction("TODO"); - //TODO Arg需要好好设计一下。 - //TODO 又好用又简单的那种设计 - //TODO - cr.setArg(""); - String result = cmActions.getManager().executeLocally(cr, null); - return JsonUtil.parseString(result).getAsJsonObject().get("result").getAsInt(); + if (joinInfo == null) return resultCount; + if (joinInfo.joinCountFuncName != null) { + Integer count = invokeFunctionWithoutLimit(contractID, joinInfo.joinCountFuncName, Integer.class, new JsonPrimitive(contractRequest.getRequester()), contractRequest.getArg()); + return count; + + } + if (joinInfo.joinCount != 0) return joinInfo.joinCount; } catch (Exception e) { e.printStackTrace(); - return 1; } + return resultCount; + } // 清理缓存的多点合约请求序号 public void clearCache() { final long time = System.currentTimeMillis() - 30000L; - seqMap.entrySet() - .removeIf( - entry -> { - MultiReqSeq cache = entry.getValue(); - if (null == cache) { - return true; - } - return cache.startTime < time; - }); + seqMap.entrySet().removeIf(entry -> { + MultiReqSeq cache = entry.getValue(); + if (null == cache) { + return true; + } + return cache.startTime < time; + }); } public static class ResultMerger extends ResultCallback { @@ -265,12 +238,7 @@ public class MultiPointCooperationExecutor extends AbstractContextContractExecut Set nodeIDs = new HashSet<>(); // 已收到返回结果的节点 JoinInfo joinInfo; - ResultMerger( - final ResultCallback originalCb, - final int count, - final int request_seq, - final String contractID, - final JoinInfo joinInfo) { + ResultMerger(final ResultCallback originalCb, final int count, final int request_seq, final String contractID, final JoinInfo joinInfo) { originalCallback = originalCb; this.count = count; this.request_seq = request_seq; @@ -281,16 +249,7 @@ public class MultiPointCooperationExecutor extends AbstractContextContractExecut } public String getInfo() { - return "contractID=" - + contractID - + " 收到第 " - + order - + " 个节点回复 : " - + " order=" - + order - + " count=" - + count - + " "; + return "contractID=" + contractID + " 收到第 " + order + " 个节点回复 : " + " order=" + order + " count=" + count + " "; } @Override @@ -307,17 +266,7 @@ public class MultiPointCooperationExecutor extends AbstractContextContractExecut return; } nodeIDs.add(id); - LOGGER.info( - "contractID=" - + contractID - + " 收到第 " - + order - + " 个节点回复 : " - + str - + " order=" - + order - + " count=" - + count); + LOGGER.info("contractID=" + contractID + " 收到第 " + order + " 个节点回复 : " + str + " order=" + order + " count=" + count); componedContractResult.add(obj); // 收集到所有结果 if (order.incrementAndGet() == count) { @@ -325,7 +274,6 @@ public class MultiPointCooperationExecutor extends AbstractContextContractExecut finalResult.needSeq = true; finalResult.seq = request_seq; - // if (null == finalResult) { // finalResult = // new ContractResult( @@ -341,15 +289,11 @@ public class MultiPointCooperationExecutor extends AbstractContextContractExecut } originalCallback.onResult(JsonUtil.toJson(finalResult)); // } - LOGGER.info( - "本次执行最终结果为 " + finalResult.size + "个节点合并的,结果为 " + finalResult.result); + LOGGER.info("本次执行最终结果为 " + finalResult.size + "个节点合并的,结果为 " + finalResult.result); // 集群中事务序号+1 // MasterServerTCPAction.contractID2Members.get(contractID).nextSeq(); - cmActions.getManager() - .multiContractRecorder - .getMultiContractMeta(contractID) - .nextSeqAtMaster(); + cmActions.getManager().multiContractRecorder.getMultiContractMeta(contractID).nextSeqAtMaster(); // recover,其中无状态合约CP出错无需恢复 Set nodesID = componedContractResult.getProblemNodes(); if (null == nodesID || nodesID.isEmpty()) { @@ -357,22 +301,17 @@ public class MultiPointCooperationExecutor extends AbstractContextContractExecut } for (String nodeID : nodesID) { LOGGER.info("结果出现问题的节点有:" + nodeID); - if (masterServerRecoverMechAction.getRecoverStatusMap().get(nodeID).get(contractID) - == RecoverFlag.Fine) { - masterServerRecoverMechAction.getRecoverStatusMap() - .get(nodeID) - .put(contractID, RecoverFlag.ToRecover); + if (masterServerRecoverMechAction.getRecoverStatusMap().get(nodeID).get(contractID) == RecoverFlag.Fine) { + masterServerRecoverMechAction.getRecoverStatusMap().get(nodeID).put(contractID, RecoverFlag.ToRecover); } } for (String nodeID : nodesID) { - if (masterServerRecoverMechAction.getRecoverStatusMap().get(nodeID).get(contractID) - == RecoverFlag.ToRecover) { + if (masterServerRecoverMechAction.getRecoverStatusMap().get(nodeID).get(contractID) == RecoverFlag.ToRecover) { LOGGER.info("问题节点开始恢复:" + nodeID); // 因为该节点结果有误,所以即时是stableMode也认为trans记录不可信 // 直接通过load别的节点来恢复 - masterServerRecoverMechAction.restartContractFromCommonMode( - nodeID, contractID); + masterServerRecoverMechAction.restartContractFromCommonMode(nodeID, contractID); } } } @@ -385,24 +324,35 @@ public class MultiPointCooperationExecutor extends AbstractContextContractExecut private void handleJoinInfo(ContractResult finalResult, JoinInfo joinInfo) { JsonObject jo = finalResult.result.getAsJsonObject(); - if (joinInfo != null && joinInfo.joinRule != null) { - //TODO 不应该是double 类型 - switch (joinInfo.joinRule) { - case "add": - double val = 0; - for (String key : jo.keySet()) { - val += jo.get(key).getAsDouble(); + try { + if (joinInfo != null) { + if (joinInfo.useDefault == null) { + if (joinInfo.joinFuncName != null) { + JsonElement ret = MultiPointCooperationExecutor.invokeFunctionWithoutLimit(contractID, joinInfo.joinFuncName, JsonElement.class, jo); + finalResult.result = ret; + return; } - finalResult.result = new JsonPrimitive(val); - break; - case "multiply": - val = 1; - for (String key : jo.keySet()) { - val *= jo.get(key).getAsDouble(); - } - finalResult.result = new JsonPrimitive(val); - break; + } + switch (joinInfo.useDefault) { + case add: + double val = 0; + for (String key : jo.keySet()) { + val += jo.get(key).getAsDouble(); + } + finalResult.result = new JsonPrimitive(val); + break; + case multiply: + val = 1; + for (String key : jo.keySet()) { + val *= jo.get(key).getAsDouble(); + } + finalResult.result = new JsonPrimitive(val); + break; + default: + } } + } catch (Exception e) { + e.printStackTrace(); } } }