diff --git a/src/main/java/org/bdware/consistency/plugin/sharding/MultiPointCooperationExecutor.java b/src/main/java/org/bdware/consistency/plugin/sharding/MultiPointCooperationExecutor.java index df41a0e..1bef851 100644 --- a/src/main/java/org/bdware/consistency/plugin/sharding/MultiPointCooperationExecutor.java +++ b/src/main/java/org/bdware/consistency/plugin/sharding/MultiPointCooperationExecutor.java @@ -29,6 +29,7 @@ import java.util.concurrent.atomic.AtomicInteger; public class MultiPointCooperationExecutor extends AbstractContextContractExecutor { private static final Logger LOGGER = LogManager.getLogger(MultiPointCooperationExecutor.class); final Object lock = new Object(); + private final ContractMeta meta; int resultCount; AtomicInteger request_index = new AtomicInteger(0); ContractExecType type; @@ -45,6 +46,7 @@ public class MultiPointCooperationExecutor extends AbstractContextContractExecut resultCount = c; contractID = con_id; multiMeta = cmActions.getManager().multiContractRecorder.getMultiContractMeta(contractID); + meta = cmActions.getManager().statusRecorder.getContractMeta(contractID); } public void setSeq(int seq) { @@ -69,14 +71,14 @@ public class MultiPointCooperationExecutor extends AbstractContextContractExecut } // TODO 多调多统一个seq的有多个请求,这个需要改 masterServerTCPAction.getReqCache().get(contractID).put(req.seq, sendStr); - LOGGER.debug(JsonUtil.toJson(req)); - LOGGER.info("node size = " + nodes.length); - LOGGER.debug("nodes:" + JsonUtil.toJson(nodes)); + // LOGGER.debug(JsonUtil.toJson(req)); + // LOGGER.info("node size = " + nodes.length); + // LOGGER.debug("nodes:" + JsonUtil.toJson(nodes)); for (String node : nodes) { if (node.equals(globalConf.getNodeID())) { masterClientTCPAction.asyncExecuteContractLocally(jo, rc); } else { - LOGGER.info("[sendRequests] get cmNode " + node.substring(0, 5) + " not null " + "RequestAllExecutor 发送请求给 " + node.substring(0, 5)); + // LOGGER.info("[sendRequests] get cmNode " + node.substring(0, 5) + " not null " + "RequestAllExecutor 发送请求给 " + node.substring(0, 5)); networkManager.sendToAgent(node, sendStr); } } @@ -86,7 +88,6 @@ public class MultiPointCooperationExecutor extends AbstractContextContractExecut ContractClient client = cmActions.getManager().getClient(contractID); JsonObject arg = new JsonObject(); arg.addProperty("funcName", funcName); - JsonArray funcArgs = new JsonArray(); if (args != null && args.length > 0) for (JsonElement je : args) funcArgs.add(je); @@ -101,39 +102,82 @@ public class MultiPointCooperationExecutor extends AbstractContextContractExecut int val; if (routeInfo.useDefault == null) { // func myFunc (requester, sourceArg) - return invokeFunctionWithoutLimit(req.getContractID(), routeInfo.funcName, String[].class, new JsonPrimitive(req.getRequester()), req.getArg()); + JsonElement requester = req.getRequester() == null ? JsonNull.INSTANCE : new JsonPrimitive(req.getRequester()); + return invokeFunctionWithoutLimit(req.getContractID(), routeInfo.funcName, String[].class, requester, req.getArg()); } switch (routeInfo.useDefault) { case byRequester: val = new BigInteger(req.getRequester(), 16).mod(new BigInteger("" + members.length)).intValue(); - while (val < 0) { + while (val < 0 && members.length > 0) { val = val + members.length; } return new String[]{members[val]}; case byArgHash: val = req.getArg().hashCode(); val = val % members.length; - while (val < 0) { + while (val < 0 && members.length > 0) { val += members.length; } return new String[]{members[val]}; - case byTarget: - JsonObject jo = req.getArg().getAsJsonObject(); - val = new BigInteger(jo.get("target").getAsString(), 16).mod(new BigInteger("" + members.length)).intValue(); - while (val < 0) { - val = val + members.length; + case byJsonPropHash: + JsonElement jo = tryLoadJsonProp(req, routeInfo.param); + val = jo.toString().hashCode() % members.length; + while (val < 0 && members.length > 0) { + val += members.length; } return new String[]{members[val]}; + case byShardingIDTree: + return byShardingIDTree(members, meta.contract.shardingId, Integer.valueOf(routeInfo.param)); default: return members; } } catch (Exception e) { + e.printStackTrace(); return members; } } + private String[] byShardingIDTree(String[] members, int shardingId, int childCount) { + if (childCount > 0) { + int from = shardingId * childCount + 1; + int to = Math.min(members.length, shardingId * childCount + childCount + 1); + if (to > from) { + String[] ret = new String[to - from]; + System.arraycopy(members, from, ret, 0, to - from); + return ret; + } else + return new String[]{}; + } else { + childCount = Math.abs(childCount); + if (shardingId > 0) shardingId = (shardingId - 1) / childCount; + return new String[]{members[shardingId]}; + } + } + + private JsonElement tryLoadJsonProp(ContractRequest req, String param) { + try { + if (req.getArg() == null) + return JsonNull.INSTANCE; + JsonObject arg; + if (req.getArg().isJsonPrimitive()) { + arg = JsonUtil.parseString(req.getArg().getAsString()).getAsJsonObject(); + } else arg = req.getArg().getAsJsonObject(); + if (!param.contains(".")) { + return arg.get(param); + } else { + String[] props = param.split("\\."); + JsonElement result = arg; + for (String str : props) + result = result.getAsJsonObject().get(str); + return result; + } + } catch (Exception e) { + e.printStackTrace(); + return JsonNull.INSTANCE; + } + } + public boolean checkCurNodeNumValid() { - LOGGER.info("checkCurNodeNumValid"); String[] nodes = multiMeta.getMembers(); // List nodes = info.members; int validNode = 0; @@ -146,15 +190,14 @@ public class MultiPointCooperationExecutor extends AbstractContextContractExecut } int c = resultCount; if (type == ContractExecType.Sharding) c = (int) Math.ceil((double) c / 2); - LOGGER.info("c=" + c + " validNode=" + validNode); return validNode >= c; } @Override public void execute(String requestID, ContractRequest req, ResultCallback rc, OnHashCallback hcb) { - LOGGER.info("[MultiPointCooperationExecutor] execute " + JsonUtil.toJson(req)); + //LOGGER.info("[MultiPointCooperationExecutor] execute " + JsonUtil.toJson(req)); // 获得action 函数名 - LOGGER.info("action is : " + req.getAction()); + // LOGGER.info("action is : " + req.getAction()); req.setContractID(cmActions.getManager().getContractIDByName(req.getContractID())); if (requestID != null && requestID.endsWith("_mul")) { synchronized (lock) { @@ -170,33 +213,33 @@ public class MultiPointCooperationExecutor extends AbstractContextContractExecut } req.needSeq = true; String id = System.currentTimeMillis() + "_" + (int) (Math.random() * 1000000) + "_" + req.seq; - LOGGER.info("execute receive requestID= " + requestID + " msgID=" + id); - if (checkCurNodeNumValid()) { // 校验成功 current node num 合法 - LOGGER.info("checkCurNodeNumValid true"); - ContractMeta meta = cmActions.getManager().statusRecorder.getContractMeta(req.getContractID()); - FunctionDesp fun = meta.getExportedFunction(req.getAction()); - ResultCallback collector; - //Count 根据join规则来。 - //nodes 根据route规则来。 - JoinInfo joinInfo = fun.joinInfo; - RouteInfo routeInfo = fun.routeInfo; - int count = getJoinCount(joinInfo, contractID, req); - LOGGER.info("requestID=" + requestID + " join Count: " + count); + //LOGGER.info("execute receive requestID= " + requestID + " msgID=" + id); + // 校验成功 current node num 合法 + //LOGGER.info("checkCurNodeNumValid true"); + ContractMeta meta = cmActions.getManager().statusRecorder.getContractMeta(req.getContractID()); + FunctionDesp fun = meta.getExportedFunction(req.getAction()); + ResultCallback collector; + //Count 根据join规则来。 + //nodes 根据route规则来。 + JoinInfo joinInfo = fun.joinInfo; + RouteInfo routeInfo = fun.routeInfo; + int count = getJoinCount(joinInfo, contractID, req); - String[] members = multiMeta.getMembers(); - String[] nodes = getAccordingToRouteInfo(routeInfo, req, members); - if (nodes.length < count) { - count = nodes.length; - } - collector = createResultCallback(id, rc, count, req.seq, req.getContractID(), joinInfo); // 初始化结果收集器 + String[] members = multiMeta.getMembers(); + String[] nodes = getAccordingToRouteInfo(routeInfo, req, members); + if (nodes.length < count) { + count = nodes.length; + } + //LOGGER.info("requestID=" + requestID + " join Count: " + count + " nodeCount:" + nodes.length); + if (count > 0) { + collector = createResultCallback(id, rc, count, req.seq, req.getContractID(), joinInfo); + // 初始化结果收集器 masterServerTCPAction.getSync().sleep(id, collector); - LOGGER.info("requestID=" + requestID + " master broadcasts request " + req.seq); + // LOGGER.info("requestID=" + requestID + " master broadcasts request " + req.seq); sendRequest(id, req, nodes, collector); // 发送请求 } else { - LOGGER.info("invalidNodeNumOnResult"); - request_index.getAndDecrement(); - ContractResult finalResult = new ContractResult(ContractResult.Status.Error, new JsonPrimitive("node number unavailbale,request refused.")); - rc.onResult(JsonUtil.toJson(finalResult)); + ContractResult cr = new ContractResult(ContractResult.Status.Exception, new JsonPrimitive("broadcast 0")); + rc.onResult(JsonUtil.toJson(cr)); } } @@ -258,15 +301,22 @@ public class MultiPointCooperationExecutor extends AbstractContextContractExecut // str的data是个ContractResult // 在这儿也是返回个ContractResult try { - LOGGER.info(str); + //LOGGER.info(str); JsonObject obj = JsonParser.parseString(str).getAsJsonObject(); - String id = obj.get("nodeID").getAsString(); + + String id; + if (obj.get("nodeID") == null) { + id = "null"; + // LOGGER.info("已经收到第" + order + "节点的结果,无节点ID! 该结果被忽略" + order); + return; + } else + id = obj.get("nodeID").getAsString(); if (nodeIDs.contains(id)) { - LOGGER.info("已经收到节点 " + id.substring(0, 5) + " 的结果,该结果被忽略"); + // LOGGER.info("已经收到节点 " + id.substring(0, 5) + " 的结果,该结果被忽略"); return; } nodeIDs.add(id); - LOGGER.info("contractID=" + contractID + " 收到第 " + order + " 个节点回复 : " + str + " order=" + order + " count=" + count); + //LOGGER.info("contractID=" + contractID + " 收到第 " + order + " 个节点回复 : " + str + " order=" + order + " count=" + count); componedContractResult.add(obj); // 收集到所有结果 if (order.incrementAndGet() == count) { @@ -289,7 +339,7 @@ public class MultiPointCooperationExecutor extends AbstractContextContractExecut } originalCallback.onResult(JsonUtil.toJson(finalResult)); // } - LOGGER.info("本次执行最终结果为 " + finalResult.size + "个节点合并的,结果为 " + finalResult.result); + //LOGGER.info("本次执行最终结果为 " + finalResult.size + "个节点合并的,结果为 " + finalResult.result); // 集群中事务序号+1 // MasterServerTCPAction.contractID2Members.get(contractID).nextSeq(); @@ -330,8 +380,8 @@ public class MultiPointCooperationExecutor extends AbstractContextContractExecut if (joinInfo.joinFuncName != null) { JsonElement ret = MultiPointCooperationExecutor.invokeFunctionWithoutLimit(contractID, joinInfo.joinFuncName, JsonElement.class, jo); finalResult.result = ret; - return; } + return; } switch (joinInfo.useDefault) { case add: