update join info

support joinCount function
This commit is contained in:
CaiHQ 2021-12-17 12:53:48 +08:00
parent 0ea97d17cb
commit db5e8451b0

View File

@ -43,6 +43,7 @@ public class MultiPointCooperationExecutor implements ContractExecutor {
Map<String, MultiReqSeq> seqMap = new ConcurrentHashMap<>(); Map<String, MultiReqSeq> seqMap = new ConcurrentHashMap<>();
Map<String, ResultCache> resultCache = new ConcurrentHashMap<>(); Map<String, ResultCache> resultCache = new ConcurrentHashMap<>();
// MultiPointContractInfo info; // MultiPointContractInfo info;
MultiContractMeta multiMeta;
String contractID; String contractID;
public MultiPointCooperationExecutor(ContractExecType t, int c, String con_id) { public MultiPointCooperationExecutor(ContractExecType t, int c, String con_id) {
@ -50,6 +51,7 @@ public class MultiPointCooperationExecutor implements ContractExecutor {
type = t; type = t;
resultCount = c; resultCount = c;
contractID = con_id; contractID = con_id;
multiMeta = CMActions.manager.multiContractRecorder.getMultiContractMeta(contractID);
} }
public void setSeq(int seq) { public void setSeq(int seq) {
@ -63,31 +65,26 @@ public class MultiPointCooperationExecutor implements ContractExecutor {
final int request_seq, final int request_seq,
final String contractID, JoinInfo joinInfo) { final String contractID, JoinInfo joinInfo) {
// TODO 加对应的超时 // TODO 加对应的超时
return new ResultCollector( return new ResultCollector(
requestID, requestID,
new MultiPointCooperationExecutor.ResultMerger(originalCb, count, request_seq, contractID, joinInfo), new MultiPointCooperationExecutor.ResultMerger(originalCb, count, request_seq, contractID, joinInfo),
count); // 把count改成了1设置成获得1个响应就行 count); // 把count改成了1设置成获得1个响应就行
} }
public void sendRequest(String id, ContractRequest req, ResultCallback collector) { public void sendRequest(String id, ContractRequest req, String[] nodes) {
Map<String, Object> reqStr = new HashMap<>(); Map<String, Object> reqStr = new HashMap<>();
reqStr.put("uniReqID", id); reqStr.put("uniReqID", id);
reqStr.put("data", req); reqStr.put("data", req);
req.needSeq = false; req.needSeq = false;
reqStr.put("action", "executeContractLocally"); reqStr.put("action", "executeContractLocally");
String sendStr = JsonUtil.toJson(reqStr); String sendStr = JsonUtil.toJson(reqStr);
// master负责缓存请求 // master负责缓存请求
if (!MasterServerTCPAction.requestCache.containsKey(contractID)) { if (!MasterServerTCPAction.requestCache.containsKey(contractID)) {
MasterServerTCPAction.requestCache.put(contractID, new RequestCache()); MasterServerTCPAction.requestCache.put(contractID, new RequestCache());
} }
// TODO 多调多统一个seq的有多个请求这个需要改 // TODO 多调多统一个seq的有多个请求这个需要改
MasterServerTCPAction.requestCache.get(contractID).put(req.seq, sendStr); MasterServerTCPAction.requestCache.get(contractID).put(req.seq, sendStr);
LOGGER.debug(JsonUtil.toJson(req)); LOGGER.debug(JsonUtil.toJson(req));
String[] nodes = getAccordingToRouteInfo(req);
LOGGER.info("node size = " + nodes.length); LOGGER.info("node size = " + nodes.length);
LOGGER.debug("nodes:" + JsonUtil.toJson(nodes)); LOGGER.debug("nodes:" + JsonUtil.toJson(nodes));
for (String node : nodes) { for (String node : nodes) {
@ -98,20 +95,11 @@ public class MultiPointCooperationExecutor implements ContractExecutor {
+ "RequestAllExecutor 发送请求给 " + "RequestAllExecutor 发送请求给 "
+ node.substring(0, 5)); + node.substring(0, 5));
NetworkManager.instance.sendToAgent(node, sendStr); NetworkManager.instance.sendToAgent(node, sendStr);
} }
} }
private String[] getAccordingToRouteInfo(ContractRequest req) { private String[] getAccordingToRouteInfo(RouteInfo routeInfo, ContractRequest req, String[] members) {
MultiContractMeta multiContractMeta =
CMActions.manager.multiContractRecorder.getMultiContractMeta(contractID);
String[] members = multiContractMeta.getMembers();
try { try {
ContractMeta meta = CMActions.manager.statusRecorder.getContractMeta(contractID);
FunctionDesp fun = meta.getExportedFunction(req.getAction());
RouteInfo routeInfo = fun.getRoute();
int val; int val;
switch (routeInfo.useDefault) { switch (routeInfo.useDefault) {
case byRequester: case byRequester:
@ -150,11 +138,7 @@ public class MultiPointCooperationExecutor implements ContractExecutor {
public boolean checkCurNodeNumValid() { public boolean checkCurNodeNumValid() {
LOGGER.info("checkCurNodeNumValid"); LOGGER.info("checkCurNodeNumValid");
String[] nodes = String[] nodes = multiMeta.getMembers();
CMActions.manager
.multiContractRecorder
.getMultiContractMeta(contractID)
.getMembers();
// List<String> nodes = info.members; // List<String> nodes = info.members;
int validNode = 0; int validNode = 0;
for (String node : nodes) { for (String node : nodes) {
@ -175,9 +159,7 @@ public class MultiPointCooperationExecutor implements ContractExecutor {
LOGGER.info("[MultiPointCooperationExecutor] execute " + JsonUtil.toJson(req)); LOGGER.info("[MultiPointCooperationExecutor] execute " + JsonUtil.toJson(req));
// 获得action 函数名 // 获得action 函数名
LOGGER.info("action is : " + req.getAction()); LOGGER.info("action is : " + req.getAction());
req.setContractID(CMActions.manager.getContractIDByName(req.getContractID())); req.setContractID(CMActions.manager.getContractIDByName(req.getContractID()));
if (requestID != null && requestID.endsWith("_mul")) { if (requestID != null && requestID.endsWith("_mul")) {
synchronized (lock) { synchronized (lock) {
if (seqMap.containsKey(requestID)) { if (seqMap.containsKey(requestID)) {
@ -194,7 +176,6 @@ public class MultiPointCooperationExecutor implements ContractExecutor {
String id = String id =
System.currentTimeMillis() + "_" + (int) (Math.random() * 1000000) + "_" + req.seq; System.currentTimeMillis() + "_" + (int) (Math.random() * 1000000) + "_" + req.seq;
LOGGER.info("execute receive requestID= " + requestID + " msgID=" + id); LOGGER.info("execute receive requestID= " + requestID + " msgID=" + id);
if (checkCurNodeNumValid()) { // 校验成功 current node num 合法 if (checkCurNodeNumValid()) { // 校验成功 current node num 合法
LOGGER.info("checkCurNodeNumValid true"); LOGGER.info("checkCurNodeNumValid true");
ContractMeta meta = ContractMeta meta =
@ -202,20 +183,23 @@ public class MultiPointCooperationExecutor implements ContractExecutor {
FunctionDesp fun = meta.getExportedFunction(req.getAction()); FunctionDesp fun = meta.getExportedFunction(req.getAction());
ResultCallback collector; ResultCallback collector;
// TODO @fanbo 下面的count 1要改应该是根据route的规则来 // TODO @fanbo 下面的count 1要改应该是根据route的规则来
//Count 根据join规则来
//nodes 根据route规则来
JoinInfo joinInfo = fun.joinInfo; JoinInfo joinInfo = fun.joinInfo;
if (fun.getRoute() != null && fun.getRoute().useDefault != null) { RouteInfo routeInfo = fun.routeInfo;
// 根据函数的类型决定不同的 collector int count = getJoinCount(joinInfo, contractID);
collector = LOGGER.info("requestID=" + requestID + " join Count: " + count);
createResultCallback(id, rc, 1, req.seq, req.getContractID(), joinInfo); // 初始化结果收集器
} else {
collector =
createResultCallback(
id, rc, resultCount, req.seq, req.getContractID(), joinInfo); // 初始化结果收集器
}
String[] members = multiMeta.getMembers();
String[] nodes = getAccordingToRouteInfo(routeInfo, req, members);
if (nodes.length < count) {
count = nodes.length;
}
collector =
createResultCallback(id, rc, count, req.seq, req.getContractID(), joinInfo); // 初始化结果收集器
MasterServerTCPAction.sync.sleep(id, collector); MasterServerTCPAction.sync.sleep(id, collector);
LOGGER.info("requestID=" + requestID + " master broadcasts request " + req.seq); LOGGER.info("requestID=" + requestID + " master broadcasts request " + req.seq);
sendRequest(id, req, collector); // 发送请求 sendRequest(id, req, nodes); // 发送请求
} else { } else {
LOGGER.info("invalidNodeNumOnResult"); LOGGER.info("invalidNodeNumOnResult");
request_index.getAndDecrement(); request_index.getAndDecrement();
@ -227,6 +211,27 @@ public class MultiPointCooperationExecutor implements ContractExecutor {
} }
} }
private int getJoinCount(JoinInfo joinInfo, String contractID) {
if (joinInfo == null) return resultCount;
if (joinInfo.joinCount.isJsonPrimitive() && joinInfo.joinCount.getAsJsonPrimitive().isNumber()) {
return joinInfo.joinCount.getAsJsonPrimitive().getAsInt();
}
try {
ContractRequest cr = new ContractRequest();
cr.setContractID(contractID);
cr.setAction(joinInfo.joinCount.getAsString());
//TODO Arg需要好好设计一下
//TODO 又好用又简单的那种设计
//TODO
cr.setArg("");
String result = CMActions.manager.executeLocally(cr, null);
return JsonUtil.parseString(result).getAsJsonObject().get("result").getAsInt();
} catch (Exception e) {
e.printStackTrace();
return 1;
}
}
// 清理缓存的多点合约请求序号 // 清理缓存的多点合约请求序号
public void clearCache() { public void clearCache() {
final long time = System.currentTimeMillis() - 30000L; final long time = System.currentTimeMillis() - 30000L;
@ -373,16 +378,15 @@ public class MultiPointCooperationExecutor implements ContractExecutor {
JsonObject jo = finalResult.result.getAsJsonObject(); JsonObject jo = finalResult.result.getAsJsonObject();
if (joinInfo != null && joinInfo.joinRule != null) { if (joinInfo != null && joinInfo.joinRule != null) {
//TODO 不应该是double 类型 //TODO 不应该是double 类型
switch (joinInfo.joinRule) { switch (joinInfo.joinRule) {
case add: case "add":
double val = 0; double val = 0;
for (String key : jo.keySet()) { for (String key : jo.keySet()) {
val += jo.get(key).getAsDouble(); val += jo.get(key).getAsDouble();
} }
finalResult.result = new JsonPrimitive(val); finalResult.result = new JsonPrimitive(val);
break; break;
case multiply: case "multiply":
val = 1; val = 1;
for (String key : jo.keySet()) { for (String key : jo.keySet()) {
val *= jo.get(key).getAsDouble(); val *= jo.get(key).getAsDouble();