From dbd4953daad14eae3a07bd10c9e512ff0e3f3287 Mon Sep 17 00:00:00 2001 From: CaiHQ Date: Tue, 7 Dec 2021 21:09:05 +0800 Subject: [PATCH] support join annotation --- script/uploadToServer.sh | 3 + .../MultiPointCooperationExecutor.java | 89 ++++++++++--------- 2 files changed, 52 insertions(+), 40 deletions(-) create mode 100644 script/uploadToServer.sh diff --git a/script/uploadToServer.sh b/script/uploadToServer.sh new file mode 100644 index 0000000..5acecfd --- /dev/null +++ b/script/uploadToServer.sh @@ -0,0 +1,3 @@ +#/bin/bash + +scp -P 222 ./build/bdserver.zip dev@47.95.110.68:/data/public/releases/bdcontract/$1/ diff --git a/src/main/java/org/bdware/server/executor/unconsistency/MultiPointCooperationExecutor.java b/src/main/java/org/bdware/server/executor/unconsistency/MultiPointCooperationExecutor.java index c7ac257..7697fcf 100644 --- a/src/main/java/org/bdware/server/executor/unconsistency/MultiPointCooperationExecutor.java +++ b/src/main/java/org/bdware/server/executor/unconsistency/MultiPointCooperationExecutor.java @@ -8,10 +8,7 @@ import org.apache.logging.log4j.Logger; import org.bdware.sc.ComponedContractResult; import org.bdware.sc.ContractMeta; import org.bdware.sc.ContractResult; -import org.bdware.sc.bean.ContractExecType; -import org.bdware.sc.bean.ContractRequest; -import org.bdware.sc.bean.FunctionDesp; -import org.bdware.sc.bean.RouteInfo; +import org.bdware.sc.bean.*; import org.bdware.sc.conn.OnHashCallback; import org.bdware.sc.conn.ResultCallback; import org.bdware.sc.units.MultiContractMeta; @@ -64,12 +61,12 @@ public class MultiPointCooperationExecutor implements ContractExecutor { final ResultCallback originalCb, final int count, final int request_seq, - final String contractID) { - ComponedContractResult componedContractResult = new ComponedContractResult(count); + final String contractID, JoinInfo joinInfo) { // TODO 加对应的超时? + return new ResultCollector( requestID, - new MultiPointCooperationExecutor.ResultMerger(originalCb, count, request_seq, contractID), + new MultiPointCooperationExecutor.ResultMerger(originalCb, count, request_seq, contractID, joinInfo), count); // 把count改成了1,设置成获得1个响应就行 } @@ -89,39 +86,19 @@ public class MultiPointCooperationExecutor implements ContractExecutor { MasterServerTCPAction.requestCache.get(contractID).put(req.seq, sendStr); LOGGER.debug(JsonUtil.toJson(req)); - String[] nodes = getAccordingToRouteInfo(req); + LOGGER.info("node size = " + nodes.length); LOGGER.debug("nodes:" + JsonUtil.toJson(nodes)); for (String node : nodes) { - LOGGER.info(node); - LOGGER.info("[sendRequests] get cmNode " + node.substring(0, 5)); - if (!NetworkManager.instance.hasAgentConnection(node)) { - LOGGER.info("[sendRequests] get cmNode " + node.substring(0, 5) + " is null"); - collector.onResult( - "{\"status\":\"Error\",\"result\":\"node offline\"," - + "\"nodeID\":\"" - + node - + "\"," - + "\"action\":\"onExecuteContractTrustfully\"}"); - } else if (MasterServerRecoverMechAction.recoverStatus.get(node).get(contractID) - != RecoverFlag.Fine) { - collector.onResult( - "{\"status\":\"Error\",\"result\":\"node recovering\"," - + "\"nodeID\":\"" - + node - + "\"," - + "\"action\":\"onExecuteContractTrustfully\"}"); - NetworkManager.instance.sendToAgent(node, sendStr); - } else { - LOGGER.info( - "[sendRequests] get cmNode " - + node.substring(0, 5) - + " not null " - + "RequestAllExecutor 发送请求给 " - + node.substring(0, 5)); - NetworkManager.instance.sendToAgent(node, sendStr); - } + LOGGER.info( + "[sendRequests] get cmNode " + + node.substring(0, 5) + + " not null " + + "RequestAllExecutor 发送请求给 " + + node.substring(0, 5)); + NetworkManager.instance.sendToAgent(node, sendStr); + } } @@ -224,15 +201,16 @@ public class MultiPointCooperationExecutor implements ContractExecutor { CMActions.manager.statusRecorder.getContractMeta(req.getContractID()); FunctionDesp fun = meta.getExportedFunction(req.getAction()); ResultCallback collector; - // TODO @huyueyang,这个ResultCallback可能要根据@Join来。 + // TODO @fanbo 下面的count 1要改,应该是根据route的规则来。 + JoinInfo joinInfo = fun.joinInfo; if (fun.getRoute() != null && fun.getRoute().useDefault != null) { // 根据函数的类型决定不同的 collector collector = - createResultCallback(id, rc, 1, req.seq, req.getContractID()); // 初始化结果收集器 + createResultCallback(id, rc, 1, req.seq, req.getContractID(), joinInfo); // 初始化结果收集器 } else { collector = createResultCallback( - id, rc, resultCount, req.seq, req.getContractID()); // 初始化结果收集器 + id, rc, resultCount, req.seq, req.getContractID(), joinInfo); // 初始化结果收集器 } MasterServerTCPAction.sync.sleep(id, collector); @@ -271,18 +249,21 @@ public class MultiPointCooperationExecutor implements ContractExecutor { int request_seq; ResultCallback originalCallback; Set nodeIDs = new HashSet<>(); // 已收到返回结果的节点 + JoinInfo joinInfo; ResultMerger( final ResultCallback originalCb, final int count, final int request_seq, - final String contractID) { + final String contractID, + final JoinInfo joinInfo) { originalCallback = originalCb; this.count = count; this.request_seq = request_seq; this.contractID = contractID; componedContractResult = new ComponedContractResult(count); order = new AtomicInteger(0); + this.joinInfo = joinInfo; } public String getInfo() { @@ -327,6 +308,7 @@ public class MultiPointCooperationExecutor implements ContractExecutor { // 收集到所有结果 if (order.incrementAndGet() == count) { ContractResult finalResult = componedContractResult.mergeFinalResult(); + finalResult.needSeq = true; finalResult.seq = request_seq; @@ -340,6 +322,9 @@ public class MultiPointCooperationExecutor implements ContractExecutor { // originalCallback.onResult(new // Gson().toJson(finalResult)); // } else { + if (joinInfo != null) { + handleJoinInfo(finalResult, joinInfo); + } originalCallback.onResult(JsonUtil.toJson(finalResult)); // } LOGGER.info( @@ -383,5 +368,29 @@ public class MultiPointCooperationExecutor implements ContractExecutor { LOGGER.info("本次执行最终结果为有异常"); } } + + private void handleJoinInfo(ContractResult finalResult, JoinInfo joinInfo) { + JsonObject jo = finalResult.result.getAsJsonObject(); + if (joinInfo != null && joinInfo.joinRule != null) { + //TODO 不应该是double 类型 + + switch (joinInfo.joinRule) { + case add: + double val = 0; + for (String key : jo.keySet()) { + val += jo.get(key).getAsDouble(); + } + finalResult.result = new JsonPrimitive(val); + break; + case multiply: + val = 1; + for (String key : jo.keySet()) { + val *= jo.get(key).getAsDouble(); + } + finalResult.result = new JsonPrimitive(val); + break; + } + } + } } }