diff --git a/src/main/java/org/bdware/server/executor/unconsistency/MultiPointCooperationExecutor.java b/src/main/java/org/bdware/server/executor/unconsistency/MultiPointCooperationExecutor.java index 7697fcf..6d7668e 100644 --- a/src/main/java/org/bdware/server/executor/unconsistency/MultiPointCooperationExecutor.java +++ b/src/main/java/org/bdware/server/executor/unconsistency/MultiPointCooperationExecutor.java @@ -43,6 +43,7 @@ public class MultiPointCooperationExecutor implements ContractExecutor { Map seqMap = new ConcurrentHashMap<>(); Map resultCache = new ConcurrentHashMap<>(); // MultiPointContractInfo info; + MultiContractMeta multiMeta; String contractID; public MultiPointCooperationExecutor(ContractExecType t, int c, String con_id) { @@ -50,6 +51,7 @@ public class MultiPointCooperationExecutor implements ContractExecutor { type = t; resultCount = c; contractID = con_id; + multiMeta = CMActions.manager.multiContractRecorder.getMultiContractMeta(contractID); } public void setSeq(int seq) { @@ -63,31 +65,26 @@ public class MultiPointCooperationExecutor implements ContractExecutor { final int request_seq, final String contractID, JoinInfo joinInfo) { // TODO 加对应的超时? - return new ResultCollector( requestID, new MultiPointCooperationExecutor.ResultMerger(originalCb, count, request_seq, contractID, joinInfo), count); // 把count改成了1,设置成获得1个响应就行 } - public void sendRequest(String id, ContractRequest req, ResultCallback collector) { + public void sendRequest(String id, ContractRequest req, String[] nodes) { Map reqStr = new HashMap<>(); reqStr.put("uniReqID", id); reqStr.put("data", req); req.needSeq = false; reqStr.put("action", "executeContractLocally"); String sendStr = JsonUtil.toJson(reqStr); - // master负责缓存请求 if (!MasterServerTCPAction.requestCache.containsKey(contractID)) { MasterServerTCPAction.requestCache.put(contractID, new RequestCache()); } // TODO 多调多统一个seq的有多个请求,这个需要改 MasterServerTCPAction.requestCache.get(contractID).put(req.seq, sendStr); - LOGGER.debug(JsonUtil.toJson(req)); - String[] nodes = getAccordingToRouteInfo(req); - LOGGER.info("node size = " + nodes.length); LOGGER.debug("nodes:" + JsonUtil.toJson(nodes)); for (String node : nodes) { @@ -98,20 +95,11 @@ public class MultiPointCooperationExecutor implements ContractExecutor { + "RequestAllExecutor 发送请求给 " + node.substring(0, 5)); NetworkManager.instance.sendToAgent(node, sendStr); - } } - private String[] getAccordingToRouteInfo(ContractRequest req) { - - MultiContractMeta multiContractMeta = - CMActions.manager.multiContractRecorder.getMultiContractMeta(contractID); - - String[] members = multiContractMeta.getMembers(); + private String[] getAccordingToRouteInfo(RouteInfo routeInfo, ContractRequest req, String[] members) { try { - ContractMeta meta = CMActions.manager.statusRecorder.getContractMeta(contractID); - FunctionDesp fun = meta.getExportedFunction(req.getAction()); - RouteInfo routeInfo = fun.getRoute(); int val; switch (routeInfo.useDefault) { case byRequester: @@ -150,11 +138,7 @@ public class MultiPointCooperationExecutor implements ContractExecutor { public boolean checkCurNodeNumValid() { LOGGER.info("checkCurNodeNumValid"); - String[] nodes = - CMActions.manager - .multiContractRecorder - .getMultiContractMeta(contractID) - .getMembers(); + String[] nodes = multiMeta.getMembers(); // List nodes = info.members; int validNode = 0; for (String node : nodes) { @@ -175,9 +159,7 @@ public class MultiPointCooperationExecutor implements ContractExecutor { LOGGER.info("[MultiPointCooperationExecutor] execute " + JsonUtil.toJson(req)); // 获得action 函数名 LOGGER.info("action is : " + req.getAction()); - req.setContractID(CMActions.manager.getContractIDByName(req.getContractID())); - if (requestID != null && requestID.endsWith("_mul")) { synchronized (lock) { if (seqMap.containsKey(requestID)) { @@ -194,7 +176,6 @@ public class MultiPointCooperationExecutor implements ContractExecutor { String id = System.currentTimeMillis() + "_" + (int) (Math.random() * 1000000) + "_" + req.seq; LOGGER.info("execute receive requestID= " + requestID + " msgID=" + id); - if (checkCurNodeNumValid()) { // 校验成功 current node num 合法 LOGGER.info("checkCurNodeNumValid true"); ContractMeta meta = @@ -202,20 +183,23 @@ public class MultiPointCooperationExecutor implements ContractExecutor { FunctionDesp fun = meta.getExportedFunction(req.getAction()); ResultCallback collector; // TODO @fanbo 下面的count 1要改,应该是根据route的规则来。 + //Count 根据join规则来。 + //nodes 根据route规则来。 JoinInfo joinInfo = fun.joinInfo; - if (fun.getRoute() != null && fun.getRoute().useDefault != null) { - // 根据函数的类型决定不同的 collector - collector = - createResultCallback(id, rc, 1, req.seq, req.getContractID(), joinInfo); // 初始化结果收集器 - } else { - collector = - createResultCallback( - id, rc, resultCount, req.seq, req.getContractID(), joinInfo); // 初始化结果收集器 - } + RouteInfo routeInfo = fun.routeInfo; + int count = getJoinCount(joinInfo, contractID); + LOGGER.info("requestID=" + requestID + " join Count: " + count); + String[] members = multiMeta.getMembers(); + String[] nodes = getAccordingToRouteInfo(routeInfo, req, members); + if (nodes.length < count) { + count = nodes.length; + } + collector = + createResultCallback(id, rc, count, req.seq, req.getContractID(), joinInfo); // 初始化结果收集器 MasterServerTCPAction.sync.sleep(id, collector); LOGGER.info("requestID=" + requestID + " master broadcasts request " + req.seq); - sendRequest(id, req, collector); // 发送请求 + sendRequest(id, req, nodes); // 发送请求 } else { LOGGER.info("invalidNodeNumOnResult"); request_index.getAndDecrement(); @@ -227,6 +211,27 @@ public class MultiPointCooperationExecutor implements ContractExecutor { } } + private int getJoinCount(JoinInfo joinInfo, String contractID) { + if (joinInfo == null) return resultCount; + if (joinInfo.joinCount.isJsonPrimitive() && joinInfo.joinCount.getAsJsonPrimitive().isNumber()) { + return joinInfo.joinCount.getAsJsonPrimitive().getAsInt(); + } + try { + ContractRequest cr = new ContractRequest(); + cr.setContractID(contractID); + cr.setAction(joinInfo.joinCount.getAsString()); + //TODO Arg需要好好设计一下。 + //TODO 又好用又简单的那种设计 + //TODO + cr.setArg(""); + String result = CMActions.manager.executeLocally(cr, null); + return JsonUtil.parseString(result).getAsJsonObject().get("result").getAsInt(); + } catch (Exception e) { + e.printStackTrace(); + return 1; + } + } + // 清理缓存的多点合约请求序号 public void clearCache() { final long time = System.currentTimeMillis() - 30000L; @@ -373,16 +378,15 @@ public class MultiPointCooperationExecutor implements ContractExecutor { JsonObject jo = finalResult.result.getAsJsonObject(); if (joinInfo != null && joinInfo.joinRule != null) { //TODO 不应该是double 类型 - switch (joinInfo.joinRule) { - case add: + case "add": double val = 0; for (String key : jo.keySet()) { val += jo.get(key).getAsDouble(); } finalResult.result = new JsonPrimitive(val); break; - case multiply: + case "multiply": val = 1; for (String key : jo.keySet()) { val *= jo.get(key).getAsDouble();