This commit is contained in:
CaiHQ 2022-05-19 11:06:06 +08:00
parent 579024ad56
commit e10866ff8b
2 changed files with 47 additions and 18 deletions

View File

@ -8,6 +8,7 @@ public abstract class AbstractContextContractExecutor implements ContractExecuto
static protected IGlobalConf globalConf = ConsistencyPluginManager.getContext().getGlobalConf();
static protected ICMActions cmActions = ConsistencyPluginManager.getContext().getCMActions();
static protected INetworkManager networkManager = ConsistencyPluginManager.getContext().getNetworkManager();
static protected IMasterClientTCPAction masterClientTCPAction = ConsistencyPluginManager.getContext().getMasterClientTCPAction();
static protected IMasterServerTCPAction masterServerTCPAction = ConsistencyPluginManager.getContext().getMasterServerTCPAction();
static protected IMasterServerRecoverMechAction masterServerRecoverMechAction = ConsistencyPluginManager.getContext().getMasterServerRecoverMechAction();
}

View File

@ -1,5 +1,6 @@
package org.bdware.consistency.plugin.sharding;
import com.google.gson.JsonArray;
import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
import com.google.gson.JsonPrimitive;
@ -7,6 +8,7 @@ import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.bdware.consistency.plugin.common.AbstractContextContractExecutor;
import org.bdware.sc.ComponedContractResult;
import org.bdware.sc.ContractClient;
import org.bdware.sc.ContractMeta;
import org.bdware.sc.ContractResult;
import org.bdware.sc.bean.*;
@ -20,10 +22,7 @@ import org.bdware.sc.util.JsonUtil;
import org.bdware.server.trustedmodel.MultiReqSeq;
import java.math.BigInteger;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
@ -66,13 +65,13 @@ public class MultiPointCooperationExecutor extends AbstractContextContractExecut
count); // 把count改成了1设置成获得1个响应就行
}
public void sendRequest(String id, ContractRequest req, String[] nodes) {
Map<String, Object> reqStr = new HashMap<>();
reqStr.put("uniReqID", id);
reqStr.put("data", req);
public void sendRequest(String id, ContractRequest req, String[] nodes, ResultCallback rc) {
req.needSeq = false;
reqStr.put("action", "executeContractLocally");
String sendStr = JsonUtil.toJson(reqStr);
JsonObject jo = new JsonObject();
jo.addProperty("uniReqID", id);
jo.add("data", JsonUtil.parseObject(req));
jo.addProperty("action", "executeContractLocally");
String sendStr = jo.toString();
// master负责缓存请求
if (!masterServerTCPAction.getReqCache().containsKey(contractID)) {
masterServerTCPAction.getReqCache().put(contractID, new RequestCache());
@ -83,13 +82,17 @@ public class MultiPointCooperationExecutor extends AbstractContextContractExecut
LOGGER.info("node size = " + nodes.length);
LOGGER.debug("nodes:" + JsonUtil.toJson(nodes));
for (String node : nodes) {
LOGGER.info(
"[sendRequests] get cmNode "
+ node.substring(0, 5)
+ " not null "
+ "RequestAllExecutor 发送请求给 "
+ node.substring(0, 5));
networkManager.sendToAgent(node, sendStr);
if (node.equals(globalConf.getNodeID())) {
masterClientTCPAction.asyncExecuteContractLocally(jo, rc);
} else {
LOGGER.info(
"[sendRequests] get cmNode "
+ node.substring(0, 5)
+ " not null "
+ "RequestAllExecutor 发送请求给 "
+ node.substring(0, 5));
networkManager.sendToAgent(node, sendStr);
}
}
}
@ -123,6 +126,31 @@ public class MultiPointCooperationExecutor extends AbstractContextContractExecut
val = val + members.length;
}
return new String[]{members[val]};
case byFunc:
ContractClient client = cmActions.getManager().getClient(req.getContractID());
JsonArray membersArr = new JsonArray(members.length);
for (String member : members) {
membersArr.add(member);
}
JsonObject arg = new JsonObject();
arg.addProperty("funcName", routeInfo.funcName);
// func myFunc (currentNode, members, membersCount, sourceArg)
JsonArray funcArgs = new JsonArray();
funcArgs.add(globalConf.getNodeID());
funcArgs.add(membersArr);
funcArgs.add(membersArr.size());
funcArgs.add(req.getArg());
arg.add("funcArgs", funcArgs);
String routeResultStr = client.executeMethod("", "invokeFunctionWithoutLimit", arg.toString());
JsonObject routeResult = JsonUtil.parseString(routeResultStr).getAsJsonObject();
List<String> nodes = new ArrayList<>();
for (String key: routeResult.keySet()) {
nodes.add(routeResult.get(key).getAsString());
}
return nodes.toArray(new String[]{});
default:
return members;
}
@ -194,7 +222,7 @@ public class MultiPointCooperationExecutor extends AbstractContextContractExecut
createResultCallback(id, rc, count, req.seq, req.getContractID(), joinInfo); // 初始化结果收集器
masterServerTCPAction.getSync().sleep(id, collector);
LOGGER.info("requestID=" + requestID + " master broadcasts request " + req.seq);
sendRequest(id, req, nodes); // 发送请求
sendRequest(id, req, nodes, collector); // 发送请求
} else {
LOGGER.info("invalidNodeNumOnResult");
request_index.getAndDecrement();