add join support

This commit is contained in:
CaiHQ 2022-06-28 19:49:54 +08:00
parent bfd389f30e
commit 5e45904ceb

View File

@ -1,9 +1,6 @@
package org.bdware.consistency.plugin.sharding; package org.bdware.consistency.plugin.sharding;
import com.google.gson.JsonArray; import com.google.gson.*;
import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
import com.google.gson.JsonPrimitive;
import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.Logger;
import org.bdware.consistency.plugin.common.AbstractContextContractExecutor; import org.bdware.consistency.plugin.common.AbstractContextContractExecutor;
@ -22,7 +19,9 @@ import org.bdware.sc.util.JsonUtil;
import org.bdware.server.trustedmodel.MultiReqSeq; import org.bdware.server.trustedmodel.MultiReqSeq;
import java.math.BigInteger; import java.math.BigInteger;
import java.util.*; import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
@ -52,17 +51,9 @@ public class MultiPointCooperationExecutor extends AbstractContextContractExecut
request_index = new AtomicInteger(seq); request_index = new AtomicInteger(seq);
} }
public ResultCallback createResultCallback( public ResultCallback createResultCallback(final String requestID, final ResultCallback originalCb, final int count, final int request_seq, final String contractID, JoinInfo joinInfo) {
final String requestID,
final ResultCallback originalCb,
final int count,
final int request_seq,
final String contractID, JoinInfo joinInfo) {
// TODO 加对应的超时 // TODO 加对应的超时
return networkManager.createResultCallback( return networkManager.createResultCallback(requestID, new MultiPointCooperationExecutor.ResultMerger(originalCb, count, request_seq, contractID, joinInfo), count); // 把count改成了1设置成获得1个响应就行
requestID,
new MultiPointCooperationExecutor.ResultMerger(originalCb, count, request_seq, contractID, joinInfo),
count); // 把count改成了1设置成获得1个响应就行
} }
public void sendRequest(String id, ContractRequest req, String[] nodes, ResultCallback rc) { public void sendRequest(String id, ContractRequest req, String[] nodes, ResultCallback rc) {
@ -85,26 +76,36 @@ public class MultiPointCooperationExecutor extends AbstractContextContractExecut
if (node.equals(globalConf.getNodeID())) { if (node.equals(globalConf.getNodeID())) {
masterClientTCPAction.asyncExecuteContractLocally(jo, rc); masterClientTCPAction.asyncExecuteContractLocally(jo, rc);
} else { } else {
LOGGER.info( LOGGER.info("[sendRequests] get cmNode " + node.substring(0, 5) + " not null " + "RequestAllExecutor 发送请求给 " + node.substring(0, 5));
"[sendRequests] get cmNode "
+ node.substring(0, 5)
+ " not null "
+ "RequestAllExecutor 发送请求给 "
+ node.substring(0, 5));
networkManager.sendToAgent(node, sendStr); networkManager.sendToAgent(node, sendStr);
} }
} }
} }
public static <T> T invokeFunctionWithoutLimit(String contractID, String funcName, Class<T> t, JsonElement... args) {
ContractClient client = cmActions.getManager().getClient(contractID);
JsonObject arg = new JsonObject();
arg.addProperty("funcName", funcName);
JsonArray funcArgs = new JsonArray();
if (args != null && args.length > 0) for (JsonElement je : args)
funcArgs.add(je);
arg.add("funcArgs", funcArgs);
String routeResultStr = client.executeMethod("", "invokeFunctionWithoutLimit", arg.toString());
T routeResult = JsonUtil.fromJson(routeResultStr, t);
return routeResult;
}
private String[] getAccordingToRouteInfo(RouteInfo routeInfo, ContractRequest req, String[] members) { private String[] getAccordingToRouteInfo(RouteInfo routeInfo, ContractRequest req, String[] members) {
try { try {
int val; int val;
if (routeInfo.useDefault == null) {
// func myFunc (requester, sourceArg)
return invokeFunctionWithoutLimit(req.getContractID(), routeInfo.funcName, String[].class, new JsonPrimitive(req.getRequester()), req.getArg());
}
switch (routeInfo.useDefault) { switch (routeInfo.useDefault) {
case byRequester: case byRequester:
val = val = new BigInteger(req.getRequester(), 16).mod(new BigInteger("" + members.length)).intValue();
new BigInteger(req.getRequester(), 16)
.mod(new BigInteger("" + members.length))
.intValue();
while (val < 0) { while (val < 0) {
val = val + members.length; val = val + members.length;
} }
@ -118,26 +119,11 @@ public class MultiPointCooperationExecutor extends AbstractContextContractExecut
return new String[]{members[val]}; return new String[]{members[val]};
case byTarget: case byTarget:
JsonObject jo = req.getArg().getAsJsonObject(); JsonObject jo = req.getArg().getAsJsonObject();
val = val = new BigInteger(jo.get("target").getAsString(), 16).mod(new BigInteger("" + members.length)).intValue();
new BigInteger(jo.get("target").getAsString(), 16)
.mod(new BigInteger("" + members.length))
.intValue();
while (val < 0) { while (val < 0) {
val = val + members.length; val = val + members.length;
} }
return new String[]{members[val]}; return new String[]{members[val]};
case byFunc:
ContractClient client = cmActions.getManager().getClient(req.getContractID());
JsonObject arg = new JsonObject();
arg.addProperty("funcName", routeInfo.funcName);
// func myFunc (requester, sourceArg)
JsonArray funcArgs = new JsonArray();
funcArgs.add(req.getRequester());
funcArgs.add(req.getArg());
arg.add("funcArgs", funcArgs);
String routeResultStr = client.executeMethod("", "invokeFunctionWithoutLimit", arg.toString());
String[] routeResult = JsonUtil.fromJson(routeResultStr, String[].class);
return routeResult;
default: default:
return members; return members;
} }
@ -183,21 +169,18 @@ public class MultiPointCooperationExecutor extends AbstractContextContractExecut
req.seq = request_index.getAndIncrement(); req.seq = request_index.getAndIncrement();
} }
req.needSeq = true; req.needSeq = true;
String id = String id = System.currentTimeMillis() + "_" + (int) (Math.random() * 1000000) + "_" + req.seq;
System.currentTimeMillis() + "_" + (int) (Math.random() * 1000000) + "_" + req.seq;
LOGGER.info("execute receive requestID= " + requestID + " msgID=" + id); LOGGER.info("execute receive requestID= " + requestID + " msgID=" + id);
if (checkCurNodeNumValid()) { // 校验成功 current node num 合法 if (checkCurNodeNumValid()) { // 校验成功 current node num 合法
LOGGER.info("checkCurNodeNumValid true"); LOGGER.info("checkCurNodeNumValid true");
ContractMeta meta = ContractMeta meta = cmActions.getManager().statusRecorder.getContractMeta(req.getContractID());
cmActions.getManager().statusRecorder.getContractMeta(req.getContractID());
FunctionDesp fun = meta.getExportedFunction(req.getAction()); FunctionDesp fun = meta.getExportedFunction(req.getAction());
ResultCallback collector; ResultCallback collector;
// TODO @fanbo 下面的count 1要改应该是根据route的规则来
//Count 根据join规则来 //Count 根据join规则来
//nodes 根据route规则来 //nodes 根据route规则来
JoinInfo joinInfo = fun.joinInfo; JoinInfo joinInfo = fun.joinInfo;
RouteInfo routeInfo = fun.routeInfo; RouteInfo routeInfo = fun.routeInfo;
int count = getJoinCount(joinInfo, contractID); int count = getJoinCount(joinInfo, contractID, req);
LOGGER.info("requestID=" + requestID + " join Count: " + count); LOGGER.info("requestID=" + requestID + " join Count: " + count);
String[] members = multiMeta.getMembers(); String[] members = multiMeta.getMembers();
@ -205,48 +188,38 @@ public class MultiPointCooperationExecutor extends AbstractContextContractExecut
if (nodes.length < count) { if (nodes.length < count) {
count = nodes.length; count = nodes.length;
} }
collector = collector = createResultCallback(id, rc, count, req.seq, req.getContractID(), joinInfo); // 初始化结果收集器
createResultCallback(id, rc, count, req.seq, req.getContractID(), joinInfo); // 初始化结果收集器
masterServerTCPAction.getSync().sleep(id, collector); masterServerTCPAction.getSync().sleep(id, collector);
LOGGER.info("requestID=" + requestID + " master broadcasts request " + req.seq); LOGGER.info("requestID=" + requestID + " master broadcasts request " + req.seq);
sendRequest(id, req, nodes, collector); // 发送请求 sendRequest(id, req, nodes, collector); // 发送请求
} else { } else {
LOGGER.info("invalidNodeNumOnResult"); LOGGER.info("invalidNodeNumOnResult");
request_index.getAndDecrement(); request_index.getAndDecrement();
ContractResult finalResult = ContractResult finalResult = new ContractResult(ContractResult.Status.Error, new JsonPrimitive("node number unavailbale,request refused."));
new ContractResult(
ContractResult.Status.Error,
new JsonPrimitive("node number unavailbale,request refused."));
rc.onResult(JsonUtil.toJson(finalResult)); rc.onResult(JsonUtil.toJson(finalResult));
} }
} }
private int getJoinCount(JoinInfo joinInfo, String contractID) { private int getJoinCount(JoinInfo joinInfo, String contractID, ContractRequest contractRequest) {
if (joinInfo == null) return resultCount;
if (joinInfo != null) return joinInfo.joinCount;
try { try {
ContractRequest cr = new ContractRequest(); if (joinInfo == null) return resultCount;
cr.setContractID(contractID); if (joinInfo.joinCountFuncName != null) {
cr.setAction("TODO"); Integer count = invokeFunctionWithoutLimit(contractID, joinInfo.joinCountFuncName, Integer.class, new JsonPrimitive(contractRequest.getRequester()), contractRequest.getArg());
//TODO Arg需要好好设计一下 return count;
//TODO 又好用又简单的那种设计
//TODO }
cr.setArg(""); if (joinInfo.joinCount != 0) return joinInfo.joinCount;
String result = cmActions.getManager().executeLocally(cr, null);
return JsonUtil.parseString(result).getAsJsonObject().get("result").getAsInt();
} catch (Exception e) { } catch (Exception e) {
e.printStackTrace(); e.printStackTrace();
return 1;
} }
return resultCount;
} }
// 清理缓存的多点合约请求序号 // 清理缓存的多点合约请求序号
public void clearCache() { public void clearCache() {
final long time = System.currentTimeMillis() - 30000L; final long time = System.currentTimeMillis() - 30000L;
seqMap.entrySet() seqMap.entrySet().removeIf(entry -> {
.removeIf(
entry -> {
MultiReqSeq cache = entry.getValue(); MultiReqSeq cache = entry.getValue();
if (null == cache) { if (null == cache) {
return true; return true;
@ -265,12 +238,7 @@ public class MultiPointCooperationExecutor extends AbstractContextContractExecut
Set<String> nodeIDs = new HashSet<>(); // 已收到返回结果的节点 Set<String> nodeIDs = new HashSet<>(); // 已收到返回结果的节点
JoinInfo joinInfo; JoinInfo joinInfo;
ResultMerger( ResultMerger(final ResultCallback originalCb, final int count, final int request_seq, final String contractID, final JoinInfo joinInfo) {
final ResultCallback originalCb,
final int count,
final int request_seq,
final String contractID,
final JoinInfo joinInfo) {
originalCallback = originalCb; originalCallback = originalCb;
this.count = count; this.count = count;
this.request_seq = request_seq; this.request_seq = request_seq;
@ -281,16 +249,7 @@ public class MultiPointCooperationExecutor extends AbstractContextContractExecut
} }
public String getInfo() { public String getInfo() {
return "contractID=" return "contractID=" + contractID + " 收到第 " + order + " 个节点回复 : " + " order=" + order + " count=" + count + " ";
+ contractID
+ " 收到第 "
+ order
+ " 个节点回复 : "
+ " order="
+ order
+ " count="
+ count
+ " ";
} }
@Override @Override
@ -307,17 +266,7 @@ public class MultiPointCooperationExecutor extends AbstractContextContractExecut
return; return;
} }
nodeIDs.add(id); nodeIDs.add(id);
LOGGER.info( LOGGER.info("contractID=" + contractID + " 收到第 " + order + " 个节点回复 : " + str + " order=" + order + " count=" + count);
"contractID="
+ contractID
+ " 收到第 "
+ order
+ " 个节点回复 : "
+ str
+ " order="
+ order
+ " count="
+ count);
componedContractResult.add(obj); componedContractResult.add(obj);
// 收集到所有结果 // 收集到所有结果
if (order.incrementAndGet() == count) { if (order.incrementAndGet() == count) {
@ -325,7 +274,6 @@ public class MultiPointCooperationExecutor extends AbstractContextContractExecut
finalResult.needSeq = true; finalResult.needSeq = true;
finalResult.seq = request_seq; finalResult.seq = request_seq;
// if (null == finalResult) { // if (null == finalResult) {
// finalResult = // finalResult =
// new ContractResult( // new ContractResult(
@ -341,15 +289,11 @@ public class MultiPointCooperationExecutor extends AbstractContextContractExecut
} }
originalCallback.onResult(JsonUtil.toJson(finalResult)); originalCallback.onResult(JsonUtil.toJson(finalResult));
// } // }
LOGGER.info( LOGGER.info("本次执行最终结果为 " + finalResult.size + "个节点合并的,结果为 " + finalResult.result);
"本次执行最终结果为 " + finalResult.size + "个节点合并的,结果为 " + finalResult.result);
// 集群中事务序号+1 // 集群中事务序号+1
// MasterServerTCPAction.contractID2Members.get(contractID).nextSeq(); // MasterServerTCPAction.contractID2Members.get(contractID).nextSeq();
cmActions.getManager() cmActions.getManager().multiContractRecorder.getMultiContractMeta(contractID).nextSeqAtMaster();
.multiContractRecorder
.getMultiContractMeta(contractID)
.nextSeqAtMaster();
// recover其中无状态合约CP出错无需恢复 // recover其中无状态合约CP出错无需恢复
Set<String> nodesID = componedContractResult.getProblemNodes(); Set<String> nodesID = componedContractResult.getProblemNodes();
if (null == nodesID || nodesID.isEmpty()) { if (null == nodesID || nodesID.isEmpty()) {
@ -357,22 +301,17 @@ public class MultiPointCooperationExecutor extends AbstractContextContractExecut
} }
for (String nodeID : nodesID) { for (String nodeID : nodesID) {
LOGGER.info("结果出现问题的节点有:" + nodeID); LOGGER.info("结果出现问题的节点有:" + nodeID);
if (masterServerRecoverMechAction.getRecoverStatusMap().get(nodeID).get(contractID) if (masterServerRecoverMechAction.getRecoverStatusMap().get(nodeID).get(contractID) == RecoverFlag.Fine) {
== RecoverFlag.Fine) { masterServerRecoverMechAction.getRecoverStatusMap().get(nodeID).put(contractID, RecoverFlag.ToRecover);
masterServerRecoverMechAction.getRecoverStatusMap()
.get(nodeID)
.put(contractID, RecoverFlag.ToRecover);
} }
} }
for (String nodeID : nodesID) { for (String nodeID : nodesID) {
if (masterServerRecoverMechAction.getRecoverStatusMap().get(nodeID).get(contractID) if (masterServerRecoverMechAction.getRecoverStatusMap().get(nodeID).get(contractID) == RecoverFlag.ToRecover) {
== RecoverFlag.ToRecover) {
LOGGER.info("问题节点开始恢复:" + nodeID); LOGGER.info("问题节点开始恢复:" + nodeID);
// 因为该节点结果有误所以即时是stableMode也认为trans记录不可信 // 因为该节点结果有误所以即时是stableMode也认为trans记录不可信
// 直接通过load别的节点来恢复 // 直接通过load别的节点来恢复
masterServerRecoverMechAction.restartContractFromCommonMode( masterServerRecoverMechAction.restartContractFromCommonMode(nodeID, contractID);
nodeID, contractID);
} }
} }
} }
@ -385,24 +324,35 @@ public class MultiPointCooperationExecutor extends AbstractContextContractExecut
private void handleJoinInfo(ContractResult finalResult, JoinInfo joinInfo) { private void handleJoinInfo(ContractResult finalResult, JoinInfo joinInfo) {
JsonObject jo = finalResult.result.getAsJsonObject(); JsonObject jo = finalResult.result.getAsJsonObject();
if (joinInfo != null && joinInfo.joinRule != null) { try {
//TODO 不应该是double 类型 if (joinInfo != null) {
switch (joinInfo.joinRule) { if (joinInfo.useDefault == null) {
case "add": if (joinInfo.joinFuncName != null) {
JsonElement ret = MultiPointCooperationExecutor.invokeFunctionWithoutLimit(contractID, joinInfo.joinFuncName, JsonElement.class, jo);
finalResult.result = ret;
return;
}
}
switch (joinInfo.useDefault) {
case add:
double val = 0; double val = 0;
for (String key : jo.keySet()) { for (String key : jo.keySet()) {
val += jo.get(key).getAsDouble(); val += jo.get(key).getAsDouble();
} }
finalResult.result = new JsonPrimitive(val); finalResult.result = new JsonPrimitive(val);
break; break;
case "multiply": case multiply:
val = 1; val = 1;
for (String key : jo.keySet()) { for (String key : jo.keySet()) {
val *= jo.get(key).getAsDouble(); val *= jo.get(key).getAsDouble();
} }
finalResult.result = new JsonPrimitive(val); finalResult.result = new JsonPrimitive(val);
break; break;
} default:
}
}
} catch (Exception e) {
e.printStackTrace();
} }
} }
} }