support join annotation

This commit is contained in:
CaiHQ 2021-12-07 21:09:05 +08:00
parent 802713a001
commit dbd4953daa
2 changed files with 52 additions and 40 deletions

3
script/uploadToServer.sh Normal file
View File

@ -0,0 +1,3 @@
#/bin/bash
scp -P 222 ./build/bdserver.zip dev@47.95.110.68:/data/public/releases/bdcontract/$1/

View File

@ -8,10 +8,7 @@ import org.apache.logging.log4j.Logger;
import org.bdware.sc.ComponedContractResult;
import org.bdware.sc.ContractMeta;
import org.bdware.sc.ContractResult;
import org.bdware.sc.bean.ContractExecType;
import org.bdware.sc.bean.ContractRequest;
import org.bdware.sc.bean.FunctionDesp;
import org.bdware.sc.bean.RouteInfo;
import org.bdware.sc.bean.*;
import org.bdware.sc.conn.OnHashCallback;
import org.bdware.sc.conn.ResultCallback;
import org.bdware.sc.units.MultiContractMeta;
@ -64,12 +61,12 @@ public class MultiPointCooperationExecutor implements ContractExecutor {
final ResultCallback originalCb,
final int count,
final int request_seq,
final String contractID) {
ComponedContractResult componedContractResult = new ComponedContractResult(count);
final String contractID, JoinInfo joinInfo) {
// TODO 加对应的超时
return new ResultCollector(
requestID,
new MultiPointCooperationExecutor.ResultMerger(originalCb, count, request_seq, contractID),
new MultiPointCooperationExecutor.ResultMerger(originalCb, count, request_seq, contractID, joinInfo),
count); // 把count改成了1设置成获得1个响应就行
}
@ -89,31 +86,11 @@ public class MultiPointCooperationExecutor implements ContractExecutor {
MasterServerTCPAction.requestCache.get(contractID).put(req.seq, sendStr);
LOGGER.debug(JsonUtil.toJson(req));
String[] nodes = getAccordingToRouteInfo(req);
LOGGER.info("node size = " + nodes.length);
LOGGER.debug("nodes:" + JsonUtil.toJson(nodes));
for (String node : nodes) {
LOGGER.info(node);
LOGGER.info("[sendRequests] get cmNode " + node.substring(0, 5));
if (!NetworkManager.instance.hasAgentConnection(node)) {
LOGGER.info("[sendRequests] get cmNode " + node.substring(0, 5) + " is null");
collector.onResult(
"{\"status\":\"Error\",\"result\":\"node offline\","
+ "\"nodeID\":\""
+ node
+ "\","
+ "\"action\":\"onExecuteContractTrustfully\"}");
} else if (MasterServerRecoverMechAction.recoverStatus.get(node).get(contractID)
!= RecoverFlag.Fine) {
collector.onResult(
"{\"status\":\"Error\",\"result\":\"node recovering\","
+ "\"nodeID\":\""
+ node
+ "\","
+ "\"action\":\"onExecuteContractTrustfully\"}");
NetworkManager.instance.sendToAgent(node, sendStr);
} else {
LOGGER.info(
"[sendRequests] get cmNode "
+ node.substring(0, 5)
@ -121,7 +98,7 @@ public class MultiPointCooperationExecutor implements ContractExecutor {
+ "RequestAllExecutor 发送请求给 "
+ node.substring(0, 5));
NetworkManager.instance.sendToAgent(node, sendStr);
}
}
}
@ -224,15 +201,16 @@ public class MultiPointCooperationExecutor implements ContractExecutor {
CMActions.manager.statusRecorder.getContractMeta(req.getContractID());
FunctionDesp fun = meta.getExportedFunction(req.getAction());
ResultCallback collector;
// TODO @huyueyang这个ResultCallback可能要根据@Join来
// TODO @fanbo 下面的count 1要改应该是根据route的规则来
JoinInfo joinInfo = fun.joinInfo;
if (fun.getRoute() != null && fun.getRoute().useDefault != null) {
// 根据函数的类型决定不同的 collector
collector =
createResultCallback(id, rc, 1, req.seq, req.getContractID()); // 初始化结果收集器
createResultCallback(id, rc, 1, req.seq, req.getContractID(), joinInfo); // 初始化结果收集器
} else {
collector =
createResultCallback(
id, rc, resultCount, req.seq, req.getContractID()); // 初始化结果收集器
id, rc, resultCount, req.seq, req.getContractID(), joinInfo); // 初始化结果收集器
}
MasterServerTCPAction.sync.sleep(id, collector);
@ -271,18 +249,21 @@ public class MultiPointCooperationExecutor implements ContractExecutor {
int request_seq;
ResultCallback originalCallback;
Set<String> nodeIDs = new HashSet<>(); // 已收到返回结果的节点
JoinInfo joinInfo;
ResultMerger(
final ResultCallback originalCb,
final int count,
final int request_seq,
final String contractID) {
final String contractID,
final JoinInfo joinInfo) {
originalCallback = originalCb;
this.count = count;
this.request_seq = request_seq;
this.contractID = contractID;
componedContractResult = new ComponedContractResult(count);
order = new AtomicInteger(0);
this.joinInfo = joinInfo;
}
public String getInfo() {
@ -327,6 +308,7 @@ public class MultiPointCooperationExecutor implements ContractExecutor {
// 收集到所有结果
if (order.incrementAndGet() == count) {
ContractResult finalResult = componedContractResult.mergeFinalResult();
finalResult.needSeq = true;
finalResult.seq = request_seq;
@ -340,6 +322,9 @@ public class MultiPointCooperationExecutor implements ContractExecutor {
// originalCallback.onResult(new
// Gson().toJson(finalResult));
// } else {
if (joinInfo != null) {
handleJoinInfo(finalResult, joinInfo);
}
originalCallback.onResult(JsonUtil.toJson(finalResult));
// }
LOGGER.info(
@ -383,5 +368,29 @@ public class MultiPointCooperationExecutor implements ContractExecutor {
LOGGER.info("本次执行最终结果为有异常");
}
}
private void handleJoinInfo(ContractResult finalResult, JoinInfo joinInfo) {
JsonObject jo = finalResult.result.getAsJsonObject();
if (joinInfo != null && joinInfo.joinRule != null) {
//TODO 不应该是double 类型
switch (joinInfo.joinRule) {
case add:
double val = 0;
for (String key : jo.keySet()) {
val += jo.get(key).getAsDouble();
}
finalResult.result = new JsonPrimitive(val);
break;
case multiply:
val = 1;
for (String key : jo.keySet()) {
val *= jo.get(key).getAsDouble();
}
finalResult.result = new JsonPrimitive(val);
break;
}
}
}
}
}